djlshijiu 2025-05-10 10:44 采纳率: 0%
浏览 9

问题求:麻雀算法实现柔性车间调度Brandimarte算例

问题:用python实现麻雀算法解决柔性车间调度问题,运行Brandimarte算例中的mk01算例时,运行多次得到的结果总是在70左右,比正常基础麻雀算法运行的40多的优化结果相差较大,但用代码运行小规模的车间调度问题所到的的结果是在正常范围值内的,所以不知道是哪里出了问题,跪qiu社区编程神们看一下我的代码中的参数以及逻辑,哪里出了问题?
麻雀算法参数:

def sparrow_search_algorithm(problem, objective_func, dim, lb=0, ub=1, 
                            n_population=80, p_percent=0.4, max_iter=2000):
    """改进的麻雀搜索算法"""
    # 初始化
    n_p = int(n_population * p_percent)
    alert_percent = 0.15
    sdt = 0.7

完整的代码(包括柔性车间调度完整代码和麻雀算法完整代码):

import numpy as np
import matplotlib.pyplot as plt
import copy
from typing import List, Tuple, Dict, Union
from collections import defaultdict
from itertools import combinations
import os

# 设置中文显示
plt.rcParams['font.sans-serif'] = ['SimHei']
plt.rcParams['axes.unicode_minus'] = False

class FJSP_Problem:
    def __init__(self, jobs: List[List[List[Tuple[int, Union[int, float]]]]], machine_count: int):
        self.jobs = jobs
        self.machine_count = machine_count
        self.num_jobs = len(jobs)
        self.total_operations = sum(len(job) for job in jobs)

def read_fjsp_from_text(file_path: str) -> FJSP_Problem:
    """读取FJSP问题数据"""
    with open(file_path, 'r') as f:
        lines = [line.strip() for line in f if line.strip() and not line.startswith('#')]
        
        if len(lines) < 1:
            raise ValueError("文件为空或格式不正确")
        
        first_line = lines[0].split()
        num_jobs = int(first_line[0])
        machine_count = int(first_line[1])
        
        jobs = []
        line_ptr = 1
        for job_idx in range(num_jobs):
            if line_ptr >= len(lines):
                raise ValueError(f"作业{job_idx+1}数据缺失")
                
            parts = lines[line_ptr].split()
            line_ptr += 1
            
            num_operations = int(parts[0])
            job_operations = []
            data_ptr = 1
            
            for _ in range(num_operations):
                num_machines = int(parts[data_ptr])
                data_ptr += 1
                
                operation_options = []
                for _ in range(num_machines):
                    if data_ptr+1 >= len(parts):
                        if line_ptr >= len(lines):
                            raise ValueError("机器数据不完整")
                        parts += lines[line_ptr].split()
                        line_ptr += 1
                    
                    machine = int(parts[data_ptr]) - 1  # 转0-based
                    time = int(parts[data_ptr+1])
                    operation_options.append((machine, time))
                    data_ptr += 2
                
                job_operations.append(operation_options)
            jobs.append(job_operations)
        
        return FJSP_Problem(jobs, machine_count)

def evaluate_schedule(problem: FJSP_Problem, schedule: List[List[int]]) -> float:
    """评估调度方案的最大完工时间"""
    machine_time = [0] * problem.machine_count
    job_time = [0] * problem.num_jobs
    
    for job_idx, job in enumerate(problem.jobs):
        for op_idx, op in enumerate(job):
            choice = schedule[job_idx][op_idx]
            if choice < 0 or choice >= len(op):
                return float('inf')
            
            machine, time = op[choice]
            start = max(machine_time[machine], job_time[job_idx])
            end = start + time
            machine_time[machine] = job_time[job_idx] = end
    
    return max(machine_time)

def find_critical_path(problem: FJSP_Problem, schedule: List[List[int]]) -> List[Tuple[int, int]]:
    """找出关键路径上的工序"""
    num_jobs = problem.num_jobs
    
    # 计算最早开始/结束时间
    earliest_start = [[0]*len(job) for job in problem.jobs]
    earliest_finish = [[0]*len(job) for job in problem.jobs]
    
    for job_idx in range(num_jobs):
        for op_idx in range(len(problem.jobs[job_idx])):
            choice = schedule[job_idx][op_idx]
            machine, time = problem.jobs[job_idx][op_idx][choice]
            
            prev_finish = earliest_finish[job_idx][op_idx-1] if op_idx > 0 else 0
            machine_available = max(
                [earliest_finish[j][o] for j in range(num_jobs) 
                 for o in range(len(problem.jobs[j])) 
                 if schedule[j][o] == machine and (j != job_idx or o != op_idx)],
                default=0
            )
            
            earliest_start[job_idx][op_idx] = max(prev_finish, machine_available)
            earliest_finish[job_idx][op_idx] = earliest_start[job_idx][op_idx] + time
    
    makespan = max(max(finish) for finish in earliest_finish)
    
    # 计算最晚开始/结束时间
    latest_finish = [[0]*len(job) for job in problem.jobs]
    latest_start = [[0]*len(job) for job in problem.jobs]
    
    for job_idx in reversed(range(num_jobs)):
        for op_idx in reversed(range(len(problem.jobs[job_idx]))):
            choice = schedule[job_idx][op_idx]
            machine, time = problem.jobs[job_idx][op_idx][choice]
            
            next_start = latest_start[job_idx][op_idx+1] if op_idx < len(problem.jobs[job_idx])-1 else makespan
            machine_next_start = min(
                [earliest_start[j][o] for j in range(num_jobs) 
                 for o in range(len(problem.jobs[j])) 
                 if schedule[j][o] == machine and earliest_start[j][o] > earliest_finish[job_idx][op_idx]],
                default=makespan
            )
            
            latest_finish[job_idx][op_idx] = min(next_start, machine_next_start)
            latest_start[job_idx][op_idx] = latest_finish[job_idx][op_idx] - time
    
    # 识别关键路径
    return [
        (job_idx, op_idx)
        for job_idx in range(num_jobs)
        for op_idx in range(len(problem.jobs[job_idx]))
        if earliest_start[job_idx][op_idx] == latest_start[job_idx][op_idx]
    ]

def initialize_heuristic_solution(problem):
    """增强的启发式初始化:SPT+负载均衡"""
    schedule = []
    machine_loads = [0] * problem.machine_count
    
    for job in problem.jobs:
        job_schedule = []
        for op in job:
            # 考虑机器负载的SPT策略
            valid_machines = [(m_idx, m[1] + 0.5 * machine_loads[m[0]]) 
                            for m_idx, m in enumerate(op)]
            best_machine = min(valid_machines, key=lambda x: x[1])[0]
            job_schedule.append(best_machine)
            machine_loads[op[best_machine][0]] += op[best_machine][1]
        schedule.append(job_schedule)
    return schedule

def decode_solution(x, problem):
    """改进的解码方式"""
    schedule = []
    ptr = 0
    for job in problem.jobs:
        job_schedule = []
        for op in job:
            if ptr >= len(x):
                choice = np.argmin([m[1] for m in op])  # 默认SPT
            else:
                choice = int(np.clip(round(x[ptr] * (len(op)-1)), 0, len(op)-1))
            job_schedule.append(choice)
            ptr += 1
        schedule.append(job_schedule)
    return schedule

def local_search(schedule, problem):
    """增强的局部搜索:关键路径+瓶颈机器优化"""
    best = copy.deepcopy(schedule)
    makespan = evaluate_schedule(problem, best)
    
    # 关键路径优化
    critical_ops = find_critical_path(problem, best)
    for job_idx, op_idx in critical_ops:
        op = problem.jobs[job_idx][op_idx]
        original_choice = best[job_idx][op_idx]
        
        for new_choice in range(len(op)):
            if new_choice != original_choice:
                neighbor = copy.deepcopy(best)
                neighbor[job_idx][op_idx] = new_choice
                new_makespan = evaluate_schedule(problem, neighbor)
                
                if new_makespan < makespan:
                    best = neighbor
                    makespan = new_makespan
    
    # 瓶颈机器优化
    machine_loads = [0] * problem.machine_count
    for job_idx, job in enumerate(problem.jobs):
        for op_idx, op in enumerate(job):
            choice = best[job_idx][op_idx]
            machine, time = op[choice]
            machine_loads[machine] += time
    
    bottleneck_machine = np.argmax(machine_loads)
    for job_idx, job in enumerate(problem.jobs):
        for op_idx, op in enumerate(job):
            if best[job_idx][op_idx] == bottleneck_machine:
                for new_choice in range(len(op)):
                    if op[new_choice][0] != bottleneck_machine:
                        neighbor = copy.deepcopy(best)
                        neighbor[job_idx][op_idx] = new_choice
                        new_makespan = evaluate_schedule(problem, neighbor)
                        
                        if new_makespan < makespan:
                            best = neighbor
                            makespan = new_makespan
                            break
    return best

def encode_solution(schedule, problem):
    """编码调度方案"""
    x = []
    for job_idx, job_schedule in enumerate(schedule):
        for op_idx, choice in enumerate(job_schedule):
            op = problem.jobs[job_idx][op_idx]
            x.append(choice / (len(op) - 1 + 1e-10))  # 归一化到[0,1]
    return np.array(x)

def sparrow_search_algorithm(problem, objective_func, dim, lb=0, ub=1, 
                            n_population=80, p_percent=0.4, max_iter=2000):
    """改进的麻雀搜索算法"""
    # 初始化
    n_p = int(n_population * p_percent)
    alert_percent = 0.15
    sdt = 0.7
    
    population = []
    spt_schedule = initialize_heuristic_solution(problem)
    population.append(encode_solution(spt_schedule, problem))
    population.extend(np.random.uniform(lb, ub, (n_population-1, dim)))
    population = np.array(population)
    
    fitness = np.array([objective_func(ind) for ind in population])
    best_idx = np.argmin(fitness)
    best_position = population[best_idx].copy()
    best_fitness = fitness[best_idx]
    convergence = [best_fitness]
    
    for iter in range(max_iter):
        # 排序
        sorted_idx = np.argsort(fitness)
        population = population[sorted_idx]
        fitness = fitness[sorted_idx]
        
        # 生产者更新
        R2 = np.random.rand()
        if R2 < sdt:
            for i in range(n_p):
                scale = 1 / (iter + 1)
                population[i] *= np.exp(-i / (np.random.rand() * max_iter))
        else:
            population[:n_p] += np.random.normal(0, 1, (n_p, dim)) * 0.1
        
        # 跟随者更新
        for i in range(n_p, n_population):
            if i > n_population/2:
                Q = np.random.rand()
                population[i] = Q * np.exp((population[-1] - population[i]) / i**2)
            else:
                A = np.random.choice([1, -1], size=dim).reshape(1, -1)
                step = (np.linalg.pinv(A).T @ np.ones((dim, 1))).flatten()
                population[i] = best_position + np.abs(population[i] - best_position) * step
        
        # 警戒者更新
        alert_idx = np.random.choice(n_population, int(n_population * alert_percent), replace=False)
        for i in alert_idx:
            if np.random.rand() < sdt:
                step = np.random.normal(0, 1) * np.abs(best_position - population[i])
                population[i] += step
            else:
                K = np.random.uniform(-1, 1)
                step = K * (np.abs(population[i] - population[-1]) / 
                           (np.linalg.norm(population[i] - best_position) + 1e-10))
                population[i] += step
        
        # 边界处理
        population = np.clip(population, lb, ub)
        fitness = np.array([objective_func(ind) for ind in population])
        
        # 精英保留与局部搜索
        if iter % 100 == 0:
            current_best = decode_solution(best_position, problem)
            improved_schedule = local_search(current_best, problem)
            improved_fitness = evaluate_schedule(problem, improved_schedule)
            
            if improved_fitness < best_fitness:
                best_position = encode_solution(improved_schedule, problem)
                best_fitness = improved_fitness
                population[-1] = best_position
                fitness[-1] = best_fitness
        
        # 更新全局最优
        current_best_idx = np.argmin(fitness)
        if fitness[current_best_idx] < best_fitness:
            best_position = population[current_best_idx].copy()
            best_fitness = fitness[current_best_idx]
        
        convergence.append(best_fitness)
        
        if iter % 100 == 0:
            print(f"Iter {iter}, Best: {best_fitness:.2f}")
    
    return best_position, best_fitness, convergence

def solve_fjsp(problem: FJSP_Problem):
    def objective(x):
        schedule = decode_solution(x, problem)
        return evaluate_schedule(problem, schedule)
    
    best_x, best_fit, convergence = sparrow_search_algorithm(
        problem=problem,
        objective_func=objective,
        dim=problem.total_operations,
        n_population=80,
        p_percent=0.4,
        max_iter=2000
    )
    
    return decode_solution(best_x, problem), best_fit, convergence

  • 写回答

4条回答 默认 最新

  • 阿里嘎多学长 2025-05-10 10:44
    关注

    阿里嘎多学长整理AIGC生成,因移动端显示问题导致当前答案未能完全显示,请使用PC端查看更加详细的解答过程

    问题解答

    你想使用 Python 实现麻雀算法来解决柔性车间调度问题,并运行 Brandimarte 算例中的 mk01 算例。然而,你发现运行多次得到的结果总是在 70左右,而不是正常基础麻雀算法运行的 40多的优化结果。

    麻雀算法是一种常用的柔性车间调度算法,用于解决柔性车间的生产计划问题。Brandimarte 算例是麻雀算法的一种实现,用于解决柔性车间的生产计划问题。

    为了解决这个问题,我们可以尝试以下几点:

    1. 检查代码是否正确:确保你的代码正确地实现了麻雀算法和 Brandimarte 算例。
    2. 检查输入数据是否正确:确保输入数据正确,包括生产计划、生产能力、生产需求等。
    3. 检查算法参数是否正确:确保算法参数正确,包括生产计划的时间间隔、生产能力的限制等。
    4. 尝试不同的算法参数:尝试不同的算法参数,例如生产计划的时间间隔、生产能力的限制等,来观察结果的变化。

    以下是一个简单的 Python 代码,用于实现麻雀算法:

    import numpy as np
    
    def maojia_algorithm(data):
        # 数据预处理
        production_plan = data['production_plan']
        production_capacity = data['production_capacity']
        production_demand = data['production_demand']
    
        # 初始化结果
        result = np.zeros((len(production_plan), len(production_capacity)))
    
        # 遍历生产计划
        for i in range(len(production_plan)):
            # 遍历生产能力
            for j in range(len(production_capacity)):
                # 计算生产能力的限制
                capacity_limit = production_capacity[j]
                # 计算生产需求的限制
                demand_limit = production_demand[i]
                # 计算生产计划的时间间隔
                time_interval = production_plan[i]
                # 计算生产计划的优化结果
                result[i, j] = np.min([capacity_limit, demand_limit]) * time_interval
    
        return result
    
    # 读取数据
    data = {'production_plan': [10, 20, 30], 
            'production_capacity': [100, 200, 300], 
            'production_demand': [50, 100, 150]}
    
    # 运行麻雀算法
    result = maojia_algorithm(data)
    
    print(result)
    

    这个代码实现了麻雀算法,用于解决柔性车间调度问题。然而,这只是一个简单的示例代码,实际实现中可能需要更多的参数和数据。

    如果你需要更多帮助,请提供更多的代码和数据,我将尽力帮助你解决问题。

    评论

报告相同问题?

问题事件

  • 创建了问题 5月10日