-
Notifications
You must be signed in to change notification settings - Fork 165
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
GH-4784 interrupt threads using connections when forcefully closing #4787
Conversation
6292077
to
a0e4f58
Compare
…f we are closing from a different thread
a566033
to
7c7cfd3
Compare
if (!running.get()) { | ||
logger.warn( | ||
"LmdbSailStore was closed while active transaction was waiting for the next operation. Forcing a rollback!"); | ||
opQueue.add(ROLLBACK_TRANSACTION); | ||
} else if (Thread.interrupted()) { | ||
throw new InterruptedException(); | ||
} else { | ||
Thread.yield(); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@kenwenzel I've made some tests to fix some deadlock scenarios in the MemoryStore. The tests open two connections and start two transactions, in two separate threads. One of the tests then tries to call shutdown() on the store, the other test tries to close the connections directly. I ran into an issue with the async transaction handling in the LmdbSailStore that I've tried to handle here.
The issue is that when the executor service is shutdown() it waits indefinitely for the async transaction handling to complete. The async transaction handling can at that point be stuck in a while(true)
loop that is waiting for the transaction to either be committed or rolled back. For some reason it doesn't seem to get rolled back, but adding a check for running
and then rolling back the transaction seems to work.
What do you think @kenwenzel ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you for Investigating this. I am not quite sure why this happens as I thought that it is handled by the running flag. But maybe I was wrong.
if (store instanceof AbstractSail) { | ||
((AbstractSail) store).setConnectionTimeOut(200); | ||
} else if (store instanceof SailWrapper) { | ||
Sail baseSail = ((SailWrapper) store).getBaseSail(); | ||
if (baseSail instanceof AbstractSail) { | ||
((AbstractSail) baseSail).setConnectionTimeOut(200); | ||
} | ||
} | ||
|
||
CountDownLatch countDownLatch = new CountDownLatch(1); | ||
Thread thread = new Thread(() -> { | ||
SailConnection connection = store.getConnection(); | ||
countDownLatch.countDown(); | ||
connection.begin(IsolationLevels.NONE); | ||
connection.addStatement(RDF.FIRST, RDF.TYPE, RDF.PROPERTY); | ||
}); | ||
thread.setName("Thread 1"); | ||
thread.start(); | ||
|
||
CountDownLatch countDownLatch2 = new CountDownLatch(1); | ||
Thread thread2 = new Thread(() -> { | ||
SailConnection connection = store.getConnection(); | ||
countDownLatch2.countDown(); | ||
connection.begin(IsolationLevels.NONE); | ||
connection.addStatement(RDF.REST, RDF.TYPE, RDF.PROPERTY); | ||
|
||
}); | ||
thread2.setName("Thread 2"); | ||
thread2.start(); | ||
|
||
countDownLatch.await(); | ||
countDownLatch2.await(); | ||
|
||
Thread.sleep(1000); | ||
|
||
store.shutDown(); | ||
|
||
} | ||
|
||
@Test | ||
public void testConcurrentConnectionsShutdownAndClose() throws InterruptedException { | ||
if (store instanceof AbstractSail) { | ||
((AbstractSail) store).setConnectionTimeOut(200); | ||
} | ||
|
||
try (SailConnection connection = store.getConnection()) { | ||
connection.begin(); | ||
connection.addStatement(RDF.TYPE, RDF.TYPE, RDF.PROPERTY); | ||
connection.commit(); | ||
} | ||
|
||
AtomicReference<SailConnection> connection1 = new AtomicReference<>(); | ||
AtomicReference<SailConnection> connection2 = new AtomicReference<>(); | ||
|
||
CountDownLatch countDownLatch = new CountDownLatch(1); | ||
Thread thread = new Thread(() -> { | ||
connection1.set(store.getConnection()); | ||
countDownLatch.countDown(); | ||
connection1.get().begin(IsolationLevels.NONE); | ||
connection1.get().clear(); | ||
}); | ||
thread.setName("Thread 1"); | ||
thread.start(); | ||
|
||
CountDownLatch countDownLatch2 = new CountDownLatch(1); | ||
Thread thread2 = new Thread(() -> { | ||
connection2.set(store.getConnection()); | ||
countDownLatch2.countDown(); | ||
connection2.get().begin(IsolationLevels.NONE); | ||
connection2.get().clear(); | ||
|
||
}); | ||
thread2.setName("Thread 2"); | ||
thread2.start(); | ||
|
||
countDownLatch.await(); | ||
countDownLatch2.await(); | ||
|
||
Thread.sleep(1000); | ||
|
||
Thread thread3 = new Thread(() -> { | ||
|
||
}); | ||
thread3.setName("Thread 3"); | ||
thread3.start(); | ||
|
||
try { | ||
if (thread2.isAlive()) { | ||
connection2.get().close(); | ||
connection1.get().close(); | ||
} else { | ||
connection1.get().close(); | ||
connection2.get().close(); | ||
} | ||
} catch (SailException ignored) { | ||
} | ||
|
||
store.shutDown(); | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@kenwenzel These are the tests.
GitHub issue resolved: #4784 #4790
Briefly describe the changes proposed in this PR:
This is mainly an issue with the MemoryStore, but has been detected when using the ShaclSail since it uses memory stores for tracking the changes in a transaction.
When one connection holds the MemorySailStore
txnLockManager
then another thread can get stuck waiting for the same lock if both threads are forcefully closed. Two issues are causing this, the first is that thetxnLockManager
can't be forcefully unlocked by another thread, the second is that thetxnLockManager
is being locked without support for interruptions.The fix is to track the owner thread of the connection and interrupt that thread. Since connections shouldn't be shared between threads we can assume that the thread that created the connection is the owner.
PR Author Checklist (see the contributor guidelines for more details):
mvn process-resources
to format from the command line)