Skip to content

Commit

Permalink
Implement single connection benchmark
Browse files Browse the repository at this point in the history
Add new types of dialers, one for recording one for
replaying the traffic. Record example traffic and then
use it to benchmark frame serialization and deserialization
without using real net.Conn.
  • Loading branch information
sylwiaszunejko committed Nov 8, 2024
1 parent f62fc6e commit f6655e7
Show file tree
Hide file tree
Showing 11 changed files with 2,130 additions and 9 deletions.
158 changes: 158 additions & 0 deletions dialer/recorder/recorder.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
package recorder

import (
"context"
"encoding/json"
"fmt"
"io"
"net"
"os"
"path"
"time"

"github.com/gocql/gocql"
"github.com/gocql/gocql/dialer"
)

func NewRecordDialer(dir string) *RecordDialer {
return &RecordDialer{
dir: dir,
}
}

type RecordDialer struct {
dir string
net.Dialer
}

func (d *RecordDialer) DialContext(ctx context.Context, network, addr string) (conn net.Conn, err error) {
fmt.Println("Dial Context Record Dialer")
sourcePort := gocql.ScyllaGetSourcePort(ctx)
fmt.Println("Source port: ", sourcePort)
dialerWithLocalAddr := d.Dialer
dialerWithLocalAddr.LocalAddr, err = net.ResolveTCPAddr(network, fmt.Sprintf(":%d", sourcePort))
if err != nil {
fmt.Println(err)
return nil, err
}

conn, err = dialerWithLocalAddr.DialContext(ctx, network, addr)
if err != nil {
return nil, err
}

return NewConnectionRecorder(path.Join(d.dir, fmt.Sprintf("%s-%d", addr, sourcePort)), conn)
}

func NewConnectionRecorder(fname string, conn net.Conn) (net.Conn, error) {
fd_writes, err := os.OpenFile(fname+"Writes", os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0666)
if err != nil {
return nil, err
}
fd_reads, err2 := os.OpenFile(fname+"Reads", os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0666)
if err2 != nil {
return nil, err2
}
return &ConnectionRecorder{fd_writes: fd_writes, fd_reads: fd_reads, orig: conn, write_record: FrameWriter{new: true}, read_record: FrameWriter{new: true}}, nil
}

type FrameWriter struct {
new bool
to_record int
record dialer.Record
}

func (f *FrameWriter) Write(b []byte, n int, file *os.File) (err error) {
if f.new {
f.to_record = -1
f.record = dialer.Record{}
}

recorded_ealier := len(f.record.Data)
f.record.Data = append(f.record.Data, b[:n]...)

if f.to_record == -1 && len(f.record.Data) >= 9 {
p := 4
stream_id := int(f.record.Data[2])
if b[0] > 0x02 {
p = 5
stream_id = int(f.record.Data[2])<<8 | int(f.record.Data[3])
}

f.to_record = p + 4 + int(f.record.Data[p+0])<<24 | int(f.record.Data[p+1])<<16 | int(f.record.Data[p+2])<<8 | int(f.record.Data[p+3]) - recorded_ealier
f.record.StreamID = stream_id
} else if f.to_record == -1 {
return err
}

f.to_record = f.to_record - n
if f.to_record <= 0 {
f.new = true
// Write JSON record to file
jsonData, marshalErr := json.Marshal(f.record)
if marshalErr != nil {
return fmt.Errorf("failed to encode JSON record: %w", marshalErr)
}
_, writeErr := file.Write(append(jsonData, '\n'))
if writeErr != nil {
return fmt.Errorf("failed to record: %w", writeErr)
}
}
return err
}

type ConnectionRecorder struct {
fd_writes *os.File
fd_reads *os.File
orig net.Conn
read_record FrameWriter
write_record FrameWriter
}

func (c *ConnectionRecorder) Read(b []byte) (n int, err error) {
n, err = c.orig.Read(b)
if err != nil && err != io.EOF {
return n, err
}

return n, c.read_record.Write(b, n, c.fd_reads)
}

func (c *ConnectionRecorder) Write(b []byte) (n int, err error) {
n, err = c.orig.Write(b)
if err != nil {
return n, err
}

return n, c.write_record.Write(b, n, c.fd_writes)
}

func (c ConnectionRecorder) Close() error {
if err := c.fd_writes.Close(); err != nil {
return fmt.Errorf("failed to close the file: %w", err)
}
if err := c.fd_reads.Close(); err != nil {
return fmt.Errorf("failed to close the file: %w", err)
}
return c.orig.Close()
}

func (c ConnectionRecorder) LocalAddr() net.Addr {
return c.orig.LocalAddr()
}

func (c ConnectionRecorder) RemoteAddr() net.Addr {
return c.orig.RemoteAddr()
}

func (c ConnectionRecorder) SetDeadline(t time.Time) error {
return c.orig.SetDeadline(t)
}

func (c ConnectionRecorder) SetReadDeadline(t time.Time) error {
return c.orig.SetReadDeadline(t)
}

func (c ConnectionRecorder) SetWriteDeadline(t time.Time) error {
return c.orig.SetWriteDeadline(t)
}
225 changes: 225 additions & 0 deletions dialer/replayer/replayer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,225 @@
package replayer

import (
"bufio"
"context"
"encoding/json"
"fmt"
"io"
"net"
"os"
"path"
"time"

"github.com/gocql/gocql"
"github.com/gocql/gocql/dialer"
)

func NewReplayDialer(dir string) *ReplayDialer {
return &ReplayDialer{
dir: dir,
}
}

type ReplayDialer struct {
dir string
net.Dialer
}

func (d *ReplayDialer) DialContext(ctx context.Context, network, addr string) (conn net.Conn, err error) {
sourcePort := gocql.ScyllaGetSourcePort(ctx)
return NewConnectionReplayer(path.Join(d.dir, fmt.Sprintf("%s-%d", addr, sourcePort)))
}

func NewConnectionReplayer(fname string) (net.Conn, error) {
frames, err := loadResponseFramesFromFiles(fname+"Reads", fname+"Writes")
if err != nil {
return nil, err
}
return &ConnectionReplayer{frames: frames, frameIdsToReplay: []int{}, streamIdsToReplay: []int{}, frameIdx: 0, frameResponsePosition: 0, gotRequest: make(chan struct{}, 1)}, nil
}

type ConnectionReplayer struct {
frames []*FrameRecorded
frameIdsToReplay []int
streamIdsToReplay []int
frameIdx int
frameResponsePosition int
gotRequest chan struct{}
closed bool
}

func (c *ConnectionReplayer) frameStreamID() int {
return c.streamIdsToReplay[c.frameIdx]
}

func (c *ConnectionReplayer) getPendingFrame() *FrameRecorded {
if c.frameIdx < 0 || c.frameIdx >= len(c.frameIdsToReplay) {
return nil
}
frameId := c.frameIdsToReplay[c.frameIdx]
if frameId < 0 || frameId >= len(c.frames) {
return nil
}
return c.frames[frameId]
}

func (c *ConnectionReplayer) pushStreamIDToReplay(b []byte, idx int) {
if b[0] > 0x02 {
c.streamIdsToReplay = append(c.streamIdsToReplay, int(b[2])<<8|int(b[3]))
} else {
c.streamIdsToReplay = append(c.streamIdsToReplay, int(b[2]))
}
c.frameIdsToReplay = append(c.frameIdsToReplay, idx)

select {
case c.gotRequest <- struct{}{}:
default:
}
}

func replaceFrameStreamID(b []byte, stream int) {
if b[0] > 0x02 {
b[2] = byte(stream >> 8)
b[3] = byte(stream)
} else {
b[2] = byte(stream)
}
}

func (c *ConnectionReplayer) Read(b []byte) (n int, err error) {
frame := c.getPendingFrame()
for frame == nil {
<-c.gotRequest
frame = c.getPendingFrame()
}
if c.Closed() {
return 0, io.EOF
}
response := frame.Response[c.frameResponsePosition:]

if len(b) < len(response) {
copy(b, response[:len(b)])
c.frameResponsePosition = c.frameResponsePosition + len(b)
return len(b), err
}

copy(b, response)
if c.frameResponsePosition == 0 {
replaceFrameStreamID(b, c.frameStreamID())
}

c.frameIdx = c.frameIdx + 1
c.frameResponsePosition = 0
return len(response), err
}

func (c *ConnectionReplayer) Write(b []byte) (n int, err error) {
writeHash := dialer.GetFrameHash(b)

for i, q := range c.frames {
if q.Hash == writeHash {
c.pushStreamIDToReplay(b, i)
return len(b), nil
}
}
panic(fmt.Errorf("unable to find a response to replay"))
}

func (c *ConnectionReplayer) Close() error {
close(c.gotRequest)
c.closed = true
return nil
}

func (c *ConnectionReplayer) Closed() bool {
return c.closed
}

type MockAddr struct {
network string
address string
}

func (m *MockAddr) Network() string {
return m.network
}

func (m *MockAddr) String() string {
return m.address
}

func (c ConnectionReplayer) LocalAddr() net.Addr {
return &MockAddr{
network: "tcp",
address: "10.0.0.1:54321",
}
}

func (c ConnectionReplayer) RemoteAddr() net.Addr {
return &MockAddr{
network: "tcp",
address: "192.168.1.100:12345",
}
}

func (c ConnectionReplayer) SetDeadline(t time.Time) error {
return nil
}

func (c ConnectionReplayer) SetReadDeadline(t time.Time) error {
return nil
}

func (c ConnectionReplayer) SetWriteDeadline(t time.Time) error {
return nil
}

func loadFramesFromFile(filename string) (map[int]dialer.Record, error) {
records := make(map[int]dialer.Record)

file, err := os.Open(filename)
if err != nil {
return nil, fmt.Errorf("failed to open file %s: %w", filename, err)
}
defer file.Close()

scanner := bufio.NewScanner(file)
for scanner.Scan() {
var record dialer.Record
if err := json.Unmarshal(scanner.Bytes(), &record); err != nil {
fmt.Printf("Error decoding JSON in %s: %s\n", filename, err)
continue
}
records[record.StreamID] = record
}

if err := scanner.Err(); err != nil {
return nil, fmt.Errorf("error reading file %s: %w", filename, err)
}
return records, nil
}

func loadResponseFramesFromFiles(read_file, write_file string) ([]*FrameRecorded, error) {
read_records, err := loadFramesFromFile(read_file)
if err != nil {
return nil, err
}
write_records, err := loadFramesFromFile(write_file)
if err != nil {
return nil, err
}

var frames = []*FrameRecorded{}
for streamID, record1 := range read_records {
if record2, exists := write_records[streamID]; exists {
frames = append(frames, &FrameRecorded{Response: record1.Data, Hash: dialer.GetFrameHash(record2.Data)})
}
}
return frames, nil
}

type FrameRecorded struct {
Hash int64
Response []byte
}
Loading

0 comments on commit f6655e7

Please sign in to comment.