Skip to content

Commit

Permalink
Allow streaming responses directly from controller method
Browse files Browse the repository at this point in the history
This simplifies resource handling since we can just use try-with-resources in the controller. It also makes debugging easier as exceptions during streaming will be thrown in the context of the controller method.
  • Loading branch information
ato committed Aug 30, 2024
1 parent 067b01d commit 7458bdd
Show file tree
Hide file tree
Showing 7 changed files with 84 additions and 42 deletions.
13 changes: 13 additions & 0 deletions src/outbackcdx/Query.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
public class Query {
private static final String DEFAULT_FIELDS = "urlkey,timestamp,url,mime,status,digest,redirecturl,robotflags,length,offset,filename";
private static final String DEFAULT_FIELDS_CDX14 = DEFAULT_FIELDS + ",originalLength,originalOffset,originalFilename";
private static final boolean CDX_PLUS_WORKAROUND = "1".equals(System.getenv("CDX_PLUS_WORKAROUND"));

public static final long MIN_TIMESTAMP = 0L;
public static final long MAX_TIMESTAMP = 99999999999999L;
Expand Down Expand Up @@ -184,6 +185,18 @@ CloseableIterator<Capture> execute(Index index) {
if (collapseToLastSpec != null) {
captures = Filter.collapseToLast(captures, collapseToLastSpec);
}

if (CDX_PLUS_WORKAROUND && !captures.hasNext() && url != null && (url.contains("%20") || url.contains(" "))) {
/*
* XXX: NLA has a bunch of bad WARC files that contain + instead of %20 in the URLs. This is a dirty
* workaround until we can fix them. If we found no results try again with + in place of %20.
*/
captures.close();
urlkey = null;
url = url.replace("%20", "+").replace(" ", "+");
captures = execute(index);
}

return captures;
}

Expand Down
12 changes: 11 additions & 1 deletion src/outbackcdx/UWeb.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ private void dispatch(HttpServerExchange exchange) throws Exception {
Permit permit = authorizer.verify(authnHeader);
URequest request = new URequest(exchange, permit, contextPath);
Web.Response response = handler.handle(request);
sendResponse(exchange, response);
if (response != Web.Response.ALREADY_SENT) sendResponse(exchange, response);
} catch (Web.ResponseException e) {
sendResponse(exchange, e.response);
} catch (Exception e) {
Expand Down Expand Up @@ -144,5 +144,15 @@ public String username() {
public String url() {
return url;
}

@Override
public OutputStream streamResponse(int status, String contentType, Map<String, String> headers) throws IOException {
if (headers != null) {
headers.forEach((name, value) ->
exchange.getResponseHeaders().add(HttpString.tryFromString(name), value));
}
exchange.setStatusCode(status);
return exchange.getOutputStream();
}
}
}
40 changes: 11 additions & 29 deletions src/outbackcdx/WbCdxApi.java
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package outbackcdx;

import com.fasterxml.jackson.core.JsonGenerator;
import outbackcdx.Web.Response;

import java.io.*;
import java.util.*;
Expand All @@ -21,7 +20,6 @@
* pywb: https://github.com/ikreymer/pywb/wiki/CDX-Server-API
*/
public class WbCdxApi {
private static final boolean CDX_PLUS_WORKAROUND = "1".equals(System.getenv("CDX_PLUS_WORKAROUND"));
private final Iterable<FilterPlugin> filterPlugins;
private final Map<String, ComputedField> computedFields;
private final QueryConfig queryConfig;
Expand All @@ -32,7 +30,7 @@ public WbCdxApi(Iterable<FilterPlugin> filterPlugins, Map<String, ComputedField>
this.queryConfig = queryConfig;
}

public Web.Response queryIndex(Web.Request request, Index index) {
public Web.Response queryIndex(Web.Request request, Index index) throws IOException {
Query query = new Query(request.params(), filterPlugins, queryConfig);

FormatFactory format;
Expand All @@ -56,26 +54,16 @@ public Web.Response queryIndex(Web.Request request, Index index) {
break;
}

CloseableIterator<Capture> captures = query.execute(index);
if (CDX_PLUS_WORKAROUND && !captures.hasNext() && query.url != null && (query.url.contains("%20") || query.url.contains(" "))) {
/*
* XXX: NLA has a bunch of bad WARC files that contain + instead of %20 in the URLs. This is a dirty
* workaround until we can fix them. If we found no results try again with + in place of %20.
*/
captures.close();
query.urlkey = null;
query.url = query.url.replace("%20", "+").replace(" ", "+");
captures = query.execute(index);
}
CloseableIterator<Capture> finalCaptures = captures;
try (CloseableIterator<Capture> captures = query.execute(index);
OutputStream outputStream = request.streamResponse(OK, contentType,
Map.of("Access-Control-Allow-Origin", "*",
"outbackcdx-urlkey", query.urlkey));
Writer out = new BufferedWriter(new OutputStreamWriter(outputStream, UTF_8))) {

Response response = new Response(OK, contentType, outputStream -> {
Writer out = new BufferedWriter(new OutputStreamWriter(outputStream, UTF_8));
OutputFormat outf = format.construct(query, computedFields, out);
long row = 0;
try (CloseableIterator<Capture> it = finalCaptures) {
while (it.hasNext()) {
Capture capture = it.next();
try (OutputFormat outf = format.construct(query, computedFields, out)) {
while (captures.hasNext()) {
Capture capture = captures.next();
if (row >= query.limit) {
break;
}
Expand All @@ -86,16 +74,10 @@ public Web.Response queryIndex(Web.Request request, Index index) {
System.err.println(new Date() + ": exception " + e + " thrown processing captures");
e.printStackTrace();
out.write("warning: output may be incomplete, error occurred processing captures\n");
} finally {
finalCaptures.close();
}
}

outf.close();
out.flush();
});
response.addHeader("Access-Control-Allow-Origin", "*");
response.addHeader("outbackcdx-urlkey", query.urlkey);
return response;
return Web.Response.ALREADY_SENT;
}

interface FormatFactory {
Expand Down
21 changes: 17 additions & 4 deletions src/outbackcdx/Web.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,14 @@ interface IStreamer {
void stream(OutputStream out) throws IOException;
}

static class Response {
public static class Response {
private int status;
private final Map<String, List<String>> headers = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
private final long bodyLength;
private final IStreamer bodyWriter;

public static final Response ALREADY_SENT = new Response(-1, null, "");

public Response(int status, String mime, String body) {
this.status = status;
byte[] bodyBytes = body.getBytes(UTF_8);
Expand Down Expand Up @@ -115,9 +117,11 @@ public void handle(HttpExchange exchange) throws IOException {
response = new Response(INTERNAL_ERROR, "text/plain", e + "\n");
}

exchange.getResponseHeaders().putAll(response.headers);
exchange.sendResponseHeaders(response.status, response.bodyLength);
response.bodyWriter.stream(exchange.getResponseBody());
if (response != Response.ALREADY_SENT) {
exchange.getResponseHeaders().putAll(response.headers);
exchange.sendResponseHeaders(response.status, response.bodyLength);
response.bodyWriter.stream(exchange.getResponseBody());
}
} finally {
exchange.close();
}
Expand Down Expand Up @@ -191,6 +195,13 @@ public String username() {
public String url() {
return exchange.getRequestURI().toString();
}

@Override
public OutputStream streamResponse(int status, String contentType, Map<String, String> headers) throws IOException {
if (headers != null) headers.forEach(exchange.getResponseHeaders()::add);
exchange.sendResponseHeaders(status, 0);
return exchange.getResponseBody();
}
}

public static class ResponseException extends Exception {
Expand Down Expand Up @@ -461,5 +472,7 @@ default String rebuildUrl() {
}

String url();

OutputStream streamResponse(int status, String contentType, Map<String, String> headers) throws IOException;
}
}
24 changes: 22 additions & 2 deletions test/outbackcdx/DummyRequest.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,19 @@

import outbackcdx.auth.Permission;

import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.io.*;
import java.nio.charset.StandardCharsets;
import java.util.Map;

class DummyRequest implements Web.Request {
private final MultiMap<String, String> params = new MultiMap<>();
private final Web.Method method;
private final String url;
private final String data;
private ByteArrayOutputStream streamedResponseBody;
private int streamedStatus;
private Map<String, String> streamedHeaders;
private String streamedContentType;

public DummyRequest(Web.Method method, String url) {
this(method, url, null);
Expand Down Expand Up @@ -68,7 +72,23 @@ public String url() {
return url;
}

@Override
public OutputStream streamResponse(int status, String contentType, Map<String, String> headers) throws IOException {
this.streamedResponseBody = new ByteArrayOutputStream();
this.streamedStatus = status;
this.streamedHeaders = headers;
this.streamedContentType = contentType;
return streamedResponseBody;
}

public void parm(String name, String value) {
params.add(name, value);
}

public Web.Response streamedResponse() {
Web.Response response = new Web.Response(streamedStatus, streamedContentType,
new ByteArrayInputStream(streamedResponseBody.toByteArray()));
if (streamedHeaders != null) streamedHeaders.forEach(response::addHeader);
return response;
}
}
9 changes: 6 additions & 3 deletions test/outbackcdx/ReplicationFeaturesTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -110,9 +110,12 @@ private String GET(String url, int expectedStatus) throws Exception {
private String GET(String url, int expectedStatus, String... parmKeysAndValues) throws Exception {
DummyRequest request = new DummyRequest(GET, url);
for (int i = 0; i < parmKeysAndValues.length; i += 2) {
request.parm(parmKeysAndValues[i], parmKeysAndValues[i + 1]);
}
Web.Response response = webapp.handle(request);
request.parm(parmKeysAndValues[i], parmKeysAndValues[i + 1]);
}
Web.Response response = webapp.handle(request);
if (response == Web.Response.ALREADY_SENT) {
response = request.streamedResponse();
}
assertEquals(expectedStatus, response.getStatus());
return slurp(response);
}
Expand Down
7 changes: 4 additions & 3 deletions test/outbackcdx/WebappTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -359,11 +359,12 @@ private String POST(String url, String data, int expectedStatus, String... parmK
}

private String GET(String url, String... parmKeysAndValues) throws Exception {
DummyRequest session = new DummyRequest(GET, url);
DummyRequest request = new DummyRequest(GET, url);
for (int i = 0; i < parmKeysAndValues.length; i += 2) {
session.parm(parmKeysAndValues[i], parmKeysAndValues[i + 1]);
request.parm(parmKeysAndValues[i], parmKeysAndValues[i + 1]);
}
Web.Response response = webapp.handle(session);
Web.Response response = webapp.handle(request);
if (response == Web.Response.ALREADY_SENT) response = request.streamedResponse();
assertEquals(OK, response.getStatus());
return slurp(response);
}
Expand Down

0 comments on commit 7458bdd

Please sign in to comment.