diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/AbstractDelegationTokenSecretManager.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/AbstractDelegationTokenSecretManager.java index 8378a47ceddfe..e218bea8000c0 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/AbstractDelegationTokenSecretManager.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/AbstractDelegationTokenSecretManager.java @@ -120,12 +120,12 @@ private String formatTokenId(TokenIdent id) { /** * Access to currentKey is protected by this object lock */ - private volatile DelegationKey currentKey; + private DelegationKey currentKey; - private final long keyUpdateInterval; - private final long tokenMaxLifetime; - private final long tokenRemoverScanInterval; - private final long tokenRenewInterval; + private long keyUpdateInterval; + private long tokenMaxLifetime; + private long tokenRemoverScanInterval; + private long tokenRenewInterval; /** * Whether to store a token's tracking ID in its TokenInformation. * Can be overridden by a subclass. @@ -491,18 +491,17 @@ private synchronized void removeExpiredKeys() { } @Override - protected byte[] createPassword(TokenIdent identifier) { + protected synchronized byte[] createPassword(TokenIdent identifier) { int sequenceNum; long now = Time.now(); sequenceNum = incrementDelegationTokenSeqNum(); identifier.setIssueDate(now); identifier.setMaxDate(now + tokenMaxLifetime); - DelegationKey delegationCurrentKey = currentKey; - identifier.setMasterKeyId(delegationCurrentKey.getKeyId()); + identifier.setMasterKeyId(currentKey.getKeyId()); identifier.setSequenceNumber(sequenceNum); LOG.info("Creating password for identifier: " + formatTokenId(identifier) - + ", currentKey: " + delegationCurrentKey.getKeyId()); - byte[] password = createPassword(identifier.getBytes(), delegationCurrentKey.getKey()); + + ", currentKey: " + currentKey.getKeyId()); + byte[] password = createPassword(identifier.getBytes(), currentKey.getKey()); DelegationTokenInformation tokenInfo = new DelegationTokenInformation(now + tokenRenewInterval, password, getTrackingIdIfEnabled(identifier)); try { @@ -527,6 +526,7 @@ protected byte[] createPassword(TokenIdent identifier) { */ protected DelegationTokenInformation checkToken(TokenIdent identifier) throws InvalidToken { + assert Thread.holdsLock(this); DelegationTokenInformation info = getTokenInfo(identifier); String err; if (info == null) { @@ -546,7 +546,7 @@ protected DelegationTokenInformation checkToken(TokenIdent identifier) } @Override - public byte[] retrievePassword(TokenIdent identifier) + public synchronized byte[] retrievePassword(TokenIdent identifier) throws InvalidToken { return checkToken(identifier).getPassword(); } @@ -558,7 +558,7 @@ protected String getTrackingIdIfEnabled(TokenIdent ident) { return null; } - public String getTokenTrackingId(TokenIdent identifier) { + public synchronized String getTokenTrackingId(TokenIdent identifier) { DelegationTokenInformation info = getTokenInfo(identifier); if (info == null) { return null; @@ -572,7 +572,7 @@ public String getTokenTrackingId(TokenIdent identifier) { * @param password Password in the token. * @throws InvalidToken InvalidToken. */ - public void verifyToken(TokenIdent identifier, byte[] password) + public synchronized void verifyToken(TokenIdent identifier, byte[] password) throws InvalidToken { byte[] storedPassword = retrievePassword(identifier); if (!MessageDigest.isEqual(password, storedPassword)) { @@ -589,7 +589,7 @@ public void verifyToken(TokenIdent identifier, byte[] password) * @throws InvalidToken if the token is invalid * @throws AccessControlException if the user can't renew token */ - public long renewToken(Token token, + public synchronized long renewToken(Token token, String renewer) throws InvalidToken, IOException { ByteArrayInputStream buf = new ByteArrayInputStream(token.getIdentifier()); DataInputStream in = new DataInputStream(buf); @@ -651,7 +651,7 @@ public long renewToken(Token token, * @throws InvalidToken for invalid token * @throws AccessControlException if the user isn't allowed to cancel */ - public TokenIdent cancelToken(Token token, + public synchronized TokenIdent cancelToken(Token token, String canceller) throws IOException { ByteArrayInputStream buf = new ByteArrayInputStream(token.getIdentifier()); DataInputStream in = new DataInputStream(buf); diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/ZKDelegationTokenSecretManager.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/ZKDelegationTokenSecretManager.java index 0642d3d581066..dd2ab3ff26f76 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/ZKDelegationTokenSecretManager.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/ZKDelegationTokenSecretManager.java @@ -25,7 +25,6 @@ import java.io.IOException; import java.io.UncheckedIOException; import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.locks.ReentrantLock; import java.util.stream.Stream; import org.apache.curator.RetryPolicy; @@ -153,7 +152,7 @@ protected static CuratorFramework getCurator() { private final int seqNumBatchSize; private int currentSeqNum; private int currentMaxSeqNum; - private final ReentrantLock currentSeqNumLock; + private final boolean isTokenWatcherEnabled; public ZKDelegationTokenSecretManager(Configuration conf) { @@ -169,8 +168,7 @@ public ZKDelegationTokenSecretManager(Configuration conf) { ZK_DTSM_TOKEN_SEQNUM_BATCH_SIZE_DEFAULT); isTokenWatcherEnabled = conf.getBoolean(ZK_DTSM_TOKEN_WATCHER_ENABLED, ZK_DTSM_TOKEN_WATCHER_ENABLED_DEFAULT); - this.currentSeqNumLock = new ReentrantLock(true); - + String workPath = conf.get(ZK_DTSM_ZNODE_WORKING_PATH, ZK_DTSM_ZNODE_WORKING_PATH_DEAFULT); String nameSpace = workPath + "/" + ZK_DTSM_NAMESPACE; if (CURATOR_TL.get() != null) { @@ -506,28 +504,24 @@ protected int incrementDelegationTokenSeqNum() { // The secret manager will keep a local range of seq num which won't be // seen by peers, so only when the range is exhausted it will ask zk for // another range again - try { - this.currentSeqNumLock.lock(); - if (currentSeqNum >= currentMaxSeqNum) { - try { - // after a successful batch request, we can get the range starting point - currentSeqNum = incrSharedCount(delTokSeqCounter, seqNumBatchSize); - currentMaxSeqNum = currentSeqNum + seqNumBatchSize; - LOG.info("Fetched new range of seq num, from {} to {} ", - currentSeqNum+1, currentMaxSeqNum); - } catch (InterruptedException e) { - // The ExpirationThread is just finishing.. so dont do anything.. - LOG.debug( - "Thread interrupted while performing token counter increment", e); - Thread.currentThread().interrupt(); - } catch (Exception e) { - throw new RuntimeException("Could not increment shared counter !!", e); - } + if (currentSeqNum >= currentMaxSeqNum) { + try { + // after a successful batch request, we can get the range starting point + currentSeqNum = incrSharedCount(delTokSeqCounter, seqNumBatchSize); + currentMaxSeqNum = currentSeqNum + seqNumBatchSize; + LOG.info("Fetched new range of seq num, from {} to {} ", + currentSeqNum+1, currentMaxSeqNum); + } catch (InterruptedException e) { + // The ExpirationThread is just finishing.. so dont do anything.. + LOG.debug( + "Thread interrupted while performing token counter increment", e); + Thread.currentThread().interrupt(); + } catch (Exception e) { + throw new RuntimeException("Could not increment shared counter !!", e); } - return ++currentSeqNum; - } finally { - this.currentSeqNumLock.unlock(); } + + return ++currentSeqNum; } @Override