python实现持久websocket推送及定时cron服务代码

代码语言:python

所属分类:其他

代码描述:python实现持久websocket推送及定时cron服务代码,通过api进行通讯,支持cron定时服务。

代码标签: python 持久 websocket 推送 定时 cron 服务 代码

下面为部分代码预览,完整代码请点击下载或在bfwstudio webide中打开

#!/usr/local/python3/bin/python3
# -*- coding: utf-8 -*
#!/usr/local/python3/bin/python3
# -*- coding: utf-8 -*
# main.py

import asyncio
import json
import logging
import sqlite3
import uuid
from contextlib import asynccontextmanager
from typing import Dict, Any, List, Optional

import httpx
import uvicorn
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from fastapi import FastAPI, WebSocket, WebSocketDisconnect, HTTPException, Path, Body, status
from pydantic import BaseModel, Field

# --- 1. 日志配置 ---
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

# --- 2. WebSocket 连接管理器 ---
class ConnectionManager:
    def __init__(self):
        self.active_connections: Dict[str, WebSocket] = {}
        self._lock = asyncio.Lock()

    async def connect(self, websocket: WebSocket) -> str:
        await websocket.accept()
        client_id = str(uuid.uuid4())
        async with self._lock:
            self.active_connections[client_id] = websocket
        logger.info(f"新客户端连接成功: {client_id}")
        return client_id

    async def reconnect(self, client_id: str, websocket: WebSocket) -> str:
        await websocket.accept()
        async with self._lock:
            self.active_connections[client_id] = websocket
        logger.info(f"客户端重连成功: {client_id}")
        return client_id

    async def disconnect(self, client_id: str):
        async with self._lock:
            if self.active_connections.pop(client_id, None):
                logger.info(f"客户端断开连接: {client_id}")

    async def send_json_to_clients(self, data: Any, client_ids: List[str]):
        if not client_ids:
            return
        message = {"type": "push", &.........完整代码请登录后点击上方下载按钮下载查看

网友评论0