python+redis实现自动化json流程数据自动执行完成代码

代码语言:python

所属分类:其他

代码描述:python+redis实现自动化json流程数据自动执行完成代码,定义个json流程,输入数据,流程自动运行完成,每一个节点运行状态都有记录,当前执行哪个节点也有记录。

代码标签: python redis 自动化 json 流程 数据 自动 执行 完成 代码

下面为部分代码预览,完整代码请点击下载或在bfwstudio webide中打开

import json
import redis
from datetime import datetime
import uuid

class WorkflowQueue:
    def __init__(self, redis_host='localhost', redis_port=6379, redis_db=0):
        self.redis_client = redis.Redis(
            host=redis_host,
            port=redis_port,
            db=redis_db,
            decode_responses=True
        )
        self.queue_key = 'workflow:queue'
        
    def create_workflow(self, input_data, nodes):
        """创建新的工作流"""
        workflow = {
            'workflow_id': str(uuid.uuid4()),
            'input_data': input_data,
            'nodes': nodes,
            'current_node': nodes[0]['node_id'],
            'pre_node_data': input_data,
            'status': 'pending',
            'created_at': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
            'updated_at': datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        }
        
        self.redis_client.rpush(self.q.........完整代码请登录后点击上方下载按钮下载查看

网友评论0