Source code for 3_logparse

"""
Log parser
===========

Accepts a filename on the command line.
The file is a Linux-like log file from a system you are debugging.
Mixed in among the various statements are messages indicating
the state of the device. They look like this:

    Jul 11 16:11:51:490 [139681125603136] dut: Device State: ON

The device state message has many possible values, but this program cares
about only three: ``ON``, ``OFF``, and ``ERR``.

Your program will parse the given log file and print out
a report giving how long the device was ``ON``
and the timestamp of any ``ERR`` conditions.
"""

import argparse
import pathlib
import datetime
import typing


[docs]def cli_parse() -> pathlib.Path: """ CLI interface to get the logfile path with the ``--log`` argument. :return: path of the logfile. :rtype: pathlib.Path """ parser = argparse.ArgumentParser() parser.add_argument( '--log', required=True, help='Path of the logfile to parse' ) args = parser.parse_args() return pathlib.Path(args.log)
[docs]def filter_log( log: pathlib.Path ) -> typing.List[typing.Tuple[datetime.datetime, str]]: """ Read the logfile and get the timestamp and the status of log messages reporting the Device status. :param log: file path of the logfile. :type log: pathlib.Path :return: list of tuples with timestamp and status. :rtype: typing.List[typing.Tuple[datetime.datetime, str]] """ filtered_log = [] with open(log, mode='r', encoding='utf-8') as f: for line in f: if 'Device State' in line: line_elements = line.split() # get the status status = line_elements[-1] # get the time timestamp = ' '.join(line_elements[:3]) timestamp = datetime.datetime.strptime( timestamp, '%b %d %H:%M:%S:%f' ) filtered_log.append((timestamp, status)) return filtered_log
[docs]def compute_uptime( filtered_log: typing.List[typing.Tuple[datetime.datetime, str]] ) -> float: """ Compute the total uptime in seconds between each ``ON`` and ``OFF`` status. :param filtered_log: list of tuples with timestamp and status. :type filtered_log: typing.List[typing.Tuple[datetime.datetime, str]] :return: seconds of uptime. :rtype: float """ # total seconds passed between as ON status and an OFF status uptime_seconds = 0.0 # I assume the first timestamp has an ON status # as it can't be OFF or err if the Device wasn't ON first first_timestamp = filtered_log[0][0] on_timestamp = first_timestamp for timestamp, status in filtered_log: # get the timestamp when the Device is ON if status == 'ON': if timestamp != first_timestamp: on_timestamp = timestamp # when the Device is OFF, update the total uptime seconds elif status == 'OFF': off_timestamp = timestamp uptime_seconds += (off_timestamp - on_timestamp).total_seconds() return uptime_seconds
[docs]def check_errors( filtered_log: typing.List[typing.Tuple[datetime.datetime, str]] ) -> typing.List[str]: """ Get the timestamps associated with the status ``ERR``. :param filtered_log: list of tuples with timestamp and status. :type filtered_log: typing.List[typing.Tuple[datetime.datetime, str]] :return: list of timestamps formatted to nice strings. :rtype: typing.List[str] """ # list to store timestamps associated with error events error_events = [ timestamp.strftime('%b %d %H:%M:%S:%f') for timestamp, status in filtered_log if status == 'ERR' ] return error_events
[docs]def prepare_report( filtered_log: typing.List[typing.Tuple[datetime.datetime, str]] ) -> str: """ Create a string holding the report data. :param filtered_log: list of tuples with timestamp and status. :type filtered_log: typing.List[typing.Tuple[datetime.datetime, str]] :return: report data. :rtype: str """ uptime_seconds = compute_uptime(filtered_log) error_events = check_errors(filtered_log) # create the report report = f'⏱ Total uptime seconds: {uptime_seconds:.1f}s' report += '\n--------\n\n' if not error_events: report += '🎉 No error events, hurray 🎉' else: report += '\n'.join(['⚠ Error events at:', *error_events]) return report
if __name__ == '__main__': log_to_parse = cli_parse() log_timestamps = filter_log(log_to_parse) log_report = prepare_report(log_timestamps) print(log_report)