Code Monkey home page Code Monkey logo

work's People

Contributors

fossabot avatar taylorchu avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

work's Issues

What value should be used as QueueID?

Dear devs, currently I'm using v1, but to use Redis Cluster, we're trying to change our package from v1 to v2. And I wonder what value should I use for QueueID. Also, what's the purpose of QueueID?

w.opt undefined (cannot refer to unexported field or method opt)

Many thanks for developing this new version of work.

*** Issue Solved ***

Enqueuer:

package main

import (
	"fmt"

	"github.com/go-redis/redis"
	"github.com/taylorchu/work"
)

type message struct {
	Text string
}

func main() {
	client := newRedisClient()
	defer client.Close()

	job := work.NewJob()
	err := job.MarshalPayload(message{Text: "hello"})

	if (err != nil) {
		panic(err)
	}

	err = work.NewRedisQueue(client).Enqueue(job, &work.EnqueueOptions{
		Namespace: "ns1",
		QueueID:   "worker",
	})

	if (err != nil) {
		panic(err)
	}

	fmt.Println("job submitted")
}

func newRedisClient() *redis.Client {
	return redis.NewClient(&redis.Options{
		Addr:         "127.0.0.1:6379",
		PoolSize:     10,
		MinIdleConns: 10,
	})
}

Worker:

package main

import (
	"fmt"
	"os"
    "os/signal"
	"time"

	"github.com/go-redis/redis"
	"github.com/taylorchu/work"
)

type message struct {
	Text string
}

func main() {
	client := newRedisClient()
	defer client.Close()

	w := work.NewWorker(&work.WorkerOptions{
		Namespace: "ns1",
		Queue:     work.NewRedisQueue(client),
	})
	err := w.Register("worker",
		func(job *work.Job, _ *work.DequeueOptions) error {
			var msg message
			job.UnmarshalPayload(&msg)
			fmt.Println(msg)
			return nil
		},
		&work.JobOptions{
			MaxExecutionTime: time.Second,
			IdleWait:         time.Second,
			NumGoroutines:    2,
		},
	)

	if (err != nil) {
		panic(err)
	}

	fmt.Println("starting")

	w.Start()

	// Wait for a signal to quit:
    signalChan := make(chan os.Signal, 1)
    signal.Notify(signalChan, os.Interrupt, os.Kill)
    <-signalChan

    fmt.Println("stopped")

    w.Stop()
}

func newRedisClient() *redis.Client {
	return redis.NewClient(&redis.Options{
		Addr:         "127.0.0.1:6379",
		PoolSize:     10,
		MinIdleConns: 10,
	})
}

Feature: Dequeue job by ID

I have a requirement to support changing the Delay of a previous enqueued job and just removing it completely.

Not sure if I've missed something but doesn't seem possible.

[REQUEST] Create a new release

No new release has been pushed since 19 Sep. 2019 even though you've been busy with tweaks and fixes. Any chance you can tag the current master?

Ability to retry dead jobs manually

One of the biggest reasons we used https://github.com/gocraft/work is that it has the ability in the UI to retry jobs manually that have exhausted their automatic retries. Given that this project has moved the UI solution to prometheus/grafana, are there any plans for covering the need to retry dead jobs manually?

PeriodicJob

Hey guys do you still support the periodic jobs?
I managed to use the schedule job at runtime, but I was not able to set up a periodic job

[QUESTION] Does the reaper still exist?

I was wondering, does this repository still have the reaper element from gocraft/work? As in, what happens to jobs that are stuck?

The README doesn't really explain what from v1 still exists which makes it hard to refactor my codebase (since I don't need what I have to build myself and what is already done by this repo).

Unique EnqueueMiddleware example

Hi,

How to use the unique enqueue middleware (and other EnqueueMiddleware) with redis queue? I can't find it in any source codes or docs :|

Lost job

Within the same queue:

  1. Job with id "X" is enqueued
  2. Job with id "X" is dequeued by worker №1
  3. Job with id "X" is enqueued
  4. Handler of worker №1 returns without error
  5. Worker №1 flushes ACK
  6. Job enqueued at step 3 is lost

Test in #67 demonstrates the issue.

add examples and docs

can you add some examples and docs, so for the people who're interested on this project can get started on this without digging into source code or godoc ....

i'm evaluating this, i can contribute some examples ...

Is this abandoned?

I've been using Work v1 and watching this repo - has v2 been abandoned? The original repo has much more activity to date.

Catch Panic Middleware

I ended up creating middleware for catching panics similar to how you are doing it worker.go

I am using logger middleware and it was not getting the panic error message because it was happening outside of the middleware process.

Dequeued multiple times

Single job is inserted, however, multiple deques are executed. What should I do to prevent multiple dequeueing?

TestCode


var testNamespace = "this-is-test-namespace"
var testQueueID = "this-is-test-queue-id"

func workFunc(job *work.Job, jobOpts *work.DequeueOptions) error {
	var args map[string]interface{}
	if err := job.UnmarshalPayload(&args); err != nil {
		return err
	}

	contractAddr := ""
	ok := false
	if contractAddrFromMsg, exist := args[contractAddrKey]; exist {
		if contractAddr, ok = contractAddrFromMsg.(string); !ok {
			return fmt.Errorf("aadsada")
		}
	}

	if contractAddr == "" {
		return fmt.Errorf("empty contract addr")
	}

	fmt.Println("successful deque")

	time.Sleep(500 * time.Millisecond)

	fmt.Println("successful return")
	return nil
}

func TestTaylorChuWork(t *testing.T) {
	m, err := miniredis.Run()
	if err != nil {
		t.Fatal("Failed to start", "err", err)
	}

	uc := redis.NewUniversalClient(&redis.UniversalOptions{
		Addrs: []string{m.Addr()},
	})

	
	queue := work.NewRedisQueue(uc)
	
	worker := work.NewWorker(&work.WorkerOptions{
		Namespace: testNamespace,
		Queue:     queue,
	})
	
	err = worker.Register(
		testQueueID,
		workFunc,
		&work.JobOptions{
			//WorkerOptions:     work.WorkerOptions{},
			MaxExecutionTime:  10 * time.Second,
			IdleWait:          1 * time.Second,
			NumGoroutines:     10,
			DequeueMiddleware: nil,
			HandleMiddleware:  nil,
		}, )
	if err != nil {
		t.Fatal(err)
	}

	worker.Start()

	job := work.NewJob()
	job, err = job.WithPayload(map[string]interface{}{contractAddrKey :  contractAddrKey})
	if err != nil {
		t.Fatal(err)
	}

	err = queue.Enqueue(job, &work.EnqueueOptions{Namespace:testNamespace, QueueID: testQueueID})
	if err != nil {
		t.Fatal(err)
	}

	time.Sleep(10*time.Second)

}

Result

=== RUN   TestTaylorChuWork
successful deque
successful deque
successful return
successful return

Worker loop is a busy loop

Looking at this loop here:

work/worker.go

Lines 214 to 253 in f26f39f

for {
select {
case <-w.stop:
return
case <-flushTicker.C:
err := flush()
if err != nil {
errFunc(err)
}
default:
err := func() error {
opt := &DequeueOptions{
Namespace: ns,
QueueID: h.QueueID,
At: time.Now(),
InvisibleSec: int64(2 * (h.JobOptions.MaxExecutionTime + flushIntv) / time.Second),
}
job, err := dequeue(opt)
if err != nil {
return err
}
err = handle(job, opt)
if err != nil {
return err
}
ackJobs = append(ackJobs, job)
if len(ackJobs) >= 1000 {
// prevent un-acked job count to be too large
err := flush()
if err != nil {
return err
}
}
return nil
}()
if err != nil && err != ErrEmptyQueue {
errFunc(err)
}
}
}

The default case has no sleep/wait so will just spin like mad even when no work.

Some features requested

I modded the original gocraft/work a bit to suit my needs and would really like to see these natively:

  1. Have completed callback for when a job successfully completes.
  2. Have a failed callback for when a job fails and has retries left and have available a stack trace.
  3. Have a completely failed callback for when a job fails and has no retries left.

I got so far with a test, but without a context I'm unable to proceed and start testing. I await future updates with interest! 👍

Many thanks

custom ratelimit middleware

now we have queue-level global concurrency middleware to ensure that at any time, no more than N works are running a job.

The new middleware can let user to determine a custom key and rate limit based on:

  • concurrency (how many are running at this moment)
  • bucket (non-overlapping window like every N minute edge, we will hard reset the quota)
  • window (sliding window that M jobs in N minute window)
  • error (if local error rate is too high, throttle. Maybe the job handler has a bug.)

Seeing jobs dequeued/run multiple times for long-running cron jobs, despite same jobId/start time

Hi there,

I am trying to have a cron job that only runs once every X hours (or daily, etc.) despite there being multiple instances of my application. When the job is instantaneous (or close, i.e. prints a line and returns) it works as expected (we run the job with a UniqueJobId and our uniqueness constraint is triggered (see below).

...
foundJobs, err := c.BulkFindJobs(job.ID)
	if err != nil {
		return err
	}

	if len(foundJobs) > 0 && foundJobs[0] != nil {
		logrus.Warnf("Did not enqueue Job: %s in Queue: %s due to uniqueness constraint", job.ID, jobParams.jobQueueName)
		return nil
	}
...

But when the job is longer (our job takes several minutes to complete), the unique constraint is ignored across instances and the job is dequeued several times and wrongly runs several times (once per instance). We've tried to use InvisibleSec but we have found that other jobs just run after that time period-- i.e. if the job is set to run at 5:00 and InvisibleSec is 60, one instance's job runs (correctly) at 5:00 and another runs at 5:01. We've also tried to see what we can do with EnqueueDelay but that does not seem to be working either.

Any help/insight would be greatly appreciated! See below for how we are setting up our cron service.

// called on application start-up 
func main() {
...
redisClient := application.BuildRedisClient()
jobsClient := application.BuildJobsClient(redisClient)

core := core.New(
core.Config{
                     ...
JobsClient:            jobsClient,
RedisClient:           redisClient,
...
        })

... 

group.Go(func() error {
cron.CronHandler(jobsClient, context.Background())
return nil
})

}
// JobsHandler, also called on start-up
func JobsHandler(redisClient *redis.ClusterClient, handlerFunc work.ContextHandleFunc) {
	jobWorker := work.NewWorker(&work.WorkerOptions{
		Namespace: jobs.NAMESPACE,
		Queue:     work.NewRedisQueue(redisClient),
		ErrorFunc: func(err error) {
			log.Println(err)
		},
	})

	jobOpts := &work.JobOptions{
		MaxExecutionTime: time.Minute,
		IdleWait:         time.Second,
		NumGoroutines:    4,
		HandleMiddleware: []work.HandleMiddleware{
			logrus.HandleFuncLogger,
			catchPanic,
		},
	}

	for queueName := range jobs.JOB_QUEUES {
		jobWorker.RegisterWithContext(string(queueName), handlerFunc, jobOpts)
	}

	jobWorker.Start()
}
// cron service
package cron

import (
	"context"
	"main/entities/jobs"
	"main/lib/errors"

	"github.com/robfig/cron/v3"
)

func CronHandler(jobsClient jobs.Client, ctx context.Context) {
	c := cron.New()
	c.AddFunc("50 * * * *", func() { enqueueOurJob(jobsClient, ctx) })
	c.Start()
	return
}

func enqueueOurJob(jobsClient jobs.Client, ctx context.Context) {
	uniqueId := "uniqueId"
	enqueueJobParams, err := jobs.CreateEnqueueJobParams(jobs.CreateEnqueueJobParamsArgs{
		Name:         jobs.OurJob,
		UniqueJobId:  &uniqueId,
	}, &jobs.OurJobPayload{})

	err = jobsClient.EnqueueJob(ctx, *enqueueJobParams)
}

func (c *client) EnqueueJob(ctx context.Context, jobParams EnqueueJobParams) error {
	job := work.NewJob()

	if jobParams.uniqueJobId != nil {
		job.ID = *jobParams.uniqueJobId
	}

	if jobParams.enqueueDelay != nil {
		job = job.Delay(*jobParams.enqueueDelay)
	}

	if err := job.MarshalJSONPayload(string(jobParams.jobPayload)); err != nil {
		return err
	}

	foundJobs, err := c.BulkFindJobs(job.ID)
	if err != nil {
		return err
	}

        // uniqueness constraint
	if len(foundJobs) > 0 && foundJobs[0] != nil {
		logrus.Warnf("Did not enqueue Job: %s in Queue: %s due to uniqueness constraint", job.ID, jobParams.jobQueueName)
		return nil
	}

	err = c.enqueue(job, &work.EnqueueOptions{
		Namespace: NAMESPACE,
		QueueID:   string(jobParams.jobQueueName),
	})
	if err != nil {
		return err
	}

	return nil
}

ContextJobFunc and HeartbeatMiddleware incompatible?

ContextJobFunc by default sets a context timeout, but this is not really compatible with long-running jobs/heartbeating.

I plan to fix this on a fork later today, but also happy to contribute a change upstream. I just need to figure out what an optimal fix would look like.

Is it still in prototype stage?

Apologies if this is the wrong channel to start this conversation. Please redirect me to the correct channel.
My query is, Is this package ready for use in production or is it still in the beta/prototype stage?

Also, if it's not ready, what is the planned timeline for it?

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.