未验证 提交 203859b7 编写于 作者: H haoqi123 提交者: GitHub

Merge branch 'didi:dev' into dev

...@@ -9,7 +9,7 @@ error_exit () ...@@ -9,7 +9,7 @@ error_exit ()
[ ! -e "$JAVA_HOME/bin/java" ] && unset JAVA_HOME [ ! -e "$JAVA_HOME/bin/java" ] && unset JAVA_HOME
if [ -z "$JAVA_HOME" ]; then if [ -z "$JAVA_HOME" ]; then
if $darwin; then if [ "Darwin" = "$(uname -s)" ]; then
if [ -x '/usr/libexec/java_home' ] ; then if [ -x '/usr/libexec/java_home' ] ; then
export JAVA_HOME=`/usr/libexec/java_home` export JAVA_HOME=`/usr/libexec/java_home`
......
...@@ -40,8 +40,7 @@ thread-pool: ...@@ -40,8 +40,7 @@ thread-pool:
``` ```
**SQL 变更**
**SQL变更**
```sql ```sql
-- 多集群管理权限2022-09-06新增 -- 多集群管理权限2022-09-06新增
...@@ -80,12 +79,11 @@ ALTER TABLE `logi_security_oplog` ...@@ -80,12 +79,11 @@ ALTER TABLE `logi_security_oplog`
### 6.2.2、升级至 `v3.0.0-beta.1`版本 ### 6.2.2、升级至 `v3.0.0-beta.1`版本
**SQL 变更**
**SQL变更**
1、在`ks_km_broker`表增加了一个监听信息字段。 1、在`ks_km_broker`表增加了一个监听信息字段。
2、为`logi_security_oplog`operation_methods字段设置默认值''。 2、为`logi_security_oplog` operation_methods 字段设置默认值''。
因此需要执行下面的sql对数据库表进行更新。 因此需要执行下面的 sql 对数据库表进行更新。
```sql ```sql
ALTER TABLE `ks_km_broker` ALTER TABLE `ks_km_broker`
...@@ -98,7 +96,6 @@ ALTER COLUMN `operation_methods` set default ''; ...@@ -98,7 +96,6 @@ ALTER COLUMN `operation_methods` set default '';
--- ---
### 6.2.3、`2.x`版本 升级至 `v3.0.0-beta.0`版本 ### 6.2.3、`2.x`版本 升级至 `v3.0.0-beta.0`版本
**升级步骤:** **升级步骤:**
...@@ -123,14 +120,14 @@ ALTER COLUMN `operation_methods` set default ''; ...@@ -123,14 +120,14 @@ ALTER COLUMN `operation_methods` set default '';
UPDATE ks_km_topic UPDATE ks_km_topic
INNER JOIN INNER JOIN
(SELECT (SELECT
topic.cluster_id AS cluster_id, topic.cluster_id AS cluster_id,
topic.topic_name AS topic_name, topic.topic_name AS topic_name,
topic.description AS description topic.description AS description
FROM topic WHERE description != '' FROM topic WHERE description != ''
) AS t ) AS t
ON ks_km_topic.cluster_phy_id = t.cluster_id ON ks_km_topic.cluster_phy_id = t.cluster_id
AND ks_km_topic.topic_name = t.topic_name AND ks_km_topic.topic_name = t.topic_name
AND ks_km_topic.id > 0 AND ks_km_topic.id > 0
SET ks_km_topic.description = t.description; SET ks_km_topic.description = t.description;
``` ```
\ No newline at end of file
...@@ -11,7 +11,7 @@ ...@@ -11,7 +11,7 @@
下面是用户第一次使用我们产品的典型体验路径: 下面是用户第一次使用我们产品的典型体验路径:
![text](http://img-ys011.didistatic.com/static/dc2img/do1_YehqxqmsVaqU5gf3XphI) ![text](http://img-ys011.didistatic.com/static/dc2img/do1_qgqPsAY46sZeBaPUCwXY)
## 5.3、常用功能 ## 5.3、常用功能
......
...@@ -19,7 +19,7 @@ import com.xiaojukeji.know.streaming.km.common.bean.vo.group.GroupTopicConsumedD ...@@ -19,7 +19,7 @@ import com.xiaojukeji.know.streaming.km.common.bean.vo.group.GroupTopicConsumedD
import com.xiaojukeji.know.streaming.km.common.bean.vo.group.GroupTopicOverviewVO; import com.xiaojukeji.know.streaming.km.common.bean.vo.group.GroupTopicOverviewVO;
import com.xiaojukeji.know.streaming.km.common.constant.MsgConstant; import com.xiaojukeji.know.streaming.km.common.constant.MsgConstant;
import com.xiaojukeji.know.streaming.km.common.enums.AggTypeEnum; import com.xiaojukeji.know.streaming.km.common.enums.AggTypeEnum;
import com.xiaojukeji.know.streaming.km.common.enums.GroupOffsetResetEnum; import com.xiaojukeji.know.streaming.km.common.enums.OffsetTypeEnum;
import com.xiaojukeji.know.streaming.km.common.enums.group.GroupStateEnum; import com.xiaojukeji.know.streaming.km.common.enums.group.GroupStateEnum;
import com.xiaojukeji.know.streaming.km.common.exception.AdminOperateException; import com.xiaojukeji.know.streaming.km.common.exception.AdminOperateException;
import com.xiaojukeji.know.streaming.km.common.exception.NotExistException; import com.xiaojukeji.know.streaming.km.common.exception.NotExistException;
...@@ -199,12 +199,12 @@ public class GroupManagerImpl implements GroupManager { ...@@ -199,12 +199,12 @@ public class GroupManagerImpl implements GroupManager {
return Result.buildFromRSAndMsg(ResultStatus.NOT_EXIST, MsgConstant.getTopicNotExist(dto.getClusterId(), dto.getTopicName())); return Result.buildFromRSAndMsg(ResultStatus.NOT_EXIST, MsgConstant.getTopicNotExist(dto.getClusterId(), dto.getTopicName()));
} }
if (GroupOffsetResetEnum.PRECISE_OFFSET.getResetType() == dto.getResetType() if (OffsetTypeEnum.PRECISE_OFFSET.getResetType() == dto.getResetType()
&& ValidateUtils.isEmptyList(dto.getOffsetList())) { && ValidateUtils.isEmptyList(dto.getOffsetList())) {
return Result.buildFromRSAndMsg(ResultStatus.PARAM_ILLEGAL, "参数错误,指定offset重置需传offset信息"); return Result.buildFromRSAndMsg(ResultStatus.PARAM_ILLEGAL, "参数错误,指定offset重置需传offset信息");
} }
if (GroupOffsetResetEnum.PRECISE_TIMESTAMP.getResetType() == dto.getResetType() if (OffsetTypeEnum.PRECISE_TIMESTAMP.getResetType() == dto.getResetType()
&& ValidateUtils.isNull(dto.getTimestamp())) { && ValidateUtils.isNull(dto.getTimestamp())) {
return Result.buildFromRSAndMsg(ResultStatus.PARAM_ILLEGAL, "参数错误,指定时间重置需传时间信息"); return Result.buildFromRSAndMsg(ResultStatus.PARAM_ILLEGAL, "参数错误,指定时间重置需传时间信息");
} }
...@@ -213,7 +213,7 @@ public class GroupManagerImpl implements GroupManager { ...@@ -213,7 +213,7 @@ public class GroupManagerImpl implements GroupManager {
} }
private Result<Map<TopicPartition, Long>> getPartitionOffset(GroupOffsetResetDTO dto) { private Result<Map<TopicPartition, Long>> getPartitionOffset(GroupOffsetResetDTO dto) {
if (GroupOffsetResetEnum.PRECISE_OFFSET.getResetType() == dto.getResetType()) { if (OffsetTypeEnum.PRECISE_OFFSET.getResetType() == dto.getResetType()) {
return Result.buildSuc(dto.getOffsetList().stream().collect(Collectors.toMap( return Result.buildSuc(dto.getOffsetList().stream().collect(Collectors.toMap(
elem -> new TopicPartition(dto.getTopicName(), elem.getPartitionId()), elem -> new TopicPartition(dto.getTopicName(), elem.getPartitionId()),
PartitionOffsetDTO::getOffset, PartitionOffsetDTO::getOffset,
...@@ -222,9 +222,9 @@ public class GroupManagerImpl implements GroupManager { ...@@ -222,9 +222,9 @@ public class GroupManagerImpl implements GroupManager {
} }
OffsetSpec offsetSpec = null; OffsetSpec offsetSpec = null;
if (GroupOffsetResetEnum.PRECISE_TIMESTAMP.getResetType() == dto.getResetType()) { if (OffsetTypeEnum.PRECISE_TIMESTAMP.getResetType() == dto.getResetType()) {
offsetSpec = OffsetSpec.forTimestamp(dto.getTimestamp()); offsetSpec = OffsetSpec.forTimestamp(dto.getTimestamp());
} else if (GroupOffsetResetEnum.EARLIEST.getResetType() == dto.getResetType()) { } else if (OffsetTypeEnum.EARLIEST.getResetType() == dto.getResetType()) {
offsetSpec = OffsetSpec.earliest(); offsetSpec = OffsetSpec.earliest();
} else { } else {
offsetSpec = OffsetSpec.latest(); offsetSpec = OffsetSpec.latest();
......
package com.xiaojukeji.know.streaming.km.biz.topic; package com.xiaojukeji.know.streaming.km.biz.topic;
import com.xiaojukeji.know.streaming.km.common.bean.dto.pagination.PaginationSortDTO;
import com.xiaojukeji.know.streaming.km.common.bean.dto.topic.TopicRecordDTO; import com.xiaojukeji.know.streaming.km.common.bean.dto.topic.TopicRecordDTO;
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result; import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result;
import com.xiaojukeji.know.streaming.km.common.bean.vo.topic.TopicBrokersPartitionsSummaryVO; import com.xiaojukeji.know.streaming.km.common.bean.vo.topic.TopicBrokersPartitionsSummaryVO;
......
...@@ -22,25 +22,26 @@ import com.xiaojukeji.know.streaming.km.common.bean.vo.topic.partition.TopicPart ...@@ -22,25 +22,26 @@ import com.xiaojukeji.know.streaming.km.common.bean.vo.topic.partition.TopicPart
import com.xiaojukeji.know.streaming.km.common.constant.Constant; import com.xiaojukeji.know.streaming.km.common.constant.Constant;
import com.xiaojukeji.know.streaming.km.common.constant.KafkaConstant; import com.xiaojukeji.know.streaming.km.common.constant.KafkaConstant;
import com.xiaojukeji.know.streaming.km.common.constant.MsgConstant; import com.xiaojukeji.know.streaming.km.common.constant.MsgConstant;
import com.xiaojukeji.know.streaming.km.common.converter.PartitionConverter;
import com.xiaojukeji.know.streaming.km.common.converter.TopicVOConverter; import com.xiaojukeji.know.streaming.km.common.converter.TopicVOConverter;
import com.xiaojukeji.know.streaming.km.common.enums.OffsetTypeEnum;
import com.xiaojukeji.know.streaming.km.common.enums.SortTypeEnum;
import com.xiaojukeji.know.streaming.km.common.exception.AdminOperateException; import com.xiaojukeji.know.streaming.km.common.exception.AdminOperateException;
import com.xiaojukeji.know.streaming.km.common.exception.NotExistException; import com.xiaojukeji.know.streaming.km.common.exception.NotExistException;
import com.xiaojukeji.know.streaming.km.common.utils.ConvertUtil; import com.xiaojukeji.know.streaming.km.common.utils.ConvertUtil;
import com.xiaojukeji.know.streaming.km.common.utils.PaginationUtil;
import com.xiaojukeji.know.streaming.km.common.utils.ValidateUtils; import com.xiaojukeji.know.streaming.km.common.utils.ValidateUtils;
import com.xiaojukeji.know.streaming.km.core.service.broker.BrokerService; import com.xiaojukeji.know.streaming.km.core.service.broker.BrokerService;
import com.xiaojukeji.know.streaming.km.core.service.cluster.ClusterPhyService; import com.xiaojukeji.know.streaming.km.core.service.cluster.ClusterPhyService;
import com.xiaojukeji.know.streaming.km.core.service.partition.PartitionMetricService; import com.xiaojukeji.know.streaming.km.core.service.partition.PartitionMetricService;
import com.xiaojukeji.know.streaming.km.core.service.topic.TopicConfigService;
import com.xiaojukeji.know.streaming.km.core.service.partition.PartitionService; import com.xiaojukeji.know.streaming.km.core.service.partition.PartitionService;
import com.xiaojukeji.know.streaming.km.core.service.topic.TopicConfigService;
import com.xiaojukeji.know.streaming.km.core.service.topic.TopicMetricService; import com.xiaojukeji.know.streaming.km.core.service.topic.TopicMetricService;
import com.xiaojukeji.know.streaming.km.core.service.topic.TopicService; import com.xiaojukeji.know.streaming.km.core.service.topic.TopicService;
import com.xiaojukeji.know.streaming.km.core.service.version.metrics.TopicMetricVersionItems; import com.xiaojukeji.know.streaming.km.core.service.version.metrics.TopicMetricVersionItems;
import org.apache.commons.lang3.ObjectUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.admin.OffsetSpec; import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.TopicConfig; import org.apache.kafka.common.config.TopicConfig;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
...@@ -160,8 +161,31 @@ public class TopicStateManagerImpl implements TopicStateManager { ...@@ -160,8 +161,31 @@ public class TopicStateManagerImpl implements TopicStateManager {
} }
maxMessage = Math.min(maxMessage, dto.getMaxRecords()); maxMessage = Math.min(maxMessage, dto.getMaxRecords());
kafkaConsumer.assign(partitionList); kafkaConsumer.assign(partitionList);
Map<TopicPartition, OffsetAndTimestamp> partitionOffsetAndTimestampMap = new HashMap<>();
// 获取指定时间每个分区的offset(按指定开始时间查询消息时)
if (OffsetTypeEnum.PRECISE_TIMESTAMP.getResetType() == dto.getFilterOffsetReset()) {
Map<TopicPartition, Long> timestampsToSearch = new HashMap<>();
partitionList.forEach(topicPartition -> {
timestampsToSearch.put(topicPartition, dto.getStartTimestampUnitMs());
});
partitionOffsetAndTimestampMap = kafkaConsumer.offsetsForTimes(timestampsToSearch);
}
for (TopicPartition partition : partitionList) { for (TopicPartition partition : partitionList) {
kafkaConsumer.seek(partition, Math.max(beginOffsetsMapResult.getData().get(partition), endOffsetsMapResult.getData().get(partition) - dto.getMaxRecords())); if (OffsetTypeEnum.EARLIEST.getResetType() == dto.getFilterOffsetReset()) {
// 重置到最旧
kafkaConsumer.seek(partition, beginOffsetsMapResult.getData().get(partition));
} else if (OffsetTypeEnum.PRECISE_TIMESTAMP.getResetType() == dto.getFilterOffsetReset()) {
// 重置到指定时间
kafkaConsumer.seek(partition, partitionOffsetAndTimestampMap.get(partition).offset());
} else if (OffsetTypeEnum.PRECISE_OFFSET.getResetType() == dto.getFilterOffsetReset()) {
// 重置到指定位置
} else {
// 默认,重置到最新
kafkaConsumer.seek(partition, Math.max(beginOffsetsMapResult.getData().get(partition), endOffsetsMapResult.getData().get(partition) - dto.getMaxRecords()));
}
} }
// 这里需要减去 KafkaConstant.POLL_ONCE_TIMEOUT_UNIT_MS 是因为poll一次需要耗时,如果这里不减去,则可能会导致poll之后,超过要求的时间 // 这里需要减去 KafkaConstant.POLL_ONCE_TIMEOUT_UNIT_MS 是因为poll一次需要耗时,如果这里不减去,则可能会导致poll之后,超过要求的时间
...@@ -185,6 +209,15 @@ public class TopicStateManagerImpl implements TopicStateManager { ...@@ -185,6 +209,15 @@ public class TopicStateManagerImpl implements TopicStateManager {
} }
} }
// 排序
if (ObjectUtils.isNotEmpty(voList)) {
// 默认按时间倒序排序
if (StringUtils.isBlank(dto.getSortType())) {
dto.setSortType(SortTypeEnum.DESC.getSortType());
}
PaginationUtil.pageBySort(voList, dto.getSortField(), dto.getSortType());
}
return Result.buildSuc(voList.subList(0, Math.min(dto.getMaxRecords(), voList.size()))); return Result.buildSuc(voList.subList(0, Math.min(dto.getMaxRecords(), voList.size())));
} catch (Exception e) { } catch (Exception e) {
log.error("method=getTopicMessages||clusterPhyId={}||topicName={}||param={}||errMsg=exception", clusterPhyId, topicName, dto, e); log.error("method=getTopicMessages||clusterPhyId={}||topicName={}||param={}||errMsg=exception", clusterPhyId, topicName, dto, e);
......
...@@ -7,12 +7,14 @@ import com.didiglobal.logi.log.LogFactory; ...@@ -7,12 +7,14 @@ import com.didiglobal.logi.log.LogFactory;
import com.didiglobal.logi.security.common.dto.config.ConfigDTO; import com.didiglobal.logi.security.common.dto.config.ConfigDTO;
import com.didiglobal.logi.security.service.ConfigService; import com.didiglobal.logi.security.service.ConfigService;
import com.xiaojukeji.know.streaming.km.biz.version.VersionControlManager; import com.xiaojukeji.know.streaming.km.biz.version.VersionControlManager;
import com.xiaojukeji.know.streaming.km.common.bean.dto.metrices.MetricDetailDTO;
import com.xiaojukeji.know.streaming.km.common.bean.dto.metrices.UserMetricConfigDTO; import com.xiaojukeji.know.streaming.km.common.bean.dto.metrices.UserMetricConfigDTO;
import com.xiaojukeji.know.streaming.km.common.bean.entity.config.metric.UserMetricConfig; import com.xiaojukeji.know.streaming.km.common.bean.entity.config.metric.UserMetricConfig;
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result; import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result;
import com.xiaojukeji.know.streaming.km.common.bean.entity.version.VersionControlItem; import com.xiaojukeji.know.streaming.km.common.bean.entity.version.VersionControlItem;
import com.xiaojukeji.know.streaming.km.common.bean.vo.config.metric.UserMetricConfigVO; import com.xiaojukeji.know.streaming.km.common.bean.vo.config.metric.UserMetricConfigVO;
import com.xiaojukeji.know.streaming.km.common.bean.vo.version.VersionItemVO; import com.xiaojukeji.know.streaming.km.common.bean.vo.version.VersionItemVO;
import com.xiaojukeji.know.streaming.km.common.constant.Constant;
import com.xiaojukeji.know.streaming.km.common.enums.version.VersionEnum; import com.xiaojukeji.know.streaming.km.common.enums.version.VersionEnum;
import com.xiaojukeji.know.streaming.km.common.utils.ConvertUtil; import com.xiaojukeji.know.streaming.km.common.utils.ConvertUtil;
import com.xiaojukeji.know.streaming.km.common.utils.VersionUtil; import com.xiaojukeji.know.streaming.km.common.utils.VersionUtil;
...@@ -159,6 +161,9 @@ public class VersionControlManagerImpl implements VersionControlManager { ...@@ -159,6 +161,9 @@ public class VersionControlManagerImpl implements VersionControlManager {
UserMetricConfig umc = userMetricConfigMap.get(itemType + "@" + metric); UserMetricConfig umc = userMetricConfigMap.get(itemType + "@" + metric);
userMetricConfigVO.setSet(null != umc && umc.isSet()); userMetricConfigVO.setSet(null != umc && umc.isSet());
if (umc != null) {
userMetricConfigVO.setRank(umc.getRank());
}
userMetricConfigVO.setName(itemVO.getName()); userMetricConfigVO.setName(itemVO.getName());
userMetricConfigVO.setType(itemVO.getType()); userMetricConfigVO.setType(itemVO.getType());
userMetricConfigVO.setDesc(itemVO.getDesc()); userMetricConfigVO.setDesc(itemVO.getDesc());
...@@ -178,13 +183,30 @@ public class VersionControlManagerImpl implements VersionControlManager { ...@@ -178,13 +183,30 @@ public class VersionControlManagerImpl implements VersionControlManager {
@Override @Override
public Result<Void> updateUserMetricItem(Long clusterId, Integer type, UserMetricConfigDTO dto, String operator) { public Result<Void> updateUserMetricItem(Long clusterId, Integer type, UserMetricConfigDTO dto, String operator) {
Map<String, Boolean> metricsSetMap = dto.getMetricsSet(); Map<String, Boolean> metricsSetMap = dto.getMetricsSet();
if(null == metricsSetMap || metricsSetMap.isEmpty()){
//转换metricDetailDTOList
List<MetricDetailDTO> metricDetailDTOList = dto.getMetricDetailDTOList();
Map<String, MetricDetailDTO> metricDetailMap = new HashMap<>();
if (metricDetailDTOList != null && !metricDetailDTOList.isEmpty()) {
metricDetailMap = metricDetailDTOList.stream().collect(Collectors.toMap(MetricDetailDTO::getMetric, Function.identity()));
}
//转换metricsSetMap
if (metricsSetMap != null && !metricsSetMap.isEmpty()) {
for (Map.Entry<String, Boolean> metricAndShowEntry : metricsSetMap.entrySet()) {
if (metricDetailMap.containsKey(metricAndShowEntry.getKey())) continue;
metricDetailMap.put(metricAndShowEntry.getKey(), new MetricDetailDTO(metricAndShowEntry.getKey(), metricAndShowEntry.getValue(), null));
}
}
if (metricDetailMap.isEmpty()) {
return Result.buildSuc(); return Result.buildSuc();
} }
Set<UserMetricConfig> userMetricConfigs = getUserMetricConfig(operator); Set<UserMetricConfig> userMetricConfigs = getUserMetricConfig(operator);
for(Map.Entry<String, Boolean> metricAndShowEntry : metricsSetMap.entrySet()){ for(Map.Entry<String, Boolean> metricAndShowEntry : metricsSetMap.entrySet()){
UserMetricConfig userMetricConfig = new UserMetricConfig(type, metricAndShowEntry.getKey(), metricAndShowEntry.getValue()); UserMetricConfig userMetricConfig = new UserMetricConfig(type, metricAndShowEntry.getKey(), metricAndShowEntry.getValue(), metricDetailMap.get(metricAndShowEntry.getKey()).getRank());
userMetricConfigs.remove(userMetricConfig); userMetricConfigs.remove(userMetricConfig);
userMetricConfigs.add(userMetricConfig); userMetricConfigs.add(userMetricConfig);
} }
...@@ -228,7 +250,15 @@ public class VersionControlManagerImpl implements VersionControlManager { ...@@ -228,7 +250,15 @@ public class VersionControlManagerImpl implements VersionControlManager {
return defaultMetrics; return defaultMetrics;
} }
return JSON.parseObject(value, new TypeReference<Set<UserMetricConfig>>(){}); Set<UserMetricConfig> userMetricConfigs = JSON.parseObject(value, new TypeReference<Set<UserMetricConfig>>() {});
//补充rank不存在情况
for (UserMetricConfig userMetricConfig : userMetricConfigs) {
if (userMetricConfig.getRank() == 0) {
userMetricConfig.setRank(Constant.DEFAULT_METRIC_RANK);
}
}
return userMetricConfigs;
} }
public static void main(String[] args){ public static void main(String[] args){
......
...@@ -3,6 +3,7 @@ package com.xiaojukeji.know.streaming.km.common.bean.dto.group; ...@@ -3,6 +3,7 @@ package com.xiaojukeji.know.streaming.km.common.bean.dto.group;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties; import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.xiaojukeji.know.streaming.km.common.bean.dto.partition.PartitionOffsetDTO; import com.xiaojukeji.know.streaming.km.common.bean.dto.partition.PartitionOffsetDTO;
import com.xiaojukeji.know.streaming.km.common.bean.dto.topic.ClusterTopicDTO; import com.xiaojukeji.know.streaming.km.common.bean.dto.topic.ClusterTopicDTO;
import com.xiaojukeji.know.streaming.km.common.enums.OffsetTypeEnum;
import io.swagger.annotations.ApiModelProperty; import io.swagger.annotations.ApiModelProperty;
import lombok.Data; import lombok.Data;
...@@ -23,7 +24,7 @@ public class GroupOffsetResetDTO extends ClusterTopicDTO { ...@@ -23,7 +24,7 @@ public class GroupOffsetResetDTO extends ClusterTopicDTO {
private String groupName; private String groupName;
/** /**
* @see com.xiaojukeji.know.streaming.km.common.enums.GroupOffsetResetEnum * @see OffsetTypeEnum
*/ */
@NotNull(message = "resetType不允许为空") @NotNull(message = "resetType不允许为空")
@ApiModelProperty(value = "重置方式", example = "1") @ApiModelProperty(value = "重置方式", example = "1")
......
package com.xiaojukeji.know.streaming.km.common.bean.dto.metrices;
import com.xiaojukeji.know.streaming.km.common.bean.dto.BaseDTO;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
* @author didi
*/
@Data
@NoArgsConstructor
@AllArgsConstructor
@ApiModel(description = "指标详细属性信息")
public class MetricDetailDTO extends BaseDTO {
@ApiModelProperty("指标名称")
private String metric;
@ApiModelProperty("指标是否显示")
private Boolean set;
@ApiModelProperty("指标优先级")
private Integer rank;
}
...@@ -7,6 +7,7 @@ import lombok.AllArgsConstructor; ...@@ -7,6 +7,7 @@ import lombok.AllArgsConstructor;
import lombok.Data; import lombok.Data;
import lombok.NoArgsConstructor; import lombok.NoArgsConstructor;
import java.util.List;
import java.util.Map; import java.util.Map;
...@@ -17,4 +18,7 @@ import java.util.Map; ...@@ -17,4 +18,7 @@ import java.util.Map;
public class UserMetricConfigDTO extends BaseDTO { public class UserMetricConfigDTO extends BaseDTO {
@ApiModelProperty("指标展示设置项,key:指标名;value:是否展现(true展现/false不展现)") @ApiModelProperty("指标展示设置项,key:指标名;value:是否展现(true展现/false不展现)")
private Map<String, Boolean> metricsSet; private Map<String, Boolean> metricsSet;
@ApiModelProperty("指标自定义属性列表")
private List<MetricDetailDTO> metricDetailDTOList;
} }
package com.xiaojukeji.know.streaming.km.common.bean.dto.topic; package com.xiaojukeji.know.streaming.km.common.bean.dto.topic;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties; import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.xiaojukeji.know.streaming.km.common.bean.dto.BaseDTO; import com.xiaojukeji.know.streaming.km.common.bean.dto.pagination.PaginationSortDTO;
import com.xiaojukeji.know.streaming.km.common.enums.OffsetTypeEnum;
import io.swagger.annotations.ApiModel; import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty; import io.swagger.annotations.ApiModelProperty;
import lombok.Data; import lombok.Data;
...@@ -15,7 +16,7 @@ import javax.validation.constraints.NotNull; ...@@ -15,7 +16,7 @@ import javax.validation.constraints.NotNull;
@Data @Data
@JsonIgnoreProperties(ignoreUnknown = true) @JsonIgnoreProperties(ignoreUnknown = true)
@ApiModel(description = "Topic记录") @ApiModel(description = "Topic记录")
public class TopicRecordDTO extends BaseDTO { public class TopicRecordDTO extends PaginationSortDTO {
@NotNull(message = "truncate不允许为空") @NotNull(message = "truncate不允许为空")
@ApiModelProperty(value = "是否截断", example = "true") @ApiModelProperty(value = "是否截断", example = "true")
private Boolean truncate; private Boolean truncate;
...@@ -34,4 +35,13 @@ public class TopicRecordDTO extends BaseDTO { ...@@ -34,4 +35,13 @@ public class TopicRecordDTO extends BaseDTO {
@ApiModelProperty(value = "预览超时时间", example = "10000") @ApiModelProperty(value = "预览超时时间", example = "10000")
private Long pullTimeoutUnitMs = 8000L; private Long pullTimeoutUnitMs = 8000L;
/**
* @see OffsetTypeEnum
*/
@ApiModelProperty(value = "offset", example = "")
private Integer filterOffsetReset = 0;
@ApiModelProperty(value = "开始日期时间戳", example = "")
private Long startTimestampUnitMs;
} }
package com.xiaojukeji.know.streaming.km.common.bean.entity.config.metric; package com.xiaojukeji.know.streaming.km.common.bean.entity.config.metric;
import com.xiaojukeji.know.streaming.km.common.constant.Constant;
import lombok.AllArgsConstructor; import lombok.AllArgsConstructor;
import lombok.Data; import lombok.Data;
import lombok.NoArgsConstructor; import lombok.NoArgsConstructor;
@Data @Data
@NoArgsConstructor @NoArgsConstructor
@AllArgsConstructor
public class UserMetricConfig { public class UserMetricConfig {
private int type; private int type;
...@@ -15,6 +15,22 @@ public class UserMetricConfig { ...@@ -15,6 +15,22 @@ public class UserMetricConfig {
private boolean set; private boolean set;
private int rank;
public UserMetricConfig(int type, String metric, boolean set, Integer rank) {
this.type = type;
this.metric = metric;
this.set = set;
this.rank = rank == null ? Constant.DEFAULT_METRIC_RANK : rank;
}
public UserMetricConfig(int type, String metric, boolean set) {
this.type = type;
this.metric = metric;
this.set = set;
this.rank = Constant.DEFAULT_METRIC_RANK;
}
@Override @Override
public int hashCode(){ public int hashCode(){
return metric.hashCode() << 1 + type; return metric.hashCode() << 1 + type;
......
...@@ -14,4 +14,7 @@ import lombok.NoArgsConstructor; ...@@ -14,4 +14,7 @@ import lombok.NoArgsConstructor;
public class UserMetricConfigVO extends VersionItemVO { public class UserMetricConfigVO extends VersionItemVO {
@ApiModelProperty(value = "该指标用户是否设置展现", example = "true") @ApiModelProperty(value = "该指标用户是否设置展现", example = "true")
private Boolean set; private Boolean set;
@ApiModelProperty(value = "该指标展示优先级", example = "1")
private Integer rank;
} }
...@@ -42,6 +42,8 @@ public class Constant { ...@@ -42,6 +42,8 @@ public class Constant {
*/ */
public static final Integer DEFAULT_CLUSTER_HEALTH_SCORE = 90; public static final Integer DEFAULT_CLUSTER_HEALTH_SCORE = 90;
public static final Integer DEFAULT_METRIC_RANK = 10000;
public static final String DEFAULT_USER_NAME = "know-streaming-app"; public static final String DEFAULT_USER_NAME = "know-streaming-app";
public static final int INVALID_CODE = -1; public static final int INVALID_CODE = -1;
...@@ -64,4 +66,5 @@ public class Constant { ...@@ -64,4 +66,5 @@ public class Constant {
public static final Float COLLECT_METRICS_ERROR_COST_TIME = -1.0F; public static final Float COLLECT_METRICS_ERROR_COST_TIME = -1.0F;
public static final Integer DEFAULT_RETRY_TIME = 3; public static final Integer DEFAULT_RETRY_TIME = 3;
} }
...@@ -3,19 +3,19 @@ package com.xiaojukeji.know.streaming.km.common.enums; ...@@ -3,19 +3,19 @@ package com.xiaojukeji.know.streaming.km.common.enums;
import lombok.Getter; import lombok.Getter;
/** /**
* 重置offset * offset类型
* @author zengqiao * @author zengqiao
* @date 19/4/8 * @date 19/4/8
*/ */
@Getter @Getter
public enum GroupOffsetResetEnum { public enum OffsetTypeEnum {
LATEST(0, "重置到最新"), LATEST(0, "最新"),
EARLIEST(1, "重置到最旧"), EARLIEST(1, "最旧"),
PRECISE_TIMESTAMP(2, "按时间进行重置"), PRECISE_TIMESTAMP(2, "指定时间"),
PRECISE_OFFSET(3, "重置到指定位置"), PRECISE_OFFSET(3, "指定位置"),
; ;
...@@ -23,7 +23,7 @@ public enum GroupOffsetResetEnum { ...@@ -23,7 +23,7 @@ public enum GroupOffsetResetEnum {
private final String message; private final String message;
GroupOffsetResetEnum(int resetType, String message) { OffsetTypeEnum(int resetType, String message) {
this.resetType = resetType; this.resetType = resetType;
this.message = message; this.message = message;
} }
......
...@@ -90,6 +90,8 @@ public class JmxConnectorWrap { ...@@ -90,6 +90,8 @@ public class JmxConnectorWrap {
} }
try { try {
jmxConnector.close(); jmxConnector.close();
jmxConnector = null;
} catch (IOException e) { } catch (IOException e) {
LOGGER.warn("close JmxConnector exception, physicalClusterId:{} brokerId:{} host:{} port:{}.", physicalClusterId, brokerId, host, port, e); LOGGER.warn("close JmxConnector exception, physicalClusterId:{} brokerId:{} host:{} port:{}.", physicalClusterId, brokerId, host, port, e);
} }
...@@ -105,6 +107,11 @@ public class JmxConnectorWrap { ...@@ -105,6 +107,11 @@ public class JmxConnectorWrap {
acquire(); acquire();
MBeanServerConnection mBeanServerConnection = jmxConnector.getMBeanServerConnection(); MBeanServerConnection mBeanServerConnection = jmxConnector.getMBeanServerConnection();
return mBeanServerConnection.getAttribute(name, attribute); return mBeanServerConnection.getAttribute(name, attribute);
} catch (IOException ioe) {
// 如果是因为连接断开,则进行重新连接,并抛出异常
reInitDueIOException();
throw ioe;
} finally { } finally {
atomicInteger.incrementAndGet(); atomicInteger.incrementAndGet();
} }
...@@ -120,6 +127,11 @@ public class JmxConnectorWrap { ...@@ -120,6 +127,11 @@ public class JmxConnectorWrap {
acquire(); acquire();
MBeanServerConnection mBeanServerConnection = jmxConnector.getMBeanServerConnection(); MBeanServerConnection mBeanServerConnection = jmxConnector.getMBeanServerConnection();
return mBeanServerConnection.getAttributes(name, attributes); return mBeanServerConnection.getAttributes(name, attributes);
} catch (IOException ioe) {
// 如果是因为连接断开,则进行重新连接,并抛出异常
reInitDueIOException();
throw ioe;
} finally { } finally {
atomicInteger.incrementAndGet(); atomicInteger.incrementAndGet();
} }
...@@ -131,6 +143,11 @@ public class JmxConnectorWrap { ...@@ -131,6 +143,11 @@ public class JmxConnectorWrap {
acquire(); acquire();
MBeanServerConnection mBeanServerConnection = jmxConnector.getMBeanServerConnection(); MBeanServerConnection mBeanServerConnection = jmxConnector.getMBeanServerConnection();
return mBeanServerConnection.queryNames(name, query); return mBeanServerConnection.queryNames(name, query);
} catch (IOException ioe) {
// 如果是因为连接断开,则进行重新连接,并抛出异常
reInitDueIOException();
throw ioe;
} finally { } finally {
atomicInteger.incrementAndGet(); atomicInteger.incrementAndGet();
} }
...@@ -186,4 +203,26 @@ public class JmxConnectorWrap { ...@@ -186,4 +203,26 @@ public class JmxConnectorWrap {
} }
} }
} }
private synchronized void reInitDueIOException() {
try {
if (jmxConnector == null) {
return;
}
// 检查是否正常
jmxConnector.getConnectionId();
// 如果正常则直接返回
return;
} catch (Exception e) {
// ignore
}
// 关闭旧的
this.close();
// 重新创建
this.checkJmxConnectionAndInitIfNeed();
}
} }
...@@ -10,6 +10,7 @@ const defaultParams: any = { ...@@ -10,6 +10,7 @@ const defaultParams: any = {
maxRecords: 100, maxRecords: 100,
pullTimeoutUnitMs: 5000, pullTimeoutUnitMs: 5000,
// filterPartitionId: 1, // filterPartitionId: 1,
filterOffsetReset: 0
}; };
const defaultpaPagination = { const defaultpaPagination = {
current: 1, current: 1,
...@@ -29,12 +30,20 @@ const TopicMessages = (props: any) => { ...@@ -29,12 +30,20 @@ const TopicMessages = (props: any) => {
const [pagination, setPagination] = useState<any>(defaultpaPagination); const [pagination, setPagination] = useState<any>(defaultpaPagination);
const [form] = Form.useForm(); const [form] = Form.useForm();
// 获取消息开始位置
const offsetResetList = [
{ 'label': 'latest', value: 0 },
{ 'label': 'earliest', value: 1 }
];
// 默认排序 // 默认排序
const defaultSorter = { const defaultSorter = {
sortField: 'timestampUnitMs', sortField: 'timestampUnitMs',
sortType: 'desc', sortType: 'desc',
}; };
const [sorter, setSorter] = useState<any>(defaultSorter);
// 请求接口获取数据 // 请求接口获取数据
const genData = async () => { const genData = async () => {
if (urlParams?.clusterId === undefined || hashData?.topicName === undefined) return; if (urlParams?.clusterId === undefined || hashData?.topicName === undefined) return;
...@@ -49,7 +58,7 @@ const TopicMessages = (props: any) => { ...@@ -49,7 +58,7 @@ const TopicMessages = (props: any) => {
}); });
setPartitionIdList(newPartitionIdList || []); setPartitionIdList(newPartitionIdList || []);
}); });
request(Api.getTopicMessagesList(hashData?.topicName, urlParams?.clusterId), { data: { ...params, ...defaultSorter }, method: 'POST' }) request(Api.getTopicMessagesList(hashData?.topicName, urlParams?.clusterId), { data: { ...params, ...sorter }, method: 'POST' })
.then((res: any) => { .then((res: any) => {
// setPagination({ // setPagination({
// current: res.pagination?.pageNo, // current: res.pagination?.pageNo,
...@@ -87,8 +96,15 @@ const TopicMessages = (props: any) => { ...@@ -87,8 +96,15 @@ const TopicMessages = (props: any) => {
history.push(`/cluster/${urlParams?.clusterId}/testing/consumer`); history.push(`/cluster/${urlParams?.clusterId}/testing/consumer`);
}; };
const onTableChange = (pagination: any, filters: any, sorter: any) => { const onTableChange = (pagination: any, filters: any, sorter: any, extra: any) => {
setPagination(pagination); setPagination(pagination);
// 只有排序事件时,触发重新请求后端数据
if(extra.action === 'sort') {
setSorter({
sortField: sorter.field || '',
sortType: sorter.order ? sorter.order.substring(0, sorter.order.indexOf('end')) : ''
});
}
// const asc = sorter?.order && sorter?.order === 'ascend' ? true : false; // const asc = sorter?.order && sorter?.order === 'ascend' ? true : false;
// const sortColumn = sorter.field && toLine(sorter.field); // const sortColumn = sorter.field && toLine(sorter.field);
// genData({ pageNo: pagination.current, pageSize: pagination.pageSize, filters, asc, sortColumn, queryTerm: searchResult, ...allParams }); // genData({ pageNo: pagination.current, pageSize: pagination.pageSize, filters, asc, sortColumn, queryTerm: searchResult, ...allParams });
...@@ -96,7 +112,7 @@ const TopicMessages = (props: any) => { ...@@ -96,7 +112,7 @@ const TopicMessages = (props: any) => {
useEffect(() => { useEffect(() => {
props.positionType === 'Messages' && genData(); props.positionType === 'Messages' && genData();
}, [props, params]); }, [props, params, sorter]);
return ( return (
<> <>
...@@ -119,6 +135,15 @@ const TopicMessages = (props: any) => { ...@@ -119,6 +135,15 @@ const TopicMessages = (props: any) => {
</div> </div>
<div className="messages-query"> <div className="messages-query">
<Form form={form} layout="inline" onFinish={onFinish}> <Form form={form} layout="inline" onFinish={onFinish}>
<Form.Item name="filterOffsetReset">
<Select
options={offsetResetList}
size="small"
style={{ width: '120px' }}
className={'detail-table-select'}
placeholder="请选择offset"
/>
</Form.Item>
<Form.Item name="filterPartitionId"> <Form.Item name="filterPartitionId">
<Select <Select
options={partitionIdList} options={partitionIdList}
...@@ -158,7 +183,7 @@ const TopicMessages = (props: any) => { ...@@ -158,7 +183,7 @@ const TopicMessages = (props: any) => {
showQueryForm={false} showQueryForm={false}
tableProps={{ tableProps={{
showHeader: false, showHeader: false,
rowKey: 'path', rowKey: 'offset',
loading: loading, loading: loading,
columns: getTopicMessagesColmns(), columns: getTopicMessagesColmns(),
dataSource: data, dataSource: data,
...@@ -169,6 +194,7 @@ const TopicMessages = (props: any) => { ...@@ -169,6 +194,7 @@ const TopicMessages = (props: any) => {
bordered: false, bordered: false,
onChange: onTableChange, onChange: onTableChange,
scroll: { x: 'max-content' }, scroll: { x: 'max-content' },
sortDirections: ['descend', 'ascend', 'default']
}, },
}} }}
/> />
......
...@@ -85,7 +85,8 @@ export const getTopicMessagesColmns = () => { ...@@ -85,7 +85,8 @@ export const getTopicMessagesColmns = () => {
title: 'Timestamp', title: 'Timestamp',
dataIndex: 'timestampUnitMs', dataIndex: 'timestampUnitMs',
key: 'timestampUnitMs', key: 'timestampUnitMs',
render: (t: number) => (t ? moment(t).format(timeFormat) : '-'), sorter: true,
render: (t: number) => (t ? moment(t).format(timeFormat) + '.' + moment(t).millisecond() : '-'),
}, },
{ {
title: 'Key', title: 'Key',
......
...@@ -751,8 +751,8 @@ public class ClusterMetricServiceImpl extends BaseMetricService implements Clust ...@@ -751,8 +751,8 @@ public class ClusterMetricServiceImpl extends BaseMetricService implements Clust
private Result<ClusterMetrics> getMetricFromKafkaByTotalTopics(Long clusterId, String metric, String topicMetric){ private Result<ClusterMetrics> getMetricFromKafkaByTotalTopics(Long clusterId, String metric, String topicMetric){
List<Topic> topics = topicService.listTopicsFromCacheFirst(clusterId); List<Topic> topics = topicService.listTopicsFromCacheFirst(clusterId);
float metricsSum = 0f; float sumMetricValue = 0f;
for(Topic topic : topics){ for(Topic topic : topics) {
Result<List<TopicMetrics>> ret = topicMetricService.collectTopicMetricsFromKafkaWithCacheFirst( Result<List<TopicMetrics>> ret = topicMetricService.collectTopicMetricsFromKafkaWithCacheFirst(
clusterId, clusterId,
topic.getTopicName(), topic.getTopicName(),
...@@ -763,14 +763,15 @@ public class ClusterMetricServiceImpl extends BaseMetricService implements Clust ...@@ -763,14 +763,15 @@ public class ClusterMetricServiceImpl extends BaseMetricService implements Clust
continue; continue;
} }
List<TopicMetrics> topicMetrics = ret.getData(); for (TopicMetrics metrics : ret.getData()) {
for (TopicMetrics metrics : topicMetrics) { if(metrics.isBBrokerAgg()) {
if(metrics.isBBrokerAgg()){ Float metricValue = metrics.getMetric(topicMetric);
metricsSum += Double.valueOf(metrics.getMetrics().get(topicMetric)); sumMetricValue += (metricValue == null? 0f: metricValue);
break;
} }
} }
} }
return Result.buildSuc(initWithMetrics(clusterId, metric, metricsSum)); return Result.buildSuc(initWithMetrics(clusterId, metric, sumMetricValue));
} }
} }
...@@ -4,13 +4,13 @@ description: knowstreaming-manager Helm chart ...@@ -4,13 +4,13 @@ description: knowstreaming-manager Helm chart
type: application type: application
version: 0.1.3 version: 0.1.4
maintainers: maintainers:
- email: didicloud@didiglobal.com - email: didicloud@didiglobal.com
name: didicloud name: didicloud
appVersion: "3.0.0-beta.1" appVersion: "3.0.0-beta.2"
dependencies: dependencies:
- name: knowstreaming-web - name: knowstreaming-web
......
...@@ -173,8 +173,8 @@ antiAffinityTopologyKey: "kubernetes.io/hostname" ...@@ -173,8 +173,8 @@ antiAffinityTopologyKey: "kubernetes.io/hostname"
# Hard means that by default pods will only be scheduled if there are enough nodes for them # Hard means that by default pods will only be scheduled if there are enough nodes for them
# and that they will never end up on the same node. Setting this to soft will do this "best effort" # and that they will never end up on the same node. Setting this to soft will do this "best effort"
antiAffinity: "hard" antiAffinity: ""
#antiAffinity: "hard"
# This is the node affinity settings as defined in # This is the node affinity settings as defined in
# https://kubernetes.io/docs/concepts/configuration/assign-pod-node/#node-affinity-beta-feature # https://kubernetes.io/docs/concepts/configuration/assign-pod-node/#node-affinity-beta-feature
nodeAffinity: {} nodeAffinity: {}
......
...@@ -21,7 +21,7 @@ spec: ...@@ -21,7 +21,7 @@ spec:
{{- include "ksmysql.selectorLabels" . | nindent 8 }} {{- include "ksmysql.selectorLabels" . | nindent 8 }}
spec: spec:
containers: containers:
- image: knowstreaming/knowstreaming-mysql:0.1.0 - image: knowstreaming/knowstreaming-mysql:0.2.0
name: {{ .Chart.Name }} name: {{ .Chart.Name }}
env: env:
- name: MYSQL_DATABASE - name: MYSQL_DATABASE
......
...@@ -71,6 +71,7 @@ data: ...@@ -71,6 +71,7 @@ data:
driver-class-name: org.mariadb.jdbc.Driver driver-class-name: org.mariadb.jdbc.Driver
app-name: know-streaming app-name: know-streaming
resource-extend-bean-name: myResourceExtendImpl resource-extend-bean-name: myResourceExtendImpl
login-extend-bean-name: logiSecurityDefaultLoginExtendImpl
logging: logging:
config: classpath:logback-spring.xml config: classpath:logback-spring.xml
...@@ -85,11 +86,16 @@ data: ...@@ -85,11 +86,16 @@ data:
queue-size: 10000 # 每个线程池队列大小 queue-size: 10000 # 每个线程池队列大小
select-suitable-enable: true # 任务是否自动选择合适的线程池,非主要,可不修改 select-suitable-enable: true # 任务是否自动选择合适的线程池,非主要,可不修改
suitable-queue-size: 1000 # 线程池理想的队列大小,非主要,可不修改 suitable-queue-size: 1000 # 线程池理想的队列大小,非主要,可不修改
task: # 任务模块的配置 task: # 任务模块的配置
heaven: # 采集任务配置 metrics: # metrics采集任务配置
thread-num: 20 # 采集任务线程池核心线程数 thread-num: 18 # metrics采集任务线程池核心线程数
queue-size: 1000 # 采集任务线程池队列大小 queue-size: 180 # metrics采集任务线程池队列大小
metadata: # metadata同步任务配置
thread-num: 27 # metadata同步任务线程池核心线程数
queue-size: 270 # metadata同步任务线程池队列大小
common: # 剩余其他任务配置
thread-num: 15 # 剩余其他任务线程池核心线程数
queue-size: 150 # 剩余其他任务线程池队列大小
client-pool: client-pool:
...@@ -99,17 +105,16 @@ data: ...@@ -99,17 +105,16 @@ data:
max-total-client-num: 20 # 最大客户端数 max-total-client-num: 20 # 最大客户端数
borrow-timeout-unit-ms: 5000 # 租借超时时间,单位秒 borrow-timeout-unit-ms: 5000 # 租借超时时间,单位秒
es:
client:
{{ if .Values.elasticsearch.enabled }} {{ if .Values.elasticsearch.enabled }}
es.client.address: elasticsearch-master:9200 address: elasticsearch-master:9200
#es.client.address: {{ .Release.Name }}-elasticsearch:9200
{{- else }} {{- else }}
es.client.address: {{ .Values.elasticsearch.esClientAddress }}:{{ .Values.elasticsearch.esProt }} address: {{ .Values.elasticsearch.esClientAddress }}:{{ .Values.elasticsearch.esProt }}
{{- end }} {{- end }}
# es.client.pass: knowstreaming-manager client-cnt: 10
# 集群自动均衡相关配置 io-thread-cnt: 2
cluster-balance: max-retry-cnt: 5
ignored-topics:
time-second: 300
# 普罗米修斯指标导出相关配置 # 普罗米修斯指标导出相关配置
management: management:
...@@ -158,4 +163,3 @@ data: ...@@ -158,4 +163,3 @@ data:
curl -s -o /dev/null -X PUT http://${esaddr}:${port}/ks_kafka_topic_metric${logdate} || \ curl -s -o /dev/null -X PUT http://${esaddr}:${port}/ks_kafka_topic_metric${logdate} || \
exit 2 exit 2
done done
...@@ -3,7 +3,7 @@ replicaCount: 2 ...@@ -3,7 +3,7 @@ replicaCount: 2
image: image:
repository: knowstreaming/knowstreaming-manager repository: knowstreaming/knowstreaming-manager
pullPolicy: IfNotPresent pullPolicy: IfNotPresent
tag: "0.1.0" tag: "0.2.0"
imagePullSecrets: [] imagePullSecrets: []
nameOverride: "" nameOverride: ""
...@@ -73,7 +73,7 @@ knowstreaming-web: ...@@ -73,7 +73,7 @@ knowstreaming-web:
image: image:
repository: knowstreaming/knowstreaming-ui repository: knowstreaming/knowstreaming-ui
pullPolicy: IfNotPresent pullPolicy: IfNotPresent
tag: "0.1.0" tag: "0.2.0"
service: service:
type: NodePort type: NodePort
......
...@@ -48,7 +48,7 @@ INSERT INTO `logi_security_permission` (`id`, `permission_name`, `parent_id`, `l ...@@ -48,7 +48,7 @@ INSERT INTO `logi_security_permission` (`id`, `permission_name`, `parent_id`, `l
-- 初始化用户 -- 初始化用户
--INSERT INTO `logi_security_user` (`id`, `user_name`, `pw`, `real_name`, `is_delete`, `app_name`) VALUES ('1', 'admin', 'V1ZkU2RHRlhOSGxOUkVsNVdETjBRVlp0Y0V0T1IwWnlaVEZ6YWxGRVJrRkpNVEU1VTJwYVUySkhlRzlSU0RBOWUwQldha28wWVd0N1d5TkFNa0FqWFgxS05sSnNiR2hBZlE9PXtAVmpKNGFre1sjQDNAI119SjZSbGxoQH0=Mv{#cdRgJ45Lqx}3IubEW87!==', '系统管理员', '0', 'know-streaming'); -- INSERT INTO `logi_security_user` (`id`, `user_name`, `pw`, `real_name`, `is_delete`, `app_name`) VALUES ('1', 'admin', 'V1ZkU2RHRlhOSGxOUkVsNVdETjBRVlp0Y0V0T1IwWnlaVEZ6YWxGRVJrRkpNVEU1VTJwYVUySkhlRzlSU0RBOWUwQldha28wWVd0N1d5TkFNa0FqWFgxS05sSnNiR2hBZlE9PXtAVmpKNGFre1sjQDNAI119SjZSbGxoQH0=Mv{#cdRgJ45Lqx}3IubEW87!==', '系统管理员', '0', 'know-streaming');
INSERT INTO `logi_security_user` (`id`, `user_name`, `pw`, `real_name`, `is_delete`, `app_name`) VALUES ('1', 'admin', 'V1ZkU2RHRlhOVGRSUmxweFUycFNhR0V6ZEdKSk1FRjRVVU5PWkdaVmJ6SlZiWGh6WVVWQ09YdEFWbXBLTkdGcmUxc2pRREpBSTExOVNqWlNiR3hvUUgwPXtAVmpKNGFre1sjQDNAI119SjZSbGxoQH0=Mv{#cdRgJ45Lqx}3IubEW87!==', '系统管理员', '0', 'know-streaming'); INSERT INTO `logi_security_user` (`id`, `user_name`, `pw`, `real_name`, `is_delete`, `app_name`) VALUES ('1', 'admin', 'V1ZkU2RHRlhOVGRSUmxweFUycFNhR0V6ZEdKSk1FRjRVVU5PWkdaVmJ6SlZiWGh6WVVWQ09YdEFWbXBLTkdGcmUxc2pRREpBSTExOVNqWlNiR3hvUUgwPXtAVmpKNGFre1sjQDNAI119SjZSbGxoQH0=Mv{#cdRgJ45Lqx}3IubEW87!==', '系统管理员', '0', 'know-streaming');
-- 初始化角色 -- 初始化角色
...@@ -96,4 +96,4 @@ INSERT INTO `logi_security_user_role` (`id`, `user_id`, `role_id`, `is_delete`, ...@@ -96,4 +96,4 @@ INSERT INTO `logi_security_user_role` (`id`, `user_id`, `role_id`, `is_delete`,
INSERT INTO `logi_security_config` INSERT INTO `logi_security_config`
(`value_group`,`value_name`,`value`,`edit`,`status`,`memo`,`is_delete`,`app_name`,`operator`) (`value_group`,`value_name`,`value`,`edit`,`status`,`memo`,`is_delete`,`app_name`,`operator`)
VALUES VALUES
('SECURITY.LOGIN','SECURITY.TRICK_USERS','[\n \"admin\"\n]',1,1,'允许跳过登录的用户',0,'know-streaming','admin'); ('SECURITY.LOGIN','SECURITY.TRICK_USERS','[\n \"admin\"\n]',1,1,'允许跳过登录的用户',0,'know-streaming','admin');
\ No newline at end of file
...@@ -191,6 +191,10 @@ public class KafkaJMXClient extends AbstractClusterLoadedChangedHandler { ...@@ -191,6 +191,10 @@ public class KafkaJMXClient extends AbstractClusterLoadedChangedHandler {
lambdaQueryWrapper.eq(BrokerPO::getStatus, Constant.ALIVE); lambdaQueryWrapper.eq(BrokerPO::getStatus, Constant.ALIVE);
BrokerPO brokerPO = brokerDAO.selectOne(lambdaQueryWrapper); BrokerPO brokerPO = brokerDAO.selectOne(lambdaQueryWrapper);
if (brokerPO == null) {
return null;
}
return Broker.buildFrom(brokerPO); return Broker.buildFrom(brokerPO);
} }
} }
...@@ -3,6 +3,7 @@ package com.xiaojukeji.know.streaming.km.rest.api.v3.topic; ...@@ -3,6 +3,7 @@ package com.xiaojukeji.know.streaming.km.rest.api.v3.topic;
import com.xiaojukeji.know.streaming.km.biz.topic.TopicStateManager; import com.xiaojukeji.know.streaming.km.biz.topic.TopicStateManager;
import com.xiaojukeji.know.streaming.km.common.bean.dto.metrices.MetricDTO; import com.xiaojukeji.know.streaming.km.common.bean.dto.metrices.MetricDTO;
import com.xiaojukeji.know.streaming.km.common.bean.dto.pagination.PaginationBaseDTO; import com.xiaojukeji.know.streaming.km.common.bean.dto.pagination.PaginationBaseDTO;
import com.xiaojukeji.know.streaming.km.common.bean.dto.pagination.PaginationSortDTO;
import com.xiaojukeji.know.streaming.km.common.bean.dto.topic.TopicRecordDTO; import com.xiaojukeji.know.streaming.km.common.bean.dto.topic.TopicRecordDTO;
import com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.BaseMetrics; import com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.BaseMetrics;
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.PaginationResult; import com.xiaojukeji.know.streaming.km.common.bean.entity.result.PaginationResult;
......
...@@ -15,7 +15,7 @@ ...@@ -15,7 +15,7 @@
</parent> </parent>
<properties> <properties>
<km.revision>3.0.0-beta.2</km.revision> <km.revision>3.0.0-beta.3</km.revision>
<maven.compiler.source>8</maven.compiler.source> <maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target> <maven.compiler.target>8</maven.compiler.target>
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册