Skip to content

Commit

Permalink
refactor: KubernetesHelper.printLogsAsync reused common tools
Browse files Browse the repository at this point in the history
Signed-off-by: Marc Nuri <[email protected]>
  • Loading branch information
manusa committed Jan 11, 2024
1 parent b47fa4a commit 7956212
Show file tree
Hide file tree
Showing 4 changed files with 47 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,11 @@ private static class ExecutorServiceHolder {
public static final ExecutorService INSTANCE = Executors.newCachedThreadPool();
}

public static <T> CompletableFuture<T> async(Callable<T> callable) {
public static <T> CompletableFuture<T> async(ThrowingFunction<CompletableFuture<T>, T> function) {
final CompletableFuture<T> future = new CompletableFuture<>();
CompletableFuture.runAsync(() -> {
try {
future.complete(callable.call());
future.complete(function.apply(future));
} catch (Exception ex) {
future.completeExceptionally(ex);
}
Expand All @@ -51,6 +51,10 @@ public static <T> CompletableFuture<T> async(Callable<T> callable) {
return future;
}

public static <T> CompletableFuture<T> async(Callable<T> callable) {
return async(f -> callable.call());
}

public static <T> Function<Predicate<T>, CompletableFuture<T>> await(Supplier<T> supplier) {
return predicate -> async(() -> {
T ret;
Expand All @@ -73,4 +77,9 @@ public static <T> T get(CompletableFuture<T> completableFuture, Duration duratio
throw new IllegalStateException("Failure while waiting to get future ", e);
}
}

@FunctionalInterface
public interface ThrowingFunction<T, R> {
R apply(T t) throws Exception;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,10 @@
package org.eclipse.jkube.kit.common.util;


import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.UnknownHostException;
import java.nio.charset.StandardCharsets;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
Expand All @@ -35,7 +33,8 @@
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

Expand All @@ -45,6 +44,8 @@
import io.fabric8.kubernetes.api.model.authorization.v1.SelfSubjectAccessReview;
import io.fabric8.kubernetes.api.model.authorization.v1.SelfSubjectAccessReviewBuilder;
import io.fabric8.kubernetes.client.utils.ApiVersionUtil;
import org.apache.commons.io.IOUtils;
import org.apache.commons.io.LineIterator;
import org.eclipse.jkube.kit.common.KitLogger;

import io.fabric8.kubernetes.api.model.Container;
Expand Down Expand Up @@ -89,6 +90,8 @@
import io.fabric8.openshift.api.model.DeploymentConfigSpec;
import org.apache.commons.lang3.StringUtils;

import static org.eclipse.jkube.kit.common.util.AsyncUtil.async;


public class KubernetesHelper {
protected static final String DATE_TIME_FORMAT = "yyyy-MM-dd'T'HH:mm:ssX";
Expand Down Expand Up @@ -383,32 +386,23 @@ public static String getBuildStatusPhase(Build build) {
return null;
}

public static void printLogsAsync(LogWatch logWatcher, final String failureMessage, final CountDownLatch terminateLatch, final KitLogger log) {
final InputStream in = logWatcher.getOutput();
Thread thread = new Thread() {
@Override
public void run() {
try (BufferedReader reader = new BufferedReader(new InputStreamReader(in))) {
while (true) {
String line = reader.readLine();
if (line == null) {
return;
}
if (terminateLatch.getCount() <= 0L) {
return;
}
log.info("[[s]]%s", line);
}
} catch (IOException e) {
// Check again the latch which could be already count down to zero in between
// so that an IO exception occurs on read
if (terminateLatch.getCount() > 0L) {
log.error("%s : %s", failureMessage, e);
}
}
}
};
thread.start();
public static CompletableFuture<Void> printLogsAsync(LogWatch logWatcher, Consumer<String> lineConsumer) {
final LineIterator it = IOUtils.lineIterator(logWatcher.getOutput(), StandardCharsets.UTF_8);
return async(cf -> {
while (it.hasNext()) {
final String line = it.nextLine();
if (line == null) {
cf.complete(null);
} else {
lineConsumer.accept(line);
}
// Can be completed internally or externally
if (cf.isDone()) {
return null;
}
}
return null;
});
}

public static String getBuildStatusReason(Build build) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,12 @@ private void watchLog(final LogWatch logWatcher, String podName, final String fa
context.getNewPodLog().info("Press Ctrl-C to " + ctrlCMessage);
context.getNewPodLog().info("");

KubernetesHelper.printLogsAsync(logWatcher, failureMessage, this.logWatchTerminateLatch, log);
KubernetesHelper.printLogsAsync(logWatcher, line -> log.info("[[s]]%s", line))
.whenComplete((v, t) -> {
if (t != null) {
log.error("%s: %s", failureMessage, t);
}
});
}

private String containerNameMessage(String containerName) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -467,8 +467,12 @@ private void waitForOpenShiftBuildToComplete(OpenShiftClient client, Build build
waitUntilPodIsReady(buildName + "-build", 120, log);
log.info("Waiting for build " + buildName + " to complete...");
try (LogWatch logWatch = client.pods().inNamespace(applicableOpenShiftNamespace).withName(buildName + "-build").watchLog()) {
KubernetesHelper.printLogsAsync(logWatch,
"Failed to tail build log", logTerminateLatch, log);
KubernetesHelper.printLogsAsync(logWatch, line -> log.info("[[s]]%s", line))
.whenComplete((v, t) -> {
if (t != null) {
log.error("Failed to tail build log: %s", t);
}
});
Watcher<Build> buildWatcher = getBuildWatcher(latch, buildName, buildHolder);
try (Watch watcher = client.builds().inNamespace(applicableOpenShiftNamespace).withName(buildName).watch(buildWatcher)) {
// Check if the build is already finished to avoid waiting indefinitely
Expand Down

0 comments on commit 7956212

Please sign in to comment.