Skip to content

Commit e7f1e4c

Browse files
pnilanCopilotcoderabbitai[bot]
authored
feat(http_client): respective failure_type is raised after exhausting request attempts and failing w/ backoff exception (#762)
Co-authored-by: Copilot <[email protected]> Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com>
1 parent 691b59a commit e7f1e4c

File tree

13 files changed

+174
-67
lines changed

13 files changed

+174
-67
lines changed

airbyte_cdk/sources/declarative/checks/check_dynamic_stream.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,8 +49,9 @@ def check_connection(
4949
for stream in streams[: min(self.stream_count, len(streams))]:
5050
stream_is_available, reason = evaluate_availability(stream, logger)
5151
if not stream_is_available:
52-
logger.warning(f"Stream {stream.name} is not available: {reason}")
53-
return False, reason
52+
message = f"Stream {stream.name} is not available: {reason}"
53+
logger.warning(message)
54+
return False, message
5455
except Exception as error:
5556
error_message = (
5657
f"Encountered an error trying to connect to stream {stream.name}. Error: {error}"

airbyte_cdk/sources/streams/http/error_handlers/default_error_mapping.py

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -31,56 +31,56 @@
3131
400: ErrorResolution(
3232
response_action=ResponseAction.FAIL,
3333
failure_type=FailureType.system_error,
34-
error_message="Bad request. Please check your request parameters.",
34+
error_message="HTTP Status Code: 400. Error: Bad request. Please check your request parameters.",
3535
),
3636
401: ErrorResolution(
3737
response_action=ResponseAction.FAIL,
3838
failure_type=FailureType.config_error,
39-
error_message="Unauthorized. Please ensure you are authenticated correctly.",
39+
error_message="HTTP Status Code: 401. Error: Unauthorized. Please ensure you are authenticated correctly.",
4040
),
4141
403: ErrorResolution(
4242
response_action=ResponseAction.FAIL,
4343
failure_type=FailureType.config_error,
44-
error_message="Forbidden. You don't have permission to access this resource.",
44+
error_message="HTTP Status Code: 403. Error: Forbidden. You don't have permission to access this resource.",
4545
),
4646
404: ErrorResolution(
4747
response_action=ResponseAction.FAIL,
4848
failure_type=FailureType.system_error,
49-
error_message="Not found. The requested resource was not found on the server.",
49+
error_message="HTTP Status Code: 404. Error: Not found. The requested resource was not found on the server.",
5050
),
5151
405: ErrorResolution(
5252
response_action=ResponseAction.FAIL,
5353
failure_type=FailureType.system_error,
54-
error_message="Method not allowed. Please check your request method.",
54+
error_message="HTTP Status Code: 405. Error: Method not allowed. Please check your request method.",
5555
),
5656
408: ErrorResolution(
5757
response_action=ResponseAction.RETRY,
5858
failure_type=FailureType.transient_error,
59-
error_message="Request timeout.",
59+
error_message="HTTP Status Code: 408. Error: Request timeout.",
6060
),
6161
429: ErrorResolution(
6262
response_action=ResponseAction.RATE_LIMITED,
6363
failure_type=FailureType.transient_error,
64-
error_message="Too many requests.",
64+
error_message="HTTP Status Code: 429. Error: Too many requests.",
6565
),
6666
500: ErrorResolution(
6767
response_action=ResponseAction.RETRY,
6868
failure_type=FailureType.transient_error,
69-
error_message="Internal server error.",
69+
error_message="HTTP Status Code: 500. Error: Internal server error.",
7070
),
7171
502: ErrorResolution(
7272
response_action=ResponseAction.RETRY,
7373
failure_type=FailureType.transient_error,
74-
error_message="Bad gateway.",
74+
error_message="HTTP Status Code: 502. Error: Bad gateway.",
7575
),
7676
503: ErrorResolution(
7777
response_action=ResponseAction.RETRY,
7878
failure_type=FailureType.transient_error,
79-
error_message="Service unavailable.",
79+
error_message="HTTP Status Code: 503. Error: Service unavailable.",
8080
),
8181
504: ErrorResolution(
8282
response_action=ResponseAction.RETRY,
8383
failure_type=FailureType.transient_error,
84-
error_message="Gateway timeout.",
84+
error_message="HTTP Status Code: 504. Error: Gateway timeout.",
8585
),
8686
}

airbyte_cdk/sources/streams/http/exceptions.py

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,14 +7,18 @@
77

88
import requests
99

10+
from airbyte_cdk.models import FailureType
11+
1012

1113
class BaseBackoffException(requests.exceptions.HTTPError):
1214
def __init__(
1315
self,
1416
request: requests.PreparedRequest,
1517
response: Optional[Union[requests.Response, Exception]],
1618
error_message: str = "",
19+
failure_type: Optional[FailureType] = None,
1720
):
21+
self.failure_type = failure_type
1822
if isinstance(response, requests.Response):
1923
error_message = (
2024
error_message
@@ -43,14 +47,20 @@ def __init__(
4347
request: requests.PreparedRequest,
4448
response: Optional[Union[requests.Response, Exception]],
4549
error_message: str = "",
50+
failure_type: Optional[FailureType] = None,
4651
):
4752
"""
4853
:param backoff: how long to backoff in seconds
4954
:param request: the request that triggered this backoff exception
5055
:param response: the response that triggered the backoff exception
5156
"""
5257
self.backoff = backoff
53-
super().__init__(request=request, response=response, error_message=error_message)
58+
super().__init__(
59+
request=request,
60+
response=response,
61+
error_message=error_message,
62+
failure_type=failure_type,
63+
)
5464

5565

5666
class DefaultBackoffException(BaseBackoffException):

airbyte_cdk/sources/streams/http/http_client.py

Lines changed: 24 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
AirbyteStreamStatus,
1919
AirbyteStreamStatusReason,
2020
AirbyteStreamStatusReasonType,
21+
FailureType,
2122
Level,
2223
StreamDescriptor,
2324
)
@@ -35,6 +36,7 @@
3536
ResponseAction,
3637
)
3738
from airbyte_cdk.sources.streams.http.exceptions import (
39+
BaseBackoffException,
3840
DefaultBackoffException,
3941
RateLimitBackoffException,
4042
RequestBodyException,
@@ -290,15 +292,25 @@ def _send_with_retry(
290292
backoff_handler = http_client_default_backoff_handler(
291293
max_tries=max_tries, max_time=max_time
292294
)
293-
# backoff handlers wrap _send, so it will always return a response
294-
response = backoff_handler(rate_limit_backoff_handler(user_backoff_handler))(
295-
request,
296-
request_kwargs,
297-
log_formatter=log_formatter,
298-
exit_on_rate_limit=exit_on_rate_limit,
299-
) # type: ignore # mypy can't infer that backoff_handler wraps _send
300-
301-
return response
295+
# backoff handlers wrap _send, so it will always return a response -- except when all retries are exhausted
296+
try:
297+
response = backoff_handler(rate_limit_backoff_handler(user_backoff_handler))(
298+
request,
299+
request_kwargs,
300+
log_formatter=log_formatter,
301+
exit_on_rate_limit=exit_on_rate_limit,
302+
) # type: ignore # mypy can't infer that backoff_handler wraps _send
303+
304+
return response
305+
except BaseBackoffException as e:
306+
self._logger.error(f"Retries exhausted with backoff exception.", exc_info=True)
307+
raise MessageRepresentationAirbyteTracedErrors(
308+
internal_message=f"Exhausted available request attempts. Exception: {e}",
309+
message=f"Exhausted available request attempts. Please see logs for more details. Exception: {e}",
310+
failure_type=e.failure_type or FailureType.system_error,
311+
exception=e,
312+
stream_descriptor=StreamDescriptor(name=self._name),
313+
)
302314

303315
def _send(
304316
self,
@@ -492,19 +504,22 @@ def _handle_error_resolution(
492504
request=request,
493505
response=(response if response is not None else exc),
494506
error_message=error_message,
507+
failure_type=error_resolution.failure_type,
495508
)
496509

497510
elif retry_endlessly:
498511
raise RateLimitBackoffException(
499512
request=request,
500513
response=(response if response is not None else exc),
501514
error_message=error_message,
515+
failure_type=error_resolution.failure_type,
502516
)
503517

504518
raise DefaultBackoffException(
505519
request=request,
506520
response=(response if response is not None else exc),
507521
error_message=error_message,
522+
failure_type=error_resolution.failure_type,
508523
)
509524

510525
elif response:

airbyte_cdk/sources/streams/http/requests_native_auth/abstract_oauth.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -240,7 +240,11 @@ def _make_handled_request(self) -> Any:
240240
except requests.exceptions.RequestException as e:
241241
if e.response is not None:
242242
if e.response.status_code == 429 or e.response.status_code >= 500:
243-
raise DefaultBackoffException(request=e.response.request, response=e.response)
243+
raise DefaultBackoffException(
244+
request=e.response.request,
245+
response=e.response,
246+
failure_type=FailureType.transient_error,
247+
)
244248
if self._wrap_refresh_token_exception(e):
245249
message = "Refresh token is invalid or expired. Please re-authenticate from Sources/<your source>/Settings."
246250
raise AirbyteTracedException(

unit_tests/sources/declarative/async_job/test_job_orchestrator.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
from airbyte_cdk.sources.declarative.async_job.job_tracker import JobTracker
2020
from airbyte_cdk.sources.declarative.async_job.repository import AsyncJobRepository
2121
from airbyte_cdk.sources.message import MessageRepository
22-
from airbyte_cdk.sources.streams.http.http_client import MessageRepresentationAirbyteTracedErrors
2322
from airbyte_cdk.sources.types import StreamSlice
2423
from airbyte_cdk.utils import AirbyteTracedException
2524

@@ -244,7 +243,7 @@ def test_given_traced_config_error_when_start_job_and_raise_this_exception_and_a
244243
Since this is a config error, we assume the other jobs will fail for the same reasons.
245244
"""
246245
job_tracker = JobTracker(1)
247-
self._job_repository.start.side_effect = MessageRepresentationAirbyteTracedErrors(
246+
self._job_repository.start.side_effect = AirbyteTracedException(
248247
"Can't create job", failure_type=FailureType.config_error
249248
)
250249

unit_tests/sources/declarative/requesters/error_handlers/test_default_error_handler.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@ def test_default_error_handler_with_default_response_filter(
9292
),
9393
ResponseAction.RETRY,
9494
FailureType.system_error,
95-
"Bad request. Please check your request parameters.",
95+
"HTTP Status Code: 400. Error: Bad request. Please check your request parameters.",
9696
),
9797
(
9898
"_with_http_response_status_402_fail_with_default_failure_type",
@@ -118,7 +118,7 @@ def test_default_error_handler_with_default_response_filter(
118118
),
119119
ResponseAction.FAIL,
120120
FailureType.config_error,
121-
"Forbidden. You don't have permission to access this resource.",
121+
"HTTP Status Code: 403. Error: Forbidden. You don't have permission to access this resource.",
122122
),
123123
(
124124
"_with_http_response_status_200_fail_with_contained_error_message",

unit_tests/sources/declarative/requesters/error_handlers/test_http_response_filter.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@
4444
ErrorResolution(
4545
response_action=ResponseAction.IGNORE,
4646
failure_type=FailureType.config_error,
47-
error_message="Forbidden. You don't have permission to access this resource.",
47+
error_message="HTTP Status Code: 403. Error: Forbidden. You don't have permission to access this resource.",
4848
),
4949
id="test_http_code_matches_ignore_action",
5050
),
@@ -59,7 +59,7 @@
5959
ErrorResolution(
6060
response_action=ResponseAction.RETRY,
6161
failure_type=FailureType.transient_error,
62-
error_message="Too many requests.",
62+
error_message="HTTP Status Code: 429. Error: Too many requests.",
6363
),
6464
id="test_http_code_matches_retry_action",
6565
),
@@ -104,7 +104,7 @@
104104
ErrorResolution(
105105
response_action=ResponseAction.FAIL,
106106
failure_type=FailureType.config_error,
107-
error_message="Forbidden. You don't have permission to access this resource.",
107+
error_message="HTTP Status Code: 403. Error: Forbidden. You don't have permission to access this resource.",
108108
),
109109
id="test_predicate_matches_headers",
110110
),

unit_tests/sources/declarative/requesters/test_http_requester.py

Lines changed: 5 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -29,18 +29,12 @@
2929
InterpolatedRequestOptionsProvider,
3030
)
3131
from airbyte_cdk.sources.message import MessageRepository
32-
from airbyte_cdk.sources.streams.call_rate import (
33-
AbstractAPIBudget,
34-
HttpAPIBudget,
35-
MovingWindowCallRatePolicy,
36-
Rate,
37-
)
38-
from airbyte_cdk.sources.streams.http.error_handlers.response_models import ResponseAction
32+
from airbyte_cdk.sources.streams.call_rate import HttpAPIBudget
3933
from airbyte_cdk.sources.streams.http.exceptions import (
4034
RequestBodyException,
41-
UserDefinedBackoffException,
4235
)
4336
from airbyte_cdk.sources.types import Config
37+
from airbyte_cdk.utils.traced_exception import AirbyteTracedException
4438

4539

4640
@pytest.fixture
@@ -880,7 +874,7 @@ def test_request_attempt_count_is_tracked_across_retries(http_requester_factory)
880874
response.status_code = 500
881875
http_requester._http_client._session.send.return_value = response
882876

883-
with pytest.raises(UserDefinedBackoffException):
877+
with pytest.raises(AirbyteTracedException):
884878
http_requester._http_client._send_with_retry(request=request_mock, request_kwargs={})
885879

886880
assert (
@@ -906,7 +900,7 @@ def test_request_attempt_count_with_exponential_backoff_strategy(http_requester_
906900
response.status_code = 500
907901
http_requester._http_client._session.send.return_value = response
908902

909-
with pytest.raises(UserDefinedBackoffException):
903+
with pytest.raises(AirbyteTracedException):
910904
http_requester._http_client._send_with_retry(request=request_mock, request_kwargs={})
911905

912906
assert (
@@ -937,7 +931,7 @@ def test_backoff_strategy_from_manifest_is_respected(http_requester_factory: Any
937931
response.status_code = 500
938932
http_requester._http_client._session.send.return_value = response
939933

940-
with pytest.raises(UserDefinedBackoffException):
934+
with pytest.raises(AirbyteTracedException):
941935
http_requester._http_client._send_with_retry(request=request_mock, request_kwargs={})
942936

943937
assert (

unit_tests/sources/streams/http/error_handlers/test_http_status_error_handler.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,13 +34,13 @@ def test_given_ok_response_http_status_error_handler_returns_success_action(mock
3434
403,
3535
ResponseAction.FAIL,
3636
FailureType.config_error,
37-
"Forbidden. You don't have permission to access this resource.",
37+
"HTTP Status Code: 403. Error: Forbidden. You don't have permission to access this resource.",
3838
),
3939
(
4040
404,
4141
ResponseAction.FAIL,
4242
FailureType.system_error,
43-
"Not found. The requested resource was not found on the server.",
43+
"HTTP Status Code: 404. Error: Not found. The requested resource was not found on the server.",
4444
),
4545
],
4646
)

0 commit comments

Comments
 (0)