7
7
from operator import attrgetter
8
8
from typing import TYPE_CHECKING , TypeVar , cast
9
9
10
- import psutil
11
10
from sortedcontainers import SortedList
12
11
13
12
from crawlee import service_locator
16
15
from crawlee ._utils .context import ensure_context
17
16
from crawlee ._utils .docs import docs_group
18
17
from crawlee ._utils .recurring_task import RecurringTask
18
+ from crawlee ._utils .system import get_memory_info
19
19
from crawlee .events ._types import Event , EventSystemInfoData
20
20
21
21
if TYPE_CHECKING :
22
22
from types import TracebackType
23
23
24
+ from crawlee .configuration import Configuration
25
+
24
26
logger = getLogger (__name__ )
25
27
26
28
T = TypeVar ('T' )
@@ -36,89 +38,98 @@ class Snapshotter:
36
38
dynamically based on the current demand and system load.
37
39
"""
38
40
41
+ _EVENT_LOOP_SNAPSHOT_INTERVAL = timedelta (milliseconds = 500 )
42
+ """The interval at which the event loop is sampled."""
43
+
44
+ _CLIENT_SNAPSHOT_INTERVAL = timedelta (milliseconds = 1000 )
45
+ """The interval at which the client is sampled."""
46
+
47
+ _SNAPSHOT_HISTORY = timedelta (seconds = 30 )
48
+ """The time interval for which the snapshots are kept."""
49
+
50
+ _RESERVE_MEMORY_RATIO = 0.5
51
+ """Fraction of memory kept in reserve. Used to calculate critical memory overload threshold."""
52
+
53
+ _MEMORY_WARNING_COOLDOWN_PERIOD = timedelta (milliseconds = 10000 )
54
+ """Minimum time interval between logging successive critical memory overload warnings."""
55
+
56
+ _CLIENT_RATE_LIMIT_ERROR_RETRY_COUNT = 2
57
+ """Number of retries for a client request before considering it a failure due to rate limiting."""
58
+
39
59
def __init__(
    self,
    *,
    max_used_cpu_ratio: float,
    max_used_memory_ratio: float,
    max_event_loop_delay: timedelta,
    max_client_errors: int,
    max_memory_size: ByteSize,
) -> None:
    """A default constructor.

    In most cases, you should use the `from_config` constructor to create a new instance based on
    the provided configuration.

    Args:
        max_used_cpu_ratio: Maximum CPU usage ratio. When the CPU usage is higher than this ratio,
            the CPU is considered overloaded.
        max_used_memory_ratio: Maximum memory usage ratio. When the memory usage is higher than this
            ratio of `max_memory_size`, the memory is considered overloaded.
        max_event_loop_delay: Maximum delay of the event loop. When the delay is higher than this value,
            the event loop is considered overloaded.
        max_client_errors: Maximum number of client errors (HTTP 429). When the number of client errors
            is higher than this number, the client is considered overloaded.
        max_memory_size: Maximum amount of system memory to be used by the `AutoscaledPool`.
    """
    # Overload thresholds, stored exactly as supplied by the caller.
    self._max_memory_size = max_memory_size
    self._max_used_memory_ratio = max_used_memory_ratio
    self._max_used_cpu_ratio = max_used_cpu_ratio
    self._max_event_loop_delay = max_event_loop_delay
    self._max_client_errors = max_client_errors

    # One initially empty, `created_at`-ordered snapshot container per monitored resource.
    self._cpu_snapshots = self._get_sorted_list_by_created_at(list[CpuSnapshot]())
    self._event_loop_snapshots = self._get_sorted_list_by_created_at(list[EventLoopSnapshot]())
    self._memory_snapshots = self._get_sorted_list_by_created_at(list[MemorySnapshot]())
    self._client_snapshots = self._get_sorted_list_by_created_at(list[ClientSnapshot]())

    # Recurring samplers for the event loop and the client, driven by the class-level intervals.
    self._snapshot_event_loop_task = RecurringTask(
        self._snapshot_event_loop,
        self._EVENT_LOOP_SNAPSHOT_INTERVAL,
    )
    self._snapshot_client_task = RecurringTask(
        self._snapshot_client,
        self._CLIENT_SNAPSHOT_INTERVAL,
    )

    # Start with a "last warning" timestamp one hour in the past.
    one_hour = timedelta(hours=1)
    self._timestamp_of_last_memory_warning: datetime = datetime.now(timezone.utc) - one_hour

    # Flag to indicate the context state.
    self._active = False
109
102
103
@classmethod
def from_config(cls, config: Configuration | None = None) -> Snapshotter:
    """Create a new instance based on the provided configuration.

    Args:
        config: The configuration object. Uses the global (default) configuration if not provided.

    Returns:
        A new instance whose thresholds are taken from the configuration.
    """
    # Honor an explicitly passed configuration; fall back to the global one only when none is given.
    # (Previously the argument was overwritten unconditionally, so it was silently ignored.)
    if config is None:
        config = service_locator.get_configuration()

    # Compute the maximum memory size based on the configuration. If `memory_mbytes` is set, that value
    # is used. Otherwise `max_memory_size` is calculated as a proportion of the system's total memory
    # size based on `available_memory_ratio`.
    max_memory_size = (
        ByteSize.from_mb(config.memory_mbytes)
        if config.memory_mbytes
        else ByteSize(int(get_memory_info().total_size.bytes * config.available_memory_ratio))
    )

    return cls(
        max_used_cpu_ratio=config.max_used_cpu_ratio,
        max_used_memory_ratio=config.max_used_memory_ratio,
        max_event_loop_delay=config.max_event_loop_delay,
        max_client_errors=config.max_client_errors,
        max_memory_size=max_memory_size,
    )
128
+
110
129
@staticmethod
def _get_sorted_list_by_created_at(input_list: list[T]) -> SortedList[T]:
    """Wrap `input_list` in a `SortedList` keyed by each item's `created_at` attribute."""
    by_created_at = attrgetter('created_at')
    return SortedList(input_list, key=by_created_at)
113
132
114
- @staticmethod
115
- def _get_default_max_memory_size (available_memory_ratio : float ) -> ByteSize :
116
- """Default `memory_max_size` is 1/4 of the total system memory."""
117
- max_memory_size_in_bytes = int (psutil .virtual_memory ().total * available_memory_ratio )
118
- max_memory_size = ByteSize (max_memory_size_in_bytes )
119
- logger .info (f'Setting max_memory_size of this run to { max_memory_size } .' )
120
- return max_memory_size
121
-
122
133
@property
123
134
def active (self ) -> bool :
124
135
"""Indicates whether the context is active."""
@@ -281,7 +292,7 @@ def _snapshot_event_loop(self) -> None:
281
292
previous_snapshot = self ._event_loop_snapshots [- 1 ] if self ._event_loop_snapshots else None
282
293
283
294
if previous_snapshot :
284
- event_loop_delay = snapshot .created_at - previous_snapshot .created_at - self ._event_loop_snapshot_interval
295
+ event_loop_delay = snapshot .created_at - previous_snapshot .created_at - self ._EVENT_LOOP_SNAPSHOT_INTERVAL
285
296
snapshot .delay = event_loop_delay
286
297
287
298
snapshots = cast (list [Snapshot ], self ._event_loop_snapshots )
@@ -319,7 +330,7 @@ def _prune_snapshots(self, snapshots: list[Snapshot], now: datetime) -> None:
319
330
# We'll keep snapshots from this index onwards.
320
331
keep_from_index = None
321
332
for i , snapshot in enumerate (snapshots ):
322
- if now - snapshot .created_at <= self ._snapshot_history :
333
+ if now - snapshot .created_at <= self ._SNAPSHOT_HISTORY :
323
334
keep_from_index = i
324
335
break
325
336
@@ -338,11 +349,11 @@ def _evaluate_memory_load(self, current_memory_usage_size: ByteSize, snapshot_ti
338
349
snapshot_timestamp: The time at which the memory snapshot was taken.
339
350
"""
340
351
# Check if the warning has been logged recently to avoid spamming
341
- if snapshot_timestamp < self ._timestamp_of_last_memory_warning + self ._memory_warning_cooldown_period :
352
+ if snapshot_timestamp < self ._timestamp_of_last_memory_warning + self ._MEMORY_WARNING_COOLDOWN_PERIOD :
342
353
return
343
354
344
355
threshold_memory_size = self ._max_used_memory_ratio * self ._max_memory_size
345
- buffer_memory_size = self ._max_memory_size * (1 - self ._max_used_memory_ratio ) * self ._reserve_memory_ratio
356
+ buffer_memory_size = self ._max_memory_size * (1 - self ._max_used_memory_ratio ) * self ._RESERVE_MEMORY_RATIO
346
357
overload_memory_threshold_size = threshold_memory_size + buffer_memory_size
347
358
348
359
# Log a warning if current memory usage exceeds the critical overload threshold
0 commit comments