Skip to content

Commit

Permalink
Merge branch 'master' into feature_Support_LRU_User_Info_Cache
Browse files Browse the repository at this point in the history
  • Loading branch information
cmgyqjj authored Sep 22, 2024
2 parents 0823d51 + 218d1d4 commit 6aed87f
Show file tree
Hide file tree
Showing 8 changed files with 58 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -395,4 +395,26 @@ public void testClose() throws Exception {
super.stopSingle();
}

@Test
public void testIncorrectUser() throws Exception {
super.starSingleServer();
super.startRoute();
String routeUrl = "http://localhost:8083";
String cj = "xx";
long id = 100L;
var auth1 = ClientConfigurationData.Auth.builder()
.userId(id)
.userName(cj)
.build();

Client client1 = Client.builder()
.auth(auth1)
.routeUrl(routeUrl)
.build();
TimeUnit.SECONDS.sleep(3);

Assertions.assertDoesNotThrow(client1::close);

super.stopSingle();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import jakarta.annotation.Resource;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
Expand Down Expand Up @@ -63,7 +64,7 @@ public class RouteController implements RouteApi {
@RequestMapping(value = "groupRoute", method = RequestMethod.POST)
@ResponseBody()
@Override
public BaseResponse<NULLBody> groupRoute(@RequestBody ChatReqVO groupReqVO) throws Exception {
public BaseResponse<NULLBody> groupRoute(@RequestBody ChatReqVO groupReqVO) {
BaseResponse<NULLBody> res = new BaseResponse();

log.info("msg=[{}]", groupReqVO.toString());
Expand All @@ -75,8 +76,8 @@ public BaseResponse<NULLBody> groupRoute(@RequestBody ChatReqVO groupReqVO) thro
CIMServerResVO cimServerResVO = cimServerResVOEntry.getValue();
if (userId.equals(groupReqVO.getUserId())) {
//过滤掉自己
CIMUserInfo cimUserInfo = userInfoCacheService.loadUserInfoByUserId(groupReqVO.getUserId());
log.warn("过滤掉了发送者 userId={}", cimUserInfo.toString());
Optional<CIMUserInfo> cimUserInfo = userInfoCacheService.loadUserInfoByUserId(groupReqVO.getUserId());
cimUserInfo.ifPresent(userInfo -> log.warn("过滤掉了发送者 userId={}", userInfo.toString()));
continue;
}

Expand All @@ -102,7 +103,7 @@ public BaseResponse<NULLBody> groupRoute(@RequestBody ChatReqVO groupReqVO) thro
@RequestMapping(value = "p2pRoute", method = RequestMethod.POST)
@ResponseBody()
@Override
public BaseResponse<NULLBody> p2pRoute(@RequestBody P2PReqVO p2pRequest) throws Exception {
public BaseResponse<NULLBody> p2pRoute(@RequestBody P2PReqVO p2pRequest) {
BaseResponse<NULLBody> res = new BaseResponse();

try {
Expand Down Expand Up @@ -131,10 +132,12 @@ public BaseResponse<NULLBody> p2pRoute(@RequestBody P2PReqVO p2pRequest) throws
public BaseResponse<NULLBody> offLine(@RequestBody ChatReqVO groupReqVO) {
BaseResponse<NULLBody> res = new BaseResponse();

CIMUserInfo cimUserInfo = userInfoCacheService.loadUserInfoByUserId(groupReqVO.getUserId());
Optional<CIMUserInfo> cimUserInfo = userInfoCacheService.loadUserInfoByUserId(groupReqVO.getUserId());

log.info("user [{}] offline!", cimUserInfo.toString());
accountService.offLine(groupReqVO.getUserId());
cimUserInfo.ifPresent(userInfo -> {
log.info("user [{}] offline!", userInfo.toString());
accountService.offLine(groupReqVO.getUserId());
});

res.setCode(StatusEnum.SUCCESS.getCode());
res.setMessage(StatusEnum.SUCCESS.getMessage());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ public interface AccountService {
* @param sendUserId 发送者的ID
* @throws Exception
*/
void pushMsg(CIMServerResVO cimServerResVO, long sendUserId , ChatReqVO groupReqVO) throws Exception;
void pushMsg(CIMServerResVO cimServerResVO, long sendUserId , ChatReqVO groupReqVO);

/**
* 用户下线
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.crossoverjie.cim.common.pojo.CIMUserInfo;

import java.util.Optional;
import java.util.Set;

/**
Expand All @@ -19,7 +20,7 @@ public interface UserInfoCacheService {
* @return
* @throws Exception
*/
CIMUserInfo loadUserInfoByUserId(Long userId) ;
Optional<CIMUserInfo> loadUserInfoByUserId(Long userId) ;

/**
* 保存和检查用户登录情况
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

import static com.crossoverjie.cim.common.enums.StatusEnum.OFF_LINE;
import static com.crossoverjie.cim.route.constant.Constant.*;
Expand All @@ -53,9 +54,6 @@ public class AccountServiceRedisImpl implements AccountService {
@Resource
private UserInfoCacheService userInfoCacheService;

@Resource
private OkHttpClient okHttpClient;

@Resource
private ServerApi serverApi;

Expand Down Expand Up @@ -139,27 +137,32 @@ public CIMServerResVO loadRouteRelatedByUserId(Long userId) {
}

RouteInfo parse = RouteInfoParseUtil.parse(value);
CIMServerResVO cimServerResVO = new CIMServerResVO(parse.getIp(), parse.getCimServerPort(), parse.getHttpPort());
CIMServerResVO cimServerResVO =
new CIMServerResVO(parse.getIp(), parse.getCimServerPort(), parse.getHttpPort());
return cimServerResVO;
}

private void parseServerInfo(Map<Long, CIMServerResVO> routes, String key) {
long userId = Long.valueOf(key.split(":")[1]);
String value = redisTemplate.opsForValue().get(key);
RouteInfo parse = RouteInfoParseUtil.parse(value);
CIMServerResVO cimServerResVO = new CIMServerResVO(parse.getIp(), parse.getCimServerPort(), parse.getHttpPort());
CIMServerResVO cimServerResVO =
new CIMServerResVO(parse.getIp(), parse.getCimServerPort(), parse.getHttpPort());
routes.put(userId, cimServerResVO);
}


@Override
public void pushMsg(CIMServerResVO cimServerResVO, long sendUserId, ChatReqVO groupReqVO) throws Exception {
CIMUserInfo cimUserInfo = userInfoCacheService.loadUserInfoByUserId(sendUserId);
public void pushMsg(CIMServerResVO cimServerResVO, long sendUserId, ChatReqVO groupReqVO) {
Optional<CIMUserInfo> cimUserInfo = userInfoCacheService.loadUserInfoByUserId(sendUserId);

String url = "http://" + cimServerResVO.getIp() + ":" + cimServerResVO.getHttpPort();
cimUserInfo.ifPresent(userInfo -> {
String url = "http://" + cimServerResVO.getIp() + ":" + cimServerResVO.getHttpPort();
SendMsgReqVO vo =
new SendMsgReqVO(userInfo.getUserName() + ":" + groupReqVO.getMsg(), groupReqVO.getUserId());
serverApi.sendMsg(vo, url);

SendMsgReqVO vo = new SendMsgReqVO(cimUserInfo.getUserName() + ":" + groupReqVO.getMsg(), groupReqVO.getUserId());
serverApi.sendMsg(vo, url);
});
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
Expand All @@ -37,11 +38,10 @@ public class UserInfoCacheServiceImpl implements UserInfoCacheService {
private LoadingCache<Long, CIMUserInfo> USER_INFO_MAP;

@Override
public CIMUserInfo loadUserInfoByUserId(Long userId) {
public Optional<CIMUserInfo> loadUserInfoByUserId(Long userId) {
//Retrieve user information using a second-level cache.
CIMUserInfo cimUserInfo = null;
cimUserInfo = USER_INFO_MAP.getUnchecked(userId);
return cimUserInfo;
CIMUserInfo = USER_INFO_MAP.getUnchecked(userId);
return Optional.ofNullable(cimUserInfo);
}

@Override
Expand All @@ -63,8 +63,9 @@ public Set<CIMUserInfo> onlineUser() {
if (set == null){
set = new HashSet<>(64) ;
}
CIMUserInfo cimUserInfo = loadUserInfoByUserId(Long.valueOf(member)) ;
set.add(cimUserInfo) ;
Optional<CIMUserInfo> cimUserInfo = loadUserInfoByUserId(Long.valueOf(member)) ;

cimUserInfo.ifPresent(set::add);
}

return set;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,15 @@ public interface RouteApi {
* @return
* @throws Exception
*/
BaseResponse<NULLBody> groupRoute(ChatReqVO groupReqVO) throws Exception;
BaseResponse<NULLBody> groupRoute(ChatReqVO groupReqVO);

/**
* Point to point chat
* @param p2pRequest
* @return
* @throws Exception
*/
BaseResponse<NULLBody> p2pRoute(P2PReqVO p2pRequest) throws Exception;
BaseResponse<NULLBody> p2pRoute(P2PReqVO p2pRequest);


/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,5 +20,5 @@ public interface ServerApi {
* @return
* @throws Exception
*/
BaseResponse<SendMsgResVO> sendMsg(SendMsgReqVO sendMsgReqVO, @DynamicUrl String url) throws Exception;
BaseResponse<SendMsgResVO> sendMsg(SendMsgReqVO sendMsgReqVO, @DynamicUrl String url);
}

0 comments on commit 6aed87f

Please sign in to comment.