TASK:1. 通过定时任务,补偿获取第三方api的运行结果;

2. pose 生成,对使用万相模型的生成限流,最大支持5个并发
3. Slogan限流,一分钟内,最多接收3个请求
This commit is contained in:
2025-08-01 17:21:41 +08:00
parent 132f48eb60
commit 4ad2cd027c
9 changed files with 3511 additions and 3326 deletions

View File

@@ -293,7 +293,7 @@ public class GenerateConsumer {
} else {
// 修改redis中的数据状态为exception
String key = generateResultKey + ":" + generateResult.get("tasks_id");
generateService.updatePoseTransferStatus(generateResult.get("tasks_id"), "Fail");
generateService.updatePoseTransferStatus(generateResult.get("tasks_id"), "Fail", null);
redisUtil.addToString(key, new Gson().toJson(new PoseTransformationVO(null, generateResult.get("tasks_id"),null, null, null, (byte)0, "Fail")), CommonConstant.GENERATE_RESULT_EXPIRE_TIME);
// 将异常信息存到exception中
HashMap<String, String> exceptionInfo = new HashMap<>();
@@ -529,7 +529,7 @@ public class GenerateConsumer {
} else {
// 修改redis中的数据状态为exception
String key = generateResultKey + ":" + generateResult.getString("task_id");
generateService.updatePoseTransferStatus(generateResult.getString("task_id"), "Fail");
generateService.updatePoseTransferStatus(generateResult.getString("task_id"), "Fail", null);
redisUtil.addToString(key, new Gson().toJson(new PoseTransformationVO(null, generateResult.getString("task_id"),null, null, null, (byte)0, "Fail")), CommonConstant.GENERATE_RESULT_EXPIRE_TIME);
// 将异常信息存到exception中
HashMap<String, String> exceptionInfo = new HashMap<>();

View File

@@ -0,0 +1,134 @@
package com.ai.da.common.task;
import com.ai.da.common.config.exception.BusinessException;
import com.ai.da.common.utils.DateUtil;
import com.ai.da.mapper.primary.PoseTransformationMapper;
import com.ai.da.mapper.primary.ToProductImageResultMapper;
import com.ai.da.mapper.primary.entity.*;
import com.ai.da.service.APIGenerateService;
import com.ai.da.service.CreditsService;
import com.ai.da.service.GenerateService;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import io.netty.util.internal.StringUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.time.LocalDateTime;
import java.util.List;
import java.util.Objects;
import static com.ai.da.common.enums.CreditsEventsEnum.TO_PRODUCT_IMAGE;
@Slf4j
@Component
public class GenerateTask {
@Resource
private APIGenerateService apiGenerateService;
@Resource
private CreditsService creditsService;
@Resource
private GenerateService generateService;
@Resource
private ToProductImageResultMapper toProductImageResultMapper;
@Resource
private PoseTransformationMapper poseTransformationMapper;
/*
* 对于使用了第三方api的允许异步获得结果的生成功能可能在第三方接口的结果Ready时没有及时存储结果导致第三方链接失效
* 万相 24h失效
* flux 10mins失效 使用了flux接口的功能 ToProductImage || Relight, Pattern这里不做补偿
* 故这里通过定时任务做补偿
* flux五分钟查询一次万相1小时查询一次
*/
@Scheduled(cron = "0 */4 * * * ?")
public void fluxCompensationMechanism(){
// 1、查所有 任务还没成功、还没失败正在等待或者执行中的任务id有哪些
// 由于获取结果的polling_url在redis中只存一天大部分结果超过一天之后就无法再找到任务小部分可以通过公共路径查到结果
List<APIGenerate> apiGenerates = apiGenerateService.getPendingTaskByStatus("flux");
if (apiGenerates != null && !apiGenerates.isEmpty()){
for (APIGenerate apiGenerate : apiGenerates){
String taskId = apiGenerate.getTaskId();
// 1. 根据taskId查toProductImageResult 判断当前任务状态与超时状态
ToProductImageResult toProductImageResult = toProductImageResultMapper.selectOne(new QueryWrapper<ToProductImageResult>().eq("task_id", taskId));
if (Objects.nonNull(toProductImageResult) && "Pending".equals(toProductImageResult.getStatus())){
// 判断当前任务的超时状态
if (!DateUtil.isMoreThanOneDayApart(toProductImageResult.getCreateTime())){
// 1. 未超时,获取当前任务结果
String fileName = toProductImageResult.getResultType().equals(TO_PRODUCT_IMAGE.getName()) ? "product_image" : "relight_image";
String objectName = apiGenerate.getAccountId() + "/" + fileName +"/" + taskId + ".png";
String fluxResult = generateService.getFluxResult(taskId, objectName);
// 2. 成功获取结果下载图片上传至minio,更新toProductImageResult表
if (StringUtil.isNullOrEmpty(fluxResult) || fluxResult.equals("Fail")){
toProductImageResult.setStatus("Fail");
toProductImageResultMapper.updateById(toProductImageResult);
apiGenerate.setStatus("Fail");
apiGenerate.setUpdateTime(LocalDateTime.now());
apiGenerateService.updateById(apiGenerate);
} else if (!fluxResult.equals("Pending")){
if (StringUtil.isNullOrEmpty(toProductImageResult.getUrl())){
toProductImageResult.setStatus("Success");
toProductImageResult.setUrl(fluxResult);
toProductImageResultMapper.updateById(toProductImageResult);
apiGenerate.setStatus("Success");
apiGenerate.setUpdateTime(LocalDateTime.now());
apiGenerateService.updateById(apiGenerate);
}
// 扣积分
Boolean flag = creditsService.taskCreditsDeduction(apiGenerate.getAccountId(), taskId);
if (flag) creditsService.updateChangedCredits(String.valueOf(apiGenerate.getAccountId()), taskId);
}
} else {
// 超时,设置状态为失败
toProductImageResult.setStatus("Fail");
toProductImageResultMapper.updateById(toProductImageResult);
apiGenerate.setStatus("Fail");
apiGenerate.setUpdateTime(LocalDateTime.now());
apiGenerateService.updateById(apiGenerate);
}
}
}
}
}
// 万相 -> pose transformation 补偿 一小时执行一次
@Scheduled(fixedDelay = 60 * 60 * 1000)
public void wxCompensationMechanism(){
List<APIGenerate> apiGenerates = apiGenerateService.getPendingTaskByStatus("wx");
if (apiGenerates != null && !apiGenerates.isEmpty()){
for (APIGenerate apiGenerate : apiGenerates){
String taskId = apiGenerate.getTaskId();
PoseTransformation poseTransformation = poseTransformationMapper.selectOne(new QueryWrapper<PoseTransformation>().eq("unique_id", taskId));
if (Objects.nonNull(poseTransformation) && "Pending".equals(poseTransformation.getTaskStatus())){
// 判断当前任务的超时状态
if (!DateUtil.isMoreThanOneDayApart(poseTransformation.getCreateTime())){
try {
// 方法中已经完成了pose_transformation和api_generate表的更新不用额外做处理
generateService.getAnimateResult(taskId);
} catch (BusinessException e){
log.warn("万相 animation 生成失败,原因:{}", e.getMessage());
}
}
} else {
poseTransformation.setTaskStatus("Fail");
poseTransformation.setUpdateTime(LocalDateTime.now());
poseTransformationMapper.updateById(poseTransformation);
apiGenerate.setStatus("Fail");
apiGenerate.setUpdateTime(LocalDateTime.now());
apiGenerateService.updateById(apiGenerate);
}
}
}
}
}

View File

@@ -5,10 +5,7 @@ import lombok.extern.slf4j.Slf4j;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.*;
import java.time.format.DateTimeFormatter;
import java.util.Date;
import java.util.Locale;
@@ -101,4 +98,10 @@ public class DateUtil {
return localDate.format(DateTimeFormatter.ofPattern(CommonConstant.TIME_FORMAT_MMM_dd_yyyy_EEEE, Locale.US));
}
public static boolean isMoreThanOneDayApart(LocalDateTime givenDateTime) {
LocalDateTime now = LocalDateTime.now();
Duration duration = Duration.between(givenDateTime, now);
return duration.toHours() >= 24;
}
}

View File

@@ -9,6 +9,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.ValueOperations;
import org.springframework.data.redis.core.ZSetOperations;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.stereotype.Component;
@@ -594,4 +595,19 @@ public class RedisUtil {
return count;
}
public boolean allowRequest(String apiKey) {
String key = "rate_limit:" + apiKey;
ValueOperations<String, String> ops = redisTemplate.opsForValue();
// 使用Redis的INCR命令
Long count = ops.increment(key, 1);
if (count == 1) {
// 第一次调用,设置过期时间
redisTemplate.expire(key, 1, TimeUnit.MINUTES);
}
return count <= 3;
}
}