Skip to content

Commit

Permalink
Add disconnect on Nth retry to make it reresolving IP
Browse files Browse the repository at this point in the history
  • Loading branch information
vzhestkov committed Jul 22, 2024
1 parent 4379f48 commit 3454ad2
Showing 1 changed file with 18 additions and 4 deletions.
22 changes: 18 additions & 4 deletions salt/transport/zeromq.py
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ def __init__(self, opts, io_loop, **kwargs):
self._socket.setsockopt(zmq.IPV4ONLY, 0)

if HAS_ZMQ_MONITOR and self.opts["zmq_monitor"]:
self._monitor = ZeroMQSocketMonitor(self._socket)
self._monitor = ZeroMQSocketMonitor(self._socket, opts)
self._monitor.start_io_loop(self.io_loop)

def close(self):
Expand Down Expand Up @@ -641,7 +641,7 @@ def mark_future(msg):
class ZeroMQSocketMonitor:
__EVENT_MAP = None

def __init__(self, socket):
def __init__(self, socket, opts=None):
"""
Create ZMQ monitor sockets
Expand All @@ -651,6 +651,11 @@ def __init__(self, socket):
self._socket = socket
self._monitor_socket = self._socket.get_monitor_socket()
self._monitor_stream = None
self.disconnect_callback = None
self.disconnect_on_retry = None
self._connect_retry = None
if opts is not None:
self.disconnect_on_retry = opts.get("zmq_disconnect_on_retry", 10)

def start_io_loop(self, io_loop):
log.trace("Event monitor start!")
Expand Down Expand Up @@ -688,11 +693,20 @@ def monitor_callback(self, msg):
if evt["event"] == zmq.EVENT_MONITOR_STOPPED:
self.stop()
elif evt["event"] == zmq.EVENT_DISCONNECTED:
if self.disconnect_callback is not None:
self.disconnect_callback()
elif evt["event"] == zmq.EVENT_CONNECT_RETRIED:
if (
hasattr(self, "disconnect_callback")
self.disconnect_on_retry is not None
and self.disconnect_callback is not None
):
self.disconnect_callback()
if self._connect_retry is None:
self._connect_retry = self.disconnect_on_retry
self._connect_retry -= 1
if self._connect_retry <= 0:
log.debug("Calling disconnect callback as number of retries reached.")
self.disconnect_callback()
self._connect_retry = self.disconnect_on_retry

def stop(self):
if self._socket is None:
Expand Down

0 comments on commit 3454ad2

Please sign in to comment.