For a staff scheduling problem, I am considering a GNN+RL approach.
The idea is to use a GAT+LSTM network that reads the current scheduling state and outputs an action probability distribution, i.e. the graph neural network serves as the policy function.
The problem statement includes a hard constraint:
once an employee is scheduled, they must attend for 12 hours (including a 2-hour break),
and two shifts of the same employee must be at least 12 hours apart. Each attendance counts as one person-shift.
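For clarity, here is a minimal sketch of how I read this constraint, taking the 12-hour gap as measured from the end of the previous shift; `SHIFT_HOURS`, `MIN_GAP_HOURS` and `can_schedule` are illustrative names only, not part of my environment code:

```python
SHIFT_HOURS = 12    # one attendance lasts 12 hours (2 of which are break time)
MIN_GAP_HOURS = 12  # two shifts of the same employee must be >= 12 hours apart

def can_schedule(last_shift_start, new_start):
    """Return True if starting a shift at hour new_start keeps the hard constraint."""
    if last_shift_start is None:        # employee has never been scheduled yet
        return True
    last_shift_end = last_shift_start + SHIFT_HOURS
    return new_start - last_shift_end >= MIN_GAP_HOURS
```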
Scoring criteria:
evaluation metric = max(total cargo demand − total completable cargo volume, 0) × 1/1000 + balance index × 10000 + number of person-shifts
A lower evaluation metric is better; when scores tie, the shorter solving time wins.
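In code form, the metric I am targeting looks roughly like this (a minimal sketch; the argument names are placeholders, and the balance index and completable volume are computed elsewhere):

```python
def evaluation_metric(total_demand, total_completed, balance_index, person_shifts):
    """Score of one schedule; lower is better (ties broken by solving time)."""
    unmet = max(total_demand - total_completed, 0)
    return unmet * (1 / 1000) + balance_index * 10000 + person_shifts
```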
So I compute the loss as loss = evaluation metric + W * penalty (to make the model learn the hard constraint).
However, the gradients come out as all zeros,
and the model shows no reaction to the penalty.
Partial code:
```python
import torch as th
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

class GATLayer(nn.Module):
    def __init__(self, in_dim, hidden_dim, out_dim, num_heads):
        super(GATLayer, self).__init__()
        self.num_heads = num_heads
        self.head_dim = hidden_dim // num_heads
        # separate linear transforms for source and destination node features
        self.fc_src = nn.Linear(in_dim, hidden_dim, bias=False)
        self.fc_dst = nn.Linear(in_dim, hidden_dim, bias=False)
        self.attn_fc = nn.Linear(2 * hidden_dim, 1, bias=False)
        self.embedding_src = nn.Linear(hidden_dim, hidden_dim)  # embed source-node features
        self.embedding_dst = nn.Linear(hidden_dim, hidden_dim)
        self.dropout = nn.Dropout(0.3)  # inject noise for robustness
        self.init_weights()

    def init_weights(self):
        nn.init.xavier_uniform_(self.fc_src.weight)
        nn.init.xavier_uniform_(self.fc_dst.weight)
        nn.init.xavier_uniform_(self.attn_fc.weight)
        nn.init.xavier_uniform_(self.embedding_src.weight)
        nn.init.xavier_uniform_(self.embedding_dst.weight)

    def edge_attention(self, edges):
        z_src = edges.src['z_src']
        z_dst = edges.dst['z_dst']
        a = self.attn_fc(th.cat([z_src, z_dst], dim=1))  # raw attention score per edge
        e = F.leaky_relu(a)  # apply activation
        return {'e': e}  # return per-edge attention score

    def message_func(self, edges):
        return {'z': edges.src['z_src'], 'e': edges.data['e']}

    def reduce_func(self, nodes):
        alpha = th.softmax(nodes.mailbox['e'], dim=1)  # normalize scores over incoming edges
        h = th.sum(alpha * nodes.mailbox['z'], dim=1)  # attention-weighted sum of messages
        return {'h': h}

    def forward(self, g, h):
        z_src = self.fc_src(h)
        z_src = F.leaky_relu(z_src)        # non-linear activation
        z_src = self.embedding_src(z_src)  # feature embedding
        z_src = self.dropout(z_src)
        z_dst = self.fc_dst(h)
        z_dst = F.leaky_relu(z_dst)        # non-linear activation
        z_dst = self.embedding_dst(z_dst)  # feature embedding
        z_dst = self.dropout(z_dst)
        g.srcdata['z_src'] = z_src  # distinct feature names for source and destination
        g.dstdata['z_dst'] = z_dst
        g.apply_edges(self.edge_attention)
        g.update_all(self.message_func, self.reduce_func)
        return g.ndata.pop('h')
class GAT(nn.Module):
    def __init__(self, in_dim, hidden_dim, num_heads, num_classes):
        super(GAT, self).__init__()
        self.hidden_dim = hidden_dim
        self.gat1 = GATLayer(in_dim, hidden_dim, hidden_dim * num_heads, num_heads)
        self.gat2 = GATLayer(hidden_dim, hidden_dim, hidden_dim * num_heads, num_heads)
        self.lstm = nn.LSTM(hidden_dim, hidden_dim, batch_first=True)
        self.bn1 = nn.BatchNorm1d(hidden_dim)
        self.fc = nn.Linear(hidden_dim, num_classes)
        self.dropout = nn.Dropout(0.3)
        self.init_weights()

    def init_weights(self):
        nn.init.xavier_uniform_(self.fc.weight)
        for name, param in self.lstm.named_parameters():
            if 'weight' in name:
                nn.init.xavier_uniform_(param)
            elif 'bias' in name:
                nn.init.constant_(param, 0)

    def forward(self, g, features, hidden):
        x = self.gat1(g, features)
        x = self.dropout(x)
        # x = self.gat2(g, x)
        x = x.unsqueeze(0)                 # add batch dimension for the LSTM
        x, hidden = self.lstm(x, hidden)
        x = x.squeeze(0)                   # remove batch dimension
        x = self.bn1(x)
        x = self.fc(x)
        x = x.view(data0.node_sum, 1)      # data0 is a global problem-data object
        return x, hidden

    def init_hidden(self, batch_size):
        return (th.zeros(1, batch_size, self.hidden_dim),
                th.zeros(1, batch_size, self.hidden_dim))
```
```python
infeat = initial_state.size(1)
model = GAT(infeat, 30, 8, 1)
optimizer = optim.SGD(model.parameters(), lr=learning_rate)  # optimizer
evaluation = float('inf')
for episode in range(num_episodes):  # play num_episodes rounds
    # reset state
    schedule = {}
    employee_hours = {}
    employee_schedule = {}
    state_tensor = env.reset(g)
    episode_reward = 0
    penalty0 = 0
    batch_size = 1
    hidden = model.init_hidden(batch_size)
    total_loss = th.tensor(0.0, requires_grad=True)
    print(f"episode{episode}")
    for step in range(data0.time - data0.cycle + 1):
        optimizer.zero_grad()
        # print("step:" + str(step))
        for dept in range(1, 5):
            f, hidden = model(g, state_tensor, hidden)  # policy network produces action scores
            action_probs = f[:len(data0.employee_data)]
            reward, penalty = env.step(g, employee_hours, employee_schedule, step, dept, action_probs)  # take the action
            penalty0 += penalty
            episode_reward += reward * (discount_factor ** step)  # accumulate discounted reward
            schedule, state_tensor = env.observe(g, schedule)  # update environment observation
            # print("step_reward:" + str(reward))
    loss2, loss3 = calloss2(employee_hours)
    loss1 = calloss1(g)
    total_loss = th.add(total_loss, 0.001 * max(loss1, 0) + loss2 * 10000 + loss3 + penalty0 * 10000)
    print(f"total_loss:{total_loss.item()} loss1:{0.001 * max(loss1, 0)} loss2:{loss2 * 10000} loss3:{loss3} penalty:{penalty0 * 10000}")
    total_loss = total_loss / 1000000
    print(f"loss adjusted:{total_loss}")
    total_loss.backward()
    # th.nn.utils.clip_grad_norm_(model.parameters(), max_norm=50.0)
    grad_norm = th.nn.utils.clip_grad_norm_(model.parameters(), 2)
    print('Gradient Norm:', grad_norm)
    optimizer.step()
```
Additional note: in the RL part, the action picks an employee according to the output action_probs; if that employee's is_available is non-zero at the current time point, a penalty is applied.
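For reference, that selection/penalty rule amounts to something like the sketch below. It is a simplified stand-in, not my actual env.step() internals: `is_available` and `PENALTY_VALUE` are placeholder names, and I assume here that the employee is sampled from a softmax over the raw scores (the post only says selection follows action_probs).

```python
import torch as th

PENALTY_VALUE = 1.0  # placeholder penalty magnitude

def select_and_penalize(action_probs, is_available):
    """Sample one employee index from action_probs; penalize an unavailable pick."""
    probs = th.softmax(action_probs.view(-1), dim=0)  # turn raw scores into probabilities
    idx = th.multinomial(probs, 1).item()             # pick an employee stochastically
    penalty = PENALTY_VALUE if is_available[idx] != 0 else 0.0
    return idx, penalty
```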