diff --git a/rcmgr.go b/rcmgr.go index bfe15b1..e5aff9b 100644 --- a/rcmgr.go +++ b/rcmgr.go @@ -4,10 +4,12 @@ import ( "log" "github.com/dustin/go-humanize" + "github.com/pbnjay/memory" + "github.com/ipfs/rainbow/internal/fd" "github.com/libp2p/go-libp2p" + "github.com/libp2p/go-libp2p/core/network" rcmgr "github.com/libp2p/go-libp2p/p2p/host/resource-manager" - "github.com/pbnjay/memory" ) // Note: this comes from kubo/core/node/libp2p/rcmgr_defaults.go with minimal @@ -15,6 +17,39 @@ import ( var infiniteResourceLimits = rcmgr.InfiniteLimits.ToPartialLimitConfig().System +func makeResourceMgrs(maxMemory uint64, maxFD int, connMgrHighWater int, separateDHT bool) (bitswapHost, dhtHost network.ResourceManager, err error) { + if maxMemory == 0 { + maxMemory = uint64((float64(memory.TotalMemory()) * 0.85)) + } + if maxFD == 0 { + maxFD = fd.GetNumFDs() / 2 + } + + if !separateDHT { + mgr, err := rcmgr.NewResourceManager(rcmgr.NewFixedLimiter(makeResourceManagerConfig(maxMemory, maxFD, connMgrHighWater))) + if err != nil { + return nil, nil, err + } + return mgr, nil, nil + } + + bitswapHostMem := uint64(float64(maxMemory) * 0.85) + bitswapHostFDs := int(float64(maxFD) * 0.75) + bitswapHostRcMgr, err := rcmgr.NewResourceManager(rcmgr.NewFixedLimiter(makeResourceManagerConfig(bitswapHostMem, bitswapHostFDs, connMgrHighWater))) + if err != nil { + return nil, nil, err + } + + dhtHostMem := maxMemory - bitswapHostMem + dhtHostFDs := maxFD - bitswapHostFDs + dhtHostRcMgr, err := rcmgr.NewResourceManager(rcmgr.NewFixedLimiter(makeSeparateDHTClientResourceManagerConfig(dhtHostMem, dhtHostFDs))) + if err != nil { + return nil, nil, err + } + + return bitswapHostRcMgr, dhtHostRcMgr, nil +} + func makeResourceManagerConfig(maxMemory uint64, maxFD int, connMgrHighWater int) (limitConfig rcmgr.ConcreteLimitConfig) { if maxMemory == 0 { maxMemory = uint64((float64(memory.TotalMemory()) * 0.85)) @@ -142,3 +177,85 @@ go-libp2p Resource Manager limits based on: // We already have a complete value thus pass in an empty ConcreteLimitConfig. return partialLimits.Build(rcmgr.ConcreteLimitConfig{}) } + +func makeSeparateDHTClientResourceManagerConfig(maxMemory uint64, maxFD int) (limitConfig rcmgr.ConcreteLimitConfig) { + // Being a DHT client should require very limited inbound connections or streams so we set those very low + systemConnsInbound := 30 + systemStreamsPerPeerInbound := 10 + + // For simplicity we set as much else to unlimited as possible + partialLimits := rcmgr.PartialLimitConfig{ + System: rcmgr.ResourceLimits{ + Memory: rcmgr.LimitVal64(maxMemory), + FD: rcmgr.LimitVal(maxFD), + + Conns: rcmgr.Unlimited, + ConnsInbound: rcmgr.LimitVal(systemConnsInbound), + ConnsOutbound: rcmgr.Unlimited, + + Streams: rcmgr.Unlimited, + StreamsOutbound: rcmgr.Unlimited, + StreamsInbound: rcmgr.Unlimited, + }, + + // Transient connections won't cause any memory to be accounted for by the resource manager/accountant. + // Only established connections do. + // As a result, we can't rely on System.Memory to protect us from a bunch of transient connection being opened. + // We limit the same values as the System scope, but only allow the Transient scope to take 25% of what is allowed for the System scope. + Transient: rcmgr.ResourceLimits{ + Memory: rcmgr.LimitVal64(maxMemory / 4), + FD: rcmgr.LimitVal(maxFD / 4), + + Conns: rcmgr.Unlimited, + ConnsInbound: rcmgr.LimitVal(systemConnsInbound / 4), + ConnsOutbound: rcmgr.Unlimited, + + Streams: rcmgr.Unlimited, + StreamsInbound: rcmgr.Unlimited, + StreamsOutbound: rcmgr.Unlimited, + }, + + AllowlistedSystem: infiniteResourceLimits, + AllowlistedTransient: infiniteResourceLimits, + ServiceDefault: infiniteResourceLimits, + ServicePeerDefault: infiniteResourceLimits, + ProtocolDefault: infiniteResourceLimits, + ProtocolPeerDefault: infiniteResourceLimits, + Conn: infiniteResourceLimits, + Stream: infiniteResourceLimits, + + // Limit the resources consumed by a peer. + // This doesn't protect us against intentional DoS attacks since an attacker can easily spin up multiple peers. + // We specify this limit against unintentional DoS attacks (e.g., a peer has a bug and is sending too much traffic intentionally). + // In that case we want to keep that peer's resource consumption contained. + // To keep this simple, we only constrain inbound connections and streams. + PeerDefault: rcmgr.ResourceLimits{ + Memory: rcmgr.Unlimited64, + FD: rcmgr.Unlimited, + Conns: rcmgr.Unlimited, + ConnsInbound: rcmgr.DefaultLimit, + ConnsOutbound: rcmgr.Unlimited, + Streams: rcmgr.Unlimited, + StreamsInbound: rcmgr.LimitVal(systemStreamsPerPeerInbound), + StreamsOutbound: rcmgr.Unlimited, + }, + } + + scalingLimitConfig := rcmgr.DefaultLimits + libp2p.SetDefaultServiceLimits(&scalingLimitConfig) + + // Anything set above in partialLimits that had a value of rcmgr.DefaultLimit will be overridden. + // Anything in scalingLimitConfig that wasn't defined in partialLimits above will be added (e.g., libp2p's default service limits). + partialLimits = partialLimits.Build(scalingLimitConfig.Scale(int64(maxMemory), maxFD)).ToPartialLimitConfig() + + log.Printf(` + +go-libp2p Separate DHT Resource Manager limits based on: + - --max-memory: %s + - --max-fd: %d + +`, humanize.Bytes(maxMemory), maxFD) + + // We already have a complete value thus pass in an empty ConcreteLimitConfig. + return partialLimits.Build(rcmgr.ConcreteLimitConfig{}) +} diff --git a/setup.go b/setup.go index 17ece4d..b96a90f 100644 --- a/setup.go +++ b/setup.go @@ -46,7 +46,6 @@ import ( "github.com/libp2p/go-libp2p/core/metrics" "github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/core/routing" - rcmgr "github.com/libp2p/go-libp2p/p2p/host/resource-manager" "github.com/libp2p/go-libp2p/p2p/net/connmgr" "github.com/multiformats/go-multiaddr" "go.opencensus.io/stats/view" @@ -118,8 +117,7 @@ func Setup(ctx context.Context, cfg Config, key crypto.PrivKey, dnsCache *cached return nil, err } - limiter := rcmgr.NewFixedLimiter(makeResourceManagerConfig(cfg.MaxMemory, cfg.MaxFD, cfg.ConnMgrHi)) - mgr, err := rcmgr.NewResourceManager(limiter) + bitswapRcMgr, dhtRcMgr, err := makeResourceMgrs(cfg.MaxMemory, cfg.MaxFD, cfg.ConnMgrHi, !cfg.DHTSharedHost) if err != nil { return nil, err } @@ -132,7 +130,7 @@ func Setup(ctx context.Context, cfg Config, key crypto.PrivKey, dnsCache *cached libp2p.BandwidthReporter(bwc), libp2p.DefaultTransports, libp2p.DefaultMuxers, - libp2p.ResourceManager(mgr), + libp2p.ResourceManager(bitswapRcMgr), } if len(cfg.AnnounceAddrs) > 0 { @@ -199,18 +197,12 @@ func Setup(ctx context.Context, cfg Config, key crypto.PrivKey, dnsCache *cached if cfg.DHTSharedHost { dhtHost = h } else { - dhtLimiter := rcmgr.NewFixedLimiter(makeResourceManagerConfig(cfg.MaxMemory, cfg.MaxFD, cfg.ConnMgrHi)) - dhtMgr, err := rcmgr.NewResourceManager(dhtLimiter) - if err != nil { - return nil, err - } - dhtHost, err = libp2p.New( libp2p.NoListenAddrs, libp2p.BandwidthReporter(bwc), libp2p.DefaultTransports, libp2p.DefaultMuxers, - libp2p.ResourceManager(dhtMgr), + libp2p.ResourceManager(dhtRcMgr), ) if err != nil { return nil, err