Skip to content

Commit

Permalink
Merge pull request #374 from ing-bank/DEV-BaaS
Browse files Browse the repository at this point in the history
Baker as a Service
  • Loading branch information
VledicFranco authored Dec 13, 2019
2 parents 2c6693a + e086eca commit 3cf7c09
Show file tree
Hide file tree
Showing 233 changed files with 6,181 additions and 803 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package com.ing.baker.baas.javadsl

import akka.actor.ActorSystem
import akka.stream.Materializer
import com.ing.baker.baas.scaladsl.{BakerClient => ScalaRemoteBaker}
import com.ing.baker.runtime.javadsl.{Baker => JavaBaker}
import com.ing.baker.runtime.serialization.Encryption

object BakerClient {

def build(hostname: String, actorSystem: ActorSystem, mat: Materializer, encryption: Encryption = Encryption.NoEncryption): JavaBaker =
new JavaBaker(ScalaRemoteBaker.build(hostname, encryption)(actorSystem, mat))
}

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package com.ing.baker.baas.akka

import akka.actor.{Actor, ActorRef, Props}
import akka.cluster.pubsub.{DistributedPubSub, DistributedPubSubMediator}
import com.ing.baker.baas.protocol.ProtocolDistributedEventPublishing
import com.ing.baker.runtime.scaladsl.{EventInstance, RecipeEventMetadata}

object EventListenerAgent {

case object CommitTimeout

def apply(recipeName: String, listenerFunction: (RecipeEventMetadata, EventInstance) => Unit): Props =
Props(new EventListenerAgent(recipeName, listenerFunction))
}

class EventListenerAgent(recipeName: String, listenerFunction: (RecipeEventMetadata, EventInstance) => Unit) extends Actor {

val mediator: ActorRef =
DistributedPubSub(context.system).mediator

val eventsTopic: String =
ProtocolDistributedEventPublishing.eventsTopic(recipeName)

def subscribeToEvents(): Unit =
mediator ! DistributedPubSubMediator.Subscribe(eventsTopic, self)

def unsubscribeToEvents(): Unit =
mediator ! DistributedPubSubMediator.Unsubscribe(eventsTopic, self)

subscribeToEvents()

def receive: Receive = {
case ProtocolDistributedEventPublishing.Event(recipeEventMetadata, event) =>
listenerFunction(recipeEventMetadata, event)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package com.ing.baker.baas.common

import com.ing.baker.runtime.common.{EventInstance, InteractionInstance, RecipeEventMetadata}
import com.ing.baker.runtime.common.LanguageDataStructures.LanguageApi

trait BaaSEventListener[F[_]] extends LanguageApi { self =>

type EventInstanceType <: EventInstance { type Language <: self.Language }

type RecipeEventMetadataType <: RecipeEventMetadata { type Language <: self.Language }

def registerEventListener(recipeName: String, listenerFunction: language.BiConsumerFunction[RecipeEventMetadataType, EventInstanceType]): F[Unit]

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package com.ing.baker.baas.javadsl

import java.util.concurrent.CompletableFuture
import java.util.function.BiConsumer

import akka.actor.ActorSystem
import com.ing.baker.baas.common
import com.ing.baker.baas.scaladsl
import com.ing.baker.runtime.common.LanguageDataStructures.JavaApi
import com.ing.baker.runtime.javadsl.{EventInstance, RecipeEventMetadata}

import scala.compat.java8.FutureConverters

class BaaSEventListener(actorSystem: ActorSystem) extends common.BaaSEventListener[CompletableFuture] with JavaApi {

override type EventInstanceType = EventInstance

override type RecipeEventMetadataType = RecipeEventMetadata

override def registerEventListener(recipeName: String, listenerFunction: BiConsumer[RecipeEventMetadata, EventInstance]): CompletableFuture[Unit] =
FutureConverters.toJava(scaladsl.BaaSEventListener(actorSystem).registerEventListener(recipeName, (metadata, event) => listenerFunction.accept(metadata.asJava, event.asJava))).toCompletableFuture
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package com.ing.baker.baas.scaladsl

import akka.actor.ActorSystem
import com.ing.baker.baas.akka.EventListenerAgent
import com.ing.baker.baas.common
import com.ing.baker.runtime.common.LanguageDataStructures.ScalaApi
import com.ing.baker.runtime.scaladsl.{EventInstance, RecipeEventMetadata}

import scala.concurrent.Future

case class BaaSEventListener(actorSystem: ActorSystem) extends common.BaaSEventListener[Future] with ScalaApi {

override type EventInstanceType = EventInstance

override type RecipeEventMetadataType = RecipeEventMetadata

override def registerEventListener(recipeName: String, listenerFunction: (RecipeEventMetadata, EventInstance) => Unit): Future[Unit] =
Future.successful { actorSystem.actorOf(EventListenerAgent(recipeName, listenerFunction)) }
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
package com.ing.baker.baas.akka

import java.util.UUID

import akka.actor.{Actor, ActorRef, Props}
import akka.cluster.pubsub.{DistributedPubSub, DistributedPubSubMediator}
import com.ing.baker.baas.akka.InteractionAgent.{CommitTimeout, log}
import com.ing.baker.baas.protocol.{ProtocolInteractionExecution, ProtocolPushPullMatching, ProtocolQuestCommit}
import com.ing.baker.runtime.scaladsl.{EventInstance, InteractionInstance}
import org.slf4j.LoggerFactory

import scala.concurrent.duration._
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success}

object InteractionAgent {

case object CommitTimeout

def apply(instance: InteractionInstance): Props =
Props(new InteractionAgent(instance))

/**
* Closes over the agent actor references, just like the pipe pattern does, except it sends a more expressive
* message in the case of failure.
*
* TODO: Handle invalid ingredients scenario
*
* @param agent actor reference
* @param result outcome of invoking the interaction instance
* @param ec execution context to use
*/
def pipeBackExecutionResponse(agent: ActorRef, mandated: ActorRef)(result: Future[Option[EventInstance]])(implicit ec: ExecutionContext): Unit = {
result.onComplete {
case Success(value) =>
mandated.tell(ProtocolInteractionExecution.InstanceExecutedSuccessfully(value), agent)
case Failure(exception) =>
mandated.tell(ProtocolInteractionExecution.InstanceExecutionFailed(), agent)
}
}

private val log = LoggerFactory.getLogger(classOf[InteractionAgent])
}

class InteractionAgent(interaction: InteractionInstance) extends Actor {

import context.dispatcher

val mediator: ActorRef = DistributedPubSub(context.system).mediator

val pullTopic: String =
ProtocolPushPullMatching.pullTopic(interaction.name)

val pushTopic: String =
ProtocolPushPullMatching.pushTopic(interaction.name)

def pull(): Unit =
mediator ! DistributedPubSubMediator.Publish(pullTopic, ProtocolPushPullMatching.Pull(self))

def subscribePush(): Unit =
mediator ! DistributedPubSubMediator.Subscribe(pushTopic, self)

def unsubscribePush(): Unit =
mediator ! DistributedPubSubMediator.Unsubscribe(pushTopic, self)

subscribePush()
pull()

private val timeout: FiniteDuration = 10.seconds

def receive: Receive = {
case ProtocolPushPullMatching.Push(mandated, uuid) =>
// start Quest commit protocol
log.info(s"${interaction.name}:$uuid: Considering quest from $mandated")
mandated ! ProtocolQuestCommit.Considering(self)
unsubscribePush()
context.system.scheduler.scheduleOnce(timeout, self, CommitTimeout)(context.dispatcher, self)
context.become(considering(uuid))

case ProtocolPushPullMatching.AvailableQuest(mandated, uuid) =>
// start Quest commit protocol
log.info(s"${interaction.name}:$uuid: Considering quest from $mandated")
mandated ! ProtocolQuestCommit.Considering(self)
unsubscribePush()
context.system.scheduler.scheduleOnce(timeout, self, CommitTimeout)(context.dispatcher, self)
context.become(considering(uuid))
}

def considering(uuid: UUID): Receive = {
case ProtocolQuestCommit.Commit(mandated, executeMessage) =>
log.info(s"${interaction.name}:$uuid: Commited to quest from $mandated")
// start the execution protocol by already starting the computation and become committed
InteractionAgent.pipeBackExecutionResponse(self, mandated)(interaction.run(executeMessage.input))
subscribePush()
pull()
context.become(receive)

case ProtocolQuestCommit.QuestTaken =>
log.info(s"${interaction.name}:$uuid: Quest taken, starting the protocol again")
// quest taIken, start all over again
subscribePush()
pull()
context.become(receive)

case CommitTimeout =>
log.info(s"${interaction.name}:$uuid: not received a response after commit timeout")
subscribePush()
pull()
context.become(receive)

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package com.ing.baker.baas.common

import com.ing.baker.runtime.common.InteractionInstance
import com.ing.baker.runtime.common.LanguageDataStructures.LanguageApi

trait BaaSInteractionInstance[F[_]] extends LanguageApi { self =>

type InteractionInstanceType <: InteractionInstance[F] { type Language <: self.Language }

def load(implementation: InteractionInstanceType*): Unit

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package com.ing.baker.baas.javadsl

import java.util.concurrent.CompletableFuture

import akka.actor.ActorSystem
import com.ing.baker.baas.common
import com.ing.baker.baas.scaladsl
import com.ing.baker.runtime.javadsl.InteractionInstance
import com.ing.baker.runtime.common.LanguageDataStructures.JavaApi

class BaaSInteractionInstance(actorSystem: ActorSystem) extends common.BaaSInteractionInstance[CompletableFuture] with JavaApi {

override type InteractionInstanceType = InteractionInstance

override def load(implementation: InteractionInstance*): Unit =
scaladsl.BaaSInteractionInstance(actorSystem).load(implementation.map(_.asScala): _*)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package com.ing.baker.baas.scaladsl

import akka.actor.ActorSystem
import com.ing.baker.baas.akka.InteractionAgent
import com.ing.baker.baas.common
import com.ing.baker.runtime.common.LanguageDataStructures.ScalaApi
import com.ing.baker.runtime.scaladsl.InteractionInstance

import scala.concurrent.Future

case class BaaSInteractionInstance(actorSystem: ActorSystem) extends common.BaaSInteractionInstance[Future] with ScalaApi {

override type InteractionInstanceType = InteractionInstance

override def load(implementation: InteractionInstance*): Unit =
implementation.foreach { implementation =>
actorSystem.actorOf(InteractionAgent(implementation))
}
}

98 changes: 98 additions & 0 deletions baas-node-state/src/main/resources/application.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
include "baker.conf"

service {

actorSystemName = "BaaS"
actorSystemName = ${?ACTOR_SYSTEM_NAME}

clusterHost = "127.0.0.1"
clusterHost = ${?CLUSTER_HOST}

clusterPort = 2551
clusterPort = ${?CLUSTER_PORT}

seedHost = "127.0.0.1"
seedHost = ${?CLUSTER_SEED_HOST}

seedPort = 2551
seedPort = ${?CLUSTER_SEED_PORT}

httpServerPort = 8080
httpServerPort = ${?HTTP_SERVER_PORT}

memory-dump-path = "/home/demiourgos728/memdump"
memory-dump-path = ${?APP_MEMORY_DUMP_PATH}
}

baker {

interaction-manager = "remote"

actor {
provider = "cluster-sharded"
idle-timeout = 1 minute
}
}

cassandra-journal.contact-points.0 = "127.0.0.1"
cassandra-journal.contact-points.0 = ${?CASSANDRA_CONTACT_POINTS_0}

cassandra-snapshot-store.contact-points.0 = "127.0.0.1"
cassandra-snapshot-store.contact-points.0 = ${?CASSANDRA_CONTACT_POINTS_0}

akka.actor.allow-java-serialization = on

akka {

actor {
provider = "cluster"
}

persistence {
# See https://doc.akka.io/docs/akka-persistence-cassandra/current/journal.html#configuration
journal.plugin = "cassandra-journal"
# See https://doc.akka.io/docs/akka-persistence-cassandra/current/snapshots.html#configuration
snapshot-store.plugin = "cassandra-snapshot-store"
}

remote {
log-remote-lifecycle-events = off
netty.tcp {
hostname = ${service.clusterHost}
port = ${service.clusterPort}
}
}

cluster {

seed-nodes = [
"akka.tcp://"${service.actorSystemName}"@"${service.seedHost}":"${service.seedPort}]

# auto downing is NOT safe for production deployments.
# you may want to use it during development, read more about it in the docs.
#
# auto-down-unreachable-after = 10s
}

management.http.routes {
cluster-management = ""
}
}

kamon.instrumentation.akka.filters {

actors.track {
includes = [ ${service.actorSystemName}"/user/*" ]
excludes = []
# ${service.actorSystemName}"/system/**", ${service.actorSystemName}"/user/worker-helper"
#]
}

dispatchers {
includes = [ ${service.actorSystemName}"/akka.actor.default-dispatcher" ]
}

routers {
includes = [ ${service.actorSystemName}"/user/*" ]
}
}
15 changes: 15 additions & 0 deletions baas-node-state/src/main/resources/logback.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
<configuration>

<root level="error">
<appender-ref ref="CONSOLE"/>
</root>

<appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
<layout class="ch.qos.logback.classic.PatternLayout">
<Pattern>
%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n
</Pattern>
</layout>
</appender>

</configuration>
Loading

0 comments on commit 3cf7c09

Please sign in to comment.