Skip to content

Commit

Permalink
Merge pull request #308 from sylwiaszunejko/frame_serialization
Browse files Browse the repository at this point in the history
Implement single connection benchmark
  • Loading branch information
sylwiaszunejko authored Nov 12, 2024
2 parents c594ff2 + e5df002 commit d85e54e
Show file tree
Hide file tree
Showing 12 changed files with 2,137 additions and 10 deletions.
158 changes: 158 additions & 0 deletions dialer/recorder/recorder.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
package recorder

import (
"context"
"encoding/json"
"fmt"
"io"
"net"
"os"
"path"
"time"

"github.com/gocql/gocql"
"github.com/gocql/gocql/dialer"
)

func NewRecordDialer(dir string) *RecordDialer {
return &RecordDialer{
dir: dir,
}
}

type RecordDialer struct {
dir string
net.Dialer
}

func (d *RecordDialer) DialContext(ctx context.Context, network, addr string) (conn net.Conn, err error) {
fmt.Println("Dial Context Record Dialer")
sourcePort := gocql.ScyllaGetSourcePort(ctx)
fmt.Println("Source port: ", sourcePort)
dialerWithLocalAddr := d.Dialer
dialerWithLocalAddr.LocalAddr, err = net.ResolveTCPAddr(network, fmt.Sprintf(":%d", sourcePort))
if err != nil {
fmt.Println(err)
return nil, err
}

conn, err = dialerWithLocalAddr.DialContext(ctx, network, addr)
if err != nil {
return nil, err
}

return NewConnectionRecorder(path.Join(d.dir, fmt.Sprintf("%s-%d", addr, sourcePort)), conn)
}

func NewConnectionRecorder(fname string, conn net.Conn) (net.Conn, error) {
fd_writes, err := os.OpenFile(fname+"Writes", os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0666)
if err != nil {
return nil, err
}
fd_reads, err2 := os.OpenFile(fname+"Reads", os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0666)
if err2 != nil {
return nil, err2
}
return &ConnectionRecorder{fd_writes: fd_writes, fd_reads: fd_reads, orig: conn, write_record: FrameWriter{new: true}, read_record: FrameWriter{new: true}}, nil
}

type FrameWriter struct {
new bool
to_record int
record dialer.Record
}

func (f *FrameWriter) Write(b []byte, n int, file *os.File) (err error) {
if f.new {
f.to_record = -1
f.record = dialer.Record{}
}

recorded_ealier := len(f.record.Data)
f.record.Data = append(f.record.Data, b[:n]...)

if f.to_record == -1 && len(f.record.Data) >= 9 {
p := 4
stream_id := int(f.record.Data[2])
if b[0] > 0x02 {
p = 5
stream_id = int(f.record.Data[2])<<8 | int(f.record.Data[3])
}

f.to_record = p + 4 + int(f.record.Data[p+0])<<24 | int(f.record.Data[p+1])<<16 | int(f.record.Data[p+2])<<8 | int(f.record.Data[p+3]) - recorded_ealier
f.record.StreamID = stream_id
} else if f.to_record == -1 {
return err
}

f.to_record = f.to_record - n
if f.to_record <= 0 {
f.new = true
// Write JSON record to file
jsonData, marshalErr := json.Marshal(f.record)
if marshalErr != nil {
return fmt.Errorf("failed to encode JSON record: %w", marshalErr)
}
_, writeErr := file.Write(append(jsonData, '\n'))
if writeErr != nil {
return fmt.Errorf("failed to record: %w", writeErr)
}
}
return err
}

type ConnectionRecorder struct {
fd_writes *os.File
fd_reads *os.File
orig net.Conn
read_record FrameWriter
write_record FrameWriter
}

func (c *ConnectionRecorder) Read(b []byte) (n int, err error) {
n, err = c.orig.Read(b)
if err != nil && err != io.EOF {
return n, err
}

return n, c.read_record.Write(b, n, c.fd_reads)
}

func (c *ConnectionRecorder) Write(b []byte) (n int, err error) {
n, err = c.orig.Write(b)
if err != nil {
return n, err
}

return n, c.write_record.Write(b, n, c.fd_writes)
}

func (c ConnectionRecorder) Close() error {
if err := c.fd_writes.Close(); err != nil {
return fmt.Errorf("failed to close the file: %w", err)
}
if err := c.fd_reads.Close(); err != nil {
return fmt.Errorf("failed to close the file: %w", err)
}
return c.orig.Close()
}

func (c ConnectionRecorder) LocalAddr() net.Addr {
return c.orig.LocalAddr()
}

func (c ConnectionRecorder) RemoteAddr() net.Addr {
return c.orig.RemoteAddr()
}

func (c ConnectionRecorder) SetDeadline(t time.Time) error {
return c.orig.SetDeadline(t)
}

func (c ConnectionRecorder) SetReadDeadline(t time.Time) error {
return c.orig.SetReadDeadline(t)
}

func (c ConnectionRecorder) SetWriteDeadline(t time.Time) error {
return c.orig.SetWriteDeadline(t)
}
225 changes: 225 additions & 0 deletions dialer/replayer/replayer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,225 @@
package replayer

import (
"bufio"
"context"
"encoding/json"
"fmt"
"io"
"net"
"os"
"path"
"time"

"github.com/gocql/gocql"
"github.com/gocql/gocql/dialer"
)

func NewReplayDialer(dir string) *ReplayDialer {
return &ReplayDialer{
dir: dir,
}
}

type ReplayDialer struct {
dir string
net.Dialer
}

func (d *ReplayDialer) DialContext(ctx context.Context, network, addr string) (conn net.Conn, err error) {
sourcePort := gocql.ScyllaGetSourcePort(ctx)
return NewConnectionReplayer(path.Join(d.dir, fmt.Sprintf("%s-%d", addr, sourcePort)))
}

func NewConnectionReplayer(fname string) (net.Conn, error) {
frames, err := loadResponseFramesFromFiles(fname+"Reads", fname+"Writes")
if err != nil {
return nil, err
}
return &ConnectionReplayer{frames: frames, frameIdsToReplay: []int{}, streamIdsToReplay: []int{}, frameIdx: 0, frameResponsePosition: 0, gotRequest: make(chan struct{}, 1)}, nil
}

type ConnectionReplayer struct {
frames []*FrameRecorded
frameIdsToReplay []int
streamIdsToReplay []int
frameIdx int
frameResponsePosition int
gotRequest chan struct{}
closed bool
}

func (c *ConnectionReplayer) frameStreamID() int {
return c.streamIdsToReplay[c.frameIdx]
}

func (c *ConnectionReplayer) getPendingFrame() *FrameRecorded {
if c.frameIdx < 0 || c.frameIdx >= len(c.frameIdsToReplay) {
return nil
}
frameId := c.frameIdsToReplay[c.frameIdx]
if frameId < 0 || frameId >= len(c.frames) {
return nil
}
return c.frames[frameId]
}

func (c *ConnectionReplayer) pushStreamIDToReplay(b []byte, idx int) {
if b[0] > 0x02 {
c.streamIdsToReplay = append(c.streamIdsToReplay, int(b[2])<<8|int(b[3]))
} else {
c.streamIdsToReplay = append(c.streamIdsToReplay, int(b[2]))
}
c.frameIdsToReplay = append(c.frameIdsToReplay, idx)

select {
case c.gotRequest <- struct{}{}:
default:
}
}

func replaceFrameStreamID(b []byte, stream int) {
if b[0] > 0x02 {
b[2] = byte(stream >> 8)
b[3] = byte(stream)
} else {
b[2] = byte(stream)
}
}

func (c *ConnectionReplayer) Read(b []byte) (n int, err error) {
frame := c.getPendingFrame()
for frame == nil {
<-c.gotRequest
frame = c.getPendingFrame()
}
if c.Closed() {
return 0, io.EOF
}
response := frame.Response[c.frameResponsePosition:]

if len(b) < len(response) {
copy(b, response[:len(b)])
c.frameResponsePosition = c.frameResponsePosition + len(b)
return len(b), err
}

copy(b, response)
if c.frameResponsePosition == 0 {
replaceFrameStreamID(b, c.frameStreamID())
}

c.frameIdx = c.frameIdx + 1
c.frameResponsePosition = 0
return len(response), err
}

func (c *ConnectionReplayer) Write(b []byte) (n int, err error) {
writeHash := dialer.GetFrameHash(b)

for i, q := range c.frames {
if q.Hash == writeHash {
c.pushStreamIDToReplay(b, i)
return len(b), nil
}
}
panic(fmt.Errorf("unable to find a response to replay"))
}

func (c *ConnectionReplayer) Close() error {
close(c.gotRequest)
c.closed = true
return nil
}

func (c *ConnectionReplayer) Closed() bool {
return c.closed
}

type MockAddr struct {
network string
address string
}

func (m *MockAddr) Network() string {
return m.network
}

func (m *MockAddr) String() string {
return m.address
}

func (c ConnectionReplayer) LocalAddr() net.Addr {
return &MockAddr{
network: "tcp",
address: "10.0.0.1:54321",
}
}

func (c ConnectionReplayer) RemoteAddr() net.Addr {
return &MockAddr{
network: "tcp",
address: "192.168.1.100:12345",
}
}

func (c ConnectionReplayer) SetDeadline(t time.Time) error {
return nil
}

func (c ConnectionReplayer) SetReadDeadline(t time.Time) error {
return nil
}

func (c ConnectionReplayer) SetWriteDeadline(t time.Time) error {
return nil
}

func loadFramesFromFile(filename string) (map[int]dialer.Record, error) {
records := make(map[int]dialer.Record)

file, err := os.Open(filename)
if err != nil {
return nil, fmt.Errorf("failed to open file %s: %w", filename, err)
}
defer file.Close()

scanner := bufio.NewScanner(file)
for scanner.Scan() {
var record dialer.Record
if err := json.Unmarshal(scanner.Bytes(), &record); err != nil {
fmt.Printf("Error decoding JSON in %s: %s\n", filename, err)
continue
}
records[record.StreamID] = record
}

if err := scanner.Err(); err != nil {
return nil, fmt.Errorf("error reading file %s: %w", filename, err)
}
return records, nil
}

func loadResponseFramesFromFiles(read_file, write_file string) ([]*FrameRecorded, error) {
read_records, err := loadFramesFromFile(read_file)
if err != nil {
return nil, err
}
write_records, err := loadFramesFromFile(write_file)
if err != nil {
return nil, err
}

var frames = []*FrameRecorded{}
for streamID, record1 := range read_records {
if record2, exists := write_records[streamID]; exists {
frames = append(frames, &FrameRecorded{Response: record1.Data, Hash: dialer.GetFrameHash(record2.Data)})
}
}
return frames, nil
}

type FrameRecorded struct {
Hash int64
Response []byte
}
Loading

0 comments on commit d85e54e

Please sign in to comment.