68 changes: 68 additions & 0 deletions SPIKE_INVESTIGATION.md
@@ -0,0 +1,68 @@
# Spike Investigation: StreamThreadException in Bing Ads Source

## Issue Summary
- **Issue**: [#8301](https://github.com/airbytehq/oncall/issues/8301) - StreamThreadException in Bing Ads source
- **Error**: `'utf-8' codec can't decode byte 0x8b in position 1: invalid start byte`
- **Stream**: `campaign_labels`
- **Root Cause**: GZIP-compressed data being treated as UTF-8 text

## Analysis

### Error Context
From Christo's clarification in the issue:
```
Exception while syncing stream campaign_labels: 'utf-8' codec can't decode byte 0x8b in position 1: invalid start byte
```

The byte `0x8b` at position 1 is the second byte of the GZIP magic number (`0x1f 0x8b`), indicating that compressed data is being passed straight to a UTF-8 decoder.
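
A minimal standalone reproduction of the decode failure, using only the Python standard library:

```python
import gzip

# GZIP output always starts with the two magic bytes 0x1f 0x8b.
payload = gzip.compress(b"Account Id,Campaign\n123,Test\n")
print(payload[:2].hex())  # -> "1f8b"

# Byte 0 (0x1f) is a valid ASCII control character, so UTF-8 decoding
# only fails at position 1, exactly as reported in the issue.
try:
    payload.decode("utf-8")
except UnicodeDecodeError as err:
    print(err)  # 'utf-8' codec can't decode byte 0x8b in position 1: invalid start byte
```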

### Technical Investigation

#### 1. Bing Ads Connector Configuration
- Uses `GzipDecoder` with `CsvDecoder` for bulk streams (sketched in CDK terms after this list)
- Encoding: `utf-8-sig`
- Stream: `campaign_labels` with `DownloadEntities: ["CampaignLabels"]`
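
A rough Python equivalent of that decoder stack, built from the same `composite_raw_decoder` classes used in the scripts below (an illustration of the configuration described above, not the connector's actual manifest wiring):

```python
from airbyte_cdk.sources.declarative.decoders.composite_raw_decoder import (
    CompositeRawDecoder,
    CsvParser,
    GzipParser,
)

# CSV rows decoded as utf-8-sig, wrapped in a GZIP parser for the bulk download.
bulk_decoder = CompositeRawDecoder(
    parser=GzipParser(inner_parser=CsvParser(encoding="utf-8-sig")),
    stream_response=True,
)
```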

#### 2. Concurrent Source Framework
- `StreamThreadException` wraps exceptions from concurrent processing
- `CompositeRawDecoder` handles response decoding with multiple parsers
- `GzipParser` decompresses GZIP data before passing to inner parsers

#### 3. Root Cause Analysis
The issue occurs in the concurrent source framework when:
1. A GZIP-compressed response is received
2. Parser selection logic fails to detect the GZIP content-encoding
3. Compressed data (second byte `0x8b`) is passed directly to the UTF-8 decoder
4. The UTF-8 decoder fails with the observed error
5. The exception is wrapped in `StreamThreadException` (sketched below)
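
A rough illustration of this flow; `StreamThreadException` here is a hypothetical stand-in for the CDK's wrapper class (the real constructor may differ), used only to show how the decode failure surfaces under the stream name:

```python
import gzip


class StreamThreadException(Exception):  # hypothetical stand-in for illustration only
    def __init__(self, exception: Exception, stream_name: str) -> None:
        super().__init__(f"Exception while syncing stream {stream_name}: {exception}")


def process_partition(stream_name: str, raw_bytes: bytes) -> None:
    try:
        raw_bytes.decode("utf-8")  # steps 3-4: compressed bytes hit the UTF-8 decoder
    except Exception as exc:
        raise StreamThreadException(exc, stream_name) from exc  # step 5


compressed = gzip.compress(b"Account Id,Campaign\n123,Test\n")
try:
    process_partition("campaign_labels", compressed)
except StreamThreadException as e:
    print(e)
    # Exception while syncing stream campaign_labels: 'utf-8' codec can't decode
    # byte 0x8b in position 1: invalid start byte
```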

## Proposed Investigation Areas

### 1. Parser Selection Logic
- Examine `CompositeRawDecoder._select_parser()` method
- Check header-based parser selection for GZIP content (see the probe sketch after this list)
- Investigate concurrent source integration with declarative decoders
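
One way to probe the header-based selection path, mirroring the `CompositeRawDecoder.by_headers()` usage in `test_gzip_utf8_issue.py` below (the keyword arguments are assumed from that script and may differ across CDK versions):

```python
import gzip
import io
from unittest.mock import Mock

import requests

from airbyte_cdk.sources.declarative.decoders.composite_raw_decoder import (
    CompositeRawDecoder,
    CsvParser,
    GzipParser,
)

gzip_parser = GzipParser(inner_parser=CsvParser(encoding="utf-8"))
decoder = CompositeRawDecoder.by_headers(
    parsers=[({"Content-Encoding"}, {"gzip"}, gzip_parser)],
    stream_response=False,
    fallback_parser=CsvParser(encoding="utf-8"),
)

# Simulate a bulk response whose Content-Encoding header was dropped:
# selection should fall back to the plain CSV parser and hit the 0x8b error.
response = Mock(spec=requests.Response)
response.content = gzip.compress(b"Account Id,Campaign\n123,Test\n")
response.raw = io.BytesIO(response.content)
response.headers = {}

try:
    list(decoder.decode(response))
except UnicodeDecodeError as err:
    print(f"Fallback parser selected for a gzipped body: {err}")
```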

### 2. Error Handling
- Review exception propagation in concurrent processing
- Check if GZIP decompression errors are properly handled
- Examine fallback mechanisms for parser failures

### 3. Integration Points
- Analyze how `ConcurrentDeclarativeSource` handles bulk streams
- Check if declarative decoders are properly integrated with concurrent framework
- Investigate state management during concurrent processing

## Next Steps

1. Create test cases to reproduce the issue
2. Implement parser selection improvements
3. Add better error handling for GZIP decompression
4. Test with Bing Ads campaign_labels stream
5. Validate fix doesn't break other connectors

## Files to Investigate
- `airbyte_cdk/sources/declarative/decoders/composite_raw_decoder.py`
- `airbyte_cdk/sources/concurrent_source/concurrent_read_processor.py`
- `airbyte_cdk/sources/declarative/concurrent_declarative_source.py`
- Bing Ads manifest configuration for bulk streams
171 changes: 171 additions & 0 deletions fix_gzip_parser_selection.py
@@ -0,0 +1,171 @@
"""
Proposed fix for StreamThreadException in Bing Ads source connector.

This fix addresses the root cause where GZIP-compressed data is incorrectly
treated as UTF-8 text due to missing Content-Encoding header detection
in the concurrent source framework.

Issue: #8301 - 'utf-8' codec can't decode byte 0x8b in position 1: invalid start byte
"""

import io
import logging
from typing import Any, Dict, Optional

import requests

from airbyte_cdk.sources.declarative.decoders.composite_raw_decoder import (
CompositeRawDecoder,
CsvParser,
GzipParser,
Parser,
)

logger = logging.getLogger("airbyte")


class ImprovedCompositeRawDecoder(CompositeRawDecoder):
"""
Enhanced CompositeRawDecoder with better GZIP detection and error handling.

This addresses the StreamThreadException issue by:
1. Auto-detecting GZIP content based on magic bytes
2. Providing better error handling for decompression failures
3. Falling back gracefully when parser selection fails
"""

def __init__(
self,
parser: Parser,
stream_response: bool = True,
parsers_by_header: Optional[Dict[str, Any]] = None,
auto_detect_gzip: bool = True,
) -> None:
super().__init__(parser, stream_response, parsers_by_header)
self._auto_detect_gzip = auto_detect_gzip

def _detect_gzip_content(self, response: requests.Response) -> bool:
"""
Detect if response content is GZIP-compressed by checking magic bytes.

Returns True if the response starts with GZIP magic number (0x1f, 0x8b).
This helps identify GZIP content even when Content-Encoding header is missing.
"""
if not self._auto_detect_gzip:
return False

try:
if hasattr(response, "raw") and response.raw:
current_pos = response.raw.tell() if hasattr(response.raw, "tell") else None

magic_bytes = response.raw.read(2)

if current_pos is not None and hasattr(response.raw, "seek"):
response.raw.seek(current_pos)
elif hasattr(response.raw, "seek"):
response.raw.seek(0)

return len(magic_bytes) >= 2 and magic_bytes[0] == 0x1F and magic_bytes[1] == 0x8B

elif hasattr(response, "content") and len(response.content) >= 2:
return response.content[0] == 0x1F and response.content[1] == 0x8B

except Exception as e:
logger.debug(f"Failed to detect GZIP content: {e}")

return False

def _select_parser(self, response: requests.Response) -> Parser:
"""
Enhanced parser selection with GZIP auto-detection.

This method extends the base implementation to:
1. Check Content-Encoding header (existing behavior)
2. Auto-detect GZIP content by magic bytes
3. Wrap parser with GzipParser if GZIP is detected
"""
selected_parser = super()._select_parser(response)

if not isinstance(selected_parser, GzipParser) and self._detect_gzip_content(response):
logger.info(
"Auto-detected GZIP content without Content-Encoding header, wrapping parser"
)

return GzipParser(inner_parser=selected_parser)

return selected_parser

def decode(self, response: requests.Response):
"""
Enhanced decode method with better error handling.

Provides more informative error messages and graceful fallback
when decompression or parsing fails.
"""
try:
yield from super().decode(response)
except UnicodeDecodeError as e:
if "can't decode byte 0x8b" in str(e):
error_msg = (
f"UTF-8 decoding failed with GZIP magic byte 0x8b. "
f"This suggests GZIP-compressed data is being treated as UTF-8 text. "
f"Check Content-Encoding headers or enable auto_detect_gzip. "
f"Original error: {e}"
)
logger.error(error_msg)

if self._auto_detect_gzip and self._detect_gzip_content(response):
logger.info("Attempting recovery with GZIP decompression")
gzip_parser = GzipParser(inner_parser=self.parser)

if hasattr(response, "raw") and hasattr(response.raw, "seek"):
response.raw.seek(0)

try:
if self.is_stream_response():
response.raw.auto_close = False
yield from gzip_parser.parse(data=response.raw)
response.raw.close()
else:
yield from gzip_parser.parse(data=io.BytesIO(response.content))
return
except Exception as recovery_error:
logger.error(f"GZIP recovery failed: {recovery_error}")

raise RuntimeError(error_msg) from e
else:
raise
except Exception as e:
logger.error(f"Decoder error: {e}")
raise


def create_bing_ads_compatible_decoder() -> ImprovedCompositeRawDecoder:
"""
Create a CompositeRawDecoder configured for Bing Ads bulk streams.

This decoder handles the campaign_labels stream and other bulk streams
that use GZIP compression with CSV data.
"""
csv_parser = CsvParser(encoding="utf-8-sig", set_values_to_none=[""])

gzip_parser = GzipParser(inner_parser=csv_parser)

decoder = ImprovedCompositeRawDecoder.by_headers(
parsers=[({"Content-Encoding"}, {"gzip"}, gzip_parser)],
stream_response=True,
fallback_parser=csv_parser,
)

decoder._auto_detect_gzip = True

return decoder


if __name__ == "__main__":
print("Proposed fix for StreamThreadException in Bing Ads source")
print("This enhanced CompositeRawDecoder provides:")
print("1. Auto-detection of GZIP content by magic bytes")
print("2. Better error handling for UTF-8/GZIP issues")
print("3. Graceful fallback and recovery mechanisms")
print("4. Specific configuration for Bing Ads bulk streams")
100 changes: 100 additions & 0 deletions test_gzip_utf8_issue.py
@@ -0,0 +1,100 @@
"""
Test script to reproduce the StreamThreadException issue with GZIP data and UTF-8 decoding.

This test demonstrates the root cause of issue #8301 where GZIP-compressed data
(starting with byte 0x8b) is incorrectly treated as UTF-8 text, causing the
'utf-8' codec can't decode byte 0x8b in position 1: invalid start byte error.
"""

import gzip
import io
from unittest.mock import Mock

import requests

from airbyte_cdk.sources.declarative.decoders.composite_raw_decoder import (
CompositeRawDecoder,
CsvParser,
GzipParser,
)


def test_gzip_utf8_decoding_issue():
"""
Reproduce the issue where GZIP data is incorrectly treated as UTF-8.

This simulates the scenario in Bing Ads campaign_labels stream where:
1. Response contains GZIP-compressed CSV data
2. Parser selection fails to detect GZIP content-encoding
3. Compressed data is passed to UTF-8 decoder
4. UTF-8 decoder fails with byte 0x8b error
"""
csv_data = "Account Id,Campaign,Client Id\n123,Test Campaign,456\n"

compressed_data = gzip.compress(csv_data.encode("utf-8"))

assert (
compressed_data[1] == 0x8B
), f"Expected GZIP magic number 0x8b, got {hex(compressed_data[1])}"

mock_response = Mock(spec=requests.Response)
mock_response.content = compressed_data
mock_response.raw = io.BytesIO(compressed_data)
mock_response.headers = {} # Missing Content-Encoding: gzip header

csv_parser = CsvParser(encoding="utf-8")
decoder = CompositeRawDecoder(parser=csv_parser, stream_response=False)

try:
list(decoder.decode(mock_response))
assert False, "Expected UTF-8 decoding error but none occurred"
except UnicodeDecodeError as e:
assert "can't decode byte 0x8b" in str(e)
assert "invalid start byte" in str(e)
print(f"✓ Reproduced the issue: {e}")

gzip_parser = GzipParser(inner_parser=csv_parser)
correct_decoder = CompositeRawDecoder(parser=gzip_parser, stream_response=False)

mock_response.raw = io.BytesIO(compressed_data)

records = list(correct_decoder.decode(mock_response))
assert len(records) == 1
assert records[0]["Account Id"] == "123"
assert records[0]["Campaign"] == "Test Campaign"
print("✓ Correct GZIP handling works as expected")


def test_header_based_parser_selection():
"""
Test that CompositeRawDecoder.by_headers() correctly selects GZIP parser
when Content-Encoding header is present.
"""
csv_data = "Account Id,Campaign\n123,Test\n"
compressed_data = gzip.compress(csv_data.encode("utf-8"))

mock_response = Mock(spec=requests.Response)
mock_response.content = compressed_data
mock_response.raw = io.BytesIO(compressed_data)
mock_response.headers = {"Content-Encoding": "gzip"}

gzip_parser = GzipParser(inner_parser=CsvParser(encoding="utf-8"))
fallback_parser = CsvParser(encoding="utf-8")

decoder = CompositeRawDecoder.by_headers(
parsers=[({"Content-Encoding"}, {"gzip"}, gzip_parser)],
stream_response=False,
fallback_parser=fallback_parser,
)

records = list(decoder.decode(mock_response))
assert len(records) == 1
assert records[0]["Account Id"] == "123"
print("✓ Header-based parser selection works correctly")


if __name__ == "__main__":
print("Testing GZIP UTF-8 decoding issue reproduction...")
test_gzip_utf8_decoding_issue()
test_header_based_parser_selection()
print("All tests completed successfully!")