Skip to content

Commit

Permalink
Merge pull request #164 from crossoverJie/optimization-proto
Browse files Browse the repository at this point in the history
Protocol add properties
  • Loading branch information
crossoverJie authored Oct 21, 2024
2 parents 48adb95 + c72deb2 commit 02a8a38
Show file tree
Hide file tree
Showing 17 changed files with 79 additions and 63 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ Using `CIM`, you can achieve the following requirements:
| [群聊](https://youtu.be/_9a4lIkQ5_o) [私聊](https://youtu.be/kfEfQFPLBTQ) | [群聊](https://www.bilibili.com/video/av39405501) [私聊](https://www.bilibili.com/video/av39405821) |
| <img src="https://i.loli.net//2019//05//08//5cd1d9e788004.jpg" height="295px" /> | <img src="https://i.loli.net//2019//05//08//5cd1da2f943c5.jpg" height="295px" />

![demo.gif](pic/demo.gif)

## TODO LIST

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public static class Auth{

@JsonIgnore
private MessageListener messageListener =
(client, msg) -> System.out.printf("id:[%s] msg:[%s]%n \n", client.getAuth(), msg);
(client, properties, msg) -> System.out.printf("id:[%s] msg:[%s]%n \n", client.getAuth(), msg);

@JsonIgnore
private OkHttpClient okHttpClient = new OkHttpClient();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,10 @@
import com.crossoverjie.cim.client.sdk.ReConnectManager;
import com.crossoverjie.cim.client.sdk.RouteManager;
import com.crossoverjie.cim.client.sdk.io.CIMClientHandleInitializer;
import com.crossoverjie.cim.common.constant.Constants;
import com.crossoverjie.cim.common.exception.CIMException;
import com.crossoverjie.cim.common.kit.HeartBeatHandler;
import com.crossoverjie.cim.common.pojo.CIMUserInfo;
import com.crossoverjie.cim.common.protocol.BaseCommand;
import com.crossoverjie.cim.common.protocol.Request;
import com.crossoverjie.cim.route.api.vo.req.ChatReqVO;
import com.crossoverjie.cim.route.api.vo.req.LoginReqVO;
Expand Down Expand Up @@ -82,7 +82,7 @@ public ClientImpl(ClientConfigurationData conf) {
heartBeatPacket = Request.newBuilder()
.setRequestId(this.conf.getAuth().getUserId())
.setReqMsg("ping")
.setType(Constants.CommandType.PING)
.setCmd(BaseCommand.PING)
.build();
client = this;

Expand Down Expand Up @@ -177,7 +177,7 @@ private void loginServer() {
Request login = Request.newBuilder()
.setRequestId(this.conf.getAuth().getUserId())
.setReqMsg(this.conf.getAuth().getUserName())
.setType(Constants.CommandType.LOGIN)
.setCmd(BaseCommand.LOGIN_REQUEST)
.build();
channel.writeAndFlush(login)
.addListener((ChannelFutureListener) channelFuture ->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

import com.crossoverjie.cim.client.sdk.ClientState;
import com.crossoverjie.cim.client.sdk.impl.ClientImpl;
import com.crossoverjie.cim.common.constant.Constants;
import com.crossoverjie.cim.common.protocol.BaseCommand;
import com.crossoverjie.cim.common.protocol.Response;
import com.crossoverjie.cim.common.util.NettyAttrUtil;
import io.netty.channel.ChannelFutureListener;
Expand Down Expand Up @@ -60,15 +60,15 @@ public void channelInactive(ChannelHandlerContext ctx) {
protected void channelRead0(ChannelHandlerContext ctx, Response msg) {


if (msg.getType() == Constants.CommandType.PING) {
if (msg.getCmd() == BaseCommand.PING) {
ClientImpl.getClient().getConf().getEvent().debug("received ping from server");
NettyAttrUtil.updateReaderTime(ctx.channel(), System.currentTimeMillis());
}

if (msg.getType() != Constants.CommandType.PING) {
if (msg.getCmd() != BaseCommand.PING) {
// callback
ClientImpl.getClient().getConf().getCallbackThreadPool().execute(() -> {
ClientImpl.getClient().getConf().getMessageListener().received(ClientImpl.getClient(), msg.getResMsg());
ClientImpl.getClient().getConf().getMessageListener().received(ClientImpl.getClient(), msg.getPropertiesMap(), msg.getResMsg());
});
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
package com.crossoverjie.cim.client.sdk.io;

import com.crossoverjie.cim.client.sdk.Client;
import java.util.Map;

public interface MessageListener {

/**
* @param client client
* @param msg msgs
* @param client client
* @param properties meta data
* @param msg msgs
*/
void received(Client client, String msg);
void received(Client client, Map<String, String> properties, String msg);
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import com.crossoverjie.cim.client.sdk.impl.ClientConfigurationData;
import com.crossoverjie.cim.client.sdk.io.backoff.RandomBackoff;
import com.crossoverjie.cim.client.sdk.route.AbstractRouteBaseTest;
import com.crossoverjie.cim.common.constant.Constants;
import com.crossoverjie.cim.common.pojo.CIMUserInfo;
import com.crossoverjie.cim.route.api.vo.req.P2PReqVO;
import com.crossoverjie.cim.route.api.vo.res.CIMServerResVO;
Expand Down Expand Up @@ -63,7 +64,11 @@ public void groupChat() throws Exception {
Client client2 = Client.builder()
.auth(auth2)
.routeUrl(routeUrl)
.messageListener((client, message) -> client2Receive.set(message))
.messageListener((client, properties, message) -> {
client2Receive.set(message);
Assertions.assertEquals(properties.get(Constants.MetaKey.USER_ID), String.valueOf(auth1.getUserId()));
Assertions.assertEquals(properties.get(Constants.MetaKey.USER_NAME), auth1.getUserName());
})
.build();
TimeUnit.SECONDS.sleep(3);
ClientState.State state2 = client2.getState();
Expand Down Expand Up @@ -91,7 +96,7 @@ public void groupChat() throws Exception {
});

Awaitility.await().untilAsserted(
() -> Assertions.assertEquals(String.format("crossoverJie:%s", msg), client2Receive.get()));
() -> Assertions.assertEquals(msg, client2Receive.get()));
super.stopSingle();
}

Expand Down Expand Up @@ -139,7 +144,7 @@ public void testP2PChat() throws Exception {
Client client2 = Client.builder()
.auth(auth2)
.routeUrl(routeUrl)
.messageListener((client, message) -> client2Receive.set(message))
.messageListener((client, properties, message) -> client2Receive.set(message))
.build();
TimeUnit.SECONDS.sleep(3);
ClientState.State state2 = client2.getState();
Expand All @@ -156,7 +161,7 @@ public void testP2PChat() throws Exception {
Client client3 = Client.builder()
.auth(auth3)
.routeUrl(routeUrl)
.messageListener((client, message) -> {
.messageListener((client, properties, message) -> {
log.info("client3 receive message = {}", message);
client3Receive.set(message);
})
Expand Down Expand Up @@ -192,7 +197,7 @@ public void testP2PChat() throws Exception {
});

Awaitility.await().untilAsserted(
() -> Assertions.assertEquals(String.format("%s:%s", cj, msg), client3Receive.get()));
() -> Assertions.assertEquals(msg, client3Receive.get()));
Awaitility.await().untilAsserted(
() -> Assertions.assertNull(client2Receive.get()));
super.stopSingle();
Expand Down Expand Up @@ -244,7 +249,7 @@ public void testReconnect() throws Exception {
Client client2 = Client.builder()
.auth(auth2)
.routeUrl(routeUrl)
.messageListener((client, message) -> client2Receive.set(message))
.messageListener((client, properties, message) -> client2Receive.set(message))
.backoffStrategy(backoffStrategy)
.build();
TimeUnit.SECONDS.sleep(3);
Expand All @@ -260,7 +265,7 @@ public void testReconnect() throws Exception {
String msg = "hello";
client1.sendGroup(msg);
Awaitility.await()
.untilAsserted(() -> Assertions.assertEquals(String.format("cj:%s", msg), client2Receive.get()));
.untilAsserted(() -> Assertions.assertEquals(msg, client2Receive.get()));
client2Receive.set("");


Expand All @@ -287,7 +292,7 @@ public void testReconnect() throws Exception {
log.info("send message again, client2Receive = {}", client2Receive.get());
client1.sendGroup(msg);
Awaitility.await()
.untilAsserted(() -> Assertions.assertEquals(String.format("cj:%s", msg), client2Receive.get()));
.untilAsserted(() -> Assertions.assertEquals(msg, client2Receive.get()));
super.stopTwoServer();
}

Expand Down Expand Up @@ -327,7 +332,7 @@ public void offLineAndOnline() throws Exception {
Client client2 = Client.builder()
.auth(auth2)
.routeUrl(routeUrl)
.messageListener((client, message) -> client2Receive.set(message))
.messageListener((client, properties, message) -> client2Receive.set(message))
// Avoid auto reconnect, this test will manually close client.
.reconnectCheck((client) -> false)
.build();
Expand All @@ -344,7 +349,7 @@ public void offLineAndOnline() throws Exception {
String msg = "hello";
client1.sendGroup(msg);
Awaitility.await().untilAsserted(
() -> Assertions.assertEquals(String.format("crossoverJie:%s", msg), client2Receive.get()));
() -> Assertions.assertEquals(msg, client2Receive.get()));
client2Receive.set("");

// Manually offline
Expand All @@ -353,7 +358,7 @@ public void offLineAndOnline() throws Exception {
client2 = Client.builder()
.auth(auth2)
.routeUrl(routeUrl)
.messageListener((client, message) -> client2Receive.set(message))
.messageListener((client, properties, message) -> client2Receive.set(message))
// Avoid to auto reconnect, this test will manually close client.
.reconnectCheck((client) -> false)
.build();
Expand All @@ -364,7 +369,7 @@ public void offLineAndOnline() throws Exception {
// send msg again
client1.sendGroup(msg);
Awaitility.await().untilAsserted(
() -> Assertions.assertEquals(String.format("crossoverJie:%s", msg), client2Receive.get()));
() -> Assertions.assertEquals(msg, client2Receive.get()));

super.stopSingle();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
import com.crossoverjie.cim.client.sdk.Event;
import com.crossoverjie.cim.client.sdk.io.MessageListener;
import com.crossoverjie.cim.client.service.MsgLogger;
import com.crossoverjie.cim.common.constant.Constants;
import java.util.Map;

/**
* Function:自定义收到消息回调
Expand All @@ -25,8 +27,9 @@ public MsgCallBackListener(MsgLogger msgLogger, Event event) {


@Override
public void received(Client client, String msg) {
this.msgLogger.log(msg);
this.event.info(msg);
public void received(Client client, Map<String, String> properties, String msg) {
String sendUserName = properties.getOrDefault(Constants.MetaKey.USER_NAME, "nobody");
this.msgLogger.log(sendUserName + ":" + msg);
this.event.info(sendUserName + ":" + msg);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,25 +21,9 @@ public class Constants {
*/
public static final String COUNTER_CLIENT_PUSH_COUNT = "counter.client.push.count" ;


/**
* 自定义报文类型
*/
public static class CommandType{
/**
* 登录
*/
public static final int LOGIN = 1 ;
/**
* 业务消息
*/
public static final int MSG = 2 ;

/**
* ping
*/
public static final int PING = 3 ;
public static class MetaKey {
public static final String USER_ID = "userId" ;
public static final String USER_NAME = "userName" ;
}


}
21 changes: 14 additions & 7 deletions cim-common/src/main/proto/cim.proto
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,21 @@ option java_package = "com.crossoverjie.cim.common.protocol";
option java_multiple_files = true;

message Request{
// todo source user info
int64 requestId = 2;
string reqMsg = 1;
int32 type = 3;
int64 requestId = 2;
string reqMsg = 1;
BaseCommand cmd = 3;
map<string, string> properties = 4;
}

message Response{
int64 responseId = 2;
string resMsg = 1;
int32 type = 3;
int64 responseId = 2;
string resMsg = 1;
BaseCommand cmd = 3;
map<string, string> properties = 4;
}

enum BaseCommand{
LOGIN_REQUEST = 0;
MESSAGE = 1;
PING = 2;
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.crossoverjie.cim.common.util;

import com.crossoverjie.cim.common.protocol.BaseCommand;
import com.crossoverjie.cim.common.protocol.Request;
import com.google.protobuf.InvalidProtocolBufferException;
import org.junit.Test;
Expand All @@ -11,7 +12,7 @@ public void testProtocol() throws InvalidProtocolBufferException {
Request protocol = Request.newBuilder()
.setRequestId(123L)
.setReqMsg("你好啊")
.setType(1)
.setCmd(BaseCommand.LOGIN_REQUEST)
.build();

byte[] encode = encode(protocol);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.crossoverjie.cim.route.service.impl;

import com.crossoverjie.cim.common.constant.Constants;
import com.crossoverjie.cim.common.core.proxy.RpcProxyManager;
import com.crossoverjie.cim.common.enums.StatusEnum;
import com.crossoverjie.cim.common.exception.CIMException;
Expand All @@ -10,6 +11,7 @@
import com.crossoverjie.cim.route.api.vo.req.LoginReqVO;
import com.crossoverjie.cim.route.api.vo.res.CIMServerResVO;
import com.crossoverjie.cim.route.api.vo.res.RegisterInfoResVO;
import com.crossoverjie.cim.route.constant.Constant;
import com.crossoverjie.cim.route.service.AccountService;
import com.crossoverjie.cim.route.service.UserInfoCacheService;
import com.crossoverjie.cim.server.api.ServerApi;
Expand Down Expand Up @@ -159,7 +161,11 @@ public void pushMsg(CIMServerResVO cimServerResVO, long sendUserId, ChatReqVO gr
cimUserInfo.ifPresent(userInfo -> {
String url = "http://" + cimServerResVO.getIp() + ":" + cimServerResVO.getHttpPort();
SendMsgReqVO vo =
new SendMsgReqVO(userInfo.getUserName() + ":" + groupReqVO.getMsg(), groupReqVO.getUserId());
new SendMsgReqVO(groupReqVO.getMsg(), groupReqVO.getUserId());
vo.setProperties(Map.of(
Constants.MetaKey.USER_ID, String.valueOf(sendUserId),
Constants.MetaKey.USER_NAME, userInfo.getUserName())
);
serverApi.sendMsg(vo, url);

});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,7 @@ public class UserInfoCacheServiceImpl implements UserInfoCacheService {
@Override
public Optional<CIMUserInfo> loadUserInfoByUserId(Long userId) {
//Retrieve user information using a second-level cache.
Optional<CIMUserInfo> cimUserInfo = userInfoMap.getUnchecked(userId);
return cimUserInfo;
return userInfoMap.getUnchecked(userId);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@

import io.swagger.v3.oas.annotations.media.Schema;
import jakarta.validation.constraints.NotNull;
import java.util.Map;
import lombok.Getter;
import lombok.Setter;

/**
* Function:
Expand All @@ -22,6 +25,10 @@ public class SendMsgReqVO extends BaseRequest {
@Schema(requiredMode = Schema.RequiredMode.REQUIRED, description = "userId", example = "11")
private Long userId ;

@Setter
@Getter
private Map<String, String> properties;

public SendMsgReqVO() {
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
package com.crossoverjie.cim.server.config;

import com.crossoverjie.cim.common.constant.Constants;
import com.crossoverjie.cim.common.core.proxy.RpcProxyManager;
import com.crossoverjie.cim.common.metastore.MetaStore;
import com.crossoverjie.cim.common.metastore.ZkMetaStoreImpl;
import com.crossoverjie.cim.common.protocol.BaseCommand;
import com.crossoverjie.cim.common.protocol.Request;
import com.crossoverjie.cim.route.api.RouteApi;
import jakarta.annotation.Resource;
Expand Down Expand Up @@ -54,7 +54,7 @@ public Request heartBeat() {
return Request.newBuilder()
.setRequestId(0L)
.setReqMsg("pong")
.setType(Constants.CommandType.PING)
.setCmd(BaseCommand.PING)
.build();
}

Expand Down
Loading

0 comments on commit 02a8a38

Please sign in to comment.