From 1fea71b7a63814ac4a067a6a066956dbfaa771be Mon Sep 17 00:00:00 2001 From: fuchaohong Date: Tue, 23 Jul 2024 11:55:46 +0800 Subject: [PATCH 1/5] RBF: RouterObserverReadProxyProvider should perform an msync before executing the first read. --- .../ha/RouterObserverReadProxyProvider.java | 34 ++++++++++++- .../router/TestObserverWithRouter.java | 50 +++++++++++++++++++ 2 files changed, 83 insertions(+), 1 deletion(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/RouterObserverReadProxyProvider.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/RouterObserverReadProxyProvider.java index 9707a2a91c5c1..2af5feb2eb574 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/RouterObserverReadProxyProvider.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/RouterObserverReadProxyProvider.java @@ -82,6 +82,15 @@ public class RouterObserverReadProxyProvider extends AbstractNNFailoverProxyP */ private volatile long lastMsyncTimeMs = -1; + /** + * A client using RouterObserverReadProxyProvider should first sync with the + * Active NameNode on startup. This ensures that the client reads data which + * is consistent with the state of the world as of the time of its + * instantiation. This variable will be true after this initial sync has + * been performed. + */ + private volatile boolean msynced = false; + public RouterObserverReadProxyProvider(Configuration conf, URI uri, Class xface, HAProxyFactory factory) { this(conf, uri, xface, factory, new IPFailoverProxyProvider<>(conf, uri, xface, factory)); @@ -154,6 +163,23 @@ private ClientProtocol getProxyAsClientProtocol(T proxy) { return (ClientProtocol) proxy; } + /** + * This will call {@link ClientProtocol#msync()} on the active NameNode + * (via the {@link #innerProxy}) to initialize the state of this client. + * Calling it multiple times is a no-op; only the first will perform an + * msync. + * + * @see #msynced + */ + private synchronized void initializeMsync() throws IOException { + if (msynced) { + return; // No need for an msync + } + getProxyAsClientProtocol(innerProxy.getProxy().proxy).msync(); + msynced = true; + lastMsyncTimeMs = Time.monotonicNow(); + } + /** * This will call {@link ClientProtocol#msync()} on the active NameNode * (via the {@link #innerProxy}) to update the state of this client, only @@ -209,7 +235,13 @@ public void close() throws IOException { @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { if (observerReadEnabled && isRead(method)) { - autoMsyncIfNecessary(); + if (!msynced) { + // An msync() must first be performed to ensure that this client is + // up-to-date with the active's state. This will only be done once. + initializeMsync(); + } else { + autoMsyncIfNecessary(); + } } try { diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestObserverWithRouter.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestObserverWithRouter.java index 3f773efd63dc0..85c91584a9a72 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestObserverWithRouter.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestObserverWithRouter.java @@ -815,6 +815,56 @@ public void testPeriodicStateRefreshUsingActiveNamenode(ConfigSetting configSett initialLengthOfRootListing + 10, rootFolderAfterMkdir.length); } + @EnumSource(ConfigSetting.class) + @ParameterizedTest + public void testAutoMsyncDefault(ConfigSetting configSetting) throws Exception { + Configuration clientConfiguration = getConfToEnableObserverReads(configSetting); + fileSystem = routerContext.getFileSystem(clientConfiguration); + + List namenodes = routerContext + .getRouter().getNamenodeResolver() + .getNamenodesForNameserviceId(cluster.getNameservices().get(0), true); + assertEquals("First namenode should be observer", namenodes.get(0).getState(), + FederationNamenodeServiceState.OBSERVER); + Path path = new Path("/"); + + long rpcCountForActive; + long rpcCountForObserver; + + // Send read requests + int numListings = 15; + for (int i = 0; i < numListings; i++) { + fileSystem.listFiles(path, false); + } + fileSystem.close(); + + rpcCountForActive = routerContext.getRouter().getRpcServer() + .getRPCMetrics().getActiveProxyOps(); + + rpcCountForObserver = routerContext.getRouter().getRpcServer() + .getRPCMetrics().getObserverProxyOps(); + + switch (configSetting) { + case USE_NAMENODE_PROXY_FLAG: + // First read goes to active. + assertEquals("Calls sent to the active", 1, rpcCountForActive); + // The rest of the reads are sent to the observer. + assertEquals("Reads sent to observer", numListings - 1, rpcCountForObserver); + break; + case USE_ROUTER_OBSERVER_READ_PROXY_PROVIDER: + case USE_ROUTER_OBSERVER_READ_CONFIGURED_FAILOVER_PROXY_PROVIDER: + // An msync is sent to each active namenode for each read. + // Total msyncs will be (1 * num_of_nameservices). + assertEquals("Msyncs sent to the active namenodes", + NUM_NAMESERVICES * 1, rpcCountForActive); + // All reads should be sent of the observer. + assertEquals("Reads sent to observer", numListings, rpcCountForObserver); + break; + default: + Assertions.fail("Unknown config setting: " + configSetting); + } + } + @EnumSource(ConfigSetting.class) @ParameterizedTest public void testAutoMsyncEqualsZero(ConfigSetting configSetting) throws Exception { From 76eec42b477e5bc5e85acf3d14e1c114f7e4ee1d Mon Sep 17 00:00:00 2001 From: fuchaohong Date: Tue, 23 Jul 2024 17:32:20 +0800 Subject: [PATCH 2/5] triggle compile From c9b3dfd4c48cb693ebdcbf0654cee1deeadc564d Mon Sep 17 00:00:00 2001 From: fuchaohong <1783129294@qq.com> Date: Wed, 24 Jul 2024 17:20:51 +0800 Subject: [PATCH 3/5] Update TestObserverWithRouter.java --- .../hdfs/server/federation/router/TestObserverWithRouter.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestObserverWithRouter.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestObserverWithRouter.java index 85c91584a9a72..745159ffe2da5 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestObserverWithRouter.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestObserverWithRouter.java @@ -853,8 +853,7 @@ public void testAutoMsyncDefault(ConfigSetting configSetting) throws Exception { break; case USE_ROUTER_OBSERVER_READ_PROXY_PROVIDER: case USE_ROUTER_OBSERVER_READ_CONFIGURED_FAILOVER_PROXY_PROVIDER: - // An msync is sent to each active namenode for each read. - // Total msyncs will be (1 * num_of_nameservices). + // An msync is sent to each active namenode. assertEquals("Msyncs sent to the active namenodes", NUM_NAMESERVICES * 1, rpcCountForActive); // All reads should be sent of the observer. From 9fba56d69b922e8e1bd9d7a2fc5482899a6ea4dd Mon Sep 17 00:00:00 2001 From: fuchaohong Date: Thu, 7 Nov 2024 14:55:59 +0800 Subject: [PATCH 4/5] Fix UT. --- .../router/TestObserverWithRouter.java | 110 +++++++++++++----- 1 file changed, 82 insertions(+), 28 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestObserverWithRouter.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestObserverWithRouter.java index 85c91584a9a72..c99548e876c4a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestObserverWithRouter.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestObserverWithRouter.java @@ -226,8 +226,15 @@ public void internalTestObserverRead() long rpcCountForActive = routerContext.getRouter().getRpcServer() .getRPCMetrics().getActiveProxyOps(); - // Create and complete calls should be sent to active - assertEquals("Two calls should be sent to active", 2, rpcCountForActive); + + if (fileSystem.getConf().getBoolean( + HdfsClientConfigKeys.DFS_RBF_OBSERVER_READ_ENABLE, false)){ + // Create and complete calls should be sent to active + assertEquals("Two calls should be sent to active", 2, rpcCountForActive); + } else { + // Create, complete and 2 msyncs calls should be sent to active + assertEquals("Four calls should be sent to active", 4, rpcCountForActive); + } long rpcCountForObserver = routerContext.getRouter().getRpcServer() .getRPCMetrics().getObserverProxyOps(); @@ -258,8 +265,14 @@ public void testObserverReadWithoutFederatedStatePropagation(ConfigSetting confi long rpcCountForActive = routerContext.getRouter().getRpcServer() .getRPCMetrics().getActiveProxyOps(); - // Create, complete and getBlockLocations calls should be sent to active - assertEquals("Three calls should be sent to active", 3, rpcCountForActive); + if (fileSystem.getConf().getBoolean( + HdfsClientConfigKeys.DFS_RBF_OBSERVER_READ_ENABLE, false)){ + // Create, complete and getBlockLocations calls should be sent to active + assertEquals("Three calls should be sent to active", 3, rpcCountForActive); + } else { + // Create, complete, 2 msyncs and getBlockLocations calls should be sent to active + assertEquals("Five calls should be sent to active", 5, rpcCountForActive); + } long rpcCountForObserver = routerContext.getRouter().getRpcServer() .getRPCMetrics().getObserverProxyOps(); @@ -283,8 +296,14 @@ public void testDisablingObserverReadUsingNameserviceOverride(ConfigSetting conf long rpcCountForActive = routerContext.getRouter().getRpcServer() .getRPCMetrics().getActiveProxyOps(); - // Create, complete and read calls should be sent to active - assertEquals("Three calls should be sent to active", 3, rpcCountForActive); + if (fileSystem.getConf().getBoolean( + HdfsClientConfigKeys.DFS_RBF_OBSERVER_READ_ENABLE, false)){ + // Create, complete and read calls should be sent to active + assertEquals("Three calls should be sent to active", 3, rpcCountForActive); + } else { + // Create, complete, msync and read calls should be sent to active + assertEquals("Four calls should be sent to active", 4, rpcCountForActive); + } long rpcCountForObserver = routerContext.getRouter().getRpcServer() .getRPCMetrics().getObserverProxyOps(); @@ -310,9 +329,14 @@ public void testReadWhenObserverIsDown(ConfigSetting configSetting) throws Excep long rpcCountForActive = routerContext.getRouter().getRpcServer() .getRPCMetrics().getActiveProxyOps(); - // Create, complete and getBlockLocation calls should be sent to active - assertEquals("Three calls should be sent to active", 3, - rpcCountForActive); + if (fileSystem.getConf().getBoolean( + HdfsClientConfigKeys.DFS_RBF_OBSERVER_READ_ENABLE, false)){ + // Create, complete and getBlockLocation calls should be sent to active + assertEquals("Three calls should be sent to active", 3, rpcCountForActive); + } else { + // Create, complete, 2 msyncs and getBlockLocation calls should be sent to active + assertEquals("Five calls should be sent to active", 5, rpcCountForActive); + } long rpcCountForObserver = routerContext.getRouter().getRpcServer() .getRPCMetrics().getObserverProxyOps(); @@ -340,9 +364,15 @@ public void testMultipleObserver(ConfigSetting configSetting) throws Exception { long expectedActiveRpc = 2; long expectedObserverRpc = 1; - // Create and complete calls should be sent to active - assertEquals("Two calls should be sent to active", - expectedActiveRpc, rpcCountForActive); + if (fileSystem.getConf().getBoolean( + HdfsClientConfigKeys.DFS_RBF_OBSERVER_READ_ENABLE, false)){ + // Create and complete calls should be sent to active + assertEquals("Two calls should be sent to active", expectedActiveRpc, rpcCountForActive); + } else { + // Create, complete and 2 msyncs calls should be sent to active + expectedActiveRpc += 2; + assertEquals("Four calls should be sent to active", expectedActiveRpc, rpcCountForActive); + } long rpcCountForObserver = routerContext.getRouter() .getRpcServer().getRPCMetrics().getObserverProxyOps(); @@ -476,11 +506,14 @@ public void testUnavailableObserverNN(ConfigSetting configSetting) throws Except long rpcCountForActive = routerContext.getRouter().getRpcServer() .getRPCMetrics().getActiveProxyOps(); - // Create, complete and getBlockLocations - // calls should be sent to active. - assertEquals("Three calls should be send to active", - 3, rpcCountForActive); - + if (fileSystem.getConf().getBoolean( + HdfsClientConfigKeys.DFS_RBF_OBSERVER_READ_ENABLE, false)){ + // Create, complete and getBlockLocations calls should be sent to active + assertEquals("Three calls should be sent to active", 3, rpcCountForActive); + } else { + // Create, complete, 2 msyncs and getBlockLocations calls should be sent to active + assertEquals("Five calls should be sent to active", 5, rpcCountForActive); + } boolean hasUnavailable = false; for(String ns : cluster.getNameservices()) { @@ -540,13 +573,20 @@ public void testSingleRead(ConfigSetting configSetting) throws Exception { rpcCountForActive = routerContext.getRouter().getRpcServer() .getRPCMetrics().getActiveProxyOps(); - // getListingCall sent to active. - assertEquals("Only one call should be sent to active", 1, rpcCountForActive); - rpcCountForObserver = routerContext.getRouter().getRpcServer() .getRPCMetrics().getObserverProxyOps(); - // getList call should be sent to observer - assertEquals("No calls should be sent to observer", 0, rpcCountForObserver); + if (fileSystem.getConf().getBoolean( + HdfsClientConfigKeys.DFS_RBF_OBSERVER_READ_ENABLE, false)){ + // getList call sent to active. + assertEquals("Only one call should be sent to active", 1, rpcCountForActive); + // No call should send to observer + assertEquals("No calls should be sent to observer", 0, rpcCountForObserver); + } else { + // 2 msyncs calls should be sent to active + assertEquals("Two calls should be sent to active", 2, rpcCountForActive); + // getList call should be sent to observer + assertEquals("One call should be sent to observer", 1, rpcCountForObserver); + } } @Test @@ -735,10 +775,18 @@ public void testSharedStateInRouterStateIdContext(ConfigSetting configSetting) t long rpcCountForObserver = routerContext.getRouter().getRpcServer() .getRPCMetrics().getObserverProxyOps(); - // First list status goes to active - assertEquals("One call should be sent to active", 1, rpcCountForActive); - // Last two listStatuses go to observer. - assertEquals("Two calls should be sent to observer", 2, rpcCountForObserver); + if (fileSystem.getConf().getBoolean( + HdfsClientConfigKeys.DFS_RBF_OBSERVER_READ_ENABLE, false)){ + // First list status goes to active + assertEquals("One call should be sent to active", 1, rpcCountForActive); + // Last two listStatuses go to observer. + assertEquals("Two calls should be sent to observer", 2, rpcCountForObserver); + } else { + // 2 msyncs status go to active + assertEquals("Two calls should be sent to active", 2, rpcCountForActive); + // All listStatuses go to observer. + assertEquals("Three calls should be sent to observer", 3, rpcCountForObserver); + } Assertions.assertSame(namespaceStateId1, namespaceStateId2, "The same object should be used in the shared RouterStateIdContext"); @@ -770,8 +818,14 @@ public void testRouterStateIdContextCleanup(ConfigSetting configSetting) throws Thread.sleep(recordExpiry * 2); List namespace2 = routerStateIdContext.getNamespaces(); - assertEquals(1, namespace1.size()); - assertEquals("ns0", namespace1.get(0)); + if (fileSystem.getConf().getBoolean( + HdfsClientConfigKeys.DFS_RBF_OBSERVER_READ_ENABLE, false)){ + assertEquals(1, namespace1.size()); + assertEquals("ns0", namespace1.get(0)); + } else { + // 2 msyncs status go to active + assertEquals(2, namespace1.size()); + } assertTrue(namespace2.isEmpty()); } From 97eddca3289283e7f07f906ae9df7b77f565da55 Mon Sep 17 00:00:00 2001 From: fuchaohong Date: Tue, 10 Dec 2024 16:12:15 +0800 Subject: [PATCH 5/5] fix testRestartingNamenodeWithStateIDContextDisabled. --- .../server/federation/router/TestObserverWithRouter.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestObserverWithRouter.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestObserverWithRouter.java index 5d313573d0b32..2dbc1724c8374 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestObserverWithRouter.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestObserverWithRouter.java @@ -1141,7 +1141,10 @@ public void testRestartingNamenodeWithStateIDContextDisabled(ConfigSetting confi Configuration conf = getConfToEnableObserverReads(configSetting); conf.setBoolean("fs.hdfs.impl.disable.cache", true); FileSystem fileSystem2 = routerContext.getFileSystem(conf); - fileSystem2.msync(); + if (fileSystem.getConf().getBoolean( + HdfsClientConfigKeys.DFS_RBF_OBSERVER_READ_ENABLE, false)){ + fileSystem2.msync(); + } fileSystem2.open(path).close(); long observerCount2 = routerContext.getRouter().getRpcServer()