Skip to content

Commit

Permalink
devp2p: upgrade to v5 (EIP-706) (#760)
Browse files Browse the repository at this point in the history
* drop support for v4 (obsolete, doesn't work with all clients since
they use chunking and other obsolete v4 features that we're missing or
don't support it at all)
* rework asyncraises
* always store generated p2p macro code (similar to eth2)
* preparation for chronos cancellation support (more to be done here)
* when peer is disconnected, ensure pending handshakes and requests are
notified (instead of waiting for timeout)
* disallow raising from `onPeerDisconnected` - this simplifies
disconnection coordination among async tasks
* introduce several warning logs for protocol breaches - these should be
removed eventually, pending q/a on the rlpx layer in general
* fix snappy compression - the payload without msgId should be
compressed
* remove strict checks on unused fields in RLPx message header (this
matches geth behavior and the spirit of EIP-8)
* add snappy dep
  • Loading branch information
arnetheduck authored Nov 8, 2024
1 parent 034b788 commit 88e4be4
Show file tree
Hide file tree
Showing 8 changed files with 638 additions and 580 deletions.
3 changes: 2 additions & 1 deletion eth.nimble
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ requires "nim >= 1.6.0",
"testutils",
"unittest2",
"results",
"minilru"
"minilru",
"snappy"

let nimc = getEnv("NIMC", "nim") # Which nim compiler to use
let lang = getEnv("NIMLANG", "c") # Which backend (c/cpp/js)
Expand Down
6 changes: 0 additions & 6 deletions eth/p2p.nim
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,6 @@ proc newEthereumNode*(
networkId: NetworkId,
clientId = "nim-eth-p2p",
addAllCapabilities = true,
useCompression: bool = false,
minPeers = 10,
bootstrapNodes: seq[ENode] = @[],
bindUdpPort: Port,
Expand All @@ -105,11 +104,6 @@ proc newEthereumNode*(
keys.seckey, address, bootstrapNodes, bindUdpPort, bindIp, rng)

result.rng = rng

when useSnappy:
result.protocolVersion = if useCompression: devp2pSnappyVersion
else: devp2pVersion

result.protocolStates.newSeq protocolCount()

result.peerPool = newPeerPool(
Expand Down
87 changes: 35 additions & 52 deletions eth/p2p/p2p_protocol_dsl.nim
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ type
msgResponse

Message* = ref object
id*: Opt[uint64]
id*: uint64
ident*: NimNode
kind*: MessageKind
procDef*: NimNode
Expand Down Expand Up @@ -351,15 +351,17 @@ proc init*(T: type P2PProtocol, backendFactory: BackendFactory,
if not result.backend.afterProtocolInit.isNil:
result.backend.afterProtocolInit(result)

proc augmentUserHandler(p: P2PProtocol, userHandlerProc: NimNode, msgId = Opt.none(uint64)) =
proc augmentUserHandler(p: P2PProtocol, userHandlerProc: NimNode, canRaise: bool, msgId = Opt.none(uint64)) =
## This procs adds a set of common helpers available in all messages handlers
## (e.g. `perProtocolMsgId`, `peer.state`, etc).

userHandlerProc.addPragma ident"gcsafe"

# we only take the pragma
let dummy = quote do:
proc dummy(): Future[void] {.async: (raises: [EthP2PError]).}
let dummy = if canRaise:
quote do:
proc dummy(): Future[void] {.async: (raises: [CancelledError, EthP2PError]).}
else:
quote do:
proc dummy(): Future[void] {.async: (raises: []).}

if p.isRlpx:
userHandlerProc.addPragma dummy.pragma[0]
Expand Down Expand Up @@ -402,27 +404,16 @@ proc augmentUserHandler(p: P2PProtocol, userHandlerProc: NimNode, msgId = Opt.no
template networkState(`peerVar`: `PeerType`): `NetworkStateType` {.used.} =
`NetworkStateType`(`getNetworkState`(`peerVar`.network, `protocolInfo`))

proc addExceptionHandler(userHandlerProc: NimNode) =
let bodyTemp = userHandlerProc.body
userHandlerProc.body = quote do:
try:
`bodyTemp`
except CancelledError as exc:
raise newException(EthP2PError, exc.msg)
except CatchableError as exc:
raise newException(EthP2PError, exc.msg)

proc addPreludeDefs(userHandlerProc: NimNode, definitions: NimNode) =
userHandlerProc.body[0].add definitions

proc eventHandlerToProc(p: P2PProtocol, doBlock: NimNode, handlerName: string): NimNode =
proc eventHandlerToProc(p: P2PProtocol, doBlock: NimNode, handlerName: string, canRaise: bool): NimNode =
## Turns a "named" do block to a regular async proc
## (e.g. onPeerConnected do ...)
result = newTree(nnkProcDef)
doBlock.copyChildrenTo(result)
result.name = ident(p.name & handlerName) # genSym(nskProc, p.name & handlerName)
p.augmentUserHandler result
result.addExceptionHandler()
p.augmentUserHandler result, canRaise

proc addTimeoutParam(procDef: NimNode, defaultValue: int64) =
var
Expand Down Expand Up @@ -477,7 +468,7 @@ proc newMsg(protocol: P2PProtocol, kind: MessageKind, msgId: uint64,
recBody = newTree(nnkDistinctTy, recName)

result = Message(protocol: protocol,
id: Opt.some(msgId),
id: msgId,
ident: msgIdent,
kind: kind,
procDef: procDef,
Expand All @@ -489,7 +480,7 @@ proc newMsg(protocol: P2PProtocol, kind: MessageKind, msgId: uint64,
if procDef.body.kind != nnkEmpty:
var userHandler = copy procDef

protocol.augmentUserHandler userHandler, Opt.some(msgId)
protocol.augmentUserHandler userHandler, true, Opt.some(msgId)
userHandler.name = ident(msgName & "UserHandler")

# Request and Response handlers get an extra `reqId` parameter if the
Expand Down Expand Up @@ -518,7 +509,6 @@ proc newMsg(protocol: P2PProtocol, kind: MessageKind, msgId: uint64,
of msgResponse: userHandler.applyDecorator protocol.incomingResponseDecorator
else: discard

userHandler.addExceptionHandler()
result.userHandler = userHandler
protocol.outRecvProcs.add result.userHandler

Expand All @@ -543,7 +533,7 @@ proc addMsg(p: P2PProtocol, msgId: uint64, procDef: NimNode) =
let
responseIdent = ident($procDef.name & "Response")
response = Message(protocol: p,
id: Opt.none(uint64),
id: msgId,
ident: responseIdent,
kind: msgResponse,
recName: returnType,
Expand Down Expand Up @@ -589,7 +579,10 @@ proc createSendProc*(msg: Message,
name = if nameSuffix.len == 0: msg.identWithExportMarker
else: ident($msg.ident & nameSuffix)

pragmas = if procType == nnkProcDef: newTree(nnkPragma, ident"gcsafe")
dummy = quote do:
proc dummy(): Future[void] {.async: (raises: [CancelledError, EthP2PError], raw: true).}

pragmas = if procType == nnkProcDef: dummy.pragma
else: newEmptyNode()

var def = newNimNode(procType).add(
Expand Down Expand Up @@ -641,7 +634,7 @@ proc createSendProc*(msg: Message,
of msgNotification:
discard

def[3][0] = if procType == nnkMacroDef:
def[3][0] = if procType in [nnkMacroDef, nnkTemplateDef]:
ident "untyped"
elif msg.kind == msgRequest and not isRawSender:
Fut(msg.requestResultType)
Expand Down Expand Up @@ -751,9 +744,9 @@ proc netInit*(p: P2PProtocol): NimNode =
p.backend.NetworkType,
p.NetworkStateType)

proc createHandshakeTemplate*(msg: Message,
rawSendProc, handshakeImpl,
nextMsg: NimNode): SendProc =
proc createHandshakeTemplate*(
msg: Message, rawSendProc, handshakeImpl, nextMsg: NimNode
): SendProc =
let
handshakeExchanger = msg.createSendProc(procType = nnkTemplateDef)
forwardCall = newCall(rawSendProc).appendAllInputParams(handshakeExchanger.def)
Expand All @@ -763,19 +756,13 @@ proc createHandshakeTemplate*(msg: Message,
forwardCall[1] = peerVar
forwardCall.del(forwardCall.len - 1)

let peerVar = genSym(nskLet ,"peer")
let peerVar = genSym(nskLet, "peer")
handshakeExchanger.setBody quote do:
try:
let `peerVar` = `peerValue`
let sendingFuture = `forwardCall`
`handshakeImpl`(`peerVar`,
sendingFuture,
`nextMsg`(`peerVar`, `msgRecName`),
`timeoutVar`)
except PeerDisconnected as exc:
raise newException(EthP2PError, exc.msg)
except P2PInternalError as exc:
raise newException(EthP2PError, exc.msg)
let `peerVar` = `peerValue`
let sendingFuture = `forwardCall`
`handshakeImpl`[`msgRecName`](
`peerVar`, sendingFuture, `nextMsg`(`peerVar`, `msgRecName`), `timeoutVar`
)

return handshakeExchanger

Expand Down Expand Up @@ -844,10 +831,10 @@ proc processProtocolBody*(p: P2PProtocol, protocolBody: NimNode) =
inc nextId

elif eqIdent(n[0], "onPeerConnected"):
p.onPeerConnected = p.eventHandlerToProc(n[1], "PeerConnected")
p.onPeerConnected = p.eventHandlerToProc(n[1], "PeerConnected", true)

elif eqIdent(n[0], "onPeerDisconnected"):
p.onPeerDisconnected = p.eventHandlerToProc(n[1], "PeerDisconnected")
p.onPeerDisconnected = p.eventHandlerToProc(n[1], "PeerDisconnected", false)

else:
error(repr(n) & " is not a recognized call in P2P protocol definitions", n)
Expand Down Expand Up @@ -894,11 +881,8 @@ proc genTypeSection*(p: P2PProtocol): NimNode =
if msg.procDef == nil:
continue

# FIXME: Can `msg.id` be missing, at all?
doAssert msg.id.isSome()

let
msgId = msg.id.value
msgId = msg.id
msgName = msg.ident
msgRecName = msg.recName
msgStrongRecName = msg.strongRecName
Expand Down Expand Up @@ -983,12 +967,11 @@ macro emitForSingleBackend(

result = p.genCode()

when defined(p2pProtocolDebug):
try:
result.storeMacroResult true
except IOError:
# IO error so the generated nim code might not be stored, don't sweat it.
discard
try:
result.storeMacroResult true
except IOError:
# IO error so the generated nim code might not be stored, don't sweat it.
discard

macro emitForAllBackends(backendSyms: typed, options: untyped, body: untyped): untyped =
let name = $(options[0])
Expand Down
32 changes: 15 additions & 17 deletions eth/p2p/private/p2p_types.nim
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,7 @@ import
".."/../[rlp], ../../common/[base, keys],
".."/[enode, kademlia, discovery, rlpxtransport]

export base.NetworkId, rlpxtransport

const
useSnappy* = defined(useSnappy)
export base.NetworkId, rlpxtransport, kademlia

type
EthereumNode* = ref object
Expand All @@ -39,8 +36,6 @@ type
listeningServer*: StreamServer
protocolStates*: seq[RootRef]
discovery*: DiscoveryProtocol
when useSnappy:
protocolVersion*: uint64
rng*: ref HmacDrbgContext

Peer* = ref object
Expand All @@ -55,8 +50,7 @@ type
protocolStates*: seq[RootRef]
outstandingRequests*: seq[Deque[OutstandingRequest]] # per `msgId` table
awaitedMessages*: seq[FutureBase] # per `msgId` table
when useSnappy:
snappyEnabled*: bool
snappyEnabled*: bool
clientId*: string

SeenNode* = object
Expand Down Expand Up @@ -119,8 +113,9 @@ type
# Private fields:
peerStateInitializer*: PeerStateInitializer
networkStateInitializer*: NetworkStateInitializer
handshake*: HandshakeStep
disconnectHandler*: DisconnectionHandler

onPeerConnected*: OnPeerConnectedHandler
onPeerDisconnected*: OnPeerDisconnectedHandler

MessageInfo* = ref object
id*: uint64 # this is a `msgId` (as opposed to a `reqId`)
Expand All @@ -131,6 +126,7 @@ type
printer*: MessageContentPrinter
requestResolver*: RequestResolver
nextMsgResolver*: NextMsgResolver
failResolver*: FailResolver

Dispatcher* = ref object # private
# The dispatcher stores the mapping of negotiated message IDs between
Expand All @@ -156,13 +152,12 @@ type
OutstandingRequest* = object
id*: uint64 # a `reqId` that may be used for response
future*: FutureBase
timeoutAt*: Moment

# Private types:
MessageHandlerDecorator* = proc(msgId: uint64, n: NimNode): NimNode

ThunkProc* = proc(x: Peer, msgId: uint64, data: Rlp): Future[void]
{.gcsafe, async: (raises: [RlpError, EthP2PError]).}
ThunkProc* = proc(x: Peer, data: Rlp): Future[void]
{.async: (raises: [CancelledError, EthP2PError]).}

MessageContentPrinter* = proc(msg: pointer): string
{.gcsafe, raises: [].}
Expand All @@ -173,17 +168,20 @@ type
NextMsgResolver* = proc(msgData: Rlp, future: FutureBase)
{.gcsafe, raises: [RlpError].}

FailResolver* = proc(reason: DisconnectionReason, future: FutureBase)
{.gcsafe, raises: [].}

PeerStateInitializer* = proc(peer: Peer): RootRef
{.gcsafe, raises: [].}

NetworkStateInitializer* = proc(network: EthereumNode): RootRef
{.gcsafe, raises: [].}

HandshakeStep* = proc(peer: Peer): Future[void]
{.gcsafe, async: (raises: [EthP2PError]).}
OnPeerConnectedHandler* = proc(peer: Peer): Future[void]
{.async: (raises: [CancelledError, EthP2PError]).}

DisconnectionHandler* = proc(peer: Peer, reason: DisconnectionReason):
Future[void] {.gcsafe, async: (raises: [EthP2PError]).}
OnPeerDisconnectedHandler* = proc(peer: Peer, reason: DisconnectionReason):
Future[void] {.async: (raises: []).}

ConnectionState* = enum
None,
Expand Down
Loading

0 comments on commit 88e4be4

Please sign in to comment.