python搭建一个自带消息队列处理的异步任务http api服务代码

代码语言:python

所属分类:其他

代码描述:python搭建一个自带消息队列处理的异步任务http api服务代码,一个http对外提供消息队列入列,一个线程从消息队列获取消息进行处理,处理完成,异步http通知,消息队列采用文件夹中文件存储,处理完一个消息就删除该消息文件。

代码标签: python 搭建 自带 消息 队列 处理 异步 任务 http api 服务 代码

下面为部分代码预览,完整代码请点击下载或在bfwstudio webide中打开

import asyncio
import aiohttp
import json
import threading
import queue
import logging
import os
from aiohttp import web
from pathlib import Path
import uuid
import time

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Initialize message queue and storage path
message_queue = queue.Queue()
messages_dir = Path("messages")

# Ensure messages directory exists
messages_dir.mkdir(exist_ok=True)

async def notify_callback(message, callback_url):
    """Send callback notification to specified URL"""
    try:
        async with aiohttp.ClientSession() as session:
            # 添加headers确保正确的内容类型
            headers = {
                'Content-Type': 'application/json',
                'Accept': 'application/json'
            }
            # 打印发送的数据用于调试
            print("Sending data:", json.dumps(message))
            
            async with session.post(callback_url, 
                                  json=message,
                                  headers=headers) as response:
                # 打印响应状态和内容用于调试
                print("Response status:", response.status)
                print("Response content:", await response.text())
                return response.status == 200
    except Exception as e:
        logger.error(f"Error sending callback: {e}")
        return False

async def store_message(message):
    """Store message to individual file"""
    try:
        # Generate unique filename
        filename = f"msg_{uuid.uuid4()}.json"
        message_path = messages_dir / filename
        
        # Write message to file
        message_path.write_text(json.dumps(message))
        message_queue.put((message, message_path))
        return True
    except Exception as e:
        logger.error(f"Error storing message: {e}")
        return False

async def handle_message(request):
    """HTTP endpoint to receive messages"""
    try:
      .........完整代码请登录后点击上方下载按钮下载查看

网友评论0