
Gleam


Gleam is a high-performance, efficient distributed execution system that is also simple, generic, flexible, and easy to customize.

Gleam is built in Go, and user-defined computation can be written in Go, Unix pipe tools, or any streaming programs.

High Performance

  • Pure Go mappers and reducers have high performance and concurrency.
  • Data flows through memory, optionally to disk.
  • Multiple map reduce steps are merged together for better performance.

Memory Efficient

  • Gleam avoids the GC problems that plague other languages: each executor runs in a separate OS process, and its memory is managed by the OS, so one machine can host many more executors.
  • The Gleam master and agent servers are memory efficient, consuming about 10 MB of memory each.
  • Gleam tries to automatically adjust the required memory size based on data size hints, avoiding trial-and-error manual memory tuning.

Flexible

  • The Gleam flow can run standalone or distributed.
  • Adjustable in memory mode or OnDisk mode.

Easy to Customize

  • The Go code is much simpler to read than Scala, Java, or C++.

One Flow, Multiple Ways to Execute

Gleam code defines the flow, specifying each dataset (vertex) and computation step (edge), building up a directed acyclic graph (DAG). There are multiple ways to execute the DAG.

The default way is to run locally. This works in most cases.

Here we mostly talk about the distributed mode.

Distributed Mode

The distributed mode involves several components: Master, Agent, Executor, and Driver.

Gleam Driver

  • The Driver is the program users write; it defines the flow and talks to the Master, Agents, and Executors.

Gleam Master

  • The Master is a single server that collects resource information from the Agents.
  • It stores only transient resource information, so it can be restarted safely.
  • When the Driver program starts, it asks the Master for available Executors on the Agents.

Gleam Agent

  • Agents run on any machine that can run computations.
  • Agents periodically send resource usage updates to the Master.
  • When the Driver program has Executors assigned, it talks to the Agents to start them.
  • Agents also manage the datasets generated by each Executor.

Gleam Executor

  • Executors are started by Agents. They read inputs from external sources or previous datasets, process them, and output to a new dataset.

Dataset

  • The datasets are managed by Agents. By default, data flows only through memory and the network, never touching slow disk.
  • Optionally, the data can be persisted to disk.

By keeping data in memory, the flow gains back pressure and naturally supports stream computation.

Documentation

Standalone Example

Word Count

Basically, you need to register the Go functions first. Registration returns a mapper or reducer function id, which you can pass to the flow.

package main

import (
	"flag"
	"strings"

	"github.com/chrislusf/gleam/distributed"
	"github.com/chrislusf/gleam/flow"
	"github.com/chrislusf/gleam/gio"
	"github.com/chrislusf/gleam/plugins/file"
)

var (
	isDistributed = flag.Bool("distributed", false, "run in distributed or not")
	Tokenize      = gio.RegisterMapper(tokenize)
	AppendOne     = gio.RegisterMapper(appendOne)
	Sum           = gio.RegisterReducer(sum)
)

func main() {

	gio.Init()   // If the command line invokes the mapper or reducer, execute it and exit.
	flag.Parse() // optional, since gio.Init() will call this also.

	f := flow.New("top5 words in passwd").
		Read(file.Txt("/etc/passwd", 2)). // read a txt file, partitioned into 2 shards
		Map("tokenize", Tokenize).        // invoke the registered "tokenize" mapper function
		Map("appendOne", AppendOne).      // invoke the registered "appendOne" mapper function
		ReduceByKey("sum", Sum).          // invoke the registered "sum" reducer function
		Sort("sortBySum", flow.OrderBy(2, true)).
		Top("top5", 5, flow.OrderBy(2, false)).
		Printlnf("%s\t%d")

	if *isDistributed {
		f.Run(distributed.Option())
	} else {
		f.Run()
	}

}

func tokenize(row []interface{}) error {
	line := gio.ToString(row[0])
	for _, s := range strings.FieldsFunc(line, func(r rune) bool {
		return !('A' <= r && r <= 'Z' || 'a' <= r && r <= 'z' || '0' <= r && r <= '9')
	}) {
		gio.Emit(s)
	}
	return nil
}

func appendOne(row []interface{}) error {
	row = append(row, 1)
	gio.Emit(row...)
	return nil
}

func sum(x, y interface{}) (interface{}, error) {
	return gio.ToInt64(x) + gio.ToInt64(y), nil
}

Now you can execute the binary directly, or with the "-distributed" option to run in distributed mode. Distributed mode needs the simple setup described later.

A more elaborate example, using the predefined mappers and reducers, is here: https://github.com/chrislusf/gleam/blob/master/examples/word_count_in_go/word_count_in_go.go

Word Count by Unix Pipe Tools

Here is another way to do something similar, using Unix pipe tools.

Unix pipes are easy for sequential stages, but limited for fan-out, and even more limited for fan-in.

With Gleam, fan-in and fan-out parallel pipes become very easy.

package main

import (
	"fmt"

	"github.com/chrislusf/gleam/flow"
	"github.com/chrislusf/gleam/gio"
	"github.com/chrislusf/gleam/gio/mapper"
	"github.com/chrislusf/gleam/plugins/file"
	"github.com/chrislusf/gleam/util"
)

func main() {

	gio.Init()

	flow.New("word count by unix pipes").
		Read(file.Txt("/etc/passwd", 2)).
		Map("tokenize", mapper.Tokenize).
		Pipe("lowercase", "tr 'A-Z' 'a-z'").
		Pipe("sort", "sort").
		Pipe("uniq", "uniq -c").
		OutputRow(func(row *util.Row) error {

			fmt.Printf("%s\n", gio.ToString(row.K[0]))

			return nil
		}).Run()

}

This example uses OutputRow() to process each output row directly.
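For reference, what the three pipe stages (tr 'A-Z' 'a-z', sort, uniq -c) accomplish can be written as a plain-Go aggregation over already-tokenized words; this sketch is independent of Gleam:

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// countWords lowercases each word and tallies occurrences, the same result
// that piping through `tr 'A-Z' 'a-z' | sort | uniq -c` produces.
func countWords(words []string) map[string]int {
	counts := make(map[string]int)
	for _, w := range words {
		counts[strings.ToLower(w)]++
	}
	return counts
}

func main() {
	counts := countWords([]string{"Go", "go", "gleam", "GO"})
	// Print in sorted key order, mimicking uniq -c's "count word" layout.
	keys := make([]string, 0, len(counts))
	for k := range counts {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	for _, k := range keys {
		fmt.Printf("%7d %s\n", counts[k], k)
	}
}
```

The point of the Gleam version is that each stage runs as a separate process per shard, while this in-process version is single-threaded.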

Join Two CSV Files

Assume file "a.csv" has fields "a1, a2, a3, a4, a5" and file "b.csv" has fields "b1, b2, b3". We want to join the rows where a1 = b2, with the output format "a1, a4, b3".

package main

import (
	. "github.com/chrislusf/gleam/flow"
	"github.com/chrislusf/gleam/gio"
	"github.com/chrislusf/gleam/plugins/file"
)

func main() {

	gio.Init()

	f := New("join a.csv and b.csv by a1=b2")
	a := f.Read(file.Csv("a.csv", 1)).Select("select", Field(1,4)) // a1, a4
	b := f.Read(file.Csv("b.csv", 1)).Select("select", Field(2,3)) // b2, b3

	a.Join("joinByKey", b).Printlnf("%s,%s,%s").Run()  // a1, a4, b3

}

Distributed Computing

Setup Gleam Cluster Locally

Start a Gleam master and several Gleam agents:

// start "gleam master" on a server
> go get github.com/chrislusf/gleam/distributed/gleam
> gleam master --address=":45326"

// start up "gleam agent" on some different servers or ports
> gleam agent --dir=2 --port 45327 --host=127.0.0.1
> gleam agent --dir=3 --port 45328 --host=127.0.0.1

Setup Gleam Cluster on Kubernetes

Install Kubernetes tools. At the very least you will need a local K8s cluster, Docker, and kubectl. Docker Desktop provides all of this out of the box.

Install Skaffold

Choose the appropriate binary here. For example, for macOS on ARM64 (darwin-arm64):

curl -Lo skaffold https://storage.googleapis.com/skaffold/releases/latest/skaffold-darwin-arm64 && \
sudo install skaffold /usr/local/bin/

Run Latest Version

cd ./k8s
skaffold run --profile base 

Use skaffold delete --profile base to bring the cluster down.

Alternatively, Build & Run a Local Version

You can build a local copy of gleam for development with hot reloading:

cd ./k8s
skaffold dev --profile dev 

Change Execution Mode

After the flow is defined, the Run() function can be executed in local mode or distributed mode.

  f := flow.New("")
  ...
  // 1. local mode
  f.Run()

  // 2. distributed mode
  import "github.com/chrislusf/gleam/distributed"
  f.Run(distributed.Option())
  f.Run(distributed.Option().SetMaster("master_ip:45326"))

Important Features

Status

Gleam is just getting started. Here are a few todo items. Any help is welcome!

  • Add new plugins to read external data.
  • Add windowing functions similar to Apache Beam/Flink. (in progress)
  • Add schema support for each dataset.
  • Support using SQL as a flow step, similar to LINQ.
  • Add dataset metadata for better caching of often re-calculated data.

Especially Need Help Now:

  • Go implementation to read Parquet files.

Please start using it and give feedback. Help is needed; anything is welcome. Small things count: fixing documentation, adding a logo, adding a Docker image, blogging about it, sharing it, etc.

License

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

gleam's People

Contributors

anacrolix, awaseem2, bertrandgoddard, chenbinhi, chrislusf, dependabot[bot], dominicfollett, jackstenglein, jcftang, july2993, lewgun, martindam, mountkin, mschneider82, njt99, testwill, vsoch, walnut-tom, xitongsys, zhaoyunyang


gleam's Issues

Driver program scattering amongst agent servers

Each time I run the flow, the /tmp folder on every agent server is populated with a folder named by job id containing the driver program. My driver program has grown into a 26 MB file, and the accumulation of these copies is a problem for me. Is there any way to work around this?

Can't collect all of the expected rows in OutputRow

Say I submit a source of slice[53][1]; I expect the final reduced output to have the same number of rows, 53, but I get only 18, which means 35 of them are lost.

Flow creation code:

f := flow.New("KDJ Score Calculation").Slices(mapSource).RoundRobin("rr", int(shard)).
	Map("kdjScorer", KdjScorer).
	ReduceBy("kdjScoreCollector", KdjScoreCollector, sortOption).
	OutputRow(func(r *util.Row) error {
		logr.Debugf("Output Row: %+v", r)
		rep.RowIds = append(rep.RowIds, r.K[0].(string))
		rep.Scores = append(rep.Scores, r.V[0].(float64))
		return nil
	})

WriteTo encoding error: msgp: type "map[string][]float64" not supported

Every time I update my own project, gleam is updated along with it. Now my driver program fails to serve requests with the following error:

2017/09/03 23:22:10 Failed to run task Slices-0: WriteTo encoding error: msgp: type "map[string][]float64" not supported
2017/09/03 23:22:10 Failed to execute on driver side: Failed to send source data: WriteTo encoding error: msgp: type "map[string][]float64" not supported

Has anything changed in the mapper/reducer API?

Example does not finish

Running the CSV example in standalone mode on Windows never finishes. It is also not using any CPU, so it seems to be blocking somewhere.

Do you test on Windows? What other information can I give?

Invalid memory address or nil pointer dereference

I have a large data set divided into groups; each group requires heavy calculation, and I submit them to a Gleam flow concurrently. For each group of data I call flow.New()...Run(distributedOption), but when the whole process was about 1.5% complete, I got a nil pointer error. Processing the failing group individually did not trigger the error. Is it safe to call flow.New()...Run(distributedOption) concurrently, or is there some limitation? Would it be advisable to submit groups of data through a channel instead of creating a new flow for each?

panic: runtime error: invalid memory address or nil pointer dereference
[signal SIGSEGV: segmentation violation code=0x1 addr=0x0 pc=0x458651]

goroutine 103250 [running]:
github.com/golang/protobuf/proto.(*Buffer).EncodeRawBytes(0xc424b23528, 0x0, 0x9c, 0x9c, 0x0, 0x0)
/path/to/go/src/github.com/golang/protobuf/proto/encode.go:201 +0x8f
github.com/golang/protobuf/proto.(*Buffer).enc_proto3_slice_byte(0xc424b23528, 0xc42039fcc0, 0xc4218bdec0, 0xc4200103c0, 0xc420010301)
/path/to/go/src/github.com/golang/protobuf/proto/encode.go:691 +0x130
github.com/golang/protobuf/proto.(*Buffer).enc_struct(0xc424b23528, 0xc420d7f0e0, 0xc4218bdec0, 0x0, 0xc428123460)
/path/to/go/src/github.com/golang/protobuf/proto/encode.go:1235 +0x3c6
github.com/golang/protobuf/proto.(*Buffer).enc_len_struct.func1(0xc420380320, 0xc42155eae0)
/path/to/go/src/github.com/golang/protobuf/proto/encode.go:1305 +0x3c
github.com/golang/protobuf/proto.(*Buffer).enc_len_thing(0xc424b23528, 0xc428123518, 0xc428123628, 0x41223d, 0xc424b23528)
/path/to/go/src/github.com/golang/protobuf/proto/encode.go:1313 +0x7f
github.com/golang/protobuf/proto.(*Buffer).enc_len_struct(0xc424b23528, 0xc420d7f0e0, 0xc4218bdec0, 0xc428123628, 0xc428123638, 0x50f0a9)
/path/to/go/src/github.com/golang/protobuf/proto/encode.go:1305 +0x6d
github.com/golang/protobuf/proto.(*Buffer).enc_slice_struct_message(0xc424b23528, 0xc4203dfa40, 0xc42220aae0, 0x0, 0x0)
/path/to/go/src/github.com/golang/protobuf/proto/encode.go:985 +0x199
github.com/golang/protobuf/proto.(*Buffer).enc_struct(0xc424b23528, 0xc420d7f040, 0xc42220aae0, 0x0, 0x0)
/path/to/go/src/github.com/golang/protobuf/proto/encode.go:1235 +0x3c6
github.com/golang/protobuf/proto.(*Buffer).enc_len_struct.func1(0xc424b23528, 0xa)
/path/to/go/src/github.com/golang/protobuf/proto/encode.go:1305 +0x3c
github.com/golang/protobuf/proto.(*Buffer).enc_len_thing(0xc424b23528, 0xc428123840, 0xc428123950, 0x41223d, 0xc424b23528)
/path/to/go/src/github.com/golang/protobuf/proto/encode.go:1313 +0x7f
github.com/golang/protobuf/proto.(*Buffer).enc_len_struct(0xc424b23528, 0xc420d7f040, 0xc42220aae0, 0xc428123950, 0x0, 0x0)
/path/to/go/src/github.com/golang/protobuf/proto/encode.go:1305 +0x6d
github.com/golang/protobuf/proto.(*Buffer).enc_slice_struct_message(0xc424b23528, 0xc4203df400, 0xc423ca5720, 0x0, 0x0)
/path/to/go/src/github.com/golang/protobuf/proto/encode.go:985 +0x199
github.com/golang/protobuf/proto.(*Buffer).enc_struct(0xc424b23528, 0xc420d7ef00, 0xc423ca5720, 0xdfb7c0, 0xc423ca5720)
/path/to/go/src/github.com/golang/protobuf/proto/encode.go:1235 +0x3c6
github.com/golang/protobuf/proto.(*Buffer).Marshal(0xc424b23528, 0x12d4c60, 0xc423ca5720, 0xc428231900, 0x0)
/path/to/go/src/github.com/golang/protobuf/proto/encode.go:274 +0x2b4
google.golang.org/grpc.protoCodec.marshal(0xdfb7c0, 0xc423ca5720, 0xc424b23520, 0xc421f49a54, 0xc42588c1f8, 0xc422b28c50, 0xb, 0xc42588c150)
/path/to/go/src/google.golang.org/grpc/codec.go:63 +0xe8
google.golang.org/grpc.protoCodec.Marshal(0xdfb7c0, 0xc423ca5720, 0xc422f38ec8, 0xc423848340, 0x9b, 0xc422b28c88, 0x411230)
/path/to/go/src/google.golang.org/grpc/codec.go:73 +0x73
google.golang.org/grpc.(*protoCodec).Marshal(0x133f108, 0xdfb7c0, 0xc423ca5720, 0x1320a00, 0xe786a8, 0xc423848358, 0xc422b28c70, 0x41223d)
<autogenerated>:48 +0x59
google.golang.org/grpc.encode(0x12d7e20, 0x133f108, 0xdfb7c0, 0xc423ca5720, 0x0, 0x0, 0x0, 0x0, 0xc422b28f90, 0x2, ...)
/path/to/go/src/google.golang.org/grpc/rpc_util.go:302 +0x2c4
google.golang.org/grpc.(*clientStream).SendMsg(0xc426c0c160, 0xdfb7c0, 0xc423ca5720, 0x0, 0x0)
/path/to/go/src/google.golang.org/grpc/stream.go:365 +0x11d
github.com/chrislusf/gleam/pb.(*gleamMasterSendFlowExecutionStatusClient).Send(0xc421130020, 0xc423ca5720, 0x0, 0x1)
/path/to/go/src/github.com/chrislusf/gleam/pb/gleam.pb.go:1929 +0x49
github.com/chrislusf/gleam/distributed/driver.(*FlowDriver).reportStatus(0xc423cc8000, 0x12d8620, 0xc423cc8ac0, 0xc420373030, 0xe53ca9, 0xf, 0xc4257c6300)
/path/to/go/src/github.com/chrislusf/gleam/distributed/driver/driver.go:154 +0x39d
created by github.com/chrislusf/gleam/distributed/driver.(*FlowDriver).RunFlowContext
/path/to/go/src/github.com/chrislusf/gleam/distributed/driver/driver.go:93 +0x4d8

Update README for Golang version requirements

In the past, I have had minor problems keeping up with colliding Go versions, which can be deceiving. I suggest keeping a Travis CI auto-build for everything up to Go tip for people discovering this project and wanting to dive in. Alternatively, slap "Requires Go > 1.6" in the README; my guess is this is because of Google's grpc.

You do a great job of keeping up with this, and it's an awesome accomplishment that I love seeing in open source.

Why no output in terminal?

package main

import (
	"os"

	. "github.com/chrislusf/gleam/flow"
	"github.com/chrislusf/gleam/plugins/csv"
)

func main() {

	f := New()
	a := f.ReadFile(csv.New("a.csv")).Select(Field(1, 4)) // a1, a4
	b := f.ReadFile(csv.New("b.csv")).Select(Field(2, 3)) // b2, b3

	a.Join(b).Fprintf(os.Stdout, "%s,%s,%s\n").Run() // a1, a4, b3

}

Above is an example. When I run this code in my environment, there is no output!

My environment: Go 1.8. Please help me.

Regression after the recent changes

The code from a week ago is working, but with the latest version I get:

2017/06/17 06:21:38 Start Job Status URL http://127.0.0.1:45326/job/940783262
panic: runtime error: invalid memory address or nil pointer dereference
[signal SIGSEGV: segmentation violation code=0x1 addr=0x8 pc=0xb11a6c]

goroutine 24 [running]:
github.com/chrislusf/gleam/flow.(*Flow).Channel.func1(0x0, 0x0, 0x0, 0xc4202a80f0, 0x1, 0x1, 0x0, 0x1, 0x30)
/go/src/github.com/chrislusf/gleam/flow/dataset_source.go:137 +0x8c
github.com/chrislusf/gleam/flow.(*Step).RunFunction(0xc4201b7c70, 0xc420072a80, 0x0, 0x0)
/go/src/github.com/chrislusf/gleam/flow/step.go:53 +0x38d
github.com/chrislusf/gleam/distributed/driver/scheduler.(*Scheduler).localExecuteSource(0xc4201c2600, 0x110f940, 0xc4201c2640, 0xc4200729c0, 0xc420072a80, 0xc4201b4d50, 0x0, 0x1)
/go/src/github.com/chrislusf/gleam/distributed/driver/scheduler/scheduler_execute.go:101 +0x2f9
github.com/chrislusf/gleam/distributed/driver/scheduler.(*Scheduler).localExecute(0xc4201c2600, 0x110f940, 0xc4201c2640, 0xc4200729c0, 0xc420072a80, 0xc4201b4d50, 0xc420028d30, 0xc4201c26c0)
/go/src/github.com/chrislusf/gleam/distributed/driver/scheduler/scheduler_execute.go:83 +0x6e
github.com/chrislusf/gleam/distributed/driver/scheduler.(*Scheduler).ExecuteTaskGroup.func1(0xc420073680, 0xd3e4a0, 0xc420073680)
/go/src/github.com/chrislusf/gleam/distributed/driver/scheduler/scheduler_execute_task_group.go:34 +0x57
github.com/chrislusf/gleam/pb.(*FlowExecutionStatus_TaskGroup).Track(0xc420072e40, 0xc420028eb8, 0x0, 0x0)
/go/src/github.com/chrislusf/gleam/pb/status_proto_helper.go:17 +0x103
github.com/chrislusf/gleam/distributed/driver/scheduler.(*Scheduler).ExecuteTaskGroup(0xc4201c2600, 0x110f940, 0xc4201c2640, 0xc4200729c0, 0xc420072e40, 0xc4201b4d50, 0xc4201ca0a0, 0x4030aaaaaaaaaaab, 0xc4201c2300, 0x2, ...)
/go/src/github.com/chrislusf/gleam/distributed/driver/scheduler/scheduler_execute_task_group.go:35 +0xc4e
github.com/chrislusf/gleam/distributed/driver.(*FlowDriver).RunFlowContext.func2(0xc4201c2600, 0x110f940, 0xc4201c2640, 0xc4200729c0, 0xc4201c2340, 0xc4201b4d50, 0xc4201ca0a0)
/go/src/github.com/chrislusf/gleam/distributed/driver/driver.go:84 +0xd2
created by github.com/chrislusf/gleam/distributed/driver.(*FlowDriver).RunFlowContext
/go/src/github.com/chrislusf/gleam/distributed/driver/driver.go:85 +0x3c3

Dynamic Source, or Multiple Pass Flow

Is it possible to modify a Source such that the next time fc.Run() is called, the new source is called and updated as well? This would give the "illusion" of state... maybe that defeats the purpose of a DAG...

What I am trying

fc.Source(...).Map(...).Reduce(...).Output(...)
fc.Run()
// ... Wait
// Update fc.Source(...) or reset the source so that Run calls the source function again?
fc.Run()

What I want to accomplish

Basically, I would like to accomplish something like this:

Run => flow.Source() -> (Some Mappers/Reducers) -> flow.Output() -> (Update flow.Source)
Run()
Run()
...
Run()
... And so on...

I understand the idea behind directed acyclic graphs, and that this kind of defeats the purpose, but maybe you could point me in the right direction. Essentially, I want to accomplish state by passing new source information based on what was collected previously.

Thank you for your help.

deploy via Kubernetes

Need some help from people with Kubernetes experience on how to deploy the Docker image and make the "gleam agent" know where the "gleam master" is.

Local Sort broken?

The code below should be equivalent to sort | uniq -c

flow.New().
	Strings([]string{"a", "a", "b", "a"}).
	Map(`function(x) return x, 1 end`).
	ReduceBy(`function(i1, i2) return i1 + i2 end`).
	Fprintf(os.Stdout, "%s %d\n").
	Run()

The expected output should be something like

a 3
b 1

but when I run it, I get:

a 2
b 1
a 1

It looks like the local sort is not working.

Below, I have implemented the example in a test:

func TestCountStringOccurances(t *testing.T) {
	var s string
	var count int
	flow.New().
		Strings([]string{"a", "a", "b", "a"}).
		Map(`function(x) return x, 1 end`).
		ReduceBy(`function(i1, i2) return i1 + i2 end`).
		SaveFirstRowTo(&s, &count).
		Run()

	assert.Equal(t, "a", s, "We expect the first row to be 'a'")
	assert.Equal(t, 3, count, "The count should be 3!")
}

Datasets with nil values are not supported

The following test reproduces the issue:

func TestLuaMapWithNil(t *testing.T) {

	testLuaScript(
		"test mapper",
		func(script Script) {
			script.Map(`
			function(x, y, z)
				return x, y, z
			end`)
		},
		func(inputWriter io.Writer) {
			// The row we write has nil on index 1:
			util.WriteRow(inputWriter, 1999, nil, "hello")
		},
		func(outputReader io.Reader) {
			row, err := util.ReadRow(outputReader)
			if err != nil {
				t.Errorf("read row error: %v", err)
				return
			}
			if row[1] != nil {
				t.Errorf("Row no longer contains nil: %+v", row)
			}
		},
	)
}

The problem is that the decodeRow function in lua.go does not support nil values: the decoded values are inserted into a table using the table.insert function, which does not support nil values. See also http://lua-users.org/wiki/StoringNilsInTables

match to CQRS and events systems

There are a lot of parallels between Glow and Gleam and CQRS systems.

I have to say this looks a lot better than Glow. I used Glow in a project that had a CQRS architecture.

Why not use some of the same libraries and techniques? Otherwise you're reinventing a huge amount of the wheel in the infrastructure layer. The core logic of the flow namespace is great, but for getting distributed computation going, why not just use NATS Streaming? It integrates discovery, storage, back pressure, etc.
https://github.com/nats-io/nats-streaming-server

Just saying...

Failed to run gleam in distributed mode

Hi,
I tried to run gleam in distributed mode. Here are my steps:

First, I started the gleam master and agents on my local computer. MessagePack.lua is ready.

// start "gleam master" on a server
> go get github.com/chrislusf/gleam/distributed/gleam
> gleam master --address=":45326"
> gleam agent --dir=2 --port 45327 --host=127.0.0.1
> gleam agent --dir=3 --port 45328 --host=127.0.0.1

Then I ran my code in distributed mode:

package main

import (
	"os"

	"github.com/chrislusf/gleam/flow"
	"github.com/chrislusf/gleam/distributed"
)
func main() {
	f :=flow.New()
	f.TextFile("/home/phuongdv/alice30.txt").FlatMap(`
	    function(line)
	        return line:gmatch("%w+")
	    end
	`).Map(`
	    function(word)
	        return word, 1
	    end
	`).ReduceBy(`
	    function(x, y)
	        return x + y
	    end
	`).Fprintf(os.Stdout, "%s,%d\n")

	// distributed mode
	f.Run(distributed.Option())
	f.Run(distributed.Option().SetMaster("master_ip:45326"))

}

And my application got stuck with this error:


no gleam.yaml found.
2017/01/20 11:28:52 localhost:45326 allocated 1 executors with 0 MB memory.
2017/01/20 11:28:52 127.0.0.1:45328 FlatMap.0-Map.0-LocalSort.0-LocalReduceBy.0> starting with 0 MB memory...
2017/01/20 11:28:52 remote execution error: 127.0.0.1:45328 FlatMap.0-Map.0-LocalSort.0-LocalReduceBy.0> no gleam.yaml found.

and the gleam agent throws this exception:

2017/01/20 11:28:16 Cancelled command /home/phuongdv/Go/gleam [/home/phuongdv/Go/gleam execute --note FlatMap.0-Map.0-LocalSort.0-LocalReduceBy.0]
2017/01/20 11:28:16 Failed to run command FlatMap.0-Map.0-LocalSort.0-LocalReduceBy.0: signal: killed

How can I solve this problem?

Thanks

I am running a very simple word count, and I would like to see the data while map is working.

I added a statement in the map processing:

func tokenize(row []interface{}) error {
	line := string(row[0].([]byte))

	fmt.Println(row[0]) // I added
	fmt.Println(line)   // I added
	for _, s := range strings.FieldsFunc(line, func(r rune) bool {
		return !('A' <= r && r <= 'Z' || 'a' <= r && r <= 'z' || '0' <= r && r <= '9')
	}) {
		gio.Emit(s)
	}
	return nil
}

When I added it, there was an error. I want to know how to print the data while Map or Reduce is working. What should I do?

Here is the error:
➜ session git:(master) ✗ ./session
Running in standalone mode.
2017/04/25 17:30:43 Failed to execute mapper [./session -gleam.mapper=m2]: input row error: Failed to read message content size 540357467, but read only 40020: unexpected EOF

Can the datasets be cached in memory?

I want to find a way to use a distributed computation system to compute over datasets cached in memory.
The datasets would be loaded only once and cached in memory.
Can Gleam help me?
Thanks very much!

Regression: unknown shorthand flag: 'g' in -gleam.mapper

I believe the last change leads to a regression.

This is what I get for a previously working scenario:

Error: unknown shorthand flag: 'g' in -gleam.mapper
Usage:
goparallel run [flags]

Flags:
-a, --args string Test arguments (default "n/a")
-b, --binary string The binary file with the test to execute (default "n/a")
-o, --output string The file the output to be saved to (default "result")
-p, --processes int How many processes to run (default 3)
-t, --test string Test to be executed (default "n/a")

FFmpeg encoding cluster !!

Hi Chris,

Hope you've been doing great. We're using one of your products, SeaweedFS, as the thumbnail servers for our video streaming website, and it has been working dramatically well. We have around 5K concurrent requests on each thumbnail server and haven't encountered any I/O bottlenecks yet. Currently we have 2 thumbnail servers and will soon add more. Thanks for the great product :-)

Getting back to the point: we have a video streaming website where users upload videos; servers encode them with ffmpeg/imagick and store them for serving over HTTP. We're currently striving to make encoding faster, since our encoding setup is very simple and each video gets encoded on a single server. What we're looking for is an encoding cluster where a single video can be encoded across multiple servers.

I was looking at different solutions, but there doesn't seem to be much available, so I thought I'd ask for your advice: can Gleam be used as an encoding cluster, or do you have a better suggestion? It would be really helpful.

Thanks a lot for all the work and effort :)

Regards.
Shahzaib

Define Map in golang like glow?

I'd like to begin using gleam (having used glow for a while now). Is there any way to write Map functions in Go? i.e., is there a way to do something like the following, so that I can continue to use Go packages? Or can I only define Maps/Reduces with Lua/shell scripts in gleam?

package main

import (
	"os"

	".../somepackage"
	"github.com/chrislusf/gleam/flow"
)

func main() {

	flow.New().TextFile("/etc/passwd").Map(func(line string) string {
		res, _ := somepackage.Transform(line)
		return res
	}).Fprintf(os.Stdout, "%s\n").Run()
}

Is there any example use case in stream processing?

For example: the Driver stays alive, keeps reading from a streaming source like Kafka, processes each data batch, and sends the results to a streaming output like Kafka.

Some questions:
1. Are there any example use cases or best practices for processing streaming data?
2. If Gleam does work for this, does one need to restart a Gleam flow for each "streaming data slice"? For a Kafka source, does it need to reconnect to the broker list each time?
3. After each batch is done, does Gleam support any method to keep some dataset for the next batch, or a mechanism for a "global context dataset" shared across different Gleam batches?
4. Is there a mechanism where a "never stop" distributed mapper can produce "streaming data slice" datasets and trigger each slice's batch to run in sequence, to avoid reconnecting to Kafka?
5. Any other ideas that would help with processing streaming data?

thanks~

Agent log crammed with CollectExecutionStatistics(_) error

Excerpt from the log file:

2017/08/22 23:13:40 &{0xc420274000}.CollectExecutionStatistics(_) = _, rpc error: code = Unavailable desc = grpc: the connection is unavailable
2017/08/22 23:13:40 &{0xc420252000}.CollectExecutionStatistics(_) = _, rpc error: code = Unavailable desc = grpc: the connection is unavailable
2017/08/22 23:13:40 &{0xc4201be1c0}.CollectExecutionStatistics(_) = _, rpc error: code = Unavailable desc = grpc: the connection is unavailable
2017/08/22 23:13:40 &{0xc4201b21c0}.CollectExecutionStatistics(_) = _, rpc error: code = Unavailable desc = grpc: the connection is unavailable
2017/08/22 23:13:40 &{0xc420130700}.CollectExecutionStatistics(_) = _, rpc error: code = Unavailable desc = grpc: the connection is unavailable
2017/08/22 23:13:40 &{0xc4201561c0}.CollectExecutionStatistics(_) = _, rpc error: code = Unavailable desc = grpc: the connection is unavailable
2017/08/22 23:13:40 &{0xc4201ba1c0}.CollectExecutionStatistics(_) = _, rpc error: code = Unavailable desc = grpc: the connection is unavailable
2017/08/22 23:13:40 &{0xc420274000}.CollectExecutionStatistics(_) = _, rpc error: code = Unavailable desc = grpc: the connection is unavailable
2017/08/22 23:13:40 in memory 5:1 finished writing f1213273487-d5-s1 0 bytes
2017/08/22 23:13:40 in memory 6:0 finished reading f1213273487-d5-s1 0 bytes
2017/08/22 23:13:40 &{0xc420388000}.CollectExecutionStatistics(_) = _, rpc error: code = Unavailable desc = grpc: the connection is unavailable
2017/08/22 23:13:40 &{0xc42009c540}.CollectExecutionStatistics(_) = _, rpc error: code = Unavailable desc = grpc: the connection is unavailable
2017/08/22 23:13:40 in memory 5:6 finished writing f1213273487-d5-s6 0 bytes
2017/08/22 23:13:40 in memory 6:0 finished reading f1213273487-d5-s6 0 bytes
2017/08/22 23:13:40 in memory 5:9 finished writing f1213273487-d5-s9 0 bytes
2017/08/22 23:13:40 in memory 6:0 finished reading f1213273487-d5-s9 0 bytes
2017/08/22 23:13:40 &{0xc4201fe380}.CollectExecutionStatistics(_) = _, rpc error: code = Unavailable desc = grpc: the connection is unavailable
2017/08/22 23:13:40 in memory 5:11 finished writing f1213273487-d5-s11 0 bytes
2017/08/22 23:13:40 in memory 6:0 finished reading f1213273487-d5-s11 0 bytes
2017/08/22 23:13:40 &{0xc420374000}.CollectExecutionStatistics(_) = _, rpc error: code = Unavailable desc = grpc: the connection is unavailable
2017/08/22 23:13:41 deleting f1213273487-d5-s6
2017/08/22 23:13:41 deleting f1213273487-d5-s11
2017/08/22 23:13:41 deleting f1213273487-d5-s1
2017/08/22 23:13:41 deleting f1213273487-d5-s9
2017/08/22 23:13:46 in memory 5:5 starts writing f1750670629-d5-s5 expected reader:1
2017/08/22 23:13:46 &{0xc42034c000}.CollectExecutionStatistics(_) = _, rpc error: code = Unavailable desc = grpc: the connection is unavailable
2017/08/22 23:13:46 in memory 5:1 starts writing f1750670629-d5-s1 expected reader:1
2017/08/22 23:13:46 &{0xc4202a6000}.CollectExecutionStatistics(_) = _, rpc error: code = Unavailable desc = grpc: the connection is unavailable
2017/08/22 23:13:46 &{0xc420130380}.CollectExecutionStatistics(_) = _, rpc error: code = Unavailable desc = grpc: the connection is unavailable
2017/08/22 23:13:46 &{0xc420230000}.CollectExecutionStatistics(_) = _, rpc error: code = Unavailable desc = grpc: the connection is unavailable
2017/08/22 23:13:46 &{0xc4201c81c0}.CollectExecutionStatistics(_) = _, rpc error: code = Unavailable desc = grpc: the connection is unavailable
2017/08/22 23:13:46 &{0xc4201ca1c0}.CollectExecutionStatistics(_) = _, rpc error: code = Unavailable desc = grpc: the connection is unavailable
2017/08/22 23:13:46 in memory 6:0 waits for f1750670629-d5-s1
2017/08/22 23:13:46 in memory 6:0 start reading f1750670629-d5-s1
2017/08/22 23:13:46 in memory 6:0 waits for f1750670629-d5-s5
2017/08/22 23:13:46 in memory 6:0 start reading f1750670629-d5-s5
2017/08/22 23:15:15 &{0xc42014e1c0}.CollectExecutionStatistics(_) = _, rpc error: code = Unavailable desc = grpc: the connection is unavailable
2017/08/22 23:15:15 &{0xc4201c8540}.CollectExecutionStatistics(_) = _, rpc error: code = Unavailable desc = grpc: the connection is unavailable
2017/08/22 23:15:15 &{0xc420278000}.CollectExecutionStatistics(_) = _, rpc error: code = Unavailable desc = grpc: the connection is unavailable
2017/08/22 23:15:15 &{0xc42023a000}.CollectExecutionStatistics(_) = _, rpc error: code = Unavailable desc = grpc: the connection is unavailable
2017/08/22 23:15:15 in memory 5:5 finished writing f1750670629-d5-s5 0 bytes
2017/08/22 23:15:15 in memory 6:0 finished reading f1750670629-d5-s5 0 bytes
2017/08/22 23:15:15 in memory 5:1 finished writing f1750670629-d5-s1 0 bytes
2017/08/22 23:15:15 in memory 6:0 finished reading f1750670629-d5-s1 0 bytes
2017/08/22 23:15:15 &{0xc420380000}.CollectExecutionStatistics(_) = _, rpc error: code = Unavailable desc = grpc: the connection is unavailable
2017/08/22 23:15:15 &{0xc42009c1c0}.CollectExecutionStatistics(_) = _, rpc error: code = Unavailable desc = grpc: the connection is unavailable

My application gives unexpected results when running in distributed mode with 4 agent nodes. If I run it with a local master and agent, everything seems fine. Not sure if this sort of error is related:
2017/08/22 23:15:15 &{0xc42014e1c0}.CollectExecutionStatistics(_) = _, rpc error: code = Unavailable desc = grpc: the connection is unavailable

Failed to read stdout: read |0: file already closed

I've updated to the latest version of gleam with the cleanup code added, but now I always get one less item in my reducer. I'm not sure it is related to the cleanup yet, but I see something suspicious in the log:

2017/09/02 23:53:42 sendExecutionRequest stream receive: rpc error: code = Unknown desc = Failed to read stdout: read |0: file already closed


Streaming using Gleam

Hi, I would like to know if Gleam supports stream processing through

  1. raw sockets
  2. kafka
  3. aws kinesis

If it is possible, is there an example of how to do it? I am exploring Go alternatives to Spark Streaming.

Help wanted?

Is there a separate help-wanted list maintained elsewhere? I see only one item, for the Kubernetes deployment. Anything else? Thanks

Go deserialization of map[string] results in panic

Seems like there is an issue with how map types are deserialized. The issue can be reproduced with the following code:

package main

import (
	"os"

	"github.com/chrislusf/gleam/flow"
	"github.com/chrislusf/gleam/gio"
)

var (
	ToDict = gio.RegisterMapper(toDict)
)

func toDict(row []interface{}) error {
	gio.Emit(map[string]interface{}{"Hello": row})
	return nil
}
func main() {
	gio.Init()
	funky1()
}

// This test indicates that deserialisation of map[string] is broken
func funky1() {
	flow.New().
		Strings([]string{"World"}).
		Mapper(ToDict).
		Map("function(row) return row end"). // If line is commented out everything works
		Fprintf(os.Stdout, "=> %s\n").
		Run()
}

// This test indicates that the issue is not with Lua code
func funky2() {
	flow.New().
		Strings([]string{"World"}).
		Mapper(ToDict).
		GroupBy(flow.Field(1)). // If line is commented out everything works
		Fprintf(os.Stdout, "=> %s\n").
		Run()
}

// This test shows that the issue is not related to Go serialization
func funky3() {
	flow.New().
		Strings([]string{"World"}).
		Map(`
		function(row)
			d = {}
			d["foo"] = 42
			return d
		end`).
		Fprintf(os.Stdout, "=> %s\n").
		Run()
}

When I run it, I get the following panic:

panic: runtime error: hash of unhashable type []uint8

goroutine 39 [running]:
panic(0x85c340, 0xc420342060)
	/usr/local/Cellar/go/1.7.5/libexec/src/runtime/panic.go:500 +0x1a1
code.uber.internal/infra/thecarddivision/vendor/gopkg.in/vmihailenco/msgpack%2ev2.decodeMap(0xc420348000, 0xc420294480, 0x810000c42006cb60, 0x0, 0xc42021b360)
	/Users/jrb/gocode/src/code.uber.internal/infra/thecarddivision/vendor/gopkg.in/vmihailenco/msgpack.v2/decode_map.go:69 +0x15d
code.uber.internal/infra/thecarddivision/vendor/gopkg.in/vmihailenco/msgpack%2ev2.(*Decoder).DecodeMap(0xc420348000, 0x0, 0x0, 0x0, 0xc4ffffffff)
	/Users/jrb/gocode/src/code.uber.internal/infra/thecarddivision/vendor/gopkg.in/vmihailenco/msgpack.v2/decode_map.go:179 +0x2e
code.uber.internal/infra/thecarddivision/vendor/gopkg.in/vmihailenco/msgpack%2ev2.(*Decoder).DecodeInterface(0xc420348000, 0x3b0000000032feb5, 0x8, 0xc42001cea0, 0xe18b60)
	/Users/jrb/gocode/src/code.uber.internal/infra/thecarddivision/vendor/gopkg.in/vmihailenco/msgpack.v2/decode.go:268 +0x21b
code.uber.internal/infra/thecarddivision/vendor/gopkg.in/vmihailenco/msgpack%2ev2.(*Decoder).interfaceValue(0xc420348000, 0x83c060, 0xc420342020, 0x194, 0xdf53a0, 0x83c060)
	/Users/jrb/gocode/src/code.uber.internal/infra/thecarddivision/vendor/gopkg.in/vmihailenco/msgpack.v2/decode.go:228 +0x2f
code.uber.internal/infra/thecarddivision/vendor/gopkg.in/vmihailenco/msgpack%2ev2.decodeInterfaceValue(0xc420348000, 0x83c060, 0xc420342020, 0x194, 0x83c060, 0xc420342020)
	/Users/jrb/gocode/src/code.uber.internal/infra/thecarddivision/vendor/gopkg.in/vmihailenco/msgpack.v2/decode_value.go:188 +0x78
code.uber.internal/infra/thecarddivision/vendor/gopkg.in/vmihailenco/msgpack%2ev2.(*Decoder).DecodeValue(0xc420348000, 0x83c060, 0xc420342020, 0x194, 0xc420342020, 0x194)
	/Users/jrb/gocode/src/code.uber.internal/infra/thecarddivision/vendor/gopkg.in/vmihailenco/msgpack.v2/decode.go:195 +0x8c
code.uber.internal/infra/thecarddivision/vendor/gopkg.in/vmihailenco/msgpack%2ev2.(*Decoder).decode(0xc420348000, 0x7d8380, 0xc420342020, 0x10, 0x83c060)
	/Users/jrb/gocode/src/code.uber.internal/infra/thecarddivision/vendor/gopkg.in/vmihailenco/msgpack.v2/decode.go:190 +0x1c7
code.uber.internal/infra/thecarddivision/vendor/gopkg.in/vmihailenco/msgpack%2ev2.(*Decoder).Decode(0xc420348000, 0xc420051bb8, 0x1, 0x1, 0x0, 0x40)
	/Users/jrb/gocode/src/code.uber.internal/infra/thecarddivision/vendor/gopkg.in/vmihailenco/msgpack.v2/decode.go:72 +0x5b
code.uber.internal/infra/thecarddivision/vendor/github.com/chrislusf/gleam/util.DecodeRow(0xc420342010, 0x10, 0x10, 0xc4202133e8, 0x106f4b0, 0xc42023fd10, 0x3fe0b, 0x1000)
	/Users/jrb/gocode/src/code.uber.internal/infra/thecarddivision/vendor/github.com/chrislusf/gleam/util/row_read_write.go:54 +0x27a
code.uber.internal/infra/thecarddivision/vendor/github.com/chrislusf/gleam/util.Fprintf.func1(0xc420342010, 0x10, 0x10, 0x10, 0x10)
	/Users/jrb/gocode/src/code.uber.internal/infra/thecarddivision/vendor/github.com/chrislusf/gleam/util/printf.go:32 +0x7e
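The panic itself is plain Go behavior, independent of gleam or msgpack: []byte is not a comparable type, so using it as a key in a map with interface{} keys (which is what a generic map decoder does) panics at runtime. A minimal stdlib sketch:

```go
package main

import "fmt"

// tryHash uses v as a key in a map with interface{} keys, mirroring
// what a generic decoder does for map keys, and reports any panic.
func tryHash(v interface{}) (panicMsg string) {
	defer func() {
		if r := recover(); r != nil {
			panicMsg = fmt.Sprint(r)
		}
	}()
	m := map[interface{}]int{}
	m[v] = 1
	return ""
}

func main() {
	fmt.Println(tryHash("Hello"))         // a string key hashes fine: prints an empty line
	fmt.Println(tryHash([]byte("Hello"))) // runtime error: hash of unhashable type []uint8
}
```

This suggests the decoder is producing []uint8 instead of string for the map key before hashing it.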

RFC: removing Lua support?

How about we remove Lua support from Gleam?

This is because of:

  1. Complexity of installing Gleam. Although installation is fairly easy, installing Lua and the MessagePack library are still extra steps.

  2. Complexity of supporting Lua. To support a scripting language, I need to support serialization and deserialization between MsgPack and Lua for in-memory data rows, metadata, etc. This makes adding new features, such as windowing and watermarks, more complicated.

  3. Lua has several flavors, and it is impossible to support and test all of them. Support is currently limited to LuaJIT only. The pure-Go Lua interpreters do not work due to subtle details, and they are yet another flavor of Lua.

  4. A nil inside a Lua array causes truncated rows.

  5. Lua treats both []byte and string as string, losing type information during SerDe.

If Lua is removed:

  1. Conceptually much simpler to add new Gleam features. For an open source project, this is very important for moving the project forward.

  2. Conceptually much simpler to deploy.

  3. No need for Go developers to learn Lua. Lua is simple, but its hidden pitfalls are not obvious to beginners.

  4. One way to write Gleam code. Multiple ways to do one thing is not the Go style.
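Point 5 in the list of problems is easy to see from the Go side: a pure-Go pipeline keeps string and []byte distinct through an ordinary type switch, while a Lua round trip returns both as string. A minimal sketch of the distinction Go preserves:

```go
package main

import "fmt"

// kind reports the dynamic type of a decoded value. A pure-Go pipeline
// keeps string and []byte distinct; Lua's SerDe collapses both to string.
func kind(v interface{}) string {
	switch v.(type) {
	case string:
		return "string"
	case []byte:
		return "[]byte"
	default:
		return "other"
	}
}

func main() {
	fmt.Println(kind("abc"))         // string
	fmt.Println(kind([]byte("abc"))) // []byte
}
```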

Reduce does not seem to work with nil

The following test reproduces the issue:

func TestLuaReduceByWithNil(t *testing.T) {
	testLuaScript(
		"test ReduceBy with nil",
		func(script Script) {
			script.ReduceBy(`
				function(x, y, a, b)
					return a, b
				end
			`, []int{1})
		},
		func(inputWriter io.Writer) {
			util.WriteRow(inputWriter, "key1", 100, nil)
			util.WriteRow(inputWriter, "key1", 101, 3)
		},
		func(outputReader io.Reader) {
			row, _ := util.ReadRow(outputReader)
			t.Logf("row1: %+v", row)
			if !(row[1].(uint64) == 101 && row[2].(uint64) == 3) {
				t.Errorf("failed ReduceBy results: [%s %d %d]", row...)
			}
		},
	)
}

If the nil is replaced with a number, the test passes.

runner to executor grpc connection issue

I quite frequently get the following kind of error messages in one environment but not in another:

runner heartbeat to [::]:16198: runner => executor [::]:16198: rpc error: code = Unavailable desc = grpc: the connection is unavailable
...
runner reportStatus to [::]:44223: runner => executor [::]:44223: rpc error: code = Unavailable desc = grpc: the connection is unavailable

As I looked into the code, I found that the grpc connection is closed within 50 ms, without knowing whether the function has finished its job with the grpc connection. I presume this could be the cause. Would it be safer to call grpcConection.Close() only after fn(client) has returned?

gio/runner_grpc_client_to_executor.go:

func withClient(server string, fn func(client pb.GleamExecutorClient) error) error {
	if server == "" {
		return nil
	}

	grpcConection, err := grpc.Dial(server, grpc.WithInsecure())
	if err != nil {
		return fmt.Errorf("executor dial agent: %v", err)
	}
	defer func() {
		time.Sleep(50 * time.Millisecond)
		grpcConection.Close()                  // The connection could possibly be closed prematurely
	}()
	client := pb.NewGleamExecutorClient(grpcConection)

	return fn(client)
}
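For reference, a deferred Close already runs only after fn returns; the question is really about the extra 50 ms timer. A stand-in sketch of the ordering (using a hypothetical `closer` type instead of a real *grpc.ClientConn, so this is not gleam's actual code):

```go
package main

import "fmt"

// closer stands in for *grpc.ClientConn; only Close matters here.
type closer struct{ closed *bool }

func (c closer) Close() error { *c.closed = true; return nil }

// withClient mirrors the shape of the function above, without the sleep:
// the deferred Close runs only after fn has returned.
func withClient(conn closer, fn func() error) error {
	defer conn.Close()
	return fn()
}

func main() {
	closed := false
	withClient(closer{&closed}, func() error {
		fmt.Println("closed inside fn:", closed) // false: still open while fn runs
		return nil
	})
	fmt.Println("closed after withClient:", closed) // true
}
```

If fn spawns goroutines that keep using the connection after it returns, neither the defer nor the 50 ms sleep is a reliable guarantee; the work would need to be awaited explicitly.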

sh: ./main: No such file or directory

I am trying to run a very simple flow and I get sh: ./main: No such file or directory. It seems that the executable of the running program cannot be found.

Parallel Execution compile error

I followed the example in the Parallel Execution documentation and it did not work: there is no such method as flow.New().Lines. Here is my code:

// word_count.go
package main

import (
    "log"
    "os"
    "path/filepath"

    "github.com/chrislusf/gleam/flow"
)

func main() {

    fileNames, err := filepath.Glob("~/txt/en/demo-*.txt")
    if err != nil {
        log.Fatal(err)
    }

    flow.New().Lines(fileNames).Partition(3).PipeAsArgs("cat $1").FlatMap(`
      function(line)
        return line:gmatch("%w+")
      end
    `).Map(`
      function(word)
        return word, 1
      end
    `).ReduceBy(`
      function(x, y)
        return x + y
      end
    `).Fprintf(os.Stdout, "%s\t%d").Run()

}

Many thanks

"Auto Failover" feature

If one or more of the agent servers go down during processing, will the remaining functioning agents pick up the unfinished work?

the demands about "Read Parquet files".

Hey chrislusf:
I talked with xitongsys, the owner of parquet-go, about gleam. He would like to help with "Read Parquet files" and is rewriting the library now. He wants to know the requirements from the gleam side; could you list some here?

about gleam's RPC framework

Hi, Chris:

Did you consider adopting gRPC for gleam? Gleam currently uses raw protobuf with self-maintained dispatching/marshaling logic. I feel that gRPC (HTTP/2 + protobuf + RPC framework) would provide evident benefits for gleam.

gRPC is a mature technology; my current project has been using it for more than a year.

Thanks!

Improve test coverage.

I would like to learn more about the project by getting started with contributing to the tests to improve the coverage. I believe that would be a productive way to understand the project.

Reduce seems broken

In the code snippet below, the input is a UUID, a ColumnKey, and a version number. I use the reduce function to remove all old versions of (UUID, ColumnKey) rows.

The expected result is:

=> 10000000-c825-4d20-9b26-000000000128 FARE 8010
=> 10000000-c825-4d20-9b26-000000000178 BASE 6254

The actual result (on 02c6bef) is:

=> 10000000-c825-4d20-9b26-000000000128 FARE 8010
=> 10000000-c825-4d20-9b26-000000000178 BASE 6254
=> 10000000-c825-4d20-9b26-000000000128 FARE 585

Here is the code:

package main

import (
	"os"

	"github.com/chrislusf/gleam/flow"
	"github.com/chrislusf/gleam/gio"
)

var (
	Largest = gio.RegisterReducer(largest)
)

func main() {
	gio.Init()
	flow.New().
		Strings([]string{
			"10000000-c825-4d20-9b26-000000000128 FARE 585",
			"10000000-c825-4d20-9b26-000000000178 BASE 6254",
			"10000000-c825-4d20-9b26-000000000128 FARE 8010",
		}).
		Map(`
		function(line) -- Function transforms space separated words string to row
			row = {}
			for elm in string.gmatch(line, "%S+") do
				table.insert(row, elm)
			end
			row[3] = tonumber(row[3])
			return unpack(row)
		end
		`).
		ReducerBy(Largest, flow.Field(1, 2)).
		// ReduceBy(`  -- This function does the same as Largest - with the same result
		// function (n1, n2)
		// 	if n1 > n2 then
		// 		return n1
		// 	end
		// 	return n2
		// end`, flow.Field(1, 2)).
		Fprintf(os.Stdout, "=> %s %s %d\n").
		Run()
}

// Returns the row that has the largest number
func largest(r1, r2 interface{}) (interface{}, error) {
	n1 := r1.(uint64)
	n2 := r2.(uint64)
	if n1 > n2 {
		return r1, nil
	}
	return r2, nil
}
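For comparison, here is a plain-Go reference of the reduction the flow is expected to perform (keep the largest version per (UUID, ColumnKey) key), which yields exactly the two expected rows. The names here are illustrative, not gleam API:

```go
package main

import (
	"fmt"
	"sort"
	"strconv"
	"strings"
)

// reduceLargest keeps the largest version per "uuid column" key,
// the result ReduceBy with the Largest reducer is expected to produce.
func reduceLargest(lines []string) []string {
	best := map[string]int{}
	for _, line := range lines {
		parts := strings.Fields(line) // [uuid, column, version]
		key := parts[0] + " " + parts[1]
		v, _ := strconv.Atoi(parts[2])
		if v > best[key] {
			best[key] = v
		}
	}
	out := make([]string, 0, len(best))
	for k, v := range best {
		out = append(out, fmt.Sprintf("%s %d", k, v))
	}
	sort.Strings(out)
	return out
}

func main() {
	for _, r := range reduceLargest([]string{
		"10000000-c825-4d20-9b26-000000000128 FARE 585",
		"10000000-c825-4d20-9b26-000000000178 BASE 6254",
		"10000000-c825-4d20-9b26-000000000128 FARE 8010",
	}) {
		fmt.Println(r) // two rows, the stale FARE 585 dropped
	}
}
```

The actual output above has three rows, so the stale (key, 585) row survives the keyed reduction instead of being folded into (key, 8010).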
