提交 e3a1979b 编写于 作者: K kailixu

Merge branch '2.6' into feat/TS-1883-2.6

package main
import (
"container/heap"
"database/sql"
"database/sql/driver"
"flag"
"fmt"
"reflect"
"strconv"
"strings"
"sync"
"time"
_ "github.com/taosdata/driver-go/v2/taosSql"
)
type heapElem struct {
timeout int64
colName string
}
type MinHeap []heapElem
type Column struct {
}
func (h MinHeap) Len() int {
return len(h)
}
func (h MinHeap) Less(i, j int) bool {
res := h[i].timeout - h[j].timeout
if res < 0 {
return true
} else if res > 0 {
return false
}
cmp := strings.Compare(h[i].colName, h[j].colName)
if cmp <= 0 {
return true
} else {
return false
}
}
func (h *MinHeap) Swap(i, j int) {
(*h)[i], (*h)[j] = (*h)[j], (*h)[i]
}
func (h *MinHeap) Push(x interface{}) {
*h = append(*h, x.(heapElem))
}
func (h *MinHeap) Empty() bool {
if len(*h) == 0 {
return true
}
return false
}
func (h *MinHeap) Top() heapElem {
return (*h)[0]
}
func (h *MinHeap) Pop() interface{} {
res := (*h)[len(*h)-1]
*h = (*h)[:len(*h)-1]
return res
}
type config struct {
hostName string
serverPort int
user string
password string
dbName string
srcdbName string
supTblName string
}
var configPara config
var taosDriverName = "taosSql"
var url string
func init() {
flag.StringVar(&configPara.hostName, "h", "127.0.0.1", "The host to connect to TDengine server.")
flag.IntVar(&configPara.serverPort, "p", 6030, "The TCP/IP port number to use for the connection to TDengine server.")
flag.StringVar(&configPara.user, "u", "root", "The TDengine user name to use when connecting to the server.")
flag.StringVar(&configPara.password, "P", "taosdata", "The password to use when connecting to the server.")
flag.StringVar(&configPara.dbName, "d", "test1", "check database.")
flag.StringVar(&configPara.srcdbName, "s", "test", "Destination database.")
flag.Parse()
}
func checkErr(err error, prompt string) {
if err != nil {
fmt.Printf("%s\n", prompt)
panic(err)
}
}
type schema struct {
idx int
numOfField int
timestamp time.Time
colName string
interval int32
threshold int32
}
type demo struct {
db *sql.DB
dbname string
srcdbname string
metaTable string
exceptTable string
dStartTs int64
dInterval int32
dThreshold int32
suptabname string
metaDict map[string]*schema
heap MinHeap
timer *time.Timer
wg *sync.WaitGroup
}
/***
|ts |colName |interval |threshold|
|now |stbx.tx.colx|2 |5 |
|now+1|stbx.tx.colx|2 |5 |
|now+2|stbx.tx.colx|2 |5 |
***/
type taskInfo struct {
wg *sync.WaitGroup
subtask map[string]*demo
}
type tableInfo struct {
tbname string
createTime string
columns int
stbname string
uid int64
tid int64
vgId int32
}
func GetSubTableInfo(db *sql.DB, dbname, stbname string) []tableInfo {
tbs := make([]tableInfo, 0, 512)
sql := "show " + dbname + ".tables"
row, err := db.Query(sql)
if err != nil {
checkErr(err, sql)
}
for row.Next() {
var (
tbname string
createTime string
columns int
stb string
uid int64
tid int64
vgId int32
)
err := row.Scan(&tbname, &createTime, &columns, &stb, &uid, &tid, &vgId)
if err != nil {
checkErr(err, sql)
}
if len(stbname) == 0 {
// skip normal table
if len(stb) == 0 || strings.Compare(stb, "") == 0 {
continue
}
tbs = append(tbs, tableInfo{tbname: tbname, createTime: createTime, columns: columns, stbname: stb, uid: uid, tid: tid, vgId: vgId})
continue
}
if strings.Compare(stb, stbname) == 0 {
tbs = append(tbs, tableInfo{tbname: tbname, createTime: createTime, columns: columns, stbname: stbname, uid: uid, tid: tid, vgId: vgId})
}
}
row.Close()
return tbs
}
func GetStableField(db *sql.DB, dbname, stbname string) []string {
result := make([]string, 0, 10)
sql := "describe " + dbname + "." + stbname
row, err := db.Query(sql)
if err != nil {
checkErr(err, sql)
}
count := 0
for row.Next() {
var field string
var ty string
var tlen int32
var note string
row.Scan(&field, &ty, &tlen, &note)
// ignore time and tag col
if count != 0 && strings.Compare(note, "TAG") != 0 {
// skip first and skip tag col
result = append(result, field)
}
count = count + 1
}
row.Close()
return result
}
func taskInit(db *sql.DB, dbname string, srcdbname string, metatable string, exptable string, tskinfo *taskInfo) {
{
sql := fmt.Sprintf("create database if not exists %s update 2", dbname)
_, err := db.Exec(sql)
checkErr(err, sql)
}
{
sql := fmt.Sprintf("create stable if not exists %s.%s (ts timestamp, dbname binary(64), tabname binary(64), colname binary(64), lastTime timestamp, offline int) tags(tablename binary(128))", dbname, exptable)
_, err := db.Exec(sql)
checkErr(err, sql)
}
{
sql := fmt.Sprintf("create table if not exists %s.%s (ts timestamp, dbname binary(64), tablename binary(64), colName binary(128), checkInterval int, threshold int)", dbname, metatable)
_, err := db.Exec(sql)
checkErr(err, sql)
}
tbs := GetSubTableInfo(db, srcdbname, "")
fmt.Printf("tbs size %d\n", len(tbs))
fieldDict := make(map[string][]string)
fieldTs := time.Now().Add(time.Hour * -1000)
for _, e := range tbs {
tbname := e.tbname
stbname := e.stbname
field, ok := fieldDict[stbname]
if !ok {
field = GetStableField(db, srcdbname, stbname)
fieldDict[stbname] = field
}
for _, f := range field {
insertTableInfoIntoMetatable := func(db *sql.DB, metaDB string, metaTable string, srcDB string, srcTable string, srcCol string, ts time.Time, interval, threshold int) {
sql := fmt.Sprintf("select * from %s.%s where dbname = \"%s\" and tablename = \"%s\" and colname = \"%s\"", metaDB, metaTable, srcDB, srcTable, srcCol)
row, err := db.Query(sql)
if err != nil {
checkErr(err, sql)
}
count := 0
for row.Next() {
count = count + 1
if count >= 1 {
row.Close()
break
}
}
if count == 0 {
sql := fmt.Sprintf("insert into %s.%s values(%v, \"%s\", \"%s\", \"%s\", %d, %d)", metaDB, metaTable, ts.UnixMilli(), srcDB, srcTable, srcCol, interval, threshold)
_, err := db.Exec(sql)
if err != nil {
checkErr(err, sql)
}
}
}
insertTableInfoIntoMetatable(db, dbname, metatable, srcdbname, tbname, f, fieldTs, 2, 2)
fieldTs = fieldTs.Add(time.Millisecond * 2)
}
key := fmt.Sprintf("%s_%s", srcdbname, stbname)
_, ok = tskinfo.subtask[key]
if !ok {
tskinfo.subtask[key] = &demo{db: db, dbname: dbname, srcdbname: srcdbname, suptabname: stbname, metaTable: metatable, exceptTable: exptable, wg: tskinfo.wg}
}
}
}
func subTaskStart(d *demo) {
d.Init()
for {
select {
case <-d.timer.C:
timeout := d.NextTimout()
fmt.Printf("stbname %s, timeout %d\n", d.suptabname, timeout)
d.timer.Reset(time.Second * time.Duration(timeout))
}
}
d.wg.Done()
}
func (d *demo) Init() {
d.heap = make(MinHeap, 0, 200)
heap.Init(&d.heap)
d.metaDict = make(map[string]*schema)
tbs := GetSubTableInfo(d.db, d.srcdbname, d.suptabname)
fields := GetStableField(d.db, d.srcdbname, d.suptabname)
lastRowDict := func(db *sql.DB, srcDB, stbname string) map[string]time.Time {
result := make(map[string]time.Time)
sql := fmt.Sprintf("select last_row(ts) from %s.%s group by tbname", srcDB, stbname)
row, err := d.db.Query(sql)
if err != nil {
checkErr(err, sql)
}
for row.Next() {
var ts time.Time
var tbname string
row.Scan(&ts, &tbname)
result[tbname] = ts
}
row.Close()
return result
}(d.db, d.srcdbname, d.suptabname)
for _, e := range tbs {
tbname := e.tbname
lastTime, ok := lastRowDict[tbname]
if !ok {
lastTime = time.Now()
}
for i, f := range fields {
col := fmt.Sprintf("%s %s", tbname, f)
var (
ts time.Time
dbname string
tablename string
colname string
checkinterval int
threshold int
)
checkSql := fmt.Sprintf("select * from %s.%s where dbname = \"%s\" and tablename = \"%s\" and colname = \"%s\"", d.dbname, d.metaTable, d.srcdbname, tbname, f)
checkRow, err := d.db.Query(checkSql)
if err != nil {
checkErr(err, checkSql)
}
for checkRow.Next() {
_ = checkRow.Scan(&ts, &dbname, &tablename, &colname, &checkinterval, &threshold)
d.metaDict[col] = &schema{idx: i, numOfField: len(fields), timestamp: lastTime, colName: col, interval: int32(checkinterval), threshold: int32(threshold)}
}
defer checkRow.Close()
}
}
now := time.Now()
for k, v := range d.metaDict {
durtion := fmt.Sprintf("%ds", v.interval)
s, _ := time.ParseDuration(durtion)
now.Add(s)
heap.Push(&d.heap, heapElem{timeout: now.Unix(), colName: k})
}
d.timer = time.NewTimer(time.Second * 1)
}
type ValueRows struct {
column []interface{}
ts time.Time
tbname string
}
func (d *demo) Update(stbname, tbname, col string, interval int32, threshold int32) {
key := fmt.Sprintf("%s %s", tbname, col)
sql := fmt.Sprintf("select * from %s.%s where dbname = \"%s\" and tablename = \"%s\" and colName = \"%s\"", d.dbname, d.metaTable, d.dbname, tbname, col)
rows, _ := d.db.Query(sql)
fmt.Printf("check metatable %s, SQL: %s\n", d.metaTable, sql)
for rows.Next() {
var (
ts time.Time
dbname string
tbname string
col string
inter int32
thresh int32
)
err := rows.Scan(&ts, &dbname, &tbname, &col, &inter, &thresh)
if interval != inter || threshold != thresh {
sql := fmt.Sprintf("insert into %s.%s values(%v, \"%s\", \"%s\", \"%s\", %d, %d)", d.dbname, d.metaTable, ts.UnixMilli(), d.dbname, tbname, col, interval, threshold)
_, err = d.db.Exec(sql)
if err != nil {
checkErr(err, sql)
}
}
}
schemadata := d.metaDict[key]
if schemadata != nil {
schemadata.interval = interval
schemadata.threshold = threshold
}
defer rows.Close()
}
func (d *demo) NextTimout() int32 {
now := time.Now().Unix()
colArray := make([]string, 0, 10)
for !d.heap.Empty() {
elem := d.heap.Top()
if elem.timeout <= now {
colArray = append(colArray, elem.colName)
heap.Pop(&d.heap)
} else {
break
}
}
lastRowGroup, colIdx := func(db *sql.DB, srcDB, stbname string) (map[string]*ValueRows, map[string]int) {
result := make(map[string]*ValueRows)
colIdx := make(map[string]int)
sql := fmt.Sprintf("select last_row(*) from %s.%s group by tbname", srcDB, stbname)
row, err := db.Query(sql)
if err != nil {
checkErr(err, sql)
}
tt, err := row.ColumnTypes()
types := make([]reflect.Type, len(tt))
for i, tp := range tt {
st := tp.ScanType()
types[i] = st
}
columns, _ := row.Columns()
for row.Next() {
values := make([]interface{}, len(tt))
for i := range values {
values[i] = reflect.New(types[i]).Interface()
}
row.Scan(values...)
ts, _ := values[0].(driver.Valuer).Value()
tts, _ := ts.(time.Time)
tbname, _ := values[len(tt)-1].(driver.Valuer).Value()
ttbname, _ := tbname.(string)
result[ttbname] = &ValueRows{column: values, ts: tts, tbname: ttbname}
}
row.Close()
for i, v := range columns {
colIdx[v] = i
}
return result, colIdx
}(d.db, d.srcdbname, d.suptabname)
for _, e := range colArray {
elem := d.metaDict[e]
var colName string
var tabName string
fmt.Sscanf(e, "%s %s", &tabName, &colName)
ts, update := func(rowGroup map[string]*ValueRows, colIdx map[string]int, tabName, colName string) (time.Time, bool) {
var ts time.Time
update := false
field := fmt.Sprintf("last_row(%s)", colName)
idx, ok1 := colIdx[field]
row, ok2 := rowGroup[tabName]
if ok1 && ok2 {
if row != nil {
v, _ := row.column[idx].(driver.Valuer).Value()
if v != nil {
ts = row.ts
update = true
}
}
}
return ts, update
}(lastRowGroup, colIdx, tabName, colName)
if !update {
ts = elem.timestamp
}
//fmt.Printf("Get time tbname: %s, colname:%s, current %v, lasttime %v\n", tabName, colName, ts.UnixMilli(), elem.timestamp.UnixMilli())
exceptTableName := fmt.Sprintf("%s_%s_%s", d.suptabname, tabName, colName)
var dura time.Duration = ts.Sub(elem.timestamp)
cost := int32(dura.Seconds())
if cost == 0 {
elem.timestamp = ts
sql := fmt.Sprintf("insert into %s.%s using %s.%s tags(\"%s\") values(%v, \"%s\", \"%s\", \"%s\", %v, %d)", d.dbname, exceptTableName, d.dbname, d.exceptTable, exceptTableName, time.Now().UnixMilli(), d.srcdbname, tabName, colName, ts.UnixMilli(), int(time.Now().Sub(elem.timestamp).Seconds()))
fmt.Printf("INSERT SQL: %s\n", sql)
_, err := d.db.Exec(sql)
if err != nil {
checkErr(err, sql)
}
} else {
elem.timestamp = ts
if cost > elem.threshold {
sql := fmt.Sprintf("insert into %s.%s using %s.%s tags(\"%s\") values(%v, \"%s\", \"%s\", \"%s\", %v, %d)", d.dbname, exceptTableName, d.dbname, d.exceptTable, exceptTableName, time.Now().UnixMilli(), d.srcdbname, tabName, colName, ts.UnixMilli(), int(time.Now().Sub(elem.timestamp).Seconds()))
fmt.Printf("INSERT SQL: %s\n", sql)
_, err := d.db.Exec(sql)
if err != nil {
checkErr(err, sql)
}
} else {
//fmt.Printf("C dura %d, threshold %d not insert \n", cost, elem.threshold)
}
}
heap.Push(&d.heap, heapElem{timeout: int64(elem.interval) + now, colName: e})
}
if !d.heap.Empty() {
elem := d.heap.Top()
timeout := elem.timeout - now
if timeout < 1 {
timeout = 1
}
return int32(timeout)
}
return 1
}
func printAllArgs() {
fmt.Printf("\n============= args parse result: =============\n")
fmt.Printf("hostName: %v\n", configPara.hostName)
fmt.Printf("serverPort: %v\n", configPara.serverPort)
fmt.Printf("usr: %v\n", configPara.user)
fmt.Printf("password: %v\n", configPara.password)
fmt.Printf("dbName: %v\n", configPara.dbName)
fmt.Printf("srcDbName: %v\n", configPara.srcdbName)
fmt.Printf("stbNme: %v\n", configPara.supTblName)
fmt.Printf("================================================\n")
}
func main() {
printAllArgs()
url = "root:taosdata@/tcp(" + configPara.hostName + ":" + strconv.Itoa(configPara.serverPort) + ")/"
db, err := sql.Open(taosDriverName, url)
if err != nil {
checkErr(err, "failed to connect db")
}
wg := sync.WaitGroup{}
info := &taskInfo{subtask: make(map[string]*demo), wg: &wg}
taskInit(db, configPara.dbName, configPara.srcdbName, "metatable", "exptable", info)
for _, v := range info.subtask {
wg.Add(1)
go subTaskStart(v)
}
wg.Wait()
}
...@@ -2955,7 +2955,8 @@ int tscProcessQueryRsp(SSqlObj *pSql) { ...@@ -2955,7 +2955,8 @@ int tscProcessQueryRsp(SSqlObj *pSql) {
SQueryTableRsp *pQueryAttr = (SQueryTableRsp *)pRes->pRsp; SQueryTableRsp *pQueryAttr = (SQueryTableRsp *)pRes->pRsp;
if (pQueryAttr == NULL) { if (pQueryAttr == NULL) {
pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY; tscError("0x%"PRIx64" invalid NULL query rsp received", pSql->self);
pRes->code = TSDB_CODE_QRY_APP_ERROR;
return pRes->code; return pRes->code;
} }
......
...@@ -69,6 +69,7 @@ extern char tsTempDir[]; ...@@ -69,6 +69,7 @@ extern char tsTempDir[];
extern int32_t tsShortcutFlag; extern int32_t tsShortcutFlag;
extern int32_t tsMaxSqlGroups; extern int32_t tsMaxSqlGroups;
extern int8_t tsSortWhenGroupBy; extern int8_t tsSortWhenGroupBy;
extern int32_t tsQueryRssThreshold;
// query buffer management // query buffer management
extern int32_t tsQueryBufferSize; // maximum allowed usage buffer size in MB for each data node during query processing extern int32_t tsQueryBufferSize; // maximum allowed usage buffer size in MB for each data node during query processing
......
...@@ -124,6 +124,9 @@ int32_t tsMaxSqlGroups = 1000000; ...@@ -124,6 +124,9 @@ int32_t tsMaxSqlGroups = 1000000;
// order by first group by column when group by // order by first group by column when group by
int8_t tsSortWhenGroupBy = 1; int8_t tsSortWhenGroupBy = 1;
// memory rss thresold for creating new query in MB. 0 means no threshold limitation
int32_t tsQueryRssThreshold = 0;
int32_t tsProjectExecInterval = 10000; // every 10sec, the projection will be executed once int32_t tsProjectExecInterval = 10000; // every 10sec, the projection will be executed once
int64_t tsMaxRetentWindow = 24 * 3600L; // maximum time window tolerance int64_t tsMaxRetentWindow = 24 * 3600L; // maximum time window tolerance
...@@ -1828,6 +1831,16 @@ static void doInitGlobalConfig(void) { ...@@ -1828,6 +1831,16 @@ static void doInitGlobalConfig(void) {
cfg.unitType = TAOS_CFG_UTYPE_NONE; cfg.unitType = TAOS_CFG_UTYPE_NONE;
taosInitConfigOption(cfg); taosInitConfigOption(cfg);
cfg.option = "queryRssThreshold";
cfg.ptr = &tsQueryRssThreshold;
cfg.valType = TAOS_CFG_VTYPE_INT32;
cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG;
cfg.minValue = 0;
cfg.maxValue = 2048*1024;
cfg.ptrLength = 0;
cfg.unitType = TAOS_CFG_UTYPE_NONE;
taosInitConfigOption(cfg);
#ifdef TD_TSZ #ifdef TD_TSZ
// lossy compress // lossy compress
cfg.option = "lossyColumns"; cfg.option = "lossyColumns";
......
...@@ -298,6 +298,7 @@ int32_t* taosGetErrno(); ...@@ -298,6 +298,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_QRY_INVALID_TIME_CONDITION TAOS_DEF_ERROR_CODE(0, 0x070E) //"invalid time condition") #define TSDB_CODE_QRY_INVALID_TIME_CONDITION TAOS_DEF_ERROR_CODE(0, 0x070E) //"invalid time condition")
#define TSDB_CODE_QRY_INVALID_SCHEMA_VERSION TAOS_DEF_ERROR_CODE(0, 0x0710) //"invalid schema version") #define TSDB_CODE_QRY_INVALID_SCHEMA_VERSION TAOS_DEF_ERROR_CODE(0, 0x0710) //"invalid schema version")
#define TSDB_CODE_QRY_RESULT_TOO_LARGE TAOS_DEF_ERROR_CODE(0, 0x0711) //"result num is too large") #define TSDB_CODE_QRY_RESULT_TOO_LARGE TAOS_DEF_ERROR_CODE(0, 0x0711) //"result num is too large")
#define TSDB_CODE_QRY_RSS_THRESHOLD TAOS_DEF_ERROR_CODE(0, 0x0712) //"query memory rss threshold")
// grant // grant
#define TSDB_CODE_GRANT_EXPIRED TAOS_DEF_ERROR_CODE(0, 0x0800) //"License expired" #define TSDB_CODE_GRANT_EXPIRED TAOS_DEF_ERROR_CODE(0, 0x0800) //"License expired"
......
Subproject commit 64350feba73d584f920474d8abcd913491081897 Subproject commit e00ebd951fbe96c1fde346ae95359982f19a7604
...@@ -236,6 +236,41 @@ static int32_t mnodeChildTableActionDelete(SSdbRow *pRow) { ...@@ -236,6 +236,41 @@ static int32_t mnodeChildTableActionDelete(SSdbRow *pRow) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static void mnodeTableActionUpdateTimeSeries(const char *tableId, int64_t diffCols) {
if (diffCols > 0) {
grantAdd(TSDB_GRANT_TIMESERIES, diffCols);
} else if (diffCols < 0) {
grantRestore(TSDB_GRANT_TIMESERIES, -diffCols);
} else {
return;
}
if (tableId) {
SName sName = {0};
char db[TSDB_DB_NAME_LEN] = {0};
SDbObj *pDb = NULL;
SAcctObj *pAcct = NULL;
if (tNameFromString(&sName, tableId, T_NAME_ACCT | T_NAME_DB)) {
mWarn("table:%s, failed to get SName to update timeseries", tableId);
return;
}
if (tNameExtractFullName((const SName *)&sName, db)) {
mWarn("table:%s, failed to extract SName to update timeseries", tableId);
return;
}
if (!(pDb = mnodeGetDb(db))) {
mWarn("table:%s, db:%s not exist to update timeseries", tableId, db);
return;
};
if (!(pAcct = mnodeGetAcct(pDb->acct))) {
mWarn("table:%s, acct:%s not exist to update timeseries", tableId, pDb->acct);
return;
}
if (pAcct) pAcct->acctInfo.numOfTimeSeries += diffCols;
}
}
static int32_t mnodeChildTableActionUpdate(SSdbRow *pRow) { static int32_t mnodeChildTableActionUpdate(SSdbRow *pRow) {
SCTableObj *pNew = pRow->pObj; SCTableObj *pNew = pRow->pObj;
SCTableObj *pTable = mnodeGetChildTable(pNew->info.tableId); SCTableObj *pTable = mnodeGetChildTable(pNew->info.tableId);
...@@ -246,6 +281,10 @@ static int32_t mnodeChildTableActionUpdate(SSdbRow *pRow) { ...@@ -246,6 +281,10 @@ static int32_t mnodeChildTableActionUpdate(SSdbRow *pRow) {
void *oldSTable = pTable->superTable; void *oldSTable = pTable->superTable;
int32_t oldRefCount = pTable->refCount; int32_t oldRefCount = pTable->refCount;
if (pTable->info.type == TSDB_NORMAL_TABLE) {
mnodeTableActionUpdateTimeSeries(pTable->info.tableId, pNew->numOfColumns - pTable->numOfColumns);
}
memcpy(pTable, pNew, sizeof(SCTableObj)); memcpy(pTable, pNew, sizeof(SCTableObj));
pTable->refCount = oldRefCount; pTable->refCount = oldRefCount;
...@@ -528,6 +567,9 @@ static int32_t mnodeSuperTableActionUpdate(SSdbRow *pRow) { ...@@ -528,6 +567,9 @@ static int32_t mnodeSuperTableActionUpdate(SSdbRow *pRow) {
int32_t oldRefCount = pTable->refCount; int32_t oldRefCount = pTable->refCount;
int32_t oldNumOfTables = pTable->numOfTables; int32_t oldNumOfTables = pTable->numOfTables;
mnodeTableActionUpdateTimeSeries(pTable->info.tableId,
(int64_t)oldNumOfTables * (pNew->numOfColumns - pTable->numOfColumns));
memcpy(pTable, pNew, sizeof(SSTableObj)); memcpy(pTable, pNew, sizeof(SSTableObj));
pTable->vgHash = oldVgHash; pTable->vgHash = oldVgHash;
...@@ -1464,6 +1506,7 @@ static int32_t mnodeAddSuperTableColumn(SMnodeMsg *pMsg, SSchema schema[], int32 ...@@ -1464,6 +1506,7 @@ static int32_t mnodeAddSuperTableColumn(SMnodeMsg *pMsg, SSchema schema[], int32
pAcct->acctInfo.numOfTimeSeries += (ncols * pStable->numOfTables); pAcct->acctInfo.numOfTimeSeries += (ncols * pStable->numOfTables);
mnodeDecAcctRef(pAcct); mnodeDecAcctRef(pAcct);
} }
grantAdd(TSDB_GRANT_TIMESERIES, (uint64_t)ncols * pStable->numOfTables);
mInfo("msg:%p, app:%p stable %s, start to add column", pMsg, pMsg->rpcMsg.ahandle, pStable->info.tableId); mInfo("msg:%p, app:%p stable %s, start to add column", pMsg, pMsg->rpcMsg.ahandle, pStable->info.tableId);
...@@ -1514,6 +1557,7 @@ static int32_t mnodeDropSuperTableColumn(SMnodeMsg *pMsg, char *colName) { ...@@ -1514,6 +1557,7 @@ static int32_t mnodeDropSuperTableColumn(SMnodeMsg *pMsg, char *colName) {
pAcct->acctInfo.numOfTimeSeries -= pStable->numOfTables; pAcct->acctInfo.numOfTimeSeries -= pStable->numOfTables;
mnodeDecAcctRef(pAcct); mnodeDecAcctRef(pAcct);
} }
grantRestore(TSDB_GRANT_TIMESERIES, pStable->numOfTables);
mInfo("msg:%p, app:%p stable %s, start to delete column", pMsg, pMsg->rpcMsg.ahandle, pStable->info.tableId); mInfo("msg:%p, app:%p stable %s, start to delete column", pMsg, pMsg->rpcMsg.ahandle, pStable->info.tableId);
...@@ -2471,6 +2515,7 @@ static int32_t mnodeAddNormalTableColumn(SMnodeMsg *pMsg, SSchema schema[], int3 ...@@ -2471,6 +2515,7 @@ static int32_t mnodeAddNormalTableColumn(SMnodeMsg *pMsg, SSchema schema[], int3
pAcct->acctInfo.numOfTimeSeries += ncols; pAcct->acctInfo.numOfTimeSeries += ncols;
mnodeDecAcctRef(pAcct); mnodeDecAcctRef(pAcct);
} }
grantAdd(TSDB_GRANT_TIMESERIES, ncols);
mInfo("msg:%p, app:%p ctable %s, start to add column", pMsg, pMsg->rpcMsg.ahandle, pTable->info.tableId); mInfo("msg:%p, app:%p ctable %s, start to add column", pMsg, pMsg->rpcMsg.ahandle, pTable->info.tableId);
monSaveAuditLog(MON_DDL_CMD_ADD_COLUMN, mnodeGetUserFromMsg(pMsg), pTable->info.tableId, true); monSaveAuditLog(MON_DDL_CMD_ADD_COLUMN, mnodeGetUserFromMsg(pMsg), pTable->info.tableId, true);
...@@ -2506,6 +2551,7 @@ static int32_t mnodeDropNormalTableColumn(SMnodeMsg *pMsg, char *colName) { ...@@ -2506,6 +2551,7 @@ static int32_t mnodeDropNormalTableColumn(SMnodeMsg *pMsg, char *colName) {
pAcct->acctInfo.numOfTimeSeries--; pAcct->acctInfo.numOfTimeSeries--;
mnodeDecAcctRef(pAcct); mnodeDecAcctRef(pAcct);
} }
grantRestore(TSDB_GRANT_TIMESERIES, 1);
mInfo("msg:%p, app:%p ctable %s, start to drop column %s", pMsg, pMsg->rpcMsg.ahandle, pTable->info.tableId, colName); mInfo("msg:%p, app:%p ctable %s, start to drop column %s", pMsg, pMsg->rpcMsg.ahandle, pTable->info.tableId, colName);
monSaveAuditLog(MON_DDL_CMD_DROP_COLUMN, mnodeGetUserFromMsg(pMsg), pTable->info.tableId, true); monSaveAuditLog(MON_DDL_CMD_DROP_COLUMN, mnodeGetUserFromMsg(pMsg), pTable->info.tableId, true);
......
...@@ -77,6 +77,15 @@ int32_t qCreateQueryInfo(void* tsdb, int32_t vgId, SQueryTableMsg* pQueryMsg, qi ...@@ -77,6 +77,15 @@ int32_t qCreateQueryInfo(void* tsdb, int32_t vgId, SQueryTableMsg* pQueryMsg, qi
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto _over; goto _over;
} }
float procMemory = 0;
if (taosGetProcMemory(&procMemory)) {
if (tsQueryRssThreshold > 0 && procMemory >= tsQueryRssThreshold) {
qError("Exceeds query memory RSS threshold. RSS: %f, threshold: %d", procMemory, tsQueryRssThreshold);
code = TSDB_CODE_QRY_RSS_THRESHOLD;
goto _over;
}
}
if (pQueryMsg->numOfTables <= 0) { if (pQueryMsg->numOfTables <= 0) {
qError("Invalid number of tables to query, numOfTables:%d", pQueryMsg->numOfTables); qError("Invalid number of tables to query, numOfTables:%d", pQueryMsg->numOfTables);
......
...@@ -627,7 +627,7 @@ static int32_t lazyLoadCacheLast(STsdbQueryHandle* pQueryHandle) { ...@@ -627,7 +627,7 @@ static int32_t lazyLoadCacheLast(STsdbQueryHandle* pQueryHandle) {
if (!pQueryHandle->pTableCheckInfo) { if (!pQueryHandle->pTableCheckInfo) {
tsdbError("%p table check info is NULL", pQueryHandle); tsdbError("%p table check info is NULL", pQueryHandle);
terrno = TSDB_CODE_QRY_APP_ERROR; // terrno = TSDB_CODE_QRY_APP_ERROR;
return -1; return -1;
} }
......
...@@ -20,7 +20,7 @@ ...@@ -20,7 +20,7 @@
extern "C" { extern "C" {
#endif #endif
#define TSDB_CFG_MAX_NUM 141 #define TSDB_CFG_MAX_NUM 142
#define TSDB_CFG_PRINT_LEN 23 #define TSDB_CFG_PRINT_LEN 23
#define TSDB_CFG_OPTION_LEN 24 #define TSDB_CFG_OPTION_LEN 24
#define TSDB_CFG_VALUE_LEN 41 #define TSDB_CFG_VALUE_LEN 41
......
...@@ -304,6 +304,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_QRY_INCONSISTAN, "File inconsistance in ...@@ -304,6 +304,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_QRY_INCONSISTAN, "File inconsistance in
TAOS_DEFINE_ERROR(TSDB_CODE_QRY_INVALID_TIME_CONDITION, "One valid time range condition expected") TAOS_DEFINE_ERROR(TSDB_CODE_QRY_INVALID_TIME_CONDITION, "One valid time range condition expected")
TAOS_DEFINE_ERROR(TSDB_CODE_QRY_SYS_ERROR, "System error") TAOS_DEFINE_ERROR(TSDB_CODE_QRY_SYS_ERROR, "System error")
TAOS_DEFINE_ERROR(TSDB_CODE_QRY_RESULT_TOO_LARGE, "result num is too large") TAOS_DEFINE_ERROR(TSDB_CODE_QRY_RESULT_TOO_LARGE, "result num is too large")
TAOS_DEFINE_ERROR(TSDB_CODE_QRY_RSS_THRESHOLD, "Exceed Memory RSS thresold when creating query")
// grant // grant
TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_EXPIRED, "License expired") TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_EXPIRED, "License expired")
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册