Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 7 additions & 3 deletions airbyte_cdk/connector_builder/connector_builder_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,11 +117,15 @@ def read_stream(
),
)
except Exception as exc:
# - message: user-friendly error for display
# - internal_message: technical details for debugging (including config/catalog)
error = AirbyteTracedException.from_exception(
exc,
message=filter_secrets(
f"Error reading stream with config={config} and catalog={configured_catalog}: {str(exc)}"
),
message=filter_secrets(f"Error reading stream {stream_name}: {str(exc)}"),
)
# Override internal_message to include context for debugging
error.internal_message = filter_secrets(
f"Error reading stream {stream_name} with config={config} and catalog={configured_catalog}: {str(exc)}"
)
Comment on lines +120 to 129
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Prevent UnboundLocalError when stream_name is unavailable.

Lines 124-128 assume stream_name was set earlier, but if the exception fires before that assignment (e.g., the catalog has zero streams, or the TestReader constructor raises before the name is read from the catalog), the except block will hit an UnboundLocalError, masking the real failure instead of emitting the sanitized trace. Could we initialize the name ahead of the try/except and fall back to a sentinel when it's missing, so the handler still returns a useful error message, wdyt?

 def read_stream(
     source: ConcurrentDeclarativeSource,
     config: Mapping[str, Any],
     configured_catalog: ConfiguredAirbyteCatalog,
     state: List[AirbyteStateMessage],
     limits: TestLimits,
 ) -> AirbyteMessage:
-    try:
+    stream_name: str | None = None
+    try:
         test_read_handler = TestReader(
             limits.max_pages_per_slice, limits.max_slices, limits.max_records
         )
         # The connector builder only supports a single stream
         stream_name = configured_catalog.streams[0].stream.name
@@
-    except Exception as exc:
+    except Exception as exc:
+        safe_stream_name = stream_name or "<unknown stream>"
         # - message: user-friendly error for display
         # - internal_message: technical details for debugging (including config/catalog)
         error = AirbyteTracedException.from_exception(
             exc,
-            message=filter_secrets(f"Error reading stream {stream_name}: {str(exc)}"),
+            message=filter_secrets(f"Error reading stream {safe_stream_name}: {str(exc)}"),
         )
         # Override internal_message to include context for debugging
         error.internal_message = filter_secrets(
-            f"Error reading stream {stream_name} with config={config} and catalog={configured_catalog}: {str(exc)}"
+            f"Error reading stream {safe_stream_name} with config={config} and catalog={configured_catalog}: {str(exc)}"
         )
         return error.as_airbyte_message()
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
# - message: user-friendly error for display
# - internal_message: technical details for debugging (including config/catalog)
error = AirbyteTracedException.from_exception(
exc,
message=filter_secrets(
f"Error reading stream with config={config} and catalog={configured_catalog}: {str(exc)}"
),
message=filter_secrets(f"Error reading stream {stream_name}: {str(exc)}"),
)
# Override internal_message to include context for debugging
error.internal_message = filter_secrets(
f"Error reading stream {stream_name} with config={config} and catalog={configured_catalog}: {str(exc)}"
)
def read_stream(
source: ConcurrentDeclarativeSource,
config: Mapping[str, Any],
configured_catalog: ConfiguredAirbyteCatalog,
state: List[AirbyteStateMessage],
limits: TestLimits,
) -> AirbyteMessage:
stream_name: str | None = None
try:
test_read_handler = TestReader(
limits.max_pages_per_slice, limits.max_slices, limits.max_records
)
# The connector builder only supports a single stream
stream_name = configured_catalog.streams[0].stream.name
# ... any additional logic here ...
except Exception as exc:
safe_stream_name = stream_name or "<unknown stream>"
# - message: user-friendly error for display
# - internal_message: technical details for debugging (including config/catalog)
error = AirbyteTracedException.from_exception(
exc,
message=filter_secrets(f"Error reading stream {safe_stream_name}: {str(exc)}"),
)
# Override internal_message to include context for debugging
error.internal_message = filter_secrets(
f"Error reading stream {safe_stream_name} with config={config} and catalog={configured_catalog}: {str(exc)}"
)
return error.as_airbyte_message()
🤖 Prompt for AI Agents
In airbyte_cdk/connector_builder/connector_builder_handler.py around lines 120
to 129, the except block references stream_name, which can be unassigned if the
exception occurs before it is set; initialize stream_name to None before the
try/except, derive a fallback inside the handler (e.g.,
safe_stream_name = stream_name or "<unknown stream>"), and update the
references in the AirbyteTracedException.from_exception call and
error.internal_message to use safe_stream_name so an UnboundLocalError cannot
mask the original exception and the handler still returns a sanitized,
contextual error.

return error.as_airbyte_message()

Expand Down
69 changes: 69 additions & 0 deletions unit_tests/connector_builder/test_connector_builder_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -1408,6 +1408,75 @@ def test_read_stream_exception_with_secrets():
assert "super_secret_key" not in response.trace.error.message


def test_read_stream_error_message_does_not_contain_config_and_catalog():
    """
    Check that a failing `read_stream` call reports a short user-facing error.

    Expectations:
    1. `message` holds only the stream name and the underlying error text,
       with no config or catalog dump
    2. `internal_message` still carries the config and catalog for debugging
    """
    # Marker strings in the config and schema let the assertions below
    # detect whether either object was dumped into a message.
    test_config = {
        "__injected_declarative_manifest": "test_manifest",
        "verbose_config_data": "this_should_not_appear_in_user_message",
        "api_key": "secret_key_value",
    }
    marked_stream = AirbyteStream(
        name=_stream_name,
        json_schema={"properties": {"verbose_catalog_schema": {"type": "string"}}},
        supported_sync_modes=[SyncMode.full_refresh],
    )
    test_catalog = ConfiguredAirbyteCatalog(
        streams=[
            ConfiguredAirbyteStream(
                stream=marked_stream,
                sync_mode=SyncMode.full_refresh,
                destination_sync_mode=DestinationSyncMode.append,
            )
        ]
    )
    empty_state = []
    default_limits = TestLimits()
    fake_source = MagicMock()

    # Make the patched reader fail the way a datetime parsing problem would.
    with patch(
        "airbyte_cdk.connector_builder.test_reader.TestReader.run_test_read",
        side_effect=ValueError("time data '' does not match format '%Y-%m-%dT%H:%M:%SZ'"),
    ):
        response = read_stream(
            fake_source, test_config, test_catalog, empty_state, default_limits
        )

    # The failure must come back as an ERROR trace message.
    assert response.type == Type.TRACE
    assert response.trace.type.value == "ERROR"

    # User-facing text: none of the config/catalog markers may leak into it.
    shown_to_user = response.trace.error.message
    assert "verbose_config_data" not in shown_to_user
    assert "verbose_catalog_schema" not in shown_to_user
    assert "__injected_declarative_manifest" not in shown_to_user

    # ...and it must be exactly the stream name followed by the original error.
    expected_stream = test_catalog.streams[0].stream.name
    assert (
        shown_to_user
        == f"Error reading stream {expected_stream}: time data '' does not match format '%Y-%m-%dT%H:%M:%SZ'"
    )

    # Debug text: the full context is expected here.
    debug_text = response.trace.error.internal_message
    assert "verbose_config_data" in debug_text
    assert "verbose_catalog_schema" in debug_text
    assert f"Error reading stream {expected_stream} with config=" in debug_text


def test_full_resolve_manifest(valid_resolve_manifest_config_file):
config = copy.deepcopy(RESOLVE_DYNAMIC_STREAM_MANIFEST_CONFIG)
command = config["__command"]
Expand Down
Loading