contribsys / faktory_worker_go
Faktory workers for Go
License: Mozilla Public License 2.0
We have faktory setup in GCP.
There is a compute engine VM running docker image contribsys/faktory:1.5.1 and a cloud run worker service using github.com/contribsys/faktory_worker_go v1.4.2
A few days ago, for several hours, we saw heartbeat error: ERR Unknown worker 4pf5cn20s4elh every 10 seconds.
It looks like there was a network hiccup so it makes sense that faktory wasn't able to get a heartbeat from a worker for a time so it assumed the worker was bad and no longer recognized it.
What doesn't make sense is that, during the time of this error, the worker microservice was apparently still able to process some events. The error went on for hours and we see quite a few events still got processed throughout that time.
Is this expected? Trying to understand faktory better to better debug and utilize it.
About a week ago, I noticed that the number of enqueued jobs was growing.
I tried updating from faktory 1.1.0-1 to 1.2.0-1. I also updated faktory_worker_go to the latest release. This did not solve the problem. Some of my jobs kick off another job right before they return. I've noticed that the job doing the kick-off does not get removed from the queue while the new job gets added. The worker is still running, though it is not picking up new jobs, nor does it seem to tell faktory that it is done with existing jobs. The worker is running and has no logs. My worker is configured like so:
// Run faktory worker server and register jobs
mgr := worker.NewManager()
// faktory.RegisterJobs(mgr, procedures) - application code which calls mgr.Register for each known job
mgr.ProcessStrictPriorityQueues(faktory.Critical, faktory.Default, faktory.Bulk)
mgr.Run()
Please let me know how else I can help debug.
Add an Appveyor Windows CI build to ensure the Go worker process can execute successfully on Windows.
Hello again,
When running multiple workers in different Docker containers, they all end up having the same worker id. This confuses the Faktory server into thinking there is only one worker.
https://github.com/contribsys/faktory_worker_go/blob/master/runner.go#L106 rand.Int63() by default always uses the same seed value of 1, so it returns the same deterministic values when invoked repeatedly. I think it should use a unique id generator instead of a random number for the worker id.
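One possible way to avoid the collision, sketched under the assumption that the worker id only needs to be an opaque unique string: draw it from crypto/rand, which needs no seeding. The name newWorkerID is hypothetical and is not part of faktory_worker_go.

```go
package main

import (
	"crypto/rand"
	"encoding/hex"
	"fmt"
)

// newWorkerID returns a random 16-character hex string.
// crypto/rand reads from the OS entropy source, so separate
// containers do not share one deterministic sequence.
func newWorkerID() (string, error) {
	buf := make([]byte, 8)
	if _, err := rand.Read(buf); err != nil {
		return "", err
	}
	return hex.EncodeToString(buf), nil
}

func main() {
	wid, err := newWorkerID()
	if err != nil {
		panic(err)
	}
	fmt.Println("worker id:", wid)
}
```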
Hi,
I've successfully got faktory_worker_ruby working, but I can't make it work with Go.
Faktory is installed and is running.
I copied and ran test/main.go in my project and it didn't work. So I cloned this repo and tried to run test/main.go, with no success.
I'm having the following error in both cases:
nicolas@mbp-nicolas faktory_worker_go % go run test/main.go
2021/01/19 18:01:35.217337 faktory_worker_go 1.4.0 PID 88049 now ready to process jobs
&{Pool:0xc000190220}
panic: Expected: <nil>
goroutine 6 [running]:
main.unique.func1(0xc0001922c0, 0xc0001922c0, 0x0)
/Users/nicolas/www/others/faktory_worker_go/test/main.go:117 +0x359
github.com/contribsys/faktory/client.(*Pool).With(0xc00018a2b0, 0xc0001a7fa0, 0x0, 0x0)
/Users/nicolas/go/pkg/mod/github.com/contribsys/[email protected]/client/pool.go:54 +0x9f
main.unique()
/Users/nicolas/www/others/faktory_worker_go/test/main.go:98 +0x127
main.main.func3(0xc00001c3f0)
/Users/nicolas/www/others/faktory_worker_go/test/main.go:80 +0x38
created by main.main
/Users/nicolas/www/others/faktory_worker_go/test/main.go:74 +0x6bb
exit status 2
Thanks in advance for your help
I recently upgraded our Faktory server to 1.4.0, but I am now seeing that our Go workers always say they started less than 1 minute ago. I get the same behavior with 1.3.0. If you refresh the page frequently you can see the process disappear. Strangely, it does still appear to run jobs, but I find it concerning that it is disappearing.
tested locally with
github.com/contribsys/faktory_worker_go v1.3.0-1
Is there a best practice as to how to mock faktory for testing?
To understand how the pools work I created a pool with capacity of 3.
pool, err := faktory.NewPool(3)
if err != nil {
log.Fatalln("Unable to connect to Faktory", err.Error())
}
log.Println("Pool created, length = ", pool.Len()) // length = 0
Then I called .Get() 10 times, expecting it to fail from the fourth call to .Get(). But all 10 calls succeeded.
cli := []*faktory.Client{}
for i := 0; i < 10; i++ {
c, err := pool.Get()
if err != nil {
log.Fatalln("Unable to get client from pool", err.Error()) // never reached
}
cli = append(cli, c)
fmt.Println("Got client ", i+1)
fmt.Println("Length of pool", pool.Len())
}
I was even able to push jobs using all 10 *faktory.Client.
for i := 0; i < 10; i++ {
job := faktory.NewJob("lazy", i)
cli[i].Push(job) // succeeded, job was seen from web ui
}
Finally I tried to return all of them, but only 5 clients were returned.
for i := 0; i < 10; i++ {
c := cli[i]
pool.Put(c)
fmt.Println("Got client ", i+1)
fmt.Println("Length of pool", pool.Len()) // didn't go beyond 5.
}
Is this an expected behavior? If yes, how to control the total number of connections?
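The output above is consistent with a pool whose capacity bounds the idle connections it keeps rather than the total it hands out; that is an inference from this post, not something confirmed here. If a hard cap on simultaneously checked-out clients is needed, one option is to guard the pool with a counting semaphore. A minimal sketch with hypothetical names (limiter, Acquire, Release) that does not use the faktory pool API:

```go
package main

import (
	"errors"
	"fmt"
)

var errExhausted = errors.New("limiter: no free slots")

// limiter caps how many resources may be checked out at once.
type limiter struct {
	slots chan struct{}
}

func newLimiter(max int) *limiter {
	return &limiter{slots: make(chan struct{}, max)}
}

// Acquire claims a slot or fails immediately when all are taken.
func (l *limiter) Acquire() error {
	select {
	case l.slots <- struct{}{}:
		return nil
	default:
		return errExhausted
	}
}

// Release frees a previously acquired slot.
func (l *limiter) Release() {
	select {
	case <-l.slots:
	default: // nothing was held; ignore
	}
}

func main() {
	lim := newLimiter(3)
	for i := 1; i <= 5; i++ {
		if err := lim.Acquire(); err != nil {
			fmt.Println("call", i, "rejected:", err)
			continue
		}
		fmt.Println("call", i, "acquired")
	}
}
```

Calling Acquire before pool.Get() and Release after pool.Put() would make the fourth concurrent checkout fail for a limit of 3.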
FWR implements a hard shutdown timeout of 25 seconds, exactly like Sidekiq, but FWG never implemented it because contexts were not part of the original API design. A year or two later we integrated Contexts but only to provide access to helper Values.
Today FWG will pause forever, waiting for all jobs to stop. Most jobs execute quickly and finish within 30 seconds but longer jobs can delay shutdown, leading to platforms like Heroku KILLing FWG without warning, orphaning those long jobs for the reservation timeout (default: 30 minutes).
The proper and canonical shutdown process should:
FAIL any jobs in (5) so they can be re-executed soon
Ideally this will guarantee that FWG can gracefully stop the vast majority of jobs and FAIL any which linger past 25 seconds so they can be re-executed quickly after process restart.
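The core of a hard shutdown timeout can be sketched with the standard library alone: wait for in-flight jobs, but give up after a deadline. This is an illustration under stated assumptions, not FWG's implementation; waitWithTimeout and drain are hypothetical names, and jobs are simulated with time.Sleep.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// waitWithTimeout reports true if wg finished before the timeout elapsed.
func waitWithTimeout(wg *sync.WaitGroup, timeout time.Duration) bool {
	done := make(chan struct{})
	go func() {
		wg.Wait()
		close(done)
	}()
	select {
	case <-done:
		return true
	case <-time.After(timeout):
		return false
	}
}

// drain starts one simulated job lasting jobMillis and waits up to
// timeoutMillis for it. It reports whether the job finished in time.
func drain(jobMillis, timeoutMillis int) bool {
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		time.Sleep(time.Duration(jobMillis) * time.Millisecond)
	}()
	return waitWithTimeout(&wg, time.Duration(timeoutMillis)*time.Millisecond)
}

func main() {
	// The text describes a 25 second limit; short values keep the demo fast.
	if drain(10, 1000) {
		fmt.Println("all jobs finished, clean exit")
	}
	if !drain(500, 20) {
		fmt.Println("timeout: FAIL lingering jobs so they are retried")
	}
}
```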
This is the fix for contribsys/faktory#468
I used https://github.com/gocraft/work before this. In that, I can create my own Context and can use it in middleware and inside the worker func.
In faktory, I can get the value from the context from the interface. But I could not set the value to the context.
Can you help me to set context values?
like SetValue("log_map", map[string]interface{}{"app": "demo app", "user_id": 12})
Because our logs print current user related values in all logs using the context.
Queues was being exported, but was renamed to queues, causing apps to break.
https://github.com/contribsys/faktory_worker_go/blob/master/runner.go#L42
Is the intent to use ProcessStrictPriorityQueues and/or ProcessWeightedPriorityQueues in lieu of Queues?
I'm seeing fairly frequent instances in which a job appears to be running for 30 minutes -- but, as far as I can tell, is not.
One of the job types where I see this has a metric that is recorded to DataDog when it successfully completes. That metric is never above 90 seconds or so. I am seeing some job failures here and there, but they are all below 30 minutes.
Instead, I suspect this is related to another problem we're having: We're seeing bursts of dial errors with workers and other clients trying to connect to Faktory unsuccessfully. What would happen if a job completed (success or failure), but the worker was unable to report that status back to the server because it couldn't get a connection to the server?
Looking at faktory_worker_go:
https://github.com/contribsys/faktory_worker_go/blob/master/runner.go#L260
mgr.with(func(c *faktory.Client) error {
if err != nil {
return c.Fail(job.Jid, err, nil)
} else {
return c.Ack(job.Jid)
}
})
It appears that .with can return an error, but that this error is routinely ignored.
I don't think calling panic is appropriate for this situation, but perhaps some combination of the following might be helpful:
The first option would have some risks/challenges of its own, of course. You'd need to ensure the connection didn't time out, handle reconnecting if it did go away (either due to timeout or a server failure), etc. I'm sure you have more insight into how it would impact operations concerns in general, so forgive me if it's an Obviously Stupid Idea. That said, for situations involving a relatively high job volume (mid-hundreds- low-thousands per second), the many-transient-connections thing has proven to be a bit of a challenge (I'm paying attention to #219 / #222 for this reason, and we've been having to be careful about tuning things like FIN_WAIT and such in our server configuration).
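As one illustration of acting on the returned error instead of dropping it (not necessarily one of the options proposed above), the ACK/FAIL report could be retried with a growing delay. The sketch below is standalone: retry and flakyReport are hypothetical helpers, and the failing report is simulated rather than sent to a server.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// retry calls op up to attempts times, doubling the delay after each failure.
// It returns nil on the first success, or the last error wrapped.
func retry(attempts int, delay time.Duration, op func() error) error {
	var err error
	for i := 0; i < attempts; i++ {
		if err = op(); err == nil {
			return nil
		}
		if i < attempts-1 {
			time.Sleep(delay)
			delay *= 2
		}
	}
	return fmt.Errorf("giving up after %d attempts: %w", attempts, err)
}

// flakyReport simulates a status report that fails `failures` times before
// succeeding. It returns the number of calls made and the final error.
func flakyReport(failures, attempts int) (int, error) {
	calls := 0
	err := retry(attempts, time.Millisecond, func() error {
		calls++
		if calls <= failures {
			return errors.New("dial tcp: connection refused")
		}
		return nil
	})
	return calls, err
}

func main() {
	calls, err := flakyReport(2, 5)
	fmt.Println("calls:", calls, "err:", err) // calls: 3 err: <nil>
}
```

If every attempt fails, the caller at least gets an error to log, which would make the "running for 30 minutes" symptom easier to diagnose.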
I am currently using the testing package to test the worker's perform method. Within my worker's method, I am pushing new jobs to a different worker and I would like to assert that this is happening.
myFunc := func(ctx context.Context, args ...interface{}) error {
return help.With(func(cl *faktory.Client) error {
job := faktory.NewJob("AnotherJob", 2)
return cl.Push(job)
})
}
pool, err := faktory.NewPool(5)
perf := worker.NewTestExecutor(pool)
somejob := faktory.NewJob("OneJob", 1)
err = perf.Execute(somejob, myFunc)
// assert if another job is pushed...
It would be really helpful to have such helper.
I would like to pass an array of Labels to be displayed in the UI, like golang is currently.
https://github.com/contribsys/faktory/blob/50e383d42aede70c4fd8645c636f5295e55be988/client/client.go#L375
Any ideas how to do this when configuring the manager?
I have a series of golang tests using a local pool with concurrency of 1. It seems that the worker state is leaking between unit tests nondeterministically, and I'm struggling to find in the docs or source code where this could be happening. We initialize a struct called Worker that contains a faktory_worker_go.Manager instance, and call worker.manager.Register(job_name, worker.JobFunc) for all of our job handlers. Inside the handlers, the reference to the worker struct sometimes does not contain variables from the current test, and instead contains old variables from previous tests. Any ideas why?
https://github.com/contribsys/faktory/blob/master/server/commands.go#L57
Setting this in the server creates a problem wherein Go-based clients cannot set the retry count to 0: the use of omitempty in the definition of Job will result in any field whose value is the default for that type being omitted. Since 0 is the default value for int in Go, the JSON marshaller omits the field when this is set to zero.
You could address this by making the field a *int, but that will result in compile errors for existing Go clients that attempt to set this value. You could also address this by removing the omitempty declaration, but that will break clients silently if they construct a Job without going through faktory.NewJob and don't set this field properly.
Regardless, I'm now stuck in a situation where my only option is to use -1 and thus send jobs to the dead queue, when I'd prefer they get discarded. Any suggestions here?
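The omitempty behavior described here can be reproduced with encoding/json alone. The two structs below are hypothetical stand-ins for the relevant part of a job payload, not the real Job definition:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// jobWithInt uses a plain int: a zero value is indistinguishable from "unset".
type jobWithInt struct {
	Retry int `json:"retry,omitempty"`
}

// jobWithPtr uses a pointer: nil means "unset", a pointer to 0 means zero.
type jobWithPtr struct {
	Retry *int `json:"retry,omitempty"`
}

func encode(v interface{}) string {
	b, err := json.Marshal(v)
	if err != nil {
		panic(err)
	}
	return string(b)
}

func main() {
	zero := 0
	fmt.Println(encode(jobWithInt{Retry: 0}))     // {}
	fmt.Println(encode(jobWithPtr{Retry: &zero})) // {"retry":0}
	fmt.Println(encode(jobWithPtr{}))             // {}
}
```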
Hey Mike, sorry for bothering you so much.
I have a question: is there a way of sending a struct to a job?
import (
faktory "github.com/contribsys/faktory/client"
)
type Potato struct {
id string
name string
}
potato := &Potato{"123", "Go potato!!"}
client, err := faktory.Open()
job := faktory.NewJob("SomeJob", potato)
err = client.Push(job)
I'm not sure if I have to use job.SetCustom("potato", potato) or not... If I have to do it, I don't know how to get potato from the job. 🤕
thank you very much!
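Assuming job arguments travel as JSON, two things follow: the struct needs exported fields (encoding/json skips unexported ones such as id and name above), and the handler would receive a generic map rather than a Potato. A sketch of that round trip using only the standard library; roundTrip and potatoFromArg are hypothetical helpers and no Faktory client is involved:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Potato has exported fields so encoding/json can see them.
type Potato struct {
	ID   string `json:"id"`
	Name string `json:"name"`
}

// roundTrip mimics what happens to job args on the wire: encoded to JSON
// by the producer and decoded generically by the consumer.
func roundTrip(args ...interface{}) ([]interface{}, error) {
	raw, err := json.Marshal(args)
	if err != nil {
		return nil, err
	}
	var out []interface{}
	err = json.Unmarshal(raw, &out)
	return out, err
}

// potatoFromArg rebuilds a Potato from a decoded argument.
func potatoFromArg(arg interface{}) (Potato, error) {
	var p Potato
	raw, err := json.Marshal(arg)
	if err != nil {
		return p, err
	}
	err = json.Unmarshal(raw, &p)
	return p, err
}

func main() {
	args, err := roundTrip(&Potato{ID: "123", Name: "Go potato!!"})
	if err != nil {
		panic(err)
	}
	p, err := potatoFromArg(args[0])
	if err != nil {
		panic(err)
	}
	fmt.Printf("%+v\n", p) // {ID:123 Name:Go potato!!}
}
```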
Is there any way to get the number of jobs that are in the Enqueued status? I know it can be done from the /info endpoint, but can it be added to the log so it can be scraped? Any other suggestions?
Thanks
As of today, there's no way to know when a Job is about to die besides taking a look on Faktory's web UI Morgue view.
Sidekiq offers a sidekiq_retries_exhausted hook to handle this. It would be nice if Faktory offered something similar so we can handle those cases programmatically.
Hello All,
I have a health check function in my web service and need to check whether the Faktory worker is running normally and able to fetch jobs.
Is there any way or method to check this?
Currently, the connections value for FWG is used in a way that guarantees each connection will sit and block waiting for a job, then dispatch the work in its corresponding goroutine. This approach is simple, and effective -- to a point. We've found that as we increase the number of connections to Faktory, the load on Redis (or within Faktory -- we haven't profiled it very closely yet) becomes a problem with many connections all waiting for jobs to perform. In our situation, we have a concurrency of 64 per node (which leaves our nodes pretty massively underutilized at the moment), and 8 nodes. Somewhere between 26-32 nodes, the contention around waiting for jobs becomes a significant issue that impacts the ability of other processes to add jobs to the queue.
We'd really like to be able to move to a concurrency of more like 1,024 per node, because we have a workload amenable to very wide-fan-out, with low per-job resource consumption (it's mostly I/O bound waiting on external APIs).
It seems to me that coupling fetching concurrency with dispatch concurrency makes it hard to address what are naturally separate bottlenecks. The right concurrency level (both per-node and aggregate across all nodes) is a separate question from the resource-intensiveness and latency of individual jobs which is a separate question from the right number of processes to poll for jobs simultaneously to ensure that available work capacity isn't starved.
My prototyping suggests we could decouple the process of fetching jobs / reporting results from the process of dispatching them quite effectively, leading to a simple means to let people address these two needs separately while only adding one more (optional) knob for them to turn. This matters most for those of us whose jobs have a high latency (and perhaps low resource consumption) but come in large bursts that call for high concurrency (per-node if the per-job resource consumption is small, and aggregate across all nodes when the per-job resource consumption is high).
There would be some added complexity in FWG, particularly around the shutdown process but my prototyping suggests this can be done in a way where the new knob is optional and with a relatively tightly compartmentalized increase in code complexity. This would make it largely transparent for most users, while providing an important capability for those of us pushing tens of millions of jobs a day and looking to increase that considerably.
Worst-case, we could move this into a separate run method in the same package, making the structural differences completely opt-in for those who have a need for such decoupling.
Thoughts?
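To make the proposal concrete, here is a rough in-process sketch of the decoupling: a few fetcher goroutines feed a channel that many worker goroutines drain. It is not the prototype mentioned above; run and demo are hypothetical names, and a plain channel stands in for the blocking fetch from Faktory.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// run uses a small number of fetchers to pull jobs from source and a larger
// number of workers to execute them. It returns how many jobs were processed.
func run(source <-chan int, fetchers, workers int, perform func(int)) int64 {
	jobs := make(chan int, workers) // hand-off buffer between the two stages
	var processed int64

	var fetchWG sync.WaitGroup
	for i := 0; i < fetchers; i++ {
		fetchWG.Add(1)
		go func() {
			defer fetchWG.Done()
			for j := range source { // stands in for a blocking fetch
				jobs <- j
			}
		}()
	}

	var workWG sync.WaitGroup
	for i := 0; i < workers; i++ {
		workWG.Add(1)
		go func() {
			defer workWG.Done()
			for j := range jobs {
				perform(j)
				atomic.AddInt64(&processed, 1)
			}
		}()
	}

	fetchWG.Wait() // source drained
	close(jobs)
	workWG.Wait()
	return processed
}

// demo feeds n jobs through run and returns the processed count.
func demo(n, fetchers, workers int) int64 {
	source := make(chan int)
	go func() {
		for i := 0; i < n; i++ {
			source <- i
		}
		close(source)
	}()
	return run(source, fetchers, workers, func(int) {})
}

func main() {
	fmt.Println("processed:", demo(1000, 2, 64)) // processed: 1000
}
```

The two counts (fetchers, workers) are the separate knobs: only the fetchers would hold connections that block waiting for work.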
Hello @mperham, I'm using your lib but the released version has a bug.
➜ go get -u github.com/contribsys/faktory_worker_go
# github.com/contribsys/faktory_worker_go
../../golang/pkg/mod/github.com/contribsys/[email protected]/runner.go:190:24: too many arguments in call to c.Beat
have (string)
want ()
The runner.go calls client.Beat() and sends the current state mgr.CurrentState() (lines 189 to 190 in 17dd456).
This version requires faktory v1.2.0-1 (lines 5 to 8 in 17dd456), which doesn't have a Beat() method that receives any argument.
Would it be possible to release a new version to get this fixed? I was using the master branch in development but I don't want to do the same for production.
Thank you very much!
Motivation
Today I started playing with Faktory, and it's super awesome, with a slick UI ;)
But sadly I can't find any documentation/wiki regarding retry.
Can we add more cases where we have:
return errors.New("some msg")
for that? Ignore this if you have more important priorities to deal with.
I'm having an issue running one worker per process.
Everything is local and I have a faktory server running on :7419.
I have a workflow consisting of AMQP message queues and Faktory jobs. AMQP messages are consumed and packaged into a faktory job, which then produces output on another message queue.
I have it set up such that each job produce/worker is run in its own process, with AMQP message queues and Faktory jobs as a means of communication.
If only one worker process for JOB_A is running, it receives jobs instantaneously. If however I run my JOB_B worker process, they both receive jobs intermittently. Looking at the retry queue, I see that the jobs not processed have an error message: unknown: No handler for JOB_A. I would say around 30-50% of jobs need to be retried, and eventually they process successfully within 5 retries. As soon as I turn off JOB_B, everything is back to normal.
In each process for JOB_A and JOB_B, because they are different processes, the following is run for each process:
mgr := faktory_worker_go.NewManager()
mgr.Register("JOB_*", fn)
mgr.Run()
If however I refactor my workers to run in a single process, calling Register twice and Run once, everything runs very smoothly:
mgr := faktory_worker_go.NewManager()
mgr.Register("JOB_A", fn_A)
mgr.Register("JOB_B", fn_B)
mgr.Run()
Is it not intended to run one worker per process?
I would really prefer to logically separate independent jobs/workers into new processes.
I want to add a daily job that calls a method every day of the year. How can I add it?
job := faktory.NewJob("SomeJob", 1, 2, 3)
job.Queue = "default"
job.At = "2018-09-14T00:00:00.000000Z"
err = client.Push(job)
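A job pushed this way runs once. One hedged approach to a daily schedule is for each run to enqueue its successor with At set to the next day's timestamp. The helper below only computes that timestamp and does not touch the Faktory client; nextMidnightUTC is a hypothetical name.

```go
package main

import (
	"fmt"
	"time"
)

// nextMidnightUTC returns the first midnight (UTC) strictly after now,
// formatted as an RFC 3339 timestamp.
func nextMidnightUTC(now time.Time) string {
	n := now.UTC()
	// time.Date normalizes day overflow, so Day()+1 rolls over months and years.
	next := time.Date(n.Year(), n.Month(), n.Day()+1, 0, 0, 0, 0, time.UTC)
	return next.Format(time.RFC3339Nano)
}

// example evaluates nextMidnightUTC for a fixed instant.
func example() string {
	now := time.Date(2018, 9, 13, 15, 4, 5, 0, time.UTC)
	return nextMidnightUTC(now)
}

func main() {
	fmt.Println(example()) // 2018-09-14T00:00:00Z
}
```

Inside the job handler, the result would be assigned to job.At on the follow-up job before pushing it.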
I recently faced an issue trying to go get the faktory client with go modules.
Go version
go version go1.11.1 darwin/amd64
export GO111MODULE=on
go get github.com/contribsys/faktory/client
go get -x -v github.com/contribsys/faktory/client
# /Users/kuro/go/pkg/mod/cache/vcs/d50afae69b1c3619314315ef9389bb8370fcf9f95d50001693c0adc2a96c1ae7 for git2 https://github.com/contribsys/faktory
cd /Users/kuro/go/pkg/mod/cache/vcs/d50afae69b1c3619314315ef9389bb8370fcf9f95d50001693c0adc2a96c1ae7; git ls-remote -q https://github.com/contribsys/faktory
3.405s # cd /Users/kuro/go/pkg/mod/cache/vcs/d50afae69b1c3619314315ef9389bb8370fcf9f95d50001693c0adc2a96c1ae7; git ls-remote -q https://github.com/contribsys/faktory
go: finding github.com/contribsys/faktory/client latest
cd /Users/kuro/go/pkg/mod/cache/vcs/d50afae69b1c3619314315ef9389bb8370fcf9f95d50001693c0adc2a96c1ae7; git -c log.showsignature=false log -n1 '--format=format:%H %ct %D' 2dcf6cf85e74b86f1157310d300fbe7c07eaa14b
0.011s # cd /Users/kuro/go/pkg/mod/cache/vcs/d50afae69b1c3619314315ef9389bb8370fcf9f95d50001693c0adc2a96c1ae7; git -c log.showsignature=false log -n1 '--format=format:%H %ct %D' 2dcf6cf85e74b86f1157310d300fbe7c07eaa14b
cd /Users/kuro/go/pkg/mod/cache/vcs/d50afae69b1c3619314315ef9389bb8370fcf9f95d50001693c0adc2a96c1ae7; git tag -l
0.010s # cd /Users/kuro/go/pkg/mod/cache/vcs/d50afae69b1c3619314315ef9389bb8370fcf9f95d50001693c0adc2a96c1ae7; git tag -l
cd /Users/kuro/go/pkg/mod/cache/vcs/d50afae69b1c3619314315ef9389bb8370fcf9f95d50001693c0adc2a96c1ae7; git -c log.showsignature=false log -n1 '--format=format:%H %ct %D' 2dcf6cf85e74
0.012s # cd /Users/kuro/go/pkg/mod/cache/vcs/d50afae69b1c3619314315ef9389bb8370fcf9f95d50001693c0adc2a96c1ae7; git -c log.showsignature=false log -n1 '--format=format:%H %ct %D' 2dcf6cf85e74
cd /Users/kuro/go/pkg/mod/cache/vcs/d50afae69b1c3619314315ef9389bb8370fcf9f95d50001693c0adc2a96c1ae7; git cat-file blob 2dcf6cf85e74b86f1157310d300fbe7c07eaa14b:client/go.mod
0.010s # cd /Users/kuro/go/pkg/mod/cache/vcs/d50afae69b1c3619314315ef9389bb8370fcf9f95d50001693c0adc2a96c1ae7; git cat-file blob 2dcf6cf85e74b86f1157310d300fbe7c07eaa14b:client/go.mod
Fetching https://github.com?go-get=1
Parsing meta tags from https://github.com?go-get=1 (status code 200)
go get github.com/contribsys/faktory/client: no matching versions for query "latest"
I realized I was getting the correct files on running go get github.com/contribsys/faktory instead.
This import mismatch causes import errors with cannot find package "github.com/contribsys/faktory/client" when using the faktory_worker_go package.
I noticed this issue only happens when I use go modules.
There is an inconsistency between this repository and contribsys/faktory in regards to the batch id. This repository expects _bid, according to this line in context.go, but contribsys/faktory only sets bid when you push jobs to the batch.
I'm currently working around this by setting _bid manually, but I imagine this is not the intention since this is not added in the test/example file.
I'm not sure this is the correct repository to create this issue, but I figured it's a start.
I tried to look at the Ruby client implementation and it looks like that one is using bid, but the documentation states that custom metadata elements starting with '_' are reserved for internal use.
One additional note: it might be wise to include the batch mechanism in the specifications to avoid this type of inconsistency in the future.
The worker doesn't respond to shutdown/quiet actions from the server. https://github.com/contribsys/faktory_worker_go/blob/master/runner.go#L140 . Here the server is returning a JSON response of {"state":"quiet"}, but the client is checking against plain text. It might need JSON parsing. I can send a PR with the fix.
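A sketch of what the JSON parsing could look like, assuming the heartbeat response is either a plain OK or a JSON object with a state field as quoted above. beatState is a hypothetical helper, not the function in runner.go.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// beatState extracts the requested state from a heartbeat response.
// A plain "OK" is treated as "no state change" (an assumption); anything
// else is expected to be JSON such as {"state":"quiet"}.
func beatState(resp string) (string, error) {
	if resp == "OK" {
		return "", nil
	}
	var payload struct {
		State string `json:"state"`
	}
	if err := json.Unmarshal([]byte(resp), &payload); err != nil {
		return "", fmt.Errorf("unexpected heartbeat response %q: %w", resp, err)
	}
	return payload.State, nil
}

func main() {
	state, err := beatState(`{"state":"quiet"}`)
	fmt.Println(state, err) // quiet <nil>
}
```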
fwg's middleware is meant to provide a Context that the user can use to setup contextual data for the given job:
func SetupContext(ctx worker.Context, job *faktory.Job) error {
aid := job.GetCustom("account_id")
uid := job.GetCustom("user_id")
ctx.WithValue("aid", aid).WithValue("uid", uid)
return nil
}
mgr.Use(SetupContext)
But WithValue returns a copy of the context and it's impossible to pass on the new copy to the job execution. We'll likely need to adjust the Middleware API to something like this:
func SetupContext(ctx worker.Context, job *faktory.Job) (Context, error) {
...
return ctx.WithValue("aid", aid).WithValue("uid", uid), nil
}
so that modified Contexts can be passed along the chain.
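The copy semantics can be seen with the standard context package, used here as a stand-in for worker.Context. discard and derive are hypothetical middleware shapes mirroring the current and proposed signatures:

```go
package main

import (
	"context"
	"fmt"
)

type ctxKey string

// discard mimics a middleware that cannot hand the derived context back.
func discard(ctx context.Context) error {
	_ = context.WithValue(ctx, ctxKey("aid"), 42) // the copy is dropped
	return nil
}

// derive mimics the proposed API: the derived context is returned.
func derive(ctx context.Context) (context.Context, error) {
	return context.WithValue(ctx, ctxKey("aid"), 42), nil
}

// lookup runs both styles and reports what a handler would see.
func lookup() (afterDiscard, afterDerive interface{}) {
	base := context.Background()
	_ = discard(base)
	afterDiscard = base.Value(ctxKey("aid"))

	next, _ := derive(base)
	afterDerive = next.Value(ctxKey("aid"))
	return
}

func main() {
	a, b := lookup()
	fmt.Println(a, b) // <nil> 42
}
```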
We started seeing this out of the blue last night:
error enqueueing update-profile job: error enqueueing job: short write
…cessors.(*Processor).HandleError (/opt/reward-service/processors/processor.go:34)
…lReceiptProcessor).Process (/opt/reward-service/processors/digital_receipt.go:230)
….func1 (/go/pkg/mod/github.com/contribsys/[email protected]/manager.go:47)
…tch (/go/pkg/mod/github.com/contribsys/[email protected]/middleware.go:19)
…cessOne (/go/pkg/mod/github.com/contribsys/[email protected]/runner.go:139)
…process (/go/pkg/mod/github.com/contribsys/[email protected]/runner.go:95)
runtime.goexit (/usr/local/go/src/runtime/asm_amd64.s:1581)
The code in question:
func enqueueUpdateProfileJob(newRelicTxn *newrelic.Transaction, receiptID string, jobClient jobs.Client) error {
defer newRelicTxn.StartSegment("enqueueUpdateProfileJob").End()
jobArgument := make(map[string]interface{})
jobArgument["id"] = receiptID
jobArgs := []interface{}{jobArgument}
retry := 5
updateProfileJob := faktory.NewJob("update-profile", jobArgs...)
updateProfileJob.Queue = "profiles"
updateProfileJob.ReserveFor = 60
updateProfileJob.Retry = &retry
jpperr := jobClient.Push(updateProfileJob)
if jpperr != nil {
return fmt.Errorf("error enqueueing job: %v", jpperr)
}
return nil
}
Add test helper(s) so perform functions can be called easily from a test suite.
Because there is no version tag, go mod takes the last commit from master.
This caused our CI pipeline to break with the following error:
/root/go/src/github.com/contribsys/faktory_worker_go/runner.go:289:9: undefined: client.Batch
After commit: f4d3451
Would it make sense to create version tags?
If anyone else has this issue, you can put github.com/contribsys/faktory_worker_go v0.0.0-20191008193933-9f13abade33b in go.mod to get the stable version.
func (mgr *Manager) Run() currently does not return; instead, it calls func (mgr *Manager) Terminate(), which in turn calls os.Exit(). I was trying to embed the worker logic with my own goroutines, and defers are required for various operations, but os.Exit() breaks them.
Is there any specific reason why Run() does not return? If not, can we change it to return, or at least make it configurable to do so?
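For illustration, a run loop that returns on cancellation lets the caller's defers execute, which os.Exit() would skip. This is a standalone sketch; run and serve are hypothetical and do not reflect the Manager API.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// run blocks until ctx is cancelled, then returns to the caller
// rather than calling os.Exit.
func run(ctx context.Context) error {
	<-ctx.Done()
	return ctx.Err()
}

// serve starts run, cancels it after stopAfterMillis, and reports whether
// the deferred cleanup executed.
func serve(stopAfterMillis int) (cleaned bool) {
	ctx, cancel := context.WithCancel(context.Background())
	time.AfterFunc(time.Duration(stopAfterMillis)*time.Millisecond, cancel)

	func() {
		defer func() { cleaned = true }() // e.g. close a database handle
		_ = run(ctx)
	}()
	return cleaned
}

func main() {
	fmt.Println("cleanup ran:", serve(10)) // cleanup ran: true
}
```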
Sidekiq's server middleware is very useful for cross-cutting concerns. Provide a similar structure for the Go worker?
Sorry to ask this question, but as a newbie just want to make sure I can implement the worker funcs properly.
Questions:
mgr.Concurrency = 20
mgr.ProcessStrictPriorityQueues("critical", "default", "bulk")
Thank you for your time and patience.
Yesterday I added graceful shutdown to k8s pods that are running faktory_worker_go.
As expected, mgr.Run() waits for all running job processes to complete after receiving the TERM signal and then calls os.Exit(). However, I have noticed that the final job that was completed before the pods exit would stay as 'Busy'. It seems that the os.Exit() is being called before that final ACK message is sent to faktory to mark the job as being processed.
This issue is somewhat related to: #44 and I do relate to the notion expressed there. It would be nice to run some cleanup code before os.Exit() is called - currently the worker's database dependency does not get closed properly.
Thanks for this wonderful package. I've been using it for months now and have been loving it!
Integrate Go's logging best practices. We probably shouldn't be using faktory/util's logging API but rather allow the app developer to give us a logging instance to use.
Hello
My application is running in a separate docker container.
Both the faktory and my app container are added to the same docker network.
The hostname of my faktory container is "faktory"
I can ping the "faktory" from my app container, so they can see each other.
But how do I have my worker connect to the "faktory" hostname instead of localhost?
I do not see a way to configure this?
Thank you
Cheers
Gerd
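To my knowledge the Faktory Go client reads the server location from the FAKTORY_URL environment variable, defaulting to localhost port 7419; please treat that as an assumption to verify against the client documentation. Under that assumption, setting FAKTORY_URL=tcp://faktory:7419 on the app container would be enough. The sketch below only shows how such a value resolves to a host and port; addrFor is a hypothetical helper, not client code.

```go
package main

import (
	"fmt"
	"net/url"
	"os"
)

const defaultURL = "tcp://localhost:7419"

// addrFor returns the host:port part of a Faktory-style URL,
// falling back to the default when raw is empty.
func addrFor(raw string) (string, error) {
	if raw == "" {
		raw = defaultURL
	}
	u, err := url.Parse(raw)
	if err != nil {
		return "", err
	}
	return u.Host, nil
}

func main() {
	// Normally set outside the program, e.g. in the container environment.
	os.Setenv("FAKTORY_URL", "tcp://faktory:7419")

	addr, err := addrFor(os.Getenv("FAKTORY_URL"))
	if err != nil {
		panic(err)
	}
	fmt.Println("worker would dial", addr) // worker would dial faktory:7419
}
```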
I want to confirm whether I can use faktory_worker for my use case.
My use case
Let's suppose
we have n customers c1,c2,....,cn
we have m updates for c1 and k updates for c2
All the updates come at the same time
I want to have this workflow in place
I need one worker per customer and all the updates for the respective customer should be synchronous. But updates for multiple customers can be asynchronous.
So something like
w1 - worker for customer c1
w2 - worker for customer c2
[ c1 ] [ c2 ] ........ [ cn ]
[ u1,u2,...um] [u1,u2,...uk]
c1 and c2 can be updated in parallel but updates for each customer should be synchronous.
Can I achieve this case? If yes can you please guide me on how?
If no, is there any alternative for it?
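The ordering requirement itself (serial per customer, parallel across customers) can be sketched in-process with one goroutine and channel per customer. This says nothing about how to map it onto Faktory queues; update and process are hypothetical names.

```go
package main

import (
	"fmt"
	"sync"
)

type update struct {
	Customer string
	Seq      int
}

// process applies updates so that each customer's updates run in arrival
// order on a dedicated goroutine, while different customers run in parallel.
// It returns the order in which updates were applied per customer.
func process(updates []update) map[string][]int {
	lanes := make(map[string]chan update)
	applied := make(map[string][]int)
	var mu sync.Mutex
	var wg sync.WaitGroup

	for _, u := range updates {
		lane, ok := lanes[u.Customer]
		if !ok {
			// Buffer sized so that sends below never block.
			lane = make(chan update, len(updates))
			lanes[u.Customer] = lane
			wg.Add(1)
			go func(in <-chan update) {
				defer wg.Done()
				for next := range in {
					mu.Lock()
					applied[next.Customer] = append(applied[next.Customer], next.Seq)
					mu.Unlock()
				}
			}(lane)
		}
		lane <- u
	}
	for _, lane := range lanes {
		close(lane)
	}
	wg.Wait()
	return applied
}

func main() {
	got := process([]update{
		{"c1", 1}, {"c2", 1}, {"c1", 2}, {"c2", 2}, {"c1", 3},
	})
	fmt.Println(got["c1"], got["c2"]) // [1 2 3] [1 2]
}
```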
Hi there.
I'm using a fake TCP server as Faktory does, just to make my tests green but now I need my jobs running "for real".
Is there such a thing as "Faktory's inline mode"? 😄
Thank you very much!
This code will not compile on Windows (1.3.0-1):
faktory_worker_go/runner_windows.go
Lines 17 to 20 in de58a75
But this matches the rest of the package:
signalMap = map[os.Signal]string{
SIGTERM: "terminate",
SIGINT: "terminate",
}
I have a job processing function that needs to enqueue another job when it's done with its work. As noted in the FAQ, I'm using the helper.With() function to get a handle to a faktory.Client from the pool to push the new job. For example:
func (p *MyProcessor) Process(ctx context.Context, args ...interface{}) error {
helper := worker.HelperFor(ctx)
// .... do my work ....
// enqueue a job for post-processing
ppjerr := helper.With(func(cl *faktory.Client) error {
jobArgument := make(map[string]interface{})
jobArgument["id"] = theID
jobArgs := []interface{}{jobArgument}
postProcessingJob := faktory.NewJob("post-processing", jobArgs...)
return cl.Push(postProcessingJob)
})
if ppjerr != nil {
fmt.Printf("post processing job error: %+v\n", ppjerr)
return ppjerr
}
return nil
}
In the unit tests for MyProcessor, I'm using the worker.NewTestExecutor(pool) function to send test jobs to MyProcessor. For example (test cruft removed):
// test setup
pool, _ := faktory.NewPool(2)
var executor worker.PerformExecutor = worker.NewTestExecutor(pool)
var jobArgument map[string]interface{} = make(map[string]interface{})
jobArgument["id"] = "1234567890"
jobArgs := []interface{}{jobArgument}
var job *faktory.Job = faktory.NewJob("MyProcessor", jobArgs...)
// run processor
err := executor.Execute(job, myProcessor.Process)
// test assertions....
The enqueueing of the second job during the test is returning an error because it appears as though it's attempting to connect to Faktory to push the job, despite the fact I'm using the test executor.
post processing job job error: dial tcp [::1]:7419: connect: connection refused
How exactly do I unit test processor functions which enqueue another job without having it actually try to connect to Faktory? The structure of the code makes it awkward (impossible?) to stub/mock the faktory.Client needed to push the job.
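One common Go pattern for this, shown here as a sketch rather than a feature of faktory_worker_go, is to have the processor depend on a narrow interface and inject a fake in tests. Job, Pusher and fakePusher are hypothetical stand-ins, not types from the faktory client.

```go
package main

import (
	"errors"
	"fmt"
)

// Job and Pusher are hypothetical stand-ins for the client types.
type Job struct {
	Type string
	Args []interface{}
}

type Pusher interface {
	Push(job *Job) error
}

// MyProcessor depends on the Pusher interface rather than a concrete
// client, so tests can substitute a fake.
type MyProcessor struct {
	Jobs Pusher
}

func (p *MyProcessor) Process(id string) error {
	// .... do the work ....
	return p.Jobs.Push(&Job{
		Type: "post-processing",
		Args: []interface{}{map[string]interface{}{"id": id}},
	})
}

// fakePusher records pushed jobs in memory instead of dialing a server.
type fakePusher struct {
	pushed []*Job
	err    error
}

func (f *fakePusher) Push(job *Job) error {
	if f.err != nil {
		return f.err
	}
	f.pushed = append(f.pushed, job)
	return nil
}

func main() {
	fake := &fakePusher{}
	p := &MyProcessor{Jobs: fake}
	if err := p.Process("1234567890"); err != nil {
		panic(err)
	}
	fmt.Println(len(fake.pushed), fake.pushed[0].Type) // 1 post-processing

	p.Jobs = &fakePusher{err: errors.New("boom")}
	fmt.Println(p.Process("x")) // boom
}
```

In production the Pusher would be a thin adapter around helper.With() and cl.Push(); the test then asserts on fake.pushed without any network access.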
Need more testing of various scenarios and better coverage. Help welcome!
What is an End of File error?
When I run the test/main.go file of this repo, I get an EOF error in the produce func.
I couldn't find any documentation on how to test/mock pool connections, getting a client from the pool, registering a mock handler, adding a new job etc. I am wondering what you think about this pattern, and if it makes sense:
mgr := worker.NewManager()
mgr.ProcessStrictPriorityQueues("fwgtest")
mgr.Concurrency = 1
pool, _ := faktory.NewPool(5)
faktory.RandomProcessWid = mgr.ProcessWID
mgr.Pool = pool
mgr.Register("aworker", func(ctx context.Context, args ...interface{}) error {
return nil
})
j := faktory.NewJob("something", 1, 2)
perf := worker.NewTestExecutor(pool)
err := perf.Execute(j, func(ctx context.Context, args ...interface{}) error {
return nil
})
suite.NoError(err)
This doesn't work, I am just playing around and trying to figure out how to mock these guys. This is mainly for travis, and if mocking isn't possible I think I might have to install faktory and redis on the travis node.
While looking in our faktory worker logs, the log was filled with these two log statements over and over again, printed every second:
2022/09/06 12:39:51.608495 ERR Job not found 148ec8b7-8417-45e9-bbc0-66f7a1a4d7e0
2022/09/06 12:39:51.608541 ERR Job not found 9e286640-ec97-47f2-9a49-d50c5e6ed0a2
2022/09/06 12:39:52.609932 ERR Job not found 148ec8b7-8417-45e9-bbc0-66f7a1a4d7e0
2022/09/06 12:39:52.610081 ERR Job not found 9e286640-ec97-47f2-9a49-d50c5e6ed0a2
2022/09/06 12:39:53.611083 ERR Job not found 9e286640-ec97-47f2-9a49-d50c5e6ed0a2
2022/09/06 12:39:53.611083 ERR Job not found 148ec8b7-8417-45e9-bbc0-66f7a1a4d7e0
2022/09/06 12:39:54.612050 ERR Job not found 148ec8b7-8417-45e9-bbc0-66f7a1a4d7e0
2022/09/06 12:39:54.612227 ERR Job not found 9e286640-ec97-47f2-9a49-d50c5e6ed0a2
2022/09/06 12:39:55.613489 ERR Job not found 148ec8b7-8417-45e9-bbc0-66f7a1a4d7e0
2022/09/06 12:39:55.613531 ERR Job not found 9e286640-ec97-47f2-9a49-d50c5e6ed0a2
........etc........
I searched for this error message in our code and did not find anything, so I assume this is coming from Faktory itself. What does this mean and how do I get it to stop spamming our logs?
When a job fails, the log is incorrectly formatted like this: Error running %s job %s: %v my_job atjFoe0nfoIw5C9x my error
Caused by line 131 in b22d62e.
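The symptom matches a format string being passed to a Println-style function instead of a Printf-style one. A small reproduction using fmt directly; printlnStyle and printfStyle are hypothetical wrappers, not the logger used by the library:

```go
package main

import (
	"fmt"
	"strings"
)

// format is a variable rather than a constant only to keep the demo vet-clean.
var format = "Error running %s job %s: %v"

// printlnStyle joins its operands with spaces; verbs in the first operand
// stay literal.
func printlnStyle(args ...interface{}) string {
	return strings.TrimSuffix(fmt.Sprintln(args...), "\n")
}

// printfStyle expands the verbs in the format string.
func printfStyle(f string, args ...interface{}) string {
	return fmt.Sprintf(f, args...)
}

func main() {
	fmt.Println(printlnStyle(format, "my_job", "atjFoe0nfoIw5C9x", "my error"))
	// Error running %s job %s: %v my_job atjFoe0nfoIw5C9x my error
	fmt.Println(printfStyle(format, "my_job", "atjFoe0nfoIw5C9x", "my error"))
	// Error running my_job job atjFoe0nfoIw5C9x: my error
}
```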
I recently integrated Faktory into my Go application. I initialized a worker manager with the job type and the Perform function as shown below:
mgr.Register("TicketGeneration", ReceiveJobs)
In the "ReceiveJobs" function I only get access to the "args". However, I am not able to fetch the "Job". From the context I'm only able to fetch the "Helper" object.
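func ReceiveJobs(ctx context.Context, args ...interface{}) error {
help := worker.HelperFor(ctx)
log.Printf("working on job %s", help.Jid()) // the helper exposes the JID, not the Job itself
return nil
}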
Jid() string
JobType() string
Bid() string
Batch(func(*client.Batch) error) error
With(func(*client.Client) error) error
TrackProgress(percent int, desc string, reserveUntil *time.Time) error
Is there a way that I can use the "Jid" from the helper object to fetch the job object ?