taylorchu / work Goto Github PK
View Code? Open in Web Editor NEWgocraft/work v2 prototype
License: MIT License
gocraft/work v2 prototype
License: MIT License
Dear devs, currently I'm using v1, but to use Redis Cluster, we're trying to change our package from v1 to v2. And I wonder what value should I use for QueueID. Also, what's the purpose of QueueID?
Many thanks for developing this new version of work.
*** Issue Solved ***
Enqueuer:
package main
import (
"fmt"
"github.com/go-redis/redis"
"github.com/taylorchu/work"
)
type message struct {
Text string
}
func main() {
client := newRedisClient()
defer client.Close()
job := work.NewJob()
err := job.MarshalPayload(message{Text: "hello"})
if (err != nil) {
panic(err)
}
err = work.NewRedisQueue(client).Enqueue(job, &work.EnqueueOptions{
Namespace: "ns1",
QueueID: "worker",
})
if (err != nil) {
panic(err)
}
fmt.Println("job submitted")
}
func newRedisClient() *redis.Client {
return redis.NewClient(&redis.Options{
Addr: "127.0.0.1:6379",
PoolSize: 10,
MinIdleConns: 10,
})
}
Worker:
package main
import (
"fmt"
"os"
"os/signal"
"time"
"github.com/go-redis/redis"
"github.com/taylorchu/work"
)
type message struct {
Text string
}
func main() {
client := newRedisClient()
defer client.Close()
w := work.NewWorker(&work.WorkerOptions{
Namespace: "ns1",
Queue: work.NewRedisQueue(client),
})
err := w.Register("worker",
func(job *work.Job, _ *work.DequeueOptions) error {
var msg message
job.UnmarshalPayload(&msg)
fmt.Println(msg)
return nil
},
&work.JobOptions{
MaxExecutionTime: time.Second,
IdleWait: time.Second,
NumGoroutines: 2,
},
)
if (err != nil) {
panic(err)
}
fmt.Println("starting")
w.Start()
// Wait for a signal to quit:
signalChan := make(chan os.Signal, 1)
signal.Notify(signalChan, os.Interrupt, os.Kill)
<-signalChan
fmt.Println("stopped")
w.Stop()
}
func newRedisClient() *redis.Client {
return redis.NewClient(&redis.Options{
Addr: "127.0.0.1:6379",
PoolSize: 10,
MinIdleConns: 10,
})
}
This line does not work -
Line 58 in 17fd835
As nothing is ever done when context deadline is exceeded. Perhaps move this logic to worker loop?
I have a requirement to support changing the Delay of a previous enqueued job and just removing it completely.
Not sure if I've missed something but doesn't seem possible.
So the only way to get latest version is to point to specific commit.
Is this backward compatible with v1? If not, is there a timeline for this? Thanks!
No new release has been pushed since 19 Sep. 2019 even though you've been busy with tweaks and fixes. Any chance you can tag the current master?
One of the biggest reasons we used https://github.com/gocraft/work is that it has the ability in the UI to retry jobs manually that have exhausted their automatic retries. Given that this project has moved the UI solution to prometheus/grafana, are there any plans for covering the need to retry dead jobs manually?
Hey guys do you still support the periodic jobs?
I managed to use the schedule job at runtime, but I was not able to set up a periodic job
I was wondering, does this repository still have the reaper element from gocraft/work? As in, what happens to jobs that are stuck?
The README doesn't really explain what from v1 still exists which makes it hard to refactor my codebase (since I don't need what I have to build myself and what is already done by this repo).
Hi,
How to use the unique enqueue middleware (and other EnqueueMiddleware) with redis queue? I can't find it in any source codes or docs :|
Within the same queue:
Test in #67 demonstrates the issue.
can you add some examples and docs, so for the people who're interested on this project can get started on this without digging into source code or godoc ....
i'm evaluating this, i can contribute some examples ...
What do you think about merging this?
I've been using Work v1 and watching this repo - has v2 been abandoned? The original repo has much more activity to date.
I ended up creating middleware for catching panics similar to how you are doing it worker.go
I am using logger middleware and it was not getting the panic error message because it was happening outside of the middleware process.
Single job is inserted, however, multiple deques are executed. What should I do to prevent multiple dequeueing?
var testNamespace = "this-is-test-namespace"
var testQueueID = "this-is-test-queue-id"
func workFunc(job *work.Job, jobOpts *work.DequeueOptions) error {
var args map[string]interface{}
if err := job.UnmarshalPayload(&args); err != nil {
return err
}
contractAddr := ""
ok := false
if contractAddrFromMsg, exist := args[contractAddrKey]; exist {
if contractAddr, ok = contractAddrFromMsg.(string); !ok {
return fmt.Errorf("aadsada")
}
}
if contractAddr == "" {
return fmt.Errorf("empty contract addr")
}
fmt.Println("successful deque")
time.Sleep(500 * time.Millisecond)
fmt.Println("successful return")
return nil
}
func TestTaylorChuWork(t *testing.T) {
m, err := miniredis.Run()
if err != nil {
t.Fatal("Failed to start", "err", err)
}
uc := redis.NewUniversalClient(&redis.UniversalOptions{
Addrs: []string{m.Addr()},
})
queue := work.NewRedisQueue(uc)
worker := work.NewWorker(&work.WorkerOptions{
Namespace: testNamespace,
Queue: queue,
})
err = worker.Register(
testQueueID,
workFunc,
&work.JobOptions{
//WorkerOptions: work.WorkerOptions{},
MaxExecutionTime: 10 * time.Second,
IdleWait: 1 * time.Second,
NumGoroutines: 10,
DequeueMiddleware: nil,
HandleMiddleware: nil,
}, )
if err != nil {
t.Fatal(err)
}
worker.Start()
job := work.NewJob()
job, err = job.WithPayload(map[string]interface{}{contractAddrKey : contractAddrKey})
if err != nil {
t.Fatal(err)
}
err = queue.Enqueue(job, &work.EnqueueOptions{Namespace:testNamespace, QueueID: testQueueID})
if err != nil {
t.Fatal(err)
}
time.Sleep(10*time.Second)
}
=== RUN TestTaylorChuWork
successful deque
successful deque
successful return
successful return
Looking at this loop here:
Lines 214 to 253 in f26f39f
The default case has no sleep/wait so will just spin like mad even when no work.
In https://github.com/gocraft/work/ it was possible to assign priority using
pool.JobWithOptions("name", work.JobOptions{Priority: 10}, fn interface{})
Is this still possible in this new version or all jobs equal?
I modded the original gocraft/work a bit to suit my needs and would really like to see these natively:
I got so far with a test, but without a context I'm unable to proceed and start testing. I await future updates with interest! 👍
Many thanks
now we have queue-level global concurrency middleware to ensure that at any time, no more than N works are running a job.
The new middleware can let user to determine a custom key and rate limit based on:
Hi there,
I am trying to have a cron job that only runs once every X hours (or daily, etc.) despite there being multiple instances of my application. When the job is instantaneous (or close, i.e. prints a line and returns) it works as expected (we run the job with a UniqueJobId
and our uniqueness constraint is triggered (see below).
...
foundJobs, err := c.BulkFindJobs(job.ID)
if err != nil {
return err
}
if len(foundJobs) > 0 && foundJobs[0] != nil {
logrus.Warnf("Did not enqueue Job: %s in Queue: %s due to uniqueness constraint", job.ID, jobParams.jobQueueName)
return nil
}
...
But when the job is longer (our job takes several minutes to complete), the unique constraint is ignored across instances and the job is dequeued several times and wrongly runs several times (once per instance). We've tried to use InvisibleSec
but we have found that other jobs just run after that time period-- i.e. if the job is set to run at 5:00 and InvisibleSec
is 60, one instance's job runs (correctly) at 5:00 and another runs at 5:01. We've also tried to see what we can do with EnqueueDelay
but that does not seem to be working either.
Any help/insight would be greatly appreciated! See below for how we are setting up our cron service.
// called on application start-up
func main() {
...
redisClient := application.BuildRedisClient()
jobsClient := application.BuildJobsClient(redisClient)
core := core.New(
core.Config{
...
JobsClient: jobsClient,
RedisClient: redisClient,
...
})
...
group.Go(func() error {
cron.CronHandler(jobsClient, context.Background())
return nil
})
}
// JobsHandler, also called on start-up
func JobsHandler(redisClient *redis.ClusterClient, handlerFunc work.ContextHandleFunc) {
jobWorker := work.NewWorker(&work.WorkerOptions{
Namespace: jobs.NAMESPACE,
Queue: work.NewRedisQueue(redisClient),
ErrorFunc: func(err error) {
log.Println(err)
},
})
jobOpts := &work.JobOptions{
MaxExecutionTime: time.Minute,
IdleWait: time.Second,
NumGoroutines: 4,
HandleMiddleware: []work.HandleMiddleware{
logrus.HandleFuncLogger,
catchPanic,
},
}
for queueName := range jobs.JOB_QUEUES {
jobWorker.RegisterWithContext(string(queueName), handlerFunc, jobOpts)
}
jobWorker.Start()
}
// cron service
package cron
import (
"context"
"main/entities/jobs"
"main/lib/errors"
"github.com/robfig/cron/v3"
)
func CronHandler(jobsClient jobs.Client, ctx context.Context) {
c := cron.New()
c.AddFunc("50 * * * *", func() { enqueueOurJob(jobsClient, ctx) })
c.Start()
return
}
func enqueueOurJob(jobsClient jobs.Client, ctx context.Context) {
uniqueId := "uniqueId"
enqueueJobParams, err := jobs.CreateEnqueueJobParams(jobs.CreateEnqueueJobParamsArgs{
Name: jobs.OurJob,
UniqueJobId: &uniqueId,
}, &jobs.OurJobPayload{})
err = jobsClient.EnqueueJob(ctx, *enqueueJobParams)
}
func (c *client) EnqueueJob(ctx context.Context, jobParams EnqueueJobParams) error {
job := work.NewJob()
if jobParams.uniqueJobId != nil {
job.ID = *jobParams.uniqueJobId
}
if jobParams.enqueueDelay != nil {
job = job.Delay(*jobParams.enqueueDelay)
}
if err := job.MarshalJSONPayload(string(jobParams.jobPayload)); err != nil {
return err
}
foundJobs, err := c.BulkFindJobs(job.ID)
if err != nil {
return err
}
// uniqueness constraint
if len(foundJobs) > 0 && foundJobs[0] != nil {
logrus.Warnf("Did not enqueue Job: %s in Queue: %s due to uniqueness constraint", job.ID, jobParams.jobQueueName)
return nil
}
err = c.enqueue(job, &work.EnqueueOptions{
Namespace: NAMESPACE,
QueueID: string(jobParams.jobQueueName),
})
if err != nil {
return err
}
return nil
}
ContextJobFunc by default sets a context timeout, but this is not really compatible with long-running jobs/heartbeating.
I plan to fix this on a fork later today, but also happy to contribute a change upstream. I just need to figure out what an optimal fix would look like.
Apologies if this is the wrong channel to start this conversation. Please redirect me to the correct channel.
My query is, Is this package ready for use in production or is it still in the beta/prototype stage?
Also, if it's not ready, what is the planned timeline for it?
Im already using v8 in some other parts of my Application. Are there any incompatibility issues with the new Redis Version?
A declarative, efficient, and flexible JavaScript library for building user interfaces.
🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.
TypeScript is a superset of JavaScript that compiles to clean JavaScript output.
An Open Source Machine Learning Framework for Everyone
The Web framework for perfectionists with deadlines.
A PHP framework for web artisans
Bring data to life with SVG, Canvas and HTML. 📊📈🎉
JavaScript (JS) is a lightweight interpreted programming language with first-class functions.
Some thing interesting about web. New door for the world.
A server is a program made to process requests and deliver data to clients.
Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.
Some thing interesting about visualization, use data art
Some thing interesting about game, make everyone happy.
We are working to build community through open source technology. NB: members must have two-factor auth.
Open source projects and samples from Microsoft.
Google ❤️ Open Source for everyone.
Alibaba Open Source for everyone
Data-Driven Documents codes.
China tencent open source team.