
GoPipe

A processing pipe-line (similar to logstash) written in Go

Quick-Start

There is a Makefile you can use to quickly build and start working on this project:

$ make help

Available targets:

  rdkafka         Build librdkafka locally
  gopipe          Build gopipe
  tests           Run tests (includes fmt and vet)
  show_coverage   Show coverage results

Architecture

Our goal is to define a pipeline where events are received and processed in a modular way. Events can contain:

  • Raw unformatted bytes (Data["raw"])
  • String contents (Data["message"])
  • Decoded JSON value (Data as map[string]interface{})
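
Conceptually, an event is little more than a generic map. The sketch below illustrates the three payload shapes; the Event type and the sample values are illustrative, not the repo's actual definitions:

package main

import "fmt"

// Event carries its payload in a generic map, so the same structure can
// hold raw bytes, a string message, or decoded JSON fields.
// Illustrative only; the actual type in the repo may differ.
type Event struct {
    Data map[string]interface{}
}

func main() {
    e := Event{Data: map[string]interface{}{}}
    e.Data["raw"] = []byte{0x01, 0x02}                // raw unformatted bytes
    e.Data["message"] = "web01 8080: GET /index.html" // string contents
    e.Data["port"] = 8080.0                           // decoded JSON value
    fmt.Println(e.Data)
}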

We call our modules "Components" and these fall into three categories:

  • Input: Components that generate events and push them into a Go channel
  • Proc: Processing components that modify event data. They read events from and write events to Go channels
  • Output: Components that only read events and emit them in some form. They usually do not push events back into a Go channel, but discard them instead

In essence, all components are the "same" (implement the same interface). The only difference is which channels are made available to them.
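
In Go terms, that shared contract might look roughly like the interface below. This is a sketch based on the description above, not the repo's actual definition:

// Component is a hypothetical version of the shared contract: every
// component runs a loop, reading from an input channel and/or writing
// to an output one.
type Component interface {
    // Run consumes events from in and forwards them to out. Inputs
    // ignore in; outputs usually discard events instead of forwarding.
    Run(in <-chan map[string]interface{}, out chan<- map[string]interface{})
}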

Whys and Hows?

  1. Well... I have done something similar in C++ for processing netflow packets and thought that, since Go is (really) fast and concurrent, it is a perfect match for such an application.

  2. Why not use something already out there? We could extend an existing framework; however, this is a Go learning exercise to replicate the C++ code...

  3. How is this different? We focus on a systems perspective, and we want this framework to be more network/data-oriented rather than log-oriented:

    • Raw data handling - see the flowreplicator.json configuration.
    • Allows the user to write more logic, still based on config (if/else support)
    • (Experimental) Supports tasks that feed and update processing modules' data

  4. What are the future plans? We plan to maintain and extend this until we fully port our C++ code... Maintenance will continue, but we hope to extend it as needed with the help of the community.

Components

Inputs

  • TCP: Supporting raw, string, CSV and JSON
  • UDP: Supporting raw, string, CSV and JSON
  • Kafka: Supporting raw, string, CSV and JSON

Processing

  • Add field: Adds a new field based on a static value or expression
  • Add time: Adds a timestamp to the data
  • Cast: Converts fields to different data types
  • Drop field: Removes fields from data
  • If/Else: Control flow with if/else/endif
  • In List: Checks a field against a list of values
  • Log: Logs the events' data to stdout
  • Longest Prefix Match: Performs LPM and attaches meta-data to the events' data
  • MD5: Hashes event fields
  • Regex: Converts string events into data events
  • Sampler: Selectively forwards events (one in every X)

Output

  • File: Supporting CSV and JSON
  • Null: Blackholes events
  • UDP: Supporting raw and string

Example Configs

UDP FlowReplicator

Will replicate and optionally sample UDP packets:

{
    "main": {
        "num_cpus": 2,
        "log_level": 1,
        "channel_size": 50000,
        "stats_every": 100000
    },
    "in": {
        "module": "UDPRawInput",
        "listen": "0.0.0.0",
        "port": 9090
    },
    "proc": [
        {
            "module": "SamplerProc",
            "every": 2
        },
        {
            "module": "UDPRawOutput",
            "target": "127.0.0.1",
            "port": 9091
        }
    ],
    "out": {
        "module": "UDPRawOutput",
        "target": "127.0.0.1",
        "port": 9092
    }
}
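
To try this config out, you could fire a test datagram at the input port. The snippet below is plain Go against the standard library, with the ports taken from the config above:

package main

import "net"

func main() {
    // 9090 is the replicator's input port in the config above; roughly
    // every second datagram should then be copied to 9091 and 9092.
    conn, err := net.Dial("udp", "127.0.0.1:9090")
    if err != nil {
        panic(err)
    }
    defer conn.Close()
    conn.Write([]byte("test packet"))
}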

Format log lines + if/else demo

Receives lines over TCP, parses them into data fields, adds a timestamp, converts some fields to different data types, discards the original message, hashes some fields, etc.

{
    "main": {
        "num_cpus": 2,
        "log_level": 1,
        "channel_size": 50000,
        "stats_every": 100000
    },
    "in": {
        "module": "TCPStrInput",
        "listen": "0.0.0.0",
        "port": 9092,
        "headers": ["hello", "test", "src"],
        "separator": ",",
        "convert": false
    },
    "proc": [
        {
            "module": "RegexProc",
            "regexes": [
                "(?mi)(?P<host>[0-9a-z]+) (?P<port>[0-9]+): (?P<hostEvent>.*)"
            ]
        },
        {
            "module": "DropFieldProc",
            "field_name": "message"
        },
        {
            "module": "CastProc",
            "fields": ["port"],
            "types": ["int"]
        },
        {
            "module": "InListProc",
            "in_field": "port",
            "out_field": "port_block",
            "reload_minutes": 100000000,
            "list": ["8080", "443", "23230", "14572", "17018"]
        },
        {"module": "if",  "condition": "port_block == true "},
            {
                "module": "Md5Proc",
                "in_fields": ["host"],
                "out_fields": ["host_hash"],
                "salt": "andPepper!"
            },
        {"module": "else"},
            {
                "module": "AddTimeProc",
                "field_name": "_timestamp"
            },
        {"module": "endif"},
        {
            "module": "LogProc",
            "level": "info"
        }
    ],
    "out": {
        "module": "FileJSONOutput",
        "rotate_seconds": 60,
        "folder": "/tmp",
        "file_name_format": "gopipe-20060102-150405.json"
    }
}
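
To make the flow concrete, here is how an illustrative input line (not from the repo) would move through the proc chain above:

Input line:       web01 8080: GET /index.html
After RegexProc:  host="web01", port="8080", hostEvent="GET /index.html"
After DropField:  the original "message" field is removed
After CastProc:   "port" becomes the integer 8080
After InListProc: "port_block" is true (8080 is in the list), so the
                  if-branch runs Md5Proc and adds "host_hash"; otherwise
                  the else-branch would add "_timestamp"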

Longest Prefix Match

Receives JSON lines on a TCP socket:

{
    "main": {
        "num_cpus": 2,
        "log_level": 1,
        "channel_size": 50000,
        "stats_every": 10000000
    },
    "in": {
        "module": "TCPJSONInput",
        "listen": "0.0.0.0",
        "port": 9092
    },
    "proc": [
        {
            "module": "AddTimeProc",
            "field_name": "_timestamp"
        },
        {
            "module": "LPMProc",
            "filepath": "/tmp/prefix-asn.txt",
            "reload_minutes": 1440,
            "in_fields": ["src", "dst"],
            "out_fields": [
                {"newkey": "sky_{{in_field}}_prefix", "metakey": "prefix"},
                {"newkey": "sky_{{in_field}}_asn", "metakey": "asn"}
            ]
        }
    ],
    "out": {
        "module": "FileJSONOutput",
        "rotate_seconds": 60,
        "folder": "/tmp",
        "file_name_format": "gopipe-20060102-150405.json"
    }
}
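
Assuming /tmp/prefix-asn.txt maps IP prefixes to meta-data such as ASNs, an event carrying src and dst addresses gains one new key per in_fields/out_fields combination, with {{in_field}} expanded to "src" or "dst". The prefixes and ASNs below are made up for illustration:

Input:  {"src": "10.1.2.3", "dst": "192.0.2.7"}
Output: {"src": "10.1.2.3", "dst": "192.0.2.7", "_timestamp": "...",
         "sky_src_prefix": "10.0.0.0/8",   "sky_src_asn": 65001,
         "sky_dst_prefix": "192.0.2.0/24", "sky_dst_asn": 65002}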

Tasks

The following config part defines a task that runs every 10 seconds. Usually you would want to update the file sources of InListProc and LPMProc components... In such cases the idea is that you have a small shell script somewhere on your system that updates your local files. Then you need to "invoke" a reload to load the new data into memory:

...
    ],
    "out": {
        "module": "FileJSONOutput",
        "rotate_seconds": 60,
        "folder": "/tmp",
        "file_name_format": "gopipe-20060102-150405.json"
    },
    "tasks": [
        {
            "name": "LSing...",
            "command": ["ls", "-al"],
            "interval_seconds": 10,
            "signals": [
                {"mod": 4, "signal": "reload"}
            ]
        },
        {
            "name": "LSing...2",
            "command": ["ls", "-al"],
            "interval_seconds": 10,
            "signals": []
        }
    ]
...

Above we define two tasks. The difference between them is that the first one signals a component if it runs successfully: the signal "reload" is sent to component 4, and it is up to the component to handle it.

The component index is defined by the order of the component in the config, including input components. Given that at the moment we only support one input, component 4 above is the 3rd in the proc section.
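
Following that numbering (the input is component 1), the indices for a config like the task example line up as:

1: in       (the single input component)
2: proc[0]
3: proc[1]
4: proc[2]  <- receives the "reload" signal ({"mod": 4, ...})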

Limitations

  • Only one input is supported at the moment, but this might change
  • A bit of an immature framework :) we need more components
  • JSON: Decoding with UseNumber() is needed for correct output; however, it breaks govaluate, so when comparing you have to use json_to_float64(). See TestIf for an example, and the sketch after this list...
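
The snippet below (plain Go, standard library only) shows the issue: with UseNumber() a numeric field decodes to json.Number rather than float64, which is what a helper like json_to_float64() has to undo before govaluate can compare it:

package main

import (
    "bytes"
    "encoding/json"
    "fmt"
)

func main() {
    var data map[string]interface{}
    dec := json.NewDecoder(bytes.NewReader([]byte(`{"port": 8080}`)))
    dec.UseNumber() // keep numbers as json.Number instead of float64
    if err := dec.Decode(&data); err != nil {
        panic(err)
    }
    n := data["port"].(json.Number) // a string type; numeric comparisons break
    f, _ := n.Float64()             // the conversion json_to_float64() performs
    fmt.Printf("%T %v -> %T %v\n", n, n, f, f)
}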

Developers

Hello! The idea is that we can provide JSON-configurable pipeline processing capabilities for the end user. However, we do need more components for various jobs, and maybe codecs!

  • Components should be extremely easy to implement. Use proc/log.go as a starting point (~60 LOC) to implement your component (see the sketch after this list).

  • Codecs: Have a quick look at linecodecs.go. One can easily implement new line encoders/decoders. These can then be plugged into input/output modules.
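
As a taste of how small a component can be, here is a minimal sketch of a new proc component. The type, its Run signature and the field handling are assumptions based on the description above; proc/log.go shows the repo's actual contract:

package proc

import "strings"

// HypotheticalUpperProc upper-cases a single string field. Purely
// illustrative: the real component interface, registration and config
// handling live in the repo.
type HypotheticalUpperProc struct {
    FieldName string
}

func (p *HypotheticalUpperProc) Run(in <-chan map[string]interface{}, out chan<- map[string]interface{}) {
    for e := range in {
        if s, ok := e[p.FieldName].(string); ok {
            e[p.FieldName] = strings.ToUpper(s)
        }
        out <- e // forward the (possibly modified) event downstream
    }
}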

Not sure what to help with? Have a look at TODO.md. As always: comments, suggestions, documentation, bug reports, etc. are more than welcome :)
