From d40af3e9a4f4b1b69aff91b3b42316d9106379fd Mon Sep 17 00:00:00 2001 From: Alex Osborne Date: Tue, 3 Sep 2024 17:08:18 +0900 Subject: [PATCH] Add --warc-base-url option and basic replay feature --- pom.xml | 5 + src/outbackcdx/Main.java | 11 +- src/outbackcdx/MultiMap.java | 51 +++++++-- src/outbackcdx/Replay.java | 103 +++++++++++++++++++ src/outbackcdx/UWeb.java | 2 +- src/outbackcdx/WbCdxApi.java | 5 +- src/outbackcdx/Web.java | 4 +- src/outbackcdx/Webapp.java | 14 ++- test/outbackcdx/DummyRequest.java | 6 +- test/outbackcdx/ReplicationFeaturesTest.java | 2 +- test/outbackcdx/WebappTest.java | 2 +- 11 files changed, 183 insertions(+), 22 deletions(-) create mode 100644 src/outbackcdx/Replay.java diff --git a/pom.xml b/pom.xml index 0a58f06..1f785e5 100644 --- a/pom.xml +++ b/pom.xml @@ -81,6 +81,11 @@ concurrent-trees 2.6.1 + + org.netpreserve + jwarc + 0.29.0 + org.netpreserve urlcanon diff --git a/src/outbackcdx/Main.java b/src/outbackcdx/Main.java index d10cb99..4c03e8b 100644 --- a/src/outbackcdx/Main.java +++ b/src/outbackcdx/Main.java @@ -41,6 +41,7 @@ public static void usage() { System.err.println(" -p port Local port to listen on"); System.err.println(" -t count Number of web server threads"); System.err.println(" -r count Cap on number of rocksdb records to scan to serve a single request"); + System.err.println(" --warc-base-url URL Enables replay of WARC records by reading WARC files with this URL prefix"); System.err.println(" -x Output CDX14 by default (instead of CDX11)"); System.err.println(" -v Verbose logging"); System.err.println(" -y file Custom fuzzy match canonicalization YAML configuration file"); @@ -83,6 +84,7 @@ public static void main(String[] args) { long maxNumResults = 10000; Map computedFields = new HashMap<>(); QueryConfig queryConfig = new QueryConfig(); + String warcBaseUrl = null; Map dashboardConfig = new HashMap<>(); dashboardConfig.put("featureFlags", FeatureFlags.asMap()); @@ -166,6 +168,9 @@ public static void main(String[] args) { case "--batch-size": batchSize = Long.parseLong(args[++i]); break; + case "--warc-base-url": + warcBaseUrl = args[++i]; + break; case "-x": FeatureFlags.setCdx14(true); break; @@ -180,8 +185,12 @@ public static void main(String[] args) { try { UrlCanonicalizer canonicalizer = new UrlCanonicalizer(fuzzyYaml); + Replay replay = null; + if (warcBaseUrl != null) { + replay = new Replay(warcBaseUrl); + } try (DataStore dataStore = new DataStore(dataPath, maxOpenSstFiles, replicationWindow, scanCap, canonicalizer)) { - Webapp controller = new Webapp(dataStore, verbose, dashboardConfig, canonicalizer, computedFields, maxNumResults, queryConfig); + Webapp controller = new Webapp(dataStore, verbose, dashboardConfig, canonicalizer, computedFields, maxNumResults, queryConfig, replay); if (undertow) { UWeb.UServer server = new UWeb.UServer(host, port, contextPath, controller, authorizer); server.start(); diff --git a/src/outbackcdx/MultiMap.java b/src/outbackcdx/MultiMap.java index 044b846..5ff647a 100644 --- a/src/outbackcdx/MultiMap.java +++ b/src/outbackcdx/MultiMap.java @@ -1,12 +1,6 @@ package outbackcdx; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Set; +import java.util.*; /** * Mostly acts like a {@code Map}, but secretly supports a list of values @@ -17,6 +11,15 @@ public class MultiMap implements Map { protected Map> inner; + @SuppressWarnings("unchecked") + public static MultiMap of(Object... entries) { + var map = new MultiMap(); + for (int i = 0; i < entries.length; i += 2) { + map.add((K)entries[i], (V)entries[i + 1]); + } + return map; + } + public MultiMap() { inner = new HashMap<>(); } @@ -94,7 +97,39 @@ public Collection values() { @Override public Set> entrySet() { - throw new RuntimeException("not implemented"); + return new AbstractSet<>() { + + @Override + public Iterator> iterator() { + return new Iterator<>() { + final Iterator>> entryIterator = inner.entrySet().iterator(); + K key; + Iterator values; + + @Override + public boolean hasNext() { + while (values == null || !values.hasNext()) { + if (!entryIterator.hasNext()) return false; + Entry> next = entryIterator.next(); + key = next.getKey(); + values = next.getValue().iterator(); + } + return true; + } + + @Override + public Entry next() { + if (!hasNext()) throw new NoSuchElementException(); + return new AbstractMap.SimpleEntry<>(key, values.next()); + } + }; + } + + @Override + public int size() { + return inner.values().stream().mapToInt(List::size).sum(); + } + }; } public List getAll(K k) { diff --git a/src/outbackcdx/Replay.java b/src/outbackcdx/Replay.java new file mode 100644 index 0000000..9f74d5f --- /dev/null +++ b/src/outbackcdx/Replay.java @@ -0,0 +1,103 @@ +package outbackcdx; + +import org.netpreserve.jwarc.*; + +import java.io.IOException; +import java.io.OutputStream; +import java.net.HttpURLConnection; +import java.net.URI; +import java.nio.file.Path; +import java.time.ZoneOffset; +import java.util.Locale; +import java.util.Set; + +import static java.time.format.DateTimeFormatter.RFC_1123_DATE_TIME; +import static outbackcdx.Web.Status.*; + +public class Replay { + private static final Set X_ARCHIVE_ORIG_HEADERS = Set.of("age", "alt-svc", "cache-control", "cookie", + "connection", "content-md5", "content-security-policy", + "content-security-policy-report-only", "date", "etag", "last-modified", "memento-datetime", + "p3p", "pragma", "public-key-pins", "retry-after", "server", "status", "strict-transport-security", + "trailer", "tk", "upgrade", "upgrade-insecure-requests", "vary", "via", "warning", "x-frame-options", + "x-xss-protection"); + private final String warcBaseUrl; + + public Replay(String warcBaseUrl) { + this.warcBaseUrl = warcBaseUrl; + } + + public Web.Response replayIdentity(Index index, String date, String url, Web.Request request) throws IOException { + Capture capture = findClosestCapture(index, date, url); + if (capture == null) return new Web.Response(NOT_FOUND, "text/plain", "Not in archive"); + + try (WarcReader warcReader = openWarcFileAtPosition(capture.file, capture.compressedoffset, capture.length)) { + warcReader.position(capture.compressedoffset); + WarcRecord record = warcReader.next().orElse(null); + if (record == null) throw new IOException("Missing WARC record"); + + MultiMap headers = new MultiMap<>(); + headers.add("Access-Control-Allow-Origin", "*"); + headers.add("Memento-Datetime", RFC_1123_DATE_TIME.format(record.date().atOffset(ZoneOffset.UTC))); + if (record instanceof WarcResponse) { + HttpResponse http = ((WarcResponse) record).http(); + http.headers().map().forEach((name, values) -> { + if (X_ARCHIVE_ORIG_HEADERS.contains(name.toLowerCase(Locale.ROOT))) { + name = "X-Archive-Orig-" + name; + } + for (String value : values) { + headers.add(name, value); + } + }); + try (OutputStream out = request.streamResponse(http.status(), headers)) { + http.body().stream().transferTo(out); + } + } else if (record instanceof WarcResource) { + WarcResource resource = (WarcResource) record; + resource.headers().sole("Content-Type").ifPresent(value -> headers.add("Content-Type", value)); + try (OutputStream out = request.streamResponse(OK, headers)) { + resource.body().stream().transferTo(out); + } + } else { + throw new IOException("Unexpected WARC record type: " + record.getClass()); + } + return Web.Response.ALREADY_SENT; + } + } + + private WarcReader openWarcFileAtPosition(String filename, long position, Long length) throws IOException { + if (filename.contains("../")) { + throw new IllegalArgumentException("Refusing to open filename containing ../"); + } + + URI warcUrl = URI.create(warcBaseUrl + filename); + if (warcUrl.getScheme().equals("file")) { + var warcReader = new WarcReader(Path.of(warcUrl.getPath())); + try { + warcReader.position(position); + return warcReader; + } catch (Throwable e) { + warcReader.close(); + throw e; + } + } else { + HttpURLConnection connection = (HttpURLConnection)warcUrl.toURL().openConnection(); + String end = length != null && length > 0 ? String.valueOf(position + length - 1) : ""; + connection.addRequestProperty("Range", "bytes=" + position + "-" + end); + return new WarcReader(connection.getInputStream()); + } + + } + + private static Capture findClosestCapture(Index index, String date, String url) { + Query query = new Query(new MultiMap<>(), null); + query.url = url; + query.sort = Query.Sort.CLOSEST; + query.closest = date; + + try (CloseableIterator captures = query.execute(index)) { + if (!captures.hasNext()) return null; + return captures.next(); + } + } +} diff --git a/src/outbackcdx/UWeb.java b/src/outbackcdx/UWeb.java index 0a5ce99..6d1dbda 100644 --- a/src/outbackcdx/UWeb.java +++ b/src/outbackcdx/UWeb.java @@ -146,7 +146,7 @@ public String url() { } @Override - public OutputStream streamResponse(int status, String contentType, Map headers) throws IOException { + public OutputStream streamResponse(int status, MultiMap headers) { if (headers != null) { headers.forEach((name, value) -> exchange.getResponseHeaders().add(HttpString.tryFromString(name), value)); diff --git a/src/outbackcdx/WbCdxApi.java b/src/outbackcdx/WbCdxApi.java index f0dd330..1c068f7 100644 --- a/src/outbackcdx/WbCdxApi.java +++ b/src/outbackcdx/WbCdxApi.java @@ -55,8 +55,9 @@ public Web.Response queryIndex(Web.Request request, Index index) throws IOExcept } try (CloseableIterator captures = query.execute(index); - OutputStream outputStream = request.streamResponse(OK, contentType, - Map.of("Access-Control-Allow-Origin", "*", + OutputStream outputStream = request.streamResponse(OK, + MultiMap.of("Content-Type", contentType, + "Access-Control-Allow-Origin", "*", "outbackcdx-urlkey", query.urlkey)); Writer out = new BufferedWriter(new OutputStreamWriter(outputStream, UTF_8))) { diff --git a/src/outbackcdx/Web.java b/src/outbackcdx/Web.java index c5ad29c..9c91538 100644 --- a/src/outbackcdx/Web.java +++ b/src/outbackcdx/Web.java @@ -197,7 +197,7 @@ public String url() { } @Override - public OutputStream streamResponse(int status, String contentType, Map headers) throws IOException { + public OutputStream streamResponse(int status, MultiMap headers) throws IOException { if (headers != null) headers.forEach(exchange.getResponseHeaders()::add); exchange.sendResponseHeaders(status, 0); return exchange.getResponseBody(); @@ -473,6 +473,6 @@ default String rebuildUrl() { String url(); - OutputStream streamResponse(int status, String contentType, Map headers) throws IOException; + OutputStream streamResponse(int status, MultiMap headers) throws IOException; } } diff --git a/src/outbackcdx/Webapp.java b/src/outbackcdx/Webapp.java index 3a8a049..e27e390 100644 --- a/src/outbackcdx/Webapp.java +++ b/src/outbackcdx/Webapp.java @@ -37,6 +37,7 @@ class Webapp implements Web.Handler { private final Map computedFields; private final long maxNumResults; private final WbCdxApi wbCdxApi; + private final Replay replay; private static ServiceLoader fpLoader = ServiceLoader.load(FilterPlugin.class); @@ -54,7 +55,7 @@ private Response deleteAccessRule(Web.Request req) throws IOException, Web.Respo return found ? ok() : notFound(); } - Webapp(DataStore dataStore, boolean verbose, Map dashboardConfig, UrlCanonicalizer canonicalizer, Map computedFields, long maxNumResults, QueryConfig queryConfig) { + Webapp(DataStore dataStore, boolean verbose, Map dashboardConfig, UrlCanonicalizer canonicalizer, Map computedFields, long maxNumResults, QueryConfig queryConfig, Replay replay) { this.dataStore = dataStore; this.verbose = verbose; this.dashboardConfig = dashboardConfig; @@ -64,6 +65,7 @@ private Response deleteAccessRule(Web.Request req) throws IOException, Web.Respo this.canonicalizer = canonicalizer; this.computedFields = computedFields; this.maxNumResults = maxNumResults; + this.replay = replay; this.filterPlugins = new ArrayList(); if (FeatureFlags.filterPlugins()) { @@ -106,6 +108,7 @@ private Response deleteAccessRule(Web.Request req) throws IOException, Web.Respo router.on(POST, "//truncate_replication", request -> flushWal(request)); router.on(POST, "//compact", request -> compact(request), Permission.INDEX_EDIT); router.on(POST, "//upgrade", request -> upgrade(request), Permission.INDEX_EDIT); + router.on(GET, "//id_/", this::replayIdentity); if (FeatureFlags.experimentalAccessControl()) { router.on(GET, "//ap/", request -> query(request)); @@ -663,6 +666,14 @@ Response checkAccessBulk(Web.Request request) throws IOException, ResponseExcept return jsonResponse(responses); } + private Response replayIdentity(Request request) throws ResponseException, IOException { + if (replay == null) return new Response(404, "text/plain", "Replay not enabled (try setting --warc-base-url)"); + String date = request.param("date"); + String url = request.param("url"); + Index index = getIndex(request); + return replay.replayIdentity(index, date, url, request); + } + @Override public Response handle(Web.Request request) throws Exception { if (!request.path().startsWith(request.contextPath() + "/")) { @@ -674,5 +685,4 @@ public Response handle(Web.Request request) throws Exception { } return response; } - } diff --git a/test/outbackcdx/DummyRequest.java b/test/outbackcdx/DummyRequest.java index bd9ff89..bead0f0 100644 --- a/test/outbackcdx/DummyRequest.java +++ b/test/outbackcdx/DummyRequest.java @@ -14,7 +14,6 @@ class DummyRequest implements Web.Request { private ByteArrayOutputStream streamedResponseBody; private int streamedStatus; private Map streamedHeaders; - private String streamedContentType; public DummyRequest(Web.Method method, String url) { this(method, url, null); @@ -73,11 +72,10 @@ public String url() { } @Override - public OutputStream streamResponse(int status, String contentType, Map headers) throws IOException { + public OutputStream streamResponse(int status, MultiMap headers) throws IOException { this.streamedResponseBody = new ByteArrayOutputStream(); this.streamedStatus = status; this.streamedHeaders = headers; - this.streamedContentType = contentType; return streamedResponseBody; } @@ -86,7 +84,7 @@ public void parm(String name, String value) { } public Web.Response streamedResponse() { - Web.Response response = new Web.Response(streamedStatus, streamedContentType, + Web.Response response = new Web.Response(streamedStatus, streamedHeaders.get("Content-Type"), new ByteArrayInputStream(streamedResponseBody.toByteArray())); if (streamedHeaders != null) streamedHeaders.forEach(response::addHeader); return response; diff --git a/test/outbackcdx/ReplicationFeaturesTest.java b/test/outbackcdx/ReplicationFeaturesTest.java index fa11098..7c4a76a 100644 --- a/test/outbackcdx/ReplicationFeaturesTest.java +++ b/test/outbackcdx/ReplicationFeaturesTest.java @@ -28,7 +28,7 @@ public class ReplicationFeaturesTest { public void setUp() throws IOException { File root = folder.newFolder(); manager = new DataStore(root, 256, null, Long.MAX_VALUE, null); - webapp = new Webapp(manager, false, Collections.emptyMap(), null, Collections.emptyMap(), 10000, new QueryConfig()); + webapp = new Webapp(manager, false, Collections.emptyMap(), null, Collections.emptyMap(), 10000, new QueryConfig(), null); } @After diff --git a/test/outbackcdx/WebappTest.java b/test/outbackcdx/WebappTest.java index d82c2e9..6c4f3d2 100644 --- a/test/outbackcdx/WebappTest.java +++ b/test/outbackcdx/WebappTest.java @@ -46,7 +46,7 @@ public void setUp() throws IOException, ConfigurationException { UrlCanonicalizer canon = new UrlCanonicalizer(new ByteArrayInputStream(yaml.getBytes(UTF_8))); DataStore manager = new DataStore(root, -1, null, Long.MAX_VALUE, canon); - webapp = new Webapp(manager, false, Collections.emptyMap(), canon, Collections.emptyMap(), 10000, new QueryConfig()); + webapp = new Webapp(manager, false, Collections.emptyMap(), canon, Collections.emptyMap(), 10000, new QueryConfig(), null); } @After