Skip to content

Commit

Permalink
[server] factory node connection (#1273)
Browse files Browse the repository at this point in the history
* lock per factoryNode to allow parallel session creation requests to different nodes
* limit retries during abortCommunication
* XynaProperty shutdown.timeout.factorynodes
  • Loading branch information
LukasFey-GIP authored Nov 27, 2024
1 parent 4ff0b34 commit 569254d
Show file tree
Hide file tree
Showing 4 changed files with 98 additions and 36 deletions.
68 changes: 49 additions & 19 deletions server/src/com/gip/xyna/xfmg/xfctrl/nodemgmt/CredentialsCache.java
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,8 @@ public Session(XynaPlainSessionCredentials creds, long age) {

private Map<String, XynaUserCredentials> credsPerNode = new ConcurrentHashMap<String, XynaUserCredentials>();
private Map<String, Session> sessionsPerNode = new ConcurrentHashMap<String, Session>();

private Map<String, Object> locksPerNode = new ConcurrentHashMap<String, Object>();

//deprecated nur für externe verwendung. es soll das singleton verwendet werden!
@Deprecated
public CredentialsCache() {
Expand All @@ -80,30 +81,59 @@ public static synchronized CredentialsCache getInstance() {
}
return instance;
}


private void ensureLockIsPresent(String nodeName) {
if (!locksPerNode.containsKey(nodeName)) {
synchronized (this) {
if (!locksPerNode.containsKey(nodeName)) {
locksPerNode.putIfAbsent(nodeName, new Object());
}
}
}
}

public synchronized XynaCredentials getCredentials(String nodeName, InfrastructureLinkProfile infrastructure) throws XFMG_NodeConnectException {
Session session = sessionsPerNode.get(nodeName);
if (session == null) {
XynaUserCredentials creds = credsPerNode.get(nodeName);
if (creds == null) {
creds = getXynaUserCredentials(nodeName);
credsPerNode.put(nodeName, creds);
public XynaCredentials getCredentials(String nodeName, InfrastructureLinkProfile infrastructure) throws XFMG_NodeConnectException {
ensureLockIsPresent(nodeName);
synchronized (locksPerNode.get(nodeName)) {
Session session = sessionsPerNode.get(nodeName);
if (session == null) {
XynaUserCredentials creds = credsPerNode.get(nodeName);
if (creds == null) {
creds = getXynaUserCredentials(nodeName);
credsPerNode.put(nodeName, creds);
}
SessionCredentials createdSession = infrastructure.createSession(creds.getUserName(), creds.getPassword());
session = new Session(new XynaPlainSessionCredentials(createdSession.getSessionId(), createdSession.getToken()),
System.currentTimeMillis());
sessionsPerNode.put(nodeName, session);
}
SessionCredentials createdSession = infrastructure.createSession(creds.getUserName(), creds.getPassword());
session = new Session(new XynaPlainSessionCredentials(createdSession.getSessionId(), createdSession.getToken()), System.currentTimeMillis());
sessionsPerNode.put(nodeName, session);
return session.creds;
}
}

public XynaCredentials getCredentialsIfPresent(String nodeName) {
ensureLockIsPresent(nodeName);
synchronized(locksPerNode.get(nodeName)) {
Session session = sessionsPerNode.get(nodeName);
if(session == null) {
return null;
}
return session.creds;
}
return session.creds;
}


public synchronized void clearSession(String nodeName) {
Session session = sessionsPerNode.get(nodeName);
if (session != null && System.currentTimeMillis() - session.age >= minOffsetForSessionDeletion.getMillis()) {
//nicht die session entfernen, wenn sie gerade erst neu gemacht wurde (das passiert, wenn mehrere threads grob gleichzeitig einen fehler sehen)
//TODO schöner wäre es, wenn stattdessen alle interlink-verwender einfach konsistenz die session-fehler behandeln, d.h. wenn das alles über
//einen gemeinsamen codepfad gehen würde
sessionsPerNode.remove(nodeName);
public void clearSession(String nodeName) {
ensureLockIsPresent(nodeName);
synchronized (locksPerNode.get(nodeName)) {
Session session = sessionsPerNode.get(nodeName);
if (session != null && System.currentTimeMillis() - session.age >= minOffsetForSessionDeletion.getMillis()) {
//nicht die session entfernen, wenn sie gerade erst neu gemacht wurde (das passiert, wenn mehrere threads grob gleichzeitig einen fehler sehen)
//TODO schöner wäre es, wenn stattdessen alle interlink-verwender einfach konsistenz die session-fehler behandeln, d.h. wenn das alles über
//einen gemeinsamen codepfad gehen würde
sessionsPerNode.remove(nodeName);
}
}
}

Expand Down
34 changes: 30 additions & 4 deletions server/src/com/gip/xyna/xfmg/xfctrl/nodemgmt/NodeManagement.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,12 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import com.gip.xyna.FunctionGroup;
import com.gip.xyna.FutureExecution;
Expand Down Expand Up @@ -149,13 +155,14 @@ private void initPropertiesAndStorables() {
@Override
protected void shutdown() throws XynaException {
logger.debug("shutdown");

long timeoutMillis = XynaProperty.SHUTDOWN_ABORT_COMMUNICATION_TIMEOUT.get().getDurationInMillis();
for (FactoryNodeCaller factoryNode : factoryNodeCaller.values()) {
AbortCommunicationCaller caller = new AbortCommunicationCaller();
try {
factoryNode.getRemoteOrderExecution().abortCommunication();
} catch (XFMG_NodeConnectException e) {
caller.abortCommunication(factoryNode).get(timeoutMillis, TimeUnit.MILLISECONDS);
} catch (Exception e) {
if (logger.isWarnEnabled()) {
logger.warn("Exception during shutdown of communication with " + factoryNode.getNodeName() + ". " + e.getMessage());
logger.warn("Exception during shutdown of communication with " + factoryNode.getNodeName() + ". ", e);
}
}
}
Expand All @@ -164,6 +171,25 @@ protected void shutdown() throws XynaException {
clusterNodes.clear();
interlinkSearchDispatcher.shutDownInternally();
}


private class AbortCommunicationCaller {
private ExecutorService executor = Executors.newSingleThreadExecutor();

public Future<Void> abortCommunication(FactoryNodeCaller factoryNode) {
return executor.submit(() -> {
try {
factoryNode.getRemoteOrderExecution().abortCommunication();
return null;
} catch(XFMG_NodeConnectException e) {
if (logger.isWarnEnabled()) {
logger.warn("Exception during shutdown of communication with " + factoryNode.getNodeName() + ". " + e.getMessage());
}
return null;
}
});
}
}

/**
* Instanziiert einen Remote Access vom angegebenen type.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,9 @@
import com.gip.xyna.xfmg.xfctrl.nodemgmt.InterFactoryLink.InterFactoryLinkProfileIdentifier;
import com.gip.xyna.xfmg.xfctrl.nodemgmt.RemoteData;
import com.gip.xyna.xfmg.xfctrl.nodemgmt.remotecall.RemoteOrderExecutionInterface.TransactionMode;
import com.gip.xyna.xfmg.xopctrl.usermanagement.XynaCredentials;
import com.gip.xyna.xmcp.OrderExecutionResponse;
import com.gip.xyna.xmcp.RemoteCallXynaOrderCreationParameter;
import com.gip.xyna.xmcp.RemoteXynaOrderCreationParameter;

public class RemoteOrderExecution {

Expand Down Expand Up @@ -105,9 +105,12 @@ public List<RemoteData> execute() throws XFMG_NodeConnectException {
}

public void abortCommunication() throws XFMG_NodeConnectException{
executeRemoteCommand(new RemoteCommand<Void>() {
executeRemoteCommandNoRetry(new RemoteCommand<Void>() {
public Void execute() throws XFMG_NodeConnectException {
orderExecution.abortCommunication(credentials.getCredentials(nodeName, infrastructure), identifier);
XynaCredentials creds = credentials.getCredentialsIfPresent(nodeName);
if (creds != null) {
orderExecution.abortCommunication(creds, identifier);
}
return null;
}
});
Expand Down Expand Up @@ -165,19 +168,11 @@ public boolean checkConnectivity() {

private <O> O executeRemoteCommand(RemoteCommand<O> command) throws XFMG_NodeConnectException {
try {
O output = command.execute();
connected = true;
logger.debug("connected to " + nodeName);
lastConnectException = null;
return output;
return executeRemoteCommandNoRetry(command);
} catch (XFMG_NodeConnectException e) {
if (checkConnectivity()) {
try {
O output = command.execute();
connected = true;
logger.debug("connected to " + nodeName);
lastConnectException = null;
return output;
return executeRemoteCommandNoRetry(command);
} catch (XFMG_NodeConnectException ee) {
connected = false;
logger.debug("disconnected from " + nodeName);
Expand All @@ -191,6 +186,14 @@ private <O> O executeRemoteCommand(RemoteCommand<O> command) throws XFMG_NodeCon
}
}

private <O> O executeRemoteCommandNoRetry(RemoteCommand<O> command) throws XFMG_NodeConnectException {
O output = command.execute();
connected = true;
logger.debug("connected to " + nodeName);
lastConnectException = null;
return output;
}


private interface RemoteCommand<O> {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,9 @@ public interface XynaProperty {
public static final XynaPropertyInt RMI_IL_SOCKET_COMPRESSION_BUFFERSIZE =
new XynaPropertyInt("xyna.rmi.interlink.ssl.compression.buffersize", 1024*128);

public static final XynaPropertyDuration SHUTDOWN_ABORT_COMMUNICATION_TIMEOUT = new XynaPropertyDuration("shutdown.timeout.factorynodes", "10 s")
.setDefaultDocumentation(DocumentationLanguage.EN, "Time to wait for each remote node to abort communication during shutdown")
.setDefaultDocumentation(DocumentationLanguage.DE, "Solange pro FactoryNode warten, bis die Kommunikation beendet ist");

/**
* Path to and including grep (for logScanning) Examples: "/bin/grep" "/usr/xpg4/bin/grep"
Expand Down

0 comments on commit 569254d

Please sign in to comment.