
Manual client refresh required after FlightRuntimeException with FlightStatusCode.UNAVAILABLE due to ConnectTimeoutException #129

Open
mps2209 opened this issue Apr 25, 2024 · 10 comments
Assignees
Labels
bug Something isn't working

Comments

@mps2209

mps2209 commented Apr 25, 2024

Specifications

  • Client Version: 0.7.0
  • InfluxDB Version: v3
  • Platform: spring webflux

Code sample to reproduce problem

It's hard to reproduce the error, because we do not know why the exception is thrown in the first place. We suspect that a configuration change on the InfluxDB side causes the client to run into the timeout exception.

This was our setup:
We initialize our client as a bean and inject it into our services.

    @Bean
    public InfluxDBClient influxDB(InfluxConfiguration influxConfig) {
        return InfluxDBClient.getInstance(influxConfig.getHost(), influxConfig.getToken().toCharArray(), influxConfig.getDatabase());
    }

Occasionally, however, something goes wrong and the exception below is thrown. After that, the client stayed stuck in this error state and threw the same exception on every request.

Caused by: org.apache.arrow.flight.FlightRuntimeException: UNAVAILABLE: io exception
	at org.apache.arrow.flight.CallStatus.toRuntimeException(CallStatus.java:131)
	at org.apache.arrow.flight.grpc.StatusUtils.fromGrpcRuntimeException(StatusUtils.java:164)
	at org.apache.arrow.flight.grpc.StatusUtils.fromThrowable(StatusUtils.java:185)
	at org.apache.arrow.flight.FlightStream$Observer.onError(FlightStream.java:456)
	at org.apache.arrow.flight.FlightClient$1.onError(FlightClient.java:350)

The client works again after restarting the service.

Expected behavior

We expect not to have to create a new client for every request. We load-tested that approach and it was noticeably slower, so we create the client only once, at service startup.

Actual behavior

Once the exception shown above is thrown, presumably due to some change on the InfluxDB side, the client gets stuck in this error state. We had to implement a proxy/factory that creates a new client when this exception is thrown. We are unsure what the intended way of using the client is. Creating a new client for every request, as the code example in the README shows, does not seem feasible:
try (InfluxDBClient client = InfluxDBClient.getInstance(host, token, database))

Additional info

No response

@mps2209 mps2209 added the bug Something isn't working label Apr 25, 2024
@bednar
Member

bednar commented Apr 25, 2024

Hi @mps2209,

Thank you for using our client and for bringing this issue to our attention.

The client is designed to recover from a ConnectTimeoutException, and subsequent calls should work without interruption. I appreciate you pointing out that this may not work as expected, and I will prioritize investigating this issue.

To help us diagnose the problem, do you know of any specific steps that would let us replicate this behavior easily?

Best Regards

@bednar bednar self-assigned this Apr 25, 2024
@mps2209
Author

mps2209 commented Apr 30, 2024

Thank you for looking into our issue. Regrettably, I can't provide any more information, except that we run multiple services and, when it happens, it affects several of them at once. It just happened again, so sadly the fix on our side did not work. We are currently investigating and I will post an update if we find anything.
Maybe the time frame helps, in case something on the InfluxDB side changed during that period: it must have happened after 27.04.24 19:50 UTC and before 28.04.24 5:14 UTC. Those are the times of the last successful and the first failed request from our services.

@karel-rehor
Contributor

Hi @mps2209

I'm trying to recreate this issue. I've managed to get a stack trace with the first six lines you've included above, however instead of a ConnectTimeoutException the recreation is with the underlying exception io.netty.channel.AbstractChannel$AnnotatedConnectException, so I doubt this is a true recreation of the original issue. I'm assuming from the issue description that the base exception thrown is io.netty.channel.ConnectTimeoutException. Can you add a full stack trace with the underlying exception ConnectTimeoutException?

Thanks

@mps2209
Author

mps2209 commented May 14, 2024

This is the most I can extract from our logs:
Original Stack Trace:

....
        at reactor.core.publisher.MonoCallable$MonoCallableSubscription.request(MonoCallable.java:137)
        at reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.request(FluxHide.java:152)
        at reactor.core.publisher.FluxFlattenIterable$FlattenIterableSubscriber.onSubscribe(FluxFlattenIterable.java:241)
        at io.opentelemetry.javaagent.shaded.instrumentation.reactor.v3_1.TracingSubscriber.lambda$onSubscribe$0(TracingSubscriber.java:57)
        at io.opentelemetry.javaagent.shaded.instrumentation.reactor.v3_1.TracingSubscriber.withActiveSpan(TracingSubscriber.java:83)
        at io.opentelemetry.javaagent.shaded.instrumentation.reactor.v3_1.TracingSubscriber.onSubscribe(TracingSubscriber.java:57)
        at reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.onSubscribe(FluxHide.java:122)
        at reactor.core.publisher.MonoCallable.subscribe(MonoCallable.java:48)
        at reactor.core.publisher.InternalFluxOperator.subscribe(InternalFluxOperator.java:68)
        at reactor.core.publisher.FluxSubscribeOn$SubscribeOnSubscriber.run(FluxSubscribeOn.java:194)
        at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:84)
        at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:37)
        at java.base/java.util.concurrent.FutureTask.run(Unknown Source)
        at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
        at java.base/java.lang.Thread.run(Unknown Source)
Caused by: org.apache.arrow.flight.FlightRuntimeException: UNAVAILABLE: io exception
    at org.apache.arrow.flight.CallStatus.toRuntimeException(CallStatus.java:131)
    at org.apache.arrow.flight.grpc.StatusUtils.fromGrpcRuntimeException(StatusUtils.java:164)
    at org.apache.arrow.flight.grpc.StatusUtils.fromThrowable(StatusUtils.java:185)
    at org.apache.arrow.flight.FlightStream$Observer.onError(FlightStream.java:456)
    at org.apache.arrow.flight.FlightClient$1.onError(FlightClient.java:350)
    at io.grpc.stub.ClientCalls$StreamObserverToCallListenerAdapter.onClose(ClientCalls.java:481)
    at io.grpc.PartialForwardingClientCallListener.onClose(PartialForwardingClientCallListener.java:39)
    at io.grpc.ForwardingClientCallListener.onClose(ForwardingClientCallListener.java:23)
    at io.grpc.ForwardingClientCallListener$SimpleForwardingClientCallListener.onClose(ForwardingClientCallListener.java:40)
    at org.apache.arrow.flight.grpc.ClientInterceptorAdapter$FlightClientCallListener.onClose(ClientInterceptorAdapter.java:117)
    at io.opentelemetry.javaagent.shaded.instrumentation.grpc.v1_6.TracingClientInterceptor$TracingClientCall$TracingClientCallListener.onClose(TracingClientInterceptor.java:158)
    at io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:574)
    at io.grpc.internal.ClientCallImpl.access$300(ClientCallImpl.java:72)
    at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInternal(ClientCallImpl.java:742)
    at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInContext(ClientCallImpl.java:723)
    at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
    at io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.base/java.lang.Thread.run(Unknown Source)
Caused by: io.netty.channel.ConnectTimeoutException: connection timed out after 30000 ms: 84984b87-7c2b-4d60-8f1a-964a1c1cda4f.a.influxdb.io/3.127.138.48:443
    at io.netty.channel.epoll.AbstractEpollChannel$AbstractEpollUnsafe$2.run(AbstractEpollChannel.java:613)
    at io.netty.util.concurrent.PromiseTask.runTask(PromiseTask.java:98)
    at io.netty.util.concurrent.ScheduledFutureTask.run(ScheduledFutureTask.java:153)
    at io.netty.util.concurrent.AbstractEventExecutor.runTask(AbstractEventExecutor.java:173)
    at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:166)
    at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:470)
    at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:416)
    at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
    at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    ... 1 more

I hope this helps
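The UNAVAILABLE status is only a wrapper: in this trace the root cause is io.netty.channel.ConnectTimeoutException, while the recreation described earlier ended in AbstractChannel$AnnotatedConnectException. If a workaround such as the client refresh mentioned in this thread should only trigger for particular root causes, one option is to walk the cause chain. The following is a minimal, dependency-free sketch; CauseChain, hasCause and rootCause are hypothetical helper names, not part of influxdb3-java or Arrow Flight.

```java
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.Set;

// Hypothetical helper (not part of influxdb3-java): inspects an exception's cause chain.
public final class CauseChain {

    private CauseChain() {
    }

    /**
     * Returns true if {@code error} or any throwable in its cause chain is an
     * instance of {@code type}. Visited throwables are tracked by identity so
     * that a cyclic cause chain cannot cause an endless loop.
     */
    public static boolean hasCause(Throwable error, Class<? extends Throwable> type) {
        Set<Throwable> seen = Collections.newSetFromMap(new IdentityHashMap<>());
        for (Throwable t = error; t != null && seen.add(t); t = t.getCause()) {
            if (type.isInstance(t)) {
                return true;
            }
        }
        return false;
    }

    /** Returns the deepest throwable in the (non-null) chain, i.e. the root cause. */
    public static Throwable rootCause(Throwable error) {
        Set<Throwable> seen = Collections.newSetFromMap(new IdentityHashMap<>());
        Throwable current = error;
        while (current.getCause() != null && seen.add(current)) {
            current = current.getCause();
        }
        return current;
    }
}
```

With Netty on the classpath, a check such as CauseChain.hasCause(ex, io.netty.channel.ConnectTimeoutException.class) would match the trace above regardless of how many Reactor, Arrow Flight or gRPC wrappers sit in between.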

@bednar
Member

bednar commented May 22, 2024

Hi @mps2209,

Unfortunately, we are unable to replicate the issue you described in our testing environment. Our client successfully recovers from network exceptions as expected, which suggests there might be specific factors in your setup influencing the behavior you're encountering.

To help us understand and potentially solve the problem, could you provide more details about your implementation? Specifically:

  1. Code Integration: How are you integrating the client with reactor? Sharing the relevant parts of your code where the client is set up and used might help us identify any integration issues.

  2. Error Handling: How do you handle errors from the client within your application?

If possible, please include code snippets or a broader description of your architecture, as these details will be crucial in diagnosing the issue more effectively.

Thank you for your cooperation, and looking forward to your response.

Best regards

@mps2209
Author

mps2209 commented May 22, 2024

I'm not sure what you mean by reactor, but we used to inject the client like this:

@Bean
public InfluxDBClient influxDB(InfluxConfiguration influxConfig) {
    return InfluxDBClient.getInstance(influxConfig.getHost(), influxConfig.getToken().toCharArray(), influxConfig.getDatabase());
}

And query influx like this:

    // Inside the calling method (its signature is not shown here)
    try (var stream = queryExecutor.execute(query, queryOptions)) {
        ...
    } catch (Exception ex) {
        ...
    }

    public Stream<PointValues> execute(String sqlQuery, QueryOptions queryOptions) {
        return influxClient.queryPoints(sqlQuery, queryOptions);
    }
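Since queryPoints returns a Stream, a connection error may only surface while the stream is iterated (the FlightStream.next frame in a later trace in this thread points that way), so a refresh-and-retry step has to wrap the consumption of the stream as well as the call itself. The sketch below assumes some holder that exposes the current client and a way to refresh it; RefreshingExecutor and its constructor parameters are hypothetical names, and retrying exactly once is an arbitrary choice that is only safe for idempotent operations such as read queries.

```java
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;

// Hypothetical helper: runs an operation against the current client and,
// for selected failures, refreshes the client and retries exactly once.
public final class RefreshingExecutor<C> {

    private final Supplier<C> currentClient;          // e.g. a proxy's getClient()
    private final Runnable refresh;                    // e.g. a proxy's refreshClient()
    private final Predicate<Throwable> shouldRefresh;  // decides which failures justify a new client

    public RefreshingExecutor(Supplier<C> currentClient, Runnable refresh,
                              Predicate<Throwable> shouldRefresh) {
        this.currentClient = currentClient;
        this.refresh = refresh;
        this.shouldRefresh = shouldRefresh;
    }

    /**
     * Applies {@code operation} to the current client. If it throws a runtime
     * exception accepted by {@code shouldRefresh}, the client is refreshed and the
     * operation is applied once more; any other exception is rethrown unchanged.
     */
    public <R> R execute(Function<C, R> operation) {
        try {
            return operation.apply(currentClient.get());
        } catch (RuntimeException e) {
            if (!shouldRefresh.test(e)) {
                throw e;
            }
            refresh.run();
            // Second and final attempt; a failure here propagates to the caller.
            return operation.apply(currentClient.get());
        }
    }
}
```

With influxdb3-java, the operation would consume and close the stream inside the lambda, for example client -> { try (Stream<PointValues> s = client.queryPoints(sql, options)) { return s.toList(); } }, so that errors raised during iteration are also retried.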

We have now changed this to work around the timeout error: instead of the @Bean, we implemented a proxy/factory, and we call the proxy's refreshClient() method in the catch block above.

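import com.influxdb.v3.client.InfluxDBClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class InfluxDBClientProxy {

    private static final Logger log = LoggerFactory.getLogger(InfluxDBClientProxy.class);

    private final InfluxConfiguration influxConfig;
    // volatile so that the double-checked locking in getClient() publishes the client safely
    private volatile InfluxDBClient influxDBClient;

    public InfluxDBClientProxy(InfluxConfiguration influxConfig) {
        this.influxConfig = influxConfig;
    }

    // Closes the current client (if any); the next getClient() call creates a new one
    public synchronized void refreshClient() {
        log.warn("refreshing the client");
        if (influxDBClient != null) {
            try {
                influxDBClient.close();
            } catch (Exception e) {
                log.debug("Could not close influx db client: {}", e.getMessage());
            }
        }
        this.influxDBClient = null;
    }

    // Lazily creates the client. The field is read into a local variable so that a
    // concurrent refreshClient() cannot make this method return null.
    public InfluxDBClient getClient() {
        InfluxDBClient client = influxDBClient;
        if (client == null) {
            synchronized (this) {
                client = influxDBClient;
                if (client == null) {
                    client = createNewClient();
                    this.influxDBClient = client;
                }
            }
        }
        return client;
    }

    protected InfluxDBClient createNewClient() {
        return InfluxDBClient.getInstance(influxConfig.getHost(), influxConfig.getToken().toCharArray(), influxConfig.getDatabase());
    }
}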
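One caveat with refreshClient() as written: if several requests fail at the same moment, each catch block calls it, and a late caller can close a client that another thread has only just created after the first refresh. A common way around this is to invalidate only the specific instance that failed, using compare-and-set. The sketch below is generic over AutoCloseable (which InfluxDBClient implements, since the README uses it in try-with-resources) so it can be tried without a server; RefreshableClient, get() and invalidate() are hypothetical names. Like the proxy above, it still closes a client that other in-flight requests may be using.

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

// Hypothetical helper: holds one shared client and replaces it only if the
// instance reported as failed is still the current one.
public final class RefreshableClient<C extends AutoCloseable> {

    private final Supplier<C> factory;
    private final AtomicReference<C> current = new AtomicReference<>();

    public RefreshableClient(Supplier<C> factory) {
        this.factory = factory;
    }

    /** Returns the shared client, creating it lazily on first use or after an invalidation. */
    public C get() {
        C client = current.get();
        if (client != null) {
            return client;
        }
        synchronized (this) {
            client = current.get();
            if (client == null) {
                client = factory.get();
                current.set(client);
            }
            return client;
        }
    }

    /**
     * Drops and closes {@code failed}, but only if it is still the current client.
     * A caller holding an instance that was already replaced cannot close the new one.
     */
    public void invalidate(C failed) {
        if (failed != null && current.compareAndSet(failed, null)) {
            try {
                failed.close();
            } catch (Exception e) {
                // A broken client may fail to close cleanly; the reference is already dropped.
            }
        }
    }
}
```

In the service, the holder would be created with something like new RefreshableClient<>(() -> InfluxDBClient.getInstance(host, token, database)), and the catch block would call invalidate(client) with the exact instance the failed query ran on.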

Hope this helps!
kind regards

@bednar
Member

bednar commented May 22, 2024

@mps2209, Thanks for providing detailed information. To further understand the context and possibly identify any compatibility or configuration issues, could you let us know which version of Spring you are using? Additionally, are you using Spring Boot in your project?

@mps2209
Author

mps2209 commented May 22, 2024

@bednar Yes, we are using Spring Boot. Here are the relevant entries in our pom.xml:

	<parent>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-parent</artifactId>
		<version>3.2.4</version>
	</parent>

	<properties>
		<java.version>17</java.version>
	</properties>

	<dependencies>
		<!-- influx -->
		<dependency>
			<groupId>org.influxdb</groupId>
			<artifactId>influxdb-java</artifactId>
			<version>2.23</version>
		</dependency>
		<!-- influxdb3 used for SQL queries -->
		<dependency>
			<groupId>com.influxdb</groupId>
			<artifactId>influxdb3-java</artifactId>
			<version>0.7.0</version>
		</dependency>
	</dependencies>

@mps2209
Author

mps2209 commented May 31, 2024

Small update from our side: we encountered the mentioned exception again, but our fix of refreshing the client once we catch an exception seems to have worked. The error occurred on all of our stages, but some stages showed a different exception. I can't guarantee that these exceptions would have led to the same behaviour, since our fix now seems to work, but I will share the stack traces anyway in case they help.
This was the first exception that was new to me:


Original Stack Trace:
        at reactor.core.publisher.MonoCallable$MonoCallableSubscription.request(MonoCallable.java:137)
        at reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.request(FluxHide.java:152)
        at reactor.core.publisher.FluxFlattenIterable$FlattenIterableSubscriber.onSubscribe(FluxFlattenIterable.java:241)
        at io.opentelemetry.javaagent.shaded.instrumentation.reactor.v3_1.TracingSubscriber.lambda$onSubscribe$0(TracingSubscriber.java:59)
        at io.opentelemetry.javaagent.shaded.instrumentation.reactor.v3_1.TracingSubscriber.withActiveSpan(TracingSubscriber.java:97)
        at io.opentelemetry.javaagent.shaded.instrumentation.reactor.v3_1.TracingSubscriber.withActiveSpan(TracingSubscriber.java:91)
        at io.opentelemetry.javaagent.shaded.instrumentation.reactor.v3_1.TracingSubscriber.onSubscribe(TracingSubscriber.java:59)
        at reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.onSubscribe(FluxHide.java:122)
        at reactor.core.publisher.MonoCallable.subscribe(MonoCallable.java:48)
        at reactor.core.publisher.InternalFluxOperator.subscribe(InternalFluxOperator.java:68)
        at reactor.core.publisher.FluxSubscribeOn$SubscribeOnSubscriber.run(FluxSubscribeOn.java:194)
        at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:84)
        at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:37)
        at java.base/java.util.concurrent.FutureTask.run(Unknown Source)
        at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
        at java.base/java.lang.Thread.run(Unknown Source)
Caused by: org.apache.arrow.flight.FlightRuntimeException: UNAVAILABLE: io exception
Channel Pipeline: [SslHandler#0, ProtocolNegotiators$ClientTlsHandler#0, WriteBufferingAndExceptionHandler#0, DefaultChannelPipeline$TailContext#0]
    at org.apache.arrow.flight.CallStatus.toRuntimeException(CallStatus.java:131)
    at org.apache.arrow.flight.grpc.StatusUtils.fromGrpcRuntimeException(StatusUtils.java:164)
    at org.apache.arrow.flight.grpc.StatusUtils.fromThrowable(StatusUtils.java:185)
    at org.apache.arrow.flight.FlightStream$Observer.onError(FlightStream.java:456)
    at org.apache.arrow.flight.FlightClient$1.onError(FlightClient.java:350)
    at io.grpc.stub.ClientCalls$StreamObserverToCallListenerAdapter.onClose(ClientCalls.java:481)
    at io.grpc.PartialForwardingClientCallListener.onClose(PartialForwardingClientCallListener.java:39)
    at io.grpc.ForwardingClientCallListener.onClose(ForwardingClientCallListener.java:23)
    at io.grpc.ForwardingClientCallListener$SimpleForwardingClientCallListener.onClose(ForwardingClientCallListener.java:40)
    at org.apache.arrow.flight.grpc.ClientInterceptorAdapter$FlightClientCallListener.onClose(ClientInterceptorAdapter.java:117)
    at io.opentelemetry.javaagent.shaded.instrumentation.grpc.v1_6.TracingClientInterceptor$TracingClientCall$TracingClientCallListener.onClose(TracingClientInterceptor.java:159)
    at io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:574)
    at io.grpc.internal.ClientCallImpl.access$300(ClientCallImpl.java:72)
    at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInternal(ClientCallImpl.java:742)
    at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInContext(ClientCallImpl.java:723)
    at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
    at io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.base/java.lang.Thread.run(Unknown Source)
Caused by: javax.net.ssl.SSLHandshakeException: PKIX path validation failed: java.security.cert.CertPathValidatorException: validity check failed
    at java.base/sun.security.ssl.Alert.createSSLException(Unknown Source)
    at java.base/sun.security.ssl.TransportContext.fatal(Unknown Source)
    at java.base/sun.security.ssl.TransportContext.fatal(Unknown Source)
    at java.base/sun.security.ssl.TransportContext.fatal(Unknown Source)
    at java.base/sun.security.ssl.CertificateMessage$T12CertificateConsumer.checkServerCerts(Unknown Source)
    at java.base/sun.security.ssl.CertificateMessage$T12CertificateConsumer.onCertificate(Unknown Source)
    at java.base/sun.security.ssl.CertificateMessage$T12CertificateConsumer.consume(Unknown Source)
    at java.base/sun.security.ssl.SSLHandshake.consume(Unknown Source)
    at java.base/sun.security.ssl.HandshakeContext.dispatch(Unknown Source)
    at java.base/sun.security.ssl.SSLEngineImpl$DelegatedTask$DelegatedAction.run(Unknown Source)
    at java.base/sun.security.ssl.SSLEngineImpl$DelegatedTask$DelegatedAction.run(Unknown Source)
    at java.base/java.security.AccessController.doPrivileged(Unknown Source)
    at java.base/sun.security.ssl.SSLEngineImpl$DelegatedTask.run(Unknown Source)
    at io.netty.handler.ssl.SslHandler$SslTasksRunner.run(SslHandler.java:1889)
    ... 3 more
Caused by: sun.security.validator.ValidatorException: PKIX path validation failed: java.security.cert.CertPathValidatorException: validity check failed
    at java.base/sun.security.validator.PKIXValidator.doValidate(Unknown Source)
    at java.base/sun.security.validator.PKIXValidator.engineValidate(Unknown Source)
    at java.base/sun.security.validator.Validator.validate(Unknown Source)
    at java.base/sun.security.ssl.X509TrustManagerImpl.checkTrusted(Unknown Source)
    at java.base/sun.security.ssl.X509TrustManagerImpl.checkServerTrusted(Unknown Source)
    ... 13 more
Caused by: java.security.cert.CertPathValidatorException: validity check failed
    at java.base/sun.security.provider.certpath.PKIXMasterCertPathValidator.validate(Unknown Source)
    at java.base/sun.security.provider.certpath.PKIXCertPathValidator.validate(Unknown Source)
    at java.base/sun.security.provider.certpath.PKIXCertPathValidator.validate(Unknown Source)
    at java.base/sun.security.provider.certpath.PKIXCertPathValidator.engineValidate(Unknown Source)
    at java.base/java.security.cert.CertPathValidator.validate(Unknown Source)
    ... 18 more
Caused by: java.security.cert.CertificateExpiredException: NotAfter: Sun Jul 05 12:00:00 GMT 2020
    at java.base/sun.security.x509.CertificateValidity.valid(Unknown Source)
    at java.base/sun.security.x509.X509CertImpl.checkValidity(Unknown Source)
    at java.base/sun.security.provider.certpath.BasicChecker.verifyValidity(Unknown Source)
    at java.base/sun.security.provider.certpath.BasicChecker.check(Unknown Source)
    ... 23 more

The second new exception looked like this:

2024-05-28 00:22:36.143	{"@timestamp":"2024-05-27T22:22:36.139Z",
"error.type":"java.lang.RuntimeException",
"error.message":"java.lang.RuntimeException: java.lang.InterruptedException",
"error.stack_trace":"java.lang.RuntimeException: java.lang.RuntimeException: java.lang.InterruptedException
    at reactor.core.publisher.MonoCallable$MonoCallableSubscription.request(MonoCallable.java:137)
    at reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.request(FluxHide.java:152)
    at reactor.core.publisher.FluxFlattenIterable$FlattenIterableSubscriber.onSubscribe(FluxFlattenIterable.java:241)
    at io.opentelemetry.javaagent.shaded.instrumentation.reactor.v3_1.TracingSubscriber.lambda$onSubscribe$0(TracingSubscriber.java:59)
    at io.opentelemetry.javaagent.shaded.instrumentation.reactor.v3_1.TracingSubscriber.withActiveSpan(TracingSubscriber.java:97)
    at io.opentelemetry.javaagent.shaded.instrumentation.reactor.v3_1.TracingSubscriber.withActiveSpan(TracingSubscriber.java:91)
    at io.opentelemetry.javaagent.shaded.instrumentation.reactor.v3_1.TracingSubscriber.onSubscribe(TracingSubscriber.java:59)
    at reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.onSubscribe(FluxHide.java:122)
    at reactor.core.publisher.MonoCallable.subscribe(MonoCallable.java:48)
    at reactor.core.publisher.InternalFluxOperator.subscribe(InternalFluxOperator.java:68)
    at reactor.core.publisher.FluxSubscribeOn$SubscribeOnSubscriber.run(FluxSubscribeOn.java:194)
    at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:84)
    at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:37)
    at java.base/java.util.concurrent.FutureTask.run(Unknown Source)
    at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.base/java.lang.Thread.run(Unknown Source)

Caused by: java.lang.InterruptedException
    at java.base/java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(Unknown Source)
    at java.base/java.util.concurrent.LinkedBlockingQueue.take(Unknown Source)
    at org.apache.arrow.flight.FlightStream.next(FlightStream.java:233)
    ... 31 more
"}

Hope this helps! Thank you!
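The second trace differs from the others: the thread waiting in FlightStream.next (blocked on LinkedBlockingQueue.take) was interrupted. That usually means something on the calling side cancelled or shut down the work, rather than that the connection failed, so refreshing the client is probably unnecessary in that case; code that catches the wrapping RuntimeException would normally also restore the thread's interrupt status. A minimal sketch, assuming a hypothetical helper named Interrupts.restoreIfInterrupted:

```java
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.Set;

// Hypothetical helper (not part of influxdb3-java or Arrow Flight).
public final class Interrupts {

    private Interrupts() {
    }

    /**
     * If {@code error} has an InterruptedException anywhere in its cause chain,
     * re-asserts the current thread's interrupt flag and returns true; otherwise
     * returns false and leaves the flag untouched.
     */
    public static boolean restoreIfInterrupted(Throwable error) {
        Set<Throwable> seen = Collections.newSetFromMap(new IdentityHashMap<>());
        for (Throwable t = error; t != null && seen.add(t); t = t.getCause()) {
            if (t instanceof InterruptedException) {
                // Wrapping the exception cleared the flag; set it again for code further up the stack.
                Thread.currentThread().interrupt();
                return true;
            }
        }
        return false;
    }
}
```

A catch block could call this first and skip the client refresh when it returns true.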

@bednar
Member

bednar commented May 31, 2024

@mps2209 thanks for update!
