Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion airbyte_cdk/connector_builder/connector_builder_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,12 @@ def read_stream(
stream_name = configured_catalog.streams[0].stream.name

stream_read = test_read_handler.run_test_read(
source, config, configured_catalog, state, limits.max_records
source,
config,
configured_catalog,
stream_name,
state,
limits.max_records,
)

return AirbyteMessage(
Expand Down
31 changes: 31 additions & 0 deletions airbyte_cdk/connector_builder/test_reader/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,37 @@ def should_close_page_for_slice(at_least_one_page_in_group: bool, message: Airby
return at_least_one_page_in_group and should_process_slice_descriptor(message)


def is_page_http_request_for_different_stream(
json_message: Optional[Dict[str, Any]], stream_name: str
) -> bool:
"""
Determines whether a given JSON message represents a page HTTP request for a different stream.

This function checks if the provided JSON message is a page HTTP request, and if the stream name in the log is
different from the provided stream name.

This is needed because dynamic streams result in extra page HTTP requests for the dynamic streams that we want to ignore
when they do not match the stream that is being read.

Args:
json_message (Optional[Dict[str, Any]]): The JSON message to evaluate.
stream_name (str): The name of the stream to compare against.

Returns:
bool: True if the JSON message is a page HTTP request for a different stream, False otherwise.
"""
if not json_message or not is_page_http_request(json_message):
return False

message_stream_name: str | None = (
json_message.get("airbyte_cdk", {}).get("stream", {}).get("name", None)
)
if message_stream_name is None:
return False

return message_stream_name != stream_name


def is_page_http_request(json_message: Optional[Dict[str, Any]]) -> bool:
"""
Determines whether a given JSON message represents a page HTTP request.
Expand Down
5 changes: 5 additions & 0 deletions airbyte_cdk/connector_builder/test_reader/message_grouper.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
is_async_auxiliary_request,
is_config_update_message,
is_log_message,
is_page_http_request_for_different_stream,
is_record_message,
is_state_message,
is_trace_with_error,
Expand All @@ -44,6 +45,7 @@ def get_message_groups(
schema_inferrer: SchemaInferrer,
datetime_format_inferrer: DatetimeFormatInferrer,
limit: int,
stream_name: str,
) -> MESSAGE_GROUPS:
"""
Processes an iterator of AirbyteMessage objects to group and yield messages based on their type and sequence.
Expand Down Expand Up @@ -96,6 +98,9 @@ def get_message_groups(
while records_count < limit and (message := next(messages, None)):
json_message = airbyte_message_to_json(message)

if is_page_http_request_for_different_stream(json_message, stream_name):
continue

if should_close_page(at_least_one_page_in_group, message, json_message):
current_page_request, current_page_response = handle_current_page(
current_page_request,
Expand Down
11 changes: 8 additions & 3 deletions airbyte_cdk/connector_builder/test_reader/reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ def run_test_read(
source: DeclarativeSource,
config: Mapping[str, Any],
configured_catalog: ConfiguredAirbyteCatalog,
stream_name: str,
state: List[AirbyteStateMessage],
record_limit: Optional[int] = None,
) -> StreamRead:
Expand All @@ -112,14 +113,17 @@ def run_test_read(

record_limit = self._check_record_limit(record_limit)
# The connector builder currently only supports reading from a single stream at a time
stream = source.streams(config)[0]
streams = source.streams(config)
stream = next((stream for stream in streams if stream.name == stream_name), None)

# get any deprecation warnings during the component creation
deprecation_warnings: List[LogMessage] = source.deprecation_warnings()

schema_inferrer = SchemaInferrer(
self._pk_to_nested_and_composite_field(stream.primary_key),
self._cursor_field_to_nested_and_composite_field(stream.cursor_field),
self._pk_to_nested_and_composite_field(stream.primary_key) if stream else None,
self._cursor_field_to_nested_and_composite_field(stream.cursor_field)
if stream
else None,
)
datetime_format_inferrer = DatetimeFormatInferrer()

Expand All @@ -128,6 +132,7 @@ def run_test_read(
schema_inferrer,
datetime_format_inferrer,
record_limit,
stream_name,
)

slices, log_messages, auxiliary_requests, latest_config_update = self._categorise_groups(
Expand Down
18 changes: 13 additions & 5 deletions airbyte_cdk/sources/declarative/manifest_declarative_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -542,11 +542,19 @@ def _dynamic_stream_configs(
components_resolver_config["retriever"]["requester"]["use_cache"] = True

# Create a resolver for dynamic components based on type
components_resolver = self._constructor.create_component(
COMPONENTS_RESOLVER_TYPE_MAPPING[resolver_type],
components_resolver_config,
config,
)
if resolver_type == "HttpComponentsResolver":
components_resolver = self._constructor.create_component(
model_type=COMPONENTS_RESOLVER_TYPE_MAPPING[resolver_type],
component_definition=components_resolver_config,
config=config,
stream_name=dynamic_definition.get("name"),
)
else:
components_resolver = self._constructor.create_component(
model_type=COMPONENTS_RESOLVER_TYPE_MAPPING[resolver_type],
component_definition=components_resolver_config,
config=config,
)

stream_template_config = dynamic_definition["stream_template"]

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3493,10 +3493,11 @@ def _get_download_retriever() -> SimpleRetriever:
requester=download_requester,
record_selector=record_selector,
primary_key=None,
name=job_download_components_name,
name=name,
paginator=paginator,
config=config,
parameters={},
log_formatter=self._get_log_formatter(None, name),
)

def _get_job_timeout() -> datetime.timedelta:
Expand Down Expand Up @@ -3805,15 +3806,15 @@ def create_components_mapping_definition(
)

def create_http_components_resolver(
self, model: HttpComponentsResolverModel, config: Config
self, model: HttpComponentsResolverModel, config: Config, stream_name: Optional[str] = None
) -> Any:
stream_slicer = self._build_stream_slicer_from_partition_router(model.retriever, config)
combined_slicers = self._build_resumable_cursor(model.retriever, stream_slicer)

retriever = self._create_component_from_model(
model=model.retriever,
config=config,
name="",
name=f"{stream_name if stream_name else '__http_components_resolver'}",
primary_key=None,
stream_slicer=stream_slicer if stream_slicer else combined_slicers,
transformations=[],
Expand Down Expand Up @@ -3890,7 +3891,9 @@ def create_config_components_resolver(
)

def create_parametrized_components_resolver(
self, model: ParametrizedComponentsResolverModel, config: Config
self,
model: ParametrizedComponentsResolverModel,
config: Config,
) -> ParametrizedComponentsResolver:
stream_parameters = StreamParametersDefinition(
list_of_parameters_for_stream=model.stream_parameters.list_of_parameters_for_stream
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -752,6 +752,7 @@ def test_read():
source,
config,
ConfiguredAirbyteCatalogSerializer.load(CONFIGURED_CATALOG),
_stream_name,
_A_STATE,
limits.max_records,
)
Expand Down Expand Up @@ -812,6 +813,10 @@ def primary_key(self):
def cursor_field(self):
return []

@property
def name(self):
return _stream_name

class MockManifestDeclarativeSource:
def streams(self, config):
return [MockDeclarativeStream()]
Expand Down
Loading
Loading