From 253041f79cdd0c2027a082fea70a816de743a868 Mon Sep 17 00:00:00 2001
From: Josef Prochazka
Date: Wed, 11 Dec 2024 12:34:25 +0100
Subject: [PATCH 1/7] Add possibility to stop crawler. Add test. TODO: Document this and create example.

---
 src/crawlee/basic_crawler/_basic_crawler.py   | 46 +++++++++++++------
 .../unit/basic_crawler/test_basic_crawler.py  | 27 ++++++++++-
 2 files changed, 58 insertions(+), 15 deletions(-)

diff --git a/src/crawlee/basic_crawler/_basic_crawler.py b/src/crawlee/basic_crawler/_basic_crawler.py
index aeec0031e9..9d5f96f953 100644
--- a/src/crawlee/basic_crawler/_basic_crawler.py
+++ b/src/crawlee/basic_crawler/_basic_crawler.py
@@ -303,6 +303,9 @@ def __init__(
         self._failed = False
         self._abort_on_error = abort_on_error
 
+        self._unexpected_stop: bool = False
+        self._unexpected_stop_reason = ''
+
     @property
     def log(self) -> logging.Logger:
         """The logger used by the crawler."""
@@ -328,13 +331,26 @@ def statistics(self) -> Statistics[StatisticsState]:
         """Statistics about the current (or last) crawler run."""
         return self._statistics
 
-    @property
-    def _max_requests_count_exceeded(self) -> bool:
-        """Whether the maximum number of requests to crawl has been reached."""
+    def stop(self, reason: str = 'Stop was called externally.') -> None:
+        """Set flag to stop crawler.
+
+        This stops current crawler run regardless of whether all requests were finished.
+
+        Args:
+            reason: Reason for stopping that will be used in logs.
+        """
+        self._unexpected_stop_reason = reason
+        self._unexpected_stop = True
+
+    def _stop_if_max_requests_count_exceeded(self) -> None:
+        """Call `stop` when the maximum number of requests to crawl has been reached."""
         if self._max_requests_per_crawl is None:
-            return False
+            return
 
-        return self._statistics.state.requests_finished >= self._max_requests_per_crawl
+        if self._statistics.state.requests_finished >= self._max_requests_per_crawl:
+            self.stop(
+                reason=f'The crawler has reached its limit of {self._max_requests_per_crawl} requests per crawl. '
+            )
 
     async def _get_session(self) -> Session | None:
         """If session pool is being used, try to take a session from it."""
@@ -912,27 +928,29 @@ async def _commit_request_handler_result(
             await store.set_value(key, value.content, value.content_type)
 
     async def __is_finished_function(self) -> bool:
-        request_provider = await self.get_request_provider()
-        is_finished = await request_provider.is_finished()
-
-        if self._max_requests_count_exceeded:
+        self._stop_if_max_requests_count_exceeded()
+        if self._unexpected_stop:
             self._logger.info(
-                f'The crawler has reached its limit of {self._max_requests_per_crawl} requests per crawl. '
-                f'All ongoing requests have now completed. Total requests processed: '
+                f'Crawler `stop` method was called with reason: {self._unexpected_stop_reason}'
+                f'All ongoing requests have now completed. Total requests processed:'
                 f'{self._statistics.state.requests_finished}. The crawler will now shut down.'
            )
            return True
 
+        request_provider = await self.get_request_provider()
+        is_finished = await request_provider.is_finished()
+
         if self._abort_on_error and self._failed:
             return True
 
         return is_finished
 
     async def __is_task_ready_function(self) -> bool:
-        if self._max_requests_count_exceeded:
+        self._stop_if_max_requests_count_exceeded()
+        if self._unexpected_stop:
             self._logger.info(
-                f'The crawler has reached its limit of {self._max_requests_per_crawl} requests per crawl. '
-                f'The crawler will soon shut down. Ongoing requests will be allowed to complete.'
+                f'No new requests are allowed because crawler `stop` method was called with reason: '
+                f'{self._unexpected_stop_reason}. Ongoing requests will be allowed to complete. '
             )
             return False
 
diff --git a/tests/unit/basic_crawler/test_basic_crawler.py b/tests/unit/basic_crawler/test_basic_crawler.py
index 9e3768064e..1f4e0b659a 100644
--- a/tests/unit/basic_crawler/test_basic_crawler.py
+++ b/tests/unit/basic_crawler/test_basic_crawler.py
@@ -4,10 +4,10 @@
 import asyncio
 import json
 import logging
+import os
 from collections import Counter
 from dataclasses import dataclass
 from datetime import timedelta
-import os
 from pathlib import Path
 from typing import TYPE_CHECKING, Any
 from unittest.mock import AsyncMock, Mock
@@ -941,3 +941,28 @@ async def handler(context: BasicCrawlingContext) -> None:
         '│ crawler_runtime │ 300.0 │',
         '└───────────────────────────────┴───────────┘',
     ]
+
+
+async def test_crawler_manual_stop(httpbin: URL) -> None:
+    start_urls = [
+        str(httpbin / '1'),
+        str(httpbin / '2'),
+        str(httpbin / '3'),
+    ]
+    processed_urls = []
+
+    # Set max_concurrency to 1 to ensure testing urls are visited one by one in order.
+    crawler = BasicCrawler(concurrency_settings=ConcurrencySettings(max_concurrency=1))
+
+    @crawler.router.default_handler
+    async def handler(context: BasicCrawlingContext) -> None:
+        processed_urls.append(context.request.url)
+        if context.request.url == start_urls[1]:
+            crawler.stop()
+
+    stats = await crawler.run(start_urls)
+
+    # Verify that only 2 out of the 3 provided URLs were made
+    assert len(processed_urls) == 2
+    assert stats.requests_total == 2
+    assert stats.requests_finished == 2

From c0d3090078431ed9d5c835bc5df450bfeabaf17e Mon Sep 17 00:00:00 2001
From: Josef Prochazka
Date: Wed, 11 Dec 2024 14:06:09 +0100
Subject: [PATCH 2/7] Add docs and example how to use crawler.stop()

---
 .../code/beautifulsoup_crawler_stop.py        | 39 +++++++++++++++++++
 docs/examples/crawler_stop.mdx                | 15 +++++++
 src/crawlee/basic_crawler/_basic_crawler.py   |  2 +-
 3 files changed, 55 insertions(+), 1 deletion(-)
 create mode 100644 docs/examples/code/beautifulsoup_crawler_stop.py
 create mode 100644 docs/examples/crawler_stop.mdx

diff --git a/docs/examples/code/beautifulsoup_crawler_stop.py b/docs/examples/code/beautifulsoup_crawler_stop.py
new file mode 100644
index 0000000000..e85ad2159f
--- /dev/null
+++ b/docs/examples/code/beautifulsoup_crawler_stop.py
@@ -0,0 +1,39 @@
+import asyncio
+
+from crawlee.beautifulsoup_crawler import BeautifulSoupCrawler, BeautifulSoupCrawlingContext
+
+
+async def main() -> None:
+    # Create an instance of the BeautifulSoupCrawler class, a crawler that automatically
+    # loads the URLs and parses their HTML using the BeautifulSoup library.
+    crawler = BeautifulSoupCrawler()
+
+    # Define the default request handler, which will be called for every request.
+    # The handler receives a context parameter, providing various properties and
+    # helper methods. Here are a few key ones we use for demonstration:
+    # - request: an instance of the Request class containing details such as the URL
+    #   being crawled and the HTTP method used.
+    # - soup: the BeautifulSoup object containing the parsed HTML of the response.
+    @crawler.router.default_handler
+    async def request_handler(context: BeautifulSoupCrawlingContext) -> None:
+        context.log.info(f'Processing {context.request.url} ...')
+
+        # Create custom condition to stop crawler once it finds what it is looking for.
+        if 'crawlee' in context.request.url:
+            crawler.stop(reason='Manual stop of crawler after finding `crawlee` in the url.')
+
+        # Extract data from the page.
+        data = {
+            'url': context.request.url,
+        }
+
+        # Push the extracted data to the default dataset. In local configuration,
+        # the data will be stored as JSON files in ./storage/datasets/default.
+        await context.push_data(data)
+
+    # Run the crawler with the initial list of URLs.
+    await crawler.run(['https://crawlee.dev'])
+
+
+if __name__ == '__main__':
+    asyncio.run(main())
diff --git a/docs/examples/crawler_stop.mdx b/docs/examples/crawler_stop.mdx
new file mode 100644
index 0000000000..3dcd8661d9
--- /dev/null
+++ b/docs/examples/crawler_stop.mdx
@@ -0,0 +1,15 @@
+---
+id: crawler-stop
+title: Crawler stop
+---
+
+import ApiLink from '@site/src/components/ApiLink';
+import CodeBlock from '@theme/CodeBlock';
+
+import BeautifulSoupExample from '!!raw-loader!./code/beautifulsoup_crawler_stop.py';
+
+This example demonstrates how to use the `stop` method of `BasicCrawler` to stop the crawler once it finds what it is looking for. The method is available to all crawlers that inherit from `BasicCrawler`; below it is shown on `BeautifulSoupCrawler`. Simply call `crawler.stop()` to stop the crawler: no new requests will be crawled, while requests that are already being processed concurrently are allowed to finish. The `stop` method also accepts an optional string argument `reason` that is used in the logs, which can improve their readability, especially if you have multiple different conditions for triggering `stop`.
+
+<CodeBlock className="language-python">
+    {BeautifulSoupExample}
+</CodeBlock>
diff --git a/src/crawlee/basic_crawler/_basic_crawler.py b/src/crawlee/basic_crawler/_basic_crawler.py
index 9d5f96f953..add458a873 100644
--- a/src/crawlee/basic_crawler/_basic_crawler.py
+++ b/src/crawlee/basic_crawler/_basic_crawler.py
@@ -932,7 +932,7 @@ async def __is_finished_function(self) -> bool:
         if self._unexpected_stop:
             self._logger.info(
                 f'Crawler `stop` method was called with reason: {self._unexpected_stop_reason}'
-                f'All ongoing requests have now completed. Total requests processed:'
+                f'All ongoing requests have now completed. Total requests processed: '
                 f'{self._statistics.state.requests_finished}. The crawler will now shut down.'
             )
             return True

From b364e82b47ff4b216869a33989111ffa40ee96a1 Mon Sep 17 00:00:00 2001
From: Josef Prochazka
Date: Fri, 13 Dec 2024 09:55:45 +0100
Subject: [PATCH 3/7] Add extra test for shutdown with ongoing requests. Improve logs.

---
 src/crawlee/basic_crawler/_basic_crawler.py   | 11 +++----
 .../unit/basic_crawler/test_basic_crawler.py  | 33 +++++++++++++++++++
 2 files changed, 37 insertions(+), 7 deletions(-)

diff --git a/src/crawlee/basic_crawler/_basic_crawler.py b/src/crawlee/basic_crawler/_basic_crawler.py
index add458a873..fb3bac0759 100644
--- a/src/crawlee/basic_crawler/_basic_crawler.py
+++ b/src/crawlee/basic_crawler/_basic_crawler.py
@@ -339,6 +339,7 @@ def stop(self, reason: str = 'Stop was called externally.') -> None:
         Args:
             reason: Reason for stopping that will be used in logs.
""" + self._logger.info(f'Crawler.stop() was called with following reason: {reason}.') self._unexpected_stop_reason = reason self._unexpected_stop = True @@ -930,11 +931,7 @@ async def _commit_request_handler_result( async def __is_finished_function(self) -> bool: self._stop_if_max_requests_count_exceeded() if self._unexpected_stop: - self._logger.info( - f'Crawler `stop` method was called with reason: {self._unexpected_stop_reason}' - f'All ongoing requests have now completed. Total requests processed: ' - f'{self._statistics.state.requests_finished}. The crawler will now shut down.' - ) + self._logger.info('The crawler will finish any remaining ongoing requests and shut down.') return True request_provider = await self.get_request_provider() @@ -949,8 +946,8 @@ async def __is_task_ready_function(self) -> bool: self._stop_if_max_requests_count_exceeded() if self._unexpected_stop: self._logger.info( - f'No new requests are allowed because crawler `stop` method was called with reason: ' - f'{self._unexpected_stop_reason}. Ongoing requests will be allowed to complete. ' + 'No new requests are allowed because crawler `stop` method was called. ' + 'Ongoing requests will be allowed to complete. ' ) return False diff --git a/tests/unit/basic_crawler/test_basic_crawler.py b/tests/unit/basic_crawler/test_basic_crawler.py index 1f4e0b659a..3800d94f48 100644 --- a/tests/unit/basic_crawler/test_basic_crawler.py +++ b/tests/unit/basic_crawler/test_basic_crawler.py @@ -944,6 +944,7 @@ async def handler(context: BasicCrawlingContext) -> None: async def test_crawler_manual_stop(httpbin: URL) -> None: + """Test that no new requests are handled after crawler.stop() is called.""" start_urls = [ str(httpbin / '1'), str(httpbin / '2'), @@ -966,3 +967,35 @@ async def handler(context: BasicCrawlingContext) -> None: assert len(processed_urls) == 2 assert stats.requests_total == 2 assert stats.requests_finished == 2 + + +async def test_crawler_multiple_stops_in_parallel(httpbin: URL) -> None: + """Test that no new requests are handled after crawler.stop() is called, but ongoing requests can still finish.""" + start_urls = [ + str(httpbin / '1'), + str(httpbin / '2'), + str(httpbin / '3'), + ] + processed_urls = [] + + # Set max_concurrency to 2 to ensure two urls are being visited in parallel. + crawler = BasicCrawler(concurrency_settings=ConcurrencySettings(max_concurrency=2)) + + sleep_time_generator = iter([0, 0.1]) + + @crawler.router.default_handler + async def handler(context: BasicCrawlingContext) -> None: + processed_urls.append(context.request.url) + + # This sleep ensures that first request is processed quickly and triggers stop() almost immediately. + # Second request will have some sleep time to make sure it is still being processed after crawler.stop() was + # called from the first request and so the crawler is already shutting down. 
+        await asyncio.sleep(next(sleep_time_generator))
+        crawler.stop(reason=f'Stop called on {context.request.url}')
+
+    stats = await crawler.run(start_urls)
+
+    # Verify that only 2 out of the 3 provided URLs were made
+    assert len(processed_urls) == 2
+    assert stats.requests_total == 2
+    assert stats.requests_finished == 2

From 2078eade10005a9ca2aa18866851d35a64dae3f5 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Josef=20Proch=C3=A1zka?=
Date: Tue, 17 Dec 2024 08:01:35 +0100
Subject: [PATCH 4/7] Apply suggestions from code review

Co-authored-by: Vlada Dusek
---
 src/crawlee/basic_crawler/_basic_crawler.py    | 2 +-
 tests/unit/basic_crawler/test_basic_crawler.py | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/src/crawlee/basic_crawler/_basic_crawler.py b/src/crawlee/basic_crawler/_basic_crawler.py
index fb3bac0759..ca1b6d0c4d 100644
--- a/src/crawlee/basic_crawler/_basic_crawler.py
+++ b/src/crawlee/basic_crawler/_basic_crawler.py
@@ -947,7 +947,7 @@ async def __is_task_ready_function(self) -> bool:
         if self._unexpected_stop:
             self._logger.info(
                 'No new requests are allowed because crawler `stop` method was called. '
-                'Ongoing requests will be allowed to complete. '
+                'Ongoing requests will be allowed to complete.'
             )
             return False
 
diff --git a/tests/unit/basic_crawler/test_basic_crawler.py b/tests/unit/basic_crawler/test_basic_crawler.py
index 3800d94f48..be0ac178ae 100644
--- a/tests/unit/basic_crawler/test_basic_crawler.py
+++ b/tests/unit/basic_crawler/test_basic_crawler.py
@@ -991,7 +991,7 @@ async def handler(context: BasicCrawlingContext) -> None:
         # Second request will have some sleep time to make sure it is still being processed after crawler.stop() was
         # called from the first request and so the crawler is already shutting down.
         await asyncio.sleep(next(sleep_time_generator))
-        crawler.stop(reason=f'Stop called on {context.request.url}')
+        crawler.stop(reason=f'Stop called on {context.request.url}')
 
     stats = await crawler.run(start_urls)
 

From 6a99031b55e08ae721b2653603d6b453f8d93431 Mon Sep 17 00:00:00 2001
From: Josef Prochazka
Date: Tue, 17 Dec 2024 08:03:41 +0100
Subject: [PATCH 5/7] Implicit type

---
 src/crawlee/basic_crawler/_basic_crawler.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/crawlee/basic_crawler/_basic_crawler.py b/src/crawlee/basic_crawler/_basic_crawler.py
index ca1b6d0c4d..18efc28cf0 100644
--- a/src/crawlee/basic_crawler/_basic_crawler.py
+++ b/src/crawlee/basic_crawler/_basic_crawler.py
@@ -303,7 +303,7 @@ def __init__(
         self._failed = False
         self._abort_on_error = abort_on_error
 
-        self._unexpected_stop: bool = False
+        self._unexpected_stop = False
         self._unexpected_stop_reason = ''
 
     @property

From da6ef6e672c24c61971f6fca1c78c7154e8778bd Mon Sep 17 00:00:00 2001
From: Josef Prochazka
Date: Tue, 17 Dec 2024 08:06:52 +0100
Subject: [PATCH 6/7] Remove self._unexpected_stop_reason

---
 src/crawlee/basic_crawler/_basic_crawler.py | 2 --
 1 file changed, 2 deletions(-)

diff --git a/src/crawlee/basic_crawler/_basic_crawler.py b/src/crawlee/basic_crawler/_basic_crawler.py
index 18efc28cf0..e3bd1632ac 100644
--- a/src/crawlee/basic_crawler/_basic_crawler.py
+++ b/src/crawlee/basic_crawler/_basic_crawler.py
@@ -304,7 +304,6 @@ def __init__(
         self._abort_on_error = abort_on_error
 
         self._unexpected_stop = False
-        self._unexpected_stop_reason = ''
 
     @property
     def log(self) -> logging.Logger:
@@ -340,7 +340,6 @@ def stop(self, reason: str = 'Stop was called externally.') -> None:
             reason: Reason for stopping that will be used in logs.
""" self._logger.info(f'Crawler.stop() was called with following reason: {reason}.') - self._unexpected_stop_reason = reason self._unexpected_stop = True def _stop_if_max_requests_count_exceeded(self) -> None: From ab467442e97f98b0c8ad058071875d07fd57b78f Mon Sep 17 00:00:00 2001 From: Josef Prochazka Date: Wed, 18 Dec 2024 08:00:51 +0100 Subject: [PATCH 7/7] Update example title - review comment --- docs/examples/crawler_stop.mdx | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/examples/crawler_stop.mdx b/docs/examples/crawler_stop.mdx index 3dcd8661d9..cf23ee54d2 100644 --- a/docs/examples/crawler_stop.mdx +++ b/docs/examples/crawler_stop.mdx @@ -1,6 +1,6 @@ --- id: crawler-stop -title: Crawler stop +title: Stopping a Crawler with stop method --- import ApiLink from '@site/src/components/ApiLink';