Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,9 @@ def check_connection(
for stream in streams[: min(self.stream_count, len(streams))]:
stream_is_available, reason = evaluate_availability(stream, logger)
if not stream_is_available:
logger.warning(f"Stream {stream.name} is not available: {reason}")
return False, reason
message = f"Stream {stream.name} is not available: {reason}"
logger.warning(message)
return False, message
except Exception as error:
error_message = (
f"Encountered an error trying to connect to stream {stream.name}. Error: {error}"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,56 +31,56 @@
400: ErrorResolution(
response_action=ResponseAction.FAIL,
failure_type=FailureType.system_error,
error_message="Bad request. Please check your request parameters.",
error_message="HTTP Status Code: 400. Error: Bad request. Please check your request parameters.",
),
401: ErrorResolution(
response_action=ResponseAction.FAIL,
failure_type=FailureType.config_error,
error_message="Unauthorized. Please ensure you are authenticated correctly.",
error_message="HTTP Status Code: 401. Error: Unauthorized. Please ensure you are authenticated correctly.",
),
403: ErrorResolution(
response_action=ResponseAction.FAIL,
failure_type=FailureType.config_error,
error_message="Forbidden. You don't have permission to access this resource.",
error_message="HTTP Status Code: 403. Error: Forbidden. You don't have permission to access this resource.",
),
404: ErrorResolution(
response_action=ResponseAction.FAIL,
failure_type=FailureType.system_error,
error_message="Not found. The requested resource was not found on the server.",
error_message="HTTP Status Code: 404. Error: Not found. The requested resource was not found on the server.",
),
405: ErrorResolution(
response_action=ResponseAction.FAIL,
failure_type=FailureType.system_error,
error_message="Method not allowed. Please check your request method.",
error_message="HTTP Status Code: 405. Error: Method not allowed. Please check your request method.",
),
408: ErrorResolution(
response_action=ResponseAction.RETRY,
failure_type=FailureType.transient_error,
error_message="Request timeout.",
error_message="HTTP Status Code: 408. Error: Request timeout.",
),
429: ErrorResolution(
response_action=ResponseAction.RATE_LIMITED,
failure_type=FailureType.transient_error,
error_message="Too many requests.",
error_message="HTTP Status Code: 429. Error: Too many requests.",
),
500: ErrorResolution(
response_action=ResponseAction.RETRY,
failure_type=FailureType.transient_error,
error_message="Internal server error.",
error_message="HTTP Status Code: 500. Error: Internal server error.",
),
502: ErrorResolution(
response_action=ResponseAction.RETRY,
failure_type=FailureType.transient_error,
error_message="Bad gateway.",
error_message="HTTP Status Code: 502. Error: Bad gateway.",
),
503: ErrorResolution(
response_action=ResponseAction.RETRY,
failure_type=FailureType.transient_error,
error_message="Service unavailable.",
error_message="HTTP Status Code: 503. Error: Service unavailable.",
),
504: ErrorResolution(
response_action=ResponseAction.RETRY,
failure_type=FailureType.transient_error,
error_message="Gateway timeout.",
error_message="HTTP Status Code: 504. Error: Gateway timeout.",
),
}
12 changes: 11 additions & 1 deletion airbyte_cdk/sources/streams/http/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,18 @@

import requests

from airbyte_cdk.models import FailureType


class BaseBackoffException(requests.exceptions.HTTPError):
def __init__(
self,
request: requests.PreparedRequest,
response: Optional[Union[requests.Response, Exception]],
error_message: str = "",
failure_type: Optional[FailureType] = None,
):
self.failure_type = failure_type
if isinstance(response, requests.Response):
error_message = (
error_message
Expand Down Expand Up @@ -43,14 +47,20 @@ def __init__(
request: requests.PreparedRequest,
response: Optional[Union[requests.Response, Exception]],
error_message: str = "",
failure_type: Optional[FailureType] = None,
):
"""
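:param backoff: how long to back off, in seconds
:param request: the request that triggered this backoff exception
:param response: the response (or underlying exception) that triggered the backoff exception
:param error_message: optional error message forwarded to the parent exception
:param failure_type: optional failure type forwarded to the parent exception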
"""
self.backoff = backoff
super().__init__(request=request, response=response, error_message=error_message)
super().__init__(
request=request,
response=response,
error_message=error_message,
failure_type=failure_type,
)


class DefaultBackoffException(BaseBackoffException):
Expand Down
33 changes: 24 additions & 9 deletions airbyte_cdk/sources/streams/http/http_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
AirbyteStreamStatus,
AirbyteStreamStatusReason,
AirbyteStreamStatusReasonType,
FailureType,
Level,
StreamDescriptor,
)
Expand All @@ -35,6 +36,7 @@
ResponseAction,
)
from airbyte_cdk.sources.streams.http.exceptions import (
BaseBackoffException,
DefaultBackoffException,
RateLimitBackoffException,
RequestBodyException,
Expand Down Expand Up @@ -290,15 +292,25 @@ def _send_with_retry(
backoff_handler = http_client_default_backoff_handler(
max_tries=max_tries, max_time=max_time
)
# backoff handlers wrap _send, so it will always return a response
response = backoff_handler(rate_limit_backoff_handler(user_backoff_handler))(
request,
request_kwargs,
log_formatter=log_formatter,
exit_on_rate_limit=exit_on_rate_limit,
) # type: ignore # mypy can't infer that backoff_handler wraps _send

return response
# The backoff handlers wrap _send, so a response is always returned unless all retries are exhausted, in which case a backoff exception is raised and handled below
try:
response = backoff_handler(rate_limit_backoff_handler(user_backoff_handler))(
request,
request_kwargs,
log_formatter=log_formatter,
exit_on_rate_limit=exit_on_rate_limit,
) # type: ignore # mypy can't infer that backoff_handler wraps _send

return response
except BaseBackoffException as e:
self._logger.error(f"Retries exhausted with backoff exception.", exc_info=True)
raise MessageRepresentationAirbyteTracedErrors(
internal_message=f"Exhausted available request attempts. Exception: {e}",
message=f"Exhausted available request attempts. Please see logs for more details. Exception: {e}",
failure_type=e.failure_type or FailureType.system_error,
exception=e,
stream_descriptor=StreamDescriptor(name=self._name),
)

def _send(
self,
Expand Down Expand Up @@ -492,19 +504,22 @@ def _handle_error_resolution(
request=request,
response=(response if response is not None else exc),
error_message=error_message,
failure_type=error_resolution.failure_type,
)

elif retry_endlessly:
raise RateLimitBackoffException(
request=request,
response=(response if response is not None else exc),
error_message=error_message,
failure_type=error_resolution.failure_type,
)

raise DefaultBackoffException(
request=request,
response=(response if response is not None else exc),
error_message=error_message,
failure_type=error_resolution.failure_type,
)

elif response:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,11 @@ def _make_handled_request(self) -> Any:
except requests.exceptions.RequestException as e:
if e.response is not None:
if e.response.status_code == 429 or e.response.status_code >= 500:
raise DefaultBackoffException(request=e.response.request, response=e.response)
raise DefaultBackoffException(
request=e.response.request,
response=e.response,
failure_type=FailureType.transient_error,
)
if self._wrap_refresh_token_exception(e):
message = "Refresh token is invalid or expired. Please re-authenticate from Sources/<your source>/Settings."
raise AirbyteTracedException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
from airbyte_cdk.sources.declarative.async_job.job_tracker import JobTracker
from airbyte_cdk.sources.declarative.async_job.repository import AsyncJobRepository
from airbyte_cdk.sources.message import MessageRepository
from airbyte_cdk.sources.streams.http.http_client import MessageRepresentationAirbyteTracedErrors
from airbyte_cdk.sources.types import StreamSlice
from airbyte_cdk.utils import AirbyteTracedException

Expand Down Expand Up @@ -244,7 +243,7 @@ def test_given_traced_config_error_when_start_job_and_raise_this_exception_and_a
Since this is a config error, we assume the other jobs will fail for the same reasons.
"""
job_tracker = JobTracker(1)
self._job_repository.start.side_effect = MessageRepresentationAirbyteTracedErrors(
self._job_repository.start.side_effect = AirbyteTracedException(
"Can't create job", failure_type=FailureType.config_error
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ def test_default_error_handler_with_default_response_filter(
),
ResponseAction.RETRY,
FailureType.system_error,
"Bad request. Please check your request parameters.",
"HTTP Status Code: 400. Error: Bad request. Please check your request parameters.",
),
(
"_with_http_response_status_402_fail_with_default_failure_type",
Expand All @@ -118,7 +118,7 @@ def test_default_error_handler_with_default_response_filter(
),
ResponseAction.FAIL,
FailureType.config_error,
"Forbidden. You don't have permission to access this resource.",
"HTTP Status Code: 403. Error: Forbidden. You don't have permission to access this resource.",
),
(
"_with_http_response_status_200_fail_with_contained_error_message",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@
ErrorResolution(
response_action=ResponseAction.IGNORE,
failure_type=FailureType.config_error,
error_message="Forbidden. You don't have permission to access this resource.",
error_message="HTTP Status Code: 403. Error: Forbidden. You don't have permission to access this resource.",
),
id="test_http_code_matches_ignore_action",
),
Expand All @@ -59,7 +59,7 @@
ErrorResolution(
response_action=ResponseAction.RETRY,
failure_type=FailureType.transient_error,
error_message="Too many requests.",
error_message="HTTP Status Code: 429. Error: Too many requests.",
),
id="test_http_code_matches_retry_action",
),
Expand Down Expand Up @@ -104,7 +104,7 @@
ErrorResolution(
response_action=ResponseAction.FAIL,
failure_type=FailureType.config_error,
error_message="Forbidden. You don't have permission to access this resource.",
error_message="HTTP Status Code: 403. Error: Forbidden. You don't have permission to access this resource.",
),
id="test_predicate_matches_headers",
),
Expand Down
16 changes: 5 additions & 11 deletions unit_tests/sources/declarative/requesters/test_http_requester.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,18 +29,12 @@
InterpolatedRequestOptionsProvider,
)
from airbyte_cdk.sources.message import MessageRepository
from airbyte_cdk.sources.streams.call_rate import (
AbstractAPIBudget,
HttpAPIBudget,
MovingWindowCallRatePolicy,
Rate,
)
from airbyte_cdk.sources.streams.http.error_handlers.response_models import ResponseAction
from airbyte_cdk.sources.streams.call_rate import HttpAPIBudget
from airbyte_cdk.sources.streams.http.exceptions import (
RequestBodyException,
UserDefinedBackoffException,
)
from airbyte_cdk.sources.types import Config
from airbyte_cdk.utils.traced_exception import AirbyteTracedException


@pytest.fixture
Expand Down Expand Up @@ -880,7 +874,7 @@ def test_request_attempt_count_is_tracked_across_retries(http_requester_factory)
response.status_code = 500
http_requester._http_client._session.send.return_value = response

with pytest.raises(UserDefinedBackoffException):
with pytest.raises(AirbyteTracedException):
http_requester._http_client._send_with_retry(request=request_mock, request_kwargs={})

assert (
Expand All @@ -906,7 +900,7 @@ def test_request_attempt_count_with_exponential_backoff_strategy(http_requester_
response.status_code = 500
http_requester._http_client._session.send.return_value = response

with pytest.raises(UserDefinedBackoffException):
with pytest.raises(AirbyteTracedException):
http_requester._http_client._send_with_retry(request=request_mock, request_kwargs={})

assert (
Expand Down Expand Up @@ -937,7 +931,7 @@ def test_backoff_strategy_from_manifest_is_respected(http_requester_factory: Any
response.status_code = 500
http_requester._http_client._session.send.return_value = response

with pytest.raises(UserDefinedBackoffException):
with pytest.raises(AirbyteTracedException):
http_requester._http_client._send_with_retry(request=request_mock, request_kwargs={})

assert (
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,13 @@ def test_given_ok_response_http_status_error_handler_returns_success_action(mock
403,
ResponseAction.FAIL,
FailureType.config_error,
"Forbidden. You don't have permission to access this resource.",
"HTTP Status Code: 403. Error: Forbidden. You don't have permission to access this resource.",
),
(
404,
ResponseAction.FAIL,
FailureType.system_error,
"Not found. The requested resource was not found on the server.",
"HTTP Status Code: 404. Error: Not found. The requested resource was not found on the server.",
),
],
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ def test_http_availability_raises_unhandled_error(mocker):

assert (
False,
"Not found. The requested resource was not found on the server.",
"HTTP Status Code: 404. Error: Not found. The requested resource was not found on the server.",
) == HttpAvailabilityStrategy().check_availability(http_stream, logger)


Expand Down
Loading
Loading