
goworker's Introduction

Hi there 👋

I am Ben. I build software and data things on the Internet 💫

goworker's People

Contributors

aaron1011, benmanns, brockwood, ghais, jmonteiro, jpatters, koron, pda, robscc, sergeylanzman, theorioli, tinygrasshopper, wuman, xaka, xboston, xescugc

goworker's Issues

Non-command-line configuration

Currently goworker can only be configured with command-line flags. It would be convenient if goworker could be configured programmatically. This would allow the configuration to be stored elsewhere (e.g. in an application-defined configuration file).

Would you accept a pull request that adds this functionality? Any preference as to how you would like it implemented? I was thinking of something along the lines of:

package main

// ...

func main() {
    var options = &goworker.Options{
        Queues:      []string{"high", "medium", "low"},
        Connections: 5,
    }
    if err := goworker.Configure(options); err != nil {
        panic(err)
    }
    // ...
}

fail to install

[vagrant@localhost gadmin]$ go get github.com/benmanns/goworker
go get: github.com/youtube/vitess@none updating to
        github.com/youtube/vitess@…: parsing go.mod:
        module declares its path as: vitess.io/vitess
                but was required as: github.com/youtube/vitess

package broken

It seems like vitess.io/vitess/go/pools has changed its function signature, and it now requires more arguments.


func newRedisPool(uri string, capacity int, maxCapacity int, idleTimout time.Duration) *pools.ResourcePool {
	return pools.NewResourcePool(newRedisFactory(uri), capacity, maxCapacity, idleTimout)
}

The lines above are the source of this problem. They must be changed to pass the five arguments required by
NewResourcePool(factory Factory, capacity int, maxCap int, idleTimeout time.Duration, prefillParallelism int) *ResourcePool

Access to actual JSON payload.

I'm new to Go and am still figuring things out. One thing that made goworker a bit hard to use is that the worker function receives its payload as args ...interface{}. This means the args must be manually type-asserted to build the actual object structures, which gets cumbersome when the JSON payload is complex.

I asked on Stack Overflow whether there is another way to do this, but it seems there isn't.

If access to the raw JSON were provided, a struct could be passed to json.Unmarshal to build that type directly.

Let me know, and thanks for this brilliant package.
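One workaround that fits the existing signature, sketched here with only the standard library, is to re-marshal a generic argument to JSON and unmarshal it into a struct. `Order` and `decodeArg` are hypothetical names. The round trip costs an extra encode and decode per job. It also works if numbers were decoded as `json.Number`, because that type marshals back to a plain JSON number.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Order is a hypothetical payload type used only for illustration.
type Order struct {
	ID    int      `json:"id"`
	Items []string `json:"items"`
}

// decodeArg converts one element of a worker's args (already decoded into
// generic maps, slices and numbers) into a typed value by round-tripping
// it through JSON.
func decodeArg(arg interface{}, out interface{}) error {
	raw, err := json.Marshal(arg)
	if err != nil {
		return err
	}
	return json.Unmarshal(raw, out)
}

// myFunc has the func(string, ...interface{}) error worker signature.
func myFunc(queue string, args ...interface{}) error {
	if len(args) != 1 {
		return fmt.Errorf("expected 1 argument, got %d", len(args))
	}
	var order Order
	if err := decodeArg(args[0], &order); err != nil {
		return err
	}
	fmt.Printf("From %s: order %d with %d items\n", queue, order.ID, len(order.Items))
	return nil
}

func main() {
	// Simulate the args a worker would receive for {"args":[{"id":7,"items":["a","b"]}]}.
	var args []interface{}
	_ = json.Unmarshal([]byte(`[{"id":7,"items":["a","b"]}]`), &args)
	if err := myFunc("orders", args...); err != nil {
		fmt.Println("Error:", err)
	}
}
```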

RabbitMQ Support

Are there any plans for a RabbitMQ implementation? I think it would be very useful.

can it run on multiple servers?

I ran the same hello example in two processes to test a multi-server deployment:

# first terminal
go run main.go -queues="hello" -concurrency=25
# another terminal
go run main.go -queues="hello" -concurrency=25
# queued up 100 messages
redis-cli -r 100 RPUSH resque:queue:hello '{"class":"MyClass","args":["hi","there"]}'

I expected the 100 messages to be split equally between the two processes, but the first process processed all 100. Does goworker support running on multiple servers?
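A Resque queue in Redis is a single list shared by every process that polls it. Each job goes to whichever poller pops it first, and nothing in that model splits work evenly between processes. With 100 trivial jobs, one process can plausibly drain the list before the other one polls. The in-process simulation below illustrates the property that does hold: every job is handled exactly once, while the split depends on timing. A buffered channel stands in for the Redis list. It does not model goworker's actual poller or its polling interval.

```go
package main

import (
	"fmt"
	"sync"
)

// consume runs n competing consumers that pop from the same queue and
// returns how many jobs each consumer handled.
func consume(queue <-chan int, n int) []int {
	counts := make([]int, n)
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			for range queue { // a received job is gone for every other consumer
				counts[id]++
			}
		}(i)
	}
	wg.Wait()
	return counts
}

func main() {
	queue := make(chan int, 100)
	for j := 0; j < 100; j++ {
		queue <- j
	}
	close(queue)
	counts := consume(queue, 2)
	// The total is always 100; the per-consumer split varies between runs.
	fmt.Println("per consumer:", counts, "total:", counts[0]+counts[1])
}
```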

etcd support

Hi,

Are there plans to support etcd? I guess a common interface with Redis could be used, and this would be useful for cloud-native applications that use etcd for both cache and configuration. What do you think?

Thanks

How does goworker work?

I came across this project and I am wondering how it manages to execute arbitrary Ruby code.

From the README, I assume that I can run goworker on my servers, create Resque-style background jobs from my Ruby application, and have them processed by goworker.

I dug through the code, but I cannot find the spot where the Ruby code gets executed or how you manage to get the correct Ruby environment...

Maybe I got something wrong, though. It would be great if someone could shed some light on this.

Maybe @benmanns, @johnTurknett or @rjrobinson?

Thank you very much!
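goworker does not run Ruby code at all. It reads Resque-format jobs from Redis and calls the Go function registered with `goworker.Register` under the job's `class` name. The Ruby application only enqueues jobs, and the job logic itself has to be written in Go. The standard-library sketch below mimics that dispatch step. `register`, `dispatch`, `workers` and `payload` are illustrative names, not goworker internals.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// workerFunc matches the func(string, ...interface{}) error worker signature.
type workerFunc func(queue string, args ...interface{}) error

// payload mirrors the Resque job format: {"class": ..., "args": [...]}.
type payload struct {
	Class string        `json:"class"`
	Args  []interface{} `json:"args"`
}

var workers = map[string]workerFunc{}

// register associates a Resque class name with a Go function.
func register(class string, fn workerFunc) { workers[class] = fn }

// dispatch decodes a raw job and calls the Go function registered for its class.
func dispatch(queue string, raw []byte) error {
	var p payload
	if err := json.Unmarshal(raw, &p); err != nil {
		return err
	}
	fn, ok := workers[p.Class]
	if !ok {
		return fmt.Errorf("no worker registered for class %q", p.Class)
	}
	return fn(queue, p.Args...)
}

func main() {
	register("MyClass", func(queue string, args ...interface{}) error {
		fmt.Printf("From %s, %v\n", queue, args)
		return nil
	})
	// A job of the kind a Ruby app would push onto resque:queue:hello.
	_ = dispatch("hello", []byte(`{"class":"MyClass","args":["hi","there"]}`))
}
```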

cannot unmarshal object into Go value of type []interface {}

I set up a worker for a job that takes one argument, a JSON structure. I get the error below when goworker attempts to process the job. Note that this error occurs before my worker function is called; the function never runs.

getting job from [default]: json: cannot unmarshal object into Go value of type []interface {}
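The error most likely comes from decoding the job payload itself, which is why the worker function is never reached. The Resque format expects `args` to be a JSON array, and a bare object cannot be decoded into `[]interface{}`. The sketch below reproduces this with the standard library. The `payload` type and `decode` helper are illustrative stand-ins for the library's own. It also shows that wrapping the object in an array (`"args":[{...}]`) decodes fine, and the worker then receives the object as `args[0]`.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// payload is an illustrative stand-in for a Resque job's JSON structure.
type payload struct {
	Class string        `json:"class"`
	Args  []interface{} `json:"args"`
}

func decode(raw string) (payload, error) {
	var p payload
	err := json.Unmarshal([]byte(raw), &p)
	return p, err
}

func main() {
	// "args" is an object: decoding fails before any worker could be called.
	_, err := decode(`{"class":"MyClass","args":{"name":"x"}}`)
	fmt.Println("object args:", err)

	// "args" is an array holding the object: decodes fine.
	p, err := decode(`{"class":"MyClass","args":[{"name":"x"}]}`)
	if err == nil {
		fmt.Println("array args:", p.Args[0].(map[string]interface{})["name"])
	}
}
```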

pools.NewResourcePool request param has changed

github.com/benmanns/goworker

/data/gopath/src/github.com/benmanns/goworker/redis.go:29:9: cannot use func literal (type func() (pools.Resource, error)) as type pools.Factory in return argument
/data/gopath/src/github.com/benmanns/goworker/redis.go:35:30: not enough arguments in call to pools.NewResourcePool
have (pools.Factory, int, int, time.Duration)
want (pools.Factory, int, int, time.Duration, int, func(time.Time))

Expose Redis connection pool

Is there any way that the Redis connection pool could be exposed to worker functions somehow? It's a slight inconvenience to have to set up a separate connection pool, reimplement configuration logic, etc. It'd be great to be able to easily ".Get" a connection from the pool to use.

Not sure if this is something wanted for this project though.

How can I redirect all logs to stderr?

Hi, I'm writing a worker that prints job info to stdout, but the output gets mixed up with log messages when the worker shuts down or cannot connect to Redis.
I tried the -logtostderr flag, but it didn't help.
How can I redirect all logs to stderr, so I don't have to filter log lines out of the output?

Allow for maps instead of interfaces for worker functions

The getting started page says:

To create a worker, write a function matching the signature

func(string, ...interface{}) error

It'd be great if you could instead write functions like

func(string, map[string]string) error

For instance, if I wanted to pass a group of files to a worker for processing, I'd have to queue up the job with the right files in the right order in my args parameter, and then reference them by their indices in args. So instead of doing something like:

redis-cli -r 100 RPUSH resque:queue:myqueue '{"class":"MyClass","product_file":"whatever.csv","category_file":"something_else.csv"}'

and

func myFunc(queue string, args map[string]string) error {
    productFile, err := os.Open(args["product_file"])
    if err != nil {
        return err
    }
    defer productFile.Close()
    // more things done here
    return nil
}

my actual workflow looks more like

redis-cli -r 100 RPUSH resque:queue:myqueue '{"class":"MyClass","args":["whatever.csv", "something_else.csv"]}'

and

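func myFunc(queue string, args ...interface{}) error {
    productFileIndex := 0 // positional: must match the order used when enqueuing
    productFile, err := os.Open(args[productFileIndex].(string))
    if err != nil {
        return err
    }
    defer productFile.Close()
    // more things done here
    return nil
}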

Obviously this isn't the worst problem ever, but I think the map-based version is less error-prone.
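Within the current `func(string, ...interface{}) error` signature, one workaround is to enqueue a single JSON object as the only argument, for example `'{"class":"MyClass","args":[{"product_file":"whatever.csv","category_file":"something_else.csv"}]}'`, and then look fields up by name. The sketch below assumes encoding/json's default decoding, in which JSON objects become `map[string]interface{}`. The `stringArg` helper is hypothetical.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// stringArg looks up a named string field in the first argument, which is
// expected to be a JSON object decoded as map[string]interface{}.
func stringArg(args []interface{}, key string) (string, error) {
	if len(args) == 0 {
		return "", fmt.Errorf("no arguments")
	}
	fields, ok := args[0].(map[string]interface{})
	if !ok {
		return "", fmt.Errorf("first argument is %T, not an object", args[0])
	}
	s, ok := fields[key].(string)
	if !ok {
		return "", fmt.Errorf("field %q is missing or not a string", key)
	}
	return s, nil
}

func myFunc(queue string, args ...interface{}) error {
	productFile, err := stringArg(args, "product_file")
	if err != nil {
		return err
	}
	fmt.Println("would open", productFile) // a real worker would os.Open it here
	return nil
}

func main() {
	var args []interface{}
	_ = json.Unmarshal([]byte(`[{"product_file":"whatever.csv","category_file":"something_else.csv"}]`), &args)
	if err := myFunc("myqueue", args...); err != nil {
		fmt.Println("Error:", err)
	}
}
```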

Retry failed jobs

It would be nice to have features like those Sidekiq provides (https://github.com/mperham/sidekiq/wiki/Error-Handling), especially retrying failed jobs.
Something like:

"If you don't fix the bug within 25 retries (about 21 days), Sidekiq will stop retrying and move your job to the Dead Job Queue. You can fix the bug and retry the job manually anytime within the next 6 months using the Web UI."

Another go get failure

../github.com/benmanns/goworker/goworker.go:47: cannot use ctx (type context.Context) as type time.Duration in argument to pool.Get

Panic on enqueue

When enqueuing jobs and working on them simultaneously, a panic occurs:
"attempt to Put into a full ResourcePool"

This is caused by calling Init() every time Enqueue is called.

Refactor the Enqueue API

This issue is more of a question about which approach is better for the following problem.

I've started using this library to implement a web service that uses Resque, entirely in Go. The Enqueue API works fine, but there is a problem: the library was originally designed for implementing workers that consume Resque queues, so it assumes the code running it is a worker, not a web service.

This causes the following problem if you have a web service that enqueues jobs (e.g. sending emails) and you run this:

From the documentation

package main

import (
        "fmt"

        "github.com/benmanns/goworker"
)

func main() {
        err := goworker.Enqueue(&goworker.Job{
                Queue: "myqueue",
                Payload: goworker.Payload{
                        Class: "MyClass",
                        Args:  []interface{}{"hi", "there"},
                },
        })
        fmt.Println(err)
}
$> go run enqueue.go

This returns an error saying that you must specify at least one queue. That should not happen, because this code registers no worker; it only pushes data to Redis.

$> go run enqueue.go -queues somequeue

This is how it works now. The job is pushed to myqueue, the queue specified in the code, not to somequeue from the flags.

For this problem I have two possible solutions:

  • Separate the API for pushing data to Resque (Enqueue) from the API for consuming it (workers), so that when you import the library you choose which one you want.

  • Delay the validations until they are needed (lazily): validate the queues when a worker is created, not before.

Which of these (or another approach) do you prefer? I'm asking because I think a change this big should be discussed before sending a PR, hehe.

Edit: Of course, of these two solutions the first one changes the API and breaks backward compatibility, which is important 😢.
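Here is a sketch of the second (lazy) option, using hypothetical `settings`, `Enqueue` and `Work` definitions rather than goworker's actual ones. The queue list is checked only when a worker starts, so an enqueue-only program never hits the "at least one queue" error.

```go
package main

import (
	"errors"
	"fmt"
)

// settings is a hypothetical stand-in for the library's flag-derived config.
type settings struct {
	Queues []string
}

var cfg settings

// Enqueue only needs a connection, so it performs no queue-list validation.
func Enqueue(queue, class string, args ...interface{}) error {
	if queue == "" {
		return errors.New("job has no queue")
	}
	// ... push {"class": class, "args": args} onto the queue ...
	return nil
}

// Work validates worker-only settings lazily, right before polling starts.
func Work() error {
	if len(cfg.Queues) == 0 {
		return errors.New("you must specify at least one queue")
	}
	// ... start pollers and workers ...
	return nil
}

func main() {
	fmt.Println("enqueue:", Enqueue("myqueue", "MyClass", "hi", "there"))
	fmt.Println("work:", Work())
}
```

This keeps the existing function names and so preserves backward compatibility, at the cost of reporting a missing queue list later, when `Work` is called.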

Example from Getting Started section doesn't run

The following example from Getting Started section doesn't run as is.

package main

import (
	"fmt"
	"github.com/benmanns/goworker"
)

func myFunc(queue string, args ...interface{}) error {
	fmt.Printf("From %s, %v\n", queue, args)
	return nil
}

func init() {
	goworker.Register("MyClass", myFunc)
}

func main() {
	if err := goworker.Work(); err != nil {
		fmt.Println("Error:", err)
	}
}

It prints "Error: you must specify at least one queue". It would be a good idea to update this example so newcomers can run it without further knowledge, for example by noting that it must be started with a -queues flag.

New version

Hello, we are using dep to manage all the dependencies of our project, and it seems that this tool downloads the latest tag of the goworker project, which is "v0.1.2". The problem is that this version doesn't include WorkerSettings.

What is the plan for releasing the next tag?

Thanks

How to handle panics in a job?

Hi! If a panic occurs in a job, nothing is logged.

package main

import (
        "errors"
        "fmt"
        "github.com/benmanns/goworker"
)

func myFunc(queue string, args ...interface{}) error {
        panic(errors.New("panic"))
        fmt.Printf("From %s, %v\n", queue, args)
        return nil
}

func init() {
        goworker.Register("MyClass", myFunc)
}

func main() {
        if err := goworker.Work(); err != nil {
                fmt.Println("Error:", err)
        }
}

I would like goworker to handle this error. Or is the only option to recover on my own?
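Until the library handles this, a worker can recover on its own. The minimal sketch below assumes a hypothetical `safe` wrapper applied at registration time, e.g. `goworker.Register("MyClass", safe(myFunc))`. The deferred `recover` turns a panic into an ordinary returned error, which is then reported like any other job failure.

```go
package main

import (
	"errors"
	"fmt"
)

type workerFunc func(queue string, args ...interface{}) error

// safe wraps a worker so that a panic becomes a returned error.
func safe(fn workerFunc) workerFunc {
	return func(queue string, args ...interface{}) (err error) {
		defer func() {
			if r := recover(); r != nil {
				err = fmt.Errorf("panic in worker for queue %s: %v", queue, r)
			}
		}()
		return fn(queue, args...)
	}
}

func myFunc(queue string, args ...interface{}) error {
	panic(errors.New("panic"))
}

func main() {
	err := safe(myFunc)("myqueue", "hi")
	fmt.Println("Error:", err) // Error: panic in worker for queue myqueue: panic
}
```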

missing the prefillParallelism argument in redis.go

go/src/github.com/benmanns/goworker/redis.go:35:30: not enough arguments in call to pools.NewResourcePool

In vitess.io/vitess/go/pools the NewResourcePool is defined as

NewResourcePool(factory Factory, capacity, maxCap int, idleTimeout time.Duration, prefillParallelism int) *ResourcePool

Is the prefillParallelism argument missing from redis.go? Should the call be:

return pools.NewResourcePool(newRedisFactory(uri), capacity, maxCapacity, idleTimout, 0)
or
return pools.NewResourcePool(newRedisFactory(uri), capacity, maxCapacity, idleTimout, 1)

show better error message in case Redis server is not available

Currently, if the Redis server is not available, running the example code from the homepage results in:

$ ./worker -queues=hello
1391970160808977666 [Critical] Error on getting connection in poller hostname:20800-poller:hello
1391970160809764448 [Critical] Error on getting connection in worker hostname:20800-0:hello
1391970160809953790 [Critical] Error on getting connection in worker hostname:20800-1:hello
1391970160810271765 [Critical] Error on getting connection in worker hostname:20800-2:hello
1391970160810447307 [Critical] Error on getting connection in worker hostname:20800-3:hello
1391970160810619467 [Critical] Error on getting connection in poller hostname:20800-poller:hello
1391970160810783042 [Critical] Error on getting connection in poller hostname:20800-poller:hello
1391970160810944713 [Critical] Error on getting connection in worker hostname:20800-4:hello
1391970160811127112 [Critical] Error on getting connection in worker hostname:20800-5:hello
1391970160811294090 [Critical] Error on getting connection in worker hostname:20800-0:hello
1391970160811451285 [Critical] Error on getting connection in worker hostname:20800-3:hello
1391970160811605366 [Critical] Error on getting connection in worker hostname:20800-6:hello
1391970160811769875 [Critical] Error on getting connection in worker hostname:20800-7:hello
1391970160811935685 [Critical] Error on getting connection in worker hostname:20800-1:hello
1391970160812122838 [Critical] Error on getting connection in worker hostname:20800-5:hello
1391970160812328184 [Critical] Error on getting connection in worker hostname:20800-8:hello
1391970160812552045 [Critical] Error on getting connection in worker hostname:20800-9:hello
1391970160812796857 [Critical] Error on getting connection in worker hostname:20800-4:hello
1391970160813031975 [Critical] Error on getting connection in worker hostname:20800-7:hello
1391970160813273275 [Critical] Error on getting connection in worker hostname:20800-10:hello
1391970160813515649 [Critical] Error on getting connection in worker hostname:20800-11:hello
1391970160813765257 [Critical] Error on getting connection in worker hostname:20800-6:hello
1391970160813966490 [Critical] Error on getting connection in worker hostname:20800-9:hello
1391970160814185941 [Critical] Error on getting connection in worker hostname:20800-12:hello
1391970160814416075 [Critical] Error on getting connection in worker hostname:20800-13:hello
1391970160814635214 [Critical] Error on getting connection in worker hostname:20800-8:hello
1391970160814820036 [Critical] Error on getting connection in worker hostname:20800-11:hello
1391970160815009785 [Critical] Error on getting connection in worker hostname:20800-14:hello
1391970160815232195 [Critical] Error on getting connection in worker hostname:20800-15:hello
1391970160815428000 [Critical] Error on getting connection in worker hostname:20800-10:hello
1391970160815615223 [Critical] Error on getting connection in worker hostname:20800-13:hello
1391970160815834367 [Critical] Error on getting connection in worker hostname:20800-16:hello
1391970160816038209 [Critical] Error on getting connection in worker hostname:20800-17:hello
1391970160816242444 [Critical] Error on getting connection in worker hostname:20800-12:hello
1391970160816458387 [Critical] Error on getting connection in worker hostname:20800-15:hello
1391970160816644701 [Critical] Error on getting connection in worker hostname:20800-18:hello
1391970160816847689 [Critical] Error on getting connection in worker hostname:20800-19:hello
...
...

Can we improve on this?

100% cpu usage

When goworker is running, it takes up 100% CPU (one full core) and pushes the server load to 1, where it stays. Is this normal and expected? Can anything be done about it? This happens with the provided example, which does basically nothing; it also pushes Redis to 50% CPU while doing nothing. Changing the settings (the number of workers or the polling interval) makes no difference.

Goworker does not recover from loss of connection to redis

If the connection to Redis goes down (due to a Redis restart, a netsplit, ...), the library does not recover, but only logs the error:

2014-03-10 19:07:23,682 DEBG 'my-worker' stdout output:
1394474843680784649 [Error] Error on my-worker-1:14479-poller:my_queue getting job from [my_queue]: use of closed network connection

goworker should recognize that the Redis client instance is dead and try to reconnect.

Timeout for stuck workers

Workers can get stuck silently. We were previously using the node-resque worker, which handled this scenario very well. With goworker, jobs just keep showing as running in resque-web.
Also, the worker count in resque-web keeps increasing (it should be 4).

[Screenshot from 2018-08-29, 13:26:38]

go get failure

go get github.com/benmanns/goworker
package code.google.com/p/vitess/go/pools: unable to detect version control system for code.google.com/ path
