Skip to content

Commit

Permalink
Initial HTTP support draft
Browse files Browse the repository at this point in the history
  • Loading branch information
tteofili committed Dec 4, 2023
1 parent 7f5d761 commit c5c6097
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 14 deletions.
15 changes: 14 additions & 1 deletion src/main/java/com/ibm/watson/modelmesh/ModelMesh.java
Original file line number Diff line number Diff line change
Expand Up @@ -100,15 +100,20 @@
import org.eclipse.collections.impl.list.mutable.primitive.IntArrayList;

import javax.annotation.concurrent.GuardedBy;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLParameters;
import java.io.File;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.io.UncheckedIOException;
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryMXBean;
import java.lang.management.MemoryUsage;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.URI;
import java.nio.channels.ClosedByInterruptException;
import java.security.NoSuchAlgorithmException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.*;
Expand Down Expand Up @@ -431,7 +436,7 @@ public abstract class ModelMesh extends ThriftService
private PayloadProcessor initPayloadProcessor() {
String payloadProcessorsDefinitions = getStringParameter(MM_PAYLOAD_PROCESSORS, null);
logger.info("Parsing PayloadProcessor definition '{}'", payloadProcessorsDefinitions);
if (payloadProcessorsDefinitions != null && payloadProcessorsDefinitions.length() > 0) {
if (payloadProcessorsDefinitions != null && !payloadProcessorsDefinitions.isEmpty()) {
List<PayloadProcessor> payloadProcessors = new ArrayList<>();
for (String processorDefinition : payloadProcessorsDefinitions.split(" ")) {
try {
Expand All @@ -442,6 +447,14 @@ private PayloadProcessor initPayloadProcessor() {
String method = uri.getFragment();
if ("http".equals(processorName)) {
processor = new RemotePayloadProcessor(uri);
} else if ("https".equals(processorName)) {
SSLContext sslContext;
try {
sslContext = SSLContext.getDefault();
} catch (NoSuchAlgorithmException missingAlgorithmException) {
throw new UncheckedIOException(new IOException(missingAlgorithmException));
}
processor = new RemotePayloadProcessor(uri, sslContext, sslContext.getDefaultSSLParameters());
} else if ("logger".equals(processorName)) {
processor = new LoggingPayloadProcessor();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLParameters;

/**
* A {@link PayloadProcessor} that sends payloads to a remote service via HTTP POST.
*/
Expand All @@ -45,8 +48,19 @@ public class RemotePayloadProcessor implements PayloadProcessor {
private final HttpClient client;

public RemotePayloadProcessor(URI uri) {
this(uri, null, null);
}

public RemotePayloadProcessor(URI uri, SSLContext sslContext, SSLParameters sslParameters) {
this.uri = uri;
this.client = HttpClient.newHttpClient();
if (sslContext != null && sslParameters != null) {
this.client = HttpClient.newBuilder()
.sslContext(sslContext)
.sslParameters(sslParameters)
.build();
} else {
this.client = HttpClient.newHttpClient();
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package com.ibm.watson.modelmesh.payload;

import java.io.IOException;
import java.net.URI;

import io.grpc.Metadata;
Expand All @@ -29,17 +30,36 @@
class RemotePayloadProcessorTest {

@Test
void testDestinationUnreachable() {
RemotePayloadProcessor remotePayloadProcessor = new RemotePayloadProcessor(URI.create("http://this-does-not-exist:123"));
String id = "123";
String modelId = "456";
String method = "predict";
Status kind = Status.INVALID_ARGUMENT;
Metadata metadata = new Metadata();
metadata.put(Metadata.Key.of("foo", Metadata.ASCII_STRING_MARSHALLER), "bar");
metadata.put(Metadata.Key.of("binary-bin", Metadata.BINARY_BYTE_MARSHALLER), "string".getBytes());
ByteBuf data = Unpooled.buffer(4);
Payload payload = new Payload(id, modelId, method, metadata, data, kind);
assertFalse(remotePayloadProcessor.process(payload));
void testDestinationUnreachable() throws IOException {
URI uri = URI.create("http://this-does-not-exist:123");
try (RemotePayloadProcessor remotePayloadProcessor = new RemotePayloadProcessor(uri)) {
String id = "123";
String modelId = "456";
String method = "predict";
Status kind = Status.INVALID_ARGUMENT;
Metadata metadata = new Metadata();
metadata.put(Metadata.Key.of("foo", Metadata.ASCII_STRING_MARSHALLER), "bar");
metadata.put(Metadata.Key.of("binary-bin", Metadata.BINARY_BYTE_MARSHALLER), "string".getBytes());
ByteBuf data = Unpooled.buffer(4);
Payload payload = new Payload(id, modelId, method, metadata, data, kind);
assertFalse(remotePayloadProcessor.process(payload));
}
}

@Test
void testDestinationUnreachableHTTPS() throws IOException {
URI uri = URI.create("https://this-does-not-exist:123");
try (RemotePayloadProcessor remotePayloadProcessor = new RemotePayloadProcessor(uri)) {
String id = "123";
String modelId = "456";
String method = "predict";
Status kind = Status.INVALID_ARGUMENT;
Metadata metadata = new Metadata();
metadata.put(Metadata.Key.of("foo", Metadata.ASCII_STRING_MARSHALLER), "bar");
metadata.put(Metadata.Key.of("binary-bin", Metadata.BINARY_BYTE_MARSHALLER), "string".getBytes());
ByteBuf data = Unpooled.buffer(4);
Payload payload = new Payload(id, modelId, method, metadata, data, kind);
assertFalse(remotePayloadProcessor.process(payload));
}
}
}

0 comments on commit c5c6097

Please sign in to comment.