143 lines
4.5 KiB
Python
143 lines
4.5 KiB
Python
import cv2
|
|
import numpy
|
|
import numpy as np
|
|
import torch
|
|
import torch.nn as nn
|
|
import torchvision.transforms as transforms
|
|
from PIL import Image
|
|
|
|
from app.service.utils.oss_client import oss_get_image, oss_upload_image
|
|
|
|
# Normalisation layer class used by the generator's building blocks.
norm_layer = nn.InstanceNorm2d

# Blend weights looked up by the integer line-thickness level:
# (weight of the raw sketch, weight of the eroded sketch).
weights = [
    (0.7, 0.3),
    (0.5, 0.5),
    (0.3, 0.7),
    (0.1, 0.9),
    (0, 1),
]

# 3x3 all-ones structuring element passed to cv2.erode.
kernel = np.ones((3, 3), dtype=np.uint8)
|
|
|
|
|
|
class ResidualBlock(nn.Module):
    """Residual unit: two reflection-padded 3x3 convolutions plus a skip connection.

    Args:
        in_features: Channel count of the input; both convolutions keep it unchanged.
    """

    def __init__(self, in_features):
        super().__init__()

        # The layer order is deliberately unchanged: nn.Sequential numbers its
        # children by position, and those numbers are part of the state_dict keys.
        self.conv_block = nn.Sequential(
            nn.ReflectionPad2d(1),
            nn.Conv2d(in_features, in_features, 3),
            norm_layer(in_features),
            nn.ReLU(inplace=True),
            nn.ReflectionPad2d(1),
            nn.Conv2d(in_features, in_features, 3),
            norm_layer(in_features),
        )

    def forward(self, x):
        """Return ``x`` plus the output of the convolutional branch."""
        branch = self.conv_block(x)
        return x + branch
|
|
|
|
|
|
class Generator(nn.Module):
    """Image-to-image network: stem, downsampling, residual blocks, upsampling, output.

    Args:
        input_nc: Number of channels of the input tensor.
        output_nc: Number of channels of the produced tensor.
        n_residual_blocks: How many ``ResidualBlock`` units sit between the
            downsampling and upsampling stages.
        sigmoid: When true, a final ``nn.Sigmoid`` is appended to the output stage.
    """

    def __init__(self, input_nc, output_nc, n_residual_blocks=9, sigmoid=True):
        super().__init__()

        # Initial convolution block: 7x7 convolution lifting the input to 64 channels.
        self.model0 = nn.Sequential(
            nn.ReflectionPad2d(3),
            nn.Conv2d(input_nc, 64, 7),
            norm_layer(64),
            nn.ReLU(inplace=True),
        )

        # Downsampling: two stride-2 convolutions, doubling the channel count each time.
        channels = 64
        down_layers = []
        for _ in range(2):
            doubled = channels * 2
            down_layers.extend([
                nn.Conv2d(channels, doubled, 3, stride=2, padding=1),
                norm_layer(doubled),
                nn.ReLU(inplace=True),
            ])
            channels = doubled
        self.model1 = nn.Sequential(*down_layers)

        # Residual blocks, all operating at the downsampled channel count.
        self.model2 = nn.Sequential(
            *[ResidualBlock(channels) for _ in range(n_residual_blocks)]
        )

        # Upsampling: two stride-2 transposed convolutions, halving the channels each time.
        up_layers = []
        for _ in range(2):
            halved = channels // 2
            up_layers.extend([
                nn.ConvTranspose2d(channels, halved, 3, stride=2, padding=1, output_padding=1),
                norm_layer(halved),
                nn.ReLU(inplace=True),
            ])
            channels = halved
        self.model3 = nn.Sequential(*up_layers)

        # Output layer: 7x7 convolution from 64 channels to output_nc, optionally squashed.
        head = [nn.ReflectionPad2d(3), nn.Conv2d(64, output_nc, 7)]
        if sigmoid:
            head.append(nn.Sigmoid())
        self.model4 = nn.Sequential(*head)

    def forward(self, x, cond=None):
        """Run ``x`` through the five stages in order; ``cond`` is accepted but unused."""
        for stage in (self.model0, self.model1, self.model2, self.model3, self.model4):
            x = stage(x)
        return x
|
|
|
|
|
|
# Build the sketch generator (3 input channels, 1 output channel, 3 residual blocks),
# load its weights onto the CPU and switch it to evaluation mode.
# NOTE(review): torch.load relies on pickle, so the checkpoint file must be trusted.
model1 = Generator(input_nc=3, output_nc=1, n_residual_blocks=3)
model1.load_state_dict(
    torch.load('app/service/image2sketch_2/model.pth', map_location=torch.device('cpu'))
)
model1.eval()
|
|
|
|
|
|
def predict(input_img, width):
    """Run the sketch generator on an image and return the result as an ndarray.

    Args:
        input_img: Image to convert (the caller in this file passes an RGB PIL image).
        width: Integer size handed to ``transforms.Resize``.

    Returns:
        ``numpy.ndarray`` built from the PIL rendering of the generator output.
    """
    # NOTE(review): Resize with a single int matches the *shorter* edge to that value,
    # so an image wider than it is tall gets upscaled here -- confirm this is intended.
    preprocess = transforms.Compose([
        transforms.Resize(width, Image.BICUBIC),
        transforms.ToTensor(),
    ])
    # Add the leading batch dimension the model is called with.
    batch = preprocess(input_img).unsqueeze(0)

    with torch.no_grad():
        sketch = model1(batch)[0].detach()

    # Tensor -> PIL image -> ndarray.
    return numpy.array(transforms.ToPILImage()(sketch))
|
|
|
|
|
|
def get_image(image_url):
    """Fetch an image from object storage and return it as RGB together with its size.

    Args:
        image_url: Storage path of the form ``"<bucket>/<object name>"``. Everything
            before the first ``/`` is the bucket, the remainder is the object name.

    Returns:
        Tuple ``(image, width, height)`` where ``image`` is the RGB-converted object
        returned by ``oss_get_image`` and the two sizes are taken from ``image.size``.

    Raises:
        ValueError: If ``image_url`` has no ``/`` separator or either part is empty.
    """
    bucket, _, object_name = image_url.partition('/')
    if not bucket or not object_name:
        # Without this guard a path lacking "/" was sent to storage as both the
        # bucket and the object name, which only fails later and far less clearly.
        raise ValueError(
            f"image_url must look like '<bucket>/<object name>', got {image_url!r}"
        )

    image = oss_get_image(bucket=bucket, object_name=object_name, data_type="PIL")
    image = image.convert('RGB')
    width, height = image.size
    return image, width, height
|
|
|
|
|
|
def processing_pipeline(image_url, thickness, sketch_bucket, sketch_name):
    """Turn a stored image into a line sketch and upload the JPEG-encoded result.

    Args:
        image_url: ``"<bucket>/<object name>"`` path of the source image.
        thickness: Line-thickness level (anything ``int()`` accepts). ``0`` leaves the
            generator output untouched; ``1`` .. ``len(weights) - 1`` blend in an
            eroded copy of the sketch using ``weights[thickness]``.
        sketch_bucket: Bucket the sketch is uploaded to.
        sketch_name: Object name of the uploaded sketch.

    Returns:
        ``"<bucket_name>/<object_name>"`` built from the upload response.

    Raises:
        IndexError: If ``thickness`` is outside ``0 .. len(weights) - 1``.
        RuntimeError: If OpenCV reports that JPEG encoding failed.
    """
    thickness = int(thickness)
    # Validate before downloading or running the model. Negative levels used to wrap
    # around to the end of the table silently; too-large levels already raised
    # IndexError, so that exception type is kept.
    if not 0 <= thickness < len(weights):
        raise IndexError(
            f"thickness must be between 0 and {len(weights) - 1}, got {thickness}"
        )

    # Extract the sketch.
    image, width, _height = get_image(image_url)
    sketch_image = predict(image, width)

    # Set the line thickness.
    if thickness != 0:
        sketch_weight, eroded_weight = weights[thickness]
        # Erosion takes a local minimum, so darker pixels spread outwards
        # (presumably the sketch is dark strokes on a light background -- confirm).
        eroded = cv2.erode(sketch_image, kernel, iterations=1)
        # Blend the raw sketch with the eroded copy using the level's weights.
        sketch_image = cv2.addWeighted(sketch_image, sketch_weight, eroded, eroded_weight, 0)

    # Upload to MinIO.
    encoded_ok, encoded = cv2.imencode(".jpg", sketch_image)
    if not encoded_ok:
        raise RuntimeError("failed to encode the sketch as JPEG")
    req = oss_upload_image(
        bucket=sketch_bucket,
        object_name=sketch_name,
        image_bytes=encoded.tobytes(),
    )
    return f"{req.bucket_name}/{req.object_name}"
|
|
|
|
|
|
if __name__ == '__main__':
    # Manual smoke run: sketch one stored image at thickness level 1 and print
    # the "<bucket>/<object>" path of the uploaded result.
    print(processing_pipeline(
        "aida-users/89/relight_image/d5f0d967-f8e8-424d-98f9-a8ad8313deec-0-89.png",
        1,
        "test",
        "test123.jpg",
    ))
|