diff --git a/airbyte_cdk/sources/declarative/checks/check_dynamic_stream.py b/airbyte_cdk/sources/declarative/checks/check_dynamic_stream.py index 66002f9f9..2dac2ecc3 100644 --- a/airbyte_cdk/sources/declarative/checks/check_dynamic_stream.py +++ b/airbyte_cdk/sources/declarative/checks/check_dynamic_stream.py @@ -49,8 +49,9 @@ def check_connection( for stream in streams[: min(self.stream_count, len(streams))]: stream_is_available, reason = evaluate_availability(stream, logger) if not stream_is_available: - logger.warning(f"Stream {stream.name} is not available: {reason}") - return False, reason + message = f"Stream {stream.name} is not available: {reason}" + logger.warning(message) + return False, message except Exception as error: error_message = ( f"Encountered an error trying to connect to stream {stream.name}. Error: {error}" diff --git a/airbyte_cdk/sources/streams/http/error_handlers/default_error_mapping.py b/airbyte_cdk/sources/streams/http/error_handlers/default_error_mapping.py index da616e0ee..45716768f 100644 --- a/airbyte_cdk/sources/streams/http/error_handlers/default_error_mapping.py +++ b/airbyte_cdk/sources/streams/http/error_handlers/default_error_mapping.py @@ -31,56 +31,56 @@ 400: ErrorResolution( response_action=ResponseAction.FAIL, failure_type=FailureType.system_error, - error_message="Bad request. Please check your request parameters.", + error_message="HTTP Status Code: 400. Error: Bad request. Please check your request parameters.", ), 401: ErrorResolution( response_action=ResponseAction.FAIL, failure_type=FailureType.config_error, - error_message="Unauthorized. Please ensure you are authenticated correctly.", + error_message="HTTP Status Code: 401. Error: Unauthorized. Please ensure you are authenticated correctly.", ), 403: ErrorResolution( response_action=ResponseAction.FAIL, failure_type=FailureType.config_error, - error_message="Forbidden. You don't have permission to access this resource.", + error_message="HTTP Status Code: 403. Error: Forbidden. You don't have permission to access this resource.", ), 404: ErrorResolution( response_action=ResponseAction.FAIL, failure_type=FailureType.system_error, - error_message="Not found. The requested resource was not found on the server.", + error_message="HTTP Status Code: 404. Error: Not found. The requested resource was not found on the server.", ), 405: ErrorResolution( response_action=ResponseAction.FAIL, failure_type=FailureType.system_error, - error_message="Method not allowed. Please check your request method.", + error_message="HTTP Status Code: 405. Error: Method not allowed. Please check your request method.", ), 408: ErrorResolution( response_action=ResponseAction.RETRY, failure_type=FailureType.transient_error, - error_message="Request timeout.", + error_message="HTTP Status Code: 408. Error: Request timeout.", ), 429: ErrorResolution( response_action=ResponseAction.RATE_LIMITED, failure_type=FailureType.transient_error, - error_message="Too many requests.", + error_message="HTTP Status Code: 429. Error: Too many requests.", ), 500: ErrorResolution( response_action=ResponseAction.RETRY, failure_type=FailureType.transient_error, - error_message="Internal server error.", + error_message="HTTP Status Code: 500. Error: Internal server error.", ), 502: ErrorResolution( response_action=ResponseAction.RETRY, failure_type=FailureType.transient_error, - error_message="Bad gateway.", + error_message="HTTP Status Code: 502. Error: Bad gateway.", ), 503: ErrorResolution( response_action=ResponseAction.RETRY, failure_type=FailureType.transient_error, - error_message="Service unavailable.", + error_message="HTTP Status Code: 503. Error: Service unavailable.", ), 504: ErrorResolution( response_action=ResponseAction.RETRY, failure_type=FailureType.transient_error, - error_message="Gateway timeout.", + error_message="HTTP Status Code: 504. Error: Gateway timeout.", ), } diff --git a/airbyte_cdk/sources/streams/http/exceptions.py b/airbyte_cdk/sources/streams/http/exceptions.py index ee4687626..73d2947fa 100644 --- a/airbyte_cdk/sources/streams/http/exceptions.py +++ b/airbyte_cdk/sources/streams/http/exceptions.py @@ -7,6 +7,8 @@ import requests +from airbyte_cdk.models import FailureType + class BaseBackoffException(requests.exceptions.HTTPError): def __init__( @@ -14,7 +16,9 @@ def __init__( request: requests.PreparedRequest, response: Optional[Union[requests.Response, Exception]], error_message: str = "", + failure_type: Optional[FailureType] = None, ): + self.failure_type = failure_type if isinstance(response, requests.Response): error_message = ( error_message @@ -43,6 +47,7 @@ def __init__( request: requests.PreparedRequest, response: Optional[Union[requests.Response, Exception]], error_message: str = "", + failure_type: Optional[FailureType] = None, ): """ :param backoff: how long to backoff in seconds @@ -50,7 +55,12 @@ def __init__( :param response: the response that triggered the backoff exception """ self.backoff = backoff - super().__init__(request=request, response=response, error_message=error_message) + super().__init__( + request=request, + response=response, + error_message=error_message, + failure_type=failure_type, + ) class DefaultBackoffException(BaseBackoffException): diff --git a/airbyte_cdk/sources/streams/http/http_client.py b/airbyte_cdk/sources/streams/http/http_client.py index 63c451c49..7f22e0dee 100644 --- a/airbyte_cdk/sources/streams/http/http_client.py +++ b/airbyte_cdk/sources/streams/http/http_client.py @@ -18,6 +18,7 @@ AirbyteStreamStatus, AirbyteStreamStatusReason, AirbyteStreamStatusReasonType, + FailureType, Level, StreamDescriptor, ) @@ -35,6 +36,7 @@ ResponseAction, ) from airbyte_cdk.sources.streams.http.exceptions import ( + BaseBackoffException, DefaultBackoffException, RateLimitBackoffException, RequestBodyException, @@ -290,15 +292,25 @@ def _send_with_retry( backoff_handler = http_client_default_backoff_handler( max_tries=max_tries, max_time=max_time ) - # backoff handlers wrap _send, so it will always return a response - response = backoff_handler(rate_limit_backoff_handler(user_backoff_handler))( - request, - request_kwargs, - log_formatter=log_formatter, - exit_on_rate_limit=exit_on_rate_limit, - ) # type: ignore # mypy can't infer that backoff_handler wraps _send - - return response + # backoff handlers wrap _send, so it will always return a response -- except when all retries are exhausted + try: + response = backoff_handler(rate_limit_backoff_handler(user_backoff_handler))( + request, + request_kwargs, + log_formatter=log_formatter, + exit_on_rate_limit=exit_on_rate_limit, + ) # type: ignore # mypy can't infer that backoff_handler wraps _send + + return response + except BaseBackoffException as e: + self._logger.error(f"Retries exhausted with backoff exception.", exc_info=True) + raise MessageRepresentationAirbyteTracedErrors( + internal_message=f"Exhausted available request attempts. Exception: {e}", + message=f"Exhausted available request attempts. Please see logs for more details. Exception: {e}", + failure_type=e.failure_type or FailureType.system_error, + exception=e, + stream_descriptor=StreamDescriptor(name=self._name), + ) def _send( self, @@ -492,6 +504,7 @@ def _handle_error_resolution( request=request, response=(response if response is not None else exc), error_message=error_message, + failure_type=error_resolution.failure_type, ) elif retry_endlessly: @@ -499,12 +512,14 @@ def _handle_error_resolution( request=request, response=(response if response is not None else exc), error_message=error_message, + failure_type=error_resolution.failure_type, ) raise DefaultBackoffException( request=request, response=(response if response is not None else exc), error_message=error_message, + failure_type=error_resolution.failure_type, ) elif response: diff --git a/airbyte_cdk/sources/streams/http/requests_native_auth/abstract_oauth.py b/airbyte_cdk/sources/streams/http/requests_native_auth/abstract_oauth.py index e7a4477c2..ed7a45d49 100644 --- a/airbyte_cdk/sources/streams/http/requests_native_auth/abstract_oauth.py +++ b/airbyte_cdk/sources/streams/http/requests_native_auth/abstract_oauth.py @@ -240,7 +240,11 @@ def _make_handled_request(self) -> Any: except requests.exceptions.RequestException as e: if e.response is not None: if e.response.status_code == 429 or e.response.status_code >= 500: - raise DefaultBackoffException(request=e.response.request, response=e.response) + raise DefaultBackoffException( + request=e.response.request, + response=e.response, + failure_type=FailureType.transient_error, + ) if self._wrap_refresh_token_exception(e): message = "Refresh token is invalid or expired. Please re-authenticate from Sources//Settings." raise AirbyteTracedException( diff --git a/unit_tests/sources/declarative/async_job/test_job_orchestrator.py b/unit_tests/sources/declarative/async_job/test_job_orchestrator.py index dd953ca56..727f517c4 100644 --- a/unit_tests/sources/declarative/async_job/test_job_orchestrator.py +++ b/unit_tests/sources/declarative/async_job/test_job_orchestrator.py @@ -19,7 +19,6 @@ from airbyte_cdk.sources.declarative.async_job.job_tracker import JobTracker from airbyte_cdk.sources.declarative.async_job.repository import AsyncJobRepository from airbyte_cdk.sources.message import MessageRepository -from airbyte_cdk.sources.streams.http.http_client import MessageRepresentationAirbyteTracedErrors from airbyte_cdk.sources.types import StreamSlice from airbyte_cdk.utils import AirbyteTracedException @@ -244,7 +243,7 @@ def test_given_traced_config_error_when_start_job_and_raise_this_exception_and_a Since this is a config error, we assume the other jobs will fail for the same reasons. """ job_tracker = JobTracker(1) - self._job_repository.start.side_effect = MessageRepresentationAirbyteTracedErrors( + self._job_repository.start.side_effect = AirbyteTracedException( "Can't create job", failure_type=FailureType.config_error ) diff --git a/unit_tests/sources/declarative/requesters/error_handlers/test_default_error_handler.py b/unit_tests/sources/declarative/requesters/error_handlers/test_default_error_handler.py index 3b13fe130..bf4e1e321 100644 --- a/unit_tests/sources/declarative/requesters/error_handlers/test_default_error_handler.py +++ b/unit_tests/sources/declarative/requesters/error_handlers/test_default_error_handler.py @@ -92,7 +92,7 @@ def test_default_error_handler_with_default_response_filter( ), ResponseAction.RETRY, FailureType.system_error, - "Bad request. Please check your request parameters.", + "HTTP Status Code: 400. Error: Bad request. Please check your request parameters.", ), ( "_with_http_response_status_402_fail_with_default_failure_type", @@ -118,7 +118,7 @@ def test_default_error_handler_with_default_response_filter( ), ResponseAction.FAIL, FailureType.config_error, - "Forbidden. You don't have permission to access this resource.", + "HTTP Status Code: 403. Error: Forbidden. You don't have permission to access this resource.", ), ( "_with_http_response_status_200_fail_with_contained_error_message", diff --git a/unit_tests/sources/declarative/requesters/error_handlers/test_http_response_filter.py b/unit_tests/sources/declarative/requesters/error_handlers/test_http_response_filter.py index ecb9c7032..87e522d4a 100644 --- a/unit_tests/sources/declarative/requesters/error_handlers/test_http_response_filter.py +++ b/unit_tests/sources/declarative/requesters/error_handlers/test_http_response_filter.py @@ -44,7 +44,7 @@ ErrorResolution( response_action=ResponseAction.IGNORE, failure_type=FailureType.config_error, - error_message="Forbidden. You don't have permission to access this resource.", + error_message="HTTP Status Code: 403. Error: Forbidden. You don't have permission to access this resource.", ), id="test_http_code_matches_ignore_action", ), @@ -59,7 +59,7 @@ ErrorResolution( response_action=ResponseAction.RETRY, failure_type=FailureType.transient_error, - error_message="Too many requests.", + error_message="HTTP Status Code: 429. Error: Too many requests.", ), id="test_http_code_matches_retry_action", ), @@ -104,7 +104,7 @@ ErrorResolution( response_action=ResponseAction.FAIL, failure_type=FailureType.config_error, - error_message="Forbidden. You don't have permission to access this resource.", + error_message="HTTP Status Code: 403. Error: Forbidden. You don't have permission to access this resource.", ), id="test_predicate_matches_headers", ), diff --git a/unit_tests/sources/declarative/requesters/test_http_requester.py b/unit_tests/sources/declarative/requesters/test_http_requester.py index 8fce688d7..211b34890 100644 --- a/unit_tests/sources/declarative/requesters/test_http_requester.py +++ b/unit_tests/sources/declarative/requesters/test_http_requester.py @@ -29,18 +29,12 @@ InterpolatedRequestOptionsProvider, ) from airbyte_cdk.sources.message import MessageRepository -from airbyte_cdk.sources.streams.call_rate import ( - AbstractAPIBudget, - HttpAPIBudget, - MovingWindowCallRatePolicy, - Rate, -) -from airbyte_cdk.sources.streams.http.error_handlers.response_models import ResponseAction +from airbyte_cdk.sources.streams.call_rate import HttpAPIBudget from airbyte_cdk.sources.streams.http.exceptions import ( RequestBodyException, - UserDefinedBackoffException, ) from airbyte_cdk.sources.types import Config +from airbyte_cdk.utils.traced_exception import AirbyteTracedException @pytest.fixture @@ -880,7 +874,7 @@ def test_request_attempt_count_is_tracked_across_retries(http_requester_factory) response.status_code = 500 http_requester._http_client._session.send.return_value = response - with pytest.raises(UserDefinedBackoffException): + with pytest.raises(AirbyteTracedException): http_requester._http_client._send_with_retry(request=request_mock, request_kwargs={}) assert ( @@ -906,7 +900,7 @@ def test_request_attempt_count_with_exponential_backoff_strategy(http_requester_ response.status_code = 500 http_requester._http_client._session.send.return_value = response - with pytest.raises(UserDefinedBackoffException): + with pytest.raises(AirbyteTracedException): http_requester._http_client._send_with_retry(request=request_mock, request_kwargs={}) assert ( @@ -937,7 +931,7 @@ def test_backoff_strategy_from_manifest_is_respected(http_requester_factory: Any response.status_code = 500 http_requester._http_client._session.send.return_value = response - with pytest.raises(UserDefinedBackoffException): + with pytest.raises(AirbyteTracedException): http_requester._http_client._send_with_retry(request=request_mock, request_kwargs={}) assert ( diff --git a/unit_tests/sources/streams/http/error_handlers/test_http_status_error_handler.py b/unit_tests/sources/streams/http/error_handlers/test_http_status_error_handler.py index 97e71ee5a..b49bc17f8 100644 --- a/unit_tests/sources/streams/http/error_handlers/test_http_status_error_handler.py +++ b/unit_tests/sources/streams/http/error_handlers/test_http_status_error_handler.py @@ -34,13 +34,13 @@ def test_given_ok_response_http_status_error_handler_returns_success_action(mock 403, ResponseAction.FAIL, FailureType.config_error, - "Forbidden. You don't have permission to access this resource.", + "HTTP Status Code: 403. Error: Forbidden. You don't have permission to access this resource.", ), ( 404, ResponseAction.FAIL, FailureType.system_error, - "Not found. The requested resource was not found on the server.", + "HTTP Status Code: 404. Error: Not found. The requested resource was not found on the server.", ), ], ) diff --git a/unit_tests/sources/streams/http/test_availability_strategy.py b/unit_tests/sources/streams/http/test_availability_strategy.py index bf49e09b4..91718c636 100644 --- a/unit_tests/sources/streams/http/test_availability_strategy.py +++ b/unit_tests/sources/streams/http/test_availability_strategy.py @@ -107,7 +107,7 @@ def test_http_availability_raises_unhandled_error(mocker): assert ( False, - "Not found. The requested resource was not found on the server.", + "HTTP Status Code: 404. Error: Not found. The requested resource was not found on the server.", ) == HttpAvailabilityStrategy().check_availability(http_stream, logger) diff --git a/unit_tests/sources/streams/http/test_http.py b/unit_tests/sources/streams/http/test_http.py index f7ad9e47e..d2bc4071e 100644 --- a/unit_tests/sources/streams/http/test_http.py +++ b/unit_tests/sources/streams/http/test_http.py @@ -6,14 +6,16 @@ import logging from http import HTTPStatus from typing import Any, Callable, Iterable, List, Mapping, MutableMapping, Optional, Tuple, Union -from unittest.mock import ANY, MagicMock, patch +from unittest.mock import ANY, MagicMock, Mock, patch import pytest import requests from requests.exceptions import InvalidURL from airbyte_cdk.models import AirbyteLogMessage, AirbyteMessage, Level, SyncMode, Type +from airbyte_cdk.sources.message.repository import MessageRepository from airbyte_cdk.sources.streams import CheckpointMixin +from airbyte_cdk.sources.streams.call_rate import APIBudget from airbyte_cdk.sources.streams.checkpoint import ResumableFullRefreshCursor from airbyte_cdk.sources.streams.checkpoint.substream_resumable_full_refresh_cursor import ( SubstreamResumableFullRefreshCursor, @@ -26,12 +28,12 @@ ResponseAction, ) from airbyte_cdk.sources.streams.http.exceptions import ( - DefaultBackoffException, RequestBodyException, UserDefinedBackoffException, ) -from airbyte_cdk.sources.streams.http.http_client import MessageRepresentationAirbyteTracedErrors +from airbyte_cdk.sources.streams.http.http_client import HttpClient from airbyte_cdk.sources.streams.http.requests_native_auth import TokenAuthenticator +from airbyte_cdk.utils import AirbyteTracedException from airbyte_cdk.utils.airbyte_secrets_utils import update_secrets @@ -40,7 +42,22 @@ class StubBasicReadHttpStream(HttpStream): primary_key = "" def __init__(self, deduplicate_query_params: bool = False, **kwargs): + disable_retries = False + if "disable_retries" in kwargs: + disable_retries = kwargs.pop("disable_retries") super().__init__(**kwargs) + self._http_client = HttpClient( + name=self.name, + logger=self.logger, + error_handler=self.get_error_handler(), + api_budget=kwargs.get("api_budget", Mock(spec=APIBudget)), + authenticator=kwargs.get("authenticator", None), + use_cache=self.use_cache, + backoff_strategy=self.get_backoff_strategy(), + message_repository=kwargs.get("message_repository", Mock(spec=MessageRepository)), + disable_retries=disable_retries, + ) + self.resp_counter = 1 self._deduplicate_query_params = deduplicate_query_params @@ -169,7 +186,7 @@ def test_stub_custom_backoff_http_stream(mocker): send_mock = mocker.patch.object(requests.Session, "send", return_value=req) - with pytest.raises(UserDefinedBackoffException): + with pytest.raises(AirbyteTracedException): list(stream.read_records(SyncMode.full_refresh)) assert send_mock.call_count == stream.max_retries + 1 @@ -193,10 +210,10 @@ def get_error_handler(self) -> Optional[ErrorHandler]: req.status_code = HTTPStatus.TOO_MANY_REQUESTS send_mock = mocker.patch.object(requests.Session, "send", return_value=req) - with pytest.raises(UserDefinedBackoffException, match="Too many requests") as excinfo: + with pytest.raises( + AirbyteTracedException, match="Exception: HTTP Status Code: 429. Error: Too many requests." + ): list(stream.read_records(SyncMode.full_refresh)) - assert isinstance(excinfo.value.request, requests.PreparedRequest) - assert isinstance(excinfo.value.response, requests.Response) if retries <= 0: assert send_mock.call_count == 1 else: @@ -230,7 +247,7 @@ def test_4xx_error_codes_http_stream(mocker, http_code): req.status_code = http_code mocker.patch.object(requests.Session, "send", return_value=req) - with pytest.raises(MessageRepresentationAirbyteTracedErrors): + with pytest.raises(AirbyteTracedException): list(stream.read_records(SyncMode.full_refresh)) @@ -262,7 +279,7 @@ def test_error_codes_http_stream_error_resolution_with_response_secrets_filtered mocker.patch.object(requests.Session, "send", return_value=res) # proceed - with pytest.raises(MessageRepresentationAirbyteTracedErrors) as err: + with pytest.raises(AirbyteTracedException) as err: list(stream.read_records(SyncMode.full_refresh)) # we expect the header secrets are obscured @@ -286,7 +303,10 @@ def test_raise_on_http_errors_off_429(mocker): req.status_code = 429 mocker.patch.object(requests.Session, "send", return_value=req) - with pytest.raises(DefaultBackoffException, match="Too many requests"): + with pytest.raises( + AirbyteTracedException, + match="Exhausted available request attempts. Please see logs for more details. Exception: HTTP Status Code: 429. Error: Too many requests.", + ): stream.exit_on_rate_limit = True list(stream.read_records(SyncMode.full_refresh)) @@ -299,7 +319,7 @@ def test_raise_on_http_errors_off_5xx(mocker, status_code): req.status_code = status_code send_mock = mocker.patch.object(requests.Session, "send", return_value=req) - with pytest.raises(DefaultBackoffException): + with pytest.raises(AirbyteTracedException): list(stream.read_records(SyncMode.full_refresh)) assert send_mock.call_count == stream.max_retries + 1 @@ -330,7 +350,7 @@ def test_raise_on_http_errors(mocker, error): stream = AutoFailFalseHttpStream() send_mock = mocker.patch.object(requests.Session, "send", side_effect=error()) - with pytest.raises(DefaultBackoffException): + with pytest.raises(AirbyteTracedException): list(stream.read_records(SyncMode.full_refresh)) assert send_mock.call_count == stream.max_retries + 1 @@ -548,6 +568,9 @@ def test_using_cache(mocker, requests_mock): class AutoFailTrueHttpStream(StubBasicReadHttpStream): raise_on_http_errors = True + def __init__(self, **kwargs): + super().__init__(disable_retries=True, **kwargs) + def should_retry(self, *args, **kwargs): return True @@ -580,14 +603,16 @@ def test_http_stream_adapter_http_status_error_handler_should_retry_false_raise_ @pytest.mark.parametrize("status_code", range(400, 600)) def test_send_raise_on_http_errors_logs(mocker, status_code): - mocker.patch("time.sleep", lambda x: None) stream = AutoFailTrueHttpStream() - res = requests.Response() + res = Mock(spec=requests.Response) res.status_code = status_code + res.headers = {} mocker.patch.object(requests.Session, "send", return_value=res) mocker.patch.object(stream._http_client, "_logger") - with pytest.raises(requests.exceptions.HTTPError): - response = stream._http_client.send_request("GET", "https://g", {}, exit_on_rate_limit=True) + with pytest.raises(AirbyteTracedException): + _, response = stream._http_client.send_request( + "GET", "https://g", {}, exit_on_rate_limit=True + ) stream._http_client.logger.error.assert_called_with(response.text) assert response.status_code == status_code diff --git a/unit_tests/sources/streams/http/test_http_client.py b/unit_tests/sources/streams/http/test_http_client.py index 5cc6d20e4..d82548f05 100644 --- a/unit_tests/sources/streams/http/test_http_client.py +++ b/unit_tests/sources/streams/http/test_http_client.py @@ -555,7 +555,7 @@ def backoff_time(self, *args, **kwargs): session_send.return_value = mocked_response with patch.object(requests.Session, "send", return_value=mocked_response) as mocked_send: - with pytest.raises(UserDefinedBackoffException): + with pytest.raises(AirbyteTracedException) as e: http_client.send_request( http_method="get", url="https://test_base_url.com/v1/endpoint", request_kwargs={} ) @@ -583,7 +583,7 @@ def backoff_time(self, *args, **kwargs): session_send.return_value = mocked_response with patch.object(requests.Session, "send", return_value=mocked_response) as mocked_send: - with pytest.raises(UserDefinedBackoffException): + with pytest.raises(AirbyteTracedException) as e: http_client.send_request( http_method="get", url="https://test_base_url.com/v1/endpoint", request_kwargs={} ) @@ -613,7 +613,7 @@ def backoff_time(self, *args, **kwargs): session_send.return_value = mocked_response with patch.object(requests.Session, "send", return_value=mocked_response) as mocked_send: - with pytest.raises(UserDefinedBackoffException): + with pytest.raises(AirbyteTracedException) as e: http_client.send_request( http_method="get", url="https://test_base_url.com/v1/endpoint", request_kwargs={} ) @@ -652,7 +652,7 @@ def backoff_time(self, *args, **kwargs): session_send.return_value = mocked_response with patch.object(requests.Session, "send", return_value=mocked_response) as mocked_send: - with pytest.raises(UserDefinedBackoffException): + with pytest.raises(AirbyteTracedException) as e: http_client.send_request( http_method="get", url="https://test_base_url.com/v1/endpoint", request_kwargs={} ) @@ -680,7 +680,7 @@ def backoff_time(self, *args, **kwargs): session_send.return_value = mocked_response with patch.object(requests.Session, "send", return_value=mocked_response) as mocked_send: - with pytest.raises(UserDefinedBackoffException): + with pytest.raises(AirbyteTracedException) as e: http_client.send_request( http_method="get", url="https://test_base_url.com/v1/endpoint", request_kwargs={} ) @@ -709,7 +709,7 @@ def test_backoff_strategy_endless( session_send.return_value = mocked_response with patch.object(requests.Session, "send", return_value=mocked_response) as mocked_send: - with pytest.raises(expected_error): + with pytest.raises(AirbyteTracedException) as e: http_client.send_request( http_method="get", url="https://test_base_url.com/v1/endpoint", @@ -744,3 +744,62 @@ def test_given_different_headers_then_response_is_not_cached(requests_mock): ) assert second_response.json()["test"] == "second response" + + +@pytest.mark.usefixtures("mock_sleep") +@pytest.mark.parametrize( + "response_code, expected_failure_type, error_message, exception_class", + [ + (400, FailureType.system_error, "test error message", UserDefinedBackoffException), + (401, FailureType.config_error, "test error message", UserDefinedBackoffException), + (403, FailureType.transient_error, "test error message", UserDefinedBackoffException), + (400, FailureType.system_error, "test error message", DefaultBackoffException), + (401, FailureType.config_error, "test error message", DefaultBackoffException), + (403, FailureType.transient_error, "test error message", DefaultBackoffException), + (400, FailureType.system_error, "test error message", RateLimitBackoffException), + (401, FailureType.config_error, "test error message", RateLimitBackoffException), + (403, FailureType.transient_error, "test error message", RateLimitBackoffException), + ], +) +def test_send_with_retry_raises_airbyte_traced_exception_with_failure_type( + response_code, expected_failure_type, error_message, exception_class, requests_mock +): + if exception_class == UserDefinedBackoffException: + + class CustomBackoffStrategy: + def backoff_time(self, response_or_exception, attempt_count): + return 0.1 + + backoff_strategy = CustomBackoffStrategy() + response_action = ResponseAction.RETRY + elif exception_class == RateLimitBackoffException: + backoff_strategy = None + response_action = ResponseAction.RATE_LIMITED + else: + backoff_strategy = None + response_action = ResponseAction.RETRY + + error_mapping = { + response_code: ErrorResolution(response_action, expected_failure_type, error_message), + } + + http_client = HttpClient( + name="test", + logger=MagicMock(spec=logging.Logger), + error_handler=HttpStatusErrorHandler( + logger=MagicMock(), error_mapping=error_mapping, max_retries=1 + ), + backoff_strategy=backoff_strategy, + ) + + requests_mock.register_uri( + "GET", + "https://airbyte.io/", + status_code=response_code, + json={"error": error_message}, + headers={}, + ) + + with pytest.raises(AirbyteTracedException) as e: + http_client.send_request(http_method="get", url="https://airbyte.io/", request_kwargs={}) + assert e.value.failure_type == expected_failure_type