python搭建一个自带消息队列处理的异步任务http api服务代码
代码语言:python
所属分类:其他
代码描述:python搭建一个自带消息队列处理的异步任务http api服务代码,一个http对外提供消息队列入列,一个线程从消息队列获取消息进行处理,处理完成,异步http通知,消息队列采用文件夹中文件存储,处理完一个消息就删除该消息文件。
代码标签: python 搭建 自带 消息 队列 处理 异步 任务 http api 服务 代码
下面为部分代码预览,完整代码请点击下载或在bfwstudio webide中打开
import asyncio import aiohttp import json import threading import queue import logging import os from aiohttp import web from pathlib import Path import uuid import time # Configure logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) # Initialize message queue and storage path message_queue = queue.Queue() messages_dir = Path("messages") # Ensure messages directory exists messages_dir.mkdir(exist_ok=True) async def notify_callback(message, callback_url): """Send callback notification to specified URL""" try: async with aiohttp.ClientSession() as session: # 添加headers确保正确的内容类型 headers = { 'Content-Type': 'application/json', 'Accept': 'application/json' } # 打印发送的数据用于调试 print("Sending data:", json.dumps(message)) async with session.post(callback_url, json=message, headers=headers) as response: # 打印响应状态和内容用于调试 print("Response status:", response.status) print("Response content:", await response.text()) return response.status == 200 except Exception as e: logger.error(f"Error sending callback: {e}") return False async def store_message(message): """Store message to individual file""" try: # Generate unique filename filename = f"msg_{uuid.uuid4()}.json" message_path = messages_dir / filename # Write message to file message_path.write_text(json.dumps(message)) message_queue.put((message, message_path)) return True except Exception as e: logger.error(f"Error storing message: {e}") return False async def handle_message(request): """HTTP endpoint to receive messages""" try: .........完整代码请登录后点击上方下载按钮下载查看
网友评论0