python实现持久websocket推送及定时cron服务代码
代码语言:python
所属分类:其他
代码描述:python实现持久websocket推送及定时cron服务代码,通过api进行通讯,支持cron定时服务。
代码标签: python 持久 websocket 推送 定时 cron 服务 代码
下面为部分代码预览,完整代码请点击下载或在bfwstudio webide中打开
#!/usr/local/python3/bin/python3 # -*- coding: utf-8 -* #!/usr/local/python3/bin/python3 # -*- coding: utf-8 -* # main.py import asyncio import json import logging import sqlite3 import uuid from contextlib import asynccontextmanager from typing import Dict, Any, List, Optional import httpx import uvicorn from apscheduler.schedulers.asyncio import AsyncIOScheduler from fastapi import FastAPI, WebSocket, WebSocketDisconnect, HTTPException, Path, Body, status from pydantic import BaseModel, Field # --- 1. 日志配置 --- logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' ) logger = logging.getLogger(__name__) # --- 2. WebSocket 连接管理器 --- class ConnectionManager: def __init__(self): self.active_connections: Dict[str, WebSocket] = {} self._lock = asyncio.Lock() async def connect(self, websocket: WebSocket) -> str: await websocket.accept() client_id = str(uuid.uuid4()) async with self._lock: self.active_connections[client_id] = websocket logger.info(f"新客户端连接成功: {client_id}") return client_id async def reconnect(self, client_id: str, websocket: WebSocket) -> str: await websocket.accept() async with self._lock: self.active_connections[client_id] = websocket logger.info(f"客户端重连成功: {client_id}") return client_id async def disconnect(self, client_id: str): async with self._lock: if self.active_connections.pop(client_id, None): logger.info(f"客户端断开连接: {client_id}") async def send_json_to_clients(self, data: Any, client_ids: List[str]): if not client_ids: return message = {"type": "push", &.........完整代码请登录后点击上方下载按钮下载查看
网友评论0