diff --git a/src/test/java/org/fluentd/logger/TestFluentLogger.java b/src/test/java/org/fluentd/logger/TestFluentLogger.java index 3632986..88f8a5d 100644 --- a/src/test/java/org/fluentd/logger/TestFluentLogger.java +++ b/src/test/java/org/fluentd/logger/TestFluentLogger.java @@ -15,6 +15,7 @@ import java.io.IOException; import java.net.Socket; import java.util.*; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; @@ -448,4 +449,63 @@ public void run() { assertEquals((i * LOOP * (N - i)), (long)counters.get(i)); } } + + @Test + public void testFlushOnClose() throws Exception { + // start mock fluentd + int port = MockFluentd.randomPort(); + String host = "localhost"; + final List elist = new ArrayList(); + final CountDownLatch latch = new CountDownLatch(1); + MockFluentd fluentd = new MockFluentd(port, new MockFluentd.MockProcess() { + public void process(MessagePack msgpack, Socket socket) throws IOException { + BufferedInputStream in = new BufferedInputStream(socket.getInputStream()); + try { + Unpacker unpacker = msgpack.createUnpacker(in); + while (true) { + Event e = unpacker.read(Event.class); + elist.add(e); + latch.countDown(); + } + //socket.close(); + } catch (EOFException e) { + // ignore + } + } + }); + + FixedThreadManager threadManager = new FixedThreadManager(1); + + // start loggers + FluentLogger logger = FluentLogger.getLogger("prefix", host, port); + { + Map data = new HashMap(); + data.put("k", "v"); + // Fluentd hasn't started yet and the record will be buffered. + logger.log("tag", data); + } + + threadManager.submit(fluentd); + Thread.sleep(1000); + + // close loggers and it should flush the buffer + logger.close(); + + // wait for fluentd's getting at least one kv pair + latch.await(3, TimeUnit.SECONDS); + + // close mock fluentd + fluentd.close(); + + // wait for unpacking event data on fluentd + threadManager.join(); + + // check data + assertEquals(1, elist.size()); + Event ev = elist.get(0); + assertEquals("prefix.tag", ev.tag); + assertEquals(1, ev.data.size()); + assertTrue(ev.data.containsKey("k")); + assertTrue(ev.data.containsValue("v")); + } }