bazookier 2023-04-03 19:43 采纳率: 100%
浏览 190
已结题

tensorflow分布式parameter server策略CUDA_ERROR_OUT_OF_MEMORY错误

使用parameter server strategy训练。task分chief,parameter server和worker。现在它们的代码分别为:
chief的ps_dist_strategy_chief.py:

import tensorflow.compat.v1 as tf
tf.disable_eager_execution()
tf.logging.set_verbosity(tf.logging.INFO)
import tensorflow.keras as keras
import os
os.environ['CUDA_VISIBLE_DEVICES'] = '0,1,2,3' # 指定该代码文件的可见GPU为第一个和第二个
import numpy as np
print(tf.__version__)#查看tf版本
gpus=tf.config.list_physical_devices('GPU')
print('*'*20, 'chief: ', gpus)#查看有多少个可用的GPU
tf.config.set_visible_devices(devices=[], device_type='GPU')
visible_devices = tf.config.get_visible_devices()
print('*'*20, 'chief visible devices:', visible_devices)

fashion_mnist = tf.keras.datasets.fashion_mnist
(train_images, train_labels), (test_images, test_labels) = fashion_mnist.load_data()

train_images = train_images[..., None]
test_images = test_images[..., None]

train_images = train_images / np.float32(255)
test_images = test_images / np.float32(255)

def input_fn(X,y,shuffle, batch_size):
    dataset = tf.data.Dataset.from_tensor_slices((X,y))
    if shuffle: 
        dataset = dataset.shuffle(buffer_size=100)
    dataset = dataset.repeat()
    dataset = dataset.batch(batch_size)
    return dataset

dataset=input_fn(train_images,train_labels,True, 4)
test_dataset=input_fn(test_images,test_labels,True, 4)

# tf2.0需先配置cluster_resolver(即TF_CONFIG),否则报错
import json
os.environ["TF_CONFIG"] = json.dumps({
    "cluster": {
        "chief":["127.0.0.1:5000"],#调度节点
        "worker": ["127.0.0.1:5001"], #计算节点
        "ps": ["127.0.0.1:5002"]#参数服务器节点,可不必使用GPU
    },
   "task": {"type": "chief", "index": 0} #定义本进程为worker节点,即["127.0.0.1:5001"]为计算节点
})
strategy = tf.distribute.experimental.ParameterServerStrategy()

LEARNING_RATE = 1e-3
BATCH_SIZE=32
def model_fn(features, labels, mode):
  model = tf.keras.Sequential([
      tf.keras.layers.Conv2D(32, 3, activation='relu', input_shape=(28, 28, 1)),
      tf.keras.layers.MaxPooling2D(),
      tf.keras.layers.Flatten(),
      tf.keras.layers.Dense(64, activation='relu'),
      tf.keras.layers.Dense(10, activation='softmax')
  ])
  logits = model(features, training=False)

  if mode == tf.estimator.ModeKeys.PREDICT:
    predictions = {'logits': logits}
    return tf.estimator.EstimatorSpec(labels=labels, predictions=predictions)

  optimizer = tf.compat.v1.train.GradientDescentOptimizer(
      learning_rate=LEARNING_RATE)
  loss = tf.keras.losses.SparseCategoricalCrossentropy(
      from_logits=True, reduction=tf.compat.v1.losses.Reduction.NONE)(labels, logits)
  loss = tf.reduce_sum(loss) * (1. / BATCH_SIZE)
  if mode == tf.estimator.ModeKeys.EVAL:
    return tf.estimator.EstimatorSpec(mode, loss=loss)

  return tf.estimator.EstimatorSpec(
      mode=mode,
      loss=loss,
      train_op=optimizer.minimize(
          loss, tf.compat.v1.train.get_or_create_global_step()))

#定义多少步保存模型,多少步打印日志信息等,注意,分布式训练关键在于-train_distribute=strategy
run_config = tf.estimator.RunConfig(keep_checkpoint_max=1,
               log_step_count_steps=10,train_distribute=strategy)
#输入model_fn,模型保存路径
classifier = tf.estimator.Estimator(model_fn=model_fn,model_dir="./model_test_fashion_mnist",config=run_config)

tf.estimator.train_and_evaluate(
    classifier,
    train_spec=tf.estimator.TrainSpec(input_fn=lambda :input_fn(train_images,train_labels,tf.estimator.ModeKeys.TRAIN, 2),max_steps=30000),
    eval_spec=tf.estimator.EvalSpec(input_fn=lambda :input_fn(test_images,test_labels,tf.estimator.ModeKeys.EVAL, 2),steps=300)
)

,parameter server的:ps_dist_strategy_parameter_server.py

import tensorflow.compat.v1 as tf
tf.disable_eager_execution()
tf.logging.set_verbosity(tf.logging.INFO)
import tensorflow.keras as keras
import os
os.environ['CUDA_VISIBLE_DEVICES'] = '0,1,2,3' # 指定该代码文件的可见GPU为第一个和第二个
import numpy as np
gpus=tf.config.list_physical_devices('GPU')
print('*'*20, 'parameter server: ', gpus)#查看有多少个可用的GPU
tf.config.set_visible_devices(devices=[], device_type='GPU')
visible_devices = tf.config.get_visible_devices()
print('*'*20, 'parameter server:', visible_devices)#查看有多少个可用的GPU

fashion_mnist = tf.keras.datasets.fashion_mnist
(train_images, train_labels), (test_images, test_labels) = fashion_mnist.load_data()

train_images = train_images[..., None]
test_images = test_images[..., None]

# 获取[0,1]范围内的图像。
train_images = train_images / np.float32(255)
test_images = test_images / np.float32(255)

def input_fn(X,y,shuffle, batch_size):
    dataset = tf.data.Dataset.from_tensor_slices((X,y))
    if shuffle: 
        dataset = dataset.shuffle(buffer_size=100)
    dataset = dataset.repeat()
    dataset = dataset.batch(batch_size)
    return dataset

dataset=input_fn(train_images,train_labels,True, 4)
test_dataset=input_fn(test_images,test_labels,True, 4)

# tf2.0需先配置cluster_resolver(即TF_CONFIG),否则报错
import json
os.environ["TF_CONFIG"] = json.dumps({
    "cluster": {
        "chief":["127.0.0.1:5000"],#调度节点
        "worker": ["127.0.0.1:5001"], #计算节点
        "ps": ["127.0.0.1:5002"]#参数服务器节点,可不必使用GPU
    },
   "task": {"type": "ps", "index": 0} #定义本进程为worker节点,即["127.0.0.1:5001"]为计算节点
})
#定义ParameterServerStrategy策略即可
strategy = tf.distribute.experimental.ParameterServerStrategy()

LEARNING_RATE = 1e-3
BATCH_SIZE=32
def model_fn(features, labels, mode):
  model = tf.keras.Sequential([
      tf.keras.layers.Conv2D(32, 3, activation='relu', input_shape=(28, 28, 1)),
      tf.keras.layers.MaxPooling2D(),
      tf.keras.layers.Flatten(),
      tf.keras.layers.Dense(64, activation='relu'),
      tf.keras.layers.Dense(10, activation='softmax')
  ])
  logits = model(features, training=False)

  if mode == tf.estimator.ModeKeys.PREDICT:
    predictions = {'logits': logits}
    return tf.estimator.EstimatorSpec(labels=labels, predictions=predictions)

  optimizer = tf.compat.v1.train.GradientDescentOptimizer(
      learning_rate=LEARNING_RATE)
  loss = tf.keras.losses.SparseCategoricalCrossentropy(
      from_logits=True, reduction=tf.compat.v1.losses.Reduction.NONE)(labels, logits)
  loss = tf.reduce_sum(loss) * (1. / BATCH_SIZE)
  if mode == tf.estimator.ModeKeys.EVAL:
    return tf.estimator.EstimatorSpec(mode, loss=loss)

  return tf.estimator.EstimatorSpec(
      mode=mode,
      loss=loss,
      train_op=optimizer.minimize(
          loss, tf.compat.v1.train.get_or_create_global_step()))

#定义多少步保存模型,多少步打印日志信息等,注意,分布式训练关键在于-train_distribute=strategy
run_config = tf.estimator.RunConfig(keep_checkpoint_max=1,
               log_step_count_steps=10,train_distribute=strategy)
#输入model_fn,模型保存路径
classifier = tf.estimator.Estimator(model_fn=model_fn,model_dir="./model_test_fashion_mnist",config=run_config)

tf.estimator.train_and_evaluate(
    classifier,
    train_spec=tf.estimator.TrainSpec(input_fn=lambda :input_fn(train_images,train_labels,tf.estimator.ModeKeys.TRAIN, 2),max_steps=30000),
    eval_spec=tf.estimator.EvalSpec(input_fn=lambda :input_fn(test_images,test_labels,tf.estimator.ModeKeys.EVAL, 2),steps=300)
)

和worker的:ps_dist_strategy_worker.py

import tensorflow.compat.v1 as tf
tf.disable_eager_execution()
tf.logging.set_verbosity(tf.logging.INFO)
import tensorflow.keras as keras
import os
os.environ['CUDA_VISIBLE_DEVICES'] = '0,1,2,3' # 指定该代码文件的可见GPU为第一个和第二个
os.environ['HIP_VISIBLE_DEVICES'] = '0,1,2,3'
import numpy as np
print(tf.__version__)#查看tf版本
gpus=tf.config.list_physical_devices('GPU')
print(gpus)#查看有多少个可用的GPU

fashion_mnist = tf.keras.datasets.fashion_mnist

(train_images, train_labels), (test_images, test_labels) = fashion_mnist.load_data()

# 向数组添加维度 -> 新的维度 == (28, 28, 1)
# 我们这样做是因为我们模型中的第一层是卷积层
# 而且它需要一个四维的输入 (批大小, 高, 宽, 通道).
# 批大小维度稍后将添加。
train_images = train_images[..., None]
test_images = test_images[..., None]

# 获取[0,1]范围内的图像。
train_images = train_images / np.float32(255)
test_images = test_images / np.float32(255)

def input_fn(X,y,shuffle, batch_size):
    dataset = tf.data.Dataset.from_tensor_slices((X,y))
    if shuffle: 
        ##### Modified by Ngaiman Chow on 2023-4-3 for try to guess if it consumes CUDA memory and cause the OOM error
        ###dataset = dataset.shuffle(buffer_size=100000)
        dataset = dataset.shuffle(buffer_size=100)
        ##### Modified by Ngaiman Chow on 2023-4-3 for try to guess if it consumes CUDA memory and cause the OOM error
    dataset = dataset.repeat()
    dataset = dataset.batch(batch_size)
    return dataset

dataset=input_fn(train_images,train_labels,True, 4)
test_dataset=input_fn(test_images,test_labels,True, 4)


# tf2.0需先配置cluster_resolver(即TF_CONFIG),否则报错
import json
os.environ["TF_CONFIG"] = json.dumps({
    "cluster": {
        "chief":["127.0.0.1:5000"],#调度节点
        "worker": ["127.0.0.1:5001"], #计算节点
        "ps": ["127.0.0.1:5002"]#参数服务器节点,可不必使用GPU
    },
   "task": {"type": "worker", "index": 0} #定义本进程为worker节点,即["127.0.0.1:5001"]为计算节点
})
#定义ParameterServerStrategy策略即可
strategy = tf.distribute.experimental.ParameterServerStrategy()

LEARNING_RATE = 1e-3
BATCH_SIZE=32
def model_fn(features, labels, mode):
  model = tf.keras.Sequential([
      tf.keras.layers.Conv2D(32, 3, activation='relu', input_shape=(28, 28, 1)),
      tf.keras.layers.MaxPooling2D(),
      tf.keras.layers.Flatten(),
      tf.keras.layers.Dense(64, activation='relu'),
      tf.keras.layers.Dense(10, activation='softmax')
  ])
  logits = model(features, training=False)

  if mode == tf.estimator.ModeKeys.PREDICT:
    predictions = {'logits': logits}
    return tf.estimator.EstimatorSpec(labels=labels, predictions=predictions)

  optimizer = tf.compat.v1.train.GradientDescentOptimizer(
      learning_rate=LEARNING_RATE)
  loss = tf.keras.losses.SparseCategoricalCrossentropy(
      from_logits=True, reduction=tf.compat.v1.losses.Reduction.NONE)(labels, logits)
  loss = tf.reduce_sum(loss) * (1. / BATCH_SIZE)
  if mode == tf.estimator.ModeKeys.EVAL:
    return tf.estimator.EstimatorSpec(mode, loss=loss)

  return tf.estimator.EstimatorSpec(
      mode=mode,
      loss=loss,
      train_op=optimizer.minimize(
          loss, tf.compat.v1.train.get_or_create_global_step()))

#定义多少步保存模型,多少步打印日志信息等,注意,分布式训练关键在于-train_distribute=strategy
run_config = tf.estimator.RunConfig(keep_checkpoint_max=1,
               log_step_count_steps=10,train_distribute=strategy)
#输入model_fn,模型保存路径
classifier = tf.estimator.Estimator(model_fn=model_fn,model_dir="./model_test_fashion_mnist",config=run_config)

tf.estimator.train_and_evaluate(
    classifier,
    train_spec=tf.estimator.TrainSpec(input_fn=lambda :input_fn(train_images,train_labels,tf.estimator.ModeKeys.TRAIN, 2),max_steps=30000),
    eval_spec=tf.estimator.EvalSpec(input_fn=lambda :input_fn(test_images,test_labels,tf.estimator.ModeKeys.EVAL, 2),steps=300)
)

当分别在3个命令行窗口中执行:
python ps_dist_strategy_chief.py

python ps_dist_strategy_parameter_sever.py

python ps_dist_strategy_worker.py
时,chief和worker会报CUDA_ERROR_OUT_OF_MEMORY错误。运行环境是在4块Nvidia A100 40G显卡的单主机上。请问怎么修改才能够让它不报CUDA_ERROR_OUT_OF_MEMORY错误呢?

  • 写回答

7条回答 默认 最新

  • zuogle 2023-04-03 20:03
    关注

    该回答引用ChatGPT GPT-4

    出现 CUDA_ERROR_OUT_OF_MEMORY 错误通常是因为显存不足。在你的代码中,由于你在每个进程中都设置了 os.environ['CUDA_VISIBLE_DEVICES'] = '0,1,2,3',这会导致每个进程都试图在所有 4 个 GPU 上运行,从而消耗大量的显存。为了避免这个问题,你应该让每个进程仅在一个特定的 GPU 上运行。以下是一些建议:

    1、为每个进程分配一个单独的 GPU。在每个脚本中,根据任务类型设置 os.environ['CUDA_VISIBLE_DEVICES']:

    在 ps_dist_strategy_chief.py 中:

    os.environ['CUDA_VISIBLE_DEVICES'] = '0'
    
    

    在 ps_dist_strategy_parameter_server.py 中:

    
    os.environ['CUDA_VISIBLE_DEVICES'] = '1'
    
    

    在 ps_dist_strategy_worker.py 中:

    os.environ['CUDA_VISIBLE_DEVICES'] = '2'
    
    

    如果你有更多的 worker,可以为每个 worker 分配一个单独的 GPU。

    2、减小批量大小。将 BATCH_SIZE 从 32 减小到一个较小的值,如 16 或 8,这将减少每个 GPU 上使用的显存量。

    3、请注意,在 TensorFlow 2.x 中,tf.compat.v1.disable_eager_execution() 已被弃用。建议您更新代码以适应 TensorFlow 2.x 的最佳实践。在 TensorFlow 2.x 中,可以使用 tf.distribute API 进行分布式训练。以下是一个使用 tf.distribute.experimental.MultiWorkerMirroredStrategy 的示例:

    
    strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy()
    
    

    在此示例中,我们使用了 MultiWorkerMirroredStrategy,这是一个同步分布式策略,它在所有工作节点上复制所有变量。

    本回答被题主选为最佳回答 , 对您是否有帮助呢?
    评论
查看更多回答(6条)

报告相同问题?

问题事件

  • 系统已结题 4月17日
  • 已采纳回答 4月9日
  • 赞助了问题酬金200元 4月4日
  • 创建了问题 4月3日

悬赏问题

  • ¥15 delphi webbrowser组件网页下拉菜单自动选择问题
  • ¥15 linux驱动,linux应用,多线程
  • ¥20 我要一个分身加定位两个功能的安卓app
  • ¥15 基于FOC驱动器,如何实现卡丁车下坡无阻力的遛坡的效果
  • ¥15 IAR程序莫名变量多重定义
  • ¥15 (标签-UDP|关键词-client)
  • ¥15 关于库卡officelite无法与虚拟机通讯的问题
  • ¥15 目标检测项目无法读取视频
  • ¥15 GEO datasets中基因芯片数据仅仅提供了normalized signal如何进行差异分析
  • ¥100 求采集电商背景音乐的方法