Skip to content

Commit

Permalink
performance improvement for EventHubs writer (#441)
Browse files Browse the repository at this point in the history
  • Loading branch information
sjkwak authored Apr 12, 2019
1 parent d5e2734 commit d9f4fe0
Showing 1 changed file with 26 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,11 @@ import org.json4s.jackson.Serialization

import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.{ ArrayBuffer, ListBuffer }
import scala.compat.java8.FutureConverters
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.{ Await, Future }
import scala.util.{ Failure, Success }

/**
* A [[Client]] which connects to an event hub instance. All interaction
Expand All @@ -48,6 +50,8 @@ private[spark] class EventHubsClient(private val ehConf: EventHubsConf)

private implicit val formats = Serialization.formats(NoTypeHints)

private var pendingWorks = new ListBuffer[Future[Any]]

private var _client: EventHubClient = _

private def client = synchronized {
Expand Down Expand Up @@ -83,17 +87,19 @@ private[spark] class EventHubsClient(private val ehConf: EventHubsConf)
p.putAll(properties.get.asJava)
}

if (partition.isDefined) {
val sendTask = if (partition.isDefined) {
if (partitionSender.getPartitionId.toInt != partition.get) {
logInfo("Recreating partition sender.")
createPartitionSender(partition.get)
}
partitionSender.sendSync(event)
partitionSender.send(event)
} else if (partitionKey.isDefined) {
client.sendSync(event, partitionKey.get)
client.send(event, partitionKey.get)
} else {
client.sendSync(event)
client.send(event)
}

pendingWorks += FutureConverters.toScala(sendTask)
}

/**
Expand Down Expand Up @@ -177,10 +183,25 @@ private[spark] class EventHubsClient(private val ehConf: EventHubsConf)
*/
override def close(): Unit = {
logInfo("close: Closing EventHubsClient.")

Future.sequence(pendingWorks).onComplete {
case Success(_) => cleanup()
case Failure(e) =>
logError(s"failed to complete pending tasks. $ehConf: ", e)
cleanup()

throw e
}
}

private def cleanup(): Unit = {
pendingWorks.clear()

if (partitionSender != null) {
partitionSender.closeSync()
partitionSender = null
}

if (_client != null) {
ClientConnectionPool.returnClient(ehConf, _client)
_client = null
Expand Down

0 comments on commit d9f4fe0

Please sign in to comment.