Skip to content
15 changes: 6 additions & 9 deletions airbyte_cdk/sources/declarative/checks/check_dynamic_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,13 @@
#

import logging
import traceback
from dataclasses import InitVar, dataclass
from typing import Any, List, Mapping, Tuple
from typing import Any, List, Mapping, Tuple, Union

from airbyte_cdk import AbstractSource
from airbyte_cdk.sources.abstract_source import AbstractSource
from airbyte_cdk.sources.declarative.checks.check_stream import evaluate_availability
from airbyte_cdk.sources.declarative.checks.connection_checker import ConnectionChecker
from airbyte_cdk.sources.streams.concurrent.abstract_stream import AbstractStream
from airbyte_cdk.sources.streams.http.availability_strategy import HttpAvailabilityStrategy


Expand All @@ -34,20 +35,16 @@ def __post_init__(self, parameters: Mapping[str, Any]) -> None:
def check_connection(
self, source: AbstractSource, logger: logging.Logger, config: Mapping[str, Any]
) -> Tuple[bool, Any]:
streams = source.streams(config=config)
streams: List[Union[Stream, AbstractStream]] = source.streams(config=config) # type: ignore # this is a migration step and we expect the declarative CDK to migrate off of ConnectionChecker

if len(streams) == 0:
return False, f"No streams to connect to from source {source}"
if not self.use_check_availability:
return True, None

availability_strategy = HttpAvailabilityStrategy()

try:
for stream in streams[: min(self.stream_count, len(streams))]:
stream_is_available, reason = availability_strategy.check_availability(
stream, logger
)
stream_is_available, reason = evaluate_availability(stream, logger)
if not stream_is_available:
logger.warning(f"Stream {stream.name} is not available: {reason}")
return False, reason
Expand Down
43 changes: 31 additions & 12 deletions airbyte_cdk/sources/declarative/checks/check_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,30 @@
import logging
import traceback
from dataclasses import InitVar, dataclass
from typing import Any, Dict, List, Mapping, Optional, Tuple
from typing import Any, Dict, List, Mapping, Optional, Tuple, Union

from airbyte_cdk import AbstractSource
from airbyte_cdk.sources.abstract_source import AbstractSource
from airbyte_cdk.sources.declarative.checks.connection_checker import ConnectionChecker
from airbyte_cdk.sources.streams.concurrent.abstract_stream import AbstractStream
from airbyte_cdk.sources.streams.core import Stream
from airbyte_cdk.sources.streams.http.availability_strategy import HttpAvailabilityStrategy


def evaluate_availability(
stream: Union[Stream, AbstractStream], logger: logging.Logger
) -> Tuple[bool, Optional[str]]:
"""
As a transition period, we want to support both Stream and AbstractStream until we migrate everything to AbstractStream.
"""
if isinstance(stream, Stream):
return HttpAvailabilityStrategy().check_availability(stream, logger)
elif isinstance(stream, AbstractStream):
availability = stream.check_availability()
return availability.is_available, availability.reason
else:
raise ValueError(f"Unsupported stream type {type(stream)}")


@dataclass(frozen=True)
class DynamicStreamCheckConfig:
"""Defines the configuration for dynamic stream during connection checking. This class specifies
Expand Down Expand Up @@ -51,7 +68,7 @@ def check_connection(
) -> Tuple[bool, Any]:
"""Checks the connection to the source and its streams."""
try:
streams = source.streams(config=config)
streams: List[Union[Stream, AbstractStream]] = source.streams(config=config) # type: ignore # this is a migration step and we expect the declarative CDK to migrate off of ConnectionChecker
if not streams:
return False, f"No streams to connect to from source {source}"
except Exception as error:
Expand Down Expand Up @@ -82,13 +99,15 @@ def check_connection(
return True, None

def _check_stream_availability(
self, stream_name_to_stream: Dict[str, Any], stream_name: str, logger: logging.Logger
self,
stream_name_to_stream: Dict[str, Union[Stream, AbstractStream]],
stream_name: str,
logger: logging.Logger,
) -> Tuple[bool, Any]:
"""Checks if streams are available."""
availability_strategy = HttpAvailabilityStrategy()
try:
stream = stream_name_to_stream[stream_name]
stream_is_available, reason = availability_strategy.check_availability(stream, logger)
stream_is_available, reason = evaluate_availability(stream, logger)
if not stream_is_available:
message = f"Stream {stream_name} is not available: {reason}"
logger.warning(message)
Expand All @@ -98,7 +117,10 @@ def _check_stream_availability(
return True, None

def _check_dynamic_streams_availability(
self, source: AbstractSource, stream_name_to_stream: Dict[str, Any], logger: logging.Logger
self,
source: AbstractSource,
stream_name_to_stream: Dict[str, Union[Stream, AbstractStream]],
logger: logging.Logger,
) -> Tuple[bool, Any]:
"""Checks the availability of dynamic streams."""
dynamic_streams = source.resolved_manifest.get("dynamic_streams", []) # type: ignore[attr-defined] # The source's resolved_manifest manifest is checked before calling this method
Expand Down Expand Up @@ -135,18 +157,15 @@ def _map_generated_streams(
def _check_generated_streams_availability(
self,
generated_streams: List[Dict[str, Any]],
stream_name_to_stream: Dict[str, Any],
stream_name_to_stream: Dict[str, Union[Stream, AbstractStream]],
logger: logging.Logger,
max_count: int,
) -> Tuple[bool, Any]:
"""Checks availability of generated dynamic streams."""
availability_strategy = HttpAvailabilityStrategy()
for declarative_stream in generated_streams[: min(max_count, len(generated_streams))]:
stream = stream_name_to_stream[declarative_stream["name"]]
try:
stream_is_available, reason = availability_strategy.check_availability(
stream, logger
)
stream_is_available, reason = evaluate_availability(stream, logger)
if not stream_is_available:
message = f"Dynamic Stream {stream.name} is not available: {reason}"
logger.warning(message)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,6 @@
from airbyte_cdk.sources.streams import Stream
from airbyte_cdk.sources.streams.concurrent.abstract_stream import AbstractStream
from airbyte_cdk.sources.streams.concurrent.abstract_stream_facade import AbstractStreamFacade
from airbyte_cdk.sources.streams.concurrent.availability_strategy import (
AlwaysAvailableAvailabilityStrategy,
)
from airbyte_cdk.sources.streams.concurrent.cursor import ConcurrentCursor, FinalStateCursor
from airbyte_cdk.sources.streams.concurrent.default_stream import DefaultStream
from airbyte_cdk.sources.streams.concurrent.helpers import get_primary_key_from_stream
Expand Down Expand Up @@ -325,7 +322,6 @@ def _group_streams(
partition_generator=partition_generator,
name=declarative_stream.name,
json_schema=declarative_stream.get_json_schema(),
availability_strategy=AlwaysAvailableAvailabilityStrategy(),
primary_key=get_primary_key_from_stream(declarative_stream.primary_key),
cursor_field=cursor.cursor_field.cursor_field_key
if hasattr(cursor, "cursor_field")
Expand Down Expand Up @@ -362,7 +358,6 @@ def _group_streams(
partition_generator=partition_generator,
name=declarative_stream.name,
json_schema=declarative_stream.get_json_schema(),
availability_strategy=AlwaysAvailableAvailabilityStrategy(),
primary_key=get_primary_key_from_stream(declarative_stream.primary_key),
cursor_field=None,
logger=self.logger,
Expand Down Expand Up @@ -417,7 +412,6 @@ def _group_streams(
partition_generator=partition_generator,
name=declarative_stream.name,
json_schema=declarative_stream.get_json_schema(),
availability_strategy=AlwaysAvailableAvailabilityStrategy(),
primary_key=get_primary_key_from_stream(declarative_stream.primary_key),
cursor_field=perpartition_cursor.cursor_field.cursor_field_key,
logger=self.logger,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,7 @@
from .abstract_file_based_availability_strategy import (
AbstractFileBasedAvailabilityStrategy,
AbstractFileBasedAvailabilityStrategyWrapper,
)
from .abstract_file_based_availability_strategy import AbstractFileBasedAvailabilityStrategy
from .default_file_based_availability_strategy import DefaultFileBasedAvailabilityStrategy

__all__ = [
"AbstractFileBasedAvailabilityStrategy",
"AbstractFileBasedAvailabilityStrategyWrapper",
"DefaultFileBasedAvailabilityStrategy",
]
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,6 @@

from airbyte_cdk.sources import Source
from airbyte_cdk.sources.streams.availability_strategy import AvailabilityStrategy
from airbyte_cdk.sources.streams.concurrent.availability_strategy import (
AbstractAvailabilityStrategy,
StreamAvailability,
StreamAvailable,
StreamUnavailable,
)
from airbyte_cdk.sources.streams.core import Stream

if TYPE_CHECKING:
Expand All @@ -28,7 +22,7 @@ def check_availability( # type: ignore[override] # Signature doesn't match bas
self,
stream: Stream,
logger: logging.Logger,
_: Optional[Source],
source: Optional[Source] = None,
) -> Tuple[bool, Optional[str]]:
"""
Perform a connection check for the stream.
Expand All @@ -51,23 +45,3 @@ def check_availability_and_parsability(
Returns (True, None) if successful, otherwise (False, <error message>).
"""
...


class AbstractFileBasedAvailabilityStrategyWrapper(AbstractAvailabilityStrategy):
def __init__(self, stream: AbstractFileBasedStream) -> None:
self.stream = stream

def check_availability(self, logger: logging.Logger) -> StreamAvailability:
is_available, reason = self.stream.availability_strategy.check_availability(
self.stream, logger, None
)
if is_available:
return StreamAvailable()
return StreamUnavailable(reason or "")

def check_availability_and_parsability(
self, logger: logging.Logger
) -> Tuple[bool, Optional[str]]:
return self.stream.availability_strategy.check_availability_and_parsability(
self.stream, logger, None
)
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,6 @@ def record_passes_validation_policy(self, record: Mapping[str, Any]) -> bool:
)

@cached_property
@deprecated("Deprecated as of CDK version 3.7.0.")
def availability_strategy(self) -> AbstractFileBasedAvailabilityStrategy:
return self._availability_strategy

Expand Down
2 changes: 0 additions & 2 deletions airbyte_cdk/sources/file_based/stream/concurrent/adapters.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
from airbyte_cdk.sources.connector_state_manager import ConnectorStateManager
from airbyte_cdk.sources.file_based.availability_strategy import (
AbstractFileBasedAvailabilityStrategy,
AbstractFileBasedAvailabilityStrategyWrapper,
)
from airbyte_cdk.sources.file_based.config.file_based_stream_config import PrimaryKeyType
from airbyte_cdk.sources.file_based.file_types.file_type_parser import FileTypeParser
Expand Down Expand Up @@ -97,7 +96,6 @@ def create_from_stream(
),
name=stream.name,
json_schema=stream.get_json_schema(),
availability_strategy=AbstractFileBasedAvailabilityStrategyWrapper(stream),
primary_key=pk,
cursor_field=cursor_field,
logger=logger,
Expand Down
1 change: 1 addition & 0 deletions airbyte_cdk/sources/streams/availability_strategy.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from airbyte_cdk.sources import Source


# FIXME this
class AvailabilityStrategy(ABC):
"""
Abstract base class for checking stream availability.
Expand Down
12 changes: 6 additions & 6 deletions airbyte_cdk/sources/streams/concurrent/abstract_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,12 +64,6 @@ def cursor_field(self) -> Optional[str]:
:return: The name of the field used as a cursor. Nested cursor fields are not supported.
"""

@abstractmethod
def check_availability(self) -> StreamAvailability:
"""
:return: The stream's availability
"""

@abstractmethod
def get_json_schema(self) -> Mapping[str, Any]:
"""
Expand All @@ -94,3 +88,9 @@ def cursor(self) -> Cursor:
"""
:return: The cursor associated with this stream.
"""

@abstractmethod
def check_availability(self) -> StreamAvailability:
"""
:return: If the stream is available and if not, why
"""
43 changes: 0 additions & 43 deletions airbyte_cdk/sources/streams/concurrent/adapters.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,7 @@
from airbyte_cdk.sources.message import MessageRepository
from airbyte_cdk.sources.source import ExperimentalClassWarning
from airbyte_cdk.sources.streams import Stream
from airbyte_cdk.sources.streams.availability_strategy import AvailabilityStrategy
from airbyte_cdk.sources.streams.concurrent.abstract_stream_facade import AbstractStreamFacade
from airbyte_cdk.sources.streams.concurrent.availability_strategy import (
AbstractAvailabilityStrategy,
AlwaysAvailableAvailabilityStrategy,
)
from airbyte_cdk.sources.streams.concurrent.cursor import Cursor, FinalStateCursor
from airbyte_cdk.sources.streams.concurrent.default_stream import DefaultStream
from airbyte_cdk.sources.streams.concurrent.exceptions import ExceptionWithDisplayMessage
Expand Down Expand Up @@ -101,7 +96,6 @@ def create_from_stream(
name=stream.name,
namespace=stream.namespace,
json_schema=stream.get_json_schema(),
availability_strategy=AlwaysAvailableAvailabilityStrategy(),
primary_key=pk,
cursor_field=cursor_field,
logger=logger,
Expand Down Expand Up @@ -210,18 +204,6 @@ def get_json_schema(self) -> Mapping[str, Any]:
def supports_incremental(self) -> bool:
return self._legacy_stream.supports_incremental

def check_availability(
self, logger: logging.Logger, source: Optional["Source"] = None
) -> Tuple[bool, Optional[str]]:
"""
Verifies the stream is available. Delegates to the underlying AbstractStream and ignores the parameters
:param logger: (ignored)
:param source: (ignored)
:return:
"""
availability = self._abstract_stream.check_availability()
return availability.is_available(), availability.message()

def as_airbyte_stream(self) -> AirbyteStream:
return self._abstract_stream.as_airbyte_stream()

Expand Down Expand Up @@ -370,28 +352,3 @@ def generate(self) -> Iterable[Partition]:
self._cursor_field,
self._state,
)


@deprecated(
"Availability strategy has been soft deprecated. Do not use. Class is subject to removal",
category=ExperimentalClassWarning,
)
class AvailabilityStrategyFacade(AvailabilityStrategy):
def __init__(self, abstract_availability_strategy: AbstractAvailabilityStrategy):
self._abstract_availability_strategy = abstract_availability_strategy

def check_availability(
self, stream: Stream, logger: logging.Logger, source: Optional["Source"] = None
) -> Tuple[bool, Optional[str]]:
"""
Checks stream availability.

Important to note that the stream and source parameters are not used by the underlying AbstractAvailabilityStrategy.

:param stream: (unused)
:param logger: logger object to use
:param source: (unused)
:return: A tuple of (boolean, str). If boolean is true, then the stream
"""
stream_availability = self._abstract_availability_strategy.check_availability(logger)
return stream_availability.is_available(), stream_availability.message()
Loading
Loading