Skip to content

Commit

Permalink
Merge pull request #3591 from anurag6/port_flush
Browse files Browse the repository at this point in the history
Flush ports post datapath disconnect.
  • Loading branch information
gizmoguy authored Jun 9, 2020
2 parents a890974 + d8ce89a commit 6c73bc2
Show file tree
Hide file tree
Showing 4 changed files with 43 additions and 8 deletions.
16 changes: 14 additions & 2 deletions clib/valve_test_lib.py
Original file line number Diff line number Diff line change
Expand Up @@ -796,6 +796,10 @@ def connect_dp(self, dp_id=None):
self.assertTrue(valve.dp.to_conf())
return connect_msgs

def disconnect_dp(self):
valve = self.valves_manager.valves[self.DP_ID]
valve.datapath_disconnect(time.time())

def cold_start(self, dp_id=None):
"""
Cold start a DP
Expand All @@ -807,7 +811,7 @@ def cold_start(self, dp_id=None):
if dp_id is None:
dp_id = self.DP_ID
valve = self.valves_manager.valves[dp_id]
valve.datapath_disconnect()
valve.datapath_disconnect(time.time())
return self.connect_dp(dp_id)

def get_prom(self, var, labels=None, bare=False, dp_id=None):
Expand Down Expand Up @@ -1289,7 +1293,7 @@ def test_disconnect(self):
"""Test disconnection of DP from controller."""
valve = self.valves_manager.valves[self.DP_ID]
self.assertEqual(1, int(self.get_prom('dp_status')))
self.prom_inc(partial(valve.datapath_disconnect), 'of_dp_disconnections_total')
self.prom_inc(partial(valve.datapath_disconnect, time.time()), 'of_dp_disconnections_total')
self.assertEqual(0, int(self.get_prom('dp_status')))

def test_unexpected_port(self):
Expand Down Expand Up @@ -2134,6 +2138,14 @@ def test_get_config_dict(self):
self.assertTrue(valve.get_config_dict())
self.assertTrue(valve.dp.get_tables())

def test_dp_disconnect_cleanup(self):
"""Test port varz cleanup post dp disconnect"""
valve = self.valves_manager.valves[self.DP_ID]
port_num = list(valve.dp.ports.keys())[0]
self.port_expected_status(port_num, 1)
self.disconnect_dp()
self.port_expected_status(port_num, 0)


class ValveTestStackedRouting(ValveTestNetwork):
"""Test inter-vlan routing with stacking capabilities in an IPV4 network"""
Expand Down
4 changes: 2 additions & 2 deletions faucet/faucet.py
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ def _send_flow_msgs(self, valve, flow_msgs, ryu_dp=None):
if not ryu_dp:
valve.logger.error('send_flow_msgs: DP not up')
return
valve.send_flows(ryu_dp, flow_msgs)
valve.send_flows(ryu_dp, flow_msgs, time.time())

def _get_valve(self, ryu_event, require_running=False):
"""Get Valve instance to response to an event.
Expand Down Expand Up @@ -299,7 +299,7 @@ def _datapath_disconnect(self, ryu_event):
valve, _, _ = self._get_valve(ryu_event)
if valve is None:
return
valve.datapath_disconnect()
valve.datapath_disconnect(time.time())

@set_ev_cls(ofp_event.EventOFPDescStatsReply, MAIN_DISPATCHER) # pylint: disable=no-member
@kill_on_exception(exc_logname)
Expand Down
13 changes: 9 additions & 4 deletions faucet/valve.py
Original file line number Diff line number Diff line change
Expand Up @@ -674,7 +674,7 @@ def datapath_connect(self, now, discovered_up_ports):
self._reset_dp_status()
return ofmsgs

def datapath_disconnect(self):
def datapath_disconnect(self, now):
"""Handle Ryu datapath disconnection event."""
self.logger.warning('datapath down')
self.notify(
Expand All @@ -683,6 +683,7 @@ def datapath_disconnect(self):
self.dp.dyn_running = False
self._inc_var('of_dp_disconnections')
self._reset_dp_status()
self.ports_delete(self.dp.ports.keys(), now=now)

def _port_delete_manager_state(self, port):
ofmsgs = []
Expand Down Expand Up @@ -763,7 +764,7 @@ def port_add(self, port_num):
"""
return self.ports_add([port_num])

def ports_delete(self, port_nums, log_msg='down', keep_cache=False, other_valves=None):
def ports_delete(self, port_nums, log_msg='down', keep_cache=False, other_valves=None, now=None):
"""Handle the deletion of ports.
Args:
Expand All @@ -781,6 +782,10 @@ def ports_delete(self, port_nums, log_msg='down', keep_cache=False, other_valves
port.dyn_phys_up = False
self.logger.info('%s (%s) %s' % (port, port.description, log_msg))

# now is set to a time value only when ports_delete is called to flush
if now:
self._set_port_status(port_num, False, now)

if port.output_only:
continue

Expand Down Expand Up @@ -1816,7 +1821,7 @@ def prepare_send_flows(self, flow_msgs):
self.recent_ofmsgs.extend(reordered_flow_msgs)
return reordered_flow_msgs

def send_flows(self, ryu_dp, flow_msgs):
def send_flows(self, ryu_dp, flow_msgs, now):
"""Send flows to datapath (or disconnect an OF session).
Args:
Expand All @@ -1830,7 +1835,7 @@ def ryu_send_flows(local_flow_msgs):
ryu_dp.send_msg(flow_msg)

if flow_msgs is None:
self.datapath_disconnect()
self.datapath_disconnect(now)
ryu_dp.close()
else:
ryu_send_flows(flow_msgs)
Expand Down
18 changes: 18 additions & 0 deletions tests/unit/faucet/test_valve.py
Original file line number Diff line number Diff line change
Expand Up @@ -453,6 +453,24 @@ def test_lacp_timeout(self):
self.assertFalse(
valve.dp.ports[1].non_stack_forwarding())

def test_dp_disconnect(self):
"""Test LACP state when disconnects."""
test_port = 1
labels = self.port_labels(test_port)
self.assertEqual(
1, int(self.get_prom('port_lacp_state', labels=labels)))
self.rcv_packet(test_port, 0, {
'actor_system': '0e:00:00:00:00:02',
'partner_system': FAUCET_MAC,
'eth_dst': slow.SLOW_PROTOCOL_MULTICAST,
'eth_src': '0e:00:00:00:00:02',
'actor_state_synchronization': 1})
self.assertEqual(
3, int(self.get_prom('port_lacp_state', labels=labels)))
self.disconnect_dp()
self.assertEqual(
0, int(self.get_prom('port_lacp_state', labels=labels)))


class ValveTFMSizeOverride(ValveTestBases.ValveTestNetwork):
"""Test TFM size override."""
Expand Down

0 comments on commit 6c73bc2

Please sign in to comment.