
amazon-kinesis-firehose-for-fluent-bit's Issues

Add an option to send multiple log events as a record

According to the Kinesis Firehose docs:

The PutRecordBatch operation can take up to 500 records per call or 4 MiB per call, whichever is smaller. This limit cannot be changed.

The throughput limits for a stream are measured in records per second and in MiB per second; the defaults for most regions are 1,000 records per second and 1 MiB/s. What this means is that if your records are small, you hit the 1,000 records per second limit long before the 1 MiB/s limit, so you're not fully using the available throughput of the stream.

Right now, we send one log event per record, and in practice log events are usually quite small. This means that in most cases the throughput of the stream is under-utilized.

We do this because some Firehose destinations, like Elasticsearch, require each record to be an individual log event.

However, many will use Firehose to send to S3, and with S3, multiple log events can be sent in a single record if they are newline delimited.
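Below is a minimal sketch, not this plugin's actual implementation, of how newline aggregation for S3 destinations could work: pack as many newline-terminated log events as fit into each Firehose record, up to the 1 MiB per-record limit (types from aws-sdk-go; names are illustrative):

package main

import "github.com/aws/aws-sdk-go/service/firehose"

const maxRecordSize = 1024 * 1024 // 1 MiB per Firehose record

// packEvents repacks newline-terminated log events into as few Firehose
// records as possible, so a 500-record batch can carry far more data.
func packEvents(events [][]byte) []*firehose.Record {
    var records []*firehose.Record
    var current []byte
    for _, ev := range events {
        // Start a new record when appending this event would exceed the limit.
        if len(current) > 0 && len(current)+len(ev) > maxRecordSize {
            records = append(records, &firehose.Record{Data: current})
            current = nil
        }
        current = append(current, ev...)
    }
    if len(current) > 0 {
        records = append(records, &firehose.Record{Data: current})
    }
    return records
}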

Empty documents being sent to Firehose

It looks like empty payloads are being sent to Firehose, and in turn to Elasticsearch; only the timestamp field is populated by Fluent Bit:

{
  "_index": "log-2021-08-26",
  "_type": "_doc",
  "_id": "49621369204884914652732002225806045706578374688214351874.3",
  "_score": 1,
  "_source": {
    "timestamp": "2021-08-26T09:37:02.608335"
  },
  "fields": {
    "timestamp": [
      "2021-08-26T09:37:02.608Z"
    ]
  }
}

Has anyone experienced something similar?
Any idea how I could thoroughly debug it? Even with FLB_LOG_LEVEL=debug I can't see much.
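One way to confirm where the empty records originate, and to keep them out of the delivery stream in the meantime, is a grep filter that only passes records whose log key is present and non-empty. This is only a workaround sketch, and the key name log is an assumption based on the usual tail/docker setup:

    [FILTER]
        Name   grep
        Match  *
        Regex  log .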

Remove exponential backoff code from all plugins

When I built the Firehose and CloudWatch plugins, I thought it would be a good idea for them to implement backoff in case of API throttling. That instinct was good, but the execution shows my lack of understanding of Fluent Bit at the time. Fluent Bit has its own retry mechanisms; a plugin can return FLB_RETRY and let Fluent Bit handle it. More importantly, the backoff mechanism currently in use causes the plugin to sleep, and since Fluent Bit is single-threaded but concurrent, this sleep stalls all other plugins and hurts overall performance.

In practice I suspect few users trigger throttling; however, this is a very easy thing to fix: we just need to remove a bit of code from each plugin.
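A hypothetical sketch of the change, assuming the fluent-bit-go output package and aws-sdk-go error types (the function name is illustrative): on a throttling error, return FLB_RETRY and let the engine's scheduler do the backoff instead of sleeping inside the plugin.

package main

import (
    "github.com/aws/aws-sdk-go/aws/awserr"
    "github.com/aws/aws-sdk-go/service/firehose"
    "github.com/fluent/fluent-bit-go/output"
)

// flushWithEngineRetry sends one batch and maps the result to an engine
// return code instead of sleeping and retrying locally.
func flushWithEngineRetry(client *firehose.Firehose, input *firehose.PutRecordBatchInput) int {
    if _, err := client.PutRecordBatch(input); err != nil {
        if aerr, ok := err.(awserr.Error); ok && aerr.Code() == firehose.ErrCodeServiceUnavailableException {
            // Throttled: hand the chunk back to the engine for a later retry.
            return output.FLB_RETRY
        }
        return output.FLB_ERROR
    }
    return output.FLB_OK
}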

Output plugin 'firehose' cannot be loaded

Hi,

I can't seem to get the firehose output working. I used the Docker Hub image 1.2.2.
Can someone have a look? It looks like an issue with the image to me.

Config:

fluent-bit.conf: |-
    [SERVICE]
        Flush          2
        Daemon         Off
        Config_Watch On
        Parsers_File   parsers.conf
        HTTP_Listen    0.0.0.0
        HTTP_Server On
        HTTP_Port      2020

    [INPUT]
        Name           tail
        Tag            kube.*
        Parser         docker
        Path           /var/log/containers/*.log
        DB             /var/log/flb_kube.db
        Mem_Buf_Limit  5MB

    [OUTPUT]
        Name   firehose
        Match  kube.*
        region eu-west-1
        delivery_stream spl-traefik-logs

Error:

log is DEPRECATED and will be removed in a future version. Use logs instead.
Output plugin 'firehose' cannot be loaded
Error: You must specify an output target. Aborting
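The firehose output is an external Go plugin (a .so file), so Fluent Bit only finds it if the binary is started with the plugin path. The amazon/aws-for-fluent-bit image does this in its default command; the upstream fluent/fluent-bit image does not ship the .so at all. A rough example of loading it explicitly (the paths here are assumptions, check your image):

docker run -v $(pwd)/fluent-bit.conf:/fluent-bit/etc/fluent-bit.conf \
    amazon/aws-for-fluent-bit:latest \
    /fluent-bit/bin/fluent-bit \
        -e /fluent-bit/firehose.so \
        -c /fluent-bit/etc/fluent-bit.conf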

Is this now deprecated in favour of the vanilla Firehose plugin?

I see that v1.6 of upstream/vanilla Fluent Bit now includes its own output plugin for writing to Firehose, and says this:

This is the documentation for the core Fluent Bit Firehose plugin written in C. It can replace the aws/amazon-kinesis-firehose-for-fluent-bit Golang Fluent Bit plugin released last year.

However, I don't see any corresponding deprecation notice here. So, to clarify:

  1. Is this Amazon-flavour Firehose plugin now deprecated?
  2. If not, for new users who want to use Fluent Bit to send logs from EKS to Firehose, should we use:
    a) The Amazon-flavour Fluent Bit Docker image (containing this Amazon-flavour Firehose plugin)
    OR
    b) The vanilla Fluent Bit Docker image (containing the new vanilla Firehose plugin)
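For comparison, the core C plugin is configured with an output stanza roughly like this (option names per the upstream kinesis_firehose docs; the region and stream name are placeholders):

    [OUTPUT]
        Name            kinesis_firehose
        Match           kube.*
        region          eu-west-1
        delivery_stream my-delivery-stream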

Dots in keys cause failure when shipping to Elasticsearch from Firehose

Skaffold adds Skaffold-specific labels to deployments when it is used to deploy, e.g.

app.kubernetes.io/managed-by=skaffold-v1.12.0

If another label already exists, e.g.

app=app_name

then the Firehose -> Elasticsearch transfer fails due to an attempt to map the field twice, once as text and once as an object:

{"attemptsMade":8,"arrivalTimestamp":1595704200773,"errorCode":"400","errorMessage":"{\"type\":\"mapper_parsing_exception\",\"reason\":\"Could not dynamically add mapping for field [app.kubernetes.io/managed-by]. Existing mapping for [kubernetes.labels.app] must be of type object but found text

The dots do not cause issues with the Elasticsearch Fluent Bit plugin, as it has a Replace_Dots option:

     [OUTPUT]
       Name            es
       Replace_Dots    On
...

but the firehose plugin has no such option, so the Elasticsearch transfer fails with the mapping error shown above.

This isn't a Skaffold-specific issue; it's really the lack of Replace_Dots functionality in the firehose plugin. Is there any plan to add a Replace_Dots-type option? (A sketch of what such an option could do follows the filter config below.)

The above problem is due to using the kubernetes filter:


    [FILTER]
        Name                kubernetes
        Match               kube.*
        Kube_URL            https://kubernetes.default.svc:443
        Kube_CA_File        /var/run/secrets/kubernetes.io/serviceaccount/ca.crt
        Kube_Token_File     /var/run/secrets/kubernetes.io/serviceaccount/token
        Kube_Tag_Prefix     kube.var.log.containers.
        Merge_Log           On
        Merge_Log_Key       log_processed
        K8S-Logging.Parser  On
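A hypothetical sketch of what a replace_dots option could do in this plugin: recursively rewrite "." in map keys (for example app.kubernetes.io/managed-by) before the record is marshalled and sent to Firehose. The real plugin decodes msgpack, so the concrete key types may differ; this is illustrative only.

package main

import "strings"

// replaceDots walks a decoded record and replaces "." in every map key.
func replaceDots(value interface{}, replacement string) interface{} {
    switch v := value.(type) {
    case map[string]interface{}:
        out := make(map[string]interface{}, len(v))
        for k, child := range v {
            out[strings.ReplaceAll(k, ".", replacement)] = replaceDots(child, replacement)
        }
        return out
    case []interface{}:
        for i, child := range v {
            v[i] = replaceDots(child, replacement)
        }
        return v
    default:
        return value
    }
}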

Exception with simple log delivery

Copyright (C) Treasure Data

time="2019-07-23T04:57:59Z" level=info msg="[firehose] plugin parameter delivery_stream = 'niki-staging-logs'\n"
time="2019-07-23T04:57:59Z" level=info msg="[firehose] plugin parameter region = 'ap-south-1'\n"
time="2019-07-23T04:57:59Z" level=info msg="[firehose] plugin parameter data_keys = ''\n"
time="2019-07-23T04:57:59Z" level=info msg="[firehose] plugin parameter role_arn = 'arn:aws:iam::xxx:role/niki-staging-fluentd'\n"
time="2019-07-23T04:57:59Z" level=info msg="[firehose] plugin parameter endpoint = ''\n"
[2019/07/23 04:57:59] [ info] [storage] initializing...
[2019/07/23 04:57:59] [ info] [storage] in-memory
[2019/07/23 04:57:59] [ info] [storage] normal synchronization mode, checksum disabled, max_chunks_up=128
[2019/07/23 04:57:59] [ info] [engine] started (pid=1)
[2019/07/23 04:57:59] [ warn] [filter_kube] merge_json_log is deprecated, enabling 'merge_log' option instead
[2019/07/23 04:57:59] [ info] [filter_kube] https=1 host=kubernetes.default.svc port=443
[2019/07/23 04:57:59] [ info] [filter_kube] local POD info OK
[2019/07/23 04:57:59] [ info] [filter_kube] testing connectivity with API server...
[2019/07/23 04:57:59] [ info] [filter_kube] API server connectivity OK
[2019/07/23 04:57:59] [ info] [http_server] listen iface=0.0.0.0 tcp_port=2020
[2019/07/23 04:57:59] [ info] [sp] stream processor started
time="2019-07-23T04:58:02Z" level=error msg="[firehose] PutRecordBatch request returned with no records successfully recieved\n"
time="2019-07-23T04:58:03Z" level=error msg="[firehose] PutRecordBatch request returned with no records successfully recieved\n"
time="2019-07-23T04:58:03Z" level=error msg="[firehose] PutRecordBatch request returned with no records successfully recieved\n"
time="2019-07-23T04:58:13Z" level=error msg="[firehose] PutRecordBatch request returned with no records successfully recieved\n"

Firehose destination settings

Amazon S3 destination
    S3 bucket:                niki-staging-logs
    Prefix:                   raw/!{timestamp:yyyy/MM-dd}/
    Error prefix:             error/!{firehose:error-output-type}/!{timestamp:yyyy/MM-dd}/
    Buffer conditions:        30 MB or 600 seconds
    Compression:              Disabled
    Encryption:               Disabled
    Record format conversion: None

Fluent Bit conf:

fluent-bit.conf: |
    [SERVICE]
        Flush         1
        Log_Level     info
        Daemon        off
        Parsers_File  parsers.conf
        HTTP_Server   On
        HTTP_Listen   0.0.0.0
        HTTP_Port     2020

    @INCLUDE input-kubernetes.conf
    @INCLUDE filter-grep.conf
    @INCLUDE filter-kubernetes.conf
    @INCLUDE output-firehose.conf
  input-kubernetes.conf: |
    [INPUT]
        Name              tail
        Tag               app.*
        Path              /var/log/containers/*_app_*.log
        Parser            docker
        DB                /var/log/flb_kube_ns.db
        Mem_Buf_Limit     30MB
        Skip_Long_Lines   On
        Refresh_Interval  10
  filter-grep.conf: |
    [FILTER]
        Name     grep
        Match    *
        Exclude  log /health
  filter-kubernetes.conf: |
    [FILTER]
        Name                kubernetes
        Match               app.*
        Kube_URL            https://kubernetes.default.svc:443
        Kube_CA_File        /var/run/secrets/kubernetes.io/serviceaccount/ca.crt
        Kube_Token_File     /var/run/secrets/kubernetes.io/serviceaccount/token
        Regex_Parser        ns_k8s_parser
        Merge_JSON_Log      On
  output-firehose.conf: |
    [OUTPUT]
        Name firehose
        Match app.*
        region ap-south-1
        role_arn arn:aws:iam::xxx:role/niki-staging-fluentd
        delivery_stream niki-staging-logs
  parsers.conf: |
    [PARSER]
        Name        first_line
        Format      regex
        Regex       ^{"log":"(?!\\u0009)(?<log>\S(?:(\\")|[^"]){9}(?:(\\")|[^"])*)"

    [PARSER]
        Name        nested_json
        Format      json
        Time_Keep   true
        Time_Key    time
        Time_Format %Y-%m-%dT%H:%M:%S.%L
        Decode_Field_As json log do_next
        Decode_Field_As escaped message

    [PARSER]
        Name        ns_k8s_parser
        Format      regex
        Regex       (?<tag>[^.]+)?\.?(?<pod_name>[^_]+)_(?<namespace_name>[^_]+)_(?<container_name>.+-(?<docker_id>[a-z0-9]{64}))\.log$

    [PARSER]
        Name        docker
        Format      json
        Time_Key    time
        Time_Format %Y-%m-%dT%H:%M:%S.%L
        Time_Keep   On
        Decode_Field_As    json     log

What am I doing wrong?

timestamp missing with kinesis output

Bug Report

Describe the bug
I am running fluent-bit 1.2.2 with the systemd input and the AWS Kinesis output. Systemd does not generate the timestamp field inside the log; it puts it ahead of the record body, like this:

[0] host.unknown: [1565706755.786588000, {"TRANSPORT"=>"syslog", "PRIORITY"=>"6", "SYSLOG_FACILITY"=>"10", "SYSLOG_IDENTIFIER"=>"sshd", "UID"=>"1000", "GID"=>"1000", "BOOT_ID"=>"5601a829777c459e845852afc9bcbeaf", "MACHINE_ID"=>"e14407097f0e4283a492f816e4bf6e79", "HOSTNAME"=>"ip-172-16-5-69.us-east-2.compute.internal", "SYSLOG_PID"=>"11971", "PID"=>"11971", "MESSAGE"=>"Disconnected from 172.16.12.137 port 51494", "SOURCE_REALTIME_TIMESTAMP"=>"1565706755768498"}]

When sending such a message to the es output it works fine, since that output adds the timestamp, but the Kinesis output seems to strip anything outside of the JSON body, so log messages look like this:

{"BOOT_ID":"53fb93cfe3664a0180672a247a4826cd","CAP_EFFECTIVE":"3fffffffff","CMDLINE":"/sbin/auditd","COMM":"auditd","EXE":"/usr/sbin/auditd","GID":"0","HOSTNAME":"ip-172-16-14-74.us-east-2.compute.internal","MACHINE_ID":"e14407097f0e4283a492f816e4bf6e79","MESSAGE":"Audit daemon rotating log files with keep option","PID":"15321","PRIORITY":"5","SOURCE_REALTIME_TIMESTAMP":"1565700926388220","SYSLOG_FACILITY":"3","SYSLOG_IDENTIFIER":"auditd","SYSLOG_PID":"15321","SYSTEMD_CGROUP":"/system.slice/auditd.service","SYSTEMD_SLICE":"system.slice","SYSTEMD_UNIT":"auditd.service","TRANSPORT":"syslog","UID":"0"}

As a result, I don't have a date field in AWS Elasticsearch. Parsing log messages for the timestamp is not an option, since they don't have a consistent place to extract it from.
Can anything be done about this?

Version used: 1.2.2 with Kinesis output plugin
Configuration:
fluent-bit-service.conf: |-
    [SERVICE]
        Flush         5
        Daemon        Off
        Log_Level     info
        HTTP_Server   On
        HTTP_Listen   0.0.0.0
        HTTP_PORT     2020
        Parsers_File  parsers.conf

fluent-bit-input.conf: |-
    [INPUT]
        Name               systemd
        Tag                host.*
        Read_From_Tail     true
        Strip_Underscores  true

fluent-bit-filter.conf: |-
    [FILTER]
        Name   modify
        Match  host.*
        Add    SYSTEMD_UNIT unspecified

fluent-bit-output.conf: |-
    [OUTPUT]
        Name             firehose
        Match            *
        region           us-west-2
        delivery_stream  my-stream
Running in EKS.
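For reference: later releases of this plugin document time_key and time_key_format options that copy the Fluent Bit record timestamp into the JSON body. If the version you run has them, something like the following should give Elasticsearch a date field again (treat the option names as an assumption and verify against the README for your version):

    [OUTPUT]
        Name            firehose
        Match           *
        region          us-west-2
        delivery_stream my-stream
        time_key        timestamp
        time_key_format %Y-%m-%dT%H:%M:%S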

Plugin is not allowing Retry for Throttled records

Today, if Firehose returns a throttling error, the plugin just discards the records and the log data is lost.

We have a Fluent Bit DaemonSet pushing data to Firehose: there are 4 instances (one pod on each of the 4 nodes of the k8s cluster), and all 4 send data to the same Firehose delivery stream. The limit we are hitting is the 1 MB/s data rate; like most regions, Canada (Central) supports only 1 MB/s. I tried the simple_aggregation configuration, but it doesn't help much; if anything, it contributes to the problem of spikes.

Also, we don't have a lot of data. There are rare, random spikes that cross the 1 MB/s limit; for the most part the data rate is well below 1 MB/s. So increasing the Firehose limit just for these spikes doesn't seem reasonable.

There are two ways to handle it,

  1. Control the rate of data ingestion into Firehose.
  2. Have the output plugin retry, to handle such random spikes.

For option 2:
The plugin should return a retry to the Fluent Bit engine whenever Firehose returns a throttling error. Ref: https://docs.fluentbit.io/manual/administration/scheduling-and-retries

There should also be a configuration parameter to control the retry interval and the number of attempts in case of error.
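Assuming the plugin is changed to return FLB_RETRY on throttling, the number of engine-side attempts can already be tuned per output with the standard Retry_Limit option (scheduler behaviour is described in the page linked above); the stream name below is a placeholder:

    [OUTPUT]
        Name            firehose
        Match           *
        region          ca-central-1
        delivery_stream my-delivery-stream
        Retry_Limit     5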

replace_dots option for high performance version

Hi team, I am planning to use this plugin in our production cluster. After going through the documentation, it looks like there is a new high performance version available, but it lacks the replace_dots option. I need that option to avoid mapping conflicts with an existing Elasticsearch index. When will the high performance version get a replace_dots option?

Thanks.

Handling duplicate records

When using aws-for-fluent-bit to send the records to Elasticsearch via Kinesis Firehose, I see duplicate records in Elasticsearch.

Every record appears twice.

Is this a known issue? Are there any workarounds? This is blocking us from moving FireLens into production.

No concurrency: Only opens single socket

We're using this plugin (firehose) to send data directly to our Kinesis Firehose VPC endpoint, but the throughput is fairly low (it seems to max out at around 5 MB/s). After doing a packet capture, it appears that this plugin only opens a single TCP socket/connection to Firehose -- is there a way to get it to open multiple connections simultaneously, to increase throughput?

We tried using the new 'multiple workers' option mentioned in the documentation for the core plugin (kinesis_firehose), but it didn't seem to have any effect when used with this plugin, presumably because this plugin doesn't support it. If that's the case, and that feature would solve our throughput issue, do you have an estimate on when this plugin will support that 'multiple workers' feature?
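For what it's worth, the workers option is documented for the core C plugin (kinesis_firehose) and, as observed above, has no effect on this external Go plugin. If throughput is the main concern, a core-plugin output with workers would look roughly like this (region and stream name are placeholders):

    [OUTPUT]
        Name            kinesis_firehose
        Match           *
        region          us-east-1
        delivery_stream my-delivery-stream
        workers         4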

Error sending empty batch on shutdown

I see the following error for each configured usage of the kinesis firehose plugin whenever fluent-bit shuts down.

It looks like the plugin is trying to flush any remaining records and attempts to send an empty batch. Not sure if my configuration needs to be adjusted or perhaps some additional validation in the plugin is needed to ensure a batch is only sent if it contains at least one record.

[engine] caught signal (SIGTERM)
[2020/06/30 06:11:45] [ info] [input] pausing tail.0
[2020/06/30 06:11:45] [ info] [input] pausing tail.1
[2020/06/30 06:11:45] [ info] [input] pausing tail.2
[2020/06/30 06:11:45] [ info] [input] pausing tail.3
[2020/06/30 06:11:45] [ info] [input] pausing tail.4
[2020/06/30 06:11:45] [ info] [input] pausing tail.5
[2020/06/30 06:11:45] [ warn] [engine] service will stop in 5 seconds
[engine] caught signal (SIGTERM)
[2020/06/30 06:11:45] [ warn] [engine] service will stop in 5 seconds
[2020/06/30 06:11:50] [ info] [engine] service stopped
time="2020-06-30T06:11:50Z" level=error msg="[firehose 0] PutRecordBatch failed with InvalidParameter: 1 validation error(s) found.\n- minimum field size of 1, PutRecordBatchInput.Records.\n"
time="2020-06-30T06:11:50Z" level=error msg="[firehose 0] InvalidParameter: 1 validation error(s) found.\n- minimum field size of 1, PutRecordBatchInput.Records.\n\n"

I'm running aws-for-fluent-bit 2.3.1 which is bundled with Fluent Bit 1.4.5 and Amazon Kinesis Firehose for Fluent Bit 1.2.1.
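A minimal sketch of the guard this report suggests, assuming aws-sdk-go types; the struct and field names are illustrative stand-ins, not the plugin's real ones:

package main

import (
    "github.com/aws/aws-sdk-go/aws"
    "github.com/aws/aws-sdk-go/service/firehose"
)

// outputPlugin is a stand-in for the plugin's buffered state.
type outputPlugin struct {
    client         *firehose.Firehose
    deliveryStream string
    records        []*firehose.Record
}

// flushBatch skips the API call when nothing is buffered (e.g. the final
// flush on shutdown), so an empty PutRecordBatch is never sent.
func (o *outputPlugin) flushBatch() error {
    if len(o.records) == 0 {
        return nil // avoids "minimum field size of 1, PutRecordBatchInput.Records"
    }
    // (Partial failures via FailedPutCount are ignored here for brevity.)
    _, err := o.client.PutRecordBatch(&firehose.PutRecordBatchInput{
        DeliveryStreamName: aws.String(o.deliveryStream),
        Records:            o.records,
    })
    if err == nil {
        o.records = o.records[:0]
    }
    return err
}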

Support sub-second time reporting

At present, the Time_Format parameter only supports strftime format strings, which do not provide a sub-second feature. This makes it impossible to properly order logs.

Fluent Bit supports %L, which provides nanoseconds. Is this a feasible addition to this Firelens plugin?
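If %L support lands in the plugin's time format handling, a configuration along these lines should produce sub-second timestamps in the record body; the option names (time_key/time_key_format) and %L support are assumptions to verify against the plugin README for your version:

    [OUTPUT]
        Name            firehose
        Match           *
        region          us-east-1
        delivery_stream my-delivery-stream
        time_key        timestamp
        time_key_format %Y-%m-%dT%H:%M:%S.%L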

Records sent to stream arrive in bucket with all null values

I've been trying to configure Fluent Bit with the kinesis firehose output but records sent to the stream are arriving in the destination S3 bucket with all fields assigned a value of null.

Here's an example record:

{"event_id":"534682","endpoint":"getProfile","patronnum":"2586677490","player_id":"823525","httpcode":"","curlerror":"","returncode":"","returntext":"","body":"","points":"","reference_id":"","customerrequestid":"","created":"2021-03-16 19:09:21","request":""}

The firehose stream I'm sending to has JSON to Parquet conversion configured, although it doesn't seem that this is the issue. I've tested sending an exact copy of the above record to the stream from the same server, but using the following AWS CLI command, and it is arriving in the bucket unscathed:

aws --region us-east-1 firehose put-record --delivery-stream-name 'Stream_teststream_testtable_backendevents' --record 'Data="{"event_id":"534682","endpoint":"getProfile","patronnum":"2586677490","player_id":"823525","httpcode":"","curlerror":"","returncode":"","returntext":"","body":"","points":"","reference_id":"","customerrequestid":"","created":"2021-03-16 19:09:21","request":""}"'

I've looked at the FB log but I don't see any errors in there, definitely nothing related to this issue. FYI, this image shows how the records are ending up in the parquet files:

[screenshot: backendevent records in the Parquet output, with every field null]
