Refinements

This commit is contained in:
Ircama 2023-08-06 06:51:18 +02:00
parent debe849204
commit 76c48f133a
2 changed files with 136 additions and 117 deletions

View file

@ -105,7 +105,8 @@ python3 epson_print_conf.py -m XP-205 -a 192.168.1.87 -R 173,172
```python ```python
import epson_print_conf import epson_print_conf
printer = epson_print_conf.EpsonPrinter("XP-205", "192.168.1.87") printer = epson_print_conf.EpsonPrinter(
printer_model="XP-205", hostname="192.168.1.87")
if not printer.parm: if not printer.parm:
print("Unknown printer") print("Unknown printer")

View file

@ -12,6 +12,10 @@ import datetime
import time import time
import textwrap import textwrap
import ast import ast
import logging
import os
import yaml
from pathlib import Path
from pysnmp.hlapi.v1arch import * from pysnmp.hlapi.v1arch import *
from pyasn1.type.univ import OctetString as OctetStringType from pyasn1.type.univ import OctetString as OctetStringType
@ -296,8 +300,8 @@ class EpsonPrinter:
"Emulation 4": "1.3.6.1.2.1.43.15.1.1.5.1.4", "Emulation 4": "1.3.6.1.2.1.43.15.1.1.5.1.4",
"Emulation 5": "1.3.6.1.2.1.43.15.1.1.5.1.5", "Emulation 5": "1.3.6.1.2.1.43.15.1.1.5.1.5",
"Total printed pages": "1.3.6.1.2.1.43.10.2.1.4.1.1", "Total printed pages": "1.3.6.1.2.1.43.10.2.1.4.1.1",
"Total copies": "1.3.6.1.2.1.43.11.1.1.9.1.1", #"Total copies": "1.3.6.1.2.1.43.11.1.1.9.1.1",
"Serial number": "1.3.6.1.2.1.43.5.1.1.17.1", #"Serial number": "1.3.6.1.2.1.43.5.1.1.17.1",
"IP Address": "1.3.6.1.4.1.1248.1.1.3.1.4.19.1.3.1", "IP Address": "1.3.6.1.4.1.1248.1.1.3.1.4.19.1.3.1",
"URL_path": "1.3.6.1.4.1.1248.1.1.3.1.4.19.1.4.1", "URL_path": "1.3.6.1.4.1.1248.1.1.3.1.4.19.1.4.1",
"URL": "1.3.6.1.4.1.1248.1.1.3.1.4.46.1.2.1", "URL": "1.3.6.1.4.1.1248.1.1.3.1.4.46.1.2.1",
@ -328,11 +332,10 @@ class EpsonPrinter:
def __init__( def __init__(
self, self,
printer_model: printer_model: str = None,
str, hostname: str, hostname: str = None,
timeout: (None, float) = None, timeout: (None, float) = None,
retries: (None, float) = None, retries: (None, float) = None,
debug: bool = False,
dry_run: bool = False) -> None: dry_run: bool = False) -> None:
"""Initialise printer model.""" """Initialise printer model."""
for printer_name, printer_data in self.PRINTER_CONFIG.copy().items(): for printer_name, printer_data in self.PRINTER_CONFIG.copy().items():
@ -350,7 +353,6 @@ class EpsonPrinter:
self.hostname = hostname self.hostname = hostname
self.timeout = timeout self.timeout = timeout
self.retries = retries self.retries = retries
self.debug = debug
self.dry_run = dry_run self.dry_run = dry_run
if self.printer_model in self.valid_printers: if self.printer_model in self.valid_printers:
self.parm = self.PRINTER_CONFIG[self.printer_model] self.parm = self.PRINTER_CONFIG[self.printer_model]
@ -379,8 +381,7 @@ class EpsonPrinter:
if ret: if ret:
stat_set[method[4:]] = ret stat_set[method[4:]] = ret
else: else:
if self.debug: logging.error(f"No value for method '{method}'.")
print(f"No value for method '{method}'.")
return stat_set return stat_set
def caesar(self, key): def caesar(self, key):
@ -402,7 +403,7 @@ class EpsonPrinter:
oid = oid % 256 oid = oid % 256
if msb > 255: if msb > 255:
return None return None
if 'read_key' not in self.parm: if not self.parm or 'read_key' not in self.parm:
return None return None
return ( return (
f"{self.eeprom_link}" f"{self.eeprom_link}"
@ -430,7 +431,8 @@ class EpsonPrinter:
oid = oid % 256 oid = oid % 256
if msb > 255: if msb > 255:
return None return None
if ('write_key' not in self.parm if (
not self.parm or 'write_key' not in self.parm
or 'read_key' not in self.parm): or 'read_key' not in self.parm):
return None return None
write_op = ( write_op = (
@ -444,13 +446,15 @@ class EpsonPrinter:
f".{self.caesar(self.parm['write_key'])}" f".{self.caesar(self.parm['write_key'])}"
) )
if self.dry_run: if self.dry_run:
print("WRITE_DRY_RUN:", write_op) logging.debug("WRITE_DRY_RUN: %s", write_op)
return self.eeprom_oid_read_address(oid, label=label) return self.eeprom_oid_read_address(oid, label=label)
else: else:
return write_op return write_op
def snmp_mib(self, mib): def snmp_mib(self, mib):
"""Generic SNMP query, returning value of a MIB.""" """Generic SNMP query, returning value of a MIB."""
if not self.hostname:
return False
utt = UdpTransportTarget( utt = UdpTransportTarget(
(self.hostname, 161), (self.hostname, 161),
) )
@ -467,18 +471,16 @@ class EpsonPrinter:
for response in iterator: for response in iterator:
errorIndication, errorStatus, errorIndex, varBinds = response errorIndication, errorStatus, errorIndex, varBinds = response
if errorIndication: if errorIndication:
if self.debug: logging.error("snmp_mib error: %s", errorIndication)
print("snmp_mib error:", errorIndication)
if " timed out" in errorIndication: if " timed out" in errorIndication:
raise TimeoutError(errorIndication) raise TimeoutError(errorIndication)
return False return False
elif errorStatus: elif errorStatus:
if self.debug: logging.error(
print( 'snmp_mib PDU error: %s at %s',
'snmp_mib PDU error: %s at %s' % ( errorStatus.prettyPrint(),
errorStatus.prettyPrint(), errorIndex and varBinds[int(errorIndex) - 1][0] or '?'
errorIndex and varBinds[int(errorIndex) - 1][0] or '?') )
)
return False return False
else: else:
for varBind in varBinds: for varBind in varBinds:
@ -486,11 +488,9 @@ class EpsonPrinter:
return varBind[1].asOctets() return varBind[1].asOctets()
else: else:
return varBind[1].prettyPrint() return varBind[1].prettyPrint()
if self.debug: logging.error("snmp_mib value error: invalid multiple data")
print("snmp_mib value error: invalid multiple data")
return False return False
if self.debug: logging.error("snmp_mib value error: invalid data")
print("snmp_mib value error: invalid data")
return False return False
def read_eeprom( def read_eeprom(
@ -498,24 +498,21 @@ class EpsonPrinter:
oid: int, oid: int,
label: str = "unknown method") -> str: label: str = "unknown method") -> str:
"""Read a single byte from the Epson EEPROM address 'oid'.""" """Read a single byte from the Epson EEPROM address 'oid'."""
if self.debug: logging.debug(
print( f"EEPROM_DUMP {label}:\n"
f"EEPROM_DUMP {label}:\n" f" ADDRESS: "
f" ADDRESS: " f"{self.eeprom_oid_read_address(oid, label=label)}\n"
f"{self.eeprom_oid_read_address(oid, label=label)}\n" f" OID: {oid}={hex(oid)}"
f" OID: {oid}={hex(oid)}" )
)
response = self.snmp_mib( response = self.snmp_mib(
self.eeprom_oid_read_address(oid, label=label)) self.eeprom_oid_read_address(oid, label=label))
if not response: if not response:
return None return None
if self.debug: logging.debug(f" RESPONSE: {repr(response)}")
print(f" RESPONSE: {repr(response)}")
try: try:
response = re.findall(r"EE:[0-9A-F]{6}", response.decode())[0][3:] response = re.findall(r"EE:[0-9A-F]{6}", response.decode())[0][3:]
except (TypeError, IndexError): except (TypeError, IndexError):
if self.debug: logging.error(f"Invalid read key.")
print(f"Invalid read key.")
return None return None
chk_addr = response[0:4] chk_addr = response[0:4]
value = response[4:6] value = response[4:6]
@ -541,26 +538,24 @@ class EpsonPrinter:
value: int, value: int,
label: str = "unknown method") -> None: label: str = "unknown method") -> None:
"""Write a single byte 'value' to the Epson EEPROM address 'oid'.""" """Write a single byte 'value' to the Epson EEPROM address 'oid'."""
if "write_key" not in self.parm: if not self.parm or "write_key" not in self.parm:
if self.debug: logging.error(
print(f"Missing 'write_key' parameter in configuration.") f"Missing 'write_key' parameter in configuration.")
return False return False
if not self.dry_run and self.debug: if not self.dry_run:
response = self.read_eeprom(oid, label=label) response = self.read_eeprom(oid, label=label)
print(f"Previous value for {label}: {response}") logging.debug(f"Previous value for {label}: {response}")
oid_string = self.eeprom_oid_write_address(oid, value, label=label) oid_string = self.eeprom_oid_write_address(oid, value, label=label)
response = self.snmp_mib(oid_string) response = self.snmp_mib(oid_string)
if self.debug: logging.debug(
print( f"EEPROM_WRITE {label}:\n"
f"EEPROM_WRITE {label}:\n" f" ADDRESS: {oid_string}\n"
f" ADDRESS: {oid_string}\n" f" OID: {oid}={hex(oid)}"
f" OID: {oid}={hex(oid)}" )
) if response:
if self.debug and response: logging.debug(f" RESPONSE: {repr(response)}")
print(f" RESPONSE: {repr(response)}")
if not self.dry_run and response and not ":OK;" in repr(response): if not self.dry_run and response and not ":OK;" in repr(response):
if self.debug: logging.error("Write error")
print("Write error")
return False # ":NA;" is an error return False # ":NA;" is an error
return True return True
@ -630,8 +625,7 @@ class EpsonPrinter:
if len(data) < 16: if len(data) < 16:
return "invalid packet" return "invalid packet"
if data[:11] != b'\x00@BDC ST2\r\n': if data[:11] != b'\x00@BDC ST2\r\n':
if self.debug: logging.debug("Unaligned BDC ST2 header. Trying to fix...")
print("Unaligned BDC ST2 header. Trying to fix...")
start = data.find(b'BDC ST2\r\n') start = data.find(b'BDC ST2\r\n')
if start < 0: if start < 0:
return "printer status error (must start with BDC ST2...)" return "printer status error (must start with BDC ST2...)"
@ -650,13 +644,10 @@ class EpsonPrinter:
if len(item) != length: if len(item) != length:
return "invalid element length" return "invalid element length"
buf = buf[length:] buf = buf[length:]
logging.debug(
if self.debug: "Processing status - ftype %s, length: %s, item: %s",
print( hex(ftype), length, item.hex(' ')
"Processing status - ftype", hex(ftype), )
"length:", length, "item:", item.hex(' ')
)
if ftype == 0x01: # status if ftype == 0x01: # status
printer_status = item[0] printer_status = item[0]
status_text = "unknown" status_text = "unknown"
@ -890,13 +881,13 @@ class EpsonPrinter:
elif isinstance(result, str): elif isinstance(result, str):
sys_info[name] = result sys_info[name] = result
else: else:
if self.debug: logging.error(
print(f"No value for SNMP OID '{name}'. MIB: {oid}.") f"No value for SNMP OID '{name}'. MIB: {oid}.")
return sys_info return sys_info
def get_serial_number(self) -> str: def get_serial_number(self) -> str:
"""Return serial number of printer.""" """Return serial number of printer."""
if "serial_number" not in self.parm: if not self.parm or "serial_number" not in self.parm:
return None return None
return "".join( return "".join(
chr(int(value or "0", 16)) chr(int(value or "0", 16))
@ -906,7 +897,7 @@ class EpsonPrinter:
def get_stats(self, stat_name: str = None) -> str: def get_stats(self, stat_name: str = None) -> str:
"""Return printer statistics.""" """Return printer statistics."""
if "stats" not in self.parm: if not self.parm or "stats" not in self.parm:
return None return None
if stat_name and stat_name in self.parm["stats"].keys(): if stat_name and stat_name in self.parm["stats"].keys():
stat_info = {stat_name: self.parm["stats"][stat_name]} stat_info = {stat_name: self.parm["stats"][stat_name]}
@ -932,7 +923,7 @@ class EpsonPrinter:
def get_printer_head_id(self) -> str: # to be revised def get_printer_head_id(self) -> str: # to be revised
"""Return printer head id.""" """Return printer head id."""
if "printer_head_id_h" not in self.parm: if not self.parm or "printer_head_id_h" not in self.parm:
return None return None
if "printer_head_id_f" not in self.parm: if "printer_head_id_f" not in self.parm:
return None return None
@ -976,7 +967,7 @@ class EpsonPrinter:
def get_ink_replacement_counters(self) -> str: def get_ink_replacement_counters(self) -> str:
"""Return list of ink replacement counters.""" """Return list of ink replacement counters."""
if "ink_replacement_counters" not in self.parm: if not self.parm or "ink_replacement_counters" not in self.parm:
return None return None
irc = { irc = {
( (
@ -995,29 +986,23 @@ class EpsonPrinter:
def get_printer_status(self): def get_printer_status(self):
"""Return printer status and ink levels.""" """Return printer status and ink levels."""
result = self.snmp_mib(f"{self.eeprom_link}.115.116.1.0.1") address = f"{self.eeprom_link}.115.116.1.0.1"
logging.debug(f"PRINTER_STATUS:\n ADDRESS: {address}")
result = self.snmp_mib(address)
if not result: if not result:
return None return None
if self.debug: logging.debug(f" RESPONSE: {repr(result[:20])}...\n%s",
print( textwrap.fill(
textwrap.fill( result.hex(' '),
"PRINTER_STATUS: " + repr(result), initial_indent=" ",
initial_indent="", subsequent_indent=" ",
subsequent_indent=" ",
)
)
print(
textwrap.fill(
result.hex(' '),
initial_indent=" ",
subsequent_indent=" ",
)
) )
)
return self.status_parser(result) return self.status_parser(result)
def get_waste_ink_levels(self): def get_waste_ink_levels(self):
"""Return waste ink levels as a percentage.""" """Return waste ink levels as a percentage."""
if "main_waste" not in self.parm: if not self.parm or "main_waste" not in self.parm:
return None return None
results = {} results = {}
for waste_type in ["main_waste", "borderless_waste", "first_waste", for waste_type in ["main_waste", "borderless_waste", "first_waste",
@ -1035,7 +1020,7 @@ class EpsonPrinter:
def get_last_printer_fatal_errors(self) -> str: def get_last_printer_fatal_errors(self) -> str:
"""Return list of last printer fatal errors in hex format.""" """Return list of last printer fatal errors in hex format."""
if "last_printer_fatal_errors" not in self.parm: if not self.parm or "last_printer_fatal_errors" not in self.parm:
return None return None
return self.read_eeprom_many( return self.read_eeprom_many(
self.parm["last_printer_fatal_errors"], self.parm["last_printer_fatal_errors"],
@ -1053,14 +1038,12 @@ class EpsonPrinter:
response = [] response = []
for i in range(1, 9): for i in range(1, 9):
mib = f"{self.eeprom_link}.105.105.2.0.1." + str(i) mib = f"{self.eeprom_link}.105.105.2.0.1." + str(i)
if self.debug: logging.debug(
print( f"Cartridge {i}:\n"
f"Cartridge {i}:\n" f" ADDRESS: {mib}"
f" MIB: {mib}" )
)
cartridge = self.snmp_mib(mib) cartridge = self.snmp_mib(mib)
if self.debug: logging.debug(f" RESPONSE: {repr(cartridge)}")
print(f" RESPONSE: {repr(cartridge)}")
if not cartridge: if not cartridge:
continue continue
if cartridge.find(b'ii:NA;') > 0 or cartridge.find( if cartridge.find(b'ii:NA;') > 0 or cartridge.find(
@ -1069,22 +1052,30 @@ class EpsonPrinter:
response.append(cartridge[10:-2].decode().split(';')) response.append(cartridge[10:-2].decode().split(';'))
if not response: if not response:
return None return None
cartridges = [ try:
{i[0]: i[1] for i in map(lambda x: x.split(':'), j)} cartridges = [
for j in response {i[0]: i[1] for i in map(lambda x: x.split(':'), j)}
] for j in response
return [ ]
{ except Exception as e:
"ink_color": self.ink_color(int(i['IC1'], 16)), logging.debug(f" CARTRIDGE MAP ERROR: {e}")
"ink_quantity": int(i['IQT'], 16), return None
#"viq": int(i['VIQ'], 16), try:
#"uiq": int(i['UIQ'], 16), return [
"production_year": int(i['PDY'], 16) + ( {
1900 if int(i['PDY'], 16) > 80 else 2000), "ink_color": self.ink_color(int(i['IC1'], 16)),
"production_month": int(i['PDM'], 16), "ink_quantity": int(i['IQT'], 16),
"id": i['SID'] #"viq": int(i['VIQ'], 16),
} for i in cartridges #"uiq": int(i['UIQ'], 16),
] "production_year": int(i['PDY'], 16) + (
1900 if int(i['PDY'], 16) > 80 else 2000),
"production_month": int(i['PDM'], 16),
"id": i['SID']
} for i in cartridges
]
except Exception as e:
logging.error(f" CARTRIDGE VALUE ERROR: {e}")
return None
def dump_eeprom(self, start: int = 0, end: int = 0xFF): def dump_eeprom(self, start: int = 0, end: int = 0xFF):
""" """
@ -1099,6 +1090,8 @@ class EpsonPrinter:
""" """
Set waste ink levels to 0. Set waste ink levels to 0.
""" """
if not self.parm:
return None
if "raw_waste_reset" in self.parm: if "raw_waste_reset" in self.parm:
for oid, value in self.parm["raw_waste_reset"].items(): for oid, value in self.parm["raw_waste_reset"].items():
if not self.write_eeprom(oid, value, label="raw_waste_reset"): if not self.write_eeprom(oid, value, label="raw_waste_reset"):
@ -1119,14 +1112,17 @@ class EpsonPrinter:
def write_first_ti_received_time( def write_first_ti_received_time(
self, year: int, month: int, day: int) -> bool: self, year: int, month: int, day: int) -> bool:
"""Update first TI received time""" """Update first TI received time"""
if not self.parm:
return None
try: try:
msb = self.parm["stats"]["First TI received time"][0] msb = self.parm["stats"]["First TI received time"][0]
lsb = self.parm["stats"]["First TI received time"][1] lsb = self.parm["stats"]["First TI received time"][1]
except KeyError: except KeyError:
return False return False
n = (year - 2000) * 16 * 32 + 32 * month + day n = (year - 2000) * 16 * 32 + 32 * month + day
if self.debug: logging.debug(
print("FTRT:", hex(n // 256), hex(n % 256), "=", n // 256, n % 256) "FTRT: %s %s = %s %s",
hex(n // 256), hex(n % 256), n // 256, n % 256)
if not self.write_eeprom(msb, n // 256, label="First TI received time"): if not self.write_eeprom(msb, n // 256, label="First TI received time"):
return False return False
if not self.write_eeprom(lsb, n % 256, label="First TI received time"): if not self.write_eeprom(lsb, n % 256, label="First TI received time"):
@ -1134,28 +1130,29 @@ class EpsonPrinter:
return True return True
def list_known_keys(self): def list_known_keys(self):
known_keys = []
for model, chars in self.PRINTER_CONFIG.items(): for model, chars in self.PRINTER_CONFIG.items():
if 'write_key' in chars: if 'write_key' in chars:
print( known_keys.append(
f"{repr(model).rjust(25)}: " f"{repr(model).rjust(25)}: "
f"{repr(chars['read_key']).rjust(10)} - " f"{repr(chars['read_key']).rjust(10)} - "
f"{repr(chars['write_key'])[1:]}" f"{repr(chars['write_key'])[1:]}"
) )
else: else:
print( known_keys.append(
f"{repr(model).rjust(25)}: " f"{repr(model).rjust(25)}: "
f"{repr(chars['read_key']).rjust(10)} " f"{repr(chars['read_key']).rjust(10)} "
f"(unknown write key)" f"(unknown write key)"
) )
return known_keys
def brute_force_read_key( def brute_force_read_key(self, minimum: int = 0x00, maximum: int = 0xFF):
self, minimum: int = 0x00, maximum: int = 0xFF, debug=False
):
"""Brute force read_key for printer.""" """Brute force read_key for printer."""
if not self.parm:
return None
for x, y in itertools.permutations(range(minimum, maximum + 1), r=2): for x, y in itertools.permutations(range(minimum, maximum + 1), r=2):
self.parm['read_key'] = [x, y] self.parm['read_key'] = [x, y]
if debug: logging.error(f"Trying {self.parm['read_key']}...")
print(f"Trying {self.parm['read_key']}...")
val = self.read_eeprom(0x00, label="brute_force_read_key") val = self.read_eeprom(0x00, label="brute_force_read_key")
if val is None: if val is None:
continue continue
@ -1288,12 +1285,33 @@ if __name__ == "__main__":
) )
args = parser.parse_args() args = parser.parse_args()
logging_level = logging.INFO
logging_fmt = "%(message)s"
env_key=os.path.basename(Path(__file__).stem).upper() + '_LOG_CFG'
path = Path(__file__).stem + '-log.yaml'
value = os.getenv(env_key, None)
#print("Configuration file:", path, "| Environment variable:", env_key)
if value:
path = value
if os.path.exists(path):
with open(path, 'rt') as f:
config = yaml.safe_load(f.read())
try:
logging.config.dictConfig(config)
except Exception as e:
logging.basicConfig(level=logging_level, format=logging_fmt)
logging.critical("Cannot configure logs: %s. %s", e, path)
else:
logging.basicConfig(level=logging_level, format=logging_fmt)
if args.debug:
logging.getLogger().setLevel(logging.DEBUG)
printer = EpsonPrinter( printer = EpsonPrinter(
args.model, args.model,
args.hostname, args.hostname,
timeout=args.timeout, timeout=args.timeout,
retries=args.retries, retries=args.retries,
debug=args.debug,
dry_run=args.dry_run) dry_run=args.dry_run)
if not printer.parm: if not printer.parm:
print(textwrap.fill("Unknown printer. Valid printers: " + ", ".join( print(textwrap.fill("Unknown printer. Valid printers: " + ", ".join(
@ -1314,11 +1332,11 @@ if __name__ == "__main__":
print("Failed to reset waste ink levels. Check configuration.") print("Failed to reset waste ink levels. Check configuration.")
if args.detect_key: if args.detect_key:
print_opt = True print_opt = True
read_key = printer.brute_force_read_key(debug=True) read_key = printer.brute_force_read_key()
if read_key: if read_key:
print(f"read_key found: {read_key}") print(f"read_key found: {read_key}")
print("List of known keys:") print("List of known keys:")
printer.list_known_keys() print("\n".join(printer.list_known_keys()))
else: else:
print(f"Cannot found read_key") print(f"Cannot found read_key")
if args.ftrt: if args.ftrt: