Skip to content

Commit

Permalink
W-13716428 filter events after the queue (#591)
Browse files Browse the repository at this point in the history
* filter events after the queue
  • Loading branch information
valeriaares authored Aug 3, 2023
1 parent a4212a2 commit 6672456
Show file tree
Hide file tree
Showing 4 changed files with 54 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import java.time.temporal.ChronoUnit;
import java.util.*;
import java.util.concurrent.BlockingQueue;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

import static com.datorama.oss.timbermill.TaskIndexer.logErrorInEventsMap;
Expand Down Expand Up @@ -304,16 +305,23 @@ public class ElasticsearchUtil {
private static final String OLD_SUFFIX = "old";

private static final Set<String> envsSet = Sets.newConcurrentHashSet();
private static final Pattern metadataPatten = Pattern.compile("metadata.*");
private static Pattern notToSkipRegexPattern;

public static Set<String> getEnvSet() {
return envsSet;
}

public static void drainAndIndex(BlockingQueue<Event> eventsQueue, TaskIndexer taskIndexer, int maxElement) {
drainAndIndex(eventsQueue, taskIndexer, maxElement, false, ".*");
}

public static void drainAndIndex(BlockingQueue<Event> eventsQueue, TaskIndexer taskIndexer, int maxElement, boolean skipEventsAtDrainFlag, String notToSkipRegex) {
while (!eventsQueue.isEmpty()) {
try {
Collection<Event> events = new ArrayList<>();
eventsQueue.drainTo(events, maxElement);
Collection<Event> unfilteredEvents = new ArrayList<>();
eventsQueue.drainTo(unfilteredEvents, maxElement);
Collection<Event> events = filterEvents(unfilteredEvents, skipEventsAtDrainFlag, notToSkipRegex);
KamonConstants.MESSAGES_IN_INPUT_QUEUE_RANGE_SAMPLER.withoutTags().decrement(events.size());
logErrorInEventsMap(events.stream().filter(event -> event.getTaskId() != null).collect(Collectors.groupingBy(Event::getTaskId)), "drainAndIndex");

Expand Down Expand Up @@ -346,6 +354,31 @@ public static void drainAndIndex(BlockingQueue<Event> eventsQueue, TaskIndexer t
}
}

private static Collection<Event> filterEvents(Collection<Event> unfilteredEvents, boolean skipEventsAtDrainFlag, String notToSkipRegex) {
if (skipEventsAtDrainFlag) {
return unfilteredEvents.stream()
.filter(event-> shouldKeep(event.getName(), event.getTaskId(), notToSkipRegex))
.collect(Collectors.toList());
}
return unfilteredEvents;
}

private static boolean shouldKeep(String eventName, String eventId, String notToSkipRegex) {
if (metadataPatten.matcher(eventName).matches()) {
return true;
}
if (notToSkipRegexPattern == null) {
notToSkipRegexPattern = Pattern.compile(notToSkipRegex);
}
boolean match = notToSkipRegexPattern.matcher(eventName).matches();
if (match) {
LOG.info("skipEvents | keeping task {} task id: {} at drain", eventName, eventId);
return true;
}
LOG.debug("skipEvents | skipping task {} task id: {} at drain", eventName, eventId);
return false;
}

public static long getTimesDuration(ZonedDateTime taskIndexerStartTime, ZonedDateTime taskIndexerEndTime) {
return ChronoUnit.MILLIS.between(taskIndexerStartTime, taskIndexerEndTime);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,8 @@ public class TimbermillService {
private static Pattern notToSkipRegexPattern = null;
private static Pattern metadataPatten = Pattern.compile("metadata.*");

private String skipEventsFlag;
private boolean skipEventsAtInsertFlag;
private boolean skipEventsAtDrainFlag;
private String notToSkipRegex;


Expand Down Expand Up @@ -113,15 +114,17 @@ public TimbermillService(@Value("${INDEX_BULK_SIZE:200000}") Integer indexBulkSi
@Value("${LIMIT_FOR_PERIOD:30000}") int limitForPeriod,
@Value("${LIMIT_REFRESH_PERIOD_MINUTES:1}") int limitRefreshPeriod,
@Value("${RATE_LIMITER_CAPACITY:1000000}") int rateLimiterCapacity,
@Value("${skip.events.flag:false}") String skipEventsFlag,
@Value("${skip.events.at.insert.flag:false}") boolean skipEventsAtInsertFlag,
@Value("${skip.events.at.drain.flag:false}") boolean skipEventsAtDrainFlag,
@Value("${not.to.skip.events.regex:.*}") String notToSkipRegex){


eventsQueue = new LinkedBlockingQueue<>(eventsQueueCapacity);
overflowedQueue = new LinkedBlockingQueue<>(overFlowedQueueCapacity);
terminationTimeout = terminationTimeoutSeconds * 1000;

this.skipEventsFlag = skipEventsFlag;
this.skipEventsAtInsertFlag = skipEventsAtInsertFlag;
this.skipEventsAtDrainFlag = skipEventsAtDrainFlag;
this.notToSkipRegex = notToSkipRegex;

RedisService redisService = null;
Expand Down Expand Up @@ -168,7 +171,7 @@ private void startWorkingThread() {
Thread workingThread = new Thread(() -> {
LOG.info("Timbermill has started");
while (keepRunning) {
ElasticsearchUtil.drainAndIndex(eventsQueue, taskIndexer, eventsMaxElement);
ElasticsearchUtil.drainAndIndex(eventsQueue, taskIndexer, eventsMaxElement, skipEventsAtDrainFlag, notToSkipRegex);
}
stoppedRunning = true;
});
Expand Down Expand Up @@ -210,8 +213,8 @@ void handleEvents(Collection<Event> events) {
}
}

private Boolean shouldKeep(Event event) {
if (Boolean.parseBoolean(skipEventsFlag)) {
private boolean shouldKeep(Event event) {
if (skipEventsAtInsertFlag) {
if (metadataPatten.matcher(event.getName()).matches()) {
return true;
}
Expand All @@ -220,9 +223,11 @@ private Boolean shouldKeep(Event event) {
}
boolean match = notToSkipRegexPattern.matcher(event.getName()).matches();
if (match) {
LOG.info("skipEvents | keeping task {} task id: {}", event.getName(), event.getTaskId());
LOG.info("skipEvents | keeping task {} task id: {} at insert", event.getName(), event.getTaskId());
return true;
}
return match;
LOG.debug("skipEvents | skipping task {} task id: {} at insert", event.getName(), event.getTaskId());
return false;
}
return true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,5 +9,6 @@ management.health.elasticsearch.enabled=false
management.endpoint.health.show-details=always
PLUGINS_JSON=[{"class":"SwitchCasePlugin","taskMatcher":{"name":"Eventplugin"},"searchField":"exception","outputAttribute":"errorType","switchCase":[{"match":["TOO_MANY_SERVER_ROWS"],"output":"TOO_MANY_SERVER_ROWS"}]}]

#skip.events.flag = true
#skip.events.at.insert.flag = true
#skip.events.at.drain.flag = false
#not.to.skip.events.regex = "(account_analytics.*)|(page_view.*)|(validate_login)|(last_workspace_update)|(interactive_dingo_query)|(widget_init)|(widget_rendered)|(page_load)|(iframe_route_states_log)|(workspace_analytics_average_query_load_time)|(top_workspace_analytics_orphan_count)|(workspace_analytics_orphan_count)"
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public void init() {
@TimberLogTask(name = EVENT)
public void testHandleEventsOneMatchingTask() {

ReflectionTestUtils.setField(service, "skipEventsFlag", "true");
ReflectionTestUtils.setField(service, "skipEventsAtInsertFlag", true);
ReflectionTestUtils.setField(service, "notToSkipRegex", ".*keep.*");

Event event = new StartEvent(TimberLogger.getCurrentTaskId(), KEEP, new LogParams(), null);
Expand All @@ -68,7 +68,7 @@ public void testHandleEventsOneMatchingTask() {
@TimberLogTask(name = EVENT)
public void testHandleEventsOneNotMatchingTask() {

ReflectionTestUtils.setField(service, "skipEventsFlag", "true");
ReflectionTestUtils.setField(service, "skipEventsAtInsertFlag", true);
ReflectionTestUtils.setField(service, "notToSkipRegex", ".*keep.*");

Event event = new StartEvent(TimberLogger.getCurrentTaskId(), SKIP, new LogParams(), null);
Expand All @@ -82,7 +82,7 @@ public void testHandleEventsOneNotMatchingTask() {
public void testHandleEventsMultipleTasks() {


ReflectionTestUtils.setField(service, "skipEventsFlag", "true");
ReflectionTestUtils.setField(service, "skipEventsAtInsertFlag", true);
ReflectionTestUtils.setField(service, "notToSkipRegex", ".keep.*");

Event eventToKeep1 = new StartEvent(TimberLogger.getCurrentTaskId(), KEEP + "1", new LogParams(), null);
Expand All @@ -103,7 +103,7 @@ public void testHandleEventsMultipleTasks() {
@TimberLogTask(name = EVENT)
public void testHandleEventsMultipleTasksFlagClosed() {

ReflectionTestUtils.setField(service, "skipEventsFlag", "false");
ReflectionTestUtils.setField(service, "skipEventsAtInsertFlag", false);
ReflectionTestUtils.setField(service, "notToSkipRegex", ".keep.*");

Event eventToKeep1 = new StartEvent(TimberLogger.getCurrentTaskId(), KEEP + "1", new LogParams(), null);
Expand Down

0 comments on commit 6672456

Please sign in to comment.