Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add resource manager for separate DHT libp2p host #54

Merged
merged 2 commits into from
Feb 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
119 changes: 118 additions & 1 deletion rcmgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,52 @@
"log"

"github.com/dustin/go-humanize"
"github.com/pbnjay/memory"

"github.com/ipfs/rainbow/internal/fd"
"github.com/libp2p/go-libp2p"
"github.com/libp2p/go-libp2p/core/network"
rcmgr "github.com/libp2p/go-libp2p/p2p/host/resource-manager"
"github.com/pbnjay/memory"
)

// Note: this comes from kubo/core/node/libp2p/rcmgr_defaults.go with minimal
// adaptations.

var infiniteResourceLimits = rcmgr.InfiniteLimits.ToPartialLimitConfig().System

func makeResourceMgrs(maxMemory uint64, maxFD int, connMgrHighWater int, separateDHT bool) (bitswapHost, dhtHost network.ResourceManager, err error) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The changes here might be overkill and is pretty guesstimated, just wanted something other than disabling the resource manager for people to test with.

Recommendations appreciated 🙏

if maxMemory == 0 {
maxMemory = uint64((float64(memory.TotalMemory()) * 0.85))
}
if maxFD == 0 {
maxFD = fd.GetNumFDs() / 2
}

Check warning on line 26 in rcmgr.go

View check run for this annotation

Codecov / codecov/patch

rcmgr.go#L20-L26

Added lines #L20 - L26 were not covered by tests

if !separateDHT {
mgr, err := rcmgr.NewResourceManager(rcmgr.NewFixedLimiter(makeResourceManagerConfig(maxMemory, maxFD, connMgrHighWater)))
if err != nil {
return nil, nil, err
}
return mgr, nil, nil

Check warning on line 33 in rcmgr.go

View check run for this annotation

Codecov / codecov/patch

rcmgr.go#L28-L33

Added lines #L28 - L33 were not covered by tests
}

bitswapHostMem := uint64(float64(maxMemory) * 0.85)
bitswapHostFDs := int(float64(maxFD) * 0.75)
bitswapHostRcMgr, err := rcmgr.NewResourceManager(rcmgr.NewFixedLimiter(makeResourceManagerConfig(bitswapHostMem, bitswapHostFDs, connMgrHighWater)))
if err != nil {
return nil, nil, err
}

Check warning on line 41 in rcmgr.go

View check run for this annotation

Codecov / codecov/patch

rcmgr.go#L36-L41

Added lines #L36 - L41 were not covered by tests

dhtHostMem := maxMemory - bitswapHostMem
dhtHostFDs := maxFD - bitswapHostFDs
dhtHostRcMgr, err := rcmgr.NewResourceManager(rcmgr.NewFixedLimiter(makeSeparateDHTClientResourceManagerConfig(dhtHostMem, dhtHostFDs)))
if err != nil {
return nil, nil, err
}

Check warning on line 48 in rcmgr.go

View check run for this annotation

Codecov / codecov/patch

rcmgr.go#L43-L48

Added lines #L43 - L48 were not covered by tests

return bitswapHostRcMgr, dhtHostRcMgr, nil

Check warning on line 50 in rcmgr.go

View check run for this annotation

Codecov / codecov/patch

rcmgr.go#L50

Added line #L50 was not covered by tests
}

func makeResourceManagerConfig(maxMemory uint64, maxFD int, connMgrHighWater int) (limitConfig rcmgr.ConcreteLimitConfig) {
if maxMemory == 0 {
maxMemory = uint64((float64(memory.TotalMemory()) * 0.85))
Expand Down Expand Up @@ -142,3 +177,85 @@
// We already have a complete value thus pass in an empty ConcreteLimitConfig.
return partialLimits.Build(rcmgr.ConcreteLimitConfig{})
}

func makeSeparateDHTClientResourceManagerConfig(maxMemory uint64, maxFD int) (limitConfig rcmgr.ConcreteLimitConfig) {
// Being a DHT client should require very limited inbound connections or streams so we set those very low
systemConnsInbound := 30
systemStreamsPerPeerInbound := 10

// For simplicity we set as much else to unlimited as possible
partialLimits := rcmgr.PartialLimitConfig{
System: rcmgr.ResourceLimits{
Memory: rcmgr.LimitVal64(maxMemory),
FD: rcmgr.LimitVal(maxFD),

Conns: rcmgr.Unlimited,
ConnsInbound: rcmgr.LimitVal(systemConnsInbound),
ConnsOutbound: rcmgr.Unlimited,

Streams: rcmgr.Unlimited,
StreamsOutbound: rcmgr.Unlimited,
StreamsInbound: rcmgr.Unlimited,
},

// Transient connections won't cause any memory to be accounted for by the resource manager/accountant.
// Only established connections do.
// As a result, we can't rely on System.Memory to protect us from a bunch of transient connection being opened.
// We limit the same values as the System scope, but only allow the Transient scope to take 25% of what is allowed for the System scope.
Transient: rcmgr.ResourceLimits{
Memory: rcmgr.LimitVal64(maxMemory / 4),
FD: rcmgr.LimitVal(maxFD / 4),

Conns: rcmgr.Unlimited,
ConnsInbound: rcmgr.LimitVal(systemConnsInbound / 4),
ConnsOutbound: rcmgr.Unlimited,

Streams: rcmgr.Unlimited,
StreamsInbound: rcmgr.Unlimited,
StreamsOutbound: rcmgr.Unlimited,
},

AllowlistedSystem: infiniteResourceLimits,
AllowlistedTransient: infiniteResourceLimits,
ServiceDefault: infiniteResourceLimits,
ServicePeerDefault: infiniteResourceLimits,
ProtocolDefault: infiniteResourceLimits,
ProtocolPeerDefault: infiniteResourceLimits,
Conn: infiniteResourceLimits,
Stream: infiniteResourceLimits,

// Limit the resources consumed by a peer.
// This doesn't protect us against intentional DoS attacks since an attacker can easily spin up multiple peers.
// We specify this limit against unintentional DoS attacks (e.g., a peer has a bug and is sending too much traffic intentionally).
// In that case we want to keep that peer's resource consumption contained.
// To keep this simple, we only constrain inbound connections and streams.
PeerDefault: rcmgr.ResourceLimits{
Memory: rcmgr.Unlimited64,
FD: rcmgr.Unlimited,
Conns: rcmgr.Unlimited,
ConnsInbound: rcmgr.DefaultLimit,
ConnsOutbound: rcmgr.Unlimited,
Streams: rcmgr.Unlimited,
StreamsInbound: rcmgr.LimitVal(systemStreamsPerPeerInbound),
StreamsOutbound: rcmgr.Unlimited,
},
}

scalingLimitConfig := rcmgr.DefaultLimits
libp2p.SetDefaultServiceLimits(&scalingLimitConfig)

// Anything set above in partialLimits that had a value of rcmgr.DefaultLimit will be overridden.
// Anything in scalingLimitConfig that wasn't defined in partialLimits above will be added (e.g., libp2p's default service limits).
partialLimits = partialLimits.Build(scalingLimitConfig.Scale(int64(maxMemory), maxFD)).ToPartialLimitConfig()

log.Printf(`

go-libp2p Separate DHT Resource Manager limits based on:
- --max-memory: %s
- --max-fd: %d

`, humanize.Bytes(maxMemory), maxFD)

// We already have a complete value thus pass in an empty ConcreteLimitConfig.
return partialLimits.Build(rcmgr.ConcreteLimitConfig{})

Check warning on line 260 in rcmgr.go

View check run for this annotation

Codecov / codecov/patch

rcmgr.go#L181-L260

Added lines #L181 - L260 were not covered by tests
}
7 changes: 3 additions & 4 deletions setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@
"github.com/libp2p/go-libp2p/core/metrics"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/routing"
rcmgr "github.com/libp2p/go-libp2p/p2p/host/resource-manager"
"github.com/libp2p/go-libp2p/p2p/net/connmgr"
"github.com/multiformats/go-multiaddr"
"go.opencensus.io/stats/view"
Expand Down Expand Up @@ -118,8 +117,7 @@
return nil, err
}

limiter := rcmgr.NewFixedLimiter(makeResourceManagerConfig(cfg.MaxMemory, cfg.MaxFD, cfg.ConnMgrHi))
mgr, err := rcmgr.NewResourceManager(limiter)
bitswapRcMgr, dhtRcMgr, err := makeResourceMgrs(cfg.MaxMemory, cfg.MaxFD, cfg.ConnMgrHi, !cfg.DHTSharedHost)

Check warning on line 120 in setup.go

View check run for this annotation

Codecov / codecov/patch

setup.go#L120

Added line #L120 was not covered by tests
if err != nil {
return nil, err
}
Expand All @@ -132,7 +130,7 @@
libp2p.BandwidthReporter(bwc),
libp2p.DefaultTransports,
libp2p.DefaultMuxers,
libp2p.ResourceManager(mgr),
libp2p.ResourceManager(bitswapRcMgr),

Check warning on line 133 in setup.go

View check run for this annotation

Codecov / codecov/patch

setup.go#L133

Added line #L133 was not covered by tests
}

if len(cfg.AnnounceAddrs) > 0 {
Expand Down Expand Up @@ -204,6 +202,7 @@
libp2p.BandwidthReporter(bwc),
libp2p.DefaultTransports,
libp2p.DefaultMuxers,
libp2p.ResourceManager(dhtRcMgr),

Check warning on line 205 in setup.go

View check run for this annotation

Codecov / codecov/patch

setup.go#L205

Added line #L205 was not covered by tests
)
if err != nil {
return nil, err
Expand Down
Loading