Skip to content

Commit

Permalink
Bugfix, passing on Log nextToken and cleanup (#48)
Browse files Browse the repository at this point in the history
* horrible bug, forgot to pass on nextToken.  Also cleaned up a bit

* version bump to 0.16.3
  • Loading branch information
mskytt authored Dec 20, 2022
1 parent 83e7312 commit 0bb59f9
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 55 deletions.
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
name="tibber_aws",
packages=["tibber_aws"],
install_requires=requirements,
version="0.16.2",
version="0.16.3",
description="A python3 library to communicate with Aws",
python_requires=">=3.7.0",
author="Tibber",
Expand Down
95 changes: 41 additions & 54 deletions tibber_aws/aws_logs.py
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,7 @@ async def _filter_structlog_events(
)
return log_events

async def get_structlog_events(
async def get_structlog_event_generator(
self,
event: str,
log_group: str,
Expand All @@ -269,18 +269,20 @@ async def get_structlog_events(
extra_filter: Dict[str, str] = None,
max_recursion: int = 10,
**kwargs,
) -> List[CloudWatchFilteredLogEvent]:
"""Retrieve log messages for a specific log event in a log group with potential extra filters
) -> AsyncIterator[CloudWatchFilteredLogEvent]:
"""Retrieve log messages for a specific log event in a log group with potential extra filters as
an async generator
Example:
.. code-block:: python
param_filter = {"body.price_area": "NL", "body.vehicle_type": "easee charger"}
log_events = await log_client.get_structlog_events(
event="Request",
log_group="prod-hem-api",
start_time=datetime.datetime(2022,12,7),
end_time=datetime.datetime(2022,12,15),
extra_filter=param_filter)
async for r in log_client.get_structlog_event_generator(
event="Request",
log_group="prod-hem-api",
start_time=datetime.datetime(2022,12,7),
end_time=datetime.datetime(2022,12,15),
extra_filter=param_filter):
print(r)
:param event: name of the event to search for
:type event: str
Expand All @@ -299,7 +301,10 @@ async def get_structlog_events(
:rtype: list[CloudWatchLogEvent]
"""
log_events = await self._filter_structlog_events(event, log_group, start_time, end_time, extra_filter, **kwargs)
events = [CloudWatchFilteredLogEvent(**e) for e in log_events.get("events", [])]

for e in log_events.get("events", []):
yield CloudWatchFilteredLogEvent(**e)

next_token = log_events.get("nextToken")
i = 1
while next_token is not None:
Expand All @@ -310,9 +315,12 @@ async def get_structlog_events(
start_time=start_time,
end_time=end_time,
extra_filter=extra_filter,
nextToken=next_token,
**kwargs,
)
events += [CloudWatchFilteredLogEvent(**e) for e in log_events.get("events", [])]
for e in log_events.get("events", []):
yield CloudWatchFilteredLogEvent(**e)

next_token = log_events.get("nextToken")
if i >= max_recursion and next_token is not None:
logger.warning(
Expand All @@ -322,9 +330,7 @@ async def get_structlog_events(
break
i += 1

return events

async def get_structlog_event_generator(
async def get_structlog_events(
self,
event: str,
log_group: str,
Expand All @@ -333,20 +339,18 @@ async def get_structlog_event_generator(
extra_filter: Dict[str, str] = None,
max_recursion: int = 10,
**kwargs,
) -> AsyncIterator[CloudWatchFilteredLogEvent]:
"""Retrieve log messages for a specific log event in a log group with potential extra filters as
an async generator
) -> List[CloudWatchFilteredLogEvent]:
"""Retrieve log messages for a specific log event in a log group with potential extra filters
Example:
.. code-block:: python
param_filter = {"body.price_area": "NL", "body.vehicle_type": "easee charger"}
async for r in log_client.get_structlog_event_generator(
event="Request",
log_group="prod-hem-api",
start_time=datetime.datetime(2022,12,7),
end_time=datetime.datetime(2022,12,15),
extra_filter=param_filter):
print(r)
log_events = await log_client.get_structlog_events(
event="Request",
log_group="prod-hem-api",
start_time=datetime.datetime(2022,12,7),
end_time=datetime.datetime(2022,12,15),
extra_filter=param_filter)
:param event: name of the event to search for
:type event: str
Expand All @@ -364,34 +368,15 @@ async def get_structlog_event_generator(
:return: The events from `log_group` response matching the filter.
:rtype: list[CloudWatchLogEvent]
"""
log_events = await self._filter_structlog_events(event, log_group, start_time, end_time, extra_filter, **kwargs)

for e in log_events.get("events", []):
yield CloudWatchFilteredLogEvent(**e)

next_token = log_events.get("nextToken")
i = 1
while next_token is not None:
logger.info("More logs to fetch... Using nextToken, %s/%s", i, max_recursion)
log_events = await self._filter_structlog_events(
event=event,
log_group=log_group,
start_time=start_time,
end_time=end_time,
extra_filter=extra_filter,
**kwargs,
)
for e in log_events.get("events", []):
yield CloudWatchFilteredLogEvent(**e)

next_token = log_events.get("nextToken")
if i >= max_recursion and next_token is not None:
logger.warning(
"Hit max recursion: %s, more to be fetched. Increase `max_recursion` to get everything.",
max_recursion,
)
break
i += 1
return [event
async for event in self.get_structlog_event_generator(
event=event,
log_group=log_group,
start_time=start_time,
end_time=end_time,
extra_filter=extra_filter,
max_recursion=max_recursion,
**kwargs)]


async def main():
Expand All @@ -402,8 +387,8 @@ async def main():
structlog_events = await log_client.get_structlog_events(
event="Request",
log_group="prod-hem-api",
start_time=datetime.datetime(2022, 12, 10),
end_time=datetime.datetime(2022, 12, 15),
start_time=datetime.datetime(2022, 12, 15),
end_time=datetime.datetime(2022, 12, 15, 2),
extra_filter=param_filter,
)

Expand Down Expand Up @@ -464,6 +449,8 @@ async def main():
formatter = logging.Formatter("%(asctime)s - %(levelname)s - %(name)s - [%(filename)s:%(lineno)d] %(message)s")
handler.setFormatter(formatter)
logging.basicConfig(level=logging.getLevelName("DEBUG"), handlers=[handler])
logging.getLogger("botocore").setLevel(logging.INFO)
logging.getLogger("aiobotocore").setLevel(logging.INFO)

all_struct_events = asyncio.run(main())
print(all_struct_events[0])

0 comments on commit 0bb59f9

Please sign in to comment.