diff --git a/dagger-core/src/main/java/com/gotocompany/dagger/core/sink/http/HttpSinkWriter.java b/dagger-core/src/main/java/com/gotocompany/dagger/core/sink/http/HttpSinkWriter.java
new file mode 100644
index 000000000..c9ba7ad47
--- /dev/null
+++ b/dagger-core/src/main/java/com/gotocompany/dagger/core/sink/http/HttpSinkWriter.java
@@ -0,0 +1,348 @@
+package com.gotocompany.dagger.core.sink.http;
+
+import com.gotocompany.dagger.common.serde.proto.serialization.ProtoSerializer;
+import com.gotocompany.dagger.core.exception.HttpSinkWriterException;
+import com.gotocompany.dagger.core.metrics.reporters.ErrorReporter;
+import com.gotocompany.depot.Sink;
+import com.gotocompany.depot.SinkResponse;
+import com.gotocompany.depot.error.ErrorInfo;
+import com.gotocompany.depot.error.ErrorType;
+import com.gotocompany.depot.exception.SinkException;
+import com.gotocompany.depot.message.Message;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.flink.api.connector.sink.SinkWriter;
+import org.apache.flink.types.Row;
+
+import java.io.IOException;
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+@Slf4j
+public class HttpSinkWriter implements SinkWriter<Row, Void, Void> {
+
+    private static final int DEFAULT_QUEUE_CAPACITY = 10000;
+    private static final int DEFAULT_THREAD_POOL_SIZE = 5;
+    private static final long DEFAULT_FLUSH_INTERVAL_MS = 1000;
+
+    private final ProtoSerializer protoSerializer;
+    private final Sink httpSink;
+    private final int batchSize;
+    private final ErrorReporter errorReporter;
+    private final Set<ErrorType> errorTypesForFailing;
+    private final BlockingQueue<Message> messageQueue;
+    private final ExecutorService executorService;
+    private final ScheduledExecutorService scheduledExecutorService;
+    private final AtomicInteger currentBatchSize;
+    private final Map<String, Function<Row, Object>> customFieldExtractors;
+    private final HttpSinkWriterMetrics metrics;
+    private final HttpSinkWriterState state;
+
+    /**
+     * Constructs a new HttpSinkWriter with the specified parameters.
+     *
+     * @param protoSerializer      The serializer for protocol buffers
+     * @param httpSink             The underlying HTTP sink
+     * @param batchSize            The size of each batch to be sent
+     * @param errorReporter        The error reporter for logging and metrics
+     * @param errorTypesForFailing The set of error types that should cause a failure
+     */
+    public HttpSinkWriter(ProtoSerializer protoSerializer, Sink httpSink, int batchSize,
+                          ErrorReporter errorReporter, Set<ErrorType> errorTypesForFailing) {
+        this.protoSerializer = protoSerializer;
+        this.httpSink = httpSink;
+        this.batchSize = batchSize;
+        this.errorReporter = errorReporter;
+        this.errorTypesForFailing = errorTypesForFailing;
+        this.messageQueue = new LinkedBlockingQueue<>(DEFAULT_QUEUE_CAPACITY);
+        this.executorService = Executors.newFixedThreadPool(DEFAULT_THREAD_POOL_SIZE);
+        this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
+        this.currentBatchSize = new AtomicInteger(0);
+        this.customFieldExtractors = initializeCustomFieldExtractors();
+        this.metrics = new HttpSinkWriterMetrics();
+        this.state = new HttpSinkWriterState();
+
+        initializePeriodicFlush();
+    }
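+
+    // Concurrency note: write() runs on the Flink task thread, while flushes run on
+    // the worker pool and the scheduled periodic flusher. All of them share the
+    // bounded messageQueue, so currentBatchSize is an approximate flush trigger
+    // rather than an exact count of what is currently queued.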
+
+    /**
+     * Writes a single row to the HTTP sink.
+     *
+     * @param element The row to be written
+     * @param context The context of the write operation
+     * @throws IOException          If an I/O error occurs
+     * @throws InterruptedException If the operation is interrupted
+     */
+    @Override
+    public void write(Row element, Context context) throws IOException, InterruptedException {
+        metrics.incrementTotalRowsReceived();
+        byte[] key = protoSerializer.serializeKey(element);
+        byte[] value = enrichAndSerializeValue(element);
+        Message message = new Message(key, value);
+
+        // The bounded wait applies brief backpressure before the message is dropped.
+        if (!messageQueue.offer(message, 1, TimeUnit.SECONDS)) {
+            metrics.incrementDroppedMessages();
+            log.warn("Message queue is full. Dropping message: {}", message);
+            return;
+        }
+
+        if (currentBatchSize.incrementAndGet() >= batchSize) {
+            flushQueueAsync();
+        }
+    }
+
+    /**
+     * Prepares for a commit operation.
+     *
+     * @param flush Whether to flush all pending writes
+     * @return A list of committable states (always empty in this implementation)
+     * @throws IOException          If an I/O error occurs
+     * @throws InterruptedException If the operation is interrupted
+     */
+    @Override
+    public List<Void> prepareCommit(boolean flush) throws IOException, InterruptedException {
+        if (flush) {
+            flushQueue();
+        }
+        return Collections.emptyList();
+    }
+
+    /**
+     * Snapshots the current state of the writer.
+     *
+     * @param checkpointId The ID of the checkpoint
+     * @return A list of snapshotted states (always empty in this implementation)
+     */
+    @Override
+    public List<Void> snapshotState(long checkpointId) {
+        state.setLastCheckpointId(checkpointId);
+        state.setLastCheckpointTimestamp(System.currentTimeMillis());
+        return Collections.emptyList();
+    }
+
+    /**
+     * Closes the writer and releases any resources.
+     *
+     * @throws Exception If an error occurs during closure
+     */
+    @Override
+    public void close() throws Exception {
+        flushQueue();
+        executorService.shutdown();
+        scheduledExecutorService.shutdown();
+        if (!executorService.awaitTermination(30, TimeUnit.SECONDS)) {
+            executorService.shutdownNow();
+        }
+        if (!scheduledExecutorService.awaitTermination(30, TimeUnit.SECONDS)) {
+            scheduledExecutorService.shutdownNow();
+        }
+        httpSink.close();
+    }
+
+    /**
+     * Initializes custom field extractors for complex row processing.
+     *
+     * @return A map of field names to extractor functions
+     */
+    private Map<String, Function<Row, Object>> initializeCustomFieldExtractors() {
+        Map<String, Function<Row, Object>> extractors = new HashMap<>();
+        extractors.put("timestamp", row -> System.currentTimeMillis());
+        extractors.put("rowHash", row -> Objects.hash(row));
+        // Add more custom extractors as needed
+        return extractors;
+    }
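+
+    // Hypothetical extension point: further extractors could be registered in
+    // initializeCustomFieldExtractors(), e.g.
+    //   extractors.put("firstField", row -> row.getField(0));
+    // (field name and index above are illustrative, not part of this change)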
+
+    /**
+     * Enriches and serializes the value of a row.
+     *
+     * @param element The row to be enriched and serialized
+     * @return The serialized byte array of the enriched row
+     */
+    private byte[] enrichAndSerializeValue(Row element) {
+        Map<String, Object> enrichedData = new HashMap<>();
+        for (Map.Entry<String, Function<Row, Object>> entry : customFieldExtractors.entrySet()) {
+            enrichedData.put(entry.getKey(), entry.getValue().apply(element));
+        }
+
+        // Note: the extracted fields are computed but not yet attached to the
+        // payload; only the row itself is serialized below.
+        return protoSerializer.serializeValue(element);
+    }
+
+    private void initializePeriodicFlush() {
+        scheduledExecutorService.scheduleAtFixedRate(
+                this::flushQueueAsync,
+                DEFAULT_FLUSH_INTERVAL_MS,
+                DEFAULT_FLUSH_INTERVAL_MS,
+                TimeUnit.MILLISECONDS
+        );
+    }
+
+    private void flushQueueAsync() {
+        executorService.submit(() -> {
+            try {
+                flushQueue();
+            } catch (Exception e) {
+                log.error("Error during async queue flush", e);
+                metrics.incrementAsyncFlushErrors();
+            }
+        });
+    }
+
+    /**
+     * Flushes the message queue, sending all pending messages to the HTTP sink.
+     *
+     * @throws IOException          If an I/O error occurs
+     * @throws InterruptedException If the operation is interrupted
+     */
+    private void flushQueue() throws IOException, InterruptedException {
+        List<Message> batch = new ArrayList<>(batchSize);
+        messageQueue.drainTo(batch, batchSize);
+        if (!batch.isEmpty()) {
+            pushToHttpSink(batch);
+        }
+        currentBatchSize.set(0);
+    }
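+
+    // Delivery note: drainTo() removes at most batchSize messages per flush; any
+    // remainder is picked up by the periodic flush or the next size-triggered flush.
+    // Resetting currentBatchSize here is approximate when writes race with a flush.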
ErrorType: {}", + message.getMetadataString(), + errorInfo.getException().getMessage(), + errorInfo.getErrorType().name()); + }); + } + + /** + * Partitions the errors by whether they should cause a failure or not. + * + * @param sinkResponse The response from the sink operation + * @return A map of boolean to list of error info, where true indicates critical errors + */ + private Map> partitionErrorsByFailureType(SinkResponse sinkResponse) { + return sinkResponse.getErrors().values().stream() + .collect(Collectors.partitioningBy(errorInfo -> errorTypesForFailing.contains(errorInfo.getErrorType()))); + } + + private static class HttpSinkWriterMetrics { + private final AtomicLong totalRowsReceived = new AtomicLong(0); + private final AtomicLong totalBatchesProcessed = new AtomicLong(0); + private final AtomicLong totalRecordsSent = new AtomicLong(0); + private final AtomicLong totalErrors = new AtomicLong(0); + private final AtomicLong nonFatalErrors = new AtomicLong(0); + private final AtomicLong fatalErrors = new AtomicLong(0); + private final AtomicLong asyncFlushErrors = new AtomicLong(0); + private final AtomicLong droppedMessages = new AtomicLong(0); + private final AtomicLong totalProcessingTimeMs = new AtomicLong(0); + private final ThreadLocal batchStartTime = new ThreadLocal<>(); + + void incrementTotalRowsReceived() { + totalRowsReceived.incrementAndGet(); + } + + void incrementTotalErrors() { + totalErrors.incrementAndGet(); + } + + void incrementNonFatalErrors() { + nonFatalErrors.incrementAndGet(); + } + + void incrementFatalErrors() { + fatalErrors.incrementAndGet(); + } + + void incrementAsyncFlushErrors() { + asyncFlushErrors.incrementAndGet(); + } + + void incrementDroppedMessages() { + droppedMessages.incrementAndGet(); + } + + void startBatchProcessing(int batchSize) { + totalBatchesProcessed.incrementAndGet(); + totalRecordsSent.addAndGet(batchSize); + batchStartTime.set(System.currentTimeMillis()); + } + + void endBatchProcessing() { + long processingTime = System.currentTimeMillis() - batchStartTime.get(); + totalProcessingTimeMs.addAndGet(processingTime); + batchStartTime.remove(); + } + + } + + + private static class HttpSinkWriterState { + private volatile long lastCheckpointId; + private volatile long lastCheckpointTimestamp; + + void setLastCheckpointId(long checkpointId) { + this.lastCheckpointId = checkpointId; + } + + void setLastCheckpointTimestamp(long timestamp) { + this.lastCheckpointTimestamp = timestamp; + } + + } +}