Skip to content

Commit

Permalink
chore: ESE effect types
Browse files Browse the repository at this point in the history
  • Loading branch information
patriknw committed Dec 12, 2024
1 parent 8c63d4a commit 9eba014
Show file tree
Hide file tree
Showing 4 changed files with 53 additions and 63 deletions.
2 changes: 1 addition & 1 deletion akka-javasdk-maven/akka-javasdk-parent/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@

<!-- These are dependent on runtime environment and cannot be customized by users -->
<maven.compiler.release>21</maven.compiler.release>
<kalix-runtime.version>1.3.0-093c6f774</kalix-runtime.version>
<kalix-runtime.version>1.3.0-093c6f774-3-c81df56a-SNAPSHOT</kalix-runtime.version>

<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<skip.docker>false</skip.docker>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,16 +132,16 @@ private[impl] final class EventSourcedEntityImpl[S, E, ES <: EventSourcedEntity[
.handleCommand(command.name, cmdPayload, cmdContext)
.asInstanceOf[EventSourcedEntityEffectImpl[AnyRef, E]] // FIXME improve?

def replyOrError(updatedState: SpiEventSourcedEntity.State): (Option[BytesPayload], Option[SpiEntity.Error]) = {
def errorOrReply(updatedState: SpiEventSourcedEntity.State): Either[SpiEntity.Error, BytesPayload] = {
commandEffect.secondaryEffect(updatedState) match {
case ErrorReplyImpl(description) =>
(None, Some(new SpiEntity.Error(description)))
Left(new SpiEntity.Error(description))
case MessageReplyImpl(message, _) =>
// FIXME metadata?
val replyPayload = serializer.toBytes(message)
(Some(replyPayload), None)
Right(replyPayload)
case NoSecondaryEffectImpl =>
(None, None)
throw new IllegalStateException("Expected reply or error")
}
}

Expand All @@ -156,27 +156,27 @@ private[impl] final class EventSourcedEntityImpl[S, E, ES <: EventSourcedEntity[
currentSequence += 1
}

val (reply, error) = replyOrError(updatedState)
errorOrReply(updatedState) match {
case Left(err) =>
Future.successful(new SpiEventSourcedEntity.ErrorEffect(err))
case Right(reply) =>
val delete =
if (deleteEntity) Some(configuration.cleanupDeletedEventSourcedEntityAfter)
else None

if (error.isDefined) {
Future.successful(
new SpiEventSourcedEntity.Effect(events = Vector.empty, updatedState = state, reply = None, error, None))
} else {
val delete =
if (deleteEntity) Some(configuration.cleanupDeletedEventSourcedEntityAfter)
else None
val serializedEvents = events.map(event => serializer.toBytes(event)).toVector

val serializedEvents = events.map(event => serializer.toBytes(event)).toVector

Future.successful(
new SpiEventSourcedEntity.Effect(events = serializedEvents, updatedState, reply, error, delete))
Future.successful(
new SpiEventSourcedEntity.PersistEffect(events = serializedEvents, updatedState, reply, delete))
}

case NoPrimaryEffect =>
val (reply, error) = replyOrError(state)

Future.successful(
new SpiEventSourcedEntity.Effect(events = Vector.empty, updatedState = state, reply, error, None))
errorOrReply(state) match {
case Left(err) =>
Future.successful(new SpiEventSourcedEntity.ErrorEffect(err))
case Right(reply) =>
Future.successful(new SpiEventSourcedEntity.ReplyEffect(reply))
}
}

} catch {
Expand All @@ -186,13 +186,7 @@ private[impl] final class EventSourcedEntityImpl[S, E, ES <: EventSourcedEntity[
command.name,
s"No command handler found for command [${e.name}] on ${entity.getClass}")
case BadRequestException(msg) =>
Future.successful(
new SpiEventSourcedEntity.Effect(
events = Vector.empty,
updatedState = state,
reply = None,
error = Some(new SpiEntity.Error(msg)),
delete = None))
Future.successful(new SpiEventSourcedEntity.ErrorEffect(error = new SpiEntity.Error(msg)))
case e: EntityException =>
throw e
case NonFatal(error) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,49 +126,51 @@ private[impl] final class KeyValueEntityImpl[S, KV <: KeyValueEntity[S]](
.handleCommand(command.name, cmdPayload, cmdContext)
.asInstanceOf[KeyValueEntityEffectImpl[AnyRef]] // FIXME improve?

def replyOrError: (Option[BytesPayload], Option[SpiEntity.Error]) = {
def errorOrReply: Either[SpiEntity.Error, BytesPayload] = {
commandEffect.secondaryEffect match {
case ErrorReplyImpl(description) =>
(None, Some(new SpiEntity.Error(description)))
Left(new SpiEntity.Error(description))
case MessageReplyImpl(message, _) =>
// FIXME metadata?
val replyPayload = serializer.toBytes(message)
(Some(replyPayload), None)
Right(replyPayload)
case NoSecondaryEffectImpl =>
(None, None)
throw new IllegalStateException("Expected reply or error")
}
}

commandEffect.primaryEffect match {
case UpdateState(updatedState) =>
val (reply, error) = replyOrError

if (error.isDefined) {
Future.successful(
new SpiEventSourcedEntity.Effect(events = Vector.empty, updatedState = state, reply = None, error, None))
} else {
val serializedState = serializer.toBytes(updatedState)

Future.successful(
new SpiEventSourcedEntity.Effect(
events = Vector(serializedState),
updatedState,
reply,
error,
delete = None))
errorOrReply match {
case Left(err) =>
Future.successful(new SpiEventSourcedEntity.ErrorEffect(err))
case Right(reply) =>
val serializedState = serializer.toBytes(updatedState)

Future.successful(
new SpiEventSourcedEntity.PersistEffect(
events = Vector(serializedState),
updatedState,
reply,
delete = None))
}

case DeleteEntity =>
val (reply, error) = replyOrError

val delete = Some(configuration.cleanupDeletedEventSourcedEntityAfter)
Future.successful(new SpiEventSourcedEntity.Effect(events = Vector.empty, null, reply, error, delete))
errorOrReply match {
case Left(err) =>
Future.successful(new SpiEventSourcedEntity.ErrorEffect(err))
case Right(reply) =>
val delete = Some(configuration.cleanupDeletedEventSourcedEntityAfter)
Future.successful(new SpiEventSourcedEntity.PersistEffect(events = Vector.empty, null, reply, delete))
}

case NoPrimaryEffect =>
val (reply, error) = replyOrError

Future.successful(
new SpiEventSourcedEntity.Effect(events = Vector.empty, updatedState = state, reply, error, None))
errorOrReply match {
case Left(err) =>
Future.successful(new SpiEventSourcedEntity.ErrorEffect(err))
case Right(reply) =>
Future.successful(new SpiEventSourcedEntity.ReplyEffect(reply))
}
}

} catch {
Expand All @@ -178,13 +180,7 @@ private[impl] final class KeyValueEntityImpl[S, KV <: KeyValueEntity[S]](
command.name,
s"No command handler found for command [${e.name}] on ${entity.getClass}")
case BadRequestException(msg) =>
Future.successful(
new SpiEventSourcedEntity.Effect(
events = Vector.empty,
updatedState = state,
reply = None,
error = Some(new SpiEntity.Error(msg)),
delete = None))
Future.successful(new SpiEventSourcedEntity.ErrorEffect(error = new SpiEntity.Error(msg)))
case e: EntityException =>
throw e
case NonFatal(error) =>
Expand Down
2 changes: 1 addition & 1 deletion project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ object Dependencies {
val ProtocolVersionMinor = 1
val RuntimeImage = "gcr.io/kalix-public/kalix-runtime"
// Remember to bump kalix-runtime.version in akka-javasdk-maven/akka-javasdk-parent if bumping this
val RuntimeVersion = sys.props.getOrElse("kalix-runtime.version", "1.3.0-093c6f774")
val RuntimeVersion = sys.props.getOrElse("kalix-runtime.version", "1.3.0-093c6f774-3-c81df56a-SNAPSHOT")
}
// NOTE: embedded SDK should have the AkkaVersion aligned, when updating RuntimeVersion, make sure to check
// if AkkaVersion and AkkaHttpVersion are aligned
Expand Down

0 comments on commit 9eba014

Please sign in to comment.