@@ -1,134 +1,137 @@
package com.ai.da.common.task ;
import com.ai.da.common.config.exception.BusinessException ;
import com.ai.da.common.utils.DateUtil ;
import com.ai.da.mapper.primary.PoseTransformationMapper ;
import com.ai.da.mapper.primary.ToProductImageResultMapper ;
import com.ai.da.mapper.primary.entity.* ;
import com.ai.da.service.APIGenerateService ;
import com.ai.da.service.CreditsService ;
import com.ai.da.service.GenerateService ;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper ;
import io.netty.util.internal.StringUtil ;
import lombok.extern.slf4j.Slf4j ;
import org.springframework.scheduling.annotation.Scheduled ;
import org.springframework.stereotype.Component ;
import javax.annotation.Resource ;
import java.time.LocalDateTime ;
import java.util.List ;
import java.util.Objects ;
import static com.ai.da.common.enums.CreditsEventsEnum.TO_PRODUCT_IMAGE ;
@Slf4j
@Component
public class GenerateTask {
@Resource
private APIGenerateService apiGenerateService ;
@Resource
private CreditsService creditsService ;
@Resource
private GenerateService generateService ;
@Resource
private ToProductImageResultMapper toProductImageResultMapper ;
@Resource
private PoseTransformationMapper poseTransformationMapper ;
/*
* 对于使用了第三方api的允许异步获得结果的生成功能, 可能在第三方接口的结果Ready时没有及时存储结果, 导致第三方链接失效
* 万相 24h失效,
* flux 10mins失效 ( 使用了flux接口的功能 ToProductImage || Relight, Pattern这里不做补偿)
* 故这里通过定时任务做补偿
* flux五分钟查询一次, 万相1小时查询一次
*/
@Scheduled ( cron = " 0 */4 * * * ? " )
public void fluxCompensationMechanism ( ) {
// 1、查所有 任务还没成功、还没失败, 正在等待或者执行中的任务id有哪些
// ( 由于获取结果的polling_url在redis中只存一天, 大部分结果超过一天之后就无法再找到任务, 小部分可以通过公共路径查到结果)
List < APIGenerate > apiGenerates = apiGenerateService . getPendingTaskByStatus ( " flux " ) ;
if ( apiGenerates ! = null & & ! apiGenerates . isEmpty ( ) ) {
for ( APIGenerate apiGenerate : apiGenerates ) {
String taskId = apiGenerate . getTaskId ( ) ;
// 1. 根据taskId查toProductImageResult, 判断当前任务状态与超时状态
ToProductImageResult toProductImageResult = toProductImageResultMapper . selectOne ( new QueryWrapper < ToProductImageResult > ( ) . eq ( " task_id " , taskId ) ) ;
if ( Objects . nonNull ( toProductImageResult ) & & " Pending " . equals ( toProductImageResult . getStatus ( ) ) ) {
// 判断当前任务的超时状态
if ( ! DateUtil . isMoreThanOneDayApart ( toProductImageResult . getCreateTime ( ) ) ) {
// 1. 未超时,获取当前任务结果
String fileName = toProductImageResult . getResultType ( ) . equals ( TO_PRODUCT_IMAGE . getName ( ) ) ? " product_image " : " relight_image " ;
String objectName = apiGenerate . getAccountId ( ) + " / " + fileName + " / " + taskId + " .png " ;
String fluxResult = generateService . getFluxResult ( taskId , objectName ) ;
// 2. 成功, 获取结果, 下载图片, 上传至minio,更新toProductImageResult表
if ( StringUtil . isNullOrEmpty ( fluxResult ) | | fluxResult . equals ( " Fail " ) ) {
toProductImageResult . setStatus ( " Fail " ) ;
toProductImageResultMapper . updateById ( toProductImageResult ) ;
apiGenerate . setStatus ( " Fail " ) ;
apiGenerate . setUpdateTime ( LocalDateTime . now ( ) ) ;
apiGenerateService . updateById ( apiGenerate ) ;
} else if ( ! fluxResult . equals ( " Pending " ) ) {
if ( StringUtil . isNullOrEmpty ( toProductImageResult . getUrl ( ) ) ) {
toProductImageResult . setStatus ( " Success " ) ;
toProductImageResult . setUrl ( fluxResult ) ;
toProductImageResultMapper . updateById ( toProductImageResult ) ;
apiGenerate . setStatus ( " Success " ) ;
apiGenerate . setUpdateTime ( LocalDateTime . now ( ) ) ;
apiGenerateService . updateById ( apiGenerate ) ;
}
// 扣积分
Boolean flag = creditsService . taskCreditsDeduction ( apiGenerate . getAccountId ( ) , taskId ) ;
if ( flag ) creditsService . updateChangedCredits ( String . valueOf ( apiGenerate . getAccountId ( ) ) , taskId ) ;
}
} else {
// 超时,设置状态为失败
toProductImageResult . setStatus ( " Fail " ) ;
toProductImageResultMapper . updateById ( toProductImageResult ) ;
apiGenerate . setStatus ( " Fail " ) ;
apiGenerate . setUpdateTime ( LocalDateTime . now ( ) ) ;
apiGenerateService . updateById ( apiGenerate ) ;
}
}
}
}
}
// 万相 -> pose transformation 补偿 一小时执行一次
@Scheduled ( fixedDelay = 60 * 60 * 1000 )
public void wxCompensationMechanism ( ) {
List < APIGenerate > apiGenerates = apiGenerateService . getPendingTaskByStatus ( " wx " ) ;
if ( apiGenerates ! = null & & ! apiGenerates . isEmpty ( ) ) {
for ( APIGenerate apiGenerate : apiGenerates ) {
String taskId = apiGenerate . getTaskId ( ) ;
PoseTransformation poseTransformation = poseTransformationMapper . selectOne ( new QueryWrapper < PoseTransformation > ( ) . eq ( " unique_id " , taskId ) ) ;
if ( Objects . nonNull ( poseTransformation ) & & " Pending " . equals ( poseTransformation . getTaskStatus ( ) ) ) {
// 判断当前任务的超时状态
if ( ! DateUtil . isMoreThanOneDayApart ( poseTransformation . getCreateTime ( ) ) ) {
try {
// 方法中已经完成了pose_transformation和api_generate表的更新, 不用额外做处理
generateService . getAnimateResult ( taskId ) ;
} catch ( BusinessException e ) {
log . warn ( " 万相 animation 生成失败,原因:{} " , e . getMessage ( ) ) ;
}
}
} else {
poseTransformation . setTaskStatus ( " Fail " ) ;
poseTransformation . setUpdateTime ( LocalDateTime . now ( ) ) ;
poseTransformationMapper . updateById ( poseTransformation ) ;
apiGenerate . setStatus ( " Fail " ) ;
apiGenerate . setU pdateTime ( LocalDateTime . now ( ) ) ;
apiGenerateService . updateById ( apiGenerate ) ;
}
}
}
}
}
package com.ai.da.common.task ;
import com.ai.da.common.config.exception.BusinessException ;
import com.ai.da.common.utils.DateUtil ;
import com.ai.da.mapper.primary.PoseTransformationMapper ;
import com.ai.da.mapper.primary.ToProductImageResultMapper ;
import com.ai.da.mapper.primary.entity.* ;
import com.ai.da.service.APIGenerateService ;
import com.ai.da.service.CreditsService ;
import com.ai.da.service.GenerateService ;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper ;
import io.netty.util.internal.StringUtil ;
import lombok.extern.slf4j.Slf4j ;
import org.springframework.scheduling.annotation.Scheduled ;
import org.springframework.stereotype.Component ;
import javax.annotation.Resource ;
import java.time.LocalDateTime ;
import java.util.List ;
import java.util.Objects ;
import static com.ai.da.common.enums.CreditsEventsEnum.TO_PRODUCT_IMAGE ;
@Slf4j
@Component
public class GenerateTask {
@Resource
private APIGenerateService apiGenerateService ;
@Resource
private CreditsService creditsService ;
@Resource
private GenerateService generateService ;
@Resource
private ToProductImageResultMapper toProductImageResultMapper ;
@Resource
private PoseTransformationMapper poseTransformationMapper ;
/*
* 对于使用了第三方api的允许异步获得结果的生成功能, 可能在第三方接口的结果Ready时没有及时存储结果, 导致第三方链接失效
* 万相 24h失效,
* flux 10mins失效 ( 使用了flux接口的功能 ToProductImage || Relight, Pattern这里不做补偿)
* 故这里通过定时任务做补偿
* flux五分钟查询一次, 万相1小时查询一次
*/
@Scheduled ( cron = " 0 */4 * * * ? " )
public void fluxCompensationMechanism ( ) {
// 1、查所有 任务还没成功、还没失败, 正在等待或者执行中的任务id有哪些
// ( 由于获取结果的polling_url在redis中只存一天, 大部分结果超过一天之后就无法再找到任务, 小部分可以通过公共路径查到结果)
List < APIGenerate > apiGenerates = apiGenerateService . getPendingTaskByStatus ( " flux " ) ;
if ( apiGenerates ! = null & & ! apiGenerates . isEmpty ( ) ) {
for ( APIGenerate apiGenerate : apiGenerates ) {
String taskId = apiGenerate . getTaskId ( ) ;
// 1. 根据taskId查toProductImageResult, 判断当前任务状态与超时状态
ToProductImageResult toProductImageResult = toProductImageResultMapper . selectOne ( new QueryWrapper < ToProductImageResult > ( ) . eq ( " task_id " , taskId ) ) ;
if ( Objects . nonNull ( toProductImageResult ) & & " Pending " . equals ( toProductImageResult . getStatus ( ) ) ) {
// 判断当前任务的超时状态
if ( ! DateUtil . isMoreThanOneDayApart ( toProductImageResult . getCreateTime ( ) ) ) {
// 1. 未超时,获取当前任务结果
String fileName = toProductImageResult . getResultType ( ) . equals ( TO_PRODUCT_IMAGE . getName ( ) ) ? " product_image " : " relight_image " ;
String objectName = apiGenerate . getAccountId ( ) + " / " + fileName + " / " + taskId + " .png " ;
String fluxResult = generateService . getFluxResult ( taskId , objectName ) ;
// 2. 成功, 获取结果, 下载图片, 上传至minio,更新toProductImageResult表
if ( StringUtil . isNullOrEmpty ( fluxResult ) | | fluxResult . equals ( " Fail " ) ) {
toProductImageResult . setStatus ( " Fail " ) ;
toProductImageResultMapper . updateById ( toProductImageResult ) ;
apiGenerate . setStatus ( " Fail " ) ;
apiGenerate . setUpdateTime ( LocalDateTime . now ( ) ) ;
apiGenerateService . updateById ( apiGenerate ) ;
} else if ( ! fluxResult . equals ( " Pending " ) ) {
if ( StringUtil . isNullOrEmpty ( toProductImageResult . getUrl ( ) ) ) {
toProductImageResult . setStatus ( " Success " ) ;
toProductImageResult . setUrl ( fluxResult ) ;
toProductImageResultMapper . updateById ( toProductImageResult ) ;
apiGenerate . setStatus ( " Success " ) ;
apiGenerate . setUpdateTime ( LocalDateTime . now ( ) ) ;
apiGenerateService . updateById ( apiGenerate ) ;
}
// 扣积分
Boolean flag = creditsService . taskCreditsDeduction ( apiGenerate . getAccountId ( ) , taskId ) ;
if ( flag ) creditsService . updateChangedCredits ( String . valueOf ( apiGenerate . getAccountId ( ) ) , taskId ) ;
}
} else {
// 超时,设置状态为失败
toProductImageResult . setStatus ( " Fail " ) ;
toProductImageResultMapper . updateById ( toProductImageResult ) ;
apiGenerate . setStatus ( " Fail " ) ;
apiGenerate . setUpdateTime ( LocalDateTime . now ( ) ) ;
apiGenerateService . updateById ( apiGenerate ) ;
}
}
}
}
}
// 万相 -> pose transformation 补偿 一小时执行一次
@Scheduled ( fixedDelay = 60 * 60 * 1000 )
public void wxCompensationMechanism ( ) {
List < APIGenerate > apiGenerates = apiGenerateService . getPendingTaskByStatus ( " wx " ) ;
if ( apiGenerates ! = null & & ! apiGenerates . isEmpty ( ) ) {
for ( APIGenerate apiGenerate : apiGenerates ) {
String taskId = apiGenerate . getTaskId ( ) ;
PoseTransformation poseTransformation = poseTransformationMapper . selectOne ( new QueryWrapper < PoseTransformation > ( ) . eq ( " unique_id " , taskId ) ) ;
if ( Objects . nonNull ( poseTransformation ) & & " Pending " . equals ( poseTransformation . getTaskStatus ( ) ) ) {
// 判断当前任务的超时状态
if ( ! DateUtil . isMoreThanOneDayApart ( poseTransformation . getCreateTime ( ) ) ) {
try {
// 方法中已经完成了pose_transformation和api_generate表的更新, 不用额外做处理
PoseTransformationVO animateResult = generateService. getAnimateResult ( taskId ) ;
if ( animateResult . getStatus ( ) . equals ( " Success " ) ) {
sendSysMsgToUser ( poseTransformation . getAccountId ( ) , " 您的姿势变换生成任务已完成 " ) ;
}
} catch ( BusinessException e ) {
log . warn ( " 万相 animation 生成失败,原因:{} " , e . getMessage ( ) ) ;
}
}
} else {
poseTransformation . setTaskStatus ( " Fail " ) ;
poseTransformation . setUpdateTime ( LocalDateTime . now ( ) ) ;
poseTransformationMapper . u pdateById ( poseTransformation ) ;
apiGenerate . setStatus ( " Fail " ) ;
apiGenerate . setUpdateTime ( LocalDateTime . now ( ) ) ;
apiGenerateService . updateById ( apiGenerate ) ;
}
}
}
}
}