
work's People

Contributors

aprimadi, austintaylor, beornf, caldempsey, chambo-e, cypriss, davars, igorwwwwwwwwwwwwwwwwwwww, jnovak-stripe, mitchrodrigues, peterhellberg, shdunning, sohymg, taylorchu, tyler-smith, tylerb, vaporz

work's Issues

Periodic Job with Retries

Hi, this is more of a question, actually. Do periodic jobs have a retry option? What happens if there are no retries, or the retries max out? Does the job continue to run at the next scheduled interval?

Thanks in advance.

If I stop the worker and start it later the periodic job runs hundreds of times

I have a periodic job that runs every second. If I stop the worker for 5 minutes (usually during development) and then start it again, the job will run 300 times in a row before re-entering its cycle of 1 per second.

func main() {
	beego.Debug("Starting worker")
	pool := work.NewWorkerPool(Context{}, 1, "typely", redisPool)
	
	// Add middleware that will be executed for each job
	pool.Middleware((*Context).Log)
	
	pool.PeriodicallyEnqueue("*/1 * * * * *", "create_invoices")
	pool.Job("create_invoices", (*Context).CreateInvoices)
	
	// Start processing jobs
	pool.Start()
	
	// Wait for a signal to quit:
	signalChan := make(chan os.Signal, 1)
	signal.Notify(signalChan, os.Interrupt, os.Kill)
	<-signalChan
	
	// Stop the pool
	pool.Stop()
}

func (c *Context) Log(job *work.Job, next work.NextMiddlewareFunc) error {
	beego.Debug("Starting job: ", job.Name)
	return next()
}

func (c *Context) CreateInvoices(job *work.Job) error {
	return CreateInvoices()
}

During the downtime, if I change the cron spec to run every hour, the worker still makes up for the lost seconds on restart, running 300 times, and only then enters the new cycle of 1/h.

Is this desired behavior?
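If the backfill is unwanted, one workaround (a minimal sketch, not part of the library; the one-hour threshold is arbitrary) is middleware that drops periodic jobs whose EnqueuedAt timestamp is stale, since returning nil completes a job without running its handler:

// Skip backfilled periodic jobs that are more than an hour old.
// Note: middleware runs for every job in the pool, so guard by
// job.Name if the pool also runs non-periodic jobs.
func (c *Context) SkipStale(job *work.Job, next work.NextMiddlewareFunc) error {
	if time.Now().Unix()-job.EnqueuedAt > 3600 {
		beego.Debug("Skipping stale periodic job: ", job.Name)
		return nil
	}
	return next()
}

Register it with pool.Middleware((*Context).SkipStale) before the pool.Job(...) calls.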

"Extra" args which do not matter for job uniqueness

Hi @cypriss,

Thanks for the work on work. 😋

As a concrete example: in my current setup, I'm loading Twitter followers with the job args:

{"twitter_id": 123456, "next_cursor_str": "-1", "ids": []}

If I'm rate limited by the Twitter API, I schedule a new job from within the current job with the args:

{"twitter_id": 123456, "next_cursor_str": "something", "ids": [1,2,3,4,5]}

(I have to make a query for uniqueness at this time)

If I were to use work for background processing, it seems embedding partial args like the above would have to go, unless:

a) There's support to include "extra" args, which do not count towards job uniqueness; or
b) I use a reference to external cache. This sounds reasonably good for my case since the ids part can get pretty large.

With (a), it would be guaranteed that there's only one followers-checking job for the user with twitter_id 123456. The next_cursor_str and ids could move into the extra/cache args. What do you think? Are there concerns about breaking how work is implemented, or is this simple enough to do?

I'm willing to take a stab at this with some guidance on whether and how this would be possible.
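A minimal sketch of option (b), assuming a plain Redis hash as the external cache (the key names and the SETNX-based uniqueness guard are made up for illustration, and it relies on the redisPool/enqueuer and imports seen in other snippets on this page): the job itself carries only twitter_id, so there can only ever be one pending followers check per user.

func scheduleFollowerCheck(twitterID int64, cursor string, ids []int64) error {
	conn := redisPool.Get()
	defer conn.Close()

	// Stash the bulky, uniqueness-irrelevant state outside the job args.
	idsJSON, err := json.Marshal(ids)
	if err != nil {
		return err
	}
	stateKey := fmt.Sprintf("followers_state:%d", twitterID) // hypothetical key
	if _, err := conn.Do("HMSET", stateKey, "next_cursor_str", cursor, "ids", idsJSON); err != nil {
		return err
	}

	// Uniqueness guard: enqueue only if no check is already pending for this user.
	// The job handler should DEL followers_pending:<id> when it finishes.
	pending, err := redis.Bool(conn.Do("SETNX", fmt.Sprintf("followers_pending:%d", twitterID), 1))
	if err != nil || !pending {
		return err
	}
	_, err = enqueuer.Enqueue("check_followers", work.Q{"twitter_id": twitterID})
	return err
}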

MaxConcurrency does not work

This is going to be vague, as I'm currently trying to debug what's going on, but the MaxConcurrency option seems to be broken.

According to my current understanding:

For example, the :lock key for a job under load should match the value of MaxConcurrency, but mine stays at 0.
The :lock_info key should hold individual lock counts, per worker, that add up to the total in :lock for that job. However, the contents of that hash vary wildly between -4 and 4, even though I am testing with a MaxConcurrency of 1.

It seems there is a race condition in the increment/decrement logic. I have a worker pool size of 4, so the -4 to 4 variance could come from there. Perhaps the system attempts to pull 4 jobs at once, and all 4 succeed in incrementing/decrementing?
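For reference, the usual fix for this class of race is to make the check-and-increment atomic with a Lua script rather than separate GET/INCR round trips. A minimal sketch with redigo (key names are illustrative, not work's actual internals):

import "github.com/garyburd/redigo/redis"

// Atomically increments the lock only while it is below the max.
// Returns true on success, false if the concurrency limit is reached.
var acquireLock = redis.NewScript(1, `
	local count = tonumber(redis.call('GET', KEYS[1]) or '0')
	if count < tonumber(ARGV[1]) then
		redis.call('INCR', KEYS[1])
		return 1
	end
	return 0
`)

func tryAcquire(conn redis.Conn, lockKey string, maxConcurrency int) (bool, error) {
	return redis.Bool(acquireLock.Do(conn, lockKey, maxConcurrency))
}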

How to know the job is successfully done?

'WorkerObservations' returns the in-progress status of a running job, and once the job is done, the status info is deleted. So my question is: how can I find out whether my job finished successfully?

key := redisKeyWorkerObservation(o.namespace, o.workerID)

if obv == nil {
	if _, err := conn.Do("DEL", key); err != nil {
		return err
	}
}

@shdunning ?

Ability to display job status by ID

I'm looking at using this for long-lived web requests where the initial response to the client is a 202, and the client will check back to see the status. Is there a way to look up a job's status given only the job ID?
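As far as I can tell, work doesn't expose a lookup by job ID, so one workaround (a sketch under that assumption; the key name and TTL are made up) is for the job itself to record a status in Redis keyed by job.ID, which the status endpoint then reads:

// Worker side: record a terminal status under a key derived from the job ID.
func (c *Context) LongRequest(job *work.Job) error {
	err := doTheWork(job) // hypothetical
	conn := redisPool.Get()
	defer conn.Close()
	status := "done"
	if err != nil {
		status = "failed"
	}
	conn.Do("SET", "jobstatus:"+job.ID, status, "EX", 3600) // keep for 1h
	return err
}

// Status endpoint: look it up by the ID returned from Enqueue.
// A redis.ErrNil result means "pending or unknown".
func jobStatus(jobID string) (string, error) {
	conn := redisPool.Get()
	defer conn.Close()
	return redis.String(conn.Do("GET", "jobstatus:"+jobID))
}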

Support redis sentinel

We use Redis Sentinel. Have you thought about adding Sentinel support, or is it already supported? Some redigo forks seem to support Sentinel, but perhaps not properly.

Thank you.
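Since work takes a *redis.Pool, Sentinel support can likely live entirely in the pool's Dial function. A minimal sketch assuming the third-party github.com/FZambia/sentinel helper (addresses and master name are placeholders):

import (
	"time"

	"github.com/FZambia/sentinel" // third-party sentinel helper for redigo
	"github.com/garyburd/redigo/redis"
)

func newSentinelPool() *redis.Pool {
	snt := &sentinel.Sentinel{
		Addrs:      []string{"sentinel1:26379", "sentinel2:26379"}, // placeholders
		MasterName: "mymaster",
		Dial: func(addr string) (redis.Conn, error) {
			return redis.Dial("tcp", addr, redis.DialConnectTimeout(time.Second))
		},
	}
	return &redis.Pool{
		MaxActive: 10,
		MaxIdle:   10,
		Wait:      true,
		Dial: func() (redis.Conn, error) {
			masterAddr, err := snt.MasterAddr() // ask sentinel for the current master
			if err != nil {
				return nil, err
			}
			return redis.Dial("tcp", masterAddr)
		},
	}
}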

500 errors on /retry_jobs and all other fetch/xhr requests

I'm getting 500 errors on https://worker.palcomp3.com/retry_jobs?page=1

{"error": "dial tcp [::1]:6379: getsockopt: connection refused"}

Start command:

workwebui -redis="redis_hostname:6379" -ns="work" -database="11" -listen=":5040"

I'm able to connect to redis_hostname:6379 through telnet, so the problem probably isn't related to the Redis setup. Note that the error shows a dial to [::1]:6379, i.e. localhost, so it looks as if the -redis flag isn't being picked up.

Pass defaults to context

Hi,

I would like to pass my db connector via the context to all jobs, but it looks like the context is cleared each time. I have found a similar PR for "web": gocraft/web#6

Is there a workaround, or am I missing something?

Regards,
Riaan

Periodic jobs don't run as expected

PeriodicallyEnqueue("@every 1200s", key)

I found that periodic jobs don't record their last execution time, and therefore the following piece of code wouldn't be run as expected.

for t := pj.schedule.Next(nowTime); t.Before(horizon); t = pj.schedule.Next(t) {
	epoch := t.Unix()
	id := makeUniquePeriodicID(pj.jobName, pj.spec, epoch)

	job := &Job{
		Name: pj.jobName,
		ID:   id,

		// This is technically wrong, but this lets the bytes be identical for the same periodic job instance. If we don't do this, we'd need to use a different approach -- probably giving each periodic job its own history of the past 100 periodic jobs, and only scheduling a job if it's not in the history.
		EnqueuedAt: epoch,
		Args:       nil,
	}

	rawJSON, err := job.serialize()
	if err != nil {
		return err
	}

	_, err = conn.Do("ZADD", redisKeyScheduled(pe.namespace), epoch, rawJSON)
	if err != nil {
		return err
	}
}

Preserve int64 job arguments with json.Number

All JSON numbers are unmarshalled into float64 by default, so int64 values beyond 2^53 (roughly 16 decimal digits) cannot be passed as an argument without losing precision. The boundary check is done here: https://github.com/gocraft/work/blob/master/job.go#L106

By using json.Number these arguments could be preserved:

dec := json.NewDecoder(bytes.NewReader(rawJSON))
dec.UseNumber()
if err := dec.Decode(&job); err != nil {
	return err
}

Converting a json.Number to primitive types in the Arg helper methods can then be done simply with:

strconv.ParseInt(string(v), 10, 64)
strconv.ParseFloat(string(v), 64)

This could be added as a UseNumber job option for backwards compatibility.
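To make the failure mode concrete, here's a small self-contained demonstration of the precision loss and the json.Number fix (illustrative only):

package main

import (
	"bytes"
	"encoding/json"
	"fmt"
)

func main() {
	raw := []byte(`{"id": 9007199254740993}`) // 2^53 + 1

	// Default decoding: numbers become float64 and the value is silently rounded.
	var lossy map[string]interface{}
	json.Unmarshal(raw, &lossy)
	fmt.Printf("%.0f\n", lossy["id"]) // prints 9007199254740992

	// With UseNumber, the digits are preserved as a json.Number string.
	var exact map[string]interface{}
	dec := json.NewDecoder(bytes.NewReader(raw))
	dec.UseNumber()
	dec.Decode(&exact)
	fmt.Println(exact["id"]) // prints 9007199254740993
}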

Is this repo still being maintained?

The last commit was merged months ago, and several bug-fix PRs are pending even though they've been approved. I'm wondering whether this repo is still actively developed.

Wait for job to complete

Hi,

I'm starting jobs with:

_, err := enqueuer.Enqueue("handle_tika", work.Q{"pdf": bundle[0], "xml": bundle[1]})
if err != nil {
    log.Fatal(err)
}

This is called multiple times. I'd like to enqueue the jobs, but have each one wait for the currently running job to finish. How can I achieve that?
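If the goal is to run these jobs strictly one at a time, the MaxConcurrency job option (referenced elsewhere on this page) should cover it; a minimal sketch, assuming your handler is a (*Context).HandleTika method:

// Allow at most one "handle_tika" job to run across all workers;
// other enqueued jobs wait in the queue until the running one finishes.
pool.JobWithOptions("handle_tika", work.JobOptions{MaxConcurrency: 1}, (*Context).HandleTika)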

Drain functionality

With Node.js, I had an async.queue with a drain function, which allowed me to push lots of items into the queue; upon completion (complete drainage of the queue), the drain function would execute.

var q = async.queue(function (task, callback) {
	...
});

q.drain = function () {
	...
};

for (let item of items) {
	var task = {
		...
	};
	q.push(task);
}

I would like to be able to group jobs when they are queued, and then have another process watch that group empty out and, upon empty, execute a job.

One use case I have: I am screenshotting a page with many different user agents and resolutions. I'm queuing about 40 unique jobs, and within my UI I'm showing a "loading" notification. It would be nice to fire off a "completed" job, which could push a websocket message so the UI can hide the loading alert. A counter-based sketch follows.
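One way to approximate drain on top of work (a sketch; the group key, job names, and the simple counter are all made up, and it assumes the enqueuer/redisPool setup shown in other snippets on this page): seed a per-group remaining counter, have each job atomically decrement it, and let whichever job hits zero enqueue the "completed" job.

// Producer: set the counter first, then enqueue the group's jobs.
func enqueueGroup(urls []string) error {
	conn := redisPool.Get()
	defer conn.Close()
	const groupKey = "screenshot_group:remaining"
	if _, err := conn.Do("SET", groupKey, len(urls)); err != nil {
		return err
	}
	for _, u := range urls {
		if _, err := enqueuer.Enqueue("screenshot", work.Q{"url": u, "group": groupKey}); err != nil {
			return err
		}
	}
	return nil
}

// Worker: DECR is atomic, so exactly one job observes the counter hit zero.
func (c *Context) Screenshot(job *work.Job) error {
	// ... take the screenshot ...
	conn := redisPool.Get()
	defer conn.Close()
	remaining, err := redis.Int(conn.Do("DECR", job.ArgString("group")))
	if err != nil {
		return err
	}
	if remaining == 0 {
		_, err = enqueuer.Enqueue("group_completed", work.Q{}) // e.g. push the websocket notification
	}
	return err
}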

Schedule jobs

Hi,
When will scheduled jobs be available? Do you have any idea?

Thanks,

Giorgio
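If "scheduled" means running a job once after a delay, the enqueuer appears to support that already via EnqueueIn (the "ERROR: requeuer.process.dead" issue further down also mentions enqueuing a job to run after 5 seconds); a sketch with made-up names:

// Enqueue "send_digest" to run roughly 300 seconds from now.
if _, err := enqueuer.EnqueueIn("send_digest", 300, work.Q{"user_id": 42}); err != nil {
	log.Fatal(err)
}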

Usage Example

Hi,

I'm following along with the "process jobs" example in the README and trying to get it to work.

In main.go, I'm calling StartWorkers(), which is equivalent to the main func of the example. Later, I'm enqueuing a job after an incoming HTTP request.

Now it seems that StartWorkers() blocks the HTTP server, as no connections are accepted after calling StartWorkers().

I'm sure this is a beginner question and that there is a trivial mistake, but I can't figure it out.
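A likely culprit (a guess, since StartWorkers isn't shown): the README's main func blocks on a signal channel after pool.Start(), and if that wait was copied into StartWorkers, it will block the caller. pool.Start itself returns immediately, so the wait belongs in main; a sketch:

// Start the pool and return; main keeps ownership of shutdown.
func StartWorkers() *work.WorkerPool {
	pool := work.NewWorkerPool(Context{}, 10, "app", redisPool)
	pool.Job("send_email", (*Context).SendEmail) // hypothetical job
	pool.Start() // returns immediately; workers run on background goroutines
	return pool  // main can wait for a signal and then call pool.Stop()
}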

Proposal: Add context.Context support

context.Context is the de facto idiomatic way to inject dependencies.

Jobs and middleware should receive a context.Context as the first parameter, and middleware should forward the context through the next function.
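A sketch of what the proposed signatures might look like (hypothetical; not the current API):

// Hypothetical context-aware handler and middleware signatures.
func (c *Context) SendEmail(ctx context.Context, job *work.Job) error {
	// ctx could carry deadlines, cancellation, and request-scoped values.
	return nil
}

func (c *Context) Log(ctx context.Context, job *work.Job, next work.NextMiddlewareFunc) error {
	return next(ctx) // hypothetical: next would accept and forward the context
}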

How to return a result to the caller in the job worker?

Is there any function like this? Example:

//-----------------
// the caller:

job, err := enqueuer.Enqueue("add", work.Q{"a": 1, "b": 2})
...
result := job.GetResult()

//---------------------------
// and the worker:

pool.Job("add", (*Context).Add)
...
func (c *Context) Add(job *work.Job) error {
	a := job.ArgInt64("a")
	b := job.ArgInt64("b")

	resultAplusB := a + b
	// how do I return this value of resultAplusB?

	return nil
}

Thanks.
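As far as I know there is no GetResult in work; under that assumption, a common pattern is for the worker to write the result into Redis under the job ID and for the caller to poll for it (a sketch; the result: key prefix and TTL are made up):

// Worker: store the result keyed by job ID, with a TTL so keys don't leak.
func (c *Context) Add(job *work.Job) error {
	sum := job.ArgInt64("a") + job.ArgInt64("b")
	conn := redisPool.Get()
	defer conn.Close()
	_, err := conn.Do("SET", "result:"+job.ID, sum, "EX", 600)
	return err
}

// Caller: enqueue, then poll until the result key appears.
job, err := enqueuer.Enqueue("add", work.Q{"a": 1, "b": 2})
if err != nil {
	log.Fatal(err)
}
for {
	conn := redisPool.Get()
	result, err := redis.Int64(conn.Do("GET", "result:"+job.ID))
	conn.Close()
	if err == redis.ErrNil {
		time.Sleep(100 * time.Millisecond) // not ready yet
		continue
	}
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println("result:", result)
	break
}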

How to functionally pass variables to worker? (e.g. db conn)

If gocraft/work initializes the context struct for me, how do I "pass" a variable, say db, to a worker?

func main() {
    // ...
    db, _ := sql.Open(...)
    pool := configureBackgroundProcessing(db, redisPool)
    go pool.Start()
    // monitor for sigterm
}

func configureBackgroundProcessing(db *sql.DB, redisPool *redis.Pool) *work.WorkerPool {
    concurrency := 10
    namespace := "gocraft_work"
    pool := work.NewWorkerPool(jobs.Context{}, concurrency, namespace, redisPool)

    // a middleware..?

    pool.Job("get_followers", (*jobs.Context).TwitterGetFollowers)

    return pool
}

This is radically different from other libs, like jrallison/go-workers, with which my configureBackgroundProcessing func looks something like:

followersWorker := &FollowersWorker{db: db}
workers.Process("get_followers", followersWorker.Perform, 10)
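The gocraft/work pattern appears to be middleware that populates the per-job context instance before the handler runs; a minimal sketch, assuming Middleware accepts a closure with the context-aware signature (as its sibling gocraft/web does) and that jobs.Context has an exported DB field:

func configureBackgroundProcessing(db *sql.DB, redisPool *redis.Pool) *work.WorkerPool {
	pool := work.NewWorkerPool(jobs.Context{}, 10, "gocraft_work", redisPool)

	// Runs before every job: inject the shared db handle into this job's context.
	pool.Middleware(func(c *jobs.Context, job *work.Job, next work.NextMiddlewareFunc) error {
		c.DB = db
		return next()
	})

	pool.Job("get_followers", (*jobs.Context).TwitterGetFollowers)
	return pool
}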

Stopping worker pool keeps running jobs

For some reason, I'm unable to stop a worker pool when there are jobs in the queue. I expected calling pool.Stop() to finish up the jobs being run, and the workers to exit while Redis holds whatever else is in the queue... but that doesn't seem to be the case. Below is a small test case that illustrates the problem, mostly adapted from the README:

package main

import (
    "fmt"
    "github.com/garyburd/redigo/redis"
    "github.com/gocraft/work"
    "time"
)

var redisPool = &redis.Pool{
    MaxActive: 5,
    MaxIdle:   5,
    Wait:      true,
    Dial: func() (redis.Conn, error) {
        return redis.Dial("tcp", ":6379")
    },
}

type Context struct{}

var enqueuer = work.NewEnqueuer("app", redisPool)

func (c *Context) SampleJob(job *work.Job) error {
    fmt.Printf("Sample job %v running\n", job.ID)

    time.Sleep(10 * time.Second)

    fmt.Printf("Sample job %v done\n", job.ID)
    return nil
}

func main() {
    pool := work.NewWorkerPool(Context{}, 2, "app", redisPool)

    pool.Job("sample_job", (*Context).SampleJob)

    for i := 1; i <= 30; i++ {
        enqueuer.Enqueue("sample_job", work.Q{})
    }

    pool.Start()

    time.Sleep(30 * time.Second)
    fmt.Println("30 seconds is up! Stopping workers...")

    pool.Stop()
}

Rather than stopping after 30 seconds (plus up to 2 * 10 seconds for the running jobs to finish), the program continues, and workers keep consuming jobs from the queue until it's empty:

go run main.go
Sample job 4b6859807fe26e8c344cc302 running
Sample job 6f6f30cd645cd328552983a8 running
Sample job 4b6859807fe26e8c344cc302 done
Sample job 6f6f30cd645cd328552983a8 done
Sample job d17378fcb8ab7e2b42b71ccf running
Sample job 5ad6c131d7faab927db749a2 running
Sample job d17378fcb8ab7e2b42b71ccf done
Sample job 5ad6c131d7faab927db749a2 done
Sample job e7f7e2aaeecf5e16d13ad5bc running
Sample job 5a09255570fabb8813eee575 running
30 seconds is up! Stopping workers...
Sample job 5a09255570fabb8813eee575 done
Sample job e7f7e2aaeecf5e16d13ad5bc done
Sample job f25bcdacb746c94348430143 running
Sample job e399bd4c8d3da1de109310c4 running
Sample job f25bcdacb746c94348430143 done
Sample job e399bd4c8d3da1de109310c4 done
Sample job 27a8ff38c160c64f9a1a52d4 running
Sample job fa0e939272c71d2d927bd032 running
Sample job fa0e939272c71d2d927bd032 done
Sample job 27a8ff38c160c64f9a1a52d4 done
Sample job d189cd37e00f1482aa898550 running
Sample job 40761b0c9f8ecaab192c9b79 running
Sample job d189cd37e00f1482aa898550 done
[...snipped! but this keeps going on for awhile...]

Am I doing something wrong in the setup or is this the desired behavior?

Is project alive?

It's been 28 days since bug reports were filed, and two PRs have been opened with no response from the team. I would like to know whether the team is alive and willing to support the project, or whether I should just go ahead and fork it completely.

Webui doesn't work in Safari

Hi,

when trying to access the webui in Safari on my Mac, it doesn't load at all. The console shows the following message:

[Error] ReferenceError: Can't find variable: fetch
	value (work.js:5:18353)
	performInitialMount (work.js:4:2382)
	mountComponent (work.js:4:1554)
	mountComponent (work.js:2:15978)
	performInitialMount (work.js:4:2660)
	mountComponent (work.js:4:1554)
	mountComponent (work.js:2:15978)
	mountChildren (work.js:3:28158)
	_createInitialChildren (work.js:3:6598)
	mountComponent (work.js:3:4759)
	mountComponent (work.js:2:15978)
	mountChildren (work.js:3:28158)
	_createInitialChildren (work.js:3:6598)
	mountComponent (work.js:3:4759)
	mountComponent (work.js:2:15978)
	mountChildren (work.js:3:28158)
	_createInitialChildren (work.js:3:6598)
	mountComponent (work.js:3:4759)
	mountComponent (work.js:2:15978)
	performInitialMount (work.js:4:2660)
	mountComponent (work.js:4:1554)
	mountComponent (work.js:2:15978)
	performInitialMount (work.js:4:2660)
	mountComponent (work.js:4:1554)
	mountComponent (work.js:2:15978)
	performInitialMount (work.js:4:2660)
	mountComponent (work.js:4:1554)
	mountComponent (work.js:2:15978)
	performInitialMount (work.js:4:2660)
	mountComponent (work.js:4:1554)
	mountComponent (work.js:2:15978)
	a (work.js:5:10792)
	perform (work.js:2:18190)
	i (work.js:5:11004)
	perform (work.js:2:18190)
	batchedUpdates (work.js:4:15372)
	l (work.js:2:12826)
	_renderNewRootComponent (work.js:5:12205)
	_renderSubtreeIntoContainer (work.js:5:13208)
	(anonymous function) (work.js:1:2656)
	t (work.js:1:112)
	(anonymous function) (work.js:1:196)
	Global Code (work.js:1:200)

In Chrome, it seems to work just fine. (The "Can't find variable: fetch" error suggests the UI calls the fetch API directly, which Chrome ships natively but older Safari versions don't provide without a polyfill.)

Allow *webui.Server to be mounted.

It would be nice to have webui.Server implement http.Handler so it can be mounted in an existing app without having to run another process.

You could also just make the web.Router public, or embed it.
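A sketch of what mounted usage could look like if webui.Server implemented http.Handler (hypothetical; today the server appears to run its own listener):

// Hypothetical: with ServeHTTP exposed, mounting becomes one line.
mux := http.NewServeMux()
mux.Handle("/workui/", http.StripPrefix("/workui", server)) // server: *webui.Server
http.ListenAndServe(":8080", mux)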

Jobs can never be properly reaped on dirty shutdown

The reaper, which starts when a WorkerPool starts, will never reap dead jobs after a dirty shutdown (SIGKILL or another early exit where we don't wait for all workers to finish).

The heartbeat key expires after 60 seconds, which isn't long enough for it to survive until the second reap cycle (10 min) discovers the dead job (a job is considered dead if it hasn't received a heartbeat in 5 min). By the time the reaper looks, the heartbeat key is already gone, so the dead pool is invisible to it.

Replacing Redis with MemSQL

Would it be possible to switch out redis for memsql?

For those who don't know, MemSQL is an in-memory row store with an on-disk columnar store, and it includes high-availability capability.

Since I am going to be using MemSQL anyway, I'd rather not introduce another, similar db component, and simplify my stack instead.

Is it a case of switching out one in-memory db for another, or does the backing store need to be pubsub-aware?

Thanks

Graceful shutdown

When stopping the worker, I'd like to be able to tell the jobs that they should stop, and re-queue them on other workers.

In my use case I've implemented a way to signal that the job needs to stop; however, there's no way to stop the job without either marking it as completed or as failed.

If I re-queue the job manually, I lose the MaxFails option; if I fail the job by returning an error, work still cannot honor the MaxFails value.

How would one go about doing this? I suggest that if a job returns a work.ErrSkip error, MaxFails should not be incremented. A sketch of the idea follows.
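A sketch of the proposal (hypothetical API; work.ErrSkip does not exist today, and the stopping channel stands in for whatever shutdown signal the app uses):

// In package work (hypothetical):
//   var ErrSkip = errors.New("work: requeue the job without incrementing fails")

func (c *Context) LongRunning(job *work.Job) error {
	select {
	case <-c.stopping: // app-provided shutdown signal
		return work.ErrSkip // the runner would requeue and leave job.Fails untouched
	default:
	}
	// ... do a bounded slice of work, then return nil ...
	return nil
}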

Periodic Job

First of all, keep up the good work! Awesome! Just out of curiosity, is there any way to stop/delete a periodic job? Unfortunately I can't see any; am I right? It seems to be implemented only for scheduled single-shot jobs, as well as dead and retry jobs (maybe because a ZSCORE range query is needed). Furthermore, I can't see any public pause/unpause method on the client (only a private method in the worker_test file). Regards

Please create a release with latest changes

The current release, v0.5.0, doesn't include MaxConcurrency in JobOptions. It would be helpful to have a release that includes it, so package managers like glide can lock down to a version.

Sidekiq compatibility and enquers in other languages eg. php

Would someone mind confirming whether this will process Sidekiq-compatible jobs, i.e. can I use a Sidekiq client library to push jobs to Redis, then use this library to process them?

If not, a secondary question: if I wanted to push jobs using PHP, I presume I would just need to port enqueue.go. From this line of code, the main use case looks pretty straightforward: I can use phpredis to LPUSH JSON, per this line of Go:

conn.Do("LPUSH", e.queuePrefix+jobName, rawJSON)

Does that look right? Is it really that straightforward?
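Roughly, yes, for the enqueue path, though the JSON must match work's Job serialization exactly (field names, ID format, enqueue timestamp), and work is not Sidekiq-compatible on the wire as far as I know. A low-risk way to get the schema right (a sketch; "work" is an assumed namespace) is to enqueue one job from Go and copy the payload that lands in the list:

// Enqueue a sample job, then inspect the raw payload:
//   redis-cli LRANGE work:jobs:example 0 -1
// (the list key is namespace + ":jobs:" + jobName)
enqueuer := work.NewEnqueuer("work", redisPool)
if _, err := enqueuer.Enqueue("example", work.Q{"a": 1}); err != nil {
	log.Fatal(err)
}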

ERROR: requeuer.process.dead - no job name

After enqueuing a job to run after waiting 5 seconds, I get this error and my job doesn't run:
ERROR: requeuer.process.dead - no job name

What is the best way to troubleshoot this?
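A hedged first step: the error appears to mean the requeuer pulled a payload it couldn't match to a registered job name, so (a) check that the pool processing the queue actually registers that job name via pool.Job(...), and (b) inspect the raw entries in the scheduled and dead sets for malformed payloads (assuming the default "work" namespace):

redis-cli ZRANGE work:scheduled 0 -1 WITHSCORES
redis-cli ZRANGE work:dead 0 -1 WITHSCORES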

SummitDB support

Just wondering: would you like to have another backend? I was working on an internal service that needed a queue system, and gocraft/work is a great choice, but I wanted to get rid of the Redis dependency. So I combined tidwall/summitdb and gocraft/work, and it works well.

If you like the idea, I'd be happy to open an MR.

Log stack trace for panic

Hi, currently the log message is not very helpful for tracing the error when a job panics. Is it possible to log the stack trace? This would be useful in general for any error logging.

ERROR: runJob.panic - some error

// https://github.com/gocraft/work/blob/master/log.go#L8
func logError(key string, err error) {
	fmt.Printf("ERROR: %s - %+v\n", key, err)
}
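A minimal sketch of one way to get the trace: capture it inside the deferred recover (deferred functions run before the stack unwinds, so the panicking frames are still present) and feed it through logError:

import (
	"fmt"
	"runtime/debug"
)

// Sketch of a recover wrapper; in work itself this logic would live in runJob.
func runWithRecover(fn func() error) (err error) {
	defer func() {
		if r := recover(); r != nil {
			// debug.Stack() here still includes the frames that panicked.
			logError("runJob.panic", fmt.Errorf("%v\n%s", r, debug.Stack()))
			err = fmt.Errorf("job panic: %v", r)
		}
	}()
	return fn()
}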

Feature request: Allow parameters for periodic jobs

Currently, periodic jobs cannot have parameters. It would be very useful to be able to do something like enqueuer.PeriodicallyEnqueue(name, params).

Also, it's a bit inconsistent that periodic enqueues are part of the WorkerPool, while non-periodic enqueue functions live on the Enqueuer. A workaround sketch follows.
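Until then, one workaround (a sketch; the spec and names are made up) is a parameterless periodic job whose handler fans out the real, parameterized job:

// The periodic tick itself carries no args (six-field, second-granularity spec)...
pool.PeriodicallyEnqueue("0 0 * * * *", "hourly_tick")

// ...and its handler enqueues the parameterized job.
pool.Job("hourly_tick", func(job *work.Job) error {
	_, err := enqueuer.Enqueue("send_report", work.Q{"team": "billing"})
	return err
})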

Maintained?

Is this project still maintained? Quite a few issues and PRs are unanswered.
