fluent / fluent-logger-java
A structured logger for Fluentd (Java)
License: Apache License 2.0
When we try to use the Fluentd Java logger, it throws the following NullPointerException when the Fluentd server is down:
An exception occurred processing Appender fluentd java.lang.NullPointerException at org.fluentd.logger.sender.RawSocketSender.flushBuffer(RawSocketSender.java:158) at org.fluentd.logger.sender.RawSocketSender.send(RawSocketSender.java:168) at org.fluentd.logger.sender.RawSocketSender.emit(RawSocketSender.java:149) at org.fluentd.logger.sender.RawSocketSender.emit(RawSocketSender.java:131) at org.fluentd.logger.sender.RawSocketSender.emit(RawSocketSender.java:126) at org.fluentd.logger.FluentLogger.log(FluentLogger.java:101) at org.fluentd.logger.FluentLogger.log(FluentLogger.java:86)
The source of the problem is that when the Fluentd server is down, the socket in the RawSocketSender object is null, so the call to RawSocketSender.flushBuffer (line 152) throws the NullPointerException when executing the following instruction (RawSocketSender.flushBuffer, line 158):
LOG.error("Cannot send logs to " + socket.getInetAddress().toString());
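A minimal defensive sketch of the fix (guarding the null socket before building the error message; the class and method names below are stand-ins, not the project's actual patch):

```java
import java.net.Socket;

public class SafeFlushExample {
    // Hypothetical stand-in for RawSocketSender's socket field,
    // which is null while the Fluentd server is unreachable.
    private Socket socket = null;

    String describeTarget() {
        // Guard against the null socket instead of calling
        // socket.getInetAddress() unconditionally.
        return socket != null
                ? socket.getInetAddress().toString()
                : "unknown destination (not connected)";
    }

    public static void main(String[] args) {
        SafeFlushExample sender = new SafeFlushExample();
        System.out.println("Cannot send logs to " + sender.describeTarget());
    }
}
```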
message:
[tag, time, record]
or
[tag, [[time,record], [time,record], ...]]
For bulk writing, could we have a way to send the second form?
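To make the two shapes concrete, here is an illustrative sketch that builds both structures with plain Java collections (the real logger MessagePack-encodes them on the wire; the tag, timestamps, and record contents here are mine):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class ForwardModeExample {
    public static void main(String[] args) {
        long now = 1700000000L; // example epoch seconds
        Map<String, Object> record1 = new LinkedHashMap<>();
        record1.put("msg", "first");
        Map<String, Object> record2 = new LinkedHashMap<>();
        record2.put("msg", "second");

        // Message mode: [tag, time, record]
        List<Object> messageMode = Arrays.asList("app.access", now, record1);

        // Forward mode: [tag, [[time, record], [time, record], ...]]
        List<Object> entries = new ArrayList<>();
        entries.add(Arrays.asList(now, record1));
        entries.add(Arrays.asList(now + 1, record2));
        List<Object> forwardMode = Arrays.asList("app.access", entries);

        System.out.println(messageMode);
        System.out.println(forwardMode);
    }
}
```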
FluentLoggerFactory factory = new FluentLoggerFactory();
ArrayList<FluentLogger> loggers = new ArrayList<FluentLogger>();
for (int i = 0; i < 100; i++) {
    loggers.add(factory.getLogger("testtag" + i, "localhost", 999));
}
System.gc();
assertEquals(loggers.size(), factory.getLoggers().size());
The last assertion sometimes fails. That is because WeakHashMap has weak-referenced keys and strong-referenced values, so when a key (a String generated in FluentLoggerFactory#getLogger()) is GCed, the corresponding logger is removed from the map.
Because of this, FluentLoggerFactory#closeAll() and flushAll() have unexpected behaviour: some loggers are closed/flushed, and the rest are not touched.
It seems to be just a bug, and using a normal HashMap would fix it. Is this correct?
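The weak-key behaviour is easy to reproduce with a small standalone program (note that System.gc() is only a hint, so the final count is not strictly guaranteed; the key string below is illustrative):

```java
import java.util.Map;
import java.util.WeakHashMap;

public class WeakKeyExample {
    public static void main(String[] args) throws InterruptedException {
        Map<String, Object> map = new WeakHashMap<>();
        // new String(...) avoids the interned literal, so the key becomes
        // only weakly reachable once 'key' is nulled out.
        String key = new String("testtag0_localhost_999");
        map.put(key, new Object());
        key = null;

        System.gc();
        Thread.sleep(100); // give the GC a moment to clear the weak reference
        System.out.println("entries after GC: " + map.size());
    }
}
```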
Could you create a develop branch so that contributors can send pull requests without harming the master branch?
I think the master branch should be preserved as a pointer to the latest release that works correctly, while the develop branch is for enhancing features.
I want to get a Sender instance from an instance of FluentLogger, but I could not locate such a method. Is there a way to do that?
In particular, I wouldn't mind doing it as in the following.
https://github.com/moznion/fluent-logger-mock-sender
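For testing, one hook that already exists in the factory code is swapping the Sender implementation through a system property (Config.FLUENT_SENDER_CLASS). A self-contained sketch of that reflection pattern, using a hypothetical property key and a local Sender interface to stay runnable on its own:

```java
public class SenderSwapExample {
    // Local stand-in for org.fluentd.logger.sender.Sender.
    interface Sender {
        boolean emit(String tag, Object record);
    }

    // A mock sender that just records what it was asked to send.
    public static class MockSender implements Sender {
        public boolean emit(String tag, Object record) {
            System.out.println("captured " + tag + ": " + record);
            return true;
        }
    }

    public static void main(String[] args) throws Exception {
        // Mirrors how FluentLoggerFactory picks a Sender class from a system
        // property; "example.sender.class" is a made-up key, the real one is
        // the value of Config.FLUENT_SENDER_CLASS.
        System.setProperty("example.sender.class", MockSender.class.getName());
        String cls = System.getProperty("example.sender.class");
        Sender sender = (Sender) Class.forName(cls).getDeclaredConstructor().newInstance();
        sender.emit("test.tag", "hello");
    }
}
```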
Currently, only the TCP protocol is supported between the app logger and the td-agent.
Can this be changed to a different protocol, e.g. UDP?
Please let me know. Thanks.
Raghuram
thuntaa (at) gmail.com
Hi, I have a problem using fluent-logger-java in Spring Boot because the Logback configuration is reinitialized on Spring startup.
The sequence is:
fluentLogger = FluentLogger.getLogger(tag, remoteHost, port);
FluentLoggerFactory.loggers is empty, so a new logger is created:
protected FluentLogger(String tagPrefix, Sender sender) {
    this.tagPrefix = tagPrefix;
    this.sender = sender;
}
fluentLogger.close();
public void close() {
    if (sender != null) {
        sender.flush();
        sender.close();
        sender = null;
    }
}
FluentLoggerFactory.loggers still contains the closed logger, so the next call
fluentLogger = FluentLogger.getLogger(tag, remoteHost, port);
returns it from the FluentLoggerFactory.loggers cache.
Now this fluentLogger does not work, because it was closed earlier and has sender = null.
@komamitsu san and I discussed how to improve error reporting in the current 0.2.x versions.
The major changes we considered are:
Plan 1: Add a setHandler(handler) method to FluentLogger to accept an application-specific error handler. In this change, we should consider the following problem: the log method currently reports errors via its return value (true or false), via unmanaged exceptions, or via exceptions thrown when the buffer is full. If an error handler is added, the log method should become a non-blocking method, and the error must be handled in the user-defined error handler (in the subsequent code or in another thread). Is that the right choice?
Plan 2 (another option): Make log a blocking method and report errors by throwing an Exception rather than returning true or false. The last log event should be included in the thrown exception.
After writing this ticket, Plan 2 now looks simpler to me.
Any idea?
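A rough sketch of what Plan 1 could look like (the ErrorHandler interface and SketchLogger class are hypothetical illustrations, not the 0.2.x API; the simulated IOException stands in for a real send failure):

```java
import java.util.HashMap;
import java.util.Map;

public class ErrorHandlerSketch {
    // Hypothetical callback interface an application would implement.
    interface ErrorHandler {
        void handleError(String tag, Map<String, Object> record, Exception e);
    }

    static class SketchLogger {
        private ErrorHandler handler = (tag, record, e) ->
                System.out.println("default: dropped " + tag);

        void setHandler(ErrorHandler handler) { this.handler = handler; }

        // Non-blocking log(): never throws; failures go to the handler.
        void log(String tag, Map<String, Object> record) {
            try {
                throw new java.io.IOException("buffer full"); // simulate a send failure
            } catch (Exception e) {
                handler.handleError(tag, record, e);
            }
        }
    }

    public static void main(String[] args) {
        SketchLogger logger = new SketchLogger();
        logger.setHandler((tag, record, e) ->
                System.out.println("handled " + tag + ": " + e.getMessage()));
        logger.log("app.event", new HashMap<>());
    }
}
```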
For some reason RawSocketSender#close() is not synchronized. That may lead to NPEs if RawSocketSender#close() is called during flushing operations
Currently FluentLogger.log() calls RawSocketSender.emit() which calls RawSocketSender.send().
If the connection to fluentd fails for a long enough period of time that the pendings ByteBuffer fills up then RawSocketSender.send() will complain about the full buffer and then return.
This means it will never attempt to reconnect to fluentd as the reconnect attempt happens after the return.
I believe that send() should append the new message if the buffer has space, drop it on the floor if the buffer is full but in either case should continue to the reconnect check.
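A sketch of the proposed control flow (class, field, and buffer sizes below are mine, not the actual RawSocketSender code; the point is that the reconnect check runs even when the buffer is full):

```java
import java.nio.ByteBuffer;

public class SendReorderSketch {
    private final ByteBuffer pendings = ByteBuffer.allocate(8); // tiny, for demonstration
    private boolean connected = false;

    // Append if there is room, drop otherwise, but ALWAYS fall
    // through to the reconnect check afterwards.
    boolean send(byte[] bytes) {
        if (pendings.remaining() >= bytes.length) {
            pendings.put(bytes);
        } else {
            System.out.println("buffer full, dropping " + bytes.length + " bytes");
        }
        // Reconnect check now runs even when the message was dropped.
        if (!connected) {
            System.out.println("attempting reconnect");
            // connected = tryConnect(); (connection logic omitted)
        }
        return true;
    }

    public static void main(String[] args) {
        SendReorderSketch s = new SendReorderSketch();
        s.send(new byte[6]); // fits: buffered, reconnect attempted
        s.send(new byte[6]); // full: dropped, but reconnect still attempted
    }
}
```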
In my use case, we can not assume that fluentd will always be running. Ideally, the logger would log as usual if fluentd is running; if it is not running, it should probably log a warning and move on instead of throwing exceptions; finally, it should silently try to reconnect in the background and resume logging as soon as a connection is available.
Ideally the library would also do some in-memory buffering to minimize loss of log messages. Thoughts?
Commit version - da14ec3
Describe the bug
It is possible to encounter an unexpected crash due to use of a shared array in multiple locations (the variable counters in TestFluentLogger). One place, at line 403, sets a value in this data structure, whereas another location, at line 461, reads a value from this shared variable. However, there is no synchronization between these accesses. As a result, with multiple threads present, the program may crash at any time.
Expected behavior
No crash is expected
Additional context
When I investigated the TestFluentLogger class, I found that counters is a shared variable declared at line 385 and used at lines 403 and 461. An unexpected crash or inconsistency may occur if multiple threads interleave.
Environment:
I ran the test on an Ubuntu 20.04 LTS machine using OpenJDK 1.8.0_312.
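One conventional fix, assuming counters is a plain int[] shared between threads, is to switch to AtomicIntegerArray, which gives atomic updates and cross-thread visibility (a sketch of the idea, not a patch to the actual test):

```java
import java.util.concurrent.atomic.AtomicIntegerArray;

public class CountersFixSketch {
    public static void main(String[] args) throws InterruptedException {
        // Hypothetical replacement for the shared int[] "counters".
        AtomicIntegerArray counters = new AtomicIntegerArray(2);

        Thread writer = new Thread(() -> {
            for (int i = 0; i < 1000; i++) {
                counters.incrementAndGet(0); // atomic read-modify-write
            }
        });
        writer.start();
        writer.join(); // join() establishes happens-before: reads below see all increments

        System.out.println("counter[0] = " + counters.get(0));
    }
}
```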
I was thinking the default behaviour would be to reconnect to the fluentd socket. I was not changing the port, only restarting fluentd. But after fluentd restarted, the webapp in Tomcat could not forward events to fluentd.
If I use parallel deployment, the new version is able to forward the message to fluentd.
How can I make the webapp still able to forward the message to fluentd?
I'm using the Spring framework, and I use the logger like below:
@Controller
public class TestController {
    private static FluentLogger LOG = FluentLogger.getLogger("tracker", "localhost", 24224);

    @ResponseBody
    @RequestMapping(value = "/test", method = RequestMethod.GET)
    public String test(HttpServletRequest request) throws IOException {
        Map<String, Object> data = new HashMap<String, Object>();
        String ip = IpHelper.getClientIpAddress(request);
        data.put("ip", ip);
        LOG.log("data", data);
        return ip;
    }
}
Is there anything I did wrong? Is there any way to achieve what I want? (The webapp should still be able to forward logs to fluentd after fluentd gets restarted.)
Given that the most common pattern for FluentLogger.getLogger is to assign it as a static member, these objects are likely to get initialized early in the application lifecycle. The current implementation seems to try and open a connection to fluentd immediately, and fails miserably if fluentd isn't running:
java.net.ConnectException: Connection refused
at java.net.PlainSocketImpl.socketConnect(Native Method)
at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:339)
at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:200)
at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:182)
at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
at java.net.Socket.connect(Socket.java:579)
at org.fluentd.logger.sender.RawSocketSender.connect(RawSocketSender.java:89)
at org.fluentd.logger.sender.RawSocketSender.open(RawSocketSender.java:77)
at org.fluentd.logger.sender.RawSocketSender.<init>(RawSocketSender.java:72)
at org.fluentd.logger.FluentLoggerFactory.getLogger(FluentLoggerFactory.java:61)
at org.fluentd.logger.FluentLoggerFactory.getLogger(FluentLoggerFactory.java:44)
at org.fluentd.logger.FluentLogger.getLogger(FluentLogger.java:31)
This is bad for many reasons, the most important being that doing this kind of error-prone I/O in constructors or factory methods is Bad Practice.
Hi,
I'd like to share the report on backward binary compatibility and API changes for the library: https://abi-laboratory.pro/java/tracker/timeline/fluent-logger/
The report is generated by the https://github.com/lvc/japi-compliance-checker tool for jars found at http://central.maven.org/maven2/org/fluentd/fluent-logger/ according to https://wiki.eclipse.org/Evolving_Java-based_APIs_2.
Thank you.
If FluentLogger is used as an appender with logback (such as https://github.com/sndyuk/logback-more-appenders) then any logging by FluentLogger and related classes such as RawSocketSender can end up creating a nice little logging loop.
For instance, RawSocketSender will log "Created messages" or a message when the buffer is full... all of which get routed back into slf4j and then likely routed right back into FluentLogger for logging....
While the obvious answer is to drop any org.fluentd.logger.* messages that would end up routed to the FluentLogger appender (thereby possibly leaving them routed to other appenders), that would then mask any real messages. I wonder if there is a better solution?
I don't have any ideas but figured I'd throw the problem out there and maybe someone would have a good idea.
It may also be helpful to enable some type of throttling on messages, for example when the buffer is full due to a down fluentd.
Also cross posted here: sndyuk/logback-more-appenders#2
There is no way to set a socket read timeout.
socket.setSoTimeout() is not configurable.
So if there are network issues (packet loss for a certain duration) OR if fluentd process itself is not responding then all logging threads could get blocked and bring the application to a standstill.
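If the read timeout were configurable, applying it would amount to the standard JDK call shown below (the 5-second value is illustrative):

```java
import java.net.Socket;

public class SoTimeoutExample {
    public static void main(String[] args) throws Exception {
        Socket socket = new Socket();  // unconnected socket, just to demonstrate the option
        // With SO_TIMEOUT set, blocking reads throw SocketTimeoutException
        // after the given duration instead of hanging forever.
        socket.setSoTimeout(5000);
        System.out.println("soTimeout = " + socket.getSoTimeout());
        socket.close();
    }
}
```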
When looking through RawSocketSender to troubleshoot our other issue (see comment on #18), I noticed that RawSocketSender does not handle large events very well in send(). It assumes that flush() always makes enough capacity in the pendings buffer to save the new bytes. If the event was bigger than the buffer, you get a BufferOverflowException.
java.nio.BufferOverflowException
at java.nio.HeapByteBuffer.put(HeapByteBuffer.java:189)
at java.nio.ByteBuffer.put(ByteBuffer.java:859)
at org.fluentd.logger.sender.RawSocketSender.send(RawSocketSender.java:171)
Also, if every log message you send is larger than the buffer, then the RawSocketSender will attempt to flush to the socket on every message.
Is this the desired behavior? I could imagine adding a size check to the send() method that immediately returns false if the event is too large, to avoid the BufferOverflowException. Or perhaps the large event could skip the buffer and write directly out to the socket. Since we are already using a BufferedOutputStream, I also wonder why we need the internal pendings buffer at all?
What is the normal way of handling errors in RawSocketSender?
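The size check suggested above could look roughly like this (the buffer capacity and names are illustrative, not the real RawSocketSender; the guard rejects events that can never fit instead of letting ByteBuffer.put throw BufferOverflowException):

```java
import java.nio.ByteBuffer;

public class OversizeEventSketch {
    private final ByteBuffer pendings = ByteBuffer.allocate(16); // tiny, for demonstration

    boolean send(byte[] bytes) {
        if (bytes.length > pendings.capacity()) {
            // The event can never fit, even after a flush: reject it up front.
            System.out.println("event of " + bytes.length
                    + " bytes exceeds buffer capacity " + pendings.capacity() + ", rejected");
            return false;
        }
        // flush-if-needed and pendings.put(bytes) would follow here (omitted)
        return true;
    }

    public static void main(String[] args) {
        OversizeEventSketch s = new OversizeEventSketch();
        System.out.println("accepted = " + s.send(new byte[64]));
    }
}
```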
Tested sending 4 million logs of data to fluentd using fluent-logger-java. And we were testing the buffer of fluent-logger. We send to a log forwarder which sends it to an aggregator. The aggregator writes it into the postgresql database. Tried disallowing the connection of the aggregator to the database so that the buffer would be filled up.
However at around 2 million logs worth of data trying to be sent through fluent logger java, this error occurred.
We were thinking if it were possible to check how much data is in the buffer already. Is there a way for us to check the current buffer size of fluent-logger-java?
@repeatedly @komamitsu
I'll be happy to contribute the code if interested.
When running spark-submit with slf4j-log4j12-1.7.18.jar in the ClassPath, FluentLogger works with no issues.
When we use version 1.7.30, which comes with Spark 3.0, FluentLogger.getLogger throws a NullPointerException.
This is the offending commit, introduced in 1.7.19, which was done to fix a NullPointerException. It looks like it force-initializes the rootLogger to fix an NPE in multithreaded scenarios. The irony is that this is the commit which is causing a NullPointerException for us:
qos-ch/slf4j@111b8e5
Can we upgrade the version of slf4j-api in FluentLogger to latest 1.7.30 and make it compatible?
hdiuser@hn0-d0140a:/usr/hdp/4.1-0/spark2/jars$ spark-submit --version
SPARK_MAJOR_VERSION is set to 2, using Spark2
Unexpected problem occured during version sanity check
Reported exception:
java.lang.NullPointerException
at org.slf4j.LoggerFactory.versionSanityCheck(LoggerFactory.java:267)
at org.slf4j.LoggerFactory.performInitialization(LoggerFactory.java:126)
at org.slf4j.LoggerFactory.getILoggerFactory(LoggerFactory.java:412)
at org.slf4j.LoggerFactory.getLogger(LoggerFactory.java:357)
at org.slf4j.LoggerFactory.getLogger(LoggerFactory.java:383)
at org.fluentd.logger.sender.RawSocketSender.(RawSocketSender.java:34)
at org.fluentd.logger.FluentLoggerFactory.getLogger(FluentLoggerFactory.java:72)
at org.fluentd.logger.FluentLoggerFactory.getLogger(FluentLoggerFactory.java:51)
at org.fluentd.logger.FluentLogger.getLogger(FluentLogger.java:40)
at com.mdsdclient.MdsLogger.(Unknown Source)
at com.log4jappender.common.logger.HdInsightLoggerLinux.(Unknown Source)
at com.log4jappender.common.logger.HdInsightLoggerFactory.getLogger(Unknown Source)
at com.log4jappender.common.appendercore.AnonymizeLogAppenderCommon.getLogger(Unknown Source)
at com.log4jappender.common.appendercore.AnonymizeLogAppenderCommon.(Unknown Source)
at com.microsoft.log4jappender.AnonymizeLogAppender.activateOptions(Unknown Source)
at org.apache.log4j.config.PropertySetter.activate(PropertySetter.java:307)
at org.apache.log4j.config.PropertySetter.setProperties(PropertySetter.java:172)
at org.apache.log4j.config.PropertySetter.setProperties(PropertySetter.java:104)
at org.apache.log4j.PropertyConfigurator.parseAppender(PropertyConfigurator.java:842)
at org.apache.log4j.PropertyConfigurator.parseCategory(PropertyConfigurator.java:768)
at org.apache.log4j.PropertyConfigurator.configureRootCategory(PropertyConfigurator.java:648)
at org.apache.log4j.PropertyConfigurator.doConfigure(PropertyConfigurator.java:514)
at org.apache.log4j.PropertyConfigurator.doConfigure(PropertyConfigurator.java:580)
at org.apache.log4j.helpers.OptionConverter.selectAndConfigure(OptionConverter.java:526)
at org.apache.log4j.LogManager.(LogManager.java:127)
at org.slf4j.impl.Log4jLoggerFactory.(Log4jLoggerFactory.java:66)
at org.slf4j.impl.StaticLoggerBinder.(StaticLoggerBinder.java:72)
at org.slf4j.impl.StaticLoggerBinder.(StaticLoggerBinder.java:45)
at org.apache.spark.internal.Logging$.org$apache$spark$internal$Logging$$isLog4j12(Logging.scala:222)
at org.apache.spark.internal.Logging.initializeLogging(Logging.scala:127)
at org.apache.spark.internal.Logging.initializeLogIfNecessary(Logging.scala:111)
at org.apache.spark.internal.Logging.initializeLogIfNecessary$(Logging.scala:105)
at org.apache.spark.deploy.SparkSubmit.initializeLogIfNecessary(SparkSubmit.scala:75)
at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:83)
at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1007)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1016)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Exception in thread "main" java.lang.ExceptionInInitializerError
at org.fluentd.logger.FluentLoggerFactory.getLogger(FluentLoggerFactory.java:72)
at org.fluentd.logger.FluentLoggerFactory.getLogger(FluentLoggerFactory.java:51)
at org.fluentd.logger.FluentLogger.getLogger(FluentLogger.java:40)
at com.mdsdclient.MdsLogger.(Unknown Source)
at com.log4jappender.common.logger.HdInsightLoggerLinux.(Unknown Source)
at com.log4jappender.common.logger.HdInsightLoggerFactory.getLogger(Unknown Source)
at com.log4jappender.common.appendercore.AnonymizeLogAppenderCommon.getLogger(Unknown Source)
at com.log4jappender.common.appendercore.AnonymizeLogAppenderCommon.(Unknown Source)
at com.log4jappender.AnonymizeLogAppender.activateOptions(Unknown Source)
at org.apache.log4j.config.PropertySetter.activate(PropertySetter.java:307)
at org.apache.log4j.config.PropertySetter.setProperties(PropertySetter.java:172)
at org.apache.log4j.config.PropertySetter.setProperties(PropertySetter.java:104)
at org.apache.log4j.PropertyConfigurator.parseAppender(PropertyConfigurator.java:842)
at org.apache.log4j.PropertyConfigurator.parseCategory(PropertyConfigurator.java:768)
at org.apache.log4j.PropertyConfigurator.configureRootCategory(PropertyConfigurator.java:648)
at org.apache.log4j.PropertyConfigurator.doConfigure(PropertyConfigurator.java:514)
at org.apache.log4j.PropertyConfigurator.doConfigure(PropertyConfigurator.java:580)
at org.apache.log4j.helpers.OptionConverter.selectAndConfigure(OptionConverter.java:526)
at org.apache.log4j.LogManager.(LogManager.java:127)
at org.slf4j.impl.Log4jLoggerFactory.(Log4jLoggerFactory.java:66)
at org.slf4j.impl.StaticLoggerBinder.(StaticLoggerBinder.java:72)
at org.slf4j.impl.StaticLoggerBinder.(StaticLoggerBinder.java:45)
at org.apache.spark.internal.Logging$.org$apache$spark$internal$Logging$$isLog4j12(Logging.scala:222)
at org.apache.spark.internal.Logging.initializeLogging(Logging.scala:127)
at org.apache.spark.internal.Logging.initializeLogIfNecessary(Logging.scala:111)
at org.apache.spark.internal.Logging.initializeLogIfNecessary$(Logging.scala:105)
at org.apache.spark.deploy.SparkSubmit.initializeLogIfNecessary(SparkSubmit.scala:75)
at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:83)
at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1007)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1016)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.NullPointerException
at org.slf4j.LoggerFactory.getILoggerFactory(LoggerFactory.java:418)
at org.slf4j.LoggerFactory.getLogger(LoggerFactory.java:357)
at org.slf4j.LoggerFactory.getLogger(LoggerFactory.java:383)
at org.fluentd.logger.sender.RawSocketSender.(RawSocketSender.java:34)
... 31 more
It would be nice if this logger supported Unix domain sockets as the target instead of only (host, port) inet addresses. UnixDomainSocketAddress was added in Java 16. However, this library need not depend on that if a SocketAddress, which has been around since 1.4, can be supplied to getLogger().
This feature would be particularly useful for container environments such as Docker because some not very nice tricks are required for a container to connect to a host socket. Note that Unix domain sockets are supported by Fluentd and Fluent-Bit.
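For illustration, constructing such an address on Java 16+ uses only standard JDK APIs (the socket path is hypothetical; fluentd would need a matching unix socket source):

```java
import java.net.SocketAddress;
import java.net.UnixDomainSocketAddress;

public class UdsAddressExample {
    public static void main(String[] args) {
        // Java 16+: build a Unix domain socket address from a filesystem path.
        SocketAddress addr = UnixDomainSocketAddress.of("/var/run/fluent/fluent.sock");
        System.out.println("target = " + addr);
        // A SocketAddress-accepting getLogger(...) overload (not in the current
        // API) could then route through a SocketChannel opened with
        // StandardProtocolFamily.UNIX instead of a TCP Socket.
    }
}
```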
To be able to use the logger in an OSGi container as a bundle, the manifest file needs some entries. A good primer on OSGi and what needs to be in the manifest is the tutorial on vogella.com: http://www.vogella.com/tutorials/OSGi/article.html
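The minimum OSGi headers would look roughly like this (all values are illustrative; the actual symbolic name, version, and imported packages would need to match the real build):

```
Bundle-ManifestVersion: 2
Bundle-SymbolicName: org.fluentd.logger
Bundle-Name: Fluent Logger for Java
Bundle-Version: 0.3.4
Export-Package: org.fluentd.logger;version="0.3.4",
 org.fluentd.logger.sender;version="0.3.4"
Import-Package: org.slf4j,
 org.msgpack
```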
I have read some earlier issues about the reason to use a WeakHashMap in FluentLoggerFactory. The map was changed from WeakHashMap<String, FluentLogger> (version 0.3.1) to WeakHashMap<FluentLogger, String> (current version 0.3.2).
But I got a FluentLogger with a null sender in version 0.3.2 when using logback in a Spring Boot application.
The sender could not connect to the fluent server for a long time, so the sender was assigned null after FluentLogger.close() was called.
But in my situation, how can I get a working logger after the Spring Boot application has been launched?
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.config.server.EnableConfigServer;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
public class FluentLoggerFactory {
    private final Map<FluentLogger, String> loggers;

    public FluentLoggerFactory() {
        loggers = new WeakHashMap<FluentLogger, String>();
    }

    public synchronized FluentLogger getLogger(String tagPrefix, String host, int port, int timeout, int bufferCapacity,
            Reconnector reconnector) {
        String key = String.format("%s_%s_%d_%d_%d", new Object[] { tagPrefix, host, port, timeout, bufferCapacity });
        for (Map.Entry<FluentLogger, String> entry : loggers.entrySet()) {
            if (entry.getValue().equals(key)) {
                FluentLogger found = entry.getKey();
                if (found != null) {
                    return found; // the found logger contains a null-valued sender
                }
                break;
            }
        }
        Sender sender = null;
        Properties props = System.getProperties();
        if (!props.containsKey(Config.FLUENT_SENDER_CLASS)) {
            // create default sender object
            sender = new RawSocketSender(host, port, timeout, bufferCapacity, reconnector);
        } else {
            String senderClassName = props.getProperty(Config.FLUENT_SENDER_CLASS);
            try {
                sender = createSenderInstance(senderClassName, new Object[] { host, port, timeout, bufferCapacity });
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
        FluentLogger logger = new FluentLogger(tagPrefix, sender);
        loggers.put(logger, key);
        return logger;
    }

    ...
}
@SpringBootApplication
public class Application {
    private static final Logger LOGGER = LoggerFactory.getLogger(Application.class);

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
        LOGGER.info("hello world");
    }
}
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
<include resource="org/springframework/boot/logging/logback/defaults.xml"/>
<springProperty scope="context" name="springAppName" source="spring.application.name"/>
<property name="CONSOLE_LOG_PATTERN"
value="%clr(%d{yyyy-MM-dd HH:mm:ss.SSS}){faint} %clr(${LOG_LEVEL_PATTERN:-%5p}) %clr([${springAppName:-},%X{X-B3-TraceId:-},%X{X-B3-SpanId:-},%X{X-B3-ParentSpanId:-},%X{X-Span-Export:-}]){yellow} %clr(${PID:- }){magenta} %clr(---){faint} %clr([%15.15t]){faint} %clr(%-40.40logger{39}){cyan} %clr(:){faint} %m%n${LOG_EXCEPTION_CONVERSION_WORD:-%wEx}"/>
<appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
<filter class="ch.qos.logback.classic.filter.ThresholdFilter">
<level>DEBUG</level>
</filter>
<encoder>
<pattern>${CONSOLE_LOG_PATTERN}</pattern>
<charset>utf8</charset>
</encoder>
</appender>
<appender name="FLUENT_TEXT_SYNC" class="ch.qos.logback.more.appenders.FluentLogbackAppender">
<tag>debug</tag>
<label>logback</label>
<remoteHost>localhost</remoteHost>
<port>24224</port>
</appender>
<appender name="FLUENT_TEXT" class="ch.qos.logback.classic.AsyncAppender">
<queueSize>999</queueSize>
<appender-ref ref="FLUENT_TEXT_SYNC" />
</appender>
<logger name="org.springframework" level="INFO"/>
<root level="INFO">
<appender-ref ref="CONSOLE"/>
<appender-ref ref="FLUENT_TEXT"/>
</root>
</configuration>
public class FluentLogger {
    public boolean log(String tag, Map<String, Object> data, long timestamp) {
        String concatTag = null;
        if (tagPrefix == null || tagPrefix.length() == 0) {
            concatTag = tag;
        } else {
            concatTag = tagPrefix + "." + tag;
        }
        if (timestamp != 0) {
            return sender.emit(concatTag, timestamp, data);
        } else {
            return sender.emit(concatTag, data);
        }
    }

    public void flush() {
        sender.flush();
    }

    public void close() {
        if (sender != null) {
            sender.flush();
            sender.close();
            sender = null; // set sender to null after closing FluentLogger
        }
    }
}
Hello,
I am wondering if this library is still maintained.
The fact that it is in the official fluent namespace makes it seem like an obvious choice,
but the stagnant development unfortunately paints a different picture, mostly because the dependencies are severely outdated and contain CVEs.
Issue 653 of fluent/fluentd extends times to Fluent::EventTime. fluent-logger-java apparently does not yet support this.
I think it's possible that a RawSocketSender instance has buffered some records when close() is called. In this case, the buffered records are going to be lost. So I think RawSocketSender.close() should call flush() before releasing resources.
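The suggested ordering, sketched with stand-in methods (not the real RawSocketSender internals):

```java
public class FlushOnCloseSketch {
    private StringBuilder pending = new StringBuilder("buffered records");

    void flush() {
        System.out.println("flushing: " + pending);
        pending.setLength(0);
    }

    // Proposed close(): flush any buffered data before releasing resources,
    // so records queued at shutdown are not silently dropped.
    void close() {
        flush();
        System.out.println("resources released");
    }

    public static void main(String[] args) {
        new FlushOnCloseSketch().close();
    }
}
```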
Is this project affected by CVE-2021-44228?
Cheatsheet:
https://www.techsolvency.com/story-so-far/cve-2021-44228-log4j-log4shell/
Logback issue:
https://jira.qos.ch/browse/LOGBACK-1591?filter=-6
Log4j issue:
elastic/elasticsearch#81620
Issues in the latest version (0.3.1):
public boolean log(String tag, Map<String, Object> data)
This function never returns false, even if we don't have a valid host name.
The flush function in the RawSocketAppender class has no return type, so if there is an IOException, the function catches it and logs it, but no signal is sent to the caller. Hence, the log function will return true even if we are not able to write. (Lines 178-192)
I'd like to register fluent-logger-scala into sonatype maven repository.
Currently, fluent-logger-scala depends on the upcoming version of fluent-logger-java(0.26-RC).
Do you have any ideas to register fluent-logger-java 0.26-RC version into fluentd.org maven repository?
Or, I'll register the version into sonatype maven repository.
Thanks,
Tsuyoshi
Hi
We are sending logs from the client to the in_forward input plugin. In production, the number of connections to the in_forward port increases over time. This results in huge memory usage in our applications.
In my initial investigation, we think that when keys get expired in [1], the sender instance for that key doesn't close properly. So when [2] tries to recreate the key, another connection is made. Is this a correct interpretation of what's happening?
[1] https://github.com/fluent/fluent-logger-java/blob/master/src/main/java/org/fluentd/logger/FluentLoggerFactory.java#L36
[2] https://github.com/fluent/fluent-logger-java/blob/master/src/main/java/org/fluentd/logger/FluentLoggerFactory.java#L56
Spring resets the loggers, your appender is restarted, and the sender becomes null and is never initialized again.
2017-12-27 19:46:39,120 ERROR [org.fluentd.logger.sender.RawSocketSender] - <org.fluentd.logger.sender.RawSocketSender>
java.net.SocketException: Broken pipe
at java.net.SocketOutputStream.socketWrite0(Native Method)
at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:113)
at java.net.SocketOutputStream.write(SocketOutputStream.java:159)
at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:140)
at org.fluentd.logger.sender.RawSocketSender.flush(RawSocketSender.java:189)
at org.fluentd.logger.sender.RawSocketSender.send(RawSocketSender.java:177)
at org.fluentd.logger.sender.RawSocketSender.emit(RawSocketSender.java:147)
at org.fluentd.logger.sender.RawSocketSender.emit(RawSocketSender.java:129)
at org.fluentd.logger.sender.RawSocketSender.emit(RawSocketSender.java:124)
at org.fluentd.logger.FluentLogger.log(FluentLogger.java:101)
at org.fluentd.logger.FluentLogger.log(FluentLogger.java:86)
at com.hisense.hitv.api.shopping.threadpool.log.BehaviorLogSendTask.sendExtraLog(BehaviorLogSendTask.java:91)
at com.hisense.hitv.api.shopping.threadpool.log.BehaviorLogSendTask.run(BehaviorLogSendTask.java:45)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
I found that org.fluentd.logger.sender.RawSocketSender#getBuffer changes the position of the pendings buffer without any lock, despite being a public method. This means the user can call getBuffer() while flush() is running, and the sender instance can read a wrong position in the buffer.
Also, I don't know why org.fluentd.logger.sender.Sender has getBuffer(). I think the method shouldn't be exposed in the first place.
@muga What do you think? Can we remove the method from org.fluentd.logger.sender.Sender and make it private?
Currently, it seems it cannot forward messages to remote fluentd with secure_forward.
fluentLogger = FluentLogger.getLogger("tagprefix", "myhost", 24224);
fluentLogger.close();
fluentLogger = FluentLogger.getLogger("tagprefix", "myhost", 24224);
fluentLogger.log("mytag", "my message"); // won't work
If you close an individual FluentLogger, the instance retained in the FluentLoggerFactory persists, but its Sender will be null and it cannot send out messages.
You can call FluentLogger.closeAll() to fix the above, but this is too aggressive because it closes all loggers, which may not be your intention.
Ideally there would be a FluentLogger.close(myFluentLogger) method which would clean up the logger in the FluentLoggerFactory.
I am getting the below error when calling FluentLogger.log(). It does not always occur, but it happens frequently and I am not getting all of my logs. I am using version 0.3.3.
ERROR RawSocketSender: org.fluentd.logger.sender.RawSocketSender
java.net.SocketException: Connection reset
at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:115)
at java.net.SocketOutputStream.write(SocketOutputStream.java:155)
at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:140)
at org.fluentd.logger.sender.RawSocketSender.flush(RawSocketSender.java:196)
at org.fluentd.logger.sender.RawSocketSender.send(RawSocketSender.java:184)
at org.fluentd.logger.sender.RawSocketSender.emit(RawSocketSender.java:149)
at org.fluentd.logger.sender.RawSocketSender.emit(RawSocketSender.java:131)
at org.fluentd.logger.sender.RawSocketSender.emit(RawSocketSender.java:126)
at org.fluentd.logger.FluentLogger.log(FluentLogger.java:101)
at org.fluentd.logger.FluentLogger.log(FluentLogger.java:86)
What can be the reason? How can it be fixed?
This conflicts with Spring Cloud's SpringApplication.run: because of BootstrapApplicationListener, the Spring application is executed multiple times.
FluentLogger.close() is then called, which closes the sender and leaves sender null.
A subsequent call to the log(String tag, Map<String, Object> data, long timestamp) function then throws a NullPointerException.
Could we add code like this around line 58 of FluentLoggerFactory?
public synchronized FluentLogger getLogger(String tagPrefix, String host, int port, int timeout, int bufferCapacity,
        Reconnector reconnector) {
    String key = String.format("%s_%s_%d_%d_%d", new Object[] { tagPrefix, host, port, timeout, bufferCapacity });
    for (Map.Entry<FluentLogger, String> entry : loggers.entrySet()) {
        if (entry.getValue().equals(key)) {
            FluentLogger found = entry.getKey();
            if (found != null) {
                if (found.getSender() == null) {
                    found.setSender(getNewSender(host, port, timeout, bufferCapacity, reconnector));
                }
                return found;
            }
            break;
        }
    }
    FluentLogger logger = new FluentLogger(tagPrefix, getNewSender(host, port, timeout, bufferCapacity, reconnector));
    loggers.put(logger, key);
    return logger;
}

private Sender getNewSender(String host, int port, int timeout, int bufferCapacity, Reconnector reconnector) {
    Sender sender = null;
    Properties props = System.getProperties();
    if (!props.containsKey(Config.FLUENT_SENDER_CLASS)) {
        // create default sender object
        sender = new RawSocketSender(host, port, timeout, bufferCapacity, reconnector);
    } else {
        String senderClassName = props.getProperty(Config.FLUENT_SENDER_CLASS);
        try {
            sender = createSenderInstance(senderClassName, new Object[] { host, port, timeout, bufferCapacity });
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
    return sender;
}
RawSocketSender#open logs the exception when RawSocketSender fails to connect to the server. The behaviour itself is not a problem; however, it can confuse us when testing. We could accept Level.OFF as an additional argument of RawSocketSender for testing.
The log is as follows:
Running org.fluentd.logger.sender.TestRawSocketSender
2013/09/05 23:11:36 org.fluentd.logger.sender.RawSocketSender open
fatal: Failed to connect fluentd: localhost/127.0.0.1:25227
2013/09/05 23:11:36 org.fluentd.logger.sender.RawSocketSender open
fatal: Connection will be retried
java.net.ConnectException: Connection refused
at java.net.PlainSocketImpl.socketConnect(Native Method)
at java.net.PlainSocketImpl.doConnect(PlainSocketImpl.java:351)
at java.net.PlainSocketImpl.connectToAddress(PlainSocketImpl.java:213)
at java.net.PlainSocketImpl.connect(PlainSocketImpl.java:200)
at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:432)
at java.net.Socket.connect(Socket.java:529)
at java.net.Socket.connect(Socket.java:478)
at org.fluentd.logger.sender.RawSocketSender.connect(RawSocketSender.java:89)
at org.fluentd.logger.sender.RawSocketSender.open(RawSocketSender.java:77)
at org.fluentd.logger.sender.RawSocketSender.<init>(RawSocketSender.java:72)
at org.fluentd.logger.sender.RawSocketSender.<init>(RawSocketSender.java:60)
at org.fluentd.logger.sender.RawSocketSender.<init>(RawSocketSender.java:56)
at org.fluentd.logger.sender.TestRawSocketSender.testNormal03(TestRawSocketSender.java:134)
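Until such an argument exists, the noise can be suppressed in tests by switching off the java.util.logging logger the sender writes to. A sketch, assuming the logger is named after the class (the usual JUL convention):

```java
import java.util.logging.Level;
import java.util.logging.Logger;

public class SilenceSenderLogs {
    public static void main(String[] args) {
        // Disable log output from RawSocketSender during tests.
        // Assumes the sender uses a JUL logger named after the class.
        Logger senderLog = Logger.getLogger("org.fluentd.logger.sender.RawSocketSender");
        senderLog.setLevel(Level.OFF);

        System.out.println(senderLog.getLevel()); // prints "OFF"
    }
}
```

This only changes test output; connection retries still happen exactly as before.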
It looks like there are some important updates committed to the source. Any chance you could cut a new release and push it to Maven?
Two questions/issues
I don't want to add Fluentd-related code to my application. So, is there a way to log to Fluentd by configuring a SocketAppender in log4j?
Similar to logstash config for log4j below?
https://github.com/agorski/logstash-log4j-example/blob/master/src/main/resources/log4j.xml
I have a Fluentd aggregator running in AWS with a configuration like this:
<source>
@type forward
port 24224
<security>
self_hostname fluentd-aggregator
shared_key test_key
</security>
</source>
I didn't find any examples of passing self_hostname and shared_key via FluentLogger.
How do I pass these credentials from FluentLogger?
The RawSocketSender
takes a host and port in its constructor and converts these into an InetSocketAddress,
which resolves the hostname immediately. On reconnect, RawSocketSender
reuses this InetSocketAddress,
so if the hostname wasn't resolvable during fluent-logger init, reconnection will fail over and over. The relevant snippet from RawSocketSender
is below:
public RawSocketSender(String host, int port, int timeout, int bufferCapacity, Reconnector reconnector) {
    msgpack = new MessagePack();
    msgpack.register(Event.class, Event.EventTemplate.INSTANCE);
    pendings = ByteBuffer.allocate(bufferCapacity);
    server = new InetSocketAddress(host, port); // InetSocketAddress is created (and resolved) once, at init
    this.reconnector = reconnector;
    name = String.format("%s_%d_%d_%d", host, port, timeout, bufferCapacity);
    this.timeout = timeout;
}

private void connect() throws IOException {
    try {
        socket = new Socket();
        socket.connect(server, timeout); // reconnection reuses the pre-resolved server field
        out = new BufferedOutputStream(socket.getOutputStream());
    } catch (IOException e) {
        throw e;
    }
}
This issue comes up when using the fluent-logger in a highly dynamic environment (like on Docker Swarm) where apps may come up before a DNS entry in Consul is even resolvable. It also means that during failover of a Fluent host (triggering a DNS change) the then failing socket connection will always try the old Fluent host IP and never re-resolve the DNS entry.
My recommendation is to store the Fluentd host and port as private fields within RawSocketSender,
rather than a resolved InetSocketAddress
(currently stored as this.server),
and create a new InetSocketAddress
on every socket connection.
I understand there will be performance implications and am happy to submit a PR, but I wanted to raise this as an issue first in case there were reasons for implementing RawSocketSender
the current way.
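The recommendation above can be sketched as follows. The class and method names here are illustrative, not the library's; the point is simply that the hostname is resolved at each connection attempt rather than once at construction:

```java
import java.net.InetSocketAddress;

// Sketch of the proposed change: keep host/port as fields and build a fresh
// InetSocketAddress per connection attempt, so a DNS change (e.g. a Fluentd
// host failover, or a record that only appears after app startup) is picked up.
public class ResolvingEndpoint {
    private final String host;
    private final int port;

    public ResolvingEndpoint(String host, int port) {
        this.host = host;
        this.port = port;
    }

    // Called from connect(): each call triggers a new hostname lookup
    // instead of reusing a possibly stale (or failed) resolution result.
    public InetSocketAddress resolveNow() {
        return new InetSocketAddress(host, port);
    }
}
```

The cost is one DNS lookup per (re)connection attempt, which the resolver cache usually absorbs; steady-state sends over an already-open socket are unaffected.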
There are two types of errors I've run into:
private static final FluentLogger LOG = FluentLogger.getLogger("audit", "10.0.0.1", 22224);
java.net.ConnectException: Connection refused
    at java.net.PlainSocketImpl.socketConnect(Native Method)
    at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:345)
    at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
    at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
    at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
    at java.net.Socket.connect(Socket.java:589)
    at org.fluentd.logger.sender.RawSocketSender.connect(RawSocketSender.java:85)
    at org.fluentd.logger.sender.RawSocketSender.reconnect(RawSocketSender.java:98)
    at org.fluentd.logger.sender.RawSocketSender.flush(RawSocketSender.java:206)
    at org.fluentd.logger.sender.RawSocketSender.send(RawSocketSender.java:195)
    at org.fluentd.logger.sender.RawSocketSender.emit(RawSocketSender.java:157)
    at org.fluentd.logger.sender.RawSocketSender.emit(RawSocketSender.java:139)
    at org.fluentd.logger.sender.RawSocketSender.emit(RawSocketSender.java:134)
    at org.fluentd.logger.FluentLogger.log(FluentLogger.java:101)
    at org.fluentd.logger.FluentLogger.log(FluentLogger.java:86)
    at LoggerInfra.log(FluentdLoggerApp.java:27)
    at LoggerCaller.testLogger(FluentdLoggerApp.java:79)
    at FluentdLoggerApp.main(FluentdLoggerApp.java:112)
The error was printed to the console (expected because of the limited error handling, see #15), but the log()
call returned true
instead of false,
which hid the internal failure.
This is the result of the logic at RawSocketSender.java#L204, which simply logs the error message via LOG.error().
In both cases the log()
call returned true while the stack trace was printed to the terminal.

We noticed that the project is carrying JUnit along as a production dependency because of its dependency on json-simple.
Since that project appears to be dead (it is hosted on Google Code), can we just exclude JUnit from this project's dependencies?