vertx-kafka (eventloop execute callback) is not the same one ( eventloop execute producer.write) #120

Open
jiangchuan185 opened this issue Jan 7, 2019 · 1 comment

Comments

@jiangchuan185

When the Kafka producer's write() method is called on one event loop (e.g. eventloop-1-5), the callback is executed on a different one (eventloop-3-5). I think the write method should add a line like
Context context = vertx.getOrCreateContext(); --> this line gets the current event-loop context,
and then run the callback through context.runOnContext(). Without this line, the mismatch can cause thread-safety problems.

For reference, the vertx-mongo code:

private <T> SingleResultCallback<T> wrapCallback(Handler<AsyncResult<T>> resultHandler) {
    Context context = vertx.getOrCreateContext();  // this line gets the current event-loop context
    return (result, error) -> {
      context.runOnContext(v -> {
        if (error != null) {
          resultHandler.handle(Future.failedFuture(error));
        } else {
          resultHandler.handle(Future.succeededFuture(result));
        }
      });
    };
  }

KafkaProducer<K, V> write(KafkaProducerRecord<K, V> record, Handler<AsyncResult<RecordMetadata>> handler);
// Constructor: the context is fixed here, when the stream is created
public KafkaWriteStreamImpl(Context context, Producer<K, V> producer) {
    this.producer = producer;
    this.context = context;
  }

Kafka write code:

@Override
  public synchronized KafkaWriteStreamImpl<K, V> write(ProducerRecord<K, V> record, Handler<AsyncResult<RecordMetadata>> handler) {

    int len = this.len(record.value());
    this.pending += len;
    this.context.<RecordMetadata>executeBlocking(fut -> {
      try {
        this.producer.send(record, (metadata, err) -> {

          // callback from IO thread
          this.context.runOnContext(v1 -> {
            synchronized (KafkaWriteStreamImpl.this) {

              // if exception happens, no record written
              if (err != null) {

                if (this.exceptionHandler != null) {
                  Handler<Throwable> exceptionHandler = this.exceptionHandler;
                  this.context.runOnContext(v2 -> exceptionHandler.handle(err));
                }
              }

              long lowWaterMark = this.maxSize / 2;
              this.pending -= len;
              if (this.pending < lowWaterMark && this.drainHandler != null) {
                Handler<Void> drainHandler = this.drainHandler;
                this.drainHandler = null;
                this.context.runOnContext(drainHandler);
              }
            }

            if (handler != null) {
              handler.handle(err != null ? Future.failedFuture(err) : Future.succeededFuture(metadata));
            }
          });
        });
      } catch (Throwable e) {
        exceptionHandler.handle(e);
      }
    }, null);

    return this;
  }
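
The difference between dispatching to the context stored at construction time and dispatching to the context captured at call time can be illustrated without Vert.x. The following is only a sketch under stated assumptions: it uses plain `java.util.concurrent` executors as stand-ins for event loops, a `ThreadLocal` as a stand-in for `vertx.getOrCreateContext()`, and the names `CallbackContextSketch`, `Writer`, `writeOnCreationLoop`, `writeOnCallerLoop` and `handlerThread` are hypothetical, not part of vertx-kafka-client.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

public class CallbackContextSketch {

  // Stand-in for "the current context": each loop thread remembers its own executor.
  static final ThreadLocal<Executor> CURRENT = new ThreadLocal<>();

  // Stand-in for an event loop: one named thread that runs tasks in submission order.
  static ExecutorService loop(String name) {
    ExecutorService ex = Executors.newSingleThreadExecutor(r -> {
      Thread t = new Thread(r, name);
      t.setDaemon(true);
      return t;
    });
    ex.execute(() -> CURRENT.set(ex)); // first task registers the loop on its own thread
    return ex;
  }

  // Hypothetical writer, loosely modelled on the shape of the code quoted above.
  static final class Writer {
    private final Executor creationLoop; // context fixed in the constructor
    private final Executor ioThread;     // plays the role of the client's I/O thread

    Writer(Executor creationLoop, Executor ioThread) {
      this.creationLoop = creationLoop;
      this.ioThread = ioThread;
    }

    // Reported behaviour: the handler always runs on the loop stored at construction.
    void writeOnCreationLoop(Consumer<String> handler) {
      ioThread.execute(() ->
          creationLoop.execute(() -> handler.accept(Thread.currentThread().getName())));
    }

    // Suggested behaviour: capture the caller's loop at the moment write is invoked.
    void writeOnCallerLoop(Consumer<String> handler) {
      Executor callerLoop = CURRENT.get(); // analogous to vertx.getOrCreateContext()
      ioThread.execute(() ->
          callerLoop.execute(() -> handler.accept(Thread.currentThread().getName())));
    }
  }

  // Creates the writer for eventloop-A, calls write from eventloop-B,
  // and returns the name of the thread that ran the handler.
  static String handlerThread(boolean captureCallerLoop) throws Exception {
    ExecutorService loopA = loop("eventloop-A");
    ExecutorService loopB = loop("eventloop-B");
    ExecutorService io = loop("kafka-io");
    try {
      Writer writer = new Writer(loopA, io);
      CompletableFuture<String> ranOn = new CompletableFuture<>();
      loopB.execute(() -> {
        if (captureCallerLoop) {
          writer.writeOnCallerLoop(ranOn::complete);
        } else {
          writer.writeOnCreationLoop(ranOn::complete);
        }
      });
      return ranOn.get(5, TimeUnit.SECONDS);
    } finally {
      loopA.shutdown();
      loopB.shutdown();
      io.shutdown();
    }
  }

  public static void main(String[] args) throws Exception {
    System.out.println("creation-loop dispatch: " + handlerThread(false));
    System.out.println("caller-loop dispatch:   " + handlerThread(true));
  }
}
```

In this simulation the first call reports `eventloop-A` and the second reports `eventloop-B`, which mirrors the mismatch described in this issue and the effect of the suggested change. Whether the real client should adopt call-time capture is a design question for the maintainers; the sketch only shows what each choice does to the handler's thread.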
@jiangchuan185 changed the title from "vertx-kakfa: the thread used in the callback is not the same as the thread that calls kafka-client" to "vertx-kakfa (eventloop execute callback) is not the same one ( eventloop execute producer.write)" on Jan 7, 2019
@jiangchuan185 changed the title from "vertx-kakfa (eventloop execute callback) is not the same one ( eventloop execute producer.write)" to "vertx-kafka (eventloop execute callback) is not the same one ( eventloop execute producer.write)" on Jan 7, 2019
@vietj vietj transferred this issue from vert-x3/issues Jan 9, 2019
@vietj
Contributor

vietj commented Jan 9, 2019

ping @ppatierno
