TL;DR
CBC 상태를 재활용하는 encryption oracle을 이용해 secret을 바이트 단위로 추출했다. PoW를 브루트포스로 해결하고 연속 블록 충돌 조건으로 후보를 검증했다. 후보 탐색은 printable 우선 집합과 전체 바이트 집합을 단계적으로 사용했다.
Additional: 딜레이가 심한 관계로 인도 지역에 AWS로 공격했다.
Analysis
def proof_of_work(self): proof = ''.join([random.choice(string.ascii_letters+string.digits) for _ in range(20)]) _hexdigest = sha256(proof.encode()).hexdigest() self.send(f"sha256(XXXX+{proof[4:]}) == {_hexdigest}".encode()) x = self.recv(prompt=b'Give me XXXX: ') if len(x) != 4 or sha256(x+proof[4:].encode()).hexdigest() != _hexdigest: return False return True세션 시작은 4바이트 prefix를 찾는 PoW이다. 허용 alphabet이 문자라 탐색 공간은 이며 전수 검사로 해결 가능하다.
secret = get_secret()key = get_key()IV = get_IV()aes = AES.new(key, mode=AES.MODE_CBC, iv=IV)self.send(f"Kakashi says — IV is: {IV.hex()}".encode())세션별로 secret key IV를 고정한다. AES.new로 만든 CBC 객체를 재사용하므로 내부 IV는 매 암호화 후 마지막 블록으로 갱신된다. 서버는 처음 연결 시 초기 IV만 알려준다.
def pad(self, data): pad_len = 16 - len(data)%16 return data + bytes([pad_len])*pad_len패딩은 PKCS7이다. 사용자가 보낸 메시지 뒤에 secret을 이어붙이고 padding을 적용한다고 가정한다. 빈 입력을 보낼 때 ciphertext 길이가 항상 64바이트였으므로 secret 길이 48바이트라는 결론에 도달했다. 이 가정은 ciphertext 길이 관찰을 통해 검증했다.
if choice == b"1": msg = self.recvhex(prompt=b"Your jutsu (in hex): ") cipher = perform_encryption(aes, msg, secret, self.pad) self.send(cipher.hex().encode())메뉴 1이 encryption oracle을 제공한다. perform_encryption 구현은 주어지지 않았지만 동일 세션에서 IV가 연속적으로 갱신되는 점과 ciphertext 길이 분석으로 plaintext 구성이 pad(msg + secret)임을 추측했다. secret이 블록 끝에 오도록 prefix 길이를 조절하면 공격 대상 바이트를 마지막 위치에 둘 수 있다.
elif choice == b"2": guess = self.recvhex(prompt=b"What did your Sharingan see? (in hex): ") if guess == secret: self.send(b"Kakashi: Impressive. Here is your scroll : " + flag)복구한 secret을 메뉴 2로 제출하면 flag를 획득한다. 따라서 목표는 encryption oracle로 secret을 추출하는 것이다.
def recover_single_session(): conn = Conn() conn.connect_and_setup() recovered = bytearray() while idx < SECRET_LEN: pad_len = (-(idx + 1)) % 16 if pad_len == 0: pad_len = 16 prefix = b"\x00" * pad_len block_idx = (pad_len + idx) // 16secret의 번째 바이트를 노릴 때 prefix 길이를 조정해 해당 바이트를 어떤 블록의 끝으로 보낸다. pad_len이 0이면 빈 메시지가 서버에서 세션 종료를 유발해 16으로 대체한다. 이는 padding 규칙을 유지하므로 공격에는 영향이 없다.
state_before = bytes(conn.state_iv)ctA = conn.encrypt(prefix)blocksA = chunk_blocks(ctA)target_block = blocksA[block_idx]prev_block = state_before if block_idx == 0 else blocksA[block_idx - 1]IV_prime = bytes(conn.state_iv)baseline 조회로 목표 블록 target_block과 chaining 값 prev_block을 수집한다. state_iv는 매 요청 후 마지막 블록으로 갱신되므로 IV_prime이 다음 요청의 초기 IV가 된다.
G_full = prefix + bytes(recovered) + bytes([candidate])G = G_full[-16:]first_plain = xor_bytes(xor_bytes(G, prev_block), IV_prime)ctB = conn.encrypt(first_plain)if chunk_blocks(ctB)[0] == target_block: return candidatesecret 후보 바이트를 끝에 붙여 16바이트 조각 G를 만들고 CBC 역산으로 첫 블록 plaintext를 구성한다. 후보가 참이면 첫 ciphertext 블록이 baseline의 target_block과 일치한다. 후보가 틀리면 충돌이 사라진다. 이 방식은 secret 바이트를 앞에서부터 순차적으로 결정한다.
pri, rest = make_two_stage_candidates()c = try_candidates(pri)...c = try_candidates(rest)후보는 printable 문자 우선 집합과 나머지 바이트 집합으로 나누어 검사한다. secret이 ASCII flag일 가능성을 활용해 쿼리 수를 줄인다. 모든 후보가 실패하면 세션을 포기하고 재연결한다. 이는 perform_encryption 구조에 대한 추정이 틀렸을 경우 대비책으로 사용된다.
Exploit
- PoW 해결: 서버 배너에서 suffix와 digest를 추출해 공간을 탐색한다. Python
itertools.product와sha256비교로 약 수백만 해시를 계산하면 충분하다. - 상태 추적: 각 encryption 응답의 마지막 블록을 내부
state_iv에 저장해 다음 요청의 chaining 값으로 사용한다. 이는 PyCryptodome CBC가 연속 스트림처럼 동작한다는 사실에 기반한 필수 조건이다. - 바이트 복구: 목표 인덱스마다 prefix를 설정해 secret 바이트를 블록 끝에 놓고 baseline과 probe를 비교한다. printable 후보 집합 후 전체 바이트 집합을 시도한다. 후보가 일치하면 secret 바이트를 확정하고 다음 인덱스로 이동한다.
- 제출: 48바이트 secret을 모두 얻으면 메뉴 2로 전송한다. flag를 받기 전까지 세션이 끊길 수 있으므로 최대 8회까지 자동 재시도하도록 구성했다.
import timeimport reimport stringimport itertoolsfrom hashlib import sha256from typing import List
from pwn import context, remotefrom pwnlib.exception import PwnlibException
HOST = "play.h7tex.com"PORT = 44056SECRET_LEN = 48
POW_RE = re.compile(br"sha256\(XXXX\+(.+?)\) == ([0-9a-f]{64})")POW_ALPHABET = string.ascii_letters + string.digits
SOCK_TIMEOUT = 10.0RETRY_DELAY = 0.4MAX_CALLS_PER_SESSION = 200000MAX_SESS_RESTARTS = 8
context.log_level = "debug"DEBUG = True
def build_priority_candidates() -> List[int]: pri = [] extras = [ord('{'), ord('}'), ord('_'), ord('-'), ord(':')] for b in extras: if b not in pri: pri.append(b) for b in range(0x20, 0x7f): if b not in pri: pri.append(b) return pri
PRIORITY_CANDIDATES = build_priority_candidates()FULL_CANDIDATES = list(range(256))
def make_two_stage_candidates(): pri = PRIORITY_CANDIDATES.copy() rest = [b for b in FULL_CANDIDATES if b not in pri] return pri, rest
def xor_bytes(a: bytes, b: bytes) -> bytes: return bytes(x ^ y for x, y in zip(a, b))
def chunk_blocks(data: bytes, bs: int = 16): return [data[i:i+bs] for i in range(0, len(data), bs)]
def find_longest_hex_token(data: bytes) -> bytes: text = data.decode(errors="ignore") tokens = [] for line in text.splitlines(): t = line.strip() if not t: continue if all(ch in "0123456789abcdefABCDEF" for ch in t) and (len(t) % 2 == 0): tokens.append(t) if not tokens: return b"" return max(tokens, key=len).encode()
class ConnectionDropped(Exception): pass
class Conn: def __init__(self): self.sock = None self.state_iv = None self.encrypt_calls = 0
def connect_and_setup(self): if self.sock: try: self.sock.close() except Exception: pass self.sock = remote(HOST, PORT, timeout=SOCK_TIMEOUT)
# PoW data = self.sock.recvuntil(b"Give me XXXX: ", timeout=SOCK_TIMEOUT) m = POW_RE.search(data) if not m: raise RuntimeError("PoW prompt not found") suffix, digest_hex = m.groups() target = digest_hex.decode() solved = False for cand in itertools.product(POW_ALPHABET, repeat=4): prefix = "".join(cand).encode() if sha256(prefix + suffix).hexdigest() == target: self.sock.sendline(prefix) solved = True break if not solved: raise RuntimeError("PoW unsolved")
# menu + IV menu = self.sock.recvuntil(b"> ", timeout=SOCK_TIMEOUT) text = menu.decode(errors="ignore") iv = None for line in text.splitlines(): if "IV is:" in line: iv_hex = line.split("IV is:")[1].strip() iv = bytes.fromhex(iv_hex) break if iv is None: raise RuntimeError("IV not found in menu") self.state_iv = bytearray(iv) self.encrypt_calls = 0 if DEBUG: print("[connect] IV:", iv.hex())
def encrypt(self, payload: bytes, timeout: float = SOCK_TIMEOUT) -> bytes: try: self.sock.sendline(b"1") self.sock.recvuntil(b"(in hex): ", timeout=timeout) self.sock.sendline(payload.hex().encode()) data = self.sock.recvuntil(b"> ", timeout=timeout) token = find_longest_hex_token(data) if not token: raise ConnectionDropped("no hex token in reply") ct = bytes.fromhex(token.decode()) if len(ct) < 16: raise ConnectionDropped("cipher too short") self.state_iv[:] = ct[-16:] self.encrypt_calls += 1 if DEBUG and (self.encrypt_calls < 20 or self.encrypt_calls % 200 == 0): print(f"[encrypt #{self.encrypt_calls}] payload_len={len(payload)} ct_len={len(ct)} iv={bytes(self.state_iv).hex()}") if self.encrypt_calls > MAX_CALLS_PER_SESSION: raise ConnectionDropped("exceeded max calls per session") return ct except (BrokenPipeError, ConnectionResetError, EOFError, ConnectionDropped) as e: raise ConnectionDropped(e) except PwnlibException as e: raise ConnectionDropped(e) except Exception as e: raise ConnectionDropped(e)
def submit(self, secret: bytes) -> str: try: self.sock.sendline(b"2") self.sock.recvuntil(b"(in hex): ", timeout=5) self.sock.sendline(secret.hex().encode()) line = self.sock.recvline(timeout=3) return line.decode(errors="ignore").strip() if line else "" except Exception: return ""
def recover_single_session(): conn = Conn() conn.connect_and_setup()
recovered = bytearray() idx = 0
while idx < SECRET_LEN: pad_len = (-(idx + 1)) % 16 if pad_len == 0: pad_len = 16 prefix = b"\x00" * pad_len block_idx = (pad_len + idx) // 16
if DEBUG: print(f"[IDX {idx}] pad_len={pad_len} block_idx={block_idx} iv={bytes(conn.state_iv).hex()} enc_calls={conn.encrypt_calls}")
pri, rest = make_two_stage_candidates()
def try_candidates(iterable): nonlocal conn, idx, prefix, block_idx, recovered for c in iterable: try: state_before = bytes(conn.state_iv) ctA = conn.encrypt(prefix) except ConnectionDropped as e: if DEBUG: print("[*] baseline: connection dropped during encrypt:", e) raise ConnectionDropped(e)
blocksA = chunk_blocks(ctA) if block_idx >= len(blocksA): if DEBUG: print(f"[!] ctA too short (need block {block_idx} have {len(blocksA)}) -> abort session") raise ConnectionDropped("ctA too short")
target_block = blocksA[block_idx] prev_block = state_before if block_idx == 0 else blocksA[block_idx - 1] IV_prime = bytes(conn.state_iv)
G_full = prefix + bytes(recovered) + bytes([c]) G = G_full[-16:] first_plain = xor_bytes(xor_bytes(G, prev_block), IV_prime)
try: ctB = conn.encrypt(first_plain) except ConnectionDropped as e: if DEBUG: print("[*] probe: connection dropped during encrypt:", e) raise ConnectionDropped(e)
blocksB = chunk_blocks(ctB) if not blocksB: if DEBUG: print("[!] ctB empty -> abort session") raise ConnectionDropped("ctB empty")
if blocksB[0] == target_block: return c return None
c = try_candidates(pri) if c is not None: recovered.append(c) print(f"[{idx+1:02d}/{SECRET_LEN}] byte = 0x{c:02x} ({chr(c) if 32<=c<=126 else '.'}) calls={conn.encrypt_calls} (from PRIORITY set)") idx += 1 continue
if DEBUG: print(f"[IDX {idx}] no match in PRIORITY set -> trying FULL 0..255 set (this will be slower)")
c = try_candidates(rest) if c is not None: recovered.append(c) print(f"[{idx+1:02d}/{SECRET_LEN}] byte = 0x{c:02x} ({chr(c) if 32<=c<=126 else '.'}) calls={conn.encrypt_calls} (from FULL set)") idx += 1 continue
if DEBUG: print(f"[!] exhausted all candidates at idx={idx} -> abort session") raise RuntimeError("No candidate matched in session (exhausted full set)")
return bytes(recovered), conn
def run_recover(): attempts = 0 while attempts < MAX_SESS_RESTARTS: attempts += 1 try: secret, conn = recover_single_session() print("[+] recovered secret (hex):", secret.hex()) resp = conn.submit(secret) if resp: print("[+] submit response:", resp) return except ConnectionDropped as e: print(f"[!] session dropped (attempt {attempts}/{MAX_SESS_RESTARTS}): {e}") time.sleep(RETRY_DELAY) continue except RuntimeError as e: print(f"[!] session runtime error (attempt {attempts}/{MAX_SESS_RESTARTS}): {e}") time.sleep(RETRY_DELAY) continue except Exception as e: print("[!] unexpected error:", type(e).__name__, e) time.sleep(RETRY_DELAY) continue print("[FATAL] all session attempts exhausted. Consider raising MAX_SESS_RESTARTS or analyzing server behavior more closely.")
if __name__ == "__main__": run_recover()H7CTF{K4K4SH1_5H4R1NG4N_$33_411_258d69d7-2492-4a56-ac26-1d0b91117393}