Commit

add json filter

stefanDeveloper committed Jan 31, 2024
1 parent d38b511 commit 9afb12c
Showing 2 changed files with 55 additions and 15 deletions.
12 changes: 6 additions & 6 deletions heidpi/heiDPI_logger.py
@@ -37,13 +37,9 @@ def heidpi_log_event(config_dict, json_dict, additional_processing):
json_dict_copy = copy.deepcopy(json_dict)
json_dict_copy['timestamp'] = get_timestamp()

logging.debug("Running additional processing")

if additional_processing != None:
additional_processing(config_dict, json_dict_copy)

logging.debug("Finished additional processing")

ignore_fields = config_dict["ignore_fields"]
if ignore_fields != []:
list(map(json_dict_copy.pop, ignore_fields, [None] * len(ignore_fields)))
@@ -102,10 +98,12 @@ def heidpi_flow_processing(config_dict: dict, json_dict: dict):
if "ndpi" in json_dict and "flow_risk" in json_dict["ndpi"] and config_dict["ignore_risks"] != []:
list(map(json_dict["ndpi"]["flow_risk"].pop, config_dict["ignore_risks"], [None] * len(config_dict["ignore_risks"])))

def heidpi_worker(address, function):
def heidpi_worker(address, function, filter):
nsock = heiDPIsrvd.nDPIsrvdSocket()
nsock.connect(address)
nsock.loop(function, None, None)
if filter != "":
nsock.addFilter(filter_str=filter)

def heidpi_type_analyzer(json_dict, instance, current_flow, global_user_data):
if SHOW_FLOW_EVENTS and ("flow_event_id" in json_dict):
@@ -161,6 +159,8 @@ def main():
parser.add_argument('--write', type=dir_path, action=heiDPI_env.env_default('WRITE'), default='/var/log', help='heiDPI write path for logs')

parser.add_argument('--config', type=file_path, action=heiDPI_env.env_default('CONFIG'), default=f'{os.getcwd()}/config.yml', help='heiDPI write path for logs')

parser.add_argument('--filter', type=str, action=heiDPI_env.env_default('FILTER'), required=False, default="", help="nDPId filter string, e.g. --filter 'ndpi' in json_dict and 'proto' in json_dict['ndpi']")

parser.add_argument('--show-daemon-events', type=int, action=heiDPI_env.env_default('SHOW_DAEMON_EVENTS'), default=0, required=False, help='heiDPI shows daemon events')
parser.add_argument('--show-packet-events', type=int, action=heiDPI_env.env_default('SHOW_PACKET_EVENTS'), default=0, required=False, help='heiDPI shows packet events')
@@ -231,7 +231,7 @@ def main():

POOL_ERROR = ThreadPool(ERROR_CONFIG['threads'])

heidpi_worker(address, heidpi_type_analyzer)
heidpi_worker(address, heidpi_type_analyzer, args.filter)

if __name__ == '__main__':
main()
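A minimal wiring sketch for the new option (hypothetical helper, not part of this commit): it mirrors heidpi_worker above, but registers the filter before entering the blocking receive loop so every incoming JSON event is checked against the expression. Module path and API are assumed from the hunks above.

# Hypothetical sketch only; names and calls taken from the diff above.
from heidpi import heiDPIsrvd

def run_with_filter(address, callback, filter_str):
    nsock = heiDPIsrvd.nDPIsrvdSocket()
    nsock.connect(address)
    if filter_str != "":
        # Register the filter before loop(), which blocks while
        # dispatching events to the callback.
        nsock.addFilter(filter_str=filter_str)
    nsock.loop(callback, None, None)

# Example filter: only events carrying nDPI protocol information.
# run_with_filter(('127.0.0.1', 7000), my_callback,
#                 "'ndpi' in json_dict and 'proto' in json_dict['ndpi']")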
58 changes: 49 additions & 9 deletions heidpi/heiDPIsrvd.py
@@ -267,11 +267,43 @@ def __init__(self):
def __str__(self):
return 'Socket timeout.'

class JsonFilter():
def __init__(self, filter_string):
self.filter_string = filter_string
self.filter = compile(filter_string, '<string>', 'eval')
def evaluate(self, json_dict):
if type(json_dict) is not dict:
raise nDPIsrvdException('Could not evaluate JSON Filter: expected dictionary, got {}'.format(type(json_dict)))
return eval(self.filter, {'json_dict': json_dict})

class nDPIsrvdSocket:
def __init__(self):
self.sock_family = None
self.flow_mgr = FlowManager()
self.received_bytes = 0
self.json_filter = list()

def addFilter(self, filter_str):
self.json_filter.append(JsonFilter(filter_str))

def evalFilters(self, json_dict):
for jf in self.json_filter:
try:
json_filter_retval = jf.evaluate(json_dict)
except Exception as err:
print()
sys.stderr.write('Error while evaluating expression "{}"\n'.format(jf.filter_string))
raise err

if not isinstance(json_filter_retval, bool):
print()
sys.stderr.write('Error while evaluating expression "{}"\n'.format(jf.filter_string))
raise nDPIsrvdException('JSON Filter returned an invalid type: expected bool, got {}'.format(type(json_filter_retval)))

if json_filter_retval is False:
return False

return True

def connect(self, addr):
if type(addr) is tuple:
@@ -288,6 +320,7 @@ def connect(self, addr):
self.digitlen = 0
self.lines = []
self.failed_lines = []
self.filtered_lines = 0

def timeout(self, timeout):
self.sock.settimeout(timeout)
@@ -346,6 +379,7 @@ def parse(self, callback_json, callback_flow_cleanup, global_user_data):
try:
json_dict = json.loads(received_line[0].decode('ascii', errors='replace'), strict=True)
except json.decoder.JSONDecodeError as e:
json_dict = dict()
self.failed_lines += [received_line]
self.lines = self.lines[1:]
raise(e)
@@ -356,19 +390,24 @@ def parse(self, callback_json, callback_flow_cleanup, global_user_data):
retval = False
continue

try:
if callback_json(json_dict, instance, self.flow_mgr.getFlow(instance, json_dict), global_user_data) is not True:
self.failed_lines += [received_line]
retval = False
except Exception as e:
self.failed_lines += [received_line]
self.lines = self.lines[1:]
raise(e)
current_flow = self.flow_mgr.getFlow(instance, json_dict)
filter_eval = self.evalFilters(json_dict)
if filter_eval is True:
try:
if callback_json(json_dict, instance, current_flow, global_user_data) is not True:
self.failed_lines += [received_line]
retval = False
except Exception as e:
self.failed_lines += [received_line]
self.lines = self.lines[1:]
raise(e)
else:
self.filtered_lines += 1

for _, flow in self.flow_mgr.getFlowsToCleanup(instance, json_dict).items():
if callback_flow_cleanup is None:
pass
elif callback_flow_cleanup(instance, flow, global_user_data) is not True:
elif filter_eval is True and callback_flow_cleanup(instance, flow, global_user_data) is not True:
self.failed_lines += [received_line]
self.lines = self.lines[1:]
retval = False
@@ -401,6 +440,7 @@ def verify(self):
raise nDPIsrvdException('Failed lines > 0: {}'.format(len(self.failed_lines)))
return self.flow_mgr.verifyFlows()


def toSeconds(usec):
return usec / (1000 * 1000)
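To illustrate the semantics of the filter in isolation: the filter string is compiled once with compile(..., 'eval') and then evaluated per event with only json_dict in scope, so it must be a Python expression yielding a bool; evalFilters() ANDs all registered filters and rejects non-bool results. A self-contained sketch follows (illustrative only, independent of the heidpi package).

# Self-contained sketch of the filter mechanism introduced above
# (illustrative; the real class lives in heidpi/heiDPIsrvd.py).
class JsonFilterSketch:
    def __init__(self, filter_string):
        self.filter_string = filter_string
        # Compile once, evaluate per event.
        self.code = compile(filter_string, '<string>', 'eval')

    def evaluate(self, json_dict):
        if not isinstance(json_dict, dict):
            raise TypeError('expected dict, got {}'.format(type(json_dict)))
        # Only 'json_dict' is visible to the expression.
        return eval(self.code, {'json_dict': json_dict})

event = {'flow_event_name': 'detected', 'ndpi': {'proto': 'TLS'}}
flt = JsonFilterSketch("'ndpi' in json_dict and 'proto' in json_dict['ndpi']")
print(flt.evaluate(event))      # True
print(flt.evaluate({'x': 1}))   # False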
