Homework 6 Part1
由于 Homework 6 的笔记内容有些多,我把它拆分成三部分了。
Part1 的内容是从零实现 Neural Nets 的相关组件。
1. 总体架构
作业初始代码的总体架构如下:
models.py
:神经网络模型实现:负责整个网络的前向传播、反向传播和训练循环layers.py
:网络层实现:实现全连接层、二维卷积层、一维批量归一化层的前向传播和反向传播方法。activations.py
:多种激活函数实现。optimizers.py
:多种优化器实现。schedulers.py
:学习率调度器实现。losses.py
:损失函数实现。datasets.py
:数据集管理实现:实现了对数据集的批处理、数据预处理(归一化、标准化)weights.py
: 权重初始化策略实现。
下面具体讲解作业要求实现的代码和我自己写的一些辅助的代码文件。
2. 全连接层 FullyConnected
forward
方法
全连接层的 forward
方法实现较简单,只需要计算下面的结果:
并将计算过程中的值存入缓存即可:
def forward(self, X: np.ndarray) -> np.ndarray:
"""Forward pass: multiply by a weight matrix, add a bias, apply activation.
Also, store all necessary intermediate results in the `cache` dictionary
to be able to compute the backward pass.
"""
# initialize layer parameters if they have not been initialized
if self.n_in is None:
self._init_parameters(X.shape)
W = self.parameters.get('W')
b = self.parameters.get('b')
Z = X @ W + b
# perform an affine transformation and activation
Y = self.activation(Z)
# store information necessary for backprop in `self.cache`
self.cache['X'] = X
self.cache['Z'] = Z
return Y
backward
方法
在 backward
方法中,我们取出之前缓存的值,然后从后往前计算梯度,最终得到损失函数相对 的梯度:
这里注意 的梯度计算:我们是通过在 axis=0
求和来计算 的梯度的,因为 在 forward 过程中被加到了 axis=1
的维度上,因此我们沿着除这个维度外的方向求和。
还有一个简单的判断方法: 这个式子是沿着
axis=1
的方向进行求和的(数据形状为 )。
def backward(self, dLdY: np.ndarray) -> np.ndarray:
"""Backward pass for fully connected layer.
Compute the gradients of the loss with respect to:
1. the weights of this layer (mutate the `gradients` dictionary)
2. the bias of this layer (mutate the `gradients` dictionary)
3. the input of this layer (return this)
"""
# unpack the cache
X = self.cache.get('X')
Z = self.cache.get('Z')
W = self.parameters.get('W')
# compute the gradients of the loss w.r.t. all parameters as well as the
# input of the layer
dLdZ = self.activation.backward(Z, dLdY)
dLdW = X.T @ dLdZ
dLdb = np.sum(dLdZ, axis=0, keepdims=True)
dLdX = dLdZ @ W.T
# store the gradients in `self.gradients`
# the gradient for self.parameters["W"] should be stored in
# self.gradients["W"], etc.
self.gradients['W'] = dLdW
self.gradients['b'] = dLdb
return dLdX
3. 批量归一化层 BatchNorm1D
原理概述
在批归一化层中,我们试着对网络中间的隐藏层 (hidden unit) 进行归一化处理。但是对于隐藏层,它的输出值是随着网络参数在训练过程中不断变化的,我们无法预先知道这些激活值的均值和方差。因此,批量归一化层被提出来了。
BN 本质上是在网络中增加了一个新的“层”,这个层会对前一层输出的激活值 进行处理,然后将处理后的结果 作为下一层的输入。这个处理过程在每次训练迭代(即每个小批量/minibatch)中动态进行。
首先,BN 会计算 minibatch 的统计数据。假设这些小批量数据包含 个样本,通过隐藏层产生了 个激活向量 。BN 计算得出这些数据的均值 和方差 :
利用上面计算出来的方差,BN 对当前 minibatch 的每个值进行归一化:
这里的 是一个很小的正数,防止分母为0。
不过,均值为0、方差为1的分布不一定是最好的分布,强制把每一层的输出都变成这样的标准分布可能会限制网络的表达能力。因此 BN 引入了两个可学习的参数 、,让网络自己去学习具体的分布:
同时,在训练时我们可以获取到 minibatch 的统计数据,但是在测试推理过程中,我们通常一次只处理一个样本,并没有 minibatch 这个东西。为了解决这个问题,在训练过程中,BN 会维护一个全局的统计 量:移动平均均值 和移动平均方差 。在每个批次的训练中,这些值都会按照如下方式更新:
这里的 是一个衰减系数。这样,训练结束后,、、、 的值固定下来。对于新的测试样本,BN层会使用这些固定值进行计算。
同样地,既然这些值也需要进行学习,我们也像对待其他神经网络一样对它们进行 forward
和 backward
。
forward
方法
BN 的 forward
方法实现非常简单,只需要更新 、,并使用这些值计算当前 minibatch 的 即可。
同时,根据前面所说,我们需要把训练和测试分开计算:
def forward(self, X: np.ndarray, mode: str = "train") -> np.ndarray:
""" Forward pass for 1D batch normalization layer.
Allows taking in an array of shape (B, C) and performs batch normalization over it.
"""
# implement a batch norm forward pass
if mode == 'train':
mu = np.mean(X, axis=0, keepdims=True) # Shape (1, n_features)
sigma = np.var(X, axis=0, keepdims=True) # Shape (1, n_features)
# Update running statistics with exponential moving average
# Note: running stats are initialized as zeros in _init_parameters
self.cache['running_mu'] = self.momentum * self.cache['running_mu'] + (1 - self.momentum) * mu
self.cache['running_var'] = self.momentum * self.cache['running_var'] + (1 - self.momentum) * sigma
elif mode == 'test':
mu = self.cache['running_mu']
sigma = self.cache['running_var']
else:
raise ValueError('invalid mode str')
X_hat = (X - mu) / np.sqrt(sigma + self.eps)
# cache any values required for backprop
self.cache['X'] = X
self.cache['X_hat'] = X_hat
self.cache['mu'] = mu.flatten() if mu.ndim > 1 else mu # Keep 1D for backward compatibility
self.cache['var'] = sigma.flatten() if sigma.ndim > 1 else sigma # Keep 1D for backward compatibility
gamma = self.parameters['gamma']
beta = self.parameters['beta']
Y = gamma * X_hat + beta
return Y
backward
方法
我们先列出反向传播的计算过程:
我们先计算对缩放参数 和平移参数 的梯度:
将上游梯度传回标准化输出 :
然后我们计算方差的梯度:
然后我们计算均值的梯度:
现在我们开始计算 的梯度来源。对于 ,两边求梯度(这里采用不太严谨的写法):
它贡献了 的权重。同样地,有:
这里我们直接简略掉求和,在实际代码中,对于原先是 的项,我们需要和前面对偏置项的梯度运算一样,使用
np.sum
对特定的axis
求和。
于是我们得到 相对损失函数最终的梯度计算式:
同时需要注意:参数
dY
是通过激活层后的梯度,因此要得到 需要先从激活层反向传播。
def backward(self, dY: np.ndarray) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
"""
Backward method for batch normalization layer. You don't need to implement this to get full credit, although it is
fun to do so if you have the time.
"""
X = self.cache['X']
X_hat = self.cache['X_hat']
mu = self.cache['mu']
var = self.cache['var']
gamma = self.parameters['gamma']
batch_size = X.shape[0]
# implement backward pass for batchnorm.
dGamma = np.sum(dY * X_hat, axis=0)
dBeta = np.sum(dY, axis=0)
dX_hat = dY * gamma
dVar = np.sum(dX_hat * (X - mu) * (-0.5) * (var + self.eps)**(-1.5), axis=0)
dMu = (np.sum(dX_hat * (-1.0 / np.sqrt(var + self.eps)), axis=0) +
dVar * np.sum(-2.0 * (X - mu), axis=0) / batch_size)
dX = (dX_hat / np.sqrt(var + self.eps) +
dVar * 2.0 * (X - mu) / batch_size +
dMu / batch_size)
self.gradients["gamma"] = dGamma
self.gradients["beta"] = dBeta
## END YOUR CODE ##
return dX
4. 卷积层 Conv2D
卷积层原理概述
在阐释具体原理前,我们先明确符号定义:
- :网络的输入,通常是一个图像张量(多维数组)。它的形状是 ,其中 、 是图像的空间维度(高度和宽度), 是通道数。对于彩色图片而言, 通常为 3,表示红、绿、蓝三种颜色的强度。
- :网络的输出,通常是一个形状为 的向量( 是类别的数量)。
- :第 层的通道数。这也被称为滤波器的数量(见 cnn 笔记内容)。
- :第 层的卷积核大小。
- :一个四维张量,它包含了第 层所有的滤波器。它的形状是 。
- :第 层的偏置向量,形状为 。每个滤波器对应一个偏置值。
- : 第 层的输出。它是一个形状为 的张量。 是卷积操作后输出的空间尺寸,之后会详细讲解它的计算方法。
- : 应用在第 层的非线性激活函数(如 ReLU)。
我们知道,在卷积层中,每个滤波器都会与输入图像的所有通道进行卷积。这个操作本质上是一个滑动式的、逐元素乘积之和。令 为在第 个滤波器在 位置的输出,则有:
这个式子看起来很复杂,在之后的代码实现中会详细讲解怎么将这个东西向量化的。
这一操作其实并不是数学意义上的卷积,而是互相关操作:我们使用卷积核,在输入图像上从左到右、从上到下地滑动。在每一个位置,我们计算卷积核和它所覆盖的图像区域有多么“匹配”。而特征图就是我们匹配后的输出,它是一张新的“地图”,图上的每个点的值代表了卷积核与原始图像在该点的匹配程度。值越高,说明匹配得越好。
在上面的公式中,我们是每次将滤波器在图像上移动一个像素。我们也可以选择更大的移动步长 stride
。
卷积层的最终输出 是将特征图 通过激活函数 得到的:
池化层原理概述
池化层用于对输入的特征图进行下采样 (downsample)。它接收一个形状为 的输入数组,输出一个形状为 的数组。
池化操作也有一个核(尺寸为 )和一个步长 。对于输入的每个通道,我们取 窗口内的所有点的最大值或平均值。然后,我们将这个窗口滑动 个像素,重复此过程,直到处理完整个特征图。
同样地,池化层的输出计算如下:
传统的 CNN 通过交替组合卷积层和池化层来处理图像。这个过程会逐步地“压缩”输入的空间尺寸,同时通过卷积层提取越来越复杂的特征(通道数 通常会增加)。当特征图的空间尺寸变得足够小时,就将其展平(flatten),送入一个或多个全连接网络层 (fully-connected layers),最终进行分类或回归任务。
forward
方法
直接翻译卷积层的 forward
计算,我们直接
def convolution_forward_loops(X, W, b, pad, stride):
pad_h, pad_w = pad
k_h, k_w, in_channels, out_channels = W.shape
n_examples, in_rows, in_cols, _ = X.shape
out_rows = (in_rows + 2 * pad_h - k_h) // stride + 1
out_cols = (in_cols + 2 * pad_w - k_w) // stride + 1
X_padded = np.pad(
X,
((0, 0), (pad_h, pad_h), (pad_w, pad_w), (0, 0)),
mode='constant',
constant_values=0
)
out = np.zeros((n_examples, out_rows, out_cols, out_channels))
for n in range(n_examples):
for h_out in range(out_rows):
for w_out in range(out_cols):
h_start = h_out * stride
h_end = h_start + k_h
w_start = w_out * stride
w_end = w_start + k_w
input_slice = X_padded[n, h_start:h_end, w_start:w_end, :]
for c_out in range(out_channels):
kernel_slice = W[:, :, :, c_out]
out[n, h_out, w_out, c_out] = np.sum(input_slice * kernel_slice)
out += b
return out
这个循环十分低效,它通过循环提取 input_slice
。而向量化的目标是一次性地准备好所有的 input_slice
,然后用一次计算完成所有的点积。为了实现这个,我们可以将卷积操作转化为一个单一、巨大的矩阵乘法问题。
首先,在 numpy 中,一个数组(或矩阵)由下面这些性质决定:
shape
:决定数组的形状。stride
:决定要移动多少个字节才能到达下一个维度的下一个元素。
因此,我们创建一个包含滑动窗口所有要素的数组作为我们的 shape
:
view_shape=(n_examples, out_rows, out_cols, k_h, k_w, in_channels)
为什么要包含所有要素呢?因为之后我们就要用这个 shape 的矩阵做计算了,因此必须把这些都准备好。
然后,我们定义这些维度的移动规则:
s_n, s_r, s_c, s_ch = X_padded.stride
view_strides = (s_n, s_r * stride, s_c * stride, s_r, s_c, s_ch)
对
k_h
而言,一次只移动s_r
步;而对于实际的out_rows
,移动的距离需要乘以步长stride
。
然后我们就可以构建出包含所有窗口的窗口矩阵了:
X_windows = np.lib.stride_tricks.as_strided(
X_padded,
shape=view_shape,
strides=view_strides
)
构造完成窗口矩阵后,我们需要用这个矩阵和权重矩阵 进行运算。我们使用 np.einsum
来进行乘法运算。einsum
能够找到两个张量共享的维度标签、沿着这些标签进行元素级别的乘法。
- 对于那些只出现在输入部分而没有出现在输出部分的标签,
einsum
会将乘法结果沿着这些维度求和。 - 对于那些同时出现在输入和输出部分的标签,
einsum
会保留这些维度。
综上,我们得到最终向量化的运算代码(也就是作业中要求的 “must operate on minibatches of data”):
def forward(self, X: np.ndarray) -> np.ndarray:
"""Forward pass for convolutional layer. This layer convolves the input
`X` with a filter of weights, adds a bias term, and applies an activation
function to compute the output. This layer also supports padding and
integer strides. Intermediates necessary for the backward pass are stored
in the cache.
"""
if self.n_in is None:
self._init_parameters(X.shape)
W = self.parameters["W"]
b = self.parameters["b"]
pad_h, pad_w = self.pad
stride = self.stride
k_h, k_w, in_channels, _ = W.shape
n_examples, in_rows, in_cols, _ = X.shape
out_rows = (in_rows + 2 * pad_h - k_h) // stride + 1
out_cols = (in_cols + 2 * pad_w - k_w) // stride + 1
X_padded = np.pad(
X,
((0, 0), (pad_h, pad_h), (pad_w, pad_w), (0, 0)),
mode='constant'
)
view_shape = (n_examples, out_rows, out_cols, k_h, k_w, in_channels)
s_n, s_r, s_c, s_ch = X_padded.strides
view_strides = (s_n, s_r * stride, s_c * stride, s_r, s_c, s_ch)
X_windows = np.lib.stride_tricks.as_strided(
X_padded,
shape=view_shape,
strides=view_strides
)
out = np.einsum('nhwkij,kijo->nhwo', X_windows, W)
out += b
self.cache['X'] = X
self.cache['Z'] = out
out = self.activation.forward(out)
return out
backward
方法
同样地,我们以 CNN 在激活层前的输出为起点:
卷积层的权重、偏执以及输入梯度的计算比较简单,它是由卷积核和原有图像矩阵窗口直接相乘得到的,和全连接层的区别只在于卷积核需要滑动窗口:
对偏置项,每个卷积核有自己的偏置项,我们沿着除 out_channels
进行求和:
对权重矩阵,有:
输入矩阵的处理比较复杂,有:
在计算 的梯度时,我们需要对 进行翻转与转置。其中插值上采样 (Upsampling with Zeros) 后的矩阵 :
这里的公式看起来有些复杂,并且有一些在 forward
中没有的翻转和转置操作,下面我们逐一详解。
首先是权重矩阵的翻转。我们回到前向传播的公式:
在前向传播中,输出位置 用到的输入为 ,核索引为 。
而在反向传播中,输入矩阵的梯度计算如下:
可以发现,我们的核索引变成了 。我们需要把前向传播的 转换成后向传播的 ,也就是把矩阵进行翻转。
然后是转置操作,这个操作比较容易理解:在前向传播中,我们的权重矩阵 将 in_channels
映射到 out_channels
中;而在反向传播中,我们需要把 out_channels
的结果“倒流”回 in_channels
中,因此这两个维度在反向传播中需要转置。
最后是插值上采样过程。在前向传播的卷积过程中,由于卷积核可能会用大于 1 的步长进行卷积,输出空间会变得更加稀疏。这导致了:
- 输入的位置比输出更多。
- 前向过程中,stride 跳过了一部分位置。
于是我们需要“复原”前向传播时的跳格子操作——靠在 dLdZ
的格子之间插零来扩展成跟 stride=1 一样的稠密网格。
综合这些推导和前面的“构建大矩阵”的操作,可以写出如下的向量化实现:
def backward(self, dLdY: np.ndarray) -> np.ndarray:
"""Backward pass for conv layer. Computes the gradients of the output
with respect to the input feature maps as well as the filter weights and
biases.
"""
X = self.cache['X']
Z = self.cache['Z']
W = self.parameters['W']
pad_h, pad_w = self.pad
stride = self.stride
k_h, k_w = self.kernel_shape
_, _, in_channels, _ = W.shape
dLdZ = self.activation.backward(Z, dLdY)
n_examples, out_rows, out_cols, out_channels = dLdZ.shape
dLdb = np.sum(dLdZ, axis=(0, 1, 2))
self.gradients['b'] = dLdb
X_padded = np.pad(
X,
((0, 0), (pad_h, pad_h), (pad_w, pad_w), (0, 0)),
mode='constant'
)
view_shape = (n_examples, out_rows, out_cols, k_h, k_w, in_channels)
s_n, s_r, s_c, s_ch = X_padded.strides
view_strides = (s_n, s_r * stride, s_c * stride, s_r, s_c, s_ch)
X_windows = np.lib.stride_tricks.as_strided(
X_padded,
shape=view_shape,
strides=view_strides
)
dLdW = np.einsum('nhwijk,nhwo->ijko', X_windows, dLdZ)
self.gradients['W'] = dLdW
W_rot = np.rot90(W, 2, axes=(0, 1))
W_transposed = W_rot.transpose(0, 1, 3, 2)
if stride > 1:
unsampled = np.zeros(
(n_examples,
(out_rows - 1) * stride + 1,
(out_cols - 1) * stride + 1,
out_channels)
)
unsampled[:, ::stride, ::stride, :] = dLdZ
else:
unsampled = dLdZ
unsampled_padded = np.pad(
unsampled,
((0,0), (k_h - 1 - pad_h, k_h - 1 - pad_h), (k_w - 1 - pad_w, k_w - 1 - pad_w), (0, 0)),
mode='constant'
)
_, in_rows, in_cols, _ = X.shape
dLdZ_view_shape = (n_examples, in_rows, in_cols, k_h, k_w, out_channels)
u_s_n, u_s_r, u_s_c, u_s_ch = unsampled_padded.strides
dLdZ_view_strides = (u_s_n, u_s_r, u_s_c, u_s_r, u_s_c, u_s_ch)
dLdZ_windows = np.lib.stride_tricks.as_strided(
unsampled_padded,
shape=dLdZ_view_shape,
strides=dLdZ_view_strides
)
dLdX = np.einsum("nhwijo,ijok->nhwk", dLdZ_windows, W_transposed)
return dLdX
除了前面所讲解的理论实现,在上面的代码中,还有如下的细节:
- 由于 是输入层,因此 的 channel 为
in_channels
,而权重、偏置项的梯度计算中的 channel 为out_channels
。 - 插值上采样的代码实现中
unsampled
矩阵构造的参数需要一些简单的计算:- 首先是这个矩阵的形状。通过简单的归纳推理可以得出:我们需要将现有矩阵放大
(out - 1) * stride + 1
才能恢复被 stride 缩放前的大小。 - 然后,我们要把原来每个 stride 卷积得到的值放回去,也就是
unsampled[:, ::stride, ::stride, :] = dLdZ
。
- 首先是这个矩阵的形状。通过简单的归纳推理可以得出:我们需要将现有矩阵放大
- 注意插值上采样的 padding 过程。我们需要让卷积核覆盖所有的输入格点,因此 padding 的大小为
kernal_size - 1
。但是原始的输入有自己的 paddingpad_h
、pad_w
,我们需要把它删掉。 - 注意插值上采样的 stride 实现。和权重、偏置这些梯度求解不同,在求解 的梯度时,我们需要遍历所有的输入格点、而不是以 stride 的步长遍历,因此上采样结果的
k_h
和k_w
这两个 shape 的 stride 也设置为s_r
、s_c
而不是s_r * stride
、s_c * stride
。
5. 池化层 Pool2D
forward
方法
池化层的 forward
实现的具体细节和卷积层类似,不过需要注意:由于池化层是对卷积核进行池化的,因此我们需要对我们的窗口矩阵执行一次转置、把卷积核相关的维度放在最后。
def forward(self, X: np.ndarray) -> np.ndarray:
"""Forward pass: use the pooling function to aggregate local information
in the input. This layer typically reduces the spatial dimensionality of
the input while keeping the number of feature maps the same.
As with all other layers, please make sure to cache the appropriate
information for the backward pass.
"""
# implement the forward pass
n_examples, in_rows, in_cols, channels = X.shape
k_h, k_w = self.kernel_shape
pad_h, pad_w = self.pad
stride = self.stride
out_rows = (in_rows + 2 * pad_h - k_h) // stride + 1
out_cols = (in_cols + 2 * pad_w - k_w) // stride + 1
X_padded = np.pad(
X,
((0, 0), (pad_h, pad_h), (pad_w, pad_w), (0, 0)),
mode='constant'
)
view_shape = (n_examples, out_rows, out_cols, k_h, k_w, channels)
s_n, s_r, s_c, s_ch = X_padded.strides
view_strides = (s_n, s_r * stride, s_c * stride, s_r, s_c, s_ch)
X_windows = np.lib.stride_tricks.as_strided(
X_padded,
shape=view_shape,
strides=view_strides
)
# Rearrange to (N, out_H, out_W, C, k_H, k_W)
X_windows_permuted = X_windows.transpose(0, 1, 2, 5, 3, 4)
# Use the predefined pool_fn from __init__
X_pool = self.pool_fn(X_windows_permuted, axis=(4, 5))
# cache any values required for backprop
self.cache = {
"X_padded": X_padded,
"X_windows": X_windows, # (N,H_out,W_out,k_h,k_w,C)
"out_rows": out_rows,
"out_cols": out_cols,
}
# For max pooling, also store argmax information for backward pass
if self.mode == "max":
# Store mask indicating max positions for efficient backward pass
max_mask = (X_windows_permuted == X_pool[:, :, :, :, None, None])
self.cache["max_mask"] = max_mask
return X_pool
backward
方法
池化层的梯度传播如下:
- 平均池化层:梯度被平均分摊给池化窗口内的所有输入:
- 最大池化层:梯度只传递给池化窗口内取得最大值的那个输入:
由于我们缓存了切分好的窗口,我们不需要像之前一样自己构建矩阵,直接使用缓存的窗口进行运算即可。
这里使用了一个优化简单的 for 循环求和的方法:我们可以先通过 np.arange
和 np.meshgrid
构建好索引初始值以及每步的跨度;然后把它 reshape 成矩阵的形状,就可以使用 np.add.at
一步完成相加操作了:
i_idx = np.arange(out_rows) * stride
j_idx = np.arange(out_cols) * stride
di = np.arange(k_h)
dj = np.arange(k_w)
ii, dii = np.meshgrid(i_idx, di, indexing='ij')
jj, djj = np.meshgrid(j_idx, dj, indexing='ij')
row_idx = ii + dii
col_idx = jj + djj
row_idx = row_idx.reshape(1, out_rows, 1, k_h, 1, 1)
col_idx = col_idx.reshape(1, 1, out_cols, 1, k_w, 1)
n_idx = np.arange(n_examples).reshape(n_examples, 1, 1, 1, 1, 1)
c_idx = np.arange(in_channels).reshape(1, 1, 1, 1, 1, in_channels)
np.add.at(dLdX_padded, (n_idx, row_idx, col_idx, c_idx), grad_windows)
总体实现如下:
def backward(self, dLdY: np.ndarray) -> np.ndarray:
"""Backward pass for pooling layer.
"""
# perform a backward pass
X_padded = self.cache["X_padded"]
X_windows = self.cache["X_windows"]
out_rows, out_cols = self.cache["out_rows"], self.cache["out_cols"]
k_h, k_w = self.kernel_shape
stride = self.stride
pad_h, pad_w = self.pad
n_examples, _, _, in_channels = X_padded.shape
dLdX_padded = np.zeros_like(X_padded)
if self.mode == 'max':
# Use precomputed max_mask if available, otherwise compute it
if "max_mask" in self.cache:
max_mask = self.cache["max_mask"] # (N, out_H, out_W, C, k_H, k_W)
# Transpose back to match X_windows shape: (N, out_H, out_W, k_H, k_W, C)
mask = max_mask.transpose(0, 1, 2, 4, 5, 3).astype(np.float32)
else:
# Fallback to computing mask from X_windows
mask_vals = np.max(X_windows, axis=(3, 4), keepdims=True)
mask = (X_windows == mask_vals).astype(np.float32)
# Handle ties by normalizing the mask
mask_sum = np.sum(mask, axis=(3, 4), keepdims=True)
mask /= (mask_sum + 1e-8)
dLdY_exp = dLdY[:, :, :, None, None, :] # (batch_size, out_rows, out_cols, k_h, k_w, channels)
grad_windows = mask * dLdY_exp
elif self.mode == 'average':
avg_factor = 1.0 / (k_h * k_w)
dLdY_exp = dLdY[:, :, :, None, None, :] * avg_factor
grad_windows = np.broadcast_to(dLdY_exp, X_windows.shape)
i_idx = np.arange(out_rows) * stride
j_idx = np.arange(out_cols) * stride
di = np.arange(k_h)
dj = np.arange(k_w)
ii, dii = np.meshgrid(i_idx, di, indexing='ij')
jj, djj = np.meshgrid(j_idx, dj, indexing='ij')
row_idx = ii + dii
col_idx = jj + djj
row_idx = row_idx.reshape(1, out_rows, 1, k_h, 1, 1)
col_idx = col_idx.reshape(1, 1, out_cols, 1, k_w, 1)
n_idx = np.arange(n_examples).reshape(n_examples, 1, 1, 1, 1, 1)
c_idx = np.arange(in_channels).reshape(1, 1, 1, 1, 1, in_channels)
np.add.at(dLdX_padded, (n_idx, row_idx, col_idx, c_idx), grad_windows)
if pad_h == 0 and pad_w == 0:
gradX = dLdX_padded
else:
gradX = dLdX_padded[:, pad_h:-pad_h, pad_w:-pad_w, :]
return gradX
这里有一个实现的细节:在卷积层中,对于输入信息我们只缓存了 X
;而在池化层中,我们需要缓存 X_window
(max_mask
可缓存可不缓存,如果不缓存的话重新计算一下即可)。这是因为在卷积层中,我们只需输入+权重即可重建所有需要的局部卷积窗口。;但是在池化层中,我们必须明确知道窗口内部“谁贡献了输出”,而这些信息没法仅从原始 推回去,必须提前保存。
6. 损失层
损失层实现了交叉熵损失函数的 forward
方法和 backward
方法。
forward
方法
交叉熵函数的前向传播只需要计算一次交叉熵:
def forward(self, Y: np.ndarray, Y_hat: np.ndarray) -> float:
"""Computes the loss for predictions `Y_hat` given one-hot encoded labels
"""
return -np.mean(np.sum(Y * np.log(Y_hat + 1e-15), axis=1))
backward
方法
对 求梯度即可:
def backward(self, Y: np.ndarray, Y_hat: np.ndarray) -> np.ndarray:
"""Backward pass of cross-entropy loss.
"""
return -Y / (Y_hat + 1e-15) / Y_hat.shape[0]
7. 最终的神经网络层
forward
方法
逐个调用每个层的 forward
方法即可。
def forward(self, X: np.ndarray) -> np.ndarray:
"""One forward pass through all the layers of the neural network.
"""
out = X
for layer in self.layers:
out = layer.forward(out)
return out
backward
方法
我们计算出当前的损失、并逐个调用每个层的 backward
方法。
def backward(self, target: np.ndarray, out: np.ndarray) -> float:
"""One backward pass through all the layers of the neural network.
During this phase we calculate the gradients of the loss with respect to
each of the parameters of the entire neural network. Most of the heavy
lifting is done by the `backward` methods of the layers, so this method
should be relatively simple. Also make sure to compute the loss in this
method and NOT in `self.forward`.
"""
loss = self.loss.forward(target, out)
dL = self.loss.backward(target, out)
for layer in reversed(self.layers): # go backwards
dL = layer.backward(dL)
return loss