Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support new rpc method for downloading files #33065

Merged
merged 5 commits into from
Jan 2, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import com.yahoo.vespa.config.protocol.Payload;

import java.time.Duration;
import java.util.List;

/**
* For unit testing
Expand Down Expand Up @@ -72,6 +73,9 @@ public int getSize() {
return numSpecs;
}

@Override
public List<Connection> connections() { return List.of(this); }

public int getNumberOfRequests() {
return numberOfRequests;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.vespa.config;

import java.util.List;

/**
* @author hmusum
*/
Expand All @@ -20,4 +22,6 @@ public interface ConnectionPool extends AutoCloseable {

int getSize();

List<Connection> connections();

}
Original file line number Diff line number Diff line change
Expand Up @@ -149,4 +149,7 @@ public int getSize() {
}
}

@Override
public List<Connection> connections() { return List.copyOf(connections.values()); }

}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
import com.yahoo.config.model.api.FileDistribution;
import com.yahoo.jrt.Supervisor;
import com.yahoo.jrt.Transport;
import com.yahoo.vespa.flags.FlagSource;

import java.io.File;

/**
Expand All @@ -19,21 +21,23 @@ public class FileDistributionFactory implements AutoCloseable {

protected final ConfigserverConfig configserverConfig;
protected final FileDirectory fileDirectory;
private final FlagSource flagSource;
private final Supervisor supervisor = new Supervisor(new Transport("filedistribution"));


@Inject
public FileDistributionFactory(ConfigserverConfig configserverConfig, FileDirectory fileDirectory) {
public FileDistributionFactory(ConfigserverConfig configserverConfig, FileDirectory fileDirectory, FlagSource flagSource) {
this.configserverConfig = configserverConfig;
this.fileDirectory = fileDirectory;
this.flagSource = flagSource;
}

public FileRegistry createFileRegistry(File applicationPackage) {
return new FileDBRegistry(createFileManager(applicationPackage));
}

public FileDistribution createFileDistribution() {
return new FileDistributionImpl(supervisor);
return new FileDistributionImpl(supervisor, flagSource);
}

public AddFileInterface createFileManager(File applicationDir) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,11 @@
import com.yahoo.jrt.RequestWaiter;
import com.yahoo.jrt.Spec;
import com.yahoo.jrt.StringArray;
import com.yahoo.jrt.StringValue;
import com.yahoo.jrt.Supervisor;
import com.yahoo.jrt.Target;
import com.yahoo.vespa.flags.FlagSource;
import com.yahoo.vespa.flags.Flags;

import java.time.Duration;
import java.util.Set;
Expand All @@ -21,12 +24,14 @@
public class FileDistributionImpl implements FileDistribution, RequestWaiter {

private final static Logger log = Logger.getLogger(FileDistributionImpl.class.getName());
private final static Duration rpcTimeout = Duration.ofSeconds(1);
private final static Duration rpcTimeout = Duration.ofSeconds(11);

private final Supervisor supervisor;
private final FlagSource flagSource;

public FileDistributionImpl(Supervisor supervisor) {
public FileDistributionImpl(Supervisor supervisor, FlagSource flagSource) {
this.supervisor = supervisor;
this.flagSource = flagSource;
}

/**
Expand All @@ -39,11 +44,28 @@ public FileDistributionImpl(Supervisor supervisor) {
*/
@Override
public void triggerDownload(String hostName, int port, Set<FileReference> fileReferences) {
if (Flags.CONFIG_SERVER_TRIGGER_DOWNLOAD_WITH_SOURCE.bindTo(flagSource).value())
triggerDownloadIncludeHost(hostName, port, fileReferences);
else {
Target target = supervisor.connect(new Spec(hostName, port));
Request request = new Request("filedistribution.setFileReferencesToDownload");
request.setContext(target);
request.parameters()
.add(new StringArray(fileReferences.stream()
.map(FileReference::value)
.toArray(String[]::new)));
log.log(Level.FINE, () -> "Executing " + request.methodName() + " against " + target + ": " + fileReferences);
target.invokeAsync(request, rpcTimeout, this);
}
}

private void triggerDownloadIncludeHost(String hostName, int port, Set<FileReference> fileReferences) {
Target target = supervisor.connect(new Spec(hostName, port));
Request request = new Request("filedistribution.setFileReferencesToDownload");
Request request = new Request("filedistribution.triggerDownload");
request.setContext(target);
request.parameters().add(new StringArray(fileReferences.stream().map(FileReference::value).toArray(String[]::new)));
log.log(Level.FINE, () -> "Executing " + request.methodName() + " against " + target);
request.parameters().add(new StringValue(new Spec(com.yahoo.net.HostName.getLocalhost(), 19070).toString()));
log.log(Level.FINE, () -> "Executing " + request.methodName() + " against " + target + ": " + fileReferences);
target.invokeAsync(request, rpcTimeout, this);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -584,6 +584,8 @@ private void setFileReferencesToDownload(Request req) {
.forEach(fileReference -> downloadFromSource(fileReference, client.toString(), peerSpec));
req.returnValues().add(new Int32Value(0));
});
req.returnValues().add(new Int32Value(0));
req.returnRequest();
}

private void triggerDownload(Request req) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ public TenantRepository(HostRegistry hostRegistry,
metrics,
new StripedExecutor<>(),
new StripedExecutor<>(),
new FileDistributionFactory(configserverConfig, fileDirectory),
new FileDistributionFactory(configserverConfig, fileDirectory, flagSource),
flagSource,
Executors.newFixedThreadPool(1, ThreadFactoryFactory.getThreadFactory(TenantRepository.class.getName())),
secretStore,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

import com.yahoo.cloud.config.ConfigserverConfig;
import com.yahoo.config.application.api.FileRegistry;
import com.yahoo.vespa.flags.InMemoryFlagSource;

import java.io.File;

Expand All @@ -12,7 +13,7 @@
public class MockFileDistributionFactory extends FileDistributionFactory {

public MockFileDistributionFactory(ConfigserverConfig configserverConfig) {
super(configserverConfig, new FileDirectory(configserverConfig));
super(configserverConfig, new FileDirectory(configserverConfig), new InMemoryFlagSource());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.rules.TemporaryFolder;
import org.mockito.Mock;
import org.xml.sax.SAXException;
import java.io.IOException;
import java.time.Clock;
Expand Down Expand Up @@ -220,7 +219,7 @@ public FailingDuringBootstrapTenantRepository(ConfigserverConfig configserverCon
Metrics.createTestMetrics(),
new StripedExecutor<>(new InThreadExecutorService()),
new StripedExecutor<>(new InThreadExecutorService()),
new FileDistributionFactory(configserverConfig, new FileDirectory(configserverConfig)),
new FileDistributionFactory(configserverConfig, new FileDirectory(configserverConfig), new InMemoryFlagSource()),
flagSource,
new InThreadExecutorService(),
mockSecretStore,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,9 @@ public Builder withZone(Zone zone) {

public TenantRepository build() {
if (fileDistributionFactory == null)
fileDistributionFactory = new FileDistributionFactory(configserverConfig, new FileDirectory(configserverConfig));
fileDistributionFactory = new FileDistributionFactory(configserverConfig,
new FileDirectory(configserverConfig),
new InMemoryFlagSource());
return new TestTenantRepository(hostRegistry,
curator,
secretStore,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import com.yahoo.vespa.defaults.Defaults;
import java.io.File;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
Expand Down Expand Up @@ -184,6 +185,9 @@ public void close() { }
@Override
public int getSize() { return 0; }

@Override
public List<Connection> connections() { return List.of(); }

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
import com.yahoo.jrt.StringValue;
import com.yahoo.vespa.config.Connection;
import com.yahoo.vespa.config.ConnectionPool;
import com.yahoo.vespa.config.JRTConnectionPool;

import java.io.File;
import java.time.Duration;
Expand Down Expand Up @@ -124,17 +123,13 @@ void startDownloadFromSource(FileReferenceDownload fileReferenceDownload, Spec s
FileReference fileReference = fileReferenceDownload.fileReference();
if (downloads.get(fileReference).isPresent()) return;

// Return early when testing (using mock connection pool)
if (! (connectionPool instanceof JRTConnectionPool)) {
log.log(Level.INFO, () -> "Cannot download using " + connectionPool.getClass().getName());
return;
}

log.log(Level.FINE, () -> "Will download " + fileReference + " with timeout " + downloadTimeout);
for (var source : ((JRTConnectionPool) connectionPool).getSources()) {
if (source.getTarget().peerSpec().equals(spec))
for (var connection : connectionPool.connections()) {
if (connection.getAddress().equals(spec.toString()))
downloadExecutor.submit(() -> {
startDownloadRpc(fileReferenceDownload, 1, source);
log.log(Level.FINE, () -> "Will download " + fileReference + " with timeout " + downloadTimeout + " from " + spec);
downloads.add(fileReferenceDownload);
startDownloadRpc(fileReferenceDownload, 1, connection);
downloads.remove(fileReference);
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -350,6 +350,9 @@ public int getSize() {
return 1;
}

@Override
public List<Connection> connections() { return List.of();}

void setResponseHandler(ResponseHandler responseHandler) {
this.responseHandler = responseHandler;
}
Expand Down
6 changes: 6 additions & 0 deletions flags/src/main/java/com/yahoo/vespa/flags/Flags.java
Original file line number Diff line number Diff line change
Expand Up @@ -501,6 +501,12 @@ public class Flags {
"Use legacy trust store for CA, or new one",
"Takes effect on restart of OCI containers");

public static final UnboundBooleanFlag CONFIG_SERVER_TRIGGER_DOWNLOAD_WITH_SOURCE = defineFeatureFlag(
"config-server-trigger-download-with-source", false,
List.of("hmusum"), "2024-12-25", "2025-02-01",
"Use new RPC method for triggering download of file reference",
"Takes effect immediately");

/** WARNING: public for testing: All flags should be defined in {@link Flags}. */
public static UnboundBooleanFlag defineFeatureFlag(String flagId, boolean defaultValue, List<String> owners,
String createdAt, String expiresAt, String description,
Expand Down
Loading