Refinements

This commit is contained in:
Ircama 2023-08-06 11:04:12 +02:00
parent 76c48f133a
commit 7993898368
2 changed files with 65 additions and 22 deletions

View file

@ -145,6 +145,7 @@ printer.write_first_ti_received_time(2000, 1, 2)
``` ```
TimeoutError TimeoutError
ValueError
``` ```
(And *pysnmp* exceptions.) (And *pysnmp* exceptions.)

View file

@ -381,7 +381,7 @@ class EpsonPrinter:
if ret: if ret:
stat_set[method[4:]] = ret stat_set[method[4:]] = ret
else: else:
logging.error(f"No value for method '{method}'.") logging.info(f"No value for method '{method}'.")
return stat_set return stat_set
def caesar(self, key): def caesar(self, key):
@ -402,8 +402,12 @@ class EpsonPrinter:
msb = oid // 256 msb = oid // 256
oid = oid % 256 oid = oid % 256
if msb > 255: if msb > 255:
logging.error("EpsonPrinter - invalid API usage")
return None return None
if not self.parm or 'read_key' not in self.parm: if not self.parm:
logging.error("EpsonPrinter - invalid API usage")
return None
if 'read_key' not in self.parm:
return None return None
return ( return (
f"{self.eeprom_link}" f"{self.eeprom_link}"
@ -430,9 +434,13 @@ class EpsonPrinter:
msb = oid // 256 msb = oid // 256
oid = oid % 256 oid = oid % 256
if msb > 255: if msb > 255:
logging.error("EpsonPrinter - invalid API usage")
return None
if not self.parm:
logging.error("EpsonPrinter - invalid API usage")
return None return None
if ( if (
not self.parm or 'write_key' not in self.parm 'write_key' not in self.parm
or 'read_key' not in self.parm): or 'read_key' not in self.parm):
return None return None
write_op = ( write_op = (
@ -446,7 +454,7 @@ class EpsonPrinter:
f".{self.caesar(self.parm['write_key'])}" f".{self.caesar(self.parm['write_key'])}"
) )
if self.dry_run: if self.dry_run:
logging.debug("WRITE_DRY_RUN: %s", write_op) logging.warning("WRITE_DRY_RUN: %s", write_op)
return self.eeprom_oid_read_address(oid, label=label) return self.eeprom_oid_read_address(oid, label=label)
else: else:
return write_op return write_op
@ -471,12 +479,12 @@ class EpsonPrinter:
for response in iterator: for response in iterator:
errorIndication, errorStatus, errorIndex, varBinds = response errorIndication, errorStatus, errorIndex, varBinds = response
if errorIndication: if errorIndication:
logging.error("snmp_mib error: %s", errorIndication) logging.info("snmp_mib error: %s", errorIndication)
if " timed out" in errorIndication: if " timed out" in errorIndication:
raise TimeoutError(errorIndication) raise TimeoutError(errorIndication)
return False return False
elif errorStatus: elif errorStatus:
logging.error( logging.info(
'snmp_mib PDU error: %s at %s', 'snmp_mib PDU error: %s at %s',
errorStatus.prettyPrint(), errorStatus.prettyPrint(),
errorIndex and varBinds[int(errorIndex) - 1][0] or '?' errorIndex and varBinds[int(errorIndex) - 1][0] or '?'
@ -488,9 +496,9 @@ class EpsonPrinter:
return varBind[1].asOctets() return varBind[1].asOctets()
else: else:
return varBind[1].prettyPrint() return varBind[1].prettyPrint()
logging.error("snmp_mib value error: invalid multiple data") logging.info("snmp_mib value error: invalid multiple data")
return False return False
logging.error("snmp_mib value error: invalid data") logging.info("snmp_mib value error: invalid data")
return False return False
def read_eeprom( def read_eeprom(
@ -512,7 +520,7 @@ class EpsonPrinter:
try: try:
response = re.findall(r"EE:[0-9A-F]{6}", response.decode())[0][3:] response = re.findall(r"EE:[0-9A-F]{6}", response.decode())[0][3:]
except (TypeError, IndexError): except (TypeError, IndexError):
logging.error(f"Invalid read key.") logging.info(f"Invalid read key.")
return None return None
chk_addr = response[0:4] chk_addr = response[0:4]
value = response[4:6] value = response[4:6]
@ -538,7 +546,10 @@ class EpsonPrinter:
value: int, value: int,
label: str = "unknown method") -> None: label: str = "unknown method") -> None:
"""Write a single byte 'value' to the Epson EEPROM address 'oid'.""" """Write a single byte 'value' to the Epson EEPROM address 'oid'."""
if not self.parm or "write_key" not in self.parm: if not self.parm:
logging.error("EpsonPrinter - invalid API usage")
return False
if "write_key" not in self.parm:
logging.error( logging.error(
f"Missing 'write_key' parameter in configuration.") f"Missing 'write_key' parameter in configuration.")
return False return False
@ -555,7 +566,7 @@ class EpsonPrinter:
if response: if response:
logging.debug(f" RESPONSE: {repr(response)}") logging.debug(f" RESPONSE: {repr(response)}")
if not self.dry_run and response and not ":OK;" in repr(response): if not self.dry_run and response and not ":OK;" in repr(response):
logging.error("Write error") logging.info("Write error")
return False # ":NA;" is an error return False # ":NA;" is an error
return True return True
@ -623,25 +634,32 @@ class EpsonPrinter:
} }
if len(data) < 16: if len(data) < 16:
logging.info("status_parser: invalid packet")
return "invalid packet" return "invalid packet"
if data[:11] != b'\x00@BDC ST2\r\n': if data[:11] != b'\x00@BDC ST2\r\n':
logging.debug("Unaligned BDC ST2 header. Trying to fix...") logging.debug("Unaligned BDC ST2 header. Trying to fix...")
start = data.find(b'BDC ST2\r\n') start = data.find(b'BDC ST2\r\n')
if start < 0: if start < 0:
logging.info(
"status_parser: "
"printer status error (must start with BDC ST2...)")
return "printer status error (must start with BDC ST2...)" return "printer status error (must start with BDC ST2...)"
data = bytes(2) + data[start:] data = bytes(2) + data[start:]
len_p = int.from_bytes(data[11:13], byteorder='little') len_p = int.from_bytes(data[11:13], byteorder='little')
if len(data) - 13 != len_p: if len(data) - 13 != len_p:
logging.info("status_parser: message error (invalid length)")
return "message error (invalid length)" return "message error (invalid length)"
buf = data[13:] buf = data[13:]
data_set = {} data_set = {}
while len(buf): while len(buf):
if len(buf) < 3: if len(buf) < 3:
logging.info("status_parser: invalid element")
return "invalid element" return "invalid element"
(ftype, length) = buf[:2] (ftype, length) = buf[:2]
buf = buf[2:] buf = buf[2:]
item = buf[:length] item = buf[:length]
if len(item) != length: if len(item) != length:
logging.info("status_parser: invalid element length")
return "invalid element length" return "invalid element length"
buf = buf[length:] buf = buf[length:]
logging.debug( logging.debug(
@ -797,6 +815,8 @@ class EpsonPrinter:
elif ftype == 0x36: # Paper count information elif ftype == 0x36: # Paper count information
if length != 20: if length != 20:
data_set["paper_count"] = "error" data_set["paper_count"] = "error"
logging.info(
"status_parser: paper_count error. Length: %s", length)
continue continue
data_set["paper_count_normal"] = int.from_bytes( data_set["paper_count_normal"] = int.from_bytes(
item[0:4] , "little", signed=True) item[0:4] , "little", signed=True)
@ -881,13 +901,16 @@ class EpsonPrinter:
elif isinstance(result, str): elif isinstance(result, str):
sys_info[name] = result sys_info[name] = result
else: else:
logging.error( logging.info(
f"No value for SNMP OID '{name}'. MIB: {oid}.") f"No value for SNMP OID '{name}'. MIB: {oid}.")
return sys_info return sys_info
def get_serial_number(self) -> str: def get_serial_number(self) -> str:
"""Return serial number of printer.""" """Return serial number of printer."""
if not self.parm or "serial_number" not in self.parm: if not self.parm:
logging.error("EpsonPrinter - invalid API usage")
return None
if "serial_number" not in self.parm:
return None return None
return "".join( return "".join(
chr(int(value or "0", 16)) chr(int(value or "0", 16))
@ -897,7 +920,10 @@ class EpsonPrinter:
def get_stats(self, stat_name: str = None) -> str: def get_stats(self, stat_name: str = None) -> str:
"""Return printer statistics.""" """Return printer statistics."""
if not self.parm or "stats" not in self.parm: if not self.parm:
logging.error("EpsonPrinter - invalid API usage")
return None
if "stats" not in self.parm:
return None return None
if stat_name and stat_name in self.parm["stats"].keys(): if stat_name and stat_name in self.parm["stats"].keys():
stat_info = {stat_name: self.parm["stats"][stat_name]} stat_info = {stat_name: self.parm["stats"][stat_name]}
@ -923,7 +949,10 @@ class EpsonPrinter:
def get_printer_head_id(self) -> str: # to be revised def get_printer_head_id(self) -> str: # to be revised
"""Return printer head id.""" """Return printer head id."""
if not self.parm or "printer_head_id_h" not in self.parm: if not self.parm:
logging.error("EpsonPrinter - invalid API usage")
return None
if "printer_head_id_h" not in self.parm:
return None return None
if "printer_head_id_f" not in self.parm: if "printer_head_id_f" not in self.parm:
return None return None
@ -967,7 +996,10 @@ class EpsonPrinter:
def get_ink_replacement_counters(self) -> str: def get_ink_replacement_counters(self) -> str:
"""Return list of ink replacement counters.""" """Return list of ink replacement counters."""
if not self.parm or "ink_replacement_counters" not in self.parm: if not self.parm:
logging.error("EpsonPrinter - invalid API usage")
return None
if "ink_replacement_counters" not in self.parm:
return None return None
irc = { irc = {
( (
@ -1002,7 +1034,10 @@ class EpsonPrinter:
def get_waste_ink_levels(self): def get_waste_ink_levels(self):
"""Return waste ink levels as a percentage.""" """Return waste ink levels as a percentage."""
if not self.parm or "main_waste" not in self.parm: if not self.parm:
logging.error("EpsonPrinter - invalid API usage")
return None
if "main_waste" not in self.parm:
return None return None
results = {} results = {}
for waste_type in ["main_waste", "borderless_waste", "first_waste", for waste_type in ["main_waste", "borderless_waste", "first_waste",
@ -1020,7 +1055,10 @@ class EpsonPrinter:
def get_last_printer_fatal_errors(self) -> str: def get_last_printer_fatal_errors(self) -> str:
"""Return list of last printer fatal errors in hex format.""" """Return list of last printer fatal errors in hex format."""
if not self.parm or "last_printer_fatal_errors" not in self.parm: if not self.parm:
logging.error("EpsonPrinter - invalid API usage")
return None
if "last_printer_fatal_errors" not in self.parm:
return None return None
return self.read_eeprom_many( return self.read_eeprom_many(
self.parm["last_printer_fatal_errors"], self.parm["last_printer_fatal_errors"],
@ -1058,7 +1096,7 @@ class EpsonPrinter:
for j in response for j in response
] ]
except Exception as e: except Exception as e:
logging.debug(f" CARTRIDGE MAP ERROR: {e}") logging.info(" CARTRIDGE MAP ERROR: %s", e)
return None return None
try: try:
return [ return [
@ -1074,7 +1112,7 @@ class EpsonPrinter:
} for i in cartridges } for i in cartridges
] ]
except Exception as e: except Exception as e:
logging.error(f" CARTRIDGE VALUE ERROR: {e}") logging.info(" CARTRIDGE VALUE ERROR: %s", e)
return None return None
def dump_eeprom(self, start: int = 0, end: int = 0xFF): def dump_eeprom(self, start: int = 0, end: int = 0xFF):
@ -1091,6 +1129,7 @@ class EpsonPrinter:
Set waste ink levels to 0. Set waste ink levels to 0.
""" """
if not self.parm: if not self.parm:
logging.error("EpsonPrinter - invalid API usage")
return None return None
if "raw_waste_reset" in self.parm: if "raw_waste_reset" in self.parm:
for oid, value in self.parm["raw_waste_reset"].items(): for oid, value in self.parm["raw_waste_reset"].items():
@ -1113,11 +1152,13 @@ class EpsonPrinter:
self, year: int, month: int, day: int) -> bool: self, year: int, month: int, day: int) -> bool:
"""Update first TI received time""" """Update first TI received time"""
if not self.parm: if not self.parm:
logging.error("EpsonPrinter - invalid API usage")
return None return None
try: try:
msb = self.parm["stats"]["First TI received time"][0] msb = self.parm["stats"]["First TI received time"][0]
lsb = self.parm["stats"]["First TI received time"][1] lsb = self.parm["stats"]["First TI received time"][1]
except KeyError: except KeyError:
logging.info("write_first_ti_received_time: missing parameter")
return False return False
n = (year - 2000) * 16 * 32 + 32 * month + day n = (year - 2000) * 16 * 32 + 32 * month + day
logging.debug( logging.debug(
@ -1149,10 +1190,11 @@ class EpsonPrinter:
def brute_force_read_key(self, minimum: int = 0x00, maximum: int = 0xFF): def brute_force_read_key(self, minimum: int = 0x00, maximum: int = 0xFF):
"""Brute force read_key for printer.""" """Brute force read_key for printer."""
if not self.parm: if not self.parm:
logging.error("EpsonPrinter - invalid API usage")
return None return None
for x, y in itertools.permutations(range(minimum, maximum + 1), r=2): for x, y in itertools.permutations(range(minimum, maximum + 1), r=2):
self.parm['read_key'] = [x, y] self.parm['read_key'] = [x, y]
logging.error(f"Trying {self.parm['read_key']}...") logging.warning(f"Trying {self.parm['read_key']}...")
val = self.read_eeprom(0x00, label="brute_force_read_key") val = self.read_eeprom(0x00, label="brute_force_read_key")
if val is None: if val is None:
continue continue
@ -1285,7 +1327,7 @@ if __name__ == "__main__":
) )
args = parser.parse_args() args = parser.parse_args()
logging_level = logging.INFO logging_level = logging.WARNING
logging_fmt = "%(message)s" logging_fmt = "%(message)s"
env_key=os.path.basename(Path(__file__).stem).upper() + '_LOG_CFG' env_key=os.path.basename(Path(__file__).stem).upper() + '_LOG_CFG'
path = Path(__file__).stem + '-log.yaml' path = Path(__file__).stem + '-log.yaml'