From 0fad71ac023ba1400bd18abfa0fa446fef03ad86 Mon Sep 17 00:00:00 2001 From: Justin Clift Date: Wed, 21 Jun 2023 13:56:21 +1000 Subject: [PATCH 1/4] Copy the contents of requirements_all_ds.txt into requirements.txt Also update httplib2 to version 0.15.0, as 0.14.0 is apparently too old. When trying this out from the command line on my local Ubuntu 20.04 system, pip installs most of them but throws these errors: ERROR: boto3 1.14.63 has requirement botocore<1.18.0,>=1.17.63, but you'll have botocore 1.13.50 which is incompatible. ERROR: dql 0.6.2 has requirement botocore>=1.17.55, but you'll have botocore 1.13.50 which is incompatible. ERROR: dql 0.6.2 has requirement pyparsing==2.1.4, but you'll have pyparsing 2.3.0 which is incompatible. ERROR: snowflake-connector-python 3.0.2 has requirement charset-normalizer<3,>=2, but you'll have charset-normalizer 3.1.0 which is incompatible. ERROR: snowflake-connector-python 3.0.2 has requirement cryptography<41.0.0,>=3.1.0, but you'll have cryptography 2.8 which is incompatible. ERROR: firebolt-sdk 0.16.2 has requirement cryptography>=3.4.0, but you'll have cryptography 2.8 which is incompatible. ERROR: firebolt-sdk 0.16.2 has requirement python-dateutil>=2.8.2, but you'll have python-dateutil 2.8.0 which is incompatible. ERROR: firebolt-sdk 0.16.2 has requirement sqlparse>=0.4.2, but you'll have sqlparse 0.3.0 which is incompatible. ERROR: pinotdb 0.5.0 has requirement httpx<0.24.0,>=0.23.0, but you'll have httpx 0.24.0 which is incompatible. We may need to drop those data sources in the short term to get things to a fully working state, then come back to them at some later point. --- requirements.txt | 56 +++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 55 insertions(+), 1 deletion(-) diff --git a/requirements.txt b/requirements.txt index db68533766..aaeb74c8eb 100644 --- a/requirements.txt +++ b/requirements.txt @@ -4,7 +4,7 @@ itsdangerous==1.1.0 click==6.7 MarkupSafe==1.1.1 pyOpenSSL==19.0.0 -httplib2==0.14.0 +httplib2==0.15.0 wtforms==2.2.1 Flask-RESTful==0.3.7 Flask-Login==0.4.1 @@ -64,3 +64,57 @@ werkzeug==0.16.1 # ldap3==2.2.4 Authlib==0.15.5 advocate==1.0.0 + +# Copied from requirements_all_ds.txt, as a test to see if it fixes the "missing data source libraries" +# problem in our initial preview build +google-api-python-client==1.7.11 +protobuf==3.18.3 +gspread==3.1.0 +impyla==0.16.0 +influxdb==5.2.3 +mysqlclient==2.1.1 +oauth2client==4.1.3 +pyhive==0.6.1 +pymongo[tls,srv]==4.3.3 +vertica-python==0.9.5 +td-client==1.0.0 +pymssql==2.1.5 +dql==0.6.2 +dynamo3==1.0.0 +boto3>=1.14.0,<1.15.0 +botocore>=1.13,<=1.17.55 +sasl>=0.1.3 +thrift>=0.8.0 +thrift_sasl>=0.1.0 +cassandra-driver==3.21.0 +memsql==3.2.0 +atsd_client==3.0.5 +simple_salesforce==0.74.3 +PyAthena>=1.5.0,<=1.11.5 +#pymapd==0.19.0 +qds-sdk>=1.9.6 +# ibm-db>=2.0.9 +pydruid==0.5.7 +requests_aws_sign==0.1.5 +snowflake-connector-python==3.0.2 +phoenixdb==0.7 +# certifi is needed to support MongoDB and SSL: +certifi>=2019.9.11 +pydgraph==2.0.2 +azure-kusto-data==0.0.35 +pyexasol==0.12.0 +python-rapidjson==0.8.0 +pyodbc==4.0.28 +trino~=0.305 +cmem-cmempy==21.2.3 +xlrd==2.0.1 +openpyxl==3.0.7 +firebolt-sdk +databend-sqlalchemy==0.2.4 +pandas==1.3.4 +nzpy>=1.15 +nzalchemy +python-arango==6.1.0 +pinotdb>=0.4.5 +pyarrow==10.0.0 + From 546a99cbc6193471718d961e13fd7d739b4ebfde Mon Sep 17 00:00:00 2001 From: Justin Clift Date: Wed, 21 Jun 2023 14:23:56 +1000 Subject: [PATCH 2/4] Downgrade dql library, as per #68 --- requirements.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/requirements.txt b/requirements.txt index aaeb74c8eb..76022d0929 100644 --- a/requirements.txt +++ b/requirements.txt @@ -79,7 +79,7 @@ pymongo[tls,srv]==4.3.3 vertica-python==0.9.5 td-client==1.0.0 pymssql==2.1.5 -dql==0.6.2 +dql==0.5.26 dynamo3==1.0.0 boto3>=1.14.0,<1.15.0 botocore>=1.13,<=1.17.55 From 2669886bf1fcd4161f984885c2e6c8e157eba8e2 Mon Sep 17 00:00:00 2001 From: Justin Clift Date: Wed, 21 Jun 2023 14:53:28 +1000 Subject: [PATCH 3/4] Drop httplib2 back down to 0.14.0, to see if that's the problem --- requirements.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/requirements.txt b/requirements.txt index 76022d0929..a123232ab9 100644 --- a/requirements.txt +++ b/requirements.txt @@ -4,7 +4,7 @@ itsdangerous==1.1.0 click==6.7 MarkupSafe==1.1.1 pyOpenSSL==19.0.0 -httplib2==0.15.0 +httplib2==0.14.0 wtforms==2.2.1 Flask-RESTful==0.3.7 Flask-Login==0.4.1 From bd7eef578c5736d95555721b83947803a1539549 Mon Sep 17 00:00:00 2001 From: Justin Clift Date: Wed, 21 Jun 2023 15:19:51 +1000 Subject: [PATCH 4/4] Completely remove some of the libraries which were throwing errors Yep, this is just a temporary thing to see if it works --- redash/__init__.py | 1 - redash/query_runner/amazon_elasticsearch.py | 67 --- redash/query_runner/athena.py | 273 ---------- redash/query_runner/cloudwatch.py | 124 ----- redash/query_runner/cloudwatch_insights.py | 156 ------ redash/query_runner/dynamodb_sql.py | 144 ----- redash/query_runner/firebolt.py | 95 ---- redash/query_runner/pg.py | 556 -------------------- redash/query_runner/pinot.py | 134 ----- redash/query_runner/snowflake.py | 191 ------- redash/settings/__init__.py | 7 - requirements.txt | 8 - requirements_all_ds.txt | 7 - requirements_dev.txt | 4 +- tests/query_runner/test_athena.py | 241 --------- 15 files changed, 1 insertion(+), 2007 deletions(-) delete mode 100644 redash/query_runner/amazon_elasticsearch.py delete mode 100644 redash/query_runner/athena.py delete mode 100644 redash/query_runner/cloudwatch.py delete mode 100644 redash/query_runner/cloudwatch_insights.py delete mode 100644 redash/query_runner/dynamodb_sql.py delete mode 100644 redash/query_runner/firebolt.py delete mode 100644 redash/query_runner/pg.py delete mode 100644 redash/query_runner/pinot.py delete mode 100644 redash/query_runner/snowflake.py delete mode 100644 tests/query_runner/test_athena.py diff --git a/redash/__init__.py b/redash/__init__.py index 86cf335b60..c6301d7cf2 100644 --- a/redash/__init__.py +++ b/redash/__init__.py @@ -36,7 +36,6 @@ def setup_logging(): for name in [ "passlib", "requests.packages.urllib3", - "snowflake.connector", "apiclient", ]: logging.getLogger(name).setLevel("ERROR") diff --git a/redash/query_runner/amazon_elasticsearch.py b/redash/query_runner/amazon_elasticsearch.py deleted file mode 100644 index 529892af89..0000000000 --- a/redash/query_runner/amazon_elasticsearch.py +++ /dev/null @@ -1,67 +0,0 @@ -from .elasticsearch2 import ElasticSearch2 -from . import register - -try: - from requests_aws_sign import AWSV4Sign - from botocore import session, credentials - - enabled = True -except ImportError: - enabled = False - - -class AmazonElasticsearchService(ElasticSearch2): - @classmethod - def name(cls): - return "Amazon Elasticsearch Service" - - @classmethod - def enabled(cls): - return enabled - - @classmethod - def type(cls): - return "aws_es" - - @classmethod - def configuration_schema(cls): - return { - "type": "object", - "properties": { - "server": {"type": "string", "title": "Endpoint"}, - "region": {"type": "string"}, - "access_key": {"type": "string", "title": "Access Key"}, - "secret_key": {"type": "string", "title": "Secret Key"}, - "use_aws_iam_profile": { - "type": "boolean", - "title": "Use AWS IAM Profile", - }, - }, - "secret": ["secret_key"], - "order": [ - "server", - "region", - "access_key", - "secret_key", - "use_aws_iam_profile", - ], - "required": ["server", "region"], - } - - def __init__(self, configuration): - super(AmazonElasticsearchService, self).__init__(configuration) - - region = configuration["region"] - cred = None - if configuration.get("use_aws_iam_profile", False): - cred = credentials.get_credentials(session.Session()) - else: - cred = credentials.Credentials( - access_key=configuration.get("access_key", ""), - secret_key=configuration.get("secret_key", ""), - ) - - self.auth = AWSV4Sign(cred, region, "es") - - -register(AmazonElasticsearchService) diff --git a/redash/query_runner/athena.py b/redash/query_runner/athena.py deleted file mode 100644 index 2fbd962490..0000000000 --- a/redash/query_runner/athena.py +++ /dev/null @@ -1,273 +0,0 @@ -import logging -import os - -from redash.query_runner import * -from redash.settings import parse_boolean -from redash.utils import json_dumps, json_loads - -logger = logging.getLogger(__name__) -ANNOTATE_QUERY = parse_boolean(os.environ.get("ATHENA_ANNOTATE_QUERY", "true")) -SHOW_EXTRA_SETTINGS = parse_boolean( - os.environ.get("ATHENA_SHOW_EXTRA_SETTINGS", "true") -) -ASSUME_ROLE = parse_boolean(os.environ.get("ATHENA_ASSUME_ROLE", "false")) -OPTIONAL_CREDENTIALS = parse_boolean( - os.environ.get("ATHENA_OPTIONAL_CREDENTIALS", "true") -) - -try: - import pyathena - import boto3 - - enabled = True -except ImportError: - enabled = False - - -_TYPE_MAPPINGS = { - "boolean": TYPE_BOOLEAN, - "tinyint": TYPE_INTEGER, - "smallint": TYPE_INTEGER, - "integer": TYPE_INTEGER, - "bigint": TYPE_INTEGER, - "double": TYPE_FLOAT, - "varchar": TYPE_STRING, - "timestamp": TYPE_DATETIME, - "date": TYPE_DATE, - "varbinary": TYPE_STRING, - "array": TYPE_STRING, - "map": TYPE_STRING, - "row": TYPE_STRING, - "decimal": TYPE_FLOAT, -} - - -class SimpleFormatter(object): - def format(self, operation, parameters=None): - return operation - - -class Athena(BaseQueryRunner): - noop_query = "SELECT 1" - - @classmethod - def name(cls): - return "Amazon Athena" - - @classmethod - def configuration_schema(cls): - schema = { - "type": "object", - "properties": { - "region": {"type": "string", "title": "AWS Region"}, - "aws_access_key": {"type": "string", "title": "AWS Access Key"}, - "aws_secret_key": {"type": "string", "title": "AWS Secret Key"}, - "s3_staging_dir": { - "type": "string", - "title": "S3 Staging (Query Results) Bucket Path", - }, - "schema": { - "type": "string", - "title": "Schema Name", - "default": "default", - }, - "glue": {"type": "boolean", "title": "Use Glue Data Catalog"}, - "work_group": { - "type": "string", - "title": "Athena Work Group", - "default": "primary", - }, - "cost_per_tb": { - "type": "number", - "title": "Athena cost per Tb scanned (USD)", - "default": 5, - }, - }, - "required": ["region", "s3_staging_dir"], - "extra_options": ["glue", "cost_per_tb"], - "order": [ - "region", - "s3_staging_dir", - "schema", - "work_group", - "cost_per_tb", - ], - "secret": ["aws_secret_key"], - } - - if SHOW_EXTRA_SETTINGS: - schema["properties"].update( - { - "encryption_option": { - "type": "string", - "title": "Encryption Option", - }, - "kms_key": {"type": "string", "title": "KMS Key"}, - } - ) - schema["extra_options"].append("encryption_option") - schema["extra_options"].append("kms_key") - - if ASSUME_ROLE: - del schema["properties"]["aws_access_key"] - del schema["properties"]["aws_secret_key"] - schema["secret"] = [] - - schema["order"].insert(1, "iam_role") - schema["order"].insert(2, "external_id") - schema["properties"].update( - { - "iam_role": {"type": "string", "title": "IAM role to assume"}, - "external_id": { - "type": "string", - "title": "External ID to be used while STS assume role", - }, - } - ) - else: - schema["order"].insert(1, "aws_access_key") - schema["order"].insert(2, "aws_secret_key") - - if not OPTIONAL_CREDENTIALS and not ASSUME_ROLE: - schema["required"] += ["aws_access_key", "aws_secret_key"] - - return schema - - @classmethod - def enabled(cls): - return enabled - - def annotate_query(self, query, metadata): - if ANNOTATE_QUERY: - return super(Athena, self).annotate_query(query, metadata) - return query - - @classmethod - def type(cls): - return "athena" - - def _get_iam_credentials(self, user=None): - if ASSUME_ROLE: - role_session_name = "redash" if user is None else user.email - sts = boto3.client("sts") - creds = sts.assume_role( - RoleArn=self.configuration.get("iam_role"), - RoleSessionName=role_session_name, - ExternalId=self.configuration.get("external_id"), - ) - return { - "aws_access_key_id": creds["Credentials"]["AccessKeyId"], - "aws_secret_access_key": creds["Credentials"]["SecretAccessKey"], - "aws_session_token": creds["Credentials"]["SessionToken"], - "region_name": self.configuration["region"], - } - else: - return { - "aws_access_key_id": self.configuration.get("aws_access_key", None), - "aws_secret_access_key": self.configuration.get("aws_secret_key", None), - "region_name": self.configuration["region"], - } - - def __get_schema_from_glue(self): - client = boto3.client("glue", **self._get_iam_credentials()) - schema = {} - - database_paginator = client.get_paginator("get_databases") - table_paginator = client.get_paginator("get_tables") - - for databases in database_paginator.paginate(): - for database in databases["DatabaseList"]: - iterator = table_paginator.paginate(DatabaseName=database["Name"]) - for table in iterator.search("TableList[]"): - table_name = "%s.%s" % (database["Name"], table["Name"]) - if 'StorageDescriptor' not in table: - logger.warning("Glue table doesn't have StorageDescriptor: %s", table_name) - continue - if table_name not in schema: - column = [ - columns["Name"] - for columns in table["StorageDescriptor"]["Columns"] - ] - schema[table_name] = {"name": table_name, "columns": column} - for partition in table.get("PartitionKeys", []): - schema[table_name]["columns"].append(partition["Name"]) - return list(schema.values()) - - def get_schema(self, get_stats=False): - if self.configuration.get("glue", False): - return self.__get_schema_from_glue() - - schema = {} - query = """ - SELECT table_schema, table_name, column_name - FROM information_schema.columns - WHERE table_schema NOT IN ('information_schema') - """ - - results, error = self.run_query(query, None) - if error is not None: - self._handle_run_query_error(error) - - results = json_loads(results) - for row in results["rows"]: - table_name = "{0}.{1}".format(row["table_schema"], row["table_name"]) - if table_name not in schema: - schema[table_name] = {"name": table_name, "columns": []} - schema[table_name]["columns"].append(row["column_name"]) - - return list(schema.values()) - - def run_query(self, query, user): - cursor = pyathena.connect( - s3_staging_dir=self.configuration["s3_staging_dir"], - schema_name=self.configuration.get("schema", "default"), - encryption_option=self.configuration.get("encryption_option", None), - kms_key=self.configuration.get("kms_key", None), - work_group=self.configuration.get("work_group", "primary"), - formatter=SimpleFormatter(), - **self._get_iam_credentials(user=user) - ).cursor() - - try: - cursor.execute(query) - column_tuples = [ - (i[0], _TYPE_MAPPINGS.get(i[1], None)) for i in cursor.description - ] - columns = self.fetch_columns(column_tuples) - rows = [ - dict(zip(([c["name"] for c in columns]), r)) - for i, r in enumerate(cursor.fetchall()) - ] - qbytes = None - athena_query_id = None - try: - qbytes = cursor.data_scanned_in_bytes - except AttributeError as e: - logger.debug("Athena Upstream can't get data_scanned_in_bytes: %s", e) - try: - athena_query_id = cursor.query_id - except AttributeError as e: - logger.debug("Athena Upstream can't get query_id: %s", e) - - price = self.configuration.get("cost_per_tb", 5) - data = { - "columns": columns, - "rows": rows, - "metadata": { - "data_scanned": qbytes, - "athena_query_id": athena_query_id, - "query_cost": price * qbytes * 10e-12, - }, - } - - json_data = json_dumps(data, ignore_nan=True) - error = None - except Exception: - if cursor.query_id: - cursor.cancel() - raise - - return json_data, error - - -register(Athena) diff --git a/redash/query_runner/cloudwatch.py b/redash/query_runner/cloudwatch.py deleted file mode 100644 index c4640a537d..0000000000 --- a/redash/query_runner/cloudwatch.py +++ /dev/null @@ -1,124 +0,0 @@ -import yaml -import datetime - -from redash.query_runner import BaseQueryRunner, register -from redash.utils import json_dumps, parse_human_time - -try: - import boto3 - enabled = True -except ImportError: - enabled = False - -def parse_response(results): - columns = [ - {"name": "id", "type": "string"}, - {"name": "label", "type": "string"}, - {"name": "timestamp", "type": "datetime"}, - {"name": "value", "type": "float"}, - ] - - rows = [] - - for metric in results: - for i, value in enumerate(metric["Values"]): - rows.append( - { - "id": metric["Id"], - "label": metric["Label"], - "timestamp": metric["Timestamps"][i], - "value": value, - } - ) - - return rows, columns - - -def parse_query(query): - query = yaml.safe_load(query) - - for timeKey in ["StartTime", "EndTime"]: - if isinstance(query.get(timeKey), str): - query[timeKey] = int(parse_human_time(query[timeKey]).timestamp()) - if not query.get("EndTime"): - query["EndTime"] = int(datetime.datetime.now().timestamp()) - - return query - - -class CloudWatch(BaseQueryRunner): - should_annotate_query = False - - @classmethod - def name(cls): - return "Amazon CloudWatch" - - @classmethod - def configuration_schema(cls): - return { - "type": "object", - "properties": { - "region": {"type": "string", "title": "AWS Region"}, - "aws_access_key": {"type": "string", "title": "AWS Access Key"}, - "aws_secret_key": {"type": "string", "title": "AWS Secret Key"}, - }, - "required": ["region", "aws_access_key", "aws_secret_key"], - "order": ["region", "aws_access_key", "aws_secret_key"], - "secret": ["aws_secret_key"], - } - - @classmethod - def enabled(cls): - return enabled - - def __init__(self, configuration): - super(CloudWatch, self).__init__(configuration) - self.syntax = "yaml" - - def test_connection(self): - self.get_schema() - - def _get_client(self): - cloudwatch = boto3.client( - "cloudwatch", - region_name=self.configuration.get("region"), - aws_access_key_id=self.configuration.get("aws_access_key"), - aws_secret_access_key=self.configuration.get("aws_secret_key"), - ) - return cloudwatch - - def get_schema(self, get_stats=False): - client = self._get_client() - - paginator = client.get_paginator("list_metrics") - - metrics = {} - for page in paginator.paginate(): - for metric in page["Metrics"]: - if metric["Namespace"] not in metrics: - metrics[metric["Namespace"]] = { - "name": metric["Namespace"], - "columns": [], - } - - if metric["MetricName"] not in metrics[metric["Namespace"]]["columns"]: - metrics[metric["Namespace"]]["columns"].append(metric["MetricName"]) - - return list(metrics.values()) - - def run_query(self, query, user): - cloudwatch = self._get_client() - - query = parse_query(query) - - results = [] - paginator = cloudwatch.get_paginator("get_metric_data") - for page in paginator.paginate(**query): - results += page["MetricDataResults"] - - rows, columns = parse_response(results) - - return json_dumps({"rows": rows, "columns": columns}), None - - -register(CloudWatch) diff --git a/redash/query_runner/cloudwatch_insights.py b/redash/query_runner/cloudwatch_insights.py deleted file mode 100644 index 139d5b678a..0000000000 --- a/redash/query_runner/cloudwatch_insights.py +++ /dev/null @@ -1,156 +0,0 @@ -import yaml -import datetime -import time - -from redash.query_runner import BaseQueryRunner, register -from redash.utils import json_dumps, parse_human_time - -try: - import boto3 - from botocore.exceptions import ParamValidationError - enabled = True -except ImportError: - enabled = False - -POLL_INTERVAL = 3 -TIMEOUT = 180 - - -def parse_response(response): - results = response["results"] - rows = [] - field_orders = {} - - for row in results: - record = {} - rows.append(record) - - for order, col in enumerate(row): - if col["field"] == "@ptr": - continue - field = col["field"] - record[field] = col["value"] - field_orders[field] = max(field_orders.get(field, -1), order) - - fields = sorted(field_orders, key=lambda f: field_orders[f]) - cols = [ - { - "name": f, - "type": "datetime" if f == "@timestamp" else "string", - "friendly_name": f, - } - for f in fields - ] - return { - "columns": cols, - "rows": rows, - "metadata": {"data_scanned": response["statistics"]["bytesScanned"]}, - } - - -def parse_query(query): - query = yaml.safe_load(query) - - for timeKey in ["startTime", "endTime"]: - if isinstance(query.get(timeKey), str): - query[timeKey] = int(parse_human_time(query[timeKey]).timestamp()) - if not query.get("endTime"): - query["endTime"] = int(datetime.datetime.now().timestamp()) - - return query - - -class CloudWatchInsights(BaseQueryRunner): - should_annotate_query = False - - @classmethod - def name(cls): - return "Amazon CloudWatch Logs Insights" - - @classmethod - def type(cls): - return "cloudwatch_insights" - - @classmethod - def configuration_schema(cls): - return { - "type": "object", - "properties": { - "region": {"type": "string", "title": "AWS Region"}, - "aws_access_key": {"type": "string", "title": "AWS Access Key"}, - "aws_secret_key": {"type": "string", "title": "AWS Secret Key"}, - }, - "required": ["region", "aws_access_key", "aws_secret_key"], - "order": ["region", "aws_access_key", "aws_secret_key"], - "secret": ["aws_secret_key"], - } - - @classmethod - def enabled(cls): - return enabled - - def __init__(self, configuration): - super(CloudWatchInsights, self).__init__(configuration) - self.syntax = "yaml" - - def test_connection(self): - self.get_schema() - - def _get_client(self): - cloudwatch = boto3.client( - "logs", - region_name=self.configuration.get("region"), - aws_access_key_id=self.configuration.get("aws_access_key"), - aws_secret_access_key=self.configuration.get("aws_secret_key"), - ) - return cloudwatch - - def get_schema(self, get_stats=False): - client = self._get_client() - - log_groups = [] - paginator = client.get_paginator("describe_log_groups") - - for page in paginator.paginate(): - for group in page["logGroups"]: - group_name = group["logGroupName"] - fields = client.get_log_group_fields(logGroupName=group_name) - log_groups.append( - { - "name": group_name, - "columns": [ - field["name"] for field in fields["logGroupFields"] - ], - } - ) - - return log_groups - - def run_query(self, query, user): - logs = self._get_client() - - query = parse_query(query) - query_id = logs.start_query(**query)["queryId"] - - elapsed = 0 - while True: - result = logs.get_query_results(queryId=query_id) - if result["status"] == "Complete": - data = parse_response(result) - break - if result["status"] in ("Failed", "Timeout", "Unknown", "Cancelled"): - raise Exception( - "CloudWatch Insights Query Execution Status: {}".format( - result["status"] - ) - ) - elif elapsed > TIMEOUT: - raise Exception("Request exceeded timeout.") - else: - time.sleep(POLL_INTERVAL) - elapsed += POLL_INTERVAL - - return json_dumps(data), None - - -register(CloudWatchInsights) diff --git a/redash/query_runner/dynamodb_sql.py b/redash/query_runner/dynamodb_sql.py deleted file mode 100644 index 965cc8fc2f..0000000000 --- a/redash/query_runner/dynamodb_sql.py +++ /dev/null @@ -1,144 +0,0 @@ -import logging -import sys - -from redash.query_runner import * -from redash.utils import json_dumps - -logger = logging.getLogger(__name__) - -try: - from dql import Engine, FragmentEngine - from dynamo3 import DynamoDBError - from pyparsing import ParseException - - enabled = True -except ImportError as e: - enabled = False - -types_map = { - "UNICODE": TYPE_INTEGER, - "TINYINT": TYPE_INTEGER, - "SMALLINT": TYPE_INTEGER, - "INT": TYPE_INTEGER, - "DOUBLE": TYPE_FLOAT, - "DECIMAL": TYPE_FLOAT, - "FLOAT": TYPE_FLOAT, - "REAL": TYPE_FLOAT, - "BOOLEAN": TYPE_BOOLEAN, - "TIMESTAMP": TYPE_DATETIME, - "DATE": TYPE_DATETIME, - "CHAR": TYPE_STRING, - "STRING": TYPE_STRING, - "VARCHAR": TYPE_STRING, -} - - -class DynamoDBSQL(BaseSQLQueryRunner): - should_annotate_query = False - - @classmethod - def configuration_schema(cls): - return { - "type": "object", - "properties": { - "region": {"type": "string", "default": "us-east-1"}, - "access_key": {"type": "string"}, - "secret_key": {"type": "string"}, - }, - "required": ["access_key", "secret_key"], - "secret": ["secret_key"], - } - - def test_connection(self): - engine = self._connect() - list(engine.connection.list_tables()) - - @classmethod - def type(cls): - return "dynamodb_sql" - - @classmethod - def name(cls): - return "DynamoDB (with DQL)" - - def _connect(self): - engine = FragmentEngine() - config = self.configuration.to_dict() - - if not config.get("region"): - config["region"] = "us-east-1" - - if config.get("host") == "": - config["host"] = None - - engine.connect(**config) - - return engine - - def _get_tables(self, schema): - engine = self._connect() - - # We can't use describe_all because sometimes a user might give List permission - # for * (all tables), but describe permission only for some of them. - tables = engine.connection.list_tables() - for table_name in tables: - try: - table = engine.describe(table_name, True) - schema[table.name] = { - "name": table.name, - "columns": list(table.attrs.keys()), - } - except DynamoDBError: - pass - - def run_query(self, query, user): - engine = None - try: - engine = self._connect() - - if not query.endswith(";"): - query = query + ";" - - result = engine.execute(query) - - columns = [] - rows = [] - - # When running a count query it returns the value as a string, in which case - # we transform it into a dictionary to be the same as regular queries. - if isinstance(result, str): - # when count < scanned_count, dql returns a string with number of rows scanned - value = result.split(" (")[0] - if value: - value = int(value) - result = [{"value": value}] - - for item in result: - if not columns: - for k, v in item.items(): - columns.append( - { - "name": k, - "friendly_name": k, - "type": types_map.get(str(type(v)).upper(), None), - } - ) - rows.append(item) - - data = {"columns": columns, "rows": rows} - json_data = json_dumps(data) - error = None - except ParseException as e: - error = "Error parsing query at line {} (column {}):\n{}".format( - e.lineno, e.column, e.line - ) - json_data = None - except (KeyboardInterrupt, JobTimeoutException): - if engine and engine.connection: - engine.connection.cancel() - raise - - return json_data, error - - -register(DynamoDBSQL) diff --git a/redash/query_runner/firebolt.py b/redash/query_runner/firebolt.py deleted file mode 100644 index fc892f91c4..0000000000 --- a/redash/query_runner/firebolt.py +++ /dev/null @@ -1,95 +0,0 @@ -try: - from firebolt.db import connect - from firebolt.client import DEFAULT_API_URL - enabled = True -except ImportError: - enabled = False - -from redash.query_runner import BaseQueryRunner, register -from redash.query_runner import TYPE_STRING, TYPE_INTEGER, TYPE_BOOLEAN -from redash.utils import json_dumps, json_loads - -TYPES_MAP = {1: TYPE_STRING, 2: TYPE_INTEGER, 3: TYPE_BOOLEAN} - - -class Firebolt(BaseQueryRunner): - noop_query = "SELECT 1" - - @classmethod - def configuration_schema(cls): - return { - "type": "object", - "properties": { - "api_endpoint": {"type": "string", "default": DEFAULT_API_URL}, - "engine_name": {"type": "string"}, - "DB": {"type": "string"}, - "user": {"type": "string"}, - "password": {"type": "string"}, - }, - "order": ["user", "password", "api_endpoint", "engine_name", "DB"], - "required": ["user", "password", "engine_name", "DB"], - "secret": ["password"], - } - - @classmethod - def enabled(cls): - return enabled - - def run_query(self, query, user): - connection = connect( - api_endpoint=(self.configuration.get("api_endpoint") or DEFAULT_API_URL), - engine_name=(self.configuration.get("engine_name") or None), - username=(self.configuration.get("user") or None), - password=(self.configuration.get("password") or None), - database=(self.configuration.get("DB") or None), - ) - - cursor = connection.cursor() - - try: - cursor.execute(query) - columns = self.fetch_columns( - [(i[0], TYPES_MAP.get(i[1], None)) for i in cursor.description] - ) - rows = [ - dict(zip((column["name"] for column in columns), row)) for row in cursor - ] - - data = {"columns": columns, "rows": rows} - error = None - json_data = json_dumps(data) - finally: - connection.close() - - return json_data, error - - - def get_schema(self, get_stats=False): - query = """ - SELECT TABLE_SCHEMA, - TABLE_NAME, - COLUMN_NAME - FROM INFORMATION_SCHEMA.COLUMNS - WHERE TABLE_SCHEMA <> 'INFORMATION_SCHEMA' - """ - - results, error = self.run_query(query, None) - - if error is not None: - self._handle_run_query_error(error) - - schema = {} - results = json_loads(results) - - for row in results["rows"]: - table_name = "{}.{}".format(row["table_schema"], row["table_name"]) - - if table_name not in schema: - schema[table_name] = {"name": table_name, "columns": []} - - schema[table_name]["columns"].append(row["column_name"]) - - return list(schema.values()) - - -register(Firebolt) diff --git a/redash/query_runner/pg.py b/redash/query_runner/pg.py deleted file mode 100644 index 20b5aed98a..0000000000 --- a/redash/query_runner/pg.py +++ /dev/null @@ -1,556 +0,0 @@ -import os -import logging -import select -from contextlib import contextmanager -from base64 import b64decode -from tempfile import NamedTemporaryFile -from uuid import uuid4 - -import psycopg2 -from psycopg2.extras import Range - -from redash.query_runner import * -from redash.utils import JSONEncoder, json_dumps, json_loads - -logger = logging.getLogger(__name__) - -try: - import boto3 - - IAM_ENABLED = True -except ImportError: - IAM_ENABLED = False - -types_map = { - 20: TYPE_INTEGER, - 21: TYPE_INTEGER, - 23: TYPE_INTEGER, - 700: TYPE_FLOAT, - 1700: TYPE_FLOAT, - 701: TYPE_FLOAT, - 16: TYPE_BOOLEAN, - 1082: TYPE_DATE, - 1114: TYPE_DATETIME, - 1184: TYPE_DATETIME, - 1014: TYPE_STRING, - 1015: TYPE_STRING, - 1008: TYPE_STRING, - 1009: TYPE_STRING, - 2951: TYPE_STRING, -} - - -class PostgreSQLJSONEncoder(JSONEncoder): - def default(self, o): - if isinstance(o, Range): - # From: https://github.com/psycopg/psycopg2/pull/779 - if o._bounds is None: - return "" - - items = [o._bounds[0], str(o._lower), ", ", str(o._upper), o._bounds[1]] - - return "".join(items) - - return super(PostgreSQLJSONEncoder, self).default(o) - - -def _wait(conn, timeout=None): - while 1: - try: - state = conn.poll() - if state == psycopg2.extensions.POLL_OK: - break - elif state == psycopg2.extensions.POLL_WRITE: - select.select([], [conn.fileno()], [], timeout) - elif state == psycopg2.extensions.POLL_READ: - select.select([conn.fileno()], [], [], timeout) - else: - raise psycopg2.OperationalError("poll() returned %s" % state) - except select.error: - raise psycopg2.OperationalError("select.error received") - - -def full_table_name(schema, name): - if "." in name: - name = '"{}"'.format(name) - - return "{}.{}".format(schema, name) - - -def build_schema(query_result, schema): - # By default we omit the public schema name from the table name. But there are - # edge cases, where this might cause conflicts. For example: - # * We have a schema named "main" with table "users". - # * We have a table named "main.users" in the public schema. - # (while this feels unlikely, this actually happened) - # In this case if we omit the schema name for the public table, we will have - # a conflict. - table_names = set( - map( - lambda r: full_table_name(r["table_schema"], r["table_name"]), - query_result["rows"], - ) - ) - - for row in query_result["rows"]: - if row["table_schema"] != "public": - table_name = full_table_name(row["table_schema"], row["table_name"]) - else: - if row["table_name"] in table_names: - table_name = full_table_name(row["table_schema"], row["table_name"]) - else: - table_name = row["table_name"] - - if table_name not in schema: - schema[table_name] = {"name": table_name, "columns": []} - - column = row["column_name"] - if row.get("data_type") is not None: - column = {"name": row["column_name"], "type": row["data_type"]} - - schema[table_name]["columns"].append(column) - - -def _create_cert_file(configuration, key, ssl_config): - file_key = key + "File" - if file_key in configuration: - with NamedTemporaryFile(mode="w", delete=False) as cert_file: - cert_bytes = b64decode(configuration[file_key]) - cert_file.write(cert_bytes.decode("utf-8")) - - ssl_config[key] = cert_file.name - - -def _cleanup_ssl_certs(ssl_config): - for k, v in ssl_config.items(): - if k != "sslmode": - os.remove(v) - - -def _get_ssl_config(configuration): - ssl_config = {"sslmode": configuration.get("sslmode", "prefer")} - - _create_cert_file(configuration, "sslrootcert", ssl_config) - _create_cert_file(configuration, "sslcert", ssl_config) - _create_cert_file(configuration, "sslkey", ssl_config) - - return ssl_config - - -class PostgreSQL(BaseSQLQueryRunner): - noop_query = "SELECT 1" - - @classmethod - def configuration_schema(cls): - return { - "type": "object", - "properties": { - "user": {"type": "string"}, - "password": {"type": "string"}, - "host": {"type": "string", "default": "127.0.0.1"}, - "port": {"type": "number", "default": 5432}, - "dbname": {"type": "string", "title": "Database Name"}, - "sslmode": { - "type": "string", - "title": "SSL Mode", - "default": "prefer", - "extendedEnum": [ - {"value": "disable", "name": "Disable"}, - {"value": "allow", "name": "Allow"}, - {"value": "prefer", "name": "Prefer"}, - {"value": "require", "name": "Require"}, - {"value": "verify-ca", "name": "Verify CA"}, - {"value": "verify-full", "name": "Verify Full"}, - ], - }, - "sslrootcertFile": {"type": "string", "title": "SSL Root Certificate"}, - "sslcertFile": {"type": "string", "title": "SSL Client Certificate"}, - "sslkeyFile": {"type": "string", "title": "SSL Client Key"}, - }, - "order": ["host", "port", "user", "password"], - "required": ["dbname"], - "secret": ["password", "sslrootcertFile", "sslcertFile", "sslkeyFile"], - "extra_options": [ - "sslmode", - "sslrootcertFile", - "sslcertFile", - "sslkeyFile", - ], - } - - @classmethod - def type(cls): - return "pg" - - def _get_definitions(self, schema, query): - results, error = self.run_query(query, None) - - if error is not None: - self._handle_run_query_error(error) - - results = json_loads(results) - - build_schema(results, schema) - - def _get_tables(self, schema): - """ - relkind constants per https://www.postgresql.org/docs/10/static/catalog-pg-class.html - r = regular table - v = view - m = materialized view - f = foreign table - p = partitioned table (new in 10) - --- - i = index - S = sequence - t = TOAST table - c = composite type - """ - - query = """ - SELECT s.nspname as table_schema, - c.relname as table_name, - a.attname as column_name, - null as data_type - FROM pg_class c - JOIN pg_namespace s - ON c.relnamespace = s.oid - AND s.nspname NOT IN ('pg_catalog', 'information_schema') - JOIN pg_attribute a - ON a.attrelid = c.oid - AND a.attnum > 0 - AND NOT a.attisdropped - WHERE c.relkind IN ('m', 'f', 'p') - - UNION - - SELECT table_schema, - table_name, - column_name, - data_type - FROM information_schema.columns - WHERE table_schema NOT IN ('pg_catalog', 'information_schema') - """ - - self._get_definitions(schema, query) - - return list(schema.values()) - - def _get_connection(self): - self.ssl_config = _get_ssl_config(self.configuration) - connection = psycopg2.connect( - user=self.configuration.get("user"), - password=self.configuration.get("password"), - host=self.configuration.get("host"), - port=self.configuration.get("port"), - dbname=self.configuration.get("dbname"), - async_=True, - **self.ssl_config, - ) - - return connection - - def run_query(self, query, user): - connection = self._get_connection() - _wait(connection, timeout=10) - - cursor = connection.cursor() - - try: - cursor.execute(query) - _wait(connection) - - if cursor.description is not None: - columns = self.fetch_columns( - [(i[0], types_map.get(i[1], None)) for i in cursor.description] - ) - rows = [ - dict(zip((column["name"] for column in columns), row)) - for row in cursor - ] - - data = {"columns": columns, "rows": rows} - error = None - json_data = json_dumps(data, ignore_nan=True, cls=PostgreSQLJSONEncoder) - else: - error = "Query completed but it returned no data." - json_data = None - except (select.error, OSError) as e: - error = "Query interrupted. Please retry." - json_data = None - except psycopg2.DatabaseError as e: - error = str(e) - json_data = None - except (KeyboardInterrupt, InterruptException, JobTimeoutException): - connection.cancel() - raise - finally: - connection.close() - _cleanup_ssl_certs(self.ssl_config) - - return json_data, error - - -class Redshift(PostgreSQL): - @classmethod - def type(cls): - return "redshift" - - @classmethod - def name(cls): - return "Redshift" - - def _get_connection(self): - self.ssl_config = {} - - sslrootcert_path = os.path.join( - os.path.dirname(__file__), "./files/redshift-ca-bundle.crt" - ) - - connection = psycopg2.connect( - user=self.configuration.get("user"), - password=self.configuration.get("password"), - host=self.configuration.get("host"), - port=self.configuration.get("port"), - dbname=self.configuration.get("dbname"), - sslmode=self.configuration.get("sslmode", "prefer"), - sslrootcert=sslrootcert_path, - async_=True, - ) - - return connection - - @classmethod - def configuration_schema(cls): - return { - "type": "object", - "properties": { - "user": {"type": "string"}, - "password": {"type": "string"}, - "host": {"type": "string"}, - "port": {"type": "number"}, - "dbname": {"type": "string", "title": "Database Name"}, - "sslmode": {"type": "string", "title": "SSL Mode", "default": "prefer"}, - "adhoc_query_group": { - "type": "string", - "title": "Query Group for Adhoc Queries", - "default": "default", - }, - "scheduled_query_group": { - "type": "string", - "title": "Query Group for Scheduled Queries", - "default": "default", - }, - }, - "order": [ - "host", - "port", - "user", - "password", - "dbname", - "sslmode", - "adhoc_query_group", - "scheduled_query_group", - ], - "required": ["dbname", "user", "password", "host", "port"], - "secret": ["password"], - } - - def annotate_query(self, query, metadata): - annotated = super(Redshift, self).annotate_query(query, metadata) - - if metadata.get("Scheduled", False): - query_group = self.configuration.get("scheduled_query_group") - else: - query_group = self.configuration.get("adhoc_query_group") - - if query_group: - set_query_group = "set query_group to {};".format(query_group) - annotated = "{}\n{}".format(set_query_group, annotated) - - return annotated - - def _get_tables(self, schema): - # Use svv_columns to include internal & external (Spectrum) tables and views data for Redshift - # https://docs.aws.amazon.com/redshift/latest/dg/r_SVV_COLUMNS.html - # Use HAS_SCHEMA_PRIVILEGE(), SVV_EXTERNAL_SCHEMAS and HAS_TABLE_PRIVILEGE() to filter - # out tables the current user cannot access. - # https://docs.aws.amazon.com/redshift/latest/dg/r_HAS_SCHEMA_PRIVILEGE.html - # https://docs.aws.amazon.com/redshift/latest/dg/r_SVV_EXTERNAL_SCHEMAS.html - # https://docs.aws.amazon.com/redshift/latest/dg/r_HAS_TABLE_PRIVILEGE.html - query = """ - WITH tables AS ( - SELECT DISTINCT table_name, - table_schema, - column_name, - ordinal_position AS pos - FROM svv_columns - WHERE table_schema NOT IN ('pg_internal','pg_catalog','information_schema') - AND table_schema NOT LIKE 'pg_temp_%' - ) - SELECT table_name, table_schema, column_name - FROM tables - WHERE - HAS_SCHEMA_PRIVILEGE(table_schema, 'USAGE') AND - ( - table_schema IN (SELECT schemaname FROM SVV_EXTERNAL_SCHEMAS) OR - HAS_TABLE_PRIVILEGE('"' || table_schema || '"."' || table_name || '"', 'SELECT') - ) - ORDER BY table_name, pos - """ - - self._get_definitions(schema, query) - - return list(schema.values()) - - -class RedshiftIAM(Redshift): - @classmethod - def type(cls): - return "redshift_iam" - - @classmethod - def name(cls): - return "Redshift (with IAM User/Role)" - - @classmethod - def enabled(cls): - return IAM_ENABLED - - def _login_method_selection(self): - if self.configuration.get("rolename"): - if not self.configuration.get( - "aws_access_key_id" - ) or not self.configuration.get("aws_secret_access_key"): - return "ASSUME_ROLE_NO_KEYS" - else: - return "ASSUME_ROLE_KEYS" - elif self.configuration.get("aws_access_key_id") and self.configuration.get( - "aws_secret_access_key" - ): - return "KEYS" - elif not self.configuration.get("password"): - return "ROLE" - - @classmethod - def configuration_schema(cls): - return { - "type": "object", - "properties": { - "rolename": {"type": "string", "title": "IAM Role Name"}, - "aws_region": {"type": "string", "title": "AWS Region"}, - "aws_access_key_id": {"type": "string", "title": "AWS Access Key ID"}, - "aws_secret_access_key": { - "type": "string", - "title": "AWS Secret Access Key", - }, - "clusterid": {"type": "string", "title": "Redshift Cluster ID"}, - "user": {"type": "string"}, - "host": {"type": "string"}, - "port": {"type": "number"}, - "dbname": {"type": "string", "title": "Database Name"}, - "sslmode": {"type": "string", "title": "SSL Mode", "default": "prefer"}, - "adhoc_query_group": { - "type": "string", - "title": "Query Group for Adhoc Queries", - "default": "default", - }, - "scheduled_query_group": { - "type": "string", - "title": "Query Group for Scheduled Queries", - "default": "default", - }, - }, - "order": [ - "rolename", - "aws_region", - "aws_access_key_id", - "aws_secret_access_key", - "clusterid", - "host", - "port", - "user", - "dbname", - "sslmode", - "adhoc_query_group", - "scheduled_query_group", - ], - "required": ["dbname", "user", "host", "port", "aws_region"], - "secret": ["aws_secret_access_key"], - } - - def _get_connection(self): - - sslrootcert_path = os.path.join( - os.path.dirname(__file__), "./files/redshift-ca-bundle.crt" - ) - - login_method = self._login_method_selection() - - if login_method == "KEYS": - client = boto3.client( - "redshift", - region_name=self.configuration.get("aws_region"), - aws_access_key_id=self.configuration.get("aws_access_key_id"), - aws_secret_access_key=self.configuration.get("aws_secret_access_key"), - ) - elif login_method == "ROLE": - client = boto3.client( - "redshift", region_name=self.configuration.get("aws_region") - ) - else: - if login_method == "ASSUME_ROLE_KEYS": - assume_client = client = boto3.client( - "sts", - region_name=self.configuration.get("aws_region"), - aws_access_key_id=self.configuration.get("aws_access_key_id"), - aws_secret_access_key=self.configuration.get( - "aws_secret_access_key" - ), - ) - else: - assume_client = client = boto3.client( - "sts", region_name=self.configuration.get("aws_region") - ) - role_session = f"redash_{uuid4().hex}" - session_keys = assume_client.assume_role( - RoleArn=self.configuration.get("rolename"), RoleSessionName=role_session - )["Credentials"] - client = boto3.client( - "redshift", - region_name=self.configuration.get("aws_region"), - aws_access_key_id=session_keys["AccessKeyId"], - aws_secret_access_key=session_keys["SecretAccessKey"], - aws_session_token=session_keys["SessionToken"], - ) - credentials = client.get_cluster_credentials( - DbUser=self.configuration.get("user"), - DbName=self.configuration.get("dbname"), - ClusterIdentifier=self.configuration.get("clusterid"), - ) - db_user = credentials["DbUser"] - db_password = credentials["DbPassword"] - connection = psycopg2.connect( - user=db_user, - password=db_password, - host=self.configuration.get("host"), - port=self.configuration.get("port"), - dbname=self.configuration.get("dbname"), - sslmode=self.configuration.get("sslmode", "prefer"), - sslrootcert=sslrootcert_path, - async_=True, - ) - - return connection - - -class CockroachDB(PostgreSQL): - @classmethod - def type(cls): - return "cockroach" - - -register(PostgreSQL) -register(Redshift) -register(RedshiftIAM) -register(CockroachDB) diff --git a/redash/query_runner/pinot.py b/redash/query_runner/pinot.py deleted file mode 100644 index 57ffe66cbf..0000000000 --- a/redash/query_runner/pinot.py +++ /dev/null @@ -1,134 +0,0 @@ -try: - import pinotdb - enabled = True -except ImportError: - enabled = False - -from redash.query_runner import BaseQueryRunner, register -from redash.query_runner import TYPE_DATETIME, TYPE_FLOAT, TYPE_STRING, TYPE_INTEGER, TYPE_BOOLEAN -from redash.utils import json_dumps - -import requests -from requests.auth import HTTPBasicAuth -import logging - -logger = logging.getLogger(__name__) - -PINOT_TYPES_MAPPING = { - "BOOLEAN": TYPE_BOOLEAN, - "INT": TYPE_INTEGER, - "LONG": TYPE_INTEGER, - "FLOAT": TYPE_FLOAT, - "DOUBLE": TYPE_FLOAT, - "STRING": TYPE_STRING, - "BYTES": TYPE_STRING, - "JSON": TYPE_STRING, - "TIMESTAMP": TYPE_DATETIME, -} - - -class Pinot(BaseQueryRunner): - noop_query = "SELECT 1" - username = None - password = None - - @classmethod - def configuration_schema(cls): - return { - "type": "object", - "properties": { - "brokerHost": {"type": "string", "default": ""}, - "brokerPort": {"type": "number", "default": 8099}, - "brokerScheme": {"type": "string", "default": "http"}, - "controllerURI": {"type": "string", "default": ""}, - "username": {"type": "string"}, - "password": {"type": "string"}, - }, - "order": ["brokerScheme", "brokerHost", "brokerPort", "controllerURI", "username", "password"], - "required": ["brokerHost", "controllerURI"], - "secret": ["password"], - } - - @classmethod - def enabled(cls): - return enabled - - def __init__(self, configuration): - super(Pinot, self).__init__(configuration) - self.controller_uri = self.configuration.get("controllerURI") - self.username=(self.configuration.get("username") or None) - self.password=(self.configuration.get("password") or None) - - def run_query(self, query, user): - logger.debug("Running query %s with username: %s, password: %s", query, self.username, self.password) - connection = pinotdb.connect( - host=self.configuration["brokerHost"], - port=self.configuration["brokerPort"], - path="/query/sql", - scheme=(self.configuration.get("brokerScheme") or "http"), - verify_ssl=False, - username=self.username, - password=self.password, - ) - - cursor = connection.cursor() - - try: - cursor.execute(query) - logger.debug("cursor.schema = %s",cursor.schema) - columns = self.fetch_columns( - [(i["name"], PINOT_TYPES_MAPPING.get(i["type"], None)) for i in cursor.schema] - ) - rows = [ - dict(zip((column["name"] for column in columns), row)) for row in cursor - ] - - data = {"columns": columns, "rows": rows} - error = None - json_data = json_dumps(data) - logger.debug("Pinot execute query [%s]", query) - finally: - connection.close() - - return json_data, error - - def get_schema(self, get_stats=False): - schema = {} - for schema_name in self.get_schema_names(): - for table_name in self.get_table_names(): - schema_table_name = "{}.{}".format(schema_name, table_name) - if table_name not in schema: - schema[schema_table_name] = {"name": schema_table_name, "columns": []} - table_schema =self.get_pinot_table_schema(table_name) - - for column in table_schema.get("dimensionFieldSpecs", []) + table_schema.get( - "metricFieldSpecs", []) + table_schema.get("dateTimeFieldSpecs", []): - c = { - "name": column["name"], - "type": PINOT_TYPES_MAPPING[column["dataType"]], - } - schema[schema_table_name]["columns"].append(c) - return list(schema.values()) - - def get_schema_names(self): - return ["default"] - - def get_pinot_table_schema(self, pinot_table_name): - return self.get_metadata_from_controller("/tables/" + pinot_table_name + "/schema") - - def get_table_names(self): - return self.get_metadata_from_controller("/tables")["tables"] - - def get_metadata_from_controller(self, path): - url = self.controller_uri + path - r = requests.get(url, headers={"Accept": "application/json"}, auth= HTTPBasicAuth(self.username, self.password)) - try: - result = r.json() - logger.debug("get_metadata_from_controller from path %s", path) - except ValueError as e: - raise pinotdb.exceptions.DatabaseError( - f"Got invalid json response from {self.controller_uri}:{path}: {r.text}" - ) from e - return result - -register(Pinot) diff --git a/redash/query_runner/snowflake.py b/redash/query_runner/snowflake.py deleted file mode 100644 index 8f5ff8b3c0..0000000000 --- a/redash/query_runner/snowflake.py +++ /dev/null @@ -1,191 +0,0 @@ -try: - import snowflake.connector - - enabled = True -except ImportError: - enabled = False - - -from redash.query_runner import BaseQueryRunner, register -from redash.query_runner import ( - TYPE_STRING, - TYPE_DATE, - TYPE_DATETIME, - TYPE_INTEGER, - TYPE_FLOAT, - TYPE_BOOLEAN, -) -from redash.utils import json_dumps, json_loads - -TYPES_MAP = { - 0: TYPE_INTEGER, - 1: TYPE_FLOAT, - 2: TYPE_STRING, - 3: TYPE_DATE, - 4: TYPE_DATETIME, - 5: TYPE_STRING, - 6: TYPE_DATETIME, - 7: TYPE_DATETIME, - 8: TYPE_DATETIME, - 13: TYPE_BOOLEAN, -} - - -class Snowflake(BaseQueryRunner): - noop_query = "SELECT 1" - - @classmethod - def configuration_schema(cls): - return { - "type": "object", - "properties": { - "account": {"type": "string"}, - "user": {"type": "string"}, - "password": {"type": "string"}, - "warehouse": {"type": "string"}, - "database": {"type": "string"}, - "region": {"type": "string", "default": "us-west"}, - "lower_case_columns": { - "type": "boolean", - "title": "Lower Case Column Names in Results", - "default": False, - }, - "host": {"type": "string"}, - }, - "order": [ - "account", - "user", - "password", - "warehouse", - "database", - "region", - "host", - ], - "required": ["user", "password", "account", "database", "warehouse"], - "secret": ["password"], - "extra_options": [ - "host", - ], - } - - @classmethod - def enabled(cls): - return enabled - - @classmethod - def determine_type(cls, data_type, scale): - t = TYPES_MAP.get(data_type, None) - if t == TYPE_INTEGER and scale > 0: - return TYPE_FLOAT - return t - - def _get_connection(self): - region = self.configuration.get("region") - account = self.configuration["account"] - - # for us-west we don't need to pass a region (and if we do, it fails to connect) - if region == "us-west": - region = None - - if self.configuration.__contains__("host"): - host = self.configuration.get("host") - else: - if region: - host = "{}.{}.snowflakecomputing.com".format(account, region) - else: - host = "{}.snowflakecomputing.com".format(account) - - connection = snowflake.connector.connect( - user=self.configuration["user"], - password=self.configuration["password"], - account=account, - region=region, - host=host, - ) - - return connection - - def _column_name(self, column_name): - if self.configuration.get("lower_case_columns", False): - return column_name.lower() - - return column_name - - def _parse_results(self, cursor): - columns = self.fetch_columns( - [ - (self._column_name(i[0]), self.determine_type(i[1], i[5])) - for i in cursor.description - ] - ) - rows = [ - dict(zip((column["name"] for column in columns), row)) for row in cursor - ] - - data = {"columns": columns, "rows": rows} - return data - - def run_query(self, query, user): - connection = self._get_connection() - cursor = connection.cursor() - - try: - cursor.execute("USE WAREHOUSE {}".format(self.configuration["warehouse"])) - cursor.execute("USE {}".format(self.configuration["database"])) - - cursor.execute(query) - - data = self._parse_results(cursor) - error = None - json_data = json_dumps(data) - finally: - cursor.close() - connection.close() - - return json_data, error - - def _run_query_without_warehouse(self, query): - connection = self._get_connection() - cursor = connection.cursor() - - try: - cursor.execute("USE {}".format(self.configuration["database"])) - - cursor.execute(query) - - data = self._parse_results(cursor) - error = None - finally: - cursor.close() - connection.close() - - return data, error - - def _database_name_includes_schema(self): - return "." in self.configuration.get("database") - - def get_schema(self, get_stats=False): - if self._database_name_includes_schema(): - query = "SHOW COLUMNS" - else: - query = "SHOW COLUMNS IN DATABASE" - - results, error = self._run_query_without_warehouse(query) - - if error is not None: - self._handle_run_query_error(error) - - schema = {} - for row in results["rows"]: - if row["kind"] == "COLUMN": - table_name = "{}.{}".format(row["schema_name"], row["table_name"]) - - if table_name not in schema: - schema[table_name] = {"name": table_name, "columns": []} - - schema[table_name]["columns"].append(row["column_name"]) - - return list(schema.values()) - - -register(Snowflake) diff --git a/redash/settings/__init__.py b/redash/settings/__init__.py index 38b6e7d0ee..a307b3c623 100644 --- a/redash/settings/__init__.py +++ b/redash/settings/__init__.py @@ -349,7 +349,6 @@ def email_server_is_configured(): "redash.query_runner.mongodb", "redash.query_runner.couchbase", "redash.query_runner.mysql", - "redash.query_runner.pg", "redash.query_runner.url", "redash.query_runner.influx_db", "redash.query_runner.elasticsearch", @@ -357,7 +356,6 @@ def email_server_is_configured(): "redash.query_runner.amazon_elasticsearch", "redash.query_runner.trino", "redash.query_runner.presto", - "redash.query_runner.pinot", "redash.query_runner.databricks", "redash.query_runner.hive_ds", "redash.query_runner.impala_ds", @@ -367,7 +365,6 @@ def email_server_is_configured(): "redash.query_runner.rockset", "redash.query_runner.treasuredata", "redash.query_runner.sqlite", - "redash.query_runner.dynamodb_sql", "redash.query_runner.mssql", "redash.query_runner.mssql_odbc", "redash.query_runner.memsql_ds", @@ -384,20 +381,16 @@ def email_server_is_configured(): "redash.query_runner.kylin", "redash.query_runner.drill", "redash.query_runner.uptycs", - "redash.query_runner.snowflake", "redash.query_runner.phoenix", "redash.query_runner.json_ds", "redash.query_runner.cass", "redash.query_runner.dgraph", "redash.query_runner.azure_kusto", "redash.query_runner.exasol", - "redash.query_runner.cloudwatch", - "redash.query_runner.cloudwatch_insights", "redash.query_runner.corporate_memory", "redash.query_runner.sparql_endpoint", "redash.query_runner.excel", "redash.query_runner.csv", - "redash.query_runner.firebolt", "redash.query_runner.databend", "redash.query_runner.nz", "redash.query_runner.arango" diff --git a/requirements.txt b/requirements.txt index a123232ab9..642d15a877 100644 --- a/requirements.txt +++ b/requirements.txt @@ -79,10 +79,6 @@ pymongo[tls,srv]==4.3.3 vertica-python==0.9.5 td-client==1.0.0 pymssql==2.1.5 -dql==0.5.26 -dynamo3==1.0.0 -boto3>=1.14.0,<1.15.0 -botocore>=1.13,<=1.17.55 sasl>=0.1.3 thrift>=0.8.0 thrift_sasl>=0.1.0 @@ -96,7 +92,6 @@ qds-sdk>=1.9.6 # ibm-db>=2.0.9 pydruid==0.5.7 requests_aws_sign==0.1.5 -snowflake-connector-python==3.0.2 phoenixdb==0.7 # certifi is needed to support MongoDB and SSL: certifi>=2019.9.11 @@ -109,12 +104,9 @@ trino~=0.305 cmem-cmempy==21.2.3 xlrd==2.0.1 openpyxl==3.0.7 -firebolt-sdk databend-sqlalchemy==0.2.4 pandas==1.3.4 nzpy>=1.15 nzalchemy python-arango==6.1.0 -pinotdb>=0.4.5 pyarrow==10.0.0 - diff --git a/requirements_all_ds.txt b/requirements_all_ds.txt index a7cf69c09f..559c414b29 100644 --- a/requirements_all_ds.txt +++ b/requirements_all_ds.txt @@ -10,10 +10,6 @@ pymongo[tls,srv]==4.3.3 vertica-python==0.9.5 td-client==1.0.0 pymssql==2.1.5 -dql==0.6.2 -dynamo3==1.0.0 -boto3>=1.14.0,<1.15.0 -botocore>=1.13,<=1.17.55 sasl>=0.1.3 thrift>=0.8.0 thrift_sasl>=0.1.0 @@ -27,7 +23,6 @@ qds-sdk>=1.9.6 # ibm-db>=2.0.9 pydruid==0.5.7 requests_aws_sign==0.1.5 -snowflake-connector-python==3.0.2 phoenixdb==0.7 # certifi is needed to support MongoDB and SSL: certifi>=2019.9.11 @@ -40,11 +35,9 @@ trino~=0.305 cmem-cmempy==21.2.3 xlrd==2.0.1 openpyxl==3.0.7 -firebolt-sdk databend-sqlalchemy==0.2.4 pandas==1.3.4 nzpy>=1.15 nzalchemy python-arango==6.1.0 -pinotdb>=0.4.5 pyarrow==10.0.0 diff --git a/requirements_dev.txt b/requirements_dev.txt index 6d0647666b..c3aa14387b 100644 --- a/requirements_dev.txt +++ b/requirements_dev.txt @@ -5,9 +5,7 @@ mock==3.0.5 # PyMongo and Athena dependencies are needed for some of the unit tests: # (this is not perfect and we should resolve this in a different way) -pymongo[srv,tls]==3.9.0 -boto3>=1.14.0,<1.15.0 -botocore>=1.13,<=1.17.55 +pymongo[srv,tls]==4.3.3 PyAthena>=1.5.0,<=1.11.5 ptvsd==4.3.2 freezegun==0.3.12 diff --git a/tests/query_runner/test_athena.py b/tests/query_runner/test_athena.py deleted file mode 100644 index 24552a3fc8..0000000000 --- a/tests/query_runner/test_athena.py +++ /dev/null @@ -1,241 +0,0 @@ -""" -Some test cases around the Glue catalog. -""" -from unittest import TestCase - -import botocore -import mock -from botocore.stub import Stubber - -from redash.query_runner.athena import Athena - - -class TestGlueSchema(TestCase): - def setUp(self): - - client = botocore.session.get_session().create_client( - "glue", - region_name="mars-east-1", - aws_access_key_id="foo", - aws_secret_access_key="bar", - ) - self.stubber = Stubber(client) - - self.patcher = mock.patch("boto3.client") - mocked_client = self.patcher.start() - mocked_client.return_value = client - - def tearDown(self): - self.patcher.stop() - - def test_external_table(self): - """Unpartitioned table crawled through a JDBC connection""" - query_runner = Athena({"glue": True, "region": "mars-east-1"}) - - self.stubber.add_response( - "get_databases", {"DatabaseList": [{"Name": "test1"}]}, {} - ) - self.stubber.add_response( - "get_tables", - { - "TableList": [ - { - "Name": "jdbc_table", - "StorageDescriptor": { - "Columns": [{"Name": "row_id", "Type": "int"}], - "Location": "Database.Schema.Table", - "Compressed": False, - "NumberOfBuckets": -1, - "SerdeInfo": {"Parameters": {}}, - "BucketColumns": [], - "SortColumns": [], - "Parameters": { - "CrawlerSchemaDeserializerVersion": "1.0", - "CrawlerSchemaSerializerVersion": "1.0", - "UPDATED_BY_CRAWLER": "jdbc", - "classification": "sqlserver", - "compressionType": "none", - "connectionName": "jdbctest", - "typeOfData": "view", - }, - "StoredAsSubDirectories": False, - }, - "PartitionKeys": [], - "TableType": "EXTERNAL_TABLE", - "Parameters": { - "CrawlerSchemaDeserializerVersion": "1.0", - "CrawlerSchemaSerializerVersion": "1.0", - "UPDATED_BY_CRAWLER": "jdbc", - "classification": "sqlserver", - "compressionType": "none", - "connectionName": "jdbctest", - "typeOfData": "view", - }, - } - ] - }, - {"DatabaseName": "test1"}, - ) - with self.stubber: - assert query_runner.get_schema() == [ - {"columns": ["row_id"], "name": "test1.jdbc_table"} - ] - - def test_partitioned_table(self): - """ - Partitioned table as created by a GlueContext - """ - - query_runner = Athena({"glue": True, "region": "mars-east-1"}) - - self.stubber.add_response( - "get_databases", {"DatabaseList": [{"Name": "test1"}]}, {} - ) - self.stubber.add_response( - "get_tables", - { - "TableList": [ - { - "Name": "partitioned_table", - "StorageDescriptor": { - "Columns": [{"Name": "sk", "Type": "int"}], - "Location": "s3://bucket/prefix", - "InputFormat": "org.apache.hadoop.mapred.TextInputFormat", - "OutputFormat": "org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat", - "Compressed": False, - "NumberOfBuckets": -1, - "SerdeInfo": { - "SerializationLibrary": "org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe", - "Parameters": {"serialization.format": "1"}, - }, - "BucketColumns": [], - "SortColumns": [], - "Parameters": {}, - "SkewedInfo": { - "SkewedColumnNames": [], - "SkewedColumnValues": [], - "SkewedColumnValueLocationMaps": {}, - }, - "StoredAsSubDirectories": False, - }, - "PartitionKeys": [{"Name": "category", "Type": "int"}], - "TableType": "EXTERNAL_TABLE", - "Parameters": { - "EXTERNAL": "TRUE", - "transient_lastDdlTime": "1537505313", - }, - } - ] - }, - {"DatabaseName": "test1"}, - ) - with self.stubber: - assert query_runner.get_schema() == [ - {"columns": ["sk", "category"], "name": "test1.partitioned_table"} - ] - - def test_view(self): - query_runner = Athena({"glue": True, "region": "mars-east-1"}) - - self.stubber.add_response( - "get_databases", {"DatabaseList": [{"Name": "test1"}]}, {} - ) - self.stubber.add_response( - "get_tables", - { - "TableList": [ - { - "Name": "view", - "StorageDescriptor": { - "Columns": [{"Name": "sk", "Type": "int"}], - "Location": "", - "Compressed": False, - "NumberOfBuckets": 0, - "SerdeInfo": {}, - "SortColumns": [], - "StoredAsSubDirectories": False, - }, - "PartitionKeys": [], - "ViewOriginalText": "/* Presto View: ... */", - "ViewExpandedText": "/* Presto View */", - "TableType": "VIRTUAL_VIEW", - "Parameters": {"comment": "Presto View", "presto_view": "true"}, - } - ] - }, - {"DatabaseName": "test1"}, - ) - with self.stubber: - assert query_runner.get_schema() == [ - {"columns": ["sk"], "name": "test1.view"} - ] - - def test_dodgy_table_does_not_break_schema_listing(self): - """ - For some reason, not all Glue tables contain a "PartitionKeys" entry. - - This may be a Athena Catalog to Glue catalog migration issue. - """ - query_runner = Athena({"glue": True, "region": "mars-east-1"}) - - self.stubber.add_response( - "get_databases", {"DatabaseList": [{"Name": "test1"}]}, {} - ) - self.stubber.add_response( - "get_tables", - { - "TableList": [ - { - "Name": "csv", - "StorageDescriptor": { - "Columns": [{"Name": "region", "Type": "string"}], - "Location": "s3://bucket/files/", - "InputFormat": "org.apache.hadoop.mapred.TextInputFormat", - "Compressed": False, - "NumberOfBuckets": 0, - "SerdeInfo": { - "SerializationLibrary": "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe", - "Parameters": { - "field.delim": "|", - "skip.header.line.count": "1", - }, - }, - "SortColumns": [], - "StoredAsSubDirectories": False, - }, - "Parameters": {"classification": "csv"}, - } - ] - }, - {"DatabaseName": "test1"}, - ) - with self.stubber: - assert query_runner.get_schema() == [ - {"columns": ["region"], "name": "test1.csv"} - ] - - def test_no_storage_descriptor_table(self): - """ - For some reason, not all Glue tables contain a "StorageDescriptor" entry. - """ - query_runner = Athena({'glue': True, 'region': 'mars-east-1'}) - - self.stubber.add_response('get_databases', {'DatabaseList': [{'Name': 'test1'}]}, {}) - self.stubber.add_response( - 'get_tables', - { - 'TableList': [ - { - 'Name': 'no_storage_descriptor_table', - 'PartitionKeys': [], - 'TableType': 'EXTERNAL_TABLE', - 'Parameters': { - 'EXTERNAL': 'TRUE' - }, - } - ] - }, - {'DatabaseName': 'test1'}, - ) - with self.stubber: - assert query_runner.get_schema() == []