python执行json工作流输出每一步的结果示例代码
代码语言:python
所属分类:其他
代码描述:python执行json工作流输出每一步的结果示例代码
代码标签: python 执行 json 工作流 输出 每一步 结果 示例 代码
下面为部分代码预览,完整代码请点击下载或在bfwstudio webide中打开
import json
import requests
class WorkflowExecutor:
def __init__(self, workflow_json):
self.workflow = workflow_json
self.node_outputs = {}
self.executed_nodes = set() # 跟踪已执行的节点
def execute_node(self, node):
node_type = node['type']
node_config = node['config']
node_id = str(node['id'])
# 如果节点已执行过,直接返回结果
if node_id in self.node_outputs:
return self.node_outputs[node_id]
if node_type == 'input':
return node_config.get('defaultValue')
elif node_type == 'process':
input_data = self.get_input_data(node)
# 创建一个本地变量来存储处理函数
local_vars = {'input_data': input_data}
# 执行处理函数
exec(node_config['function'], globals(), local_vars)
# 调用处理函数并返回结果
return local_vars['process'](input_data)
elif node_type == 'condition':
input_data = self.ge.........完整代码请登录后点击上方下载按钮下载查看















网友评论0