python+redis实现有状态的任务编排步骤执行恢复代码
代码语言:python
所属分类:其他
代码描述:python+redis实现有状态的任务编排步骤执行恢复代码,将执行的步骤写到redis中,每执行一步就会记录步骤,一旦断点,会继续执行上次的步骤,适合任务编排工作流。
代码标签: python redis实 有状态 任务 编排 步骤 执行 恢复 代码
下面为部分代码预览,完整代码请点击下载或在bfwstudio webide中打开
import redis import json import threading import os import time import signal import sys from enum import Enum from typing import Dict, List, Callable from dataclasses import dataclass # 任务状态枚举 class TaskStatus(Enum): PENDING = "pending" RUNNING = "running" PAUSED = "paused" COMPLETED = "completed" FAILED = "failed" # 任务步骤数据类 @dataclass class TaskStep: name: str handler: Callable next_steps: List[str] = None retry_count: int = 3 # 任务执行器类 class TaskExecutor: def __init__(self, redis_host='localhost', redis_port=6379, redis_db=0): self.redis = redis.Redis(host=redis_host, port=redis_port, db=redis_db) self.steps: Dict[str, TaskStep] = {} self.running = True self._lock = threading.Lock() def register_step(self, name: str, handler: Callable, next_steps: List[str] = None): """注册任务步骤""" self.steps[name] = TaskStep(name=name, handler=handler, next_steps=next_steps) def create_task(self, task_id: str, start_step: str, data: dict = None): """创建新任务""" task_info = { 'id': task_id, 'current_step': start_step, 'status': TaskStatus.PENDING.value, 'data': data or {},.........完整代码请登录后点击上方下载按钮下载查看
网友评论0