Skip to content

Commit

Permalink
Add --warc-base-url option and basic replay feature
Browse files Browse the repository at this point in the history
  • Loading branch information
ato committed Sep 3, 2024
1 parent 7458bdd commit d40af3e
Show file tree
Hide file tree
Showing 11 changed files with 183 additions and 22 deletions.
5 changes: 5 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,11 @@
<artifactId>concurrent-trees</artifactId>
<version>2.6.1</version>
</dependency>
<dependency>
<groupId>org.netpreserve</groupId>
<artifactId>jwarc</artifactId>
<version>0.29.0</version>
</dependency>
<dependency>
<groupId>org.netpreserve</groupId>
<artifactId>urlcanon</artifactId>
Expand Down
11 changes: 10 additions & 1 deletion src/outbackcdx/Main.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ public static void usage() {
System.err.println(" -p port Local port to listen on");
System.err.println(" -t count Number of web server threads");
System.err.println(" -r count Cap on number of rocksdb records to scan to serve a single request");
System.err.println(" --warc-base-url URL Enables replay of WARC records by reading WARC files with this URL prefix");
System.err.println(" -x Output CDX14 by default (instead of CDX11)");
System.err.println(" -v Verbose logging");
System.err.println(" -y file Custom fuzzy match canonicalization YAML configuration file");
Expand Down Expand Up @@ -83,6 +84,7 @@ public static void main(String[] args) {
long maxNumResults = 10000;
Map<String,ComputedField> computedFields = new HashMap<>();
QueryConfig queryConfig = new QueryConfig();
String warcBaseUrl = null;

Map<String,Object> dashboardConfig = new HashMap<>();
dashboardConfig.put("featureFlags", FeatureFlags.asMap());
Expand Down Expand Up @@ -166,6 +168,9 @@ public static void main(String[] args) {
case "--batch-size":
batchSize = Long.parseLong(args[++i]);
break;
case "--warc-base-url":
warcBaseUrl = args[++i];
break;
case "-x":
FeatureFlags.setCdx14(true);
break;
Expand All @@ -180,8 +185,12 @@ public static void main(String[] args) {

try {
UrlCanonicalizer canonicalizer = new UrlCanonicalizer(fuzzyYaml);
Replay replay = null;
if (warcBaseUrl != null) {
replay = new Replay(warcBaseUrl);
}
try (DataStore dataStore = new DataStore(dataPath, maxOpenSstFiles, replicationWindow, scanCap, canonicalizer)) {
Webapp controller = new Webapp(dataStore, verbose, dashboardConfig, canonicalizer, computedFields, maxNumResults, queryConfig);
Webapp controller = new Webapp(dataStore, verbose, dashboardConfig, canonicalizer, computedFields, maxNumResults, queryConfig, replay);
if (undertow) {
UWeb.UServer server = new UWeb.UServer(host, port, contextPath, controller, authorizer);
server.start();
Expand Down
51 changes: 43 additions & 8 deletions src/outbackcdx/MultiMap.java
Original file line number Diff line number Diff line change
@@ -1,12 +1,6 @@
package outbackcdx;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.*;

/**
* Mostly acts like a {@code Map<K,V>}, but secretly supports a list of values
Expand All @@ -17,6 +11,15 @@ public class MultiMap<K, V> implements Map<K, V> {

protected Map<K,List<V>> inner;

/**
 * Builds a {@code MultiMap} from alternating key/value arguments:
 * {@code of(k1, v1, k2, v2, ...)}. Each pair is passed to {@link #add} in
 * argument order.
 *
 * <p>The casts to {@code K} and {@code V} are unchecked, so a mismatched
 * argument is not detected here.
 *
 * @param entries alternating keys and values; must contain an even number of elements
 * @return a new map containing the given pairs
 * @throws IllegalArgumentException if an odd number of arguments is supplied
 */
@SuppressWarnings("unchecked")
public static <K,V> MultiMap<K,V> of(Object... entries) {
    // Fail fast with a clear message instead of an ArrayIndexOutOfBoundsException
    // raised part-way through the loop when the final key has no value.
    if (entries.length % 2 != 0) {
        throw new IllegalArgumentException(
                "MultiMap.of requires key/value pairs but got " + entries.length + " arguments");
    }
    var map = new MultiMap<K,V>();
    for (int i = 0; i < entries.length; i += 2) {
        map.add((K)entries[i], (V)entries[i + 1]);
    }
    return map;
}

/**
 * Creates an empty map whose per-key value lists are stored in a {@link HashMap}.
 */
public MultiMap() {
inner = new HashMap<>();
}
Expand Down Expand Up @@ -94,7 +97,39 @@ public Collection<V> values() {

/**
 * Returns a flattened view of this map in which every stored value appears as
 * its own {@code (key, value)} entry, so a key holding several values yields
 * several entries. Entries are produced lazily by walking {@code inner}; the
 * returned entries are standalone {@link AbstractMap.SimpleEntry} objects.
 */
@Override
public Set<Entry<K, V>> entrySet() {
    return new AbstractSet<>() {

        @Override
        public int size() {
            // Total number of values across all keys, not the number of keys.
            int total = 0;
            for (List<V> bucket : inner.values()) {
                total += bucket.size();
            }
            return total;
        }

        @Override
        public Iterator<Entry<K, V>> iterator() {
            final Iterator<Entry<K, List<V>>> buckets = inner.entrySet().iterator();
            return new Iterator<>() {
                private K currentKey;
                private Iterator<V> currentValues;

                @Override
                public boolean hasNext() {
                    // Advance past exhausted (or empty) value lists until a value
                    // is available or there are no more keys.
                    for (;;) {
                        if (currentValues != null && currentValues.hasNext()) {
                            return true;
                        }
                        if (!buckets.hasNext()) {
                            return false;
                        }
                        Entry<K, List<V>> bucket = buckets.next();
                        currentKey = bucket.getKey();
                        currentValues = bucket.getValue().iterator();
                    }
                }

                @Override
                public Entry<K, V> next() {
                    if (hasNext()) {
                        return new AbstractMap.SimpleEntry<>(currentKey, currentValues.next());
                    }
                    throw new NoSuchElementException();
                }
            };
        }
    };
}

public List<V> getAll(K k) {
Expand Down
103 changes: 103 additions & 0 deletions src/outbackcdx/Replay.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
package outbackcdx;

import org.netpreserve.jwarc.*;

import java.io.IOException;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URI;
import java.nio.file.Path;
import java.time.ZoneOffset;
import java.util.Locale;
import java.util.Set;

import static java.time.format.DateTimeFormatter.RFC_1123_DATE_TIME;
import static outbackcdx.Web.Status.*;

/**
 * Replays archived captures: looks up the capture closest to a requested date
 * in an index, opens the WARC file that holds it and streams the stored
 * payload back to the client.
 *
 * <p>WARC files are located by appending the capture's filename to the
 * configured base URL. {@code file:} URLs are read from the local filesystem;
 * HTTP(S) URLs are fetched with a byte-range request.
 */
public class Replay {
    /**
     * Lower-cased names of archived response headers that are emitted with an
     * {@code X-Archive-Orig-} prefix instead of under their original name.
     */
    private static final Set<String> X_ARCHIVE_ORIG_HEADERS = Set.of("age", "alt-svc", "cache-control", "cookie",
            "connection", "content-md5", "content-security-policy",
            "content-security-policy-report-only", "date", "etag", "last-modified", "memento-datetime",
            "p3p", "pragma", "public-key-pins", "retry-after", "server", "status", "strict-transport-security",
            "trailer", "tk", "upgrade", "upgrade-insecure-requests", "vary", "via", "warning", "x-frame-options",
            "x-xss-protection");
    private final String warcBaseUrl;

    /**
     * @param warcBaseUrl URL prefix that capture filenames are appended to in order to locate WARC files
     */
    public Replay(String warcBaseUrl) {
        this.warcBaseUrl = warcBaseUrl;
    }

    /**
     * Streams the unmodified payload of the capture of {@code url} closest to {@code date}.
     *
     * @param index   index to search for captures
     * @param date    requested timestamp, used as the "closest" target of the query
     * @param url     URL whose capture should be replayed
     * @param request request whose response stream the record is written to
     * @return a 404 response when no capture exists, otherwise {@link Web.Response#ALREADY_SENT}
     *         once the record has been streamed
     * @throws IOException if the WARC record is missing, is neither a response nor a
     *                     resource record, or cannot be read
     */
    public Web.Response replayIdentity(Index index, String date, String url, Web.Request request) throws IOException {
        Capture capture = findClosestCapture(index, date, url);
        if (capture == null) return new Web.Response(NOT_FOUND, "text/plain", "Not in archive");

        try (WarcReader warcReader = openWarcFileAtPosition(capture.file, capture.compressedoffset, capture.length)) {
            // The reader is already positioned at the record. Do not seek again here:
            // readers backed by an HTTP range response are not seekable, and their
            // stream starts at the record rather than at the file offset.
            WarcRecord record = warcReader.next().orElse(null);
            if (record == null) throw new IOException("Missing WARC record");

            MultiMap<String, String> headers = new MultiMap<>();
            headers.add("Access-Control-Allow-Origin", "*");
            headers.add("Memento-Datetime", RFC_1123_DATE_TIME.format(record.date().atOffset(ZoneOffset.UTC)));
            if (record instanceof WarcResponse) {
                HttpResponse http = ((WarcResponse) record).http();
                // NOTE(review): all other archived headers (e.g. Transfer-Encoding,
                // Content-Length) are passed through under their original names —
                // confirm that is compatible with how the body is streamed below.
                http.headers().map().forEach((name, values) -> {
                    if (X_ARCHIVE_ORIG_HEADERS.contains(name.toLowerCase(Locale.ROOT))) {
                        name = "X-Archive-Orig-" + name;
                    }
                    for (String value : values) {
                        headers.add(name, value);
                    }
                });
                try (OutputStream out = request.streamResponse(http.status(), headers)) {
                    http.body().stream().transferTo(out);
                }
            } else if (record instanceof WarcResource) {
                WarcResource resource = (WarcResource) record;
                resource.headers().sole("Content-Type").ifPresent(value -> headers.add("Content-Type", value));
                try (OutputStream out = request.streamResponse(OK, headers)) {
                    resource.body().stream().transferTo(out);
                }
            } else {
                throw new IOException("Unexpected WARC record type: " + record.getClass());
            }
            return Web.Response.ALREADY_SENT;
        }
    }

    /**
     * Opens the named WARC file with the returned reader ready to read the record
     * that starts at {@code position}.
     *
     * @param filename WARC filename, appended to the base URL
     * @param position byte offset of the record within the WARC file
     * @param length   stored length of the record, or {@code null}/non-positive if unknown,
     *                 in which case an open-ended range is requested
     * @throws IllegalArgumentException if the filename contains {@code ../} or the resulting
     *                                  URL is neither a {@code file:} nor an HTTP(S) URL
     * @throws IOException              if the file cannot be opened or the server does not
     *                                  honour the range request
     */
    private WarcReader openWarcFileAtPosition(String filename, long position, Long length) throws IOException {
        if (filename.contains("../")) {
            throw new IllegalArgumentException("Refusing to open filename containing ../");
        }

        URI warcUrl = URI.create(warcBaseUrl + filename);
        // Constant-first comparison: getScheme() is null for a scheme-less base URL.
        if ("file".equalsIgnoreCase(warcUrl.getScheme())) {
            var warcReader = new WarcReader(Path.of(warcUrl.getPath()));
            try {
                warcReader.position(position);
                return warcReader;
            } catch (Throwable e) {
                warcReader.close();
                throw e;
            }
        } else {
            var connection = warcUrl.toURL().openConnection();
            if (!(connection instanceof HttpURLConnection)) {
                throw new IllegalArgumentException("Unsupported WARC URL scheme: " + warcUrl.getScheme());
            }
            HttpURLConnection httpConnection = (HttpURLConnection) connection;
            String end = length != null && length > 0 ? String.valueOf(position + length - 1) : "";
            httpConnection.addRequestProperty("Range", "bytes=" + position + "-" + end);
            try {
                int status = httpConnection.getResponseCode();
                // A plain 200 means the server ignored the Range header and is sending the
                // file from byte 0; that is only the right record when position is 0.
                boolean partial = status == HttpURLConnection.HTTP_PARTIAL;
                boolean wholeFileFromStart = status == HttpURLConnection.HTTP_OK && position == 0;
                if (!partial && !wholeFileFromStart) {
                    throw new IOException("Unexpected HTTP status " + status + " fetching " + warcUrl
                            + " with Range: bytes=" + position + "-" + end);
                }
                return new WarcReader(httpConnection.getInputStream());
            } catch (Throwable e) {
                httpConnection.disconnect();
                throw e;
            }
        }

    }

    /**
     * Runs a closest-match query for {@code url} around {@code date} and returns the
     * first result, or {@code null} when the query yields no captures.
     */
    private static Capture findClosestCapture(Index index, String date, String url) {
        Query query = new Query(new MultiMap<>(), null);
        query.url = url;
        query.sort = Query.Sort.CLOSEST;
        query.closest = date;

        try (CloseableIterator<Capture> captures = query.execute(index)) {
            if (!captures.hasNext()) return null;
            return captures.next();
        }
    }
}
2 changes: 1 addition & 1 deletion src/outbackcdx/UWeb.java
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ public String url() {
}

@Override
public OutputStream streamResponse(int status, String contentType, Map<String, String> headers) throws IOException {
public OutputStream streamResponse(int status, MultiMap<String, String> headers) {
if (headers != null) {
headers.forEach((name, value) ->
exchange.getResponseHeaders().add(HttpString.tryFromString(name), value));
Expand Down
5 changes: 3 additions & 2 deletions src/outbackcdx/WbCdxApi.java
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,9 @@ public Web.Response queryIndex(Web.Request request, Index index) throws IOExcept
}

try (CloseableIterator<Capture> captures = query.execute(index);
OutputStream outputStream = request.streamResponse(OK, contentType,
Map.of("Access-Control-Allow-Origin", "*",
OutputStream outputStream = request.streamResponse(OK,
MultiMap.of("Content-Type", contentType,
"Access-Control-Allow-Origin", "*",
"outbackcdx-urlkey", query.urlkey));
Writer out = new BufferedWriter(new OutputStreamWriter(outputStream, UTF_8))) {

Expand Down
4 changes: 2 additions & 2 deletions src/outbackcdx/Web.java
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ public String url() {
}

@Override
public OutputStream streamResponse(int status, String contentType, Map<String, String> headers) throws IOException {
public OutputStream streamResponse(int status, MultiMap<String, String> headers) throws IOException {
if (headers != null) headers.forEach(exchange.getResponseHeaders()::add);
exchange.sendResponseHeaders(status, 0);
return exchange.getResponseBody();
Expand Down Expand Up @@ -473,6 +473,6 @@ default String rebuildUrl() {

String url();

OutputStream streamResponse(int status, String contentType, Map<String, String> headers) throws IOException;
OutputStream streamResponse(int status, MultiMap<String, String> headers) throws IOException;
}
}
14 changes: 12 additions & 2 deletions src/outbackcdx/Webapp.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ class Webapp implements Web.Handler {
private final Map<String, ComputedField> computedFields;
private final long maxNumResults;
private final WbCdxApi wbCdxApi;
private final Replay replay;

private static ServiceLoader<FilterPlugin> fpLoader = ServiceLoader.load(FilterPlugin.class);

Expand All @@ -54,7 +55,7 @@ private Response deleteAccessRule(Web.Request req) throws IOException, Web.Respo
return found ? ok() : notFound();
}

Webapp(DataStore dataStore, boolean verbose, Map<String, Object> dashboardConfig, UrlCanonicalizer canonicalizer, Map<String, ComputedField> computedFields, long maxNumResults, QueryConfig queryConfig) {
Webapp(DataStore dataStore, boolean verbose, Map<String, Object> dashboardConfig, UrlCanonicalizer canonicalizer, Map<String, ComputedField> computedFields, long maxNumResults, QueryConfig queryConfig, Replay replay) {
this.dataStore = dataStore;
this.verbose = verbose;
this.dashboardConfig = dashboardConfig;
Expand All @@ -64,6 +65,7 @@ private Response deleteAccessRule(Web.Request req) throws IOException, Web.Respo
this.canonicalizer = canonicalizer;
this.computedFields = computedFields;
this.maxNumResults = maxNumResults;
this.replay = replay;

this.filterPlugins = new ArrayList<FilterPlugin>();
if (FeatureFlags.filterPlugins()) {
Expand Down Expand Up @@ -106,6 +108,7 @@ private Response deleteAccessRule(Web.Request req) throws IOException, Web.Respo
router.on(POST, "/<collection>/truncate_replication", request -> flushWal(request));
router.on(POST, "/<collection>/compact", request -> compact(request), Permission.INDEX_EDIT);
router.on(POST, "/<collection>/upgrade", request -> upgrade(request), Permission.INDEX_EDIT);
router.on(GET, "/<collection>/<date:[0-9]+>id_/<url:.*>", this::replayIdentity);

if (FeatureFlags.experimentalAccessControl()) {
router.on(GET, "/<collection>/ap/<accesspoint>", request -> query(request));
Expand Down Expand Up @@ -663,6 +666,14 @@ Response checkAccessBulk(Web.Request request) throws IOException, ResponseExcept
return jsonResponse(responses);
}

/**
 * Handles {@code GET /<collection>/<date>id_/<url>} by delegating to the
 * configured {@link Replay}. Responds with 404 when replay has not been
 * enabled (no {@code Replay} instance was supplied to this webapp).
 */
private Response replayIdentity(Request request) throws ResponseException, IOException {
    if (replay != null) {
        String timestamp = request.param("date");
        String targetUrl = request.param("url");
        Index collectionIndex = getIndex(request);
        return replay.replayIdentity(collectionIndex, timestamp, targetUrl, request);
    }
    return new Response(404, "text/plain", "Replay not enabled (try setting --warc-base-url)");
}

@Override
public Response handle(Web.Request request) throws Exception {
if (!request.path().startsWith(request.contextPath() + "/")) {
Expand All @@ -674,5 +685,4 @@ public Response handle(Web.Request request) throws Exception {
}
return response;
}

}
6 changes: 2 additions & 4 deletions test/outbackcdx/DummyRequest.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ class DummyRequest implements Web.Request {
private ByteArrayOutputStream streamedResponseBody;
private int streamedStatus;
private Map<String, String> streamedHeaders;
private String streamedContentType;

public DummyRequest(Web.Method method, String url) {
this(method, url, null);
Expand Down Expand Up @@ -73,11 +72,10 @@ public String url() {
}

@Override
public OutputStream streamResponse(int status, String contentType, Map<String, String> headers) throws IOException {
public OutputStream streamResponse(int status, MultiMap<String, String> headers) throws IOException {
this.streamedResponseBody = new ByteArrayOutputStream();
this.streamedStatus = status;
this.streamedHeaders = headers;
this.streamedContentType = contentType;
return streamedResponseBody;
}

Expand All @@ -86,7 +84,7 @@ public void parm(String name, String value) {
}

public Web.Response streamedResponse() {
Web.Response response = new Web.Response(streamedStatus, streamedContentType,
Web.Response response = new Web.Response(streamedStatus, streamedHeaders.get("Content-Type"),
new ByteArrayInputStream(streamedResponseBody.toByteArray()));
if (streamedHeaders != null) streamedHeaders.forEach(response::addHeader);
return response;
Expand Down
2 changes: 1 addition & 1 deletion test/outbackcdx/ReplicationFeaturesTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public class ReplicationFeaturesTest {
public void setUp() throws IOException {
File root = folder.newFolder();
manager = new DataStore(root, 256, null, Long.MAX_VALUE, null);
webapp = new Webapp(manager, false, Collections.emptyMap(), null, Collections.emptyMap(), 10000, new QueryConfig());
webapp = new Webapp(manager, false, Collections.emptyMap(), null, Collections.emptyMap(), 10000, new QueryConfig(), null);
}

@After
Expand Down
2 changes: 1 addition & 1 deletion test/outbackcdx/WebappTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public void setUp() throws IOException, ConfigurationException {
UrlCanonicalizer canon = new UrlCanonicalizer(new ByteArrayInputStream(yaml.getBytes(UTF_8)));

DataStore manager = new DataStore(root, -1, null, Long.MAX_VALUE, canon);
webapp = new Webapp(manager, false, Collections.emptyMap(), canon, Collections.emptyMap(), 10000, new QueryConfig());
webapp = new Webapp(manager, false, Collections.emptyMap(), canon, Collections.emptyMap(), 10000, new QueryConfig(), null);
}

@After
Expand Down

0 comments on commit d40af3e

Please sign in to comment.