Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: Cleanup JsonSupport #105

Merged
merged 6 commits into from
Dec 19, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
123 changes: 100 additions & 23 deletions akka-javasdk/src/main/java/akka/javasdk/JsonSupport.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@
package akka.javasdk;

import akka.Done;
import akka.annotation.InternalApi;
import akka.javasdk.annotations.Migration;
import akka.javasdk.impl.AnySupport;
import akka.javasdk.impl.ByteStringEncoding;
import akka.runtime.sdk.spi.BytesPayload;
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.PropertyAccessor;
Expand Down Expand Up @@ -82,6 +82,8 @@ public static ObjectMapper getObjectMapper() {
return objectMapper;
}

private static akka.javasdk.impl.serialization.JsonSerializer jsonSerializer = new akka.javasdk.impl.serialization.JsonSerializer();

private JsonSupport() {
}

Expand All @@ -96,7 +98,10 @@ private JsonSupport() {
* for the JSON type instead.
*
* @see {{encodeJson(T, String}}
*
* @deprecated Protobuf Any with JSON is not supported
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does it still make sense for message broker messages?
I'd say that then it's better to use a more explicit protobuf message that carry the raw bytes and type hint.

When we add "real" protobuf serialization support I think we should revisit this. Would be better to have the serialization support as something that is injected rather than static methods.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if we should add protobuf as internal serialization format at all, we can trigger it by the fact that there is a Java protobuf message base types, that way we wont need explicit user serialization like this.

BTW I think we can drop these completely, as there is no way to use them that makes sense in the new SDK, they exist because of basing off of old SDK but they could never really be used here.

The only use case where this class can be used for anything useful right now, afaics, is to do streaming JSON encoding for results in a query in a HTTP endpoint.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think protobuf can make sense when we add gRCP endpoints. Since that will happen later I think we can leave these as deprecated now and introduce proper serialization support at that point (and then remove these here).

*/
@Deprecated
public static <T> Any encodeJson(T value) {
return encodeJson(value, value.getClass().getName());
}
Expand All @@ -111,7 +116,10 @@ public static <T> Any encodeJson(T value) {
* JSON, useful for example when multiple different objects are passed through a pub/sub
* topic.
* @throws IllegalArgumentException if the given value cannot be turned into JSON
*
* @deprecated Protobuf Any with JSON is not supported
*/
@Deprecated
public static <T> Any encodeJson(T value, String jsonType) {
try {
ByteString bytes = encodeToBytes(value);
Expand All @@ -123,26 +131,81 @@ public static <T> Any encodeJson(T value, String jsonType) {
}
}

// FIXME do we really want all these to be public API?
/**
* @deprecated Use encodeToAkkaByteString
*/
@Deprecated
public static <T> ByteString encodeToBytes(T value) throws JsonProcessingException {
return UnsafeByteOperations.unsafeWrap(
objectMapper.writerFor(value.getClass()).writeValueAsBytes(value));
}

public static <T> akka.util.ByteString encodeToAkkaByteString(T value) throws JsonProcessingException {
return akka.util.ByteString.fromArrayUnsafe(objectMapper.writerFor(value.getClass()).writeValueAsBytes(value));
/**
* Encode the given value as JSON using Jackson.
*
* @param value the object to encode as JSON, must be an instance of a class properly annotated
* with the needed Jackson annotations.
* @throws IllegalArgumentException if the given value cannot be turned into JSON
*/
public static <T> akka.util.ByteString encodeToAkkaByteString(T value) {
try {
return akka.util.ByteString.fromArrayUnsafe(objectMapper.writerFor(value.getClass()).writeValueAsBytes(value));
} catch (JsonProcessingException ex) {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

wrapping the JsonProcessingException is not fully compatible change, but very inconvenient to have that as checked exception, and it's not like that in other places

throw new IllegalArgumentException(
"Could not encode [" + value.getClass().getName() + "] as JSON", ex);
}
}

public static akka.util.ByteString encodeDynamicToAkkaByteString(String key, String value) throws JsonProcessingException {
ObjectNode dynamicJson = objectMapper.createObjectNode().put(key, value);
return akka.util.ByteString.fromArrayUnsafe(objectMapper.writeValueAsBytes(dynamicJson));
public static akka.util.ByteString encodeDynamicToAkkaByteString(String key, String value) {
try {
ObjectNode dynamicJson = objectMapper.createObjectNode().put(key, value);
return akka.util.ByteString.fromArrayUnsafe(objectMapper.writeValueAsBytes(dynamicJson));
} catch (JsonProcessingException ex) {
throw new IllegalArgumentException(
"Could not encode dynamic key/value as JSON", ex);
}
}

public static akka.util.ByteString encodeDynamicCollectionToAkkaByteString(String key, Collection<?> values) throws JsonProcessingException {
ObjectNode objectNode = objectMapper.createObjectNode();
ArrayNode dynamicJson = objectNode.putArray(key);
values.forEach(v -> dynamicJson.add(v.toString()));
return akka.util.ByteString.fromArrayUnsafe(objectMapper.writeValueAsBytes(objectNode));
public static akka.util.ByteString encodeDynamicCollectionToAkkaByteString(String key, Collection<?> values) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Quite strange that this ever was public API I think it was mostly methods we needed ourselves internally that accidentally became public API. We should reconsider that and limit the surface to intentionally public API.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

deprecated those, and changed to internal JsonSerializer: c9f7b3b

try {
ObjectNode objectNode = objectMapper.createObjectNode();
ArrayNode dynamicJson = objectNode.putArray(key);
values.forEach(v -> dynamicJson.add(v.toString()));
return akka.util.ByteString.fromArrayUnsafe(objectMapper.writeValueAsBytes(objectNode));
} catch (JsonProcessingException ex) {
throw new IllegalArgumentException(
"Could not encode dynamic key/values as JSON", ex);
}
}

/**
* Decode the given bytes to an instance of T using Jackson. The bytes must be
* the JSON string as bytes.
*
* @param valueClass The type of class to deserialize the object to, the class must have the
* proper Jackson annotations for deserialization.
* @param bytes The bytes to deserialize.
* @return The decoded object
* @throws IllegalArgumentException if the given value cannot be decoded to a T
*
*/
public static <T> T decodeJson(Class<T> valueClass, akka.util.ByteString bytes) {
return jsonSerializer.fromBytes(valueClass, new BytesPayload(bytes, jsonSerializer.contentTypeFor(valueClass)));
}

/**
* Decode the given bytes to an instance of T using Jackson. The bytes must be
* the JSON string as bytes.
*
* @param valueClass The type of class to deserialize the object to, the class must have the
* proper Jackson annotations for deserialization.
* @param bytes The bytes to deserialize.
* @return The decoded object
* @throws IllegalArgumentException if the given value cannot be decoded to a T
*
*/
public static <T> T decodeJson(Class<T> valueClass, byte[] bytes) {
return decodeJson(valueClass, akka.util.ByteString.fromArrayUnsafe(bytes));
}

/**
Expand All @@ -154,7 +217,10 @@ public static akka.util.ByteString encodeDynamicCollectionToAkkaByteString(Strin
* @param any The protobuf Any object to deserialize.
* @return The decoded object
* @throws IllegalArgumentException if the given value cannot be decoded to a T
*
* @deprecated Protobuf Any with JSON is not supported
*/
@Deprecated
public static <T> T decodeJson(Class<T> valueClass, Any any) {
if (!AnySupport.isJsonTypeUrl(any.getTypeUrl())) {
throw new IllegalArgumentException(
Expand All @@ -177,15 +243,15 @@ public static <T> T decodeJson(Class<T> valueClass, Any any) {
if (fromVersion < currentVersion) {
return migrate(valueClass, decodedBytes, fromVersion, migration);
} else if (fromVersion == currentVersion) {
return parseBytes(decodedBytes.toByteArray(), valueClass);
return objectMapper.readValue(decodedBytes.toByteArray(), valueClass);
} else if (fromVersion <= supportedForwardVersion) {
return migrate(valueClass, decodedBytes, fromVersion, migration);
} else {
throw new IllegalStateException("Migration version " + supportedForwardVersion + " is " +
"behind version " + fromVersion + " of deserialized type [" + valueClass.getName() + "]");
}
} else {
return parseBytes(decodedBytes.toByteArray(), valueClass);
return objectMapper.readValue(decodedBytes.toByteArray(), valueClass);
}
} catch (JsonProcessingException e) {
throw jsonProcessingException(valueClass, any, e);
Expand All @@ -196,6 +262,10 @@ public static <T> T decodeJson(Class<T> valueClass, Any any) {
}
}

/**
* @deprecated Use decodeJson
*/
@Deprecated
public static <T> T parseBytes(byte[] bytes, Class<T> valueClass) throws IOException {
return objectMapper.readValue(bytes, valueClass);
}
Expand Down Expand Up @@ -236,6 +306,11 @@ private static int parseVersion(String typeUrl) {
}
}


/**
* @deprecated Protobuf Any with JSON is not supported
*/
@Deprecated
public static <T, C extends Collection<T>> C decodeJsonCollection(Class<T> valueClass, Class<C> collectionType, Any any) {
if (!AnySupport.isJsonTypeUrl(any.getTypeUrl())) {
throw new IllegalArgumentException(
Expand All @@ -257,14 +332,25 @@ public static <T, C extends Collection<T>> C decodeJsonCollection(Class<T> value
}
}

public static <T, C extends Collection<T>> C decodeJsonCollection(Class<T> valueClass, Class<C> collectionType, akka.util.ByteString bytes) {
return jsonSerializer.fromBytes(valueClass, collectionType, new BytesPayload(bytes, jsonSerializer.contentTypeFor(valueClass)));
}

public static <T, C extends Collection<T>> C decodeJsonCollection(Class<T> valueClass, Class<C> collectionType, byte[] bytes) {
return decodeJsonCollection(valueClass, collectionType, akka.util.ByteString.fromArrayUnsafe(bytes));
}

/**
* Decode the given protobuf Any to an instance of T using Jackson but only if the suffix of the
* type URL matches the given jsonType.
*
* @return An Optional containing the successfully decoded value or an empty Optional if the type
* suffix does not match.
* @throws IllegalArgumentException if the suffix matches but the Any cannot be parsed into a T
*
* @deprecated Protobuf Any with JSON is not supported
*/
@Deprecated
public static <T> Optional<T> decodeJson(Class<T> valueClass, String jsonType, Any any) {
if (any.getTypeUrl().endsWith(jsonType)) {
return Optional.of(decodeJson(valueClass, any));
Expand All @@ -273,15 +359,6 @@ public static <T> Optional<T> decodeJson(Class<T> valueClass, String jsonType, A
}
}

/**
* INTERNAL API
* @hidden
*/
@InternalApi
public static <T> T decodeJson(Class<T> valueClass, com.google.protobuf.any.Any scalaPbAny) {
var javaAny = com.google.protobuf.any.Any.toJavaProto(scalaPbAny);
return JsonSupport.decodeJson(valueClass, javaAny);
}
}

class DoneSerializer extends JsonSerializer<Done> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,12 +60,8 @@ public static HttpResponse ok(String text) {
*/
public static HttpResponse ok(Object object) {
if (object == null) throw new IllegalArgumentException("object must not be null");
try {
byte[] body = JsonSupport.encodeToBytes(object).toByteArray();
return HttpResponse.create().withEntity(ContentTypes.APPLICATION_JSON, body);
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
var body = JsonSupport.encodeToAkkaByteString(object);
return HttpResponse.create().withEntity(ContentTypes.APPLICATION_JSON, body);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ private[akka] final case class RequestBuilderImpl[R](
override def withRequestBody(`object`: AnyRef): RequestBuilder[R] = {
if (`object` eq null) throw new IllegalArgumentException("object must not be null")
try {
val body = JsonSupport.encodeToBytes(`object`).toByteArray
val body = JsonSupport.encodeToAkkaByteString(`object`)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

wasn't this even wrong? encodeToBytes expected it to be in protobuf encoding?

val requestWithBody = request.withEntity(ContentTypes.APPLICATION_JSON, body)
withRequest(requestWithBody)
} catch {
Expand Down Expand Up @@ -167,7 +167,7 @@ private[akka] final case class RequestBuilderImpl[R](
throw new RuntimeException(errorString + ": " + bytes.utf8String)
}
} else if (res.entity.getContentType == ContentTypes.APPLICATION_JSON)
new StrictResponse[T](res, JsonSupport.parseBytes(bytes.toArrayUnsafe(), `type`))
new StrictResponse[T](res, JsonSupport.decodeJson(`type`, bytes))
else if (!res.entity.getContentType.binary && (`type` eq classOf[String]))
new StrictResponse[T](
res,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ class MyJsonable {
@BeanProperty var field: String = _
}

@Deprecated
class JsonSupportSpec extends AnyWordSpec with Matchers {
// FIXME move these tests to JsonSerializerSpec

val myJsonable = new MyJsonable
myJsonable.field = "foo"
Expand Down
3 changes: 1 addition & 2 deletions docs/src/modules/java/pages/serialization.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -164,5 +164,4 @@ include::example$event-sourced-customer-registry/src/test/java/customer/domain/C
----
<1> Encodes old class in Base64 String.
<2> Decodes Base64 bytes.
<3> Parses bytes into `Any` object.
<4> Verifies JSON deserialization.
<3> Verifies JSON deserialization.
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public void verifyCounterEventSourcedConsumesFromKafka() {
ConsumerRecords<String, byte[]> records = consumer.poll(Duration.ofMillis(200));
var foundRecord = false;
for (ConsumerRecord<String, byte[]> r : records) {
var increased = JsonSupport.parseBytes(r.value(), CounterEvent.ValueIncreased.class);
var increased = JsonSupport.decodeJson(CounterEvent.ValueIncreased.class, r.value());
String subjectId = new String(r.headers().headers("ce-subject").iterator().next().value(), StandardCharsets.UTF_8);
if (subjectId.equals(counterId) && increased.value() == 20) {
foundRecord = true;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
package customer.domain;

import com.google.protobuf.Any;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
import akka.javasdk.JsonSupport;
import akka.util.ByteString;
import org.junit.jupiter.api.Test;

import java.util.Base64;
Expand All @@ -17,7 +15,7 @@ class CustomerEventSerializationTest {
@Test
public void shouldDeserializeWithMandatoryField() {
//given
Any serialized = JsonSupport.encodeJson(new CustomerEvent.NameChanged("andre"));
ByteString serialized = JsonSupport.encodeToAkkaByteString(new CustomerEvent.NameChanged("andre"));
Copy link
Member Author

@patriknw patriknw Dec 18, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This doesn't work, because I think we are loosing some migration. Somewhat strange for this test case, but understand that it will be a problem for the other migration cases.

Not sure what we shall do? Using pb Any in our docs doesn't make sense. BytesPayload could be a replacement but that is not intended to be user facing. What do you think @aludwiko ?


//when
NameChanged deserialized = JsonSupport.decodeJson(NameChanged.class, serialized);
Expand All @@ -32,7 +30,7 @@ public void shouldDeserializeWithMandatoryField() {
public void shouldDeserializeWithChangedFieldName() {
//given
Address address = new Address("Wall Street", "New York");
Any serialized = JsonSupport.encodeJson(new CustomerEvent.AddressChanged(address));
ByteString serialized = JsonSupport.encodeToAkkaByteString(new CustomerEvent.AddressChanged(address));

//when
AddressChanged deserialized = JsonSupport.decodeJson(AddressChanged.class, serialized);
Expand All @@ -44,7 +42,7 @@ public void shouldDeserializeWithChangedFieldName() {
@Test
public void shouldDeserializeWithStructureMigration() {
//given
Any serialized = JsonSupport.encodeJson(new CustomerCreatedOld("[email protected]", "bob", "Wall Street", "New York"));
ByteString serialized = JsonSupport.encodeToAkkaByteString(new CustomerCreatedOld("[email protected]", "bob", "Wall Street", "New York"));

//when
CustomerEvent.CustomerCreated deserialized = JsonSupport.decodeJson(CustomerEvent.CustomerCreated.class, serialized);
Expand All @@ -56,21 +54,20 @@ public void shouldDeserializeWithStructureMigration() {

// tag::testing-deserialization[]
@Test
public void shouldDeserializeCustomerCreated_V0() throws InvalidProtocolBufferException {
public void shouldDeserializeCustomerCreated_V0() {
// tag::testing-deserialization-encoding[]
Any serialized = JsonSupport.encodeJson(new CustomerCreatedOld("[email protected]", "bob", "Wall Street", "New York"));
String encodedBytes = new String(Base64.getEncoder().encode(serialized.toByteArray())); // <1>
ByteString serialized = JsonSupport.encodeToAkkaByteString(new CustomerCreatedOld("[email protected]", "bob", "Wall Street", "New York"));
String encodedBytes = serialized.encodeBase64().utf8String(); // <1>
// end::testing-deserialization-encoding[]

byte[] bytes = Base64.getDecoder().decode(encodedBytes.getBytes()); // <2>
Any serializedAny = Any.parseFrom(ByteString.copyFrom(bytes)); // <3>
ByteString decodedBytes = ByteString.fromString(encodedBytes).decodeBase64(); // <2>

CustomerEvent.CustomerCreated deserialized = JsonSupport.decodeJson(CustomerEvent.CustomerCreated.class,
serializedAny); // <4>
decodedBytes); // <3>

assertEquals("Wall Street", deserialized.address().street());
assertEquals("New York", deserialized.address().city());
}
// end::testing-deserialization[]

}
}
Loading