python+vue实现多用户权限分配的文件网盘管理系统代码

代码语言:python

所属分类:其他

代码描述:python+vue实现多用户权限分配的文件网盘管理系统代码,支持目录权限设置,文件上传,目录下载,文件预览、重命名、新建目录,删除等操作。

代码标签: python vue 多用户 权限 分配 文件 网盘 管理 系统 代码

下面为部分代码预览,完整代码请点击下载或在bfwstudio webide中打开

# cms.py
# -*- coding: utf-8 -*-
import os
import json
import shutil
import mimetypes
import uvicorn
import aiofiles
import hashlib
from typing import List, Dict

from fastapi import FastAPI, HTTPException, Depends, Request, Response, UploadFile, File, Form, BackgroundTasks, Header
from fastapi.responses import HTMLResponse, FileResponse, JSONResponse, StreamingResponse
from fastapi.security import HTTPBasic, HTTPBasicCredentials
from pydantic import BaseModel, Field

# ========== 配置 ==========
ROOT_DIR = os.getenv("CMS_ROOT", "./root")
USER_DB  = os.path.join(ROOT_DIR, ".users.json")
os.makedirs(ROOT_DIR, exist_ok=True)

# ========== 安全 & 密码哈希 (MD5) ==========
security = HTTPBasic()

def hash_password(password: str) -> str:
    """使用 MD5 对密码进行哈希"""
    return hashlib.md5(password.encode('utf-8')).hexdigest()

# ========== Pydantic 模型 ==========
class UserPermission(BaseModel):
    path: str
    read: bool = True
    write: bool = False

class User(BaseModel):
    username: str
    password: str = Field(None, min_length=6)
    permissions: List[UserPermission] = []

class UserInDB(User):
    hashed_password: str

# ========== 用户管理 ==========
def load_users() -> Dict[str, UserInDB]:
    if not os.path.exists(USER_DB):
        admin_pass_hash = hash_password("123456")
        default_users = { "admin": { "username": "admin", "password": "123456", "hashed_password": admin_pass_hash, "permissions": [{"path": "/", "read": True, "write": True}] } }
        json.dump(default_users, open(USER_DB, "w", encoding="utf-8"), ensure_ascii=False, indent=2)
        return {"admin": UserInDB(**default_users["admin"])}

    with open(USER_DB, "r", encoding="utf-8") as f: users_data = json.load(f)
    
    processed_users = {}
    for k, v in users_data.items():
        if "permissions" not in v: v["permissions"] = [{"path": "/", "read": v.get("read", True), "write": v.get("write", True)}]
        v.pop("read", None); v.pop("write", None); v.pop("password", None)
        processed_users[k] = UserInDB(**v)
    return processed_users

def save_users(users: Dict[str, UserInDB]):
    user_dict = { k: v.dict(exclude={'password'}) for k, v in users.items() }
    with open(USER_DB, "w", encoding="utf-8") as f: json.dump(user_dict, f, ensure_ascii=False, indent=2)

users = load_users()

def get_current_user(creds: HTTPBasicCredentials = Depends(security)) -> UserInDB:
    user = users.get(creds.username)
    if not user or hash_password(creds.password) != user.hashed_password:
        raise HTTPException(status_code=401, detail="用户名或密码错误", headers={"WWW-Authenticate": "Basic"})
    return user

def get_admin_user(current_user: UserInDB = Depends(get_current_user)) -> UserInDB:
    if current_user.username != 'admin': raise HTTPException(status_code=403, detail="需要管理员权限")
    return current_user

# ========== 权限检查核心逻辑 ==========
def get_permission_for_path(user: UserInDB, path: str) -> UserPermission:
    norm_req_path = os.path.normpath(path).replace("\\", "/")
    if not norm_req_path.startswith('/'): norm_req_path = '/' + norm_req_path
    best_match, longest_match_len = None, -1
    for p in user.permissions:
        norm_perm_path = os.path.normpath(p.path).replace("\\", "/")
        if not norm_perm_path.startswith('/'): norm_perm_path = '/' + norm_perm_path
        if norm_req_path.startswith(norm_perm_path) and len(norm_perm_path) > longest_match_len:
            longest_match_len = len(norm_perm_path)
            best_match = p
    return best_match

def check_permission(user: UserInDB, path: str, needs_write: bool = False):
    permission = get_permission_for_path(user, path)
    if not permission or not permission.read: raise HTTPException(status_code=403, detail="禁止访问:无读取权限")
    if needs_write and not permission.write: raise HTTPException(status_code=403, detail="禁止访问:无写入权限")
    return True

# ========== 工具函数 ==========
def secure_path(path: str) -> str:
    base_dir = os.path.abspath(ROOT_DIR)
    target_path = os.path.join(base_dir, path.lstrip('/\\'))
    real_path = os.path.abspath(target_path)
    if not real_path.startswith(base_dir): raise HTTPException(status_code=403, detail="禁止访问:路径越界")
    return real_path

# ========== API ==========
app = FastAPI(title="文件管理系统", version="4.........完整代码请登录后点击上方下载按钮下载查看

网友评论0