diff --git a/setup_bitswap.go b/setup_bitswap.go index 374406d..55e6688 100644 --- a/setup_bitswap.go +++ b/setup_bitswap.go @@ -21,6 +21,8 @@ import ( "github.com/libp2p/go-libp2p/core/routing" ) +const perBlockTimeout = time.Second * 10 + func setupBitswapExchange(ctx context.Context, cfg Config, h host.Host, cr routing.ContentRouting, bstore blockstore.Blockstore) exchange.Interface { bsctx := metri.CtxScope(ctx, "ipfs_bitswap") bn := bsnet.NewFromIpfsHost(h, cr) @@ -78,7 +80,13 @@ func setupBitswapExchange(ctx context.Context, cfg Config, h host.Host, cr routi bsclient.WithoutDuplicatedBlockStats(), ) bn.Start(bswap) - return bswap + + wrapExch := &timeoutBlockExchange{ + inner: bswap, + perBlockTimeout: perBlockTimeout, + } + + return wrapExch } type noopPeerLedger struct{} @@ -120,3 +128,56 @@ func (e *noNotifyExchange) NotifyNewBlocks(ctx context.Context, blocks ...blocks // Rainbow does not notify when we get new blocks in our Blockservice. return nil } + +type timeoutBlockExchange struct { + inner exchange.Interface + perBlockTimeout time.Duration +} + +func (t *timeoutBlockExchange) GetBlock(ctx context.Context, c cid.Cid) (blocks.Block, error) { + ctx, cancel := context.WithTimeout(ctx, t.perBlockTimeout) + defer cancel() + return t.inner.GetBlock(ctx, c) +} + +func (t *timeoutBlockExchange) GetBlocks(ctx context.Context, cids []cid.Cid) (<-chan blocks.Block, error) { + ctx, cancel := context.WithCancel(ctx) + blocksCh, err := t.inner.GetBlocks(ctx, cids) + if err != nil { + cancel() + return nil, err + } + + retCh := make(chan blocks.Block) + go func() { + defer close(retCh) + defer cancel() + timer := time.NewTimer(t.perBlockTimeout) + for b := range blocksCh { + select { + case retCh <- b: + if !timer.Stop() { + <-timer.C + } + timer.Reset(t.perBlockTimeout) + case <-timer.C: + return + } + } + if !timer.Stop() { + <-timer.C + } + }() + + return retCh, nil +} + +func (t *timeoutBlockExchange) NotifyNewBlocks(ctx context.Context, blocks ...blocks.Block) error { + return t.inner.NotifyNewBlocks(ctx, blocks...) +} + +func (t *timeoutBlockExchange) Close() error { + return t.inner.Close() +} + +var _ exchange.Interface = (*timeoutBlockExchange)(nil)