diff --git a/stream/buession-springcloud-stream-core/src/main/java/com/buession/springcloud/stream/core/AbstractCustomer.java b/stream/buession-springcloud-stream-core/src/main/java/com/buession/springcloud/stream/core/AbstractCustomer.java new file mode 100644 index 0000000..b69257e --- /dev/null +++ b/stream/buession-springcloud-stream-core/src/main/java/com/buession/springcloud/stream/core/AbstractCustomer.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. + * See the NOTICE file distributed with this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and limitations under the License. + * + * ========================================================================================================= + * + * This software consists of voluntary contributions made by many individuals on behalf of the + * Apache Software Foundation. For more information on the Apache Software Foundation, please see + * . + * + * +-------------------------------------------------------------------------------------------------------+ + * | License: http://www.apache.org/licenses/LICENSE-2.0.txt | + * | Author: Yong.Teng | + * | Copyright @ 2013-2023 Buession.com Inc. | + * +-------------------------------------------------------------------------------------------------------+ + */ +package com.buession.springcloud.stream.core; + +import com.buession.core.utils.Assert; +import com.buession.lang.Status; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * 消息消费者抽象类 + * + * @param + * 消息类型 + * @param + * 消息消费 {@link Sink} + * + * @author Yong.Teng + * @since 2.3.0 + */ +public abstract class AbstractCustomer implements Customer { + + /** + * 消息消费 {@link Sink} + */ + protected S sink; + + protected Logger logger = LoggerFactory.getLogger(getClass()); + + /** + * 构造函数 + * + * @param sink + * 消息消费 {@link Sink} + */ + public AbstractCustomer(final S sink) { + Assert.isNull(sink, "Sink cloud not be null."); + this.sink = sink; + } + + @Override + public void onMessage(final M message) { + Assert.isNull(message, "Message cloud not be null."); + + if(consume(message) == Status.SUCCESS){ + logger.info("Message consume success."); + }else{ + logger.warn("Message consume failure."); + } + } + + protected abstract Status consume(final M payload); + +} diff --git a/stream/buession-springcloud-stream-core/src/main/java/com/buession/springcloud/stream/core/AbstractProducer.java b/stream/buession-springcloud-stream-core/src/main/java/com/buession/springcloud/stream/core/AbstractProducer.java new file mode 100644 index 0000000..d126330 --- /dev/null +++ b/stream/buession-springcloud-stream-core/src/main/java/com/buession/springcloud/stream/core/AbstractProducer.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. + * See the NOTICE file distributed with this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and limitations under the License. + * + * ========================================================================================================= + * + * This software consists of voluntary contributions made by many individuals on behalf of the + * Apache Software Foundation. For more information on the Apache Software Foundation, please see + * . + * + * +-------------------------------------------------------------------------------------------------------+ + * | License: http://www.apache.org/licenses/LICENSE-2.0.txt | + * | Author: Yong.Teng | + * | Copyright @ 2013-2023 Buession.com Inc. | + * +-------------------------------------------------------------------------------------------------------+ + */ +package com.buession.springcloud.stream.core; + +import com.buession.core.utils.Assert; +import com.buession.lang.Status; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.messaging.MessageHeaders; +import org.springframework.messaging.support.MessageBuilder; + +/** + * 消息生产者抽象类 + * + * @param + * 消息类型 + * @param + * 消息源 {@link Source} + * + * @author Yong.Teng + * @since 2.3.0 + */ +public abstract class AbstractProducer implements Producer { + + /** + * 消息源 {@link Source} + */ + protected S source; + + protected Logger logger = LoggerFactory.getLogger(getClass()); + + /** + * 构造函数 + * + * @param source + * 消息源 {@link Source} + */ + public AbstractProducer(final S source) { + Assert.isNull(source, "Source cloud not be null."); + this.source = source; + } + + @Override + public Status sendMessage(final M message, final MessageHeaders headers, final long timeout) { + Assert.isNull(message, "Message cloud not be null."); + + org.springframework.messaging.Message actualMessage = headers == null ? + MessageBuilder.withPayload(message).build() : MessageBuilder.createMessage(message, headers); + + if(source.output().send(actualMessage, timeout)){ + logger.info("Message send success."); + return Status.SUCCESS; + }else{ + logger.warn("Message send failure."); + return Status.FAILURE; + } + } + +} diff --git a/stream/buession-springcloud-stream-core/src/main/java/com/buession/springcloud/stream/core/Customer.java b/stream/buession-springcloud-stream-core/src/main/java/com/buession/springcloud/stream/core/Customer.java new file mode 100644 index 0000000..41ab94b --- /dev/null +++ b/stream/buession-springcloud-stream-core/src/main/java/com/buession/springcloud/stream/core/Customer.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. + * See the NOTICE file distributed with this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and limitations under the License. + * + * ========================================================================================================= + * + * This software consists of voluntary contributions made by many individuals on behalf of the + * Apache Software Foundation. For more information on the Apache Software Foundation, please see + * . + * + * +-------------------------------------------------------------------------------------------------------+ + * | License: http://www.apache.org/licenses/LICENSE-2.0.txt | + * | Author: Yong.Teng | + * | Copyright @ 2013-2023 Buession.com Inc. | + * +-------------------------------------------------------------------------------------------------------+ + */ +package com.buession.springcloud.stream.core; + +/** + * 消息消费者 + * + * @param + * 消息类型 + * + * @author Yong.Teng + * @since 2.3.0 + */ +public interface Customer { + + /** + * 消息消费 + * + * @param message + * 消息 + */ + void onMessage(final M message); + +} diff --git a/stream/buession-springcloud-stream-core/src/main/java/com/buession/springcloud/stream/core/Producer.java b/stream/buession-springcloud-stream-core/src/main/java/com/buession/springcloud/stream/core/Producer.java new file mode 100644 index 0000000..f6a4a7c --- /dev/null +++ b/stream/buession-springcloud-stream-core/src/main/java/com/buession/springcloud/stream/core/Producer.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. + * See the NOTICE file distributed with this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and limitations under the License. + * + * ========================================================================================================= + * + * This software consists of voluntary contributions made by many individuals on behalf of the + * Apache Software Foundation. For more information on the Apache Software Foundation, please see + * . + * + * +-------------------------------------------------------------------------------------------------------+ + * | License: http://www.apache.org/licenses/LICENSE-2.0.txt | + * | Author: Yong.Teng | + * | Copyright @ 2013-2023 Buession.com Inc. | + * +-------------------------------------------------------------------------------------------------------+ + */ +package com.buession.springcloud.stream.core; + +import com.buession.lang.Status; +import org.springframework.messaging.MessageChannel; +import org.springframework.messaging.MessageHeaders; + +/** + * 消息生产者 + * + * @param + * 消息类型 + * + * @author Yong.Teng + * @since 2.3.0 + */ +@FunctionalInterface +public interface Producer { + + /** + * 发送队列消息 + * + * @param message + * 消息 + * + * @return 消息发送结果 + */ + default Status sendMessage(final M message) { + return sendMessage(message, MessageChannel.INDEFINITE_TIMEOUT); + } + + /** + * 发送队列消息 + * + * @param message + * 消息 + * @param timeout + * 发送消息超时时间 + * + * @return 消息发送结果 + */ + default Status sendMessage(final M message, final long timeout) { + return sendMessage(message, null, timeout); + } + + /** + * 发送队列消息 + * + * @param message + * 消息 + * @param headers + * 消息头 + * + * @return 消息发送结果 + */ + default Status sendMessage(final M message, final MessageHeaders headers) { + return sendMessage(message, headers, MessageChannel.INDEFINITE_TIMEOUT); + } + + /** + * 发送队列消息 + * + * @param message + * 消息 + * @param headers + * 消息头 + * @param timeout + * 发送消息超时时间 + * + * @return 消息发送结果 + */ + Status sendMessage(final M message, final MessageHeaders headers, final long timeout); + +} diff --git a/stream/buession-springcloud-stream-core/src/main/java/com/buession/springcloud/stream/core/Sink.java b/stream/buession-springcloud-stream-core/src/main/java/com/buession/springcloud/stream/core/Sink.java new file mode 100644 index 0000000..de7b4c4 --- /dev/null +++ b/stream/buession-springcloud-stream-core/src/main/java/com/buession/springcloud/stream/core/Sink.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. + * See the NOTICE file distributed with this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and limitations under the License. + * + * ========================================================================================================= + * + * This software consists of voluntary contributions made by many individuals on behalf of the + * Apache Software Foundation. For more information on the Apache Software Foundation, please see + * . + * + * +-------------------------------------------------------------------------------------------------------+ + * | License: http://www.apache.org/licenses/LICENSE-2.0.txt | + * | Author: Yong.Teng | + * | Copyright @ 2013-2023 Buession.com Inc. | + * +-------------------------------------------------------------------------------------------------------+ + */ +package com.buession.springcloud.stream.core; + +import org.springframework.messaging.SubscribableChannel; + +/** + * 消息消费 + * + * @author Yong.Teng + * @since 2.3.0 + */ +@FunctionalInterface +public interface Sink { + + /** + * 消息输出通道 + * + * @return {@link SubscribableChannel} + */ + SubscribableChannel input(); + +} diff --git a/stream/buession-springcloud-stream-core/src/main/java/com/buession/springcloud/stream/core/Source.java b/stream/buession-springcloud-stream-core/src/main/java/com/buession/springcloud/stream/core/Source.java new file mode 100644 index 0000000..d5ea828 --- /dev/null +++ b/stream/buession-springcloud-stream-core/src/main/java/com/buession/springcloud/stream/core/Source.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. + * See the NOTICE file distributed with this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and limitations under the License. + * + * ========================================================================================================= + * + * This software consists of voluntary contributions made by many individuals on behalf of the + * Apache Software Foundation. For more information on the Apache Software Foundation, please see + * . + * + * +-------------------------------------------------------------------------------------------------------+ + * | License: http://www.apache.org/licenses/LICENSE-2.0.txt | + * | Author: Yong.Teng | + * | Copyright @ 2013-2023 Buession.com Inc. | + * +-------------------------------------------------------------------------------------------------------+ + */ +package com.buession.springcloud.stream.core; + +import org.springframework.messaging.MessageChannel; + +/** + * 消息源 + * + * @author Yong.Teng + * @since 2.3.0 + */ +@FunctionalInterface +public interface Source { + + /** + * 消息输出通道 + * + * @return {@link MessageChannel} + */ + MessageChannel output(); + +} + diff --git a/stream/buession-springcloud-stream-core/src/main/java/com/buession/springcloud/stream/core/package-info.java b/stream/buession-springcloud-stream-core/src/main/java/com/buession/springcloud/stream/core/package-info.java new file mode 100644 index 0000000..a79e650 --- /dev/null +++ b/stream/buession-springcloud-stream-core/src/main/java/com/buession/springcloud/stream/core/package-info.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. + * See the NOTICE file distributed with this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and limitations under the License. + * + * ========================================================================================================= + * + * This software consists of voluntary contributions made by many individuals on behalf of the + * Apache Software Foundation. For more information on the Apache Software Foundation, please see + * . + * + * +-------------------------------------------------------------------------------------------------------+ + * | License: http://www.apache.org/licenses/LICENSE-2.0.txt | + * | Author: Yong.Teng | + * | Copyright @ 2013-2023 Buession.com Inc. | + * +-------------------------------------------------------------------------------------------------------+ + */ +/** + * @author Yong.Teng + * @since 2.3.0 + */ +package com.buession.springcloud.stream.core; \ No newline at end of file