From 02aea8526077845bfa8271727abbb579def3be5a Mon Sep 17 00:00:00 2001 From: Charlie Date: Fri, 23 Jan 2026 11:29:14 -0500 Subject: [PATCH] Handle empty datatables responses --- app/datatables.py | 125 +++++++++++++++++++++++++++------------------- 1 file changed, 74 insertions(+), 51 deletions(-) diff --git a/app/datatables.py b/app/datatables.py index 3228585..4e93c09 100644 --- a/app/datatables.py +++ b/app/datatables.py @@ -2,6 +2,7 @@ from flask import request from flask_sqlalchemy import SQLAlchemy from sqlalchemy import text, func, or_, cast, String, select +from sqlalchemy.exc import OperationalError, ProgrammingError from sqlalchemy.sql import Select # `db` will be provided by the application using this module. @@ -37,13 +38,27 @@ def datatables_response(query): query: sqlalchemy.sql.Select | str Query to execute. May be a SQLAlchemy Select object or a raw SQL string. """ + values = request.values + + def empty_response(columns): + return { + "draw": int(values.get("draw", 1)), + "recordsTotal": 0, + "recordsFiltered": 0, + "data": [], + "columns": list(columns), + } + if isinstance(query, str): base_sql = query.rstrip(";") - values = request.values start = values.get("start", 0, type=int) length = values.get("length", 20, type=int) - columns = query_columns(query) + try: + columns = query_columns(query) + except (OperationalError, ProgrammingError): + return empty_response([]) + quoted_cols = [f'"{c}"' for c in columns] search_value = values.get("search[value]") @@ -79,31 +94,35 @@ def datatables_response(query): filters.append(f"{col_expr} {like_op} :col_{idx}") base_select = f"SELECT * FROM ({base_sql}) AS q" - total_records = db.session.scalar( - text(f"SELECT COUNT(*) FROM ({base_sql}) AS q") - ) - - filtered_sql = base_select - if filters: - filtered_sql += " WHERE " + " AND ".join(filters) - - records_filtered = db.session.scalar( - text(f"SELECT COUNT(*) FROM ({filtered_sql}) AS sq"), - params, - ) - - order_idx = values.get("order[0][column]", type=int) - if order_idx is not None and 0 <= order_idx < len(columns): - col = columns[order_idx] - direction = ( - "DESC" if values.get("order[0][dir]", "asc") == "desc" else "ASC" + try: + total_records = db.session.scalar( + text(f"SELECT COUNT(*) FROM ({base_sql}) AS q") + ) + + filtered_sql = base_select + if filters: + filtered_sql += " WHERE " + " AND ".join(filters) + + records_filtered = db.session.scalar( + text(f"SELECT COUNT(*) FROM ({filtered_sql}) AS sq"), + params, ) - filtered_sql += f' ORDER BY "{col}" {direction}' - paginated_sql = filtered_sql + " LIMIT :limit OFFSET :offset" - params.update({"limit": length, "offset": start}) + order_idx = values.get("order[0][column]", type=int) + if order_idx is not None and 0 <= order_idx < len(columns): + col = columns[order_idx] + direction = ( + "DESC" if values.get("order[0][dir]", "asc") == "desc" else "ASC" + ) + filtered_sql += f' ORDER BY "{col}" {direction}' + + paginated_sql = filtered_sql + " LIMIT :limit OFFSET :offset" + params.update({"limit": length, "offset": start}) + + rows = db.session.execute(text(paginated_sql), params).mappings().all() + except (OperationalError, ProgrammingError): + return empty_response(columns) - rows = db.session.execute(text(paginated_sql), params).mappings().all() data = [dict(r) for r in rows] return { @@ -120,34 +139,38 @@ def datatables_response(query): # Ensure ORM selections return column mappings instead of model objects columns = [c.key for c in query.selected_columns] base_query = query.with_only_columns(query.selected_columns) - total_records = _execute_count(base_query) + try: + total_records = _execute_count(base_query) + + search_value = values.get("search[value]") + if search_value: + filters = [ + cast(c, String).ilike(f"%{search_value}%") + for c in query.selected_columns + ] + base_query = base_query.where(or_(*filters)) + + for idx, col in enumerate(query.selected_columns): + val = values.get(f"columns[{idx}][search][value]") + if val: + base_query = base_query.where(cast(col, String).ilike(f"%{val}%")) + + order_idx = values.get("order[0][column]") + if order_idx is not None: + col = query.selected_columns[int(order_idx)] + if values.get("order[0][dir]", "asc") == "desc": + col = col.desc() + base_query = base_query.order_by(col) + + records_filtered = _execute_count(base_query) + + start = values.get("start", 0, type=int) + length = values.get("length", 20, type=int) + paginated = base_query.offset(start).limit(length) + rows = db.session.execute(paginated).all() + except (OperationalError, ProgrammingError): + return empty_response(columns) - values = request.values - search_value = values.get("search[value]") - if search_value: - filters = [ - cast(c, String).ilike(f"%{search_value}%") for c in query.selected_columns - ] - base_query = base_query.where(or_(*filters)) - - for idx, col in enumerate(query.selected_columns): - val = values.get(f"columns[{idx}][search][value]") - if val: - base_query = base_query.where(cast(col, String).ilike(f"%{val}%")) - - order_idx = values.get("order[0][column]") - if order_idx is not None: - col = query.selected_columns[int(order_idx)] - if values.get("order[0][dir]", "asc") == "desc": - col = col.desc() - base_query = base_query.order_by(col) - - records_filtered = _execute_count(base_query) - - start = values.get("start", 0, type=int) - length = values.get("length", 20, type=int) - paginated = base_query.offset(start).limit(length) - rows = db.session.execute(paginated).all() data = [dict(r._mapping) for r in rows] return {