python+redis实现自动化json流程数据自动执行完成代码
代码语言:python
所属分类:其他
代码描述:python+redis实现自动化json流程数据自动执行完成代码,定义个json流程,输入数据,流程自动运行完成,每一个节点运行状态都有记录,当前执行哪个节点也有记录。
代码标签: python redis 自动化 json 流程 数据 自动 执行 完成 代码
下面为部分代码预览,完整代码请点击下载或在bfwstudio webide中打开
import json
import redis
from datetime import datetime
import uuid
class WorkflowQueue:
def __init__(self, redis_host='localhost', redis_port=6379, redis_db=0):
self.redis_client = redis.Redis(
host=redis_host,
port=redis_port,
db=redis_db,
decode_responses=True
)
self.queue_key = 'workflow:queue'
def create_workflow(self, input_data, nodes):
"""创建新的工作流"""
workflow = {
'workflow_id': str(uuid.uuid4()),
'input_data': input_data,
'nodes': nodes,
'current_node': nodes[0]['node_id'],
'pre_node_data': input_data,
'status': 'pending',
'created_at': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
'updated_at': datetime.now().strftime('%Y-%m-%d %H:%M:%S')
}
self.redis_client.rpush(self.q.........完整代码请登录后点击上方下载按钮下载查看















网友评论0