Skip to content

Commit

Permalink
[HOPS-1654] Reconnect to NDB after network failure
Browse files Browse the repository at this point in the history
  • Loading branch information
smkniazi authored Nov 23, 2021
1 parent c6db718 commit 183abcc
Show file tree
Hide file tree
Showing 4 changed files with 42 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,7 @@ public Object performTask() throws IOException {
if (localTx) {
connector.beginTransaction();
}
Exception exception = null;
try {

user = userDataAccess.getUser(lockRowName);
Expand All @@ -296,11 +297,12 @@ public Object performTask() throws IOException {
connector.commit();
}
} catch (IOException e) {
exception = e;
fail = true;
throw e;
} finally {
if (fail && localTx) {
connector.rollback();
connector.rollback(exception);
}
}
lockUser = user;
Expand Down Expand Up @@ -350,7 +352,7 @@ public void clear() {

private User getUserFromDB(final String userName, final Integer userId)
throws IOException {
return (User) new LightWeightRequestHandler(UsersGroupsCache.UsersOperationsType.GET_USER) {
return (User) new LightWeightRequestHandler(UsersOperationsType.GET_USER) {
@Override
public Object performTask() throws IOException {
LOG.debug("Get User: " + userName + " from DB.");
Expand All @@ -360,6 +362,7 @@ public Object performTask() throws IOException {
connector.beginTransaction();
}

Exception exception = null;
try {
ugSharedLock(connector);
User user = userName == null ? userDataAccess.getUser(userId) :
Expand All @@ -371,11 +374,12 @@ public Object performTask() throws IOException {

return user;
} catch (IOException e) {
exception = e;
fail = true;
throw e;
} finally {
if (fail && localTx) {
connector.rollback();
connector.rollback(exception);
}
}
}
Expand All @@ -384,7 +388,7 @@ public Object performTask() throws IOException {

private User addUserToDB(final String userName) throws IOException {

return (User) new LightWeightRequestHandler(UsersGroupsCache.UsersOperationsType.ADD_USER) {
return (User) new LightWeightRequestHandler(UsersOperationsType.ADD_USER) {
@Override
public Object performTask() throws IOException {
LOG.debug("Add User: " + userName + " to DB.");
Expand All @@ -394,6 +398,7 @@ public Object performTask() throws IOException {
connector.beginTransaction();
}

Exception exception = null;
try {
ugExclusiveLock(connector);
User user = userDataAccess.addUser(userName);
Expand All @@ -403,11 +408,12 @@ public Object performTask() throws IOException {
}
return user;
} catch (IOException e) {
exception = e;
fail = true;
throw e;
} finally {
if (fail && localTx) {
connector.rollback();
connector.rollback(exception);
}
}
}
Expand All @@ -417,7 +423,7 @@ public Object performTask() throws IOException {
@VisibleForTesting
protected void removeUserFromDB(final Integer userId) throws IOException {

new LightWeightRequestHandler(UsersGroupsCache.UsersOperationsType.REMOVE_USER) {
new LightWeightRequestHandler(UsersOperationsType.REMOVE_USER) {
@Override
public Object performTask() throws IOException {
LOG.debug("Remove UserID: " + userId + " from DB.");
Expand All @@ -427,6 +433,7 @@ public Object performTask() throws IOException {
connector.beginTransaction();
}

Exception exception = null;
try {
ugExclusiveLock(connector);
userDataAccess.removeUser(userId);
Expand All @@ -437,11 +444,12 @@ public Object performTask() throws IOException {

return null;
} catch (IOException e) {
exception = e;
fail = true;
throw e;
} finally {
if (fail && localTx) {
connector.rollback();
connector.rollback(exception);
}
}
}
Expand All @@ -451,7 +459,7 @@ public Object performTask() throws IOException {
private Group getGroupFromDB(final String groupName, final Integer groupId) throws IOException {

return (Group) new LightWeightRequestHandler(
UsersGroupsCache.UsersOperationsType.GET_GROUP) {
UsersOperationsType.GET_GROUP) {
@Override
public Object performTask() throws IOException {
LOG.debug("Get GroupName: " + groupName + " GroupID: " + groupId + " from DB.");
Expand All @@ -461,6 +469,7 @@ public Object performTask() throws IOException {
connector.beginTransaction();
}

Exception exception = null;
try {
ugSharedLock(connector);
Group group = groupName == null ? groupDataAccess.getGroup(groupId) :
Expand All @@ -472,11 +481,12 @@ public Object performTask() throws IOException {

return group;
} catch (IOException e) {
exception = e;
fail = true;
throw e;
} finally {
if (fail && localTx) {
connector.rollback();
connector.rollback(exception);
}
}
}
Expand All @@ -485,7 +495,7 @@ public Object performTask() throws IOException {

private Group addGroupToDB(final String groupName) throws IOException {

return (Group) new LightWeightRequestHandler(UsersGroupsCache.UsersOperationsType.ADD_GROUP) {
return (Group) new LightWeightRequestHandler(UsersOperationsType.ADD_GROUP) {
@Override
public Object performTask() throws IOException {
LOG.debug("Add Group: " + groupName + " to DB.");
Expand All @@ -496,6 +506,7 @@ public Object performTask() throws IOException {
connector.beginTransaction();
}

Exception exception = null;
try {
ugExclusiveLock(connector);
Group group = groupDataAccess.addGroup(groupName);
Expand All @@ -506,11 +517,12 @@ public Object performTask() throws IOException {

return group;
} catch (IOException e) {
exception = e;
fail = true;
throw e;
} finally {
if (fail && localTx) {
connector.rollback();
connector.rollback(exception);
}
}
}
Expand All @@ -520,7 +532,7 @@ public Object performTask() throws IOException {
@VisibleForTesting
protected void removeGroupFromDB(final Integer groupId) throws IOException {

new LightWeightRequestHandler(UsersGroupsCache.UsersOperationsType.REMOVE_GROUP) {
new LightWeightRequestHandler(UsersOperationsType.REMOVE_GROUP) {
@Override
public Object performTask() throws IOException {
LOG.debug("Remove GroupID: " + groupId + " from DB.");
Expand All @@ -531,6 +543,7 @@ public Object performTask() throws IOException {
connector.beginTransaction();
}

Exception exception = null;
try {
ugExclusiveLock(connector);
groupDataAccess.removeGroup(groupId);
Expand All @@ -541,11 +554,12 @@ public Object performTask() throws IOException {

return null;
} catch (IOException e) {
exception = e;
fail = true;
throw e;
} finally {
if (fail && localTx) {
connector.rollback();
connector.rollback(exception);
}
}
}
Expand All @@ -555,7 +569,7 @@ public Object performTask() throws IOException {
private void removeUserFromGroupDB(final Integer userId, final Integer groupId) throws
IOException {

new LightWeightRequestHandler(UsersGroupsCache.UsersOperationsType.REMOVE_USER_FROM_GROUPS) {
new LightWeightRequestHandler(UsersOperationsType.REMOVE_USER_FROM_GROUPS) {
@Override
public Object performTask() throws IOException {
LOG.debug("Removing user from group. UserID: " + userId + " GropuID: " + groupId + ".");
Expand All @@ -566,6 +580,7 @@ public Object performTask() throws IOException {
connector.beginTransaction();
}

Exception exception = null;
try {
ugExclusiveLock(connector);
userGroupDataAccess.removeUserFromGroup(userId, groupId);
Expand All @@ -576,11 +591,12 @@ public Object performTask() throws IOException {

return null;
} catch (IOException e) {
exception = e;
fail = true;
throw e;
} finally {
if (fail && localTx) {
connector.rollback();
connector.rollback(exception);
}
}
}
Expand All @@ -601,6 +617,7 @@ public Object performTask() throws IOException {
connector.beginTransaction();
}

Exception exception = null;
try {
ugExclusiveLock(connector);
userGroupDataAccess.addUserToGroup(userId, groupId);
Expand All @@ -611,6 +628,7 @@ public Object performTask() throws IOException {

return null;
} catch (IOException e) {
exception = e;
fail = true;

if (e instanceof ForeignKeyConstraintViolationException) {
Expand All @@ -632,7 +650,7 @@ public Object performTask() throws IOException {
throw e;
} finally {
if (fail && localTx) {
connector.rollback();
connector.rollback(exception);
}
}
}
Expand All @@ -643,7 +661,7 @@ private List<Group> getUserGroupsFromDB(final String userName, final Integer use
IOException {

return (List<Group>) new LightWeightRequestHandler
(UsersGroupsCache.UsersOperationsType.GET_USER_GROUPS) {
(UsersOperationsType.GET_USER_GROUPS) {
@Override
public Object performTask() throws IOException {
List<Group> result = null;
Expand All @@ -653,6 +671,7 @@ public Object performTask() throws IOException {
connector.beginTransaction();
}

Exception exception = null;
try {
ugSharedLock(connector);
User user = userId == null ? userDataAccess.getUser(userName) :
Expand All @@ -667,11 +686,12 @@ public Object performTask() throws IOException {

return result;
} catch (IOException e) {
exception = e;
fail = true;
throw e;
} finally {
if (fail && localTx) {
connector.rollback();
connector.rollback(exception);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4933,6 +4933,7 @@ private class ReplicationMonitor implements Runnable {
public void run() {
while (namesystem.isRunning()) {
try {
Thread.sleep(replicationRecheckInterval);
if (namesystem.isLeader()) {
LOG.debug("Running replication monitor");
// Process replication work only when active NN is out of safe mode.
Expand All @@ -4945,7 +4946,6 @@ public void run() {
updateState();
LOG.debug("Namesystem is not leader: will not run replication monitor");
}
Thread.sleep(replicationRecheckInterval);
} catch (Throwable t) {
if(t instanceof TransientStorageException){
continue;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8819,6 +8819,7 @@ public void run() {
while (fsRunning && shouldCacheCleanerRun) {
try {
if (isLeader()) {
Thread.sleep(1000);
long lastDeletedEpochSec = HdfsVariables.getRetryCacheCleanerEpoch();
long toBeDeletedEpochSec = lastDeletedEpochSec + 1L;
if (toBeDeletedEpochSec < ((timer.now() - entryExpiryMillis) / 1000)) {
Expand All @@ -8832,7 +8833,6 @@ public void run() {
continue;
}
}
Thread.sleep(1000);
} catch (Exception e) {
if (e instanceof InterruptedException) {
cleanerLog.warn("RetryCacheCleaner Interrupted");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ public void run() {
"sucessfulTx " + sucessfulTx + " failedTx " + failedtx +
" time period " + context.time_period + " " + te.getMessage(), te);
// transaction failed
sleepDuration = 0;
sleepDuration = 50;
txFailed = true;
failedtx++;
} catch (StorageException e) {
Expand All @@ -119,7 +119,7 @@ public void run() {
"sucessfulTx " + sucessfulTx + " failedTx " + failedtx +
" time period " + context.time_period + " " + e.getMessage(), e);
// transaction failed
sleepDuration = 0;
sleepDuration = 1000;
txFailed = true;
failedtx++;
} catch (LeaderElectionForceAbort fa) {
Expand Down

0 comments on commit 183abcc

Please sign in to comment.