diff --git a/timbermill-java/timbermill-local/src/main/java/com/datorama/oss/timbermill/common/ElasticsearchUtil.java b/timbermill-java/timbermill-local/src/main/java/com/datorama/oss/timbermill/common/ElasticsearchUtil.java index 2a4e8e2a..00fd586a 100644 --- a/timbermill-java/timbermill-local/src/main/java/com/datorama/oss/timbermill/common/ElasticsearchUtil.java +++ b/timbermill-java/timbermill-local/src/main/java/com/datorama/oss/timbermill/common/ElasticsearchUtil.java @@ -10,6 +10,7 @@ import java.time.temporal.ChronoUnit; import java.util.*; import java.util.concurrent.BlockingQueue; +import java.util.regex.Pattern; import java.util.stream.Collectors; import static com.datorama.oss.timbermill.TaskIndexer.logErrorInEventsMap; @@ -304,16 +305,23 @@ public class ElasticsearchUtil { private static final String OLD_SUFFIX = "old"; private static final Set envsSet = Sets.newConcurrentHashSet(); + private static final Pattern metadataPatten = Pattern.compile("metadata.*"); + private static Pattern notToSkipRegexPattern; public static Set getEnvSet() { return envsSet; } public static void drainAndIndex(BlockingQueue eventsQueue, TaskIndexer taskIndexer, int maxElement) { + drainAndIndex(eventsQueue, taskIndexer, maxElement, false, ".*"); + } + + public static void drainAndIndex(BlockingQueue eventsQueue, TaskIndexer taskIndexer, int maxElement, boolean skipEventsAtDrainFlag, String notToSkipRegex) { while (!eventsQueue.isEmpty()) { try { - Collection events = new ArrayList<>(); - eventsQueue.drainTo(events, maxElement); + Collection unfilteredEvents = new ArrayList<>(); + eventsQueue.drainTo(unfilteredEvents, maxElement); + Collection events = filterEvents(unfilteredEvents, skipEventsAtDrainFlag, notToSkipRegex); KamonConstants.MESSAGES_IN_INPUT_QUEUE_RANGE_SAMPLER.withoutTags().decrement(events.size()); logErrorInEventsMap(events.stream().filter(event -> event.getTaskId() != null).collect(Collectors.groupingBy(Event::getTaskId)), "drainAndIndex"); @@ -346,6 +354,31 @@ public static void drainAndIndex(BlockingQueue eventsQueue, TaskIndexer t } } + private static Collection filterEvents(Collection unfilteredEvents, boolean skipEventsAtDrainFlag, String notToSkipRegex) { + if (skipEventsAtDrainFlag) { + return unfilteredEvents.stream() + .filter(event-> shouldKeep(event.getName(), event.getTaskId(), notToSkipRegex)) + .collect(Collectors.toList()); + } + return unfilteredEvents; + } + + private static boolean shouldKeep(String eventName, String eventId, String notToSkipRegex) { + if (metadataPatten.matcher(eventName).matches()) { + return true; + } + if (notToSkipRegexPattern == null) { + notToSkipRegexPattern = Pattern.compile(notToSkipRegex); + } + boolean match = notToSkipRegexPattern.matcher(eventName).matches(); + if (match) { + LOG.info("skipEvents | keeping task {} task id: {} at drain", eventName, eventId); + return true; + } + LOG.debug("skipEvents | skipping task {} task id: {} at drain", eventName, eventId); + return false; + } + public static long getTimesDuration(ZonedDateTime taskIndexerStartTime, ZonedDateTime taskIndexerEndTime) { return ChronoUnit.MILLIS.between(taskIndexerStartTime, taskIndexerEndTime); } diff --git a/timbermill-java/timbermill-server/src/main/java/com/datorama/timbermill/server/service/TimbermillService.java b/timbermill-java/timbermill-server/src/main/java/com/datorama/timbermill/server/service/TimbermillService.java index 5d7ee307..9fc7231a 100644 --- a/timbermill-java/timbermill-server/src/main/java/com/datorama/timbermill/server/service/TimbermillService.java +++ b/timbermill-java/timbermill-server/src/main/java/com/datorama/timbermill/server/service/TimbermillService.java @@ -51,7 +51,8 @@ public class TimbermillService { private static Pattern notToSkipRegexPattern = null; private static Pattern metadataPatten = Pattern.compile("metadata.*"); - private String skipEventsFlag; + private boolean skipEventsAtInsertFlag; + private boolean skipEventsAtDrainFlag; private String notToSkipRegex; @@ -113,7 +114,8 @@ public TimbermillService(@Value("${INDEX_BULK_SIZE:200000}") Integer indexBulkSi @Value("${LIMIT_FOR_PERIOD:30000}") int limitForPeriod, @Value("${LIMIT_REFRESH_PERIOD_MINUTES:1}") int limitRefreshPeriod, @Value("${RATE_LIMITER_CAPACITY:1000000}") int rateLimiterCapacity, - @Value("${skip.events.flag:false}") String skipEventsFlag, + @Value("${skip.events.at.insert.flag:false}") boolean skipEventsAtInsertFlag, + @Value("${skip.events.at.drain.flag:false}") boolean skipEventsAtDrainFlag, @Value("${not.to.skip.events.regex:.*}") String notToSkipRegex){ @@ -121,7 +123,8 @@ public TimbermillService(@Value("${INDEX_BULK_SIZE:200000}") Integer indexBulkSi overflowedQueue = new LinkedBlockingQueue<>(overFlowedQueueCapacity); terminationTimeout = terminationTimeoutSeconds * 1000; - this.skipEventsFlag = skipEventsFlag; + this.skipEventsAtInsertFlag = skipEventsAtInsertFlag; + this.skipEventsAtDrainFlag = skipEventsAtDrainFlag; this.notToSkipRegex = notToSkipRegex; RedisService redisService = null; @@ -168,7 +171,7 @@ private void startWorkingThread() { Thread workingThread = new Thread(() -> { LOG.info("Timbermill has started"); while (keepRunning) { - ElasticsearchUtil.drainAndIndex(eventsQueue, taskIndexer, eventsMaxElement); + ElasticsearchUtil.drainAndIndex(eventsQueue, taskIndexer, eventsMaxElement, skipEventsAtDrainFlag, notToSkipRegex); } stoppedRunning = true; }); @@ -210,8 +213,8 @@ void handleEvents(Collection events) { } } - private Boolean shouldKeep(Event event) { - if (Boolean.parseBoolean(skipEventsFlag)) { + private boolean shouldKeep(Event event) { + if (skipEventsAtInsertFlag) { if (metadataPatten.matcher(event.getName()).matches()) { return true; } @@ -220,9 +223,11 @@ private Boolean shouldKeep(Event event) { } boolean match = notToSkipRegexPattern.matcher(event.getName()).matches(); if (match) { - LOG.info("skipEvents | keeping task {} task id: {}", event.getName(), event.getTaskId()); + LOG.info("skipEvents | keeping task {} task id: {} at insert", event.getName(), event.getTaskId()); + return true; } - return match; + LOG.debug("skipEvents | skipping task {} task id: {} at insert", event.getName(), event.getTaskId()); + return false; } return true; } diff --git a/timbermill-java/timbermill-server/src/main/resources/application.properties b/timbermill-java/timbermill-server/src/main/resources/application.properties index 0dcba477..30ba48ae 100644 --- a/timbermill-java/timbermill-server/src/main/resources/application.properties +++ b/timbermill-java/timbermill-server/src/main/resources/application.properties @@ -9,5 +9,6 @@ management.health.elasticsearch.enabled=false management.endpoint.health.show-details=always PLUGINS_JSON=[{"class":"SwitchCasePlugin","taskMatcher":{"name":"Eventplugin"},"searchField":"exception","outputAttribute":"errorType","switchCase":[{"match":["TOO_MANY_SERVER_ROWS"],"output":"TOO_MANY_SERVER_ROWS"}]}] -#skip.events.flag = true +#skip.events.at.insert.flag = true +#skip.events.at.drain.flag = false #not.to.skip.events.regex = "(account_analytics.*)|(page_view.*)|(validate_login)|(last_workspace_update)|(interactive_dingo_query)|(widget_init)|(widget_rendered)|(page_load)|(iframe_route_states_log)|(workspace_analytics_average_query_load_time)|(top_workspace_analytics_orphan_count)|(workspace_analytics_orphan_count)" \ No newline at end of file diff --git a/timbermill-java/timbermill-server/src/test/java/com/datorama/timbermill/server/service/TimbermillServiceTest.java b/timbermill-java/timbermill-server/src/test/java/com/datorama/timbermill/server/service/TimbermillServiceTest.java index 29030afe..286599bc 100644 --- a/timbermill-java/timbermill-server/src/test/java/com/datorama/timbermill/server/service/TimbermillServiceTest.java +++ b/timbermill-java/timbermill-server/src/test/java/com/datorama/timbermill/server/service/TimbermillServiceTest.java @@ -56,7 +56,7 @@ public void init() { @TimberLogTask(name = EVENT) public void testHandleEventsOneMatchingTask() { - ReflectionTestUtils.setField(service, "skipEventsFlag", "true"); + ReflectionTestUtils.setField(service, "skipEventsAtInsertFlag", true); ReflectionTestUtils.setField(service, "notToSkipRegex", ".*keep.*"); Event event = new StartEvent(TimberLogger.getCurrentTaskId(), KEEP, new LogParams(), null); @@ -68,7 +68,7 @@ public void testHandleEventsOneMatchingTask() { @TimberLogTask(name = EVENT) public void testHandleEventsOneNotMatchingTask() { - ReflectionTestUtils.setField(service, "skipEventsFlag", "true"); + ReflectionTestUtils.setField(service, "skipEventsAtInsertFlag", true); ReflectionTestUtils.setField(service, "notToSkipRegex", ".*keep.*"); Event event = new StartEvent(TimberLogger.getCurrentTaskId(), SKIP, new LogParams(), null); @@ -82,7 +82,7 @@ public void testHandleEventsOneNotMatchingTask() { public void testHandleEventsMultipleTasks() { - ReflectionTestUtils.setField(service, "skipEventsFlag", "true"); + ReflectionTestUtils.setField(service, "skipEventsAtInsertFlag", true); ReflectionTestUtils.setField(service, "notToSkipRegex", ".keep.*"); Event eventToKeep1 = new StartEvent(TimberLogger.getCurrentTaskId(), KEEP + "1", new LogParams(), null); @@ -103,7 +103,7 @@ public void testHandleEventsMultipleTasks() { @TimberLogTask(name = EVENT) public void testHandleEventsMultipleTasksFlagClosed() { - ReflectionTestUtils.setField(service, "skipEventsFlag", "false"); + ReflectionTestUtils.setField(service, "skipEventsAtInsertFlag", false); ReflectionTestUtils.setField(service, "notToSkipRegex", ".keep.*"); Event eventToKeep1 = new StartEvent(TimberLogger.getCurrentTaskId(), KEEP + "1", new LogParams(), null);