From 44bb38094429e988d072d3c20984fd89040421f1 Mon Sep 17 00:00:00 2001 From: zhouchengrong Date: Thu, 12 Sep 2024 10:05:38 +0800 Subject: [PATCH] feat design batch fix --- .../request_data/requests_data.json | 90 +++ .../design/design_batch/items/__init__.py | 0 app/service/design/design_batch/items/item.py | 281 ++++++++++ .../design_batch/items/pipeline/__init__.py | 20 + .../design_batch/items/pipeline/color.py | 60 ++ .../items/pipeline/contour_detection.py | 37 ++ .../design_batch/items/pipeline/keypoint.py | 114 ++++ .../design_batch/items/pipeline/loading.py | 68 +++ .../items/pipeline/print_painting.py | 523 ++++++++++++++++++ .../design_batch/items/pipeline/scale.py | 49 ++ .../items/pipeline/segmentation.py | 67 +++ .../design_batch/items/pipeline/split.py | 71 +++ .../design_batch/items/utils/__init__.py | 0 .../items/utils/conversion_image.py | 31 ++ .../items/utils/design_ensemble.py | 143 +++++ .../design_batch/items/utils/redis_utils.py | 99 ++++ .../items/utils/synthesis_item.py | 181 ++++++ .../design_batch/items/utils/upload_image.py | 55 ++ 18 files changed, 1889 insertions(+) create mode 100644 app/design_batch/request_data/requests_data.json create mode 100644 app/service/design/design_batch/items/__init__.py create mode 100644 app/service/design/design_batch/items/item.py create mode 100644 app/service/design/design_batch/items/pipeline/__init__.py create mode 100644 app/service/design/design_batch/items/pipeline/color.py create mode 100644 app/service/design/design_batch/items/pipeline/contour_detection.py create mode 100644 app/service/design/design_batch/items/pipeline/keypoint.py create mode 100644 app/service/design/design_batch/items/pipeline/loading.py create mode 100644 app/service/design/design_batch/items/pipeline/print_painting.py create mode 100644 app/service/design/design_batch/items/pipeline/scale.py create mode 100644 app/service/design/design_batch/items/pipeline/segmentation.py create mode 100644 app/service/design/design_batch/items/pipeline/split.py create mode 100644 app/service/design/design_batch/items/utils/__init__.py create mode 100644 app/service/design/design_batch/items/utils/conversion_image.py create mode 100644 app/service/design/design_batch/items/utils/design_ensemble.py create mode 100644 app/service/design/design_batch/items/utils/redis_utils.py create mode 100644 app/service/design/design_batch/items/utils/synthesis_item.py create mode 100644 app/service/design/design_batch/items/utils/upload_image.py diff --git a/app/design_batch/request_data/requests_data.json b/app/design_batch/request_data/requests_data.json new file mode 100644 index 0000000..1dba8d1 --- /dev/null +++ b/app/design_batch/request_data/requests_data.json @@ -0,0 +1,90 @@ +{ + "objects": [ + { + "basic": { + "body_point_test": { + "waistband_right": [ + 201, + 242 + ], + "hand_point_right": [ + 222, + 312 + ], + "waistband_left": [ + 114, + 243 + ], + "hand_point_left": [ + 94, + 310 + ], + "shoulder_left": [ + 102, + 116 + ], + "shoulder_right": [ + 211, + 115 + ] + }, + "layer_order": true, + "scale_bag": 0.7, + "scale_earrings": 0.16, + "self_template": true, + "single_overall": "overall", + "switch_category": "" + }, + "items": [ + { + "businessId": 264931, + "color": "145 220 232", + "image_id": 96844, + "offset": [ + 0, + 0 + ], + "path": "aida-users/87/sketch/2aa7aad5-74bb-41fa-9cdf-f06611b3e89a-2-87.png", + "print": { + "element": { + "element_angle_list": [], + "element_path_list": [], + "element_scale_list": [], + "location": [] + }, + "overall": { + "location": [], + "print_angle_list": [], + "print_path_list": [], + "print_scale_list": [] + }, + "single": { + "location": [], + "print_angle_list": [], + "print_path_list": [], + "print_scale_list": [] + } + }, + "priority": 10, + "resize_scale": [ + 1.0, + 1.0 + ], + "type": "Dress" + }, + { + "body_path": "aida-sys-image/models/female/79805ec3-3f01-466d-91e0-36028d079699.png", + "image_id": 95444, + "type": "Body" + } + ] + } + + ], + "process_id": "87", + "tasks_id": , +} + + +//用 openai jsonl +// \ No newline at end of file diff --git a/app/service/design/design_batch/items/__init__.py b/app/service/design/design_batch/items/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/app/service/design/design_batch/items/item.py b/app/service/design/design_batch/items/item.py new file mode 100644 index 0000000..1e51fe8 --- /dev/null +++ b/app/service/design/design_batch/items/item.py @@ -0,0 +1,281 @@ +import time +from concurrent.futures import ThreadPoolExecutor +from pprint import pprint + +import cv2 + +from app.core.config import PRIORITY_DICT +from app.service.design.design_batch.items.pipeline import LoadImage, KeyPoint, Segmentation, Color, PrintPainting, Scaling, Split, ContourDetection, LoadBodyImage +from app.service.design.utils.synthesis_item import synthesis, synthesis_single + + +class BaseItem: + def __init__(self, data, basic): + self.result = data.copy() + self.result['name'] = data['type'].lower() + self.result.pop("type") + self.result.update(basic) + + +class TopItem(BaseItem): + def __init__(self, data, basic): + super().__init__(data, basic) + self.top_pipeline = [ + LoadImage(), + KeyPoint(), + Segmentation(), + Color(), + PrintPainting(), + Scaling(), + Split() + ] + + def process(self): + for item in self.top_pipeline: + self.result = item(self.result) + return self.result + + +class BottomItem(BaseItem): + def __init__(self, data, basic): + super().__init__(data, basic) + self.bottom_pipeline = [ + LoadImage(), + KeyPoint(), + ContourDetection(), + # Segmentation(), + Color(), + PrintPainting(), + Scaling(), + Split() + ] + + def process(self): + for item in self.bottom_pipeline: + self.result = item(self.result) + return self.result + + +class BodyItem(BaseItem): + def __init__(self, data, basic): + super().__init__(data, basic) + self.top_pipeline = [ + LoadBodyImage(), + ] + + def process(self): + for item in self.top_pipeline: + self.result = item(self.result) + return self.result + + +def process_item(item, basic): + if item['type'] == "Body": + body_server = BodyItem(data=item, basic=basic) + item_data = body_server.process() + elif item['type'].lower() in ['blouse', 'outwear', 'dress', 'tops']: + top_server = TopItem(data=item, basic=basic) + item_data = top_server.process() + else: + bottom_server = BottomItem(data=item, basic=basic) + item_data = bottom_server.process() + return item_data + + +def calculate_start_point(keypoint_type, scale, clothes_point, body_point, offset, resize_scale): + """ + Align left + Args: + keypoint_type: string, "waistband" | "shoulder" | "ear_point" + scale: float + clothes_point: dict{'left': [x1, y1, z1], 'right': [x2, y2, z2]} + body_point: dict, containing keypoint data of body figure + + Returns: + start_point: tuple (x', y') + x' = y_body - y1 * scale + offset + y' = x_body - x1 * scale + offset + + """ + side_indicator = f'{keypoint_type}_left' + start_point = ( + int(body_point[side_indicator][1] + offset[1] - int(clothes_point[side_indicator][0]) * scale), # y + int(body_point[side_indicator][0] + offset[0] - int(clothes_point[side_indicator][1]) * scale) # x + ) + return start_point + + +# 服装图层给数据组装 +def organize_clothing(layer): + # 起始坐标 + start_point = calculate_start_point(layer['keypoint'], layer['scale'], layer['clothes_keypoint'], layer['body_point_test'], layer["offset"], layer["resize_scale"]) + # 前片数据 + front_layer = dict(priority=layer['priority'] if layer.get("layer_order", False) else PRIORITY_DICT.get(f'{layer["name"].lower()}_front', None), + name=f'{layer["name"].lower()}_front', + image=layer["front_image"], + # mask_image=layer['front_mask_image'], + image_url=layer['front_image_url'], + mask_url=layer['mask_url'], + sacle=layer['scale'], + clothes_keypoint=layer['clothes_keypoint'], + position=start_point, + resize_scale=layer["resize_scale"], + mask=cv2.resize(layer['mask'], layer["front_image"].size), + gradient_string=layer['gradient_string'] if 'gradient_string' in layer.keys() else "", + pattern_image_url=layer['pattern_image_url'], + pattern_image=layer['pattern_image'] + + ) + # 后片数据 + back_layer = dict(priority=-layer.get("priority", 0) if layer.get("layer_order", False) else PRIORITY_DICT.get(f'{layer["name"].lower()}_back', None), + name=f'{layer["name"].lower()}_back', + image=layer["back_image"], + # mask_image=layer['back_mask_image'], + image_url=layer['back_image_url'], + mask_url=layer['mask_url'], + sacle=layer['scale'], + clothes_keypoint=layer['clothes_keypoint'], + position=start_point, + resize_scale=layer["resize_scale"], + mask=cv2.resize(layer['mask'], layer["front_image"].size), + gradient_string=layer['gradient_string'] if 'gradient_string' in layer.keys() else "", + pattern_image_url=layer['pattern_image_url'], + ) + return front_layer, back_layer + + +# 模特图层给数据组装 +def organize_body(layer): + body_layer = dict(priority=0, + name=layer["name"].lower(), + image=layer['body_image'], + image_url=layer['body_path'], + mask_image=None, + mask_url=None, + sacle=1, + # mask=layer['body_mask'], + position=(0, 0)) + return body_layer + + +def process_layer(item, layers): + if item['name'] == "mannequin": + body_layer = organize_body(item) + layers.append(body_layer) + return item['body_image'].size + else: + front_layer, back_layer = organize_clothing(item) + layers.append(front_layer) + layers.append(back_layer) + + +def process_object(object_data): + basic = object_data['basic'] + items_response = {'layers': []} + + if basic['single_overall'] == "overall": + item_results = [process_item(item, basic) for item in object_data['items']] + layers = [] + futures = [] + body_size = None + for item in item_results: + futures = [process_layer(item, layers)] + for future in futures: + if future is not None: + body_size = future + layers = sorted(layers, key=lambda s: s.get("priority", float('inf'))) + + layers, new_size = update_base_size_priority(layers, body_size) + + for lay in layers: + items_response['layers'].append({ + 'image_category': lay['name'], + 'position': lay['position'], + 'priority': lay.get("priority", None), + 'resize_scale': lay['resize_scale'] if "resize_scale" in lay.keys() else None, + 'image_size': lay['image'] if lay['image'] is None else lay['image'].size, + 'gradient_string': lay['gradient_string'] if 'gradient_string' in lay.keys() else "", + 'mask_url': lay['mask_url'], + 'image_url': lay['image_url'] if 'image_url' in lay.keys() else None, + 'pattern_image_url': lay['pattern_image_url'] if 'pattern_image_url' in lay.keys() else None, + + # 'image': lay['image'], + # 'mask_image': lay['mask_image'], + }) + items_response['synthesis_url'] = synthesis(layers, new_size, basic) + else: + item_results = process_item(object_data['items'][0], basic) + items_response['layers'].append({ + 'image_category': f"{item_results['name']}_front", + 'image_size': item_results['back_image'].size if item_results['back_image'] else None, + 'position': None, + 'priority': 0, + 'image_url': item_results['front_image_url'], + 'mask_url': item_results['mask_url'], + "gradient_string": item_results['gradient_string'] if 'gradient_string' in item_results.keys() else "", + 'pattern_image_url': item_results['pattern_image_url'] if 'pattern_image_url' in item_results.keys() else None, + + }) + items_response['layers'].append({ + 'image_category': f"{item_results['name']}_back", + 'image_size': item_results['front_image'].size if item_results['front_image'] else None, + 'position': None, + 'priority': 0, + 'image_url': item_results['back_image_url'], + 'mask_url': item_results['mask_url'], + "gradient_string": item_results['gradient_string'] if 'gradient_string' in item_results.keys() else "", + 'pattern_image_url': item_results['pattern_image_url'] if 'pattern_image_url' in item_results.keys() else None, + + }) + items_response['synthesis_url'] = synthesis_single(item_results['front_image'], item_results['back_image']) + return items_response + + +def update_base_size_priority(layers, size): + # 计算透明背景图片的宽度 + min_x = min(info['position'][1] for info in layers) + x_list = [] + for info in layers: + if info['image'] is not None: + x_list.append(info['position'][1] + info['image'].width) + max_x = max(x_list) + new_width = max_x - min_x + new_height = 700 + # 更新坐标 + for info in layers: + info['adaptive_position'] = (info['position'][0], info['position'][1] - min_x) + return layers, (new_width, new_height) + + +def run(): + object = {"objects": [{"basic": {"body_point_test": {"waistband_right": [199, 239], "hand_point_right": [220, 308], "waistband_left": [113, 239], "hand_point_left": [92, 310], "shoulder_left": [99, 111], "shoulder_right": [214, 111]}, "layer_order": False, "scale_bag": 0.7, "scale_earrings": 0.16, "self_template": True, "single_overall": "single", "switch_category": "Outwear"}, "items": [ + {"color": "189 112 112", "icon": "none", "image_id": 116441, "offset": [1, 1], "path": "aida-sys-image/images/female/outwear/outwear_p3139.jpg", "print": {"element": {"element_angle_list": [], "element_path_list": [], "element_scale_list": [], "location": []}, "overall": {"location": [[0.0, 0.0]], "print_angle_list": [0.0, 0.0], "print_path_list": [], "print_scale_list": [0.0, 0.0]}, "single": {"location": [], "print_angle_list": [], "print_path_list": [], "print_scale_list": []}}, + "resize_scale": [1.0, 1.0], "type": "Outwear"}]}, {"basic": {"body_point_test": {"waistband_right": [199, 239], "hand_point_right": [220, 308], "waistband_left": [113, 239], "hand_point_left": [92, 310], "shoulder_left": [99, 111], "shoulder_right": [214, 111]}, "layer_order": False, "scale_bag": 0.7, "scale_earrings": 0.16, "self_template": True, "single_overall": "single", "switch_category": "Outwear"}, "items": [ + {"color": "189 112 112", "icon": "none", "image_id": 81518, "offset": [1, 1], "path": "aida-sys-image/images/female/outwear/0628000071.jpg", "print": {"element": {"element_angle_list": [], "element_path_list": [], "element_scale_list": [], "location": []}, "overall": {"location": [[0.0, 0.0]], "print_angle_list": [0.0, 0.0], "print_path_list": [], "print_scale_list": [0.0, 0.0]}, "single": {"location": [], "print_angle_list": [], "print_path_list": [], "print_scale_list": []}}, + "resize_scale": [1.0, 1.0], "type": "Outwear"}]}, {"basic": {"body_point_test": {"waistband_right": [199, 239], "hand_point_right": [220, 308], "waistband_left": [113, 239], "hand_point_left": [92, 310], "shoulder_left": [99, 111], "shoulder_right": [214, 111]}, "layer_order": False, "scale_bag": 0.7, "scale_earrings": 0.16, "self_template": True, "single_overall": "single", "switch_category": "Outwear"}, "items": [ + {"color": "189 112 112", "icon": "none", "image_id": 65687, "offset": [1, 1], "path": "aida-sys-image/images/female/outwear/outwear_746.jpg", "print": {"element": {"element_angle_list": [], "element_path_list": [], "element_scale_list": [], "location": []}, "overall": {"location": [[0.0, 0.0]], "print_angle_list": [0.0, 0.0], "print_path_list": [], "print_scale_list": [0.0, 0.0]}, "single": {"location": [], "print_angle_list": [], "print_path_list": [], "print_scale_list": []}}, + "resize_scale": [1.0, 1.0], "type": "Outwear"}]}, {"basic": {"body_point_test": {"waistband_right": [199, 239], "hand_point_right": [220, 308], "waistband_left": [113, 239], "hand_point_left": [92, 310], "shoulder_left": [99, 111], "shoulder_right": [214, 111]}, "layer_order": False, "scale_bag": 0.7, "scale_earrings": 0.16, "self_template": True, "single_overall": "single", "switch_category": "Outwear"}, "items": [ + {"color": "189 112 112", "icon": "none", "image_id": 90051, "offset": [1, 1], "path": "aida-sys-image/images/female/outwear/0628000864.jpg", "print": {"element": {"element_angle_list": [], "element_path_list": [], "element_scale_list": [], "location": []}, "overall": {"location": [[0.0, 0.0]], "print_angle_list": [0.0, 0.0], "print_path_list": [], "print_scale_list": [0.0, 0.0]}, "single": {"location": [], "print_angle_list": [], "print_path_list": [], "print_scale_list": []}}, + "resize_scale": [1.0, 1.0], "type": "Outwear"}]}, {"basic": {"body_point_test": {"waistband_right": [199, 239], "hand_point_right": [220, 308], "waistband_left": [113, 239], "hand_point_left": [92, 310], "shoulder_left": [99, 111], "shoulder_right": [214, 111]}, "layer_order": False, "scale_bag": 0.7, "scale_earrings": 0.16, "self_template": True, "single_overall": "single", "switch_category": "Outwear"}, "items": [ + {"color": "189 112 112", "icon": "none", "image_id": 67420, "offset": [1, 1], "path": "aida-sys-image/images/female/outwear/0825001648.jpg", "print": {"element": {"element_angle_list": [], "element_path_list": [], "element_scale_list": [], "location": []}, "overall": {"location": [[0.0, 0.0]], "print_angle_list": [0.0, 0.0], "print_path_list": [], "print_scale_list": [0.0, 0.0]}, "single": {"location": [], "print_angle_list": [], "print_path_list": [], "print_scale_list": []}}, + "resize_scale": [1.0, 1.0], "type": "Outwear"}]}, {"basic": {"body_point_test": {"waistband_right": [199, 239], "hand_point_right": [220, 308], "waistband_left": [113, 239], "hand_point_left": [92, 310], "shoulder_left": [99, 111], "shoulder_right": [214, 111]}, "layer_order": False, "scale_bag": 0.7, "scale_earrings": 0.16, "self_template": True, "single_overall": "single", "switch_category": "Outwear"}, "items": [ + {"color": "189 112 112", "icon": "none", "image_id": 90354, "offset": [1, 1], "path": "aida-sys-image/images/female/outwear/0628001300.jpg", "print": {"element": {"element_angle_list": [], "element_path_list": [], "element_scale_list": [], "location": []}, "overall": {"location": [[0.0, 0.0]], "print_angle_list": [0.0, 0.0], "print_path_list": [], "print_scale_list": [0.0, 0.0]}, "single": {"location": [], "print_angle_list": [], "print_path_list": [], "print_scale_list": []}}, + "resize_scale": [1.0, 1.0], "type": "Outwear"}]}, {"basic": {"body_point_test": {"waistband_right": [199, 239], "hand_point_right": [220, 308], "waistband_left": [113, 239], "hand_point_left": [92, 310], "shoulder_left": [99, 111], "shoulder_right": [214, 111]}, "layer_order": False, "scale_bag": 0.7, "scale_earrings": 0.16, "self_template": True, "single_overall": "single", "switch_category": "Outwear"}, "items": [ + {"color": "189 112 112", "icon": "none", "image_id": 67420, "offset": [1, 1], "path": "aida-sys-image/images/female/outwear/0825001648.jpg", "print": {"element": {"element_angle_list": [], "element_path_list": [], "element_scale_list": [], "location": []}, "overall": {"location": [[0.0, 0.0]], "print_angle_list": [0.0, 0.0], "print_path_list": [], "print_scale_list": [0.0, 0.0]}, "single": {"location": [], "print_angle_list": [], "print_path_list": [], "print_scale_list": []}}, + "resize_scale": [1.0, 1.0], "type": "Outwear"}]}, {"basic": {"body_point_test": {"waistband_right": [199, 239], "hand_point_right": [220, 308], "waistband_left": [113, 239], "hand_point_left": [92, 310], "shoulder_left": [99, 111], "shoulder_right": [214, 111]}, "layer_order": False, "scale_bag": 0.7, "scale_earrings": 0.16, "self_template": True, "single_overall": "single", "switch_category": "Outwear"}, "items": [ + {"color": "189 112 112", "icon": "none", "image_id": 101477, "offset": [1, 1], "path": "aida-sys-image/images/female/outwear/903000063.jpg", "print": {"element": {"element_angle_list": [], "element_path_list": [], "element_scale_list": [], "location": []}, "overall": {"location": [[0.0, 0.0]], "print_angle_list": [0.0, 0.0], "print_path_list": [], "print_scale_list": [0.0, 0.0]}, "single": {"location": [], "print_angle_list": [], "print_path_list": [], "print_scale_list": []}}, + "resize_scale": [1.0, 1.0], "type": "Outwear"}]}], "process_id": "3615898424593104"} + + object_result = {} + with ThreadPoolExecutor() as executor: + results = list(executor.map(process_object, object['objects'])) + for i, result in enumerate(results): + object_result[i] = result + + pprint(object_result) + + +if __name__ == '__main__': + start_time = time.time() + run() + print(time.time() - start_time) diff --git a/app/service/design/design_batch/items/pipeline/__init__.py b/app/service/design/design_batch/items/pipeline/__init__.py new file mode 100644 index 0000000..ec55933 --- /dev/null +++ b/app/service/design/design_batch/items/pipeline/__init__.py @@ -0,0 +1,20 @@ +from .color import Color +from .contour_detection import ContourDetection +from .keypoint import KeyPoint +from .keypoint import KeyPoint +from .loading import LoadImage, LoadBodyImage +from .print_painting import PrintPainting +from .scale import Scaling +from .segmentation import Segmentation +from .split import Split + +__all__ = [ + 'LoadBodyImage', 'LoadImage', + 'KeyPoint', + 'ContourDetection', + 'Segmentation', + 'Color', + 'PrintPainting', + 'Scaling', + 'Split' +] diff --git a/app/service/design/design_batch/items/pipeline/color.py b/app/service/design/design_batch/items/pipeline/color.py new file mode 100644 index 0000000..bc3676f --- /dev/null +++ b/app/service/design/design_batch/items/pipeline/color.py @@ -0,0 +1,60 @@ +import logging + +import cv2 +import numpy as np + +from app.service.utils.oss_client import oss_get_image + +logger = logging.getLogger() + + +class Color: + def __call__(self, result): + dim_image_h, dim_image_w = result['image'].shape[0:2] + if "gradient" in result.keys() and result['gradient'] != "": + bucket_name = result['gradient'].split('/')[0] + object_name = result['gradient'][result['gradient'].find('/') + 1:] + pattern = self.get_gradient(bucket_name=bucket_name, object_name=object_name) + resize_pattern = cv2.resize(pattern, (dim_image_w, dim_image_h), interpolation=cv2.INTER_AREA) + else: + pattern = self.get_pattern(result['color']) + resize_pattern = cv2.resize(pattern, (dim_image_w, dim_image_h), interpolation=cv2.INTER_AREA) + closed_mo = np.expand_dims(result['mask'], axis=2).repeat(3, axis=2) + gray_mo = np.expand_dims(result['gray'], axis=2).repeat(3, axis=2) + get_image_fir = resize_pattern * (closed_mo / 255) * (gray_mo / 255) + result['pattern_image'] = get_image_fir.astype(np.uint8) + result['final_image'] = result['pattern_image'] + canvas = np.full_like(result['final_image'], 255) + temp_bg = np.expand_dims(cv2.bitwise_not(result['mask']), axis=2).repeat(3, axis=2) + tmp1 = (canvas * (temp_bg / 255)).astype(np.uint8) + temp_fg = np.expand_dims(result['mask'], axis=2).repeat(3, axis=2) + tmp2 = (result['final_image'] * (temp_fg / 255)).astype(np.uint8) + result['single_image'] = cv2.add(tmp1, tmp2) + result['alpha'] = 100 / 255.0 + return result + + @staticmethod + def get_gradient(bucket_name, object_name): + # 获取渐变色图案 + image = oss_get_image(bucket=bucket_name, object_name=object_name, data_type="cv2") + if image.shape[2] == 4: + image = cv2.cvtColor(image, cv2.COLOR_BGRA2BGR) + return image + + @staticmethod + def crop_image(image, image_size_h, image_size_w): + x_offset = np.random.randint(low=0, high=int(image_size_h / 5) - 6) + y_offset = np.random.randint(low=0, high=int(image_size_w / 5) - 6) + image = image[x_offset: x_offset + image_size_h, y_offset: y_offset + image_size_w, :] + return image + + @staticmethod + def get_pattern(single_color): + if single_color is None: + raise False + R, G, B = single_color.split(' ') + pattern = np.zeros([1, 1, 3], np.uint8) + pattern[0, 0, 0] = int(B) + pattern[0, 0, 1] = int(G) + pattern[0, 0, 2] = int(R) + return pattern diff --git a/app/service/design/design_batch/items/pipeline/contour_detection.py b/app/service/design/design_batch/items/pipeline/contour_detection.py new file mode 100644 index 0000000..2b76c0b --- /dev/null +++ b/app/service/design/design_batch/items/pipeline/contour_detection.py @@ -0,0 +1,37 @@ +import cv2 +import numpy as np + + +class ContourDetection: + def __call__(self, result): + Contour = self.get_contours(result['image']) + Mask = np.zeros(result['image'].shape[:2], np.uint8) + if len(Contour): + Max_contour = Contour[0] + Epsilon = 0.001 * cv2.arcLength(Max_contour, True) + Approx = cv2.approxPolyDP(Max_contour, Epsilon, True) + cv2.drawContours(Mask, [Approx], -1, 255, -1) + else: + Mask = np.ones(result['image'].shape[:2], np.uint8) * 255 + # TODO 修复部分图片出现透明的情况 下版本上线 + # img2gray = cv2.cvtColor(result['image'], cv2.COLOR_BGR2GRAY) + # ret, Mask = cv2.threshold(img2gray, 126, 255, cv2.THRESH_BINARY) + # Mask = cv2.bitwise_not(Mask) + if result['pre_mask'] is None: + result['mask'] = Mask + else: + result['mask'] = cv2.bitwise_and(Mask, result['pre_mask']) + result['front_mask'] = result['mask'] + result['back_mask'] = result['mask'] + return result + + @staticmethod + def get_contours(image): + gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY) + Edge = cv2.Canny(gray, 10, 150) + kernel = np.ones((5, 5), np.uint8) + Edge = cv2.dilate(Edge, kernel=kernel, iterations=1) + Edge = cv2.erode(Edge, kernel=kernel, iterations=1) + Contour, _ = cv2.findContours(Edge, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE) + Contour = sorted(Contour, key=cv2.contourArea, reverse=True) + return Contour diff --git a/app/service/design/design_batch/items/pipeline/keypoint.py b/app/service/design/design_batch/items/pipeline/keypoint.py new file mode 100644 index 0000000..243cf4e --- /dev/null +++ b/app/service/design/design_batch/items/pipeline/keypoint.py @@ -0,0 +1,114 @@ +import logging + +import numpy as np +from pymilvus import MilvusClient + +from app.core.config import * +from app.service.design.utils.design_ensemble import get_keypoint_result + +logger = logging.getLogger(__name__) + + +class KeyPoint: + name = "KeyPoint" + + @classmethod + def get_name(cls): + return cls.name + + def __call__(self, result): + if result['name'] in ['blouse', 'skirt', 'dress', 'outwear', 'trousers', 'tops', 'bottoms']: # 查询是否有数据 且类别相同 相同则直接读 不同则推理后更新 + # result['clothes_keypoint'] = self.infer_keypoint_result(result) + site = 'up' if result['name'] in ['blouse', 'outwear', 'dress', 'tops'] else 'down' + # keypoint_cache = search_keypoint_cache(result["image_id"], site) + keypoint_cache = self.keypoint_cache(result, site) + # 取消向量查询 直接过模型推理 + # keypoint_cache = False + if keypoint_cache is False: + keypoint_infer_result, site = self.infer_keypoint_result(result) + result['clothes_keypoint'] = self.save_keypoint_cache(result["image_id"], keypoint_infer_result, site) + else: + result['clothes_keypoint'] = keypoint_cache + return result + + @staticmethod + def infer_keypoint_result(result): + site = 'up' if result['name'] in ['blouse', 'outwear', 'dress', 'tops'] else 'down' + keypoint_infer_result = get_keypoint_result(result["image"], site) # 推理结果 + return keypoint_infer_result, site + + @staticmethod + def save_keypoint_cache(keypoint_id, cache, site): + if site == "down": + zeros = np.zeros(20, dtype=int) + result = np.concatenate([zeros, cache.flatten()]) + else: + zeros = np.zeros(4, dtype=int) + result = np.concatenate([cache.flatten(), zeros]) + # 取消向量保存 直接拿结果 + data = [ + {"keypoint_id": keypoint_id, + "keypoint_site": site, + "keypoint_vector": result.tolist() + } + ] + try: + client = MilvusClient(uri=MILVUS_URL, token=MILVUS_TOKEN, db_name=MILVUS_ALIAS) + res = client.upsert(collection_name=MILVUS_TABLE_KEYPOINT, data=data) + client.close() + return dict(zip(KEYPOINT_RESULT_TABLE_FIELD_SET, result.reshape(12, 2).astype(int).tolist())) + except Exception as e: + logger.info(f"save keypoint cache milvus error : {e}") + return dict(zip(KEYPOINT_RESULT_TABLE_FIELD_SET, result.reshape(12, 2).astype(int).tolist())) + + @staticmethod + def update_keypoint_cache(keypoint_id, infer_result, search_result, site): + if site == "up": + # 需要的是up 即推理出来的是up 那么查询的就是down + result = np.concatenate([infer_result.flatten(), search_result[-4:]]) + else: + # 需要的是down 即推理出来的是down 那么查询的就是up + result = np.concatenate([search_result[:20], infer_result.flatten()]) + data = [ + {"keypoint_id": keypoint_id, + "keypoint_site": "all", + "keypoint_vector": result.tolist() + } + ] + + try: + client = MilvusClient(uri=MILVUS_URL, token=MILVUS_TOKEN, db_name=MILVUS_ALIAS) + client.upsert( + collection_name=MILVUS_TABLE_KEYPOINT, + data=data + ) + return dict(zip(KEYPOINT_RESULT_TABLE_FIELD_SET, result.reshape(12, 2).astype(int).tolist())) + except Exception as e: + logger.info(f"save keypoint cache milvus error : {e}") + return dict(zip(KEYPOINT_RESULT_TABLE_FIELD_SET, result.reshape(12, 2).astype(int).tolist())) + + # @ RunTime + def keypoint_cache(self, result, site): + try: + client = MilvusClient(uri=MILVUS_URL, token=MILVUS_TOKEN, db_name=MILVUS_ALIAS) + keypoint_id = result['image_id'] + res = client.query( + collection_name=MILVUS_TABLE_KEYPOINT, + # ids=[keypoint_id], + filter=f"keypoint_id == {keypoint_id}", + output_fields=['keypoint_vector', 'keypoint_site'] + ) + if len(res) == 0: + # 没有结果 直接推理拿结果 并保存 + keypoint_infer_result, site = self.infer_keypoint_result(result) + return self.save_keypoint_cache(result['image_id'], keypoint_infer_result, site) + elif res[0]["keypoint_site"] == "all" or res[0]["keypoint_site"] == site: + # 需要的类型和查询的类型一致,或者查询的类型为all 则直接返回查询的结果 + return dict(zip(KEYPOINT_RESULT_TABLE_FIELD_SET, np.array(res[0]['keypoint_vector']).astype(int).reshape(12, 2).tolist())) + elif res[0]["keypoint_site"] != site: + # 需要的类型和查询到的不一致,则更新类型为all + keypoint_infer_result, site = self.infer_keypoint_result(result) + return self.update_keypoint_cache(result["image_id"], keypoint_infer_result, res[0]['keypoint_vector'], site) + except Exception as e: + logger.info(f"search keypoint cache milvus error {e}") + return False diff --git a/app/service/design/design_batch/items/pipeline/loading.py b/app/service/design/design_batch/items/pipeline/loading.py new file mode 100644 index 0000000..8786db0 --- /dev/null +++ b/app/service/design/design_batch/items/pipeline/loading.py @@ -0,0 +1,68 @@ +import cv2 + +from app.service.utils.oss_client import oss_get_image + + +class LoadBodyImage: + name = "LoadBodyImage" + + @classmethod + def get_name(cls): + return cls.name + + def __call__(self, result): + result["name"] = "mannequin" + result['body_image'] = oss_get_image(bucket=result['body_path'].split("/", 1)[0], object_name=result['body_path'].split("/", 1)[1], data_type="PIL") + return result + + +class LoadImage: + name = "LoadImage" + + @classmethod + def get_name(cls): + return cls.name + + def __call__(self, result): + result['image'], result['pre_mask'] = self.read_image(result['path']) + result['gray'] = cv2.cvtColor(result['image'], cv2.COLOR_BGR2GRAY) + result['keypoint'] = self.get_keypoint(result['name']) + result['img_shape'] = result['image'].shape + result['ori_shape'] = result['image'].shape + return result + + @staticmethod + def read_image(image_path): + image_mask = None + image = oss_get_image(bucket=image_path.split("/", 1)[0], object_name=image_path.split("/", 1)[1], data_type="cv2") + if len(image.shape) == 2: + image = cv2.cvtColor(image, cv2.COLOR_GRAY2RGB) + if image.shape[2] == 4: # 如果是四通道 mask + image_mask = image[:, :, 3] + image = image[:, :, :3] + + if image.shape[:2] <= (50, 50): + # 计算新尺寸 + new_size = (image.shape[1] * 2, image.shape[0] * 2) + # 调整大小 + image = cv2.resize(image, new_size, interpolation=cv2.INTER_LINEAR) + return image, image_mask + + @staticmethod + def get_keypoint(name): + if name == 'blouse' or name == 'outwear' or name == 'dress' or name == 'tops': + keypoint = 'shoulder' + elif name == 'trousers' or name == 'skirt' or name == 'bottoms': + keypoint = 'waistband' + elif name == 'bag': + keypoint = 'hand_point' + elif name == 'shoes': + keypoint = 'toe' + elif name == 'hairstyle': + keypoint = 'head_point' + elif name == 'earring': + keypoint = 'ear_point' + else: + raise KeyError(f"{name} does not belong to item category list: blouse, outwear, dress, trousers, skirt, " + f"bag, shoes, hairstyle, earring.") + return keypoint diff --git a/app/service/design/design_batch/items/pipeline/print_painting.py b/app/service/design/design_batch/items/pipeline/print_painting.py new file mode 100644 index 0000000..a620872 --- /dev/null +++ b/app/service/design/design_batch/items/pipeline/print_painting.py @@ -0,0 +1,523 @@ +import random + +import cv2 +import numpy as np +from PIL import Image + +from app.service.utils.oss_client import oss_get_image + + +class PrintPainting: + def __call__(self, result): + single_print = result['print']['single'] + overall_print = result['print']['overall'] + element_print = result['print']['element'] + result['single_image'] = None + result['print_image'] = None + if overall_print['print_path_list']: + painting_dict = {'dim_image_h': result['pattern_image'].shape[0], 'dim_image_w': result['pattern_image'].shape[1]} + result['print_image'] = result['pattern_image'] + if "print_angle_list" in overall_print.keys() and overall_print['print_angle_list'][0] != 0: + painting_dict = self.painting_collection(painting_dict, overall_print, print_trigger=True) + painting_dict['tile_print'] = self.rotate_crop_image(img=painting_dict['tile_print'], angle=-overall_print['print_angle_list'][0], crop=True) + painting_dict['mask_inv_print'] = self.rotate_crop_image(img=painting_dict['mask_inv_print'], angle=-overall_print['print_angle_list'][0], crop=True) + + # resize 到sketch大小 + painting_dict['tile_print'] = self.resize_and_crop(img=painting_dict['tile_print'], target_width=painting_dict['dim_image_w'], target_height=painting_dict['dim_image_h']) + painting_dict['mask_inv_print'] = self.resize_and_crop(img=painting_dict['mask_inv_print'], target_width=painting_dict['dim_image_w'], target_height=painting_dict['dim_image_h']) + else: + painting_dict = self.painting_collection(painting_dict, overall_print, print_trigger=True, is_single=False) + result['print_image'] = self.printpaint(result, painting_dict, print_=True) + result['single_image'] = result['final_image'] = result['pattern_image'] = result['print_image'] + + if single_print['print_path_list']: + print_background = np.zeros((result['pattern_image'].shape[0], result['pattern_image'].shape[1], 3), dtype=np.uint8) + mask_background = np.zeros((result['pattern_image'].shape[0], result['pattern_image'].shape[1], 3), dtype=np.uint8) + for i in range(len(single_print['print_path_list'])): + image, image_mode = self.read_image(single_print['print_path_list'][i]) + if image_mode == "RGBA": + new_size = (int(image.width * single_print['print_scale_list'][i]), int(image.height * single_print['print_scale_list'][i])) + + mask = image.split()[3] + resized_source = image.resize(new_size) + resized_source_mask = mask.resize(new_size) + + rotated_resized_source = resized_source.rotate(-single_print['print_angle_list'][i]) + rotated_resized_source_mask = resized_source_mask.rotate(-single_print['print_angle_list'][i]) + + source_image_pil = Image.fromarray(cv2.cvtColor(print_background, cv2.COLOR_BGR2RGB)) + source_image_pil_mask = Image.fromarray(cv2.cvtColor(mask_background, cv2.COLOR_BGR2RGB)) + + source_image_pil.paste(rotated_resized_source, (int(single_print['location'][i][0]), int(single_print['location'][i][1])), rotated_resized_source) + source_image_pil_mask.paste(rotated_resized_source_mask, (int(single_print['location'][i][0]), int(single_print['location'][i][1])), rotated_resized_source_mask) + + print_background = cv2.cvtColor(np.array(source_image_pil), cv2.COLOR_RGBA2BGR) + mask_background = cv2.cvtColor(np.array(source_image_pil_mask), cv2.COLOR_RGBA2BGR) + ret, mask_background = cv2.threshold(mask_background, 124, 255, cv2.THRESH_BINARY) + else: + mask = self.get_mask_inv(image) + mask = np.expand_dims(mask, axis=2) + mask = cv2.cvtColor(mask, cv2.COLOR_GRAY2BGR) + mask = cv2.bitwise_not(mask) + # 旋转后的坐标需要重新算 + rotate_mask, _ = self.img_rotate(mask, single_print['print_angle_list'][i], single_print['print_scale_list'][i]) + rotate_image, rotated_new_size = self.img_rotate(image, single_print['print_angle_list'][i], single_print['print_scale_list'][i]) + # x, y = int(result['print']['location'][i][0] - rotated_new_size[0] - (rotate_mask.shape[0] - image.shape[0]) / 2), int(result['print']['location'][i][1] - rotated_new_size[1] - (rotate_mask.shape[1] - image.shape[1]) / 2) + x, y = int(single_print['location'][i][0] - rotated_new_size[0]), int(single_print['location'][i][1] - rotated_new_size[1]) + + image_x = print_background.shape[1] + image_y = print_background.shape[0] + print_x = rotate_image.shape[1] + print_y = rotate_image.shape[0] + + # 有bug + # if x + print_x > image_x: + # rotate_image = rotate_image[:, :x + print_x - image_x] + # rotate_mask = rotate_mask[:, :x + print_x - image_x] + # # + # if y + print_y > image_y: + # rotate_image = rotate_image[:y + print_y - image_y] + # rotate_mask = rotate_mask[:y + print_y - image_y] + + # 不能是并行 + # 当前第一轮的if (108以及115)是判断有没有过下界和右界。第二轮的是判断左上有没有超出。 如果这个样子的话,先裁了右边,再左移,region就会有问题 + # 先挪 再判断 最后裁剪 + + # 如果print旋转了 或者 print贴边了 则需要判断 判断左界和上界是否小于0 + if x <= 0: + rotate_image = rotate_image[:, -x:] + rotate_mask = rotate_mask[:, -x:] + start_x = x = 0 + else: + start_x = x + + if y <= 0: + rotate_image = rotate_image[-y:, :] + rotate_mask = rotate_mask[-y:, :] + start_y = y = 0 + else: + start_y = y + + # ------------------ + # 如果print-size大于image-size 则需要裁剪print + + if x + print_x > image_x: + rotate_image = rotate_image[:, :image_x - x] + rotate_mask = rotate_mask[:, :image_x - x] + + if y + print_y > image_y: + rotate_image = rotate_image[:image_y - y, :] + rotate_mask = rotate_mask[:image_y - y, :] + + # mask_background[start_y:y + rotate_mask.shape[0], start_x:x + rotate_mask.shape[1]] = cv2.bitwise_xor(mask_background[start_y:y + rotate_mask.shape[0], start_x:x + rotate_mask.shape[1]], rotate_mask) + # print_background[start_y:y + rotate_image.shape[0], start_x:x + rotate_image.shape[1]] = cv2.add(print_background[start_y:y + rotate_image.shape[0], start_x:x + rotate_image.shape[1]], rotate_image) + + # mask_background[start_y:y + rotate_mask.shape[0], start_x:x + rotate_mask.shape[1]] = rotate_mask + # print_background[start_y:y + rotate_image.shape[0], start_x:x + rotate_image.shape[1]] = rotate_image + mask_background = self.stack_prin(mask_background, result['pattern_image'], rotate_mask, start_y, y, start_x, x) + print_background = self.stack_prin(print_background, result['pattern_image'], rotate_image, start_y, y, start_x, x) + + # gray_image = cv2.cvtColor(mask_background, cv2.COLOR_BGR2GRAY) + # print_background = cv2.bitwise_and(print_background, print_background, mask=gray_image) + + print_mask = cv2.bitwise_and(result['mask'], cv2.cvtColor(mask_background, cv2.COLOR_BGR2GRAY)) + img_fg = cv2.bitwise_or(print_background, print_background, mask=print_mask) + img_bg = cv2.bitwise_and(result['pattern_image'], result['pattern_image'], mask=cv2.bitwise_not(print_mask)) + mask_mo = np.expand_dims(print_mask, axis=2).repeat(3, axis=2) + gray_mo = np.expand_dims(result['gray'], axis=2).repeat(3, axis=2) + img_fg = (img_fg * (mask_mo / 255) * (gray_mo / 255)).astype(np.uint8) + result['final_image'] = cv2.add(img_bg, img_fg) + canvas = np.full_like(result['final_image'], 255) + temp_bg = np.expand_dims(cv2.bitwise_not(result['mask']), axis=2).repeat(3, axis=2) + tmp1 = (canvas * (temp_bg / 255)).astype(np.uint8) + temp_fg = np.expand_dims(result['mask'], axis=2).repeat(3, axis=2) + tmp2 = (result['final_image'] * (temp_fg / 255)).astype(np.uint8) + result['single_image'] = cv2.add(tmp1, tmp2) + + if element_print['element_path_list']: + print_background = np.zeros((result['final_image'].shape[0], result['final_image'].shape[1], 3), dtype=np.uint8) + mask_background = np.zeros((result['final_image'].shape[0], result['final_image'].shape[1], 3), dtype=np.uint8) + for i in range(len(element_print['element_path_list'])): + image, image_mode = self.read_image(element_print['element_path_list'][i]) + if image_mode == "RGBA": + new_size = (int(image.width * element_print['element_scale_list'][i]), int(image.height * element_print['element_scale_list'][i])) + + mask = image.split()[3] + resized_source = image.resize(new_size) + resized_source_mask = mask.resize(new_size) + + rotated_resized_source = resized_source.rotate(-element_print['element_angle_list'][i]) + rotated_resized_source_mask = resized_source_mask.rotate(-element_print['element_angle_list'][i]) + + source_image_pil = Image.fromarray(cv2.cvtColor(print_background, cv2.COLOR_BGR2RGB)) + source_image_pil_mask = Image.fromarray(cv2.cvtColor(mask_background, cv2.COLOR_BGR2RGB)) + + source_image_pil.paste(rotated_resized_source, (int(element_print['location'][i][0]), int(element_print['location'][i][1])), rotated_resized_source) + source_image_pil_mask.paste(rotated_resized_source_mask, (int(element_print['location'][i][0]), int(element_print['location'][i][1])), rotated_resized_source_mask) + + print_background = cv2.cvtColor(np.array(source_image_pil), cv2.COLOR_RGBA2BGR) + mask_background = cv2.cvtColor(np.array(source_image_pil_mask), cv2.COLOR_RGBA2BGR) + else: + mask = self.get_mask_inv(image) + mask = np.expand_dims(mask, axis=2) + mask = cv2.cvtColor(mask, cv2.COLOR_GRAY2BGR) + mask = cv2.bitwise_not(mask) + # 旋转后的坐标需要重新算 + rotate_mask, _ = self.img_rotate(mask, element_print['element_angle_list'][i], element_print['element_scale_list'][i]) + rotate_image, rotated_new_size = self.img_rotate(image, element_print['element_angle_list'][i], element_print['element_scale_list'][i]) + # x, y = int(result['print']['location'][i][0] - rotated_new_size[0] - (rotate_mask.shape[0] - image.shape[0]) / 2), int(result['print']['location'][i][1] - rotated_new_size[1] - (rotate_mask.shape[1] - image.shape[1]) / 2) + x, y = int(element_print['location'][i][0] - rotated_new_size[0]), int(element_print['location'][i][1] - rotated_new_size[1]) + + image_x = print_background.shape[1] + image_y = print_background.shape[0] + print_x = rotate_image.shape[1] + print_y = rotate_image.shape[0] + + # 有bug + # if x + print_x > image_x: + # rotate_image = rotate_image[:, :x + print_x - image_x] + # rotate_mask = rotate_mask[:, :x + print_x - image_x] + # # + # if y + print_y > image_y: + # rotate_image = rotate_image[:y + print_y - image_y] + # rotate_mask = rotate_mask[:y + print_y - image_y] + + # 不能是并行 + # 当前第一轮的if (108以及115)是判断有没有过下界和右界。第二轮的是判断左上有没有超出。 如果这个样子的话,先裁了右边,再左移,region就会有问题 + # 先挪 再判断 最后裁剪 + + # 如果print旋转了 或者 print贴边了 则需要判断 判断左界和上界是否小于0 + if x <= 0: + rotate_image = rotate_image[:, -x:] + rotate_mask = rotate_mask[:, -x:] + start_x = x = 0 + else: + start_x = x + + if y <= 0: + rotate_image = rotate_image[-y:, :] + rotate_mask = rotate_mask[-y:, :] + start_y = y = 0 + else: + start_y = y + + # ------------------ + # 如果print-size大于image-size 则需要裁剪print + + if x + print_x > image_x: + rotate_image = rotate_image[:, :image_x - x] + rotate_mask = rotate_mask[:, :image_x - x] + + if y + print_y > image_y: + rotate_image = rotate_image[:image_y - y, :] + rotate_mask = rotate_mask[:image_y - y, :] + + # mask_background[start_y:y + rotate_mask.shape[0], start_x:x + rotate_mask.shape[1]] = cv2.bitwise_xor(mask_background[start_y:y + rotate_mask.shape[0], start_x:x + rotate_mask.shape[1]], rotate_mask) + # print_background[start_y:y + rotate_image.shape[0], start_x:x + rotate_image.shape[1]] = cv2.add(print_background[start_y:y + rotate_image.shape[0], start_x:x + rotate_image.shape[1]], rotate_image) + + # mask_background[start_y:y + rotate_mask.shape[0], start_x:x + rotate_mask.shape[1]] = rotate_mask + # print_background[start_y:y + rotate_image.shape[0], start_x:x + rotate_image.shape[1]] = rotate_image + mask_background = self.stack_prin(mask_background, result['pattern_image'], rotate_mask, start_y, y, start_x, x) + print_background = self.stack_prin(print_background, result['pattern_image'], rotate_image, start_y, y, start_x, x) + + # gray_image = cv2.cvtColor(mask_background, cv2.COLOR_BGR2GRAY) + # print_background = cv2.bitwise_and(print_background, print_background, mask=gray_image) + + print_mask = cv2.bitwise_and(result['mask'], cv2.cvtColor(mask_background, cv2.COLOR_BGR2GRAY)) + img_fg = cv2.bitwise_or(print_background, print_background, mask=print_mask) + # TODO element 丢失信息 + three_channel_image = cv2.merge([cv2.bitwise_not(print_mask), cv2.bitwise_not(print_mask), cv2.bitwise_not(print_mask)]) + img_bg = cv2.bitwise_and(result['final_image'], three_channel_image) + # mask_mo = np.expand_dims(print_mask, axis=2).repeat(3, axis=2) + # gray_mo = np.expand_dims(result['gray'], axis=2).repeat(3, axis=2) + # img_fg = (img_fg * (mask_mo / 255) * (gray_mo / 255)).astype(np.uint8) + result['final_image'] = cv2.add(img_bg, img_fg) + canvas = np.full_like(result['final_image'], 255) + temp_bg = np.expand_dims(cv2.bitwise_not(result['mask']), axis=2).repeat(3, axis=2) + tmp1 = (canvas * (temp_bg / 255)).astype(np.uint8) + temp_fg = np.expand_dims(result['mask'], axis=2).repeat(3, axis=2) + tmp2 = (result['final_image'] * (temp_fg / 255)).astype(np.uint8) + result['single_image'] = cv2.add(tmp1, tmp2) + return result + + @staticmethod + def stack_prin(print_background, pattern_image, rotate_image, start_y, y, start_x, x): + temp_print = np.zeros((pattern_image.shape[0], pattern_image.shape[1], 3), dtype=np.uint8) + temp_print[start_y:y + rotate_image.shape[0], start_x:x + rotate_image.shape[1]] = rotate_image + img2gray = cv2.cvtColor(temp_print, cv2.COLOR_BGR2GRAY) + ret, mask_ = cv2.threshold(img2gray, 1, 255, cv2.THRESH_BINARY) + mask_inv = cv2.bitwise_not(mask_) + img1_bg = cv2.bitwise_and(print_background, print_background, mask=mask_inv) + img2_fg = cv2.bitwise_and(temp_print, temp_print, mask=mask_) + print_background = img1_bg + img2_fg + return print_background + + def painting_collection(self, painting_dict, print_dict, print_trigger=False, is_single=False): + if print_trigger: + print_ = self.get_print(print_dict) + painting_dict['Trigger'] = not is_single + painting_dict['location'] = print_['location'] + single_mask_inv_print = self.get_mask_inv(print_['image']) + dim_max = max(painting_dict['dim_image_h'], painting_dict['dim_image_w']) + dim_pattern = (int(dim_max * print_['scale'] / 5), int(dim_max * print_['scale'] / 5)) + if not is_single: + self.random_seed = random.randint(0, 1000) + # 如果print 模式为overall 且 有角度的话 , 组合的print为正方形,方便裁剪 + if "print_angle_list" in print_dict.keys() and print_dict['print_angle_list'][0] != 0: + painting_dict['mask_inv_print'] = self.tile_image(single_mask_inv_print, dim_pattern, print_['scale'], dim_max, dim_max, painting_dict['location'], trigger=True) + painting_dict['tile_print'] = self.tile_image(print_['image'], dim_pattern, print_['scale'], dim_max, dim_max, painting_dict['location'], trigger=True) + else: + painting_dict['mask_inv_print'] = self.tile_image(single_mask_inv_print, dim_pattern, print_['scale'], painting_dict['dim_image_h'], painting_dict['dim_image_w'], painting_dict['location'], trigger=True) + painting_dict['tile_print'] = self.tile_image(print_['image'], dim_pattern, print_['scale'], painting_dict['dim_image_h'], painting_dict['dim_image_w'], painting_dict['location'], trigger=True) + else: + painting_dict['mask_inv_print'] = self.tile_image(single_mask_inv_print, dim_pattern, print_['scale'], painting_dict['dim_image_h'], painting_dict['dim_image_w'], painting_dict['location']) + painting_dict['tile_print'] = self.tile_image(print_['image'], dim_pattern, print_['scale'], painting_dict['dim_image_h'], painting_dict['dim_image_w'], painting_dict['location']) + painting_dict['dim_print_h'], painting_dict['dim_print_w'] = dim_pattern + return painting_dict + + def tile_image(self, pattern, dim, scale, dim_image_h, dim_image_w, location, trigger=False): + tile = None + if not trigger: + tile = cv2.resize(pattern, dim, interpolation=cv2.INTER_AREA) + else: + resize_pattern = cv2.resize(pattern, dim, interpolation=cv2.INTER_AREA) + if len(pattern.shape) == 2: + tile = np.tile(resize_pattern, (int((5 + 1) / scale) + 4, int((5 + 1) / scale) + 4)) + if len(pattern.shape) == 3: + tile = np.tile(resize_pattern, (int((5 + 1) / scale) + 4, int((5 + 1) / scale) + 4, 1)) + tile = self.crop_image(tile, dim_image_h, dim_image_w, location, resize_pattern.shape) + return tile + + def get_mask_inv(self, print_): + if print_[0][0][0] == 255 and print_[0][0][1] == 255 and print_[0][0][2] == 255: + bg_color = cv2.cvtColor(print_, cv2.COLOR_BGR2LAB)[0][0] + print_tile = cv2.cvtColor(print_, cv2.COLOR_BGR2LAB) + bg_l, bg_a, bg_b = bg_color[0], bg_color[1], bg_color[2] + bg_L_high, bg_L_low = self.get_low_high_lab(bg_l, L=True) + bg_a_high, bg_a_low = self.get_low_high_lab(bg_a) + bg_b_high, bg_b_low = self.get_low_high_lab(bg_b) + lower = np.array([bg_L_low, bg_a_low, bg_b_low]) + upper = np.array([bg_L_high, bg_a_high, bg_b_high]) + mask_inv = cv2.inRange(print_tile, lower, upper) + return mask_inv + else: + # bg_color = cv2.cvtColor(print_, cv2.COLOR_BGR2LAB)[0][0] + # print_tile = cv2.cvtColor(print_, cv2.COLOR_BGR2LAB) + # bg_l, bg_a, bg_b = bg_color[0], bg_color[1], bg_color[2] + # bg_L_high, bg_L_low = self.get_low_high_lab(bg_l, L=True) + # bg_a_high, bg_a_low = self.get_low_high_lab(bg_a) + # bg_b_high, bg_b_low = self.get_low_high_lab(bg_b) + # lower = np.array([bg_L_low, bg_a_low, bg_b_low]) + # upper = np.array([bg_L_high, bg_a_high, bg_b_high]) + + # print_tile = cv2.cvtColor(print_, cv2.COLOR_BGR2LAB) + # mask_inv = cv2.cvtColor(print_tile, cv2.COLOR_BGR2GRAY) + + # mask_inv = cv2.cvtColor(print_, cv2.COLOR_BGR2GRAY) + mask_inv = np.zeros(print_.shape[:2], dtype=np.uint8) + return mask_inv + + @staticmethod + def printpaint(result, painting_dict, print_=False): + + if print_ and painting_dict['Trigger']: + print_mask = cv2.bitwise_and(result['mask'], cv2.bitwise_not(painting_dict['mask_inv_print'])) + img_fg = cv2.bitwise_and(painting_dict['tile_print'], painting_dict['tile_print'], mask=print_mask) + else: + print_mask = result['mask'] + img_fg = result['final_image'] + if print_ and not painting_dict['Trigger']: + index_ = None + try: + index_ = len(painting_dict['location']) + except: + assert f'there must be parameter of location if choose IfSingle' + + for i in range(index_): + start_h, start_w = int(painting_dict['location'][i][1]), int(painting_dict['location'][i][0]) + + length_h = min(start_h + painting_dict['dim_print_h'], img_fg.shape[0]) + length_w = min(start_w + painting_dict['dim_print_w'], img_fg.shape[1]) + + change_region = img_fg[start_h: length_h, start_w: length_w, :] + # problem in change_mask + change_mask = print_mask[start_h: length_h, start_w: length_w] + # get real part into change mask + _, change_mask = cv2.threshold(change_mask, 220, 255, cv2.THRESH_BINARY) + mask = cv2.bitwise_not(painting_dict['mask_inv_print']) + img_fg[start_h:start_h + painting_dict['dim_print_h'], start_w:start_w + painting_dict['dim_print_w'], :] = change_region + + clothes_mask_print = cv2.bitwise_not(print_mask) + + img_bg = cv2.bitwise_and(result['pattern_image'], result['pattern_image'], mask=clothes_mask_print) + mask_mo = np.expand_dims(print_mask, axis=2).repeat(3, axis=2) + gray_mo = np.expand_dims(result['gray'], axis=2).repeat(3, axis=2) + img_fg = (img_fg * (mask_mo / 255) * (gray_mo / 255)).astype(np.uint8) + print_image = cv2.add(img_bg, img_fg) + return print_image + + @staticmethod + def get_print(print_dict): + if 'print_scale_list' not in print_dict.keys() or print_dict['print_scale_list'][0] < 0.3: + print_dict['scale'] = 0.3 + else: + print_dict['scale'] = print_dict['print_scale_list'][0] + + bucket_name = print_dict['print_path_list'][0].split("/", 1)[0] + object_name = print_dict['print_path_list'][0].split("/", 1)[1] + image = oss_get_image(bucket=bucket_name, object_name=object_name, data_type="PIL") + # 判断图片格式,如果是RGBA 则贴在一张纯白图片上 防止透明转黑 + if image.mode == "RGBA": + new_background = Image.new('RGB', image.size, (255, 255, 255)) + new_background.paste(image, mask=image.split()[3]) + image = new_background + print_dict['image'] = cv2.cvtColor(np.asarray(image), cv2.COLOR_RGB2BGR) + return print_dict + + def crop_image(self, image, image_size_h, image_size_w, location, print_shape): + print_w = print_shape[1] + print_h = print_shape[0] + + random.seed(self.random_seed) + # logging.info(f'overall print location : {location}') + # x_offset = random.randint(0, image.shape[0] - image_size_h) + # y_offset = random.randint(0, image.shape[1] - image_size_w) + + # 1.拿到偏移量后和resize后的print宽高取余 得到真正偏移量 + x_offset = print_w - int(location[0][1] % print_w) + y_offset = print_w - int(location[0][0] % print_h) + + # y_offset = int(location[0][0]) + # x_offset = int(location[0][1]) + + if len(image.shape) == 2: + image = image[x_offset: x_offset + image_size_h, y_offset: y_offset + image_size_w] + elif len(image.shape) == 3: + image = image[x_offset: x_offset + image_size_h, y_offset: y_offset + image_size_w, :] + return image + + @staticmethod + def get_low_high_lab(Lab_value, L=False): + if L: + high = Lab_value + 30 if Lab_value + 30 < 255 else 255 + low = Lab_value - 30 if Lab_value - 30 > 0 else 0 + else: + high = Lab_value + 30 if Lab_value + 30 < 255 else 255 + low = Lab_value - 30 if Lab_value - 30 > 0 else 0 + return high, low + + @staticmethod + def img_rotate(image, angel, scale): + """顺时针旋转图像任意角度 + + Args: + image (np.array): [原始图像] + angel (float): [逆时针旋转的角度] + + Returns: + [array]: [旋转后的图像] + """ + + h, w = image.shape[:2] + center = (w // 2, h // 2) + # if type(angel) is not int: + # angel = 0 + M = cv2.getRotationMatrix2D(center, -angel, scale) + # 调整旋转后的图像长宽 + rotated_h = int((w * np.abs(M[0, 1]) + (h * np.abs(M[0, 0])))) + rotated_w = int((h * np.abs(M[0, 1]) + (w * np.abs(M[0, 0])))) + M[0, 2] += (rotated_w - w) // 2 + M[1, 2] += (rotated_h - h) // 2 + # 旋转图像 + rotated_img = cv2.warpAffine(image, M, (rotated_w, rotated_h)) + + return rotated_img, ((rotated_img.shape[1] - image.shape[1] * scale) // 2, (rotated_img.shape[0] - image.shape[0] * scale) // 2) + # return rotated_img, (0, 0) + + @staticmethod + def rotate_crop_image(img, angle, crop): + """ + angle: 旋转的角度 + crop: 是否需要进行裁剪,布尔向量 + """ + crop_image = lambda img, x0, y0, w, h: img[y0:y0 + h, x0:x0 + w] + w, h = img.shape[:2] + # 旋转角度的周期是360° + angle %= 360 + # 计算仿射变换矩阵 + M_rotation = cv2.getRotationMatrix2D((w / 2, h / 2), angle, 1) + # 得到旋转后的图像 + img_rotated = cv2.warpAffine(img, M_rotation, (w, h)) + + # 如果需要去除黑边 + if crop: + # 裁剪角度的等效周期是180° + angle_crop = angle % 180 + if angle > 90: + angle_crop = 180 - angle_crop + # 转化角度为弧度 + theta = angle_crop * np.pi / 180 + # 计算高宽比 + hw_ratio = float(h) / float(w) + # 计算裁剪边长系数的分子项 + tan_theta = np.tan(theta) + numerator = np.cos(theta) + np.sin(theta) * np.tan(theta) + + # 计算分母中和高宽比相关的项 + r = hw_ratio if h > w else 1 / hw_ratio + # 计算分母项 + denominator = r * tan_theta + 1 + # 最终的边长系数 + crop_mult = numerator / denominator + + # 得到裁剪区域 + w_crop = int(crop_mult * w) + h_crop = int(crop_mult * h) + x0 = int((w - w_crop) / 2) + y0 = int((h - h_crop) / 2) + + img_rotated = crop_image(img_rotated, x0, y0, w_crop, h_crop) + + return img_rotated + + @staticmethod + def read_image(image_url): + image = oss_get_image(bucket=image_url.split("/", 1)[0], object_name=image_url.split("/", 1)[1], data_type="cv2") + if image.shape[2] == 4: + image_rgb = cv2.cvtColor(image, cv2.COLOR_BGRA2RGBA) + image = Image.fromarray(image_rgb) + image_mode = "RGBA" + else: + image_mode = "RGB" + return image, image_mode + + @staticmethod + def resize_and_crop(img, target_width, target_height): + # 获取原始图像的尺寸 + original_height, original_width = img.shape[:2] + + # 计算目标尺寸的宽高比 + target_ratio = target_width / target_height + + # 计算原始图像的宽高比 + original_ratio = original_width / original_height + + # 调整尺寸 + if original_ratio > target_ratio: + # 原始图像更宽,按高度resize,然后裁剪宽度 + new_height = target_height + new_width = int(original_width * (target_height / original_height)) + resized_img = cv2.resize(img, (new_width, new_height)) + # 裁剪宽度 + start_x = (new_width - target_width) // 2 + cropped_img = resized_img[:, start_x:start_x + target_width] + else: + # 原始图像更高,按宽度resize,然后裁剪高度 + new_width = target_width + new_height = int(original_height * (target_width / original_width)) + resized_img = cv2.resize(img, (new_width, new_height)) + # 裁剪高度 + start_y = (new_height - target_height) // 2 + cropped_img = resized_img[start_y:start_y + target_height, :] + + return cropped_img diff --git a/app/service/design/design_batch/items/pipeline/scale.py b/app/service/design/design_batch/items/pipeline/scale.py new file mode 100644 index 0000000..732fcd8 --- /dev/null +++ b/app/service/design/design_batch/items/pipeline/scale.py @@ -0,0 +1,49 @@ +import math + +import cv2 + + +class Scaling: + def __call__(self, result): + if result['keypoint'] in ['waistband', 'shoulder', 'head_point']: + # milvus_db_keypoint_cache + distance_clo = math.sqrt( + (int(result['clothes_keypoint'][result['keypoint'] + '_left'][0]) - int(result['clothes_keypoint'][result['keypoint'] + '_right'][0])) ** 2 + + + (int(result['clothes_keypoint'][result['keypoint'] + '_left'][1]) - int(result['clothes_keypoint'][result['keypoint'] + '_right'][1])) ** 2 + ) + + distance_bdy = math.sqrt( + (int(result['body_point_test'][result['keypoint'] + '_left'][0]) + - + int(result['body_point_test'][result['keypoint'] + '_right'][0])) ** 2 + 1 + ) + + if distance_clo == 0: + result['scale'] = 1 + else: + result['scale'] = distance_bdy / distance_clo + elif result['keypoint'] == 'toe': + distance_bdy = math.sqrt( + (int(result['body_point_test']['foot_length'][0]) - int(result['body_point_test']['foot_length'][2])) ** 2 + + + (int(result['body_point_test']['foot_length'][1]) - int(result['body_point_test']['foot_length'][3])) ** 2 + ) + + Blur = cv2.GaussianBlur(result['gray'], (3, 3), 0) + Edge = cv2.Canny(Blur, 10, 200) + Edge = cv2.dilate(Edge, None) + Edge = cv2.erode(Edge, None) + Contour, _ = cv2.findContours(Edge, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE) + Contours = sorted(Contour, key=cv2.contourArea, reverse=True) + + Max_contour = Contours[0] + x, y, w, h = cv2.boundingRect(Max_contour) + width = w + distance_clo = width + result['scale'] = distance_bdy / distance_clo + elif result['keypoint'] == 'hand_point': + result['scale'] = result['scale_bag'] + elif result['keypoint'] == 'ear_point': + result['scale'] = result['scale_earrings'] + return result diff --git a/app/service/design/design_batch/items/pipeline/segmentation.py b/app/service/design/design_batch/items/pipeline/segmentation.py new file mode 100644 index 0000000..d8aa6d2 --- /dev/null +++ b/app/service/design/design_batch/items/pipeline/segmentation.py @@ -0,0 +1,67 @@ +import logging +import os + +import cv2 +import numpy as np + +from app.core.config import SEG_CACHE_PATH +from app.service.design.utils.design_ensemble import get_seg_result +from app.service.utils.oss_client import oss_get_image + +logger = logging.getLogger() + + +class Segmentation: + def __call__(self, result): + if "seg_mask_url" in result.keys() and result['seg_mask_url'] != "": + seg_mask = oss_get_image(bucket=result['seg_mask_url'].split('/')[0], object_name=result['seg_mask_url'][result['seg_mask_url'].find('/') + 1:], data_type="cv2") + seg_mask = cv2.resize(seg_mask, (result['img_shape'][1], result['img_shape'][0]), interpolation=cv2.INTER_NEAREST) + # 转换颜色空间为 RGB(OpenCV 默认是 BGR) + image_rgb = cv2.cvtColor(seg_mask, cv2.COLOR_BGR2RGB) + + r, g, b = cv2.split(image_rgb) + red_mask = r > g + green_mask = g > r + + # 创建红色和绿色掩码 + result['front_mask'] = np.array(red_mask, dtype=np.uint8) * 255 + result['back_mask'] = np.array(green_mask, dtype=np.uint8) * 255 + result['mask'] = result['front_mask'] + result['back_mask'] + else: + # 本地查询seg 缓存是否存在 + _, seg_result = self.load_seg_result(result["image_id"]) + result['seg_result'] = seg_result + if not _: + # 推理获得seg 结果 + seg_result = get_seg_result(result["image_id"], result['image'])[0] + self.save_seg_result(seg_result, result['image_id']) + # 处理前片后片 + temp_front = seg_result == 1.0 + result['front_mask'] = (255 * (temp_front + 0).astype(np.uint8)) + temp_back = seg_result == 2.0 + result['back_mask'] = (255 * (temp_back + 0).astype(np.uint8)) + result['mask'] = result['front_mask'] + result['back_mask'] + return result + + @staticmethod + def save_seg_result(seg_result, image_id): + file_path = f"{SEG_CACHE_PATH}{image_id}.npy" + try: + np.save(file_path, seg_result) + print("保存成功", os.path.abspath(file_path)) + except Exception as e: + print(f"保存失败: {e}") + + @staticmethod + def load_seg_result(image_id): + file_path = f"{SEG_CACHE_PATH}{image_id}.npy" + logger.info(f"load seg file name is :{SEG_CACHE_PATH}{image_id}.npy") + try: + seg_result = np.load(file_path) + return True, seg_result + except FileNotFoundError: + print("文件不存在") + return False, None + except Exception as e: + print(f"加载失败: {e}") + return False, None diff --git a/app/service/design/design_batch/items/pipeline/split.py b/app/service/design/design_batch/items/pipeline/split.py new file mode 100644 index 0000000..2fba315 --- /dev/null +++ b/app/service/design/design_batch/items/pipeline/split.py @@ -0,0 +1,71 @@ +import io +import logging + +import cv2 +import numpy as np +from PIL import Image +from cv2 import cvtColor, COLOR_BGR2RGBA + +from app.core.config import AIDA_CLOTHING +from app.service.design.utils.conversion_image import rgb_to_rgba +from app.service.design.utils.upload_image import upload_png_mask +from app.service.utils.generate_uuid import generate_uuid +from app.service.utils.oss_client import oss_upload_image + + +class Split(object): + def __call__(self, result): + try: + + if result['name'] in ('outwear', 'dress', 'blouse', 'skirt', 'trousers', 'tops', 'bottoms'): + front_mask = result['front_mask'] + back_mask = result['back_mask'] + rgba_image = rgb_to_rgba(result['final_image'], front_mask + back_mask) + new_size = (int(rgba_image.shape[1] * result["scale"] * result["resize_scale"][0]), int(rgba_image.shape[0] * result["scale"] * result["resize_scale"][1])) + rgba_image = cv2.resize(rgba_image, new_size) + result_front_image = np.zeros_like(rgba_image) + front_mask = cv2.resize(front_mask, new_size) + result_front_image[front_mask != 0] = rgba_image[front_mask != 0] + result_front_image_pil = Image.fromarray(cvtColor(result_front_image, COLOR_BGR2RGBA)) + result['front_image'], result["front_image_url"], _ = upload_png_mask(result_front_image_pil, f'{generate_uuid()}', mask=None) + + height, width = front_mask.shape + mask_image = np.zeros((height, width, 3)) + mask_image[front_mask != 0] = [0, 0, 255] + + if result["name"] in ('blouse', 'dress', 'outwear', 'tops'): + result_back_image = np.zeros_like(rgba_image) + back_mask = cv2.resize(back_mask, new_size) + result_back_image[back_mask != 0] = rgba_image[back_mask != 0] + result_back_image_pil = Image.fromarray(cvtColor(result_back_image, COLOR_BGR2RGBA)) + result['back_image'], result["back_image_url"], _ = upload_png_mask(result_back_image_pil, f'{generate_uuid()}', mask=None) + mask_image[back_mask != 0] = [0, 255, 0] + + rbga_mask = rgb_to_rgba(mask_image, front_mask + back_mask) + mask_pil = Image.fromarray(cvtColor(rbga_mask.astype(np.uint8), COLOR_BGR2RGBA)) + image_data = io.BytesIO() + mask_pil.save(image_data, format='PNG') + image_data.seek(0) + image_bytes = image_data.read() + req = oss_upload_image(bucket=AIDA_CLOTHING, object_name=f"mask/mask_{generate_uuid()}.png", image_bytes=image_bytes) + result['mask_url'] = req.bucket_name + "/" + req.object_name + else: + rbga_mask = rgb_to_rgba(mask_image, front_mask) + mask_pil = Image.fromarray(cvtColor(rbga_mask.astype(np.uint8), COLOR_BGR2RGBA)) + image_data = io.BytesIO() + mask_pil.save(image_data, format='PNG') + image_data.seek(0) + image_bytes = image_data.read() + req = oss_upload_image(bucket=AIDA_CLOTHING, object_name=f"mask/mask_{generate_uuid()}.png", image_bytes=image_bytes) + result['mask_url'] = req.bucket_name + "/" + req.object_name + result['back_image'] = None + result["back_image_url"] = None + # result["back_mask_url"] = None + # result['back_mask_image'] = None + # 创建中间图层 + result_pattern_image_rgba = rgb_to_rgba(result['pattern_image'], result['mask']) + result_pattern_image_pil = Image.fromarray(cvtColor(result_pattern_image_rgba, COLOR_BGR2RGBA)) + result['pattern_image'], result['pattern_image_url'], _ = upload_png_mask(result_pattern_image_pil, f'{generate_uuid()}') + return result + except Exception as e: + logging.warning(f"split runtime exception : {e} image_id : {result['image_id']}") diff --git a/app/service/design/design_batch/items/utils/__init__.py b/app/service/design/design_batch/items/utils/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/app/service/design/design_batch/items/utils/conversion_image.py b/app/service/design/design_batch/items/utils/conversion_image.py new file mode 100644 index 0000000..11e39ae --- /dev/null +++ b/app/service/design/design_batch/items/utils/conversion_image.py @@ -0,0 +1,31 @@ +#!/usr/bin/env python +# -*- coding: UTF-8 -*- +""" +@Project :trinity_client +@File :conversion_image.py +@Author :周成融 +@Date :2023/8/21 10:40:29 +@detail : +""" +import numpy as np + + +# def rgb_to_rgba(rgb_size, rgb_image, mask): +# alpha_channel = np.full(rgb_size, 255, dtype=np.uint8) +# # 创建四通道的结果图像 +# rgba_image = np.dstack((rgb_image, alpha_channel)) +# alpha_channel = np.where(mask > 0, 255, 0) +# # 更新RGBA图像的透明度通道 +# rgba_image[:, :, 3] = alpha_channel +# return rgba_image + +def rgb_to_rgba(rgb_image, mask): + # 创建全透明的alpha通道 + alpha_channel = np.where(mask > 0, 255, 0).astype(np.uint8) + # 合并RGB图像和alpha通道 + rgba_image = np.dstack((rgb_image, alpha_channel)) + return rgba_image + + +if __name__ == '__main__': + image = open("") diff --git a/app/service/design/design_batch/items/utils/design_ensemble.py b/app/service/design/design_batch/items/utils/design_ensemble.py new file mode 100644 index 0000000..f4f6a34 --- /dev/null +++ b/app/service/design/design_batch/items/utils/design_ensemble.py @@ -0,0 +1,143 @@ +#!/usr/bin/env python +# -*- coding: UTF-8 -*- +""" +@Project :trinity_client +@File :design_ensemble.py +@Author :周成融 +@Date :2023/8/16 19:36:21 +@detail :发起请求 获取推理结果 +""" +import logging + +import cv2 +import mmcv +import numpy as np +import torch +import torch.nn.functional as F +import tritonclient.http as httpclient + +from app.core.config import * + +""" + keypoint + 预处理 推理 后处理 +""" + + +def keypoint_preprocess(img_path): + img = mmcv.imread(img_path) + img_scale = (256, 256) + h, w = img.shape[:2] + img = cv2.resize(img, img_scale) + w_scale = img_scale[0] / w + h_scale = img_scale[1] / h + img = mmcv.imnormalize(img, mean=np.array([123.675, 116.28, 103.53]), std=np.array([58.395, 57.12, 57.375]), to_rgb=True) + preprocessed_img = np.expand_dims(img.transpose(2, 0, 1), axis=0) + return preprocessed_img, (w_scale, h_scale) + + +# @ RunTime +# 推理 +def get_keypoint_result(image, site): + keypoint_result = None + try: + image, scale_factor = keypoint_preprocess(image) + client = httpclient.InferenceServerClient(url=DESIGN_MODEL_URL) + transformed_img = image.astype(np.float32) + inputs = [httpclient.InferInput(f"input", transformed_img.shape, datatype="FP32")] + inputs[0].set_data_from_numpy(transformed_img, binary_data=True) + outputs = [httpclient.InferRequestedOutput(f"output", binary_data=True)] + results = client.infer(model_name=f"keypoint_{site}_ocrnet_hr18", inputs=inputs, outputs=outputs) + inference_output = torch.from_numpy(results.as_numpy(f'output')) + keypoint_result = keypoint_postprocess(inference_output, scale_factor) + except Exception as e: + logging.warning(f"get_keypoint_result : {e}") + return keypoint_result + + +def keypoint_postprocess(output, scale_factor): + max_indices = torch.argmax(output.view(output.size(0), output.size(1), -1), dim=2).unsqueeze(dim=2) + max_coords = torch.cat((max_indices / output.size(3), max_indices % output.size(3)), dim=2) + segment_result = max_coords.numpy() + scale_factor = [1 / x for x in scale_factor[::-1]] + scale_matrix = np.diag(scale_factor) + nan = np.isinf(scale_matrix) + scale_matrix[nan] = 0 + return np.ceil(np.dot(segment_result, scale_matrix) * 4) + + +""" + seg + 预处理 推理 后处理 +""" + + +# KNet +def seg_preprocess(img_path): + img = mmcv.imread(img_path) + ori_shape = img.shape[:2] + img_scale_w, img_scale_h = ori_shape + if ori_shape[0] > 1024: + img_scale_w = 1024 + if ori_shape[1] > 1024: + img_scale_h = 1024 + # 如果图片size任意一边 大于 1024, 则会resize 成1024 + if ori_shape != (img_scale_w, img_scale_h): + # mmcv.imresize(img, img_scale_h, img_scale_w) # 老代码 引以为戒!哈哈哈~ h和w写反了 + img = cv2.resize(img, (img_scale_h, img_scale_w)) + img = mmcv.imnormalize(img, mean=np.array([123.675, 116.28, 103.53]), std=np.array([58.395, 57.12, 57.375]), to_rgb=True) + preprocessed_img = np.expand_dims(img.transpose(2, 0, 1), axis=0) + return preprocessed_img, ori_shape + + +# @ RunTime +def get_seg_result(image_id, image): + image, ori_shape = seg_preprocess(image) + client = httpclient.InferenceServerClient(url=f"{DESIGN_MODEL_URL}") + transformed_img = image.astype(np.float32) + # 输入集 + inputs = [ + httpclient.InferInput(SEGMENTATION['input'], transformed_img.shape, datatype="FP32") + ] + inputs[0].set_data_from_numpy(transformed_img, binary_data=True) + # 输出集 + outputs = [ + httpclient.InferRequestedOutput(SEGMENTATION['output'], binary_data=True), + ] + results = client.infer(model_name=SEGMENTATION['new_model_name'], inputs=inputs, outputs=outputs) + # 推理 + # 取结果 + inference_output1 = results.as_numpy(SEGMENTATION['output']) + seg_result = seg_postprocess(int(image_id), inference_output1, ori_shape) + return seg_result + + +# no cache +def seg_postprocess(image_id, output, ori_shape): + seg_logit = F.interpolate(torch.tensor(output).float(), size=ori_shape, scale_factor=None, mode='bilinear', align_corners=False) + seg_pred = seg_logit.cpu().numpy() + return seg_pred[0] + + +def key_point_show(image_path, key_point_result=None): + img = cv2.imread(image_path) + points_list = key_point_result + point_size = 1 + point_color = (0, 0, 255) # BGR + thickness = 4 # 可以为 0 、4、8 + for point in points_list: + cv2.circle(img, point[::-1], point_size, point_color, thickness) + cv2.imshow("0", img) + cv2.waitKey(0) + + +if __name__ == '__main__': + image = cv2.imread("9070101c-e5be-49b5-9602-4113a968969b.png") + a = get_keypoint_result(image, "up") + new_list = [] + print(list) + for i in a[0]: + new_list.append((int(i[0]), int(i[1]))) + key_point_show("9070101c-e5be-49b5-9602-4113a968969b.png", new_list) + # a = get_seg_result(1, image) + print(a) diff --git a/app/service/design/design_batch/items/utils/redis_utils.py b/app/service/design/design_batch/items/utils/redis_utils.py new file mode 100644 index 0000000..012fbe0 --- /dev/null +++ b/app/service/design/design_batch/items/utils/redis_utils.py @@ -0,0 +1,99 @@ +import redis + +from app.core.config import REDIS_HOST, REDIS_PORT + + +class Redis(object): + """ + redis数据库操作 + """ + + @staticmethod + def _get_r(): + host = REDIS_HOST + port = REDIS_PORT + db = 0 + r = redis.StrictRedis(host, port, db) + return r + + @classmethod + def write(cls, key, value, expire=None): + """ + 写入键值对 + """ + # 判断是否有过期时间,没有就设置默认值 + if expire: + expire_in_seconds = expire + else: + expire_in_seconds = 100 + r = cls._get_r() + r.set(key, value, ex=expire_in_seconds) + + @classmethod + def read(cls, key): + """ + 读取键值对内容 + """ + r = cls._get_r() + value = r.get(key) + return value.decode('utf-8') if value else value + + @classmethod + def hset(cls, name, key, value): + """ + 写入hash表 + """ + r = cls._get_r() + r.hset(name, key, value) + + @classmethod + def hget(cls, name, key): + """ + 读取指定hash表的键值 + """ + r = cls._get_r() + value = r.hget(name, key) + return value.decode('utf-8') if value else value + + @classmethod + def hgetall(cls, name): + """ + 获取指定hash表所有的值 + """ + r = cls._get_r() + return r.hgetall(name) + + @classmethod + def delete(cls, *names): + """ + 删除一个或者多个 + """ + r = cls._get_r() + r.delete(*names) + + @classmethod + def hdel(cls, name, key): + """ + 删除指定hash表的键值 + """ + r = cls._get_r() + r.hdel(name, key) + + @classmethod + def expire(cls, name, expire=None): + """ + 设置过期时间 + """ + if expire: + expire_in_seconds = expire + else: + expire_in_seconds = 100 + r = cls._get_r() + r.expire(name, expire_in_seconds) + + +if __name__ == '__main__': + redis_client = Redis() + # print(redis_client.write(key="1230", value=0)) + redis_client.write(key="1230", value=10) + # print(redis_client.read(key="1230")) diff --git a/app/service/design/design_batch/items/utils/synthesis_item.py b/app/service/design/design_batch/items/utils/synthesis_item.py new file mode 100644 index 0000000..9527cd2 --- /dev/null +++ b/app/service/design/design_batch/items/utils/synthesis_item.py @@ -0,0 +1,181 @@ +#!/usr/bin/env python +# -*- coding: UTF-8 -*- +""" +@Project :trinity_client +@File :synthesis_item.py +@Author :周成融 +@Date :2023/8/26 14:13:04 +@detail : +""" +import io +import logging + +import cv2 +import numpy as np +from PIL import Image + +from app.service.utils.generate_uuid import generate_uuid +from app.service.utils.oss_client import oss_upload_image + + +def positioning(all_mask_shape, mask_shape, offset): + all_start = 0 + all_end = 0 + mask_start = 0 + mask_end = 0 + if offset == 0: + all_start = 0 + all_end = min(all_mask_shape, mask_shape) + + mask_start = 0 + mask_end = min(all_mask_shape, mask_shape) + elif offset > 0: + all_start = min(offset, all_mask_shape) + all_end = min(offset + mask_shape, all_mask_shape) + + mask_start = 0 + mask_end = 0 if offset > all_mask_shape else min(all_mask_shape - offset, mask_shape) + elif offset < 0: + if abs(offset) > mask_shape: + all_start = 0 + all_end = 0 + else: + all_start = 0 + if mask_shape - abs(offset) > all_mask_shape: + all_end = min(mask_shape - abs(offset), all_mask_shape) + else: + all_end = mask_shape - abs(offset) + + if abs(offset) > mask_shape: + mask_start = mask_shape + mask_end = mask_shape + else: + mask_start = abs(offset) + if mask_shape - abs(offset) >= all_mask_shape: + mask_end = all_mask_shape + abs(offset) + else: + mask_end = mask_shape + return all_start, all_end, mask_start, mask_end + + +# @RunTime +def synthesis(data, size, basic_info): + # 创建底图 + base_image = Image.new('RGBA', size, (0, 0, 0, 0)) + try: + all_mask_shape = (size[1], size[0]) + body_mask = None + for d in data: + if d['name'] == 'body' or d['name'] == 'mannequin': + # 创建一个新的宽高透明图像, 把模特贴上去获取mask + transparent_image = Image.new("RGBA", size, (0, 0, 0, 0)) + transparent_image.paste(d['image'], (d['adaptive_position'][1], d['adaptive_position'][0]), d['image']) # 此处可变数组会被paste篡改值,所以使用下标获取position + body_mask = np.array(transparent_image.split()[3]) + + # 根据新的坐标获取新的肩点 + left_shoulder = [x + y for x, y in zip(basic_info['body_point_test']['shoulder_left'], [d['adaptive_position'][1], d['adaptive_position'][0]])] + right_shoulder = [x + y for x, y in zip(basic_info['body_point_test']['shoulder_right'], [d['adaptive_position'][1], d['adaptive_position'][0]])] + body_mask[:min(left_shoulder[1], right_shoulder[1]), left_shoulder[0]:right_shoulder[0]] = 255 + _, binary_body_mask = cv2.threshold(body_mask, 127, 255, cv2.THRESH_BINARY) + top_outer_mask = np.array(binary_body_mask) + bottom_outer_mask = np.array(binary_body_mask) + + top = True + bottom = True + i = len(data) + while i: + i -= 1 + if top and data[i]['name'] in ["blouse_front", "outwear_front", "dress_front", "tops_front"]: + top = False + mask_shape = data[i]['mask'].shape + y_offset, x_offset = data[i]['adaptive_position'] + # 初始化叠加区域的起始和结束位置 + all_y_start, all_y_end, mask_y_start, mask_y_end = positioning(all_mask_shape=all_mask_shape[0], mask_shape=mask_shape[0], offset=y_offset) + all_x_start, all_x_end, mask_x_start, mask_x_end = positioning(all_mask_shape=all_mask_shape[1], mask_shape=mask_shape[1], offset=x_offset) + # 将叠加区域赋值为相应的像素值 + _, sketch_mask = cv2.threshold(data[i]['mask'], 127, 255, cv2.THRESH_BINARY) + background = np.zeros_like(top_outer_mask) + background[all_y_start:all_y_end, all_x_start:all_x_end] = sketch_mask[mask_y_start:mask_y_end, mask_x_start:mask_x_end] + top_outer_mask = background + top_outer_mask + elif bottom and data[i]['name'] in ["trousers_front", "skirt_front", "bottoms_front", "dress_front"]: + bottom = False + mask_shape = data[i]['mask'].shape + y_offset, x_offset = data[i]['adaptive_position'] + # 初始化叠加区域的起始和结束位置 + all_y_start, all_y_end, mask_y_start, mask_y_end = positioning(all_mask_shape=all_mask_shape[0], mask_shape=mask_shape[0], offset=y_offset) + all_x_start, all_x_end, mask_x_start, mask_x_end = positioning(all_mask_shape=all_mask_shape[1], mask_shape=mask_shape[1], offset=x_offset) + # 将叠加区域赋值为相应的像素值 + _, sketch_mask = cv2.threshold(data[i]['mask'], 127, 255, cv2.THRESH_BINARY) + background = np.zeros_like(top_outer_mask) + background[all_y_start:all_y_end, all_x_start:all_x_end] = sketch_mask[mask_y_start:mask_y_end, mask_x_start:mask_x_end] + bottom_outer_mask = background + bottom_outer_mask + elif bottom is False and top is False: + break + + all_mask = cv2.bitwise_or(top_outer_mask, bottom_outer_mask) + + for layer in data: + if layer['image'] is not None: + if layer['name'] != "body": + test_image = Image.new('RGBA', size, (0, 0, 0, 0)) + test_image.paste(layer['image'], (layer['adaptive_position'][1], layer['adaptive_position'][0]), layer['image']) + mask_data = np.where(all_mask > 0, 255, 0).astype(np.uint8) + mask_alpha = Image.fromarray(mask_data) + cropped_image = Image.composite(test_image, Image.new("RGBA", test_image.size, (255, 255, 255, 0)), mask_alpha) + base_image.paste(test_image, (0, 0), cropped_image) # test_image 已经按照坐标贴到最大宽值的图片上 坐着这里坐标为00 + else: + base_image.paste(layer['image'], (layer['adaptive_position'][1], layer['adaptive_position'][0]), layer['image']) + + result_image = base_image + + image_data = io.BytesIO() + result_image.save(image_data, format='PNG') + image_data.seek(0) + + # oss upload + image_bytes = image_data.read() + bucket_name = "aida-results" + object_name = f'result_{generate_uuid()}.png' + req = oss_upload_image(bucket=bucket_name, object_name=object_name, image_bytes=image_bytes) + return f"{bucket_name}/{object_name}" + # return f"aida-results/{minio_client.put_object('aida-results', f'result_{generate_uuid()}.png', io.BytesIO(image_bytes), len(image_bytes), content_type='image/png').object_name}" + + # object_name = f'result_{generate_uuid()}.png' + # response = s3.put_object(Bucket="aida-results", Key=object_name, Body=data, ContentType='image/png') + # object_url = f"aida-results/{object_name}" + # if response['ResponseMetadata']['HTTPStatusCode'] == 200: + # return object_url + # else: + # return "" + + except Exception as e: + logging.warning(f"synthesis runtime exception : {e}") + + +def synthesis_single(front_image, back_image): + result_image = None + if front_image: + result_image = front_image + if back_image: + result_image.paste(back_image, (0, 0), back_image) + + # with io.BytesIO() as output: + # result_image.save(output, format='PNG') + # data = output.getvalue() + # object_name = f'result_{generate_uuid()}.png' + # response = s3.put_object(Bucket="aida-results", Key=object_name, Body=data, ContentType='image/png') + # object_url = f"aida-results/{object_name}" + # if response['ResponseMetadata']['HTTPStatusCode'] == 200: + # return object_url + # else: + # return "" + image_data = io.BytesIO() + result_image.save(image_data, format='PNG') + image_data.seek(0) + image_bytes = image_data.read() + # return f"aida-results/{minio_client.put_object('aida-results', f'result_{generate_uuid()}.png', io.BytesIO(image_bytes), len(image_bytes), content_type='image/png').object_name}" + # oss upload + bucket_name = 'aida-results' + object_name = f'result_{generate_uuid()}.png' + req = oss_upload_image(bucket=bucket_name, object_name=object_name, image_bytes=image_bytes) + return f"{bucket_name}/{object_name}" diff --git a/app/service/design/design_batch/items/utils/upload_image.py b/app/service/design/design_batch/items/utils/upload_image.py new file mode 100644 index 0000000..9039ce7 --- /dev/null +++ b/app/service/design/design_batch/items/utils/upload_image.py @@ -0,0 +1,55 @@ +#!/usr/bin/env python +# -*- coding: UTF-8 -*- +""" +@Project :trinity_client +@File :upload_image.py +@Author :周成融 +@Date :2023/8/28 13:49:20 +@detail : +""" +import io +import logging + +import cv2 + +from app.core.config import * +from app.service.utils.decorator import RunTime +from app.service.utils.oss_client import oss_upload_image + + +# @RunTime +def upload_png_mask(front_image, object_name, mask=None): + try: + mask_url = None + if mask is not None: + mask_inverted = cv2.bitwise_not(mask) + # 将掩模的3通道转换为4通道,白色部分不透明,黑色部分透明 + rgba_image = cv2.cvtColor(mask_inverted, cv2.COLOR_BGR2BGRA) + rgba_image[rgba_image[:, :, 0] == 0] = [0, 0, 0, 0] + # image_bytes = io.BytesIO() + # image_bytes.write(cv2.imencode('.png', rgba_image)[1].tobytes()) + # image_bytes.seek(0) + # mask_url = f"{AIDA_CLOTHING}/{minio_client.put_object('aida-clothing', f'mask/mask_{object_name}.png', image_bytes, len(image_bytes.getvalue()), content_type='image/png').object_name}" + # oss upload #################### + req = oss_upload_image(bucket=AIDA_CLOTHING, object_name=f"mask/mask_{object_name}.png", image_bytes=cv2.imencode('.png', rgba_image)[1]) + mask_url = f"{AIDA_CLOTHING}/mask/mask_{object_name}.png" + + image_data = io.BytesIO() + front_image.save(image_data, format='PNG') + image_data.seek(0) + image_bytes = image_data.read() + # image_url = f"{AIDA_CLOTHING}/{minio_client.put_object('aida-clothing', f'image/image_{object_name}.png', io.BytesIO(image_bytes), len(image_bytes), content_type='image/png').object_name}" + req = oss_upload_image(bucket=AIDA_CLOTHING, object_name=f"image/image_{object_name}.png", image_bytes=image_bytes) + image_url = f"{AIDA_CLOTHING}/image/image_{object_name}.png" + return front_image, image_url, mask_url + except Exception as e: + logging.warning(f"upload_png_mask runtime exception : {e}") + + +# @RunTime +# def upload_png_mask(front_image, object_name, mask=None): +# mask_url = None +# if mask is not None: +# mask_url = f"{AIDA_CLOTHING}/mask/mask_{object_name}.png" +# image_url = f"{AIDA_CLOTHING}/image/image_{object_name}.png" +# return front_image, image_url, mask_url