Skip to content

Commit

Permalink
【stream-core】增加 Source、Sink、Producer、Customer
Browse files Browse the repository at this point in the history
  • Loading branch information
Yong.Teng committed Jul 15, 2023
1 parent fd67a1d commit 8c0f422
Show file tree
Hide file tree
Showing 7 changed files with 420 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.
* See the NOTICE file distributed with this work for additional information regarding copyright ownership.
* The ASF licenses this file to you under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and limitations under the License.
*
* =========================================================================================================
*
* This software consists of voluntary contributions made by many individuals on behalf of the
* Apache Software Foundation. For more information on the Apache Software Foundation, please see
* <http://www.apache.org/>.
*
* +-------------------------------------------------------------------------------------------------------+
* | License: http://www.apache.org/licenses/LICENSE-2.0.txt |
* | Author: Yong.Teng <[email protected]> |
* | Copyright @ 2013-2023 Buession.com Inc. |
* +-------------------------------------------------------------------------------------------------------+
*/
package com.buession.springcloud.stream.core;

import com.buession.core.utils.Assert;
import com.buession.lang.Status;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* 消息消费者抽象类
*
* @param <M>
* 消息类型
* @param <S>
* 消息消费 {@link Sink}
*
* @author Yong.Teng
* @since 2.3.0
*/
public abstract class AbstractCustomer<M, S extends Sink> implements Customer<M> {

/**
* 消息消费 {@link Sink}
*/
protected S sink;

protected Logger logger = LoggerFactory.getLogger(getClass());

/**
* 构造函数
*
* @param sink
* 消息消费 {@link Sink}
*/
public AbstractCustomer(final S sink) {
Assert.isNull(sink, "Sink cloud not be null.");
this.sink = sink;
}

@Override
public void onMessage(final M message) {
Assert.isNull(message, "Message cloud not be null.");

if(consume(message) == Status.SUCCESS){
logger.info("Message consume success.");
}else{
logger.warn("Message consume failure.");
}
}

protected abstract Status consume(final M payload);

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.
* See the NOTICE file distributed with this work for additional information regarding copyright ownership.
* The ASF licenses this file to you under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and limitations under the License.
*
* =========================================================================================================
*
* This software consists of voluntary contributions made by many individuals on behalf of the
* Apache Software Foundation. For more information on the Apache Software Foundation, please see
* <http://www.apache.org/>.
*
* +-------------------------------------------------------------------------------------------------------+
* | License: http://www.apache.org/licenses/LICENSE-2.0.txt |
* | Author: Yong.Teng <[email protected]> |
* | Copyright @ 2013-2023 Buession.com Inc. |
* +-------------------------------------------------------------------------------------------------------+
*/
package com.buession.springcloud.stream.core;

import com.buession.core.utils.Assert;
import com.buession.lang.Status;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.MessageBuilder;

/**
* 消息生产者抽象类
*
* @param <M>
* 消息类型
* @param <S>
* 消息源 {@link Source}
*
* @author Yong.Teng
* @since 2.3.0
*/
public abstract class AbstractProducer<M, S extends Source> implements Producer<M> {

/**
* 消息源 {@link Source}
*/
protected S source;

protected Logger logger = LoggerFactory.getLogger(getClass());

/**
* 构造函数
*
* @param source
* 消息源 {@link Source}
*/
public AbstractProducer(final S source) {
Assert.isNull(source, "Source cloud not be null.");
this.source = source;
}

@Override
public Status sendMessage(final M message, final MessageHeaders headers, final long timeout) {
Assert.isNull(message, "Message cloud not be null.");

org.springframework.messaging.Message<M> actualMessage = headers == null ?
MessageBuilder.withPayload(message).build() : MessageBuilder.createMessage(message, headers);

if(source.output().send(actualMessage, timeout)){
logger.info("Message send success.");
return Status.SUCCESS;
}else{
logger.warn("Message send failure.");
return Status.FAILURE;
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.
* See the NOTICE file distributed with this work for additional information regarding copyright ownership.
* The ASF licenses this file to you under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and limitations under the License.
*
* =========================================================================================================
*
* This software consists of voluntary contributions made by many individuals on behalf of the
* Apache Software Foundation. For more information on the Apache Software Foundation, please see
* <http://www.apache.org/>.
*
* +-------------------------------------------------------------------------------------------------------+
* | License: http://www.apache.org/licenses/LICENSE-2.0.txt |
* | Author: Yong.Teng <[email protected]> |
* | Copyright @ 2013-2023 Buession.com Inc. |
* +-------------------------------------------------------------------------------------------------------+
*/
package com.buession.springcloud.stream.core;

/**
* 消息消费者
*
* @param <M>
* 消息类型
*
* @author Yong.Teng
* @since 2.3.0
*/
public interface Customer<M> {

/**
* 消息消费
*
* @param message
* 消息
*/
void onMessage(final M message);

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.
* See the NOTICE file distributed with this work for additional information regarding copyright ownership.
* The ASF licenses this file to you under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and limitations under the License.
*
* =========================================================================================================
*
* This software consists of voluntary contributions made by many individuals on behalf of the
* Apache Software Foundation. For more information on the Apache Software Foundation, please see
* <http://www.apache.org/>.
*
* +-------------------------------------------------------------------------------------------------------+
* | License: http://www.apache.org/licenses/LICENSE-2.0.txt |
* | Author: Yong.Teng <[email protected]> |
* | Copyright @ 2013-2023 Buession.com Inc. |
* +-------------------------------------------------------------------------------------------------------+
*/
package com.buession.springcloud.stream.core;

import com.buession.lang.Status;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHeaders;

/**
* 消息生产者
*
* @param <M>
* 消息类型
*
* @author Yong.Teng
* @since 2.3.0
*/
@FunctionalInterface
public interface Producer<M> {

/**
* 发送队列消息
*
* @param message
* 消息
*
* @return 消息发送结果
*/
default Status sendMessage(final M message) {
return sendMessage(message, MessageChannel.INDEFINITE_TIMEOUT);
}

/**
* 发送队列消息
*
* @param message
* 消息
* @param timeout
* 发送消息超时时间
*
* @return 消息发送结果
*/
default Status sendMessage(final M message, final long timeout) {
return sendMessage(message, null, timeout);
}

/**
* 发送队列消息
*
* @param message
* 消息
* @param headers
* 消息头
*
* @return 消息发送结果
*/
default Status sendMessage(final M message, final MessageHeaders headers) {
return sendMessage(message, headers, MessageChannel.INDEFINITE_TIMEOUT);
}

/**
* 发送队列消息
*
* @param message
* 消息
* @param headers
* 消息头
* @param timeout
* 发送消息超时时间
*
* @return 消息发送结果
*/
Status sendMessage(final M message, final MessageHeaders headers, final long timeout);

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.
* See the NOTICE file distributed with this work for additional information regarding copyright ownership.
* The ASF licenses this file to you under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and limitations under the License.
*
* =========================================================================================================
*
* This software consists of voluntary contributions made by many individuals on behalf of the
* Apache Software Foundation. For more information on the Apache Software Foundation, please see
* <http://www.apache.org/>.
*
* +-------------------------------------------------------------------------------------------------------+
* | License: http://www.apache.org/licenses/LICENSE-2.0.txt |
* | Author: Yong.Teng <[email protected]> |
* | Copyright @ 2013-2023 Buession.com Inc. |
* +-------------------------------------------------------------------------------------------------------+
*/
package com.buession.springcloud.stream.core;

import org.springframework.messaging.SubscribableChannel;

/**
* 消息消费
*
* @author Yong.Teng
* @since 2.3.0
*/
@FunctionalInterface
public interface Sink {

/**
* 消息输出通道
*
* @return {@link SubscribableChannel}
*/
SubscribableChannel input();

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.
* See the NOTICE file distributed with this work for additional information regarding copyright ownership.
* The ASF licenses this file to you under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and limitations under the License.
*
* =========================================================================================================
*
* This software consists of voluntary contributions made by many individuals on behalf of the
* Apache Software Foundation. For more information on the Apache Software Foundation, please see
* <http://www.apache.org/>.
*
* +-------------------------------------------------------------------------------------------------------+
* | License: http://www.apache.org/licenses/LICENSE-2.0.txt |
* | Author: Yong.Teng <[email protected]> |
* | Copyright @ 2013-2023 Buession.com Inc. |
* +-------------------------------------------------------------------------------------------------------+
*/
package com.buession.springcloud.stream.core;

import org.springframework.messaging.MessageChannel;

/**
* 消息源
*
* @author Yong.Teng
* @since 2.3.0
*/
@FunctionalInterface
public interface Source {

/**
* 消息输出通道
*
* @return {@link MessageChannel}
*/
MessageChannel output();

}

Loading

0 comments on commit 8c0f422

Please sign in to comment.