feat generate slogan | to product image | slogan 接口部署

This commit is contained in:
zhouchengrong
2024-05-30 15:01:39 +08:00
parent 1f47d94431
commit 401b76bd95
11 changed files with 280 additions and 174 deletions

View File

@@ -2,6 +2,7 @@ import logging
import cv2
import numpy as np
from cv2 import cvtColor, COLOR_BGR2RGBA
from app.service.utils.generate_uuid import generate_uuid
from ..builder import PIPELINES
from PIL import Image
@@ -45,8 +46,11 @@ class Split(object):
result_front_image[front_mask != 0] = rgba_image[front_mask != 0]
result_front_image_pil = Image.fromarray(cvtColor(result_front_image, COLOR_BGR2RGBA))
front_new_size = (int(result_front_image_pil.width * result["scale"] * result["resize_scale"]), int(result_front_image_pil.height * result["scale"] * result["resize_scale"]))
front_new_size = (int(result_front_image_pil.width * result["scale"] * result["resize_scale"][0]), int(result_front_image_pil.height * result["scale"] * result["resize_scale"][1]))
result_front_image_pil = result_front_image_pil.resize(front_new_size, Image.LANCZOS)
# TODO 多线程外部上传图片到minio
# result['front_mask_image'] = cv2.resize(front_mask, front_new_size)
# result['front_image'] = result_front_image_pil
front_mask = cv2.resize(front_mask, front_new_size)
result['front_image'], result["front_image_url"], result["front_mask_url"] = upload_png_mask(result_front_image_pil, f'{generate_uuid()}', mask=front_mask)
@@ -55,61 +59,19 @@ class Split(object):
result_back_image[back_mask != 0] = rgba_image[back_mask != 0]
result_back_image_pil = Image.fromarray(cvtColor(result_back_image, COLOR_BGR2RGBA))
back_new_size = (int(result_back_image_pil.width * result["scale"] * result["resize_scale"]), int(result_back_image_pil.height * result["scale"] * result["resize_scale"]))
back_new_size = (int(result_back_image_pil.width * result["scale"] * result["resize_scale"][0]), int(result_back_image_pil.height * result["scale"] * result["resize_scale"][1]))
result_back_image_pil = result_back_image_pil.resize(back_new_size, Image.LANCZOS)
# TODO 多线程外部上传图片到minio
# result['back_mask_image'] = cv2.resize(back_mask, back_new_size)
# result['back_image'] = result_back_image_pil
back_mask = cv2.resize(back_mask, back_new_size)
result['back_image'], result["back_image_url"], result["back_mask_url"] = upload_png_mask(result_back_image_pil, f'{generate_uuid()}', mask=back_mask)
else:
result['back_image'] = None
result["back_image_url"] = None
result["back_mask_url"] = None
result['back_mask_image'] = None
return result
except Exception as e:
logging.warning(f"split runtime exception : {e} image_id : {result['image_id']}")
# @ RunTime
# def __call__(self, result):
# try:
# if 'mask' not in result.keys():
# raise KeyError(f'Cannot find mask in result dict, please check ContourDetection is included in process pipelines.')
# if 'seg_result' not in result.keys(): # 没过seg模型
# result['front_mask'] = result['mask'].copy()
# result['back_mask'] = np.zeros_like(result['mask'])
# else:
# temp_front = result['seg_result'] == 1
# result['front_mask'] = (result['mask'] * (temp_front + 0).astype(np.uint8))
# temp_back = result['seg_result'] == 2
# result['back_mask'] = (result['mask'] * (temp_back + 0).astype(np.uint8))
#
# if result['name'] in ('outwear', 'dress', 'blouse', 'skirt', 'trousers', 'tops', 'bottoms'):
# if len(result['front_mask'].shape) > 2:
# front_mask = result['front_mask'][0]
# else:
# front_mask = result['front_mask']
#
# rgba_image = rgb_to_rgba((result['final_image'].shape[0], result['final_image'].shape[1]), result['final_image'], result['mask'])
# result_front_image = np.zeros_like(rgba_image)
# result_front_image[front_mask != 0] = rgba_image[front_mask != 0]
#
# result_front_image_pil = Image.fromarray(cvtColor(result_front_image, COLOR_BGR2RGBA))
# front_new_size = (int(result_front_image_pil.width * result["scale"] * result["resize_scale"]), int(result_front_image_pil.height * result["scale"] * result["resize_scale"]))
# result_front_image_pil = result_front_image_pil.resize(front_new_size, Image.LANCZOS)
# front_mask = cv2.resize(front_mask, front_new_size)
# result['front_image'], result["front_image_url"], result["front_mask_url"] = upload_png_mask(result_front_image_pil, f'{generate_uuid()}', mask=front_mask)
#
# if result["name"] in ('blouse', 'dress', 'outwear', 'tops'):
# result_back_image = np.zeros_like(rgba_image)
# result_back_image[result['back_mask'] != 0] = rgba_image[result['back_mask'] != 0]
#
# result_back_image_pil = Image.fromarray(cvtColor(result_back_image, COLOR_BGR2RGBA))
# back_new_size = (int(result_back_image_pil.width * result["scale"] * result["resize_scale"]), int(result_back_image_pil.height * result["scale"] * result["resize_scale"]))
# result_back_image_pil = result_back_image_pil.resize(back_new_size, Image.LANCZOS)
# back_mask = cv2.resize(result['back_mask'], back_new_size)
# result['back_image'], result["back_image_url"], result["back_mask_url"] = upload_png_mask(result_back_image_pil, f'{generate_uuid()}', mask=back_mask)
# else:
# result['back_image'] = None
# result["back_image_url"] = None
# result["back_mask_url"] = None
# return result
# except Exception as e:
# logging.warning(f"split runtime exception : {e} image_id : {result['image_id']}")

View File

@@ -17,14 +17,15 @@ import numpy as np
from PIL import Image
from minio import Minio
from app.core.config import *
from app.service.utils.decorator import RunTime
from app.service.utils.generate_uuid import generate_uuid
# minio_client = Minio(
# f"{MINIO_IP}:{MINIO_PORT}",
# access_key=MINIO_ACCESS,
# secret_key=MINIO_SECRET,
# secure=MINIO_SECURE)
minio_client = Minio(
MINIO_URL,
access_key=MINIO_ACCESS,
secret_key=MINIO_SECRET,
secure=MINIO_SECURE)
s3 = boto3.client(
's3',
@@ -130,19 +131,19 @@ def synthesis(data, size):
result_image.save(output, format='PNG')
data = output.getvalue()
# image_data = io.BytesIO()
# result_image.save(image_data, format='PNG')
# image_data.seek(0)
# image_bytes = image_data.read()
# return f"aida-results/{minio_client.put_object('aida-results', f'result_{generate_uuid()}.png', io.BytesIO(image_bytes), len(image_bytes), content_type='image/png').object_name}"
image_data = io.BytesIO()
result_image.save(image_data, format='PNG')
image_data.seek(0)
image_bytes = image_data.read()
return f"aida-results/{minio_client.put_object('aida-results', f'result_{generate_uuid()}.png', io.BytesIO(image_bytes), len(image_bytes), content_type='image/png').object_name}"
object_name = f'result_{generate_uuid()}.png'
response = s3.put_object(Bucket="aida-results", Key=object_name, Body=data, ContentType='image/png')
object_url = f"aida-results/{object_name}"
if response['ResponseMetadata']['HTTPStatusCode'] == 200:
return object_url
else:
return ""
# object_name = f'result_{generate_uuid()}.png'
# response = s3.put_object(Bucket="aida-results", Key=object_name, Body=data, ContentType='image/png')
# object_url = f"aida-results/{object_name}"
# if response['ResponseMetadata']['HTTPStatusCode'] == 200:
# return object_url
# else:
# return ""
except Exception as e:
logging.warning(f"synthesis runtime exception : {e}")

View File

@@ -33,128 +33,129 @@ s3 = boto3.client(
)
@RunTime
def upload_png_mask(front_image, object_name, mask=None):
start_time = time.time()
mask_url = None
if mask is not None:
# 反转掩模
mask_inverted = cv2.bitwise_not(mask)
# 将掩模转换为 RGBA 格式
rgba_image = cv2.cvtColor(mask_inverted, cv2.COLOR_BGR2BGRA)
rgba_image[rgba_image[:, :, 0] == 0] = [0, 0, 0, 0]
# 将图像数据保存到内存中的 BytesIO 对象中
image_bytes = io.BytesIO()
image_bytes.write(cv2.imencode('.png', rgba_image)[1].tobytes())
image_bytes.seek(0)
try:
key = f"mask/mask_{object_name}.png"
mask_url = f"{AIDA_CLOTHING}/{key}"
s3.put_object(Bucket=AIDA_CLOTHING, Key=key, Body=image_bytes, ContentType='image/png')
except Exception as e:
print(f'上传到 S3 失败: {e}')
with io.BytesIO() as output:
front_image.save(output, format='PNG')
data = output.getvalue()
# 创建一个 S3 客户端
try:
key = f"image/image_{object_name}.png"
image_url = f"{AIDA_CLOTHING}/{key}"
s3.put_object(Bucket=AIDA_CLOTHING, Key=key, Body=data, ContentType='image/png')
return front_image, image_url, mask_url
except Exception as e:
print(f'上传到 S3 失败: {e}')
@RunTime
def upload_layer_image(image, object_name):
with io.BytesIO() as output:
image.save(output, format='PNG')
data = output.getvalue()
# 创建一个 S3 客户端
try:
key = f"image/image_{object_name}.png"
image_url = f"{AIDA_CLOTHING}/{key}"
s3.put_object(Bucket=AIDA_CLOTHING, Key=key, Body=data, ContentType='image/png')
return image_url
except Exception as e:
print(f'上传到 S3 失败: {e}')
@RunTime
def upload_mask_image(mask, object_name):
# 反转掩模
mask_inverted = cv2.bitwise_not(mask)
# 将掩模转换为 RGBA 格式
rgba_image = cv2.cvtColor(mask_inverted, cv2.COLOR_BGR2BGRA)
rgba_image[rgba_image[:, :, 0] == 0] = [0, 0, 0, 0]
# 将图像数据保存到内存中的 BytesIO 对象中
image_bytes = io.BytesIO()
image_bytes.write(cv2.imencode('.png', rgba_image)[1].tobytes())
image_bytes.seek(0)
try:
key = f"mask/mask_{object_name}.png"
mask_url = f"{AIDA_CLOTHING}/{key}"
s3.put_object(Bucket=AIDA_CLOTHING, Key=key, Body=image_bytes, ContentType='image/png')
return mask_url
except Exception as e:
print(f'上传到 S3 失败: {e}')
"""minio 上传"""
# @RunTime
# def upload_png_mask(front_image, object_name, mask=None):
# start_time = time.time()
# mask_url = None
# if mask is not None:
# # 反转掩模
# mask_inverted = cv2.bitwise_not(mask)
# # 将掩模转换为 RGBA 格式
# rgba_image = cv2.cvtColor(mask_inverted, cv2.COLOR_BGR2BGRA)
# rgba_image[rgba_image[:, :, 0] == 0] = [0, 0, 0, 0]
# # 将图像数据保存到内存中的 BytesIO 对象中
# image_bytes = io.BytesIO()
# image_bytes.write(cv2.imencode('.png', rgba_image)[1].tobytes())
# image_bytes.seek(0)
# try:
# key = f"mask/mask_{object_name}.png"
# mask_url = f"{AIDA_CLOTHING}/{key}"
# s3.put_object(Bucket=AIDA_CLOTHING, Key=key, Body=image_bytes, ContentType='image/png')
# except Exception as e:
# print(f'上传到 S3 失败: {e}')
# with io.BytesIO() as output:
# front_image.save(output, format='PNG')
# data = output.getvalue()
# # 创建一个 S3 客户端
# try:
# mask_url = None
# if mask is not None:
# mask_inverted = cv2.bitwise_not(mask)
# # 将掩模的3通道转换为4通道白色部分不透明黑色部分透明
# rgba_image = cv2.cvtColor(mask_inverted, cv2.COLOR_BGR2BGRA)
# rgba_image[rgba_image[:, :, 0] == 0] = [0, 0, 0, 0]
# image_bytes = io.BytesIO()
# image_bytes.write(cv2.imencode('.png', rgba_image)[1].tobytes())
#
# image_bytes.seek(0)
# mask_url = f"{AIDA_CLOTHING}/{minio_client.put_object('aida-clothing', f'mask/mask_{object_name}.png', image_bytes, len(image_bytes.getvalue()), content_type='image/png').object_name}"
#
# image_data = io.BytesIO()
# front_image.save(image_data, format='PNG')
# image_data.seek(0)
# image_bytes = image_data.read()
# image_url = f"{AIDA_CLOTHING}/{minio_client.put_object('aida-clothing', f'image/image_{object_name}.png', io.BytesIO(image_bytes), len(image_bytes), content_type='image/png').object_name}"
# # print(f"upload_png_mask {object_name} = {time.time() - start_time}")
# key = f"image/image_{object_name}.png"
# image_url = f"{AIDA_CLOTHING}/{key}"
# s3.put_object(Bucket=AIDA_CLOTHING, Key=key, Body=data, ContentType='image/png')
# return front_image, image_url, mask_url
# except Exception as e:
# logging.warning(f"upload_png_mask runtime exception : {e}")
# print(f'上传到 S3 失败: {e}')
#
#
# @RunTime
# def upload_layer_image(image, object_name):
# with io.BytesIO() as output:
# image.save(output, format='PNG')
# data = output.getvalue()
# # 创建一个 S3 客户端
# try:
# image_data = io.BytesIO()
# image.save(image_data, format='PNG')
# image_data.seek(0)
# image_bytes = image_data.read()
# image_url = f"{AIDA_CLOTHING}/{minio_client.put_object('aida-clothing', f'image/image_{object_name}.png', io.BytesIO(image_bytes), len(image_bytes), content_type='image/png').object_name}"
# key = f"image/image_{object_name}.png"
# image_url = f"{AIDA_CLOTHING}/{key}"
# s3.put_object(Bucket=AIDA_CLOTHING, Key=key, Body=data, ContentType='image/png')
# return image_url
# except Exception as e:
# logging.warning(f"upload_png_mask runtime exception : {e}")
# print(f'上传到 S3 失败: {e}')
#
#
# @RunTime
# def upload_mask_image(mask, object_name):
# # 反转掩模
# mask_inverted = cv2.bitwise_not(mask)
# # 将掩模转换为 RGBA 格式
# rgba_image = cv2.cvtColor(mask_inverted, cv2.COLOR_BGR2BGRA)
# rgba_image[rgba_image[:, :, 0] == 0] = [0, 0, 0, 0]
# # 将图像数据保存到内存中的 BytesIO 对象中
# image_bytes = io.BytesIO()
# image_bytes.write(cv2.imencode('.png', rgba_image)[1].tobytes())
# image_bytes.seek(0)
# try:
# mask_inverted = cv2.bitwise_not(mask)
# # 将掩模的3通道转换为4通道白色部分不透明黑色部分透明
# rgba_image = cv2.cvtColor(mask_inverted, cv2.COLOR_BGR2BGRA)
# rgba_image[rgba_image[:, :, 0] == 0] = [0, 0, 0, 0]
# image_bytes = io.BytesIO()
# image_bytes.write(cv2.imencode('.png', rgba_image)[1].tobytes())
#
# image_bytes.seek(0)
# mask_url = f"{AIDA_CLOTHING}/{minio_client.put_object('aida-clothing', f'mask/mask_{object_name}.png', image_bytes, len(image_bytes.getvalue()), content_type='image/png').object_name}"
# key = f"mask/mask_{object_name}.png"
# mask_url = f"{AIDA_CLOTHING}/{key}"
# s3.put_object(Bucket=AIDA_CLOTHING, Key=key, Body=image_bytes, ContentType='image/png')
# return mask_url
# except Exception as e:
# logging.warning(f"upload_png_mask runtime exception : {e}")
# print(f'上传到 S3 失败: {e}')
"""minio 上传"""
@RunTime
def upload_png_mask(front_image, object_name, mask=None):
start_time = time.time()
try:
mask_url = None
if mask is not None:
mask_inverted = cv2.bitwise_not(mask)
# 将掩模的3通道转换为4通道白色部分不透明黑色部分透明
rgba_image = cv2.cvtColor(mask_inverted, cv2.COLOR_BGR2BGRA)
rgba_image[rgba_image[:, :, 0] == 0] = [0, 0, 0, 0]
image_bytes = io.BytesIO()
image_bytes.write(cv2.imencode('.png', rgba_image)[1].tobytes())
image_bytes.seek(0)
mask_url = f"{AIDA_CLOTHING}/{minio_client.put_object('aida-clothing', f'mask/mask_{object_name}.png', image_bytes, len(image_bytes.getvalue()), content_type='image/png').object_name}"
image_data = io.BytesIO()
front_image.save(image_data, format='PNG')
image_data.seek(0)
image_bytes = image_data.read()
image_url = f"{AIDA_CLOTHING}/{minio_client.put_object('aida-clothing', f'image/image_{object_name}.png', io.BytesIO(image_bytes), len(image_bytes), content_type='image/png').object_name}"
# print(f"upload_png_mask {object_name} = {time.time() - start_time}")
return front_image, image_url, mask_url
except Exception as e:
logging.warning(f"upload_png_mask runtime exception : {e}")
@RunTime
def upload_layer_image(image, object_name):
try:
image_data = io.BytesIO()
image.save(image_data, format='PNG')
image_data.seek(0)
image_bytes = image_data.read()
image_url = f"{AIDA_CLOTHING}/{minio_client.put_object('aida-clothing', f'image/image_{object_name}.png', io.BytesIO(image_bytes), len(image_bytes), content_type='image/png').object_name}"
return image_url
except Exception as e:
logging.warning(f"upload_png_mask runtime exception : {e}")
@RunTime
def upload_mask_image(mask, object_name):
try:
mask_inverted = cv2.bitwise_not(mask)
# 将掩模的3通道转换为4通道白色部分不透明黑色部分透明
rgba_image = cv2.cvtColor(mask_inverted, cv2.COLOR_BGR2BGRA)
rgba_image[rgba_image[:, :, 0] == 0] = [0, 0, 0, 0]
image_bytes = io.BytesIO()
image_bytes.write(cv2.imencode('.png', rgba_image)[1].tobytes())
image_bytes.seek(0)
mask_url = f"{AIDA_CLOTHING}/{minio_client.put_object('aida-clothing', f'mask/mask_{object_name}.png', image_bytes, len(image_bytes.getvalue()), content_type='image/png').object_name}"
return mask_url
except Exception as e:
logging.warning(f"upload_png_mask runtime exception : {e}")