index.go 36.5 KB
Newer Older
martianzhang's avatar
martianzhang 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
/*
 * Copyright 2018 Xiaomi, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package advisor

import (
	"fmt"
	"strings"

	"github.com/XiaoMi/soar/ast"
	"github.com/XiaoMi/soar/common"
	"github.com/XiaoMi/soar/database"
	"github.com/XiaoMi/soar/env"

	"github.com/dchest/uniuri"
	"vitess.io/vitess/go/vt/sqlparser"
)

W
wu.sphinx 已提交
32
const (
W
wu.sphinx 已提交
33
	// IndexNameMaxLength Ref. https://dev.mysql.com/doc/refman/8.0/en/identifiers.html
W
wu.sphinx 已提交
34 35 36
	IndexNameMaxLength = 64
)

martianzhang's avatar
martianzhang 已提交
37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55
// IndexAdvisor 索引建议需要使用到的所有信息
type IndexAdvisor struct {
	vEnv      *env.VirtualEnv     // 线下虚拟测试环境(测试环境)
	rEnv      database.Connector  // 线上真实环境
	Ast       sqlparser.Statement // Vitess Parser生成的抽象语法树
	where     []*common.Column    // 所有where条件中用到的列
	whereEQ   []*common.Column    // where条件中可以加索引的等值条件列
	whereINEQ []*common.Column    // where条件中可以加索引的非等值条件列
	groupBy   []*common.Column    // group by可以加索引列
	orderBy   []*common.Column    // order by可以加索引列
	joinCond  [][]*common.Column  // 由于join condition跨层级间索引不可共用,需要多一个维度用来维护层级关系
	IndexMeta map[string]map[string]*database.TableIndexInfo
}

// IndexInfo 创建一条索引需要的信息
type IndexInfo struct {
	Name          string           `json:"name"`           // 索引名称
	Database      string           `json:"database"`       // 数据库名
	Table         string           `json:"table"`          // 表名
56
	DDL           string           `json:"ddl"`            // ALTER, CREATE 等类型的 DDL 语句
martianzhang's avatar
martianzhang 已提交
57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103
	ColumnDetails []*common.Column `json:"column_details"` // 列详情
}

// IndexAdvises IndexAdvises列表
type IndexAdvises []IndexInfo

// mergeAdvices 合并索引建议
func mergeAdvices(dst []IndexInfo, src ...IndexInfo) IndexAdvises {
	if len(src) == 0 {
		return dst
	}

	for _, newIdx := range src {
		has := false
		for _, idx := range dst {
			if newIdx.DDL == idx.DDL {
				common.Log.Debug("merge index %s and %s", idx.Name, newIdx.Name)
				has = true
			}
		}

		if !has {
			dst = append(dst, newIdx)
		}
	}

	return dst
}

// NewAdvisor 构造一个 IndexAdvisor 的时候就会对其本身结构初始化
// 获取 condition 中的等值条件、非等值条件,以及group by 、 order by信息
func NewAdvisor(env *env.VirtualEnv, rEnv database.Connector, q Query4Audit) (*IndexAdvisor, error) {
	common.Log.Debug("Enter: NewAdvisor(), Caller: %s", common.Caller())
	if common.Config.TestDSN.Disable {
		return nil, fmt.Errorf("TestDSN is Disabled: %s", common.Config.TestDSN.Addr)
	}
	// DDL 检测
	switch stmt := q.Stmt.(type) {
	case *sqlparser.DDL:
		// 获取ast中用到的库表
		sqlMeta := ast.GetMeta(q.Stmt, nil)
		for db := range sqlMeta {
			dbRef := db
			if db == "" {
				dbRef = rEnv.Database
			}

104
			// DDL 在 Env 初始化的时候已经执行过了
martianzhang's avatar
martianzhang 已提交
105 106 107 108 109 110 111 112 113
			if _, ok := env.TableMap[dbRef]; !ok {
				env.TableMap[dbRef] = make(map[string]string)
			}

			for _, tb := range sqlMeta[db].Table {
				env.TableMap[dbRef][tb.TableName] = tb.TableName
			}
		}

L
liipx 已提交
114 115 116 117 118
		return &IndexAdvisor{
			vEnv: env,
			rEnv: rEnv,
			Ast:  q.Stmt,
		}, nil
martianzhang's avatar
martianzhang 已提交
119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141

	case *sqlparser.DBDDL:
		// 忽略建库语句
		return nil, nil

	case *sqlparser.Use:
		// 如果是use,切基础环境
		env.Database = env.DBHash(stmt.DBName.String())
		return nil, nil
	}

	return &IndexAdvisor{
		vEnv: env,
		rEnv: rEnv,
		Ast:  q.Stmt,

		// 所有的FindXXXXCols尽最大可能先排除不需要加索引的列,但由于元数据在此阶段尚未补齐,给出的列有可能也无法添加索引
		// 后续需要通过CompleteColumnsInfo + calcCardinality补全后再进一步判断
		joinCond:  ast.FindJoinCols(q.Stmt),
		whereEQ:   ast.FindWhereEQ(q.Stmt),
		whereINEQ: ast.FindWhereINEQ(q.Stmt),
		groupBy:   ast.FindGroupByCols(q.Stmt),
		orderBy:   ast.FindOrderByCols(q.Stmt),
L
liipx 已提交
142
		where:     ast.FindAllCols(q.Stmt, ast.WhereExpression),
martianzhang's avatar
martianzhang 已提交
143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290
		IndexMeta: make(map[string]map[string]*database.TableIndexInfo),
	}, nil
}

/*

关于如何添加索引:
在《Relational Database Index Design and the Optimizers》一书中,作者提出著名的的三星索引理论(Three-Star Index)

To Qualify for the First Star:
Pick the columns from all equal predicates (WHERE COL = . . .).
Make these the first columns of the index—in any order. For CURSOR41, the three-star index will begin with
columns LNAME, CITY or CITY, LNAME. In both cases the index slice that must be scanned will be as thin as possible.

To Qualify for the Second Star:
Add the ORDER BY columns. Do not change the order of these columns, but ignore columns that were already
picked in step 1. For example, if CURSOR41 had redundant columns in the ORDER BY, say ORDER BY LNAME,
FNAME or ORDER BY FNAME, CITY, only FNAME would be added in this step. When FNAME is the third index column,
the result table will be in the right order without sorting. The first FETCH call will return the row with
the smallest FNAME value.

To Qualify for the Third Star:
Add all the remaining columns from the SELECT statement. The order of the columns added in this step
has no impact on the performance of the SELECT, but the cost of updates should be reduced by placing volatile
columns at the end. Now the index contains all the columns required for an index-only access path.

索引添加算法正是以这个理想化索策略添为基础,尽可能的给予"三星"索引建议。

但又如《High Performance MySQL》一书中所说,索引并不总是最好的工具。只有当索引帮助存储引擎快速查找到记录带来的好处大于其
带来的额外工作时,索引才是有效的。

因此,在三星索引理论的基础上引入启发式索引算法,在第二颗星的实现上做了部分改进,对于非等值条件只会添加散粒度最高的一列到索引中,
并基于总体列的使用情况作出判断,按需对order by、group by添加索引,由此来想`增强索引建议的通用性。

*/

// IndexAdvise 索引优化建议算法入口主函数
// TODO 索引顺序该如何确定
func (idxAdv *IndexAdvisor) IndexAdvise() IndexAdvises {
	// 支持不依赖DB的索引建议分析
	if common.Config.TestDSN.Disable {
		// 未开启Env原数据依赖,信息不全的情况下可能会给予错误的索引建议,请人工进行核查。
		common.Log.Warn("TestDSN.Disable = true")
	}

	// 检查否是否含有子查询
	subQueries := ast.FindSubquery(0, idxAdv.Ast)
	var subQueryAdvises []IndexInfo
	// 含有子查询对子查询进行单独评审,子查询评审建议报错忽略
	if len(subQueries) > 0 {
		for _, subSQL := range subQueries {
			stmt, err := sqlparser.Parse(subSQL)
			if err != nil {
				continue
			}
			q := Query4Audit{
				Query: subSQL,
				Stmt:  stmt,
			}
			subIdxAdv, _ := NewAdvisor(idxAdv.vEnv, idxAdv.rEnv, q)
			subQueryAdvises = append(subQueryAdvises, subIdxAdv.IndexAdvise()...)
		}
	}

	// 变量初始化,用于存放索引信息,按照db.tb.[cols]组织
	indexList := make(map[string]map[string][]*common.Column)

	// 为用到的每一列填充库名,表名等信息
	var joinCond [][]*common.Column
	for _, joinCols := range idxAdv.joinCond {
		joinCond = append(joinCond, CompleteColumnsInfo(idxAdv.Ast, joinCols, idxAdv.vEnv))
	}
	idxAdv.joinCond = joinCond

	idxAdv.where = CompleteColumnsInfo(idxAdv.Ast, idxAdv.where, idxAdv.vEnv)
	idxAdv.whereEQ = CompleteColumnsInfo(idxAdv.Ast, idxAdv.whereEQ, idxAdv.vEnv)
	idxAdv.whereINEQ = CompleteColumnsInfo(idxAdv.Ast, idxAdv.whereINEQ, idxAdv.vEnv)
	idxAdv.groupBy = CompleteColumnsInfo(idxAdv.Ast, idxAdv.groupBy, idxAdv.vEnv)
	idxAdv.orderBy = CompleteColumnsInfo(idxAdv.Ast, idxAdv.orderBy, idxAdv.vEnv)

	// 只要在开启使用env元数据的时候才会计算散粒度
	if !common.Config.TestDSN.Disable {
		// 计算joinCond, whereEQ, whereINEQ用到的每一列的散粒度,并排序,方便后续添加复合索引
		// groupBy, orderBy列按书写顺序给索引建议,不需要按散粒度排序
		idxAdv.calcCardinality(idxAdv.whereEQ)
		idxAdv.calcCardinality(idxAdv.whereINEQ)
		idxAdv.calcCardinality(idxAdv.orderBy)
		idxAdv.calcCardinality(idxAdv.groupBy)

		for i, joinCols := range idxAdv.joinCond {
			idxAdv.calcCardinality(joinCols)
			joinCols = common.ColumnSort(joinCols)
			idxAdv.joinCond[i] = joinCols
		}

		// 根据散粒度进行排序
		// 对所有列进行排序,按散粒度由大到小排序
		idxAdv.whereEQ = common.ColumnSort(idxAdv.whereEQ)
		idxAdv.whereINEQ = common.ColumnSort(idxAdv.whereINEQ)
		idxAdv.orderBy = common.ColumnSort(idxAdv.orderBy)
		idxAdv.groupBy = common.ColumnSort(idxAdv.groupBy)

	}

	// 是否指定Where条件,打标签
	hasWhere := false
	err := sqlparser.Walk(func(node sqlparser.SQLNode) (kontinue bool, err error) {
		switch where := node.(type) {
		case *sqlparser.Subquery:
			return false, nil
		case *sqlparser.Where:
			if where != nil {
				hasWhere = true
			}
		}
		return true, nil
	}, idxAdv.Ast)
	common.LogIfError(err, "")
	// 获取哪些列被忽略
	var ignore []*common.Column
	usedCols := append(idxAdv.whereINEQ, idxAdv.whereEQ...)

	for _, whereCol := range idxAdv.where {
		isUsed := false
		for _, used := range usedCols {
			if whereCol.Equal(used) {
				isUsed = true
			}
		}

		if !isUsed {
			common.Log.Debug("column %s in `%s`.`%s` will ignore when adding index", whereCol.DB, whereCol.Table, whereCol.Name)
			ignore = append(ignore, whereCol)
		}

	}

	// 索引优化算法入口,从这里开始放大招
	if hasWhere {
		// 有Where条件的先分析 等值条件
		for _, index := range idxAdv.whereEQ {
			// 对应列在前面已经按散粒度由大到小排序好了
			mergeIndex(indexList, index)
		}
		// 若存在非等值查询条件,可以给第一个非等值条件添加索引
		if len(idxAdv.whereINEQ) > 0 {
			mergeIndex(indexList, idxAdv.whereINEQ[0])
		}
291
		// 有WHERE条件,但 WHERE 条件未能给出索引建议就不能再加 GROUP BY 和 ORDER BY 建议了
martianzhang's avatar
martianzhang 已提交
292
		if len(ignore) == 0 {
293
			// 没有非等值查询条件时可以再为 GroupBy 和 OrderBy 添加索引
martianzhang's avatar
martianzhang 已提交
294 295 296 297 298
			for _, index := range idxAdv.groupBy {
				mergeIndex(indexList, index)
			}

			// OrderBy
299
			// 没有 GroupBy 时可以为 OrderBy 加索引
martianzhang's avatar
martianzhang 已提交
300 301 302 303 304 305 306
			if len(idxAdv.groupBy) == 0 {
				for _, index := range idxAdv.orderBy {
					mergeIndex(indexList, index)
				}
			}
		}
	} else {
307
		// 未指定 Where 条件的,只需要 GroupBy 和 OrderBy 的索引建议
martianzhang's avatar
martianzhang 已提交
308 309 310 311 312
		for _, index := range idxAdv.groupBy {
			mergeIndex(indexList, index)
		}

		// OrderBy
313 314
		// 没有GroupBy 时可以为 OrderBy 加索引
		// 没有 where 条件时 OrderBy 的索引仅能够在索引覆盖的情况下被使用
L
fix #12  
liipx 已提交
315 316 317 318 319 320

		// if len(idxAdv.groupBy) == 0 {
		// 	for _, index := range idxAdv.orderBy {
		// 		mergeIndex(indexList, index)
		// 	}
		// }
martianzhang's avatar
martianzhang 已提交
321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340
	}

	// 开始整合索引信息,添加索引
	var indexes []IndexInfo

	// 为join添加索引
	// 获取 join condition 中需要加索引的表有哪些
	defaultDB := ""
	if !common.Config.TestDSN.Disable {
		defaultDB = idxAdv.vEnv.RealDB(idxAdv.vEnv.Database)
	}
	if !common.Config.OnlineDSN.Disable {
		defaultDB = idxAdv.rEnv.Database
	}

	// 根据join table的信息给予优化建议
	joinTableMeta := ast.FindJoinTable(idxAdv.Ast, nil).SetDefault(idxAdv.rEnv.Database).SetDefault(defaultDB)
	indexes = mergeAdvices(indexes, idxAdv.buildJoinIndex(joinTableMeta)...)

	if common.Config.TestDSN.Disable || common.Config.OnlineDSN.Disable {
341 342
		// 无 env 环境下只提供单列索引,无法确定 table 时不给予优化建议
		// 仅有 table 信息时给出的建议不包含 DB 信息
martianzhang's avatar
martianzhang 已提交
343 344 345 346 347 348 349 350
		indexes = mergeAdvices(indexes, idxAdv.buildIndexWithNoEnv(indexList)...)
	} else {
		// 给出尽可能详细的索引建议
		indexes = mergeAdvices(indexes, idxAdv.buildIndex(indexList)...)
	}

	indexes = mergeAdvices(indexes, subQueryAdvises...)

351
	// 在开启 env 的情况下,检查数据库版本,字段类型,索引总长度
martianzhang's avatar
martianzhang 已提交
352 353
	indexes = idxAdv.idxColsTypeCheck(indexes)

354 355
	// 在开启 env 的情况下,会对索引进行检查,对全索引进行过滤
	// 在前几步都不会对 idx 生成 DDL 语句,DDL语句在这里生成
martianzhang's avatar
martianzhang 已提交
356 357 358 359
	return idxAdv.mergeIndexes(indexes)
}

// idxColsTypeCheck 对超长的字段添加前缀索引,剔除无法添索引字段的列
360
// TODO: 暂不支持 fulltext 索引,
martianzhang's avatar
martianzhang 已提交
361 362 363 364 365 366 367 368 369 370 371 372 373 374
func (idxAdv *IndexAdvisor) idxColsTypeCheck(idxList []IndexInfo) []IndexInfo {
	if common.Config.TestDSN.Disable {
		return rmSelfDupIndex(idxList)
	}

	var indexes []IndexInfo

	for _, idx := range idxList {
		var newCols []*common.Column
		var newColInfo []string
		// 索引总长度
		idxBytesTotal := 0
		isOverFlow := false
		for _, col := range idx.ColumnDetails {
375
			// 获取字段 bytes
martianzhang's avatar
martianzhang 已提交
376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458
			bytes := col.GetDataBytes(common.Config.OnlineDSN.Version)
			tmpCol := col.Name
			overFlow := 0
			// 加上该列后是否索引长度过长
			if bytes < 0 {
				// bytes < 0 说明字段的长度是无法计算的
				common.Log.Warning("%s.%s data type not support %s, can't add index",
					col.Table, col.Name, col.DataType)
				continue
			}

			// idx bytes over flow
			if total := idxBytesTotal + bytes; total > common.Config.MaxIdxBytes {

				common.Log.Debug("bytes: %d, idxBytesTotal: %d, total: %d, common.Config.MaxIdxBytes: %d",
					bytes, idxBytesTotal, total, common.Config.MaxIdxBytes)

				overFlow = total - common.Config.MaxIdxBytes
				isOverFlow = true

			} else {
				idxBytesTotal = total
			}

			// common.Config.MaxIdxColBytes 默认大小 767
			if bytes > common.Config.MaxIdxBytesPerColumn || isOverFlow {
				// In 5.6, you may not include a column that equates to
				// bigger than 767 bytes: VARCHAR(255) CHARACTER SET utf8 or VARCHAR(191) CHARACTER SET utf8mb4.
				// In 5.7  you may not include a column that equates to
				// bigger than 3072 bytes.

				// v : 在 col.Character 字符集下每个字符占用 v bytes
				v, ok := common.CharSets[strings.ToLower(col.Character)]
				if !ok {
					// 找不到对应字符集,不添加索引
					// 如果出现不认识的字符集,认为每个字符占用4个字节
					common.Log.Warning("%s.%s(%s) charset not support yet %s, use default 4 bytes length",
						col.Table, col.Name, col.DataType, col.Character)
					v = 4
				}

				// 保留两个字节的安全余量
				length := (common.Config.MaxIdxBytesPerColumn - 2) / v
				if isOverFlow {
					// 在索引中添加该列会导致索引长度过长,建议根据需求转换为合理的前缀索引
					// _OPR_SPLIT_ 是自定的用于后续处理的特殊分隔符
					common.Log.Warning("adding index '%s(%s)' to table '%s' causes the index to be too long, overflow is %d",
						col.Name, col.DataType, col.Table, overFlow)
					tmpCol += fmt.Sprintf("_OPR_SPLIT_(N)")
				} else {
					// 索引没有过长,可以加一个最长的前缀索引
					common.Log.Warning("index column too large: %s.%s --> %s.%s(%d), data type: %s",
						col.Table, col.Name, col.Table, tmpCol, length, col.DataType)
					tmpCol += fmt.Sprintf("_OPR_SPLIT_(%d)", length)
				}

			}

			newCols = append(newCols, col)
			newColInfo = append(newColInfo, tmpCol)
		}

		// 为新索引重建索引语句
		idxName := "idx_"
		idxCols := ""
		for i, newCol := range newColInfo {
			// 对名称和可能存在的长度进行拼接
			// 用等号进行分割
			tmp := strings.Split(newCol, "_OPR_SPLIT_")
			idxName += tmp[0]
			if len(tmp) > 1 {
				idxCols += tmp[0] + "`" + tmp[1]
			} else {
				idxCols += tmp[0] + "`"
			}

			if i+1 < len(newColInfo) {
				idxName += "_"
				idxCols += ",`"
			}
		}

		// 索引名称最大长度64
W
wu.sphinx 已提交
459 460 461
		if len(idxName) > IndexNameMaxLength {
			common.Log.Warn("index '%s' name large than IndexNameMaxLength", idxName)
			idxName = strings.TrimRight(idxName[:IndexNameMaxLength], "_")
martianzhang's avatar
martianzhang 已提交
462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485
		}

		// 新的alter语句
		newDDL := fmt.Sprintf("alter table `%s`.`%s` add index `%s` (`%s)", idxAdv.vEnv.RealDB(idx.Database),
			idx.Table, idxName, idxCols)

		// 将筛选改造后的索引信息信息加入到新的索引列表中
		idx.ColumnDetails = newCols
		idx.DDL = newDDL
		indexes = append(indexes, idx)
	}

	return indexes
}

// mergeIndexes 与线上环境对比,将给出的索引建议进行去重
func (idxAdv *IndexAdvisor) mergeIndexes(idxList []IndexInfo) []IndexInfo {
	// TODO 暂不支持前缀索引去重
	if common.Config.TestDSN.Disable {
		return rmSelfDupIndex(idxList)
	}

	var indexes []IndexInfo
	for _, idx := range idxList {
486
		// 将 DB 替换成 vEnv 中的数据库名称
martianzhang's avatar
martianzhang 已提交
487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514
		dbInVEnv := idx.Database
		if _, ok := idxAdv.vEnv.DBRef[idx.Database]; ok {
			dbInVEnv = idxAdv.vEnv.DBRef[idx.Database]
		}

		// 检测索引添加的表是否是视图
		if idxAdv.vEnv.IsView(idx.Table) {
			common.Log.Info("%s.%s is a view. no need indexed", idx.Database, idx.Table)
			continue
		}

		// 检测是否存在重复索引
		indexMeta := idxAdv.IndexMeta[dbInVEnv][idx.Table]
		isExisted := false

		// 检测无索引列的情况
		if len(idx.ColumnDetails) < 1 {
			continue
		}

		if existedIndexes := indexMeta.FindIndex(database.IndexColumnName, idx.ColumnDetails[0].Name); len(existedIndexes) > 0 {
			for _, existedIdx := range existedIndexes {
				// flag: 用于标记已存在的索引是否是约束条件
				isConstraint := false

				var cols []string
				var colsDetail []*common.Column

515
				// 把已经存在的 key 摘出来遍历一遍对比是否是包含关系
martianzhang's avatar
martianzhang 已提交
516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543
				for _, col := range indexMeta.FindIndex(database.IndexKeyName, existedIdx.KeyName) {
					cols = append(cols, col.ColumnName)
					colsDetail = append(colsDetail, &common.Column{
						Name:  col.ColumnName,
						Table: idx.Table,
						DB:    idx.ColumnDetails[0].DB,
					})
				}

				// 判断已存在的索引是否属于约束条件(唯一索引、主键)
				// 这里可以忽略是否含有外键的情况,因为索引已经重复了,添加了新索引后原先重复的索引是可以删除的。
				if existedIdx.NonUnique == 0 {
					common.Log.Debug("%s.%s表%s为约束条件", dbInVEnv, idx.Table, existedIdx.KeyName)
					isConstraint = true
				}

				// 如果已存在的索引与索引建议存在重叠,则说明无需添加新索引或可能需要给出删除索引的建议
				if common.IsColsPart(colsDetail, idx.ColumnDetails) {
					idxName := existedIdx.KeyName
					// 如果已经存在的索引包含需要添加的索引,则无需添加
					if len(colsDetail) >= len(idx.ColumnDetails) {
						common.Log.Info(" `%s`.`%s` %s already had a index `%s`",
							idx.Database, idx.Table, strings.Join(cols, ","), idxName)
						isExisted = true
						continue
					}

					// 库、表、列名需要用反撇转义
544
					// TODO: 关于外键索引去重的优雅解决方案
martianzhang's avatar
martianzhang 已提交
545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568
					if !isConstraint {
						if common.Config.AllowDropIndex {
							alterSQL := fmt.Sprintf("alter table `%s`.`%s` drop index `%s`", idx.Database, idx.Table, idxName)
							indexes = append(indexes, IndexInfo{
								Name:          idxName,
								Database:      idx.Database,
								Table:         idx.Table,
								DDL:           alterSQL,
								ColumnDetails: colsDetail,
							})
						} else {
							common.Log.Warning("In table `%s`, the new index of column `%s` contains index `%s`,"+
								" maybe you could drop one of them.", existedIdx.Table,
								strings.Join(cols, ","), idxName)
						}
					}
				}
			}
		}

		if !isExisted {
			// 检测索引名称是否重复?
			if existedIndexes := indexMeta.FindIndex(database.IndexKeyName, idx.Name); len(existedIndexes) > 0 {
				var newName string
W
wu.sphinx 已提交
569
				idxSuffix := getRandomIndexSuffix()
W
wu.sphinx 已提交
570 571
				if len(idx.Name) < IndexNameMaxLength-len(idxSuffix) {
					newName = idx.Name + idxSuffix
martianzhang's avatar
martianzhang 已提交
572
				} else {
W
wu.sphinx 已提交
573
					newName = idx.Name[:IndexNameMaxLength-len(idxSuffix)] + idxSuffix
martianzhang's avatar
martianzhang 已提交
574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590
				}

				common.Log.Warning("duplicate index name '%s', new name is '%s'", idx.Name, newName)
				idx.DDL = strings.Replace(idx.DDL, idx.Name, newName, -1)
				idx.Name = newName
			}

			// 添加合并
			indexes = mergeAdvices(indexes, idx)
		}

	}

	// 对索引进行去重
	return rmSelfDupIndex(indexes)
}

W
wu.sphinx 已提交
591 592
// getRandomIndexSuffix format: _xxxx, length: 5
func getRandomIndexSuffix() string {
W
wu.sphinx 已提交
593 594 595
	return fmt.Sprintf("_%s", uniuri.New()[:4])
}

martianzhang's avatar
martianzhang 已提交
596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 668 669 670
// rmSelfDupIndex 去重传入的[]IndexInfo中重复的索引
func rmSelfDupIndex(indexes []IndexInfo) []IndexInfo {
	var resultIndex []IndexInfo
	tmpIndexList := indexes
	for _, a := range indexes {
		tmp := a
		for i, b := range tmpIndexList {
			if common.IsColsPart(tmp.ColumnDetails, b.ColumnDetails) && tmp.Name != b.Name {
				if len(b.ColumnDetails) > len(tmp.ColumnDetails) {
					common.Log.Debug("remove duplicate index: %s", tmp.Name)
					tmp = b
				}

				if i < len(tmpIndexList) {
					tmpIndexList = append(tmpIndexList[:i], tmpIndexList[i+1:]...)
				} else {
					tmpIndexList = tmpIndexList[:i]
				}

			}
		}
		resultIndex = mergeAdvices(resultIndex, tmp)
	}

	return resultIndex
}

// buildJoinIndex 检查Join中使用的库表是否需要添加索引并给予索引建议
func (idxAdv *IndexAdvisor) buildJoinIndex(meta common.Meta) []IndexInfo {
	var indexes []IndexInfo
	for _, IndexCols := range idxAdv.joinCond {
		// 如果该列的库表为join condition中需要添加索引的库表
		indexColsList := make(map[string]map[string][]*common.Column)
		for _, col := range IndexCols {
			mergeIndex(indexColsList, col)
		}

		if common.Config.TestDSN.Disable || common.Config.OnlineDSN.Disable {
			indexes = mergeAdvices(indexes, idxAdv.buildIndexWithNoEnv(indexColsList)...)
			continue
		}

		indexes = mergeAdvices(indexes, idxAdv.buildIndex(indexColsList)...)
	}
	return indexes
}

// buildIndex 尽可能的将 map[string]map[string][]*common.Column 转换成 []IndexInfo
// 此处不判断索引是否重复
func (idxAdv *IndexAdvisor) buildIndex(idxList map[string]map[string][]*common.Column) []IndexInfo {
	var indexes []IndexInfo
	for db, tbs := range idxList {
		for tb, cols := range tbs {

			// 单个索引中含有的列收 config 中参数限制
			if len(cols) > common.Config.MaxIdxColsCount {
				cols = cols[:common.Config.MaxIdxColsCount]
			}

			var colNames []string
			for _, col := range cols {
				if col.DB == "" || col.Table == "" {
					common.Log.Warn("can not get the meta info of column '%s'", col.Name)
					continue
				}
				colNames = append(colNames, col.Name)
			}

			if len(colNames) == 0 {
				continue
			}

			idxName := "idx_" + strings.Join(colNames, "_")

			// 索引名称最大长度64
W
wu.sphinx 已提交
671 672 673
			if len(idxName) > IndexNameMaxLength {
				common.Log.Warn("index '%s' name large than IndexNameMaxLength", idxName)
				idxName = strings.TrimRight(idxName[:IndexNameMaxLength], "_")
martianzhang's avatar
martianzhang 已提交
674 675 676 677 678 679 680 681 682 683 684 685 686 687 688 689 690 691 692 693 694 695 696 697 698 699 700 701 702 703 704 705 706 707 708 709 710 711 712 713 714 715 716 717 718 719 720 721 722 723 724
			}

			alterSQL := fmt.Sprintf("alter table `%s`.`%s` add index `%s` (`%s`)", idxAdv.vEnv.RealDB(db), tb,
				idxName, strings.Join(colNames, "`,`"))

			indexes = append(indexes, IndexInfo{
				Name:          idxName,
				Database:      idxAdv.vEnv.RealDB(db),
				Table:         tb,
				DDL:           alterSQL,
				ColumnDetails: cols,
			})
		}
	}
	return indexes
}

// buildIndexWithNoEnv 忽略原数据,给予最基础的索引
func (idxAdv *IndexAdvisor) buildIndexWithNoEnv(indexList map[string]map[string][]*common.Column) []IndexInfo {
	// 如果不获取数据库原信息,则不去判断索引是否重复,且只给单列加索引
	var indexes []IndexInfo
	for _, tableIndex := range indexList {
		for _, indexCols := range tableIndex {
			for _, col := range indexCols {
				if col.Table == "" {
					common.Log.Warn("can not get the meta info of column '%s'", col.Name)
					continue
				}
				idxName := "idx_" + col.Name
				// 库、表、列名需要用反撇转义
				alterSQL := fmt.Sprintf("alter table `%s`.`%s` add index `%s` (`%s`)", idxAdv.vEnv.RealDB(col.DB), col.Table, idxName, col.Name)
				if col.DB == "" {
					alterSQL = fmt.Sprintf("alter table `%s` add index `%s` (`%s`)", col.Table, idxName, col.Name)
				}

				indexes = append(indexes, IndexInfo{
					Name:          idxName,
					Database:      idxAdv.vEnv.RealDB(col.DB),
					Table:         col.Table,
					DDL:           alterSQL,
					ColumnDetails: []*common.Column{col},
				})
			}

		}
	}
	return indexes
}

// mergeIndex 将索引用到的列去重后合并到一起
func mergeIndex(idxList map[string]map[string][]*common.Column, column *common.Column) {
L
liipx 已提交
725
	// 散粒度低于阈值将不会添加索引
L
fix  
liipx 已提交
726
	if common.Config.MinCardinality/100 > column.Cardinality {
L
liipx 已提交
727 728 729
		return
	}

martianzhang's avatar
martianzhang 已提交
730 731 732 733 734 735 736 737 738 739 740 741 742 743 744 745 746 747 748 749 750 751 752 753 754 755 756 757
	db := column.DB
	tb := column.Table
	if idxList[db] == nil {
		idxList[db] = make(map[string][]*common.Column)
	}
	if idxList[db][tb] == nil {
		idxList[db][tb] = make([]*common.Column, 0)
	}

	// 去除重复列Append
	exist := false
	for _, cl := range idxList[db][tb] {
		if cl.Name == column.Name {
			exist = true
		}
	}
	if !exist {
		idxList[db][tb] = append(idxList[db][tb], column)
	}
}

// CompleteColumnsInfo 补全索引可能会用到列的所属库名、表名等信息
func CompleteColumnsInfo(stmt sqlparser.Statement, cols []*common.Column, env *env.VirtualEnv) []*common.Column {
	// 如果传过来的列是空的,没必要跑逻辑
	if len(cols) == 0 {
		return cols
	}

758
	// 从 Ast 中拿到 DBStructure,包含所有表的相关信息
martianzhang's avatar
martianzhang 已提交
759 760
	dbs := ast.GetMeta(stmt, nil)

761
	// 此处生成的 meta 信息中不应该含有""db的信息,若 DB 为空则认为是已传入的 db 为默认 db 并进行信息补全
martianzhang's avatar
martianzhang 已提交
762
	// BUG Fix:
763
	// 修补 dbs 中空 DB 的导致后续补全列信息时无法获取正确 table 名称的问题
martianzhang's avatar
martianzhang 已提交
764 765 766 767 768 769 770 771 772 773 774 775 776 777 778 779 780 781 782 783 784 785 786 787 788 789 790 791 792 793 794 795 796 797 798 799 800 801 802 803 804 805 806 807 808 809 810 811 812 813 814 815 816 817 818 819 820 821 822 823 824 825 826 827 828 829 830 831 832 833 834 835 836 837 838 839 840 841 842 843 844 845 846 847 848 849 850
	if _, ok := dbs[""]; ok {
		dbs[env.Database] = dbs[""]
		delete(dbs, "")
	}

	tableCount := 0
	for db := range dbs {
		for tb := range dbs[db].Table {
			if tb != "" {
				tableCount++
			}
		}
	}

	var noEnvTmp []*common.Column
	for _, col := range cols {
		for db := range dbs {
			// 对每一列进行比对,将别名转换为正确的名称
			find := false
			for _, tb := range dbs[db].Table {
				for _, tbAlias := range tb.TableAliases {
					if col.Table != "" && col.Table == tbAlias {
						common.Log.Debug("column '%s' prefix change: %s --> %s", col.Name, col.Table, tb.TableName)
						find = true
						col.Table = tb.TableName
						col.DB = db
						break
					}
				}
				if find {
					break
				}

			}

			// 如果不依赖env环境,利用ast中包含的信息推理列的库表信息
			if common.Config.TestDSN.Disable {
				if tableCount == 1 {
					for _, tb := range dbs[db].Table {
						col.Table = tb.TableName

						// 因为tableMeta是按照库表组织的树状结构,db变量贯穿全局
						// 只有在最终赋值前才能根据逻辑变更补全
						if db == "" {
							db = env.Database
						}
						col.DB = db
					}
				}

				// 如果SQL中含有的表大于一个,则使用的列中必须含有前缀,不然无法判断该列属于哪个表
				// 如果某一列未含有前缀信息,则认为两张表中都含有该列,需要由人去判断
				if tableCount > 1 {
					if col.Table == "" {
						for _, tb := range dbs[db].Table {
							if tb.TableName == "" {
								common.Log.Warn("can not get the meta info of column '%s'", col.Name)
							}

							if db == "" {
								db = env.RealDB(env.Database)
							}
							col.Table = tb.TableName
							col.DB = db

							tmp := *col
							tmp.Table = tb.TableName
							tmp.DB = db

							noEnvTmp = append(noEnvTmp, &tmp)
						}
					}

					if col.DB == "" {
						if db == "" {
							db = env.Database
						}
						col.DB = db
					}
				}

				break
			}

			// 将已经获取到正确表信息的列信息带入到env中,利用show columns where table 获取库表信息
			// 此出会传入之前从ast中,该 db 下获取的所有表来作为where限定条件,
			// 防止与SQL无关的库表信息干扰准确性
851
			// 此处传入的是测试环境,DB 是经过变换的,所以在寻找列名的时候需要将 DB 名称转换成测试环境中经过 hash 的 DB 名称
martianzhang's avatar
martianzhang 已提交
852 853 854 855 856 857 858 859 860 861
			// 不然会找不到col的信息
			realCols, err := env.FindColumn(col.Name, env.DBHash(db), dbs.Tables(db)...)
			if err != nil {
				common.Log.Warn("%v", err)
				continue
			}

			// 对比 column 信息中的表名与从 env 中获取的库表名的一致性
			for _, realCol := range realCols {
				if col.Name == realCol.Name {
862
					// 如果查询到了列名一致,但从 ast 中获取的列的前缀与 env 中的表信息不符
martianzhang's avatar
martianzhang 已提交
863 864 865 866 867 868 869 870 871 872 873 874 875 876 877 878 879 880 881 882 883 884 885 886 887 888 889 890 891 892 893 894 895 896 897 898 899 900 901 902 903 904 905 906 907 908 909 910 911 912 913 914 915 916 917 918
					// 1.存在一个同名列,但不同表,该情况下忽略
					// 2.存在一个未正确转换的别名(如表名为),该情况下修正,大概率是正确的
					if col.Table != "" && col.Table != realCol.Table {
						has, _ := env.FindColumn(col.Name, env.DBHash(db), col.Table)
						if len(has) > 0 {
							realCol = has[0]
						}
					}

					col.DataType = realCol.DataType
					col.Table = realCol.Table
					col.DB = env.RealDB(realCol.DB)
					col.Character = realCol.Character
					col.Collation = realCol.Collation

				}
			}
		}

	}

	// 如果不依赖env环境,将可能存在的列也加入到索引预处理列表中
	if common.Config.TestDSN.Disable {
		cols = append(cols, noEnvTmp...)
	}

	return cols
}

// calcCardinality 计算每一列的散粒度
// 这个函数需要在补全列的库表信息之后再调用,否则无法确定要计算列的归属
func (idxAdv *IndexAdvisor) calcCardinality(cols []*common.Column) []*common.Column {
	common.Log.Debug("Enter: calcCardinality(), Caller: %s", common.Caller())
	tmpDB := *idxAdv.vEnv
	for _, col := range cols {
		// 补全对应列的库->表->索引信息到IndexMeta
		// 这将在后面用于判断某一列是否为主键或单列唯一索引,快速返回散粒度
		if col.DB == "" {
			col.DB = idxAdv.vEnv.Database
		}
		realDB := idxAdv.vEnv.DBHash(col.DB)
		if idxAdv.IndexMeta[realDB] == nil {
			idxAdv.IndexMeta[realDB] = make(map[string]*database.TableIndexInfo)
		}

		if idxAdv.IndexMeta[realDB][col.Table] == nil {
			tmpDB.Database = realDB
			indexInfo, err := tmpDB.ShowIndex(col.Table)
			if err != nil {
				// 如果是不存在的表就会报错,报错的可能性有三个:
				// 1.数据库错误  2.表不存在  3.临时表
				// 而这三种错误都是不需要在这一层关注的,直接跳过
				common.Log.Debug("calcCardinality error: %v", err)
				continue
			}

919
			// 将获取的索引信息以db.tb 维度组织到 IndexMeta 中
martianzhang's avatar
martianzhang 已提交
920 921 922 923 924 925 926 927 928 929 930 931 932 933 934 935 936 937 938 939 940 941 942 943 944 945 946 947 948 949 950 951 952 953 954 955 956 957 958 959 960 961 962 963 964 965 966 967 968 969 970 971 972 973 974 975 976 977 978 979 980 981 982 983 984 985 986 987 988 989 990 991 992 993 994 995 996 997 998 999
			idxAdv.IndexMeta[realDB][col.Table] = indexInfo
		}

		// 检查对应列是否为主键或单列唯一索引,如果满足直接返回1,不再重复计算,提高效率
		// 多列复合唯一索引不能跳过计算,单列普通索引不能跳过计算
		for _, index := range idxAdv.IndexMeta[realDB][col.Table].IdxRows {
			// 根据索引的名称判断该索引包含的列数,列数大于1即为复合索引
			columnCount := len(idxAdv.IndexMeta[realDB][col.Table].FindIndex(database.IndexKeyName, index.KeyName))
			if col.Name == index.ColumnName {
				// 主键、唯一键 无需计算散粒度
				if (index.KeyName == "PRIMARY" || index.NonUnique == 0) && columnCount == 1 {
					common.Log.Debug("column '%s' is PK or UK, no need to calculate cardinality.", col.Name)
					col.Cardinality = 1
					break
				}
			}

		}

		// 给非 PRIMARY、UNIQUE 的列计算散粒度
		if col.Cardinality != 1 {
			col.Cardinality = idxAdv.vEnv.ColumnCardinality(col.Table, col.Name)
		}
	}

	return cols
}

// Format 用于格式化输出索引建议
func (idxAdvs IndexAdvises) Format() map[string]Rule {
	rulesMap := make(map[string]Rule)
	number := 1
	rules := make(map[string]*Rule)
	sqls := make(map[string][]string)

	for _, advise := range idxAdvs {
		advKey := advise.Database + advise.Table

		if _, ok := sqls[advKey]; !ok {
			sqls[advKey] = make([]string, 0)
		}

		sqls[advKey] = append(sqls[advKey], advise.DDL)

		if _, ok := rules[advKey]; !ok {
			summary := fmt.Sprintf("为%s库的%s表添加索引", advise.Database, advise.Table)
			if advise.Database == "" {
				summary = fmt.Sprintf("为%s表添加索引", advise.Table)
			}

			rules[advKey] = &Rule{
				Summary:  summary,
				Content:  "",
				Severity: "L2",
			}
		}

		for _, col := range advise.ColumnDetails {
			// 为了更好地显示效果
			if common.Config.Sampling {
				cardinal := fmt.Sprintf("%0.2f", col.Cardinality*100)
				if cardinal != "0.00" {
					rules[advKey].Content += fmt.Sprintf("为列%s添加索引,散粒度为: %s%%; ",
						col.Name, cardinal)
				}
			} else {
				rules[advKey].Content += fmt.Sprintf("为列%s添加索引;", col.Name)
			}
		}
		// 清理多余的标点
		rules[advKey].Content = strings.Trim(rules[advKey].Content, common.Config.Delimiter)
	}

	for adv := range rules {
		key := fmt.Sprintf("IDX.%03d", number)
		ddl := ast.MergeAlterTables(sqls[adv]...)
		// 由于传入合并的SQL都是一张表的,所以一定只会输出一条ddl语句
		for _, v := range ddl {
			rules[adv].Case = v
		}
L
liipx 已提交
1000 1001 1002 1003

		// set item
		rules[adv].Item = key

martianzhang's avatar
martianzhang 已提交
1004 1005 1006 1007 1008 1009 1010 1011 1012 1013 1014 1015 1016 1017 1018 1019 1020 1021
		rulesMap[key] = *rules[adv]

		number++
	}

	return rulesMap
}

// HeuristicCheck 依赖数据字典的启发式检查
// IndexAdvisor会构建测试环境和数据字典,所以放在这里实现
func (idxAdv *IndexAdvisor) HeuristicCheck(q Query4Audit) map[string]Rule {
	var rule Rule
	heuristicSuggest := make(map[string]Rule)
	if common.Config.OnlineDSN.Disable && common.Config.TestDSN.Disable {
		return heuristicSuggest
	}

	ruleFuncs := []func(*IndexAdvisor) Rule{
L
liipx 已提交
1022
		(*IndexAdvisor).RuleMaxTextColsCount,   // COL.007
martianzhang's avatar
martianzhang 已提交
1023
		(*IndexAdvisor).RuleImplicitConversion, // ARG.003
L
liipx 已提交
1024 1025 1026
		(*IndexAdvisor).RuleGroupByConst,       // CLA.004
		(*IndexAdvisor).RuleOrderByConst,       // CLA.005
		(*IndexAdvisor).RuleUpdatePrimaryKey,   // CLA.016
martianzhang's avatar
martianzhang 已提交
1027 1028 1029 1030 1031 1032 1033 1034 1035 1036 1037 1038 1039 1040 1041 1042 1043 1044 1045 1046 1047 1048 1049 1050 1051 1052 1053 1054 1055 1056 1057 1058 1059 1060 1061
		// (*IndexAdvisor).RuleImpossibleOuterJoin, // TODO: JOI.003, JOI.004
	}

	for _, f := range ruleFuncs {
		rule = f(idxAdv)
		if rule.Item != "OK" {
			heuristicSuggest[rule.Item] = rule
		}
	}
	return heuristicSuggest
}

// DuplicateKeyChecker 对所有用到的库表检查是否存在重复索引
func DuplicateKeyChecker(conn *database.Connector, databases ...string) map[string]Rule {
	common.Log.Debug("Enter:  DuplicateKeyChecker, Caller: %s", common.Caller())
	// 复制一份online connector,防止环境切换影响其他功能的使用
	tmpOnline := *conn
	ruleMap := make(map[string]Rule)
	number := 1

	// 错误处理,用于汇总所有的错误
	funcErrCheck := func(err error) {
		if err != nil {
			if sug, ok := ruleMap["ERR.003"]; ok {
				sug.Content += fmt.Sprintf("; %s", err.Error())
			} else {
				ruleMap["ERR.003"] = Rule{
					Item:     "ERR.003",
					Severity: "L8",
					Content:  err.Error(),
				}
			}
		}
	}

1062
	// 不指定 DB 的时候检查 online dsn 中的 DB
martianzhang's avatar
martianzhang 已提交
1063 1064 1065 1066 1067 1068 1069 1070 1071 1072 1073 1074 1075 1076 1077 1078 1079 1080 1081 1082 1083 1084 1085 1086 1087 1088 1089 1090 1091 1092 1093 1094 1095 1096 1097 1098 1099 1100 1101 1102 1103 1104 1105 1106 1107 1108 1109 1110 1111 1112 1113 1114 1115 1116 1117 1118 1119 1120 1121 1122 1123 1124 1125 1126 1127 1128 1129 1130 1131 1132 1133 1134 1135 1136 1137 1138 1139
	if len(databases) == 0 {
		databases = append(databases, tmpOnline.Database)
	}

	for _, db := range databases {
		// 获取所有的表
		tmpOnline.Database = db
		tables, err := tmpOnline.ShowTables()

		if err != nil {
			funcErrCheck(err)
			if !common.Config.DryRun {
				return ruleMap
			}
		}

		for _, tb := range tables {
			// 获取表中所有的索引
			idxMap := make(map[string][]*common.Column)
			idxInfo, err := tmpOnline.ShowIndex(tb)
			if err != nil {
				funcErrCheck(err)
				if !common.Config.DryRun {
					return ruleMap
				}
			}

			// 枚举所有的索引信息,提取用到的列
			for _, idx := range idxInfo.IdxRows {
				if _, ok := idxMap[idx.KeyName]; !ok {
					idxMap[idx.KeyName] = make([]*common.Column, 0)
					for _, col := range idxInfo.FindIndex(database.IndexKeyName, idx.KeyName) {
						idxMap[idx.KeyName] = append(idxMap[idx.KeyName], &common.Column{
							Name:  col.ColumnName,
							Table: tb,
							DB:    db,
						})
					}
				}
			}

			// 对索引进行重复检查
			hasDup := false
			content := ""

			for k1, cl1 := range idxMap {
				for k2, cl2 := range idxMap {
					if k1 != k2 && common.IsColsPart(cl1, cl2) {
						hasDup = true
						col1Str := common.JoinColumnsName(cl1, ", ")
						col2Str := common.JoinColumnsName(cl2, ", ")
						content += fmt.Sprintf("索引%s(%s)与%s(%s)重复;", k1, col1Str, k2, col2Str)
						common.Log.Debug(" %s.%s has duplicate index %s(%s) <--> %s(%s)", db, tb, k1, col1Str, k2, col2Str)
					}
				}
				delete(idxMap, k1)
			}

			// TODO 重复索引检查添加对约束及索引的判断,提供重复索引的删除功能
			if hasDup {
				tmpOnline.Database = db
				ddl, _ := tmpOnline.ShowCreateTable(tb)
				key := fmt.Sprintf("IDX.%03d", number)
				ruleMap[key] = Rule{
					Item:     key,
					Severity: "L2",
					Summary:  fmt.Sprintf("%s.%s存在重复的索引", db, tb),
					Content:  content,
					Case:     ddl,
				}
				number++
			}
		}
	}

	return ruleMap
}