
asyncjobs's Introduction

Choria Asynchronous Jobs

Overview

This is an Asynchronous Job Queue system that relies on NATS JetStream for storage and general job life cycle management. It is compatible with any NATS JetStream based system like a privately hosted JetStream, Choria Streams or a commercial SaaS.

Each Task is stored in JetStream under a unique ID, and a Work Queue item is made referencing that Task. JetStream handles scheduling, retries, acknowledgements and more for the Work Queue item, while the stored Task is updated throughout its lifecycle.

Multiple processes can process jobs concurrently, thus job processing is both horizontally and vertically scalable. Job handlers are implemented in Go with one process hosting one or many handlers. Other languages can implement Job Handlers using NATS Request-Reply services. Per process concurrency and overall per-queue concurrency controls exist.

This package is heavily inspired by hibiken/asynq.


Status

This is a brand-new project, under heavy development. The core Task handling is in good shape and reasonably stable. The Task Scheduler is still subject to change.

Synopsis

Tasks are published to Work Queues:

// establish a connection to the EMAIL work queue using a NATS context
client, _ := asyncjobs.NewClient(asyncjobs.NatsConn(nc), asyncjobs.BindWorkQueue("EMAIL"))

// create a task with the type 'email:new' and body from newEmail()
task, _ := asyncjobs.NewTask("email:new", newEmail())

// store it in the Work Queue
client.EnqueueTask(ctx, task)

Tasks are processed by horizontally and vertically scalable processes. Typically, a Handler handles one type of Task. Below, Prometheus integration, concurrency and a backoff policy are configured.

// establish a connection to the EMAIL work queue using a 
// NATS context, with concurrency, prometheus stats and backoff
client, _ := asyncjobs.NewClient(
	asyncjobs.NatsContext("EMAIL"), 
	asyncjobs.BindWorkQueue("EMAIL"),
	asyncjobs.ClientConcurrency(10),
	asyncjobs.PrometheusListenPort(8080),
	asyncjobs.RetryBackoffPolicy(asyncjobs.RetryLinearTenMinutes))

router := asyncjobs.NewTaskRouter()
router.Handler("email:new", func(ctx context.Context, log asyncjobs.Logger, task *asyncjobs.Task) (any, error) {
	log.Printf("Processing task %s", task.ID)

	// do work here using task.Payload

	return "sent", nil
})

client.Run(ctx, router)

See our documentation for a deep dive into the use cases, architecture, abilities and more.

Requirements

NATS 2.8.0 or newer with JetStream enabled.

Features

See the Feature List page for a full feature break down.

asyncjobs's People

Contributors

dependabot[bot], drev74, ripienaar


asyncjobs's Issues

autoscale / allocate jobs based on metrics use case

I have a use case where I need to run long-running jobs on Hetzner, where the Hetzner Robot allows me to add and remove VMs.

So it will allow me to autoscale on cheap hardware.

In order to do this I need to detect RAM and CPU usage on each server where the asyncjobs agent runs. Detection is pretty easy.

Logic is:

  • if a server is below 10% utilisation, put it into "blocked" mode, block new job allocations, and move any long-running jobs off to servers at 50 to 80% utilisation. It's essentially rebalancing, so we can kill servers.
  • if a server has more than 80% utilisation, start a new server and let it take jobs off the queue.
  • on a new job, find the server with the lowest utilisation that is not "blocked".

So that the logic of these two use cases can be implemented, NATS KV would be an easy win.
If each agent sends metrics every minute, and NATS KV expires entries on a TTL of one hour, it will self-run.

Then core can issue jobs based on these metrics.
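A minimal sketch of the agent's metrics side, assuming the nats.go JetStream KV API; the bucket name, key layout and the currentUtilisation()/hostname placeholders are made up for illustration:

// publish this agent's utilisation to a KV bucket with a one hour TTL,
// so entries from dead servers age out on their own
js, _ := nc.JetStream()
kv, _ := js.CreateKeyValue(&nats.KeyValueConfig{Bucket: "AJ_AGENT_METRICS", TTL: time.Hour})

for range time.Tick(time.Minute) {
	util := currentUtilisation() // placeholder: gather RAM/CPU usage here
	kv.Put("agents."+hostname, []byte(fmt.Sprintf("%.2f", util)))
}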

Tasks DLQ

Tasks that reach some kind of final state other than ok/handler error should enter a DLQ. This would be tasks past deadlines etc.

This is probably just a stream with task entries in it, but it would be client-maintained and would not support things like stream-limits-based DLQ.

docs: how to contribute in wiki

Hello

I'd like to contribute to the wiki docs. However, when I fork the project the wiki is not forked 🙄. There are a few ways to work around that.
What is your preferred way for wiki contributors? 🤔

Task handler should support middleware

It should be possible to wrap a task handler in another - for example to add extra logging without modifying existing handlers.

Just like Go HTTP middleware, essentially.
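A rough sketch of what that could look like, assuming a HandlerFunc type matching the handler signature from the synopsis; loggingMiddleware and newEmailHandler are illustrative names:

// wrap any handler in extra logging without modifying the handler itself
func loggingMiddleware(next asyncjobs.HandlerFunc) asyncjobs.HandlerFunc {
	return func(ctx context.Context, log asyncjobs.Logger, task *asyncjobs.Task) (any, error) {
		log.Printf("starting task %s (%s)", task.ID, task.Type)
		result, err := next(ctx, log, task)
		if err != nil {
			log.Printf("task %s failed: %v", task.ID, err)
		}
		return result, err
	}
}

// usage: router.Handler("email:new", loggingMiddleware(newEmailHandler))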

ajc tasks cron scheduler is not a thing?

I'm trying to follow the docs for scheduled tasks. It appears that you create scheduled tasks through Golang, but then a separate process must be running for said tasks to be executed. Please correct me if this is wrong. According to the docs, the process to run is:

ajc task cron scheduler $(hostname -f) --monitor 8080

This does not appear to be something ajc knows anything about; instead I just get a list of valid commands. That list shows that there is a tasks command but no mention of a cron subcommand for it.

bug: panic on /metrics endpoint registration for multiple instances

Hello, I'm using this lib in my code somewhat like this:

type QueueManager struct {
  Client *aj.Client
  Router *aj.Mux
}

func (m QueueManager) Run(ctx context.Context) error {
  return m.Client.Run(ctx, m.Router)
}

go p.Pong.Run(ctx)

When I run multiple tests, I'm getting the following error:

panic: http: multiple registrations for /metrics

goroutine 27 [running]:
net/http.(*ServeMux).Handle(0x121e8a0, {0xc066a9, 0x8}, {0xd8bc20?, 0xc000448c60})
        /usr/local/go/src/net/http/server.go:2478 +0x226
net/http.Handle(...)
        /usr/local/go/src/net/http/server.go:2521
github.com/choria-io/asyncjobs.(*Client).startPrometheus(0xc0001bd7d0)
        /home/bku/work/go/pkg/mod/github.com/choria-io/[email protected]/client.go:137 +0xce
github.com/choria-io/asyncjobs.(*Client).Run(0xc0001bd7d0, {0xd8fbf8, 0xc00003a0c8}, 0xc0001ae358?)
        /home/bku/work/go/pkg/mod/github.com/choria-io/[email protected]/client.go:81 +0x1bc
github.com/neurodyne-web-services/api-gateway/internal/worker.QueueManager.Run({0xc0001bd7d0?, 0xc0001bda70?}, {0xd8fbf8?, 0xc00003a0c8?})

This happens because of the hardcoded /metrics endpoint, which reuses a common HTTP mux for each instance, thus resulting in a panic.

A solution will follow in a PR.
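For illustration only - not the actual patch - the usual way to avoid this is to register the Prometheus handler on a per-client mux rather than http.DefaultServeMux:

// each client gets its own mux, so two clients in one process no longer
// race to register the same /metrics pattern
mux := http.NewServeMux()
mux.Handle("/metrics", promhttp.Handler())
go http.ListenAndServe(fmt.Sprintf(":%d", prometheusPort), mux) // prometheusPort: the configured listen port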

Add wasm based tasks

Would allow the NATS Object Store to store the wasm and then run it.

Why?

  • easier / lighter deployment
  • Wazero is getting the ability to run Python, Rust and JS based code as wasm.
  • gotip compiles to WASI now. Been using it and it's much easier than TinyGo.
  • Docker still used for complex tasks.

https://github.com/bots-garden/capsule is a pure Golang wasm runner, so a decent candidate. It has no DB dependencies. It used to work with NATS but is being refactored to have more formal API encapsulation.

@ripienaar if you have time let me know what you think

There are other Wazero-based runners; Capsule is just one.

Retrying expired tasks does not work

When a task has expired, retrying it does not work - the expiry time remains in the past.

So the question is should this work and what about the deadline? We have a few options

  • Calculate the delta between create and expire and set a new expire matching that delta
  • Accept a new deadline
  • Fail as unretryable

We should probably support the first 2 options
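A sketch of the first option, with illustrative field names (CreatedAt, Deadline) standing in for whatever the Task actually stores:

// preserve the original create -> expire window when retrying
window := task.Deadline.Sub(task.CreatedAt)
task.Deadline = time.Now().Add(window)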

very awesome :)

Just wanted to say how great this looks. Thank you for all the hard work on this @ripienaar

I am going to try it out and give feedback, if that's ok?

The wiki really helped me see how to use this. It's a good idea.

Love how the CLI wraps the NATS CLI to give an easy way to interact with the system.

formalize external script handlers

Like Mux#RequestReply, we should have a handler factory for calling scripts; the CLI should use that instead of its own bespoke implementation.

The asyncjobs file should support setting up external script handlers, and the Docker packager should support copying a directory of scripts into the container at a well-known path.
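A hedged sketch of such a factory, assuming the HandlerFunc signature from the synopsis; the JSON-on-stdin, result-on-stdout contract shown here is illustrative, not the CLI's existing behaviour:

// ScriptHandler returns a handler that passes the task to an external
// command as JSON on stdin and treats its stdout as the task result
func ScriptHandler(command string) asyncjobs.HandlerFunc {
	return func(ctx context.Context, log asyncjobs.Logger, task *asyncjobs.Task) (any, error) {
		body, err := json.Marshal(task)
		if err != nil {
			return nil, err
		}

		cmd := exec.CommandContext(ctx, command)
		cmd.Stdin = bytes.NewReader(body)

		return cmd.Output()
	}
}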

Add a generic storage init command

We now have quite a few streams and things to configure, each with a command of its own - fine for queues and such - but for the general storage I think an ajc init --memory --replica 3 kind of command is needed.

Support packaging many handlers into a ready-made container

Given an asyncjobs.yaml with the following content:

nats: AJC
queue: DEFAULT
name: github.com/ripienaar/handlers
asyncjobs: latest
tasks:
  - type: email:new
    package:  github.com/myco/email-handler/newEmail

If github.com/myco/email-handler/newEmail had a method AsyncJobHandler() that implements HandlerFunc, we should be able to generate a stub Go program that runs all these handlers with sane defaults and package it all into a Docker container.
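A rough sketch of the stub such a generator might emit for the file above; the option names follow the synopsis, and the import path and AsyncJobHandler symbol come straight from this issue:

package main

import (
	"context"

	aj "github.com/choria-io/asyncjobs"
	newemail "github.com/myco/email-handler/newEmail"
)

func main() {
	// defaults taken from asyncjobs.yaml: NATS context AJC, work queue DEFAULT
	client, err := aj.NewClient(aj.NatsContext("AJC"), aj.BindWorkQueue("DEFAULT"))
	if err != nil {
		panic(err)
	}

	router := aj.NewTaskRouter()
	router.Handler("email:new", newemail.AsyncJobHandler)

	if err = client.Run(context.Background(), router); err != nil {
		panic(err)
	}
}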

Detect stale active states

At present we refuse to process tasks in the active state - but this state can be stale if a handler crashes.

So we should check if it has been in the active state for longer than the queue max age and then continue.
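Roughly, with illustrative names (State, LastTriedAt, TaskStateActive, queueMaxAge) standing in for the real fields and constants:

// treat an active task as stale once it has been in that state longer
// than the queue max age, then allow it to be processed again
if task.State == asyncjobs.TaskStateActive && time.Since(task.LastTriedAt) > queueMaxAge {
	// continue processing instead of refusing the task
}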

fail to run a docker handler

I'm trying to run a demo handler following this. Getting the following error: Choria Async Jobs Handler Failed: only states completed, expired or terminated can be discarded

To reproduce:

  1. Created a NATS context with this: XDG_CONFIG_HOME=$(pwd) nats context add AJ_API --server nats://hyp:4222
  2. chmod 777 -R ./nats
  3. docker run -it -v $PWD/nats:/handler/config/nats -e AJ_NATS_CONTEXT=AJ_API -p 8087:8080 --rm xxxx/asynq:test

Container failed to start with the following:

docker run -it -v $PWD/nats:/handler/config/nats -e AJ_NATS_CONTEXT=AJ_API -p 8087:8080 --rm xxxx/asynq:test
INFO[13:21:22] Choria Async Jobs Handler Service git.example.com/example build settings                                                                       
INFO[13:21:22] NATS Context: AJ_API                                                                                                                           
INFO[13:21:22] Work Queue: SDK                                                                                                                                
INFO[13:21:22] Concurrency: 12                                                                                                                                
INFO[13:21:22] Retry Policy: 1m                                                                                                                               
INFO[13:21:22] Discard State: completed                                                                                                                       
INFO[13:21:22] Prometheus Port: 8080                                                                                                                          
Choria Async Jobs Handler Failed: only states completed, expired or terminated can be discarded  

OS: Ubuntu 21.10
AJ: v0.0.7

Scheduled Tasks

A scheduler service should be made that reads a set of schedules off a stream - much like tasks - and then creates those tasks on a cron-like interval.

It should probably support running more than one instance and support leader election.
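Not a proposed implementation, just a rough illustration of the task-creating side using the robfig/cron package and the client from the synopsis; the schedule and task type are made up:

// every schedule read off the stream becomes a cron entry that
// enqueues a fresh task whenever it fires
c := cron.New()
c.AddFunc("*/5 * * * *", func() {
	task, _ := asyncjobs.NewTask("email:digest", nil)
	client.EnqueueTask(context.Background(), task)
})
c.Start()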

Optionally discard completed tasks

To avoid unbounded growth the client should be able to delete tasks in certain end states.

Completed seems obvious and we will start with that.

The client will still do a final task update and then a message delete right after. When we have lifecycle events we will probably skip this final task update.

This should be a ClientOpt like DiscardCompleted() or DiscardTaskStates(…).
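From the caller's side the sketch would be something like this; the exact option and state names may differ from what eventually ships:

// ask the client to delete tasks once they reach the completed state
client, _ := asyncjobs.NewClient(
	asyncjobs.NatsContext("EMAIL"),
	asyncjobs.BindWorkQueue("EMAIL"),
	asyncjobs.DiscardTaskStates(asyncjobs.TaskStateCompleted))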

Client lifecycle events

Clients should publish events such as start/stop, pre/post handling, etc. These should be lightweight and not contain all the task details, just IDs and the like.

Clients should probably support a user-supplied unique identifier that will also surface in Prometheus.

REST Service to Enqueue Tasks

A REST service should be able to front a specific queue or multiple queues; it should accept requests over HTTP(S) and enqueue them into JetStream.
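A minimal sketch of such a front-end using net/http and the client from the synopsis; the /enqueue path and request shape are made up for illustration:

// accept {"type": "...", "payload": ...} over HTTP and enqueue it as a task
http.HandleFunc("/enqueue", func(w http.ResponseWriter, r *http.Request) {
	var req struct {
		Type    string          `json:"type"`
		Payload json.RawMessage `json:"payload"`
	}
	if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
		http.Error(w, err.Error(), http.StatusBadRequest)
		return
	}

	task, err := asyncjobs.NewTask(req.Type, req.Payload)
	if err == nil {
		err = client.EnqueueTask(r.Context(), task)
	}
	if err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}

	fmt.Fprintln(w, task.ID)
})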

How to pass username and password to ajc

When starting my NATS JetStream server in dev mode, I see the username and password printed to the console. I can use these credentials and successfully connect in Go, but when I start ajc I get:

ajc runtime error: nats: Authorization Violation

Presumably this means I need to pass these into ajc somehow, but I see no docs about how to do so.
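In Go the credentials can be set on the NATS connection itself and the connection handed to the client; for ajc the usual route is a NATS context that carries the credentials (for example one saved with the nats CLI's --user and --password flags). A sketch of the Go side, with placeholder credentials:

// connect with username/password, then hand the connection to asyncjobs
nc, err := nats.Connect("nats://localhost:4222", nats.UserInfo("local", "secret"))
if err != nil {
	panic(err)
}

client, _ := asyncjobs.NewClient(asyncjobs.NatsConn(nc), asyncjobs.BindWorkQueue("EMAIL"))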

add noop logger

The default AJC logger generates too much noise in application code. I suggest adding a noop logger as a placeholder implementation of the custom logger interface that is already supported.

Exclusive task processors

With leader election landing in #18, we should consider a mode where multiple processors on a queue would leader-elect and only one would process jobs. This is handy for ordering-related things, or where running jobs maintain an on-disk cache and it is beneficial to keep as much as possible on one node, etc.

bug: task purge won't clean a queue

I have an entry in a queue, whereas the task list claims it is empty:

> ajc queue view PONG                                              
PONG Work Queue:
         Entries: 1 @ 178 B
    Memory Based: false
        Replicas: 1
  Archive Period: forever
  Max Task Tries: 20
    Max Run Time: 1h0m0s
  Max Concurrent: 100
     Max Entries: unlimited
      First Item: 03 May 22 15:11:59 UTC (2m5s)
       Last Item: 03 May 22 15:11:59 UTC (2m5s)

> ajc task ls 
ajc runtime error: no tasks found

I assume that purging all tasks should clean all queues as well

Ubuntu 21.10
I'm using AJC v0.0.7

feat: add context switch to ajc

I use ajc during my local dev. I run a local server with nats server run --jetstream AJC

If I need to restart the server or reboot, NATS won't allow me to pick this context again with the same command, returning the error:

nats: error: context AJC already exist, choose a new instance name or remove it with 'nats context rm AJC', try --help

Instead I need to do 😲:

nats server run --jetstream BJC
nats context select BJC
ajc info --context BJC

Thus all follow-up commands require extra typing to specify a new context. That's a dev nightmare.

I would rather have a context switch option like --select, as implemented in the NATS CLI. This would allow me to seamlessly jump between contexts to drop an old context or switch between dev/test/prod envs.

What do you think? 🤔

failed to run a docker example

Hello

I've been following this.
I was able to build an image, but I'm getting a permissions error trying to run the newly generated container.

docker run -it -v $PWD/nats:/handler/config/nats -p 8086:8080 --rm xxxxx/asynq:test       
INFO[08:26:33] Choria Async Jobs Handler Service git.example.com/example build settings                                                                       
INFO[08:26:33] NATS Context: AJC                                                                                                                              
INFO[08:26:33] Work Queue: SDK                                                                                                                                
INFO[08:26:33] Concurrency: 12                                                                                                                                
INFO[08:26:33] Retry Policy: 1m                                                                                                                               
INFO[08:26:33] Discard State: completed                                                                                                                       
INFO[08:26:33] Prometheus Port: 8080                                
Choria Async Jobs Handler Failed: open /handler/config/nats/context/AJC.json: permission denied

OS: Ubuntu 21.10
Asyncjobs: v0.0.7

bump jetstream API

NATS has bumped its API and separated JetStream into its own package. Should I migrate the code to the new js package, so that all key-value types will be js.KeyValue instead of nats.KeyValue?
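For reference, a sketch of what KV access looks like in the newer github.com/nats-io/nats.go/jetstream package; the bucket and key names are just examples, and whether the library migrates is exactly what this issue asks:

// new-style JetStream handle and KV bucket lookup; calls now take a context
js, _ := jetstream.New(nc)
kv, _ := js.KeyValue(ctx, "EXAMPLE_BUCKET")
entry, _ := kv.Get(ctx, "some.key")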

Allow re-enq'ing a task

At present this will fail because the header passed only allows new entries on the subject, but there should be an option to retry a task that reached termination.

For this we need to adjust how queues are written - the work item should be keyed by ID - so that we can ensure the item is removed from the queue before re-inserting it, just in case.

Do this with client.RetryByID(task.ID) and ajc task retry <id>

Support task dependencies

We want to be able to do entire workflows of jobs, the workflows would be DAGs.

The idea is that when a DAG is submitted all the tasks will be created after a BFS of the DAG so that all the tasks have their parents filled in.

ajc will then, when a task with dependencies is handled, first check that the dependent tasks are ok. It will optionally load all their results into the task as well.

For now we should just support basic n ancestors of a task.
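A rough sketch of the submission-time walk, with hypothetical types; nothing here is existing library API:

// breadth-first walk of a submitted DAG so every task records the IDs
// of the tasks it depends on before anything is enqueued
type dagNode struct {
	Task     *asyncjobs.Task
	Parents  []string // IDs of tasks this one depends on
	Children []*dagNode
}

func assignParents(root *dagNode) {
	queue := []*dagNode{root}
	for len(queue) > 0 {
		node := queue[0]
		queue = queue[1:]
		for _, child := range node.Children {
			child.Parents = append(child.Parents, node.Task.ID)
			queue = append(queue, child)
		}
	}
}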

feat: protobuf serialization

Hello

Currently I'm working with a double-queue design: a few services pass data through two queues.
I'm dissatisfied with the implementation of NewTask. Internally it calls json.Marshal, which mangles the data if I pass already binary pre-encoded input from my service; I need to decode JSON multiple times at each level.

Protobuf pros:

  • faster serdes
  • higher compression
  • lower memory/bandwidth footprint
  • de facto standard for CNCF tooling (Docker, Kubernetes, Grafana, etc.)

Protobuf cons:

  • need to maintain proto definition files
  • complicates the release and deployment flow a bit

However, given the whole project goal of providing a simple high performance backplane for microservices, I believe it's worth implementing.

Would you consider moving to proto? I can take the lead on the implementation.
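For context, roughly what the caller side looks like today versus what is being asked for; EmailRequest is a hypothetical generated message, and a raw-payload constructor does not exist yet:

// pre-encode the payload with protobuf
payload, _ := proto.Marshal(&pb.EmailRequest{To: "user@example.com"})

// today NewTask runs this through json.Marshal, which base64-encodes the bytes;
// the request is for a way to store the raw payload bytes as-is
task, _ := asyncjobs.NewTask("email:new", payload)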
