Merged
39 changes: 39 additions & 0 deletions docs/examples/code/beautifulsoup_crawler_stop.py
@@ -0,0 +1,39 @@
import asyncio

from crawlee.beautifulsoup_crawler import BeautifulSoupCrawler, BeautifulSoupCrawlingContext


async def main() -> None:
    # Create an instance of the BeautifulSoupCrawler class, a crawler that automatically
    # loads the URLs and parses their HTML using the BeautifulSoup library.
    crawler = BeautifulSoupCrawler()

    # Define the default request handler, which will be called for every request.
    # The handler receives a context parameter, providing various properties and
    # helper methods. Here are a few key ones we use for demonstration:
    # - request: an instance of the Request class containing details such as the URL
    #   being crawled and the HTTP method used.
    # - soup: the BeautifulSoup object containing the parsed HTML of the response.
    @crawler.router.default_handler
    async def request_handler(context: BeautifulSoupCrawlingContext) -> None:
        context.log.info(f'Processing {context.request.url} ...')

        # Create a custom condition to stop the crawler once it finds what it is looking for.
        if 'crawlee' in context.request.url:
            crawler.stop(reason='Manual stop of crawler after finding `crawlee` in the url.')

        # Extract data from the page.
        data = {
            'url': context.request.url,
        }

        # Push the extracted data to the default dataset. In local configuration,
        # the data will be stored as JSON files in ./storage/datasets/default.
        await context.push_data(data)

    # Run the crawler with the initial list of URLs.
    await crawler.run(['https://crawlee.dev'])


if __name__ == '__main__':
    asyncio.run(main())
15 changes: 15 additions & 0 deletions docs/examples/crawler_stop.mdx
@@ -0,0 +1,15 @@
---
id: crawler-stop
title: Stopping a crawler with the stop method
---

import ApiLink from '@site/src/components/ApiLink';
import CodeBlock from '@theme/CodeBlock';

import BeautifulSoupExample from '!!raw-loader!./code/beautifulsoup_crawler_stop.py';

This example demonstrates how to use the `stop` method of <ApiLink to="class/BasicCrawler">`BasicCrawler`</ApiLink> to stop the crawler once it finds what it is looking for. The method is available to all crawlers that inherit from <ApiLink to="class/BasicCrawler">`BasicCrawler`</ApiLink>; the example below shows it on <ApiLink to="class/BeautifulSoupCrawler">`BeautifulSoupCrawler`</ApiLink>. Simply call `crawler.stop()` to stop the crawler: it will not pick up any new requests, but requests that are already being processed concurrently are allowed to finish. The `stop` method also accepts an optional `reason` argument, a string that is used in logs and can improve their readability, especially if you have multiple different conditions for triggering `stop`.

<CodeBlock className="language-python">
{BeautifulSoupExample}
</CodeBlock>
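As an aside on the `reason` argument mentioned above: it becomes most useful when several different conditions can trigger a stop, because each call site can log its own explanation. The sketch below builds on the example from this PR and adds a second, purely illustrative condition based on the page title; the title check is an assumption for demonstration and is not part of this PR.

import asyncio

from crawlee.beautifulsoup_crawler import BeautifulSoupCrawler, BeautifulSoupCrawlingContext


async def main() -> None:
    crawler = BeautifulSoupCrawler()

    @crawler.router.default_handler
    async def request_handler(context: BeautifulSoupCrawlingContext) -> None:
        context.log.info(f'Processing {context.request.url} ...')

        # Condition 1 (from the PR example): the URL itself contains the keyword.
        if 'crawlee' in context.request.url:
            crawler.stop(reason='Found `crawlee` in the URL.')

        # Condition 2 (illustrative only): the page title mentions the keyword.
        title_tag = context.soup.title
        title = title_tag.get_text() if title_tag else ''
        if 'crawlee' in title.lower():
            crawler.stop(reason='Found `crawlee` in the page title.')

        await context.push_data({'url': context.request.url, 'title': title})

    await crawler.run(['https://crawlee.dev'])


if __name__ == '__main__':
    asyncio.run(main())

Whichever condition fires first sets the stop flag; calling `stop` again only logs another reason, so distinct `reason` strings make it easy to tell from the logs which condition actually triggered the shutdown.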
45 changes: 29 additions & 16 deletions src/crawlee/basic_crawler/_basic_crawler.py
@@ -303,6 +303,8 @@ def __init__(
        self._failed = False
        self._abort_on_error = abort_on_error

+        self._unexpected_stop = False

    @property
    def log(self) -> logging.Logger:
        """The logger used by the crawler."""
@@ -328,13 +330,26 @@ def statistics(self) -> Statistics[StatisticsState]:
"""Statistics about the current (or last) crawler run."""
return self._statistics

@property
def _max_requests_count_exceeded(self) -> bool:
"""Whether the maximum number of requests to crawl has been reached."""
def stop(self, reason: str = 'Stop was called externally.') -> None:
"""Set flag to stop crawler.

This stops current crawler run regardless of whether all requests were finished.

Args:
reason: Reason for stopping that will be used in logs.
"""
self._logger.info(f'Crawler.stop() was called with following reason: {reason}.')
self._unexpected_stop = True

def _stop_if_max_requests_count_exceeded(self) -> None:
"""Call `stop` when the maximum number of requests to crawl has been reached."""
if self._max_requests_per_crawl is None:
return False
return

return self._statistics.state.requests_finished >= self._max_requests_per_crawl
if self._statistics.state.requests_finished >= self._max_requests_per_crawl:
self.stop(
reason=f'The crawler has reached its limit of {self._max_requests_per_crawl} requests per crawl. '
)

async def _get_session(self) -> Session | None:
"""If session pool is being used, try to take a session from it."""
@@ -912,27 +927,25 @@ async def _commit_request_handler_result(
await store.set_value(key, value.content, value.content_type)

    async def __is_finished_function(self) -> bool:
+        self._stop_if_max_requests_count_exceeded()

Reviewer: Hi, I understand the logic, but I don't like the names: `_is_finished` calls `_stop_if...`, so by reading the names of the methods alone, a "property getter" is effectively stopping the crawler.

Collaborator (author):

This might be an ugly name, but it is as explicit as it can get. It is an internal name, so it is no big deal to change it. Do you have a preferred naming?

@MatousMarik (Dec 18, 2024):

Sorry, my idea is a bit different, but you don't have to consider it at all.

I will try to explain it:
I don't like that previously, if you wanted to know whether you were stopped, you checked all the relevant flags and properties, mainly _max_requests_count_exceeded.
Now you are adding a new flag, _unexpected_stop. So why not just check what you checked before, plus _unexpected_stop? Why add a call named _stop_if_something before each of those checks, which does the same thing as reading the property _max_requests_count_exceeded, and then check only for the "unexpected" flag?
As I see it, you had one flag and wanted to add a second one that is different but is used in the same decision. Instead of checking them both, you decided to rename flag 1 and also set it whenever you would set flag 2.

I would say you did something like this:

i_am_dirty is flag 1

do_i_want_to_take_a_shower = decision:
1. return i_am_dirty

Now, adding flag 2 = i_am_hot, the process becomes:
rename flag 1 to i_want_to_take_a_shower  # which is always set when i_am_hot would be

and do_i_want_to_take_a_shower changes to:
1. if i_am_dirty_check: i_want_to_take_a_shower = true
2. return i_want_to_take_a_shower  # this makes sense, because we also set it whenever we would set i_am_hot

Proposal:

  • keep the original property _max_requests_count_exceeded
  • and also check for _unexpected_stop
  • if you don't want to check both of them in every place where you decide whether to stop (currently you are kind of doing that by 1. calling _stop_if..., 2. checking _unexpected_stop), you can create a new single point of truth like _should_stop_flag, a property that checks both _max_requests_count_exceeded and _unexpected_stop, and possibly others in the future

In the example:

do_i_want_to_take_a_shower:
1. return i_am_dirty or i_am_hot

Sorry for this useless comment [it might not even fit in the nitpick category]...
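For illustration only, here is a minimal, self-contained sketch of the _should_stop_flag idea described in the comment above. Everything below (the _CrawlerSketch and _StatisticsState names, the _should_stop property) is a hypothetical stand-in, not part of this PR or of crawlee's API:

from __future__ import annotations

from dataclasses import dataclass


@dataclass
class _StatisticsState:
    requests_finished: int = 0


class _CrawlerSketch:
    """Toy stand-in for BasicCrawler, only to illustrate the proposed single point of truth."""

    def __init__(self, max_requests_per_crawl: int | None = None) -> None:
        self._max_requests_per_crawl = max_requests_per_crawl
        self._statistics_state = _StatisticsState()
        self._unexpected_stop = False

    def stop(self) -> None:
        # The externally triggered stop still just sets its own flag.
        self._unexpected_stop = True

    @property
    def _max_requests_count_exceeded(self) -> bool:
        # The original property is kept as-is, per the proposal.
        if self._max_requests_per_crawl is None:
            return False
        return self._statistics_state.requests_finished >= self._max_requests_per_crawl

    @property
    def _should_stop(self) -> bool:
        # Single point of truth combining both conditions; more can be OR-ed in later.
        return self._max_requests_count_exceeded or self._unexpected_stop

With such a property, __is_finished_function and __is_task_ready_function could each read a single flag-like property instead of calling _stop_if_max_requests_count_exceeded() first.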

+        if self._unexpected_stop:
+            self._logger.info('The crawler will finish any remaining ongoing requests and shut down.')
+            return True

        request_provider = await self.get_request_provider()
        is_finished = await request_provider.is_finished()

-        if self._max_requests_count_exceeded:
-            self._logger.info(
-                f'The crawler has reached its limit of {self._max_requests_per_crawl} requests per crawl. '
-                f'All ongoing requests have now completed. Total requests processed: '
-                f'{self._statistics.state.requests_finished}. The crawler will now shut down.'
-            )
-            return True

        if self._abort_on_error and self._failed:
            return True

        return is_finished

    async def __is_task_ready_function(self) -> bool:
-        if self._max_requests_count_exceeded:
+        self._stop_if_max_requests_count_exceeded()

Reviewer: Same as above.

+        if self._unexpected_stop:
            self._logger.info(
-                f'The crawler has reached its limit of {self._max_requests_per_crawl} requests per crawl. '
-                f'The crawler will soon shut down. Ongoing requests will be allowed to complete.'
+                'No new requests are allowed because crawler `stop` method was called. '
+                'Ongoing requests will be allowed to complete.'
            )
            return False

60 changes: 59 additions & 1 deletion tests/unit/basic_crawler/test_basic_crawler.py
@@ -4,10 +4,10 @@
import asyncio
import json
import logging
+import os
from collections import Counter
from dataclasses import dataclass
from datetime import timedelta
-import os
from pathlib import Path
from typing import TYPE_CHECKING, Any
from unittest.mock import AsyncMock, Mock
@@ -941,3 +941,61 @@ async def handler(context: BasicCrawlingContext) -> None:
'│ crawler_runtime │ 300.0 │',
'└───────────────────────────────┴───────────┘',
]


async def test_crawler_manual_stop(httpbin: URL) -> None:
    """Test that no new requests are handled after crawler.stop() is called."""
    start_urls = [
        str(httpbin / '1'),
        str(httpbin / '2'),
        str(httpbin / '3'),
    ]
    processed_urls = []

    # Set max_concurrency to 1 to ensure that the test URLs are visited one by one, in order.
    crawler = BasicCrawler(concurrency_settings=ConcurrencySettings(max_concurrency=1))

    @crawler.router.default_handler
    async def handler(context: BasicCrawlingContext) -> None:
        processed_urls.append(context.request.url)
        if context.request.url == start_urls[1]:
            crawler.stop()

    stats = await crawler.run(start_urls)

    # Verify that only 2 out of the 3 provided URLs were processed.
    assert len(processed_urls) == 2
    assert stats.requests_total == 2
    assert stats.requests_finished == 2


async def test_crawler_multiple_stops_in_parallel(httpbin: URL) -> None:
    """Test that no new requests are handled after crawler.stop() is called, but ongoing requests can still finish."""
    start_urls = [
        str(httpbin / '1'),
        str(httpbin / '2'),
        str(httpbin / '3'),
    ]
    processed_urls = []

    # Set max_concurrency to 2 to ensure that two URLs are being visited in parallel.
    crawler = BasicCrawler(concurrency_settings=ConcurrencySettings(max_concurrency=2))

    sleep_time_generator = iter([0, 0.1])

    @crawler.router.default_handler
    async def handler(context: BasicCrawlingContext) -> None:
        processed_urls.append(context.request.url)

        # The first request gets no sleep, so it is processed quickly and triggers stop() almost immediately.
        # The second request sleeps briefly to make sure it is still being processed after crawler.stop() was
        # called from the first request, i.e. while the crawler is already shutting down.
        await asyncio.sleep(next(sleep_time_generator))
        crawler.stop(reason=f'Stop called on {context.request.url}')

    stats = await crawler.run(start_urls)

    # Verify that only 2 out of the 3 provided URLs were processed.
    assert len(processed_urls) == 2
    assert stats.requests_total == 2
    assert stats.requests_finished == 2