From c90f365b2972f6ce3503a14229c3dfbc076d384f Mon Sep 17 00:00:00 2001 From: Agata Walukiewicz Date: Tue, 25 Jun 2024 18:46:54 +0200 Subject: [PATCH 1/5] add custom stream filter --- .../source_salesforce/source.py | 6 ++++ .../source_salesforce/spec.yaml | 15 ++++++++++ .../source_salesforce/streams.py | 30 +++++++++++++++++-- 3 files changed, 48 insertions(+), 3 deletions(-) diff --git a/airbyte-integrations/connectors/source-salesforce/source_salesforce/source.py b/airbyte-integrations/connectors/source-salesforce/source_salesforce/source.py index c3785f39236c..7f938fb11405 100644 --- a/airbyte-integrations/connectors/source-salesforce/source_salesforce/source.py +++ b/airbyte-integrations/connectors/source-salesforce/source_salesforce/source.py @@ -148,6 +148,12 @@ def prepare_stream(cls, stream_name: str, json_schema, sobject_options, sf_objec "authenticator": authenticator, "start_date": config.get("start_date"), } + + stream_filters = config.get("stream_filters") + if stream_filters is not None: + for filter in stream_filters: + if filter["stream_name"] == stream_name: + stream_kwargs["stream_filter"] = filter["filter_value"] api_type = cls._get_api_type(stream_name, json_schema, config.get("force_use_bulk_api", False)) full_refresh, incremental = cls._get_stream_type(stream_name, api_type) diff --git a/airbyte-integrations/connectors/source-salesforce/source_salesforce/spec.yaml b/airbyte-integrations/connectors/source-salesforce/source_salesforce/spec.yaml index 1882e4e66c4a..6ec8878024d4 100644 --- a/airbyte-integrations/connectors/source-salesforce/source_salesforce/spec.yaml +++ b/airbyte-integrations/connectors/source-salesforce/source_salesforce/spec.yaml @@ -101,6 +101,21 @@ connectionSpecification: order: 2 title: Filter Salesforce Objects description: Add filters to select only required stream based on `SObject` name. Use this field to filter which tables are displayed by this connector. This is useful if your Salesforce account has a large number of tables (>1000), in which case you may find it easier to navigate the UI and speed up the connector's performance if you restrict the tables displayed by this connector. + stream_filters: + description: Add filters to sync only required records based on `SObject` name and available `SObject` attributes. + type: array + order: 9 + items: + type: object + properties: + stream_name: + type: string + title: Stream Name + order: 1 + filter_value: + type: string + title: Filter expression + order: 2 advanced_auth: auth_flow_type: oauth2.0 predicate_key: diff --git a/airbyte-integrations/connectors/source-salesforce/source_salesforce/streams.py b/airbyte-integrations/connectors/source-salesforce/source_salesforce/streams.py index 7c8fc6ab146e..84fea7f8a29a 100644 --- a/airbyte-integrations/connectors/source-salesforce/source_salesforce/streams.py +++ b/airbyte-integrations/connectors/source-salesforce/source_salesforce/streams.py @@ -64,6 +64,7 @@ def __init__( sobject_options: Mapping[str, Any] = None, schema: dict = None, start_date=None, + stream_filter: str = "", **kwargs, ): super().__init__(**kwargs) @@ -79,6 +80,7 @@ def __init__( session=self._session, # no need to specific api_budget and authenticator as HttpStream sets them in self._session error_handler=SalesforceErrorHandler(stream_name=self.stream_name, sobject_options=self.sobject_options), ) + self.stream_filter = stream_filter @staticmethod def format_start_date(start_date: Optional[str]) -> Optional[str]: @@ -191,12 +193,19 @@ def request_params( property_chunk = property_chunk or {} query = f"SELECT {','.join(property_chunk.keys())} FROM {self.name} " + where_conditions = [] if self.name in PARENT_SALESFORCE_OBJECTS: # add where clause: " WHERE ContentDocumentId IN ('06905000000NMXXXXX', ...)" parent_field = PARENT_SALESFORCE_OBJECTS[self.name]["field"] parent_ids = [f"'{parent_record[parent_field]}'" for parent_record in stream_slice["parents"]] - query += f" WHERE ContentDocumentId IN ({','.join(parent_ids)})" + where_conditions.append(f"ContentDocumentId IN ({','.join(parent_ids)})") + + if self.stream_filter: + where_conditions.append(f"{self.stream_filter}") + + if where_conditions: + query += f" WHERE {' AND '.join(where_conditions)}" if self.primary_key and self.name not in UNSUPPORTED_FILTERING_STREAMS: query += f"ORDER BY {self.primary_key} ASC" @@ -592,12 +601,20 @@ def request_params( query = f"SELECT {select_fields} FROM {self.name}" if next_page_token: query += next_page_token["next_token"] + + where_conditions = [] if self.name in PARENT_SALESFORCE_OBJECTS: # add where clause: " WHERE ContentDocumentId IN ('06905000000NMXXXXX', '06905000000Mxp7XXX', ...)" parent_field = PARENT_SALESFORCE_OBJECTS[self.name]["field"] parent_ids = [f"'{parent_record[parent_field]}'" for parent_record in stream_slice["parents"]] - query += f" WHERE ContentDocumentId IN ({','.join(parent_ids)})" + where_conditions.append(f"ContentDocumentId IN ({','.join(parent_ids)})") + + if self.stream_filter: + where_conditions.append(f"{self.stream_filter}") + + if where_conditions: + query += f" WHERE {' AND '.join(where_conditions)}" return {"q": query} @@ -699,12 +716,14 @@ class IncrementalRestSalesforceStream(RestSalesforceStream, CheckpointMixin, ABC state_checkpoint_interval = 500 _slice = None - def __init__(self, replication_key: str, stream_slice_step: str = "P30D", **kwargs): + def __init__(self, replication_key: str, stream_slice_step: str = "P30D", stream_filter: str = "", **kwargs): super().__init__(**kwargs) self.replication_key = replication_key self._stream_slice_step = stream_slice_step self._stream_slicer_cursor = None self._state = {} + self.stream_filter = stream_filter + def set_cursor(self, cursor: Cursor) -> None: self._stream_slicer_cursor = cursor @@ -755,6 +774,8 @@ def request_params( where_conditions.append(f"{self.cursor_field} >= {start_date}") if end_date: where_conditions.append(f"{self.cursor_field} < {end_date}") + if self.stream_filter: + where_conditions.append(f"{self.stream_filter}") where_clause = f"WHERE {' AND '.join(where_conditions)}" query = f"SELECT {select_fields} FROM {table_name} {where_clause}" @@ -801,6 +822,9 @@ def request_params( table_name = self.name where_conditions = [f"{self.cursor_field} >= {start_date}", f"{self.cursor_field} < {end_date}"] + if self.stream_filter: + where_conditions.append(f"{self.stream_filter}") + where_clause = f"WHERE {' AND '.join(where_conditions)}" query = f"SELECT {select_fields} FROM {table_name} {where_clause}" return {"q": query} From f7ed8477e0fd80a1756b03df5d9b293a1829606a Mon Sep 17 00:00:00 2001 From: Agata Walukiewicz Date: Tue, 25 Jun 2024 19:03:12 +0200 Subject: [PATCH 2/5] bump SF version --- airbyte-integrations/connectors/source-salesforce/metadata.yaml | 2 +- .../connectors/source-salesforce/pyproject.toml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/airbyte-integrations/connectors/source-salesforce/metadata.yaml b/airbyte-integrations/connectors/source-salesforce/metadata.yaml index 43c23651a6f5..0b1244876696 100644 --- a/airbyte-integrations/connectors/source-salesforce/metadata.yaml +++ b/airbyte-integrations/connectors/source-salesforce/metadata.yaml @@ -10,7 +10,7 @@ data: connectorSubtype: api connectorType: source definitionId: b117307c-14b6-41aa-9422-947e34922962 - dockerImageTag: 2.5.16 + dockerImageTag: 2.5.17 dockerRepository: airbyte/source-salesforce documentationUrl: https://docs.airbyte.com/integrations/sources/salesforce githubIssueLabel: source-salesforce diff --git a/airbyte-integrations/connectors/source-salesforce/pyproject.toml b/airbyte-integrations/connectors/source-salesforce/pyproject.toml index f3aec4fd57dc..47270d2c3c0b 100644 --- a/airbyte-integrations/connectors/source-salesforce/pyproject.toml +++ b/airbyte-integrations/connectors/source-salesforce/pyproject.toml @@ -3,7 +3,7 @@ requires = [ "poetry-core>=1.0.0",] build-backend = "poetry.core.masonry.api" [tool.poetry] -version = "2.5.16" +version = "2.5.17" name = "source-salesforce" description = "Source implementation for Salesforce." authors = [ "Airbyte ",] From 974ac76be0ec8583dedc7ce3bc170c5f6d2a86c5 Mon Sep 17 00:00:00 2001 From: Agata Walukiewicz Date: Thu, 27 Jun 2024 17:42:48 +0200 Subject: [PATCH 3/5] add logger --- .../source-salesforce/source_salesforce/streams.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/airbyte-integrations/connectors/source-salesforce/source_salesforce/streams.py b/airbyte-integrations/connectors/source-salesforce/source_salesforce/streams.py index 84fea7f8a29a..3c9988f404a0 100644 --- a/airbyte-integrations/connectors/source-salesforce/source_salesforce/streams.py +++ b/airbyte-integrations/connectors/source-salesforce/source_salesforce/streams.py @@ -210,6 +210,7 @@ def request_params( if self.primary_key and self.name not in UNSUPPORTED_FILTERING_STREAMS: query += f"ORDER BY {self.primary_key} ASC" + self.logger.warning(f"Used SOQL query {query}") return {"q": query} def chunk_properties(self) -> Iterable[Mapping[str, Any]]: @@ -615,7 +616,8 @@ def request_params( if where_conditions: query += f" WHERE {' AND '.join(where_conditions)}" - + + self.logger.warning(f"Used SOQL query {query}") return {"q": query} def read_records( @@ -779,6 +781,7 @@ def request_params( where_clause = f"WHERE {' AND '.join(where_conditions)}" query = f"SELECT {select_fields} FROM {table_name} {where_clause}" + self.logger.warning(f"Used SOQL query {query}") return {"q": query} @@ -827,6 +830,8 @@ def request_params( where_clause = f"WHERE {' AND '.join(where_conditions)}" query = f"SELECT {select_fields} FROM {table_name} {where_clause}" + + self.logger.warning(f"Used SOQL query {query}") return {"q": query} From 2885183f4b2236a084ae3b1f98dfe31872e8a3a2 Mon Sep 17 00:00:00 2001 From: Agata Walukiewicz Date: Wed, 3 Jul 2024 14:17:13 +0200 Subject: [PATCH 4/5] use catalog where possible instead of the default SF schema --- .../integration_tests/configured_catalog.json | 169 ++++++------------ .../source_salesforce/source.py | 6 + .../source_salesforce/streams.py | 5 - 3 files changed, 65 insertions(+), 115 deletions(-) diff --git a/airbyte-integrations/connectors/source-salesforce/integration_tests/configured_catalog.json b/airbyte-integrations/connectors/source-salesforce/integration_tests/configured_catalog.json index 1d82d43b11a1..47f7ff8a6f57 100644 --- a/airbyte-integrations/connectors/source-salesforce/integration_tests/configured_catalog.json +++ b/airbyte-integrations/connectors/source-salesforce/integration_tests/configured_catalog.json @@ -2,119 +2,68 @@ "streams": [ { "stream": { - "name": "Account", - "json_schema": {}, - "supported_sync_modes": ["full_refresh", "incremental"], + "name": "Lead", + "json_schema": { + "$schema": "http://json-schema.org/draft-07/schema#", + "additionalProperties": true, + "properties": { + "Country": { + "type": [ + "string", + "null" + ] + }, + "country_iso_code__c": { + "type": [ + "string", + "null" + ] + }, + "Email": { + "type": [ + "string", + "null" + ] + }, + "Id": { + "type": [ + "string", + "null" + ] + }, + "LastModifiedDate": { + "format": "date-time", + "type": [ + "string", + "null" + ] + }, + "SystemModstamp": { + "format": "date-time", + "type": [ + "string", + "null" + ] + } + }, + "type": "object" + }, + "supported_sync_modes": [ + "full_refresh", + "incremental" + ], "source_defined_cursor": true, - "default_cursor_field": ["SystemModstamp"], - "source_defined_primary_key": [["Id"]] - }, - "sync_mode": "full_refresh", - "destination_sync_mode": "append" - }, - { - "stream": { - "name": "ActiveFeatureLicenseMetric", - "json_schema": {}, - "supported_sync_modes": ["full_refresh", "incremental"], - "source_defined_cursor": true, - "default_cursor_field": ["SystemModstamp"], - "source_defined_primary_key": [["Id"]] - }, - "sync_mode": "full_refresh", - "destination_sync_mode": "append" - }, - { - "stream": { - "name": "ActivePermSetLicenseMetric", - "json_schema": {}, - "supported_sync_modes": ["full_refresh", "incremental"], - "source_defined_cursor": true, - "default_cursor_field": ["SystemModstamp"], - "source_defined_primary_key": [["Id"]] - }, - "sync_mode": "full_refresh", - "destination_sync_mode": "append" - }, - { - "stream": { - "name": "ActiveProfileMetric", - "json_schema": {}, - "supported_sync_modes": ["full_refresh", "incremental"], - "source_defined_cursor": true, - "default_cursor_field": ["SystemModstamp"], - "source_defined_primary_key": [["Id"]] - }, - "sync_mode": "full_refresh", - "destination_sync_mode": "append" - }, - { - "stream": { - "name": "AppDefinition", - "json_schema": {}, - "supported_sync_modes": ["full_refresh"], - "source_defined_primary_key": [["Id"]] - }, - "sync_mode": "full_refresh", - "destination_sync_mode": "overwrite" - }, - { - "stream": { - "name": "Asset", - "json_schema": {}, - "supported_sync_modes": ["full_refresh", "incremental"], - "source_defined_cursor": true, - "default_cursor_field": ["SystemModstamp"], - "source_defined_primary_key": [["Id"]] - }, - "sync_mode": "full_refresh", - "destination_sync_mode": "append" - }, - { - "stream": { - "name": "FormulaFunctionAllowedType", - "json_schema": {}, - "supported_sync_modes": ["full_refresh"], - "source_defined_primary_key": [["Id"]] - }, - "sync_mode": "full_refresh", - "destination_sync_mode": "overwrite" - }, - { - "stream": { - "name": "ObjectPermissions", - "json_schema": {}, - "supported_sync_modes": ["full_refresh", "incremental"], - "source_defined_cursor": true, - "default_cursor_field": ["SystemModstamp"], - "source_defined_primary_key": [["Id"]] - }, - "sync_mode": "full_refresh", - "destination_sync_mode": "append" - }, - { - "stream": { - "name": "PermissionSetTabSetting", - "json_schema": {}, - "supported_sync_modes": ["full_refresh", "incremental"], - "source_defined_cursor": true, - "default_cursor_field": ["SystemModstamp"], - "source_defined_primary_key": [["Id"]] - }, - "sync_mode": "full_refresh", - "destination_sync_mode": "append" - }, - { - "stream": { - "name": "LeadHistory", - "json_schema": {}, - "supported_sync_modes": ["full_refresh", "incremental"], - "source_defined_cursor": true, - "default_cursor_field": ["CreatedDate"], - "source_defined_primary_key": [["Id"]] + "default_cursor_field": [ + "CreatedDate" + ], + "source_defined_primary_key": [ + [ + "Id" + ] + ] }, "sync_mode": "full_refresh", "destination_sync_mode": "append" } ] -} +} \ No newline at end of file diff --git a/airbyte-integrations/connectors/source-salesforce/source_salesforce/source.py b/airbyte-integrations/connectors/source-salesforce/source_salesforce/source.py index 7f938fb11405..cb5387d79f28 100644 --- a/airbyte-integrations/connectors/source-salesforce/source_salesforce/source.py +++ b/airbyte-integrations/connectors/source-salesforce/source_salesforce/source.py @@ -181,6 +181,12 @@ def generate_streams( for stream_name, sobject_options in stream_objects.items(): json_schema = schemas.get(stream_name, {}) + if self.catalog: + for catalog_stream in self.catalog.streams: + if stream_name == catalog_stream.stream.name and catalog_stream.stream.json_schema.get("properties", {}): + json_schema['properties'] = catalog_stream.stream.json_schema.get("properties", {}) + + logger.warning(f"JSON schema used for the stream {stream_name}: {json_schema}") stream_class, kwargs = self.prepare_stream(stream_name, json_schema, sobject_options, *default_args) parent_name = PARENT_SALESFORCE_OBJECTS.get(stream_name, {}).get("parent_name") diff --git a/airbyte-integrations/connectors/source-salesforce/source_salesforce/streams.py b/airbyte-integrations/connectors/source-salesforce/source_salesforce/streams.py index 3c9988f404a0..04569a088a54 100644 --- a/airbyte-integrations/connectors/source-salesforce/source_salesforce/streams.py +++ b/airbyte-integrations/connectors/source-salesforce/source_salesforce/streams.py @@ -210,12 +210,10 @@ def request_params( if self.primary_key and self.name not in UNSUPPORTED_FILTERING_STREAMS: query += f"ORDER BY {self.primary_key} ASC" - self.logger.warning(f"Used SOQL query {query}") return {"q": query} def chunk_properties(self) -> Iterable[Mapping[str, Any]]: selected_properties = self.get_json_schema().get("properties", {}) - def empty_props_with_pk_if_present(): return {self.primary_key: selected_properties[self.primary_key]} if self.primary_key else {} @@ -617,7 +615,6 @@ def request_params( if where_conditions: query += f" WHERE {' AND '.join(where_conditions)}" - self.logger.warning(f"Used SOQL query {query}") return {"q": query} def read_records( @@ -781,7 +778,6 @@ def request_params( where_clause = f"WHERE {' AND '.join(where_conditions)}" query = f"SELECT {select_fields} FROM {table_name} {where_clause}" - self.logger.warning(f"Used SOQL query {query}") return {"q": query} @@ -831,7 +827,6 @@ def request_params( where_clause = f"WHERE {' AND '.join(where_conditions)}" query = f"SELECT {select_fields} FROM {table_name} {where_clause}" - self.logger.warning(f"Used SOQL query {query}") return {"q": query} From 2a6f86711ec4cb8b96e226c3acbdaa1100e65675 Mon Sep 17 00:00:00 2001 From: Agata Walukiewicz Date: Wed, 3 Jul 2024 15:01:32 +0200 Subject: [PATCH 5/5] remove warning log --- .../connectors/source-salesforce/source_salesforce/source.py | 1 - 1 file changed, 1 deletion(-) diff --git a/airbyte-integrations/connectors/source-salesforce/source_salesforce/source.py b/airbyte-integrations/connectors/source-salesforce/source_salesforce/source.py index cb5387d79f28..0d697839172e 100644 --- a/airbyte-integrations/connectors/source-salesforce/source_salesforce/source.py +++ b/airbyte-integrations/connectors/source-salesforce/source_salesforce/source.py @@ -186,7 +186,6 @@ def generate_streams( if stream_name == catalog_stream.stream.name and catalog_stream.stream.json_schema.get("properties", {}): json_schema['properties'] = catalog_stream.stream.json_schema.get("properties", {}) - logger.warning(f"JSON schema used for the stream {stream_name}: {json_schema}") stream_class, kwargs = self.prepare_stream(stream_name, json_schema, sobject_options, *default_args) parent_name = PARENT_SALESFORCE_OBJECTS.get(stream_name, {}).get("parent_name")