Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Baker as a Service #374

Merged
merged 91 commits into from
Dec 13, 2019
Merged
Show file tree
Hide file tree
Changes from 76 commits
Commits
Show all changes
91 commits
Select commit Hold shift + click to select a range
b0afeb6
Start on making the InteractionInstances be called via an Actor to al…
Tim-Linschoten Aug 26, 2019
76ede3d
Added the new InteractionManager protocol and a first implementation …
VledicFranco Aug 26, 2019
0a53529
Algorithm work
VledicFranco Sep 11, 2019
04c2cc8
InteractionAgent concept finished
VledicFranco Sep 17, 2019
ccae2cd
Connected the new quest mechanism to Baker and allow for remote inter…
Tim-Linschoten Sep 17, 2019
df2c658
Fixed unit test by changing node.
Tim-Linschoten Sep 17, 2019
add2004
Added basic logging for the new components
Tim-Linschoten Sep 18, 2019
3c3c6f9
Added some trace UUID information to the logs.
Tim-Linschoten Sep 18, 2019
99c382a
Tests updated
VledicFranco Sep 18, 2019
8c1faab
Added timeout for how long a agent can be considering.
Tim-Linschoten Sep 18, 2019
27a75ba
Added proto serialization to scheduling messages
VledicFranco Sep 20, 2019
4c93387
Implemented client and server for BaaS (only 1 method)
VledicFranco Sep 23, 2019
63abc8f
Continued work on the server/client for baas
VledicFranco Sep 26, 2019
5451cd0
Finished the proto mappings and as well the server and client
VledicFranco Sep 29, 2019
41ed5e0
Continued work on the testing of the BaaS client-server communication
VledicFranco Sep 30, 2019
adfd2ad
Finished testing the client server interactions
VledicFranco Oct 2, 2019
49bac79
minor
VledicFranco Oct 3, 2019
e81c5cd
finished the serialization of BakerExceptions
VledicFranco Oct 4, 2019
0df4849
Created Java API for the remote baker
VledicFranco Oct 4, 2019
8ffa25d
Removed println in test.
Tim-Linschoten Oct 14, 2019
8353dcf
typos and minor fixes
VledicFranco Oct 22, 2019
be0f153
minor
VledicFranco Oct 22, 2019
04ea311
WIP separation of the interface
VledicFranco Oct 23, 2019
ec39395
Finished moving the interface into its own project
VledicFranco Oct 23, 2019
2157818
Fixes typos created by Intellij refactoring tool D:
VledicFranco Oct 23, 2019
2d6498c
Fixes typos created by Intellij refactoring tool D:
VledicFranco Oct 23, 2019
bdf4eb6
Fixes typos created by Intellij refactoring tool D:
VledicFranco Oct 23, 2019
fdbf23d
Fixes typos created by Intellij refactoring tool D:
VledicFranco Oct 23, 2019
7b04648
WIP baas project separation
VledicFranco Oct 23, 2019
e26bab2
WIP baas project separation
VledicFranco Oct 23, 2019
1cfd6fe
finished baas project separation
VledicFranco Oct 23, 2019
bc756b7
Massive work on baas project separation and a proper interface for th…
VledicFranco Oct 24, 2019
c9d78c8
protobuf serialization fix
VledicFranco Oct 25, 2019
30b702f
fixed runtime tests
VledicFranco Oct 25, 2019
f58cbfe
Finished work on modules and started work on the distributed recipe e…
VledicFranco Oct 25, 2019
5c07f50
quick fix
VledicFranco Nov 1, 2019
b7ede2a
added the RecipeEventMEtadata data structure to event listeners, whic…
VledicFranco Nov 4, 2019
f611a18
Massive improvements on integration tests of baas
VledicFranco Nov 4, 2019
cdd583f
Fixed integration tests with the interaction node
VledicFranco Nov 5, 2019
3a163d7
finished testing the remote event listeners
VledicFranco Nov 5, 2019
969da97
Setup of playground
VledicFranco Nov 19, 2019
62e2a15
moved example package to fit other examples and improved on the playg…
VledicFranco Nov 20, 2019
c89bd1b
fixed tests
VledicFranco Nov 20, 2019
6499b40
added example modules and improved playground app
VledicFranco Nov 20, 2019
1064b42
work on the playground
VledicFranco Nov 22, 2019
f1aaf7e
Working basic baas example on the playground
VledicFranco Nov 22, 2019
869d548
Working on the kubernetes setup
VledicFranco Nov 25, 2019
db787fe
fixed tests
VledicFranco Nov 25, 2019
caf3e85
added baas kubernetes example
VledicFranco Nov 26, 2019
3d5c4d4
Add yaml files for akka bootsrap via kubernetes
Nov 26, 2019
2141c28
Add yaml files for akka bootsrap via kubernetes
Nov 26, 2019
e0a0862
Remove docker build
Nov 26, 2019
63a2fe6
worked on kubernetes setup
VledicFranco Nov 26, 2019
bc35331
work on kubernetes:
VledicFranco Nov 26, 2019
1e9a97f
work on kube
VledicFranco Nov 26, 2019
df62a83
Add service.yml
Nov 27, 2019
ebf0f70
Worked on the minikube example
VledicFranco Nov 27, 2019
65a01d8
Added nodes roles and openshift example
VledicFranco Dec 4, 2019
b0eba57
Fix syntax in yaml file
Dec 4, 2019
9c0bee7
Fix syntax in yaml file
Dec 4, 2019
255c779
Increase timeouts into 2 minutes
Dec 4, 2019
1b15873
image pull
VledicFranco Dec 4, 2019
405cd21
Merge branch 'playground' of github.com:ing-bank/baker into playground
VledicFranco Dec 4, 2019
15114b3
Increase timeout to 10 minutes
Dec 4, 2019
f99da59
fixed tests
VledicFranco Dec 4, 2019
c234745
Delete OpenShift specific configuration
Dec 4, 2019
52def85
Update manual with latest steps to run minikube cluster with example …
Dec 4, 2019
a0a5453
Remove corporate proxy specific steps from namespace creation.
Dec 4, 2019
a88e2d2
Remove dependency to private images hub.
Dec 4, 2019
b7d3299
Remove unused executable.
Dec 4, 2019
dbd7ffa
solved some PR comments
VledicFranco Dec 5, 2019
5e50f52
Merge pull request #369 from ing-bank/playground
stasimus Dec 5, 2019
eda0163
Merged master
VledicFranco Dec 9, 2019
d8aa97a
Merge branch 'master' into DEV-BaaS
VledicFranco Dec 10, 2019
0982719
Ignoring interface and generated protobuf code for correct code cover…
VledicFranco Dec 10, 2019
3e2b0de
Minor improvements to spec
Dec 10, 2019
9e43014
Remove unused code.
Dec 10, 2019
ba0b34d
Extract timeouts into variable
Dec 10, 2019
a30b172
Remove unused code
Dec 10, 2019
01eac0f
Minor reformat the code.
Dec 10, 2019
0278c92
Remove duplicates.
Dec 10, 2019
c15f6ae
Remove println statements.
Dec 10, 2019
ddefe53
Remove println statement.
Dec 10, 2019
5a3371e
Remove println statement.
Dec 10, 2019
02cecb1
Optimise Dependencies
Dec 11, 2019
874e237
Remove unused lines.
Dec 11, 2019
b10edd1
Reformat the code.
Dec 11, 2019
f6c5d2b
Reformat the code.
Dec 11, 2019
157e272
Remove Thread sleep
Dec 11, 2019
813e96e
Adressed a TODO, reading config from config file
VledicFranco Dec 13, 2019
e086eca
Merge branch 'master' into DEV-BaaS
stasimus Dec 13, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package com.ing.baker.baas.javadsl

import akka.actor.ActorSystem
import akka.stream.Materializer
import com.ing.baker.baas.scaladsl.{BakerClient => ScalaRemoteBaker}
import com.ing.baker.runtime.javadsl.{Baker => JavaBaker}
import com.ing.baker.runtime.serialization.Encryption

object BakerClient {

def build(hostname: String, actorSystem: ActorSystem, mat: Materializer, encryption: Encryption = Encryption.NoEncryption): JavaBaker =
new JavaBaker(ScalaRemoteBaker.build(hostname, encryption)(actorSystem, mat))
}

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package com.ing.baker.baas.akka

import akka.actor.{Actor, ActorRef, Props}
import akka.cluster.pubsub.{DistributedPubSub, DistributedPubSubMediator}
import com.ing.baker.baas.protocol.ProtocolDistributedEventPublishing
import com.ing.baker.runtime.scaladsl.{EventInstance, RecipeEventMetadata}

object EventListenerAgent {

case object CommitTimeout

def apply(recipeName: String, listenerFunction: (RecipeEventMetadata, EventInstance) => Unit): Props =
Props(new EventListenerAgent(recipeName, listenerFunction))
}

class EventListenerAgent(recipeName: String, listenerFunction: (RecipeEventMetadata, EventInstance) => Unit) extends Actor {

val mediator: ActorRef =
DistributedPubSub(context.system).mediator

val eventsTopic: String =
ProtocolDistributedEventPublishing.eventsTopic(recipeName)

def subscribeToEvents(): Unit =
mediator ! DistributedPubSubMediator.Subscribe(eventsTopic, self)

def unsubscribeToEvents(): Unit =
mediator ! DistributedPubSubMediator.Unsubscribe(eventsTopic, self)

subscribeToEvents()

def receive: Receive = {
case ProtocolDistributedEventPublishing.Event(recipeEventMetadata, event) =>
listenerFunction(recipeEventMetadata, event)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package com.ing.baker.baas.common

import com.ing.baker.runtime.common.{EventInstance, InteractionInstance, RecipeEventMetadata}
import com.ing.baker.runtime.common.LanguageDataStructures.LanguageApi

trait BaaSEventListener[F[_]] extends LanguageApi { self =>

type EventInstanceType <: EventInstance { type Language <: self.Language }

type RecipeEventMetadataType <: RecipeEventMetadata { type Language <: self.Language }

def registerEventListener(recipeName: String, listenerFunction: language.BiConsumerFunction[RecipeEventMetadataType, EventInstanceType]): F[Unit]

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package com.ing.baker.baas.javadsl

import java.util.concurrent.CompletableFuture
import java.util.function.BiConsumer

import akka.actor.ActorSystem
import com.ing.baker.baas.common
import com.ing.baker.baas.scaladsl
import com.ing.baker.runtime.common.LanguageDataStructures.JavaApi
import com.ing.baker.runtime.javadsl.{EventInstance, RecipeEventMetadata}

import scala.compat.java8.FutureConverters

class BaaSEventListener(actorSystem: ActorSystem) extends common.BaaSEventListener[CompletableFuture] with JavaApi {

override type EventInstanceType = EventInstance

override type RecipeEventMetadataType = RecipeEventMetadata

override def registerEventListener(recipeName: String, listenerFunction: BiConsumer[RecipeEventMetadata, EventInstance]): CompletableFuture[Unit] =
FutureConverters.toJava(scaladsl.BaaSEventListener(actorSystem).registerEventListener(recipeName, (metadata, event) => listenerFunction.accept(metadata.asJava, event.asJava))).toCompletableFuture
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package com.ing.baker.baas.scaladsl

import akka.actor.ActorSystem
import com.ing.baker.baas.akka.EventListenerAgent
import com.ing.baker.baas.common
import com.ing.baker.runtime.common.LanguageDataStructures.ScalaApi
import com.ing.baker.runtime.scaladsl.{EventInstance, RecipeEventMetadata}

import scala.concurrent.Future

case class BaaSEventListener(actorSystem: ActorSystem) extends common.BaaSEventListener[Future] with ScalaApi {

override type EventInstanceType = EventInstance

override type RecipeEventMetadataType = RecipeEventMetadata

override def registerEventListener(recipeName: String, listenerFunction: (RecipeEventMetadata, EventInstance) => Unit): Future[Unit] =
stasimus marked this conversation as resolved.
Show resolved Hide resolved
Future.successful { actorSystem.actorOf(EventListenerAgent(recipeName, listenerFunction)) }
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
package com.ing.baker.baas.akka

import java.util.UUID

import akka.actor.{Actor, ActorRef, Props}
import akka.cluster.pubsub.{DistributedPubSub, DistributedPubSubMediator}
import com.ing.baker.baas.akka.InteractionAgent.{CommitTimeout, log}
import com.ing.baker.baas.protocol.{ProtocolInteractionExecution, ProtocolPushPullMatching, ProtocolQuestCommit}
import com.ing.baker.runtime.scaladsl.{EventInstance, InteractionInstance}
import org.slf4j.LoggerFactory

import scala.concurrent.duration._
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success}

object InteractionAgent {

case object CommitTimeout

def apply(instance: InteractionInstance): Props =
Props(new InteractionAgent(instance))

/**
* Closes over the agent actor references, just like the pipe pattern does, except it sends a more expressive
* message in the case of failure.
*
* TODO: Handle invalid ingredients scenario
*
* @param agent actor reference
* @param result outcome of invoking the interaction instance
* @param ec execution context to use
*/
def pipeBackExecutionResponse(agent: ActorRef, mandated: ActorRef)(result: Future[Option[EventInstance]])(implicit ec: ExecutionContext): Unit = {
result.onComplete {
case Success(value) =>
mandated.tell(ProtocolInteractionExecution.InstanceExecutedSuccessfully(value), agent)
case Failure(exception) =>
mandated.tell(ProtocolInteractionExecution.InstanceExecutionFailed(), agent)
}
}

private val log = LoggerFactory.getLogger(classOf[InteractionAgent])
}

class InteractionAgent(interaction: InteractionInstance) extends Actor {

import context.dispatcher

val mediator: ActorRef = DistributedPubSub(context.system).mediator

val pullTopic: String =
ProtocolPushPullMatching.pullTopic(interaction.name)

val pushTopic: String =
ProtocolPushPullMatching.pushTopic(interaction.name)

def pull(): Unit =
mediator ! DistributedPubSubMediator.Publish(pullTopic, ProtocolPushPullMatching.Pull(self))

def subscribePush(): Unit =
mediator ! DistributedPubSubMediator.Subscribe(pushTopic, self)

def unsubscribePush(): Unit =
mediator ! DistributedPubSubMediator.Unsubscribe(pushTopic, self)

subscribePush()
pull()

def receive: Receive = {
case ProtocolPushPullMatching.Push(mandated, uuid) =>
// start Quest commit protocol
log.info(s"${interaction.name}:$uuid: Considering quest from $mandated")
mandated ! ProtocolQuestCommit.Considering(self)
unsubscribePush()
context.system.scheduler.scheduleOnce(10 seconds, self, CommitTimeout)(context.dispatcher, self)
context.become(considering(uuid))

case ProtocolPushPullMatching.AvailableQuest(mandated, uuid) =>
// start Quest commit protocol
log.info(s"${interaction.name}:$uuid: Considering quest from $mandated")
mandated ! ProtocolQuestCommit.Considering(self)
unsubscribePush()
context.system.scheduler.scheduleOnce(10 seconds, self, CommitTimeout)(context.dispatcher, self)
context.become(considering(uuid))
}

def considering(uuid: UUID): Receive = {
case ProtocolQuestCommit.Commit(mandated, executeMessage) =>
log.info(s"${interaction.name}:$uuid: Commited to quest from $mandated")
// start the execution protocol by already starting the computation and become committed
InteractionAgent.pipeBackExecutionResponse(self, mandated)(interaction.run(executeMessage.input))
subscribePush()
pull()
context.become(receive)
// context.become(committed(mandated, uuid))

case ProtocolQuestCommit.QuestTaken =>
log.info(s"${interaction.name}:$uuid: Quest taken, starting the protocol again")
// quest taIken, start all over again
subscribePush()
pull()
context.become(receive)

case CommitTimeout =>
log.info(s"${interaction.name}:$uuid: not received a response after commit timeout")
subscribePush()
pull()
context.become(receive)

}

// def committed(mandated: ActorRef, uuid: UUID): Receive = {
// case message: ProtocolInteractionExecution =>
// log.info(s"${interaction.name}:$uuid: Considering quest from $mandated")
// // Forward the result
// mandated ! message
// // Start all over again
// subscribePush()
// pull()
// context.become(receive)
// }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package com.ing.baker.baas.common

import com.ing.baker.runtime.common.InteractionInstance
import com.ing.baker.runtime.common.LanguageDataStructures.LanguageApi

trait BaaSInteractionInstance[F[_]] extends LanguageApi { self =>

type InteractionInstanceType <: InteractionInstance[F] { type Language <: self.Language }

def load(implementation: InteractionInstanceType*): Unit

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package com.ing.baker.baas.javadsl

import java.util.concurrent.CompletableFuture

import akka.actor.ActorSystem
import com.ing.baker.baas.common
import com.ing.baker.baas.scaladsl
import com.ing.baker.runtime.javadsl.InteractionInstance
import com.ing.baker.runtime.common.LanguageDataStructures.JavaApi

class BaaSInteractionInstance(actorSystem: ActorSystem) extends common.BaaSInteractionInstance[CompletableFuture] with JavaApi {

override type InteractionInstanceType = InteractionInstance

override def load(implementation: InteractionInstance*): Unit =
scaladsl.BaaSInteractionInstance(actorSystem).load(implementation.map(_.asScala): _*)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package com.ing.baker.baas.scaladsl

import akka.actor.ActorSystem
import com.ing.baker.baas.akka.InteractionAgent
import com.ing.baker.baas.common
import com.ing.baker.runtime.common.LanguageDataStructures.ScalaApi
import com.ing.baker.runtime.scaladsl.InteractionInstance

import scala.concurrent.Future

case class BaaSInteractionInstance(actorSystem: ActorSystem) extends common.BaaSInteractionInstance[Future] with ScalaApi {

override type InteractionInstanceType = InteractionInstance

override def load(implementation: InteractionInstance*): Unit =
implementation.foreach { implementation =>
actorSystem.actorOf(InteractionAgent(implementation))
}
}

98 changes: 98 additions & 0 deletions baas-node-state/src/main/resources/application.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
include "baker.conf"

service {

actorSystemName = "BaaS"
actorSystemName = ${?ACTOR_SYSTEM_NAME}

clusterHost = "127.0.0.1"
clusterHost = ${?CLUSTER_HOST}

clusterPort = 2551
clusterPort = ${?CLUSTER_PORT}

seedHost = "127.0.0.1"
seedHost = ${?CLUSTER_SEED_HOST}

seedPort = 2551
seedPort = ${?CLUSTER_SEED_PORT}

httpServerPort = 8080
httpServerPort = ${?HTTP_SERVER_PORT}

memory-dump-path = "/home/demiourgos728/memdump"
memory-dump-path = ${?APP_MEMORY_DUMP_PATH}
}

baker {

interaction-manager = "remote"

actor {
provider = "cluster-sharded"
idle-timeout = 1 minute
}
}

cassandra-journal.contact-points.0 = "127.0.0.1"
cassandra-journal.contact-points.0 = ${?CASSANDRA_CONTACT_POINTS_0}

cassandra-snapshot-store.contact-points.0 = "127.0.0.1"
cassandra-snapshot-store.contact-points.0 = ${?CASSANDRA_CONTACT_POINTS_0}

akka.actor.allow-java-serialization = on

akka {

actor {
provider = "cluster"
}

persistence {
# See https://doc.akka.io/docs/akka-persistence-cassandra/current/journal.html#configuration
journal.plugin = "cassandra-journal"
# See https://doc.akka.io/docs/akka-persistence-cassandra/current/snapshots.html#configuration
snapshot-store.plugin = "cassandra-snapshot-store"
}

remote {
log-remote-lifecycle-events = off
netty.tcp {
hostname = ${service.clusterHost}
port = ${service.clusterPort}
}
}

cluster {

seed-nodes = [
"akka.tcp://"${service.actorSystemName}"@"${service.seedHost}":"${service.seedPort}]

# auto downing is NOT safe for production deployments.
# you may want to use it during development, read more about it in the docs.
#
# auto-down-unreachable-after = 10s
}

management.http.routes {
cluster-management = ""
}
}

kamon.instrumentation.akka.filters {

actors.track {
includes = [ ${service.actorSystemName}"/user/*" ]
excludes = []
# ${service.actorSystemName}"/system/**", ${service.actorSystemName}"/user/worker-helper"
#]
}

dispatchers {
includes = [ ${service.actorSystemName}"/akka.actor.default-dispatcher" ]
}

routers {
includes = [ ${service.actorSystemName}"/user/*" ]
}
}
Loading