diff --git a/src/main/java/org/fluentd/logger/FluentLoggerFactory.java b/src/main/java/org/fluentd/logger/FluentLoggerFactory.java index 9c2d5a5..932043e 100644 --- a/src/main/java/org/fluentd/logger/FluentLoggerFactory.java +++ b/src/main/java/org/fluentd/logger/FluentLoggerFactory.java @@ -19,6 +19,7 @@ import java.lang.reflect.Constructor; import java.lang.reflect.InvocationTargetException; +import java.util.Collections; import java.util.Map; import java.util.Properties; import java.util.WeakHashMap; @@ -33,7 +34,7 @@ public class FluentLoggerFactory { private final Map loggers; public FluentLoggerFactory() { - loggers = new WeakHashMap(); + loggers = Collections.synchronizedMap(new WeakHashMap()); } public FluentLogger getLogger(String tagPrefix) { @@ -48,7 +49,7 @@ public FluentLogger getLogger(String tagPrefix, String host, int port, int timeo return getLogger(tagPrefix, host, port, timeout, bufferCapacity, new ExponentialDelayReconnector()); } - public synchronized FluentLogger getLogger(String tagPrefix, String host, int port, int timeout, int bufferCapacity, + public FluentLogger getLogger(String tagPrefix, String host, int port, int timeout, int bufferCapacity, Reconnector reconnector) { String key = String.format("%s_%s_%d_%d_%d", new Object[] { tagPrefix, host, port, timeout, bufferCapacity }); if (loggers.containsKey(key)) { @@ -73,6 +74,21 @@ public synchronized FluentLogger getLogger(String tagPrefix, String host, int po } } + public FluentLogger getLogger(String tagPrefix, String host, int port, int timeout, int bufferCapacity, + Sender sender) { + if (sender == null) { + return getLogger(tagPrefix, host, port, timeout, bufferCapacity); + } + String key = String.format("%s_%s_%d_%d_%d_%s", new Object[] { tagPrefix, host, port, timeout, bufferCapacity, sender == null ? "null" : sender .getName() }); + if (loggers.containsKey(key)) { + return loggers.get(key); + } else { + FluentLogger logger = new FluentLogger(tagPrefix, sender); + loggers.put(key, logger); + return logger; + } + } + @SuppressWarnings("unchecked") private Sender createSenderInstance(final String className, final Object[] params) throws ClassNotFoundException, SecurityException, NoSuchMethodException, IllegalArgumentException, InstantiationException, diff --git a/src/main/java/org/fluentd/logger/sender/AsyncRawSocketSender.java b/src/main/java/org/fluentd/logger/sender/AsyncRawSocketSender.java new file mode 100644 index 0000000..b1a96eb --- /dev/null +++ b/src/main/java/org/fluentd/logger/sender/AsyncRawSocketSender.java @@ -0,0 +1,94 @@ + +package org.fluentd.logger.sender; + +import java.util.Map; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +import org.fluentd.logger.sender.ExponentialDelayReconnector; +import org.fluentd.logger.sender.RawSocketSender; +import org.fluentd.logger.sender.Reconnector; +import org.fluentd.logger.sender.Sender; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * + * @author mkobyakov + * + */ +public class AsyncRawSocketSender implements Sender { + + private RawSocketSender sender; + private Reconnector reconnector; + + @SuppressWarnings("unused") + private static final Logger logger = LoggerFactory.getLogger(AsyncRawSocketSender.class); + + private static final ExecutorService flusher = Executors.newSingleThreadExecutor(); + + public AsyncRawSocketSender() { + this("localhost", 24224); + } + + public AsyncRawSocketSender(String host, int port) { + this(host, port, 3 * 1000, 8 * 1024 * 1024); + } + + public AsyncRawSocketSender(String host, int port, int timeout, + int bufferCapacity) { + this(host, port, timeout, bufferCapacity, + new ExponentialDelayReconnector()); + } + + public AsyncRawSocketSender(String host, int port, int timeout, + int bufferCapacity, Reconnector reconnector) { + this.reconnector = reconnector; + this.sender = new RawSocketSender(host, port, timeout, bufferCapacity, + reconnector); + } + + @Override + public synchronized void flush() { + final RawSocketSender sender = this.sender; + flusher.execute(new Runnable() { + @Override + public void run() { + sender.flush(); + } + }); + } + + @Override + public void close() { + sender.close(); + } + + @Override + public boolean emit(String tag, Map data) { + return emit(tag, System.currentTimeMillis() / 1000, data); + } + + @Override + public boolean emit(final String tag, final long timestamp, final Map data) { + final RawSocketSender sender = this.sender; + flusher.execute(new Runnable() { + @Override + public void run() { + sender.emit(tag, timestamp, data); + } + }); + + return sender.isConnected() || reconnector.enableReconnection(System.currentTimeMillis()); + } + + @Override + public String getName() { + return sender.getName(); + } + + @Override + public boolean isConnected() { + return sender.isConnected(); + } +}