Skip to content

Commit 4fa6e46

Browse files
authored
chore: Convert unused AP input args into class consts (#911)
- Convert unused `AutoscaledPool` input arguments into class constants. - In a similar way, what we did to Snapshotter in #899.
1 parent 6940af4 commit 4fa6e46

File tree

3 files changed

+47
-46
lines changed

3 files changed

+47
-46
lines changed

src/crawlee/_autoscaling/autoscaled_pool.py

Lines changed: 29 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,24 @@ class AutoscaledPool:
4343
any of the tasks, it is propagated and the pool is stopped.
4444
"""
4545

46+
_AUTOSCALE_INTERVAL = timedelta(seconds=10)
47+
"""Interval at which the autoscaled pool adjusts the desired concurrency based on the latest system status."""
48+
49+
_LOGGING_INTERVAL = timedelta(minutes=1)
50+
"""Interval at which the autoscaled pool logs its current state."""
51+
52+
_DESIRED_CONCURRENCY_RATIO = 0.9
53+
"""Minimum ratio of desired concurrency that must be reached before allowing further scale-up."""
54+
55+
_SCALE_UP_STEP_RATIO = 0.05
56+
"""Fraction of desired concurrency to add during each scale-up operation."""
57+
58+
_SCALE_DOWN_STEP_RATIO = 0.05
59+
"""Fraction of desired concurrency to remove during each scale-down operation."""
60+
61+
_TASK_TIMEOUT: timedelta | None = None
62+
"""Timeout within which the `run_task_function` must complete."""
63+
4664
def __init__(
4765
self,
4866
*,
@@ -51,17 +69,12 @@ def __init__(
5169
run_task_function: Callable[[], Awaitable],
5270
is_task_ready_function: Callable[[], Awaitable[bool]],
5371
is_finished_function: Callable[[], Awaitable[bool]],
54-
task_timeout: timedelta | None = None,
55-
autoscale_interval: timedelta = timedelta(seconds=10),
56-
logging_interval: timedelta = timedelta(minutes=1),
57-
desired_concurrency_ratio: float = 0.9,
58-
scale_up_step_ratio: float = 0.05,
59-
scale_down_step_ratio: float = 0.05,
6072
) -> None:
6173
"""A default constructor.
6274
6375
Args:
6476
system_status: Provides data about system utilization (load).
77+
concurrency_settings: Settings of concurrency levels.
6578
run_task_function: A function that performs an asynchronous resource-intensive task.
6679
is_task_ready_function: A function that indicates whether `run_task_function` should be called. This
6780
function is called every time there is free capacity for a new task and it should indicate whether
@@ -71,44 +84,21 @@ def __init__(
7184
resolves to `True` then the pool's run finishes. Being called only when there are no tasks being
7285
processed means that as long as `is_task_ready_function` keeps resolving to `True`,
7386
`is_finished_function` will never be called. To abort a run, use the `abort` method.
74-
task_timeout: Timeout in which the `run_task_function` needs to finish.
75-
autoscale_interval: Defines how often the pool should attempt to adjust the desired concurrency based on
76-
the latest system status. Setting it lower than 1 might have a severe impact on performance. We suggest
77-
using a value from 5 to 20.
78-
logging_interval: Specifies a period in which the instance logs its state, in seconds.
79-
desired_concurrency_ratio: Minimum level of desired concurrency to reach before more scaling up is allowed.
80-
scale_up_step_ratio: Defines the fractional amount of desired concurrency to be added with each scaling up.
81-
scale_down_step_ratio: Defines the amount of desired concurrency to be subtracted with each scaling down.
82-
concurrency_settings: Settings of concurrency levels.
8387
"""
84-
self._system_status = system_status
88+
concurrency_settings = concurrency_settings or ConcurrencySettings()
8589

90+
self._system_status = system_status
8691
self._run_task_function = run_task_function
8792
self._is_task_ready_function = is_task_ready_function
8893
self._is_finished_function = is_finished_function
89-
90-
self._task_timeout = task_timeout
91-
92-
self._logging_interval = logging_interval
93-
self._log_system_status_task = RecurringTask(self._log_system_status, logging_interval)
94-
95-
self._autoscale_task = RecurringTask(self._autoscale, autoscale_interval)
96-
97-
if desired_concurrency_ratio < 0 or desired_concurrency_ratio > 1:
98-
raise ValueError('desired_concurrency_ratio must be between 0 and 1 (non-inclusive)')
99-
100-
self._desired_concurrency_ratio = desired_concurrency_ratio
101-
102-
concurrency_settings = concurrency_settings or ConcurrencySettings()
103-
10494
self._desired_concurrency = concurrency_settings.desired_concurrency
10595
self._max_concurrency = concurrency_settings.max_concurrency
10696
self._min_concurrency = concurrency_settings.min_concurrency
97+
self._max_tasks_per_minute = concurrency_settings.max_tasks_per_minute
10798

108-
self._scale_up_step_ratio = scale_up_step_ratio
109-
self._scale_down_step_ratio = scale_down_step_ratio
99+
self._log_system_status_task = RecurringTask(self._log_system_status, self._LOGGING_INTERVAL)
100+
self._autoscale_task = RecurringTask(self._autoscale, self._AUTOSCALE_INTERVAL)
110101

111-
self._max_tasks_per_minute = concurrency_settings.max_tasks_per_minute
112102
self._is_paused = False
113103
self._current_run: _AutoscaledPoolRun | None = None
114104

@@ -195,7 +185,7 @@ def _autoscale(self) -> None:
195185
"""Inspect system load status and adjust desired concurrency if necessary. Do not call directly."""
196186
status = self._system_status.get_historical_system_info()
197187

198-
min_current_concurrency = math.floor(self._desired_concurrency_ratio * self.desired_concurrency)
188+
min_current_concurrency = math.floor(self._DESIRED_CONCURRENCY_RATIO * self.desired_concurrency)
199189
should_scale_up = (
200190
status.is_system_idle
201191
and self._desired_concurrency < self._max_concurrency
@@ -205,10 +195,10 @@ def _autoscale(self) -> None:
205195
should_scale_down = not status.is_system_idle and self._desired_concurrency > self._min_concurrency
206196

207197
if should_scale_up:
208-
step = math.ceil(self._scale_up_step_ratio * self._desired_concurrency)
198+
step = math.ceil(self._SCALE_UP_STEP_RATIO * self._desired_concurrency)
209199
self._desired_concurrency = min(self._max_concurrency, self._desired_concurrency + step)
210200
elif should_scale_down:
211-
step = math.ceil(self._scale_down_step_ratio * self._desired_concurrency)
201+
step = math.ceil(self._SCALE_DOWN_STEP_RATIO * self._desired_concurrency)
212202
self._desired_concurrency = max(self._min_concurrency, self._desired_concurrency - step)
213203

214204
def _log_system_status(self) -> None:
@@ -286,10 +276,10 @@ async def _worker_task(self) -> None:
286276
try:
287277
await asyncio.wait_for(
288278
self._run_task_function(),
289-
timeout=self._task_timeout.total_seconds() if self._task_timeout is not None else None,
279+
timeout=self._TASK_TIMEOUT.total_seconds() if self._TASK_TIMEOUT is not None else None,
290280
)
291281
except asyncio.TimeoutError:
292-
timeout_str = self._task_timeout.total_seconds() if self._task_timeout is not None else '*not set*'
282+
timeout_str = self._TASK_TIMEOUT.total_seconds() if self._TASK_TIMEOUT is not None else '*not set*'
293283
logger.warning(f'Task timed out after {timeout_str} seconds')
294284
finally:
295285
logger.debug('Worker task finished')

src/crawlee/crawlers/_basic/_basic_crawler.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -307,10 +307,10 @@ def __init__(
307307
self._snapshotter = Snapshotter.from_config(config)
308308
self._autoscaled_pool = AutoscaledPool(
309309
system_status=SystemStatus(self._snapshotter),
310+
concurrency_settings=concurrency_settings,
310311
is_finished_function=self.__is_finished_function,
311312
is_task_ready_function=self.__is_task_ready_function,
312313
run_task_function=self.__run_task_function,
313-
concurrency_settings=concurrency_settings,
314314
)
315315

316316
# State flags

tests/unit/_autoscaling/test_autoscaled_pool.py

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -143,7 +143,10 @@ async def run() -> None:
143143
await pool.run()
144144

145145

146-
async def test_autoscales(system_status: SystemStatus | Mock) -> None:
146+
async def test_autoscales(
147+
monkeypatch: pytest.MonkeyPatch,
148+
system_status: SystemStatus | Mock,
149+
) -> None:
147150
done_count = 0
148151

149152
async def run() -> None:
@@ -169,6 +172,9 @@ def get_historical_system_info() -> SystemInfo:
169172

170173
cast(Mock, system_status.get_historical_system_info).side_effect = get_historical_system_info
171174

175+
# Override AP class attributes using monkeypatch.
176+
monkeypatch.setattr(AutoscaledPool, '_AUTOSCALE_INTERVAL', timedelta(seconds=0.1))
177+
172178
pool = AutoscaledPool(
173179
system_status=system_status,
174180
run_task_function=run,
@@ -179,7 +185,6 @@ def get_historical_system_info() -> SystemInfo:
179185
desired_concurrency=1,
180186
max_concurrency=4,
181187
),
182-
autoscale_interval=timedelta(seconds=0.1),
183188
)
184189

185190
pool_run_task = asyncio.create_task(pool.run(), name='pool run task')
@@ -209,15 +214,19 @@ def get_historical_system_info() -> SystemInfo:
209214
await pool_run_task
210215

211216

212-
async def test_autoscales_uses_desired_concurrency_ratio(system_status: SystemStatus | Mock) -> None:
217+
async def test_autoscales_uses_desired_concurrency_ratio(
218+
monkeypatch: pytest.MonkeyPatch,
219+
system_status: SystemStatus | Mock,
220+
) -> None:
213221
"""Test that desired concurrency ratio can limit desired concurrency.
214222
215223
This test creates situation where only one task is ready and then no other task is ever ready.
216224
This creates situation where the system could scale up desired concurrency, but it will not do so because
217225
desired_concurrency_ratio=1 means that first the system would have to increase current concurrency to same number as
218226
desired concurrency and due to no other task ever being ready, it will never happen. Thus desired concurrency will
219227
stay 2 as was the initial setup, even though other conditions would allow the increase. (max_concurrency=4,
220-
system being idle)."""
228+
system being idle).
229+
"""
221230

222231
async def run() -> None:
223232
await asyncio.sleep(0.1)
@@ -237,6 +246,10 @@ def get_historical_system_info() -> SystemInfo:
237246

238247
cast(Mock, system_status.get_historical_system_info).side_effect = get_historical_system_info
239248

249+
# Override AP class attributes using monkeypatch.
250+
monkeypatch.setattr(AutoscaledPool, '_AUTOSCALE_INTERVAL', timedelta(seconds=0.1))
251+
monkeypatch.setattr(AutoscaledPool, '_DESIRED_CONCURRENCY_RATIO', 1)
252+
240253
pool = AutoscaledPool(
241254
system_status=system_status,
242255
run_task_function=run,
@@ -247,8 +260,6 @@ def get_historical_system_info() -> SystemInfo:
247260
desired_concurrency=2,
248261
max_concurrency=4,
249262
),
250-
autoscale_interval=timedelta(seconds=0.1),
251-
desired_concurrency_ratio=1,
252263
)
253264

254265
pool_run_task = asyncio.create_task(pool.run(), name='pool run task')

0 commit comments

Comments
 (0)