sqs-workers's People

Contributors

goncalossilva, imankulov, lukemerrett, proxi, schnouki, sodre, tartansandal

sqs-workers's Issues

Wildcard dependency on werkzeug breaks sqs-workers

Werkzeug 2.1.0, released on 2022-03-28, removed bind_arguments and validate_arguments (changelog). This breaks sqs-workers on a fresh install, because the library declares a wildcard requirement on the werkzeug version.

Traceback (most recent call last):
  File "/Users/rsiemens/sqs-worker-spike/worker.py", line 1, in <module>
    from jobs import queue
  File "/Users/rsiemens/sqs-worker-spike/jobs.py", line 1, in <module>
    from sqs_workers import SQSEnv
  File "/Users/rsiemens/sqs-worker-spike/.venv/lib/python3.9/site-packages/sqs_workers/__init__.py", line 9, in <module>
    from sqs_workers.queue import JobQueue, RawQueue
  File "/Users/rsiemens/sqs-worker-spike/.venv/lib/python3.9/site-packages/sqs_workers/queue.py", line 11, in <module>
    from sqs_workers.async_task import AsyncTask
  File "/Users/rsiemens/sqs-worker-spike/.venv/lib/python3.9/site-packages/sqs_workers/async_task.py", line 5, in <module>
    from sqs_workers.utils import adv_bind_arguments
  File "/Users/rsiemens/sqs-worker-spike/.venv/lib/python3.9/site-packages/sqs_workers/utils.py", line 5, in <module>
    from werkzeug.utils import bind_arguments, validate_arguments
ImportError: cannot import name 'bind_arguments' from 'werkzeug.utils' (/Users/rsiemens/sqs-worker-spike/.venv/lib/python3.9/site-packages/werkzeug/utils.py)
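Until the dependency is pinned (e.g. `werkzeug<2.1.0` in your own requirements), the removed helper can be approximated with the stdlib `inspect` module. This is a sketch under my own assumptions, not the library's actual fix; `bind_arguments` here only mimics the removed Werkzeug helper and raises a plain `TypeError` instead of Werkzeug's own validation errors:

```python
import inspect

def bind_arguments(func, args, kwargs):
    """Approximate Werkzeug's removed bind_arguments with the stdlib.

    Maps positional and keyword arguments onto func's signature and
    returns them as a dict, raising TypeError on a mismatch.
    """
    bound = inspect.signature(func).bind(*args, **kwargs)
    bound.apply_defaults()
    return dict(bound.arguments)

# Hypothetical task function for illustration:
def send_email(to, subject, body="hi"):
    return to, subject, body

print(bind_arguments(send_email, ("a@example.com",), {"subject": "s"}))
# → {'to': 'a@example.com', 'subject': 's', 'body': 'hi'}
```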

Support for arbitrary SQS messages?

Hi. I came across this project because I am looking for a solution that allows running an execution pool (workers) that can process tasks from any AWS SQS queue with little effort. I mean queues that were not created by the application but already exist in AWS, e.g. various kinds of notifications from AWS services, such as the creation of a file in S3.

I saw this:

Unless you are the part of the Doist development team, you most likely don't need it. It's something opinionated, built out of our own internal needs and probably provides little value for outside developers.

But I still hoped that this package allows for "just processing SQS messages".

I tried this script:

import sdb

from sqs_workers import SQSEnv, create_standard_queue

sqs = SQSEnv()
queue = sqs.queue("my_queue")

@queue.processor("process_blabla")
def process_blabla(*args):
    sdb.set_trace()

queue.process_queue()

But I only see:

UserWarning: Error while processing my_queue.None

from:

def process_message_fallback(self, message):
    warnings.warn(
        "Error while processing {}.{}".format(self.name, get_job_name(message))
    )
    return False

def get_job_name(message):
    attrs = message.message_attributes or {}
    return (attrs.get("JobName") or {}).get("StringValue")

Question: is this project meant to support arbitrary SQS messages, or only those in the special format your projects use, e.g. the mentioned "JobName" attribute?
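Judging by get_job_name above, the library routes messages by a "JobName" string attribute, so a message produced outside sqs-workers would need that attribute to reach a processor. A sketch of the expected attribute shape (the boto3 call and queue URL are my assumptions about how one might produce such a message, not documented library behavior):

```python
# Message attributes sqs-workers appears to expect, per get_job_name:
attributes = {
    "JobName": {"DataType": "String", "StringValue": "process_blabla"},
}

# With boto3 this could be sent as (hypothetical queue URL, not executed):
# sqs_client.send_message(
#     QueueUrl="https://sqs.us-east-1.amazonaws.com/123456789012/my_queue",
#     MessageBody="{}",
#     MessageAttributes=attributes,
# )

class Message:
    """Minimal stand-in for a received boto3 SQS Message."""
    message_attributes = attributes

def get_job_name(message):
    # Same extraction logic as the library excerpt above.
    attrs = message.message_attributes or {}
    return (attrs.get("JobName") or {}).get("StringValue")

print(get_job_name(Message()))  # → process_blabla
```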

Take the queue url directly instead of its name

Taking the queue name instead of its URL is an unfortunate choice: it requires an extra SQS API call to resolve the URL from the name, and that call is covered by a dedicated permission (sqs:GetQueueUrl).

It would be great to allow code to take a preexisting queue URL directly instead of a name, and to send/receive/delete messages on such a queue relying only on these limited IAM permissions:

  • sqs:ReceiveMessage
  • sqs:DeleteMessage
  • sqs:DeleteMessageBatch
  • sqs:ChangeMessageVisibility
  • sqs:ChangeMessageVisibilityBatch
  • sqs:SendMessage
  • sqs:SendMessageBatch
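The URL can in fact be derived without sqs:GetQueueUrl, since standard queue URLs follow a fixed pattern; with boto3 the URL can then be used directly via `boto3.resource("sqs").Queue(url)`. A sketch (the region, account, and queue name are placeholders):

```python
def queue_url(region: str, account_id: str, name: str) -> str:
    """Build a standard SQS queue URL without calling GetQueueUrl.

    Assumes the public AWS partition; other partitions (GovCloud,
    China) use different endpoints.
    """
    return f"https://sqs.{region}.amazonaws.com/{account_id}/{name}"

url = queue_url("us-east-1", "123456789012", "emails")
print(url)  # → https://sqs.us-east-1.amazonaws.com/123456789012/emails

# With boto3 (not executed here), the URL can be used directly:
# queue = boto3.resource("sqs").Queue(url)
# queue.send_message(MessageBody="hello")
```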

pickle doesn't work on mac m1

I tried to use this library but ran into a problem with pickling inside the library.

I hope this traceback helps you fix the bug.

Traceback (most recent call last):
  File "/Users/svyatoslavkalina/Desktop/Projects/new/app.py", line 24, in <module>
    sqs.process_queues()
  File "/Users/svyatoslavkalina/Desktop/Projects/new/venv/lib/python3.10/site-packages/sqs_workers/sqs_env.py", line 132, in process_queues
    p.start()
  File "/Library/Frameworks/Python.framework/Versions/3.10/lib/python3.10/multiprocessing/process.py", line 121, in start
    self._popen = self._Popen(self)
  File "/Library/Frameworks/Python.framework/Versions/3.10/lib/python3.10/multiprocessing/context.py", line 224, in _Popen
    return _default_context.get_context().Process._Popen(process_obj)
  File "/Library/Frameworks/Python.framework/Versions/3.10/lib/python3.10/multiprocessing/context.py", line 284, in _Popen
    return Popen(process_obj)
  File "/Library/Frameworks/Python.framework/Versions/3.10/lib/python3.10/multiprocessing/popen_spawn_posix.py", line 32, in __init__
    super().__init__(process_obj)
  File "/Library/Frameworks/Python.framework/Versions/3.10/lib/python3.10/multiprocessing/popen_fork.py", line 19, in __init__
    self._launch(process_obj)
  File "/Library/Frameworks/Python.framework/Versions/3.10/lib/python3.10/multiprocessing/popen_spawn_posix.py", line 47, in _launch
    reduction.dump(process_obj, fp)
  File "/Library/Frameworks/Python.framework/Versions/3.10/lib/python3.10/multiprocessing/reduction.py", line 60, in dump
    ForkingPickler(file, protocol).dump(obj)
TypeError: cannot pickle 'module' object

Process finished with exit code 1

Example of code:

from sqs_workers import SQSEnv, create_standard_queue

sqs = SQSEnv()

create_standard_queue(sqs, "emails")
queue = sqs.queue("emails")


@queue.processor("send_email")
def send_email(to, subject, body):
    print('asd')


queue.add_job("send_email", to="[email protected]", subject="Hello world", body="hello world")

sqs.process_queues()
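A plausible root cause (my assumption, not confirmed by the maintainers): since Python 3.8, macOS defaults multiprocessing to the "spawn" start method, which pickles the Process target and its arguments, and module objects cannot be pickled. A minimal demo of the underlying failure:

```python
import pickle
import sys

# Module objects are not picklable, so any Process argument that
# references one fails under the "spawn" start method (the macOS
# default since Python 3.8), while "fork" never pickles at all.
try:
    pickle.dumps(sys)  # sys is a module object
    err = None
except TypeError as exc:
    err = str(exc)

print(err)  # → cannot pickle 'module' object

# Possible workaround on macOS (behaves like Linux, with caveats
# around fork-safety of threads and native libraries):
# import multiprocessing
# multiprocessing.set_start_method("fork")
```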

Unexpected behaviour of .delay

Before a job is added to the queue, adv_bind_arguments is executed here.

If the processor defines **kwargs, all data is lost and an "Argument missing" error is raised. Here is an example of adv_bind_arguments behaviour with a **kwargs function:

from sqs_workers.utils import adv_bind_arguments

def wrapper_func(*args, **kwargs):
    def func(a):
        print(a)
    func(*args, **kwargs)
    print(args, kwargs)

adv_bind_arguments(wrapper_func, [], dict(a=1))
# print result: {'args': (), 'kwargs': {}}

As you can see, a=1 is lost, and an "Argument missing" error will be raised when the processor tries to execute this job.
This is especially critical when you want to decorate your SQS processor, because a decorator typically has *args, **kwargs in its signature.
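For contrast, the stdlib's `inspect.Signature.bind` does keep values routed into a `**kwargs` catch-all, which is the behavior one would expect here. A sketch of that expected behavior (this is stdlib binding, not the library's adv_bind_arguments):

```python
import inspect

def wrapper_func(*args, **kwargs):
    pass

# a=1 has no named parameter to match, so it lands in **kwargs
# instead of being dropped.
bound = inspect.signature(wrapper_func).bind(a=1)
print(bound.arguments)  # → {'kwargs': {'a': 1}}
```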

How well can sqs-workers scale?

I have an SQS queue which might receive millions of tasks a day. Can this implementation of sqs-workers scale to larger workloads without failure?

Rather generic name for a package meant for one team

Hello,

If this package is meant to be used only by Doist, it would be neater to give it a corresponding namespace rather than the generic name sqs-workers. This would allow other developers to publish generic SQS worker libraries under that name. Otherwise this is a bit like name squatting.

Thanks in advance!
