def forward(self, x, mask=None, temporal=False): """ Args: x: input features with shape of (num_windows*B, N, C) mask: (0/-inf) mask with shape of (num_windows, Wh*Ww, Wh*Ww) or None """ B_, N, C = x.shape qkv = self.qkv(x).reshape(B_, N, 3, self.num_heads, C // self.num_heads).permute(2, 0, 3, 1, 4) q, k, v = qkv[0], qkv[1], qkv[2] # make torchscript happy (cannot use tensor as tuple) q = q * self.scale attn = (q @ k.transpose(-2, -1)) if temporal: relative_pos_bias = self.temporal_position_bias_table[self.t_relative_coords].view(self.num_ttokens, self.num_ttokens, -1).permute(2, 0, 1).contiguous() attn = attn + relative_pos_bias.unsqueeze(0) attn = self.softmax(attn) else: relative_position_bias = self.relative_position_bias_table[self.relative_position_index.view(-1)].view( self.window_size[0] * self.window_size[1], self.window_size[0] * self.window_size[1], -1) # Wh*Ww,Wh*Ww,nH relative_position_bias = relative_position_bias.permute(2, 0, 1).contiguous() # nH, Wh*Ww, Wh*Ww attn = attn + relative_position_bias.unsqueeze(0) if mask is not None: nW = mask.shape[0] attn = attn.view(B_ // nW, nW, self.num_heads, N, N) + mask.unsqueeze(1).unsqueeze(0) attn = attn.view(-1, self.num_heads, N, N) attn = self.softmax(attn) else: attn = self.softmax(attn) attn = self.attn_drop(attn) x = (attn @ v).transpose(1, 2).reshape(B_, N, C) x = self.proj(x) x = self.proj_drop(x) return x
时间: 2023-06-27 18:00:59 浏览: 184
这段代码是一个自注意力机制的前向传播函数,用于计算输入特征的相对于自身的注意力权重,然后将这些权重应用于值向量,最后通过一个投影层输出新的特征表示。其中,输入特征x的形状为(num_windows*B, N, C),其中num_windows表示窗口数,B表示批次大小,N表示序列长度,C表示每个序列元素的特征维度。函数中涉及的主要操作包括:
- 利用多头注意力机制,将输入特征x分别映射到查询向量q,键向量k和值向量v,并计算它们的点积注意力矩阵attn=(q@k^T);
- 如果temporal为True,则说明输入特征是时间序列,并且需要考虑时间维度上的相对位置关系,此时会使用一个临时的位置偏置表(temporal_position_bias_table)来计算注意力矩阵;
- 如果temporal为False,则说明输入特征是二维图像,并且需要考虑空间维度上的相对位置关系,此时会使用一个固定的位置偏置表(relative_position_bias_table)来计算注意力矩阵;
- 如果mask不为None,则说明需要对注意力矩阵进行掩码处理,以避免模型在未来时刻预测当前时刻的信息,此时会将掩码矩阵mask与注意力矩阵attn相加,然后再通过softmax函数归一化;
- 最后将注意力权重与值向量做加权平均,并通过一个投影层得到新的特征表示x。在这个过程中,还会进行一些dropout操作以防止过拟合。
相关问题
class TemporalModel(nn.Module): def __init__( self, in_channels, receptive_field, input_shape, start_out_channels=64, extra_in_channels=0, n_spatial_layers_between_temporal_layers=0, use_pyramid_pooling=True): super().__init__() self.receptive_field = receptive_field n_temporal_layers = receptive_field - 1 h, w = input_shape modules = [] block_in_channels = in_channels block_out_channels = start_out_channels for _ in range(n_temporal_layers): if use_pyramid_pooling: use_pyramid_pooling = True pool_sizes = [(2, h, w)] else: use_pyramid_pooling = False pool_sizes = None temporal = TemporalBlock( block_in_channels, block_out_channels, use_pyramid_pooling=use_pyramid_pooling, pool_sizes=pool_sizes, ) spatial = [ Bottleneck3D(block_out_channels, block_out_channels, kernel_size=(1, 3, 3)) for _ in range(n_spatial_layers_between_temporal_layers) ] temporal_spatial_layers = nn.Sequential(temporal, *spatial) modules.extend(temporal_spatial_layers) block_in_channels = block_out_channels block_out_channels += extra_in_channels self.out_channels = block_in_channels self.model = nn.Sequential(*modules) def forward(self, x): # Reshape input tensor to (batch, C, time, H, W) x = x.permute(0, 2, 1, 3, 4) x = self.model(x) x = x.permute(0, 2, 1, 3, 4).contiguous() return x[:, (self.receptive_field - 1):]是如何一步步前向传播的?
首先,输入张量x的形状为(batch_size, in_channels, sequence_length, height, width)。
然后,我们将x的维度从(sequence_length, batch_size, in_channels, height, width)改变为(batch_size, sequence_length, in_channels, height, width)。
接下来,我们将模型应用于输入,并将结果从(batch_size, sequence_length, out_channels, height, width)改变为(batch_size, out_channels, sequence_length, height, width),使其与输入张量的形状匹配。
最后,我们将输出张量的维度从(batch_size, out_channels, sequence_length, height, width)改变为(batch_size, sequence_length-receptive_field+1, out_channels, height, width)。其中,sequence_length-receptive_field+1是因为我们使用的是膨胀卷积,其输出序列长度会比输入短receptive_field-1,因此我们需要将其调整为对齐原始输入序列的长度。
def flops(self, N, T, temporal=False): # calculate flops for 1 window with token length of N flops = 0 # qkv = self.qkv(x) # flops += N * self.dim * 3 * self.dim if temporal: flops += self.dim * N * T * T * 2 else: # attn = (q @ k.transpose(-2, -1)) flops += self.num_heads * N * (self.dim // self.num_heads) * N * T # x = (attn @ v) flops += self.num_heads * N * N * (self.dim // self.num_heads) * T # x = self.proj(x) # flops += N * self.dim * self.dim return flops
这段代码计算了一个窗口内的FLOPS(浮点操作数),其中N为令牌长度,T为时间步数。如果temporal为True,则计算的是temporal self-attention的FLOPS,否则计算的是普通的self-attention的FLOPS。具体来说,计算FLOPS的过程包括以下几个步骤:
1. 计算qkv,其中q、k、v均为大小为N×dim的矩阵,dim为模型的隐藏层维度。这一部分的FLOPS为N×dim×3×dim。
2. 如果是temporal self-attention,计算attn,其中attn为大小为N×T×T的矩阵,表示每个时间步之间的attention。这一部分的FLOPS为2×dim×N×T×T。
3. 如果是普通的self-attention,计算attn,其中attn为大小为num_heads×N×T×T的矩阵,表示每个头在每个时间步的attention。这一部分的FLOPS为num_heads×N×(dim/num_heads)×N×T。
4. 计算x,其中x为大小为num_heads×N×T×(dim/num_heads)的矩阵,表示每个头在每个时间步的输出。这一部分的FLOPS为num_heads×N×N×(dim/num_heads)×T。
5. 计算proj,其中proj为大小为N×dim的矩阵,表示self-attention的输出。这一部分的FLOPS为N×dim×dim。
最终,将所有步骤的FLOPS相加,即得到一个窗口内的总FLOPS。
阅读全文