-
Notifications
You must be signed in to change notification settings - Fork 0
/
ssh.py
132 lines (101 loc) · 4.33 KB
/
ssh.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
import os
import argparse
from warnings import filterwarnings
filterwarnings("ignore")
from twisted.internet import reactor, endpoints, defer
from twisted.conch.ssh import factory, keys, userauth, connection, transport
from twisted.cred import portal, credentials, error
from twisted.logger import textFileLogObserver
from twisted.python import log
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import rsa
from zope.interface import implementer
script_dir = os.path.dirname(os.path.abspath(__file__))
@implementer(portal.IRealm)
class SimpleSSHRealm:
def requestAvatar(self, avatar_id, mind, *interfaces):
if conchinterfaces.IConchUser in interfaces:
return interfaces[0], SimpleSSHAvatar(avatar_id), lambda: None
else:
raise Exception("No supported interfaces found.")
def getRSAKeys():
public_key_path = os.path.join(script_dir, "id_rsa.pub")
private_key_path = os.path.join(script_dir, "id_rsa")
if not (os.path.exists(public_key_path) and os.path.exists(private_key_path)):
ssh_key = rsa.generate_private_key(
public_exponent=65537, key_size=2048, backend=default_backend()
)
public_key = ssh_key.public_key().public_bytes(
serialization.Encoding.OpenSSH, serialization.PublicFormat.OpenSSH
)
private_key = ssh_key.private_bytes(
serialization.Encoding.PEM,
serialization.PrivateFormat.TraditionalOpenSSL,
serialization.NoEncryption(),
)
with open(public_key_path, "wb") as key_file:
key_file.write(public_key)
with open(private_key_path, "wb") as key_file:
key_file.write(private_key)
else:
with open(public_key_path, "rb") as key_file:
public_key = key_file.read()
with open(private_key_path, "rb") as key_file:
private_key = key_file.read()
return public_key, private_key
class CustomSSHServerTransport(transport.SSHServerTransport):
def __init__(self, our_version_string):
self.ourVersionString = our_version_string.encode()
transport.SSHServerTransport.__init__(self)
class SimpleSSHFactory(factory.SSHFactory):
def __init__(self, our_version_string):
self.ourVersionString = our_version_string
publicKeys = {b"ssh-rsa": keys.Key.fromString(data=getRSAKeys()[0])}
privateKeys = {b"ssh-rsa": keys.Key.fromString(data=getRSAKeys()[1])}
services = {
b"ssh-userauth": userauth.SSHUserAuthServer,
b"ssh-connection": connection.SSHConnection,
}
def buildProtocol(self, addr):
t = CustomSSHServerTransport(self.ourVersionString)
t.supportedPublicKeys = self.publicKeys.keys()
t.factory = self
return t
class LoggingPasswordChecker:
credentialInterfaces = [credentials.IUsernamePassword]
def requestAvatarId(self, creds):
log.msg(
f"Login attempt - Username: {creds.username}, Password: {creds.password}"
)
return defer.fail(error.UnauthorizedLogin())
def main():
parser = argparse.ArgumentParser(description="Run a simple SSH honeypot server.")
parser.add_argument(
"--host", type=str, default="0.0.0.0", help="Host to bind the SSH server to."
)
parser.add_argument(
"--port", type=int, default=2222, help="Port to bind the SSH server to."
)
parser.add_argument(
"--version",
type=str,
default="SSH-2.0-OpenSSH_7.4",
help="Custom SSH version string to display.",
)
args = parser.parse_args()
LOG_FILE_PATH = os.path.join(script_dir, "ssh_honeypot.log")
print(f"SSH HONEYPOT ACTIVE ON HOST: {args.host}, PORT: {args.port}")
print(f"ALL attempts will be logged in: {LOG_FILE_PATH}")
log_observer = textFileLogObserver(open(LOG_FILE_PATH, "a"))
log.startLoggingWithObserver(log_observer, setStdout=False)
ssh_factory = SimpleSSHFactory(args.version)
ssh_realm = SimpleSSHRealm()
ssh_portal = portal.Portal(ssh_realm)
ssh_portal.registerChecker(LoggingPasswordChecker())
ssh_factory.portal = ssh_portal
endpoint = endpoints.TCP4ServerEndpoint(reactor, args.port, interface=args.host)
endpoint.listen(ssh_factory)
reactor.run()
if __name__ == "__main__":
main()