Comments (11)

jirimoravcik avatar jirimoravcik commented on August 25, 2024 2

Maybe run each variant 100 times to get some statistical significance?
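
A rough sketch of what such a repeated benchmark could look like (purely an illustration with a hypothetical run_crawler callable; the actual measurement later in this thread was done with repeated Actor runs on the platform):

import statistics
import time


async def benchmark(run_crawler, repetitions: int = 100) -> None:
    # Run the crawler `repetitions` times and report the mean and standard deviation.
    durations = []
    for _ in range(repetitions):
        start = time.perf_counter()
        await run_crawler()
        durations.append(time.perf_counter() - start)
    print(f'mean={statistics.mean(durations):.2f}s stdev={statistics.stdev(durations):.2f}s')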

fnesveda avatar fnesveda commented on August 25, 2024 1

For the fixed concurrency this looks pretty good, but I'm afraid that adapting this to dynamic concurrency will require a big rewrite, as asyncio.Semaphore doesn't support changing the limit while it's running. Maybe we could implement our own version which would support that, it doesn't look like the current implementation is too complicated.
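
For illustration, a minimal sketch of what such a resizable semaphore could look like (the class name and approach are hypothetical, not an existing implementation):

import asyncio


class AdjustableSemaphore:
    """A semaphore-like primitive whose limit can be changed while it is in use."""

    def __init__(self, value: int) -> None:
        self._limit = value
        self._active = 0
        self._condition = asyncio.Condition()

    async def set_limit(self, value: int) -> None:
        # Change the limit at runtime and wake up waiters in case it was raised.
        async with self._condition:
            self._limit = value
            self._condition.notify_all()

    async def acquire(self) -> None:
        async with self._condition:
            await self._condition.wait_for(lambda: self._active < self._limit)
            self._active += 1

    async def release(self) -> None:
        async with self._condition:
            self._active -= 1
            self._condition.notify_all()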

Also I think in the end we would like the Python AutoscaledPool to have a similar interface to the JS AutoscaledPool, and we'll have to adapt the concurrency approach to that interface too. For that interface, the approach from the post from Jirka would work better.

So perhaps in PoC v3 you could create some simple AutoscaledPool class with a similar interface to the JS one, without the actual scaling based on CPU and memory, and then you could choose the best concurrency approach that works with that.
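
For illustration, a rough sketch of such a skeleton, with option names mirroring the JS AutoscaledPool (runTaskFunction, isTaskReadyFunction, isFinishedFunction); this is only an assumed shape, not the actual crawlee-python API, and the concurrency mechanism is left as a simple placeholder:

import asyncio
from typing import Awaitable, Callable


class AutoscaledPool:
    """Skeleton only: keeps up to desired_concurrency tasks running, no CPU/memory autoscaling."""

    def __init__(
        self,
        run_task_function: Callable[[], Awaitable[None]],
        is_task_ready_function: Callable[[], Awaitable[bool]],
        is_finished_function: Callable[[], Awaitable[bool]],
        desired_concurrency: int = 10,
    ) -> None:
        self.run_task_function = run_task_function
        self.is_task_ready_function = is_task_ready_function
        self.is_finished_function = is_finished_function
        self.desired_concurrency = desired_concurrency
        self._running_tasks: set[asyncio.Task] = set()
        self._aborted = False

    async def run(self) -> None:
        # Keep spawning tasks until is_finished_function reports there is nothing left to do.
        while not self._aborted:
            self._running_tasks = {t for t in self._running_tasks if not t.done()}

            if len(self._running_tasks) < self.desired_concurrency and await self.is_task_ready_function():
                self._running_tasks.add(asyncio.create_task(self.run_task_function()))
                continue

            if not self._running_tasks and await self.is_finished_function():
                break

            await asyncio.sleep(0.1)  # placeholder for a smarter concurrency mechanism

        # Let any remaining tasks finish before returning.
        if self._running_tasks:
            await asyncio.gather(*self._running_tasks)

    def abort(self) -> None:
        self._aborted = True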

vdusek avatar vdusek commented on August 25, 2024

PoC v1 - asyncio.gather

  • This solution uses asyncio.gather to execute a fixed number of (request processing) tasks together.
  • This has a massive drawback, since asyncio.gather waits for the whole batch, so each batch lasts as long as its slowest request (see the small illustration after the code below).
  • In the next PoCs I'd like to try adding tasks to the event loop one by one and using some mechanism from the asyncio library to monitor the number of running tasks.
import asyncio
from typing import Callable

from apify import Actor


class BeautifulSoupCrawler:

    def __init__(self, handle_request: Callable, max_depth: int, desired_concurrency: int) -> None:
        self.handle_request = handle_request
        self.max_depth = max_depth
        self.desired_concurrency = desired_concurrency

    async def run(self, start_urls: list) -> None:
        request_queue = await Actor.open_request_queue()

        # Enqueue the starting URLs in the default request queue
        for start_url in start_urls:
            url = start_url.get('url')
            Actor.log.info(f'Enqueuing {url} ...')
            await request_queue.add_request({'url': url, 'userData': {'depth': 0}})

        # Process all requests from the default request queue
        while not await request_queue.is_empty():
            tasks: set[asyncio.Task] = set()

            while len(tasks) < self.desired_concurrency:
                request = await request_queue.fetch_next_request()
                if not request:
                    break
                cor = self.handle_request(request, request_queue, self.max_depth)
                task = asyncio.create_task(cor)
                tasks.add(task)

            Actor.log.info(f'Gonna process {len(tasks)} requests')
            await asyncio.gather(*tasks)
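
A small standalone illustration of the drawback mentioned above: the whole asyncio.gather batch takes as long as its slowest coroutine, so the fast tasks sit idle (the timings below are arbitrary):

import asyncio
import time


async def fetch(duration: float) -> None:
    await asyncio.sleep(duration)  # simulates a request taking `duration` seconds


async def main() -> None:
    start = time.perf_counter()
    # Two fast "requests" and one slow one: the batch finishes only when the slow one does.
    await asyncio.gather(fetch(0.1), fetch(0.1), fetch(1.0))
    print(f'Batch finished in {time.perf_counter() - start:.2f}s')  # ~1.0s: the fast tasks finished long ago, but no new work could start


asyncio.run(main())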

vdusek avatar vdusek commented on August 25, 2024

PoC v2 - asyncio.Semaphore

  • This crawler uses asyncio.create_task to create tasks for processing requests, and uses an asyncio.Semaphore to keep the desired concurrency.
  • Drawback: As Franta pointed out, asyncio.Semaphore doesn't support changing the limit while it's running.
import asyncio
from typing import Callable

from apify import Actor


class BeautifulSoupCrawler:

    ASYNCIO_SLEEP_DURATION = 0.1

    def __init__(self, handle_request: Callable, max_depth: int, desired_concurrency: int) -> None:
        self.handle_request = handle_request
        self.max_depth = max_depth
        self.desired_concurrency = desired_concurrency

    async def run(self, start_urls: list) -> None:
        request_queue = await Actor.open_request_queue()

        # Enqueue the starting URLs in the request queue
        for start_url in start_urls:
            url = start_url.get('url')
            Actor.log.info(f'Enqueuing {url} ...')
            await request_queue.add_request({'url': url, 'userData': {'depth': 0}})

        # Create a semaphore to control concurrency
        semaphore = asyncio.Semaphore(self.desired_concurrency)

        # Create a set to keep track of running tasks
        running_tasks: set[asyncio.Task] = set()

        # Process all the requests
        while True:
            # Ensure that the desired concurrency is maintained
            while len(running_tasks) >= self.desired_concurrency:
                await asyncio.sleep(self.ASYNCIO_SLEEP_DURATION)
                running_tasks -= {task for task in running_tasks if task.done()}

            # Start processing the next request from the request queue
            if request := await request_queue.fetch_next_request():
                assert isinstance(request, dict)
                await semaphore.acquire()

                # Create a task to process the request
                cor = self.handle_request(request, request_queue, self.max_depth)
                task = asyncio.create_task(cor)
                task.add_done_callback(lambda _: semaphore.release())
                running_tasks.add(task)

            # Else request queue is empty...
            else:
                # In this case we have to wait for all running tasks to complete since they can enqueue more requests
                Actor.log.info('Waiting for running tasks to complete...')

                while len(running_tasks) != 0:
                    await asyncio.sleep(self.ASYNCIO_SLEEP_DURATION)
                    running_tasks -= {task for task in running_tasks if task.done()}

                # Request queue is empty and there are no running tasks, we are done
                if await request_queue.is_empty():
                    Actor.log.info('Breaking...')
                    break

        # Make sure that all running tasks are completed and the request queue is empty
        assert len(running_tasks) == 0
        assert (await request_queue.is_empty()) is True

vdusek avatar vdusek commented on August 25, 2024

@jirimoravcik, @fnesveda Could you guys please take a look at this? I'd be glad for any feedback, concerns, or opinions. Thanks.

vdusek avatar vdusek commented on August 25, 2024

Thanks. Yeah... I definitely want to stick to the interface of the JS AutoscaledPool; this was just a first pitch at processing the requests concurrently somehow. The post is great, I'll write the PoC v3 utilizing asyncio.wait tomorrow 💪.

vdusek avatar vdusek commented on August 25, 2024

PoC v3 - asyncio.wait

  • This crawler uses asyncio.create_task to create tasks for processing requests, and uses asyncio.wait to keep the desired concurrency (a small illustration of this scheduling pattern follows the code below).
import asyncio
from typing import Callable

from apify import Actor


class BeautifulSoupCrawler:

    def __init__(self, handle_request: Callable, max_depth: int, desired_concurrency: int) -> None:
        self.handle_request = handle_request
        self.max_depth = max_depth
        self.desired_concurrency = desired_concurrency

    async def run(self, start_urls: list) -> None:
        request_queue = await Actor.open_request_queue()

        # Enqueue the starting URLs in the request queue
        for start_url in start_urls:
            url = start_url.get('url')
            Actor.log.info(f'Enqueuing {url} ...')
            await request_queue.add_request({'url': url, 'userData': {'depth': 0}})

        # Create a list to keep track of running tasks
        running_tasks: set[asyncio.Task] = set()

        # Process all the requests
        while True:
            # Check if there is room for more tasks
            while len(running_tasks) >= self.desired_concurrency:
                done, _ = await asyncio.wait(running_tasks, return_when=asyncio.FIRST_COMPLETED)
                running_tasks -= done  # Remove completed tasks from the set

            # Start processing the next request from the request queue
            if request := await request_queue.fetch_next_request():
                assert isinstance(request, dict)

                # Create a task to process the request
                cor = self.handle_request(request, request_queue, self.max_depth)
                task = asyncio.create_task(cor)
                running_tasks.add(task)  # Add the task to the set

            # Else request queue is empty...
            else:
                # In this case we have to wait for all running tasks to complete since they can enqueue more requests
                Actor.log.info('Waiting for running tasks to complete...')
                if running_tasks:  # asyncio.wait raises ValueError when given an empty set
                    done, _ = await asyncio.wait(running_tasks, return_when=asyncio.ALL_COMPLETED)
                    running_tasks -= done
                assert len(running_tasks) == 0

                # Request queue is empty and there are no running tasks, we are done
                if await request_queue.is_empty():
                    Actor.log.info('Breaking...')
                    break

        # Make sure that all running tasks are completed and the request queue is empty
        assert len(running_tasks) == 0
        assert (await request_queue.is_empty()) is True
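
A small standalone illustration of the asyncio.wait(..., return_when=asyncio.FIRST_COMPLETED) pattern used above: a new task can be scheduled as soon as any running task finishes, instead of waiting for the whole batch (the timings below are arbitrary):

import asyncio
import time


async def fetch(duration: float) -> None:
    await asyncio.sleep(duration)  # simulates a request taking `duration` seconds


async def main() -> None:
    start = time.perf_counter()
    running = {asyncio.create_task(fetch(d)) for d in (0.1, 0.5, 1.0)}
    while running:
        done, running = await asyncio.wait(running, return_when=asyncio.FIRST_COMPLETED)
        elapsed = time.perf_counter() - start
        # Each completion is observed immediately, so a replacement task could be started here.
        print(f'{len(done)} task(s) done after {elapsed:.2f}s, {len(running)} still running')


asyncio.run(main())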

vdusek avatar vdusek commented on August 25, 2024

PoC v0 - the current state

  • Just processing the requests one at a time as we do in the Actor templates.
from typing import Callable

from apify import Actor


class BeautifulSoupCrawler:
    
    def __init__(self, handle_request: Callable, max_depth: int, desired_concurrency: int) -> None:
        self.handle_request = handle_request
        self.max_depth = max_depth
        self.desired_concurrency = desired_concurrency

    async def run(self, start_urls: list) -> None:
        request_queue = await Actor.open_request_queue()

        # Enqueue the starting URLs in the request queue
        for start_url in start_urls:
            url = start_url.get('url')
            Actor.log.info(f'Enqueuing {url} ...')
            await request_queue.add_request({'url': url, 'userData': {'depth': 0}})

        # Process all requests from the request queue one by one
        while request := await request_queue.fetch_next_request():
            await self.handle_request(request, request_queue, self.max_depth)

vdusek avatar vdusek commented on August 25, 2024

@jirimoravcik, @fnesveda A PoC v3 using asyncio.wait is ready.

I also did some testing with the following configuration:

  • Desired concurrency: 10
  • Total requests made: 90
  • Input:
{
  "start_urls": [
    { "url": "https://apify.com" },
    { "url": "https://crawlee.dev" }
  ],
  "max_depth": 1
}

[Run-time screenshots were attached here for PoC v0, PoC v1 - asyncio.gather, PoC v2 - asyncio.Semaphore, and PoC v3 - asyncio.wait.]

Of course, the only weird thing is the difference between the local and platform execution times of the Semaphore & wait solutions. I tried it many times, every time resulting in similar times. Do you guys have any idea what causes it?

janbuchar avatar janbuchar commented on August 25, 2024

> Maybe run each variant 100 times to get some statistical significance?

I'd even consider running the benchmark against some local mock HTTP service. My personal guess is that any difference between v2 and v3 is caused by the unpredictability of network delays.
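
A minimal sketch of such a local mock HTTP service (using aiohttp as an example; the port, route and artificial delay are arbitrary). The crawlers could then be pointed at http://localhost:8080 so all variants see the same, stable response times:

import asyncio

from aiohttp import web


async def handler(request: web.Request) -> web.Response:
    await asyncio.sleep(0.05)  # simulate a small, stable network delay
    html = '<html><body><a href="/a">a</a> <a href="/b">b</a></body></html>'
    return web.Response(text=html, content_type='text/html')


app = web.Application()
app.add_routes([web.get('/{tail:.*}', handler)])

if __name__ == '__main__':
    web.run_app(app, port=8080)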

vdusek avatar vdusek commented on August 25, 2024

Measure the performance of the PoCs

  • All 4 variants were built as Actor versions on the platform.
  • Every variant was executed 75x on the platform with the configuration below.
  • In every Actor run, 90 requests were made.

Configuration

  • desired_concurrency = 10
  • start_urls = [{ "url": "https://apify.com" }, { "url": "https://crawlee.dev" }]
  • max_depth = 1

Results

  • Average time of running crawler.run(), in seconds.
{
  "crawler_basic": 41.2554286702474,
  "crawler_gather": 22.39655220667521,
  "crawler_semaphore": 15.280439224243164,
  "crawler_wait": 13.826195780436198
}

Summary

  • As expected, the semaphore and wait variants outperformed the gather variant, and of course the basic one (the wait variant is roughly 3x faster than the basic one: 41.26 / 13.83 ≈ 2.98).
  • Since the wait variant also supports changing the concurrency dynamically, it seems like a clear winner for the next steps.
  • I'm closing this issue and moving on to the next steps, as outlined in the follow-up issue #2.
