diff --git a/app/api/api_process_lookbooks.py b/app/api/api_process_lookbooks.py index 752fce9..b019e0c 100644 --- a/app/api/api_process_lookbooks.py +++ b/app/api/api_process_lookbooks.py @@ -4,52 +4,32 @@ import shutil from typing import List import tqdm -from fastapi import UploadFile, File, APIRouter - -from app.service.lookbooks.service import create_image_batch_requests +import aiofiles +from fastapi import UploadFile, File, APIRouter, BackgroundTasks, Form +from app.service.lookbooks.service import create_image_batch_requests, process_lookbook_task # 引入服务逻辑 logger = logging.getLogger() router = APIRouter() -@router.post("/process_lookbooks/") -async def process_lookbooks(files: List[UploadFile] = File(...)): +@router.post("/process-lookbooks/") +async def process_lookbooks( + background_tasks: BackgroundTasks, + files: List[UploadFile] = File(...), + tag: str = Form(...), + year: str = Form(...) +): lookbook_dir = "service/lookbooks/temp_lookbooks" os.makedirs(lookbook_dir, exist_ok=True) lookbook_list = [] for file in files: file_path = os.path.join(lookbook_dir, file.filename) - with open(file_path, "wb") as f: - shutil.copyfileobj(file.file, f) + async with aiofiles.open(file_path, "wb") as f: + await f.write(await file.read()) lookbook_list.append(file_path) - image_list = [] - for look_book_path in tqdm.tqdm(lookbook_list): - lookbook_name = os.path.splitext(os.path.basename(look_book_path))[0] - output_dir = os.path.join("app/service/lookbooks/fashion_documents/lookbook/images", lookbook_name) - os.makedirs(output_dir, exist_ok=True) - if not os.listdir(output_dir): - from unstructured.partition.pdf import partition_pdf - partition_pdf( - filename=look_book_path, - extract_images_in_pdf=True, - infer_table_structure=False, - chunking_strategy="by_title", - max_characters=4000, - new_after_n_chars=3800, - combine_text_under_n_chars=2000, - extract_image_block_output_dir=output_dir, - ) - else: - current_images = os.listdir(output_dir) - 
image_list.extend([os.path.join(output_dir, x) for x in current_images]) + # 将任务放入后台异步执行 + background_tasks.add_task(process_lookbook_task, lookbook_list, tag, year) - image_description_results_file = create_image_batch_requests(image_list, "app/service/lookbooks/fashion_documents/lookbook/results") - - shutil.rmtree(lookbook_dir) - - if image_description_results_file: - return {"message": "Lookbooks processed successfully", "result_file": image_description_results_file} - else: - return {"message": "No new images to process"} + return {"message": "Lookbooks are being processed in the background."} diff --git a/app/service/lookbooks/service.py b/app/service/lookbooks/service.py index 065d7ee..9c08cd5 100644 --- a/app/service/lookbooks/service.py +++ b/app/service/lookbooks/service.py @@ -1,11 +1,17 @@ import json import os +import logging +import tqdm +import aiofiles from openai import OpenAI - from app.service.lookbooks.utils.image_utils import base64_encode_image, generate_text_id from app.service.lookbooks.utils.openai_utils import wait_for_job_completion +# 设置日志 +logger = logging.getLogger() + +# OpenAI 配置 OPENAI_API_KEY = "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" OPENAI_API_BASE = "https://pangkaichen-openai-prox-98.deno.dev/v1" client = OpenAI( @@ -84,7 +90,8 @@ def create_image_batch_requests( } tasks.append(task) id2img[current_id] = image_filename - print(f"In total {len(tasks)} images") + logger.info(f"In total {len(tasks)} images to process") + if tasks: batch_file_name = os.path.join(output_path, "image_batch_requests.jsonl") with open(batch_file_name, 'w', encoding='utf-8') as file: @@ -120,9 +127,77 @@ def create_image_batch_requests( }) f.write(output + '\n') except json.JSONDecodeError as error: - print(f"Error parsing: {error} -- at line: {line}") + logger.error(f"Error parsing: {error} -- at line: {line}") else: - print("Job failed") + logger.error("Job failed") return os.path.join(output_path, "image_description_results.jsonl") else: 
return None + + +async def process_lookbook_task(lookbook_list, tag, year): + """后台异步任务,用于处理 lookbook 并保存到向量数据库""" + image_list = [] + try: + for look_book_path in tqdm.tqdm(lookbook_list): + lookbook_name = os.path.splitext(os.path.basename(look_book_path))[0] + output_dir = os.path.join("fashion_documents/lookbook/images", lookbook_name) + os.makedirs(output_dir, exist_ok=True) + if not os.listdir(output_dir): + from unstructured.partition.pdf import partition_pdf + partition_pdf( + filename=look_book_path, + extract_images_in_pdf=True, + infer_table_structure=False, + chunking_strategy="by_title", + max_characters=4000, + new_after_n_chars=3800, + combine_text_under_n_chars=2000, + extract_image_block_output_dir=output_dir, + ) + else: + current_images = os.listdir(output_dir) + image_list.extend([os.path.join(output_dir, x) for x in current_images]) + + # 1. 处理图片并生成批量请求 + image_description_results_file = create_image_batch_requests(image_list, "fashion_documents/lookbook/results") + + # 2. 
保存结果到向量数据库 + if image_description_results_file: + save_to_vector_db(image_description_results_file, tag, year) + + except Exception as e: + logger.error(f"Error processing lookbooks: {str(e)}") + raise e + + +def save_to_vector_db(image_description_results_file, tag, year): + """保存图像描述到向量数据库""" + image_ids = set() + image_summaries = [] + image_metadatas = [] + + try: + with open(image_description_results_file, "r", encoding="utf-8") as f: + for image_content in f: + image_content = json.loads(image_content) + # 确保ID不重复 + if image_content["custom_id"] not in image_ids: + image_ids.add(image_content["custom_id"]) + image_summaries.append(image_content["summary"]) + image_metadatas.append({ + "data_type": "image", + "url": image_content["url"].replace("\\", "/"), + "source": "mitu", + "tag": tag, + "year": year, + "gender": "female" + }) + + # 将图像的描述和元数据添加到向量数据库中 + collection.add_texts(texts=image_summaries, metadatas=image_metadatas, ids=list(image_ids)) + logger.info("Successfully saved data to vector database") + + except Exception as e: + logger.error(f"Error saving to vector database: {e}") + raise e