faktory_worker_go's People

Contributors

adsteel, besser, claytonnorthey92, diegoy, fkollmann, hassansin, igneous, jonathan-wondereur, jose-zenledger, joshjimenez-cd, koesie10, matt-lougheed, mperham, valve, vosmith

faktory_worker_go's Issues

heartbeat error: ERR Unknown worker

We have Faktory set up in GCP.

There is a compute engine VM running docker image contribsys/faktory:1.5.1 and a cloud run worker service using github.com/contribsys/faktory_worker_go v1.4.2

A few days ago, for several hours, we saw heartbeat error: ERR Unknown worker 4pf5cn20s4elh every 10 seconds.

It looks like there was a network hiccup so it makes sense that faktory wasn't able to get a heartbeat from a worker for a time so it assumed the worker was bad and no longer recognized it.

What doesn't make sense is that, while this error was occurring, the worker microservice was apparently still able to process some events. The error went on for hours, and quite a few events still got processed throughout that time.

Is this expected? Trying to understand faktory better to better debug and utilize it.

Enqueued Jobs are Stuck

About a week ago, I noticed that the number of enqueued jobs was growing.

I tried updating from faktory 1.1.0-1 to 1.2.0-1. I also updated faktory_worker_go to the latest release. This did not solve the problem. Some of my jobs kick off another job right before they return. I've noticed that the job that kicks off the new one does not get removed from the queue, while the new job does get added. The worker is still running, though it is not picking up new jobs, nor does it seem to tell Faktory that it is done with existing jobs. The worker is running and has no logs. My worker is configured like so:

// Run faktory worker server and register jobs
mgr := worker.NewManager()
// faktory.RegisterJobs(mgr, procedures) - application code which calls mgr.Register for each known job
mgr.ProcessStrictPriorityQueues(faktory.Critical, faktory.Default, faktory.Bulk)
mgr.Run()

Please let me know how else I can help debug.

Windows build

Add an Appveyor Windows CI build to ensure the Go worker process can execute successfully on Windows.

All workers are assigned the same wid

Hello again,
When running multiple workers in different Docker containers, they all end up with the same worker id. This confuses the Faktory server into thinking there is only one worker.

https://github.com/contribsys/faktory_worker_go/blob/master/runner.go#L106 rand.Int63() by default always uses the same seed value of 1, so it returns the same deterministic sequence of values in every process. I think the worker id should come from a unique id generator instead of an unseeded random number.
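
As a minimal sketch of what such a generator could look like, the snippet below draws the id from crypto/rand, which reads from the operating system's entropy source and needs no seeding. In Go releases before 1.20, the top-level math/rand functions behave as if seeded with 1 unless rand.Seed is called, which matches the behavior described above. The function name newWorkerID and the 8-byte length are assumptions made for illustration, not the library's actual implementation.

```go
package main

import (
	"crypto/rand"
	"encoding/hex"
	"fmt"
)

// newWorkerID returns a random hex identifier drawn from the operating
// system's entropy source, so separate processes do not share a sequence.
// The name and the 8-byte length are illustrative choices.
func newWorkerID() (string, error) {
	buf := make([]byte, 8)
	if _, err := rand.Read(buf); err != nil {
		return "", err
	}
	return hex.EncodeToString(buf), nil
}

func main() {
	wid, err := newWorkerID()
	if err != nil {
		panic(err)
	}
	fmt.Println("worker id:", wid)
}
```

Each container would then very likely report a different id, since 8 random bytes give 2^64 possible values.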

Can't get a minimal working example

Hi,

I've successfully got faktory_worker_ruby working, but I can't get it working with Go.

Faktory is installed and is running.

I copied test/main.go into my project and ran it, but it didn't work. So I cloned this repo and tried to run test/main.go there, with no success.

I get the following error in both cases:

nicolas@mbp-nicolas faktory_worker_go % go run test/main.go
2021/01/19 18:01:35.217337 faktory_worker_go 1.4.0 PID 88049 now ready to process jobs
&{Pool:0xc000190220}
panic: Expected: <nil>

goroutine 6 [running]:
main.unique.func1(0xc0001922c0, 0xc0001922c0, 0x0)
	/Users/nicolas/www/others/faktory_worker_go/test/main.go:117 +0x359
github.com/contribsys/faktory/client.(*Pool).With(0xc00018a2b0, 0xc0001a7fa0, 0x0, 0x0)
	/Users/nicolas/go/pkg/mod/github.com/contribsys/[email protected]/client/pool.go:54 +0x9f
main.unique()
	/Users/nicolas/www/others/faktory_worker_go/test/main.go:98 +0x127
main.main.func3(0xc00001c3f0)
	/Users/nicolas/www/others/faktory_worker_go/test/main.go:80 +0x38
created by main.main
	/Users/nicolas/www/others/faktory_worker_go/test/main.go:74 +0x6bb
exit status 2

Thanks in advance for your help

fwg cycles if I use faktory server above 1.2.0

I recently upgraded our Faktory server to 1.4.0, but I am now seeing that our Go workers always show as started less than 1 minute ago. I get the same behavior with 1.3.0. If you refresh the page frequently, you can see the process disappear. Strangely, it still appears to run jobs, but I find it concerning that it is disappearing.

Tested locally with
github.com/contribsys/faktory_worker_go v1.3.0-1

Testing/Mocking

Is there a best practice as to how to mock faktory for testing?

.Get() on pool not following the pool capacity

To understand how the pools work I created a pool with capacity of 3.

	pool, err := faktory.NewPool(3)
	if err != nil {
		log.Fatalln("Unable to connect to Faktory", err.Error())
	}
	log.Println("Pool created, length = ", pool.Len()) // length = 0

Then I called .Get() 10 times, expecting it to fail from the fourth call to .Get(). But all 10 calls succeeded.

	cli := []*faktory.Client{}
	for i := 0; i < 10; i++ {
		c, err := pool.Get()
		if err != nil {
			log.Fatalln("Unable to get client from pool", err.Error()) // never reached
		}
		cli = append(cli, c)
		fmt.Println("Got client ", i+1)
		fmt.Println("Length of pool", pool.Len())
	}

I was even able to push jobs using all 10 *faktory.Client.

	for i := 0; i < 10; i++ {
		job := faktory.NewJob("lazy", i)
		cli[i].Push(job) // succeeded, job was seen from web ui
	}

Finally I tried to return all of them, but only 5 clients were returned.

	for i := 0; i < 10; i++ {
		c := cli[i]
		pool.Put(c)
		fmt.Println("Returned client ", i+1)
		fmt.Println("Length of pool", pool.Len()) // didn't go beyond 5.
	}

Is this expected behavior? If so, how do I control the total number of connections?

Implement hard shutdown

FWR implements a hard shutdown timeout of 25 seconds, exactly like Sidekiq, but FWG never implemented it because contexts were not part of the original API design. A year or two later we integrated Contexts but only to provide access to helper Values.

Today FWG will pause forever, waiting for all jobs to stop. Most jobs execute quickly and finish within 30 seconds but longer jobs can delay shutdown, leading to platforms like Heroku KILLing FWG without warning, orphaning those long jobs for the reservation timeout (default: 30 minutes).

The proper and canonical shutdown process should:

  1. receive an external signal
  2. flag that shutdown is starting
  3. internal goroutines should exit any process loop/select
  4. run any registered Shutdown callbacks
  5. pause up to ShutdownTimeout for existing jobs to finish
  6. cancel job context so lingering jobs quickly die with an error
  7. FAIL any jobs in (5) so they can be re-executed soon
  8. Close connection pool and exit.

Ideally this will guarantee that FWG can gracefully stop the vast majority of jobs and FAIL any which linger past 25 seconds so they can be re-executed quickly after process restart.

This is the fix for contribsys/faktory#468

Need context access to set and get the context

I used https://github.com/gocraft/work before this. There, I can create my own Context and use it in middleware and inside the worker func.

In Faktory, I can read values from the context through the interface, but I cannot set values on it.

Can you help me set context values?
Something like SetValue("log_map", map[string]interface{}{"app": "demo app", "user_id": 12})

We need this because our logging takes values related to the current user from the context and prints them on every log line.

Job results not being reported properly.

I'm seeing fairly frequent instances in which a job appears to be running for 30 minutes -- but, as far as I can tell, is not.

One of the job types where I see this has a metric that is recorded to DataDog when it successfully completes. That metric is never above 90 seconds or so. I am seeing some job failures here and there, but they are all below 30 minutes.

Instead, I suspect this is related to another problem we're having: We're seeing bursts of dial errors with workers and other clients trying to connect to Faktory unsuccessfully. What would happen if a job completed (success or failure), but the worker was unable to report that status back to the server because it couldn't get a connection to the server?

Looking at faktory_worker_go:

https://github.com/contribsys/faktory_worker_go/blob/master/runner.go#L260

				mgr.with(func(c *faktory.Client) error {
					if err != nil {
						return c.Fail(job.Jid, err, nil)
					} else {
						return c.Ack(job.Jid)
					}
				})

It appears that .with can return an error, but that this error is routinely ignored.

I don't think calling panic is appropriate for this situation, but perhaps some combination of the following might be helpful:

  1. Getting and holding a connection to both retrieve the job, and report the result (and maybe as a side-effect, make the connection available to the worker via context so it can fire jobs without having to establish its own connection).
  2. Introducing a pluggable logger mechanism so we can at least record that these failures are happening.
  3. Having some sort of retry loop, specifically around reporting the results of a job.

The first option would have some risks/challenges of its own, of course. You'd need to ensure the connection didn't time out, handle reconnecting if it did go away (either due to timeout or a server failure), etc. I'm sure you have more insight into how it would impact operations concerns in general, so forgive me if it's an Obviously Stupid Idea. That said, for situations involving a relatively high job volume (mid-hundreds to low-thousands per second), the many-transient-connections thing has proven to be a bit of a challenge (I'm paying attention to #219 / #222 for this reason, and we've been having to be careful about tuning things like FIN_WAIT and such in our server configuration).

Test if a job is pushed to the client from within a worker

I am currently using the testing package to test the worker's perform method. Within my worker's method, I am pushing new jobs to a different worker and I would like to assert if that is happening.

myFunc := func(ctx context.Context, args ...interface{}) error {
	// Obtain the helper from the job context to borrow a pooled client.
	help := worker.HelperFor(ctx)
	return help.With(func(cl *faktory.Client) error {
		job := faktory.NewJob("AnotherJob", 2)
		return cl.Push(job)
	})
}

pool, err := faktory.NewPool(5)
if err != nil {
	log.Fatalln("unable to create pool:", err)
}
perf := worker.NewTestExecutor(pool)

somejob := faktory.NewJob("OneJob", 1)
err = perf.Execute(somejob, myFunc)

// assert if another job is pushed...

It would be really helpful to have such a helper.

Worker struct not updating between different unit tests

I have a series of golang tests using a local pool with concurrency of 1. It seems that the worker state is leaking between unit tests nondeterministically, and I'm struggling to find in the docs or source code where this could be happening. We initialize a struct called Worker that contains a faktory_worker_go.Manager instance, and call worker.manager.Register(job_name, worker.JobFunc) for all of our job handlers. Inside the handlers, the reference to the worker struct sometimes does not contain variables from the current test, and instead contains old variables from previous tests. Any ideas why?

Cannot set retries to 0.

https://github.com/contribsys/faktory/blob/master/server/commands.go#L57

Setting this in the server creates a problem wherein Go-based clients cannot set the retry count to 0: The use of omitempty in the definition of Job will result in any field whose value is the default for that type being omitted. Since 0 is the default value for int in Go, the JSON marshaller omits the field when this is set to zero.

You could address this by making the field a *int, but that will result in compile errors for existing Go clients that attempt to set this value. You could also address this by removing the omitempty declaration, but that will break clients silently if they construct a Job without going through faktory.NewJob and don't set this field properly.

Regardless, I'm now stuck in a situation where my only option is to use -1 and thus send jobs to the dead queue, when I'd prefer they get discarded. Any suggestions here?
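
The omitempty behavior described above is easy to reproduce with encoding/json alone. The two struct types below are stand-ins written for this illustration, not the real Job definition; they show that a zero int is dropped while a pointer to zero survives.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// jobInt mirrors the problem: a zero int with omitempty is dropped.
type jobInt struct {
	Type  string `json:"jobtype"`
	Retry int    `json:"retry,omitempty"`
}

// jobPtr uses a pointer: nil is dropped, but a pointer to 0 is kept.
type jobPtr struct {
	Type  string `json:"jobtype"`
	Retry *int   `json:"retry,omitempty"`
}

// encode marshals v to a JSON string and panics on failure.
func encode(v interface{}) string {
	b, err := json.Marshal(v)
	if err != nil {
		panic(err)
	}
	return string(b)
}

func main() {
	zero := 0
	fmt.Println(encode(jobInt{Type: "SomeJob", Retry: 0}))     // {"jobtype":"SomeJob"}
	fmt.Println(encode(jobPtr{Type: "SomeJob", Retry: &zero})) // {"jobtype":"SomeJob","retry":0}
	fmt.Println(encode(jobPtr{Type: "SomeJob"}))               // {"jobtype":"SomeJob"}
}
```

The pointer form lets the server distinguish "not set" from "explicitly zero", at the cost of the source-compatibility break mentioned above.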

Sending structs to jobs

Hey Mike, sorry for bothering you too much.

I have a question: is there a way to send a struct to a job?

import (
  faktory "github.com/contribsys/faktory/client"
)

type Potato struct {
  id string
  name string
}

potato := &Potato{"123", "Go potato!!"}

client, err := faktory.Open()
job := faktory.NewJob("SomeJob", potato)
err = client.Push(job)

I'm not sure if I have to use job.SetCustom("potato", potato) or not... If I have to do it, I don't know how to get potato from the job. 🤕

thank you very much!
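
One thing worth checking, assuming job arguments travel as JSON (as other issues on this page suggest): encoding/json only sees exported fields, so a struct with lowercase fields like id and name would serialize as an empty object. The sketch below uses only the standard library to simulate the round trip; decodeArg is a hypothetical helper, and the worker side is assumed to receive the struct as a generic map.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Potato has exported fields so encoding/json can see them.
type Potato struct {
	ID   string `json:"id"`
	Name string `json:"name"`
}

// decodeArg converts a generic JSON value (as a worker would receive it)
// back into a Potato by re-marshalling it.
func decodeArg(arg interface{}) (Potato, error) {
	var p Potato
	raw, err := json.Marshal(arg)
	if err != nil {
		return p, err
	}
	err = json.Unmarshal(raw, &p)
	return p, err
}

func main() {
	// Producer side: the argument list is serialized to JSON.
	wire, err := json.Marshal([]interface{}{Potato{ID: "123", Name: "Go potato!!"}})
	if err != nil {
		panic(err)
	}
	fmt.Println(string(wire))

	// Worker side: decoding into []interface{} yields a map for the struct.
	var args []interface{}
	if err := json.Unmarshal(wire, &args); err != nil {
		panic(err)
	}
	p, err := decodeArg(args[0])
	if err != nil {
		panic(err)
	}
	fmt.Printf("%+v\n", p)
}
```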

Getting number of Enqueued jobs in log file?

Is there any way to get the number of jobs that are in the Enqueued status? I know it can be read from the /info endpoint, but can it be added to the log so it can be scraped? Any other suggestions?

Thanks

Support callback on job death

As of today, there's no way to know when a Job is about to die besides looking at the Morgue view in Faktory's web UI.

Sidekiq offers a sidekiq_retries_exhausted hook to handle this. It would be nice if Faktory offered something similar so we can handle those cases programmatically.

Decouple polling-for-jobs from performing-work to improve capacity to scale out.

Currently, the connections value for FWG is used in a way that guarantees each connection will sit and block waiting for a job, then dispatch the work in its corresponding goroutine. This approach is simple, and effective -- to a point. We've found that as we increase the number of connections to Faktory, the load on Redis (or within Faktory -- we haven't profiled it very closely yet) becomes a problem with many connections all waiting for jobs to perform. In our situation, we have a concurrency of 64 per node (which leaves our nodes pretty massively underutilized at the moment), and 8 nodes. Somewhere between 26-32 nodes, the contention around waiting for jobs becomes a significant issue that impacts the ability of other processes to add jobs to the queue.

We'd really like to be able to move to a concurrency of more like 1,024 per node, because we have a workload amenable to very wide-fan-out, with low per-job resource consumption (it's mostly I/O bound waiting on external APIs).

It seems to me that coupling fetching concurrency with dispatch concurrency makes it hard to address what are naturally separate bottlenecks. The right concurrency level (both per-node and aggregate across all nodes) is a separate question from the resource-intensiveness and latency of individual jobs which is a separate question from the right number of processes to poll for jobs simultaneously to ensure that available work capacity isn't starved.

My prototyping suggests we could decouple the process of fetching jobs / reporting results from the process of dispatching them quite effectively, leading to a simple means to let people address these two needs separately while only adding one more (optional) knob for them to turn. This matters most for those of us whose jobs have high latency (and perhaps low resource consumption) but arrive in large bursts that call for high concurrency (per-node if the per-job resource consumption is small, and aggregate across all nodes when the per-job resource consumption is high).

There would be some added complexity in FWG, particularly around the shutdown process but my prototyping suggests this can be done in a way where the new knob is optional and with a relatively tightly compartmentalized increase in code complexity. This would make it largely transparent for most users, while providing an important capability for those of us pushing tens of millions of jobs a day and looking to increase that considerably.

Worst-case, we could move this into a separate run method in the same package, making the structural differences completely opt-in for those who have a need for such decoupling.

Thoughts?

Release error: too many arguments in call to c.Beat

Hello @mperham, I'm using your lib but the released version has a bug.

➜ go get -u github.com/contribsys/faktory_worker_go
# github.com/contribsys/faktory_worker_go
../../golang/pkg/mod/github.com/contribsys/[email protected]/runner.go:190:24: too many arguments in call to c.Beat
	have (string)
	want ()

The runner.go calls client.Beat() and sends the current state mgr.CurrentState()

faktory_worker_go/runner.go

Lines 189 to 190 in 17dd456

err := mgr.with(func(c *faktory.Client) error {
	data, err := c.Beat(mgr.CurrentState())

This version requires faktory v1.2.0-1

require (
	github.com/contribsys/faktory v1.2.0-1
	github.com/stretchr/testify v1.4.0
)

That version's Beat() method does not accept any arguments.

https://github.com/contribsys/faktory/blob/5a6c200269abb81a8f718c9995742d847f042a5f/client/client.go#L431-L437

Would it be possible to release a new version to get this fixed? I was using the master branch in development, but I don't want to do the same in production.

Thank you very much!

Improve Readme

Motivation
Today I started playing with Faktory, and it's super awesome, with a slick UI ;)

But sadly I can't find any documentation/wiki regarding retries.

Can we add more cases, such as:

  • how to retry like in Sidekiq
  • how to raise an exception (I am just using return errors.New("some msg") for that)

Feel free to ignore this if you have more important priorities to deal with.

Worker/Manager per process

I'm having an issue running one worker per process.
Everything is local and I have a faktory server running on :7419.

I have a workflow consisting of AMQP message queues and Faktory jobs. AMQP messages are consumed and packaged into a faktory job, which then produces output on another message queue.

I have it set up such that each job producer/worker runs in its own process, with AMQP message queues and Faktory jobs as the means of communication.

If only one worker process for JOB_A is running, it receives jobs instantaneously. If however I run my JOB_B worker process, they both receive jobs intermittently. Looking at the retry queue, I see that the jobs not processed have an error message: unknown: No handler for JOB_A. I would say around 30-50% of jobs need to be retried, and eventually they process successfully within 5 retries. As soon as I turn off JOB_B, everything is back to normal.

In each process for JOB_A and JOB_B, because they are different processes, the following is run for each process:

mgr := faktory_worker_go.NewManager()
mgr.Register("JOB_*", fn)
mgr.Run()

If however I refactor my workers to run in a single process, calling Register twice and Run once, everything runs very smoothly:

mgr := faktory_worker_go.NewManager()
mgr.Register("JOB_A", fn_A)
mgr.Register("JOB_B", fn_B)
mgr.Run()

Is it not intended to run one worker per process?
I would really prefer to logically separate independent jobs/workers into new processes.

How can I add a daily job?

I want to add a daily job that calls a method once a day, every day of the year. How can I add it?

job := faktory.NewJob("SomeJob", 1, 2, 3)
job.Queue = "default"
job.At = "2018-09-14T00:00:00.000000Z"
err = client.Push(job)

Import error when using with go modules

Recently faced an issue trying to go get the faktory client with go modules.

Go version

go version go1.11.1 darwin/amd64

Steps to reproduce

  1. Enable go modules: export GO111MODULE=on
  2. go get github.com/contribsys/faktory/client

Trace

go get -x -v github.com/contribsys/faktory/client

# /Users/kuro/go/pkg/mod/cache/vcs/d50afae69b1c3619314315ef9389bb8370fcf9f95d50001693c0adc2a96c1ae7 for git2 https://github.com/contribsys/faktory
cd /Users/kuro/go/pkg/mod/cache/vcs/d50afae69b1c3619314315ef9389bb8370fcf9f95d50001693c0adc2a96c1ae7; git ls-remote -q https://github.com/contribsys/faktory
3.405s # cd /Users/kuro/go/pkg/mod/cache/vcs/d50afae69b1c3619314315ef9389bb8370fcf9f95d50001693c0adc2a96c1ae7; git ls-remote -q https://github.com/contribsys/faktory
go: finding github.com/contribsys/faktory/client latest
cd /Users/kuro/go/pkg/mod/cache/vcs/d50afae69b1c3619314315ef9389bb8370fcf9f95d50001693c0adc2a96c1ae7; git -c log.showsignature=false log -n1 '--format=format:%H %ct %D' 2dcf6cf85e74b86f1157310d300fbe7c07eaa14b
0.011s # cd /Users/kuro/go/pkg/mod/cache/vcs/d50afae69b1c3619314315ef9389bb8370fcf9f95d50001693c0adc2a96c1ae7; git -c log.showsignature=false log -n1 '--format=format:%H %ct %D' 2dcf6cf85e74b86f1157310d300fbe7c07eaa14b
cd /Users/kuro/go/pkg/mod/cache/vcs/d50afae69b1c3619314315ef9389bb8370fcf9f95d50001693c0adc2a96c1ae7; git tag -l
0.010s # cd /Users/kuro/go/pkg/mod/cache/vcs/d50afae69b1c3619314315ef9389bb8370fcf9f95d50001693c0adc2a96c1ae7; git tag -l
cd /Users/kuro/go/pkg/mod/cache/vcs/d50afae69b1c3619314315ef9389bb8370fcf9f95d50001693c0adc2a96c1ae7; git -c log.showsignature=false log -n1 '--format=format:%H %ct %D' 2dcf6cf85e74
0.012s # cd /Users/kuro/go/pkg/mod/cache/vcs/d50afae69b1c3619314315ef9389bb8370fcf9f95d50001693c0adc2a96c1ae7; git -c log.showsignature=false log -n1 '--format=format:%H %ct %D' 2dcf6cf85e74
cd /Users/kuro/go/pkg/mod/cache/vcs/d50afae69b1c3619314315ef9389bb8370fcf9f95d50001693c0adc2a96c1ae7; git cat-file blob 2dcf6cf85e74b86f1157310d300fbe7c07eaa14b:client/go.mod
0.010s # cd /Users/kuro/go/pkg/mod/cache/vcs/d50afae69b1c3619314315ef9389bb8370fcf9f95d50001693c0adc2a96c1ae7; git cat-file blob 2dcf6cf85e74b86f1157310d300fbe7c07eaa14b:client/go.mod
Fetching https://github.com?go-get=1
Parsing meta tags from https://github.com?go-get=1 (status code 200)
go get github.com/contribsys/faktory/client: no matching versions for query "latest"

I realized I was getting the correct files when running go get github.com/contribsys/faktory instead.

This import mismatch causes import errors with cannot find package "github.com/contribsys/faktory/client" when using the faktory_worker_go package.

I noticed this issue only happens when I use go modules.

Batch ID Inconsistency

There is an inconsistency between this repository and contribsys/faktory regarding the batch id. This repository expects _bid according to this line in context.go, but contribsys/faktory only sets bid when you push jobs to the batch.
I'm currently working around this by setting _bid manually, but I imagine this is not the intention, since it is not done in the test/example file.

I'm not sure this is the correct repository to create this issue, but I figured it's a start.
I tried to look at the Ruby client implementation and it looks like that one is using bid, but the documentation states that custom metadata elements starting with '_' are reserved for internal use.

One additional note: it might be wise to include the batch mechanism in the specification to avoid this kind of inconsistency in the future.

Stopping Busy Jobs

Hello all, is there a way of stopping a busy Job?

I don't want to run this job anymore, so I stopped the server and everything else, but the job always tries to run when I start the server again.

thank you very much!

Context doesn't allow WithValue()

fwg's middleware is meant to provide a Context that the user can use to setup contextual data for the given job:

func SetupContext(ctx worker.Context, job *faktory.Job) error {
  aid := job.GetCustom("account_id")
  uid := job.GetCustom("user_id")
  ctx.WithValue("aid", aid).WithValue("uid", uid)
  return nil
}

mgr.Use(SetupContext)

But WithValue returns a copy of the context and it's impossible to pass on the new copy to the job execution. We'll likely need to adjust the Middleware API to something like this:

func SetupContext(ctx worker.Context, job *faktory.Job) (Context, error) {
  ...
  return ctx.WithValue("aid", aid).WithValue("uid", uid), nil
}

so that modified Contexts can be passed along the chain.

"short write" errors

We suddenly started seeing this last night:

error enqueueing update-profile job: error enqueueing job: short write
…cessors.(*Processor).HandleError (/opt/reward-service/processors/processor.go:34)
…lReceiptProcessor).Process (/opt/reward-service/processors/digital_receipt.go:230)
….func1 (/go/pkg/mod/github.com/contribsys/[email protected]/manager.go:47)
…tch (/go/pkg/mod/github.com/contribsys/[email protected]/middleware.go:19)
…cessOne (/go/pkg/mod/github.com/contribsys/[email protected]/runner.go:139)
…process (/go/pkg/mod/github.com/contribsys/[email protected]/runner.go:95)
                         runtime.goexit (/usr/local/go/src/runtime/asm_amd64.s:1581)

The code in question:

func enqueueUpdateProfileJob(newRelicTxn *newrelic.Transaction, receiptID string, jobClient jobs.Client) error {
	defer newRelicTxn.StartSegment("enqueueUpdateProfileJob").End()
	jobArgument := make(map[string]interface{})
	jobArgument["id"] = receiptID
	jobArgs := []interface{}{jobArgument}
	retry := 5
	updateProfileJob := faktory.NewJob("update-profile", jobArgs...)
	updateProfileJob.Queue = "profiles"
	updateProfileJob.ReserveFor = 60
	updateProfileJob.Retry = &retry

	jpperr := jobClient.Push(updateProfileJob)
	if jpperr != nil {
		return fmt.Errorf("error enqueueing job: %v", jpperr)
	}

	return nil
}

Add test helpers

Add test helper(s) so perform functions can be called easily from a test suite.

version tag

Because there is no version tag, go mod takes the latest commit from master.

This caused our CI pipeline to break with the following error:

/root/go/src/github.com/contribsys/faktory_worker_go/runner.go:289:9: undefined: client.Batch

After commit: f4d3451

Would it make sense to create version tags?

If anyone else has this issue you can put github.com/contribsys/faktory_worker_go v0.0.0-20191008193933-9f13abade33b in go.mod to get the stable version.

How about making *Manager.Run() return?

func (mgr *Manager) Run() currently does not return, instead, it calls func (mgr *Manager) Terminate() which in turn calls os.Exit(). I was trying to embed the worker logic with my own goroutines, and defers are required for various operations, but os.Exit() breaks them.

Is there any specific reason why Run() does not return? If not, can we change it to return, or at least make it configurable to do so?

Middleware support?

Sidekiq's server middleware is very useful for cross-cutting concerns. Provide a similar structure for the Go worker?

[QUESTION] Do I need to run someFunc in a goroutine manually?

Sorry to ask this question, but as a newbie I just want to make sure I implement the worker funcs properly.

Questions:

  1. Do I need to run someFunc in a goroutine manually, or does Faktory handle that automatically in an async goroutine?
  2. Is the goroutine around produce() (as written in the test) necessary or optional? Is it better or recommended to run every task in a goroutine?
  3. Are these calls necessary to initialize the mgr, or are they optional, with defaults applied otherwise?
mgr.Concurrency = 20
mgr.ProcessStrictPriorityQueues("critical", "default", "bulk")

Thank you for your time and patience.

mgr.Terminate() does not wait for .Ack() to mark jobs as processed

Yesterday I added graceful shutdown to k8s pods that are running faktory_worker_go.

As expected, mgr.Run() waits for all running job processes to complete after receiving the TERM signal and then calls os.Exit(). However, I have noticed that the final job that was completed before the pods exit would stay as 'Busy'. It seems that the os.Exit() is being called before that final ACK message is sent to faktory to mark the job as being processed.

This issue is somewhat related to: #44 and I do relate to the notion expressed there. It would be nice to run some cleanup code before os.Exit() is called - currently the worker's database dependency does not get closed properly.

Thanks for this wonderful package. I've been using it for months now and have been loving it!

Logging support

Integrate Go's logging best practices. We probably shouldn't be using faktory/util's logging API but rather allow the app developer to give us a logging instance to use.

how to connect worker to a server (not localhost)

Hello

My application is running in a separate docker container.
Both the faktory and my app container are added to the same docker network.
The hostname of my faktory container is "faktory"
I can ping the "faktory" from my app container, so they can see each other.

But how do I make my worker connect to the "faktory" hostname instead of localhost?
I do not see a way to configure this.

Thank you
Cheers
Gerd

query for my use case

I want to confirm whether I can use faktory_worker for my use case.

My use case
Let's suppose
we have n customers c1,c2,....,cn
we have m updates for c1 and k updates for c2
All the updates come at the same time

I want to have this workflow in place
I need one worker per customer and all the updates for the respective customer should be synchronous. But updates for multiple customers can be asynchronous.

So something like
w1 - worker for customer c1
w2 - worker for customer c2

[ c1 ] [ c2 ] ........ [ cn ]
[ u1,u2,...um] [u1,u2,...uk]

c1 and c2 can be updated in parallel but updates for each customer should be synchronous.
Can I achieve this case? If yes can you please guide me on how?
If no, is there any alternative for it?
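
One possible approach, sketched here as an assumption rather than an official recommendation, is to route each customer's updates to a queue chosen by hashing the customer id, and to consume each such queue with a single worker whose concurrency is 1. Updates for one customer are then handled one at a time, while different queues proceed in parallel. The helper queueFor and the queue naming scheme are hypothetical.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// queueFor maps a customer id to one of `shards` queue names. The same
// customer always maps to the same queue.
func queueFor(customer string, shards uint32) string {
	if shards == 0 {
		shards = 1 // avoid division by zero; treat as a single shard
	}
	h := fnv.New32a()
	h.Write([]byte(customer)) // Write on a hash.Hash never returns an error
	return fmt.Sprintf("customer-%d", h.Sum32()%shards)
}

func main() {
	for _, c := range []string{"c1", "c2", "c1"} {
		fmt.Println(c, "->", queueFor(c, 4))
	}
}
```

Two customers can share a shard, which serializes them against each other as well; that costs some parallelism but keeps the number of queues fixed. Retried jobs may also run out of their original order, so strict ordering would need extra care.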

Faktory's inline mode in development or test.

Hi there.

I'm using a fake TCP server as Faktory does, just to make my tests green but now I need my jobs running "for real".

Is there such a thing as "Faktory's inline mode"? 😄

Thank you very much!

Can't test processor function which enqueues another job

I have a job processing function that needs to enqueue another job when it's done with its work. As noted in the FAQ, I'm using the helper.With() function to get a handle to a faktory.Client from the pool to push the new job. For example:

func (p *MyProcessor) Process(ctx context.Context, args ...interface{}) error {
    helper := worker.HelperFor(ctx)

    // .... do my work ....

    // enqueue a job for post-processing
    ppjerr := helper.With(func(cl *faktory.Client) error {
        jobArgument := make(map[string]interface{})
        jobArgument["id"] = theID
        jobArgs := []interface{}{jobArgument}
        postProcessingJob := faktory.NewJob("post-processing", jobArgs...)
        return cl.Push(postProcessingJob)
    })

    if ppjerr != nil {
        fmt.Printf("post processing job error: %+v\n", ppjerr)
        return ppjerr
    }
    return nil
}

In the unit tests for MyProcessor, I'm using the worker.NewTestExecutor(pool) function to send test jobs to MyProcessor. For example (test cruft removed):

// test setup
pool, _ := faktory.NewPool(2)
var executor worker.PerformExecutor = worker.NewTestExecutor(pool)
var jobArgument map[string]interface{} = make(map[string]interface{})
jobArgument["id"] = "1234567890"
jobArgs := []interface{}{jobArgument}
var job *faktory.Job = faktory.NewJob("MyProcessor", jobArgs...)

// run processor
err := executor.Execute(job, myProcessor.Process)

// test assertions....

The enqueueing of the second job during the test is returning an error because it appears as though it's attempting to connect to Faktory to push the job, despite the fact I'm using the test executor.

post processing job error: dial tcp [::1]:7419: connect: connection refused

How exactly do I unit test processor functions which enqueue another job without having it actually try to connect to Faktory? The structure of the code makes it awkward (impossible?) to stub/mock the faktory.Client needed to push the job.
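
One possible approach, sketched here under assumptions, is to put the push behind a small interface owned by the application so that tests can substitute an in-memory fake. `Job`, `Pusher`, `fakePusher`, and `runProcessorWithFake` are hypothetical names introduced for illustration; `Job` is a stand-in for faktory.Job, and none of this is the library's own API.

```go
package main

import (
	"context"
	"errors"
)

// Job is a hypothetical stand-in for faktory.Job, reduced to what the sketch needs.
type Job struct {
	Type string
	Args []interface{}
}

// Pusher is a hypothetical seam: anything that can enqueue a job.
type Pusher interface {
	Push(ctx context.Context, job Job) error
}

// MyProcessor depends on the Pusher interface rather than on a concrete client.
type MyProcessor struct {
	pusher Pusher
}

// Process does its work and then enqueues a post-processing job via the Pusher.
func (p *MyProcessor) Process(ctx context.Context, args ...interface{}) error {
	if len(args) == 0 {
		return errors.New("missing job argument")
	}
	// ... do the real work here ...
	return p.pusher.Push(ctx, Job{Type: "post-processing", Args: args})
}

// fakePusher records pushed jobs in memory instead of opening a connection.
type fakePusher struct {
	pushed []Job
}

// Push appends the job to the in-memory list and never fails.
func (f *fakePusher) Push(_ context.Context, job Job) error {
	f.pushed = append(f.pushed, job)
	return nil
}

// runProcessorWithFake drives Process the way a unit test might,
// returning whatever the processor tried to enqueue.
func runProcessorWithFake() ([]Job, error) {
	fake := &fakePusher{}
	p := &MyProcessor{pusher: fake}
	arg := map[string]interface{}{"id": "1234567890"}
	err := p.Process(context.Background(), arg)
	return fake.pushed, err
}
```

In production code, the `Pusher` implementation could wrap the helper.With and cl.Push calls shown in the processor above, while tests assert on the jobs the fake recorded.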

Add better testing

Need more testing of various scenarios and better coverage. Help welcome!

EOF error

What is an EOF (end of file) error?
When I run the test/main.go file in this repo, I get an EOF error in the produce func.

testing/mocking pool connections and clients

I couldn't find any documentation on how to test/mock pool connections, getting a client from the pool, registering a mock handler, adding a new job etc. I am wondering what you think about this pattern, and if it makes sense:

	mgr := worker.NewManager()

	mgr.ProcessStrictPriorityQueues("fwgtest")

	mgr.Concurrency = 1

	pool, _ := faktory.NewPool(5)

	faktory.RandomProcessWid = mgr.ProcessWID

	mgr.Pool = pool

	mgr.Register("aworker", func(ctx context.Context, args ...interface{}) error {
		return nil
	})

	j := faktory.NewJob("something", 1, 2)

	perf := worker.NewTestExecutor(pool)

	err := perf.Execute(j, func(ctx context.Context, args ...interface{}) error {
		return nil
	})

	suite.NoError(err)

This doesn't work; I am just playing around and trying to figure out how to mock these guys. This is mainly for Travis, and if mocking isn't possible I think I will have to install Faktory and Redis on the Travis node.

ERR Job not found

While looking in our faktory worker logs, the log was filled with these two log statements over and over again, printed every second:

2022/09/06 12:39:51.608495 ERR Job not found 148ec8b7-8417-45e9-bbc0-66f7a1a4d7e0
2022/09/06 12:39:51.608541 ERR Job not found 9e286640-ec97-47f2-9a49-d50c5e6ed0a2
2022/09/06 12:39:52.609932 ERR Job not found 148ec8b7-8417-45e9-bbc0-66f7a1a4d7e0
2022/09/06 12:39:52.610081 ERR Job not found 9e286640-ec97-47f2-9a49-d50c5e6ed0a2
2022/09/06 12:39:53.611083 ERR Job not found 9e286640-ec97-47f2-9a49-d50c5e6ed0a2
2022/09/06 12:39:53.611083 ERR Job not found 148ec8b7-8417-45e9-bbc0-66f7a1a4d7e0
2022/09/06 12:39:54.612050 ERR Job not found 148ec8b7-8417-45e9-bbc0-66f7a1a4d7e0
2022/09/06 12:39:54.612227 ERR Job not found 9e286640-ec97-47f2-9a49-d50c5e6ed0a2
2022/09/06 12:39:55.613489 ERR Job not found 148ec8b7-8417-45e9-bbc0-66f7a1a4d7e0
2022/09/06 12:39:55.613531 ERR Job not found 9e286640-ec97-47f2-9a49-d50c5e6ed0a2
........etc........

I searched for this error message in our code and did not find anything, so I assume this is coming from Faktory itself. What does this mean and how do I get it to stop spamming our logs?

Unable to fetch Job Object in Perform function

I recently integrated Faktory into my Go application. I initialized a worker manager with the job type and the Perform function, as shown below:

mgr.Register("TicketGeneration", ReceiveJobs)

In the "ReceiveJobs" function I only get access to the "args". However, I am not able to fetch the "Job". From the context I'm only able to fetch the "Helper" object.

func ReceiveJobs(ctx context.Context, args ...interface{}) error {
      help := worker.HelperFor(ctx)
      log.Printf("working on job %s", help.Jid())
      return nil
}

The Helper object only offers these methods:

 Jid() string
 JobType() string
 Bid() string
 Batch(func(*client.Batch) error) error
 With(func(*client.Client) error) error
 TrackProgress(percent int, desc string, reserveUntil *time.Time) error

Is there a way to use the "Jid" from the helper object to fetch the job object?
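
If the worker library offers a middleware hook that receives the job before the handler runs (an assumption to verify against its documentation), one option is to stash the job in the context and read it back inside the handler. The sketch below uses stand-in types only: `Job`, `withJob`, `JobFrom`, and `demoQueueLookup` are hypothetical names, and the middleware shape is assumed rather than taken from the library.

```go
package main

import "context"

// Job is a hypothetical stand-in for the library's job type.
type Job struct {
	Jid   string
	Type  string
	Queue string
}

// jobKey is an unexported context key type, which avoids key collisions.
type jobKey struct{}

// withJob is a middleware-shaped function (assumed shape): it stores the
// job in the context and then calls the next step with that context.
func withJob(ctx context.Context, job *Job, next func(ctx context.Context) error) error {
	return next(context.WithValue(ctx, jobKey{}, job))
}

// JobFrom returns the job stored by withJob, if there is one.
func JobFrom(ctx context.Context) (*Job, bool) {
	job, ok := ctx.Value(jobKey{}).(*Job)
	return job, ok
}

// demoQueueLookup simulates a handler reading a job field that a
// helper exposing only Jid and JobType would not provide.
func demoQueueLookup() (string, error) {
	job := &Job{Jid: "abc123", Type: "TicketGeneration", Queue: "default"}
	var queue string
	err := withJob(context.Background(), job, func(ctx context.Context) error {
		if j, ok := JobFrom(ctx); ok {
			queue = j.Queue
		}
		return nil
	})
	return queue, err
}
```

The unexported key type keeps the stored value private to the package, so other code cannot overwrite or read it by accident.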
