未验证 提交 2f72cbb6 编写于 作者: Z ZQKC 提交者: GitHub

Merge pull request #38 from yangbajing/feature/postgresql

管理端存储添加 PostgreSQL 数据库支持。
......@@ -45,12 +45,14 @@
- `Maven 3.5.0+`(后端打包依赖)
- `node v8.12.0+`(前端打包依赖)
- `Java 8+`(运行环境需要)
- `MySQL`(数据存储)
- `MySQL``PostgreSQL`(数据存储)
---
### 环境初始化
**MySQL**
执行[create_mysql_table.sql](doc/create_mysql_table.sql)中的SQL命令,从而创建所需的MySQL库及表,默认创建的库名是`kafka_manager`
```
......@@ -58,6 +60,24 @@
mysql -uXXXX -pXXX -h XXX.XXX.XXX.XXX -PXXXX < ./create_mysql_table.sql
```
**PostgreSQL**
执行[create_postgresql_table.sql](doc/create_postgresql_table.sql)中的SQL命令,从而创建所需的PostgreSQL表。
```
############# 示例:
psql -h XXX.XXX.XXX.XXX -U XXXX -d kafka_manager -f ./create_postgresql_table.sql
```
*PostgreSQL 用户、数据库创建方式*
```sql
create user admin encrypted password 'admin';
create database kafka_manager owner=admin template=template0 encoding='UTF-8' lc_collate='zh_CN.UTF-8' lc_ctype='zh_CN.UTF-8';
```
***默认配置使用 MySQL 数据库,若要使用 PostgreSQL 数据库,使用 `-Dspring.profiles.active=pg` 指定 `application-pg.yml` 配置文件。***
---
......
......@@ -751,8 +751,7 @@
"version": "1.0.0",
"resolved": "http://registry.npm.taobao.org/assert-plus/download/assert-plus-1.0.0.tgz",
"integrity": "sha1-8S4PPF13sLHN2RRpQuTpbB5N1SU=",
"dev": true,
"optional": true
"dev": true
},
"assign-symbols": {
"version": "1.0.0",
......@@ -1505,7 +1504,6 @@
"resolved": "http://registry.npm.taobao.org/combined-stream/download/combined-stream-1.0.7.tgz",
"integrity": "sha1-LR0kMXr7ir6V1tLAsHtXgTU52Cg=",
"dev": true,
"optional": true,
"requires": {
"delayed-stream": "~1.0.0"
}
......@@ -2222,8 +2220,7 @@
"version": "1.0.0",
"resolved": "http://registry.npm.taobao.org/delayed-stream/download/delayed-stream-1.0.0.tgz",
"integrity": "sha1-3zrhmayt+31ECqrgsp4icrJOxhk=",
"dev": true,
"optional": true
"dev": true
},
"depd": {
"version": "1.1.2",
......@@ -2941,8 +2938,7 @@
"version": "1.3.0",
"resolved": "http://registry.npm.taobao.org/extsprintf/download/extsprintf-1.3.0.tgz",
"integrity": "sha1-lpGEQOMEGnpBT4xS48V06zw+HgU=",
"dev": true,
"optional": true
"dev": true
},
"fast-deep-equal": {
"version": "2.0.1",
......@@ -3358,8 +3354,7 @@
"ansi-regex": {
"version": "2.1.1",
"bundled": true,
"dev": true,
"optional": true
"dev": true
},
"aproba": {
"version": "1.2.0",
......@@ -3380,14 +3375,12 @@
"balanced-match": {
"version": "1.0.0",
"bundled": true,
"dev": true,
"optional": true
"dev": true
},
"brace-expansion": {
"version": "1.1.11",
"bundled": true,
"dev": true,
"optional": true,
"requires": {
"balanced-match": "^1.0.0",
"concat-map": "0.0.1"
......@@ -3402,20 +3395,17 @@
"code-point-at": {
"version": "1.1.0",
"bundled": true,
"dev": true,
"optional": true
"dev": true
},
"concat-map": {
"version": "0.0.1",
"bundled": true,
"dev": true,
"optional": true
"dev": true
},
"console-control-strings": {
"version": "1.1.0",
"bundled": true,
"dev": true,
"optional": true
"dev": true
},
"core-util-is": {
"version": "1.0.2",
......@@ -3532,8 +3522,7 @@
"inherits": {
"version": "2.0.3",
"bundled": true,
"dev": true,
"optional": true
"dev": true
},
"ini": {
"version": "1.3.5",
......@@ -3545,7 +3534,6 @@
"version": "1.0.0",
"bundled": true,
"dev": true,
"optional": true,
"requires": {
"number-is-nan": "^1.0.0"
}
......@@ -3560,7 +3548,6 @@
"version": "3.0.4",
"bundled": true,
"dev": true,
"optional": true,
"requires": {
"brace-expansion": "^1.1.7"
}
......@@ -3568,14 +3555,12 @@
"minimist": {
"version": "0.0.8",
"bundled": true,
"dev": true,
"optional": true
"dev": true
},
"minipass": {
"version": "2.3.5",
"bundled": true,
"dev": true,
"optional": true,
"requires": {
"safe-buffer": "^5.1.2",
"yallist": "^3.0.0"
......@@ -3594,7 +3579,6 @@
"version": "0.5.1",
"bundled": true,
"dev": true,
"optional": true,
"requires": {
"minimist": "0.0.8"
}
......@@ -3675,8 +3659,7 @@
"number-is-nan": {
"version": "1.0.1",
"bundled": true,
"dev": true,
"optional": true
"dev": true
},
"object-assign": {
"version": "4.1.1",
......@@ -3688,7 +3671,6 @@
"version": "1.4.0",
"bundled": true,
"dev": true,
"optional": true,
"requires": {
"wrappy": "1"
}
......@@ -3774,8 +3756,7 @@
"safe-buffer": {
"version": "5.1.2",
"bundled": true,
"dev": true,
"optional": true
"dev": true
},
"safer-buffer": {
"version": "2.1.2",
......@@ -3811,7 +3792,6 @@
"version": "1.0.2",
"bundled": true,
"dev": true,
"optional": true,
"requires": {
"code-point-at": "^1.0.0",
"is-fullwidth-code-point": "^1.0.0",
......@@ -3831,7 +3811,6 @@
"version": "3.0.1",
"bundled": true,
"dev": true,
"optional": true,
"requires": {
"ansi-regex": "^2.0.0"
}
......@@ -3875,14 +3854,12 @@
"wrappy": {
"version": "1.0.2",
"bundled": true,
"dev": true,
"optional": true
"dev": true
},
"yallist": {
"version": "3.0.3",
"bundled": true,
"dev": true,
"optional": true
"dev": true
}
}
},
......@@ -4864,8 +4841,7 @@
"version": "0.1.1",
"resolved": "http://registry.npm.taobao.org/jsbn/download/jsbn-0.1.1.tgz",
"integrity": "sha1-peZUwuWi3rXyAdls77yoDA7y9RM=",
"dev": true,
"optional": true
"dev": true
},
"json-parse-better-errors": {
"version": "1.0.2",
......@@ -8883,8 +8859,7 @@
"version": "0.14.5",
"resolved": "http://registry.npm.taobao.org/tweetnacl/download/tweetnacl-0.14.5.tgz",
"integrity": "sha1-WuaBd/GS1EViadEIr6k/+HQ/T2Q=",
"dev": true,
"optional": true
"dev": true
},
"type-is": {
"version": "1.6.16",
......
......@@ -43,5 +43,9 @@
<artifactId>mariadb-java-client</artifactId>
<version>2.5.4</version>
</dependency>
<dependency>
<groupId>org.postgresql</groupId>
<artifactId>postgresql</artifactId>
</dependency>
</dependencies>
</project>
\ No newline at end of file
......@@ -18,6 +18,9 @@ public class AccountDaoImpl implements AccountDao {
@Autowired
private SqlSessionTemplate sqlSession;
@Autowired
private KafkaManagerProperties kafkaManagerProperties;
public void setSqlSession(SqlSessionTemplate sqlSession) {
this.sqlSession = sqlSession;
}
......@@ -25,7 +28,7 @@ public class AccountDaoImpl implements AccountDao {
@Override
public int addNewAccount(AccountDO accountDO) {
accountDO.setStatus(DBStatusEnum.NORMAL.getStatus());
return sqlSession.insert("AccountDao.insert", accountDO);
return updateAccount(accountDO);
}
@Override
......@@ -35,6 +38,9 @@ public class AccountDaoImpl implements AccountDao {
@Override
public int updateAccount(AccountDO accountDO) {
if (kafkaManagerProperties.hasPG()) {
return sqlSession.insert("AccountDao.insertOnPG", accountDO);
}
return sqlSession.insert("AccountDao.insert", accountDO);
}
......
......@@ -19,12 +19,18 @@ public class BrokerDaoImpl implements BrokerDao {
@Autowired
private SqlSessionTemplate sqlSession;
@Autowired
private KafkaManagerProperties kafkaManagerProperties;
public void setSqlSession(SqlSessionTemplate sqlSession) {
this.sqlSession = sqlSession;
}
@Override
public int replace(BrokerDO brokerInfoDO) {
if (kafkaManagerProperties.hasPG()) {
return sqlSession.insert("BrokerDao.replaceOnPG", brokerInfoDO);
}
return sqlSession.insert("BrokerDao.replace", brokerInfoDO);
}
......
package com.xiaojukeji.kafka.manager.dao.impl;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
@Component
@ConfigurationProperties("spring.datasource.kafka-manager")
public class KafkaManagerProperties {
private String jdbcUrl;
public String getJdbcUrl() {
return jdbcUrl;
}
public void setJdbcUrl(String jdbcUrl) {
this.jdbcUrl = jdbcUrl;
}
public boolean hasPG() {
return jdbcUrl.startsWith("jdbc:postgres");
}
}
......@@ -17,12 +17,18 @@ public class RegionDaoImpl implements RegionDao {
@Autowired
private SqlSessionTemplate sqlSession;
@Autowired
private KafkaManagerProperties kafkaManagerProperties;
public void setSqlSession(SqlSessionTemplate sqlSession) {
this.sqlSession = sqlSession;
}
@Override
public int insert(RegionDO regionDO) {
if (kafkaManagerProperties.hasPG()) {
return sqlSession.insert("RegionDao.insertOnPG", regionDO);
}
return sqlSession.insert("RegionDao.insert", regionDO);
}
......
......@@ -19,12 +19,18 @@ public class TopicDaoImpl implements TopicDao {
@Autowired
private SqlSessionTemplate sqlSession;
@Autowired
private KafkaManagerProperties kafkaManagerProperties;
public void setSqlSession(SqlSessionTemplate sqlSession) {
this.sqlSession = sqlSession;
}
@Override
public int replace(TopicDO topicDO) {
if (kafkaManagerProperties.hasPG()) {
return sqlSession.insert("TopicDao.replaceOnPG", topicDO);
}
return sqlSession.insert("TopicDao.replace", topicDO);
}
......
......@@ -25,12 +25,18 @@ public class TopicFavoriteDaoImpl implements TopicFavoriteDao {
@Autowired
private TransactionTemplate transactionTemplate;
@Autowired
private KafkaManagerProperties kafkaManagerProperties;
public void setSqlSession(SqlSessionTemplate sqlSession) {
this.sqlSession = sqlSession;
}
@Override
public int batchAdd(List<TopicFavoriteDO> topicFavoriteDOList) {
if (kafkaManagerProperties.hasPG()) {
return sqlSession.insert("TopicFavoriteDao.batchAddOnPG", topicFavoriteDOList);
}
return sqlSession.insert("TopicFavoriteDao.batchAdd", topicFavoriteDOList);
}
......
......@@ -22,6 +22,18 @@
]]>
</insert>
<insert id="insertOnPG" parameterType="com.xiaojukeji.kafka.manager.common.entity.po.AccountDO">
<![CDATA[
insert into account
(username, password, role, status)
values
(#{username}, #{password}, #{role}, #{status})
on conflict (username) do update set password = excluded.password,
role = excluded.role,
status = excluded.status
]]>
</insert>
<delete id="deleteByName" parameterType="java.lang.String">
DELETE FROM account WHERE username = #{username}
</delete>
......
......@@ -20,6 +20,16 @@
(#{clusterId}, #{brokerId}, #{host}, #{port}, #{timestamp}, #{status})
</insert>
<insert id="replaceOnPG" parameterType="BrokerDO">
insert into broker
(cluster_id, broker_id, host, port, timestamp, status)
values (#{clusterId}, #{brokerId}, #{host}, #{port}, #{timestamp}, #{status})
on conflict (cluster_id, broker_id) do update set host = excluded.host,
port = excluded.port,
timestamp = excluded.timestamp,
status = excluded.status
</insert>
<delete id="deleteById" parameterType="java.util.Map">
DELETE FROM broker WHERE cluster_id = #{clusterId} AND broker_id = #{brokerId}
</delete>
......
......@@ -21,6 +21,15 @@
(#{regionName}, #{clusterId}, #{brokerList}, #{description}, #{operator})
</insert>
<insert id="insertOnPG" parameterType="com.xiaojukeji.kafka.manager.common.entity.po.RegionDO">
insert into region
(region_name, cluster_id, broker_list, description, operator)
values (#{regionName}, #{clusterId}, #{brokerList}, #{description}, #{operator})
on conflict (region_name, cluster_id) do update set broker_list = excluded.broker_list,
description = excluded.description,
operator = excluded.operator
</insert>
<delete id="deleteById" parameterType="java.lang.Long">
DELETE FROM region WHERE id = #{id}
</delete>
......
......@@ -19,6 +19,15 @@
(#{clusterId}, #{topicName}, #{principals}, #{description}, #{status})
</insert>
<insert id="replaceOnPG" parameterType="com.xiaojukeji.kafka.manager.common.entity.po.TopicDO">
insert into topic
(cluster_id, topic_name, principals, description, status)
values (#{clusterId}, #{topicName}, #{principals}, #{description}, #{status})
on conflict (cluster_id, topic_name) do update set principals = excluded.principals,
description = excluded.description,
status = excluded.status
</insert>
<delete id="deleteById" parameterType="java.lang.Long">
DELETE FROM topic WHERE id = #{id}
</delete>
......
......@@ -21,6 +21,15 @@
</foreach>
</insert>
<insert id="batchAddOnPG" parameterType="java.util.List">
insert into topic_favorite (cluster_id, topic_name, username)
values
<foreach item="TopicFavoriteDO" index="index" collection="list" separator=",">
(#{TopicFavoriteDO.clusterId}, #{TopicFavoriteDO.topicName}, #{TopicFavoriteDO.username})
</foreach>
on conflict (cluster_id, topic_name, username) do update set gmt_modify = now();
</insert>
<delete id="deleteById" parameterType="java.lang.Long">
DELETE FROM topic_favorite WHERE id=#{id}
</delete>
......
-- CREATE DATABASE kafka_manager;
-- \c kafka_manager;
SET TIME ZONE 'Asia/Chongqing';
SET CLIENT_ENCODING TO 'UTF-8';
CREATE OR REPLACE FUNCTION on_update_timestamp() RETURNS TRIGGER AS
$$
BEGIN
new.gmt_modify = current_timestamp;
return new;
END;
$$ LANGUAGE plpgsql;
-- 账号表
CREATE TABLE account
(
id bigserial NOT NULL, -- 'ID',
username varchar(64) NOT NULL UNIQUE DEFAULT '', -- '用户名',
password varchar(128) NOT NULL DEFAULT '', -- '密码',
role int NOT NULL DEFAULT 0, -- '角色类型, 0:普通用户',
status int NOT NULL DEFAULT 0, -- '0标识使用中,-1标识已废弃',
gmt_create timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, -- '创建时间',
gmt_modify timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, -- '修改时间',
CONSTRAINT account_pk PRIMARY KEY (id)
);
CREATE UNIQUE INDEX account_uniq_username ON account (username);
INSERT INTO account(username, password, role)
VALUES ('admin', '21232f297a57a5a743894a0e4a801fc3', 2);
CREATE TRIGGER account_trig_gmt_modify
BEFORE UPDATE
ON account
FOR EACH ROW
EXECUTE PROCEDURE on_update_timestamp();
-- 告警规则表
CREATE TABLE alarm_rule
(
id bigserial, -- '自增ID',
alarm_name varchar(128) NOT NULL DEFAULT '', -- '告警名字',
strategy_expressions text, -- '表达式',
strategy_filters text, -- '过滤条件',
strategy_actions text, -- '响应',
principals varchar(512) NOT NULL DEFAULT '', -- '负责人',
status int2 NOT NULL DEFAULT 1, -- '-1:逻辑删除, 0:关闭, 1:正常',
gmt_create timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, -- '创建时间',
gmt_modify timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, -- '修改时间',
CONSTRAINT alarm_rule_pk PRIMARY KEY (id)
);
CREATE UNIQUE INDEX alarm_rule_uniq_alarm_name ON alarm_rule (alarm_name);
CREATE TRIGGER alarm_rule_trig_gmt_modify
BEFORE UPDATE
ON alarm_rule
FOR EACH ROW
EXECUTE PROCEDURE on_update_timestamp();
-- Broker信息表
CREATE TABLE broker
(
id bigserial, -- 'id',
cluster_id bigint NOT NULL DEFAULT '-1', -- '集群ID',
broker_id int NOT NULL DEFAULT '-1', -- 'BrokerID',
host varchar(128) NOT NULL DEFAULT '', -- 'Broker主机名',
port int NOT NULL DEFAULT '-1', -- 'Broker端口',
timestamp bigint NOT NULL DEFAULT '-1', -- '启动时间',
status int NOT NULL DEFAULT '0', -- '状态0有效,-1无效',
gmt_create timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, -- '创建时间',
gmt_modify timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, -- '修改时间',
CONSTRAINT broker_pk PRIMARY KEY (id)
);
CREATE UNIQUE INDEX broker_uniq_cluster_id_broker_id ON broker (cluster_id, broker_id);
CREATE TRIGGER broker_trig_gmt_modify
BEFORE UPDATE
ON broker
FOR EACH ROW
EXECUTE PROCEDURE on_update_timestamp();
-- BrokerMetric信息表
CREATE TABLE broker_metrics
(
id bigserial, -- '自增id',
cluster_id bigint NOT NULL DEFAULT '-1', -- '集群ID',
broker_id int NOT NULL DEFAULT '-1', -- 'BrokerID',
bytes_in decimal(53, 2) NOT NULL DEFAULT '0.00', -- '每秒字节流入',
bytes_out decimal(53, 2) NOT NULL DEFAULT '0.00', -- '每秒字节流出',
bytes_rejected decimal(53, 2) NOT NULL DEFAULT '0.00', -- '每秒被拒绝字节数',
messages_in decimal(53, 2) NOT NULL DEFAULT '0.00', -- '每秒消息数流入',
fail_fetch_request decimal(53, 2) NOT NULL DEFAULT '0.00', -- '每秒消费失败数',
fail_produce_request decimal(53, 2) NOT NULL DEFAULT '0.00', -- '每秒失败生产数',
fetch_consumer_request decimal(53, 2) NOT NULL DEFAULT '0.00', -- '每秒消费请求数',
produce_request decimal(53, 2) NOT NULL DEFAULT '0.00', -- '每秒生产数',
request_handler_idl_percent decimal(53, 2) NOT NULL DEFAULT '0.00', -- '请求处理器繁忙百分比',
network_processor_idl_percent decimal(53, 2) NOT NULL DEFAULT '0.00', -- '网络处理器繁忙百分比',
request_queue_size bigint NOT NULL DEFAULT '0', -- '请求列表大小',
response_queue_size bigint NOT NULL DEFAULT '0', -- '响应列表大小',
log_flush_time decimal(53, 2) NOT NULL DEFAULT '0.00', -- '刷日志时间',
total_time_produce_mean decimal(53, 2) NOT NULL DEFAULT '0.00', -- 'produce请求处理总时间-平均值',
total_time_produce_99th decimal(53, 2) NOT NULL DEFAULT '0.00', -- 'produce请求处理总时间-99分位',
total_time_fetch_consumer_mean decimal(53, 2) NOT NULL DEFAULT '0.00', -- 'fetch请求总时间-平均值',
total_time_fetch_consumer_99th decimal(53, 2) NOT NULL DEFAULT '0.00', -- 'fetch请求总时间-99分位',
gmt_create timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, -- '创建时间',
CONSTRAINT broker_metrics_pk PRIMARY KEY (id)
);
CREATE INDEX broker_metrics_idx_cluster_id_broker_id_gmt_create ON broker_metrics (cluster_id, broker_id, gmt_create);
-- Cluster表
CREATE TABLE cluster
(
id bigserial, -- '集群ID',
cluster_name varchar(128) NOT NULL DEFAULT '', -- '集群名称',
zookeeper varchar(512) NOT NULL DEFAULT '', -- 'ZK地址',
bootstrap_servers varchar(512) NOT NULL DEFAULT '', -- 'Server地址',
kafka_version varchar(32) NOT NULL DEFAULT '', -- 'Kafka版本',
alarm_flag int2 NOT NULL DEFAULT '0', -- '0:不开启告警, 1开启告警',
security_protocol varchar(512) NOT NULL DEFAULT '', -- '安全协议',
sasl_mechanism varchar(512) NOT NULL DEFAULT '', -- '安全机制',
sasl_jaas_config varchar(512) NOT NULL DEFAULT '', -- 'Jaas配置',
status int2 NOT NULL DEFAULT '0', -- '删除标记, 0表示未删除, -1表示删除',
gmt_create timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, -- '创建时间',
gmt_modify timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, -- '修改时间',
CONSTRAINT cluster_pk PRIMARY KEY (id)
);
CREATE UNIQUE INDEX cluster_uniq_cluster_name ON cluster (cluster_name);
CREATE TRIGGER cluster_trig_gmt_modify
BEFORE UPDATE
ON cluster
FOR EACH ROW
EXECUTE PROCEDURE on_update_timestamp();
-- ClusterMetrics信息
CREATE TABLE cluster_metrics
(
id bigserial, -- '自增id',
cluster_id bigint NOT NULL DEFAULT '0', -- '集群ID',
topic_num int NOT NULL DEFAULT '0', -- 'Topic数',
partition_num int NOT NULL DEFAULT '0', -- '分区数',
broker_num int NOT NULL DEFAULT '0', -- 'Broker数',
bytes_in decimal(53, 2) NOT NULL DEFAULT '0.00', -- '每秒流入(B)',
bytes_out decimal(53, 2) NOT NULL DEFAULT '0.00', -- '每秒流出(B)',
bytes_rejected decimal(53, 2) NOT NULL DEFAULT '0.00', -- '每秒拒绝(B)',
messages_in decimal(53, 2) NOT NULL DEFAULT '0.00', -- '每秒消息数(条)',
gmt_create timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, -- '创建时间',
CONSTRAINT cluster_metrics_pk PRIMARY KEY (id)
);
CREATE INDEX cluster_metrics_idx_cluster_id_gmt_create ON cluster_metrics (cluster_id, gmt_create);
-- Controller历史变更记录表
CREATE TABLE controller
(
id bigserial, -- '自增id',
cluster_id bigint NOT NULL DEFAULT '-1', -- '集群ID',
broker_id int NOT NULL DEFAULT '-1', -- 'BrokerId',
host varchar(256) NOT NULL DEFAULT '', -- '主机名',
timestamp bigint NOT NULL DEFAULT '-1', -- 'Controller变更时间',
version int NOT NULL DEFAULT '-1', -- 'Controller格式版本',
gmt_create timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, -- '创建时间',
CONSTRAINT controller_pk PRIMARY KEY (id)
);
CREATE UNIQUE INDEX controller_uniq_cluster_id_broker_id_timestamp ON controller (cluster_id, broker_id, timestamp);
-- Topic迁移信息
CREATE TABLE migration_task
(
id bigserial, -- '自增id',
cluster_id bigint NOT NULL DEFAULT '0', -- '集群ID',
topic_name varchar(192) NOT NULL DEFAULT '', -- 'Topic名称',
reassignment_json text, -- '任务参数',
real_throttle bigint NOT NULL DEFAULT '0', -- '实际限流值(B/s)',
operator varchar(128) NOT NULL DEFAULT '', -- '操作人',
description varchar(256) NOT NULL DEFAULT '', -- '备注说明',
status int NOT NULL DEFAULT '0', -- '任务状态',
gmt_create timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, -- '任务创建时间',
gmt_modify timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, -- '任务修改时间',
CONSTRAINT migration_task_pk PRIMARY KEY (id)
);
CREATE TRIGGER migration_task_trig_gmt_modify
BEFORE UPDATE
ON migration_task
FOR EACH ROW
EXECUTE PROCEDURE on_update_timestamp();
CREATE TABLE operation_history
(
id bigserial, -- 'id',
cluster_id bigint NOT NULL DEFAULT '-1', -- '集群ID',
topic_name varchar(192) NOT NULL DEFAULT '', -- 'Topic名称',
operator varchar(128) NOT NULL DEFAULT '', -- '操作人',
operation varchar(256) NOT NULL DEFAULT '', -- '操作描述',
gmt_create timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, -- '创建时间',
PRIMARY KEY (id)
);
--='操作记录表';
-- 分区申请工单
CREATE TABLE order_partition
(
id bigserial, -- 'id',
cluster_id bigint NOT NULL DEFAULT '-1', -- '集群ID',
cluster_name varchar(128) NOT NULL DEFAULT '', -- '集群名称',
topic_name varchar(192) NOT NULL DEFAULT '', -- 'Topic名称',
applicant varchar(128) NOT NULL DEFAULT '', -- '申请人',
peak_bytes_in bigint NOT NULL DEFAULT '0', -- '峰值流量',
description text, -- '备注信息',
order_status int NOT NULL DEFAULT '0', -- '工单状态',
approver varchar(128) NOT NULL DEFAULT '', -- '审批人',
opinion varchar(256) NOT NULL DEFAULT '', -- '审批意见',
status int NOT NULL DEFAULT '0', -- '状态,0标识有效,-1无效',
gmt_create timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, -- '创建时间',
gmt_modify timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, -- '修改时间',
CONSTRAINT order_partition_pk PRIMARY KEY (id)
);
CREATE TRIGGER order_partition_trig_gmt_modify
BEFORE UPDATE
ON order_partition
FOR EACH ROW
EXECUTE PROCEDURE on_update_timestamp();
-- Topic申请工单
CREATE TABLE order_topic
(
id bigserial, -- 'ID',
cluster_id bigint NOT NULL DEFAULT '-1', -- '集群ID',
cluster_name varchar(128) NOT NULL DEFAULT '', -- '集群名称',
topic_name varchar(192) NOT NULL DEFAULT '', -- 'Topic名称',
retention_time bigint NOT NULL DEFAULT '-1', -- '保留时间(ms)',
partition_num int NOT NULL DEFAULT '-1', -- '分区数',
replica_num int NOT NULL DEFAULT '-1', -- '副本数',
regions varchar(128) NOT NULL DEFAULT '', -- 'RegionId列表',
brokers varchar(128) NOT NULL DEFAULT '', -- 'Broker列表',
peak_bytes_in bigint NOT NULL DEFAULT '0', -- '峰值流入流量(KB)',
applicant varchar(128) NOT NULL DEFAULT '', -- '申请人',
principals varchar(256) NOT NULL DEFAULT '', -- '负责人',
description text, -- '备注信息',
order_status int NOT NULL DEFAULT '0', -- '工单状态',
approver varchar(128) NOT NULL DEFAULT '', -- '审批人',
opinion varchar(256) NOT NULL DEFAULT '', -- '审批意见',
status int NOT NULL DEFAULT '0', -- '状态,0标识有效,-1无效',
gmt_create timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, -- '创建时间',
gmt_modify timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, -- '修改时间',
CONSTRAINT order_topic_pk PRIMARY KEY (id)
);
CREATE TRIGGER order_topic_trig_gmt_modify
BEFORE UPDATE
ON order_topic
FOR EACH ROW
EXECUTE PROCEDURE on_update_timestamp();
-- Region信息表
CREATE TABLE region
(
id bigserial, -- 'id',
cluster_id bigint NOT NULL DEFAULT '-1', -- '集群ID',
region_name varchar(128) NOT NULL DEFAULT '', -- 'Region名称',
broker_list varchar(256) NOT NULL DEFAULT '', -- 'Broker列表',
level int NOT NULL DEFAULT '0', -- 'Region重要等级, 0级普通, 1极重要,2级极重要',
operator varchar(45) NOT NULL DEFAULT '', -- '操作人',
description text, -- '备注说明',
status int NOT NULL DEFAULT '0', -- '状态,0正常,-1废弃,1容量已满',
gmt_create timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, -- '创建时间',
gmt_modify timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, -- '修改时间',
CONSTRAINT region_pk PRIMARY KEY (id)
);
CREATE UNIQUE INDEX region_uniq_cluster_id_region_name ON region (cluster_id, region_name);
CREATE TRIGGER region_trig_gmt_modify
BEFORE UPDATE
ON region
FOR EACH ROW
EXECUTE PROCEDURE on_update_timestamp();
-- Topic信息表
CREATE TABLE topic
(
id bigserial, -- 'ID',
cluster_id bigint NOT NULL DEFAULT '-1', -- '集群ID',
topic_name varchar(192) NOT NULL DEFAULT '', -- 'Topic名称',
applicant varchar(256) NOT NULL DEFAULT '', -- '申请人',
principals varchar(256) NOT NULL DEFAULT '', -- '负责人',
description text, -- '备注信息',
status int NOT NULL DEFAULT '0', -- '0标识使用中,-1标识已废弃',
gmt_create timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, -- '创建时间',
gmt_modify timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, -- '修改时间',
CONSTRAINT topic_pk PRIMARY KEY (id)
); --='';
CREATE UNIQUE INDEX topic_uniq_cluster_id_topic_name ON topic (cluster_id, topic_name);
CREATE TRIGGER topic_trig_gmt_modify
BEFORE UPDATE
ON topic
FOR EACH ROW
EXECUTE PROCEDURE on_update_timestamp();
-- 用户收藏的Topic表
CREATE TABLE topic_favorite
(
id bigserial, -- '自增Id',
username varchar(64) NOT NULL DEFAULT '', -- '用户名',
cluster_id bigint NOT NULL DEFAULT '-1', -- '集群ID',
topic_name varchar(192) NOT NULL DEFAULT '', -- 'Topic名称',
status int NOT NULL DEFAULT '0', -- '删除标记, 0表示未删除, -1表示删除',
gmt_create timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, -- '创建时间',
gmt_modify timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, -- '修改时间',
CONSTRAINT topic_favorite_pk PRIMARY KEY (id)
);
CREATE UNIQUE INDEX topic_favorite_uniq_username_cluster_id_topic_name ON topic_favorite (username, cluster_id, topic_name);
CREATE TRIGGER topic_favorite_trig_gmt_modify
BEFORE UPDATE
ON topic_favorite
FOR EACH ROW
EXECUTE PROCEDURE on_update_timestamp();
-- TopicMetrics表
CREATE TABLE topic_metrics
(
id bigserial, -- '自增id',
cluster_id bigint NOT NULL DEFAULT '-1', -- '集群ID',
topic_name varchar(192) NOT NULL DEFAULT '', -- 'Topic名称',
messages_in decimal(53, 2) NOT NULL DEFAULT '0.00', -- '每秒进入消息条数',
bytes_in decimal(53, 2) NOT NULL DEFAULT '0.00', -- '每秒字节流入',
bytes_out decimal(53, 2) NOT NULL DEFAULT '0.00', -- '每秒字节流出',
bytes_rejected decimal(53, 2) NOT NULL DEFAULT '0.00', -- '每秒拒绝字节数',
total_produce_requests decimal(53, 2) NOT NULL DEFAULT '0.00', -- '每秒请求数',
gmt_create timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, -- '创建时间',
CONSTRAINT topic_metrics_pk PRIMARY KEY (id)
);
CREATE INDEX topic_metrics_idx_cluster_id_topic_name_gmt_create ON topic_metrics (cluster_id, topic_name, gmt_create);
......@@ -76,6 +76,7 @@ public class AdminTopicServiceImpl implements AdminTopicService {
OperationHistoryDO operationHistoryDO = OperationHistoryDO.newInstance(topicDO.getClusterId(), topicDO.getTopicName(), operator, OperationEnum.CREATE_TOPIC.message);
operationHistoryDao.insert(operationHistoryDO);
topicDao.replace(topicDO);
} catch (Exception e) {
return AdminTopicStatusEnum.REPLACE_DB_FAILED;
}
......@@ -188,4 +189,5 @@ public class AdminTopicServiceImpl implements AdminTopicService {
}
return AdminTopicStatusEnum.SUCCESS;
}
}
server:
port: 8080
tomcat:
accept-count: 100
max-connections: 1000
max-threads: 20
min-spare-threads: 20
spring:
application:
name: kafkamanager
datasource:
kafka-manager:
driver-class-name: org.postgresql.Driver
jdbc-url: jdbc:postgresql://localhost:5432/kafka_manager?reWriteBatchedInserts=true
username: admin
password: admin
type: com.zaxxer.hikari.HikariDataSource
hikari:
connection-init-sql: "SET TIME ZONE 'Asia/Chongqing';SET CLIENT_ENCODING TO 'UTF-8';"
connection-test-query: "select 1;"
main:
allow-bean-definition-overriding: true
logging:
config: classpath:logback-spring.xml
# kafka监控
kafka-monitor:
enabled: true
notify-kafka:
cluster-id: 95
topic-name: kmo_monitor
......@@ -11,16 +11,13 @@ spring:
name: kafkamanager
datasource:
kafka-manager:
driver-class-name: org.mariadb.jdbc.Driver
jdbc-url: jdbc:mysql://localhost:3306/kafka_manager?characterEncoding=UTF-8&serverTimezone=GMT%2B8
username: admin
password: admin
driver-class-name: org.mariadb.jdbc.Driver
main:
allow-bean-definition-overriding: true
profiles:
active: dev
logging:
config: classpath:logback-spring.xml
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册