From 13a0a6c34194ba847e26d87714a9f4f2c35bc796 Mon Sep 17 00:00:00 2001 From: Jeevananthan-23 Date: Thu, 2 Nov 2023 21:03:05 +0530 Subject: [PATCH 1/5] initial sequence numbers patch --- .../Index/TestIndexingSequenceNumbers.cs | 321 ++++++++++++++++++ src/Lucene.Net/Index/BufferedUpdates.cs | 2 +- src/Lucene.Net/Index/DocumentsWriter.cs | 42 ++- .../Index/DocumentsWriterDeleteQueue.cs | 104 +++--- .../Index/DocumentsWriterFlushControl.cs | 11 +- .../Index/DocumentsWriterPerThread.cs | 12 +- .../Index/DocumentsWriterPerThreadPool.cs | 20 -- src/Lucene.Net/Index/IndexWriter.cs | 96 ++++-- src/Lucene.Net/Index/TrackingIndexWriter.cs | 3 +- src/Lucene.Net/Index/TwoPhaseCommit.cs | 6 +- 10 files changed, 510 insertions(+), 107 deletions(-) create mode 100644 src/Lucene.Net.Tests.Misc/Index/TestIndexingSequenceNumbers.cs diff --git a/src/Lucene.Net.Tests.Misc/Index/TestIndexingSequenceNumbers.cs b/src/Lucene.Net.Tests.Misc/Index/TestIndexingSequenceNumbers.cs new file mode 100644 index 0000000000..1f5c4fab62 --- /dev/null +++ b/src/Lucene.Net.Tests.Misc/Index/TestIndexingSequenceNumbers.cs @@ -0,0 +1,321 @@ +using J2N.Threading; +using Lucene.Net.Analysis; +using Lucene.Net.Documents; +using Lucene.Net.Index; +using Lucene.Net.Search; +using Lucene.Net.Store; +using Lucene.Net.Util; +using NUnit.Framework; +using System; +using System.Collections.Generic; +using System.Threading; + +namespace Lucene.Net.Tests.Misc.Index +{ + public class TestIndexingSequenceNumbers : LuceneTestCase + { + [Test] + public void TestBasic() + { + Directory dir = NewDirectory(); + IndexWriter w = new IndexWriter(dir, NewIndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer(Random))); + long a = w.AddDocument(new Document()); + long b = w.AddDocument(new Document()); + assertTrue(b > a); + w.Dispose(); + dir.Dispose(); + } + + [Test] + public void TestAfterRefresh() + { + Directory dir = NewDirectory(); + IndexWriter w = new IndexWriter(dir, NewIndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer(Random))); + long a = w.AddDocument(new Document()); + DirectoryReader.Open(w,true).Dispose(); + long b = w.AddDocument(new Document()); + assertTrue(b > a); + w.Dispose(); + dir.Dispose(); + } + + [Test] + public void TestAfterCommit() + { + Directory dir = NewDirectory(); + IndexWriter w = new IndexWriter(dir, NewIndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer(Random))); + long a = w.AddDocument(new Document()); + w.Commit(); + long b = w.AddDocument(new Document()); + assertTrue(b > a); + w.Dispose(); + dir.Dispose(); + } + + /* [Test] + public void TestStressUpdateSameID() + { + int iters = AtLeast(100); + for (int iter = 0; iter < iters; iter++) + { + Directory dir = NewDirectory(); + // nocommit use RandomIndexWriter + IndexWriter w = new IndexWriter(dir, NewIndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer(Random)))); + ThreadJob[] threads = new ThreadJob[TestUtil.NextInt32(Random, 2, 5)]; + CountdownEvent startingGun = new CountdownEvent(1); + long[] seqNos = new long[threads.Length]; + Term id = new Term("id", "id"); + // multiple threads update the same document + for (int i = 0; i < threads.Length; i++) + { + int threadID = i; + //threads[i] = new Thread() + //{ + *//*public void run() + { + try + { + Document doc = new Document(); + doc.add(new StoredField("thread", threadID)); + doc.add(new StringField("id", "id", Field.Store.NO)); + startingGun.await(); + for (int j = 0; j < 100; j++) + { + seqNos[threadID] = w.updateDocument(id, doc); + } + } + catch (Exception e) + { + throw new 
RuntimeException(e); + } + } + }; + threads[i].start();*//* + //}; + startingGun.CountDown(); + foreach (ThreadJob thread in threads) + { + thread.Join(); + } + + // now confirm that the reported sequence numbers agree with the index: + int maxThread = 0; + var allSeqNos = new HashSet(); + for (int i = 0; i < threads.Length; i++) + { + allSeqNos.add(seqNos[i]); + if (seqNos[i] > seqNos[maxThread]) + { + maxThread = i; + } + } + // make sure all sequence numbers were different + assertEquals(threads.Length, allSeqNos.size()); + DirectoryReader r = DirectoryReader.Open(w); + IndexSearcher s = NewSearcher(r); + TopDocs hits = s.Search(new TermQuery(id), 1); + assertEquals(1, hits.TotalHits); + Document doc = r.Document(hits.ScoreDocs[0].Doc); + assertEquals(maxThread, doc.GetField("thread").NumericValue.intValue()); + r.Dispose(); + w.Dispose(); + dir.Dispose(); + } + } + + static class Operation + { + // 0 = update, 1 = delete, 2 = commit + static byte what; + static int id; + static int threadID; + static long seqNo; + } + + public void testStressConcurrentCommit() + { + int opCount = AtLeast(10000); + int idCount = TestUtil.NextInt32(Random, 10, 1000); + + Directory dir = NewDirectory(); + // nocommit use RandomIndexWriter + IndexWriterConfig iwc = NewIndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer(Random)); + iwc.IndexDeletionPolicy = (NoDeletionPolicy.INSTANCE); + IndexWriter w = new IndexWriter(dir, iwc); + int numThreads = TestUtil.NextInt32(Random, 2, 5); + Thread[] threads = new Thread[numThreads]; + //System.out.println("TEST: iter=" + iter + " opCount=" + opCount + " idCount=" + idCount + " threadCount=" + threads.length); + CountDownLatch startingGun = new CountDownLatch(1); + List> threadOps = new ArrayList<>(); + + Object commitLock = new Object(); + List commits = new ArrayList<>(); + AtomicInteger opsSinceCommit = new AtomicInteger(); + + // multiple threads update the same set of documents, and we randomly commit + for (int i = 0; i < threads.length; i++) + { + List ops = new ArrayList<>(); + threadOps.add(ops); + int threadID = i; + threads[i] = new Thread() { + public void run() + { + try + { + startingGun.await(); + for (int i = 0; i < opCount; i++) + { + Operation op = new Operation(); + op.threadID = threadID; + if (random().nextInt(500) == 17) + { + op.what = 2; + synchronized(commitLock) { + // nocommit why does this sometimes fail :) + //if (w.hasUncommittedChanges()) { + if (opsSinceCommit.get() > numThreads) + { + op.seqNo = w.commit(); + commits.add(op); + opsSinceCommit.set(0); + } + //System.out.println("done commit seqNo=" + op.seqNo); + } + } + else + { + op.id = random().nextInt(idCount); + Term idTerm = new Term("id", "" + op.id); + if (random().nextInt(10) == 1) + { + op.what = 1; + op.seqNo = w.deleteDocuments(idTerm); + } + else + { + Document doc = new Document(); + doc.add(new StoredField("thread", threadID)); + doc.add(new StringField("id", "" + op.id, Field.Store.NO)); + op.seqNo = w.UpdateDocument(idTerm, doc); + op.what = 2; + } + ops.Add(op); + opsSinceCommit.getAndIncrement(); + } + } + } + catch (Exception e) + { + throw new RuntimeException(e); + } + } + }; + threads[i].start(); + } + startingGun.countDown(); +for (Thread thread : threads) +{ + thread.join(); +} + +Operation commitOp = new Operation(); +synchronized(commitLock) { + commitOp.seqNo = w.commit(); + commits.add(commitOp); +} + +List indexCommits = DirectoryReader.listCommits(dir); +assertEquals(commits.size(), indexCommits.size()); + +int[] expectedThreadIDs = new 
int[idCount]; +long[] seqNos = new long[idCount]; + +//System.out.println("TEST: " + commits.size() + " commits"); +for (int i = 0; i < commits.size(); i++) +{ + // this commit point should reflect all operations <= this seqNo + long commitSeqNo = commits.get(i).seqNo; + //System.out.println(" commit " + i + ": seqNo=" + commitSeqNo + " segs=" + indexCommits.get(i)); + + Arrays.fill(expectedThreadIDs, -1); + Arrays.fill(seqNos, 0); + + for (int threadID = 0; threadID < threadOps.size(); threadID++) + { + long lastSeqNo = 0; + for (Operation op : threadOps.get(threadID)) + { + if (op.seqNo <= commitSeqNo && op.seqNo > seqNos[op.id]) + { + seqNos[op.id] = op.seqNo; + if (op.what == 2) + { + expectedThreadIDs[op.id] = threadID; + } + else + { + expectedThreadIDs[op.id] = -1; + } + } + + assertTrue(op.seqNo >= lastSeqNo); + lastSeqNo = op.seqNo; + } + } + + DirectoryReader r = DirectoryReader.open(indexCommits.get(i)); + IndexSearcher s = new IndexSearcher(r); + + for (int id = 0; id < idCount; id++) + { + //System.out.println("TEST: check id=" + id + " expectedThreadID=" + expectedThreadIDs[id]); + TopDocs hits = s.search(new TermQuery(new Term("id", "" + id)), 1); + + if (expectedThreadIDs[id] != -1) + { + assertEquals(1, hits.totalHits); + Document doc = r.document(hits.scoreDocs[0].doc); + int actualThreadID = doc.getField("thread").numericValue().intValue(); + if (expectedThreadIDs[id] != actualThreadID) + { + System.out.println("FAIL: id=" + id + " expectedThreadID=" + expectedThreadIDs[id] + " vs actualThreadID=" + actualThreadID); + for (int threadID = 0; threadID < threadOps.size(); threadID++) + { + for (Operation op : threadOps.get(threadID)) + { + if (id == op.id) + { + System.out.println(" threadID=" + threadID + " seqNo=" + op.seqNo + " " + (op.what == 2 ? "updated" : "deleted")); + } + } + } + assertEquals("id=" + id, expectedThreadIDs[id], actualThreadID); + } + } + else if (hits.totalHits != 0) + { + System.out.println("FAIL: id=" + id + " expectedThreadID=" + expectedThreadIDs[id] + " vs totalHits=" + hits.totalHits); + for (int threadID = 0; threadID < threadOps.size(); threadID++) + { + for (Operation op : threadOps.get(threadID)) + { + if (id == op.id) + { + System.out.println(" threadID=" + threadID + " seqNo=" + op.seqNo + " " + (op.what == 2 ? "updated" : "del")); + } + } + } + assertEquals(0, hits.totalHits); + } + } + w.close(); + r.close(); +} + +dir.close(); + }*/ + + // nocommit test that does n ops across threads, then does it again with a single index / single thread, and assert indices are the same + } +} diff --git a/src/Lucene.Net/Index/BufferedUpdates.cs b/src/Lucene.Net/Index/BufferedUpdates.cs index 114588876d..321accf3b8 100644 --- a/src/Lucene.Net/Index/BufferedUpdates.cs +++ b/src/Lucene.Net/Index/BufferedUpdates.cs @@ -29,7 +29,7 @@ namespace Lucene.Net.Index /// single segment. this is used to hold buffered pending /// deletes and updates against the to-be-flushed segment. Once the /// deletes and updates are pushed (on flush in ), they - /// are converted to a FrozenDeletes instance. + /// are converted to a FrozenBufferedUpdates instance. 
/// /// NOTE: instances of this class are accessed either via a private /// instance on , or via sync'd code by diff --git a/src/Lucene.Net/Index/DocumentsWriter.cs b/src/Lucene.Net/Index/DocumentsWriter.cs index c967e11605..7a53b39437 100644 --- a/src/Lucene.Net/Index/DocumentsWriter.cs +++ b/src/Lucene.Net/Index/DocumentsWriter.cs @@ -152,6 +152,7 @@ internal bool DeleteQueries(params Query[] queries) DocumentsWriterDeleteQueue deleteQueue = this.deleteQueue; deleteQueue.AddDelete(queries); flushControl.DoOnDelete(); + //nocommit long return ApplyAllDeletes(deleteQueue); } finally @@ -163,16 +164,20 @@ internal bool DeleteQueries(params Query[] queries) // TODO: we could check w/ FreqProxTermsWriter: if the // term doesn't exist, don't bother buffering into the // per-DWPT map (but still must go into the global map) - internal bool DeleteTerms(params Term[] terms) + internal long DeleteTerms(params Term[] terms) { UninterruptableMonitor.Enter(this); try { // TODO why is this synchronized? DocumentsWriterDeleteQueue deleteQueue = this.deleteQueue; - deleteQueue.AddDelete(terms); + long seqNo = deleteQueue.AddDelete(terms); flushControl.DoOnDelete(); - return ApplyAllDeletes(deleteQueue); + if (ApplyAllDeletes(deleteQueue)) + { + seqNo = -seqNo; + } + return seqNo; } finally { @@ -566,13 +571,14 @@ internal bool UpdateDocuments(IEnumerable> docs, An return PostUpdate(flushingDWPT, hasEvents); } - internal bool UpdateDocument(IEnumerable doc, Analyzer analyzer, Term delTerm) + internal long UpdateDocument(IEnumerable doc, Analyzer analyzer, Term delTerm) { bool hasEvents = PreUpdate(); ThreadState perThread = flushControl.ObtainAndLock(); DocumentsWriterPerThread flushingDWPT; + long seqno; try { if (!perThread.IsActive) @@ -586,7 +592,7 @@ internal bool UpdateDocument(IEnumerable doc, Analyzer analyzer int dwptNumDocs = dwpt.NumDocsInRAM; try { - dwpt.UpdateDocument(doc, analyzer, delTerm); + seqno = dwpt.UpdateDocument(doc, analyzer, delTerm); numDocsInRAM.IncrementAndGet(); } finally @@ -609,7 +615,14 @@ internal bool UpdateDocument(IEnumerable doc, Analyzer analyzer perThreadPool.Release(perThread); } - return PostUpdate(flushingDWPT, hasEvents); + if (PostUpdate(flushingDWPT, hasEvents)) + { + return -seqno; + } + else + { + return seqno; + } } private bool DoFlush(DocumentsWriterPerThread flushingDWPT) @@ -760,13 +773,14 @@ private bool SetFlushingDeleteQueue(DocumentsWriterDeleteQueue session) * is called after this method, to release the flush lock in DWFlushControl */ - internal bool FlushAllThreads(IndexWriter indexWriter) + internal long FlushAllThreads(IndexWriter indexWriter) { DocumentsWriterDeleteQueue flushingDeleteQueue; if (infoStream.IsEnabled("DW")) { infoStream.Message("DW", "startFullFlush"); } + long seqNo; UninterruptableMonitor.Enter(this); try @@ -776,8 +790,8 @@ internal bool FlushAllThreads(IndexWriter indexWriter) /* Cutover to a new delete queue. 
this must be synced on the flush control * otherwise a new DWPT could sneak into the loop with an already flushing * delete queue */ - flushControl.MarkForFullFlush(); // swaps the delQueue synced on FlushControl - if (Debugging.AssertsEnabled) Debugging.Assert(SetFlushingDeleteQueue(flushingDeleteQueue)); + seqNo = flushControl.MarkForFullFlush(); // swaps the delQueue synced on FlushControl + if (Debugging.AssertsEnabled) Debugging.Assert(SetFlushingDeleteQueue(flushingDeleteQueue)); } finally { @@ -815,7 +829,15 @@ internal bool FlushAllThreads(IndexWriter indexWriter) { if (Debugging.AssertsEnabled) Debugging.Assert(flushingDeleteQueue == currentFullFlushDelQueue); } - return anythingFlushed; + + if (anythingFlushed) + { + return -seqNo; + } + else + { + return seqNo; + } } internal void FinishFullFlush(bool success) diff --git a/src/Lucene.Net/Index/DocumentsWriterDeleteQueue.cs b/src/Lucene.Net/Index/DocumentsWriterDeleteQueue.cs index cd2c3b340e..f2eadb1a4a 100644 --- a/src/Lucene.Net/Index/DocumentsWriterDeleteQueue.cs +++ b/src/Lucene.Net/Index/DocumentsWriterDeleteQueue.cs @@ -1,4 +1,5 @@ -using Lucene.Net.Diagnostics; +using J2N.Threading.Atomic; +using Lucene.Net.Diagnostics; using Lucene.Net.Search; using Lucene.Net.Support; using Lucene.Net.Support.Threading; @@ -72,43 +73,52 @@ internal sealed class DocumentsWriterDeleteQueue private readonly DeleteSlice globalSlice; private readonly BufferedUpdates globalBufferedUpdates; + + private long gen; + /* only acquired to update the global deletes */ private readonly ReentrantLock globalBufferLock = new ReentrantLock(); internal readonly long generation; + internal readonly AtomicInt64 seqNo; + + // seqNo must start at 1 because some APIs negate this to encode a boolean internal DocumentsWriterDeleteQueue() - : this(0) + :this(0, 1) { } - internal DocumentsWriterDeleteQueue(long generation) - : this(new BufferedUpdates(), generation) + internal DocumentsWriterDeleteQueue(long generation, long startSeqNo) + : this(new BufferedUpdates(), generation, startSeqNo) { } - internal DocumentsWriterDeleteQueue(BufferedUpdates globalBufferedUpdates, long generation) + internal DocumentsWriterDeleteQueue(BufferedUpdates globalBufferedUpdates, long generation, long startSeqNo) { this.globalBufferedUpdates = globalBufferedUpdates; this.generation = generation; - /* - * we use a sentinel instance as our initial tail. No slice will ever try to - * apply this tail since the head is always omitted. - */ - tail = new Node(null); // sentinel + this.seqNo = new AtomicInt64(startSeqNo); + /* + * we use a sentinel instance as our initial tail. No slice will ever try to + * apply this tail since the head is always omitted. 
+ */ + tail = new Node(null); // sentinel globalSlice = new DeleteSlice(tail); } - internal void AddDelete(params Query[] queries) + internal long AddDelete(params Query[] queries) { - Add(new QueryArrayNode(queries)); + long seqNo = Add(new QueryArrayNode(queries)); TryApplyGlobalSlice(); + return seqNo; } - internal void AddDelete(params Term[] terms) + internal long AddDelete(params Term[] terms) { - Add(new TermArrayNode(terms)); + long seqNo = Add(new TermArrayNode(terms)); TryApplyGlobalSlice(); + return seqNo; } internal void AddNumericUpdate(NumericDocValuesUpdate update) @@ -126,10 +136,10 @@ internal void AddBinaryUpdate(BinaryDocValuesUpdate update) /// /// invariant for document update /// - internal void Add(Term term, DeleteSlice slice) + internal long Add(Term term, DeleteSlice slice) { TermNode termNode = new TermNode(term); - Add(termNode); + long seqNo = Add(termNode); /* * this is an update request where the term is the updated documents * delTerm. in that case we need to guarantee that this insert is atomic @@ -144,49 +154,59 @@ internal void Add(Term term, DeleteSlice slice) if (Debugging.AssertsEnabled) Debugging.Assert(slice.sliceHead != slice.sliceTail, "slice head and tail must differ after add"); TryApplyGlobalSlice(); // TODO doing this each time is not necessary maybe // we can do it just every n times or so? + return seqNo; } - internal void Add(Node item) + // nocommit can we remove the sync'd + internal long Add(Node newNode) { + UninterruptableMonitor.Enter(this); /* * this non-blocking / 'wait-free' linked list add was inspired by Apache * Harmony's ConcurrentLinkedQueue Implementation. */ - while (true) + try { - Node currentTail = this.tail; - Node tailNext = currentTail.next; - if (tail == currentTail) + while (true) { - if (tailNext != null) - { - /* - * we are in intermediate state here. the tails next pointer has been - * advanced but the tail itself might not be updated yet. help to - * advance the tail and try again updating it. - */ - Interlocked.CompareExchange(ref tail, tailNext, currentTail); // can fail - } - else + Node currentTail = this.tail; + Node tailNext = currentTail.next; + if (tail == currentTail) { - /* - * we are in quiescent state and can try to insert the item to the - * current tail if we fail to insert we just retry the operation since - * somebody else has already added its item - */ - if (currentTail.CasNext(null, item)) + if (tailNext != null) { /* - * now that we are done we need to advance the tail while another - * thread could have advanced it already so we can ignore the return - * type of this CAS call + * we are in intermediate state here. the tails next pointer has been + * advanced but the tail itself might not be updated yet. help to + * advance the tail and try again updating it. 
*/ - Interlocked.CompareExchange(ref tail, item, currentTail); - return; + Interlocked.CompareExchange(ref tail, tailNext, currentTail); // can fail + } + else + { + /* + * we are in quiescent state and can try to insert the new node to the + * current tail if we fail to insert we just retry the operation since + * somebody else has already added its item + */ + if (currentTail.CasNext(null, newNode)) + { + /* + * now that we are done we need to advance the tail while another + * thread could have advanced it already so we can ignore the return + * type of this CAS call + */ + Interlocked.CompareExchange(ref tail, newNode, currentTail); + return seqNo.GetAndIncrement(); + } } } } } + finally + { + UninterruptableMonitor.Exit(this); + } } internal bool AnyChanges() diff --git a/src/Lucene.Net/Index/DocumentsWriterFlushControl.cs b/src/Lucene.Net/Index/DocumentsWriterFlushControl.cs index b26b073784..ba0d8de426 100644 --- a/src/Lucene.Net/Index/DocumentsWriterFlushControl.cs +++ b/src/Lucene.Net/Index/DocumentsWriterFlushControl.cs @@ -681,9 +681,10 @@ internal ThreadState ObtainAndLock() } } - internal void MarkForFullFlush() + internal long MarkForFullFlush() { DocumentsWriterDeleteQueue flushingQueue; + long seqNo; UninterruptableMonitor.Enter(this); try { @@ -696,8 +697,11 @@ internal void MarkForFullFlush() flushingQueue = documentsWriter.deleteQueue; // Set a new delete queue - all subsequent DWPT will use this queue until // we do another full flush - DocumentsWriterDeleteQueue newQueue = new DocumentsWriterDeleteQueue(flushingQueue.generation + 1); - documentsWriter.deleteQueue = newQueue; + seqNo = documentsWriter.deleteQueue.seqNo.Value + perThreadPool.NumThreadStatesActive; + + // nocommit is this (active thread state count) always enough of a gap? what if new indexing thread sneaks in just now? it would + // have to get this next delete queue? + DocumentsWriterDeleteQueue newQueue = new DocumentsWriterDeleteQueue(flushingQueue.generation + 1, seqNo + 1); } finally { @@ -756,6 +760,7 @@ internal void MarkForFullFlush() UninterruptableMonitor.Exit(this); } if (Debugging.AssertsEnabled) Debugging.Assert(AssertActiveDeleteQueue(documentsWriter.deleteQueue)); + return seqNo; } private bool AssertActiveDeleteQueue(DocumentsWriterDeleteQueue queue) diff --git a/src/Lucene.Net/Index/DocumentsWriterPerThread.cs b/src/Lucene.Net/Index/DocumentsWriterPerThread.cs index 95e5f03fb8..4d3cfc8595 100644 --- a/src/Lucene.Net/Index/DocumentsWriterPerThread.cs +++ b/src/Lucene.Net/Index/DocumentsWriterPerThread.cs @@ -271,7 +271,7 @@ internal bool TestPoint(string message) return true; } - public virtual void UpdateDocument(IEnumerable doc, Analyzer analyzer, Term delTerm) + public virtual long UpdateDocument(IEnumerable doc, Analyzer analyzer, Term delTerm) { if (Debugging.AssertsEnabled) { @@ -327,7 +327,7 @@ public virtual void UpdateDocument(IEnumerable doc, Analyzer an Abort(filesToDelete); } } - FinishDocument(delTerm); + return FinishDocument(delTerm); } public virtual int UpdateDocuments(IEnumerable> docs, Analyzer analyzer, Term delTerm) @@ -424,7 +424,7 @@ public virtual int UpdateDocuments(IEnumerable> doc } [MethodImpl(MethodImplOptions.NoInlining)] - private void FinishDocument(Term delTerm) + private long FinishDocument(Term delTerm) { /* * here we actually finish the document in two steps 1. push the delete into @@ -435,14 +435,17 @@ private void FinishDocument(Term delTerm) * since we updated the slice the last time. 
*/ bool applySlice = numDocsInRAM != 0; + long seqNo; if (delTerm != null) { - deleteQueue.Add(delTerm, deleteSlice); + seqNo = deleteQueue.Add(delTerm, deleteSlice); if (Debugging.AssertsEnabled) Debugging.Assert(deleteSlice.IsTailItem(delTerm), "expected the delete term as the tail item"); } else { applySlice &= deleteQueue.UpdateSlice(deleteSlice); + // nocommit we don't need to increment here? + seqNo = deleteQueue.seqNo; } if (applySlice) @@ -454,6 +457,7 @@ private void FinishDocument(Term delTerm) deleteSlice.Reset(); } ++numDocsInRAM; + return seqNo; } // Buffer a specific docID for deletion. Currently only diff --git a/src/Lucene.Net/Index/DocumentsWriterPerThreadPool.cs b/src/Lucene.Net/Index/DocumentsWriterPerThreadPool.cs index f567a455fa..7e1f7be80c 100644 --- a/src/Lucene.Net/Index/DocumentsWriterPerThreadPool.cs +++ b/src/Lucene.Net/Index/DocumentsWriterPerThreadPool.cs @@ -415,26 +415,6 @@ internal ThreadState GetThreadState(int ord) return threadStates[ord]; } - /// - /// Returns the with the minimum estimated number of threads - /// waiting to acquire its lock or null if no - /// is yet visible to the calling thread. - /// - internal ThreadState MinContendedThreadState() - { - ThreadState minThreadState = null; - int limit = numThreadStatesActive; - for (int i = 0; i < limit; i++) - { - ThreadState state = threadStates[i]; - if (minThreadState is null || state.QueueLength < minThreadState.QueueLength) - { - minThreadState = state; - } - } - return minThreadState; - } - /// /// Returns the number of currently deactivated instances. /// A deactivated should not be used for indexing anymore. diff --git a/src/Lucene.Net/Index/IndexWriter.cs b/src/Lucene.Net/Index/IndexWriter.cs index fd81b70740..8d7ca81231 100644 --- a/src/Lucene.Net/Index/IndexWriter.cs +++ b/src/Lucene.Net/Index/IndexWriter.cs @@ -3,6 +3,7 @@ using Lucene.Net.Diagnostics; using Lucene.Net.Support; using Lucene.Net.Support.Threading; +using Lucene.Net.Util; using System; using System.Collections.Concurrent; using System.Collections.Generic; @@ -188,7 +189,7 @@ namespace Lucene.Net.Index * keeps track of the last non commit checkpoint. */ - public class IndexWriter : IDisposable, ITwoPhaseCommit + public class IndexWriter : IDisposable, ITwoPhaseCommit, IAccountable { private const int UNBOUNDED_MAX_MERGE_SEGMENTS = -1; @@ -234,6 +235,7 @@ public class IndexWriter : IDisposable, ITwoPhaseCommit private IList rollbackSegments; // list of segmentInfo we will fallback to if the commit fails internal volatile SegmentInfos pendingCommit; // set when a commit is pending (after prepareCommit() & before commit()) + internal AtomicInt64 pendingSeqNo; internal long pendingCommitChangeCount; private ICollection filesToCommit; @@ -383,7 +385,17 @@ public virtual DirectoryReader GetReader(bool applyAllDeletes) bool success = false; try { - anySegmentFlushed = docWriter.FlushAllThreads(this); + // nocommit should we make this available in the returned NRT reader? 
+ long seqNo = docWriter.FlushAllThreads(this); + if (seqNo < 0) + { + anySegmentFlushed = true; + seqNo = -seqNo; + } + else + { + anySegmentFlushed = false; + } if (!anySegmentFlushed) { // prevent double increment since docWriter#doFlush increments the flushcount @@ -1599,9 +1611,9 @@ public virtual bool HasDeletions() /// /// if the index is corrupt /// if there is a low-level IO error - public virtual void AddDocument(IEnumerable doc) + public virtual long AddDocument(IEnumerable doc) { - AddDocument(doc, analyzer); + return AddDocument(doc, analyzer); } /// @@ -1618,9 +1630,9 @@ public virtual void AddDocument(IEnumerable doc) /// /// if the index is corrupt /// if there is a low-level IO error - public virtual void AddDocument(IEnumerable doc, Analyzer analyzer) + public virtual long AddDocument(IEnumerable doc, Analyzer analyzer) { - UpdateDocument(null, doc, analyzer); + return UpdateDocument(null, doc, analyzer); } /// @@ -1752,20 +1764,25 @@ public virtual void UpdateDocuments(Term delTerm, IEnumerable the term to identify the documents to be deleted /// if the index is corrupt /// if there is a low-level IO error - public virtual void DeleteDocuments(Term term) + public virtual long DeleteDocuments(Term term) { EnsureOpen(); try { - if (docWriter.DeleteTerms(term)) + long seqNo = docWriter.DeleteTerms(term); + if (seqNo < 0) { + seqNo = -seqNo; ProcessEvents(true, false); } + return seqNo; } catch (Exception oom) when (oom.IsOutOfMemoryError()) { HandleOOM(oom, "DeleteDocuments(Term)"); } + // dead code but javac disagrees: + return -1; } /// @@ -1884,20 +1901,25 @@ public virtual bool TryDeleteDocument(IndexReader readerIn, int docID) /// to be deleted /// if the index is corrupt /// if there is a low-level IO error - public virtual void DeleteDocuments(params Term[] terms) + public virtual long DeleteDocuments(params Term[] terms) { EnsureOpen(); try { - if (docWriter.DeleteTerms(terms)) + long seqNo = docWriter.DeleteTerms(terms); + if (seqNo < 0) { + seqNo = -seqNo; ProcessEvents(true, false); } + return seqNo; } catch (Exception oom) when (oom.IsOutOfMemoryError()) { HandleOOM(oom, "DeleteDocuments(Term..)"); } + // dead code but javac disagrees: + return -1; } /// @@ -1993,7 +2015,7 @@ public virtual void UpdateDocument(Term term, IEnumerable doc) /// the analyzer to use when analyzing the document /// if the index is corrupt /// if there is a low-level IO error - public virtual void UpdateDocument(Term term, IEnumerable doc, Analyzer analyzer) + public virtual long UpdateDocument(Term term, IEnumerable doc, Analyzer analyzer) { EnsureOpen(); try @@ -2001,11 +2023,14 @@ public virtual void UpdateDocument(Term term, IEnumerable doc, bool success = false; try { - if (docWriter.UpdateDocument(doc, analyzer, term)) + long seqNo = docWriter.UpdateDocument(doc, analyzer, term); + if (seqNo < 0) { + seqNo = - seqNo; ProcessEvents(true, false); } success = true; + return seqNo; } finally { @@ -2022,6 +2047,8 @@ public virtual void UpdateDocument(Term term, IEnumerable doc, { HandleOOM(oom, "UpdateDocument"); } + // dead code but javac disagrees: + return -1; } /// @@ -3865,13 +3892,13 @@ protected virtual void DoBeforeFlush() /// you should immediately dispose the writer. See /// for details. 
/// - public void PrepareCommit() + public long PrepareCommit() { EnsureOpen(); - PrepareCommitInternal(); + return PrepareCommitInternal(); } - private void PrepareCommitInternal() + private long PrepareCommitInternal() { UninterruptableMonitor.Enter(commitLock); try @@ -3898,7 +3925,7 @@ private void PrepareCommitInternal() if (Lucene.Net.Diagnostics.Debugging.AssertsEnabled) TestPoint("startDoFlush"); SegmentInfos toCommit = null; bool anySegmentsFlushed = false; - + long seqNo; // this is copied from doFlush, except it's modified to // clone & incRef the flushed SegmentInfos inside the // sync block: @@ -3912,7 +3939,12 @@ private void PrepareCommitInternal() bool success = false; try { - anySegmentsFlushed = docWriter.FlushAllThreads(this); + seqNo = docWriter.FlushAllThreads(this); + if (seqNo < 0) + { + anySegmentsFlushed = true; + seqNo = -seqNo; + } if (!anySegmentsFlushed) { // prevent double increment since docWriter#doFlush increments the flushcount @@ -3974,6 +4006,8 @@ private void PrepareCommitInternal() catch (Exception oom) when (oom.IsOutOfMemoryError()) { HandleOOM(oom, "PrepareCommit"); + // dead code but javac disagrees: + seqNo = -1; } bool success_ = false; @@ -3985,6 +4019,7 @@ private void PrepareCommitInternal() } StartCommit(toCommit); success_ = true; + return seqNo; } finally { @@ -4089,10 +4124,10 @@ public IDictionary CommitData /// you should immediately dispose the writer. See /// for details. /// - public void Commit() + public long Commit() { EnsureOpen(); - CommitInternal(); + return CommitInternal(); } /// @@ -4111,13 +4146,15 @@ public bool HasUncommittedChanges() return changeCount != lastCommitChangeCount || docWriter.AnyChanges() || bufferedUpdatesStream.Any(); } - private void CommitInternal() + private long CommitInternal() { if (infoStream.IsEnabled("IW")) { infoStream.Message("IW", "commit: start"); } + long seqNo; + UninterruptableMonitor.Enter(commitLock); try { @@ -4134,7 +4171,7 @@ private void CommitInternal() { infoStream.Message("IW", "commit: now prepare"); } - PrepareCommitInternal(); + seqNo = PrepareCommitInternal(); } else { @@ -4142,14 +4179,16 @@ private void CommitInternal() { infoStream.Message("IW", "commit: already prepared"); } + seqNo = pendingSeqNo; } - FinishCommit(); + FinishCommit(); } finally { UninterruptableMonitor.Exit(commitLock); } + return seqNo; } private void FinishCommit() @@ -4264,7 +4303,16 @@ private bool DoFlush(bool applyAllDeletes) bool flushSuccess = false; try { - anySegmentFlushed = docWriter.FlushAllThreads(this); + long seqNo = docWriter.FlushAllThreads(this); + if (seqNo < 0) + { + seqNo = -seqNo; + anySegmentFlushed = true; + } + else + { + anySegmentFlushed = false; + } flushSuccess = true; } finally @@ -6484,5 +6532,7 @@ private static bool SlowFileExists(Directory dir, string fileName) return false; } } + + public long RamBytesUsed() => throw new NotImplementedException(); } } \ No newline at end of file diff --git a/src/Lucene.Net/Index/TrackingIndexWriter.cs b/src/Lucene.Net/Index/TrackingIndexWriter.cs index a589814ce6..61501a2d32 100644 --- a/src/Lucene.Net/Index/TrackingIndexWriter.cs +++ b/src/Lucene.Net/Index/TrackingIndexWriter.cs @@ -1,4 +1,4 @@ -using J2N.Threading.Atomic; +using J2N.Threading.Atomic; using System.Collections.Generic; namespace Lucene.Net.Index @@ -38,6 +38,7 @@ namespace Lucene.Net.Index /// @lucene.experimental /// + // nocommit removeme public class TrackingIndexWriter { private readonly IndexWriter writer; diff --git a/src/Lucene.Net/Index/TwoPhaseCommit.cs 
b/src/Lucene.Net/Index/TwoPhaseCommit.cs index af896b5bd4..ee4afb9b40 100644 --- a/src/Lucene.Net/Index/TwoPhaseCommit.cs +++ b/src/Lucene.Net/Index/TwoPhaseCommit.cs @@ -1,4 +1,4 @@ -namespace Lucene.Net.Index +namespace Lucene.Net.Index { /* * Licensed to the Apache Software Foundation (ASF) under one or more @@ -32,7 +32,7 @@ public interface ITwoPhaseCommit /// 2-phase commit fails, is called to discard all changes /// since last successful commit. /// - void PrepareCommit(); + long PrepareCommit(); /// /// The second phase of a 2-phase commit. Implementations should ideally do @@ -40,7 +40,7 @@ public interface ITwoPhaseCommit /// after it returns, the caller can assume that the changes were successfully /// committed to the underlying storage. /// - void Commit(); + long Commit(); /// /// Discards any changes that have occurred since the last commit. In a 2-phase From 281a839d45f081c98060538c7c3da625f58635e3 Mon Sep 17 00:00:00 2001 From: Jeevananthan-23 Date: Fri, 3 Nov 2023 18:27:15 +0530 Subject: [PATCH 2/5] completely cutover TrackingIndexWriter and all IW change to return seqNo --- .../Directory/DirectoryTaxonomyWriter.cs | 8 +- .../Index/RandomIndexWriter.cs | 46 +-- .../Index/TestIndexingSequenceNumbers.cs | 3 +- .../Index/TestIndexWriterDelete.cs | 4 +- .../Index/TestRollingUpdates.cs | 2 +- src/Lucene.Net.Tests/Index/TestTryDelete.cs | 16 +- .../Index/TestTwoPhaseCommitTool.cs | 14 +- .../TestControlledRealTimeReopenThread.cs | 44 ++- src/Lucene.Net/Index/DocumentsWriter.cs | 58 ++-- .../Index/DocumentsWriterDeleteQueue.cs | 50 +--- .../Index/DocumentsWriterPerThread.cs | 13 +- src/Lucene.Net/Index/IndexWriter.cs | 104 +++++-- .../Index/StandardDirectoryReader.cs | 2 +- src/Lucene.Net/Index/TrackingIndexWriter.cs | 262 ------------------ .../Search/ControlledRealTimeReopenThread.cs | 16 +- 15 files changed, 208 insertions(+), 434 deletions(-) delete mode 100644 src/Lucene.Net/Index/TrackingIndexWriter.cs diff --git a/src/Lucene.Net.Facet/Taxonomy/Directory/DirectoryTaxonomyWriter.cs b/src/Lucene.Net.Facet/Taxonomy/Directory/DirectoryTaxonomyWriter.cs index 37189a01b8..f95395d5f4 100644 --- a/src/Lucene.Net.Facet/Taxonomy/Directory/DirectoryTaxonomyWriter.cs +++ b/src/Lucene.Net.Facet/Taxonomy/Directory/DirectoryTaxonomyWriter.cs @@ -745,7 +745,7 @@ private void RefreshReaderManager() } } - public virtual void Commit() + public virtual long Commit() { UninterruptableMonitor.Enter(syncLock); try @@ -758,7 +758,7 @@ public virtual void Commit() { indexWriter.SetCommitData(CombinedCommitData(indexWriter.CommitData)); } - indexWriter.Commit(); + return indexWriter.Commit(); } finally { @@ -790,7 +790,7 @@ public virtual void SetCommitData(IDictionary commitUserData) /// prepare most of the work needed for a two-phase commit. /// See . /// - public virtual void PrepareCommit() + public virtual long PrepareCommit() { UninterruptableMonitor.Enter(syncLock); try @@ -803,7 +803,7 @@ public virtual void PrepareCommit() { indexWriter.SetCommitData(CombinedCommitData(indexWriter.CommitData)); } - indexWriter.PrepareCommit(); + return indexWriter.PrepareCommit(); } finally { diff --git a/src/Lucene.Net.TestFramework/Index/RandomIndexWriter.cs b/src/Lucene.Net.TestFramework/Index/RandomIndexWriter.cs index daa030d0ca..58df1f666c 100644 --- a/src/Lucene.Net.TestFramework/Index/RandomIndexWriter.cs +++ b/src/Lucene.Net.TestFramework/Index/RandomIndexWriter.cs @@ -125,27 +125,29 @@ public RandomIndexWriter(Random r, Directory dir, IndexWriterConfig c) /// /// Adds a Document. 
/// - public virtual void AddDocument(IEnumerable doc) + public virtual long AddDocument(IEnumerable doc) { - AddDocument(doc, IndexWriter.Analyzer); + return AddDocument(doc, IndexWriter.Analyzer); } - public virtual void AddDocument(IEnumerable doc, Analyzer a) + public virtual long AddDocument(IEnumerable doc, Analyzer a) { + long seqNo; if (r.Next(5) == 3) { // TODO: maybe, we should simply buffer up added docs // (but we need to clone them), and only when // getReader, commit, etc. are called, we do an // addDocuments? Would be better testing. - IndexWriter.AddDocuments(new EnumerableAnonymousClass(doc), a); + seqNo = IndexWriter.AddDocuments(new EnumerableAnonymousClass(doc), a); } else { - IndexWriter.AddDocument(doc, a); + seqNo = IndexWriter.AddDocument(doc, a); } MaybeCommit(); + return seqNo; } private sealed class EnumerableAnonymousClass : IEnumerable> @@ -223,32 +225,36 @@ private void MaybeCommit() } } - public virtual void AddDocuments(IEnumerable> docs) + public virtual long AddDocuments(IEnumerable> docs) { - IndexWriter.AddDocuments(docs); + long seqNo = IndexWriter.AddDocuments(docs); MaybeCommit(); + return seqNo; } - public virtual void UpdateDocuments(Term delTerm, IEnumerable> docs) + public virtual long UpdateDocuments(Term delTerm, IEnumerable> docs) { - IndexWriter.UpdateDocuments(delTerm, docs); + long seqNo = IndexWriter.UpdateDocuments(delTerm, docs); MaybeCommit(); + return seqNo; } /// /// Updates a document. /// - public virtual void UpdateDocument(Term t, IEnumerable doc) + public virtual long UpdateDocument(Term t, IEnumerable doc) { + long seqNo; if (r.Next(5) == 3) { - IndexWriter.UpdateDocuments(t, new EnumerableAnonymousClass2(doc)); + seqNo = IndexWriter.UpdateDocuments(t, new EnumerableAnonymousClass2(doc)); } else { - IndexWriter.UpdateDocument(t, doc); + seqNo = IndexWriter.UpdateDocument(t, doc); } MaybeCommit(); + return seqNo; } private sealed class EnumerableAnonymousClass2 : IEnumerable> @@ -306,25 +312,25 @@ public void Dispose() } } - public virtual void AddIndexes(params Directory[] dirs) + public virtual long AddIndexes(params Directory[] dirs) => IndexWriter.AddIndexes(dirs); - public virtual void AddIndexes(params IndexReader[] readers) + public virtual long AddIndexes(params IndexReader[] readers) => IndexWriter.AddIndexes(readers); - public virtual void UpdateNumericDocValue(Term term, string field, long? value) + public virtual long UpdateNumericDocValue(Term term, string field, long? 
value) => IndexWriter.UpdateNumericDocValue(term, field, value); - public virtual void UpdateBinaryDocValue(Term term, string field, BytesRef value) + public virtual long UpdateBinaryDocValue(Term term, string field, BytesRef value) => IndexWriter.UpdateBinaryDocValue(term, field, value); - public virtual void DeleteDocuments(Term term) + public virtual long DeleteDocuments(Term term) => IndexWriter.DeleteDocuments(term); - public virtual void DeleteDocuments(Query q) + public virtual long DeleteDocuments(Query q) => IndexWriter.DeleteDocuments(q); - public virtual void Commit() + public virtual long Commit() => IndexWriter.Commit(); public virtual int NumDocs @@ -333,7 +339,7 @@ public virtual int NumDocs public virtual int MaxDoc => IndexWriter.MaxDoc; - public virtual void DeleteAll() + public virtual long DeleteAll() => IndexWriter.DeleteAll(); public virtual DirectoryReader GetReader() diff --git a/src/Lucene.Net.Tests.Misc/Index/TestIndexingSequenceNumbers.cs b/src/Lucene.Net.Tests.Misc/Index/TestIndexingSequenceNumbers.cs index 1f5c4fab62..b5a52b2118 100644 --- a/src/Lucene.Net.Tests.Misc/Index/TestIndexingSequenceNumbers.cs +++ b/src/Lucene.Net.Tests.Misc/Index/TestIndexingSequenceNumbers.cs @@ -12,6 +12,7 @@ namespace Lucene.Net.Tests.Misc.Index { + //move me to Lucene.Net.Index public class TestIndexingSequenceNumbers : LuceneTestCase { [Test] @@ -21,7 +22,7 @@ public void TestBasic() IndexWriter w = new IndexWriter(dir, NewIndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer(Random))); long a = w.AddDocument(new Document()); long b = w.AddDocument(new Document()); - assertTrue(b > a); + assertTrue(b >= a); w.Dispose(); dir.Dispose(); } diff --git a/src/Lucene.Net.Tests/Index/TestIndexWriterDelete.cs b/src/Lucene.Net.Tests/Index/TestIndexWriterDelete.cs index b60622718d..5e008f55b6 100644 --- a/src/Lucene.Net.Tests/Index/TestIndexWriterDelete.cs +++ b/src/Lucene.Net.Tests/Index/TestIndexWriterDelete.cs @@ -1429,8 +1429,8 @@ public virtual void TestTryDeleteDocument() iwc.SetOpenMode(OpenMode.APPEND); w = new IndexWriter(d, iwc); IndexReader r = DirectoryReader.Open(w, false); - Assert.IsTrue(w.TryDeleteDocument(r, 1)); - Assert.IsTrue(w.TryDeleteDocument(r.Leaves[0].Reader, 0)); + Assert.IsTrue(w.TryDeleteDocument(r, 1) != -1); + Assert.IsTrue(w.TryDeleteDocument(r.Leaves[0].Reader, 0) != -1); r.Dispose(); w.Dispose(); diff --git a/src/Lucene.Net.Tests/Index/TestRollingUpdates.cs b/src/Lucene.Net.Tests/Index/TestRollingUpdates.cs index 8ec8c04e81..9b52615e53 100644 --- a/src/Lucene.Net.Tests/Index/TestRollingUpdates.cs +++ b/src/Lucene.Net.Tests/Index/TestRollingUpdates.cs @@ -95,7 +95,7 @@ public virtual void TestRollingUpdates_Mem() { TopDocs hits = s.Search(new TermQuery(idTerm), 1); Assert.AreEqual(1, hits.TotalHits); - doUpdate = !w.TryDeleteDocument(r, hits.ScoreDocs[0].Doc); + doUpdate = w.TryDeleteDocument(r, hits.ScoreDocs[0].Doc) == -1; if (Verbose) { if (doUpdate) diff --git a/src/Lucene.Net.Tests/Index/TestTryDelete.cs b/src/Lucene.Net.Tests/Index/TestTryDelete.cs index 52e0e1cbb2..9a59a25294 100644 --- a/src/Lucene.Net.Tests/Index/TestTryDelete.cs +++ b/src/Lucene.Net.Tests/Index/TestTryDelete.cs @@ -82,8 +82,6 @@ public virtual void TestTryDeleteDocument() ReferenceManager mgr = new SearcherManager(writer, true, new SearcherFactory()); - TrackingIndexWriter mgrWriter = new TrackingIndexWriter(writer); - IndexSearcher searcher = mgr.Acquire(); TopDocs topDocs = searcher.Search(new TermQuery(new Term("foo", "0")), 100); @@ -93,12 +91,12 @@ public virtual void 
TestTryDeleteDocument() if (Random.NextBoolean()) { IndexReader r = DirectoryReader.Open(writer, true); - result = mgrWriter.TryDeleteDocument(r, 0); + result = writer.TryDeleteDocument(r, 0); r.Dispose(); } else { - result = mgrWriter.TryDeleteDocument(searcher.IndexReader, 0); + result = writer.TryDeleteDocument(searcher.IndexReader, 0); } // The tryDeleteDocument should have succeeded: @@ -136,10 +134,9 @@ public virtual void TestTryDeleteDocumentCloseAndReopen() TopDocs topDocs = searcher.Search(new TermQuery(new Term("foo", "0")), 100); Assert.AreEqual(1, topDocs.TotalHits); - TrackingIndexWriter mgrWriter = new TrackingIndexWriter(writer); - long result = mgrWriter.TryDeleteDocument(DirectoryReader.Open(writer, true), 0); + long result = writer.TryDeleteDocument(DirectoryReader.Open(writer, true), 0); - Assert.AreEqual(1, result); + Assert.True(result != -1); writer.Commit(); @@ -176,10 +173,9 @@ public virtual void TestDeleteDocuments() TopDocs topDocs = searcher.Search(new TermQuery(new Term("foo", "0")), 100); Assert.AreEqual(1, topDocs.TotalHits); - TrackingIndexWriter mgrWriter = new TrackingIndexWriter(writer); - long result = mgrWriter.DeleteDocuments(new TermQuery(new Term("foo", "0"))); + long result = writer.DeleteDocuments(new TermQuery(new Term("foo", "0"))); - Assert.AreEqual(1, result); + Assert.True(result != -1); // writer.Commit(); diff --git a/src/Lucene.Net.Tests/Index/TestTwoPhaseCommitTool.cs b/src/Lucene.Net.Tests/Index/TestTwoPhaseCommitTool.cs index a9286fe431..f956f7d090 100644 --- a/src/Lucene.Net.Tests/Index/TestTwoPhaseCommitTool.cs +++ b/src/Lucene.Net.Tests/Index/TestTwoPhaseCommitTool.cs @@ -46,12 +46,12 @@ public TwoPhaseCommitImpl(bool failOnPrepare, bool failOnCommit, bool failOnRoll this.failOnRollback = failOnRollback; } - public void PrepareCommit() + public long PrepareCommit() { - PrepareCommit(null); + return PrepareCommit(null); } - public virtual void PrepareCommit(IDictionary commitData) + public virtual long PrepareCommit(IDictionary commitData) { this.prepareCommitData = commitData; Assert.IsFalse(commitCalled, "commit should not have been called before all prepareCommit were"); @@ -59,14 +59,15 @@ public virtual void PrepareCommit(IDictionary commitData) { throw new IOException("failOnPrepare"); } + return 1; } - public void Commit() + public long Commit() { - Commit(null); + return Commit(null); } - public virtual void Commit(IDictionary commitData) + public virtual long Commit(IDictionary commitData) { this.commitData = commitData; commitCalled = true; @@ -74,6 +75,7 @@ public virtual void Commit(IDictionary commitData) { throw RuntimeException.Create("failOnCommit"); } + return 1; } public void Rollback() diff --git a/src/Lucene.Net.Tests/Search/TestControlledRealTimeReopenThread.cs b/src/Lucene.Net.Tests/Search/TestControlledRealTimeReopenThread.cs index 5f377cdf52..972a026eba 100644 --- a/src/Lucene.Net.Tests/Search/TestControlledRealTimeReopenThread.cs +++ b/src/Lucene.Net.Tests/Search/TestControlledRealTimeReopenThread.cs @@ -61,7 +61,6 @@ namespace Lucene.Net.Search using Term = Lucene.Net.Index.Term; using TextField = Lucene.Net.Documents.TextField; using ThreadedIndexingAndSearchingTestCase = Lucene.Net.Index.ThreadedIndexingAndSearchingTestCase; - using TrackingIndexWriter = Lucene.Net.Index.TrackingIndexWriter; using Version = Lucene.Net.Util.LuceneVersion; [SuppressCodecs("SimpleText", "Memory", "Direct")] @@ -75,7 +74,7 @@ public class TestControlledRealTimeReopenThread : ThreadedIndexingAndSearchingTe // Is guaranteed to 
reflect deletes: private SearcherManager nrtDeletes; - private TrackingIndexWriter genWriter; + private IndexWriter genWriter; private ControlledRealTimeReopenThread nrtDeletesThread; private ControlledRealTimeReopenThread nrtNoDeletesThread; @@ -279,7 +278,7 @@ protected override void DoAfterWriter(TaskScheduler es) Console.WriteLine("TEST: make SearcherManager maxReopenSec=" + maxReopenSec + " minReopenSec=" + minReopenSec); } - genWriter = new TrackingIndexWriter(m_writer); + genWriter = m_writer; SearcherFactory sf = new SearcherFactoryAnonymousClass(this, es); @@ -400,9 +399,8 @@ public virtual void TestThreadStarvationNoDeleteNRTReader() CountdownEvent latch = new CountdownEvent(1); CountdownEvent signal = new CountdownEvent(1); - LatchedIndexWriter _writer = new LatchedIndexWriter(d, conf, latch, signal); - TrackingIndexWriter writer = new TrackingIndexWriter(_writer); - SearcherManager manager = new SearcherManager(_writer, false, null); + LatchedIndexWriter writer = new LatchedIndexWriter(d, conf, latch, signal); + SearcherManager manager = new SearcherManager(writer, false, null); Document doc = new Document(); doc.Add(NewTextField("test", "test", Field.Store.YES)); @@ -411,7 +409,7 @@ public virtual void TestThreadStarvationNoDeleteNRTReader() var t = new ThreadAnonymousClass(this, latch, signal, writer, manager); t.Start(); - _writer.waitAfterUpdate = true; // wait in addDocument to let some reopens go through + writer.waitAfterUpdate = true; // wait in addDocument to let some reopens go through long lastGen = writer.UpdateDocument(new Term("foo", "bar"), doc); // once this returns the doc is already reflected in the last reopen assertFalse(manager.IsSearcherCurrent()); // false since there is a delete in the queue @@ -446,7 +444,7 @@ public virtual void TestThreadStarvationNoDeleteNRTReader() } thread.Dispose(); thread.Join(); - IOUtils.Dispose(manager, _writer, d); + IOUtils.Dispose(manager, writer, d); } private sealed class ThreadAnonymousClass : ThreadJob @@ -455,10 +453,10 @@ private sealed class ThreadAnonymousClass : ThreadJob private readonly CountdownEvent latch; private readonly CountdownEvent signal; - private readonly TrackingIndexWriter writer; + private readonly IndexWriter writer; private readonly SearcherManager manager; - public ThreadAnonymousClass(TestControlledRealTimeReopenThread outerInstance, CountdownEvent latch, CountdownEvent signal, TrackingIndexWriter writer, SearcherManager manager) + public ThreadAnonymousClass(TestControlledRealTimeReopenThread outerInstance, CountdownEvent latch, CountdownEvent signal, IndexWriter writer, SearcherManager manager) { this.outerInstance = outerInstance; this.latch = latch; @@ -533,9 +531,9 @@ public LatchedIndexWriter(Directory d, IndexWriterConfig conf, CountdownEvent la } - public override void UpdateDocument(Term term, IEnumerable doc, Analyzer analyzer) + public override long UpdateDocument(Term term, IEnumerable doc, Analyzer analyzer) { - base.UpdateDocument(term, doc, analyzer); + long result = base.UpdateDocument(term, doc, analyzer); try { if (waitAfterUpdate) @@ -548,6 +546,7 @@ public override void UpdateDocument(Term term, IEnumerable doc, { throw new Util.ThreadInterruptedException(ie); } + return result; } } @@ -666,9 +665,8 @@ public virtual void TestCRTReopen() config.SetOpenMode(OpenMode.CREATE_OR_APPEND); IndexWriter iw = new IndexWriter(dir, config); SearcherManager sm = new SearcherManager(iw, true, new SearcherFactory()); - TrackingIndexWriter tiw = new TrackingIndexWriter(iw); 
ControlledRealTimeReopenThread controlledRealTimeReopenThread = - new ControlledRealTimeReopenThread(tiw, sm, maxStaleSecs, 0); + new ControlledRealTimeReopenThread(iw, sm, maxStaleSecs, 0); controlledRealTimeReopenThread.IsBackground = (true); controlledRealTimeReopenThread.Start(); @@ -687,7 +685,7 @@ public virtual void TestCRTReopen() d.Add(new TextField("count", i + "", Field.Store.NO)); d.Add(new TextField("content", content, Field.Store.YES)); long start = J2N.Time.NanoTime() / J2N.Time.MillisecondsPerNanosecond; // LUCENENET: Use NanoTime() rather than CurrentTimeMilliseconds() for more accurate/reliable results - long l = tiw.AddDocument(d); + long l = iw.AddDocument(d); controlledRealTimeReopenThread.WaitForGeneration(l); long wait = (J2N.Time.NanoTime() / J2N.Time.MillisecondsPerNanosecond) - start; // LUCENENET: Use NanoTime() rather than CurrentTimeMilliseconds() for more accurate/reliable results assertTrue("waited too long for generation " + wait, wait < (maxStaleSecs * 1000)); @@ -764,18 +762,17 @@ public void TestStraightForwardDemonstration() Analyzer standardAnalyzer = new StandardAnalyzer(TEST_VERSION_CURRENT); IndexWriterConfig indexConfig = new IndexWriterConfig(TEST_VERSION_CURRENT, standardAnalyzer); IndexWriter indexWriter = new IndexWriter(indexDir, indexConfig); - TrackingIndexWriter trackingWriter = new TrackingIndexWriter(indexWriter); Document doc = new Document(); doc.Add(new Int32Field("id", 1, Field.Store.YES)); doc.Add(new StringField("name", "Doc1", Field.Store.YES)); - trackingWriter.AddDocument(doc); + indexWriter.AddDocument(doc); SearcherManager searcherManager = new SearcherManager(indexWriter, applyAllDeletes: true, null); //Reopen SearcherManager every 1 secs via background thread if no thread waiting for newer generation. //Reopen SearcherManager after .2 secs if another thread IS waiting on a newer generation. - var controlledRealTimeReopenThread = new ControlledRealTimeReopenThread(trackingWriter, searcherManager, 1, 0.2); + var controlledRealTimeReopenThread = new ControlledRealTimeReopenThread(indexWriter, searcherManager, 1, 0.2); //Start() will start a seperate thread that will invoke the object's Run(). 
However, //calling Run() directly would execute that code on the current thread rather then a new thread @@ -802,7 +799,7 @@ public void TestStraightForwardDemonstration() doc = new Document(); doc.Add(new Int32Field("id", 2, Field.Store.YES)); doc.Add(new StringField("name", "Doc2", Field.Store.YES)); - trackingWriter.AddDocument(doc); + indexWriter.AddDocument(doc); //Demonstrate that we can only see the first doc because we haven't //waited 1 sec or called WaitForGeneration @@ -838,7 +835,7 @@ public void TestStraightForwardDemonstration() doc = new Document(); doc.Add(new Int32Field("id", 3, Field.Store.YES)); doc.Add(new StringField("name", "Doc3", Field.Store.YES)); - long generation = trackingWriter.AddDocument(doc); + long generation = indexWriter.AddDocument(doc); //Demonstrate that if we call WaitForGeneration our wait will be // .2 secs or less (the min interval we set earlier) and then we will @@ -919,17 +916,16 @@ Thread CreateWorker(int threadNum, ControlledRealTimeReopenThread Analyzer standardAnalyzer = new StandardAnalyzer(TEST_VERSION_CURRENT); IndexWriterConfig indexConfig = new IndexWriterConfig(TEST_VERSION_CURRENT, standardAnalyzer); IndexWriter indexWriter = new IndexWriter(indexDir, indexConfig); - TrackingIndexWriter trackingWriter = new TrackingIndexWriter(indexWriter); //Add two documents Document doc = new Document(); doc.Add(new Int32Field("id", 1, Field.Store.YES)); doc.Add(new StringField("name", "Doc1", Field.Store.YES)); - long generation = trackingWriter.AddDocument(doc); + long generation = indexWriter.AddDocument(doc); doc.Add(new Int32Field("id", 2, Field.Store.YES)); doc.Add(new StringField("name", "Doc3", Field.Store.YES)); - generation = trackingWriter.AddDocument(doc); + generation = indexWriter.AddDocument(doc); SearcherManager searcherManager = new SearcherManager(indexWriter, applyAllDeletes: true, null); @@ -937,7 +933,7 @@ Thread CreateWorker(int threadNum, ControlledRealTimeReopenThread //Reopen SearcherManager after .2 secs if another thread IS waiting on a newer generation. double maxRefreshSecs = 2.0; double minRefreshSecs = .2; - var controlledRealTimeReopenThread = new ControlledRealTimeReopenThread(trackingWriter, searcherManager, maxRefreshSecs, minRefreshSecs); + var controlledRealTimeReopenThread = new ControlledRealTimeReopenThread(indexWriter, searcherManager, maxRefreshSecs, minRefreshSecs); //Start() will start a seperate thread that will invoke the object's Run(). However, //calling Run() directly would execute that code on the current thread rather then a new thread diff --git a/src/Lucene.Net/Index/DocumentsWriter.cs b/src/Lucene.Net/Index/DocumentsWriter.cs index 7a53b39437..0807e26f60 100644 --- a/src/Lucene.Net/Index/DocumentsWriter.cs +++ b/src/Lucene.Net/Index/DocumentsWriter.cs @@ -143,17 +143,20 @@ internal DocumentsWriter(IndexWriter writer, LiveIndexWriterConfig config, Direc flushControl = new DocumentsWriterFlushControl(this, config, writer.bufferedUpdatesStream); } - internal bool DeleteQueries(params Query[] queries) + internal long DeleteQueries(params Query[] queries) { UninterruptableMonitor.Enter(this); try { // TODO why is this synchronized? 
DocumentsWriterDeleteQueue deleteQueue = this.deleteQueue; - deleteQueue.AddDelete(queries); + long seqNo = deleteQueue.AddDelete(queries); flushControl.DoOnDelete(); - //nocommit long - return ApplyAllDeletes(deleteQueue); + if (ApplyAllDeletes(deleteQueue)) + { + seqNo = -seqNo; + } + return seqNo; } finally { @@ -185,15 +188,20 @@ internal long DeleteTerms(params Term[] terms) } } - internal bool UpdateNumericDocValue(Term term, string field, long? value) + internal long UpdateNumericDocValue(Term term, string field, long? value) { UninterruptableMonitor.Enter(this); try { DocumentsWriterDeleteQueue deleteQueue = this.deleteQueue; - deleteQueue.AddNumericUpdate(new NumericDocValuesUpdate(term, field, value)); + long seqNo = deleteQueue.AddNumericUpdate(new NumericDocValuesUpdate(term, field, value)); flushControl.DoOnDelete(); - return ApplyAllDeletes(deleteQueue); + if (ApplyAllDeletes(deleteQueue)) + { + seqNo = -seqNo; + } + + return seqNo; } finally { @@ -201,15 +209,20 @@ internal bool UpdateNumericDocValue(Term term, string field, long? value) } } - internal bool UpdateBinaryDocValue(Term term, string field, BytesRef value) + internal long UpdateBinaryDocValue(Term term, string field, BytesRef value) { UninterruptableMonitor.Enter(this); try { DocumentsWriterDeleteQueue deleteQueue = this.deleteQueue; - deleteQueue.AddBinaryUpdate(new BinaryDocValuesUpdate(term, field, value)); + long seqNo = deleteQueue.AddBinaryUpdate(new BinaryDocValuesUpdate(term, field, value)); flushControl.DoOnDelete(); - return ApplyAllDeletes(deleteQueue); + if (ApplyAllDeletes(deleteQueue)) + { + seqNo = -seqNo; + } + + return seqNo; } finally { @@ -335,6 +348,8 @@ internal void LockAndAbortAll(IndexWriter indexWriter) AbortThreadState(perThread, newFilesSet); } deleteQueue.Clear(); + // jump over any possible in flight ops: + deleteQueue.seqNo.AddAndGet(perThreadPool.NumThreadStatesActive + 1); flushControl.AbortPendingFlushes(newFilesSet); PutEvent(new DeleteNewFilesEvent(newFilesSet)); flushControl.WaitForFlush(); @@ -525,12 +540,13 @@ private void EnsureInitialized(ThreadState state) } } - internal bool UpdateDocuments(IEnumerable> docs, Analyzer analyzer, Term delTerm) + internal long UpdateDocuments(IEnumerable> docs, Analyzer analyzer, Term delTerm) { bool hasEvents = PreUpdate(); ThreadState perThread = flushControl.ObtainAndLock(); DocumentsWriterPerThread flushingDWPT; + long seqNo; try { @@ -545,8 +561,7 @@ internal bool UpdateDocuments(IEnumerable> docs, An int dwptNumDocs = dwpt.NumDocsInRAM; try { - int docCount = dwpt.UpdateDocuments(docs, analyzer, delTerm); - numDocsInRAM.AddAndGet(docCount); + seqNo = dwpt.UpdateDocuments(docs, analyzer, delTerm); } finally { @@ -568,7 +583,14 @@ internal bool UpdateDocuments(IEnumerable> docs, An perThreadPool.Release(perThread); } - return PostUpdate(flushingDWPT, hasEvents); + if (PostUpdate(flushingDWPT, hasEvents)) + { + return -seqNo; + } + else + { + return seqNo; + } } internal long UpdateDocument(IEnumerable doc, Analyzer analyzer, Term delTerm) @@ -578,7 +600,7 @@ internal long UpdateDocument(IEnumerable doc, Analyzer analyzer ThreadState perThread = flushControl.ObtainAndLock(); DocumentsWriterPerThread flushingDWPT; - long seqno; + long seqNo; try { if (!perThread.IsActive) @@ -592,7 +614,7 @@ internal long UpdateDocument(IEnumerable doc, Analyzer analyzer int dwptNumDocs = dwpt.NumDocsInRAM; try { - seqno = dwpt.UpdateDocument(doc, analyzer, delTerm); + seqNo = dwpt.UpdateDocument(doc, analyzer, delTerm); numDocsInRAM.IncrementAndGet(); } 
finally @@ -617,11 +639,11 @@ internal long UpdateDocument(IEnumerable doc, Analyzer analyzer if (PostUpdate(flushingDWPT, hasEvents)) { - return -seqno; + return -seqNo; } else { - return seqno; + return seqNo; } } diff --git a/src/Lucene.Net/Index/DocumentsWriterDeleteQueue.cs b/src/Lucene.Net/Index/DocumentsWriterDeleteQueue.cs index f2eadb1a4a..780eefe796 100644 --- a/src/Lucene.Net/Index/DocumentsWriterDeleteQueue.cs +++ b/src/Lucene.Net/Index/DocumentsWriterDeleteQueue.cs @@ -74,8 +74,6 @@ internal sealed class DocumentsWriterDeleteQueue private readonly BufferedUpdates globalBufferedUpdates; - private long gen; - /* only acquired to update the global deletes */ private readonly ReentrantLock globalBufferLock = new ReentrantLock(); @@ -121,16 +119,18 @@ internal long AddDelete(params Term[] terms) return seqNo; } - internal void AddNumericUpdate(NumericDocValuesUpdate update) + internal long AddNumericUpdate(NumericDocValuesUpdate update) { - Add(new NumericUpdateNode(update)); + long seqNo = Add(new NumericUpdateNode(update)); TryApplyGlobalSlice(); + return seqNo; } - internal void AddBinaryUpdate(BinaryDocValuesUpdate update) + internal long AddBinaryUpdate(BinaryDocValuesUpdate update) { - Add(new BinaryUpdateNode(update)); + long seqNo = Add(new BinaryUpdateNode(update)); TryApplyGlobalSlice(); + return seqNo; } /// @@ -167,41 +167,9 @@ internal long Add(Node newNode) */ try { - while (true) - { - Node currentTail = this.tail; - Node tailNext = currentTail.next; - if (tail == currentTail) - { - if (tailNext != null) - { - /* - * we are in intermediate state here. the tails next pointer has been - * advanced but the tail itself might not be updated yet. help to - * advance the tail and try again updating it. - */ - Interlocked.CompareExchange(ref tail, tailNext, currentTail); // can fail - } - else - { - /* - * we are in quiescent state and can try to insert the new node to the - * current tail if we fail to insert we just retry the operation since - * somebody else has already added its item - */ - if (currentTail.CasNext(null, newNode)) - { - /* - * now that we are done we need to advance the tail while another - * thread could have advanced it already so we can ignore the return - * type of this CAS call - */ - Interlocked.CompareExchange(ref tail, newNode, currentTail); - return seqNo.GetAndIncrement(); - } - } - } - } + tail.next = newNode; + tail = newNode; + return seqNo.GetAndIncrement(); } finally { diff --git a/src/Lucene.Net/Index/DocumentsWriterPerThread.cs b/src/Lucene.Net/Index/DocumentsWriterPerThread.cs index 4d3cfc8595..1682150889 100644 --- a/src/Lucene.Net/Index/DocumentsWriterPerThread.cs +++ b/src/Lucene.Net/Index/DocumentsWriterPerThread.cs @@ -330,7 +330,7 @@ public virtual long UpdateDocument(IEnumerable doc, Analyzer an return FinishDocument(delTerm); } - public virtual int UpdateDocuments(IEnumerable> docs, Analyzer analyzer, Term delTerm) + public virtual long UpdateDocuments(IEnumerable> docs, Analyzer analyzer, Term delTerm) { if (Debugging.AssertsEnabled) { @@ -396,12 +396,19 @@ public virtual int UpdateDocuments(IEnumerable> doc // Apply delTerm only after all indexing has // succeeded, but apply it only to docs prior to when // this batch started: + long seqNo; if (delTerm != null) { - deleteQueue.Add(delTerm, deleteSlice); + seqNo = deleteQueue.Add(delTerm, deleteSlice); if (Debugging.AssertsEnabled) Debugging.Assert(deleteSlice.IsTailItem(delTerm), "expected the delete term as the tail item"); deleteSlice.Apply(pendingUpdates, numDocsInRAM - 
docCount); + return seqNo; } + else + { + seqNo = deleteQueue.seqNo; + } + return seqNo; } finally { @@ -419,8 +426,6 @@ public virtual int UpdateDocuments(IEnumerable> doc } docState.Clear(); } - - return docCount; } [MethodImpl(MethodImplOptions.NoInlining)] diff --git a/src/Lucene.Net/Index/IndexWriter.cs b/src/Lucene.Net/Index/IndexWriter.cs index 8d7ca81231..d6ddbebeab 100644 --- a/src/Lucene.Net/Index/IndexWriter.cs +++ b/src/Lucene.Net/Index/IndexWriter.cs @@ -1530,6 +1530,16 @@ public virtual int MaxDoc } } + // nocommit javadocs + public virtual long LastSequenceNumber + { + get + { + EnsureOpen(); + return docWriter.deleteQueue.seqNo; + } + } + /// /// Returns true if this index has deletions (including /// buffered deletions). Note that this will return true @@ -1675,9 +1685,9 @@ public virtual long AddDocument(IEnumerable doc, Analyzer analy /// /// if the index is corrupt /// if there is a low-level IO error - public virtual void AddDocuments(IEnumerable> docs) + public virtual long AddDocuments(IEnumerable> docs) { - AddDocuments(docs, analyzer); + return AddDocuments(docs, analyzer); } /// @@ -1690,9 +1700,9 @@ public virtual void AddDocuments(IEnumerable> docs) /// /// if the index is corrupt /// if there is a low-level IO error - public virtual void AddDocuments(IEnumerable> docs, Analyzer analyzer) + public virtual long AddDocuments(IEnumerable> docs, Analyzer analyzer) { - UpdateDocuments(null, docs, analyzer); + return UpdateDocuments(null, docs, analyzer); } /// @@ -1706,9 +1716,9 @@ public virtual void AddDocuments(IEnumerable> docs, /// if the index is corrupt /// if there is a low-level IO error /// - public virtual void UpdateDocuments(Term delTerm, IEnumerable> docs) + public virtual long UpdateDocuments(Term delTerm, IEnumerable> docs) { - UpdateDocuments(delTerm, docs, analyzer); + return UpdateDocuments(delTerm, docs, analyzer); } /// @@ -1723,7 +1733,7 @@ public virtual void UpdateDocuments(Term delTerm, IEnumerable if the index is corrupt /// if there is a low-level IO error /// - public virtual void UpdateDocuments(Term delTerm, IEnumerable> docs, Analyzer analyzer) + public virtual long UpdateDocuments(Term delTerm, IEnumerable> docs, Analyzer analyzer) { EnsureOpen(); try @@ -1731,11 +1741,14 @@ public virtual void UpdateDocuments(Term delTerm, IEnumerable @@ -1791,9 +1806,9 @@ public virtual long DeleteDocuments(Term term) /// . If the /// provided is an NRT reader obtained from this /// writer, and its segment has not been merged away, then - /// the delete succeeds and this method returns true; else, it - /// returns false the caller must then separately delete by - /// Term or Query. + /// the delete succeeds and this method returns a valid (> 0) sequence + /// number; else, it returns -1 and the caller must then + /// separately delete by Term or Query. /// /// NOTE: this method can only delete documents /// visible to the currently open NRT reader. If you need @@ -1801,7 +1816,7 @@ public virtual long DeleteDocuments(Term term) /// reader you must use the other DeleteDocument() methods /// (e.g., ). 
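// A hedged usage sketch of the new return convention (nrtReader, docID and
// idValue are assumed caller-side names, not part of this patch):
//
//   long seqNo = writer.TryDeleteDocument(nrtReader, docID);
//   if (seqNo == -1)
//   {
//       // NRT short-cut failed (segment merged away); fall back to a Term delete:
//       seqNo = writer.DeleteDocuments(new Term("id", idValue));
//   }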
/// - public virtual bool TryDeleteDocument(IndexReader readerIn, int docID) + public virtual long TryDeleteDocument(IndexReader readerIn, int docID) { UninterruptableMonitor.Enter(this); try @@ -1864,7 +1879,7 @@ public virtual bool TryDeleteDocument(IndexReader readerIn, int docID) Changed(); } //System.out.println(" yes " + info.info.name + " " + docID); - return true; + return docWriter.deleteQueue.seqNo.GetAndIncrement(); } finally { @@ -1880,7 +1895,7 @@ public virtual bool TryDeleteDocument(IndexReader readerIn, int docID) { //System.out.println(" no seg " + info.info.name + " " + docID); } - return false; + return -1; } finally { @@ -1932,20 +1947,24 @@ public virtual long DeleteDocuments(params Term[] terms) /// the query to identify the documents to be deleted /// if the index is corrupt /// if there is a low-level IO error - public virtual void DeleteDocuments(Query query) + public virtual long DeleteDocuments(Query query) { EnsureOpen(); try { - if (docWriter.DeleteQueries(query)) + long seqNo = docWriter.DeleteQueries(query); + if (seqNo < 0) { + seqNo = -seqNo; ProcessEvents(true, false); } + return seqNo; } catch (Exception oom) when (oom.IsOutOfMemoryError()) { HandleOOM(oom, "DeleteDocuments(Query)"); } + return -1; } /// @@ -1960,20 +1979,24 @@ public virtual void DeleteDocuments(Query query) /// to be deleted /// if the index is corrupt /// if there is a low-level IO error - public virtual void DeleteDocuments(params Query[] queries) + public virtual long DeleteDocuments(params Query[] queries) { EnsureOpen(); try { - if (docWriter.DeleteQueries(queries)) + long seqNo = docWriter.DeleteQueries(queries); + if (seqNo < 0) { + seqNo = -seqNo; ProcessEvents(true, false); } + return seqNo; } catch (Exception oom) when (oom.IsOutOfMemoryError()) { HandleOOM(oom, "DeleteDocuments(Query..)"); } + return -1; } /// @@ -1992,10 +2015,10 @@ public virtual void DeleteDocuments(params Query[] queries) /// the document to be added /// if the index is corrupt /// if there is a low-level IO error - public virtual void UpdateDocument(Term term, IEnumerable doc) + public virtual long UpdateDocument(Term term, IEnumerable doc) { EnsureOpen(); - UpdateDocument(term, doc, analyzer); + return UpdateDocument(term, doc, analyzer); } /// @@ -2073,7 +2096,7 @@ public virtual long UpdateDocument(Term term, IEnumerable doc, /// if the index is corrupt /// /// if there is a low-level IO error - public virtual void UpdateNumericDocValue(Term term, string field, long? value) + public virtual long UpdateNumericDocValue(Term term, string field, long? value) { EnsureOpen(); if (!globalFieldNumberMap.Contains(field, DocValuesType.NUMERIC)) @@ -2082,14 +2105,18 @@ public virtual void UpdateNumericDocValue(Term term, string field, long? value) } try { - if (docWriter.UpdateNumericDocValue(term, field, value)) + long seqNo = docWriter.UpdateNumericDocValue(term, field, value); + if (seqNo < 0) { + seqNo = -seqNo; ProcessEvents(true, false); } + return seqNo; } catch (Exception oom) when (oom.IsOutOfMemoryError()) { HandleOOM(oom, "UpdateNumericDocValue"); + return -1; } } @@ -2119,7 +2146,7 @@ public virtual void UpdateNumericDocValue(Term term, string field, long? 
value) /// if the index is corrupt /// /// if there is a low-level IO error - public virtual void UpdateBinaryDocValue(Term term, string field, BytesRef value) + public virtual long UpdateBinaryDocValue(Term term, string field, BytesRef value) { EnsureOpen(); if (!globalFieldNumberMap.Contains(field, DocValuesType.BINARY)) @@ -2128,14 +2155,18 @@ public virtual void UpdateBinaryDocValue(Term term, string field, BytesRef value } try { - if (docWriter.UpdateBinaryDocValue(term, field, value)) + long seqNo = docWriter.UpdateBinaryDocValue(term, field, value); + if (seqNo < 0) { + seqNo = -seqNo; ProcessEvents(true, false); } + return seqNo; } catch (Exception oom) when (oom.IsOutOfMemoryError()) { HandleOOM(oom, "UpdateBinaryDocValue"); + return -1; } } @@ -2944,7 +2975,7 @@ private void RollbackInternal() /// methods, they may receive /// s. /// - public virtual void DeleteAll() + public virtual long DeleteAll() { EnsureOpen(); // Remove any buffered docs @@ -2992,10 +3023,15 @@ public virtual void DeleteAll() segmentInfos.Changed(); globalFieldNumberMap.Clear(); success = true; + return docWriter.deleteQueue.seqNo; } catch (Exception oom) when (oom.IsOutOfMemoryError()) { HandleOOM(oom, "DeleteAll"); + + // dead code but javac disagrees + return -1; + } finally { @@ -3373,7 +3409,7 @@ private IEnumerable AcquireWriteLocks(params Directory[] dirs) /// if there is a low-level IO error /// if we were unable to /// acquire the write lock in at least one directory - public virtual void AddIndexes(params Directory[] dirs) + public virtual long AddIndexes(params Directory[] dirs) { EnsureOpen(); @@ -3502,6 +3538,8 @@ public virtual void AddIndexes(params Directory[] dirs) IOUtils.DisposeWhileHandlingException(locks); } } + MaybeMerge(); + return docWriter.deleteQueue.seqNo; } /// @@ -3542,7 +3580,7 @@ public virtual void AddIndexes(params Directory[] dirs) /// if the index is corrupt /// /// if there is a low-level IO error - public virtual void AddIndexes(params IndexReader[] readers) + public virtual long AddIndexes(params IndexReader[] readers) { EnsureOpen(); int numDocs = 0; @@ -3578,7 +3616,8 @@ public virtual void AddIndexes(params IndexReader[] readers) if (!merger.ShouldMerge) { - return; + // no need to increment: + return docWriter.deleteQueue.seqNo; } MergeState mergeState; @@ -3618,7 +3657,8 @@ public virtual void AddIndexes(params IndexReader[] readers) if (stopMerges) { deleter.DeleteNewFiles(infoPerCommit.GetFiles()); - return; + // no need to increment: + return docWriter.deleteQueue.seqNo; } EnsureOpen(); useCompoundFile = mergePolicy.UseCompoundFile(segmentInfos, infoPerCommit); @@ -3688,7 +3728,8 @@ public virtual void AddIndexes(params IndexReader[] readers) if (stopMerges) { deleter.DeleteNewFiles(info.GetFiles()); - return; + // no need to increment: + return docWriter.deleteQueue.seqNo; } EnsureOpen(); segmentInfos.Add(infoPerCommit); @@ -3703,6 +3744,9 @@ public virtual void AddIndexes(params IndexReader[] readers) { HandleOOM(oom, "AddIndexes(IndexReader...)"); } + MaybeMerge(); + // no need to increment: + return docWriter.deleteQueue.seqNo; } /// diff --git a/src/Lucene.Net/Index/StandardDirectoryReader.cs b/src/Lucene.Net/Index/StandardDirectoryReader.cs index 19776dd153..03a982bf33 100644 --- a/src/Lucene.Net/Index/StandardDirectoryReader.cs +++ b/src/Lucene.Net/Index/StandardDirectoryReader.cs @@ -525,7 +525,7 @@ internal ReaderCommit(SegmentInfos infos, Directory dir) public override string ToString() { - return "DirectoryReader.ReaderCommit(" + segmentsFileName + 
")"; + return "StandardDirectoryReader.ReaderCommit(" + segmentsFileName + " files=" + files + ")"; } public override int SegmentCount => segmentCount; diff --git a/src/Lucene.Net/Index/TrackingIndexWriter.cs b/src/Lucene.Net/Index/TrackingIndexWriter.cs deleted file mode 100644 index 61501a2d32..0000000000 --- a/src/Lucene.Net/Index/TrackingIndexWriter.cs +++ /dev/null @@ -1,262 +0,0 @@ -using J2N.Threading.Atomic; -using System.Collections.Generic; - -namespace Lucene.Net.Index -{ - /* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - using Analyzer = Lucene.Net.Analysis.Analyzer; - using Directory = Lucene.Net.Store.Directory; - using Query = Lucene.Net.Search.Query; - - /// - /// Class that tracks changes to a delegated - /// , used by - /// to ensure specific - /// changes are visible. Create this class (passing your - /// ), and then pass this class to - /// . - /// Be sure to make all changes via the - /// , otherwise - /// won't know about the changes. - /// - /// @lucene.experimental - /// - - // nocommit removeme - public class TrackingIndexWriter - { - private readonly IndexWriter writer; - private readonly AtomicInt64 indexingGen = new AtomicInt64(1); - - /// - /// Create a wrapping the - /// provided . - /// - public TrackingIndexWriter(IndexWriter writer) - { - this.writer = writer; - } - - /// - /// Calls - /// - /// and returns the generation that reflects this change. - /// - public virtual long UpdateDocument(Term t, IEnumerable d, Analyzer a) - { - writer.UpdateDocument(t, d, a); - // Return gen as of when indexing finished: - return indexingGen; - } - - /// - /// Calls - /// and - /// returns the generation that reflects this change. - /// - public virtual long UpdateDocument(Term t, IEnumerable d) - { - writer.UpdateDocument(t, d); - // Return gen as of when indexing finished: - return indexingGen; - } - - /// - /// Calls - /// - /// and returns the generation that reflects this change. - /// - public virtual long UpdateDocuments(Term t, IEnumerable> docs, Analyzer a) - { - writer.UpdateDocuments(t, docs, a); - // Return gen as of when indexing finished: - return indexingGen; - } - - /// - /// Calls - /// and returns - /// the generation that reflects this change. - /// - public virtual long UpdateDocuments(Term t, IEnumerable> docs) - { - writer.UpdateDocuments(t, docs); - // Return gen as of when indexing finished: - return indexingGen; - } - - /// - /// Calls and - /// returns the generation that reflects this change. - /// - public virtual long DeleteDocuments(Term t) - { - writer.DeleteDocuments(t); - // Return gen as of when indexing finished: - return indexingGen; - } - - /// - /// Calls and - /// returns the generation that reflects this change. 
- /// - public virtual long DeleteDocuments(params Term[] terms) - { - writer.DeleteDocuments(terms); - // Return gen as of when indexing finished: - return indexingGen; - } - - /// - /// Calls and - /// returns the generation that reflects this change. - /// - public virtual long DeleteDocuments(Query q) - { - writer.DeleteDocuments(q); - // Return gen as of when indexing finished: - return indexingGen; - } - - /// - /// Calls - /// and returns the generation that reflects this change. - /// - public virtual long DeleteDocuments(params Query[] queries) - { - writer.DeleteDocuments(queries); - // Return gen as of when indexing finished: - return indexingGen; - } - - /// - /// Calls and returns the - /// generation that reflects this change. - /// - public virtual long DeleteAll() - { - writer.DeleteAll(); - // Return gen as of when indexing finished: - return indexingGen; - } - - /// - /// Calls - /// and - /// returns the generation that reflects this change. - /// - public virtual long AddDocument(IEnumerable d, Analyzer a) - { - writer.AddDocument(d, a); - // Return gen as of when indexing finished: - return indexingGen; - } - - /// - /// Calls - /// and - /// returns the generation that reflects this change. - /// - public virtual long AddDocuments(IEnumerable> docs, Analyzer a) - { - writer.AddDocuments(docs, a); - // Return gen as of when indexing finished: - return indexingGen; - } - - /// - /// Calls - /// and returns the generation that reflects this change. - /// - public virtual long AddDocument(IEnumerable d) - { - writer.AddDocument(d); - // Return gen as of when indexing finished: - return indexingGen; - } - - /// - /// Calls and - /// returns the generation that reflects this change. - /// - public virtual long AddDocuments(IEnumerable> docs) - { - writer.AddDocuments(docs); - // Return gen as of when indexing finished: - return indexingGen; - } - - /// - /// Calls and - /// returns the generation that reflects this change. - /// - public virtual long AddIndexes(params Directory[] dirs) - { - writer.AddIndexes(dirs); - // Return gen as of when indexing finished: - return indexingGen; - } - - /// - /// Calls - /// and returns the generation that reflects this change. - /// - public virtual long AddIndexes(params IndexReader[] readers) - { - writer.AddIndexes(readers); - // Return gen as of when indexing finished: - return indexingGen; - } - - /// - /// Return the current generation being indexed. - public virtual long Generation => indexingGen; - - /// - /// Return the wrapped . - public virtual IndexWriter IndexWriter => writer; - - /// - /// Return and increment current gen. - /// - /// @lucene.internal - /// - public virtual long GetAndIncrementGeneration() - { - return indexingGen.GetAndIncrement(); - } - - /// - /// Cals - /// and - /// returns the generation that reflects this change. - /// - public virtual long TryDeleteDocument(IndexReader reader, int docID) - { - if (writer.TryDeleteDocument(reader, docID)) - { - return indexingGen; - } - else - { - return -1; - } - } - } -} \ No newline at end of file diff --git a/src/Lucene.Net/Search/ControlledRealTimeReopenThread.cs b/src/Lucene.Net/Search/ControlledRealTimeReopenThread.cs index 8c3e854bc7..fd42ebed11 100644 --- a/src/Lucene.Net/Search/ControlledRealTimeReopenThread.cs +++ b/src/Lucene.Net/Search/ControlledRealTimeReopenThread.cs @@ -24,16 +24,12 @@ namespace Lucene.Net.Search * limitations under the License. 
*/ - using TrackingIndexWriter = Lucene.Net.Index.TrackingIndexWriter; + using IndexWriter = Lucene.Net.Index.IndexWriter; /// /// Utility class that runs a thread to manage periodic /// reopens of a , with methods to wait for a specific - /// index changes to become visible. To use this class you - /// must first wrap your with a - /// and always use it to make changes - /// to the index, saving the returned generation. Then, - /// when a given search request needs to see a specific + /// index change to become visible. When a given search request needs to see a specific /// index change, call the to wait for /// that change to be visible. Note that this will only /// scale well if most searches do not need to wait for a @@ -48,7 +44,7 @@ public class ControlledRealTimeReopenThread : ThreadJob, IDisposable private readonly ReferenceManager manager; private readonly long targetMaxStaleNS; private readonly long targetMinStaleNS; - private readonly TrackingIndexWriter writer; + private readonly IndexWriter writer; private volatile bool finish; private long waitingGen; private long searchingGen; @@ -74,7 +70,7 @@ public class ControlledRealTimeReopenThread : ThreadJob, IDisposable /// on how quickly reopens may occur, when a caller /// is waiting for a specific generation to /// become visible. - public ControlledRealTimeReopenThread(TrackingIndexWriter writer, ReferenceManager manager, double targetMaxStaleSec, double targetMinStaleSec) + public ControlledRealTimeReopenThread(IndexWriter writer, ReferenceManager manager, double targetMaxStaleSec, double targetMinStaleSec) { if (targetMaxStaleSec < targetMinStaleSec) { @@ -211,7 +207,7 @@ public virtual bool WaitForGeneration(long targetGen, int maxMS) // synchronize lock and c# has no similar primitive. So we must handle locking a // bit differently here to mimic that effect.
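// End-to-end sketch of the wait flow after this change (hedged; it mirrors
// the updated TestStraightForwardDemonstration shown earlier in this patch):
//
//   var searcherManager = new SearcherManager(indexWriter, applyAllDeletes: true, null);
//   var reopenThread = new ControlledRealTimeReopenThread<IndexSearcher>(
//       indexWriter, searcherManager, 2.0, 0.2); // max/min stale seconds
//   reopenThread.Start();
//
//   long gen = indexWriter.UpdateDocument(new Term("id", "1"), doc);
//   reopenThread.WaitForGeneration(gen); // blocks until a reopen covers gen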
- long curGen = writer.Generation; + long curGen = writer.LastSequenceNumber; if (targetGen > curGen) { throw new ArgumentException("targetGen=" + targetGen + " was never returned by the ReferenceManager instance (current gen=" + curGen + ")"); @@ -322,7 +318,7 @@ public override void Run() // Save the gen as of when we started the reopen; the // listener (HandleRefresh above) copies this to // searchingGen once the reopen completes: - refreshStartGen.Value = writer.GetAndIncrementGeneration(); + refreshStartGen.Value = writer.LastSequenceNumber; try { manager.MaybeRefreshBlocking(); From 487597c54708881f251c88fcff504bd380ade831 Mon Sep 17 00:00:00 2001 From: Jeevananthan-23 Date: Sun, 5 Nov 2023 00:22:54 +0530 Subject: [PATCH 3/5] seqno: adding more test and bug fix in concurrency issues --- .../Index/TestIndexingSequenceNumbers.cs | 762 ++++++++++++++---- .../Index/TestDocumentsWriterDeleteQueue.cs | 6 +- .../Index/TestIndexWriterConfig.cs | 2 +- src/Lucene.Net/Index/BufferedUpdates.cs | 5 +- src/Lucene.Net/Index/DocumentsWriter.cs | 2 +- .../Index/DocumentsWriterDeleteQueue.cs | 82 +- .../Index/DocumentsWriterFlushControl.cs | 9 +- .../Index/DocumentsWriterPerThread.cs | 30 +- src/Lucene.Net/Index/IndexWriter.cs | 35 +- 9 files changed, 717 insertions(+), 216 deletions(-) diff --git a/src/Lucene.Net.Tests.Misc/Index/TestIndexingSequenceNumbers.cs b/src/Lucene.Net.Tests.Misc/Index/TestIndexingSequenceNumbers.cs index b5a52b2118..b716fcf8cd 100644 --- a/src/Lucene.Net.Tests.Misc/Index/TestIndexingSequenceNumbers.cs +++ b/src/Lucene.Net.Tests.Misc/Index/TestIndexingSequenceNumbers.cs @@ -1,9 +1,13 @@ using J2N.Threading; +using J2N.Threading.Atomic; using Lucene.Net.Analysis; +using Lucene.Net.Diagnostics; using Lucene.Net.Documents; using Lucene.Net.Index; using Lucene.Net.Search; using Lucene.Net.Store; +using Lucene.Net.Support; +using Lucene.Net.Support.Threading; using Lucene.Net.Util; using NUnit.Framework; using System; @@ -22,7 +26,7 @@ public void TestBasic() IndexWriter w = new IndexWriter(dir, NewIndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer(Random))); long a = w.AddDocument(new Document()); long b = w.AddDocument(new Document()); - assertTrue(b >= a); + assertTrue(b > a); w.Dispose(); dir.Dispose(); } @@ -33,7 +37,7 @@ public void TestAfterRefresh() Directory dir = NewDirectory(); IndexWriter w = new IndexWriter(dir, NewIndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer(Random))); long a = w.AddDocument(new Document()); - DirectoryReader.Open(w,true).Dispose(); + DirectoryReader.Open(w, true).Dispose(); long b = w.AddDocument(new Document()); assertTrue(b > a); w.Dispose(); @@ -53,7 +57,7 @@ public void TestAfterCommit() dir.Dispose(); } - /* [Test] + [Test] public void TestStressUpdateSameID() { int iters = AtLeast(100); @@ -61,39 +65,45 @@ public void TestStressUpdateSameID() { Directory dir = NewDirectory(); // nocommit use RandomIndexWriter - IndexWriter w = new IndexWriter(dir, NewIndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer(Random)))); - ThreadJob[] threads = new ThreadJob[TestUtil.NextInt32(Random, 2, 5)]; - CountdownEvent startingGun = new CountdownEvent(1); - long[] seqNos = new long[threads.Length]; - Term id = new Term("id", "id"); - // multiple threads update the same document - for (int i = 0; i < threads.Length; i++) - { - int threadID = i; - //threads[i] = new Thread() - //{ - *//*public void run() + IndexWriter w = new IndexWriter(dir, NewIndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer(Random))); + ThreadJob[] threads = new 
ThreadJob[TestUtil.NextInt32(Random, 2, 5)]; + CountdownEvent startingGun = new CountdownEvent(1); + long[] seqNos = new long[threads.Length]; + Term id = new Term("id", "id"); + // multiple threads update the same document + for (int i = 0; i < threads.Length; i++) + { + int threadID = i; + threads[i] = new ThreadJob(() => + { + try + { + Document doc = new Document(); + doc.Add(new StoredField("thread", threadID)); + doc.Add(new StringField("id", "id", Field.Store.NO)); + startingGun.Wait(); + for (int j = 0; j < 100; j++) { - try + if (Random.nextBoolean()) { - Document doc = new Document(); - doc.add(new StoredField("thread", threadID)); - doc.add(new StringField("id", "id", Field.Store.NO)); - startingGun.await(); - for (int j = 0; j < 100; j++) - { - seqNos[threadID] = w.updateDocument(id, doc); - } + seqNos[threadID] = w.UpdateDocument(id, doc); } - catch (Exception e) + else { - throw new RuntimeException(e); + List docs = new(); + docs.Add(doc); + seqNos[threadID] = w.UpdateDocuments(id, docs); } } - }; - threads[i].start();*//* - //}; - startingGun.CountDown(); + } + catch (Exception e) when (e.IsException()) + { + throw RuntimeException.Create(e); + } + }); + threads[i].Start(); + } + startingGun.Signal(); foreach (ThreadJob thread in threads) { thread.Join(); @@ -101,7 +111,7 @@ public void TestStressUpdateSameID() // now confirm that the reported sequence numbers agree with the index: int maxThread = 0; - var allSeqNos = new HashSet(); + HashSet allSeqNos = new HashSet(); for (int i = 0; i < threads.Length; i++) { allSeqNos.add(seqNos[i]); @@ -112,211 +122,625 @@ public void TestStressUpdateSameID() } // make sure all sequence numbers were different assertEquals(threads.Length, allSeqNos.size()); - DirectoryReader r = DirectoryReader.Open(w); + DirectoryReader r = w.GetReader(); IndexSearcher s = NewSearcher(r); TopDocs hits = s.Search(new TermQuery(id), 1); assertEquals(1, hits.TotalHits); Document doc = r.Document(hits.ScoreDocs[0].Doc); - assertEquals(maxThread, doc.GetField("thread").NumericValue.intValue()); + assertEquals(maxThread, doc.GetField("thread").GetInt32Value()); r.Dispose(); w.Dispose(); dir.Dispose(); } } - static class Operation - { - // 0 = update, 1 = delete, 2 = commit - static byte what; - static int id; - static int threadID; - static long seqNo; - } + private sealed class Operation + { + // 0 = update, 1 = delete, 2 = commit, 3 = add + internal byte what; + internal int id; + internal int threadID; + internal long seqNo; + } - public void testStressConcurrentCommit() - { - int opCount = AtLeast(10000); - int idCount = TestUtil.NextInt32(Random, 10, 1000); - - Directory dir = NewDirectory(); - // nocommit use RandomIndexWriter - IndexWriterConfig iwc = NewIndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer(Random)); - iwc.IndexDeletionPolicy = (NoDeletionPolicy.INSTANCE); - IndexWriter w = new IndexWriter(dir, iwc); - int numThreads = TestUtil.NextInt32(Random, 2, 5); - Thread[] threads = new Thread[numThreads]; - //System.out.println("TEST: iter=" + iter + " opCount=" + opCount + " idCount=" + idCount + " threadCount=" + threads.length); - CountDownLatch startingGun = new CountDownLatch(1); - List> threadOps = new ArrayList<>(); - - Object commitLock = new Object(); - List commits = new ArrayList<>(); - AtomicInteger opsSinceCommit = new AtomicInteger(); - - // multiple threads update the same set of documents, and we randomly commit - for (int i = 0; i < threads.length; i++) + [Test] + public void TestStressConcurrentCommit() { - List ops = new 
ArrayList<>(); - threadOps.add(ops); - int threadID = i; - threads[i] = new Thread() { - public void run() + int opCount = AtLeast(10000); + int idCount = TestUtil.NextInt32(Random, 10, 1000); + + Directory dir = NewDirectory(); + // nocommit use RandomIndexWriter + IndexWriterConfig iwc = NewIndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer(Random)); + iwc.IndexDeletionPolicy = NoDeletionPolicy.INSTANCE; + IndexWriter w = new IndexWriter(dir, iwc); + int numThreads = TestUtil.NextInt32(Random, 2, 5); + ThreadJob[] threads = new ThreadJob[numThreads]; + // Console.WriteLine("TEST: iter=" + iter + " opCount=" + opCount + " idCount=" + idCount + " threadCount=" + threads.length); + CountdownEvent startingGun = new CountdownEvent(1); + List> threadOps = new(); + + Object commitLock = new Object(); + List commits = new(); + AtomicInt32 opsSinceCommit = new(); + + // multiple threads update the same set of documents, and we randomly commit + for (int i = 0; i < threads.Length; i++) { - try + List ops = new(); + threadOps.Add(ops); + int threadID = i; + threads[i] = new ThreadJob(() => { - startingGun.await(); - for (int i = 0; i < opCount; i++) + try { - Operation op = new Operation(); - op.threadID = threadID; - if (random().nextInt(500) == 17) + startingGun.Wait(); + for (int j = 0; j < opCount; j++) { - op.what = 2; - synchronized(commitLock) { - // nocommit why does this sometimes fail :) - //if (w.hasUncommittedChanges()) { - if (opsSinceCommit.get() > numThreads) + Operation op = new(); + op.threadID = threadID; + if (new Random().Next(500) == 17) + { + op.what = 0; + UninterruptableMonitor.Enter(commitLock); + try { - op.seqNo = w.commit(); - commits.add(op); - opsSinceCommit.set(0); + op.seqNo = w.Commit(); + if (op.seqNo != -1) + { + commits.Add(op); + } + } + finally + { + UninterruptableMonitor.Exit(commitLock); + } + } + else + { + op.id = new Random().Next(idCount); + Term idTerm = new Term("id", op.id.ToString()); + if (new Random().Next(10) == 1) + { + op.what = 1; + op.seqNo = new Random().Next(2) == 0 ? 
w.DeleteDocuments(idTerm) : w.DeleteDocuments(new TermQuery(idTerm)); + } + else + { + Document doc = new Document(); + doc.Add(new StoredField("thread", threadID)); + doc.Add(new StringField("id", op.id.ToString(), Field.Store.NO)); + if (new Random().Next(2) == 0) + { + List docs = new List { doc }; + op.seqNo = w.UpdateDocuments(idTerm, docs); + } + else + { + op.seqNo = w.UpdateDocument(idTerm, doc); + op.what = 0; + } + ops.Add(op); } - //System.out.println("done commit seqNo=" + op.seqNo); } } - else + } + catch (Exception e) + { + throw RuntimeException.Create(e.Message, e); + } + }); + threads[i].Start(); + } + startingGun.Signal(); + foreach (ThreadJob thread in threads) + { + thread.Join(); + } + + Operation commitOp = new(); + UninterruptableMonitor.Enter(commitLock); + try + { + commitOp.seqNo = w.Commit(); + commits.Add(commitOp); + } + finally + { + UninterruptableMonitor.Exit(commitLock); + } + + IList indexCommits = DirectoryReader.ListCommits(dir); + assertEquals(commits.size(), indexCommits.size()); + + int[] expectedThreadIDs = new int[idCount]; + long[] seqNos = new long[idCount]; + + // Console.WriteLine("TEST: " + commits.size() + " commits"); + for (int i = 0; i < commits.size(); i++) + { + // this commit point should reflect all operations <= this seqNo + long commitSeqNo = commits[i].seqNo; + // Console.WriteLine(" commit " + i + ": seqNo=" + commitSeqNo + " segs=" + indexCommits[i)); + + Arrays.Fill(expectedThreadIDs, -1); + Arrays.Fill(seqNos, 0); + + for (int threadID = 0; threadID < threadOps.size(); threadID++) + { + long lastSeqNo = 0; + foreach (Operation op in threadOps[threadID]) + { + if (op.seqNo <= commitSeqNo && op.seqNo > seqNos[op.id]) { - op.id = random().nextInt(idCount); - Term idTerm = new Term("id", "" + op.id); - if (random().nextInt(10) == 1) + seqNos[op.id] = op.seqNo; + if (op.what == 0) { - op.what = 1; - op.seqNo = w.deleteDocuments(idTerm); + expectedThreadIDs[op.id] = threadID; } else { - Document doc = new Document(); - doc.add(new StoredField("thread", threadID)); - doc.add(new StringField("id", "" + op.id, Field.Store.NO)); - op.seqNo = w.UpdateDocument(idTerm, doc); - op.what = 2; + expectedThreadIDs[op.id] = -1; } - ops.Add(op); - opsSinceCommit.getAndIncrement(); } + assertTrue(op.seqNo > lastSeqNo); + lastSeqNo = op.seqNo; } } - catch (Exception e) + + DirectoryReader r = DirectoryReader.Open(indexCommits[i]); + IndexSearcher s = new IndexSearcher(r); + + for (int id = 0; id < idCount; id++) { - throw new RuntimeException(e); + // Console.WriteLine("TEST: check id=" + id + " expectedThreadID=" + expectedThreadIDs[id]); + TopDocs hits = s.Search(new TermQuery(new Term("id", "" + id)), 1); + + if (expectedThreadIDs[id] != -1) + { + assertEquals(1, hits.TotalHits); + Document doc = r.Document(hits.ScoreDocs[0].Doc); + int? actualThreadID = doc.GetField("thread").GetInt32Value(); + if (expectedThreadIDs[id] != actualThreadID) + { + Console.WriteLine("FAIL: id=" + id + " expectedThreadID=" + expectedThreadIDs[id] + " vs actualThreadID=" + actualThreadID); + for (int threadID = 0; threadID < threadOps.size(); threadID++) + { + foreach (Operation op in threadOps[threadID]) + { + if (id == op.id) + { + Console.WriteLine(" threadID=" + threadID + " seqNo=" + op.seqNo + " " + (op.what == 2 ? 
"updated" : "deleted")); + } + } + } + assertEquals("id=" + id, expectedThreadIDs[id], actualThreadID); + } + } + else if (hits.TotalHits != 0) + { + Console.WriteLine("FAIL: id=" + id + " expectedThreadID=" + expectedThreadIDs[id] + " vs totalHits=" + hits.TotalHits); + for (int threadID = 0; threadID < threadOps.size(); threadID++) + { + foreach (Operation op in threadOps[threadID]) + { + if (id == op.id) + { + Console.WriteLine(" threadID=" + threadID + " seqNo=" + op.seqNo + " " + (op.what == 2 ? "updated" : "del")); + } + } + } + assertEquals(0, hits.TotalHits); + } } + w.Dispose(); + r.Dispose(); } - }; - threads[i].start(); - } - startingGun.countDown(); -for (Thread thread : threads) -{ - thread.join(); -} + dir.Dispose(); + } -Operation commitOp = new Operation(); -synchronized(commitLock) { - commitOp.seqNo = w.commit(); - commits.add(commitOp); -} + [Test] + public void TestStressConcurrentDocValuesUpdatesCommit() + { + int opCount = AtLeast(10000); + int idCount = TestUtil.NextInt32(Random, 10, 1000); -List indexCommits = DirectoryReader.listCommits(dir); -assertEquals(commits.size(), indexCommits.size()); + Directory dir = NewDirectory(); + IndexWriterConfig iwc = NewIndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer(Random)); + iwc.IndexDeletionPolicy = NoDeletionPolicy.INSTANCE; -int[] expectedThreadIDs = new int[idCount]; -long[] seqNos = new long[idCount]; + // Cannot use RIW since it randomly commits: + IndexWriter w = new IndexWriter(dir, iwc); -//System.out.println("TEST: " + commits.size() + " commits"); -for (int i = 0; i < commits.size(); i++) -{ - // this commit point should reflect all operations <= this seqNo - long commitSeqNo = commits.get(i).seqNo; - //System.out.println(" commit " + i + ": seqNo=" + commitSeqNo + " segs=" + indexCommits.get(i)); + int numThreads = TestUtil.NextInt32(Random, 2, 10); + if (Verbose) + { + Console.WriteLine("TEST: numThreads=" + numThreads); + } + ThreadJob[] threads = new ThreadJob[numThreads]; + //Console.WriteLine("TEST: iter=" + iter + " opCount=" + opCount + " idCount=" + idCount + " threadCount=" + threads.length); + CountdownEvent startingGun = new CountdownEvent(1); + List> threadOps = new(); - Arrays.fill(expectedThreadIDs, -1); - Arrays.fill(seqNos, 0); + Object commitLock = new Object(); + List commits = new(); - for (int threadID = 0; threadID < threadOps.size(); threadID++) - { - long lastSeqNo = 0; - for (Operation op : threadOps.get(threadID)) - { - if (op.seqNo <= commitSeqNo && op.seqNo > seqNos[op.id]) + List ops1 = new(); + threadOps.Add(ops1); + + // pre-index every ID so none are missing: + for (int id = 0; id < idCount; id++) + { + int threadID = 0; + Operation op = new Operation(); + op.threadID = threadID; + op.id = id; + + Document doc = new Document(); + doc.Add(new StoredField("thread", threadID)); + doc.Add(new NumericDocValuesField("thread", threadID)); + doc.Add(new StringField("id", "" + id, Field.Store.NO)); + op.seqNo = w.AddDocument(doc); + ops1.Add(op); + } + + // multiple threads update the same set of documents, and we randomly commit, recording the commit seqNo and then opening each commit in + // the end to verify it reflects the correct updates + for (int i = 0; i < threads.Length; i++) { - seqNos[op.id] = op.seqNo; - if (op.what == 2) + List ops; + if (i == 0) { - expectedThreadIDs[op.id] = threadID; + ops = threadOps[0]; } else { - expectedThreadIDs[op.id] = -1; + ops = new(); + threadOps.Add(ops); } + + int threadID = i; + threads[i] = new ThreadJob(() => + { + try + { + 
startingGun.Wait(); + for (int i = 0; i < opCount; i++) + { + Operation op = new Operation(); + op.threadID = threadID; + if (new Random().Next(500) == 17) + { + op.what = 2; + UninterruptableMonitor.Enter(commitLock); + try + { + op.seqNo = w.Commit(); + if (op.seqNo != -1) + { + commits.Add(op); + } + } + finally + { + UninterruptableMonitor.Exit(commitLock); + } + } + else + { + op.id = new Random().Next(idCount); + Term idTerm = new Term("id", "" + op.id); + op.seqNo = w.UpdateNumericDocValue(idTerm, "thread", threadID); + op.what = 0; + ops.Add(op); + } + } + } + catch (Exception e) + { + throw RuntimeException.Create(e); + } + }); + + threads[i].Name = ("thread" + i); + threads[i].Start(); + } + startingGun.Signal(); + foreach (ThreadJob thread in threads) + { + thread.Join(); } - assertTrue(op.seqNo >= lastSeqNo); - lastSeqNo = op.seqNo; - } - } + Operation commitOp = new Operation(); + commitOp.seqNo = w.Commit(); + if (commitOp.seqNo != -1) + { + commits.Add(commitOp); + } - DirectoryReader r = DirectoryReader.open(indexCommits.get(i)); - IndexSearcher s = new IndexSearcher(r); + IList indexCommits = DirectoryReader.ListCommits(dir); + assertEquals(commits.size(), indexCommits.size()); - for (int id = 0; id < idCount; id++) - { - //System.out.println("TEST: check id=" + id + " expectedThreadID=" + expectedThreadIDs[id]); - TopDocs hits = s.search(new TermQuery(new Term("id", "" + id)), 1); + int[] expectedThreadIDs = new int[idCount]; + long[] seqNos = new long[idCount]; - if (expectedThreadIDs[id] != -1) - { - assertEquals(1, hits.totalHits); - Document doc = r.document(hits.scoreDocs[0].doc); - int actualThreadID = doc.getField("thread").numericValue().intValue(); - if (expectedThreadIDs[id] != actualThreadID) + //Console.WriteLine("TEST: " + commits.size() + " commits"); + for (int i = 0; i < commits.size(); i++) { - System.out.println("FAIL: id=" + id + " expectedThreadID=" + expectedThreadIDs[id] + " vs actualThreadID=" + actualThreadID); + // this commit point should reflect all operations <= this seqNo + long commitSeqNo = commits[i].seqNo; + //Console.WriteLine(" commit " + i + ": seqNo=" + commitSeqNo + " segs=" + indexCommits[i)); + + Arrays.Fill(expectedThreadIDs, -1); + Arrays.Fill(seqNos, 0); + for (int threadID = 0; threadID < threadOps.size(); threadID++) { - for (Operation op : threadOps.get(threadID)) + long lastSeqNo = 0; + foreach (Operation op in threadOps[threadID]) { - if (id == op.id) + if (op.seqNo <= commitSeqNo && op.seqNo > seqNos[op.id]) { - System.out.println(" threadID=" + threadID + " seqNo=" + op.seqNo + " " + (op.what == 2 ? 
"updated" : "deleted")); + seqNos[op.id] = op.seqNo; + Debugging.Assert(op.what == 0); + expectedThreadIDs[op.id] = threadID; } + + assertTrue(op.seqNo > lastSeqNo); + lastSeqNo = op.seqNo; } } - assertEquals("id=" + id, expectedThreadIDs[id], actualThreadID); + + DirectoryReader r = DirectoryReader.Open(indexCommits[i]); + IndexSearcher s = new IndexSearcher(r); + NumericDocValues docValues = MultiDocValues.GetNumericValues(r, "thread"); + + for (int id = 0; id < idCount; id++) + { + //Console.WriteLine("TEST: check id=" + id + " expectedThreadID=" + expectedThreadIDs[id]); + TopDocs hits = s.Search(new TermQuery(new Term("id", "" + id)), 1); + + // We pre-Add all ids up front: + Debugging.Assert(expectedThreadIDs[id] != -1); + assertEquals(1, hits.TotalHits); + int actualThreadID = (int)docValues.Get(hits.ScoreDocs[0].Doc); + if (expectedThreadIDs[id] != actualThreadID) + { + Console.WriteLine("FAIL: commit=" + i + " (of " + commits.size() + ") id=" + id + " expectedThreadID=" + expectedThreadIDs[id] + " vs actualThreadID=" + actualThreadID + " commitSeqNo=" + commitSeqNo + " numThreads=" + numThreads + " reader=" + r + " commit=" + indexCommits[i]); + for (int threadID = 0; threadID < threadOps.size(); threadID++) + { + foreach (Operation op in threadOps[threadID]) + { + if (id == op.id) + { + Console.WriteLine(" threadID=" + threadID + " seqNo=" + op.seqNo); + } + } + } + assertEquals("id=" + id + " docID=" + hits.ScoreDocs[0].Doc, expectedThreadIDs[id], actualThreadID); + } + } + w.Dispose(); + r.Dispose(); } + + dir.Dispose(); } - else if (hits.totalHits != 0) + + [Test] + public void TestStressConcurrentAddAndDeleteAndCommit() { - System.out.println("FAIL: id=" + id + " expectedThreadID=" + expectedThreadIDs[id] + " vs totalHits=" + hits.totalHits); - for (int threadID = 0; threadID < threadOps.size(); threadID++) + int opCount = AtLeast(10000); + int idCount = TestUtil.NextInt32(Random, 10, 1000); + + Directory dir = NewDirectory(); + IndexWriterConfig iwc = NewIndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer(Random)); + iwc.IndexDeletionPolicy = (NoDeletionPolicy.INSTANCE); + + // Cannot use RIW since it randomly commits: + IndexWriter w = new IndexWriter(dir, iwc); + + int numThreads = TestUtil.NextInt32(Random, 2, 5); + ThreadJob[] threads = new ThreadJob[numThreads]; + //Console.WriteLine("TEST: iter=" + iter + " opCount=" + opCount + " idCount=" + idCount + " threadCount=" + threads.Length); + CountdownEvent startingGun = new CountdownEvent(1); + List> threadOps = new(); + + Object commitLock = new Object(); + List commits = new(); + + // multiple threads update the same set of documents, and we randomly commit + for (int i = 0; i < threads.Length; i++) { - for (Operation op : threadOps.get(threadID)) + List ops = new(); + threadOps.Add(ops); + int threadID = i; + threads[i] = new ThreadJob(() => { - if (id == op.id) + try { - System.out.println(" threadID=" + threadID + " seqNo=" + op.seqNo + " " + (op.what == 2 ? 
"updated" : "del")); + startingGun.Wait(); + for (int i = 0; i < opCount; i++) + { + Operation op = new Operation(); + op.threadID = threadID; + if (new Random().Next(500) == 17) + { + op.what = 2; + lock (commitLock) + { + op.seqNo = w.Commit(); + if (op.seqNo != -1) + { + commits.Add(op); + } + } + } + else + { + op.id = new Random().Next(idCount); + Term idTerm = new Term("id", "" + op.id); + if (new Random().Next(10) == 1) + { + op.what = 1; + if (new Random().nextBoolean()) + { + op.seqNo = w.DeleteDocuments(idTerm); + } + else + { + op.seqNo = w.DeleteDocuments(new TermQuery(idTerm)); + } + } + else + { + Document doc = new Document(); + doc.Add(new StoredField("threadop", threadID + "-" + ops.size())); + doc.Add(new StringField("id", "" + op.id, Field.Store.NO)); + if (new Random().nextBoolean()) + { + List docs = new(); + docs.Add(doc); + op.seqNo = w.AddDocuments(docs); + } + else + { + op.seqNo = w.AddDocument(doc); + } + op.what = 3; + } + ops.Add(op); + } + } + } + catch (Exception e) + { + throw RuntimeException.Create(e); + } + }); + threads[i].Name = ("thread" + threadID); + threads[i].Start(); + } + startingGun.Signal(); + foreach (ThreadJob thread in threads) + { + thread.Join(); + } + + Operation commitOp = new Operation(); + commitOp.seqNo = w.Commit(); + if (commitOp.seqNo != -1) + { + commits.Add(commitOp); + } + + IList indexCommits = DirectoryReader.ListCommits(dir); + assertEquals(commits.size(), indexCommits.size()); + + // how many docs with this id are expected: + int[] expectedCounts = new int[idCount]; + long[] lastDelSeqNos = new long[idCount]; + + //Console.WriteLine("TEST: " + commits.size() + " commits"); + for (int i = 0; i < commits.size(); i++) + { + // this commit point should reflect all operations <= this seqNo + long commitSeqNo = commits[i].seqNo; + //Console.WriteLine(" commit " + i + ": seqNo=" + commitSeqNo + " segs=" + indexCommits[i)); + + // first find the highest seqNo of the last delete op, for each id, prior to this commit: + Arrays.Fill(lastDelSeqNos, -1); + for (int threadID = 0; threadID < threadOps.size(); threadID++) + { + long lastSeqNo = 0; + foreach (Operation op in threadOps[threadID]) + { + if (op.what == 1 && op.seqNo <= commitSeqNo && op.seqNo > lastDelSeqNos[op.id]) + { + lastDelSeqNos[op.id] = op.seqNo; + } + + // within one thread the seqNos must only increase: + assertTrue(op.seqNo > lastSeqNo); + lastSeqNo = op.seqNo; + } + } + + // then count how many adds happened since the last delete and before this commit: + Arrays.Fill(expectedCounts, 0); + for (int threadID = 0; threadID < threadOps.size(); threadID++) + { + foreach (Operation op in threadOps[threadID]) + { + if (op.what == 3 && op.seqNo <= commitSeqNo && op.seqNo > lastDelSeqNos[op.id]) + { + expectedCounts[op.id]++; + } } } + + DirectoryReader r = DirectoryReader.Open(indexCommits[i]); + IndexSearcher s = new IndexSearcher(r); + + for (int id = 0; id < idCount; id++) + { + //we don't have count menthod in IndexSearcher which counts and returns value for the given query + int actualCount = s.Search(new TermQuery(new Term("id", "" + id)),idCount).TotalHits; + if (expectedCounts[id] != actualCount) + { + Console.WriteLine("TEST: FAIL r=" + r + " id=" + id + " commitSeqNo=" + commitSeqNo); + for (int threadID = 0; threadID < threadOps.size(); threadID++) + { + int opCount2 = 0; + foreach (Operation op in threadOps[threadID]) + { + if (op.id == id) + { + bool shouldCount = op.seqNo <= commitSeqNo && op.seqNo > lastDelSeqNos[op.id]; + Console.WriteLine(" id=" + id + " 
what=" + op.what + " threadop=" + threadID + "-" + opCount2 + " seqNo=" + op.seqNo + " vs lastDelSeqNo=" + lastDelSeqNos[op.id] + " shouldCount=" + shouldCount); + } + opCount2++; + } + } + TopDocs hits = s.Search(new TermQuery(new Term("id", "" + id)), 1 + actualCount); + foreach (ScoreDoc hit in hits.ScoreDocs) + { + Console.WriteLine(" hit: " + s.Doc(hit.Doc).Get("threadop")); + } + + //why this is com-out bc Bits liveDocs is not supported for now + /* foreach (AtomicReaderContext ctx in r.Leaves) + { + Console.WriteLine(" sub=" + ctx.Reader); + Bits liveDocs = ctx.Reader; + for (int docID = 0; docID < ctx.Reader.MaxDoc; docID++) + { + Console.WriteLine(" docID=" + docID + " threadop=" + ctx.Reader.Document(docID).Get("threadop") + (liveDocs != null && liveDocs[docID] == false ? " (deleted)" : "")); + } + }*/ + + assertEquals("commit " + i + " of " + commits.size() + " id=" + id + " reader=" + r, expectedCounts[id], actualCount); + } + } + w.Dispose(); + r.Dispose(); } - assertEquals(0, hits.totalHits); - } - } - w.close(); - r.close(); -} -dir.close(); - }*/ + dir.Dispose(); + } - // nocommit test that does n ops across threads, then does it again with a single index / single thread, and assert indices are the same + [Test] + public void TestDeleteAll() + { + Directory dir = NewDirectory(); + IndexWriter w = new IndexWriter(dir, NewIndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer(Random))); + long a = w.AddDocument(new Document()); + long b = w.DeleteAll(); + assertTrue(a < b); + long c = w.Commit(); + assertTrue(b < c); + w.Dispose(); + dir.Dispose(); + } } } diff --git a/src/Lucene.Net.Tests/Index/TestDocumentsWriterDeleteQueue.cs b/src/Lucene.Net.Tests/Index/TestDocumentsWriterDeleteQueue.cs index 1655afec7b..5ad12ca954 100644 --- a/src/Lucene.Net.Tests/Index/TestDocumentsWriterDeleteQueue.cs +++ b/src/Lucene.Net.Tests/Index/TestDocumentsWriterDeleteQueue.cs @@ -52,8 +52,8 @@ public virtual void TestUpdateDelteSlices() } DeleteSlice slice1 = queue.NewSlice(); DeleteSlice slice2 = queue.NewSlice(); - BufferedUpdates bd1 = new BufferedUpdates(); - BufferedUpdates bd2 = new BufferedUpdates(); + BufferedUpdates bd1 = new BufferedUpdates("bd1"); + BufferedUpdates bd2 = new BufferedUpdates("bd2"); int last1 = 0; int last2 = 0; ISet uniqueValues = new JCG.HashSet(); @@ -312,7 +312,7 @@ protected internal UpdateThread(DocumentsWriterDeleteQueue queue, AtomicInt32 in this.index = index; this.ids = ids; this.slice = queue.NewSlice(); - deletes = new BufferedUpdates(); + deletes = new BufferedUpdates("deletes"); this.latch = latch; } diff --git a/src/Lucene.Net.Tests/Index/TestIndexWriterConfig.cs b/src/Lucene.Net.Tests/Index/TestIndexWriterConfig.cs index 97fb748ee2..3e6477b4cc 100644 --- a/src/Lucene.Net.Tests/Index/TestIndexWriterConfig.cs +++ b/src/Lucene.Net.Tests/Index/TestIndexWriterConfig.cs @@ -101,7 +101,7 @@ public virtual void TestDefaults() getters.Add("getIndexingChain"); getters.Add("getMergedSegmentWarmer"); getters.Add("getMergePolicy"); - getters.Add("getMaxThreadStates"); + getters.Add("getMaxThreadStates");//why should removed? getters.Add("getReaderPooling"); getters.Add("getIndexerThreadPool"); getters.Add("getReaderTermsIndexDivisor"); diff --git a/src/Lucene.Net/Index/BufferedUpdates.cs b/src/Lucene.Net/Index/BufferedUpdates.cs index 321accf3b8..7e6d970e8f 100644 --- a/src/Lucene.Net/Index/BufferedUpdates.cs +++ b/src/Lucene.Net/Index/BufferedUpdates.cs @@ -154,9 +154,12 @@ load factor (say 2 * POINTER). 
Entry is object w/ internal long gen; - internal BufferedUpdates() // LUCENENET specific - Made internal rather than public, since this class is intended to be internal but couldn't be because it is exposed through a public API + private readonly string segmentName; + + internal BufferedUpdates(string segmentName) // LUCENENET specific - Made internal rather than public, since this class is intended to be internal but couldn't be because it is exposed through a public API { this.bytesUsed = new AtomicInt64(); + this.segmentName = segmentName; } public override string ToString() diff --git a/src/Lucene.Net/Index/DocumentsWriter.cs b/src/Lucene.Net/Index/DocumentsWriter.cs index 0807e26f60..f852cd5c40 100644 --- a/src/Lucene.Net/Index/DocumentsWriter.cs +++ b/src/Lucene.Net/Index/DocumentsWriter.cs @@ -349,7 +349,7 @@ internal void LockAndAbortAll(IndexWriter indexWriter) } deleteQueue.Clear(); // jump over any possible in flight ops: - deleteQueue.seqNo.AddAndGet(perThreadPool.NumThreadStatesActive + 1); + deleteQueue.SkipSequenceNumbers(perThreadPool.NumThreadStatesActive + 1); flushControl.AbortPendingFlushes(newFilesSet); PutEvent(new DeleteNewFilesEvent(newFilesSet)); flushControl.WaitForFlush(); diff --git a/src/Lucene.Net/Index/DocumentsWriterDeleteQueue.cs b/src/Lucene.Net/Index/DocumentsWriterDeleteQueue.cs index 780eefe796..cdcfe9ca9d 100644 --- a/src/Lucene.Net/Index/DocumentsWriterDeleteQueue.cs +++ b/src/Lucene.Net/Index/DocumentsWriterDeleteQueue.cs @@ -67,10 +67,12 @@ namespace Lucene.Net.Index /// internal sealed class DocumentsWriterDeleteQueue { + // the current end (latest delete operation) in the delete queue: private Node tail; // LUCENENET NOTE: can't use type without specifying type parameter, also not volatile due to Interlocked - // LUCENENET NOTE: no need for AtomicReferenceFieldUpdater, we can use Interlocked instead - private readonly DeleteSlice globalSlice; + /* Used to record deletes against all prior (already written to disk) segments. Whenever any segment flushes, we bundle up this set of + * deletes and insert into the buffered updates stream before the newly flushed segment(s). */ + private readonly DeleteSlice globalSlice; // LUCENENET NOTE: no need for AtomicReferenceFieldUpdater, we can use Interlocked instead private readonly BufferedUpdates globalBufferedUpdates; @@ -79,16 +81,20 @@ internal sealed class DocumentsWriterDeleteQueue internal readonly long generation; - internal readonly AtomicInt64 seqNo; + /* Generates the sequence number that IW returns to callers changing the index, showing the effective serialization of all operations. */ + private readonly AtomicInt64 nextSeqNo; + + // for asserts + internal long maxSeqNo = long.MaxValue; // seqNo must start at 1 because some APIs negate this to encode a boolean internal DocumentsWriterDeleteQueue() - :this(0, 1) + : this(0, 1) { } internal DocumentsWriterDeleteQueue(long generation, long startSeqNo) - : this(new BufferedUpdates(), generation, startSeqNo) + : this(new BufferedUpdates("global"), generation, startSeqNo) { } @@ -96,12 +102,12 @@ internal DocumentsWriterDeleteQueue(BufferedUpdates globalBufferedUpdates, long { this.globalBufferedUpdates = globalBufferedUpdates; this.generation = generation; - this.seqNo = new AtomicInt64(startSeqNo); - /* - * we use a sentinel instance as our initial tail. No slice will ever try to - * apply this tail since the head is always omitted. 
- */ - tail = new Node(null); // sentinel + this.nextSeqNo = new AtomicInt64(startSeqNo); + /* + * we use a sentinel instance as our initial tail. No slice will ever try to + * apply this tail since the head is always omitted. + */ + tail = new Node(null); // sentinel globalSlice = new DeleteSlice(tail); } @@ -161,7 +167,7 @@ internal long Add(Term term, DeleteSlice slice) internal long Add(Node newNode) { UninterruptableMonitor.Enter(this); - /* + /* Earlier/old implementation * this non-blocking / 'wait-free' linked list add was inspired by Apache * Harmony's ConcurrentLinkedQueue Implementation. */ @@ -169,7 +175,7 @@ internal long Add(Node newNode) { tail.next = newNode; tail = newNode; - return seqNo.GetAndIncrement(); + return NextSequenceNumber; } finally { @@ -207,9 +213,8 @@ internal void TryApplyGlobalSlice() */ try { - if (UpdateSlice(globalSlice)) + if (UpdateSliceNoSeqNo(globalSlice)) { - // System.out.println(Thread.currentThread() + ": apply globalSlice"); globalSlice.Apply(globalBufferedUpdates, BufferedUpdates.MAX_INT32); } } @@ -259,10 +264,33 @@ internal DeleteSlice NewSlice() return new DeleteSlice(tail); } - internal bool UpdateSlice(DeleteSlice slice) + /* Negative result means there were new deletes since we last applied*/ + internal long UpdateSlice(DeleteSlice slice) { - if (slice.sliceTail != tail) // If we are the same just + UninterruptableMonitor.Enter(this); + long seqNo = NextSequenceNumber; + try { + if (slice.sliceTail != tail) + { + // new deletes arrived since we last checked + slice.sliceTail = tail; + seqNo = -seqNo; + } + } + finally + { + UninterruptableMonitor.Exit(this); + } + return seqNo; + } + + /** Just like updateSlice, but does not assign a sequence number */ + internal bool UpdateSliceNoSeqNo(DeleteSlice slice) + { + if (slice.sliceTail != tail) + { + // new deletes arrived since we last checked slice.sliceTail = tail; return true; } @@ -505,5 +533,25 @@ public override string ToString() { return "DWDQ: [ generation: " + generation + " ]"; } + + public long NextSequenceNumber + { + get + { + long seqNo =nextSeqNo.GetAndIncrement(); + Debugging.Assert( seqNo <= maxSeqNo , "seqNo=" + seqNo + " vs maxSeqNo=" + maxSeqNo); + return seqNo; + } + } + + public long LastSequenceNumber => nextSeqNo - 1; + + /* Inserts a gap in the sequence numbers. This is used by IW during flush or commit to ensure any in-flight threads get sequence numbers + * inside the gap */ + public void SkipSequenceNumbers(long jump) + { + nextSeqNo.AddAndGet(jump); + } + } } \ No newline at end of file diff --git a/src/Lucene.Net/Index/DocumentsWriterFlushControl.cs b/src/Lucene.Net/Index/DocumentsWriterFlushControl.cs index ba0d8de426..ff70ca0977 100644 --- a/src/Lucene.Net/Index/DocumentsWriterFlushControl.cs +++ b/src/Lucene.Net/Index/DocumentsWriterFlushControl.cs @@ -697,11 +697,14 @@ internal long MarkForFullFlush() flushingQueue = documentsWriter.deleteQueue; // Set a new delete queue - all subsequent DWPT will use this queue until // we do another full flush - seqNo = documentsWriter.deleteQueue.seqNo.Value + perThreadPool.NumThreadStatesActive; - // nocommit is this (active thread state count) always enough of a gap? what if new indexing thread sneaks in just now? it would - // have to get this next delete queue? + // Insert a gap in seqNo of current active thread count, in the worst case each of those threads now have one operation in flight. 
diff --git a/src/Lucene.Net/Index/DocumentsWriterFlushControl.cs b/src/Lucene.Net/Index/DocumentsWriterFlushControl.cs
index ba0d8de426..ff70ca0977 100644
--- a/src/Lucene.Net/Index/DocumentsWriterFlushControl.cs
+++ b/src/Lucene.Net/Index/DocumentsWriterFlushControl.cs
@@ -697,11 +697,14 @@ internal long MarkForFullFlush()
                 flushingQueue = documentsWriter.deleteQueue;
                 // Set a new delete queue - all subsequent DWPT will use this queue until
                 // we do another full flush
-                seqNo = documentsWriter.deleteQueue.seqNo.Value + perThreadPool.NumThreadStatesActive;
-                // nocommit is this (active thread state count) always enough of a gap? what if new indexing thread sneaks in just now? it would
-                // have to get this next delete queue?
+                // Insert a gap in seqNo of current active thread count; in the worst case each of those threads now has one operation in flight.
+                // It's fine if we have some sequence numbers that were never assigned:
+                seqNo = documentsWriter.deleteQueue.LastSequenceNumber + perThreadPool.NumThreadStatesActive + 2;
+                flushingQueue.maxSeqNo = seqNo;
+                DocumentsWriterDeleteQueue newQueue = new DocumentsWriterDeleteQueue(flushingQueue.generation + 1, seqNo + 1);
+                documentsWriter.deleteQueue = newQueue;
             }
             finally
diff --git a/src/Lucene.Net/Index/DocumentsWriterPerThread.cs b/src/Lucene.Net/Index/DocumentsWriterPerThread.cs
index 1682150889..fd95f98d31 100644
--- a/src/Lucene.Net/Index/DocumentsWriterPerThread.cs
+++ b/src/Lucene.Net/Index/DocumentsWriterPerThread.cs
@@ -232,11 +232,10 @@ public DocumentsWriterPerThread(string segmentName, Directory directory, LiveInd
             this.docState.similarity = indexWriterConfig.Similarity;
             bytesUsed = Counter.NewCounter();
             byteBlockAllocator = new DirectTrackingAllocator(bytesUsed);
-            pendingUpdates = new BufferedUpdates();
+            pendingUpdates = new BufferedUpdates(segmentName);
             intBlockAllocator = new Int32BlockAllocator(bytesUsed);
             this.deleteQueue = deleteQueue;
             if (Debugging.AssertsEnabled) Debugging.Assert(numDocsInRAM == 0, "num docs {0}", numDocsInRAM);
-            pendingUpdates.Clear();
             deleteSlice = deleteQueue.NewSlice();

             segmentInfo = new SegmentInfo(directoryOrig, Constants.LUCENE_MAIN_VERSION, segmentName, -1, false, codec, null);
@@ -388,8 +387,7 @@ public virtual long UpdateDocuments(IEnumerable<IEnumerable<IIndexableField>> do
                             Abort(filesToDelete);
                         }
                     }
-
-                    FinishDocument(null);
+                    numDocsInRAM++;
                 }

                 allDocsIndexed = true;
@@ -406,7 +404,16 @@ public virtual long UpdateDocuments(IEnumerable<IEnumerable<IIndexableField>> do
             }
             else
             {
-                seqNo = deleteQueue.seqNo;
+                seqNo = deleteQueue.UpdateSlice(deleteSlice);
+                if (seqNo < 0)
+                {
+                    seqNo = -seqNo;
+                    deleteSlice.Apply(pendingUpdates, numDocsInRAM - docCount);
+                }
+                else
+                {
+                    deleteSlice.Reset();
+                }
             }
             return seqNo;
         }
@@ -448,9 +455,16 @@ private long FinishDocument(Term delTerm)
             }
             else
             {
-                applySlice &= deleteQueue.UpdateSlice(deleteSlice);
-                // nocommit we don't need to increment here?
-                seqNo = deleteQueue.seqNo;
+                seqNo = deleteQueue.UpdateSlice(deleteSlice);
+
+                if (seqNo < 0)
+                {
+                    seqNo = -seqNo;
+                }
+                else
+                {
+                    applySlice = false;
+                }
             }

             if (applySlice)
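To make the `+ 2` in MarkForFullFlush concrete, a worked example with invented numbers (100 and 3 are arbitrary):

    long last = 100;             // deleteQueue.LastSequenceNumber at flush time
    int activeThreadStates = 3;  // perThreadPool.NumThreadStatesActive

    long seqNo = last + activeThreadStates + 2;  // 105
    // Old queue: maxSeqNo = 105, so in-flight threads may still draw 101..105.
    // New queue: starts at seqNo + 1 = 106, so it can never collide with the
    // old one, even if some numbers inside the gap are never handed out.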
diff --git a/src/Lucene.Net/Index/IndexWriter.cs b/src/Lucene.Net/Index/IndexWriter.cs
index d6ddbebeab..b185790a1d 100644
--- a/src/Lucene.Net/Index/IndexWriter.cs
+++ b/src/Lucene.Net/Index/IndexWriter.cs
@@ -1530,13 +1530,17 @@ public virtual int MaxDoc
             }
         }

-        // nocommit javadocs
+        /// <summary>
+        /// Returns the last sequence number,
+        /// or 0 if no index-changing operations have completed yet.
+        /// <para/>
+        /// @lucene.experimental
+        /// </summary>
        public virtual long LastSequenceNumber
        {
            get
            {
                EnsureOpen();
-                return docWriter.deleteQueue.seqNo;
+                return docWriter.deleteQueue.LastSequenceNumber;
            }
        }

@@ -1879,7 +1883,7 @@ public virtual long TryDeleteDocument(IndexReader readerIn, int docID)
                         Changed();
                     }
                     //System.out.println("  yes " + info.info.name + " " + docID);
-                    return docWriter.deleteQueue.seqNo.GetAndIncrement();
+                    return docWriter.deleteQueue.NextSequenceNumber;
                 }
                 finally
                 {
@@ -3023,7 +3027,7 @@ public virtual long DeleteAll()
                     segmentInfos.Changed();
                     globalFieldNumberMap.Clear();
                     success = true;
-                    return docWriter.deleteQueue.seqNo;
+                    return docWriter.deleteQueue.NextSequenceNumber;
                 }
                 catch (Exception oom) when (oom.IsOutOfMemoryError())
                 {
@@ -3419,6 +3423,8 @@ public virtual long AddIndexes(params Directory[] dirs)

             bool successTop = false;

+            long seqNo;
+
             try
             {
                 if (infoStream.IsEnabled("IW"))
@@ -3463,6 +3469,7 @@ public virtual long AddIndexes(params Directory[] dirs)
                         infos.Add(CopySegmentAsIs(info, newSegName, dsNames, dsFilesCopied, context, copiedFiles));
                     }
                 }
+                seqNo = docWriter.deleteQueue.NextSequenceNumber;
                 success = true;
             }
             finally
@@ -3526,6 +3533,8 @@ public virtual long AddIndexes(params Directory[] dirs)
             catch (Exception oom) when (oom.IsOutOfMemoryError())
             {
                 HandleOOM(oom, "AddIndexes(Directory...)");
+                // dead code but the compiler disagrees:
+                seqNo = -1;
             }
             finally
             {
@@ -3539,7 +3548,7 @@ public virtual long AddIndexes(params Directory[] dirs)
                 }
             }
             MaybeMerge();
-            return docWriter.deleteQueue.seqNo;
+            return seqNo;
         }

         /// <summary>
@@ -3584,7 +3593,7 @@ public virtual long AddIndexes(params IndexReader[] readers)
         {
             EnsureOpen();
             int numDocs = 0;
-
+            long seqNo;
             try
             {
                 if (infoStream.IsEnabled("IW"))
@@ -3616,8 +3625,7 @@ public virtual long AddIndexes(params IndexReader[] readers)

                 if (!merger.ShouldMerge)
                 {
-                    // no need to increment:
-                    return docWriter.deleteQueue.seqNo;
+                    return docWriter.deleteQueue.NextSequenceNumber;
                 }

                 MergeState mergeState;
@@ -3658,7 +3666,7 @@ public virtual long AddIndexes(params IndexReader[] readers)
                     {
                         deleter.DeleteNewFiles(infoPerCommit.GetFiles());
-                        // no need to increment:
-                        return docWriter.deleteQueue.seqNo;
+                        return docWriter.deleteQueue.NextSequenceNumber;
                     }
                     EnsureOpen();
                     useCompoundFile = mergePolicy.UseCompoundFile(segmentInfos, infoPerCommit);
@@ -3728,11 +3736,11 @@ public virtual long AddIndexes(params IndexReader[] readers)
                     if (stopMerges)
                     {
                         deleter.DeleteNewFiles(info.GetFiles());
-                        // no need to increment:
-                        return docWriter.deleteQueue.seqNo;
+                        return docWriter.deleteQueue.NextSequenceNumber;
                     }
                     EnsureOpen();
                     segmentInfos.Add(infoPerCommit);
+                    seqNo = docWriter.deleteQueue.NextSequenceNumber;
                     Checkpoint();
                 }
                 finally
@@ -3743,10 +3751,11 @@ public virtual long AddIndexes(params IndexReader[] readers)
             catch (Exception oom) when (oom.IsOutOfMemoryError())
             {
                 HandleOOM(oom, "AddIndexes(IndexReader...)");
+                // dead code but the compiler disagrees:
+                seqNo = -1;
             }
             MaybeMerge();
-            // no need to increment:
-            return docWriter.deleteQueue.seqNo;
+            return seqNo;
         }
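For orientation before the follow-up patches: a sketch of what these long return values buy a caller, assuming the patched API in which UpdateDocument returns a sequence number (the two-update race is a contrived example, not code from this change):

    using Lucene.Net.Documents;
    using Lucene.Net.Index;

    // The relative order of two racing updates to the same id can be decided
    // from the returned sequence numbers alone, with no extra synchronization:
    static string Winner(IndexWriter writer, Document docA, Document docB)
    {
        Term id = new Term("id", "1");
        long seqNoA = writer.UpdateDocument(id, docA); // e.g. issued by thread A
        long seqNoB = writer.UpdateDocument(id, docB); // e.g. issued by thread B
        return seqNoA > seqNoB ? "A" : "B"; // larger seqNo = effective last write
    }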
From 255ca906a49ce10594a3363de045cd9f7c2102f1 Mon Sep 17 00:00:00 2001
From: Jeevananthan-23
Date: Wed, 8 Nov 2023 21:59:04 +0530
Subject: [PATCH 4/5] seqno: fix text bug

---
 .../Index/TestIndexingSequenceNumbers.cs | 49 ++++++-----
 src/Lucene.Net/Index/BufferedUpdates.cs  |  2 +-
 src/Lucene.Net/Index/IndexWriter.cs      | 87 ++++++++++++++-----
 3 files changed, 93 insertions(+), 45 deletions(-)

diff --git a/src/Lucene.Net.Tests.Misc/Index/TestIndexingSequenceNumbers.cs b/src/Lucene.Net.Tests.Misc/Index/TestIndexingSequenceNumbers.cs
index b716fcf8cd..b7103f19ba 100644
--- a/src/Lucene.Net.Tests.Misc/Index/TestIndexingSequenceNumbers.cs
+++ b/src/Lucene.Net.Tests.Misc/Index/TestIndexingSequenceNumbers.cs
@@ -164,7 +164,7 @@ public void TestStressConcurrentCommit()
             List<Operation> commits = new();
             AtomicInt32 opsSinceCommit = new();

             // multiple threads update the same set of documents, and we randomly commit
             for (int i = 0; i < threads.Length; i++)
             {
                 List<Operation> ops = new();
@@ -179,9 +179,9 @@ public void TestStressConcurrentCommit()
                     {
                         Operation op = new();
                         op.threadID = threadID;
-                        if (new Random().Next(500) == 17)
+                        if (Random.Next(500) == 17)
                         {
-                            op.what = 0;
+                            op.what = 2;
                             UninterruptableMonitor.Enter(commitLock);
                             try
                             {
@@ -198,19 +198,26 @@ public void TestStressConcurrentCommit()
                         }
                         else
                         {
-                            op.id = new Random().Next(idCount);
+                            op.id = Random.Next(idCount);
                             Term idTerm = new Term("id", op.id.ToString());
-                            if (new Random().Next(10) == 1)
+                            if (Random.Next(10) == 1)
                             {
                                 op.what = 1;
-                                op.seqNo = new Random().Next(2) == 0 ? w.DeleteDocuments(idTerm) : w.DeleteDocuments(new TermQuery(idTerm));
+                                if (Random.NextBoolean())
+                                {
+                                    op.seqNo = w.DeleteDocuments(idTerm);
+                                }
+                                else
+                                {
+                                    op.seqNo = w.DeleteDocuments(new TermQuery(idTerm));
+                                }
                             }
                             else
                             {
                                 Document doc = new Document();
                                 doc.Add(new StoredField("thread", threadID));
                                 doc.Add(new StringField("id", op.id.ToString(), Field.Store.NO));
-                                if (new Random().Next(2) == 0)
+                                if (Random.Next(2) == 0)
                                 {
                                     List<Document> docs = new List<Document> { doc };
                                     op.seqNo = w.UpdateDocuments(idTerm, docs);
@@ -218,10 +225,10 @@ public void TestStressConcurrentCommit()
                                 else
                                 {
                                     op.seqNo = w.UpdateDocument(idTerm, doc);
-                                    op.what = 0;
                                 }
-                                ops.Add(op);
+                                op.what = 0;
                             }
+                            ops.Add(op);
                         }
                     }
                 }
@@ -409,7 +416,7 @@ public void TestStressConcurrentDocValuesUpdatesCommit()
                     {
                         Operation op = new Operation();
                         op.threadID = threadID;
-                        if (new Random().Next(500) == 17)
+                        if (Random.Next(500) == 17)
                         {
                             op.what = 2;
                             UninterruptableMonitor.Enter(commitLock);
@@ -428,7 +435,7 @@ public void TestStressConcurrentDocValuesUpdatesCommit()
                         }
                         else
                         {
-                            op.id = new Random().Next(idCount);
+                            op.id = Random.Next(idCount);
                             Term idTerm = new Term("id", "" + op.id);
                             op.seqNo = w.UpdateNumericDocValue(idTerm, "thread", threadID);
                             op.what = 0;
@@ -564,7 +571,7 @@ public void TestStressConcurrentAddAndDeleteAndCommit()
                     {
                         Operation op = new Operation();
                         op.threadID = threadID;
-                        if (new Random().Next(500) == 17)
+                        if (Random.Next(500) == 17)
                         {
                             op.what = 2;
                             lock (commitLock)
@@ -578,12 +585,12 @@ public void TestStressConcurrentAddAndDeleteAndCommit()
                         }
                         else
                         {
-                            op.id = new Random().Next(idCount);
+                            op.id = Random.Next(idCount);
                             Term idTerm = new Term("id", "" + op.id);
-                            if (new Random().Next(10) == 1)
+                            if (Random.Next(10) == 1)
                             {
                                 op.what = 1;
-                                if (new Random().nextBoolean())
+                                if (Random.NextBoolean())
                                 {
                                     op.seqNo = w.DeleteDocuments(idTerm);
                                 }
@@ -591,13 +598,14 @@ public void TestStressConcurrentAddAndDeleteAndCommit()
                                 else
                                 {
                                     op.seqNo = w.DeleteDocuments(new TermQuery(idTerm));
                                 }
+                                Assert.IsTrue(w.HasDeletions()); // testing concurrent deletions
                             }
                             else
                             {
                                 Document doc = new Document();
                                 doc.Add(new StoredField("threadop", threadID + "-" + ops.size()));
                                 doc.Add(new StringField("id", "" + op.id, Field.Store.NO));
-                                if (new Random().nextBoolean())
+                                if (Random.NextBoolean())
                                 {
                                     List<Document> docs = new();
                                     docs.Add(doc);
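The `new Random()` → `Random` changes above are not cosmetic: LuceneTestCase.Random is derived from the printed test seed, so a failing stress run can be replayed. A minimal illustration (inside a LuceneTestCase subclass; `1000` is an arbitrary bound):

    int id = Random.Next(1000);          // reproducible for a given test seed
    bool byQuery = Random.NextBoolean(); // J2N extension method; note the C# casing
    // By contrast, `new Random()` is time-seeded, so a failure found with it
    // generally cannot be reproduced from the reported seed.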
@@ -708,16 +716,15 @@ public void TestStressConcurrentAddAndDeleteAndCommit()
                     Console.WriteLine("  hit: " + s.Doc(hit.Doc).Get("threadop"));
                 }

-                //why this is com-out bc Bits liveDocs is not supported for now
-                /* foreach (AtomicReaderContext ctx in r.Leaves)
+                foreach (AtomicReaderContext ctx in r.Leaves)
                 {
                     Console.WriteLine("  sub=" + ctx.Reader);
-                    Bits liveDocs = ctx.Reader;
+                    IBits liveDocs = ctx.AtomicReader.LiveDocs;
                     for (int docID = 0; docID < ctx.Reader.MaxDoc; docID++)
                     {
-                        Console.WriteLine("    docID=" + docID + " threadop=" + ctx.Reader.Document(docID).Get("threadop") + (liveDocs != null && liveDocs[docID] == false ? " (deleted)" : ""));
+                        Console.WriteLine("    docID=" + docID + " threadop=" + ctx.Reader.Document(docID).Get("threadop") + (liveDocs != null && liveDocs.Get(docID) == false ? " (deleted)" : ""));
                     }
-                }*/
+                }

                 assertEquals("commit " + i + " of " + commits.size() + " id=" + id + " reader=" + r, expectedCounts[id], actualCount);
             }
diff --git a/src/Lucene.Net/Index/BufferedUpdates.cs b/src/Lucene.Net/Index/BufferedUpdates.cs
index 7e6d970e8f..69946518a1 100644
--- a/src/Lucene.Net/Index/BufferedUpdates.cs
+++ b/src/Lucene.Net/Index/BufferedUpdates.cs
@@ -29,7 +29,7 @@ namespace Lucene.Net.Index
     /// single segment. this is used to hold buffered pending
     /// deletes and updates against the to-be-flushed segment. Once the
     /// deletes and updates are pushed (on flush in <see cref="DocumentsWriter"/>), they
-    /// are converted to a FrozenBufferedUpdates instance.
+    /// are converted to a <see cref="FrozenBufferedUpdates"/> instance.
     /// <para/>
     /// NOTE: instances of this class are accessed either via a private
     /// instance on <see cref="DocumentsWriterPerThread"/>, or via sync'd code by
diff --git a/src/Lucene.Net/Index/IndexWriter.cs b/src/Lucene.Net/Index/IndexWriter.cs
index b185790a1d..012c0bd78e 100644
--- a/src/Lucene.Net/Index/IndexWriter.cs
+++ b/src/Lucene.Net/Index/IndexWriter.cs
@@ -75,6 +75,14 @@ namespace Lucene.Net.Index
     /// (which just deletes
     /// and then adds the entire document). When finished adding, deleting
     /// and updating documents, <see cref="Dispose()"/> should be called.
+    /// <para/>
+    /// Each method that changes the index returns a <c>long</c> sequence number, which
+    /// expresses the effective order in which each change was applied.
+    /// <see cref="Commit()"/> also returns a sequence number, describing which
+    /// changes are in the commit point and which are not. Sequence numbers
+    /// are transient (not saved into the index in any way) and only valid
+    /// within a single <see cref="IndexWriter"/> instance.
     /// <para/>
     /// These changes are buffered in memory and periodically
@@ -1553,22 +1561,14 @@ public virtual long LastSequenceNumber
         /// </summary>
         public virtual bool HasDeletions()
         {
+            EnsureOpen();
+            if (bufferedUpdatesStream.Any() || docWriter.AnyDeletions() || readerPool.AnyPendingDeletes())
+            {
+                return true;
+            }
             UninterruptableMonitor.Enter(this);
             try
             {
-                EnsureOpen();
-                if (bufferedUpdatesStream.Any())
-                {
-                    return true;
-                }
-                if (docWriter.AnyDeletions())
-                {
-                    return true;
-                }
-                if (readerPool.AnyPendingDeletes())
-                {
-                    return true;
-                }
                 foreach (SegmentCommitInfo info in segmentInfos.Segments)
                 {
                     if (info.HasDeletions)
                     {
                         return true;
                     }
                 }
-                return false;
             }
             finally
             {
                 UninterruptableMonitor.Exit(this);
             }
+            return false;
         }
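A sketch of the contract the new summary paragraph describes, assuming the patched Commit() that returns a long (the at-or-below comparison mirrors how the stress tests above replay operations against each commit):

    using Lucene.Net.Index;

    static bool DeleteIsDurable(IndexWriter writer)
    {
        long opSeqNo = writer.DeleteDocuments(new Term("id", "42"));
        long commitSeqNo = writer.Commit();
        // Operations whose sequence number is at or below the commit's are
        // contained in that commit point; anything larger is not yet durable:
        return opSeqNo <= commitSeqNo;
    }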
@@ -1618,7 +1618,10 @@ public virtual bool HasDeletions()
         /// In this case, the invalid characters are silently
         /// replaced with the Unicode replacement character
         /// U+FFFD.
-        ///
+        /// <para/>
+        /// Returns the sequence number
+        /// for this operation.
+        /// <para/>
         /// NOTE: if this method hits an <see cref="OutOfMemoryException"/>
         /// you should immediately dispose the writer. See
         /// for details.
         /// </summary>
         /// <exception cref="CorruptIndexException">if the index is corrupt</exception>
         /// <exception cref="IOException">if there is a low-level IO error</exception>
         public virtual long AddDocument(IEnumerable<IIndexableField> doc)
@@ -1637,7 +1640,10 @@ public virtual long AddDocument(IEnumerable<IIndexableField> doc)
         /// See <see cref="AddDocument(IEnumerable{IIndexableField})"/> for details on
         /// index and state after an <see cref="OutOfMemoryException"/>, and
         /// flushing/merging temporary free space requirements.
+        /// <para/>
+        /// Returns the sequence number
+        /// for this operation.
+        /// <para/>
         /// NOTE: if this method hits an <see cref="OutOfMemoryException"/>
         /// you should immediately dispose the writer. See
         /// for details.
         /// </summary>
         /// <exception cref="CorruptIndexException">if the index is corrupt</exception>
         /// <exception cref="IOException">if there is a low-level IO error</exception>
         public virtual long AddDocument(IEnumerable<IIndexableField> doc, Analyzer analyzer)
@@ -1685,6 +1691,9 @@ public virtual long AddDocument(IEnumerable<IIndexableField> doc, Analyzer analyzer)
         /// you should immediately dispose the writer. See
         /// for details.
         ///
+        /// Returns the sequence number
+        /// for this operation.
+        /// <para/>
         /// @lucene.experimental
         /// </summary>
         /// <exception cref="CorruptIndexException">if the index is corrupt</exception>
         /// <exception cref="IOException">if there is a low-level IO error</exception>
         public virtual long AddDocuments(IEnumerable<IEnumerable<IIndexableField>> docs)
@@ -1700,6 +1709,8 @@ public virtual long AddDocuments(IEnumerable<IEnumerable<IIndexableField>> docs)
         /// IDs, such that an external reader will see all or none
         /// of the documents.
         ///
+        /// Returns the sequence number
+        /// for this operation.
         /// @lucene.experimental
         /// </summary>
         /// <exception cref="CorruptIndexException">if the index is corrupt</exception>
         /// <exception cref="IOException">if there is a low-level IO error</exception>
         public virtual long AddDocuments(IEnumerable<IEnumerable<IIndexableField>> docs, Analyzer analyzer)
@@ -1715,6 +1726,8 @@ public virtual long AddDocuments(IEnumerable<IEnumerable<IIndexableField>> docs, Analyzer analyzer)
         /// assigned document IDs, such that an external reader
         /// will see all or none of the documents.
         ///
+        /// Returns the sequence number
+        /// for this operation.
         /// @lucene.experimental
         /// </summary>
         /// <exception cref="CorruptIndexException">if the index is corrupt</exception>
         /// <exception cref="IOException">if there is a low-level IO error</exception>
         public virtual long UpdateDocuments(Term delTerm, IEnumerable<IEnumerable<IIndexableField>> docs)
@@ -1732,6 +1745,8 @@ public virtual long UpdateDocuments(Term delTerm, IEnumerable<IEnumerable<IIndexableField>> docs)
         ///
+        /// Returns the sequence number
+        /// for this operation.
         /// @lucene.experimental
         /// </summary>
         /// <exception cref="CorruptIndexException">if the index is corrupt</exception>
         /// <exception cref="IOException">if there is a low-level IO error</exception>
         public virtual long UpdateDocuments(Term delTerm, IEnumerable<IEnumerable<IIndexableField>> docs, Analyzer analyzer)
@@ -1779,6 +1794,8 @@ public virtual long UpdateDocuments(Term delTerm, IEnumerable<IEnumerable<IIndexableField>> docs, Analyzer analyzer)
         /// NOTE: if this method hits an <see cref="OutOfMemoryException"/>
         /// you should immediately dispose the writer. See
         /// for details.
+        /// <para/>
+        /// Returns the sequence number for this operation.
         /// </summary>
         /// <param name="term">the term to identify the documents to be deleted</param>
         /// <exception cref="CorruptIndexException">if the index is corrupt</exception>
         /// <exception cref="IOException">if there is a low-level IO error</exception>
         public virtual long DeleteDocuments(Term term)
@@ -1799,9 +1816,9 @@ public virtual long DeleteDocuments(Term term)
             catch (Exception oom) when (oom.IsOutOfMemoryError())
             {
                 HandleOOM(oom, "DeleteDocuments(Term)");
+                // dead code but the compiler disagrees:
+                return -1;
             }
-            // dead code but javac disagrees:
-            return -1;
         }
@@ -1915,6 +1932,8 @@ public virtual long TryDeleteDocument(IndexReader readerIn, int docID)
         /// NOTE: if this method hits an <see cref="OutOfMemoryException"/>
         /// you should immediately dispose the writer. See
         /// for details.
+        /// <para/>
+        /// Returns the sequence number for this operation.
         /// </summary>
         /// <param name="terms">array of terms to identify the documents
         /// to be deleted</param>
         /// <exception cref="CorruptIndexException">if the index is corrupt</exception>
         /// <exception cref="IOException">if there is a low-level IO error</exception>
         public virtual long DeleteDocuments(params Term[] terms)
@@ -1936,9 +1955,9 @@ public virtual long DeleteDocuments(params Term[] terms)
             catch (Exception oom) when (oom.IsOutOfMemoryError())
             {
                 HandleOOM(oom, "DeleteDocuments(Term..)");
+                // dead code but the compiler disagrees:
+                return -1;
             }
-            // dead code but javac disagrees:
-            return -1;
         }
@@ -1947,6 +1966,8 @@ public virtual long DeleteDocuments(params Term[] terms)
         /// NOTE: if this method hits an <see cref="OutOfMemoryException"/>
         /// you should immediately dispose the writer. See
         /// for details.
+        /// <para/>
+        /// Returns the sequence number for this operation.
         /// </summary>
         /// <param name="query">the query to identify the documents to be deleted</param>
         /// <exception cref="CorruptIndexException">if the index is corrupt</exception>
         /// <exception cref="IOException">if there is a low-level IO error</exception>
         public virtual long DeleteDocuments(Query query)
@@ -1978,6 +1999,8 @@ public virtual long DeleteDocuments(Query query)
         /// NOTE: if this method hits an <see cref="OutOfMemoryException"/>
         /// you should immediately dispose the writer. See
         /// for details.
+        /// <para/>
+        /// Returns the sequence number for this operation.
         /// </summary>
         /// <param name="queries">array of queries to identify the documents
         /// to be deleted</param>
         /// <exception cref="CorruptIndexException">if the index is corrupt</exception>
         /// <exception cref="IOException">if there is a low-level IO error</exception>
         public virtual long DeleteDocuments(params Query[] queries)
@@ -2013,6 +2036,8 @@ public virtual long DeleteDocuments(params Query[] queries)
         /// NOTE: if this method hits an <see cref="OutOfMemoryException"/>
         /// you should immediately dispose the writer. See
         /// for details.
+        /// <para/>
+        /// Returns the sequence number for this operation.
         /// </summary>
         /// <param name="term">the term to identify the document(s) to be
         /// deleted</param>
         /// <exception cref="CorruptIndexException">if the index is corrupt</exception>
         /// <exception cref="IOException">if there is a low-level IO error</exception>
         public virtual long UpdateDocument(Term term, IEnumerable<IIndexableField> doc)
@@ -2035,6 +2060,8 @@ public virtual long UpdateDocument(Term term, IEnumerable<IIndexableField> doc)
         /// NOTE: if this method hits an <see cref="OutOfMemoryException"/>
         /// you should immediately dispose the writer. See
         /// for details.
+        /// <para/>
+        /// Returns the sequence number for this operation.
         /// </summary>
         /// <param name="term">the term to identify the document(s) to be
         /// deleted</param>
         /// <exception cref="CorruptIndexException">if the index is corrupt</exception>
         /// <exception cref="IOException">if there is a low-level IO error</exception>
         public virtual long UpdateDocument(Term term, IEnumerable<IIndexableField> doc, Analyzer analyzer)
@@ -2073,9 +2100,9 @@ public virtual long UpdateDocument(Term term, IEnumerable<IIndexableField> doc, Analyzer analyzer)
             catch (Exception oom) when (oom.IsOutOfMemoryError())
             {
                 HandleOOM(oom, "UpdateDocument");
+                // dead code but the compiler disagrees:
+                return -1;
             }
-            // dead code but javac disagrees:
-            return -1;
         }
@@ -2089,6 +2116,8 @@ public virtual long UpdateDocument(Term term, IEnumerable<IIndexableField> doc, Analyzer analyzer)
         /// NOTE: if this method hits an <see cref="OutOfMemoryException"/> you should immediately
         /// dispose the writer. See for details.
+        /// <para/>
+        /// Returns the sequence number for this operation.
         /// </summary>
         /// <param name="term">
         /// the term to identify the document(s) to be updated
@@ -2139,6 +2168,8 @@ public virtual long UpdateNumericDocValue(Term term, string field, long? value)
         /// NOTE: if this method hits an <see cref="OutOfMemoryException"/> you should immediately
         /// dispose the writer. See for details.
+        /// <para/>
+        /// Returns the sequence number for this operation.
         /// </summary>
         /// <param name="term">
         /// the term to identify the document(s) to be updated
@@ -2978,6 +3009,8 @@ private void RollbackInternal()
         /// , or
         /// methods, they may receive
         /// s.
+        /// <para/>
+        /// Returns the sequence number for this operation.
         /// </summary>
         public virtual long DeleteAll()
@@ -3408,6 +3441,8 @@ private IEnumerable<IDisposable> AcquireWriteLocks(params Directory[] dirs)
         /// NOTE: if this method hits an <see cref="OutOfMemoryException"/>
         /// you should immediately dispose the writer. See
         /// for details.
+        /// <para/>
+        /// Returns the sequence number for this operation.
         /// </summary>
         /// <exception cref="CorruptIndexException">if the index is corrupt</exception>
         /// <exception cref="IOException">if there is a low-level IO error</exception>
@@ -3584,6 +3619,8 @@ public virtual long AddIndexes(params Directory[] dirs)
         /// NOTE: if you call with false, which
         /// aborts all running merges, then any thread still running this method might
         /// hit a .
+        /// <para/>
+        /// Returns the sequence number for this operation.
         /// </summary>
         /// <exception cref="CorruptIndexException">if the index is corrupt</exception>
@@ -3944,6 +3981,8 @@ protected virtual void DoBeforeFlush()
         /// NOTE: if this method hits an <see cref="OutOfMemoryException"/>
         /// you should immediately dispose the writer. See
         /// for details.
+        /// <para/>
+        /// Returns the sequence number for this operation.
         /// </summary>
         public long PrepareCommit()
         {
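The repeated "dead code but the compiler disagrees" pattern in these hunks exists because definite-assignment analysis cannot see that HandleOOM always rethrows. A self-contained sketch of the same shape (DoWork and HandleOom are hypothetical stand-ins, not the patched methods):

    using System;

    static long Op()
    {
        try
        {
            return DoWork(); // normal path returns a real sequence number
        }
        catch (Exception e)
        {
            HandleOom(e); // always throws, but the compiler cannot prove that
            return -1;    // unreachable; satisfies definite-return analysis
        }
    }

    static long DoWork() => 1;
    static void HandleOom(Exception e) => throw new InvalidOperationException("OOM", e);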
@@ -4176,6 +4215,8 @@ public IDictionary<string, string> CommitData
         /// NOTE: if this method hits an <see cref="OutOfMemoryException"/>
         /// you should immediately dispose the writer. See
         /// for details.
+        /// <para/>
+        /// Returns the sequence number for this operation.
         /// </summary>
         public long Commit()
         {

From 00d3942e750ce2a0be0171205e5c030474370cba Mon Sep 17 00:00:00 2001
From: Jeevananthan <71455761+Jeevananthan-23@users.noreply.github.com>
Date: Wed, 8 Nov 2023 19:02:28 +0000
Subject: [PATCH 5/5] seqno: fix bug in IW testpreparecommit

---
 .../Index/TestIndexingSequenceNumbers.cs |  2 +-
 src/Lucene.Net/Index/IndexWriter.cs      | 16 ++++++++--------
 2 files changed, 9 insertions(+), 9 deletions(-)
 rename src/{Lucene.Net.Tests.Misc => Lucene.Net.Tests}/Index/TestIndexingSequenceNumbers.cs (99%)

diff --git a/src/Lucene.Net.Tests.Misc/Index/TestIndexingSequenceNumbers.cs b/src/Lucene.Net.Tests/Index/TestIndexingSequenceNumbers.cs
similarity index 99%
rename from src/Lucene.Net.Tests.Misc/Index/TestIndexingSequenceNumbers.cs
rename to src/Lucene.Net.Tests/Index/TestIndexingSequenceNumbers.cs
index b7103f19ba..33c115b221 100644
--- a/src/Lucene.Net.Tests.Misc/Index/TestIndexingSequenceNumbers.cs
+++ b/src/Lucene.Net.Tests/Index/TestIndexingSequenceNumbers.cs
@@ -14,7 +14,7 @@
 using System.Collections.Generic;
 using System.Threading;

-namespace Lucene.Net.Tests.Misc.Index
+namespace Lucene.Net.Tests.Index
 {
     //move me to Lucene.Net.Index
     public class TestIndexingSequenceNumbers : LuceneTestCase
diff --git a/src/Lucene.Net/Index/IndexWriter.cs b/src/Lucene.Net/Index/IndexWriter.cs
index 012c0bd78e..9aee18e21d 100644
--- a/src/Lucene.Net/Index/IndexWriter.cs
+++ b/src/Lucene.Net/Index/IndexWriter.cs
@@ -237,14 +237,14 @@ public class IndexWriter : IDisposable, ITwoPhaseCommit, IAccountable
         private readonly Directory directory; // where this index resides
         private readonly Analyzer analyzer; // how to analyze text

-        private long changeCount; // increments every time a change is completed
+        private readonly AtomicInt64 changeCount = new(); // increments every time a change is completed
         private long lastCommitChangeCount; // last changeCount that was committed

         private IList<SegmentCommitInfo> rollbackSegments; // list of segmentInfo we will fallback to if the commit fails

         internal volatile SegmentInfos pendingCommit; // set when a commit is pending (after prepareCommit() & before commit())
-        internal AtomicInt64 pendingSeqNo;
-        internal long pendingCommitChangeCount;
+        internal AtomicInt64 pendingSeqNo = new();
+        internal AtomicInt64 pendingCommitChangeCount = new();

         private ICollection<string> filesToCommit;
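The patch converts changeCount to AtomicInt64 so that increments stay correct even when the IndexWriter monitor is not held; a minimal demonstration of why a plain `long++` is not safe, using only the BCL (the loop counts are arbitrary):

    using System.Threading;
    using System.Threading.Tasks;

    long unsafeCount = 0;
    long safeCount = 0;

    Parallel.For(0, 1_000_000, _ =>
    {
        unsafeCount++;                        // read-modify-write race: updates get lost
        Interlocked.Increment(ref safeCount); // what AtomicInt64.IncrementAndGet() does internally
    });
    // safeCount is exactly 1,000,000; unsafeCount is almost always smaller.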
@@ -2295,7 +2295,7 @@ internal string NewSegmentName()
                 // could close, re-open and re-return the same segment
                 // name that was previously returned which can cause
                 // problems at least with ConcurrentMergeScheduler.
-                changeCount++;
+                changeCount.GetAndIncrement();
                 segmentInfos.Changed();
                 return "_" + SegmentInfos.SegmentNumberToString(segmentInfos.Counter++, allowLegacyNames: false); // LUCENENET specific - we had this right thru all of the betas, so don't change if the legacy feature is enabled
             }
@@ -3056,7 +3056,7 @@ public virtual long DeleteAll()
                     // Don't bother saving any changes in our segmentInfos
                     readerPool.DropAll(false);
                     // Mark that the index has changed
-                    ++changeCount;
+                    changeCount.IncrementAndGet();
                     segmentInfos.Changed();
                     globalFieldNumberMap.Clear();
                     success = true;
@@ -3232,7 +3232,7 @@ internal virtual void CheckpointNoSIS()
             UninterruptableMonitor.Enter(this);
             try
             {
-                changeCount++;
+                changeCount.GetAndIncrement();
                 deleter.Checkpoint(segmentInfos, false);
             }
             finally
@@ -3248,7 +3248,7 @@ internal void Changed()
             UninterruptableMonitor.Enter(this);
             try
             {
-                changeCount++;
+                changeCount.GetAndIncrement();
                 segmentInfos.Changed();
             }
             finally
@@ -4155,7 +4155,7 @@ public void SetCommitData(IDictionary<string, string> commitUserData)
             try
             {
                 segmentInfos.UserData = new Dictionary<string, string>(commitUserData);
-                ++changeCount;
+                changeCount.IncrementAndGet();
             }
             finally
             {