python+html实现简洁美观的跨平台文件管理网盘系统代码
代码语言:python
所属分类:其他
代码描述:python+html实现简洁美观的跨平台文件管理网盘系统代码,html采用vue实现。
代码标签: python html 简洁 美观 跨平台 文件 管理 网盘 系统 代码
下面为部分代码预览,完整代码请点击下载或在bfwstudio webide中打开
#!/usr/local/python3/bin/python3 # -*- coding: utf-8 -* import os import shutil import zipfile import mimetypes import tempfile from pathlib import Path from datetime import datetime import uvicorn from fastapi import FastAPI, UploadFile, File, Form, HTTPException, Request from fastapi.responses import HTMLResponse, FileResponse, JSONResponse from fastapi.staticfiles import StaticFiles from typing import List, Optional # --- 配置 --- # 警告: 为了安全,请在生产环境中使用绝对路径,例如 "/path/to/your/files"。 BASE_DIR = Path(__file__).resolve().parent / "file_root" BASE_DIR.mkdir(exist_ok=True) # 确保根目录存在 app = FastAPI() # --- 安全性辅助函数 --- def secure_path(path_str: str) -> Path: """ 防止路径遍历攻击。将用户提供的路径转换为相对于BASE_DIR的安全路径。 """ # 规范化路径,移除 '..' 和 '.' # lstrip('/') 防止用户输入绝对路径 normalized_path = os.path.normpath(path_str.lstrip('/\\')) # 路径中不能包含 '..' if '..' in normalized_path.split(os.sep): raise HTTPException(status_code=403, detail="Permission Denied: Invalid path component '..'") # 最终路径 full_path = BASE_DIR.joinpath(normalized_path).resolve() # 检查最终路径是否在BASE_DIR下 # 使用 startswith 检查字符串形式的路径,这是最可靠的方法 if not str(full_path).startswith(str(BASE_DIR.resolve())): raise HTTPException(status_code=403, detail="Permission Denied: Path traversal attempt") return full_path # --- API 端点 --- # 挂载静态文件目录用于预览 app.mount("/view", StaticFiles(directory=BASE_DIR), name="view") @app.get("/api/files") async def list_files(path: str = "."): """列出指定路径下的文件和文件夹""" target_path = secure_path(path) if not target_path.exists() or not target_path.is_dir(): raise HTTPException(status_code=404, detail="Directory not found") items = [] for item in sorted(target_path.iterdir(), key=lambda p: (not p.is_dir(), p.name.lower())): stat = item.stat() items.append({ "name": item.name, "path": os.path.join(path, item.name).replace('\\', '/'), "is_dir": item.is_dir(), "size": stat.st_size, "modified": datetime.fromtimestamp(stat.st_mtime).isoformat(), }) return items @app.post("/api/upload") async def upload_files( files: List[UploadFile] = File(...), relative_paths: List[str] = Form(...), path: str = Form("."), extract_zip: bool = Form(False) ): """上传文件,支持多文件、文件夹上传和解压zip""" target_dir = secure_path(path) if not target_dir.is_dir(): raise HTTPException(status_code=400, detail="Target path is not a directory") if len(files) != len(relative_paths): raise HTTPException(status_code=400, detail="Mismatch between files and relative paths count.") for file, relative_path in zip(files, relative_paths): try: # 安全检查 relative_path if ".." in relative_path: # 跳过不安全的文件 print(f"Skipping potentially unsafe path: {relative_path}") continue # 构建完整的目标文件路径 file_destination = secure_path(os.path.join(path, relative_path)) # 创建父目录 file_destination.parent.mkdir(parents=True, exist_ok=True) # 写入文件 with file_destination.open("wb") as buffer: shutil.copyfileobj(file.file, buffer) # 如果需要,解压zip文件 if extract_zip and file.filename.endswith('.zip'): zip_target_dir = file_destination.parent with zipfile.ZipFile(file_destination, 'r') as zip_ref: zip_ref.extractall(zip_target_dir) os.remove(file_destination) # 删除原始zip文件 except Exception as e: # 在循环中,一个文件失败不应阻止其他文件 print(f"Could not upload file: {file.filename}. Error: {e}") # 可以选择在这里收集错误信息并返回给前端 finally: file.file.close() return {"message": f"{len(files)} files processed in upload to '{path}'"} @app.post("/api/create_folder") async def create_folder(path: str = Form(...), folder_name: str = Form(...)): """创建新文件夹""" if not folder_name or ".." in folder_name or "/" in folder_name or "\\" in folder_name: raise HTTPException(status_code=400, detail="Invalid folder name") target_path = secure_path(path) / folder_name if target_path.exists(): raise HTTPException(status_code=409, detail="Folder or file with this name already exists") target_path.mkdir() return {"message": "Folder created successfully"} @app.post("/api/delete") async def delete_items(items: List[str]): """删除文件或文件夹""" if not items: raise HTTPException(status_code=400, detail="No items selected for deletion") deleted_count = 0 errors = [] for item_path_str in items: item_path = secure_path(item_path_str) try: if item_path.is_dir(): shutil.rmtree(item_path) else: item_path.unlink() deleted_count += 1 except Exception as e: errors.append(f"Error deleting {item_path.name}: {e}") if errors: raise HTTPException(status_code=500, detail=". ".join(errors)) return {"message": f"{deleted_count} items deleted successfully"} @app.post("/api/rename") async def rename_item(old_path: str = Form(...), new_name: str = Form(...)): """重命名文件或文件夹""" if not new_name or ".." in new_name or "/" in new_name or "\\" in new_name: raise HTTPException(status_code=400, detail="Invalid new name") old_item_path = secure_path(old_path) new_item_path = old_item_path.parent / new_name if not old_item_path.exists(): raise HTTPException(status_code=404, detail="Item not found") if new_item_path.exists(): raise HTTPException(status_code=409, detail="An item with the new name already exists") old_item_path.rename(new_item_path) return {"message": "Item renamed successfully"} @app.get("/api/download") async def download_item(path: str): """下载文件或将文件夹压缩后下载""&.........完整代码请登录后点击上方下载按钮下载查看
网友评论0