vapor / queues Goto Github PK
View Code? Open in Web Editor NEWA queue system for Vapor.
License: MIT License
A queue system for Vapor.
License: MIT License
Could we add an example to the readme on how to access the database/fluent from the job please?
I haven't figured out what the best way would be... can we pass in a DatabaseConnectable as service? or is it possible to extend the JobContext to be DatabaseConnectable?
(If you have some examples then I would be willing to document it in the readme.)
There's a bit of manual work currently required to get a valid JobsPersistanceLayer
. We should provide some convenience APIs for this like FluentKit does.
Each driver can extend those convenience APIs with functions for supplying the required credentials.
**Is your feature request related to a problem?
Currently, when running a Vapor Queue on a Mac in development, when adding jobs to a queue the jobs are pulled off by multiple eventloops, (assumption) and thus the jobs don't complete in added order. Thus the queue is not FIFO.
Describe the solution you'd like
I would like a setting to make a queue sequential or FIFO.
Describe alternatives you've considered
I can't find any alternatives. Thus I'm blocked with how I can use a Vapor Queue in this critical instance.
Additional context
I understand the value of a queue being processed by multiple event loops. I have other use cases that use multi event-loops successfully. But the need for FIFO is critical, in a different use case.
Specifically what I need: I have a series of jobs that save large amounts of data to a postgres database using Fluent. It is fine that multiple event loops process those jobs because all the save are independent. But here is the kicker. I need to add a completion job to that queue, that executes after all the other jobs complete. As it now stands, that final job, added to the queues last, executes before some of the dependent jobs complete.
A sequential/FIFO is an easy way to ensure the final job is executed after all other jobs complete.
I used ScheduledJob which is scheduled at every minutes.
In some environment, job runs twice quickly at every minutes.
I patched this library to investigate by printing more information to know what happens.
This is a patch.
https://github.com/omochi/queues/pull/1/files
It prints scheduled time previously started job and current time.
This is log.
$ .build/release/Run serve --hostname 0.0.0.0 --port 80
[ INFO ] Scheduling JobKickerJob to run at 2021-03-03 12:16:00 +0000, now=2021-03-03 12:15:26 +0000, prev=nil
[ NOTICE ] Server starting on http://0.0.0.0:80
[ INFO ] Scheduling JobKickerJob to run at 2021-03-03 12:16:00 +0000, now=2021-03-03 12:15:59 +0000, prev=Optional(2021-03-03 12:16:00 +0000)
[ INFO ] Scheduling JobKickerJob to run at 2021-03-03 12:17:00 +0000, now=2021-03-03 12:16:00 +0000, prev=Optional(2021-03-03 12:16:00 +0000)
[ INFO ] Scheduling JobKickerJob to run at 2021-03-03 12:17:00 +0000, now=2021-03-03 12:16:59 +0000, prev=Optional(2021-03-03 12:17:00 +0000)
[ INFO ] Scheduling JobKickerJob to run at 2021-03-03 12:18:00 +0000, now=2021-03-03 12:17:00 +0000, prev=Optional(2021-03-03 12:17:00 +0000)
[ INFO ] Scheduling JobKickerJob to run at 2021-03-03 12:18:00 +0000, now=2021-03-03 12:17:59 +0000, prev=Optional(2021-03-03 12:18:00 +0000)
[ INFO ] Scheduling JobKickerJob to run at 2021-03-03 12:19:00 +0000, now=2021-03-03 12:18:00 +0000, prev=Optional(2021-03-03 12:18:00 +0000)
[ INFO ] Scheduling JobKickerJob to run at 2021-03-03 12:19:00 +0000, now=2021-03-03 12:18:59 +0000, prev=Optional(2021-03-03 12:19:00 +0000)
[ INFO ] Scheduling JobKickerJob to run at 2021-03-03 12:20:00 +0000, now=2021-03-03 12:19:00 +0000, prev=Optional(2021-03-03 12:19:00 +0000)
[ INFO ] Scheduling JobKickerJob to run at 2021-03-03 12:20:00 +0000, now=2021-03-03 12:19:59 +0000, prev=Optional(2021-03-03 12:20:00 +0000)
[ INFO ] Scheduling JobKickerJob to run at 2021-03-03 12:21:00 +0000, now=2021-03-03 12:20:00 +0000, prev=Optional(2021-03-03 12:20:00 +0000)
[ INFO ] Scheduling JobKickerJob to run at 2021-03-03 12:21:00 +0000, now=2021-03-03 12:20:59 +0000, prev=Optional(2021-03-03 12:21:00 +0000)
[ INFO ] Scheduling JobKickerJob to run at 2021-03-03 12:22:00 +0000, now=2021-03-03 12:21:00 +0000, prev=Optional(2021-03-03 12:21:00 +0000)
[ INFO ] Scheduling JobKickerJob to run at 2021-03-03 12:22:00 +0000, now=2021-03-03 12:21:59 +0000, prev=Optional(2021-03-03 12:22:00 +0000)
First, job is scheduled at 12:16:00
at (now=)12:15:26
.
Next, job is started at (now=)12:15:59
.
This is problem.
This job is actually scheduled at 12:16:00
but started earlier it.
So next schedule is also 12:16:00
.
Immediately, 12:16:00
comes actually.
This is twice execution process.
After that, same phenomenon happens repeatedly.
Configure job.
app.queues.schedule(JobKickerJob()).minutely().at(0)
try app.queues.startScheduledJobs()
Start app.
Job should run only once at every scheduled interval.
I tried this at many environment.
I got only once environment as below.
swift:5.3
.$ swift build -c release
)iMac Pro 2017
, 2.3 GHz 18 core Intel Xeon WThis bug causes concurrent execution and race condition in our project.
We didn't expect this at all, so debugging was very hard and consume many working time.
I finally reached to this library and very surprised.
It may be duplicate with #85
If I use the example app.. or my own and try the command:
try JobsCommand(application: app, scheduled: true).startScheduledJobs()
It produces the error Assertion failed: JobsCommand did not shutdown before deinit: file
If there is no call to: app.jobs.schedule
prior to calling the . startScheduledJobs()
I get why that is.. but I wonder if there is a better way to handle this.. such as a precondition to give a warning that no jobs of type ScheduledJob
have been put onto the schedule
Scheduling a synchronous job to run hourly runs a few seconds early then reschedules to run at the correct time a few seconds later - thus running twice. Occasionally it will run early more than once (see 08:01:57).
I have worked around this by not running the job if it is running or if it has run within the last minute.
// queue job to fetch currency rates at two minutes past each hour
try app.queues.use(.redis(url: "redis://127.0.0.1:6379"))
app.queues.schedule(fetcher)
.hourly()
.at(2)
try app.queues.startScheduledJobs()
ScheduledJob
should run exactly once on the specified schedule.
ScheduledJob
runs early. For example, if the job is scheduled to run at two minutes past each hour (11:02:00), then it will run at (say) 11.01.57 then reschedule to run at the correct time of 11:02:00 thus running twice.
[ INFO ] 2020-07-31 01:57:58 +0000 Initializing rates (App/CurrencyRatesFetcher.swift:22)
[ INFO ] 2020-07-31 01:57:58 +0000 Try to load rates from file (App/CurrencyRatesFetcher.swift:26)
[ INFO ] 2020-07-31 01:57:58 +0000 Attempting to load cached rates from file file:///app/data/rates... (App/CurrencyRatesStorage.swift:48)
[ INFO ] 2020-07-31 01:57:58 +0000 Loading cached rates from file file:///app/data/rates... (App/CurrencyRatesStorage.swift:55)
[ INFO ] 2020-07-31 01:57:58 +0000 Cached rates: [{
"rates" : {
"BTC" : 8.9993039e-05,
"EGP" : 15.9703,
"MDL" : 16.576971,
"BRL" : 5.1554,
"QAR" : 3.641,
"MWK" : 740.341469,
...
"SGD" : 1.371368,
"YER" : 250.349961,
"FKP" : 0.762474
},
"base" : "USD",
"fetchedAt" : 1596153718.0983748,
"timestamp" : 1596153600
}] (App/CurrencyRatesStorage.swift:57)
[ DEBUG ] Factory created. [RedisConnectionFactory: 8387CBC7-9040-4427-8890-6D06D8C46934] (RedisKit/RedisConnectionSource.swift:20)
[ DEBUG ] Scheduling CurrencyRatesFetcher to run at 2020-07-31 02:02:00 +0000 (Queues/ScheduledJob.swift:36)
[ NOTICE ] Server starting on http://0.0.0.0:8080 (Vapor/HTTP/Server/HTTPServer.swift:183)
[ INFO ] 2020-07-31 02:01:59 +0000 Fetching currencies on queue scheduled... (App/CurrencyRatesFetcher.swift:117)
[ DEBUG ] Scheduling CurrencyRatesFetcher to run at 2020-07-31 02:02:00 +0000 (Queues/ScheduledJob.swift:36)
[ ERROR ] CurrencyRatesFetcher failed: inProgress (Queues/QueuesCommand.swift:139)
[ DEBUG ] Scheduling CurrencyRatesFetcher to run at 2020-07-31 03:02:00 +0000 (Queues/ScheduledJob.swift:36)
[ INFO ] 2020-07-31 02:02:02 +0000 Fetched 171 currencies at July 31, 2020 at 2:02:02 AM UTC (App/CurrencyRatesFetcher.swift:48)
[ INFO ] 2020-07-31 02:02:02 +0000 Currency rates up to date at July 31, 2020 at 2:00:00 AM UTC) (App/CurrencyRatesFetcher.swift:50)
[ INFO ] 2020-07-31 02:02:02 +0000 Saved rates to file rates (App/CurrencyRatesFetcher.swift:57)
[ INFO ] 2020-07-31 03:01:59 +0000 Fetching currencies on queue scheduled... (App/CurrencyRatesFetcher.swift:117)
[ DEBUG ] Scheduling CurrencyRatesFetcher to run at 2020-07-31 03:02:00 +0000 (Queues/ScheduledJob.swift:36)
[ ERROR ] CurrencyRatesFetcher failed: inProgress (Queues/QueuesCommand.swift:139)
[ DEBUG ] Scheduling CurrencyRatesFetcher to run at 2020-07-31 04:02:00 +0000 (Queues/ScheduledJob.swift:36)
[ INFO ] 2020-07-31 03:02:00 +0000 Fetched 171 currencies at July 31, 2020 at 3:02:00 AM UTC (App/CurrencyRatesFetcher.swift:48)
[ INFO ] 2020-07-31 03:02:00 +0000 Currency rates up to date at July 31, 2020 at 3:00:00 AM UTC) (App/CurrencyRatesFetcher.swift:50)
[ INFO ] 2020-07-31 03:02:00 +0000 Saved rates to file rates (App/CurrencyRatesFetcher.swift:57)
[ INFO ] 2020-07-31 04:01:57 +0000 Fetching currencies on queue scheduled... (App/CurrencyRatesFetcher.swift:117)
[ DEBUG ] Scheduling CurrencyRatesFetcher to run at 2020-07-31 04:02:00 +0000 (Queues/ScheduledJob.swift:36)
[ INFO ] 2020-07-31 04:01:58 +0000 Fetched 171 currencies at July 31, 2020 at 4:01:58 AM UTC (App/CurrencyRatesFetcher.swift:48)
[ INFO ] 2020-07-31 04:01:58 +0000 Currency rates up to date at July 31, 2020 at 4:00:00 AM UTC) (App/CurrencyRatesFetcher.swift:50)
[ INFO ] 2020-07-31 04:01:58 +0000 Saved rates to file rates (App/CurrencyRatesFetcher.swift:57)
[ DEBUG ] 2020-07-31 04:02:00 +0000 Skipping fetch. Last fetched at 2020-07-31 04:01:58 +0000 (App/CurrencyRatesFetcher.swift:114)
[ DEBUG ] Scheduling CurrencyRatesFetcher to run at 2020-07-31 05:02:00 +0000 (Queues/ScheduledJob.swift:36)
[ INFO ] 2020-07-31 05:01:57 +0000 Fetching currencies on queue scheduled... (App/CurrencyRatesFetcher.swift:117)
[ DEBUG ] Scheduling CurrencyRatesFetcher to run at 2020-07-31 05:02:00 +0000 (Queues/ScheduledJob.swift:36)
[ INFO ] 2020-07-31 05:01:57 +0000 Fetched 171 currencies at July 31, 2020 at 5:01:57 AM UTC (App/CurrencyRatesFetcher.swift:48)
[ INFO ] 2020-07-31 05:01:57 +0000 Currency rates up to date at July 31, 2020 at 5:00:00 AM UTC) (App/CurrencyRatesFetcher.swift:50)
[ INFO ] 2020-07-31 05:01:57 +0000 Saved rates to file rates (App/CurrencyRatesFetcher.swift:57)
[ DEBUG ] 2020-07-31 05:02:00 +0000 Skipping fetch. Last fetched at 2020-07-31 05:01:57 +0000(App/CurrencyRatesFetcher.swift:114)
[ DEBUG ] Scheduling CurrencyRatesFetcher to run at 2020-07-31 06:02:00 +0000 (Queues/ScheduledJob.swift:36)
[ INFO ] 2020-07-31 06:01:57 +0000 Fetching currencies on queue scheduled... (App/CurrencyRatesFetcher.swift:117)
[ DEBUG ] Scheduling CurrencyRatesFetcher to run at 2020-07-31 06:02:00 +0000 (Queues/ScheduledJob.swift:36)
[ INFO ] 2020-07-31 06:01:58 +0000 Fetched 171 currencies at July 31, 2020 at 6:01:58 AM UTC (App/CurrencyRatesFetcher.swift:48)
[ INFO ] 2020-07-31 06:01:58 +0000 Currency rates up to date at July 31, 2020 at 6:00:00 AM UTC) (App/CurrencyRatesFetcher.swift:50)
[ INFO ] 2020-07-31 06:01:58 +0000 Saved rates to file rates (App/CurrencyRatesFetcher.swift:57)
[ DEBUG ] 2020-07-31 06:02:00 +0000 Skipping fetch. Last fetched at 2020-07-31 06:01:58 +0000 (App/CurrencyRatesFetcher.swift:114)
[ DEBUG ] Scheduling CurrencyRatesFetcher to run at 2020-07-31 07:02:00 +0000 (Queues/ScheduledJob.swift:36)
[ INFO ] 2020-07-31 07:01:57 +0000 Fetching currencies on queue scheduled... (App/CurrencyRatesFetcher.swift:117)
[ DEBUG ] Scheduling CurrencyRatesFetcher to run at 2020-07-31 07:02:00 +0000 (Queues/ScheduledJob.swift:36)
[ INFO ] 2020-07-31 07:01:58 +0000 Fetched 171 currencies at July 31, 2020 at 7:01:58 AM UTC (App/CurrencyRatesFetcher.swift:48)
[ INFO ] 2020-07-31 07:01:58 +0000 Currency rates up to date at July 31, 2020 at 7:00:00 AM UTC) (App/CurrencyRatesFetcher.swift:50)
[ INFO ] 2020-07-31 07:01:58 +0000 Saved rates to file rates (App/CurrencyRatesFetcher.swift:57)
[ DEBUG ] 2020-07-31 07:02:00 +0000 Skipping fetch. Last fetched at 2020-07-31 07:01:58 +0000 (App/CurrencyRatesFetcher.swift:114)
[ DEBUG ] Scheduling CurrencyRatesFetcher to run at 2020-07-31 08:02:00 +0000 (Queues/ScheduledJob.swift:36)
[ INFO ] 2020-07-31 08:01:57 +0000 Fetching currencies on queue scheduled... (App/CurrencyRatesFetcher.swift:117)
[ DEBUG ] Scheduling CurrencyRatesFetcher to run at 2020-07-31 08:02:00 +0000 (Queues/ScheduledJob.swift:36)
[ INFO ] 2020-07-31 08:01:58 +0000 Fetched 171 currencies at July 31, 2020 at 8:01:58 AM UTC (App/CurrencyRatesFetcher.swift:48)
[ INFO ] 2020-07-31 08:01:58 +0000 Currency rates up to date at July 31, 2020 at 8:00:00 AM UTC) (App/CurrencyRatesFetcher.swift:50)
[ DEBUG ] 2020-07-31 08:01:59 +0000 Skipping fetch. Last fetched at 2020-07-31 08:01:58 +0000 (App/CurrencyRatesFetcher.swift:114)
[ DEBUG ] Scheduling CurrencyRatesFetcher to run at 2020-07-31 08:02:00 +0000 (Queues/ScheduledJob.swift:36)
[ DEBUG ] 2020-07-31 08:02:00 +0000 Skipping fetch. Last fetched at 2020-07-31 08:01:58 +0000 (App/CurrencyRatesFetcher.swift:114)
[ DEBUG ] Scheduling CurrencyRatesFetcher to run at 2020-07-31 09:02:00 +0000 (Queues/ScheduledJob.swift:36)
[ INFO ] 2020-07-31 09:01:57 +0000 Fetching currencies on queue scheduled... (App/CurrencyRatesFetcher.swift:117)
[ DEBUG ] Scheduling CurrencyRatesFetcher to run at 2020-07-31 09:02:00 +0000 (Queues/ScheduledJob.swift:36)
[ INFO ] 2020-07-31 09:01:58 +0000 Fetched 171 currencies at July 31, 2020 at 9:01:58 AM UTC (App/CurrencyRatesFetcher.swift:48)
[ INFO ] 2020-07-31 09:01:58 +0000 Currency rates up to date at July 31, 2020 at 9:00:00 AM UTC) (App/CurrencyRatesFetcher.swift:50)
[ DEBUG ] 2020-07-31 09:01:59 +0000 Skipping fetch. Last fetched at 2020-07-31 09:01:58 +0000 (App/CurrencyRatesFetcher.swift:114)
[ DEBUG ] Scheduling CurrencyRatesFetcher to run at 2020-07-31 09:02:00 +0000 (Queues/ScheduledJob.swift:36)
[ DEBUG ] 2020-07-31 09:02:00 +0000 Skipping fetch. Last fetched at 2020-07-31 09:01:58 +0000 (App/CurrencyRatesFetcher.swift:114)
[ DEBUG ] Scheduling CurrencyRatesFetcher to run at 2020-07-31 10:02:00 +0000 (Queues/ScheduledJob.swift:36)
[ INFO ] 2020-07-31 10:01:57 +0000 Fetching currencies on queue scheduled... (App/CurrencyRatesFetcher.swift:117)
[ DEBUG ] Scheduling CurrencyRatesFetcher to run at 2020-07-31 10:02:00 +0000 (Queues/ScheduledJob.swift:36)
[ INFO ] 2020-07-31 10:01:58 +0000 Fetched 171 currencies at July 31, 2020 at 10:01:58 AM UTC (App/CurrencyRatesFetcher.swift:48)
[ INFO ] 2020-07-31 10:01:58 +0000 Currency rates up to date at July 31, 2020 at 10:00:00 AM UTC) (App/CurrencyRatesFetcher.swift:50)
[ DEBUG ] 2020-07-31 10:02:00 +0000 Skipping fetch. Last fetched at 2020-07-31 10:01:58 +0000 (App/CurrencyRatesFetcher.swift:114)
[ DEBUG ] Scheduling CurrencyRatesFetcher to run at 2020-07-31 11:02:00 +0000 (Queues/ScheduledJob.swift:36)[ INFO ] 2020-07-31 11:01:57 +0000 Fetching currencies on queue scheduled... (App/CurrencyRatesFetcher.swift:117)
[ DEBUG ] Scheduling CurrencyRatesFetcher to run at 2020-07-31 11:02:00 +0000 (Queues/ScheduledJob.swift:36)
[ INFO ] 2020-07-31 11:01:59 +0000 Fetched 171 currencies at July 31, 2020 at 11:01:59 AM UTC (App/CurrencyRatesFetcher.swift:48)
[ INFO ] 2020-07-31 11:01:59 +0000 Currency rates up to date at July 31, 2020 at 11:00:00 AM UTC) (App/CurrencyRatesFetcher.swift:50)
[ DEBUG ] 2020-07-31 11:02:00 +0000 Skipping fetch. Last fetched at 2020-07-31 11:01:59 +0000 (App/CurrencyRatesFetcher.swift:114)
[ DEBUG ] Scheduling CurrencyRatesFetcher to run at 2020-07-31 12:02:00 +0000 (Queues/ScheduledJob.swift:36)
[ INFO ] 2020-07-31 12:01:57 +0000 Fetching currencies on queue scheduled... (App/CurrencyRatesFetcher.swift:117)
[ DEBUG ] Scheduling CurrencyRatesFetcher to run at 2020-07-31 12:02:00 +0000 (Queues/ScheduledJob.swift:36)
[ INFO ] 2020-07-31 12:01:58 +0000 Fetched 171 currencies at July 31, 2020 at 12:01:58 PM UTC (App/CurrencyRatesFetcher.swift:48)
[ INFO ] 2020-07-31 12:01:58 +0000 Currency rates up to date at July 31, 2020 at 12:00:00 PM UTC) (App/CurrencyRatesFetcher.swift:50)
[ DEBUG ] 2020-07-31 12:02:00 +0000 Skipping fetch. Last fetched at 2020-07-31 12:01:58 +0000 (App/CurrencyRatesFetcher.swift:114)
[ DEBUG ] Scheduling CurrencyRatesFetcher to run at 2020-07-31 13:02:00 +0000 (Queues/ScheduledJob.swift:36)
[ INFO ] 2020-07-31 13:01:57 +0000 Fetching currencies on queue scheduled... (App/CurrencyRatesFetcher.swift:117)
[ DEBUG ] Scheduling CurrencyRatesFetcher to run at 2020-07-31 13:02:00 +0000 (Queues/ScheduledJob.swift:36)
[ INFO ] 2020-07-31 13:01:59 +0000 Fetched 171 currencies at July 31, 2020 at 1:01:59 PM UTC (App/CurrencyRatesFetcher.swift:48)
[ INFO ] 2020-07-31 13:01:59 +0000 Currency rates up to date at July 31, 2020 at 1:00:00 PM UTC) (App/CurrencyRatesFetcher.swift:50)
[ DEBUG ] 2020-07-31 13:02:00 +0000 Skipping fetch. Last fetched at 2020-07-31 13:01:59 +0000 (App/CurrencyRatesFetcher.swift:114)
[ DEBUG ] Scheduling CurrencyRatesFetcher to run at 2020-07-31 14:02:00 +0000 (Queues/ScheduledJob.swift:36)
Users should be able to specify that the jobs worker should be run in the same process as the web application instead of running via a separate CLI command
If I have a query the database job, what to do?
If you schedule a job using at with a specific date then the scheduled works.. however if you try and setup a job using something like .everySecond()
it never process the jobs
works
public func configure(_ app: Application) throws {
app.jobs.schedule(TestJob()).at(Date(rfc1123: "Sat, 04 Jan 2020 16:47:40 GMT")!)
try app.jobs.use(.redis(url: "redis://127.0.0.1:6379"))
}
Doesn't work
public func configure(_ app: Application) throws {
app.jobs.schedule(TestJob()).everySecond()
try app.jobs.use(.redis(url: "redis://127.0.0.1:6379"))
}
job is just a simple print
struct TestJob: ScheduledJob {
func run(context: JobContext) -> EventLoopFuture<Void> {
print("Job has run \(context)")
return context.eventLoop.makeSucceededFuture(Void())
}
}
if I make the change to ScheduleBuilder:
public func everySecond() {
self.second = 1
}
and put in a breakpoint in the nextDate(current
function I can get it to run once.. but not repeated
It is cool if there is way to customize event loop group for version 0.2.7. Like in new version to reuse application's event loops group
Under hobby account in heroku there is a limit 20 postgres connections for all workers. And JobCommand creates MultiThreadedEventLoopGroup(numberOfThreads: System.coreCount)
and every thread creates own connections as I understand correctly. So this limit is easy to reach currently
Is your feature request related to a problem? Please describe.
It would be useful to be able to get the RedisConnectionPool
for an EventLoop
. For example, one may want to extend QueueContext
with a Redis property, to allow using Redis from Vapor Jobs.
Describe the solution you'd like
The method pool(eventLoop:)
in Application.Redis
could be made public.
Describe alternatives you've considered
This library could extend each type that could make use of Redis.
QueuesCommand.swift line 101: worker.queue.logger.error("Job run failed: (error)")
yields uninformative errors like "Connection refused" with no context as to what actually failed.
Logging the worker, I see it has some information about the job that would be useful.
Might relate to #79 possibly as without a delay, the logs fill with infinite retries of non-obvious origins.
Possibly just logging the job might suffice.
I want to send a request to another server once the job is triggered
but it's seems impossible to access the container
Update vapor/docs#389 before tagging 1.0
I created a job as defined below it's working fine but once there is an error it doesn't retry again
I added the
context.eventLoop.future(error: Abort(.badRequest, reason: "My error here."))
but the compiler says argument passed to call that takes no arguments
import Foundation
import Jobs
import Vapor
struct SyncCaisseOperationsJob: Job {
let container: Container
init(container: Container){
self.container = container
}
func dequeue(_ context: JobContext, _ data: PushCaisseOperationDataJob) -> EventLoopFuture<Void> {
do {
let service = try container.make(CaisseServices.self)
return try service.pushCaisseOperations(
container,
caisseId: data.caisse_id,
cmdClients: data.data,
urlQuery: data.query
)
} catch {
return Future.map(on: container, { () })
}
}
}
Currently, there is no way to inject a JobIdentifier
for a certain job using queue.dispatch(...)
so that we can use that JobIdentifier
to clear a job from the queue, by calling queue.clear(id)
.
we should avoid using abbreviations
We should automatically add the jobs command in the JobsProvider
now that this is possible in Vapor 4.
services.extend(CommandConfiguration.self) { configuration, container in
try configuration.use(c.make(JobsCommand.self), as: "jobs")
}
It would be advantageous to have access to an instance of Application
when dequeueing a job.
Maybe having it added as a property on the JobContext
func dequeue(_ context: JobContext, _ payload: SomeJob) -> EventLoopFuture<Void> {
context.application.someService.....
}
I've been using the official MongoDB Swift Driver which has been awesome so far.
For Queues, I believe the current community MongoDB Queues package only supports using the third party MongoKitten and not the official MongoDB Swift Driver.
I wanted to raise this in case anyone in the community has created a queues package or is working on one for the official MongoDB Swift Driver.
I think this would be be a great package to have for the community going forward.
Looks like there is no way to make ScheduledJobs run every X hours or X Mins.
The current helpers if I am not mistaken are every 1 min, 1 hour..
I don't see a way to create a ScheduledBuilder and use that to schedule a job.
looks like the jobs.schedule()
calls the mutating function self.storage.configuration.schedule(job, builder: builder)
which is internal.
Hi, Can add an executable target to show sending emails every hour?
Is your feature request related to a problem? Please describe.
It's ugly to run a job like every 10 seconds as the system only offers seconds or minute but no quantifiers.
Describe the solution you'd like
Maybe an option to skip a certain amount of triggers like skip: 10
can still trigger every second but would only invoke the job on the 10th and then on the 20th and so on.
Describe alternatives you've considered
And option to specify after how many seconds or minutes to trigger.
Additional context
I am using an AsyncScheduledJob
to queue up 4 jobs, but I need them to be run one after another instead of in parallel. I can't find any information online about how to do this.
I use the QUEUES package and try to understand how to use it. I have created a simple Job which fetches data from a remote server. I config vapor application with the following function
// configures your application
public func configure(_ app: Application) throws {
// uncomment to serve files from /Public folder
// app.middleware.use(FileMiddleware(publicDirectory: app.directory.publicDirectory))
app.databases.use(.sqlite(.file("db.sqlite")), as: .sqlite)
try app.queues.use(.redis(url: "redis://127.0.0.1:6379"))
//Register jobs
let categoriesFetcherJob = FetcherJob()
app.queues.add(categoriesFetcherJob)
applyWPV2Migrations(app)
try app.queues.startInProcessJobs(on: .default)
// register routes
try routes(app)
}
I dispatch Job from Controller like this:
func index(req: Request) throws -> EventLoopFuture<[String: String]> {
let jobPayload = CategoriesFetcherJobPayload(startPage: 1, categoriesPerPage: 10)
return req.queue.dispatch(FetcherJob.self, jobPayload)
.flatMap { (_) -> EventLoopFuture<[String: String]> in
let res = ["status": "Fetching of remote categories was scheduled successfully"]
return req.eventLoop.makeSucceededFuture(res)
}
}
The issue occurs when I run unit tests not necessarily related to JOB functionality:
func testAssert() throws {
let app = Application(.testing)
defer { app.shutdown() }
try configure(app)
XCTAssertTrue(true)
}
After execution, I always hit assertion assert(self.didShutdown, "JobsCommand did not shutdown before deinit")
#6 0x0000000105535f34 in QueuesCommand.deinit at /queues/Sources/Queues/QueuesCommand.swift:164
OR
`assertionFailure("Command handler deinit when queue is not empty! Queue size: \(self.commandResponseQueue.count)")`
#7 0x00000001053f0a07 in RedisCommandHandler.deinit at /RediStack/Sources/RediStack/ChannelHandlers/RedisCommandHandler.swift:43
Tests do not crash
Hit assertion related to JOB but there is no way to shutdown it properly.
dependencies: [
// ๐ง A server-side Swift web framework.
.package(url: "https://github.com/vapor/vapor.git", from: "4.14.0"),
.package(url: "https://github.com/vapor/fluent.git", from: "4.0.0"),
.package(url: "https://github.com/vapor/fluent-sqlite-driver.git", from: "4.0.0-rc.2"),
.package(url: "https://github.com/vapor/queues-redis-driver.git", from: "1.0.0-rc.3"),
// .package(url: "https://github.com/apple/swift-protobuf.git", from: "1.8.0")
],
macOS 10.15.4
The library should allow users to specify a delayed job:
queue.dispatch(job, delay: someDateInTheFuture)
Would be nice to be able to be able to specify a delay between retries.
Maybe something like
protocol Job {
func nextRetry(attempt: Int) -> Date
}
extension Job {
func nextRetry(attempt: Int) -> Date { return Date() }
}
That would allow you to implement exponential backoff etc
Now it's not possible to edit the schedule of a job or to remove a job from the queue.
The queues
package is only able to run up to 1 task per refresh interval, per worker. For example, a system configured with 1 worker and a 10 second refresh interval, running tasks that take 1 second, cannot be utilised beyond 10%, and cannot run more than 1 task every 10 seconds.
In a project with queues and a task configured...
Expected behaviour
All 10 tasks complete one after the other with effectively zero gap between them.
Actual behaviour
The first task completes, and the queue waits until 10 seconds have elapsed before running the next.
queues
1.12.1
vapor-queues-fluent-driver
3.0.0-beta1
I'm not sure whether this is worth filing on the fluent driver repo as well. I think there are two possible scenarios here:
queues
intends that the pop
method in the driver API blocks until a job is available, likely intended for use with BRPOPLPUSH
/etc on Redis. Therefore making this a "bug" in vapor-queues-fluent-driver
or the queues
documentation.queues
, and it should instead re-pop
if any job is run until there is nothing.While the intention of refreshInterval
isn't documented (that I can tell), I think it's most reasonable and would be most expected by those who have used other queueing systems, that it be a period to wait when there is no available work so the tuning of this value only impacts the polling of storage and does not impact the utilisation of queue workers.
I'm frustrated that when I use the excellent Fluent Driver for Queues, the payload/JobData
enforces storage as raw Data
. This ends up meaning that for Postgres and MySQL, it's a column containing [UInt8]
, which is far from ideal.
I assume that this was done for the early drivers, which require storage this way, but it would be good to unpick this.
Offering the payload as Codable and expecting the driver to handle persistence would be a more flexible approach, and perhaps allow the use of things like Postgres' jsonb
type for far easier inspection and debugging of in-flight jobs.
Sorry if this is a bit of an obtuse question, but I'm a bit stumped - do I need to run another vapor instance alongside my "main" instance to run jobs?
It's a bit strange to configure Jobs via the provider init and a struct named JobsConfiguration
. Could we combine these?
This should be as simple as making startJobs
and startScheduledJobs
public in JobsCommand
We should add a dependency to Swift Metrics and output based metrics from the package like jobs queued, time taken to process a job and success/failure status
At the moment the event delegate only works for regular jobs. You can work around it, but I think it would be nice to have it "in-build".
We could possibly add the notification hook to the method, where the job is about to run. I think its in ScheduledJob
on line 49.
But also I think we need to define a different event delegate for a schedule job as well.
If redis close connection and not send event to vapor, then Job failed and don't up
Currently job handlers are looked up by their Data
associated type. This could lead to a potential ambiguity. For example, what would happen if two job handlers used [String: String]
as their data type?
Maybe we could do something like:
func dispatch<Job>(_ job: Job.Type, _ data: Job.Data) { ... }
That could look like:
req.jobs.dispatch(EmailJob.self, .init(message: "hi"))
We can remove this deprecation now that we're about to tag a V1
We should provide a console warning when multiple jobs are registered with the same JobData
name to help prevent collisions during decoding.
It would be nice to give the user the ability to control the number of workers they wish to have, instead of being forced to use the default of 1 per core.
Possible API:
app.queues.configuration.numWorkers = 2
Because Queues no longer supports 5.2 due to a few missing self.
in QueueWorker.swift the test for Vapor with Queues as a provider now fail, see for example: https://github.com/vapor/vapor/pull/2511/checks?check_run_id=1272354381
I suggest we either drop down to 5.2 in the test or test for both 5.2 and 5.3. On a related note, the README also says Queues supports 5.2.
How would you go about testing a job since it is run on a thread that does not notify when it is complete? Right now I am just calling sleep(10) before running my asserts, but I would like to be able to know when an AsyncJob is done.
Currently we create one ScheduledJobWorker
per event loop. As far as I can tell, there is nothing in place to make sure that the job is only performed once and on one thread so that multiple workers do not execute the same job at the same time.
In order to fully complete conformance with the reliable queue pattern, the library should expose a hook that allows drivers to move idle jobs in processing back to the upcoming queue stack. This would be useful for jobs that get stuck if the process crashes.
This is what a min-viable setup for vapor + jobs looks like currently:
let app = Application(environment: env)
app.provider(JobsProvider())
app.register(JobsDriver.self) { app in
return TestDriver(on: app.make())
}
app.register(extension: JobsConfiguration.self) { jobs, app in
jobs.add(FooJob())
}
app.get("foo") { req in
return req.jobs.dispatch(FooJob.Data(foo: "bar"))
.map { "done" }
}
return app
Not bad at all, but I have some ideas. What about something like this:
let app = Application(environment: env)
app.provider(JobsProvider())
app.jobs.driver(TestDriver())
app.jobs.add(FooJob())
app.jobs.add(BarJob())
app.jobs.add(QuxJob())
app.get("foo") { req in
return req.jobs.dispatch(FooJob.self, .init(foo: "bar"))
.map { "done" }
}
return app
I have found a small issue when making use of startInProcessJobs function to run in process worker.
In configure we call:
app.queues.startInProcessJobs(on .default)
However when server is exited using ctrl+c, there will be an error printed from QueuesCommand deinit
JobsCommand did not shutdown before deinit
I think we need to QueuesCommand part of lifecycle so its shutdown function can be called properly?
THanks for your help!
I've been working on porting the work I did for the earlier Jobs PostgreSQL driver over to the new Queues library, and I keep coming up against wanting to use Fluent to provide a JobData
wrapper that could be constructed using Fluent.
It got me to thinking: why not provide a single Queues driver that backs onto whatever Fluent-backed database the user is already using?
Does this sound feasible?
QueueService is a bit general and is one of the only types that doesn't have Job
in the name. Maybe it would be more consistent if we changed the name slightly.
Right now the smallest unit of time scheduled jobs support is a minute. This makes waiting for the scheduled jobs test to run pretty annoying.
Maybe we should add seconds or a schedule(at:Date)
method? This would allow us to run the tests a lot faster and could be useful to the end user.
A declarative, efficient, and flexible JavaScript library for building user interfaces.
๐ Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.
TypeScript is a superset of JavaScript that compiles to clean JavaScript output.
An Open Source Machine Learning Framework for Everyone
The Web framework for perfectionists with deadlines.
A PHP framework for web artisans
Bring data to life with SVG, Canvas and HTML. ๐๐๐
JavaScript (JS) is a lightweight interpreted programming language with first-class functions.
Some thing interesting about web. New door for the world.
A server is a program made to process requests and deliver data to clients.
Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.
Some thing interesting about visualization, use data art
Some thing interesting about game, make everyone happy.
We are working to build community through open source technology. NB: members must have two-factor auth.
Open source projects and samples from Microsoft.
Google โค๏ธ Open Source for everyone.
Alibaba Open Source for everyone
Data-Driven Documents codes.
China tencent open source team.