aws / amazon-kinesis-firehose-for-fluent-bit
A Fluent Bit output plugin for Amazon Kinesis Data Firehose
License: Apache License 2.0
According to the Kinesis Firehose docs:
The PutRecordBatch operation can take up to 500 records per call or 4 MiB per call, whichever is smaller. This limit cannot be changed.
The throughput limits for a stream can be measured in records per second or MB/s. The default limits for most regions are 1000 records per second and 1 MiB/s. What this means is that if you make a request with 500 records and the total payload is less than 4 MiB, you're not fully using the available throughput of the stream.
Right now, we send 1 log event per record, and in practice log events are usually quite small. This means that in most cases the throughput of the stream is under-utilized.
This is because for some Firehose destinations, like Elasticsearch, each record needs to be an individual log event.
However, many will use Firehose to send to S3, and with S3, multiple log events can be sent in a single record if they are newline-delimited.
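For an S3 destination, the packing described above could be sketched as follows (a minimal illustration, not the plugin's actual implementation; `packEvents` and the per-record limit constant are assumptions based on the Firehose docs):

```go
package main

import (
	"fmt"
	"strings"
)

// Assumed per-record limit: PutRecordBatch allows up to 1,000 KiB per
// record, and 4 MiB / 500 records per call.
const maxRecordSize = 1024 * 1000

// packEvents concatenates newline-delimited log events into as few
// records as possible without any record exceeding maxSize. This is
// only safe for destinations like S3 that accept multiple events per
// record; Elasticsearch needs one event per record.
func packEvents(events []string, maxSize int) []string {
	var records []string
	var buf strings.Builder
	for _, e := range events {
		line := e + "\n"
		// Flush the current record if adding this event would overflow it.
		if buf.Len() > 0 && buf.Len()+len(line) > maxSize {
			records = append(records, buf.String())
			buf.Reset()
		}
		buf.WriteString(line)
	}
	if buf.Len() > 0 {
		records = append(records, buf.String())
	}
	return records
}

func main() {
	recs := packEvents([]string{"a", "b", "c"}, 4)
	fmt.Println(len(recs)) // "a\n"+"b\n" fills 4 bytes; "c\n" starts a second record
}
```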
It looks like empty payloads are being sent to Firehose, and in turn to Elasticsearch; only the timestamp field is populated by Fluent Bit:
{
  "_index": "log-2021-08-26",
  "_type": "_doc",
  "_id": "49621369204884914652732002225806045706578374688214351874.3",
  "_score": 1,
  "_source": {
    "timestamp": "2021-08-26T09:37:02.608335"
  },
  "fields": {
    "timestamp": [
      "2021-08-26T09:37:02.608Z"
    ]
  }
}
Has anyone experienced something similar? Any idea how I could thoroughly debug it? Even with FLB_LOG_LEVEL=debug I can't see much.
When I built the Firehose and CloudWatch plugins, I thought it would be a good idea for them to implement backoff in case of API throttling. That instinct was good, but the execution shows my lack of understanding of Fluent Bit at the time. Fluent Bit has its own retry mechanisms; a plugin can return FLB_RETRY and let Fluent Bit handle it. More importantly, the backoff mechanism currently in use causes the plugin to sleep; since Fluent Bit is single-threaded but concurrent, this sleep stalls all other plugins and thus hurts overall performance.
In practice I suspect few users trigger throttling; however, this is a very easy thing to fix: we just need to remove a bit of code from all the plugins.
Hi,
I can't seem to get the firehose output working. I used the Docker Hub image 1.2.2.
Can someone have a look? It looks like an issue with the image to me.
Config:
fluent-bit.conf: |-
    [SERVICE]
        Flush 2
        Daemon Off
        Config_Watch On
        Parsers_File parsers.conf
        HTTP_Listen 0.0.0.0
        HTTP_Server On
        HTTP_Port 2020
    [INPUT]
        Name tail
        Tag kube.*
        Parser docker
        Path /var/log/containers/*.log
        DB /var/log/flb_kube.db
        Mem_Buf_Limit 5MB
    [OUTPUT]
        Name firehose
        Match kube.*
        region eu-west-1
        delivery_stream spl-traefik-logs
Error:
log is DEPRECATED and will be removed in a future version. Use logs instead.
Output plugin 'firehose' cannot be loaded
Error: You must specify an output target. Aborting
Hi, just FYI, I created a Helm chart for your Fluent Bit plugin.
helm/charts#15830
Would you mind checking the content, documentation, and references?
Cheers!
Right now, you can only configure a single Firehose destination. If you attempt to configure multiple, the final config will overwrite any previous configs. This behavior is not intentional.
Originally reported on the CloudWatch plugin: aws/amazon-cloudwatch-logs-for-fluent-bit#5
If we are using logs as a source of events, we might want to push those logs to Kinesis.
I see that v1.6 of upstream/vanilla Fluent Bit now includes its own output plugin for writing to Firehose, and says this:
This is the documentation for the core Fluent Bit Firehose plugin written in C. It can replace the aws/amazon-kinesis-firehose-for-fluent-bit Golang Fluent Bit plugin released last year.
However, I don't see any corresponding deprecation notice here. So, to clarify:
Currently, all log records greater than the max size are dropped. Ideally we should truncate the JSON instead of discarding it, so that customers can see the partial log and identify why it is so large.
This issue also needs to be fixed in https://github.com/aws/amazon-kinesis-streams-for-fluent-bit
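Truncation could be sketched like this (illustrative only; `truncateRecord` and the marker string are hypothetical, and a real implementation would also need to keep the truncated JSON usable, or at least cut on a UTF-8 rune boundary):

```go
package main

import "fmt"

// truncateRecord trims an oversized record to the limit and appends a
// marker, so the customer sees a partial log instead of losing it.
// Note: slicing bytes can split a multi-byte UTF-8 sequence and will
// usually produce invalid JSON; this sketch ignores both issues.
func truncateRecord(data []byte, limit int) []byte {
	marker := []byte("[truncated]")
	if len(data) <= limit {
		return data // within limits, send as-is
	}
	out := append([]byte{}, data[:limit-len(marker)]...)
	return append(out, marker...)
}

func main() {
	fmt.Println(len(truncateRecord(make([]byte, 2000), 100))) // exactly the limit
}
```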
Skaffold adds Skaffold-specific labels to deployments when it is used to deploy, e.g.
app.kubernetes.io/managed-by=skaffold-v1.12.0
If another label already exists, e.g.
app=app_name
then the firehose -> Elasticsearch transfer fails due to an attempt to map the field twice, once as text and once as an object:
{"attemptsMade":8,"arrivalTimestamp":1595704200773,"errorCode":"400","errorMessage":"{\"type\":\"mapper_parsing_exception\",\"reason\":\"Could not dynamically add mapping for field [app.kubernetes.io/managed-by]. Existing mapping for [kubernetes.labels.app] must be of type object but found text
The dots do not cause issues with the Fluent Bit Elasticsearch plugin, as it has a Replace_Dots option
[OUTPUT]
    Name es
    Replace_Dots On
    ...
but for firehose this isn't the case, and the Elasticsearch transfer fails with
This isn't a Skaffold-specific issue, but rather the lack of Replace_Dots functionality in the firehose plugin. Is there any plan to add a Replace_Dots-type option?
The above problem is due to using the kubernetes filter:
[FILTER]
    Name kubernetes
    Match kube.*
    Kube_URL https://kubernetes.default.svc:443
    Kube_CA_File /var/run/secrets/kubernetes.io/serviceaccount/ca.crt
    Kube_Token_File /var/run/secrets/kubernetes.io/serviceaccount/token
    Kube_Tag_Prefix kube.var.log.containers.
    Merge_Log On
    Merge_Log_Key log_processed
    K8S-Logging.Parser On
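A Replace_Dots-style transform, like the one the core es output provides, could be sketched like this (`replaceDots` is a hypothetical helper, not code from this plugin):

```go
package main

import (
	"fmt"
	"strings"
)

// replaceDots rewrites every '.' in map keys to '_', recursing into
// nested maps, so keys like "app.kubernetes.io/managed-by" no longer
// collide with Elasticsearch's dot-implies-object mapping.
func replaceDots(record map[string]interface{}) map[string]interface{} {
	out := make(map[string]interface{}, len(record))
	for k, v := range record {
		if nested, ok := v.(map[string]interface{}); ok {
			v = replaceDots(nested) // recurse into nested maps
		}
		out[strings.ReplaceAll(k, ".", "_")] = v
	}
	return out
}

func main() {
	labels := map[string]interface{}{
		"app.kubernetes.io/managed-by": "skaffold-v1.12.0",
		"app":                          "app_name",
	}
	fmt.Println(replaceDots(labels)["app_kubernetes_io/managed-by"])
}
```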
time="2019-07-23T04:57:59Z" level=info msg="[firehose] plugin parameter delivery_stream = 'niki-staging-logs'\n"
time="2019-07-23T04:57:59Z" level=info msg="[firehose] plugin parameter region = 'ap-south-1'\n"
time="2019-07-23T04:57:59Z" level=info msg="[firehose] plugin parameter data_keys = ''\n"
time="2019-07-23T04:57:59Z" level=info msg="[firehose] plugin parameter role_arn = 'arn:aws:iam::xxx:role/niki-staging-fluentd'\n"
time="2019-07-23T04:57:59Z" level=info msg="[firehose] plugin parameter endpoint = ''\n"
[2019/07/23 04:57:59] [ info] [storage] initializing...
[2019/07/23 04:57:59] [ info] [storage] in-memory
[2019/07/23 04:57:59] [ info] [storage] normal synchronization mode, checksum disabled, max_chunks_up=128
[2019/07/23 04:57:59] [ info] [engine] started (pid=1)
[2019/07/23 04:57:59] [ warn] [filter_kube] merge_json_log is deprecated, enabling 'merge_log' option instead
[2019/07/23 04:57:59] [ info] [filter_kube] https=1 host=kubernetes.default.svc port=443
[2019/07/23 04:57:59] [ info] [filter_kube] local POD info OK
[2019/07/23 04:57:59] [ info] [filter_kube] testing connectivity with API server...
[2019/07/23 04:57:59] [ info] [filter_kube] API server connectivity OK
[2019/07/23 04:57:59] [ info] [http_server] listen iface=0.0.0.0 tcp_port=2020
[2019/07/23 04:57:59] [ info] [sp] stream processor started
time="2019-07-23T04:58:02Z" level=error msg="[firehose] PutRecordBatch request returned with no records successfully recieved\n"
time="2019-07-23T04:58:03Z" level=error msg="[firehose] PutRecordBatch request returned with no records successfully recieved\n"
time="2019-07-23T04:58:03Z" level=error msg="[firehose] PutRecordBatch request returned with no records successfully recieved\n"
time="2019-07-23T04:58:13Z" level=error msg="[firehose] PutRecordBatch request returned with no records successfully recieved\n"
Firehose destination settings:
Amazon S3 destination
S3 bucket: niki-staging-logs
Prefix: raw/!{timestamp:yyyy/MM-dd}/
Error prefix: error/!{firehose:error-output-type}/!{timestamp:yyyy/MM-dd}/
Buffer conditions: 30 MB or 600 seconds
Compression: Disabled
Encryption: Disabled
No record format conversion
Fluent Bit conf
fluent-bit.conf: |
    [SERVICE]
        Flush 1
        Log_Level info
        Daemon off
        Parsers_File parsers.conf
        HTTP_Server On
        HTTP_Listen 0.0.0.0
        HTTP_Port 2020
    @INCLUDE input-kubernetes.conf
    @INCLUDE filter-grep.conf
    @INCLUDE filter-kubernetes.conf
    @INCLUDE output-firehose.conf
input-kubernetes.conf: |
    [INPUT]
        Name tail
        Tag app.*
        Path /var/log/containers/*_app_*.log
        Parser docker
        DB /var/log/flb_kube_ns.db
        Mem_Buf_Limit 30MB
        Skip_Long_Lines On
        Refresh_Interval 10
filter-grep.conf: |
    [FILTER]
        Name grep
        Match *
        Exclude log /health
filter-kubernetes.conf: |
    [FILTER]
        Name kubernetes
        Match app.*
        Kube_URL https://kubernetes.default.svc:443
        Kube_CA_File /var/run/secrets/kubernetes.io/serviceaccount/ca.crt
        Kube_Token_File /var/run/secrets/kubernetes.io/serviceaccount/token
        Regex_Parser ns_k8s_parser
        Merge_JSON_Log On
output-firehose.conf: |
    [OUTPUT]
        Name firehose
        Match app.*
        region ap-south-1
        role_arn arn:aws:iam::xxx:role/niki-staging-fluentd
        delivery_stream niki-staging-logs
parsers.conf: |
    [PARSER]
        Name first_line
        Format regex
        Regex ^{"log":"(?!\\u0009)(?<log>\S(?:(\\")|[^"]){9}(?:(\\")|[^"])*)"
    [PARSER]
        Name nested_json
        Format json
        Time_Keep true
        Time_Key time
        Time_Format %Y-%m-%dT%H:%M:%S.%L
        Decode_Field_As json log do_next
        Decode_Field_As escaped message
    [PARSER]
        Name ns_k8s_parser
        Format regex
        Regex (?<tag>[^.]+)?\.?(?<pod_name>[^_]+)_(?<namespace_name>[^_]+)_(?<container_name>.+-(?<docker_id>[a-z0-9]{64}))\.log$
    [PARSER]
        Name docker
        Format json
        Time_Key time
        Time_Format %Y-%m-%dT%H:%M:%S.%L
        Time_Keep On
        Decode_Field_As json log
What am I doing wrong?
Describe the bug
I am running fluent-bit 1.2.2 with the systemd input and the AWS Kinesis Firehose output. systemd does not generate the timestamp field inside the log, but puts it before the log body, like this:
[0] host.unknown: [1565706755.786588000, {"TRANSPORT"=>"syslog", "PRIORITY"=>"6", "SYSLOG_FACILITY"=>"10", "SYSLOG_IDENTIFIER"=>"sshd", "UID"=>"1000", "GID"=>"1000", "BOOT_ID"=>"5601a829777c459e845852afc9bcbeaf", "MACHINE_ID"=>"e14407097f0e4283a492f816e4bf6e79", "HOSTNAME"=>"ip-172-16-5-69.us-east-2.compute.internal", "SYSLOG_PID"=>"11971", "PID"=>"11971", "MESSAGE"=>"Disconnected from 172.16.12.137 port 51494", "SOURCE_REALTIME_TIMESTAMP"=>"1565706755768498"}]
When sending such a message to the es output, it works fine since that output adds the timestamp, but the Kinesis Firehose output seems to strip anything outside of the JSON, so log messages look like this:
{"BOOT_ID":"53fb93cfe3664a0180672a247a4826cd","CAP_EFFECTIVE":"3fffffffff","CMDLINE":"/sbin/auditd","COMM":"auditd","EXE":"/usr/sbin/auditd","GID":"0","HOSTNAME":"ip-172-16-14-74.us-east-2.compute.internal","MACHINE_ID":"e14407097f0e4283a492f816e4bf6e79","MESSAGE":"Audit daemon rotating log files with keep option","PID":"15321","PRIORITY":"5","SOURCE_REALTIME_TIMESTAMP":"1565700926388220","SYSLOG_FACILITY":"3","SYSLOG_IDENTIFIER":"auditd","SYSLOG_PID":"15321","SYSTEMD_CGROUP":"/system.slice/auditd.service","SYSTEMD_SLICE":"system.slice","SYSTEMD_UNIT":"auditd.service","TRANSPORT":"syslog","UID":"0"}
As a result, I don't have a date field in AWS Elasticsearch. Parsing log messages for a timestamp is not an option, since they don't have a consistent place to extract it from.
Can anything be done about this?
Version used: 1.2.2 with Kinesis output plugin
Configuration:
fluent-bit-service.conf: |-
    [SERVICE]
        Flush 5
        Daemon Off
        Log_Level info
        HTTP_Server On
        HTTP_Listen 0.0.0.0
        HTTP_PORT 2020
        Parsers_File parsers.conf
fluent-bit-input.conf: |-
    [INPUT]
        Name systemd
        Tag host.*
        Read_From_Tail true
        Strip_Underscores true
fluent-bit-filter.conf: |-
    [FILTER]
        Name modify
        Match host.*
        Add SYSTEMD_UNIT unspecified
fluent-bit-output.conf: |-
    [OUTPUT]
        Name firehose
        Match *
        region us-west-2
        delivery_stream my-stream
Running in EKS.
Today, if Firehose returns a throttling error, the plugin just discards the records and the log data is lost.
We have a Fluent Bit DaemonSet pushing data to Firehose: there are 4 instances (a pod on each of the 4 nodes of the k8s cluster), and all 4 send data to the same Firehose delivery stream. The limit we are hitting is the 1 MB/s data rate; like most regions, AWS Canada (Central) supports only 1 MB/s. I tried the simple_aggregation configuration, but it doesn't help much; rather, it contributes to the problem of spikes.
Also, we don't have a lot of data. There are rare random spikes that cross the 1 MB/s limit; for the most part, the data rate is well under 1 MB/s. So increasing the Firehose limit just for these spikes doesn't seem reasonable.
There are two ways to handle it.
For option 2:
The plugin should return a retry to the Fluent Bit engine whenever a throttling error is returned by Firehose. Ref: https://docs.fluentbit.io/manual/administration/scheduling-and-retries
There should also be a configuration parameter to control the retry interval and the number of attempts in case of error.
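The engine-level retry knobs from the scheduling-and-retries page linked above would look roughly like this (values and the stream name are illustrative, not a recommendation):

```
[SERVICE]
    # base and cap (seconds) for the engine's exponential backoff
    scheduler.base 5
    scheduler.cap  60

[OUTPUT]
    Name            firehose
    Match           *
    region          ca-central-1
    delivery_stream my-stream
    # the engine retries a failed chunk up to this many times
    Retry_Limit     5
```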
With the introduction of AWS Graviton2, we observe customer interest in https://github.com/aws/aws-for-fluent-bit, which in turn depends on an Arm64 build of amazon-kinesis-firehose-for-fluent-bit being available.
Here's the associated issue in aws-for-fluent-bit - aws/aws-for-fluent-bit#44
Hello, for logs without timestamps, how can I add the timestamp from Fluent Bit to the record sent to Kinesis?
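If I'm reading the plugin's README correctly, the firehose output exposes time_key and time_key_format options that add Fluent Bit's own timestamp to each record; a sketch (stream name, region, and key name are placeholders):

```
[OUTPUT]
    Name             firehose
    Match            *
    region           us-east-1
    delivery_stream  my-stream
    # add Fluent Bit's timestamp to each record under this key
    time_key         timestamp
    time_key_format  %Y-%m-%dT%H:%M:%S
```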
Hi team, I am planning to use this plugin in our production cluster. After going through the documentation, it looks like there is a new high performance version available, but it lacks the replace_dots option. I require that option to avoid some mapping conflicts with an existing Elasticsearch index. I would like to know when the high performance version will have the replace_dots option.
Thanks.
When using aws-for-fluent-bit to send records to Elasticsearch via Kinesis Firehose, I see duplicate records in Elasticsearch.
Every record appears twice.
Is this a known issue? Any workarounds? This is blocking us from moving FireLens into production.
We're using this plugin (firehose) to send data directly to our Kinesis Firehose VPC endpoint, but the throughput seems fairly low (it appears to max out at around 5 MB/s). After doing a packet capture, it appears this plugin only opens a single TCP socket/connection to Firehose -- is there a way to get it to open multiple connections simultaneously to increase throughput?
We tried the new 'multiple workers' option mentioned in the documentation for the core plugin (kinesis_firehose), but it didn't seem to have any effect when used with this plugin, presumably because this plugin doesn't support it. If that's the case, and that feature would solve our throughput issue, do you have an estimate of when this plugin will support the 'multiple workers' feature?
I see the following error for each configured usage of the Kinesis Firehose plugin whenever Fluent Bit shuts down.
It looks like the plugin tries to flush any remaining records and attempts to send an empty batch. I'm not sure if my configuration needs to be adjusted, or if some additional validation in the plugin is needed to ensure a batch is only sent if it contains at least one record.
[engine] caught signal (SIGTERM)
[2020/06/30 06:11:45] [ info] [input] pausing tail.0
[2020/06/30 06:11:45] [ info] [input] pausing tail.1
[2020/06/30 06:11:45] [ info] [input] pausing tail.2
[2020/06/30 06:11:45] [ info] [input] pausing tail.3
[2020/06/30 06:11:45] [ info] [input] pausing tail.4
[2020/06/30 06:11:45] [ info] [input] pausing tail.5
[2020/06/30 06:11:45] [ warn] [engine] service will stop in 5 seconds
[engine] caught signal (SIGTERM)
[2020/06/30 06:11:45] [ warn] [engine] service will stop in 5 seconds
[2020/06/30 06:11:50] [ info] [engine] service stopped
time="2020-06-30T06:11:50Z" level=error msg="[firehose 0] PutRecordBatch failed with InvalidParameter: 1 validation error(s) found.\n- minimum field size of 1, PutRecordBatchInput.Records.\n"
time="2020-06-30T06:11:50Z" level=error msg="[firehose 0] InvalidParameter: 1 validation error(s) found.\n- minimum field size of 1, PutRecordBatchInput.Records.\n\n"
I'm running aws-for-fluent-bit 2.3.1 which is bundled with Fluent Bit 1.4.5 and Amazon Kinesis Firehose for Fluent Bit 1.2.1.
At present, the Time_Format parameter only supports strftime format strings, which do not provide a sub-second field. This makes it impossible to order logs properly.
Fluent Bit supports %L, which provides nanoseconds. Is this a feasible addition to this FireLens plugin?
I've been trying to configure Fluent Bit with the kinesis firehose output but records sent to the stream are arriving in the destination S3 bucket with all fields assigned a value of null.
Here's an example record:
{"event_id":"534682","endpoint":"getProfile","patronnum":"2586677490","player_id":"823525","httpcode":"","curlerror":"","returncode":"","returntext":"","body":"","points":"","reference_id":"","customerrequestid":"","created":"2021-03-16 19:09:21","request":""}
The firehose stream I'm sending to has JSON to Parquet conversion configured, although it doesn't seem that this is the issue. I've tested sending an exact copy of the above record to the stream from the same server, but using the following AWS CLI command, and it is arriving in the bucket unscathed:
aws --region us-east-1 firehose put-record --delivery-stream-name 'Stream_teststream_testtable_backendevents' --record 'Data="{"event_id":"534682","endpoint":"getProfile","patronnum":"2586677490","player_id":"823525","httpcode":"","curlerror":"","returncode":"","returntext":"","body":"","points":"","reference_id":"","customerrequestid":"","created":"2021-03-16 19:09:21","request":""}"'
I've looked at the Fluent Bit log, but I don't see any errors in there, and definitely nothing related to this issue. FYI, this image shows how the records are ending up in the Parquet files: