Code Monkey home page Code Monkey logo

dagflow's Introduction

DAGFlow

lint UT Codacy Badge GoDoc License: MIT

a DAG task engine based on asynq

QuickStart

server side

  1. prepare asynq server mux
    srv := asynq.NewServer(
    	asynq.RedisClientOpt{Addr: redisAddr},
    	asynq.Config{
    		// Specify how many concurrent workers to use
    		Concurrency: 10,
    		// Optionally specify multiple queues with different priority.
    		Queues: map[string]int{
    			"critical": 6,
    			"default":  3,
    			"low":      1,
    		},
    		// See the godoc for other configuration options
    	},
    )
    mux := asynq.NewServeMux()
    please refer to asynq's doc to figure out more config options.
  2. create dagflow service
    svc, err := service.New(&types.Config{
    	Redis: types.RedisConfig{
    		Addr:   "127.0.0.1:6379",
    		Expire: 120,
    	},
        Store: types.StoreConfig{
    	    Type: "redis",
        },
    }, nil)
    if err != nil {
    	log.Fatal("failed to create service", err)
    }
  3. create a flow object and register it to dagflow service
    f, err := svc.NewFlow("f1")
    if err != nil {
    	log.Fatal("failed to create flow", err)
    }
    if err = f.Node("n1", incOp); err != nil {
    	log.Fatal("failed to create node", err)
    }
    // for complex dag, you can use RegisterFlowsWithDefinitor
    svc.RegisterFlows(mux, f)
  4. start asynq server
    if err := srv.Run(mux); err != nil {
    	log.Fatalf("could not run server: %v", err)
    }

client side

  1. create dagflow service, same as step 2 in server side
    svc, err := service.New(&types.Config{
    	Redis: types.RedisConfig{
    		Addr:   "127.0.0.1:6379",
    		Expire: 120,
    	},
        Store: types.StoreConfig{
    	    Type: "redis",
        },
    }, nil)
    if err != nil {
    	log.Fatal("failed to create service", err)
    }
  2. create a flow object and register it to dagflow service, same as step 3 in server side except mux should be set to nil
    f, err := svc.NewFlow("f1")
    if err != nil {
    	log.Fatal("failed to create flow", err)
    }
    if err = f.Node("n1", incOp); err != nil {
    	log.Fatal("failed to create node", err)
    }
    // for complex dag, you can use RegisterFlowsWithDefinitor
    svc.RegisterFlows(nil, f)
  3. submit dagflow tasks
    svc.Submit("f1", []byte(`1`))

DAG

single node DAG

f, err := svc.NewFlow("f1")
if err != nil {
    log.Fatal("failed to create flow", err)
}
if err = f.Node("n1", incOp); err != nil {
    log.Fatal("failed to create node", err)
}

complex DAG

func prepareFlow(f *flow.Flow) error {
	if err := f.Node("l1n1", incOp); err != nil {
		return err
	}
	if err := f.Node("l2n1", incOp); err != nil {
		return err
	}
	if err := f.Node("l2n2", decOp); err != nil {
		return err
	}
	if err := f.Node("l3n1", mulOp, flow.WithAggregator(func (dataMap map[string][]byte) ([]byte, error) {
        l2n1Result := dataMap["l2n1"]
        l2n2Result := dataMap["l2n2"]
        // do anything you want to construct input data for node l3n1
    })); err != nil {
		return err
	}
	if err := f.Edge("l1n1", "l2n1"); err != nil {
		return err
	}
	if err := f.Edge("l1n1", "l2n2"); err != nil {
		return err
	}
	if err := f.Edge("l2n1", "l3n1"); err != nil {
		return err
	}
	if err := f.Edge("l2n2", "l3n1"); err != nil {
		return err
	}
	return nil
}

SwitchNode

SwitchNode is a special type node which works like switch case statment in golang

f, err := svc.NewFlow("f1")
if err != nil {
    log.Fatal("failed to create flow", err)
}
if err = f.SwitchNode("n1", func(data []byte) string {
    return "+"
}, map[string]flow.NodeFunc{
    "+": incOp,
    "-": decOp,
}); err != nil {
    log.Fatal("failed to create node", err)
}

dagflow's People

Contributors

yuyang0 avatar

Stargazers

KooFrank avatar WB01782015 avatar

Watchers

 avatar

Forkers

nyl1001

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    ๐Ÿ–– Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. ๐Ÿ“Š๐Ÿ“ˆ๐ŸŽ‰

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google โค๏ธ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.