Skip to content

Commit

Permalink
Merge pull request #26 from aboutcircles/jaensen/crc-504-reorg-handli…
Browse files Browse the repository at this point in the history
…ng-doesnt-work-reliably

fix: jaensen/crc-504-reorg-handling-doesnt-work-reliably
  • Loading branch information
jaensen authored Dec 15, 2024
2 parents f04da24 + 71c9b3a commit f0c7468
Show file tree
Hide file tree
Showing 4 changed files with 57 additions and 36 deletions.
41 changes: 25 additions & 16 deletions Circles.Index.CirclesV1/LogParser.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ public IEnumerable<IIndexEvent> ParseTransaction(Block block, int transactionInd
return Enumerable.Empty<IIndexEvent>();
}

public IEnumerable<IIndexEvent> ParseLog(Block block, Transaction transaction, TxReceipt receipt, LogEntry log, int logIndex)
public IEnumerable<IIndexEvent> ParseLog(Block block, Transaction transaction, TxReceipt receipt, LogEntry log,
int logIndex)
{
List<IIndexEvent> events = new();
if (log.Topics.Length == 0)
Expand Down Expand Up @@ -135,40 +136,48 @@ private IIndexEvent CrcHubTransfer(Block block, TxReceipt receipt, LogEntry log,

private IEnumerable<IIndexEvent> CrcSignup(Block block, TxReceipt receipt, LogEntry log, int logIndex)
{
// Extract user address and token address from the log entry.
string user = "0x" + log.Topics[1].ToString().Substring(Consts.AddressEmptyBytesPrefixLength);
Address tokenAddress = new Address(log.Data.Slice(12));

// Generate the Signup event.
IIndexEvent signupEvent = new Signup(
receipt.BlockNumber
, (long)block.Timestamp
, receipt.Index
, logIndex
, receipt.TxHash!.ToString()
, user
, tokenAddress.ToString(true, false));

CirclesTokenAddresses.TryAdd(tokenAddress, null);
receipt.BlockNumber,
(long)block.Timestamp,
receipt.Index,
logIndex,
receipt.TxHash!.ToString(),
user,
tokenAddress.ToString(true, false));

// Attempt to register the token address. If the token is already known, skip
// generating the transfer event to avoid duplicates (e.g., during reorgs).
bool isNewToken = CirclesTokenAddresses.TryAdd(tokenAddress, null);

if (!isNewToken)
{
// If the token was already known, return only the Signup event.
return new[] { signupEvent };
}

// If the token is new, attempt to locate and generate the corresponding transfer event.
IIndexEvent? signupBonusEvent = null;

// Every signup comes together with an Erc20 transfer (the signup bonus).
// Since the signup event is emitted after the transfer, the token wasn't known yet when we encountered the transfer.
// Look for the transfer again and process it.
for (int i = 0; i < receipt.Logs!.Length; i++)
{
var repeatedLogEntry = receipt.Logs[i];
if (repeatedLogEntry.Address != tokenAddress)
{
continue;
continue; // Skip logs unrelated to the token address.
}

if (repeatedLogEntry.Topics[0] == _transferTopic)
{
signupBonusEvent = Erc20Transfer(block, receipt, repeatedLogEntry, i);
break;
break; // Only one matching transfer event is expected.
}
}

// Return the Signup event, along with the Transfer event if it was found.
return signupBonusEvent == null
? new[] { signupEvent }
: new[] { signupEvent, signupBonusEvent };
Expand Down
3 changes: 2 additions & 1 deletion Circles.Index.Common/Sink.cs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,8 @@ public async Task Flush()

sb.AppendLine(ex.ToString());
Console.WriteLine(sb.ToString());
return true;

return false;
});
}
});
Expand Down
24 changes: 11 additions & 13 deletions Circles.Index.Postgres/PostgresDb.cs
Original file line number Diff line number Diff line change
Expand Up @@ -230,22 +230,20 @@ SELECT MAX(""blockNumber"") as block_number FROM ""System_Block""

public long? FirstGap()
{
var connectionStringBuilder = new NpgsqlConnectionStringBuilder(connectionString)
{
CommandTimeout = 120
};
using var connection = new NpgsqlConnection(connectionStringBuilder.ConnectionString);
using var connection = new NpgsqlConnection(connectionString);
connection.Open();

NpgsqlCommand cmd = connection.CreateCommand();
cmd.CommandText = $@"
SELECT first_gap
FROM generate_series(
(SELECT MIN(""blockNumber"") FROM ""System_Block""),
(SELECT MAX(""blockNumber"") FROM ""System_Block"")
) AS first_gap
LEFT JOIN ""System_Block"" sb ON first_gap = sb.""blockNumber""
WHERE sb.""blockNumber"" IS NULL
SELECT (prev.""blockNumber"" + 1) AS gap_start
FROM (
SELECT ""blockNumber"", LEAD(""blockNumber"") OVER (ORDER BY ""blockNumber"") AS next_block_number
FROM (
SELECT ""blockNumber"" FROM ""System_Block"" ORDER BY ""blockNumber"" DESC LIMIT 1000000
) AS sub
) AS prev
WHERE prev.next_block_number - prev.""blockNumber"" > 1
ORDER BY gap_start
LIMIT 1;
";

Expand Down Expand Up @@ -336,7 +334,7 @@ public async Task DeleteFromBlockOnwards(long reorgAt)
await using var connection = new NpgsqlConnection(connectionString);
await connection.OpenAsync();

await using var transaction = await connection.BeginTransactionAsync(IsolationLevel.Serializable);
await using var transaction = await connection.BeginTransactionAsync();
try
{
foreach (var table in Schema.Tables.Values)
Expand Down
25 changes: 19 additions & 6 deletions Circles.Index/Plugin.cs
Original file line number Diff line number Diff line change
Expand Up @@ -105,10 +105,7 @@ public async Task Init(INethermindApi nethermindApi)
}
}

Interlocked.Exchange(ref _newItemsArrived, 1);
Interlocked.Exchange(ref _latestHeadToIndex, args.Block.Number);

HandleNewHead();
HandleNewHead(args.Block.Number);
};
}

Expand Down Expand Up @@ -187,15 +184,31 @@ private static void InitCaches(InterfaceLogger logger, IDatabase database)
}
}

private void HandleNewHead()
/// <summary>
/// Handles when a new head block is received.
/// It updates a buffer with the latest block number and starts the ProcessBlocksAsync task if it's not already running.
/// This method is non-blocking.
/// </summary>
/// <param name="blockNo">The new chain head</param>
private void HandleNewHead(long blockNo)
{
// Start the processing task if not already running
// Signal that new items have arrived
Interlocked.Exchange(ref _newItemsArrived, 1);

// Signal which block is the new head
Interlocked.Exchange(ref _latestHeadToIndex, blockNo);

// Start the processing task if it's not already running
if (Interlocked.CompareExchange(ref _isProcessing, 1, 0) == 0)
{
Task.Run(ProcessBlocksAsync, _cancellationTokenSource.Token);
}
}

/// <summary>
/// Handles the latest head block (according to the value in _latestHeadToIndex) by passing it to the _indexerMachine.
/// If a new event arrived during processing, it will keep processing until no new events arrived and the value of _latestHeadToIndex is -1.
/// </summary>
private async Task ProcessBlocksAsync()
{
try
Expand Down

0 comments on commit f0c7468

Please sign in to comment.