From e16db98d1fe5ddc0a057d63b3f5d91c1d01bc882 Mon Sep 17 00:00:00 2001 From: Artem Bilan Date: Tue, 9 Jul 2024 12:55:59 -0400 Subject: [PATCH] GH-2728: Add Consistent Hash Exchange support Fixes: #2728 --- .../amqp/core/BaseExchangeBuilder.java | 203 ++++++++++++++++++ .../amqp/core/ConsistentHashExchange.java | 97 +++++++++ .../amqp/core/ExchangeBuilder.java | 175 ++++----------- .../amqp/core/ExchangeTypes.java | 16 +- .../amqp/core/builder/BuilderTests.java | 23 +- .../modules/ROOT/pages/amqp/abstractions.adoc | 8 +- .../changes-in-3-1-since-3-0.adoc | 8 +- .../antora/modules/ROOT/pages/index.adoc | 6 +- .../antora/modules/ROOT/pages/whats-new.adoc | 7 +- 9 files changed, 389 insertions(+), 154 deletions(-) create mode 100644 spring-amqp/src/main/java/org/springframework/amqp/core/BaseExchangeBuilder.java create mode 100644 spring-amqp/src/main/java/org/springframework/amqp/core/ConsistentHashExchange.java diff --git a/spring-amqp/src/main/java/org/springframework/amqp/core/BaseExchangeBuilder.java b/spring-amqp/src/main/java/org/springframework/amqp/core/BaseExchangeBuilder.java new file mode 100644 index 0000000000..0e956e034c --- /dev/null +++ b/spring-amqp/src/main/java/org/springframework/amqp/core/BaseExchangeBuilder.java @@ -0,0 +1,203 @@ +/* + * Copyright 2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.amqp.core; + +import java.util.Arrays; +import java.util.Map; + +import org.springframework.util.Assert; +import org.springframework.util.ObjectUtils; + +/** + * An {@link AbstractBuilder} extension for generics support. + * + * @param the target class implementation type. + * + * @author Gary Russell + * @author Artem Bilan + * + * @since 3.2 + * + */ +public abstract class BaseExchangeBuilder> extends AbstractBuilder { + + protected final String name; + + protected final String type; + + protected boolean durable = true; + + protected boolean autoDelete; + + protected boolean internal; + + private boolean delayed; + + private boolean ignoreDeclarationExceptions; + + private boolean declare = true; + + private Object[] declaringAdmins; + + /** + * Construct an instance of the appropriate type. + * @param name the exchange name + * @param type the type name + * @since 1.6.7 + * @see ExchangeTypes + */ + public BaseExchangeBuilder(String name, String type) { + this.name = name; + this.type = type; + } + + + /** + * Set the auto delete flag. + * @return the builder. + */ + public B autoDelete() { + this.autoDelete = true; + return _this(); + } + + /** + * Set the durable flag. + * @param isDurable the durable flag (default true). + * @return the builder. + */ + public B durable(boolean isDurable) { + this.durable = isDurable; + return _this(); + } + + /** + * Add an argument. + * @param key the argument key. + * @param value the argument value. + * @return the builder. + */ + public B withArgument(String key, Object value) { + getOrCreateArguments().put(key, value); + return _this(); + } + + /** + * Add the arguments. + * @param arguments the arguments map. + * @return the builder. + */ + public B withArguments(Map arguments) { + this.getOrCreateArguments().putAll(arguments); + return _this(); + } + + public B alternate(String exchange) { + return withArgument("alternate-exchange", exchange); + } + + /** + * Set the internal flag. + * @return the builder. + */ + public B internal() { + this.internal = true; + return _this(); + } + + /** + * Set the delayed flag. + * @return the builder. + */ + public B delayed() { + this.delayed = true; + return _this(); + } + + /** + * Switch on ignore exceptions such as mismatched properties when declaring. + * @return the builder. + * @since 2.0 + */ + public B ignoreDeclarationExceptions() { + this.ignoreDeclarationExceptions = true; + return _this(); + } + + /** + * Switch to disable declaration of the exchange by any admin. + * @return the builder. + * @since 2.1 + */ + public B suppressDeclaration() { + this.declare = false; + return _this(); + } + + /** + * Admin instances, or admin bean names that should declare this exchange. + * @param admins the admins. + * @return the builder. + * @since 2.1 + */ + public B admins(Object... admins) { + Assert.notNull(admins, "'admins' cannot be null"); + Assert.noNullElements(admins, "'admins' can't have null elements"); + this.declaringAdmins = Arrays.copyOf(admins, admins.length); + return _this(); + } + + @SuppressWarnings("unchecked") + public T build() { + AbstractExchange exchange; + if (ExchangeTypes.DIRECT.equals(this.type)) { + exchange = new DirectExchange(this.name, this.durable, this.autoDelete, getArguments()); + } + else if (ExchangeTypes.TOPIC.equals(this.type)) { + exchange = new TopicExchange(this.name, this.durable, this.autoDelete, getArguments()); + } + else if (ExchangeTypes.FANOUT.equals(this.type)) { + exchange = new FanoutExchange(this.name, this.durable, this.autoDelete, getArguments()); + } + else if (ExchangeTypes.HEADERS.equals(this.type)) { + exchange = new HeadersExchange(this.name, this.durable, this.autoDelete, getArguments()); + } + else { + exchange = new CustomExchange(this.name, this.type, this.durable, this.autoDelete, getArguments()); + } + + return (T) configureExchange(exchange); + } + + + protected T configureExchange(T exchange) { + exchange.setInternal(this.internal); + exchange.setDelayed(this.delayed); + exchange.setIgnoreDeclarationExceptions(this.ignoreDeclarationExceptions); + exchange.setShouldDeclare(this.declare); + if (!ObjectUtils.isEmpty(this.declaringAdmins)) { + exchange.setAdminsThatShouldDeclare(this.declaringAdmins); + } + return exchange; + } + + @SuppressWarnings("unchecked") + protected final B _this() { + return (B) this; + } + +} diff --git a/spring-amqp/src/main/java/org/springframework/amqp/core/ConsistentHashExchange.java b/spring-amqp/src/main/java/org/springframework/amqp/core/ConsistentHashExchange.java new file mode 100644 index 0000000000..47eefd7a85 --- /dev/null +++ b/spring-amqp/src/main/java/org/springframework/amqp/core/ConsistentHashExchange.java @@ -0,0 +1,97 @@ +/* + * Copyright 2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.amqp.core; + +import java.util.Map; + +import org.springframework.util.Assert; + +/** + * An {@link AbstractExchange} extension for Consistent Hash exchange type. + * + * @author Artem Bilan + * + * @since 3.2 + * + * @see AmqpAdmin + */ +public class ConsistentHashExchange extends AbstractExchange { + + /** + * Construct a new durable, non-auto-delete Exchange with the provided name. + * @param name the name of the exchange. + */ + public ConsistentHashExchange(String name) { + super(name); + } + + /** + * Construct a new Exchange, given a name, durability flag, auto-delete flag. + * @param name the name of the exchange. + * @param durable true if we are declaring a durable exchange (the exchange will + * survive a server restart) + * @param autoDelete true if the server should delete the exchange when it is no + * longer in use + */ + public ConsistentHashExchange(String name, boolean durable, boolean autoDelete) { + super(name, durable, autoDelete); + } + + /** + * Construct a new Exchange, given a name, durability flag, and auto-delete flag, and + * arguments. + * @param name the name of the exchange. + * @param durable true if we are declaring a durable exchange (the exchange will + * survive a server restart) + * @param autoDelete true if the server should delete the exchange when it is no + * longer in use + * @param arguments the arguments used to declare the exchange + */ + public ConsistentHashExchange(String name, boolean durable, boolean autoDelete, Map arguments) { + super(name, durable, autoDelete, arguments); + Assert.isTrue(!(arguments.containsKey("hash-header") && arguments.containsKey("hash-property")), + "The 'hash-header' and 'hash-property' are mutually exclusive."); + } + + /** + * Specify a header name from the message to hash. + * @param headerName the header name for hashing. + */ + public void setHashHeader(String headerName) { + Map arguments = getArguments(); + Assert.isTrue(!arguments.containsKey("hash-property"), + "The 'hash-header' and 'hash-property' are mutually exclusive."); + arguments.put("hash-header", headerName); + } + + /** + * Specify a property name from the message to hash. + * @param propertyName the property name for hashing. + */ + public void setHashProperty(String propertyName) { + Map arguments = getArguments(); + Assert.isTrue(!arguments.containsKey("hash-header"), + "The 'hash-header' and 'hash-property' are mutually exclusive."); + arguments.put("hash-property", propertyName); + } + + @Override + public String getType() { + return ExchangeTypes.CONSISTENT_HASH; + } + +} diff --git a/spring-amqp/src/main/java/org/springframework/amqp/core/ExchangeBuilder.java b/spring-amqp/src/main/java/org/springframework/amqp/core/ExchangeBuilder.java index af3405fade..273a9b84ca 100644 --- a/spring-amqp/src/main/java/org/springframework/amqp/core/ExchangeBuilder.java +++ b/spring-amqp/src/main/java/org/springframework/amqp/core/ExchangeBuilder.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2019 the original author or authors. + * Copyright 2016-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,12 +16,6 @@ package org.springframework.amqp.core; -import java.util.Arrays; -import java.util.Map; - -import org.springframework.util.Assert; -import org.springframework.util.ObjectUtils; - /** * Builder providing a fluent API for building {@link Exchange}s. * @@ -29,27 +23,8 @@ * @author Artem Bilan * * @since 1.6 - * */ -public final class ExchangeBuilder extends AbstractBuilder { - - private final String name; - - private final String type; - - private boolean durable = true; - - private boolean autoDelete; - - private boolean internal; - - private boolean delayed; - - private boolean ignoreDeclarationExceptions; - - private boolean declare = true; - - private Object[] declaringAdmins; +public class ExchangeBuilder extends BaseExchangeBuilder { /** * Construct an instance of the appropriate type. @@ -59,8 +34,7 @@ public final class ExchangeBuilder extends AbstractBuilder { * @see ExchangeTypes */ public ExchangeBuilder(String name, String type) { - this.name = name; - this.type = type; + super(name, type); } /** @@ -100,126 +74,49 @@ public static ExchangeBuilder headersExchange(String name) { } /** - * Set the auto delete flag. - * @return the builder. - */ - public ExchangeBuilder autoDelete() { - this.autoDelete = true; - return this; - } - - /** - * Set the durable flag. - * @param isDurable the durable flag (default true). - * @return the builder. - */ - public ExchangeBuilder durable(boolean isDurable) { - this.durable = isDurable; - return this; - } - - /** - * Add an argument. - * @param key the argument key. - * @param value the argument value. - * @return the builder. - */ - public ExchangeBuilder withArgument(String key, Object value) { - getOrCreateArguments().put(key, value); - return this; - } - - /** - * Add the arguments. - * @param arguments the arguments map. - * @return the builder. - */ - public ExchangeBuilder withArguments(Map arguments) { - this.getOrCreateArguments().putAll(arguments); - return this; - } - - public ExchangeBuilder alternate(String exchange) { - return withArgument("alternate-exchange", exchange); - } - - /** - * Set the internal flag. - * @return the builder. - */ - public ExchangeBuilder internal() { - this.internal = true; - return this; - } - - /** - * Set the delayed flag. - * @return the builder. - */ - public ExchangeBuilder delayed() { - this.delayed = true; - return this; - } - - /** - * Switch on ignore exceptions such as mismatched properties when declaring. - * @return the builder. - * @since 2.0 - */ - public ExchangeBuilder ignoreDeclarationExceptions() { - this.ignoreDeclarationExceptions = true; - return this; - } - - /** - * Switch to disable declaration of the exchange by any admin. + * Return an {@code x-consistent-hash} exchange builder. + * @param name the name. * @return the builder. - * @since 2.1 + * @since 3.2 */ - public ExchangeBuilder suppressDeclaration() { - this.declare = false; - return this; + public static ConsistentHashExchangeBuilder consistentHashExchange(String name) { + return new ConsistentHashExchangeBuilder(name); } /** - * Admin instances, or admin bean names that should declare this exchange. - * @param admins the admins. - * @return the builder. - * @since 2.1 + * An {@link ExchangeBuilder} extension for the {@link ConsistentHashExchange}. + * + * @since 3.2 */ - public ExchangeBuilder admins(Object... admins) { - Assert.notNull(admins, "'admins' cannot be null"); - Assert.noNullElements(admins, "'admins' can't have null elements"); - this.declaringAdmins = Arrays.copyOf(admins, admins.length); - return this; - } - - @SuppressWarnings("unchecked") - public T build() { - AbstractExchange exchange; - if (ExchangeTypes.DIRECT.equals(this.type)) { - exchange = new DirectExchange(this.name, this.durable, this.autoDelete, getArguments()); - } - else if (ExchangeTypes.TOPIC.equals(this.type)) { - exchange = new TopicExchange(this.name, this.durable, this.autoDelete, getArguments()); + public static final class ConsistentHashExchangeBuilder extends BaseExchangeBuilder { + + /** + * Construct an instance of the builder for {@link ConsistentHashExchange}. + * + * @param name the exchange name + * @see ExchangeTypes + */ + public ConsistentHashExchangeBuilder(String name) { + super(name, ExchangeTypes.CONSISTENT_HASH); } - else if (ExchangeTypes.FANOUT.equals(this.type)) { - exchange = new FanoutExchange(this.name, this.durable, this.autoDelete, getArguments()); - } - else if (ExchangeTypes.HEADERS.equals(this.type)) { - exchange = new HeadersExchange(this.name, this.durable, this.autoDelete, getArguments()); + + public ConsistentHashExchangeBuilder hashHeader(String headerName) { + withArgument("hash-header", headerName); + return this; } - else { - exchange = new CustomExchange(this.name, this.type, this.durable, this.autoDelete, getArguments()); + + public ConsistentHashExchangeBuilder hashProperty(String propertyName) { + withArgument("hash-property", propertyName); + return this; } - exchange.setInternal(this.internal); - exchange.setDelayed(this.delayed); - exchange.setIgnoreDeclarationExceptions(this.ignoreDeclarationExceptions); - exchange.setShouldDeclare(this.declare); - if (!ObjectUtils.isEmpty(this.declaringAdmins)) { - exchange.setAdminsThatShouldDeclare(this.declaringAdmins); + + @Override + @SuppressWarnings("unchecked") + public ConsistentHashExchange build() { + return configureExchange( + new ConsistentHashExchange(this.name, this.durable, this.autoDelete, getArguments())); } - return (T) exchange; + } } diff --git a/spring-amqp/src/main/java/org/springframework/amqp/core/ExchangeTypes.java b/spring-amqp/src/main/java/org/springframework/amqp/core/ExchangeTypes.java index 291d940fe9..472d9ece8b 100644 --- a/spring-amqp/src/main/java/org/springframework/amqp/core/ExchangeTypes.java +++ b/spring-amqp/src/main/java/org/springframework/amqp/core/ExchangeTypes.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2019 the original author or authors. + * Copyright 2002-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -21,8 +21,9 @@ * * @author Mark Fisher * @author Gary Russell + * @author Artem Bilan */ -public abstract class ExchangeTypes { +public final class ExchangeTypes { /** * Direct exchange. @@ -44,9 +45,20 @@ public abstract class ExchangeTypes { */ public static final String HEADERS = "headers"; + /** + * Consistent Hash exchange. + * @since 3.2 + */ + public static final String CONSISTENT_HASH = "x-consistent-hash"; + /** * System exchange. + * @deprecated with no replacement (for removal): there is no such an exchange type in AMQP. */ + @Deprecated(since = "3.2", forRemoval = true) public static final String SYSTEM = "system"; + private ExchangeTypes() { + } + } diff --git a/spring-amqp/src/test/java/org/springframework/amqp/core/builder/BuilderTests.java b/spring-amqp/src/test/java/org/springframework/amqp/core/builder/BuilderTests.java index fff6e8a996..7bf19eec29 100644 --- a/spring-amqp/src/test/java/org/springframework/amqp/core/builder/BuilderTests.java +++ b/spring-amqp/src/test/java/org/springframework/amqp/core/builder/BuilderTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2019 the original author or authors. + * Copyright 2016-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -17,9 +17,11 @@ package org.springframework.amqp.core.builder; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException; import org.junit.jupiter.api.Test; +import org.springframework.amqp.core.ConsistentHashExchange; import org.springframework.amqp.core.DirectExchange; import org.springframework.amqp.core.Exchange; import org.springframework.amqp.core.ExchangeBuilder; @@ -31,8 +33,8 @@ /** * @author Gary Russell + * @author Artem Bilan * @since 1.6 - * */ public class BuilderTests { @@ -89,6 +91,23 @@ public void testExchangeBuilder() { assertThat(exchange.isDurable()).isTrue(); assertThat(exchange.isInternal()).isFalse(); assertThat(exchange.isDelayed()).isFalse(); + + exchange = ExchangeBuilder.consistentHashExchange("foo") + .ignoreDeclarationExceptions() + .hashHeader("my_header") + .build(); + + assertThat(exchange).isInstanceOf(ConsistentHashExchange.class); + assertThat((String) exchange.getArguments().get("hash-header")).isEqualTo("my_header"); + + assertThatIllegalArgumentException() + .isThrownBy(() -> + ExchangeBuilder.consistentHashExchange("wrong_exchange") + .hashHeader("my_header") + .hashProperty("my_property") + .build()) + .withMessage("The 'hash-header' and 'hash-property' are mutually exclusive."); + } } diff --git a/src/reference/antora/modules/ROOT/pages/amqp/abstractions.adoc b/src/reference/antora/modules/ROOT/pages/amqp/abstractions.adoc index 192c5c7f13..ca252a56c1 100644 --- a/src/reference/antora/modules/ROOT/pages/amqp/abstractions.adoc +++ b/src/reference/antora/modules/ROOT/pages/amqp/abstractions.adoc @@ -87,7 +87,13 @@ The behavior varies across these `Exchange` types in terms of how they handle bi For example, a `Direct` exchange lets a queue be bound by a fixed routing key (often the queue's name). A `Topic` exchange supports bindings with routing patterns that may include the '*' and '#' wildcards for 'exactly-one' and 'zero-or-more', respectively. The `Fanout` exchange publishes to all queues that are bound to it without taking any routing key into consideration. -For much more information about these and the other Exchange types, see xref:index.adoc#resources[Other Resources]. +For much more information about these and the other Exchange types, see https://www.rabbitmq.com/tutorials/amqp-concepts#exchanges[AMQP Exchanges]. + +Starting with version 3.2, the `ConsistentHashExchange` type has been introduced for convenience during application configuration phase. +It provided options like `x-consistent-hash` for an exchange type. +Allows to configure `hash-header` or `hash-property` exchange definition argument. +The respective RabbitMQ `rabbitmq_consistent_hash_exchange` plugin has to be enabled on the broker. +More information about the purpose, logic and behavior of the Consistent Hash Exchange are in the official RabbitMQ https://github.com/rabbitmq/rabbitmq-server/tree/main/deps/rabbitmq_consistent_hash_exchange[documentation]. NOTE: The AMQP specification also requires that any broker provide a "`default`" direct exchange that has no name. All queues that are declared are bound to that default `Exchange` with their names as routing keys. diff --git a/src/reference/antora/modules/ROOT/pages/appendix/previous-whats-new/changes-in-3-1-since-3-0.adoc b/src/reference/antora/modules/ROOT/pages/appendix/previous-whats-new/changes-in-3-1-since-3-0.adoc index 7c8327fcd2..171b94d77c 100644 --- a/src/reference/antora/modules/ROOT/pages/appendix/previous-whats-new/changes-in-3-1-since-3-0.adoc +++ b/src/reference/antora/modules/ROOT/pages/appendix/previous-whats-new/changes-in-3-1-since-3-0.adoc @@ -1,13 +1,13 @@ [[changes-in-3-1-since-3-0]] -== Changes in 3.1 Since 3.0 += Changes in 3.1 Since 3.0 [[java-17-spring-framework-6-1]] -=== Java 17, Spring Framework 6.1 +== Java 17, Spring Framework 6.1 This version requires Spring Framework 6.1 and Java 17. [[x31-exc]] -=== Exclusive Consumer Logging +== Exclusive Consumer Logging Log messages reporting access refusal due to exclusive consumers are now logged at DEBUG level by default. It remains possible to configure your own logging behavior by setting the `exclusiveConsumerExceptionLogger` and `closeExceptionLogger` properties on the listener container and connection factory respectively. @@ -16,7 +16,7 @@ A new method `logRestart()` has been added to the `ConditionalExceptionLogger` t See xref:amqp/receiving-messages/consumer-events.adoc[Consumer Events] and xref:amqp/connections.adoc#channel-close-logging[Logging Channel Close Events] for more information. [[x31-conn-backoff]] -=== Connections Enhancement +== Connections Enhancement Connection Factory supported backoff policy when creating connection channel. See xref:amqp/connections.adoc[Choosing a Connection Factory] for more information. diff --git a/src/reference/antora/modules/ROOT/pages/index.adoc b/src/reference/antora/modules/ROOT/pages/index.adoc index ab6ddca384..1648e79ad4 100644 --- a/src/reference/antora/modules/ROOT/pages/index.adoc +++ b/src/reference/antora/modules/ROOT/pages/index.adoc @@ -11,8 +11,4 @@ We provide a "`template`" as a high-level abstraction for sending and receiving We also provide support for message-driven POJOs. These libraries facilitate management of AMQP resources while promoting the use of dependency injection and declarative configuration. In all of these cases, you can see similarities to the JMS support in the Spring Framework. -For other project-related information, visit the Spring AMQP project https://projects.spring.io/spring-amqp/[homepage]. - -(C) 2010 - 2023 - -Copies of this document may be made for your own use and for distribution to others, provided that you do not charge any fee for such copies and further provided that each copy contains this Copyright Notice, whether distributed in print or electronically. \ No newline at end of file +For other project-related information, visit the Spring AMQP project https://projects.spring.io/spring-amqp/[homepage]. \ No newline at end of file diff --git a/src/reference/antora/modules/ROOT/pages/whats-new.adoc b/src/reference/antora/modules/ROOT/pages/whats-new.adoc index 5c8c40f205..3258c28f44 100644 --- a/src/reference/antora/modules/ROOT/pages/whats-new.adoc +++ b/src/reference/antora/modules/ROOT/pages/whats-new.adoc @@ -8,4 +8,9 @@ [[spring-framework-6-2]] === Spring Framework 6.1 -This version requires Spring Framework 6.2. \ No newline at end of file +This version requires Spring Framework 6.2. + +[[x32-consistent-hash-exchange]] +=== Consistent Hash Exchange + +The convenient `ConsistentHashExchange` and respective `ExchangeBuilder.consistentHashExchange()` API has been introduced. \ No newline at end of file