first commit

This commit is contained in:
zhh
2025-10-17 16:04:42 +08:00
parent ad4dccfe49
commit d8d558e185
11 changed files with 510 additions and 88 deletions

39
Dockerfile Normal file
View File

@@ -0,0 +1,39 @@
# Change CUDA and cuDNN version here
FROM nvidia/cuda:12.4.1-base-ubuntu22.04

ARG PYTHON_VERSION=3.11
ENV DEBIAN_FRONTEND=noninteractive

# Install the requested Python from the deadsnakes PPA, bootstrap pip, then
# purge build tooling and apt metadata so the layer stays small.
RUN apt-get update && apt-get install -y --no-install-recommends \
    software-properties-common \
    wget \
    && add-apt-repository ppa:deadsnakes/ppa \
    && apt-get update && apt-get install -y --no-install-recommends \
    python$PYTHON_VERSION \
    python$PYTHON_VERSION-dev \
    python$PYTHON_VERSION-venv \
    && wget https://bootstrap.pypa.io/get-pip.py -O get-pip.py \
    && python$PYTHON_VERSION get-pip.py \
    && rm get-pip.py \
    && ln -sf /usr/bin/python$PYTHON_VERSION /usr/bin/python \
    && ln -sf /usr/local/bin/pip$PYTHON_VERSION /usr/local/bin/pip \
    && python --version \
    && pip --version \
    && apt-get purge -y --auto-remove software-properties-common \
    && apt-get clean \
    && rm -rf /var/lib/apt/lists/*

####### Add your own installation commands here #######
# RUN pip install some-package
# RUN wget https://path/to/some/data/or/weights

# OpenCV runtime libraries plus a compiler toolchain for source builds.
# FIX: use apt-get consistently (`apt` warns it has no stable CLI for scripts)
# and clean the package lists so this layer does not bloat the image.
RUN apt-get update && apt-get install -y --no-install-recommends \
    ffmpeg libsm6 libxext6 build-essential g++ \
    && apt-get clean \
    && rm -rf /var/lib/apt/lists/*

WORKDIR /app
COPY . /app

# Install litserve and requirements in a single layer, without a pip cache.
RUN pip install --no-cache-dir litserve==0.2.16 \
    && pip install --no-cache-dir -r requirements-GPU.txt \
    && pip install --no-cache-dir opencv-python

# FIX: litserver_main.py binds port 8080 (server.run(port=8080)), not 8000.
EXPOSE 8080

CMD ["python", "/app/litserver_main.py"]

8
README-AIDA-LC.md Normal file
View File

@@ -0,0 +1,8 @@
创建 Dockerfile:
litserve dockerize server.py --port 8000 --gpu
构建镜像:
docker build -t litserve-model .
运行容器:
docker run -p 8000:8000 litserve-model

7
client.py Normal file
View File

@@ -0,0 +1,7 @@
# This file is auto-generated by LitServe.
# Disable auto-generation by setting `generate_client_file=False` in `LitServer.run()`.
import requests

# Send one sample prediction to the locally running LitServe endpoint
# and echo the HTTP status and body.
endpoint = "http://127.0.0.1:8080/predict"
payload = {"input": 4.0}
response = requests.post(endpoint, json=payload)
print(f"Status: {response.status_code}\nResponse:\n {response.text}")

View File

@@ -0,0 +1,93 @@
import os
import torch
import cv2
import numpy as np
from pathlib import Path
from torchvision.transforms.functional import normalize
from basicsr.utils import img2tensor, tensor2img
from basicsr.utils.download_util import load_file_from_url
from facelib.utils.face_restoration_helper import FaceRestoreHelper
from basicsr.utils.registry import ARCH_REGISTRY
# Cross-platform device selection: CUDA > MPS > CPU
if torch.cuda.is_available():
    device = torch.device("cuda")
elif torch.backends.mps.is_available():
    device = torch.device("mps")
else:
    device = torch.device("cpu")
# Download and load model
pretrain_model_url = {
    'restoration': 'https://github.com/sczhou/CodeFormer/releases/download/v0.1.0/codeformer.pth',
}
# Instantiate the CodeFormer architecture from the basicsr registry and move it
# to the selected device.
net = ARCH_REGISTRY.get('CodeFormer')(dim_embd=512, codebook_size=1024, n_head=8, n_layers=9,
                                      connect_list=['32', '64', '128', '256']).to(device)
# Fetch the checkpoint once (cached under weights/CodeFormer on later runs).
ckpt_path = load_file_from_url(url=pretrain_model_url['restoration'],
                               model_dir='weights/CodeFormer', progress=True, file_name=None)
# Use the EMA weights and freeze the network for inference.
checkpoint = torch.load(ckpt_path, map_location=device)['params_ema']
net.load_state_dict(checkpoint)
net.eval()
# Helper that detects faces, aligns crops to 512x512, and pastes restored
# crops back into the full image.
face_helper = FaceRestoreHelper(
    upscale_factor=1,
    face_size=512,
    crop_ratio=(1, 1),
    det_model='retinaface_resnet50',
    save_ext='jpg',
    use_parse=True,
    device=device
)
def _enhance_img(img: np.ndarray, w: float = 0.5) -> np.ndarray:
    """Run CodeFormer face restoration on a decoded image array.

    Detects faces, restores each aligned crop with the network, then pastes
    the restored crops back onto the input frame. Returns the input unchanged
    when no face is detected.
    """
    face_helper.clean_all()
    face_helper.read_image(img)
    detected = face_helper.get_face_landmarks_5(
        only_center_face=False, resize=640, eye_dist_threshold=5)
    if detected == 0:
        # Nothing to restore -- hand back the original frame.
        return img
    face_helper.align_warp_face()
    for face_crop in face_helper.cropped_faces:
        face_tensor = img2tensor(face_crop / 255., bgr2rgb=True, float32=True).to(device)
        normalize(face_tensor, (0.5, 0.5, 0.5), (0.5, 0.5, 0.5), inplace=True)
        face_tensor = face_tensor.unsqueeze(0)  # add batch dim -> (1, 3, H, W)
        with torch.no_grad():
            restored = net(face_tensor, w=w, adain=True)[0]
        restored_face = tensor2img(restored, rgb2bgr=True, min_max=(-1, 1)).astype('uint8')
        face_helper.add_restored_face(restored_face)
    face_helper.get_inverse_affine(None)
    return face_helper.paste_faces_to_input_image()
def enhance_image(img: np.ndarray, w: float = 0.5) -> np.ndarray:
    """Enhance faces in an image with CodeFormer and return the restored image.

    Args:
        img: Decoded BGR image array (as produced by ``cv2.imread`` /
            ``cv2.imdecode``). Despite the old annotation this function never
            accepted a file path -- the path-based code was commented out.
        w: CodeFormer fidelity weight; lower favors quality, higher fidelity.

    Returns:
        The restored image array (the input itself when no face is found).

    Raises:
        ValueError: If ``img`` is ``None`` (e.g. a failed image read upstream).
    """
    if img is None:
        raise ValueError("Cannot read image")
    return _enhance_img(img, w=w)
def enhance_image_memory(img: np.ndarray, w: float = 0.5) -> np.ndarray:
    """In-memory variant: enhance ``img`` with CodeFormer and return the result."""
    enhanced = _enhance_img(img, w=w)
    return enhanced

70
litserver_main.py Normal file
View File

@@ -0,0 +1,70 @@
import time
import cv2
import litserve as ls
from pydantic import BaseModel
from refacer_no_path import Refacer as NoPathRefacer
from utils.minio_client import oss_get_image, minio_client, oss_upload_image
class PredictRequest(BaseModel):
    """Request payload for the /predict endpoint."""
    input_image_list: list[str]  # OSS paths of the images to be face-swapped
    input_face: str  # OSS path of the target (destination) face image
    threshold: float = 0.2  # similarity threshold (0.5 max)
class InferencePipeline(ls.LitAPI):
    """LitServe API that swaps a destination face onto a list of OSS images.

    Images are fetched from MinIO/OSS, refaced, JPEG-encoded, and uploaded
    back; the response lists the uploaded object URLs.
    """

    def setup(self, device):
        """Build the refacer once per worker (model loading is expensive)."""
        force_cpu = False
        colab_performance = False
        self.supported_exts = {'jpg', 'jpeg', 'png', 'bmp', 'webp'}
        self.refacer = NoPathRefacer(force_cpu=force_cpu, colab_performance=colab_performance)

    def decode_request(self, request: PredictRequest):
        """Download the request's images and prepare the face configuration.

        FIX: everything needed by predict() is returned instead of being
        stored on ``self`` -- per-request attributes on the API instance race
        when requests are handled concurrently.
        """
        input_image_list = [
            {
                'img_obj': oss_get_image(oss_client=minio_client, path=path, data_type="cv2"),
                'img_path': path,
            }
            for path in request.input_image_list
        ]
        dest_img = oss_get_image(oss_client=minio_client, path=request.input_face, data_type="cv2")
        faces_config = [
            {
                'origin': None,
                'destination': dest_img,
                'destination_path': request.input_face,
                'threshold': request.threshold,
            }
        ]
        self.refacer.prepare_faces(faces_config)
        return {'faces_config': faces_config, 'images': input_image_list}

    def predict(self, decoded):
        """Reface every downloaded image, upload results, return their URLs."""
        faces_config = decoded['faces_config']
        refaced_images_url = []
        for image in decoded['images']:
            # FIX: guard paths without an extension (rsplit would IndexError).
            name_parts = image['img_path'].rsplit(".", 1)
            ext = name_parts[1].lower() if len(name_parts) == 2 else ""
            if ext not in self.supported_exts:
                print(f"Skipping non-image file: {image['img_path']}")
                continue
            print(f"Refacing: {image['img_path']}")
            try:
                refaced_image = self.refacer.reface_image(image['img_obj'], faces_config, disable_similarity=True)
                refaced_image_rgb = cv2.cvtColor(refaced_image, cv2.COLOR_RGB2BGR)
                image_bytes = cv2.imencode('.jpg', refaced_image_rgb)[1].tobytes()
                # FIX: the payload is JPEG-encoded, so always name the uploaded
                # object .jpg (previously the source extension was reused).
                req = oss_upload_image(oss_client=minio_client, bucket="lanecarford",
                                       object_name=f"refaced_image/refaced{time.time()}.jpg",
                                       image_bytes=image_bytes)
                refaced_images_url.append(f"{req.bucket_name}/{req.object_name}")
                print(f"Saved -> {req.bucket_name}/{req.object_name}")
            except Exception as e:
                # Best-effort batch: a single failure must not abort the rest.
                print(f"Failed to process {image['img_path']}: {e}")
        return refaced_images_url

    def encode_response(self, output):
        """Wrap the list of uploaded object URLs in the response envelope."""
        return {"output": output}
if __name__ == '__main__':
    # Serve the pipeline on the GPU accelerator, listening on port 8080.
    server = ls.LitServer(InferencePipeline(), accelerator="gpu")
    server.run(port=8080)

View File

@@ -1,6 +1,9 @@
import cv2
import onnxruntime as rt
import sys
from utils.minio_client import oss_get_image, minio_client
sys.path.insert(1, './recognition')
from scrfd import SCRFD
from arcface_onnx import ArcFaceONNX
@@ -40,9 +43,11 @@ if sys.platform in ("win32", "win64"):
if hasattr(rt, "preload_dlls"):
rt.preload_dlls()
class RefacerMode(Enum):
CPU, CUDA, COREML, TENSORRT = range(1, 5)
class Refacer:
def __init__(self, force_cpu=False, colab_performance=False):
self.disable_similarity = False
@@ -95,7 +100,6 @@ class Refacer:
return blended_frame
def __download_with_progress(self, url, output_path):
response = requests.get(url, stream=True)
total_size = int(response.headers.get('content-length', 0))
@@ -368,10 +372,6 @@ class Refacer:
return converted_path, None
def __generate_gif(self, video_path, gif_output_path):
os.makedirs(os.path.dirname(gif_output_path), exist_ok=True)
print(f"Generating GIF at {gif_output_path}")
@@ -400,13 +400,16 @@ class Refacer:
self.first_face = False if multiple_faces_mode else (faces[0].get("origin") is None or disable_similarity)
self.partial_reface_ratio = partial_reface_ratio
ext = osp.splitext(image_path)[1].lower()
os.makedirs("output", exist_ok=True)
ext = osp.splitext(image_path)[1].lower() #
# ext = image_path.rsplit('.',1)[1].lower()
os.makedirs("output", exist_ok=True) #
original_name = osp.splitext(osp.basename(image_path))[0]
timestamp = str(int(time.time()))
if ext in ['.tif', '.tiff']:
pil_img = Image.open(image_path)
pil_img = Image.open(image_path) #
# pil_img = oss_get_image(oss_client=minio_client, path=image_path, data_type="PIL")
frames = []
page_count = 0
@@ -417,7 +420,8 @@ class Refacer:
except EOFError:
pass
pil_img = Image.open(image_path)
pil_img = Image.open(image_path) #
# pil_img = oss_get_image(oss_client=minio_client, path=image_path, data_type="PIL")
with tqdm(total=page_count, desc="Processing TIFF pages") as pbar:
for page in range(page_count):
@@ -436,7 +440,8 @@ class Refacer:
return output_path
else:
bgr_image = cv2.imread(image_path)
bgr_image = cv2.imread(image_path) #
# bgr_image = oss_get_image(oss_client=minio_client, path=image_path, data_type="cv2")
if bgr_image is None:
raise ValueError("Failed to read input image")
@@ -450,7 +455,6 @@ class Refacer:
print(f"Saved refaced image to {output_path}")
return output_path
def extract_faces_from_image(self, image_path, max_faces=5):
frame = cv2.imread(image_path)
if frame is None:

82
refacer_bulk_no_path.py Normal file
View File

@@ -0,0 +1,82 @@
# refacer_bulk.py
#
# Example usage:
# python refacer_bulk.py --input_path ./input --dest_face myface.jpg --facetoreplace face1.jpg --threshold 0.3
#
# Or, to disable similarity check (i.e., just apply the destination face to all detected faces):
# python refacer_bulk.py --input_path ./input --dest_face myface.jpg
import argparse
import os
import time
import cv2
from PIL import Image
from refacer_no_path import Refacer as NoPathRefacer
import pyfiglet
from utils.minio_client import oss_get_image, minio_client, oss_upload_image
def main():
    """Bulk-reface a hard-coded list of OSS-hosted images with a destination face.

    Mirrors litserver_main.py: each image is downloaded from MinIO, refaced,
    JPEG-encoded, and uploaded back; uploaded object URLs are printed.
    """
    input_paths = [
        "lanecarford/original_image/7450d6e8-bc54-4c85-940c-4a31c879e02f-0-89.png",
        "lanecarford-outfits/outfits/outfit_6420.jpg",
        "lanecarford-outfits/outfits/outfit_7579.jpg"
    ]
    dest_face = "lanecarford/input_face/leijun.jpg"
    facetoreplace = ""  # empty -> no similarity check, swap every detected face
    threshold = 0.2

    refacer = NoPathRefacer(force_cpu=False, colab_performance=False)

    # Load destination and (optional) origin face.
    dest_img = oss_get_image(oss_client=minio_client, path=dest_face, data_type="cv2")
    if dest_img is None:
        raise ValueError(f"Destination face image not found: {dest_face}")
    origin_img = None
    if facetoreplace:
        origin_img = oss_get_image(oss_client=minio_client, path=facetoreplace, data_type="cv2")
        if origin_img is None:
            raise ValueError(f"Face to replace image not found: {facetoreplace}")
    disable_similarity = origin_img is None
    faces_config = [{
        'origin': origin_img,
        'destination': dest_img,
        'threshold': threshold
    }]
    refacer.prepare_faces(faces_config, disable_similarity=disable_similarity)

    print(f"Processing images from: {input_paths}")
    supported_exts = {'jpg', 'jpeg', 'png', 'bmp', 'webp'}
    refaced_images_url = []
    for image_path in input_paths:
        # FIX: guard paths without an extension (rsplit would IndexError).
        name_parts = image_path.rsplit(".", 1)
        ext = name_parts[1].lower() if len(name_parts) == 2 else ""
        if ext not in supported_exts:
            print(f"Skipping non-image file: {image_path}")
            continue
        print(f"Refacing: {image_path}")
        try:
            # FIX: NoPathRefacer consumes decoded images, not path strings
            # (see litserver_main.py) -- download the image before refacing.
            img_obj = oss_get_image(oss_client=minio_client, path=image_path, data_type="cv2")
            refaced_image = refacer.reface_image(img_obj, faces_config, disable_similarity=disable_similarity)
            refaced_image_rgb = cv2.cvtColor(refaced_image, cv2.COLOR_RGB2BGR)
            image_bytes = cv2.imencode('.jpg', refaced_image_rgb)[1].tobytes()
            # FIX: the payload is JPEG-encoded, so name the object .jpg
            # regardless of the source extension.
            req = oss_upload_image(oss_client=minio_client, bucket="lanecarford",
                                   object_name=f"refaced_image/refaced{time.time()}.jpg",
                                   image_bytes=image_bytes)
            refaced_images_url.append(f"{req.bucket_name}/{req.object_name}")
            print(f"Saved -> {req.bucket_name}/{req.object_name}")
        except Exception as e:
            # Best-effort batch: keep processing the remaining images.
            print(f"Failed to process {image_path}: {e}")
if __name__ == "__main__":
    # Script entry point (inputs are currently hard-coded inside main()).
    main()

31
refacer_server.py Normal file
View File

@@ -0,0 +1,31 @@
from refacer import Refacer
from utils.minio_client import oss_get_image, minio_client
class ReFaceServer:
    """Wrapper around Refacer that prepares a face swap from OSS-hosted images."""

    def __init__(self):
        # Fixed runtime configuration for the underlying Refacer.
        self.force_cpu = False
        self.threshold = 0.2
        self.colab_performance = False
        self.refacer = Refacer(force_cpu=self.force_cpu, colab_performance=self.colab_performance)

    def run(self, input_list, dest_face, facetoreplace):
        """Fetch the faces from OSS and prime the refacer.

        Args:
            input_list: OSS paths of the images to process.
            dest_face: OSS path of the destination face image.
            facetoreplace: OSS path of the face to replace; falsy disables the
                similarity check (every detected face is swapped).

        Raises:
            ValueError: If a referenced face image cannot be fetched.
        """
        dest_img = oss_get_image(oss_client=minio_client, path=dest_face, data_type="cv2")
        if dest_img is None:
            raise ValueError(f"Destination face image not found: {dest_face}")
        origin_img = None
        if facetoreplace:
            origin_img = oss_get_image(oss_client=minio_client, path=facetoreplace, data_type="cv2")
            if origin_img is None:
                raise ValueError(f"Face to replace image not found: {facetoreplace}")
        disable_similarity = origin_img is None
        faces_config = [{
            'origin': origin_img,
            'destination': dest_img,
            'threshold': self.threshold
        }]
        self.refacer.prepare_faces(faces_config, disable_similarity=disable_similarity)
        # FIX: `input_dir` was undefined here (NameError); log the actual inputs.
        print(f"Processing images from: {input_list}")

View File

@@ -18,3 +18,5 @@ pyfiglet==1.0.2
# torchvision==0.21.0
gdown==5.2.0
lpips==0.1.4
opencv-python
minio

81
utils/minio_client.py Normal file
View File

@@ -0,0 +1,81 @@
import io
import logging
from io import BytesIO
import cv2
import numpy as np
import urllib3
from PIL import Image
from minio import Minio
from utils.minio_config import MINIO_URL, MINIO_ACCESS, MINIO_SECRET, MINIO_SECURE
minio_client = Minio(MINIO_URL, access_key=MINIO_ACCESS, secret_key=MINIO_SECRET, secure=MINIO_SECURE)
# Retry policy that logs every retry attempt against MinIO.
class CustomRetry(urllib3.Retry):
    def increment(self, method=None, url=None, response=None, error=None, **kwargs):
        # Let urllib3.Retry do the actual retry bookkeeping.
        new_retry = super().increment(method, url, response, error, **kwargs)
        # Report which request is retried, why, and how many attempts were used.
        logger.info(f"重试连接: {method} {url},错误: {error},重试次数: {self.total - new_retry.total}")
        return new_retry
logger = logging.getLogger()  # root logger; messages follow the app's logging config
timeout = urllib3.Timeout(connect=1, read=10.0)  # connect timeout 1 s, read timeout 10 s
# NOTE(review): this pool manager is constructed but never passed to the
# Minio() client above -- confirm whether it should be supplied as
# `http_client=` so the timeout/retry settings actually take effect.
http_client = urllib3.PoolManager(
    num_pools=10,  # size of the connection pool
    maxsize=10,
    timeout=timeout,
    cert_reqs='CERT_REQUIRED',  # require TLS certificate verification
    retries=CustomRetry(
        total=5,
        backoff_factor=0.2,
        status_forcelist=[500, 502, 503, 504],
    ),
)
# Fetch an image from OSS/MinIO storage.
def oss_get_image(oss_client, path, data_type):
    """Download ``path`` ("bucket/object") and decode it.

    Args:
        oss_client: A ``minio.Minio``-compatible client.
        path: "<bucket>/<object_name>" locator.
        data_type: "cv2" for a numpy array (all channels preserved),
            anything else for a PIL image.

    Returns:
        The decoded image, or ``None`` when fetching or decoding fails
        (best effort -- failures are logged, not raised).
    """
    # FIX: partition instead of split so a path without "/" no longer raises
    # IndexError outside the try block; the fetch then fails and returns None.
    bucket, _, object_name = path.partition("/")
    image_object = None
    try:
        image_data = oss_client.get_object(bucket_name=bucket, object_name=object_name)
        if data_type == "cv2":
            image_bytes = image_data.read()
            image_array = np.frombuffer(image_bytes, np.uint8)  # raw bytes as uint8
            image_object = cv2.imdecode(image_array, cv2.IMREAD_UNCHANGED)
            # FIX: imdecode returns None for undecodable payloads; the old code
            # then crashed on `.dtype` (silently masked by the broad except).
            if image_object is not None and image_object.dtype == np.uint16:
                # Scale 16-bit images down to 8-bit.
                image_object = (image_object / 256).astype('uint8')
        else:
            image_object = Image.open(BytesIO(image_data.read()))
    except Exception as e:
        # Best-effort: callers receive None and decide how to proceed.
        logger.warning(f"获取图片出现异常 ######: {e}")
    return image_object
def oss_upload_image(oss_client, bucket, object_name, image_bytes):
    """Upload raw image bytes to OSS; return the put result, or None on failure."""
    req = None
    try:
        payload = io.BytesIO(image_bytes)
        req = oss_client.put_object(
            bucket_name=bucket,
            object_name=object_name,
            data=payload,
            length=len(image_bytes),
            content_type='image/png',
        )
    except Exception as e:
        # Best-effort upload: log the failure and hand back None.
        logger.warning(f"上传图片出现异常 ######: {e}")
    return req
if __name__ == '__main__':
    # Manual smoke test: fetch a known object and display/save it.
    url = "lanecarford/refaced_image/refaced1760687023.736802.png"
    read_type = "1"  # any value other than "cv2" selects the PIL branch
    img = oss_get_image(oss_client=minio_client, path=url, data_type=read_type)
    if read_type == "cv2":
        cv2.imshow("", img)
        cv2.waitKey(0)
    else:
        img.show()
        img.save("result.png")

5
utils/minio_config.py Normal file
View File

@@ -0,0 +1,5 @@
# MinIO connection settings.
#
# SECURITY: credentials were previously hard-coded in this file. They are now
# read from the environment, falling back to the legacy values so existing
# deployments keep working -- rotate these keys and set the env vars instead.
import os

MINIO_URL = os.environ.get("MINIO_URL", "www.minio-api.aida.com.hk")
MINIO_ACCESS = os.environ.get("MINIO_ACCESS_KEY", 'vXKFLSJkYeEq2DrSZvkB')
MINIO_SECRET = os.environ.get("MINIO_SECRET_KEY", 'uKTZT3x7C43WvPN9QTc99DiRkwddWZrG9Uh3JVlR')
# Truthy strings ("1", "true", "yes") enable TLS; default matches the old True.
MINIO_SECURE = os.environ.get("MINIO_SECURE", "true").lower() in ("1", "true", "yes")