From 1b7ab6095c7810384d9621f832944cb6b7e24bc1 Mon Sep 17 00:00:00 2001 From: Lavkesh Lahngir Date: Thu, 28 Sep 2023 17:11:37 +0800 Subject: [PATCH] fix: fetch schema from cache to force refresh when no incoming traffic (#31) * bug: recreate connection when idle * chore: cleanup * chore: checkstyle * chore: logging * chore: typo in method name * chore: version bump * fixes: call fetch schema before conversion * fix: check streamwriter new schema before convert * refactor: only keep the lock while refreshing * test: fix test * fix: adding 60 second refresh even if no incoming traffic * fix: use stencil ttl for refresh --- build.gradle | 2 +- .../storage/proto/BigQueryProtoStorageClient.java | 11 +++++++++++ .../com/gotocompany/depot/message/MessageParser.java | 4 ++++ .../depot/message/proto/ProtoMessageParser.java | 7 +++++++ 4 files changed, 23 insertions(+), 1 deletion(-) diff --git a/build.gradle b/build.gradle index afdb5f31..c5ae0d53 100644 --- a/build.gradle +++ b/build.gradle @@ -22,7 +22,7 @@ plugins { } group 'com.gotocompany' -version '0.7.1' +version '0.7.2' repositories { mavenLocal() diff --git a/src/main/java/com/gotocompany/depot/bigquery/storage/proto/BigQueryProtoStorageClient.java b/src/main/java/com/gotocompany/depot/bigquery/storage/proto/BigQueryProtoStorageClient.java index f2bbb68f..d016253d 100644 --- a/src/main/java/com/gotocompany/depot/bigquery/storage/proto/BigQueryProtoStorageClient.java +++ b/src/main/java/com/gotocompany/depot/bigquery/storage/proto/BigQueryProtoStorageClient.java @@ -27,14 +27,19 @@ import java.util.ArrayList; import java.util.List; import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; public class BigQueryProtoStorageClient implements BigQueryStorageClient { + private static final long MESSAGE_PARSER_CHECKER_DELAY_MILLIS = 1000; private final BigQueryProtoWriter writer; private final BigQuerySinkConfig config; private final MessageParser parser; private final String schemaClass; private final SinkConnectorSchemaMessageMode mode; + private final ScheduledExecutorService messageParserChecker = Executors.newScheduledThreadPool(1); public BigQueryProtoStorageClient(BigQueryWriter writer, BigQuerySinkConfig config, MessageParser parser) { this.writer = (BigQueryProtoWriter) writer; @@ -43,6 +48,11 @@ public BigQueryProtoStorageClient(BigQueryWriter writer, BigQuerySinkConfig conf this.mode = config.getSinkConnectorSchemaMessageMode(); this.schemaClass = mode == SinkConnectorSchemaMessageMode.LOG_MESSAGE ? config.getSinkConnectorSchemaProtoMessageClass() : config.getSinkConnectorSchemaProtoKeyClass(); + this.messageParserChecker.scheduleWithFixedDelay( + () -> parser.refresh(schemaClass), + MESSAGE_PARSER_CHECKER_DELAY_MILLIS, + config.getSchemaRegistryStencilCacheTtlMs(), + TimeUnit.MILLISECONDS); } @@ -155,6 +165,7 @@ private void addRepeatedFields(DynamicMessage.Builder messageBuilder, Descriptor @Override public void close() throws IOException { writer.close(); + messageParserChecker.shutdownNow(); } } diff --git a/src/main/java/com/gotocompany/depot/message/MessageParser.java b/src/main/java/com/gotocompany/depot/message/MessageParser.java index f8babe0e..c0f4eef4 100644 --- a/src/main/java/com/gotocompany/depot/message/MessageParser.java +++ b/src/main/java/com/gotocompany/depot/message/MessageParser.java @@ -6,4 +6,8 @@ public interface MessageParser { ParsedMessage parse(Message message, SinkConnectorSchemaMessageMode type, String schemaClass) throws IOException; MessageSchema getSchema(String schemaClass) throws IOException; + + default void refresh(String schemaClass) { + + } } diff --git a/src/main/java/com/gotocompany/depot/message/proto/ProtoMessageParser.java b/src/main/java/com/gotocompany/depot/message/proto/ProtoMessageParser.java index 961248b9..5a1e02ae 100644 --- a/src/main/java/com/gotocompany/depot/message/proto/ProtoMessageParser.java +++ b/src/main/java/com/gotocompany/depot/message/proto/ProtoMessageParser.java @@ -82,6 +82,13 @@ public MessageSchema getSchema(String schemaClass) throws IOException { return new ProtoMessageSchema(protoField); } + @Override + public void refresh(String schemaClass) { + // this will try to fetch the descriptor for the class and reloading the cache in the process + // this is useful for batch ingestion with frequency more than TTL and possibility of new data. + stencilClient.get(schemaClass); + } + private Map getTypeNameToPackageNameMap(Map descriptors) { return descriptors.entrySet().stream() .filter(distinctByFullName(t -> t.getValue().getFullName()))