from keras import activations
from keras import backend as K
from keras.engine.topology import Layer
def squash(x, axis=-1):
    """Capsule 'squash' non-linearity: shrink vectors along `axis` toward
    unit length while preserving direction.

    NOTE(review): the CapsNet paper uses scale = ||s|| / (1 + ||s||^2)
    (after cancelling numerator/denominator); here the constant is 0.5
    instead of 1 — per the original author's note this is a deliberate
    variant, confirm before "fixing".
    """
    # Epsilon keeps the norm strictly positive so sqrt/division are safe.
    squared_norm = K.sum(K.square(x), axis, keepdims=True) + K.epsilon()
    factor = K.sqrt(squared_norm) / (0.5 + squared_norm)
    return factor * x
# Define our own softmax function instead of K.softmax.
def softmax(x, axis=-1):
    """Numerically stable softmax along `axis`.

    Subtracting the per-axis maximum before exponentiating prevents
    overflow; any constant shift cancels in the ratio, so the result
    is identical to plain softmax.
    """
    shifted = x - K.max(x, axis=axis, keepdims=True)
    exp_x = K.exp(shifted)
    return exp_x / K.sum(exp_x, axis=axis, keepdims=True)
# A Capsule implementation in pure Keras.
class Capsule(Layer):
    """Capsule layer with dynamic routing, implemented in pure Keras.

    Maps a sequence of input vectors [batch, input_num_capsule,
    input_dim_capsule] to `num_capsule` output capsules of `dim_capsule`
    dimensions each.  Example: ``Capsule(10, 16, 3, True)(cnn)`` where
    ``cnn`` has shape [batch, 64, 128].
    """

    def __init__(self, num_capsule, dim_capsule, routings=3,
                 share_weights=True, activation='squash', **kwargs):
        # BUGFIX: was `def init(..., kwargs)` — dunder underscores and the
        # `**` were lost; as written the layer could never be constructed.
        super(Capsule, self).__init__(**kwargs)
        self.num_capsule = num_capsule      # number of output capsules (e.g. 10)
        self.dim_capsule = dim_capsule      # dimensionality of each capsule (e.g. 16)
        self.routings = routings            # number of dynamic-routing iterations
        self.share_weights = share_weights  # share the transform across input capsules
        if activation == 'squash':
            self.activation = squash
        else:
            self.activation = activations.get(activation)

    def build(self, input_shape):
        super(Capsule, self).build(input_shape)
        input_dim_capsule = input_shape[-1]
        if self.share_weights:
            # Shared weights: one kernel applied to every input capsule,
            # hence the leading dimension of 1.
            self.W = self.add_weight(name='capsule_kernel',
                                     shape=(1,
                                            input_dim_capsule,
                                            self.num_capsule * self.dim_capsule),
                                     initializer='glorot_uniform',
                                     trainable=True)
        else:
            input_num_capsule = input_shape[-2]
            self.W = self.add_weight(name='capsule_kernel',
                                     shape=(input_num_capsule,
                                            input_dim_capsule,
                                            self.num_capsule * self.dim_capsule),
                                     initializer='glorot_uniform',
                                     trainable=True)

    def call(self, u_vecs):
        # u_vecs: [batch, input_num_capsule, input_dim_capsule]
        if self.share_weights:
            u_hat_vecs = K.conv1d(u_vecs, self.W)
        else:
            u_hat_vecs = K.local_conv1d(u_vecs, self.W, [1], [1])

        batch_size = K.shape(u_vecs)[0]
        input_num_capsule = K.shape(u_vecs)[1]
        u_hat_vecs = K.reshape(u_hat_vecs, (batch_size, input_num_capsule,
                                            self.num_capsule, self.dim_capsule))
        u_hat_vecs = K.permute_dimensions(u_hat_vecs, (0, 2, 1, 3))
        # u_hat_vecs.shape = [None, num_capsule, input_num_capsule, dim_capsule]

        # Routing logits, initialised to zero.
        b = K.zeros_like(u_hat_vecs[:, :, :, 0])  # [None, num_capsule, input_num_capsule]
        for i in range(self.routings):
            c = softmax(b, 1)  # coupling coefficients across output capsules
            o = K.batch_dot(c, u_hat_vecs, [2, 2])
            if K.backend() == 'theano':
                # Theano's batch_dot keeps an extra axis; collapse it.
                o = K.sum(o, axis=1)
            if i < self.routings - 1:
                # Agreement update only needed before the final iteration.
                o = K.l2_normalize(o, -1)
                b = K.batch_dot(o, u_hat_vecs, [2, 3])
                if K.backend() == 'theano':
                    b = K.sum(b, axis=1)
        return self.activation(o)

    def compute_output_shape(self, input_shape):
        return (None, self.num_capsule, self.dim_capsule)
# ************************************************************************************
# Build the networks: compare a plain CNN baseline against CNN+Capsule.
# (The demo below was originally a separate script using the layer above.)
#! -*- coding: utf-8 -*-
import numpy as np

from keras import backend as K
from keras import utils
from keras.datasets import mnist
from keras.layers import *
from keras.models import Model

from Capsule_Keras import *
# ---------------------------------------------------------------------------
# Prepare the training data
# ---------------------------------------------------------------------------
batch_size = 128
num_classes = 10
img_rows, img_cols = 28, 28

# Load the MNIST data set and scale pixels to [0, 1].
(x_train, y_train), (x_test, y_test) = mnist.load_data()
x_train = x_train.reshape(x_train.shape[0], img_rows, img_cols, 1)
x_test = x_test.reshape(x_test.shape[0], img_rows, img_cols, 1)
x_train = x_train.astype('float32')
x_test = x_test.astype('float32')
x_train /= 255
x_test /= 255

# One-hot encode the labels.
y_train = utils.to_categorical(y_train, num_classes)
y_test = utils.to_categorical(y_test, num_classes)

# Build a custom test set: shuffle a copy of the test set and stack it onto
# the original along the row axis, so every test image shows two digits.
idx = list(range(len(x_test)))
np.random.shuffle(idx)  # random pairing of images
# [N, img_rows, img_cols, 1] -> [N, 2*img_rows, img_cols, 1]
X_test = np.concatenate([x_test, x_test[idx]], 1)
# Each row of Y_test holds the two digit labels of a stacked image: shape [N, 2].
Y_test = np.vstack([y_test.argmax(axis=1), y_test[idx].argmax(axis=1)]).T
X_test = X_test[Y_test[:, 0] != Y_test[:, 1]]  # keep only pairs of distinct digits
Y_test = Y_test[Y_test[:, 0] != Y_test[:, 1]]
Y_test.sort(axis=1)  # sort the label pair: we compare as sets, not sequences
# ---------------------------------------------------------------------------
# Plain CNN classification model (pooling baseline)
# ---------------------------------------------------------------------------
input_image = Input(shape=(None, None, 1))
cnn = Conv2D(64, (3, 3), activation='relu')(input_image)
cnn = Conv2D(64, (3, 3), activation='relu')(cnn)
cnn = AveragePooling2D((2, 2))(cnn)
cnn = Conv2D(128, (3, 3), activation='relu')(cnn)
cnn = Conv2D(128, (3, 3), activation='relu')(cnn)
cnn = GlobalAveragePooling2D()(cnn)  # (batch, rows, cols, channels) -> (batch, channels)
dense = Dense(128, activation='relu')(cnn)
# Sigmoid (not softmax) so each class is scored independently — the
# two-digit test images need multi-label predictions.
output = Dense(10, activation='sigmoid')(dense)

model = Model(inputs=input_image, outputs=output)
model.compile(
    # Margin loss from the CapsNet paper; y_true is one-hot, y_pred is the
    # per-class sigmoid score.
    loss=lambda y_true, y_pred: y_true * K.relu(0.9 - y_pred) ** 2 + 0.25 * (1 - y_true) * K.relu(
        y_pred - 0.1) ** 2,
    optimizer='adam',
    metrics=['accuracy'])
model.summary()
model.fit(x_train, y_train,
          batch_size=batch_size,
          epochs=20,
          verbose=1,
          validation_data=(x_test, y_test))

# Predict on the two-digit test set; Y_pred has shape [N, num_classes].
Y_pred = model.predict(X_test)
# Confidence check: both top-2 scores must exceed 0.5.  np.sort is
# ascending, so checking the second-largest value suffices.
greater = np.sort(Y_pred, axis=1)[:, -2] > 0.5
Y_pred = Y_pred.argsort()[:, -2:]  # indices of the two highest-scoring classes
Y_pred.sort(axis=1)  # compare label pairs as sets
# np.prod over axis 1 is 1 only when BOTH labels match.
acc = 1. * (np.prod(Y_pred == Y_test, axis=1)).sum() / len(X_test)
print(u'CNN+Pooling,不考虑置信度的准确率为:%s' % acc)
acc = 1. * (np.prod(Y_pred == Y_test, axis=1) * greater).sum() / len(X_test)
print(u'CNN+Pooling,考虑置信度的准确率为:%s' % acc)
# ---------------------------------------------------------------------------
# CNN + Capsule classification model
# ---------------------------------------------------------------------------
# A regular Conv2D front end; shapes annotated for a 28x28 input.
input_image = Input(shape=(None, None, 1))                # [28, 28, 1]
cnn = Conv2D(64, (3, 3), activation='relu')(input_image)  # [26, 26, 64]
cnn = Conv2D(64, (3, 3), activation='relu')(cnn)          # [24, 24, 64]
cnn = AveragePooling2D((2, 2))(cnn)                       # [12, 12, 64]
cnn = Conv2D(128, (3, 3), activation='relu')(cnn)         # [10, 10, 128]
cnn = Conv2D(128, (3, 3), activation='relu')(cnn)         # [8, 8, 128]
cnn = Reshape((-1, 128))(cnn)                             # [64, 128]
capsule = Capsule(10, 16, 3, True)(cnn)                   # [10, 16]
# BUGFIX: a stray `return (None, self.num_capsule, self.dim_capsule)` line
# (accidental paste of Capsule.compute_output_shape) was removed here — it
# was a SyntaxError at module level.
# Class score = L2 norm (length) of each output capsule vector.
output = Lambda(lambda x: K.sqrt(K.sum(K.square(x), 2)), output_shape=(10,))(capsule)

model = Model(inputs=input_image, outputs=output)
model.compile(
    # Same margin loss as the pooling baseline above.
    loss=lambda y_true, y_pred: y_true * K.relu(0.9 - y_pred) ** 2 + 0.25 * (1 - y_true) * K.relu(
        y_pred - 0.1) ** 2,
    optimizer='adam',
    metrics=['accuracy'])
model.summary()
model.fit(x_train, y_train,
          batch_size=batch_size,
          epochs=20,
          verbose=1,
          validation_data=(x_test, y_test))

Y_pred = model.predict(X_test)  # predict on the two-digit test set
greater = np.sort(Y_pred, axis=1)[:, -2] > 0.5  # both top-2 scores must exceed 0.5
Y_pred = Y_pred.argsort()[:, -2:]  # take the two highest-scoring classes
Y_pred.sort(axis=1)  # compare label pairs as sets
acc = 1. * (np.prod(Y_pred == Y_test, axis=1)).sum() / len(X_test)
print(u'CNN+Capsule,不考虑置信度的准确率为:%s' % acc)
acc = 1. * (np.prod(Y_pred == Y_test, axis=1) * greater).sum() / len(X_test)
print(u'CNN+Capsule,考虑置信度的准确率为:%s' % acc)