Comments (57)
Kafka output is a very useful feature since, in our case, every log goes through Kafka and is then rerouted to stateful services like Elasticsearch, Hadoop, or other real-time streaming systems.
Actually, this is the only feature I'm waiting for before using Fluent Bit, since I want to use Docker with the fluentd log driver and then forward logs to Kafka.
@edsiper do you have an ETA for when you'll be able to start working on it?
from fluent-bit.
BTW, are we OK to close this one?
Thanks @edsiper
What are the benefits of working against Kafka REST instead of the native TCP protocol?
hello!
I wanted to share with you a preview of the new out_kafka plugin. It still needs some work, but it's in a functional stage. For now I've pushed a Docker image that includes Fluent Bit 0.12.8 + out_kafka; you can get it from here:
fluent/fluent-bit-kafka-dev
That image is totally experimental and should not be run in production. From a configuration perspective, the plugin requires an output section like this:
[OUTPUT]
    Name          kafka
    Match         *
    Brokers       127.0.0.1
    Topic         sometopic
    Timestamp_Key @timestamp
or run it directly from the command line:
$ docker run -ti fluent/fluent-bit-kafka-dev /fluent-bit/bin/fluent-bit -i cpu -o kafka -p brokers=IP -f 1
Please send your feedback.
@StevenACoffman actually I am working on that at the moment.
@edsiper I really want to know: when will v0.13 be released?
Added to the to-do list. Thanks.
@panda87 between Q2/Q3
Is there ongoing work on this? There is https://github.com/samsung-cnct/fluent-bit-kafka-output-plugin, but it needs more work, configurability in particular. If a native implementation is on its way, I could try to contribute there instead.
@solsson not implemented yet, as said, between Q2/Q3.
Yes, I read that and interpreted it as 2017-06-30T23:59:59 :) So I figured there's some source somewhere. But I will give the Go implementation a chance.
FYI:
On GIT Master I've pushed a new Kafka REST output plugin.
The plugin is still under development but functional; any feedback is welcome. You can find a configuration example here
On GIT Master I've pushed a new Kafka REST output plugin.
Interesting. I'll give it a try once there's a new build of fluent/fluent-bit:0.12-dev.
@panda87: both plugins will be implemented, kafka-rest and kafka (native TCP); this is a snapshot of kafka-rest.
@solsson fluent/fluent-bit:0.12-dev is under the build process (it should take ~20 min)
Thanks @edsiper
I quickly tested this as a branch from https://github.com/fluent/fluent-bit-kubernetes-daemonset/, with Yolean/kubernetes-kafka#45, but no luck. Logs only say:
[2017/07/25 06:10:33] [ info] [engine] started
[2017/07/25 06:10:33] [ info] [filter_kube] https=1 host=kubernetes.default.svc port=443
[2017/07/25 06:10:33] [ info] [filter_kube] local POD info OK
[2017/07/25 06:10:33] [ info] [filter_kube] testing connectivity with API server...
[2017/07/25 06:10:33] [ info] [filter_kube] API server connectivity OK
No trace of connections to kafka rest. I don't have time to dig into this now. Maybe it has to do with the upgrade from 0.11 to 0.12.
Edit: The plugin works just fine. I had two errors that obscured each other, and because no logs were read I got no error message in the container's stdout. Diff documented as Yolean/fluent-bit-kubernetes-logging#1.
I get a lot of logs in Kafka from the fluent-bit container itself saying
[ warn] [out_kafka_rest] http_do=-1
[error] [http_client] broken connection to rest.kafka.svc.cluster.local:80
correctly logged (in the Kafka sense) as
{"@timestamp":"2017-07-25T11:47:39.53173578Z","_fluent-tag":"kube.var.log.containers.fluent-bit-3tk62_kube-system_fluent-bit-9524b0051905794bc305d80567fb157524ad1a888ee76c07f6d9135ae1a2bc1b.log","log":"[2017/07/25 11:47:39] [ warn] [out_kafka_rest] http_do=-1\\n","stream":"stderr","time":"2017-07-25T11:47:39.053173578Z","kubernetes":{"pod_name":"fluent-bit-3tk62","namespace_name":"kube-system","container_name":"fluent-bit","docker_id":"9524b0051905794bc305d80567fb157524ad1a888ee76c07f6d9135ae1a2bc1b","pod_id":"ad6edb72-712e-11e7-9508-080027faebd3","labels":{"controller-revision-hash":"2302797867","k8s-app":"fluent-bit-logging","kubernetes.io/cluster-service":"true","pod-template-generation":"1","version":"v1"},"annotations":{"kubernetes.io/created-by":"{\\\"kind\\\":\\\"SerializedReference\\\",\\\"apiVersion\\\":\\\"v1\\\",\\\"reference\\\":{\\\"kind\\\":\\\"DaemonSet\\\",\\\"namespace\\\":\\\"kube-system\\\",\\\"name\\\":\\\"fluent-bit\\\",\\\"uid\\\":\\\"ad6ab493-712e-11e7-9508-080027faebd3\\\",\\\"apiVersion\\\":\\\"extensions\\\",\\\"resourceVersion\\\":\\\"782159\\\"}}\\n"}}}
{"@timestamp":"2017-07-25T11:47:39.54767537Z","_fluent-tag":"kube.var.log.containers.fluent-bit-3tk62_kube-system_fluent-bit-9524b0051905794bc305d80567fb157524ad1a888ee76c07f6d9135ae1a2bc1b.log","log":"[2017/07/25 11:47:39] [error] [http_client] broken connection to rest.kafka.svc.cluster.local:80 ?\\n","stream":"stderr","time":"2017-07-25T11:47:39.054767537Z","kubernetes":{"pod_name":"fluent-bit-3tk62","namespace_name":"kube-system","container_name":"fluent-bit","docker_id":"9524b0051905794bc305d80567fb157524ad1a888ee76c07f6d9135ae1a2bc1b","pod_id":"ad6edb72-712e-11e7-9508-080027faebd3","labels":{"controller-revision-hash":"2302797867","k8s-app":"fluent-bit-logging","kubernetes.io/cluster-service":"true","pod-template-generation":"1","version":"v1"},"annotations":{"kubernetes.io/created-by":"{\\\"kind\\\":\\\"SerializedReference\\\",\\\"apiVersion\\\":\\\"v1\\\",\\\"reference\\\":{\\\"kind\\\":\\\"DaemonSet\\\",\\\"namespace\\\":\\\"kube-system\\\",\\\"name\\\":\\\"fluent-bit\\\",\\\"uid\\\":\\\"ad6ab493-712e-11e7-9508-080027faebd3\\\",\\\"apiVersion\\\":\\\"extensions\\\",\\\"resourceVersion\\\":\\\"782159\\\"}}\\n"}}}
@edsiper Do you think this is an issue with my configuration, or a non-error being logged by the output plugin?
@solsson the error is that Fluent Bit is not able to connect to rest.kafka.svc.cluster.local TCP port 80
FYI: Documentation updated:
http://fluentbit.io/documentation/0.12/output/kafka-rest-proxy.html
What's the status on out_kafka (native)? Alternatively, what's the status on fluent-bit-go wrt 0.12?
I saw fluent/fluent-bit-go#6 (comment) now.
For what it is worth, we would really like to produce directly to kafka instances without the REST proxy.
Yeah, me too; it feels like the extra layer is both a performance and a reliability risk. In the meantime I've been experimenting with filebeat's Kafka support, which produces at the broker level, though its memory footprint looks like 10x that of fluent-bit: Yolean/kubernetes-kafka#88
As a fun experiment I've also tried tail -f + kafkacat, which runs on a tenth of filebeat's memory but of course doesn't remember the position and can't add k8s metadata.
Didn't have time to try fluent-bit-go further, and judging from its history I'd have to port such code for each new fluent-bit minor release.
Reliability is our main concern. If we have an init container that can bootstrap the Kafka cluster IP addresses, the normal Kafka producer will adjust as Kafka nodes come and go. It looks like fluent-bit-go is being maintained, and the uncertainty of this ticket is depressing maintenance of fluent-bit-kafka-output-plugin. Thanks for the pointer.
@solsson Did you notice fluent/fluentd-kubernetes-daemonset#34? I was thinking of building fluentd 0.14.22 now that it's stable and comparing its memory and CPU usage to filebeat and fluent-bit.
@StevenACoffman Actually I didn't try fluentd, as I was so impressed with the scope and footprint of fluent-bit, and there was a kafka output scheduled for "between Q2/Q3" ... and then ofc I also needed to work on how to process those logs once they're in Kafka :)
Hello everyone,
Thanks for sharing your comments and interest. I wanted to let you know that the plugin is already in the development phase, so I will keep you posted about it :)
Great news! I would love to see this feature.
@solsson I forked the official fluentd repo over at StevenACoffman/fluentd-kubernetes-daemonset#1 to update it to fluentd 0.14.22 and added the kafka plugin. All those images got pushed to my public Docker Hub, but I haven't benchmarked them yet. I really appreciated your work in the Yolean/kubernetes-kafka repo!
@edsiper I've said it before, and I'll probably say it again, but very nice work! I'd much prefer to use fluent-bit with a direct kafka plugin, so I'll wait on that.
I gave it a quick spin, resulting in fluent/fluent-bit-kubernetes-logging#11. @edsiper, BTW, thanks for the refactoring there into a ConfigMap instead of a custom Docker build. I used a fresh minikube (meaning I have no meaningful indications of performance or resource use yet).
I do get a healthy amount of sane-looking messages in Kafka. I also see some errors like [error] [out_kafka] fluent-bit#producer-1: [thrd:kafka-1.broker.kafka.svc.cluster.local:9092/bootstrap]: kafka-1.broker.kafka.svc.cluster.local:9092/1: Receive failed: Disconnected with no obvious disturbance in Kafka, but too little to draw any conclusions.
I also see lots of [ warn] [filter_kube] could not pack merged json but I have yet to try to figure out if that means we lose messages. TODO: compare message count to that from filebeat.
An interesting question is what to use as the key? In a production cluster the logs topic will most certainly need partitioning, thus order is only preserved within keys. The obvious choice would be to use namespace_name + _ + pod_name as the key, but I've been thinking about other approaches while evaluating log streaming approaches ...
This would obviously need to be configurable, but: JSON-formatted keys are not uncommon in Kafka. How about putting the JSON structure in the key, omitting the timestamp and the message? Then put the verbatim message as the value. Kafka stores a timestamp per record already, and log records tend to include one, so the @timestamp isn't strictly necessary. The advantage of this is that you can consume messages as you would log streams directly from the host, possibly filtering on key.
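To make the proposal concrete, here is a rough sketch in plain Python of splitting an enriched record into a JSON key and a verbatim value. The field names follow the filter_kube output shown earlier in this thread; this is an illustration of the idea, not the plugin's actual behavior.

```python
import json

def to_kafka_record(record):
    """Split an enriched log record into a Kafka (key, value) pair:
    all metadata except the timestamp and the message becomes a JSON
    key, and the verbatim log line becomes the value."""
    key = {k: v for k, v in record.items() if k not in ("@timestamp", "log")}
    value = record["log"]
    return json.dumps(key, sort_keys=True).encode(), value.encode()

key, value = to_kafka_record({
    "@timestamp": "2017-07-25T11:47:39Z",
    "log": "app started\n",
    "kubernetes": {"namespace_name": "kube-system",
                   "pod_name": "fluent-bit-3tk62"},
})
```

Consumers could then filter on key fields while reading values as a plain log stream.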
Great work @edsiper. I'll do further testing next week.
@solsson thanks for testing. Comments below:
I do get a healthy amount of sane looking messages in kafka. I also see some errors
like [error] [out_kafka] fluent-bit#producer-1: [thrd:kafka-1.broker.kafka.svc.cluster.
local:9092/bootstrap]: kafka-1.broker.kafka.svc.cluster.local:9092/1: Receive
failed: Disconnected with no obvious disturbance in kafka, but too little to draw
any conclusions.
By default all errors are enabled, but this is definitely a TCP disconnect from the server side (or a network problem).
I also see lots of [ warn] [filter_kube] could not pack merged json but I have yet to
try to figure out if that means we lose messages. TODO compare message count to
that from filebeat.
When the Merge_JSON_Log option is enabled on filter_kubernetes, the filter tries to interpret the log content as a JSON map (stripping slashes first). If the resulting JSON map is invalid, the filter raises the "could not pack merged json" warning, so it's likely that the app that generated that JSON log produced an invalid entry.
When this happens, the log is not lost; the nested JSON is just handled as a string.
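The fallback behavior described above can be sketched in a few lines of Python (an illustration of the logic, not the actual filter_kubernetes code; the slash-stripping step is omitted for brevity):

```python
import json

def merge_json_log(record):
    """Try to parse the log field as a JSON map and merge it into the
    record; on failure keep the log as a plain string, so the record
    is never lost."""
    try:
        parsed = json.loads(record["log"])
        if isinstance(parsed, dict):
            return {**record, **parsed}
    except ValueError:
        pass  # this is the "could not pack merged json" case
    return record

ok = merge_json_log({"log": '{"level": "info", "msg": "ready"}'})
bad = merge_json_log({"log": '{"level": "info", "msg": truncated'})
```

The second record keeps its original string log field instead of being dropped.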
An interesting question is what to use as key? In a production cluster the logs topic
will most certainly need partitioning, thus order is only preserved within keys. The
obvious choice would be to use namespace_name + _ + pod_name as key, but I've
been thinking about other approaches while evaluating logs streaming approaches ...
I think in this case it needs to be flexible; as stated in fluent/fluent-bit-kubernetes-logging#11 (comment), I will work on a way to compose values at runtime using the record fields.
This would obviously need to be configurable, but: JSON formatted keys are not uncommon
in Kafka. How about putting the json structure in the key, omitting the timestamp and the
message? Then put the verbatim message as value. Kafka stores a timestamp per record
already, and log records tend to include one, so the @timestamp isn't strictly necessary.
The advantage of this is that you can consume messages as you would log streams
directly from the host, possibly filtering on key.
About putting the payload in the key... hmm, I would ask for more feedback; I don't have a strong opinion about it.
About the timestamp: this is required. From a consumer perspective, you don't know whether you are analyzing archived logs that only recently arrived in Kafka.
In our home-grown log collector/aggregator/forwarder, our structured log messages have a dest key that maps to an array of Kafka topics (destinations). That way the application can pick its own topics at different points. If no destination key is specified (or if it is an unstructured log message), then it is routed to a default app-specific key. Usually this is full of stack traces and other stuff.
If fluent-bit's out_kafka could produce to multiple topics when they were comma-separated, that would require much less configuration than in fluentd, where we need to use record_reformer to add the field to the tag, use the tag routing system, and use the tag as the topic to create multiple topics per data stream.
Alternatively, we could produce all messages to a static topic and have a consumer/producer that would perform fanout there.
Thanks @StevenACoffman, so basically the plugin needs:
- add support for multiple topics (multiple producers)
- built-in routing capability: given a record field, route to a specific topic based on a pattern
We've seen this need for multiple topics too, but I was guessing it could be accomplished using regular routing or the Match field in [OUTPUT]. Can't Match use anything other than Tags? Alternatively, can parsers or filters add Tags, for example per regex match?
Another question: I've noticed that a significant percentage (~30%) of the data produced is for the kubernetes.io/created-by annotations. I like the inclusion of annotations, but is there some way to exclude that particular one? If not, I'd rather have an option to exclude the annotations key from payloads. It'll be a tradeoff with both write performance and cost.
About the timestamp this is required, from a consumer perspective you don't know if you are analyzing archive logs that arrived recently to kafka or not.
Producers can set this timestamp, if I'm not mistaken, and consumers can read it. The idea was only to handle @timestamp in a Kafka-specific way, not to omit it.
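The ~30% figure above can be gauged with a quick back-of-the-envelope sketch. The record below is synthetic and the annotation blob is a stand-in for a real SerializedReference payload, so the exact numbers are illustrative only:

```python
import json

# Synthetic record shaped like the filter_kube output earlier in this
# thread; the 400-character blob stands in for kubernetes.io/created-by.
record = {
    "log": "app started\n",
    "kubernetes": {
        "pod_name": "fluent-bit-3tk62",
        "namespace_name": "kube-system",
        "labels": {"k8s-app": "fluent-bit-logging"},
        "annotations": {"kubernetes.io/created-by": "x" * 400},
    },
}

full = len(json.dumps(record))
del record["kubernetes"]["annotations"]
trimmed = len(json.dumps(record))
saved = 1 - trimmed / full  # fraction of payload spent on annotations
```

For short log lines the annotations can easily dominate the payload, which is why an exclusion option matters for write throughput and storage cost.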
To clarify, I'd be happy to put this in production with the current feature set. Wouldn't want to wish for things that delay a release. If you ask me to prioritize after that, I'd currently go:
- Key based on container name (prefixed by namespace would be a bonus)
- Some way to omit kubernetes.io/created-by or the entire annotations.
There's always the option to produce to a topic with short retention, then transform the payload at will to another topic.
[UPDATE]
I've pushed a new version of the image with the following changes:
- Support multiple topics based on record key
- Small cleanups
Image fluent/fluent-bit-kafka-dev:0.2
New configuration schema is as follows:
[OUTPUT]
    Name          kafka
    Match         *
    Brokers       127.0.0.1
    Topics        topicA,topicB,topicC
    Topic_Key     router
    Timestamp_Key @timestamp
If you look carefully, the configuration now has a Topics parameter instead of Topic; you can set multiple topics separated by commas. The default topic is the first one on the left.
Optionally, a new Topic_Key parameter was added. If set, for each incoming record the plugin looks up the record field named by Topic_Key (router in this example); if that field's value matches one of the topics previously set in Topics, it is used as the topic.
Examples of records that are routed to specific topics:
{"key": 12345, "router": "topicB"}
{"key": 12345, "router": "topicA"}
As usual please send your feedback.
@edsiper So this appears to work very well and opens a host of options to us. As always, you are amazing!
Question: I assume that this plugin supports a Retry_Limit (we like to set it to false) like the others, so what if we route to both the out_kafka plugin and an out_es, and one succeeds while the other fails? Would fluent-bit retry both? The documentation diagram shows the buffer before route and output.
@StevenACoffman thanks!
The retry logic is what is missing at the moment in this new plugin; it will be implemented shortly.
[UPDATE]
I've pushed a new version 0.3:
fluent/fluent-bit-kafka-dev:0.3
Changes
- New configuration parameter Format: by default the output record format is json; optionally it can be set to msgpack (for high-performance environments)
@edsiper We've run this in parallel with other solutions for a week to compare it to alternatives, and had good success. There were only a few missed messages due to transient network glitches, so we're just waiting for the retry logic to call it good. Thanks!
@edsiper just checking in. For our planning, does it still seem likely that the retry logic will be added in the near term?
New version has been pushed:
fluent/fluent-bit-kafka-dev:0.4
Please test and send me some feedback (retry logic is in place).
Any feedback on the last version provided?
@edsiper see fluent/fluent-bit-kubernetes-logging/pull/11
Sorry, I haven't had a chance to set up a public repo to reliably and reproducibly test it. I will try to do that today.
@edsiper Is there any way to pass debug flags to librdkafka? With kafkacat it's very useful to add -d broker,topic if problems occur.
I was having some issues that were internal to kafka and it was difficult to sort out whether it was the retry logic or kafka's config that was problematic.
Overall impression is that it is pretty solid, but I haven't fully explored different retry limit scenarios.
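Assuming out_kafka passes rdkafka.* properties straight through to librdkafka's configuration (as the rdkafka.* options discussed later in this thread suggest), the equivalent of kafkacat's -d flag might look like this output section (an untested sketch; sometopic is a placeholder):

```
[OUTPUT]
    Name          kafka
    Match         *
    Brokers       127.0.0.1
    Topics        sometopic
    rdkafka.debug broker,topic
```

librdkafka's debug property takes the same comma-separated contexts (broker, topic, msg, ...) that kafkacat exposes via -d.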
@solsson would you please re-submit the PR on 0.13-dev branch https://github.com/fluent/fluent-bit-kubernetes-logging/tree/0.13-dev ?
Note: I am pushing a new image and docs for Fluent Bit 0.13-dev on Kubernetes; the old kafka image will not be updated. Please check here for more details:
https://github.com/fluent/fluent-bit-kubernetes-logging/tree/0.13-dev
@wukq I would like to have it released on January 31 (I am focusing on that), but as you know, everything will depend on general feedback and required improvements.
For now I will keep updating the 0.13-dev image; if more people test it, we will be more confident about the current status.
@solsson would you please re-submit the PR on 0.13-dev branch https://github.com/fluent/fluent-bit-kubernetes-logging/tree/0.13-dev ?
@edsiper done in fluent/fluent-bit-kubernetes-logging#16
Fantastic work with the new version. I've enabled the Prometheus exporter in our QA, and fluentbit_input_bytes_total is immensely useful to spot unexpectedly high volumes. Also fluentbit_output_retries_total should help us track issues with the Retry_Limit tests from fluent/fluent-bit-kubernetes-logging#11 (comment). Haven't had any retries yet though :)
FYI: the 0.13-dev image has been updated to 0.6:
https://github.com/fluent/fluent-bit-kubernetes-logging/tree/0.13-dev (fixed)
Note: this one fixes a crash found in the Prometheus exporter.
FYI: 0.13-dev image has been updated to 0.7:
https://github.com/fluent/fluent-bit-kubernetes-logging/tree/0.13-dev
Changes in this version include fixes in memory handling, a new configuration option for out_kafka to adjust rdkafka internals, and in_tail fixes.
IMO out_kafka is feature complete now.
I agree. The newest improvements, notably the new configuration option for out_kafka to adjust rdkafka internals and limit the Kafka buffer, make it a complete feature now. It has been pretty solid in my testing over the last 24 hours, but I'm going to try putting a few million messages through it and see how it holds up.
@solsson @StevenACoffman should I set these settings by default in the plugin? What do you think?
- rdkafka.log.connection.close false
- rdkafka.queue.buffering.max.kbytes 10240
- rdkafka.request.required.acks 1
Note: of course they can be overridden by config at any time.
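Spelled out as an output section, those proposed defaults would look like this (a sketch based on the option names above; Brokers and Topics values are placeholders):

```
[OUTPUT]
    Name     kafka
    Match    *
    Brokers  127.0.0.1
    Topics   sometopic
    rdkafka.log.connection.close       false
    rdkafka.queue.buffering.max.kbytes 10240
    rdkafka.request.required.acks      1
```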
I can see people adjusting them, but those look pretty good as defaults to me.
I'm unsure. For users of the Kubernetes manifests it's quite transparent that the defaults come from https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md, and the suggested overrides are visible in plain text.
Without the sample manifests, I'd say you definitely want rdkafka.log.connection.close to avoid support requests; the [error]-level log entries for a non-issue fooled me for sure. Maybe the same goes for rdkafka.queue.buffering.max.kbytes, if we can confirm that it solves the memory spikes. But the value I set was just a guess. On the other hand, again for transparency, this might be better placed in the output plugin's docs.
Thanks for the feedback. I will document that in the plugin docs.