From 76c48f133a911dbb9e9b9e4863a7204cfc48a93c Mon Sep 17 00:00:00 2001 From: Ircama Date: Sun, 6 Aug 2023 06:51:18 +0200 Subject: [PATCH] Refinements --- README.md | 3 +- epson_print_conf.py | 250 ++++++++++++++++++++++++-------------------- 2 files changed, 136 insertions(+), 117 deletions(-) diff --git a/README.md b/README.md index 360f4cc..dd3c606 100644 --- a/README.md +++ b/README.md @@ -105,7 +105,8 @@ python3 epson_print_conf.py -m XP-205 -a 192.168.1.87 -R 173,172 ```python import epson_print_conf -printer = epson_print_conf.EpsonPrinter("XP-205", "192.168.1.87") +printer = epson_print_conf.EpsonPrinter( + printer_model="XP-205", hostname="192.168.1.87") if not printer.parm: print("Unknown printer") diff --git a/epson_print_conf.py b/epson_print_conf.py index 4628fab..26a996e 100644 --- a/epson_print_conf.py +++ b/epson_print_conf.py @@ -12,6 +12,10 @@ import datetime import time import textwrap import ast +import logging +import os +import yaml +from pathlib import Path from pysnmp.hlapi.v1arch import * from pyasn1.type.univ import OctetString as OctetStringType @@ -296,8 +300,8 @@ class EpsonPrinter: "Emulation 4": "1.3.6.1.2.1.43.15.1.1.5.1.4", "Emulation 5": "1.3.6.1.2.1.43.15.1.1.5.1.5", "Total printed pages": "1.3.6.1.2.1.43.10.2.1.4.1.1", - "Total copies": "1.3.6.1.2.1.43.11.1.1.9.1.1", - "Serial number": "1.3.6.1.2.1.43.5.1.1.17.1", + #"Total copies": "1.3.6.1.2.1.43.11.1.1.9.1.1", + #"Serial number": "1.3.6.1.2.1.43.5.1.1.17.1", "IP Address": "1.3.6.1.4.1.1248.1.1.3.1.4.19.1.3.1", "URL_path": "1.3.6.1.4.1.1248.1.1.3.1.4.19.1.4.1", "URL": "1.3.6.1.4.1.1248.1.1.3.1.4.46.1.2.1", @@ -328,11 +332,10 @@ class EpsonPrinter: def __init__( self, - printer_model: - str, hostname: str, + printer_model: str = None, + hostname: str = None, timeout: (None, float) = None, retries: (None, float) = None, - debug: bool = False, dry_run: bool = False) -> None: """Initialise printer model.""" for printer_name, printer_data in self.PRINTER_CONFIG.copy().items(): @@ -350,7 +353,6 @@ class EpsonPrinter: self.hostname = hostname self.timeout = timeout self.retries = retries - self.debug = debug self.dry_run = dry_run if self.printer_model in self.valid_printers: self.parm = self.PRINTER_CONFIG[self.printer_model] @@ -379,8 +381,7 @@ class EpsonPrinter: if ret: stat_set[method[4:]] = ret else: - if self.debug: - print(f"No value for method '{method}'.") + logging.error(f"No value for method '{method}'.") return stat_set def caesar(self, key): @@ -402,7 +403,7 @@ class EpsonPrinter: oid = oid % 256 if msb > 255: return None - if 'read_key' not in self.parm: + if not self.parm or 'read_key' not in self.parm: return None return ( f"{self.eeprom_link}" @@ -430,7 +431,8 @@ class EpsonPrinter: oid = oid % 256 if msb > 255: return None - if ('write_key' not in self.parm + if ( + not self.parm or 'write_key' not in self.parm or 'read_key' not in self.parm): return None write_op = ( @@ -444,13 +446,15 @@ class EpsonPrinter: f".{self.caesar(self.parm['write_key'])}" ) if self.dry_run: - print("WRITE_DRY_RUN:", write_op) + logging.debug("WRITE_DRY_RUN: %s", write_op) return self.eeprom_oid_read_address(oid, label=label) else: return write_op def snmp_mib(self, mib): """Generic SNMP query, returning value of a MIB.""" + if not self.hostname: + return False utt = UdpTransportTarget( (self.hostname, 161), ) @@ -467,18 +471,16 @@ class EpsonPrinter: for response in iterator: errorIndication, errorStatus, errorIndex, varBinds = response if errorIndication: - if self.debug: - print("snmp_mib error:", errorIndication) + logging.error("snmp_mib error: %s", errorIndication) if " timed out" in errorIndication: raise TimeoutError(errorIndication) return False elif errorStatus: - if self.debug: - print( - 'snmp_mib PDU error: %s at %s' % ( - errorStatus.prettyPrint(), - errorIndex and varBinds[int(errorIndex) - 1][0] or '?') - ) + logging.error( + 'snmp_mib PDU error: %s at %s', + errorStatus.prettyPrint(), + errorIndex and varBinds[int(errorIndex) - 1][0] or '?' + ) return False else: for varBind in varBinds: @@ -486,11 +488,9 @@ class EpsonPrinter: return varBind[1].asOctets() else: return varBind[1].prettyPrint() - if self.debug: - print("snmp_mib value error: invalid multiple data") + logging.error("snmp_mib value error: invalid multiple data") return False - if self.debug: - print("snmp_mib value error: invalid data") + logging.error("snmp_mib value error: invalid data") return False def read_eeprom( @@ -498,24 +498,21 @@ class EpsonPrinter: oid: int, label: str = "unknown method") -> str: """Read a single byte from the Epson EEPROM address 'oid'.""" - if self.debug: - print( - f"EEPROM_DUMP {label}:\n" - f" ADDRESS: " - f"{self.eeprom_oid_read_address(oid, label=label)}\n" - f" OID: {oid}={hex(oid)}" - ) + logging.debug( + f"EEPROM_DUMP {label}:\n" + f" ADDRESS: " + f"{self.eeprom_oid_read_address(oid, label=label)}\n" + f" OID: {oid}={hex(oid)}" + ) response = self.snmp_mib( self.eeprom_oid_read_address(oid, label=label)) if not response: return None - if self.debug: - print(f" RESPONSE: {repr(response)}") + logging.debug(f" RESPONSE: {repr(response)}") try: response = re.findall(r"EE:[0-9A-F]{6}", response.decode())[0][3:] except (TypeError, IndexError): - if self.debug: - print(f"Invalid read key.") + logging.error(f"Invalid read key.") return None chk_addr = response[0:4] value = response[4:6] @@ -541,26 +538,24 @@ class EpsonPrinter: value: int, label: str = "unknown method") -> None: """Write a single byte 'value' to the Epson EEPROM address 'oid'.""" - if "write_key" not in self.parm: - if self.debug: - print(f"Missing 'write_key' parameter in configuration.") + if not self.parm or "write_key" not in self.parm: + logging.error( + f"Missing 'write_key' parameter in configuration.") return False - if not self.dry_run and self.debug: + if not self.dry_run: response = self.read_eeprom(oid, label=label) - print(f"Previous value for {label}: {response}") + logging.debug(f"Previous value for {label}: {response}") oid_string = self.eeprom_oid_write_address(oid, value, label=label) response = self.snmp_mib(oid_string) - if self.debug: - print( - f"EEPROM_WRITE {label}:\n" - f" ADDRESS: {oid_string}\n" - f" OID: {oid}={hex(oid)}" - ) - if self.debug and response: - print(f" RESPONSE: {repr(response)}") + logging.debug( + f"EEPROM_WRITE {label}:\n" + f" ADDRESS: {oid_string}\n" + f" OID: {oid}={hex(oid)}" + ) + if response: + logging.debug(f" RESPONSE: {repr(response)}") if not self.dry_run and response and not ":OK;" in repr(response): - if self.debug: - print("Write error") + logging.error("Write error") return False # ":NA;" is an error return True @@ -630,8 +625,7 @@ class EpsonPrinter: if len(data) < 16: return "invalid packet" if data[:11] != b'\x00@BDC ST2\r\n': - if self.debug: - print("Unaligned BDC ST2 header. Trying to fix...") + logging.debug("Unaligned BDC ST2 header. Trying to fix...") start = data.find(b'BDC ST2\r\n') if start < 0: return "printer status error (must start with BDC ST2...)" @@ -650,13 +644,10 @@ class EpsonPrinter: if len(item) != length: return "invalid element length" buf = buf[length:] - - if self.debug: - print( - "Processing status - ftype", hex(ftype), - "length:", length, "item:", item.hex(' ') - ) - + logging.debug( + "Processing status - ftype %s, length: %s, item: %s", + hex(ftype), length, item.hex(' ') + ) if ftype == 0x01: # status printer_status = item[0] status_text = "unknown" @@ -890,13 +881,13 @@ class EpsonPrinter: elif isinstance(result, str): sys_info[name] = result else: - if self.debug: - print(f"No value for SNMP OID '{name}'. MIB: {oid}.") + logging.error( + f"No value for SNMP OID '{name}'. MIB: {oid}.") return sys_info def get_serial_number(self) -> str: """Return serial number of printer.""" - if "serial_number" not in self.parm: + if not self.parm or "serial_number" not in self.parm: return None return "".join( chr(int(value or "0", 16)) @@ -906,7 +897,7 @@ class EpsonPrinter: def get_stats(self, stat_name: str = None) -> str: """Return printer statistics.""" - if "stats" not in self.parm: + if not self.parm or "stats" not in self.parm: return None if stat_name and stat_name in self.parm["stats"].keys(): stat_info = {stat_name: self.parm["stats"][stat_name]} @@ -932,7 +923,7 @@ class EpsonPrinter: def get_printer_head_id(self) -> str: # to be revised """Return printer head id.""" - if "printer_head_id_h" not in self.parm: + if not self.parm or "printer_head_id_h" not in self.parm: return None if "printer_head_id_f" not in self.parm: return None @@ -976,7 +967,7 @@ class EpsonPrinter: def get_ink_replacement_counters(self) -> str: """Return list of ink replacement counters.""" - if "ink_replacement_counters" not in self.parm: + if not self.parm or "ink_replacement_counters" not in self.parm: return None irc = { ( @@ -995,29 +986,23 @@ class EpsonPrinter: def get_printer_status(self): """Return printer status and ink levels.""" - result = self.snmp_mib(f"{self.eeprom_link}.115.116.1.0.1") + address = f"{self.eeprom_link}.115.116.1.0.1" + logging.debug(f"PRINTER_STATUS:\n ADDRESS: {address}") + result = self.snmp_mib(address) if not result: return None - if self.debug: - print( - textwrap.fill( - "PRINTER_STATUS: " + repr(result), - initial_indent="", - subsequent_indent=" ", - ) - ) - print( - textwrap.fill( - result.hex(' '), - initial_indent=" ", - subsequent_indent=" ", - ) + logging.debug(f" RESPONSE: {repr(result[:20])}...\n%s", + textwrap.fill( + result.hex(' '), + initial_indent=" ", + subsequent_indent=" ", ) + ) return self.status_parser(result) def get_waste_ink_levels(self): """Return waste ink levels as a percentage.""" - if "main_waste" not in self.parm: + if not self.parm or "main_waste" not in self.parm: return None results = {} for waste_type in ["main_waste", "borderless_waste", "first_waste", @@ -1035,7 +1020,7 @@ class EpsonPrinter: def get_last_printer_fatal_errors(self) -> str: """Return list of last printer fatal errors in hex format.""" - if "last_printer_fatal_errors" not in self.parm: + if not self.parm or "last_printer_fatal_errors" not in self.parm: return None return self.read_eeprom_many( self.parm["last_printer_fatal_errors"], @@ -1053,14 +1038,12 @@ class EpsonPrinter: response = [] for i in range(1, 9): mib = f"{self.eeprom_link}.105.105.2.0.1." + str(i) - if self.debug: - print( - f"Cartridge {i}:\n" - f" MIB: {mib}" - ) + logging.debug( + f"Cartridge {i}:\n" + f" ADDRESS: {mib}" + ) cartridge = self.snmp_mib(mib) - if self.debug: - print(f" RESPONSE: {repr(cartridge)}") + logging.debug(f" RESPONSE: {repr(cartridge)}") if not cartridge: continue if cartridge.find(b'ii:NA;') > 0 or cartridge.find( @@ -1069,22 +1052,30 @@ class EpsonPrinter: response.append(cartridge[10:-2].decode().split(';')) if not response: return None - cartridges = [ - {i[0]: i[1] for i in map(lambda x: x.split(':'), j)} - for j in response - ] - return [ - { - "ink_color": self.ink_color(int(i['IC1'], 16)), - "ink_quantity": int(i['IQT'], 16), - #"viq": int(i['VIQ'], 16), - #"uiq": int(i['UIQ'], 16), - "production_year": int(i['PDY'], 16) + ( - 1900 if int(i['PDY'], 16) > 80 else 2000), - "production_month": int(i['PDM'], 16), - "id": i['SID'] - } for i in cartridges - ] + try: + cartridges = [ + {i[0]: i[1] for i in map(lambda x: x.split(':'), j)} + for j in response + ] + except Exception as e: + logging.debug(f" CARTRIDGE MAP ERROR: {e}") + return None + try: + return [ + { + "ink_color": self.ink_color(int(i['IC1'], 16)), + "ink_quantity": int(i['IQT'], 16), + #"viq": int(i['VIQ'], 16), + #"uiq": int(i['UIQ'], 16), + "production_year": int(i['PDY'], 16) + ( + 1900 if int(i['PDY'], 16) > 80 else 2000), + "production_month": int(i['PDM'], 16), + "id": i['SID'] + } for i in cartridges + ] + except Exception as e: + logging.error(f" CARTRIDGE VALUE ERROR: {e}") + return None def dump_eeprom(self, start: int = 0, end: int = 0xFF): """ @@ -1099,6 +1090,8 @@ class EpsonPrinter: """ Set waste ink levels to 0. """ + if not self.parm: + return None if "raw_waste_reset" in self.parm: for oid, value in self.parm["raw_waste_reset"].items(): if not self.write_eeprom(oid, value, label="raw_waste_reset"): @@ -1119,14 +1112,17 @@ class EpsonPrinter: def write_first_ti_received_time( self, year: int, month: int, day: int) -> bool: """Update first TI received time""" + if not self.parm: + return None try: msb = self.parm["stats"]["First TI received time"][0] lsb = self.parm["stats"]["First TI received time"][1] except KeyError: return False n = (year - 2000) * 16 * 32 + 32 * month + day - if self.debug: - print("FTRT:", hex(n // 256), hex(n % 256), "=", n // 256, n % 256) + logging.debug( + "FTRT: %s %s = %s %s", + hex(n // 256), hex(n % 256), n // 256, n % 256) if not self.write_eeprom(msb, n // 256, label="First TI received time"): return False if not self.write_eeprom(lsb, n % 256, label="First TI received time"): @@ -1134,28 +1130,29 @@ class EpsonPrinter: return True def list_known_keys(self): + known_keys = [] for model, chars in self.PRINTER_CONFIG.items(): if 'write_key' in chars: - print( + known_keys.append( f"{repr(model).rjust(25)}: " f"{repr(chars['read_key']).rjust(10)} - " f"{repr(chars['write_key'])[1:]}" ) else: - print( + known_keys.append( f"{repr(model).rjust(25)}: " f"{repr(chars['read_key']).rjust(10)} " f"(unknown write key)" ) + return known_keys - def brute_force_read_key( - self, minimum: int = 0x00, maximum: int = 0xFF, debug=False - ): + def brute_force_read_key(self, minimum: int = 0x00, maximum: int = 0xFF): """Brute force read_key for printer.""" + if not self.parm: + return None for x, y in itertools.permutations(range(minimum, maximum + 1), r=2): self.parm['read_key'] = [x, y] - if debug: - print(f"Trying {self.parm['read_key']}...") + logging.error(f"Trying {self.parm['read_key']}...") val = self.read_eeprom(0x00, label="brute_force_read_key") if val is None: continue @@ -1288,12 +1285,33 @@ if __name__ == "__main__": ) args = parser.parse_args() + logging_level = logging.INFO + logging_fmt = "%(message)s" + env_key=os.path.basename(Path(__file__).stem).upper() + '_LOG_CFG' + path = Path(__file__).stem + '-log.yaml' + value = os.getenv(env_key, None) + #print("Configuration file:", path, "| Environment variable:", env_key) + if value: + path = value + if os.path.exists(path): + with open(path, 'rt') as f: + config = yaml.safe_load(f.read()) + try: + logging.config.dictConfig(config) + except Exception as e: + logging.basicConfig(level=logging_level, format=logging_fmt) + logging.critical("Cannot configure logs: %s. %s", e, path) + else: + logging.basicConfig(level=logging_level, format=logging_fmt) + + if args.debug: + logging.getLogger().setLevel(logging.DEBUG) + printer = EpsonPrinter( args.model, args.hostname, timeout=args.timeout, retries=args.retries, - debug=args.debug, dry_run=args.dry_run) if not printer.parm: print(textwrap.fill("Unknown printer. Valid printers: " + ", ".join( @@ -1314,11 +1332,11 @@ if __name__ == "__main__": print("Failed to reset waste ink levels. Check configuration.") if args.detect_key: print_opt = True - read_key = printer.brute_force_read_key(debug=True) + read_key = printer.brute_force_read_key() if read_key: print(f"read_key found: {read_key}") print("List of known keys:") - printer.list_known_keys() + print("\n".join(printer.list_known_keys())) else: print(f"Cannot found read_key") if args.ftrt: