diff --git a/src/client/inc/tscUtil.h b/src/client/inc/tscUtil.h index c16945bbcf35ac5247ecbb505aaf667870fc9d09..d46c32d73d823d52c739fc01d1d11c0a59f27168 100644 --- a/src/client/inc/tscUtil.h +++ b/src/client/inc/tscUtil.h @@ -196,14 +196,14 @@ STableMetaInfo* tscGetMetaInfo(SQueryInfo *pQueryInfo, int32_t tableIndex); SQueryInfo *tscGetQueryInfoDetail(SSqlCmd* pCmd, int32_t subClauseIndex); int32_t tscGetQueryInfoDetailSafely(SSqlCmd *pCmd, int32_t subClauseIndex, SQueryInfo** pQueryInfo); -void tscClearMeterMetaInfo(STableMetaInfo* pTableMetaInfo, bool removeFromCache); +void tscClearTableMetaInfo(STableMetaInfo* pTableMetaInfo, bool removeFromCache); STableMetaInfo* tscAddTableMetaInfo(SQueryInfo* pQueryInfo, const char* name, STableMeta* pTableMeta, SVgroupsInfo* vgroupList, SArray* pTagCols); STableMetaInfo* tscAddEmptyMetaInfo(SQueryInfo *pQueryInfo); int32_t tscAddSubqueryInfo(SSqlCmd *pCmd); -void tscFreeSubqueryInfo(SSqlCmd* pCmd); +void tscFreeQueryInfo(SSqlCmd* pCmd); void tscClearSubqueryInfo(SSqlCmd* pCmd); int tscGetSTableVgroupInfo(SSqlObj* pSql, int32_t clauseIndex); diff --git a/src/client/inc/tsclient.h b/src/client/inc/tsclient.h index 9cc6bcb364b76b8156a95cefb34afb27d41a599f..d6fb9279c7f1fd634795967d63829163689e1ed1 100644 --- a/src/client/inc/tsclient.h +++ b/src/client/inc/tsclient.h @@ -298,6 +298,7 @@ typedef struct STscObj { char sversion[TSDB_VERSION_LEN]; char writeAuth : 1; char superAuth : 1; + void* pMgmtConn; struct SSqlObj * pSql; struct SSqlObj * pHb; struct SSqlObj * sqlList; @@ -359,7 +360,7 @@ typedef struct SSqlStream { struct SSqlStream *prev, *next; } SSqlStream; -int32_t tscInitRpc(const char *user, const char *secret); +int32_t tscInitRpc(const char *user, const char *secret, void** pMgmtConn); void tscInitMsgsFp(); int tsParseSql(SSqlObj *pSql, bool multiVnodeInsertion); @@ -427,9 +428,7 @@ int32_t tscToSQLCmd(SSqlObj *pSql, struct SSqlInfo *pInfo); char * tscGetResultColumnChr(SSqlRes *pRes, SQueryInfo *pQueryInfo, int32_t column); extern void * pVnodeConn; -extern void * pTscMgmtConn; extern void * tscCacheHandle; -extern int slaveIndex; extern void * tscTmr; extern void * tscQhandle; extern int tscKeepConn[]; diff --git a/src/client/src/tscAsync.c b/src/client/src/tscAsync.c index 99d20de48bcf8b02590b5030a4432f5d83076998..b954db0734be2ffa14a01955fc50e1573bc33ee5 100644 --- a/src/client/src/tscAsync.c +++ b/src/client/src/tscAsync.c @@ -46,7 +46,7 @@ void doAsyncQuery(STscObj* pObj, SSqlObj* pSql, void (*fp)(), void* param, const pSql->signature = pSql; pSql->param = param; pSql->pTscObj = pObj; - pSql->maxRetry = 1; + pSql->maxRetry = TSDB_REPLICA_MAX_NUM; pSql->fp = fp; if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, TSDB_DEFAULT_PAYLOAD_SIZE)) { diff --git a/src/client/src/tscSQLParser.c b/src/client/src/tscSQLParser.c index df47afd0dcf2cd50e6dfa11b3ddd7ce40e95d877..59e2f147e1b62a6ff507cfc404902f26cc4d498b 100644 --- a/src/client/src/tscSQLParser.c +++ b/src/client/src/tscSQLParser.c @@ -733,7 +733,7 @@ int32_t tscSetTableId(STableMetaInfo* pTableMetaInfo, SSQLToken* pzTableName, SS */ if (size > 0) { if (strncasecmp(oldName, pTableMetaInfo->name, tListLen(pTableMetaInfo->name)) != 0) { - tscClearMeterMetaInfo(pTableMetaInfo, false); + tscClearTableMetaInfo(pTableMetaInfo, false); } } else { assert(pTableMetaInfo->pTableMeta == NULL); @@ -2477,6 +2477,10 @@ int32_t parseGroupbyClause(SQueryInfo* pQueryInfo, tVariantList* pList, SSqlCmd* return TSDB_CODE_SUCCESS; } + if (pQueryInfo->colList == NULL) { + pQueryInfo->colList = taosArrayInit(4, POINTER_BYTES); + } + pQueryInfo->groupbyExpr.numOfGroupCols = pList->nExpr; if (pList->nExpr > TSDB_MAX_TAGS) { return invalidSqlErrMsg(pQueryInfo->msg, msg1); @@ -4915,7 +4919,7 @@ void doAddGroupColumnForSubquery(SQueryInfo* pQueryInfo, int32_t tagIndex) { list.num = 1; list.ids[0] = colIndex; - insertResultField(pQueryInfo, size - 1, &list, pSchema->bytes, pSchema->type, pSchema->name, pExpr); + insertResultField(pQueryInfo, size, &list, pSchema->bytes, pSchema->type, pSchema->name, pExpr); SFieldSupInfo* pInfo = tscFieldInfoGetSupp(&pQueryInfo->fieldsInfo, size - 1); pInfo->visible = false; } diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index 15abcebe58c60f078315ed8852b7189ddcffaba5..f6d20f69ac55b4812e48c9dcda9859b1ffbe4dbf 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -190,6 +190,7 @@ void tscProcessActivityTimer(void *handle, void *tmrId) { } int tscSendMsgToServer(SSqlObj *pSql) { + STscObj* pObj = pSql->pTscObj; SSqlCmd* pCmd = &pSql->cmd; char *pMsg = rpcMallocCont(pCmd->payloadLen); @@ -223,7 +224,7 @@ int tscSendMsgToServer(SSqlObj *pSql) { .handle = pSql, .code = 0 }; - rpcSendRequest(pTscMgmtConn, &pSql->ipList, &rpcMsg); + rpcSendRequest(pObj->pMgmtConn, &pSql->ipList, &rpcMsg); } return TSDB_CODE_SUCCESS; @@ -2599,7 +2600,7 @@ int tscGetSTableVgroupInfo(SSqlObj *pSql, int32_t clauseIndex) { // if (pSql->fp != NULL && pSql->pStream == NULL) { // pCmd->pDataBlocks = tscDestroyBlockArrayList(pCmd->pDataBlocks); -// tscFreeSubqueryInfo(pCmd); +// tscFreeQueryInfo(pCmd); // } tscTrace("%p allocate new pSqlObj:%p to get stable vgroupInfo", pSql, pNew); diff --git a/src/client/src/tscSql.c b/src/client/src/tscSql.c index f70beab16625eb9c80e74b921bf6b66987ca0e62..4320e8f81b39d521bba29750d709e60709d33a4b 100644 --- a/src/client/src/tscSql.c +++ b/src/client/src/tscSql.c @@ -66,7 +66,8 @@ STscObj *taosConnectImpl(const char *ip, const char *user, const char *pass, con return NULL; } - if (tscInitRpc(user, pass) != 0) { + void* pMgmtConn = NULL; + if (tscInitRpc(user, pass, &pMgmtConn) != 0) { terrno = TSDB_CODE_NETWORK_UNAVAIL; return NULL; } @@ -118,6 +119,7 @@ STscObj *taosConnectImpl(const char *ip, const char *user, const char *pass, con strtolower(pObj->db, tmp); } + pObj->pMgmtConn = pMgmtConn; pthread_mutex_init(&pObj->mutex, NULL); SSqlObj *pSql = (SSqlObj *)calloc(1, sizeof(SSqlObj)); diff --git a/src/client/src/tscStream.c b/src/client/src/tscStream.c index 9f7d4887d131d7bec873f409188b188f96e16245..d69068172937692ac95dd6ca64717b00e236321b 100644 --- a/src/client/src/tscStream.c +++ b/src/client/src/tscStream.c @@ -147,7 +147,7 @@ static void tscProcessStreamQueryCallback(void *param, TAOS_RES *tres, int numOf retryDelay); STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pStream->pSql->cmd, 0, 0); - tscClearMeterMetaInfo(pTableMetaInfo, true); + tscClearTableMetaInfo(pTableMetaInfo, true); tscSetRetryTimer(pStream, pStream->pSql, retryDelay); return; @@ -177,7 +177,7 @@ static void tscProcessStreamRetrieveResult(void *param, TAOS_RES *res, int numOf if (pSql == NULL || numOfRows < 0) { int64_t retryDelayTime = tscGetRetryDelayTime(pStream->slidingTime, pStream->precision); tscError("%p stream:%p, retrieve data failed, code:%d, retry in %" PRId64 "ms", pSql, pStream, numOfRows, retryDelayTime); - tscClearMeterMetaInfo(pTableMetaInfo, true); + tscClearTableMetaInfo(pTableMetaInfo, true); tscSetRetryTimer(pStream, pStream->pSql, retryDelayTime); return; @@ -259,7 +259,7 @@ static void tscProcessStreamRetrieveResult(void *param, TAOS_RES *res, int numOf pStream->numOfRes); // release the metric/meter meta information reference, so data in cache can be updated - tscClearMeterMetaInfo(pTableMetaInfo, false); + tscClearTableMetaInfo(pTableMetaInfo, false); tscSetNextLaunchTimer(pStream, pSql); } } diff --git a/src/client/src/tscSystem.c b/src/client/src/tscSystem.c index fac771234ab147642f23dfb885b4f62653e2c221..f1b6065f1cc971d18fe4e5b99be4028a0e2608d5 100644 --- a/src/client/src/tscSystem.c +++ b/src/client/src/tscSystem.c @@ -33,9 +33,6 @@ // global, not configurable void * pVnodeConn; -void * pVMeterConn; -void * pTscMgmtConn; -void * pSlaveConn; void * tscCacheHandle; int slaveIndex; void * tscTmr; @@ -54,7 +51,7 @@ void tscCheckDiskUsage(void *para, void *unused) { taosTmrReset(tscCheckDiskUsage, 1000, NULL, tscTmr, &tscCheckDiskUsageTmr); } -int32_t tscInitRpc(const char *user, const char *secret) { +int32_t tscInitRpc(const char *user, const char *secret, void** pMgmtConn) { SRpcInit rpcInit; char secretEncrypt[32] = {0}; taosEncryptPass((uint8_t *)secret, strlen(secret), secretEncrypt); @@ -80,7 +77,7 @@ int32_t tscInitRpc(const char *user, const char *secret) { } } - if (pTscMgmtConn == NULL) { + if (*pMgmtConn == NULL) { memset(&rpcInit, 0, sizeof(rpcInit)); rpcInit.localIp = tsLocalIp; rpcInit.localPort = 0; @@ -96,8 +93,8 @@ int32_t tscInitRpc(const char *user, const char *secret) { rpcInit.spi = 1; rpcInit.secret = secretEncrypt; - pTscMgmtConn = rpcOpen(&rpcInit); - if (pTscMgmtConn == NULL) { + *pMgmtConn = rpcOpen(&rpcInit); + if (*pMgmtConn == NULL) { tscError("failed to init connection to mgmt"); return -1; } @@ -211,11 +208,6 @@ void taos_cleanup() { pVnodeConn = NULL; } - if (pTscMgmtConn != NULL) { - rpcClose(pTscMgmtConn); - pTscMgmtConn = NULL; - } - taosTmrCleanUp(tscTmr); } diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index abdab2d9a162faf544dfbf7daa958b657aaa446f..e9ff558b92e45342c615ef8904a9236f913969ce 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -337,7 +337,7 @@ void tscResetSqlCmdObj(SSqlCmd* pCmd) { pCmd->pTableList= NULL; pCmd->pDataBlocks = tscDestroyBlockArrayList(pCmd->pDataBlocks); - tscFreeSubqueryInfo(pCmd); + tscFreeQueryInfo(pCmd); } /* @@ -761,6 +761,8 @@ void tscCloseTscObj(STscObj* pObj) { tscFreeSqlObj(pSql); sem_destroy(&pSql->rspSem); + rpcClose(pObj->pMgmtConn); + pthread_mutex_destroy(&pObj->mutex); tscTrace("%p DB connection is closed", pObj); @@ -1459,7 +1461,7 @@ bool tscShouldFreeHeatBeat(SSqlObj* pHb) { void tscCleanSqlCmd(SSqlCmd* pCmd) { pCmd->pDataBlocks = tscDestroyBlockArrayList(pCmd->pDataBlocks); - tscFreeSubqueryInfo(pCmd); + tscFreeQueryInfo(pCmd); uint32_t allocSize = pCmd->allocSize; char* allocPtr = pCmd->payload; @@ -1601,7 +1603,7 @@ int32_t tscAddSubqueryInfo(SSqlCmd* pCmd) { return TSDB_CODE_SUCCESS; } -static void doClearSubqueryInfo(SQueryInfo* pQueryInfo) { +static void freeQueryInfoImpl(SQueryInfo* pQueryInfo) { tscTagCondRelease(&pQueryInfo->tagCond); tscFieldInfoClear(&pQueryInfo->fieldsInfo); @@ -1611,6 +1613,11 @@ static void doClearSubqueryInfo(SQueryInfo* pQueryInfo) { tscColumnListDestroy(pQueryInfo->colList); memset(&pQueryInfo->colList, 0, sizeof(pQueryInfo->colList)); + if (pQueryInfo->groupbyExpr.columnInfo != NULL) { + taosArrayDestroy(pQueryInfo->groupbyExpr.columnInfo); + pQueryInfo->groupbyExpr.columnInfo = NULL; + } + pQueryInfo->tsBuf = tsBufDestory(pQueryInfo->tsBuf); tfree(pQueryInfo->defaultVal); @@ -1619,11 +1626,11 @@ static void doClearSubqueryInfo(SQueryInfo* pQueryInfo) { void tscClearSubqueryInfo(SSqlCmd* pCmd) { for (int32_t i = 0; i < pCmd->numOfClause; ++i) { SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, i); - doClearSubqueryInfo(pQueryInfo); + freeQueryInfoImpl(pQueryInfo); } } -void tscFreeSubqueryInfo(SSqlCmd* pCmd) { +void tscFreeQueryInfo(SSqlCmd* pCmd) { if (pCmd == NULL || pCmd->numOfClause == 0) { return; } @@ -1632,7 +1639,7 @@ void tscFreeSubqueryInfo(SSqlCmd* pCmd) { char* addr = (char*)pCmd - offsetof(SSqlObj, cmd); SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, i); - doClearSubqueryInfo(pQueryInfo); + freeQueryInfoImpl(pQueryInfo); tscClearAllTableMetaInfo(pQueryInfo, (const char*)addr, false); tfree(pQueryInfo); } @@ -1691,7 +1698,7 @@ void doRemoveTableMetaInfo(SQueryInfo* pQueryInfo, int32_t index, bool removeFro STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, index); - tscClearMeterMetaInfo(pTableMetaInfo, removeFromCache); + tscClearTableMetaInfo(pTableMetaInfo, removeFromCache); free(pTableMetaInfo); int32_t after = pQueryInfo->numOfTables - index - 1; @@ -1713,13 +1720,18 @@ void tscClearAllTableMetaInfo(SQueryInfo* pQueryInfo, const char* address, bool tfree(pQueryInfo->pTableMetaInfo); } -void tscClearMeterMetaInfo(STableMetaInfo* pTableMetaInfo, bool removeFromCache) { +void tscClearTableMetaInfo(STableMetaInfo* pTableMetaInfo, bool removeFromCache) { if (pTableMetaInfo == NULL) { return; } taosCacheRelease(tscCacheHandle, (void**)&(pTableMetaInfo->pTableMeta), removeFromCache); tfree(pTableMetaInfo->vgroupList); + + if (pTableMetaInfo->tagColList != NULL) { + taosArrayDestroy(pTableMetaInfo->tagColList); + pTableMetaInfo->tagColList = NULL; + } } void tscResetForNextRetrieve(SSqlRes* pRes) { diff --git a/src/query/src/queryExecutor.c b/src/query/src/queryExecutor.c index e2f3b2f6552b172887512710021eed9211702890..1cb7918ed5412b7f438b22e3bc3871302f47be86 100644 --- a/src/query/src/queryExecutor.c +++ b/src/query/src/queryExecutor.c @@ -990,9 +990,7 @@ static int32_t setGroupResultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, char *pDat return TSDB_CODE_SUCCESS; } -static UNUSED_FUNC char *getGroupbyColumnData(SQuery *pQuery, SData **data, int16_t *type, int16_t *bytes) { - char *groupbyColumnData = NULL; - +static char *getGroupbyColumnData(SQuery *pQuery, int16_t *type, int16_t *bytes, SArray* pDataBlock) { SSqlGroupbyExpr *pGroupbyExpr = pQuery->pGroupbyExpr; for (int32_t k = 0; k < pGroupbyExpr->numOfGroupCols; ++k) { @@ -1015,12 +1013,22 @@ static UNUSED_FUNC char *getGroupbyColumnData(SQuery *pQuery, SData **data, int1 *type = pQuery->colList[colIndex].type; *bytes = pQuery->colList[colIndex].bytes; - - // groupbyColumnData = doGetDataBlocks(pQuery, data, pQuery->colList[colIndex].inf); - break; + /* + * the colIndex is acquired from the first meter of all qualified meters in this vnode during query prepare + * stage, the remain meter may not have the required column in cache actually. So, the validation of required + * column in cache with the corresponding meter schema is reinforced. + */ + int32_t numOfCols = taosArrayGetSize(pDataBlock); + + for (int32_t i = 0; i < numOfCols; ++i) { + SColumnInfoData *p = taosArrayGet(pDataBlock, i); + if (pColIndex->colId == p->info.colId) { + return p->pData; + } + } } - - return groupbyColumnData; + + return NULL; } static int32_t doTSJoinFilter(SQueryRuntimeEnv *pRuntimeEnv, int32_t offset) { @@ -1091,8 +1099,7 @@ static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pS char *groupbyColumnData = NULL; if (groupbyStateValue) { - assert(0); - // groupbyColumnData = getGroupbyColumnData(pQuery, data, &type, &bytes); + groupbyColumnData = getGroupbyColumnData(pQuery, &type, &bytes, pDataBlock); } for (int32_t k = 0; k < pQuery->numOfOutput; ++k) { @@ -6088,9 +6095,16 @@ int32_t qCreateQueryInfo(void *tsdb, SQueryTableMsg *pQueryMsg, qinfo_t *pQInfo) STableId *id = taosArrayGet(pTableIdList, 0); id->uid = -1; // todo fix me - + + // group by normal column, do not pass the group by condition to tsdb to group table into different group + int32_t numOfGroupByCols = pQueryMsg->numOfGroupCols; + if (pQueryMsg->numOfGroupCols == 1 && !TSDB_COL_IS_TAG(pGroupColIndex->flag)) { + numOfGroupByCols = 0; + } + + // todo handle the error /*int32_t ret =*/tsdbQueryByTagsCond(tsdb, id->uid, tagCond, pQueryMsg->tagCondLen, &groupInfo, pGroupColIndex, - pQueryMsg->numOfGroupCols); + numOfGroupByCols); if (groupInfo.numOfTables == 0) { // no qualified tables no need to do query code = TSDB_CODE_SUCCESS; goto _query_over; diff --git a/src/query/tests/patternMatchTest.cpp b/src/query/tests/patternMatchTest.cpp index 2e70f26269f170009f6939350e7eba951db64690..41156ce8ffcce87af4c275bc293d5f0b279a0dd8 100644 --- a/src/query/tests/patternMatchTest.cpp +++ b/src/query/tests/patternMatchTest.cpp @@ -58,7 +58,7 @@ TEST(testCase, patternMatchTest) { EXPECT_EQ(ret, TSDB_PATTERN_NOWILDCARDMATCH); str = "abcdefgabcdeju"; - ret = patternMatch("abc%f_", str, 1, &info); + ret = patternMatch("abc%f_", str, 1, &info); // pattern string is longe than the size EXPECT_EQ(ret, TSDB_PATTERN_NOMATCH); str = "abcdefgabcdeju"; @@ -72,4 +72,8 @@ TEST(testCase, patternMatchTest) { str = "abcdefgabcdeju"; ret = patternMatch("a__", str, 2, &info); EXPECT_EQ(ret, TSDB_PATTERN_NOMATCH); + + str = "carzero"; + ret = patternMatch("%o", str, strlen(str), &info); + EXPECT_EQ(ret, TSDB_PATTERN_MATCH); } diff --git a/tests/examples/c/demo.c b/tests/examples/c/demo.c index 984730002436f870e70cf9cc0f23f48b178257eb..e5a70740f112f44428c3a75d96c0bba3927d8731 100644 --- a/tests/examples/c/demo.c +++ b/tests/examples/c/demo.c @@ -75,7 +75,7 @@ int main(int argc, char *argv[]) { doQuery(taos, "create database if not exists test"); doQuery(taos, "use test"); - doQuery(taos, "insert into tm99 values('2020-01-01 1:1:1', 99);"); + doQuery(taos, "select count(*),k,sum(k) from m1 group by k"); // doQuery(taos, "create table if not exists tm0 (ts timestamp, k int);"); // doQuery(taos, "insert into tm0 values('2020-1-1 1:1:1', 1);"); // doQuery(taos, "insert into tm0 values('2020-1-1 1:1:2', 2);"); @@ -86,7 +86,7 @@ int main(int argc, char *argv[]) { // doQuery(taos, "insert into tm0 values('2020-1-1 1:1:7', 7);"); // doQuery(taos, "insert into tm0 values('2020-1-1 1:1:8', 8);"); // doQuery(taos, "insert into tm0 values('2020-1-1 1:1:9', 9);"); - doQuery(taos, "select sum(k),count(*) from m1 group by a"); +// doQuery(taos, "select sum(k),count(*) from m1 group by a"); taos_close(taos); return 0;