import re import json import time import shutil from datetime import datetime, timezone from pathlib import Path from flask import ( Flask, render_template, request, redirect, url_for, send_from_directory, abort, jsonify, session, flash, ) from werkzeug.utils import secure_filename from bs4 import BeautifulSoup import config from helpers import ( allowed_file, load_index, save_index, fix_image_paths, extract_summary, extract_thumbnail, render_markdown, generate_static_page, ) app = Flask(__name__) app.config.from_object(config) # 确保必要的目录存在 for dir_path in [ config.UPLOAD_FOLDER, config.GENERATED_FOLDER, config.POSTS_DATA_FOLDER, config.INDEX_FILE.parent, config.UPLOAD_FOLDER / "editor_uploads", ]: dir_path.mkdir(parents=True, exist_ok=True) # 如果索引文件不存在,创建空列表 if not config.INDEX_FILE.exists(): with open(config.INDEX_FILE, "w", encoding="utf-8") as f: json.dump([], f, ensure_ascii=False, indent=2) @app.context_processor def inject_user_status(): """向所有模板注入登录状态""" return {"logged_in": "user" in session} # ==================== 登录 / 登出相关路由 ==================== @app.route("/login", methods=["GET", "POST"]) def login(): """密码登录(无需用户名)""" if request.method == "GET": return render_template("login.html") password = request.form.get("password", "") if password == config.PASSWORD: session["user"] = True flash("登录成功", "success") return redirect(url_for("index")) else: flash("密码错误", "danger") return render_template("login.html") @app.route("/logout") def logout(): session.pop("user", None) flash("已登出", "info") return redirect(url_for("index")) # ==================== 编辑器相关路由 ==================== @app.route("/editor", methods=["GET", "POST"]) def editor(): """Markdown 编辑器页面:创建新文章(需要登录)""" if "user" not in session: flash("请先登录", "warning") return redirect(url_for("login")) if request.method == "GET": return render_template("editor.html") # POST:保存文章 title = request.form.get("title", "").strip() if not title: flash("标题不能为空", "danger") return redirect(url_for("editor")) md_content = request.form.get("content", "") if not md_content.strip(): flash("内容不能为空", "danger") return redirect(url_for("editor")) # 生成唯一文章 ID post_id = str(int(time.time() * 1000)) # 确保文章数据目录存在 posts_data_dir = config.POSTS_DATA_FOLDER / post_id posts_data_dir.mkdir(parents=True, exist_ok=True) # 保存 Markdown 文件 md_path = posts_data_dir / "content.md" with open(md_path, "w", encoding="utf-8") as f: f.write(md_content) # 修正图片路径(允许 http、/ 开头路径保持原样) fixed_md = fix_image_paths(md_content, post_id) # 渲染 HTML html_body = render_markdown(fixed_md) # 提取摘要与缩略图 summary = extract_summary(html_body) thumbnail = extract_thumbnail(html_body) # 生成日期(精确到分钟) date_iso = datetime.now(timezone.utc).replace(second=0, microsecond=0).isoformat() # 生成静态页面 generate_static_page(post_id, title, html_body, date_iso, thumbnail) # 更新索引 index = load_index() index.append( { "id": post_id, "title": title, "date": date_iso, "summary": summary, "thumbnail": thumbnail, } ) index.sort(key=lambda p: p.get("date", ""), reverse=True) save_index(index) flash("文章发布成功", "success") return redirect(url_for("index")) @app.route("/editor/upload-image", methods=["POST"]) def editor_upload_image(): """处理编辑器中的图片上传,返回 Markdown 图片代码""" if "user" not in session: return jsonify({"success": False, "error": "需要登录"}), 403 if "image" not in request.files: return jsonify({"success": False, "error": "没有选择图片"}), 400 file = request.files["image"] if file.filename == "": return jsonify({"success": False, "error": "文件名为空"}), 400 # 只允许图片类型 if not allowed_file(file.filename): return jsonify({"success": False, "error": "不支持的文件类型"}), 400 # 保存到 editor_uploads 目录 editor_uploads_dir = config.UPLOAD_FOLDER / "editor_uploads" editor_uploads_dir.mkdir(parents=True, exist_ok=True) safe_name = secure_filename(file.filename) timestamp = str(int(time.time() * 1000)) new_name = f"{timestamp}_{safe_name}" file_path = editor_uploads_dir / new_name file.save(str(file_path)) # 生成可访问 URL 和 Markdown 片段 image_url = f"/static/uploads/posts/editor_uploads/{new_name}" markdown_code = f"![{safe_name}]({image_url})" return jsonify( { "success": True, "url": image_url, "markdown": markdown_code, } ) # ==================== 路由 ==================== @app.route("/") def index(): """首页:瀑布流文章列表(展示完整博文)""" posts = load_index() # 按日期倒序排列 posts.sort(key=lambda p: p.get("date", ""), reverse=True) # 为每篇文章获取内容 for post in posts: post_id = post.get("id", "") content = "" # 优先从原始 Markdown 文件渲染内容(即使 generated 文件不存在也能显示) md_path = config.POSTS_DATA_FOLDER / post_id / "content.md" if md_path.exists(): try: with open(md_path, "r", encoding="utf-8") as f: md_content = f.read() fixed_md = fix_image_paths(md_content, post_id) html_body = render_markdown(fixed_md) content = html_body except Exception: content = "" else: # 回退到生成的静态 HTML 文件 generated_path = config.GENERATED_FOLDER / f"{post_id}.html" if generated_path.exists(): try: with open(generated_path, "r", encoding="utf-8") as f: html_content = f.read() soup = BeautifulSoup(html_content, "html.parser") card_summary = soup.find("div", class_="card-summary") if card_summary: content = card_summary.decode_contents() except Exception: content = "" post["content"] = content return render_template("index.html", posts=posts) @app.route("/upload", methods=["GET", "POST"]) def upload(): """上传文章(原始文件上传,需要登录)""" # 登录检查 if "user" not in session: flash("请先登录", "warning") return redirect(url_for("login")) if request.method == "GET": return render_template("upload.html") # POST 处理 title = request.form.get("title", "").strip() if not title: return "标题不能为空", 400 markdown_file = request.files.get("markdown_file") if not markdown_file or markdown_file.filename == "": return "请选择 Markdown 文件", 400 if not allowed_file(markdown_file.filename): return "不支持的文件类型,请上传 .md 文件", 400 # 生成唯一 ID post_id = str(int(time.time() * 1000)) # 创建目录 upload_dir = config.UPLOAD_FOLDER / post_id upload_dir.mkdir(parents=True, exist_ok=True) posts_data_dir = config.POSTS_DATA_FOLDER / post_id posts_data_dir.mkdir(parents=True, exist_ok=True) # 保存 Markdown 文件 md_path = posts_data_dir / "content.md" markdown_file.save(str(md_path)) # 处理图片上传 images = request.files.getlist("images") for img in images: if img and img.filename: filename = secure_filename(img.filename) if filename: img.save(str(upload_dir / filename)) # 读取 Markdown 内容 with open(md_path, "r", encoding="utf-8") as f: md_content = f.read() # 修正图片路径 fixed_md = fix_image_paths(md_content, post_id) # 渲染 HTML html_body = render_markdown(fixed_md) # 提取摘要 summary = extract_summary(html_body) # 提取缩略图 thumbnail = extract_thumbnail(html_body) # 生成日期(精确到分钟) date_iso = datetime.now(timezone.utc).replace(second=0, microsecond=0).isoformat() # 生成静态页面 generate_static_page(post_id, title, html_body, date_iso, thumbnail) # 更新索引 index = load_index() index.append( { "id": post_id, "title": title, "date": date_iso, "summary": summary, "thumbnail": thumbnail, } ) # 按日期倒序排序 index.sort(key=lambda p: p.get("date", ""), reverse=True) save_index(index) return redirect(url_for("index")) @app.route("/post/") def view_post(post_id: str): """查看文章详情""" # 安全验证:只允许字母数字和下划线 if not re.match(r"^[a-zA-Z0-9_]+$", post_id): abort(404) try: return send_from_directory( config.GENERATED_FOLDER, f"{post_id}.html", ) except FileNotFoundError: abort(404) @app.route("/admin") def admin(): """管理页面""" posts = load_index() return render_template("admin.html", posts=posts) @app.route("/admin/delete/", methods=["POST"]) def delete_post(post_id: str): """删除文章""" # 安全验证 if not re.match(r"^[a-zA-Z0-9_]+$", post_id): abort(404) # 删除生成的静态文件 generated_file = config.GENERATED_FOLDER / f"{post_id}.html" if generated_file.exists(): generated_file.unlink() # 删除 posts_data 目录 posts_data_dir = config.POSTS_DATA_FOLDER / post_id if posts_data_dir.exists(): shutil.rmtree(posts_data_dir) # 删除上传的图片目录 upload_dir = config.UPLOAD_FOLDER / post_id if upload_dir.exists(): shutil.rmtree(upload_dir) # 从索引中移除 index = load_index() index = [p for p in index if p["id"] != post_id] save_index(index) return redirect(url_for("admin")) @app.errorhandler(404) def not_found(e): return render_template("404.html"), 404 if __name__ == "__main__": app.run(host="0.0.0.0", port=5000, debug=True)