feat design batch
fix
This commit is contained in:
90
app/design_batch/request_data/requests_data.json
Normal file
90
app/design_batch/request_data/requests_data.json
Normal file
@@ -0,0 +1,90 @@
|
||||
{
|
||||
"objects": [
|
||||
{
|
||||
"basic": {
|
||||
"body_point_test": {
|
||||
"waistband_right": [
|
||||
201,
|
||||
242
|
||||
],
|
||||
"hand_point_right": [
|
||||
222,
|
||||
312
|
||||
],
|
||||
"waistband_left": [
|
||||
114,
|
||||
243
|
||||
],
|
||||
"hand_point_left": [
|
||||
94,
|
||||
310
|
||||
],
|
||||
"shoulder_left": [
|
||||
102,
|
||||
116
|
||||
],
|
||||
"shoulder_right": [
|
||||
211,
|
||||
115
|
||||
]
|
||||
},
|
||||
"layer_order": true,
|
||||
"scale_bag": 0.7,
|
||||
"scale_earrings": 0.16,
|
||||
"self_template": true,
|
||||
"single_overall": "overall",
|
||||
"switch_category": ""
|
||||
},
|
||||
"items": [
|
||||
{
|
||||
"businessId": 264931,
|
||||
"color": "145 220 232",
|
||||
"image_id": 96844,
|
||||
"offset": [
|
||||
0,
|
||||
0
|
||||
],
|
||||
"path": "aida-users/87/sketch/2aa7aad5-74bb-41fa-9cdf-f06611b3e89a-2-87.png",
|
||||
"print": {
|
||||
"element": {
|
||||
"element_angle_list": [],
|
||||
"element_path_list": [],
|
||||
"element_scale_list": [],
|
||||
"location": []
|
||||
},
|
||||
"overall": {
|
||||
"location": [],
|
||||
"print_angle_list": [],
|
||||
"print_path_list": [],
|
||||
"print_scale_list": []
|
||||
},
|
||||
"single": {
|
||||
"location": [],
|
||||
"print_angle_list": [],
|
||||
"print_path_list": [],
|
||||
"print_scale_list": []
|
||||
}
|
||||
},
|
||||
"priority": 10,
|
||||
"resize_scale": [
|
||||
1.0,
|
||||
1.0
|
||||
],
|
||||
"type": "Dress"
|
||||
},
|
||||
{
|
||||
"body_path": "aida-sys-image/models/female/79805ec3-3f01-466d-91e0-36028d079699.png",
|
||||
"image_id": 95444,
|
||||
"type": "Body"
|
||||
}
|
||||
]
|
||||
}
|
||||
|
||||
],
|
||||
"process_id": "87",
|
||||
"tasks_id": ,
|
||||
}
|
||||
|
||||
|
||||
//用 openai jsonl
|
||||
//
|
||||
0
app/service/design/design_batch/items/__init__.py
Normal file
0
app/service/design/design_batch/items/__init__.py
Normal file
281
app/service/design/design_batch/items/item.py
Normal file
281
app/service/design/design_batch/items/item.py
Normal file
@@ -0,0 +1,281 @@
|
||||
import time
|
||||
from concurrent.futures import ThreadPoolExecutor
|
||||
from pprint import pprint
|
||||
|
||||
import cv2
|
||||
|
||||
from app.core.config import PRIORITY_DICT
|
||||
from app.service.design.design_batch.items.pipeline import LoadImage, KeyPoint, Segmentation, Color, PrintPainting, Scaling, Split, ContourDetection, LoadBodyImage
|
||||
from app.service.design.utils.synthesis_item import synthesis, synthesis_single
|
||||
|
||||
|
||||
class BaseItem:
    """Common item wrapper: normalizes a raw item dict into ``self.result``.

    The 'type' key is lower-cased into 'name' and removed, then the shared
    ``basic`` settings are merged in.
    """

    def __init__(self, data, basic):
        merged = data.copy()
        merged['name'] = merged['type'].lower()
        del merged['type']
        merged.update(basic)
        self.result = merged
|
||||
|
||||
|
||||
class TopItem(BaseItem):
    """Processing pipeline for upper-body garments (blouse/outwear/dress/tops)."""

    def __init__(self, data, basic):
        super().__init__(data, basic)
        stages = (LoadImage, KeyPoint, Segmentation, Color, PrintPainting, Scaling, Split)
        self.top_pipeline = [stage() for stage in stages]

    def process(self):
        """Run each stage in order, threading ``self.result`` through the chain."""
        for stage in self.top_pipeline:
            self.result = stage(self.result)
        return self.result
|
||||
|
||||
|
||||
class BottomItem(BaseItem):
    """Processing pipeline for lower-body garments (skirt/trousers/bottoms).

    Uses ContourDetection instead of Segmentation to build the garment mask.
    """

    def __init__(self, data, basic):
        super().__init__(data, basic)
        stages = (LoadImage, KeyPoint, ContourDetection, Color, PrintPainting, Scaling, Split)
        self.bottom_pipeline = [stage() for stage in stages]

    def process(self):
        """Run each stage in order, threading ``self.result`` through the chain."""
        for stage in self.bottom_pipeline:
            self.result = stage(self.result)
        return self.result
|
||||
|
||||
|
||||
class BodyItem(BaseItem):
    """Pipeline for the mannequin/body layer: only loads the body image."""

    def __init__(self, data, basic):
        super().__init__(data, basic)
        self.top_pipeline = [LoadBodyImage()]

    def process(self):
        """Run the (single-stage) pipeline and return the enriched result dict."""
        for stage in self.top_pipeline:
            self.result = stage(self.result)
        return self.result
|
||||
|
||||
|
||||
def process_item(item, basic):
    """Dispatch one raw item dict to its matching pipeline and return the result.

    "Body" items go through BodyItem, upper-body categories through TopItem,
    everything else through BottomItem.
    """
    item_type = item['type']
    if item_type == "Body":
        server = BodyItem(data=item, basic=basic)
    elif item_type.lower() in ('blouse', 'outwear', 'dress', 'tops'):
        server = TopItem(data=item, basic=basic)
    else:
        server = BottomItem(data=item, basic=basic)
    return server.process()
|
||||
|
||||
|
||||
def calculate_start_point(keypoint_type, scale, clothes_point, body_point, offset, resize_scale):
    """Compute the paste origin aligning the garment's left keypoint to the body.

    Args:
        keypoint_type: string, e.g. "waistband" | "shoulder" | "ear_point".
        scale: float scale applied to the clothes keypoint.
        clothes_point: dict mapping f"{keypoint_type}_left"/"_right" to [x, y, ...].
        body_point: body keypoint dict keyed the same way.
        offset: (x, y) pixel offset added to the body keypoint.
        resize_scale: currently unused; kept for interface compatibility.

    Returns:
        Tuple of ints: body keypoint plus offset minus the scaled clothes
        keypoint, with axes swapped relative to the [x, y] input order.
    """
    key = f'{keypoint_type}_left'
    body_x, body_y = body_point[key][0], body_point[key][1]
    clothes_x, clothes_y = int(clothes_point[key][0]), int(clothes_point[key][1])
    first = int(body_y + offset[1] - clothes_x * scale)   # y-derived component
    second = int(body_x + offset[0] - clothes_y * scale)  # x-derived component
    return (first, second)
|
||||
|
||||
|
||||
# Assemble the per-layer dicts for one garment item.
def organize_clothing(layer):
    """Build (front_layer, back_layer) dicts from a processed garment item.

    Priorities come from the item itself when an explicit layer order is set
    (back piece negated), otherwise from the PRIORITY_DICT defaults.
    """
    # Paste origin aligning the garment keypoint with the matching body keypoint.
    start_point = calculate_start_point(layer['keypoint'], layer['scale'], layer['clothes_keypoint'], layer['body_point_test'], layer["offset"], layer["resize_scale"])
    # Front piece data
    front_layer = dict(priority=layer['priority'] if layer.get("layer_order", False) else PRIORITY_DICT.get(f'{layer["name"].lower()}_front', None),
                       name=f'{layer["name"].lower()}_front',
                       image=layer["front_image"],
                       # mask_image=layer['front_mask_image'],
                       image_url=layer['front_image_url'],
                       mask_url=layer['mask_url'],
                       sacle=layer['scale'],  # NOTE(review): 'sacle' is a typo for 'scale'; consumers read this key as-is — confirm before renaming
                       clothes_keypoint=layer['clothes_keypoint'],
                       position=start_point,
                       resize_scale=layer["resize_scale"],
                       mask=cv2.resize(layer['mask'], layer["front_image"].size),
                       gradient_string=layer['gradient_string'] if 'gradient_string' in layer.keys() else "",
                       pattern_image_url=layer['pattern_image_url'],
                       pattern_image=layer['pattern_image']

                       )
    # Back piece data (note: no pattern_image key, unlike the front piece)
    back_layer = dict(priority=-layer.get("priority", 0) if layer.get("layer_order", False) else PRIORITY_DICT.get(f'{layer["name"].lower()}_back', None),
                      name=f'{layer["name"].lower()}_back',
                      image=layer["back_image"],
                      # mask_image=layer['back_mask_image'],
                      image_url=layer['back_image_url'],
                      mask_url=layer['mask_url'],
                      sacle=layer['scale'],
                      clothes_keypoint=layer['clothes_keypoint'],
                      position=start_point,
                      resize_scale=layer["resize_scale"],
                      mask=cv2.resize(layer['mask'], layer["front_image"].size),  # NOTE(review): resized to the FRONT image size — presumably back matches; confirm
                      gradient_string=layer['gradient_string'] if 'gradient_string' in layer.keys() else "",
                      pattern_image_url=layer['pattern_image_url'],
                      )
    return front_layer, back_layer
|
||||
|
||||
|
||||
# Assemble the layer dict for the mannequin (body) layer.
def organize_body(layer):
    """Return the base body layer dict: priority 0, anchored at the origin."""
    return dict(
        priority=0,
        name=layer["name"].lower(),
        image=layer['body_image'],
        image_url=layer['body_path'],
        mask_image=None,
        mask_url=None,
        sacle=1,  # key spelling kept for compatibility with consumers
        # mask=layer['body_mask'],
        position=(0, 0),
    )
|
||||
|
||||
|
||||
def process_layer(item, layers):
    """Append the layer dict(s) for one processed item to *layers*.

    Returns the body image size for the mannequin item, None otherwise.
    """
    if item['name'] != "mannequin":
        front_layer, back_layer = organize_clothing(item)
        layers.extend((front_layer, back_layer))
        return None
    layers.append(organize_body(item))
    return item['body_image'].size
|
||||
|
||||
|
||||
def process_object(object_data):
    """Process one design object: run every item pipeline, assemble layer
    metadata, and synthesize the composed image.

    Returns a dict with 'layers' (per-layer metadata) and 'synthesis_url'.
    """
    basic = object_data['basic']
    items_response = {'layers': []}

    if basic['single_overall'] == "overall":
        # Overall mode: every item contributes layers to one composed scene.
        item_results = [process_item(item, basic) for item in object_data['items']]
        layers = []
        futures = []
        body_size = None
        for item in item_results:
            # NOTE(review): despite the name, no executor is involved here;
            # `futures` holds a single synchronous result per item.
            futures = [process_layer(item, layers)]
            for future in futures:
                if future is not None:
                    # process_layer returns a value only for the mannequin: its image size.
                    body_size = future
        # Draw order: lowest priority first; layers without one sort last.
        layers = sorted(layers, key=lambda s: s.get("priority", float('inf')))

        layers, new_size = update_base_size_priority(layers, body_size)

        for lay in layers:
            items_response['layers'].append({
                'image_category': lay['name'],
                'position': lay['position'],
                'priority': lay.get("priority", None),
                'resize_scale': lay['resize_scale'] if "resize_scale" in lay.keys() else None,
                'image_size': lay['image'] if lay['image'] is None else lay['image'].size,
                'gradient_string': lay['gradient_string'] if 'gradient_string' in lay.keys() else "",
                'mask_url': lay['mask_url'],
                'image_url': lay['image_url'] if 'image_url' in lay.keys() else None,
                'pattern_image_url': lay['pattern_image_url'] if 'pattern_image_url' in lay.keys() else None,

                # 'image': lay['image'],
                # 'mask_image': lay['mask_image'],
            })
        items_response['synthesis_url'] = synthesis(layers, new_size, basic)
    else:
        # Single mode: only the first item is processed, producing front/back entries.
        item_results = process_item(object_data['items'][0], basic)
        items_response['layers'].append({
            'image_category': f"{item_results['name']}_front",
            # NOTE(review): the *_front entry reports back_image's size (and the
            # *_back entry reports front_image's size) — looks swapped; confirm.
            'image_size': item_results['back_image'].size if item_results['back_image'] else None,
            'position': None,
            'priority': 0,
            'image_url': item_results['front_image_url'],
            'mask_url': item_results['mask_url'],
            "gradient_string": item_results['gradient_string'] if 'gradient_string' in item_results.keys() else "",
            'pattern_image_url': item_results['pattern_image_url'] if 'pattern_image_url' in item_results.keys() else None,

        })
        items_response['layers'].append({
            'image_category': f"{item_results['name']}_back",
            'image_size': item_results['front_image'].size if item_results['front_image'] else None,
            'position': None,
            'priority': 0,
            'image_url': item_results['back_image_url'],
            'mask_url': item_results['mask_url'],
            "gradient_string": item_results['gradient_string'] if 'gradient_string' in item_results.keys() else "",
            'pattern_image_url': item_results['pattern_image_url'] if 'pattern_image_url' in item_results.keys() else None,

        })
        items_response['synthesis_url'] = synthesis_single(item_results['front_image'], item_results['back_image'])
    return items_response
|
||||
|
||||
|
||||
def update_base_size_priority(layers, size):
    """Re-anchor layers so the leftmost x becomes 0 and compute the canvas size.

    Args:
        layers: layer dicts with 'position' and an 'image' (or None).
        size: unused; kept for interface compatibility.

    Returns:
        (layers, (new_width, new_height)): width spans from the leftmost
        position to the rightmost image edge; height is fixed at 700.
    """
    # Width of the transparent canvas.
    leftmost = min(layer['position'][1] for layer in layers)
    right_edges = [
        layer['position'][1] + layer['image'].width
        for layer in layers
        if layer['image'] is not None
    ]
    canvas_width = max(right_edges) - leftmost
    canvas_height = 700
    # Shift every layer relative to the new left edge.
    for layer in layers:
        layer['adaptive_position'] = (layer['position'][0], layer['position'][1] - leftmost)
    return layers, (canvas_width, canvas_height)
|
||||
|
||||
|
||||
def run():
    """Smoke-test entry: process a canned batch of outwear objects in parallel.

    Builds the same request payload the original inline literal contained
    (8 single-mode Outwear objects over identical body keypoints), runs
    ``process_object`` over a thread pool, and pretty-prints the results.
    """
    import copy  # local: only this demo entry point needs it

    # Shared per-object settings; deep-copied so each object owns its dicts,
    # matching the original's 8 independent literals.
    basic_template = {
        "body_point_test": {"waistband_right": [199, 239], "hand_point_right": [220, 308],
                            "waistband_left": [113, 239], "hand_point_left": [92, 310],
                            "shoulder_left": [99, 111], "shoulder_right": [214, 111]},
        "layer_order": False, "scale_bag": 0.7, "scale_earrings": 0.16,
        "self_template": True, "single_overall": "single", "switch_category": "Outwear",
    }

    def _make_object(image_id, filename):
        # One single-mode object carrying one Outwear item.
        return {
            "basic": copy.deepcopy(basic_template),
            "items": [{
                "color": "189 112 112", "icon": "none", "image_id": image_id,
                "offset": [1, 1],
                "path": f"aida-sys-image/images/female/outwear/{filename}",
                "print": {
                    "element": {"element_angle_list": [], "element_path_list": [], "element_scale_list": [], "location": []},
                    "overall": {"location": [[0.0, 0.0]], "print_angle_list": [0.0, 0.0], "print_path_list": [], "print_scale_list": [0.0, 0.0]},
                    "single": {"location": [], "print_angle_list": [], "print_path_list": [], "print_scale_list": []},
                },
                "resize_scale": [1.0, 1.0], "type": "Outwear",
            }],
        }

    samples = [
        (116441, "outwear_p3139.jpg"),
        (81518, "0628000071.jpg"),
        (65687, "outwear_746.jpg"),
        (90051, "0628000864.jpg"),
        (67420, "0825001648.jpg"),
        (90354, "0628001300.jpg"),
        (67420, "0825001648.jpg"),
        (101477, "903000063.jpg"),
    ]
    # Renamed from `object` — the original shadowed the builtin.
    payload = {
        "objects": [_make_object(image_id, name) for image_id, name in samples],
        "process_id": "3615898424593104",
    }

    object_result = {}
    with ThreadPoolExecutor() as executor:
        results = list(executor.map(process_object, payload['objects']))
    for i, result in enumerate(results):
        object_result[i] = result

    pprint(object_result)
|
||||
|
||||
|
||||
if __name__ == '__main__':
    # Time the whole smoke-test run and report elapsed seconds.
    started = time.time()
    run()
    print(time.time() - started)
|
||||
20
app/service/design/design_batch/items/pipeline/__init__.py
Normal file
20
app/service/design/design_batch/items/pipeline/__init__.py
Normal file
@@ -0,0 +1,20 @@
|
||||
"""Public pipeline-stage exports for the design_batch item pipelines."""
from .color import Color
from .contour_detection import ContourDetection
from .keypoint import KeyPoint  # fixed: was imported twice
from .loading import LoadImage, LoadBodyImage
from .print_painting import PrintPainting
from .scale import Scaling
from .segmentation import Segmentation
from .split import Split

__all__ = [
    'LoadBodyImage', 'LoadImage',
    'KeyPoint',
    'ContourDetection',
    'Segmentation',
    'Color',
    'PrintPainting',
    'Scaling',
    'Split'
]
|
||||
60
app/service/design/design_batch/items/pipeline/color.py
Normal file
60
app/service/design/design_batch/items/pipeline/color.py
Normal file
@@ -0,0 +1,60 @@
|
||||
import logging
|
||||
|
||||
import cv2
|
||||
import numpy as np
|
||||
|
||||
from app.service.utils.oss_client import oss_get_image
|
||||
|
||||
logger = logging.getLogger()
|
||||
|
||||
|
||||
class Color:
    """Pipeline stage that fills the garment mask with a flat color or gradient."""

    def __call__(self, result):
        """Colorize the garment.

        Reads ``image``, ``mask``, ``gray``, ``color`` and optional
        ``gradient`` from *result*; writes ``pattern_image``, ``final_image``,
        ``single_image`` and ``alpha``.
        """
        dim_image_h, dim_image_w = result['image'].shape[0:2]
        if "gradient" in result.keys() and result['gradient'] != "":
            # Gradient fill: "bucket/object" path of a gradient texture in OSS.
            bucket_name = result['gradient'].split('/')[0]
            object_name = result['gradient'][result['gradient'].find('/') + 1:]
            pattern = self.get_gradient(bucket_name=bucket_name, object_name=object_name)
            resize_pattern = cv2.resize(pattern, (dim_image_w, dim_image_h), interpolation=cv2.INTER_AREA)
        else:
            # Flat fill from an "R G B" color string.
            pattern = self.get_pattern(result['color'])
            resize_pattern = cv2.resize(pattern, (dim_image_w, dim_image_h), interpolation=cv2.INTER_AREA)
        # Multiply pattern by mask and grayscale shading to keep sketch detail.
        closed_mo = np.expand_dims(result['mask'], axis=2).repeat(3, axis=2)
        gray_mo = np.expand_dims(result['gray'], axis=2).repeat(3, axis=2)
        get_image_fir = resize_pattern * (closed_mo / 255) * (gray_mo / 255)
        result['pattern_image'] = get_image_fir.astype(np.uint8)
        result['final_image'] = result['pattern_image']
        # Compose onto a white canvas: white outside the mask, colored inside.
        canvas = np.full_like(result['final_image'], 255)
        temp_bg = np.expand_dims(cv2.bitwise_not(result['mask']), axis=2).repeat(3, axis=2)
        tmp1 = (canvas * (temp_bg / 255)).astype(np.uint8)
        temp_fg = np.expand_dims(result['mask'], axis=2).repeat(3, axis=2)
        tmp2 = (result['final_image'] * (temp_fg / 255)).astype(np.uint8)
        result['single_image'] = cv2.add(tmp1, tmp2)
        result['alpha'] = 100 / 255.0
        return result

    @staticmethod
    def get_gradient(bucket_name, object_name):
        """Fetch the gradient texture from OSS, dropping any alpha channel."""
        image = oss_get_image(bucket=bucket_name, object_name=object_name, data_type="cv2")
        if image.shape[2] == 4:
            image = cv2.cvtColor(image, cv2.COLOR_BGRA2BGR)
        return image

    @staticmethod
    def crop_image(image, image_size_h, image_size_w):
        """Randomly crop an (image_size_h, image_size_w) window from *image*."""
        x_offset = np.random.randint(low=0, high=int(image_size_h / 5) - 6)
        y_offset = np.random.randint(low=0, high=int(image_size_w / 5) - 6)
        image = image[x_offset: x_offset + image_size_h, y_offset: y_offset + image_size_w, :]
        return image

    @staticmethod
    def get_pattern(single_color):
        """Build a 1x1 BGR patch from an "R G B" string.

        Raises:
            ValueError: if *single_color* is None.  (Fixed: the original
            ``raise False`` was itself a bug — raising a non-exception
            object raises ``TypeError`` instead of anything meaningful.)
        """
        if single_color is None:
            raise ValueError("single_color must be an 'R G B' string, got None")
        R, G, B = single_color.split(' ')
        pattern = np.zeros([1, 1, 3], np.uint8)
        pattern[0, 0, 0] = int(B)
        pattern[0, 0, 1] = int(G)
        pattern[0, 0, 2] = int(R)
        return pattern
|
||||
@@ -0,0 +1,37 @@
|
||||
import cv2
|
||||
import numpy as np
|
||||
|
||||
|
||||
class ContourDetection:
    """Pipeline stage that builds a garment mask from the largest Canny contour."""

    def __call__(self, result):
        # Largest-first external contours of the garment image.
        Contour = self.get_contours(result['image'])
        Mask = np.zeros(result['image'].shape[:2], np.uint8)
        if len(Contour):
            Max_contour = Contour[0]
            # Simplify the largest contour slightly (0.1% of perimeter) and fill it.
            Epsilon = 0.001 * cv2.arcLength(Max_contour, True)
            Approx = cv2.approxPolyDP(Max_contour, Epsilon, True)
            cv2.drawContours(Mask, [Approx], -1, 255, -1)
        else:
            # No contour found: fall back to a full-frame mask.
            Mask = np.ones(result['image'].shape[:2], np.uint8) * 255
        # TODO: fix some images coming out transparent; planned for the next release.
        # img2gray = cv2.cvtColor(result['image'], cv2.COLOR_BGR2GRAY)
        # ret, Mask = cv2.threshold(img2gray, 126, 255, cv2.THRESH_BINARY)
        # Mask = cv2.bitwise_not(Mask)
        if result['pre_mask'] is None:
            result['mask'] = Mask
        else:
            # Intersect with the alpha-channel mask extracted at load time.
            result['mask'] = cv2.bitwise_and(Mask, result['pre_mask'])
        result['front_mask'] = result['mask']
        result['back_mask'] = result['mask']
        return result

    @staticmethod
    def get_contours(image):
        """Return external contours of *image*, sorted by area, largest first."""
        gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
        Edge = cv2.Canny(gray, 10, 150)
        # Dilate-then-erode closes small gaps in the edge map before extraction.
        kernel = np.ones((5, 5), np.uint8)
        Edge = cv2.dilate(Edge, kernel=kernel, iterations=1)
        Edge = cv2.erode(Edge, kernel=kernel, iterations=1)
        Contour, _ = cv2.findContours(Edge, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
        Contour = sorted(Contour, key=cv2.contourArea, reverse=True)
        return Contour
|
||||
114
app/service/design/design_batch/items/pipeline/keypoint.py
Normal file
114
app/service/design/design_batch/items/pipeline/keypoint.py
Normal file
@@ -0,0 +1,114 @@
|
||||
import logging
|
||||
|
||||
import numpy as np
|
||||
from pymilvus import MilvusClient
|
||||
|
||||
from app.core.config import *
|
||||
from app.service.design.utils.design_ensemble import get_keypoint_result
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class KeyPoint:
    """Pipeline stage resolving garment keypoints, with a Milvus-backed cache.

    Cached vectors are 24 ints (12 (x, y) points): the first 20 values belong
    to the 'up' site and the last 4 to 'down' — inferred from the zero-padding
    in save_keypoint_cache; confirm against the Milvus schema.
    """

    name = "KeyPoint"

    @classmethod
    def get_name(cls):
        return cls.name

    def __call__(self, result):
        # Only garment categories carry keypoints. Check the cache: read it when
        # it exists for the same site, otherwise infer and persist the result.
        if result['name'] in ['blouse', 'skirt', 'dress', 'outwear', 'trousers', 'tops', 'bottoms']:
            # result['clothes_keypoint'] = self.infer_keypoint_result(result)
            site = 'up' if result['name'] in ['blouse', 'outwear', 'dress', 'tops'] else 'down'
            # keypoint_cache = search_keypoint_cache(result["image_id"], site)
            keypoint_cache = self.keypoint_cache(result, site)
            # (disabled) Skip vector lookup and always run model inference:
            # keypoint_cache = False
            if keypoint_cache is False:
                keypoint_infer_result, site = self.infer_keypoint_result(result)
                result['clothes_keypoint'] = self.save_keypoint_cache(result["image_id"], keypoint_infer_result, site)
            else:
                result['clothes_keypoint'] = keypoint_cache
        return result

    @staticmethod
    def infer_keypoint_result(result):
        """Run the keypoint model on the garment image; returns (points, site)."""
        site = 'up' if result['name'] in ['blouse', 'outwear', 'dress', 'tops'] else 'down'
        keypoint_infer_result = get_keypoint_result(result["image"], site)  # model inference result
        return keypoint_infer_result, site

    @staticmethod
    def save_keypoint_cache(keypoint_id, cache, site):
        """Zero-pad the inferred points to the 24-value vector, upsert to Milvus,
        and return the points as a field-name -> (x, y) dict."""
        if site == "down":
            zeros = np.zeros(20, dtype=int)
            result = np.concatenate([zeros, cache.flatten()])
        else:
            zeros = np.zeros(4, dtype=int)
            result = np.concatenate([cache.flatten(), zeros])
        # Persist the vector; the inferred points are returned either way.
        data = [
            {"keypoint_id": keypoint_id,
             "keypoint_site": site,
             "keypoint_vector": result.tolist()
             }
        ]
        try:
            client = MilvusClient(uri=MILVUS_URL, token=MILVUS_TOKEN, db_name=MILVUS_ALIAS)
            res = client.upsert(collection_name=MILVUS_TABLE_KEYPOINT, data=data)
            client.close()
            return dict(zip(KEYPOINT_RESULT_TABLE_FIELD_SET, result.reshape(12, 2).astype(int).tolist()))
        except Exception as e:
            # Cache writes are best-effort: log and still return the inferred points.
            logger.info(f"save keypoint cache milvus error : {e}")
            return dict(zip(KEYPOINT_RESULT_TABLE_FIELD_SET, result.reshape(12, 2).astype(int).tolist()))

    @staticmethod
    def update_keypoint_cache(keypoint_id, infer_result, search_result, site):
        """Merge freshly inferred points with the cached other-site vector and
        upsert the combined entry with site 'all'."""
        if site == "up":
            # We need "up": the inference supplies "up", the cache supplies "down".
            result = np.concatenate([infer_result.flatten(), search_result[-4:]])
        else:
            # We need "down": the inference supplies "down", the cache supplies "up".
            result = np.concatenate([search_result[:20], infer_result.flatten()])
        data = [
            {"keypoint_id": keypoint_id,
             "keypoint_site": "all",
             "keypoint_vector": result.tolist()
             }
        ]

        try:
            client = MilvusClient(uri=MILVUS_URL, token=MILVUS_TOKEN, db_name=MILVUS_ALIAS)
            client.upsert(
                collection_name=MILVUS_TABLE_KEYPOINT,
                data=data
            )
            return dict(zip(KEYPOINT_RESULT_TABLE_FIELD_SET, result.reshape(12, 2).astype(int).tolist()))
        except Exception as e:
            # Best-effort update: log and still return the merged points.
            logger.info(f"save keypoint cache milvus error : {e}")
            return dict(zip(KEYPOINT_RESULT_TABLE_FIELD_SET, result.reshape(12, 2).astype(int).tolist()))

    # @ RunTime
    def keypoint_cache(self, result, site):
        """Look up cached keypoints for this image_id.

        Returns a field-name -> (x, y) dict, or False when the Milvus query fails.
        """
        try:
            client = MilvusClient(uri=MILVUS_URL, token=MILVUS_TOKEN, db_name=MILVUS_ALIAS)
            keypoint_id = result['image_id']
            res = client.query(
                collection_name=MILVUS_TABLE_KEYPOINT,
                # ids=[keypoint_id],
                filter=f"keypoint_id == {keypoint_id}",
                output_fields=['keypoint_vector', 'keypoint_site']
            )
            if len(res) == 0:
                # No cached entry: infer now and persist the result.
                keypoint_infer_result, site = self.infer_keypoint_result(result)
                return self.save_keypoint_cache(result['image_id'], keypoint_infer_result, site)
            elif res[0]["keypoint_site"] == "all" or res[0]["keypoint_site"] == site:
                # Needed site matches the cached one (or cache covers both): return it.
                return dict(zip(KEYPOINT_RESULT_TABLE_FIELD_SET, np.array(res[0]['keypoint_vector']).astype(int).reshape(12, 2).tolist()))
            elif res[0]["keypoint_site"] != site:
                # Needed site differs from the cached one: infer the missing site
                # and upgrade the cache entry to "all".
                keypoint_infer_result, site = self.infer_keypoint_result(result)
                return self.update_keypoint_cache(result["image_id"], keypoint_infer_result, res[0]['keypoint_vector'], site)
        except Exception as e:
            logger.info(f"search keypoint cache milvus error {e}")
            return False
|
||||
68
app/service/design/design_batch/items/pipeline/loading.py
Normal file
68
app/service/design/design_batch/items/pipeline/loading.py
Normal file
@@ -0,0 +1,68 @@
|
||||
import cv2
|
||||
|
||||
from app.service.utils.oss_client import oss_get_image
|
||||
|
||||
|
||||
class LoadBodyImage:
    """Pipeline stage that loads the mannequin (body) image from OSS."""

    name = "LoadBodyImage"

    @classmethod
    def get_name(cls):
        return cls.name

    def __call__(self, result):
        # Body items are always named "mannequin" downstream.
        result["name"] = "mannequin"
        # body_path is "bucket/object"; split once into bucket and object name.
        result['body_image'] = oss_get_image(bucket=result['body_path'].split("/", 1)[0], object_name=result['body_path'].split("/", 1)[1], data_type="PIL")
        return result
|
||||
|
||||
|
||||
class LoadImage:
    """Pipeline stage that loads a garment image from OSS and derives basics
    (grayscale copy, alpha mask, keypoint category, shape fields)."""

    name = "LoadImage"

    # Garment category -> keypoint prefix used for body alignment.
    _KEYPOINT_BY_NAME = {
        'blouse': 'shoulder', 'outwear': 'shoulder', 'dress': 'shoulder', 'tops': 'shoulder',
        'trousers': 'waistband', 'skirt': 'waistband', 'bottoms': 'waistband',
        'bag': 'hand_point',
        'shoes': 'toe',
        'hairstyle': 'head_point',
        'earring': 'ear_point',
    }

    @classmethod
    def get_name(cls):
        return cls.name

    def __call__(self, result):
        """Populate image, pre_mask, gray, keypoint and shape fields on *result*."""
        result['image'], result['pre_mask'] = self.read_image(result['path'])
        result['gray'] = cv2.cvtColor(result['image'], cv2.COLOR_BGR2GRAY)
        result['keypoint'] = self.get_keypoint(result['name'])
        result['img_shape'] = result['image'].shape
        result['ori_shape'] = result['image'].shape
        return result

    @staticmethod
    def read_image(image_path):
        """Fetch a "bucket/object" path from OSS.

        Returns:
            (image, image_mask): 3-channel BGR image and the alpha channel as a
            mask when the source had 4 channels, otherwise None.
        """
        image_mask = None
        image = oss_get_image(bucket=image_path.split("/", 1)[0], object_name=image_path.split("/", 1)[1], data_type="cv2")
        if len(image.shape) == 2:
            image = cv2.cvtColor(image, cv2.COLOR_GRAY2RGB)
        if image.shape[2] == 4:  # 4-channel: split out the alpha channel as the mask
            image_mask = image[:, :, 3]
            image = image[:, :, :3]

        # Upscale tiny images 2x. Fixed: the original compared tuples
        # (`image.shape[:2] <= (50, 50)`), which is lexicographic and wrongly
        # matched e.g. a 49x900 image; compare each dimension instead.
        if image.shape[0] <= 50 and image.shape[1] <= 50:
            new_size = (image.shape[1] * 2, image.shape[0] * 2)
            image = cv2.resize(image, new_size, interpolation=cv2.INTER_LINEAR)
        return image, image_mask

    @staticmethod
    def get_keypoint(name):
        """Map a garment category to its alignment keypoint prefix.

        Raises:
            KeyError: if *name* is not a known category.
        """
        try:
            return LoadImage._KEYPOINT_BY_NAME[name]
        except KeyError:
            raise KeyError(f"{name} does not belong to item category list: blouse, outwear, dress, trousers, skirt, "
                           f"bag, shoes, hairstyle, earring.") from None
|
||||
523
app/service/design/design_batch/items/pipeline/print_painting.py
Normal file
523
app/service/design/design_batch/items/pipeline/print_painting.py
Normal file
@@ -0,0 +1,523 @@
|
||||
import random
|
||||
|
||||
import cv2
|
||||
import numpy as np
|
||||
from PIL import Image
|
||||
|
||||
from app.service.utils.oss_client import oss_get_image
|
||||
|
||||
|
||||
class PrintPainting:
|
||||
def __call__(self, result):
|
||||
single_print = result['print']['single']
|
||||
overall_print = result['print']['overall']
|
||||
element_print = result['print']['element']
|
||||
result['single_image'] = None
|
||||
result['print_image'] = None
|
||||
if overall_print['print_path_list']:
|
||||
painting_dict = {'dim_image_h': result['pattern_image'].shape[0], 'dim_image_w': result['pattern_image'].shape[1]}
|
||||
result['print_image'] = result['pattern_image']
|
||||
if "print_angle_list" in overall_print.keys() and overall_print['print_angle_list'][0] != 0:
|
||||
painting_dict = self.painting_collection(painting_dict, overall_print, print_trigger=True)
|
||||
painting_dict['tile_print'] = self.rotate_crop_image(img=painting_dict['tile_print'], angle=-overall_print['print_angle_list'][0], crop=True)
|
||||
painting_dict['mask_inv_print'] = self.rotate_crop_image(img=painting_dict['mask_inv_print'], angle=-overall_print['print_angle_list'][0], crop=True)
|
||||
|
||||
# resize 到sketch大小
|
||||
painting_dict['tile_print'] = self.resize_and_crop(img=painting_dict['tile_print'], target_width=painting_dict['dim_image_w'], target_height=painting_dict['dim_image_h'])
|
||||
painting_dict['mask_inv_print'] = self.resize_and_crop(img=painting_dict['mask_inv_print'], target_width=painting_dict['dim_image_w'], target_height=painting_dict['dim_image_h'])
|
||||
else:
|
||||
painting_dict = self.painting_collection(painting_dict, overall_print, print_trigger=True, is_single=False)
|
||||
result['print_image'] = self.printpaint(result, painting_dict, print_=True)
|
||||
result['single_image'] = result['final_image'] = result['pattern_image'] = result['print_image']
|
||||
|
||||
if single_print['print_path_list']:
|
||||
print_background = np.zeros((result['pattern_image'].shape[0], result['pattern_image'].shape[1], 3), dtype=np.uint8)
|
||||
mask_background = np.zeros((result['pattern_image'].shape[0], result['pattern_image'].shape[1], 3), dtype=np.uint8)
|
||||
for i in range(len(single_print['print_path_list'])):
|
||||
image, image_mode = self.read_image(single_print['print_path_list'][i])
|
||||
if image_mode == "RGBA":
|
||||
new_size = (int(image.width * single_print['print_scale_list'][i]), int(image.height * single_print['print_scale_list'][i]))
|
||||
|
||||
mask = image.split()[3]
|
||||
resized_source = image.resize(new_size)
|
||||
resized_source_mask = mask.resize(new_size)
|
||||
|
||||
rotated_resized_source = resized_source.rotate(-single_print['print_angle_list'][i])
|
||||
rotated_resized_source_mask = resized_source_mask.rotate(-single_print['print_angle_list'][i])
|
||||
|
||||
source_image_pil = Image.fromarray(cv2.cvtColor(print_background, cv2.COLOR_BGR2RGB))
|
||||
source_image_pil_mask = Image.fromarray(cv2.cvtColor(mask_background, cv2.COLOR_BGR2RGB))
|
||||
|
||||
source_image_pil.paste(rotated_resized_source, (int(single_print['location'][i][0]), int(single_print['location'][i][1])), rotated_resized_source)
|
||||
source_image_pil_mask.paste(rotated_resized_source_mask, (int(single_print['location'][i][0]), int(single_print['location'][i][1])), rotated_resized_source_mask)
|
||||
|
||||
print_background = cv2.cvtColor(np.array(source_image_pil), cv2.COLOR_RGBA2BGR)
|
||||
mask_background = cv2.cvtColor(np.array(source_image_pil_mask), cv2.COLOR_RGBA2BGR)
|
||||
ret, mask_background = cv2.threshold(mask_background, 124, 255, cv2.THRESH_BINARY)
|
||||
else:
|
||||
mask = self.get_mask_inv(image)
|
||||
mask = np.expand_dims(mask, axis=2)
|
||||
mask = cv2.cvtColor(mask, cv2.COLOR_GRAY2BGR)
|
||||
mask = cv2.bitwise_not(mask)
|
||||
# 旋转后的坐标需要重新算
|
||||
rotate_mask, _ = self.img_rotate(mask, single_print['print_angle_list'][i], single_print['print_scale_list'][i])
|
||||
rotate_image, rotated_new_size = self.img_rotate(image, single_print['print_angle_list'][i], single_print['print_scale_list'][i])
|
||||
# x, y = int(result['print']['location'][i][0] - rotated_new_size[0] - (rotate_mask.shape[0] - image.shape[0]) / 2), int(result['print']['location'][i][1] - rotated_new_size[1] - (rotate_mask.shape[1] - image.shape[1]) / 2)
|
||||
x, y = int(single_print['location'][i][0] - rotated_new_size[0]), int(single_print['location'][i][1] - rotated_new_size[1])
|
||||
|
||||
image_x = print_background.shape[1]
|
||||
image_y = print_background.shape[0]
|
||||
print_x = rotate_image.shape[1]
|
||||
print_y = rotate_image.shape[0]
|
||||
|
||||
# 有bug
|
||||
# if x + print_x > image_x:
|
||||
# rotate_image = rotate_image[:, :x + print_x - image_x]
|
||||
# rotate_mask = rotate_mask[:, :x + print_x - image_x]
|
||||
# #
|
||||
# if y + print_y > image_y:
|
||||
# rotate_image = rotate_image[:y + print_y - image_y]
|
||||
# rotate_mask = rotate_mask[:y + print_y - image_y]
|
||||
|
||||
# 不能是并行
|
||||
# 当前第一轮的if (108以及115)是判断有没有过下界和右界。第二轮的是判断左上有没有超出。 如果这个样子的话,先裁了右边,再左移,region就会有问题
|
||||
# 先挪 再判断 最后裁剪
|
||||
|
||||
# 如果print旋转了 或者 print贴边了 则需要判断 判断左界和上界是否小于0
|
||||
if x <= 0:
|
||||
rotate_image = rotate_image[:, -x:]
|
||||
rotate_mask = rotate_mask[:, -x:]
|
||||
start_x = x = 0
|
||||
else:
|
||||
start_x = x
|
||||
|
||||
if y <= 0:
|
||||
rotate_image = rotate_image[-y:, :]
|
||||
rotate_mask = rotate_mask[-y:, :]
|
||||
start_y = y = 0
|
||||
else:
|
||||
start_y = y
|
||||
|
||||
# ------------------
|
||||
# 如果print-size大于image-size 则需要裁剪print
|
||||
|
||||
if x + print_x > image_x:
|
||||
rotate_image = rotate_image[:, :image_x - x]
|
||||
rotate_mask = rotate_mask[:, :image_x - x]
|
||||
|
||||
if y + print_y > image_y:
|
||||
rotate_image = rotate_image[:image_y - y, :]
|
||||
rotate_mask = rotate_mask[:image_y - y, :]
|
||||
|
||||
# mask_background[start_y:y + rotate_mask.shape[0], start_x:x + rotate_mask.shape[1]] = cv2.bitwise_xor(mask_background[start_y:y + rotate_mask.shape[0], start_x:x + rotate_mask.shape[1]], rotate_mask)
|
||||
# print_background[start_y:y + rotate_image.shape[0], start_x:x + rotate_image.shape[1]] = cv2.add(print_background[start_y:y + rotate_image.shape[0], start_x:x + rotate_image.shape[1]], rotate_image)
|
||||
|
||||
# mask_background[start_y:y + rotate_mask.shape[0], start_x:x + rotate_mask.shape[1]] = rotate_mask
|
||||
# print_background[start_y:y + rotate_image.shape[0], start_x:x + rotate_image.shape[1]] = rotate_image
|
||||
mask_background = self.stack_prin(mask_background, result['pattern_image'], rotate_mask, start_y, y, start_x, x)
|
||||
print_background = self.stack_prin(print_background, result['pattern_image'], rotate_image, start_y, y, start_x, x)
|
||||
|
||||
# gray_image = cv2.cvtColor(mask_background, cv2.COLOR_BGR2GRAY)
|
||||
# print_background = cv2.bitwise_and(print_background, print_background, mask=gray_image)
|
||||
|
||||
print_mask = cv2.bitwise_and(result['mask'], cv2.cvtColor(mask_background, cv2.COLOR_BGR2GRAY))
|
||||
img_fg = cv2.bitwise_or(print_background, print_background, mask=print_mask)
|
||||
img_bg = cv2.bitwise_and(result['pattern_image'], result['pattern_image'], mask=cv2.bitwise_not(print_mask))
|
||||
mask_mo = np.expand_dims(print_mask, axis=2).repeat(3, axis=2)
|
||||
gray_mo = np.expand_dims(result['gray'], axis=2).repeat(3, axis=2)
|
||||
img_fg = (img_fg * (mask_mo / 255) * (gray_mo / 255)).astype(np.uint8)
|
||||
result['final_image'] = cv2.add(img_bg, img_fg)
|
||||
canvas = np.full_like(result['final_image'], 255)
|
||||
temp_bg = np.expand_dims(cv2.bitwise_not(result['mask']), axis=2).repeat(3, axis=2)
|
||||
tmp1 = (canvas * (temp_bg / 255)).astype(np.uint8)
|
||||
temp_fg = np.expand_dims(result['mask'], axis=2).repeat(3, axis=2)
|
||||
tmp2 = (result['final_image'] * (temp_fg / 255)).astype(np.uint8)
|
||||
result['single_image'] = cv2.add(tmp1, tmp2)
|
||||
|
||||
if element_print['element_path_list']:
|
||||
print_background = np.zeros((result['final_image'].shape[0], result['final_image'].shape[1], 3), dtype=np.uint8)
|
||||
mask_background = np.zeros((result['final_image'].shape[0], result['final_image'].shape[1], 3), dtype=np.uint8)
|
||||
for i in range(len(element_print['element_path_list'])):
|
||||
image, image_mode = self.read_image(element_print['element_path_list'][i])
|
||||
if image_mode == "RGBA":
|
||||
new_size = (int(image.width * element_print['element_scale_list'][i]), int(image.height * element_print['element_scale_list'][i]))
|
||||
|
||||
mask = image.split()[3]
|
||||
resized_source = image.resize(new_size)
|
||||
resized_source_mask = mask.resize(new_size)
|
||||
|
||||
rotated_resized_source = resized_source.rotate(-element_print['element_angle_list'][i])
|
||||
rotated_resized_source_mask = resized_source_mask.rotate(-element_print['element_angle_list'][i])
|
||||
|
||||
source_image_pil = Image.fromarray(cv2.cvtColor(print_background, cv2.COLOR_BGR2RGB))
|
||||
source_image_pil_mask = Image.fromarray(cv2.cvtColor(mask_background, cv2.COLOR_BGR2RGB))
|
||||
|
||||
source_image_pil.paste(rotated_resized_source, (int(element_print['location'][i][0]), int(element_print['location'][i][1])), rotated_resized_source)
|
||||
source_image_pil_mask.paste(rotated_resized_source_mask, (int(element_print['location'][i][0]), int(element_print['location'][i][1])), rotated_resized_source_mask)
|
||||
|
||||
print_background = cv2.cvtColor(np.array(source_image_pil), cv2.COLOR_RGBA2BGR)
|
||||
mask_background = cv2.cvtColor(np.array(source_image_pil_mask), cv2.COLOR_RGBA2BGR)
|
||||
else:
|
||||
mask = self.get_mask_inv(image)
|
||||
mask = np.expand_dims(mask, axis=2)
|
||||
mask = cv2.cvtColor(mask, cv2.COLOR_GRAY2BGR)
|
||||
mask = cv2.bitwise_not(mask)
|
||||
# 旋转后的坐标需要重新算
|
||||
rotate_mask, _ = self.img_rotate(mask, element_print['element_angle_list'][i], element_print['element_scale_list'][i])
|
||||
rotate_image, rotated_new_size = self.img_rotate(image, element_print['element_angle_list'][i], element_print['element_scale_list'][i])
|
||||
# x, y = int(result['print']['location'][i][0] - rotated_new_size[0] - (rotate_mask.shape[0] - image.shape[0]) / 2), int(result['print']['location'][i][1] - rotated_new_size[1] - (rotate_mask.shape[1] - image.shape[1]) / 2)
|
||||
x, y = int(element_print['location'][i][0] - rotated_new_size[0]), int(element_print['location'][i][1] - rotated_new_size[1])
|
||||
|
||||
image_x = print_background.shape[1]
|
||||
image_y = print_background.shape[0]
|
||||
print_x = rotate_image.shape[1]
|
||||
print_y = rotate_image.shape[0]
|
||||
|
||||
# 有bug
|
||||
# if x + print_x > image_x:
|
||||
# rotate_image = rotate_image[:, :x + print_x - image_x]
|
||||
# rotate_mask = rotate_mask[:, :x + print_x - image_x]
|
||||
# #
|
||||
# if y + print_y > image_y:
|
||||
# rotate_image = rotate_image[:y + print_y - image_y]
|
||||
# rotate_mask = rotate_mask[:y + print_y - image_y]
|
||||
|
||||
# 不能是并行
|
||||
# 当前第一轮的if (108以及115)是判断有没有过下界和右界。第二轮的是判断左上有没有超出。 如果这个样子的话,先裁了右边,再左移,region就会有问题
|
||||
# 先挪 再判断 最后裁剪
|
||||
|
||||
# 如果print旋转了 或者 print贴边了 则需要判断 判断左界和上界是否小于0
|
||||
if x <= 0:
|
||||
rotate_image = rotate_image[:, -x:]
|
||||
rotate_mask = rotate_mask[:, -x:]
|
||||
start_x = x = 0
|
||||
else:
|
||||
start_x = x
|
||||
|
||||
if y <= 0:
|
||||
rotate_image = rotate_image[-y:, :]
|
||||
rotate_mask = rotate_mask[-y:, :]
|
||||
start_y = y = 0
|
||||
else:
|
||||
start_y = y
|
||||
|
||||
# ------------------
|
||||
# 如果print-size大于image-size 则需要裁剪print
|
||||
|
||||
if x + print_x > image_x:
|
||||
rotate_image = rotate_image[:, :image_x - x]
|
||||
rotate_mask = rotate_mask[:, :image_x - x]
|
||||
|
||||
if y + print_y > image_y:
|
||||
rotate_image = rotate_image[:image_y - y, :]
|
||||
rotate_mask = rotate_mask[:image_y - y, :]
|
||||
|
||||
# mask_background[start_y:y + rotate_mask.shape[0], start_x:x + rotate_mask.shape[1]] = cv2.bitwise_xor(mask_background[start_y:y + rotate_mask.shape[0], start_x:x + rotate_mask.shape[1]], rotate_mask)
|
||||
# print_background[start_y:y + rotate_image.shape[0], start_x:x + rotate_image.shape[1]] = cv2.add(print_background[start_y:y + rotate_image.shape[0], start_x:x + rotate_image.shape[1]], rotate_image)
|
||||
|
||||
# mask_background[start_y:y + rotate_mask.shape[0], start_x:x + rotate_mask.shape[1]] = rotate_mask
|
||||
# print_background[start_y:y + rotate_image.shape[0], start_x:x + rotate_image.shape[1]] = rotate_image
|
||||
mask_background = self.stack_prin(mask_background, result['pattern_image'], rotate_mask, start_y, y, start_x, x)
|
||||
print_background = self.stack_prin(print_background, result['pattern_image'], rotate_image, start_y, y, start_x, x)
|
||||
|
||||
# gray_image = cv2.cvtColor(mask_background, cv2.COLOR_BGR2GRAY)
|
||||
# print_background = cv2.bitwise_and(print_background, print_background, mask=gray_image)
|
||||
|
||||
print_mask = cv2.bitwise_and(result['mask'], cv2.cvtColor(mask_background, cv2.COLOR_BGR2GRAY))
|
||||
img_fg = cv2.bitwise_or(print_background, print_background, mask=print_mask)
|
||||
# TODO element 丢失信息
|
||||
three_channel_image = cv2.merge([cv2.bitwise_not(print_mask), cv2.bitwise_not(print_mask), cv2.bitwise_not(print_mask)])
|
||||
img_bg = cv2.bitwise_and(result['final_image'], three_channel_image)
|
||||
# mask_mo = np.expand_dims(print_mask, axis=2).repeat(3, axis=2)
|
||||
# gray_mo = np.expand_dims(result['gray'], axis=2).repeat(3, axis=2)
|
||||
# img_fg = (img_fg * (mask_mo / 255) * (gray_mo / 255)).astype(np.uint8)
|
||||
result['final_image'] = cv2.add(img_bg, img_fg)
|
||||
canvas = np.full_like(result['final_image'], 255)
|
||||
temp_bg = np.expand_dims(cv2.bitwise_not(result['mask']), axis=2).repeat(3, axis=2)
|
||||
tmp1 = (canvas * (temp_bg / 255)).astype(np.uint8)
|
||||
temp_fg = np.expand_dims(result['mask'], axis=2).repeat(3, axis=2)
|
||||
tmp2 = (result['final_image'] * (temp_fg / 255)).astype(np.uint8)
|
||||
result['single_image'] = cv2.add(tmp1, tmp2)
|
||||
return result
|
||||
|
||||
@staticmethod
|
||||
def stack_prin(print_background, pattern_image, rotate_image, start_y, y, start_x, x):
|
||||
temp_print = np.zeros((pattern_image.shape[0], pattern_image.shape[1], 3), dtype=np.uint8)
|
||||
temp_print[start_y:y + rotate_image.shape[0], start_x:x + rotate_image.shape[1]] = rotate_image
|
||||
img2gray = cv2.cvtColor(temp_print, cv2.COLOR_BGR2GRAY)
|
||||
ret, mask_ = cv2.threshold(img2gray, 1, 255, cv2.THRESH_BINARY)
|
||||
mask_inv = cv2.bitwise_not(mask_)
|
||||
img1_bg = cv2.bitwise_and(print_background, print_background, mask=mask_inv)
|
||||
img2_fg = cv2.bitwise_and(temp_print, temp_print, mask=mask_)
|
||||
print_background = img1_bg + img2_fg
|
||||
return print_background
|
||||
|
||||
def painting_collection(self, painting_dict, print_dict, print_trigger=False, is_single=False):
|
||||
if print_trigger:
|
||||
print_ = self.get_print(print_dict)
|
||||
painting_dict['Trigger'] = not is_single
|
||||
painting_dict['location'] = print_['location']
|
||||
single_mask_inv_print = self.get_mask_inv(print_['image'])
|
||||
dim_max = max(painting_dict['dim_image_h'], painting_dict['dim_image_w'])
|
||||
dim_pattern = (int(dim_max * print_['scale'] / 5), int(dim_max * print_['scale'] / 5))
|
||||
if not is_single:
|
||||
self.random_seed = random.randint(0, 1000)
|
||||
# 如果print 模式为overall 且 有角度的话 , 组合的print为正方形,方便裁剪
|
||||
if "print_angle_list" in print_dict.keys() and print_dict['print_angle_list'][0] != 0:
|
||||
painting_dict['mask_inv_print'] = self.tile_image(single_mask_inv_print, dim_pattern, print_['scale'], dim_max, dim_max, painting_dict['location'], trigger=True)
|
||||
painting_dict['tile_print'] = self.tile_image(print_['image'], dim_pattern, print_['scale'], dim_max, dim_max, painting_dict['location'], trigger=True)
|
||||
else:
|
||||
painting_dict['mask_inv_print'] = self.tile_image(single_mask_inv_print, dim_pattern, print_['scale'], painting_dict['dim_image_h'], painting_dict['dim_image_w'], painting_dict['location'], trigger=True)
|
||||
painting_dict['tile_print'] = self.tile_image(print_['image'], dim_pattern, print_['scale'], painting_dict['dim_image_h'], painting_dict['dim_image_w'], painting_dict['location'], trigger=True)
|
||||
else:
|
||||
painting_dict['mask_inv_print'] = self.tile_image(single_mask_inv_print, dim_pattern, print_['scale'], painting_dict['dim_image_h'], painting_dict['dim_image_w'], painting_dict['location'])
|
||||
painting_dict['tile_print'] = self.tile_image(print_['image'], dim_pattern, print_['scale'], painting_dict['dim_image_h'], painting_dict['dim_image_w'], painting_dict['location'])
|
||||
painting_dict['dim_print_h'], painting_dict['dim_print_w'] = dim_pattern
|
||||
return painting_dict
|
||||
|
||||
def tile_image(self, pattern, dim, scale, dim_image_h, dim_image_w, location, trigger=False):
|
||||
tile = None
|
||||
if not trigger:
|
||||
tile = cv2.resize(pattern, dim, interpolation=cv2.INTER_AREA)
|
||||
else:
|
||||
resize_pattern = cv2.resize(pattern, dim, interpolation=cv2.INTER_AREA)
|
||||
if len(pattern.shape) == 2:
|
||||
tile = np.tile(resize_pattern, (int((5 + 1) / scale) + 4, int((5 + 1) / scale) + 4))
|
||||
if len(pattern.shape) == 3:
|
||||
tile = np.tile(resize_pattern, (int((5 + 1) / scale) + 4, int((5 + 1) / scale) + 4, 1))
|
||||
tile = self.crop_image(tile, dim_image_h, dim_image_w, location, resize_pattern.shape)
|
||||
return tile
|
||||
|
||||
def get_mask_inv(self, print_):
|
||||
if print_[0][0][0] == 255 and print_[0][0][1] == 255 and print_[0][0][2] == 255:
|
||||
bg_color = cv2.cvtColor(print_, cv2.COLOR_BGR2LAB)[0][0]
|
||||
print_tile = cv2.cvtColor(print_, cv2.COLOR_BGR2LAB)
|
||||
bg_l, bg_a, bg_b = bg_color[0], bg_color[1], bg_color[2]
|
||||
bg_L_high, bg_L_low = self.get_low_high_lab(bg_l, L=True)
|
||||
bg_a_high, bg_a_low = self.get_low_high_lab(bg_a)
|
||||
bg_b_high, bg_b_low = self.get_low_high_lab(bg_b)
|
||||
lower = np.array([bg_L_low, bg_a_low, bg_b_low])
|
||||
upper = np.array([bg_L_high, bg_a_high, bg_b_high])
|
||||
mask_inv = cv2.inRange(print_tile, lower, upper)
|
||||
return mask_inv
|
||||
else:
|
||||
# bg_color = cv2.cvtColor(print_, cv2.COLOR_BGR2LAB)[0][0]
|
||||
# print_tile = cv2.cvtColor(print_, cv2.COLOR_BGR2LAB)
|
||||
# bg_l, bg_a, bg_b = bg_color[0], bg_color[1], bg_color[2]
|
||||
# bg_L_high, bg_L_low = self.get_low_high_lab(bg_l, L=True)
|
||||
# bg_a_high, bg_a_low = self.get_low_high_lab(bg_a)
|
||||
# bg_b_high, bg_b_low = self.get_low_high_lab(bg_b)
|
||||
# lower = np.array([bg_L_low, bg_a_low, bg_b_low])
|
||||
# upper = np.array([bg_L_high, bg_a_high, bg_b_high])
|
||||
|
||||
# print_tile = cv2.cvtColor(print_, cv2.COLOR_BGR2LAB)
|
||||
# mask_inv = cv2.cvtColor(print_tile, cv2.COLOR_BGR2GRAY)
|
||||
|
||||
# mask_inv = cv2.cvtColor(print_, cv2.COLOR_BGR2GRAY)
|
||||
mask_inv = np.zeros(print_.shape[:2], dtype=np.uint8)
|
||||
return mask_inv
|
||||
|
||||
@staticmethod
|
||||
def printpaint(result, painting_dict, print_=False):
|
||||
|
||||
if print_ and painting_dict['Trigger']:
|
||||
print_mask = cv2.bitwise_and(result['mask'], cv2.bitwise_not(painting_dict['mask_inv_print']))
|
||||
img_fg = cv2.bitwise_and(painting_dict['tile_print'], painting_dict['tile_print'], mask=print_mask)
|
||||
else:
|
||||
print_mask = result['mask']
|
||||
img_fg = result['final_image']
|
||||
if print_ and not painting_dict['Trigger']:
|
||||
index_ = None
|
||||
try:
|
||||
index_ = len(painting_dict['location'])
|
||||
except:
|
||||
assert f'there must be parameter of location if choose IfSingle'
|
||||
|
||||
for i in range(index_):
|
||||
start_h, start_w = int(painting_dict['location'][i][1]), int(painting_dict['location'][i][0])
|
||||
|
||||
length_h = min(start_h + painting_dict['dim_print_h'], img_fg.shape[0])
|
||||
length_w = min(start_w + painting_dict['dim_print_w'], img_fg.shape[1])
|
||||
|
||||
change_region = img_fg[start_h: length_h, start_w: length_w, :]
|
||||
# problem in change_mask
|
||||
change_mask = print_mask[start_h: length_h, start_w: length_w]
|
||||
# get real part into change mask
|
||||
_, change_mask = cv2.threshold(change_mask, 220, 255, cv2.THRESH_BINARY)
|
||||
mask = cv2.bitwise_not(painting_dict['mask_inv_print'])
|
||||
img_fg[start_h:start_h + painting_dict['dim_print_h'], start_w:start_w + painting_dict['dim_print_w'], :] = change_region
|
||||
|
||||
clothes_mask_print = cv2.bitwise_not(print_mask)
|
||||
|
||||
img_bg = cv2.bitwise_and(result['pattern_image'], result['pattern_image'], mask=clothes_mask_print)
|
||||
mask_mo = np.expand_dims(print_mask, axis=2).repeat(3, axis=2)
|
||||
gray_mo = np.expand_dims(result['gray'], axis=2).repeat(3, axis=2)
|
||||
img_fg = (img_fg * (mask_mo / 255) * (gray_mo / 255)).astype(np.uint8)
|
||||
print_image = cv2.add(img_bg, img_fg)
|
||||
return print_image
|
||||
|
||||
@staticmethod
|
||||
def get_print(print_dict):
|
||||
if 'print_scale_list' not in print_dict.keys() or print_dict['print_scale_list'][0] < 0.3:
|
||||
print_dict['scale'] = 0.3
|
||||
else:
|
||||
print_dict['scale'] = print_dict['print_scale_list'][0]
|
||||
|
||||
bucket_name = print_dict['print_path_list'][0].split("/", 1)[0]
|
||||
object_name = print_dict['print_path_list'][0].split("/", 1)[1]
|
||||
image = oss_get_image(bucket=bucket_name, object_name=object_name, data_type="PIL")
|
||||
# 判断图片格式,如果是RGBA 则贴在一张纯白图片上 防止透明转黑
|
||||
if image.mode == "RGBA":
|
||||
new_background = Image.new('RGB', image.size, (255, 255, 255))
|
||||
new_background.paste(image, mask=image.split()[3])
|
||||
image = new_background
|
||||
print_dict['image'] = cv2.cvtColor(np.asarray(image), cv2.COLOR_RGB2BGR)
|
||||
return print_dict
|
||||
|
||||
    def crop_image(self, image, image_size_h, image_size_w, location, print_shape):
        """Crop a tiled print down to the sketch size.

        The crop window is offset so the tiling lines up with the print's
        anchor *location*.

        Args:
            image: the oversized tiled print (or mask), 2- or 3-channel.
            image_size_h: target crop height.
            image_size_w: target crop width.
            location: list of anchor points; only ``location[0]`` is used.
            print_shape: shape of a single resized print tile.
        """
        print_w = print_shape[1]
        print_h = print_shape[0]

        # NOTE(review): the seed is set but no random draw follows in this
        # method — presumably a leftover from earlier random offsets; confirm.
        random.seed(self.random_seed)

        # Take the anchor coordinate modulo the tile size so the crop starts
        # on a tile boundary (offset = tile size - remainder).
        x_offset = print_w - int(location[0][1] % print_w)
        # NOTE(review): this mixes axes — it subtracts from print_w but takes
        # the modulus with print_h (and x_offset above indexes rows while
        # being derived from print_w).  Harmless while tiles are square
        # (dim_pattern is square), but confirm the intended pairing.
        y_offset = print_w - int(location[0][0] % print_h)

        if len(image.shape) == 2:
            image = image[x_offset: x_offset + image_size_h, y_offset: y_offset + image_size_w]
        elif len(image.shape) == 3:
            image = image[x_offset: x_offset + image_size_h, y_offset: y_offset + image_size_w, :]
        return image
||||
@staticmethod
|
||||
def get_low_high_lab(Lab_value, L=False):
|
||||
if L:
|
||||
high = Lab_value + 30 if Lab_value + 30 < 255 else 255
|
||||
low = Lab_value - 30 if Lab_value - 30 > 0 else 0
|
||||
else:
|
||||
high = Lab_value + 30 if Lab_value + 30 < 255 else 255
|
||||
low = Lab_value - 30 if Lab_value - 30 > 0 else 0
|
||||
return high, low
|
||||
|
||||
@staticmethod
|
||||
def img_rotate(image, angel, scale):
|
||||
"""顺时针旋转图像任意角度
|
||||
|
||||
Args:
|
||||
image (np.array): [原始图像]
|
||||
angel (float): [逆时针旋转的角度]
|
||||
|
||||
Returns:
|
||||
[array]: [旋转后的图像]
|
||||
"""
|
||||
|
||||
h, w = image.shape[:2]
|
||||
center = (w // 2, h // 2)
|
||||
# if type(angel) is not int:
|
||||
# angel = 0
|
||||
M = cv2.getRotationMatrix2D(center, -angel, scale)
|
||||
# 调整旋转后的图像长宽
|
||||
rotated_h = int((w * np.abs(M[0, 1]) + (h * np.abs(M[0, 0]))))
|
||||
rotated_w = int((h * np.abs(M[0, 1]) + (w * np.abs(M[0, 0]))))
|
||||
M[0, 2] += (rotated_w - w) // 2
|
||||
M[1, 2] += (rotated_h - h) // 2
|
||||
# 旋转图像
|
||||
rotated_img = cv2.warpAffine(image, M, (rotated_w, rotated_h))
|
||||
|
||||
return rotated_img, ((rotated_img.shape[1] - image.shape[1] * scale) // 2, (rotated_img.shape[0] - image.shape[0] * scale) // 2)
|
||||
# return rotated_img, (0, 0)
|
||||
|
||||
@staticmethod
|
||||
def rotate_crop_image(img, angle, crop):
|
||||
"""
|
||||
angle: 旋转的角度
|
||||
crop: 是否需要进行裁剪,布尔向量
|
||||
"""
|
||||
crop_image = lambda img, x0, y0, w, h: img[y0:y0 + h, x0:x0 + w]
|
||||
w, h = img.shape[:2]
|
||||
# 旋转角度的周期是360°
|
||||
angle %= 360
|
||||
# 计算仿射变换矩阵
|
||||
M_rotation = cv2.getRotationMatrix2D((w / 2, h / 2), angle, 1)
|
||||
# 得到旋转后的图像
|
||||
img_rotated = cv2.warpAffine(img, M_rotation, (w, h))
|
||||
|
||||
# 如果需要去除黑边
|
||||
if crop:
|
||||
# 裁剪角度的等效周期是180°
|
||||
angle_crop = angle % 180
|
||||
if angle > 90:
|
||||
angle_crop = 180 - angle_crop
|
||||
# 转化角度为弧度
|
||||
theta = angle_crop * np.pi / 180
|
||||
# 计算高宽比
|
||||
hw_ratio = float(h) / float(w)
|
||||
# 计算裁剪边长系数的分子项
|
||||
tan_theta = np.tan(theta)
|
||||
numerator = np.cos(theta) + np.sin(theta) * np.tan(theta)
|
||||
|
||||
# 计算分母中和高宽比相关的项
|
||||
r = hw_ratio if h > w else 1 / hw_ratio
|
||||
# 计算分母项
|
||||
denominator = r * tan_theta + 1
|
||||
# 最终的边长系数
|
||||
crop_mult = numerator / denominator
|
||||
|
||||
# 得到裁剪区域
|
||||
w_crop = int(crop_mult * w)
|
||||
h_crop = int(crop_mult * h)
|
||||
x0 = int((w - w_crop) / 2)
|
||||
y0 = int((h - h_crop) / 2)
|
||||
|
||||
img_rotated = crop_image(img_rotated, x0, y0, w_crop, h_crop)
|
||||
|
||||
return img_rotated
|
||||
|
||||
@staticmethod
|
||||
def read_image(image_url):
|
||||
image = oss_get_image(bucket=image_url.split("/", 1)[0], object_name=image_url.split("/", 1)[1], data_type="cv2")
|
||||
if image.shape[2] == 4:
|
||||
image_rgb = cv2.cvtColor(image, cv2.COLOR_BGRA2RGBA)
|
||||
image = Image.fromarray(image_rgb)
|
||||
image_mode = "RGBA"
|
||||
else:
|
||||
image_mode = "RGB"
|
||||
return image, image_mode
|
||||
|
||||
@staticmethod
|
||||
def resize_and_crop(img, target_width, target_height):
|
||||
# 获取原始图像的尺寸
|
||||
original_height, original_width = img.shape[:2]
|
||||
|
||||
# 计算目标尺寸的宽高比
|
||||
target_ratio = target_width / target_height
|
||||
|
||||
# 计算原始图像的宽高比
|
||||
original_ratio = original_width / original_height
|
||||
|
||||
# 调整尺寸
|
||||
if original_ratio > target_ratio:
|
||||
# 原始图像更宽,按高度resize,然后裁剪宽度
|
||||
new_height = target_height
|
||||
new_width = int(original_width * (target_height / original_height))
|
||||
resized_img = cv2.resize(img, (new_width, new_height))
|
||||
# 裁剪宽度
|
||||
start_x = (new_width - target_width) // 2
|
||||
cropped_img = resized_img[:, start_x:start_x + target_width]
|
||||
else:
|
||||
# 原始图像更高,按宽度resize,然后裁剪高度
|
||||
new_width = target_width
|
||||
new_height = int(original_height * (target_width / original_width))
|
||||
resized_img = cv2.resize(img, (new_width, new_height))
|
||||
# 裁剪高度
|
||||
start_y = (new_height - target_height) // 2
|
||||
cropped_img = resized_img[start_y:start_y + target_height, :]
|
||||
|
||||
return cropped_img
|
||||
49
app/service/design/design_batch/items/pipeline/scale.py
Normal file
49
app/service/design/design_batch/items/pipeline/scale.py
Normal file
@@ -0,0 +1,49 @@
|
||||
import math
|
||||
|
||||
import cv2
|
||||
|
||||
|
||||
class Scaling:
    """Compute the scale factor that maps a clothing item onto the body.

    Writes ``result['scale']`` based on the item's keypoint type: keypoint
    distances for garments, foot length vs. detected shoe width for shoes,
    configured constants for bags and earrings.
    """

    def __call__(self, result):
        if result['keypoint'] in ['waistband', 'shoulder', 'head_point']:
            # Euclidean distance between the left/right garment keypoints.
            distance_clo = math.sqrt(
                (int(result['clothes_keypoint'][result['keypoint'] + '_left'][0]) - int(result['clothes_keypoint'][result['keypoint'] + '_right'][0])) ** 2
                +
                (int(result['clothes_keypoint'][result['keypoint'] + '_left'][1]) - int(result['clothes_keypoint'][result['keypoint'] + '_right'][1])) ** 2
            )

            # NOTE(review): the body distance uses only the x component plus
            # a constant 1 (presumably the body keypoints are level and the
            # +1 avoids a zero distance) — confirm this asymmetry with the
            # garment distance above is intended.
            distance_bdy = math.sqrt(
                (int(result['body_point_test'][result['keypoint'] + '_left'][0])
                 -
                 int(result['body_point_test'][result['keypoint'] + '_right'][0])) ** 2 + 1
            )

            if distance_clo == 0:
                # Degenerate keypoints: fall back to no scaling.
                result['scale'] = 1
            else:
                result['scale'] = distance_bdy / distance_clo
        elif result['keypoint'] == 'toe':
            # Shoes: body foot length from its two endpoint coordinates.
            distance_bdy = math.sqrt(
                (int(result['body_point_test']['foot_length'][0]) - int(result['body_point_test']['foot_length'][2])) ** 2
                +
                (int(result['body_point_test']['foot_length'][1]) - int(result['body_point_test']['foot_length'][3])) ** 2
            )

            # Estimate the shoe width from the widest contour of the sketch.
            Blur = cv2.GaussianBlur(result['gray'], (3, 3), 0)
            Edge = cv2.Canny(Blur, 10, 200)
            Edge = cv2.dilate(Edge, None)
            Edge = cv2.erode(Edge, None)
            Contour, _ = cv2.findContours(Edge, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
            Contours = sorted(Contour, key=cv2.contourArea, reverse=True)

            # NOTE(review): raises IndexError when no contour is found —
            # confirm upstream guarantees a non-empty sketch.
            Max_contour = Contours[0]
            x, y, w, h = cv2.boundingRect(Max_contour)
            width = w
            distance_clo = width
            result['scale'] = distance_bdy / distance_clo
        elif result['keypoint'] == 'hand_point':
            # Bags use a configured scale instead of keypoint distances.
            result['scale'] = result['scale_bag']
        elif result['keypoint'] == 'ear_point':
            # Earrings likewise use a configured scale.
            result['scale'] = result['scale_earrings']
        return result
|
||||
@@ -0,0 +1,67 @@
|
||||
import logging
|
||||
import os
|
||||
|
||||
import cv2
|
||||
import numpy as np
|
||||
|
||||
from app.core.config import SEG_CACHE_PATH
|
||||
from app.service.design.utils.design_ensemble import get_seg_result
|
||||
from app.service.utils.oss_client import oss_get_image
|
||||
|
||||
logger = logging.getLogger()
|
||||
|
||||
|
||||
class Segmentation:
    """Produce front/back garment masks.

    Uses a pre-rendered mask image from OSS when ``seg_mask_url`` is set,
    otherwise runs (or reads from the local cache) the segmentation model.
    """

    def __call__(self, result):
        """Fill ``result['front_mask']``, ``result['back_mask']`` and
        ``result['mask']`` (front + back); in the model path also stores
        ``result['seg_result']``.
        """
        if "seg_mask_url" in result.keys() and result['seg_mask_url'] != "":
            # Pre-rendered mask: red channel marks the front piece, green
            # the back piece (determined by comparing r/g below).
            bucket = result['seg_mask_url'].split('/')[0]
            object_name = result['seg_mask_url'][result['seg_mask_url'].find('/') + 1:]
            seg_mask = oss_get_image(bucket=bucket, object_name=object_name, data_type="cv2")
            # Nearest-neighbour keeps mask labels crisp when resizing.
            seg_mask = cv2.resize(seg_mask, (result['img_shape'][1], result['img_shape'][0]),
                                  interpolation=cv2.INTER_NEAREST)
            image_rgb = cv2.cvtColor(seg_mask, cv2.COLOR_BGR2RGB)
            r, g, b = cv2.split(image_rgb)
            red_mask = r > g
            green_mask = g > r
            result['front_mask'] = np.array(red_mask, dtype=np.uint8) * 255
            result['back_mask'] = np.array(green_mask, dtype=np.uint8) * 255
            result['mask'] = result['front_mask'] + result['back_mask']
        else:
            # Try the local cache first; run the model only on a miss.
            cached, seg_result = self.load_seg_result(result["image_id"])
            if not cached:
                seg_result = get_seg_result(result["image_id"], result['image'])[0]
                self.save_seg_result(seg_result, result['image_id'])
            # Fix: previously 'seg_result' was stored in *result* before the
            # cache-miss recompute, leaving None there on a miss.
            result['seg_result'] = seg_result
            # Label 1.0 marks the front piece, 2.0 the back piece.
            result['front_mask'] = (255 * (seg_result == 1.0).astype(np.uint8))
            result['back_mask'] = (255 * (seg_result == 2.0).astype(np.uint8))
            result['mask'] = result['front_mask'] + result['back_mask']
        return result

    @staticmethod
    def save_seg_result(seg_result, image_id):
        """Persist a segmentation result to the local .npy cache.

        Best-effort: failures are reported but never raised.
        """
        file_path = f"{SEG_CACHE_PATH}{image_id}.npy"
        try:
            np.save(file_path, seg_result)
            print("保存成功", os.path.abspath(file_path))
        except Exception as e:
            print(f"保存失败: {e}")

    @staticmethod
    def load_seg_result(image_id):
        """Load a cached segmentation result.

        Returns:
            tuple: (hit, seg_result) — ``(True, array)`` on a cache hit,
            ``(False, None)`` when the file is missing or unreadable.
        """
        file_path = f"{SEG_CACHE_PATH}{image_id}.npy"
        logger.info(f"load seg file name is :{SEG_CACHE_PATH}{image_id}.npy")
        try:
            return True, np.load(file_path)
        except FileNotFoundError:
            print("文件不存在")
            return False, None
        except Exception as e:
            print(f"加载失败: {e}")
            return False, None
||||
71
app/service/design/design_batch/items/pipeline/split.py
Normal file
71
app/service/design/design_batch/items/pipeline/split.py
Normal file
@@ -0,0 +1,71 @@
|
||||
import io
|
||||
import logging
|
||||
|
||||
import cv2
|
||||
import numpy as np
|
||||
from PIL import Image
|
||||
from cv2 import cvtColor, COLOR_BGR2RGBA
|
||||
|
||||
from app.core.config import AIDA_CLOTHING
|
||||
from app.service.design.utils.conversion_image import rgb_to_rgba
|
||||
from app.service.design.utils.upload_image import upload_png_mask
|
||||
from app.service.utils.generate_uuid import generate_uuid
|
||||
from app.service.utils.oss_client import oss_upload_image
|
||||
|
||||
|
||||
class Split(object):
    """Pipeline stage: split a garment image into front/back layers and
    upload the pieces plus a combined red/green region mask to OSS.

    Mutates and returns the ``result`` dict: sets ``front_image(_url)``,
    ``back_image(_url)`` (None for single-sided garments), ``mask_url`` and
    re-uploads ``pattern_image``. On any exception, logs a warning and
    implicitly returns None (pre-existing contract, kept as-is).
    """

    # Garment categories that have both a front and a back piece.
    _DOUBLE_SIDED = ('blouse', 'dress', 'outwear', 'tops')

    def __call__(self, result):
        try:
            if result['name'] in ('outwear', 'dress', 'blouse', 'skirt', 'trousers', 'tops', 'bottoms'):
                front_mask = result['front_mask']
                back_mask = result['back_mask']
                rgba_image = rgb_to_rgba(result['final_image'], front_mask + back_mask)
                # Target size after the per-item scale and resize factors.
                new_size = (
                    int(rgba_image.shape[1] * result["scale"] * result["resize_scale"][0]),
                    int(rgba_image.shape[0] * result["scale"] * result["resize_scale"][1]),
                )
                rgba_image = cv2.resize(rgba_image, new_size)

                # Front piece: keep only pixels covered by the front mask.
                front_mask = cv2.resize(front_mask, new_size)
                result_front_image = np.zeros_like(rgba_image)
                result_front_image[front_mask != 0] = rgba_image[front_mask != 0]
                result_front_image_pil = Image.fromarray(cvtColor(result_front_image, COLOR_BGR2RGBA))
                result['front_image'], result["front_image_url"], _ = upload_png_mask(result_front_image_pil, f'{generate_uuid()}', mask=None)

                # Combined region mask: red marks the front region (BGR order).
                height, width = front_mask.shape
                mask_image = np.zeros((height, width, 3))
                mask_image[front_mask != 0] = [0, 0, 255]

                if result["name"] in self._DOUBLE_SIDED:
                    # Back piece: same extraction as the front.
                    back_mask = cv2.resize(back_mask, new_size)
                    result_back_image = np.zeros_like(rgba_image)
                    result_back_image[back_mask != 0] = rgba_image[back_mask != 0]
                    result_back_image_pil = Image.fromarray(cvtColor(result_back_image, COLOR_BGR2RGBA))
                    result['back_image'], result["back_image_url"], _ = upload_png_mask(result_back_image_pil, f'{generate_uuid()}', mask=None)
                    mask_image[back_mask != 0] = [0, 255, 0]  # green marks the back region
                    result['mask_url'] = self._upload_mask(mask_image, front_mask + back_mask)
                else:
                    result['mask_url'] = self._upload_mask(mask_image, front_mask)
                    result['back_image'] = None
                    result["back_image_url"] = None

                # Middle (pattern) layer: re-upload it clipped to the garment mask.
                result_pattern_image_rgba = rgb_to_rgba(result['pattern_image'], result['mask'])
                result_pattern_image_pil = Image.fromarray(cvtColor(result_pattern_image_rgba, COLOR_BGR2RGBA))
                result['pattern_image'], result['pattern_image_url'], _ = upload_png_mask(result_pattern_image_pil, f'{generate_uuid()}')
            return result
        except Exception as e:
            # Pre-existing best-effort contract: log and fall through (None).
            logging.warning(f"split runtime exception : {e} image_id : {result['image_id']}")

    @staticmethod
    def _upload_mask(mask_image, alpha_mask):
        """Encode the BGR region mask as a transparent PNG (alpha taken from
        ``alpha_mask``), upload it to OSS and return its bucket/object path."""
        rgba_mask = rgb_to_rgba(mask_image, alpha_mask)
        mask_pil = Image.fromarray(cvtColor(rgba_mask.astype(np.uint8), COLOR_BGR2RGBA))
        buffer = io.BytesIO()
        mask_pil.save(buffer, format='PNG')
        buffer.seek(0)
        req = oss_upload_image(bucket=AIDA_CLOTHING, object_name=f"mask/mask_{generate_uuid()}.png", image_bytes=buffer.read())
        return req.bucket_name + "/" + req.object_name
|
||||
@@ -0,0 +1,31 @@
|
||||
#!/usr/bin/env python
|
||||
# -*- coding: UTF-8 -*-
|
||||
"""
|
||||
@Project :trinity_client
|
||||
@File :conversion_image.py
|
||||
@Author :周成融
|
||||
@Date :2023/8/21 10:40:29
|
||||
@detail :
|
||||
"""
|
||||
import numpy as np
|
||||
|
||||
|
||||
# def rgb_to_rgba(rgb_size, rgb_image, mask):
|
||||
# alpha_channel = np.full(rgb_size, 255, dtype=np.uint8)
|
||||
# # 创建四通道的结果图像
|
||||
# rgba_image = np.dstack((rgb_image, alpha_channel))
|
||||
# alpha_channel = np.where(mask > 0, 255, 0)
|
||||
# # 更新RGBA图像的透明度通道
|
||||
# rgba_image[:, :, 3] = alpha_channel
|
||||
# return rgba_image
|
||||
|
||||
def rgb_to_rgba(rgb_image, mask):
    """Attach an alpha channel derived from ``mask`` to a 3-channel image.

    Pixels where ``mask > 0`` become fully opaque (alpha 255); all other
    pixels become fully transparent (alpha 0).

    Args:
        rgb_image: HxWx3 array.
        mask: HxW array; any positive value counts as "inside".

    Returns:
        HxWx4 array with the computed uint8 alpha stacked as the 4th channel.
    """
    opaque = (np.asarray(mask) > 0).astype(np.uint8) * 255
    return np.dstack((rgb_image, opaque))
|
||||
|
||||
|
||||
if __name__ == '__main__':
    # NOTE(review): broken manual-test stub — open("") always raises
    # FileNotFoundError; point it at a real file (or remove) before using.
    image = open("")
|
||||
143
app/service/design/design_batch/items/utils/design_ensemble.py
Normal file
143
app/service/design/design_batch/items/utils/design_ensemble.py
Normal file
@@ -0,0 +1,143 @@
|
||||
#!/usr/bin/env python
|
||||
# -*- coding: UTF-8 -*-
|
||||
"""
|
||||
@Project :trinity_client
|
||||
@File :design_ensemble.py
|
||||
@Author :周成融
|
||||
@Date :2023/8/16 19:36:21
|
||||
@detail :发起请求 获取推理结果
|
||||
"""
|
||||
import logging
|
||||
|
||||
import cv2
|
||||
import mmcv
|
||||
import numpy as np
|
||||
import torch
|
||||
import torch.nn.functional as F
|
||||
import tritonclient.http as httpclient
|
||||
|
||||
from app.core.config import *
|
||||
|
||||
"""
|
||||
keypoint
|
||||
预处理 推理 后处理
|
||||
"""
|
||||
|
||||
|
||||
def keypoint_preprocess(img_path):
    """Read an image and prepare a 1x3x256x256 float batch for the keypoint model.

    Args:
        img_path: path or array accepted by ``mmcv.imread``.

    Returns:
        ``(batch, (w_scale, h_scale))``: the NCHW batch and the resize
        factors from the original width/height to 256.
    """
    target = (256, 256)
    img = mmcv.imread(img_path)
    orig_h, orig_w = img.shape[:2]
    img = cv2.resize(img, target)
    scales = (target[0] / orig_w, target[1] / orig_h)
    # Standard ImageNet mean/std normalisation, BGR -> RGB.
    img = mmcv.imnormalize(img, mean=np.array([123.675, 116.28, 103.53]), std=np.array([58.395, 57.12, 57.375]), to_rgb=True)
    batch = img.transpose(2, 0, 1)[np.newaxis, ...]
    return batch, scales
|
||||
|
||||
|
||||
# @ RunTime
|
||||
# 推理
|
||||
def get_keypoint_result(image, site):
    """Run keypoint inference on the Triton server for the given body site.

    Args:
        image: path/array accepted by ``keypoint_preprocess``.
        site: model selector, e.g. ``"up"`` -> model ``keypoint_up_ocrnet_hr18``.

    Returns:
        Post-processed keypoint coordinates (see ``keypoint_postprocess``),
        or None if preprocessing or inference failed.
    """
    keypoint_result = None
    try:
        image, scale_factor = keypoint_preprocess(image)
        client = httpclient.InferenceServerClient(url=DESIGN_MODEL_URL)
        transformed_img = image.astype(np.float32)
        # The tensor names are fixed — plain strings, no interpolation needed.
        inputs = [httpclient.InferInput("input", transformed_img.shape, datatype="FP32")]
        inputs[0].set_data_from_numpy(transformed_img, binary_data=True)
        outputs = [httpclient.InferRequestedOutput("output", binary_data=True)]
        results = client.infer(model_name=f"keypoint_{site}_ocrnet_hr18", inputs=inputs, outputs=outputs)
        inference_output = torch.from_numpy(results.as_numpy('output'))
        keypoint_result = keypoint_postprocess(inference_output, scale_factor)
    except Exception as e:
        # Swallow and log: callers treat None as "no keypoints available".
        logging.warning(f"get_keypoint_result : {e}")
    return keypoint_result
|
||||
|
||||
|
||||
def keypoint_postprocess(output, scale_factor):
    """Convert raw heatmaps to coordinates in the original image space.

    Args:
        output: (batch, channels, H, W) tensor of per-keypoint heatmaps.
        scale_factor: ``(w_scale, h_scale)`` returned by ``keypoint_preprocess``.

    Returns:
        (batch, channels, 2) numpy array of ceil-rounded coordinates,
        rescaled back to the original image and multiplied by the model's
        output stride of 4.
    """
    batch, channels = output.size(0), output.size(1)
    width = output.size(3)
    flat = output.view(batch, channels, -1)
    peak = torch.argmax(flat, dim=2).unsqueeze(dim=2)
    # NOTE(review): true division yields fractional row indices here; integer
    # `//` may have been intended — confirm against the training code.
    coords = torch.cat((peak / width, peak % width), dim=2).numpy()
    inverse = [1 / s for s in scale_factor[::-1]]
    scale_matrix = np.diag(inverse)
    # Guard against infinities from degenerate (zero-ish) scale factors.
    scale_matrix[np.isinf(scale_matrix)] = 0
    return np.ceil(np.dot(coords, scale_matrix) * 4)
|
||||
|
||||
|
||||
"""
|
||||
seg
|
||||
预处理 推理 后处理
|
||||
"""
|
||||
|
||||
|
||||
# KNet
|
||||
def seg_preprocess(img_path):
    """Load and normalise an image for segmentation inference.

    Either side larger than 1024 px is clamped to 1024 before normalisation.

    Args:
        img_path: path or array accepted by ``mmcv.imread``.

    Returns:
        ``(batch, ori_shape)``: the 1x3xHxW float batch and the original
        ``(height, width)`` so the logits can be resized back afterwards.
    """
    img = mmcv.imread(img_path)
    ori_shape = img.shape[:2]  # (height, width)
    target_h = min(ori_shape[0], 1024)
    target_w = min(ori_shape[1], 1024)
    if ori_shape != (target_h, target_w):
        # cv2.resize expects dsize as (width, height).
        img = cv2.resize(img, (target_w, target_h))
    img = mmcv.imnormalize(img, mean=np.array([123.675, 116.28, 103.53]), std=np.array([58.395, 57.12, 57.375]), to_rgb=True)
    batch = np.expand_dims(img.transpose(2, 0, 1), axis=0)
    return batch, ori_shape
|
||||
|
||||
|
||||
# @ RunTime
|
||||
def get_seg_result(image_id, image):
    """Run clothing segmentation on the Triton server.

    Args:
        image_id: numeric id forwarded to ``seg_postprocess``.
        image: path/array accepted by ``seg_preprocess``.

    Returns:
        Per-class logits resized back to the original image shape
        (see ``seg_postprocess``).
    """
    batch, ori_shape = seg_preprocess(image)
    client = httpclient.InferenceServerClient(url=f"{DESIGN_MODEL_URL}")
    transformed = batch.astype(np.float32)
    # Single input/output tensor pair, names taken from the SEGMENTATION config.
    infer_input = httpclient.InferInput(SEGMENTATION['input'], transformed.shape, datatype="FP32")
    infer_input.set_data_from_numpy(transformed, binary_data=True)
    requested = httpclient.InferRequestedOutput(SEGMENTATION['output'], binary_data=True)
    response = client.infer(model_name=SEGMENTATION['new_model_name'], inputs=[infer_input], outputs=[requested])
    raw_logits = response.as_numpy(SEGMENTATION['output'])
    return seg_postprocess(int(image_id), raw_logits, ori_shape)
|
||||
|
||||
|
||||
# no cache
|
||||
def seg_postprocess(image_id, output, ori_shape):
    """Upsample raw segmentation logits back to the original image size.

    Args:
        image_id: kept for interface compatibility; not used here.
        output: (1, classes, h, w) array of raw logits.
        ori_shape: target (height, width).

    Returns:
        (classes, height, width) numpy array of bilinearly resized logits.
    """
    logits = torch.tensor(output).float()
    resized = F.interpolate(logits, size=ori_shape, scale_factor=None, mode='bilinear', align_corners=False)
    return resized.cpu().numpy()[0]
|
||||
|
||||
|
||||
def key_point_show(image_path, key_point_result=None):
    """Debug helper: draw keypoints on an image and show it in an OpenCV window.

    Args:
        image_path: path of the image file to annotate.
        key_point_result: iterable of (row, col) points; each is reversed to
            (x, y) for cv2.circle. NOTE(review): the None default makes the
            for-loop below raise — callers must always pass points.

    Blocks until a key is pressed (cv2.waitKey(0)).
    """
    img = cv2.imread(image_path)
    points_list = key_point_result
    point_size = 1
    point_color = (0, 0, 255)  # BGR
    thickness = 4  # may be 0, 4 or 8
    for point in points_list:
        # point is (row, col); cv2.circle expects (x, y), hence the reversal.
        cv2.circle(img, point[::-1], point_size, point_color, thickness)
    cv2.imshow("0", img)
    cv2.waitKey(0)
|
||||
|
||||
|
||||
if __name__ == '__main__':
    # Manual smoke test for the keypoint pipeline: needs the sample image on
    # disk and a reachable Triton server (DESIGN_MODEL_URL).
    image = cv2.imread("9070101c-e5be-49b5-9602-4113a968969b.png")
    a = get_keypoint_result(image, "up")
    new_list = []
    print(list)  # NOTE(review): prints the builtin `list`; probably meant new_list or a
    for i in a[0]:
        new_list.append((int(i[0]), int(i[1])))
    key_point_show("9070101c-e5be-49b5-9602-4113a968969b.png", new_list)
    # a = get_seg_result(1, image)
    print(a)
|
||||
99
app/service/design/design_batch/items/utils/redis_utils.py
Normal file
99
app/service/design/design_batch/items/utils/redis_utils.py
Normal file
@@ -0,0 +1,99 @@
|
||||
import redis
|
||||
|
||||
from app.core.config import REDIS_HOST, REDIS_PORT
|
||||
|
||||
|
||||
class Redis(object):
    """Thin convenience wrapper around redis-py's StrictRedis (DB 0).

    Every call opens a fresh connection via ``_get_r``; string results are
    decoded to utf-8, missing keys come back as None.
    """

    @staticmethod
    def _get_r():
        """Open a StrictRedis connection to DB 0 at REDIS_HOST:REDIS_PORT."""
        return redis.StrictRedis(REDIS_HOST, REDIS_PORT, 0)

    @classmethod
    def write(cls, key, value, expire=None):
        """Set ``key`` to ``value`` with a TTL (falls back to 100 seconds)."""
        ttl = expire if expire else 100
        cls._get_r().set(key, value, ex=ttl)

    @classmethod
    def read(cls, key):
        """Return the value of ``key`` decoded to str, or None if missing."""
        raw = cls._get_r().get(key)
        return raw.decode('utf-8') if raw else raw

    @classmethod
    def hset(cls, name, key, value):
        """Set one field of hash ``name``."""
        cls._get_r().hset(name, key, value)

    @classmethod
    def hget(cls, name, key):
        """Return one field of hash ``name`` decoded to str, or None."""
        raw = cls._get_r().hget(name, key)
        return raw.decode('utf-8') if raw else raw

    @classmethod
    def hgetall(cls, name):
        """Return every field/value pair of hash ``name`` (raw bytes)."""
        return cls._get_r().hgetall(name)

    @classmethod
    def delete(cls, *names):
        """Delete one or more keys."""
        cls._get_r().delete(*names)

    @classmethod
    def hdel(cls, name, key):
        """Delete one field of hash ``name``."""
        cls._get_r().hdel(name, key)

    @classmethod
    def expire(cls, name, expire=None):
        """Set a TTL on ``name`` (falls back to 100 seconds)."""
        ttl = expire if expire else 100
        cls._get_r().expire(name, ttl)
|
||||
|
||||
|
||||
if __name__ == '__main__':
    # Manual smoke test: requires a reachable Redis at REDIS_HOST:REDIS_PORT.
    redis_client = Redis()
    # print(redis_client.write(key="1230", value=0))
    redis_client.write(key="1230", value=10)
    # print(redis_client.read(key="1230"))
|
||||
181
app/service/design/design_batch/items/utils/synthesis_item.py
Normal file
181
app/service/design/design_batch/items/utils/synthesis_item.py
Normal file
@@ -0,0 +1,181 @@
|
||||
#!/usr/bin/env python
|
||||
# -*- coding: UTF-8 -*-
|
||||
"""
|
||||
@Project :trinity_client
|
||||
@File :synthesis_item.py
|
||||
@Author :周成融
|
||||
@Date :2023/8/26 14:13:04
|
||||
@detail :
|
||||
"""
|
||||
import io
|
||||
import logging
|
||||
|
||||
import cv2
|
||||
import numpy as np
|
||||
from PIL import Image
|
||||
|
||||
from app.service.utils.generate_uuid import generate_uuid
|
||||
from app.service.utils.oss_client import oss_upload_image
|
||||
|
||||
|
||||
def positioning(all_mask_shape, mask_shape, offset):
    """Map a 1-D overlay of length ``mask_shape`` placed at ``offset`` onto a
    canvas axis of length ``all_mask_shape``.

    Returns ``(all_start, all_end, mask_start, mask_end)`` so that
    ``canvas[all_start:all_end] = overlay[mask_start:mask_end]`` copies
    exactly the visible part of the overlay. Both slices always have equal
    length — possibly zero when the overlay lies entirely off-canvas.
    """
    if offset == 0:
        visible = min(all_mask_shape, mask_shape)
        return 0, visible, 0, visible
    if offset > 0:
        canvas_start = min(offset, all_mask_shape)
        canvas_end = min(offset + mask_shape, all_mask_shape)
        overlay_end = 0 if offset > all_mask_shape else min(all_mask_shape - offset, mask_shape)
        return canvas_start, canvas_end, 0, overlay_end
    # offset < 0: the overlay begins before the canvas edge.
    shift = -offset
    if shift > mask_shape:
        # Entirely off-canvas: both slices are empty.
        return 0, 0, mask_shape, mask_shape
    return 0, min(mask_shape - shift, all_mask_shape), shift, min(all_mask_shape + shift, mask_shape)
|
||||
|
||||
|
||||
# @RunTime
|
||||
def synthesis(data, size, basic_info):
    """Composite garment/body layers into one RGBA image and upload it to OSS.

    Args:
        data: list of layer dicts; each has 'name', 'image' (PIL or None) and
            'adaptive_position' ((y, x) offset); garment layers also carry a
            binary 'mask' ndarray.
        size: (width, height) of the output canvas.
        basic_info: dict providing 'body_point_test' shoulder coordinates.

    Returns:
        "<bucket>/<object>" path of the uploaded PNG, or None if any step
        raised (the broad except at the bottom logs and falls through).

    NOTE(review): if `data` contains no 'body'/'mannequin' layer,
    top_outer_mask / bottom_outer_mask are never assigned and the code below
    raises NameError (silently swallowed) — confirm a body layer is always
    present.
    """
    # Create the transparent base canvas.
    base_image = Image.new('RGBA', size, (0, 0, 0, 0))
    try:
        # Numpy masks are indexed (rows, cols) = (height, width).
        all_mask_shape = (size[1], size[0])
        body_mask = None
        for d in data:
            if d['name'] == 'body' or d['name'] == 'mannequin':
                # Paste the model onto a fresh transparent canvas and take its
                # alpha channel as the body mask.
                transparent_image = Image.new("RGBA", size, (0, 0, 0, 0))
                transparent_image.paste(d['image'], (d['adaptive_position'][1], d['adaptive_position'][0]), d['image'])  # paste() can mutate a shared mutable position list, so index into it instead of passing it directly
                body_mask = np.array(transparent_image.split()[3])

                # Recompute the shoulder points in canvas coordinates.
                left_shoulder = [x + y for x, y in zip(basic_info['body_point_test']['shoulder_left'], [d['adaptive_position'][1], d['adaptive_position'][0]])]
                right_shoulder = [x + y for x, y in zip(basic_info['body_point_test']['shoulder_right'], [d['adaptive_position'][1], d['adaptive_position'][0]])]
                # Fill the rectangle above the shoulder line into the mask.
                body_mask[:min(left_shoulder[1], right_shoulder[1]), left_shoulder[0]:right_shoulder[0]] = 255
                _, binary_body_mask = cv2.threshold(body_mask, 127, 255, cv2.THRESH_BINARY)
                top_outer_mask = np.array(binary_body_mask)
                bottom_outer_mask = np.array(binary_body_mask)

        # Walk the layers back-to-front and merge the outermost top garment
        # and the outermost bottom garment into the silhouette masks.
        top = True
        bottom = True
        i = len(data)
        while i:
            i -= 1
            # NOTE(review): 'dress_front' appears in both category lists; a
            # dress only ever reaches the top branch — confirm intended.
            if top and data[i]['name'] in ["blouse_front", "outwear_front", "dress_front", "tops_front"]:
                top = False
                mask_shape = data[i]['mask'].shape
                y_offset, x_offset = data[i]['adaptive_position']
                # Compute matching canvas/overlay slice bounds per axis.
                all_y_start, all_y_end, mask_y_start, mask_y_end = positioning(all_mask_shape=all_mask_shape[0], mask_shape=mask_shape[0], offset=y_offset)
                all_x_start, all_x_end, mask_x_start, mask_x_end = positioning(all_mask_shape=all_mask_shape[1], mask_shape=mask_shape[1], offset=x_offset)
                # Stamp the binarised garment mask into a canvas-sized buffer,
                # then merge it into the running top silhouette.
                _, sketch_mask = cv2.threshold(data[i]['mask'], 127, 255, cv2.THRESH_BINARY)
                background = np.zeros_like(top_outer_mask)
                background[all_y_start:all_y_end, all_x_start:all_x_end] = sketch_mask[mask_y_start:mask_y_end, mask_x_start:mask_x_end]
                top_outer_mask = background + top_outer_mask
            elif bottom and data[i]['name'] in ["trousers_front", "skirt_front", "bottoms_front", "dress_front"]:
                bottom = False
                mask_shape = data[i]['mask'].shape
                y_offset, x_offset = data[i]['adaptive_position']
                # Compute matching canvas/overlay slice bounds per axis.
                all_y_start, all_y_end, mask_y_start, mask_y_end = positioning(all_mask_shape=all_mask_shape[0], mask_shape=mask_shape[0], offset=y_offset)
                all_x_start, all_x_end, mask_x_start, mask_x_end = positioning(all_mask_shape=all_mask_shape[1], mask_shape=mask_shape[1], offset=x_offset)
                # Stamp the binarised garment mask into a canvas-sized buffer,
                # then merge it into the running bottom silhouette.
                _, sketch_mask = cv2.threshold(data[i]['mask'], 127, 255, cv2.THRESH_BINARY)
                background = np.zeros_like(top_outer_mask)
                background[all_y_start:all_y_end, all_x_start:all_x_end] = sketch_mask[mask_y_start:mask_y_end, mask_x_start:mask_x_end]
                bottom_outer_mask = background + bottom_outer_mask
            elif bottom is False and top is False:
                # Both outermost garments found; stop scanning.
                break

        all_mask = cv2.bitwise_or(top_outer_mask, bottom_outer_mask)

        for layer in data:
            if layer['image'] is not None:
                if layer['name'] != "body":
                    # Clip every non-body layer to the combined silhouette.
                    test_image = Image.new('RGBA', size, (0, 0, 0, 0))
                    test_image.paste(layer['image'], (layer['adaptive_position'][1], layer['adaptive_position'][0]), layer['image'])
                    mask_data = np.where(all_mask > 0, 255, 0).astype(np.uint8)
                    mask_alpha = Image.fromarray(mask_data)
                    cropped_image = Image.composite(test_image, Image.new("RGBA", test_image.size, (255, 255, 255, 0)), mask_alpha)
                    base_image.paste(test_image, (0, 0), cropped_image)  # test_image is already positioned on a full-size canvas, so paste at (0, 0)
                else:
                    base_image.paste(layer['image'], (layer['adaptive_position'][1], layer['adaptive_position'][0]), layer['image'])

        result_image = base_image

        image_data = io.BytesIO()
        result_image.save(image_data, format='PNG')
        image_data.seek(0)

        # oss upload
        image_bytes = image_data.read()
        bucket_name = "aida-results"
        object_name = f'result_{generate_uuid()}.png'
        req = oss_upload_image(bucket=bucket_name, object_name=object_name, image_bytes=image_bytes)
        return f"{bucket_name}/{object_name}"
        # return f"aida-results/{minio_client.put_object('aida-results', f'result_{generate_uuid()}.png', io.BytesIO(image_bytes), len(image_bytes), content_type='image/png').object_name}"

        # object_name = f'result_{generate_uuid()}.png'
        # response = s3.put_object(Bucket="aida-results", Key=object_name, Body=data, ContentType='image/png')
        # object_url = f"aida-results/{object_name}"
        # if response['ResponseMetadata']['HTTPStatusCode'] == 200:
        #     return object_url
        # else:
        #     return ""

    except Exception as e:
        # Broad best-effort catch: failures are logged, caller receives None.
        logging.warning(f"synthesis runtime exception : {e}")
|
||||
|
||||
|
||||
def synthesis_single(front_image, back_image):
    """Overlay ``back_image`` onto ``front_image`` and upload the PNG to OSS.

    Args:
        front_image: PIL RGBA image used as the base layer.
        back_image: optional PIL RGBA image pasted on top using its own alpha.

    Returns:
        "<bucket>/<object>" path of the uploaded composite PNG.

    NOTE(review): if front_image is falsy, result_image stays None and the
    paste()/save() calls below raise AttributeError — confirm callers always
    pass a front image.
    """
    result_image = None
    if front_image:
        result_image = front_image
    if back_image:
        # paste() draws in place, using back_image's alpha as the mask.
        result_image.paste(back_image, (0, 0), back_image)

    # with io.BytesIO() as output:
    #     result_image.save(output, format='PNG')
    #     data = output.getvalue()
    # object_name = f'result_{generate_uuid()}.png'
    # response = s3.put_object(Bucket="aida-results", Key=object_name, Body=data, ContentType='image/png')
    # object_url = f"aida-results/{object_name}"
    # if response['ResponseMetadata']['HTTPStatusCode'] == 200:
    #     return object_url
    # else:
    #     return ""
    image_data = io.BytesIO()
    result_image.save(image_data, format='PNG')
    image_data.seek(0)
    image_bytes = image_data.read()
    # return f"aida-results/{minio_client.put_object('aida-results', f'result_{generate_uuid()}.png', io.BytesIO(image_bytes), len(image_bytes), content_type='image/png').object_name}"
    # oss upload
    bucket_name = 'aida-results'
    object_name = f'result_{generate_uuid()}.png'
    req = oss_upload_image(bucket=bucket_name, object_name=object_name, image_bytes=image_bytes)
    return f"{bucket_name}/{object_name}"
|
||||
55
app/service/design/design_batch/items/utils/upload_image.py
Normal file
55
app/service/design/design_batch/items/utils/upload_image.py
Normal file
@@ -0,0 +1,55 @@
|
||||
#!/usr/bin/env python
|
||||
# -*- coding: UTF-8 -*-
|
||||
"""
|
||||
@Project :trinity_client
|
||||
@File :upload_image.py
|
||||
@Author :周成融
|
||||
@Date :2023/8/28 13:49:20
|
||||
@detail :
|
||||
"""
|
||||
import io
|
||||
import logging
|
||||
|
||||
import cv2
|
||||
|
||||
from app.core.config import *
|
||||
from app.service.utils.decorator import RunTime
|
||||
from app.service.utils.oss_client import oss_upload_image
|
||||
|
||||
|
||||
# @RunTime
def upload_png_mask(front_image, object_name, mask=None):
    """Upload a PIL image (and optionally its mask) as PNGs to the clothing bucket.

    Args:
        front_image: PIL image; stored as ``image/image_<object_name>.png``.
        object_name: unique name fragment (typically a uuid string).
        mask: optional 3-channel ndarray; inverted, made transparent where
            black, and stored as ``mask/mask_<object_name>.png``.

    Returns:
        ``(front_image, image_url, mask_url)`` on success; ``mask_url`` is
        None when no mask was given.

    NOTE(review): on any exception this logs and implicitly returns None,
    which breaks callers that tuple-unpack the result — confirm intended.
    """
    try:
        mask_url = None
        if mask is not None:
            mask_inverted = cv2.bitwise_not(mask)
            # Convert the 3-channel mask to 4 channels: white stays opaque,
            # black becomes fully transparent.
            rgba_image = cv2.cvtColor(mask_inverted, cv2.COLOR_BGR2BGRA)
            rgba_image[rgba_image[:, :, 0] == 0] = [0, 0, 0, 0]
            # image_bytes = io.BytesIO()
            # image_bytes.write(cv2.imencode('.png', rgba_image)[1].tobytes())
            # image_bytes.seek(0)
            # mask_url = f"{AIDA_CLOTHING}/{minio_client.put_object('aida-clothing', f'mask/mask_{object_name}.png', image_bytes, len(image_bytes.getvalue()), content_type='image/png').object_name}"
            # oss upload ####################
            req = oss_upload_image(bucket=AIDA_CLOTHING, object_name=f"mask/mask_{object_name}.png", image_bytes=cv2.imencode('.png', rgba_image)[1])
            mask_url = f"{AIDA_CLOTHING}/mask/mask_{object_name}.png"

        image_data = io.BytesIO()
        front_image.save(image_data, format='PNG')
        image_data.seek(0)
        image_bytes = image_data.read()
        # image_url = f"{AIDA_CLOTHING}/{minio_client.put_object('aida-clothing', f'image/image_{object_name}.png', io.BytesIO(image_bytes), len(image_bytes), content_type='image/png').object_name}"
        req = oss_upload_image(bucket=AIDA_CLOTHING, object_name=f"image/image_{object_name}.png", image_bytes=image_bytes)
        image_url = f"{AIDA_CLOTHING}/image/image_{object_name}.png"
        return front_image, image_url, mask_url
    except Exception as e:
        # Best-effort: log and fall through (returns None — see note above in
        # this docstring).
        logging.warning(f"upload_png_mask runtime exception : {e}")
|
||||
|
||||
|
||||
# @RunTime
|
||||
# def upload_png_mask(front_image, object_name, mask=None):
|
||||
# mask_url = None
|
||||
# if mask is not None:
|
||||
# mask_url = f"{AIDA_CLOTHING}/mask/mask_{object_name}.png"
|
||||
# image_url = f"{AIDA_CLOTHING}/image/image_{object_name}.png"
|
||||
# return front_image, image_url, mask_url
|
||||
Reference in New Issue
Block a user