diff --git a/SPIKE_INVESTIGATION.md b/SPIKE_INVESTIGATION.md
new file mode 100644
index 000000000..c986ef337
--- /dev/null
+++ b/SPIKE_INVESTIGATION.md
@@ -0,0 +1,68 @@
+# Spike Investigation: StreamThreadException in Bing Ads Source
+
+## Issue Summary
+- **Issue**: [#8301](https://github.com/airbytehq/oncall/issues/8301) - StreamThreadException in Bing Ads source
+- **Error**: `'utf-8' codec can't decode byte 0x8b in position 1: invalid start byte`
+- **Stream**: `campaign_labels`
+- **Root Cause**: GZIP-compressed data being treated as UTF-8 text
+
+## Analysis
+
+### Error Context
+From Christo's clarification in the issue:
+```
+Exception while syncing stream campaign_labels: 'utf-8' codec can't decode byte 0x8b in position 1: invalid start byte
+```
+
+The byte `0x8b` at position 1 is the second byte of the GZIP magic number (`0x1f 0x8b`), indicating that compressed data is being passed to a UTF-8 decoder.
+
+### Technical Investigation
+
+#### 1. Bing Ads Connector Configuration
+- Uses `GzipDecoder` with `CsvDecoder` for bulk streams
+- Encoding: `utf-8-sig`
+- Stream: `campaign_labels` with `DownloadEntities: ["CampaignLabels"]`
+
+#### 2. Concurrent Source Framework
+- `StreamThreadException` wraps exceptions from concurrent processing
+- `CompositeRawDecoder` handles response decoding with multiple parsers
+- `GzipParser` decompresses GZIP data before passing to inner parsers
+
+#### 3. Root Cause Analysis
+The issue occurs in the concurrent source framework when:
+1. GZIP-compressed response is received
+2. Parser selection logic fails to detect GZIP content-encoding
+3. Compressed data (starting with the GZIP magic bytes 0x1f 0x8b) is passed directly to UTF-8 decoder
+4. UTF-8 decoder fails with the observed error
+5. Exception is wrapped in `StreamThreadException`
+
+## Proposed Investigation Areas
+
+### 1. Parser Selection Logic
+- Examine `CompositeRawDecoder._select_parser()` method
+- Check header-based parser selection for GZIP content
+- Investigate concurrent source integration with declarative decoders
+
+### 2. Error Handling
+- Review exception propagation in concurrent processing
+- Check if GZIP decompression errors are properly handled
+- Examine fallback mechanisms for parser failures
+
+### 3. Integration Points
+- Analyze how `ConcurrentDeclarativeSource` handles bulk streams
+- Check if declarative decoders are properly integrated with concurrent framework
+- Investigate state management during concurrent processing
+
+## Next Steps
+
+1. Create test cases to reproduce the issue
+2. Implement parser selection improvements
+3. Add better error handling for GZIP decompression
+4. Test with Bing Ads campaign_labels stream
+5. Validate fix doesn't break other connectors
+
+## Files to Investigate
+- `airbyte_cdk/sources/declarative/decoders/composite_raw_decoder.py`
+- `airbyte_cdk/sources/concurrent_source/concurrent_read_processor.py`
+- `airbyte_cdk/sources/declarative/concurrent_declarative_source.py`
+- Bing Ads manifest configuration for bulk streams
diff --git a/fix_gzip_parser_selection.py b/fix_gzip_parser_selection.py
new file mode 100644
index 000000000..d0096b01c
--- /dev/null
+++ b/fix_gzip_parser_selection.py
@@ -0,0 +1,222 @@
+"""
+Proposed fix for StreamThreadException in Bing Ads source connector.
+
+This fix addresses the root cause where GZIP-compressed data is incorrectly
+treated as UTF-8 text due to missing Content-Encoding header detection
+in the concurrent source framework.
+
+Issue: #8301 - 'utf-8' codec can't decode byte 0x8b in position 1: invalid start byte
+"""
+
+import io
+import logging
+from typing import Any, Dict, Optional
+
+import requests
+
+from airbyte_cdk.sources.declarative.decoders.composite_raw_decoder import (
+    CompositeRawDecoder,
+    CsvParser,
+    GzipParser,
+    Parser,
+)
+
+logger = logging.getLogger("airbyte")
+
+# Leading two bytes of a GZIP stream (0x1f, 0x8b).
+GZIP_MAGIC = b"\x1f\x8b"
+
+
+class ImprovedCompositeRawDecoder(CompositeRawDecoder):
+    """
+    Enhanced CompositeRawDecoder with better GZIP detection and error handling.
+
+    This addresses the StreamThreadException issue by:
+    1. Auto-detecting GZIP content based on magic bytes
+    2. Providing better error handling for decompression failures
+    3. Falling back gracefully when parser selection fails
+    """
+
+    def __init__(
+        self,
+        parser: Parser,
+        stream_response: bool = True,
+        parsers_by_header: Optional[Dict[str, Any]] = None,
+        auto_detect_gzip: bool = True,
+    ) -> None:
+        """
+        Args:
+            parser: Default parser, forwarded to the base decoder.
+            stream_response: Forwarded to the base decoder.
+            parsers_by_header: Forwarded to the base decoder.
+            auto_detect_gzip: When True, sniff payloads for the GZIP magic bytes.
+        """
+        super().__init__(parser, stream_response, parsers_by_header)
+        self._auto_detect_gzip = auto_detect_gzip
+        # NOTE(review): keep our own reference for the recovery path instead of
+        # relying on a base-class attribute name; confirm against the CDK in use.
+        self._recovery_inner_parser = parser
+
+    def _detect_gzip_content(self, response: requests.Response) -> bool:
+        """
+        Detect if response content is GZIP-compressed by checking magic bytes.
+
+        Returns True if the payload starts with the GZIP magic number (0x1f, 0x8b).
+        This helps identify GZIP content even when Content-Encoding header is missing.
+
+        The check never consumes data. A buffered response is inspected through
+        `response.content`; a streamed body is only inspected when `response.raw`
+        is seekable, and the read position is restored afterwards. A non-seekable
+        stream yields False, because reading from it would drop its first bytes.
+        """
+        if not self._auto_detect_gzip:
+            return False
+
+        try:
+            if not self.is_stream_response():
+                return response.content[:2] == GZIP_MAGIC
+
+            raw = getattr(response, "raw", None)
+            if raw is None or not raw.seekable():
+                return False
+
+            position = raw.tell()
+            try:
+                return raw.read(2) == GZIP_MAGIC
+            finally:
+                # Rewind even if read() raised so the parser still sees every byte.
+                raw.seek(position)
+        except Exception as e:
+            logger.debug(f"Failed to detect GZIP content: {e}")
+
+        return False
+
+    def _rewind_for_retry(self, response: requests.Response) -> bool:
+        """
+        Put a streamed body back at offset 0 so it can be parsed again.
+
+        Returns False when the stream cannot be rewound; buffered responses
+        need no rewinding and always return True.
+        """
+        if not self.is_stream_response():
+            return True
+
+        try:
+            raw = getattr(response, "raw", None)
+            if raw is None or not raw.seekable():
+                return False
+            raw.seek(0)
+        except Exception as e:
+            logger.debug(f"Failed to rewind response stream: {e}")
+            return False
+
+        return True
+
+    def _select_parser(self, response: requests.Response) -> Parser:
+        """
+        Enhanced parser selection with GZIP auto-detection.
+
+        This method extends the base implementation to:
+        1. Check Content-Encoding header (existing behavior)
+        2. Auto-detect GZIP content by magic bytes
+        3. Wrap parser with GzipParser if GZIP is detected
+        """
+        selected_parser = super()._select_parser(response)
+
+        if not isinstance(selected_parser, GzipParser) and self._detect_gzip_content(response):
+            logger.info(
+                "Auto-detected GZIP content without Content-Encoding header, wrapping parser"
+            )
+
+            return GzipParser(inner_parser=selected_parser)
+
+        return selected_parser
+
+    def decode(self, response: requests.Response):
+        """
+        Enhanced decode method with better error handling.
+
+        Provides more informative error messages and graceful fallback
+        when decompression or parsing fails.
+
+        Recovery is attempted only when no record has been yielded yet, since
+        re-parsing after a partial read would emit earlier records twice.
+
+        Raises:
+            RuntimeError: UTF-8 decoding hit byte 0x8b and recovery did not succeed.
+        """
+        emitted = False
+        try:
+            for record in super().decode(response):
+                emitted = True
+                yield record
+        except UnicodeDecodeError as e:
+            if "can't decode byte 0x8b" not in str(e):
+                raise
+
+            error_msg = (
+                f"UTF-8 decoding failed with GZIP magic byte 0x8b. "
+                f"This suggests GZIP-compressed data is being treated as UTF-8 text. "
+                f"Check Content-Encoding headers or enable auto_detect_gzip. "
+                f"Original error: {e}"
+            )
+            logger.error(error_msg)
+
+            if (
+                not emitted
+                and self._auto_detect_gzip
+                and self._rewind_for_retry(response)
+                and self._detect_gzip_content(response)
+            ):
+                logger.info("Attempting recovery with GZIP decompression")
+                gzip_parser = GzipParser(inner_parser=self._recovery_inner_parser)
+
+                try:
+                    if self.is_stream_response():
+                        response.raw.auto_close = False
+                        try:
+                            yield from gzip_parser.parse(data=response.raw)
+                        finally:
+                            # Close the stream even if parsing fails midway.
+                            response.raw.close()
+                    else:
+                        yield from gzip_parser.parse(data=io.BytesIO(response.content))
+                    return
+                except Exception as recovery_error:
+                    logger.error(f"GZIP recovery failed: {recovery_error}")
+
+            raise RuntimeError(error_msg) from e
+        except Exception as e:
+            logger.error(f"Decoder error: {e}")
+            raise
+
+
+def create_bing_ads_compatible_decoder() -> ImprovedCompositeRawDecoder:
+    """
+    Create a CompositeRawDecoder configured for Bing Ads bulk streams.
+
+    This decoder handles the campaign_labels stream and other bulk streams
+    that use GZIP compression with CSV data.
+    """
+    csv_parser = CsvParser(encoding="utf-8-sig", set_values_to_none=[""])
+
+    gzip_parser = GzipParser(inner_parser=csv_parser)
+
+    decoder = ImprovedCompositeRawDecoder.by_headers(
+        parsers=[({"Content-Encoding"}, {"gzip"}, gzip_parser)],
+        stream_response=True,
+        fallback_parser=csv_parser,
+    )
+
+    decoder._auto_detect_gzip = True
+
+    return decoder
+
+
+if __name__ == "__main__":
+    print("Proposed fix for StreamThreadException in Bing Ads source")
+    print("This enhanced CompositeRawDecoder provides:")
+    print("1. Auto-detection of GZIP content by magic bytes")
+    print("2. Better error handling for UTF-8/GZIP issues")
+    print("3. Graceful fallback and recovery mechanisms")
+    print("4. Specific configuration for Bing Ads bulk streams")
diff --git a/test_gzip_utf8_issue.py b/test_gzip_utf8_issue.py
new file mode 100644
index 000000000..76f3678b2
--- /dev/null
+++ b/test_gzip_utf8_issue.py
@@ -0,0 +1,101 @@
+"""
+Test script to reproduce the StreamThreadException issue with GZIP data and UTF-8 decoding.
+
+This test demonstrates the root cause of issue #8301 where GZIP-compressed data
+(magic bytes 0x1f 0x8b) is incorrectly treated as UTF-8 text, causing the
+'utf-8' codec can't decode byte 0x8b in position 1: invalid start byte error.
+"""
+
+import gzip
+import io
+from unittest.mock import Mock
+
+import requests
+
+from airbyte_cdk.sources.declarative.decoders.composite_raw_decoder import (
+    CompositeRawDecoder,
+    CsvParser,
+    GzipParser,
+)
+
+
+def test_gzip_utf8_decoding_issue():
+    """
+    Reproduce the issue where GZIP data is incorrectly treated as UTF-8.
+
+    This simulates the scenario in Bing Ads campaign_labels stream where:
+    1. Response contains GZIP-compressed CSV data
+    2. Parser selection fails to detect GZIP content-encoding
+    3. Compressed data is passed to UTF-8 decoder
+    4. UTF-8 decoder fails with byte 0x8b error
+    """
+    csv_data = "Account Id,Campaign,Client Id\n123,Test Campaign,456\n"
+
+    compressed_data = gzip.compress(csv_data.encode("utf-8"))
+
+    assert (
+        compressed_data[1] == 0x8B
+    ), f"Expected GZIP magic number 0x8b, got {hex(compressed_data[1])}"
+
+    mock_response = Mock(spec=requests.Response)
+    mock_response.content = compressed_data
+    mock_response.raw = io.BytesIO(compressed_data)
+    mock_response.headers = {}  # Missing Content-Encoding: gzip header
+
+    csv_parser = CsvParser(encoding="utf-8")
+    decoder = CompositeRawDecoder(parser=csv_parser, stream_response=False)
+
+    try:
+        list(decoder.decode(mock_response))
+    except UnicodeDecodeError as e:
+        assert "can't decode byte 0x8b" in str(e)
+        assert "invalid start byte" in str(e)
+        print(f"✓ Reproduced the issue: {e}")
+    else:
+        raise AssertionError("Expected UTF-8 decoding error but none occurred")
+
+    gzip_parser = GzipParser(inner_parser=csv_parser)
+    correct_decoder = CompositeRawDecoder(parser=gzip_parser, stream_response=False)
+
+    mock_response.raw = io.BytesIO(compressed_data)
+
+    records = list(correct_decoder.decode(mock_response))
+    assert len(records) == 1
+    assert records[0]["Account Id"] == "123"
+    assert records[0]["Campaign"] == "Test Campaign"
+    print("✓ Correct GZIP handling works as expected")
+
+
+def test_header_based_parser_selection():
+    """
+    Test that CompositeRawDecoder.by_headers() correctly selects GZIP parser
+    when Content-Encoding header is present.
+    """
+    csv_data = "Account Id,Campaign\n123,Test\n"
+    compressed_data = gzip.compress(csv_data.encode("utf-8"))
+
+    mock_response = Mock(spec=requests.Response)
+    mock_response.content = compressed_data
+    mock_response.raw = io.BytesIO(compressed_data)
+    mock_response.headers = {"Content-Encoding": "gzip"}
+
+    gzip_parser = GzipParser(inner_parser=CsvParser(encoding="utf-8"))
+    fallback_parser = CsvParser(encoding="utf-8")
+
+    decoder = CompositeRawDecoder.by_headers(
+        parsers=[({"Content-Encoding"}, {"gzip"}, gzip_parser)],
+        stream_response=False,
+        fallback_parser=fallback_parser,
+    )
+
+    records = list(decoder.decode(mock_response))
+    assert len(records) == 1
+    assert records[0]["Account Id"] == "123"
+    print("✓ Header-based parser selection works correctly")
+
+
+if __name__ == "__main__":
+    print("Testing GZIP UTF-8 decoding issue reproduction...")
+    test_gzip_utf8_decoding_issue()
+    test_header_based_parser_selection()
+    print("All tests completed successfully!")