import math
from collections import namedtuple

import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F

def _upsample_add(x, y):
    """Upsample x to y's spatial size and add the two maps (FPN-style lateral merge)."""
    _, _, H, W = y.size()
    return F.interpolate(x, size=(H, W), mode='bilinear', align_corners=True) + y
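
# Illustrative sketch (not part of the original file): shows the shape behaviour of
# _upsample_add as it is used for the FPN-style merges in Encoder4Editing below.
def _demo_upsample_add():
    coarse = torch.randn(1, 512, 16, 16)  # deeper, lower-resolution feature map
    fine = torch.randn(1, 512, 32, 32)    # shallower, higher-resolution feature map
    merged = _upsample_add(coarse, fine)
    assert merged.shape == fine.shape     # coarse map is resized to 32x32 before the add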

class EqualLinear(nn.Module):
    """Linear layer with equalized learning rate (StyleGAN2-style runtime weight scaling)."""

    def __init__(
        self, in_dim, out_dim, bias=True, bias_init=0, lr_mul=1, activation=None
    ):
        super().__init__()
        self.weight = nn.Parameter(torch.randn(out_dim, in_dim).div_(lr_mul))
        if bias:
            self.bias = nn.Parameter(torch.zeros(out_dim).fill_(bias_init))
        else:
            self.bias = None
        self.activation = activation
        self.scale = (1 / math.sqrt(in_dim)) * lr_mul
        self.lr_mul = lr_mul

    def forward(self, input):
        # The fused leaky-ReLU activation branch of the original StyleGAN2 code is not
        # used here; only the plain linear path is kept.
        bias = self.bias * self.lr_mul if self.bias is not None else None
        out = F.linear(input, self.weight * self.scale, bias=bias)
        return out

    def __repr__(self):
        return (
            f'{self.__class__.__name__}({self.weight.shape[1]}, {self.weight.shape[0]})'
        )
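
# Illustrative sketch (not part of the original file): the weights are stored divided by
# lr_mul and rescaled by scale = lr_mul / sqrt(in_dim) at forward time, so the effective
# initialisation follows the equalized-learning-rate scheme.
def _demo_equal_linear():
    layer = EqualLinear(512, 512, lr_mul=1)
    out = layer(torch.randn(4, 512))
    assert out.shape == (4, 512)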

class GradualStyleBlock(nn.Module):
    """Reduces a spatial feature map to a single style vector via repeated stride-2 convolutions."""

    def __init__(self, in_c, out_c, spatial):
        super(GradualStyleBlock, self).__init__()
        self.out_c = out_c
        self.spatial = spatial
        num_pools = int(np.log2(spatial))
        modules = []
        modules += [nn.Conv2d(in_c, out_c, kernel_size=3, stride=2, padding=1),
                    nn.LeakyReLU()]
        for _ in range(num_pools - 1):
            modules += [
                nn.Conv2d(out_c, out_c, kernel_size=3, stride=2, padding=1),
                nn.LeakyReLU()
            ]
        self.convs = nn.Sequential(*modules)
        self.linear = EqualLinear(out_c, out_c, lr_mul=1)

    def forward(self, x):
        x = self.convs(x)
        x = x.view(-1, self.out_c)
        x = self.linear(x)
        return x
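
# Illustrative sketch (not part of the original file): a block built for a 16x16 input
# applies log2(16) = 4 stride-2 convolutions, collapsing the map to 1x1 before the final
# EqualLinear, so each block emits one 512-d style vector per image.
def _demo_gradual_style_block():
    block = GradualStyleBlock(512, 512, 16)
    style = block(torch.randn(2, 512, 16, 16))
    assert style.shape == (2, 512)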

class Bottleneck(namedtuple('Block', ['in_channel', 'depth', 'stride'])):
    """A named tuple describing a ResNet block."""

class bottleneck_IR(nn.Module):
    def __init__(self, in_channel, depth, stride):
        super(bottleneck_IR, self).__init__()
        if in_channel == depth:
            self.shortcut_layer = nn.MaxPool2d(1, stride)
        else:
            self.shortcut_layer = nn.Sequential(
                nn.Conv2d(in_channel, depth, (1, 1), stride, bias=False),
                nn.BatchNorm2d(depth)
            )
        self.res_layer = nn.Sequential(
            nn.BatchNorm2d(in_channel),
            nn.Conv2d(in_channel, depth, (3, 3), (1, 1), 1, bias=False),
            nn.PReLU(depth),
            nn.Conv2d(depth, depth, (3, 3), stride, 1, bias=False),
            nn.BatchNorm2d(depth)
        )

    def forward(self, x):
        shortcut = self.shortcut_layer(x)
        res = self.res_layer(x)
        return res + shortcut

class SEModule(nn.Module):
    def __init__(self, channels, reduction):
        super(SEModule, self).__init__()
        self.avg_pool = nn.AdaptiveAvgPool2d(1)
        self.fc1 = nn.Conv2d(channels, channels // reduction, kernel_size=1, padding=0, bias=False)
        self.relu = nn.ReLU(inplace=True)
        self.fc2 = nn.Conv2d(channels // reduction, channels, kernel_size=1, padding=0, bias=False)
        self.sigmoid = nn.Sigmoid()

    def forward(self, x):
        module_input = x
        x = self.avg_pool(x)
        x = self.fc1(x)
        x = self.relu(x)
        x = self.fc2(x)
        x = self.sigmoid(x)
        return module_input * x
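
# Illustrative sketch (not part of the original file): the squeeze-and-excitation module
# pools each channel to a single value, passes it through a reduction-16 bottleneck, and
# rescales the input by the resulting per-channel sigmoid gate.
def _demo_se_module():
    se = SEModule(channels=64, reduction=16)
    out = se(torch.randn(2, 64, 32, 32))
    assert out.shape == (2, 64, 32, 32)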

class bottleneck_IR_SE(nn.Module):
    def __init__(self, in_channel, depth, stride):
        super(bottleneck_IR_SE, self).__init__()
        if in_channel == depth:
            self.shortcut_layer = nn.MaxPool2d(1, stride)
        else:
            self.shortcut_layer = nn.Sequential(
                nn.Conv2d(in_channel, depth, (1, 1), stride, bias=False),
                nn.BatchNorm2d(depth)
            )
        self.res_layer = nn.Sequential(
            nn.BatchNorm2d(in_channel),
            nn.Conv2d(in_channel, depth, (3, 3), (1, 1), 1, bias=False),
            nn.PReLU(depth),
            nn.Conv2d(depth, depth, (3, 3), stride, 1, bias=False),
            nn.BatchNorm2d(depth),
            SEModule(depth, 16)
        )

    def forward(self, x):
        shortcut = self.shortcut_layer(x)
        res = self.res_layer(x)
        return res + shortcut

def get_block(in_channel, depth, num_units, stride=2):
    return [Bottleneck(in_channel, depth, stride)] + \
           [Bottleneck(depth, depth, 1) for _ in range(num_units - 1)]


def get_blocks(num_layers):
    if num_layers == 50:
        blocks = [
            get_block(in_channel=64, depth=64, num_units=3),
            get_block(in_channel=64, depth=128, num_units=4),
            get_block(in_channel=128, depth=256, num_units=14),
            get_block(in_channel=256, depth=512, num_units=3)
        ]
    elif num_layers == 100:
        blocks = [
            get_block(in_channel=64, depth=64, num_units=3),
            get_block(in_channel=64, depth=128, num_units=13),
            get_block(in_channel=128, depth=256, num_units=30),
            get_block(in_channel=256, depth=512, num_units=3)
        ]
    elif num_layers == 152:
        blocks = [
            get_block(in_channel=64, depth=64, num_units=3),
            get_block(in_channel=64, depth=128, num_units=8),
            get_block(in_channel=128, depth=256, num_units=36),
            get_block(in_channel=256, depth=512, num_units=3)
        ]
    else:
        raise ValueError("Invalid number of layers: {}. Must be one of [50, 100, 152]".format(num_layers))
    return blocks
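
# Illustrative sketch (not part of the original file): for the 50-layer setting the four
# stages hold 3 + 4 + 14 + 3 = 24 bottleneck units, and each stage starts with a stride-2
# unit followed by stride-1 units.
def _demo_get_blocks():
    blocks = get_blocks(50)
    assert sum(len(stage) for stage in blocks) == 24
    assert blocks[1][0].stride == 2 and blocks[1][1].stride == 1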

class Encoder4Editing(nn.Module):
    def __init__(self, num_layers, mode='ir', stylegan_size=1024, out_res=64):
        super(Encoder4Editing, self).__init__()
        assert num_layers in [50, 100, 152], 'num_layers should be 50, 100 or 152'
        assert mode in ['ir', 'ir_se'], 'mode should be ir or ir_se'
        blocks = get_blocks(num_layers)
        if mode == 'ir':
            unit_module = bottleneck_IR
        elif mode == 'ir_se':
            unit_module = bottleneck_IR_SE
        self.out_res = out_res
        self.input_layer = nn.Sequential(nn.Conv2d(3, 64, (3, 3), 1, 1, bias=False),
                                         nn.BatchNorm2d(64),
                                         nn.PReLU(64))
        modules = []
        for block in blocks:
            for bottleneck in block:
                modules.append(unit_module(bottleneck.in_channel,
                                           bottleneck.depth,
                                           bottleneck.stride))
        self.body = nn.Sequential(*modules)

        self.styles = nn.ModuleList()
        log_size = int(math.log(stylegan_size, 2))
        self.style_count = 2 * log_size - 2  # 18 style vectors for a 1024px StyleGAN2
        self.coarse_ind = 3
        self.middle_ind = 7
        for i in range(self.style_count):
            if i < self.coarse_ind:
                style = GradualStyleBlock(512, 512, 16)
            elif i < self.middle_ind:
                style = GradualStyleBlock(512, 512, 32)
            else:
                style = GradualStyleBlock(512, 512, 64)
            self.styles.append(style)
        self.latlayer1 = nn.Conv2d(256, 512, kernel_size=1, stride=1, padding=0)
        self.latlayer2 = nn.Conv2d(128, 512, kernel_size=1, stride=1, padding=0)

    def forward(self, x):
        x = self.input_layer(x)
        # Collect the intermediate feature maps at the end of each ResNet stage.
        modulelist = list(self.body._modules.values())
        for i, l in enumerate(modulelist):
            x = l(x)
            if i == 2:
                c0 = x
            elif i == 6:
                c1 = x
            elif i == 20:
                c2 = x
            elif i == 23:
                c3 = x
        # Infer main W and duplicate it
        w0 = self.styles[0](c3)
        w = w0.repeat(self.style_count, 1, 1).permute(1, 0, 2)
        features = c3
        for i in range(1, self.style_count):  # Infer additional deltas
            if i == self.coarse_ind:
                p2 = _upsample_add(c3, self.latlayer1(c2))  # FPN's middle features
                features = p2
            elif i == self.middle_ind:
                p1 = _upsample_add(p2, self.latlayer2(c1))  # FPN's fine features
                features = p1
            delta_i = self.styles[i](features)
            w[:, i] += delta_i
        # Pick the intermediate feature map whose resolution (for a 256x256 input) matches out_res.
        c = {128: c0,
             64: c1,
             32: c2,
             16: c3}.get(self.out_res)
        return w, c
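
# Illustrative smoke test (not part of the original file): assuming a 256x256 input and
# the default stylegan_size=1024, the encoder returns 18 style vectors per image plus the
# feature map selected by out_res (here the 128-channel 64x64 map).
def _demo_encoder4editing():
    enc = Encoder4Editing(num_layers=50, mode='ir_se', stylegan_size=1024, out_res=64)
    with torch.no_grad():
        w, c = enc(torch.randn(1, 3, 256, 256))
    assert w.shape == (1, 18, 512)
    assert c.shape == (1, 128, 64, 64)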

class EqualConv2d(nn.Module):
    """2D convolution with equalized learning rate (StyleGAN2-style runtime weight scaling)."""

    def __init__(
        self, in_channel, out_channel, kernel_size, stride=1, padding=0, bias=True
    ):
        super().__init__()
        self.weight = nn.Parameter(
            torch.randn(out_channel, in_channel, kernel_size, kernel_size)
        )
        self.scale = 1 / math.sqrt(in_channel * kernel_size ** 2)
        self.stride = stride
        self.padding = padding
        if bias:
            self.bias = nn.Parameter(torch.zeros(out_channel))
        else:
            self.bias = None

    def forward(self, input):
        out = F.conv2d(
            input,
            self.weight * self.scale,
            bias=self.bias,
            stride=self.stride,
            padding=self.padding,
        )
        return out

    def __repr__(self):
        return (
            f'{self.__class__.__name__}({self.weight.shape[1]}, {self.weight.shape[0]},'
            f' {self.weight.shape[2]}, stride={self.stride}, padding={self.padding})'
        )

class ScaledLeakyReLU(nn.Module):
    def __init__(self, negative_slope=0.2):
        super().__init__()
        self.negative_slope = negative_slope

    def forward(self, input):
        out = F.leaky_relu(input, negative_slope=self.negative_slope)
        return out * math.sqrt(2)

class HighResFeat(nn.Module):
    def __init__(self, in_channels, out_channels):
        super(HighResFeat, self).__init__()
        self.shared = EqualConv2d(in_channels, out_channels, kernel_size=3, padding=1, bias=True)
        self.conv1 = EqualConv2d(out_channels, 1, kernel_size=3, padding=1, bias=True)
        self.conv2 = EqualConv2d(out_channels, out_channels, kernel_size=3, padding=1, bias=True)
        self.activation = ScaledLeakyReLU(0.2)
        self.sigmoid = nn.Sigmoid()
        self.skip = None
        if in_channels != out_channels:
            self.skip = EqualConv2d(in_channels, out_channels, kernel_size=1, padding=0, bias=False)

    def forward(self, x):
        shared_feats = self.shared(x)
        shared_feats = self.activation(shared_feats)
        # Predict a single-channel sigmoid gate and an additive residual feature map.
        gate = self.conv1(shared_feats)
        gate = self.sigmoid(gate)
        addition = self.conv2(shared_feats)
        addition = self.activation(addition)
        if self.skip is not None:
            x = self.skip(x)
        return gate, addition + x

class E4E_Inversion(nn.Module):
    def __init__(self, resolution, num_layers=50, mode='ir_se', out_res=64):
        super(E4E_Inversion, self).__init__()
        self.out_res = out_res
        resolution = 1024  # the encoder is always built for a 1024px StyleGAN, overriding the argument
        self.basic_encoder = Encoder4Editing(num_layers, mode, resolution, self.out_res)
        self.latent_avg = None  # set externally (e.g. from a pretrained e4e checkpoint) before calling forward
        # ckpt = torch.load(e4e_path, map_location='cpu')
        # self.latent_avg = ckpt['latent_avg'].cuda()
        # ckpt = {k[k.find(".")+1:]: v for k, v in ckpt['state_dict'].items() if "decoder" not in k}
        # self.basic_encoder.load_state_dict(ckpt, strict=True)

    def freeze_basic_encoder(self):
        # The basic encoder is always kept in eval mode, with no backprop through it.
        self.basic_encoder.eval()
        for param in self.basic_encoder.parameters():
            param.requires_grad = False

    def forward(self, reals):
        self.freeze_basic_encoder()
        w, c = self.basic_encoder(reals)
        w = w + self.latent_avg  # latents are predicted as offsets from the average latent
        highres_outs = {f"{self.out_res}x{self.out_res}": c}
        return w, highres_outs
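
# Illustrative usage sketch (not part of the original file): latent_avg must be assigned
# before forward is called, typically from a pretrained e4e checkpoint as hinted at by the
# commented-out loading code above; the zero tensor below is only a stand-in.
if __name__ == "__main__":
    model = E4E_Inversion(resolution=1024, num_layers=50, mode='ir_se', out_res=64)
    model.latent_avg = torch.zeros(model.basic_encoder.style_count, 512)  # placeholder for the real average latent
    with torch.no_grad():
        w, highres_outs = model(torch.randn(1, 3, 256, 256))
    print(w.shape, highres_outs["64x64"].shape)  # (1, 18, 512), (1, 128, 64, 64)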