From 29b4160b56b3b5f71cce82080d799081611718dd Mon Sep 17 00:00:00 2001 From: ShingmoYeung <525032303@qq.com> Date: Tue, 8 Aug 2023 17:48:48 +0800 Subject: [PATCH] feat: Modify pub&sub code under cluster (#5) * [NEW] Added maven-ci.yml [NEW] Added WatcherConstant.java [NEW] Added LettuceRedisWatcher.java [NEW] Added LettuceSubscriber.java [NEW] Added LettuceSubThread.java [NEW] Added LettuceRedisWatcherTest.java [UPDATE] Updated .gitignore [NEW] Added .releaserc.json [NEW] Added maven-settings.xml [NEW] Added pom.xml [UPDATE] Updated README.md * Refactor LettuceRedisWatcherTest.java to use local IP address instead of remote IP address for testing. * Update LettuceRedisWatcherTest.java to simplify watcher initialization and update logic. * Refactor Redis connection initialization and add support for Redis Cluster. - Add Redis URI constants. - Update Redis connection initialization in LettuceRedisWatcher. - Add constructors for different Redis connection types. - Add support for Redis Cluster in LettuceRedisWatcher. - Update LettuceRedisWatcherTest to test Redis Cluster connection. * Update LettuceRedisWatcherTest.java to use localhost as the redis server instead of the specific IP address. * Update node version from 16 to 18 in maven-ci.yml and add new semantic.yml. * Refactor semantic.yml to disable validation for the PR title and all the commits. * Refactor LettuceRedisWatcher.java and LettuceRedisWatcherTest.java --- .../watcher/lettuce/LettuceRedisWatcher.java | 14 ++++++++------ .../org/casbin/test/LettuceRedisWatcherTest.java | 4 +--- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/src/main/java/org/casbin/watcher/lettuce/LettuceRedisWatcher.java b/src/main/java/org/casbin/watcher/lettuce/LettuceRedisWatcher.java index 2776049..7ac0f4d 100644 --- a/src/main/java/org/casbin/watcher/lettuce/LettuceRedisWatcher.java +++ b/src/main/java/org/casbin/watcher/lettuce/LettuceRedisWatcher.java @@ -89,7 +89,11 @@ public void update() { if (statefulRedisPubSubConnection.isOpen()) { String msg = "Casbin policy has a new version from redis watcher: ".concat(this.localId); statefulRedisPubSubConnection.async().publish(this.redisChannelName, msg); + + Thread.sleep(100); } + } catch (InterruptedException e) { + throw new RuntimeException("Publish error! The localId: " + this.localId, e); } } @@ -110,8 +114,6 @@ private void startSub() { * @return AbstractRedisClient */ private AbstractRedisClient getLettuceRedisClient(String host, Integer port, String nodes, String password, int timeout, String type) { - // todo default standalone ? - // type = StringUtils.isEmpty(type) ? WatcherConstant.LETTUCE_REDIS_TYPE_STANDALONE : type; if (StringUtils.isNotEmpty(type) && StringUtils.equalsAnyIgnoreCase(type, WatcherConstant.LETTUCE_REDIS_TYPE_STANDALONE, WatcherConstant.LETTUCE_REDIS_TYPE_CLUSTER)) { ClientResources clientResources = DefaultClientResources.builder() @@ -146,9 +148,9 @@ private AbstractRedisClient getLettuceRedisClient(String host, Integer port, Str // cluster TimeoutOptions timeoutOptions = TimeoutOptions.builder().fixedTimeout(Duration.of(timeout, ChronoUnit.SECONDS)).build(); ClusterTopologyRefreshOptions topologyRefreshOptions = ClusterTopologyRefreshOptions.builder() - .enablePeriodicRefresh(Duration.of(10, ChronoUnit.MINUTES)) - .enableAdaptiveRefreshTrigger(ClusterTopologyRefreshOptions.RefreshTrigger.MOVED_REDIRECT, ClusterTopologyRefreshOptions.RefreshTrigger.PERSISTENT_RECONNECTS) - .adaptiveRefreshTriggersTimeout(Duration.of(30, ChronoUnit.SECONDS)) + .enablePeriodicRefresh(Duration.ofMinutes(10)) + .enableAdaptiveRefreshTrigger(ClusterTopologyRefreshOptions.RefreshTrigger.values()) + .adaptiveRefreshTriggersTimeout(Duration.ofSeconds(30)) .build(); ClusterClientOptions clusterClientOptions = ClusterClientOptions.builder() .autoReconnect(true) @@ -163,7 +165,7 @@ private AbstractRedisClient getLettuceRedisClient(String host, Integer port, Str WatcherConstant.REDIS_URI_PREFIX.concat(nodes); logger.info("Redis Cluster Uri: {}", redisUri); List redisURIList = RedisClusterURIUtil.toRedisURIs(URI.create(redisUri)); - RedisClusterClient redisClusterClient = RedisClusterClient.create(clientResources, redisURIList); + RedisClusterClient redisClusterClient = RedisClusterClient.create(clientResources, redisURIList.get(0)); redisClusterClient.setOptions(clusterClientOptions); return redisClusterClient; } diff --git a/src/test/java/org/casbin/test/LettuceRedisWatcherTest.java b/src/test/java/org/casbin/test/LettuceRedisWatcherTest.java index 71ee15d..e3069ab 100644 --- a/src/test/java/org/casbin/test/LettuceRedisWatcherTest.java +++ b/src/test/java/org/casbin/test/LettuceRedisWatcherTest.java @@ -25,7 +25,7 @@ public void initWatcher() { public void initClusterWatcher() { String redisTopic = "jcasbin-topic"; - // modify your cluster nodes + // modify your cluster nodes. any one of these nodes. this.lettuceRedisWatcher = new LettuceRedisWatcher("192.168.1.234:6380,192.168.1.234:6381,192.168.1.234:6382", redisTopic, 2000, "123456"); Enforcer enforcer = new Enforcer(); enforcer.setWatcher(this.lettuceRedisWatcher); @@ -35,7 +35,6 @@ public void initClusterWatcher() { public void testUpdate() throws InterruptedException { // this.initClusterWatcher(); this.lettuceRedisWatcher.update(); - Thread.sleep(100); } @Test @@ -44,7 +43,6 @@ public void testConsumerCallback() throws InterruptedException { // while (true) { this.lettuceRedisWatcher.setUpdateCallback((s) -> System.out.println(s)); this.lettuceRedisWatcher.update(); - Thread.sleep(100); // } }