- Pure
FaaS
withopenfaas
- Fast build with
go
- Secured with
HMAC
- Stateless by
design
(with optional 3rd party integration)- Tracing with
open-tracing
- Available as
faasflow
template
FYI: Faasflow is into conceptual state and API which may change and under active development
Faasflow allow you to define your faas functions pipeline and deploy it as a function
Create pipeline with simple call
func Define(flow *faasflow.Workflow, context *faasflow.Context) (err error) {
// use any 3rd party to maintain state
context.SetStateManager(myMinioStatemanager)
flow.Modify(func(data []byte) ([]byte, error) {
// Set value in context with StateManager
context.Set("raw-image", data)
return data
}).
Apply("facedetect", Header("method","post")).
Modify(func(data []byte) ([]byte, error) {
// perform check
// ...
// and replay data
data, _ :=context.Get("raw-image")
// do modification if needed
return data
}).
Apply("compress", Header("method","post")).
Apply("colorify", Header("method","post")).
Callback("storage.io/bucket?id=3345612358265349126").
OnFailure(func(err error) {
// failure handler
}).
Finally(func(state string) {
// success - state.State = StateSuccess
// failure - state.State = StateFailure
// cleanup code
context.del("raw-image")
})
}
Faasflow supports sync and async function call. By default all call are async. To call a function in Sync, faasflow provide option faasflow.Sync
:
flow.Apply("function", faasflow.Sync)
One or more Async
function call results a pipeline to have multiple phases
If all calls are Sync
, pipeline will have one phase and return the result to the caller
Acronyms | description |
---|---|
Pipeline Definition | User define the flow as a pipeline by implementing the template Handle() . For a given flow the definition is always the same |
Function | A FaaS Function. A function is applied to flow by calling flow.Apply(funcName, Sync) or flow.Apply(funcName) . By Default function call are async |
Modifier | A inline function. A inline modifier function is applied as flow.Modify(func(data []byte) ([]byte, error) { return data, nil } ) |
Callback | A URL that will be called with the final/partial result. flow.Callback(url) |
Handler | A Failure handler registered as flow.OnFailure(func(err error){}) . If registered it is called if an error occured |
Finally | A Cleanup handler registered as flow.Finally(func(){}) . If registered it is called at the end if state is StateFailure otherwise StateSuccess |
Phase | Segment of a pipeline definiton which consist of one or more call to Function in Sync, Modifier or Callback . A pipeline definition has one or more phases. Async call Apply() results in a new phase. |
Context | Request context has the state of request. It abstracts the StateHandler and provide API to manage state of the request. Interface StateHandler{} can be set by user to use 3rd party storage to manage state. |
Faasflow runs four mejor steps to define and run the pipeline
Step | description |
---|---|
Build Workflow | Identify a request and build a flow. A incoming request could be a partially finished pipeline or a fresh raw request. For a partial request faasflow parse and understand the state of the pipeline from the incoming request |
Get Definition | FaasWorkflow create simple pipeline-definition with one or multiple phases based on the flow defined at Define() function in handler.go . A pipeline-definition consist of multiple phases . Each Phase includes one or more Function Call , Modifier or Callback . Always a single phase is executed in a single invokation of the flow. A same flow always outputs to same pipeline definition, which allows faasflow to be completly stateless |
Execute | Execute executes a Phase by calling the Modifier , Functions or Callback based on how user defines the pipeline. Only one Phase gets executed at a single execution of faasflow function . |
Repeat Or Response | If pipeline is not yet completed, FaasWorkflow forwards the remaining pipeline with partial execution state and the partial result to the same flow function via gateway . If the pipeline has only one phase or completed faasflow returns the output to the gateway otherwise it returns empty |
https://github.com/s8sg/faasflow/tree/master/example
faas-cli template pull https://github.com/s8sg/faasflow
faas-cli new test-flow --lang faasflow
test-flow:
lang: faasflow
handler: ./test-flow
image: test-flow:latest
environment:
read_timeout: 120
write_timeout: 120
write_debug: true
combine_output: false
environment_file:
- flow.yml
read_timeout
: A value larger thanmax
phase execution time.
write_timeout
: A value larger thanmax
phase execution time.
write_debug
: It enables the debug msg in logs.
combine_output
: It allows debug msg to be excluded fromoutput
.
To make the stack.yml look clean we can create a seperate flow.yml
with faasflow related configuration.
environment:
flow_name: "test-flow"
gateway: "gateway:8080"
enable_tracing: false
trace_server: ""
enable_hmac: false
flow_name
: The name of the flow function. Faasflow use this to forward partial request.
gateway
: We need to tell faasflow the address of openfaas gateway. All calls are made via gateway# swarm gateway: "gateway:8080" # k8 gateway: "gateway.openfaas:8080"
enable_tracing
: It ebales the opentracing for requests and their phases.
trace_server
: The address of opentracing backend jaeger.
enable_hmac
: Enable hmac to add extra layer of security for partial request forward.
flow.Apply("yourFunc1", Header("method","post")).
Modify(func(data []byte) ([]byte, error) {
// Check, update/customize data, replay data ...
return []byte(fmt.Sprintf("{ \"data\" : \"%s\" }", string(data))), nil
}).Apply("yourFunc2", Header("method","post")).
Callback("http://gateway:8080/function/send2slack",
Header("method", "post"), Query("authtoken", os.Getenv(token)))
This function will generate two phases as:
Phase 1 : Apply("yourFunc1") Modify() Phase 2: Apply("yourFunc2") Callback()
Build
faas-cli build -f test-flow.yml
Deploy
faas-cli deploy -f test-flow.yml
cat data | faas-cli invoke --async -f test-flow.yml test-flow
Function submitted asynchronously.
Edit the
test-flow/handler.go
flow.Apply("yourFunc1", Header("method","post"), faasflow.Sync).
Modify(func(data []byte) ([]byte, error) {
// Check, update/customize data, replay data ...
return []byte(fmt.Sprintf("{ \"data\" : \"%s\" }", string(data))), nil
}).Apply("yourFunc2", Header("method", "post"), faasflow.Sync)
This function will generate one phase as:
Phase 1 : Apply("yourFunc1") Modify() Apply("yourFunc2")
cat data | faas-cli invoke -f test-flow.yml test-flow > updated_data
Request can be tracked from the log by RequestId
. For each new Request a unique RequestId
is generated.
2018/08/13 07:51:59 [request `bdojh7oi7u6bl8te4r0g`] Created
2018/08/13 07:52:03 [Request `bdojh7oi7u6bl8te4r0g`] Received
Request tracing can be enabled by providing by specifying
enable_tracing: true
trace_server: "jaegertracing:5775"
jaeger
(opentracing-1.x) used for traceing
To start the trace server we run jaegertracing/all-in-one
as a service.
docker service rm jaegertracing
docker pull jaegertracing/all-in-one:latest
docker service create --constraint="node.role==manager" --detach=true \
--network func_functions --name jaegertracing -p 5775:5775/udp -p 16686:16686 \
jaegertracing/all-in-one:latest
Below is an example of tracing for an async request with 3 phases
The main state in faasflow is the execution-position
(next-phase) and the partially
completed data.
Apart from that faasflow allow user to define state with StateManager
interface.
type StateManager interface {
Set(key string, value interface{}) error
Get(key string) (interface{}, error)
Del(key string) error
}
State manager can be implemented and set by user with request context in faasflow Define()
:
func Define(flow *faasflow.Workflow, context *faasflow.Context) (err error) {
// initialize my custom StateManager as myStateManager
context.SetStateManager(myStateManager)
}
Once a state manager is set it can be used by calling Get()
and Set()
from context
:
flow.Modify(func(data []byte) {
// parse data and set to be used later
// json.Unmarshal(&req, data)
context.Set("commitsha", req.Sha)
}).Apply("myfunc").
Modify(func(data []byte) {
// retrived the data that was set in the context
sha, _ = context.Get("commitsha")
commitsha, _ = sha.[string]
// use the query
})
By default faasflow template use requestEmbedStateManager
which embed the state data along with the request for the next phase. For bigger values it is recommended to pass it with custom StateManager
.
Once StateManager
is overridden, all call to Set()
, Get()
and del()
will call the provided StateManager
Http Query to flow can be used from context as
flow.Apply("myfunc", Query("auth-token", context.Query.Get("token"))). // pass as a function query
Modify(func(data []byte) {
token = context.Query.Get("token") // get query inside modifier
})
Finally provides a way to cleanup context and other resources and do post completion work of the pipeline. A Finally method can be used on flow as:
func Define(flow *faasflow.Workflow, context *faasflow.Context) (err error) {
// initialize my custom StateManager as myStateManager
context.SetStateManager(myStateManager)
// Define flow
flow.Modify(func(data []byte) {
// parse data and set to be used later
// json.Unmarshal(&req, data)
context.Set("commitsha", req.Sha)
}).Apply("myfunc").
Modify(func(data []byte) {
// retrived the data in different phase from context
sha, _ = context.Get("commitsha")
commitsha, _ = sha.[string]
}).Finally(func() {
// delete the state resource
context.Del("commitsha")
})
}
Issue/Suggestion Create an issue at faasflow-issue.
ReviewPR/Implement Create Pull Request at faasflow-pr.