Code Monkey home page Code Monkey logo

piper's Introduction

Project Status

I no longer maintain this project. You might want to check out tork as an alternative solution.

Introduction

Piper is an open-source, distributed workflow engine built on Spring Boot, designed to be dead simple.

Piper can run on one or a thousand machines depending on your scaling needs.

In Piper, work to be done is defined as a set of tasks called a Pipeline. Pipelines can be sourced from many locations but typically they live on a Git repository where they can be versioned and tracked.

Piper was originally built to support the need to transcode massive amounts of video in parallel. Since transcoding video is a CPU and time instensive process I had to scale horizontally. Moreover, I needed a way to monitor these long running jobs, auto-retry them and otherwise control their execution.

Tasks

Tasks are the basic building blocks of a pipeline. Each task has a type property which maps to a TaskHandler implementation, responsible for carrying out the task.

For example here's the RandomInt TaskHandler implementation:

  public class RandomInt implements TaskHandler<Object> {

    @Override
    public Object handle(Task aTask) throws Exception {
      int startInclusive = aTask.getInteger("startInclusive", 0);
      int endInclusive = aTask.getInteger("endInclusive", 100);
      return RandomUtils.nextInt(startInclusive, endInclusive);
    }

  }

While it doesn't do much beyond generating a random integer, it does demonstrate how a TaskHandler works. a Task instance is passed as an argument to the TaskHandler which contains all the Key-Value pairs of that task.

The TaskHandler is then responsible for executing the task using this input and optionally returning an output which can be used by other pipeline tasks downstream.

Pipelines

Piper pipelines are authored in YAML, a JSON superset.

Here is an example of a basic pipeline definition.

name: Hello Demo

inputs:                --+
  - name: yourName       |
    label: Your Name     | - This defines the inputs
    type: string         |   expected by the pipeline
    required: true     --+

outputs:                 --+
  - name: myMagicNumber    | - You can output any of the job's
    value: ${randomNumber} |   variable as the job's output.
                         --+
tasks:
  - name: randomNumber               --+
    label: Generate a random number    |
    type: random/int                   | - This is a task
    startInclusive: 0                  |
    endInclusive: 10000              --+

  - label: Print a greeting
    type: io/print
    text: Hello ${yourName}

  - label: Sleep a little
    type: time/sleep        --+
    millis: ${randomNumber}   | - tasks may refer to the result of a previous task
                            --+
  - label: Print a farewell
    type: io/print
    text: Goodbye ${yourName}

So tasks are nothing but a collection of key-value pairs. At a minimum each task contains a type property which maps to an appropriate TaskHandler that needs to execute it.

Tasks may also specify a name property which can be used to name the output of the task so it can be used later in the pipeline.

The label property is used to give a human-readble description for the task.

The node property can be used to route tasks to work queues other than the default tasks queue. This allows one to design a cluster of worker nodes of different types, of different capacity, different 3rd party software dependencies and so on.

The retry property can be used to specify the number of times that a task is allowed to automatically retry in case of a failure.

The timeout property can be used to specify the number of seconds/minutes/hours that a task may execute before it is cancelled.

The output property can be used to modify the output of the task in some fashion. e.g. convert it to an integer.

All other key-value pairs are task-specific and may or may not be required depending on the specific task.

Architecture

Piper is composed of the following components:

Coordinator: The Coordinator is the like the central nervous system of Piper. It keeps tracks of jobs, dishes out work to be done by Worker machines, keeps track of failures, retries and other job-level details. Unlike Worker nodes, it does not execute actual work but delegate all task activities to Worker instances.

Worker: Workers are the work horses of Piper. These are the Piper nodes that actually execute tasks requested to be done by the Coordinator machine. Unlike the Coordinator, the workers are stateless, which by that is meant that they do not interact with a database or keep any state in memory about the job or anything else. This makes it very easy to scale up and down the number of workers in the system without fear of losing application state.

Message Broker: All communication between the Coordinator and the Worker nodes is done through a messaging broker. This has many advantages:

  1. if all workers are busy the message broker will simply queue the message until they can handle it.
  2. when workers boot up they subscribe to the appropriate queues for the type of work they are intended to handle
  3. if a worker crashes the task will automatically get re-queued to be handle by another worker.
  4. Last but not least, workers and TaskHandler implementations can be written in any language since they decoupled completely through message passing.

Database: This piece holds all the jobs state in the system, what tasks completed, failed etc. It is used by the Coordinator as its "mind".

Pipeline Repository: The component where pipelines (workflows) are created, edited etc. by pipeline engineers.

Control Flow

Piper support the following constructs to control the flow of execution:

Each

Applies the function iteratee to each item in list, in parallel. Note, that since this function applies iteratee to each item in parallel, there is no guarantee that the iteratee functions will complete in order.

- type: each
  list: [1000,2000,3000]
  iteratee:
    type: time/sleep
    millis: ${item}

This will generate three parallel tasks, one for each items in the list, which will sleep for 1, 2 and 3 seconds respectively.

Parallel

Run the tasks collection of functions in parallel, without waiting until the previous function has completed.

- type: parallel
  tasks:
    - type: io/print
      text: hello

    - type: io/print
      text: goodbye

Fork/Join

Executes each branch in the branches as a seperate and isolated sub-flow. Branches are executed internally in sequence.

- type: fork
  branches:
     - - name: randomNumber                 <-- branch 1 start here
         label: Generate a random number
         type: random/int
         startInclusive: 0
         endInclusive: 5000

       - type: time/sleep
         millis: ${randomNumber}

     - - name: randomNumber                 <-- branch 2 start here
         label: Generate a random number
         type: random/int
         startInclusive: 0
         endInclusive: 5000

       - type: time/sleep
         millis: ${randomNumber}

Switch

Executes one and only one branch of execution based on the expression value.

- type: switch
  expression: ${selector} <-- determines which case will be executed
  cases:
     - key: hello                 <-- case 1 start here
       tasks:
         - type: io/print
           text: hello world
     - key: bye                   <-- case 2 start here
       tasks:
         - type: io/print
           text: goodbye world
  default:
    - tasks:
        -type: io/print
         text: something else

Map

Produces a new collection of values by mapping each value in list through the iteratee function. The iteratee is called with an item from list in parallel. When the iteratee is finished executing on all items the map task will return a list of execution results in an order which corresponds to the order of the source list.

- name: fileSizes
  type: map
  list: ["/path/to/file1.txt","/path/to/file2.txt","/path/to/file3.txt"]
  iteratee:
    type: io/filesize
    file: ${item}

Subflow

Starts a new job as a sub-flow of the current job. Output of the sub-flow job is the output of the task.

- type: subflow
  pipelineId: copy_files
  inputs:
    - source: /path/to/source/dir
    - destination: /path/to/destination/dir

Pre/Post/Finalize

Each task can define a set of tasks that will be executed prior to its execution (pre), after its succesful execution (post) and at the end of the task's lifecycle regardless of the outcome of the task's execution (finalize).

pre/post/finalize tasks always execute on the same node which will execute the task itself and are considered to be an atomic part of the task. That is, failure in any of the pre/post/finalize tasks is considered a failure of the entire task.

  - label: 240p
    type: media/ffmpeg
    options: [
      "-y",
      "-i",
      "/some/input/video.mov",
      "-vf","scale=w=-2:h=240",
      "${workDir}/240p.mp4"
    ]
    pre:
      - name: workDir
        type: core/var
        value: "${temptDir()}/${uuid()}"
      - type: io/mkdir
        path: "${workDir}"
    post:
      - type: s3/putObject
        uri: s3://my-bucket/240p.mp4
    finalize:
      - type: io/rm
        path: ${workDir}

Webhooks

Piper provide the ability to register HTTP webhooks to receieve notifications for certain events.

Registering webhooks is done when creating the job. E.g.:

{
  "pipelineId": "demo/hello",
  "inputs": {
    ...
  },
  "webhooks": [{
    "type": "job.status",
    "url": "http://example.com",
    "retry": {   # optional configuration for retry attempts in case of webhook failure
      "initialInterval":"3s" # default 2s
      "maxInterval":"10s" # default 30s
      "maxAttempts": 4 # default 5
      "multiplier": 2.5 # default 2.0
    }
  }]
}

type is the type of event you would like to be notified on and url is the URL that Piper would be calling when the event occurs.

Supported types are job.status and task.started.

Task Handlers

core/var

  name: pi
  type: core/var
  value: 3.14159

io/createTempDir

  name: tempDir
  type: io/create-temp-dir

io/filepath

  name: myFilePath
  type: io/filepath
  filename: /path/to/my/file.txt

io/ls

  name: listOfFiles
  type: io/ls
  recursive: true # default: false
  path: /path/to/directory

io/mkdir

  type: io/mkdir
  path: /path/to/directory

io/print

  type: io/print
  text: hello world

io/rm

  type: io/rm
  path: /some/directory

media/dar

  name: myDar
  type: media/dar
  input: /path/to/my/video/mp4

media/ffmpeg

  type: media/ffmpeg
  options: [
    -y,
    -i, "${input}",
    "-pix_fmt","yuv420p",
    "-codec:v","libx264",
    "-preset","fast",
    "-b:v","500k",
    "-maxrate","500k",
    "-bufsize","1000k",
    "-vf","scale=-2:${targetHeight}",
    "-b:a","128k",
    "${output}"
  ]

media/ffprobe

  name: ffprobeResults
  type: media/ffprobe
  input: /path/to/my/media/file.mov

media/framerate

  name: framerate
  type: media/framerate
  input: /path/to/my/video/file.mov

media/mediainfo

  name: mediainfoResult
  type: media/mediainfo
  input: /path/to/my/media/file.mov

media/vduration

  name: duration
  type: media/vduration
  input: /path/to/my/video/file.mov

media/vsplit

  name: chunks
  type: media/vsplit
  input: /path/to/my/video.mp4
  chunkSize: 30s

media/vstitch

  type: media/vstitch
  chunks:
    - /path/to/chunk_001.mp4
    - /path/to/chunk_002.mp4
    - /path/to/chunk_003.mp4
    - /path/to/chunk_004.mp4
  output: /path/to/stitched/file.mp4

random/int

  name: someRandomNumber
  type: random/int
  startInclusive: 1000 # default 0
  endInclusive: 9999 # default 100

random/rogue

  type: random/rogue
  probabilty: 0.25 # default 0.5

s3/getObject

  type: s3/getObject
  uri: s3://my-bucket/path/to/file.mp4
  filepath: /path/to/my/file.mp4

s3/listObjects

  type: s3/listObjects
  bucket: my-bucket
  prefix: some/path/

s3/getUrl

  type: s3/getUrl
  uri: s3://my-bucket/path/to/file.mp4

s3/presignGetObject

  name: url
  type: s3/presignGetObject
  uri: s3://my-bucket/path/to/file.mp4
  signatureDuration: 60s

s3/putObject

  type: s3/putObject
  uri: s3://my-bucket/path/to/file.mp4
  filepath: /path/to/my/file.mp4

shell/bash

  name: listOfFiles
  type: shell/bash
  script: |
        for f in /tmp
        do
          echo "$f"
        done

time/sleep

  type: time/sleep
  millis: 60000

Expression Functions

boolean

  type: core/var
  value: "${boolean('false')}"

byte

  type: core/var
  value: "${byte('42')}"

char

  type: core/var
  value: "${char('1')}"

short

  type: core/var
  value: "${short('42')}"

int

  type: core/var
  value: "${int('42')}"

long

  type: core/var
  value: "${long('42')}"

float

  type: core/var
  value: "${float('4.2')}"

double

  type: core/var
  value: "${float('4.2')}"

systemProperty

  type: core/var
  value: "${systemProperty('java.home')}"

range

  type: core/var
  value: "${range(0,100)}" # [0,1,...,100]

join

  type: core/var
  value: "${join('A','B','C')}" # ABC

concat

  type: core/var
  value: "${join('A','B','C')"}

concat

  type: core/var
  value: ${concat(['A','B'],['C'])} # ['A','B','C']

flatten

  type: core/var
  value: ${flatten([['A'],['B']])} # ['A','B']

sort

  type: core/var
  value: ${sort([3,1,2])} # [1,2,3]

tempDir

  type: core/var
  value: "${tempDir()}"  # e.g. /tmp

uuid

  name: workDir
  type: core/var
  value: "${tempDir()}/${uuid()}"

stringf

  type: core/var
  value: "${stringf('%03d',5)}"  # 005

now

  type: core/var
  value: "${dateFormat(now(),'yyyy')}"  # e.g. 2020

timestamp

  type: core/var
  value: "${timestamp()}"  # e.g. 1583268621423

dateFormat

  type: core/var
  value: "${dateFormat(now(),'yyyy')}"  # e.g. 2020

config

  type: core/var
  value: "${config('some.config.property')}"

Tutorials

Hello World

Start a local Postgres database:

./scripts/database.sh

Start a local RabbitMQ instance:

./scripts/rabbit.sh

Build Piper:

./scripts/build.sh

Start Piper:

./scripts/development.sh

Go to the browser at http://localhost:8080/jobs

Which should give you something like:

{
  number: 0,
  totalItems: 0,
  size: 0,
  totalPages: 0,
  items: [ ]
}

The /jobs endpoint lists all jobs that are either running or were previously run on Piper.

Start a demo job:

curl -s \
     -X POST \
     -H Content-Type:application/json \
     -d '{"pipelineId":"demo/hello","inputs":{"yourName":"Joe Jones"}}' \
     http://localhost:8080/jobs

Which should give you something like this as a response:

{
  "createTime": "2017-07-05T16:56:27.402+0000",
  "webhooks": [],
  "inputs": {
    "yourName": "Joe Jones"
  },
  "id": "8221553af238431ab006cc178eb59129",
  "label": "Hello Demo",
  "priority": 0,
  "pipelineId": "demo/hello",
  "status": "CREATED",
  "tags": []
}

If you'll refresh your browser page now you should see the executing job.

In case you are wondering, the demo/hello pipeline is located at here

Writing your first pipeline

Create the directory ~/piper/pipelines and create a file in there called mypipeline.yaml.

Edit the file and the following text:

label: My Pipeline

inputs:
  - name: name
    type: string
    required: true

tasks:
  - label: Print a greeting
    type: io/print
    text: Hello ${name}

  - label: Print a farewell
    type: io/print
    text: Goodbye ${name}

Execute your workflow

curl -s -X POST -H Content-Type:application/json -d '{"pipelineId":"mypipeline","inputs":{"name":"Arik"}}' http://localhost:8080/jobs

You can make changes to your pipeline and execute the ./scripts/clear.sh to clear the cache to reload the pipeline.

Scaling Piper

Depending on your workload you will probably exhaust the ability to run Piper on a single node fairly quickly. Good, because that's where the fun begins.

Start RabbitMQ:

./scripts/rabbit.sh

Start the Coordinator:

./scripts/coordinator.sh

From another terminal window, start a Worker:

./scripts/worker.sh

Execute the demo pipeline:

curl -s \
     -X POST \
     -H Content-Type:application/json \
     -d '{"pipelineId":"demo/hello","inputs":{"yourName":"Joe Jones"}}' \
     http://localhost:8080/jobs

Transcoding a Video

Note: You must have ffmpeg installed on your worker machine to get this demo to work

Transcode a source video to an SD (480p) output:

curl -s \
     -X POST \
     -H Content-Type:application/json \
     -d '{"pipelineId":"video/transcode","inputs":{"input":"/path/to/video/input.mov","output":"/path/to/video/output.mp4","profile":"sd"}}' \
     http://localhost:8080/jobs

Transcode a source video to an HD (1080p) output:

curl -s \
     -X POST \
     -H Content-Type:application/json \
     -d '{"pipelineId":"video/transcode","inputs":{"input":"/path/to/video/input.mov","output":"/path/to/video/output.mp4","profile":"hd"}}' \
     http://localhost:8080/jobs

Transcoding a Video (Split & Stitch)

See Transcoding video at scale with Piper

Adaptive Streaming

See Adaptive Streaming with Piper

Using Git as a Pipeline Repository backend

Rather than storing the pipelines in your local file system you can use Git to store them for you. This has great advantages, not the least of which is pipeline versioning, Pull Requests and everything else Git has to offer.

To enable Git as a pipeline repository set the piper.pipeline-repository.git.enabled flag to true in ./scripts/development.sh and restart Piper. By default, Piper will use the demo repository piper-pipelines.

You can change it by using the piper.pipeline-repository.git.url and piper.pipeline-repository.git.search-paths configuration parameters.

Configuration

# messaging provider between Coordinator and Workers (jms | amqp | kafka) default: jms
piper.message-broker.provider=jms
# turn on the Coordinator process
piper.coordinator.enabled=true
# turn on the Worker process and listen to tasks.
piper.worker.enabled=true
# when worker is enabled, subscribe to the default "tasks" queue with 5 concurrent consumers.
# you may also route pipeline tasks to other arbitrarilty named task queues by specifying the "node"
# property on any give task.
# E.g. node: captions will route to the captions queue which a worker would subscribe to with piper.worker.subscriptions.captions
# note: queue must be created before tasks can be routed to it. Piper will create the queue if it isn't already there when the worker
# bootstraps.
piper.worker.subscriptions.tasks=5
# enable a git-based pipeline repository
piper.pipeline-repository.git.enabled=true
# The URL to the Git Repo
piper.pipeline-repository.git.url=https://github.com/myusername/my-pipelines.git
piper.pipeline-repository.git.branch=master
piper.pipeline-repository.git.username=me
piper.pipeline-repository.git.password=secret
# folders within the git repo that are scanned for pipelines.
piper.pipeline-repository.git.search-paths=demo/,video/
# enable file system based pipeline repository
piper.pipeline-repository.filesystem.enabled=true
# location of pipelines on the file system.
piper.pipeline-repository.filesystem.location-pattern=$HOME/piper/**/*.yaml
# data source
spring.datasource.platform=postgres # only postgres is supported at the moment
spring.datasource.url=jdbc:postgresql://localhost:5432/piper
spring.datasource.username=piper
spring.datasource.password=piper
spring.datasource.initialization-mode=never # change to always when bootstrapping the database for the first time

Docker

creactiviti/piper Hello World in Docker:

Start a local Postgres database:

./scripts/database.sh

Create an empty directory:

mkdir pipelines
cd pipelines

Create a simple pipeline file -- hello.yaml -- and paste the following to it:

label: Hello World
inputs:
  - name: name
    label: Your Name
    type: core/var
    required: true
tasks:
  - label: Print Hello Message
    type: io/print
    text: "Hello ${name}!"
docker run \
  --name=piper \
  --link postgres:postgres \
  --rm \
  -it \
  -e spring.datasource.url=jdbc:postgresql://postgres:5432/piper \
  -e spring.datasource.initialization-mode=always \
  -e piper.worker.enabled=true \
  -e piper.coordinator.enabled=true \
  -e piper.worker.subscriptions.tasks=1 \
  -e piper.pipeline-repository.filesystem.enabled=true \
  -e piper.pipeline-repository.filesystem.location-pattern=/pipelines/**/*.yaml \
  -v $PWD:/pipelines \
  -p 8080:8080 \
  creactiviti/piper
curl -s \
     -X POST \
     -H Content-Type:application/json \
     -d '{"pipelineId":"hello","inputs":{"name":"Joe Jones"}}' \
     http://localhost:8080/jobs

License

Piper is released under version 2.0 of the Apache License.

piper's People

Contributors

ccamel avatar muranoya avatar runabol avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

piper's Issues

Switch not working as per document

Issue:
Switch/Cases never executes "default" block.

Created below pipeline. On executing Switch Cases, when "year" value is not 2019 or 2020 it NEVER reaches Default block to set respective value to "output" variable, and the coordinator status always comes FAILED

2019-05-03 22:27:28.001 DEBUG 20812 --- [cTaskExecutor-1] c.c.piper.core.event.LogEventListener : {jobId=a9cabddd12f3469d9c68505fc7da6428, createTime=2019-05-04T05:27:27.993+0000, id=5097b47a74204e38af03b181fdd1cd9b, type=job.status, status=FAILED}

Request:

{
"pipelineId":"myswitch",
"inputs":
{
"year":"2019"
}
}

Response:

{
"createTime": "2019-05-04T05:27:27.490+0000",
"webhooks": [],
"inputs": {
"year": "2090"
},
"id": "a9cabddd12f3469d9c68505fc7da6428",
"label": "Switch",
"priority": 0,
"pipelineId": "switch2",
"status": "CREATED",
"tags": []
}

switch2.yaml

label: Switch

inputs:
  - name: year
    type: string
    required: true
    
tasks:          

  - name: selector
    type: var
    value: ${year}

  - type: switch
    expression: ${selector}
    cases: 
      - key: "2019"
        tasks: 
          - name: output
            type: var
            value: Requested this year ${selector} schedule 
      - key: "2020"
        tasks: 
          - name: output
            type: var
            value: Requested next year ${selector} schedule 
    default: 
          -  name: output
             type: var
             value: Schedule not available for year ${selector}
             
  - type: print
    label: ${selector}-${output}
    text: ${output}  

Parallel task handler is executing tasks sequentially

As the title describe, the second command's output date is always greater than the first commmand's

Pipeline code :

label: Transcode 

inputs:
  - name: input
    label: Input File
    type: string
    required: true

tasks: 

- type: parallel
  tasks:

    - type: bash
      label: First executed command
      script: sleep 10; date
          
    - type: bash
      label: Second executed command
      script: date

Job description :

{
    "outputs": {},
    "execution": [
        {
            "jobId": "635d3f8490664d2b8cadd1eb434cd09c",
            "createTime": "2019-01-27T20:17:23.757+0000",
            "startTime": "2019-01-27T20:17:23.824+0000",
            "taskNumber": 1,
            "id": "2a22422dfdde49dfbad12e5985bebcea",
            "type": "parallel",
            "priority": 0,
            "tasks": [
                {
                    "label": "First executed command",
                    "type": "bash",
                    "script": "sleep 10; date"
                },
                {
                    "label": "Second executed command",
                    "type": "bash",
                    "script": "date"
                }
            ],
            "status": "COMPLETED"
        },
        {
            "label": "First executed command",
            "type": "bash",
            "priority": 0,
            "script": "sleep 10; date",
            "parentId": "2a22422dfdde49dfbad12e5985bebcea",
            "output": "Sun Jan 27 20:17:33 UTC 2019\n",
            "executionTime": 10008,
            "jobId": "635d3f8490664d2b8cadd1eb434cd09c",
            "createTime": "2019-01-27T20:17:23.795+0000",
            "progress": 100,
            "startTime": "2019-01-27T20:17:23.801+0000",
            "id": "3fe36d3f85f74513b0ad80f2ab377ed0",
            "endTime": "2019-01-27T20:17:33.808+0000",
            "status": "COMPLETED"
        },
        {
            "label": "Second executed command",
            "type": "bash",
            "priority": 0,
            "script": "date",
            "parentId": "2a22422dfdde49dfbad12e5985bebcea",
            "output": "Sun Jan 27 20:17:33 UTC 2019\n",
            "executionTime": 5,
            "jobId": "635d3f8490664d2b8cadd1eb434cd09c",
            "createTime": "2019-01-27T20:17:23.798+0000",
            "progress": 100,
            "startTime": "2019-01-27T20:17:33.810+0000",
            "id": "0d5e03f8e3dc44269f90e58a1595e697",
            "endTime": "2019-01-27T20:17:33.815+0000",
            "status": "COMPLETED"
        }
    ],
    "inputs": {
        "input": "lol"
    },
    "currentTask": -1,
    "label": "Transcode",
    "priority": 0,
    "pipelineId": "video/transcode",
    "tags": [],
    "createTime": "2019-01-27T20:17:23.620+0000",
    "webhooks": [],
    "startTime": "2019-01-27T20:17:23.750+0000",
    "id": "635d3f8490664d2b8cadd1eb434cd09c",
    "endTime": "2019-01-27T20:17:33.843+0000",
    "status": "COMPLETED"
}

Docker setup help, Failed to obtain JDBC Connection

I've reached this amazing project and I want to give it a try, but I'm unable to setup the enviroment with docker. I've tried two tutorials without success. If I follow the docker instructions at the end of this repository, I just end up with this message if I perform a get request to the /jobs url:

image

The same if I make a POST request with postman, same message:

image

My docker-compose config looks like this:

rabbit:
  image: creactiviti/rabbitmq:3.7.7-management
  hostname: rabbit
  ports:
    - 15672:15672
    - 5672:5672
  restart: always

postgres:
  image: postgres:11
  ports: 
    - 5432:5432
  environment:
    - POSTGRES_DB=piper
    - POSTGRES_USER=piper
    - POSTGRES_PASSWORD=piper

coordinator:
  image: creactiviti/piper:latest
  ports:
    - 8080:8080
  environment:
    - piper.worker.enabled=true
    - piper.coordinator.enabled=true
    - piper.worker.subscriptions.tasks=1
    - piper.pipeline-repository.filesystem.enabled=true
    - piper.pipeline-repository.filesystem.location-pattern=/app/pipelines/**/*.yaml
    - spring.datasource.initialize=true
    - spring.datasource.name=piper
    - spring.datasource.platform=postgres
    - spring.datasource.url=jdbc:postgresql://localhost:5432/piper
    - spring.datasource.username=piper
    - spring.datasource.password=piper
  volumes:
    - D:/test/docker/piper/pipelines:/app/pipelines
  restart: always

I also tried above setup but without any of the spring options, as the docker tutorial in this repository states with same results.

I have some questions:

  1. Is it mandatory having a DB for following the hello world tutorial?
  2. Is there any other tutorial might I follow?
  3. Is it compatible with Oracle, or just h2 and postgres?

I also tried this tutorial without success, I think I'm missing some steps.
https://medium.com/@arik.cohen/transcoding-video-at-scale-with-piper-dca23eb26fd2

Thanks for your work!

/scripts/build.sh fails two unit tests

[ERROR] Failures:
[ERROR] SpelTaskEvaluatorTests.test32:289 expected: </> but was:
[ERROR] MkdirTests.test2:27 Expected java.nio.file.FileSystemException to be thrown, but nothing was thrown.
[INFO]
[ERROR] Tests run: 98, Failures: 2, Errors: 0, Skipped: 0

Assertions.assertEquals(FilenameUtils.getFullPathNoEndSeparator(System.getProperty("java.io.tmpdir")),evaluated.get("tempDir"));
java.io.tmpdir is /tmp on my machine, and getFullPathNoEndSeparator gives "/", which is not equal to "/tmp"

task.set("path", "/no/such/thing");
Files.createDirectories() will not throw expected exception.

Am I missing anything here?

Thanks

The Coordinator Shutsdown

On starting coordinator service it keeps shutting down

019-04-30 23:51:51.119 INFO 27376 --- [ main] uration$$EnhancerBySpringCGLIB$$39cad2e7 : Registring AMQP Listener: completions -> com.creactiviti.piper.core.Coordinator:complete
2019-04-30 23:51:51.125 INFO 27376 --- [ main] o.s.a.r.c.CachingConnectionFactory : Attempting to connect to: [localhost:5672]
2019-04-30 23:51:51.392 INFO 27376 --- [ main] o.s.a.r.c.CachingConnectionFactory : Created new connection: rabbitConnectionFactory#7b84fcf8:0/SimpleConnection@363042d7 [delegate=amqp://[email protected]:5672/, localPort= 62381]
2019-04-30 23:51:51.409 INFO 27376 --- [ main] o.s.amqp.rabbit.core.RabbitAdmin : Auto-declaring a non-durable, auto-delete, or exclusive Queue (x.control.af0fa68f536441fea2329a0491b1d68a) durable:true, auto-delete:true, exclusive:true. It will be redeclared if the broker stops and is restarted while the connection factory is alive, but all messages will be lost.
2019-04-30 23:51:51.468 INFO 27376 --- [ main] o.s.a.r.c.CachingConnectionFactory : Attempting to connect to: [localhost:5672]
2019-04-30 23:51:51.475 ERROR 27376 --- [ 127.0.0.1:5672] o.s.a.r.c.CachingConnectionFactory : Channel shutdown: connection error; protocol method: #method<connection.close>(reply-code=503, reply-text=COMMAND_INVALID - unknown exchange type 'x-delayed-message', class-id=40, method-id=10)
2019-04-30 23:51:51.483 WARN 27376 --- [ 127.0.0.1:5672] c.r.c.impl.ForgivingExceptionHandler : An unexpected connection driver error occured (Exception message: Connection reset)
2019-04-30 23:51:51.488 INFO 27376 --- [ main] o.s.a.r.c.CachingConnectionFactory : Created new connection: rabbitConnectionFactory#7b84fcf8:1/SimpleConnection@663411de [delegate=amqp://[email protected]:5672/, localPort= 62382]
2019-04-30 23:51:52.504 INFO 27376 --- [ main] o.s.amqp.rabbit.core.RabbitAdmin : Auto-declaring a non-durable, auto-delete, or exclusive Queue (x.control.af0fa68f536441fea2329a0491b1d68a) durable:true, auto-delete:true, exclusive:true. It will be redeclared if the broker stops and is restarted while the connection factory is alive, but all messages will be lost.
2019-04-30 23:51:52.509 ERROR 27376 --- [ 127.0.0.1:5672] o.s.a.r.c.CachingConnectionFactory : Channel shutdown: connection error; protocol method: #method<connection.close>(reply-code=503, reply-text=COMMAND_INVALID - unknown exchange type 'x-delayed-message', class-id=40, method-id=10)
2019-04-30 23:51:52.510 INFO 27376 --- [ main] o.s.a.r.c.CachingConnectionFactory : Attempting to connect to: [localhost:5672]
2019-04-30 23:51:52.517 WARN 27376 --- [ 127.0.0.1:5672] c.r.c.impl.ForgivingExceptionHandler : An unexpected connection driver error occured (Exception message: Connection reset)
2019-04-30 23:51:52.565 INFO 27376 --- [ main] o.s.a.r.c.CachingConnectionFactory : Created new connection: rabbitConnectionFactory#7b84fcf8:2/SimpleConnection@33b1c5c5 [delegate=amqp://[email protected]:5672/, localPort= 62383]
2019-04-30 23:51:54.575 INFO 27376 --- [ main] o.s.amqp.rabbit.core.RabbitAdmin : Auto-declaring a non-durable, auto-delete, or exclusive Queue (x.control.af0fa68f536441fea2329a0491b1d68a) durable:true, auto-delete:true, exclusive:true. It will be redeclared if the broker stops and is restarted while the connection factory is alive, but all messages will be lost.
2019-04-30 23:51:54.579 ERROR 27376 --- [ 127.0.0.1:5672] o.s.a.r.c.CachingConnectionFactory : Channel shutdown: connection error; protocol method: #method<connection.close>(reply-code=503, reply-text=COMMAND_INVALID - unknown exchange type 'x-delayed-message', class-id=40, method-id=10)
2019-04-30 23:51:54.580 INFO 27376 --- [ main] o.s.a.r.c.CachingConnectionFactory : Attempting to connect to: [localhost:5672]
2019-04-30 23:51:54.598 WARN 27376 --- [ 127.0.0.1:5672] c.r.c.impl.ForgivingExceptionHandler : An unexpected connection driver error occured (Exception message: Connection reset)
2019-04-30 23:51:54.612 INFO 27376 --- [ main] o.s.a.r.c.CachingConnectionFactory : Created new connection: rabbitConnectionFactory#7b84fcf8:3/SimpleConnection@9ef8eb7 [delegate=amqp://[email protected]:5672/, localPort= 62385]
2019-04-30 23:51:58.617 INFO 27376 --- [ main] o.s.amqp.rabbit.core.RabbitAdmin : Auto-declaring a non-durable, auto-delete, or exclusive Queue (x.control.af0fa68f536441fea2329a0491b1d68a) durable:true, auto-delete:true, exclusive:true. It will be redeclared if the broker stops and is restarted while the connection factory is alive, but all messages will be lost.
2019-04-30 23:51:58.619 ERROR 27376 --- [ 127.0.0.1:5672] o.s.a.r.c.CachingConnectionFactory : Channel shutdown: connection error; protocol method: #method<connection.close>(reply-code=503, reply-text=COMMAND_INVALID - unknown exchange type 'x-delayed-message', class-id=40, method-id=10)
2019-04-30 23:51:58.619 INFO 27376 --- [ main] o.s.a.r.c.CachingConnectionFactory : Attempting to connect to: [localhost:5672]
2019-04-30 23:51:58.633 WARN 27376 --- [ 127.0.0.1:5672] c.r.c.impl.ForgivingExceptionHandler : An unexpected connection driver error occured (Exception message: Connection reset)
2019-04-30 23:51:58.650 INFO 27376 --- [ main] o.s.a.r.c.CachingConnectionFactory : Created new connection: rabbitConnectionFactory#7b84fcf8:4/SimpleConnection@305a0c5f [delegate=amqp://[email protected]:5672/, localPort= 62388]
2019-04-30 23:52:03.659 INFO 27376 --- [ main] o.s.amqp.rabbit.core.RabbitAdmin : Auto-declaring a non-durable, auto-delete, or exclusive Queue (x.control.af0fa68f536441fea2329a0491b1d68a) durable:true, auto-delete:true, exclusive:true. It will be redeclared if the broker stops and is restarted while the connection factory is alive, but all messages will be lost.
2019-04-30 23:52:03.661 ERROR 27376 --- [ 127.0.0.1:5672] o.s.a.r.c.CachingConnectionFactory : Channel shutdown: connection error; protocol method: #method<connection.close>(reply-code=503, reply-text=COMMAND_INVALID - unknown exchange type 'x-delayed-message', class-id=40, method-id=10)
2019-04-30 23:52:03.661 INFO 27376 --- [ main] o.s.a.r.c.CachingConnectionFactory : Attempting to connect to: [localhost:5672]
2019-04-30 23:52:03.663 WARN 27376 --- [ 127.0.0.1:5672] c.r.c.impl.ForgivingExceptionHandler : An unexpected connection driver error occured (Exception message: Connection reset)
2019-04-30 23:52:03.670 INFO 27376 --- [ main] o.s.a.r.c.CachingConnectionFactory : Created new connection: rabbitConnectionFactory#7b84fcf8:5/SimpleConnection@4372b9b6 [delegate=amqp://[email protected]:5672/, localPort= 62391]
2019-04-30 23:52:03.674 INFO 27376 --- [ main] o.apache.catalina.core.StandardService : Stopping service [Tomcat]
2019-04-30 23:52:03.707 INFO 27376 --- [ main] utoConfigurationReportLoggingInitializer :

Error starting ApplicationContext. To display the auto-configuration report re-run your application with 'debug' enabled.
2019-04-30 23:52:03.729 ERROR 27376 --- [ main] o.s.boot.SpringApplication : Application startup failed

org.springframework.amqp.AmqpIOException: java.io.IOException
at org.springframework.amqp.rabbit.support.RabbitExceptionTranslator.convertRabbitAccessException(RabbitExceptionTranslator.java:71) ~[spring-rabbit-1.7.9.RELEASE.jar!/:na]
at org.springframework.amqp.rabbit.connection.RabbitAccessor.convertRabbitAccessException(RabbitAccessor.java:113) ~[spring-rabbit-1.7.9.RELEASE.jar!/:na]
at org.springframework.amqp.rabbit.core.RabbitTemplate.doExecute(RabbitTemplate.java:1467) ~[spring-rabbit-1.7.9.RELEASE.jar!/:na]
at org.springframework.amqp.rabbit.core.RabbitTemplate.execute(RabbitTemplate.java:1417) ~[spring-rabbit-1.7.9.RELEASE.jar!/:na]
at org.springframework.amqp.rabbit.core.RabbitTemplate.execute(RabbitTemplate.java:1393) ~[spring-rabbit-1.7.9.RELEASE.jar!/:na]
at org.springframework.amqp.rabbit.core.RabbitAdmin.initialize(RabbitAdmin.java:579) ~[spring-rabbit-1.7.9.RELEASE.jar!/:na]
at org.springframework.amqp.rabbit.core.RabbitAdmin$11$1.doWithRetry(RabbitAdmin.java:486) ~[spring-rabbit-1.7.9.RELEASE.jar!/:na]
at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:287) ~[spring-retry-1.2.2.RELEASE.jar!/:na]
at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:164) ~[spring-retry-1.2.2.RELEASE.jar!/:na]
at org.springframework.amqp.rabbit.core.RabbitAdmin$11.onCreate(RabbitAdmin.java:481) ~[spring-rabbit-1.7.9.RELEASE.jar!/:na]
at org.springframework.amqp.rabbit.connection.CompositeConnectionListener.onCreate(CompositeConnectionListener.java:33) ~[spring-rabbit-1.7.9.RELEASE.jar!/:na]
at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.createConnection(CachingConnectionFactory.java:592) ~[spring-rabbit-1.7.9.RELEASE.jar!/:na]
at org.springframework.amqp.rabbit.core.RabbitTemplate.doExecute(RabbitTemplate.java:1436) ~[spring-rabbit-1.7.9.RELEASE.jar!/:na]
at org.springframework.amqp.rabbit.core.RabbitTemplate.execute(RabbitTemplate.java:1417) ~[spring-rabbit-1.7.9.RELEASE.jar!/:na]
at org.springframework.amqp.rabbit.core.RabbitTemplate.execute(RabbitTemplate.java:1393) ~[spring-rabbit-1.7.9.RELEASE.jar!/:na]
at org.springframework.amqp.rabbit.core.RabbitAdmin.declareQueue(RabbitAdmin.java:232) ~[spring-rabbit-1.7.9.RELEASE.jar!/:na]
at com.creactiviti.piper.config.AmqpMessengerConfiguration.registerListenerEndpoint(AmqpMessengerConfiguration.java:168) ~[classes!/:0.0.1-SNAPSHOT]
at com.creactiviti.piper.config.AmqpMessengerConfiguration.registerListenerEndpoint(AmqpMessengerConfiguration.java:164) ~[classes!/:0.0.1-SNAPSHOT]
at com.creactiviti.piper.config.AmqpMessengerConfiguration.configureRabbitListeners(AmqpMessengerConfiguration.java:142) ~[classes!/:0.0.1-SNAPSHOT]
at org.springframework.amqp.rabbit.annotation.RabbitListenerAnnotationBeanPostProcessor.afterSingletonsInstantiated(RabbitListenerAnnotationBeanPostProcessor.java:230) ~[spring-rabbit-1.7.9.RELEASE.jar!/:na]
at org.springframework.beans.factory.support.DefaultListableBeanFactory.preInstantiateSingletons(DefaultListableBeanFactory.java:781) ~[spring-beans-4.3.18.RELEASE.jar!/:4.3.18.RELEASE]
at org.springframework.context.support.AbstractApplicationContext.finishBeanFactoryInitialization(AbstractApplicationContext.java:867) ~[spring-context-4.3.18.RELEASE.jar!/:4.3.18.RELEASE]
at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:543) ~[spring-context-4.3.18.RELEASE.jar!/:4.3.18.RELEASE]
at org.springframework.boot.context.embedded.EmbeddedWebApplicationContext.refresh(EmbeddedWebApplicationContext.java:122) ~[spring-boot-1.5.15.RELEASE.jar!/:1.5.15.RELEASE]
at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:693) ~[spring-boot-1.5.15.RELEASE.jar!/:1.5.15.RELEASE]
at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:360) ~[spring-boot-1.5.15.RELEASE.jar!/:1.5.15.RELEASE]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:303) ~[spring-boot-1.5.15.RELEASE.jar!/:1.5.15.RELEASE]
at com.creactiviti.piper.PiperApplication.main(PiperApplication.java:37) [classes!/:0.0.1-SNAPSHOT]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_171]
at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source) ~[na:1.8.0_171]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source) ~[na:1.8.0_171]
at java.lang.reflect.Method.invoke(Unknown Source) ~[na:1.8.0_171]
at org.springframework.boot.loader.MainMethodRunner.run(MainMethodRunner.java:48) [piper-0.0.1-SNAPSHOT.jar:0.0.1-SNAPSHOT]
at org.springframework.boot.loader.Launcher.launch(Launcher.java:87) [piper-0.0.1-SNAPSHOT.jar:0.0.1-SNAPSHOT]
at org.springframework.boot.loader.Launcher.launch(Launcher.java:50) [piper-0.0.1-SNAPSHOT.jar:0.0.1-SNAPSHOT]
at org.springframework.boot.loader.JarLauncher.main(JarLauncher.java:51) [piper-0.0.1-SNAPSHOT.jar:0.0.1-SNAPSHOT]
Caused by: java.io.IOException: null
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:105) ~[amqp-client-4.0.3.jar!/:4.0.3]
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:101) ~[amqp-client-4.0.3.jar!/:4.0.3]
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:123) ~[amqp-client-4.0.3.jar!/:4.0.3]
at com.rabbitmq.client.impl.ChannelN.exchangeDeclare(ChannelN.java:763) ~[amqp-client-4.0.3.jar!/:4.0.3]
at com.rabbitmq.client.impl.ChannelN.exchangeDeclare(ChannelN.java:50) ~[amqp-client-4.0.3.jar!/:4.0.3]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_171]
at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source) ~[na:1.8.0_171]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source) ~[na:1.8.0_171]
at java.lang.reflect.Method.invoke(Unknown Source) ~[na:1.8.0_171]
at org.springframework.amqp.rabbit.connection.CachingConnectionFactory$CachedChannelInvocationHandler.invoke(CachingConnectionFactory.java:1027) ~[spring-rabbit-1.7.9.RELEASE.jar!/:na]
at com.sun.proxy.$Proxy92.exchangeDeclare(Unknown Source) ~[na:na]
at org.springframework.amqp.rabbit.core.RabbitAdmin.declareExchanges(RabbitAdmin.java:630) ~[spring-rabbit-1.7.9.RELEASE.jar!/:na]
at org.springframework.amqp.rabbit.core.RabbitAdmin.access$000(RabbitAdmin.java:72) ~[spring-rabbit-1.7.9.RELEASE.jar!/:na]
at org.springframework.amqp.rabbit.core.RabbitAdmin$12.doInRabbit(RabbitAdmin.java:583) ~[spring-rabbit-1.7.9.RELEASE.jar!/:na]
at org.springframework.amqp.rabbit.core.RabbitTemplate.doExecute(RabbitTemplate.java:1461) ~[spring-rabbit-1.7.9.RELEASE.jar!/:na]
... 33 common frames omitted
Caused by: com.rabbitmq.client.ShutdownSignalException: connection error; protocol method: #method<connection.close>(reply-code=503, reply-text=COMMAND_INVALID - unknown exchange type 'x-delayed-message', class-id=40, method-id=10)
at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:66) ~[amqp-client-4.0.3.jar!/:4.0.3]
at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:32) ~[amqp-client-4.0.3.jar!/:4.0.3]
at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:366) ~[amqp-client-4.0.3.jar!/:4.0.3]
at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:229) ~[amqp-client-4.0.3.jar!/:4.0.3]
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:117) ~[amqp-client-4.0.3.jar!/:4.0.3]
... 45 common frames omitted

2019-04-30 23:52:03.734 INFO 27376 --- [ main] ationConfigEmbeddedWebApplicationContext : Closing org.springframework.boot.context.embedded.AnnotationConfigEmbeddedWebApplicationContext@5ef04b5: startup date [Tue Apr 30 23:51:34 PDT 2019]; root of context hierarchy
2019-04-30 23:52:03.747 INFO 27376 --- [ main] o.s.j.e.a.AnnotationMBeanExporter : Unregistering JMX-exposed beans on shutdown

Bash type tasks sometimes fail to execute

There is an intermittent problem with bash type tasks that sometimes fail to execute, throwing a Permission Denied error.
I have isolated the fault and will shortly submit a pull request.

Unknown pipeline: video/hls_single

Hi,

I have setup Coordinator and Worker based setup. My problem is sometimes its working and sometimes I am getting the error " Execution of Rabbit message listener failed." in Worker-node. Also, I am attaching the error below.
Can you help me to solve the issue?

Worker Node Log:

2019-06-06 07:57:32.150 WARN 1 --- [cTaskExecutor-1] s.a.r.l.ConditionalRejectingErrorHandler : Execution of Rabbit message listener failed.

org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: Listener method 'onApplicationEvent' threw exception
at org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter.invokeListenerMethod(MessageListenerAdapter.java:395) ~[spring-rabbit-1.7.9.RELEASE.jar!/:na]
at org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter.onMessage(MessageListenerAdapter.java:298) ~[spring-rabbit-1.7.9.RELEASE.jar!/:na]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:856) ~[spring-rabbit-1.7.9.RELEASE.jar!/:na]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:779) ~[spring-rabbit-1.7.9.RELEASE.jar!/:na]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$001(SimpleMessageListenerContainer.java:105) [spring-rabbit-1.7.9.RELEASE.jar!/:na]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$1.invokeListener(SimpleMessageListenerContainer.java:208) ~[spring-rabbit-1.7.9.RELEASE.jar!/:na]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.invokeListener(SimpleMessageListenerContainer.java:1381) [spring-rabbit-1.7.9.RELEASE.jar!/:na]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:760) ~[spring-rabbit-1.7.9.RELEASE.jar!/:na]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:1324) [spring-rabbit-1.7.9.RELEASE.jar!/:na]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:1294) [spring-rabbit-1.7.9.RELEASE.jar!/:na]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1800(SimpleMessageListenerContainer.java:105) [spring-rabbit-1.7.9.RELEASE.jar!/:na]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1550) [spring-rabbit-1.7.9.RELEASE.jar!/:na]
at java.lang.Thread.run(Thread.java:748) [na:1.8.0_191]
Caused by: java.lang.NullPointerException: null
at com.creactiviti.piper.core.task.SubflowJobStatusEventListener.onApplicationEvent(SubflowJobStatusEventListener.java:46) ~[classes!/:0.0.1-SNAPSHOT]
at com.creactiviti.piper.core.event.EventListenerChain.onApplicationEvent(EventListenerChain.java:35) ~[classes!/:0.0.1-SNAPSHOT]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_191]
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_191]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_191]
at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_191]
at org.springframework.util.MethodInvoker.invoke(MethodInvoker.java:265) ~[spring-core-4.3.18.RELEASE.jar!/:4.3.18.RELEASE]
at org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter.invokeListenerMethod(MessageListenerAdapter.java:387) ~[spring-rabbit-1.7.9.RELEASE.jar!/:na]
... 12 common frames omitted

2019-06-06 07:57:32.153 WARN 1 --- [cTaskExecutor-1] s.a.r.l.ConditionalRejectingErrorHandler : Execution of Rabbit message listener failed.

org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: Listener method 'start' threw exception
at org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter.invokeListenerMethod(MessageListenerAdapter.java:395) ~[spring-rabbit-1.7.9.RELEASE.jar!/:na]
at org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter.onMessage(MessageListenerAdapter.java:298) ~[spring-rabbit-1.7.9.RELEASE.jar!/:na]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:856) ~[spring-rabbit-1.7.9.RELEASE.jar!/:na]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:779) ~[spring-rabbit-1.7.9.RELEASE.jar!/:na]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$001(SimpleMessageListenerContainer.java:105) [spring-rabbit-1.7.9.RELEASE.jar!/:na]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$1.invokeListener(SimpleMessageListenerContainer.java:208) ~[spring-rabbit-1.7.9.RELEASE.jar!/:na]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.invokeListener(SimpleMessageListenerContainer.java:1381) [spring-rabbit-1.7.9.RELEASE.jar!/:na]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:760) ~[spring-rabbit-1.7.9.RELEASE.jar!/:na]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:1324) [spring-rabbit-1.7.9.RELEASE.jar!/:na]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:1294) [spring-rabbit-1.7.9.RELEASE.jar!/:na]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1800(SimpleMessageListenerContainer.java:105) [spring-rabbit-1.7.9.RELEASE.jar!/:na]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1550) [spring-rabbit-1.7.9.RELEASE.jar!/:na]
at java.lang.Thread.run(Thread.java:748) [na:1.8.0_191]
Caused by: java.lang.NullPointerException: null
at com.creactiviti.piper.core.context.MapContext.(MapContext.java:39) ~[classes!/:0.0.1-SNAPSHOT]
at com.creactiviti.piper.core.DefaultJobExecutor.executeNextTask(DefaultJobExecutor.java:73) ~[classes!/:0.0.1-SNAPSHOT]
at com.creactiviti.piper.core.DefaultJobExecutor.execute(DefaultJobExecutor.java:52) ~[classes!/:0.0.1-SNAPSHOT]
at com.creactiviti.piper.core.Coordinator.start(Coordinator.java:133) ~[classes!/:0.0.1-SNAPSHOT]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_191]
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_191]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_191]
at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_191]
at org.springframework.util.MethodInvoker.invoke(MethodInvoker.java:265) ~[spring-core-4.3.18.RELEASE.jar!/:4.3.18.RELEASE]
at org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter.invokeListenerMethod(MessageListenerAdapter.java:387) ~[spring-rabbit-1.7.9.RELEASE.jar!/:na]
... 12 common frames omitted

two sequential events for same task are send to two different queues

Hi Team,

Please can anyone clarify me regarding the below two points?

  1. Two sequential events of a task "STARTED" and "COMPLETED" are going to two different queues "Queues.EVENTS" and "Queues.COMPLETIONS".
    Because of two queues for same task state, "Queues.COMPLETIONS" will be processed before "Queues.EVENTS". Though in
    "TaskStartedEventListener.java" we are updating the DB only if startTime is null. I feel this is a temporary fix.
    So my question is can we push "Completion" event also to "Queues.Events" queue?

  2. During handling of the Event, I am setting few more properties to "TaskExecution" Object, So while creating PiperEvent, can we send TaskExecution Object also as part of PiperEvent?

Regards,
Ravi Kishore. K

Logical control type support

Hi Team,

I am trying to find the option, how can i fit IF ELSE control.
We have SWITCHand FOR LOOP already.

Please advise.

Thanks in advance.
Sreenadh.

Job.status webhooks don't notify of STARTED status

I've started using Webhooks (thanks a lot for the updated documentation btw) but have encountered the following issue:
I get notified of CREATED and COMPLETED events, but not of STARTED ones.
I believe there is a bug on this line, which publishes the event before changing the status. Is this it or am I understanding incorrectly?
Also the notification of the CREATED status seems unnecessary, we already get it on the response to the create job request.

Improve separation worker/coordinator

As far as I understand, the commit 73c8345 has simplified project code structure and put all in one place, in a single jar.

We have a strategy of implementation of the workers in which we need to separate the workers from the coordinator (µService architecture - wrt separation of concerns) and we do not want to embed the code of the coordinator (server) for each of them.

In addition, we plan to implement some workers in go (now that piper supports Kafka). That's why it might be interesting to have some sort of SDK in java, aside of the the piper server project, for the implementation of the workers, who would contain the minimum of code required for that, and a declination for other languages: go, python...

What do you think ?

Fork/Join results not returning

Hi Arik,

            I’m trying to retrieve (join) the two branches back together after they have forked and run.  How can I accomplish that and retrieve the results from each branch in the fork after the join?

            I attempted to do the same with using type “parallel”, but no luck.  I want to be able to merge the branches back, that way I can go to task named “checkAdapterResults” and pull in adapter1 and adapter2 results in the checkAdapterResults handler.

            I’m getting the error “2019-02-13 15:24:45.778 DEBUG 9396 --- [enerContainer-1] c.c.piper.core.task.SpelTaskEvaluator    : EL1008E: Property or field 'adapter1' cannot be found on object of type 'com.creactiviti.piper.core.context.MapContext' - maybe not public or not valid?”

And the same error for “adapter2”.

  • type: fork
    branches:

      • name: adapter1
        label: Web Service Call
        type: adapter1
      • name: adapter2
        label: Web Service Call
        type: adapter2
  • type: checkAdapterResults
    name: checkAdapterResults
    label: Checking Adapter Results
    adapter1: ${adapter1}
    adapter2: ${adapter2}

Thank you ,
Kevin

Distributed computing issue when deployed on multiple nodes with kubernetes

I have deployed piper in kubernetes using 5 piper workers/pods .

After proceeding with the tests, I have noticed that all workers are responsible for a single job which is a problem in case I want to make some POST-TRANSCODING actions with the video.

Here's an explained scenario :

  1. worker X gets the job and started transcoding the file and saving the output to a temporary file
  2. worker X finished the transcoding
  3. worker Y tries to make some POST-TRANSCODING actions with the video => Here come the problem, file not found in the worker's tmp directory.

I did a hack using a mounted NFS volume to sync the workers processing but this is very Network IO consuming.

So my question: Is it possible to bind a unique worker to a unique job ?

Resuming a task within switch executes all tasks in hierarchy of parent switch

I am executing the following yaml
label: one-level-switch

inputs:

  • name: idType
    label: Type of Id - User/Service
    type: string
    required: true

  • name: retainId
    label: Name of IdType
    type: string
    required: true

  • name: leadApproval
    label: Platform Lead Approval
    type: string
    required: true

  • name: targetServerIP
    label: Target Server IP
    type: string
    required: true

  • name: requestorId
    label: Requestor Id
    type: string
    required: true

  • name: wfRequestId
    label: WorkFlow Id
    type: string
    required: true

  • name: transactionType
    label: Transaction Type
    type: string
    required: true

  • name: requestId
    label: Request Id
    type: string
    required: true

outputs:

  • name: myMagicNumber
    value: ${randomNumber}

tasks:

  • label: Is ServiceId configured
    name: serviceIdOutcome
    type: jio/ngo/checkIdConfiguration
    id_type: service
    retainId: ${retainId }
    wfRequestId: ${wfRequestId}
    exec: async
    route:
    endpoint: /people/myspan/checkIdConfiguration
    endpointType: REST
    inbox:
    stat:
    checkIdOutput: ${checkIdOutput}

  • label: Print a greeting 1
    type: io/print
    text: At ${serviceIdOutcome.checkIdOutput} - ${serviceIdOutcome}

  • label: Service Switch
    type: switch
    expression: ${serviceIdOutcome.checkIdOutput}
    cases:

    • key: Y
      tasks:

      • label: Print a greeting
        type: io/print
        text: No Change
    • key: N
      tasks:

      • label: Domain Lead Approval
        name: domainLeadApproval
        type: jio/ngo/DomainLeadApproval
        requestId: ${requestorId}
        inbox: Domain-Lead-Inbox
        userId: ${requestorId}
        domainLeadOutcome: ${domainLeadOutcome}
        approver: ${leadApproval}

      • type: jio/ngo/id/ProfileChange
        label: Profile Change On Target Server
        text: Profile Change On Target Server
        serverIp: ${targetServerIP}
        retainId: ${retainId }
        wfRequestId: ${wfRequestId}
        exec: async
        route:
        endpoint: /people/myspan/profileChange
        endpointType: REST
        inbox:
        stat:
        default:

    • tasks:

      • label: Print a greeting
        type: io/print
        text: ServiceId Default Case
  • name: updateIdMaster
    type: jio/ngo/CloseTransaction
    value: "Pre: ${message}"
    url: api-url(to be put)
    requestId: ${requestId}
    domainLeadResponse: ${domainLeadApproval.domainLeadOutcome}
    exec: async
    route:
    endpoint: /people/myspan/updateIdMasterAndSendMail
    endpointType: REST
    inbox:
    stat:

I am facing below errors -

  • Firstly the resolution of variable "serviceIdOutcome.checkIdOutput" is uneven. It sometimes resolves sometimes not.
  • Secondly I am pausing task( using cordinator.stop() ) at intermediate level (in my case - Domain Lead Approval ) for approval as per my requirement. When i resume task at this stage, the workflow executes once again from start( Which is incorrect in terms of execution, Ideally it should complete my existing task and move on to next one.)
  • Third, if I have multiple switch cases , each task in switch starts with taskNumber 1. So as in "task_exection" table for task "Is ServiceId configured" we have tasknumber as 1 and also for "Domain Lead Approval" we have taskNumber 1. Ideally it should be incremental one, which is also one of the case in which my workflow fails to execute in expected flow.

adding control flow types

Hi,
Piper control flow seems to be missing flow control types such as loop & next. Actually a loop can be executed by a switch & next kind of control types.
Any suggestions on how to add a next kind of control or any other means to achieve that?
Thanks.

Add Travis/Circle CI build

It'd be great to have some continuous integration (in order to run builds for regression testing, and so on).

java.lang.IllegalArgumentException: Unknown task handler: subflow

As the title mentions, i'm running the latest version of docker container "creactiviti/piper"
I wanted to use the subflow task in order to make a modular code, but an exception was raised

$ docker pull creactiviti/piper
Using default tag: latest
latest: Pulling from creactiviti/piper
Digest: sha256:1d9e5465304805191e0a7322e7c7b36ca2a4f3dc577f02d7bb6f7d46f82726ef
Status: Image is up to date for creactiviti/piper:latest
        {
            "jobId": "3559b511ff3e48ff9baa5548c04c785d",
            "createTime": "2019-01-25T12:08:26.118+0000",
            "inputs": [
                {
                    "source": "/path/to/source/dir"
                },
                {
                    "destination": "/path/to/destination/dir"
                }
            ],
            "taskNumber": 2,
            "id": "c072543876024b89a50d753164d0201e",
            "endTime": "2019-01-25T12:08:26.128+0000",
            "type": "subflow",
            "priority": 0,
            "error": {
                "stackTrace": [
                    "java.lang.IllegalArgumentException: Unknown task handler: subflow",
                    "\tat org.springframework.util.Assert.notNull(Assert.java:134)",
                    "\tat com.creactiviti.piper.core.task.DefaultTaskHandlerResolver.resolve(DefaultTaskHandlerResolver.java:33)",
                    "\tat com.creactiviti.piper.core.Worker.lambda$handle$0(Worker.java:90)",
                    "\tat java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)",
                    "\tat java.util.concurrent.FutureTask.run(FutureTask.java:266)",
                    "\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)",
                    "\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)",
                    "\tat java.lang.Thread.run(Thread.java:748)"
                ],
                "message": "Unknown task handler: subflow"
            },
            "pipelineId": "job",
            "status": "FAILED"
        }

The Docker Hub's image seems to be outdated

Application startup failed due to org.springframework.amqp.AmqpIOException: java.io.IOException

I am facing following exceptions (window env) Please advice. Thanks in advance.

ror starting ApplicationContext. To display the auto-configuration report re-run your application with 'debug' enabled.
2019-05-05 00:49:05.105 ERROR 17532 --- [ main] o.s.boot.SpringApplication : Application startup failed

org.springframework.amqp.AmqpIOException: java.io.IOException
at org.springframework.amqp.rabbit.support.RabbitExceptionTranslator.convertRabbitAccessException(RabbitExceptionTranslator.java:71) ~[spring-rabbit-1.7.9.RELEASE.jar!/:na]
at org.springframework.amqp.rabbit.connection.RabbitAccessor.convertRabbitAccessException(RabbitAccessor.java:113) ~[spring-rabbit-1.7.9.RELEASE.jar!/:na]
at org.springframework.amqp.rabbit.core.RabbitTemplate.doExecute(RabbitTemplate.java:1467) ~[spring-rabbit-1.7.9.RELEASE.jar!/:na]
at org.springframework.amqp.rabbit.core.RabbitTemplate.execute(RabbitTemplate.java:1417) ~[spring-rabbit-1.7.9.RELEASE.jar!/:na]
at org.springframework.amqp.rabbit.core.RabbitTemplate.execute(RabbitTemplate.java:1393) ~[spring-rabbit-1.7.9.RELEASE.jar!/:na]
at org.springframework.amqp.rabbit.core.RabbitAdmin.initialize(RabbitAdmin.java:579) ~[spring-rabbit-1.7.9.RELEASE.jar!/:na]
at org.springframework.amqp.rabbit.core.RabbitAdmin$11$1.doWithRetry(RabbitAdmin.java:486) ~[spring-rabbit-1.7.9.RELEASE.jar!/:na]
at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:287) ~[spring-retry-1.2.2.RELEASE.jar!/:na]
at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:164) ~[spring-retry-1.2.2.RELEASE.jar!/:na]
at org.springframework.amqp.rabbit.core.RabbitAdmin$11.onCreate(RabbitAdmin.java:481) ~[spring-rabbit-1.7.9.RELEASE.jar!/:na]
at org.springframework.amqp.rabbit.connection.CompositeConnectionListener.onCreate(CompositeConnectionListener.java:33) ~[spring-rabbit-1.7.9.RELEASE.jar!/:na]
at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.createConnection(CachingConnectionFactory.java:592) ~[spring-rabbit-1.7.9.RELEASE.jar!/:na]
at org.springframework.amqp.rabbit.core.RabbitTemplate.doExecute(RabbitTemplate.java:1436) ~[spring-rabbit-1.7.9.RELEASE.jar!/:na]
at org.springframework.amqp.rabbit.core.RabbitTemplate.execute(RabbitTemplate.java:1417) ~[spring-rabbit-1.7.9.RELEASE.jar!/:na]
at org.springframework.amqp.rabbit.core.RabbitTemplate.execute(RabbitTemplate.java:1393) ~[spring-rabbit-1.7.9.RELEASE.jar!/:na]
at org.springframework.amqp.rabbit.core.RabbitAdmin.declareQueue(RabbitAdmin.java:232) ~[spring-rabbit-1.7.9.RELEASE.jar!/:na]
at com.creactiviti.piper.config.AmqpMessengerConfiguration.registerListenerEndpoint(AmqpMessengerConfiguration.java:168) ~[classes!/:0.0.1-SNAPSHOT]
at com.creactiviti.piper.config.AmqpMessengerConfiguration.registerListenerEndpoint(AmqpMessengerConfiguration.java:164) ~[classes!/:0.0.1-SNAPSHOT]
at com.creactiviti.piper.config.AmqpMessengerConfiguration.lambda$configureRabbitListeners$0(AmqpMessengerConfiguration.java:150) ~[classes!/:0.0.1-SNAPSHOT]
at java.util.HashMap.forEach(Unknown Source) ~[na:1.8.0_201]
at com.creactiviti.piper.config.AmqpMessengerConfiguration.configureRabbitListeners(AmqpMessengerConfiguration.java:150) ~[classes!/:0.0.1-SNAPSHOT]
at org.springframework.amqp.rabbit.annotation.RabbitListenerAnnotationBeanPostProcessor.afterSingletonsInstantiated(RabbitListenerAnnotationBeanPostProcessor.java:230) ~[spring-rabbit-1.7.9.RELEA
SE.jar!/:na]
at org.springframework.beans.factory.support.DefaultListableBeanFactory.preInstantiateSingletons(DefaultListableBeanFactory.java:781) ~[spring-beans-4.3.18.RELEASE.jar!/:4.3.18.RELEASE]
at org.springframework.context.support.AbstractApplicationContext.finishBeanFactoryInitialization(AbstractApplicationContext.java:867) ~[spring-context-4.3.18.RELEASE.jar!/:4.3.18.RELEASE]
at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:543) ~[spring-context-4.3.18.RELEASE.jar!/:4.3.18.RELEASE]
at org.springframework.boot.context.embedded.EmbeddedWebApplicationContext.refresh(EmbeddedWebApplicationContext.java:122) ~[spring-boot-1.5.15.RELEASE.jar!/:1.5.15.RELEASE]
at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:693) ~[spring-boot-1.5.15.RELEASE.jar!/:1.5.15.RELEASE]
at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:360) ~[spring-boot-1.5.15.RELEASE.jar!/:1.5.15.RELEASE]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:303) ~[spring-boot-1.5.15.RELEASE.jar!/:1.5.15.RELEASE]
at com.creactiviti.piper.PiperApplication.main(PiperApplication.java:37) [classes!/:0.0.1-SNAPSHOT]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_201]
at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source) ~[na:1.8.0_201]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source) ~[na:1.8.0_201]
at java.lang.reflect.Method.invoke(Unknown Source) ~[na:1.8.0_201]
at org.springframework.boot.loader.MainMethodRunner.run(MainMethodRunner.java:48) [piper-0.0.1-SNAPSHOT.jar:0.0.1-SNAPSHOT]
at org.springframework.boot.loader.Launcher.launch(Launcher.java:87) [piper-0.0.1-SNAPSHOT.jar:0.0.1-SNAPSHOT]
at org.springframework.boot.loader.Launcher.launch(Launcher.java:50) [piper-0.0.1-SNAPSHOT.jar:0.0.1-SNAPSHOT]
at org.springframework.boot.loader.JarLauncher.main(JarLauncher.java:51) [piper-0.0.1-SNAPSHOT.jar:0.0.1-SNAPSHOT]
Caused by: java.io.IOException: null
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:105) ~[amqp-client-4.0.3.jar!/:4.0.3]
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:101) ~[amqp-client-4.0.3.jar!/:4.0.3]
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:123) ~[amqp-client-4.0.3.jar!/:4.0.3]
at com.rabbitmq.client.impl.ChannelN.exchangeDeclare(ChannelN.java:763) ~[amqp-client-4.0.3.jar!/:4.0.3]
at com.rabbitmq.client.impl.ChannelN.exchangeDeclare(ChannelN.java:50) ~[amqp-client-4.0.3.jar!/:4.0.3]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_201]
at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source) ~[na:1.8.0_201]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source) ~[na:1.8.0_201]
at java.lang.reflect.Method.invoke(Unknown Source) ~[na:1.8.0_201]
at org.springframework.amqp.rabbit.connection.CachingConnectionFactory$CachedChannelInvocationHandler.invoke(CachingConnectionFactory.java:1027) ~[spring-rabbit-1.7.9.RELEASE.jar!/:na]
at com.sun.proxy.$Proxy87.exchangeDeclare(Unknown Source) ~[na:na]
at org.springframework.amqp.rabbit.core.RabbitAdmin.declareExchanges(RabbitAdmin.java:630) ~[spring-rabbit-1.7.9.RELEASE.jar!/:na]
at org.springframework.amqp.rabbit.core.RabbitAdmin.access$000(RabbitAdmin.java:72) ~[spring-rabbit-1.7.9.RELEASE.jar!/:na]
at org.springframework.amqp.rabbit.core.RabbitAdmin$12.doInRabbit(RabbitAdmin.java:583) ~[spring-rabbit-1.7.9.RELEASE.jar!/:na]
at org.springframework.amqp.rabbit.core.RabbitTemplate.doExecute(RabbitTemplate.java:1461) ~[spring-rabbit-1.7.9.RELEASE.jar!/:na]
... 35 common frames omitted
Caused by: com.rabbitmq.client.ShutdownSignalException: connection error; protocol method: #method<connection.close>(reply-code=503, reply-text=COMMAND_INVALID - unknown exchange type 'x-delayed-message'
, class-id=40, method-id=10)
at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:66) ~[amqp-client-4.0.3.jar!/:4.0.3]
at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:32) ~[amqp-client-4.0.3.jar!/:4.0.3]
at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:366) ~[amqp-client-4.0.3.jar!/:4.0.3]
at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:229) ~[amqp-client-4.0.3.jar!/:4.0.3]
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:117) ~[amqp-client-4.0.3.jar!/:4.0.3]
... 47 common frames omitted

2019-05-05 00:49:05.110 INFO 17532 --- [ main] ationConfigEmbeddedWebApplicationContext : Closing org.springframework.boot.context.embedded.AnnotationConfigEmbeddedWebApplicationContext@46fbb2
c1: startup date [Sun May 05 00:48:39 PDT 2019]; root of context hierarchy
2019-05-05 00:49:05.116 INFO 17532 --- [ main] o.s.j.e.a.AnnotationMBeanExporter : Unregistering JMX-exposed beans on shutdown

MongoDB support

I started working on MongoDB database support in Piper. I think it's a very interesting NoSQL database for Piper due to its ability to persist very easily JSON (/BSON) documents.

I would like to know if you would be interested in this contribution?

Pipeline task

Is it possible to call other pipelines by their "pipelineId" as Tasks from another pipeline, and pass them parameters?

Your first pipeline example does not work

So i am following the steps as mentioned at https://github.com/creactiviti/piper#writing-your-first-pipeline & get an error -

/piper$ curl -s -X POST -H Content-Type:application/json -d '{"pipelineId":"mypipeline","inputs":{"name":"Arik"}}' http://localhost:8080/jobs
{"timestamp":"2019-03-21T20:57:15.006+0000","status":400,"error":"Bad Request","exception":"java.lang.IllegalArgumentException","message":"Unknown pipeline: mypipeline","path":"/jobs"}

I have tried this with the mypipeline.yaml in

/piper$ ll src/main/resources/pipelines/
total 1
drwxrwxrwx 1 xxx xxx 256 Mar 21 16:53 ./
drwxrwxrwx 1 xxx xxx 0 Mar 21 16:24 ../
drwxrwxrwx 1 xxx xxx 0 Mar 21 16:24 demo/
-rwxrwxrwx 1 xxx xxx 240 Mar 21 16:45 mypipeline.yaml*

and at

/piper$ ll piper/pipelines/
total 1
drwxrwxrwx 1 xxx xxx 160 Mar 21 16:45 ./
drwxrwxrwx 1 xxx xxx 152 Mar 21 16:49 ../
-rwxrwxrwx 1 xxx xxx 240 Mar 21 16:45 mypipeline.yaml*

to the same error.

Where should this yaml file go?

Branch fork/join documentation issue

The one bolded should be replaced with "tasks" not "branches". Branches doesn't work.

Fork/Join
Executes each branch in the branches as a seperate and isolated sub-flow. Branches are executed internally in sequence.

- type: fork
  **branches**: 
     - - name: randomNumber                 <-- branch 1 start here
         label: Generate a random number
         type: randomInt
         startInclusive: 0
         endInclusive: 5000

Incorrect HTTP status code (500)

The HTTP status code returned by the API is incorrect in some cases.

For instance, when launching a pipeline with a bad parameter:

> curl -v -s -X POST -H Content-Type:application/json -d '{"pipelineId":"demo/hello","inputs":{"badprop":"Joe Jones"}}' http://localhost:8080/jobs

I got:

< HTTP/1.1 500 
< X-Application-Context: application
< Content-Type: application/json;charset=UTF-8
< Transfer-Encoding: chunked
< Date: Tue, 12 Jun 2018 10:17:03 GMT
< Connection: close
< 
* Closing connection 0
{"timestamp":"2018-06-12T10:17:03.794+0000","status":500,"error":"Internal Server Error","exception":"java.lang.IllegalArgumentException","message":"Missing required param: yourName","path":"/jobs"}⏎          

I would expect to have a 400 error code instead.

400 Bad Request: The request cannot be fulfilled due to bad syntax.

[Question] Access to the JobId via piper pipelines

Hello,

As i have mentioned in the title, I want to get access to the JobId variable in order to persist some data that will be used after the notification of the registered webhooks.
I have checked the content of ${execution} & ${this} according to the Spring Expression Language, but without success.

Thanks for your answer.

How to run project after importing in STS

I have successfully imported piper project and when i run it it got started on pot 8080.
But i want to know how i should check project is running by hitting URL on browser

Apache Kafka support

Hi,

It looks like the current implementation supports jms, rabbitmq. Do you have any future roadmap to integrate with Apache Kafka as the message broker?

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.