From f66e00e154becac65cd1d4d54fd1de4b8a1eb003 Mon Sep 17 00:00:00 2001 From: Nikolas Schmidt-Voigt Date: Tue, 12 Nov 2024 12:33:18 +0100 Subject: [PATCH] feat: improve type safety when combining unis with limited concurrency --- implementation/revapi.json | 59 +++++++- .../smallrye/mutiny/groups/UniAndGroup2.java | 17 +++ .../smallrye/mutiny/groups/UniAndGroup3.java | 17 +++ .../smallrye/mutiny/groups/UniAndGroup4.java | 17 +++ .../smallrye/mutiny/groups/UniAndGroup5.java | 17 +++ .../smallrye/mutiny/groups/UniAndGroup6.java | 17 +++ .../smallrye/mutiny/groups/UniAndGroup7.java | 17 +++ .../smallrye/mutiny/groups/UniAndGroup8.java | 17 +++ .../smallrye/mutiny/groups/UniAndGroup9.java | 17 +++ .../smallrye/mutiny/operators/UniZipTest.java | 132 ++++++++++++++++++ 10 files changed, 326 insertions(+), 1 deletion(-) diff --git a/implementation/revapi.json b/implementation/revapi.json index d37de1ac5..acc98bac9 100644 --- a/implementation/revapi.json +++ b/implementation/revapi.json @@ -51,7 +51,64 @@ "criticality" : "highlight", "minSeverity" : "POTENTIALLY_BREAKING", "minCriticality" : "documented", - "differences" : [ ] + "differences" : [ + { + "ignore": true, + "code": "java.method.returnTypeChangedCovariantly", + "old": "method io.smallrye.mutiny.groups.UniAndGroupIterable io.smallrye.mutiny.groups.UniAndGroupIterable::usingConcurrencyOf(int) @ io.smallrye.mutiny.groups.UniAndGroup2", + "new": "method io.smallrye.mutiny.groups.UniAndGroup2 io.smallrye.mutiny.groups.UniAndGroup2::usingConcurrencyOf(int)", + "justification": "Override `usingConcurrencyOf` in `UniAndGroup2` for improved type safety" + }, + { + "ignore": true, + "code": "java.method.returnTypeChangedCovariantly", + "old": "method io.smallrye.mutiny.groups.UniAndGroupIterable io.smallrye.mutiny.groups.UniAndGroupIterable::usingConcurrencyOf(int) @ io.smallrye.mutiny.groups.UniAndGroup3", + "new": "method io.smallrye.mutiny.groups.UniAndGroup3 io.smallrye.mutiny.groups.UniAndGroup3::usingConcurrencyOf(int)", + "justification": "Override `usingConcurrencyOf` in `UniAndGroup3` for improved type safety" + }, + { + "ignore": true, + "code": "java.method.returnTypeChangedCovariantly", + "old": "method io.smallrye.mutiny.groups.UniAndGroupIterable io.smallrye.mutiny.groups.UniAndGroupIterable::usingConcurrencyOf(int) @ io.smallrye.mutiny.groups.UniAndGroup4", + "new": "method io.smallrye.mutiny.groups.UniAndGroup4 io.smallrye.mutiny.groups.UniAndGroup4::usingConcurrencyOf(int)", + "justification": "Override `usingConcurrencyOf` in `UniAndGroup4` for improved type safety" + }, + { + "ignore": true, + "code": "java.method.returnTypeChangedCovariantly", + "old": "method io.smallrye.mutiny.groups.UniAndGroupIterable io.smallrye.mutiny.groups.UniAndGroupIterable::usingConcurrencyOf(int) @ io.smallrye.mutiny.groups.UniAndGroup5", + "new": "method io.smallrye.mutiny.groups.UniAndGroup5 io.smallrye.mutiny.groups.UniAndGroup5::usingConcurrencyOf(int)", + "justification": "Override `usingConcurrencyOf` in `UniAndGroup5` for improved type safety" + }, + { + "ignore": true, + "code": "java.method.returnTypeChangedCovariantly", + "old": "method io.smallrye.mutiny.groups.UniAndGroupIterable io.smallrye.mutiny.groups.UniAndGroupIterable::usingConcurrencyOf(int) @ io.smallrye.mutiny.groups.UniAndGroup6", + "new": "method io.smallrye.mutiny.groups.UniAndGroup6 io.smallrye.mutiny.groups.UniAndGroup6::usingConcurrencyOf(int)", + "justification": "Override `usingConcurrencyOf` in `UniAndGroup6` for improved type safety" + }, + { + "ignore": true, + "code": "java.method.returnTypeChangedCovariantly", + "old": "method io.smallrye.mutiny.groups.UniAndGroupIterable io.smallrye.mutiny.groups.UniAndGroupIterable::usingConcurrencyOf(int) @ io.smallrye.mutiny.groups.UniAndGroup7", + "new": "method io.smallrye.mutiny.groups.UniAndGroup7 io.smallrye.mutiny.groups.UniAndGroup7::usingConcurrencyOf(int)", + "justification": "Override `usingConcurrencyOf` in `UniAndGroup7` for improved type safety" + }, + { + "ignore": true, + "code": "java.method.returnTypeChangedCovariantly", + "old": "method io.smallrye.mutiny.groups.UniAndGroupIterable io.smallrye.mutiny.groups.UniAndGroupIterable::usingConcurrencyOf(int) @ io.smallrye.mutiny.groups.UniAndGroup8", + "new": "method io.smallrye.mutiny.groups.UniAndGroup8 io.smallrye.mutiny.groups.UniAndGroup8::usingConcurrencyOf(int)", + "justification": "Override `usingConcurrencyOf` in `UniAndGroup8` for improved type safety" + }, + { + "ignore": true, + "code": "java.method.returnTypeChangedCovariantly", + "old": "method io.smallrye.mutiny.groups.UniAndGroupIterable io.smallrye.mutiny.groups.UniAndGroupIterable::usingConcurrencyOf(int) @ io.smallrye.mutiny.groups.UniAndGroup9", + "new": "method io.smallrye.mutiny.groups.UniAndGroup9 io.smallrye.mutiny.groups.UniAndGroup9::usingConcurrencyOf(int)", + "justification": "Override `usingConcurrencyOf` in `UniAndGroup9` for improved type safety" + } + ] } }, { "extension" : "revapi.reporter.json", diff --git a/implementation/src/main/java/io/smallrye/mutiny/groups/UniAndGroup2.java b/implementation/src/main/java/io/smallrye/mutiny/groups/UniAndGroup2.java index eee74e45b..79cca1f31 100644 --- a/implementation/src/main/java/io/smallrye/mutiny/groups/UniAndGroup2.java +++ b/implementation/src/main/java/io/smallrye/mutiny/groups/UniAndGroup2.java @@ -47,6 +47,23 @@ public Uni> asTuple() { return combine(Tuple2::of); } + /** + * Limit the number of concurrent upstream subscriptions. + *

+ * When not specified all upstream {@link Uni} are being subscribed when the combining {@link Uni} is subscribed. + *

+ * Setting a limit is useful when you have a large number of {@link Uni} to combine and their simultaneous + * subscriptions might overwhelm resources (e.g., database connections, etc). + * + * @param level the concurrency level, must be strictly positive + * @return an object to configure the combination logic + */ + @CheckReturnValue + public UniAndGroup2 usingConcurrencyOf(int level) { + super.usingConcurrencyOf(level); + return this; + } + /** * Creates the resulting {@link Uni}. The items are combined using the given combinator function. * diff --git a/implementation/src/main/java/io/smallrye/mutiny/groups/UniAndGroup3.java b/implementation/src/main/java/io/smallrye/mutiny/groups/UniAndGroup3.java index 45d786dfa..a2d9b88b4 100644 --- a/implementation/src/main/java/io/smallrye/mutiny/groups/UniAndGroup3.java +++ b/implementation/src/main/java/io/smallrye/mutiny/groups/UniAndGroup3.java @@ -30,6 +30,23 @@ public Uni> asTuple() { return combine(Tuple3::of); } + /** + * Limit the number of concurrent upstream subscriptions. + *

+ * When not specified all upstream {@link Uni} are being subscribed when the combining {@link Uni} is subscribed. + *

+ * Setting a limit is useful when you have a large number of {@link Uni} to combine and their simultaneous + * subscriptions might overwhelm resources (e.g., database connections, etc). + * + * @param level the concurrency level, must be strictly positive + * @return an object to configure the combination logic + */ + @CheckReturnValue + public UniAndGroup3 usingConcurrencyOf(int level) { + super.usingConcurrencyOf(level); + return this; + } + /** * @deprecated use {@link #with(Functions.Function3)} instead */ diff --git a/implementation/src/main/java/io/smallrye/mutiny/groups/UniAndGroup4.java b/implementation/src/main/java/io/smallrye/mutiny/groups/UniAndGroup4.java index 38681b5a9..5ad23b245 100644 --- a/implementation/src/main/java/io/smallrye/mutiny/groups/UniAndGroup4.java +++ b/implementation/src/main/java/io/smallrye/mutiny/groups/UniAndGroup4.java @@ -30,6 +30,23 @@ public Uni> asTuple() { return combine(Tuple4::of); } + /** + * Limit the number of concurrent upstream subscriptions. + *

+ * When not specified all upstream {@link Uni} are being subscribed when the combining {@link Uni} is subscribed. + *

+ * Setting a limit is useful when you have a large number of {@link Uni} to combine and their simultaneous + * subscriptions might overwhelm resources (e.g., database connections, etc). + * + * @param level the concurrency level, must be strictly positive + * @return an object to configure the combination logic + */ + @CheckReturnValue + public UniAndGroup4 usingConcurrencyOf(int level) { + super.usingConcurrencyOf(level); + return this; + } + /** * @deprecated use {@link #with(Functions.Function4)} instead */ diff --git a/implementation/src/main/java/io/smallrye/mutiny/groups/UniAndGroup5.java b/implementation/src/main/java/io/smallrye/mutiny/groups/UniAndGroup5.java index 8a85546fc..fbf350dec 100644 --- a/implementation/src/main/java/io/smallrye/mutiny/groups/UniAndGroup5.java +++ b/implementation/src/main/java/io/smallrye/mutiny/groups/UniAndGroup5.java @@ -31,6 +31,23 @@ public Uni> asTuple() { return combine(Tuple5::of); } + /** + * Limit the number of concurrent upstream subscriptions. + *

+ * When not specified all upstream {@link Uni} are being subscribed when the combining {@link Uni} is subscribed. + *

+ * Setting a limit is useful when you have a large number of {@link Uni} to combine and their simultaneous + * subscriptions might overwhelm resources (e.g., database connections, etc). + * + * @param level the concurrency level, must be strictly positive + * @return an object to configure the combination logic + */ + @CheckReturnValue + public UniAndGroup5 usingConcurrencyOf(int level) { + super.usingConcurrencyOf(level); + return this; + } + /** * @deprecated use {@link #with(Functions.Function5)} instead */ diff --git a/implementation/src/main/java/io/smallrye/mutiny/groups/UniAndGroup6.java b/implementation/src/main/java/io/smallrye/mutiny/groups/UniAndGroup6.java index 813f52cd4..9be23bfac 100644 --- a/implementation/src/main/java/io/smallrye/mutiny/groups/UniAndGroup6.java +++ b/implementation/src/main/java/io/smallrye/mutiny/groups/UniAndGroup6.java @@ -31,6 +31,23 @@ public Uni> asTuple() { return combine(Tuple6::of); } + /** + * Limit the number of concurrent upstream subscriptions. + *

+ * When not specified all upstream {@link Uni} are being subscribed when the combining {@link Uni} is subscribed. + *

+ * Setting a limit is useful when you have a large number of {@link Uni} to combine and their simultaneous + * subscriptions might overwhelm resources (e.g., database connections, etc). + * + * @param level the concurrency level, must be strictly positive + * @return an object to configure the combination logic + */ + @CheckReturnValue + public UniAndGroup6 usingConcurrencyOf(int level) { + super.usingConcurrencyOf(level); + return this; + } + /** * @deprecated use {@link #with(Functions.Function6)} instead */ diff --git a/implementation/src/main/java/io/smallrye/mutiny/groups/UniAndGroup7.java b/implementation/src/main/java/io/smallrye/mutiny/groups/UniAndGroup7.java index bc62e2a09..e466471d2 100644 --- a/implementation/src/main/java/io/smallrye/mutiny/groups/UniAndGroup7.java +++ b/implementation/src/main/java/io/smallrye/mutiny/groups/UniAndGroup7.java @@ -31,6 +31,23 @@ public Uni> asTuple() { return combine(Tuple7::of); } + /** + * Limit the number of concurrent upstream subscriptions. + *

+ * When not specified all upstream {@link Uni} are being subscribed when the combining {@link Uni} is subscribed. + *

+ * Setting a limit is useful when you have a large number of {@link Uni} to combine and their simultaneous + * subscriptions might overwhelm resources (e.g., database connections, etc). + * + * @param level the concurrency level, must be strictly positive + * @return an object to configure the combination logic + */ + @CheckReturnValue + public UniAndGroup7 usingConcurrencyOf(int level) { + super.usingConcurrencyOf(level); + return this; + } + /** * @deprecated use {@link #with(Functions.Function7)} instead */ diff --git a/implementation/src/main/java/io/smallrye/mutiny/groups/UniAndGroup8.java b/implementation/src/main/java/io/smallrye/mutiny/groups/UniAndGroup8.java index 9886373b6..a2dcaf7b8 100644 --- a/implementation/src/main/java/io/smallrye/mutiny/groups/UniAndGroup8.java +++ b/implementation/src/main/java/io/smallrye/mutiny/groups/UniAndGroup8.java @@ -33,6 +33,23 @@ public Uni> asTuple() { return combine(Tuple8::of); } + /** + * Limit the number of concurrent upstream subscriptions. + *

+ * When not specified all upstream {@link Uni} are being subscribed when the combining {@link Uni} is subscribed. + *

+ * Setting a limit is useful when you have a large number of {@link Uni} to combine and their simultaneous + * subscriptions might overwhelm resources (e.g., database connections, etc). + * + * @param level the concurrency level, must be strictly positive + * @return an object to configure the combination logic + */ + @CheckReturnValue + public UniAndGroup8 usingConcurrencyOf(int level) { + super.usingConcurrencyOf(level); + return this; + } + /** * @deprecated use {@link #with(Functions.Function8)} instead */ diff --git a/implementation/src/main/java/io/smallrye/mutiny/groups/UniAndGroup9.java b/implementation/src/main/java/io/smallrye/mutiny/groups/UniAndGroup9.java index d40a5d068..adbafe854 100644 --- a/implementation/src/main/java/io/smallrye/mutiny/groups/UniAndGroup9.java +++ b/implementation/src/main/java/io/smallrye/mutiny/groups/UniAndGroup9.java @@ -33,6 +33,23 @@ public Uni> asTuple() { return combine(Tuple9::of); } + /** + * Limit the number of concurrent upstream subscriptions. + *

+ * When not specified all upstream {@link Uni} are being subscribed when the combining {@link Uni} is subscribed. + *

+ * Setting a limit is useful when you have a large number of {@link Uni} to combine and their simultaneous + * subscriptions might overwhelm resources (e.g., database connections, etc). + * + * @param level the concurrency level, must be strictly positive + * @return an object to configure the combination logic + */ + @CheckReturnValue + public UniAndGroup9 usingConcurrencyOf(int level) { + super.usingConcurrencyOf(level); + return this; + } + /** * @deprecated use {@link #with(Functions.Function9)} instead */ diff --git a/implementation/src/test/java/io/smallrye/mutiny/operators/UniZipTest.java b/implementation/src/test/java/io/smallrye/mutiny/operators/UniZipTest.java index 39a4e75ca..f73d30974 100644 --- a/implementation/src/test/java/io/smallrye/mutiny/operators/UniZipTest.java +++ b/implementation/src/test/java/io/smallrye/mutiny/operators/UniZipTest.java @@ -636,6 +636,138 @@ void combineAllSmokeTest() { assertThat(sub.getItem()).asInstanceOf(LIST).containsExactly(1, 2, 3, 4); } + @Test + void combineAllTypesafeWith2() { + Uni a = Uni.createFrom().item("1"); + Uni b = Uni.createFrom().item(2); + + UniAssertSubscriber sub = Uni.combine().all().unis(a, b) + .usingConcurrencyOf(1) + .with((x1, x2) -> Integer.parseInt(x1) + x2) + .subscribe().withSubscriber(UniAssertSubscriber.create()); + + sub.awaitItem().assertItem(3); + } + + @Test + void combineAllTypesafeWith3() { + Uni a = Uni.createFrom().item("1"); + Uni b = Uni.createFrom().item(2); + Uni c = Uni.createFrom().item(3); + + UniAssertSubscriber sub = Uni.combine().all().unis(a, b, c) + .usingConcurrencyOf(1) + .with((x1, x2, x3) -> Integer.parseInt(x1) + x2 + x3) + .subscribe().withSubscriber(UniAssertSubscriber.create()); + + sub.awaitItem().assertItem(6); + } + + @Test + void combineAllTypesafeWith4() { + Uni a = Uni.createFrom().item("1"); + Uni b = Uni.createFrom().item(2); + Uni c = Uni.createFrom().item(3); + Uni d = Uni.createFrom().item(4); + + UniAssertSubscriber sub = Uni.combine().all().unis(a, b, c, d) + .usingConcurrencyOf(1) + .with((x1, x2, x3, x4) -> Integer.parseInt(x1) + x2 + x3 + x4) + .subscribe().withSubscriber(UniAssertSubscriber.create()); + + sub.awaitItem().assertItem(10); + } + + @Test + void combineAllTypesafeWith5() { + Uni a = Uni.createFrom().item("1"); + Uni b = Uni.createFrom().item(2); + Uni c = Uni.createFrom().item(3); + Uni d = Uni.createFrom().item(4); + Uni e = Uni.createFrom().item(5); + + UniAssertSubscriber sub = Uni.combine().all().unis(a, b, c, d, e) + .usingConcurrencyOf(1) + .with((x1, x2, x3, x4, x5) -> Integer.parseInt(x1) + x2 + x3 + x4 + x5) + .subscribe().withSubscriber(UniAssertSubscriber.create()); + + sub.awaitItem().assertItem(15); + } + + @Test + void combineAllTypesafeWith6() { + Uni a = Uni.createFrom().item("1"); + Uni b = Uni.createFrom().item(2); + Uni c = Uni.createFrom().item(3); + Uni d = Uni.createFrom().item(4); + Uni e = Uni.createFrom().item(5); + Uni f = Uni.createFrom().item(6); + + UniAssertSubscriber sub = Uni.combine().all().unis(a, b, c, d, e, f) + .usingConcurrencyOf(1) + .with((x1, x2, x3, x4, x5, x6) -> Integer.parseInt(x1) + x2 + x3 + x4 + x5 + x6) + .subscribe().withSubscriber(UniAssertSubscriber.create()); + + sub.awaitItem().assertItem(21); + } + + @Test + void combineAllTypesafeWith7() { + Uni a = Uni.createFrom().item("1"); + Uni b = Uni.createFrom().item(2); + Uni c = Uni.createFrom().item(3); + Uni d = Uni.createFrom().item(4); + Uni e = Uni.createFrom().item(5); + Uni f = Uni.createFrom().item(6); + Uni g = Uni.createFrom().item(7); + + UniAssertSubscriber sub = Uni.combine().all().unis(a, b, c, d, e, f, g) + .usingConcurrencyOf(1) + .with((x1, x2, x3, x4, x5, x6, x7) -> Integer.parseInt(x1) + x2 + x3 + x4 + x5 + x6 + x7) + .subscribe().withSubscriber(UniAssertSubscriber.create()); + + sub.awaitItem().assertItem(28); + } + + @Test + void combineAllTypesafeWith8() { + Uni a = Uni.createFrom().item("1"); + Uni b = Uni.createFrom().item(2); + Uni c = Uni.createFrom().item(3); + Uni d = Uni.createFrom().item(4); + Uni e = Uni.createFrom().item(5); + Uni f = Uni.createFrom().item(6); + Uni g = Uni.createFrom().item(7); + Uni h = Uni.createFrom().item(8); + + UniAssertSubscriber sub = Uni.combine().all().unis(a, b, c, d, e, f, g, h) + .usingConcurrencyOf(1) + .with((x1, x2, x3, x4, x5, x6, x7, x8) -> Integer.parseInt(x1) + x2 + x3 + x4 + x5 + x6 + x7 + x8) + .subscribe().withSubscriber(UniAssertSubscriber.create()); + + sub.awaitItem().assertItem(36); + } + + @Test + void combineAllTypesafeWith9() { + Uni a = Uni.createFrom().item("1"); + Uni b = Uni.createFrom().item(2); + Uni c = Uni.createFrom().item(3); + Uni d = Uni.createFrom().item(4); + Uni e = Uni.createFrom().item(5); + Uni f = Uni.createFrom().item(6); + Uni g = Uni.createFrom().item(7); + Uni h = Uni.createFrom().item(8); + Uni i = Uni.createFrom().item(9); + + UniAssertSubscriber sub = Uni.combine().all().unis(a, b, c, d, e, f, g, h, i) + .usingConcurrencyOf(1) + .with((x1, x2, x3, x4, x5, x6, x7, x8, x9) -> Integer.parseInt(x1) + x2 + x3 + x4 + x5 + x6 + x7 + x8 + x9) + .subscribe().withSubscriber(UniAssertSubscriber.create()); + + sub.awaitItem().assertItem(45); + } + @ParameterizedTest @CsvSource({ "1, 100, 1, 400",