Skip to content

Commit

Permalink
feat: flagd file polling for offline mode (#614)
Browse files Browse the repository at this point in the history
Signed-off-by: Kavindu Dodanduwa <[email protected]>
  • Loading branch information
Kavindu-Dodan authored Jan 9, 2024
1 parent 178fd42 commit 5e97b12
Show file tree
Hide file tree
Showing 4 changed files with 115 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,36 +9,73 @@
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/**
* File connector reads flag configurations and expose the context through {@code Connector} contract.
* File connector reads flag configurations from a given file, polls for changes and expose the content through
* {@code Connector} contract.
* The implementation is kept minimal and suites testing, local development needs.
*/
@SuppressFBWarnings(value = {"EI_EXPOSE_REP", "PATH_TRAVERSAL_IN"},
justification = "File connector read feature flag from a file source.")
@Slf4j
public class FileConnector implements Connector {

private static final int POLL_INTERVAL_MS = 5000;

private final String flagSourcePath;
private final BlockingQueue<StreamPayload> queue = new LinkedBlockingQueue<>(1);
private boolean shutdown = false;

public FileConnector(final String flagSourcePath) {
this.flagSourcePath = flagSourcePath;
}

/**
* Initialize file connector. Reads content of the provided source file and offer it through queue.
* Initialize file connector. Reads file content, poll for changes and offer content through the queue.
*/
public void init() throws IOException {
final String flagData = new String(Files.readAllBytes(Paths.get(flagSourcePath)), StandardCharsets.UTF_8);
Thread watcherT = new Thread(() -> {
try {
final Path filePath = Paths.get(flagSourcePath);

// initial read
String flagData = new String(Files.readAllBytes(filePath), StandardCharsets.UTF_8);
if (!queue.offer(new StreamPayload(StreamPayloadType.DATA, flagData))) {
log.warn("Unable to offer file content to queue: queue is full");
}

long lastTS = Files.getLastModifiedTime(filePath).toMillis();

// start polling for changes
while (!shutdown) {
long currentTS = Files.getLastModifiedTime(filePath).toMillis();

if (currentTS > lastTS) {
lastTS = currentTS;
flagData = new String(Files.readAllBytes(filePath), StandardCharsets.UTF_8);
if (!queue.offer(new StreamPayload(StreamPayloadType.DATA, flagData))) {
log.warn("Unable to offer file content to queue: queue is full");
}
}

Thread.sleep(POLL_INTERVAL_MS);
}

if (!queue.offer(new StreamPayload(StreamPayloadType.DATA, flagData))) {
throw new RuntimeException("Unable to write to queue. Queue is full.");
}
log.info("Shutting down file connector.");
} catch (Throwable t) {
log.error("Error from file connector. File connector will exit", t);
if (!queue.offer(new StreamPayload(StreamPayloadType.ERROR, t.toString()))) {
log.warn("Unable to offer file content to queue: queue is full");
}
}
});

watcherT.setDaemon(true);
watcherT.start();
log.info(String.format("Using feature flag configurations from file %s", flagSourcePath));
}

Expand All @@ -50,9 +87,9 @@ public BlockingQueue<StreamPayload> getStream() {
}

/**
* NO-OP shutdown.
* Shutdown file connector.
*/
public void shutdown() throws InterruptedException {
// NO-OP nothing to do here
shutdown = true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ public class TestUtils {
public static final String VALID_LONG = "flagConfigurations/valid-long.json";
public static final String INVALID_FLAG = "flagConfigurations/invalid-flag.json";
public static final String INVALID_CFG = "flagConfigurations/invalid-configuration.json";
public static final String UPDATABLE_FILE = "flagConfigurations/updatableFlags.json";


public static String getFlagsFromResource(final String file) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,22 @@

import dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.StreamPayload;
import dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.StreamPayloadType;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.time.Duration;
import java.util.concurrent.BlockingQueue;

import static dev.openfeature.contrib.providers.flagd.resolver.process.TestUtils.UPDATABLE_FILE;
import static dev.openfeature.contrib.providers.flagd.resolver.process.TestUtils.VALID_LONG;
import static dev.openfeature.contrib.providers.flagd.resolver.process.TestUtils.getResourcePath;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTimeoutPreemptively;

class FileConnectorTest {
Expand All @@ -39,12 +44,61 @@ void readAndExposeFeatureFlagsFromSource() throws IOException {
}

@Test
void throwsErrorIfInvalidFile(){
void emitErrorStateForInvalidPath() throws IOException {
// given
final FileConnector connector = new FileConnector("INVALID_PATH");

// when
connector.init();

// then
final BlockingQueue<StreamPayload> stream = connector.getStream();

// Must emit an error within considerable time
final StreamPayload[] payload = new StreamPayload[1];
assertTimeoutPreemptively(Duration.ofMillis(200), () -> {
payload[0] = stream.take();
});

assertNotNull(payload[0].getData());
assertEquals(StreamPayloadType.ERROR, payload[0].getType());
}

@Test
@Disabled("Disabled as unstable on GH Action. Useful for functionality validation")
void watchForFileUpdatesAndEmitThem() throws IOException {
final String initial = "{\"flags\":{\"myBoolFlag\":{\"state\":\"ENABLED\",\"variants\":{\"on\":true,\"off\":false},\"defaultVariant\":\"on\"}}}";
final String updatedFlags = "{\"flags\":{\"myBoolFlag\":{\"state\":\"ENABLED\",\"variants\":{\"on\":true,\"off\":false},\"defaultVariant\":\"off\"}}}";

// given
final Path updPath = Paths.get(getResourcePath(UPDATABLE_FILE));
Files.write(updPath, initial.getBytes(), StandardOpenOption.WRITE, StandardOpenOption.TRUNCATE_EXISTING);

final FileConnector connector = new FileConnector(updPath.toString());

// when
connector.init();

// then
assertThrows(IOException.class, connector::init);
final BlockingQueue<StreamPayload> stream = connector.getStream();
final StreamPayload[] payload = new StreamPayload[1];

// first validate the initial payload
assertTimeoutPreemptively(Duration.ofMillis(200), () -> {
payload[0] = stream.take();
});

assertEquals(initial, payload[0].getData());

// then update the flags
Files.write(updPath, updatedFlags.getBytes(), StandardOpenOption.WRITE, StandardOpenOption.TRUNCATE_EXISTING);

// finally wait for updated payload
assertTimeoutPreemptively(Duration.ofSeconds(10), () -> {
payload[0] = stream.take();
});

assertEquals(updatedFlags, payload[0].getData());
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
{
"flags": {
"myBoolFlag": {
"state": "ENABLED",
"variants": {
"on": true,
"off": false
},
"defaultVariant": "on"
}
}
}

0 comments on commit 5e97b12

Please sign in to comment.