diff --git a/saturn-job-api/src/main/java/com/vip/saturn/job/AbstractSaturnBatchMsgJob.java b/saturn-job-api/src/main/java/com/vip/saturn/job/AbstractSaturnBatchMsgJob.java new file mode 100644 index 000000000..f70b1c5f1 --- /dev/null +++ b/saturn-job-api/src/main/java/com/vip/saturn/job/AbstractSaturnBatchMsgJob.java @@ -0,0 +1,164 @@ +/** + * Copyright 2016 vip.com. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + *

+ **/ + +package com.vip.saturn.job; + +import com.vip.saturn.job.BaseSaturnJob; +import com.vip.saturn.job.SaturnJobExecutionContext; +import com.vip.saturn.job.SaturnJobReturn; +import com.vip.saturn.job.msg.MsgHolder; + +import java.util.List; + +public abstract class AbstractSaturnBatchMsgJob extends BaseSaturnJob { + + /** + * 消息作业处理入口 + * @param jobName 作业名 + * @param shardItem 分片项 + * @param shardParam 分片参数 + * @param msgHolder 消息内容 + * @param shardingContext 其它参数信息 + * @return 返回执行结果 + * @throws InterruptedException 注意处理中断异常 + */ + public abstract SaturnJobReturn handleMsgJob(String jobName, Integer shardItem, String shardParam, + MsgHolder msgHolder, SaturnJobExecutionContext shardingContext) throws InterruptedException; + + /** + * 超时强杀之前调用此方法 + * @param jobName 作业名 + * @param shardItem 分片项 + * @param shardParam 分片参数 + * @param msgHolder 消息内容 + * @param shardingContext 其它参数信息 + */ + public void beforeTimeout(String jobName, Integer shardItem, String shardParam, MsgHolder msgHolder, + SaturnJobExecutionContext shardingContext) { + // 由作业类实现逻辑 + } + + /** + * 超时强杀之后调用此方法 + * @param jobName 作业名 + * @param shardItem 分片项 + * @param shardParam 分片参数 + * @param msgHolder 消息内容 + * @param shardingContext 其它参数信息 + */ + public void onTimeout(String jobName, Integer shardItem, String shardParam, MsgHolder msgHolder, + SaturnJobExecutionContext shardingContext) { + // 由作业类实现逻辑 + } + + /** + * 在saturn-console对作业立即终止,或者优雅退出超时,或者与zk失去连接时,都会在强杀业务线程之前调用此方法。 + *

+ * 注意,作业执行超时,强杀之前不会调用此方法,而是调用{@link #beforeTimeout(String, Integer, String, MsgHolder, SaturnJobExecutionContext)}方法。 + * @param jobName 作业名 + * @param shardItem 分片项 + * @param shardParam 分片参数 + * @param msgHolder 消息内容 + * @param shardingContext 其它参数信息 + */ + public void beforeForceStop(String jobName, Integer shardItem, String shardParam, MsgHolder msgHolder, + SaturnJobExecutionContext shardingContext) { + // 由作业类实现逻辑 + } + + /** + * 在saturn-console对作业立即终止,或者优雅退出超时,或者与zk失去连接时,都会在强杀业务线程之后调用此方法。 + *

+ * 注意,作业执行超时,强杀之后不会调用此方法,而是调用{@link #onTimeout(String, Integer, String, MsgHolder, SaturnJobExecutionContext)}方法。 + * @param jobName 作业名 + * @param shardItem 分片项 + * @param shardParam 分片参数 + * @param msgHolder 消息内容 + * @param shardingContext 其它参数信息 + */ + public void postForceStop(String jobName, Integer shardItem, String shardParam, MsgHolder msgHolder, + SaturnJobExecutionContext shardingContext) { + // 由作业类实现逻辑 + } + + /** + * 批量消息作业处理入口 + * @param jobName 作业名 + * @param shardItem 分片项 + * @param shardParam 分片参数 + * @param msgHolders 消息内容 + * @param shardingContext 其它参数信息 + * @return 返回执行结果 + * @throws InterruptedException 注意处理中断异常 + */ + public abstract SaturnJobReturn handleBatchMsgJob(String jobName, Integer shardItem, String shardParam, + List msgHolders, SaturnJobExecutionContext shardingContext) throws InterruptedException; + + /** + * 超时强杀之前调用此方法 + * @param jobName 作业名 + * @param shardItem 分片项 + * @param shardParam 分片参数 + * @param msgHolders 消息内容 + * @param shardingContext 其它参数信息 + */ + public void beforeTimeout(String jobName, Integer shardItem, String shardParam, List msgHolders, + SaturnJobExecutionContext shardingContext) { + // 由作业类实现逻辑 + } + + /** + * 超时强杀之后调用此方法 + * @param jobName 作业名 + * @param shardItem 分片项 + * @param shardParam 分片参数 + * @param msgHolders 消息内容 + * @param shardingContext 其它参数信息 + */ + public void onTimeout(String jobName, Integer shardItem, String shardParam, List msgHolders, + SaturnJobExecutionContext shardingContext) { + // 由作业类实现逻辑 + } + + /** + * 在saturn-console对作业立即终止,或者优雅退出超时,或者与zk失去连接时,都会在强杀业务线程之前调用此方法。 + *

+ * 注意,作业执行超时,强杀之前不会调用此方法,而是调用{@link #beforeTimeout(String, Integer, String, List, SaturnJobExecutionContext)}方法。 + * @param jobName 作业名 + * @param shardItem 分片项 + * @param shardParam 分片参数 + * @param msgHolders 消息内容 + * @param shardingContext 其它参数信息 + */ + public void beforeForceStop(String jobName, Integer shardItem, String shardParam, List msgHolders, + SaturnJobExecutionContext shardingContext) { + // 由作业类实现逻辑 + } + + /** + * 在saturn-console对作业立即终止,或者优雅退出超时,或者与zk失去连接时,都会在强杀业务线程之后调用此方法。 + *

+ * 注意,作业执行超时,强杀之后不会调用此方法,而是调用{@link #onTimeout(String, Integer, String, List, SaturnJobExecutionContext)}方法。 + * @param jobName 作业名 + * @param shardItem 分片项 + * @param shardParam 分片参数 + * @param msgHolders 消息内容 + * @param shardingContext 其它参数信息 + */ + public void postForceStop(String jobName, Integer shardItem, String shardParam, List msgHolders, + SaturnJobExecutionContext shardingContext) { + // 由作业类实现逻辑 + } + +} diff --git a/saturn-job-api/src/main/java/com/vip/saturn/job/SaturnJobReturn.java b/saturn-job-api/src/main/java/com/vip/saturn/job/SaturnJobReturn.java index 7d8357f6b..75c42b1e2 100644 --- a/saturn-job-api/src/main/java/com/vip/saturn/job/SaturnJobReturn.java +++ b/saturn-job-api/src/main/java/com/vip/saturn/job/SaturnJobReturn.java @@ -14,9 +14,12 @@ package com.vip.saturn.job; +import com.vip.saturn.job.msg.MsgHolder; +import com.vip.saturn.job.msg.SaturnDelayedLevel; + import java.io.Serializable; import java.lang.reflect.Field; -import java.util.Map; +import java.util.*; import java.util.concurrent.ConcurrentHashMap; /** @@ -31,9 +34,20 @@ public class SaturnJobReturn implements Serializable { public static final String MSG_CONSUME_STATUS_PROP_KEY = "consumeStatus"; + public static final String MSG_BATCH_CONSUME_SUCCESS_OFFSETS = "successOffsets"; + + public static final String MSG_BATCH_CONSUME_DISCARD_OFFSETS = "discardOffsets"; + + public static final String MSG_BATCH_CONSUME_DELAY_OFFSETS = "delayOffsets"; + + public static final String MSG_BATCH_CONSUME_DEFAULT_STATUS = "defaultConsumeStatus"; + + public static final String MSG_ALL = "MSG_ALL"; + + public static final String OFFSET_SEPERATOR = ","; + /** - * 支持16个延时等级的投递,默认情况按照重试次数依次使用不同延时来进行消息再投递;用户亦可修改每次重试的延时。 16个延时级别为: 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m - * 30m 1h; 用户可修改每次延迟的时间间隔; 其中delayLevel为1对应5s,16对应1h + * please refer to SaturnDelayedLevel */ public static final String DELAY_LEVEL_WHEN_RECONSUME_PROP_KEY = "delayLevel"; @@ -57,6 +71,10 @@ public class SaturnJobReturn implements Serializable { */ private Map prop; + public static SaturnJobReturnBuilder builder() { + return new SaturnJobReturnBuilder(); + } + /** * returnCode默认0(成功),errorGroup默认200(成功)。 * @see SaturnSystemReturnCode @@ -153,6 +171,9 @@ public void setProp(Map prop) { this.prop = prop; } + /** + * only use for single consume + */ public void reconsumeLater() { if (prop == null) { prop = new ConcurrentHashMap<>(); @@ -160,6 +181,10 @@ public void reconsumeLater() { prop.put(SaturnJobReturn.MSG_CONSUME_STATUS_PROP_KEY, SaturnConsumeStatus.RECONSUME_LATER.name()); } + /** + * only use for single consume + */ + @Deprecated public void reconsumeLater(int delayLevel) { if (prop == null) { prop = new ConcurrentHashMap<>(); @@ -168,10 +193,339 @@ public void reconsumeLater(int delayLevel) { prop.put(SaturnJobReturn.DELAY_LEVEL_WHEN_RECONSUME_PROP_KEY, String.valueOf(delayLevel)); } + /** + * only use for single consume + */ + public void reconsumeLater(SaturnDelayedLevel delayLevel) { + if (prop == null) { + prop = new ConcurrentHashMap<>(); + } + prop.put(SaturnJobReturn.MSG_CONSUME_STATUS_PROP_KEY, SaturnConsumeStatus.RECONSUME_LATER.name()); + prop.put(SaturnJobReturn.DELAY_LEVEL_WHEN_RECONSUME_PROP_KEY, String.valueOf(delayLevel.getValue())); + } + + /** + * only use for single consume + */ + public void complete() { + if (prop == null) { + prop = new ConcurrentHashMap<>(); + } + prop.put(SaturnJobReturn.MSG_CONSUME_STATUS_PROP_KEY, SaturnConsumeStatus.CONSUME_SUCCESS.name()); + } + + /** + * only use for single consume + */ + public void discard() { + if (prop == null) { + prop = new ConcurrentHashMap<>(); + } + prop.put(SaturnJobReturn.MSG_CONSUME_STATUS_PROP_KEY, SaturnConsumeStatus.CONSUME_DISCARD.name()); + } + + /** + * only use for batch consume + */ + public void completeAll() { + if (prop == null) { + prop = new ConcurrentHashMap<>(); + } + prop.put(MSG_BATCH_CONSUME_SUCCESS_OFFSETS, MSG_ALL); + } + + /** + * only use for batch consume + */ + public boolean isCompleteAll() { + if (prop == null) { + return false; + } + return MSG_ALL.equals(prop.get(MSG_BATCH_CONSUME_SUCCESS_OFFSETS)); + } + + /** + * only use for batch consume + */ + public void completeSome(List msgHolders) { + if (prop == null) { + prop = new ConcurrentHashMap<>(); + } + prop.put(MSG_BATCH_CONSUME_SUCCESS_OFFSETS, collectOffsetsToString(msgHolders)); + } + + /** + * only use for batch consume + */ + public List getCompleteOffsets() { + if (prop == null) { + return Collections.emptyList(); + } + String offsetsStr = prop.get(MSG_BATCH_CONSUME_SUCCESS_OFFSETS); + return parseOffsetsStr(offsetsStr); + } + + /** + * only use for batch consume + */ + public void reconsumeSome(List msgHolders) { + reconsumeSome(msgHolders, null); + } + + /** + * only use for batch consume + */ + public void reconsumeSome(List msgHolders, SaturnDelayedLevel delayLevel) { + if (prop == null) { + prop = new ConcurrentHashMap<>(); + } + prop.put(MSG_BATCH_CONSUME_DELAY_OFFSETS, collectOffsetsToString(msgHolders)); + if (delayLevel != null) { + prop.put(SaturnJobReturn.DELAY_LEVEL_WHEN_RECONSUME_PROP_KEY, String.valueOf(delayLevel.getValue())); + } + } + + /** + * only use for batch consume + */ + public List getReconsumeOffsets() { + if (prop == null) { + return Collections.emptyList(); + } + String offsetsStr = prop.get(MSG_BATCH_CONSUME_DELAY_OFFSETS); + return parseOffsetsStr(offsetsStr); + } + + /** + * only use for batch consume + */ + public void reconsumeAllLater() { + reconsumeAllLater(null); + } + + /** + * only use for batch consume + */ + public void reconsumeAllLater(SaturnDelayedLevel delayLevel) { + if (prop == null) { + prop = new ConcurrentHashMap<>(); + } + prop.put(MSG_BATCH_CONSUME_DELAY_OFFSETS, MSG_ALL); + if (delayLevel != null) { + prop.put(SaturnJobReturn.DELAY_LEVEL_WHEN_RECONSUME_PROP_KEY, String.valueOf(delayLevel.getValue())); + } + } + + /** + * only use for batch consume + */ + public boolean isReconsumeAll() { + if (prop == null) { + return false; + } + return MSG_ALL.equals(prop.get(MSG_BATCH_CONSUME_DELAY_OFFSETS)); + } + + public String getDelayLevel() { + if (prop == null) { + return null; + } + return prop.get(SaturnJobReturn.DELAY_LEVEL_WHEN_RECONSUME_PROP_KEY); + } + + /** + * only use for batch consume + */ + public void discardSome(List msgHolders) { + if (prop == null) { + prop = new ConcurrentHashMap<>(); + } + prop.put(MSG_BATCH_CONSUME_DISCARD_OFFSETS, collectOffsetsToString(msgHolders)); + } + + /** + * only use for batch consume + */ + public List getDiscardOffsets() { + if (prop == null) { + return Collections.emptyList(); + } + String offsetsStr = prop.get(MSG_BATCH_CONSUME_DISCARD_OFFSETS); + return parseOffsetsStr(offsetsStr); + } + + /** + * only use for batch consume + */ + public void setBatchConsumeDefaultStatus(SaturnConsumeStatus consumeStatus) { + if (prop == null) { + prop = new ConcurrentHashMap<>(); + } + prop.put(MSG_BATCH_CONSUME_DEFAULT_STATUS, consumeStatus.name()); + } + + /** + * only use for batch consume + */ + public String getBatchConsumeDefaultStatus() { + if (prop == null) { + return null; + } + return prop.get(MSG_BATCH_CONSUME_DEFAULT_STATUS); + } + @Override public String toString() { return "SaturnJobReturn [returnCode=" + returnCode + ", returnMsg=" + returnMsg + ", errorGroup=" + errorGroup + ", prop=" + prop + "]"; } + private String collectOffsetsToString(List msgHolders) { + if (msgHolders == null && msgHolders.size() == 0) { + return ""; + } + StringBuilder sb = new StringBuilder(); + for (int i = 0; i < msgHolders.size(); i++) { + if (i > 0) { + sb.append(OFFSET_SEPERATOR); + } + sb.append(String.valueOf(msgHolders.get(i).getOffset())); + } + return sb.toString(); + } + + private List parseOffsetsStr(String offsetsStr) { + if (offsetsStr == null || offsetsStr.isEmpty()) { + return Collections.emptyList(); + } + String[] splits = offsetsStr.split(OFFSET_SEPERATOR); + return Arrays.asList(splits); + } + + + public static class SaturnJobReturnBuilder { + + private SaturnJobReturn saturnJobReturn; + + private SaturnJobReturnBuilder() { + this.saturnJobReturn = new SaturnJobReturn(); + } + + public SaturnJobReturn build() { + return saturnJobReturn; + } + + public SaturnJobReturnBuilder returnCode(int returnCode) { + saturnJobReturn.returnCode = returnCode; + return this; + } + + public SaturnJobReturnBuilder returnMsg(String returnMsg) { + saturnJobReturn.returnMsg = returnMsg; + return this; + } + + public SaturnJobReturnBuilder errorGroup(int errorGroup) { + saturnJobReturn.errorGroup = errorGroup; + return this; + } + + /** + * only use for single consume + */ + public SaturnJobReturnBuilder reconsumeLater() { + saturnJobReturn.reconsumeLater(); + return this; + } + + /** + * only use for single consume + */ + public SaturnJobReturnBuilder reconsumeLater(SaturnDelayedLevel delayLevel) { + saturnJobReturn.reconsumeLater(delayLevel); + return this; + } + + /** + * only use for single consume + */ + public SaturnJobReturnBuilder complete() { + saturnJobReturn.complete(); + return this; + } + + /** + * only use for single consume + */ + public SaturnJobReturnBuilder discard() { + saturnJobReturn.discard(); + return this; + } + + /** + * only use for batch consume + */ + public SaturnJobReturnBuilder completeAll() { + saturnJobReturn.completeAll(); + return this; + } + + /** + * only use for batch consume + */ + public SaturnJobReturnBuilder completeSome(List msgHolders) { + saturnJobReturn.completeSome(msgHolders); + return this; + } + + /** + * only use for batch consume + */ + public SaturnJobReturnBuilder reconsumeSome(List msgHolders) { + saturnJobReturn.reconsumeSome(msgHolders); + return this; + } + + /** + * only use for batch consume + */ + public SaturnJobReturnBuilder reconsumeSome(List msgHolders, SaturnDelayedLevel delayLevel) { + saturnJobReturn.reconsumeSome(msgHolders, delayLevel); + return this; + } + + /** + * only use for batch consume + */ + public SaturnJobReturnBuilder reconsumeAll() { + saturnJobReturn.reconsumeAllLater(); + return this; + } + + /** + * only use for batch consume + */ + public SaturnJobReturnBuilder reconsumeAll(SaturnDelayedLevel delayLevel) { + saturnJobReturn.reconsumeAllLater(delayLevel); + return this; + } + + /** + * only use for batch consume + */ + public SaturnJobReturnBuilder discardSome(List msgHolders) { + saturnJobReturn.discardSome(msgHolders); + return this; + } + + /** + * only use for batch consume + */ + public SaturnJobReturnBuilder batchConsumeDefaultStatus(SaturnConsumeStatus consumeStatus) { + saturnJobReturn.setBatchConsumeDefaultStatus(consumeStatus); + return this; + } + + } + } diff --git a/saturn-job-api/src/main/java/com/vip/saturn/job/msg/SaturnDelayedLevel.java b/saturn-job-api/src/main/java/com/vip/saturn/job/msg/SaturnDelayedLevel.java new file mode 100644 index 000000000..7470f6886 --- /dev/null +++ b/saturn-job-api/src/main/java/com/vip/saturn/job/msg/SaturnDelayedLevel.java @@ -0,0 +1,100 @@ +/** + * Copyright 2016 vip.com. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + *

+ **/ +package com.vip.saturn.job.msg; + +import java.util.*; +import java.util.concurrent.TimeUnit; + +/** + * 支持20个延时等级的投递,默认情况按照重试次数依次使用不同延时来进行消息再投递;用户亦可修改每次重试的延时。 + */ +public enum SaturnDelayedLevel { + DELAYED_LEVEL_5S(1, 5, TimeUnit.SECONDS), + + DELAYED_LEVEL_10S(2, 10, TimeUnit.SECONDS), + + DELAYED_LEVEL_15S(3, 15, TimeUnit.SECONDS), + + DELAYED_LEVEL_30S(4, 30, TimeUnit.SECONDS), + + DELAYED_LEVEL_45S(5, 45, TimeUnit.SECONDS), + + DELAYED_LEVEL_1M(6, 1, TimeUnit.MINUTES), + + DELAYED_LEVEL_2M(7, 2, TimeUnit.MINUTES), + + DELAYED_LEVEL_3M(8, 3, TimeUnit.MINUTES), + + DELAYED_LEVEL_4M(9, 4, TimeUnit.MINUTES), + + DELAYED_LEVEL_5M(10, 5, TimeUnit.MINUTES), + + DELAYED_LEVEL_6M(11, 6, TimeUnit.MINUTES), + + DELAYED_LEVEL_7M(12, 7, TimeUnit.MINUTES), + + DELAYED_LEVEL_8M(13, 8, TimeUnit.MINUTES), + + DELAYED_LEVEL_9M(14, 9, TimeUnit.MINUTES), + + DELAYED_LEVEL_10M(15, 10, TimeUnit.MINUTES), + + DELAYED_LEVEL_20M(16, 20, TimeUnit.MINUTES), + + DELAYED_LEVEL_30M(17, 30, TimeUnit.MINUTES), + + DELAYED_LEVEL_45M(18, 45, TimeUnit.MINUTES), + + DELAYED_LEVEL_1H(19, 1, TimeUnit.HOURS), + + DELAYED_LEVEL_2H(20, 2, TimeUnit.HOURS), + + DELAYED_LEVEL_NULL(21, 0, TimeUnit.MILLISECONDS); + + private final int value; + private final long durationMs; + + SaturnDelayedLevel(int value, long duration, TimeUnit unit) { + this.value = value; + this.durationMs = unit.toMillis(duration); + } + + public static List getAllDelayLevels() { + SaturnDelayedLevel[] values = SaturnDelayedLevel.values(); + List levels = new ArrayList<>(Arrays.asList(values)); + levels.remove(DELAYED_LEVEL_NULL); + return levels; + } + + public static SaturnDelayedLevel valueOf(int value) { + SaturnDelayedLevel[] allLevels = values(); + if (value <= 0 || value > validLength()) { + return DELAYED_LEVEL_NULL; + } + return allLevels[value - 1]; + } + + private static int validLength() { + return values().length - 1; + } + + public int getValue() { + return value; + } + + public long getDurationMs() { + return durationMs; + } + +}