python+vue实现多用户权限分配的文件网盘管理系统代码
代码语言:python
所属分类:其他
代码描述:python+vue实现多用户权限分配的文件网盘管理系统代码,支持目录权限设置,文件上传,目录下载,文件预览、重命名、新建目录,删除等操作。
代码标签: python vue 多用户 权限 分配 文件 网盘 管理 系统 代码
下面为部分代码预览,完整代码请点击下载或在bfwstudio webide中打开
# cms.py # -*- coding: utf-8 -*- import os import json import shutil import mimetypes import uvicorn import aiofiles import hashlib from typing import List, Dict from fastapi import FastAPI, HTTPException, Depends, Request, Response, UploadFile, File, Form, BackgroundTasks, Header from fastapi.responses import HTMLResponse, FileResponse, JSONResponse, StreamingResponse from fastapi.security import HTTPBasic, HTTPBasicCredentials from pydantic import BaseModel, Field # ========== 配置 ========== ROOT_DIR = os.getenv("CMS_ROOT", "./root") USER_DB = os.path.join(ROOT_DIR, ".users.json") os.makedirs(ROOT_DIR, exist_ok=True) # ========== 安全 & 密码哈希 (MD5) ========== security = HTTPBasic() def hash_password(password: str) -> str: """使用 MD5 对密码进行哈希""" return hashlib.md5(password.encode('utf-8')).hexdigest() # ========== Pydantic 模型 ========== class UserPermission(BaseModel): path: str read: bool = True write: bool = False class User(BaseModel): username: str password: str = Field(None, min_length=6) permissions: List[UserPermission] = [] class UserInDB(User): hashed_password: str # ========== 用户管理 ========== def load_users() -> Dict[str, UserInDB]: if not os.path.exists(USER_DB): admin_pass_hash = hash_password("123456") default_users = { "admin": { "username": "admin", "password": "123456", "hashed_password": admin_pass_hash, "permissions": [{"path": "/", "read": True, "write": True}] } } json.dump(default_users, open(USER_DB, "w", encoding="utf-8"), ensure_ascii=False, indent=2) return {"admin": UserInDB(**default_users["admin"])} with open(USER_DB, "r", encoding="utf-8") as f: users_data = json.load(f) processed_users = {} for k, v in users_data.items(): if "permissions" not in v: v["permissions"] = [{"path": "/", "read": v.get("read", True), "write": v.get("write", True)}] v.pop("read", None); v.pop("write", None); v.pop("password", None) processed_users[k] = UserInDB(**v) return processed_users def save_users(users: Dict[str, UserInDB]): user_dict = { k: v.dict(exclude={'password'}) for k, v in users.items() } with open(USER_DB, "w", encoding="utf-8") as f: json.dump(user_dict, f, ensure_ascii=False, indent=2) users = load_users() def get_current_user(creds: HTTPBasicCredentials = Depends(security)) -> UserInDB: user = users.get(creds.username) if not user or hash_password(creds.password) != user.hashed_password: raise HTTPException(status_code=401, detail="用户名或密码错误", headers={"WWW-Authenticate": "Basic"}) return user def get_admin_user(current_user: UserInDB = Depends(get_current_user)) -> UserInDB: if current_user.username != 'admin': raise HTTPException(status_code=403, detail="需要管理员权限") return current_user # ========== 权限检查核心逻辑 ========== def get_permission_for_path(user: UserInDB, path: str) -> UserPermission: norm_req_path = os.path.normpath(path).replace("\\", "/") if not norm_req_path.startswith('/'): norm_req_path = '/' + norm_req_path best_match, longest_match_len = None, -1 for p in user.permissions: norm_perm_path = os.path.normpath(p.path).replace("\\", "/") if not norm_perm_path.startswith('/'): norm_perm_path = '/' + norm_perm_path if norm_req_path.startswith(norm_perm_path) and len(norm_perm_path) > longest_match_len: longest_match_len = len(norm_perm_path) best_match = p return best_match def check_permission(user: UserInDB, path: str, needs_write: bool = False): permission = get_permission_for_path(user, path) if not permission or not permission.read: raise HTTPException(status_code=403, detail="禁止访问:无读取权限") if needs_write and not permission.write: raise HTTPException(status_code=403, detail="禁止访问:无写入权限") return True # ========== 工具函数 ========== def secure_path(path: str) -> str: base_dir = os.path.abspath(ROOT_DIR) target_path = os.path.join(base_dir, path.lstrip('/\\')) real_path = os.path.abspath(target_path) if not real_path.startswith(base_dir): raise HTTPException(status_code=403, detail="禁止访问:路径越界") return real_path # ========== API ========== app = FastAPI(title="文件管理系统", version="4.3") # Version bump for the fix @app.get("/", response_class=HTMLResponse) async def index(): return HTMLResponse(content=INDEX_HTML, status_code=200) @app.post("/api/login") def login(current_user: UserInDB = Depends(get_current_user)): return {"username": current_user.username, "permissions": current_user.permissions} @app.get("/api/users", response_model=List[User]) def list_users_admin(admin: UserInDB = Depends(get_admin_user)): return [u for u in users.values()] @app.post("/api/users", response_model=User) def create_user_admin(user_form: User, admin: UserInDB = Depends(get_admin_user)): if user_form.username in users: raise HTTPException(status_code=400, detail="用户已存在") if not user_form.password: raise HTTPException(status_code=400, detail="密码不能为空") new_user = UserInDB(username=user_form.username, hashed_password=hash_password(user_form.password), permissions=user_form.permissions) users[new_user.username] = new_user save_users(users) return new_user @app.put("/api/users/{username}", response_model=User) def update_user_admin(username: str, user_form: User, admin: UserInDB = Depends(get_admin_user)): target_user = users.get(username) if not target_user: raise HTTPException(status_code=404, detail="用户不存在") if user_form.password: target_user.hashed_password = hash_password(user_form.password) target_user.permissions = user_form.permissions users[username] = target_user save_users(users) return target_user @app.delete("/api/users/{username}") def delete_user_admin(username: str, admin: UserInDB = Depends(get_admin_user)): if username == "admin": raise HTTPException(status_code=400, detail="不能删除管理员账户") if username not in users: raise HTTPException(status_code=404, detail="用户不存在") del users[username] save_users(users) return {"msg": f"用户 {username} 删除成功"} @app.get("/api/list") def list_dir(path: str = "/", current_user: UserInDB = Depends(get_current_user)): check_permission(current_user, path, needs_write=False) abs_path = secure_path(path) if not os.path.isdir(abs_path): raise HTTPException(status_code=400, detail="目标不是一个目录") items = [] for name in sorted(os.listdir(abs_path), key=lambda x: (not os.path.isdir(os.path.join(abs_path, x)), x.lower())): try: full_path = os.path.join(abs_path, name) rel_path = os.path.relpath(full_path, ROOT_DIR).replace("\\", "/") if not rel_path.startswith('/'): rel_path = '/' + rel_path if get_permission_for_path(current_user, rel_path): is_dir = os.path.isdir(full_path) items.append({"name": name, "type": "dir" if is_dir else "file", "size": os.path.getsize(full_path) if not is_dir else 0, "path": rel_path, "mime": mimetypes.guess_type(full_path)[0] or ""}) except OSError: continue return items @app.post("/api/mkdir") def make_dir(path: str, name: s.........完整代码请登录后点击上方下载按钮下载查看
网友评论0