Skip to content

Commit

Permalink
Merge pull request #681 from smallrye/observability
Browse files Browse the repository at this point in the history
Stork observability support
  • Loading branch information
cescoffier authored Oct 11, 2023
2 parents a62d668 + 5a76dfc commit 77ae850
Show file tree
Hide file tree
Showing 22 changed files with 945 additions and 54 deletions.
10 changes: 9 additions & 1 deletion api/revapi.json
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,15 @@
"criticality" : "highlight",
"minSeverity" : "POTENTIALLY_BREAKING",
"minCriticality" : "documented",
"differences" : [ ]
"differences" : [
{
"ignore": true,
"code": "java.method.numberOfParametersChanged",
"old": "method void io.smallrye.stork.api.Service::<init>(java.lang.String, io.smallrye.stork.api.LoadBalancer, io.smallrye.stork.api.ServiceDiscovery, io.smallrye.stork.api.ServiceRegistrar, boolean)",
"new": "method void io.smallrye.stork.api.Service::<init>(java.lang.String, java.lang.String, java.lang.String, io.smallrye.stork.api.observability.ObservationCollector, io.smallrye.stork.api.LoadBalancer, io.smallrye.stork.api.ServiceDiscovery, io.smallrye.stork.api.ServiceRegistrar<?>, boolean)",
"justification": "Implementing observability"
}
]
}
}, {
"extension" : "revapi.reporter.json",
Expand Down
64 changes: 56 additions & 8 deletions api/src/main/java/io/smallrye/stork/api/Service.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
import java.util.concurrent.Semaphore;

import io.smallrye.mutiny.Uni;
import io.smallrye.stork.api.observability.ObservationCollector;
import io.smallrye.stork.api.observability.StorkObservation;

/**
* Represents a <em>Service</em>.
Expand All @@ -16,23 +18,34 @@ public class Service {
private final Semaphore instanceSelectionLock;
private final LoadBalancer loadBalancer;
private final ServiceDiscovery serviceDiscovery;
private final ServiceRegistrar serviceRegistrar;
private final ServiceRegistrar<?> serviceRegistrar;
private final String serviceName;
private final String serviceDiscoveryType;
private final String serviceSelectionType;
private final ObservationCollector observations;

/**
* Creates a new Service.
*
* @param serviceName the name, must not be {@code null}, must not be blank
* @param serviceDiscoveryType the type of the service discovery (for observability purpose)
* @param serviceSelectionType the type of the service selection (for observability purpose)
* @param collector the observation collector, must not be {@code null}
* @param loadBalancer the load balancer, can be {@code null}
* @param serviceDiscovery the service discovery, must not be {@code null}
* @param serviceRegistrar the service registrar, can be {@code null}
* @param requiresStrictRecording whether strict recording must be enabled
*/
public Service(String serviceName, LoadBalancer loadBalancer, ServiceDiscovery serviceDiscovery,
ServiceRegistrar serviceRegistrar, boolean requiresStrictRecording) {
public Service(String serviceName,
String serviceSelectionType, String serviceDiscoveryType, ObservationCollector collector,
LoadBalancer loadBalancer, ServiceDiscovery serviceDiscovery,
ServiceRegistrar<?> serviceRegistrar, boolean requiresStrictRecording) {
this.loadBalancer = loadBalancer;
this.serviceDiscovery = serviceDiscovery;
this.serviceRegistrar = serviceRegistrar;
this.serviceDiscoveryType = serviceDiscoveryType;
this.serviceSelectionType = serviceSelectionType;
this.observations = collector;
this.serviceName = serviceName;
this.instanceSelectionLock = requiresStrictRecording ? new Semaphore(1) : null;
}
Expand All @@ -41,21 +54,37 @@ public Service(String serviceName, LoadBalancer loadBalancer, ServiceDiscovery s
* Selects a service instance.
* <p>
* The selection looks for the service instances and select the one to use using the load balancer.
*
* <p>
* <b>Note:</b> this method doesn't record a start of an operation using this load balancer and does not
* synchronize load balancer invocations even if the load balancer is not thread safe
*
* @return a Uni with a ServiceInstance, or with {@link NoServiceInstanceFoundException} if the load balancer failed to find
* a service instance capable of handling a call
*/
public Uni<ServiceInstance> selectInstance() {
StorkObservation observationPoints = observations.create(serviceName, serviceDiscoveryType,
serviceSelectionType);
return serviceDiscovery.getServiceInstances()
.map(this::selectInstance);
.onItemOrFailure().invoke((list, failure) -> {
if (failure != null) {
observationPoints.onServiceDiscoveryFailure(failure);
} else {
observationPoints.onServiceDiscoverySuccess(list);
}
})
.map(this::selectInstance)
.onItemOrFailure().invoke((selected, failure) -> {
if (failure != null) {
observationPoints.onServiceSelectionFailure(failure);
} else {
observationPoints.onServiceSelectionSuccess(selected.getId());
}
});
}

/**
* Using the underlying load balancer, select a service instance from the collection of service instances.
*
* <p>
* <b>Note:</b> this method doesn't record a start of an operation using this load balancer and does not
* synchronize load balancer invocations even if the load balancer is not thread safe
*
Expand All @@ -80,8 +109,23 @@ public ServiceInstance selectInstance(Collection<ServiceInstance> instances) {
* @see LoadBalancer#requiresStrictRecording()
*/
public Uni<ServiceInstance> selectInstanceAndRecordStart(boolean measureTime) {
return serviceDiscovery.getServiceInstances()
.map(list -> selectInstanceAndRecordStart(list, measureTime));
StorkObservation observationPoints = observations.create(serviceName, serviceDiscoveryType,
serviceSelectionType);
return serviceDiscovery.getServiceInstances().onItemOrFailure().invoke((list, failure) -> {
if (failure != null) {
observationPoints.onServiceDiscoveryFailure(failure);
} else {
observationPoints.onServiceDiscoverySuccess(list);
}
})
.map(list -> selectInstanceAndRecordStart(list, measureTime))
.onItemOrFailure().invoke((selected, failure) -> {
if (failure != null) {
observationPoints.onServiceSelectionFailure(failure);
} else {
observationPoints.onServiceSelectionSuccess(selected.getId());
}
});
}

/**
Expand Down Expand Up @@ -158,6 +202,10 @@ public ServiceRegistrar getServiceRegistrar() {
return serviceRegistrar;
}

public ObservationCollector getObservations() {
return observations;
}

/**
* @return the service name.
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package io.smallrye.stork.api.observability;

import java.util.List;

import io.smallrye.stork.api.ServiceInstance;

public class NoopObservationCollector implements ObservationCollector {

private static final StorkEventHandler NOOP_HANDLER = ev -> {
// NOOP
};

public static final StorkObservation NOOP_STORK_EVENT = new StorkObservation(
null, null,
null, NOOP_HANDLER) {
@Override
public void onServiceDiscoverySuccess(List<ServiceInstance> instances) {
// Noop
}

@Override
public void onServiceDiscoveryFailure(Throwable throwable) {
// Noop
}

@Override
public void onServiceSelectionSuccess(long id) {
// Noop
}

@Override
public void onServiceSelectionFailure(Throwable throwable) {
// Noop
}
};

@Override
public StorkObservation create(String serviceName, String serviceDiscoveryType,
String serviceSelectionType) {
return NOOP_STORK_EVENT;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package io.smallrye.stork.api.observability;

public interface ObservationCollector {

StorkObservation create(String serviceName, String serviceDiscoveryType, String serviceSelectionType);

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package io.smallrye.stork.api.observability;

public interface StorkEventHandler {
void complete(StorkObservation event);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
package io.smallrye.stork.api.observability;

import java.time.Duration;
import java.util.List;

import io.smallrye.stork.api.ServiceInstance;

public class StorkObservation {
// Handler / Reporter
private final StorkEventHandler handler;

// Metadata
private final String serviceName;
private final String serviceDiscoveryType;
private final String serviceSelectionType;

// Time
private final long begin;
private volatile long endOfServiceDiscovery;
private volatile long endOfServiceSelection;

// Service discovery data
private volatile int instancesCount = -1;

// Service selection data
private volatile long selectedInstanceId = -1L;

// Overall status
private volatile boolean done;
private volatile boolean serviceDiscoverySuccessful = false;
private volatile Throwable failure;

public StorkObservation(String serviceName, String serviceDiscoveryType, String serviceSelectionType,
StorkEventHandler handler) {
this.handler = handler;
this.serviceName = serviceName;
this.serviceDiscoveryType = serviceDiscoveryType;
this.serviceSelectionType = serviceSelectionType;
this.begin = System.nanoTime();
}

public void onServiceDiscoverySuccess(List<ServiceInstance> instances) {
this.endOfServiceDiscovery = System.nanoTime();
this.serviceDiscoverySuccessful = true;
if (instances != null) {
this.instancesCount = instances.size();
} else {
this.instancesCount = 0;
}
}

public void onServiceDiscoveryFailure(Throwable throwable) {
this.endOfServiceDiscovery = System.nanoTime();
this.failure = throwable;
}

public void onServiceSelectionSuccess(long id) {
this.endOfServiceSelection = System.nanoTime();
this.selectedInstanceId = id;
this.done = true;
this.handler.complete(this);
}

public void onServiceSelectionFailure(Throwable throwable) {
this.endOfServiceSelection = System.nanoTime();
if (failure != throwable) {
this.failure = throwable;
}
this.handler.complete(this);
}

public boolean isDone() {
return done || failure != null;
}

public Duration getOverallDuration() {
if (!isDone()) {
return null;
}
return Duration.ofNanos(endOfServiceSelection - begin);
}

public Duration getServiceDiscoveryDuration() {
return Duration.ofNanos(endOfServiceDiscovery - begin);
}

public Duration getServiceSelectionDuration() {
if (!isDone()) {
return null;
}
return Duration.ofNanos(endOfServiceSelection - endOfServiceDiscovery);
}

public String getServiceName() {
return serviceName;
}

public String getServiceDiscoveryType() {
return serviceDiscoveryType;
}

public String getServiceSelectionType() {
return serviceSelectionType;
}

public int getDiscoveredInstancesCount() {
return instancesCount;
}

public Throwable failure() {
return failure;
}

public boolean isServiceDiscoverySuccessful() {
return serviceDiscoverySuccessful;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@

import java.util.function.Supplier;

import io.smallrye.stork.api.observability.NoopObservationCollector;
import io.smallrye.stork.api.observability.ObservationCollector;

/**
* A provider for "utility" objects used by service discovery and load balancer implementations.
*
Expand All @@ -24,4 +27,8 @@ public interface StorkInfrastructure {
* @throws NullPointerException if utilityClass or defaultSupplier are null
*/
<T> T get(Class<T> utilityClass, Supplier<T> defaultSupplier);

default ObservationCollector getObservationCollector() {
return new NoopObservationCollector();
}
}
10 changes: 7 additions & 3 deletions core/src/main/java/io/smallrye/stork/Stork.java
Original file line number Diff line number Diff line change
Expand Up @@ -207,13 +207,15 @@ private Service createService(Map<String, LoadBalancerLoader> loadBalancerLoader

final var loadBalancerConfig = serviceConfig.loadBalancer();
final LoadBalancer loadBalancer;
String loadBalancerType;
if (loadBalancerConfig == null) {
// no load balancer, use round-robin
LOGGER.debug("No load balancer configured for type {}, using {}", serviceDiscoveryType,
RoundRobinLoadBalancerProvider.ROUND_ROBIN_TYPE);
loadBalancerType = RoundRobinLoadBalancerProvider.ROUND_ROBIN_TYPE;
loadBalancer = new RoundRobinLoadBalancer();
} else {
String loadBalancerType = loadBalancerConfig.type();
loadBalancerType = loadBalancerConfig.type();
final var loadBalancerProvider = loadBalancerLoaders.get(loadBalancerType);
if (loadBalancerProvider == null) {
throw new IllegalArgumentException("No LoadBalancerProvider for type " + loadBalancerType);
Expand All @@ -223,7 +225,7 @@ private Service createService(Map<String, LoadBalancerLoader> loadBalancerLoader
}

final var serviceRegistrarConfig = serviceConfig.serviceRegistrar();
ServiceRegistrar serviceRegistrar = null;
ServiceRegistrar<?> serviceRegistrar = null;
if (serviceRegistrarConfig == null) {
LOGGER.debug("No service registrar configured for service {}", serviceConfig.serviceName());
} else {
Expand All @@ -237,7 +239,9 @@ private Service createService(Map<String, LoadBalancerLoader> loadBalancerLoader
serviceConfig.serviceName(), infrastructure);
}

return new Service(serviceConfig.serviceName(), loadBalancer, serviceDiscovery, serviceRegistrar,
return new Service(serviceConfig.serviceName(),
loadBalancerType, serviceDiscoveryType, infrastructure.getObservationCollector(),
loadBalancer, serviceDiscovery, serviceRegistrar,
loadBalancer.requiresStrictRecording());
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package io.smallrye.stork.integration;

import io.smallrye.stork.api.observability.ObservationCollector;

public class ObservableStorkInfrastructure extends DefaultStorkInfrastructure {

private final ObservationCollector observationCollector;

public ObservableStorkInfrastructure(ObservationCollector observationCollector) {
this.observationCollector = observationCollector;
}

@Override
public ObservationCollector getObservationCollector() {
return observationCollector;
}
}
Loading

0 comments on commit 77ae850

Please sign in to comment.