diff --git a/dagger-core/src/main/java/com/gotocompany/dagger/core/sink/http/PendingRequest.java b/dagger-core/src/main/java/com/gotocompany/dagger/core/sink/http/PendingRequest.java new file mode 100644 index 000000000..0e99af35e --- /dev/null +++ b/dagger-core/src/main/java/com/gotocompany/dagger/core/sink/http/PendingRequest.java @@ -0,0 +1,410 @@ +import com.gotocompany.depot.config.HttpSinkConfig; +import com.gotocompany.depot.Sink; +import com.gotocompany.depot.SinkResponse; +import com.gotocompany.depot.message.Message; +import com.gotocompany.depot.error.ErrorInfo; +import com.gotocompany.depot.exception.SinkException; +import com.gotocompany.depot.http.HttpSink; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import io.micrometer.core.instrument.MeterRegistry; +import io.micrometer.core.instrument.Timer; +import io.micrometer.core.instrument.Counter; +import io.micrometer.core.instrument.Tag; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Function; +import java.util.Map; +import java.util.HashMap; +import java.util.List; +import java.util.ArrayList; +import java.util.Queue; +import java.util.LinkedList; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.locks.ReentrantLock; +import java.util.concurrent.locks.Condition; +import java.util.UUID; +import java.util.Optional; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; +import java.util.function.Predicate; +import java.util.concurrent.ThreadLocalRandom; + +public class PendingRequest { + private static final Logger LOGGER = LoggerFactory.getLogger(PendingRequest.class); + + private final HttpSinkConfig config; + private final CompletableFuture future; + private final List messages; + private final AtomicInteger retryCount; + private final long creationTime; + private final String requestId; + private final Map metadata; + private final ReentrantLock lock; + private final Condition retryCondition; + private final Queue retryStrategies; + private final ConcurrentHashMap context; + private final AtomicReference state; + private final ScheduledExecutorService scheduler; + private final HttpSink httpSink; + private final Consumer onCompletionCallback; + private final Predicate retryPredicate; + private final CircuitBreaker circuitBreaker; + private final MeterRegistry meterRegistry; + private final Timer executionTimer; + private final Counter retryCounter; + private final Counter successCounter; + private final Counter failureCounter; + + private enum RequestState { + PENDING, IN_PROGRESS, COMPLETED, FAILED + } + + private interface RetryStrategy { + long getNextRetryDelay(int retryCount); + } + + private class ExponentialBackoffStrategy implements RetryStrategy { + @Override + public long getNextRetryDelay(int retryCount) { + return (long) (Math.pow(2, retryCount) * config.getBackoffMultiplier() * (1 + ThreadLocalRandom.current().nextDouble() * config.getJitterFactor())); + } + } + + private class LinearBackoffStrategy implements RetryStrategy { + @Override + public long getNextRetryDelay(int retryCount) { + return retryCount * config.getBackoffMultiplier() * (1 + ThreadLocalRandom.current().nextDouble() * config.getJitterFactor()); + } + } + + private class CircuitBreaker { + private final AtomicInteger failureCount = new AtomicInteger(0); + private final AtomicReference state = new AtomicReference<>(CircuitState.CLOSED); + private final long resetTimeout; + private volatile long lastFailureTime; + + private enum CircuitState { + CLOSED, OPEN, HALF_OPEN + } + + public CircuitBreaker(long resetTimeout) { + this.resetTimeout = resetTimeout; + } + + public boolean allowRequest() { + CircuitState currentState = state.get(); + if (currentState == CircuitState.CLOSED) { + return true; + } else if (currentState == CircuitState.OPEN) { + if (System.currentTimeMillis() - lastFailureTime > resetTimeout) { + if (state.compareAndSet(CircuitState.OPEN, CircuitState.HALF_OPEN)) { + LOGGER.info("Circuit breaker transitioning to HALF_OPEN state for request: {}", requestId); + return true; + } + } + return false; + } else { + return true; + } + } + + public void recordSuccess() { + failureCount.set(0); + state.set(CircuitState.CLOSED); + LOGGER.debug("Circuit breaker recorded success for request: {}", requestId); + } + + public void recordFailure() { + failureCount.incrementAndGet(); + lastFailureTime = System.currentTimeMillis(); + if (failureCount.get() > config.getMaxRetries()) { + state.set(CircuitState.OPEN); + LOGGER.warn("Circuit breaker opened for request: {}", requestId); + } + } + } + + private PendingRequest(Builder builder) { + this.config = builder.config; + this.future = new CompletableFuture<>(); + this.messages = builder.messages; + this.retryCount = new AtomicInteger(0); + this.creationTime = System.currentTimeMillis(); + this.requestId = UUID.randomUUID().toString(); + this.metadata = new ConcurrentHashMap<>(builder.metadata); + this.lock = new ReentrantLock(); + this.retryCondition = lock.newCondition(); + this.retryStrategies = new LinkedList<>(builder.retryStrategies); + this.context = new ConcurrentHashMap<>(); + this.state = new AtomicReference<>(RequestState.PENDING); + this.scheduler = Executors.newSingleThreadScheduledExecutor(); + this.httpSink = builder.httpSink; + this.onCompletionCallback = builder.onCompletionCallback; + this.retryPredicate = builder.retryPredicate; + this.circuitBreaker = new CircuitBreaker(config.getCircuitBreakerResetTimeout()); + this.meterRegistry = builder.meterRegistry; + + List tags = List.of(Tag.of("requestId", requestId)); + this.executionTimer = Timer.builder("http.request.execution") + .tags(tags) + .register(meterRegistry); + this.retryCounter = Counter.builder("http.request.retries") + .tags(tags) + .register(meterRegistry); + this.successCounter = Counter.builder("http.request.success") + .tags(tags) + .register(meterRegistry); + this.failureCounter = Counter.builder("http.request.failure") + .tags(tags) + .register(meterRegistry); + + LOGGER.info("Created PendingRequest with ID: {}", requestId); + } + + public CompletableFuture getFuture() { + return future; + } + + public List getMessages() { + return messages; + } + + public String getRequestId() { + return requestId; + } + + public Map getMetadata() { + return new HashMap<>(metadata); + } + + public long getElapsedTime() { + return System.currentTimeMillis() - creationTime; + } + + public void execute() { + if (!state.compareAndSet(RequestState.PENDING, RequestState.IN_PROGRESS)) { + LOGGER.debug("Request {} is already in progress or completed", requestId); + return; + } + + if (!circuitBreaker.allowRequest()) { + LOGGER.warn("Circuit breaker is open for request: {}", requestId); + handleFailure(new RuntimeException("Circuit breaker is open")); + return; + } + + LOGGER.info("Executing request: {}", requestId); + executionTimer.record(() -> { + try { + SinkResponse response = httpSink.pushToSink(messages); + handleSuccess(response); + } catch (SinkException e) { + handleFailure(e); + } + }); + } + + private void handleSuccess(SinkResponse result) { + LOGGER.info("Request {} completed successfully", requestId); + circuitBreaker.recordSuccess(); + state.set(RequestState.COMPLETED); + future.complete(result); + onCompletionCallback.accept(this); + successCounter.increment(); + } + + private void handleFailure(Throwable throwable) { + LOGGER.warn("Request {} failed: {}", requestId, throwable.getMessage()); + circuitBreaker.recordFailure(); + failureCounter.increment(); + if (retryCount.incrementAndGet() <= config.getMaxRetries() && retryPredicate.test(throwable)) { + LOGGER.info("Scheduling retry {} for request {}", retryCount.get(), requestId); + retryCounter.increment(); + scheduleRetry(); + } else { + LOGGER.error("Request {} failed permanently after {} retries", requestId, retryCount.get() - 1); + state.set(RequestState.FAILED); + future.completeExceptionally(throwable); + onCompletionCallback.accept(this); + } + } + + private void scheduleRetry() { + lock.lock(); + try { + RetryStrategy strategy = retryStrategies.poll(); + if (strategy == null) { + strategy = new ExponentialBackoffStrategy(); + } + long delay = strategy.getNextRetryDelay(retryCount.get()); + LOGGER.debug("Scheduling retry for request {} with delay {}ms", requestId, delay); + scheduler.schedule(this::execute, delay, TimeUnit.MILLISECONDS); + retryCondition.await(delay, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + LOGGER.warn("Retry scheduling interrupted for request {}", requestId); + Thread.currentThread().interrupt(); + } finally { + lock.unlock(); + } + } + + public static class Builder { + private HttpSinkConfig config; + private List messages; + private Map metadata = new HashMap<>(); + private List retryStrategies = new ArrayList<>(); + private HttpSink httpSink; + private Consumer onCompletionCallback = request -> {}; + private Predicate retryPredicate = throwable -> true; + private MeterRegistry meterRegistry; + + public Builder withConfig(HttpSinkConfig config) { + this.config = config; + return this; + } + + public Builder withMessages(List messages) { + this.messages = messages; + return this; + } + + public Builder withMetadata(String key, Object value) { + this.metadata.put(key, value); + return this; + } + + public Builder withRetryStrategy(RetryStrategy strategy) { + this.retryStrategies.add(strategy); + return this; + } + + public Builder withHttpSink(HttpSink httpSink) { + this.httpSink = httpSink; + return this; + } + + public Builder withOnCompletionCallback(Consumer callback) { + this.onCompletionCallback = callback; + return this; + } + + public Builder withRetryPredicate(Predicate predicate) { + this.retryPredicate = predicate; + return this; + } + + public Builder withMeterRegistry(MeterRegistry meterRegistry) { + this.meterRegistry = meterRegistry; + return this; + } + + public PendingRequest build() { + if (config == null || messages == null || httpSink == null || meterRegistry == null) { + throw new IllegalStateException("Config, Messages, HttpSink, and MeterRegistry must be set"); + } + return new PendingRequest(this); + } + } + + public static PendingRequest create(HttpSinkConfig config, List messages, HttpSink httpSink, MeterRegistry meterRegistry) { + return new Builder() + .withConfig(config) + .withMessages(messages) + .withHttpSink(httpSink) + .withMeterRegistry(meterRegistry) + .build(); + } + + public void cancel() { + if (state.compareAndSet(RequestState.PENDING, RequestState.FAILED) || + state.compareAndSet(RequestState.IN_PROGRESS, RequestState.FAILED)) { + LOGGER.info("Cancelling request: {}", requestId); + future.cancel(true); + onCompletionCallback.accept(this); + } + } + + public boolean isCancelled() { + return future.isCancelled(); + } + + public boolean isDone() { + return future.isDone(); + } + + public RequestState getState() { + return state.get(); + } + + public void addContext(String key, Object value) { + context.put(key, value); + LOGGER.debug("Added context for request {}: {} = {}", requestId, key, value); + } + + public Optional getContext(String key) { + return Optional.ofNullable(context.get(key)); + } + + public Map getAllContext() { + return new HashMap<>(context); + } + + public int getRetryCount() { + return retryCount.get(); + } + + public void forceRetry() { + lock.lock(); + try { + if (state.get() == RequestState.FAILED) { + LOGGER.info("Forcing retry for failed request: {}", requestId); + state.set(RequestState.PENDING); + retryCount.set(0); + execute(); + } + } finally { + lock.unlock(); + } + } + + @Override + public String toString() { + return String.format("PendingRequest{requestId=%s, state=%s, retryCount=%d, elapsedTime=%d}", + requestId, state.get(), retryCount.get(), getElapsedTime()); + } + + private static class RetryContextHolder { + private static final ThreadLocal> retryContext = ThreadLocal.withInitial(HashMap::new); + + public static void set(String key, Object value) { + retryContext.get().put(key, value); + } + + public static Object get(String key) { + return retryContext.get().get(key); + } + + public static void clear() { + retryContext.get().clear(); + } + } + + public void setRetryContext(String key, Object value) { + RetryContextHolder.set(key, value); + LOGGER.debug("Set retry context for request {}: {} = {}", requestId, key, value); + } + + public Object getRetryContext(String key) { + return RetryContextHolder.get(key); + } + + public void clearRetryContext() { + RetryContextHolder.clear(); + LOGGER.debug("Cleared retry context for request {}", requestId); + } +} diff --git a/dagger-core/src/test/java/com/gotocompany/dagger/core/sink/http/PendingRequestTest.java b/dagger-core/src/test/java/com/gotocompany/dagger/core/sink/http/PendingRequestTest.java new file mode 100644 index 000000000..ca8fabfe5 --- /dev/null +++ b/dagger-core/src/test/java/com/gotocompany/dagger/core/sink/http/PendingRequestTest.java @@ -0,0 +1,388 @@ +import com.gotocompany.depot.config.HttpSinkConfig; +import com.gotocompany.depot.Sink; +import com.gotocompany.depot.SinkResponse; +import com.gotocompany.depot.message.Message; +import com.gotocompany.depot.exception.SinkException; +import com.gotocompany.depot.http.HttpSink; +import io.micrometer.core.instrument.MeterRegistry; +import io.micrometer.core.instrument.simple.SimpleMeterRegistry; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import static org.junit.Assert.*; +import static org.mockito.Mockito.*; + +public class PendingRequestTest { + + @Mock + private HttpSinkConfig config; + + @Mock + private HttpSink httpSink; + + @Mock + private SinkResponse sinkResponse; + + private MeterRegistry meterRegistry; + private List messages; + + @Before + public void setUp() { + MockitoAnnotations.initMocks(this); + meterRegistry = new SimpleMeterRegistry(); + messages = new ArrayList<>(); + messages.add(new Message("key".getBytes(), "value".getBytes())); + when(config.getMaxRetries()).thenReturn(3); + when(config.getBackoffMultiplier()).thenReturn(1000L); + when(config.getJitterFactor()).thenReturn(0.1); + when(config.getCircuitBreakerResetTimeout()).thenReturn(60000L); + } + + @Test + public void shouldCreatePendingRequest() { + PendingRequest request = PendingRequest.create(config, messages, httpSink, meterRegistry); + assertNotNull(request); + } + + @Test + public void shouldReturnFuture() { + PendingRequest request = PendingRequest.create(config, messages, httpSink, meterRegistry); + assertNotNull(request.getFuture()); + } + + @Test + public void shouldReturnMessages() { + PendingRequest request = PendingRequest.create(config, messages, httpSink, meterRegistry); + assertEquals(messages, request.getMessages()); + } + + @Test + public void shouldHaveUniqueRequestId() { + PendingRequest request = PendingRequest.create(config, messages, httpSink, meterRegistry); + assertNotNull(request.getRequestId()); + } + + @Test + public void shouldHaveEmptyMetadataInitially() { + PendingRequest request = PendingRequest.create(config, messages, httpSink, meterRegistry); + assertTrue(request.getMetadata().isEmpty()); + } + + @Test + public void shouldTrackElapsedTime() throws InterruptedException { + PendingRequest request = PendingRequest.create(config, messages, httpSink, meterRegistry); + Thread.sleep(100); + assertTrue(request.getElapsedTime() >= 100); + } + + @Test + public void shouldExecuteSuccessfully() throws Exception { + when(httpSink.pushToSink(messages)).thenReturn(sinkResponse); + PendingRequest request = PendingRequest.create(config, messages, httpSink, meterRegistry); + request.execute(); + assertEquals(sinkResponse, request.getFuture().get(1, TimeUnit.SECONDS)); + } + + @Test + public void shouldHandleExecutionFailure() throws Exception { + when(httpSink.pushToSink(messages)).thenThrow(new SinkException("Test exception")); + PendingRequest request = PendingRequest.create(config, messages, httpSink, meterRegistry); + request.execute(); + assertTrue(request.getFuture().isCompletedExceptionally()); + } + + @Test + public void shouldRetryOnFailure() throws Exception { + when(httpSink.pushToSink(messages)) + .thenThrow(new SinkException("Test exception")) + .thenReturn(sinkResponse); + PendingRequest request = PendingRequest.create(config, messages, httpSink, meterRegistry); + request.execute(); + assertEquals(sinkResponse, request.getFuture().get(5, TimeUnit.SECONDS)); + } + + @Test + public void shouldFailAfterMaxRetries() throws Exception { + when(httpSink.pushToSink(messages)).thenThrow(new SinkException("Test exception")); + PendingRequest request = PendingRequest.create(config, messages, httpSink, meterRegistry); + request.execute(); + assertTrue(request.getFuture().isCompletedExceptionally()); + } + + @Test + public void shouldCancelRequest() { + PendingRequest request = PendingRequest.create(config, messages, httpSink, meterRegistry); + request.cancel(); + assertTrue(request.isCancelled()); + } + + @Test + public void shouldReportCancelledStatus() { + PendingRequest request = PendingRequest.create(config, messages, httpSink, meterRegistry); + assertFalse(request.isCancelled()); + request.cancel(); + assertTrue(request.isCancelled()); + } + + @Test + public void shouldReportDoneStatus() throws Exception { + when(httpSink.pushToSink(messages)).thenReturn(sinkResponse); + PendingRequest request = PendingRequest.create(config, messages, httpSink, meterRegistry); + assertFalse(request.isDone()); + request.execute(); + request.getFuture().get(1, TimeUnit.SECONDS); + assertTrue(request.isDone()); + } + + @Test + public void shouldReportCorrectState() throws Exception { + when(httpSink.pushToSink(messages)).thenReturn(sinkResponse); + PendingRequest request = PendingRequest.create(config, messages, httpSink, meterRegistry); + assertEquals(PendingRequest.RequestState.PENDING, request.getState()); + request.execute(); + request.getFuture().get(1, TimeUnit.SECONDS); + assertEquals(PendingRequest.RequestState.COMPLETED, request.getState()); + } + + @Test + public void shouldAddAndRetrieveContext() { + PendingRequest request = PendingRequest.create(config, messages, httpSink, meterRegistry); + request.addContext("key", "value"); + assertEquals("value", request.getContext("key").orElse(null)); + } + + @Test + public void shouldHandleNonexistentContext() { + PendingRequest request = PendingRequest.create(config, messages, httpSink, meterRegistry); + request.addContext("key", "value"); + assertTrue(request.getContext("key").isPresent()); + assertFalse(request.getContext("nonexistent").isPresent()); + } + + @Test + public void shouldRetrieveAllContext() { + PendingRequest request = PendingRequest.create(config, messages, httpSink, meterRegistry); + request.addContext("key1", "value1"); + request.addContext("key2", "value2"); + assertEquals(2, request.getAllContext().size()); + } + + @Test + public void shouldTrackRetryCount() throws Exception { + when(httpSink.pushToSink(messages)) + .thenThrow(new SinkException("Test exception")) + .thenThrow(new SinkException("Test exception")) + .thenReturn(sinkResponse); + PendingRequest request = PendingRequest.create(config, messages, httpSink, meterRegistry); + request.execute(); + request.getFuture().get(5, TimeUnit.SECONDS); + assertEquals(2, request.getRetryCount()); + } + + @Test + public void shouldAllowForceRetry() throws Exception { + when(httpSink.pushToSink(messages)) + .thenThrow(new SinkException("Test exception")) + .thenReturn(sinkResponse); + PendingRequest request = PendingRequest.create(config, messages, httpSink, meterRegistry); + request.execute(); + Thread.sleep(100); + request.forceRetry(); + assertEquals(sinkResponse, request.getFuture().get(5, TimeUnit.SECONDS)); + } + + @Test + public void shouldProvideStringRepresentation() { + PendingRequest request = PendingRequest.create(config, messages, httpSink, meterRegistry); + assertTrue(request.toString().contains("requestId")); + assertTrue(request.toString().contains("state")); + assertTrue(request.toString().contains("retryCount")); + assertTrue(request.toString().contains("elapsedTime")); + } + + @Test + public void shouldSetAndGetRetryContext() { + PendingRequest request = PendingRequest.create(config, messages, httpSink, meterRegistry); + request.setRetryContext("key", "value"); + assertEquals("value", request.getRetryContext("key")); + } + + @Test + public void shouldClearRetryContext() { + PendingRequest request = PendingRequest.create(config, messages, httpSink, meterRegistry); + request.setRetryContext("key", "value"); + request.clearRetryContext(); + assertNull(request.getRetryContext("key")); + } + + @Test + public void shouldOpenCircuitBreaker() throws Exception { + when(httpSink.pushToSink(messages)).thenThrow(new SinkException("Test exception")); + PendingRequest request = PendingRequest.create(config, messages, httpSink, meterRegistry); + for (int i = 0; i < 5; i++) { + request.execute(); + Thread.sleep(100); + } + assertTrue(request.getFuture().isCompletedExceptionally()); + } + + @Test + public void shouldHalfOpenCircuitBreaker() throws Exception { + when(config.getCircuitBreakerResetTimeout()).thenReturn(100L); + when(httpSink.pushToSink(messages)) + .thenThrow(new SinkException("Test exception")) + .thenReturn(sinkResponse); + PendingRequest request = PendingRequest.create(config, messages, httpSink, meterRegistry); + request.execute(); + Thread.sleep(200); + request.execute(); + assertEquals(sinkResponse, request.getFuture().get(1, TimeUnit.SECONDS)); + } + + @Test + public void shouldUseExponentialBackoffStrategy() throws Exception { + when(httpSink.pushToSink(messages)) + .thenThrow(new SinkException("Test exception")) + .thenThrow(new SinkException("Test exception")) + .thenReturn(sinkResponse); + PendingRequest request = PendingRequest.create(config, messages, httpSink, meterRegistry); + request.execute(); + assertEquals(sinkResponse, request.getFuture().get(10, TimeUnit.SECONDS)); + } + + @Test + public void shouldUseLinearBackoffStrategy() throws Exception { + PendingRequest.Builder builder = new PendingRequest.Builder() + .withConfig(config) + .withMessages(messages) + .withHttpSink(httpSink) + .withMeterRegistry(meterRegistry) + .withRetryStrategy(new PendingRequest.LinearBackoffStrategy()); + when(httpSink.pushToSink(messages)) + .thenThrow(new SinkException("Test exception")) + .thenThrow(new SinkException("Test exception")) + .thenReturn(sinkResponse); + PendingRequest request = builder.build(); + request.execute(); + assertEquals(sinkResponse, request.getFuture().get(10, TimeUnit.SECONDS)); + } + + @Test + public void shouldUseCustomRetryPredicate() throws Exception { + PendingRequest.Builder builder = new PendingRequest.Builder() + .withConfig(config) + .withMessages(messages) + .withHttpSink(httpSink) + .withMeterRegistry(meterRegistry) + .withRetryPredicate(throwable -> false); + when(httpSink.pushToSink(messages)).thenThrow(new SinkException("Test exception")); + PendingRequest request = builder.build(); + request.execute(); + assertTrue(request.getFuture().isCompletedExceptionally()); + } + + @Test + public void shouldInvokeCompletionCallback() throws Exception { + CompletableFuture callbackFuture = new CompletableFuture<>(); + PendingRequest.Builder builder = new PendingRequest.Builder() + .withConfig(config) + .withMessages(messages) + .withHttpSink(httpSink) + .withMeterRegistry(meterRegistry) + .withOnCompletionCallback(callbackFuture::complete); + when(httpSink.pushToSink(messages)).thenReturn(sinkResponse); + PendingRequest request = builder.build(); + request.execute(); + assertEquals(request, callbackFuture.get(1, TimeUnit.SECONDS)); + } + + @Test + public void shouldRecordMetrics() throws Exception { + when(httpSink.pushToSink(messages)).thenReturn(sinkResponse); + PendingRequest request = PendingRequest.create(config, messages, httpSink, meterRegistry); + request.execute(); + request.getFuture().get(1, TimeUnit.SECONDS); + assertTrue(meterRegistry.get("http.request.execution").timer().count() > 0); + assertEquals(1, meterRegistry.get("http.request.success").counter().count()); + } + + @Test + public void shouldRecordRetryMetrics() throws Exception { + when(httpSink.pushToSink(messages)) + .thenThrow(new SinkException("Test exception")) + .thenReturn(sinkResponse); + PendingRequest request = PendingRequest.create(config, messages, httpSink, meterRegistry); + request.execute(); + request.getFuture().get(5, TimeUnit.SECONDS); + assertEquals(1, meterRegistry.get("http.request.retries").counter().count()); + } + + @Test + public void shouldRecordFailureMetrics() throws Exception { + when(httpSink.pushToSink(messages)).thenThrow(new SinkException("Test exception")); + PendingRequest request = PendingRequest.create(config, messages, httpSink, meterRegistry); + request.execute(); + Thread.sleep(1000); + assertEquals(1, meterRegistry.get("http.request.failure").counter().count()); + } + + @Test(expected = IllegalStateException.class) + public void shouldThrowExceptionWhenConfigMissing() { + new PendingRequest.Builder() + .withMessages(messages) + .withHttpSink(httpSink) + .withMeterRegistry(meterRegistry) + .build(); + } + + @Test(expected = IllegalStateException.class) + public void shouldThrowExceptionWhenMessagesMissing() { + new PendingRequest.Builder() + .withConfig(config) + .withHttpSink(httpSink) + .withMeterRegistry(meterRegistry) + .build(); + } + + @Test(expected = IllegalStateException.class) + public void shouldThrowExceptionWhenHttpSinkMissing() { + new PendingRequest.Builder() + .withConfig(config) + .withMessages(messages) + .withMeterRegistry(meterRegistry) + .build(); + } + + @Test(expected = IllegalStateException.class) + public void shouldThrowExceptionWhenMeterRegistryMissing() { + new PendingRequest.Builder() + .withConfig(config) + .withMessages(messages) + .withHttpSink(httpSink) + .build(); + } + + @Test + public void shouldBuildWithAllParameters() { + PendingRequest request = new PendingRequest.Builder() + .withConfig(config) + .withMessages(messages) + .withHttpSink(httpSink) + .withMeterRegistry(meterRegistry) + .withMetadata("key", "value") + .withRetryStrategy(new PendingRequest.LinearBackoffStrategy()) + .withOnCompletionCallback(r -> {}) + .withRetryPredicate(t -> true) + .build(); + assertNotNull(request); + } +}