#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# (c) B.Kerler 2018-2024 under GPLv3 license
# If you use my code, make sure you refer to my name
#
# !!!!! If you use this code in commercial products, your product is automatically
# GPLv3 and has to be open sourced under GPLv3 as well. !!!!!
import binascii
import json
import os.path
import platform
from binascii import hexlify
from queue import Queue
from threading import Thread
from edlclient.Library.Modules.nothing import nothing
from edlclient.Library.gpt import gpt, AB_FLAG_OFFSET, AB_PARTITION_ATTR_SLOT_ACTIVE
from edlclient.Library.sparse import QCSparse
from edlclient.Library.utils import *
from edlclient.Library.utils import progress
rq = Queue()
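# writedata() is the file-writer side of the read pipeline: cmd_read() pushes USB chunks
# into the module-level queue "rq" and this worker appends them to the output file until
# a None sentinel is queued.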
def writedata(filename, rq):
pos = 0
with open(filename, "wb") as wf:
while True:
data = rq.get()
if data is None:
break
pos += len(data)
wf.write(data)
rq.task_done()
class response:
resp = False
data = b""
error = ""
log = None
def __init__(self, resp=False, data=b"", error: str = "", log: dict = ""):
self.resp = resp
self.data = data
self.error = error
self.log = log
try:
from edlclient.Library.Modules.init import modules
except ImportError as e:
pass
class nand_partition:
partentries = {}
def __init__(self, parent, printer=None):
if printer is None:
self.printer = print
else:
self.printer = printer
self.partentries = {}
self.partitiontblsector = None
self.parent = parent
self.storage_info = {}
self.totalsectors = None
def parse(self, partdata):
self.partentries = {}
class partf:
sector = 0
sectors = 0
name = ""
attr1 = 0
attr2 = 0
attr3 = 0
which_flash = 0
magic1, magic2, version, numparts = unpack("<IIII", partdata[0:0x10])
if magic1 == 0x55EE73AA and magic2 == 0xE35EBDDB:
data = partdata[0x10:]
for i in range(numparts):
name, offset, length, attr1, attr2, attr3, which_flash = unpack("16sIIBBBB",
data[i * 0x1C:(i * 0x1C) + 0x1C])
np = partf()
if name[:2] == b"0:":
name = name[2:]
np.name = name.rstrip(b"\x00").decode('utf-8').lower()
if self.parent.cfg.block_size == 0:
np.sector = offset
np.sectors = length
else:
np.sector = offset * self.parent.cfg.block_size // self.parent.cfg.SECTOR_SIZE_IN_BYTES
np.sectors = (length & 0xFFFF) * self.parent.cfg.block_size // self.parent.cfg.SECTOR_SIZE_IN_BYTES
np.attr1 = attr1
np.attr2 = attr2
np.attr3 = attr3
np.which_flash = which_flash
self.partentries[np.name] = np
if self.parent.cfg.block_size != 0 and self.parent.cfg.total_blocks != 0:
self.totalsectors = (self.parent.cfg.block_size // self.parent.cfg.SECTOR_SIZE_IN_BYTES) * \
self.parent.cfg.total_blocks
else:
sectors = 0
for part in self.partentries:
if self.partentries[part].sector >= sectors:
sectors += self.partentries[part].sectors
self.totalsectors = sectors
return True
return False
def print(self):
self.printer("Name Offset\t\tLength\t\tAttr\t\t\tFlash")
self.printer("-------------------------------------------------------------")
for selpart in self.partentries:
partition = self.partentries[selpart]
name = partition.name
for i in range(0x10 - len(partition.name)):
name += " "
offset = partition.sector * self.parent.cfg.SECTOR_SIZE_IN_BYTES
length = partition.sectors * self.parent.cfg.SECTOR_SIZE_IN_BYTES
attr1 = partition.attr1
attr2 = partition.attr2
attr3 = partition.attr3
which_flash = partition.which_flash
self.printer(
f"{name}\t%08X\t%08X\t{hex(attr1)}/{hex(attr2)}/{hex(attr3)}\t{which_flash}" % (offset, length))
def writefile(wf, q, stop):
while True:
data = q.get()
if len(data) > 0:
wf.write(data)
q.task_done()
if stop() and q.empty():
break
class asyncwriter:
def __init__(self, wf):
self.writequeue = Queue()
self.worker = Thread(target=writefile, args=(wf, self.writequeue, lambda: self.stopthreads,), daemon=True)
self.stopthreads = False
self.worker.start()
def write(self, data):
self.writequeue.put_nowait(data)
def stop(self):
self.stopthreads = True
self.writequeue.join()
class firehose(metaclass=LogBase):
class cfg:
TargetName = ""
Version = ""
ZLPAwareHost = 1
SkipStorageInit = 0
SkipWrite = 0
MaxPayloadSizeToTargetInBytes = 1048576
MaxPayloadSizeFromTargetInBytes = 8192
MaxXMLSizeInBytes = 4096
bit64 = True
total_blocks = 0
num_physical = 0
block_size = 0
SECTOR_SIZE_IN_BYTES = 0
MemoryName = "eMMC"
prod_name = "Unknown"
maxlun = 99
def __init__(self, cdc, xml, cfg, loglevel, devicemodel, serial, skipresponse, luns, args):
self.cdc = cdc
self.lasterror = b""
self.loglevel = loglevel
self.args = args
self.xml = xml
self.cfg = cfg
self.prog = 0
self.progtime = 0
self.progpos = 0
self.pk = None
self.modules = None
self.serial = serial
self.devicemodel = devicemodel
self.skipresponse = skipresponse
self.luns = luns
self.supported_functions = []
self.lunsizes = {}
self.info = self.__logger.info
self.error = self.__logger.error
self.debug = self.__logger.debug
self.warning = self.__logger.warning
self.__logger.setLevel(loglevel)
if loglevel == logging.DEBUG:
logfilename = "log.txt"
fh = logging.FileHandler(logfilename)
self.__logger.addHandler(fh)
self.nandparttbl = None
self.nandpart = nand_partition(parent=self, printer=print)
def detect_partition(self, arguments, partitionname, send_full=False):
if arguments is None:
arguments = {
"--gpt-num-part-entries": 0,
"--gpt-part-entry-size": 0,
"--gpt-part-entry-start-lba": 0
}
fpartitions = {}
for lun in self.luns:
lunname = "Lun" + str(lun)
fpartitions[lunname] = []
data, guid_gpt = self.get_gpt(lun, int(arguments["--gpt-num-part-entries"]),
int(arguments["--gpt-part-entry-size"]),
int(arguments["--gpt-part-entry-start-lba"]))
if guid_gpt is None:
break
else:
if partitionname in guid_gpt.partentries:
return [True, lun, data, guid_gpt] if send_full else [True, lun,
guid_gpt.partentries[partitionname]]
for part in guid_gpt.partentries:
fpartitions[lunname].append(part)
return [False, fpartitions]
def getstatus(self, resp):
if "value" in resp:
value = resp["value"]
if value == "ACK" or value == "true":
return True
else:
return False
return True
def decoder(self, data):
if isinstance(data, bytes) or isinstance(data, bytearray):
if data[:5] == b"<?xml":
try:
rdata = ""
for line in data.split(b"\n"):
try:
rdata += line.decode('utf-8') + "\n"
except Exception as err:
self.debug(str(err))
rdata += hexlify(line).decode('utf-8') + "\n"
return rdata
except Exception as err: # pylint: disable=broad-except
self.debug(str(err))
pass
return data
def xmlsend(self, data, skipresponse=False) -> response:
self.cdc.flush()
self.cdc.xmlread = True
if isinstance(data, bytes) or isinstance(data, bytearray):
self.cdc.write(data[:self.cfg.MaxXMLSizeInBytes])
else:
self.cdc.write(bytes(data, 'utf-8')[:self.cfg.MaxXMLSizeInBytes])
rdata = bytearray()
counter = 0
timeout = 3
if not skipresponse:
while b"<response value" not in rdata:
try:
tmp = self.cdc.read(timeout=None)
if tmp == b"" in rdata:
counter += 1
time.sleep(0.05)
if counter > timeout:
break
rdata += tmp
except Exception as err:
self.error(err)
return response(resp=False, error=str(err))
try:
if b"raw hex token" in rdata:
rdata = rdata
try:
resp = self.xml.getresponse(rdata)
status = self.getstatus(resp)
if "rawmode" in resp:
if resp["rawmode"] == "false":
if status:
log = self.xml.getlog(rdata)
return response(resp=status, data=rdata, log=log)
else:
error = self.xml.getlog(rdata)
return response(resp=status, error=error, data=resp, log=error)
else:
if status:
if b"log value=" in rdata:
log = self.xml.getlog(rdata)
return response(resp=resp, data=rdata, log=log)
return response(resp=status, data=rdata)
except Exception as e: # pylint: disable=broad-except
rdata = bytes(self.decoder(rdata), 'utf-8')
resp = self.xml.getresponse(rdata)
status = self.getstatus(resp)
if status:
return response(resp=True, data=resp)
else:
error = ""
if b"<log value" in rdata:
error = self.xml.getlog(rdata)
return response(resp=False, error=error, data=resp)
except Exception as err:
self.debug(str(err))
if isinstance(rdata, bytes) or isinstance(rdata, bytearray):
try:
self.debug("Error on getting xml response:" + rdata.decode('utf-8'))
except Exception as err:
self.debug("Error on getting xml response:" + hexlify(rdata).decode('utf-8') +
", Error: " + str(err))
elif isinstance(rdata, str):
self.debug("Error on getting xml response:" + rdata)
return response(resp=False, error=rdata)
else:
return response(resp=True, data=rdata)
def cmd_reset(self, mode="reset"):
if mode is None:
mode = "reset"
data = f'<?xml version="1.0" ?><data><power value="{mode}"/></data>'
val = self.xmlsend(data)
try:
v = None
while v != b'':
v = self.cdc.read(timeout=None)
if v != b'':
resp = self.xml.getlog(v)[0]
else:
break
print(resp)
except Exception as err:
self.error(str(err))
pass
if val.resp:
self.info("Reset succeeded.")
return True
else:
self.error("Reset failed: " + val.error)
return False
def cmd_xml(self, filename):
with open(filename, 'rb') as rf:
data = rf.read()
val = self.xmlsend(data)
if val.resp:
self.info("Command succeeded." + str(val.data))
return val.data
else:
self.error("Command failed:" + str(val.error))
return val.error
def cmd_nop(self):
data = '<?xml version="1.0" ?><data><nop /></data>'
resp = self.xmlsend(data, True)
self.debug(resp.data.hex())
info = b""
tmp = None
while tmp != b"":
tmp = self.cdc.read(timeout=None)
if tmp == b"":
break
info += tmp
if info != b"":
self.info("Nop succeeded.")
return self.xml.getlog(info)
else:
self.error("Nop failed.")
return False
def cmd_getsha256digest(self, physical_partition_number, start_sector, num_partition_sectors):
data = f"<?xml version=\"1.0\" ?><data><getsha256digest" + \
f" SECTOR_SIZE_IN_BYTES=\"{self.cfg.SECTOR_SIZE_IN_BYTES}\"" + \
f" num_partition_sectors=\"{num_partition_sectors}\"" + \
f" physical_partition_number=\"{physical_partition_number}\"" + \
f" start_sector=\"{start_sector}\"/>\n</data>"
val = self.xmlsend(data)
if val.resp:
res = self.xml.getlog(val.data)
for line in res:
self.info(line)
if "Digest " in res:
return res.split("Digest ")[1]
else:
return res
else:
self.error("GetSha256Digest failed: " + val.error)
return False
def cmd_setbootablestoragedrive(self, partition_number):
data = f"<?xml version=\"1.0\" ?><data>\n<setbootablestoragedrive value=\"{str(partition_number)}\" /></data>"
val = self.xmlsend(data)
if val.resp:
self.info("Setbootablestoragedrive succeeded.")
return True
else:
self.error("Setbootablestoragedrive failed: " + val.error)
return False
def cmd_send(self, content, responsexml=True):
data = f"<?xml version=\"1.0\" ?><data>\n<{content} /></data>"
if responsexml:
val = self.xmlsend(data)
if val.resp:
return val.data
else:
self.error(f"{content} failed.")
self.error(f"{val.error}")
return val.error
else:
self.xmlsend(data, True)
return True
def cmd_patch(self, physical_partition_number, start_sector, byte_offset, value, size_in_bytes, display=True):
"""
<patch SECTOR_SIZE_IN_BYTES="512" byte_offset="16" filename="DISK" physical_partition_number="0"
size_in_bytes="4" start_sector="NUM_DISK_SECTORS-1." value="0" what="Zero Out Header CRC in Backup Header."/>
"""
data = f"<?xml version=\"1.0\" ?><data>\n" + \
f"<patch SECTOR_SIZE_IN_BYTES=\"{self.cfg.SECTOR_SIZE_IN_BYTES}\"" + \
f" byte_offset=\"{byte_offset}\"" + \
f" filename=\"DISK\"" + \
f" physical_partition_number=\"{physical_partition_number}\"" + \
f" size_in_bytes=\"{size_in_bytes}\" " + \
f" start_sector=\"{start_sector}\" " + \
f" value=\"{value}\" "
if self.modules is not None:
data += self.modules.addpatch()
data += f"/>\n</data>"
rsp = self.xmlsend(data)
if rsp.resp:
if display:
self.info(f"Patch:\n--------------------\n")
self.info(rsp.data)
return True
else:
self.error(f"Error:{rsp.error}")
return False
def wait_for_data(self):
tmp = bytearray()
timeout = 0
while b'response value' not in tmp:
res = self.cdc.read(timeout=None)
if res == b'':
timeout += 1
if timeout == 4:
break
time.sleep(0.1)
tmp += res
return tmp
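# After a raw-mode transfer the loader sends trailing <log>/<response> XML; wait_for_data()
# collects it so the program/erase/read commands below can check for ACK vs. NAK.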
def cmd_program(self, physical_partition_number, start_sector, filename, display=True):
total = os.stat(filename).st_size
sparse = QCSparse(filename, self.loglevel)
sparseformat = False
if sparse.readheader():
sparseformat = True
total = sparse.getsize()
bytestowrite = total
progbar = progress(self.cfg.SECTOR_SIZE_IN_BYTES)
with open(filename, "rb") as rf:
# Make sure we fill data up to the sector size
num_partition_sectors = total // self.cfg.SECTOR_SIZE_IN_BYTES
if (total % self.cfg.SECTOR_SIZE_IN_BYTES) != 0:
num_partition_sectors += 1
if display:
self.info(f"\nWriting to physical partition {str(physical_partition_number)}, " +
f"sector {str(start_sector)}, sectors {str(num_partition_sectors)}")
data = f"<?xml version=\"1.0\" ?><data>\n" + \
f"<program SECTOR_SIZE_IN_BYTES=\"{self.cfg.SECTOR_SIZE_IN_BYTES}\"" + \
f" num_partition_sectors=\"{num_partition_sectors}\"" + \
f" physical_partition_number=\"{physical_partition_number}\"" + \
f" start_sector=\"{start_sector}\" "
if self.modules is not None:
data += self.modules.addprogram()
data += f"/>\n</data>"
rsp = self.xmlsend(data, self.skipresponse)
progbar.show_progress(prefix="Write", pos=0, total=total, display=display)
if rsp.resp:
while bytestowrite > 0:
wlen = min(bytestowrite, self.cfg.MaxPayloadSizeToTargetInBytes)
if sparseformat:
wdata = sparse.read(wlen)
else:
wdata = rf.read(wlen)
bytestowrite -= wlen
if wlen % self.cfg.SECTOR_SIZE_IN_BYTES != 0:
filllen = (wlen // self.cfg.SECTOR_SIZE_IN_BYTES * self.cfg.SECTOR_SIZE_IN_BYTES) + \
self.cfg.SECTOR_SIZE_IN_BYTES
wdata += b"\x00" * (filllen - wlen)
self.cdc.write(wdata)
progbar.show_progress(prefix="Write", pos=total - bytestowrite, total=total, display=display)
self.cdc.write(b'')
# time.sleep(0.2)
wd = self.wait_for_data()
log = self.xml.getlog(wd)
rsp = self.xml.getresponse(wd)
if "value" in rsp:
if rsp["value"] != "ACK":
self.error(f"Error:")
for line in log:
self.error(line)
return False
else:
self.error(f"Error:{rsp}")
return False
return True
def cmd_program_buffer(self, physical_partition_number, start_sector, wfdata, display=True):
bytestowrite = len(wfdata)
total = bytestowrite
# Make sure we fill data up to the sector size
num_partition_sectors = bytestowrite // self.cfg.SECTOR_SIZE_IN_BYTES
if (bytestowrite % self.cfg.SECTOR_SIZE_IN_BYTES) != 0:
num_partition_sectors += 1
if display:
self.info(f"\nWriting to physical partition {str(physical_partition_number)}, " +
f"sector {str(start_sector)}, sectors {str(num_partition_sectors)}")
data = f"<?xml version=\"1.0\" ?><data>\n" + \
f"<program SECTOR_SIZE_IN_BYTES=\"{self.cfg.SECTOR_SIZE_IN_BYTES}\"" + \
f" num_partition_sectors=\"{num_partition_sectors}\"" + \
f" physical_partition_number=\"{physical_partition_number}\"" + \
f" start_sector=\"{start_sector}\" "
if self.modules is not None:
data += self.modules.addprogram()
data += f"/>\n</data>"
rsp = self.xmlsend(data, self.skipresponse)
progbar = progress(self.cfg.SECTOR_SIZE_IN_BYTES)
progbar.show_progress(prefix="Write", pos=0, total=total, display=display)
if rsp.resp:
pos = 0
while bytestowrite > 0:
wlen = min(bytestowrite, self.cfg.MaxPayloadSizeToTargetInBytes)
wrdata = wfdata[pos:pos + wlen]
pos += wlen
bytestowrite -= wlen
if wlen % self.cfg.SECTOR_SIZE_IN_BYTES != 0:
filllen = (wlen // self.cfg.SECTOR_SIZE_IN_BYTES * self.cfg.SECTOR_SIZE_IN_BYTES) + \
self.cfg.SECTOR_SIZE_IN_BYTES
wrdata += b"\x00" * (filllen - wlen)
self.cdc.write(wrdata)
progbar.show_progress(prefix="Write", pos=total - bytestowrite, total=total, display=display)
self.cdc.write(b'')
# time.sleep(0.2)
wd = self.wait_for_data()
log = self.xml.getlog(wd)
rsp = self.xml.getresponse(wd)
if "value" in rsp:
if rsp["value"] != "ACK":
self.error(f"Error:")
for line in log:
self.error(line)
return False
else:
self.error(f"Error:{rsp}")
return False
else:
self.error(f"Error:{rsp.error}")
return True
def cmd_erase(self, physical_partition_number, start_sector, num_partition_sectors, display=True):
if display:
self.info(f"\nErasing from physical partition {str(physical_partition_number)}, " +
f"sector {str(start_sector)}, sectors {str(num_partition_sectors)}")
data = f"<?xml version=\"1.0\" ?><data>\n" + \
f"<program SECTOR_SIZE_IN_BYTES=\"{self.cfg.SECTOR_SIZE_IN_BYTES}\"" + \
f" num_partition_sectors=\"{num_partition_sectors}\"" + \
f" physical_partition_number=\"{physical_partition_number}\"" + \
f" start_sector=\"{start_sector}\" "
if self.modules is not None:
data += self.modules.addprogram()
data += f"/>\n</data>"
rsp = self.xmlsend(data, self.skipresponse)
empty = b"\x00" * self.cfg.MaxPayloadSizeToTargetInBytes
pos = 0
bytestowrite = self.cfg.SECTOR_SIZE_IN_BYTES * num_partition_sectors
total = self.cfg.SECTOR_SIZE_IN_BYTES * num_partition_sectors
progbar = progress(self.cfg.MaxPayloadSizeToTargetInBytes)
progbar.show_progress(prefix="Erase", pos=0, total=total, display=display)
if rsp.resp:
while bytestowrite > 0:
wlen = min(bytestowrite, self.cfg.MaxPayloadSizeToTargetInBytes)
self.cdc.write(empty[:wlen])
progbar.show_progress(prefix="Erase", pos=total - bytestowrite, total=total, display=display)
bytestowrite -= wlen
pos += wlen
self.cdc.write(b'')
res = self.wait_for_data()
info = self.xml.getlog(res)
rsp = self.xml.getresponse(res)
if "value" in rsp:
if rsp["value"] != "ACK":
self.error(f"Error:")
for line in info:
self.error(line)
return False
else:
self.error(f"Error:{rsp}")
return False
else:
self.error(f"Error:{rsp.error}")
return False
return True
def cmd_read(self, physical_partition_number, start_sector, num_partition_sectors, filename, display=True):
global rq
self.lasterror = b""
progbar = progress(self.cfg.SECTOR_SIZE_IN_BYTES)
if display:
self.info(
f"\nReading from physical partition {str(physical_partition_number)}, " +
f"sector {str(start_sector)}, sectors {str(num_partition_sectors)}")
data = f"<?xml version=\"1.0\" ?><data><read SECTOR_SIZE_IN_BYTES=\"{self.cfg.SECTOR_SIZE_IN_BYTES}\"" + \
f" num_partition_sectors=\"{num_partition_sectors}\"" + \
f" physical_partition_number=\"{physical_partition_number}\"" + \
f" start_sector=\"{start_sector}\"/>\n</data>"
rsp = self.xmlsend(data, self.skipresponse)
self.cdc.xmlread = False
time.sleep(0.01)
if not rsp.resp:
if display:
self.error(rsp.error)
return b""
else:
bytestoread = self.cfg.SECTOR_SIZE_IN_BYTES * num_partition_sectors
total = bytestoread
show_progress = progbar.show_progress
usb_read = self.cdc.read
progbar.show_progress(prefix="Read", pos=0, total=total, display=display)
worker = Thread(target=writedata, args=(filename, rq), daemon=True)
worker.start()
while bytestoread > 0:
if self.cdc.is_serial:
maxsize = self.cfg.MaxPayloadSizeFromTargetInBytes
else:
maxsize = 5 * 1024 * 1024
size = min(maxsize, bytestoread)
data = usb_read(size)
if len(data) > 0:
rq.put(data)
bytestoread -= len(data)
show_progress(prefix="Read", pos=total - bytestoread, total=total, display=display)
rq.put(None)
worker.join(60)
self.cdc.xmlread = True
wd = self.wait_for_data()
info = self.xml.getlog(wd)
rsp = self.xml.getresponse(wd)
if "value" in rsp:
if rsp["value"] != "ACK":
if bytestoread != 0:
self.error(f"Error:")
for line in info:
self.error(line)
self.lasterror += bytes(line + "\n", "utf-8")
return False
else:
if display:
self.error(f"Error:{rsp[2]}")
return False
return True
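# cmd_read() streams directly to a file through the writedata() worker thread, while
# cmd_read_buffer() below returns the data in memory and is used for small reads such as
# GPT and partition-table sectors.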
def cmd_read_buffer(self, physical_partition_number, start_sector, num_partition_sectors, display=True):
self.lasterror = b""
prog = 0
if display:
self.info(
f"\nReading from physical partition {str(physical_partition_number)}, " +
f"sector {str(start_sector)}, sectors {str(num_partition_sectors)}")
print_progress(prog, 100, prefix='Progress:', suffix='Complete', bar_length=50)
data = f"<?xml version=\"1.0\" ?><data><read SECTOR_SIZE_IN_BYTES=\"{self.cfg.SECTOR_SIZE_IN_BYTES}\"" + \
f" num_partition_sectors=\"{num_partition_sectors}\"" + \
f" physical_partition_number=\"{physical_partition_number}\"" + \
f" start_sector=\"{start_sector}\"/>\n</data>"
progbar = progress(self.cfg.SECTOR_SIZE_IN_BYTES)
rsp = self.xmlsend(data, self.skipresponse)
self.cdc.xmlread = False
resData = bytearray()
if not rsp.resp:
if display:
self.error(rsp.error)
return rsp
else:
bytestoread = self.cfg.SECTOR_SIZE_IN_BYTES * num_partition_sectors
total = bytestoread
if display:
progbar.show_progress(prefix="Read", pos=total - bytestoread, total=total, display=display)
while bytestoread > 0:
tmp = self.cdc.read(min(self.cdc.maxsize, bytestoread))
size = len(tmp)
bytestoread -= size
resData.extend(tmp)
progbar.show_progress(prefix="Read", pos=total - bytestoread, total=total, display=display)
self.cdc.xmlread = True
wd = self.wait_for_data()
info = self.xml.getlog(wd)
rsp = self.xml.getresponse(wd)
if "value" in rsp:
if rsp["value"] != "ACK":
self.error(f"Error:")
for line in info:
self.error(line)
return response(resp=False, data=resData, error=info)
elif "rawmode" in rsp:
if rsp["rawmode"] == "false":
return response(resp=True, data=resData)
else:
if len(rsp) > 1:
if b"Failed to open the UFS Device" in rsp[2]:
self.error(f"Error:{rsp[2]}")
self.lasterror = rsp[2]
return response(resp=False, data=resData, error=rsp[2])
if rsp["value"] != "ACK":
self.lasterror = rsp[2]
if display and prog != 100:
progbar.show_progress(prefix="Read", pos=total, total=total, display=display)
resp = rsp["value"] == "ACK"
return response(resp=resp, data=resData, error=rsp[2]) # Do not remove, needed for oneplus
def get_gpt(self, lun, gpt_num_part_entries, gpt_part_entry_size, gpt_part_entry_start_lba, start_sector=1):
try:
resp = self.cmd_read_buffer(lun, 0, 1, False)
except Exception as err:
self.debug(str(err))
self.skipresponse = True
resp = self.cmd_read_buffer(lun, 0, 1, False)
if not resp.resp:
for line in resp.error:
self.error(line)
return None, None
data = resp.data
magic = unpack("<I", data[0:4])[0]
data += self.cmd_read_buffer(lun, start_sector, 1, False).data
if magic == 0x844bdcd1:
self.info("Nand storage detected.")
self.info("Scanning for partition table ...")
progbar = progress(1)
if self.nandpart.partitiontblsector is None:
sector = 0x280
progbar.show_progress(prefix="Scanning", pos=sector, total=1024, display=True)
resp = self.cmd_read_buffer(0, sector, 1, False)
if resp.resp:
if resp.data[0:8] in [b"\xac\x9f\x56\xfe\x7a\x12\x7f\xcd", b"\xAA\x73\xEE\x55\xDB\xBD\x5E\xE3"]:
progbar.show_progress(prefix="Scanning", pos=1024, total=1024, display=True)
self.nandpart.partitiontblsector = sector
self.info(f"Found partition table at sector {sector} :)")
else:
self.error("Error on reading partition table data")
return None, None
if self.nandpart.partitiontblsector is not None:
resp = self.cmd_read_buffer(0, self.nandpart.partitiontblsector + 1, 2, False)
if resp.resp:
if self.nandpart.parse(resp.data):
return resp.data, self.nandpart
else:
self.error("Couldn't find partition table, but command \"rs\" might still work !")
sys.exit(0)
return None, None
else:
data = resp.data
guid_gpt = gpt(
num_part_entries=gpt_num_part_entries,
part_entry_size=gpt_part_entry_size,
part_entry_start_lba=gpt_part_entry_start_lba,
loglevel=self.__logger.level
)
try:
sectorsize = self.cfg.SECTOR_SIZE_IN_BYTES
header = guid_gpt.parseheader(data, sectorsize)
if header.signature == b"EFI PART":
part_table_size = header.num_part_entries * header.part_entry_size
sectors = part_table_size // self.cfg.SECTOR_SIZE_IN_BYTES
if part_table_size % self.cfg.SECTOR_SIZE_IN_BYTES != 0:
sectors += 1
if sectors == 0:
return None, None
if sectors > 64:
sectors = 64
data += self.cmd_read_buffer(lun, header.part_entry_start_lba, sectors, False).data
if data == b"":
return None, None
guid_gpt.parse(data, self.cfg.SECTOR_SIZE_IN_BYTES)
return data, guid_gpt
else:
return None, None
except Exception as err:
self.debug(str(err))
return None, None
def get_backup_gpt(self, lun, gpt_num_part_entries, gpt_part_entry_size, gpt_part_entry_start_lba):
resp = self.cmd_read_buffer(lun, 0, 2, False)
if not resp.resp:
self.error("Error on reading backup gpt")
return None
guid_gpt = gpt(
num_part_entries=gpt_num_part_entries,
part_entry_size=gpt_part_entry_size,
part_entry_start_lba=gpt_part_entry_start_lba,
loglevel=self.__logger.level
)
header = guid_gpt.parseheader(resp.data, self.cfg.SECTOR_SIZE_IN_BYTES)
if "backup_lba" in header:
sectors = header.first_usable_lba - 1
data = self.cmd_read_buffer(lun, header.backup_lba, sectors, False)
if data == b"":
return None
return data
else:
return None
def calc_offset(self, sector, offset):
sector = sector + (offset // self.cfg.SECTOR_SIZE_IN_BYTES)
offset = offset % self.cfg.SECTOR_SIZE_IN_BYTES
return sector, offset
def getluns(self, argument):
if argument["--lun"] is not None:
return [int(argument["--lun"])]
luns = []
if self.cfg.MemoryName.lower() == "ufs":
for i in range(0, self.cfg.maxlun):
luns.append(i)
else:
luns = [0]
return luns
def configure(self, lvl):
if self.cfg.SECTOR_SIZE_IN_BYTES == 0:
if self.cfg.MemoryName.lower() == "emmc":
self.cfg.SECTOR_SIZE_IN_BYTES = 512
else:
self.cfg.SECTOR_SIZE_IN_BYTES = 4096
connectcmd = f"<?xml version=\"1.0\" encoding=\"UTF-8\" ?><data>" + \
f"<configure MemoryName=\"{self.cfg.MemoryName}\" " + \
f"Verbose=\"0\" " + \
f"AlwaysValidate=\"0\" " + \
f"MaxDigestTableSizeInBytes=\"2048\" " + \
f"MaxPayloadSizeToTargetInBytes=\"{str(self.cfg.MaxPayloadSizeToTargetInBytes)}\" " + \
f"ZLPAwareHost=\"{str(self.cfg.ZLPAwareHost)}\" " + \
f"SkipStorageInit=\"{str(int(self.cfg.SkipStorageInit))}\" " + \
f"SkipWrite=\"{str(int(self.cfg.SkipWrite))}\"/>" + \
"</data>"
'''
"<?xml version=\"1.0\" encoding=\"UTF-8\" ?><data><response value=\"ACK\" MinVersionSupported=\"1\"" \
"MemoryName=\"eMMC\" MaxPayloadSizeFromTargetInBytes=\"4096\" MaxPayloadSizeToTargetInBytes=\"1048576\" " \
"MaxPayloadSizeToTargetInBytesSupported=\"1048576\" MaxXMLSizeInBytes=\"4096\" Version=\"1\"
TargetName=\"8953\" />" \
"</data>"
'''
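# If the loader rejects these settings it usually reports its own limits in the reply;
# the error handling below picks up MemoryName / MaxPayloadSize* / MaxXMLSizeInBytes from
# that reply and calls configure() again with the values the target actually supports.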
rsp = self.xmlsend(connectcmd)
if not rsp.resp:
if rsp.error == "":
try:
if "MemoryName" in rsp.data:
self.cfg.MemoryName = rsp.data["MemoryName"]
except TypeError:
self.warning("!DEBUG! rsp.data: '%s'" % (rsp.data,))
return self.configure(lvl + 1)
if "MaxPayloadSizeFromTargetInBytes" in rsp.data:
self.cfg.MaxPayloadSizeFromTargetInBytes = int(rsp.data["MaxPayloadSizeFromTargetInBytes"])
if "MaxPayloadSizeToTargetInBytes" in rsp.data:
self.cfg.MaxPayloadSizeToTargetInBytes = int(rsp.data["MaxPayloadSizeToTargetInBytes"])
if "MaxPayloadSizeToTargetInBytesSupported" in rsp.data:
self.cfg.MaxPayloadSizeToTargetInBytesSupported = int(
rsp.data["MaxPayloadSizeToTargetInBytesSupported"])
if "TargetName" in rsp.data:
self.cfg.TargetName = rsp.data["TargetName"]
return self.configure(lvl + 1)
for line in rsp.error:
if "Not support configure MemoryName eMMC" in line:
self.info("eMMC is not supported by the firehose loader. Trying UFS instead.")
self.cfg.MemoryName = "UFS"
return self.configure(lvl + 1)
elif "Only nop and sig tag can be" in line:
self.info("Xiaomi EDL Auth detected.")
try:
self.modules = modules(fh=self, serial=self.serial,
supported_functions=self.supported_functions,
loglevel=self.__logger.level,
devicemodel=self.devicemodel, args=self.args)
except Exception as err: # pylint: disable=broad-except
self.modules = None
if self.modules.edlauth():
self.info("EDL Authenticated successfully.")
rsp = self.xmlsend(connectcmd)
return rsp.resp
else:
self.error("Error on EDL Authentification")
return False
elif "MaxPayloadSizeToTargetInBytes" in rsp.data:
try:
self.cfg.MemoryName = rsp.data["MemoryName"]
self.cfg.MaxPayloadSizeToTargetInBytes = int(rsp.data["MaxPayloadSizeToTargetInBytes"])
self.cfg.MaxPayloadSizeToTargetInBytesSupported = int(
rsp.data["MaxPayloadSizeToTargetInBytesSupported"])
if "MaxXMLSizeInBytes" in rsp.data:
self.cfg.MaxXMLSizeInBytes = int(rsp.data["MaxXMLSizeInBytes"])
else:
self.cfg.MaxXMLSizeInBytes = 4096
if "MaxPayloadSizeFromTargetInBytes" in rsp.data:
self.cfg.MaxPayloadSizeFromTargetInBytes = int(rsp.data["MaxPayloadSizeFromTargetInBytes"])
else:
self.cfg.MaxPayloadSizeFromTargetInBytes = 4096
if "TargetName" in rsp.data:
self.cfg.TargetName = rsp.data["TargetName"]
else:
self.cfg.TargetName = "Unknown"
if "MSM" not in self.cfg.TargetName:
self.cfg.TargetName = "MSM" + self.cfg.TargetName
if "Version" in rsp.data:
self.cfg.Version = rsp.data["Version"]
else:
self.cfg.Version = "Unknown"
if lvl == 0:
return self.configure(lvl + 1)
else:
self.error(f"Error:{rsp}")
sys.exit()
except Exception as e:
pass
elif "ERROR" in line or "WARN" in line:
if "ERROR" in line:
self.error(line)
sys.exit()
elif "WARN" in line:
self.warning(line)
else:
info = self.cdc.read(timeout=1)
if isinstance(rsp.resp, dict):
field = rsp.resp
if "MemoryName" not in field:
# print(rsp[1])
field["MemoryName"] = "eMMC"
if "MaxXMLSizeInBytes" not in field:
field["MaxXMLSizeInBytes"] = "4096"
self.warning("Couldn't detect MaxPayloadSizeFromTargetinBytes")
if "MaxPayloadSizeToTargetInBytes" not in field:
field["MaxPayloadSizeToTargetInBytes"] = "1038576"
if "MaxPayloadSizeToTargetInBytesSupported" not in field:
field["MaxPayloadSizeToTargetInBytesSupported"] = "1038576"
if field["MemoryName"].lower() != self.cfg.MemoryName.lower():
self.warning("Memory type was set as " + self.cfg.MemoryName + " but device reported it is " +
field["MemoryName"] + " instead.")
self.cfg.MemoryName = field["MemoryName"]
if "MaxPayloadSizeToTargetInBytes" in field:
self.cfg.MaxPayloadSizeToTargetInBytes = int(field["MaxPayloadSizeToTargetInBytes"])
else:
self.cfg.MaxPayloadSizeToTargetInBytes = 1048576
if "MaxPayloadSizeToTargetInBytesSupported" in field:
self.cfg.MaxPayloadSizeToTargetInBytesSupported = int(
field["MaxPayloadSizeToTargetInBytesSupported"])
else:
self.cfg.MaxPayloadSizeToTargetInBytesSupported = 1048576
if "MaxXMLSizeInBytes" in field:
self.cfg.MaxXMLSizeInBytes = int(field["MaxXMLSizeInBytes"])
else:
self.cfg.MaxXMLSizeInBytes = 4096
if "MaxPayloadSizeFromTargetInBytes" in field:
self.cfg.MaxPayloadSizeFromTargetInBytes = int(field["MaxPayloadSizeFromTargetInBytes"])
else:
self.cfg.MaxPayloadSizeFromTargetInBytes = self.cfg.MaxXMLSizeInBytes
self.warning("Couldn't detect MaxPayloadSizeFromTargetinBytes")
if "TargetName" in field:
self.cfg.TargetName = field["TargetName"]
if "MSM" not in self.cfg.TargetName:
self.cfg.TargetName = "MSM" + self.cfg.TargetName
else:
self.cfg.TargetName = "Unknown"
self.warning("Couldn't detect TargetName")
if "Version" in field:
self.cfg.Version = field["Version"]
else:
self.cfg.Version = 0
self.warning("Couldn't detect Version")
self.info(f"TargetName={self.cfg.TargetName}")
self.info(f"MemoryName={self.cfg.MemoryName}")
self.info(f"Version={self.cfg.Version}")
self.info("Trying to read first storage sector...")
rsp = self.cmd_read_buffer(0, 1, 1, False)
self.info("Running configure...")
if not rsp.resp and self.args["--memory"] is None:
for line in rsp.error:
if "Failed to set the IO options" in line:
self.warning(
"Memory type eMMC doesn't seem to match (Failed to init). Trying to use NAND instead.")
self.cfg.MemoryName = "nand"
return self.configure(0)
elif "Failed to open the SDCC Device" in line:
self.warning(
"Memory type eMMC doesn't seem to match (Failed to init). Trying to use UFS instead.")
self.cfg.MemoryName = "UFS"
return self.configure(0)
elif "Failed to initialize (open whole lun) UFS Device slot" in line:
self.warning(
"Memory type UFS doesn't seem to match (Failed to init). Trying to use eMMC instead.")
self.cfg.MemoryName = "eMMC"
return self.configure(0)
elif "Attribute \'SECTOR_SIZE_IN_BYTES\'=4096 must be equal to disk sector size 512" in line \
or "different from device sector size (512)" in line:
self.cfg.SECTOR_SIZE_IN_BYTES = 512
return self.configure(0)
elif "Attribute \'SECTOR_SIZE_IN_BYTES\'=512 must be equal to disk sector size 4096" in line \
or "different from device sector size (4096)" in line:
self.cfg.SECTOR_SIZE_IN_BYTES = 4096
return self.configure(0)
self.parse_storage()
for function in self.supported_functions:
if function == "checkntfeature":
if type(self.devicemodel) == list:
self.devicemodel = self.devicemodel[0]
self.nothing = nothing(fh=self, projid=self.devicemodel, serial=self.serial,
supported_functions=self.supported_functions,
loglevel=self.loglevel)
if self.nothing is not None:
self.nothing.ntprojectverify()
self.luns = self.getluns(self.args)
return True
def getlunsize(self, lun):
if lun not in self.lunsizes:
try:
data, guid_gpt = self.get_gpt(lun, int(self.args["--gpt-num-part-entries"]),
int(self.args["--gpt-part-entry-size"]),
int(self.args["--gpt-part-entry-start-lba"]))
self.lunsizes[lun] = guid_gpt.totalsectors
except Exception as err:
self.error(err)
return -1
else:
return self.lunsizes[lun]
return guid_gpt.totalsectors
def get_supported_functions(self):
supfunc = False
info = self.cmd_nop()
if not info:
self.info("No supported functions detected, configuring qc generic commands")
self.supported_functions = ['configure', 'program', 'firmwarewrite', 'patch', 'setbootablestoragedrive',
'ufs', 'emmc', 'power', 'benchmark', 'read', 'getstorageinfo',
'getcrc16digest', 'getsha256digest', 'erase', 'peek', 'poke', 'nop', 'xml']
else:
self.supported_functions = []
for line in info:
if "chip serial num" in line.lower():
self.info(line)
try:
serial = line.split("0x")[1][:-1]
self.serial = int(serial, 16)
except Exception as err: # pylint: disable=broad-except
self.debug(str(err))
serial = line.split(": ")[2]
self.serial = int(serial.split(" ")[0])
if supfunc and "end of supported functions" not in line.lower():
rs = line.replace("\n", "")
if rs != "":
rs = rs.replace("INFO: ", "")
self.supported_functions.append(rs)
if "supported functions" in line.lower():
supfunc = True
if len(self.supported_functions) > 1:
info = "Supported Functions: "
for line in self.supported_functions:
info += line + ","
self.info(info[:-1])
data = self.cdc.read(timeout=None)
try:
self.info(data.decode('utf-8'))
except Exception as err: # pylint: disable=broad-except
self.debug(str(err))
pass
if not self.supported_functions:
self.supported_functions = ['configure', 'program', 'firmwarewrite', 'patch', 'setbootablestoragedrive',
'ufs', 'emmc', 'power', 'benchmark', 'read', 'getstorageinfo',
'getcrc16digest', 'getsha256digest', 'erase', 'peek', 'poke', 'nop', 'xml']
def connect(self):
v = b'-1'
if platform.system() == 'Windows':
self.cdc.timeout = 50
elif platform.system() == 'Darwin':
# must ensure the timeout is enough to fill the buffer we alloc
# which is 1MB, otherwise some data is dropped in the underlying usb libraries
self.cdc.timeout = 50
else:
self.cdc.timeout = 50
info = []
while v != b'':
try:
v = self.cdc.read(timeout=None)
if (b"response" in v and b"</data>" in v) or v == b'':
break
data = self.xml.getlog(v)
if len(data) > 0:
info.append(data[0])
if not info:
break
except Exception as err: # pylint: disable=broad-except
pass
if info == [] or (len(info) > 0 and 'ERROR' in info[0]):
if len(info) > 0:
self.debug(info[0])
if len(info) > 0:
supfunc = False
for line in info:
self.info(line)
if "chip serial num" in line.lower():
try:
serial = line.split("0x")[1][:-1]
if ")" in serial:
serial = serial[:serial.rfind(")")]
self.serial = int(serial, 16)
except Exception as err: # pylint: disable=broad-except
self.debug(str(err))
serial = line.split(": ")[2]
self.serial = int(serial.split(" ")[0])
if supfunc and "end of supported functions" not in line.lower():
rs = line.replace("\n", "")
if rs != "":
rs = rs.replace("INFO: ", "")
self.supported_functions.append(rs)
if "supported functions" in line.lower():
supfunc = True
if "program" in line.lower():
idx = line.find("Functions: ")
if idx != -1:
v = line[idx + 11:].split(" ")
for val in v:
if val != "":
self.supported_functions.append(val)
supfunc = False
try:
if os.path.exists(self.cfg.programmer):
data = open(self.cfg.programmer, "rb").read()
for cmd in [b"demacia", b"setprojmodel", b"setswprojmodel", b"setprocstart", b"SetNetType",
b"checkntfeature"]:
if cmd in data:
self.supported_functions.append(cmd.decode('utf-8'))
state = {
"supported_functions": self.supported_functions,
"programmer": self.cfg.programmer,
"serial": self.serial
}
if os.path.exists("edl_config.json"):
data = json.loads(open("edl_config.json", "rb").read().decode('utf-8'))
if "serial" in data and data["serial"] != state["serial"]:
open("edl_config.json", "w").write(json.dumps(state))
else:
self.supported_functions = data["supported_functions"]
self.cfg.programmer = data["programmer"]
else:
open("edl_config.json", "w").write(json.dumps(state))
if "001920e101cf0000_fa2836525c2aad8a_fhprg.bin" in self.cfg.programmer:
self.devicemodel = '20111'
elif "000b80e100020000_467f3020c4cc788d_fhprg.bin" in self.cfg.programmer:
self.devicemodel = '22111'
except:
pass
elif self.serial is None or not self.supported_functions:
try:
if os.path.exists("edl_config.json"):
pinfo = json.loads(open("edl_config.json", "rb").read())
if not self.supported_functions:
if "supported_functions" in pinfo:
self.supported_functions = pinfo["supported_functions"]
if self.serial is None:
if "serial" in pinfo:
self.serial = pinfo["serial"]
else:
self.get_supported_functions()
except:
self.get_supported_functions()
pass
# rsp = self.xmlsend(data, self.skipresponse)
return self.supported_functions
def parse_storage(self):
storageinfo = self.cmd_getstorageinfo()
if storageinfo is None or storageinfo.resp and len(storageinfo.data) == 0:
return False
info = storageinfo.data
if "UFS Inquiry Command Output" in info:
self.cfg.prod_name = info["UFS Inquiry Command Output"]
self.info(info)
if "UFS Erase Block Size" in info:
self.cfg.block_size = int(info["UFS Erase Block Size"], 16)
self.info(info)
self.cfg.MemoryName = "UFS"
self.cfg.SECTOR_SIZE_IN_BYTES = 4096
if "UFS Boot Partition Enabled" in info:
self.info(info["UFS Boot Partition Enabled"])
if "UFS Total Active LU" in info:
self.cfg.maxlun = int(info["UFS Total Active LU"], 16)
if "SECTOR_SIZE_IN_BYTES" in info:
self.cfg.SECTOR_SIZE_IN_BYTES = int(info["SECTOR_SIZE_IN_BYTES"])
if "num_physical_partitions" in info:
self.cfg.num_physical = int(info["num_physical_partitions"])
return True
# OEM Stuff here below --------------------------------------------------
def cmd_writeimei(self, imei):
if len(imei) != 16:
self.info("IMEI must be 16 digits")
return False
data = '<?xml version="1.0" ?><data><writeIMEI len="16"/></data>'
val = self.xmlsend(data)
if val.resp:
self.info("writeIMEI succeeded.")
return True
else:
self.error("writeIMEI failed.")
return False
def cmd_getstorageinfo(self):
data = '<?xml version="1.0" ?><data><getstorageinfo physical_partition_number="0"/></data>'
val = self.xmlsend(data)
if val.data == '' and val.log == '' and val.resp:
return None
if isinstance(val.data, dict):
if "bNumberLu" in val.data:
self.cfg.maxlun = int(val.data["bNumberLu"])
if val.resp:
if val.log is not None:
res = {}
for value in val.log:
v = value.split("=")
if len(v) > 1:
res[v[0]] = v[1]
else:
if '"storage_info"' in value:
try:
info = value.replace("INFO:", "")
si = json.loads(info)["storage_info"]
except Exception as err: # pylint: disable=broad-except
self.debug(str(err))
continue
self.info("Storage report:")
for sii in si:
self.info(f"{sii}:{si[sii]}")
if "total_blocks" in si:
self.cfg.total_blocks = si["total_blocks"]
if "num_physical" in si:
self.cfg.num_physical = si["num_physical"]
self.cfg.maxlun = self.cfg.num_physical
if "block_size" in si:
self.cfg.block_size = si["block_size"]
if "page_size" in si:
self.cfg.SECTOR_SIZE_IN_BYTES = si["page_size"]
if "mem_type" in si:
self.cfg.MemoryName = si["mem_type"]
if "prod_name" in si:
self.cfg.prod_name = si["prod_name"]
else:
v = value.split(":")
if len(v) > 1:
res[v[0]] = v[1].lstrip(" ")
return response(resp=val.resp, data=res)
return response(resp=val.resp, data=val.data)
else:
if val.error:
for v in val.error:
if "Failed to open the SDCC Device" in v:
self.cfg.MemoryName = "ufs"
self.configure(0)
return self.cmd_getstorageinfo()
self.warning("GetStorageInfo command isn't supported.")
return None
def cmd_setactiveslot(self, slot: str):
# flags: 0x3a for inactive and 0x6f for active boot partition
def set_flags(flags, active, is_boot):
new_flags = flags
if active:
if is_boot:
#new_flags |= (PART_ATT_PRIORITY_VAL | PART_ATT_ACTIVE_VAL | PART_ATT_MAX_RETRY_COUNT_VAL)
#new_flags &= (~PART_ATT_SUCCESSFUL_VAL & ~PART_ATT_UNBOOTABLE_VAL)
new_flags = 0x6f << (AB_FLAG_OFFSET * 8)
else:
new_flags |= AB_PARTITION_ATTR_SLOT_ACTIVE << (AB_FLAG_OFFSET * 8)
else:
if is_boot:
#new_flags &= (~PART_ATT_PRIORITY_VAL & ~PART_ATT_ACTIVE_VAL)
#new_flags |= ((MAX_PRIORITY-1) << PART_ATT_PRIORITY_BIT)
new_flags = 0x3a << (AB_FLAG_OFFSET * 8)
else:
new_flags &= ~(AB_PARTITION_ATTR_SLOT_ACTIVE << (AB_FLAG_OFFSET * 8))
return new_flags
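# The per-slot flags live in a single byte of the 64-bit GPT attribute field,
# AB_FLAG_OFFSET bytes from its start, which is why the raw values above are shifted by
# (AB_FLAG_OFFSET * 8) bits before being OR-ed in or masked out.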
def patch_helper(gpt_data_a, gpt_data_b, guid_gpt_a, guid_gpt_b, partition_a, partition_b, slot_a_status,
slot_b_status, is_boot):
part_entry_size = guid_gpt_a.header.part_entry_size
rf_a = BytesIO(gpt_data_a)
rf_b = BytesIO(gpt_data_b)
entryoffset_a = partition_a.entryoffset - (
(guid_gpt_a.header.part_entry_start_lba - 2) * guid_gpt_a.sectorsize)
entryoffset_b = partition_b.entryoffset - (
(guid_gpt_b.header.part_entry_start_lba - 2) * guid_gpt_b.sectorsize)
rf_a.seek(entryoffset_a)
rf_b.seek(entryoffset_b)
sdata_a = rf_a.read(part_entry_size)
sdata_b = rf_b.read(part_entry_size)
partentry_a = gpt.gpt_partition(sdata_a)
partentry_b = gpt.gpt_partition(sdata_b)
partentry_a.flags = set_flags(partentry_a.flags, slot_a_status, is_boot)
partentry_b.flags = set_flags(partentry_b.flags, slot_b_status, is_boot)
partentry_a.type, partentry_b.type = partentry_b.type, partentry_a.type
pdata_a, pdata_b = partentry_a.create(), partentry_b.create()
return pdata_a, partition_a.entryoffset, pdata_b, partition_b.entryoffset
def cmd_patch_multiple(lun, start_sector, byte_offset, patch_data):
offset = 0
size_each_patch = 8 if len(patch_data) % 8 == 0 else 4
unpack_fmt = "<I" if size_each_patch == 4 else "<Q"
write_size = len(patch_data)
for i in range(0, write_size, size_each_patch):
pdata_subset = int(unpack(unpack_fmt, patch_data[offset:offset + size_each_patch])[0])
self.cmd_patch(lun, start_sector, byte_offset + offset, pdata_subset, size_each_patch, False)
offset += size_each_patch
return True
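# The firehose <patch> tag carries one numeric value per request, so cmd_patch_multiple()
# splits a longer byte range into 8-byte (or 4-byte) little-endian words and issues one
# cmd_patch() call per word.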
def update_gpt_info(guid_gpt_a, guid_gpt_b, partitionname_a, partitionname_b,
gpt_data_a, gpt_data_b, slot_a_status, slot_b_status, lun_a, lun_b
):
part_a = guid_gpt_a.partentries[partitionname_a]
part_b = guid_gpt_b.partentries[partitionname_b]
is_boot = False
if partitionname_a == "boot_a":
is_boot = True
pdata_a, poffset_a, pdata_b, poffset_b = patch_helper(
gpt_data_a, gpt_data_b,
guid_gpt_a, guid_gpt_b,
part_a, part_b,
slot_a_status, slot_b_status,
is_boot
)
if gpt_data_a and gpt_data_b:
entryoffset_a = poffset_a - ((guid_gpt_a.header.part_entry_start_lba - 2) * guid_gpt_a.sectorsize)
gpt_data_a[entryoffset_a: entryoffset_a + len(pdata_a)] = pdata_a
new_gpt_data_a = guid_gpt_a.fix_gpt_crc(gpt_data_a)
entryoffset_b = poffset_b - ((guid_gpt_b.header.part_entry_start_lba - 2) * guid_gpt_b.sectorsize)
gpt_data_b[entryoffset_b: entryoffset_b + len(pdata_b)] = pdata_b
new_gpt_data_b = guid_gpt_b.fix_gpt_crc(gpt_data_b)
start_sector_patch_a = poffset_a // self.cfg.SECTOR_SIZE_IN_BYTES
byte_offset_patch_a = poffset_a % self.cfg.SECTOR_SIZE_IN_BYTES
cmd_patch_multiple(lun_a, start_sector_patch_a, byte_offset_patch_a, pdata_a)
if lun_a != lun_b:
start_sector_hdr_a = guid_gpt_a.header.current_lba
headeroffset_a = guid_gpt_a.sectorsize # gptData: mbr + gpt header + part array
new_hdr_a = new_gpt_data_a[headeroffset_a: headeroffset_a + guid_gpt_a.header.header_size]
cmd_patch_multiple(lun_a, start_sector_hdr_a, 0, new_hdr_a)
start_sector_patch_b = poffset_b // self.cfg.SECTOR_SIZE_IN_BYTES
byte_offset_patch_b = poffset_b % self.cfg.SECTOR_SIZE_IN_BYTES
cmd_patch_multiple(lun_b, start_sector_patch_b, byte_offset_patch_b, pdata_b)
start_sector_hdr_b = guid_gpt_b.header.current_lba
headeroffset_b = guid_gpt_b.sectorsize
new_hdr_b = new_gpt_data_b[headeroffset_b: headeroffset_b + guid_gpt_b.header.header_size]
cmd_patch_multiple(lun_b, start_sector_hdr_b, 0, new_hdr_b)
return True
return False
def ensure_gpt_hdr_consistency(guid_gpt, backup_guid_gpt, gpt_data, backup_gpt_data):
headeroffset = guid_gpt.sectorsize
prim_corrupted, backup_corrupted = False, False
prim_hdr = gpt_data[headeroffset: headeroffset + guid_gpt.header.header_size]
test_hdr = guid_gpt.fix_gpt_crc(gpt_data)[headeroffset: headeroffset + guid_gpt.header.header_size]
prim_hdr_crc, test_hdr_crc = prim_hdr[0x10: 0x10 + 4], test_hdr[0x10: 0x10 + 4]
prim_part_table_crc, test_part_table_crc = prim_hdr[0x58: 0x58 + 4], test_hdr[0x58: 0x58 + 4]
prim_corrupted = prim_hdr_crc != test_hdr_crc or prim_part_table_crc != test_part_table_crc
backup_hdr = backup_gpt_data[headeroffset: headeroffset + backup_guid_gpt.header.header_size]
test_hdr = backup_guid_gpt.fix_gpt_crc(backup_gpt_data)[
headeroffset: headeroffset + backup_guid_gpt.header.header_size]
backup_hdr_crc, test_hdr_crc = backup_hdr[0x10: 0x10 + 4], test_hdr[0x10: 0x10 + 4]
backup_part_table_crc, test_part_table_crc = backup_hdr[0x58: 0x58 + 4], test_hdr[0x58: 0x58 + 4]
backup_corrupted = backup_hdr_crc != test_hdr_crc or backup_part_table_crc != test_part_table_crc
prim_backup_consistent = prim_part_table_crc == backup_part_table_crc
if prim_corrupted or not prim_backup_consistent:
if backup_corrupted:
self.error("both are gpt headers are corrupted, cannot recover")
return False, None, None
gpt_data[2 * guid_gpt.sectorsize:] = backup_gpt_data[2 * backup_guid_gpt.sectorsize:]
gpt_data = guid_gpt.fix_gpt_crc(gpt_data)
elif backup_corrupted or not prim_backup_consistent:
backup_gpt_data[2 * backup_guid_gpt.sectorsize:] = gpt_data[2 * guid_gpt.sectorsize:]
backup_gpt_data = backup_guid_gpt.fix_gpt_crc(backup_gpt_data)
return True, gpt_data, backup_gpt_data
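# ensure_gpt_hdr_consistency() recomputes the CRCs with fix_gpt_crc() and compares the
# header CRC32 (offset 0x10) and partition-array CRC32 (offset 0x58) against the stored
# ones; a corrupted or inconsistent copy is then rebuilt from the other header.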
if slot.lower() not in ["a", "b"]:
self.error("Only slots a or b are accepted. Aborting.")
return False
slot_a_status = None
if slot == "a":
slot_a_status = True
elif slot == "b":
slot_a_status = False
slot_b_status = not slot_a_status
fpartitions = {}
try:
for lun_a in self.luns:
lunname = "Lun" + str(lun_a)
fpartitions[lunname] = []
check_gpt_hdr = False
gpt_data_a, guid_gpt_a = self.get_gpt(lun_a, int(0), int(0), int(0))
backup_gpt_data_a, backup_guid_gpt_a = self.get_gpt(lun_a, 0, 0, 0, guid_gpt_a.header.backup_lba)
if guid_gpt_a is None:
break
else:
for partitionname_a in guid_gpt_a.partentries:
slot = partitionname_a.lower()[-2:]
partition_a = backup_guid_gpt_a.partentries[partitionname_a]
if slot == "_a":
active_a = ((partition_a.flags >> (
AB_FLAG_OFFSET * 8)) & 0xFF) & AB_PARTITION_ATTR_SLOT_ACTIVE == AB_PARTITION_ATTR_SLOT_ACTIVE
if (active_a and slot_a_status) or (not active_a and slot_b_status):
return True
partitionname_b = partitionname_a[:-1] + "b"
if partitionname_b in guid_gpt_a.partentries:
lun_b = lun_a
gpt_data_b = gpt_data_a
guid_gpt_b = guid_gpt_a
backup_gpt_data_b = backup_gpt_data_a
backup_guid_gpt_b = backup_guid_gpt_a
else:
resp = self.detect_partition(arguments=None,
partitionname=partitionname_b,
send_full=True)
if not resp[0]:
self.error(f"Cannot find partition {partitionname_b}")
return False
_, lun_b, gpt_data_b, guid_gpt_b = resp
backup_gpt_data_b, backup_guid_gpt_b = self.get_gpt(lun_b, 0, 0, 0,
guid_gpt_b.header.backup_lba)
if not check_gpt_hdr and partitionname_a[
:3] != "xbl": # xbl partition don't need check consistency
sts, gpt_data_a, backup_gpt_data_a = ensure_gpt_hdr_consistency(guid_gpt_a,
backup_guid_gpt_a,
gpt_data_a,
backup_gpt_data_a)
if not sts:
return False
if lun_a != lun_b:
sts, gpt_data_b, backup_gpt_data_b = ensure_gpt_hdr_consistency(guid_gpt_b,
backup_guid_gpt_b,
gpt_data_b,
backup_gpt_data_b)
if not sts:
return False
check_gpt_hdr = True
update_gpt_info(guid_gpt_a, guid_gpt_b,
partitionname_a, partitionname_b,
gpt_data_a, gpt_data_b,
slot_a_status, slot_b_status,
lun_a, lun_b)
# TODO: this updates the backup gpt header, but is it needed, since it is updated when xbl loads
#update_gpt_info(backup_guid_gpt_a, backup_guid_gpt_b,
# partitionname_a, partitionname_b,
# backup_gpt_data_a, backup_gpt_data_b,
# slot_a_status, slot_b_status,
# lun_a, lun_b)
except Exception as err:
self.error(str(err))
return False
return True
def cmd_test(self, cmd):
token = "1234"
pk = "1234"
data = f'<?xml version="1.0" ?>\n<data>\n<{cmd} token="{token}" pk="{pk}" />\n</data>'
val = self.xmlsend(data)
if val.resp:
if b"raw hex token" in val[2]:
return True
if b"opcmd is not enabled" in val[2]:
return True
return False
def cmd_getstorageinfo_string(self):
data = '<?xml version="1.0" ?><data><getstorageinfo /></data>'
val = self.xmlsend(data)
if val.resp:
self.info(f"GetStorageInfo:\n--------------------\n")
data = self.xml.getlog(val.data)
for line in data:
self.info(line)
return True
else:
self.warning("GetStorageInfo command isn't supported.")
return False
def cmd_poke(self, address, data, filename="", info=False):
rf = None
if filename != "":
rf = open(filename, "rb")
SizeInBytes = os.stat(filename).st_size
else:
SizeInBytes = len(data)
if info:
self.info(f"Poke: Address({hex(address)}),Size({hex(SizeInBytes)})")
'''
<?xml version="1.0" ?><data><poke address64="1048576" SizeInBytes="90112" value="0x22 0x00 0x00"/></data>
'''
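# Sketch only, mirroring the example XML above; the address and bytes are arbitrary and
# 'fh' stands for a firehose instance that is not defined here:
#   fh.cmd_poke(0x100000, b"\x22\x00\x00\xEA\x70\x00\x00\xEA", info=True)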
maxsize = 8
lengthtowrite = SizeInBytes
if lengthtowrite < maxsize:
maxsize = lengthtowrite
pos = 0
old = 0
datawritten = 0
mode = 0
if info:
print_progress(0, 100, prefix='Progress:', suffix='Complete', bar_length=50)
while lengthtowrite > 0:
if rf is not None:
content = hex(int(hexlify(rf.read(maxsize)).decode('utf-8'), 16))
else:
content = 0
if lengthtowrite < maxsize:
maxsize = lengthtowrite
for i in range(0, maxsize):
content = (content << 8) + int(
hexlify(data[pos + maxsize - i - 1:pos + maxsize - i]).decode('utf-8'), 16)
# content=hex(int(hexlify(data[pos:pos+maxsize]).decode('utf-8'),16))
content = hex(content)
if mode == 0:
xdata = f"<?xml version=\"1.0\" ?><data><poke address64=\"{str(address + pos)}\" " + \
f"size_in_bytes=\"{str(maxsize)}\" value64=\"{content}\" /></data>\n"
else:
xdata = f"<?xml version=\"1.0\" ?><data><poke address64=\"{str(address + pos)}\" " + \
f"SizeInBytes=\"{str(maxsize)}\" value64=\"{content}\" /></data>\n"
try:
self.cdc.write(xdata[:self.cfg.MaxXMLSizeInBytes])
except Exception as err: # pylint: disable=broad-except
self.debug(str(err))
pass
addrinfo = self.cdc.read(timeout=None)
if b"SizeInBytes" in addrinfo or b"Invalid parameters" in addrinfo:
tmp = b""
while b"NAK" not in tmp and b"ACK" not in tmp:
tmp += self.cdc.read(timeout=None)
xdata = f"<?xml version=\"1.0\" ?><data><poke address64=\"{str(address + pos)}\" " + \
f"SizeInBytes=\"{str(maxsize)}\" value64=\"{content}\" /></data>\n"
self.cdc.write(xdata[:self.cfg.MaxXMLSizeInBytes])
addrinfo = self.cdc.read(timeout=None)
if (b'<response' in addrinfo and b'NAK' in addrinfo) or b"Invalid parameters" in addrinfo:
self.error(f"Error:{addrinfo}")
return False
if b"address" in addrinfo and b"can\'t" in addrinfo:
tmp = b""
while b"NAK" not in tmp and b"ACK" not in tmp:
tmp += self.cdc.read(timeout=None)
self.error(f"Error:{addrinfo}")
return False
addrinfo = self.cdc.read(timeout=None)
if b'<response' in addrinfo and b'NAK' in addrinfo:
print(f"Error:{addrinfo}")
return False
pos += maxsize
datawritten += maxsize
lengthtowrite -= maxsize
if info:
prog = round(float(datawritten) / float(SizeInBytes) * float(100), 1)
if prog > old:
print_progress(prog, 100, prefix='Progress:', suffix='Complete', bar_length=50)
old = prog
if info:
self.info("Done writing.")
return True
def cmd_peek(self, address, SizeInBytes, filename="", info=False):
if info:
self.info(f"Peek: Address({hex(address)}),Size({hex(SizeInBytes)})")
wf = None
if filename != "":
wf = open(filename, "wb")
'''
<?xml version="1.0" ?><data><peek address64="1048576" SizeInBytes="90112" /></data>
'''
data = f"<?xml version=\"1.0\" ?><data><peek address64=\"{address}\" " + \
f"size_in_bytes=\"{SizeInBytes}\" /></data>\n"
'''
<?xml version="1.0" encoding="UTF-8" ?><data><log value="Using address 00100000" /></data>
<?xml version="1.0" encoding="UTF-8" ?><data><log value="0x22 0x00 0x00 0xEA 0x70 0x00 0x00 0xEA 0x74 0x00
0x00 0xEA 0x78 0x00 0x00 0xEA 0x7C 0x00 0x00 0xEA 0x80 0x00 0x00 0xEA 0x84 0x00 0x00 0xEA 0x88 0x00 0x00
0xEA 0xFE 0xFF 0xFF 0xEA 0xFE 0xFF 0xFF 0xEA 0xFE 0xFF 0xFF 0xEA 0xFE 0xFF 0xFF 0xEA 0xFE 0xFF 0xFF 0xEA
0xFE 0xFF 0xFF 0xEA 0xFE 0xFF 0xFF 0xEA 0xFE 0xFF 0xFF 0xEA 0xFE 0xFF 0xFF 0xEA 0xFE 0xFF 0xFF 0xEA 0xFE
0xFF 0xFF 0xEA 0xFE 0xFF 0xFF 0xEA 0xFE 0xFF 0xFF 0xEA 0xFE 0xFF 0xFF 0xEA 0xFE 0xFF 0xFF 0xEA 0xFE 0xFF
0xFF 0xEA 0xFE 0xFF 0xFF 0xEA 0xFE 0xFF " /></data>
'''
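# Sketch only, mirroring the example XML above; 'fh' stands for a firehose instance and
# the output filename is arbitrary:
#   fh.cmd_peek(0x100000, 0x16000, filename="peek.bin", info=True)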
try:
self.cdc.write(data[:self.cfg.MaxXMLSizeInBytes])
except Exception as err: # pylint: disable=broad-except
self.debug(str(err))
pass
addrinfo = self.cdc.read(timeout=None)
if b"SizeInBytes" in addrinfo or b"Invalid parameters" in addrinfo:
tmp = b""
while b"NAK" not in tmp and b"ACK" not in tmp:
tmp += self.cdc.read(timeout=None)
data = f"<?xml version=\"1.0\" ?><data><peek address64=\"{hex(address)}\" " + \
f"SizeInBytes=\"{hex(SizeInBytes)}\" /></data>"
self.cdc.write(data[:self.cfg.MaxXMLSizeInBytes])
addrinfo = self.cdc.read(timeout=None)
if (b'<response' in addrinfo and b'NAK' in addrinfo) or b"Invalid parameters" in addrinfo:
self.error(f"Error:{addrinfo}")
return False
if b"address" in addrinfo and b"can\'t" in addrinfo:
tmp = b""
while b"NAK" not in tmp and b"ACK" not in tmp:
tmp += self.cdc.read(timeout=None)
self.error(f"Error:{addrinfo}")
return False
resp = b""
dataread = 0
old = 0
if info:
print_progress(0, 100, prefix='Progress:', suffix='Complete', bar_length=50)
while True:
tmp = self.cdc.read(timeout=None)
if b'<response' in tmp or b"ERROR" in tmp:
break
rdata = self.xml.getlog(tmp)[0].replace("0x", "").replace(" ", "")
tmp2 = b""
try:
tmp2 = binascii.unhexlify(rdata)
except: # pylint: disable=broad-except
print(rdata)
exit(0)
dataread += len(tmp2)
if wf is not None:
wf.write(tmp2)
else:
resp += tmp2
if info:
prog = round(float(dataread) / float(SizeInBytes) * float(100), 1)
if prog > old:
print_progress(prog, 100, prefix='Progress:', suffix='Complete', bar_length=50)
old = prog
if wf is not None:
wf.close()
if b'<response' in tmp and b'ACK' in tmp:
if info:
self.info(f"Bytes from {hex(address)}, bytes read {hex(dataread)}, written to {filename}.")
return True
else:
self.error(f"Error:{addrinfo}")
return False
else:
return resp
def cmd_memcpy(self, destaddress, sourceaddress, size):
data = self.cmd_peek(sourceaddress, size)
if data != b"" and data:
if self.cmd_poke(destaddress, data):
return True
return False
def cmd_rawxml(self, data, response=True):
if response:
val = self.xmlsend(data)
if val.resp:
self.info(f"{data} succeeded.")
return val.data
else:
self.error(f"{data} failed.")
self.error(f"{val.error}")
return False
else:
self.xmlsend(data, False)
return True