Skip to content

Commit

Permalink
#363: work around MLLP producer timeout problem in CAmel
Browse files Browse the repository at this point in the history
  • Loading branch information
Christian Ohr committed Nov 8, 2021
1 parent c0a34c0 commit e2a7772
Show file tree
Hide file tree
Showing 4 changed files with 85 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
package org.openehealth.ipf.platform.camel.ihe.mllp.core;

import lombok.experimental.Delegate;
import org.apache.camel.Exchange;
import org.apache.camel.ExchangeTimedOutException;
import org.apache.camel.component.mina2.Mina2Producer;
import org.apache.camel.impl.DefaultProducer;
import org.apache.mina.core.service.IoConnector;
Expand All @@ -37,12 +39,14 @@ public class MllpProducer extends DefaultProducer {

// The reason for this interface is to convince the Delegate annotation to *not* delegate
// the stop method. Weird API, really.
private interface DoStop {
private interface CustomMethods {
@SuppressWarnings("unused")
void stop();
@SuppressWarnings("unused")
void process(Exchange exchange) throws Exception;
}

@Delegate(excludes = DoStop.class)
@Delegate(excludes = CustomMethods.class)
private final Mina2Producer producer;

MllpProducer(Mina2Producer producer) {
Expand All @@ -51,6 +55,26 @@ private interface DoStop {
this.producer = producer;
}

/**
* Mitigate CAMEL-17022 and close current session for any timeouts
*
* @param exchange exchange
*/
@Override
public void process(Exchange exchange) throws Exception {
try {
producer.process(exchange);
} catch (ExchangeTimedOutException e) {
closeCurrentSession(producer);
throw e;
} finally {
if (LOG.isDebugEnabled()) {
IoSession session = getField(producer, IoSession.class, "session");
LOG.debug("Used session {}", session);
}
}
}

@Override
public void stop() throws Exception {
super.stop();
Expand All @@ -63,16 +87,22 @@ public void stop() throws Exception {
*/
@Override
protected void doStop() throws Exception {
IoSession session = getField(producer, IoSession.class, "session");
if (session != null) {
invoke(producer, "closeSessionIfNeededAndAwaitCloseInHandler", IoSession.class, session);
}
closeCurrentSession(producer);
IoConnector connector = getField(producer, IoConnector.class, "connector");
// Do NOT wait indefinitely
connector.dispose(false);
super.doStop();
}

private static void closeCurrentSession(Mina2Producer producer) throws Exception {
IoSession session = getField(producer, IoSession.class, "session");
if (session != null) {
LOG.debug("Closing session {}", session);
invoke(producer, "closeSessionIfNeededAndAwaitCloseInHandler", IoSession.class, session);
LOG.debug("Closed session {}", session);
}
}

private static <T> T getField(Object target, Class<T> clazz, String name) throws NoSuchFieldException, IllegalAccessException {
Field field = target.getClass().getDeclaredField(name);
field.setAccessible(true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,10 @@ package org.openehealth.ipf.platform.camel.ihe.mllp.iti21

import ca.uhn.hl7v2.AcknowledgmentCode
import org.apache.camel.builder.RouteBuilder
import org.openehealth.ipf.modules.hl7.message.MessageUtils
import org.openehealth.ipf.platform.camel.ihe.mllp.core.MllpComponent
import org.slf4j.Logger
import org.slf4j.LoggerFactory

import static org.openehealth.ipf.platform.camel.hl7.HL7v2.ack

Expand All @@ -27,6 +30,8 @@ import static org.openehealth.ipf.platform.camel.hl7.HL7v2.ack
*/
class Iti21TestRouteBuilder extends RouteBuilder {

private static final Logger LOG = LoggerFactory.getLogger(Iti21TestRouteBuilder.class)

def rsp = '''MSH|^~\\&|MESA_PD_SUPPLIER|PIM|MESA_PD_CONSUMER|MESA_DEPARTMENT|20090901140929||RSP^K22^RSP_K21|356757|P|2.5
MSA|AA|1305506339
QAK|1486133081|OK
Expand Down Expand Up @@ -90,6 +95,22 @@ PID|4||79233^^^HZLN&2.16.840.1.113883.3.37.4.1.1.2.411.1&ISO^PI||Müller^Joach
it.out.headers[MllpComponent.ACK_TYPE_CODE_HEADER] = AcknowledgmentCode.AE
}

// wait "QPD-2" seconds and properly respond with a valid answer
from('pdq-iti21://0.0.0.0:18220')
.process {
int wait = Long.parseLong(it.in.body.QPD[2].value)
LOG.info("Waiting $wait ms")
Thread.sleep(wait)
}
.transmogrify {
def response = MessageUtils.response(it, 'RSP', 'K22')
response.QPD = it.QPD
response.QAK[1] = response.QPD[2].value
response.MSA[1] = 'AA'
LOG.info("Now responding after waiting ${response.QAK[1].value} ms")
response
}

/*
from('pdq-iti21://0.0.0.0:18225?interceptorFactories=#receiveTracingData')
.to("mock:trace")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ class TestIti21 extends MllpTestContainer {
doTestHappyCaseAndAudit("pdq-iti21://localhost:18218?secure=true&sslContext=#sslContext&sslCiphers=SSL_RSA_WITH_NULL_SHA,TLS_RSA_WITH_AES_128_CBC_SHA&timeout=${TIMEOUT}", 2)
}

@Test
@Test @Ignore
void testSSLFailureWithIncompatibleProtocols() {
try {
send("pdq-iti21://localhost:18216?secure=true&sslContext=#sslContext&sslProtocols=TLSv1&timeout=${TIMEOUT}", getMessageString('QBP^Q22', '2.5'))
Expand Down Expand Up @@ -176,13 +176,39 @@ class TestIti21 extends MllpTestContainer {
assertEquals(EventIdCode.SecurityAlert, messages[0].getEventIdentification().getEventID())
}

@Test
void testTestTimeoutHandling() {
// Timeout after 500ms, but response takes 1000ms => Exception
doTestWaitAndAssertTimeout("pdq-iti21://localhost:18220?cachedAddress=false&maximumPoolSize=1&timeout=500", 1000)
// Long timeout, response takes 1500ms => check that response is not from previous request, that is handled by now
doTestWaitAndAssertCorrectResponse("pdq-iti21://localhost:18220?cachedAddress=false&maximumPoolSize=1&timeout=5000", 1500)
}

def doTestHappyCaseAndAudit(String endpointUri, int expectedAuditItemsCount) {
final String body = getMessageString('QBP^Q22', '2.5')
def msg = send(endpointUri, body)
assertRSP(msg)
assertEquals(expectedAuditItemsCount, auditSender.messages.size())
}

def doTestWaitAndAssertCorrectResponse(String endpointUri, int timeout) {
final String body = getMessageString('QBP^Q22', '2.5')
final String timeoutString = Integer.toString(timeout)
def msg = send(endpointUri, body.replace('1402274727', timeoutString))
assertRSP(msg)
assertEquals(timeoutString, msg.QAK[1].value)
}

def doTestWaitAndAssertTimeout(String endpointUri, int timeout) {
final String body = getMessageString('QBP^Q22', '2.5')
final String timeoutString = Integer.toString(timeout)
try {
def msg = send(endpointUri, body.replace('1402274727', timeoutString))
fail("Assuming timeout after $timeout ms")
} catch (ExchangeTimedOutException ignored) {
}
}

@Test
void testCustomInterceptorCanThrowAuthenticationException() {
send("pdq-iti21://localhost:18214?timeout=${TIMEOUT}", getMessageString('QBP^Q22', '2.5'))
Expand Down
2 changes: 1 addition & 1 deletion platform-camel/ihe/mllp/src/test/resources/log4j2.xml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
</Appenders>

<Loggers>
<Root level="WARN">
<Root level="INFO">
<AppenderRef ref="CONSOLE"/>
</Root>
</Loggers>
Expand Down

0 comments on commit e2a7772

Please sign in to comment.