Skip to content

Commit

Permalink
feat: initial peer governor (#288)
Browse files Browse the repository at this point in the history
* return connection instead of wrapper in connmanager
* migrate host/topology tracking to peergov
* move additional connection close handling into connmanager
* generate connection closed event
* migrate outbound connection tracking to peergov
* query peergov for peers in peersharing
  • Loading branch information
agaffney authored Dec 17, 2024
1 parent 6717fc3 commit 4aadb1c
Show file tree
Hide file tree
Showing 12 changed files with 429 additions and 436 deletions.
3 changes: 1 addition & 2 deletions blockfetch.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,7 @@ func (n *Node) blockfetchClientRequestRange(
if conn == nil {
return fmt.Errorf("failed to lookup connection ID: %s", connId.String())
}
oConn := conn.Conn
if err := oConn.BlockFetch().Client.GetBlockRange(start, end); err != nil {
if err := conn.BlockFetch().Client.GetBlockRange(start, end); err != nil {
return err
}
return nil
Expand Down
7 changes: 3 additions & 4 deletions chainsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@ func (n *Node) chainsyncClientStart(connId ouroboros.ConnectionId) error {
if conn == nil {
return fmt.Errorf("failed to lookup connection ID: %s", connId.String())
}
oConn := conn.Conn
intersectPoints, err := n.ledgerState.RecentChainPoints(
chainsyncIntersectPointCount,
)
Expand All @@ -63,15 +62,15 @@ func (n *Node) chainsyncClientStart(connId ouroboros.ConnectionId) error {
if len(intersectPoints) == 0 {
if n.config.intersectTip {
// Start initial chainsync from current chain tip
tip, err := oConn.ChainSync().Client.GetCurrentTip()
tip, err := conn.ChainSync().Client.GetCurrentTip()
if err != nil {
return err
}
intersectPoints = append(
intersectPoints,
tip.Point,
)
return oConn.ChainSync().Client.Sync(intersectPoints)
return conn.ChainSync().Client.Sync(intersectPoints)
} else if len(n.config.intersectPoints) > 0 {
// Start initial chainsync at specific point(s)
intersectPoints = append(
Expand All @@ -80,7 +79,7 @@ func (n *Node) chainsyncClientStart(connId ouroboros.ConnectionId) error {
)
}
}
return oConn.ChainSync().Client.Sync(intersectPoints)
return conn.ChainSync().Client.Sync(intersectPoints)
}

func (n *Node) chainsyncServerFindIntersect(
Expand Down
169 changes: 22 additions & 147 deletions connmanager/connection_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,59 +15,21 @@
package connmanager

import (
"fmt"
"io"
"log/slog"
"sync"

"github.com/blinklabs-io/dingo/event"
"github.com/blinklabs-io/dingo/topology"

ouroboros "github.com/blinklabs-io/gouroboros"
)

// ConnectionManagerConnClosedFunc is a function that takes a connection ID and an optional error
type ConnectionManagerConnClosedFunc func(ouroboros.ConnectionId, error)

// ConnectionManagerTag represents the various tags that can be associated with a host or connection
type ConnectionManagerTag uint16

const (
ConnectionManagerTagNone ConnectionManagerTag = iota

ConnectionManagerTagHostLocalRoot
ConnectionManagerTagHostPublicRoot
ConnectionManagerTagHostBootstrapPeer
ConnectionManagerTagHostP2PLedger
ConnectionManagerTagHostP2PGossip

ConnectionManagerTagRoleInitiator
ConnectionManagerTagRoleResponder
// TODO: add more tags
)

func (c ConnectionManagerTag) String() string {
tmp := map[ConnectionManagerTag]string{
ConnectionManagerTagHostLocalRoot: "HostLocalRoot",
ConnectionManagerTagHostPublicRoot: "HostPublicRoot",
ConnectionManagerTagHostBootstrapPeer: "HostBootstrapPeer",
ConnectionManagerTagHostP2PLedger: "HostP2PLedger",
ConnectionManagerTagHostP2PGossip: "HostP2PGossip",
ConnectionManagerTagRoleInitiator: "RoleInitiator",
ConnectionManagerTagRoleResponder: "RoleResponder",
// TODO: add more tags to match those added above
}
ret, ok := tmp[c]
if !ok {
return "Unknown"
}
return ret
}

type ConnectionManager struct {
config ConnectionManagerConfig
hosts []ConnectionManagerHost
connections map[ouroboros.ConnectionId]*ConnectionManagerConnection
connections map[ouroboros.ConnectionId]*ouroboros.Connection
connectionsMutex sync.Mutex
}

Expand All @@ -80,12 +42,6 @@ type ConnectionManagerConfig struct {
OutboundSourcePort uint
}

type ConnectionManagerHost struct {
Address string
Port uint
Tags map[ConnectionManagerTag]bool
}

func NewConnectionManager(cfg ConnectionManagerConfig) *ConnectionManager {
if cfg.Logger == nil {
cfg.Logger = slog.New(slog.NewJSONHandler(io.Discard, nil))
Expand All @@ -94,7 +50,7 @@ func NewConnectionManager(cfg ConnectionManagerConfig) *ConnectionManager {
return &ConnectionManager{
config: cfg,
connections: make(
map[ouroboros.ConnectionId]*ConnectionManagerConnection,
map[ouroboros.ConnectionId]*ouroboros.Connection,
),
}
}
Expand All @@ -106,75 +62,32 @@ func (c *ConnectionManager) Start() error {
return nil
}

func (c *ConnectionManager) AddHost(
address string,
port uint,
tags ...ConnectionManagerTag,
) {
tmpTags := map[ConnectionManagerTag]bool{}
for _, tag := range tags {
tmpTags[tag] = true
}
cmHost := ConnectionManagerHost{
Address: address,
Port: port,
Tags: tmpTags,
}

c.config.Logger.Debug(
fmt.Sprintf(
"connmanager: adding host: %+v",
cmHost,
),
)

c.hosts = append(
c.hosts,
cmHost,
)
}

func (c *ConnectionManager) AddHostsFromTopology(
topologyConfig *topology.TopologyConfig,
) {
for _, bootstrapPeer := range topologyConfig.BootstrapPeers {
c.AddHost(
bootstrapPeer.Address,
bootstrapPeer.Port,
ConnectionManagerTagHostBootstrapPeer,
)
}
for _, localRoot := range topologyConfig.LocalRoots {
for _, host := range localRoot.AccessPoints {
c.AddHost(
host.Address,
host.Port,
ConnectionManagerTagHostLocalRoot,
)
}
}
for _, publicRoot := range topologyConfig.PublicRoots {
for _, host := range publicRoot.AccessPoints {
c.AddHost(
host.Address,
host.Port,
ConnectionManagerTagHostPublicRoot,
)
}
}
}

func (c *ConnectionManager) AddConnection(conn *ouroboros.Connection) {
connId := conn.Id()
c.connectionsMutex.Lock()
c.connections[connId] = &ConnectionManagerConnection{
Conn: conn,
}
c.connections[connId] = conn
c.connectionsMutex.Unlock()
go func() {
err := <-conn.ErrorChan()
// Remove connection
c.RemoveConnection(connId)
// Generate event
if c.config.EventBus != nil {
c.config.EventBus.Publish(
ConnectionClosedEventType,
event.NewEvent(
ConnectionClosedEventType,
ConnectionClosedEvent{
ConnectionId: connId,
Error: err,
},
),
)
}
// Call configured connection closed callback func
c.config.ConnClosedFunc(connId, err)
if c.config.ConnClosedFunc != nil {
c.config.ConnClosedFunc(connId, err)
}
}()
}

Expand All @@ -186,46 +99,8 @@ func (c *ConnectionManager) RemoveConnection(connId ouroboros.ConnectionId) {

func (c *ConnectionManager) GetConnectionById(
connId ouroboros.ConnectionId,
) *ConnectionManagerConnection {
) *ouroboros.Connection {
c.connectionsMutex.Lock()
defer c.connectionsMutex.Unlock()
return c.connections[connId]
}

func (c *ConnectionManager) GetConnectionsByTags(
tags ...ConnectionManagerTag,
) []*ConnectionManagerConnection {
var ret []*ConnectionManagerConnection
c.connectionsMutex.Lock()
for _, conn := range c.connections {
skipConn := false
for _, tag := range tags {
if _, ok := conn.Tags[tag]; !ok {
skipConn = true
break
}
}
if !skipConn {
ret = append(ret, conn)
}
}
c.connectionsMutex.Unlock()
return ret
}

type ConnectionManagerConnection struct {
Conn *ouroboros.Connection
Tags map[ConnectionManagerTag]bool
}

func (c *ConnectionManagerConnection) AddTags(tags ...ConnectionManagerTag) {
for _, tag := range tags {
c.Tags[tag] = true
}
}

func (c *ConnectionManagerConnection) RemoveTags(tags ...ConnectionManagerTag) {
for _, tag := range tags {
delete(c.Tags, tag)
}
}
11 changes: 8 additions & 3 deletions connmanager/connection_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"go.uber.org/goleak"
)

/*
func TestConnectionManagerTagString(t *testing.T) {
testDefs := map[connmanager.ConnectionManagerTag]string{
connmanager.ConnectionManagerTagHostP2PLedger: "HostP2PLedger",
Expand All @@ -47,6 +48,7 @@ func TestConnectionManagerTagString(t *testing.T) {
}
}
}
*/

func TestConnectionManagerConnError(t *testing.T) {
defer goleak.VerifyNone(t)
Expand Down Expand Up @@ -77,6 +79,7 @@ func TestConnectionManagerConnError(t *testing.T) {
},
)
testIdx := 2
var connIds []ouroboros.ConnectionId
for i := 0; i < 3; i++ {
mockConversation := ouroboros_mock.ConversationKeepAlive
if i == testIdx {
Expand Down Expand Up @@ -106,13 +109,15 @@ func TestConnectionManagerConnError(t *testing.T) {
expectedConnId = oConn.Id()
}
connManager.AddConnection(oConn)
connIds = append(connIds, oConn.Id())
}
select {
case <-doneChan:
// Shutdown other connections
for _, tmpConn := range connManager.GetConnectionsByTags() {
if tmpConn.Conn.Id() != expectedConnId {
tmpConn.Conn.Close()
for _, connId := range connIds {
if connId != expectedConnId {
tmpConn := connManager.GetConnectionById(connId)
tmpConn.Close()
}
}
// TODO: actually wait for shutdown
Expand Down
6 changes: 6 additions & 0 deletions connmanager/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,16 @@ import (

const (
InboundConnectionEventType = "connmanager.inbound-conn"
ConnectionClosedEventType = "connmanager.conn-closed"
)

type InboundConnectionEvent struct {
ConnectionId ouroboros.ConnectionId
LocalAddr net.Addr
RemoteAddr net.Addr
}

type ConnectionClosedEvent struct {
ConnectionId ouroboros.ConnectionId
Error error
}
Loading

0 comments on commit 4aadb1c

Please sign in to comment.