python使用TensorFlow基于seq2seq实现人工智能中文聊天机器人代码
代码语言:python
所属分类:人工智能
代码描述:python使用TensorFlow基于seq2seq实现人工智能中文聊天机器人代码,首先对问答的数据进行训练,然后根据生成的模型进行预测问答。
代码标签: python TensorFlow seq2seq 人工 智能 中文 聊天 机器人 代码
下面为部分代码预览,完整代码请点击下载或在bfwstudio webide中打开
# coding:utf-8
# Seq2seq-based Chinese chatbot: vocabulary building, training-data
# preparation and (truncated) model construction helpers.
#
# NOTE(review): `tensorflow.contrib.legacy_seq2seq` exists only in TF 1.x;
# this file cannot run under TF 2.x without migration.
import sys
import random

import numpy as np
import tensorflow as tf
from tensorflow.contrib.legacy_seq2seq.python.ops import seq2seq
import jieba
# fix: dropped `from numpy import unicode` — removed in NumPy >= 1.20;
# on Python 3 it was merely an alias of `str` (see word2id below).


class WordToken(object):
    """Bidirectional word <-> integer-id vocabulary built from text files."""

    def __init__(self):
        # Ids below START_ID are reserved for special markers
        # (PAD/GO/EOS — see the module-level constants below).
        self.START_ID = 4
        self.word2id_dict = {}
        self.id2word_dict = {}

    def load_file_list(self, file_list, min_freq):
        """Segment every line of every file with jieba, count word
        frequencies, and assign ids (starting at START_ID) to all words
        with count >= min_freq, most frequent words first.

        Populates self.word2id_dict / self.id2word_dict.
        Returns the last enumeration index reached (callers derive the
        symbol-table size from it), or -1 if the files contained no words.
        """
        words_count = {}
        for file in file_list:
            with open(file, 'r', encoding='utf-8') as file_object:
                for line in file_object:
                    # fix: loop variable no longer shadows builtin `str`
                    for word in jieba.cut(line.strip()):
                        words_count[word] = words_count.get(word, 0) + 1
        # Sort [count, word] pairs descending so the most frequent words
        # receive the smallest ids.
        sorted_list = sorted(([v, k] for k, v in words_count.items()),
                             reverse=True)
        # fix: the original raised NameError when the corpus was empty
        # because `index` was never bound.
        index = -1
        for index, (count, word) in enumerate(sorted_list):
            if count < min_freq:
                # Words below the frequency threshold get no id; as in the
                # original, the breaking index is still what gets returned.
                break
            self.word2id_dict[word] = self.START_ID + index
            self.id2word_dict[self.START_ID + index] = word
        return index

    def word2id(self, word):
        """Return the id for `word`, or None if out of vocabulary.

        Exits the process if `word` is not a str (fix: the original
        tested against numpy's removed `unicode` alias).
        """
        if not isinstance(word, str):
            print("Exception: word is not str")
            sys.exit(1)
        return self.word2id_dict.get(word)

    def id2word(self, id):
        """Return the word for `id` (numeric strings accepted), or None."""
        return self.id2word_dict.get(int(id))


# Encoder (input) sequence length.
input_seq_len = 5
# Decoder (output) sequence length.
output_seq_len = 5
# Padding id.
PAD_ID = 0
# Decoder start-of-sequence marker.
GO_ID = 1
# End-of-sequence marker.
EOS_ID = 2
# LSTM cell size.
size = 8
# Initial learning rate.
init_learning_rate = 1
# A word must appear at least this often to enter the vocabulary.
min_freq = 1

wordToken = WordToken()
# Built at import time so num_encoder_symbols / num_decoder_symbols can be
# derived dynamically from the actual vocabulary size.
max_token_id = wordToken.load_file_list(
    ['/data/wwwroot/default/dataset/ask/question',
     '/data/wwwroot/default/dataset/ask/answer'],
    min_freq)
num_encoder_symbols = max_token_id + 5
num_decoder_symbols = max_token_id + 5


def get_id_list_from(sentence):
    """Segment `sentence` with jieba and map each in-vocabulary word to
    its id; out-of-vocabulary words are silently dropped."""
    # fix: jieba.cut returns a generator — materialize it so the debug
    # print shows the tokens rather than a generator object.
    seg_list = list(jieba.cut(sentence))
    print(seg_list)
    sentence_id_list = []
    for word in seg_list:
        # fix: single vocabulary lookup (the original called word2id twice)
        word_id = wordToken.word2id(word)
        if word_id:
            sentence_id_list.append(word_id)
    return sentence_id_list


def get_train_set():
    """Read the parallel question/answer files line by line and return a
    list of [question_id_list, answer_id_list] pairs, with EOS_ID appended
    to each answer. Pairs where either side yields no in-vocabulary words
    are skipped."""
    # fix: removed a dead `global num_encoder_symbols, num_decoder_symbols`
    # declaration — the function never assigns them.
    train_set = []
    with open('/data/wwwroot/default/dataset/ask/question', 'r',
              encoding='utf-8') as question_file, \
         open('/data/wwwroot/default/dataset/ask/answer', 'r',
              encoding='utf-8') as answer_file:
        # zip stops at the shorter file, matching the original
        # "break when either readline is empty" loop.
        for question, answer in zip(question_file, answer_file):
            question_id_list = get_id_list_from(question.strip())
            answer_id_list = get_id_list_from(answer.strip())
            if question_id_list and answer_id_list:
                answer_id_list.append(EOS_ID)
                train_set.append([question_id_list, answer_id_list])
    return train_set


def get_samples(train_set, batch_num):
    """Draw a contiguous, randomly positioned batch of `batch_num` samples
    from `train_set` and convert it into the time-major arrays that
    legacy_seq2seq expects.

    Returns:
        encoder_inputs: input_seq_len int32 arrays of shape (batch,);
            questions are left-padded with PAD_ID.
        decoder_inputs: output_seq_len int32 arrays of shape (batch,);
            answers get a leading GO_ID and right PAD_ID padding.
        target_weights: output_seq_len float32 arrays; 0.0 on PAD targets
            and on the final step, 1.0 elsewhere.
    """
    if batch_num >= len(train_set):
        batch_train_set = train_set
    else:
        random_start = random.randint(0, len(train_set) - batch_num)
        batch_train_set = train_set[random_start:random_start + batch_num]

    raw_encoder_input = []
    raw_decoder_input = []
    for sample in batch_train_set:
        # fix: clamp over-long samples — the original silently emitted rows
        # longer than the declared sequence lengths when a sample exceeded
        # input_seq_len / output_seq_len - 1 (truncation may drop the
        # trailing EOS_ID on over-long answers).
        question = sample[0][-input_seq_len:]
        answer = sample[1][:output_seq_len - 1]
        raw_encoder_input.append(
            [PAD_ID] * (input_seq_len - len(question)) + question)
        raw_decoder_input.append(
            [GO_ID] + answer + [PAD_ID] * (output_seq_len - len(answer) - 1))

    encoder_inputs = []
    decoder_inputs = []
    target_weights = []
    for length_idx in range(input_seq_len):
        encoder_inputs.append(np.array(
            [encoder_input[length_idx] for encoder_input in raw_encoder_input],
            dtype=np.int32))
    for length_idx in range(output_seq_len):
        decoder_inputs.append(np.array(
            [decoder_input[length_idx] for decoder_input in raw_decoder_input],
            dtype=np.int32))
        # The last step always gets weight 0 (there is no target beyond
        # it); PAD positions are masked out as well.
        target_weights.append(np.array([
            0.0 if length_idx == output_seq_len - 1
                   or decoder_input[length_idx] == PAD_ID
            else 1.0
            for decoder_input in raw_decoder_input], dtype=np.float32))
    return encoder_inputs, decoder_inputs, target_weights


def seq_to_encoder(input_seq):
    """Convert a space-separated string of word ids into batch-size-1
    encoder_inputs / decoder_inputs / target_weights for prediction."""
    input_seq_array = [int(v) for v in input_seq.split()]
    encoder_input = ([PAD_ID] * (input_seq_len - len(input_seq_array))
                     + input_seq_array)
    decoder_input = [GO_ID] + [PAD_ID] * (output_seq_len - 1)
    encoder_inputs = [np.array([v], dtype=np.int32) for v in encoder_input]
    decoder_inputs = [np.array([v], dtype=np.int32) for v in decoder_input]
    target_weights = [np.array([1.0], dtype=np.float32)] * output_seq_len
    return encoder_inputs, decoder_inputs, target_weights


def get_model(feed_previous=False):
    """Build the seq2seq model (training mode, or prediction mode when
    feed_previous=True)."""
    learning_rate = tf.Variable(float(init_learning_rate), trainable=False,
                                dtype=tf.float32)
    learning_rate_decay_op = learning_rate.assign(learning_rate * 0.9)
    # NOTE(review): the source page is a paywalled preview truncated at
    # this point — the remainder of get_model is not available here.
网友评论0