diff --git a/examples/go/demo.go b/examples/go/demo.go new file mode 100644 index 0000000000000000000000000000000000000000..d29d3d736b238b1351e5b15986d1544a4e2110c6 --- /dev/null +++ b/examples/go/demo.go @@ -0,0 +1,573 @@ +package main + +import ( + "container/heap" + "database/sql" + "database/sql/driver" + "flag" + "fmt" + "reflect" + "strconv" + "strings" + "sync" + "time" + + _ "github.com/taosdata/driver-go/v2/taosSql" +) + +type heapElem struct { + timeout int64 + colName string +} + +type MinHeap []heapElem + +type Column struct { +} + +func (h MinHeap) Len() int { + return len(h) +} + +func (h MinHeap) Less(i, j int) bool { + res := h[i].timeout - h[j].timeout + if res < 0 { + return true + } else if res > 0 { + return false + } + + cmp := strings.Compare(h[i].colName, h[j].colName) + if cmp <= 0 { + return true + } else { + return false + } +} + +func (h *MinHeap) Swap(i, j int) { + (*h)[i], (*h)[j] = (*h)[j], (*h)[i] +} + +func (h *MinHeap) Push(x interface{}) { + *h = append(*h, x.(heapElem)) +} + +func (h *MinHeap) Empty() bool { + if len(*h) == 0 { + return true + } + return false +} + +func (h *MinHeap) Top() heapElem { + return (*h)[0] +} + +func (h *MinHeap) Pop() interface{} { + res := (*h)[len(*h)-1] + *h = (*h)[:len(*h)-1] + return res +} + +type config struct { + hostName string + serverPort int + user string + password string + dbName string + srcdbName string + supTblName string +} + +var configPara config +var taosDriverName = "taosSql" +var url string + +func init() { + flag.StringVar(&configPara.hostName, "h", "127.0.0.1", "The host to connect to TDengine server.") + flag.IntVar(&configPara.serverPort, "p", 6030, "The TCP/IP port number to use for the connection to TDengine server.") + flag.StringVar(&configPara.user, "u", "root", "The TDengine user name to use when connecting to the server.") + flag.StringVar(&configPara.password, "P", "taosdata", "The password to use when connecting to the server.") + flag.StringVar(&configPara.dbName, "d", "test1", "check database.") + flag.StringVar(&configPara.srcdbName, "s", "test", "Destination database.") + flag.Parse() + +} + +func checkErr(err error, prompt string) { + if err != nil { + fmt.Printf("%s\n", prompt) + panic(err) + } +} + +type schema struct { + idx int + numOfField int + timestamp time.Time + colName string + interval int32 + threshold int32 +} +type demo struct { + db *sql.DB + + dbname string + srcdbname string + metaTable string + exceptTable string + + dStartTs int64 + dInterval int32 + dThreshold int32 + + suptabname string + + metaDict map[string]*schema + heap MinHeap + timer *time.Timer + + wg *sync.WaitGroup +} + +/*** + +|ts |colName |interval |threshold| +|now |stbx.tx.colx|2 |5 | +|now+1|stbx.tx.colx|2 |5 | +|now+2|stbx.tx.colx|2 |5 | + +***/ + +type taskInfo struct { + wg *sync.WaitGroup + subtask map[string]*demo +} + +type tableInfo struct { + tbname string + createTime string + columns int + stbname string + uid int64 + tid int64 + vgId int32 +} + +func GetSubTableInfo(db *sql.DB, dbname, stbname string) []tableInfo { + tbs := make([]tableInfo, 0, 512) + sql := "show " + dbname + ".tables" + + row, err := db.Query(sql) + if err != nil { + checkErr(err, sql) + } + + for row.Next() { + var ( + tbname string + createTime string + columns int + stb string + uid int64 + tid int64 + vgId int32 + ) + err := row.Scan(&tbname, &createTime, &columns, &stb, &uid, &tid, &vgId) + if err != nil { + checkErr(err, sql) + } + + if len(stbname) == 0 { + // skip normal table + if len(stb) == 0 || strings.Compare(stb, "") == 0 { + continue + } + tbs = append(tbs, tableInfo{tbname: tbname, createTime: createTime, columns: columns, stbname: stb, uid: uid, tid: tid, vgId: vgId}) + continue + } + + if strings.Compare(stb, stbname) == 0 { + tbs = append(tbs, tableInfo{tbname: tbname, createTime: createTime, columns: columns, stbname: stbname, uid: uid, tid: tid, vgId: vgId}) + } + } + row.Close() + return tbs + +} +func GetStableField(db *sql.DB, dbname, stbname string) []string { + result := make([]string, 0, 10) + sql := "describe " + dbname + "." + stbname + row, err := db.Query(sql) + if err != nil { + checkErr(err, sql) + } + count := 0 + for row.Next() { + var field string + var ty string + var tlen int32 + var note string + row.Scan(&field, &ty, &tlen, ¬e) + + // ignore time and tag col + if count != 0 && strings.Compare(note, "TAG") != 0 { + // skip first and skip tag col + result = append(result, field) + } + count = count + 1 + } + row.Close() + return result +} + +func taskInit(db *sql.DB, dbname string, srcdbname string, metatable string, exptable string, tskinfo *taskInfo) { + { + sql := fmt.Sprintf("create database if not exists %s update 2", dbname) + _, err := db.Exec(sql) + checkErr(err, sql) + } + { + sql := fmt.Sprintf("create stable if not exists %s.%s (ts timestamp, dbname binary(64), tabname binary(64), colname binary(64), lastTime timestamp, offline int) tags(tablename binary(128))", dbname, exptable) + _, err := db.Exec(sql) + checkErr(err, sql) + } + + { + sql := fmt.Sprintf("create table if not exists %s.%s (ts timestamp, dbname binary(64), tablename binary(64), colName binary(128), checkInterval int, threshold int)", dbname, metatable) + _, err := db.Exec(sql) + checkErr(err, sql) + } + + tbs := GetSubTableInfo(db, srcdbname, "") + fmt.Printf("tbs size %d\n", len(tbs)) + + fieldDict := make(map[string][]string) + + fieldTs := time.Now().Add(time.Hour * -1000) + for _, e := range tbs { + tbname := e.tbname + stbname := e.stbname + + field, ok := fieldDict[stbname] + if !ok { + field = GetStableField(db, srcdbname, stbname) + fieldDict[stbname] = field + } + + for _, f := range field { + insertTableInfoIntoMetatable := func(db *sql.DB, metaDB string, metaTable string, srcDB string, srcTable string, srcCol string, ts time.Time, interval, threshold int) { + sql := fmt.Sprintf("select * from %s.%s where dbname = \"%s\" and tablename = \"%s\" and colname = \"%s\"", metaDB, metaTable, srcDB, srcTable, srcCol) + row, err := db.Query(sql) + if err != nil { + checkErr(err, sql) + } + + count := 0 + for row.Next() { + count = count + 1 + if count >= 1 { + row.Close() + break + } + } + + if count == 0 { + sql := fmt.Sprintf("insert into %s.%s values(%v, \"%s\", \"%s\", \"%s\", %d, %d)", metaDB, metaTable, ts.UnixMilli(), srcDB, srcTable, srcCol, interval, threshold) + _, err := db.Exec(sql) + if err != nil { + checkErr(err, sql) + } + } + } + insertTableInfoIntoMetatable(db, dbname, metatable, srcdbname, tbname, f, fieldTs, 2, 2) + fieldTs = fieldTs.Add(time.Millisecond * 2) + } + + key := fmt.Sprintf("%s_%s", srcdbname, stbname) + _, ok = tskinfo.subtask[key] + if !ok { + tskinfo.subtask[key] = &demo{db: db, dbname: dbname, srcdbname: srcdbname, suptabname: stbname, metaTable: metatable, exceptTable: exptable, wg: tskinfo.wg} + } + } +} + +func subTaskStart(d *demo) { + + d.Init() + for { + select { + case <-d.timer.C: + timeout := d.NextTimout() + fmt.Printf("stbname %s, timeout %d\n", d.suptabname, timeout) + d.timer.Reset(time.Second * time.Duration(timeout)) + } + } + d.wg.Done() +} + +func (d *demo) Init() { + d.heap = make(MinHeap, 0, 200) + heap.Init(&d.heap) + d.metaDict = make(map[string]*schema) + + tbs := GetSubTableInfo(d.db, d.srcdbname, d.suptabname) + fields := GetStableField(d.db, d.srcdbname, d.suptabname) + + lastRowDict := func(db *sql.DB, srcDB, stbname string) map[string]time.Time { + result := make(map[string]time.Time) + sql := fmt.Sprintf("select last_row(ts) from %s.%s group by tbname", srcDB, stbname) + row, err := d.db.Query(sql) + if err != nil { + checkErr(err, sql) + } + for row.Next() { + var ts time.Time + var tbname string + row.Scan(&ts, &tbname) + result[tbname] = ts + } + row.Close() + return result + }(d.db, d.srcdbname, d.suptabname) + + for _, e := range tbs { + tbname := e.tbname + lastTime, ok := lastRowDict[tbname] + if !ok { + lastTime = time.Now() + } + + for i, f := range fields { + col := fmt.Sprintf("%s %s", tbname, f) + var ( + ts time.Time + dbname string + tablename string + colname string + checkinterval int + threshold int + ) + checkSql := fmt.Sprintf("select * from %s.%s where dbname = \"%s\" and tablename = \"%s\" and colname = \"%s\"", d.dbname, d.metaTable, d.srcdbname, tbname, f) + checkRow, err := d.db.Query(checkSql) + if err != nil { + checkErr(err, checkSql) + } + + for checkRow.Next() { + _ = checkRow.Scan(&ts, &dbname, &tablename, &colname, &checkinterval, &threshold) + d.metaDict[col] = &schema{idx: i, numOfField: len(fields), timestamp: lastTime, colName: col, interval: int32(checkinterval), threshold: int32(threshold)} + } + defer checkRow.Close() + } + } + + now := time.Now() + for k, v := range d.metaDict { + durtion := fmt.Sprintf("%ds", v.interval) + s, _ := time.ParseDuration(durtion) + now.Add(s) + heap.Push(&d.heap, heapElem{timeout: now.Unix(), colName: k}) + } + + d.timer = time.NewTimer(time.Second * 1) +} + +type ValueRows struct { + column []interface{} + ts time.Time + tbname string +} + +func (d *demo) Update(stbname, tbname, col string, interval int32, threshold int32) { + key := fmt.Sprintf("%s %s", tbname, col) + sql := fmt.Sprintf("select * from %s.%s where dbname = \"%s\" and tablename = \"%s\" and colName = \"%s\"", d.dbname, d.metaTable, d.dbname, tbname, col) + rows, _ := d.db.Query(sql) + fmt.Printf("check metatable %s, SQL: %s\n", d.metaTable, sql) + for rows.Next() { + var ( + ts time.Time + dbname string + tbname string + col string + inter int32 + thresh int32 + ) + + err := rows.Scan(&ts, &dbname, &tbname, &col, &inter, &thresh) + if interval != inter || threshold != thresh { + sql := fmt.Sprintf("insert into %s.%s values(%v, \"%s\", \"%s\", \"%s\", %d, %d)", d.dbname, d.metaTable, ts.UnixMilli(), d.dbname, tbname, col, interval, threshold) + _, err = d.db.Exec(sql) + if err != nil { + checkErr(err, sql) + } + } + + } + + schemadata := d.metaDict[key] + if schemadata != nil { + schemadata.interval = interval + schemadata.threshold = threshold + } + defer rows.Close() +} + +func (d *demo) NextTimout() int32 { + now := time.Now().Unix() + colArray := make([]string, 0, 10) + + for !d.heap.Empty() { + elem := d.heap.Top() + if elem.timeout <= now { + colArray = append(colArray, elem.colName) + heap.Pop(&d.heap) + } else { + break + } + } + + lastRowGroup, colIdx := func(db *sql.DB, srcDB, stbname string) (map[string]*ValueRows, map[string]int) { + result := make(map[string]*ValueRows) + colIdx := make(map[string]int) + + sql := fmt.Sprintf("select last_row(*) from %s.%s group by tbname", srcDB, stbname) + row, err := db.Query(sql) + if err != nil { + checkErr(err, sql) + } + tt, err := row.ColumnTypes() + types := make([]reflect.Type, len(tt)) + for i, tp := range tt { + st := tp.ScanType() + types[i] = st + } + columns, _ := row.Columns() + + for row.Next() { + values := make([]interface{}, len(tt)) + for i := range values { + values[i] = reflect.New(types[i]).Interface() + } + row.Scan(values...) + + ts, _ := values[0].(driver.Valuer).Value() + tts, _ := ts.(time.Time) + + tbname, _ := values[len(tt)-1].(driver.Valuer).Value() + ttbname, _ := tbname.(string) + + result[ttbname] = &ValueRows{column: values, ts: tts, tbname: ttbname} + } + + row.Close() + + for i, v := range columns { + colIdx[v] = i + } + return result, colIdx + }(d.db, d.srcdbname, d.suptabname) + + for _, e := range colArray { + elem := d.metaDict[e] + var colName string + var tabName string + fmt.Sscanf(e, "%s %s", &tabName, &colName) + + ts, update := func(rowGroup map[string]*ValueRows, colIdx map[string]int, tabName, colName string) (time.Time, bool) { + var ts time.Time + update := false + + field := fmt.Sprintf("last_row(%s)", colName) + idx, ok1 := colIdx[field] + row, ok2 := rowGroup[tabName] + if ok1 && ok2 { + if row != nil { + v, _ := row.column[idx].(driver.Valuer).Value() + if v != nil { + ts = row.ts + update = true + } + } + } + return ts, update + }(lastRowGroup, colIdx, tabName, colName) + + if !update { + ts = elem.timestamp + } + + //fmt.Printf("Get time tbname: %s, colname:%s, current %v, lasttime %v\n", tabName, colName, ts.UnixMilli(), elem.timestamp.UnixMilli()) + exceptTableName := fmt.Sprintf("%s_%s_%s", d.suptabname, tabName, colName) + + var dura time.Duration = ts.Sub(elem.timestamp) + cost := int32(dura.Seconds()) + if cost == 0 { + elem.timestamp = ts + sql := fmt.Sprintf("insert into %s.%s using %s.%s tags(\"%s\") values(%v, \"%s\", \"%s\", \"%s\", %v, %d)", d.dbname, exceptTableName, d.dbname, d.exceptTable, exceptTableName, time.Now().UnixMilli(), d.srcdbname, tabName, colName, ts.UnixMilli(), int(time.Now().Sub(elem.timestamp).Seconds())) + fmt.Printf("INSERT SQL: %s\n", sql) + _, err := d.db.Exec(sql) + if err != nil { + checkErr(err, sql) + } + } else { + elem.timestamp = ts + + if cost > elem.threshold { + sql := fmt.Sprintf("insert into %s.%s using %s.%s tags(\"%s\") values(%v, \"%s\", \"%s\", \"%s\", %v, %d)", d.dbname, exceptTableName, d.dbname, d.exceptTable, exceptTableName, time.Now().UnixMilli(), d.srcdbname, tabName, colName, ts.UnixMilli(), int(time.Now().Sub(elem.timestamp).Seconds())) + fmt.Printf("INSERT SQL: %s\n", sql) + _, err := d.db.Exec(sql) + if err != nil { + checkErr(err, sql) + } + } else { + //fmt.Printf("C dura %d, threshold %d not insert \n", cost, elem.threshold) + } + } + heap.Push(&d.heap, heapElem{timeout: int64(elem.interval) + now, colName: e}) + } + + if !d.heap.Empty() { + elem := d.heap.Top() + timeout := elem.timeout - now + if timeout < 1 { + timeout = 1 + } + return int32(timeout) + } + return 1 + +} + +func printAllArgs() { + fmt.Printf("\n============= args parse result: =============\n") + fmt.Printf("hostName: %v\n", configPara.hostName) + fmt.Printf("serverPort: %v\n", configPara.serverPort) + fmt.Printf("usr: %v\n", configPara.user) + fmt.Printf("password: %v\n", configPara.password) + fmt.Printf("dbName: %v\n", configPara.dbName) + fmt.Printf("srcDbName: %v\n", configPara.srcdbName) + fmt.Printf("stbNme: %v\n", configPara.supTblName) + fmt.Printf("================================================\n") +} + +func main() { + + printAllArgs() + url = "root:taosdata@/tcp(" + configPara.hostName + ":" + strconv.Itoa(configPara.serverPort) + ")/" + + db, err := sql.Open(taosDriverName, url) + if err != nil { + checkErr(err, "failed to connect db") + } + wg := sync.WaitGroup{} + info := &taskInfo{subtask: make(map[string]*demo), wg: &wg} + + taskInit(db, configPara.dbName, configPara.srcdbName, "metatable", "exptable", info) + for _, v := range info.subtask { + wg.Add(1) + go subTaskStart(v) + } + wg.Wait() +} diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index 933334a5c8e61fefe27390f780bccfa757749d11..70fd60f1c9d744c7f2ea30db9b6eac49d53f6d7c 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -2955,7 +2955,8 @@ int tscProcessQueryRsp(SSqlObj *pSql) { SQueryTableRsp *pQueryAttr = (SQueryTableRsp *)pRes->pRsp; if (pQueryAttr == NULL) { - pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY; + tscError("0x%"PRIx64" invalid NULL query rsp received", pSql->self); + pRes->code = TSDB_CODE_QRY_APP_ERROR; return pRes->code; } diff --git a/src/common/inc/tglobal.h b/src/common/inc/tglobal.h index bcc0d68f95cbd9ec884df5e87b3ec10bd5463883..fc014df6359116b55e156cf738d82957c3d8845b 100644 --- a/src/common/inc/tglobal.h +++ b/src/common/inc/tglobal.h @@ -69,6 +69,7 @@ extern char tsTempDir[]; extern int32_t tsShortcutFlag; extern int32_t tsMaxSqlGroups; extern int8_t tsSortWhenGroupBy; +extern int32_t tsQueryRssThreshold; // query buffer management extern int32_t tsQueryBufferSize; // maximum allowed usage buffer size in MB for each data node during query processing diff --git a/src/common/src/tglobal.c b/src/common/src/tglobal.c index e49ed4caa54a73e79bbb2ee745dfb4a4b96360c5..58f144c5871ff7ec6b32095d05d06f92fc887dc8 100644 --- a/src/common/src/tglobal.c +++ b/src/common/src/tglobal.c @@ -124,6 +124,9 @@ int32_t tsMaxSqlGroups = 1000000; // order by first group by column when group by int8_t tsSortWhenGroupBy = 1; +// memory rss thresold for creating new query in MB. 0 means no threshold limitation +int32_t tsQueryRssThreshold = 0; + int32_t tsProjectExecInterval = 10000; // every 10sec, the projection will be executed once int64_t tsMaxRetentWindow = 24 * 3600L; // maximum time window tolerance @@ -1828,6 +1831,16 @@ static void doInitGlobalConfig(void) { cfg.unitType = TAOS_CFG_UTYPE_NONE; taosInitConfigOption(cfg); + cfg.option = "queryRssThreshold"; + cfg.ptr = &tsQueryRssThreshold; + cfg.valType = TAOS_CFG_VTYPE_INT32; + cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG; + cfg.minValue = 0; + cfg.maxValue = 2048*1024; + cfg.ptrLength = 0; + cfg.unitType = TAOS_CFG_UTYPE_NONE; + taosInitConfigOption(cfg); + #ifdef TD_TSZ // lossy compress cfg.option = "lossyColumns"; diff --git a/src/inc/taoserror.h b/src/inc/taoserror.h index fc387d331b07fcf407ade3305d1874ef2b5c8c31..40f1c3096ea025fe7ec16b3696eab6248644bb50 100644 --- a/src/inc/taoserror.h +++ b/src/inc/taoserror.h @@ -298,6 +298,7 @@ int32_t* taosGetErrno(); #define TSDB_CODE_QRY_INVALID_TIME_CONDITION TAOS_DEF_ERROR_CODE(0, 0x070E) //"invalid time condition") #define TSDB_CODE_QRY_INVALID_SCHEMA_VERSION TAOS_DEF_ERROR_CODE(0, 0x0710) //"invalid schema version") #define TSDB_CODE_QRY_RESULT_TOO_LARGE TAOS_DEF_ERROR_CODE(0, 0x0711) //"result num is too large") +#define TSDB_CODE_QRY_RSS_THRESHOLD TAOS_DEF_ERROR_CODE(0, 0x0712) //"query memory rss threshold") // grant #define TSDB_CODE_GRANT_EXPIRED TAOS_DEF_ERROR_CODE(0, 0x0800) //"License expired" diff --git a/src/kit/taos-tools b/src/kit/taos-tools index 64350feba73d584f920474d8abcd913491081897..e00ebd951fbe96c1fde346ae95359982f19a7604 160000 --- a/src/kit/taos-tools +++ b/src/kit/taos-tools @@ -1 +1 @@ -Subproject commit 64350feba73d584f920474d8abcd913491081897 +Subproject commit e00ebd951fbe96c1fde346ae95359982f19a7604 diff --git a/src/mnode/src/mnodeTable.c b/src/mnode/src/mnodeTable.c index 6adb3e26ed30f3436ca00ecaddea50e6c0b36361..424e995c04a7714a9328c5097fe76553fe02d13e 100644 --- a/src/mnode/src/mnodeTable.c +++ b/src/mnode/src/mnodeTable.c @@ -236,6 +236,41 @@ static int32_t mnodeChildTableActionDelete(SSdbRow *pRow) { return TSDB_CODE_SUCCESS; } +static void mnodeTableActionUpdateTimeSeries(const char *tableId, int64_t diffCols) { + if (diffCols > 0) { + grantAdd(TSDB_GRANT_TIMESERIES, diffCols); + } else if (diffCols < 0) { + grantRestore(TSDB_GRANT_TIMESERIES, -diffCols); + } else { + return; + } + + if (tableId) { + SName sName = {0}; + char db[TSDB_DB_NAME_LEN] = {0}; + SDbObj *pDb = NULL; + SAcctObj *pAcct = NULL; + + if (tNameFromString(&sName, tableId, T_NAME_ACCT | T_NAME_DB)) { + mWarn("table:%s, failed to get SName to update timeseries", tableId); + return; + } + if (tNameExtractFullName((const SName *)&sName, db)) { + mWarn("table:%s, failed to extract SName to update timeseries", tableId); + return; + } + if (!(pDb = mnodeGetDb(db))) { + mWarn("table:%s, db:%s not exist to update timeseries", tableId, db); + return; + }; + if (!(pAcct = mnodeGetAcct(pDb->acct))) { + mWarn("table:%s, acct:%s not exist to update timeseries", tableId, pDb->acct); + return; + } + if (pAcct) pAcct->acctInfo.numOfTimeSeries += diffCols; + } +} + static int32_t mnodeChildTableActionUpdate(SSdbRow *pRow) { SCTableObj *pNew = pRow->pObj; SCTableObj *pTable = mnodeGetChildTable(pNew->info.tableId); @@ -246,6 +281,10 @@ static int32_t mnodeChildTableActionUpdate(SSdbRow *pRow) { void *oldSTable = pTable->superTable; int32_t oldRefCount = pTable->refCount; + if (pTable->info.type == TSDB_NORMAL_TABLE) { + mnodeTableActionUpdateTimeSeries(pTable->info.tableId, pNew->numOfColumns - pTable->numOfColumns); + } + memcpy(pTable, pNew, sizeof(SCTableObj)); pTable->refCount = oldRefCount; @@ -528,6 +567,9 @@ static int32_t mnodeSuperTableActionUpdate(SSdbRow *pRow) { int32_t oldRefCount = pTable->refCount; int32_t oldNumOfTables = pTable->numOfTables; + mnodeTableActionUpdateTimeSeries(pTable->info.tableId, + (int64_t)oldNumOfTables * (pNew->numOfColumns - pTable->numOfColumns)); + memcpy(pTable, pNew, sizeof(SSTableObj)); pTable->vgHash = oldVgHash; @@ -1464,6 +1506,7 @@ static int32_t mnodeAddSuperTableColumn(SMnodeMsg *pMsg, SSchema schema[], int32 pAcct->acctInfo.numOfTimeSeries += (ncols * pStable->numOfTables); mnodeDecAcctRef(pAcct); } + grantAdd(TSDB_GRANT_TIMESERIES, (uint64_t)ncols * pStable->numOfTables); mInfo("msg:%p, app:%p stable %s, start to add column", pMsg, pMsg->rpcMsg.ahandle, pStable->info.tableId); @@ -1514,6 +1557,7 @@ static int32_t mnodeDropSuperTableColumn(SMnodeMsg *pMsg, char *colName) { pAcct->acctInfo.numOfTimeSeries -= pStable->numOfTables; mnodeDecAcctRef(pAcct); } + grantRestore(TSDB_GRANT_TIMESERIES, pStable->numOfTables); mInfo("msg:%p, app:%p stable %s, start to delete column", pMsg, pMsg->rpcMsg.ahandle, pStable->info.tableId); @@ -2471,6 +2515,7 @@ static int32_t mnodeAddNormalTableColumn(SMnodeMsg *pMsg, SSchema schema[], int3 pAcct->acctInfo.numOfTimeSeries += ncols; mnodeDecAcctRef(pAcct); } + grantAdd(TSDB_GRANT_TIMESERIES, ncols); mInfo("msg:%p, app:%p ctable %s, start to add column", pMsg, pMsg->rpcMsg.ahandle, pTable->info.tableId); monSaveAuditLog(MON_DDL_CMD_ADD_COLUMN, mnodeGetUserFromMsg(pMsg), pTable->info.tableId, true); @@ -2506,6 +2551,7 @@ static int32_t mnodeDropNormalTableColumn(SMnodeMsg *pMsg, char *colName) { pAcct->acctInfo.numOfTimeSeries--; mnodeDecAcctRef(pAcct); } + grantRestore(TSDB_GRANT_TIMESERIES, 1); mInfo("msg:%p, app:%p ctable %s, start to drop column %s", pMsg, pMsg->rpcMsg.ahandle, pTable->info.tableId, colName); monSaveAuditLog(MON_DDL_CMD_DROP_COLUMN, mnodeGetUserFromMsg(pMsg), pTable->info.tableId, true); diff --git a/src/query/src/queryMain.c b/src/query/src/queryMain.c index cb89649c3cc93f00021bd9a1c747d44b5e699a23..ed63d0f65c7462adcd2673b7585fe890c52afee6 100644 --- a/src/query/src/queryMain.c +++ b/src/query/src/queryMain.c @@ -77,6 +77,15 @@ int32_t qCreateQueryInfo(void* tsdb, int32_t vgId, SQueryTableMsg* pQueryMsg, qi if (code != TSDB_CODE_SUCCESS) { goto _over; } + + float procMemory = 0; + if (taosGetProcMemory(&procMemory)) { + if (tsQueryRssThreshold > 0 && procMemory >= tsQueryRssThreshold) { + qError("Exceeds query memory RSS threshold. RSS: %f, threshold: %d", procMemory, tsQueryRssThreshold); + code = TSDB_CODE_QRY_RSS_THRESHOLD; + goto _over; + } + } if (pQueryMsg->numOfTables <= 0) { qError("Invalid number of tables to query, numOfTables:%d", pQueryMsg->numOfTables); diff --git a/src/tsdb/src/tsdbRead.c b/src/tsdb/src/tsdbRead.c index 9924706f368dff0b3b32e6d6d7df6ee20e693686..fdafd6a15fdd0ac2ea3006d799b29707ffc1ef3b 100644 --- a/src/tsdb/src/tsdbRead.c +++ b/src/tsdb/src/tsdbRead.c @@ -627,7 +627,7 @@ static int32_t lazyLoadCacheLast(STsdbQueryHandle* pQueryHandle) { if (!pQueryHandle->pTableCheckInfo) { tsdbError("%p table check info is NULL", pQueryHandle); - terrno = TSDB_CODE_QRY_APP_ERROR; +// terrno = TSDB_CODE_QRY_APP_ERROR; return -1; } diff --git a/src/util/inc/tconfig.h b/src/util/inc/tconfig.h index d6f4c79ddd8c0d22149eeead5ef23887aae69b04..78e3a782bb226d2cbbaf4bc40579d7fed076e327 100644 --- a/src/util/inc/tconfig.h +++ b/src/util/inc/tconfig.h @@ -20,7 +20,7 @@ extern "C" { #endif -#define TSDB_CFG_MAX_NUM 141 +#define TSDB_CFG_MAX_NUM 142 #define TSDB_CFG_PRINT_LEN 23 #define TSDB_CFG_OPTION_LEN 24 #define TSDB_CFG_VALUE_LEN 41 diff --git a/src/util/src/terror.c b/src/util/src/terror.c index f66152988ee31543cd559c050fba82d3353aa9fa..c217d97b54586e6e63ef7b447e62290d350ead33 100644 --- a/src/util/src/terror.c +++ b/src/util/src/terror.c @@ -304,6 +304,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_QRY_INCONSISTAN, "File inconsistance in TAOS_DEFINE_ERROR(TSDB_CODE_QRY_INVALID_TIME_CONDITION, "One valid time range condition expected") TAOS_DEFINE_ERROR(TSDB_CODE_QRY_SYS_ERROR, "System error") TAOS_DEFINE_ERROR(TSDB_CODE_QRY_RESULT_TOO_LARGE, "result num is too large") +TAOS_DEFINE_ERROR(TSDB_CODE_QRY_RSS_THRESHOLD, "Exceed Memory RSS thresold when creating query") // grant TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_EXPIRED, "License expired")