
ekuiper's Introduction

LF Edge eKuiper - An edge lightweight IoT data analytics software


Overview

LF Edge eKuiper is a lightweight IoT data analytics and stream processing engine that runs on resource-constrained edge devices. Its major goal is to provide a streaming software framework (similar to Apache Flink) on the edge side. eKuiper's rule engine lets users define either SQL-based or graph-based (similar to Node-RED) rules to create IoT edge analytics applications within a few minutes.


User scenarios

eKuiper can run in various IoT edge scenarios, such as:

  • Real-time processing of production line data in the IIoT
  • Analysis of CAN bus data on connected-vehicle gateways in the IoV
  • Real-time analysis of wind turbines and smart bulk energy storage data in smart energy

eKuiper processing at the edge can greatly reduce system response latency, save network bandwidth and storage costs and improve system security.

Features

  • Lightweight

    • The core server package is only about 4.5 MB, and the memory footprint is about 10 MB
  • Cross-platform

    • CPU architectures: x86 and AMD (32/64-bit), ARM (32/64-bit), PPC
    • Popular Linux distributions, OpenWrt Linux, macOS and Docker
    • Industrial PC, Raspberry Pi, industrial gateway, home gateway, MEC edge cloud server
  • Data analysis support

    • Support data ETL
    • Ordering, grouping, aggregation, and joins across different data sources (data from databases and files)
    • 60+ functions, including mathematical, string, aggregate, and hash functions
    • 4 time windows and a count window
  • Highly extensible

    Sources, functions, and sinks can be extended with Go or Python.

    • Source: allows users to add more data sources for analytics.
    • Sink: allows users to send analysis results to different customized systems.
    • UDF functions: allow users to add customized functions for data analysis (for example, AI/ML function invocation)
  • Management

  • Integration with EMQX products

    Seamless integration with EMQX, Neuron, and NanoMQ, providing an end-to-end solution for IIoT and IoV

Quick start

Community

Join our Slack, and then join the ekuiper or ekuiper-user channel.

Meeting

Subscribe to the community events calendar.

The weekly community meeting takes place on Fridays at 10:30 AM GMT+8.

Contributing

Thank you for your contribution! Please refer to CONTRIBUTING.md for more information.

Performance test result

MQTT throughput test

  • The JMeter MQTT plugin is used to send IoT data to the EMQX broker, such as {"temperature": 10, "humidity" : 90}; the temperature and humidity values are random integers between 0 and 100.
  • eKuiper subscribes to the EMQX broker and analyzes the data with SQL: SELECT * FROM demo WHERE temperature > 50
  • The analysis results are written to a local file by using the file sink plugin.

Devices                                     Messages per second   CPU usage       Memory usage
Raspberry Pi 3B+                            12k                   sys+user: 70%   20 MB
AWS t2.micro (1 core * 1 GB), Ubuntu 18.04  10k                   sys+user: 25%   20 MB

EdgeX throughput test

  • A Go application sends data to the ZeroMQ message bus. The data looks like the following.

    {
      "Device": "demo", "Created": 000, …
      "readings": 
      [
         {"Name": "Temperature", value: "30", "Created":123 …},
         {"Name": "Humidity", value: "20", "Created":456 …}
      ]
    }
    
  • eKuiper subscribes to the EdgeX ZeroMQ message bus and analyzes the data with SQL: SELECT * FROM demo WHERE temperature > 50. 90% of the data is filtered out by the rule.

  • The analysis results are sent to the nop sink, so all result data is discarded.

Devices                                     Messages per second   CPU usage       Memory usage
AWS t2.micro (1 core * 1 GB), Ubuntu 18.04  11.4k                 sys+user: 75%   32 MB

Max number of rules support

  • 8000 rules with 800 messages/second in total
  • Configurations
    • 2 core * 4GB memory in AWS
    • Ubuntu
  • Resource usage
    • Memory: 89% ~ 72%
    • CPU: 25%
    • 400KB - 500KB / rule
  • Rule
    • Source: MQTT
    • SQL: SELECT temperature FROM source WHERE temperature > 20 (90% of the data is filtered out)
    • Sink: Log

Multiple rules with shared source instance

  • 300 rules with a shared MQTT stream instance.
    • 500 messages/second in the MQTT source
    • 150,000 messages processed per second in total
  • Configurations:
    • 2 Core * 2GB memory in AWS
    • Ubuntu
  • Resource usage
    • Memory: 95MB
    • CPU: 50%
  • Rule
    • Source: MQTT
    • SQL: SELECT temperature FROM source WHERE temperature > 20 (90% of the data is filtered out)
    • Sink: 90% nop and 10% MQTT

To run the benchmark yourself, please check the instructions.

Documents

Check out the latest documentation on the official website.

Build from source

Preparation

  • Go version >= 1.22

Compile

  • Binary:

    • Binary: $ make

    • Binary files that support EdgeX: $ make build_with_edgex

    • Minimal binary file with core runtime only: $ make build_core

  • Packages:

    • Packages: $ make pkg

    • Package files that support EdgeX: $ make pkg_with_edgex

  • Docker images: $ make docker

    Docker images support EdgeX by default

Prebuilt binaries are provided in the release assets. If your OS or architecture has no prebuilt binary, please use cross-compilation; refer to this doc.

During compilation, features can be selected through Go build tags so that users can build a customized product with only the desired feature set to reduce binary size. This is critical when the target deployment environment is resource-constrained. Please refer to features for more detail.

Open source license

Apache 2.0

ekuiper's People

Contributors

carlclone, dependabot[bot], emqmyd, hantmac, huskar-t, jerempy, jinfahua, jrtitus, l-607, lenalenapan, monicaisher, ngjaying, oucb, red-asuka, retoool, rnhx, rory-z, rui-gan, rwadowski, soyoo, superrxan, swilder-m, wangxye, wivwiv, xjasonlyu, xuyukeviki, yai-dev, yisaer, yongxingma, ysfscream


ekuiper's Issues

Sink cache concurrent access problem with large-scale, long-running rules

While testing large-scale rules, with 8000 rules running and each rule sending 1 message every 10 seconds for about 2 hours, Kuiper panicked during a concurrent sink cache save with the following trace:

goroutine 53 [running]:
runtime.throw(0xd4b8a4, 0x21)
/usr/local/go/src/runtime/panic.go:774 +0x72 fp=0xc0b33c3388 sp=0xc0b33c3358 pc=0x6af7d2
runtime.mapaccess2(0xc70b80, 0xc02e178cf0, 0xc0a28babc0, 0xc0a28babc0, 0xc0b33c3470)
/usr/local/go/src/runtime/map.go:470 +0x278 fp=0xc0b33c33d0 sp=0xc0b33c3388 pc=0x68ef48
reflect.mapaccess(0xc70b80, 0xc02e178cf0, 0xc0a28babc0, 0xd4378c)
/usr/local/go/src/runtime/map.go:1319 +0x3f fp=0xc0b33c3408 sp=0xc0b33c33d0 pc=0x69125f
reflect.Value.MapIndex(0xc70b80, 0xc005fe3130, 0x195, 0xc4e5e0, 0xc0a28babc0, 0x82, 0x7fe8d0b54fff, 0x400, 0x7fe8d0a85d00)
/usr/local/go/src/reflect/value.go:1176 +0x16d fp=0xc0b33c3480 sp=0xc0b33c3408 pc=0x7132cd
encoding/gob.(*Encoder).encodeMap(0xc0b0ed4be0, 0xc0bd8a2cc0, 0xc70b80, 0xc005fe3130, 0x195, 0xd65260, 0xd65268, 0x0, 0x0)
/usr/local/go/src/encoding/gob/encode.go:375 +0x1e8 fp=0xc0b33c3520 sp=0xc0b33c3480 pc=0x9d6e18
encoding/gob.encOpFor.func3(0xc00016a0c0, 0xc093721bc0, 0xc70b80, 0xc005fe3130, 0x195)
/usr/local/go/src/encoding/gob/encode.go:571 +0xa7 fp=0xc0b33c35a0 sp=0xc0b33c3520 pc=0x9e1d87
encoding/gob.(*Encoder).encodeStruct(0xc0b0ed4be0, 0xc0bd8a2cc0, 0xc049e5f320, 0xcb1aa0, 0xc005fe3130, 0x199)
/usr/local/go/src/encoding/gob/encode.go:328 +0x268 fp=0xc0b33c3678 sp=0xc0b33c35a0 pc=0x9d6508
encoding/gob.(*Encoder).encode(0xc0b0ed4be0, 0xc0bd8a2cc0, 0xca84a0, 0xc005fe3130, 0x16, 0xc000145e80)
/usr/local/go/src/encoding/gob/encode.go:701 +0x16f fp=0xc0b33c3720 sp=0xc0b33c3678 pc=0x9da09f
encoding/gob.(*Encoder).encodeInterface(0xc0b0ed4be0, 0xc0b0ed4c18, 0xc76040, 0xc0b6620400, 0x94)
/usr/local/go/src/encoding/gob/encode.go:419 +0x537 fp=0xc0b33c3860 sp=0xc0b33c3720 pc=0x9d7497
encoding/gob.encOpFor.func5(0xc000154540, 0xc093721b40, 0xc76040, 0xc0b6620400, 0x94)
/usr/local/go/src/encoding/gob/encode.go:589 +0x6a fp=0xc0b33c38a0 sp=0xc0b33c3860 pc=0x9e209a
encoding/gob.(*Encoder).encodeStruct(0xc0b0ed4be0, 0xc0b0ed4c18, 0xc0000974e0, 0xcbec00, 0xc0b6620400, 0x99)
/usr/local/go/src/encoding/gob/encode.go:328 +0x268 fp=0xc0b33c3978 sp=0xc0b33c38a0 pc=0x9d6508
encoding/gob.encOpFor.func4(0x0, 0xc093721b00, 0xcbec00, 0xc0b6620400, 0x99)
/usr/local/go/src/encoding/gob/encode.go:581 +0xac fp=0xc0b33c39d0 sp=0xc0b33c3978 pc=0x9e1fac
encoding/gob.encodeReflectValue(0xc093721b00, 0xcbec00, 0xc0b6620400, 0x99, 0xc000130590, 0x0)
/usr/local/go/src/encoding/gob/encode.go:363 +0xcf fp=0xc0b33c3a50 sp=0xc0b33c39d0 pc=0x9d6b5f
encoding/gob.(*Encoder).encodeMap(0xc0b0ed4be0, 0xc0b0ed4c18, 0xc70a00, 0xc000145e08, 0x195, 0xd65280, 0xc000130590, 0x0, 0x0)
/usr/local/go/src/encoding/gob/encode.go:375 +0x22e fp=0xc0b33c3af0 sp=0xc0b33c3a50 pc=0x9d6e5e
encoding/gob.encOpFor.func3(0xc0000fddd0, 0xc093721ac0, 0xc70a00, 0xc000145e08, 0x195)
/usr/local/go/src/encoding/gob/encode.go:571 +0xa7 fp=0xc0b33c3b70 sp=0xc0b33c3af0 pc=0x9e1d87
encoding/gob.(*Encoder).encodeSingle(0xc0b0ed4be0, 0xc0b0ed4c18, 0xc0000974c0, 0xc70a00, 0xc000145e08, 0x195)
/usr/local/go/src/encoding/gob/encode.go:301 +0x1c5 fp=0xc0b33c3c08 sp=0xc0b33c3b70 pc=0x9d61b5
encoding/gob.(*Encoder).encode(0xc0b0ed4be0, 0xc0b0ed4c18, 0xc2e820, 0xc000145e08, 0x16, 0xc000144180)
/usr/local/go/src/encoding/gob/encode.go:703 +0x1b7 fp=0xc0b33c3cb0 sp=0xc0b33c3c08 pc=0x9da0e7
encoding/gob.(*Encoder).EncodeValue(0xc0b0ed4be0, 0xc2e820, 0xc000145e08, 0x16, 0x0, 0x0)
/usr/local/go/src/encoding/gob/encoder.go:251 +0x3ce fp=0xc0b33c3d88 sp=0xc0b33c3cb0 pc=0x9db77e
encoding/gob.(*Encoder).Encode(0xc0b0ed4be0, 0xc2e820, 0xc000145e08, 0xc78740, 0xc0b659bec0)
/usr/local/go/src/encoding/gob/encoder.go:176 +0xa4 fp=0xc0b33c3dd8 sp=0xc0b33c3d88 pc=0x9db194
github.com/patrickmn/go-cache.(*cache).Save(0xc000145e00, 0xe0bc00, 0xc0b1be4f48, 0x0, 0x0)
/home/ubuntu/go/pkg/mod/github.com/patrickmn/[email protected]+incompatible/cache.go:975 +0x1aa fp=0xc0b33c3ee8 sp=0xc0b33c3dd8 pc=0x9ecf9a
github.com/patrickmn/go-cache.(*cache).SaveFile(0xc000145e00, 0xc0000790e0, 0x5c, 0x0, 0x0)
/home/ubuntu/go/pkg/mod/github.com/patrickmn/[email protected]+incompatible/cache.go:989 +0x86 fp=0xc0b33c3f48 sp=0xc0b33c3ee8 pc=0x9ed0b6
github.com/emqx/kuiper/common.(*SimpleKVStore).run(0xc000145e40)
/home/ubuntu/kuiper/common/kv.go:111 +0xe2 fp=0xc0b33c3fd8 sp=0xc0b33c3f48 pc=0xa21272
runtime.goexit()
/usr/local/go/src/runtime/asm_amd64.s:1357 +0x1 fp=0xc0b33c3fe0 sp=0xc0b33c3fd8 pc=0x6dccb1
created by github.com/emqx/kuiper/common.NewSimpleKVStore
/home/ubuntu/kuiper/common/kv.go:74 +0xd3
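
The trace ends in runtime.throw inside a map access made by the gob encoder, which is what Go does when it detects a map being read while another goroutine writes to it. One general way to avoid this class of failure is to encode a private copy of the data taken under a lock. The sketch below illustrates that idea only; safeStore and its methods are hypothetical names and this is not Kuiper's SimpleKVStore or the fix the project adopted.

```go
package main

import (
	"bytes"
	"encoding/gob"
	"fmt"
	"sync"
)

// safeStore is a hypothetical stand-in for a key-value store whose contents
// are periodically persisted. It is not Kuiper's SimpleKVStore.
type safeStore struct {
	mu   sync.RWMutex
	data map[string]int
}

func newSafeStore() *safeStore {
	return &safeStore{data: make(map[string]int)}
}

// Set writes one entry while holding the write lock.
func (s *safeStore) Set(key string, value int) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.data[key] = value
}

// snapshot copies the map under the read lock so that the encoder never
// touches a map that other goroutines may still be writing to.
func (s *safeStore) snapshot() map[string]int {
	s.mu.RLock()
	defer s.mu.RUnlock()
	cp := make(map[string]int, len(s.data))
	for k, v := range s.data {
		cp[k] = v
	}
	return cp
}

// Save gob-encodes a private copy of the data.
func (s *safeStore) Save() ([]byte, error) {
	var buf bytes.Buffer
	if err := gob.NewEncoder(&buf).Encode(s.snapshot()); err != nil {
		return nil, err
	}
	return buf.Bytes(), nil
}

// load decodes bytes produced by Save.
func load(b []byte) (map[string]int, error) {
	var m map[string]int
	err := gob.NewDecoder(bytes.NewReader(b)).Decode(&m)
	return m, err
}

func main() {
	s := newSafeStore()
	var wg sync.WaitGroup
	// Eight writers update the store while also triggering saves.
	for i := 0; i < 8; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			for j := 0; j < 1000; j++ {
				s.Set(fmt.Sprintf("rule%d", id), j)
				if j%100 == 0 {
					if _, err := s.Save(); err != nil {
						panic(err)
					}
				}
			}
		}(i)
	}
	wg.Wait()
	b, err := s.Save()
	if err != nil {
		panic(err)
	}
	m, err := load(b)
	if err != nil {
		panic(err)
	}
	fmt.Println(len(m), m["rule0"])
}
```

Copying costs memory proportional to the store size on every save, so a real implementation might prefer to hold the lock for the duration of the encode instead; which trade-off fits depends on how large the cache grows.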

Several issues of rule metrics

  • Remove "kuiper_" from all metric names, since all of them are Kuiper metrics.
  • Can metrics be printed in the order in which they are processed in the topology, such as source > pre-processor > operators > sinks?
  • Change the order within a given metric?
    • in_total, out_total, exceptions_total, latency_ms, last_invocation
  • Change the time format
    • "kuiper_op_filter_0_last_invocation": "0001-01-01 00:00:00 +0000 UTC". If no messages have been processed, can we just set the value to an empty string?
    • "kuiper_op_filter_0_last_invocation": "2019-12-25 13:36:50.656026 +0800 CST m=+2030.910846243". I would suggest removing +0800 CST m=+2030.910846243 and keeping just 2019-12-25 13:36:50.656026.

Build Linux package problem

GOOS=linux GOARCH=arm make pkg
                       Ultimate Packer for eXecutables
                          Copyright (C) 1996 - 2018
UPX 3.95        Markus Oberhumer, Laszlo Molnar & John Reiser   Aug 26th 2018

        File size         Ratio      Format      Name
   --------------------   ------   -----------   -----------
   7151768 ->   2870752   40.14%    linux/arm    cli

Packed 1 file.
                       Ultimate Packer for eXecutables
                          Copyright (C) 1996 - 2018
UPX 3.95        Markus Oberhumer, Laszlo Molnar & John Reiser   Aug 26th 2018

        File size         Ratio      Format      Name
   --------------------   ------   -----------   -----------
   8663112 ->   3474404   40.11%    linux/arm    server

Packed 1 file.
Build successfully
mv: rename engine.zip to ../_packages/engine.zip: No such file or directory
mv: rename engine.tar.gz to ../_packages/engine.tar.gz: No such file or directory
make: *** [pkg] Error 1

An automatic approach to build the kuiper releases

An automated approach is needed to create the following Kuiper releases.

engine_x.x.x.darwin-amd64.tar.gz - For Mac
engine_x.x.x.darwin-amd64.zip - For Mac

engine_x.x.x.linux-amd64.tar.gz - For Linux 64
engine_x.x.x.linux-amd64.zip - For Linux 64

engine_x.x.x.linux-386.tar.gz - For Linux 32
engine_x.x.x.linux-386.zip - For Linux 32
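
As a small illustration of the naming scheme, the list above can be generated from an OS/architecture matrix. This is only a sketch of the names; the target type and artifactNames function are hypothetical, and the actual build and packaging steps are not shown.

```go
package main

import "fmt"

// target is one OS/architecture pair from the release list.
type target struct{ goos, goarch string }

// artifactNames returns the archive names for a version, following the
// engine_<version>.<os>-<arch>.<ext> pattern used in the list.
func artifactNames(version string) []string {
	targets := []target{{"darwin", "amd64"}, {"linux", "amd64"}, {"linux", "386"}}
	exts := []string{"tar.gz", "zip"}
	var names []string
	for _, t := range targets {
		for _, ext := range exts {
			names = append(names, fmt.Sprintf("engine_%s.%s-%s.%s", version, t.goos, t.goarch, ext))
		}
	}
	return names
}

func main() {
	for _, n := range artifactNames("x.x.x") {
		fmt.Println(n)
	}
}
```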

Add a web sink

A web sink for sending the result data to a remote address.

Change EdgeX plugins as internal shipped sources

To support Windows developers and runtime environments, move the EdgeX plugins into the internally shipped sources. Several questions need to be figured out.

  • Will the new approach increase the binary size too much?
  • Will it add extra setup steps for users who do not use EdgeX?
  • Others?

The format of CLI & API services

  • All results returned by the REST API should be in JSON format
  • Keep the original output of the CLI

Below is an example.

Use GET http://127.0.0.1:8080/rules/rule1/status to get the rule status; the response is not pure JSON.

The "Running with metrics:" line should be removed.

Running with metrics:
{
  "source_demo_0_records_in_total": 0,
  "source_demo_0_records_out_total": 0,
  "source_demo_0_exceptions_total": 0,
  "source_demo_0_process_latency_ms": 0,
  "source_demo_0_buffer_length": 0,
  "source_demo_0_last_invocation": 0,
  "op_preprocessor_demo_0_records_in_total": 0,
  "op_preprocessor_demo_0_records_out_total": 0,
  "op_preprocessor_demo_0_exceptions_total": 0,
  "op_preprocessor_demo_0_process_latency_ms": 0,
  "op_preprocessor_demo_0_buffer_length": 0,
  "op_preprocessor_demo_0_last_invocation": 0,
  "op_project_0_records_in_total": 0,
  "op_project_0_records_out_total": 0,
  "op_project_0_exceptions_total": 0,
  "op_project_0_process_latency_ms": 0,
  "op_project_0_buffer_length": 0,
  "op_project_0_last_invocation": 0,
  "sink_sink_log_0_records_in_total": 0,
  "sink_sink_log_0_records_out_total": 0,
  "sink_sink_log_0_exceptions_total": 0,
  "sink_sink_log_0_process_latency_ms": 0,
  "sink_sink_log_0_buffer_length": 0,
  "sink_sink_log_0_last_invocation": 0,
  "sink_sink_mqtt_0_records_in_total": 0,
  "sink_sink_mqtt_0_records_out_total": 0,
  "sink_sink_mqtt_0_exceptions_total": 0,
  "sink_sink_mqtt_0_process_latency_ms": 0,
  "sink_sink_mqtt_0_buffer_length": 0,
  "sink_sink_mqtt_0_last_invocation": 0
}

Schemaless support - Parser

Support creating a stream without a schema.

bin/cli create stream demo '() WITH (FORMAT="JSON", DATASOURCE="demo", TYPE="edgex")'

Data template usage for Rest sinks can be improved?

{
  "id": "rule1",
  "sql": "select rand() as rkey,randomnumber from demo where randomnumber > 80",
  "actions": [
      {
      "rest": {
        "url": "http://127.0.0.1:48060/api/v1/notification",
        "method": "post",
        "dataTemplate": "{\"slug\": \"notice-test-{{.rkey}}\", \"sender\": \"System Management\", \"category\": \"SECURITY\", \"severity\": \"CRITICAL\", \"content\": \"Number is too high: {{.randomnumber}}\",  \"labels\": [ \"cool\", \"test\" ] }",
        "sendSingle": true
        }
      }
    ]
}

Currently, any " in the data template needs to be escaped. Can we improve this?

Kuiper server quits when a rule is dropped

It quits with the following error stack trace.

bin/server
Serving Kuiper server on port 20498
panic: runtime error: invalid memory address or nil pointer dereference
[signal SIGSEGV: segmentation violation code=0x1 addr=0x28 pc=0x146bb2c]

goroutine 60 [running]:
engine/xstream/extensions.(*MQTTSource).Close(0xc000103880, 0x164d240, 0xc0001326c0, 0xc005528f38, 0x1)
	/Users/rockyjin/Downloads/workspace/edge/src/kuiper/xstream/extensions/mqtt_source.go:150 +0x8c
engine/xstream/nodes.(*SourceNode).Open.func1(0xc00002bc40, 0x164d240, 0xc0001326c0, 0xc0055380e0, 0xc00a8bc000)
	/Users/rockyjin/Downloads/workspace/edge/src/kuiper/xstream/nodes/source_node.go:45 +0x1a2
created by engine/xstream/nodes.(*SourceNode).Open
	/Users/rockyjin/Downloads/workspace/edge/src/kuiper/xstream/nodes/source_node.go:28 +0x11c
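
The trace shows a nil pointer dereference inside MQTTSource.Close, but not which value was nil. Assuming the cause is a connection field that was never set because the source did not open successfully, a defensive Close could look like the sketch below. The conn interface and source type are hypothetical stand-ins, not the code in mqtt_source.go.

```go
package main

import (
	"errors"
	"fmt"
)

// conn is a hypothetical stand-in for an MQTT client connection.
type conn interface {
	Disconnect()
}

// source is a hypothetical source type, not Kuiper's MQTTSource.
type source struct {
	conn conn // stays nil if the source was never opened successfully
}

var errNotConnected = errors.New("source was never connected")

// Close releases the connection if there is one and reports, instead of
// panicking, when there is nothing to close.
func (s *source) Close() error {
	if s == nil || s.conn == nil {
		return errNotConnected
	}
	s.conn.Disconnect()
	s.conn = nil
	return nil
}

func main() {
	s := &source{} // never opened, so conn is nil
	fmt.Println(s.Close())
}
```

Returning an error keeps the failure local to the rule being dropped rather than taking down the whole server process.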

Prometheus issues

  1. Some operator values are not exported to Prometheus?

The output of the bin/cli getstatus rule rule1 command is listed below.

{
  "source_demo_0_records_in_total": 0,
  "source_demo_0_records_out_total": 0,
  "source_demo_0_exceptions_total": 0,
  "source_demo_0_process_latency_ms": 0,
  "source_demo_0_buffer_length": 0,
  "op_preprocessor_demo_0_records_in_total": 0,
  "op_preprocessor_demo_0_records_out_total": 0,
  "op_preprocessor_demo_0_exceptions_total": 0,
  "op_preprocessor_demo_0_process_latency_ms": 0,
  "op_preprocessor_demo_0_buffer_length": 0,
  "op_window_0_records_in_total": 0,
  "op_window_0_records_out_total": 0,
  "op_window_0_exceptions_total": 0,
  "op_window_0_process_latency_ms": 0,
  "op_window_0_buffer_length": 0,
  "op_filter_0_records_in_total": 0,
  "op_filter_0_records_out_total": 0,
  "op_filter_0_exceptions_total": 0,
  "op_filter_0_process_latency_ms": 0,
  "op_filter_0_buffer_length": 0,
  "op_project_0_records_in_total": 0,
  "op_project_0_records_out_total": 0,
  "op_project_0_exceptions_total": 0,
  "op_project_0_process_latency_ms": 0,
  "op_project_0_buffer_length": 0,
  "sink_sink_rest_0_records_in_total": 0,
  "sink_sink_rest_0_records_out_total": 0,
  "sink_sink_rest_0_exceptions_total": 0,
  "sink_sink_rest_0_process_latency_ms": 0,
  "sink_sink_rest_0_buffer_length": 0
}
  2. The kuiper_ prefix should be added back; otherwise the metrics cannot be easily found in the Prometheus console.

  3. Should Prometheus metrics collection be disabled by default, for two reasons?

    • It reduces the memory footprint.
    • The feature may not be necessary for all users.

Investigate automatic FVT framework

Create an end-to-end automated framework for functional testing.

  • Jenkins pipeline
  • End-to-end user scenarios
  • General Linux server & Raspberry Pi env

Stream definition is not updated when the rule is restarted

  1. Create a stream with the schema below.
{
"sql" : "create stream demo (temperature float, humidity bigint) WITH (FORMAT=\"JSON\", DATASOURCE=\"devices/+/messages\" )"
}
  2. Create a rule that references a field that does not exist in the stream schema, as shown below. The light field does not exist in the previous stream definition.
{
  "id": "rule1",
  "sql": "SELECT light FROM demo",
  "actions": [
    {
      "mqtt": {
        "server": "tcp://${mqtt_srv}:1883",
        "topic": "devices/result",
        "qos": 1,
        "clientId": "demo_001"
      }
    }
  ]
}
  3. Drop and re-create the stream definition so that it includes the new light field, as shown below.
{
"sql" : "create stream demo (temperature float, humidity bigint, light bigint) WITH (FORMAT=\"JSON\", DATASOURCE=\"devices/+/messages\" )"
}
  4. Restart the rule. The light value still cannot be fetched.

Rule status is not correct

  1. Create a stream demo
  2. Configure the source to point to a non-existent MQTT broker, and then create a rule that uses demo. The log shows that the rule is closing, but the command ./cli getstatus rule rule_name prints running.
  3. Change to the correct MQTT broker address and restart the rule. The rule still seems to fail to start.

Not Authorized from remote server

I have filled in the username and password in mqtt_source.yaml, but every time I query from the remote server, the result of the query is "Not Authorized".

GROUP BY issue

Build the binary from branch agg_func_validator.

  1. Stream definition
bin/cli create stream demo '(temperature float, humidity bigint) WITH (FORMAT="JSON", DATASOURCE="devices/+/messages")'
  2. Then debug the following SQL with the bin/cli query command.
SELECT avg(temperature) AS t_av, max(temperature) AS t_max, min(temperature) AS t_min, COUNT(*) As t_count, split_value(mqtt(topic), "/", 1) AS device_id FROM demo GROUP BY device_id, TUMBLINGWINDOW(ss, 5);
  3. Send the following data to the MQTT topic. The 1st column is the device id, the 2nd column is the temperature, and the 3rd column is the humidity.
1,20,30
2,31,40
1,35,50
2,20,30
1,80,90
2,45,20
1,10,90
2,12,30
1,65,35
2,55,32
  4. The output of the query is not stable, and the calculation looks incorrect. In the result below, the device_id of the 1st row should be "2" instead of "1".
xstream > [{"device_id":"1","t_av":26.5,"t_count":4,"t_max":35,"t_min":20}]
[{"device_id":"1","t_av":42.4,"t_count":5,"t_max":80,"t_min":10}]
[{"device_id":"2","t_av":55,"t_count":1,"t_max":55,"t_min":55}]
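
To make the expected result concrete, the sketch below groups one window by device id and computes the same aggregates. It assumes that the first window contained the first four sample rows, which is an inference from t_count being 4 in the first output line; the actual window boundaries depend on when the messages arrived. The types and the groupByDevice function are hypothetical and unrelated to Kuiper's implementation.

```go
package main

import "fmt"

// reading is one sample row: device id and temperature.
type reading struct {
	device      string
	temperature float64
}

// agg holds the running aggregates for one device.
type agg struct {
	count         int
	sum, min, max float64
}

func (a agg) avg() float64 { return a.sum / float64(a.count) }

// groupByDevice aggregates one window of readings per device id.
func groupByDevice(window []reading) map[string]agg {
	out := make(map[string]agg)
	for _, r := range window {
		a, ok := out[r.device]
		if !ok {
			a = agg{min: r.temperature, max: r.temperature}
		}
		a.count++
		a.sum += r.temperature
		if r.temperature < a.min {
			a.min = r.temperature
		}
		if r.temperature > a.max {
			a.max = r.temperature
		}
		out[r.device] = a
	}
	return out
}

func main() {
	// Assumed first window: the first four rows of the sample data.
	window := []reading{{"1", 20}, {"2", 31}, {"1", 35}, {"2", 20}}
	res := groupByDevice(window)
	for _, id := range []string{"1", "2"} {
		a := res[id]
		fmt.Printf("device_id=%s t_av=%v t_count=%d t_max=%v t_min=%v\n",
			id, a.avg(), a.count, a.max, a.min)
	}
}
```

Under that assumption, device 1 would average 27.5 and device 2 would average 25.5, each with a count of 2. The reported t_av of 26.5 with t_count 4 equals the average of all four rows taken together, which suggests the rows were not split by device_id.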

Specify protocols in docker run parameters

Currently, no protocol is required in the broker address when running the Docker image. With this approach, users cannot switch to SSL connections.

docker run -d --name kuiper -e MQTT_BROKER_ADDRESS=broker.emqx.io:1883 emqx/kuiper:latest

So the Docker image needs to let users specify the protocol, as shown below.

docker run -d --name kuiper -e MQTT_BROKER_ADDRESS=tcp://broker.emqx.io:1883 emqx/kuiper:latest
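
One way to accept the new form while staying compatible with existing deployments would be to add a default scheme only when none is given. The helper below is a hypothetical sketch of that idea and not part of the Kuiper image; the ssl:// address with port 8883 is an illustrative value.

```go
package main

import (
	"fmt"
	"strings"
)

// withDefaultScheme is a hypothetical helper: it keeps an address that
// already names a protocol and prefixes tcp:// otherwise.
func withDefaultScheme(addr string) string {
	if strings.Contains(addr, "://") {
		return addr
	}
	return "tcp://" + addr
}

func main() {
	for _, addr := range []string{
		"broker.emqx.io:1883",       // old form, no protocol
		"tcp://broker.emqx.io:1883", // explicit plain TCP
		"ssl://broker.emqx.io:8883", // illustrative SSL address
	} {
		fmt.Println(withDefaultScheme(addr))
	}
}
```

The check looks for "://" rather than parsing the value as a URL, because a bare host:port such as broker.emqx.io:1883 would otherwise have its host name mistaken for a scheme.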
