feat design 功能迁移

This commit is contained in:
zhouchengrong
2024-05-28 15:22:11 +08:00
parent ec2438f97f
commit a9dcd444c8
35 changed files with 3378 additions and 3 deletions

View File

@@ -0,0 +1,16 @@
# Public API of the items package: the registry/build helper plus every
# registered item class (garments, body figure, accessories).
from .builder import ITEMS, build_item
from .clothing import Clothing  # 4.0 sec  (author-noted import-time cost)
from .body import Body
from .top import Top, Blouse, Outwear, Dress
from .bottom import Bottom, Trousers, Skirt
from .shoes import Shoes
from .bag import Bag
from .accessories import Hairstyle, Earring

# NOTE(review): bottom.py also defines a `Bottoms` class that is neither
# imported nor listed here — confirm whether it should be exported.
__all__ = [
    'ITEMS', 'build_item',
    'Clothing', 'Body',
    'Top', 'Blouse', 'Outwear', 'Dress',
    'Bottom', 'Trousers', 'Skirt',
    'Shoes', 'Bag', 'Hairstyle', 'Earring'
]

View File

@@ -0,0 +1,59 @@
from .builder import ITEMS
from .clothing import Clothing
@ITEMS.register_module()
class Hairstyle(Clothing):
    """Hairstyle item, aligned against the body's head keypoint.

    NOTE(review): this override takes 4 arguments while
    Clothing.calculate_start_point takes 6 — confirm callers pass the
    matching signature.
    """

    def __init__(self, **kwargs):
        # Fixed processing pipeline; kwargs must contain 'path'.
        pipeline = [
            dict(type='LoadImageFromFile', path=kwargs['path']),
            dict(type='KeypointDetection'),
            dict(type='ContourDetection'),
            dict(type='Painting'),
            dict(type='Scaling'),
            dict(type='Split'),
            # dict(type='ImageShow', key=['image', 'mask', 'pattern_image']),
        ]
        kwargs.update(pipeline=pipeline)
        super(Hairstyle, self).__init__(**kwargs)

    @staticmethod
    def calculate_start_point(keypoint_type, scale, clothes_point, body_point):
        """
        align up
        Args:
            keypoint_type: string, "head_point"
            scale: float
            clothes_point: dict mapping keypoint name -> "x_y" string
            body_point: dict, containing keypoint data of body figure
        Returns:
            start_point: tuple (x', y')
            x' = y_body - y1 * scale
            y' = x_body - x1 * scale
        """
        side_indicator = f'{keypoint_type}_up'
        # BUG FIX: split() yields strings; they must be converted to int
        # BEFORE multiplying by the float scale (str * float raises
        # TypeError). Matches the pattern used by Bag.calculate_start_point.
        start_point = (
            int(body_point[side_indicator][1] - int(clothes_point[side_indicator].split("_")[1]) * scale),
            int(body_point[side_indicator][0] - int(clothes_point[side_indicator].split("_")[0]) * scale)
        )
        return start_point
@ITEMS.register_module()
class Earring(Clothing):
    """Earring item: loads its image and runs the standard item pipeline."""

    def __init__(self, **kwargs):
        # Build the pipeline as a loader stage followed by the shared stages.
        stage_names = ('KeypointDetection', 'ContourDetection',
                       'Painting', 'Scaling', 'Split')
        pipeline = [dict(type='LoadImageFromFile', path=kwargs['path'])]
        pipeline.extend(dict(type=stage) for stage in stage_names)
        # dict(type='ImageShow', key=['image', 'mask', 'pattern_image']),
        kwargs.update(pipeline=pipeline)
        super(Earring, self).__init__(**kwargs)

View File

@@ -0,0 +1,44 @@
from .builder import ITEMS
from .clothing import Clothing
import random
@ITEMS.register_module()
class Bag(Clothing):
    # Bag item: processed by the standard pipeline and pasted at one hand.
    def __init__(self, **kwargs):
        # Fixed processing pipeline; kwargs must contain 'path' and 'color'.
        pipeline = [
            dict(type='LoadImageFromFile', path=kwargs['path'], color=kwargs['color']),
            dict(type='KeypointDetection'),
            dict(type='ContourDetection'),
            dict(type='Painting'),
            dict(type='Scaling'),
            dict(type='Split'),
            # dict(type='ImageShow', key=['image', 'mask', 'pattern_image']),
        ]
        kwargs.update(pipeline=pipeline)
        super(Bag, self).__init__(**kwargs)

    @staticmethod
    def calculate_start_point(keypoint_type, scale, clothes_point, body_point):
        """
        align left
        Args:
            keypoint_type: string, "hand_point"
            scale: float
            clothes_point: dict{'left': [x1, y1, z1], 'right': [x2, y2, z2]}
            body_point: dict, containing keypoint data of body figure
        Returns:
            start_point: tuple (y', x')
            x' = y_body - y1 * scale
            y' = x_body - x1 * scale
        """
        # The bag is attached to a randomly chosen hand on EVERY call, so
        # repeated renders of the same outfit can differ.
        # NOTE(review): confirm this nondeterminism is intended.
        location = random.choice(seq=['left', 'right'])
        if location == 'left':
            side_indicator = f'{keypoint_type}_left'
        else:
            side_indicator = f'{keypoint_type}_right'
        # clothes_point = {k: tuple(map(lambda x: int(scale * x), v[0: 2])) for k, v in clothes_point.items()}
        # NOTE(review): despite the docstring's dict-of-lists description,
        # this indexes clothes_point[keypoint_type] (not side_indicator) and
        # splits an "x_y"-style string — confirm the actual keypoint format.
        start_point = (body_point[side_indicator][1] - int(int(clothes_point[keypoint_type].split("_")[1]) * scale),
                       body_point[side_indicator][0] - int(int(clothes_point[keypoint_type].split("_")[0]) * scale))
        return start_point

View File

@@ -0,0 +1,35 @@
import cv2
from .builder import ITEMS
from .pipelines import Compose
@ITEMS.register_module()
class Body(object):
    # The mannequin/body figure — the base layer garments are pasted onto.
    def __init__(self, **kwargs):
        # kwargs must contain 'body_path' (storage path of the body image).
        pipeline = [
            dict(type='LoadBodyImageFromFile', body_path=kwargs['body_path']),
            # dict(type='ImageShow', key=['body_image', "body_mask"])
        ]
        self.pipeline = Compose(pipeline)
        self.result = dict()

    def process(self):
        # Run the loading pipeline; results accumulate in self.result.
        self.pipeline(self.result)
        pass

    def organize(self, layer):
        # Insert the body as the priority-0 layer at the image origin.
        body_layer = dict(priority=0,
                          name=type(self).__name__.lower(),
                          image=self.result['body_image'],
                          image_url=self.result['image_url'],
                          mask_image=None,
                          mask_url=None,
                          # NOTE(review): 'sacle' is a typo for 'scale', but
                          # Clothing.organize emits the same key — downstream
                          # readers presumably expect it; do not rename in
                          # isolation.
                          sacle=1,
                          # mask=self.result['body_mask'],
                          position=(0, 0))
        layer.insert(body_layer)

    @staticmethod
    def show(img):
        # Debug helper: display the image in a blocking OpenCV window.
        cv2.imshow('', img)
        cv2.waitKey(0)

View File

@@ -0,0 +1,38 @@
from .builder import ITEMS
from .clothing import Clothing
@ITEMS.register_module()
class Bottom(Clothing):
    """Base class for lower-body garments (trousers, skirts).

    Args:
        pipeline: optional explicit pipeline config; when None (the value
            every subclass already passes) the default pipeline below is
            used. IMPROVEMENT: the default makes Bottom directly buildable
            from config, backward-compatible with positional callers.
        **kwargs: must contain 'path', 'color' and 'print' for the default
            pipeline.
    """

    def __init__(self, pipeline=None, **kwargs):
        if pipeline is None:
            pipeline = [
                dict(type='LoadImageFromFile', path=kwargs['path'], color=kwargs['color'], print_dict=kwargs['print']),
                dict(type='KeypointDetection'),
                dict(type='ContourDetection'),
                dict(type='Painting', painting_flag=True),
                dict(type='PrintPainting', print_flag=True),
                dict(type='Scaling'),
                dict(type='Split'),
                # dict(type='ImageShow', key=['image', 'mask', 'pattern_image', 'print_image']),
            ]
        kwargs.update(pipeline=pipeline)
        super(Bottom, self).__init__(**kwargs)
@ITEMS.register_module()
class Trousers(Bottom):
    """Trousers item; all behavior is inherited from Bottom."""

    def __init__(self, pipeline=None, **kwargs):
        # Delegate unchanged; Bottom builds the default pipeline when None.
        super().__init__(pipeline, **kwargs)
@ITEMS.register_module()
class Skirt(Bottom):
    """Skirt item; all behavior is inherited from Bottom."""

    def __init__(self, pipeline=None, **kwargs):
        # Delegate unchanged; Bottom builds the default pipeline when None.
        super().__init__(pipeline, **kwargs)
@ITEMS.register_module()
class Bottoms(Bottom):
    """Generic lower-body item; all behavior is inherited from Bottom."""

    def __init__(self, pipeline=None, **kwargs):
        # Delegate unchanged; Bottom builds the default pipeline when None.
        super().__init__(pipeline, **kwargs)

View File

@@ -0,0 +1,9 @@
from mmcv.utils import Registry, build_from_cfg
ITEMS = Registry('item')          # registry of wearable item classes
PIPELINES = Registry('pipeline')  # registry of processing pipeline stages


def build_item(cfg, default_args=None):
    """Instantiate an item from ``cfg`` via the ITEMS registry."""
    return build_from_cfg(cfg, ITEMS, default_args)

View File

@@ -0,0 +1,96 @@
import cv2
from app.core.config import PRIORITY_DICT
from .builder import ITEMS
from .pipelines import Compose
@ITEMS.register_module()
class Clothing(object):
    """Base class for all wearable items.

    Runs a configurable processing pipeline over ``self.result`` and
    inserts the item's front/back images into a layer stack.
    """

    def __init__(self, pipeline, **kwargs):
        # ``result`` carries all intermediate pipeline state.
        self.pipeline = Compose(pipeline)
        self.result = dict(name=type(self).__name__.lower(), **kwargs)

    def process(self):
        """Run the configured pipeline over ``self.result`` in place."""
        self.pipeline(self.result)

    def apply_scale(self, img):
        """Return ``img`` resized by the factor in ``self.result['scale']``."""
        scale = self.result['scale']
        # BUG FIX: height/width used to be assigned twice (the conditional
        # re-assignment was identical to the first); compute them once.
        height, width = img.shape[0: 2]
        return cv2.resize(img, (int(width * scale), int(height * scale)),
                          interpolation=cv2.INTER_AREA)

    def organize(self, layer):
        """Insert the garment's front and back layers into ``layer``."""
        name = type(self).__name__.lower()
        start_point = self.calculate_start_point(
            self.result['keypoint'], self.result['scale'],
            self.result['clothes_keypoint'], self.result['body_point_test'],
            self.result["offset"], self.result["resize_scale"])
        use_explicit_order = self.result.get("layer_order", False)
        side_priorities = (
            ('front', self.result.get("priority", None) if use_explicit_order
             else PRIORITY_DICT.get(f'{name}_front', None)),
            # The back layer uses the negated priority — presumably so it
            # sorts behind the body layer.
            ('back', -self.result.get("priority", 0) if use_explicit_order
             else PRIORITY_DICT.get(f'{name}_back', None)),
        )
        for side, priority in side_priorities:
            layer.insert(dict(
                priority=priority,
                name=f'{name}_{side}',
                image=self.result[f"{side}_image"],
                image_url=self.result[f'{side}_image_url'],
                mask_url=self.result[f'{side}_mask_url'],
                # NOTE: 'sacle' is a historical typo for 'scale', kept
                # because downstream consumers read this exact key
                # (Body.organize emits it too).
                sacle=self.result['scale'],
                clothes_keypoint=self.result['clothes_keypoint'],
                position=start_point,
                resize_scale=self.result["resize_scale"],
                # NOTE(review): BOTH sides resize the mask against
                # front_image.size — confirm the back layer is meant to
                # reuse the front image's dimensions.
                mask=cv2.resize(self.result['mask'], self.result["front_image"].size),
                gradient_string=self.result.get('gradient_string', "")))

    @staticmethod
    def calculate_start_point(keypoint_type, scale, clothes_point, body_point, offset, resize_scale):
        """Top-left paste position aligning the garment's left keypoint to the body.

        Args:
            keypoint_type: string, "waistband" | "shoulder" | "ear_point"
            scale: float scale factor applied to the garment coordinates.
            clothes_point: dict mapping keypoint name -> (x, y) sequence.
            body_point: dict containing keypoint data of the body figure.
            offset: (x_off, y_off) extra displacement.
            resize_scale: unused here; kept for interface compatibility.
        Returns:
            start_point: tuple of ints, (y', x'):
                y' = y_body + y_off - y_clothes * scale
                x' = x_body + x_off - x_clothes * scale
        """
        side_indicator = f'{keypoint_type}_left'
        start_point = (
            int(body_point[side_indicator][1] + offset[1] - int(clothes_point[side_indicator][0]) * scale),  # y
            int(body_point[side_indicator][0] + offset[0] - int(clothes_point[side_indicator][1]) * scale)   # x
        )
        return start_point

View File

@@ -0,0 +1,19 @@
# Public API of the pipelines package: every registered pipeline stage.
from .compose import Compose
from .loading import LoadImageFromFile, LoadBodyImageFromFile, ImageShow
from .keypoints import KeypointDetection
from .segmentation import Segmentation
from .painting import Painting, PrintPainting
from .scale import Scaling
from .contour_detection import ContourDetection
from .split import Split

__all__ = [
    'Compose',
    'LoadImageFromFile', 'LoadBodyImageFromFile', 'ImageShow',
    'KeypointDetection',
    'Segmentation',
    'Painting', 'PrintPainting',
    'Scaling',
    'ContourDetection',
    # BUG FIX: was lowercase 'split', which is not a name this module
    # imports or defines; the imported class is 'Split'.
    'Split',
]

View File

@@ -0,0 +1,36 @@
import collections
from mmcv.utils import build_from_cfg
from ..builder import PIPELINES
@PIPELINES.register_module()
class Compose(object):
    """Chain pipeline stages; dict configs are built via the PIPELINES registry."""

    def __init__(self, transforms):
        assert isinstance(transforms, collections.abc.Sequence)
        self.transforms = []
        for cfg in transforms:
            if isinstance(cfg, dict):
                # Config dicts are instantiated through the registry.
                self.transforms.append(build_from_cfg(cfg, PIPELINES))
            elif callable(cfg):
                self.transforms.append(cfg)
            else:
                raise TypeError('transform must be callable or a dict')

    def __call__(self, data):
        """Call function to apply transforms sequentially.
        Args:
            data (dict): A result dict contains the data to transform.
        Returns:
            dict: Transformed data, or None if any stage yields None.
        """
        for stage in self.transforms:
            data = stage(data)
            if data is None:
                return None
        return data

View File

@@ -0,0 +1,59 @@
import logging
from ..builder import PIPELINES
import cv2
import numpy as np
@PIPELINES.register_module()
class ContourDetection(object):
    """Derive a binary garment mask from the largest image contour(s).

    Writes ``result['mask']``, intersected with ``result['pre_mask']``
    (e.g. an alpha plane) when one is present.
    """

    def __init__(self):
        pass

    def __call__(self, result):
        contours = self.get_contours(result['image'])
        mask = np.zeros(result['image'].shape[:2], np.uint8)
        if result['name'] == 'shoes':
            # Shoes come as a pair: fill the two largest contours.
            # BUG FIX: the original indexed Contour[0] and Contour[1]
            # unconditionally and raised IndexError when fewer than two
            # contours were detected; clamp to what is available.
            for contour in contours[:2]:
                epsilon = 0.001 * cv2.arcLength(contour, True)
                approx = cv2.approxPolyDP(contour, epsilon, True)
                cv2.drawContours(mask, [approx], -1, 255, -1)
        else:
            if len(contours):
                # Fill only the largest contour.
                contour = contours[0]
                epsilon = 0.001 * cv2.arcLength(contour, True)
                approx = cv2.approxPolyDP(contour, epsilon, True)
                cv2.drawContours(mask, [approx], -1, 255, -1)
            else:
                # No contour found: fall back to an all-opaque mask.
                mask = np.ones(result['image'].shape[:2], np.uint8) * 255
            # TODO: fix some images coming out transparent (ship next version)
            # img2gray = cv2.cvtColor(result['image'], cv2.COLOR_BGR2GRAY)
            # ret, Mask = cv2.threshold(img2gray, 126, 255, cv2.THRESH_BINARY)
            # Mask = cv2.bitwise_not(Mask)
        if result['pre_mask'] is None:
            result['mask'] = mask
        else:
            result['mask'] = cv2.bitwise_and(mask, result['pre_mask'])
        return result

    @staticmethod
    def get_contours(image):
        """Return external contours of ``image``, largest area first."""
        gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
        edge = cv2.Canny(gray, 10, 150)
        # Close small gaps in the edge map before contour extraction.
        kernel = np.ones((5, 5), np.uint8)
        edge = cv2.dilate(edge, kernel=kernel, iterations=1)
        edge = cv2.erode(edge, kernel=kernel, iterations=1)
        contours, _ = cv2.findContours(edge, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
        return sorted(contours, key=cv2.contourArea, reverse=True)

View File

@@ -0,0 +1,148 @@
import logging
import time
import numpy as np
from pymilvus import MilvusClient
from app.core.config import *
from ..builder import PIPELINES
from ...utils.design_ensemble import get_keypoint_result
@PIPELINES.register_module()
class KeypointDetection(object):
    """Attach clothing keypoints to ``result`` using a Milvus-backed cache.

    ``result['image_id']`` keys the cache; on a miss the keypoint model is
    run and the result is stored for next time.

    The stored vector is 24 ints = 12 (x, y) points: 20 ints for the "up"
    half followed by 4 ints for the "down" half; the absent half is
    zero-padded.
    """

    # SECURITY NOTE: endpoint and credentials are hard-coded; they should
    # be loaded from configuration / secrets management instead of source.
    _MILVUS_URI = "http://10.1.1.240:19530"
    _MILVUS_TOKEN = "root:Milvus"

    @staticmethod
    def _new_client():
        # Single place to build a Milvus client (was duplicated 3 times).
        return MilvusClient(
            uri=KeypointDetection._MILVUS_URI,
            token=KeypointDetection._MILVUS_TOKEN,
            db_name=MILVUS_ALIAS
        )

    def __init__(self):
        self.client = self._new_client()

    def __del__(self):
        # Best effort: __init__ may have failed before ``client`` existed,
        # and __del__ must never raise.
        client = getattr(self, "client", None)
        if client is not None:
            try:
                client.close()
            except Exception:
                pass

    def __call__(self, result):
        """Populate ``result['clothes_keypoint']`` for supported categories."""
        if result['name'] in ['blouse', 'skirt', 'dress', 'outwear', 'trousers', 'tops', 'bottoms']:
            # Look up the cache for this image; on a hit read it directly,
            # otherwise infer and persist the result.
            site = 'up' if result['name'] in ['blouse', 'outwear', 'dress', 'tops'] else 'down'
            keypoint_cache = self.keypoint_cache(result, site)
            if keypoint_cache is False:
                keypoint_infer_result, site = self.infer_keypoint_result(result)
                result['clothes_keypoint'] = self.save_keypoint_cache(result["image_id"], keypoint_infer_result, site)
            else:
                result['clothes_keypoint'] = keypoint_cache
        return result

    @staticmethod
    def infer_keypoint_result(result):
        """Run the keypoint model; returns (keypoint ndarray, site)."""
        site = 'up' if result['name'] in ['blouse', 'outwear', 'dress', 'tops'] else 'down'
        keypoint_infer_result = get_keypoint_result(result["image"], site)
        return keypoint_infer_result, site

    @staticmethod
    def save_keypoint_cache(keypoint_id, cache, site, KEYPOINT_RESULT_TABLE_FIELD_SET=None):
        """Persist inferred keypoints (zero-padding the missing half).

        Returns the keypoint dict regardless of whether the upsert
        succeeded (the cache is best-effort).
        """
        # BUG FIX: the None default shadowed the constant star-imported
        # from app.core.config, so dict(zip(None, ...)) raised TypeError
        # whenever the argument was omitted — which every call site does.
        # Fall back to the module-level constant explicitly.
        if KEYPOINT_RESULT_TABLE_FIELD_SET is None:
            KEYPOINT_RESULT_TABLE_FIELD_SET = globals()["KEYPOINT_RESULT_TABLE_FIELD_SET"]
        if site == "down":
            result = np.concatenate([np.zeros(20, dtype=int), cache.flatten()])
        else:
            result = np.concatenate([cache.flatten(), np.zeros(4, dtype=int)])
        data = [
            {"keypoint_id": keypoint_id,
             "keypoint_site": site,
             "keypoint_vector": result.tolist()
             }
        ]
        client = KeypointDetection._new_client()
        try:
            client.upsert(
                collection_name=MILVUS_TABLE_KEYPOINT,
                data=data,
            )
        except Exception as e:
            logging.info(f"save keypoint cache milvus error : {e}")
        finally:
            client.close()
        return dict(zip(KEYPOINT_RESULT_TABLE_FIELD_SET, result.reshape(12, 2).astype(int).tolist()))

    @staticmethod
    def update_keypoint_cache(keypoint_id, infer_result, search_result, site):
        """Merge the freshly inferred half with the cached opposite half
        and store the combined vector under site "all"."""
        if site == "up":
            # Needed "up" — so inference produced "up" and the cache held "down".
            result = np.concatenate([infer_result.flatten(), search_result[-4:]])
        else:
            # Needed "down" — so inference produced "down" and the cache held "up".
            result = np.concatenate([search_result[:20], infer_result.flatten()])
        data = [
            {"keypoint_id": keypoint_id,
             "keypoint_site": "all",
             "keypoint_vector": result.tolist()
             }
        ]
        client = KeypointDetection._new_client()
        try:
            client.upsert(
                collection_name=MILVUS_TABLE_KEYPOINT,
                data=data
            )
        except Exception as e:
            logging.info(f"save keypoint cache milvus error : {e}")
        finally:
            # BUG FIX: this client was never closed (connection leak).
            client.close()
        return dict(zip(KEYPOINT_RESULT_TABLE_FIELD_SET, result.reshape(12, 2).astype(int).tolist()))

    def keypoint_cache(self, result, site):
        """Look up cached keypoints; infer and persist on a miss.

        Returns the keypoint dict, or False when the Milvus query fails.
        """
        try:
            keypoint_id = result['image_id']
            res = self.client.query(
                collection_name=MILVUS_TABLE_KEYPOINT,
                filter=f"keypoint_id == {keypoint_id}",
                output_fields=['keypoint_vector', 'keypoint_site']
            )
            if len(res) == 0:
                # No cached entry: infer now and save for next time.
                keypoint_infer_result, site = self.infer_keypoint_result(result)
                return self.save_keypoint_cache(result['image_id'], keypoint_infer_result, site)
            elif res[0]["keypoint_site"] == "all" or res[0]["keypoint_site"] == site:
                # Cached site matches (or covers both halves): use directly.
                return dict(zip(KEYPOINT_RESULT_TABLE_FIELD_SET,
                                np.array(res[0]['keypoint_vector']).astype(int).reshape(12, 2).tolist()))
            else:
                # Cached site differs: infer the missing half, merge to "all".
                keypoint_infer_result, site = self.infer_keypoint_result(result)
                return self.update_keypoint_cache(result["image_id"], keypoint_infer_result,
                                                  res[0]['keypoint_vector'], site)
        except Exception as e:
            logging.info(f"search keypoint cache milvus error {e}")
            return False

View File

@@ -0,0 +1,143 @@
import io
import logging
import time
import cv2
import numpy as np
from PIL import Image
from minio import Minio
from app.core.config import *
from ..builder import PIPELINES
@PIPELINES.register_module()
class LoadImageFromFile(object):
    """Load a garment image from MinIO and seed the pipeline ``result`` dict."""

    def __init__(self, path, color=None, print_dict=None):
        self.path = path              # "<bucket>/<object>" MinIO path
        self.color = color            # color spec string or None
        self.print_dict = print_dict  # print/pattern configuration (or None)
        self.minio_client = Minio(
            f"{MINIO_URL}",
            access_key=MINIO_ACCESS,
            secret_key=MINIO_SECRET,
            secure=MINIO_SECURE)

    def __call__(self, result):
        result['image'], result['pre_mask'] = self.read_image(self.path)
        result['gray'] = cv2.cvtColor(result['image'], cv2.COLOR_BGR2GRAY)
        result['keypoint'] = self.get_keypoint(result['name'])
        result['path'] = self.path
        result['img_shape'] = result['image'].shape
        result['ori_shape'] = result['image'].shape
        # Simplified: `x if x is not None else None` is just `x`.
        result['color'] = self.color
        result['print_dict'] = self.print_dict
        return result

    @staticmethod
    def get_keypoint(name):
        """Map an item category to the body keypoint it is aligned against."""
        keypoint_by_name = {
            'blouse': 'shoulder', 'outwear': 'shoulder',
            'dress': 'shoulder', 'tops': 'shoulder',
            'trousers': 'waistband', 'skirt': 'waistband',
            'bottoms': 'waistband',
            'bag': 'hand_point',
            'shoes': 'toe',
            'hairstyle': 'head_point',
            'earring': 'ear_point',
        }
        try:
            return keypoint_by_name[name]
        except KeyError:
            raise KeyError(f"{name} does not belong to item category list: blouse, outwear, dress, trousers, skirt, "
                           f"bag, shoes, hairstyle, earring.")

    def read_image(self, image_path):
        """Fetch and decode an image; returns (BGR image, alpha mask or None)."""
        image_mask = None
        bucket, object_name = image_path.split("/", 1)
        file = self.minio_client.get_object(bucket, object_name).data
        # BUG FIX: the original used flag 1 (IMREAD_COLOR), which always
        # yields 3 channels, so the 4-channel alpha branch below was dead
        # code. Decode unchanged so a 4-channel PNG keeps its alpha plane.
        image = cv2.imdecode(np.frombuffer(file, np.uint8), cv2.IMREAD_UNCHANGED)
        if len(image.shape) == 2:
            image = cv2.cvtColor(image, cv2.COLOR_GRAY2RGB)
        if image.shape[2] == 4:  # 4-channel image: extract alpha as the pre-mask
            image_mask = image[:, :, 3]
            image = image[:, :, :3]
        return image, image_mask
@PIPELINES.register_module()
class LoadBodyImageFromFile(object):
    # Loads the mannequin/body image from MinIO. Non-PNG inputs are
    # converted (near-white background made transparent) and re-uploaded
    # as PNG before use.
    def __init__(self, body_path):
        # body_path: "<bucket>/<object>" MinIO path of the body image.
        self.body_path = body_path
        self.minioClient = Minio(
            f"{MINIO_URL}",
            access_key=MINIO_ACCESS,
            secret_key=MINIO_SECRET,
            secure=MINIO_SECURE)
        # response = self.minioClient.get_object("aida-mannequins", "model_1693218345.2714431.png")

    # @ RunTime
    def __call__(self, result):
        result["image_url"] = result['body_path'] = self.body_path
        result["name"] = "mannequin"
        if not result['image_url'].lower().endswith(".png"):
            # NOTE(review): leftover debug statement — consider removing.
            logging.info(1)
            bucket = self.body_path.split("/", 1)[0]
            object_name = self.body_path.split("/", 1)[1]
            # Same object key with a .png extension.
            new_object_name = f'{object_name[:object_name.rfind(".")]}.png'
            image = self.minioClient.get_object(bucket, object_name)
            image = Image.open(io.BytesIO(image.data))
            image = image.convert("RGBA")
            data = image.getdata()
            #
            new_data = []
            for item in data:
                # Pixels close to white (all channels >= 230) become fully
                # transparent; the 230 threshold is a heuristic — TODO confirm.
                if item[0] >= 230 and item[1] >= 230 and item[2] >= 230:
                    new_data.append((255, 255, 255, 0))
                else:
                    new_data.append(item)
            image.putdata(new_data)
            image_data = io.BytesIO()
            image.save(image_data, format='PNG')
            image_data.seek(0)
            image_bytes = image_data.read()
            # Upload the converted PNG, then point body_path at it.
            image_path = f"{bucket}/{self.minioClient.put_object(bucket, new_object_name, io.BytesIO(image_bytes), len(image_bytes), content_type='image/png').object_name}"
            self.body_path = image_path
            result["image_url"] = result['body_path'] = self.body_path
        response = self.minioClient.get_object(self.body_path.split("/", 1)[0], self.body_path.split("/", 1)[1])
        # put_image_time = time.time()
        result['body_image'] = Image.open(io.BytesIO(response.read()))
        # logging.info(f"Image.open time is : {time.time() - put_image_time}")
        return result
@PIPELINES.register_module()
class ImageShow(object):
    """Debug stage: display one or more entries of the result dict."""

    def __init__(self, key):
        # str -> single OpenCV window; list[str] -> one matplotlib figure each
        self.key = key

    # @ RunTime
    def __call__(self, result):
        import matplotlib.pyplot as plt
        if isinstance(self.key, list):
            for name in self.key:
                plt.imshow(result[name])
                plt.title(name)
                plt.show()
        elif isinstance(self.key, str):
            cv2.imshow(self.key, self._resize_img(result[self.key]))
            cv2.waitKey(0)
        else:
            raise TypeError(f'key should be string but got type {type(self.key)}.')
        return result

    @staticmethod
    def _resize_img(img):
        """Shrink ``img`` so neither side exceeds 400 px (aspect preserved)."""
        height, width = img.shape[0], img.shape[1]
        if height > 400 or width > 400:
            ratio = min(400 / height, 400 / width)
            img = cv2.resize(img, (int(ratio * width), int(ratio * height)))
        return img

View File

@@ -0,0 +1,498 @@
import random
from io import BytesIO
import boto3
import cv2
import numpy as np
from PIL import Image
from ..builder import PIPELINES
# minio_client = Minio(
#     f"{MINIO_IP}:{MINIO_PORT}",
#     access_key=MINIO_ACCESS,
#     secret_key=MINIO_SECRET,
#     secure=MINIO_SECURE)
# SECURITY: AWS access key and secret are hard-coded in source below.
# These credentials should be considered compromised — rotate them and
# load from environment variables / a secrets manager instead.
s3 = boto3.client(
    's3',
    aws_access_key_id="AKIAVD3OJIMF6UJFLSHZ",
    aws_secret_access_key="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",
    region_name="ap-east-1"
)
@PIPELINES.register_module()
class Painting(object):
    """Recolor a garment image with a flat color or a gradient pattern.

    Writes ``pattern_image``/``final_image`` (and, when recoloring,
    ``single_image``/``alpha``) into the pipeline ``result`` dict.
    """

    def __init__(self, painting_flag=True):
        # When False, the image is only masked, never recolored.
        self.painting_flag = painting_flag

    def __call__(self, result):
        if result['name'] not in ['hairstyle', 'earring'] and self.painting_flag and result['color'] != 'none':
            dim_image_h, dim_image_w = result['image'].shape[0:2]
            # Pick the color source: a gradient texture from S3, or a flat color.
            if "gradient" in result.keys() and result['gradient'] != "":
                bucket_name = result['gradient'].split('/')[0]
                object_name = result['gradient'][result['gradient'].find('/') + 1:]
                pattern = self.get_gradient(bucket_name=bucket_name, object_name=object_name)
            else:
                pattern = self.get_pattern(result['color'])
            # (hoisted: the resize was duplicated in both branches)
            resize_pattern = cv2.resize(pattern, (dim_image_w, dim_image_h), interpolation=cv2.INTER_AREA)
            # Apply the pattern inside the mask, shaded by the gray image.
            closed_mo = np.expand_dims(result['mask'], axis=2).repeat(3, axis=2)
            gray_mo = np.expand_dims(result['gray'], axis=2).repeat(3, axis=2)
            get_image_fir = resize_pattern * (closed_mo / 255) * (gray_mo / 255)
            result['pattern_image'] = get_image_fir.astype(np.uint8)
            result['final_image'] = result['pattern_image']
            # Compose a white-background standalone image of the garment.
            canvas = np.full_like(result['final_image'], 255)
            temp_bg = np.expand_dims(cv2.bitwise_not(result['mask']), axis=2).repeat(3, axis=2)
            tmp1 = (canvas * (temp_bg / 255)).astype(np.uint8)
            temp_fg = np.expand_dims(result['mask'], axis=2).repeat(3, axis=2)
            tmp2 = (result['final_image'] * (temp_fg / 255)).astype(np.uint8)
            result['single_image'] = cv2.add(tmp1, tmp2)
            # NOTE(review): fixed alpha value — confirm 100/255 is intended.
            result['alpha'] = 100 / 255.0
        else:
            # No recoloring: just mask the original image.
            closed_mo = np.expand_dims(result['mask'], axis=2).repeat(3, axis=2)
            get_image_fir = result['image'] * (closed_mo / 255)
            result['pattern_image'] = get_image_fir.astype(np.uint8)
            result['final_image'] = result['pattern_image']
        return result

    @staticmethod
    def get_gradient(bucket_name, object_name):
        """Fetch a gradient texture from S3 and decode it as a BGR image."""
        # image_data = minio_client.get_object(bucket_name, object_name)
        image_data = s3.get_object(Bucket=bucket_name, Key=object_name)['Body']
        image_bytes = image_data.read()
        image_array = np.asarray(bytearray(image_bytes), dtype=np.uint8)
        image = cv2.imdecode(image_array, cv2.IMREAD_COLOR)
        return image

    @staticmethod
    def crop_image(image, image_size_h, image_size_w):
        """Crop a (h, w) window at a random offset from ``image``."""
        x_offset = np.random.randint(low=0, high=int(image_size_h / 5) - 6)
        y_offset = np.random.randint(low=0, high=int(image_size_w / 5) - 6)
        image = image[x_offset: x_offset + image_size_h, y_offset: y_offset + image_size_w, :]
        return image

    @staticmethod
    def get_pattern(single_color):
        """Build a 1x1 BGR pixel from an "R G B" color string."""
        if single_color is None:
            # BUG FIX: the original `raise False` is invalid — exceptions
            # must derive from BaseException, so it raised a confusing
            # TypeError instead of a meaningful error.
            raise ValueError("single_color must not be None")
        R, G, B = single_color.split(' ')
        pattern = np.zeros([1, 1, 3], np.uint8)
        pattern[0, 0, 0] = int(B)
        pattern[0, 0, 1] = int(G)
        pattern[0, 0, 2] = int(R)
        return pattern

    @staticmethod
    def gradient(image, angle_degrees, start_color, end_color):
        """Render a linear color gradient the same size as ``image``."""
        height, width = image.shape[0], image.shape[1]
        gradient_image = np.zeros((height, width, 3), dtype=np.uint8)
        # Clamp the angle to [0, 360] and convert to radians.
        angle_degrees = np.clip(angle_degrees, 0, 360)
        angle_radians = np.radians(angle_degrees)
        # Gradient direction vector.
        dx = np.cos(angle_radians)
        dy = np.sin(angle_radians)
        x_grid, y_grid = np.meshgrid(np.arange(width), np.arange(height))
        # Project each pixel onto the gradient direction.
        distance_along_gradient = (x_grid * dx + y_grid * dy) / np.sqrt(dx ** 2 + dy ** 2)
        # Blend weight per pixel, then interpolate the two colors.
        weight = np.clip(distance_along_gradient / max(width, height), 0, 1)
        gradient_image[:, :, 0] = (1 - weight) * start_color[0] + weight * end_color[0]
        gradient_image[:, :, 1] = (1 - weight) * start_color[1] + weight * end_color[1]
        gradient_image[:, :, 2] = (1 - weight) * start_color[2] + weight * end_color[2]
        return gradient_image
@PIPELINES.register_module()
class PrintPainting(object):
    def __init__(self, print_flag=True):
        # When False, print application is skipped and the plain pattern
        # image is passed through (see the no-print branch of __call__).
        self.print_flag = print_flag
# @ RunTime
    def __call__(self, result):
        """Composite user-supplied prints onto the garment pattern image.

        Two modes: when ``result['print']['IfSingle']`` is true, each print
        is pasted at an explicit location; otherwise the collection path via
        ``painting_collection``/``printpaint`` is used.
        NOTE(review): the first branch reads ``result['print']`` while the
        second reads ``result['print_dict']`` — confirm both keys are
        always populated.
        """
        # Default the paste locations to the origin when absent or empty.
        if "location" not in result['print'].keys():
            result['print']["location"] = [[0, 0]]
        elif result['print']["location"] == [] or result['print']["location"] is None:
            result['print']["location"] = [[0, 0]]
        if result['print']['IfSingle']:
            if len(result['print']['print_path_list']) == 0:
                raise ValueError('When there is no printing, ifsingle must be false')
            # Accumulators the size of the garment image: the composited
            # prints and the mask of where prints have been placed.
            print_background = np.zeros((result['pattern_image'].shape[0], result['pattern_image'].shape[1], 3), dtype=np.uint8)
            mask_background = np.zeros((result['pattern_image'].shape[0], result['pattern_image'].shape[1], 3), dtype=np.uint8)
            # print_background = np.full((result['pattern_image'].shape[0], result['pattern_image'].shape[1], 3), 255, dtype=np.uint8)
            for i in range(len(result['print']['print_path_list'])):
                image, image_mode = self.read_image(result['print']['print_path_list'][i])
                if image_mode == "RGBA":
                    # Print has an alpha plane: scale, rotate and paste via
                    # PIL using the alpha channel as the paste mask.
                    new_size = (int(image.width * result['print']['print_scale_list'][i]), int(image.height * result['print']['print_scale_list'][i]))
                    mask = image.split()[3]
                    resized_source = image.resize(new_size)
                    resized_source_mask = mask.resize(new_size)
                    rotated_resized_source = resized_source.rotate(result['print']['print_angle_list'][i])
                    rotated_resized_source_mask = resized_source_mask.rotate(result['print']['print_angle_list'][i])
                    source_image_pil = Image.fromarray(print_background)
                    source_image_pil_mask = Image.fromarray(mask_background)
                    source_image_pil.paste(rotated_resized_source, (int(result['print']['location'][i][0]), int(result['print']['location'][i][1])), rotated_resized_source)
                    source_image_pil_mask.paste(rotated_resized_source_mask, (int(result['print']['location'][i][0]), int(result['print']['location'][i][1])), rotated_resized_source_mask)
                    print_background = np.array(source_image_pil)
                    mask_background = np.array(source_image_pil_mask)
                    # NOTE(review): leftover debug print — consider removing.
                    print(1)
                else:
                    # Opaque print: derive a mask, rotate/scale with OpenCV,
                    # clip to the garment bounds, then overlay.
                    mask = self.get_mask_inv(image)
                    mask = np.expand_dims(mask, axis=2)
                    mask = cv2.cvtColor(mask, cv2.COLOR_GRAY2BGR)
                    mask = cv2.bitwise_not(mask)
                    # Coordinates must be recomputed after rotation.
                    rotate_mask, _ = self.img_rotate(mask, result['print']['print_angle_list'][i], result['print']['print_scale_list'][i])
                    rotate_image, rotated_new_size = self.img_rotate(image, result['print']['print_angle_list'][i], result['print']['print_scale_list'][i])
                    # x, y = int(result['print']['location'][i][0] - rotated_new_size[0] - (rotate_mask.shape[0] - image.shape[0]) / 2), int(result['print']['location'][i][1] - rotated_new_size[1] - (rotate_mask.shape[1] - image.shape[1]) / 2)
                    x, y = int(result['print']['location'][i][0] - rotated_new_size[0]), int(result['print']['location'][i][1] - rotated_new_size[1])
                    image_x = print_background.shape[1]
                    image_y = print_background.shape[0]
                    print_x = rotate_image.shape[1]
                    print_y = rotate_image.shape[0]
                    # Earlier, known-buggy cropping attempt (kept for reference):
                    # if x + print_x > image_x:
                    #     rotate_image = rotate_image[:, :x + print_x - image_x]
                    #     rotate_mask = rotate_mask[:, :x + print_x - image_x]
                    # #
                    # if y + print_y > image_y:
                    #     rotate_image = rotate_image[:y + print_y - image_y]
                    #     rotate_mask = rotate_mask[:y + print_y - image_y]
                    # These steps cannot be merged: the bound checks above
                    # handle the bottom/right edges, the checks below the
                    # top/left; cropping the right edge before shifting the
                    # region left would corrupt the result.
                    # Shift first, then test bounds, crop last.
                    # If the print was rotated or placed at an edge, the
                    # left/top bounds may be negative — crop those sides first.
                    if x <= 0:
                        rotate_image = rotate_image[:, -x:]
                        rotate_mask = rotate_mask[:, -x:]
                        start_x = x = 0
                    else:
                        start_x = x
                    if y <= 0:
                        rotate_image = rotate_image[-y:, :]
                        rotate_mask = rotate_mask[-y:, :]
                        start_y = y = 0
                    else:
                        start_y = y
                    # ------------------
                    # If the print overflows the image, crop the excess.
                    if x + print_x > image_x:
                        rotate_image = rotate_image[:, :image_x - x]
                        rotate_mask = rotate_mask[:, :image_x - x]
                    if y + print_y > image_y:
                        rotate_image = rotate_image[:image_y - y, :]
                        rotate_mask = rotate_mask[:image_y - y, :]
                    # mask_background[start_y:y + rotate_mask.shape[0], start_x:x + rotate_mask.shape[1]] = cv2.bitwise_xor(mask_background[start_y:y + rotate_mask.shape[0], start_x:x + rotate_mask.shape[1]], rotate_mask)
                    # print_background[start_y:y + rotate_image.shape[0], start_x:x + rotate_image.shape[1]] = cv2.add(print_background[start_y:y + rotate_image.shape[0], start_x:x + rotate_image.shape[1]], rotate_image)
                    # mask_background[start_y:y + rotate_mask.shape[0], start_x:x + rotate_mask.shape[1]] = rotate_mask
                    # print_background[start_y:y + rotate_image.shape[0], start_x:x + rotate_image.shape[1]] = rotate_image
                    mask_background = self.stack_prin(mask_background, result['pattern_image'], rotate_mask, start_y, y, start_x, x)
                    print_background = self.stack_prin(print_background, result['pattern_image'], rotate_image, start_y, y, start_x, x)
            # gray_image = cv2.cvtColor(mask_background, cv2.COLOR_BGR2GRAY)
            # print_background = cv2.bitwise_and(print_background, print_background, mask=gray_image)
            # Restrict prints to the garment mask and blend the print
            # foreground (multiplied by mask and gray) over the pattern.
            print_mask = cv2.bitwise_and(result['mask'], cv2.cvtColor(mask_background, cv2.COLOR_BGR2GRAY))
            img_fg = cv2.bitwise_or(print_background, print_background, mask=print_mask)
            img_bg = cv2.bitwise_and(result['pattern_image'], result['pattern_image'], mask=cv2.bitwise_not(print_mask))
            mask_mo = np.expand_dims(print_mask, axis=2).repeat(3, axis=2)
            gray_mo = np.expand_dims(result['gray'], axis=2).repeat(3, axis=2)
            img_fg = (img_fg * (mask_mo / 255) * (gray_mo / 255)).astype(np.uint8)
            result['final_image'] = cv2.add(img_bg, img_fg)
            # Compose the white-background standalone garment image.
            canvas = np.full_like(result['final_image'], 255)
            temp_bg = np.expand_dims(cv2.bitwise_not(result['mask']), axis=2).repeat(3, axis=2)
            tmp1 = (canvas * (temp_bg / 255)).astype(np.uint8)
            temp_fg = np.expand_dims(result['mask'], axis=2).repeat(3, axis=2)
            tmp2 = (result['final_image'] * (temp_fg / 255)).astype(np.uint8)
            result['single_image'] = cv2.add(tmp1, tmp2)
            return result
        else:
            painting_dict = {}
            painting_dict['dim_image_h'], painting_dict['dim_image_w'] = result['pattern_image'].shape[0:2]
            # no print
            if len(result['print_dict']['print_path_list']) == 0 or not self.print_flag:
                result['print_image'] = result['pattern_image']
            # print
            else:
                painting_dict = self.painting_collection(painting_dict, result, print_trigger=True)
                result['print_image'] = self.printpaint(result, painting_dict, print_=True)
            result['final_image'] = result['print_image']
            # Compose the white-background standalone garment image.
            canvas = np.full_like(result['final_image'], 255)
            temp_bg = np.expand_dims(cv2.bitwise_not(result['mask']), axis=2).repeat(3, axis=2)
            tmp1 = (canvas * (temp_bg / 255)).astype(np.uint8)
            temp_fg = np.expand_dims(result['mask'], axis=2).repeat(3, axis=2)
            tmp2 = (result['final_image'] * (temp_fg / 255)).astype(np.uint8)
            result['single_image'] = cv2.add(tmp1, tmp2)
            return result
@staticmethod
def stack_prin(print_background, pattern_image, rotate_image, start_y, y, start_x, x):
    """Merge one rotated print patch into the accumulated print background.

    The patch is pasted onto a blank canvas sized like the pattern image,
    then combined so that pixels already occupied (non-black) in
    ``print_background`` win over the newly pasted patch.

    Args:
        print_background: BGR canvas holding the prints stacked so far.
        pattern_image: image whose height/width define the canvas size.
        rotate_image: rotated print patch to paste.
        start_y, y: row offsets; the target slice spans start_y .. y + patch height.
        start_x, x: column offsets; the target slice spans start_x .. x + patch width.

    Returns:
        Updated BGR print background with the patch merged in.
    """
    canvas_h, canvas_w = pattern_image.shape[0], pattern_image.shape[1]
    pasted = np.zeros((canvas_h, canvas_w, 3), dtype=np.uint8)
    patch_h, patch_w = rotate_image.shape[0], rotate_image.shape[1]
    pasted[start_y:y + patch_h, start_x:x + patch_w] = rotate_image
    # Any non-black pixel of the existing background is considered occupied.
    background_gray = cv2.cvtColor(print_background, cv2.COLOR_BGR2GRAY)
    occupied = cv2.threshold(background_gray, 1, 255, cv2.THRESH_BINARY)[1]
    keep_existing = cv2.bitwise_and(print_background, print_background, mask=occupied)
    add_patch = cv2.bitwise_and(pasted, pasted, mask=cv2.bitwise_not(occupied))
    return keep_existing + add_patch
def painting_collection(self, painting_dict, result, print_trigger=False):
    """Populate ``painting_dict`` with the print tile and its background mask.

    When ``print_trigger`` is set, the first print described by
    ``result['print_dict']`` is fetched, its background mask computed, and
    both the print and the mask are resized (and, in tiled mode, repeated
    over the whole garment) via ``tile_image``.

    Args:
        painting_dict: dict already holding 'dim_image_h' / 'dim_image_w'.
        result: pipeline result dict; 'print_dict' describes the print(s).
        print_trigger: when False, ``painting_dict`` is returned unchanged.

    Returns:
        ``painting_dict`` extended with 'Trigger', 'location',
        'mask_inv_print', 'tile_print', 'dim_print_h' and 'dim_print_w'.
    """
    if print_trigger:
        print_ = self.get_print(result['print_dict'])
        # Tiled (repeat-all-over) mode is the opposite of single placement.
        painting_dict['Trigger'] = not print_['IfSingle']
        painting_dict['location'] = print_['location'] if 'location' in print_.keys() else None
        single_mask_inv_print = self.get_mask_inv(print_['image'])
        dim_max = max(painting_dict['dim_image_h'], painting_dict['dim_image_w'])
        # One tile edge = scale * (longest image edge / 5).
        dim_pattern = (int(dim_max * print_['scale'] / 5), int(dim_max * print_['scale'] / 5))
        if not print_['IfSingle']:
            # Fresh seed so crop_image picks a reproducible random offset
            # for this tiling pass.
            self.random_seed = random.randint(0, 1000)
            painting_dict['mask_inv_print'] = self.tile_image(single_mask_inv_print, dim_pattern, print_['scale'], painting_dict['dim_image_h'], painting_dict['dim_image_w'], painting_dict['location'], trigger=True)
            painting_dict['tile_print'] = self.tile_image(print_['image'], dim_pattern, print_['scale'], painting_dict['dim_image_h'], painting_dict['dim_image_w'], painting_dict['location'], trigger=True)
        else:
            painting_dict['mask_inv_print'] = self.tile_image(single_mask_inv_print, dim_pattern, print_['scale'], painting_dict['dim_image_h'], painting_dict['dim_image_w'], painting_dict['location'])
            painting_dict['tile_print'] = self.tile_image(print_['image'], dim_pattern, print_['scale'], painting_dict['dim_image_h'], painting_dict['dim_image_w'], painting_dict['location'])
        painting_dict['dim_print_h'], painting_dict['dim_print_w'] = dim_pattern
    return painting_dict
def tile_image(self, pattern, dim, scale, dim_image_h, dim_image_w, location, trigger=False):
    """Resize a print pattern and, in tiled mode, repeat it over the image.

    With ``trigger`` False the pattern is simply resized to ``dim``.  With
    ``trigger`` True the resized pattern is repeated enough times to cover
    the garment image (plus a safety margin) and then cropped back to the
    garment size through ``crop_image``.
    """
    resized = cv2.resize(pattern, dim, interpolation=cv2.INTER_AREA)
    if not trigger:
        return resized
    # Repetition count: enough tiles to span the image with margin.
    reps = int((5 + 1) / scale) + 4
    if len(pattern.shape) == 2:
        tiled = np.tile(resized, (reps, reps))
    if len(pattern.shape) == 3:
        tiled = np.tile(resized, (reps, reps, 1))
    return self.crop_image(tiled, dim_image_h, dim_image_w, location, resized.shape)
def get_mask_inv(self, print_):
    """Build a mask of the print's (removable) background region.

    If the top-left pixel is pure white the print is assumed to sit on a
    white background: that pixel's LAB value, widened by ±30 per channel
    via ``get_low_high_lab``, is used with ``cv2.inRange`` to select all
    background-coloured pixels.  Otherwise the print is treated as having
    no removable background and an all-zero mask is returned.

    Args:
        print_: BGR print image (H x W x 3, uint8).

    Returns:
        uint8 mask (H x W); 255 marks background pixels to be dropped.
    """
    corner_b, corner_g, corner_r = print_[0][0][0], print_[0][0][1], print_[0][0][2]
    if corner_b == 255 and corner_g == 255 and corner_r == 255:
        # Single LAB conversion (the original converted twice identically).
        print_lab = cv2.cvtColor(print_, cv2.COLOR_BGR2LAB)
        bg_l, bg_a, bg_b = print_lab[0][0][0], print_lab[0][0][1], print_lab[0][0][2]
        bg_L_high, bg_L_low = self.get_low_high_lab(bg_l, L=True)
        bg_a_high, bg_a_low = self.get_low_high_lab(bg_a)
        bg_b_high, bg_b_low = self.get_low_high_lab(bg_b)
        lower = np.array([bg_L_low, bg_a_low, bg_b_low])
        upper = np.array([bg_L_high, bg_a_high, bg_b_high])
        return cv2.inRange(print_lab, lower, upper)
    # Non-white corner: nothing is treated as background.
    return np.zeros(print_.shape[:2], dtype=np.uint8)
@staticmethod
def printpaint(result, painting_dict, print_=False):
if print_ and painting_dict['Trigger']:
print_mask = cv2.bitwise_and(result['mask'], cv2.bitwise_not(painting_dict['mask_inv_print']))
img_fg = cv2.bitwise_and(painting_dict['tile_print'], painting_dict['tile_print'], mask=print_mask)
else:
print_mask = result['mask']
img_fg = result['final_image']
if print_ and not painting_dict['Trigger']:
try:
index_ = len(painting_dict['location'])
except:
assert f'there must be parameter of location if choose IfSingle'
for i in range(index_):
start_h, start_w = int(painting_dict['location'][i][1]), int(painting_dict['location'][i][0])
length_h = min(start_h + painting_dict['dim_print_h'], img_fg.shape[0])
length_w = min(start_w + painting_dict['dim_print_w'], img_fg.shape[1])
change_region = img_fg[start_h: length_h, start_w: length_w, :]
# problem in change_mask
change_mask = print_mask[start_h: length_h, start_w: length_w]
# get real part into change mask
_, change_mask = cv2.threshold(change_mask, 220, 255, cv2.THRESH_BINARY)
mask = cv2.bitwise_not(painting_dict['mask_inv_print'])
img_fg[start_h:start_h + painting_dict['dim_print_h'], start_w:start_w + painting_dict['dim_print_w'], :] = change_region
clothes_mask_print = cv2.bitwise_not(print_mask)
img_bg = cv2.bitwise_and(result['pattern_image'], result['pattern_image'], mask=clothes_mask_print)
mask_mo = np.expand_dims(print_mask, axis=2).repeat(3, axis=2)
gray_mo = np.expand_dims(result['gray'], axis=2).repeat(3, axis=2)
img_fg = (img_fg * (mask_mo / 255) * (gray_mo / 255)).astype(np.uint8)
print_image = cv2.add(img_bg, img_fg)
return print_image
@staticmethod
def get_print(print_dict):
    """Normalise print parameters and fetch the first print image from S3.

    Ensures 'scale' (floored at 0.3) and 'IfSingle' (default False) are
    present, downloads the first entry of 'print_path_list', flattens RGBA
    images onto a white canvas so transparency does not render as black,
    and stores the result as a BGR array under 'image'.

    Returns:
        The same ``print_dict``, mutated in place.
    """
    if 'print_scale_list' not in print_dict.keys() or print_dict['print_scale_list'][0] < 0.3:
        print_dict['scale'] = 0.3
    else:
        print_dict['scale'] = print_dict['print_scale_list'][0]
    if 'IfSingle' not in print_dict.keys():
        print_dict['IfSingle'] = False
    # Paths are "bucket/key"; split once on the first slash.
    bucket, key = print_dict['print_path_list'][0].split("/", 1)
    body = s3.get_object(Bucket=bucket, Key=key)['Body']
    image = Image.open(BytesIO(body.read()))
    # If the image is RGBA, paste it onto a pure-white canvas so
    # transparent areas don't turn black after conversion.
    if image.mode == "RGBA":
        flattened = Image.new('RGB', image.size, (255, 255, 255))
        flattened.paste(image, mask=image.split()[3])
        image = flattened
    print_dict['image'] = cv2.cvtColor(np.asarray(image), cv2.COLOR_RGB2BGR)
    return print_dict
def crop_image(self, image, image_size_h, image_size_w, location, print_shape):
    """Crop a tiled print down to the garment image size.

    The crop origin is offset so the tiling lines up with the requested
    print anchor: the offset is the anchor coordinate wrapped to one tile
    period and mirrored.

    Args:
        image: tiled print (2-D mask or 3-D BGR image).
        image_size_h: target crop height.
        image_size_w: target crop width.
        location: sequence of anchors; only location[0] = (x, y) is used.
        print_shape: shape of one resized print tile.

    Returns:
        The cropped view of ``image``.
    """
    print_w = print_shape[1]
    print_h = print_shape[0]
    # Seeded for reproducibility; the random offsets that used this seed
    # are currently disabled in favour of anchor-derived offsets.
    random.seed(self.random_seed)
    # 1. Wrap the anchor coordinates to one tile period to get the real
    # crop offsets.
    # NOTE(review): x_offset (rows) is derived from print_w, and y_offset
    # mixes print_w with a modulo by print_h — the axes look swapped;
    # confirm against the tiling in tile_image before changing.
    x_offset = print_w - int(location[0][1] % print_w)
    y_offset = print_w - int(location[0][0] % print_h)
    if len(image.shape) == 2:
        image = image[x_offset: x_offset + image_size_h, y_offset: y_offset + image_size_w]
    elif len(image.shape) == 3:
        image = image[x_offset: x_offset + image_size_h, y_offset: y_offset + image_size_w, :]
    return image
@staticmethod
def get_low_high_lab(Lab_value, L=False):
if L:
high = Lab_value + 30 if Lab_value + 30 < 255 else 255
low = Lab_value - 30 if Lab_value - 30 > 0 else 0
else:
high = Lab_value + 30 if Lab_value + 30 < 255 else 255
low = Lab_value - 30 if Lab_value - 30 > 0 else 0
return high, low
@staticmethod
def img_rotate(image, angel, scale):
    """Rotate *image* clockwise by *angel* degrees while scaling by *scale*.

    The output canvas is enlarged so the rotated image is not clipped.

    Args:
        image (np.array): source image.
        angel (float): rotation angle in degrees, applied clockwise.
        scale (float): isotropic scale factor applied by the rotation matrix.

    Returns:
        tuple: (rotated image, (dx, dy)) where (dx, dy) is the offset of
        the scaled original inside the enlarged canvas.
    """
    h, w = image.shape[:2]
    center = (w // 2, h // 2)
    # Negative angle => clockwise in OpenCV's (counter-clockwise-positive)
    # convention.
    M = cv2.getRotationMatrix2D(center, -angel, scale)
    # Enlarge the output so the rotated corners stay inside the canvas.
    rotated_h = int((w * np.abs(M[0, 1]) + (h * np.abs(M[0, 0]))))
    rotated_w = int((h * np.abs(M[0, 1]) + (w * np.abs(M[0, 0]))))
    # Shift the transform so the rotation stays centred in the new canvas.
    M[0, 2] += (rotated_w - w) // 2
    M[1, 2] += (rotated_h - h) // 2
    # Apply the rotation.
    rotated_img = cv2.warpAffine(image, M, (rotated_w, rotated_h))
    return rotated_img, ((rotated_img.shape[1] - image.shape[1] * scale) // 2, (rotated_img.shape[0] - image.shape[0] * scale) // 2)
@staticmethod
def read_image(image_url):
    """Fetch an image from S3 given a "bucket/key" url.

    Returns:
        tuple: (image, mode). RGBA images are returned as PIL images so the
        caller can handle transparency itself; any other mode is converted
        to an OpenCV BGR array.
    """
    bucket, key = image_url.split("/", 1)
    body = s3.get_object(Bucket=bucket, Key=key)['Body']
    pil_image = Image.open(BytesIO(body.read()))
    mode = pil_image.mode
    # RGBA: hand the PIL image back untouched so alpha is preserved
    # (flattening onto white would lose transparency information here).
    if mode == "RGBA":
        return pil_image, mode
    return cv2.cvtColor(np.asarray(pil_image), cv2.COLOR_RGB2BGR), mode
# @staticmethod
# def read_image(image_url):
# response = requests.get(image_url)
# image_data = np.frombuffer(response.content, np.uint8)
#
# # 解码图像
# image = cv2.imdecode(image_data, 3)
# return image

View File

@@ -0,0 +1,54 @@
from ..builder import PIPELINES
import math
import cv2
@PIPELINES.register_module()
class Scaling(object):
    """Pipeline step that computes result['scale']: the resize factor that
    maps the garment image onto the body figure, chosen per keypoint type."""

    def __init__(self):
        pass

    def __call__(self, result):
        kp = result['keypoint']
        if kp in ['waistband', 'shoulder', 'head_point']:
            left = result['clothes_keypoint'][kp + '_left']
            right = result['clothes_keypoint'][kp + '_right']
            # Garment keypoint span (Euclidean, in pixels).
            distance_clo = math.sqrt(
                (int(left[0]) - int(right[0])) ** 2
                +
                (int(left[1]) - int(right[1])) ** 2)
            body_left = result['body_point_test'][kp + '_left']
            body_right = result['body_point_test'][kp + '_right']
            # Horizontal body span; the +1 keeps the value strictly positive.
            distance_bdy = math.sqrt((int(body_left[0]) - int(body_right[0])) ** 2 + 1)
            # Degenerate (coincident) garment keypoints: fixed fallback scale.
            result['scale'] = 10 if distance_clo == 0 else distance_bdy / distance_clo
        elif kp == 'toe':
            foot = result['body_point_test']['foot_length']
            distance_bdy = math.sqrt(
                (int(foot[0]) - int(foot[2])) ** 2
                +
                (int(foot[1]) - int(foot[3])) ** 2
            )
            # Shoe width: bounding-box width of the largest edge contour.
            blurred = cv2.GaussianBlur(result['gray'], (3, 3), 0)
            edges = cv2.Canny(blurred, 10, 200)
            edges = cv2.dilate(edges, None)
            edges = cv2.erode(edges, None)
            found, _ = cv2.findContours(edges, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
            largest = sorted(found, key=cv2.contourArea, reverse=True)[0]
            _, _, w, _ = cv2.boundingRect(largest)
            result['scale'] = distance_bdy / w
        elif kp == 'hand_point':
            # Bags carry a precomputed scale.
            result['scale'] = result['scale_bag']
        elif kp == 'ear_point':
            # Earrings carry a precomputed scale.
            result['scale'] = result['scale_earrings']
        return result

View File

@@ -0,0 +1,14 @@
from ..builder import PIPELINES
from ...utils.design_ensemble import get_seg_result
@PIPELINES.register_module()
class Segmentation(object):
    """Pipeline step that runs clothing segmentation on result['image']."""

    def __init__(self, device='cpu', show=False, debug=None):
        # Stored for interface compatibility; the segmentation itself is
        # delegated to the design-ensemble service.
        self.device = device
        self.show = show
        self.debug = debug

    def __call__(self, result):
        """Attach the segmentation map under result['seg_result']."""
        result['seg_result'] = get_seg_result(result["image_id"], result['image'])
        return result

View File

@@ -0,0 +1,115 @@
import logging
import cv2
import numpy as np
from cv2 import cvtColor, COLOR_BGR2RGBA
from app.service.utils.generate_uuid import generate_uuid
from ..builder import PIPELINES
from PIL import Image
from ...utils.conversion_image import rgb_to_rgba
from ...utils.upload_image import upload_png_mask
@PIPELINES.register_module()
class Split(object):
    """
    Split image into front and back layer according to the segmentation result
    """
    # KNet
    def __call__(self, result):
        """Derive front/back masks from the segmentation result and upload
        the resized layer images for clothing items.

        Expects 'mask' (from ContourDetection). When 'seg_result' is
        present, label 1 selects the front layer and label 2 the back
        layer; otherwise the whole mask is treated as the front layer.

        Returns:
            ``result`` extended with 'front_mask'/'back_mask' and, for
            clothing items, 'front_image'/'front_image_url'/'front_mask_url'
            plus the back_* equivalents (None for items without a back
            layer).  NOTE(review): on any exception this logs a warning and
            implicitly returns None — callers must tolerate that.
        """
        try:
            if 'mask' not in result.keys():
                raise KeyError(f'Cannot find mask in result dict, please check ContourDetection is included in process pipelines.')
            if 'seg_result' not in result.keys():  # segmentation model was not run
                result['front_mask'] = result['mask'].copy()
                result['back_mask'] = np.zeros_like(result['mask'])
            else:
                # Label 1 marks the front layer, label 2 the back layer.
                temp_front = result['seg_result'] == 1
                result['front_mask'] = (result['mask'] * (temp_front + 0).astype(np.uint8))
                temp_back = result['seg_result'] == 2
                result['back_mask'] = (result['mask'] * (temp_back + 0).astype(np.uint8))
            if result['name'] in ('outwear', 'dress', 'blouse', 'skirt', 'trousers', 'tops', 'bottoms'):
                # Masks may carry a leading batch axis; keep the 2-D plane.
                if len(result['front_mask'].shape) > 2:
                    front_mask = result['front_mask'][0]
                else:
                    front_mask = result['front_mask']
                if len(result['back_mask'].shape) > 2:
                    back_mask = result['back_mask'][0]
                else:
                    back_mask = result['back_mask']
                rgba_image = rgb_to_rgba((result['final_image'].shape[0], result['final_image'].shape[1]), result['final_image'], result['mask'])
                # Copy only the front-layer pixels into a transparent canvas.
                result_front_image = np.zeros_like(rgba_image)
                result_front_image[front_mask != 0] = rgba_image[front_mask != 0]
                result_front_image_pil = Image.fromarray(cvtColor(result_front_image, COLOR_BGR2RGBA))
                # Resize by the body-fit scale times the output resize scale.
                front_new_size = (int(result_front_image_pil.width * result["scale"] * result["resize_scale"]), int(result_front_image_pil.height * result["scale"] * result["resize_scale"]))
                result_front_image_pil = result_front_image_pil.resize(front_new_size, Image.LANCZOS)
                front_mask = cv2.resize(front_mask, front_new_size)
                result['front_image'], result["front_image_url"], result["front_mask_url"] = upload_png_mask(result_front_image_pil, f'{generate_uuid()}', mask=front_mask)
                # Only upper-body garments have a distinct back layer.
                if result["name"] in ('blouse', 'dress', 'outwear', 'tops'):
                    result_back_image = np.zeros_like(rgba_image)
                    result_back_image[back_mask != 0] = rgba_image[back_mask != 0]
                    result_back_image_pil = Image.fromarray(cvtColor(result_back_image, COLOR_BGR2RGBA))
                    back_new_size = (int(result_back_image_pil.width * result["scale"] * result["resize_scale"]), int(result_back_image_pil.height * result["scale"] * result["resize_scale"]))
                    result_back_image_pil = result_back_image_pil.resize(back_new_size, Image.LANCZOS)
                    back_mask = cv2.resize(back_mask, back_new_size)
                    result['back_image'], result["back_image_url"], result["back_mask_url"] = upload_png_mask(result_back_image_pil, f'{generate_uuid()}', mask=back_mask)
                else:
                    result['back_image'] = None
                    result["back_image_url"] = None
                    result["back_mask_url"] = None
            return result
        except Exception as e:
            logging.warning(f"split runtime exception : {e} image_id : {result['image_id']}")

View File

@@ -0,0 +1,126 @@
import io
import logging
import time
import cv2
import numpy as np
from .builder import ITEMS
from .clothing import Clothing
from PIL import Image
from ..utils.conversion_image import rgb_to_rgba
from ..utils.upload_image import upload_png_mask
from ...utils.generate_uuid import generate_uuid
@ITEMS.register_module()
class Shoes(Clothing):
    """Shoes item: processed as one image containing both shoes, then cut
    into separate left/right layers for compositing onto the figure."""
    # TODO location of shoes has little mismatch
    def __init__(self, **kwargs):
        # Standard clothing pipeline; Split produces the front mask that
        # cut() later uses when extracting each shoe.
        pipeline = [
            dict(type='LoadImageFromFile', path=kwargs['path'], color=kwargs['color']),
            dict(type='KeypointDetection'),
            dict(type='ContourDetection'),
            dict(type='Painting'),
            dict(type='Scaling'),
            dict(type='Split'),
            # dict(type='ImageShow', key=['image', 'mask', 'pattern_image']),
        ]
        kwargs.update(pipeline=pipeline)
        super(Shoes, self).__init__(**kwargs)

    def organize(self, layer):
        """Cut the processed image into left/right shoes and insert one
        layer dict per shoe into *layer*.

        NOTE(review): the layer key 'sacle' is a typo for 'scale'; kept
        as-is because downstream layer consumers may read this exact key —
        fix both sides together.
        """
        left_shoe_mask, right_shoe_mask = self.cut()
        left_layer = dict(name=f'{type(self).__name__.lower()}_left',
                          image=self.result['shoes_left'],
                          image_url=self.result['left_image_url'],
                          mask_url=self.result['left_mask_url'],
                          sacle=self.result['scale'],
                          clothes_keypoint=self.result['clothes_keypoint'],
                          position=self.calculate_start_point(self.result['keypoint'],
                                                              self.result['scale'],
                                                              self.result['clothes_keypoint'],
                                                              self.result['body_point'],
                                                              'left'))
        layer.insert(left_layer)
        right_layer = dict(name=f'{type(self).__name__.lower()}_right',
                           image=self.result['shoes_right'],
                           image_url=self.result['right_image_url'],
                           mask_url=self.result['right_mask_url'],
                           sacle=self.result['scale'],
                           clothes_keypoint=self.result['clothes_keypoint'],
                           position=self.calculate_start_point(self.result['keypoint'],
                                                               self.result['scale'],
                                                               self.result['clothes_keypoint'],
                                                               self.result['body_point'],
                                                               'right'))
        layer.insert(right_layer)

    def cut(self):
        """
        Cut shoes mask into two pieces

        The two largest external contours are taken to be the two shoes,
        ordered left-to-right by bounding-box x.  For each shoe a smoothed
        mask is built, the shoe is extracted as RGBA, scaled by
        result['scale'], uploaded, and the image/URLs are stored under the
        shoes_left / shoes_right keys of self.result.

        Returns:
            tuple: (left mask, right mask) — blurred uint8 masks at the
            original image size.
        """
        contour, _ = cv2.findContours(self.result['mask'], cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
        contours = sorted(contour, key=cv2.contourArea, reverse=True)
        bounding_boxes = [cv2.boundingRect(c) for c in contours[:2]]
        # Order the two largest contours left-to-right by bounding-box x.
        (contours, bounding_boxes) = zip(*sorted(zip(contours[:2], bounding_boxes), key=lambda x: x[1][0], reverse=False))
        epsilon_left = 0.001 * cv2.arcLength(contours[0], True)
        approx_left = cv2.approxPolyDP(contours[0], epsilon_left, True)
        mask_left = np.zeros(self.result['final_image'].shape[:2], np.uint8)
        cv2.drawContours(mask_left, [approx_left], -1, 255, -1)
        # Blur softens the polygon edge for a cleaner alpha boundary.
        item_mask_left = cv2.GaussianBlur(mask_left, (5, 5), 0)
        rgba_image = rgb_to_rgba((self.result['final_image'].shape[0], self.result['final_image'].shape[1]), self.result['final_image'], item_mask_left)
        result_image = np.zeros_like(rgba_image)
        result_image[self.result['front_mask'] != 0] = rgba_image[self.result['front_mask'] != 0]
        result_left_image_pil = Image.fromarray(result_image, 'RGBA')
        result_left_image_pil = result_left_image_pil.resize((int(result_left_image_pil.width * self.result["scale"]), int(result_left_image_pil.height * self.result["scale"])), Image.LANCZOS)
        self.result['shoes_left'], self.result["left_image_url"], self.result["left_mask_url"] = upload_png_mask(result_left_image_pil, f"{generate_uuid()}")
        # Same extraction for the right shoe.
        epsilon_right = 0.001 * cv2.arcLength(contours[1], True)
        approx_right = cv2.approxPolyDP(contours[1], epsilon_right, True)
        mask_right = np.zeros(self.result['final_image'].shape[:2], np.uint8)
        cv2.drawContours(mask_right, [approx_right], -1, 255, -1)
        item_mask_right = cv2.GaussianBlur(mask_right, (5, 5), 0)
        rgba_image = rgb_to_rgba((self.result['final_image'].shape[0], self.result['final_image'].shape[1]), self.result['final_image'], item_mask_right)
        result_image = np.zeros_like(rgba_image)
        result_image[self.result['front_mask'] != 0] = rgba_image[self.result['front_mask'] != 0]
        result_right_image_pil = Image.fromarray(result_image, 'RGBA')
        result_right_image_pil = result_right_image_pil.resize((int(result_right_image_pil.width * self.result["scale"]), int(result_right_image_pil.height * self.result["scale"])), Image.LANCZOS)
        self.result['shoes_right'], self.result["right_image_url"], self.result["right_mask_url"] = upload_png_mask(result_right_image_pil, f"{generate_uuid()}")
        return item_mask_left, item_mask_right

    @staticmethod
    def calculate_start_point(keypoint_type, scale, clothes_point, body_point, location):
        """
        left shoes align left
        right shoes align right
        Args:
            keypoint_type: string, "toe"
            scale: float
            clothes_point: dict whose values are underscore-separated
                coordinate strings, e.g. "x_y" (split with "_" below)
            body_point: dict, containing keypoint data of body figure
            location: string, indicates whether the start point belongs to right or left shoe
        Returns:
            start_point: tuple (x', y')
            x' = y_body - y1 * scale
            y' = x_body - x1 * scale
        """
        if location not in ['left', 'right']:
            raise KeyError(f'location value must be left or right but got {location}')
        side_indicator = f'{keypoint_type}_{location}'
        start_point = (body_point[side_indicator][1] - int(int(clothes_point[side_indicator].split("_")[1]) * scale),
                       body_point[side_indicator][0] - int(int(clothes_point[side_indicator].split("_")[0]) * scale))
        return start_point

View File

@@ -0,0 +1,46 @@
from .builder import ITEMS
from .clothing import Clothing
@ITEMS.register_module()
class Top(Clothing):
    """Upper-body garment: runs the full keypoint / segmentation /
    painting / print pipeline before splitting into layers."""

    def __init__(self, pipeline, **kwargs):
        # Build the default pipeline only when the caller supplied none.
        if pipeline is None:
            pipeline = [
                dict(type='LoadImageFromFile', path=kwargs['path'], color=kwargs['color'], print_dict=kwargs['print']),
                dict(type='KeypointDetection'),
                dict(type='ContourDetection'),
                dict(type='Segmentation', device='cpu', show=False, debug=kwargs['debug']),
                dict(type='Painting', painting_flag=True),
                dict(type='PrintPainting', print_flag=True),
                # dict(type='ImageShow', key=['image', 'mask', 'seg_visualize', 'pattern_image']),
                dict(type='Scaling'),
                dict(type='Split'),
            ]
        kwargs.update(pipeline=pipeline)
        super().__init__(**kwargs)
@ITEMS.register_module()
class Blouse(Top):
    """Blouse: a Top processed with the default Top pipeline."""

    def __init__(self, pipeline=None, **kwargs):
        super().__init__(pipeline, **kwargs)
@ITEMS.register_module()
class Outwear(Top):
    """Outwear: a Top processed with the default Top pipeline."""

    def __init__(self, pipeline=None, **kwargs):
        super().__init__(pipeline, **kwargs)
@ITEMS.register_module()
class Dress(Top):
    """Dress: a Top processed with the default Top pipeline."""

    def __init__(self, pipeline=None, **kwargs):
        super().__init__(pipeline, **kwargs)
# Men's clothing
@ITEMS.register_module()
class Tops(Top):
    """Tops (men's upper-body garment), sharing the Top pipeline."""

    def __init__(self, pipeline=None, **kwargs):
        super().__init__(pipeline, **kwargs)