
faktory_worker_python's People

Contributors

ajjahn, alexisprince1994, antoine-gallix, cdrx, gowabash, rlankfo, schniz, shatgupt, tswayne, valo


faktory_worker_python's Issues

Cannot install version 0.5.0 via pip

Today an update of Faktory was released (0.5.0) which included the backtrace parameter, but also the changes from the following commit: 4aaaa37#diff-60f61ab7a8d1910d86d9fda2261620314edcae5894d5aaa236b821c7256badd7

The setup.py now refers to files that do not seem to be packaged. Below is the output when I run pip install faktory==0.5.0 on macOS:

Collecting faktory==0.5.0
  Downloading faktory-0.5.0.tar.gz (10 kB)
    ERROR: Command errored out with exit status 1:
     command: /usr/local/anaconda3/envs/blendle-streaming/bin/python -c 'import sys, setuptools, tokenize; sys.argv[0] = '"'"'/private/var/folders/b7/tph8q_z12_lb13s4wd7rk1ch0000gn/T/pip-install-qkkjas4s/faktory_710524b00c484ed9950199e63eee2b3d/setup.py'"'"'; __file__='"'"'/private/var/folders/b7/tph8q_z12_lb13s4wd7rk1ch0000gn/T/pip-install-qkkjas4s/faktory_710524b00c484ed9950199e63eee2b3d/setup.py'"'"';f=getattr(tokenize, '"'"'open'"'"', open)(__file__);code=f.read().replace('"'"'\r\n'"'"', '"'"'\n'"'"');f.close();exec(compile(code, __file__, '"'"'exec'"'"'))' egg_info --egg-base /private/var/folders/b7/tph8q_z12_lb13s4wd7rk1ch0000gn/T/pip-pip-egg-info-3q6va8k9
         cwd: /private/var/folders/b7/tph8q_z12_lb13s4wd7rk1ch0000gn/T/pip-install-qkkjas4s/faktory_710524b00c484ed9950199e63eee2b3d/
    Complete output (5 lines):
    Traceback (most recent call last):
      File "<string>", line 1, in <module>
      File "/private/var/folders/b7/tph8q_z12_lb13s4wd7rk1ch0000gn/T/pip-install-qkkjas4s/faktory_710524b00c484ed9950199e63eee2b3d/setup.py", line 4, in <module>
        dev_requires = open("dev-requirements.txt").read().strip().split("\n")
    FileNotFoundError: [Errno 2] No such file or directory: 'dev-requirements.txt'
    ----------------------------------------
WARNING: Discarding https://files.pythonhosted.org/packages/de/77/4a47ad1a84d33faaf179f0eb7f2e2f9d2d94118f9517d393aaf7d710d05d/faktory-0.5.0.tar.gz#sha256=101630a5788f5d11c17a1d5f0e1e04a8cc9164a886f54846fe29317d91d51327 (from https://pypi.org/simple/faktory/). Command errored out with exit status 1: python setup.py egg_info Check the logs for full command output.
ERROR: Could not find a version that satisfies the requirement faktory==0.5.0
ERROR: No matching distribution found for faktory==0.5.0

Worker thread gets OSError: [WinError 6] The handle is invalid when stopping worker from Web UI

  • Issue description
    I run the Faktory server from its Docker image, followed the Getting Started Python instructions to create client and worker test scripts, and ran the scripts individually. At the beginning everything works fine; however, when I quiet and then stop the worker from the Web UI I get the error below. Even after waiting a minute the worker thread is still hanging.
    WARNING:faktory.worker:Faktory has quieted this worker, will not run any more tasks
    WARNING:faktory.worker:Faktory has asked this worker to shutdown, will cancel any pending tasks still running 25s time
    INFO:faktory.connection:Disconnected
    Error in atexit._run_exitfuncs:
    Traceback (most recent call last):
    File "C:\Users\1907003\AppData\Local\Programs\Python\Python37\Lib\concurrent\futures\process.py", line 101, in _python_exit
        thread_wakeup.wakeup()
    File "C:\Users\1907003\AppData\Local\Programs\Python\Python37\Lib\concurrent\futures\process.py", line 89, in wakeup
        self._writer.send_bytes(b"")
    File "C:\Users\1907003\AppData\Local\Programs\Python\Python37\Lib\multiprocessing\connection.py", line 183, in send_bytes
        self._check_closed()
    File "C:\Users\1907003\AppData\Local\Programs\Python\Python37\Lib\multiprocessing\connection.py", line 136, in _check_closed
        raise OSError("handle is closed")
    OSError: handle is closed
    Exception in thread QueueManagerThread:
    Traceback (most recent call last):
    File "C:\Users\1907003\AppData\Local\Programs\Python\Python37\Lib\threading.py", line 917, in _bootstrap_inner
        self.run()
    File "C:\Users\1907003\AppData\Local\Programs\Python\Python37\Lib\threading.py", line 865, in run
        self._target(*self._args, **self._kwargs)
    File "C:\Users\1907003\AppData\Local\Programs\Python\Python37\Lib\concurrent\futures\process.py", line 354, in _queue_management_worker
        ready = wait(readers + worker_sentinels)
    File "C:\Users\1907003\AppData\Local\Programs\Python\Python37\Lib\multiprocessing\connection.py", line 872, in wait
        ov.cancel()
    OSError: [WinError 6] The handle is invalid
    
  • environment
    os : Win10
    python : python 3.7.3 virtualenv
    faktory package : 0.4.0
  • steps to reproduce the error
  1. start the Faktory server from its Docker image
  2. run the worker and client scripts as follows
    worker

    import logging
    import time

    from faktory import Worker

    logging.basicConfig(level=logging.INFO)
    time.sleep(1)

    def adder(x, y):
        logging.info("%d + %d = %d", x, y, x + y)

    if __name__ == "__main__":
        w = Worker(queues=['default'], concurrency=1)
        w.register('adder', adder)
        w.run()
    
    client

    import random
    import time

    import faktory

    time.sleep(1)

    with faktory.connection() as client:
        while True:
            client.queue('adder', args=(random.randint(0, 1000), random.randint(0, 1000)))
            time.sleep(5)
    
  3. quiet and then stop the worker from Web UI
  4. get the error from worker thread

i/o timeout Closing connection

Hi guys,

We are trying Faktory. Our client is a Python API written with Falcon, and the workers are also in Python. The problem is that when load testing the API we get this error in Faktory: "i/o timeout Closing connection".
The code snippets below are what we use to push to Faktory.

client code

import faktory

faktory_client = faktory.Client('tcp://ip:7419')
faktory_client.queue('x', args=('1', '2', '3'))
faktory_client.disconnect()

worker code

from faktory import Worker

w = Worker('tcp://ip:7419', queues=['default'], concurrency=100)
w.register('recommendation', recommendation_updater)

We are using a machine with 8 CPUs; utilization reaches 1% of the CPUs, and we are using TCP for the connection to Faktory.
We are trying to push 10K concurrent requests.

Worker doesn't reconnect to Faktory if the connection is reset

I have a single threaded worker defined as:

w = Worker(queues=['etherbi_decode'],
           concurrency=1,
           executor=ThreadPoolExecutor)

which works for 2-3 hours and at some point falls into an infinite loop, not getting any new jobs and not sending heartbeats to the Faktory server.

Logs from the worker before it falls into infinite loop:

INFO:faktory.connection:Connecting to faktory-faktory:7419 (with password None)
ERROR:faktory.worker:Task failed: 50c676346b5a4307826de2245caf5593
Traceback (most recent call last):
  File "/usr/local/lib/python3.6/concurrent/futures/thread.py", line 56, in run
    result = self.fn(*self.args, **self.kwargs)
  File "etherbi/decoder/decoder_worker.py", line 225, in decode_bucket
    _enqueue_block_decoded_task(task_number, 'calculate_burn_rate', 'etherbi_burn_rate_calculation')
  File "etherbi/decoder/decoder_worker.py", line 187, in _enqueue_block_decoded_task
    if faktory_client.queue(task, queue=queue, args=[task_number]):
  File "/app/src/faktory/faktory/client.py", line 32, in queue
    self.connect()
  File "/app/src/faktory/faktory/client.py", line 21, in connect
    self.is_connected = self.faktory.connect()
  File "/app/src/faktory/faktory/_proto.py", line 64, in connect
    self.socket.connect((self.host, self.port))
socket.timeout: timed out
INFO:__main__:Transaction to 0xaf30d2a7e90d7dc361c8c4585e9bb7d2f6f15bc7 is recognized in block 3917149 on position 98.
INFO:__main__:Transaction to 0xaf30d2a7e90d7dc361c8c4585e9bb7d2f6f15bc7 is recognized in block 3917154 on position 2.
<SOME_WORK_RELATED_LOGS_AS_THE_ABOVE>
INFO:faktory.connection:Connecting to faktory-faktory:7419 (with password None)
INFO:faktory.connection:Disconnected

Running sudo strace -p <PID> -e trace=network -f -s 10000 against the process, I get an infinite stream of

[pid 16254] recvfrom(3, "", 4096, 0, NULL, NULL) = 0
[pid 16254] recvfrom(3, "", 4096, 0, NULL, NULL) = 0
[pid 16254] recvfrom(3, "", 4096, 0, NULL, NULL) = 0
[pid 16254] recvfrom(3, "", 4096, 0, NULL, NULL) = 0
[pid 16254] recvfrom(3, "", 4096, 0, NULL, NULL) = 0
[pid 16254] recvfrom(3, "", 4096, 0, NULL, NULL) = 0
[pid 16254] recvfrom(3, "", 4096, 0, NULL, NULL) = 0
[pid 16254] recvfrom(3, "", 4096, 0, NULL, NULL) = 0

We use python and elixir clients against the same faktory server and so far this behavior is observed only with the python workers.

Worker version: c5cb89b
Server version: 0.7.0

Explanation of the thread execution model

The worker has the option to choose threads over processes for its concurrent execution model. But how exactly does it work? For n being the number of concurrent jobs, does it mean that n threads are started when the worker starts, and that those same n threads will process multiple jobs without restarting (model 1)? Or does it mean that each new job starts its own thread that is closed at the end, up to a maximum of n threads running simultaneously (model 2)?

I am using thread-local global variables that I expected to be created anew at each job execution (model 2), but from my debugging I am starting to suspect that threads process multiple tasks in a row without restarting (model 1).

Can you confirm or refute my conclusion?
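For the standard library pools, this is straightforward to check. Assuming the worker hands jobs to a regular concurrent.futures.ThreadPoolExecutor (the tracebacks elsewhere in this tracker go through concurrent.futures), model 1 is what happens: the pool creates up to n threads and reuses them across jobs, so thread-local state survives from one job to the next. A minimal demonstration, independent of Faktory:

```python
import threading
from concurrent.futures import ThreadPoolExecutor

# Run 8 jobs on a 2-thread pool and record which thread ran each job.
# If a fresh thread were created per job (model 2) we could see up to
# 8 distinct names; with reuse (model 1) we see at most 2.
with ThreadPoolExecutor(max_workers=2) as pool:
    names = list(pool.map(lambda _: threading.current_thread().name, range(8)))

print(len(names), len(set(names)))  # 8 jobs ran on at most 2 threads
```

So thread-locals are per worker thread, not per job; if you need per-job state, reset it explicitly at the start of each task.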

Batch Support

I reached out to Mike Perham about batch support in Python and he asked me to open an issue here.

Access to Job status in Python client for faktory

  • Which Faktory package and version?
  • 1.7.0
  • Which Faktory worker package and version?
  • 1.0.0
  • Please include any relevant worker configuration
  • Please include any relevant error messages or stacktraces

Is it possible to have APIs that give access to Enqueued, Busy and Processed jobs via the Python client for Faktory?
If I go ahead with the enterprise version of Faktory, how will the Python client support access to job status?

As per this link [https://github.com/contribsys/faktory/wiki/Ent-Tracking], these APIs are available only in Ruby and Go.

AttributeError: Can't pickle local object 'ETL_task.<locals>.task'

I see the following error in the logs of a Faktory worker. The worker works correctly with simple test tasks like a plain print('hello') or print(number * f"hello {name}"), showing that the worker can run jobs and take string and integer arguments. I then queue a job that works fine when run outside of a worker, and I receive the following error message:

Task failed: f194d1e1811843739f297a1efe81cc74
concurrent.futures.process._RemoteTraceback: 
"""
Traceback (most recent call last):
  File "/usr/lib/python3.10/multiprocessing/queues.py", line 244, in _feed
    obj = _ForkingPickler.dumps(obj)
  File "/usr/lib/python3.10/multiprocessing/reduction.py", line 51, in dumps
    cls(buf, protocol).dump(obj)
AttributeError: Can't pickle local object 'ETL_task.<locals>.task'
"""

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/home/antoine/.cache/pypoetry/virtualenvs/dataworker-poc-BCFfAKR5-py3.10/lib/python3.10/site-packages/faktory/worker.py", line 248, in send_status_to_faktory
    future.result(timeout=1)
  File "/usr/lib/python3.10/concurrent/futures/_base.py", line 451, in result
    return self.__get_result()
  File "/usr/lib/python3.10/concurrent/futures/_base.py", line 403, in __get_result
    raise self._exception
  File "/usr/lib/python3.10/multiprocessing/queues.py", line 244, in _feed
    obj = _ForkingPickler.dumps(obj)
  File "/usr/lib/python3.10/multiprocessing/reduction.py", line 51, in dumps
    cls(buf, protocol).dump(obj)
AttributeError: Can't pickle local object 'ETL_task.<locals>.task'

To understand my error message: in my case ETL_task is the name of the module defining the decorator, and task is the name of the wrapped function inside my decorator.

It's hard for me to even diagnose where the problem comes from, as the stack trace goes through two packages that are not my code. I also tried running the worker through python -m pdb; I still see the error message but never fall into the debugger.

How can I understand what's happening?
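The immediate cause here is generic Python behavior rather than anything Faktory-specific: a ProcessPoolExecutor ships each callable to its worker processes with pickle, and pickle can only serialize a function it can re-import by module-qualified name. A function defined inside another function, as a decorator's wrapper usually is, is a "local object" and fails exactly like this. A minimal reproduction (ETL_task_style_decorator is a hypothetical stand-in for the decorator in the report):

```python
import pickle

def ETL_task_style_decorator():  # hypothetical stand-in for the decorator
    def task():                  # local object: defined inside another function
        return "done"
    return task

def module_level_task():         # importable by qualified name: pickles fine
    return "done"

try:
    pickle.dumps(ETL_task_style_decorator())
    nested_picklable = True
except (AttributeError, pickle.PicklingError):
    nested_picklable = False

pickle.dumps(module_level_task)  # no error
print(nested_picklable)  # False
```

Registering a module-level function (or switching the worker to a ThreadPoolExecutor, which needs no pickling) sidesteps the error; functools.wraps alone does not help, because pickle still has to resolve the qualified name to the very object being pickled.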

worker job id

I run the following example:

from faktory import Worker
import logging
import time


def your_function(x, y):
    time.sleep(15)
    return x + y


w = Worker(queues=['default'], concurrency=1)
w.register('test', your_function)
print(w.get_worker_id())
logging.basicConfig(level=logging.DEBUG)
w.run()  # runs until control-c or worker shutdown from Faktory web UI

The worker id prints as 32860cb098ae4e13ab8a7f44af498bf1,
but the heartbeat shows
DEBUG:faktory.worker:Sending heartbeat for worker 9e9775cb1abc47b7b6c6bc1ec6689610

Looking in the web interface, it seems that the debug message contains the correct id.

Example worker restarting itself on 3.9

What am I seeing:

When running the sample code it seems like the worker crashes/restarts itself without the registered function being called.

What I'm expecting:

For the registered function to be run.

What I'm doing:

I have some code that I've been running without problems on 4 different Macs. This is the first time I've tried to run it on an M1. All the other Macs have Python 3.7; this one has 3.9. Other than that, everything remained the same. I tried removing as much code as possible from my working project to reproduce this, but then it occurred to me to use the sample code from the README, and it behaves the same way.

What I did is I added some print statements to that code and ran it:

from faktory import Worker

def your_function(x, y):
    print("running your_function")
    return x + y

w = Worker(queues=['default'], concurrency=1)
w.register('test', your_function)
print("after register")
w.run()  # runs until control-c or worker shutdown from Faktory web UI

After running the example server code I get on the terminal the expected output:

after register

...but if I use the client code to send a job to be executed now the console shows this:

after register
after register

It is as if the whole script is being run again and the function is never executed.

Any ideas on how to debug this are appreciated.

Thank you
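One explanation worth checking (an assumption, since the full environment isn't visible here): on macOS, Python 3.8 changed multiprocessing's default start method from fork to spawn, and spawn re-imports the main module in every child process. Module-level code, like the sample's Worker setup, then runs again in the child, which would produce the second "after register" line; the usual fix is to keep all side-effecting code under an `if __name__ == "__main__":` guard. A Faktory-free repro of the re-import behavior:

```python
import os
import subprocess
import sys
import tempfile
import textwrap

# A script whose top level prints a marker. With the "spawn" start
# method, the child process re-imports the main module, so the marker
# prints twice: once in the parent, once in the child.
script = textwrap.dedent("""
    import multiprocessing as mp

    print("top level ran")

    def child():
        pass

    if __name__ == "__main__":
        mp.set_start_method("spawn", force=True)
        p = mp.Process(target=child)
        p.start()
        p.join()
""")

with tempfile.NamedTemporaryFile("w", suffix=".py", delete=False) as f:
    f.write(script)
    path = f.name
try:
    out = subprocess.run([sys.executable, path],
                         capture_output=True, text=True, check=True).stdout
finally:
    os.unlink(path)

print(out.count("top level ran"))  # 2: parent import plus child re-import
```

If this is the cause, moving `w = Worker(...)`, `w.register(...)` and `w.run()` under the `__main__` guard in the sample should stop the apparent restart.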

Slow Job Processing due to Concurrency Architecture

I've noticed in some simple testing that this library doesn't process jobs very fast, and it seems to be due to how jobs are fetched. The thread/process that pulls jobs can only pull one once there is an executor thread that is not busy; if no executor threads are ready, we hit a 0.25s sleep before checking again (source).

This means that once a job completes, it can take up to 0.25s before we start working on another job.

In the Ruby Faktory client, the "Executor" (Processor) threads are the ones doing the fetching, so they immediately pull the next job once they're done with their current one.

Would you be open to a pull request refactoring this code to match the Ruby implementation, where the executor threads do the fetching?
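To make the proposal concrete, here is a rough sketch (not the library's code) of the Ruby-style model: each processor thread loops fetch-then-execute itself, so a finished thread immediately asks for the next job instead of waiting out a central dispatcher's 0.25s poll. A plain queue.Queue stands in for the Faktory FETCH call:

```python
import queue
import threading

jobs = queue.Queue()
results = []
lock = threading.Lock()

def processor():
    # Each processor fetches its own next job: no central dispatcher,
    # no sleep between finishing one job and fetching the next.
    while True:
        job = jobs.get()
        if job is None:          # shutdown sentinel
            break
        with lock:
            results.append(job * 2)

threads = [threading.Thread(target=processor) for _ in range(4)]
for t in threads:
    t.start()
for j in range(10):
    jobs.put(j)
for _ in threads:
    jobs.put(None)               # one sentinel per processor thread
for t in threads:
    t.join()

print(sorted(results))  # [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
```

The real refactor would replace `jobs.get()` with a FETCH over the worker's connection (one connection per thread, or a lock around a shared one), which is exactly the design question such a pull request would need to settle.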

How to use the `at` argument in queue?

Hi, there's an at argument in the queue function, and I believe it is used to schedule when the job should run, but how do I use this? What is its value type?

From Client.py:

def queue(
    self,
    task: str,
    args: typing.Iterable = None,
    queue: str = "default",
    priority: int = 5,
    jid: str = None,
    custom=None,
    reserve_for=None,
    at=None,
    retry=5,
    backtrace=0,
)
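For background: the Faktory job payload's at field is an RFC3339 timestamp telling the server when the job becomes runnable. Whether this client converts a datetime object for you would need checking in Client.py, so passing a pre-formatted string is the safe option; a sketch (the client.queue call is commented out since it needs a running server):

```python
from datetime import datetime, timedelta, timezone

# Schedule a job for 5 minutes from now as an RFC3339 string,
# e.g. '2024-01-01T12:05:00+00:00'.
run_at = datetime.now(timezone.utc) + timedelta(minutes=5)
at_value = run_at.isoformat()

# client.queue('test', args=(1, 2), at=at_value)

print("T" in at_value and at_value.endswith("+00:00"))  # True
```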

BrokenProcessPool error

Hey guys! I've been using this Faktory adapter for quite some time and have had great success with it!

My application consists of a Faktory worker inside a Kubernetes pod, which runs an image processing algorithm. The system works really well, but when I checked the logs recently I encountered this error:

Traceback (most recent call last):
  File "/app/venv/lib/python3.8/site-packages/faktory/worker.py", line 248, in send_status_to_faktory
    future.result(timeout=1)
  File "/usr/local/lib/python3.8/concurrent/futures/_base.py", line 437, in result
    return self.__get_result()
  File "/usr/local/lib/python3.8/concurrent/futures/_base.py", line 389, in __get_result
    raise self._exception
BrokenProcessPool: A process in the process pool was terminated abruptly while the future was running or pending.

I saw that this error was apparently resolved in #42, but my pods are running faktory 1.0.0, so the fix should be available. This does not kill the pod or make it restart, but I would like to get to the bottom of it. Any tips?

Thanks in advance!

Exception when pushing a job

when args is a tuple:

concurrent.futures.process._RemoteTraceback: 
"""
Traceback (most recent call last):
  File "/usr/lib/python3.5/concurrent/futures/process.py", line 175, in _process_worker
    r = call_item.fn(*call_item.args, **call_item.kwargs)
  File "/src/discovery/tasks/heartbeat.py", line 37, in heartbeat
    'mac': mac
  File "/src/discovery/marshall.py", line 74, in process
    return self._process(data, device, update_last_seen)
  File "/src/discovery/marshall.py", line 162, in _process
    device.sync()
  File "/src/discovery/models/device.py", line 128, in sync
    faktory.push('server.sync', 'discovery.sync', args=(str(self.uuid), ), retry=3)
  File "/src/server/async/__init__.py", line 12, in push
    c.queue(task=task, queue=queue, args=args, retry=retry, priority=priority, reserve_for=reserve_for)
  File "/usr/local/lib/python3.5/dist-packages/faktory/client.py", line 57, in queue
    if not isinstance(args, (typing.Iterator, typing.Set, typing.List, typing.Tuple)):
  File "/usr/lib/python3.5/typing.py", line 725, in __instancecheck__
    raise TypeError("Tuples cannot be used with isinstance().")
TypeError: Tuples cannot be used with isinstance().
"""

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/usr/local/lib/python3.5/dist-packages/faktory/worker.py", line 225, in send_status_to_faktory
    future.result(timeout=1)
  File "/usr/lib/python3.5/concurrent/futures/_base.py", line 398, in result
    return self.__get_result()
  File "/usr/lib/python3.5/concurrent/futures/_base.py", line 357, in __get_result
    raise self._exception
TypeError: Tuples cannot be used with isinstance().
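The failing line uses typing.Tuple with isinstance(), which the typing module in Python 3.5 rejects at runtime. The user-side workaround is simply to pass args as a list. On the library side, the isinstance-safe version of the same check uses concrete runtime types or collections.abc; a sketch with a hypothetical helper (the package's actual fix may differ):

```python
from collections.abc import Iterator

def needs_wrapping(args):
    # Hypothetical helper: True if args should be wrapped in a list
    # before building the job payload. Lists, tuples, sets and
    # iterators pass through; anything else (notably a bare string,
    # which would otherwise be split into characters) gets wrapped.
    return not isinstance(args, (list, tuple, set, Iterator))

print(needs_wrapping((1, 2)))  # False: a tuple is already a batch of args
print(needs_wrapping("abc"))   # True: a bare string must not be split up
```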

Single arg as array

I am working with Go and Python, and for a Python call to work with Go, when there is only one argument you should pass it as an array (otherwise the Go side raises an error: string to []interface {}).

I do not understand much Python, so I do not know whether it is easier to update the documentation or to test for when the argument is a single arg.

Great job, by the way :-)

with faktory.connection() as client:
    client.queue('some_job', args=[single_argument], queue='some_job')

ImportError: cannot import name 'Worker' on Example

I am trying to use the Worker Example, and I am getting the following error.

Traceback (most recent call last):
  File "faktory.py", line 1, in <module>
    from faktory import Worker
  File "/home/ubuntu/scale/python-service/faktory.py", line 1, in <module>
    from faktory import Worker
ImportError: cannot import name 'Worker'

Faktory is installed, since the Client is working perfectly fine.

I am running it on Ubuntu 14.04 with Python 3.6.

Any ideas of what is going on? I will update the README.md if necessary after finding the issue.

Thanks!

Worker crashes if there's an error when fetching new jobs

For example, if there's a timeout, the worker will crash. Relevant traceback:

timeout: The read operation timed out
  File "entry.py", line 36, in <module>
    w.run()
  File "faktory/worker.py", line 149, in run
    self.tick()
  File "inventory/common/faktory/worker.py", line 84, in tick
    job = self.faktory.fetch(self.get_queues())
  File "faktory/_proto.py", line 160, in fetch
    job = next(self.get_message())
  File "faktory/_proto.py", line 115, in get_message
    buffer = socket.recv(self.buffer_size)
  File "ssl.py", line 1056, in recv
    return self.read(buflen)
  File "ssl.py", line 931, in read
    return self._sslobj.read(len)
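A defensive pattern for this (a sketch of the idea, not the maintainers' actual fix) is to catch the socket timeout around the fetch and retry a few times before giving up, instead of letting the exception escape run(). Here fetch_fn stands in for the worker's self.faktory.fetch(...) call:

```python
import socket

def fetch_with_retry(fetch_fn, retries=3):
    # Retry transient socket timeouts; re-raise after the last attempt
    # so a persistent outage still surfaces to the caller.
    for attempt in range(retries):
        try:
            return fetch_fn()
        except socket.timeout:
            if attempt == retries - 1:
                raise

# Simulated fetch that times out twice, then succeeds:
calls = {"n": 0}
def flaky_fetch():
    calls["n"] += 1
    if calls["n"] < 3:
        raise socket.timeout("The read operation timed out")
    return {"jid": "abc123"}

job = fetch_with_retry(flaky_fetch)
print(job, calls["n"])  # {'jid': 'abc123'} 3
```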

Getting INFO times out after starting a lot of workers

  • using python
  • start ~15 workers
  • ping INFO every 90s or so

Several times a day, a worker hangs waiting for INFO to return.

Possibly related: workers sometimes will not receive work from a queue despite jobs being present.
(We just had a case of 20 workers, 5 jobs, and no work being done; after restarting them, they pulled the work.)

Faktory 1.4.1 - running in Docker on AWS. contribsys/faktory:latest

FaktoryConnectionResetError in Connection.reply

We are seeing occasional stack traces in our faktory workers like:

'FAIL {"jid": "4e9f7352c8004407bc53287aabbffd3c", "errtype": "FaktoryConnectionResetError", "message": ""}'

These are occurring within faktory._proto.Connection's reply method. Currently line 325:
https://github.com/cdrx/faktory_worker_python/blob/master/faktory/_proto.py#L325

sent = self.socket.send(buffer)

I'm not very familiar with this code, but it seems like some kind of retry might be needed here?
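Part of the story is that socket.send() may transmit only part of the buffer (the stdlib's socket.sendall() exists precisely to loop for you), while a reset connection raises mid-send. A loop like the sketch below at least guarantees the whole FAIL/ACK line goes out or fails cleanly, leaving any reconnect-and-retry policy to the caller; send_fn stands in for socket.send so the loop can be shown without a live socket:

```python
def send_all(send_fn, buffer: bytes) -> int:
    # Loop until the whole buffer is sent; a 0 return means the peer
    # closed the connection, which we surface as a reset.
    total = 0
    while total < len(buffer):
        sent = send_fn(buffer[total:])
        if sent == 0:
            raise ConnectionResetError("socket connection broken")
        total += sent
    return total

# Simulate a socket that accepts at most 4 bytes per call:
chunks = []
def tiny_send(data):
    chunks.append(bytes(data[:4]))
    return min(4, len(data))

n = send_all(tiny_send, b"FAIL {...}\r\n")
print(n, b"".join(chunks) == b"FAIL {...}\r\n")  # 12 True
```

Whether the reply path should also transparently reconnect on ConnectionResetError is a separate design decision for the maintainers.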
