import json import logging import redis from app.core.config import * logger = logging.getLogger() class Slogan: def __init__(self, request_data): self.tasks_id = request_data.tasks_id self.prompt = request_data.prompt self.svg = request_data.svg self.redis_client = redis.StrictRedis(host=REDIS_HOST, port=REDIS_PORT, db=REDIS_DB, decode_responses=True) self.slogan_data = {'tasks_id': self.tasks_id, 'status': 'PENDING', 'message': "pending", 'image_url': ''} self.redis_client.set(self.tasks_id, json.dumps(self.slogan_data)) self.redis_client.expire(self.tasks_id, 600) # if DEBUG is False: self.connection = pika.BlockingConnection(pika.ConnectionParameters(**RABBITMQ_PARAMS)) self.channel = self.connection.channel() self.result_image_url = "test/slogan/init_img.png" def read_tasks_status(self): status_data = self.redis_client.get(self.tasks_id) return json.loads(status_data), status_data def get_result(self): self.slogan_data['status'] = "SUCCESS" self.slogan_data['message'] = "success" self.slogan_data['image_url'] = "test/slogan/init_img.png" dict_slogan_data, str_slogan_data = self.read_tasks_status() self.channel.basic_publish(exchange='', routing_key=SLOGAN_RABBITMQ_QUEUES, body=str_slogan_data) logger.info(f" [x] Sent {json.dumps(dict_slogan_data, indent=4)}")