python+redis实现有状态的任务编排步骤执行恢复代码

代码语言:python

所属分类:其他

代码描述:python+redis实现有状态的任务编排步骤执行恢复代码,将执行的步骤写到redis中,每执行一步就会记录步骤,一旦断点,会继续执行上次的步骤,适合任务编排工作流。

代码标签: python redis实 有状态 任务 编排 步骤 执行 恢复 代码

下面为部分代码预览,完整代码请点击下载或在bfwstudio webide中打开

import redis
import json
import threading
import os
import time
import signal
import sys
from enum import Enum
from typing import Dict, List, Callable
from dataclasses import dataclass

# 任务状态枚举
class TaskStatus(Enum):
    PENDING = "pending"
    RUNNING = "running"
    PAUSED = "paused"
    COMPLETED = "completed"
    FAILED = "failed"

# 任务步骤数据类
@dataclass
class TaskStep:
    name: str
    handler: Callable
    next_steps: List[str] = None
    retry_count: int = 3
    
# 任务执行器类
class TaskExecutor:
    def __init__(self, redis_host='localhost', redis_port=6379, redis_db=0):
        self.redis = redis.Redis(host=redis_host, port=redis_port, db=redis_db)
        self.steps: Dict[str, TaskStep] = {}
        self.running = True
        self._lock = threading.Lock()
        
    def register_step(self, name: str, handler: Callable, next_steps: List[str] = None):
        """注册任务步骤"""
        self.steps[name] = TaskStep(name=name, handler=handler, next_steps=next_steps)
        
    def create_task(self, task_id: str, start_step: str, data: dict = None):
        """创建新任务"""
        task_info = {
            'id': task_id,
            'current_step': start_step,
            'status': TaskStatus.PENDING.value,
            'data': data or {},.........完整代码请登录后点击上方下载按钮下载查看

网友评论0