业务讲解-节目服务的数据初始化统一管理
在讲解关于项目服务的业务时,会有从redis和elasticsearch中获取数据的流程,但并没有将是什么时候将这些数据放入到redis和elasticsearch中的,在本文中,将详细的介绍节目服务中的数据初始化流程
在节目服务的数据初始化操作时,使用了damai-service-initialize初始化统一的组件,来实现对初始化操作的统一管理和执行顺序的管理,关于此组件的详细介绍可跳转到相应的文档
节目服务的初始化操作是使用了组件中的AbstractApplicationPostConstructHandler,也就是**@PostConstruct **注解修饰的方法
初始化的操作有四个:
- ProgramCategoryInitData 将节目类型放到redis中
- ProgramShowTimeRenewal 将节目演出时间进行更新
- ProgramElasticsearchInitData 将节目的数据放到elasticsearch中
- ProgramBloomFilterInit 将节目的id集合放入到布隆过滤器中
并且这三者是由执行顺序的
ProgramCategoryInitData ------>
ProgramShowTimeRenewal ------>
**ProgramElasticsearchInitData **------>
**ProgramBloomFilterInit **
下面来一一讲解各自的执行流程
节目类型缓存初始化
com.damai.service.init.ProgramCategoryInitData
@Component
public class ProgramCategoryInitData extends AbstractApplicationPostConstructHandler {
@Autowired
private ProgramCategoryService programCategoryService;
@Override
public Integer executeOrder() {
return 1;
}
@Override
public void executeInit(final ConfigurableApplicationContext context) {
BusinessThreadPool.execute(() -> {
programCategoryService.programCategoryRedisDataInit();
});
}
}@ServiceLock(lockType= LockType.Write,name = PROGRAM_CATEGORY_LOCK,keys = {"#all"})
public Map<String, ProgramCategory> programCategoryRedisDataInit(){
Map<String, ProgramCategory> programCategoryMap = new HashMap<>(64);
//从数据库中查询
QueryWrapper<ProgramCategory> lambdaQueryWrapper = Wrappers.emptyWrapper();
List<ProgramCategory> programCategoryList = programCategoryMapper.selectList(lambdaQueryWrapper);
if (CollectionUtil.isNotEmpty(programCategoryList)) {
//放入到redis中
programCategoryMap = programCategoryList.stream().collect(
Collectors.toMap(p -> String.valueOf(p.getId()), p -> p, (v1, v2) -> v2));
redisCache.putHash(RedisKeyBuild.createRedisKey(RedisKeyManage.PROGRAM_CATEGORY_HASH),programCategoryMap);
}
return programCategoryMap;
}其实流程并不复杂,ProgramCategoryInitData最先执行,执行流程:
- 使用线程池异步执行
- 加上分布式的写锁,这是为了防止有其他的实例添加到新的节目类型可能会产生的并发问题
- 从数据库中查询所有的数据
- 将数据批量放入到redis中,结构为Hash。Hash的键值为d_mai_program_category_hash,Hash的key为节目类型的id,Hash的value为节目类型对象
有人可能会担心,每次启动都执行一次放入redis的操作,会有性能影响吗?其实多虑了,节目类型作为字典表,数据其实也就几十条,修改的频率更是很小,每次执行放入redis性能并不会有问题,而且还是异步执行,不会影响主流程的启动
节目类型字典表中所有的数据:
{
"code": "0",
"message": "",
"data": [
{
"id": "1",
"parentId": "0",
"name": "演唱会",
"type": "1"
},
{
"id": "2",
"parentId": "0",
"name": "话剧歌剧",
"type": "1"
},
{
"id": "3",
"parentId": "0",
"name": "体育",
"type": "1"
},
{
"id": "4",
"parentId": "0",
"name": "儿童亲子",
"type": "1"
},
{
"id": "5",
"parentId": "0",
"name": "展览休闲",
"type": "1"
},
{
"id": "6",
"parentId": "0",
"name": "音乐会",
"type": "1"
},
{
"id": "7",
"parentId": "0",
"name": "曲苑杂坛",
"type": "1"
},
{
"id": "8",
"parentId": "0",
"name": "舞蹈芭蕾",
"type": "1"
},
{
"id": "9",
"parentId": "0",
"name": "二次元",
"type": "1"
},
{
"id": "10",
"parentId": "0",
"name": "旅游展览",
"type": "1"
},
{
"id": "11",
"parentId": "1",
"name": "livehouse",
"type": "2"
},
{
"id": "12",
"parentId": "1",
"name": "其他",
"type": "2"
},
{
"id": "13",
"parentId": "1",
"name": "音乐节",
"type": "2"
},
{
"id": "14",
"parentId": "2",
"name": "话剧",
"type": "2"
},
{
"id": "15",
"parentId": "2",
"name": "音乐剧",
"type": "2"
},
{
"id": "16",
"parentId": "3",
"name": "其他",
"type": "2"
},
{
"id": "17",
"parentId": "3",
"name": "球类运动",
"type": "2"
},
{
"id": "18",
"parentId": "5",
"name": "展会",
"type": "2"
},
{
"id": "19",
"parentId": "5",
"name": "特色体验",
"type": "2"
},
{
"id": "20",
"parentId": "7",
"name": "其他",
"type": "2"
},
{
"id": "21",
"parentId": "7",
"name": "戏曲",
"type": "2"
}
]
}节目演出时间进行更新
com.damai.service.init.ProgramShowTimeRenewal
@Component
public class ProgramShowTimeRenewal extends AbstractApplicationPostConstructHandler {
@Autowired
private ProgramShowTimeService programShowTimeService;
@Autowired
private ProgramService programService;
@Autowired
private BusinessEsHandle businessEsHandle;
@Override
public Integer executeOrder() {
return 2;
}
@Override
public void executeInit(final ConfigurableApplicationContext context) {
//判断节目演出时间是否过期,如果过期了,则更新时间,并返回已经更新演出时间的节目id
Set<Long> programIdSet = programShowTimeService.renewal();
if (!programIdSet.isEmpty()) {
//如果更新了,将elasticsearch的整个索引和数据都删除
boolean result = businessEsHandle.checkIndex(SpringUtil.getPrefixDistinctionName() + "-" +
ProgramDocumentParamName.INDEX_NAME, ProgramDocumentParamName.INDEX_TYPE);
if (result) {
businessEsHandle.deleteIndex(SpringUtil.getPrefixDistinctionName() + "-" +
ProgramDocumentParamName.INDEX_NAME);
}
for (Long programId : programIdSet) {
//将redis中的数据也删除
programService.delRedisData(programId);
//将本地缓存数据也删除
programService.delLocalCache(programId);
}
}
}
}这个初始化的作用是**为了方便大家在学习项目时,防止时间长了后,节目的演出时间会有过期的情况,**所以每当项目启动时,会去检查节目中的演出时间是否过期,然后将过期时间更新,如果执行了更新的操作,那么就把Elasticsearch中的数据和Redis中的数据都删除掉
真实生产环境中肯定不会这么做的,都是有新的节目上线后来和举办方制定演出时间
执行更新操作
com.damai.service.ProgramShowTimeService#renewal
@Transactional(rollbackFor = Exception.class)
public Set<Long> renewal(){
Set<Long> programIdSet = new HashSet<>();
//查询演出时间小于2天前的节目演出数据
LambdaQueryWrapper<ProgramShowTime> programShowTimeLambdaQueryWrapper =
Wrappers.lambdaQuery(ProgramShowTime.class).
le(ProgramShowTime::getShowTime, DateUtils.addDay(DateUtils.now(), 2));
List<ProgramShowTime> programShowTimes = programShowTimeMapper.selectList(programShowTimeLambdaQueryWrapper);
List<ProgramShowTime> newProgramShowTimes = new ArrayList<>(programShowTimes.size());
for (ProgramShowTime programShowTime : programShowTimes) {
//要更新演出时间的节目id
programIdSet.add(programShowTime.getProgramId());
//记录中现有的演出时间
Date oldShowTime = programShowTime.getShowTime();
//将现有的演出时间加上一个月作为新的演出时间
Date newShowTime = DateUtils.addMonth(oldShowTime, 1);
//当前时间
Date nowDateTime = DateUtils.now();
//如果新的演出时间还是小于当前时间,则继续再加一个月,直到新的演出时间大于当前时间为止
while (newShowTime.before(nowDateTime)) {
newShowTime = DateUtils.addMonth(newShowTime, 1);
}
//构建要更新的数据
Date newShowDayTime = DateUtils.parseDateTime(DateUtils.formatDate(newShowTime) + " 00:00:00");
ProgramShowTime updateProgramShowTime = new ProgramShowTime();
updateProgramShowTime.setShowTime(newShowTime);
updateProgramShowTime.setShowDayTime(newShowDayTime);
updateProgramShowTime.setShowWeekTime(DateUtils.getWeekStr(newShowTime));
LambdaUpdateWrapper<ProgramShowTime> programShowTimeLambdaUpdateWrapper =
Wrappers.lambdaUpdate(ProgramShowTime.class)
.eq(ProgramShowTime::getProgramId, programShowTime.getProgramId())
.eq(ProgramShowTime::getId,programShowTime.getId());
//进行数据库更新节目演出时间
programShowTimeMapper.update(updateProgramShowTime,programShowTimeLambdaUpdateWrapper);
ProgramShowTime newProgramShowTime = new ProgramShowTime();
newProgramShowTime.setProgramId(programShowTime.getProgramId());
newProgramShowTime.setShowTime(newShowTime);
newProgramShowTimes.add(newProgramShowTime);
}
//节目组最近演出时间更新
Map<Long,Date> programGroupMap = new HashMap<>(newProgramShowTimes.size());
for (ProgramShowTime newProgramShowTime : newProgramShowTimes) {
Program program = programMapper.selectById(newProgramShowTime.getProgramId());
if (Objects.isNull(program)) {
continue;
}
Long programGroupId = program.getProgramGroupId();
Date showTime = programGroupMap.get(programGroupId);
if (Objects.isNull(showTime)) {
//如果programGroupMap中没有节目演出时间,则直接放入
programGroupMap.put(programGroupId,newProgramShowTime.getShowTime());
}else {
//如果programGroupMap中有节目演出时间,则比较现有的和新的那个更小,取更小的
if (DateUtil.compare(newProgramShowTime.getShowTime(),showTime) < 0) {
programGroupMap.put(programGroupId,newProgramShowTime.getShowTime());
}
}
}
if (CollectionUtil.isNotEmpty(programGroupMap)) {
programGroupMap.forEach((k,v) -> {
ProgramGroup programGroup = new ProgramGroup();
programGroup.setRecentShowTime(v);
LambdaUpdateWrapper<ProgramGroup> programGroupLambdaUpdateWrapper =
Wrappers.lambdaUpdate(ProgramGroup.class)
.eq(ProgramGroup::getId,k);
//进行数据库更新节目组最近演出时间
programGroupMapper.update(programGroup,programGroupLambdaUpdateWrapper);
});
}
//返回更新了演出时间的节目id集合
return programIdSet;
}如果确实有演出时间过期了,那么把演出时间过期的节目id返回,用于Elasticsearch中的数据和Redis中的数据的删除
Elasticsearch的数据初始化
com.damai.service.init.ProgramElasticsearchInitData
@Slf4j
@Component
public class ProgramElasticsearchInitData extends AbstractApplicationPostConstructHandler {
@Autowired
private BusinessEsHandle businessEsHandle;
@Autowired
private ProgramService programService;
@Override
public Integer executeOrder() {
return 3;
}
/**
* 执行顺序第3执行
* 项目启动后,异步将program的数据更新到Elasticsearch中,当数据量特别大时,生产环境绝对不会这么做
* 会每个一个节目到数据库后,就会添加到Elasticsearch中,以及用定时任务来更新到Elasticsearch中
* */
@Override
public void executeInit(final ConfigurableApplicationContext context) {
BusinessThreadPool.execute(() -> {
try {
initElasticsearchData();
}catch (Exception e) {
log.error("executeInit error",e);
}
});
}
public void initElasticsearchData(){
//如果索引存在不往下执行
if (!indexAdd()) {
return;
}
//查询所有节目id集合
List<Long> allProgramIdList = programService.getAllProgramIdList();
//根据节目id统计出票档的最低价和最高价的集合map, key:节目id,value:票档
Map<Long, TicketCategoryAggregate> ticketCategorieMap = programService.selectTicketCategorieMap(allProgramIdList);
for (Long programId : allProgramIdList) {
//从数据库中查询节目相关的数据
ProgramVo programVo = programService.getDetailFromDb(programId);
Map<String,Object> map = new HashMap<>(32);
//构建数据参数
map.put(ProgramDocumentParamName.ID,programVo.getId());
map.put(ProgramDocumentParamName.PROGRAM_GROUP_ID,programVo.getProgramGroupId());
map.put(ProgramDocumentParamName.PRIME,programVo.getPrime());
map.put(ProgramDocumentParamName.TITLE,programVo.getTitle());
map.put(ProgramDocumentParamName.ACTOR,programVo.getActor());
map.put(ProgramDocumentParamName.PLACE,programVo.getPlace());
map.put(ProgramDocumentParamName.ITEM_PICTURE,programVo.getItemPicture());
map.put(ProgramDocumentParamName.AREA_ID,programVo.getAreaId());
map.put(ProgramDocumentParamName.AREA_NAME,programVo.getAreaName());
map.put(ProgramDocumentParamName.PROGRAM_CATEGORY_ID,programVo.getProgramCategoryId());
map.put(ProgramDocumentParamName.PROGRAM_CATEGORY_NAME,programVo.getProgramCategoryName());
map.put(ProgramDocumentParamName.PARENT_PROGRAM_CATEGORY_ID,programVo.getParentProgramCategoryId());
map.put(ProgramDocumentParamName.PARENT_PROGRAM_CATEGORY_NAME,programVo.getParentProgramCategoryName());
map.put(ProgramDocumentParamName.HIGH_HEAT,programVo.getHighHeat());
map.put(ProgramDocumentParamName.ISSUE_TIME,programVo.getIssueTime());
map.put(ProgramDocumentParamName.SHOW_TIME, programVo.getShowTime());
map.put(ProgramDocumentParamName.SHOW_DAY_TIME,programVo.getShowDayTime());
map.put(ProgramDocumentParamName.SHOW_WEEK_TIME,programVo.getShowWeekTime());
//最低价
map.put(ProgramDocumentParamName.MIN_PRICE,
Optional.ofNullable(ticketCategorieMap.get(programVo.getId()))
.map(TicketCategoryAggregate::getMinPrice).orElse(null));
//最高价
map.put(ProgramDocumentParamName.MAX_PRICE,
Optional.ofNullable(ticketCategorieMap.get(programVo.getId()))
.map(TicketCategoryAggregate::getMaxPrice).orElse(null));
//执行添加
businessEsHandle.add(SpringUtil.getPrefixDistinctionName() + "-" +
ProgramDocumentParamName.INDEX_NAME, ProgramDocumentParamName.INDEX_TYPE,map);
}
}
public boolean indexAdd(){
//检查索引是否存在
boolean result = businessEsHandle.checkIndex(SpringUtil.getPrefixDistinctionName() + "-" +
ProgramDocumentParamName.INDEX_NAME, ProgramDocumentParamName.INDEX_TYPE);
if (result) {
return false;
}
try {
//如果不存在则创建索引
businessEsHandle.createIndex(SpringUtil.getPrefixDistinctionName() + "-" +
ProgramDocumentParamName.INDEX_NAME, ProgramDocumentParamName.INDEX_TYPE,getEsMapping());
return true;
}catch (Exception e) {
log.error("createIndex error",e);
}
return false;
}
/**
* 组装索引的mapping结构,相当于数据库的表结构
* */
public List<EsDocumentMappingDto> getEsMapping(){
List<EsDocumentMappingDto> list = new ArrayList<>();
list.add(new EsDocumentMappingDto(ProgramDocumentParamName.ID,"long"));
list.add(new EsDocumentMappingDto(ProgramDocumentParamName.PROGRAM_GROUP_ID,"integer"));
list.add(new EsDocumentMappingDto(ProgramDocumentParamName.PRIME,"long"));
list.add(new EsDocumentMappingDto(ProgramDocumentParamName.TITLE,"text"));
list.add(new EsDocumentMappingDto(ProgramDocumentParamName.ACTOR,"text"));
list.add(new EsDocumentMappingDto(ProgramDocumentParamName.PLACE,"text"));
list.add(new EsDocumentMappingDto(ProgramDocumentParamName.ITEM_PICTURE,"text"));
list.add(new EsDocumentMappingDto(ProgramDocumentParamName.AREA_ID,"long"));
list.add(new EsDocumentMappingDto(ProgramDocumentParamName.AREA_NAME,"text"));
list.add(new EsDocumentMappingDto(ProgramDocumentParamName.PROGRAM_CATEGORY_ID,"long"));
list.add(new EsDocumentMappingDto(ProgramDocumentParamName.PROGRAM_CATEGORY_NAME,"text"));
list.add(new EsDocumentMappingDto(ProgramDocumentParamName.PARENT_PROGRAM_CATEGORY_ID,"long"));
list.add(new EsDocumentMappingDto(ProgramDocumentParamName.PARENT_PROGRAM_CATEGORY_NAME,"text"));
list.add(new EsDocumentMappingDto(ProgramDocumentParamName.HIGH_HEAT,"integer"));
list.add(new EsDocumentMappingDto(ProgramDocumentParamName.ISSUE_TIME,"date"));
list.add(new EsDocumentMappingDto(ProgramDocumentParamName.SHOW_TIME,"date"));
list.add(new EsDocumentMappingDto(ProgramDocumentParamName.SHOW_DAY_TIME,"date"));
list.add(new EsDocumentMappingDto(ProgramDocumentParamName.SHOW_WEEK_TIME,"text"));
list.add(new EsDocumentMappingDto(ProgramDocumentParamName.MIN_PRICE,"integer"));
list.add(new EsDocumentMappingDto(ProgramDocumentParamName.MAX_PRICE,"integer"));
return list;
}
}此初始化的作用就是创建Elasticsearch的节目索引以及填充节目数据了,这也是为了大家方便学习项目,直接就能体验到Elasticsearch提供的业务功能所以选择项目启动直接就把数据放入了
生产环境也同样解决不会这么做的,一般都是在后台管理系统中,在数据库里添加好新的节目后,再往Elasticsearch中存放
整个流程也不算是复杂:
- 项目启动后,异步执行初始化方法
- 在初始化方法中,先是检查索引是否存在,如果存在了,则停止执行。如果不存在,创建索引的mapping结构
- 接着就是添加节目的数据的操作
- 从数据库中查询所有节目id的集合
- 通过节目id统计出票档的最低价和最高价
- 循环节目id,从数据库查询出节目详情和构建完整的节目相关信息
public ProgramVo getDetailFromDb(Long programId) {
//从数据库查询节目数据
ProgramVo programVo = createProgramVo(programId);
//设置节目类型相关信息
ProgramCategory programCategory = getProgramCategory(programVo.getProgramCategoryId());
if (Objects.nonNull(programCategory)) {
programVo.setProgramCategoryName(programCategory.getName());
}
ProgramCategory parentProgramCategory = getProgramCategory(programVo.getParentProgramCategoryId());
if (Objects.nonNull(parentProgramCategory)) {
programVo.setParentProgramCategoryName(parentProgramCategory.getName());
}
//查询节目演出时间
LambdaQueryWrapper<ProgramShowTime> programShowTimeLambdaQueryWrapper =
Wrappers.lambdaQuery(ProgramShowTime.class).eq(ProgramShowTime::getProgramId, programId);
ProgramShowTime programShowTime = Optional.ofNullable(programShowTimeMapper.selectOne(programShowTimeLambdaQueryWrapper))
.orElseThrow(() -> new DaMaiFrameException(BaseCode.PROGRAM_SHOW_TIME_NOT_EXIST));
//组装演出时间信息
programVo.setShowTime(programShowTime.getShowTime());
programVo.setShowDayTime(programShowTime.getShowDayTime());
programVo.setShowWeekTime(programShowTime.getShowWeekTime());
return programVo;
}private ProgramVo createProgramVo(long programId){
ProgramVo programVo = new ProgramVo();
//根据id查询到节目
Program program =
Optional.ofNullable(programMapper.selectById(programId))
.orElseThrow(() -> new DaMaiFrameException(BaseCode.PROGRAM_NOT_EXIST));
BeanUtil.copyProperties(program,programVo);
//查询区域
AreaGetDto areaGetDto = new AreaGetDto();
areaGetDto.setId(program.getAreaId());
ApiResponse<AreaVo> areaResponse = baseDataClient.getById(areaGetDto);
if (Objects.equals(areaResponse.getCode(), ApiResponse.ok().getCode())) {
if (Objects.nonNull(areaResponse.getData())) {
programVo.setAreaName(areaResponse.getData().getName());
}
}else {
log.error("base-data rpc getById error areaResponse:{}", JSON.toJSONString(areaResponse));
}
return programVo;
}- 构建节目数据参数
- 执行向Elasticsearch添加的操作
可以看到在组装数据时,除了节目表的数据外,还有节目类型的名字、父节目类型的名字、节目的演出时间、最低价、最高价的信息也进行了组装,所以在进行分页查询、主页查询、搜索查询的功能中,可以从Elasticsearch中查询然后直接返回给前端
节目详情布隆过滤器的初始化
com.damai.service.init.ProgramBloomFilterInit
@Component
public class ProgramBloomFilterInit extends AbstractApplicationPostConstructHandler {
@Autowired
private ProgramService programService;
@Autowired
private BloomFilterHandler bloomFilterHandler;
@Override
public Integer executeOrder() {
return 4;
}
@Override
public void executeInit(final ConfigurableApplicationContext context) {
//查询所有的节目id集合
List<Long> allProgramIdList = programService.getAllProgramIdList();
if (CollectionUtil.isEmpty(allProgramIdList)) {
return;
}
//将这些节目id都放入到布隆过滤器中
allProgramIdList.forEach(programId -> bloomFilterHandler.add(String.valueOf(programId)));
}
}public List<Long> getAllProgramIdList(){
LambdaQueryWrapper<Program> programLambdaQueryWrapper =
Wrappers.lambdaQuery(Program.class).eq(Program::getProgramStatus, BusinessStatus.YES.getCode())
.select(Program::getId);
List<Program> programs = programMapper.selectList(programLambdaQueryWrapper);
return programs.stream().map(Program::getId).collect(Collectors.toList());
}从节目表中查询所有上线的节目id集合,然后放入到布隆过滤器中
更新: 2026-04-13 15:20:29
原文: https://www.yuque.com/u22210564/ykdrdh/vmvabhdvn1a30mi5