python执行json工作流输出每一步的结果示例代码
代码语言:python
所属分类:其他
代码描述:python执行json工作流输出每一步的结果示例代码
代码标签: python 执行 json 工作流 输出 每一步 结果 示例 代码
下面为部分代码预览,完整代码请点击下载或在bfwstudio webide中打开
import json import requests class WorkflowExecutor: def __init__(self, workflow_json): self.workflow = workflow_json self.node_outputs = {} self.executed_nodes = set() # 跟踪已执行的节点 def execute_node(self, node): node_type = node['type'] node_config = node['config'] node_id = str(node['id']) # 如果节点已执行过,直接返回结果 if node_id in self.node_outputs: return self.node_outputs[node_id] if node_type == 'input': return node_config.get('defaultValue') elif node_type == 'process': input_data = self.get_input_data(node) # 创建一个本地变量来存储处理函数 local_vars = {'input_data': input_data} # 执行处理函数 exec(node_config['function'], globals(), local_vars) # 调用处理函数并返回结果 return local_vars['process'](input_data) elif node_type == 'condition': input_data = self.get_input_data(node) try: # 尝试将输入转换为数字,如果可能的话 if isinstance(input_data, str) and input_data.replace('.', '', 1).isdigit(): if input_data.isdigit(): numeric_input = int(input_data) else: numeric_input = float(input_data) condition_result = eval(node_config['condition'].replace('input', 'numeric_input')) else: condition_result = eval(node_config['condition'], {'input': input_data}) # 返回一个元组,包含条件结果和原始输入数据 return (condition_re.........完整代码请登录后点击上方下载按钮下载查看
网友评论0