diff --git a/docs/examples/code/beautifulsoup_crawler_stop.py b/docs/examples/code/beautifulsoup_crawler_stop.py
new file mode 100644
index 0000000000..e85ad2159f
--- /dev/null
+++ b/docs/examples/code/beautifulsoup_crawler_stop.py
@@ -0,0 +1,39 @@
+import asyncio
+
+from crawlee.beautifulsoup_crawler import BeautifulSoupCrawler, BeautifulSoupCrawlingContext
+
+
+async def main() -> None:
+    # Create an instance of the BeautifulSoupCrawler class, a crawler that automatically
+    # loads the URLs and parses their HTML using the BeautifulSoup library.
+    crawler = BeautifulSoupCrawler()
+
+    # Define the default request handler, which will be called for every request.
+    # The handler receives a context parameter, providing various properties and
+    # helper methods. Here are a few key ones we use for demonstration:
+    # - request: an instance of the Request class containing details such as the URL
+    #   being crawled and the HTTP method used.
+    # - soup: the BeautifulSoup object containing the parsed HTML of the response.
+    @crawler.router.default_handler
+    async def request_handler(context: BeautifulSoupCrawlingContext) -> None:
+        context.log.info(f'Processing {context.request.url} ...')
+
+        # Create a custom condition to stop the crawler once it finds what it is looking for.
+        if 'crawlee' in context.request.url:
+            crawler.stop(reason='Manual stop of the crawler after finding `crawlee` in the URL.')
+
+        # Extract data from the page.
+        data = {
+            'url': context.request.url,
+        }
+
+        # Push the extracted data to the default dataset. In local configuration,
+        # the data will be stored as JSON files in ./storage/datasets/default.
+        await context.push_data(data)
+
+    # Run the crawler with the initial list of URLs.
+    await crawler.run(['https://crawlee.dev'])
+
+
+if __name__ == '__main__':
+    asyncio.run(main())
diff --git a/docs/examples/crawler_stop.mdx b/docs/examples/crawler_stop.mdx
new file mode 100644
index 0000000000..cf23ee54d2
--- /dev/null
+++ b/docs/examples/crawler_stop.mdx
@@ -0,0 +1,15 @@
+---
+id: crawler-stop
+title: Stopping a crawler with the stop method
+---
+
+import ApiLink from '@site/src/components/ApiLink';
+import CodeBlock from '@theme/CodeBlock';
+
+import BeautifulSoupExample from '!!raw-loader!./code/beautifulsoup_crawler_stop.py';
+
+This example demonstrates how to use the `stop` method of `BasicCrawler` to stop the crawler once it finds what it is looking for. The method is available to all crawlers that inherit from `BasicCrawler`; the example below shows it on `BeautifulSoupCrawler`. Simply call `crawler.stop()`: the crawler will not start processing any new requests, while requests that are already being processed concurrently are allowed to finish. The `stop` method also accepts an optional `reason` argument, a string that is included in the logs and can improve their readability, especially when multiple different conditions can trigger `stop`.
+
+<CodeBlock className="language-python">
+    {BeautifulSoupExample}
+</CodeBlock>
diff --git a/src/crawlee/basic_crawler/_basic_crawler.py b/src/crawlee/basic_crawler/_basic_crawler.py
index aeec0031e9..e3bd1632ac 100644
--- a/src/crawlee/basic_crawler/_basic_crawler.py
+++ b/src/crawlee/basic_crawler/_basic_crawler.py
@@ -303,6 +303,8 @@ def __init__(
         self._failed = False
         self._abort_on_error = abort_on_error
 
+        self._unexpected_stop = False
+
     @property
     def log(self) -> logging.Logger:
         """The logger used by the crawler."""
@@ -328,13 +330,26 @@ def statistics(self) -> Statistics[StatisticsState]:
         """Statistics about the current (or last) crawler run."""
         return self._statistics
 
-    @property
-    def _max_requests_count_exceeded(self) -> bool:
-        """Whether the maximum number of requests to crawl has been reached."""
+    def stop(self, reason: str = 'Stop was called externally.') -> None:
+        """Set a flag to stop the crawler.
+
+        This stops the current crawler run regardless of whether all requests were finished.
+
+        Args:
+            reason: Reason for stopping that will be used in logs.
+        """
+        self._logger.info(f'Crawler.stop() was called with the following reason: {reason}')
+        self._unexpected_stop = True
+
+    def _stop_if_max_requests_count_exceeded(self) -> None:
+        """Call `stop` when the maximum number of requests to crawl has been reached."""
         if self._max_requests_per_crawl is None:
-            return False
+            return
 
-        return self._statistics.state.requests_finished >= self._max_requests_per_crawl
+        if self._statistics.state.requests_finished >= self._max_requests_per_crawl:
+            self.stop(
+                reason=f'The crawler has reached its limit of {self._max_requests_per_crawl} requests per crawl.'
+            )
 
     async def _get_session(self) -> Session | None:
         """If session pool is being used, try to take a session from it."""
@@ -912,27 +927,25 @@ async def _commit_request_handler_result(
                 await store.set_value(key, value.content, value.content_type)
 
     async def __is_finished_function(self) -> bool:
+        self._stop_if_max_requests_count_exceeded()
+        if self._unexpected_stop:
+            self._logger.info('The crawler will finish any remaining ongoing requests and shut down.')
+            return True
+
         request_provider = await self.get_request_provider()
         is_finished = await request_provider.is_finished()
 
-        if self._max_requests_count_exceeded:
-            self._logger.info(
-                f'The crawler has reached its limit of {self._max_requests_per_crawl} requests per crawl. '
-                f'All ongoing requests have now completed. Total requests processed: '
-                f'{self._statistics.state.requests_finished}. The crawler will now shut down.'
-            )
-            return True
-
         if self._abort_on_error and self._failed:
             return True
 
         return is_finished
 
     async def __is_task_ready_function(self) -> bool:
-        if self._max_requests_count_exceeded:
+        self._stop_if_max_requests_count_exceeded()
+        if self._unexpected_stop:
             self._logger.info(
-                f'The crawler has reached its limit of {self._max_requests_per_crawl} requests per crawl. '
-                f'The crawler will soon shut down. Ongoing requests will be allowed to complete.'
+                'No new requests are allowed because the crawler `stop` method was called. '
+                'Ongoing requests will be allowed to complete.'
             )
             return False
 
diff --git a/tests/unit/basic_crawler/test_basic_crawler.py b/tests/unit/basic_crawler/test_basic_crawler.py
index 9e3768064e..be0ac178ae 100644
--- a/tests/unit/basic_crawler/test_basic_crawler.py
+++ b/tests/unit/basic_crawler/test_basic_crawler.py
@@ -4,10 +4,10 @@
 import asyncio
 import json
 import logging
+import os
 from collections import Counter
 from dataclasses import dataclass
 from datetime import timedelta
-import os
 from pathlib import Path
 from typing import TYPE_CHECKING, Any
 from unittest.mock import AsyncMock, Mock
@@ -941,3 +941,61 @@ async def handler(context: BasicCrawlingContext) -> None:
         '│ crawler_runtime │ 300.0 │',
         '└───────────────────────────────┴───────────┘',
     ]
+
+
+async def test_crawler_manual_stop(httpbin: URL) -> None:
+    """Test that no new requests are handled after crawler.stop() is called."""
+    start_urls = [
+        str(httpbin / '1'),
+        str(httpbin / '2'),
+        str(httpbin / '3'),
+    ]
+    processed_urls = []
+
+    # Set max_concurrency to 1 to ensure the test URLs are visited one by one, in order.
+    crawler = BasicCrawler(concurrency_settings=ConcurrencySettings(max_concurrency=1))
+
+    @crawler.router.default_handler
+    async def handler(context: BasicCrawlingContext) -> None:
+        processed_urls.append(context.request.url)
+        if context.request.url == start_urls[1]:
+            crawler.stop()
+
+    stats = await crawler.run(start_urls)
+
+    # Verify that only 2 of the 3 provided URLs were processed.
+    assert len(processed_urls) == 2
+    assert stats.requests_total == 2
+    assert stats.requests_finished == 2
+
+
+async def test_crawler_multiple_stops_in_parallel(httpbin: URL) -> None:
+    """Test that no new requests are handled after crawler.stop() is called, but ongoing requests can still finish."""
+    start_urls = [
+        str(httpbin / '1'),
+        str(httpbin / '2'),
+        str(httpbin / '3'),
+    ]
+    processed_urls = []
+
+    # Set max_concurrency to 2 to ensure two URLs are being visited in parallel.
+    crawler = BasicCrawler(concurrency_settings=ConcurrencySettings(max_concurrency=2))
+
+    sleep_time_generator = iter([0, 0.1])
+
+    @crawler.router.default_handler
+    async def handler(context: BasicCrawlingContext) -> None:
+        processed_urls.append(context.request.url)
+
+        # This sleep ensures that the first request is processed quickly and triggers stop() almost immediately.
+        # The second request sleeps long enough to still be in progress after crawler.stop() has been called from
+        # the first request, i.e. while the crawler is already shutting down.
+        await asyncio.sleep(next(sleep_time_generator))
+        crawler.stop(reason=f'Stop called on {context.request.url}')
+
+    stats = await crawler.run(start_urls)
+
+    # Verify that only 2 of the 3 provided URLs were processed.
+    assert len(processed_urls) == 2
+    assert stats.requests_total == 2
+    assert stats.requests_finished == 2
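
For reviewers who want to try the new API outside of the BeautifulSoup example, below is a minimal sketch that triggers `stop` from a request handler on a plain `BasicCrawler`, mirroring the unit tests above. It is not part of the patch; the import paths and the example URL are assumptions based on the code in this PR.

import asyncio

# Assumption: these import paths match the crawlee version this PR targets.
from crawlee.basic_crawler import BasicCrawler, BasicCrawlingContext


async def main() -> None:
    crawler = BasicCrawler()

    @crawler.router.default_handler
    async def request_handler(context: BasicCrawlingContext) -> None:
        # Stop the crawler as soon as the page we are looking for shows up. No new
        # requests are started afterwards; requests already in flight may still finish.
        if 'crawlee' in context.request.url:
            crawler.stop(reason=f'Found a matching URL: {context.request.url}')

    await crawler.run(['https://crawlee.dev'])


if __name__ == '__main__':
    asyncio.run(main())

Note that calling `stop` only sets the `_unexpected_stop` flag; the shutdown happens when `__is_task_ready_function` and `__is_finished_function` next consult it, which is why concurrently running requests are allowed to complete.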