问题:用python实现麻雀算法解决柔性车间调度问题,运行Brandimarte算例中的mk01算例时,运行多次得到的结果总是在70左右,比正常基础麻雀算法运行的40多的优化结果相差较大,但用代码运行小规模的车间调度问题所到的的结果是在正常范围值内的,所以不知道是哪里出了问题,跪qiu社区编程神们看一下我的代码中的参数以及逻辑,哪里出了问题?
麻雀算法参数:
def sparrow_search_algorithm(problem, objective_func, dim, lb=0, ub=1,
n_population=80, p_percent=0.4, max_iter=2000):
"""改进的麻雀搜索算法"""
# 初始化
n_p = int(n_population * p_percent)
alert_percent = 0.15
sdt = 0.7
完整的代码(包括柔性车间调度完整代码和麻雀算法完整代码):
import numpy as np
import matplotlib.pyplot as plt
import copy
from typing import List, Tuple, Dict, Union
from collections import defaultdict
from itertools import combinations
import os
# 设置中文显示
plt.rcParams['font.sans-serif'] = ['SimHei']
plt.rcParams['axes.unicode_minus'] = False
class FJSP_Problem:
def __init__(self, jobs: List[List[List[Tuple[int, Union[int, float]]]]], machine_count: int):
self.jobs = jobs
self.machine_count = machine_count
self.num_jobs = len(jobs)
self.total_operations = sum(len(job) for job in jobs)
def read_fjsp_from_text(file_path: str) -> FJSP_Problem:
"""读取FJSP问题数据"""
with open(file_path, 'r') as f:
lines = [line.strip() for line in f if line.strip() and not line.startswith('#')]
if len(lines) < 1:
raise ValueError("文件为空或格式不正确")
first_line = lines[0].split()
num_jobs = int(first_line[0])
machine_count = int(first_line[1])
jobs = []
line_ptr = 1
for job_idx in range(num_jobs):
if line_ptr >= len(lines):
raise ValueError(f"作业{job_idx+1}数据缺失")
parts = lines[line_ptr].split()
line_ptr += 1
num_operations = int(parts[0])
job_operations = []
data_ptr = 1
for _ in range(num_operations):
num_machines = int(parts[data_ptr])
data_ptr += 1
operation_options = []
for _ in range(num_machines):
if data_ptr+1 >= len(parts):
if line_ptr >= len(lines):
raise ValueError("机器数据不完整")
parts += lines[line_ptr].split()
line_ptr += 1
machine = int(parts[data_ptr]) - 1 # 转0-based
time = int(parts[data_ptr+1])
operation_options.append((machine, time))
data_ptr += 2
job_operations.append(operation_options)
jobs.append(job_operations)
return FJSP_Problem(jobs, machine_count)
def evaluate_schedule(problem: FJSP_Problem, schedule: List[List[int]]) -> float:
"""评估调度方案的最大完工时间"""
machine_time = [0] * problem.machine_count
job_time = [0] * problem.num_jobs
for job_idx, job in enumerate(problem.jobs):
for op_idx, op in enumerate(job):
choice = schedule[job_idx][op_idx]
if choice < 0 or choice >= len(op):
return float('inf')
machine, time = op[choice]
start = max(machine_time[machine], job_time[job_idx])
end = start + time
machine_time[machine] = job_time[job_idx] = end
return max(machine_time)
def find_critical_path(problem: FJSP_Problem, schedule: List[List[int]]) -> List[Tuple[int, int]]:
"""找出关键路径上的工序"""
num_jobs = problem.num_jobs
# 计算最早开始/结束时间
earliest_start = [[0]*len(job) for job in problem.jobs]
earliest_finish = [[0]*len(job) for job in problem.jobs]
for job_idx in range(num_jobs):
for op_idx in range(len(problem.jobs[job_idx])):
choice = schedule[job_idx][op_idx]
machine, time = problem.jobs[job_idx][op_idx][choice]
prev_finish = earliest_finish[job_idx][op_idx-1] if op_idx > 0 else 0
machine_available = max(
[earliest_finish[j][o] for j in range(num_jobs)
for o in range(len(problem.jobs[j]))
if schedule[j][o] == machine and (j != job_idx or o != op_idx)],
default=0
)
earliest_start[job_idx][op_idx] = max(prev_finish, machine_available)
earliest_finish[job_idx][op_idx] = earliest_start[job_idx][op_idx] + time
makespan = max(max(finish) for finish in earliest_finish)
# 计算最晚开始/结束时间
latest_finish = [[0]*len(job) for job in problem.jobs]
latest_start = [[0]*len(job) for job in problem.jobs]
for job_idx in reversed(range(num_jobs)):
for op_idx in reversed(range(len(problem.jobs[job_idx]))):
choice = schedule[job_idx][op_idx]
machine, time = problem.jobs[job_idx][op_idx][choice]
next_start = latest_start[job_idx][op_idx+1] if op_idx < len(problem.jobs[job_idx])-1 else makespan
machine_next_start = min(
[earliest_start[j][o] for j in range(num_jobs)
for o in range(len(problem.jobs[j]))
if schedule[j][o] == machine and earliest_start[j][o] > earliest_finish[job_idx][op_idx]],
default=makespan
)
latest_finish[job_idx][op_idx] = min(next_start, machine_next_start)
latest_start[job_idx][op_idx] = latest_finish[job_idx][op_idx] - time
# 识别关键路径
return [
(job_idx, op_idx)
for job_idx in range(num_jobs)
for op_idx in range(len(problem.jobs[job_idx]))
if earliest_start[job_idx][op_idx] == latest_start[job_idx][op_idx]
]
def initialize_heuristic_solution(problem):
"""增强的启发式初始化:SPT+负载均衡"""
schedule = []
machine_loads = [0] * problem.machine_count
for job in problem.jobs:
job_schedule = []
for op in job:
# 考虑机器负载的SPT策略
valid_machines = [(m_idx, m[1] + 0.5 * machine_loads[m[0]])
for m_idx, m in enumerate(op)]
best_machine = min(valid_machines, key=lambda x: x[1])[0]
job_schedule.append(best_machine)
machine_loads[op[best_machine][0]] += op[best_machine][1]
schedule.append(job_schedule)
return schedule
def decode_solution(x, problem):
"""改进的解码方式"""
schedule = []
ptr = 0
for job in problem.jobs:
job_schedule = []
for op in job:
if ptr >= len(x):
choice = np.argmin([m[1] for m in op]) # 默认SPT
else:
choice = int(np.clip(round(x[ptr] * (len(op)-1)), 0, len(op)-1))
job_schedule.append(choice)
ptr += 1
schedule.append(job_schedule)
return schedule
def local_search(schedule, problem):
"""增强的局部搜索:关键路径+瓶颈机器优化"""
best = copy.deepcopy(schedule)
makespan = evaluate_schedule(problem, best)
# 关键路径优化
critical_ops = find_critical_path(problem, best)
for job_idx, op_idx in critical_ops:
op = problem.jobs[job_idx][op_idx]
original_choice = best[job_idx][op_idx]
for new_choice in range(len(op)):
if new_choice != original_choice:
neighbor = copy.deepcopy(best)
neighbor[job_idx][op_idx] = new_choice
new_makespan = evaluate_schedule(problem, neighbor)
if new_makespan < makespan:
best = neighbor
makespan = new_makespan
# 瓶颈机器优化
machine_loads = [0] * problem.machine_count
for job_idx, job in enumerate(problem.jobs):
for op_idx, op in enumerate(job):
choice = best[job_idx][op_idx]
machine, time = op[choice]
machine_loads[machine] += time
bottleneck_machine = np.argmax(machine_loads)
for job_idx, job in enumerate(problem.jobs):
for op_idx, op in enumerate(job):
if best[job_idx][op_idx] == bottleneck_machine:
for new_choice in range(len(op)):
if op[new_choice][0] != bottleneck_machine:
neighbor = copy.deepcopy(best)
neighbor[job_idx][op_idx] = new_choice
new_makespan = evaluate_schedule(problem, neighbor)
if new_makespan < makespan:
best = neighbor
makespan = new_makespan
break
return best
def encode_solution(schedule, problem):
"""编码调度方案"""
x = []
for job_idx, job_schedule in enumerate(schedule):
for op_idx, choice in enumerate(job_schedule):
op = problem.jobs[job_idx][op_idx]
x.append(choice / (len(op) - 1 + 1e-10)) # 归一化到[0,1]
return np.array(x)
def sparrow_search_algorithm(problem, objective_func, dim, lb=0, ub=1,
n_population=80, p_percent=0.4, max_iter=2000):
"""改进的麻雀搜索算法"""
# 初始化
n_p = int(n_population * p_percent)
alert_percent = 0.15
sdt = 0.7
population = []
spt_schedule = initialize_heuristic_solution(problem)
population.append(encode_solution(spt_schedule, problem))
population.extend(np.random.uniform(lb, ub, (n_population-1, dim)))
population = np.array(population)
fitness = np.array([objective_func(ind) for ind in population])
best_idx = np.argmin(fitness)
best_position = population[best_idx].copy()
best_fitness = fitness[best_idx]
convergence = [best_fitness]
for iter in range(max_iter):
# 排序
sorted_idx = np.argsort(fitness)
population = population[sorted_idx]
fitness = fitness[sorted_idx]
# 生产者更新
R2 = np.random.rand()
if R2 < sdt:
for i in range(n_p):
scale = 1 / (iter + 1)
population[i] *= np.exp(-i / (np.random.rand() * max_iter))
else:
population[:n_p] += np.random.normal(0, 1, (n_p, dim)) * 0.1
# 跟随者更新
for i in range(n_p, n_population):
if i > n_population/2:
Q = np.random.rand()
population[i] = Q * np.exp((population[-1] - population[i]) / i**2)
else:
A = np.random.choice([1, -1], size=dim).reshape(1, -1)
step = (np.linalg.pinv(A).T @ np.ones((dim, 1))).flatten()
population[i] = best_position + np.abs(population[i] - best_position) * step
# 警戒者更新
alert_idx = np.random.choice(n_population, int(n_population * alert_percent), replace=False)
for i in alert_idx:
if np.random.rand() < sdt:
step = np.random.normal(0, 1) * np.abs(best_position - population[i])
population[i] += step
else:
K = np.random.uniform(-1, 1)
step = K * (np.abs(population[i] - population[-1]) /
(np.linalg.norm(population[i] - best_position) + 1e-10))
population[i] += step
# 边界处理
population = np.clip(population, lb, ub)
fitness = np.array([objective_func(ind) for ind in population])
# 精英保留与局部搜索
if iter % 100 == 0:
current_best = decode_solution(best_position, problem)
improved_schedule = local_search(current_best, problem)
improved_fitness = evaluate_schedule(problem, improved_schedule)
if improved_fitness < best_fitness:
best_position = encode_solution(improved_schedule, problem)
best_fitness = improved_fitness
population[-1] = best_position
fitness[-1] = best_fitness
# 更新全局最优
current_best_idx = np.argmin(fitness)
if fitness[current_best_idx] < best_fitness:
best_position = population[current_best_idx].copy()
best_fitness = fitness[current_best_idx]
convergence.append(best_fitness)
if iter % 100 == 0:
print(f"Iter {iter}, Best: {best_fitness:.2f}")
return best_position, best_fitness, convergence
def solve_fjsp(problem: FJSP_Problem):
def objective(x):
schedule = decode_solution(x, problem)
return evaluate_schedule(problem, schedule)
best_x, best_fit, convergence = sparrow_search_algorithm(
problem=problem,
objective_func=objective,
dim=problem.total_operations,
n_population=80,
p_percent=0.4,
max_iter=2000
)
return decode_solution(best_x, problem), best_fit, convergence