From 569254dd837eda070cbb6a8d49f8b58c0871291d Mon Sep 17 00:00:00 2001 From: Lukas Fey <107985579+LukasFey-GIP@users.noreply.github.com> Date: Wed, 27 Nov 2024 07:44:53 +0100 Subject: [PATCH] [server] factory node connection (#1273) * lock per factoryNode to allow parallel session creation requests to different nodes * limit retries during abortCommunication * XynaProperty shutdown.timeout.factorynodes --- .../xfctrl/nodemgmt/CredentialsCache.java | 68 +++++++++++++------ .../xfmg/xfctrl/nodemgmt/NodeManagement.java | 34 ++++++++-- .../remotecall/RemoteOrderExecution.java | 29 ++++---- .../xfmg/xods/configuration/XynaProperty.java | 3 + 4 files changed, 98 insertions(+), 36 deletions(-) diff --git a/server/src/com/gip/xyna/xfmg/xfctrl/nodemgmt/CredentialsCache.java b/server/src/com/gip/xyna/xfmg/xfctrl/nodemgmt/CredentialsCache.java index 091789cf5..d351294ed 100644 --- a/server/src/com/gip/xyna/xfmg/xfctrl/nodemgmt/CredentialsCache.java +++ b/server/src/com/gip/xyna/xfmg/xfctrl/nodemgmt/CredentialsCache.java @@ -64,7 +64,8 @@ public Session(XynaPlainSessionCredentials creds, long age) { private Map credsPerNode = new ConcurrentHashMap(); private Map sessionsPerNode = new ConcurrentHashMap(); - + private Map locksPerNode = new ConcurrentHashMap(); + //deprecated nur für externe verwendung. es soll das singleton verwendet werden! @Deprecated public CredentialsCache() { @@ -80,30 +81,59 @@ public static synchronized CredentialsCache getInstance() { } return instance; } + + + private void ensureLockIsPresent(String nodeName) { + if (!locksPerNode.containsKey(nodeName)) { + synchronized (this) { + if (!locksPerNode.containsKey(nodeName)) { + locksPerNode.putIfAbsent(nodeName, new Object()); + } + } + } + } - public synchronized XynaCredentials getCredentials(String nodeName, InfrastructureLinkProfile infrastructure) throws XFMG_NodeConnectException { - Session session = sessionsPerNode.get(nodeName); - if (session == null) { - XynaUserCredentials creds = credsPerNode.get(nodeName); - if (creds == null) { - creds = getXynaUserCredentials(nodeName); - credsPerNode.put(nodeName, creds); + public XynaCredentials getCredentials(String nodeName, InfrastructureLinkProfile infrastructure) throws XFMG_NodeConnectException { + ensureLockIsPresent(nodeName); + synchronized (locksPerNode.get(nodeName)) { + Session session = sessionsPerNode.get(nodeName); + if (session == null) { + XynaUserCredentials creds = credsPerNode.get(nodeName); + if (creds == null) { + creds = getXynaUserCredentials(nodeName); + credsPerNode.put(nodeName, creds); + } + SessionCredentials createdSession = infrastructure.createSession(creds.getUserName(), creds.getPassword()); + session = new Session(new XynaPlainSessionCredentials(createdSession.getSessionId(), createdSession.getToken()), + System.currentTimeMillis()); + sessionsPerNode.put(nodeName, session); } - SessionCredentials createdSession = infrastructure.createSession(creds.getUserName(), creds.getPassword()); - session = new Session(new XynaPlainSessionCredentials(createdSession.getSessionId(), createdSession.getToken()), System.currentTimeMillis()); - sessionsPerNode.put(nodeName, session); + return session.creds; + } + } + + public XynaCredentials getCredentialsIfPresent(String nodeName) { + ensureLockIsPresent(nodeName); + synchronized(locksPerNode.get(nodeName)) { + Session session = sessionsPerNode.get(nodeName); + if(session == null) { + return null; + } + return session.creds; } - return session.creds; } - public synchronized void clearSession(String nodeName) { - Session session = sessionsPerNode.get(nodeName); - if (session != null && System.currentTimeMillis() - session.age >= minOffsetForSessionDeletion.getMillis()) { - //nicht die session entfernen, wenn sie gerade erst neu gemacht wurde (das passiert, wenn mehrere threads grob gleichzeitig einen fehler sehen) - //TODO schöner wäre es, wenn stattdessen alle interlink-verwender einfach konsistenz die session-fehler behandeln, d.h. wenn das alles über - //einen gemeinsamen codepfad gehen würde - sessionsPerNode.remove(nodeName); + public void clearSession(String nodeName) { + ensureLockIsPresent(nodeName); + synchronized (locksPerNode.get(nodeName)) { + Session session = sessionsPerNode.get(nodeName); + if (session != null && System.currentTimeMillis() - session.age >= minOffsetForSessionDeletion.getMillis()) { + //nicht die session entfernen, wenn sie gerade erst neu gemacht wurde (das passiert, wenn mehrere threads grob gleichzeitig einen fehler sehen) + //TODO schöner wäre es, wenn stattdessen alle interlink-verwender einfach konsistenz die session-fehler behandeln, d.h. wenn das alles über + //einen gemeinsamen codepfad gehen würde + sessionsPerNode.remove(nodeName); + } } } diff --git a/server/src/com/gip/xyna/xfmg/xfctrl/nodemgmt/NodeManagement.java b/server/src/com/gip/xyna/xfmg/xfctrl/nodemgmt/NodeManagement.java index 7ffea7c6b..552c83ea6 100644 --- a/server/src/com/gip/xyna/xfmg/xfctrl/nodemgmt/NodeManagement.java +++ b/server/src/com/gip/xyna/xfmg/xfctrl/nodemgmt/NodeManagement.java @@ -26,6 +26,12 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import com.gip.xyna.FunctionGroup; import com.gip.xyna.FutureExecution; @@ -149,13 +155,14 @@ private void initPropertiesAndStorables() { @Override protected void shutdown() throws XynaException { logger.debug("shutdown"); - + long timeoutMillis = XynaProperty.SHUTDOWN_ABORT_COMMUNICATION_TIMEOUT.get().getDurationInMillis(); for (FactoryNodeCaller factoryNode : factoryNodeCaller.values()) { + AbortCommunicationCaller caller = new AbortCommunicationCaller(); try { - factoryNode.getRemoteOrderExecution().abortCommunication(); - } catch (XFMG_NodeConnectException e) { + caller.abortCommunication(factoryNode).get(timeoutMillis, TimeUnit.MILLISECONDS); + } catch (Exception e) { if (logger.isWarnEnabled()) { - logger.warn("Exception during shutdown of communication with " + factoryNode.getNodeName() + ". " + e.getMessage()); + logger.warn("Exception during shutdown of communication with " + factoryNode.getNodeName() + ". ", e); } } } @@ -164,6 +171,25 @@ protected void shutdown() throws XynaException { clusterNodes.clear(); interlinkSearchDispatcher.shutDownInternally(); } + + + private class AbortCommunicationCaller { + private ExecutorService executor = Executors.newSingleThreadExecutor(); + + public Future abortCommunication(FactoryNodeCaller factoryNode) { + return executor.submit(() -> { + try { + factoryNode.getRemoteOrderExecution().abortCommunication(); + return null; + } catch(XFMG_NodeConnectException e) { + if (logger.isWarnEnabled()) { + logger.warn("Exception during shutdown of communication with " + factoryNode.getNodeName() + ". " + e.getMessage()); + } + return null; + } + }); + } + } /** * Instanziiert einen Remote Access vom angegebenen type. diff --git a/server/src/com/gip/xyna/xfmg/xfctrl/nodemgmt/remotecall/RemoteOrderExecution.java b/server/src/com/gip/xyna/xfmg/xfctrl/nodemgmt/remotecall/RemoteOrderExecution.java index a37486406..3ec5a51d5 100644 --- a/server/src/com/gip/xyna/xfmg/xfctrl/nodemgmt/remotecall/RemoteOrderExecution.java +++ b/server/src/com/gip/xyna/xfmg/xfctrl/nodemgmt/remotecall/RemoteOrderExecution.java @@ -32,9 +32,9 @@ import com.gip.xyna.xfmg.xfctrl.nodemgmt.InterFactoryLink.InterFactoryLinkProfileIdentifier; import com.gip.xyna.xfmg.xfctrl.nodemgmt.RemoteData; import com.gip.xyna.xfmg.xfctrl.nodemgmt.remotecall.RemoteOrderExecutionInterface.TransactionMode; +import com.gip.xyna.xfmg.xopctrl.usermanagement.XynaCredentials; import com.gip.xyna.xmcp.OrderExecutionResponse; import com.gip.xyna.xmcp.RemoteCallXynaOrderCreationParameter; -import com.gip.xyna.xmcp.RemoteXynaOrderCreationParameter; public class RemoteOrderExecution { @@ -105,9 +105,12 @@ public List execute() throws XFMG_NodeConnectException { } public void abortCommunication() throws XFMG_NodeConnectException{ - executeRemoteCommand(new RemoteCommand() { + executeRemoteCommandNoRetry(new RemoteCommand() { public Void execute() throws XFMG_NodeConnectException { - orderExecution.abortCommunication(credentials.getCredentials(nodeName, infrastructure), identifier); + XynaCredentials creds = credentials.getCredentialsIfPresent(nodeName); + if (creds != null) { + orderExecution.abortCommunication(creds, identifier); + } return null; } }); @@ -165,19 +168,11 @@ public boolean checkConnectivity() { private O executeRemoteCommand(RemoteCommand command) throws XFMG_NodeConnectException { try { - O output = command.execute(); - connected = true; - logger.debug("connected to " + nodeName); - lastConnectException = null; - return output; + return executeRemoteCommandNoRetry(command); } catch (XFMG_NodeConnectException e) { if (checkConnectivity()) { try { - O output = command.execute(); - connected = true; - logger.debug("connected to " + nodeName); - lastConnectException = null; - return output; + return executeRemoteCommandNoRetry(command); } catch (XFMG_NodeConnectException ee) { connected = false; logger.debug("disconnected from " + nodeName); @@ -191,6 +186,14 @@ private O executeRemoteCommand(RemoteCommand command) throws XFMG_NodeCon } } + private O executeRemoteCommandNoRetry(RemoteCommand command) throws XFMG_NodeConnectException { + O output = command.execute(); + connected = true; + logger.debug("connected to " + nodeName); + lastConnectException = null; + return output; + } + private interface RemoteCommand { diff --git a/server/src/com/gip/xyna/xfmg/xods/configuration/XynaProperty.java b/server/src/com/gip/xyna/xfmg/xods/configuration/XynaProperty.java index c70af44d3..85de545e1 100644 --- a/server/src/com/gip/xyna/xfmg/xods/configuration/XynaProperty.java +++ b/server/src/com/gip/xyna/xfmg/xods/configuration/XynaProperty.java @@ -297,6 +297,9 @@ public interface XynaProperty { public static final XynaPropertyInt RMI_IL_SOCKET_COMPRESSION_BUFFERSIZE = new XynaPropertyInt("xyna.rmi.interlink.ssl.compression.buffersize", 1024*128); + public static final XynaPropertyDuration SHUTDOWN_ABORT_COMMUNICATION_TIMEOUT = new XynaPropertyDuration("shutdown.timeout.factorynodes", "10 s") + .setDefaultDocumentation(DocumentationLanguage.EN, "Time to wait for each remote node to abort communication during shutdown") + .setDefaultDocumentation(DocumentationLanguage.DE, "Solange pro FactoryNode warten, bis die Kommunikation beendet ist"); /** * Path to and including grep (for logScanning) Examples: "/bin/grep" "/usr/xpg4/bin/grep"