Skip to content

Commit

Permalink
zmq4: add multi-part Send API
Browse files Browse the repository at this point in the history
benchstat between Send and SendMulti:
name               old time/op  new time/op  delta
PubSubSendMulti-8   26.9s ± 0%    7.2s ± 0%   ~     (p=1.000 n=1+1)
  • Loading branch information
Inphi authored and sbinet committed Dec 7, 2019
1 parent 33d608a commit 78ce94b
Show file tree
Hide file tree
Showing 17 changed files with 251 additions and 8 deletions.
52 changes: 52 additions & 0 deletions conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,10 @@ func (c *Conn) SendMsg(msg Msg) error {
if c.Closed() {
return ErrClosedConn
}
if msg.multipart {
return c.sendMulti(msg)
}

nframes := len(msg.Frames)
for i, frame := range msg.Frames {
var flag byte
Expand Down Expand Up @@ -276,6 +280,54 @@ func (c *Conn) RecvCmd() (Cmd, error) {
return cmd, nil
}

func (c *Conn) sendMulti(msg Msg) error {
var buffers net.Buffers

nframes := len(msg.Frames)
for i, frame := range msg.Frames {
var flag byte
if i < nframes-1 {
flag ^= hasMoreBitFlag
}

size := len(frame)
isLong := size > 255
if isLong {
flag ^= isLongBitFlag
}

var (
hdr = [8 + 1]byte{flag}
hsz int
)
if isLong {
hsz = 9
binary.BigEndian.PutUint64(hdr[1:], uint64(size))
} else {
hsz = 2
hdr[1] = uint8(size)
}

switch c.sec.Type() {
case NullSecurity:
buffers = append(buffers, hdr[:hsz], frame)
default:
var secBuf bytes.Buffer
if _, err := c.sec.Encrypt(&secBuf, frame); err != nil {
return err
}
buffers = append(buffers, hdr[:hsz], secBuf.Bytes())
}
}

if _, err := buffers.WriteTo(c.rw); err != nil {
c.checkIO(err)
return err
}

return nil
}

func (c *Conn) send(isCommand bool, body []byte, flag byte) error {
// Long flag
size := len(body)
Expand Down
7 changes: 7 additions & 0 deletions cxx_zmq4_compat.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,13 @@ func (sck *csocket) Send(msg Msg) error {
return sck.sock.SendMessage(msg.Frames)
}

// SendMulti puts the message on the outbound send queue.
// SendMulti blocks until the message can be queued or the send deadline expires.
// The message will be sent as a multipart message.
func (sck *csocket) SendMulti(msg Msg) error {
return sck.sock.SendMessage(msg.Frames)
}

// Recv receives a complete message.
func (sck *csocket) Recv() (Msg, error) {
frames, err := sck.sock.RecvMessage()
Expand Down
7 changes: 7 additions & 0 deletions dealer.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,13 @@ func (dealer *dealerSocket) Send(msg Msg) error {
return dealer.sck.Send(msg)
}

// SendMulti puts the message on the outbound send queue.
// SendMulti blocks until the message can be queued or the send deadline expires.
// The message will be sent as a multipart message.
func (dealer *dealerSocket) SendMulti(msg Msg) error {
return dealer.sck.SendMulti(msg)
}

// Recv receives a complete message.
func (dealer *dealerSocket) Recv() (Msg, error) {
return dealer.sck.Recv()
Expand Down
7 changes: 4 additions & 3 deletions msg.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,10 @@ const (

// Msg is a ZMTP message, possibly composed of multiple frames.
type Msg struct {
Frames [][]byte
Type MsgType
err error
Frames [][]byte
Type MsgType
multipart bool
err error
}

func NewMsg(frame []byte) Msg {
Expand Down
7 changes: 7 additions & 0 deletions pair.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,13 @@ func (pair *pairSocket) Send(msg Msg) error {
return pair.sck.Send(msg)
}

// SendMulti puts the message on the outbound send queue.
// SendMulti blocks until the message can be queued or the send deadline expires.
// The message will be sent as a multipart message.
func (pair *pairSocket) SendMulti(msg Msg) error {
return pair.sck.SendMulti(msg)
}

// Recv receives a complete message.
func (pair *pairSocket) Recv() (Msg, error) {
return pair.sck.Recv()
Expand Down
11 changes: 11 additions & 0 deletions pub.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,16 @@ func (pub *pubSocket) Send(msg Msg) error {
return pub.sck.w.write(ctx, msg)
}

// SendMulti puts the message on the outbound send queue.
// SendMulti blocks until the message can be queued or the send deadline expires.
// The message will be sent as a multipart message.
func (pub *pubSocket) SendMulti(msg Msg) error {
msg.multipart = true
ctx, cancel := context.WithTimeout(pub.sck.ctx, pub.sck.timeout())
defer cancel()
return pub.sck.w.write(ctx, msg)
}

// Recv receives a complete message.
func (*pubSocket) Recv() (Msg, error) {
msg := Msg{err: xerrors.Errorf("zmq4: PUB sockets can't recv messages")}
Expand Down Expand Up @@ -322,6 +332,7 @@ func (w *pubMWriter) sendMsg(msg Msg) {
topic := string(msg.Frames[0])
w.mu.Lock()
defer w.mu.Unlock()
// TODO(inphi): distribute messages across subscribers at once
for i := range w.ws {
ww := w.ws[i]
if ww.subscribed(topic) {
Expand Down
7 changes: 7 additions & 0 deletions pull.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,13 @@ func (*pullSocket) Send(msg Msg) error {
return xerrors.Errorf("zmq4: PULL sockets can't send messages")
}

// SendMulti puts the message on the outbound send queue.
// SendMulti blocks until the message can be queued or the send deadline expires.
// The message will be sent as a multipart message.
func (pull *pullSocket) SendMulti(msg Msg) error {
return xerrors.Errorf("zmq4: PULL sockets can't send messages")
}

// Recv receives a complete message.
func (pull *pullSocket) Recv() (Msg, error) {
return pull.sck.Recv()
Expand Down
7 changes: 7 additions & 0 deletions push.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,13 @@ func (push *pushSocket) Send(msg Msg) error {
return push.sck.Send(msg)
}

// SendMulti puts the message on the outbound send queue.
// SendMulti blocks until the message can be queued or the send deadline expires.
// The message will be sent as a multipart message.
func (push *pushSocket) SendMulti(msg Msg) error {
return push.sck.SendMulti(msg)
}

// Recv receives a complete message.
func (*pushSocket) Recv() (Msg, error) {
return Msg{}, xerrors.Errorf("zmq4: PUSH sockets can't recv messages")
Expand Down
8 changes: 8 additions & 0 deletions rep.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,14 @@ func (rep *repSocket) Send(msg Msg) error {
return rep.sck.Send(msg)
}

// SendMulti puts the message on the outbound send queue.
// SendMulti blocks until the message can be queued or the send deadline expires.
// The message will be sent as a multipart message.
func (rep *repSocket) SendMulti(msg Msg) error {
msg.Frames = append([][]byte{nil}, msg.Frames...)
return rep.sck.SendMulti(msg)
}

// Recv receives a complete message.
func (rep *repSocket) Recv() (Msg, error) {
msg, err := rep.sck.Recv()
Expand Down
8 changes: 8 additions & 0 deletions req.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,14 @@ func (req *reqSocket) Send(msg Msg) error {
return req.sck.Send(msg)
}

// SendMulti puts the message on the outbound send queue.
// SendMulti blocks until the message can be queued or the send deadline expires.
// The message will be sent as a multipart message.
func (req *reqSocket) SendMulti(msg Msg) error {
msg.Frames = append([][]byte{nil}, msg.Frames...)
return req.sck.SendMulti(msg)
}

// Recv receives a complete message.
func (req *reqSocket) Recv() (Msg, error) {
msg, err := req.sck.Recv()
Expand Down
8 changes: 8 additions & 0 deletions router.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,14 @@ func (router *routerSocket) Send(msg Msg) error {
return router.sck.w.write(ctx, msg)
}

// SendMulti puts the message on the outbound send queue.
// SendMulti blocks until the message can be queued or the send deadline expires.
// The message will be sent as a multipart message.
func (router *routerSocket) SendMulti(msg Msg) error {
msg.multipart = true
return router.Send(msg)
}

// Recv receives a complete message.
func (router *routerSocket) Recv() (Msg, error) {
return router.sck.Recv()
Expand Down
10 changes: 10 additions & 0 deletions socket.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,16 @@ func (sck *socket) Send(msg Msg) error {
return sck.w.write(ctx, msg)
}

// SendMulti puts the message on the outbound send queue.
// SendMulti blocks until the message can be queued or the send deadline expires.
// The message will be sent as a multipart message.
func (sck *socket) SendMulti(msg Msg) error {
msg.multipart = true
ctx, cancel := context.WithTimeout(sck.ctx, sck.timeout())
defer cancel()
return sck.w.write(ctx, msg)
}

// Recv receives a complete message.
func (sck *socket) Recv() (Msg, error) {
ctx, cancel := context.WithCancel(sck.ctx)
Expand Down
7 changes: 7 additions & 0 deletions sub.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,13 @@ func (sub *subSocket) Send(msg Msg) error {
return sub.sck.Send(msg)
}

// SendMulti puts the message on the outbound send queue.
// SendMulti blocks until the message can be queued or the send deadline expires.
// The message will be sent as a multipart message.
func (sub *subSocket) SendMulti(msg Msg) error {
return sub.sck.SendMulti(msg)
}

// Recv receives a complete message.
func (sub *subSocket) Recv() (Msg, error) {
return sub.sck.Recv()
Expand Down
7 changes: 7 additions & 0 deletions xpub.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,13 @@ func (xpub *xpubSocket) Send(msg Msg) error {
return xpub.sck.Send(msg)
}

// SendMulti puts the message on the outbound send queue.
// SendMulti blocks until the message can be queued or the send deadline expires.
// The message will be sent as a multipart message.
func (xpub *xpubSocket) SendMulti(msg Msg) error {
return xpub.sck.SendMulti(msg)
}

// Recv receives a complete message.
func (xpub *xpubSocket) Recv() (Msg, error) {
return xpub.sck.Recv()
Expand Down
7 changes: 7 additions & 0 deletions xsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,13 @@ func (xsub *xsubSocket) Send(msg Msg) error {
return xsub.sck.Send(msg)
}

// SendMulti puts the message on the outbound send queue.
// SendMulti blocks until the message can be queued or the send deadline expires.
// The message will be sent as a multipart message.
func (xsub *xsubSocket) SendMulti(msg Msg) error {
return xsub.sck.SendMulti(msg)
}

// Recv receives a complete message.
func (xsub *xsubSocket) Recv() (Msg, error) {
return xsub.sck.Recv()
Expand Down
5 changes: 5 additions & 0 deletions zmq4.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,11 @@ type Socket interface {
// Send blocks until the message can be queued or the send deadline expires.
Send(msg Msg) error

// SendMulti puts the message on the outbound send queue.
// SendMulti blocks until the message can be queued or the send deadline expires.
// The message will be sent as a multipart message.
SendMulti(msg Msg) error

// Recv receives a complete message.
Recv() (Msg, error)

Expand Down
Loading

0 comments on commit 78ce94b

Please sign in to comment.