[Streaming] Fix issues when not using try-with-resource #49

Merged 106 commits, Oct 30, 2024
Commits
d319aa9
OpenAI streaming
CharlesDuboisSAP Aug 16, 2024
69ae7eb
Added homepage and error handling todo
CharlesDuboisSAP Aug 16, 2024
7870e6d
Renamed vars
CharlesDuboisSAP Aug 19, 2024
652ec1e
Added todos
CharlesDuboisSAP Aug 19, 2024
727b3d4
Made stream generic, try-with resources, TEXT_EVENT_STREAM, exception…
CharlesDuboisSAP Aug 21, 2024
b3190a5
Formatting
bot-sdk-js Aug 21, 2024
f0fa3e6
close stream correctly
CharlesDuboisSAP Aug 21, 2024
09ca6ea
Formatting
bot-sdk-js Aug 21, 2024
d86243a
Created OpenAiStreamOutput
CharlesDuboisSAP Aug 21, 2024
2a4ce7b
Merge remote-tracking branch 'origin/streaming' into streaming
CharlesDuboisSAP Aug 21, 2024
cf6ec46
Formatting
bot-sdk-js Aug 21, 2024
a73f037
Renamed stream to streamChatCompletion, Added comments
CharlesDuboisSAP Aug 22, 2024
eb3f24a
Added total output
CharlesDuboisSAP Aug 23, 2024
fb2cdaf
Total output is printed
CharlesDuboisSAP Aug 23, 2024
fe078c7
Formatting
bot-sdk-js Aug 23, 2024
09e1be0
addDelta is propagated everywhere
CharlesDuboisSAP Aug 23, 2024
42ae946
addDelta is propagated everywhere
CharlesDuboisSAP Aug 23, 2024
e6e009a
forgotten addDeltas
CharlesDuboisSAP Aug 23, 2024
bee8fdc
Added jackson dependencies
CharlesDuboisSAP Aug 23, 2024
5f03c6f
Added Javadoc
CharlesDuboisSAP Aug 23, 2024
e79ca8e
Removed 1 TODO
CharlesDuboisSAP Aug 23, 2024
ba2c5e0
PMD
CharlesDuboisSAP Aug 27, 2024
c10eecb
PMD again
CharlesDuboisSAP Aug 27, 2024
cdae1c6
Merge branch 'refs/heads/main' into streaming
CharlesDuboisSAP Aug 27, 2024
faa3b70
Merge branch 'refs/heads/main' into streaming
CharlesDuboisSAP Aug 27, 2024
0e1a167
Added OpenAiClientTest.streamChatCompletion()
CharlesDuboisSAP Aug 28, 2024
31dbd52
Change return type of stream, added e2e test
CharlesDuboisSAP Aug 29, 2024
de7e7f0
Added documentation
CharlesDuboisSAP Aug 29, 2024
349936f
Added documentation framework-agnostic + throw if finish reason is in…
CharlesDuboisSAP Aug 29, 2024
58b0bc9
Merge branch 'refs/heads/main' into streaming
CharlesDuboisSAP Aug 30, 2024
3366c2e
Added error handling test
CharlesDuboisSAP Aug 30, 2024
c709d31
Updates from pair review / discussion
MatKuhr Aug 30, 2024
73031d1
Cleanup + streamChatCompletion doesn't throw
CharlesDuboisSAP Sep 2, 2024
6b1bfd0
PMD
CharlesDuboisSAP Sep 2, 2024
23474ba
Added errorHandling test
CharlesDuboisSAP Sep 2, 2024
769cd7d
Apply suggestions from code review
CharlesDuboisSAP Sep 3, 2024
118dc69
Dependency analyze
CharlesDuboisSAP Sep 3, 2024
acd21c0
Review comments
CharlesDuboisSAP Sep 3, 2024
28268b2
Make client static
CharlesDuboisSAP Sep 3, 2024
9a9a44b
Formatting
bot-sdk-js Sep 3, 2024
788db03
PMD
CharlesDuboisSAP Sep 3, 2024
0616f55
Fix tests
CharlesDuboisSAP Sep 3, 2024
3446bf0
Removed exception constructors no args
CharlesDuboisSAP Sep 3, 2024
45a20c6
Refactor exception message
CharlesDuboisSAP Sep 3, 2024
f843061
Readme sentences
CharlesDuboisSAP Sep 3, 2024
5edcf71
Remove superfluous call super
CharlesDuboisSAP Sep 3, 2024
7474fb1
reset httpclient-cache and -factory after each test case
newtork Sep 3, 2024
ac6f36c
Very minor code-style improvements in test
newtork Sep 3, 2024
ffa369a
Minor code-style in OpenAIController
newtork Sep 3, 2024
6cfeee9
Reduce README sample code
newtork Sep 3, 2024
6d4fd2f
Update OpenAiStreamingHandler.java (#43)
newtork Sep 3, 2024
a6c566a
Fix import
newtork Sep 3, 2024
543e003
Initial
newtork Sep 3, 2024
e810b52
Format
newtork Sep 3, 2024
89b7315
Improve type
newtork Sep 3, 2024
f6a4fe6
Added stream_options to model
CharlesDuboisSAP Sep 4, 2024
ead57b3
Change Executor#submit() to #execute()
newtork Sep 4, 2024
05dedf9
Change Executor#submit() to #execute()
newtork Sep 4, 2024
2604969
Merge branch 'streaming' of https://github.com/SAP/ai-sdk-java into s…
newtork Sep 4, 2024
9a3bf2f
Merge remote-tracking branch 'origin/main' into streaming
newtork Sep 4, 2024
a0ae779
Added usage testing
CharlesDuboisSAP Sep 4, 2024
2c934f7
Added beautiful Javadoc to enableStreaming
CharlesDuboisSAP Sep 4, 2024
77eb464
typo
CharlesDuboisSAP Sep 4, 2024
488f060
Fix mistake
CharlesDuboisSAP Sep 4, 2024
b5f48cd
Merge remote-tracking branch 'origin/streaming' into streaming-2
newtork Sep 4, 2024
06e3143
Merge remote-tracking branch 'origin/main' into streaming-2
newtork Sep 4, 2024
676c8ad
Syntax improvement to improve API stability.
newtork Sep 5, 2024
93e16d9
Syntax improvement to improve API stability.
newtork Sep 5, 2024
6e15131
Make exception types similar to BufferedReader original logic
newtork Sep 5, 2024
f6f122d
Format
newtork Sep 5, 2024
7aad621
Add nonnull characteristic to mirror BufferedReader original logic
newtork Sep 5, 2024
58e84b2
Merge branch 'main' into streaming-2
newtork Sep 6, 2024
db05057
Merge remote-tracking branch 'origin/main' into streaming-2
newtork Sep 10, 2024
9188893
Make buffer size accessible
a-d Sep 10, 2024
67a0489
Add test
a-d Sep 10, 2024
20cc897
Add assertion on stream count
a-d Sep 10, 2024
f4947a8
Simplify e2e code
a-d Sep 10, 2024
98e61de
Simplify README
a-d Sep 10, 2024
d034d2a
Partially revert
a-d Sep 10, 2024
aa7ae8e
Add assertion
a-d Sep 10, 2024
66ad4d7
Partially revert
a-d Sep 10, 2024
9dcb57a
Merge remote-tracking branch 'origin/main' into streaming-2
a-d Sep 24, 2024
7b275df
Minor code adjustments
a-d Sep 24, 2024
2e2c0df
Replace unnecessary nested types
a-d Sep 24, 2024
fa284ad
Merge nested type to renamed parent type
a-d Sep 24, 2024
cde54d6
Change code to ensure our lazy `hasNext()` has no unexpected side effect
a-d Sep 24, 2024
5088b99
Revert removing `emitter#complete()`
a-d Sep 24, 2024
fa8f91d
Add JavaDoc; Replace VAVR type
a-d Sep 24, 2024
e90883e
Address PMD warnings: change exception type
a-d Sep 24, 2024
4183ca3
Add unhappy-path test cases
a-d Sep 24, 2024
1463eb2
Revert code change in test app
a-d Sep 24, 2024
9fb63e3
Merge branch 'main' into streaming-2
newtork Sep 24, 2024
a235676
Merge remote-tracking branch 'origin/main' into streaming-2
a-d Sep 24, 2024
9e85828
Initial migrate coverage check to pom xml
a-d Oct 24, 2024
8423b49
Print FULL coverage report with git diff indicator
a-d Oct 24, 2024
4c463ba
Merge remote-tracking branch 'origin/main' into cicd/print-coverage-r…
a-d Oct 24, 2024
f5abe19
Merge branch 'cicd/print-coverage-report' into streaming-2
a-d Oct 28, 2024
e504650
Update foundation-models/openai/src/test/java/com/sap/ai/sdk/foundati…
newtork Oct 28, 2024
65c1108
Update foundation-models/openai/src/test/java/com/sap/ai/sdk/foundati…
newtork Oct 28, 2024
51f4ed2
Merge branch 'streaming-2' of https://github.com/SAP/ai-sdk-java into…
a-d Oct 28, 2024
15932c8
Reduce redundant method
a-d Oct 28, 2024
e3167e8
Add comment
a-d Oct 28, 2024
bdffa6d
Fix merge error
a-d Oct 28, 2024
7b34cc7
Fix JavaDoc inaccessibility warning
a-d Oct 28, 2024
1255817
Merge branch 'main' into streaming-2
newtork Oct 28, 2024
fb21544
Improve error message
a-d Oct 29, 2024
@@ -0,0 +1,120 @@
package com.sap.ai.sdk.foundationmodels.openai;

import static java.nio.charset.StandardCharsets.UTF_8;
import static java.util.Spliterator.NONNULL;
import static java.util.Spliterator.ORDERED;

import io.vavr.control.Try;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.Spliterators;
import java.util.concurrent.Callable;
import java.util.function.Function;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import lombok.AccessLevel;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.hc.core5.http.HttpEntity;

/**
 * Internal utility class to convert from a reading handler to {@link Iterable} and {@link Stream}.
 *
 * <p><strong>Note:</strong> All operations are sequential in nature. Thread safety is not
 * guaranteed.
 *
 * @param <T> Iterated item type.
 */
@Slf4j
@RequiredArgsConstructor(access = AccessLevel.PRIVATE)
class IterableStreamConverter<T> implements Iterator<T> {
  /** See DEFAULT_CHAR_BUFFER_SIZE in {@link BufferedReader}. */
  static final int BUFFER_SIZE = 8192;

  /** Read next entry for Stream or {@code null} when no further entry can be read. */
  private final Callable<T> readHandler;

  /** Close handler to be called when Stream terminated. */
  private final Runnable stopHandler;

  /** Error handler to be called when Stream is interrupted. */
  private final Function<Exception, RuntimeException> errorHandler;

  private boolean isDone = false;
  private boolean isNextFetched = false;
  private T next = null;

  @SuppressWarnings("checkstyle:IllegalCatch")
  @Override
  public boolean hasNext() {
    if (isDone) {
      return false;
    }
    if (isNextFetched) {
      return true;
    }
    try {
      next = readHandler.call();
      isNextFetched = true;
      if (next == null) {
        isDone = true;
        stopHandler.run();
      }
    } catch (final Exception e) {
      isDone = true;
      stopHandler.run();
      log.debug("Error while reading next element.", e);
      throw errorHandler.apply(e);
    }
    return !isDone;
  }

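  @Override
  public T next() {
    // Relies on hasNext() being called before each next(), as the Stream API does; a second
    // next() without an intervening hasNext() would return the previous element again.
    if (next == null && !hasNext()) {
      throw new NoSuchElementException(); // normally not reached with Stream API
    }
    isNextFetched = false;
    return next;
  }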

  /**
   * Create a sequential Stream of lines from an HTTP response string (UTF-8). The underlying
   * {@link InputStream} is closed when the resulting Stream has been fully consumed, when it is
   * closed explicitly (e.g. via try-with-resources), or when an exception occurs while reading.
   *
   * @param entity The HTTP entity object.
   * @return A sequential Stream object.
   * @throws OpenAiClientException if the provided HTTP entity object is {@code null} or empty.
   */
  @SuppressWarnings("PMD.CloseResource") // Stream is closed automatically when consumed
  @Nonnull
  static Stream<String> lines(@Nullable final HttpEntity entity) throws OpenAiClientException {
    if (entity == null) {
      throw new OpenAiClientException("OpenAI response was empty.");
    }

    final InputStream inputStream;
    try {
      inputStream = entity.getContent();
    } catch (final IOException e) {
      throw new OpenAiClientException("Failed to read response content.", e);
    }

    final var reader = new BufferedReader(new InputStreamReader(inputStream, UTF_8), BUFFER_SIZE);
    final Runnable closeHandler =
        () -> Try.run(reader::close).onFailure(e -> log.error("Could not close input stream", e));
    final Function<Exception, RuntimeException> errHandler =
        e -> new OpenAiClientException("Parsing response content was interrupted.", e);

    final var iterator = new IterableStreamConverter<>(reader::readLine, closeHandler, errHandler);
    final var spliterator = Spliterators.spliteratorUnknownSize(iterator, ORDERED | NONNULL);
    return StreamSupport.stream(spliterator, /* NOT PARALLEL */ false).onClose(closeHandler);
  }
}
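
The lazy `hasNext()`/`next()` pattern above can be tried in isolation. The following is a minimal sketch using JDK types only; `LazyLineIterator` and its `lines(reader, stopHandler)` signature are hypothetical names chosen for illustration, and `UncheckedIOException` stands in for the SDK's own exception type. It is not the SDK class.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

/** Hypothetical, JDK-only sketch of a lazily reading iterator; not the SDK class. */
final class LazyLineIterator implements Iterator<String> {
  private final BufferedReader reader;
  private final Runnable stopHandler; // invoked when the source is exhausted or fails
  private boolean done = false;
  private boolean fetched = false;
  private String next = null;

  private LazyLineIterator(final BufferedReader reader, final Runnable stopHandler) {
    this.reader = reader;
    this.stopHandler = stopHandler;
  }

  @Override
  public boolean hasNext() {
    if (done) {
      return false;
    }
    if (fetched) {
      return true; // repeated calls must not read again
    }
    try {
      next = reader.readLine();
    } catch (final IOException e) {
      done = true;
      stopHandler.run();
      throw new UncheckedIOException(e);
    }
    fetched = true;
    if (next == null) { // end of input: release the resource right away
      done = true;
      stopHandler.run();
    }
    return !done;
  }

  @Override
  public String next() {
    if (!hasNext()) {
      throw new NoSuchElementException();
    }
    fetched = false;
    return next;
  }

  /** Wraps the iterator in a sequential Stream; closing the Stream also runs the handler. */
  static Stream<String> lines(final BufferedReader reader, final Runnable stopHandler) {
    final var iterator = new LazyLineIterator(reader, stopHandler);
    final var spliterator =
        Spliterators.spliteratorUnknownSize(iterator, Spliterator.ORDERED | Spliterator.NONNULL);
    return StreamSupport.stream(spliterator, false).onClose(stopHandler);
  }

  public static void main(final String[] args) {
    final var stops = new AtomicInteger();
    final var reader = new BufferedReader(new StringReader("a\nb\n"));
    // No try-with-resources: the handler still runs once the last line has been read.
    final var all = lines(reader, stops::incrementAndGet).collect(Collectors.toList());
    System.out.println(all + " stops=" + stops.get());
  }
}
```

In this sketch the handler can run twice if the stream is both fully consumed and closed explicitly, so it should be idempotent, as `BufferedReader.close()` is.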
@@ -5,57 +5,31 @@
 import static com.sap.ai.sdk.foundationmodels.openai.OpenAiResponseHandler.parseErrorAndThrow;
 
 import com.sap.ai.sdk.foundationmodels.openai.model.StreamedDelta;
-import io.vavr.control.Try;
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.nio.charset.StandardCharsets;
 import java.util.stream.Stream;
 import javax.annotation.Nonnull;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.hc.core5.http.ClassicHttpResponse;
-import org.apache.hc.core5.http.HttpEntity;
 
 @Slf4j
 @RequiredArgsConstructor
 class OpenAiStreamingHandler<D extends StreamedDelta> {
 
   @Nonnull private final Class<D> deltaType;
 
-  @Nonnull
-  Stream<D> handleResponse(@Nonnull final ClassicHttpResponse response)
-      throws OpenAiClientException {
-    if (response.getCode() >= 300) {
-      buildExceptionAndThrow(response);
-    }
-    return parseResponse(response);
-  }
-
   /**
    * @param response The response to process
    * @return A {@link Stream} of a model class instantiated from the response
    * @author stippi
    */
-  // The stream is closed by the user of the Stream
-  @SuppressWarnings("PMD.CloseResource")
-  private Stream<D> parseResponse(@Nonnull final ClassicHttpResponse response)
+  @SuppressWarnings("PMD.CloseResource") // Stream is closed automatically when consumed
+  @Nonnull
+  Stream<D> handleResponse(@Nonnull final ClassicHttpResponse response)
       throws OpenAiClientException {
-    final HttpEntity responseEntity = response.getEntity();
-    if (responseEntity == null) {
-      throw new OpenAiClientException("Response from OpenAI model was empty.");
-    }
-    final InputStream inputStream;
-    try {
-      inputStream = responseEntity.getContent();
-    } catch (IOException e) {
-      throw new OpenAiClientException("Failed to read response content.", e);
+    if (response.getCode() >= 300) {
+      buildExceptionAndThrow(response);
     }
-    final var br = new BufferedReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8));
-
-    // https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events/Using_server-sent_events#event_stream_format
-    return br.lines()
+    return IterableStreamConverter.lines(response.getEntity())
         // half of the lines are empty newlines, the last line is "data: [DONE]"
         .filter(line -> !line.isEmpty() && !"data: [DONE]".equals(line.trim()))
         .peek(
@@ -74,10 +48,6 @@ private Stream<D> parseResponse(@Nonnull final ClassicHttpResponse response)
                 log.error("Failed to parse the following response from OpenAI model: {}", line);
                 throw new OpenAiClientException("Failed to parse delta message: " + line, e);
               }
-            })
-        .onClose(
-            () ->
-                Try.run(inputStream::close)
-                    .onFailure(e -> log.error("Could not close HTTP input stream", e)));
+            });
   }
 }
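
The filter step kept by this change (drop empty separator lines and the terminating `data: [DONE]` line) can be sketched on its own. This is an illustration under stated assumptions: `SseLineFilter` and `payloadLines` are hypothetical names, and the sample payload lines are invented for the demo. Only the filter predicate is taken from the diff above.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/** Hypothetical helper; mirrors only the filter predicate shown in the handler. */
final class SseLineFilter {
  private static final String DONE_MARKER = "data: [DONE]";

  /** Drops blank separator lines and the terminating marker, keeps payload lines. */
  static Stream<String> payloadLines(final Stream<String> rawLines) {
    return rawLines.filter(line -> !line.isEmpty() && !DONE_MARKER.equals(line.trim()));
  }

  public static void main(final String[] args) {
    // Invented sample input: payload lines alternate with empty lines, then the marker.
    final List<String> kept =
        payloadLines(Stream.of("data: {\"n\":1}", "", "data: {\"n\":2}", "", "data: [DONE]", ""))
            .collect(Collectors.toList());
    System.out.println(kept);
  }
}
```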
@@ -0,0 +1,106 @@
package com.sap.ai.sdk.foundationmodels.openai;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.atomic.AtomicInteger;
import lombok.SneakyThrows;
import org.apache.hc.core5.http.ContentType;
import org.apache.hc.core5.http.io.entity.InputStreamEntity;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;

public class IterableStreamConverterTest {
  @SneakyThrows
  @Test
  @DisplayName("Stream is fully consumed")
  void testLines() {
    final var TEMPLATE = "THIS\nIS\nA\nTEST\n";
    final var input = TEMPLATE.repeat(IterableStreamConverter.BUFFER_SIZE);
    final var inputStream = spy(new ByteArrayInputStream(input.getBytes(StandardCharsets.UTF_8)));
    final var entity = new InputStreamEntity(inputStream, ContentType.TEXT_PLAIN);

    final var sut = IterableStreamConverter.lines(entity);
    verify(inputStream, never()).read();
    verify(inputStream, never()).read(any());
    verify(inputStream, never()).read(any(), anyInt(), anyInt());

    final var streamCounter = new AtomicInteger(0);
    sut.peek(s -> streamCounter.incrementAndGet())
        .forEach(
            s ->
                assertThat(s)
                    .containsAnyOf("THIS", "IS", "A", "TEST")
                    .doesNotContainAnyWhitespaces());

    assertThat(streamCounter).hasValue(IterableStreamConverter.BUFFER_SIZE * 4);
    verify(inputStream, times(TEMPLATE.length() + 1))
        .read(any(), anyInt(), eq(IterableStreamConverter.BUFFER_SIZE));
    verify(inputStream, times(1)).close();
  }

  @SneakyThrows
  @Test
  @DisplayName("Stream may only read first entry without closing")
  void testLinesFindFirst() {
    final var TEMPLATE = "Foo Bar\n";
    final var inputStream = mock(InputStream.class);
    when(inputStream.read(any(), anyInt(), anyInt()))
        .thenAnswer(
            arg -> {
              byte[] ar = arg.getArgument(0, byte[].class);
              byte[] bytes = TEMPLATE.getBytes(StandardCharsets.UTF_8);
              for (int i = 0; i < ar.length; i++) ar[i] = bytes[i % bytes.length];
              return ar.length;
            });

    final var entity = new InputStreamEntity(inputStream, ContentType.TEXT_PLAIN);

    final var sut = IterableStreamConverter.lines(entity);
    assertThat(sut.findFirst()).contains("Foo Bar");
    verify(inputStream, times(1)).read(any(), anyInt(), anyInt());
    verify(inputStream, never()).close();
  }

  @SneakyThrows
  @Test
  @DisplayName("Stream may close unexpectedly")
  void testLinesThrows() {
    final var TEMPLATE = "Foo Bar\n";
    final var inputStream = mock(InputStream.class);
    when(inputStream.read(any(), anyInt(), anyInt()))
        .thenAnswer(
            arg -> {
              byte[] ar = arg.getArgument(0, byte[].class);
              byte[] bytes = TEMPLATE.getBytes(StandardCharsets.UTF_8);
              for (int i = 0; i < ar.length; i++) ar[i] = bytes[i % bytes.length];
              return ar.length;
            })
        .thenThrow(new IOException("Ups!"));

    final var entity = new InputStreamEntity(inputStream, ContentType.TEXT_PLAIN);

    final var sut = IterableStreamConverter.lines(entity);
    assertThatThrownBy(sut::count)
        .isInstanceOf(OpenAiClientException.class)
        .hasMessage("Parsing response content was interrupted.")
        .cause()
        .isInstanceOf(IOException.class)
        .hasMessage("Ups!");
    verify(inputStream, times(2)).read(any(), anyInt(), anyInt());
    verify(inputStream, times(1)).close();
  }
}
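
The second test case asserts that the input stream is not closed after `findFirst()`, so a caller that stops early still has to close the stream. Below is a minimal sketch of that caller-side difference, using JDK types only. `EarlyExitDemo` and `CountingReader` are hypothetical names, and `BufferedReader.lines()` with an `onClose` hook stands in for `IterableStreamConverter.lines(...)`.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Optional;
import java.util.stream.Stream;

final class EarlyExitDemo {
  /** Hypothetical reader that counts close() calls so the effect is observable. */
  static final class CountingReader extends StringReader {
    int closes = 0;

    CountingReader(final String content) {
      super(content);
    }

    @Override
    public void close() {
      closes++;
      super.close();
    }
  }

  /** Stand-in for a line stream whose resource is released via onClose. */
  static Stream<String> lines(final CountingReader source) {
    final var reader = new BufferedReader(source);
    return reader
        .lines()
        .onClose(
            () -> {
              try {
                reader.close();
              } catch (final IOException e) {
                throw new UncheckedIOException(e);
              }
            });
  }

  /** Early exit without closing the stream: the reader stays open. */
  static Optional<String> firstUnmanaged(final CountingReader source) {
    return lines(source).findFirst();
  }

  /** Early exit inside try-with-resources: the reader is closed on scope exit. */
  static Optional<String> firstManaged(final CountingReader source) {
    try (Stream<String> stream = lines(source)) {
      return stream.findFirst();
    }
  }

  public static void main(final String[] args) {
    final var unmanaged = new CountingReader("Foo Bar\nBaz\n");
    System.out.println(firstUnmanaged(unmanaged) + " closes=" + unmanaged.closes);

    final var managed = new CountingReader("Foo Bar\nBaz\n");
    System.out.println(firstManaged(managed) + " closes=" + managed.closes);
  }
}
```

Try-with-resources remains the safer default whenever a stream might not be read to the end.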