While running stage-1 inference of BLIP-2 on the MindSpore framework, the following error occurred:
[ERROR] ANALYZER(3243863,ffffaa423010,python):2024-04-24-20:34:01.769.582 [mindspore/ccsrc/pipeline/jit/ps/static_analysis/async_eval_result.cc:70] HandleException] Exception happened, check the information as below.
TypeError: must be real number, not NoneType
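For reference, in plain Python this exact message appears when None is passed where a C-level float is required, for example:

import math
scale = None  # hypothetical stand-in for a config field that was never set
math.exp(scale)  # TypeError: must be real number, not NoneType

So my working assumption (not yet confirmed) is that some scalar value in the config resolves to None and reaches a scalar computation during graph analysis.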
The code is as follows:
class BaseTrainer:
def __init__(self, task: str = None, model_name: str = None):
if model_name is None:
model_name = "model name unspecified."
if task is None:
task = "task name unspecified."
logger.info("Now Running Task is: %s, Model is: %s", task, model_name)
self.model_name = model_name
self.task = task
self.config = None
self.default_task_config = None
self.train_dataset = None
self.eval_dataset = None
self.network = None
self.optimizer = None
self.image_processor = None
self.audio_processor = None
self.tokenizer = None
self.callbacks = None
self.eval_callbacks = None
self.compute_metrics = None
self.kwargs = None
self.pipeline_task = None
if task not in SUPPORT_TASKS.keys():
logger.warning("Input task name is not in the supported list or unspecified.")
if task in SUPPORT_TASKS.keys() and model_name not in SUPPORT_TASKS.get(task).keys():
model_name_support_list = list(MindFormerBook().get_model_name_support_list_for_task(task))
model_name_support_list.sort()
logger.warning("Input model name is not in the supported list or unspecified.")
logger.warning("See the list of supported task and model name: %s", model_name_support_list)
logger.warning("The default model config: %s will now be used for the %s task ",
SUPPORT_TASKS.get(self.task).get("common"), task)
self.model_name = "common"
def set_config(self,
config: Optional[Union[dict, str, ConfigArguments, TrainingArguments]] = None,
is_full_config: bool = False):
"""Set the task config for task trainer."""
if config is not None:
if is_full_config:
self.config = config
else:
self.setup_task_config()
self.config = self._merge_config(config)
else:
self.setup_task_config()
self.config = self.default_task_config
build_parallel_config(self.config)
self._check_grad_accumulation_steps()
self._check_global_batch_size_for_auto_parallel()
return self.config
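    # A minimal usage sketch of set_config (the task/model names and the YAML
    # path below are illustrative placeholders, not values confirmed by this
    # report):
    #   trainer = BaseTrainer(task="contrastive_language_image_pretrain",
    #                         model_name="blip2_stage1_vit_g")
    #   config = trainer.set_config("path/to/run_blip2_stage1.yaml")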
def setup_task_config(self):
"""Setup the default task config."""
task_config = None
if self.task in SUPPORT_TASKS.keys() and self.model_name in SUPPORT_TASKS.get(self.task).keys():
task_config = MindFormerConfig(SUPPORT_TASKS.get(self.task).get(self.model_name))
if isinstance(task_config, MindFormerConfig):
self.default_task_config = task_config
else:
logger.warning("If the default config arguments is not specified,"
"you must define the required model, optimizer, and so on"
"in the train or evaluate or predict attribute function.")
def _reset_wrapper_for_pipeline_parallel(self):
"""Reset wrapper when pipeline parallel."""
if self.config.runner_wrapper is not None:
self.config.runner_wrapper.type = "MFPipelineWithLossScaleCell" \
if self.config.runner_wrapper.type != "MFPipelineWithLossScaleCell" else self.config.runner_wrapper.type
self.config.runner_wrapper.micro_batch_num = self.config.parallel_config.micro_batch_num
logger.warning(
"When using the pipeline parallel mode, "
"the MFPipelineWithLossScaleCell class is used by default.")
else:
logger.info(
"When using the pipeline parallel mode, "
"because the wrapper class is not specified, "
"MindSpore's built-in PipelineCell is used by default")
logger.info("PipelineWrapper under evaluate or predict mode will not take effect.")
def _reset_wrapper_for_grad_accu(self):
"""Reset wrapper when using grad accumulation."""
if self.config.runner_wrapper is not None:
self.config.runner_wrapper.type = "MFPipelineWithLossScaleCell" \
if self.config.runner_wrapper.type != "MFPipelineWithLossScaleCell" else self.config.runner_wrapper.type
else:
self.config.runner_wrapper.type = "MFPipelineWithLossScaleCell"
logger.warning(
"When using the gradient_accumulation_steps in semi/auto parallel mode, "
"the MFPipelineWithLossScaleCell class is used by default.")
def _reset_dataset_batch_size(self):
"""Reset dataset batch size according to the global batch size of runner config."""
check_dataset_config(self.config)
def _merge_config(self, args):
"""Merge config from default task config."""
if self.default_task_config is None:
logger.warning("default task config is None, you will not be able to merge config parameters.")
return args
if isinstance(args, dict):
self.default_task_config.merge_from_dict(args)
elif isinstance(args, str):
if not (os.path.realpath(args) and os.path.exists(args)):
raise FileNotFoundError(f"config path must be exist, but get {args}.")
if not args.endswith(('.yaml', '.yml')):
raise ValueError(f"config file must be end with .yaml or .yml, but get {args}")
self.default_task_config = MindFormerConfig(args)
elif isinstance(args, ConfigArguments):
if hasattr(args, 'train_dataset'):
check_train_data_loader_type(args, self.default_task_config)
if hasattr(args, 'eval_dataset'):
check_eval_data_loader_type(args, self.default_task_config)
if hasattr(args, 'optimizer'):
check_optimizer_and_lr_type(args, self.default_task_config)
if hasattr(args, 'runner_wrapper'):
check_wrapper_config(args, self.default_task_config)
self.default_task_config.merge_from_dict(args.__dict__)
elif isinstance(args, TrainingArguments):
args.convert_args_to_mindformers_config(self.default_task_config)
else:
            logger.warning(
                "The type of config parameter to merge is not supported; "
                "currently supported types are [dict, config_path(str), ConfigArguments, TrainingArguments], "
                "but got %s", type(args))
return self.default_task_config
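    # Sketch of the merge precedence implemented above (inferred from the
    # branches; the dict keys and path are hypothetical): a dict is merged
    # into the default task config, while a YAML path replaces it wholesale.
    #   trainer.set_config({"runner_config": {"epochs": 1}})   # merged into default
    #   trainer.set_config("my_config.yaml")                   # replaces default config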
def create_dataset(self, is_train: bool = True, default_args: dict = None):
"""Create the dataset for training or evaluate."""
dataset = self.train_dataset if is_train else self.eval_dataset
dataset_task = self.config.train_dataset_task if is_train else self.config.eval_dataset_task
if isinstance(dataset, (BaseDataset, Dataset)):
return dataset
if dataset is None:
dataset = build_dataset(dataset_task, default_args=default_args)
elif check_dataset_iterable(dataset):
dataset_task.type = 'GeneralDataset'
default_args = {} if default_args is None else default_args
default_args["dataset"] = dataset
dataset = build_dataset(dataset_task, default_args=default_args)
else:
raise ValueError("Dataset should be Dataset, iterator, iterable class which has `__iter__`, "
f"iterable class which has `__get_item__` and `__len__`, but get {dataset}.")
return dataset
def create_train_dataset(self, default_args: dict = None):
"""Create the train dataset for training."""
logger.info(".........Build Dataset From Config..........")
self._reset_dataset_batch_size()
train_dataset = self.create_dataset(is_train=True, default_args=default_args)
return train_dataset
def create_eval_dataset(self, default_args: dict = None):
"""Create the eval dataset for evaluate."""
logger.info(".........Build Dataset From Config..........")
self._reset_dataset_batch_size()
parallel_mode = ms.get_auto_parallel_context("parallel_mode")
pipeline_stages = ms.get_auto_parallel_context("pipeline_stages")
if parallel_mode in ["semi_auto_parallel", "auto_parallel"] and pipeline_stages > 1:
self.config.eval_dataset.batch_size = \
self.config.eval_dataset.batch_size // self.config.parallel_config.micro_batch_num
if parallel_mode in ["semi_auto_parallel", "auto_parallel"]:
self.config.eval_dataset.batch_size = \
self.config.eval_dataset.batch_size // self.config.micro_batch_interleave_num
        # reduce batch size because gradient_accumulation_steps does not take effect on eval
if self.config.runner_config.gradient_accumulation_steps > 1:
self.config.eval_dataset.batch_size = \
self.config.eval_dataset.batch_size // self.config.runner_config.gradient_accumulation_steps
if self.config.eval_dataset.batch_size < 1:
logger.warning("eval_dataset batch_size is less than 1 after bs calculate, reset batch_size to 1, "
"please check your configs about batch_size, micro_batch_num micro_batch_interleave_num "
"and gradient_accumulation_steps.")
self.config.eval_dataset.batch_size = 1
logger.info("For evaluate phase, batch size for eval dataset is %s, different from training, "
"not multiplied by micro_batch_num, micro_batch_interleave_num and gradient_accumulation_steps",
self.config.eval_dataset.batch_size)
eval_dataset = self.create_dataset(is_train=False, default_args=default_args)
return eval_dataset
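    # Worked example for the eval batch-size reductions above (hypothetical
    # numbers): with eval batch_size=64, micro_batch_num=4,
    # micro_batch_interleave_num=2 and gradient_accumulation_steps=2 under
    # semi/auto parallel with pipeline_stages > 1, the effective eval batch
    # size becomes 64 // 4 // 2 // 2 = 4.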
def create_network(self, default_args: dict = None):
"""Create the network for task trainer."""
logger.info(".........Build Network From Config..........")
return build_network(self.config.model, default_args=default_args)
def wrap_network_with_tool_cells(self, network):
"""For training process, warp the network with some tool cells."""
micro_batch_interleave_num = self.config.micro_batch_interleave_num
gradient_accumulation_steps = self.config.runner_config.gradient_accumulation_steps
parallel_mode = ms.context.get_auto_parallel_context("parallel_mode")
pp = self.get_pipeline_stages()
if micro_batch_interleave_num > 1:
logger.info("micro_batch_interleave_num > 1, the double copy parallel feature is turned on.")
network = MicroBatchInterleaved(network, micro_batch_interleave_num)
if gradient_accumulation_steps > 1 and not pp > 1:
logger.info("gradient_accumulation_steps > 1, GradAccumulationCell is wrapped on network. "
"It is suggested to use `Lazy Inline` feature to save compiling time.")
network = GradAccumulationCell(network, gradient_accumulation_steps)
if pp > 1:
micro_batch_num = self.config.parallel_config.micro_batch_num
network = PipelineCell(network, micro_size=micro_batch_num)
if parallel_mode in ["semi_auto_parallel", "auto_parallel"]:
network = _VirtualDatasetCell(network)
return network
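    # Wrapping order applied above (each later wrap encloses the earlier one):
    #   MicroBatchInterleaved   (if micro_batch_interleave_num > 1)
    #   -> GradAccumulationCell (if gradient_accumulation_steps > 1 and no pipeline)
    #   -> PipelineCell         (if pipeline_stages > 1)
    #   -> _VirtualDatasetCell  (in semi/auto parallel mode)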
def wrap_eval_network_with_tool_cells(self, network):
"""For evaluate in training process, warp the network with some tool cells."""
parallel_mode = ms.context.get_auto_parallel_context("parallel_mode")
if parallel_mode in ["semi_auto_parallel", "auto_parallel"]:
network = _VirtualDatasetCell(network)
return network
def create_image_processor(self, default_args: dict = None):
"""Create the image processor for predict."""
logger.info(".........Build Image Processor From Config..........")
self.image_processor = build_processor(
self.config.processor.image_processor, default_args=default_args)
return self.image_processor
def create_optimizer_scheduler(self, network, layer_scale=False):
"""Create the optimizer for training."""
logger.info(".........Build Optimizer From Config..........")
# learning rate scale for multi-nodes training
learning_scale = self.config.lr_scale
scale_factor = self.config.lr_scale_factor
# build learning rate schedule
lr_schedule = self.create_lr_scheduler(learning_scale, scale_factor)
weight_decay = self.config.optimizer.weight_decay if self.config.optimizer.weight_decay else 0.
layer_decay = self.config.layer_decay if self.config.layer_decay else 1.0
group_params = get_optimizer_grouped_parameters(network,
weight_decay,
lr_schedule,
layer_scale=layer_scale,
layer_decay=layer_decay)
if lr_schedule is not None:
self.optimizer = build_optim(
self.config.optimizer,
default_args={"params": group_params,
"learning_rate": lr_schedule})
else:
if self.config.optimizer.learning_rate is None:
raise ValueError("learning_rate must be input")
self.config.optimizer.learning_rate = self.learning_rate_scale(
self.config.optimizer.learning_rate, scale_factor) \
if learning_scale and scale_factor is not None else self.config.optimizer.learning_rate
self.optimizer = build_optim(
self.config.optimizer,
default_args={"params": group_params})
return self.optimizer
def create_lr_scheduler(self, learning_scale: bool = False, scale_factor: int = 256):
"""Create the learning rate scheduler."""
logger.info(".........Build LR Schedule From Config..........")
train_data_size = self.get_train_data_size()
if self.config.lr_schedule:
warmup_epochs = self.config.lr_schedule.pop("warmup_epochs", None)
warmup_ratio = self.config.lr_schedule.pop("warmup_ratio", None)
if not self.config.runner_config.sink_mode:
total_steps = int(self.config.runner_config.epochs * train_data_size)
else:
total_steps = int(self.config.runner_config.epochs * self.config.runner_config.sink_size)
if warmup_epochs is not None and warmup_ratio is not None:
logger.warning("warmup_epochs and warmup_ratio are set simultaneously,"
"warmup_ratio takes precedence.")
warmup_epochs = None
if warmup_epochs is not None:
logger.warning("warmup_epochs was set in lr_schedule,"
"it will multiply the data size to represent the warmup steps")
self.config.lr_schedule.warmup_steps = int(warmup_epochs * train_data_size)
if warmup_ratio is not None:
self.config.lr_schedule.warmup_steps = int(total_steps * warmup_ratio)
self.config.lr_schedule.total_steps = total_steps \
if self.config.lr_schedule.total_steps is None or self.config.lr_schedule.total_steps == -1 \
else int(self.config.lr_schedule.total_steps)
self.config.lr_schedule.learning_rate = self.learning_rate_scale(
self.config.lr_schedule.learning_rate, scale_factor) \
if learning_scale and scale_factor is not None else self.config.lr_schedule.learning_rate
lr_schedule = build_lr(self.config.lr_schedule)
return lr_schedule
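    # Worked example for the step bookkeeping above (illustrative numbers):
    # with epochs=10, train_data_size=1000 and sink_mode=False,
    # total_steps = 10 * 1000 = 10000; warmup_ratio=0.1 then gives
    # warmup_steps = int(10000 * 0.1) = 1000.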
def create_model_wrapper(self, network, optimizer):
"""Create the model wrapper for training."""
logger.info(".........Build Model Wrapper for Train From Config..........")
model_wrapper = build_wrapper(self.config.runner_wrapper,
default_args={"network": network,
"optimizer": optimizer,
"parallel_config": self.config.parallel_config})
return model_wrapper
def create_callbacks(self, default_args: dict = None):
"""Create the callback list for training."""
logger.info(".........Build Callbacks for Train From Config..........")
self.callbacks = []
if self.config.profile:
self.callbacks.append(self.config.profile_cb)
self.callbacks.extend(build_callback(self.config.callbacks, default_args=default_args))
return self.callbacks
def create_eval_callbacks(self, default_args: dict = None):
"""Create the eval callback list for training."""
logger.info(".........Build Callbacks for Evaluate From Config..........")
self.eval_callbacks = []
self.eval_callbacks.extend(build_callback(self.config.eval_callbacks, default_args=default_args))
return self.eval_callbacks
def create_metrics(self, metric_name: str = None):
"""Create Metrics For Evaluate or Fit."""
if self.compute_metrics is not None:
self.compute_metrics = get_metrics(self.compute_metrics)
return self.compute_metrics
if self.config.metric is None:
raise ValueError("When `do_eval` is True and `compute_metrics` is None, \
the config of metric must not be None.")
if isinstance(self.config.metric, dict):
self.config.metric = [self.config.metric]
self.compute_metrics = {}
for metric_config in self.config.metric:
assert "type" in metric_config, "The type of metric is not found!"
metric = build_metric(metric_config)
if metric_name is None:
metric_name = metric.__class__.__name__
self.compute_metrics[metric_name] = metric
return self.compute_metrics
def count_parameters(self):
"""Count network parameters number."""
if self.network is not None:
logger.info("Network Parameters: %s M.", str(count_params(self.network)))
else:
logger.warning("Network is None, parameters incalculable.")
def set_seed(self, seed: int = None):
"""Set seed for training."""
if seed is None:
if self.config.seed is None:
raise ValueError("seed is not set in config, it is None.")
set_seed(self.config.seed)
else:
set_seed(seed)
def set_train_dataset(self, dataset):
"""Set the attribute of train dataset."""
if dataset is None:
raise ValueError("Train dataset is None")
self.train_dataset = dataset
def set_eval_dataset(self, dataset):
"""Set the attribute of eval dataset ."""
if dataset is None:
raise ValueError("Eval dataset is None")
self.eval_dataset = dataset
def set_network(self, network, is_train: bool = True):
"""Set the attribute of network."""
if network is None:
raise ValueError("network is None")
if isinstance(network, (Cell, PreTrainedModel)):
network.set_train(is_train)
self.network = network
def get_train_data_size(self):
"""Get train dataset size."""
if self.train_dataset is None:
raise NotImplementedError("train dataset is None")
return self.train_dataset.get_dataset_size()
def get_eval_data_size(self):
"""Get eval dataset size."""
if self.eval_dataset is None:
raise NotImplementedError("train dataset is None")
return self.eval_dataset.get_dataset_size()
def get_pipeline_stages(self):
"""Get pipeline stages for task trainer."""
pipeline_stages = ms.get_auto_parallel_context("pipeline_stages")
return pipeline_stages
def learning_rate_scale(self, base_learning_rate: float = 0., scale_factor: Optional[Union[float, int]] = 256.):
"""Scale learning rate for training."""
if not isinstance(base_learning_rate, float):
raise ValueError(f"learning rate must be float type, but get {type(base_learning_rate)}")
if not isinstance(scale_factor, (float, int)):
raise ValueError(f"scale_factor must be float or int type, but get {type(scale_factor)}")
device_num = get_real_group_size()
per_device_batch_size = self.config.train_dataset.batch_size
learning_rate = (base_learning_rate * device_num * per_device_batch_size) / scale_factor
return learning_rate
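    # Worked example of the linear scaling rule above (illustrative numbers):
    # base_learning_rate=1e-4, 8 devices, per-device batch_size=32,
    # scale_factor=256 -> lr = 1e-4 * 8 * 32 / 256 = 1e-4.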
def training_process(
self,
config: Optional[Union[dict, MindFormerConfig, ConfigArguments, TrainingArguments]] = None,
network: Optional[Union[Cell, PreTrainedModel]] = None,
dataset: Optional[Union[BaseDataset, GeneratorDataset]] = None,
optimizer: Optional[Optimizer] = None,
callbacks: Optional[Union[Callback, List[Callback]]] = None,
compute_metrics: Optional[Union[dict, set]] = None,
**kwargs):
"""Train or Fine-tune for BaseTrainer in MindFormers."""
self.kwargs = kwargs
self.train_dataset = dataset if dataset else self.train_dataset
self.eval_dataset = kwargs.get('eval_dataset', None)
self.compute_metrics = compute_metrics if compute_metrics else self.compute_metrics
is_full_config = kwargs.get("is_full_config", False)
config = self.set_config(config, is_full_config)
# build dataset
logger.info(".........Build Dataset For Train..........")
        # import pdb; pdb.set_trace()  # breakpoint
dataset = self.create_train_dataset()
logger.info("Create train dataset finish, dataset size:%d", dataset.get_dataset_size())
append_info = None
if config.resume_training and config.load_checkpoint:
logger.info(".............Start load resume context from checkpoint..................")
load_resume_context_from_checkpoint(config, dataset)
resume_dict = {
"step_num": config.runner_config.initial_step,
"epoch_num": config.runner_config.initial_epoch,
}
if config.runner_wrapper.scale_sense is not None:
if hasattr(config.runner_wrapper.scale_sense, 'loss_scale_value'):
resume_dict["loss_scale"] = config.runner_wrapper.scale_sense.loss_scale_value
else:
resume_dict["loss_scale"] = config.runner_wrapper.scale_sense
logger.info("initial epoch: %d", config.runner_config.initial_epoch)
logger.info("initial step: %d", config.runner_config.initial_step)
append_info = [resume_dict]
dataset.set_init_step(config.runner_config.initial_step)
else:
config.runner_config.initial_epoch = 0
config.runner_config.initial_step = 0
self.set_train_dataset(dataset)
check_runner_config(config, dataset)
# check rules
check_rules(config, mode='train', network=network, dataset=dataset)
# build network
logger.info(".........Build Net For Train..........")
if network is None and self.network is None:
network = self.create_network(
default_args={"parallel_config": config.parallel_config,
"moe_config": config.moe_config})
elif network is None and self.network is not None:
logger.info(".........Using The Existing Network For Train:: %s", self.network.__class__.__name__)
network = self.network
self._check_training_network_no_use_past(network)
eval_network = None
if network is not None:
eval_network = network
# warp network for training
network = self.wrap_network_with_tool_cells(eval_network)
eval_network = self.wrap_eval_network_with_tool_cells(eval_network)
self.set_network(network, is_train=True)
self.count_parameters()
# build optimizer
logger.info(".........Build Optimizer For Train..........")
if optimizer is None:
optimizer = self.create_optimizer_scheduler(network, layer_scale=config.layer_scale)
# build model wrapper
logger.info(".........Build Running Wrapper From Config For Train..........")
wrapper = self.create_model_wrapper(network, optimizer)
# build callback
logger.info(".........Build Callbacks For Train..........")
default_callbacks = []
if self.config.profile:
default_callbacks.append(self.config.profile_cb)
for callback in self.config.callbacks:
default_args = None
if "type" in callback and callback["type"] == "MFLossMonitor":
default_args = {
"learning_rate": optimizer.learning_rate if optimizer else wrapper.optimizer.learning_rate,
"origin_epochs": config.runner_config.origin_epochs,
"dataset_size": config.data_size,
"micro_batch_interleave_num": config.micro_batch_interleave_num,
"micro_batch_num": config.parallel_config.micro_batch_num,
"initial_epoch": config.runner_config.initial_epoch,
"initial_step": config.runner_config.initial_step,
"global_batch_size": self.global_batch_size,
"gradient_accumulation_steps": self.config.runner_config.gradient_accumulation_steps
}
elif "type" in callback and callback["type"] == "CheckpointMointor":
default_args = {"append_info": append_info}
default_callbacks.append(build_callback(callback, default_args=default_args))
if callbacks is not None:
if isinstance(callbacks, list):
default_callbacks.extend(callbacks)
if isinstance(callbacks, Callback):
default_callbacks.append(callbacks)
callbacks = default_callbacks
# define compute metrics for evaluate in training
if config.do_eval:
compute_metrics = self.create_metrics()
# define Model and begin training
logger.info(".........Starting Init Train Model..........")
if wrapper is not None:
model = Model(wrapper, metrics=compute_metrics, eval_network=eval_network)
else:
model = Model(network, optimizer=optimizer, metrics=compute_metrics, eval_network=eval_network)
logger.info(".........Starting Training Model..........")
if get_real_rank() % 8 == 0:
pprint(config)
logger.info(".........Model Compiling, Please Wait a Moment...........")
model.train(config.runner_config.epochs, dataset,
callbacks=callbacks,
dataset_sink_mode=config.runner_config.sink_mode,
sink_size=config.runner_config.sink_size,
initial_epoch=config.runner_config.initial_epoch)
logger.info(".........Training Over!.............")