faktory_worker_haskell's People

Contributors

cbeav, cdparks, eborden, jdreaver, pbrisbin, restyled-commits, z0isch

Forkers

fossas jagonalez

faktory_worker_haskell's Issues

Allow workers to specify or retrieve a `WorkerId`

Right now, I am unable to specify or retrieve the WorkerId of any particular worker. Having the WorkerId is useful for me for logging and monitoring purposes.

I would be okay with either being required to run randomWorkerId and pass the resulting ID into a worker configuration, or having some kind of IO WorkerId to retrieve a worker's ID.
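A minimal sketch of the first option, assuming a hypothetical `settingsWorkerId` field. The types below are stand-ins rather than the library's own, and the generator is passed in as a parameter so that something like randomWorkerId could be supplied by the caller:

```haskell
-- Hypothetical stand-in for the library's WorkerId type.
newtype WorkerId = WorkerId String
  deriving (Eq, Show)

-- Hypothetical settings record: Nothing means "generate an ID for me".
newtype WorkerSettings = WorkerSettings
  { settingsWorkerId :: Maybe WorkerId
  }

-- | Decide which ID a worker runs with. A caller-specified ID wins;
-- otherwise the supplied generator is run. Returning the ID lets the
-- caller use it for logging and monitoring.
resolveWorkerId :: IO WorkerId -> WorkerSettings -> IO WorkerId
resolveWorkerId generate settings =
  maybe generate pure (settingsWorkerId settings)
```

With this shape, the caller always learns the ID, whether they chose it or not.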

Re-implement as MTL

Idea: Re-implement internals in MTL-style

Optionally, continue to expose all the same IO versions.

Benefits

Generally speaking, things that use Monad(Unlift)IO compose better into our
various applications. Using MonadLogger will also get Faktory-related logging
into the general stream with everything else.

Concretely, getting perform into a MonadFaktoryPerform type-class means that
enqueuing normally (FaktoryPerformT) or as a part of a batch (FaktoryBatchT)
will look exactly the same at the call-site (perform). This will also be a much
cleaner way to handle default JobOptions, which will prevent bugs.

Since we'll be implementing a new worker loop function, we can take this chance
to have it pass Job args (not just args) to the handler, which is required if
handlers want to do things that need Job metadata, such as submitting progress
updates visible in TRACK calls or producing new Jobs into their own BATCH.

Proposed Ergonomics

Given:

data Payload = Payload
  deriving stock Generic
  deriving anyclass (FromJSON, ToJSON)

Producer / Perform

Current usage:

import Faktory.Job
import Faktory.Producer

main :: IO ()
main = bracket newProducerEnv closeProducer $ \producer ->
  perform mempty producer Payload

Simplest MTL-ified usage:

import Control.Monad.Faktory
import Control.Monad.Faktory.Perform
import Control.Monad.Faktory.Perform.Default

main :: IO ()
main =
  runStderrLoggingT
    $ runFaktoryEnvT
    $ runFaktoryPerformT
    $ perform Payload

Real-world MTL usage:

-- Same imports as above

data App = App
  { --
  , appFaktoryClient :: Faktory.Client
  }

instance HasFaktoryClient App where
  faktoryClientL = lens appFaktoryClient $ \x y -> x { appFaktoryClient = y }

newtype AppT m a = AppT (ReaderT App m a)
  deriving newtype (..., MonadReader App)
  deriving MonadFaktory via (ActualFaktory m)
  deriving MonadFaktoryPerform via (DefaultFaktoryPerform m)

main :: IO ()
main = do
  app <- loadApp
  runApp app $ perform Payload

Performing with explicit options is not shown. I'm thinking local would work to accomplish that:

local (faktoryJobOptionsL <>~ retry 3) $ perform Payload

Batch

Current usage:

import Faktory.Ent.Batch
import Faktory.Job
import Faktory.Producer

main :: IO ()
main = bracket newProducerEnv closeProducer $ \producer -> do
  onComplete <- buildJob mempty producer myJob

  runBatch (complete onComplete <> description "My Batch") producer $ do
    void $ batchPerform mempty producer Payload
    void $ batchPerform mempty producer Payload

Simplest MTL-ified usage:

import Control.Monad.Faktory
import Control.Monad.Faktory.Ent.Batch
import Control.Monad.Faktory.Perform
import Control.Monad.Faktory.Perform.Default

main :: IO ()
main =
  runStderrLoggingT $ runFaktoryEnvT $ runFaktoryPerformT $ do
    onComplete <- buildJob myJob

    runFaktoryBatchT (complete onComplete <> description "My Batch") $ do
      perform Payload
      perform Payload

Real-world MTL usage:

-- Same App setup as Perform
-- Same imports as above

main :: IO ()
main = do
  app <- loadApp
  runApp app $ do
    onComplete <- buildJob myJob

    runFaktoryBatchT (complete onComplete <> description "My Batch") $ do
      perform Payload
      perform Payload

Worker

Current usage:

import Faktory.Worker

handle :: Payload -> IO ()
handle = undefined

main :: IO ()
main = runWorkerEnv $ handle

Simplest MTL-ified usage:

import Control.Monad.Faktory
import Control.Monad.Faktory.Worker

main :: IO ()
main =
  runStderrLoggingT
    $ runFaktoryEnvT
    $ runFaktoryWorkerEnvT
    $ liftIO . handle . jobArg

Real-world MTL usage:

-- Same App setup as Perform
-- Same imports as above

handle
  :: ( MonadIO m
     , MonadLogger m
     , MonadReader env m
     , MonadThis m
     , MonadThat m
     , HasThis env
     , HasThat env
     )
  => Job Payload
  -> m ()
handle = undefined

main :: IO ()
main = do
  app <- loadApp
  runApp app $ runFaktoryWorkerEnvT handle

Rough Sketch

⚠️ I got carried away writing the code, so continue at your own risk. What I
chose to flesh out may not be interesting, and I might've omitted important
stuff.

Control.Monad.Faktory:

class Monad m => MonadFaktory m where
  commandByteString
    :: ByteString
    -> [ByteString]
    -> m (Either String (Maybe ByteString))

-- Implemented in terms of ^

command_ :: MonadFaktory m => ByteString -> [ByteString] -> m ()

commandOK :: MonadFaktory m => ByteString -> [ByteString] -> m ()

commandJSON
  :: (MonadFaktory m, FromJSON a)
  => ByteString -> [ByteString] -> m (Either String (Maybe a))

Control.Monad.Faktory.Actual

Example instance, for use with deriving via:

class HasFaktoryClient env where
  faktoryClientL :: Lens' env Client

instance HasFaktoryClient Client where
  faktoryClientL = id

newtype ActualFaktory m a  = ActualFaktory
  { unActualFaktory :: m a
  }
  deriving newtype (..., MonadReader env)

instance (MonadIO m, MonadLogger m, MonadReader env m, HasFaktoryClient env)
    => MonadFaktory (ActualFaktory m) where
    commandByteString cmd args = do
      client <- view faktoryClientL
      liftIO $ -- the thing

newtype FaktoryT m a = FaktoryT (ReaderT Client m a)
  deriving MonadFaktory via (ActualFaktory (FaktoryT m))

runFaktoryT :: FaktoryT m a -> Client -> m a
runFaktoryT (FaktoryT r) = runReaderT r

runFaktoryEnvT :: MonadUnliftIO m => FaktoryT m a -> m a
runFaktoryEnvT f = bracket newClientEnv closeClient $ runFaktoryT f

Control.Monad.Faktory.Perform

class Monad m => MonadFaktoryPerform m where
  defaultJobOptions :: m JobOptions
  defaultJobOptions = pure mempty

  perform :: ToJSON args => args -> m ()

Control.Monad.Faktory.Perform.Default

newtype DefaultFaktoryPerform m a = DefaultFaktoryPerform
  { unDefaultFaktoryPerform :: m a
  }
  deriving newtype (..., MonadFaktory)

instance MonadFaktory m => MonadFaktoryPerform (DefaultFaktoryPerform m) where
  perform args = do
    options <- defaultJobOptions
    job <- buildJob options args
    commandOK "PUSH" [encode job]

newtype FaktoryPerformT m a = FaktoryPerformT (ReaderT JobOptions m a)
  deriving MonadFaktoryPerform via (DefaultFaktoryPerform (FaktoryPerformT m))

runFaktoryPerformT :: FaktoryPerformT m a -> m a
runFaktoryPerformT (FaktoryPerformT r) = runReaderT r mempty

Control.Monad.Faktory.Perform.Reader

class HasFaktoryJobOptions env where
  faktoryJobOptionsL :: Lens' env JobOptions

instance HasFaktoryJobOptions JobOptions where
  faktoryJobOptionsL = id

newtype ReaderFaktoryPerform m a = ReaderFaktoryPerform
  { unReaderFaktoryPerform :: m a
  }
  deriving newtype (..., MonadFaktory, MonadReader env)

instance (MonadFaktory m, MonadReader env m, HasFaktoryJobOptions env)
  => MonadFaktoryPerform (ReaderFaktoryPerform m) where
  defaultJobOptions = view faktoryJobOptionsL

  perform args = do
    options <- defaultJobOptions
    job <- buildJob options args
    commandOK "PUSH" [encode job]

newtype FaktoryPerformT m a = FaktoryPerformT (ReaderT JobOptions m a)
  deriving newtype (MonadReader JobOptions)
  deriving MonadFaktoryPerform via (ReaderFaktoryPerform (FaktoryPerformT m))

runFaktoryPerformT :: FaktoryPerformT m a -> JobOptions -> m a
runFaktoryPerformT (FaktoryPerformT r) = runReaderT r

Control.Monad.Faktory.Batch

Users would always interact with the concrete stack for this one.

newtype FaktoryBatchT m a = FaktoryBatchT (ReaderT BatchId m a)
  deriving newtype
    ( ...
    , MonadReader BatchId
    , MonadFaktory
    )

runFaktoryBatchT :: MonadFaktory m => BatchOptions arg -> FaktoryBatchT m a -> m a
runFaktoryBatchT options (FaktoryBatchT r) = do
  batchId <- ...
  runReaderT r batchId

instance MonadFaktoryPerform m => MonadFaktoryPerform (FaktoryBatchT m) where
  defaultJobOptions = do
    batchId <- ask
    options <- lift defaultJobOptions
    pure $ options <> custom (CustomBatchId batchId)

  perform = lift . perform

Control.Monad.Faktory.Worker

data WorkerEnv -- WorkerId, WorkerSettings

newtype FaktoryWorkerT m a = FaktoryWorkerT
  { unFaktoryWorkerT :: ReaderT WorkerEnv m a
  }
  deriving newtype
    ( ...
    , MonadFaktory
    , MonadFaktoryPerform -- Just in case
    )

runFaktoryWorkerT
  :: (MonadUnliftIO m, MonadFaktory m)
  => (Job args -> FaktoryWorkerT m a)
  -> WorkerSettings
  -> m a
runFaktoryWorkerT f settings = do
  bracket (initializeWorker workerId) cleanupWorker $ \env ->
    flip runReaderT env $ unFaktoryWorkerT $ untilHalt $ do
      -- Thanks to MonadFaktory superclass
      emJob <- commandJSON "FETCH" [queueArg workerQueue]

      case emJob of
        -- Check for errors, handle delay loop, run f on the job call. ACK or FAIL

runFaktoryWorkerEnvT
  :: (MonadUnliftIO m, MonadFaktory m)
  => (Job args -> FaktoryWorkerT m a)
  -> m a
runFaktoryWorkerEnvT f = do
  settings <- workerSettingsEnv
  runFaktoryWorkerT f settings

Upload to Hackage

What's left? Do we need anything else to go to hackage and make an official release?

Built-in Bugsnag support

Clients that wish to notify Bugsnag of Job failures need to catch-and-rethrow themselves:

runWorker settings workerSettings $ \job -> handleAny handler $ do
  -- consumer logic
 where
  handler ex = do
    notifyBugsnag bugsnagSettings ex
    throwIO ex

We could easily do that ourselves in processorLoop, if so configured:

  Right (Just job) ->
        processAndAck job
          `catches` [ Handler $ \(ex :: WorkerHalt) -> throw ex
-                   , Handler $ \(ex :: SomeException) ->
+                   , Handler $ \(ex :: SomeException) -> do
+                     traverse_ (`notifyBugsnag` ex) settingsBugsnagSettings
                      failJob client job $ T.pack $ show ex
                    ]

This would hopefully remove the most common reason for the catch-and-rethrow pattern, and so solve the possible foot-guns with that pattern (forgetting to re-throw, mishandling WorkerHalt, etc).

As a team that is doing this ourselves, we've also learned to:

  • Set context = {job name}
  • Only notify if retries-remaining is 0

We could do both of those things internally too (assuming jobtype = {job name}).

Alternatives

  • Add a call to settingsLogError here, let clients notify themselves via that hook

    This hook takes a textual error message, so we lose a lot of power in notifying a proper Exception. Callers couldn't set context or disable based on retries count (basically you don't know the Job when this hook is called).

  • Add a new settingsOnException to address above

    This would support Bugsnag well and any other error-reporter generically. We could pass the Job to this function too, so that callers can setup context and/or base behavior on retries-remaining.

    If we did this, I would probably add settingsOnReservationExpired too, which currently goes to settingsLogError too.
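To make the settingsOnException alternative concrete, here is a rough sketch. Everything in it is a stand-in: the `Job` record, its `jobType` and `jobRetriesRemaining` fields, and the `failJob` parameter are assumptions for illustration, not the library's actual definitions. Like the existing `catches` code, it treats any `SomeException` other than `WorkerHalt` as a job failure.

```haskell
import Control.Exception (Exception, SomeException, fromException, throwIO, try)

-- Hypothetical, simplified Job: just enough metadata for the hook.
data Job a = Job
  { jobJid :: String
  , jobType :: String
  , jobRetriesRemaining :: Int
  , jobArg :: a
  }

-- Stand-in for the library's WorkerHalt exception.
data WorkerHalt = WorkerHalt
  deriving Show

instance Exception WorkerHalt

-- Hypothetical settings: the hook sees both the Job and the exception.
newtype WorkerSettings a = WorkerSettings
  { settingsOnException :: Job a -> SomeException -> IO ()
  }

-- | Run the handler. WorkerHalt propagates untouched; any other exception
-- is passed to the hook and then reported through the supplied failJob.
processJob
  :: WorkerSettings a
  -> (Job a -> String -> IO ()) -- stand-in for failJob
  -> (Job a -> IO ())           -- the consumer's handler
  -> Job a
  -> IO ()
processJob settings failJob handler job = do
  result <- try (handler job)
  case result of
    Right () -> pure () -- a real worker would ACK here
    Left ex
      | Just WorkerHalt <- fromException ex -> throwIO ex
      | otherwise -> do
          settingsOnException settings job ex
          failJob job (show ex)

-- | Example hook: only notify on the final attempt, using the job type
-- as context. The notify function is supplied by the caller.
notifyOnLastAttempt :: (String -> SomeException -> IO ()) -> Job a -> SomeException -> IO ()
notifyOnLastAttempt notify job ex
  | jobRetriesRemaining job == 0 = notify (jobType job) ex
  | otherwise = pure ()
```

Because the hook receives the Job, both of the lessons above (setting context and checking retries-remaining) can live in the caller's hook rather than in the library.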

/cc @cbeav

Extract Processor module

Being a job processor (vs just a producer) is its own concern that will gain some complexity as we add more features (e.g. BEAT). We should have a dedicated module with a nicer interface for such use-cases.

Support version 0.9.0

faktory_worker_haskell is built for version 0.8.0. We need to make sure it works with 0.9.0.

Delayed Jobs

The Protocol supports an at parameter which is a date-time to run the Job (no-earlier-than semantics, I'd guess). For some naming inspiration, the Ruby client's interface for this is #perform_(at|in) methods.
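As a starting point for discussion, a small sketch of what the option builders could look like. The `JobOptions` type here is a simplified stand-in with only the scheduling field, and the names `at` and `in_` are assumptions loosely modeled on the Ruby naming. The rendering uses `iso8601Show` from the time package, on the assumption that its output for a `UTCTime` is acceptable as the protocol's timestamp format.

```haskell
import Data.Time.Clock (NominalDiffTime, UTCTime, addUTCTime, getCurrentTime)
import Data.Time.Format.ISO8601 (iso8601Show)

-- Hypothetical, simplified options: only the scheduling field is modeled.
newtype JobOptions = JobOptions
  { joAt :: Maybe UTCTime
  }
  deriving (Eq, Show)

-- | Schedule the Job to run no earlier than the given time.
at :: UTCTime -> JobOptions
at = JobOptions . Just

-- | Schedule the Job to run no earlier than the given delay from now.
in_ :: NominalDiffTime -> IO JobOptions
in_ delay = at . addUTCTime delay <$> getCurrentTime

-- | Render the timestamp as it might be sent in the job's "at" field.
renderAt :: JobOptions -> Maybe String
renderAt = fmap iso8601Show . joAt
```

Usage would be something like `in_ 60` to build options for a Job that should run about a minute from now.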

Lost connection doesn't terminate process

Twice now in production, we've experienced that restarting the Faktory server doesn't cause consumer processes to crash. We see they have trouble with the socket:

...: Network.Socket.sendBuf: resource vanished (Broken pipe)

But the process stays up -- though it doesn't accept any Jobs. It also has very high CPU when in this state. (We also observed issues producing Jobs to Faktory while such consumers were running, but I wasn't able to reproduce this in isolation.)

This is bad because nothing replaces the now-broken service and no work is being done until a deployment happens to "kick" the service. This can be very bad if that work was meant to happen in a quiet time and the deployment causes it to pick up again in a busy time.

Reproduction Steps

  1. Run faktory (via docker)

  2. Run consumer example

  3. Restart faktory

  4. Observe:

    • The process sees errors but doesn't crash

      Starting consumer loop
      [ERROR]: SHUTDOWN Shutdown in progress
      [ERROR]: No more input
      faktory-example-consumer: Network.Socket.sendBuf: resource vanished (Broken pipe)
      
    • Jobs (produced with the example-producer) are ignored

    • Also, my laptop started going crazy, with this process taking 100% CPU.

Expected Outcome

These errors should crash the consumer process.
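One way to get that outcome, sketched under assumptions: the `FetchResult` type and the `fetch`, `process`, and `idle` parameters are hypothetical stand-ins for the library's internals. The idea is that a failed fetch ends the loop instead of being logged and retried in a tight loop, and the entry point then exits non-zero so a supervisor can replace the process.

```haskell
import System.Exit (die)

-- Hypothetical outcome of one FETCH attempt.
data FetchResult job
  = Fetched job
  | NoJob
  | FetchFailed String -- e.g. "No more input"

-- | Loop until a fetch fails, then return the failure message rather
-- than spinning on a dead connection.
consumerLoop :: IO (FetchResult job) -> (job -> IO ()) -> IO () -> IO String
consumerLoop fetch process idle = loop
 where
  loop = do
    result <- fetch
    case result of
      Fetched job -> process job >> loop
      NoJob -> idle >> loop
      FetchFailed msg -> pure msg

-- | Entry point: print the reason to stderr and exit with a failure code.
consumerMain :: IO (FetchResult job) -> (job -> IO ()) -> IO () -> IO a
consumerMain fetch process idle = do
  msg <- consumerLoop fetch process idle
  die ("Lost connection to Faktory: " <> msg)
```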

/cc @eborden @jdreaver

Faktory timeout is not handled

Faktory utilizes a timeout for all jobs. A job must Ack within a given period or Faktory will assume it has died and retry (or let die) the respective job. This property can be set with reserve_for and is described in the worker lifecycle wiki entry.

faktory_worker_haskell does not utilize this timeout. This can result in zombie jobs. A worker that exceeds the reserve_for will continue functioning while Faktory may have passed the job to another worker. This can cause duplicate effort, performance issues, and logical bugs.

Faktory worker should utilize the reserve_for to set an internal timeout. This timeout can throwTo the appropriate thread, ensuring operational consistency with Faktory and avoiding duplicate work issues.
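A minimal sketch of that idea using `System.Timeout.timeout` from base, which interrupts the action with an asynchronous exception when the limit passes. The `Outcome` type and the function name are made up for illustration; how the worker reports an expired reservation is left open.

```haskell
import System.Timeout (timeout)

-- Hypothetical result of running one job under its reservation.
data Outcome = Completed | ReservationExpired
  deriving (Eq, Show)

-- | Run a job handler, giving up once reserve_for seconds have elapsed.
-- timeout takes microseconds, hence the multiplication.
withReservation :: Int -> IO () -> IO Outcome
withReservation reserveForSeconds action = do
  result <- timeout (reserveForSeconds * 1000000) action
  pure $ maybe ReservationExpired (const Completed) result
```

A worker could then ACK on `Completed` and skip the ACK (or log) on `ReservationExpired`, since Faktory may already have handed the job to someone else.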

Separate `Settings` types for workers and clients

Although it makes sense that workers internally call newClient and use the same underlying client type as job producers, this makes for a very confusing API for newcomers. Here are some points of confusion I had:

  • Why does Settings have a Queue? What does this do when I'm using the client to push jobs? What if the job itself has its own Queue? Do I need a separate client for every queue I want to push to?
  • Why does Settings have a Maybe WorkerId? How do I use this with runWorker? What does this do when I'm using this client to push jobs?

I would prefer if there were clearer, separate APIs for the role of a job consumer and a job producer. In my mind, there should be "Producer" and "Consumer" abstractions that both use a lower-level "Client" implementation (that people can use to send raw commands, etc.).
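For illustration, the split could look roughly like the following. All type and field names are hypothetical and are not taken from the current API; the only real-world detail assumed is Faktory's default port of 7419.

```haskell
newtype Queue = Queue String
  deriving (Eq, Show)

newtype WorkerId = WorkerId String
  deriving (Eq, Show)

-- | Low-level connection concerns, shared by both roles.
data ClientSettings = ClientSettings
  { clientHost :: String
  , clientPort :: Int
  , clientPassword :: Maybe String
  }

-- | Producers only need a connection; the queue lives on each Job.
newtype ProducerSettings = ProducerSettings
  { producerClient :: ClientSettings
  }

-- | Consumers additionally say which queue they fetch from and who they are.
data WorkerSettings = WorkerSettings
  { workerClient :: ClientSettings
  , workerQueue :: Queue
  , workerId :: Maybe WorkerId
  }

defaultClientSettings :: ClientSettings
defaultClientSettings = ClientSettings "localhost" 7419 Nothing

defaultWorkerSettings :: WorkerSettings
defaultWorkerSettings = WorkerSettings defaultClientSettings (Queue "default") Nothing
```

With this layout, the questions above stop arising: a producer never sees a Queue or WorkerId in its settings at all.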

New release

v1.0.0.0 is the latest on Hackage and is missing 20f58a7, which means it can't be used in the latest LTS. We should release a new version with this fix.

Document against the Ruby or Go client

As far as I can tell, Faktory considers either its Ruby or Go client as "specifications by implementation" for how a Faktory client ought to behave.

It would be nice to put a markdown document in this repository that shows code snippets for each part of the API in Ruby or Go and then the corresponding Haskell that our client supports for the same operation.

We can highlight any gaps or when our ergonomics differ, and seek to normalize to the specification over time whenever it makes sense (or explain when it doesn't).

Make consumer delay configurable

When there are no Jobs to consume, we sleep for 1s to avoid a tight loop when idle. It would be nice to make this configurable in Settings.
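Something along these lines, perhaps. The field name `settingsIdleDelay` is an assumption, and the record is reduced to that one field for the sake of the sketch:

```haskell
import Control.Concurrent (threadDelay)

-- Hypothetical settings, reduced to the one field under discussion.
newtype WorkerSettings = WorkerSettings
  { settingsIdleDelay :: Int -- seconds to sleep when no Job is available
  }

-- | Keeps today's behavior of sleeping 1s when idle.
defaultWorkerSettings :: WorkerSettings
defaultWorkerSettings = WorkerSettings {settingsIdleDelay = 1}

-- | Sleep for the configured delay. threadDelay takes microseconds.
idle :: WorkerSettings -> IO ()
idle = threadDelay . (* 1000000) . settingsIdleDelay
```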

Suggestion: runWorkerHandle

Motivation

In the README, we have:

workerMain = do
  settings <- envSettings

  runWorker settings $ \job ->
    -- Process your Job here
    putStrLn $ myJobMessage job

    -- If any exception is thrown, the job will be marked as Failed in Faktory
    -- and retried. Note: you will not otherwise hear about any such exceptions,
    -- unless you catch-and-rethrow them yourself.

That comment had me thinking:

workerMain = do
  settings <- envSettings

  runWorkerHandle settings
    (\ex ->
      -- Do something with any processing exceptions
      hPutStrLn stderr $ show ex
    )
    (\job ->
      -- Process your Job here
      putStrLn $ myJobMessage job
    )

would be a nice way to allow users to observe exceptions but avoid them forgetting to re-throw and ensure the Job gets marked Failed in Faktory.

Currently, this is accomplished with (e.g.):

workerMain = do
  settings <- envSettings

  runWorker settings $ \job -> handleAny f $ do
    -- Process your Job here
    putStrLn $ myJobMessage job

f ex = do
  hPutStrLn stderr $ show ex
  throwIO ex -- <-- important to remember!

Naming Alternatives

The naming is TBD, but I took it from the idea that handle takes the exception handler first, which I like here because:

runWorker settings = runWorkerHandle settings $ const $ pure ()

runWorkerCatch is another obvious name, but that would imply (IMO) that the exception handler is taken second, which doesn't compose as nicely to runWorker. We could easily provide both.

runWorkerHandling and/or runWorkerCatching are other decent name choices, IMO.
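Whatever the name ends up being, the core could be sketched like this. To keep the example self-contained, the existing `runWorker` is passed in as a parameter and `settings` and `job` are left abstract; the point is only that the re-throw happens in one place inside the library rather than in every caller.

```haskell
import Control.Exception (SomeException, throwIO, try)

-- | Wrap a job handler so that the observer sees any exception, which is
-- then always re-thrown so the Job still gets marked Failed.
handling :: (SomeException -> IO ()) -> (job -> IO ()) -> job -> IO ()
handling onError process job = do
  result <- try (process job)
  case result of
    Right () -> pure ()
    Left ex -> onError ex >> throwIO ex

-- | runWorkerHandle expressed on top of an existing runWorker, which is
-- supplied as the first argument here only for the sake of the sketch.
runWorkerHandle
  :: (settings -> (job -> IO ()) -> IO ())
  -> settings
  -> (SomeException -> IO ())
  -> (job -> IO ())
  -> IO ()
runWorkerHandle runWorker settings onError process =
  runWorker settings (handling onError process)
```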

Add `BatchId` instances and hide constructor

BatchId does not currently have some of the usual suspects for instances (Show, Eq, Ord ...) which makes it tough to integrate into other data types.

It also exposes the constructor which is probably not the way we want a client to interact with this data type.
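Roughly what that could look like, assuming for the sketch that the ID wraps a plain `String` (the real representation may differ). The module would export the type as `BatchId` rather than `BatchId(..)`, alongside a read-only accessor:

```haskell
-- In the real module, the export list would name BatchId (without its
-- constructor) and batchIdString, so clients cannot fabricate IDs.
newtype BatchId = BatchId String
  deriving (Eq, Ord, Show)

-- | Read-only view of the ID, e.g. for logging. The name is hypothetical.
batchIdString :: BatchId -> String
batchIdString (BatchId s) = s
```

With these instances, a BatchId can be used as a Map key or embedded in records that themselves derive Show and Eq.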

Set up CI

We should definitely add hspec and/or doctests at some point, but I usually try to start with at least one end-to-end test that confirms things are not totally broken. I think it would be reasonable to run the producer/consumer examples from a cram test to assert we can submit and process jobs.

Allow workers to specify a name

Right now, it looks like the worker name (as viewed in the busy dashboard in the Faktory UI) is set automatically by combining the Faktory server hostname and the worker's process ID. Since my workers all connect to the same Faktory server and they are all forks running in the same process, all of my workers appear to have the same name in the Faktory UI. This, combined with #47, makes it very difficult to tell what Faktory believes any particular worker's job is.

I would like to be able to specify the worker's name as a setting.

Failure to parse payload doesn't report a FAIL

Behavior: when a Job fails to parse as JSON, it is not immediately reported as a failure. It hangs around in Enqueued until the reservation expires, at which point it may be retried and fail in exactly the same way again.

Why is this bad: having Jobs hang around in Enqueued for an additional 30 minutes can stress Faktory too much and lead to an outage. We expect such cases to follow the usual FAIL behaviors, whatever they are, for that Job. We may make backwards-incompatible changes to Job structures, and are OK with in-flight Job loss -- but the 30 minute delay on each attempt throws a wrench in those plans.

Likely reason: When we FETCH to Job SomeAppType, if the args don't parse as SomeAppType, we don't even get back the Job part. Without that, we don't know the jid and can't report the FAIL. To Faktory, the Job was consumed and is in-progress -- until the reservation expiry.

Possible fix: We could FETCH to Job Value then do Value -> SomeAppType later. If that fails, we still have the Job wrapping and can FAIL it.

It's possible Apps would prefer to deal with failures to parse themselves. Maybe they want to ignore it entirely, or log it differently than settingsLogError. We should probably consider this use-case in this fix.
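The two-step idea can be sketched without committing to any JSON library by leaving the raw argument type abstract (in practice it would be aeson's `Value`). The `Job` record and `Step` type below are simplified stand-ins:

```haskell
-- Hypothetical, simplified Job: the jid survives even if args don't parse.
data Job arg = Job
  { jobJid :: String
  , jobArg :: arg
  }

-- What the worker should report back to Faktory for this Job.
data Step
  = Ack String         -- jid
  | Fail String String -- jid, message
  deriving (Eq, Show)

-- | Fetch as Job raw first, then parse the args. A parse failure still
-- has the jid in hand, so it can be reported as a FAIL immediately.
processRaw
  :: (raw -> Either String arg) -- e.g. a JSON parser for the app's type
  -> (Job arg -> IO ())         -- the consumer's handler
  -> Job raw
  -> IO Step
processRaw parse handler rawJob =
  case parse (jobArg rawJob) of
    Left err ->
      pure $ Fail (jobJid rawJob) ("Invalid job arguments: " <> err)
    Right arg -> do
      handler rawJob {jobArg = arg}
      pure $ Ack (jobJid rawJob)
```

Apps that want to handle parse failures themselves could be given the `Left` branch as a configurable callback instead of the fixed `Fail`.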

fails to build with aeson-2.0

faktory                   > /tmp/stack-24fe4420f69da4df/faktory-1.1.2.0/library/Faktory/Job/Custom.hs:32:23: error:
faktory                   >     • Couldn't match type: HashMap.HashMap k0 v0
faktory                   >                      with: Data.Aeson.KeyMap.KeyMap Value
faktory                   >       Expected: Object
faktory                   >         Actual: HashMap.HashMap k0 v0
faktory                   >     • In the second argument of ‘($)’, namely ‘HashMap.union b a’
faktory                   >       In the second argument of ‘($)’, namely
faktory                   >         ‘Object $ HashMap.union b a’
faktory                   >       In the expression: Custom $ Object $ HashMap.union b a
faktory                   >    |
faktory                   > 32 |     Custom $ Object $ HashMap.union b a
faktory                   >    |                       ^^^^^^^^^^^^^^^^^
faktory                   > 
faktory                   > /tmp/stack-24fe4420f69da4df/faktory-1.1.2.0/library/Faktory/Job/Custom.hs:32:37: error:
faktory                   >     • Couldn't match type: Data.Aeson.KeyMap.KeyMap Value
faktory                   >                      with: HashMap.HashMap k0 v0
faktory                   >       Expected: HashMap.HashMap k0 v0
faktory                   >         Actual: Object
faktory                   >     • In the first argument of ‘HashMap.union’, namely ‘b’
faktory                   >       In the second argument of ‘($)’, namely ‘HashMap.union b a’
faktory                   >       In the second argument of ‘($)’, namely
faktory                   >         ‘Object $ HashMap.union b a’
faktory                   >    |
faktory                   > 32 |     Custom $ Object $ HashMap.union b a
faktory                   >    |                                     ^
faktory                   > 
faktory                   > /tmp/stack-24fe4420f69da4df/faktory-1.1.2.0/library/Faktory/Job/Custom.hs:32:39: error:
faktory                   >     • Couldn't match type: Data.Aeson.KeyMap.KeyMap Value
faktory                   >                      with: HashMap.HashMap k0 v0
faktory                   >       Expected: HashMap.HashMap k0 v0
faktory                   >         Actual: Object
faktory                   >     • In the second argument of ‘HashMap.union’, namely ‘a’
faktory                   >       In the second argument of ‘($)’, namely ‘HashMap.union b a’
faktory                   >       In the second argument of ‘($)’, namely
faktory                   >         ‘Object $ HashMap.union b a’
faktory                   >    |
faktory                   > 32 |     Custom $ Object $ HashMap.union b a
faktory                   >    |                                       ^
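
The errors come from aeson-2.0 changing `Object` to wrap a `KeyMap` instead of a `HashMap`. A possible fix, sketched here for aeson >= 2.0 only, is to use `Data.Aeson.KeyMap.union` in place of `HashMap.union`. The surrounding `Custom` definition is a guess based on the error output, and the fallback clause for non-object values is an assumption; supporting both aeson 1.x and 2.x at once would additionally need a CPP guard.

```haskell
import Data.Aeson (Value (..))
import qualified Data.Aeson.KeyMap as KeyMap

-- Guessed shape of the type from Faktory/Job/Custom.hs.
newtype Custom = Custom Value
  deriving (Eq, Show)

instance Semigroup Custom where
  -- KeyMap.union is left-biased, so keys from the right operand win,
  -- matching the argument order in the original HashMap.union b a.
  Custom (Object a) <> Custom (Object b) = Custom $ Object $ KeyMap.union b a
  -- Assumption: for anything that is not two objects, keep the right operand.
  _ <> b = b
```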
