在使用多GPU并行训练深度学习神经网络时,以TFrecords 形式读取MNIST数据的训练数据进行训练,发现比直接用MNIST训练数据训练相同模型时,发现前者收敛速度慢和运行时间长,已知模型没有问题,想请大神帮忙看看是什么原因导致运行速度慢,运行时间长
import os
import time
import numpy as np
import tensorflow as tf
from datetime import datetime
import tensorflow.compat.v1 as v1
from tensorflow.examples.tutorials.mnist import input_data
BATCH_SIZE = 100
LEARNING_RATE = 1e-4
LEARNING_RATE_DECAY = 0.99
REGULARZTION_RATE = 1e-4
EPOCHS = 10000
MOVING_AVERAGE_DECAY = 0.99
N_GPU = 2
MODEL_SAVE_PATH = r'F:\model\log_dir'
MODEL_NAME = 'model.ckpt'
TRAIN_PATH = r'F:\model\threads_file\MNIST_data_tfrecords\train.tfrecords'
TEST_PATH = r'F:\model\threads_file\MNIST_data_tfrecords\test.tfrecords'
def __int64_feature(value):
return v1.train.Feature(int64_list=v1.train.Int64List(value=[value]))
def __bytes_feature(value):
return v1.train.Feature(bytes_list=v1.train.BytesList(value=[value]))
def creat_tfrecords(path, data, labels):
writer = tf.io.TFRecordWriter(path)
for i in range(len(data)):
image = data[i].tostring()
label = labels[i]
examples = v1.train.Example(features=v1.train.Features(feature={
'image': __bytes_feature(image),
'label': __int64_feature(label)
}))
writer.write(examples.SerializeToString())
writer.close()
def parser(record):
features = v1.parse_single_example(record, features={
'image': v1.FixedLenFeature([], tf.string),
'label': v1.FixedLenFeature([], tf.int64)
})
image = tf.decode_raw(features['image'], tf.uint8)
image = tf.reshape(image, [28, 28, 1])
image = tf.cast(image, tf.float32)
label = tf.cast(features['label'], tf.int32)
label = tf.one_hot(label, 10, on_value=1, off_value=0)
return image, label
def get_input(batch_size, path):
dataset = tf.data.TFRecordDataset([path])
dataset = dataset.map(parser)
dataset = dataset.shuffle(10000)
dataset = dataset.repeat(100)
dataset = dataset.batch(batch_size)
iterator = dataset.make_one_shot_iterator()
image, label = iterator.get_next()
return image, label
def model_inference(images, labels, rate, regularzer=None, reuse_variables=None):
with v1.variable_scope(v1.get_variable_scope(), reuse=reuse_variables):
with tf.compat.v1.variable_scope('First_conv'):
w1 = tf.compat.v1.get_variable('weights', [3, 3, 1, 32], tf.float32,
initializer=tf.compat.v1.truncated_normal_initializer(stddev=0.1))
if regularzer:
tf.add_to_collection('losses', regularzer(w1))
b1 = tf.compat.v1.get_variable('biases', [32], tf.float32,
initializer=tf.compat.v1.constant_initializer(0.1))
activation1 = tf.nn.relu(tf.nn.conv2d(images, w1, strides=[1, 1, 1, 1], padding='SAME') + b1)
out1 = tf.nn.max_pool2d(activation1, ksize=[1, 2, 2, 1], strides=[1, 2, 2, 1],
padding='SAME')
with tf.compat.v1.variable_scope('Second_conv'):
w2 = tf.compat.v1.get_variable('weight', [3, 3, 32, 64], tf.float32,
initializer=tf.compat.v1.truncated_normal_initializer(stddev=0.1))
if regularzer:
tf.add_to_collection('losses', regularzer(w2))
b2 = tf.compat.v1.get_variable('biases', [64], tf.float32,
initializer=tf.compat.v1.constant_initializer(0.1))
activation2 = tf.nn.relu(tf.nn.conv2d(out1, w2, strides=[1, 1, 1, 1], padding='SAME') + b2)
out2 = tf.nn.max_pool2d(activation2, ksize=[1, 2, 2, 1], strides=[1, 2, 2, 1],
padding='SAME')
out3 = tf.reshape(out2, [-1, 7*7*64], name='flatten')
with tf.compat.v1.variable_scope('FC_1'):
w3 = tf.compat.v1.get_variable('weight', [7*7*64, 1024], tf.float32,
initializer=tf.compat.v1.truncated_normal_initializer(stddev=0.1))
if regularzer:
tf.add_to_collection('losses', regularzer(w3))
b3 = tf.compat.v1.get_variable('biases', [1024], tf.float32,
initializer=tf.compat.v1.constant_initializer(0.1))
activation3 = tf.nn.relu(tf.matmul(out3, w3) + b3)
out4 = tf.nn.dropout(activation3, keep_prob=rate)
with tf.compat.v1.variable_scope('FC_2'):
w4 = tf.compat.v1.get_variable('weight', [1024, 10], tf.float32,
initializer=tf.compat.v1.truncated_normal_initializer(stddev=0.1))
if regularzer:
tf.add_to_collection('losses', regularzer(w4))
b4 = tf.compat.v1.get_variable('biases', [10], tf.float32,
initializer=tf.compat.v1.constant_initializer(0.1))
output = tf.nn.softmax(tf.matmul(out4, w4) + b4)
with tf.compat.v1.variable_scope('Loss_entropy'):
if regularzer:
loss = tf.reduce_mean(tf.nn.sparse_softmax_cross_entropy_with_logits(labels=tf.argmax(labels, 1), logits=output)) \
+ tf.add_n(tf.get_collection('losses'))
else:
loss = tf.reduce_mean(tf.nn.sparse_softmax_cross_entropy_with_logits(labels=tf.argmax(labels, 1), logits=output))
with tf.compat.v1.variable_scope('Accuracy'):
correct_data = tf.equal(tf.math.argmax(labels, 1), tf.math.argmax(output, 1))
accuracy = tf.reduce_mean(tf.cast(correct_data, tf.float32, name='accuracy'))
return output, loss, accuracy
def average_gradients(tower_grads):
average_grads = []
for grad_and_vars in zip(*tower_grads):
grads = []
for g, v2 in grad_and_vars:
expanded_g = tf.expand_dims(g, 0)
grads.append(expanded_g)
grad = tf.concat(grads, 0)
grad = tf.reduce_mean(grad, 0)
v = grad_and_vars[0][1]
grad_and_var = (grad, v)
average_grads.append(grad_and_var)
return average_grads
def main(argv=None):
with tf.Graph().as_default(), tf.device('/cpu:0'):
x, y = get_input(batch_size=BATCH_SIZE, path=TRAIN_PATH)
regularizer = tf.contrib.layers.l2_regularizer(REGULARZTION_RATE)
global_step = v1.get_variable('global_step', [], initializer=v1.constant_initializer(0), trainable=False)
lr = v1.train.exponential_decay(LEARNING_RATE, global_step, 55000/BATCH_SIZE, LEARNING_RATE_DECAY)
opt = v1.train.AdamOptimizer(lr)
tower_grads = []
reuse_variables = False
device = ['/gpu:0', '/cpu:0']
for i in range(len(device)):
with tf.device(device[i]):
with v1.name_scope(device[i][1:4] + '_0') as scope:
out, cur_loss, acc = model_inference(x, y, 0.3, regularizer, reuse_variables)
reuse_variables = True
grads = opt.compute_gradients(cur_loss)
tower_grads.append(grads)
grads = average_gradients(tower_grads)
for grad, var in grads:
if grad is not None:
v1.summary.histogram('gradients_on_average/%s' % var.op.name, grad)
apply_gradient_op = opt.apply_gradients(grads, global_step)
for var in v1.trainable_variables():
tf.summary.histogram(var.op.name, var)
variable_averages = v1.train.ExponentialMovingAverage(MOVING_AVERAGE_DECAY, global_step)
variable_to_average = (v1.trainable_variables() + v1.moving_average_variables())
variable_averages_op = variable_averages.apply(variable_to_average)
train_op = tf.group(apply_gradient_op, variable_averages_op)
saver = v1.train.Saver(max_to_keep=1)
summary_op = v1.summary.merge_all() # merge_all 可以将所有summary全部保存到磁盘
init = v1.global_variables_initializer()
with v1.Session(config=v1.ConfigProto(allow_soft_placement=True, log_device_placement=True)) as sess:
init.run()
summary_writer = v1.summary.FileWriter(MODEL_SAVE_PATH, sess.graph) # 指定一个文件用来保存图
for step in range(EPOCHS):
try:
start_time = time.time()
_, loss_value, out_value, acc_value = sess.run([train_op, cur_loss, out, acc])
duration = time.time() - start_time
if step != 0 and step % 100 == 0:
num_examples_per_step = BATCH_SIZE * N_GPU
examples_per_sec = num_examples_per_step / duration
sec_per_batch = duration / N_GPU
format_str = '%s: step %d, loss = %.2f(%.1f examples/sec; %.3f sec/batch), accuracy = %.2f'
print(format_str % (datetime.now(), step, loss_value, examples_per_sec, sec_per_batch, acc_value))
summary = sess.run(summary_op)
summary_writer.add_summary(summary, step)
if step % 100 == 0 or (step + 1) == EPOCHS:
checkpoint_path = os.path.join(MODEL_SAVE_PATH, MODEL_NAME)
saver.save(sess, checkpoint_path, global_step=step)
except tf.errors.OutOfRangeError:
break
if __name__ == '__main__':
tf.app.run()