Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reapply 8644 on 9260 #9312

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
153 commits
Select commit Hold shift + click to select a range
d361ba6
sweep: add new state `TxFatal` for erroneous sweepings
yyforyongyu Apr 30, 2024
2217843
sweep: add new error `ErrZeroFeeRateDelta`
yyforyongyu Oct 25, 2024
d51bde9
sweep: add new interface method `Immediate`
yyforyongyu Oct 25, 2024
be1ec51
sweep: handle inputs locally instead of relying on the tx
yyforyongyu Oct 25, 2024
075e31e
sweep: add `handleInitialBroadcast` to handle initial broadcast
yyforyongyu Oct 25, 2024
3671a38
sweep: remove redundant error from `Broadcast`
yyforyongyu Apr 30, 2024
da3a6ab
sweep: add method `handleBumpEventError` and fix `markInputFailed`
yyforyongyu Apr 30, 2024
37536c7
sweep: add method `isMature` on `SweeperInput`
yyforyongyu Apr 30, 2024
de76205
sweep: make sure defaultDeadline is derived from the mature height
yyforyongyu Apr 30, 2024
0216b9c
sweep: remove redundant loopvar assign
yyforyongyu Oct 25, 2024
4abd9ba
sweep: break `initialBroadcast` into two steps
yyforyongyu Nov 7, 2024
e783439
sweep: make sure nil tx is handled
yyforyongyu Nov 7, 2024
46e0a43
chainio: introduce `chainio` to handle block synchronization
yyforyongyu Jun 27, 2024
c77c230
chainio: implement `Blockbeat`
yyforyongyu Jun 27, 2024
08520b0
chainio: add helper methods to dispatch beats
yyforyongyu Oct 31, 2024
0fa9406
chainio: add `BlockbeatDispatcher` to dispatch blockbeats
yyforyongyu Jun 27, 2024
029aa31
chainio: add partial implementation of `Consumer` interface
yyforyongyu Oct 17, 2024
26e034a
multi: implement `Consumer` on subsystems
yyforyongyu Oct 29, 2024
a00120f
sweep: remove block subscription in `UtxoSweeper` and `TxPublisher`
yyforyongyu Jun 4, 2024
36ef0c3
sweep: remove redundant notifications during shutdown
yyforyongyu Nov 18, 2024
bb78060
contractcourt: remove `waitForHeight` in resolvers
yyforyongyu Jun 4, 2024
58153f7
contractcourt: remove block subscription in chain arbitrator
yyforyongyu Oct 29, 2024
981e9c2
contractcourt: remove block subscription in channel arbitrator
yyforyongyu Oct 29, 2024
e3167ca
contractcourt: remove the `immediate` param used in `Resolve`
yyforyongyu Jun 4, 2024
a24ca9e
contractcourt: start channel arbitrator with blockbeat
yyforyongyu Oct 29, 2024
952869c
multi: start consumers with a starting blockbeat
yyforyongyu Oct 29, 2024
bb42ff5
lnd: add new method `startLowLevelServices`
yyforyongyu Oct 17, 2024
1292c9b
lnd: start `blockbeatDispatcher` and register consumers
yyforyongyu Oct 17, 2024
ac409a0
contractcourt: fix linter `funlen`
yyforyongyu Oct 29, 2024
4005e2a
multi: improve loggings
yyforyongyu May 22, 2024
5b3b60c
chainio: use `errgroup` to limit num of goroutines
yyforyongyu Nov 19, 2024
660fa82
contractcourt: add verbose logging in resolvers
yyforyongyu Jun 20, 2024
cd1a184
contractcourt: add spend path helpers in timeout/success resolver
yyforyongyu Nov 13, 2024
91cd342
contractcourt: add sweep senders in `htlcSuccessResolver`
yyforyongyu Nov 14, 2024
a5c05ae
contractcourt: add resolver handlers in `htlcSuccessResolver`
yyforyongyu Nov 14, 2024
f1a9e6d
contractcourt: remove redundant return value in `claimCleanUp`
yyforyongyu Nov 14, 2024
982470c
contractcourt: add sweep senders in `htlcTimeoutResolver`
yyforyongyu Nov 14, 2024
dda8208
contractcourt: add methods to checkpoint states
yyforyongyu Jul 16, 2024
fab3acf
contractcourt: add resolve handlers in `htlcTimeoutResolver`
yyforyongyu Jul 16, 2024
0d5908e
contractcourt: add `Launch` method to anchor/breach resolver
yyforyongyu Jun 24, 2024
e75f396
contractcourt: add `Launch` method to commit resolver
yyforyongyu Jun 20, 2024
aa7fcd0
contractcourt: add `Launch` method to htlc success resolver
yyforyongyu Jul 15, 2024
9ade1ef
contractcourt: add `Launch` method to htlc timeout resolver
yyforyongyu Jul 16, 2024
cea75bf
invoices: exit early when the subscriber chan is nil
yyforyongyu Nov 17, 2024
566eb80
contractcourt: add `Launch` method to incoming contest resolver
yyforyongyu Nov 17, 2024
f4e8e75
contractcourt: add `Launch` method to outgoing contest resolver
yyforyongyu Jun 20, 2024
600732e
contractcourt: fix concurrent access to `resolved`
yyforyongyu Jul 10, 2024
0d1e81b
contractcourt: fix concurrent access to `launched`
yyforyongyu Jul 11, 2024
aeafa63
contractcourt: break `launchResolvers` into two steps
yyforyongyu Jun 25, 2024
29521a1
contractcourt: offer outgoing htlc one block earlier before its expiry
yyforyongyu Nov 25, 2024
097079e
contractcourt: implement `Consumer` on `chainWatcher`
yyforyongyu Jun 20, 2024
8f6f9e2
contractcourt: register spend notification during init
yyforyongyu Nov 16, 2024
6870b73
contractcourt: add method `handleCommitSpend`
yyforyongyu Nov 16, 2024
9631dbb
contractcourt: handle blockbeat in `chainWatcher`
yyforyongyu Nov 16, 2024
ca92160
contractcourt: notify blockbeat for `chainWatcher`
yyforyongyu Nov 16, 2024
530ac1b
contractcourt: provide a shortcut to `ChannelPoint`
yyforyongyu Jun 28, 2024
808e3a6
multi: add new method `ChainArbitrator.RedispatchBlockbeat`
yyforyongyu Nov 13, 2024
651d4d4
contractcourt: add close event handlers in `ChannelArbitrator`
yyforyongyu Nov 16, 2024
b058383
contractcourt: process channel close event on new beat
yyforyongyu Nov 13, 2024
3d48f0c
contractcourt: register conf notification once and cancel when confirmed
yyforyongyu Nov 22, 2024
d785469
chainntnfs: skip dispatched conf details
yyforyongyu Nov 25, 2024
5c9c6d0
docs: add release notes for `blockbeat` series
yyforyongyu Nov 25, 2024
9757c4b
multi: optimize loggings around changes from `blockbeat`
yyforyongyu Oct 17, 2024
9bd342e
lntest+itest: fix `testSweepCPFPAnchorOutgoingTimeout`
yyforyongyu Oct 24, 2024
72e17a4
itest: fix `testSweepCPFPAnchorIncomingTimeout`
yyforyongyu Oct 24, 2024
3023c35
itest: fix `testSweepHTLCs`
yyforyongyu Oct 24, 2024
79a8260
itest: fix `testSweepCommitOutputAndAnchor`
yyforyongyu Oct 24, 2024
29bb9b8
itest: fix `testBumpForceCloseFee`
yyforyongyu Oct 17, 2024
d116a56
itest: fix `testPaymentSucceededHTLCRemoteSwept`
yyforyongyu Oct 24, 2024
50d8d75
lntest+itest: start flattening the multi-hop tests
yyforyongyu Oct 17, 2024
1607d4b
itest: simplify and flatten `testMultiHopReceiverChainClaim`
yyforyongyu Oct 18, 2024
2d3b28b
lntest+itest: flatten `testMultiHopLocalForceCloseOnChainHtlcTimeout`
yyforyongyu Oct 18, 2024
493650e
lntest+itest: flatten `testMultiHopRemoteForceCloseOnChainHtlcTimeout`
yyforyongyu Oct 19, 2024
6c50559
itest: flatten `testMultiHopHtlcLocalChainClaim`
yyforyongyu Oct 21, 2024
aa0ca88
itest: flatten `testMultiHopHtlcRemoteChainClaim`
yyforyongyu Oct 22, 2024
bdca3af
itest: flatten `testMultiHopHtlcAggregation`
yyforyongyu Oct 23, 2024
291919a
itest: flatten `testHtlcTimeoutResolverExtractPreimageLocal`
yyforyongyu Oct 23, 2024
6db372c
itest: flatten `testHtlcTimeoutResolverExtractPreimageRemote`
yyforyongyu Oct 23, 2024
820a4e5
itest: rename file to reflect the tests
yyforyongyu Oct 23, 2024
5581c7a
itest: remove unnecessary force close
yyforyongyu Oct 23, 2024
35aa809
itest: remove redundant block mining in `testFailingChannel`
yyforyongyu Oct 24, 2024
fda9ef1
itest: remove redunant block mining in `testChannelFundingWithUnstabl…
yyforyongyu Oct 24, 2024
70c2f3f
itest: remove redudant block in `testPsbtChanFundingWithUnstableUtxos`
yyforyongyu Oct 24, 2024
241da54
itest: remove redundant blocks in channel backup tests
yyforyongyu Oct 24, 2024
b3a1489
itest+lntest: fix channel force close test
yyforyongyu Jun 29, 2024
6cf665d
itest: flatten and fix `testWatchtower`
yyforyongyu Oct 25, 2024
897803b
itest: remove redundant block in multiple tests
yyforyongyu Oct 25, 2024
fe7d5c1
itest: assert payment status after sending
yyforyongyu Oct 24, 2024
b8da896
lntest+itest: remove the usage of `ht.AssertActiveHtlcs`
yyforyongyu Nov 5, 2024
84c8aa6
htlcswitch: handle nil circuit properly when settling
yyforyongyu Nov 10, 2024
f777264
routing: fix nil pointer dereference in `exitWithErr`
yyforyongyu Nov 7, 2024
0725cb5
lnwallet: add debug logs
yyforyongyu Nov 5, 2024
13ef2ee
itest: print num of blocks for debugging
yyforyongyu Oct 25, 2024
bbc4945
itest: shuffle test cases to even out blocks mined in tranches
yyforyongyu Oct 24, 2024
a7b69c6
workflows: pass action ID as the shuffle seed
yyforyongyu Nov 7, 2024
cffea2f
itest: remove direct reference to stanby nodes
yyforyongyu Oct 26, 2024
f33e55f
itest: remove the use of standby nodes
yyforyongyu Nov 20, 2024
6a743f4
itest: remove unused method `setupFourHopNetwork`
yyforyongyu Nov 20, 2024
b12a16f
itest+lntest: remove standby nodes
yyforyongyu Nov 20, 2024
6d3ed89
itest: remove unnecessary channel close and node shutdown
yyforyongyu Oct 29, 2024
822b71a
lntest: make sure node is properly shut down
yyforyongyu Nov 2, 2024
0019cb3
lntest: add human-readble names and check num of nodes
yyforyongyu Nov 20, 2024
1ffe921
itest: fix `testOpenChannelUpdateFeePolicy`
yyforyongyu Oct 29, 2024
c6819c0
itest: fix flake in `testSendDirectPayment`
yyforyongyu Nov 20, 2024
50e2495
itest: fix spawning temp miner
yyforyongyu Oct 29, 2024
71eab38
itest: fix flake for neutrino backend
yyforyongyu Oct 30, 2024
494f1e5
itest: flatten PSBT funding test cases
yyforyongyu Oct 30, 2024
8082b93
itest: fix and document flake in sweeping tests
yyforyongyu Oct 30, 2024
844b100
itest: remove loop in `wsTestCaseBiDirectionalSubscription`
yyforyongyu Oct 30, 2024
c270401
itest: fix flake in `testRevokedCloseRetributionZeroValueRemoteOutput`
yyforyongyu Nov 2, 2024
c6b7aa7
itest: fix flake in `testSwitchOfflineDelivery`
yyforyongyu Nov 3, 2024
dda51c2
itest+routing: fix flake in `runFeeEstimationTestCase`
yyforyongyu Nov 3, 2024
d388c88
itest: use `ht.CreateSimpleNetwork` whenever applicable
yyforyongyu Nov 3, 2024
45ee9e5
itest: put mpp tests in one file
yyforyongyu Nov 4, 2024
3c4b63a
lntest+itest: remove `AssertNumActiveEdges`
yyforyongyu Nov 8, 2024
e005582
itest: fix flake in runPsbtChanFundingWithNodes
yyforyongyu Nov 10, 2024
4f7d952
itest: fix flake in `testPrivateUpdateAlias`
yyforyongyu Nov 16, 2024
076885b
itest: fix flake in `update_pending_open_channels`
yyforyongyu Nov 16, 2024
cb3b285
lntest: increase `rpcmaxwebsockets` for `btcd`
yyforyongyu Nov 20, 2024
a6bbb3f
itest: document details about MPP-related tests
yyforyongyu Nov 21, 2024
f9c0cd5
itest+lntest: fix flake in MPP-related tests
yyforyongyu Nov 21, 2024
d8d2a54
lntest: fix flakeness in `openChannelsForNodes`
yyforyongyu Nov 22, 2024
bdfa085
itest: optimize blocks mined in `testGarbageCollectLinkNodes`
yyforyongyu Nov 8, 2024
98c7a92
itest: break remote signer into independent cases
yyforyongyu Nov 8, 2024
136c6f3
itest: break down channel restore commit types cases
yyforyongyu Nov 8, 2024
a3473ec
itest: break down utxo selection funding tests
yyforyongyu Nov 8, 2024
db84d56
itest: break all multihop test cases
yyforyongyu Nov 8, 2024
006f656
itest: break down scid alias channel update tests
yyforyongyu Nov 8, 2024
676b925
itest: break down open channel fee policy
yyforyongyu Nov 8, 2024
6b84e60
itest: break down payment failed tests
yyforyongyu Nov 8, 2024
27b0489
itest: break down channel backup restore tests
yyforyongyu Nov 8, 2024
7a08b78
itest: break down wallet import account tests
yyforyongyu Nov 8, 2024
41d1203
itest: break down basic funding flow tests
yyforyongyu Nov 9, 2024
c0d284c
itest: break down single hop send to route
yyforyongyu Nov 9, 2024
c18e356
itest: break down taproot tests
yyforyongyu Nov 9, 2024
ce88b8b
itest: break down channel fundmax tests
yyforyongyu Nov 9, 2024
aace64e
itest: further reduce block mined in tests
yyforyongyu Nov 9, 2024
3535cca
itest: track and skip flaky tests for windows
yyforyongyu Nov 7, 2024
f2d5b36
lntest: increase node start timeout and payment benchmark timeout
yyforyongyu Nov 9, 2024
41f2387
lntest: make sure policies are populated in `AssertChannelInGraph`
yyforyongyu Nov 21, 2024
b011d8c
workflows: use `btcd` for macOS
yyforyongyu Nov 9, 2024
be55355
itest+lntest: add new method `FundNumCoins`
yyforyongyu Nov 25, 2024
87ea4f0
lntest: limit the num of blocks mined in each test
yyforyongyu Nov 25, 2024
7ed3548
docs: update release notes
yyforyongyu Nov 26, 2024
7a07c37
Reapply "kvdb/postgres: remove global application level lock"
aakselrod Oct 30, 2024
087b977
go.mod: use local kvdb to reapply removal of global postgres lock
aakselrod Oct 30, 2024
a06d29c
itest: fix flake in multi-hop payments
aakselrod Nov 18, 2024
1b79205
batch: handle serialization errors correctly
aakselrod Nov 2, 2024
41ccd4c
channeldb: handle previously-unhandled errors
aakselrod Nov 5, 2024
32edc74
sqldb: improve serialization error handling
aakselrod Nov 6, 2024
837e092
Makefile: tune params for db-instance for postgres itests
aakselrod Nov 6, 2024
0ce4384
Makefile: log to file instead of console
aakselrod Nov 15, 2024
0fbf7b5
github workflow: save postgres log to zip file
aakselrod Nov 15, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 4 additions & 10 deletions .github/workflows/main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,7 @@ jobs:
run: ./scripts/install_bitcoind.sh $BITCOIN_VERSION

- name: run ${{ matrix.name }}
run: make itest-parallel tranches=${{ env.TRANCHES }} ${{ matrix.args }}
run: make itest-parallel tranches=${{ env.TRANCHES }} ${{ matrix.args }} shuffleseed=${{ github.run_id }}${{ strategy.job-index }}

- name: Send coverage
if: ${{ contains(matrix.args, 'cover=1') }}
Expand All @@ -297,7 +297,7 @@ jobs:
- name: Zip log files on failure
if: ${{ failure() }}
timeout-minutes: 5 # timeout after 5 minute
run: 7z a logs-itest-${{ matrix.name }}.zip itest/**/*.log
run: 7z a logs-itest-${{ matrix.name }}.zip itest/**/*.log itest/postgres.log

- name: Upload log files on failure
uses: actions/upload-artifact@v3
Expand Down Expand Up @@ -332,7 +332,7 @@ jobs:
key-prefix: integration-test

- name: run itest
run: make itest-parallel tranches=${{ env.TRANCHES }} windows=1
run: make itest-parallel tranches=${{ env.TRANCHES }} windows=1 shuffleseed=${{ github.run_id }}

- name: kill any remaining lnd processes
if: ${{ failure() }}
Expand Down Expand Up @@ -375,14 +375,8 @@ jobs:
go-version: '${{ env.GO_VERSION }}'
key-prefix: integration-test

- name: install bitcoind
run: |
wget https://bitcoincore.org/bin/bitcoin-core-${BITCOIN_VERSION}.0/bitcoin-${BITCOIN_VERSION}.0-arm64-apple-darwin.tar.gz
tar zxvf bitcoin-${BITCOIN_VERSION}.0-arm64-apple-darwin.tar.gz
mv bitcoin-${BITCOIN_VERSION}.0 /tmp/bitcoin

- name: run itest
run: PATH=$PATH:/tmp/bitcoin/bin make itest-parallel tranches=${{ env.TRANCHES }} backend=bitcoind
run: PATH=$PATH:/tmp/bitcoin/bin make itest-parallel tranches=${{ env.TRANCHES }} shuffleseed=${{ github.run_id }}

- name: Zip log files on failure
if: ${{ failure() }}
Expand Down
25 changes: 15 additions & 10 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -195,20 +195,25 @@ ifeq ($(dbbackend),postgres)
docker rm lnd-postgres --force || echo "Starting new postgres container"

# Start a fresh postgres instance. Allow a maximum of 500 connections so
# that multiple lnd instances with a maximum number of connections of 50
# each can run concurrently.
docker run --name lnd-postgres -e POSTGRES_PASSWORD=postgres -p 6432:5432 -d postgres:13-alpine -N 500
docker logs -f lnd-postgres &
# that multiple lnd instances with a maximum number of connections of 20
# each can run concurrently. Note that many of the settings here are
# specifically for integration testing and are not fit for running
# production nodes.
docker run --name lnd-postgres -e POSTGRES_PASSWORD=postgres -p 6432:5432 -d postgres:13-alpine -N 1500 -c max_pred_locks_per_transaction=1024 -c max_locks_per_transaction=128 -c jit=off -c work_mem=8MB -c checkpoint_timeout=10min -c enable_seqscan=off
docker logs -f lnd-postgres >itest/postgres.log 2>&1 &

# Wait for the instance to be started.
sleep $(POSTGRES_START_DELAY)
endif

clean-itest-logs:
rm -rf itest/*.log itest/.logs-*

#? itest-only: Only run integration tests without re-building binaries
itest-only: db-instance
itest-only: clean-itest-logs db-instance
@$(call print, "Running integration tests with ${backend} backend.")
rm -rf itest/*.log itest/.logs-*; date
EXEC_SUFFIX=$(EXEC_SUFFIX) scripts/itest_part.sh 0 1 $(TEST_FLAGS) $(ITEST_FLAGS) -test.v
date
EXEC_SUFFIX=$(EXEC_SUFFIX) scripts/itest_part.sh 0 1 $(SHUFFLE_SEED) $(TEST_FLAGS) $(ITEST_FLAGS) -test.v
$(COLLECT_ITEST_COVERAGE)

#? itest: Build and run integration tests
Expand All @@ -218,10 +223,10 @@ itest: build-itest itest-only
itest-race: build-itest-race itest-only

#? itest-parallel: Build and run integration tests in parallel mode, running up to ITEST_PARALLELISM test tranches in parallel (default 4)
itest-parallel: build-itest db-instance
itest-parallel: clean-itest-logs build-itest db-instance
@$(call print, "Running tests")
rm -rf itest/*.log itest/.logs-*; date
EXEC_SUFFIX=$(EXEC_SUFFIX) scripts/itest_parallel.sh $(ITEST_PARALLELISM) $(NUM_ITEST_TRANCHES) $(TEST_FLAGS) $(ITEST_FLAGS)
date
EXEC_SUFFIX=$(EXEC_SUFFIX) scripts/itest_parallel.sh $(ITEST_PARALLELISM) $(NUM_ITEST_TRANCHES) $(SHUFFLE_SEED) $(TEST_FLAGS) $(ITEST_FLAGS)
$(COLLECT_ITEST_COVERAGE)

#? itest-clean: Kill all running itest processes
Expand Down
15 changes: 14 additions & 1 deletion batch/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"sync"

"github.com/lightningnetwork/lnd/kvdb"
"github.com/lightningnetwork/lnd/sqldb"
)

// errSolo is a sentinel error indicating that the requester should re-run the
Expand Down Expand Up @@ -55,7 +56,19 @@ func (b *batch) run() {
for i, req := range b.reqs {
err := req.Update(tx)
if err != nil {
failIdx = i
// If we get a serialization error, we
// want the underlying SQL retry
// mechanism to retry the entire batch.
// Otherwise, we can succeed in an
// sqldb retry and still re-execute the
// failing request individually.
dbErr := sqldb.MapSQLError(err)
if !sqldb.IsSerializationError(dbErr) {
failIdx = i

return dbErr
}

return err
}
}
Expand Down
152 changes: 152 additions & 0 deletions chainio/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
# Chainio

`chainio` is a package designed to provide blockchain data access to various
subsystems within `lnd`. When a new block is received, it is encapsulated in a
`Blockbeat` object and disseminated to all registered consumers. Consumers may
receive these updates either concurrently or sequentially, based on their
registration configuration, ensuring that each subsystem maintains a
synchronized view of the current block state.

The main components include:

- `Blockbeat`: An interface that provides information about the block.

- `Consumer`: An interface that specifies how subsystems handle the blockbeat.

- `BlockbeatDispatcher`: The core service responsible for receiving each block
and distributing it to all consumers.

Additionally, the `BeatConsumer` struct provides a partial implementation of
the `Consumer` interface. This struct helps reduce code duplication, allowing
subsystems to avoid re-implementing the `ProcessBlock` method and provides a
commonly used `NotifyBlockProcessed` method.


### Register a Consumer

Consumers within the same queue are notified **sequentially**, while all queues
are notified **concurrently**. A queue consists of a slice of consumers, which
are notified in left-to-right order. Developers are responsible for determining
dependencies in block consumption across subsystems: independent subsystems
should be notified concurrently, whereas dependent subsystems should be
notified sequentially.

To notify the consumers concurrently, put them in different queues,
```go
// consumer1 and consumer2 will be notified concurrently.
queue1 := []chainio.Consumer{consumer1}
blockbeatDispatcher.RegisterQueue(consumer1)

queue2 := []chainio.Consumer{consumer2}
blockbeatDispatcher.RegisterQueue(consumer2)
```

To notify the consumers sequentially, put them in the same queue,
```go
// consumers will be notified sequentially via,
// consumer1 -> consumer2 -> consumer3
queue := []chainio.Consumer{
consumer1,
consumer2,
consumer3,
}
blockbeatDispatcher.RegisterQueue(queue)
```

### Implement the `Consumer` Interface

Implementing the `Consumer` interface is straightforward. Below is an example
of how
[`sweep.TxPublisher`](https://github.com/lightningnetwork/lnd/blob/5cec466fad44c582a64cfaeb91f6d5fd302fcf85/sweep/fee_bumper.go#L310)
implements this interface.

To start, embed the partial implementation `chainio.BeatConsumer`, which
already provides the `ProcessBlock` implementation and commonly used
`NotifyBlockProcessed` method, and exposes `BlockbeatChan` for the consumer to
receive blockbeats.

```go
type TxPublisher struct {
started atomic.Bool
stopped atomic.Bool

chainio.BeatConsumer

...
```

We should also remember to initialize this `BeatConsumer`,

```go
...
// Mount the block consumer.
tp.BeatConsumer = chainio.NewBeatConsumer(tp.quit, tp.Name())
```

Finally, in the main event loop, read from `BlockbeatChan`, process the
received blockbeat, and, crucially, call `tp.NotifyBlockProcessed` to inform
the blockbeat dispatcher that processing is complete.

```go
for {
select {
case beat := <-tp.BlockbeatChan:
// Consume this blockbeat, usually it means updating the subsystem
// using the new block data.

// Notify we've processed the block.
tp.NotifyBlockProcessed(beat, nil)

...
```

### Existing Queues

Currently, we have a single queue of consumers dedicated to handling force
closures. This queue includes `ChainArbitrator`, `UtxoSweeper`, and
`TxPublisher`, with `ChainArbitrator` managing two internal consumers:
`chainWatcher` and `ChannelArbitrator`. The blockbeat flows sequentially
through the chain as follows: `ChainArbitrator => chainWatcher =>
ChannelArbitrator => UtxoSweeper => TxPublisher`. The following diagram
illustrates the flow within the public subsystems.

```mermaid
sequenceDiagram
autonumber
participant bb as BlockBeat
participant cc as ChainArb
participant us as UtxoSweeper
participant tp as TxPublisher

note left of bb: 0. received block x,<br>dispatching...

note over bb,cc: 1. send block x to ChainArb,<br>wait for its done signal
bb->>cc: block x
rect rgba(165, 0, 85, 0.8)
critical signal processed
cc->>bb: processed block
option Process error or timeout
bb->>bb: error and exit
end
end

note over bb,us: 2. send block x to UtxoSweeper, wait for its done signal
bb->>us: block x
rect rgba(165, 0, 85, 0.8)
critical signal processed
us->>bb: processed block
option Process error or timeout
bb->>bb: error and exit
end
end

note over bb,tp: 3. send block x to TxPublisher, wait for its done signal
bb->>tp: block x
rect rgba(165, 0, 85, 0.8)
critical signal processed
tp->>bb: processed block
option Process error or timeout
bb->>bb: error and exit
end
end
```
55 changes: 55 additions & 0 deletions chainio/blockbeat.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package chainio

import (
"fmt"

"github.com/btcsuite/btclog/v2"
"github.com/lightningnetwork/lnd/build"
"github.com/lightningnetwork/lnd/chainntnfs"
)

// Beat implements the Blockbeat interface. It contains the block epoch and a
// customized logger.
//
// TODO(yy): extend this to check for confirmation status - which serves as the
// single source of truth, to avoid the potential race between receiving blocks
// and `GetTransactionDetails/RegisterSpendNtfn/RegisterConfirmationsNtfn`.
type Beat struct {
// epoch is the current block epoch the blockbeat is aware of.
epoch chainntnfs.BlockEpoch

// log is the customized logger for the blockbeat which prints the
// block height.
log btclog.Logger
}

// Compile-time check to ensure Beat satisfies the Blockbeat interface.
var _ Blockbeat = (*Beat)(nil)

// NewBeat creates a new beat with the specified block epoch and a customized
// logger.
func NewBeat(epoch chainntnfs.BlockEpoch) *Beat {
b := &Beat{
epoch: epoch,
}

// Create a customized logger for the blockbeat.
logPrefix := fmt.Sprintf("Height[%6d]:", b.Height())
b.log = build.NewPrefixLog(logPrefix, clog)

return b
}

// Height returns the height of the block epoch.
//
// NOTE: Part of the Blockbeat interface.
func (b *Beat) Height() int32 {
return b.epoch.Height
}

// logger returns the logger for the blockbeat.
//
// NOTE: Part of the private blockbeat interface.
func (b *Beat) logger() btclog.Logger {
return b.log
}
28 changes: 28 additions & 0 deletions chainio/blockbeat_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package chainio

import (
"errors"
"testing"

"github.com/lightningnetwork/lnd/chainntnfs"
"github.com/stretchr/testify/require"
)

var errDummy = errors.New("dummy error")

// TestNewBeat tests the NewBeat and Height functions.
func TestNewBeat(t *testing.T) {
t.Parallel()

// Create a testing epoch.
epoch := chainntnfs.BlockEpoch{
Height: 1,
}

// Create the beat and check the internal state.
beat := NewBeat(epoch)
require.Equal(t, epoch, beat.epoch)

// Check the height function.
require.Equal(t, epoch.Height, beat.Height())
}
Loading
Loading