diff --git a/src/main/java/com/gotocompany/firehose/sink/httpv2/HttpV2SinkUtils.java b/src/main/java/com/gotocompany/firehose/sink/httpv2/HttpV2SinkUtils.java index 97209a655..fedbed8f4 100644 --- a/src/main/java/com/gotocompany/firehose/sink/httpv2/HttpV2SinkUtils.java +++ b/src/main/java/com/gotocompany/firehose/sink/httpv2/HttpV2SinkUtils.java @@ -8,12 +8,18 @@ public class HttpV2SinkUtils { public static void addAdditionalConfigsForHttpV2Sink(Map env) { - switch (KafkaConsumerMode.valueOf(env.getOrDefault("SOURCE_KAFKA_CONSUMER_MODE", "SYNC"))) { + System.out.println(env.getOrDefault("SOURCE_KAFKA_CONSUMER_MODE", "SYNC").toUpperCase()); + switch (KafkaConsumerMode.valueOf(env.getOrDefault("SOURCE_KAFKA_CONSUMER_MODE", "SYNC").toUpperCase())) { case SYNC: env.put("SINK_HTTPV2_MAX_CONNECTIONS", "1"); break; + case ASYNC: env.put("SINK_HTTPV2_MAX_CONNECTIONS", env.getOrDefault("SINK_POOL_NUM_THREADS", "1")); + break; + default: + throw new IllegalArgumentException("Consumer mode should be async or sync"); + } } }