Training with the parameter server strategy. The tasks are split into chief, parameter server, and worker. Their code is currently as follows:
The chief's ps_dist_strategy_chief.py:
import tensorflow.compat.v1 as tf
tf.disable_eager_execution()
tf.logging.set_verbosity(tf.logging.INFO)
import tensorflow.keras as keras
import os
os.environ['CUDA_VISIBLE_DEVICES'] = '0,1,2,3' # make GPUs 0-3 visible to this process
import numpy as np
print(tf.__version__) # print the TF version
gpus=tf.config.list_physical_devices('GPU')
print('*'*20, 'chief: ', gpus) # show how many GPUs are available
tf.config.set_visible_devices(devices=[], device_type='GPU')
visible_devices = tf.config.get_visible_devices()
print('*'*20, 'chief visible devices:', visible_devices)
fashion_mnist = tf.keras.datasets.fashion_mnist
(train_images, train_labels), (test_images, test_labels) = fashion_mnist.load_data()
train_images = train_images[..., None]
test_images = test_images[..., None]
train_images = train_images / np.float32(255)
test_images = test_images / np.float32(255)
def input_fn(X, y, shuffle, batch_size):
    dataset = tf.data.Dataset.from_tensor_slices((X, y))
    if shuffle:
        dataset = dataset.shuffle(buffer_size=100)
    dataset = dataset.repeat()
    dataset = dataset.batch(batch_size)
    return dataset
dataset=input_fn(train_images,train_labels,True, 4)
test_dataset=input_fn(test_images,test_labels,True, 4)
# TF 2.x requires the cluster resolver (i.e. TF_CONFIG) to be configured first, otherwise an error is raised
import json
os.environ["TF_CONFIG"] = json.dumps({
"cluster": {
"chief":["127.0.0.1:5000"],#调度节点
"worker": ["127.0.0.1:5001"], #计算节点
"ps": ["127.0.0.1:5002"]#参数服务器节点,可不必使用GPU
},
"task": {"type": "chief", "index": 0} #定义本进程为worker节点,即["127.0.0.1:5001"]为计算节点
})
strategy = tf.distribute.experimental.ParameterServerStrategy()
LEARNING_RATE = 1e-3
BATCH_SIZE=32
def model_fn(features, labels, mode):
    model = tf.keras.Sequential([
        tf.keras.layers.Conv2D(32, 3, activation='relu', input_shape=(28, 28, 1)),
        tf.keras.layers.MaxPooling2D(),
        tf.keras.layers.Flatten(),
        tf.keras.layers.Dense(64, activation='relu'),
        tf.keras.layers.Dense(10, activation='softmax')
    ])
    logits = model(features, training=False)

    if mode == tf.estimator.ModeKeys.PREDICT:
        predictions = {'logits': logits}
        return tf.estimator.EstimatorSpec(labels=labels, predictions=predictions)

    optimizer = tf.compat.v1.train.GradientDescentOptimizer(
        learning_rate=LEARNING_RATE)
    loss = tf.keras.losses.SparseCategoricalCrossentropy(
        from_logits=True, reduction=tf.compat.v1.losses.Reduction.NONE)(labels, logits)
    loss = tf.reduce_sum(loss) * (1. / BATCH_SIZE)
    if mode == tf.estimator.ModeKeys.EVAL:
        return tf.estimator.EstimatorSpec(mode, loss=loss)
    return tf.estimator.EstimatorSpec(
        mode=mode,
        loss=loss,
        train_op=optimizer.minimize(
            loss, tf.compat.v1.train.get_or_create_global_step()))
# Configure how often to save checkpoints and log; note that the key to distributed training is train_distribute=strategy
run_config = tf.estimator.RunConfig(keep_checkpoint_max=1,
log_step_count_steps=10,train_distribute=strategy)
# Pass in model_fn and the model save directory
classifier = tf.estimator.Estimator(model_fn=model_fn,model_dir="./model_test_fashion_mnist",config=run_config)
tf.estimator.train_and_evaluate(
    classifier,
    train_spec=tf.estimator.TrainSpec(
        input_fn=lambda: input_fn(train_images, train_labels, tf.estimator.ModeKeys.TRAIN, 2),
        max_steps=30000),
    eval_spec=tf.estimator.EvalSpec(
        input_fn=lambda: input_fn(test_images, test_labels, tf.estimator.ModeKeys.EVAL, 2),
        steps=300)
)
The parameter server's ps_dist_strategy_parameter_server.py:
import tensorflow.compat.v1 as tf
tf.disable_eager_execution()
tf.logging.set_verbosity(tf.logging.INFO)
import tensorflow.keras as keras
import os
os.environ['CUDA_VISIBLE_DEVICES'] = '0,1,2,3' # make GPUs 0-3 visible to this process
import numpy as np
gpus=tf.config.list_physical_devices('GPU')
print('*'*20, 'parameter server: ', gpus) # show how many GPUs are available
tf.config.set_visible_devices(devices=[], device_type='GPU')
visible_devices = tf.config.get_visible_devices()
print('*'*20, 'parameter server:', visible_devices) # show the devices visible to this process
fashion_mnist = tf.keras.datasets.fashion_mnist
(train_images, train_labels), (test_images, test_labels) = fashion_mnist.load_data()
train_images = train_images[..., None]
test_images = test_images[..., None]
# Scale images to the [0, 1] range.
train_images = train_images / np.float32(255)
test_images = test_images / np.float32(255)
def input_fn(X, y, shuffle, batch_size):
    dataset = tf.data.Dataset.from_tensor_slices((X, y))
    if shuffle:
        dataset = dataset.shuffle(buffer_size=100)
    dataset = dataset.repeat()
    dataset = dataset.batch(batch_size)
    return dataset
dataset=input_fn(train_images,train_labels,True, 4)
test_dataset=input_fn(test_images,test_labels,True, 4)
# TF 2.x requires the cluster resolver (i.e. TF_CONFIG) to be configured first, otherwise an error is raised
import json
os.environ["TF_CONFIG"] = json.dumps({
"cluster": {
"chief":["127.0.0.1:5000"],#调度节点
"worker": ["127.0.0.1:5001"], #计算节点
"ps": ["127.0.0.1:5002"]#参数服务器节点,可不必使用GPU
},
"task": {"type": "ps", "index": 0} #定义本进程为worker节点,即["127.0.0.1:5001"]为计算节点
})
# Just define the ParameterServerStrategy
strategy = tf.distribute.experimental.ParameterServerStrategy()
LEARNING_RATE = 1e-3
BATCH_SIZE=32
def model_fn(features, labels, mode):
    model = tf.keras.Sequential([
        tf.keras.layers.Conv2D(32, 3, activation='relu', input_shape=(28, 28, 1)),
        tf.keras.layers.MaxPooling2D(),
        tf.keras.layers.Flatten(),
        tf.keras.layers.Dense(64, activation='relu'),
        tf.keras.layers.Dense(10, activation='softmax')
    ])
    logits = model(features, training=False)

    if mode == tf.estimator.ModeKeys.PREDICT:
        predictions = {'logits': logits}
        return tf.estimator.EstimatorSpec(labels=labels, predictions=predictions)

    optimizer = tf.compat.v1.train.GradientDescentOptimizer(
        learning_rate=LEARNING_RATE)
    loss = tf.keras.losses.SparseCategoricalCrossentropy(
        from_logits=True, reduction=tf.compat.v1.losses.Reduction.NONE)(labels, logits)
    loss = tf.reduce_sum(loss) * (1. / BATCH_SIZE)
    if mode == tf.estimator.ModeKeys.EVAL:
        return tf.estimator.EstimatorSpec(mode, loss=loss)
    return tf.estimator.EstimatorSpec(
        mode=mode,
        loss=loss,
        train_op=optimizer.minimize(
            loss, tf.compat.v1.train.get_or_create_global_step()))
# Configure how often to save checkpoints and log; note that the key to distributed training is train_distribute=strategy
run_config = tf.estimator.RunConfig(keep_checkpoint_max=1,
log_step_count_steps=10,train_distribute=strategy)
# Pass in model_fn and the model save directory
classifier = tf.estimator.Estimator(model_fn=model_fn,model_dir="./model_test_fashion_mnist",config=run_config)
tf.estimator.train_and_evaluate(
    classifier,
    train_spec=tf.estimator.TrainSpec(
        input_fn=lambda: input_fn(train_images, train_labels, tf.estimator.ModeKeys.TRAIN, 2),
        max_steps=30000),
    eval_spec=tf.estimator.EvalSpec(
        input_fn=lambda: input_fn(test_images, test_labels, tf.estimator.ModeKeys.EVAL, 2),
        steps=300)
)
And the worker's ps_dist_strategy_worker.py:
import tensorflow.compat.v1 as tf
tf.disable_eager_execution()
tf.logging.set_verbosity(tf.logging.INFO)
import tensorflow.keras as keras
import os
os.environ['CUDA_VISIBLE_DEVICES'] = '0,1,2,3' # make GPUs 0-3 visible to this process
os.environ['HIP_VISIBLE_DEVICES'] = '0,1,2,3'
import numpy as np
print(tf.__version__) # print the TF version
gpus=tf.config.list_physical_devices('GPU')
print(gpus) # show how many GPUs are available
fashion_mnist = tf.keras.datasets.fashion_mnist
(train_images, train_labels), (test_images, test_labels) = fashion_mnist.load_data()
# Add a dimension to the arrays -> new shape == (28, 28, 1).
# We do this because the first layer of the model is a convolutional layer
# and it requires a 4D input (batch_size, height, width, channels).
# The batch size dimension will be added later.
train_images = train_images[..., None]
test_images = test_images[..., None]
# Scale images to the [0, 1] range.
train_images = train_images / np.float32(255)
test_images = test_images / np.float32(255)
def input_fn(X, y, shuffle, batch_size):
    dataset = tf.data.Dataset.from_tensor_slices((X, y))
    if shuffle:
        ##### Modified by Ngaiman Chow on 2023-4-3 to check whether the shuffle buffer consumes CUDA memory and causes the OOM error
        ### dataset = dataset.shuffle(buffer_size=100000)
        dataset = dataset.shuffle(buffer_size=100)
        ##### Modified by Ngaiman Chow on 2023-4-3 to check whether the shuffle buffer consumes CUDA memory and causes the OOM error
    dataset = dataset.repeat()
    dataset = dataset.batch(batch_size)
    return dataset
dataset=input_fn(train_images,train_labels,True, 4)
test_dataset=input_fn(test_images,test_labels,True, 4)
# TF 2.x requires the cluster resolver (i.e. TF_CONFIG) to be configured first, otherwise an error is raised
import json
os.environ["TF_CONFIG"] = json.dumps({
"cluster": {
"chief":["127.0.0.1:5000"],#调度节点
"worker": ["127.0.0.1:5001"], #计算节点
"ps": ["127.0.0.1:5002"]#参数服务器节点,可不必使用GPU
},
"task": {"type": "worker", "index": 0} #定义本进程为worker节点,即["127.0.0.1:5001"]为计算节点
})
# Just define the ParameterServerStrategy
strategy = tf.distribute.experimental.ParameterServerStrategy()
LEARNING_RATE = 1e-3
BATCH_SIZE=32
def model_fn(features, labels, mode):
    model = tf.keras.Sequential([
        tf.keras.layers.Conv2D(32, 3, activation='relu', input_shape=(28, 28, 1)),
        tf.keras.layers.MaxPooling2D(),
        tf.keras.layers.Flatten(),
        tf.keras.layers.Dense(64, activation='relu'),
        tf.keras.layers.Dense(10, activation='softmax')
    ])
    logits = model(features, training=False)

    if mode == tf.estimator.ModeKeys.PREDICT:
        predictions = {'logits': logits}
        return tf.estimator.EstimatorSpec(labels=labels, predictions=predictions)

    optimizer = tf.compat.v1.train.GradientDescentOptimizer(
        learning_rate=LEARNING_RATE)
    loss = tf.keras.losses.SparseCategoricalCrossentropy(
        from_logits=True, reduction=tf.compat.v1.losses.Reduction.NONE)(labels, logits)
    loss = tf.reduce_sum(loss) * (1. / BATCH_SIZE)
    if mode == tf.estimator.ModeKeys.EVAL:
        return tf.estimator.EstimatorSpec(mode, loss=loss)
    return tf.estimator.EstimatorSpec(
        mode=mode,
        loss=loss,
        train_op=optimizer.minimize(
            loss, tf.compat.v1.train.get_or_create_global_step()))
# Configure how often to save checkpoints and log; note that the key to distributed training is train_distribute=strategy
run_config = tf.estimator.RunConfig(keep_checkpoint_max=1,
log_step_count_steps=10,train_distribute=strategy)
# Pass in model_fn and the model save directory
classifier = tf.estimator.Estimator(model_fn=model_fn,model_dir="./model_test_fashion_mnist",config=run_config)
tf.estimator.train_and_evaluate(
    classifier,
    train_spec=tf.estimator.TrainSpec(
        input_fn=lambda: input_fn(train_images, train_labels, tf.estimator.ModeKeys.TRAIN, 2),
        max_steps=30000),
    eval_spec=tf.estimator.EvalSpec(
        input_fn=lambda: input_fn(test_images, test_labels, tf.estimator.ModeKeys.EVAL, 2),
        steps=300)
)
When the three scripts are run in three separate terminal windows:
python ps_dist_strategy_chief.py
python ps_dist_strategy_parameter_server.py
python ps_dist_strategy_worker.py
the chief and the worker fail with CUDA_ERROR_OUT_OF_MEMORY. The environment is a single host with four Nvidia A100 40 GB GPUs. How should the code be modified so that it no longer raises CUDA_ERROR_OUT_OF_MEMORY?
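For reference, one direction I am considering (not yet verified to fix this particular error) is to stop each process from pre-allocating the full memory of every visible GPU, by enabling allow_growth in the Estimator's session config and/or pointing each process at a different GPU via CUDA_VISIBLE_DEVICES before TensorFlow initializes the devices. A minimal sketch of that change, assuming the same strategy object as in the scripts above; the per-process GPU assignment shown is only an example:

import os
os.environ['CUDA_VISIBLE_DEVICES'] = '0'  # e.g. chief -> GPU 0, worker -> GPU 1, ps -> '' (no GPU)

import tensorflow.compat.v1 as tf

session_config = tf.ConfigProto()
session_config.gpu_options.allow_growth = True  # allocate GPU memory on demand instead of grabbing all 40 GB up front

run_config = tf.estimator.RunConfig(
    keep_checkpoint_max=1,
    log_step_count_steps=10,
    train_distribute=strategy,        # the ParameterServerStrategy defined earlier
    session_config=session_config)

Would this be the right way to go, or is there a better fix?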