Skip to content

Commit

Permalink
chore: Missing metadata in reply
Browse files Browse the repository at this point in the history
  • Loading branch information
patriknw committed Dec 18, 2024
1 parent 109c0e0 commit ae1bf21
Show file tree
Hide file tree
Showing 4 changed files with 29 additions and 19 deletions.
2 changes: 1 addition & 1 deletion akka-javasdk-maven/akka-javasdk-parent/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@

<!-- These are dependent on runtime environment and cannot be customized by users -->
<maven.compiler.release>21</maven.compiler.release>
<kalix-runtime.version>1.3.0-f7f21f858</kalix-runtime.version>
<kalix-runtime.version>1.3.0-f7f21f858-5-2592c175-SNAPSHOT</kalix-runtime.version>

<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<skip.docker>false</skip.docker>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import akka.javasdk.impl.telemetry.TraceInstrumentation
import akka.runtime.sdk.spi.BytesPayload
import akka.runtime.sdk.spi.SpiEntity
import akka.runtime.sdk.spi.SpiEventSourcedEntity
import akka.runtime.sdk.spi.SpiMetadata
import io.opentelemetry.api.trace.Span
import io.opentelemetry.api.trace.Tracer
import org.slf4j.MDC
Expand Down Expand Up @@ -130,14 +131,15 @@ private[impl] final class EventSourcedEntityImpl[S, E, ES <: EventSourcedEntity[
.handleCommand(command.name, cmdPayload)
.asInstanceOf[EventSourcedEntityEffectImpl[AnyRef, E]] // FIXME improve?

def errorOrReply(updatedState: SpiEventSourcedEntity.State): Either[SpiEntity.Error, BytesPayload] = {
def errorOrReply(
updatedState: SpiEventSourcedEntity.State): Either[SpiEntity.Error, (BytesPayload, SpiMetadata)] = {
commandEffect.secondaryEffect(updatedState) match {
case ErrorReplyImpl(description) =>
Left(new SpiEntity.Error(description))
case MessageReplyImpl(message, _) =>
// FIXME metadata?
case MessageReplyImpl(message, m) =>
val replyPayload = serializer.toBytes(message)
Right(replyPayload)
val metadata = MetadataImpl.toSpi(m)
Right(replyPayload -> metadata)
case NoSecondaryEffectImpl =>
throw new IllegalStateException("Expected reply or error")
}
Expand All @@ -157,23 +159,28 @@ private[impl] final class EventSourcedEntityImpl[S, E, ES <: EventSourcedEntity[
errorOrReply(updatedState) match {
case Left(err) =>
Future.successful(new SpiEventSourcedEntity.ErrorEffect(err))
case Right(reply) =>
case Right((reply, metadata)) =>
val delete =
if (deleteEntity) Some(configuration.cleanupDeletedEventSourcedEntityAfter)
else None

val serializedEvents = events.map(event => serializer.toBytes(event)).toVector

Future.successful(
new SpiEventSourcedEntity.PersistEffect(events = serializedEvents, updatedState, reply, delete))
new SpiEventSourcedEntity.PersistEffect(
events = serializedEvents,
updatedState,
reply,
metadata,
delete))
}

case NoPrimaryEffect =>
errorOrReply(state) match {
case Left(err) =>
Future.successful(new SpiEventSourcedEntity.ErrorEffect(err))
case Right(reply) =>
Future.successful(new SpiEventSourcedEntity.ReplyEffect(reply))
case Right((reply, metadata)) =>
Future.successful(new SpiEventSourcedEntity.ReplyEffect(reply, metadata))
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import akka.javasdk.keyvalueentity.KeyValueEntityContext
import akka.runtime.sdk.spi.BytesPayload
import akka.runtime.sdk.spi.SpiEntity
import akka.runtime.sdk.spi.SpiEventSourcedEntity
import akka.runtime.sdk.spi.SpiMetadata
import io.opentelemetry.api.trace.Span
import io.opentelemetry.api.trace.Tracer
import org.slf4j.MDC
Expand Down Expand Up @@ -124,14 +125,14 @@ private[impl] final class KeyValueEntityImpl[S, KV <: KeyValueEntity[S]](
.handleCommand(command.name, cmdPayload)
.asInstanceOf[KeyValueEntityEffectImpl[AnyRef]] // FIXME improve?

def errorOrReply: Either[SpiEntity.Error, BytesPayload] = {
def errorOrReply: Either[SpiEntity.Error, (BytesPayload, SpiMetadata)] = {
commandEffect.secondaryEffect match {
case ErrorReplyImpl(description) =>
Left(new SpiEntity.Error(description))
case MessageReplyImpl(message, _) =>
// FIXME metadata?
case MessageReplyImpl(message, m) =>
val replyPayload = serializer.toBytes(message)
Right(replyPayload)
val metadata = MetadataImpl.toSpi(m)
Right(replyPayload -> metadata)
case NoSecondaryEffectImpl =>
throw new IllegalStateException("Expected reply or error")
}
Expand All @@ -142,32 +143,34 @@ private[impl] final class KeyValueEntityImpl[S, KV <: KeyValueEntity[S]](
errorOrReply match {
case Left(err) =>
Future.successful(new SpiEventSourcedEntity.ErrorEffect(err))
case Right(reply) =>
case Right((reply, metadata)) =>
val serializedState = serializer.toBytes(updatedState)

Future.successful(
new SpiEventSourcedEntity.PersistEffect(
events = Vector(serializedState),
updatedState,
reply,
metadata,
delete = None))
}

case DeleteEntity =>
errorOrReply match {
case Left(err) =>
Future.successful(new SpiEventSourcedEntity.ErrorEffect(err))
case Right(reply) =>
case Right((reply, metadata)) =>
val delete = Some(configuration.cleanupDeletedEventSourcedEntityAfter)
Future.successful(new SpiEventSourcedEntity.PersistEffect(events = Vector.empty, null, reply, delete))
Future.successful(
new SpiEventSourcedEntity.PersistEffect(events = Vector.empty, null, reply, metadata, delete))
}

case NoPrimaryEffect =>
errorOrReply match {
case Left(err) =>
Future.successful(new SpiEventSourcedEntity.ErrorEffect(err))
case Right(reply) =>
Future.successful(new SpiEventSourcedEntity.ReplyEffect(reply))
case Right((reply, metadata)) =>
Future.successful(new SpiEventSourcedEntity.ReplyEffect(reply, metadata))
}
}

Expand Down
2 changes: 1 addition & 1 deletion project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ object Dependencies {
val ProtocolVersionMinor = 1
val RuntimeImage = "gcr.io/kalix-public/kalix-runtime"
// Remember to bump kalix-runtime.version in akka-javasdk-maven/akka-javasdk-parent if bumping this
val RuntimeVersion = sys.props.getOrElse("kalix-runtime.version", "1.3.0-f7f21f858")
val RuntimeVersion = sys.props.getOrElse("kalix-runtime.version", "1.3.0-f7f21f858-5-2592c175-SNAPSHOT")
}
// NOTE: embedded SDK should have the AkkaVersion aligned, when updating RuntimeVersion, make sure to check
// if AkkaVersion and AkkaHttpVersion are aligned
Expand Down

0 comments on commit ae1bf21

Please sign in to comment.