Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: Cleanup JsonSupport #105

Merged
merged 6 commits into from
Dec 19, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 68 additions & 13 deletions akka-javasdk/src/main/java/akka/javasdk/JsonSupport.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@
package akka.javasdk;

import akka.Done;
import akka.annotation.InternalApi;
import akka.javasdk.annotations.Migration;
import akka.javasdk.impl.AnySupport;
import akka.javasdk.impl.ByteStringEncoding;
import akka.runtime.sdk.spi.BytesPayload;
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.PropertyAccessor;
Expand Down Expand Up @@ -82,6 +82,8 @@ public static ObjectMapper getObjectMapper() {
return objectMapper;
}

private static akka.javasdk.impl.serialization.JsonSerializer jsonSerializer = new akka.javasdk.impl.serialization.JsonSerializer();

private JsonSupport() {
}

Expand All @@ -96,7 +98,10 @@ private JsonSupport() {
* for the JSON type instead.
*
* @see {{encodeJson(T, String}}
*
* @deprecated Protobuf Any with JSON is not supported
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does it still make sense for message broker messages?
I'd say that then it's better to use a more explicit protobuf message that carry the raw bytes and type hint.

When we add "real" protobuf serialization support I think we should revisit this. Would be better to have the serialization support as something that is injected rather than static methods.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if we should add protobuf as internal serialization format at all, we can trigger it by the fact that there is a Java protobuf message base types, that way we wont need explicit user serialization like this.

BTW I think we can drop these completely, as there is no way to use them that makes sense in the new SDK, they exist because of basing off of old SDK but they could never really be used here.

The only use case where this class can be used for anything useful right now, afaics, is to do streaming JSON encoding for results in a query in a HTTP endpoint.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think protobuf can make sense when we add gRCP endpoints. Since that will happen later I think we can leave these as deprecated now and introduce proper serialization support at that point (and then remove these here).

*/
@Deprecated
public static <T> Any encodeJson(T value) {
return encodeJson(value, value.getClass().getName());
}
Expand All @@ -111,7 +116,10 @@ public static <T> Any encodeJson(T value) {
* JSON, useful for example when multiple different objects are passed through a pub/sub
* topic.
* @throws IllegalArgumentException if the given value cannot be turned into JSON
*
* @deprecated Protobuf Any with JSON is not supported
*/
@Deprecated
public static <T> Any encodeJson(T value, String jsonType) {
try {
ByteString bytes = encodeToBytes(value);
Expand All @@ -123,7 +131,10 @@ public static <T> Any encodeJson(T value, String jsonType) {
}
}

// FIXME do we really want all these to be public API?
/**
* @deprecated Use encodeToAkkaByteString
*/
@Deprecated
public static <T> ByteString encodeToBytes(T value) throws JsonProcessingException {
return UnsafeByteOperations.unsafeWrap(
objectMapper.writerFor(value.getClass()).writeValueAsBytes(value));
Expand All @@ -145,6 +156,36 @@ public static akka.util.ByteString encodeDynamicCollectionToAkkaByteString(Strin
return akka.util.ByteString.fromArrayUnsafe(objectMapper.writeValueAsBytes(objectNode));
}

/**
* Decode the given bytes to an instance of T using Jackson. The bytes must be
* the JSON string as bytes.
*
* @param valueClass The type of class to deserialize the object to, the class must have the
* proper Jackson annotations for deserialization.
* @param bytes The bytes to deserialize.
* @return The decoded object
* @throws IllegalArgumentException if the given value cannot be decoded to a T
*
*/
public static <T> T decodeJson(Class<T> valueClass, akka.util.ByteString bytes) {
return jsonSerializer.fromBytes(valueClass, new BytesPayload(bytes, jsonSerializer.contentTypeFor(valueClass)));
}

/**
* Decode the given bytes to an instance of T using Jackson. The bytes must be
* the JSON string as bytes.
*
* @param valueClass The type of class to deserialize the object to, the class must have the
* proper Jackson annotations for deserialization.
* @param bytes The bytes to deserialize.
* @return The decoded object
* @throws IllegalArgumentException if the given value cannot be decoded to a T
*
*/
public static <T> T decodeJson(Class<T> valueClass, byte[] bytes) {
return decodeJson(valueClass, akka.util.ByteString.fromArrayUnsafe(bytes));
}

/**
* Decode the given protobuf Any object to an instance of T using Jackson. The object must have
* the JSON string as bytes as value and a type URL starting with "json.akka.io/".
Expand All @@ -154,7 +195,10 @@ public static akka.util.ByteString encodeDynamicCollectionToAkkaByteString(Strin
* @param any The protobuf Any object to deserialize.
* @return The decoded object
* @throws IllegalArgumentException if the given value cannot be decoded to a T
*
* @deprecated Protobuf Any with JSON is not supported
*/
@Deprecated
public static <T> T decodeJson(Class<T> valueClass, Any any) {
if (!AnySupport.isJsonTypeUrl(any.getTypeUrl())) {
throw new IllegalArgumentException(
Expand All @@ -177,15 +221,15 @@ public static <T> T decodeJson(Class<T> valueClass, Any any) {
if (fromVersion < currentVersion) {
return migrate(valueClass, decodedBytes, fromVersion, migration);
} else if (fromVersion == currentVersion) {
return parseBytes(decodedBytes.toByteArray(), valueClass);
return objectMapper.readValue(decodedBytes.toByteArray(), valueClass);
} else if (fromVersion <= supportedForwardVersion) {
return migrate(valueClass, decodedBytes, fromVersion, migration);
} else {
throw new IllegalStateException("Migration version " + supportedForwardVersion + " is " +
"behind version " + fromVersion + " of deserialized type [" + valueClass.getName() + "]");
}
} else {
return parseBytes(decodedBytes.toByteArray(), valueClass);
return objectMapper.readValue(decodedBytes.toByteArray(), valueClass);
}
} catch (JsonProcessingException e) {
throw jsonProcessingException(valueClass, any, e);
Expand All @@ -196,6 +240,10 @@ public static <T> T decodeJson(Class<T> valueClass, Any any) {
}
}

/**
* @deprecated Use decodeJson
*/
@Deprecated
public static <T> T parseBytes(byte[] bytes, Class<T> valueClass) throws IOException {
return objectMapper.readValue(bytes, valueClass);
}
Expand Down Expand Up @@ -236,6 +284,11 @@ private static int parseVersion(String typeUrl) {
}
}


/**
* @deprecated Protobuf Any with JSON is not supported
*/
@Deprecated
public static <T, C extends Collection<T>> C decodeJsonCollection(Class<T> valueClass, Class<C> collectionType, Any any) {
if (!AnySupport.isJsonTypeUrl(any.getTypeUrl())) {
throw new IllegalArgumentException(
Expand All @@ -257,14 +310,25 @@ public static <T, C extends Collection<T>> C decodeJsonCollection(Class<T> value
}
}

public static <T, C extends Collection<T>> C decodeJsonCollection(Class<T> valueClass, Class<C> collectionType, akka.util.ByteString bytes) {
return jsonSerializer.fromBytes(valueClass, collectionType, new BytesPayload(bytes, jsonSerializer.contentTypeFor(valueClass)));
}

public static <T, C extends Collection<T>> C decodeJsonCollection(Class<T> valueClass, Class<C> collectionType, byte[] bytes) {
return decodeJsonCollection(valueClass, collectionType, akka.util.ByteString.fromArrayUnsafe(bytes));
}

/**
* Decode the given protobuf Any to an instance of T using Jackson but only if the suffix of the
* type URL matches the given jsonType.
*
* @return An Optional containing the successfully decoded value or an empty Optional if the type
* suffix does not match.
* @throws IllegalArgumentException if the suffix matches but the Any cannot be parsed into a T
*
* @deprecated Protobuf Any with JSON is not supported
*/
@Deprecated
public static <T> Optional<T> decodeJson(Class<T> valueClass, String jsonType, Any any) {
if (any.getTypeUrl().endsWith(jsonType)) {
return Optional.of(decodeJson(valueClass, any));
Expand All @@ -273,15 +337,6 @@ public static <T> Optional<T> decodeJson(Class<T> valueClass, String jsonType, A
}
}

/**
* INTERNAL API
* @hidden
*/
@InternalApi
public static <T> T decodeJson(Class<T> valueClass, com.google.protobuf.any.Any scalaPbAny) {
var javaAny = com.google.protobuf.any.Any.toJavaProto(scalaPbAny);
return JsonSupport.decodeJson(valueClass, javaAny);
}
}

class DoneSerializer extends JsonSerializer<Done> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public static HttpResponse ok(String text) {
public static HttpResponse ok(Object object) {
if (object == null) throw new IllegalArgumentException("object must not be null");
try {
byte[] body = JsonSupport.encodeToBytes(object).toByteArray();
var body = JsonSupport.encodeToAkkaByteString(object);
return HttpResponse.create().withEntity(ContentTypes.APPLICATION_JSON, body);
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ private[akka] final case class RequestBuilderImpl[R](
override def withRequestBody(`object`: AnyRef): RequestBuilder[R] = {
if (`object` eq null) throw new IllegalArgumentException("object must not be null")
try {
val body = JsonSupport.encodeToBytes(`object`).toByteArray
val body = JsonSupport.encodeToAkkaByteString(`object`)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

wasn't this even wrong? encodeToBytes expected it to be in protobuf encoding?

val requestWithBody = request.withEntity(ContentTypes.APPLICATION_JSON, body)
withRequest(requestWithBody)
} catch {
Expand Down Expand Up @@ -167,7 +167,7 @@ private[akka] final case class RequestBuilderImpl[R](
throw new RuntimeException(errorString + ": " + bytes.utf8String)
}
} else if (res.entity.getContentType == ContentTypes.APPLICATION_JSON)
new StrictResponse[T](res, JsonSupport.parseBytes(bytes.toArrayUnsafe(), `type`))
new StrictResponse[T](res, JsonSupport.decodeJson(`type`, bytes))
else if (!res.entity.getContentType.binary && (`type` eq classOf[String]))
new StrictResponse[T](
res,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ class MyJsonable {
@BeanProperty var field: String = _
}

@Deprecated
class JsonSupportSpec extends AnyWordSpec with Matchers {
// FIXME move these tests to JsonSerializerSpec

val myJsonable = new MyJsonable
myJsonable.field = "foo"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public void verifyCounterEventSourcedConsumesFromKafka() {
ConsumerRecords<String, byte[]> records = consumer.poll(Duration.ofMillis(200));
var foundRecord = false;
for (ConsumerRecord<String, byte[]> r : records) {
var increased = JsonSupport.parseBytes(r.value(), CounterEvent.ValueIncreased.class);
var increased = JsonSupport.decodeJson(CounterEvent.ValueIncreased.class, r.value());
String subjectId = new String(r.headers().headers("ce-subject").iterator().next().value(), StandardCharsets.UTF_8);
if (subjectId.equals(counterId) && increased.value() == 20) {
foundRecord = true;
Expand Down
Loading