Skip to content
This repository has been archived by the owner on Jan 21, 2022. It is now read-only.

Commit

Permalink
v0.8.0 featuring BEP 51 "DHT Infohash Indexing"
Browse files Browse the repository at this point in the history
  • Loading branch information
boramalper committed May 18, 2019
1 parent aae6709 commit 1fdfa13
Show file tree
Hide file tree
Showing 10 changed files with 133 additions and 368 deletions.
19 changes: 2 additions & 17 deletions cmd/magneticod/bittorrent/metadata/leech.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,10 +234,11 @@ func (l *Leech) readUmMessage() ([]byte, error) {
func (l *Leech) connect(deadline time.Time) error {
var err error

l.conn, err = net.DialTCP("tcp4", nil, l.peerAddr)
x, err := net.DialTimeout("tcp4", l.peerAddr.String(), 1*time.Second)
if err != nil {
return errors.Wrap(err, "dial")
}
l.conn = x.(*net.TCPConn)

// > If sec == 0, operating system discards any unsent or unacknowledged data [after Close()
// > has been called].
Expand All @@ -249,22 +250,6 @@ func (l *Leech) connect(deadline time.Time) error {
return errors.Wrap(err, "SetLinger")
}

err = l.conn.SetKeepAlive(true)
if err != nil {
if err := l.conn.Close(); err != nil {
zap.L().Panic("couldn't close leech connection!", zap.Error(err))
}
return errors.Wrap(err, "SetKeepAlive")
}

err = l.conn.SetKeepAlivePeriod(10 * time.Second)
if err != nil {
if err := l.conn.Close(); err != nil {
zap.L().Panic("couldn't close leech connection!", zap.Error(err))
}
return errors.Wrap(err, "SetKeepAlivePeriod")
}

err = l.conn.SetNoDelay(true)
if err != nil {
if err := l.conn.Close(); err != nil {
Expand Down
85 changes: 56 additions & 29 deletions cmd/magneticod/bittorrent/metadata/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package metadata

import (
"math/rand"
"net"
"sync"
"time"

Expand All @@ -24,14 +25,18 @@ type Metadata struct {
}

// Sink coordinates metadata-fetching leeches: DHT indexing results are fed in
// via Sink(), and successfully fetched torrent metadata is consumed via Drain().
//
// NOTE(review): this span was a stripped unified diff with old and new field
// sets interleaved (every field appeared twice); reconstructed here as the
// post-commit version of the struct.
type Sink struct {
	PeerID      []byte        // 20-byte peer ID advertised to remote peers
	deadline    time.Duration // per-leech deadline
	maxNLeeches int           // limit on concurrent leeches — enforcement not visible in this chunk; confirm in Sink()
	drain       chan Metadata // buffered channel of fetched metadata, read via Drain()

	// incomingInfoHashes maps each in-flight infohash to the remaining
	// (not-yet-tried) peer addresses for it. Guarded by incomingInfoHashesMx.
	incomingInfoHashes   map[[20]byte][]net.TCPAddr
	incomingInfoHashesMx sync.Mutex

	terminated  bool
	termination chan interface{}

	// deleted counts infohashes abandoned (all peers failed) since the last
	// periodic status log; reset by the logging goroutine in NewSink.
	deleted int
}

func randomID() []byte {
Expand All @@ -52,7 +57,7 @@ func randomID() []byte {
* - Last two digits for the minor version number
* - Patch version number is not encoded.
*/
prefix := []byte("-MC0007-")
prefix := []byte("-MC0008-")

var rando []byte
for i := 20 - len(prefix); i >= 0; i-- {
Expand All @@ -74,17 +79,28 @@ func NewSink(deadline time.Duration, maxNLeeches int) *Sink {
ms.PeerID = randomID()
ms.deadline = deadline
ms.maxNLeeches = maxNLeeches
ms.drain = make(chan Metadata)
ms.incomingInfoHashes = make(map[[20]byte]struct{})
ms.drain = make(chan Metadata, 10)
ms.incomingInfoHashes = make(map[[20]byte][]net.TCPAddr)
ms.termination = make(chan interface{})

go func() {
for range time.Tick(deadline) {
ms.incomingInfoHashesMx.Lock()
l := len(ms.incomingInfoHashes)
ms.incomingInfoHashesMx.Unlock()
zap.L().Info("Sink status",
zap.Int("activeLeeches", l),
zap.Int("nDeleted", ms.deleted),
zap.Int("drainQueue", len(ms.drain)),
)
ms.deleted = 0
}
}()

return ms
}

func (ms *Sink) Sink(res dht.Result) {
infoHash := res.InfoHash()
peerAddr := res.PeerAddr()

if ms.terminated {
zap.L().Panic("Trying to Sink() an already closed Sink!")
}
Expand All @@ -96,23 +112,22 @@ func (ms *Sink) Sink(res dht.Result) {
return
}

infoHash := res.InfoHash()
peerAddrs := res.PeerAddrs()

if _, exists := ms.incomingInfoHashes[infoHash]; exists {
return
} else if len(peerAddrs) > 0 {
peer := peerAddrs[0]
ms.incomingInfoHashes[infoHash] = peerAddrs[1:]

go NewLeech(infoHash, &peer, ms.PeerID, LeechEventHandlers{
OnSuccess: ms.flush,
OnError: ms.onLeechError,
}).Do(time.Now().Add(ms.deadline))
}
// BEWARE!
// Although not crucial, the assumption is that Sink.Sink() will be called by only one
// goroutine (i.e. it's not thread-safe), lest there might be a race condition between where we
// check whether res.infoHash exists in the ms.incomingInfoHashes, and where we add the infoHash
// to the incomingInfoHashes at the end of this function.

zap.L().Debug("Sunk!", zap.Int("leeches", len(ms.incomingInfoHashes)), util.HexField("infoHash", infoHash[:]))

go NewLeech(infoHash, peerAddr, ms.PeerID, LeechEventHandlers{
OnSuccess: ms.flush,
OnError: ms.onLeechError,
}).Do(time.Now().Add(ms.deadline))

ms.incomingInfoHashes[infoHash] = struct{}{}
}

func (ms *Sink) Drain() <-chan Metadata {
Expand All @@ -136,17 +151,29 @@ func (ms *Sink) flush(result Metadata) {
ms.drain <- result
// Delete the infoHash from ms.incomingInfoHashes ONLY AFTER once we've flushed the
// metadata!
ms.incomingInfoHashesMx.Lock()
defer ms.incomingInfoHashesMx.Unlock()

var infoHash [20]byte
copy(infoHash[:], result.InfoHash)
ms.incomingInfoHashesMx.Lock()
delete(ms.incomingInfoHashes, infoHash)
ms.incomingInfoHashesMx.Unlock()
}

// onLeechError handles a failed leech attempt for infoHash: if other peer
// addresses are still queued for this infohash, it retries with the next one;
// otherwise it gives up, bumps the deleted counter, and forgets the infohash.
//
// NOTE(review): this span was a stripped unified diff; the old
// Lock/delete/Unlock sequence was fused with the new Lock/defer-Unlock retry
// logic (which would double-unlock). Reconstructed as the post-commit version.
func (ms *Sink) onLeechError(infoHash [20]byte, err error) {
	zap.L().Debug("leech error", util.HexField("infoHash", infoHash[:]), zap.Error(err))

	ms.incomingInfoHashesMx.Lock()
	defer ms.incomingInfoHashesMx.Unlock()

	if len(ms.incomingInfoHashes[infoHash]) > 0 {
		// Pop the next known peer for this infohash and retry with it.
		peer := ms.incomingInfoHashes[infoHash][0]
		ms.incomingInfoHashes[infoHash] = ms.incomingInfoHashes[infoHash][1:]
		go NewLeech(infoHash, &peer, ms.PeerID, LeechEventHandlers{
			OnSuccess: ms.flush,
			OnError:   ms.onLeechError,
		}).Do(time.Now().Add(ms.deadline))
	} else {
		// No peers left to try — count the abandonment for the status log.
		ms.deleted++
		delete(ms.incomingInfoHashes, infoHash)
	}
}
32 changes: 17 additions & 15 deletions cmd/magneticod/dht/mainline/indexingService.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,16 +34,16 @@ type IndexingServiceEventHandlers struct {
}

type IndexingResult struct {
infoHash [20]byte
peerAddr *net.TCPAddr
infoHash [20]byte
peerAddrs []net.TCPAddr
}

// InfoHash returns the 20-byte BitTorrent infohash of the indexed torrent.
func (ir IndexingResult) InfoHash() [20]byte {
	return ir.infoHash
}

func (ir IndexingResult) PeerAddr() *net.TCPAddr {
return ir.peerAddr
func (ir IndexingResult) PeerAddrs() []net.TCPAddr {
return ir.peerAddrs
}

func NewIndexingService(laddr string, interval time.Duration, eventHandlers IndexingServiceEventHandlers) *IndexingService {
Expand Down Expand Up @@ -86,11 +86,6 @@ func (is *IndexingService) Terminate() {

func (is *IndexingService) index() {
for range time.Tick(is.interval) {
// TODO
// For some reason, we can't still detect congestion and this keeps increasing...
// Disable for now.
// s.maxNeighbors = uint(float32(s.maxNeighbors) * 1.001)

is.routingTableMutex.Lock()
if len(is.routingTable) == 0 {
is.bootstrap()
Expand Down Expand Up @@ -190,15 +185,22 @@ func (is *IndexingService) onGetPeersResponse(msg *Message, addr *net.UDPAddr) {
return
}

peerAddrs := make([]net.TCPAddr, 0)
for _, peer := range msg.R.Values {
is.eventHandlers.OnResult(IndexingResult{
infoHash: infoHash,
peerAddr: &net.TCPAddr{
IP: peer.IP,
Port: peer.Port,
},
if peer.Port == 0 {
continue
}

peerAddrs = append(peerAddrs, net.TCPAddr{
IP: peer.IP,
Port: peer.Port,
})
}

is.eventHandlers.OnResult(IndexingResult{
infoHash: infoHash,
peerAddrs: peerAddrs,
})
}

func (is *IndexingService) onSampleInfohashesResponse(msg *Message, addr *net.UDPAddr) {
Expand Down
4 changes: 2 additions & 2 deletions cmd/magneticod/dht/mainline/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,8 +225,8 @@ func NewSampleInfohashesQuery(id []byte, t []byte, target []byte) *Message {
Y: "q",
T: t,
Q: "sample_infohashes",
A: QueryArguments {
ID: id,
A: QueryArguments{
ID: id,
Target: target,
},
}
Expand Down
5 changes: 0 additions & 5 deletions cmd/magneticod/dht/mainline/service.go

This file was deleted.

5 changes: 2 additions & 3 deletions cmd/magneticod/dht/mainline/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,12 +94,11 @@ func (t *Transport) readMessages() {
zap.L().Warn("READ CONGESTION!", zap.Error(err))
t.onCongestion()
} else if err != nil {
// TODO: isn't there a more reliable way to detect if UDPConn is closed?
zap.L().Warn("Could NOT read an UDP packet!", zap.Error(err))
}

if n == 0 {
/* Datagram sockets in various domains (e.g., the UNIX and Internet domains) permit
/* Datagram sockets in various domains (e.g., the UNIX and Internet domains) permit
* zero-length datagrams. When such a datagram is received, the return value (n) is 0.
*/
continue
Expand Down Expand Up @@ -150,7 +149,7 @@ func (t *Transport) WriteMessages(msg *Message, addr *net.UDPAddr) {
*
* Source: https://docs.python.org/3/library/asyncio-protocol.html#flow-control-callbacks
*/
//zap.L().Warn("WRITE CONGESTION!", zap.Error(err))
zap.L().Warn("WRITE CONGESTION!", zap.Error(err))
if t.onCongestion != nil {
t.onCongestion()
}
Expand Down
Loading

0 comments on commit 1fdfa13

Please sign in to comment.