提交 4543a339 编写于 作者: W wyb 提交者: EricZeng

[Bugfix]修复job更新中的数组越界报错(#744)

上级 1c4fbef9
package com.xiaojukeji.know.streaming.km.common.bean.entity.reassign;
import com.xiaojukeji.know.streaming.km.common.utils.CommonUtils;
import lombok.Data;
import org.apache.kafka.common.TopicPartition;
......@@ -20,10 +19,4 @@ public class ReassignResult {
return state.isDone();
}
public boolean checkPreferredReplicaElectionUnNeed(String reassignBrokerIds, String originalBrokerIds) {
Integer targetLeader = CommonUtils.string2IntList(reassignBrokerIds).get(0);
Integer originalLeader = CommonUtils.string2IntList(originalBrokerIds).get(0);
return originalLeader.equals(targetLeader);
}
}
......@@ -261,4 +261,20 @@ public class CommonUtils {
return null;
}
}
/**
* 校验两个list的第一个元素是否相等,以","分隔元素。
* @param str1
* @param str2
* @return
*/
public static boolean checkFirstElementIsEquals(String str1, String str2) {
if (ValidateUtils.anyBlank(str1, str2)) {
return false;
}
Integer targetLeader = CommonUtils.string2IntList(str1).get(0);
Integer originalLeader = CommonUtils.string2IntList(str2).get(0);
return originalLeader.equals(targetLeader);
}
}
......@@ -28,6 +28,7 @@ import com.xiaojukeji.know.streaming.km.common.enums.operaterecord.OperationEnum
import com.xiaojukeji.know.streaming.km.common.utils.CommonUtils;
import com.xiaojukeji.know.streaming.km.common.utils.ConvertUtil;
import com.xiaojukeji.know.streaming.km.common.utils.ValidateUtils;
import com.xiaojukeji.know.streaming.km.common.utils.kafka.KafkaReassignUtil;
import com.xiaojukeji.know.streaming.km.core.service.broker.BrokerService;
import com.xiaojukeji.know.streaming.km.core.service.cluster.ClusterPhyService;
import com.xiaojukeji.know.streaming.km.core.service.oprecord.OpLogWrapService;
......@@ -385,11 +386,13 @@ public class ReassignJobServiceImpl implements ReassignJobService {
// 更新任务状态
rv = this.checkAndSetSuccessIfFinished(jobPO, rrr.getData());
if (rv.successful()){
//如果任务还未完成,先返回,不必考虑优先副本的重新选举。
if (!rv.successful()) {
return Result.buildFromIgnoreData(rv);
}
//已完成
//任务已完成,检查是否需要重新选举,并进行选举。
rv = this.preferredReplicaElection(jobId);
......@@ -500,10 +503,8 @@ public class ReassignJobServiceImpl implements ReassignJobService {
List<ReassignSubJobPO> subJobPOList = this.getSubJobsByJobId(jobId);
List<TopicPartition> topicPartitions = new ArrayList<>();
subJobPOList.stream().forEach(reassignPO -> {
Integer targetLeader = CommonUtils.string2IntList(reassignPO.getReassignBrokerIds()).get(0);
Integer originalLeader = CommonUtils.string2IntList(reassignPO.getOriginalBrokerIds()).get(0);
//替换过leader的添加到优先副本选举任务列表
if (!originalLeader.equals(targetLeader)){
if (!CommonUtils.checkFirstElementIsEquals(reassignPO.getReassignBrokerIds(), reassignPO.getOriginalBrokerIds())) {
topicPartitions.add(new TopicPartition(reassignPO.getTopicName(), reassignPO.getPartitionId()));
}
});
......@@ -534,8 +535,12 @@ public class ReassignJobServiceImpl implements ReassignJobService {
if (dbSubPO == null) {
// DB中不存在
reassignSubJobDAO.insert(elem);
return;
}
//补全缺失信息
this.completeInfo(elem,dbSubPO);
// 已存在则进行更新
elem.setId(dbSubPO.getId());
reassignSubJobDAO.updateById(elem);
......@@ -565,13 +570,10 @@ public class ReassignJobServiceImpl implements ReassignJobService {
long now = System.currentTimeMillis();
boolean existNotFinished = false;
boolean unNeedPreferredReplicaElection = true;
boolean jobSucceed = false;
List<ReassignSubJobPO> subJobPOList = this.getSubJobsByJobId(jobPO.getId());
for (ReassignSubJobPO subJobPO: subJobPOList) {
if (!reassignmentResult.checkPreferredReplicaElectionUnNeed(subJobPO.getReassignBrokerIds(),subJobPO.getOriginalBrokerIds())) {
unNeedPreferredReplicaElection = false;
}
if (!reassignmentResult.checkPartitionFinished(subJobPO.getTopicName(), subJobPO.getPartitionId())) {
existNotFinished = true;
......@@ -591,12 +593,13 @@ public class ReassignJobServiceImpl implements ReassignJobService {
// 当前没有分区处于迁移中, 并且没有任务并不处于执行中
ReassignJobPO newJobPO = new ReassignJobPO();
newJobPO.setId(jobPO.getId());
jobSucceed = true;
newJobPO.setStatus(JobStatusEnum.SUCCESS.getStatus());
newJobPO.setFinishedTime(new Date(now));
reassignJobDAO.updateById(newJobPO);
}
return Result.build(unNeedPreferredReplicaElection);
return Result.build(jobSucceed);
}
private Result<List<ReassignSubJobPO>> setJobInRunning(ReassignJobPO jobPO) {
......@@ -861,4 +864,25 @@ public class ReassignJobServiceImpl implements ReassignJobService {
return returnRV;
}
private void completeInfo(ReassignSubJobPO newPO, ReassignSubJobPO dbPO) {
if (newPO.getJobId() == null) {
newPO.setJobId(dbPO.getJobId());
}
if (newPO.getTopicName() == null) {
newPO.setTopicName(dbPO.getTopicName());
}
if (newPO.getClusterPhyId() == null) {
newPO.setClusterPhyId(dbPO.getClusterPhyId());
}
if (newPO.getPartitionId() == null) {
newPO.setPartitionId(dbPO.getPartitionId());
}
if (newPO.getOriginalBrokerIds() == null || newPO.getOriginalBrokerIds().isEmpty()) {
newPO.setOriginalBrokerIds(dbPO.getOriginalBrokerIds());
}
if (newPO.getReassignBrokerIds() == null || newPO.getReassignBrokerIds().isEmpty()) {
newPO.setReassignBrokerIds(dbPO.getReassignBrokerIds());
}
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册