Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

adding backpressure b/w BatchParserCMD and GoogleGeocoder #6

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 62 additions & 13 deletions src/main/scala/BatchParserCmd.scala
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import akka.actor.ActorSystem

import akka.actor._

import scala.io.StdIn

Expand All @@ -13,6 +14,9 @@ object BatchParserCmd {
tableName: String = ""
)

case class AvailableQuota(availableRequests: Int)
case object StartParsing

val parser = new scopt.OptionParser[Config]("BatchParserCmd") {
override def showUsageOnError = true

Expand Down Expand Up @@ -40,43 +44,58 @@ object BatchParserCmd {
version("version")
}

var system: ActorSystem = null
var db: ActorRef = null
var addressParser: ActorRef = null
var googleGeocoder: ActorRef = null
var batchParserCmd: ActorRef = null

var tempListAddresses: List[(Int, String)] = null

def main(args: Array[String]) {
parser.parse(args, Config()) match {
case Some(config) =>
println("+++ config: " + config)

require(config.op == "googleQueryAndParse" || config.op == "googleQueryOnly" || config.op == "parseOnly")

val system: ActorSystem = ActorSystem("System")
val parseAddress = config.op == "googleQueryAndParse"
setupActorSystem(config.googleApiKey, config.maxGoogleAPIOpenRequests, config.maxGoogleAPIFatalErrors, parseAddress, config.dbUrl, config.tableName)
try {
if (config.op == "googleQueryAndParse" || config.op == "googleQueryOnly") {
val parseAddress = config.op == "googleQueryAndParse"
googleQueryAndParse(system, config.maxEntries, config.googleApiKey, config.maxGoogleAPIOpenRequests, config.maxGoogleAPIFatalErrors, parseAddress, config.dbUrl, config.tableName)

googleQueryAndParse(config.maxEntries, config.dbUrl, config.tableName)
} else {
parseOnly(system, config.maxEntries, config.dbUrl, config.tableName)
}
println(">>> Press ENTER to exit <<<")
StdIn.readLine()
} finally {
println("Terminating Actor System")
system.terminate()
}
case None => sys.exit(1)
}
}

def googleQueryAndParse(system: ActorSystem, maxEntries: Int, googleApiKey: String, maxOpenRequests: Int, maxFatalErrors: Int, parseAddress: Boolean, dbUrl: String, tableName: String) {
def setupActorSystem(googleApiKey: String, maxOpenRequests: Int, maxFatalErrors: Int, parseAddress: Boolean, dbUrl: String, tableName: String): Unit = {
system = ActorSystem("System")
db = system.actorOf(DB.props(dbUrl, tableName), "DB")
batchParserCmd = system.actorOf(BatchParserCmd.props(), "BatchParserCmd")
addressParser = system.actorOf(AddressParserActor.props(db), "AddressParser")
googleGeocoder = system.actorOf(GoogleGeocoder.props(googleApiKey, maxOpenRequests: Int, maxFatalErrors: Int, db, addressParser, parseAddress, batchParserCmd), "GoogleAPI")
}

def googleQueryAndParse(maxEntries: Int, dbUrl: String, tableName: String) {
val conn = Utils.getDbConnection(dbUrl)
val unformattedAddresses: List[(Int, String)] = try {
DB.getAddressesWithEmptyGoogleResponseFromDatabase(tableName, maxEntries)(conn)
} finally { conn.close() }

println(s"num unformattedAddresses to query: ${unformattedAddresses.length}")

val db = system.actorOf(DB.props(dbUrl, tableName), "DB")
val addressParser = system.actorOf(AddressParserActor.props(db), "AddressParser")
val googleGeocoder = system.actorOf(GoogleGeocoder.props(googleApiKey, maxOpenRequests: Int, maxFatalErrors: Int, db, addressParser, parseAddress), "GoogleAPI")

unformattedAddresses.foreach { case (id, unformattedAddress) => googleGeocoder ! GoogleGeocoder.GeoCode(id, unformattedAddress) }
tempListAddresses = unformattedAddresses
batchParserCmd ! StartParsing
}

def parseOnly(system: ActorSystem, maxEntries: Int, dbUrl: String, tableName: String) {
Expand All @@ -87,9 +106,39 @@ object BatchParserCmd {

println(s"num googleResponses: ${googleResponses.length}")

val db = system.actorOf(DB.props(dbUrl, tableName), "DB")
val addressParser = system.actorOf(AddressParserActor.props(db), "AddressParser")

googleResponses.foreach { case (id, googleResponse) => addressParser ! AddressParserActor.ParseAddress(id, googleResponse) }
}

def props(): Props =
Props(new BatchParserCmd())
}

case class BatchParserCmd() extends Actor with ActorLogging {
import BatchParserCmd._

def receive = {
case StartParsing => {
sendForParsing(1)
}

case AvailableQuota(availableRequests) => {
if (availableRequests <= 0) {
log.info(s"Available quota is $availableRequests")
} else {
sendForParsing(availableRequests)
}
}

case m => log.info("unexpected message: " + Utils.textSample(m))
}

def sendForParsing(batchSize: Int): Unit = {
if (tempListAddresses == null || tempListAddresses.isEmpty) {
return ;
}
tempListAddresses.slice(0, batchSize + 1).foreach {
case (id, unformattedAddresses) => googleGeocoder ! GoogleGeocoder.GeoCode(id, unformattedAddresses)
}
tempListAddresses = tempListAddresses.slice(batchSize, tempListAddresses.size)
}
}
13 changes: 10 additions & 3 deletions src/main/scala/GoogleGeocoder.scala
Original file line number Diff line number Diff line change
@@ -1,19 +1,20 @@
import DB.SaveGoogleResponse
import Utils.textSample
import BatchParserCmd.AvailableQuota
import akka.actor.{Actor, ActorLogging, ActorRef, Props, Status}
import akka.http.scaladsl.Http
import akka.http.scaladsl.model._
import akka.stream.{ActorMaterializer, ActorMaterializerSettings}
import akka.util.ByteString

object GoogleGeocoder {
def props(googleApiKey: String, maxOpenRequests: Int, maxFatalErrors: Int, db: ActorRef, addressParser: ActorRef, parseAddress: Boolean): Props =
Props(new GoogleGeocoder(googleApiKey, maxOpenRequests, maxFatalErrors, db, addressParser, parseAddress))
def props(googleApiKey: String, maxOpenRequests: Int, maxFatalErrors: Int, db: ActorRef, addressParser: ActorRef, parseAddress: Boolean, batchParserCmd: ActorRef): Props =
Props(new GoogleGeocoder(googleApiKey, maxOpenRequests, maxFatalErrors, db, addressParser, parseAddress, batchParserCmd))

final case class GeoCode(id: Int, unformattedAddress: String)
}

class GoogleGeocoder(googleApiKey: String, maxOpenRequests: Int, maxFatalErrors: Int, db: ActorRef, addressParser: ActorRef, parseAddress: Boolean) extends Actor with ActorLogging {
class GoogleGeocoder(googleApiKey: String, maxOpenRequests: Int, maxFatalErrors: Int, db: ActorRef, addressParser: ActorRef, parseAddress: Boolean, batchParserCmd: ActorRef) extends Actor with ActorLogging {
import GoogleGeocoder._
import AddressParserActor._
import akka.pattern.pipe
Expand All @@ -24,6 +25,8 @@ class GoogleGeocoder(googleApiKey: String, maxOpenRequests: Int, maxFatalErrors:
val http = Http(context.system)

var numFatalErrors = 0
var numTotalRequests = 0
val controlFactor = 10

val queue = new scala.collection.mutable.Queue[(Int, String)]
var numOpenRequests = 0
Expand All @@ -32,6 +35,7 @@ class GoogleGeocoder(googleApiKey: String, maxOpenRequests: Int, maxFatalErrors:

def receive = {
case GeoCode(id, unformattedAddress) =>
numTotalRequests += 1
if (numFatalErrors < maxFatalErrors) {
log.info(s"GeoCode #$id: $unformattedAddress")
if (numOpenRequests < maxOpenRequests)
Expand All @@ -41,6 +45,8 @@ class GoogleGeocoder(googleApiKey: String, maxOpenRequests: Int, maxFatalErrors:
} else {
log.info(s"GeoCode. ignored because of MaxFatalErrors")
}
sender ! AvailableQuota((maxOpenRequests - controlFactor * maxOpenRequests * (numFatalErrors / (numTotalRequests * 1.0)) - numOpenRequests).toInt)


case (id: Int, resp @ HttpResponse(StatusCodes.OK, headers, entity, _)) =>
log.info(s"Success response coming for #$id")
Expand Down Expand Up @@ -94,6 +100,7 @@ class GoogleGeocoder(googleApiKey: String, maxOpenRequests: Int, maxFatalErrors:
val (id: Int, unformattedAddress: String) = queue.dequeue
query(id, unformattedAddress)
}
batchParserCmd ! AvailableQuota((maxOpenRequests - controlFactor * maxOpenRequests * (numFatalErrors / (numTotalRequests * 1.0)) - numOpenRequests).toInt)
}

def fatalError() { // TODO: I get several fatalError #0. not thead-safe?!
Expand Down