Skip to content

Commit

Permalink
Merge pull request #6 from m-lab/full-program
Browse files Browse the repository at this point in the history
Full program
  • Loading branch information
pboothe authored Oct 30, 2019
2 parents 41f5aae + e2c17df commit ae9b04e
Show file tree
Hide file tree
Showing 9 changed files with 368 additions and 32 deletions.
33 changes: 1 addition & 32 deletions demuxer/demuxer.go
Original file line number Diff line number Diff line change
@@ -1,48 +1,17 @@
// Package demuxer contains the tools for sending packets to the right goroutine to save them to disk.
package demuxer

import (
"context"
"time"

"github.com/google/gopacket"
"github.com/google/gopacket/layers"
"github.com/m-lab/go/anonymize"
"github.com/m-lab/packet-headers/metrics"
"github.com/m-lab/packet-headers/saver"
"github.com/prometheus/client_golang/prometheus"
)

// FullFlow is a direction-agnostic identifier for a TCP/IP flow. By
// convention the endpoint whose IP/port pair sorts lower is kept in the "lo"
// fields and the other endpoint in the "hi" fields. The IP strings carry raw
// address bytes rather than printable text, so a FullFlow is not meant to be
// read by humans; its only purpose is to serve as a map key.
type FullFlow struct {
	lo  string
	hi  string
	loP uint16
	hiP uint16
}

// fromPacket builds the map key for the flow that packet p belongs to, using
// the packet's source and destination IPs and TCP ports. The endpoint that
// sorts lower (by IP bytes, then by port) is always stored first, so both
// directions of a connection produce the same FullFlow. Only ever pass it
// TCP/IP packets: anything else will crash.
func fromPacket(p gopacket.Packet) FullFlow {
	network := p.NetworkLayer()
	var src, dst string
	switch network.LayerType() {
	case layers.LayerTypeIPv4:
		v4 := network.(*layers.IPv4)
		src, dst = string(v4.SrcIP), string(v4.DstIP)
	case layers.LayerTypeIPv6:
		v6 := network.(*layers.IPv6)
		src, dst = string(v6.SrcIP), string(v6.DstIP)
	}

	tcp := p.TransportLayer().(*layers.TCP)
	srcPort, dstPort := uint16(tcp.SrcPort), uint16(tcp.DstPort)

	// Ties on the IP are broken by the port number.
	srcIsLower := src < dst || (src == dst && srcPort < dstPort)
	if !srcIsLower {
		return FullFlow{dst, src, dstPort, srcPort}
	}
	return FullFlow{src, dst, srcPort, dstPort}
}

// UUIDEvent is the datatype sent to a demuxer's UUIDChan to notify it about the
// UUID of new flows.
type UUIDEvent struct {
Expand Down
46 changes: 46 additions & 0 deletions demuxer/fullflow.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package demuxer

import (
"net"

"github.com/google/gopacket"
"github.com/google/gopacket/layers"
)

// FullFlow identifies a TCP/IP flow independently of its direction. The
// endpoint whose IP/port pair sorts lower belongs in lo/loP and the other
// endpoint in hi/hiP. The IP fields hold raw address bytes converted to
// strings, so values are not human-readable; the type exists only to be used
// as a map key.
type FullFlow struct {
	lo  string
	hi  string
	loP uint16
	hiP uint16
}

// fromPacket extracts the TCP 4-tuple (source/destination IP and port) from p
// and turns it into a FullFlow via FullFlowFrom4Tuple, ready to be used as a
// map key. Only ever pass it TCP/IP packets: anything else will crash.
func fromPacket(p gopacket.Packet) FullFlow {
	network := p.NetworkLayer()
	var src, dst net.IP
	switch network.LayerType() {
	case layers.LayerTypeIPv4:
		v4 := network.(*layers.IPv4)
		src, dst = v4.SrcIP, v4.DstIP
	case layers.LayerTypeIPv6:
		v6 := network.(*layers.IPv6)
		src, dst = v6.SrcIP, v6.DstIP
	}

	tcp := p.TransportLayer().(*layers.TCP)
	return FullFlowFrom4Tuple(src, uint16(tcp.SrcPort), dst, uint16(tcp.DstPort))
}

// FullFlowFrom4Tuple creates a FullFlow (suitable for use as a map key) from a
// TCP 4-tuple. The result is direction-agnostic: swapping the source and
// destination endpoints yields an equal FullFlow.
//
// Addresses that have an IPv4 representation are reduced to their 4-byte form
// before being placed in the key. net.ParseIP returns IPv4 addresses in
// 16-byte form, whereas addresses read out of a captured packet are normally
// 4 bytes long, so without this step the same IPv4 endpoint could produce two
// different keys depending on where the address came from.
func FullFlowFrom4Tuple(srcIP net.IP, srcPort uint16, dstIP net.IP, dstPort uint16) FullFlow {
	src, dst := flowKeyFromIP(srcIP), flowKeyFromIP(dstIP)
	if src < dst || (src == dst && srcPort < dstPort) {
		return FullFlow{lo: src, hi: dst, loP: srcPort, hiP: dstPort}
	}
	return FullFlow{lo: dst, hi: src, loP: dstPort, hiP: srcPort}
}

// flowKeyFromIP returns the bytes of ip as a string for use inside a FullFlow.
// Any address with an IPv4 representation is returned in 4-byte form; every
// other value is returned unchanged.
func flowKeyFromIP(ip net.IP) string {
	if v4 := ip.To4(); v4 != nil {
		return string(v4)
	}
	return string(ip)
}
54 changes: 54 additions & 0 deletions demuxer/fullflow_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package demuxer

import (
"net"
"testing"
)

// TestFullFlowFrom4Tuple checks that FullFlowFrom4Tuple is symmetric: the key
// built from (src, dst) must equal the key built from (dst, src), for IPv4 and
// IPv6 addresses and for both distinct and identical hosts.
func TestFullFlowFrom4Tuple(t *testing.T) {
	type endpoints struct {
		name    string
		srcIP   net.IP
		srcPort uint16
		dstIP   net.IP
		dstPort uint16
	}
	cases := []endpoints{
		{"Different hosts", net.ParseIP("10.1.1.1").To4(), 2000, net.ParseIP("192.168.0.1").To4(), 1000},
		{"Same host, different ports", net.ParseIP("10.2.3.4").To4(), 2000, net.ParseIP("10.2.3.4").To4(), 1000},
		{"Different v6 hosts", net.ParseIP("2:3::").To16(), 2000, net.ParseIP("4:5::").To16(), 1000},
		{"Same v6 host, different ports", net.ParseIP("1::").To16(), 2000, net.ParseIP("1::").To16(), 1000},
	}
	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			forward := FullFlowFrom4Tuple(tc.srcIP, tc.srcPort, tc.dstIP, tc.dstPort)
			reverse := FullFlowFrom4Tuple(tc.dstIP, tc.dstPort, tc.srcIP, tc.srcPort)
			if forward != reverse {
				t.Errorf("%+v != %+v", forward, reverse)
			}
		})
	}
}
87 changes: 87 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
package main

import (
"context"
"flag"
"os"
"sync"
"time"

"github.com/m-lab/go/anonymize"

"github.com/google/gopacket"
"github.com/google/gopacket/layers"
"github.com/google/gopacket/pcap"

"github.com/m-lab/go/flagx"
"github.com/m-lab/go/prometheusx"
"github.com/m-lab/go/rtx"
"github.com/m-lab/go/warnonerror"
"github.com/m-lab/packet-headers/demuxer"
"github.com/m-lab/packet-headers/tcpeventhandler"
"github.com/m-lab/tcp-info/eventsocket"
)

// Command-line flags (main also fills them from the environment via
// flagx.ArgsFromEnv) together with the package-level hooks that let tests
// drive main().
var (
dir = flag.String("datadir", ".", "The directory to which data is written")
eventSocket = flag.String("eventsocket", "", "The absolute pathname of the unix-domain socket to which events will be posted.")
captureDuration = flag.Duration("captureduration", 30*time.Second, "Only save the first captureduration of each flow, to prevent long-lived flows from spamming the hard drive.")
flowTimeout = flag.Duration("flowtimeout", 30*time.Second, "Once there have been no packets for a flow for at least flowtimeout, the flow can be assumed to be closed.")
maxHeaderSize = flag.Int("maxheadersize", 256, "The maximum size of packet headers allowed. A lower value allows the pcap process to be less wasteful but risks more esoteric IPv6 headers (which can theoretically be up to the full size of the packet but in practice seem to be under 128) getting truncated.")
netInterface = flag.String("interface", "eth0", "The interface on which to capture packets.")

// Context and injected variables to allow smoke testing of main().
// Canceling mainCtx (via mainCancel) makes main() shut down, and
// pcapOpenLive can be replaced with a fake that needs no capture privileges.
mainCtx, mainCancel = context.WithCancel(context.Background())
pcapOpenLive = pcap.OpenLive
)

// main parses the flags, starts the metrics server, connects the tcp-info
// event socket client to the demuxer, opens the packet capture, and then feeds
// captured packets to the demuxer until mainCtx is canceled.
func main() {
	flag.Parse()
	rtx.Must(flagx.ArgsFromEnv(flag.CommandLine), "Could not get args from env")

	defer mainCancel()
	metricsSrv := prometheusx.MustServeMetrics()
	defer warnonerror.Close(metricsSrv, "Could not stop metric server")

	rtx.Must(os.Chdir(*dir), "Could not cd to directory %q", *dir)

	// Tracks the helper goroutines so that main() only returns once every one
	// of them has finished cleaning up.
	var pending sync.WaitGroup

	// The demuxer is what saves the incoming packets to files.
	anonymizer := anonymize.New(anonymize.IPAnonymizationFlag)
	dmx := demuxer.New(anonymizer, *dir, *captureDuration)

	// Feed the UUIDs of new flows, as reported on the event socket, to the
	// demuxer.
	uuidHandler := tcpeventhandler.New(mainCtx, dmx.UUIDChan)
	pending.Add(1)
	go func() {
		eventsocket.MustRun(mainCtx, *eventSocket, uuidHandler)
		pending.Done()
	}()

	// Open the capture and restrict it to TCP traffic.
	capture, err := pcapOpenLive(*netInterface, int32(*maxHeaderSize), true, pcap.BlockForever)
	rtx.Must(err, "Could not create libpcap client")
	rtx.Must(capture.SetBPFFilter("tcp"), "Could not set up BPF filter for TCP")

	// Shut the capture down as soon as the context is canceled.
	pending.Add(1)
	go func() {
		<-mainCtx.Done()
		capture.Close()
		pending.Done()
	}()

	// Decode the captured data as Ethernet frames.
	source := gopacket.NewPacketSource(capture, layers.LinkTypeEthernet)

	// The ticker drives the demuxer's flow-timeout handling.
	timeouts := time.NewTicker(*flowTimeout)
	defer timeouts.Stop()

	// Blocks until mainCtx is canceled.
	dmx.CapturePackets(mainCtx, source.Packets(), timeouts.C)

	// Give the helper goroutines the chance to finish before exiting.
	pending.Wait()
}
53 changes: 53 additions & 0 deletions main_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package main

import (
"context"
"io/ioutil"
"os"
"testing"
"time"

"github.com/m-lab/go/prometheusx"

"github.com/google/gopacket/pcap"
"github.com/m-lab/go/rtx"
"github.com/m-lab/tcp-info/eventsocket"
)

// fakePcapOpenLive is a stand-in for pcapOpenLive that ignores all of its
// arguments and instead opens the packets recorded in testdata/v6.pcap.
func fakePcapOpenLive(device string, snaplen int32, promisc bool, timeout time.Duration) (*pcap.Handle, error) {
	const capturePath = "testdata/v6.pcap"
	handle, err := pcap.OpenOffline(capturePath)
	return handle, err
}

// TestMainSmokeTest runs main() end to end against a local eventsocket server
// and a fake packet capture, and passes as long as main() returns without
// crashing.
func TestMainSmokeTest(t *testing.T) {
	dir, err := ioutil.TempDir("", "TestMainSmokeTest")
	rtx.Must(err, "Could not create temp dir")
	defer os.RemoveAll(dir)

	// Set up a tcpinfo service. Don't use mainCtx for it because if the socket
	// goes away while main() is running then main() (correctly) crashes. We
	// don't want the exit of main() to race with the termination of this
	// server.
	tcpiCtx, tcpiCancel := context.WithCancel(context.Background())
	defer tcpiCancel()
	*eventSocket = dir + "/tcpevents.sock"
	tcpi := eventsocket.New(*eventSocket)
	tcpi.Listen()
	go tcpi.Serve(tcpiCtx)

	// Wait until the eventsocket appears. Sleep briefly between checks so the
	// wait does not spin a CPU core.
	for _, err := os.Stat(*eventSocket); err != nil; _, err = os.Stat(*eventSocket) {
		time.Sleep(time.Millisecond)
	}

	// Tests are unlikely to have enough privileges to open packet captures, so
	// use a fake version that reads from one of our testfiles.
	pcapOpenLive = fakePcapOpenLive
	go func() {
		// NOTE(review): the original read time.Sleep(1), i.e. one nanosecond.
		// The unit is spelled out here without changing the timing; confirm
		// whether a longer delay (e.g. one second) was intended.
		time.Sleep(1 * time.Nanosecond)
		mainCancel()
	}()

	// Listen on any port for metrics.
	*prometheusx.ListenAddress = ":0"
	main()
	// No crash and successful termination == success
}
9 changes: 9 additions & 0 deletions metrics/metrics.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
// Package metrics is the central storage location for all program metrics.
package metrics

import (
Expand Down Expand Up @@ -77,5 +78,13 @@ var (
},
)

// BadEventsFromTCPInfo counts the events received from tcp-info that could
// not be parsed, broken down by the "reason" label. It is expected to stay at
// zero.
BadEventsFromTCPInfo = promauto.NewCounterVec(
prometheus.CounterOpts{
Name: "pcap_tcpeventhandler_bad_events_total",
Help: "How many unparseable events have been sent from tcp-info. This should always be zero.",
},
[]string{"reason"},
)

// TODO(https://github.com/m-lab/packet-headers/issues/5) Create some histograms for SLIs
)
1 change: 1 addition & 0 deletions saver/saver.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
// Package saver provides the tools for saving a single flow's packets to disk.
package saver

import (
Expand Down
61 changes: 61 additions & 0 deletions tcpeventhandler/tcpeventhandler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
// Package tcpeventhandler deals with the output from the eventsocket served by the tcp-info binary.
package tcpeventhandler

import (
"context"
"log"
"net"
"time"

"github.com/m-lab/packet-headers/demuxer"
"github.com/m-lab/packet-headers/metrics"
"github.com/m-lab/tcp-info/eventsocket"
"github.com/m-lab/tcp-info/inetdiag"
)

// handler receives flow events from the tcp-info eventsocket and passes the
// UUID of each newly opened flow on to the demuxer.
type handler struct {
// uuidChan is the send-only channel on which UUIDEvents are delivered to
// the demuxer.
uuidChan chan<- demuxer.UUIDEvent

// As a general rule it is bad practice to save a context inside a struct.
// Here we do so because Demuxer.GetSaver needs a context passed in, and
// handler calls that function. The only other option is to plumb the
// context through the eventsocket client API, which seems wrong somehow?
//
// TODO: Decide whether or not to plumb the context of the
// eventsocket.Client event loop into the eventsocket api.
ctx context.Context
}

// Open processes an Open message for a new flow, sending its UUID to the
// demuxer.
//
// Events with a nil socket ID, or with a source or destination IP that cannot
// be parsed, are counted in metrics.BadEventsFromTCPInfo and dropped. The send
// to the demuxer is abandoned if the handler's context is canceled first, so a
// demuxer that has stopped reading cannot block this handler forever.
func (h *handler) Open(timestamp time.Time, uuid string, id *inetdiag.SockID) {
	if id == nil {
		metrics.BadEventsFromTCPInfo.WithLabelValues("nilid").Inc()
		return
	}
	srcIP := net.ParseIP(id.SrcIP)
	dstIP := net.ParseIP(id.DstIP)
	if srcIP == nil || dstIP == nil {
		log.Printf("SrcIP: %s -> %s, DstIP: %s -> %s", id.SrcIP, srcIP, id.DstIP, dstIP)
		metrics.BadEventsFromTCPInfo.WithLabelValues("badip").Inc()
		return
	}
	// Can't use a struct literal here due to embedding.
	ev := demuxer.UUIDEvent{}
	ev.Flow = demuxer.FullFlowFrom4Tuple(srcIP, id.SPort, dstIP, id.DPort)
	ev.UUID = uuid
	ev.Timestamp = timestamp

	select {
	case h.uuidChan <- ev:
	case <-h.ctx.Done():
		// Shutting down: there may be nobody left to receive the event, so
		// give up rather than block.
	}
}

// Close handles a Close message for a flow. It is intentionally a no-op and
// ignores both arguments: timeouts are the authoritative closing mechanism.
func (h *handler) Close(timestamp time.Time, uuid string) {
}

// New returns an eventsocket.Handler that tells the demuxer about newly
// created flows by sending a UUIDEvent on uuidChan. The supplied ctx is kept
// inside the returned handler.
func New(ctx context.Context, uuidChan chan<- demuxer.UUIDEvent) eventsocket.Handler {
	h := new(handler)
	h.ctx = ctx
	h.uuidChan = uuidChan
	return h
}
Loading

0 comments on commit ae9b04e

Please sign in to comment.