diff --git a/network/peers.go b/network/peers.go index effe2d3..8d25f69 100644 --- a/network/peers.go +++ b/network/peers.go @@ -70,7 +70,6 @@ func (ps *peers) addPeer(key publicKey, conn net.Conn, prio uint8) (*peer, error p.monitor.peer = p p.monitor.pDelay = ps.core.config.peerTimeout // It doesn't make sense to start the ping delay any shorter than this p.writer.peer = p - p.writer.wbuf = bufio.NewWriter(p.conn) p.time = time.Now() ps.peers[p.key][p] = struct{}{} }) @@ -176,14 +175,14 @@ func (m *peerMonitor) recv(pType wirePacketType) { type peerWriter struct { phony.Inbox peer *peer - wbuf *bufio.Writer + nbuf net.Buffers seq uint64 } func (w *peerWriter) _write(bs []byte, pType wirePacketType) { w.peer.monitor.sent(pType) - // _, _ = w.peer.conn.Write(bs) - _, _ = w.wbuf.Write(bs) + w.nbuf = append(w.nbuf, bs) + w._writev() w.seq++ seq := w.seq w.Act(nil, func() { @@ -193,6 +192,15 @@ func (w *peerWriter) _write(bs []byte, pType wirePacketType) { }) } +func (w *peerWriter) _writev() { + // Keep the original beginning of the queue + orig := w.nbuf[:0] + // Try to send the packets, this will mutate nbuf + _, _ = w.nbuf.WriteTo(w.peer.conn) + // Shift everything forward so nbuf doesn't grow indefinitely + w.nbuf = append(orig[:0], w.nbuf...)[:len(w.nbuf)] +} + func (w *peerWriter) sendPacket(pType wirePacketType, data wireEncodeable) { w.Act(nil, func() { bufSize := uint64(data.size() + 1) @@ -436,7 +444,7 @@ func (p *peer) pop() { } else { p.ready = true p.writer.Act(nil, func() { - p.writer.wbuf.Flush() + p.writer._writev() }) } })