From 6efff5251b887306f46c1aeff8984d6292390378 Mon Sep 17 00:00:00 2001 From: Heikki Nousiainen Date: Tue, 29 Sep 2020 21:45:27 +0300 Subject: [PATCH] Support connecting through SOCKS5 proxies (#2169) Implement support for SOCKS5 proxies. Implement a new proxy wrapper that handles SOCKS5 connection, authentication and requesting connections to the actual Kafka broker endpoint. The proxy can be configured via a new keyword argument `socks5_proxy` to consumers, producers or admin client. The value is URL with optional username and password. E.g. `socks5://user:secret@proxy.example.com:10800` The implementation is done in state machine that emulates repeated calls to connect_ex. The rationale with this design is minimal changes to the actual BrokerConnection object. --- kafka/admin/client.py | 2 + kafka/client_async.py | 4 +- kafka/conn.py | 17 ++- kafka/consumer/group.py | 2 + kafka/producer/kafka.py | 2 + kafka/socks5_wrapper.py | 248 ++++++++++++++++++++++++++++++++++++++++ 6 files changed, 271 insertions(+), 4 deletions(-) create mode 100644 kafka/socks5_wrapper.py diff --git a/kafka/admin/client.py b/kafka/admin/client.py index 8eb7504a7..8cfbfc535 100644 --- a/kafka/admin/client.py +++ b/kafka/admin/client.py @@ -147,6 +147,7 @@ class KafkaAdminClient(object): sasl_oauth_token_provider (AbstractTokenProvider): OAuthBearer token provider instance. (See kafka.oauth.abstract). Default: None kafka_client (callable): Custom class / callable for creating KafkaClient instances + socks5_proxy (str): Socks5 proxy url. Default: None """ DEFAULT_CONFIG = { @@ -182,6 +183,7 @@ class KafkaAdminClient(object): 'sasl_kerberos_service_name': 'kafka', 'sasl_kerberos_domain_name': None, 'sasl_oauth_token_provider': None, + 'socks5_proxy': None, # metrics configs 'metric_reporters': [], diff --git a/kafka/client_async.py b/kafka/client_async.py index f32cd3a36..354a44fbc 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -154,6 +154,7 @@ class KafkaClient(object): sasl mechanism handshake. Default: one of bootstrap servers sasl_oauth_token_provider (AbstractTokenProvider): OAuthBearer token provider instance. (See kafka.oauth.abstract). Default: None + socks5_proxy (str): Socks5 proxy URL. Default: None """ DEFAULT_CONFIG = { @@ -192,7 +193,8 @@ class KafkaClient(object): 'sasl_plain_password': None, 'sasl_kerberos_service_name': 'kafka', 'sasl_kerberos_domain_name': None, - 'sasl_oauth_token_provider': None + 'sasl_oauth_token_provider': None, + 'socks5_proxy': None, } def __init__(self, **configs): diff --git a/kafka/conn.py b/kafka/conn.py index 1efb8a0a1..df903f264 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -33,6 +33,7 @@ from kafka.protocol.parser import KafkaProtocol from kafka.protocol.types import Int32, Int8 from kafka.scram import ScramClient +from kafka.socks5_wrapper import Socks5Wrapper from kafka.version import __version__ @@ -191,6 +192,7 @@ class BrokerConnection(object): sasl mechanism handshake. Default: one of bootstrap servers sasl_oauth_token_provider (AbstractTokenProvider): OAuthBearer token provider instance. (See kafka.oauth.abstract). Default: None + socks5_proxy (str): Socks5 proxy url. Default: None """ DEFAULT_CONFIG = { @@ -224,7 +226,8 @@ class BrokerConnection(object): 'sasl_plain_password': None, 'sasl_kerberos_service_name': 'kafka', 'sasl_kerberos_domain_name': None, - 'sasl_oauth_token_provider': None + 'sasl_oauth_token_provider': None, + 'socks5_proxy': None, } SECURITY_PROTOCOLS = ('PLAINTEXT', 'SSL', 'SASL_PLAINTEXT', 'SASL_SSL') SASL_MECHANISMS = ('PLAIN', 'GSSAPI', 'OAUTHBEARER', "SCRAM-SHA-256", "SCRAM-SHA-512") @@ -236,6 +239,7 @@ def __init__(self, host, port, afi, **configs): self._sock_afi = afi self._sock_addr = None self._api_versions = None + self._socks5_proxy = None self.config = copy.copy(self.DEFAULT_CONFIG) for key in self.config: @@ -368,7 +372,11 @@ def connect(self): log.debug('%s: creating new socket', self) assert self._sock is None self._sock_afi, self._sock_addr = next_lookup - self._sock = socket.socket(self._sock_afi, socket.SOCK_STREAM) + if self.config["socks5_proxy"] is not None: + self._socks5_proxy = Socks5Wrapper(self.config["socks5_proxy"], self.afi) + self._sock = self._socks5_proxy.socket(self._sock_afi, socket.SOCK_STREAM) + else: + self._sock = socket.socket(self._sock_afi, socket.SOCK_STREAM) for option in self.config['socket_options']: log.debug('%s: setting socket option %s', self, option) @@ -385,7 +393,10 @@ def connect(self): # to check connection status ret = None try: - ret = self._sock.connect_ex(self._sock_addr) + if self._socks5_proxy: + ret = self._socks5_proxy.connect_ex(self._sock_addr) + else: + ret = self._sock.connect_ex(self._sock_addr) except socket.error as err: ret = err.errno diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py index a1d1dfa37..969969932 100644 --- a/kafka/consumer/group.py +++ b/kafka/consumer/group.py @@ -245,6 +245,7 @@ class KafkaConsumer(six.Iterator): sasl_oauth_token_provider (AbstractTokenProvider): OAuthBearer token provider instance. (See kafka.oauth.abstract). Default: None kafka_client (callable): Custom class / callable for creating KafkaClient instances + socks5_proxy (str): Socks5 proxy URL. Default: None Note: Configuration parameters are described in more detail at @@ -308,6 +309,7 @@ class KafkaConsumer(six.Iterator): 'sasl_oauth_token_provider': None, 'legacy_iterator': False, # enable to revert to < 1.4.7 iterator 'kafka_client': KafkaClient, + 'socks5_proxy': None, } DEFAULT_SESSION_TIMEOUT_MS_0_9 = 30000 diff --git a/kafka/producer/kafka.py b/kafka/producer/kafka.py index dd1cc508c..431642776 100644 --- a/kafka/producer/kafka.py +++ b/kafka/producer/kafka.py @@ -281,6 +281,7 @@ class KafkaProducer(object): sasl_oauth_token_provider (AbstractTokenProvider): OAuthBearer token provider instance. (See kafka.oauth.abstract). Default: None kafka_client (callable): Custom class / callable for creating KafkaClient instances + socks5_proxy (str): Socks5 proxy URL. Default: None Note: Configuration parameters are described in more detail at @@ -335,6 +336,7 @@ class KafkaProducer(object): 'sasl_kerberos_domain_name': None, 'sasl_oauth_token_provider': None, 'kafka_client': KafkaClient, + 'socks5_proxy': None, } _COMPRESSORS = { diff --git a/kafka/socks5_wrapper.py b/kafka/socks5_wrapper.py new file mode 100644 index 000000000..18bea7c8d --- /dev/null +++ b/kafka/socks5_wrapper.py @@ -0,0 +1,248 @@ +try: + from urllib.parse import urlparse +except ImportError: + from urlparse import urlparse + +import errno +import logging +import random +import socket +import struct + +log = logging.getLogger(__name__) + + +class ProxyConnectionStates: + DISCONNECTED = '' + CONNECTING = '' + NEGOTIATE_PROPOSE = '' + NEGOTIATING = '' + AUTHENTICATING = '' + REQUEST_SUBMIT = '' + REQUESTING = '' + READ_ADDRESS = '' + COMPLETE = '' + + +class Socks5Wrapper: + """Socks5 proxy wrapper + + Manages connection through socks5 proxy with support for username/password + authentication. + """ + + def __init__(self, proxy_url, afi): + self._buffer_in = b'' + self._buffer_out = b'' + self._proxy_url = urlparse(proxy_url) + self._sock = None + self._state = ProxyConnectionStates.DISCONNECTED + self._target_afi = socket.AF_UNSPEC + + proxy_addrs = self.dns_lookup(self._proxy_url.hostname, self._proxy_url.port, afi) + # TODO raise error on lookup failure + self._proxy_addr = random.choice(proxy_addrs) + + @classmethod + def is_inet_4_or_6(cls, gai): + """Given a getaddrinfo struct, return True iff ipv4 or ipv6""" + return gai[0] in (socket.AF_INET, socket.AF_INET6) + + @classmethod + def dns_lookup(cls, host, port, afi=socket.AF_UNSPEC): + """Returns a list of getaddrinfo structs, optionally filtered to an afi (ipv4 / ipv6)""" + # XXX: all DNS functions in Python are blocking. If we really + # want to be non-blocking here, we need to use a 3rd-party + # library like python-adns, or move resolution onto its + # own thread. This will be subject to the default libc + # name resolution timeout (5s on most Linux boxes) + try: + return list(filter(cls.is_inet_4_or_6, + socket.getaddrinfo(host, port, afi, + socket.SOCK_STREAM))) + except socket.gaierror as ex: + log.warning("DNS lookup failed for proxy %s:%d, %r", host, port, ex) + return [] + + def socket(self, family, sock_type): + """Open and record a socket. + + Returns the actual underlying socket + object to ensure e.g. selects and ssl wrapping works as expected. + """ + self._target_afi = family # Store the address family of the target + afi, _, _, _, _ = self._proxy_addr + self._sock = socket.socket(afi, sock_type) + return self._sock + + def _flush_buf(self): + """Send out all data that is stored in the outgoing buffer. + + It is expected that the caller handles error handling, including non-blocking + as well as connection failure exceptions. + """ + while self._buffer_out: + sent_bytes = self._sock.send(self._buffer_out) + self._buffer_out = self._buffer_out[sent_bytes:] + + def _peek_buf(self, datalen): + """Ensure local inbound buffer has enough data, and return that data without + consuming the local buffer + + It's expected that the caller handles e.g. blocking exceptions""" + while True: + bytes_remaining = datalen - len(self._buffer_in) + if bytes_remaining <= 0: + break + data = self._sock.recv(bytes_remaining) + if not data: + break + self._buffer_in = self._buffer_in + data + + return self._buffer_in[:datalen] + + def _read_buf(self, datalen): + """Read and consume bytes from socket connection + + It's expected that the caller handles e.g. blocking exceptions""" + buf = self._peek_buf(datalen) + if buf: + self._buffer_in = self._buffer_in[len(buf):] + return buf + + def connect_ex(self, addr): + """Runs a state machine through connection to authentication to + proxy connection request. + + The somewhat strange setup is to facilitate non-intrusive use from + BrokerConnection state machine. + + This function is called with a socket in non-blocking mode. Both + send and receive calls can return in EWOULDBLOCK/EAGAIN which we + specifically avoid handling here. These are handled in main + BrokerConnection connection loop, which then would retry calls + to this function.""" + + if self._state == ProxyConnectionStates.DISCONNECTED: + self._state = ProxyConnectionStates.CONNECTING + + if self._state == ProxyConnectionStates.CONNECTING: + _, _, _, _, sockaddr = self._proxy_addr + ret = self._sock.connect_ex(sockaddr) + if not ret or ret == errno.EISCONN: + self._state = ProxyConnectionStates.NEGOTIATE_PROPOSE + else: + return ret + + if self._state == ProxyConnectionStates.NEGOTIATE_PROPOSE: + if self._proxy_url.username and self._proxy_url.password: + # Propose username/password + self._buffer_out = b"\x05\x01\x02" + else: + # Propose no auth + self._buffer_out = b"\x05\x01\x00" + self._state = ProxyConnectionStates.NEGOTIATING + + if self._state == ProxyConnectionStates.NEGOTIATING: + self._flush_buf() + buf = self._read_buf(2) + if buf[0:1] != b"\x05": + log.error("Unrecognized SOCKS version") + self._state = ProxyConnectionStates.DISCONNECTED + self._sock.close() + return errno.ECONNREFUSED + + if buf[1:2] == b"\x00": + # No authentication required + self._state = ProxyConnectionStates.REQUEST_SUBMIT + elif buf[1:2] == b"\x02": + # Username/password authentication selected + userlen = len(self._proxy_url.username) + passlen = len(self._proxy_url.password) + self._buffer_out = struct.pack( + "!bb{}sb{}s".format(userlen, passlen), + 1, # version + userlen, + self._proxy_url.username.encode(), + passlen, + self._proxy_url.password.encode(), + ) + self._state = ProxyConnectionStates.AUTHENTICATING + else: + log.error("Unrecognized SOCKS authentication method") + self._state = ProxyConnectionStates.DISCONNECTED + self._sock.close() + return errno.ECONNREFUSED + + if self._state == ProxyConnectionStates.AUTHENTICATING: + self._flush_buf() + buf = self._read_buf(2) + if buf == b"\x01\x00": + # Authentication succesful + self._state = ProxyConnectionStates.REQUEST_SUBMIT + else: + log.error("Socks5 proxy authentication failure") + self._state = ProxyConnectionStates.DISCONNECTED + self._sock.close() + return errno.ECONNREFUSED + + if self._state == ProxyConnectionStates.REQUEST_SUBMIT: + if self._target_afi == socket.AF_INET: + addr_type = 1 + addr_len = 4 + elif self._target_afi == socket.AF_INET6: + addr_type = 4 + addr_len = 16 + else: + log.error("Unknown address family, %r", self._target_afi) + self._state = ProxyConnectionStates.DISCONNECTED + self._sock.close() + return errno.ECONNREFUSED + + self._buffer_out = struct.pack( + "!bbbb{}sh".format(addr_len), + 5, # version + 1, # command: connect + 0, # reserved + addr_type, # 1 for ipv4, 4 for ipv6 address + socket.inet_pton(self._target_afi, addr[0]), # either 4 or 16 bytes of actual address + addr[1], # port + ) + self._state = ProxyConnectionStates.REQUESTING + + if self._state == ProxyConnectionStates.REQUESTING: + self._flush_buf() + buf = self._read_buf(2) + if buf[0:2] == b"\x05\x00": + self._state = ProxyConnectionStates.READ_ADDRESS + else: + log.error("Proxy request failed: %r", buf[1:2]) + self._state = ProxyConnectionStates.DISCONNECTED + self._sock.close() + return errno.ECONNREFUSED + + if self._state == ProxyConnectionStates.READ_ADDRESS: + # we don't really care about the remote endpoint address, but need to clear the stream + buf = self._peek_buf(2) + if buf[0:2] == b"\x00\x01": + _ = self._read_buf(2 + 4 + 2) # ipv4 address + port + elif buf[0:2] == b"\x00\x05": + _ = self._read_buf(2 + 16 + 2) # ipv6 address + port + else: + log.error("Unrecognized remote address type %r", buf[1:2]) + self._state = ProxyConnectionStates.DISCONNECTED + self._sock.close() + return errno.ECONNREFUSED + self._state = ProxyConnectionStates.COMPLETE + + if self._state == ProxyConnectionStates.COMPLETE: + return 0 + + # not reached; + # Send and recv will raise socket error on EWOULDBLOCK/EAGAIN that is assumed to be handled by + # the caller. The caller re-enters this state machine from retry logic with timer or via select & family + log.error("Internal error, state %r not handled correctly", self._state) + self._state = ProxyConnectionStates.DISCONNECTED + if self._sock: + self._sock.close() + return errno.ECONNREFUSED