diff --git a/airbyte_cdk/connector_builder/connector_builder_handler.py b/airbyte_cdk/connector_builder/connector_builder_handler.py index bb6b0929a..513546737 100644 --- a/airbyte_cdk/connector_builder/connector_builder_handler.py +++ b/airbyte_cdk/connector_builder/connector_builder_handler.py @@ -108,7 +108,12 @@ def read_stream( stream_name = configured_catalog.streams[0].stream.name stream_read = test_read_handler.run_test_read( - source, config, configured_catalog, state, limits.max_records + source, + config, + configured_catalog, + stream_name, + state, + limits.max_records, ) return AirbyteMessage( diff --git a/airbyte_cdk/connector_builder/test_reader/helpers.py b/airbyte_cdk/connector_builder/test_reader/helpers.py index fcd36189f..9154610cc 100644 --- a/airbyte_cdk/connector_builder/test_reader/helpers.py +++ b/airbyte_cdk/connector_builder/test_reader/helpers.py @@ -269,6 +269,37 @@ def should_close_page_for_slice(at_least_one_page_in_group: bool, message: Airby return at_least_one_page_in_group and should_process_slice_descriptor(message) +def is_page_http_request_for_different_stream( + json_message: Optional[Dict[str, Any]], stream_name: str +) -> bool: + """ + Determines whether a given JSON message represents a page HTTP request for a different stream. + + This function checks if the provided JSON message is a page HTTP request, and if the stream name in the log is + different from the provided stream name. + + This is needed because dynamic streams result in extra page HTTP requests for the dynamic streams that we want to ignore + when they do not match the stream that is being read. + + Args: + json_message (Optional[Dict[str, Any]]): The JSON message to evaluate. + stream_name (str): The name of the stream to compare against. + + Returns: + bool: True if the JSON message is a page HTTP request for a different stream, False otherwise. + """ + if not json_message or not is_page_http_request(json_message): + return False + + message_stream_name: str | None = ( + json_message.get("airbyte_cdk", {}).get("stream", {}).get("name", None) + ) + if message_stream_name is None: + return False + + return message_stream_name != stream_name + + def is_page_http_request(json_message: Optional[Dict[str, Any]]) -> bool: """ Determines whether a given JSON message represents a page HTTP request. diff --git a/airbyte_cdk/connector_builder/test_reader/message_grouper.py b/airbyte_cdk/connector_builder/test_reader/message_grouper.py index e4478a0ad..33b594451 100644 --- a/airbyte_cdk/connector_builder/test_reader/message_grouper.py +++ b/airbyte_cdk/connector_builder/test_reader/message_grouper.py @@ -28,6 +28,7 @@ is_async_auxiliary_request, is_config_update_message, is_log_message, + is_page_http_request_for_different_stream, is_record_message, is_state_message, is_trace_with_error, @@ -44,6 +45,7 @@ def get_message_groups( schema_inferrer: SchemaInferrer, datetime_format_inferrer: DatetimeFormatInferrer, limit: int, + stream_name: str, ) -> MESSAGE_GROUPS: """ Processes an iterator of AirbyteMessage objects to group and yield messages based on their type and sequence. @@ -96,6 +98,9 @@ def get_message_groups( while records_count < limit and (message := next(messages, None)): json_message = airbyte_message_to_json(message) + if is_page_http_request_for_different_stream(json_message, stream_name): + continue + if should_close_page(at_least_one_page_in_group, message, json_message): current_page_request, current_page_response = handle_current_page( current_page_request, diff --git a/airbyte_cdk/connector_builder/test_reader/reader.py b/airbyte_cdk/connector_builder/test_reader/reader.py index ea6e960c2..5c16798a2 100644 --- a/airbyte_cdk/connector_builder/test_reader/reader.py +++ b/airbyte_cdk/connector_builder/test_reader/reader.py @@ -86,6 +86,7 @@ def run_test_read( source: DeclarativeSource, config: Mapping[str, Any], configured_catalog: ConfiguredAirbyteCatalog, + stream_name: str, state: List[AirbyteStateMessage], record_limit: Optional[int] = None, ) -> StreamRead: @@ -112,14 +113,17 @@ def run_test_read( record_limit = self._check_record_limit(record_limit) # The connector builder currently only supports reading from a single stream at a time - stream = source.streams(config)[0] + streams = source.streams(config) + stream = next((stream for stream in streams if stream.name == stream_name), None) # get any deprecation warnings during the component creation deprecation_warnings: List[LogMessage] = source.deprecation_warnings() schema_inferrer = SchemaInferrer( - self._pk_to_nested_and_composite_field(stream.primary_key), - self._cursor_field_to_nested_and_composite_field(stream.cursor_field), + self._pk_to_nested_and_composite_field(stream.primary_key) if stream else None, + self._cursor_field_to_nested_and_composite_field(stream.cursor_field) + if stream + else None, ) datetime_format_inferrer = DatetimeFormatInferrer() @@ -128,6 +132,7 @@ def run_test_read( schema_inferrer, datetime_format_inferrer, record_limit, + stream_name, ) slices, log_messages, auxiliary_requests, latest_config_update = self._categorise_groups( diff --git a/airbyte_cdk/sources/declarative/manifest_declarative_source.py b/airbyte_cdk/sources/declarative/manifest_declarative_source.py index 7452d5c68..e962f3813 100644 --- a/airbyte_cdk/sources/declarative/manifest_declarative_source.py +++ b/airbyte_cdk/sources/declarative/manifest_declarative_source.py @@ -542,11 +542,19 @@ def _dynamic_stream_configs( components_resolver_config["retriever"]["requester"]["use_cache"] = True # Create a resolver for dynamic components based on type - components_resolver = self._constructor.create_component( - COMPONENTS_RESOLVER_TYPE_MAPPING[resolver_type], - components_resolver_config, - config, - ) + if resolver_type == "HttpComponentsResolver": + components_resolver = self._constructor.create_component( + model_type=COMPONENTS_RESOLVER_TYPE_MAPPING[resolver_type], + component_definition=components_resolver_config, + config=config, + stream_name=dynamic_definition.get("name"), + ) + else: + components_resolver = self._constructor.create_component( + model_type=COMPONENTS_RESOLVER_TYPE_MAPPING[resolver_type], + component_definition=components_resolver_config, + config=config, + ) stream_template_config = dynamic_definition["stream_template"] diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index c4b9d18f6..628bea575 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -3493,10 +3493,11 @@ def _get_download_retriever() -> SimpleRetriever: requester=download_requester, record_selector=record_selector, primary_key=None, - name=job_download_components_name, + name=name, paginator=paginator, config=config, parameters={}, + log_formatter=self._get_log_formatter(None, name), ) def _get_job_timeout() -> datetime.timedelta: @@ -3805,7 +3806,7 @@ def create_components_mapping_definition( ) def create_http_components_resolver( - self, model: HttpComponentsResolverModel, config: Config + self, model: HttpComponentsResolverModel, config: Config, stream_name: Optional[str] = None ) -> Any: stream_slicer = self._build_stream_slicer_from_partition_router(model.retriever, config) combined_slicers = self._build_resumable_cursor(model.retriever, stream_slicer) @@ -3813,7 +3814,7 @@ def create_http_components_resolver( retriever = self._create_component_from_model( model=model.retriever, config=config, - name="", + name=f"{stream_name if stream_name else '__http_components_resolver'}", primary_key=None, stream_slicer=stream_slicer if stream_slicer else combined_slicers, transformations=[], @@ -3890,7 +3891,9 @@ def create_config_components_resolver( ) def create_parametrized_components_resolver( - self, model: ParametrizedComponentsResolverModel, config: Config + self, + model: ParametrizedComponentsResolverModel, + config: Config, ) -> ParametrizedComponentsResolver: stream_parameters = StreamParametersDefinition( list_of_parameters_for_stream=model.stream_parameters.list_of_parameters_for_stream diff --git a/unit_tests/connector_builder/test_connector_builder_handler.py b/unit_tests/connector_builder/test_connector_builder_handler.py index cd0f8f9b1..2587fb95a 100644 --- a/unit_tests/connector_builder/test_connector_builder_handler.py +++ b/unit_tests/connector_builder/test_connector_builder_handler.py @@ -752,6 +752,7 @@ def test_read(): source, config, ConfiguredAirbyteCatalogSerializer.load(CONFIGURED_CATALOG), + _stream_name, _A_STATE, limits.max_records, ) @@ -812,6 +813,10 @@ def primary_key(self): def cursor_field(self): return [] + @property + def name(self): + return _stream_name + class MockManifestDeclarativeSource: def streams(self, config): return [MockDeclarativeStream()] diff --git a/unit_tests/connector_builder/test_message_grouper.py b/unit_tests/connector_builder/test_message_grouper.py index c40514a27..6c4f11526 100644 --- a/unit_tests/connector_builder/test_message_grouper.py +++ b/unit_tests/connector_builder/test_message_grouper.py @@ -147,6 +147,7 @@ @patch("airbyte_cdk.connector_builder.test_reader.reader.AirbyteEntrypoint.read") def test_get_grouped_messages(mock_entrypoint_read: Mock) -> None: + stream_name = "hashiras" url = "https://demonslayers.com/api/v1/hashiras?era=taisho" request = { "headers": {"Content-Type": "application/json"}, @@ -194,11 +195,11 @@ def test_get_grouped_messages(mock_entrypoint_read: Mock) -> None: mock_entrypoint_read, iter( [ - request_response_log_message(request, response, url), - record_message("hashiras", {"name": "Shinobu Kocho", "date": "2023-03-03"}), - record_message("hashiras", {"name": "Muichiro Tokito", "date": "2023-03-04"}), - request_response_log_message(request, response, url), - record_message("hashiras", {"name": "Mitsuri Kanroji", "date": "2023-03-05"}), + request_response_log_message(request, response, url, stream_name), + record_message(stream_name, {"name": "Shinobu Kocho", "date": "2023-03-03"}), + record_message(stream_name, {"name": "Muichiro Tokito", "date": "2023-03-04"}), + request_response_log_message(request, response, url, stream_name), + record_message(stream_name, {"name": "Mitsuri Kanroji", "date": "2023-03-05"}), ] ), ) @@ -207,7 +208,8 @@ def test_get_grouped_messages(mock_entrypoint_read: Mock) -> None: actual_response: StreamRead = connector_builder_handler.run_test_read( source=mock_source, config=CONFIG, - configured_catalog=create_configured_catalog("hashiras"), + configured_catalog=create_configured_catalog(stream_name), + stream_name=stream_name, state=_NO_STATE, ) @@ -221,6 +223,7 @@ def test_get_grouped_messages(mock_entrypoint_read: Mock) -> None: @patch("airbyte_cdk.connector_builder.test_reader.reader.AirbyteEntrypoint.read") def test_get_grouped_messages_with_logs(mock_entrypoint_read: Mock) -> None: + stream_name = "hashiras" url = "https://demonslayers.com/api/v1/hashiras?era=taisho" request = { "headers": {"Content-Type": "application/json"}, @@ -270,13 +273,13 @@ def test_get_grouped_messages_with_logs(mock_entrypoint_read: Mock) -> None: level=Level.INFO, message="log message before the request" ), ), - request_response_log_message(request, response, url), - record_message("hashiras", {"name": "Shinobu Kocho"}), + request_response_log_message(request, response, url, stream_name), + record_message(stream_name, {"name": "Shinobu Kocho"}), AirbyteMessage( type=MessageType.LOG, log=AirbyteLogMessage(level=Level.INFO, message="log message during the page"), ), - record_message("hashiras", {"name": "Muichiro Tokito"}), + record_message(stream_name, {"name": "Muichiro Tokito"}), AirbyteMessage( type=MessageType.LOG, log=AirbyteLogMessage( @@ -292,7 +295,8 @@ def test_get_grouped_messages_with_logs(mock_entrypoint_read: Mock) -> None: actual_response: StreamRead = connector_builder_handler.run_test_read( source=mock_source, config=CONFIG, - configured_catalog=create_configured_catalog("hashiras"), + configured_catalog=create_configured_catalog(stream_name), + stream_name=stream_name, state=_NO_STATE, ) single_slice = actual_response.slices[0] @@ -314,6 +318,7 @@ def test_get_grouped_messages_with_logs(mock_entrypoint_read: Mock) -> None: def test_get_grouped_messages_record_limit( mock_entrypoint_read: Mock, request_record_limit: int, max_record_limit: int, should_fail: bool ) -> None: + stream_name = "hashiras" url = "https://demonslayers.com/api/v1/hashiras?era=taisho" request = { "headers": {"Content-Type": "application/json"}, @@ -329,11 +334,11 @@ def test_get_grouped_messages_record_limit( mock_entrypoint_read, iter( [ - request_response_log_message(request, response, url), - record_message("hashiras", {"name": "Shinobu Kocho"}), - record_message("hashiras", {"name": "Muichiro Tokito"}), - request_response_log_message(request, response, url), - record_message("hashiras", {"name": "Mitsuri Kanroji"}), + request_response_log_message(request, response, url, stream_name), + record_message(stream_name, {"name": "Shinobu Kocho"}), + record_message(stream_name, {"name": "Muichiro Tokito"}), + request_response_log_message(request, response, url, stream_name), + record_message(stream_name, {"name": "Mitsuri Kanroji"}), ] ), ) @@ -347,7 +352,8 @@ def test_get_grouped_messages_record_limit( api.run_test_read( mock_source, config=CONFIG, - configured_catalog=create_configured_catalog("hashiras"), + configured_catalog=create_configured_catalog(stream_name), + stream_name=stream_name, state=_NO_STATE, record_limit=request_record_limit, ) @@ -355,7 +361,8 @@ def test_get_grouped_messages_record_limit( actual_response: StreamRead = api.run_test_read( mock_source, config=CONFIG, - configured_catalog=create_configured_catalog("hashiras"), + configured_catalog=create_configured_catalog(stream_name), + stream_name=stream_name, state=_NO_STATE, record_limit=request_record_limit, ) @@ -379,6 +386,7 @@ def test_get_grouped_messages_record_limit( def test_get_grouped_messages_default_record_limit( mock_entrypoint_read: Mock, max_record_limit: int ) -> None: + stream_name = "hashiras" url = "https://demonslayers.com/api/v1/hashiras?era=taisho" request = { "headers": {"Content-Type": "application/json"}, @@ -394,11 +402,11 @@ def test_get_grouped_messages_default_record_limit( mock_entrypoint_read, iter( [ - request_response_log_message(request, response, url), - record_message("hashiras", {"name": "Shinobu Kocho"}), - record_message("hashiras", {"name": "Muichiro Tokito"}), - request_response_log_message(request, response, url), - record_message("hashiras", {"name": "Mitsuri Kanroji"}), + request_response_log_message(request, response, url, stream_name), + record_message(stream_name, {"name": "Shinobu Kocho"}), + record_message(stream_name, {"name": "Muichiro Tokito"}), + request_response_log_message(request, response, url, stream_name), + record_message(stream_name, {"name": "Mitsuri Kanroji"}), ] ), ) @@ -408,7 +416,8 @@ def test_get_grouped_messages_default_record_limit( actual_response: StreamRead = api.run_test_read( source=mock_source, config=CONFIG, - configured_catalog=create_configured_catalog("hashiras"), + configured_catalog=create_configured_catalog(stream_name), + stream_name=stream_name, state=_NO_STATE, ) single_slice = actual_response.slices[0] @@ -420,6 +429,7 @@ def test_get_grouped_messages_default_record_limit( @patch("airbyte_cdk.connector_builder.test_reader.reader.AirbyteEntrypoint.read") def test_get_grouped_messages_limit_0(mock_entrypoint_read: Mock) -> None: + stream_name = "hashiras" url = "https://demonslayers.com/api/v1/hashiras?era=taisho" request = { "headers": {"Content-Type": "application/json"}, @@ -435,11 +445,11 @@ def test_get_grouped_messages_limit_0(mock_entrypoint_read: Mock) -> None: mock_entrypoint_read, iter( [ - request_response_log_message(request, response, url), - record_message("hashiras", {"name": "Shinobu Kocho"}), - record_message("hashiras", {"name": "Muichiro Tokito"}), - request_response_log_message(request, response, url), - record_message("hashiras", {"name": "Mitsuri Kanroji"}), + request_response_log_message(request, response, url, stream_name), + record_message(stream_name, {"name": "Shinobu Kocho"}), + record_message(stream_name, {"name": "Muichiro Tokito"}), + request_response_log_message(request, response, url, stream_name), + record_message(stream_name, {"name": "Mitsuri Kanroji"}), ] ), ) @@ -449,7 +459,8 @@ def test_get_grouped_messages_limit_0(mock_entrypoint_read: Mock) -> None: api.run_test_read( source=mock_source, config=CONFIG, - configured_catalog=create_configured_catalog("hashiras"), + configured_catalog=create_configured_catalog(stream_name), + stream_name=stream_name, state=_NO_STATE, record_limit=0, ) @@ -457,6 +468,7 @@ def test_get_grouped_messages_limit_0(mock_entrypoint_read: Mock) -> None: @patch("airbyte_cdk.connector_builder.test_reader.reader.AirbyteEntrypoint.read") def test_get_grouped_messages_no_records(mock_entrypoint_read: Mock) -> None: + stream_name = "hashiras" url = "https://demonslayers.com/api/v1/hashiras?era=taisho" request = { "headers": {"Content-Type": "application/json"}, @@ -495,8 +507,8 @@ def test_get_grouped_messages_no_records(mock_entrypoint_read: Mock) -> None: mock_entrypoint_read, iter( [ - request_response_log_message(request, response, url), - request_response_log_message(request, response, url), + request_response_log_message(request, response, url, stream_name), + request_response_log_message(request, response, url, stream_name), ] ), ) @@ -506,7 +518,8 @@ def test_get_grouped_messages_no_records(mock_entrypoint_read: Mock) -> None: actual_response: StreamRead = message_grouper.run_test_read( source=mock_source, config=CONFIG, - configured_catalog=create_configured_catalog("hashiras"), + configured_catalog=create_configured_catalog(stream_name), + stream_name=stream_name, state=_NO_STATE, ) @@ -591,6 +604,7 @@ def test_create_response_from_log_message( @patch("airbyte_cdk.connector_builder.test_reader.reader.AirbyteEntrypoint.read") def test_get_grouped_messages_with_many_slices(mock_entrypoint_read: Mock) -> None: + stream_name = "hashiras" url = "http://a-url.com" request: Mapping[str, Any] = {} response = {"status_code": 200} @@ -600,16 +614,16 @@ def test_get_grouped_messages_with_many_slices(mock_entrypoint_read: Mock) -> No iter( [ slice_message('{"descriptor": "first_slice"}'), - request_response_log_message(request, response, url), - record_message("hashiras", {"name": "Muichiro Tokito"}), + request_response_log_message(request, response, url, stream_name), + record_message(stream_name, {"name": "Muichiro Tokito"}), slice_message('{"descriptor": "second_slice"}'), - request_response_log_message(request, response, url), - record_message("hashiras", {"name": "Shinobu Kocho"}), - record_message("hashiras", {"name": "Mitsuri Kanroji"}), - request_response_log_message(request, response, url), - record_message("hashiras", {"name": "Obanai Iguro"}), - request_response_log_message(request, response, url), - state_message("hashiras", {"a_timestamp": 123}), + request_response_log_message(request, response, url, stream_name), + record_message(stream_name, {"name": "Shinobu Kocho"}), + record_message(stream_name, {"name": "Mitsuri Kanroji"}), + request_response_log_message(request, response, url, stream_name), + record_message(stream_name, {"name": "Obanai Iguro"}), + request_response_log_message(request, response, url, stream_name), + state_message(stream_name, {"a_timestamp": 123}), ] ), ) @@ -619,7 +633,8 @@ def test_get_grouped_messages_with_many_slices(mock_entrypoint_read: Mock) -> No stream_read: StreamRead = connector_builder_handler.run_test_read( source=mock_source, config=CONFIG, - configured_catalog=create_configured_catalog("hashiras"), + configured_catalog=create_configured_catalog(stream_name), + stream_name=stream_name, state=_NO_STATE, ) @@ -647,13 +662,14 @@ def test_get_grouped_messages_with_many_slices(mock_entrypoint_read: Mock) -> No def test_get_grouped_messages_given_maximum_number_of_slices_then_test_read_limit_reached( mock_entrypoint_read: Mock, ) -> None: + stream_name = "hashiras" maximum_number_of_slices = 5 request: Mapping[str, Any] = {} response = {"status_code": 200} mock_source = make_mock_source( mock_entrypoint_read, iter( - [slice_message(), request_response_log_message(request, response, "a_url")] + [slice_message(), request_response_log_message(request, response, "a_url", stream_name)] * maximum_number_of_slices ), ) @@ -663,7 +679,8 @@ def test_get_grouped_messages_given_maximum_number_of_slices_then_test_read_limi stream_read: StreamRead = api.run_test_read( source=mock_source, config=CONFIG, - configured_catalog=create_configured_catalog("hashiras"), + configured_catalog=create_configured_catalog(stream_name), + stream_name=stream_name, state=_NO_STATE, ) @@ -674,6 +691,7 @@ def test_get_grouped_messages_given_maximum_number_of_slices_then_test_read_limi def test_get_grouped_messages_given_maximum_number_of_pages_then_test_read_limit_reached( mock_entrypoint_read: Mock, ) -> None: + stream_name = "hashiras" maximum_number_of_pages_per_slice = 5 request: Mapping[str, Any] = {} response = {"status_code": 200} @@ -681,7 +699,7 @@ def test_get_grouped_messages_given_maximum_number_of_pages_then_test_read_limit mock_entrypoint_read, iter( [slice_message()] - + [request_response_log_message(request, response, "a_url")] + + [request_response_log_message(request, response, "a_url", stream_name)] * maximum_number_of_pages_per_slice ), ) @@ -691,7 +709,8 @@ def test_get_grouped_messages_given_maximum_number_of_pages_then_test_read_limit stream_read: StreamRead = api.run_test_read( source=mock_source, config=CONFIG, - configured_catalog=create_configured_catalog("hashiras"), + configured_catalog=create_configured_catalog(stream_name), + stream_name=stream_name, state=_NO_STATE, ) @@ -710,6 +729,7 @@ def test_read_stream_returns_error_if_stream_does_not_exist() -> None: source=mock_source, config=full_config, configured_catalog=create_configured_catalog("not_in_manifest"), + stream_name="not_in_manifest", state=_NO_STATE, ) @@ -722,6 +742,7 @@ def test_read_stream_returns_error_if_stream_does_not_exist() -> None: def test_given_control_message_then_stream_read_has_config_update( mock_entrypoint_read: Mock, ) -> None: + stream_name = "hashiras" updated_config = {"x": 1} mock_source = make_mock_source( mock_entrypoint_read, @@ -734,7 +755,8 @@ def test_given_control_message_then_stream_read_has_config_update( stream_read: StreamRead = connector_builder_handler.run_test_read( source=mock_source, config=CONFIG, - configured_catalog=create_configured_catalog("hashiras"), + configured_catalog=create_configured_catalog(stream_name), + stream_name=stream_name, state=_NO_STATE, ) @@ -745,6 +767,7 @@ def test_given_control_message_then_stream_read_has_config_update( def test_given_multiple_control_messages_then_stream_read_has_latest_based_on_emitted_at( mock_entrypoint_read: Mock, ) -> None: + stream_name = "hashiras" earliest = 0 earliest_config = {"earliest": 0} latest = 1 @@ -764,7 +787,8 @@ def test_given_multiple_control_messages_then_stream_read_has_latest_based_on_em stream_read: StreamRead = connector_builder_handler.run_test_read( source=mock_source, config=CONFIG, - configured_catalog=create_configured_catalog("hashiras"), + configured_catalog=create_configured_catalog(stream_name), + stream_name=stream_name, state=_NO_STATE, ) @@ -775,6 +799,7 @@ def test_given_multiple_control_messages_then_stream_read_has_latest_based_on_em def test_given_multiple_control_messages_with_same_timestamp_then_stream_read_has_latest_based_on_message_order( mock_entrypoint_read: Mock, ) -> None: + stream_name = "hashiras" emitted_at = 0 earliest_config = {"earliest": 0} latest_config = {"latest": 1} @@ -792,7 +817,8 @@ def test_given_multiple_control_messages_with_same_timestamp_then_stream_read_ha stream_read: StreamRead = connector_builder_handler.run_test_read( source=mock_source, config=CONFIG, - configured_catalog=create_configured_catalog("hashiras"), + configured_catalog=create_configured_catalog(stream_name), + stream_name=stream_name, state=_NO_STATE, ) @@ -801,6 +827,7 @@ def test_given_multiple_control_messages_with_same_timestamp_then_stream_read_ha @patch("airbyte_cdk.connector_builder.test_reader.reader.AirbyteEntrypoint.read") def test_given_auxiliary_requests_then_return_auxiliary_request(mock_entrypoint_read: Mock) -> None: + stream_name = "hashiras" mock_source = make_mock_source( mock_entrypoint_read, iter(any_request_and_response_with_a_record() + [auxiliary_request_log_message()]), @@ -809,7 +836,8 @@ def test_given_auxiliary_requests_then_return_auxiliary_request(mock_entrypoint_ stream_read: StreamRead = connector_builder_handler.run_test_read( source=mock_source, config=CONFIG, - configured_catalog=create_configured_catalog("hashiras"), + configured_catalog=create_configured_catalog(stream_name), + stream_name=stream_name, state=_NO_STATE, ) @@ -818,12 +846,14 @@ def test_given_auxiliary_requests_then_return_auxiliary_request(mock_entrypoint_ @patch("airbyte_cdk.connector_builder.test_reader.reader.AirbyteEntrypoint.read") def test_given_no_slices_then_return_empty_slices(mock_entrypoint_read: Mock) -> None: + stream_name = "hashiras" mock_source = make_mock_source(mock_entrypoint_read, iter([auxiliary_request_log_message()])) connector_builder_handler = TestReader(MAX_PAGES_PER_SLICE, MAX_SLICES) stream_read: StreamRead = connector_builder_handler.run_test_read( source=mock_source, config=CONFIG, - configured_catalog=create_configured_catalog("hashiras"), + configured_catalog=create_configured_catalog(stream_name), + stream_name=stream_name, state=_NO_STATE, ) @@ -832,17 +862,21 @@ def test_given_no_slices_then_return_empty_slices(mock_entrypoint_read: Mock) -> @patch("airbyte_cdk.connector_builder.test_reader.reader.AirbyteEntrypoint.read") def test_given_pk_then_ensure_pk_is_pass_to_schema_inferrence(mock_entrypoint_read: Mock) -> None: + stream_name = "hashiras" mock_source = make_mock_source( mock_entrypoint_read, iter( [ - request_response_log_message({"request": 1}, {"response": 2}, "http://any_url.com"), - record_message("hashiras", {"id": "Shinobu Kocho", "date": "2023-03-03"}), - record_message("hashiras", {"id": "Muichiro Tokito", "date": "2023-03-04"}), + request_response_log_message( + {"request": 1}, {"response": 2}, "http://any_url.com", stream_name + ), + record_message(stream_name, {"id": "Shinobu Kocho", "date": "2023-03-03"}), + record_message(stream_name, {"id": "Muichiro Tokito", "date": "2023-03-04"}), ] ), ) mock_source.streams.return_value = [Mock()] + mock_source.streams.return_value[0].name = stream_name mock_source.streams.return_value[0].primary_key = [["id"]] mock_source.streams.return_value[0].cursor_field = _NO_CURSOR_FIELD connector_builder_handler = TestReader(MAX_PAGES_PER_SLICE, MAX_SLICES) @@ -850,7 +884,8 @@ def test_given_pk_then_ensure_pk_is_pass_to_schema_inferrence(mock_entrypoint_re stream_read: StreamRead = connector_builder_handler.run_test_read( source=mock_source, config=CONFIG, - configured_catalog=create_configured_catalog("hashiras"), + configured_catalog=create_configured_catalog(stream_name), + stream_name=stream_name, state=_NO_STATE, ) @@ -861,17 +896,21 @@ def test_given_pk_then_ensure_pk_is_pass_to_schema_inferrence(mock_entrypoint_re def test_given_cursor_field_then_ensure_cursor_field_is_pass_to_schema_inferrence( mock_entrypoint_read: Mock, ) -> None: + stream_name = "hashiras" mock_source = make_mock_source( mock_entrypoint_read, iter( [ - request_response_log_message({"request": 1}, {"response": 2}, "http://any_url.com"), - record_message("hashiras", {"id": "Shinobu Kocho", "date": "2023-03-03"}), - record_message("hashiras", {"id": "Muichiro Tokito", "date": "2023-03-04"}), + request_response_log_message( + {"request": 1}, {"response": 2}, "http://any_url.com", stream_name + ), + record_message(stream_name, {"id": "Shinobu Kocho", "date": "2023-03-03"}), + record_message(stream_name, {"id": "Muichiro Tokito", "date": "2023-03-04"}), ] ), ) mock_source.streams.return_value = [Mock()] + mock_source.streams.return_value[0].name = stream_name mock_source.streams.return_value[0].primary_key = _NO_PK mock_source.streams.return_value[0].cursor_field = ["date"] connector_builder_handler = TestReader(MAX_PAGES_PER_SLICE, MAX_SLICES) @@ -879,7 +918,8 @@ def test_given_cursor_field_then_ensure_cursor_field_is_pass_to_schema_inferrenc stream_read: StreamRead = connector_builder_handler.run_test_read( source=mock_source, config=CONFIG, - configured_catalog=create_configured_catalog("hashiras"), + configured_catalog=create_configured_catalog(stream_name), + stream_name=stream_name, state=_NO_STATE, ) @@ -976,7 +1016,7 @@ def auxiliary_request_log_message() -> AirbyteMessage: def request_response_log_message( - request: Mapping[str, Any], response: Mapping[str, Any], url: str + request: Mapping[str, Any], response: Mapping[str, Any], url: str, stream_name: str ) -> AirbyteMessage: return AirbyteMessage( type=MessageType.LOG, @@ -984,7 +1024,7 @@ def request_response_log_message( level=Level.INFO, message=json.dumps( { - "airbyte_cdk": {"stream": {"name": "a stream name"}}, + "airbyte_cdk": {"stream": {"name": stream_name}}, "http": { "title": "a title", "description": "a description", @@ -1000,6 +1040,8 @@ def request_response_log_message( def any_request_and_response_with_a_record() -> List[AirbyteMessage]: return [ - request_response_log_message({"request": 1}, {"response": 2}, "http://any_url.com"), + request_response_log_message( + {"request": 1}, {"response": 2}, "http://any_url.com", "hashiras" + ), record_message("hashiras", {"name": "Shinobu Kocho"}), ]