Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
117 changes: 64 additions & 53 deletions src/crawlee/_autoscaling/snapshotter.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
from operator import attrgetter
from typing import TYPE_CHECKING, TypeVar, cast

import psutil
from sortedcontainers import SortedList

from crawlee import service_locator
Expand All @@ -16,11 +15,14 @@
from crawlee._utils.context import ensure_context
from crawlee._utils.docs import docs_group
from crawlee._utils.recurring_task import RecurringTask
from crawlee._utils.system import get_memory_info
from crawlee.events._types import Event, EventSystemInfoData

if TYPE_CHECKING:
from types import TracebackType

from crawlee.configuration import Configuration

logger = getLogger(__name__)

T = TypeVar('T')
Expand All @@ -36,89 +38,98 @@ class Snapshotter:
dynamically based on the current demand and system load.
"""

_EVENT_LOOP_SNAPSHOT_INTERVAL = timedelta(milliseconds=500)
"""The interval at which the event loop is sampled."""

_CLIENT_SNAPSHOT_INTERVAL = timedelta(milliseconds=1000)
"""The interval at which the client is sampled."""

_SNAPSHOT_HISTORY = timedelta(seconds=30)
"""The time interval for which the snapshots are kept."""

_RESERVE_MEMORY_RATIO = 0.5
"""Fraction of memory kept in reserve. Used to calculate critical memory overload threshold."""

_MEMORY_WARNING_COOLDOWN_PERIOD = timedelta(milliseconds=10000)
"""Minimum time interval between logging successive critical memory overload warnings."""

_CLIENT_RATE_LIMIT_ERROR_RETRY_COUNT = 2
"""Number of retries for a client request before considering it a failure due to rate limiting."""

def __init__(
self,
*,
event_loop_snapshot_interval: timedelta = timedelta(milliseconds=500),
client_snapshot_interval: timedelta = timedelta(milliseconds=1000),
max_used_cpu_ratio: float = 0.95,
max_memory_size: ByteSize | None = None,
max_used_memory_ratio: float = 0.9,
max_event_loop_delay: timedelta = timedelta(milliseconds=50),
max_client_errors: int = 1,
snapshot_history: timedelta = timedelta(seconds=30),
available_memory_ratio: float | None = None,
reserve_memory_ratio: float = 0.5,
memory_warning_cooldown_period: timedelta = timedelta(milliseconds=10000),
client_rate_limit_error_retry_count: int = 2,
max_used_cpu_ratio: float,
max_used_memory_ratio: float,
max_event_loop_delay: timedelta,
max_client_errors: int,
max_memory_size: ByteSize,
) -> None:
"""A default constructor.

In most cases, you should use the `from_config` constructor to create a new instance based on
the provided configuration.

Args:
event_loop_snapshot_interval: The interval at which the event loop is sampled.
client_snapshot_interval: The interval at which the client is sampled.
max_used_cpu_ratio: Sets the ratio, defining the maximum CPU usage. When the CPU usage is higher than
the provided ratio, the CPU is considered overloaded.
max_memory_size: Sets the maximum amount of system memory to be used by the `AutoscaledPool`. If `None`
is provided, the max amount of memory to be used is set to one quarter of total system memory.
I.e. on a system with 8192 MB, the `AutoscaledPool` will only use up to 2048 MB of memory.
max_used_memory_ratio: Sets the ratio, defining the maximum ratio of memory usage. When the memory usage
is higher than the provided ratio of `max_memory_size`, the memory is considered overloaded.
max_event_loop_delay: Sets the maximum delay of the event loop. When the delay is higher than the provided
value, the event loop is considered overloaded.
max_client_errors: Sets the maximum number of client errors (HTTP 429). When the number of client errors
is higher than the provided number, the client is considered overloaded.
snapshot_history: Sets the time interval for which the snapshots are kept.
available_memory_ratio: How big part of the system memory should be used if `max_memory_size` is not given.
reserve_memory_ratio: Fraction of memory kept in reserve. Used to calculate critical memory overload
threshold.
memory_warning_cooldown_period: Minimum time interval between logging successive critical memory overload
warnings.
client_rate_limit_error_retry_count: Number of retries for a client request before considering it a failure
due to rate limiting.
max_memory_size: Sets the maximum amount of system memory to be used by the `AutoscaledPool`.
"""
if available_memory_ratio is None and max_memory_size is None:
raise ValueError('At least one of `available_memory_ratio` or `max_memory_size` must be specified')

self._event_loop_snapshot_interval = event_loop_snapshot_interval
self._client_snapshot_interval = client_snapshot_interval
self._max_event_loop_delay = max_event_loop_delay
self._max_used_cpu_ratio = max_used_cpu_ratio
self._max_used_memory_ratio = max_used_memory_ratio
self._max_event_loop_delay = max_event_loop_delay
self._max_client_errors = max_client_errors
self._snapshot_history = snapshot_history
self._reserve_memory_ratio = reserve_memory_ratio
self._memory_warning_cooldown_period = memory_warning_cooldown_period
self._client_rate_limit_error_retry_count = client_rate_limit_error_retry_count
self._max_memory_size = max_memory_size or self._get_default_max_memory_size(
cast(float, available_memory_ratio)
)
self._max_memory_size = max_memory_size

self._cpu_snapshots = self._get_sorted_list_by_created_at(list[CpuSnapshot]())
self._event_loop_snapshots = self._get_sorted_list_by_created_at(list[EventLoopSnapshot]())
self._memory_snapshots = self._get_sorted_list_by_created_at(list[MemorySnapshot]())
self._client_snapshots = self._get_sorted_list_by_created_at(list[ClientSnapshot]())

self._snapshot_event_loop_task = RecurringTask(self._snapshot_event_loop, self._event_loop_snapshot_interval)
self._snapshot_client_task = RecurringTask(self._snapshot_client, self._client_snapshot_interval)
self._snapshot_event_loop_task = RecurringTask(self._snapshot_event_loop, self._EVENT_LOOP_SNAPSHOT_INTERVAL)
self._snapshot_client_task = RecurringTask(self._snapshot_client, self._CLIENT_SNAPSHOT_INTERVAL)

self._timestamp_of_last_memory_warning: datetime = datetime.now(timezone.utc) - timedelta(hours=1)

# Flag to indicate the context state.
self._active = False

@classmethod
def from_config(cls, config: Configuration | None = None) -> Snapshotter:
    """Create a new instance based on the provided configuration.

    Args:
        config: The configuration object. Uses the global (default) configuration if not provided.

    Returns:
        A new instance whose thresholds and maximum memory size are taken from the resolved configuration.
    """
    # Honor an explicitly passed configuration; fall back to the global one only when none was given.
    if config is None:
        config = service_locator.get_configuration()

    # Compute the maximum memory size based on the resolved configuration. If `memory_mbytes` is set
    # (truthy), that value is used. Otherwise the size is derived as `available_memory_ratio` times the
    # total memory size reported by `get_memory_info()`.
    max_memory_size = (
        ByteSize.from_mb(config.memory_mbytes)
        if config.memory_mbytes
        else ByteSize(int(get_memory_info().total_size.bytes * config.available_memory_ratio))
    )

    return cls(
        max_used_cpu_ratio=config.max_used_cpu_ratio,
        max_used_memory_ratio=config.max_used_memory_ratio,
        max_event_loop_delay=config.max_event_loop_delay,
        max_client_errors=config.max_client_errors,
        max_memory_size=max_memory_size,
    )

@staticmethod
def _get_sorted_list_by_created_at(input_list: list[T]) -> SortedList[T]:
    """Wrap `input_list` in a `SortedList` keyed on each item's `created_at` attribute."""
    by_created_at = attrgetter('created_at')
    return SortedList(input_list, key=by_created_at)

@staticmethod
def _get_default_max_memory_size(available_memory_ratio: float) -> ByteSize:
    """Compute the default `max_memory_size` as a fraction of the total system memory.

    Args:
        available_memory_ratio: The fraction of the total system memory (as reported by
            `psutil.virtual_memory().total`) that may be used.

    Returns:
        The resulting memory limit, truncated to whole bytes.
    """
    max_memory_size_in_bytes = int(psutil.virtual_memory().total * available_memory_ratio)
    max_memory_size = ByteSize(max_memory_size_in_bytes)
    logger.info(f'Setting max_memory_size of this run to {max_memory_size}.')
    return max_memory_size

@property
def active(self) -> bool:
"""Indicates whether the context is active."""
Expand Down Expand Up @@ -281,7 +292,7 @@ def _snapshot_event_loop(self) -> None:
previous_snapshot = self._event_loop_snapshots[-1] if self._event_loop_snapshots else None

if previous_snapshot:
event_loop_delay = snapshot.created_at - previous_snapshot.created_at - self._event_loop_snapshot_interval
event_loop_delay = snapshot.created_at - previous_snapshot.created_at - self._EVENT_LOOP_SNAPSHOT_INTERVAL
snapshot.delay = event_loop_delay

snapshots = cast(list[Snapshot], self._event_loop_snapshots)
Expand Down Expand Up @@ -319,7 +330,7 @@ def _prune_snapshots(self, snapshots: list[Snapshot], now: datetime) -> None:
# We'll keep snapshots from this index onwards.
keep_from_index = None
for i, snapshot in enumerate(snapshots):
if now - snapshot.created_at <= self._snapshot_history:
if now - snapshot.created_at <= self._SNAPSHOT_HISTORY:
keep_from_index = i
break

Expand All @@ -338,11 +349,11 @@ def _evaluate_memory_load(self, current_memory_usage_size: ByteSize, snapshot_ti
snapshot_timestamp: The time at which the memory snapshot was taken.
"""
# Check if the warning has been logged recently to avoid spamming
if snapshot_timestamp < self._timestamp_of_last_memory_warning + self._memory_warning_cooldown_period:
if snapshot_timestamp < self._timestamp_of_last_memory_warning + self._MEMORY_WARNING_COOLDOWN_PERIOD:
return

threshold_memory_size = self._max_used_memory_ratio * self._max_memory_size
buffer_memory_size = self._max_memory_size * (1 - self._max_used_memory_ratio) * self._reserve_memory_ratio
buffer_memory_size = self._max_memory_size * (1 - self._max_used_memory_ratio) * self._RESERVE_MEMORY_RATIO
overload_memory_threshold_size = threshold_memory_size + buffer_memory_size

# Log a warning if current memory usage exceeds the critical overload threshold
Expand Down
12 changes: 10 additions & 2 deletions src/crawlee/_service_locator.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,11 @@ def get_event_manager(self) -> EventManager:
if self._event_manager is None:
from crawlee.events import LocalEventManager

self._event_manager = LocalEventManager()
# `from_config` is called on the class in both branches; no throwaway instance is constructed first.
self._event_manager = (
    LocalEventManager.from_config(config=self._configuration)
    if self._configuration
    else LocalEventManager.from_config()
)

self._event_manager_was_retrieved = True
return self._event_manager
Expand All @@ -75,7 +79,11 @@ def get_storage_client(self) -> BaseStorageClient:
if self._storage_client is None:
from crawlee.storage_clients import MemoryStorageClient

self._storage_client = MemoryStorageClient.from_config()
self._storage_client = (
MemoryStorageClient.from_config(config=self._configuration)
if self._configuration
else MemoryStorageClient.from_config()
)

self._storage_client_was_retrieved = True
return self._storage_client
Expand Down
16 changes: 13 additions & 3 deletions src/crawlee/browsers/_playwright_browser_plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from playwright.async_api import Playwright, async_playwright
from typing_extensions import override

from crawlee import service_locator
from crawlee._utils.context import ensure_context
from crawlee._utils.docs import docs_group
from crawlee.browsers._base_browser_plugin import BaseBrowserPlugin
Expand Down Expand Up @@ -35,8 +36,8 @@ def __init__(
self,
*,
browser_type: BrowserType = 'chromium',
browser_launch_options: Mapping[str, Any] | None = None,
browser_new_context_options: Mapping[str, Any] | None = None,
browser_launch_options: dict[str, Any] | None = None,
browser_new_context_options: dict[str, Any] | None = None,
max_open_pages_per_browser: int = 20,
) -> None:
"""A default constructor.
Expand All @@ -52,8 +53,17 @@ def __init__(
max_open_pages_per_browser: The maximum number of pages that can be opened in a single browser instance.
Once reached, a new browser instance will be launched to handle the excess.
"""
config = service_locator.get_configuration()

# Default browser launch options are based on the configuration.
default_launch_browser_options = {
'headless': config.headless,
'executable_path': config.default_browser_path,
'chromium_sandbox': not config.disable_browser_sandbox,
}

self._browser_type = browser_type
self._browser_launch_options = browser_launch_options or {}
self._browser_launch_options = default_launch_browser_options | (browser_launch_options or {})
self._browser_new_context_options = browser_new_context_options or {}
self._max_open_pages_per_browser = max_open_pages_per_browser

Expand Down
Loading
Loading