python执行json工作流输出每一步的结果示例代码

代码语言:python

所属分类:其他

代码描述:python执行json工作流输出每一步的结果示例代码

代码标签: python 执行 json 工作流 输出 每一步 结果 示例 代码

下面为部分代码预览,完整代码请点击下载或在bfwstudio webide中打开

import json
import requests

class WorkflowExecutor:
   
def __init__(self, workflow_json):
        self
.workflow = workflow_json
        self
.node_outputs = {}
        self
.executed_nodes = set()  # 跟踪已执行的节点

   
def execute_node(self, node):
        node_type
= node['type']
        node_config
= node['config']
        node_id
= str(node['id'])
       
       
# 如果节点已执行过,直接返回结果
       
if node_id in self.node_outputs:
           
return self.node_outputs[node_id]
       
       
if node_type == 'input':
           
return node_config.get('defaultValue')
           
       
elif node_type == 'process':
            input_data
= self.get_input_data(node)
           
# 创建一个本地变量来存储处理函数
            local_vars
= {'input_data': input_data}
           
# 执行处理函数
           
exec(node_config['function'], globals(), local_vars)
           
# 调用处理函数并返回结果
           
return local_vars['process'](input_data)
           

       
elif node_type == 'condition':
            input_data
= self.get_input_data(node)
           
try:
               
# 尝试将输入转换为数字,如果可能的话
               
if isinstance(input_data, str) and input_data.replace('.', '', 1).isdigit():
                   
if input_data.isdigit():
                        numeric_input
= int(input_data)
                   
else:
                        numeric_input
= float(input_data)
                    condition_result
= eval(node_config['condition'].replace('input', 'numeric_input'))
               
else:
                    condition_result
= eval(node_config['condition'], {'input': input_data})
               
               
# 返回一个元组,包含条件结果和原始输入数据
               
return (condition_re.........完整代码请登录后点击上方下载按钮下载查看

网友评论0