"""databaseinterface.py
Provides an interface to a MySQL database for storing malware sample metadata.
"""
# Copyright 2015 Ray Canzanese
#
# This file is part of malwareharvester.
#
# malwareharvester is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 2 of the License, or
# (at your option) any later version.
#
# malwareharvester is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with malwareharvester. If not, see <http://www.gnu.org/licenses/>.
import re

import mysql.connector
# Module-level settings. Only TABLE is referenced by the code in this listing;
# the remaining names are presumably imported by other modules of the project
# (not visible here - confirm before removing any of them).

# Placeholder path for the directory holding the collected samples.
DATA_DIRECTORY = "/path/to/malware/samples"
HOST_DATA = "host_data"
# NOTE(review): the name looks like a misspelling of DATABASE and the value
# names an SQLite file, while this module connects through mysql.connector.
# Left unchanged because other modules may import it under this name.
DATBASE = "database.sqlite"
DATA_DIR = "data"
LOGFILE = "debug_log.txt"
# Name of the table every statement in database_interface operates on. It is
# concatenated into SQL text (identifiers cannot be bound as parameters), so it
# must stay a trusted constant and never be derived from external input.
TABLE = "malware"
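class database_interface:
    """Wrapper around one MySQL connection to the malware metadata table.

    Values are always passed to the connector as bound parameters. Table and
    column names cannot be bound, so the table name comes from the module
    constant TABLE and column names are validated and backtick-quoted by
    _quote_column() before they reach any statement.
    """

    # Allowlist for column names taken from caller-supplied dict keys: ASCII
    # letters, digits, underscore and dollar sign, 1 to 64 characters.
    # Anything else (spaces, quotes, backticks, semicolons, comment markers)
    # is rejected instead of being spliced into SQL text.
    _COLUMN_NAME = re.compile(r"[0-9A-Za-z_$]{1,64}\Z")

    def __init__(self, host=None, user=None, password=None, database=None):
        """Open the connection, create a cursor and cache the column names.

        Keyword arguments:
        host, user, password, database - passed unchanged to
            mysql.connector.connect()

        Only these four options are given to the connector, so transport
        encryption is whatever the connector and server use by default.
        NOTE(review): confirm this is acceptable if the database is remote.
        """
        self.db = mysql.connector.connect(host=host, user=user,
                                          password=password, database=database)
        self.cur = self.db.cursor()
        # The first field of every "show columns" row is the column name.
        self.cur.execute("show columns from " + TABLE)
        self.headings = [column[0] for column in self.cur.fetchall()]

    def __del__(self):
        """Close the database connection if one was opened."""
        # __del__ also runs when __init__ failed before self.db was assigned
        # (for example on a refused connection); looking the attribute up
        # directly would then raise AttributeError during finalisation.
        db = getattr(self, "db", None)
        if db is not None:
            db.close()

    @classmethod
    def _quote_column(cls, name):
        """Return name as a backtick-quoted identifier or raise ValueError."""
        try:
            valid = cls._COLUMN_NAME.match(name) is not None
        except TypeError:
            # Not a text string at all (None, int, bytes, ...).
            valid = False
        if not valid:
            raise ValueError("invalid column name: %r" % (name,))
        return "`" + name + "`"

    def _exists(self, column, value):
        """Return True if any row has value in column (a trusted literal)."""
        # SELECT 1 ... LIMIT 1 avoids transferring whole rows, and fetchall()
        # drains the result, so duplicate matches cannot leave unread rows
        # pending on the connection before the next statement.
        self.cur.execute(
            "SELECT 1 FROM " + TABLE + " WHERE " + column + "=%s LIMIT 1",
            (value,))
        return bool(self.cur.fetchall())

    def contains_sha1(self, sha1):
        """Return true if database contains sha1."""
        return self._exists("SHA1", sha1)

    def contains_md5(self, md5):
        """Return true if database contains md5."""
        return self._exists("MD5", md5)

    def add_malware(self, sha1):
        """Add a new malware sample by sha1 and commit."""
        self.cur.execute("INSERT into " + TABLE + " (SHA1) VALUES (%s)",
                         (sha1,))
        self.db.commit()

    def update_dict(self, sha1, data):
        """Update entry for sha1 with information in data dict.

        Keyword arguments:
        sha1 - SHA1 of the row to update
        data - dict mapping column name to the value to store

        A key that is not yet a column is added as a TINYTEXT column first.
        Raises ValueError, before any statement is executed, if a key is not
        an acceptable column name (see _COLUMN_NAME).

        Security note: the keys become SQL identifiers. Placeholders only
        bind values, so an unchecked key spliced into the statement would let
        whoever controls the dict (for example metadata collected from an
        external source) alter the SQL that runs. Every unknown key also
        widens the table permanently; callers that handle externally sourced
        metadata should restrict keys to the set they expect.
        """
        # Validate every key up front so that a bad one cannot leave the row
        # or the schema half-updated.
        columns = [(key, self._quote_column(key), data[key]) for key in data]

        for key, quoted, value in columns:
            if key not in self.headings:
                print(key + " NOT in headings!")
                # The identifier is written into the statement text; binding
                # it as a parameter would send it as a string literal, which
                # is not valid in this position.
                self.cur.execute(
                    "ALTER TABLE " + TABLE + " ADD " + quoted + " TINYTEXT")
                self.headings.append(key)
                self.db.commit()
            # Now add the data
            self.cur.execute(
                "UPDATE " + TABLE + " SET " + quoted + "=%s WHERE SHA1=%s",
                (value, sha1))
        self.db.commit()

    def query(self, query, values=None):
        """Execute a caller-specified SQL query that returns data.

        Keyword arguments:
        query - the query using %s notation for user-entered data
        values - the user entered data as a list or tuple

        The query text is executed exactly as given; only values are bound as
        parameters. The text must therefore come from trusted code and must
        not be assembled from external input.
        """
        self.cur.execute(query, values)
        return self.cur.fetchall()

    def update(self, query, values=None):
        """Execute a caller-specified SQL query that updates data, then commit.

        Keyword arguments:
        query - the query using %s notation for user-entered data
        values - the user entered data as a list or tuple

        As with query(), only values are bound; the query text itself must be
        trusted.
        """
        self.cur.execute(query, values)
        self.db.commit()

    def commit(self):
        """Run database commit command."""
        self.db.commit()

    def delete_md5(self, md5):
        """Delete the database entries whose MD5 column equals md5."""
        self.cur.execute("DELETE FROM " + TABLE + " WHERE MD5=%s", (md5,))
        self.db.commit()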