feat generate single logo 部署

This commit is contained in:
zhouchengrong
2024-06-03 11:38:20 +08:00
parent e96c1ae209
commit 2121f0ab46
7 changed files with 235 additions and 139 deletions

View File

@@ -9,25 +9,39 @@
"""
import json
import logging
import time
import cv2
import numpy as np
import redis
from PIL import Image
from minio import Minio
from tritonclient.utils import np_to_triton_dtype
from app.core.config import *
import tritonclient.grpc as grpcclient
from app.schemas.generate_image import GenerateSingleLogoImageModel
from app.service.generate_image.utils.upload_sd_image import upload_png_sd, upload_single_logo
logger = logging.getLogger()
class GenerateSingleLogoImage:
def __init__(self, request_data):
# if DEBUG is False:
# self.connection = pika.BlockingConnection(pika.ConnectionParameters(**RABBITMQ_PARAMS))
# self.channel = self.connection.channel()
self.connection = pika.BlockingConnection(pika.ConnectionParameters(**RABBITMQ_PARAMS))
self.channel = self.connection.channel()
if DEBUG is False:
self.connection = pika.BlockingConnection(pika.ConnectionParameters(**RABBITMQ_PARAMS))
self.channel = self.connection.channel()
# self.connection = pika.BlockingConnection(pika.ConnectionParameters(**RABBITMQ_PARAMS))
# self.channel = self.connection.channel()
self.minio_client = Minio(MINIO_URL, access_key=MINIO_ACCESS, secret_key=MINIO_SECRET, secure=MINIO_SECURE)
# self.grpc_client = grpcclient.InferenceServerClient(url=GI_MODEL_URL)
self.grpc_client = grpcclient.InferenceServerClient(url=GSL_MODEL_URL)
self.redis_client = redis.StrictRedis(host=REDIS_HOST, port=REDIS_PORT, db=REDIS_DB, decode_responses=True)
self.batch_size = 1
self.category = "single_logo"
self.negative_prompts = "bad, ugly"
self.seed = request_data.seed
self.tasks_id = request_data.tasks_id
self.prompt = request_data.prompt
self.user_id = self.tasks_id[self.tasks_id.rfind('-') + 1:]
self.gen_single_logo_data = {'tasks_id': self.tasks_id, 'status': 'PENDING', 'message': "pending", 'image_url': ''}
self.redis_client.set(self.tasks_id, json.dumps(self.gen_single_logo_data))
@@ -37,20 +51,72 @@ class GenerateSingleLogoImage:
status_data = self.redis_client.get(self.tasks_id)
return json.loads(status_data), status_data
def infer(self, inputs):
return self.grpc_client.async_infer(
model_name=GSL_MODEL_NAME,
inputs=inputs,
callback=self.callback
)
def callback(self, result, error):
if error:
self.gen_single_logo_data['status'] = "FAILURE"
self.gen_single_logo_data['message'] = str(error)
# self.generate_data['data'] = str(error)
self.redis_client.set(self.tasks_id, json.dumps(self.gen_single_logo_data))
else:
image = result.as_numpy("generated_image")
image_result = Image.fromarray(np.squeeze(image.astype(np.uint8)))
image_url = upload_single_logo(image_result, user_id=self.user_id, category=f"{self.category}", object_name=f"{self.tasks_id}.png")
self.gen_single_logo_data['status'] = "SUCCESS"
self.gen_single_logo_data['message'] = "success"
self.gen_single_logo_data['image_url'] = str(image_url)
self.redis_client.set(self.tasks_id, json.dumps(self.gen_single_logo_data))
def get_result(self):
try:
generate_data, _ = self.read_tasks_status()
# prompt
prompts = [self.prompt] * self.batch_size
text_obj = np.array(prompts, dtype="object").reshape((-1, 1))
input_text = grpcclient.InferInput("prompt", text_obj.shape, np_to_triton_dtype(text_obj.dtype))
input_text.set_data_from_numpy(text_obj)
# negative_prompts
text_obj_neg = np.array(self.negative_prompts, dtype="object").reshape((-1, 1))
# print('text obj neg: ', text_obj_neg)
input_text_neg = grpcclient.InferInput("negative_prompt", text_obj_neg.shape, np_to_triton_dtype(text_obj_neg.dtype))
input_text_neg.set_data_from_numpy(text_obj_neg)
# seed
seed = np.array(self.seed, dtype="object").reshape((-1, 1))
print('seed: ', self.seed)
input_seed = grpcclient.InferInput("seed", seed.shape, np_to_triton_dtype(seed.dtype))
input_seed.set_data_from_numpy(seed)
inputs = [input_text, input_text_neg, input_seed]
ctx = self.infer(inputs)
time_out = 600
generate_data = None
while time_out > 0:
generate_data, _ = self.read_tasks_status()
# logger.info(generate_data)
if generate_data['status'] in ["REVOKED", "FAILURE"]:
ctx.cancel()
break
elif generate_data['status'] == "SUCCESS":
break
time_out -= 1
time.sleep(0.1)
# logger.info(time_out, generate_data)
return generate_data
except Exception as e:
self.gen_single_logo_data['status'] = "FAILURE"
self.gen_single_logo_data['message'] = str(e)
self.redis_client.set(self.tasks_id, json.dumps(self.gen_single_logo_data))
raise Exception(str(e))
finally:
dict_generate_data, str_generate_data = self.read_tasks_status()
# if DEBUG is False:
# self.channel.basic_publish(exchange='', routing_key=GI_RABBITMQ_QUEUES, body=str_generate_data)
self.channel.basic_publish(exchange='', routing_key=GEN_SINGLE_LOGO_RABBITMQ_QUEUES, body=str_generate_data)
if DEBUG is False:
self.channel.basic_publish(exchange='', routing_key=GEN_SINGLE_LOGO_RABBITMQ_QUEUES, body=str_generate_data)
# self.channel.basic_publish(exchange='', routing_key=GI_RABBITMQ_QUEUES, body=str_generate_data)
logger.info(f" [x] Sent {json.dumps(dict_generate_data, indent=4)}")
@@ -64,9 +130,9 @@ def infer_cancel(tasks_id):
if __name__ == '__main__':
rd = GenerateSingleLogoImageModel(
tasks_id="123-8",
prompt='skeleton sitting by the side of a river looking soulful, concert poster, 4k, artistic',
image_url="",
tasks_id="123-89",
prompt='an apple',
seed="1",
)
server = GenerateSingleLogoImage(rd)
print(server.get_result())

View File

@@ -10,6 +10,7 @@
import io
import logging
import boto3
import cv2
from PIL import Image
from minio import Minio
@@ -17,6 +18,39 @@ from minio import Minio
from app.core.config import *
minio_client = Minio(MINIO_URL, access_key=MINIO_ACCESS, secret_key=MINIO_SECRET, secure=MINIO_SECURE)
s3 = boto3.client('s3', aws_access_key_id=S3_ACCESS_KEY, aws_secret_access_key=S3_AWS_SECRET_ACCESS_KEY, region_name=S3_REGION_NAME)
# def upload_single_logo(image, user_id, category, object_name):
# with io.BytesIO() as output:
# image.save(output, format='PNG')
# data = output.getvalue()
# # 创建一个 S3 客户端
# try:
# key = f'{user_id}/{category}/{object_name}'
# image_url = f"{AIDA_CLOTHING}/{key}"
# s3.put_object(Bucket=GSL_MINIO_BUCKET, Key=key, Body=data, ContentType='image/png')
# return image_url
# except Exception as e:
# print(f'上传到 S3 失败: {e}')
def upload_single_logo(image, user_id, category, object_name):
try:
image_data = io.BytesIO()
image.save(image_data, format='PNG')
image_data.seek(0)
image_bytes = image_data.read()
minio_req = minio_client.put_object(
GI_MINIO_BUCKET,
f'{user_id}/{category}/{object_name}',
io.BytesIO(image_bytes),
len(image_bytes),
content_type='image/jpeg'
)
image_url = f"aida-users/{minio_req.object_name}"
return image_url
except Exception as e:
logging.warning(f"upload_png_mask runtime exception : {e}")
def upload_png_sd(image, user_id, category, object_name):