python+redis实现自动化json流程数据自动执行完成代码
代码语言:python
所属分类:其他
代码描述:python+redis实现自动化json流程数据自动执行完成代码,定义个json流程,输入数据,流程自动运行完成,每一个节点运行状态都有记录,当前执行哪个节点也有记录。
代码标签: python redis 自动化 json 流程 数据 自动 执行 完成 代码
下面为部分代码预览,完整代码请点击下载或在bfwstudio webide中打开
import json import redis from datetime import datetime import uuid class WorkflowQueue: def __init__(self, redis_host='localhost', redis_port=6379, redis_db=0): self.redis_client = redis.Redis( host=redis_host, port=redis_port, db=redis_db, decode_responses=True ) self.queue_key = 'workflow:queue' def create_workflow(self, input_data, nodes): """创建新的工作流""" workflow = { 'workflow_id': str(uuid.uuid4()), 'input_data': input_data, 'nodes': nodes, 'current_node': nodes[0]['node_id'], 'pre_node_data': input_data, 'status': 'pending', 'created_at': datetime.now().strftime('%Y-%m-%d %H:%M:%S'), 'updated_at': datetime.now().strftime('%Y-%m-%d %H:%M:%S') } self.redis_client.rpush(self.q.........完整代码请登录后点击上方下载按钮下载查看
网友评论0