minecraft-convert-local-player/bedrock/bedrock.py

440 lines
17 KiB
Python

# Interprets the Minecraft Bedrock world format.
import struct
import os.path
import numpy as np
from . import leveldb as ldb
from . import nbt
# Handles chunk loading and mapping blocks to chunks.
class World:
    """Cached, chunk-addressed view of a Bedrock world's LevelDB store.

    Intended to be used as a context manager: the database under
    ``<path>/db`` is opened on entry and closed on exit. Every chunk that was
    loaded in between is written back when the ``with`` body finishes without
    raising.
    """

    def __init__(self, path):
        self.path = os.path.join(path, "db")
        self.db = None  # Set by __enter__.
        self.chunks = {}  # (x, z, dimension) -> Chunk cache.

    # Enable use in a with statement.
    def __enter__(self):
        self.db = ldb.open(self.path)
        return self

    def __exit__(self, exceptionType, exception, tb):
        # Only persist changes when the body succeeded, but always release
        # the database handle, even if saving itself fails.
        try:
            if exceptionType is None:
                self.save()
        finally:
            ldb.close(self.db)
        return False

    def getChunk(self, x, z, dimension=0):
        """Return the chunk at chunk coordinates (x, z), loading it on first use."""
        key = (x, z, dimension)
        chunk = self.chunks.get(key)
        if chunk is None:
            chunk = Chunk(self.db, x, z, dimension)
            self.chunks[key] = chunk
        return chunk

    def getBlock(self, x, y, z, layer=0, dimension=0):
        """Return the block at world coordinates (x, y, z)."""
        # divmod floors, so negative coordinates map to the right chunk.
        cx, localX = divmod(x, 16)
        cz, localZ = divmod(z, 16)
        return self.getChunk(cx, cz, dimension).getBlock(localX, y, localZ, layer)

    def setBlock(self, x, y, z, block, layer=0, dimension=0):
        """Store ``block`` at world coordinates (x, y, z)."""
        cx, localX = divmod(x, 16)
        cz, localZ = divmod(z, 16)
        return self.getChunk(cx, cz, dimension).setBlock(localX, y, localZ, block, layer)

    def save(self):
        """Write every cached chunk back to the database."""
        for chunk in self.chunks.values():
            chunk.save(self.db)

    def iterKeys(self, start=None, end=None):
        """Yield whatever ``ldb.iterate`` produces for the given key range."""
        yield from ldb.iterate(self.db, start, end)

    @staticmethod
    def _parseChunkKey(key, dimension):
        """Return (x, z) if ``key`` is a version key of ``dimension``, else None.

        Overworld version keys are 8 coordinate bytes plus a one-byte tag;
        other dimensions carry an extra little-endian dimension id before the
        tag. The tag is ``b"v"`` or ``b","``.
        """
        if not key.endswith((b"v", b",")):
            return None
        if dimension == 0:
            # Overworld
            if len(key) != 9:
                return None
            return struct.unpack("<ii", key[:8])
        # Nether / The End
        if len(key) != 13:
            return None
        x, z, dim = struct.unpack("<iii", key[:12])
        if dim != dimension:
            return None
        return x, z

    @staticmethod
    def _inBounds(x, z, start, end):
        """Check (x, z) against the optional inclusive ``start`` / exclusive ``end``.

        Each bound is applied on its own, so passing only one of them works.
        """
        if start and (x < start[0] or z < start[1]):
            return False
        if end and (x >= end[0] or z >= end[1]):
            return False
        return True

    def iterChunks(self, start=None, end=None, dimension=0):
        """Yield every loadable chunk of ``dimension`` within the given bounds.

        ``start`` and ``end`` are optional (x, z) chunk coordinates. Chunks
        that fail to load are reported on stdout and skipped.
        """
        for key, _ in ldb.iterate(self.db):
            coords = self._parseChunkKey(key, dimension)
            if coords is None:
                continue
            x, z = coords
            if not self._inBounds(x, z, start, end):
                continue
            try:
                yield self.getChunk(x, z, dimension)
            except Exception as e:
                # Deliberately best-effort: one broken chunk must not stop the scan.
                print("Error: Couldn't load chunk at {} {} (Dim {}): {}".format(x, z, dimension, e))
# Handles biomes and tile entities. Maps blocks to subchunks.
class Chunk:
    """One 16x16 column of the world: its subchunks, biomes and entities.

    Everything is read from ``db`` at construction time. World-space ``y``
    values are mapped to (subchunk index, local y); chunks with version >= 25
    ("caves and cliffs") shift ``y`` up by 64 first.
    """

    # Chunk format versions this reader accepts.
    SUPPORTED_VERSIONS = (10, 13, 14, 15, 18, 19, 21, 22, 25)

    def __init__(self, db, x, z, dimension=0):
        self.x = x
        self.z = z
        self.dimension = dimension
        # Leveldb chunks are stored in a number of keys with the same prefix.
        if self.dimension == 0:
            self.keyBase = struct.pack("<ii", self.x, self.z)
        else:
            self.keyBase = struct.pack("<iii", self.x, self.z, self.dimension)
        self.version = self._loadVersion(db)
        self.cavesAndCliffs = self.version >= 25
        if not self.cavesAndCliffs:
            self.hMap, self.biomes = self._load2D(db)
        else:
            self.hMap, self.biomes = None, None
        self.subchunks = []
        for i in range(24 if self.cavesAndCliffs else 16):
            try:
                # Pass off processing to the subchunk class.
                self.subchunks.append(SubChunk(db, self.x, self.z, i, self.dimension))
            # Supposedly if a subchunk exists then all the subchunks below it
            # exist. This is not the case, so holes are kept as None.
            except NotFoundError:
                self.subchunks.append(None)
        self._loadTileEntities(db)
        self.entities = self._loadEntities(db)

    # Version is simply a stored value.
    def _loadVersion(self, db):
        """Read the chunk version byte, trying the ``,`` key before ``v``.

        Raises KeyError if neither key exists and NotImplementedError for a
        version outside SUPPORTED_VERSIONS.
        """
        try:
            try:
                raw = ldb.get(db, self.keyBase + b",")
            except KeyError:
                raw = ldb.get(db, self.keyBase + b"v")
        except KeyError:
            raise KeyError("Chunk at {}, {} (Dim {}) does not exist.".format(self.x, self.z, self.dimension)) from None
        version = struct.unpack("<B", raw)[0]
        if version not in self.SUPPORTED_VERSIONS:
            raise NotImplementedError("Unexpected chunk version {} at chunk {} {} (Dim {}).".format(version, self.x, self.z, self.dimension))
        return version

    # Load heightmap (seemingly useless) and biome info
    def _load2D(self, db):
        """Return (heightMap, biomes): 256 little-endian shorts, then 256 bytes."""
        data = ldb.get(db, self.keyBase + b'-')
        heightMap = struct.unpack("<" + "H" * 16 * 16, data[:2 * 16 * 16])
        biomes = struct.unpack("B" * 16 * 16, data[2 * 16 * 16:])
        return heightMap, biomes

    # Tile entities are stored as a bunch of NBT compound tags end to end.
    def _loadTileEntities(self, db):
        """Attach each stored tile-entity compound to the block it refers to."""
        try:
            data = ldb.get(db, self.keyBase + b"1")
        except KeyError:
            return
        data = nbt.DataReader(data)
        while not data.finished():
            nbtData = nbt.decode(data)
            # The position tags are stripped here and added back with the
            # correct values on save; they are important.
            x = nbtData.pop("x").payload
            y = nbtData.pop("y").payload
            z = nbtData.pop("z").payload
            block = self.getBlock(x % 16, y, z % 16)
            if not block:
                print("Warning: Cannot apply nbt to block at {} {} {} since it does not exist.".format(x, y, z))
                continue
            block.nbt = nbtData

    def _loadEntities(self, db):
        """Return the list of decoded entity compounds (empty if none stored)."""
        try:
            data = ldb.get(db, self.keyBase + b"2")
        except KeyError:
            return []
        data = nbt.DataReader(data)
        entities = []
        while not data.finished():
            entities.append(nbt.decode(data))
        return entities

    def _locate(self, y):
        """Map a world ``y`` to (subchunk index, y within that subchunk)."""
        if self.cavesAndCliffs:
            y += 64
        return divmod(y, 16)

    def getBlock(self, x, y, z, layer=0):
        """Return the block at chunk-local (x, z) and world ``y``, or None.

        None is returned when ``y`` falls outside the stored subchunks or the
        subchunk there is missing.
        """
        index, localY = self._locate(y)
        # A negative index must not wrap around to the top of the chunk.
        if index < 0 or index >= len(self.subchunks):
            return None
        subchunk = self.subchunks[index]
        if subchunk is None:
            return None
        return subchunk.getBlock(x, localY, z, layer)

    def setBlock(self, x, y, z, block, layer=0):
        """Store ``block`` at chunk-local (x, z) and world ``y``.

        Missing subchunks up to and including the target are created empty.
        Raises IndexError when ``y`` lies below the lowest subchunk.
        """
        index, localY = self._locate(y)
        if index < 0:
            # Previously this silently wrote into a subchunk counted from the top.
            raise IndexError("y={} is below the bottom of chunk {} {} (Dim {})".format(y, self.x, self.z, self.dimension))
        while index >= len(self.subchunks):
            self.subchunks.append(SubChunk.empty(self.x, self.z, len(self.subchunks), self.dimension))
        if self.subchunks[index] is None:
            self.subchunks[index] = SubChunk.empty(self.x, self.z, index, self.dimension)
        self.subchunks[index].setBlock(x, localY, z, block, layer)

    def save(self, db):
        """Write the version, 2D data, subchunks, tile entities and entities.

        The version is always written under the ``,`` key.
        """
        version = struct.pack("<B", self.version)
        ldb.put(db, self.keyBase + b",", version)
        if not self.cavesAndCliffs:
            self._save2D(db)
        for subchunk in self.subchunks:
            if subchunk is None:
                continue
            subchunk.save(db)
        self._saveTileEntities(db)
        self._saveEntities(db)

    def _save2D(self, db):
        data = struct.pack("<" + "H" * 16 * 16, *self.hMap)
        data += struct.pack("B" * 16 * 16, *self.biomes)
        ldb.put(db, self.keyBase + b'-', data)

    def _saveTileEntities(self, db):
        """Serialise the NBT of every layer-0 block that carries any."""
        data = nbt.DataWriter()
        # Undo the shift getBlock applies, so the y written matches the y read.
        yOffset = 64 if self.cavesAndCliffs else 0
        for subchunk in self.subchunks:
            if subchunk is None:
                continue
            for x in range(16):
                for y in range(16):
                    for z in range(16):
                        block = subchunk.getBlock(x, y, z)
                        if block.nbt is None:
                            continue
                        # Add back the correct position.
                        block.nbt.add(nbt.TAG_Int("x", subchunk.x * 16 + x))
                        block.nbt.add(nbt.TAG_Int("y", subchunk.y * 16 + y - yOffset))
                        block.nbt.add(nbt.TAG_Int("z", subchunk.z * 16 + z))
                        nbt.encode(block.nbt, data)
                        # Strip the position again, as on load, so a later
                        # save does not add the tags a second time.
                        for axis in ("x", "y", "z"):
                            block.nbt.pop(axis)
        ldb.put(db, self.keyBase + b"1", data.get())

    def _saveEntities(self, db):
        data = nbt.DataWriter()
        for entity in self.entities:
            nbt.encode(entity, data)
        ldb.put(db, self.keyBase + b"2", data.get())

    def __repr__(self):
        return "Chunk {} {} (Dim {}): {} subchunks".format(self.x, self.z, self.dimension, len(self.subchunks))
# Handles the blocks and block palette format.
class SubChunk:
def __init__(self, db, x, z, y, dimension=0):
self.dirty = False
self.x = x
self.z = z
self.y = y
self.dimension = dimension
if db is not None: # For creating subchunks, there will be no DB.
# Subchunks are stored as base key + subchunk key `/` + subchunk id (y level // 16)
if self.dimension == 0:
self.key = struct.pack("<iicB", x, z, b'/', y)
else:
self.key = struct.pack("<iiicB", x, z, dimension, b'/', y)
try:
data = ldb.get(db, self.key)
except KeyError:
raise NotFoundError("Subchunk at {} {} (Dim {})/{} not found.".format(x, z, dimension, y))
self.version, data = data[0], data[1:]
if self.version not in [8, 9]:
raise NotImplementedError("Unsupported subchunk version {} at {} {} (Dim {})/{}".format(self.version, x, z, dimension, y))
numStorages, data = data[0], data[1:]
if self.version == 9:
self.y_db, data = data[0], data[1:]
else:
self.y_db = None
self.blocks = []
for i in range(numStorages):
blocks, data = self._loadBlocks(data)
if data:
palette, data = self._loadPalette(data)
self.blocks.append(np.empty(4096, dtype=Block)) # Prepare with correct dtype
for j, block in enumerate(blocks):
block = palette[block]
try: # 1.13 format
#if block["version"].payload != 17629200:
# raise NotImplementedError("Unexpected block version {}".format(block["version"].payload))
self.blocks[i][j] = Block(block["name"].payload, block["states"].payload) # .payload to get actual val
except KeyError: # 1.12 format
self.blocks[i][j] = Block(block["name"].payload, block["val"].payload) # .payload to get actual val
self.blocks[i] = self.blocks[i].reshape(16, 16, 16).swapaxes(1, 2) # Y and Z saved in an inverted order
else:
# I *think* this means the whole subchunk is one type of block - commonly endstone
try: # 1.13 format
blocks = Block(blocks["name"].payload, blocks["states"].payload)
except KeyError: # 1.12 format
blocks = Block(blocks["name"].payload, blocks["val"].payload)
self.blocks.append(np.full(4096, blocks, dtype=Block))
self.blocks[i] = self.blocks[i].reshape(16, 16, 16)
# These arent actual blocks, just ids pointing to the palette.
def _loadBlocks(self, data):
#Ignore LSB of data (its a flag) and get compacting level
bitsPerBlock, data = data[0] >> 1, data[1:]
if bitsPerBlock == 0:
# I *think* this means the whole subchunk is one type of block - commonly endstone
dr = nbt.DataReader(data)
block = nbt.decode(dr)
return (block, None)
blocksPerWord = 32 // bitsPerBlock # Word = 4 bytes, basis of compacting.
numWords = - (-4096 // blocksPerWord) # Ceiling divide is inverted floor divide
blockWords, data = struct.unpack("<" + "I" * numWords, data[:4 * numWords]), data[4 * numWords:]
blocks = np.empty(4096, dtype=np.uint32)
for i, word in enumerate(blockWords):
for j in range(blocksPerWord):
block = word & ((1 << bitsPerBlock) - 1) # Mask out number of bits for one block
word >>= bitsPerBlock # For next iteration
if i * blocksPerWord + j < 4096: # Safety net for padding at end.
blocks[i * blocksPerWord + j] = block
return blocks, data
# NBT encoded block names (with minecraft:) and data values.
def _loadPalette(self, data):
palletLen, data = struct.unpack("<I", data[:4])[0], data[4:]
dr = nbt.DataReader(data)
palette = []
for _ in range(palletLen):
block = nbt.decode(dr)
palette.append(block)
return palette, data[dr.idx:]
def getBlock(self, x, y, z, layer=0):
if layer >= len(self.blocks):
raise KeyError("Subchunk {} {} (Dim {})/{} does not have a layer {}".format(self.x, self.z, self.dimension, self.y, layer))
return self.blocks[layer][x, y, z]
def setBlock(self, x, y, z, block, layer=0):
if layer >= len(self.blocks):
raise KeyError("Subchunk {} {} (Dim {})/{} does not have a layer {}".format(self.x, self.z, self.dimension, self.y, layer))
self.blocks[layer][x, y, z] = block
self.dirty = True
def save(self, db, force=False):
if self.dirty or force:
data = struct.pack("<BB", self.version, len(self.blocks))
for i in range(len(self.blocks)):
palette, blockIDs = self._savePalette(i)
data += self._saveBlocks(len(palette), blockIDs)
data += struct.pack("<I", len(palette))
for block in palette:
data += nbt.encode(block)
if self.version == 9:
data = struct.pack("B", self.y_db) + data
ldb.put(db, self.key, data)
# Compact blockIDs bitwise. See _loadBlocks for details.
def _saveBlocks(self, paletteSize, blockIDs):
bitsPerBlock = max(int(np.ceil(np.log2(paletteSize))), 1)
for bits in [1, 2, 3, 4, 5, 6, 8, 16]:
if bits >= bitsPerBlock:
bitsPerBlock = bits
break
else:
raise NotImplementedError("Too many bits per block needed {} at {} {} (Dim {})/{}".format(bitsPerBlock, self.x, self.z, self.dimension, self.y))
blocksPerWord = 32 // bitsPerBlock
numWords = - (-4096 // blocksPerWord)
data = struct.pack("<B", bitsPerBlock << 1)
for i in range(numWords):
word = 0
for j in range(blocksPerWord - 1, -1, -1):
if i * blocksPerWord + j < 4096:
word <<= bitsPerBlock
word |= blockIDs[i * blocksPerWord + j]
data += struct.pack("<I", word)
return data
# Make a palette, and get the block ids at the same time
def _savePalette(self, layer):
blocks = self.blocks[layer].swapaxes(1, 2).reshape(4096) # Y and Z saved in a inverted order
blockIDs = np.empty(4096, dtype=np.uint32)
palette = []
mapping = {}
for i, block in enumerate(blocks):
# Generate the palette nbt for the given block
short = (block.name, str(block.properties))
if short not in mapping:
if isinstance(block.properties, int): # 1.12
palette.append(nbt.TAG_Compound("", [nbt.TAG_String("name", block.name), nbt.TAG_Short("val", block.properties)]))
else: # 1.13
palette.append(nbt.TAG_Compound("", [
nbt.TAG_String("name", block.name),
nbt.TAG_Compound("states", block.properties),
nbt.TAG_Int("version", 17629200)
]))
mapping[short] = len(palette) - 1
blockIDs[i] = mapping[short]
return palette, blockIDs
@classmethod
def empty(cls, x, z, y, dimension=0):
subchunk = cls(None, x, z, y, dimension)
subchunk.version = 8
subchunk.blocks = [np.full((16, 16, 16), Block("minecraft:air"), dtype=Block)]
return subchunk
# Generic block storage.
class Block:
    """A block name plus its properties and optional tile-entity NBT.

    ``properties`` is either an integer data value (1.12 format) or the
    payload of the ``states`` compound (1.13 format); it defaults to an
    empty list.
    """

    __slots__ = ["name", "properties", "nbt"]

    def __init__(self, name, properties=None, nbtData=None):
        self.name = name
        # Only substitute the default for None: a data value of 0 is a valid
        # 1.12 property and must stay an int so it is saved in that format.
        self.properties = [] if properties is None else properties
        self.nbt = nbtData

    def __eq__(self, other):
        if not isinstance(other, Block):
            return False
        return self.name == other.name and self.properties == other.properties and self.nbt == other.nbt

    def __repr__(self):
        return "{} {}".format(self.name, self.properties)

    def __hash__(self):
        # Hash on name and properties only; blocks that compare equal share both.
        return self.__repr__().__hash__()
# Handles NBT generation for command blocks.
class CommandBlock(Block):
    """Block subclass that builds the NBT of a command block.

    ``block`` picks the variant ("I", "C" or "R"), ``d`` the facing
    direction. ``cond`` adds 8 to the data value. A non-zero ``time`` or a
    set ``first`` switches the NBT to version 9 and adds the tick fields.
    """

    nameMap = {"I": "command_block", "C": "chain_command_block", "R": "repeating_command_block"}
    dMap = {"d": 0, "u": 1, "-z": 2, "+z": 3, "-x": 4, "+x": 5}

    def __init__(self, cmd="", hover="", block="I", d="u", cond=False, redstone=False, time=0, first=False):
        fullName = "minecraft:" + self.nameMap[block]
        dataValue = self.dMap[d] + (8 if cond else 0)

        # Tags are collected first and added in this exact order below.
        tags = [
            nbt.TAG_Byte("auto", int(not redstone)),
            nbt.TAG_String("Command", cmd),
            nbt.TAG_String("CustomName", hover),
            nbt.TAG_Byte("powered", int(block == "R" and not redstone)),
        ]
        needsTickFields = time != 0 or first
        if needsTickFields:
            tags.append(nbt.TAG_Int("Version", 9))
            tags.append(nbt.TAG_Byte("ExecuteOnFirstTick", int(first)))
            tags.append(nbt.TAG_Int("TickDelay", time))
        else:
            tags.append(nbt.TAG_Int("Version", 8))
        tags.extend([
            nbt.TAG_Byte("conditionMet", 0),
            nbt.TAG_String("id", "CommandBlock"),
            nbt.TAG_Byte("isMovable", 1),
            nbt.TAG_Int("LPCommandMode", 0),  # Not sure what these LPModes do. This works.
            nbt.TAG_Byte("LPConditionalMode", 0),
            nbt.TAG_Byte("LPRedstoneMode", 0),
            nbt.TAG_Long("LastExecution", 0),
            nbt.TAG_String("LastOutput", ""),
            nbt.TAG_List("LastOutputParams", []),
            nbt.TAG_Int("SuccessCount", 0),
            nbt.TAG_Byte("TrackOutput", 1),
        ])

        nbtData = nbt.TAG_Compound("", [])
        for tag in tags:
            nbtData.add(tag)
        super().__init__(fullName, dataValue, nbtData)
class NotFoundError(Exception):
    """Raised when a subchunk's key is not present in the database."""