From d970a1606352e3d309c1fd5f315a436d2062d59c Mon Sep 17 00:00:00 2001 From: Raghvendra Singh Date: Fri, 23 Feb 2018 00:26:29 +0530 Subject: [PATCH] adding backpressure b/w BatchParserCMD and GoogleGeocoder --- src/main/scala/BatchParserCmd.scala | 75 ++++++++++++++++++++++++----- src/main/scala/GoogleGeocoder.scala | 13 +++-- 2 files changed, 72 insertions(+), 16 deletions(-) diff --git a/src/main/scala/BatchParserCmd.scala b/src/main/scala/BatchParserCmd.scala index df5010b..0cc8d68 100644 --- a/src/main/scala/BatchParserCmd.scala +++ b/src/main/scala/BatchParserCmd.scala @@ -1,4 +1,5 @@ -import akka.actor.ActorSystem + +import akka.actor._ import scala.io.StdIn @@ -13,6 +14,9 @@ object BatchParserCmd { tableName: String = "" ) + case class AvailableQuota(availableRequests: Int) + case object StartParsing + val parser = new scopt.OptionParser[Config]("BatchParserCmd") { override def showUsageOnError = true @@ -40,6 +44,14 @@ object BatchParserCmd { version("version") } + var system: ActorSystem = null + var db: ActorRef = null + var addressParser: ActorRef = null + var googleGeocoder: ActorRef = null + var batchParserCmd: ActorRef = null + + var tempListAddresses: List[(Int, String)] = null + def main(args: Array[String]) { parser.parse(args, Config()) match { case Some(config) => @@ -47,24 +59,34 @@ object BatchParserCmd { require(config.op == "googleQueryAndParse" || config.op == "googleQueryOnly" || config.op == "parseOnly") - val system: ActorSystem = ActorSystem("System") + val parseAddress = config.op == "googleQueryAndParse" + setupActorSystem(config.googleApiKey, config.maxGoogleAPIOpenRequests, config.maxGoogleAPIFatalErrors, parseAddress, config.dbUrl, config.tableName) try { if (config.op == "googleQueryAndParse" || config.op == "googleQueryOnly") { - val parseAddress = config.op == "googleQueryAndParse" - googleQueryAndParse(system, config.maxEntries, config.googleApiKey, config.maxGoogleAPIOpenRequests, config.maxGoogleAPIFatalErrors, parseAddress, config.dbUrl, config.tableName) + + googleQueryAndParse(config.maxEntries, config.dbUrl, config.tableName) } else { parseOnly(system, config.maxEntries, config.dbUrl, config.tableName) } println(">>> Press ENTER to exit <<<") StdIn.readLine() } finally { + println("Terminating Actor System") system.terminate() } case None => sys.exit(1) } } - def googleQueryAndParse(system: ActorSystem, maxEntries: Int, googleApiKey: String, maxOpenRequests: Int, maxFatalErrors: Int, parseAddress: Boolean, dbUrl: String, tableName: String) { + def setupActorSystem(googleApiKey: String, maxOpenRequests: Int, maxFatalErrors: Int, parseAddress: Boolean, dbUrl: String, tableName: String): Unit = { + system = ActorSystem("System") + db = system.actorOf(DB.props(dbUrl, tableName), "DB") + batchParserCmd = system.actorOf(BatchParserCmd.props(), "BatchParserCmd") + addressParser = system.actorOf(AddressParserActor.props(db), "AddressParser") + googleGeocoder = system.actorOf(GoogleGeocoder.props(googleApiKey, maxOpenRequests: Int, maxFatalErrors: Int, db, addressParser, parseAddress, batchParserCmd), "GoogleAPI") + } + + def googleQueryAndParse(maxEntries: Int, dbUrl: String, tableName: String) { val conn = Utils.getDbConnection(dbUrl) val unformattedAddresses: List[(Int, String)] = try { DB.getAddressesWithEmptyGoogleResponseFromDatabase(tableName, maxEntries)(conn) @@ -72,11 +94,8 @@ object BatchParserCmd { println(s"num unformattedAddresses to query: ${unformattedAddresses.length}") - val db = system.actorOf(DB.props(dbUrl, tableName), "DB") - val addressParser = system.actorOf(AddressParserActor.props(db), "AddressParser") - val googleGeocoder = system.actorOf(GoogleGeocoder.props(googleApiKey, maxOpenRequests: Int, maxFatalErrors: Int, db, addressParser, parseAddress), "GoogleAPI") - - unformattedAddresses.foreach { case (id, unformattedAddress) => googleGeocoder ! GoogleGeocoder.GeoCode(id, unformattedAddress) } + tempListAddresses = unformattedAddresses + batchParserCmd ! StartParsing } def parseOnly(system: ActorSystem, maxEntries: Int, dbUrl: String, tableName: String) { @@ -87,9 +106,39 @@ object BatchParserCmd { println(s"num googleResponses: ${googleResponses.length}") - val db = system.actorOf(DB.props(dbUrl, tableName), "DB") - val addressParser = system.actorOf(AddressParserActor.props(db), "AddressParser") - googleResponses.foreach { case (id, googleResponse) => addressParser ! AddressParserActor.ParseAddress(id, googleResponse) } } + + def props(): Props = + Props(new BatchParserCmd()) +} + +case class BatchParserCmd() extends Actor with ActorLogging { + import BatchParserCmd._ + + def receive = { + case StartParsing => { + sendForParsing(1) + } + + case AvailableQuota(availableRequests) => { + if (availableRequests <= 0) { + log.info(s"Available quota is $availableRequests") + } else { + sendForParsing(availableRequests) + } + } + + case m => log.info("unexpected message: " + Utils.textSample(m)) + } + + def sendForParsing(batchSize: Int): Unit = { + if (tempListAddresses == null || tempListAddresses.isEmpty) { + return ; + } + tempListAddresses.slice(0, batchSize + 1).foreach { + case (id, unformattedAddresses) => googleGeocoder ! GoogleGeocoder.GeoCode(id, unformattedAddresses) + } + tempListAddresses = tempListAddresses.slice(batchSize, tempListAddresses.size) + } } diff --git a/src/main/scala/GoogleGeocoder.scala b/src/main/scala/GoogleGeocoder.scala index d7cf201..d541e45 100644 --- a/src/main/scala/GoogleGeocoder.scala +++ b/src/main/scala/GoogleGeocoder.scala @@ -1,5 +1,6 @@ import DB.SaveGoogleResponse import Utils.textSample +import BatchParserCmd.AvailableQuota import akka.actor.{Actor, ActorLogging, ActorRef, Props, Status} import akka.http.scaladsl.Http import akka.http.scaladsl.model._ @@ -7,13 +8,13 @@ import akka.stream.{ActorMaterializer, ActorMaterializerSettings} import akka.util.ByteString object GoogleGeocoder { - def props(googleApiKey: String, maxOpenRequests: Int, maxFatalErrors: Int, db: ActorRef, addressParser: ActorRef, parseAddress: Boolean): Props = - Props(new GoogleGeocoder(googleApiKey, maxOpenRequests, maxFatalErrors, db, addressParser, parseAddress)) + def props(googleApiKey: String, maxOpenRequests: Int, maxFatalErrors: Int, db: ActorRef, addressParser: ActorRef, parseAddress: Boolean, batchParserCmd: ActorRef): Props = + Props(new GoogleGeocoder(googleApiKey, maxOpenRequests, maxFatalErrors, db, addressParser, parseAddress, batchParserCmd)) final case class GeoCode(id: Int, unformattedAddress: String) } -class GoogleGeocoder(googleApiKey: String, maxOpenRequests: Int, maxFatalErrors: Int, db: ActorRef, addressParser: ActorRef, parseAddress: Boolean) extends Actor with ActorLogging { +class GoogleGeocoder(googleApiKey: String, maxOpenRequests: Int, maxFatalErrors: Int, db: ActorRef, addressParser: ActorRef, parseAddress: Boolean, batchParserCmd: ActorRef) extends Actor with ActorLogging { import GoogleGeocoder._ import AddressParserActor._ import akka.pattern.pipe @@ -24,6 +25,8 @@ class GoogleGeocoder(googleApiKey: String, maxOpenRequests: Int, maxFatalErrors: val http = Http(context.system) var numFatalErrors = 0 + var numTotalRequests = 0 + val controlFactor = 10 val queue = new scala.collection.mutable.Queue[(Int, String)] var numOpenRequests = 0 @@ -32,6 +35,7 @@ class GoogleGeocoder(googleApiKey: String, maxOpenRequests: Int, maxFatalErrors: def receive = { case GeoCode(id, unformattedAddress) => + numTotalRequests += 1 if (numFatalErrors < maxFatalErrors) { log.info(s"GeoCode #$id: $unformattedAddress") if (numOpenRequests < maxOpenRequests) @@ -41,6 +45,8 @@ class GoogleGeocoder(googleApiKey: String, maxOpenRequests: Int, maxFatalErrors: } else { log.info(s"GeoCode. ignored because of MaxFatalErrors") } + sender ! AvailableQuota((maxOpenRequests - controlFactor * maxOpenRequests * (numFatalErrors / (numTotalRequests * 1.0)) - numOpenRequests).toInt) + case (id: Int, resp @ HttpResponse(StatusCodes.OK, headers, entity, _)) => log.info(s"Success response coming for #$id") @@ -94,6 +100,7 @@ class GoogleGeocoder(googleApiKey: String, maxOpenRequests: Int, maxFatalErrors: val (id: Int, unformattedAddress: String) = queue.dequeue query(id, unformattedAddress) } + batchParserCmd ! AvailableQuota((maxOpenRequests - controlFactor * maxOpenRequests * (numFatalErrors / (numTotalRequests * 1.0)) - numOpenRequests).toInt) } def fatalError() { // TODO: I get several fatalError #0. not thead-safe?!