ソースを参照

Merge remote-tracking branch 'origin/saas-api' into saas-api

ct 1 週間 前
コミット
cb6c12bf18

+ 6 - 0
fs-service/src/main/java/com/fs/quartz/mapper/TenantJobConfigMapper.java

@@ -45,4 +45,10 @@ public interface TenantJobConfigMapper {
                          @Param("syncMessage") String syncMessage);
 
     int updateStatus(@Param("id") Long id, @Param("status") String status);
+
+    /**
+     * 根据模板 ID 批量更新所有租户配置的状态
+     * 用于模板 defaultStatus 变更时同步更新已有配置
+     */
+    int updateStatusByTemplateId(@Param("templateId") Long templateId, @Param("status") String status);
 }

+ 21 - 0
fs-service/src/main/java/com/fs/quartz/service/impl/SysJobTemplateServiceImpl.java

@@ -7,20 +7,27 @@ import com.fs.common.utils.bean.BeanUtils;
 import com.fs.quartz.domain.SysJob;
 import com.fs.quartz.domain.SysJobTemplate;
 import com.fs.quartz.mapper.SysJobTemplateMapper;
+import com.fs.quartz.mapper.TenantJobConfigMapper;
 import com.fs.quartz.service.ISysJobService;
 import com.fs.quartz.service.ISysJobTemplateService;
+import lombok.extern.slf4j.Slf4j;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
+import org.springframework.transaction.annotation.Transactional;
 
 import javax.annotation.Resource;
 import java.util.List;
 
+@Slf4j
 @Service
 public class SysJobTemplateServiceImpl implements ISysJobTemplateService {
 
     @Resource
     private SysJobTemplateMapper sysJobTemplateMapper;
 
+    @Resource
+    private TenantJobConfigMapper tenantJobConfigMapper;
+
     @Autowired
     private ISysJobService sysJobService;
 
@@ -62,8 +69,14 @@ public class SysJobTemplateServiceImpl implements ISysJobTemplateService {
     }
 
     @Override
+    @Transactional
     @DataSource(DataSourceType.MASTER)
     public int updateTemplate(SysJobTemplate template) {
+        // 查询旧模板,判断 defaultStatus 是否变更
+        SysJobTemplate oldTemplate = sysJobTemplateMapper.selectTemplateById(template.getTemplateId());
+        String oldDefaultStatus = oldTemplate != null ? oldTemplate.getDefaultStatus() : null;
+        String newDefaultStatus = template.getDefaultStatus();
+
         int rows = sysJobTemplateMapper.updateTemplate(template);
 
         // 修改同步Job表
@@ -79,6 +92,14 @@ public class SysJobTemplateServiceImpl implements ISysJobTemplateService {
             }
         }
 
+        // 关键优化:如果 defaultStatus 变更,同步更新所有租户的 config 状态
+        if (rows > 0 && newDefaultStatus != null && !newDefaultStatus.equals(oldDefaultStatus)) {
+            int updatedConfigs = tenantJobConfigMapper.updateStatusByTemplateId(
+                template.getTemplateId(), newDefaultStatus);
+            log.info("[SysJobTemplate] defaultStatus 变更: templateId={}, old={}, new={}, 同步更新 {} 条租户配置",
+                template.getTemplateId(), oldDefaultStatus, newDefaultStatus, updatedConfigs);
+        }
+
         return rows;
     }
 

+ 6 - 0
fs-service/src/main/resources/mapper/quartz/TenantJobConfigMapper.xml

@@ -148,4 +148,10 @@
     <update id="updateStatus">
         update tenant_job_config set status = #{status} where id = #{id}
     </update>
+
+    <!-- 根据模板 ID 批量更新所有租户配置的状态 -->
+    <update id="updateStatusByTemplateId">
+        update tenant_job_config set status = #{status}, update_time = sysdate()
+        where template_id = #{templateId}
+    </update>
 </mapper>

+ 17 - 4
fs-task/src/main/java/com/fs/quartz/config/ScheduleConfig.java

@@ -25,17 +25,30 @@ public class ScheduleConfig {
         factory.setDataSource(dataSource);
 
         Properties prop = new Properties();
+        // 调度器基础配置
         prop.put("org.quartz.scheduler.instanceName", "FsScheduler");
         prop.put("org.quartz.scheduler.instanceId", "AUTO");
+        
+        // 线程池配置 - 增加线程数以避免任务排队
         prop.put("org.quartz.threadPool.class", "org.quartz.simpl.SimpleThreadPool");
-        prop.put("org.quartz.threadPool.threadCount", "20");
+        prop.put("org.quartz.threadPool.threadCount", "30");  // 从20增加到30
         prop.put("org.quartz.threadPool.threadPriority", "5");
+        
+        // JobStore 集群配置 - 优化以减少锁竞争
         prop.put("org.quartz.jobStore.isClustered", "true");
-        prop.put("org.quartz.jobStore.clusterCheckinInterval", "15000");
-        prop.put("org.quartz.jobStore.maxMisfiresToHandleAtATime", "1");
+        prop.put("org.quartz.jobStore.clusterCheckinInterval", "10000");  // 从15秒改为10秒,更快检测节点失效
+        prop.put("org.quartz.jobStore.maxMisfiresToHandleAtATime", "5");  // 从1增加到5,批量处理失火
         prop.put("org.quartz.jobStore.txIsolationLevelSerializable", "true");
-        prop.put("org.quartz.jobStore.misfireThreshold", "12000");
+        prop.put("org.quartz.jobStore.misfireThreshold", "60000");  // 从12秒增加到60秒,减少误判失火
         prop.put("org.quartz.jobStore.tablePrefix", "QRTZ_");
+        
+        // 关键优化:批量获取触发器,减少锁竞争
+        prop.put("org.quartz.scheduler.batchTriggerAcquisitionMaxCount", "10");
+        prop.put("org.quartz.scheduler.batchTriggerAcquisitionFireAheadTimeWindow", "30000");  // 30秒窗口
+        
+        // 优化:使用集群锁获取触发器,避免等待锁导致的延迟
+        prop.put("org.quartz.jobStore.acquireTriggersWithinLock", "true");
+        
         factory.setQuartzProperties(prop);
 
         factory.setSchedulerName("FsScheduler");

+ 3 - 2
fs-task/src/main/java/com/fs/quartz/config/ScheduleJobRedisConfig.java

@@ -23,7 +23,7 @@ import java.util.concurrent.TimeUnit;
 @ConditionalOnProperty(name = "spring.redis.listener.enabled", havingValue = "true", matchIfMissing = false)
 public class ScheduleJobRedisConfig {
 
-    @Bean(initMethod = "start", destroyMethod = "stop")
+    @Bean
     public RedisMessageListenerContainer scheduleJobRedisListenerContainer(
             RedisConnectionFactory connectionFactory,
             ScheduleJobSyncSubscriber scheduleJobSyncSubscriber) {
@@ -31,7 +31,8 @@ public class ScheduleJobRedisConfig {
         container.setConnectionFactory(connectionFactory);
         container.addMessageListener(scheduleJobSyncSubscriber,
                 new ChannelTopic(ScheduleConstants.REDIS_CHANNEL_JOB_SYNC));
-        container.setMaxSubscriptionRegistrationWaitingTime(TimeUnit.SECONDS.toMillis(10));
+        // 增加超时时间到30秒,避免启动时 Redis 连接未就绪导致超时
+        container.setMaxSubscriptionRegistrationWaitingTime(TimeUnit.SECONDS.toMillis(30));
         container.setTaskExecutor(Executors.newFixedThreadPool(2, r -> new Thread(r, "job-sync-listener")));
         log.info("[ScheduleJobRedis] Redis pub/sub 监听器已启用,频道={}", ScheduleConstants.REDIS_CHANNEL_JOB_SYNC);
         return container;

+ 13 - 3
fs-task/src/main/java/com/fs/quartz/util/AbstractQuartzJob.java

@@ -60,8 +60,14 @@ public abstract class AbstractQuartzJob implements Job {
 
         try {
             before(context, sysJob);
-            doExecute(context, sysJob);
-            after(context, sysJob, null);
+            boolean hasExecuted = doExecute(context, sysJob);
+            // 只有实际执行了才记录日志(租户级任务所有租户暂停时不记录)
+            if (hasExecuted) {
+                after(context, sysJob, null);
+            } else {
+                log.debug("任务未实际执行,跳过日志记录: jobId={}, jobName={}",
+                        sysJob.getJobId(), sysJob.getJobName());
+            }
         } catch (Exception e) {
             log.error("任务执行异常:jobId={}, jobName={}", sysJob.getJobId(), sysJob.getJobName(), e);
             after(context, sysJob, e);
@@ -103,7 +109,11 @@ public abstract class AbstractQuartzJob implements Job {
         }
     }
 
-    protected abstract void doExecute(JobExecutionContext context, SysJob sysJob) throws Exception;
+    /**
+     * 执行具体任务
+     * @return true 表示实际执行了任务,需要记录日志;false 表示未实际执行(如所有租户暂停),不记录日志
+     */
+    protected abstract boolean doExecute(JobExecutionContext context, SysJob sysJob) throws Exception;
 
     /** 从主库 sys_job 读取最新状态(Redis 未同步时也能生效) */
     private SysJob refreshJobStatus(SysJob cached) {

+ 41 - 11
fs-task/src/main/java/com/fs/quartz/util/MultiScopeJobDispatcher.java

@@ -49,19 +49,20 @@ public class MultiScopeJobDispatcher {
     @Value("${saas.task.parallel.threads:4}") private int parallelThreads;
     private volatile ExecutorService tenantExecutor;
 
-    public void dispatch(JobExecutionContext context, SysJob sysJob, ScopedJobExecutor executor) throws Exception {
+    public boolean dispatch(JobExecutionContext context, SysJob sysJob, ScopedJobExecutor executor) throws Exception {
         if (!JobInvokeUtil.isInvokeTargetAvailable(sysJob)) {
             log.warn("[MultiScopeJob] invokeTarget 在当前进程不可用,跳过执行: jobName={}, invokeTarget={}, jobGroup={}",
                     sysJob.getJobName(), sysJob.getInvokeTarget(), sysJob.getJobGroup());
-            return;
+            return false;
         }
         String scope = resolveScope(sysJob);
         if (ScheduleConstants.JobScope.TENANT.getValue().equals(scope)) {
             log.info("[MultiScopeJob] 租户级任务: jobName={}, templateId={}", sysJob.getJobName(), sysJob.getJobId());
-            executeTenantJob(context, sysJob, executor);
+            return executeTenantJob(context, sysJob, executor);
         } else {
             log.debug("[MultiScopeJob] 平台级任务: jobName={}", sysJob.getJobName());
             executePlatformJob(context, sysJob, executor);
+            return true;
         }
     }
 
@@ -103,19 +104,40 @@ public class MultiScopeJobDispatcher {
         }
     }
 
-    private void executeTenantJob(JobExecutionContext context, SysJob sysJob, ScopedJobExecutor executor) throws Exception {
+    private boolean executeTenantJob(JobExecutionContext context, SysJob sysJob, ScopedJobExecutor executor) throws Exception {
         List<TenantJobConfig> tenantConfigs = queryAssignedTenants(sysJob);
         if (tenantConfigs.isEmpty()) {
             log.warn("[MultiScopeJob] 租户级任务未分配任何租户: jobName={}, templateId={}",
                     sysJob.getJobName(), sysJob.getJobId());
-            return;
+            return false;
+        }
+        
+        // 关键优化:过滤掉已暂停的租户配置
+        List<TenantJobConfig> enabledConfigs = tenantConfigs.stream()
+            .filter(config -> "0".equals(config.getStatus()))
+            .collect(Collectors.toList());
+        
+        List<Long> pausedTenantIds = tenantConfigs.stream()
+            .filter(config -> !"0".equals(config.getStatus()))
+            .map(TenantJobConfig::getTenantId)
+            .collect(Collectors.toList());
+        
+        if (!pausedTenantIds.isEmpty()) {
+            log.info("[MultiScopeJob] 跳过已暂停的租户: jobName={}, tenantIds={}", 
+                sysJob.getJobName(), pausedTenantIds);
+        }
+        
+        if (enabledConfigs.isEmpty()) {
+            log.warn("[MultiScopeJob] 所有租户配置均已暂停: jobName={}, templateId={}",
+                sysJob.getJobName(), sysJob.getJobId());
+            return false;
         }
-        log.info("[MultiScopeJob] 开始并行执行租户任务: jobName={}, templateId={}, 租户数={}, tenantIds={}",
-                sysJob.getJobName(), sysJob.getJobId(), tenantConfigs.size(),
-                tenantConfigs.stream().map(TenantJobConfig::getTenantId).collect(Collectors.toList()));
-        CountDownLatch latch = new CountDownLatch(tenantConfigs.size());
+        
+        log.info("[MultiScopeJob] 开始并行执行租户任务: jobName={}, templateId={}, 总租户数={}, 启用租户数={}",
+                sysJob.getJobName(), sysJob.getJobId(), tenantConfigs.size(), enabledConfigs.size());
+        CountDownLatch latch = new CountDownLatch(enabledConfigs.size());
         AtomicInteger failCount = new AtomicInteger(0);
-        for (TenantJobConfig config : tenantConfigs) {
+        for (TenantJobConfig config : enabledConfigs) {
             getTenantExecutor().submit(() -> {
                 if (executeForOneTenant(context, sysJob, config, executor)) failCount.incrementAndGet();
                 latch.countDown();
@@ -132,18 +154,26 @@ public class MultiScopeJobDispatcher {
         if (failCount.get() > 0) {
             throw new IllegalStateException("租户任务部分失败: jobName=" + sysJob.getJobId() + ", 失败数=" + failCount.get());
         }
+        return true;
     }
 
     private List<TenantJobConfig> queryAssignedTenants(SysJob sysJob) {
         try {
             DynamicDataSourceContextHolder.setDataSourceType(DataSourceType.MASTER.name());
             Long templateId = resolveTemplateId(sysJob);
+            Long jobId = sysJob.getJobId();
+            log.info("[MultiScopeJob] 查询租户配置: jobId={}, resolvedTemplateId={}, jobName={}",
+                    jobId, templateId, sysJob.getJobName());
             if (templateId == null) {
                 log.warn("[MultiScopeJob] 无法解析 templateId,跳过租户分发: jobName={}, jobId={}",
-                        sysJob.getJobName(), sysJob.getJobId());
+                        sysJob.getJobName(), jobId);
                 return Collections.emptyList();
             }
             List<TenantJobConfig> list = tenantJobConfigMapper.selectTenantsByTemplateId(templateId);
+            int enabledCount = list != null ? (int) list.stream().filter(c -> "0".equals(c.getStatus())).count() : 0;
+            int pausedCount = list != null ? (int) list.stream().filter(c -> !"0".equals(c.getStatus())).count() : 0;
+            log.info("[MultiScopeJob] 查询结果: templateId={}, 总数={}, 启用={}, 暂停={}",
+                    templateId, list != null ? list.size() : 0, enabledCount, pausedCount);
             return list != null ? list : Collections.emptyList();
         } finally {
             DynamicDataSourceContextHolder.clearDataSourceType();

+ 2 - 2
fs-task/src/main/java/com/fs/quartz/util/QuartzDisallowConcurrentExecution.java

@@ -15,8 +15,8 @@ import org.quartz.JobExecutionContext;
 public class QuartzDisallowConcurrentExecution extends AbstractQuartzJob {
 
     @Override
-    protected void doExecute(JobExecutionContext context, SysJob sysJob) throws Exception {
+    protected boolean doExecute(JobExecutionContext context, SysJob sysJob) throws Exception {
         MultiScopeJobDispatcher dispatcher = SpringUtils.getBean(MultiScopeJobDispatcher.class);
-        dispatcher.dispatch(context, sysJob, (ctx, job) -> JobInvokeUtil.invokeMethod(job));
+        return dispatcher.dispatch(context, sysJob, (ctx, job) -> JobInvokeUtil.invokeMethod(job));
     }
 }

+ 2 - 2
fs-task/src/main/java/com/fs/quartz/util/QuartzJobExecution.java

@@ -13,8 +13,8 @@ import org.quartz.JobExecutionContext;
 public class QuartzJobExecution extends AbstractQuartzJob {
 
     @Override
-    protected void doExecute(JobExecutionContext context, SysJob sysJob) throws Exception {
+    protected boolean doExecute(JobExecutionContext context, SysJob sysJob) throws Exception {
         MultiScopeJobDispatcher dispatcher = SpringUtils.getBean(MultiScopeJobDispatcher.class);
-        dispatcher.dispatch(context, sysJob, (ctx, job) -> JobInvokeUtil.invokeMethod(job));
+        return dispatcher.dispatch(context, sysJob, (ctx, job) -> JobInvokeUtil.invokeMethod(job));
     }
 }

+ 116 - 0
fs-task/src/main/java/com/fs/quartz/util/QuartzMonitor.java

@@ -0,0 +1,116 @@
+package com.fs.quartz.util;
+
+import com.fs.quartz.domain.SysJob;
+import com.fs.quartz.mapper.SysJobMapper;
+import lombok.extern.slf4j.Slf4j;
+import org.quartz.JobExecutionContext;
+import org.quartz.JobKey;
+import org.quartz.Scheduler;
+import org.quartz.SchedulerException;
+import org.quartz.Trigger;
+import org.quartz.TriggerKey;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+import java.time.ZoneId;
+import java.time.format.DateTimeFormatter;
+import java.util.List;
+
+/**
+ * Quartz Monitor Tool
+ */
+@Slf4j
+@Component
+public class QuartzMonitor {
+
+    @Autowired
+    private Scheduler scheduler;
+
+    @Autowired
+    private SysJobMapper sysJobMapper;
+
+    private static final DateTimeFormatter TIME_FORMATTER =
+        DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss").withZone(ZoneId.systemDefault());
+
+    /**
+     * Diagnose job execution status
+     */
+    public void diagnoseJobExecution(Long jobId) {
+        log.info("========== Quartz Job Diagnose Start [jobId={}] ==========", jobId);
+        try {
+            SysJob dbJob = sysJobMapper.selectJobById(jobId);
+            if (dbJob == null) {
+                log.warn("Job not found in database: jobId={}", jobId);
+                return;
+            }
+            log.info("Database status: jobId={}, jobName={}, status={}, concurrent={}",
+                jobId, dbJob.getJobName(), dbJob.getStatus(), dbJob.getConcurrent());
+
+            String jobGroup = dbJob.getJobGroup();
+            JobKey jobKey = ScheduleUtils.getJobKey(jobId, jobGroup);
+            TriggerKey triggerKey = ScheduleUtils.getTriggerKey(jobId, jobGroup);
+
+            if (scheduler.checkExists(triggerKey)) {
+                Trigger trigger = scheduler.getTrigger(triggerKey);
+                log.info("Quartz Trigger exists: nextFireTime={}, prevFireTime={}",
+                    trigger.getNextFireTime() != null ? TIME_FORMATTER.format(trigger.getNextFireTime().toInstant()) : "null",
+                    trigger.getPreviousFireTime() != null ? TIME_FORMATTER.format(trigger.getPreviousFireTime().toInstant()) : "null");
+            } else {
+                log.warn("Quartz Trigger not exists");
+            }
+
+            if (scheduler.checkExists(jobKey)) {
+                List<JobExecutionContext> executingJobs = scheduler.getCurrentlyExecutingJobs();
+                boolean isExecuting = executingJobs.stream()
+                    .anyMatch(ctx -> ctx.getJobDetail().getKey().equals(jobKey));
+                log.info("Quartz Job exists, executing: {}", isExecuting);
+            } else {
+                log.warn("Quartz Job not exists");
+            }
+
+            if ("1".equals(dbJob.getConcurrent())) {
+                log.info("Job is non-concurrent, only one instance can run at same time");
+            }
+        } catch (SchedulerException e) {
+            log.error("Diagnose error", e);
+        }
+        log.info("========== Quartz Job Diagnose End [jobId={}] ==========", jobId);
+    }
+
+    /**
+     * Print executing jobs
+     */
+    public void printExecutingJobs() {
+        log.info("========== Current Executing Jobs ==========");
+        try {
+            List<JobExecutionContext> jobs = scheduler.getCurrentlyExecutingJobs();
+            if (jobs.isEmpty()) {
+                log.info("No executing jobs");
+            } else {
+                jobs.forEach(ctx -> log.info("Executing: jobKey={}", ctx.getJobDetail().getKey()));
+            }
+        } catch (SchedulerException e) {
+            log.error("Get executing jobs failed", e);
+        }
+    }
+
+    /**
+     * Force trigger job
+     */
+    public void forceTriggerJob(Long jobId, String jobGroup) {
+        log.info("Force trigger job: jobId={}", jobId);
+        try {
+            JobKey jobKey = ScheduleUtils.getJobKey(jobId, jobGroup);
+            if (!scheduler.checkExists(jobKey)) {
+                SysJob job = sysJobMapper.selectJobById(jobId);
+                if (job != null) {
+                    ScheduleUtils.createScheduleJob(scheduler, job);
+                }
+            }
+            scheduler.triggerJob(jobKey);
+            log.info("Force trigger success");
+        } catch (Exception e) {
+            log.error("Force trigger failed", e);
+        }
+    }
+}