From 8909dfde971dbf7038598ba1c436c7eaa05ec498 Mon Sep 17 00:00:00 2001
From: Josef Prochazka
Date: Thu, 24 Apr 2025 14:54:30 +0200
Subject: [PATCH 1/4] Remove redundant retries from batch_add_requests

Align sync and async versions to both swallow exceptions, but return
unprocessed requests
Add tests
---
 .../clients/resource_clients/request_queue.py | 117 +++++++-----
 tests/unit/test_client_request_queue.py       |  91 ++++++++++++++
 2 files changed, 135 insertions(+), 73 deletions(-)
 create mode 100644 tests/unit/test_client_request_queue.py

diff --git a/src/apify_client/clients/resource_clients/request_queue.py b/src/apify_client/clients/resource_clients/request_queue.py
index bb028b0a..4b77ab10 100644
--- a/src/apify_client/clients/resource_clients/request_queue.py
+++ b/src/apify_client/clients/resource_clients/request_queue.py
@@ -3,11 +3,9 @@
 import asyncio
 import logging
 import math
-from dataclasses import dataclass
-from datetime import timedelta
+from collections.abc import Iterable
 from queue import Queue
-from time import sleep
-from typing import TYPE_CHECKING, Any, TypedDict
+from typing import Any, TypedDict
 
 from apify_shared.utils import filter_out_none_values_recursively, ignore_docs, parse_date_fields
 from more_itertools import constrained_batches
@@ -16,9 +14,6 @@
 from apify_client._utils import catch_not_found_or_throw, pluck_data
 from apify_client.clients.base import ResourceClient, ResourceClientAsync
 
-if TYPE_CHECKING:
-    from collections.abc import Iterable
-
 logger = logging.getLogger(__name__)
 
 _RQ_MAX_REQUESTS_PER_BATCH = 25
@@ -41,17 +36,9 @@ class BatchAddRequestsResult(TypedDict):
     unprocessedRequests: list[dict]
 
 
-@dataclass
-class AddRequestsBatch:
-    """Batch of requests to add to the request queue.
-
-    Args:
-        requests: List of requests to be added to the request queue.
-        num_of_retries: Number of times this batch has been retried.
-    """
-
-    requests: Iterable[dict]
-    num_of_retries: int = 0
+def _get_unprocessed_request_from_request(request: dict[str, str]) -> dict[str, str]:
+    relevant_keys = {'url', 'uniqueKey', 'method'}
+    return {key: value for key, value in request.items() if key in relevant_keys}
 
 
 class RequestQueueClient(ResourceClient):
@@ -297,8 +284,6 @@ def batch_add_requests(
         *,
         forefront: bool = False,
         max_parallel: int = 1,
-        max_unprocessed_requests_retries: int = 3,
-        min_delay_between_unprocessed_requests_retries: timedelta = timedelta(milliseconds=500),
     ) -> BatchAddRequestsResult:
         """Add requests to the request queue in batches.
 
@@ -312,9 +297,6 @@ def batch_add_requests(
             max_parallel: Specifies the maximum number of parallel tasks for API calls. This is only applicable
                 to the async client. For the sync client, this value must be set to 1, as parallel execution
                 is not supported.
-            max_unprocessed_requests_retries: Number of retry attempts for unprocessed requests.
-            min_delay_between_unprocessed_requests_retries: Minimum delay between retry attempts for unprocessed
-                requests.
 
         Returns:
             Result containing lists of processed and unprocessed requests.
@@ -335,42 +317,43 @@ def batch_add_requests(
         )
 
         # Put the batches into the queue for processing.
-        queue = Queue[AddRequestsBatch]()
+        queue = Queue[Iterable[dict]]()
 
-        for b in batches:
-            queue.put(AddRequestsBatch(b))
+        for batch in batches:
+            queue.put(batch)
 
         processed_requests = list[dict]()
-        unprocessed_requests = list[dict]()
+        unprocessed_requests = dict[str, dict]()
 
         # Process all batches in the queue sequentially.
while not queue.empty(): - batch = queue.get() + request_batch = queue.get() + # All requests are considered unprocessed unless explicitly mentioned in `processedRequests` response. + for request in request_batch: + unprocessed_requests[request['uniqueKey']] = _get_unprocessed_request_from_request(request) - # Send the batch to the API. - response = self.http_client.call( - url=self._url('requests/batch'), - method='POST', - params=request_params, - json=list(batch.requests), - timeout_secs=_MEDIUM_TIMEOUT, - ) - - # Retry if the request failed and the retry limit has not been reached. - if not response.is_success and batch.num_of_retries < max_unprocessed_requests_retries: - batch.num_of_retries += 1 - sleep(min_delay_between_unprocessed_requests_retries.total_seconds()) - queue.put(batch) + try: + # Send the batch to the API. + response = self.http_client.call( + url=self._url('requests/batch'), + method='POST', + params=request_params, + json=list(request_batch), + timeout_secs=_MEDIUM_TIMEOUT, + ) - # Otherwise, add the processed/unprocessed requests to their respective lists. - else: response_parsed = parse_date_fields(pluck_data(response.json())) processed_requests.extend(response_parsed.get('processedRequests', [])) - unprocessed_requests.extend(response_parsed.get('unprocessedRequests', [])) + + for processed_request in response_parsed.get('processedRequests', []): + unprocessed_requests.pop(processed_request['uniqueKey'], None) + + except Exception as exc: + logger.warning(f'Error occurred while processing a batch of requests: {exc}') return { 'processedRequests': processed_requests, - 'unprocessedRequests': unprocessed_requests, + 'unprocessedRequests': list(unprocessed_requests.values()), } def batch_delete_requests(self, requests: list[dict]) -> dict: @@ -661,24 +644,26 @@ async def delete_request_lock( async def _batch_add_requests_worker( self, - queue: asyncio.Queue[AddRequestsBatch], + queue: asyncio.Queue[Iterable[dict]], request_params: dict, - max_unprocessed_requests_retries: int, - min_delay_between_unprocessed_requests_retries: timedelta, ) -> BatchAddRequestsResult: """Worker function to process a batch of requests. - This worker will process batches from the queue, retrying requests that fail until the retry limit is reached. + This worker will process batches from the queue. Return result containing lists of processed and unprocessed requests by the worker. """ processed_requests = list[dict]() - unprocessed_requests = list[dict]() + unprocessed_requests = dict[str, dict]() while True: # Get the next batch from the queue. try: - batch = await queue.get() + request_batch = await queue.get() + # All requests are considered unprocessed unless explicitly mentioned in `processedRequests` response. + for request in request_batch: + unprocessed_requests[request['uniqueKey']] = _get_unprocessed_request_from_request(request) + except asyncio.CancelledError: break @@ -688,22 +673,15 @@ async def _batch_add_requests_worker( url=self._url('requests/batch'), method='POST', params=request_params, - json=list(batch.requests), + json=list(request_batch), timeout_secs=_MEDIUM_TIMEOUT, ) response_parsed = parse_date_fields(pluck_data(response.json())) + processed_requests.extend(response_parsed.get('processedRequests', [])) - # Retry if the request failed and the retry limit has not been reached. 
- if not response.is_success and batch.num_of_retries < max_unprocessed_requests_retries: - batch.num_of_retries += 1 - await asyncio.sleep(min_delay_between_unprocessed_requests_retries.total_seconds()) - await queue.put(batch) - - # Otherwise, add the processed/unprocessed requests to their respective lists. - else: - processed_requests.extend(response_parsed.get('processedRequests', [])) - unprocessed_requests.extend(response_parsed.get('unprocessedRequests', [])) + for processed_request in response_parsed.get('processedRequests', []): + unprocessed_requests.pop(processed_request['uniqueKey'], None) except Exception as exc: logger.warning(f'Error occurred while processing a batch of requests: {exc}') @@ -714,7 +692,7 @@ async def _batch_add_requests_worker( return { 'processedRequests': processed_requests, - 'unprocessedRequests': unprocessed_requests, + 'unprocessedRequests': list(unprocessed_requests.values()), } async def batch_add_requests( @@ -723,8 +701,6 @@ async def batch_add_requests( *, forefront: bool = False, max_parallel: int = 5, - max_unprocessed_requests_retries: int = 3, - min_delay_between_unprocessed_requests_retries: timedelta = timedelta(milliseconds=500), ) -> BatchAddRequestsResult: """Add requests to the request queue in batches. @@ -738,15 +714,12 @@ async def batch_add_requests( max_parallel: Specifies the maximum number of parallel tasks for API calls. This is only applicable to the async client. For the sync client, this value must be set to 1, as parallel execution is not supported. - max_unprocessed_requests_retries: Number of retry attempts for unprocessed requests. - min_delay_between_unprocessed_requests_retries: Minimum delay between retry attempts for unprocessed - requests. Returns: Result containing lists of processed and unprocessed requests. """ tasks = set[asyncio.Task]() - queue: asyncio.Queue[AddRequestsBatch] = asyncio.Queue() + queue: asyncio.Queue[Iterable[dict]] = asyncio.Queue() request_params = self._params(clientKey=self.client_key, forefront=forefront) # Compute the payload size limit to ensure it doesn't exceed the maximum allowed size. @@ -760,15 +733,13 @@ async def batch_add_requests( ) for batch in batches: - await queue.put(AddRequestsBatch(batch)) + await queue.put(batch) # Start a required number of worker tasks to process the batches. 
         for i in range(max_parallel):
             coro = self._batch_add_requests_worker(
                 queue,
                 request_params,
-                max_unprocessed_requests_retries,
-                min_delay_between_unprocessed_requests_retries,
             )
             task = asyncio.create_task(coro, name=f'batch_add_requests_worker_{i}')
             tasks.add(task)
diff --git a/tests/unit/test_client_request_queue.py b/tests/unit/test_client_request_queue.py
new file mode 100644
index 00000000..ed433754
--- /dev/null
+++ b/tests/unit/test_client_request_queue.py
@@ -0,0 +1,91 @@
+import respx
+
+from apify_client import ApifyClient, ApifyClientAsync
+
+_PARTIALLY_ADDED_BATCH_RESPONSE_CONTENT = """{
+    "data": {
+        "processedRequests": [
+            {
+                "requestId": "YiKoxjkaS9gjGTqhF",
+                "uniqueKey": "http://example.com/1",
+                "wasAlreadyPresent": true,
+                "wasAlreadyHandled": false
+            }
+        ],
+        "unprocessedRequests": [
+            {
+                "uniqueKey": "http://example.com/2",
+                "url": "http://example.com/2",
+                "method": "GET"
+            }
+        ]
+    }
+}"""
+
+
+@respx.mock
+async def test_batch_not_processed_due_to_exception_async() -> None:
+    """Test that all requests are unprocessed unless explicitly stated by the server that they have been processed."""
+    client = ApifyClientAsync(token='')
+
+    respx.route(method='POST', host='api.apify.com').mock(return_value=respx.MockResponse(401))
+    requests = [
+        {'uniqueKey': 'http://example.com/1', 'url': 'http://example.com/1', 'method': 'GET'},
+        {'uniqueKey': 'http://example.com/2', 'url': 'http://example.com/2', 'method': 'GET'},
+    ]
+    rq_client = client.request_queue(request_queue_id='whatever')
+
+    response = await rq_client.batch_add_requests(requests=requests)
+    assert response['unprocessedRequests'] == requests
+
+
+@respx.mock
+async def test_batch_processed_partially_async() -> None:
+    client = ApifyClientAsync(token='')
+
+    respx.route(method='POST', host='api.apify.com').mock(
+        return_value=respx.MockResponse(200, content=_PARTIALLY_ADDED_BATCH_RESPONSE_CONTENT)
+    )
+    requests = [
+        {'uniqueKey': 'http://example.com/1', 'url': 'http://example.com/1', 'method': 'GET'},
+        {'uniqueKey': 'http://example.com/2', 'url': 'http://example.com/2', 'method': 'GET'},
+    ]
+    rq_client = client.request_queue(request_queue_id='whatever')
+
+    response = await rq_client.batch_add_requests(requests=requests)
+    assert requests[0]['uniqueKey'] in {request['uniqueKey'] for request in response['processedRequests']}
+    assert response['unprocessedRequests'] == [requests[1]]
+
+
+@respx.mock
+def test_batch_not_processed_due_to_exception_sync() -> None:
+    """Test that all requests are unprocessed unless explicitly stated by the server that they have been processed."""
+    client = ApifyClient(token='')
+
+    respx.route(method='POST', host='api.apify.com').mock(return_value=respx.MockResponse(401))
+    requests = [
+        {'uniqueKey': 'http://example.com/1', 'url': 'http://example.com/1', 'method': 'GET'},
+        {'uniqueKey': 'http://example.com/2', 'url': 'http://example.com/2', 'method': 'GET'},
+    ]
+    rq_client = client.request_queue(request_queue_id='whatever')
+
+    response = rq_client.batch_add_requests(requests=requests)
+    assert response['unprocessedRequests'] == requests
+
+
+@respx.mock
+def test_batch_processed_partially_sync() -> None:
+    client = ApifyClient(token='')
+
+    respx.route(method='POST', host='api.apify.com').mock(
+        return_value=respx.MockResponse(200, content=_PARTIALLY_ADDED_BATCH_RESPONSE_CONTENT)
+    )
+    requests = [
+        {'uniqueKey': 'http://example.com/1', 'url': 'http://example.com/1', 'method': 'GET'},
+        {'uniqueKey': 'http://example.com/2', 'url': 'http://example.com/2',
'method': 'GET'}, + ] + rq_client = client.request_queue(request_queue_id='whatever') + + response = rq_client.batch_add_requests(requests=requests) + assert requests[0]['uniqueKey'] in {request['uniqueKey'] for request in response['processedRequests']} + assert response['unprocessedRequests'] == [requests[1]] From 4495516729ea612fc5d43c49aefcb39bc9138c61 Mon Sep 17 00:00:00 2001 From: Josef Prochazka Date: Thu, 24 Apr 2025 15:29:10 +0200 Subject: [PATCH 2/4] Remove try/except from the `batch_add_requests` --- .../clients/resource_clients/request_queue.py | 66 ++++++++----------- tests/unit/test_client_timeouts.py | 2 +- 2 files changed, 29 insertions(+), 39 deletions(-) diff --git a/src/apify_client/clients/resource_clients/request_queue.py b/src/apify_client/clients/resource_clients/request_queue.py index 4b77ab10..8b5e05a9 100644 --- a/src/apify_client/clients/resource_clients/request_queue.py +++ b/src/apify_client/clients/resource_clients/request_queue.py @@ -332,24 +332,20 @@ def batch_add_requests( for request in request_batch: unprocessed_requests[request['uniqueKey']] = _get_unprocessed_request_from_request(request) - try: - # Send the batch to the API. - response = self.http_client.call( - url=self._url('requests/batch'), - method='POST', - params=request_params, - json=list(request_batch), - timeout_secs=_MEDIUM_TIMEOUT, - ) - - response_parsed = parse_date_fields(pluck_data(response.json())) - processed_requests.extend(response_parsed.get('processedRequests', [])) + # Send the batch to the API. + response = self.http_client.call( + url=self._url('requests/batch'), + method='POST', + params=request_params, + json=list(request_batch), + timeout_secs=_MEDIUM_TIMEOUT, + ) - for processed_request in response_parsed.get('processedRequests', []): - unprocessed_requests.pop(processed_request['uniqueKey'], None) + response_parsed = parse_date_fields(pluck_data(response.json())) + processed_requests.extend(response_parsed.get('processedRequests', [])) - except Exception as exc: - logger.warning(f'Error occurred while processing a batch of requests: {exc}') + for processed_request in response_parsed.get('processedRequests', []): + unprocessed_requests.pop(processed_request['uniqueKey'], None) return { 'processedRequests': processed_requests, @@ -667,28 +663,22 @@ async def _batch_add_requests_worker( except asyncio.CancelledError: break - try: - # Send the batch to the API. - response = await self.http_client.call( - url=self._url('requests/batch'), - method='POST', - params=request_params, - json=list(request_batch), - timeout_secs=_MEDIUM_TIMEOUT, - ) - - response_parsed = parse_date_fields(pluck_data(response.json())) - processed_requests.extend(response_parsed.get('processedRequests', [])) - - for processed_request in response_parsed.get('processedRequests', []): - unprocessed_requests.pop(processed_request['uniqueKey'], None) - - except Exception as exc: - logger.warning(f'Error occurred while processing a batch of requests: {exc}') - - finally: - # Mark the batch as done whether it succeeded or failed. - queue.task_done() + # Send the batch to the API. 
+ response = await self.http_client.call( + url=self._url('requests/batch'), + method='POST', + params=request_params, + json=list(request_batch), + timeout_secs=_MEDIUM_TIMEOUT, + ) + + response_parsed = parse_date_fields(pluck_data(response.json())) + processed_requests.extend(response_parsed.get('processedRequests', [])) + + for processed_request in response_parsed.get('processedRequests', []): + unprocessed_requests.pop(processed_request['uniqueKey'], None) + + queue.task_done() return { 'processedRequests': processed_requests, diff --git a/tests/unit/test_client_timeouts.py b/tests/unit/test_client_timeouts.py index 82362644..b99aeee5 100644 --- a/tests/unit/test_client_timeouts.py +++ b/tests/unit/test_client_timeouts.py @@ -116,7 +116,7 @@ def assert_timeout(expected_timeout: int, request: Request) -> Response: (RequestQueueClient, 'delete_request', request_queue._SMALL_TIMEOUT, {'request_id': 123}), (RequestQueueClient, 'prolong_request_lock', request_queue._MEDIUM_TIMEOUT, {'request_id': 123, 'lock_secs': 1}), (RequestQueueClient, 'delete_request_lock', request_queue._SMALL_TIMEOUT, {'request_id': 123}), - (RequestQueueClient, 'batch_add_requests', request_queue._MEDIUM_TIMEOUT, {'requests': [{}]}), + (RequestQueueClient, 'batch_add_requests', request_queue._MEDIUM_TIMEOUT, {'requests': [{'uniqueKey': '123'}]}), (RequestQueueClient, 'batch_delete_requests', request_queue._SMALL_TIMEOUT, {'requests': [{}]}), (RequestQueueClient, 'list_requests', request_queue._MEDIUM_TIMEOUT, {}), ] From e411ca4811bca99224eb863e04ec715081356ed2 Mon Sep 17 00:00:00 2001 From: Josef Prochazka Date: Thu, 24 Apr 2025 15:58:56 +0200 Subject: [PATCH 3/4] Remove unnecessary complexity --- .../clients/resource_clients/request_queue.py | 57 +++++++------------ tests/unit/test_client_request_queue.py | 18 +++--- tests/unit/test_client_timeouts.py | 2 +- 3 files changed, 33 insertions(+), 44 deletions(-) diff --git a/src/apify_client/clients/resource_clients/request_queue.py b/src/apify_client/clients/resource_clients/request_queue.py index 8b5e05a9..297d7619 100644 --- a/src/apify_client/clients/resource_clients/request_queue.py +++ b/src/apify_client/clients/resource_clients/request_queue.py @@ -36,11 +36,6 @@ class BatchAddRequestsResult(TypedDict): unprocessedRequests: list[dict] -def _get_unprocessed_request_from_request(request: dict[str, str]) -> dict[str, str]: - relevant_keys = {'url', 'uniqueKey', 'method'} - return {key: value for key, value in request.items() if key in relevant_keys} - - class RequestQueueClient(ResourceClient): """Sub-client for manipulating a single request queue.""" @@ -323,14 +318,11 @@ def batch_add_requests( queue.put(batch) processed_requests = list[dict]() - unprocessed_requests = dict[str, dict]() + unprocessed_requests = list[dict]() # Process all batches in the queue sequentially. while not queue.empty(): request_batch = queue.get() - # All requests are considered unprocessed unless explicitly mentioned in `processedRequests` response. - for request in request_batch: - unprocessed_requests[request['uniqueKey']] = _get_unprocessed_request_from_request(request) # Send the batch to the API. 
response = self.http_client.call( @@ -343,13 +335,11 @@ def batch_add_requests( response_parsed = parse_date_fields(pluck_data(response.json())) processed_requests.extend(response_parsed.get('processedRequests', [])) - - for processed_request in response_parsed.get('processedRequests', []): - unprocessed_requests.pop(processed_request['uniqueKey'], None) + unprocessed_requests.extend(response_parsed.get('unprocessedRequests', [])) return { 'processedRequests': processed_requests, - 'unprocessedRequests': list(unprocessed_requests.values()), + 'unprocessedRequests': unprocessed_requests, } def batch_delete_requests(self, requests: list[dict]) -> dict: @@ -650,39 +640,36 @@ async def _batch_add_requests_worker( Return result containing lists of processed and unprocessed requests by the worker. """ processed_requests = list[dict]() - unprocessed_requests = dict[str, dict]() + unprocessed_requests = list[dict]() while True: # Get the next batch from the queue. try: request_batch = await queue.get() - # All requests are considered unprocessed unless explicitly mentioned in `processedRequests` response. - for request in request_batch: - unprocessed_requests[request['uniqueKey']] = _get_unprocessed_request_from_request(request) - except asyncio.CancelledError: break - # Send the batch to the API. - response = await self.http_client.call( - url=self._url('requests/batch'), - method='POST', - params=request_params, - json=list(request_batch), - timeout_secs=_MEDIUM_TIMEOUT, - ) - - response_parsed = parse_date_fields(pluck_data(response.json())) - processed_requests.extend(response_parsed.get('processedRequests', [])) - - for processed_request in response_parsed.get('processedRequests', []): - unprocessed_requests.pop(processed_request['uniqueKey'], None) - - queue.task_done() + try: + # Send the batch to the API. + response = await self.http_client.call( + url=self._url('requests/batch'), + method='POST', + params=request_params, + json=list(request_batch), + timeout_secs=_MEDIUM_TIMEOUT, + ) + + response_parsed = parse_date_fields(pluck_data(response.json())) + processed_requests.extend(response_parsed.get('processedRequests', [])) + unprocessed_requests.extend(response_parsed.get('unprocessedRequests', [])) + + finally: + # Mark the batch as done whether it succeeded or failed. 
+ queue.task_done() return { 'processedRequests': processed_requests, - 'unprocessedRequests': list(unprocessed_requests.values()), + 'unprocessedRequests': unprocessed_requests, } async def batch_add_requests( diff --git a/tests/unit/test_client_request_queue.py b/tests/unit/test_client_request_queue.py index ed433754..8e339305 100644 --- a/tests/unit/test_client_request_queue.py +++ b/tests/unit/test_client_request_queue.py @@ -1,5 +1,7 @@ +import pytest import respx +import apify_client from apify_client import ApifyClient, ApifyClientAsync _PARTIALLY_ADDED_BATCH_RESPONSE_CONTENT = """{ @@ -24,8 +26,8 @@ @respx.mock -async def test_batch_not_processed_due_to_exception_async() -> None: - """Test that all requests are unprocessed unless explicitly stated by the server that they have been processed.""" +async def test_batch_not_processed_raises_exception_async() -> None: + """Test that client exceptions are not silently ignored""" client = ApifyClientAsync(token='') respx.route(method='POST', host='api.apify.com').mock(return_value=respx.MockResponse(401)) @@ -35,8 +37,8 @@ async def test_batch_not_processed_due_to_exception_async() -> None: ] rq_client = client.request_queue(request_queue_id='whatever') - response = await rq_client.batch_add_requests(requests=requests) - assert response['unprocessedRequests'] == requests + with pytest.raises(apify_client._errors.ApifyApiError): + await rq_client.batch_add_requests(requests=requests) @respx.mock @@ -58,8 +60,8 @@ async def test_batch_processed_partially_async() -> None: @respx.mock -def test_batch_not_processed_due_to_exception_sync() -> None: - """Test that all requests are unprocessed unless explicitly stated by the server that they have been processed.""" +def test_batch_not_processed_raises_exception_sync() -> None: + """Test that client exceptions are not silently ignored""" client = ApifyClient(token='') respx.route(method='POST', host='api.apify.com').mock(return_value=respx.MockResponse(401)) @@ -69,8 +71,8 @@ def test_batch_not_processed_due_to_exception_sync() -> None: ] rq_client = client.request_queue(request_queue_id='whatever') - response = rq_client.batch_add_requests(requests=requests) - assert response['unprocessedRequests'] == requests + with pytest.raises(apify_client._errors.ApifyApiError): + rq_client.batch_add_requests(requests=requests) @respx.mock diff --git a/tests/unit/test_client_timeouts.py b/tests/unit/test_client_timeouts.py index b99aeee5..82362644 100644 --- a/tests/unit/test_client_timeouts.py +++ b/tests/unit/test_client_timeouts.py @@ -116,7 +116,7 @@ def assert_timeout(expected_timeout: int, request: Request) -> Response: (RequestQueueClient, 'delete_request', request_queue._SMALL_TIMEOUT, {'request_id': 123}), (RequestQueueClient, 'prolong_request_lock', request_queue._MEDIUM_TIMEOUT, {'request_id': 123, 'lock_secs': 1}), (RequestQueueClient, 'delete_request_lock', request_queue._SMALL_TIMEOUT, {'request_id': 123}), - (RequestQueueClient, 'batch_add_requests', request_queue._MEDIUM_TIMEOUT, {'requests': [{'uniqueKey': '123'}]}), + (RequestQueueClient, 'batch_add_requests', request_queue._MEDIUM_TIMEOUT, {'requests': [{}]}), (RequestQueueClient, 'batch_delete_requests', request_queue._SMALL_TIMEOUT, {'requests': [{}]}), (RequestQueueClient, 'list_requests', request_queue._MEDIUM_TIMEOUT, {}), ] From 717d399e00227000d2783888534a08d5ebb499d6 Mon Sep 17 00:00:00 2001 From: Josef Prochazka Date: Fri, 25 Apr 2025 14:54:28 +0200 Subject: [PATCH 4/4] Log deprecation warning messages when using deprecated 
arguments
---
 .../clients/resource_clients/request_queue.py | 23 ++++++++++++++++++-
 1 file changed, 22 insertions(+), 1 deletion(-)

diff --git a/src/apify_client/clients/resource_clients/request_queue.py b/src/apify_client/clients/resource_clients/request_queue.py
index 297d7619..75d74d5c 100644
--- a/src/apify_client/clients/resource_clients/request_queue.py
+++ b/src/apify_client/clients/resource_clients/request_queue.py
@@ -5,7 +5,7 @@
 import math
 from collections.abc import Iterable
 from queue import Queue
-from typing import Any, TypedDict
+from typing import TYPE_CHECKING, Any, TypedDict
 
 from apify_shared.utils import filter_out_none_values_recursively, ignore_docs, parse_date_fields
 from more_itertools import constrained_batches
@@ -14,6 +14,9 @@
 from apify_client._utils import catch_not_found_or_throw, pluck_data
 from apify_client.clients.base import ResourceClient, ResourceClientAsync
 
+if TYPE_CHECKING:
+    from datetime import timedelta
+
 logger = logging.getLogger(__name__)
 
 _RQ_MAX_REQUESTS_PER_BATCH = 25
@@ -279,6 +282,8 @@ def batch_add_requests(
         *,
         forefront: bool = False,
         max_parallel: int = 1,
+        max_unprocessed_requests_retries: int | None = None,
+        min_delay_between_unprocessed_requests_retries: timedelta | None = None,
     ) -> BatchAddRequestsResult:
         """Add requests to the request queue in batches.
 
@@ -292,10 +297,17 @@ def batch_add_requests(
             max_parallel: Specifies the maximum number of parallel tasks for API calls. This is only applicable
                 to the async client. For the sync client, this value must be set to 1, as parallel execution
                 is not supported.
+            max_unprocessed_requests_retries: Deprecated argument. Will be removed in the next major release.
+            min_delay_between_unprocessed_requests_retries: Deprecated argument. Will be removed in the next major release.
 
         Returns:
             Result containing lists of processed and unprocessed requests.
         """
+        if max_unprocessed_requests_retries:
+            logger.warning('`max_unprocessed_requests_retries` is deprecated and not used anymore.')
+        if min_delay_between_unprocessed_requests_retries:
+            logger.warning('`min_delay_between_unprocessed_requests_retries` is deprecated and not used anymore.')
+
         if max_parallel != 1:
             raise NotImplementedError('max_parallel is only supported in async client')
 
@@ -678,6 +690,8 @@ async def batch_add_requests(
         *,
         forefront: bool = False,
         max_parallel: int = 5,
+        max_unprocessed_requests_retries: int | None = None,
+        min_delay_between_unprocessed_requests_retries: timedelta | None = None,
     ) -> BatchAddRequestsResult:
         """Add requests to the request queue in batches.
 
@@ -691,10 +705,17 @@ async def batch_add_requests(
             max_parallel: Specifies the maximum number of parallel tasks for API calls. This is only applicable
                 to the async client. For the sync client, this value must be set to 1, as parallel execution
                 is not supported.
+            max_unprocessed_requests_retries: Deprecated argument. Will be removed in the next major release.
+            min_delay_between_unprocessed_requests_retries: Deprecated argument. Will be removed in the next major release.
 
         Returns:
             Result containing lists of processed and unprocessed requests.
""" + if max_unprocessed_requests_retries: + logger.warning('`max_unprocessed_requests_retries` is deprecated and not used anymore.') + if min_delay_between_unprocessed_requests_retries: + logger.warning('`min_delay_between_unprocessed_requests_retries` is deprecated and not used anymore.') + tasks = set[asyncio.Task]() queue: asyncio.Queue[Iterable[dict]] = asyncio.Queue() request_params = self._params(clientKey=self.client_key, forefront=forefront)