refactor: clean up comments and docstrings
This commit is contained in:
34
pack.py
34
pack.py
@@ -42,10 +42,7 @@ def make_payload_block(filename: str, raw: bytes) -> bytes:
|
|||||||
return header + compressed + crc
|
return header + compressed + crc
|
||||||
|
|
||||||
|
|
||||||
# ── PNG chunk helpers ─────────────────────────────────────────────
|
|
||||||
|
|
||||||
def parse_chunks(data: bytes):
|
def parse_chunks(data: bytes):
|
||||||
"""Parse PNG data into list of (chunk_type, chunk_data)."""
|
|
||||||
if data[:8] != PNG_SIG:
|
if data[:8] != PNG_SIG:
|
||||||
raise ValueError("Not a valid PNG")
|
raise ValueError("Not a valid PNG")
|
||||||
pos = 8
|
pos = 8
|
||||||
@@ -60,22 +57,18 @@ def parse_chunks(data: bytes):
|
|||||||
|
|
||||||
|
|
||||||
def make_png_chunk(chunk_type: bytes, chunk_data: bytes) -> bytes:
|
def make_png_chunk(chunk_type: bytes, chunk_data: bytes) -> bytes:
|
||||||
"""Build a valid PNG chunk: length + type + data + CRC32."""
|
|
||||||
length = struct.pack(">I", len(chunk_data))
|
length = struct.pack(">I", len(chunk_data))
|
||||||
crc = struct.pack(">I", zlib.crc32(chunk_type + chunk_data) & 0xFFFFFFFF)
|
crc = struct.pack(">I", zlib.crc32(chunk_type + chunk_data) & 0xFFFFFFFF)
|
||||||
return length + chunk_type + chunk_data + crc
|
return length + chunk_type + chunk_data + crc
|
||||||
|
|
||||||
|
|
||||||
def assemble_png(chunks) -> bytes:
|
def assemble_png(chunks) -> bytes:
|
||||||
"""Reassemble PNG from list of (type, data) tuples."""
|
|
||||||
result = bytearray(PNG_SIG)
|
result = bytearray(PNG_SIG)
|
||||||
for chunk_type, chunk_data in chunks:
|
for chunk_type, chunk_data in chunks:
|
||||||
result += make_png_chunk(chunk_type, chunk_data)
|
result += make_png_chunk(chunk_type, chunk_data)
|
||||||
return bytes(result)
|
return bytes(result)
|
||||||
|
|
||||||
|
|
||||||
# ── Chunk mode ────────────────────────────────────────────────────
|
|
||||||
|
|
||||||
def pack_chunk(cover_bytes: bytes, payload_block: bytes) -> bytes:
|
def pack_chunk(cover_bytes: bytes, payload_block: bytes) -> bytes:
|
||||||
chunks = parse_chunks(cover_bytes)
|
chunks = parse_chunks(cover_bytes)
|
||||||
|
|
||||||
@@ -95,8 +88,6 @@ def pack_chunk(cover_bytes: bytes, payload_block: bytes) -> bytes:
|
|||||||
return assemble_png(result)
|
return assemble_png(result)
|
||||||
|
|
||||||
|
|
||||||
# ── LSB mode ──────────────────────────────────────────────────────
|
|
||||||
|
|
||||||
def _paeth(a, b, c):
|
def _paeth(a, b, c):
|
||||||
p = a + b - c
|
p = a + b - c
|
||||||
pa, pb, pc = abs(p - a), abs(p - b), abs(p - c)
|
pa, pb, pc = abs(p - a), abs(p - b), abs(p - c)
|
||||||
@@ -130,7 +121,6 @@ def _unfilter_row(ftype, row, prev, bpp):
|
|||||||
def pack_lsb(cover_bytes: bytes, payload_block: bytes) -> bytes:
|
def pack_lsb(cover_bytes: bytes, payload_block: bytes) -> bytes:
|
||||||
chunks = parse_chunks(cover_bytes)
|
chunks = parse_chunks(cover_bytes)
|
||||||
|
|
||||||
# ── IHDR ──
|
|
||||||
ihdr = next(cd for ct, cd in chunks if ct == b"IHDR")
|
ihdr = next(cd for ct, cd in chunks if ct == b"IHDR")
|
||||||
width, height = struct.unpack(">II", ihdr[:8])
|
width, height = struct.unpack(">II", ihdr[:8])
|
||||||
bit_depth, color_type = ihdr[8], ihdr[9]
|
bit_depth, color_type = ihdr[8], ihdr[9]
|
||||||
@@ -144,11 +134,9 @@ def pack_lsb(cover_bytes: bytes, payload_block: bytes) -> bytes:
|
|||||||
channels = 3 if color_type == 2 else 4
|
channels = 3 if color_type == 2 else 4
|
||||||
bpp = channels
|
bpp = channels
|
||||||
|
|
||||||
# ── Collect & decompress IDAT ──
|
|
||||||
idat_data = b"".join(cd for ct, cd in chunks if ct == b"IDAT")
|
idat_data = b"".join(cd for ct, cd in chunks if ct == b"IDAT")
|
||||||
raw = zlib.decompress(idat_data)
|
raw = zlib.decompress(idat_data)
|
||||||
|
|
||||||
# ── Unfilter ──
|
|
||||||
stride = width * bpp
|
stride = width * bpp
|
||||||
pixels = bytearray()
|
pixels = bytearray()
|
||||||
prev = bytearray(stride)
|
prev = bytearray(stride)
|
||||||
@@ -159,7 +147,6 @@ def pack_lsb(cover_bytes: bytes, payload_block: bytes) -> bytes:
|
|||||||
prev = _unfilter_row(ftype, row, prev, bpp)
|
prev = _unfilter_row(ftype, row, prev, bpp)
|
||||||
pixels.extend(prev)
|
pixels.extend(prev)
|
||||||
|
|
||||||
# ── Capacity check ──
|
|
||||||
capacity = len(pixels) # 1 bit per byte
|
capacity = len(pixels) # 1 bit per byte
|
||||||
needed = len(payload_block) * 8
|
needed = len(payload_block) * 8
|
||||||
if needed > capacity:
|
if needed > capacity:
|
||||||
@@ -169,21 +156,19 @@ def pack_lsb(cover_bytes: bytes, payload_block: bytes) -> bytes:
|
|||||||
file=sys.stderr)
|
file=sys.stderr)
|
||||||
sys.exit(1)
|
sys.exit(1)
|
||||||
|
|
||||||
# ── Embed bits into LSB ──
|
|
||||||
bit_idx = 0
|
bit_idx = 0
|
||||||
for byte in payload_block:
|
for byte in payload_block:
|
||||||
for shift in range(7, -1, -1):
|
for shift in range(7, -1, -1):
|
||||||
pixels[bit_idx] = (pixels[bit_idx] & 0xFE) | ((byte >> shift) & 1)
|
pixels[bit_idx] = (pixels[bit_idx] & 0xFE) | ((byte >> shift) & 1)
|
||||||
bit_idx += 1
|
bit_idx += 1
|
||||||
|
|
||||||
# ── Re-filter (type 0 = None) & compress ──
|
# re-filter with type 0 (None) and compress
|
||||||
filtered = bytearray()
|
filtered = bytearray()
|
||||||
for y in range(height):
|
for y in range(height):
|
||||||
filtered.append(0)
|
filtered.append(0)
|
||||||
filtered.extend(pixels[y * stride:(y + 1) * stride])
|
filtered.extend(pixels[y * stride:(y + 1) * stride])
|
||||||
compressed = zlib.compress(bytes(filtered), 6)
|
compressed = zlib.compress(bytes(filtered), 6)
|
||||||
|
|
||||||
# ── Reassemble PNG ──
|
|
||||||
result = []
|
result = []
|
||||||
idat_done = False
|
idat_done = False
|
||||||
for ct, cd in chunks:
|
for ct, cd in chunks:
|
||||||
@@ -196,10 +181,7 @@ def pack_lsb(cover_bytes: bytes, payload_block: bytes) -> bytes:
|
|||||||
return assemble_png(result)
|
return assemble_png(result)
|
||||||
|
|
||||||
|
|
||||||
# ── Clean mode ────────────────────────────────────────────────────
|
|
||||||
|
|
||||||
def clean_png(data: bytes) -> bytes:
|
def clean_png(data: bytes) -> bytes:
|
||||||
"""Strip all embedded payloads (append + chunk) and bootstrap metadata from a PNG."""
|
|
||||||
marker = BOOTSTRAP_MARKER.encode("latin-1")
|
marker = BOOTSTRAP_MARKER.encode("latin-1")
|
||||||
chunks = parse_chunks(data)
|
chunks = parse_chunks(data)
|
||||||
cleaned = []
|
cleaned = []
|
||||||
@@ -217,7 +199,6 @@ def clean_png(data: bytes) -> bytes:
|
|||||||
|
|
||||||
|
|
||||||
def build_exif_user_comment(text: str) -> bytes:
|
def build_exif_user_comment(text: str) -> bytes:
|
||||||
"""Build minimal eXIf chunk data with UserComment field."""
|
|
||||||
user_comment = b"ASCII\x00\x00\x00" + text.encode("ascii")
|
user_comment = b"ASCII\x00\x00\x00" + text.encode("ascii")
|
||||||
uc_len = len(user_comment)
|
uc_len = len(user_comment)
|
||||||
# TIFF header (8) + IFD0: count(2) + entry(12) + next(4) = 18
|
# TIFF header (8) + IFD0: count(2) + entry(12) + next(4) = 18
|
||||||
@@ -242,7 +223,6 @@ def build_exif_user_comment(text: str) -> bytes:
|
|||||||
|
|
||||||
|
|
||||||
def _make_py_b64(fname: str, last: bool = False) -> str:
|
def _make_py_b64(fname: str, last: bool = False) -> str:
|
||||||
"""Generate base64-encoded Python bootstrap script for CMD."""
|
|
||||||
import base64
|
import base64
|
||||||
find = "d.rfind(m)" if last else "d.find(m)"
|
find = "d.rfind(m)" if last else "d.find(m)"
|
||||||
script = (
|
script = (
|
||||||
@@ -259,7 +239,6 @@ def _make_py_b64(fname: str, last: bool = False) -> str:
|
|||||||
return base64.b64encode(script.encode()).decode()
|
return base64.b64encode(script.encode()).decode()
|
||||||
|
|
||||||
|
|
||||||
# Common PowerShell payload-extraction tail (after $i is set to MAGIC position)
|
|
||||||
_PS_EXTRACT_TAIL = (
|
_PS_EXTRACT_TAIL = (
|
||||||
"$p=$i+8;"
|
"$p=$i+8;"
|
||||||
"$nl=$d[$p]*256+$d[$p+1];$p+=2;"
|
"$nl=$d[$p]*256+$d[$p+1];$p+=2;"
|
||||||
@@ -274,14 +253,12 @@ _PS_EXTRACT_TAIL = (
|
|||||||
'Write-Host "Extracted: $n"'
|
'Write-Host "Extracted: $n"'
|
||||||
)
|
)
|
||||||
|
|
||||||
# PowerShell: find FIRST MAGIC (for bootstrap — extracts unpacker)
|
|
||||||
_PS_FIND_FIRST = (
|
_PS_FIND_FIRST = (
|
||||||
"$s='PNGZIP';$i=-1;"
|
"$s='PNGZIP';$i=-1;"
|
||||||
"do{$i=$t.IndexOf($s,$i+1,[StringComparison]::Ordinal)}"
|
"do{$i=$t.IndexOf($s,$i+1,[StringComparison]::Ordinal)}"
|
||||||
"while($i-ge0-and($d[$i+6]-ne0-or$d[$i+7]-ne1));"
|
"while($i-ge0-and($d[$i+6]-ne0-or$d[$i+7]-ne1));"
|
||||||
)
|
)
|
||||||
|
|
||||||
# PowerShell: find LAST MAGIC (for direct extraction — extracts payload)
|
|
||||||
_PS_FIND_LAST = (
|
_PS_FIND_LAST = (
|
||||||
"$s='PNGZIP';$i=$d.Length;"
|
"$s='PNGZIP';$i=$d.Length;"
|
||||||
"do{$i=$t.LastIndexOf($s,$i-1,[StringComparison]::Ordinal)}"
|
"do{$i=$t.LastIndexOf($s,$i-1,[StringComparison]::Ordinal)}"
|
||||||
@@ -295,7 +272,6 @@ _PS_READ_HEAD = (
|
|||||||
|
|
||||||
|
|
||||||
def _make_ps_encoded(fname: str, last: bool = False) -> str:
|
def _make_ps_encoded(fname: str, last: bool = False) -> str:
|
||||||
"""Generate base64-encoded PowerShell bootstrap script for CMD."""
|
|
||||||
import base64
|
import base64
|
||||||
find = _PS_FIND_LAST if last else _PS_FIND_FIRST
|
find = _PS_FIND_LAST if last else _PS_FIND_FIRST
|
||||||
script = _PS_READ_HEAD.replace("IMG", fname) + find + _PS_EXTRACT_TAIL
|
script = _PS_READ_HEAD.replace("IMG", fname) + find + _PS_EXTRACT_TAIL
|
||||||
@@ -303,7 +279,6 @@ def _make_ps_encoded(fname: str, last: bool = False) -> str:
|
|||||||
|
|
||||||
|
|
||||||
def _py_oneliner(fname: str, last: bool = False) -> str:
|
def _py_oneliner(fname: str, last: bool = False) -> str:
|
||||||
"""Python one-liner for shell (Linux/macOS/PowerShell)."""
|
|
||||||
find = "d.rfind" if last else "d.find"
|
find = "d.rfind" if last else "d.find"
|
||||||
return (
|
return (
|
||||||
"python3 -c \"d=open('IMG','rb').read();"
|
"python3 -c \"d=open('IMG','rb').read();"
|
||||||
@@ -318,13 +293,11 @@ def _py_oneliner(fname: str, last: bool = False) -> str:
|
|||||||
|
|
||||||
|
|
||||||
def _ps_oneliner(fname: str, last: bool = False) -> str:
|
def _ps_oneliner(fname: str, last: bool = False) -> str:
|
||||||
"""PowerShell one-liner."""
|
|
||||||
find = _PS_FIND_LAST if last else _PS_FIND_FIRST
|
find = _PS_FIND_LAST if last else _PS_FIND_FIRST
|
||||||
return (_PS_READ_HEAD + find + _PS_EXTRACT_TAIL).replace("IMG", fname)
|
return (_PS_READ_HEAD + find + _PS_EXTRACT_TAIL).replace("IMG", fname)
|
||||||
|
|
||||||
|
|
||||||
def _add_oneliner_block(parts: list, fname: str, last: bool, label: str):
|
def _add_oneliner_block(parts: list, fname: str, last: bool, label: str):
|
||||||
"""Append a full set of 4 one-liners to parts list."""
|
|
||||||
parts.append("")
|
parts.append("")
|
||||||
parts.append(f"=== {label} ===")
|
parts.append(f"=== {label} ===")
|
||||||
parts.append("")
|
parts.append("")
|
||||||
@@ -344,7 +317,6 @@ def _add_oneliner_block(parts: list, fname: str, last: bool, label: str):
|
|||||||
|
|
||||||
|
|
||||||
def build_bootstrap_comment(output_name: str, has_self_extract: bool = True) -> bytes:
|
def build_bootstrap_comment(output_name: str, has_self_extract: bool = True) -> bytes:
|
||||||
"""Build tEXt chunk data with all bootstrap one-liners for PNG metadata."""
|
|
||||||
fname = os.path.basename(output_name)
|
fname = os.path.basename(output_name)
|
||||||
parts = [BOOTSTRAP_MARKER]
|
parts = [BOOTSTRAP_MARKER]
|
||||||
if has_self_extract:
|
if has_self_extract:
|
||||||
@@ -356,8 +328,6 @@ def build_bootstrap_comment(output_name: str, has_self_extract: bool = True) ->
|
|||||||
return b"Comment\x00" + text.encode("latin-1")
|
return b"Comment\x00" + text.encode("latin-1")
|
||||||
|
|
||||||
|
|
||||||
# ── Main ──────────────────────────────────────────────────────────
|
|
||||||
|
|
||||||
def main():
|
def main():
|
||||||
ap = argparse.ArgumentParser(description="Pack file into PNG")
|
ap = argparse.ArgumentParser(description="Pack file into PNG")
|
||||||
ap.add_argument("cover", help="Cover PNG image")
|
ap.add_argument("cover", help="Cover PNG image")
|
||||||
@@ -380,7 +350,6 @@ def main():
|
|||||||
print("Error: not a valid PNG", file=sys.stderr)
|
print("Error: not a valid PNG", file=sys.stderr)
|
||||||
sys.exit(1)
|
sys.exit(1)
|
||||||
|
|
||||||
# ── Clean mode ──
|
|
||||||
if args.clean:
|
if args.clean:
|
||||||
result = clean_png(cover)
|
result = clean_png(cover)
|
||||||
out = args.output if args.payload is None else args.output
|
out = args.output if args.payload is None else args.output
|
||||||
@@ -391,7 +360,6 @@ def main():
|
|||||||
print(f"Cleaned: {out} ({after_kb:.1f} KB, was {before_kb:.1f} KB)")
|
print(f"Cleaned: {out} ({after_kb:.1f} KB, was {before_kb:.1f} KB)")
|
||||||
return
|
return
|
||||||
|
|
||||||
# ── Pack mode — payload required ──
|
|
||||||
if args.payload is None:
|
if args.payload is None:
|
||||||
print("Error: payload argument is required (unless --clean)", file=sys.stderr)
|
print("Error: payload argument is required (unless --clean)", file=sys.stderr)
|
||||||
sys.exit(1)
|
sys.exit(1)
|
||||||
|
|||||||
16
unpack.py
16
unpack.py
@@ -19,8 +19,6 @@ MAGIC = b"PNGZIP\x00\x01"
|
|||||||
PNG_SIG = b"\x89PNG\r\n\x1a\n"
|
PNG_SIG = b"\x89PNG\r\n\x1a\n"
|
||||||
|
|
||||||
|
|
||||||
# ── Shared helpers ────────────────────────────────────────────────
|
|
||||||
|
|
||||||
def find_iend(data: bytes) -> int:
|
def find_iend(data: bytes) -> int:
|
||||||
"""Return offset of the first byte AFTER the IEND chunk, or -1."""
|
"""Return offset of the first byte AFTER the IEND chunk, or -1."""
|
||||||
idx = data.rfind(b"IEND")
|
idx = data.rfind(b"IEND")
|
||||||
@@ -87,20 +85,14 @@ def parse_payload_blocks(data: bytes):
|
|||||||
offset = pos
|
offset = pos
|
||||||
|
|
||||||
|
|
||||||
# ── Append mode ───────────────────────────────────────────────────
|
|
||||||
|
|
||||||
def find_append_payloads(data: bytes):
|
def find_append_payloads(data: bytes):
|
||||||
"""Find payloads appended after IEND."""
|
|
||||||
iend = find_iend(data)
|
iend = find_iend(data)
|
||||||
if iend == -1 or iend >= len(data):
|
if iend == -1 or iend >= len(data):
|
||||||
return []
|
return []
|
||||||
return list(parse_payload_blocks(data[iend:]))
|
return list(parse_payload_blocks(data[iend:]))
|
||||||
|
|
||||||
|
|
||||||
# ── Chunk mode ────────────────────────────────────────────────────
|
|
||||||
|
|
||||||
def find_chunk_payloads(data: bytes):
|
def find_chunk_payloads(data: bytes):
|
||||||
"""Find payloads stored in pnZp chunks."""
|
|
||||||
if data[:8] != PNG_SIG:
|
if data[:8] != PNG_SIG:
|
||||||
return []
|
return []
|
||||||
chunks = parse_chunks(data)
|
chunks = parse_chunks(data)
|
||||||
@@ -112,8 +104,6 @@ def find_chunk_payloads(data: bytes):
|
|||||||
return list(parse_payload_blocks(combined))
|
return list(parse_payload_blocks(combined))
|
||||||
|
|
||||||
|
|
||||||
# ── LSB mode ──────────────────────────────────────────────────────
|
|
||||||
|
|
||||||
def _paeth(a, b, c):
|
def _paeth(a, b, c):
|
||||||
p = a + b - c
|
p = a + b - c
|
||||||
pa, pb, pc = abs(p - a), abs(p - b), abs(p - c)
|
pa, pb, pc = abs(p - a), abs(p - b), abs(p - c)
|
||||||
@@ -145,7 +135,6 @@ def _unfilter_row(ftype, row, prev, bpp):
|
|||||||
|
|
||||||
|
|
||||||
def find_lsb_payloads(data: bytes):
|
def find_lsb_payloads(data: bytes):
|
||||||
"""Find payloads encoded in LSB of pixel data."""
|
|
||||||
if data[:8] != PNG_SIG:
|
if data[:8] != PNG_SIG:
|
||||||
return []
|
return []
|
||||||
try:
|
try:
|
||||||
@@ -224,10 +213,7 @@ def find_lsb_payloads(data: bytes):
|
|||||||
return list(parse_payload_blocks(bytes(extracted)))
|
return list(parse_payload_blocks(bytes(extracted)))
|
||||||
|
|
||||||
|
|
||||||
# ── Auto-detect all modes ────────────────────────────────────────
|
|
||||||
|
|
||||||
def find_all_payloads(data: bytes):
|
def find_all_payloads(data: bytes):
|
||||||
"""Try all detection methods: append → chunk → lsb."""
|
|
||||||
results = find_append_payloads(data)
|
results = find_append_payloads(data)
|
||||||
if results:
|
if results:
|
||||||
return results
|
return results
|
||||||
@@ -240,8 +226,6 @@ def find_all_payloads(data: bytes):
|
|||||||
return []
|
return []
|
||||||
|
|
||||||
|
|
||||||
# ── Main ──────────────────────────────────────────────────────────
|
|
||||||
|
|
||||||
def main():
|
def main():
|
||||||
ap = argparse.ArgumentParser(description="Extract file from PNG")
|
ap = argparse.ArgumentParser(description="Extract file from PNG")
|
||||||
ap.add_argument("image", help="PNG with hidden payload")
|
ap.add_argument("image", help="PNG with hidden payload")
|
||||||
|
|||||||
Reference in New Issue
Block a user