diff --git a/airbyte-integrations/connectors/source-salesforce/integration_tests/configured_catalog.json b/airbyte-integrations/connectors/source-salesforce/integration_tests/configured_catalog.json index 1d82d43b11a1..47f7ff8a6f57 100644 --- a/airbyte-integrations/connectors/source-salesforce/integration_tests/configured_catalog.json +++ b/airbyte-integrations/connectors/source-salesforce/integration_tests/configured_catalog.json @@ -2,119 +2,68 @@ "streams": [ { "stream": { - "name": "Account", - "json_schema": {}, - "supported_sync_modes": ["full_refresh", "incremental"], + "name": "Lead", + "json_schema": { + "$schema": "http://json-schema.org/draft-07/schema#", + "additionalProperties": true, + "properties": { + "Country": { + "type": [ + "string", + "null" + ] + }, + "country_iso_code__c": { + "type": [ + "string", + "null" + ] + }, + "Email": { + "type": [ + "string", + "null" + ] + }, + "Id": { + "type": [ + "string", + "null" + ] + }, + "LastModifiedDate": { + "format": "date-time", + "type": [ + "string", + "null" + ] + }, + "SystemModstamp": { + "format": "date-time", + "type": [ + "string", + "null" + ] + } + }, + "type": "object" + }, + "supported_sync_modes": [ + "full_refresh", + "incremental" + ], "source_defined_cursor": true, - "default_cursor_field": ["SystemModstamp"], - "source_defined_primary_key": [["Id"]] - }, - "sync_mode": "full_refresh", - "destination_sync_mode": "append" - }, - { - "stream": { - "name": "ActiveFeatureLicenseMetric", - "json_schema": {}, - "supported_sync_modes": ["full_refresh", "incremental"], - "source_defined_cursor": true, - "default_cursor_field": ["SystemModstamp"], - "source_defined_primary_key": [["Id"]] - }, - "sync_mode": "full_refresh", - "destination_sync_mode": "append" - }, - { - "stream": { - "name": "ActivePermSetLicenseMetric", - "json_schema": {}, - "supported_sync_modes": ["full_refresh", "incremental"], - "source_defined_cursor": true, - "default_cursor_field": ["SystemModstamp"], - "source_defined_primary_key": [["Id"]] - }, - "sync_mode": "full_refresh", - "destination_sync_mode": "append" - }, - { - "stream": { - "name": "ActiveProfileMetric", - "json_schema": {}, - "supported_sync_modes": ["full_refresh", "incremental"], - "source_defined_cursor": true, - "default_cursor_field": ["SystemModstamp"], - "source_defined_primary_key": [["Id"]] - }, - "sync_mode": "full_refresh", - "destination_sync_mode": "append" - }, - { - "stream": { - "name": "AppDefinition", - "json_schema": {}, - "supported_sync_modes": ["full_refresh"], - "source_defined_primary_key": [["Id"]] - }, - "sync_mode": "full_refresh", - "destination_sync_mode": "overwrite" - }, - { - "stream": { - "name": "Asset", - "json_schema": {}, - "supported_sync_modes": ["full_refresh", "incremental"], - "source_defined_cursor": true, - "default_cursor_field": ["SystemModstamp"], - "source_defined_primary_key": [["Id"]] - }, - "sync_mode": "full_refresh", - "destination_sync_mode": "append" - }, - { - "stream": { - "name": "FormulaFunctionAllowedType", - "json_schema": {}, - "supported_sync_modes": ["full_refresh"], - "source_defined_primary_key": [["Id"]] - }, - "sync_mode": "full_refresh", - "destination_sync_mode": "overwrite" - }, - { - "stream": { - "name": "ObjectPermissions", - "json_schema": {}, - "supported_sync_modes": ["full_refresh", "incremental"], - "source_defined_cursor": true, - "default_cursor_field": ["SystemModstamp"], - "source_defined_primary_key": [["Id"]] - }, - "sync_mode": "full_refresh", - "destination_sync_mode": "append" - }, - { - "stream": { - "name": "PermissionSetTabSetting", - "json_schema": {}, - "supported_sync_modes": ["full_refresh", "incremental"], - "source_defined_cursor": true, - "default_cursor_field": ["SystemModstamp"], - "source_defined_primary_key": [["Id"]] - }, - "sync_mode": "full_refresh", - "destination_sync_mode": "append" - }, - { - "stream": { - "name": "LeadHistory", - "json_schema": {}, - "supported_sync_modes": ["full_refresh", "incremental"], - "source_defined_cursor": true, - "default_cursor_field": ["CreatedDate"], - "source_defined_primary_key": [["Id"]] + "default_cursor_field": [ + "CreatedDate" + ], + "source_defined_primary_key": [ + [ + "Id" + ] + ] }, "sync_mode": "full_refresh", "destination_sync_mode": "append" } ] -} +} \ No newline at end of file diff --git a/airbyte-integrations/connectors/source-salesforce/source_salesforce/source.py b/airbyte-integrations/connectors/source-salesforce/source_salesforce/source.py index e44adb3401b6..e98757f211ff 100644 --- a/airbyte-integrations/connectors/source-salesforce/source_salesforce/source.py +++ b/airbyte-integrations/connectors/source-salesforce/source_salesforce/source.py @@ -147,6 +147,12 @@ def prepare_stream(cls, stream_name: str, json_schema, sobject_options, sf_objec "authenticator": authenticator, "start_date": config.get("start_date"), } + + stream_filters = config.get("stream_filters") + if stream_filters is not None: + for filter in stream_filters: + if filter["stream_name"] == stream_name: + stream_kwargs["stream_filter"] = filter["filter_value"] api_type = cls._get_api_type(stream_name, json_schema, config.get("force_use_bulk_api", False)) full_refresh, incremental = cls._get_stream_type(stream_name, api_type) @@ -174,6 +180,11 @@ def generate_streams( for stream_name, sobject_options in stream_objects.items(): json_schema = schemas.get(stream_name, {}) + if self.catalog: + for catalog_stream in self.catalog.streams: + if stream_name == catalog_stream.stream.name and catalog_stream.stream.json_schema.get("properties", {}): + json_schema['properties'] = catalog_stream.stream.json_schema.get("properties", {}) + stream_class, kwargs = self.prepare_stream(stream_name, json_schema, sobject_options, *default_args) parent_name = PARENT_SALESFORCE_OBJECTS.get(stream_name, {}).get("parent_name") diff --git a/airbyte-integrations/connectors/source-salesforce/source_salesforce/spec.yaml b/airbyte-integrations/connectors/source-salesforce/source_salesforce/spec.yaml index 1882e4e66c4a..6ec8878024d4 100644 --- a/airbyte-integrations/connectors/source-salesforce/source_salesforce/spec.yaml +++ b/airbyte-integrations/connectors/source-salesforce/source_salesforce/spec.yaml @@ -101,6 +101,21 @@ connectionSpecification: order: 2 title: Filter Salesforce Objects description: Add filters to select only required stream based on `SObject` name. Use this field to filter which tables are displayed by this connector. This is useful if your Salesforce account has a large number of tables (>1000), in which case you may find it easier to navigate the UI and speed up the connector's performance if you restrict the tables displayed by this connector. + stream_filters: + description: Add filters to sync only required records based on `SObject` name and available `SObject` attributes. + type: array + order: 9 + items: + type: object + properties: + stream_name: + type: string + title: Stream Name + order: 1 + filter_value: + type: string + title: Filter expression + order: 2 advanced_auth: auth_flow_type: oauth2.0 predicate_key: diff --git a/airbyte-integrations/connectors/source-salesforce/source_salesforce/streams.py b/airbyte-integrations/connectors/source-salesforce/source_salesforce/streams.py index cbef66d52826..f7ae56a7197f 100644 --- a/airbyte-integrations/connectors/source-salesforce/source_salesforce/streams.py +++ b/airbyte-integrations/connectors/source-salesforce/source_salesforce/streams.py @@ -63,6 +63,7 @@ def __init__( sobject_options: Mapping[str, Any] = None, schema: dict = None, start_date=None, + stream_filter: str = "", **kwargs, ): super().__init__(**kwargs) @@ -78,6 +79,7 @@ def __init__( session=self._session, # no need to specific api_budget and authenticator as HttpStream sets them in self._session error_handler=SalesforceErrorHandler(stream_name=self.stream_name, sobject_options=self.sobject_options), ) + self.stream_filter = stream_filter @staticmethod def format_start_date(start_date: Optional[str]) -> Optional[str]: @@ -190,12 +192,19 @@ def request_params( property_chunk = property_chunk or {} query = f"SELECT {','.join(property_chunk.keys())} FROM {self.name} " + where_conditions = [] if self.name in PARENT_SALESFORCE_OBJECTS: # add where clause: " WHERE ContentDocumentId IN ('06905000000NMXXXXX', ...)" parent_field = PARENT_SALESFORCE_OBJECTS[self.name]["field"] parent_ids = [f"'{parent_record[parent_field]}'" for parent_record in stream_slice["parents"]] - query += f" WHERE ContentDocumentId IN ({','.join(parent_ids)})" + where_conditions.append(f"ContentDocumentId IN ({','.join(parent_ids)})") + + if self.stream_filter: + where_conditions.append(f"{self.stream_filter}") + + if where_conditions: + query += f" WHERE {' AND '.join(where_conditions)}" if self.primary_key and self.name not in UNSUPPORTED_FILTERING_STREAMS: query += f"ORDER BY {self.primary_key} ASC" @@ -204,7 +213,6 @@ def request_params( def chunk_properties(self) -> Iterable[Mapping[str, Any]]: selected_properties = self.get_json_schema().get("properties", {}) - def empty_props_with_pk_if_present(): return {self.primary_key: selected_properties[self.primary_key]} if self.primary_key else {} @@ -589,13 +597,21 @@ def request_params( query = f"SELECT {select_fields} FROM {self.name}" if next_page_token: query += next_page_token["next_token"] + + where_conditions = [] if self.name in PARENT_SALESFORCE_OBJECTS: # add where clause: " WHERE ContentDocumentId IN ('06905000000NMXXXXX', '06905000000Mxp7XXX', ...)" parent_field = PARENT_SALESFORCE_OBJECTS[self.name]["field"] parent_ids = [f"'{parent_record[parent_field]}'" for parent_record in stream_slice["parents"]] - query += f" WHERE ContentDocumentId IN ({','.join(parent_ids)})" - + where_conditions.append(f"ContentDocumentId IN ({','.join(parent_ids)})") + + if self.stream_filter: + where_conditions.append(f"{self.stream_filter}") + + if where_conditions: + query += f" WHERE {' AND '.join(where_conditions)}" + return {"q": query} def read_records( @@ -696,12 +712,14 @@ class IncrementalRestSalesforceStream(RestSalesforceStream, CheckpointMixin, ABC state_checkpoint_interval = 500 _slice = None - def __init__(self, replication_key: str, stream_slice_step: str = "P30D", **kwargs): + def __init__(self, replication_key: str, stream_slice_step: str = "P30D", stream_filter: str = "", **kwargs): super().__init__(**kwargs) self.replication_key = replication_key self._stream_slice_step = stream_slice_step self._stream_slicer_cursor = None self._state = {} + self.stream_filter = stream_filter + def set_cursor(self, cursor: Cursor) -> None: self._stream_slicer_cursor = cursor @@ -752,6 +770,8 @@ def request_params( where_conditions.append(f"{self.cursor_field} >= {start_date}") if end_date: where_conditions.append(f"{self.cursor_field} < {end_date}") + if self.stream_filter: + where_conditions.append(f"{self.stream_filter}") where_clause = f"WHERE {' AND '.join(where_conditions)}" query = f"SELECT {select_fields} FROM {table_name} {where_clause}" @@ -798,8 +818,12 @@ def request_params( table_name = self.name where_conditions = [f"{self.cursor_field} >= {start_date}", f"{self.cursor_field} < {end_date}"] + if self.stream_filter: + where_conditions.append(f"{self.stream_filter}") + where_clause = f"WHERE {' AND '.join(where_conditions)}" query = f"SELECT {select_fields} FROM {table_name} {where_clause}" + return {"q": query}