#coding=utf-8
#Version:python3.6.0
#Tools:Pycharm 2017.3.2
import numpy as np
import tensorflow as tf
import re
# --- Data files: each holds whitespace-separated integer word ids (see read_data). ---
TRAIN_PATH = "data/ptb.train.txt"
EVAL_PATH = "data/ptb.valid.txt"
TEST_PATH = "data/ptb.test.txt"

# --- Model size. ---
HIDDEN_SIZE = 300   # LSTM cell size; also the embedding width
NUM_LAYERS = 2      # number of LSTM cells stacked in the MultiRNNCell
VOCAB_SIZE = 10000  # rows of the embedding matrix / width of the logits

# --- Batch geometry handed to PTBModel and make_batches. ---
TRAIN_BATCH_SIZE = 20
TRAIN_NUM_STEP = 35
EVAL_BATCH_SIZE = 1
EVAL_NUM_STEP = 1

# --- Training schedule and regularisation. ---
NUM_EPOCH = 5               # passes over the training batches
LSTM_KEEP_PROB = 0.9        # output keep probability of each LSTM cell (training only)
EMBEDDING_KEEP_PROB = 0.9   # keep probability applied to the embedded inputs (training only)
MAX_GRED_NORM = 5           # global-norm ceiling used when clipping gradients
SHARE_EMB_AND_SOFTMAX = True  # reuse the transposed embedding as the softmax weights
class PTBModel(object):
    """Multi-layer LSTM language model built as a TensorFlow graph.

    Attributes set by the constructor:
        batch_size, num_steps: the values passed in.
        input_data, targets: int32 placeholders of shape
            [batch_size, num_steps].
        initial_state: zero state of the stacked LSTM for ``batch_size``.
        final_state: LSTM state after the last unrolled step.
        cost: summed cross-entropy over all positions divided by
            ``batch_size``.
        train_op: gradient-descent update op; only defined when
            ``is_training`` is true.
    """

    def __init__(self, is_training, batch_size, num_steps):
        """Build the forward graph and, when training, the update op.

        Args:
            is_training: enables dropout and creates ``train_op`` when true.
            batch_size: number of sequences per batch.
            num_steps: number of time steps the LSTM is unrolled for.
        """
        self.batch_size = batch_size
        self.num_steps = num_steps

        self.input_data = tf.placeholder(tf.int32, [batch_size, num_steps])
        self.targets = tf.placeholder(tf.int32, [batch_size, num_steps])

        # Dropout is only active while training.
        dropout_keep_prob = LSTM_KEEP_PROB if is_training else 1.0
        lstm_cells = [
            tf.nn.rnn_cell.DropoutWrapper(
                tf.nn.rnn_cell.BasicLSTMCell(HIDDEN_SIZE),
                output_keep_prob=dropout_keep_prob)
            for _ in range(NUM_LAYERS)
        ]
        cell = tf.nn.rnn_cell.MultiRNNCell(lstm_cells)
        self.initial_state = cell.zero_state(batch_size, tf.float32)

        embedding = tf.get_variable("embedding", [VOCAB_SIZE, HIDDEN_SIZE])
        inputs = tf.nn.embedding_lookup(embedding, self.input_data)
        if is_training:
            inputs = tf.nn.dropout(inputs, EMBEDDING_KEEP_PROB)

        # Unroll the LSTM manually, one time step at a time, sharing the
        # cell variables across steps.
        outputs = []
        state = self.initial_state
        with tf.variable_scope("RNN"):
            for time_step in range(num_steps):
                if time_step > 0:
                    tf.get_variable_scope().reuse_variables()
                cell_output, state = cell(inputs[:, time_step, :], state)
                outputs.append(cell_output)

        # Concatenate the per-step outputs into
        # [batch, hidden_size * num_steps], then reshape to
        # [batch * num_steps, hidden_size].
        output = tf.reshape(tf.concat(outputs, 1), [-1, HIDDEN_SIZE])

        if SHARE_EMB_AND_SOFTMAX:
            # Tie the softmax weights to the (transposed) embedding matrix.
            weight = tf.transpose(embedding)
        else:
            weight = tf.get_variable("weight", [HIDDEN_SIZE, VOCAB_SIZE])
        bias = tf.get_variable("bias", [VOCAB_SIZE])
        logits = tf.matmul(output, weight) + bias

        loss = tf.nn.sparse_softmax_cross_entropy_with_logits(
            labels=tf.reshape(self.targets, [-1]),
            logits=logits,
        )
        self.cost = tf.reduce_sum(loss) / batch_size
        self.final_state = state

        # Back-propagation ops are only defined for the training model.
        if not is_training:
            return

        trainable_variables = tf.trainable_variables()
        # Clip gradients by global norm to keep their magnitude bounded.
        grads, _ = tf.clip_by_global_norm(
            tf.gradients(self.cost, trainable_variables), MAX_GRED_NORM)
        optimizer = tf.train.GradientDescentOptimizer(learning_rate=1.0)
        # Pair each clipped gradient with its variable to form the train step.
        self.train_op = optimizer.apply_gradients(
            zip(grads, trainable_variables))
def run_epoch(session, model, batches, train_op, output_log, step):
    """Run ``model`` over every batch once and return its perplexity.

    The recurrent state returned by one batch is fed in as the initial
    state of the next batch.

    Args:
        session: object whose ``run(fetches, feed_dict)`` evaluates the graph.
        model: object exposing ``cost``, ``final_state``, ``initial_state``,
            ``input_data``, ``targets`` and ``num_steps``.
        batches: iterable of ``(inputs, targets)`` pairs.
        train_op: op fetched together with the cost on every batch.
        output_log: when true, print the running perplexity every 100 steps.
        step: global step counter value to start from.

    Returns:
        ``(step, perplexity)`` where ``step`` has been advanced by the number
        of batches and ``perplexity`` is ``exp(total_cost / total_steps)``.

    Raises:
        ValueError: if ``batches`` yields nothing, since the perplexity
            would be undefined.
    """
    total_costs = 0.0
    iters = 0
    state = session.run(model.initial_state)
    for x, y in batches:
        cost, state, _ = session.run(
            [model.cost, model.final_state, train_op],
            {model.input_data: x, model.targets: y,
             model.initial_state: state},
        )
        total_costs += cost
        iters += model.num_steps

        # Only log when asked to (i.e. during training).
        if output_log and step % 100 == 0:
            print("After %d steps,perplexity is %.3f" % (
                step, np.exp(total_costs / iters)))
        step += 1

    if iters == 0:
        raise ValueError("run_epoch received no batches")
    return step, np.exp(total_costs / iters)
def read_data(file_path):
    """Read a file of whitespace-separated word ids into a list of ints.

    Args:
        file_path: path of the text file to read.

    Returns:
        The ids in file order, as a flat ``list`` of ``int``.

    Raises:
        ValueError: if any token in the file is not an integer.
    """
    with open(file_path, "r", encoding="utf-8") as fin:
        # split() with no argument splits on any whitespace, newlines
        # included, so the whole file can be tokenised in one go.
        tokens = fin.read().split()
    return [int(token) for token in tokens]
def make_batches(id_list, batch_size, num_step):
    """Cut a flat id sequence into (input, label) training batches.

    The sequence is laid out as ``batch_size`` parallel rows and then cut
    column-wise into chunks of ``num_step``. Labels are the same layout
    shifted one position to the right, i.e. the next word at each position.

    Args:
        id_list: flat sequence of word ids.
        batch_size: number of rows per batch.
        num_step: number of columns (time steps) per batch.

    Returns:
        A list of ``(data, label)`` pairs of arrays, each of shape
        ``[batch_size, num_step]``.

    Raises:
        ValueError: if ``id_list`` is too short to fill a single batch.
    """
    words_per_batch = batch_size * num_step
    # Integer division: the result is used as a slice bound and a shape.
    # One id is held back so the shifted label sequence stays in range.
    num_batches = (len(id_list) - 1) // words_per_batch
    if num_batches <= 0:
        raise ValueError(
            "need at least %d ids to build one batch, got %d"
            % (words_per_batch + 1, len(id_list)))
    total = num_batches * words_per_batch

    data = np.array(id_list[:total])
    data = np.reshape(data, [batch_size, num_batches * num_step])
    data_batches = np.split(data, num_batches, axis=1)

    # Same layout, shifted right by one: the word to predict at each position.
    label = np.array(id_list[1:total + 1])
    label = np.reshape(label, [batch_size, num_batches * num_step])
    label_batches = np.split(label, num_batches, axis=1)

    return list(zip(data_batches, label_batches))
def main():
    """Train the language model and report train/eval/test perplexity.

    Builds a training model and an evaluation model that share variables
    under the ``language_model`` scope, trains for ``NUM_EPOCH`` epochs
    (evaluating on the validation data after each), then evaluates once on
    the test data.
    """
    # Initializer applied to every variable created in the scopes below.
    initializer = tf.random_uniform_initializer(-0.05, 0.05)

    with tf.variable_scope("language_model", reuse=None,
                           initializer=initializer):
        train_model = PTBModel(True, TRAIN_BATCH_SIZE, TRAIN_NUM_STEP)

    # reuse=True: the evaluation model shares the training model's variables.
    with tf.variable_scope("language_model", reuse=True,
                           initializer=initializer):
        eval_model = PTBModel(False, EVAL_BATCH_SIZE, EVAL_NUM_STEP)

    # Evaluation runs no update; build the placeholder op once instead of
    # adding a new node to the graph on every epoch.
    no_op = tf.no_op()

    with tf.Session() as session:
        tf.global_variables_initializer().run()

        train_batches = make_batches(
            read_data(TRAIN_PATH), TRAIN_BATCH_SIZE, TRAIN_NUM_STEP)
        eval_batches = make_batches(
            read_data(EVAL_PATH), EVAL_BATCH_SIZE, EVAL_NUM_STEP)
        test_batches = make_batches(
            read_data(TEST_PATH), EVAL_BATCH_SIZE, EVAL_NUM_STEP)

        step = 0
        for i in range(NUM_EPOCH):
            print("In iteration:%d" % (i + 1))
            step, train_pplx = run_epoch(
                session, train_model, train_batches,
                train_model.train_op, True, step)
            print("Epoch:%d Train perplexity:%.3f" % (i + 1, train_pplx))

            _, eval_pplx = run_epoch(
                session, eval_model, eval_batches, no_op, False, 0)
            print("Epoch:%d Eval perplexity:%.3f" % (i + 1, eval_pplx))

        # Final, one-off evaluation on the held-out test data.
        _, test_pplx = run_epoch(
            session, eval_model, test_batches, no_op, False, 0)
        print("Test perplexity:%.3f" % test_pplx)
# Run training only when executed as a script, not when imported.
if __name__ == '__main__':
    main()