py-kms/py-kms/pykms_Client.py
2020-09-24 22:44:15 +02:00

364 lines
21 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import re
import binascii
import datetime
import random
import socket
import string
import sys
import uuid
import logging
import os
import threading
import pykms_RpcBind, pykms_RpcRequest
from pykms_Filetimes import dt_to_filetime
from pykms_Dcerpc import MSRPCHeader, MSRPCBindNak, MSRPCRequestHeader, MSRPCRespHeader
from pykms_Base import kmsBase, UUID
from pykms_RequestV4 import kmsRequestV4
from pykms_RequestV5 import kmsRequestV5
from pykms_RequestV6 import kmsRequestV6
from pykms_RpcBase import rpcBase
from pykms_DB2Dict import kmsDB2Dict
from pykms_Misc import check_setup, check_other
from pykms_Misc import KmsParser, KmsParserException, KmsParserHelp
from pykms_Misc import kms_parser_get, kms_parser_check_optionals, kms_parser_check_positionals
from pykms_Format import justify, byterize, enco, deco, pretty_printer
clt_version = "py-kms_2020-07-01"
__license__ = "The Unlicense"
__author__ = u"Matteo an <SystemRage@protonmail.com>"
__url__ = "https://github.com/SystemRage/py-kms"
clt_description = "py-kms: KMS Client Emulator written in Python"
clt_config = {}
#---------------------------------------------------------------------------------------------------------------------------------------------------------
class client_thread(threading.Thread):
def __init__(self, name):
threading.Thread.__init__(self)
self.name = name
self.with_gui = False
def run(self):
clt_main(with_gui = self.with_gui)
#---------------------------------------------------------------------------------------------------------------------------------------------------------
loggerclt = logging.getLogger('logclt')
# 'help' string - 'default' value - 'dest' string.
clt_options = {
'ip' : {'help' : 'The IP address or hostname of the KMS server.', 'def' : "0.0.0.0", 'des' : "ip"},
'port' : {'help' : 'The port the KMS service is listening on. The default is \"1688\".', 'def' : 1688, 'des' : "port"},
'mode' : {'help' : 'Use this flag to manually specify a Microsoft product for testing the server. The default is \"Windows81\"',
'def' : "Windows8.1", 'des' : "mode",
'choi' : ["WindowsVista","Windows7","Windows8","Windows8.1","Windows10","Office2010","Office2013","Office2016","Office2019"]},
'cmid' : {'help' : 'Use this flag to manually specify a CMID to use. If no CMID is specified, a random CMID will be generated.',
'def' : None, 'des' : "cmid"},
'name' : {'help' : 'Use this flag to manually specify an ASCII machine name to use. If no machine name is specified a random one \
will be generated.', 'def' : None, 'des' : "machine"},
'time0' : {'help' : 'Set the maximum time to wait for a connection attempt to KMS server to succeed. Default is no timeout.',
'def' : None, 'des' : "timeoutidle"},
'time1' : {'help' : 'Set the maximum time to wait for sending / receiving a request / response. Default is no timeout.',
'def' : None, 'des' : "timeoutsndrcv"},
'asyncmsg' : {'help' : 'Prints pretty / logging messages asynchronously. Deactivated by default.',
'def' : False, 'des' : "asyncmsg"},
'llevel' : {'help' : 'Use this option to set a log level. The default is \"ERROR\".', 'def' : "ERROR", 'des' : "loglevel",
'choi' : ["CRITICAL", "ERROR", "WARNING", "INFO", "DEBUG", "MININFO"]},
'lfile' : {'help' : 'Use this option to set an output log file. The default is \"pykms_logclient.log\". \
Type \"STDOUT\" to view log info on stdout. Type \"FILESTDOUT\" to combine previous actions. \
Use \"STDOUTOFF\" to disable stdout messages. Use \"FILEOFF\" if you not want to create logfile.',
'def' : os.path.join('.', 'pykms_logclient.log'), 'des' : "logfile"},
'lsize' : {'help' : 'Use this flag to set a maximum size (in MB) to the output log file. Deactivated by default.', 'def' : 0, 'des': "logsize"},
}
def client_options():
client_parser = KmsParser(description = clt_description, epilog = 'version: ' + clt_version, add_help = False)
client_parser.add_argument("ip", nargs = "?", action = "store", default = clt_options['ip']['def'],
help = clt_options['ip']['help'], type = str)
client_parser.add_argument("port", nargs = "?", action = "store", default = clt_options['port']['def'],
help = clt_options['port']['help'], type = int)
client_parser.add_argument("-m", "--mode", dest = clt_options['mode']['des'], default = clt_options['mode']['def'],
choices = clt_options['mode']['choi'], help = clt_options['mode']['help'], type = str)
client_parser.add_argument("-c", "--cmid", dest = clt_options['cmid']['des'], default = clt_options['cmid']['def'],
help = clt_options['cmid']['help'], type = str)
client_parser.add_argument("-n", "--name", dest = clt_options['name']['des'] , default = clt_options['name']['def'],
help = clt_options['name']['help'], type = str)
client_parser.add_argument("-t0", "--timeout-idle", action = "store", dest = clt_options['time0']['des'], default = clt_options['time0']['def'],
help = clt_options['time0']['help'], type = str)
client_parser.add_argument("-t1", "--timeout-sndrcv", action = "store", dest = clt_options['time1']['des'], default = clt_options['time1']['def'],
help = clt_options['time1']['help'], type = str)
client_parser.add_argument("-y", "--async-msg", action = "store_true", dest = clt_options['asyncmsg']['des'],
default = clt_options['asyncmsg']['def'], help = clt_options['asyncmsg']['help'])
client_parser.add_argument("-V", "--loglevel", dest = clt_options['llevel']['des'], action = "store",
choices = clt_options['llevel']['choi'], default = clt_options['llevel']['def'],
help = clt_options['llevel']['help'], type = str)
client_parser.add_argument("-F", "--logfile", nargs = "+", action = "store", dest = clt_options['lfile']['des'],
default = clt_options['lfile']['def'], help = clt_options['lfile']['help'], type = str)
client_parser.add_argument("-S", "--logsize", dest = clt_options['lsize']['des'], action = "store",
default = clt_options['lsize']['def'], help = clt_options['lsize']['help'], type = float)
client_parser.add_argument("-h", "--help", action = "help", help = "show this help message and exit")
try:
userarg = sys.argv[1:]
# Run help.
if any(arg in ["-h", "--help"] for arg in userarg):
KmsParserHelp().printer(parsers = [client_parser])
# Get stored arguments.
pykmsclt_zeroarg, pykmsclt_onearg = kms_parser_get(client_parser)
# Update pykms options for dict client config.
kms_parser_check_optionals(userarg, pykmsclt_zeroarg, pykmsclt_onearg, msg = 'optional py-kms client',
exclude_opt_len = ['-F', '--logfile'])
kms_parser_check_positionals(clt_config, client_parser.parse_args, msg = 'positional py-kms client')
except KmsParserException as e:
pretty_printer(put_text = "{reverse}{red}{bold}%s. Exiting...{end}" %str(e), to_exit = True, where = "clt")
def client_check():
# Setup and some checks.
check_setup(clt_config, clt_options, loggerclt, where = "clt")
# Check cmid.
if clt_config['cmid'] is not None:
try:
uuid.UUID(clt_config['cmid'])
except ValueError:
pretty_printer(log_obj = loggerclt.error, to_exit = True, where = "clt",
put_text = "{reverse}{red}{bold}argument `-c/--cmid`: invalid with: '%s'. Exiting...{end}" %clt_config['cmid'])
# Check machine name.
if clt_config['machine'] is not None:
try:
clt_config['machine'].encode('utf-16le')
if len(clt_config['machine']) < 2:
pretty_printer(log_obj = loggerclt.error, to_exit = True, where = "clt",
put_text = "{reverse}{red}{bold}argument `-n/--name`: too short (required 2 - 63 chars). Exiting...{end}")
elif len(clt_config['machine']) > 63:
pretty_printer(log_obj = loggerclt.error, to_exit = True, where = "clt",
put_text = "{reverse}{red}{bold}argument `-n/--name`: too long (required 2 - 63 chars). Exiting...{end}")
except UnicodeEncodeError:
pretty_printer(log_obj = loggerclt.error, to_exit = True, where = "clt",
put_text = "{reverse}{red}{bold}argument `-n/--name`: invalid with: '%s'. Exiting...{end}" %clt_config['machine'])
clt_config['call_id'] = 1
# Check other specific client options.
opts = [('timeoutidle', '-t0/--timeout-idle'),
('timeoutsndrcv', '-t1/--timeout-sndrcv')]
check_other(clt_config, opts, loggerclt, where = 'clt')
def client_update():
kmsdb = kmsDB2Dict()
appitems = kmsdb[2]
for appitem in appitems:
kmsitems = appitem['KmsItems']
for kmsitem in kmsitems:
name = re.sub('\(.*\)', '', kmsitem['DisplayName']).replace('2015', '').replace(' ', '')
if name == clt_config['mode']:
skuitems = kmsitem['SkuItems']
# Select 'Enterprise' for Windows or 'Professional Plus' for Office.
for skuitem in skuitems:
if skuitem['DisplayName'].replace(' ','') == name + 'Enterprise' or \
skuitem['DisplayName'].replace(' ','') == name[:6] + 'ProfessionalPlus' + name[6:]:
clt_config['KMSClientSkuID'] = skuitem['Id']
clt_config['RequiredClientCount'] = int(kmsitem['NCountPolicy'])
clt_config['KMSProtocolMajorVersion'] = int(float(kmsitem['DefaultKmsProtocol']))
clt_config['KMSProtocolMinorVersion'] = 0
clt_config['KMSClientLicenseStatus'] = 2
clt_config['KMSClientAppID'] = appitem['Id']
clt_config['KMSClientKMSCountedID'] = kmsitem['Id']
break
def client_connect():
loggerclt.info("Connecting to %s on port %d" % (clt_config['ip'], clt_config['port']))
try:
clt_sock = socket.create_connection((clt_config['ip'], clt_config['port']), timeout = clt_config['timeoutidle'])
loggerclt.info("Connection successful !")
clt_sock.settimeout(clt_config['timeoutsndrcv'])
except socket.timeout:
pretty_printer(log_obj = loggerclt.error, to_exit = True, where = "clt",
put_text = "{reverse}{red}{bold}Client connection timed out. Exiting...{end}")
except (socket.gaierror, socket.error) as e:
pretty_printer(log_obj = loggerclt.error, to_exit = True, where = "clt",
put_text = "{reverse}{red}{bold}Connection failed '%s:%d': %s. Exiting...{end}" %(clt_config['ip'],
clt_config['port'],
str(e)))
return clt_sock
def client_create(clt_sock):
binder = pykms_RpcBind.handler(None, clt_config)
RPC_Bind = enco(str(binder.generateRequest()), 'latin-1')
try:
loggerclt.info("Sending RPC bind request...")
pretty_printer(num_text = [-1, 1], where = "clt")
clt_sock.send(RPC_Bind)
except socket.error as e:
pretty_printer(log_obj = loggerclt.error, to_exit = True, where = "clt",
put_text = "{reverse}{red}{bold}While sending: %s. Exiting...{end}" %str(e))
try:
bindResponse = clt_sock.recv(1024)
if bindResponse == '' or not bindResponse:
pretty_printer(log_obj = loggerclt.warning, to_exit = True, where = "clt",
put_text = "{reverse}{yellow}{bold}No data received. Exiting...{end}")
pretty_printer(num_text = [-4, 7], where = "clt")
except socket.error as e:
pretty_printer(log_obj = loggerclt.error, to_exit = True, where = "clt",
put_text = "{reverse}{red}{bold}While receiving: %s. Exiting...{end}" %str(e))
packetType = MSRPCHeader(bindResponse)['type']
if packetType == rpcBase.packetType['bindAck']:
loggerclt.info("RPC bind acknowledged.")
pretty_printer(num_text = 8, where = "clt")
kmsRequest = createKmsRequest()
requester = pykms_RpcRequest.handler(kmsRequest, clt_config)
try:
loggerclt.info("Sending RPC activation request...")
RPC_Actv = enco(str(requester.generateRequest()), 'latin-1')
pretty_printer(num_text = [-1, 12], where = "clt")
clt_sock.send(RPC_Actv)
except socket.error as e:
pretty_printer(log_obj = loggerclt.error, to_exit = True, where = "clt",
put_text = "{reverse}{red}{bold}While sending: %s. Exiting...{end}" %str(e))
try:
response = clt_sock.recv(1024)
pretty_printer(num_text = [-4, 20], where = "clt")
except socket.error as e:
pretty_printer(log_obj = loggerclt.error, to_exit = True, where = "clt",
put_text = "{reverse}{red}{bold}While receiving: %s. Exiting...{end}" %str(e))
loggerclt.debug("Response: \n%s\n" % justify(deco(binascii.b2a_hex(response), 'latin-1')))
parsed = MSRPCRespHeader(response)
kmsData = readKmsResponse(parsed['pduData'], kmsRequest, clt_config)
kmsResp = kmsData['response']
try:
hwid = kmsData['hwid']
loggerclt.info("KMS Host HWID: %s" % deco(binascii.b2a_hex(enco(hwid, 'latin-1')).upper(), 'utf-8'))
except KeyError:
pass
loggerclt.info("KMS Host ePID: %s" % kmsResp['kmsEpid'].encode('utf-8').decode('utf-16le'))
loggerclt.info("KMS Host Current Client Count: %s" % kmsResp['currentClientCount'])
loggerclt.info("KMS VL Activation Interval: %s" % kmsResp['vLActivationInterval'])
loggerclt.info("KMS VL Renewal Interval: %s" % kmsResp['vLRenewalInterval'])
if clt_config['loglevel'] == 'MININFO':
loggerclt.mininfo("", extra = {'host': str(clt_sock.getpeername()),
'status' : kmsBase.licenseStates[requester.srv_config['KMSClientLicenseStatus']],
'product' : clt_config["mode"]})
pretty_printer(num_text = 21, where = "clt")
elif packetType == rpcBase.packetType['bindNak']:
loggerclt.info(justify(MSRPCBindNak(bindResponse).dump(print_to_stdout = False)))
sys.exit(0)
else:
pretty_printer(log_obj = loggerclt.warning, to_exit = True, where = "clt",
put_text = "{reverse}{magenta}{bold}Something went wrong. Exiting...{end}")
def clt_main(with_gui = False):
try:
if not with_gui:
# Parse options.
client_options()
# Check options.
client_check()
# Update Config.
client_update()
# Create and run client.
clt_sock = client_connect()
client_create(clt_sock)
except (KeyboardInterrupt, SystemExit):
try:
clt_sock.shutdown(socket.SHUT_RDWR)
clt_sock.close()
except:
pass
def createKmsRequestBase():
requestDict = kmsBase.kmsRequestStruct()
requestDict['versionMinor'] = clt_config['KMSProtocolMinorVersion']
requestDict['versionMajor'] = clt_config['KMSProtocolMajorVersion']
requestDict['isClientVm'] = 0
requestDict['licenseStatus'] = clt_config['KMSClientLicenseStatus']
requestDict['graceTime'] = 43200
requestDict['applicationId'] = UUID(uuid.UUID(clt_config['KMSClientAppID']).bytes_le)
requestDict['skuId'] = UUID(uuid.UUID(clt_config['KMSClientSkuID']).bytes_le)
requestDict['kmsCountedId'] = UUID(uuid.UUID(clt_config['KMSClientKMSCountedID']).bytes_le)
requestDict['clientMachineId'] = UUID(uuid.UUID(clt_config['cmid']).bytes_le if (clt_config['cmid'] is not None) else uuid.uuid4().bytes_le)
requestDict['previousClientMachineId'] = '\0' * 16 # I'm pretty sure this is supposed to be a null UUID.
requestDict['requiredClientCount'] = clt_config['RequiredClientCount']
requestDict['requestTime'] = dt_to_filetime(datetime.datetime.utcnow())
requestDict['machineName'] = (clt_config['machine'] if (clt_config['machine'] is not None) else
''.join(random.choice(string.ascii_letters + string.digits) for i in range(random.randint(2,63)))).encode('utf-16le')
requestDict['mnPad'] = '\0'.encode('utf-16le') * (63 - len(requestDict['machineName'].decode('utf-16le')))
# Debug Stuff
pretty_printer(num_text = 9, where = "clt")
requestDict = byterize(requestDict)
loggerclt.debug("Request Base Dictionary: \n%s\n" % justify(requestDict.dump(print_to_stdout = False)))
return requestDict
def createKmsRequest():
# Update the call ID
clt_config['call_id'] += 1
# KMS Protocol Major Version
if clt_config['KMSProtocolMajorVersion'] == 4:
handler = kmsRequestV4(None, clt_config)
elif clt_config['KMSProtocolMajorVersion'] == 5:
handler = kmsRequestV5(None, clt_config)
elif clt_config['KMSProtocolMajorVersion'] == 6:
handler = kmsRequestV6(None, clt_config)
else:
return None
requestBase = createKmsRequestBase()
return handler.generateRequest(requestBase)
def readKmsResponse(data, request, clt_config):
if clt_config['KMSProtocolMajorVersion'] == 4:
loggerclt.info("Received V4 response")
response = readKmsResponseV4(data, request)
elif clt_config['KMSProtocolMajorVersion'] == 5:
loggerclt.info("Received V5 response")
response = readKmsResponseV5(data)
elif clt_config['KMSProtocolMajorVersion'] == 6:
loggerclt.info("Received V6 response")
response = readKmsResponseV6(data)
else:
loggerclt.info("Unhandled response version: %d.%d" % (clt_config['KMSProtocolMajorVersion'], clt_config['KMSProtocolMinorVersion']))
loggerclt.info("I'm not even sure how this happened...")
return response
def readKmsResponseV4(data, request):
response = kmsRequestV4.ResponseV4(data)
hashed = kmsRequestV4(data, clt_config).generateHash(bytearray(enco(str(response['response']) , 'latin-1')))
if deco(hashed, 'latin-1') == response['hash']:
loggerclt.info("Response Hash has expected value !")
return response
def readKmsResponseV5(data):
response = kmsRequestV5.ResponseV5(data)
decrypted = kmsRequestV5(data, clt_config).decryptResponse(response)
return decrypted
def readKmsResponseV6(data):
response = kmsRequestV6.ResponseV5(data)
decrypted = kmsRequestV6(data, clt_config).decryptResponse(response)
message = decrypted['message']
return message
if __name__ == "__main__":
clt_main(with_gui = False)