-
-
Notifications
You must be signed in to change notification settings - Fork 67
/
DataHandler.java
161 lines (128 loc) · 5.53 KB
/
DataHandler.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
package colossus;
import colossus.data.Data;
import colossus.data.DataServiceGrpc;
import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.ServerInterceptors;
import io.grpc.stub.StreamObserver;
import io.prometheus.client.CollectorRegistry;
import io.prometheus.client.Counter;
import io.prometheus.client.exporter.HTTPServer;
import me.dinowernli.grpc.prometheus.Configuration;
import me.dinowernli.grpc.prometheus.MonitoringServerInterceptor;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.stream.IntStream;
/**
 * Entry point and implementation of the gRPC data service.
 *
 * <p>{@link #main(String[])} starts a Prometheus metrics HTTP server on port 9092 and then the
 * gRPC data service on port 1111, and blocks until the gRPC server terminates. A JVM shutdown
 * hook stops both servers.
 */
public class DataHandler {
    private static final Logger LOG = Logger.getLogger(DataHandler.class.getName());

    /** Port the gRPC data service listens on. */
    private static final int PORT = 1111;

    /** Port the Prometheus metrics HTTP server listens on. */
    private static final int METRICS_PORT = 9092;

    /** Upper bound, in seconds, on how long {@link #stop()} waits for the gRPC server to drain. */
    private static final long SHUTDOWN_TIMEOUT_SECONDS = 30;

    // NOTE(review): the "request_key" label is filled with the raw request string in
    // DataImpl.get, so every distinct client-supplied string creates a new time series.
    // Left unchanged because dashboards may depend on it, but the cardinality is unbounded.
    private static final Counter syncRequests = Counter.build()
            .name("data_svc_sync_requests")
            .help("Sync requests to the data service")
            .labelNames("request_key")
            .register();

    private static final Counter streamingRequests = Counter.build()
            .name("data_svc_streaming_requests")
            .help("Streaming requests to the data service")
            .register();

    private Server grpcServer;
    private static HTTPServer prometheusHttpServer;

    /**
     * Request-stream observer used by {@code streamingPut}.
     *
     * <p>Each incoming request string is transformed (lower-case {@code f} replaced with
     * {@code 9}, then upper-cased) and buffered. When the request stream completes, a single
     * response carrying the buffered list's {@code toString()} is sent and the call is completed.
     */
    static class StreamingResponder implements StreamObserver<Data.DataRequest> {
        private final StreamObserver<Data.DataResponse> observer;
        private final List<String> items = new ArrayList<>();

        /**
         * @param observer the response observer that receives the aggregated reply
         */
        StreamingResponder(StreamObserver<Data.DataResponse> observer) {
            this.observer = observer;
        }

        /** Buffers the transformed request string. */
        @Override
        public void onNext(Data.DataRequest req) {
            // Locale.ROOT keeps the result independent of the JVM's default locale
            // (e.g. a Turkish default locale would upper-case "i" to a dotted capital I).
            items.add(req.getRequest().replace("f", "9").toUpperCase(Locale.ROOT));
        }

        /** Forwards a request-stream error to the response observer. */
        @Override
        public void onError(Throwable t) {
            observer.onError(t);
        }

        /** Sends the buffered items as one response and completes the call. */
        @Override
        public void onCompleted() {
            Data.DataResponse res = Data.DataResponse.newBuilder()
                    .setValue(items.toString())
                    .build();
            observer.onNext(res);
            observer.onCompleted();
        }
    }

    /** gRPC service implementation backing the three data service RPCs. */
    static class DataImpl extends DataServiceGrpc.DataServiceImplBase {
        private static final Logger LOG = Logger.getLogger(DataImpl.class.getName());

        /**
         * Unary RPC: replies with the upper-cased request string and increments the sync request
         * counter labelled with the raw request string.
         *
         * @param req the request carrying the string to transform
         * @param resObserver receives exactly one response, then completion
         */
        @Override
        public void get(Data.DataRequest req, StreamObserver<Data.DataResponse> resObserver) {
            String request = req.getRequest();
            LOG.info(String.format("Request received for the string: \"%s\"", request));

            // Locale.ROOT: the reply must not vary with the server's default locale.
            String computedValue = request.toUpperCase(Locale.ROOT);
            LOG.info(String.format("Computed value: \"%s\"", computedValue));

            Data.DataResponse res = Data.DataResponse.newBuilder()
                    .setValue(computedValue)
                    .build();

            syncRequests.labels(request).inc();

            resObserver.onNext(res);
            resObserver.onCompleted();
        }

        /**
         * Server-streaming RPC: emits ten responses, {@code "Response 0"} through
         * {@code "Response 9"}, then completes the call.
         *
         * @param req the (empty) request
         * @param resObserver receives the ten responses followed by completion
         */
        @Override
        public void streamingGet(Data.EmptyRequest req, StreamObserver<Data.DataResponse> resObserver) {
            LOG.info("Request received for streaming data");

            Data.DataResponse.Builder resBldr = Data.DataResponse.newBuilder();

            IntStream.range(0, 10).forEach(i -> {
                String value = String.format("Response %d", i);
                // NOTE(review): the counter is bumped once per emitted response, i.e. ten times
                // per call, although its help text says "requests" - confirm this is intended.
                streamingRequests.inc();
                resObserver.onNext(resBldr.setValue(value).build());
            });

            resObserver.onCompleted();
        }

        /**
         * Client-streaming RPC: delegates handling of the request stream to a new
         * {@link StreamingResponder}.
         *
         * @param resObserver receives the single aggregated response
         * @return the observer that consumes the client's request stream
         */
        @Override
        public StreamObserver<Data.DataRequest> streamingPut(final StreamObserver<Data.DataResponse> resObserver) {
            return new StreamingResponder(resObserver);
        }
    }

    /**
     * Blocks the calling thread until the gRPC server has terminated; returns immediately when
     * the server was never started.
     *
     * @throws InterruptedException if the wait is interrupted
     */
    private void blockUntilShutdown() throws InterruptedException {
        if (grpcServer != null) {
            grpcServer.awaitTermination();
        }
    }

    /**
     * Shuts the gRPC server down, waiting up to {@link #SHUTDOWN_TIMEOUT_SECONDS} seconds for it
     * to terminate and forcing shutdown after that. Does nothing when the server was never
     * started.
     */
    private void stop() {
        if (grpcServer == null) {
            return;
        }
        grpcServer.shutdown();
        try {
            // shutdown() only initiates an orderly shutdown; wait so that callers can truthfully
            // report completion, but never hang the JVM exit indefinitely.
            if (!grpcServer.awaitTermination(SHUTDOWN_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
                grpcServer.shutdownNow();
            }
        } catch (InterruptedException e) {
            grpcServer.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }

    /** Registers a JVM shutdown hook that stops the gRPC server and then the metrics server. */
    private void shutdownHook() {
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            LOG.info("Shutting down gRPC data grpcServer due to JVM shutdown");
            stop();
            LOG.info("Server successfully shut down");

            // The metrics server is only created in main(); guard against it being absent.
            if (prometheusHttpServer != null) {
                prometheusHttpServer.stop();
                LOG.info("Prometheus metrics HTTP server successfully shut down");
            }
        }));
    }

    /**
     * Builds and starts the gRPC server on {@link #PORT} with the Prometheus monitoring
     * interceptor attached, then registers the shutdown hook.
     *
     * @throws IOException if the gRPC server cannot be started
     */
    private void start() throws IOException {
        // Bind the interceptor to the default registry: that is where the counters above are
        // registered, so the interceptor's metrics are published alongside them. A private
        // registry would collect the gRPC metrics without ever exposing them.
        Configuration monitoringConfig = Configuration.cheapMetricsOnly()
                .withCollectorRegistry(CollectorRegistry.defaultRegistry);
        MonitoringServerInterceptor prometheusInterceptor =
                MonitoringServerInterceptor.create(monitoringConfig);

        grpcServer = ServerBuilder.forPort(PORT)
                .addService(ServerInterceptors.intercept(new DataImpl().bindService(), prometheusInterceptor))
                .build()
                .start();

        LOG.info(String.format("gRPC server successfully started on port %d", PORT));

        shutdownHook();
    }

    /**
     * Starts the Prometheus HTTP server and the gRPC server, then blocks until the gRPC server
     * terminates. Exits the JVM with status 1 when the Prometheus HTTP server cannot be started.
     *
     * @param args ignored
     * @throws InterruptedException if the wait for server termination is interrupted
     * @throws IOException if the gRPC server cannot be started
     */
    public static void main(String[] args) throws InterruptedException, IOException {
        LOG.info(String.format("Starting up gRPC data grpcServer on port %d", PORT));

        final DataHandler handler = new DataHandler();

        try {
            LOG.info("Starting Prometheus HTTP server");
            prometheusHttpServer = new HTTPServer(METRICS_PORT);
            LOG.info(String.format(
                    "Successfully started Prometheus HTTP server on port %d", METRICS_PORT));
        } catch (IOException e) {
            // Log the cause as well, so a port clash or permission problem is diagnosable.
            LOG.log(Level.SEVERE, "Could not start Prometheus HTTP server", e);
            System.exit(1);
        }

        try {
            handler.start();
        } catch (IOException e) {
            // The shutdown hook is only registered after a successful start, so release the
            // already-running metrics server here instead of leaving it behind.
            prometheusHttpServer.stop();
            throw e;
        }

        handler.blockUntilShutdown();
    }
}