Skip to content

Commit

Permalink
poc for generating traceroute traffic from jobs
Browse files Browse the repository at this point in the history
  • Loading branch information
irl committed Aug 26, 2018
1 parent 20d65a2 commit ffa72d1
Show file tree
Hide file tree
Showing 4 changed files with 158 additions and 5 deletions.
60 changes: 58 additions & 2 deletions pathspider/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
import queue
from datetime import datetime

from scapy.all import send

from pathspider.network import ipv4_address
from pathspider.network import ipv6_address
from pathspider.network import ipv4_address_public
Expand Down Expand Up @@ -142,6 +144,33 @@ def configurator(self):
def worker(self, worker_number):
    """Main loop for a measurement worker thread.

    Abstract on the base class; concrete Spider plugins must override
    this with their own job-processing loop.

    :param worker_number: index of this worker thread
    :raises NotImplementedError: always, on the abstract base class
    """
    raise NotImplementedError("Cannot instantiate an abstract Spider")

def traceroute_worker(self, worker_number):
    """Job loop for a traceroute worker thread.

    Repeatedly pulls jobs from the job queue and runs one trace per
    configured connection sequence for each job. Exits when the
    shutdown sentinel is received, decrementing the active worker
    count under the lock.

    :param worker_number: index of this worker, used only for logging
    """
    while self.running:
        try:
            job = self.jobqueue.get_nowait()
            # Break on shutdown sentinel
            if job == SHUTDOWN_SENTINEL:
                self.jobqueue.task_done()
                self.__logger.debug("shutting down worker %d on sentinel",
                                    worker_number)
                with self.active_worker_lock:
                    self.active_worker_count -= 1
                    self.__logger.debug("%d workers still active",
                                        self.active_worker_count)
                break

            # Lazy %-args avoid the repr() cost when DEBUG is disabled.
            self.__logger.debug("got a job: %r", job)
        except queue.Empty:
            # No work available yet; back off briefly before polling again.
            time.sleep(QUEUE_SLEEP)
        else:
            # One trace per connection configuration for this job.
            # NOTE(review): conns is built but never enqueued anywhere
            # (e.g. resqueue) — presumably intentional for this PoC, but
            # results are currently discarded. TODO confirm.
            conns = [self._trace_wrapper(job, seq)
                     for seq in range(len(self.connections))]
            self.__logger.debug("trace job complete: %r", job)
            self.jobqueue.task_done()

def _connect_wrapper(self, job, config, connect=None):
start = str(datetime.utcnow())
if connect is None:
Expand All @@ -153,6 +182,33 @@ def _connect_wrapper(self, job, config, connect=None):
conn['spdr_start'] = start
return conn

def trace(self, job, seq, template=None):
    """Send a TTL-stepped train of forged packets towards the target.

    Forges (or reuses) a packet template for the job and sends 30
    copies with TTL values 1 through 30 to elicit ICMP responses from
    intermediate hops.

    :param job: the job describing the target
    :param seq: connection-configuration sequence number
    :param template: optional pre-forged scapy packet; when None the
        plugin's :meth:`forge` is used to build one
    :returns: dict with 'sp', the source port of the (last) sent packet
    """
    logger = logging.getLogger('hopspider')
    if template is None:
        try:
            template = self.forge(job, seq)
        except NotImplementedError:
            # BUGFIX: the two adjacent literals previously joined as
            # "forgingwhich" — a space was missing between them.
            logger.error("This plugin has not implemented packet forging "
                         "which is required for standalone traceroute")
            # NOTE(review): relies on `sys` being imported at module
            # level in base.py — not visible in this diff; confirm.
            sys.exit(1)
    for hop in range(1, 31):
        pkt = template.copy()
        pkt.ttl = hop
        # TODO: A series of functions are needed to "mark" packets
        #       using different strategies so we can identify them
        #       when they come back.
        send(pkt, verbose=0)
    # pkt still holds the last packet sent; layer 1 is the transport
    # header carrying the source port.
    return {'sp': pkt.getlayer(1).sport}

def _trace_wrapper(self, job, seq):
    """Run a trace and annotate its result with the start timestamp.

    :param job: the job describing the target
    :param seq: connection-configuration sequence number
    :returns: the trace result dict with 'spdr_start' added
    """
    started_at = str(datetime.utcnow())
    result = self.trace(job, seq)
    result['spdr_start'] = started_at
    return result

def forge(self, job, seq):
    """Forge a probe packet template for a job.

    Abstract hook; plugins that support standalone traceroute must
    override this to return a scapy packet for the given job and
    sequence number.

    :raises NotImplementedError: always, on the abstract base class
    """
    raise NotImplementedError("This plugin has not implemented packet forging")

def create_observer(self):
"""
Create a flow observer.
Expand Down Expand Up @@ -379,7 +435,7 @@ def _finalise_conns(self, job, jobId, conns):
self.resqueue.put(conn)
config += 1

def start(self):
def start(self, worker=None):
"""
This function starts a PATHspider plugin by:
Expand Down Expand Up @@ -432,7 +488,7 @@ def start(self):
self.active_worker_count = self.worker_count
for i in range(self.worker_count):
worker_thread = threading.Thread(
args=(self.worker, i),
args=(worker or self.worker, i),
target=self.exception_wrapper,
name='worker_{}'.format(i),
daemon=True)
Expand Down
2 changes: 2 additions & 0 deletions pathspider/cmd/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import pathspider.cmd.metadata
import pathspider.cmd.observe
import pathspider.cmd.test
import pathspider.cmd.trace

cmds = [
pathspider.cmd.analyze,
Expand All @@ -17,6 +18,7 @@
pathspider.cmd.metadata,
pathspider.cmd.observe,
pathspider.cmd.test,
pathspider.cmd.trace,
]

def handle_args(argv):
Expand Down
98 changes: 98 additions & 0 deletions pathspider/cmd/trace.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@

import argparse
import logging
import json
import sys
import threading
import csv

from straight.plugin import load

from pathspider.base import PluggableSpider
from pathspider.base import SHUTDOWN_SENTINEL
from pathspider.cmd.measure import job_feeder_csv
from pathspider.cmd.measure import job_feeder_ndjson
from pathspider.network import interface_up

plugins = load("pathspider.plugins", subclasses=PluggableSpider)

def run_traceroute(args):
    """Entry point for the ``trace`` subcommand.

    Starts the selected plugin's traceroute workers, feeds jobs from
    the input file on a background thread, and writes NDJSON results
    until the shutdown sentinel is seen on the output queue.

    :param args: parsed argparse namespace for the ``trace`` subcommand
    """
    logger = logging.getLogger("pathspider")

    try:
        # args.spider is only set when a plugin subcommand was chosen.
        if hasattr(args, "spider"):
            if interface_up(args.interface):
                spider = args.spider(args.workers, "int:" + args.interface, args)
            else:
                logger.error("The chosen interface is not up! Cannot continue.")
                sys.exit(1)
        else:
            logger.error("Plugin not found! Cannot continue.")
            logger.error("Use --help to list all plugins.")
            sys.exit(1)

        logger.info("activating spider...")

        # Run the traceroute worker loop in place of the default
        # connection worker.
        spider.start(worker=spider.traceroute_worker)

        logger.debug("starting job feeder...")
        job_feeder = job_feeder_csv if args.csv_input else job_feeder_ndjson

        threading.Thread(target=job_feeder, args=(args.input, spider)).start()

        with open(args.output, 'w') as outputfile:
            # Lazy %-arg instead of eager string concatenation.
            logger.info("opening output file %s", args.output)
            while True:
                result = spider.outqueue.get()
                if result == SHUTDOWN_SENTINEL:
                    logger.info("output complete")
                    break
                if not args.output_flows:
                    # Drop per-flow detail unless explicitly requested.
                    result.pop("flow_results", None)
                    result.pop("missed_flows", None)
                outputfile.write(json.dumps(result) + "\n")
                logger.debug("wrote a result")
                spider.outqueue.task_done()

    except KeyboardInterrupt:
        logger.error("Received keyboard interrupt, dying now.")

def register_args(subparsers):
    """Register the ``trace`` subcommand, its options and its plugins.

    :param subparsers: the argparse subparsers object to attach to
    """

    class TraceHelpFormatter(argparse.RawDescriptionHelpFormatter):
        # Drop the redundant first line of the subparser listing and
        # append a sign-off to the plugin help section.
        def _format_action(self, action):
            text = super()._format_action(action)
            if action.nargs == argparse.PARSER:
                text = "\n".join(text.split("\n")[1:])
                text += "\n\nSpider safely!"
            return text

    trace_parser = subparsers.add_parser(
        name='trace',
        help="Perform a PATHspider traceroute",
        formatter_class=TraceHelpFormatter)
    trace_parser.add_argument(
        '-i', '--interface', default="eth0",
        help="The interface to use for the observer. (Default: eth0)")
    trace_parser.add_argument(
        '-w', '--workers', type=int, default=20,
        help="Number of workers to use. (Default: 20)")
    trace_parser.add_argument(
        '--input', default='/dev/stdin', metavar='INPUTFILE',
        help=("A file containing a list of PATHspider jobs. "
              "Defaults to standard input."))
    trace_parser.add_argument(
        '--csv-input', action='store_true',
        help=("Indicate CSV format."))
    trace_parser.add_argument(
        '--output', default='/dev/stdout', metavar='OUTPUTFILE',
        help=("The file to output results data to. "
              "Defaults to standard output."))
    trace_parser.add_argument(
        '--output-flows', action='store_true',
        help="Include flow results in output.")

    # Set the command entry point
    trace_parser.set_defaults(cmd=run_traceroute)

    # Add plugins
    plugin_parsers = trace_parser.add_subparsers(
        title="Plugins",
        description="The following plugins are available for use:",
        metavar='PLUGIN', help='plugin to use')
    for plugin in plugins:
        plugin.register_args(plugin_parsers)
3 changes: 0 additions & 3 deletions pathspider/forge.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,6 @@ def connect(self, job, seq):
send(pkt, verbose=0)
return {'sp': pkt.getlayer(1).sport}

def forge(self, job, config):
raise NotImplementedError("Cannot register an abstract plugin")

@classmethod
def register_args(cls, subparsers):
# pylint: disable=no-member
Expand Down

0 comments on commit ffa72d1

Please sign in to comment.