diff --git a/redash/__init__.py b/redash/__init__.py index 86cf335b60..c6301d7cf2 100644 --- a/redash/__init__.py +++ b/redash/__init__.py @@ -36,7 +36,6 @@ def setup_logging(): for name in [ "passlib", "requests.packages.urllib3", - "snowflake.connector", "apiclient", ]: logging.getLogger(name).setLevel("ERROR") diff --git a/redash/query_runner/amazon_elasticsearch.py b/redash/query_runner/amazon_elasticsearch.py deleted file mode 100644 index 529892af89..0000000000 --- a/redash/query_runner/amazon_elasticsearch.py +++ /dev/null @@ -1,67 +0,0 @@ -from .elasticsearch2 import ElasticSearch2 -from . import register - -try: - from requests_aws_sign import AWSV4Sign - from botocore import session, credentials - - enabled = True -except ImportError: - enabled = False - - -class AmazonElasticsearchService(ElasticSearch2): - @classmethod - def name(cls): - return "Amazon Elasticsearch Service" - - @classmethod - def enabled(cls): - return enabled - - @classmethod - def type(cls): - return "aws_es" - - @classmethod - def configuration_schema(cls): - return { - "type": "object", - "properties": { - "server": {"type": "string", "title": "Endpoint"}, - "region": {"type": "string"}, - "access_key": {"type": "string", "title": "Access Key"}, - "secret_key": {"type": "string", "title": "Secret Key"}, - "use_aws_iam_profile": { - "type": "boolean", - "title": "Use AWS IAM Profile", - }, - }, - "secret": ["secret_key"], - "order": [ - "server", - "region", - "access_key", - "secret_key", - "use_aws_iam_profile", - ], - "required": ["server", "region"], - } - - def __init__(self, configuration): - super(AmazonElasticsearchService, self).__init__(configuration) - - region = configuration["region"] - cred = None - if configuration.get("use_aws_iam_profile", False): - cred = credentials.get_credentials(session.Session()) - else: - cred = credentials.Credentials( - access_key=configuration.get("access_key", ""), - secret_key=configuration.get("secret_key", ""), - ) - - self.auth = AWSV4Sign(cred, region, "es") - - -register(AmazonElasticsearchService) diff --git a/redash/query_runner/athena.py b/redash/query_runner/athena.py deleted file mode 100644 index 2fbd962490..0000000000 --- a/redash/query_runner/athena.py +++ /dev/null @@ -1,273 +0,0 @@ -import logging -import os - -from redash.query_runner import * -from redash.settings import parse_boolean -from redash.utils import json_dumps, json_loads - -logger = logging.getLogger(__name__) -ANNOTATE_QUERY = parse_boolean(os.environ.get("ATHENA_ANNOTATE_QUERY", "true")) -SHOW_EXTRA_SETTINGS = parse_boolean( - os.environ.get("ATHENA_SHOW_EXTRA_SETTINGS", "true") -) -ASSUME_ROLE = parse_boolean(os.environ.get("ATHENA_ASSUME_ROLE", "false")) -OPTIONAL_CREDENTIALS = parse_boolean( - os.environ.get("ATHENA_OPTIONAL_CREDENTIALS", "true") -) - -try: - import pyathena - import boto3 - - enabled = True -except ImportError: - enabled = False - - -_TYPE_MAPPINGS = { - "boolean": TYPE_BOOLEAN, - "tinyint": TYPE_INTEGER, - "smallint": TYPE_INTEGER, - "integer": TYPE_INTEGER, - "bigint": TYPE_INTEGER, - "double": TYPE_FLOAT, - "varchar": TYPE_STRING, - "timestamp": TYPE_DATETIME, - "date": TYPE_DATE, - "varbinary": TYPE_STRING, - "array": TYPE_STRING, - "map": TYPE_STRING, - "row": TYPE_STRING, - "decimal": TYPE_FLOAT, -} - - -class SimpleFormatter(object): - def format(self, operation, parameters=None): - return operation - - -class Athena(BaseQueryRunner): - noop_query = "SELECT 1" - - @classmethod - def name(cls): - return "Amazon Athena" - - @classmethod - def configuration_schema(cls): - schema = { - "type": "object", - "properties": { - "region": {"type": "string", "title": "AWS Region"}, - "aws_access_key": {"type": "string", "title": "AWS Access Key"}, - "aws_secret_key": {"type": "string", "title": "AWS Secret Key"}, - "s3_staging_dir": { - "type": "string", - "title": "S3 Staging (Query Results) Bucket Path", - }, - "schema": { - "type": "string", - "title": "Schema Name", - "default": "default", - }, - "glue": {"type": "boolean", "title": "Use Glue Data Catalog"}, - "work_group": { - "type": "string", - "title": "Athena Work Group", - "default": "primary", - }, - "cost_per_tb": { - "type": "number", - "title": "Athena cost per Tb scanned (USD)", - "default": 5, - }, - }, - "required": ["region", "s3_staging_dir"], - "extra_options": ["glue", "cost_per_tb"], - "order": [ - "region", - "s3_staging_dir", - "schema", - "work_group", - "cost_per_tb", - ], - "secret": ["aws_secret_key"], - } - - if SHOW_EXTRA_SETTINGS: - schema["properties"].update( - { - "encryption_option": { - "type": "string", - "title": "Encryption Option", - }, - "kms_key": {"type": "string", "title": "KMS Key"}, - } - ) - schema["extra_options"].append("encryption_option") - schema["extra_options"].append("kms_key") - - if ASSUME_ROLE: - del schema["properties"]["aws_access_key"] - del schema["properties"]["aws_secret_key"] - schema["secret"] = [] - - schema["order"].insert(1, "iam_role") - schema["order"].insert(2, "external_id") - schema["properties"].update( - { - "iam_role": {"type": "string", "title": "IAM role to assume"}, - "external_id": { - "type": "string", - "title": "External ID to be used while STS assume role", - }, - } - ) - else: - schema["order"].insert(1, "aws_access_key") - schema["order"].insert(2, "aws_secret_key") - - if not OPTIONAL_CREDENTIALS and not ASSUME_ROLE: - schema["required"] += ["aws_access_key", "aws_secret_key"] - - return schema - - @classmethod - def enabled(cls): - return enabled - - def annotate_query(self, query, metadata): - if ANNOTATE_QUERY: - return super(Athena, self).annotate_query(query, metadata) - return query - - @classmethod - def type(cls): - return "athena" - - def _get_iam_credentials(self, user=None): - if ASSUME_ROLE: - role_session_name = "redash" if user is None else user.email - sts = boto3.client("sts") - creds = sts.assume_role( - RoleArn=self.configuration.get("iam_role"), - RoleSessionName=role_session_name, - ExternalId=self.configuration.get("external_id"), - ) - return { - "aws_access_key_id": creds["Credentials"]["AccessKeyId"], - "aws_secret_access_key": creds["Credentials"]["SecretAccessKey"], - "aws_session_token": creds["Credentials"]["SessionToken"], - "region_name": self.configuration["region"], - } - else: - return { - "aws_access_key_id": self.configuration.get("aws_access_key", None), - "aws_secret_access_key": self.configuration.get("aws_secret_key", None), - "region_name": self.configuration["region"], - } - - def __get_schema_from_glue(self): - client = boto3.client("glue", **self._get_iam_credentials()) - schema = {} - - database_paginator = client.get_paginator("get_databases") - table_paginator = client.get_paginator("get_tables") - - for databases in database_paginator.paginate(): - for database in databases["DatabaseList"]: - iterator = table_paginator.paginate(DatabaseName=database["Name"]) - for table in iterator.search("TableList[]"): - table_name = "%s.%s" % (database["Name"], table["Name"]) - if 'StorageDescriptor' not in table: - logger.warning("Glue table doesn't have StorageDescriptor: %s", table_name) - continue - if table_name not in schema: - column = [ - columns["Name"] - for columns in table["StorageDescriptor"]["Columns"] - ] - schema[table_name] = {"name": table_name, "columns": column} - for partition in table.get("PartitionKeys", []): - schema[table_name]["columns"].append(partition["Name"]) - return list(schema.values()) - - def get_schema(self, get_stats=False): - if self.configuration.get("glue", False): - return self.__get_schema_from_glue() - - schema = {} - query = """ - SELECT table_schema, table_name, column_name - FROM information_schema.columns - WHERE table_schema NOT IN ('information_schema') - """ - - results, error = self.run_query(query, None) - if error is not None: - self._handle_run_query_error(error) - - results = json_loads(results) - for row in results["rows"]: - table_name = "{0}.{1}".format(row["table_schema"], row["table_name"]) - if table_name not in schema: - schema[table_name] = {"name": table_name, "columns": []} - schema[table_name]["columns"].append(row["column_name"]) - - return list(schema.values()) - - def run_query(self, query, user): - cursor = pyathena.connect( - s3_staging_dir=self.configuration["s3_staging_dir"], - schema_name=self.configuration.get("schema", "default"), - encryption_option=self.configuration.get("encryption_option", None), - kms_key=self.configuration.get("kms_key", None), - work_group=self.configuration.get("work_group", "primary"), - formatter=SimpleFormatter(), - **self._get_iam_credentials(user=user) - ).cursor() - - try: - cursor.execute(query) - column_tuples = [ - (i[0], _TYPE_MAPPINGS.get(i[1], None)) for i in cursor.description - ] - columns = self.fetch_columns(column_tuples) - rows = [ - dict(zip(([c["name"] for c in columns]), r)) - for i, r in enumerate(cursor.fetchall()) - ] - qbytes = None - athena_query_id = None - try: - qbytes = cursor.data_scanned_in_bytes - except AttributeError as e: - logger.debug("Athena Upstream can't get data_scanned_in_bytes: %s", e) - try: - athena_query_id = cursor.query_id - except AttributeError as e: - logger.debug("Athena Upstream can't get query_id: %s", e) - - price = self.configuration.get("cost_per_tb", 5) - data = { - "columns": columns, - "rows": rows, - "metadata": { - "data_scanned": qbytes, - "athena_query_id": athena_query_id, - "query_cost": price * qbytes * 10e-12, - }, - } - - json_data = json_dumps(data, ignore_nan=True) - error = None - except Exception: - if cursor.query_id: - cursor.cancel() - raise - - return json_data, error - - -register(Athena) diff --git a/redash/query_runner/cloudwatch.py b/redash/query_runner/cloudwatch.py deleted file mode 100644 index c4640a537d..0000000000 --- a/redash/query_runner/cloudwatch.py +++ /dev/null @@ -1,124 +0,0 @@ -import yaml -import datetime - -from redash.query_runner import BaseQueryRunner, register -from redash.utils import json_dumps, parse_human_time - -try: - import boto3 - enabled = True -except ImportError: - enabled = False - -def parse_response(results): - columns = [ - {"name": "id", "type": "string"}, - {"name": "label", "type": "string"}, - {"name": "timestamp", "type": "datetime"}, - {"name": "value", "type": "float"}, - ] - - rows = [] - - for metric in results: - for i, value in enumerate(metric["Values"]): - rows.append( - { - "id": metric["Id"], - "label": metric["Label"], - "timestamp": metric["Timestamps"][i], - "value": value, - } - ) - - return rows, columns - - -def parse_query(query): - query = yaml.safe_load(query) - - for timeKey in ["StartTime", "EndTime"]: - if isinstance(query.get(timeKey), str): - query[timeKey] = int(parse_human_time(query[timeKey]).timestamp()) - if not query.get("EndTime"): - query["EndTime"] = int(datetime.datetime.now().timestamp()) - - return query - - -class CloudWatch(BaseQueryRunner): - should_annotate_query = False - - @classmethod - def name(cls): - return "Amazon CloudWatch" - - @classmethod - def configuration_schema(cls): - return { - "type": "object", - "properties": { - "region": {"type": "string", "title": "AWS Region"}, - "aws_access_key": {"type": "string", "title": "AWS Access Key"}, - "aws_secret_key": {"type": "string", "title": "AWS Secret Key"}, - }, - "required": ["region", "aws_access_key", "aws_secret_key"], - "order": ["region", "aws_access_key", "aws_secret_key"], - "secret": ["aws_secret_key"], - } - - @classmethod - def enabled(cls): - return enabled - - def __init__(self, configuration): - super(CloudWatch, self).__init__(configuration) - self.syntax = "yaml" - - def test_connection(self): - self.get_schema() - - def _get_client(self): - cloudwatch = boto3.client( - "cloudwatch", - region_name=self.configuration.get("region"), - aws_access_key_id=self.configuration.get("aws_access_key"), - aws_secret_access_key=self.configuration.get("aws_secret_key"), - ) - return cloudwatch - - def get_schema(self, get_stats=False): - client = self._get_client() - - paginator = client.get_paginator("list_metrics") - - metrics = {} - for page in paginator.paginate(): - for metric in page["Metrics"]: - if metric["Namespace"] not in metrics: - metrics[metric["Namespace"]] = { - "name": metric["Namespace"], - "columns": [], - } - - if metric["MetricName"] not in metrics[metric["Namespace"]]["columns"]: - metrics[metric["Namespace"]]["columns"].append(metric["MetricName"]) - - return list(metrics.values()) - - def run_query(self, query, user): - cloudwatch = self._get_client() - - query = parse_query(query) - - results = [] - paginator = cloudwatch.get_paginator("get_metric_data") - for page in paginator.paginate(**query): - results += page["MetricDataResults"] - - rows, columns = parse_response(results) - - return json_dumps({"rows": rows, "columns": columns}), None - - -register(CloudWatch) diff --git a/redash/query_runner/cloudwatch_insights.py b/redash/query_runner/cloudwatch_insights.py deleted file mode 100644 index 139d5b678a..0000000000 --- a/redash/query_runner/cloudwatch_insights.py +++ /dev/null @@ -1,156 +0,0 @@ -import yaml -import datetime -import time - -from redash.query_runner import BaseQueryRunner, register -from redash.utils import json_dumps, parse_human_time - -try: - import boto3 - from botocore.exceptions import ParamValidationError - enabled = True -except ImportError: - enabled = False - -POLL_INTERVAL = 3 -TIMEOUT = 180 - - -def parse_response(response): - results = response["results"] - rows = [] - field_orders = {} - - for row in results: - record = {} - rows.append(record) - - for order, col in enumerate(row): - if col["field"] == "@ptr": - continue - field = col["field"] - record[field] = col["value"] - field_orders[field] = max(field_orders.get(field, -1), order) - - fields = sorted(field_orders, key=lambda f: field_orders[f]) - cols = [ - { - "name": f, - "type": "datetime" if f == "@timestamp" else "string", - "friendly_name": f, - } - for f in fields - ] - return { - "columns": cols, - "rows": rows, - "metadata": {"data_scanned": response["statistics"]["bytesScanned"]}, - } - - -def parse_query(query): - query = yaml.safe_load(query) - - for timeKey in ["startTime", "endTime"]: - if isinstance(query.get(timeKey), str): - query[timeKey] = int(parse_human_time(query[timeKey]).timestamp()) - if not query.get("endTime"): - query["endTime"] = int(datetime.datetime.now().timestamp()) - - return query - - -class CloudWatchInsights(BaseQueryRunner): - should_annotate_query = False - - @classmethod - def name(cls): - return "Amazon CloudWatch Logs Insights" - - @classmethod - def type(cls): - return "cloudwatch_insights" - - @classmethod - def configuration_schema(cls): - return { - "type": "object", - "properties": { - "region": {"type": "string", "title": "AWS Region"}, - "aws_access_key": {"type": "string", "title": "AWS Access Key"}, - "aws_secret_key": {"type": "string", "title": "AWS Secret Key"}, - }, - "required": ["region", "aws_access_key", "aws_secret_key"], - "order": ["region", "aws_access_key", "aws_secret_key"], - "secret": ["aws_secret_key"], - } - - @classmethod - def enabled(cls): - return enabled - - def __init__(self, configuration): - super(CloudWatchInsights, self).__init__(configuration) - self.syntax = "yaml" - - def test_connection(self): - self.get_schema() - - def _get_client(self): - cloudwatch = boto3.client( - "logs", - region_name=self.configuration.get("region"), - aws_access_key_id=self.configuration.get("aws_access_key"), - aws_secret_access_key=self.configuration.get("aws_secret_key"), - ) - return cloudwatch - - def get_schema(self, get_stats=False): - client = self._get_client() - - log_groups = [] - paginator = client.get_paginator("describe_log_groups") - - for page in paginator.paginate(): - for group in page["logGroups"]: - group_name = group["logGroupName"] - fields = client.get_log_group_fields(logGroupName=group_name) - log_groups.append( - { - "name": group_name, - "columns": [ - field["name"] for field in fields["logGroupFields"] - ], - } - ) - - return log_groups - - def run_query(self, query, user): - logs = self._get_client() - - query = parse_query(query) - query_id = logs.start_query(**query)["queryId"] - - elapsed = 0 - while True: - result = logs.get_query_results(queryId=query_id) - if result["status"] == "Complete": - data = parse_response(result) - break - if result["status"] in ("Failed", "Timeout", "Unknown", "Cancelled"): - raise Exception( - "CloudWatch Insights Query Execution Status: {}".format( - result["status"] - ) - ) - elif elapsed > TIMEOUT: - raise Exception("Request exceeded timeout.") - else: - time.sleep(POLL_INTERVAL) - elapsed += POLL_INTERVAL - - return json_dumps(data), None - - -register(CloudWatchInsights) diff --git a/redash/query_runner/dynamodb_sql.py b/redash/query_runner/dynamodb_sql.py deleted file mode 100644 index 965cc8fc2f..0000000000 --- a/redash/query_runner/dynamodb_sql.py +++ /dev/null @@ -1,144 +0,0 @@ -import logging -import sys - -from redash.query_runner import * -from redash.utils import json_dumps - -logger = logging.getLogger(__name__) - -try: - from dql import Engine, FragmentEngine - from dynamo3 import DynamoDBError - from pyparsing import ParseException - - enabled = True -except ImportError as e: - enabled = False - -types_map = { - "UNICODE": TYPE_INTEGER, - "TINYINT": TYPE_INTEGER, - "SMALLINT": TYPE_INTEGER, - "INT": TYPE_INTEGER, - "DOUBLE": TYPE_FLOAT, - "DECIMAL": TYPE_FLOAT, - "FLOAT": TYPE_FLOAT, - "REAL": TYPE_FLOAT, - "BOOLEAN": TYPE_BOOLEAN, - "TIMESTAMP": TYPE_DATETIME, - "DATE": TYPE_DATETIME, - "CHAR": TYPE_STRING, - "STRING": TYPE_STRING, - "VARCHAR": TYPE_STRING, -} - - -class DynamoDBSQL(BaseSQLQueryRunner): - should_annotate_query = False - - @classmethod - def configuration_schema(cls): - return { - "type": "object", - "properties": { - "region": {"type": "string", "default": "us-east-1"}, - "access_key": {"type": "string"}, - "secret_key": {"type": "string"}, - }, - "required": ["access_key", "secret_key"], - "secret": ["secret_key"], - } - - def test_connection(self): - engine = self._connect() - list(engine.connection.list_tables()) - - @classmethod - def type(cls): - return "dynamodb_sql" - - @classmethod - def name(cls): - return "DynamoDB (with DQL)" - - def _connect(self): - engine = FragmentEngine() - config = self.configuration.to_dict() - - if not config.get("region"): - config["region"] = "us-east-1" - - if config.get("host") == "": - config["host"] = None - - engine.connect(**config) - - return engine - - def _get_tables(self, schema): - engine = self._connect() - - # We can't use describe_all because sometimes a user might give List permission - # for * (all tables), but describe permission only for some of them. - tables = engine.connection.list_tables() - for table_name in tables: - try: - table = engine.describe(table_name, True) - schema[table.name] = { - "name": table.name, - "columns": list(table.attrs.keys()), - } - except DynamoDBError: - pass - - def run_query(self, query, user): - engine = None - try: - engine = self._connect() - - if not query.endswith(";"): - query = query + ";" - - result = engine.execute(query) - - columns = [] - rows = [] - - # When running a count query it returns the value as a string, in which case - # we transform it into a dictionary to be the same as regular queries. - if isinstance(result, str): - # when count < scanned_count, dql returns a string with number of rows scanned - value = result.split(" (")[0] - if value: - value = int(value) - result = [{"value": value}] - - for item in result: - if not columns: - for k, v in item.items(): - columns.append( - { - "name": k, - "friendly_name": k, - "type": types_map.get(str(type(v)).upper(), None), - } - ) - rows.append(item) - - data = {"columns": columns, "rows": rows} - json_data = json_dumps(data) - error = None - except ParseException as e: - error = "Error parsing query at line {} (column {}):\n{}".format( - e.lineno, e.column, e.line - ) - json_data = None - except (KeyboardInterrupt, JobTimeoutException): - if engine and engine.connection: - engine.connection.cancel() - raise - - return json_data, error - - -register(DynamoDBSQL) diff --git a/redash/query_runner/firebolt.py b/redash/query_runner/firebolt.py deleted file mode 100644 index fc892f91c4..0000000000 --- a/redash/query_runner/firebolt.py +++ /dev/null @@ -1,95 +0,0 @@ -try: - from firebolt.db import connect - from firebolt.client import DEFAULT_API_URL - enabled = True -except ImportError: - enabled = False - -from redash.query_runner import BaseQueryRunner, register -from redash.query_runner import TYPE_STRING, TYPE_INTEGER, TYPE_BOOLEAN -from redash.utils import json_dumps, json_loads - -TYPES_MAP = {1: TYPE_STRING, 2: TYPE_INTEGER, 3: TYPE_BOOLEAN} - - -class Firebolt(BaseQueryRunner): - noop_query = "SELECT 1" - - @classmethod - def configuration_schema(cls): - return { - "type": "object", - "properties": { - "api_endpoint": {"type": "string", "default": DEFAULT_API_URL}, - "engine_name": {"type": "string"}, - "DB": {"type": "string"}, - "user": {"type": "string"}, - "password": {"type": "string"}, - }, - "order": ["user", "password", "api_endpoint", "engine_name", "DB"], - "required": ["user", "password", "engine_name", "DB"], - "secret": ["password"], - } - - @classmethod - def enabled(cls): - return enabled - - def run_query(self, query, user): - connection = connect( - api_endpoint=(self.configuration.get("api_endpoint") or DEFAULT_API_URL), - engine_name=(self.configuration.get("engine_name") or None), - username=(self.configuration.get("user") or None), - password=(self.configuration.get("password") or None), - database=(self.configuration.get("DB") or None), - ) - - cursor = connection.cursor() - - try: - cursor.execute(query) - columns = self.fetch_columns( - [(i[0], TYPES_MAP.get(i[1], None)) for i in cursor.description] - ) - rows = [ - dict(zip((column["name"] for column in columns), row)) for row in cursor - ] - - data = {"columns": columns, "rows": rows} - error = None - json_data = json_dumps(data) - finally: - connection.close() - - return json_data, error - - - def get_schema(self, get_stats=False): - query = """ - SELECT TABLE_SCHEMA, - TABLE_NAME, - COLUMN_NAME - FROM INFORMATION_SCHEMA.COLUMNS - WHERE TABLE_SCHEMA <> 'INFORMATION_SCHEMA' - """ - - results, error = self.run_query(query, None) - - if error is not None: - self._handle_run_query_error(error) - - schema = {} - results = json_loads(results) - - for row in results["rows"]: - table_name = "{}.{}".format(row["table_schema"], row["table_name"]) - - if table_name not in schema: - schema[table_name] = {"name": table_name, "columns": []} - - schema[table_name]["columns"].append(row["column_name"]) - - return list(schema.values()) - - -register(Firebolt) diff --git a/redash/query_runner/pg.py b/redash/query_runner/pg.py deleted file mode 100644 index 20b5aed98a..0000000000 --- a/redash/query_runner/pg.py +++ /dev/null @@ -1,556 +0,0 @@ -import os -import logging -import select -from contextlib import contextmanager -from base64 import b64decode -from tempfile import NamedTemporaryFile -from uuid import uuid4 - -import psycopg2 -from psycopg2.extras import Range - -from redash.query_runner import * -from redash.utils import JSONEncoder, json_dumps, json_loads - -logger = logging.getLogger(__name__) - -try: - import boto3 - - IAM_ENABLED = True -except ImportError: - IAM_ENABLED = False - -types_map = { - 20: TYPE_INTEGER, - 21: TYPE_INTEGER, - 23: TYPE_INTEGER, - 700: TYPE_FLOAT, - 1700: TYPE_FLOAT, - 701: TYPE_FLOAT, - 16: TYPE_BOOLEAN, - 1082: TYPE_DATE, - 1114: TYPE_DATETIME, - 1184: TYPE_DATETIME, - 1014: TYPE_STRING, - 1015: TYPE_STRING, - 1008: TYPE_STRING, - 1009: TYPE_STRING, - 2951: TYPE_STRING, -} - - -class PostgreSQLJSONEncoder(JSONEncoder): - def default(self, o): - if isinstance(o, Range): - # From: https://github.com/psycopg/psycopg2/pull/779 - if o._bounds is None: - return "" - - items = [o._bounds[0], str(o._lower), ", ", str(o._upper), o._bounds[1]] - - return "".join(items) - - return super(PostgreSQLJSONEncoder, self).default(o) - - -def _wait(conn, timeout=None): - while 1: - try: - state = conn.poll() - if state == psycopg2.extensions.POLL_OK: - break - elif state == psycopg2.extensions.POLL_WRITE: - select.select([], [conn.fileno()], [], timeout) - elif state == psycopg2.extensions.POLL_READ: - select.select([conn.fileno()], [], [], timeout) - else: - raise psycopg2.OperationalError("poll() returned %s" % state) - except select.error: - raise psycopg2.OperationalError("select.error received") - - -def full_table_name(schema, name): - if "." in name: - name = '"{}"'.format(name) - - return "{}.{}".format(schema, name) - - -def build_schema(query_result, schema): - # By default we omit the public schema name from the table name. But there are - # edge cases, where this might cause conflicts. For example: - # * We have a schema named "main" with table "users". - # * We have a table named "main.users" in the public schema. - # (while this feels unlikely, this actually happened) - # In this case if we omit the schema name for the public table, we will have - # a conflict. - table_names = set( - map( - lambda r: full_table_name(r["table_schema"], r["table_name"]), - query_result["rows"], - ) - ) - - for row in query_result["rows"]: - if row["table_schema"] != "public": - table_name = full_table_name(row["table_schema"], row["table_name"]) - else: - if row["table_name"] in table_names: - table_name = full_table_name(row["table_schema"], row["table_name"]) - else: - table_name = row["table_name"] - - if table_name not in schema: - schema[table_name] = {"name": table_name, "columns": []} - - column = row["column_name"] - if row.get("data_type") is not None: - column = {"name": row["column_name"], "type": row["data_type"]} - - schema[table_name]["columns"].append(column) - - -def _create_cert_file(configuration, key, ssl_config): - file_key = key + "File" - if file_key in configuration: - with NamedTemporaryFile(mode="w", delete=False) as cert_file: - cert_bytes = b64decode(configuration[file_key]) - cert_file.write(cert_bytes.decode("utf-8")) - - ssl_config[key] = cert_file.name - - -def _cleanup_ssl_certs(ssl_config): - for k, v in ssl_config.items(): - if k != "sslmode": - os.remove(v) - - -def _get_ssl_config(configuration): - ssl_config = {"sslmode": configuration.get("sslmode", "prefer")} - - _create_cert_file(configuration, "sslrootcert", ssl_config) - _create_cert_file(configuration, "sslcert", ssl_config) - _create_cert_file(configuration, "sslkey", ssl_config) - - return ssl_config - - -class PostgreSQL(BaseSQLQueryRunner): - noop_query = "SELECT 1" - - @classmethod - def configuration_schema(cls): - return { - "type": "object", - "properties": { - "user": {"type": "string"}, - "password": {"type": "string"}, - "host": {"type": "string", "default": "127.0.0.1"}, - "port": {"type": "number", "default": 5432}, - "dbname": {"type": "string", "title": "Database Name"}, - "sslmode": { - "type": "string", - "title": "SSL Mode", - "default": "prefer", - "extendedEnum": [ - {"value": "disable", "name": "Disable"}, - {"value": "allow", "name": "Allow"}, - {"value": "prefer", "name": "Prefer"}, - {"value": "require", "name": "Require"}, - {"value": "verify-ca", "name": "Verify CA"}, - {"value": "verify-full", "name": "Verify Full"}, - ], - }, - "sslrootcertFile": {"type": "string", "title": "SSL Root Certificate"}, - "sslcertFile": {"type": "string", "title": "SSL Client Certificate"}, - "sslkeyFile": {"type": "string", "title": "SSL Client Key"}, - }, - "order": ["host", "port", "user", "password"], - "required": ["dbname"], - "secret": ["password", "sslrootcertFile", "sslcertFile", "sslkeyFile"], - "extra_options": [ - "sslmode", - "sslrootcertFile", - "sslcertFile", - "sslkeyFile", - ], - } - - @classmethod - def type(cls): - return "pg" - - def _get_definitions(self, schema, query): - results, error = self.run_query(query, None) - - if error is not None: - self._handle_run_query_error(error) - - results = json_loads(results) - - build_schema(results, schema) - - def _get_tables(self, schema): - """ - relkind constants per https://www.postgresql.org/docs/10/static/catalog-pg-class.html - r = regular table - v = view - m = materialized view - f = foreign table - p = partitioned table (new in 10) - --- - i = index - S = sequence - t = TOAST table - c = composite type - """ - - query = """ - SELECT s.nspname as table_schema, - c.relname as table_name, - a.attname as column_name, - null as data_type - FROM pg_class c - JOIN pg_namespace s - ON c.relnamespace = s.oid - AND s.nspname NOT IN ('pg_catalog', 'information_schema') - JOIN pg_attribute a - ON a.attrelid = c.oid - AND a.attnum > 0 - AND NOT a.attisdropped - WHERE c.relkind IN ('m', 'f', 'p') - - UNION - - SELECT table_schema, - table_name, - column_name, - data_type - FROM information_schema.columns - WHERE table_schema NOT IN ('pg_catalog', 'information_schema') - """ - - self._get_definitions(schema, query) - - return list(schema.values()) - - def _get_connection(self): - self.ssl_config = _get_ssl_config(self.configuration) - connection = psycopg2.connect( - user=self.configuration.get("user"), - password=self.configuration.get("password"), - host=self.configuration.get("host"), - port=self.configuration.get("port"), - dbname=self.configuration.get("dbname"), - async_=True, - **self.ssl_config, - ) - - return connection - - def run_query(self, query, user): - connection = self._get_connection() - _wait(connection, timeout=10) - - cursor = connection.cursor() - - try: - cursor.execute(query) - _wait(connection) - - if cursor.description is not None: - columns = self.fetch_columns( - [(i[0], types_map.get(i[1], None)) for i in cursor.description] - ) - rows = [ - dict(zip((column["name"] for column in columns), row)) - for row in cursor - ] - - data = {"columns": columns, "rows": rows} - error = None - json_data = json_dumps(data, ignore_nan=True, cls=PostgreSQLJSONEncoder) - else: - error = "Query completed but it returned no data." - json_data = None - except (select.error, OSError) as e: - error = "Query interrupted. Please retry." - json_data = None - except psycopg2.DatabaseError as e: - error = str(e) - json_data = None - except (KeyboardInterrupt, InterruptException, JobTimeoutException): - connection.cancel() - raise - finally: - connection.close() - _cleanup_ssl_certs(self.ssl_config) - - return json_data, error - - -class Redshift(PostgreSQL): - @classmethod - def type(cls): - return "redshift" - - @classmethod - def name(cls): - return "Redshift" - - def _get_connection(self): - self.ssl_config = {} - - sslrootcert_path = os.path.join( - os.path.dirname(__file__), "./files/redshift-ca-bundle.crt" - ) - - connection = psycopg2.connect( - user=self.configuration.get("user"), - password=self.configuration.get("password"), - host=self.configuration.get("host"), - port=self.configuration.get("port"), - dbname=self.configuration.get("dbname"), - sslmode=self.configuration.get("sslmode", "prefer"), - sslrootcert=sslrootcert_path, - async_=True, - ) - - return connection - - @classmethod - def configuration_schema(cls): - return { - "type": "object", - "properties": { - "user": {"type": "string"}, - "password": {"type": "string"}, - "host": {"type": "string"}, - "port": {"type": "number"}, - "dbname": {"type": "string", "title": "Database Name"}, - "sslmode": {"type": "string", "title": "SSL Mode", "default": "prefer"}, - "adhoc_query_group": { - "type": "string", - "title": "Query Group for Adhoc Queries", - "default": "default", - }, - "scheduled_query_group": { - "type": "string", - "title": "Query Group for Scheduled Queries", - "default": "default", - }, - }, - "order": [ - "host", - "port", - "user", - "password", - "dbname", - "sslmode", - "adhoc_query_group", - "scheduled_query_group", - ], - "required": ["dbname", "user", "password", "host", "port"], - "secret": ["password"], - } - - def annotate_query(self, query, metadata): - annotated = super(Redshift, self).annotate_query(query, metadata) - - if metadata.get("Scheduled", False): - query_group = self.configuration.get("scheduled_query_group") - else: - query_group = self.configuration.get("adhoc_query_group") - - if query_group: - set_query_group = "set query_group to {};".format(query_group) - annotated = "{}\n{}".format(set_query_group, annotated) - - return annotated - - def _get_tables(self, schema): - # Use svv_columns to include internal & external (Spectrum) tables and views data for Redshift - # https://docs.aws.amazon.com/redshift/latest/dg/r_SVV_COLUMNS.html - # Use HAS_SCHEMA_PRIVILEGE(), SVV_EXTERNAL_SCHEMAS and HAS_TABLE_PRIVILEGE() to filter - # out tables the current user cannot access. - # https://docs.aws.amazon.com/redshift/latest/dg/r_HAS_SCHEMA_PRIVILEGE.html - # https://docs.aws.amazon.com/redshift/latest/dg/r_SVV_EXTERNAL_SCHEMAS.html - # https://docs.aws.amazon.com/redshift/latest/dg/r_HAS_TABLE_PRIVILEGE.html - query = """ - WITH tables AS ( - SELECT DISTINCT table_name, - table_schema, - column_name, - ordinal_position AS pos - FROM svv_columns - WHERE table_schema NOT IN ('pg_internal','pg_catalog','information_schema') - AND table_schema NOT LIKE 'pg_temp_%' - ) - SELECT table_name, table_schema, column_name - FROM tables - WHERE - HAS_SCHEMA_PRIVILEGE(table_schema, 'USAGE') AND - ( - table_schema IN (SELECT schemaname FROM SVV_EXTERNAL_SCHEMAS) OR - HAS_TABLE_PRIVILEGE('"' || table_schema || '"."' || table_name || '"', 'SELECT') - ) - ORDER BY table_name, pos - """ - - self._get_definitions(schema, query) - - return list(schema.values()) - - -class RedshiftIAM(Redshift): - @classmethod - def type(cls): - return "redshift_iam" - - @classmethod - def name(cls): - return "Redshift (with IAM User/Role)" - - @classmethod - def enabled(cls): - return IAM_ENABLED - - def _login_method_selection(self): - if self.configuration.get("rolename"): - if not self.configuration.get( - "aws_access_key_id" - ) or not self.configuration.get("aws_secret_access_key"): - return "ASSUME_ROLE_NO_KEYS" - else: - return "ASSUME_ROLE_KEYS" - elif self.configuration.get("aws_access_key_id") and self.configuration.get( - "aws_secret_access_key" - ): - return "KEYS" - elif not self.configuration.get("password"): - return "ROLE" - - @classmethod - def configuration_schema(cls): - return { - "type": "object", - "properties": { - "rolename": {"type": "string", "title": "IAM Role Name"}, - "aws_region": {"type": "string", "title": "AWS Region"}, - "aws_access_key_id": {"type": "string", "title": "AWS Access Key ID"}, - "aws_secret_access_key": { - "type": "string", - "title": "AWS Secret Access Key", - }, - "clusterid": {"type": "string", "title": "Redshift Cluster ID"}, - "user": {"type": "string"}, - "host": {"type": "string"}, - "port": {"type": "number"}, - "dbname": {"type": "string", "title": "Database Name"}, - "sslmode": {"type": "string", "title": "SSL Mode", "default": "prefer"}, - "adhoc_query_group": { - "type": "string", - "title": "Query Group for Adhoc Queries", - "default": "default", - }, - "scheduled_query_group": { - "type": "string", - "title": "Query Group for Scheduled Queries", - "default": "default", - }, - }, - "order": [ - "rolename", - "aws_region", - "aws_access_key_id", - "aws_secret_access_key", - "clusterid", - "host", - "port", - "user", - "dbname", - "sslmode", - "adhoc_query_group", - "scheduled_query_group", - ], - "required": ["dbname", "user", "host", "port", "aws_region"], - "secret": ["aws_secret_access_key"], - } - - def _get_connection(self): - - sslrootcert_path = os.path.join( - os.path.dirname(__file__), "./files/redshift-ca-bundle.crt" - ) - - login_method = self._login_method_selection() - - if login_method == "KEYS": - client = boto3.client( - "redshift", - region_name=self.configuration.get("aws_region"), - aws_access_key_id=self.configuration.get("aws_access_key_id"), - aws_secret_access_key=self.configuration.get("aws_secret_access_key"), - ) - elif login_method == "ROLE": - client = boto3.client( - "redshift", region_name=self.configuration.get("aws_region") - ) - else: - if login_method == "ASSUME_ROLE_KEYS": - assume_client = client = boto3.client( - "sts", - region_name=self.configuration.get("aws_region"), - aws_access_key_id=self.configuration.get("aws_access_key_id"), - aws_secret_access_key=self.configuration.get( - "aws_secret_access_key" - ), - ) - else: - assume_client = client = boto3.client( - "sts", region_name=self.configuration.get("aws_region") - ) - role_session = f"redash_{uuid4().hex}" - session_keys = assume_client.assume_role( - RoleArn=self.configuration.get("rolename"), RoleSessionName=role_session - )["Credentials"] - client = boto3.client( - "redshift", - region_name=self.configuration.get("aws_region"), - aws_access_key_id=session_keys["AccessKeyId"], - aws_secret_access_key=session_keys["SecretAccessKey"], - aws_session_token=session_keys["SessionToken"], - ) - credentials = client.get_cluster_credentials( - DbUser=self.configuration.get("user"), - DbName=self.configuration.get("dbname"), - ClusterIdentifier=self.configuration.get("clusterid"), - ) - db_user = credentials["DbUser"] - db_password = credentials["DbPassword"] - connection = psycopg2.connect( - user=db_user, - password=db_password, - host=self.configuration.get("host"), - port=self.configuration.get("port"), - dbname=self.configuration.get("dbname"), - sslmode=self.configuration.get("sslmode", "prefer"), - sslrootcert=sslrootcert_path, - async_=True, - ) - - return connection - - -class CockroachDB(PostgreSQL): - @classmethod - def type(cls): - return "cockroach" - - -register(PostgreSQL) -register(Redshift) -register(RedshiftIAM) -register(CockroachDB) diff --git a/redash/query_runner/pinot.py b/redash/query_runner/pinot.py deleted file mode 100644 index 57ffe66cbf..0000000000 --- a/redash/query_runner/pinot.py +++ /dev/null @@ -1,134 +0,0 @@ -try: - import pinotdb - enabled = True -except ImportError: - enabled = False - -from redash.query_runner import BaseQueryRunner, register -from redash.query_runner import TYPE_DATETIME, TYPE_FLOAT, TYPE_STRING, TYPE_INTEGER, TYPE_BOOLEAN -from redash.utils import json_dumps - -import requests -from requests.auth import HTTPBasicAuth -import logging - -logger = logging.getLogger(__name__) - -PINOT_TYPES_MAPPING = { - "BOOLEAN": TYPE_BOOLEAN, - "INT": TYPE_INTEGER, - "LONG": TYPE_INTEGER, - "FLOAT": TYPE_FLOAT, - "DOUBLE": TYPE_FLOAT, - "STRING": TYPE_STRING, - "BYTES": TYPE_STRING, - "JSON": TYPE_STRING, - "TIMESTAMP": TYPE_DATETIME, -} - - -class Pinot(BaseQueryRunner): - noop_query = "SELECT 1" - username = None - password = None - - @classmethod - def configuration_schema(cls): - return { - "type": "object", - "properties": { - "brokerHost": {"type": "string", "default": ""}, - "brokerPort": {"type": "number", "default": 8099}, - "brokerScheme": {"type": "string", "default": "http"}, - "controllerURI": {"type": "string", "default": ""}, - "username": {"type": "string"}, - "password": {"type": "string"}, - }, - "order": ["brokerScheme", "brokerHost", "brokerPort", "controllerURI", "username", "password"], - "required": ["brokerHost", "controllerURI"], - "secret": ["password"], - } - - @classmethod - def enabled(cls): - return enabled - - def __init__(self, configuration): - super(Pinot, self).__init__(configuration) - self.controller_uri = self.configuration.get("controllerURI") - self.username=(self.configuration.get("username") or None) - self.password=(self.configuration.get("password") or None) - - def run_query(self, query, user): - logger.debug("Running query %s with username: %s, password: %s", query, self.username, self.password) - connection = pinotdb.connect( - host=self.configuration["brokerHost"], - port=self.configuration["brokerPort"], - path="/query/sql", - scheme=(self.configuration.get("brokerScheme") or "http"), - verify_ssl=False, - username=self.username, - password=self.password, - ) - - cursor = connection.cursor() - - try: - cursor.execute(query) - logger.debug("cursor.schema = %s",cursor.schema) - columns = self.fetch_columns( - [(i["name"], PINOT_TYPES_MAPPING.get(i["type"], None)) for i in cursor.schema] - ) - rows = [ - dict(zip((column["name"] for column in columns), row)) for row in cursor - ] - - data = {"columns": columns, "rows": rows} - error = None - json_data = json_dumps(data) - logger.debug("Pinot execute query [%s]", query) - finally: - connection.close() - - return json_data, error - - def get_schema(self, get_stats=False): - schema = {} - for schema_name in self.get_schema_names(): - for table_name in self.get_table_names(): - schema_table_name = "{}.{}".format(schema_name, table_name) - if table_name not in schema: - schema[schema_table_name] = {"name": schema_table_name, "columns": []} - table_schema =self.get_pinot_table_schema(table_name) - - for column in table_schema.get("dimensionFieldSpecs", []) + table_schema.get( - "metricFieldSpecs", []) + table_schema.get("dateTimeFieldSpecs", []): - c = { - "name": column["name"], - "type": PINOT_TYPES_MAPPING[column["dataType"]], - } - schema[schema_table_name]["columns"].append(c) - return list(schema.values()) - - def get_schema_names(self): - return ["default"] - - def get_pinot_table_schema(self, pinot_table_name): - return self.get_metadata_from_controller("/tables/" + pinot_table_name + "/schema") - - def get_table_names(self): - return self.get_metadata_from_controller("/tables")["tables"] - - def get_metadata_from_controller(self, path): - url = self.controller_uri + path - r = requests.get(url, headers={"Accept": "application/json"}, auth= HTTPBasicAuth(self.username, self.password)) - try: - result = r.json() - logger.debug("get_metadata_from_controller from path %s", path) - except ValueError as e: - raise pinotdb.exceptions.DatabaseError( - f"Got invalid json response from {self.controller_uri}:{path}: {r.text}" - ) from e - return result - -register(Pinot) diff --git a/redash/query_runner/snowflake.py b/redash/query_runner/snowflake.py deleted file mode 100644 index 8f5ff8b3c0..0000000000 --- a/redash/query_runner/snowflake.py +++ /dev/null @@ -1,191 +0,0 @@ -try: - import snowflake.connector - - enabled = True -except ImportError: - enabled = False - - -from redash.query_runner import BaseQueryRunner, register -from redash.query_runner import ( - TYPE_STRING, - TYPE_DATE, - TYPE_DATETIME, - TYPE_INTEGER, - TYPE_FLOAT, - TYPE_BOOLEAN, -) -from redash.utils import json_dumps, json_loads - -TYPES_MAP = { - 0: TYPE_INTEGER, - 1: TYPE_FLOAT, - 2: TYPE_STRING, - 3: TYPE_DATE, - 4: TYPE_DATETIME, - 5: TYPE_STRING, - 6: TYPE_DATETIME, - 7: TYPE_DATETIME, - 8: TYPE_DATETIME, - 13: TYPE_BOOLEAN, -} - - -class Snowflake(BaseQueryRunner): - noop_query = "SELECT 1" - - @classmethod - def configuration_schema(cls): - return { - "type": "object", - "properties": { - "account": {"type": "string"}, - "user": {"type": "string"}, - "password": {"type": "string"}, - "warehouse": {"type": "string"}, - "database": {"type": "string"}, - "region": {"type": "string", "default": "us-west"}, - "lower_case_columns": { - "type": "boolean", - "title": "Lower Case Column Names in Results", - "default": False, - }, - "host": {"type": "string"}, - }, - "order": [ - "account", - "user", - "password", - "warehouse", - "database", - "region", - "host", - ], - "required": ["user", "password", "account", "database", "warehouse"], - "secret": ["password"], - "extra_options": [ - "host", - ], - } - - @classmethod - def enabled(cls): - return enabled - - @classmethod - def determine_type(cls, data_type, scale): - t = TYPES_MAP.get(data_type, None) - if t == TYPE_INTEGER and scale > 0: - return TYPE_FLOAT - return t - - def _get_connection(self): - region = self.configuration.get("region") - account = self.configuration["account"] - - # for us-west we don't need to pass a region (and if we do, it fails to connect) - if region == "us-west": - region = None - - if self.configuration.__contains__("host"): - host = self.configuration.get("host") - else: - if region: - host = "{}.{}.snowflakecomputing.com".format(account, region) - else: - host = "{}.snowflakecomputing.com".format(account) - - connection = snowflake.connector.connect( - user=self.configuration["user"], - password=self.configuration["password"], - account=account, - region=region, - host=host, - ) - - return connection - - def _column_name(self, column_name): - if self.configuration.get("lower_case_columns", False): - return column_name.lower() - - return column_name - - def _parse_results(self, cursor): - columns = self.fetch_columns( - [ - (self._column_name(i[0]), self.determine_type(i[1], i[5])) - for i in cursor.description - ] - ) - rows = [ - dict(zip((column["name"] for column in columns), row)) for row in cursor - ] - - data = {"columns": columns, "rows": rows} - return data - - def run_query(self, query, user): - connection = self._get_connection() - cursor = connection.cursor() - - try: - cursor.execute("USE WAREHOUSE {}".format(self.configuration["warehouse"])) - cursor.execute("USE {}".format(self.configuration["database"])) - - cursor.execute(query) - - data = self._parse_results(cursor) - error = None - json_data = json_dumps(data) - finally: - cursor.close() - connection.close() - - return json_data, error - - def _run_query_without_warehouse(self, query): - connection = self._get_connection() - cursor = connection.cursor() - - try: - cursor.execute("USE {}".format(self.configuration["database"])) - - cursor.execute(query) - - data = self._parse_results(cursor) - error = None - finally: - cursor.close() - connection.close() - - return data, error - - def _database_name_includes_schema(self): - return "." in self.configuration.get("database") - - def get_schema(self, get_stats=False): - if self._database_name_includes_schema(): - query = "SHOW COLUMNS" - else: - query = "SHOW COLUMNS IN DATABASE" - - results, error = self._run_query_without_warehouse(query) - - if error is not None: - self._handle_run_query_error(error) - - schema = {} - for row in results["rows"]: - if row["kind"] == "COLUMN": - table_name = "{}.{}".format(row["schema_name"], row["table_name"]) - - if table_name not in schema: - schema[table_name] = {"name": table_name, "columns": []} - - schema[table_name]["columns"].append(row["column_name"]) - - return list(schema.values()) - - -register(Snowflake) diff --git a/redash/settings/__init__.py b/redash/settings/__init__.py index 38b6e7d0ee..a307b3c623 100644 --- a/redash/settings/__init__.py +++ b/redash/settings/__init__.py @@ -349,7 +349,6 @@ def email_server_is_configured(): "redash.query_runner.mongodb", "redash.query_runner.couchbase", "redash.query_runner.mysql", - "redash.query_runner.pg", "redash.query_runner.url", "redash.query_runner.influx_db", "redash.query_runner.elasticsearch", @@ -357,7 +356,6 @@ def email_server_is_configured(): "redash.query_runner.amazon_elasticsearch", "redash.query_runner.trino", "redash.query_runner.presto", - "redash.query_runner.pinot", "redash.query_runner.databricks", "redash.query_runner.hive_ds", "redash.query_runner.impala_ds", @@ -367,7 +365,6 @@ def email_server_is_configured(): "redash.query_runner.rockset", "redash.query_runner.treasuredata", "redash.query_runner.sqlite", - "redash.query_runner.dynamodb_sql", "redash.query_runner.mssql", "redash.query_runner.mssql_odbc", "redash.query_runner.memsql_ds", @@ -384,20 +381,16 @@ def email_server_is_configured(): "redash.query_runner.kylin", "redash.query_runner.drill", "redash.query_runner.uptycs", - "redash.query_runner.snowflake", "redash.query_runner.phoenix", "redash.query_runner.json_ds", "redash.query_runner.cass", "redash.query_runner.dgraph", "redash.query_runner.azure_kusto", "redash.query_runner.exasol", - "redash.query_runner.cloudwatch", - "redash.query_runner.cloudwatch_insights", "redash.query_runner.corporate_memory", "redash.query_runner.sparql_endpoint", "redash.query_runner.excel", "redash.query_runner.csv", - "redash.query_runner.firebolt", "redash.query_runner.databend", "redash.query_runner.nz", "redash.query_runner.arango" diff --git a/requirements.txt b/requirements.txt index db68533766..642d15a877 100644 --- a/requirements.txt +++ b/requirements.txt @@ -64,3 +64,49 @@ werkzeug==0.16.1 # ldap3==2.2.4 Authlib==0.15.5 advocate==1.0.0 + +# Copied from requirements_all_ds.txt, as a test to see if it fixes the "missing data source libraries" +# problem in our initial preview build +google-api-python-client==1.7.11 +protobuf==3.18.3 +gspread==3.1.0 +impyla==0.16.0 +influxdb==5.2.3 +mysqlclient==2.1.1 +oauth2client==4.1.3 +pyhive==0.6.1 +pymongo[tls,srv]==4.3.3 +vertica-python==0.9.5 +td-client==1.0.0 +pymssql==2.1.5 +sasl>=0.1.3 +thrift>=0.8.0 +thrift_sasl>=0.1.0 +cassandra-driver==3.21.0 +memsql==3.2.0 +atsd_client==3.0.5 +simple_salesforce==0.74.3 +PyAthena>=1.5.0,<=1.11.5 +#pymapd==0.19.0 +qds-sdk>=1.9.6 +# ibm-db>=2.0.9 +pydruid==0.5.7 +requests_aws_sign==0.1.5 +phoenixdb==0.7 +# certifi is needed to support MongoDB and SSL: +certifi>=2019.9.11 +pydgraph==2.0.2 +azure-kusto-data==0.0.35 +pyexasol==0.12.0 +python-rapidjson==0.8.0 +pyodbc==4.0.28 +trino~=0.305 +cmem-cmempy==21.2.3 +xlrd==2.0.1 +openpyxl==3.0.7 +databend-sqlalchemy==0.2.4 +pandas==1.3.4 +nzpy>=1.15 +nzalchemy +python-arango==6.1.0 +pyarrow==10.0.0 diff --git a/requirements_all_ds.txt b/requirements_all_ds.txt index 8bf40326f0..559c414b29 100644 --- a/requirements_all_ds.txt +++ b/requirements_all_ds.txt @@ -10,11 +10,6 @@ pymongo[tls,srv]==4.3.3 vertica-python==0.9.5 td-client==1.0.0 pymssql==2.1.5 -# We can't upgrade dql==0.5.26 version as dql==0.6.2 newer versions require PostgreSQL > 10, but we target older versions at the moment. -dql==0.5.26 -dynamo3==1.0.0 -boto3>=1.14.0,<1.15.0 -botocore>=1.13,<=1.17.55 sasl>=0.1.3 thrift>=0.8.0 thrift_sasl>=0.1.0 @@ -28,7 +23,6 @@ qds-sdk>=1.9.6 # ibm-db>=2.0.9 pydruid==0.5.7 requests_aws_sign==0.1.5 -snowflake-connector-python==3.0.2 phoenixdb==0.7 # certifi is needed to support MongoDB and SSL: certifi>=2019.9.11 @@ -41,11 +35,9 @@ trino~=0.305 cmem-cmempy==21.2.3 xlrd==2.0.1 openpyxl==3.0.7 -firebolt-sdk databend-sqlalchemy==0.2.4 pandas==1.3.4 nzpy>=1.15 nzalchemy python-arango==6.1.0 -pinotdb>=0.4.5 pyarrow==10.0.0 diff --git a/requirements_dev.txt b/requirements_dev.txt index 6d0647666b..c3aa14387b 100644 --- a/requirements_dev.txt +++ b/requirements_dev.txt @@ -5,9 +5,7 @@ mock==3.0.5 # PyMongo and Athena dependencies are needed for some of the unit tests: # (this is not perfect and we should resolve this in a different way) -pymongo[srv,tls]==3.9.0 -boto3>=1.14.0,<1.15.0 -botocore>=1.13,<=1.17.55 +pymongo[srv,tls]==4.3.3 PyAthena>=1.5.0,<=1.11.5 ptvsd==4.3.2 freezegun==0.3.12 diff --git a/tests/query_runner/test_athena.py b/tests/query_runner/test_athena.py deleted file mode 100644 index 24552a3fc8..0000000000 --- a/tests/query_runner/test_athena.py +++ /dev/null @@ -1,241 +0,0 @@ -""" -Some test cases around the Glue catalog. -""" -from unittest import TestCase - -import botocore -import mock -from botocore.stub import Stubber - -from redash.query_runner.athena import Athena - - -class TestGlueSchema(TestCase): - def setUp(self): - - client = botocore.session.get_session().create_client( - "glue", - region_name="mars-east-1", - aws_access_key_id="foo", - aws_secret_access_key="bar", - ) - self.stubber = Stubber(client) - - self.patcher = mock.patch("boto3.client") - mocked_client = self.patcher.start() - mocked_client.return_value = client - - def tearDown(self): - self.patcher.stop() - - def test_external_table(self): - """Unpartitioned table crawled through a JDBC connection""" - query_runner = Athena({"glue": True, "region": "mars-east-1"}) - - self.stubber.add_response( - "get_databases", {"DatabaseList": [{"Name": "test1"}]}, {} - ) - self.stubber.add_response( - "get_tables", - { - "TableList": [ - { - "Name": "jdbc_table", - "StorageDescriptor": { - "Columns": [{"Name": "row_id", "Type": "int"}], - "Location": "Database.Schema.Table", - "Compressed": False, - "NumberOfBuckets": -1, - "SerdeInfo": {"Parameters": {}}, - "BucketColumns": [], - "SortColumns": [], - "Parameters": { - "CrawlerSchemaDeserializerVersion": "1.0", - "CrawlerSchemaSerializerVersion": "1.0", - "UPDATED_BY_CRAWLER": "jdbc", - "classification": "sqlserver", - "compressionType": "none", - "connectionName": "jdbctest", - "typeOfData": "view", - }, - "StoredAsSubDirectories": False, - }, - "PartitionKeys": [], - "TableType": "EXTERNAL_TABLE", - "Parameters": { - "CrawlerSchemaDeserializerVersion": "1.0", - "CrawlerSchemaSerializerVersion": "1.0", - "UPDATED_BY_CRAWLER": "jdbc", - "classification": "sqlserver", - "compressionType": "none", - "connectionName": "jdbctest", - "typeOfData": "view", - }, - } - ] - }, - {"DatabaseName": "test1"}, - ) - with self.stubber: - assert query_runner.get_schema() == [ - {"columns": ["row_id"], "name": "test1.jdbc_table"} - ] - - def test_partitioned_table(self): - """ - Partitioned table as created by a GlueContext - """ - - query_runner = Athena({"glue": True, "region": "mars-east-1"}) - - self.stubber.add_response( - "get_databases", {"DatabaseList": [{"Name": "test1"}]}, {} - ) - self.stubber.add_response( - "get_tables", - { - "TableList": [ - { - "Name": "partitioned_table", - "StorageDescriptor": { - "Columns": [{"Name": "sk", "Type": "int"}], - "Location": "s3://bucket/prefix", - "InputFormat": "org.apache.hadoop.mapred.TextInputFormat", - "OutputFormat": "org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat", - "Compressed": False, - "NumberOfBuckets": -1, - "SerdeInfo": { - "SerializationLibrary": "org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe", - "Parameters": {"serialization.format": "1"}, - }, - "BucketColumns": [], - "SortColumns": [], - "Parameters": {}, - "SkewedInfo": { - "SkewedColumnNames": [], - "SkewedColumnValues": [], - "SkewedColumnValueLocationMaps": {}, - }, - "StoredAsSubDirectories": False, - }, - "PartitionKeys": [{"Name": "category", "Type": "int"}], - "TableType": "EXTERNAL_TABLE", - "Parameters": { - "EXTERNAL": "TRUE", - "transient_lastDdlTime": "1537505313", - }, - } - ] - }, - {"DatabaseName": "test1"}, - ) - with self.stubber: - assert query_runner.get_schema() == [ - {"columns": ["sk", "category"], "name": "test1.partitioned_table"} - ] - - def test_view(self): - query_runner = Athena({"glue": True, "region": "mars-east-1"}) - - self.stubber.add_response( - "get_databases", {"DatabaseList": [{"Name": "test1"}]}, {} - ) - self.stubber.add_response( - "get_tables", - { - "TableList": [ - { - "Name": "view", - "StorageDescriptor": { - "Columns": [{"Name": "sk", "Type": "int"}], - "Location": "", - "Compressed": False, - "NumberOfBuckets": 0, - "SerdeInfo": {}, - "SortColumns": [], - "StoredAsSubDirectories": False, - }, - "PartitionKeys": [], - "ViewOriginalText": "/* Presto View: ... */", - "ViewExpandedText": "/* Presto View */", - "TableType": "VIRTUAL_VIEW", - "Parameters": {"comment": "Presto View", "presto_view": "true"}, - } - ] - }, - {"DatabaseName": "test1"}, - ) - with self.stubber: - assert query_runner.get_schema() == [ - {"columns": ["sk"], "name": "test1.view"} - ] - - def test_dodgy_table_does_not_break_schema_listing(self): - """ - For some reason, not all Glue tables contain a "PartitionKeys" entry. - - This may be a Athena Catalog to Glue catalog migration issue. - """ - query_runner = Athena({"glue": True, "region": "mars-east-1"}) - - self.stubber.add_response( - "get_databases", {"DatabaseList": [{"Name": "test1"}]}, {} - ) - self.stubber.add_response( - "get_tables", - { - "TableList": [ - { - "Name": "csv", - "StorageDescriptor": { - "Columns": [{"Name": "region", "Type": "string"}], - "Location": "s3://bucket/files/", - "InputFormat": "org.apache.hadoop.mapred.TextInputFormat", - "Compressed": False, - "NumberOfBuckets": 0, - "SerdeInfo": { - "SerializationLibrary": "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe", - "Parameters": { - "field.delim": "|", - "skip.header.line.count": "1", - }, - }, - "SortColumns": [], - "StoredAsSubDirectories": False, - }, - "Parameters": {"classification": "csv"}, - } - ] - }, - {"DatabaseName": "test1"}, - ) - with self.stubber: - assert query_runner.get_schema() == [ - {"columns": ["region"], "name": "test1.csv"} - ] - - def test_no_storage_descriptor_table(self): - """ - For some reason, not all Glue tables contain a "StorageDescriptor" entry. - """ - query_runner = Athena({'glue': True, 'region': 'mars-east-1'}) - - self.stubber.add_response('get_databases', {'DatabaseList': [{'Name': 'test1'}]}, {}) - self.stubber.add_response( - 'get_tables', - { - 'TableList': [ - { - 'Name': 'no_storage_descriptor_table', - 'PartitionKeys': [], - 'TableType': 'EXTERNAL_TABLE', - 'Parameters': { - 'EXTERNAL': 'TRUE' - }, - } - ] - }, - {'DatabaseName': 'test1'}, - ) - with self.stubber: - assert query_runner.get_schema() == []