Skip to content

XXL-JOB分片任务与并行调度

分片任务的背景与价值

在实际业务场景中,经常会遇到需要处理大量数据的定时任务,例如:每日对账、批量发送通知、数据清洗等。如果这些任务只在单个执行器上运行,可能需要很长时间才能完成,无法满足业务的时效性要求。

XXL-JOB的分片任务功能允许将一个大任务拆分成多个子任务,分配给不同的执行器并行处理,从而大幅提升任务执行效率。

分片任务的核心原理

分片参数传递机制

当调度中心触发分片任务时,会向每个执行器实例传递两个关键参数:

  • 分片索引(shardIndex):当前执行器的编号,从0开始
  • 分片总数(shardTotal):参与执行的执行器总数
mermaid
flowchart TB
    A([调度中心]) --> B{触发分片任务}
    B --> C([执行器1<br/>shardIndex=0<br/>shardTotal=3])
    B --> D([执行器2<br/>shardIndex=1<br/>shardTotal=3])
    B --> E([执行器3<br/>shardIndex=2<br/>shardTotal=3])
    
    C --> F([处理数据0,3,6,9...])
    D --> G([处理数据1,4,7,10...])
    E --> H([处理数据2,5,8,11...])
    
    style A fill:#FF6B6B,stroke:#C92A2A,stroke-width:2px,rx:10,ry:10
    style C fill:#4ECDC4,stroke:#1A535C,stroke-width:2px,rx:10,ry:10
    style D fill:#45B7D1,stroke:#2980B9,stroke-width:2px,rx:10,ry:10
    style E fill:#6BCB77,stroke:#2D6A4F,stroke-width:2px,rx:10,ry:10
    style F fill:#4ECDC4,stroke:#1A535C,stroke-width:2px,rx:10,ry:10
    style G fill:#45B7D1,stroke:#2980B9,stroke-width:2px,rx:10,ry:10
    style H fill:#6BCB77,stroke:#2D6A4F,stroke-width:2px,rx:10,ry:10

路由策略配置

在XXL-JOB管理后台配置任务时,需要将路由策略设置为"分片广播",这样调度中心会将任务广播到所有在线的执行器实例。

分片任务实现详解

基于取模的数据分片

最常用的分片策略是对数据的唯一标识(如ID)进行取模运算,确保数据被均匀分配到各个执行器:

java
/**
 * 商品库存同步任务
 * 演示基于商品ID取模的分片处理
 */
@Component
public class ProductStockSyncHandler {
    
    @Autowired
    private ProductService productService;
    
    @Autowired
    private InventoryService inventoryService;
    
    @XxlJob("productStockSyncHandler")
    public void syncProductStock() {
        // 获取分片参数
        int shardIndex = XxlJobHelper.getShardIndex();
        int shardTotal = XxlJobHelper.getShardTotal();
        
        XxlJobHelper.log("分片参数: 当前分片={}, 总分片数={}", 
            shardIndex, shardTotal);
        
        // 分页查询待同步的商品
        int pageSize = 500;
        int pageNum = 0;
        
        while (true) {
            List<Product> products = productService.queryProducts(pageNum, pageSize);
            
            if (products.isEmpty()) {
                break;
            }
            
            for (Product product : products) {
                // 通过商品ID取模判断是否由当前分片处理
                if (product.getId() % shardTotal == shardIndex) {
                    try {
                        // 同步商品库存到缓存
                        syncToCache(product);
                        XxlJobHelper.log("同步成功: 商品ID={}", product.getId());
                    } catch (Exception e) {
                        XxlJobHelper.log("同步失败: 商品ID={}, 错误={}", 
                            product.getId(), e.getMessage());
                    }
                }
            }
            
            pageNum++;
        }
        
        XxlJobHelper.log("分片{}处理完成", shardIndex);
    }
    
    private void syncToCache(Product product) {
        // 查询实时库存
        Integer stock = inventoryService.getStock(product.getId());
        // 更新缓存
        redisTemplate.opsForValue().set(
            "stock:" + product.getId(), 
            stock, 
            Duration.ofMinutes(30)
        );
    }
}

分片执行流程

mermaid
sequenceDiagram
    participant Admin as 调度中心
    participant E1 as 执行器1
    participant E2 as 执行器2
    participant E3 as 执行器3
    participant DB as 数据库
    
    Admin->>E1: 触发任务(shardIndex=0, shardTotal=3)
    Admin->>E2: 触发任务(shardIndex=1, shardTotal=3)
    Admin->>E3: 触发任务(shardIndex=2, shardTotal=3)
    
    par 并行执行
        E1->>DB: 查询数据
        E1->>E1: 过滤 id%3==0 的数据
        E1->>E1: 处理数据
    and
        E2->>DB: 查询数据
        E2->>E2: 过滤 id%3==1 的数据
        E2->>E2: 处理数据
    and
        E3->>DB: 查询数据
        E3->>E3: 过滤 id%3==2 的数据
        E3->>E3: 处理数据
    end
    
    E1-->>Admin: 执行完成
    E2-->>Admin: 执行完成
    E3-->>Admin: 执行完成

高级分片策略

基于数据库分片查询

直接在数据库查询时进行分片过滤,避免查询全量数据:

java
/**
 * 会员积分过期清理任务
 * 基于数据库分片查询的高效实现
 */
@Component
public class MemberPointsExpireHandler {
    
    @Autowired
    private MemberPointsMapper memberPointsMapper;
    
    @XxlJob("memberPointsExpireHandler")
    public void clearExpiredPoints() {
        int shardIndex = XxlJobHelper.getShardIndex();
        int shardTotal = XxlJobHelper.getShardTotal();
        
        // 查询当前分片负责的过期积分记录
        // SQL: WHERE member_id % #{shardTotal} = #{shardIndex} 
        //      AND expire_time < NOW()
        List<MemberPoints> expiredPoints = memberPointsMapper.queryExpiredBySharding(
            shardIndex, 
            shardTotal,
            LocalDateTime.now()
        );
        
        XxlJobHelper.log("当前分片查询到{}条过期积分记录", expiredPoints.size());
        
        for (MemberPoints points : expiredPoints) {
            try {
                // 记录积分变动日志
                logPointsChange(points);
                
                // 清零过期积分
                memberPointsMapper.clearExpiredPoints(points.getId());
                
                // 发送过期通知
                sendExpireNotification(points.getMemberId());
                
            } catch (Exception e) {
                XxlJobHelper.log("处理失败: 记录ID={}, 错误={}", 
                    points.getId(), e.getMessage());
            }
        }
    }
}

对应的Mapper查询:

java
/**
 * 会员积分Mapper
 */
@Mapper
public interface MemberPointsMapper {
    
    /**
     * 按分片查询过期积分
     */
    @Select("SELECT * FROM member_points " +
            "WHERE member_id % #{shardTotal} = #{shardIndex} " +
            "AND expire_time < #{expireTime} " +
            "AND status = 'ACTIVE'")
    List<MemberPoints> queryExpiredBySharding(
        @Param("shardIndex") int shardIndex,
        @Param("shardTotal") int shardTotal,
        @Param("expireTime") LocalDateTime expireTime
    );
}

基于业务规则的分片

根据业务特性设计分片规则,实现更灵活的任务分配:

java
/**
 * 多地区物流状态同步任务
 * 基于地区编码的业务分片
 */
@Component
public class RegionalLogisticsSyncHandler {
    
    // 地区编码与分片的映射
    private static final Map<String, Integer> REGION_SHARD_MAP = new HashMap<>();
    
    static {
        // 华东地区 -> 分片0
        REGION_SHARD_MAP.put("SH", 0);
        REGION_SHARD_MAP.put("JS", 0);
        REGION_SHARD_MAP.put("ZJ", 0);
        
        // 华北地区 -> 分片1
        REGION_SHARD_MAP.put("BJ", 1);
        REGION_SHARD_MAP.put("TJ", 1);
        REGION_SHARD_MAP.put("HB", 1);
        
        // 华南地区 -> 分片2
        REGION_SHARD_MAP.put("GD", 2);
        REGION_SHARD_MAP.put("GX", 2);
        REGION_SHARD_MAP.put("HN", 2);
    }
    
    @XxlJob("regionalLogisticsSyncHandler")
    public void syncLogisticsStatus() {
        int shardIndex = XxlJobHelper.getShardIndex();
        int shardTotal = XxlJobHelper.getShardTotal();
        
        // 获取当前分片负责的地区
        List<String> myRegions = REGION_SHARD_MAP.entrySet().stream()
            .filter(entry -> entry.getValue() == shardIndex)
            .map(Map.Entry::getKey)
            .collect(Collectors.toList());
        
        XxlJobHelper.log("分片{}负责的地区: {}", shardIndex, myRegions);
        
        for (String regionCode : myRegions) {
            syncRegionLogistics(regionCode);
        }
    }
    
    private void syncRegionLogistics(String regionCode) {
        // 查询该地区的待更新物流单
        List<LogisticsOrder> orders = logisticsService.queryByRegion(regionCode);
        
        for (LogisticsOrder order : orders) {
            // 调用物流API查询最新状态
            LogisticsStatus status = logisticsApi.queryStatus(order.getTrackingNo());
            
            // 更新物流状态
            logisticsService.updateStatus(order.getId(), status);
        }
    }
}

分片任务的数据一致性保障

避免数据遗漏与重复

在分片任务中,必须确保每条数据恰好被一个执行器处理:

mermaid
flowchart TB
    subgraph 正确的分片设计
        A1([数据ID=1]) --> B1([分片0处理<br/>1%3=1?×])
        A1 --> C1([分片1处理<br/>1%3=1?✓])
        A1 --> D1([分片2处理<br/>1%3=1?×])
    end
    
    subgraph 数据覆盖验证
        E([总数据量N]) --> F{N%shardTotal}
        F -->|余数为0| G([完美均分])
        F -->|余数不为0| H([前几个分片多处理1条])
    end
    
    style C1 fill:#6BCB77,stroke:#2D6A4F,stroke-width:2px,rx:10,ry:10
    style G fill:#4ECDC4,stroke:#1A535C,stroke-width:2px,rx:10,ry:10
    style H fill:#45B7D1,stroke:#2980B9,stroke-width:2px,rx:10,ry:10

处理执行器动态扩缩容

当执行器数量发生变化时,需要注意分片参数的变化:

java
/**
 * 消息推送任务
 * 处理执行器扩缩容场景
 */
@Component
public class MessagePushHandler {
    
    @XxlJob("messagePushHandler")
    public void pushMessages() {
        int shardIndex = XxlJobHelper.getShardIndex();
        int shardTotal = XxlJobHelper.getShardTotal();
        
        // 记录本次执行的分片配置
        XxlJobHelper.log("本次执行分片配置: index={}, total={}", 
            shardIndex, shardTotal);
        
        // 使用数据库记录处理进度,支持断点续传
        String progressKey = "message_push_progress_" + shardIndex + "_" + shardTotal;
        Long lastProcessedId = getProgress(progressKey);
        
        while (true) {
            // 查询待推送消息(增量查询)
            List<PushMessage> messages = messageMapper.queryUnpushed(
                shardIndex, 
                shardTotal,
                lastProcessedId,
                1000  // 每批处理1000条
            );
            
            if (messages.isEmpty()) {
                break;
            }
            
            for (PushMessage message : messages) {
                // 推送消息
                pushService.push(message);
                lastProcessedId = message.getId();
            }
            
            // 更新处理进度
            saveProgress(progressKey, lastProcessedId);
        }
    }
}

分片任务最佳实践

分片数量规划

mermaid
flowchart LR
    A([数据量评估]) --> B{数据量级别}
    B -->|万级| C([2-3个分片])
    B -->|十万级| D([5-10个分片])
    B -->|百万级| E([10-20个分片])
    B -->|千万级| F([20-50个分片])
    
    C --> G([执行器数量=分片数])
    D --> G
    E --> G
    F --> G
    
    style A fill:#FF6B6B,stroke:#C92A2A,stroke-width:2px,rx:10,ry:10
    style G fill:#6BCB77,stroke:#2D6A4F,stroke-width:2px,rx:10,ry:10

任务幂等性设计

分片任务应当设计为幂等操作,即使同一数据被重复处理也不会产生副作用:

java
/**
 * 账户余额对账任务
 * 幂等性设计示例
 */
@Component
public class AccountReconciliationHandler {
    
    @Autowired
    private ReconciliationService reconciliationService;
    
    @XxlJob("accountReconciliationHandler")
    public void reconcile() {
        int shardIndex = XxlJobHelper.getShardIndex();
        int shardTotal = XxlJobHelper.getShardTotal();
        
        // 获取今日对账批次号
        String batchNo = generateDailyBatchNo();
        
        List<Account> accounts = accountMapper.queryBySharding(shardIndex, shardTotal);
        
        for (Account account : accounts) {
            // 幂等检查:判断该账户今日是否已对账
            if (reconciliationService.isReconciled(account.getId(), batchNo)) {
                XxlJobHelper.log("账户{}今日已对账,跳过", account.getId());
                continue;
            }
            
            try {
                // 执行对账逻辑
                ReconciliationResult result = doReconcile(account);
                
                // 记录对账结果(包含批次号,保证幂等)
                reconciliationService.saveResult(account.getId(), batchNo, result);
                
            } catch (Exception e) {
                XxlJobHelper.log("账户{}对账失败: {}", account.getId(), e.getMessage());
            }
        }
    }
    
    private String generateDailyBatchNo() {
        return "RECON_" + LocalDate.now().format(DateTimeFormatter.BASIC_ISO_DATE);
    }
}

异常处理与告警

java
/**
 * 分片任务异常处理模板
 */
@Component
public class ShardingTaskTemplate {
    
    @Autowired
    private AlertService alertService;
    
    public void executeWithMonitoring(ShardingTask task) {
        int shardIndex = XxlJobHelper.getShardIndex();
        int shardTotal = XxlJobHelper.getShardTotal();
        
        long startTime = System.currentTimeMillis();
        int successCount = 0;
        int failCount = 0;
        List<String> errors = new ArrayList<>();
        
        try {
            // 执行分片任务
            ShardingResult result = task.execute(shardIndex, shardTotal);
            successCount = result.getSuccessCount();
            failCount = result.getFailCount();
            errors = result.getErrors();
            
        } catch (Exception e) {
            // 任务整体异常
            alertService.sendAlert(
                "分片任务执行异常",
                String.format("分片%d/%d执行失败: %s", shardIndex, shardTotal, e.getMessage())
            );
            throw e;
            
        } finally {
            long duration = System.currentTimeMillis() - startTime;
            
            // 记录执行统计
            XxlJobHelper.log("分片{}/{} 执行完成: 耗时={}ms, 成功={}, 失败={}",
                shardIndex, shardTotal, duration, successCount, failCount);
            
            // 失败率告警
            if (failCount > 0 && failCount * 1.0 / (successCount + failCount) > 0.1) {
                alertService.sendAlert(
                    "分片任务失败率过高",
                    String.format("分片%d失败率超过10%%,失败数: %d", shardIndex, failCount)
                );
            }
        }
    }
}

分片任务与普通任务对比

特性普通任务分片任务
执行器选择单个执行器所有在线执行器
处理能力受限于单机性能可水平扩展
适用场景小数据量任务大数据量批处理
实现复杂度简单需要设计分片逻辑
容错能力单点故障影响大局部故障影响小

总结

XXL-JOB的分片任务功能为大数据量的定时任务处理提供了高效的解决方案:

  • 核心机制:调度中心广播任务到所有执行器,每个执行器根据分片参数处理对应的数据切片
  • 分片策略:支持ID取模、数据库分片查询、业务规则分片等多种实现方式
  • 最佳实践:合理规划分片数量、设计幂等性逻辑、完善异常处理机制

通过分片任务,原本需要数小时才能完成的大批量数据处理,可以在多个执行器的并行协作下大幅缩短执行时间,有效提升系统的数据处理能力。

更新: 2025-12-04 17:41:12
原文: https://www.yuque.com/u22210564/zoxfmt/doc-29-xxl-job-03-job

Java 后端面试知识库