
flo's Introduction


Please note that Spotify has ceased further development of flo, so no new features will be added; we will, however, continue to fix critical issues.

flo is a lightweight workflow definition library

  • It's not a workflow management system
  • It's not a workflow scheduler

Some key features

  • Programmatic Java and Scala API for expressing workflow construction (task DAG expansion)
  • Use of arbitrary program logic for DAG expansion
  • Recursive definitions
  • Lazy DAG expansion
  • DAG serialization (for 3rd party persistence)
  • Extensible DAG evaluation

Dependency

<dependency>
  <groupId>com.spotify</groupId>
  <artifactId>flo-workflow</artifactId>
  <version>${flo.version}</version>
</dependency>
"com.spotify" %% "flo-scala" % floVersion

JavaDocs here: http://spotify.github.io/flo/maven/latest/apidocs/


Quick Example: Fibonacci

Fibonacci serves as a good example even though it is not at all the kind of workload flo is meant for. Nevertheless, it demonstrates how a task DAG can be defined recursively, with arbitrary logic governing which inputs are chosen.

class Fib {

  static Task<Long> fib(long n) {
    TaskBuilder<Long> builder = Task.named("fib", n).ofType(Long.class);
    if (n < 2) {
      return builder
          .process(() -> n);
    } else {
      return builder
          .input(() -> fib(n - 1))
          .input(() -> fib(n - 2))
          .process((a, b) -> a + b);
    }
  }

  public static void main(String[] args) {
    Task<Long> fib92 = fib(92);
    EvalContext evalContext = MemoizingContext.composeWith(EvalContext.sync());
    EvalContext.Value<Long> value = evalContext.evaluate(fib92);

    value.consume(f92 -> System.out.println("fib(92) = " + f92));
  }
}

Scala equivalent

import java.util.function.Consumer

import com.spotify.flo._
import com.spotify.flo.context.MemoizingContext

object Fib extends App {

  def fib(n: Long): Task[Long] = defTask[Long](n) dsl (
    if (n < 2) {
      $ process n
    } else {
      $ input fib(n - 1) input fib(n - 2) process (_ + _)
    }
  )

  val fib92 = fib(92)
  val evalContext = MemoizingContext.composeWith(EvalContext.sync)
  val value = evalContext.evaluate(fib92)

  value.consume(new Consumer[Long] {
    //noinspection ScalaStyle
    override def accept(t: Long): Unit = Console.println(s"fib(92) = ${t}")
  })
}

For more details on a high-level runner implementation, see flo-runner.

Task<T> is one of the more central types in flo. It represents some task which will evaluate a value of type T. It has a parameterized name, zero or more input tasks and a processing function which will be executed when inputs are evaluated. Tasks come with a few key properties governing how they are defined, behave and are interacted with. We'll cover these in the following sections.

Tasks are defined by regular methods

Your workflow tasks are not defined as classes that extend Task<T>; rather, they are defined with the TaskBuilder API, as we have already seen in the Fibonacci example. In many ways this resembles a very clean class with no mutable state, only final members, and two overridden methods for the inputs and the evaluation function. There is one important difference, though: the input tasks are handled in a type-safe manner. Each input task you add further refines the type of your evaluation function. This is how we can use a plain lambda such as (a, b) -> a + b as the evaluation function in the Fibonacci example.

Here's a simple example of a flo task depending on two other tasks:

Task<Integer> myTask(String arg) {
  return Task.named("MyTask", arg).ofType(Integer.class)
      .input(() -> otherTask(arg))
      .input(() -> yetATask(arg))
      .process((otherResult, yetAResult) -> /* ... */);
}

This is how the same thing would typically look in other libraries:

class MyTask extends Task<Integer> {

  private final String arg;

  MyTask(String arg) {
    super("MyTask", arg);
    this.arg = arg;
  }

  @Override
  public List<? extends Task<?>> inputs() {
    return Arrays.asList(new OtherTask(arg), new YetATask(arg));
  }

  @Override
  public Integer process(List<Object> inputs) {
    // lose all type safety and guess your inputs
    // ...
  }
}
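To see how the builder keeps the process function type-safe, the accumulation of input types can be sketched in plain Java generics. This is a hypothetical, heavily simplified builder for illustration only, not flo's actual implementation (which also defers inputs and carries task metadata): each input call produces a builder whose process method expects one more typed argument.

```java
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Supplier;

// Sketch: each input(...) call returns a builder with one more type parameter,
// so process(...) always receives a function of exactly the right arity.
class Builder0<T> {
  T process(Supplier<T> fn) { return fn.get(); }
  <A> Builder1<T, A> input(Supplier<A> a) { return new Builder1<>(a); }
}

class Builder1<T, A> {
  private final Supplier<A> a;
  Builder1(Supplier<A> a) { this.a = a; }
  T process(Function<A, T> fn) { return fn.apply(a.get()); }
  <B> Builder2<T, A, B> input(Supplier<B> b) { return new Builder2<>(a, b); }
}

class Builder2<T, A, B> {
  private final Supplier<A> a;
  private final Supplier<B> b;
  Builder2(Supplier<A> a, Supplier<B> b) { this.a = a; this.b = b; }
  // Two inputs were added, so process takes two typed arguments.
  T process(BiFunction<A, B, T> fn) { return fn.apply(a.get(), b.get()); }
}

class BuilderDemo {
  public static void main(String[] args) {
    Integer sum = new Builder0<Integer>()
        .input(() -> 40)
        .input(() -> 2)
        .process((x, y) -> x + y); // inferred as (Integer, Integer) -> Integer
    System.out.println(sum);
  }
}
```

The compiler rejects a lambda of the wrong arity or argument types at the process call site, which is exactly what the list-of-anonymous-inputs style above cannot do.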

Task embedding

There's of course nothing stopping you from defining the task in a regular class. This might even be useful if your evaluation function is part of an existing class. flo does not force anything onto your types; it just needs to know what to run.

class SomeExistingClass {

  private final String arg;

  SomeExistingClass(String arg) {
    this.arg = arg;
  }

  Task<Integer> task() {
    return Task.named("EmbeddedTask", arg).ofType(Integer.class)
        .input(() -> otherTask(arg))
        .input(() -> yetATask(arg))
        .process(this::process);
  }

  int process(String otherResult, int yetAResult) {
    // ...
  }
}

Tasks are lazy

Creating instances of Task<T> is cheap. No matter how complex and deep the task DAG might be, creating the top-level Task<T> will not cause the whole DAG to be created. This is because all inputs are declared using a Supplier<T>, exploiting its properties for deferred evaluation:

someLibrary.maybeNeedsValue(() -> expensiveCalculation());

This pattern is on its way to becoming an idiom for achieving laziness in Java 8. A good example is the Supplier-based overloads added to java.util.logging.Logger in Java 8, which let the logger decide whether the log message for a certain level should be computed at all.
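The deferral that Supplier provides can be demonstrated in a small self-contained sketch (plain Java, no flo types; LazyDemo and its members are hypothetical names for illustration): the expensive computation runs only if the consumer decides to call get().

```java
import java.util.function.Supplier;

class LazyDemo {

  static int computations = 0;

  // Simulates an expensive calculation; counts how often it actually runs.
  static String expensiveCalculation() {
    computations++;
    return "result";
  }

  // A consumer that only pulls the value when some condition holds.
  static String maybeNeedsValue(boolean needed, Supplier<String> supplier) {
    return needed ? supplier.get() : "skipped";
  }

  public static void main(String[] args) {
    // Passing the lambda costs nothing; expensiveCalculation has not run yet.
    String a = maybeNeedsValue(false, () -> expensiveCalculation());
    String b = maybeNeedsValue(true, () -> expensiveCalculation());
    System.out.println(a + " " + b + ", computations=" + computations);
  }
}
```

flo's input(() -> someTask()) declarations work on the same principle: the input task object is only constructed when the evaluation machinery actually asks for it.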

So we can easily create an endlessly recursive task (useless, but illustrative) and still construct instances of it without worrying about how complex or resource-consuming the construction might be.

Task<String> endless() {
  return Task.named("Endless").ofType(String.class)
      .input(() -> endless())
      .process((impossible) -> impossible);
}

This means that we can always refer to tasks directly by using their definition:

TaskId endlessTaskId = endless().id();

Task DAGs as data structures

A Task<T> can be transformed into a data structure where a materialized view of the task DAG is needed. In this example we have two simple tasks where one is used as the input to the other.

Task<String> first(String arg) {
  return Task.named("First", arg).ofType(String.class)
      .process(() -> "hello " + arg);
}

Task<String> second(String arg) {
  return Task.named("Second", arg).ofType(String.class)
      .input(() -> first(arg))
      .process((firstResult) -> "well, " + firstResult);
}

void printTaskInfo() {
  Task<String> task = second("flo");
  TaskInfo taskInfo = TaskInfo.ofTask(task);
  System.out.println("taskInfo = " + taskInfo);
}

taskInfo in this example will be:

taskInfo = TaskInfo {
  id=Second(flo)#375f5234,
  isReference=false,
  inputs=[
    TaskInfo {
      id=First(flo)#65f4e738,
      isReference=false,
      inputs=[]
    }
  ]
}

The id and inputs fields should be pretty self-explanatory. isReference is a boolean which signals whether a task has already been materialized earlier in the tree, given a depth-first, post-order traversal.
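The same bookkeeping can be sketched generically (plain Java; the Node type and method names are hypothetical, not flo's API): a depth-first, post-order walk emits a node as a reference when its id has already been materialized earlier in the traversal, and does not expand its inputs again.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

class RefDemo {

  // Hypothetical DAG node identified by a string id.
  static class Node {
    final String id;
    final List<Node> inputs;
    Node(String id, Node... inputs) { this.id = id; this.inputs = Arrays.asList(inputs); }
  }

  // Depth-first, post-order walk: a node whose id was already seen earlier
  // in the traversal is emitted as a reference and not expanded again.
  static void walk(Node node, Set<String> seen, List<String> out) {
    boolean isReference = !seen.add(node.id);
    if (!isReference) {
      for (Node input : node.inputs) {
        walk(input, seen, out);
      }
    }
    out.add(node.id + "(isReference=" + isReference + ")");
  }

  static List<String> materialize(Node root) {
    List<String> out = new ArrayList<>();
    walk(root, new HashSet<>(), out);
    return out;
  }

  public static void main(String[] args) {
    // Diamond DAG: top -> {left, right}, and both depend on shared.
    Node shared = new Node("shared");
    Node top = new Node("top", new Node("left", shared), new Node("right", shared));
    materialize(top).forEach(System.out::println);
    // The second occurrence of "shared" comes out with isReference=true.
  }
}
```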

Recall that the DAG expansion can choose inputs arbitrarily based on the arguments. In workflow libraries where expansion is coupled with evaluation, it is hard to know beforehand what will be evaluated. Evaluation planning and result caching/memoization become integral parts of such libraries. flo aims to expose useful information together with flexible evaluation APIs, making it a library for easily building workflow management systems rather than trying to be the can-do-it-all workflow management system itself. More about how this is achieved in the EvalContext sections.

EvalContext defines an interface to a context in which Task<T> instances are evaluated. The context is responsible for expanding the task DAG and invoking the task process-functions. It gives library authors a powerful abstraction to use when implementing the specific details of evaluating a task DAG. All details around setting up wiring of dependencies between tasks, interaction with user code for DAG expansion, invoking task functions with upstream arguments, and other mundane plumbing is dealt with by flo.

These are just a few aspects of evaluation that can be implemented in an EvalContext:

  • Evaluation concurrency and thread pool management
  • Persisted memoization of previous task evaluations
  • Distributed coordination of evaluating shared DAGs
  • Short-circuiting DAG expansion of previously evaluated tasks

Since multi-worker, asynchronous evaluation is a very common prerequisite for many evaluation implementations, flo comes with a base implementation, AsyncContext, that can be extended with further behaviour.

See also SyncContext, InstrumentedContext and MemoizingContext.

flo's People

Contributors

0xflotus, aleksandr-spotify, asaenf, benkogan, bergman, danielnorberg, fabriziodemaria, honnix, jswarburton, labianchin, narape, natashal, rouzwawi, sonjaer, spotify-helios-ci-agent, thomasdziedzic-pd, ulzha, zatine


flo's Issues

Java properties are not passed to forked task

With the introduction of ForkingExecutor, I started noticing some errors on BQ flo tasks.

It seems to be related to the fact that I am running the flo process with -DGOOGLE_CLOUD_PROJECT=my-gcp-project, and that system property definition is not passed to the forked task subprocess.

Fixing that would require some changes to ForkingExecutor; I guess something related to this.

Remove dependency on apollo and CLI stuff

From what I can tell it was mainly used for configuration and command-line parsing, which has recently been cut out. For configuration we could probably use com.typesafe.config.Config directly.

It should also make flo-runner simpler by not allowing args to be passed, meaning the TaskConstructor could be scrapped altogether.

Dependency conflict: multiple versions of com.google.guava:guava:jar

Hi, we found that multiple versions of com.google.guava:guava:jar exist in flo-api-generator 0.5.1-SNAPSHOT. As shown in the following dependency tree, due to Maven version management only com.google.guava:guava:jar:23.0 will be loaded, and com.google.guava:guava:jar:17.0 will be shadowed during the packaging process.

However, several methods included only in the shadowed version, com.google.guava:guava:jar:17.0, are invoked by flo-api-generator. As a result, an exception could be thrown when your project references the missing methods.

Dependency tree:

com.spotify:flo-api-generator:jar:0.5.0
+- org.trimou:trimou-core:jar:2.0.1.Final:compile
| - org.slf4j:slf4j-api:jar:1.7.25:compile (version managed from 1.7.4)
+- com.google.testing.compile:compile-testing:jar:0.6:test
| +- (junit:junit:jar:4.12:test - version managed from 4.10; omitted for duplicate)
| +- (com.google.truth:truth:jar:0.24:test - omitted for duplicate)
| +- com.google.guava:guava:jar:23.0:test
| | +- com.google.code.findbugs:jsr305:jar:3.0.2:test (version managed from 1.3.9)
| | +- com.google.errorprone:error_prone_annotations:jar:2.0.19:test (version managed from 2.0.18)
| | +- com.google.j2objc:j2objc-annotations:jar:1.1:test
| | - org.codehaus.mojo:animal-sniffer-annotations:jar:1.14:test
| - com.sun:tools:jar:1.8.0_111:system
+- com.google.truth:truth:jar:0.24:test
| +- (com.google.guava:guava:jar:23.0:test - version managed from 17.0; omitted for duplicate)
| - (junit:junit:jar:4.12:test - version managed from 4.10; omitted for duplicate)
+- com.google.auto.value:auto-value:jar:1.4:provided
+- ch.qos.logback:logback-classic:jar:1.2.3:test
| +- ch.qos.logback:logback-core:jar:1.2.3:test
| - (org.slf4j:slf4j-api:jar:1.7.25:test - version managed from 1.7.4; omitted for duplicate)
+- junit:junit:jar:4.12:test
| - org.hamcrest:hamcrest-core:jar:1.3:test
+- org.hamcrest:hamcrest-library:jar:1.3:test
| - (org.hamcrest:hamcrest-core:jar:1.3:test - omitted for duplicate)
- org.mockito:mockito-core:jar:2.21.0:test
+- net.bytebuddy:byte-buddy:jar:1.8.15:test
+- net.bytebuddy:byte-buddy-agent:jar:1.8.15:test
- org.objenesis:objenesis:jar:2.6:test

Hope this can help you.

Best regards,
Leo

Named task arguments & structured access to args, input & context in process fn

Instead of just passing arguments to a task as an array of anonymous items, could we have named args? E.g. a case class, or something similar to the builder pattern, resulting in a POJO with the named args as fields, accessible in the process function etc.

// pseudo-scala
defTaskNamed("foobar", foo = "hello", bar = "world") // Task ID:foobar(foo=hello,bar=world)
  .input(fooData = args => lookupData(args.foo))
  .input(barData = args => lookupData(args.bar))
  .context(output = args => lookupDestination("foobar"))
  .process { task =>
    log.info("Processing foo={} and bar={}", task.args.foo, task.args.bar)
    val result = computeFoobar(foo = task.input.fooData, bar = task.input.barData)
    task.context.output.publish(result)
  }

Serialization doesn't work with forking

flo version: 0.4.1

Minimal reproducible example:

package com.spotify.data.clickhouse;

import com.spotify.flo.Task;
import com.spotify.flo.context.FloRunner;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class App {
  public static void main(String[] args) {
    PipelineOptions opts = PipelineOptionsFactory.fromArgs(args).create();

    FloRunner
        .runTask(task(opts))
        .waitAndExit();
  }

  public static Task<String> task(PipelineOptions opts) {
    return Task.named("task")
        .ofType(String.class)
        .process(opts::toString);
  }
}

Workaround

Create resources/flo.conf and put flo.forking = false there.

Undo?

Is there an easy way to implement an undo operation that lets tasks undo their actions when rolling back after an error?

Vendoring/shading dependencies

Each time we upgrade Beam or flo there are new dependency conflicts. There is no good way to ensure that there are no conflicts apart from running the code, which makes upgrades painful. One way to solve this would be vendoring or shading dependencies; see beam-dev.

What do you think?

Conditionally depend on a task

Currently, I guess, one can make do with ins(), which supports a list of tasks, and pass zero or one tasks there.

Should we have a more explicit API?

[Feature Request] Evaluation of tasks without running them

It would be good to have a way to evaluate a task, if all of its dependencies are OK, without running it. Something like FloRunner.evaluateTask(task). This can be useful if the task you want to evaluate
a) runs for a long time (data pipelines)
b) makes a permanent or hard-to-revert change in an external system

It is not possible to set the success parameter when running a BigQueryOperation job

When using the BigQueryOperator to trigger a BigQuery load job, one has to set the success parameter on the BigQueryOperation. It is not possible to set this field, and without it set a NullPointerException is thrown when running the job.

This is not caught by the tests, as the success method is package-private: it can be called from the tests but not externally.

https://github.com/spotify/flo/blob/master/contrib/flo-bigquery/src/main/java/com/spotify/flo/contrib/bigquery/BigQueryOperation.java

Serialization issues

Since flo 0.2.9 and the introduction of forking and serialization, I have been running into issues testing and running flo tasks.

The current errors I get are:

java.lang.AssertionError: assertion failed: (List(import java.this.lang._),List())
scala.MatchError: None (of class scala.None$)

They seem related to:

My current hunch is that we might need to introduce chill and improve the Kryo registrar configuration, similar to what scio does.

I created this ticket to raise awareness of the issue, ask for help, and document my findings. I still need to provide the steps to reproduce the issue.

TaskBuilder doesn't catch TaskContext returning unserializable types

For instance, in BigQueryLookupOperator, BigQueryLookup should be serializable.

I see two alternatives:

  • check type signatures; I don't know if there is a good way to do this due to type erasure
  • check that ProcessFnArg#get returns a serializable type, but that might require further extension of the surrounding code to give good error messages

flo-scala_2.12 ?

It would be nice to release flo-scala for both Scala 2.11 and Scala 2.12. That means adding support for cross-building.

Disable color in logs

It seems that logs currently always have colors:

LOG.warn("{} Failed after {}", colored(taskId), hms, valueError);

This does not work well with Styx and Stackdriver logs, where log lines show up as:

I  13:36:06.939 INFO  FloRunner - ESC[36mwrapperTask(ESC[m2018-04-10ESC[36m)ESC[37m#9ef4e797ESC[m
I  13:36:06.939 INFO  FloRunner - ESC[37m├▸ ESC[36mmakeDemoTask(ESC[mflo_extras.ScioAvroCanary,2018-04-10ESC[36m)ESC[37m#8639ee0eESC[m
I  13:36:06.939 INFO  FloRunner - ESC[37m│  ESC[37m└▸ ESC[36mapply(ESC[mfoo,input,2018-04-10ESC[36m)ESC[37m#18bc0d71ESC[m
I  13:36:06.939 INFO  FloRunner - ESC[37m├▸ ESC[36mbqLoad(ESC[mfoo,tmp,demo1_20180410ESC[36m)ESC[37m#24ead18eESC[m
I  13:36:06.939 INFO  FloRunner - ESC[37m│  ESC[37m└▸ ESC[36mmakeDemoTask(ESC[mflo_extras.ScioAvroCanary,2018-04-10ESC[36m)ESC[37m#8639ee0eESC[mESC[33m ⤴ESC[m
I  13:36:06.939 INFO  FloRunner - ESC[37m├▸ ESC[36mbqExtract(ESC[mflo_extras.BqExtractAvroCanary,2018-04-10ESC[36m)ESC[37m#fddb1b8cESC[m
I  13:36:06.940 INFO  FloRunner - ESC[37m│  ESC[37m└▸ ESC[36mbqLoad(ESC[mfoo,tmp,demo1_20180410ESC[36m)ESC[37m#24ead18eESC[mESC[33m ⤴ESC[m
I  13:36:06.940 INFO  FloRunner - ESC[37m└▸ ESC[36mbqExtract(ESC[mflo_extras.BqExtractTsvCanary,2018-04-10ESC[36m)ESC[37m#fe3a4839ESC[m
I  13:36:06.940 INFO  FloRunner - ESC[37m   ESC[37m└▸ ESC[36mbqLoad(ESC[mfoo,tmp,demo1_20180410ESC[36m)ESC[37m#24ead18eESC[mESC[33m ⤴ESC[m

(ESC denotes the raw ANSI escape character, \u001b.)

(here the logs are even stranger; internally, check the flo_extras.ScioAvroCanary workflow)

ForkingExecutor does not seem to broadcast subprocess exit code

It seems that ForkingExecutor does not handle the subprocess exit code, so the main process will not exit with code 50.

As an example, consider the following "pseudo" Flo Task:

com.spotify.flo.defTask("foo")
   .process(() => sys.exit(50))

Why is that important

Because flo is useful with Styx, and Styx supports exit code 50 as a signal to not retry.

Exit code 50 will cause an immediate failure of the workflow instance (no re-try will be scheduled). This can be used by workflow to indicate an unrecoverable failure and instruct Styx not to retry.

This is used internally at Spotify when a data validation fails in a non-recoverable way.

Configure forking context with conf file

Currently, forking can be disabled with the environment variable FLO_FORKING=false.

It would be great to have this configuration in Typesafe Config, e.g. by defining something like this in reference.conf:

flo.forkingEnabled=true
flo.forkingEnabled=${?FLO_FORKING_ENABLED}

flaky test: shouldEvaluate3N_IIL

TaskTest.shouldEvaluate3N_IIL sometimes fails with the error below:

java.lang.AssertionError: timeout while waiting for 0 concurrent tasks to run

	at org.junit.Assert.fail(Assert.java:88)
	at com.spotify.flo.ControlledBlockingContext.waitUntilNumConcurrent(ControlledBlockingContext.java:63)
	at com.spotify.flo.TaskTest.validateEvaluation(TaskTest.java:201)
	at com.spotify.flo.TaskTest.shouldEvaluate3N_IIL(TaskTest.java:150)
