import cv2 import numpy import numpy as np import torch import torch.nn as nn import torchvision.transforms as transforms from PIL import Image from app.service.utils.oss_client import oss_get_image, oss_upload_image norm_layer = nn.InstanceNorm2d weights = [(0.7, 0.3), (0.5, 0.5), (0.3, 0.7), (0.1, 0.9), (0, 1)] kernel = np.ones((3, 3), np.uint8) class ResidualBlock(nn.Module): def __init__(self, in_features): super(ResidualBlock, self).__init__() conv_block = [nn.ReflectionPad2d(1), nn.Conv2d(in_features, in_features, 3), norm_layer(in_features), nn.ReLU(inplace=True), nn.ReflectionPad2d(1), nn.Conv2d(in_features, in_features, 3), norm_layer(in_features) ] self.conv_block = nn.Sequential(*conv_block) def forward(self, x): return x + self.conv_block(x) class Generator(nn.Module): def __init__(self, input_nc, output_nc, n_residual_blocks=9, sigmoid=True): super(Generator, self).__init__() # Initial convolution block model0 = [nn.ReflectionPad2d(3), nn.Conv2d(input_nc, 64, 7), norm_layer(64), nn.ReLU(inplace=True)] self.model0 = nn.Sequential(*model0) # Downsampling model1 = [] in_features = 64 out_features = in_features * 2 for _ in range(2): model1 += [nn.Conv2d(in_features, out_features, 3, stride=2, padding=1), norm_layer(out_features), nn.ReLU(inplace=True)] in_features = out_features out_features = in_features * 2 self.model1 = nn.Sequential(*model1) model2 = [] # Residual blocks for _ in range(n_residual_blocks): model2 += [ResidualBlock(in_features)] self.model2 = nn.Sequential(*model2) # Upsampling model3 = [] out_features = in_features // 2 for _ in range(2): model3 += [nn.ConvTranspose2d(in_features, out_features, 3, stride=2, padding=1, output_padding=1), norm_layer(out_features), nn.ReLU(inplace=True)] in_features = out_features out_features = in_features // 2 self.model3 = nn.Sequential(*model3) # Output layer model4 = [nn.ReflectionPad2d(3), nn.Conv2d(64, output_nc, 7)] if sigmoid: model4 += [nn.Sigmoid()] self.model4 = nn.Sequential(*model4) def forward(self, x, cond=None): out = self.model0(x) out = self.model1(out) out = self.model2(out) out = self.model3(out) out = self.model4(out) return out model1 = Generator(3, 1, 3) model1.load_state_dict(torch.load('service/image2sketch_2/model.pth', map_location=torch.device('cpu'))) model1.eval() def predict(input_img, width): transform = transforms.Compose([transforms.Resize(width, Image.BICUBIC), transforms.ToTensor()]) input_img = transform(input_img) input_img = torch.unsqueeze(input_img, 0) with torch.no_grad(): drawing = model1(input_img)[0].detach() drawing = transforms.ToPILImage()(drawing) # 转ndarray drawing = numpy.array(drawing) return drawing def get_image(image_url): image = oss_get_image(bucket=image_url.split('/')[0], object_name=image_url[image_url.find('/') + 1:], data_type="PIL") image = image.convert('RGB') width = image.size[0] height = image.size[1] return image, width, height def processing_pipeline(image_url, thickness, sketch_bucket, sketch_name): thickness = int(thickness) # 提取sketch image, width, height = get_image(image_url) sketch_image = predict(image, width) # 设定线条粗细 if thickness != 0: dilated = cv2.erode(sketch_image, kernel, iterations=1) # 将原图与膨胀后的图像进行混合,使用不同的权重 sketch_image = cv2.addWeighted(sketch_image, weights[thickness][0], dilated, weights[thickness][1], 0) # 上传minio image_bytes = cv2.imencode(".jpg", sketch_image)[1].tobytes() req = oss_upload_image(bucket=sketch_bucket, object_name=sketch_name, image_bytes=image_bytes) return f"{req.bucket_name}/{req.object_name}" if __name__ == '__main__': result_url = processing_pipeline("aida-users/89/relight_image/d5f0d967-f8e8-424d-98f9-a8ad8313deec-0-89.png", 1, "test", "test123.jpg") print(result_url)