Skip to content

Commit

Permalink
Revert "Stop overriding Case.__getattr__"
Browse files Browse the repository at this point in the history
We won't be needing this in 0.9.x at all since 1.0.x changes things up a lot.
  • Loading branch information
hmpf committed Aug 24, 2023
1 parent cc2e191 commit c6383be
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 80 deletions.
118 changes: 60 additions & 58 deletions src/zinolib/ritz.py
Original file line number Diff line number Diff line change
Expand Up @@ -207,28 +207,11 @@ class Case:
def __init__(self, zino, caseid):
self._zino = zino
self._caseid = caseid
self._copy_attributes(caseid)
self._attrs = self._zino.get_attributes(caseid)

def __repr__(self):
return "%s(%s)" % (str(self.__class__), self._caseid)

def _copy_attributes(self, caseid):
self._attrs = self._zino.get_attributes(caseid)
for attr, value in self._attrs.items():
setattr(self, attr, value)

@property
def history(self):
return self._zino.get_history(self._caseid)

@property
def log(self):
return self._zino.get_log(self._caseid)

@property
def downtime(self):
return self.get_downtime()

def clear_flapping(self):
"""Clear flapping state if this case object
Expand Down Expand Up @@ -278,6 +261,25 @@ def poll(self):
else:
return self._zino.poll_router(self._attrs["router"])

def __getattr__(self, name):
"""Wrapper to get all attributbutes of the object as python attributes
Usage:
c = ritz_session.case(123)
print(c.id)
"""
if name in self._attrs:
return self._attrs[name]
elif "history" == name:
return self._zino.get_history(self._caseid)
elif "log" == name:
return self._zino.get_log(self._caseid)
elif "downtime" == name:
return self.get_downtime()
else:
self.__getattribute__(name)
# raise AttributeError("%s instance of type %s has no attribute '%s'" % (self.__class__, self._attrs["type"], name))
return self

def __getitem__(self, key):
"""Wrapper to dict
Expand Down Expand Up @@ -601,49 +603,49 @@ def get_attributes(self, caseid):
"""
attrlist = self.get_raw_attributes(caseid)
caseinfo = self.convert_attribute_list_to_case_dict(attrlist)
caesinfo = self.clean_attributes(caseinfo)
return caseinfo

def clean_attributes(self, caseinfo):
"""Format attributes received via self.get_attributes"""
cleaninfo = {}

# required
cleaninfo["id"] = int(caseinfo.pop("id"))
cleaninfo["opened"] = datetime.fromtimestamp(int(caseinfo.pop("opened")))
cleaninfo["updated"] = datetime.fromtimestamp(int(caseinfo.pop("updated")))
cleaninfo["priority"] = int(caseinfo.pop("priority"))

# optional
# serialized as ints
for attr in ("ifindex", "flaps", "remote_as", "peer_uptime", "alarm_count", "bfdix", "bfddiscr", "lasttrans", "ac_down"):
value = caseinfo.pop(attr, None)
if value is not None:
value = int(value)
cleaninfo[attr] = value

# various time fields serialized as ints
if cleaninfo["lasttrans"] is not None:
cleaninfo["lasttrans"] = datetime.fromtimestamp(cleaninfo["lasttrans"])
if cleaninfo["ac_down"] is not None:
cleaninfo["ac_down"] = timedelta(seconds=cleaninfo["ac_down"])

# ip addresses serialized as strings
for attr in ("polladdr", "remote_addr", "bfdaddr"):
value = caseinfo.pop(attr, None)
if value:
cleaninfo[attr] = ipaddress.ip_address(value)

# enums serialized as strings
state_ = caseinfo.pop("state", None)
cleaninfo["state"] = caseState(state_) if state_ else None
type_ = caseinfo.pop("type", None)
cleaninfo["type"] = caseType(type_) if type_ else None

# unknown, treated as strings
for attr, value in caseinfo.items():
cleaninfo[attr] = str(value)

return cleaninfo
"""Format attributes received via self.get_raw_attributes
Keys not found are missing from the returned dict.
"""
caseinfo["id"] = int(caseinfo["id"])
caseinfo["opened"] = datetime.fromtimestamp(int(caseinfo["opened"]))
caseinfo["updated"] = datetime.fromtimestamp(int(caseinfo["updated"]))
caseinfo["priority"] = int(caseinfo["priority"])

if "ifindex" in caseinfo:
caseinfo["ifindex"] = int(caseinfo["ifindex"])
if "lasttrans" in caseinfo:
caseinfo["lasttrans"] = datetime.fromtimestamp(int(caseinfo["lasttrans"]))
if "flaps" in caseinfo:
caseinfo["flaps"] = int(caseinfo["flaps"])
if "ac_down" in caseinfo:
caseinfo["ac_down"] = timedelta(seconds=int(caseinfo["ac_down"]))
if "state" in caseinfo:
caseinfo["state"] = caseState(caseinfo["state"])
if "type" in caseinfo:
caseinfo["type"] = caseType(caseinfo["type"])
if "polladdr" in caseinfo:
caseinfo["polladdr"] = ipaddress.ip_address(caseinfo["polladdr"])
if "remote_addr" in caseinfo:
caseinfo["remote_addr"] = ipaddress.ip_address(caseinfo["remote_addr"])
if "remote_as" in caseinfo:
caseinfo["remote_as"] = int(caseinfo["remote_as"])
if "peer_uptime" in caseinfo:
caseinfo["peer_uptime"] = int(caseinfo["peer_uptime"])
if "alarm_count" in caseinfo:
caseinfo["alarm_count"] = int(caseinfo["alarm_count"])
if "bfdix" in caseinfo:
caseinfo["bfdix"] = int(caseinfo["bfdix"])
if "bfddiscr" in caseinfo:
caseinfo["bfddiscr"] = int(caseinfo["bfddiscr"])
if "bfdaddr" in caseinfo:
caseinfo["bfdaddr"] = ipaddress.ip_address(caseinfo["bfdaddr"])

return caseinfo

def get_raw_history(self, caseid):
# gethist Get Logs from CaseID
Expand Down
26 changes: 4 additions & 22 deletions tests/test_default.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,8 @@ def test_F_with(self):
with ritz("127.0.0.1", username="testuser", password="test") as sess:
self.assertTrue(sess)

def test_G_clean_attributes(self):

def test_G_get_attributes(self):
self.maxDiff = None
with zinoemu(executor):
with ritz("127.0.0.1", username="testuser", password="test") as sess:
Expand All @@ -153,17 +154,10 @@ def test_G_clean_attributes(self):
self.assertTrue(34978 in caseids)
self.assertFalse(999 in caseids)
expected_result = {
"ac_down": None,
"alarm_count": None,
"bfddiscr": None,
"bfdix": None,
"bgpas": "halted",
"bgpos": "down",
"flaps": None,
"id": 32802,
"ifindex": None,
"lastevent": "peer is admin turned off",
"lasttrans": None,
"opened": datetime.datetime(2018, 4, 23, 8, 32, 22),
"peer_uptime": 0,
"polladdr": ip_address("127.0.0.1"),
Expand All @@ -175,34 +169,22 @@ def test_G_clean_attributes(self):
"type": caseType.BGP,
"updated": datetime.datetime(2018, 8, 1, 11, 45, 51),
}
raw_attrs = sess.get_attributes(32802)
cleaned_attrs = sess.clean_attributes(raw_attrs)
self.assertEqual(cleaned_attrs, expected_result)
self.assertEqual(sess.get_attributes(32802), expected_result)

expected_result = {
"ac_down": None,
"alarm_count": 1,
"bfddiscr": None,
"bfdix": None,
"flaps": None,
"alarm_type": "yellow",
"id": 34978,
"ifindex": None,
"lastevent": "alarms went from 0 to 1",
"lasttrans": None,
"peer_uptime": None,
"opened": datetime.datetime(2018, 6, 16, 15, 37, 15),
"polladdr": ip_address("127.0.0.1"),
"priority": 100,
"remote_as": None,
"router": "bergen-sw1",
"state": caseState.WORKING,
"type": caseType.ALARM,
"updated": datetime.datetime(2018, 6, 16, 15, 37, 15),
}
raw_attrs = sess.get_attributes(34978)
cleaned_attrs = sess.clean_attributes(raw_attrs)
self.assertEqual(cleaned_attrs, expected_result)
self.assertEqual(sess.get_attributes(34978), expected_result)

def test_H_get_history(self):
with zinoemu(executor):
Expand Down

0 comments on commit c6383be

Please sign in to comment.