# AiDA_Python/app/service/image2sketch_2/server.py
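"""Image-to-sketch service: pulls an image from OSS, converts it to a line sketch
with a pretrained residual-block Generator, optionally thickens the strokes via
erosion and weighted blending, and uploads the JPEG result back to object storage."""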
import cv2
import numpy as np
import torch
import torch.nn as nn
import torchvision.transforms as transforms
from PIL import Image

from app.service.utils.oss_client import oss_get_image, oss_upload_image

norm_layer = nn.InstanceNorm2d
# (original, eroded) blend weights indexed by the requested thickness level
weights = [(0.7, 0.3), (0.5, 0.5), (0.3, 0.7), (0.1, 0.9), (0, 1)]
# 3x3 kernel for the morphological erosion used to thicken dark strokes
kernel = np.ones((3, 3), np.uint8)


class ResidualBlock(nn.Module):
    """Residual block: two reflection-padded 3x3 convolutions with a skip connection."""

    def __init__(self, in_features):
        super(ResidualBlock, self).__init__()
        conv_block = [nn.ReflectionPad2d(1),
                      nn.Conv2d(in_features, in_features, 3),
                      norm_layer(in_features),
                      nn.ReLU(inplace=True),
                      nn.ReflectionPad2d(1),
                      nn.Conv2d(in_features, in_features, 3),
                      norm_layer(in_features)]
        self.conv_block = nn.Sequential(*conv_block)

    def forward(self, x):
        return x + self.conv_block(x)


class Generator(nn.Module):
    def __init__(self, input_nc, output_nc, n_residual_blocks=9, sigmoid=True):
        super(Generator, self).__init__()

        # Initial convolution block
        model0 = [nn.ReflectionPad2d(3),
                  nn.Conv2d(input_nc, 64, 7),
                  norm_layer(64),
                  nn.ReLU(inplace=True)]
        self.model0 = nn.Sequential(*model0)

        # Downsampling
        model1 = []
        in_features = 64
        out_features = in_features * 2
        for _ in range(2):
            model1 += [nn.Conv2d(in_features, out_features, 3, stride=2, padding=1),
                       norm_layer(out_features),
                       nn.ReLU(inplace=True)]
            in_features = out_features
            out_features = in_features * 2
        self.model1 = nn.Sequential(*model1)

        # Residual blocks
        model2 = []
        for _ in range(n_residual_blocks):
            model2 += [ResidualBlock(in_features)]
        self.model2 = nn.Sequential(*model2)

        # Upsampling
        model3 = []
        out_features = in_features // 2
        for _ in range(2):
            model3 += [nn.ConvTranspose2d(in_features, out_features, 3, stride=2, padding=1, output_padding=1),
                       norm_layer(out_features),
                       nn.ReLU(inplace=True)]
            in_features = out_features
            out_features = in_features // 2
        self.model3 = nn.Sequential(*model3)

        # Output layer
        model4 = [nn.ReflectionPad2d(3),
                  nn.Conv2d(64, output_nc, 7)]
        if sigmoid:
            model4 += [nn.Sigmoid()]
        self.model4 = nn.Sequential(*model4)

    def forward(self, x, cond=None):
        out = self.model0(x)
        out = self.model1(out)
        out = self.model2(out)
        out = self.model3(out)
        out = self.model4(out)
        return out
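

# Instantiate the generator (RGB in, single-channel sketch out, 3 residual blocks),
# load the pretrained weights on CPU, and switch to inference mode.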
model1 = Generator(3, 1, 3)
model1.load_state_dict(torch.load('app/service/image2sketch_2/model.pth', map_location=torch.device('cpu')))
model1.eval()


def predict(input_img, width):
    """Run the generator on a PIL image and return the sketch as a numpy array."""
    # Resize so the shorter side matches the original width, then convert to a CHW tensor in [0, 1]
    transform = transforms.Compose([transforms.Resize(width, Image.BICUBIC), transforms.ToTensor()])
    input_img = transform(input_img)
    input_img = torch.unsqueeze(input_img, 0)

    with torch.no_grad():
        drawing = model1(input_img)[0].detach()
    drawing = transforms.ToPILImage()(drawing)
    # Convert to ndarray for OpenCV post-processing
    drawing = np.array(drawing)
    return drawing


def get_image(image_url):
    """Fetch an image from OSS (URL format "bucket/object_name") and return it as RGB plus its size."""
    image = oss_get_image(bucket=image_url.split('/')[0], object_name=image_url[image_url.find('/') + 1:], data_type="PIL")
    image = image.convert('RGB')
    width = image.size[0]
    height = image.size[1]
    return image, width, height


def processing_pipeline(image_url, thickness, sketch_bucket, sketch_name):
    thickness = int(thickness)

    # Extract the sketch
    image, width, height = get_image(image_url)
    sketch_image = predict(image, width)

    # Adjust line thickness
    if thickness != 0:
        # Erosion thickens the dark strokes of the white-background sketch
        dilated = cv2.erode(sketch_image, kernel, iterations=1)
        # Blend the original sketch with the eroded one using the weights for this thickness level
        sketch_image = cv2.addWeighted(sketch_image, weights[thickness][0], dilated, weights[thickness][1], 0)

    # Upload to MinIO
    image_bytes = cv2.imencode(".jpg", sketch_image)[1].tobytes()
    req = oss_upload_image(bucket=sketch_bucket, object_name=sketch_name, image_bytes=image_bytes)
    return f"{req.bucket_name}/{req.object_name}"


if __name__ == '__main__':
    result_url = processing_pipeline("aida-users/89/relight_image/d5f0d967-f8e8-424d-98f9-a8ad8313deec-0-89.png", 1, "test", "test123.jpg")
    print(result_url)