Logo
Overview
[H7CTF 2025] Kakashi's secrect jutsu writeup

[H7CTF 2025] Kakashi's secrect jutsu writeup

October 21, 2025
5 min read

TL;DR

CBC 상태를 재활용하는 encryption oracle을 이용해 secret을 바이트 단위로 추출했다. PoW를 브루트포스로 해결하고 연속 블록 충돌 조건으로 후보를 검증했다. 후보 탐색은 printable 우선 집합과 전체 바이트 집합을 단계적으로 사용했다.

Additional: 딜레이가 심한 관계로 인도 지역에 AWS로 공격했다.

Analysis

def proof_of_work(self):
proof = ''.join([random.choice(string.ascii_letters+string.digits) for _ in range(20)])
_hexdigest = sha256(proof.encode()).hexdigest()
self.send(f"sha256(XXXX+{proof[4:]}) == {_hexdigest}".encode())
x = self.recv(prompt=b'Give me XXXX: ')
if len(x) != 4 or sha256(x+proof[4:].encode()).hexdigest() != _hexdigest:
return False
return True

세션 시작은 4바이트 prefix를 찾는 PoW이다. 허용 alphabet이 6262 문자라 탐색 공간은 62462^4이며 전수 검사로 해결 가능하다.

secret = get_secret()
key = get_key()
IV = get_IV()
aes = AES.new(key, mode=AES.MODE_CBC, iv=IV)
self.send(f"Kakashi says — IV is: {IV.hex()}".encode())

세션별로 secret key IV를 고정한다. AES.new로 만든 CBC 객체를 재사용하므로 내부 IV는 매 암호화 후 마지막 블록으로 갱신된다. 서버는 처음 연결 시 초기 IV만 알려준다.

def pad(self, data):
pad_len = 16 - len(data)%16
return data + bytes([pad_len])*pad_len

패딩은 PKCS7이다. 사용자가 보낸 메시지 뒤에 secret을 이어붙이고 padding을 적용한다고 가정한다. 빈 입력을 보낼 때 ciphertext 길이가 항상 64바이트였으므로 secret 길이 48바이트라는 결론에 도달했다. 이 가정은 ciphertext 길이 관찰을 통해 검증했다.

if choice == b"1":
msg = self.recvhex(prompt=b"Your jutsu (in hex): ")
cipher = perform_encryption(aes, msg, secret, self.pad)
self.send(cipher.hex().encode())

메뉴 1이 encryption oracle을 제공한다. perform_encryption 구현은 주어지지 않았지만 동일 세션에서 IV가 연속적으로 갱신되는 점과 ciphertext 길이 분석으로 plaintext 구성이 pad(msg + secret)임을 추측했다. secret이 블록 끝에 오도록 prefix 길이를 조절하면 공격 대상 바이트를 마지막 위치에 둘 수 있다.

elif choice == b"2":
guess = self.recvhex(prompt=b"What did your Sharingan see? (in hex): ")
if guess == secret:
self.send(b"Kakashi: Impressive. Here is your scroll : " + flag)

복구한 secret을 메뉴 2로 제출하면 flag를 획득한다. 따라서 목표는 encryption oracle로 secret을 추출하는 것이다.

def recover_single_session():
conn = Conn()
conn.connect_and_setup()
recovered = bytearray()
while idx < SECRET_LEN:
pad_len = (-(idx + 1)) % 16
if pad_len == 0:
pad_len = 16
prefix = b"\x00" * pad_len
block_idx = (pad_len + idx) // 16

secret의 idxidx번째 바이트를 노릴 때 prefix 길이를 조정해 해당 바이트를 어떤 블록의 끝으로 보낸다. pad_len이 0이면 빈 메시지가 서버에서 세션 종료를 유발해 16으로 대체한다. 이는 padding 규칙을 유지하므로 공격에는 영향이 없다.

state_before = bytes(conn.state_iv)
ctA = conn.encrypt(prefix)
blocksA = chunk_blocks(ctA)
target_block = blocksA[block_idx]
prev_block = state_before if block_idx == 0 else blocksA[block_idx - 1]
IV_prime = bytes(conn.state_iv)

baseline 조회로 목표 블록 target_block과 chaining 값 prev_block을 수집한다. state_iv는 매 요청 후 마지막 블록으로 갱신되므로 IV_prime이 다음 요청의 초기 IV가 된다.

G_full = prefix + bytes(recovered) + bytes([candidate])
G = G_full[-16:]
first_plain = xor_bytes(xor_bytes(G, prev_block), IV_prime)
ctB = conn.encrypt(first_plain)
if chunk_blocks(ctB)[0] == target_block:
return candidate

secret 후보 바이트를 끝에 붙여 16바이트 조각 G를 만들고 CBC 역산으로 첫 블록 plaintext를 구성한다. 후보가 참이면 첫 ciphertext 블록이 baseline의 target_block과 일치한다. 후보가 틀리면 충돌이 사라진다. 이 방식은 secret 바이트를 앞에서부터 순차적으로 결정한다.

pri, rest = make_two_stage_candidates()
c = try_candidates(pri)
...
c = try_candidates(rest)

후보는 printable 문자 우선 집합과 나머지 바이트 집합으로 나누어 검사한다. secret이 ASCII flag일 가능성을 활용해 쿼리 수를 줄인다. 모든 후보가 실패하면 세션을 포기하고 재연결한다. 이는 perform_encryption 구조에 대한 추정이 틀렸을 경우 대비책으로 사용된다.

Exploit

  1. PoW 해결: 서버 배너에서 suffix와 digest를 추출해 62462^4 공간을 탐색한다. Python itertools.productsha256 비교로 약 수백만 해시를 계산하면 충분하다.
  2. 상태 추적: 각 encryption 응답의 마지막 블록을 내부 state_iv에 저장해 다음 요청의 chaining 값으로 사용한다. 이는 PyCryptodome CBC가 연속 스트림처럼 동작한다는 사실에 기반한 필수 조건이다.
  3. 바이트 복구: 목표 인덱스마다 prefix를 설정해 secret 바이트를 블록 끝에 놓고 baseline과 probe를 비교한다. printable 후보 집합 후 전체 바이트 집합을 시도한다. 후보가 일치하면 secret 바이트를 확정하고 다음 인덱스로 이동한다.
  4. 제출: 48바이트 secret을 모두 얻으면 메뉴 2로 전송한다. flag를 받기 전까지 세션이 끊길 수 있으므로 최대 8회까지 자동 재시도하도록 구성했다.
import time
import re
import string
import itertools
from hashlib import sha256
from typing import List
from pwn import context, remote
from pwnlib.exception import PwnlibException
HOST = "play.h7tex.com"
PORT = 44056
SECRET_LEN = 48
POW_RE = re.compile(br"sha256\(XXXX\+(.+?)\) == ([0-9a-f]{64})")
POW_ALPHABET = string.ascii_letters + string.digits
SOCK_TIMEOUT = 10.0
RETRY_DELAY = 0.4
MAX_CALLS_PER_SESSION = 200000
MAX_SESS_RESTARTS = 8
context.log_level = "debug"
DEBUG = True
def build_priority_candidates() -> List[int]:
pri = []
extras = [ord('{'), ord('}'), ord('_'), ord('-'), ord(':')]
for b in extras:
if b not in pri:
pri.append(b)
for b in range(0x20, 0x7f):
if b not in pri:
pri.append(b)
return pri
PRIORITY_CANDIDATES = build_priority_candidates()
FULL_CANDIDATES = list(range(256))
def make_two_stage_candidates():
pri = PRIORITY_CANDIDATES.copy()
rest = [b for b in FULL_CANDIDATES if b not in pri]
return pri, rest
def xor_bytes(a: bytes, b: bytes) -> bytes:
return bytes(x ^ y for x, y in zip(a, b))
def chunk_blocks(data: bytes, bs: int = 16):
return [data[i:i+bs] for i in range(0, len(data), bs)]
def find_longest_hex_token(data: bytes) -> bytes:
text = data.decode(errors="ignore")
tokens = []
for line in text.splitlines():
t = line.strip()
if not t:
continue
if all(ch in "0123456789abcdefABCDEF" for ch in t) and (len(t) % 2 == 0):
tokens.append(t)
if not tokens:
return b""
return max(tokens, key=len).encode()
class ConnectionDropped(Exception):
pass
class Conn:
def __init__(self):
self.sock = None
self.state_iv = None
self.encrypt_calls = 0
def connect_and_setup(self):
if self.sock:
try:
self.sock.close()
except Exception:
pass
self.sock = remote(HOST, PORT, timeout=SOCK_TIMEOUT)
# PoW
data = self.sock.recvuntil(b"Give me XXXX: ", timeout=SOCK_TIMEOUT)
m = POW_RE.search(data)
if not m:
raise RuntimeError("PoW prompt not found")
suffix, digest_hex = m.groups()
target = digest_hex.decode()
solved = False
for cand in itertools.product(POW_ALPHABET, repeat=4):
prefix = "".join(cand).encode()
if sha256(prefix + suffix).hexdigest() == target:
self.sock.sendline(prefix)
solved = True
break
if not solved:
raise RuntimeError("PoW unsolved")
# menu + IV
menu = self.sock.recvuntil(b"> ", timeout=SOCK_TIMEOUT)
text = menu.decode(errors="ignore")
iv = None
for line in text.splitlines():
if "IV is:" in line:
iv_hex = line.split("IV is:")[1].strip()
iv = bytes.fromhex(iv_hex)
break
if iv is None:
raise RuntimeError("IV not found in menu")
self.state_iv = bytearray(iv)
self.encrypt_calls = 0
if DEBUG:
print("[connect] IV:", iv.hex())
def encrypt(self, payload: bytes, timeout: float = SOCK_TIMEOUT) -> bytes:
try:
self.sock.sendline(b"1")
self.sock.recvuntil(b"(in hex): ", timeout=timeout)
self.sock.sendline(payload.hex().encode())
data = self.sock.recvuntil(b"> ", timeout=timeout)
token = find_longest_hex_token(data)
if not token:
raise ConnectionDropped("no hex token in reply")
ct = bytes.fromhex(token.decode())
if len(ct) < 16:
raise ConnectionDropped("cipher too short")
self.state_iv[:] = ct[-16:]
self.encrypt_calls += 1
if DEBUG and (self.encrypt_calls < 20 or self.encrypt_calls % 200 == 0):
print(f"[encrypt #{self.encrypt_calls}] payload_len={len(payload)} ct_len={len(ct)} iv={bytes(self.state_iv).hex()}")
if self.encrypt_calls > MAX_CALLS_PER_SESSION:
raise ConnectionDropped("exceeded max calls per session")
return ct
except (BrokenPipeError, ConnectionResetError, EOFError, ConnectionDropped) as e:
raise ConnectionDropped(e)
except PwnlibException as e:
raise ConnectionDropped(e)
except Exception as e:
raise ConnectionDropped(e)
def submit(self, secret: bytes) -> str:
try:
self.sock.sendline(b"2")
self.sock.recvuntil(b"(in hex): ", timeout=5)
self.sock.sendline(secret.hex().encode())
line = self.sock.recvline(timeout=3)
return line.decode(errors="ignore").strip() if line else ""
except Exception:
return ""
def recover_single_session():
conn = Conn()
conn.connect_and_setup()
recovered = bytearray()
idx = 0
while idx < SECRET_LEN:
pad_len = (-(idx + 1)) % 16
if pad_len == 0:
pad_len = 16
prefix = b"\x00" * pad_len
block_idx = (pad_len + idx) // 16
if DEBUG:
print(f"[IDX {idx}] pad_len={pad_len} block_idx={block_idx} iv={bytes(conn.state_iv).hex()} enc_calls={conn.encrypt_calls}")
pri, rest = make_two_stage_candidates()
def try_candidates(iterable):
nonlocal conn, idx, prefix, block_idx, recovered
for c in iterable:
try:
state_before = bytes(conn.state_iv)
ctA = conn.encrypt(prefix)
except ConnectionDropped as e:
if DEBUG:
print("[*] baseline: connection dropped during encrypt:", e)
raise ConnectionDropped(e)
blocksA = chunk_blocks(ctA)
if block_idx >= len(blocksA):
if DEBUG:
print(f"[!] ctA too short (need block {block_idx} have {len(blocksA)}) -> abort session")
raise ConnectionDropped("ctA too short")
target_block = blocksA[block_idx]
prev_block = state_before if block_idx == 0 else blocksA[block_idx - 1]
IV_prime = bytes(conn.state_iv)
G_full = prefix + bytes(recovered) + bytes([c])
G = G_full[-16:]
first_plain = xor_bytes(xor_bytes(G, prev_block), IV_prime)
try:
ctB = conn.encrypt(first_plain)
except ConnectionDropped as e:
if DEBUG:
print("[*] probe: connection dropped during encrypt:", e)
raise ConnectionDropped(e)
blocksB = chunk_blocks(ctB)
if not blocksB:
if DEBUG:
print("[!] ctB empty -> abort session")
raise ConnectionDropped("ctB empty")
if blocksB[0] == target_block:
return c
return None
c = try_candidates(pri)
if c is not None:
recovered.append(c)
print(f"[{idx+1:02d}/{SECRET_LEN}] byte = 0x{c:02x} ({chr(c) if 32<=c<=126 else '.'}) calls={conn.encrypt_calls} (from PRIORITY set)")
idx += 1
continue
if DEBUG:
print(f"[IDX {idx}] no match in PRIORITY set -> trying FULL 0..255 set (this will be slower)")
c = try_candidates(rest)
if c is not None:
recovered.append(c)
print(f"[{idx+1:02d}/{SECRET_LEN}] byte = 0x{c:02x} ({chr(c) if 32<=c<=126 else '.'}) calls={conn.encrypt_calls} (from FULL set)")
idx += 1
continue
if DEBUG:
print(f"[!] exhausted all candidates at idx={idx} -> abort session")
raise RuntimeError("No candidate matched in session (exhausted full set)")
return bytes(recovered), conn
def run_recover():
attempts = 0
while attempts < MAX_SESS_RESTARTS:
attempts += 1
try:
secret, conn = recover_single_session()
print("[+] recovered secret (hex):", secret.hex())
resp = conn.submit(secret)
if resp:
print("[+] submit response:", resp)
return
except ConnectionDropped as e:
print(f"[!] session dropped (attempt {attempts}/{MAX_SESS_RESTARTS}): {e}")
time.sleep(RETRY_DELAY)
continue
except RuntimeError as e:
print(f"[!] session runtime error (attempt {attempts}/{MAX_SESS_RESTARTS}): {e}")
time.sleep(RETRY_DELAY)
continue
except Exception as e:
print("[!] unexpected error:", type(e).__name__, e)
time.sleep(RETRY_DELAY)
continue
print("[FATAL] all session attempts exhausted. Consider raising MAX_SESS_RESTARTS or analyzing server behavior more closely.")
if __name__ == "__main__":
run_recover()
H7CTF{K4K4SH1_5H4R1NG4N_$33_411_258d69d7-2492-4a56-ac26-1d0b91117393}