Polish up the heap analysis script and make it more CLI friendly.

It can now render the heap layout over a sequence of ram dumps.

The mpy analysis is also better at parsing mpy files.
crypto-aes
Scott Shawcroft 5 years ago
parent 416abe33ed
commit da330f0cab
  1. 843
      tools/analyze_heap_dump.py
  2. 315
      tools/analyze_mpy.py
  3. 2
      tools/gc_activity.md
  4. 5
      tools/output_gc_until_repl.txt

@ -9,6 +9,10 @@ import sys
import pygraphviz as pgv
import io
import html
import os.path
import string
import click
from analyze_mpy import Prelude
@ -32,176 +36,211 @@ READLINE_HIST_SIZE = 8
SKIP_SYMBOLS = [".debug_ranges", ".debug_frame", ".debug_loc", ".comment", ".debug_str", ".debug_line", ".debug_abbrev", ".debug_info", "COMMON"]
ownership_graph = pgv.AGraph(directed=True)
with open(sys.argv[1], "rb") as f:
ram = f.read()
with open(sys.argv[2], "rb") as f:
rom = f.read()
symbols = {} # name -> address, size
symbol_lookup = {} # address -> name
manual_symbol_map = {} # autoname -> name
def add_symbol(name, address=None, size=None):
global symbols
if address:
address = int(address, 0)
if size:
size = int(size, 0)
if name in symbols:
if address and symbols[name][0] and symbols[name][0] != address:
print("Conflicting symbol: {}".format(name))
return
if not address:
address = symbols[name][0]
if not size:
size = symbols[name][1]
symbols[name] = (address, size)
if address:
if not size:
size = 4
for offset in range(0, size, 4):
symbol_lookup[address + offset] = "{}+{}".format(name, offset)
with open(sys.argv[3], "r") as f:
common_symbols = False
name = None
for line in f:
line = line.strip()
parts = line.split()
if line.startswith("Common symbol"):
common_symbols = True
if line == "Discarded input sections":
common_symbols = False
if common_symbols:
if len(parts) == 1:
name = parts[0]
elif len(parts) == 2 and name:
add_symbol(name, size=parts[0])
name = None
elif len(parts) == 3:
add_symbol(parts[0], size=parts[1])
name = None
else:
if len(parts) == 2 and parts[0].startswith("0x") and not parts[1].startswith("0x"):
add_symbol(parts[1], parts[0])
if len(parts) == 4 and parts[0] not in SKIP_SYMBOLS and parts[1].startswith("0x") and parts[2].startswith("0x"):
name, address, size, source = parts
if name.startswith((".text", ".rodata", ".bss")) and name.count(".") > 1:
name = name.split(".")[-1]
add_symbol(name, address, size)
# Linker symbols
if len(parts) >= 4 and parts[0].startswith("0x") and parts[2] == "=" and parts[1] != ".":
add_symbol(parts[1], parts[0])
rom_start = symbols["_sfixed"][0]
ram_start = symbols["_srelocate"][0]
def load(address, size=4):
if size is None:
raise ValueError("You must provide a size")
if address > ram_start:
ram_address = address - ram_start
if (ram_address + size) > len(ram):
raise ValueError("Unable to read 0x{:08x} from ram.".format(address))
return ram[ram_address:ram_address+size]
elif address < len(rom):
if (address + size) > len(rom):
raise ValueError("Unable to read 0x{:08x} from rom.".format(address))
return rom[address:address+size]
def load_pointer(address):
return struct.unpack("<I", load(address))[0]
heap_start, heap_size = symbols["heap"]
heap = load(heap_start, heap_size)
total_byte_len = len(heap)
# These change every run so we load them from the symbol table
mp_state_ctx = symbols["mp_state_ctx"][0]
manual_symbol_map["mp_state_ctx+24"] = "mp_state_ctx.vm.last_pool"
last_pool = load_pointer(mp_state_ctx + 24) # (gdb) p &mp_state_ctx.vm.last_pool
manual_symbol_map["mp_state_ctx+92"] = "mp_state_ctx.vm.dict_main.map.table"
dict_main_table = load_pointer(mp_state_ctx + 92) # (gdb) p &mp_state_ctx.vm.dict_main.map.table
manual_symbol_map["mp_state_ctx+72"] = "mp_state_ctx.vm.mp_loaded_modules_dict.map.table"
imports_table = load_pointer(mp_state_ctx + 72) # (gdb) p &mp_state_ctx.vm.mp_loaded_modules_dict.map.table
manual_symbol_map["mp_state_ctx+108"] = "mp_state_ctx.vm.mp_sys_path_obj.items"
manual_symbol_map["mp_state_ctx+124"] = "mp_state_ctx.vm.mp_sys_argv_obj.items"
for i in range(READLINE_HIST_SIZE):
manual_symbol_map["mp_state_ctx+{}".format(128 + i * 4)] = "mp_state_ctx.vm.readline_hist[{}]".format(i)
tuple_type = symbols["mp_type_tuple"][0]
type_type = symbols["mp_type_type"][0]
map_type = symbols["mp_type_map"][0]
dict_type = symbols["mp_type_dict"][0]
property_type = symbols["mp_type_property"][0]
str_type = symbols["mp_type_str"][0]
function_types = [symbols["mp_type_fun_" + x][0] for x in ["bc", "builtin_0", "builtin_1", "builtin_2", "builtin_3", "builtin_var"]]
bytearray_type = symbols["mp_type_bytearray"][0]
dynamic_type = 0x40000000 # placeholder, doesn't match any memory
type_colors = {
dict_type: "red",
property_type: "yellow",
map_type: "blue",
type_type: "orange",
tuple_type: "skyblue",
str_type: "pink",
bytearray_type: "purple"
}
pool_shift = heap_start % BYTES_PER_BLOCK
print("Total heap length:", total_byte_len)
atb_length = total_byte_len * BITS_PER_BYTE // (BITS_PER_BYTE + BITS_PER_BYTE * BLOCKS_PER_ATB // BLOCKS_PER_FTB + BITS_PER_BYTE * BLOCKS_PER_ATB * BYTES_PER_BLOCK)
print("ATB length:", atb_length)
pool_length = atb_length * BLOCKS_PER_ATB * BYTES_PER_BLOCK
print("Total allocatable:", pool_length)
gc_finaliser_table_byte_len = (atb_length * BLOCKS_PER_ATB + BLOCKS_PER_FTB - 1) // BLOCKS_PER_FTB
print("FTB length:", gc_finaliser_table_byte_len)
pool_start = heap_start + total_byte_len - pool_length - pool_shift
pool = heap[-pool_length-pool_shift:]
map_element_blocks = [dict_main_table, imports_table]
string_blocks = []
bytecode_blocks = []
qstr_pools = []
qstr_chunks = []
block_data = {}
# Find all the qtr pool addresses.
prev_pool = last_pool
while prev_pool > ram_start:
qstr_pools.append(prev_pool)
prev_pool = load_pointer(prev_pool)
longest_free = 0
current_free = 0
current_allocation = 0
total_free = 0
for i in range(atb_length):
# Each atb byte is four blocks worth of info
atb = heap[i]
for j in range(4):
block_state = (atb >> (j * 2)) & 0x3
if block_state != AT_FREE and current_free > 0:
print("{} bytes free".format(current_free * BYTES_PER_BLOCK))
current_free = 0
if block_state != AT_TAIL and current_allocation > 0:
@click.command()
@click.argument("ram_filename")
@click.argument("bin_filename")
@click.argument("map_filename")
@click.option("--print_block_contents", default=False,
help="Prints the contents of each allocated block")
@click.option("--print_unknown_types", default=False,
help="Prints the micropython base type if we don't understand it.")
@click.option("--print_block_state", default=False,
help="Prints the heap block states (allocated or free)")
@click.option("--print_conflicting_symbols", default=False,
help="Prints conflicting symbols from the map")
@click.option("--print-heap-structure/--no-print-heap-structure", default=False,
help="Print heap structure")
@click.option("--output_directory", default="heapvis",
help="Destination for rendered output")
@click.option("--draw-heap-layout/--no-draw-heap-layout", default=True,
help="Draw the heap layout")
@click.option("--draw-heap-ownership/--no-draw-heap-ownership", default=False,
help="Draw the ownership graph of blocks on the heap")
@click.option("--draw-heap-ownership/--no-draw-heap-ownership", default=False,
help="Draw the ownership graph of blocks on the heap")
@click.option("--analyze-snapshots", default="last", type=click.Choice(['all', 'last']))
def do_all_the_things(ram_filename, bin_filename, map_filename, print_block_contents,
print_unknown_types, print_block_state, print_conflicting_symbols,
print_heap_structure, output_directory, draw_heap_layout,
draw_heap_ownership, analyze_snapshots):
with open(ram_filename, "rb") as f:
ram_dump = f.read()
with open(bin_filename, "rb") as f:
rom = f.read()
symbols = {} # name -> address, size
symbol_lookup = {} # address -> name
manual_symbol_map = {} # autoname -> name
def add_symbol(name, address=None, size=None):
if "lto_priv" in name:
name = name.split(".")[0]
if address:
address = int(address, 0)
if size:
size = int(size, 0)
if name in symbols:
if address and symbols[name][0] and symbols[name][0] != address:
if print_conflicting_symbols:
print("Conflicting symbol: {} at addresses 0x{:08x} and 0x{:08x}".format(name, address, symbols[name][0]))
return
if not address:
address = symbols[name][0]
if not size:
size = symbols[name][1]
symbols[name] = (address, size)
if address:
if not size:
size = 4
for offset in range(0, size, 4):
symbol_lookup[address + offset] = "{}+{}".format(name, offset)
with open(map_filename, "r") as f:
common_symbols = False
name = None
for line in f:
line = line.strip()
parts = line.split()
if line.startswith("Common symbol"):
common_symbols = True
if line == "Discarded input sections":
common_symbols = False
if common_symbols:
if len(parts) == 1:
name = parts[0]
elif len(parts) == 2 and name:
add_symbol(name, size=parts[0])
name = None
elif len(parts) == 3:
add_symbol(parts[0], size=parts[1])
name = None
else:
if len(parts) == 1 and parts[0].startswith((".text", ".rodata", ".bss")) and parts[0].count(".") > 1 and not parts[0].isnumeric() and ".str" not in parts[0]:
name = parts[0].split(".")[2]
if len(parts) == 3 and parts[0].startswith("0x") and parts[1].startswith("0x") and name:
add_symbol(name, parts[0], parts[1])
name = None
if len(parts) == 2 and parts[0].startswith("0x") and not parts[1].startswith("0x"):
add_symbol(parts[1], parts[0])
if len(parts) == 4 and parts[0] not in SKIP_SYMBOLS and parts[1].startswith("0x") and parts[2].startswith("0x"):
name, address, size, source = parts
if name.startswith((".text", ".rodata", ".bss")) and name.count(".") > 1:
name = name.split(".")[-1]
add_symbol(name, address, size)
name = None
# Linker symbols
if len(parts) >= 4 and parts[0].startswith("0x") and parts[2] == "=" and parts[1] != ".":
add_symbol(parts[1], parts[0])
rom_start = symbols["_sfixed"][0]
ram_start = symbols["_srelocate"][0]
ram_end = symbols["_estack"][0]
ram_length = ram_end - ram_start
if analyze_snapshots == "all":
snapshots = range(len(ram_dump) // ram_length - 1, -1, -1)
elif analyze_snapshots == "last":
snapshots = range(len(ram_dump) // ram_length - 1, len(ram_dump) // ram_length - 2, -1)
for snapshot_num in snapshots:
ram = ram_dump[ram_length*snapshot_num:ram_length*(snapshot_num + 1)]
ownership_graph = pgv.AGraph(directed=True)
def load(address, size=4):
if size is None:
raise ValueError("You must provide a size")
if address > ram_start:
ram_address = address - ram_start
if (ram_address + size) > len(ram):
raise ValueError("Unable to read 0x{:08x} from ram.".format(address))
return ram[ram_address:ram_address+size]
elif address < len(rom):
if (address + size) > len(rom):
raise ValueError("Unable to read 0x{:08x} from rom.".format(address))
return rom[address:address+size]
def load_pointer(address):
return struct.unpack("<I", load(address))[0]
heap_start, heap_size = symbols["heap"]
heap = load(heap_start, heap_size)
total_byte_len = len(heap)
# These change every run so we load them from the symbol table
mp_state_ctx = symbols["mp_state_ctx"][0]
manual_symbol_map["mp_state_ctx+20"] = "mp_state_ctx.vm.last_pool"
last_pool = load_pointer(mp_state_ctx + 20) # (gdb) p &mp_state_ctx.vm.last_pool
manual_symbol_map["mp_state_ctx+88"] = "mp_state_ctx.vm.dict_main.map.table"
dict_main_table = load_pointer(mp_state_ctx + 88) # (gdb) p &mp_state_ctx.vm.dict_main.map.table
manual_symbol_map["mp_state_ctx+68"] = "mp_state_ctx.vm.mp_loaded_modules_dict.map.table"
imports_table = load_pointer(mp_state_ctx + 68) # (gdb) p &mp_state_ctx.vm.mp_loaded_modules_dict.map.table
manual_symbol_map["mp_state_ctx+104"] = "mp_state_ctx.vm.mp_sys_path_obj.items"
manual_symbol_map["mp_state_ctx+120"] = "mp_state_ctx.vm.mp_sys_argv_obj.items"
for i in range(READLINE_HIST_SIZE):
manual_symbol_map["mp_state_ctx+{}".format(128 + i * 4)] = "mp_state_ctx.vm.readline_hist[{}]".format(i)
tuple_type = symbols["mp_type_tuple"][0]
type_type = symbols["mp_type_type"][0]
map_type = symbols["mp_type_map"][0]
dict_type = symbols["mp_type_dict"][0]
property_type = symbols["mp_type_property"][0]
str_type = symbols["mp_type_str"][0]
function_types = [symbols["mp_type_fun_" + x][0] for x in ["bc", "builtin_0", "builtin_1", "builtin_2", "builtin_3", "builtin_var"]]
bytearray_type = symbols["mp_type_bytearray"][0]
dynamic_type = 0x40000000 # placeholder, doesn't match any memory
type_colors = {
dict_type: "red",
property_type: "yellow",
map_type: "blue",
type_type: "orange",
tuple_type: "skyblue",
str_type: "pink",
bytearray_type: "purple"
}
pool_shift = heap_start % BYTES_PER_BLOCK
atb_length = total_byte_len * BITS_PER_BYTE // (BITS_PER_BYTE + BITS_PER_BYTE * BLOCKS_PER_ATB // BLOCKS_PER_FTB + BITS_PER_BYTE * BLOCKS_PER_ATB * BYTES_PER_BLOCK)
pool_length = atb_length * BLOCKS_PER_ATB * BYTES_PER_BLOCK
gc_finaliser_table_byte_len = (atb_length * BLOCKS_PER_ATB + BLOCKS_PER_FTB - 1) // BLOCKS_PER_FTB
if print_heap_structure:
print("mp_state_ctx at 0x{:08x} and length {}".format(*symbols["mp_state_ctx"]))
print("Total heap length:", total_byte_len)
print("ATB length:", atb_length)
print("Total allocatable:", pool_length)
print("FTB length:", gc_finaliser_table_byte_len)
pool_start = heap_start + total_byte_len - pool_length - pool_shift
pool = heap[-pool_length-pool_shift:]
total_height = 65 * 18
total_width = (pool_length // (64 * 16)) * 90
map_element_blocks = [dict_main_table, imports_table]
string_blocks = []
bytecode_blocks = []
qstr_pools = []
qstr_chunks = []
block_data = {}
# Find all the qtr pool addresses.
prev_pool = last_pool
while prev_pool > ram_start:
qstr_pools.append(prev_pool)
prev_pool = load_pointer(prev_pool)
def save_allocated_block(end, current_allocation):
allocation_length = current_allocation * BYTES_PER_BLOCK
end = (i * BLOCKS_PER_ATB + j) * BYTES_PER_BLOCK
start = end - allocation_length
address = pool_start + start
data = pool[start:end]
print("0x{:x} {} bytes allocated".format(address, allocation_length))
if print_block_state:
print("0x{:x} {} bytes allocated".format(address, allocation_length))
if print_block_contents:
print(data)
rows = ""
for k in range(current_allocation - 1):
@ -214,6 +253,7 @@ for i in range(atb_length):
ownership_graph.add_node(address, label=table, style="invisible", shape="plaintext")
potential_type = None
node = ownership_graph.get_node(address)
node.attr["height"] = 0.25 * current_allocation
block_data[address] = data
for k in range(len(data) // 4):
word = struct.unpack_from("<I", data, offset=(k * 4))[0]
@ -226,9 +266,8 @@ for i in range(atb_length):
bgcolor = "green"
elif potential_type in type_colors:
bgcolor = type_colors[potential_type]
else:
pass
#print("unknown type", hex(potential_type))
elif print_unknown_types:
print("unknown type", hex(potential_type))
node.attr["label"] = "<" + node.attr["label"].replace("\"gray\"", "\"" + bgcolor + "\"") + ">"
if potential_type == str_type and k == 3:
@ -262,180 +301,294 @@ for i in range(atb_length):
if k == 2 and 0x20000000 < word < 0x20040000:
bytecode_blocks.append(word)
current_allocation = 0
if block_state == AT_FREE:
current_free += 1
total_free += 1
elif block_state == AT_HEAD:
current_allocation = 1
elif block_state == AT_TAIL:
current_allocation += 1
longest_free = max(longest_free, current_free)
if current_free > 0:
print("{} bytes free".format(current_free * BYTES_PER_BLOCK))
def is_qstr(obj):
return obj & 0xff800007 == 0x00000006
def find_qstr(qstr_index):
pool_ptr = last_pool
if not is_qstr(qstr_index):
return "object"
qstr_index >>= 3
while pool_ptr != 0:
#print(hex(pool_ptr))
if pool_ptr in block_data:
pool = block_data[pool_ptr]
prev, total_prev_len, alloc, length = struct.unpack_from("<IIII", pool)
else:
rom_offset = pool_ptr - rom_start
prev, total_prev_len, alloc, length = struct.unpack_from("<IIII", rom[rom_offset:rom_offset+32])
pool = rom[rom_offset:rom_offset+length*4]
#print("rom pool")
#print(hex(prev), total_prev_len, alloc, length)
#print(qstr_index, total_prev_len)
if qstr_index >= total_prev_len:
offset = (qstr_index - total_prev_len) * 4 + 16
start = struct.unpack_from("<I", pool, offset=offset)[0]
#print(hex(start))
if start < heap_start:
start -= rom_start
if start > len(rom):
return "more than rom: {:x}".format(start + rom_start)
qstr_hash, qstr_len = struct.unpack("<BB", rom[start:start+2])
return rom[start+2:start+2+qstr_len].decode("utf-8")
longest_free = 0
current_free = 0
current_allocation = 0
total_free = 0
for i in range(atb_length):
# Each atb byte is four blocks worth of info
atb = heap[i]
for j in range(4):
block_state = (atb >> (j * 2)) & 0x3
if block_state != AT_FREE and current_free > 0:
if print_block_state:
print("{} bytes free".format(current_free * BYTES_PER_BLOCK))
current_free = 0
if block_state != AT_TAIL and current_allocation > 0:
save_allocated_block((i * BLOCKS_PER_ATB + j) * BYTES_PER_BLOCK, current_allocation)
current_allocation = 0
if block_state == AT_FREE:
current_free += 1
total_free += 1
elif block_state == AT_HEAD or block_state == AT_MARK:
current_allocation = 1
elif block_state == AT_TAIL and current_allocation > 0:
# In gc_free the logging happens before the tail is freed. So checking
# current_allocation > 0 ensures we only extend an allocation thats started.
current_allocation += 1
longest_free = max(longest_free, current_free)
#if current_free > 0:
# print("{} bytes free".format(current_free * BYTES_PER_BLOCK))
if current_allocation > 0:
save_allocated_block(pool_length, current_allocation)
def is_qstr(obj):
return obj & 0xff800007 == 0x00000006
def find_qstr(qstr_index):
pool_ptr = last_pool
if not is_qstr(qstr_index):
return "object"
qstr_index >>= 3
while pool_ptr != 0:
if pool_ptr > ram_start:
if pool_ptr in block_data:
pool = block_data[pool_ptr]
prev, total_prev_len, alloc, length = struct.unpack_from("<IIII", pool)
else:
print("missing qstr pool: {:08x}".format(pool_ptr))
return "missing"
else:
rom_offset = pool_ptr - rom_start
prev, total_prev_len, alloc, length = struct.unpack_from("<IIII", rom[rom_offset:rom_offset+32])
pool = rom[rom_offset:rom_offset+length*4]
if qstr_index >= total_prev_len:
offset = (qstr_index - total_prev_len) * 4 + 16
start = struct.unpack_from("<I", pool, offset=offset)[0]
if start < heap_start:
start -= rom_start
if start > len(rom):
return "more than rom: {:x}".format(start + rom_start)
qstr_hash, qstr_len = struct.unpack("<BB", rom[start:start+2])
return rom[start+2:start+2+qstr_len].decode("utf-8")
else:
if start > heap_start + len(heap):
return "out of range: {:x}".format(start)
local = start - heap_start
qstr_hash, qstr_len = struct.unpack("<BB", heap[local:local+2])
return heap[local+2:local+2+qstr_len].decode("utf-8")
pool_ptr = prev
return "unknown"
def format(obj):
if obj & 1 != 0:
return obj >> 1
if is_qstr(obj):
return find_qstr(obj)
else:
return "0x{:08x}".format(obj)
for block in sorted(map_element_blocks):
if block == 0:
continue
try:
node = ownership_graph.get_node(block)
except KeyError:
print("Unable to find memory block for 0x{:08x}. Is there something running?".format(block))
continue
if block not in block_data:
continue
data = block_data[block]
cells = []
for i in range(len(data) // 8):
key, value = struct.unpack_from("<II", data, offset=(i * 8))
if key == MP_OBJ_NULL or key == MP_OBJ_SENTINEL:
cells.append(("", " "))
else:
cells.append((key, format(key)))
if value in block_data:
edge = ownership_graph.get_edge(block, value)
edge.attr["tailport"] = str(key)
rows = ""
for i in range(len(cells) // 2):
rows += "<tr><td port=\"{}\">{}</td><td port=\"{}\">{}</td></tr>".format(
cells[2*i][0],
cells[2*i][1],
cells[2*i+1][0],
cells[2*i+1][1])
node.attr["shape"] = "plaintext"
node.attr["style"] = "invisible"
node.attr["label"] = "<<table bgcolor=\"gold\" border=\"1\" cellpadding=\"0\" cellspacing=\"0\"><tr><td colspan=\"2\">0x{:08x}</td></tr>{}</table>>".format(block, rows)
for node, degree in ownership_graph.in_degree_iter():
if degree == 0:
address_bytes = struct.pack("<I", int(node))
location = -1
for _ in range(ram.count(address_bytes)):
location = ram.find(address_bytes, location + 1)
pointer_location = ram_start + location
source = "0x{:08x}".format(pointer_location)
if pointer_location in symbol_lookup:
source = symbol_lookup[pointer_location]
if source in manual_symbol_map:
source = manual_symbol_map[source]
if "readline_hist" in source:
string_blocks.append(int(node))
ownership_graph.add_edge(source, node)
for block in string_blocks:
if block == 0:
continue
node = ownership_graph.get_node(block)
node.attr["fillcolor"] = "hotpink"
if block in block_data:
raw_string = block_data[block]
else:
if start > heap_start + len(heap):
return "out of range: {:x}".format(start)
local = start - heap_start
qstr_hash, qstr_len = struct.unpack("<BB", heap[local:local+2])
return heap[local+2:local+2+qstr_len].decode("utf-8")
pool_ptr = prev
return "unknown"
def format(obj):
if obj & 1 != 0:
return obj >> 1
if is_qstr(obj):
return find_qstr(obj)
else:
return "0x{:08x}".format(obj)
for block in sorted(map_element_blocks):
try:
node = ownership_graph.get_node(block)
except KeyError:
print("Unable to find memory block for 0x{:08x}. Is there something running?".format(block))
continue
#node.attr["fillcolor"] = "gold"
data = block_data[block]
#print("0x{:08x}".format(block))
cells = []
for i in range(len(data) // 8):
key, value = struct.unpack_from("<II", data, offset=(i * 8))
if key == MP_OBJ_NULL or key == MP_OBJ_SENTINEL:
#print(" <empty slot>")
cells.append(("", " "))
else:
#print(" {}, {}".format(format(key), format(value)))
cells.append((key, format(key)))
if value in block_data:
edge = ownership_graph.get_edge(block, value)
edge.attr["tailport"] = str(key)
rows = ""
for i in range(len(cells) // 2):
rows += "<tr><td port=\"{}\">{}</td><td port=\"{}\">{}</td></tr>".format(
cells[2*i][0],
cells[2*i][1],
cells[2*i+1][0],
cells[2*i+1][1])
node.attr["shape"] = "plaintext"
node.attr["style"] = "invisible"
node.attr["label"] = "<<table bgcolor=\"gold\" border=\"1\" cellpadding=\"0\" cellspacing=\"0\"><tr><td colspan=\"2\">0x{:08x}</td></tr>{}</table>>".format(block, rows)
for node, degree in ownership_graph.in_degree_iter():
if degree == 0:
address_bytes = struct.pack("<I", int(node))
location = -1
for _ in range(ram.count(address_bytes)):
location = ram.find(address_bytes, location + 1)
pointer_location = ram_start + location
source = "0x{:08x}".format(pointer_location)
if pointer_location in symbol_lookup:
source = symbol_lookup[pointer_location]
if source in manual_symbol_map:
source = manual_symbol_map[source]
if "readline_hist" in source:
string_blocks.append(int(node))
ownership_graph.add_edge(source, node)
for block in string_blocks:
node = ownership_graph.get_node(block)
node.attr["fillcolor"] = "hotpink"
string = block_data[block].decode('utf-8')
wrapped = []
for i in range(0, len(string), 16):
wrapped.append(string[i:i+16])
node.attr["label"] = "\n".join(wrapped)
node.attr["style"] = "filled"
node.attr["fontname"] = "FiraCode-Medium"
node.attr["fontpath"] = "/Users/tannewt/Library/Fonts/"
node.attr["fontsize"] = 8
for block in bytecode_blocks:
node = ownership_graph.get_node(block)
node.attr["fillcolor"] = "lightseagreen"
data = block_data[block]
prelude = Prelude(io.BufferedReader(io.BytesIO(data)))
node.attr["shape"] = "plaintext"
node.attr["style"] = "invisible"
code_info_size = prelude.code_info_size
rows = ""
remaining_bytecode = len(data) - 16
while code_info_size >= 16:
rows += "<tr><td colspan=\"16\" bgcolor=\"palegreen\" height=\"18\" width=\"80\"></td></tr>"
code_info_size -= 16
remaining_bytecode -= 16
if code_info_size > 0:
rows += ("<tr><td colspan=\"{}\" bgcolor=\"palegreen\" height=\"18\" width=\"{}\"></td>"
"<td colspan=\"{}\" bgcolor=\"seagreen\" height=\"18\" width=\"{}\"></td></tr>"
).format(code_info_size, code_info_size * (80 / 16), (16 - code_info_size), (80 / 16) * (16 - code_info_size))
remaining_bytecode -= 16
for i in range(remaining_bytecode // 16):
rows += "<tr><td colspan=\"16\" bgcolor=\"seagreen\" height=\"18\" width=\"80\"></td></tr>"
node.attr["label"] = "<<table border=\"1\" cellspacing=\"0\"><tr><td colspan=\"16\" bgcolor=\"lightseagreen\" height=\"18\" width=\"80\">0x{:08x}</td></tr>{}</table>>".format(block, rows)
for block in qstr_chunks:
if block not in block_data:
ownership_graph.delete_node(block)
continue
data = block_data[block]
string = ""
offset = 0
while offset < len(data) - 1:
qstr_hash, qstr_len = struct.unpack_from("<BB", data, offset=offset)
if qstr_hash == 0:
string += " " * (len(data) - offset)
offset = len(data)
continue
offset += 2 + qstr_len + 1
string += " " + data[offset - qstr_len - 1: offset - 1].decode("utf-8")
#print(string)
wrapped = []
for i in range(0, len(string), 16):
wrapped.append(html.escape(string[i:i+16]))
node = ownership_graph.get_node(block)
node.attr["label"] = "<<table border=\"1\" cellspacing=\"0\" bgcolor=\"lightsalmon\" width=\"80\"><tr><td height=\"18\" >0x{:08x}</td></tr><tr><td height=\"{}\" >{}</td></tr></table>>".format(block, 18 * (len(wrapped) - 1), "<br/>".join(wrapped))
node.attr["fontname"] = "FiraCode-Medium"
node.attr["fontpath"] = "/Users/tannewt/Library/Fonts/"
node.attr["fontsize"] = 8
print("Total free space:", BYTES_PER_BLOCK * total_free)
print("Longest free space:", BYTES_PER_BLOCK * longest_free)
with open("heap.dot", "w") as f:
f.write(ownership_graph.string())
ownership_graph.layout(prog="dot")
ownership_graph.draw("heap.png")
print("Unable to find memory block for string at 0x{:08x}.".format(block))
continue
try:
raw_string = block_data[block].decode('utf-8')
except:
raw_string = str(block_data[block])
wrapped = []
for i in range(0, len(raw_string), 16):
wrapped.append(raw_string[i:i+16])
node.attr["label"] = "\n".join(wrapped)
node.attr["style"] = "filled"
node.attr["fontname"] = "FiraCode-Medium"
node.attr["fontpath"] = "/Users/tannewt/Library/Fonts/"
node.attr["fontsize"] = 8
node.attr["height"] = len(wrapped) * 0.25
for block in bytecode_blocks:
node = ownership_graph.get_node(block)
node.attr["fillcolor"] = "lightseagreen"
if block in block_data:
data = block_data[block]
else:
print("Unable to find memory block for bytecode at 0x{:08x}.".format(block))
continue
prelude = Prelude(io.BufferedReader(io.BytesIO(data)))
node.attr["shape"] = "plaintext"
node.attr["style"] = "invisible"
code_info_size = prelude.code_info_size
rows = ""
remaining_bytecode = len(data) - 16
while code_info_size >= 16:
rows += "<tr><td colspan=\"16\" bgcolor=\"palegreen\" height=\"18\" width=\"80\"></td></tr>"
code_info_size -= 16
remaining_bytecode -= 16
if code_info_size > 0:
rows += ("<tr><td colspan=\"{}\" bgcolor=\"palegreen\" height=\"18\" width=\"{}\"></td>"
"<td colspan=\"{}\" bgcolor=\"seagreen\" height=\"18\" width=\"{}\"></td></tr>"
).format(code_info_size, code_info_size * (80 / 16), (16 - code_info_size), (80 / 16) * (16 - code_info_size))
remaining_bytecode -= 16
for i in range(remaining_bytecode // 16):
rows += "<tr><td colspan=\"16\" bgcolor=\"seagreen\" height=\"18\" width=\"80\"></td></tr>"
node.attr["label"] = "<<table border=\"1\" cellspacing=\"0\"><tr><td colspan=\"16\" bgcolor=\"lightseagreen\" height=\"18\" width=\"80\">0x{:08x}</td></tr>{}</table>>".format(block, rows)
for block in qstr_chunks:
if block not in block_data:
ownership_graph.delete_node(block)
continue
data = block_data[block]
qstrs_in_chunk = ""
offset = 0
while offset < len(data) - 1:
qstr_hash, qstr_len = struct.unpack_from("<BB", data, offset=offset)
if qstr_hash == 0:
qstrs_in_chunk += " " * (len(data) - offset)
offset = len(data)
continue
offset += 2 + qstr_len + 1
qstrs_in_chunk += " " + data[offset - qstr_len - 1: offset - 1].decode("utf-8")
printable_qstrs = ""
for i in range(len(qstrs_in_chunk)):
c = qstrs_in_chunk[i]
if c not in string.printable or c in "\v\f":
printable_qstrs += ""
else:
printable_qstrs += qstrs_in_chunk[i]
wrapped = []
for i in range(0, len(printable_qstrs), 16):
wrapped.append(html.escape(printable_qstrs[i:i+16]))
node = ownership_graph.get_node(block)
node.attr["label"] = "<<table border=\"1\" cellspacing=\"0\" bgcolor=\"lightsalmon\" width=\"80\"><tr><td height=\"18\" >0x{:08x}</td></tr><tr><td height=\"{}\" >{}</td></tr></table>>".format(block, 18 * (len(wrapped) - 1), "<br/>".join(wrapped))
node.attr["fontname"] = "FiraCode-Medium"
node.attr["fontpath"] = "/Users/tannewt/Library/Fonts/"
node.attr["fontsize"] = 8
print("Total free space:", BYTES_PER_BLOCK * total_free)
print("Longest free space:", BYTES_PER_BLOCK * longest_free)
# First render the graph of objects on the heap.
if draw_heap_ownership:
ownership_graph.layout(prog="dot")
fn = os.path.join(output_directory, "heap_ownership{:04d}.png".format(snapshot_num))
print(fn)
ownership_graph.draw(fn)
# Second, render the heap layout in memory order.
for node in ownership_graph:
try:
address = int(node.name)
except ValueError:
ownership_graph.remove_node(node)
continue
block = (address - pool_start) // 16
x = block // 64
y = 64 - block % 64
try:
height = float(node.attr["height"])
except:
height = 0.25
#print(hex(address), "height", height, y)
#if address in block_data:
# print(hex(address), block, len(block_data[address]), x, y, height)
node.attr["pos"] = "{},{}".format(x * 80, (y - (height - 0.25) * 2) * 18) # in inches
# Clear edge positioning from ownership graph layout.
if draw_heap_ownership:
for edge in ownership_graph.iteredges():
del edge.attr["pos"]
# Reformat block nodes so they are the correct size and do not have keys in them.
for block in sorted(map_element_blocks):
try:
node = ownership_graph.get_node(block)
except KeyError:
if block != 0:
print("Unable to find memory block for 0x{:08x}. Is there something running?".format(block))
continue
#node.attr["fillcolor"] = "gold"
if block not in block_data:
continue
data = block_data[block]
#print("0x{:08x}".format(block))
cells = []
for i in range(len(data) // 8):
key, value = struct.unpack_from("<II", data, offset=(i * 8))
if key == MP_OBJ_NULL or key == MP_OBJ_SENTINEL:
#print(" <empty slot>")
cells.append(("", " "))
else:
#print(" {}, {}".format(format(key), format(value)))
cells.append((key, ""))
if value in block_data:
edge = ownership_graph.get_edge(block, value)
edge.attr["tailport"] = str(key)
rows = ""
for i in range(len(cells) // 2):
rows += "<tr><td port=\"{}\" height=\"18\" width=\"40\">{}</td><td port=\"{}\" height=\"18\" width=\"40\">{}</td></tr>".format(
cells[2*i][0],
cells[2*i][1],
cells[2*i+1][0],
cells[2*i+1][1])
node.attr["label"] = "<<table bgcolor=\"gold\" border=\"1\" cellpadding=\"0\" cellspacing=\"0\">{}</table>>".format(rows)
ownership_graph.add_node("center", pos="{},{}".format(total_width // 2 - 40, total_height // 2), shape="plaintext", label=" ")
ownership_graph.graph_attr["viewport"] = "{},{},1,{}".format(total_width, total_height, "center")
ownership_graph.has_layout = True
if draw_heap_layout:
fn = os.path.join(output_directory, "heap_layout{:04d}.png".format(snapshot_num))
print(fn)
ownership_graph.draw(fn)
if __name__ == "__main__":
do_all_the_things()

@ -13,46 +13,79 @@ bytecode_format_sizes = {
}
bytecodes = {
0x00: {"name": "MP_BC_LOAD_FAST_MULTI",
"format": "MP_OPCODE_BYTE"},
0x10: {"name": "MP_BC_LOAD_CONST_FALSE",
"format": "MP_OPCODE_BYTE"},
#define MP_BC_LOAD_CONST_NONE (0x11)
#define MP_BC_LOAD_CONST_TRUE (0x12)
#define MP_BC_LOAD_CONST_SMALL_INT (0x14) // signed var-int
#define MP_BC_LOAD_CONST_STRING (0x16) // qstr
0x11: {"name": "MP_BC_LOAD_CONST_NONE",
"format": "MP_OPCODE_BYTE"},
0x12: {"name": "MP_BC_LOAD_CONST_TRUE",
"format": "MP_OPCODE_BYTE"},
0x14: {"name": "MP_BC_LOAD_CONST_SMALL_INT",
"format": "MP_OPCODE_VAR_UINT"},
0x16: {"name": "MP_BC_LOAD_CONST_STRING",
"format": "MP_OPCODE_QSTR"},
0x17: {"name": "MP_BC_LOAD_CONST_OBJ",
"format": "MP_OPCODE_VAR_UINT"},
#define MP_BC_LOAD_CONST_OBJ (0x17) // ptr
#define MP_BC_LOAD_NULL (0x18)
0x18: {"name": "MP_BC_LOAD_NULL",
"format": "MP_OPCODE_BYTE"},
#define MP_BC_LOAD_FAST_N (0x19) // uint
#define MP_BC_LOAD_DEREF (0x1a) // uint
#define MP_BC_LOAD_NAME (0x1b) // qstr
#define MP_BC_LOAD_GLOBAL (0x1c) // qstr
#define MP_BC_LOAD_ATTR (0x1d) // qstr
#define MP_BC_LOAD_METHOD (0x1e) // qstr
#define MP_BC_LOAD_SUPER_METHOD (0x1f) // qstr
0x1a: {"name": "MP_BC_LOAD_DEREF",
"format": "MP_OPCODE_VAR_UINT"},
0x1b: {"name": "MP_BC_LOAD_NAME",
"format": "MP_OPCODE_QSTR"},
0x1c: {"name": "MP_BC_LOAD_GLOBAL",
"format": "MP_OPCODE_QSTR"},
0x1d: {"name": "MP_BC_LOAD_ATTR",
"format": "MP_OPCODE_QSTR"},
0x1e: {"name": "MP_BC_LOAD_METHOD",
"format": "MP_OPCODE_QSTR"},
0x1f: {"name": "MP_BC_LOAD_SUPER_METHOD",
"format": "MP_OPCODE_QSTR"},
0x20: {"name": "MP_BC_LOAD_BUILD_CLASS",
"format": "MP_OPCODE_BYTE"},
#define MP_BC_LOAD_BUILD_CLASS (0x20)
#define MP_BC_LOAD_SUBSCR (0x21)
0x21: {"name": "MP_BC_LOAD_SUBSCR",
"format": "MP_OPCODE_BYTE"},
#define MP_BC_STORE_FAST_N (0x22) // uint
#define MP_BC_STORE_DEREF (0x23) // uint
#define MP_BC_STORE_NAME (0x24) // qstr
#define MP_BC_STORE_GLOBAL (0x25) // qstr
#define MP_BC_STORE_ATTR (0x26) // qstr
#define MP_BC_STORE_SUBSCR (0x27)
0x24: {"name": "MP_BC_STORE_NAME",
"format": "MP_OPCODE_QSTR"},
0x25: {"name": "MP_BC_STORE_GLOBAL",
"format": "MP_OPCODE_QSTR"},
0x26: {"name": "MP_BC_STORE_ATTR",
"format": "MP_OPCODE_QSTR"},
0x27: {"name": "MP_BC_LOAD_SUBSCR",
"format": "MP_OPCODE_BYTE"},
0x28: {"name": "MP_BC_DELETE_FAST",
"format": "MP_OPCODE_VAR_UINT"},
#define MP_BC_DELETE_FAST (0x28) // uint
#define MP_BC_DELETE_DEREF (0x29) // uint
#define MP_BC_DELETE_NAME (0x2a) // qstr
#define MP_BC_DELETE_GLOBAL (0x2b) // qstr
#define MP_BC_DUP_TOP (0x30)
0x30: {"name": "MP_BC_DUP_TOP",
"format": "MP_OPCODE_BYTE"},
#define MP_BC_DUP_TOP_TWO (0x31)
#define MP_BC_POP_TOP (0x32)
#define MP_BC_ROT_TWO (0x33)
#define MP_BC_ROT_THREE (0x34)
0x32: {"name": "MP_BC_POP_TOP",
"format": "MP_OPCODE_BYTE"},
0x33: {"name": "MP_BC_ROT_TWO",
"format": "MP_OPCODE_BYTE"},
0x34: {"name": "MP_BC_ROT_THREE",
"format": "MP_OPCODE_BYTE"},
#define MP_BC_JUMP (0x35) // rel byte code offset, 16-bit signed, in excess
#define MP_BC_POP_JUMP_IF_TRUE (0x36) // rel byte code offset, 16-bit signed, in excess
#define MP_BC_POP_JUMP_IF_FALSE (0x37) // rel byte code offset, 16-bit signed, in excess
0x35: {"name": "MP_BC_JUMP",
"format": "MP_OPCODE_OFFSET"},
0x36: {"name": "MP_BC_POP_JUMP_IF_TRUE",
"format": "MP_OPCODE_OFFSET"},
0x37: {"name": "MP_BC_POP_JUMP_IF_FALSE",
"format": "MP_OPCODE_OFFSET"},
#define MP_BC_JUMP_IF_TRUE_OR_POP (0x38) // rel byte code offset, 16-bit signed, in excess
#define MP_BC_JUMP_IF_FALSE_OR_POP (0x39) // rel byte code offset, 16-bit signed, in excess
#define MP_BC_SETUP_WITH (0x3d) // rel byte code offset, 16-bit unsigned
@ -62,44 +95,172 @@ bytecodes = {
#define MP_BC_END_FINALLY (0x41)
#define MP_BC_GET_ITER (0x42)
#define MP_BC_FOR_ITER (0x43) // rel byte code offset, 16-bit unsigned
#define MP_BC_POP_BLOCK (0x44)
0x43: {"name": "MP_BC_FOR_ITER",
"format": "MP_OPCODE_OFFSET"},
0x44: {"name": "MP_BC_POP_BLOCK",
"format": "MP_OPCODE_BYTE"},
#define MP_BC_POP_EXCEPT (0x45)
#define MP_BC_UNWIND_JUMP (0x46) // rel byte code offset, 16-bit signed, in excess; then a byte
#define MP_BC_GET_ITER_STACK (0x47)
0x47: {"name": "MP_BC_GET_ITER_STACK",
"format": "MP_OPCODE_BYTE"},
#define MP_BC_BUILD_TUPLE (0x50) // uint
#define MP_BC_BUILD_LIST (0x51) // uint
#define MP_BC_BUILD_MAP (0x53) // uint
#define MP_BC_STORE_MAP (0x54)
0x50: {"name": "MP_BC_BUILD_TUPLE",
"format": "MP_OPCODE_VAR_UINT"},
0x51: {"name": "MP_BC_BUILD_LIST",
"format": "MP_OPCODE_VAR_UINT"},
0x53: {"name": "MP_BC_BUILD_MAP",
"format": "MP_OPCODE_VAR_UINT"},
0x54: {"name": "MP_BC_STORE_MAP",
"format": "MP_OPCODE_BYTE"},
#define MP_BC_BUILD_SET (0x56) // uint
#define MP_BC_BUILD_SLICE (0x58) // uint
#define MP_BC_STORE_COMP (0x57) // uint
0x57: {"name": "MP_BC_STORE_COMP",
"format": "MP_OPCODE_VAR_UINT"},
#define MP_BC_UNPACK_SEQUENCE (0x59) // uint
#define MP_BC_UNPACK_EX (0x5a) // uint
#define MP_BC_RETURN_VALUE (0x5b)
#define MP_BC_RAISE_VARARGS (0x5c) // byte
0x5b: {"name": "MP_BC_RETURN_VALUE",
"format": "MP_OPCODE_BYTE"},
0x5c: {"name": "MP_BC_RAISE_VARARGS",
"format": "MP_OPCODE_BYTE_EXTRA"},
#define MP_BC_YIELD_VALUE (0x5d)
#define MP_BC_YIELD_FROM (0x5e)
#define MP_BC_MAKE_FUNCTION (0x60) // uint
#define MP_BC_MAKE_FUNCTION_DEFARGS (0x61) // uint
#define MP_BC_MAKE_CLOSURE (0x62) // uint
#define MP_BC_MAKE_CLOSURE_DEFARGS (0x63) // uint
#define MP_BC_CALL_FUNCTION (0x64) // uint
#define MP_BC_CALL_FUNCTION_VAR_KW (0x65) // uint
#define MP_BC_CALL_METHOD (0x66) // uint
#define MP_BC_CALL_METHOD_VAR_KW (0x67) // uint
#define MP_BC_IMPORT_NAME (0x68) // qstr
0x60: {"name": "MP_BC_MAKE_FUNCTION",
"format": "MP_OPCODE_VAR_UINT"},
0x61: {"name": "MP_BC_MAKE_FUNCTION_DEFARGS",
"format": "MP_OPCODE_VAR_UINT"},
0x62: {"name": "MP_BC_MAKE_CLOSURE",
"format": "MP_OPCODE_VAR_UINT_EXTRA"},
0x63: {"name": "MP_BC_MAKE_CLOSURE",
"format": "MP_OPCODE_VAR_UINT_EXTRA"},
0x64: {"name": "MP_BC_CALL_FUNCTION",
"format": "MP_OPCODE_VAR_UINT"},
0x65: {"name": "MP_BC_CALL_FUNCTION_VAR_KW",
"format": "MP_OPCODE_VAR_UINT"},
0x66: {"name": "MP_BC_CALL_METHOD",
"format": "MP_OPCODE_VAR_UINT"},
0x67: {"name": "MP_BC_CALL_METHOD_VAR_KW",
"format": "MP_OPCODE_VAR_UINT"},
0x68: {"name": "MP_BC_IMPORT_NAME",
"format": "MP_OPCODE_QSTR"},
0x69: {"name": "MP_BC_IMPORT_FROM",
"format": "MP_OPCODE_QSTR"},
#define MP_BC_IMPORT_FROM (0x69) // qstr
#define MP_BC_IMPORT_STAR (0x6a)
#define MP_BC_LOAD_CONST_SMALL_INT_MULTI (0x70) // + N(64)
0x7f: {"name": "MP_BC_LOAD_CONST_SMALL_INT_MULTI -1",
"format": "MP_OPCODE_BYTE"},
0x80: {"name": "MP_BC_LOAD_CONST_SMALL_INT_MULTI 0",
"format": "MP_OPCODE_BYTE"},
0x81: {"name": "MP_BC_LOAD_CONST_SMALL_INT_MULTI 1",
"format": "MP_OPCODE_BYTE"},
0x82: {"name": "MP_BC_LOAD_CONST_SMALL_INT_MULTI 2",
"format": "MP_OPCODE_BYTE"},
0x83: {"name": "MP_BC_LOAD_CONST_SMALL_INT_MULTI 3",
"format": "MP_OPCODE_BYTE"},
0x84: {"name": "MP_BC_LOAD_CONST_SMALL_INT_MULTI 4",
"format": "MP_OPCODE_BYTE"},
#define MP_BC_LOAD_FAST_MULTI (0xb0) // + N(16)
0xb0: {"name": "MP_BC_LOAD_FAST_MULTI 0",
"format": "MP_OPCODE_BYTE"},
0xb1: {"name": "MP_BC_LOAD_FAST_MULTI 1",
"format": "MP_OPCODE_BYTE"},
0xb2: {"name": "MP_BC_LOAD_FAST_MULTI 2",
"format": "MP_OPCODE_BYTE"},
0xb3: {"name": "MP_BC_LOAD_FAST_MULTI 3",
"format": "MP_OPCODE_BYTE"},
0xb4: {"name": "MP_BC_LOAD_FAST_MULTI 4",
"format": "MP_OPCODE_BYTE"},
0xb5: {"name": "MP_BC_LOAD_FAST_MULTI 5",
"format": "MP_OPCODE_BYTE"},
0xb6: {"name": "MP_BC_LOAD_FAST_MULTI 6",
"format": "MP_OPCODE_BYTE"},
0xb7: {"name": "MP_BC_LOAD_FAST_MULTI 7",
"format": "MP_OPCODE_BYTE"},
0xb8: {"name": "MP_BC_LOAD_FAST_MULTI 8",
"format": "MP_OPCODE_BYTE"},
#define MP_BC_STORE_FAST_MULTI (0xc0) // + N(16)
0xc0: {"name": "MP_BC_STORE_FAST_MULTI 0",
"format": "MP_OPCODE_BYTE"},
0xc1: {"name": "MP_BC_STORE_FAST_MULTI 1",
"format": "MP_OPCODE_BYTE"},
0xc2: {"name": "MP_BC_STORE_FAST_MULTI 2",
"format": "MP_OPCODE_BYTE"},
0xc3: {"name": "MP_BC_STORE_FAST_MULTI 3",
"format": "MP_OPCODE_BYTE"},
0xc4: {"name": "MP_BC_STORE_FAST_MULTI 4",
"format": "MP_OPCODE_BYTE"},
0xc5: {"name": "MP_BC_STORE_FAST_MULTI 5",
"format": "MP_OPCODE_BYTE"},
0xc6: {"name": "MP_BC_STORE_FAST_MULTI 6",
"format": "MP_OPCODE_BYTE"},
0xc7: {"name": "MP_BC_STORE_FAST_MULTI 7",
"format": "MP_OPCODE_BYTE"},
#define MP_BC_UNARY_OP_MULTI (0xd0) // + op(<MP_UNARY_OP_NUM_BYTECODE)
# // 9 relational operations, should return a bool
0xd7: {"name": "MP_BC_BINARY_OP_MULTI MP_BINARY_OP_LESS",
"format": "MP_OPCODE_BYTE"},
0xd8: {"name": "MP_BC_BINARY_OP_MULTI MP_BINARY_OP_MORE",
"format": "MP_OPCODE_BYTE"},
0xd9: {"name": "MP_BC_BINARY_OP_MULTI MP_BINARY_OP_EQUAL",
"format": "MP_OPCODE_BYTE"},
0xda: {"name": "MP_BC_BINARY_OP_MULTI MP_BINARY_OP_LESS_EQUAL",
"format": "MP_OPCODE_BYTE"},
0xdb: {"name": "MP_BC_BINARY_OP_MULTI MP_BINARY_OP_MORE_EQUAL",
"format": "MP_OPCODE_BYTE"},
0xdc: {"name": "MP_BC_BINARY_OP_MULTI MP_BINARY_OP_NOT_EQUAL",
"format": "MP_OPCODE_BYTE"},
# dc: MP_BINARY_OP_NOT_EQUAL,
# dd: MP_BINARY_OP_IN,
# de: MP_BINARY_OP_IS,
# df: MP_BINARY_OP_EXCEPTION_MATCH,
#
# // 12 inplace arithmetic operations
# e0: MP_BINARY_OP_INPLACE_OR,
# e1: MP_BINARY_OP_INPLACE_XOR,
# e2: MP_BINARY_OP_INPLACE_AND,
# e3: MP_BINARY_OP_INPLACE_LSHIFT,
# e4: MP_BINARY_OP_INPLACE_RSHIFT,
# e5: MP_BINARY_OP_INPLACE_ADD,
0xe5: {"name": "MP_BC_BINARY_OP_MULTI MP_BINARY_OP_INPLACE_ADD",
"format": "MP_OPCODE_BYTE"},
0xe6: {"name": "MP_BC_BINARY_OP_MULTI MP_BINARY_OP_INPLACE_SUBTRACT",
"format": "MP_OPCODE_BYTE"},
# e7: MP_BINARY_OP_INPLACE_MULTIPLY,
# e8: MP_BINARY_OP_INPLACE_FLOOR_DIVIDE,
# e9: MP_BINARY_OP_INPLACE_TRUE_DIVIDE,
# ea: MP_BINARY_OP_INPLACE_MODULO,
# eb: MP_BINARY_OP_INPLACE_POWER,
#
# // 12 normal arithmetic operations
# ec: MP_BINARY_OP_OR,
# ed: MP_BINARY_OP_XOR,
# ee: MP_BINARY_OP_AND,
# ef: MP_BINARY_OP_LSHIFT,
# f0: MP_BINARY_OP_RSHIFT,
#define MP_BC_BINARY_OP_MULTI (0xd7) // + op(<MP_BINARY_OP_NUM_BYTECODE)
0xf1: {"name": "MP_BC_BINARY_OP_MULTI MP_BINARY_OP_ADD",
"format": "MP_OPCODE_BYTE"},
0xf2: {"name": "MP_BC_BINARY_OP_MULTI MP_BINARY_OP_SUBTRACT",
"format": "MP_OPCODE_BYTE"},
0xf3: {"name": "MP_BC_BINARY_OP_MULTI MP_BINARY_OP_MULTIPLY",
"format": "MP_OPCODE_BYTE"},
0xf4: {"name": "MP_BC_BINARY_OP_MULTI MP_BINARY_OP_FLOOR_DIVIDE",
"format": "MP_OPCODE_BYTE"},
0xf5: {"name": "MP_BC_BINARY_OP_MULTI MP_BINARY_OP_TRUE_DIVIDE",
"format": "MP_OPCODE_BYTE"},
0xf6: {"name": "MP_BC_BINARY_OP_MULTI MP_BINARY_OP_MODULO",
"format": "MP_OPCODE_BYTE"},
0xf7: {"name": "MP_BC_BINARY_OP_MULTI MP_BINARY_OP_POWER",
"format": "MP_OPCODE_BYTE"},
}
def read_uint(encoded_uint, peek=False):
@ -149,40 +310,94 @@ class RawCode:
def __init__(self, encoded_raw_code):
bc_len = read_uint(encoded_raw_code)
bc = encoded_raw_code.read(bc_len)
prelude = Prelude(io.BufferedReader(io.BytesIO(bc)))
encoded_code_info = bc[:prelude.code_info_size]
bc_start = prelude.code_info_size
while bc[bc_start] == 0xff:
bc_start += 1
bc = bc[bc_start:]
bc = io.BufferedReader(io.BytesIO(bc))
prelude = Prelude(bc)
encoded_code_info = bc.read(prelude.code_info_size)
bc.read(1)
while bc.peek(1)[0] == 0xff:
bc.read(1)
bc = bytearray(bc.read())
#print(encoded_code_info, bc)
self.qstrs = []
self.simple_name = self._load_qstr(encoded_raw_code)
self.source_file = self._load_qstr(encoded_raw_code)
# the simple name and source file qstr indexes get written back into the byte code somehow
#print(bc)
self._load_bytecode_qstrs(encoded_raw_code, bc)
#print(encoded_raw_code.peek(20)[:20])
n_obj = read_uint(encoded_raw_code)
n_raw_code = read_uint(encoded_raw_code)
self.const_table = []
for i in range(prelude.n_pos_args + prelude.n_kwonly_args):
self.const_table.append(self._load_qstr(encoded_raw_code))
print("load args", self.const_table[-1])
for i in range(n_obj):
self.const_table.append(self._load_obj(encoded_raw_code))
print("load obj", self.const_table[-1])
for i in range(n_raw_code):
print("load raw code")
self.const_table.append(RawCode(encoded_raw_code))
print(self.qstrs[self.simple_name], self.qstrs[self.source_file])
print(binascii.hexlify(encoded_raw_code.peek(20)[:20]))
#print(binascii.hexlify(encoded_raw_code.peek(20)[:20]))
def _load_qstr(self, encoded_qstr):
string_len = read_uint(encoded_qstr)
string = encoded_qstr.read(string_len).decode("utf-8")
print(string)
if string in self.qstrs:
return self.qstr.index(string)
return self.qstrs.index(string)
new_index = len(self.qstrs)
self.qstrs.append(string)
return new_index
def _load_bytecode_qstrs(encoded_raw_code, bytecode):
def _load_obj(self, encoded_obj):
obj_type = encoded_obj.read(1)
if obj_type == b'e':
return "..."
else:
str_len = read_uint(encoded_obj)
s = encoded_obj.read(str_len)
if obj_type == b's':
return s.decode("utf-8")
elif obj_type == b'b':
return s
elif obj_type == b'i':
return int(s)