提交 a4da89b3 编写于 作者: R root

Merge remote-tracking branch 'origin/develop' into feature/mqtt

...@@ -47,6 +47,8 @@ Raw DataSize = numOfTables * rowSizePerTable * rowsPerTable ...@@ -47,6 +47,8 @@ Raw DataSize = numOfTables * rowSizePerTable * rowsPerTable
因为TDengine具有很好的水平扩展能力,根据总量,再根据单个物理机或虚拟机的资源,就可以轻松决定需要购置多少台物理机或虚拟机了。 因为TDengine具有很好的水平扩展能力,根据总量,再根据单个物理机或虚拟机的资源,就可以轻松决定需要购置多少台物理机或虚拟机了。
具体计算公式,请参见页面:<a href='https://www.taosdata.com/config/config.html'>资源估算方法</a>
## 容错和灾备 ## 容错和灾备
### 容错 ### 容错
......
...@@ -399,7 +399,7 @@ int tsParseSql(SSqlObj *pSql, bool initial); ...@@ -399,7 +399,7 @@ int tsParseSql(SSqlObj *pSql, bool initial);
void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet); void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet);
int tscProcessSql(SSqlObj *pSql); int tscProcessSql(SSqlObj *pSql);
int tscRenewTableMeta(SSqlObj *pSql, char *tableId); int tscRenewTableMeta(SSqlObj *pSql, int32_t tableIndex);
void tscQueueAsyncRes(SSqlObj *pSql); void tscQueueAsyncRes(SSqlObj *pSql);
void tscQueueAsyncError(void(*fp), void *param, int32_t code); void tscQueueAsyncError(void(*fp), void *param, int32_t code);
...@@ -414,7 +414,7 @@ void tscRestoreSQLFuncForSTableQuery(SQueryInfo *pQueryInfo); ...@@ -414,7 +414,7 @@ void tscRestoreSQLFuncForSTableQuery(SQueryInfo *pQueryInfo);
int32_t tscCreateResPointerInfo(SSqlRes *pRes, SQueryInfo *pQueryInfo); int32_t tscCreateResPointerInfo(SSqlRes *pRes, SQueryInfo *pQueryInfo);
void tscDestroyResPointerInfo(SSqlRes *pRes); void tscDestroyResPointerInfo(SSqlRes *pRes);
void tscResetSqlCmdObj(SSqlCmd *pCmd); void tscResetSqlCmdObj(SSqlCmd *pCmd, bool removeFromCache);
/** /**
* free query result of the sql object * free query result of the sql object
......
...@@ -468,7 +468,7 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) { ...@@ -468,7 +468,7 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) {
if (pCmd->command == TSDB_SQL_INSERT || pCmd->command == TSDB_SQL_SELECT) { if (pCmd->command == TSDB_SQL_INSERT || pCmd->command == TSDB_SQL_SELECT) {
tscDebug("%p redo parse sql string and proceed", pSql); tscDebug("%p redo parse sql string and proceed", pSql);
pCmd->parseFinished = false; pCmd->parseFinished = false;
tscResetSqlCmdObj(pCmd); tscResetSqlCmdObj(pCmd, false);
code = tsParseSql(pSql, true); code = tsParseSql(pSql, true);
......
...@@ -1327,18 +1327,40 @@ int tsParseSql(SSqlObj *pSql, bool initial) { ...@@ -1327,18 +1327,40 @@ int tsParseSql(SSqlObj *pSql, bool initial) {
pSql->fetchFp = pSql->fp; pSql->fetchFp = pSql->fp;
pSql->fp = (void(*)())tscHandleMultivnodeInsert; pSql->fp = (void(*)())tscHandleMultivnodeInsert;
} }
if (initial && ((ret = tsInsertInitialCheck(pSql)) != TSDB_CODE_SUCCESS)) { if (initial && ((ret = tsInsertInitialCheck(pSql)) != TSDB_CODE_SUCCESS)) {
return ret; return ret;
} }
// make a backup as tsParseInsertSql may modify the string
char* sqlstr = strdup(pSql->sqlstr);
ret = tsParseInsertSql(pSql); ret = tsParseInsertSql(pSql);
if (sqlstr == NULL || pSql->retry >= 1 || ret != TSDB_CODE_TSC_INVALID_SQL) {
free(sqlstr);
} else {
tscResetSqlCmdObj(pCmd, true);
free(pSql->sqlstr);
pSql->sqlstr = sqlstr;
pSql->retry++;
if ((ret = tsInsertInitialCheck(pSql)) == TSDB_CODE_SUCCESS) {
ret = tsParseInsertSql(pSql);
}
}
} else { } else {
SSqlInfo SQLInfo = qSQLParse(pSql->sqlstr); SSqlInfo SQLInfo = qSQLParse(pSql->sqlstr);
ret = tscToSQLCmd(pSql, &SQLInfo); ret = tscToSQLCmd(pSql, &SQLInfo);
if (ret == TSDB_CODE_TSC_INVALID_SQL && pSql->retry == 0 && SQLInfo.type == TSDB_SQL_NULL) {
tscResetSqlCmdObj(pCmd, true);
pSql->retry++;
ret = tscToSQLCmd(pSql, &SQLInfo);
}
SQLInfoDestroy(&SQLInfo); SQLInfoDestroy(&SQLInfo);
} }
if (ret == TSDB_CODE_SUCCESS) {
pSql->retry = 0;
}
/* /*
* the pRes->code may be modified or released by another thread in tscTableMetaCallBack function, * the pRes->code may be modified or released by another thread in tscTableMetaCallBack function,
* so do NOT use pRes->code to determine if the getTableMeta function * so do NOT use pRes->code to determine if the getTableMeta function
......
...@@ -276,8 +276,6 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) { ...@@ -276,8 +276,6 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) {
} }
} }
STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
int32_t cmd = pCmd->command; int32_t cmd = pCmd->command;
if ((cmd == TSDB_SQL_SELECT || cmd == TSDB_SQL_FETCH || cmd == TSDB_SQL_INSERT || cmd == TSDB_SQL_UPDATE_TAGS_VAL) && if ((cmd == TSDB_SQL_SELECT || cmd == TSDB_SQL_FETCH || cmd == TSDB_SQL_INSERT || cmd == TSDB_SQL_UPDATE_TAGS_VAL) &&
(rpcMsg->code == TSDB_CODE_TDB_INVALID_TABLE_ID || (rpcMsg->code == TSDB_CODE_TDB_INVALID_TABLE_ID ||
...@@ -302,7 +300,7 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) { ...@@ -302,7 +300,7 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) {
taosMsleep(duration); taosMsleep(duration);
} }
rpcMsg->code = tscRenewTableMeta(pSql, pTableMetaInfo->name); rpcMsg->code = tscRenewTableMeta(pSql, 0);
// if there is an error occurring, proceed to the following error handling procedure. // if there is an error occurring, proceed to the following error handling procedure.
if (rpcMsg->code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) { if (rpcMsg->code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
...@@ -2202,14 +2200,14 @@ int tscGetMeterMetaEx(SSqlObj *pSql, STableMetaInfo *pTableMetaInfo, bool create ...@@ -2202,14 +2200,14 @@ int tscGetMeterMetaEx(SSqlObj *pSql, STableMetaInfo *pTableMetaInfo, bool create
/** /**
* retrieve table meta from mnode, and update the local table meta cache. * retrieve table meta from mnode, and update the local table meta cache.
* @param pSql sql object * @param pSql sql object
* @param tableId table full name * @param tableIndex table index
* @return status code * @return status code
*/ */
int tscRenewTableMeta(SSqlObj *pSql, char *tableId) { int tscRenewTableMeta(SSqlObj *pSql, int32_t tableIndex) {
SSqlCmd *pCmd = &pSql->cmd; SSqlCmd *pCmd = &pSql->cmd;
SQueryInfo * pQueryInfo = tscGetQueryInfoDetail(pCmd, 0); SQueryInfo * pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, tableIndex);
STableMeta* pTableMeta = pTableMetaInfo->pTableMeta; STableMeta* pTableMeta = pTableMetaInfo->pTableMeta;
if (pTableMetaInfo->pTableMeta) { if (pTableMetaInfo->pTableMeta) {
......
...@@ -820,7 +820,7 @@ int taos_validate_sql(TAOS *taos, const char *sql) { ...@@ -820,7 +820,7 @@ int taos_validate_sql(TAOS *taos, const char *sql) {
static int tscParseTblNameList(SSqlObj *pSql, const char *tblNameList, int32_t tblListLen) { static int tscParseTblNameList(SSqlObj *pSql, const char *tblNameList, int32_t tblListLen) {
// must before clean the sqlcmd object // must before clean the sqlcmd object
tscResetSqlCmdObj(&pSql->cmd); tscResetSqlCmdObj(&pSql->cmd, false);
SSqlCmd *pCmd = &pSql->cmd; SSqlCmd *pCmd = &pSql->cmd;
......
...@@ -33,7 +33,7 @@ ...@@ -33,7 +33,7 @@
static void freeQueryInfoImpl(SQueryInfo* pQueryInfo); static void freeQueryInfoImpl(SQueryInfo* pQueryInfo);
static void clearAllTableMetaInfo(SQueryInfo* pQueryInfo, const char* address, bool removeFromCache); static void clearAllTableMetaInfo(SQueryInfo* pQueryInfo, const char* address, bool removeFromCache);
SCond* tsGetSTableQueryCond(STagCond* pTagCond, uint64_t uid) { SCond* tsGetSTableQueryCond(STagCond* pTagCond, uint64_t uid) {
if (pTagCond->pCond == NULL) { if (pTagCond->pCond == NULL) {
return NULL; return NULL;
} }
...@@ -294,7 +294,7 @@ void tscDestroyResPointerInfo(SSqlRes* pRes) { ...@@ -294,7 +294,7 @@ void tscDestroyResPointerInfo(SSqlRes* pRes) {
pRes->data = NULL; // pRes->data points to the buffer of pRsp, no need to free pRes->data = NULL; // pRes->data points to the buffer of pRsp, no need to free
} }
static void tscFreeQueryInfo(SSqlCmd* pCmd) { static void tscFreeQueryInfo(SSqlCmd* pCmd, bool removeFromCache) {
if (pCmd == NULL || pCmd->numOfClause == 0) { if (pCmd == NULL || pCmd->numOfClause == 0) {
return; return;
} }
...@@ -304,7 +304,7 @@ static void tscFreeQueryInfo(SSqlCmd* pCmd) { ...@@ -304,7 +304,7 @@ static void tscFreeQueryInfo(SSqlCmd* pCmd) {
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, i); SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, i);
freeQueryInfoImpl(pQueryInfo); freeQueryInfoImpl(pQueryInfo);
clearAllTableMetaInfo(pQueryInfo, (const char*)addr, false); clearAllTableMetaInfo(pQueryInfo, (const char*)addr, removeFromCache);
taosTFree(pQueryInfo); taosTFree(pQueryInfo);
} }
...@@ -312,7 +312,7 @@ static void tscFreeQueryInfo(SSqlCmd* pCmd) { ...@@ -312,7 +312,7 @@ static void tscFreeQueryInfo(SSqlCmd* pCmd) {
taosTFree(pCmd->pQueryInfo); taosTFree(pCmd->pQueryInfo);
} }
void tscResetSqlCmdObj(SSqlCmd* pCmd) { void tscResetSqlCmdObj(SSqlCmd* pCmd, bool removeFromCache) {
pCmd->command = 0; pCmd->command = 0;
pCmd->numOfCols = 0; pCmd->numOfCols = 0;
pCmd->count = 0; pCmd->count = 0;
...@@ -326,7 +326,7 @@ void tscResetSqlCmdObj(SSqlCmd* pCmd) { ...@@ -326,7 +326,7 @@ void tscResetSqlCmdObj(SSqlCmd* pCmd) {
pCmd->pDataBlocks = tscDestroyBlockArrayList(pCmd->pDataBlocks); pCmd->pDataBlocks = tscDestroyBlockArrayList(pCmd->pDataBlocks);
tscFreeQueryInfo(pCmd); tscFreeQueryInfo(pCmd, removeFromCache);
} }
void tscFreeSqlResult(SSqlObj* pSql) { void tscFreeSqlResult(SSqlObj* pSql) {
...@@ -364,7 +364,7 @@ void tscPartiallyFreeSqlObj(SSqlObj* pSql) { ...@@ -364,7 +364,7 @@ void tscPartiallyFreeSqlObj(SSqlObj* pSql) {
taosTFree(pSql->pSubs); taosTFree(pSql->pSubs);
pSql->numOfSubs = 0; pSql->numOfSubs = 0;
tscResetSqlCmdObj(pCmd); tscResetSqlCmdObj(pCmd, false);
} }
void tscFreeSqlObj(SSqlObj* pSql) { void tscFreeSqlObj(SSqlObj* pSql) {
......
...@@ -1951,36 +1951,36 @@ static void changeExecuteScanOrder(SQInfo *pQInfo, SQueryTableMsg* pQueryMsg, bo ...@@ -1951,36 +1951,36 @@ static void changeExecuteScanOrder(SQInfo *pQInfo, SQueryTableMsg* pQueryMsg, bo
// todo handle the case the the order irrelevant query type mixed up with order critical query type // todo handle the case the the order irrelevant query type mixed up with order critical query type
// descending order query for last_row query // descending order query for last_row query
if (isFirstLastRowQuery(pQuery)) { if (isFirstLastRowQuery(pQuery) && !QUERY_IS_ASC_QUERY(pQuery)) {
qDebug("QInfo:%p scan order changed for last_row query, old:%d, new:%d", GET_QINFO_ADDR(pQuery), qDebug("QInfo:%p scan order changed for last_row query, old:%d, new:%d", GET_QINFO_ADDR(pQuery),
pQuery->order.order, TSDB_ORDER_ASC); pQuery->order.order, TSDB_ORDER_ASC);
SWAP(pQuery->window.skey, pQuery->window.ekey, TSKEY);
pQuery->order.order = TSDB_ORDER_ASC; pQuery->order.order = TSDB_ORDER_ASC;
if (pQuery->window.skey > pQuery->window.ekey) { assert (pQuery->window.skey <= pQuery->window.ekey);
SWAP(pQuery->window.skey, pQuery->window.ekey, TSKEY);
}
doExchangeTimeWindow(pQInfo, &pQuery->window);
return; return;
} }
if (isGroupbyNormalCol(pQuery->pGroupbyExpr) && pQuery->order.order == TSDB_ORDER_DESC) { if (isGroupbyNormalCol(pQuery->pGroupbyExpr) && !QUERY_IS_ASC_QUERY(pQuery)) {
pQuery->order.order = TSDB_ORDER_ASC; pQuery->order.order = TSDB_ORDER_ASC;
if (pQuery->window.skey > pQuery->window.ekey) { SWAP(pQuery->window.skey, pQuery->window.ekey, TSKEY);
SWAP(pQuery->window.skey, pQuery->window.ekey, TSKEY); assert (pQuery->window.skey <= pQuery->window.ekey);
}
doExchangeTimeWindow(pQInfo, &pQuery->window); doExchangeTimeWindow(pQInfo, &pQuery->window);
return; return;
} }
if (isPointInterpoQuery(pQuery) && pQuery->intervalTime == 0) { if (isPointInterpoQuery(pQuery) && (pQuery->intervalTime == 0) && !QUERY_IS_ASC_QUERY(pQuery)) {
if (!QUERY_IS_ASC_QUERY(pQuery)) { qDebug(msg, GET_QINFO_ADDR(pQuery), "interp", pQuery->order.order, TSDB_ORDER_ASC, pQuery->window.skey,
qDebug(msg, GET_QINFO_ADDR(pQuery), "interp", pQuery->order.order, TSDB_ORDER_ASC, pQuery->window.skey, pQuery->window.ekey, pQuery->window.ekey, pQuery->window.skey);
pQuery->window.ekey, pQuery->window.ekey, pQuery->window.skey); SWAP(pQuery->window.skey, pQuery->window.ekey, TSKEY);
SWAP(pQuery->window.skey, pQuery->window.ekey, TSKEY);
}
pQuery->order.order = TSDB_ORDER_ASC; pQuery->order.order = TSDB_ORDER_ASC;
assert (pQuery->window.skey <= pQuery->window.ekey);
doExchangeTimeWindow(pQInfo, &pQuery->window);
return; return;
} }
...@@ -2920,11 +2920,11 @@ int32_t mergeIntoGroupResultImpl(SQInfo *pQInfo, SArray *pGroup) { ...@@ -2920,11 +2920,11 @@ int32_t mergeIntoGroupResultImpl(SQInfo *pQInfo, SArray *pGroup) {
STableQueryInfo *item = taosArrayGetP(pGroup, i); STableQueryInfo *item = taosArrayGetP(pGroup, i);
SIDList list = getDataBufPagesIdList(pRuntimeEnv->pResultBuf, TSDB_TABLEID(item->pTable)->tid); SIDList list = getDataBufPagesIdList(pRuntimeEnv->pResultBuf, TSDB_TABLEID(item->pTable)->tid);
pageList = list;
tid = TSDB_TABLEID(item->pTable)->tid;
if (taosArrayGetSize(list) > 0 && item->windowResInfo.size > 0) { if (taosArrayGetSize(list) > 0 && item->windowResInfo.size > 0) {
pTableList[numOfTables++] = item; pTableList[numOfTables++] = item;
tid = TSDB_TABLEID(item->pTable)->tid;
pageList = list;
} }
} }
...@@ -4354,6 +4354,32 @@ static bool skipTimeInterval(SQueryRuntimeEnv *pRuntimeEnv, TSKEY* start) { ...@@ -4354,6 +4354,32 @@ static bool skipTimeInterval(SQueryRuntimeEnv *pRuntimeEnv, TSKEY* start) {
return true; return true;
} }
static void freeTableQueryInfo(STableGroupInfo* pTableGroupInfo) {
if (pTableGroupInfo->pGroupList == NULL) {
assert(pTableGroupInfo->numOfTables == 0);
} else {
size_t numOfGroups = taosArrayGetSize(pTableGroupInfo->pGroupList);
for (int32_t i = 0; i < numOfGroups; ++i) {
SArray *p = taosArrayGetP(pTableGroupInfo->pGroupList, i);
size_t num = taosArrayGetSize(p);
for(int32_t j = 0; j < num; ++j) {
STableQueryInfo* item = taosArrayGetP(p, j);
destroyTableQueryInfo(item);
}
taosArrayDestroy(p);
}
taosArrayDestroy(pTableGroupInfo->pGroupList);
pTableGroupInfo->pGroupList = NULL;
pTableGroupInfo->numOfTables = 0;
}
taosHashCleanup(pTableGroupInfo->map);
pTableGroupInfo->map = NULL;
}
static int32_t setupQueryHandle(void* tsdb, SQInfo* pQInfo, bool isSTableQuery) { static int32_t setupQueryHandle(void* tsdb, SQInfo* pQInfo, bool isSTableQuery) {
SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv; SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
SQuery *pQuery = pQInfo->runtimeEnv.pQuery; SQuery *pQuery = pQInfo->runtimeEnv.pQuery;
...@@ -4389,20 +4415,22 @@ static int32_t setupQueryHandle(void* tsdb, SQInfo* pQInfo, bool isSTableQuery) ...@@ -4389,20 +4415,22 @@ static int32_t setupQueryHandle(void* tsdb, SQInfo* pQInfo, bool isSTableQuery)
terrno = TSDB_CODE_SUCCESS; terrno = TSDB_CODE_SUCCESS;
if (isFirstLastRowQuery(pQuery)) { if (isFirstLastRowQuery(pQuery)) {
pRuntimeEnv->pQueryHandle = tsdbQueryLastRow(tsdb, &cond, &pQInfo->tableGroupInfo, pQInfo); pRuntimeEnv->pQueryHandle = tsdbQueryLastRow(tsdb, &cond, &pQInfo->tableGroupInfo, pQInfo);
if (pRuntimeEnv->pQueryHandle == NULL) { // no data in current stable, clear all
freeTableQueryInfo(&pQInfo->tableqinfoGroupInfo);
} else { // update the query time window
pQuery->window = cond.twindow;
// update the query time window size_t numOfGroups = GET_NUM_OF_TABLEGROUP(pQInfo);
pQuery->window = cond.twindow; for (int32_t i = 0; i < numOfGroups; ++i) {
SArray *group = GET_TABLEGROUP(pQInfo, i);
size_t numOfGroups = GET_NUM_OF_TABLEGROUP(pQInfo); size_t t = taosArrayGetSize(group);
for(int32_t i = 0; i < numOfGroups; ++i) { for (int32_t j = 0; j < t; ++j) {
SArray *group = GET_TABLEGROUP(pQInfo, i); STableQueryInfo *pCheckInfo = taosArrayGetP(group, j);
size_t t = taosArrayGetSize(group);
for (int32_t j = 0; j < t; ++j) {
STableQueryInfo *pCheckInfo = taosArrayGetP(group, j);
pCheckInfo->win = pQuery->window; pCheckInfo->win = pQuery->window;
pCheckInfo->lastKey = pCheckInfo->win.skey; pCheckInfo->lastKey = pCheckInfo->win.skey;
}
} }
} }
} else if (isPointInterpoQuery(pQuery)) { } else if (isPointInterpoQuery(pQuery)) {
...@@ -4456,6 +4484,12 @@ int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, void *tsdb, int32_t vgId, bo ...@@ -4456,6 +4484,12 @@ int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, void *tsdb, int32_t vgId, bo
return code; return code;
} }
if (pQInfo->tableqinfoGroupInfo.numOfTables == 0) {
qDebug("QInfo:%p no table qualified for tag filter, abort query", pQInfo);
setQueryStatus(pQuery, QUERY_COMPLETED);
return TSDB_CODE_SUCCESS;
}
pQInfo->tsdb = tsdb; pQInfo->tsdb = tsdb;
pQInfo->vgId = vgId; pQInfo->vgId = vgId;
...@@ -6349,29 +6383,13 @@ static void freeQInfo(SQInfo *pQInfo) { ...@@ -6349,29 +6383,13 @@ static void freeQInfo(SQInfo *pQInfo) {
taosTFree(pQuery); taosTFree(pQuery);
} }
// todo refactor, extract method to destroytableDataInfo freeTableQueryInfo(&pQInfo->tableqinfoGroupInfo);
if (pQInfo->tableqinfoGroupInfo.pGroupList != NULL) {
int32_t numOfGroups = (int32_t)(GET_NUM_OF_TABLEGROUP(pQInfo));
for (int32_t i = 0; i < numOfGroups; ++i) {
SArray *p = GET_TABLEGROUP(pQInfo, i);
size_t num = taosArrayGetSize(p);
for(int32_t j = 0; j < num; ++j) {
STableQueryInfo* item = taosArrayGetP(p, j);
destroyTableQueryInfo(item);
}
taosArrayDestroy(p);
}
}
taosTFree(pQInfo->pBuf); taosTFree(pQInfo->pBuf);
taosArrayDestroy(pQInfo->tableqinfoGroupInfo.pGroupList);
taosHashCleanup(pQInfo->tableqinfoGroupInfo.map);
tsdbDestroyTableGroup(&pQInfo->tableGroupInfo); tsdbDestroyTableGroup(&pQInfo->tableGroupInfo);
taosArrayDestroy(pQInfo->arrTableIdInfo); taosArrayDestroy(pQInfo->arrTableIdInfo);
pQInfo->signature = 0; pQInfo->signature = 0;
qDebug("QInfo:%p QInfo is freed", pQInfo); qDebug("QInfo:%p QInfo is freed", pQInfo);
......
...@@ -154,9 +154,14 @@ int32_t tBucketBigIntHash(tMemBucket *pBucket, const void *value) { ...@@ -154,9 +154,14 @@ int32_t tBucketBigIntHash(tMemBucket *pBucket, const void *value) {
// todo refactor to more generic // todo refactor to more generic
int32_t tBucketIntHash(tMemBucket *pBucket, const void *value) { int32_t tBucketIntHash(tMemBucket *pBucket, const void *value) {
int32_t v = *(int32_t *)value; int32_t v = 0;
int32_t index = -1; switch(pBucket->type) {
case TSDB_DATA_TYPE_SMALLINT: v = *(int16_t*) value; break;
case TSDB_DATA_TYPE_TINYINT: v = *(int8_t*) value; break;
default: v = *(int32_t*) value;break;
}
int32_t index = -1;
if (pBucket->range.iMaxVal == INT32_MIN) { if (pBucket->range.iMaxVal == INT32_MIN) {
/* /*
* taking negative integer into consideration, * taking negative integer into consideration,
......
...@@ -295,9 +295,16 @@ out_of_memory: ...@@ -295,9 +295,16 @@ out_of_memory:
} }
TsdbQueryHandleT tsdbQueryLastRow(TSDB_REPO_T *tsdb, STsdbQueryCond *pCond, STableGroupInfo *groupList, void* qinfo) { TsdbQueryHandleT tsdbQueryLastRow(TSDB_REPO_T *tsdb, STsdbQueryCond *pCond, STableGroupInfo *groupList, void* qinfo) {
pCond->order = TSDB_ORDER_ASC;
pCond->twindow = changeTableGroupByLastrow(groupList); pCond->twindow = changeTableGroupByLastrow(groupList);
// no qualified table
if (groupList->numOfTables == 0) {
return NULL;
}
STsdbQueryHandle *pQueryHandle = (STsdbQueryHandle*) tsdbQueryTables(tsdb, pCond, groupList, qinfo); STsdbQueryHandle *pQueryHandle = (STsdbQueryHandle*) tsdbQueryTables(tsdb, pCond, groupList, qinfo);
assert(pCond->order == TSDB_ORDER_ASC && pCond->twindow.skey <= pCond->twindow.ekey);
return pQueryHandle; return pQueryHandle;
} }
...@@ -1981,8 +1988,9 @@ bool tsdbNextDataBlock(TsdbQueryHandleT* pHandle) { ...@@ -1981,8 +1988,9 @@ bool tsdbNextDataBlock(TsdbQueryHandleT* pHandle) {
STimeWindow changeTableGroupByLastrow(STableGroupInfo *groupList) { STimeWindow changeTableGroupByLastrow(STableGroupInfo *groupList) {
STimeWindow window = {INT64_MAX, INT64_MIN}; STimeWindow window = {INT64_MAX, INT64_MIN};
int32_t totalNumOfTable = 0;
// NOTE: starts from the buffer in case of descending timestamp order check data blocks // NOTE: starts from the buffer in case of descending timestamp order check data blocks
// todo consider the query time window, current last_row does not apply the query time window
size_t numOfGroups = taosArrayGetSize(groupList->pGroupList); size_t numOfGroups = taosArrayGetSize(groupList->pGroupList);
for(int32_t j = 0; j < numOfGroups; ++j) { for(int32_t j = 0; j < numOfGroups; ++j) {
SArray* pGroup = taosArrayGetP(groupList->pGroupList, j); SArray* pGroup = taosArrayGetP(groupList->pGroupList, j);
...@@ -1993,8 +2001,9 @@ STimeWindow changeTableGroupByLastrow(STableGroupInfo *groupList) { ...@@ -1993,8 +2001,9 @@ STimeWindow changeTableGroupByLastrow(STableGroupInfo *groupList) {
size_t numOfTables = taosArrayGetSize(pGroup); size_t numOfTables = taosArrayGetSize(pGroup);
for(int32_t i = 0; i < numOfTables; ++i) { for(int32_t i = 0; i < numOfTables; ++i) {
STableKeyInfo* pKeyInfo = (STableKeyInfo*) taosArrayGet(pGroup, i); STableKeyInfo* pKeyInfo = (STableKeyInfo*) taosArrayGet(pGroup, i);
TSKEY lastKey = ((STable*)(pKeyInfo->pTable))->lastKey;
// if the lastKey equals to INT64_MIN, there is no data in this table
TSKEY lastKey = ((STable*)(pKeyInfo->pTable))->lastKey;
if (key < lastKey) { if (key < lastKey) {
key = lastKey; key = lastKey;
...@@ -2012,13 +2021,23 @@ STimeWindow changeTableGroupByLastrow(STableGroupInfo *groupList) { ...@@ -2012,13 +2021,23 @@ STimeWindow changeTableGroupByLastrow(STableGroupInfo *groupList) {
} }
} }
// clear current group
taosArrayClear(pGroup);
// more than one table in each group, only one table left for each group // more than one table in each group, only one table left for each group
if (numOfTables > 1) { if (keyInfo.pTable != NULL) {
taosArrayClear(pGroup); totalNumOfTable++;
taosArrayPush(pGroup, &keyInfo); taosArrayPush(pGroup, &keyInfo);
} }
} }
// window does not being updated, so set the original
if (window.skey == INT64_MAX && window.ekey == INT64_MIN) {
window = TSWINDOW_INITIALIZER;
assert(totalNumOfTable == 0);
}
groupList->numOfTables = totalNumOfTable;
return window; return window;
} }
......
...@@ -153,4 +153,22 @@ if $rows != 46 then ...@@ -153,4 +153,22 @@ if $rows != 46 then
return -1 return -1
endi endi
print ========>td-1317, empty table last_row query crashed
sql create table m1(ts timestamp, k int) tags (a int);
sql create table t1 using m1 tags(1);
sql create table t2 using m1 tags(2);
sql select last_row(*) from t1
if $rows != 0 then
return -1
endi
sql select last_row(*) from m1
if $rows != 0 then
return -1
endi
sql select last_row(*) from m1 where tbname in ('t1')
if $rows != 0 then
return -1
endi
...@@ -20,7 +20,7 @@ $db = $dbPrefix . $i ...@@ -20,7 +20,7 @@ $db = $dbPrefix . $i
$stb = $stbPrefix . $i $stb = $stbPrefix . $i
sql drop database if exists $db sql drop database if exists $db
sql create database $db maxrows 200 cache 1024 tblocks 200 maxTables 4 sql create database $db maxrows 200 maxTables 4
print ====== create tables print ====== create tables
sql use $db sql use $db
sql create table $stb (ts timestamp, c1 timestamp, c2 int) tags(t1 binary(20)) sql create table $stb (ts timestamp, c1 timestamp, c2 int) tags(t1 binary(20))
......
...@@ -22,12 +22,29 @@ $tsu = $tsu - $delta ...@@ -22,12 +22,29 @@ $tsu = $tsu - $delta
$tsu = $tsu + $ts0 $tsu = $tsu + $ts0
##### select from supertable ##### select from supertable
$tb = $tbPrefix . 0 $tb = $tbPrefix . 0
sql select first(c1), last(c1) from $tb where ts >= $ts0 and ts < $tsu interval(5m) fill(value, -1) sql select first(c1), last(c1), (1537325400 - 1537146000)/(5*60) v from $tb where ts >= $ts0 and ts < $tsu interval(5m) fill(value, -1)
$res = $rowNum * 2 $res = $rowNum * 2
$res = $res - 1 $n = $res - 2
if $rows != $res then print ============>$n
if $rows != $n then
print expect $n, actual $rows
return -1 return -1
endi endi
if $data03 != 598.000000000 then
print expect 598.000000000, actual $data03
return -1
endi
if $data13 != 598.000000000 then
print expect 598.000000000, actual $data03
return -1
endi
sql select first(c1), last(c1), (1537325400 - 1537146000)/(5*60) v from $tb where ts >= $ts0 and ts < $tsu interval(5m) fill(value, NULL)
if $data13 != 598.000000000 then
print expect 598.000000000, actual $data03
return -1
endi
\ No newline at end of file
...@@ -137,4 +137,23 @@ if $rows != 3 then ...@@ -137,4 +137,23 @@ if $rows != 3 then
return -1 return -1
endi endi
print =========>td-1308
sql create database db;
sql use db;
sql create table stb (ts timestamp, c1 int, c2 binary(10)) tags(t1 binary(10));
sql create table tb1 using stb tags('a1');
sql insert into tb1 values('2020-09-03 15:30:48.812', 0, 'tb1');
sql select count(*) from stb where ts > '2020-09-03 15:30:44' interval(4s);
if $rows != 1 then
return -1
endi
sql create table tb4 using stb tags('a4');
sql select count(*) from stb where ts > '2020-09-03 15:30:44' interval(4s);
if $rows != 1 then
return -1
endi
system sh/exec.sh -n dnode1 -s stop -x SIGINT system sh/exec.sh -n dnode1 -s stop -x SIGINT
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册