From 999df1552f4758a86f1e687ae6bf2eda2aa6506f Mon Sep 17 00:00:00 2001 From: samir-puranik Date: Tue, 26 Sep 2023 18:11:39 -0400 Subject: [PATCH 1/9] use sqlparse.parse upfront. takes too long --- ossdbtoolsservice/query/batch.py | 15 +++++++-------- ossdbtoolsservice/query/query.py | 15 +++++++++------ 2 files changed, 16 insertions(+), 14 deletions(-) diff --git a/ossdbtoolsservice/query/batch.py b/ossdbtoolsservice/query/batch.py index c30dd17db..c2a5009de 100644 --- a/ossdbtoolsservice/query/batch.py +++ b/ossdbtoolsservice/query/batch.py @@ -8,6 +8,7 @@ import psycopg import uuid import sqlparse +from sqlparse.sql import Statement from ossdbtoolsservice.driver import ServerConnection from ossdbtoolsservice.utils.time import get_time_str, get_elapsed_time_str @@ -42,7 +43,7 @@ class Batch: def __init__( self, - batch_text: str, + statement: Statement, ordinal: int, selection: SelectionData, batch_events: BatchEvents = None, @@ -50,7 +51,8 @@ def __init__( ) -> None: self.id = ordinal self.selection = selection - self.batch_text = batch_text + self.statement = statement + self.batch_text = str(statement) self.status_message: str = None self._execution_start_time: datetime = None @@ -201,14 +203,11 @@ def create_result_set(storage_type: ResultSetStorageType, result_set_id: int, ba return InMemoryResultSet(result_set_id, batch_id) -def create_batch(batch_text: str, ordinal: int, selection: SelectionData, batch_events: BatchEvents, storage_type: ResultSetStorageType) -> Batch: - sql = sqlparse.parse(batch_text) - statement = sql[0] - +def create_batch(statement: Statement, ordinal: int, selection: SelectionData, batch_events: BatchEvents, storage_type: ResultSetStorageType) -> Batch: if statement.get_type().lower() == 'select': into_checker = [True for token in statement.tokens if token.normalized == 'INTO'] cte_checker = [True for token in statement.tokens if token.ttype == sqlparse.tokens.Keyword.CTE] if len(into_checker) == 0 and len(cte_checker) == 0: # SELECT 
INTO and CTE keywords can't be used in named cursor - return SelectBatch(batch_text, ordinal, selection, batch_events, storage_type) + return SelectBatch(statement, ordinal, selection, batch_events, storage_type) - return Batch(batch_text, ordinal, selection, batch_events, storage_type) + return Batch(statement, ordinal, selection, batch_events, storage_type) diff --git a/ossdbtoolsservice/query/query.py b/ossdbtoolsservice/query/query.py index 941ac0bae..c1b1127e0 100644 --- a/ossdbtoolsservice/query/query.py +++ b/ossdbtoolsservice/query/query.py @@ -7,6 +7,7 @@ from typing import Callable, Dict, List, Optional # noqa import sqlparse +from sqlparse.sql import Statement from ossdbtoolsservice.driver import ServerConnection from ossdbtoolsservice.query import Batch, BatchEvents, create_batch, ResultSetStorageType from ossdbtoolsservice.query.contracts import SaveResultsRequestParams, SelectionData @@ -64,10 +65,11 @@ def __init__(self, owner_uri: str, query_text: str, query_execution_settings: Qu self.is_canceled = False # Initialize the batches - statements = sqlparse.split(query_text) + statements = sqlparse.parse(query_text) selection_data = compute_selection_data_for_batches(statements, query_text) - for index, batch_text in enumerate(statements): + for index, batch in enumerate(statements): + batch_text = str(batch) # Skip any empty text formatted_text = sqlparse.format(batch_text, strip_comments=True).strip() if not formatted_text or formatted_text == ';': @@ -89,7 +91,7 @@ def __init__(self, owner_uri: str, query_text: str, query_execution_settings: Qu self._user_transaction = True batch = create_batch( - sql_statement_text, + batch, len(self.batches), selection_data[index], query_events.batch_events, @@ -168,7 +170,7 @@ def save_as(self, params: SaveResultsRequestParams, file_factory: FileStreamFact self.batches[params.batch_index].save_as(params, file_factory, on_success, on_failure) -def compute_selection_data_for_batches(batches: List[str], full_text: 
str) -> List[SelectionData]: +def compute_selection_data_for_batches(batches: List[Statement], full_text: str) -> List[SelectionData]: # Map the starting index of each line to the line number line_map: Dict[int, int] = {} search_offset = 0 @@ -181,14 +183,15 @@ def compute_selection_data_for_batches(batches: List[str], full_text: str) -> Li selection_data: List[SelectionData] = [] search_offset = 0 for batch in batches: + batch_text = str(batch) # Calculate the starting line number and column - start_index = full_text.index(batch, search_offset) + start_index = full_text.index(batch_text, search_offset) start_line_index = max(filter(lambda line_index: line_index <= start_index, line_map.keys())) start_line_num = line_map[start_line_index] start_col_num = start_index - start_line_index # Calculate the ending line number and column - end_index = start_index + len(batch) + end_index = start_index + len(batch_text) end_line_index = max(filter(lambda line_index: line_index < end_index, line_map.keys())) end_line_num = line_map[end_line_index] end_col_num = end_index - end_line_index From 8cc0d049abe29e61996994c60cc249a19b719e39 Mon Sep 17 00:00:00 2001 From: samir-puranik Date: Tue, 26 Sep 2023 18:11:56 -0400 Subject: [PATCH 2/9] Revert "use sqlparse.parse upfront. takes too long" This reverts commit 999df1552f4758a86f1e687ae6bf2eda2aa6506f. 
--- ossdbtoolsservice/query/batch.py | 15 ++++++++------- ossdbtoolsservice/query/query.py | 15 ++++++--------- 2 files changed, 14 insertions(+), 16 deletions(-) diff --git a/ossdbtoolsservice/query/batch.py b/ossdbtoolsservice/query/batch.py index c2a5009de..c30dd17db 100644 --- a/ossdbtoolsservice/query/batch.py +++ b/ossdbtoolsservice/query/batch.py @@ -8,7 +8,6 @@ import psycopg import uuid import sqlparse -from sqlparse.sql import Statement from ossdbtoolsservice.driver import ServerConnection from ossdbtoolsservice.utils.time import get_time_str, get_elapsed_time_str @@ -43,7 +42,7 @@ class Batch: def __init__( self, - statement: Statement, + batch_text: str, ordinal: int, selection: SelectionData, batch_events: BatchEvents = None, @@ -51,8 +50,7 @@ def __init__( ) -> None: self.id = ordinal self.selection = selection - self.statement = statement - self.batch_text = str(statement) + self.batch_text = batch_text self.status_message: str = None self._execution_start_time: datetime = None @@ -203,11 +201,14 @@ def create_result_set(storage_type: ResultSetStorageType, result_set_id: int, ba return InMemoryResultSet(result_set_id, batch_id) -def create_batch(statement: Statement, ordinal: int, selection: SelectionData, batch_events: BatchEvents, storage_type: ResultSetStorageType) -> Batch: +def create_batch(batch_text: str, ordinal: int, selection: SelectionData, batch_events: BatchEvents, storage_type: ResultSetStorageType) -> Batch: + sql = sqlparse.parse(batch_text) + statement = sql[0] + if statement.get_type().lower() == 'select': into_checker = [True for token in statement.tokens if token.normalized == 'INTO'] cte_checker = [True for token in statement.tokens if token.ttype == sqlparse.tokens.Keyword.CTE] if len(into_checker) == 0 and len(cte_checker) == 0: # SELECT INTO and CTE keywords can't be used in named cursor - return SelectBatch(statement, ordinal, selection, batch_events, storage_type) + return SelectBatch(batch_text, ordinal, selection, 
batch_events, storage_type) - return Batch(statement, ordinal, selection, batch_events, storage_type) + return Batch(batch_text, ordinal, selection, batch_events, storage_type) diff --git a/ossdbtoolsservice/query/query.py b/ossdbtoolsservice/query/query.py index c1b1127e0..941ac0bae 100644 --- a/ossdbtoolsservice/query/query.py +++ b/ossdbtoolsservice/query/query.py @@ -7,7 +7,6 @@ from typing import Callable, Dict, List, Optional # noqa import sqlparse -from sqlparse.sql import Statement from ossdbtoolsservice.driver import ServerConnection from ossdbtoolsservice.query import Batch, BatchEvents, create_batch, ResultSetStorageType from ossdbtoolsservice.query.contracts import SaveResultsRequestParams, SelectionData @@ -65,11 +64,10 @@ def __init__(self, owner_uri: str, query_text: str, query_execution_settings: Qu self.is_canceled = False # Initialize the batches - statements = sqlparse.parse(query_text) + statements = sqlparse.split(query_text) selection_data = compute_selection_data_for_batches(statements, query_text) - for index, batch in enumerate(statements): - batch_text = str(batch) + for index, batch_text in enumerate(statements): # Skip any empty text formatted_text = sqlparse.format(batch_text, strip_comments=True).strip() if not formatted_text or formatted_text == ';': @@ -91,7 +89,7 @@ def __init__(self, owner_uri: str, query_text: str, query_execution_settings: Qu self._user_transaction = True batch = create_batch( - batch, + sql_statement_text, len(self.batches), selection_data[index], query_events.batch_events, @@ -170,7 +168,7 @@ def save_as(self, params: SaveResultsRequestParams, file_factory: FileStreamFact self.batches[params.batch_index].save_as(params, file_factory, on_success, on_failure) -def compute_selection_data_for_batches(batches: List[Statement], full_text: str) -> List[SelectionData]: +def compute_selection_data_for_batches(batches: List[str], full_text: str) -> List[SelectionData]: # Map the starting index of each line to the line 
number line_map: Dict[int, int] = {} search_offset = 0 @@ -183,15 +181,14 @@ def compute_selection_data_for_batches(batches: List[Statement], full_text: str) selection_data: List[SelectionData] = [] search_offset = 0 for batch in batches: - batch_text = str(batch) # Calculate the starting line number and column - start_index = full_text.index(batch_text, search_offset) + start_index = full_text.index(batch, search_offset) start_line_index = max(filter(lambda line_index: line_index <= start_index, line_map.keys())) start_line_num = line_map[start_line_index] start_col_num = start_index - start_line_index # Calculate the ending line number and column - end_index = start_index + len(batch_text) + end_index = start_index + len(batch) end_line_index = max(filter(lambda line_index: line_index < end_index, line_map.keys())) end_line_num = line_map[end_line_index] end_col_num = end_index - end_line_index From 48ec86d905cf0d9f9773ae8ca69662b0101af6e2 Mon Sep 17 00:00:00 2001 From: samir-puranik Date: Tue, 26 Sep 2023 18:18:11 -0400 Subject: [PATCH 3/9] improve compute_selection_data_for_batches performance --- ossdbtoolsservice/query/query.py | 20 +++++++++++++++----- 1 file changed, 15 insertions(+), 5 deletions(-) diff --git a/ossdbtoolsservice/query/query.py b/ossdbtoolsservice/query/query.py index 941ac0bae..56c2e82eb 100644 --- a/ossdbtoolsservice/query/query.py +++ b/ossdbtoolsservice/query/query.py @@ -180,16 +180,26 @@ def compute_selection_data_for_batches(batches: List[str], full_text: str) -> Li # Iterate through the batches to build selection data selection_data: List[SelectionData] = [] search_offset = 0 + line_map_keys = sorted(line_map.keys()) + l, r = 0, 0 + start_line_index, end_line_index = 0, 0 for batch in batches: # Calculate the starting line number and column - start_index = full_text.index(batch, search_offset) - start_line_index = max(filter(lambda line_index: line_index <= start_index, line_map.keys())) - start_line_num = line_map[start_line_index] 
- start_col_num = start_index - start_line_index + start_index = full_text.index(batch, search_offset) # batch start index + # start_line_index = max(filter(lambda line_index: line_index <= start_index, line_map_keys)) # find the character index of the batch start line + while l < len(line_map_keys) and line_map_keys[l] <= start_index: + start_line_index = line_map_keys[l] + l += 1 + + start_line_num = line_map[start_line_index] # map that to the line number + start_col_num = start_index - start_line_index # Calculate the ending line number and column end_index = start_index + len(batch) - end_line_index = max(filter(lambda line_index: line_index < end_index, line_map.keys())) + # end_line_index = max(filter(lambda line_index: line_index < end_index, line_map_keys)) + while r < len(line_map_keys) and line_map_keys[r] < end_index: + end_line_index = line_map_keys[r] + r += 1 end_line_num = line_map[end_line_index] end_col_num = end_index - end_line_index From 4f53fe9d4749d6a07b5492031c437a804031e0c5 Mon Sep 17 00:00:00 2001 From: samir-puranik Date: Wed, 27 Sep 2023 03:16:23 -0400 Subject: [PATCH 4/9] one shot query execution --- ossdbtoolsservice/query/batch.py | 16 ++++----- ossdbtoolsservice/query/query.py | 61 +++++++++++++++++--------------- 2 files changed, 40 insertions(+), 37 deletions(-) diff --git a/ossdbtoolsservice/query/batch.py b/ossdbtoolsservice/query/batch.py index c30dd17db..bbfa69e9d 100644 --- a/ossdbtoolsservice/query/batch.py +++ b/ossdbtoolsservice/query/batch.py @@ -202,13 +202,13 @@ def create_result_set(storage_type: ResultSetStorageType, result_set_id: int, ba def create_batch(batch_text: str, ordinal: int, selection: SelectionData, batch_events: BatchEvents, storage_type: ResultSetStorageType) -> Batch: - sql = sqlparse.parse(batch_text) - statement = sql[0] - - if statement.get_type().lower() == 'select': - into_checker = [True for token in statement.tokens if token.normalized == 'INTO'] - cte_checker = [True for token in 
statement.tokens if token.ttype == sqlparse.tokens.Keyword.CTE] - if len(into_checker) == 0 and len(cte_checker) == 0: # SELECT INTO and CTE keywords can't be used in named cursor - return SelectBatch(batch_text, ordinal, selection, batch_events, storage_type) + # sql = sqlparse.parse(batch_text) + # statement = sql[0] + + # if statement.get_type().lower() == 'select': + # into_checker = [True for token in statement.tokens if token.normalized == 'INTO'] + # cte_checker = [True for token in statement.tokens if token.ttype == sqlparse.tokens.Keyword.CTE] + # if len(into_checker) == 0 and len(cte_checker) == 0: # SELECT INTO and CTE keywords can't be used in named cursor + # return SelectBatch(batch_text, ordinal, selection, batch_events, storage_type) return Batch(batch_text, ordinal, selection, batch_events, storage_type) diff --git a/ossdbtoolsservice/query/query.py b/ossdbtoolsservice/query/query.py index 56c2e82eb..8cca7a58e 100644 --- a/ossdbtoolsservice/query/query.py +++ b/ossdbtoolsservice/query/query.py @@ -64,38 +64,41 @@ def __init__(self, owner_uri: str, query_text: str, query_execution_settings: Qu self.is_canceled = False # Initialize the batches - statements = sqlparse.split(query_text) - selection_data = compute_selection_data_for_batches(statements, query_text) - - for index, batch_text in enumerate(statements): - # Skip any empty text - formatted_text = sqlparse.format(batch_text, strip_comments=True).strip() - if not formatted_text or formatted_text == ';': - continue - - sql_statement_text = batch_text - - # Create and save the batch - if bool(self._execution_plan_options): - if self._execution_plan_options.include_estimated_execution_plan_xml: - sql_statement_text = Query.EXPLAIN_QUERY_TEMPLATE.format(sql_statement_text) - elif self._execution_plan_options.include_actual_execution_plan_xml: - self._disable_auto_commit = True - sql_statement_text = Query.ANALYZE_EXPLAIN_QUERY_TEMPLATE.format(sql_statement_text) - - # Check if user defined 
transaction - if formatted_text.lower().startswith('begin'): + # statements = sqlparse.split(query_text) + selection_data = compute_selection_data_for_batches([query_text], query_text) + + # for index, batch_text in enumerate(statements): + index = 0 + batch_text = query_text + # Skip any empty text + # formatted_text = sqlparse.format(batch_text, strip_comments=True).strip() + # if not formatted_text or formatted_text == ';': + # return None + formatted_text = batch_text + + sql_statement_text = batch_text + + # Create and save the batch + if bool(self._execution_plan_options): + if self._execution_plan_options.include_estimated_execution_plan_xml: + sql_statement_text = Query.EXPLAIN_QUERY_TEMPLATE.format(sql_statement_text) + elif self._execution_plan_options.include_actual_execution_plan_xml: self._disable_auto_commit = True - self._user_transaction = True + sql_statement_text = Query.ANALYZE_EXPLAIN_QUERY_TEMPLATE.format(sql_statement_text) - batch = create_batch( - sql_statement_text, - len(self.batches), - selection_data[index], - query_events.batch_events, - query_execution_settings.result_set_storage_type) + # Check if user defined transaction + if formatted_text.lower().startswith('begin'): + self._disable_auto_commit = True + self._user_transaction = True - self._batches.append(batch) + batch = create_batch( + sql_statement_text, + len(self.batches), + selection_data[index], + query_events.batch_events, + query_execution_settings.result_set_storage_type) + + self._batches.append(batch) @property def owner_uri(self) -> str: From a562ca5225e0e0c90a79fe30a57b9b4c9bc58487 Mon Sep 17 00:00:00 2001 From: samir-puranik Date: Sun, 1 Oct 2023 21:08:57 -0400 Subject: [PATCH 5/9] add transaction states --- ossdbtoolsservice/driver/types/driver.py | 16 ++++++++++++++++ .../driver/types/psycopg_driver.py | 18 ++++++++++++++++++ 2 files changed, 34 insertions(+) diff --git a/ossdbtoolsservice/driver/types/driver.py b/ossdbtoolsservice/driver/types/driver.py index 
5bd2f680d..2f73c8470 100644 --- a/ossdbtoolsservice/driver/types/driver.py +++ b/ossdbtoolsservice/driver/types/driver.py @@ -61,6 +61,11 @@ def default_database(self) -> str: def database_error(self) -> Exception: """ Returns the type of database error this connection throws""" + @property + @abstractmethod + def transaction_is_active(self) -> bool: + """Returns bool indicating if transaction is active""" + @property @abstractmethod def transaction_in_error(self) -> bool: @@ -71,6 +76,11 @@ def transaction_in_error(self) -> bool: def transaction_is_idle(self) -> bool: """Returns bool indicating if transaction is currently idle""" + @property + @abstractmethod + def transaction_in_unknown(self) -> bool: + """Returns bool indicating if transaction is active""" + @property @abstractmethod def transaction_in_trans(self) -> bool: @@ -179,6 +189,12 @@ def close(self): Closes this current connection. """ + + def transaction_status(self): + """ + Gets the current transaction status if it exists + """ + @abstractmethod def set_transaction_in_error(self): """ diff --git a/ossdbtoolsservice/driver/types/psycopg_driver.py b/ossdbtoolsservice/driver/types/psycopg_driver.py index 8cd2711a9..13c72c70b 100644 --- a/ossdbtoolsservice/driver/types/psycopg_driver.py +++ b/ossdbtoolsservice/driver/types/psycopg_driver.py @@ -150,6 +150,11 @@ def database_error(self): """Returns the type of database error this connection throws""" return self._database_error + @property + def transaction_is_active(self) -> bool: + """Returns bool indicating if transaction is active""" + return self._conn.info.transaction_status is TransactionStatus.ACTIVE + @property def transaction_in_error(self) -> bool: """Returns bool indicating if transaction is in error""" @@ -159,6 +164,11 @@ def transaction_in_error(self) -> bool: def transaction_is_idle(self) -> bool: """Returns bool indicating if transaction is currently idle""" return self._conn.info.transaction_status is TransactionStatus.IDLE + + 
@property + def transaction_in_unknown(self) -> bool: + """Returns bool indicating if transaction is in unknown state""" + return self._conn.info.transaction_status is TransactionStatus.UNKNOWN @property def transaction_in_trans(self) -> bool: @@ -315,6 +325,14 @@ def close(self): """ self._conn.close() + def transaction_status(self): + """ + Gets the current transaction status if it exists + """ + if self._conn and self._conn.info: + return self._conn.info.transaction_status + return None + def set_transaction_in_error(self): """ Sets if current connection is in error From 2044378975d3ab634222a7df4b7e7e461beda637 Mon Sep 17 00:00:00 2001 From: samir-puranik Date: Mon, 2 Oct 2023 10:37:29 -0400 Subject: [PATCH 6/9] initial changes to query.execute --- ossdbtoolsservice/query/batch.py | 5 +- ossdbtoolsservice/query/query.py | 135 ++++++++++++++++++++++--------- 2 files changed, 99 insertions(+), 41 deletions(-) diff --git a/ossdbtoolsservice/query/batch.py b/ossdbtoolsservice/query/batch.py index bbfa69e9d..f5dfe1d75 100644 --- a/ossdbtoolsservice/query/batch.py +++ b/ossdbtoolsservice/query/batch.py @@ -201,7 +201,7 @@ def create_result_set(storage_type: ResultSetStorageType, result_set_id: int, ba return InMemoryResultSet(result_set_id, batch_id) -def create_batch(batch_text: str, ordinal: int, selection: SelectionData, batch_events: BatchEvents, storage_type: ResultSetStorageType) -> Batch: +def create_batch(batch_text: str, ordinal: int, selection: SelectionData, batch_events: BatchEvents, storage_type: ResultSetStorageType, select_batch: bool = False) -> Batch: # sql = sqlparse.parse(batch_text) # statement = sql[0] @@ -210,5 +210,6 @@ def create_batch(batch_text: str, ordinal: int, selection: SelectionData, batch_ # cte_checker = [True for token in statement.tokens if token.ttype == sqlparse.tokens.Keyword.CTE] # if len(into_checker) == 0 and len(cte_checker) == 0: # SELECT INTO and CTE keywords can't be used in named cursor # return SelectBatch(batch_text, 
ordinal, selection, batch_events, storage_type) - + if select_batch: + return SelectBatch(batch_text, ordinal, selection, batch_events, storage_type) return Batch(batch_text, ordinal, selection, batch_events, storage_type) diff --git a/ossdbtoolsservice/query/query.py b/ossdbtoolsservice/query/query.py index 8cca7a58e..483c8a1dd 100644 --- a/ossdbtoolsservice/query/query.py +++ b/ossdbtoolsservice/query/query.py @@ -3,6 +3,7 @@ # Licensed under the MIT License. See License.txt in the project root for license information. # -------------------------------------------------------------------------------------------- +import datetime from enum import Enum from typing import Callable, Dict, List, Optional # noqa @@ -11,6 +12,7 @@ from ossdbtoolsservice.query import Batch, BatchEvents, create_batch, ResultSetStorageType from ossdbtoolsservice.query.contracts import SaveResultsRequestParams, SelectionData from ossdbtoolsservice.query.data_storage import FileStreamFactory +import psycopg class QueryEvents: @@ -60,45 +62,47 @@ def __init__(self, owner_uri: str, query_text: str, query_execution_settings: Qu self._current_batch_index = 0 self._batches: List[Batch] = [] self._execution_plan_options = query_execution_settings.execution_plan_options + self._query_events = query_events + self._query_execution_settings = query_execution_settings self.is_canceled = False # Initialize the batches # statements = sqlparse.split(query_text) - selection_data = compute_selection_data_for_batches([query_text], query_text) - - # for index, batch_text in enumerate(statements): - index = 0 - batch_text = query_text - # Skip any empty text - # formatted_text = sqlparse.format(batch_text, strip_comments=True).strip() - # if not formatted_text or formatted_text == ';': - # return None - formatted_text = batch_text - - sql_statement_text = batch_text - - # Create and save the batch - if bool(self._execution_plan_options): - if self._execution_plan_options.include_estimated_execution_plan_xml: - 
sql_statement_text = Query.EXPLAIN_QUERY_TEMPLATE.format(sql_statement_text) - elif self._execution_plan_options.include_actual_execution_plan_xml: - self._disable_auto_commit = True - sql_statement_text = Query.ANALYZE_EXPLAIN_QUERY_TEMPLATE.format(sql_statement_text) - - # Check if user defined transaction - if formatted_text.lower().startswith('begin'): - self._disable_auto_commit = True - self._user_transaction = True - - batch = create_batch( - sql_statement_text, - len(self.batches), - selection_data[index], - query_events.batch_events, - query_execution_settings.result_set_storage_type) - - self._batches.append(batch) + # selection_data = compute_selection_data_for_batches([query_text], query_text) + + # # for index, batch_text in enumerate(statements): + # index = 0 + # batch_text = query_text + # # Skip any empty text + # # formatted_text = sqlparse.format(batch_text, strip_comments=True).strip() + # # if not formatted_text or formatted_text == ';': + # # return None + # formatted_text = batch_text + + # sql_statement_text = batch_text + + # # Create and save the batch + # if bool(self._execution_plan_options): + # if self._execution_plan_options.include_estimated_execution_plan_xml: + # sql_statement_text = Query.EXPLAIN_QUERY_TEMPLATE.format(sql_statement_text) + # elif self._execution_plan_options.include_actual_execution_plan_xml: + # self._disable_auto_commit = True + # sql_statement_text = Query.ANALYZE_EXPLAIN_QUERY_TEMPLATE.format(sql_statement_text) + + # # Check if user defined transaction + # if formatted_text.lower().startswith('begin'): + # self._disable_auto_commit = True + # self._user_transaction = True + + # batch = create_batch( + # sql_statement_text, + # len(self.batches), + # selection_data[index], + # query_events.batch_events, + # query_execution_settings.result_set_storage_type) + + # self._batches.append(batch) @property def owner_uri(self) -> str: @@ -119,6 +123,14 @@ def batches(self) -> List[Batch]: @property def 
current_batch_index(self) -> int: return self._current_batch_index + + @property + def query_events(self) -> QueryEvents: + return self._query_events + + @property + def query_execution_settings(self) -> QueryExecutionSettings: + return self._query_execution_settings def execute(self, connection: ServerConnection, retry_state=False): """ @@ -143,13 +155,52 @@ def execute(self, connection: ServerConnection, retry_state=False): if self._disable_auto_commit and connection.transaction_is_idle: connection.autocommit = False - for batch_index, batch in enumerate(self._batches): - self._current_batch_index = batch_index + # for batch_index, batch in enumerate(self._batches): + # self._current_batch_index = batch_index + + # if self.is_canceled: + # break + + # Start a cursor block + batch_events: BatchEvents = None + if self.query_events is not None and self.query_events.batch_events is not None: + batch_events = self.query_events.batch_events + with connection.cursor() as cur: + connection.connection.add_notice_handler(lambda msg: self.notice_handler(msg, connection)) + batch_ordinal = 0 + cur.execute(self.query_text) + curr_resultset = True + while curr_resultset: + batch_obj = create_batch( + None, + batch_ordinal, + None, + self.query_events.batch_events, + self.query_execution_settings.result_set_storage_type, + # Cursor description will have a result if it contains tuples + True + ) + + # use callbacks + if batch_events and batch_events._on_execution_started: + batch_events._on_execution_started(batch_obj) + + batch_obj.after_execute(cur) + + if batch_events and batch_events._on_execution_completed: + batch_events._on_execution_completed(batch_obj) + + if cur and cur.statusmessage is not None: + batch_obj.status_message = cur.statusmessage + + batch_obj._has_executed = True + + # Append the batch object and update while loop values + self.batches.append(batch_obj) + curr_resultset = cur.nextset() + batch_ordinal += 1 - if self.is_canceled: - break - 
batch.execute(connection) finally: # We can only set autocommit when the connection is open. if connection.open and connection.transaction_is_idle: @@ -157,6 +208,12 @@ def execute(self, connection: ServerConnection, retry_state=False): connection.set_user_transaction(False) self._disable_auto_commit = False self._execution_state = ExecutionState.EXECUTED + + def notice_handler(self, notice: str, conn: ServerConnection): + if not conn.user_transaction: + self._notices.append('{0}: {1}'.format(notice.severity, notice.message_primary)) + elif not notice.message_primary == 'there is already a transaction in progress': + self._notices.append('WARNING: {0}'.format(notice.message_primary)) def get_subset(self, batch_index: int, start_index: int, end_index: int): if batch_index < 0 or batch_index >= len(self._batches): From 88a7f2b08497c84d8915cff8371a34c50bcd34fd Mon Sep 17 00:00:00 2001 From: samir-puranik Date: Mon, 2 Oct 2023 12:46:51 -0400 Subject: [PATCH 7/9] error messages and results showing in one-shot --- ossdbtoolsservice/query/query.py | 68 +++++++++++++++++++++++++------- 1 file changed, 53 insertions(+), 15 deletions(-) diff --git a/ossdbtoolsservice/query/query.py b/ossdbtoolsservice/query/query.py index 483c8a1dd..d22f62cad 100644 --- a/ossdbtoolsservice/query/query.py +++ b/ossdbtoolsservice/query/query.py @@ -3,9 +3,9 @@ # Licensed under the MIT License. See License.txt in the project root for license information. 
# -------------------------------------------------------------------------------------------- -import datetime +from datetime import datetime from enum import Enum -from typing import Callable, Dict, List, Optional # noqa +from typing import Callable, Dict, List, Optional, Tuple # noqa import sqlparse from ossdbtoolsservice.driver import ServerConnection @@ -66,6 +66,7 @@ def __init__(self, owner_uri: str, query_text: str, query_execution_settings: Qu self._query_execution_settings = query_execution_settings self.is_canceled = False + self.selection_data = compute_selection_data_for_batches([self.query_text], self.query_text)[0] # Initialize the batches # statements = sqlparse.split(query_text) @@ -168,35 +169,50 @@ def execute(self, connection: ServerConnection, retry_state=False): with connection.cursor() as cur: connection.connection.add_notice_handler(lambda msg: self.notice_handler(msg, connection)) batch_ordinal = 0 - cur.execute(self.query_text) + start_time = datetime.now() + + try: + cur.execute(self.query_text) + end_time = datetime.now() + except psycopg.DatabaseError as e: + end_time = datetime.now() + self.handle_database_error_during_execute(connection, (start_time, end_time), batch_events) + # Exit + raise e + curr_resultset = True while curr_resultset: batch_obj = create_batch( - None, + self.query_text, batch_ordinal, - None, + self.selection_data, self.query_events.batch_events, - self.query_execution_settings.result_set_storage_type, - # Cursor description will have a result if it contains tuples - True + self.query_execution_settings.result_set_storage_type ) - # use callbacks + # Only set end execution time to first batch summary as we cannot collect individual statement execution times + batch_obj._execution_start_time = start_time + if batch_ordinal == 0: + batch_obj._execution_end_time = end_time + + # Call start callback if batch_events and batch_events._on_execution_started: batch_events._on_execution_started(batch_obj) 
batch_obj.after_execute(cur) + batch_obj._has_executed = True + + # Call Completed callback if batch_events and batch_events._on_execution_completed: batch_events._on_execution_completed(batch_obj) if cur and cur.statusmessage is not None: batch_obj.status_message = cur.statusmessage - batch_obj._has_executed = True - # Append the batch object and update while loop values self.batches.append(batch_obj) + self._current_batch_index = batch_ordinal curr_resultset = cur.nextset() batch_ordinal += 1 @@ -208,12 +224,34 @@ def execute(self, connection: ServerConnection, retry_state=False): connection.set_user_transaction(False) self._disable_auto_commit = False self._execution_state = ExecutionState.EXECUTED + + def handle_database_error_during_execute(self, conn: ServerConnection, execution_times: Tuple[datetime, datetime], batch_events: BatchEvents): + start_time, end_time = execution_times + batch_obj = create_batch( + self.query_text, + 0, + self.selection_data, + self.query_events.batch_events, + self.query_execution_settings.result_set_storage_type + ) + batch_obj._execution_start_time = start_time + batch_obj._execution_end_time = end_time + + # Call start callback + if batch_events and batch_events._on_execution_started: + batch_events._on_execution_started(batch_obj) + + batch_obj._has_error = True + self.batches.append(batch_obj) + self._current_batch_index = 0 + conn.set_transaction_in_error() def notice_handler(self, notice: str, conn: ServerConnection): - if not conn.user_transaction: - self._notices.append('{0}: {1}'.format(notice.severity, notice.message_primary)) - elif not notice.message_primary == 'there is already a transaction in progress': - self._notices.append('WARNING: {0}'.format(notice.message_primary)) + if self.batches and len(self.batches) > 0: + if not conn.user_transaction: + self.batches[self.current_batch_index]._notices.append('{0}: {1}'.format(notice.severity, notice.message_primary)) + elif not notice.message_primary == 'there is 
already a transaction in progress': + self.batches[self.current_batch_index].append('WARNING: {0}'.format(notice.message_primary)) def get_subset(self, batch_index: int, start_index: int, end_index: int): if batch_index < 0 or batch_index >= len(self._batches): From cc39a29609e45dee526f1daf5628dea85b9f30fc Mon Sep 17 00:00:00 2001 From: samir-puranik Date: Mon, 2 Oct 2023 12:52:55 -0400 Subject: [PATCH 8/9] cancelable --- ossdbtoolsservice/query/query.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/ossdbtoolsservice/query/query.py b/ossdbtoolsservice/query/query.py index d22f62cad..30e0a073c 100644 --- a/ossdbtoolsservice/query/query.py +++ b/ossdbtoolsservice/query/query.py @@ -172,6 +172,8 @@ def execute(self, connection: ServerConnection, retry_state=False): start_time = datetime.now() try: + if self.is_canceled: + return cur.execute(self.query_text) end_time = datetime.now() except psycopg.DatabaseError as e: @@ -189,6 +191,9 @@ def execute(self, connection: ServerConnection, retry_state=False): self.query_events.batch_events, self.query_execution_settings.result_set_storage_type ) + # Break if canceled + if self.is_canceled: + break # Only set end execution time to first batch summary as we cannot collect individual statement execution times batch_obj._execution_start_time = start_time From 43685e3d17404343c456a735e3766c3747fbbb87 Mon Sep 17 00:00:00 2001 From: samir-puranik Date: Tue, 3 Oct 2023 11:58:43 -0400 Subject: [PATCH 9/9] show only first 100 results. 
show all the notices in last batch result --- ossdbtoolsservice/query/batch.py | 44 ------- ossdbtoolsservice/query/query.py | 123 +++++++----------- .../query_execution_service.py | 15 ++- ossdbtoolsservice/utils/constants.py | 4 + 4 files changed, 59 insertions(+), 127 deletions(-) diff --git a/ossdbtoolsservice/query/batch.py b/ossdbtoolsservice/query/batch.py index f5dfe1d75..96cfa8e58 100644 --- a/ossdbtoolsservice/query/batch.py +++ b/ossdbtoolsservice/query/batch.py @@ -107,50 +107,6 @@ def notices(self) -> List[str]: def get_cursor(self, connection: ServerConnection): return connection.cursor() - def execute(self, conn: ServerConnection) -> None: - """ - Execute the batch using a cursor retrieved from the given connection - - :raises DatabaseError: if an error is encountered while running the batch's query - """ - self._execution_start_time = datetime.now() - - if self._batch_events and self._batch_events._on_execution_started: - self._batch_events._on_execution_started(self) - - cursor = self.get_cursor(conn) - - conn.connection.add_notice_handler(lambda msg: self.notice_handler(msg, conn)) - - if self.batch_text.startswith('begin') and conn.transaction_in_trans: - self._notices.append('WARNING: there is already a transaction in progress') - - try: - cursor.execute(self.batch_text) - - # Commit the transaction if autocommit is True - if conn.autocommit: - conn.commit() - - self.after_execute(cursor) - except psycopg.DatabaseError as e: - self._has_error = True - conn.set_transaction_in_error() - raise e - finally: - if cursor and cursor.statusmessage is not None: - self.status_message = cursor.statusmessage - # We are doing this because when the execute fails for named cursors - # cursor is not activated on the server which results in failure on close - # Hence we are checking if the cursor was really executed for us to close it - if cursor and cursor.rowcount != -1 and cursor.rowcount is not None: - cursor.close() - self._has_executed = True - 
self._execution_end_time = datetime.now() - - if self._batch_events and self._batch_events._on_execution_completed: - self._batch_events._on_execution_completed(self) - def after_execute(self, cursor) -> None: if cursor.description is not None: self.create_result_set(cursor) diff --git a/ossdbtoolsservice/query/query.py b/ossdbtoolsservice/query/query.py index 30e0a073c..ff39386af 100644 --- a/ossdbtoolsservice/query/query.py +++ b/ossdbtoolsservice/query/query.py @@ -13,6 +13,7 @@ from ossdbtoolsservice.query.contracts import SaveResultsRequestParams, SelectionData from ossdbtoolsservice.query.data_storage import FileStreamFactory import psycopg +from utils import constants class QueryEvents: @@ -61,49 +62,27 @@ def __init__(self, owner_uri: str, query_text: str, query_execution_settings: Qu self._user_transaction = False self._current_batch_index = 0 self._batches: List[Batch] = [] + self._notices: List[str] = [] self._execution_plan_options = query_execution_settings.execution_plan_options self._query_events = query_events self._query_execution_settings = query_execution_settings self.is_canceled = False + # Use the same selection data for all batches. 
We want to avoid parsing and splitting into separate SQL statements self.selection_data = compute_selection_data_for_batches([self.query_text], self.query_text)[0] - # Initialize the batches - # statements = sqlparse.split(query_text) - # selection_data = compute_selection_data_for_batches([query_text], query_text) - - # # for index, batch_text in enumerate(statements): - # index = 0 - # batch_text = query_text - # # Skip any empty text - # # formatted_text = sqlparse.format(batch_text, strip_comments=True).strip() - # # if not formatted_text or formatted_text == ';': - # # return None - # formatted_text = batch_text - - # sql_statement_text = batch_text - # # Create and save the batch - # if bool(self._execution_plan_options): - # if self._execution_plan_options.include_estimated_execution_plan_xml: - # sql_statement_text = Query.EXPLAIN_QUERY_TEMPLATE.format(sql_statement_text) - # elif self._execution_plan_options.include_actual_execution_plan_xml: - # self._disable_auto_commit = True - # sql_statement_text = Query.ANALYZE_EXPLAIN_QUERY_TEMPLATE.format(sql_statement_text) - - # # Check if user defined transaction - # if formatted_text.lower().startswith('begin'): - # self._disable_auto_commit = True - # self._user_transaction = True - - # batch = create_batch( - # sql_statement_text, - # len(self.batches), - # selection_data[index], - # query_events.batch_events, - # query_execution_settings.result_set_storage_type) - - # self._batches.append(batch) + if bool(self._execution_plan_options): + if self._execution_plan_options.include_estimated_execution_plan_xml: + sql_statement_text = Query.EXPLAIN_QUERY_TEMPLATE.format(sql_statement_text) + elif self._execution_plan_options.include_actual_execution_plan_xml: + self._disable_auto_commit = True + sql_statement_text = Query.ANALYZE_EXPLAIN_QUERY_TEMPLATE.format(sql_statement_text) + + # Check if user defined transaction + if self.query_text.lower().startswith('begin'): + self._disable_auto_commit = True + 
self._user_transaction = True @property def owner_uri(self) -> str: @@ -156,19 +135,13 @@ def execute(self, connection: ServerConnection, retry_state=False): if self._disable_auto_commit and connection.transaction_is_idle: connection.autocommit = False - # for batch_index, batch in enumerate(self._batches): - # self._current_batch_index = batch_index - - # if self.is_canceled: - # break - # Start a cursor block batch_events: BatchEvents = None if self.query_events is not None and self.query_events.batch_events is not None: batch_events = self.query_events.batch_events + + connection.connection.add_notice_handler(lambda msg: self.notice_handler(msg, connection)) with connection.cursor() as cur: - connection.connection.add_notice_handler(lambda msg: self.notice_handler(msg, connection)) - batch_ordinal = 0 start_time = datetime.now() try: @@ -183,44 +156,34 @@ def execute(self, connection: ServerConnection, retry_state=False): raise e curr_resultset = True - while curr_resultset: - batch_obj = create_batch( - self.query_text, - batch_ordinal, - self.selection_data, - self.query_events.batch_events, - self.query_execution_settings.result_set_storage_type - ) + while curr_resultset and len(self.batches) <= constants.MAX_BATCH_RESULT_MESSAGES: # Break if canceled if self.is_canceled: break - # Only set end execution time to first batch summary as we cannot collect individual statement execution times - batch_obj._execution_start_time = start_time - if batch_ordinal == 0: - batch_obj._execution_end_time = end_time - - # Call start callback - if batch_events and batch_events._on_execution_started: - batch_events._on_execution_started(batch_obj) + # Create and append a new batch object + batch_obj = self.create_next_batch(self.current_batch_index, (start_time, end_time), batch_events) + # Create the result set if necessary and set to _has_executed batch_obj.after_execute(cur) - batch_obj._has_executed = True - # Call Completed callback - if batch_events and 
batch_events._on_execution_completed: - batch_events._on_execution_completed(batch_obj) - if cur and cur.statusmessage is not None: batch_obj.status_message = cur.statusmessage - # Append the batch object and update while loop values - self.batches.append(batch_obj) - self._current_batch_index = batch_ordinal + # Update while loop values curr_resultset = cur.nextset() - batch_ordinal += 1 - + self._current_batch_index += 1 + + # Call Completed callback + if batch_events and batch_events._on_execution_completed: + if not curr_resultset or len(self.batches) >= constants.MAX_BATCH_RESULT_MESSAGES: + batch_obj._notices = self._notices + batch_obj.notices.append(f"WARNING: This query has reached the max limit of {constants.MAX_BATCH_RESULT_MESSAGES} results. The rest of the query has been executed, but further results will not be shown") + batch_events._on_execution_completed(batch_obj) + break + else: + batch_events._on_execution_completed(batch_obj) finally: # We can only set autocommit when the connection is open.
@@ -230,33 +193,41 @@ def execute(self, connection: ServerConnection, retry_state=False): self._disable_auto_commit = False self._execution_state = ExecutionState.EXECUTED - def handle_database_error_during_execute(self, conn: ServerConnection, execution_times: Tuple[datetime, datetime], batch_events: BatchEvents): + def create_next_batch(self, ordinal: int, execution_times: Tuple[datetime, datetime], batch_events: BatchEvents, empty_selection_data = False): start_time, end_time = execution_times batch_obj = create_batch( self.query_text, - 0, + self.current_batch_index, self.selection_data, self.query_events.batch_events, self.query_execution_settings.result_set_storage_type ) + self.batches.append(batch_obj) + + # Only set end execution time to first batch summary as we cannot collect individual statement execution times batch_obj._execution_start_time = start_time - batch_obj._execution_end_time = end_time + if self.current_batch_index == 0: + batch_obj._execution_end_time = end_time # Call start callback if batch_events and batch_events._on_execution_started: batch_events._on_execution_started(batch_obj) + return batch_obj + + def handle_database_error_during_execute(self, conn: ServerConnection, execution_times: Tuple[datetime, datetime], batch_events: BatchEvents): + batch_obj = self.create_next_batch(0, execution_times, batch_events) batch_obj._has_error = True self.batches.append(batch_obj) self._current_batch_index = 0 conn.set_transaction_in_error() - def notice_handler(self, notice: str, conn: ServerConnection): - if self.batches and len(self.batches) > 0: + def notice_handler(self, notice: psycopg.errors.Diagnostic, conn: ServerConnection): + # Add notices to last batch element if not conn.user_transaction: - self.batches[self.current_batch_index]._notices.append('{0}: {1}'.format(notice.severity, notice.message_primary)) + self._notices.append('{0}: {1}'.format(notice.severity, notice.message_primary)) elif not notice.message_primary == 'there is 
already a transaction in progress': - self.batches[self.current_batch_index].append('WARNING: {0}'.format(notice.message_primary)) + self._notices.append('WARNING: {0}'.format(notice.message_primary)) def get_subset(self, batch_index: int, start_index: int, end_index: int): if batch_index < 0 or batch_index >= len(self._batches): diff --git a/ossdbtoolsservice/query_execution/query_execution_service.py b/ossdbtoolsservice/query_execution/query_execution_service.py index 019c586f5..6fa353847 100644 --- a/ossdbtoolsservice/query_execution/query_execution_service.py +++ b/ossdbtoolsservice/query_execution/query_execution_service.py @@ -9,6 +9,7 @@ from typing import Callable, Dict, List # noqa import sqlparse import ntpath +from utils import constants from ossdbtoolsservice.hosting import RequestContext, ServiceProvider @@ -238,12 +239,6 @@ def _batch_execution_started_callback(batch: Batch) -> None: def _batch_execution_finished_callback(batch: Batch) -> None: # Send back notices as a separate message to avoid error coloring / highlighting of text - notices = batch.notices - if notices: - notice_messages = '\n'.join(notices) - notice_message_params = self.build_message_params(worker_args.owner_uri, batch.id, notice_messages, False) - _check_and_fire(worker_args.on_message_notification, notice_message_params) - batch_summary = batch.batch_summary # send query/resultSetComplete response @@ -255,6 +250,12 @@ def _batch_execution_finished_callback(batch: Batch) -> None: rows_message = _create_rows_affected_message(batch) message_params = self.build_message_params(worker_args.owner_uri, batch.id, rows_message, False) _check_and_fire(worker_args.on_message_notification, message_params) + + notices = batch.notices + if notices: + notice_messages = '\n'.join(notices) + notice_message_params = self.build_message_params(worker_args.owner_uri, batch.id, notice_messages, False) + _check_and_fire(worker_args.on_message_notification, notice_message_params) # send 
query/batchComplete and query/complete response batch_event_params = BatchNotificationParams(batch_summary, worker_args.owner_uri) @@ -368,7 +369,7 @@ def _execute_query_request_worker(self, worker_args: ExecuteRequestWorkerArgs, r self._resolve_query_exception(e, query, worker_args) finally: # Send a query complete notification - batch_summaries = [batch.batch_summary for batch in query.batches] + batch_summaries = [batch.batch_summary for batch in query.batches[:constants.MAX_BATCH_RESULT_MESSAGES]] query_complete_params = QueryCompleteNotificationParams(worker_args.owner_uri, batch_summaries) _check_and_fire(worker_args.on_query_complete, query_complete_params) diff --git a/ossdbtoolsservice/utils/constants.py b/ossdbtoolsservice/utils/constants.py index 054fd0352..8a16ed229 100644 --- a/ossdbtoolsservice/utils/constants.py +++ b/ossdbtoolsservice/utils/constants.py @@ -14,6 +14,10 @@ COSMOS_PG_DEFAULT_DB = "COSMOSPGSQL" PG_DEFAULT_DB = PG_PROVIDER_NAME +# Max number of results that will be shown from a query's batch results. Will show only first 100 result messages. +# TODO: Show LAST 100 result messages +MAX_BATCH_RESULT_MESSAGES = 100 + DEFAULT_DB = { PG_DEFAULT_DB: "postgres", COSMOS_PG_DEFAULT_DB: "citus"