|
@@ -0,0 +1,237 @@
|
|
|
+#!/usr/bin/python3
|
|
|
+
|
|
|
+import contextlib
|
|
|
+import enum
|
|
|
+import hashlib
|
|
|
+import io
|
|
|
+import itertools
|
|
|
+import mmap
|
|
|
+import pathlib
|
|
|
+import pickle
|
|
|
+import string
|
|
|
+import struct
|
|
|
+import typing
|
|
|
+
|
|
|
+import angr
|
|
|
+import msgspec
|
|
|
+
|
|
|
+from . import vtable as vt_helpers
|
|
|
+from .types import ByteSignature, Code, IntLiteral
|
|
|
+
|
|
|
# result key template that expands to just the substituted ``name``
KEY_AS_IS = string.Template("${name}")


def KEY_SUFFIX(s):
    """Return a result key template of the form ``${name} [<s>]``."""
    suffixed = "${name} " + f"[{s}]"
    return string.Template(suffixed)
|
|
|
+
|
|
|
+
|
|
|
def _truncate(val, num_bits):
    """Keep only the lowest ``num_bits`` bits of ``val``."""
    # bit-mask approach from https://stackoverflow.com/a/53424236
    mask = 2**num_bits - 1
    return val & mask


# named helpers made available to expressions evaluated during value reads
eval_functions = {
    "truncate": _truncate,
}
|
|
|
+
|
|
|
+
|
|
|
def convert_types(*types):
    """Build a decode hook that constructs any of ``types`` from a raw value.

    The returned hook takes ``(type, obj)`` and returns ``type(obj)`` when
    ``type`` is one of the registered types; for anything else it raises
    ``NotImplementedError``.
    """

    def _dec_hook(type: typing.Type, obj: typing.Any) -> typing.Any:
        # guard first: only explicitly registered types are converted
        if type not in types:
            raise NotImplementedError
        return type(obj)

    return _dec_hook
|
|
|
+
|
|
|
+
|
|
|
+# entries may output multiple values, such as separate Windows / Linux vtable indices
|
|
|
+# or bytesigs + offsets
|
|
|
+ResultValues = dict[string.Template, typing.Any]
|
|
|
+
|
|
|
+
|
|
|
+class BaseBinary:
|
|
|
+ path: pathlib.Path
|
|
|
+ angr: angr.Project
|
|
|
+ _file: io.IOBase
|
|
|
+
|
|
|
+ def __init__(self, path: pathlib.Path, cache_path: pathlib.Path | None = None):
|
|
|
+ self.path = path
|
|
|
+ self._file = open(self.path, "rb")
|
|
|
+
|
|
|
+ file_hash = hashlib.sha256(self.path.read_bytes())
|
|
|
+ cached_proj = (cache_path or pathlib.Path()) / f"{file_hash.hexdigest()}.angr.pkl"
|
|
|
+ if not cached_proj.exists():
|
|
|
+ self.angr = angr.Project(self.path, load_options={"auto_load_libs": False})
|
|
|
+ cached_proj.write_bytes(pickle.dumps(self.angr))
|
|
|
+ else:
|
|
|
+ self.angr = pickle.loads(cached_proj.read_bytes())
|
|
|
+
|
|
|
+ @contextlib.contextmanager
|
|
|
+ def mmap(self):
|
|
|
+ mm = mmap.mmap(self._file.fileno(), 0, access=mmap.ACCESS_READ)
|
|
|
+ yield mm
|
|
|
+ mm.close()
|
|
|
+
|
|
|
+ def read(self, address, size) -> bytes:
|
|
|
+ # shorthand to read a value from a physical file
|
|
|
+ with self.mmap() as memory:
|
|
|
+ return memory[address : address + size]
|
|
|
+
|
|
|
+
|
|
|
class WindowsBinary(BaseBinary):
    """BaseBinary subclass used to tag Windows binaries; adds no behaviour of its own."""

    def __init__(self, path: pathlib.Path, cache_path: pathlib.Path | None = None):
        # nothing platform-specific here: defer entirely to the base initialiser
        super().__init__(path, cache_path=cache_path)
|
|
|
+
|
|
|
+
|
|
|
class LinuxBinary(BaseBinary):
    """BaseBinary subclass used to tag Linux binaries; adds no behaviour of its own."""

    def __init__(self, path: pathlib.Path, cache_path: pathlib.Path | None = None):
        # nothing platform-specific here: defer entirely to the base initialiser
        super().__init__(path, cache_path=cache_path)
|
|
|
+
|
|
|
+
|
|
|
# any of the platform-specific binary classes
PlatformBinary: typing.TypeAlias = WindowsBinary | LinuxBinary
|
|
|
+
|
|
|
+
|
|
|
+class NumericOutputFormat(enum.StrEnum):
|
|
|
+ INT = "int"
|
|
|
+ HEX = "hex"
|
|
|
+ HEX_SUFFIX = "hex_suffix"
|
|
|
+
|
|
|
+ def format_value(self, value) -> str:
|
|
|
+ if self == NumericOutputFormat.HEX:
|
|
|
+ return hex(value)
|
|
|
+ elif self == NumericOutputFormat.HEX_SUFFIX:
|
|
|
+ return f"{value:X}h"
|
|
|
+ raise NotImplementedError(f"Missing numeric output for {self.value}")
|
|
|
+
|
|
|
+
|
|
|
class BaseEntry(msgspec.Struct, kw_only=True):
    """Common base for config entries; subclasses override ``process``."""

    # partial path pointing to the binary this entry refers to
    target: pathlib.Path

    def process(self, bin: PlatformBinary) -> ResultValues:
        """Produce this entry's result values from ``bin``; the base class has none."""
        entry_kind = self.__class__.__qualname__
        raise NotImplementedError(f"Cannot process {entry_kind}")
|
|
|
+
|
|
|
+
|
|
|
class LocationEntry(BaseEntry):
    """Entry anchored at a location in the binary.

    The anchor is either the first match of ``bytescan`` or the address of
    ``symbol``; ``offset`` is added to whichever anchor is used.  ``bytescan``
    takes precedence when both are given.
    """

    symbol: str | None = None
    offset: IntLiteral = IntLiteral("0")
    bytescan: ByteSignature | None = None

    offset_fmt: NumericOutputFormat = NumericOutputFormat.HEX

    def __post_init__(self):
        """Reject entries that name neither a byte pattern nor a symbol."""
        if not (self.bytescan or self.symbol):
            raise ValueError("Missing location anchor (expected either 'bytescan' or 'symbol')")

    def calculate_phys_address(self, bin: PlatformBinary) -> int:
        """Return the physical offset within the file that this entry points at.

        Raises:
            AssertionError: if the byte pattern has no match, the symbol cannot
                be found, or the symbol's address has no file offset.
        """
        if self.bytescan:
            with bin.mmap() as memory:
                # The iterator is deliberately not bound to a name.
                # NOTE(review): assuming `expr` is a compiled `re` pattern, a
                # live finditer iterator keeps a buffer export on the map, and
                # closing an mmap with outstanding exports raises BufferError.
                first_match = next(self.bytescan.expr.finditer(memory), None)
                start = first_match.start() if first_match else None
            if start is None:
                raise AssertionError(
                    "No matches found for 'bytescan' value " f"{self.bytescan.display_str}"
                )
            return start + self.offset

        sym = bin.angr.loader.find_symbol(self.symbol)
        if not sym:
            raise AssertionError(f"Could not find symbol {self.symbol}")
        offset = bin.angr.loader.main_object.addr_to_offset(sym.rebased_addr + self.offset)
        # explicit check instead of `assert`: survives `python -O` and does not
        # reject a legitimate offset of 0
        if offset is None:
            raise AssertionError(
                f"Symbol {self.symbol} with offset {self.offset} does not map to a file offset"
            )
        return offset
|
|
|
+
|
|
|
+
|
|
|
class VirtualFunctionEntry(BaseEntry, tag="vfn"):
    """Look up a virtual function's vtable index for Linux and Windows.

    Linux-specific: takes a (mangled) function symbol and reports its index in
    the Linux vtable and in the derived Windows vtable.
    """

    symbol: str
    # decoded from the "vtable" key; derived from `symbol` when omitted
    typename: str | None = msgspec.field(name="vtable", default=None)

    def __post_init__(self):
        # NOTE(review): this raises unconditionally, so no instance can currently
        # be constructed.  Presumably a placeholder while the entry type is
        # unfinished (see the TODO in `process`) -- confirm before relying on it.
        raise ValueError("Missing vfn?")

    @property
    def typename_from_symbol(self):
        """Return the length-prefixed class name embedded in ``symbol``.

        For ``_ZN7CPlayer4JumpEv`` this is ``7CPlayer``.  Returns ``None`` when
        the symbol does not start with ``_ZN``.

        Raises:
            ValueError: if the class name is not a simple length-prefixed
                identifier directly followed by another length-prefixed name.
        """
        if not self.symbol.startswith("_ZN"):
            return None
        # skip the extra "K" qualifier marker when present
        start_range = 4 if self.symbol.startswith("_ZNK") else 3

        # this only handles the simple case of a non-template classname
        int_prefix = "".join(itertools.takewhile(str.isdigit, self.symbol[start_range:]))
        if not int_prefix:
            raise ValueError(f"Could not parse function symbol {self.symbol} into a type name")
        chars_to_read = int(int_prefix)

        end_range = start_range + len(int_prefix) + chars_to_read
        # slice rather than index so a truncated symbol yields "" (and the
        # ValueError below) instead of an IndexError
        if not self.symbol[end_range : end_range + 1].isdigit():
            raise ValueError(f"Could not parse function symbol {self.symbol} into a type name")
        return self.symbol[start_range:end_range]

    def process(self, bin: PlatformBinary) -> ResultValues:
        """Return the Linux and Windows vtable indices of ``symbol``.

        Each index is ``None`` when the function is not present in the
        corresponding vtable.

        Raises:
            AssertionError: if ``bin`` is not a ``LinuxBinary``.
            ValueError: if no type name is available or its vtable symbol
                cannot be found.
        """
        # TODO: implement
        if not isinstance(bin, LinuxBinary):
            # explicit raise so the check is not stripped under `python -O`
            raise AssertionError(f"{type(self).__qualname__} requires a LinuxBinary")
        self.typename = self.typename or self.typename_from_symbol
        if not self.typename:
            # avoid a misleading lookup of "_ZTVNone"
            raise ValueError(f"Could not determine a vtable type name for symbol {self.symbol}")
        vtda = bin.angr.analyses.VtableDisambiguator()
        vtsym = bin.angr.loader.find_symbol(f"_ZTV{self.typename}")
        if not vtsym:
            raise ValueError(f"Could not find vtable symbol _ZTV{self.typename}")
        # only the first returned vtable is consulted; the rest are ignored
        orig_vtable, *_thunk_vtables = vt_helpers.get_vtables_from_address(bin, vtda, vtsym)
        win_vtable = vt_helpers.get_windows_vtables_from(bin, vtda, vtsym)

        sym = bin.angr.loader.find_symbol(self.symbol)
        return {
            KEY_SUFFIX("LINUX"): orig_vtable.index(sym) if sym in orig_vtable else None,
            KEY_SUFFIX("WINDOWS"): win_vtable.index(sym) if sym in win_vtable else None,
        }
|
|
|
+
|
|
|
+
|
|
|
class ByteSigEntry(LocationEntry, tag="bytesig", kw_only=True):
    """Byte signature that must be present (and normally unique) in the binary."""

    # value to be inserted into gameconf after asserting that the given location matches
    # NOTE(review): `process` only checks how often `contents` occurs in the
    # whole file; the inherited symbol / bytescan / offset anchor is not
    # consulted there -- confirm whether a location check is still to be added.
    contents: ByteSignature

    # most bytesigs are expected to be unique; escape hatch for those that are just typecasted
    allow_multiple: bool = False

    def process(self, bin: PlatformBinary) -> ResultValues:
        """Validate that ``contents`` occurs in ``bin`` and return its gameconf string.

        Raises:
            AssertionError: if the pattern has no match, or has more than one
                match while ``allow_multiple`` is false.
        """
        with bin.mmap() as memory:
            matches = self.contents.expr.finditer(memory)
            found = bool(next(matches, False))
            ambiguous = found and not self.allow_multiple and bool(next(matches, False))
            # Release the iterator before the map is closed.
            # NOTE(review): assuming `expr` is a compiled `re` pattern, an
            # unexhausted finditer iterator keeps a buffer export on the map,
            # and closing an mmap with outstanding exports raises BufferError.
            del matches
        if not found:
            # no matches found at all, fail validation
            raise AssertionError(f"No matches found for {self.contents.display_str}")
        if ambiguous:
            # non-unique byte pattern, fail validation
            raise AssertionError(f"Multiple matches found for {self.contents.display_str}")
        return {
            KEY_AS_IS: self.contents.gameconf_str,
        }
|
|
|
+
|
|
|
+
|
|
|
class ValueReadEntry(LocationEntry, tag="value", kw_only=True):
    """Decode a value from the binary at the entry's location."""

    # layout of the value to decode at a given symbol / offset
    struct: struct.Struct
    # optional expression that must evaluate truthy for the decoded value
    assert_stmt: Code | None = msgspec.field(default=None, name="assert")
    # optional expression whose result replaces the decoded value
    modify_stmt: Code | None = msgspec.field(default=None, name="modify")

    def process(self, bin: PlatformBinary) -> ResultValues:
        """Read, optionally modify, and validate the value at this entry's location.

        Returns the value under the plain key and the formatted ``offset``
        under the ``OFFSET`` suffixed key.

        Raises:
            AssertionError: if fewer bytes than the struct needs are available,
                or the ``assert`` expression evaluates falsy.
        """
        address = self.calculate_phys_address(bin)
        data = bin.read(address, self.struct.size)
        if len(data) != self.struct.size:
            # a slice running past end-of-file comes back short; report that
            # as a validation failure rather than a bare struct.error
            raise AssertionError(
                f"Expected {self.struct.size} bytes at {address:#x}, got {len(data)}"
            )
        # only the first unpacked field is used
        result, *_ = self.struct.unpack(data)

        if self.modify_stmt:
            # give `modify` the same helper functions that `assert` receives
            result = self.modify_stmt.eval(value=result, **eval_functions)

        # run assertion to ensure value is expected
        if self.assert_stmt and not self.assert_stmt.eval(value=result, **eval_functions):
            raise AssertionError(f"Assertion failed: '{self.assert_stmt}' for value {result}")
        return {
            KEY_AS_IS: result,
            KEY_SUFFIX("OFFSET"): self.offset_fmt.format_value(self.offset),
        }
|
|
|
+
|
|
|
+
|
|
|
# union of the tagged entry types
ConfigEntry: typing.TypeAlias = typing.Union[VirtualFunctionEntry, ByteSigEntry, ValueReadEntry]
# mapping of string keys to entries
GameConfDict: typing.TypeAlias = dict[str, ConfigEntry]
|