Skip to content

Commit

Permalink
chore: Still PbAny in KeyValueEntitiesImpl
Browse files Browse the repository at this point in the history
  • Loading branch information
patriknw committed Dec 6, 2024
1 parent 3b3e527 commit 3238fca
Show file tree
Hide file tree
Showing 4 changed files with 19 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,15 @@ import kalix.protocol.component
@InternalApi
private[impl] object EffectSupport {

def asProtocol(messageReply: MessageReplyImpl[JavaPbAny]): component.Reply =
component.Reply(
Some(ScalaPbAny.fromJavaProto(messageReply.message)),
MetadataImpl.toProtocol(messageReply.metadata))
def asProtocol(messageReply: MessageReplyImpl[_]): component.Reply = {
val scalaPbAny =
messageReply.message match {
case pb: ScalaPbAny => pb
case pb: JavaPbAny => ScalaPbAny.fromJavaProto(pb)
case other => throw new IllegalStateException(s"Expected PbAny, but was [${other.getClass.getName}]")
}

component.Reply(Some(scalaPbAny), MetadataImpl.toProtocol(messageReply.metadata))
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ package akka.javasdk.impl.effect

import akka.annotation.InternalApi
import akka.javasdk.Metadata
import com.google.protobuf.{ Any => JavaPbAny }
import kalix.protocol.component.ClientAction

/**
Expand All @@ -16,7 +15,7 @@ import kalix.protocol.component.ClientAction
private[javasdk] sealed trait SecondaryEffectImpl {
final def replyToClientAction(commandId: Long): Option[ClientAction] = {
this match {
case message: MessageReplyImpl[JavaPbAny] @unchecked =>
case message: MessageReplyImpl[_] =>
Some(ClientAction(ClientAction.Action.Reply(EffectSupport.asProtocol(message))))
case failure: ErrorReplyImpl =>
Some(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,8 +168,10 @@ private[impl] final class KeyValueEntitiesImpl(
val cmdPayloadPbAny = command.payload.getOrElse(
// FIXME smuggling 0 arity method called from component client through here
ScalaPbAny.defaultInstance.withTypeUrl(AnySupport.JsonTypeUrlPrefix).withValue(ByteString.empty()))
val cmdBytesPayload = AnySupport.toSpiBytesPayload(cmdPayloadPbAny)
val cmd = service.serializer.fromBytes(cmdBytesPayload)
// FIXME shall we deserialize here or in the router? the router needs the contentType as well.
// val cmdBytesPayload = AnySupport.toSpiBytesPayload(cmdPayloadPbAny)
// val cmd = service.serializer.fromBytes(cmdBytesPayload)
val cmd = cmdPayloadPbAny

val context =
new CommandContextImpl(thisEntityId, command.name, command.id, metadata, span, tracerFactory)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,8 +99,10 @@ final class ViewsImpl(_services: Map[String, ViewService[_]], sdkDispatcherName:
}

val commandName = receiveEvent.commandName
val bytesPayload = AnySupport.toSpiBytesPayload(receiveEvent.getPayload)
val msg = service.serializer.fromBytes(bytesPayload)
// FIXME shall we deserialize here or in the router? the router needs the contentType as well.
// val bytesPayload = AnySupport.toSpiBytesPayload(receiveEvent.getPayload)
// val msg = service.serializer.fromBytes(bytesPayload)
val msg = receiveEvent.getPayload
val metadata = MetadataImpl.of(receiveEvent.metadata.map(_.entries.toVector).getOrElse(Nil))
val addedToMDC = metadata.traceId match {
case Some(traceId) =>
Expand Down

0 comments on commit 3238fca

Please sign in to comment.