python实现持久websocket推送及定时cron服务代码
代码语言:python
所属分类:其他
代码描述:python实现持久websocket推送及定时cron服务代码,通过api进行通讯,支持cron定时服务。
代码标签: python 持久 websocket 推送 定时 cron 服务 代码
下面为部分代码预览,完整代码请点击下载或在bfwstudio webide中打开
#!/usr/local/python3/bin/python3
# -*- coding: utf-8 -*
#!/usr/local/python3/bin/python3
# -*- coding: utf-8 -*
# main.py
import asyncio
import json
import logging
import sqlite3
import uuid
from contextlib import asynccontextmanager
from typing import Dict, Any, List, Optional
import httpx
import uvicorn
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from fastapi import FastAPI, WebSocket, WebSocketDisconnect, HTTPException, Path, Body, status
from pydantic import BaseModel, Field
# --- 1. 日志配置 ---
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
# --- 2. WebSocket 连接管理器 ---
class ConnectionManager:
def __init__(self):
self.active_connections: Dict[str, WebSocket] = {}
self._lock = asyncio.Lock()
async def connect(self, websocket: WebSocket) -> str:
await websocket.accept()
client_id = str(uuid.uuid4())
async with self._lock:
self.active_connections[client_id] = websocket
logger.info(f"新客户端连接成功: {client_id}")
return client_id
async def reconnect(self, client_id: str, websocket: WebSocket) -> str:
await websocket.accept()
async with self._lock:
self.active_connections[client_id] = websocket
logger.info(f"客户端重连成功: {client_id}")
return client_id
async def disconnect(self, client_id: str):
async with self._lock:
if self.active_connections.pop(client_id, None):
logger.info(f"客户端断开连接: {client_id}")
async def send_json_to_clients(self, data: Any, client_ids: List[str]):
if not client_ids:
return
message = {"type": "push", &.........完整代码请登录后点击上方下载按钮下载查看















网友评论0