1
0
mirror of https://github.com/bkerler/edl synced 2024-11-25 06:46:19 +00:00
edl/edlclient/Library/gpt.py
2024-06-10 01:44:14 +08:00

622 lines
24 KiB
Python
Executable File

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# (c) B.Kerler 2018-2024 under GPLv3 license
# If you use my code, make sure you refer to my name
#
# !!!!! If you use this code in commercial products, your product is automatically
# GPLv3 and has to be open sourced under GPLv3 as well. !!!!!
import argparse
import copy
import logging
import logging.config
import os
import sys
from binascii import crc32
from binascii import hexlify
from enum import Enum
from struct import calcsize, unpack, pack
import colorama
class ColorFormatter(logging.Formatter):
LOG_COLORS = {
logging.ERROR: colorama.Fore.RED,
logging.DEBUG: colorama.Fore.LIGHTMAGENTA_EX,
logging.WARNING: colorama.Fore.YELLOW,
}
def format(self, record, *args, **kwargs):
# if the corresponding logger has children, they may receive modified
# record, so we want to keep it intact
new_record = copy.copy(record)
if new_record.levelno in self.LOG_COLORS:
pad = ""
if new_record.name != "root":
print(new_record.name)
pad = "[LIB]: "
# we want levelname to be in different color, so let"s modify it
new_record.msg = "{pad}{color_begin}{msg}{color_end}".format(
pad=pad,
msg=new_record.msg,
color_begin=self.LOG_COLORS[new_record.levelno],
color_end=colorama.Style.RESET_ALL,
)
# now we can let standart formatting take care of the rest
return super(ColorFormatter, self).format(new_record, *args, **kwargs)
class LogBase(type):
debuglevel = logging.root.level
def __init__(cls, *args):
super().__init__(*args)
logger_attribute_name = "_" + cls.__name__ + "__logger"
logger_debuglevel_name = "_" + cls.__name__ + "__debuglevel"
logger_name = ".".join([c.__name__ for c in cls.mro()[-2::-1]])
log_config = {
"version": 1,
"disable_existing_loggers": False,
"formatters": {
"root": {
"()": ColorFormatter,
"format": "%(name)s - %(message)s",
}
},
"handlers": {
"root": {
# "level": cls.__logger.level,
"formatter": "root",
"class": "logging.StreamHandler",
"stream": "ext://sys.stdout",
}
},
"loggers": {
"": {
"handlers": ["root"],
# "level": cls.debuglevel,
"propagate": False
}
},
}
logging.config.dictConfig(log_config)
logger = logging.getLogger(logger_name)
setattr(cls, logger_attribute_name, logger)
setattr(cls, logger_debuglevel_name, cls.debuglevel)
cls.logsetup = logsetup
def logsetup(self, logger, loglevel):
self.info = logger.info
self.debug = logger.debug
self.error = logger.error
self.warning = logger.warning
if loglevel == logging.DEBUG:
logfilename = os.path.join("logs", "log.txt")
if os.path.exists(logfilename):
try:
os.remove(logfilename)
except:
pass
fh = logging.FileHandler(logfilename, encoding="utf-8")
logger.addHandler(fh)
logger.setLevel(logging.DEBUG)
else:
logger.setLevel(logging.INFO)
self.loglevel = loglevel
return logger
def read_object(data: object, definition: object) -> object:
"""
Unpacks a structure using the given data and definition.
"""
obj = {}
object_size = 0
pos = 0
for (name, stype) in definition:
object_size += calcsize(stype)
obj[name] = unpack(stype, data[pos:pos + calcsize(stype)])[0]
pos += calcsize(stype)
obj["object_size"] = object_size
obj["raw_data"] = data
return obj
class structhelper:
pos = 0
def __init__(self, data, pos=0):
self.pos = 0
self.data = data
def qword(self, big=False):
e = ">" if big else "<"
dat = unpack(e + "Q", self.data[self.pos:self.pos + 8])[0]
self.pos += 8
return dat
def dword(self, big=False):
e = ">" if big else "<"
dat = unpack(e + "I", self.data[self.pos:self.pos + 4])[0]
self.pos += 4
return dat
def dwords(self, dwords=1, big=False):
e = ">" if big else "<"
dat = unpack(e + str(dwords) + "I", self.data[self.pos:self.pos + 4 * dwords])
self.pos += 4 * dwords
return dat
def qwords(self, qwords=1, big=False):
e = ">" if big else "<"
dat = unpack(e + str(qwords) + "Q", self.data[self.pos:self.pos + 8 * qwords])
self.pos += 8 * qwords
return dat
def short(self, big=False):
e = ">" if big else "<"
dat = unpack(e + "H", self.data[self.pos:self.pos + 2])[0]
self.pos += 2
return dat
def shorts(self, shorts, big=False):
e = ">" if big else "<"
dat = unpack(e + str(shorts) + "H", self.data[self.pos:self.pos + 2 * shorts])
self.pos += 2 * shorts
return dat
def bytes(self, rlen=1):
dat = self.data[self.pos:self.pos + rlen]
self.pos += rlen
if rlen == 1: return dat[0]
return dat
def string(self, rlen=1):
dat = self.data[self.pos:self.pos + rlen]
self.pos += rlen
return dat
def getpos(self):
return self.pos
def seek(self, pos):
self.pos = pos
AB_FLAG_OFFSET = 6
AB_PARTITION_ATTR_SLOT_ACTIVE = (0x1 << 2)
AB_PARTITION_ATTR_BOOT_SUCCESSFUL = (0x1 << 6)
AB_PARTITION_ATTR_UNBOOTABLE = (0x1 << 7)
AB_SLOT_ACTIVE_VAL = 0x3F
AB_SLOT_INACTIVE_VAL = 0x0
AB_SLOT_ACTIVE = 1
AB_SLOT_INACTIVE = 0
PART_ATT_PRIORITY_BIT = 48
PART_ATT_ACTIVE_BIT = 50
PART_ATT_MAX_RETRY_CNT_BIT = 51
MAX_PRIORITY = 3
PART_ATT_SUCCESS_BIT = 54
PART_ATT_UNBOOTABLE_BIT = 55
PART_ATT_PRIORITY_VAL = 0x3 << PART_ATT_PRIORITY_BIT
PART_ATT_ACTIVE_VAL = 0x1 << PART_ATT_ACTIVE_BIT
PART_ATT_MAX_RETRY_COUNT_VAL = 0x7 << PART_ATT_MAX_RETRY_CNT_BIT
PART_ATT_SUCCESSFUL_VAL = 0x1 << PART_ATT_SUCCESS_BIT
PART_ATT_UNBOOTABLE_VAL = 0x1 << PART_ATT_UNBOOTABLE_BIT
class gpt(metaclass=LogBase):
class gpt_header:
def __init__(self, data):
sh = structhelper(data)
self.signature = sh.bytes(8)
self.revision = sh.dword()
self.header_size = sh.dword()
self.crc32 = sh.dword()
self.reserved = sh.dword()
self.current_lba = sh.qword()
self.backup_lba = sh.qword()
self.first_usable_lba = sh.qword()
self.last_usable_lba = sh.qword()
self.disk_guid = sh.bytes(16)
self.part_entry_start_lba = sh.qword()
self.num_part_entries = sh.dword()
self.part_entry_size = sh.dword()
self.crc32_part_entries = sh.dword()
class gpt_partition:
def __init__(self, data):
sh = structhelper(data)
self.type = sh.bytes(16)
self.unique = sh.bytes(16)
self.first_lba = sh.qword()
self.last_lba = sh.qword()
self.flags = sh.qword()
self.name = sh.string(72)
def create(self):
val = pack("<16s16sQQQ72s", self.type, self.unique, self.first_lba, self.last_lba, self.flags, self.name)
return val
class efi_type(Enum):
EFI_UNUSED = 0x00000000
EFI_MBR = 0x024DEE41
EFI_SYSTEM = 0xC12A7328
EFI_BIOS_BOOT = 0x21686148
EFI_IFFS = 0xD3BFE2DE
EFI_SONY_BOOT = 0xF4019732
EFI_LENOVO_BOOT = 0xBFBFAFE7
EFI_MSR = 0xE3C9E316
EFI_BASIC_DATA = 0xEBD0A0A2
EFI_LDM_META = 0x5808C8AA
EFI_LDM = 0xAF9B60A0
EFI_RECOVERY = 0xDE94BBA4
EFI_GPFS = 0x37AFFC90
EFI_STORAGE_SPACES = 0xE75CAF8F
EFI_HPUX_DATA = 0x75894C1E
EFI_HPUX_SERVICE = 0xE2A1E728
EFI_LINUX_DAYA = 0x0FC63DAF
EFI_LINUX_RAID = 0xA19D880F
EFI_LINUX_ROOT32 = 0x44479540
EFI_LINUX_ROOT64 = 0x4F68BCE3
EFI_LINUX_ROOT_ARM32 = 0x69DAD710
EFI_LINUX_ROOT_ARM64 = 0xB921B045
EFI_LINUX_SWAP = 0x0657FD6D
EFI_LINUX_LVM = 0xE6D6D379
EFI_LINUX_HOME = 0x933AC7E1
EFI_LINUX_SRV = 0x3B8F8425
EFI_LINUX_DM_CRYPT = 0x7FFEC5C9
EFI_LINUX_LUKS = 0xCA7D7CCB
EFI_LINUX_RESERVED = 0x8DA63339
EFI_FREEBSD_BOOT = 0x83BD6B9D
EFI_FREEBSD_DATA = 0x516E7CB4
EFI_FREEBSD_SWAP = 0x516E7CB5
EFI_FREEBSD_UFS = 0x516E7CB6
EFI_FREEBSD_VINUM = 0x516E7CB8
EFI_FREEBSD_ZFS = 0x516E7CBA
EFI_OSX_HFS = 0x48465300
EFI_OSX_UFS = 0x55465300
EFI_OSX_ZFS = 0x6A898CC3
EFI_OSX_RAID = 0x52414944
EFI_OSX_RAID_OFFLINE = 0x52414944
EFI_OSX_RECOVERY = 0x426F6F74
EFI_OSX_LABEL = 0x4C616265
EFI_OSX_TV_RECOVERY = 0x5265636F
EFI_OSX_CORE_STORAGE = 0x53746F72
EFI_SOLARIS_BOOT = 0x6A82CB45
EFI_SOLARIS_ROOT = 0x6A85CF4D
EFI_SOLARIS_SWAP = 0x6A87C46F
EFI_SOLARIS_BACKUP = 0x6A8B642B
EFI_SOLARIS_USR = 0x6A898CC3
EFI_SOLARIS_VAR = 0x6A8EF2E9
EFI_SOLARIS_HOME = 0x6A90BA39
EFI_SOLARIS_ALTERNATE = 0x6A9283A5
EFI_SOLARIS_RESERVED1 = 0x6A945A3B
EFI_SOLARIS_RESERVED2 = 0x6A9630D1
EFI_SOLARIS_RESERVED3 = 0x6A980767
EFI_SOLARIS_RESERVED4 = 0x6A96237F
EFI_SOLARIS_RESERVED5 = 0x6A8D2AC7
EFI_NETBSD_SWAP = 0x49F48D32
EFI_NETBSD_FFS = 0x49F48D5A
EFI_NETBSD_LFS = 0x49F48D82
EFI_NETBSD_RAID = 0x49F48DAA
EFI_NETBSD_CONCAT = 0x2DB519C4
EFI_NETBSD_ENCRYPT = 0x2DB519EC
EFI_CHROMEOS_KERNEL = 0xFE3A2A5D
EFI_CHROMEOS_ROOTFS = 0x3CB8E202
EFI_CHROMEOS_FUTURE = 0x2E0A753D
EFI_HAIKU = 0x42465331
EFI_MIDNIGHTBSD_BOOT = 0x85D5E45E
EFI_MIDNIGHTBSD_DATA = 0x85D5E45A
EFI_MIDNIGHTBSD_SWAP = 0x85D5E45B
EFI_MIDNIGHTBSD_UFS = 0x0394EF8B
EFI_MIDNIGHTBSD_VINUM = 0x85D5E45C
EFI_MIDNIGHTBSD_ZFS = 0x85D5E45D
EFI_CEPH_JOURNAL = 0x45B0969E
EFI_CEPH_ENCRYPT = 0x45B0969E
EFI_CEPH_OSD = 0x4FBD7E29
EFI_CEPH_ENCRYPT_OSD = 0x4FBD7E29
EFI_CEPH_CREATE = 0x89C57F98
EFI_CEPH_ENCRYPT_CREATE = 0x89C57F98
EFI_OPENBSD = 0x824CC7A0
EFI_QNX = 0xCEF5A9AD
EFI_PLAN9 = 0xC91818F9
EFI_VMWARE_VMKCORE = 0x9D275380
EFI_VMWARE_VMFS = 0xAA31E02A
EFI_VMWARE_RESERVED = 0x9198EFFC
def __init__(self, num_part_entries=0, part_entry_size=0, part_entry_start_lba=0, loglevel=logging.INFO, *args,
**kwargs):
self.num_part_entries = num_part_entries
self.__logger = self.__logger
self.part_entry_size = part_entry_size
self.part_entry_start_lba = part_entry_start_lba
self.totalsectors = None
self.header = None
self.sectorsize = None
self.partentries = []
self.error = self.__logger.error
self.__logger.setLevel(loglevel)
if loglevel == logging.DEBUG:
logfilename = "log.txt"
fh = logging.FileHandler(logfilename, encoding="utf-8")
self.__logger.addHandler(fh)
def parseheader(self, gptdata, sectorsize=512):
return self.gpt_header(gptdata[sectorsize:sectorsize + 0x5C])
def parse(self, gptdata, sectorsize=512):
self.header = self.gpt_header(gptdata[sectorsize:sectorsize + 0x5C])
self.sectorsize = sectorsize
if self.header.signature != b"EFI PART":
return False
if self.header.revision != 0x10000:
self.error("Unknown GPT revision.")
return False
if self.part_entry_start_lba != 0:
start = self.part_entry_start_lba
else:
start = 2 * sectorsize # mbr + header + part_table
entrysize = self.header.part_entry_size
self.partentries = {}
class partf:
unique = b""
first_lba = 0
last_lba = 0
flags = 0
sector = 0
sectors = 0
type = b""
name = ""
entryoffset = 0
num_part_entries = self.header.num_part_entries
for idx in range(0, num_part_entries):
data = gptdata[start + (idx * entrysize):start + (idx * entrysize) + entrysize]
if int(hexlify(data[16:32]), 16) == 0:
break
partentry = self.gpt_partition(data)
pa = partf()
guid1 = unpack("<I", partentry.unique[0:0x4])[0]
guid2 = unpack("<H", partentry.unique[0x4:0x6])[0]
guid3 = unpack("<H", partentry.unique[0x6:0x8])[0]
guid4 = unpack("<H", partentry.unique[0x8:0xA])[0]
guid5 = hexlify(partentry.unique[0xA:0x10]).decode('utf-8')
pa.unique = "{:08x}-{:04x}-{:04x}-{:04x}-{}".format(guid1, guid2, guid3, guid4, guid5)
pa.sector = partentry.first_lba
pa.sectors = partentry.last_lba - partentry.first_lba + 1
pa.flags = partentry.flags
pa.entryoffset = (self.header.part_entry_start_lba * sectorsize) + (idx * entrysize)
type = int(unpack("<I", partentry.type[0:0x4])[0])
try:
pa.type = self.efi_type(type).name
except:
pa.type = hex(type)
pa.name = partentry.name.replace(b"\x00\x00", b"").decode('utf-16')
if pa.type == "EFI_UNUSED":
continue
self.partentries[pa.name] = pa
self.totalsectors = self.header.first_usable_lba + self.header.last_usable_lba
return True
def print(self):
print(self.tostring())
def tostring(self):
mstr = "\nGPT Table:\n-------------\n"
for partitionname in self.partentries:
partition = self.partentries[partitionname]
active = ((partition.flags >> (
AB_FLAG_OFFSET * 8)) & 0xFF) & AB_PARTITION_ATTR_SLOT_ACTIVE == AB_PARTITION_ATTR_SLOT_ACTIVE
mstr += ("{:20} Offset 0x{:016x}, Length 0x{:016x}, Flags 0x{:016x}, UUID {}, Type {}, Active {}\n".format(
partition.name + ":", partition.sector * self.sectorsize, partition.sectors * self.sectorsize,
partition.flags, partition.unique, partition.type, active))
mstr += ("\nTotal disk size:0x{:016x}, sectors:0x{:016x}\n".format(self.totalsectors * self.sectorsize,
self.totalsectors))
return mstr
def generate_rawprogram(self, lun, sectorsize, directory):
fname = "rawprogram" + str(lun) + ".xml"
with open(os.path.join(directory, fname), "wb") as wf:
mstr = "<?xml version=\"1.0\" ?>\n<data>\n"
partofsingleimage = "false"
readbackverify = "false"
sparse = "false"
for partname in self.partentries:
partition = self.partentries[partname]
filename = partition.name + ".bin"
mstr += f"\t<program SECTOR_SIZE_IN_BYTES=\"{sectorsize}\" " + \
f"file_sector_offset=\"0\" " \
f"filename=\"{filename}\" " + \
f"label=\"{partition.name}\" " \
f"num_partition_sectors=\"{partition.sectors}\" " + \
f"partofsingleimage=\"{partofsingleimage}\" " \
f"physical_partition_number=\"{str(lun)}\" " + \
f"readbackverify=\"{readbackverify}\" " \
f"size_in_KB=\"{(partition.sectors * sectorsize / 1024):.1f}\" " \
f"sparse=\"{sparse}\" " + \
f"start_byte_hex=\"{hex(partition.sector * sectorsize)}\" " \
f"start_sector=\"{partition.sector}\"/>\n"
partofsingleimage = "true"
sectors = self.header.first_usable_lba
mstr += f"\t<program SECTOR_SIZE_IN_BYTES=\"{sectorsize}\" " + \
f"file_sector_offset=\"0\" " + \
f"filename=\"gpt_main{str(lun)}.bin\" " + \
f"label=\"PrimaryGPT\" " + \
f"num_partition_sectors=\"{sectors}\" " + \
f"partofsingleimage=\"{partofsingleimage}\" " + \
f"physical_partition_number=\"{str(lun)}\" " + \
f"readbackverify=\"{readbackverify}\" " + \
f"size_in_KB=\"{(sectors * sectorsize / 1024):.1f}\" " + \
f"sparse=\"{sparse}\" " + \
f"start_byte_hex=\"0x0\" " + \
f"start_sector=\"0\"/>\n"
sectors = self.header.first_usable_lba - 1
mstr += f"\t<program SECTOR_SIZE_IN_BYTES=\"{sectorsize}\" " + \
f"file_sector_offset=\"0\" " + \
f"filename=\"gpt_backup{str(lun)}.bin\" " + \
f"label=\"BackupGPT\" " + \
f"num_partition_sectors=\"{sectors}\" " + \
f"partofsingleimage=\"{partofsingleimage}\" " + \
f"physical_partition_number=\"{str(lun)}\" " + \
f"readbackverify=\"{readbackverify}\" " + \
f"size_in_KB=\"{(sectors * sectorsize / 1024):.1f}\" " + \
f"sparse=\"{sparse}\" " + \
f"start_byte_hex=\"({sectorsize}*NUM_DISK_SECTORS)-{sectorsize * sectors}.\" " + \
f"start_sector=\"NUM_DISK_SECTORS-{sectors}.\"/>\n"
mstr += "</data>"
wf.write(bytes(mstr, "utf-8"))
print(f"Wrote partition xml as {fname}")
def print_gptfile(self, filename):
try:
filesize = os.stat(filename).st_size
with open(filename, "rb") as rf:
size = min(32 * 4096, filesize)
data = rf.read(size)
for sectorsize in [512, 4096]:
result = self.parse(data, sectorsize)
if result:
break
if result:
print(self.tostring())
return result
except Exception as e:
self.error(str(e))
return ""
def test_gpt(self):
res = self.print_gptfile(os.path.join("TestFiles", "gpt_sm8180x.bin"))
assert res, "GPT Partition wasn't decoded properly"
def fix_gpt_crc(self, data):
partentry_size = self.header.num_part_entries * self.header.part_entry_size
partentry_offset = 2 * self.sectorsize
partdata = data[partentry_offset:partentry_offset + partentry_size]
headeroffset = self.sectorsize
headerdata = bytearray(data[headeroffset:headeroffset + self.header.header_size])
headerdata[0x58:0x58 + 4] = pack("<I", crc32(partdata))
headerdata[0x10:0x10 + 4] = pack("<I", 0)
headerdata[0x10:0x10 + 4] = pack("<I", crc32(headerdata))
data[headeroffset:headeroffset + self.header.header_size] = headerdata
return data
def get_flag(self, filename, imagename):
if "." in imagename:
imagename = imagename[:imagename.find(".")]
try:
with open(filename, "rb") as rf:
if os.stat(filename).st_size > 0x200000:
print("Error: GPT is too big or no GPT at all.")
return None
data = rf.read()
return self.get_flag_data(data, imagename)
except FileNotFoundError:
print(f"File not found : {filename}")
return None, None
def get_flag_data(self, gpt: bytes, imagename: str):
for sectorsize in [512, 4096]:
result = self.parse(gpt, sectorsize)
if result:
for partition in self.partentries:
if imagename in partition.name.lower():
return partition.sector, sectorsize
return None, None
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="GPT utils")
parser.add_argument("image", help="The path of the GPT disk image")
subparsers = parser.add_subparsers(dest="command", help="sub-command help")
parser_print = subparsers.add_parser("print", help="Print the gpt table")
parser_test = subparsers.add_parser("test", help="Run self-test")
parser_patch = subparsers.add_parser("patch", help="Set active boot slot")
parser_patch.add_argument("partition", help="Extract specific partitions (separated by comma)")
parser_patch.add_argument("-active", action="store_true", help="Set bootable")
parser_extract = subparsers.add_parser("extract", help="Extract the partitions")
parser_extract.add_argument("-out", "-o", help="The path to extract the partitions")
parser_extract.add_argument("-partition", "-p", help="Extract specific partitions (separated by comma)")
args = parser.parse_args()
if args.command not in ["print", "extract", "test", "patch"]:
parser.error("Command is mandatory")
gp = gpt()
if args.command == "print":
if not os.path.exists(args.image):
print(f"File {args.image} does not exist. Aborting.")
sys.exit(1)
gp.print_gptfile(args.image)
elif args.command == "test":
gp.test_gpt()
elif args.command == "patch":
partitition = args.partition
active = args.active
filesize = os.stat(args.image).st_size
with open(args.image, "rb") as rf:
size = min(32 * 4096, filesize)
data = bytearray(rf.read(size))
pdata, poffset = gp.patch(data, partitition, active=active)
data[poffset:poffset + len(pdata)] = pdata
wdata = gp.fix_gpt_crc(data)
if data is not None:
wfilename = args.image + ".patched"
with open(wfilename, "wb") as wf:
wf.write(wdata)
print(f"Successfully wrote patched gpt to {wfilename}")
else:
print("Error on setting bootable mode")
elif args.command == "extract":
if not os.path.exists(args.image):
print(f"File {args.image} does not exist. Aborting.")
sys.exit(1)
filesize = os.stat(args.image).st_size
with open(args.image, "rb", buffering=1024 * 1024) as rf:
data = rf.read(min(32 * 4096, filesize))
ssize = None
for sectorsize in [512, 4096]:
result = gp.parse(data, sectorsize)
if result:
ssize = sectorsize
break
if ssize is not None:
if args.partition == "gpt":
print(f"Extracting gpt to gpt.bin at {hex(0)}, length {hex(32 * ssize)}")
rf.seek(0)
data = rf.read(32 * ssize)
with open("gpt.bin", "wb") as wf:
wf.write(data)
else:
for partition in gp.partentries:
if args.partition is not None:
if partition.name.lower() != args.partition:
continue
name = partition.name
start = partition.sector * ssize
length = partition.sectors * ssize
out = args.out
if out is None:
out = "."
else:
if not os.path.exists(out):
os.makedirs(out)
filename = os.path.join(out, name) + ".bin"
print(f"Extracting {name} to {filename} at {hex(start)}, length {hex(length)}")
rf.seek(start)
bytestoread = length
with open(filename, "wb", buffering=1024 * 1024) as wf:
while bytestoread > 0:
size = min(bytestoread, 0x200000)
data = rf.read(size)
wf.write(data)
bytestoread -= size