Skip to content

Commit

Permalink
Restore fsync behavior in FSDirectory via P/Invoke, #933 (#938)
Browse files Browse the repository at this point in the history
* Restore fsync behavior in FSDirectory via P/Invoke

This restores the commented-out fsync behavior in FSDirectory to help
mitigate a performance regression in .NET 8.

* Use System.IO.Directory.Exists to avoid caching exists status

* Add unit test for ConcurrentHashSet.ExceptWith

* Improve errors thrown by CreateFileW

* Change FileSystemInfo use to string in IOUtils.Fsync

* Change Debug.Assert to Debugging use

* Lucene.Net.Index.TestIndexWriterOnJRECrash::TestNRTThreads_Mem(): Removed AwaitsFix attribute. The FSync implementation should fix this test.

* Make ExceptWith atomic

* Improve error handling if directory not found on Linux/macOS

* Refactor interop methods into separate partial class files

* Lucene.Net.Index.TestIndexWriterOnJRECrash::TestNRTThreads_Mem(): Added [Repeat(25)] attribute.

* Lucene.Net.Index.TestIndexWriterOnJRECrash: Instead of using a temp file to pass the process ID to kill back to the original test process, open a socket and listen for the process ID to be written.

* Synchronize access to stale files collection

This is necessary to prevent race conditions, even though this
synchronization is not in the upstream Java code. Without it, a thread
could add a file to the collection after the collection has been synced
in `Sync` but before the synced files are removed from it; the newly
added file would then be removed as well, resulting in a missed sync.

* Rename syncLock to m_syncLock

* Lucene.Net.Index.TestIndexWriterOnJRECrash: Added try/finally block and refactored to ensure the TcpListener and Process are cleaned up at the end of each test iteration. This makes it run ~20% faster.

* Refactor: rename namespace to Lucene.Net.Native

* Mark JRE crash test as [AwaitsFix]

---------

Co-authored-by: Shad Storhaug <[email protected]>
  • Loading branch information
paulirwin and NightOwl888 authored Aug 12, 2024
1 parent 38a7b53 commit 0cf7218
Show file tree
Hide file tree
Showing 17 changed files with 855 additions and 137 deletions.
150 changes: 87 additions & 63 deletions src/Lucene.Net.Tests/Index/TestIndexWriterOnJRECrash.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,16 @@
using NUnit.Framework;
using RandomizedTesting.Generators;
using System;
using System.Data;
using System.Diagnostics;
using System.Globalization;
using System.IO;
using System.Linq;
using System.Net;
using System.Net.Sockets;
using System.Reflection;
using System.Text;
using System.Threading;
using BaseDirectoryWrapper = Lucene.Net.Store.BaseDirectoryWrapper;
using Assert = Lucene.Net.TestFramework.Assert;
using Console = Lucene.Net.Util.SystemConsole;

namespace Lucene.Net.Index
Expand Down Expand Up @@ -42,7 +43,7 @@ namespace Lucene.Net.Index
[TestFixture]
public class TestIndexWriterOnJRECrash : TestNRTThreads
{
// LUCENENET: Setup unnecessary because we create a new temp directory
// LUCENENET: Setup of directory unnecessary because we create a new temp directory
// in each iteration of the test.

[Test]
Expand All @@ -51,7 +52,7 @@ public class TestIndexWriterOnJRECrash : TestNRTThreads
public override void TestNRTThreads_Mem()
{
//if we are not the fork
if (SystemProperties.GetProperty("tests:crashmode") is null)
if (!SystemProperties.GetPropertyAsBoolean("tests:crashmode", false))
{
// try up to 10 times to create an index
for (int i = 0; i < 10; i++)
Expand All @@ -63,34 +64,45 @@ public override void TestNRTThreads_Mem()
// lexicographical order rather than checking the one we create in the current iteration.
DirectoryInfo tempDir = CreateTempDir("netcrash");

FileInfo tempProcessToKillFile = CreateTempFile(prefix: "netcrash-processToKill", suffix: ".txt");
tempProcessToKillFile.Delete(); // We use the creation of this file as a signal to parse it.
// Set up a TCP listener to receive the process ID
TcpListener listener = SetupSocketListener();
Process p = null;
try
{
// Get the port that we picked at random.
int port = ((IPEndPoint)listener.LocalEndpoint).Port;

// Note this is the vstest.console process we are tracking here.
Process p = ForkTest(tempDir.FullName, tempProcessToKillFile.FullName);
// Note this is the vstest.console process we are tracking here.
p = ForkTest(tempDir.FullName, port);

TextWriter childOut = BeginOutput(p, out ThreadJob stdOutPumper, out ThreadJob stdErrPumper);
TextWriter childOut = BeginOutput(p, out ThreadJob stdOutPumper, out ThreadJob stdErrPumper);

// LUCENENET: Note that ForkTest() creates the vstest.console.exe process.
// This spawns testhost.exe, which runs our test. We wait until
// the process starts and logs its own Id so we know who to kill later.
int processIdToKill = WaitForProcessToKillLogFile(tempProcessToKillFile.FullName);
// LUCENENET: Note that ForkTest() creates the vstest.console.exe process.
// This spawns testhost.exe, which runs our test. We wait until
// the process starts and transmits its own PID so we know who to kill later.
int processIdToKill = WaitForProcessId(listener);

// Setup a time to crash the forked thread
int crashTime = TestUtil.NextInt32(Random, 4000, 5000); // LUCENENET: Adjusted these up by 1 second to give our tests some more time to spin up
ThreadJob t = new ThreadAnonymousClass(this, crashTime, processIdToKill);
// Setup a time to crash the forked thread
int crashTime = TestUtil.NextInt32(Random, 4000, 5000); // LUCENENET: Adjusted these up by 1 second to give our tests some more time to spin up
ThreadJob t = new ThreadAnonymousClass(this, crashTime, processIdToKill);

t.Priority = ThreadPriority.Highest;
t.Start();
t.Join(); // Wait for our thread to kill the other process
t.Priority = ThreadPriority.Highest;
t.Start();
t.Join(); // Wait for our thread to kill the other process

// if we succeeded in finding an index, we are done.
if (CheckIndexes(tempDir))
{
// if we succeeded in finding an index, we are done.
if (CheckIndexes(tempDir))
{
EndOutput(p, childOut, stdOutPumper, stdErrPumper);
return;
}
EndOutput(p, childOut, stdOutPumper, stdErrPumper);
return;
}
EndOutput(p, childOut, stdOutPumper, stdErrPumper);
finally
{
listener.Stop();
p?.Dispose();
}
}
}
else
Expand All @@ -100,13 +112,12 @@ public override void TestNRTThreads_Mem()

// we are the fork, log our processId so the original test can kill us.
int processIdToKill = Process.GetCurrentProcess().Id;
string processIdToKillFile = SystemProperties.GetProperty("tests:tempProcessToKillFile");
int port = SystemProperties.GetPropertyAsInt32("tests:crashtestport");

assertNotNull("No tests:tempProcessToKillFile value was passed to the fork. This is a required system property.", processIdToKillFile);
assertTrue("No tests:crashtestport value was passed to the fork. This is a required system property.", port > 0);

// Writing this file will kick off the thread that crashes us.
using (var writer = new StreamWriter(processIdToKillFile, append: false, Encoding.UTF8, bufferSize: 32))
writer.WriteLine(processIdToKill.ToString(CultureInfo.InvariantCulture));
// Sending the process id will kick off the thread that crashes us.
SendProcessId(processIdToKill, port);

// run the test until we crash.
for (int i = 0; i < 100; i++)
Expand Down Expand Up @@ -145,7 +156,7 @@ public override void Run()
}
}

public Process ForkTest(string tempDir, string tempProcessToKillFile)
public Process ForkTest(string tempDir, int port)
{
//get the full location of the assembly with DaoTests in it
string testAssemblyPath = Assembly.GetAssembly(typeof(TestIndexWriterOnJRECrash)).Location;
Expand Down Expand Up @@ -174,8 +185,8 @@ public Process ForkTest(string tempDir, string tempProcessToKillFile)
// passing NIGHTLY to this test makes it run for much longer, easier to catch it in the act...
TestRunParameter("tests:nightly", "true"),
TestRunParameter("tempDir", tempDir),
// This file is for passing the process ID of the fork back to the original test so it can kill it.
TestRunParameter("tests:tempProcessToKillFile", tempProcessToKillFile),
// This port is for passing the process ID of the fork back to the original test so it can kill it.
TestRunParameter("tests:crashtestport", port.ToString(CultureInfo.InvariantCulture)),
}),
WorkingDirectory = theDirectory,
RedirectStandardOutput = true,
Expand Down Expand Up @@ -335,48 +346,61 @@ public virtual bool CheckIndexes(FileSystemInfo file)
return false;
}

// LUCENENET: Wait for our test to spin up and log its PID so we can kill it.
private static int WaitForProcessToKillLogFile(string processToKillFile)
private TcpListener SetupSocketListener()
{
bool exists = false;
Thread.Sleep(500);
for (int i = 0; i < 150; i++)
{
if (File.Exists(processToKillFile))
{
exists = true;
break;
}
Thread.Sleep(200);
}
// If the fork didn't log its process id, it is a failure.
assertTrue("The test fork didn't log its process id, so we cannot kill it", exists);
using var reader = new StreamReader(processToKillFile, Encoding.UTF8);
// LUCENENET: Our file only has one line with the process Id in it
return int.Parse(reader.ReadLine().Trim(), CultureInfo.InvariantCulture);
// Pick a random port that is available on the local machine.
TcpListener listener = new TcpListener(IPAddress.Loopback, 0);
listener.Start();
return listener;
}

// LUCENENET: Wait for our test to spin up and send its process ID so we can kill it.
// The wait is bounded: if the fork never connects to the listener, the test fails
// with an assertion instead of blocking the test run indefinitely.
private int WaitForProcessId(TcpListener listener)
{
    const int PollIntervalMilliseconds = 200;
    const int TimeoutMilliseconds = 30000;

    // AcceptTcpClient() blocks with no timeout, so poll Pending() until a
    // connection request is queued or we run out of time.
    int waitedMilliseconds = 0;
    while (!listener.Pending() && waitedMilliseconds < TimeoutMilliseconds)
    {
        Thread.Sleep(PollIntervalMilliseconds);
        waitedMilliseconds += PollIntervalMilliseconds;
    }

    // If the fork didn't connect to send its process id, it is a failure.
    assertTrue("The test fork didn't send its process id, so we cannot kill it", listener.Pending());

    using var client = listener.AcceptTcpClient();
    // Bound the read as well, in case the fork connects but never writes its process ID.
    client.ReceiveTimeout = TimeoutMilliseconds;
    using var stream = client.GetStream();
    // Directly read the process ID as a 32-bit integer
    using var reader = new BinaryReader(stream);
    return reader.ReadInt32();
}

// Connects to the given port on the local machine and transmits processId
// over the connection as a 32-bit integer.
private void SendProcessId(int processId, int port)
{
    using (var connection = new TcpClient("127.0.0.1", port))
    using (var networkStream = connection.GetStream())
    using (var output = new BinaryWriter(networkStream))
    {
        // The process ID goes over the wire in binary form, not as text.
        output.Write(processId);
    }
}

public virtual void CrashDotNet(int processIdToKill)
{
Process process = null;
try
{
process = Process.GetProcessById(processIdToKill);
}
catch (ArgumentException)
{
// We get here if the process wasn't running for some reason.
// We should fix the forked test to make it run longer if we get here.
fail("The test completed before we could kill it.");
}
try
{
process = Process.GetProcessById(processIdToKill);
}
catch (ArgumentException)
{
// We get here if the process wasn't running for some reason.
// We should fix the forked test to make it run longer if we get here.
fail("The test completed before we could kill it.");
}
#if FEATURE_PROCESS_KILL_ENTIREPROCESSTREE
process.Kill(entireProcessTree: true);
process.Kill(entireProcessTree: true);
#else
process.Kill();
process.Kill();
#endif
process.WaitForExit(10000);
// We couldn't get .NET to crash for some reason.
assertTrue(process.HasExited);
process.WaitForExit(10000);
// We couldn't get .NET to crash for some reason.
assertTrue(process.HasExited);
}
finally
{
process?.Dispose();
}
}
}
}
51 changes: 51 additions & 0 deletions src/Lucene.Net.Tests/Support/TestConcurrentHashSet.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
using Lucene.Net.Attributes;
using Lucene.Net.Support;
using NUnit.Framework;
using System.Linq;
using System.Threading.Tasks;

namespace Lucene.Net
{
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

public class TestConcurrentHashSet
{
    /// <summary>
    /// Calls <c>ExceptWith</c> from parallel workers, each removing a distinct
    /// pair of values, and verifies that only the two values no worker removes
    /// are left in the set.
    /// </summary>
    [Test, LuceneNetSpecific]
    public void TestExceptWith()
    {
        var units = Enumerable.Range(1, 8);
        var tens = Enumerable.Range(1, 8).Select(n => n * 10);

        // Seed values: 1-8, then 10-80 in steps of ten, then 99 and 0.
        var seedValues = units.Concat(tens).Append(99).Append(0);
        var set = new ConcurrentHashSet<int>(seedValues);

        Parallel.ForEach(Enumerable.Range(1, 8), n =>
        {
            // Each worker removes its own pair: n and n * 10 (1 and 10, 2 and 20, ...).
            var toRemove = new[] { n, n * 10 };
            set.ExceptWith(toRemove);
        });

        // Only 0 and 99 were never targeted by any worker.
        Assert.AreEqual(2, set.Count);
        Assert.IsTrue(set.Contains(0));
        Assert.IsTrue(set.Contains(99));
    }
}
}
Loading

0 comments on commit 0cf7218

Please sign in to comment.