diff --git a/src/client/src/tscParseInsert.c b/src/client/src/tscParseInsert.c index 9c7a3e0bf0246abeb6fd3ed14d50caf97db53e74..6d01538d8fef438eb18fe05f5adf6bd313cbd7f3 100644 --- a/src/client/src/tscParseInsert.c +++ b/src/client/src/tscParseInsert.c @@ -1238,8 +1238,7 @@ int doParseInsertSql(SSqlObj *pSql, char *str) { goto _clean; } - if (pCmd->pDataBlocks->nSize > 0) { - // merge according to vgId + if (pCmd->pDataBlocks->nSize > 0) { // merge according to vgId if ((code = tscMergeTableDataBlocks(pSql, pCmd->pDataBlocks)) != TSDB_CODE_SUCCESS) { goto _error_clean; } @@ -1294,12 +1293,7 @@ int tsParseInsertSql(SSqlObj *pSql) { int tsParseSql(SSqlObj *pSql, bool multiVnodeInsertion) { int32_t ret = TSDB_CODE_SUCCESS; - -// if (NULL == pSql->asyncTblPos) { -// tscCleanSqlCmd(&pSql->cmd); -// } else { - tscTrace("continue parse sql: %s", pSql->asyncTblPos); -// } + tscTrace("continue parse sql: %s", pSql->asyncTblPos); if (tscIsInsertOrImportData(pSql->sqlstr)) { /* diff --git a/src/client/src/tscSQLParser.c b/src/client/src/tscSQLParser.c index 804d4e5268ba3cf74ff75463e16b792ebf50355b..2505d544e481a4ad98d212ba4908dae29add35b6 100644 --- a/src/client/src/tscSQLParser.c +++ b/src/client/src/tscSQLParser.c @@ -2789,7 +2789,7 @@ static int32_t optrToString(tSQLExpr* pExpr, char** exprString) { return TSDB_CODE_SUCCESS; } -static int32_t tablenameListToString(tSQLExpr* pExpr, /*char* str*/ SStringBuilder* sb) { +static int32_t tablenameListToString(tSQLExpr* pExpr, SStringBuilder* sb) { tSQLExprList* pList = pExpr->pParam; if (pList->nExpr <= 0) { return TSDB_CODE_INVALID_SQL; @@ -2815,7 +2815,7 @@ static int32_t tablenameListToString(tSQLExpr* pExpr, /*char* str*/ SStringBuild return TSDB_CODE_SUCCESS; } -static int32_t tablenameCondToString(tSQLExpr* pExpr, /*char* str*/ SStringBuilder* sb) { +static int32_t tablenameCondToString(tSQLExpr* pExpr, SStringBuilder* sb) { taosStringBuilderAppendStringLen(sb, QUERY_COND_REL_PREFIX_LIKE, QUERY_COND_REL_PREFIX_LIKE_LEN); taosStringBuilderAppendString(sb, pExpr->val.pz); @@ -3756,8 +3756,8 @@ int32_t parseWhereClause(SQueryInfo* pQueryInfo, tSQLExpr** pExpr, SSqlObj* pSql return TSDB_CODE_SUCCESS; } - const char* msg = "invalid filter expression"; const char* msg1 = "invalid expression"; + const char* msg2 = "invalid filter expression"; int32_t ret = TSDB_CODE_SUCCESS; @@ -3819,7 +3819,7 @@ int32_t parseWhereClause(SQueryInfo* pQueryInfo, tSQLExpr** pExpr, SSqlObj* pSql taosStringBuilderDestroy(&sb); if (!validateFilterExpr(pQueryInfo)) { - return invalidSqlErrMsg(pQueryInfo->msg, msg); + return invalidSqlErrMsg(pQueryInfo->msg, msg2); } doAddJoinTagsColumnsIntoTagList(pQueryInfo, &condExpr); diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index 22f8a85757d39b0bdd7da35e681b97f0ce820bed..498a33838966bd4de795f0ab9474cd71bc83988c 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -847,6 +847,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { pQueryMsg->tsOrder = htonl(pQueryInfo->tsBuf->tsOrder); } + // serialize tag column query condition if (pQueryInfo->tagCond.numOfTagCond > 0) { STagCond* pTagCond = &pQueryInfo->tagCond; @@ -865,6 +866,16 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { } } + // tbname in/like query expression should be sent to mgmt node + STagCond* pTagCond = &pQueryInfo->tagCond; + if (pTagCond->tbnameCond.cond != NULL) { + size_t s = strlen(pTagCond->tbnameCond.cond); + memcpy(pMsg, pTagCond->tbnameCond.cond, s); + + pQueryMsg->nameCondLen = htons(s); + pMsg += s; + } + msgLen = pMsg - pStart; tscTrace("%p msg built success,len:%d bytes", pSql, msgLen); diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index ec6881db3f5ae4f1e32dbf790bfb662ae3df84c5..18f491aa9edaaa60efbc6cbca6bfc54e532f8b9c 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -608,7 +608,7 @@ int32_t tscCopyDataBlockToPayload(SSqlObj* pSql, STableDataBlocks* pDataBlock) { * the payloadLen should be actual message body size * the old value of payloadLen is the allocated payload size */ - pCmd->payloadLen = pDataBlock->nAllocSize - tsRpcHeadSize; + pCmd->payloadLen = pDataBlock->nAllocSize - tsRpcHeadSize - sizeof(SMsgDesc); assert(pCmd->allocSize >= pCmd->payloadLen + tsRpcHeadSize + 100 && pCmd->payloadLen > 0); return TSDB_CODE_SUCCESS; diff --git a/src/dnode/src/dnodeRead.c b/src/dnode/src/dnodeRead.c index 965727c9d02ec8173b71f2b7b01dd8896831841a..cb25cbfd525623d3832187135beadb3981475af9 100644 --- a/src/dnode/src/dnodeRead.c +++ b/src/dnode/src/dnodeRead.c @@ -243,16 +243,6 @@ static void dnodeContinueExecuteQuery(void* pVnode, void* qhandle, SReadMsg *pMs taos_queue queue = dnodeGetVnodeRworker(pVnode); taosWriteQitem(queue, TAOS_QTYPE_RPC, pRead); - -// SReadMsg readMsg = { -// .rpcMsg = {0}, -// .pCont = qhandle, -// .contLen = 0, -// .pRpcContext = pMsg->pRpcContext, -// }; -// -// taos_queue queue = dnodeGetVnodeRworker(pVnode); -// taosWriteQitem(queue, TSDB_MSG_TYPE_QUERY, &readMsg); } static void dnodeProcessQueryMsg(void *pVnode, SReadMsg *pMsg) { diff --git a/src/inc/taosmsg.h b/src/inc/taosmsg.h index 999e293d42aca07c5d42c3adcc89b593746283eb..9ac06e4199becba6759f6279d8622ba88475b9b8 100644 --- a/src/inc/taosmsg.h +++ b/src/inc/taosmsg.h @@ -475,9 +475,10 @@ typedef struct { int64_t slidingTime; // value for sliding window char slidingTimeUnit; // time interval type, for revisement of interval(1d) uint16_t tagCondLen; // tag length in current query + uint16_t nameCondLen; // table name in/like query expression string length int16_t numOfGroupCols; // num of group by columns int16_t orderByIdx; - int16_t orderType; // used in group by xx order by xxx + int16_t orderType; // used in group by xx order by xxx uint64_t groupbyTagIds; int64_t limit; int64_t offset; diff --git a/src/query/inc/queryExecutor.h b/src/query/inc/queryExecutor.h index dc7aff105fc73d1808a5f5798b8a6338a4cf93c0..2e1d86a94261cfe692411fcd5ce0b566423aa991 100644 --- a/src/query/inc/queryExecutor.h +++ b/src/query/inc/queryExecutor.h @@ -172,7 +172,6 @@ typedef struct SQueryRuntimeEnv { typedef struct SQInfo { void* signature; -// void* param; // pointer to the RpcReadMsg TSKEY startTime; TSKEY elapsedTime; int32_t pointsInterpo; diff --git a/src/query/src/qast.c b/src/query/src/qast.c index b301bbf04342ba26128c5800541b200bbdfb6fc8..c6c65eba27e0e96a7ca895a1cdeaeec6fd302029 100644 --- a/src/query/src/qast.c +++ b/src/query/src/qast.c @@ -16,6 +16,7 @@ #include "qast.h" #include #include +#include "../../client/inc/tschemautil.h" #include "os.h" #include "qsqlparser.h" #include "qsyntaxtreefunction.h" @@ -107,7 +108,7 @@ static tSQLSyntaxNode *tSQLSyntaxNodeCreate(SSchema *pSchema, int32_t numOfCols, return NULL; } - size_t nodeSize = sizeof(tSQLSyntaxNode); + size_t nodeSize = sizeof(tSQLSyntaxNode); tSQLSyntaxNode *pNode = NULL; if (pToken->type == TK_ID || pToken->type == TK_TBNAME) { @@ -239,9 +240,7 @@ uint8_t isQueryOnPrimaryKey(const char *primaryColumnName, const tSQLSyntaxNode } static tSQLSyntaxNode *createSyntaxTree(SSchema *pSchema, int32_t numOfCols, char *str, int32_t *i) { - SSQLToken t0; - - t0 = tStrGetToken(str, i, false, 0, NULL); + SSQLToken t0 = tStrGetToken(str, i, false, 0, NULL); if (t0.n == 0) { return NULL; } @@ -343,7 +342,8 @@ void tSQLBinaryExprFromString(tSQLBinaryExpr **pExpr, SSchema *pSchema, int32_t return; } - int32_t pos = 0; + int32_t pos = 0; + tSQLSyntaxNode *pStxNode = createSyntaxTree(pSchema, numOfCols, src, &pos); if (pStxNode != NULL) { assert(pStxNode->nodeType == TSQL_NODE_EXPR); diff --git a/src/query/src/queryExecutor.c b/src/query/src/queryExecutor.c index 86baf44f45fddda21b361ddad5273b2a321135a6..2f219368080b135cb380293f565dfb332320b538 100644 --- a/src/query/src/queryExecutor.c +++ b/src/query/src/queryExecutor.c @@ -5265,11 +5265,11 @@ void qTableQuery(SQInfo *pQInfo) { // vnodeDecRefCount(pQInfo); } -static int32_t getColumnIndexInSource(SQueryTableMsg *pQueryTableMsg, SSqlFuncExprMsg *pExprMsg) { +static int32_t getColumnIndexInSource(SQueryTableMsg *pQueryMsg, SSqlFuncExprMsg *pExprMsg) { int32_t j = 0; - while (j < pQueryTableMsg->numOfCols) { - if (pExprMsg->colInfo.colId == pQueryTableMsg->colList[j].colId) { + while (j < pQueryMsg->numOfCols) { + if (pExprMsg->colInfo.colId == pQueryMsg->colList[j].colId) { break; } @@ -5279,44 +5279,44 @@ static int32_t getColumnIndexInSource(SQueryTableMsg *pQueryTableMsg, SSqlFuncEx return j; } -bool vnodeValidateExprColumnInfo(SQueryTableMsg *pQueryTableMsg, SSqlFuncExprMsg *pExprMsg) { - int32_t j = getColumnIndexInSource(pQueryTableMsg, pExprMsg); - return j < pQueryTableMsg->numOfCols; +bool vnodeValidateExprColumnInfo(SQueryTableMsg *pQueryMsg, SSqlFuncExprMsg *pExprMsg) { + int32_t j = getColumnIndexInSource(pQueryMsg, pExprMsg); + return j < pQueryMsg->numOfCols; } -static int32_t validateQueryMeterMsg(SQueryTableMsg *pQueryTableMsg) { - if (pQueryTableMsg->intervalTime < 0) { - dError("qmsg:%p illegal value of aggTimeInterval %" PRId64 "", pQueryTableMsg, pQueryTableMsg->intervalTime); +static int32_t validateQueryMeterMsg(SQueryTableMsg *pQueryMsg) { + if (pQueryMsg->intervalTime < 0) { + dError("qmsg:%p illegal value of aggTimeInterval %" PRId64 "", pQueryMsg, pQueryMsg->intervalTime); return -1; } - if (pQueryTableMsg->numOfCols <= 0 || pQueryTableMsg->numOfCols > TSDB_MAX_COLUMNS) { - dError("qmsg:%p illegal value of numOfCols %d", pQueryTableMsg, pQueryTableMsg->numOfCols); + if (pQueryMsg->numOfCols <= 0 || pQueryMsg->numOfCols > TSDB_MAX_COLUMNS) { + dError("qmsg:%p illegal value of numOfCols %d", pQueryMsg, pQueryMsg->numOfCols); return -1; } - if (pQueryTableMsg->numOfTables <= 0) { - dError("qmsg:%p illegal value of numOfTables %d", pQueryTableMsg, pQueryTableMsg->numOfTables); + if (pQueryMsg->numOfTables <= 0) { + dError("qmsg:%p illegal value of numOfTables %d", pQueryMsg, pQueryMsg->numOfTables); return -1; } - if (pQueryTableMsg->numOfGroupCols < 0) { - dError("qmsg:%p illegal value of numOfGroupbyCols %d", pQueryTableMsg, pQueryTableMsg->numOfGroupCols); + if (pQueryMsg->numOfGroupCols < 0) { + dError("qmsg:%p illegal value of numOfGroupbyCols %d", pQueryMsg, pQueryMsg->numOfGroupCols); return -1; } - if (pQueryTableMsg->numOfOutputCols > TSDB_MAX_COLUMNS || pQueryTableMsg->numOfOutputCols <= 0) { - dError("qmsg:%p illegal value of output columns %d", pQueryTableMsg, pQueryTableMsg->numOfOutputCols); + if (pQueryMsg->numOfOutputCols > TSDB_MAX_COLUMNS || pQueryMsg->numOfOutputCols <= 0) { + dError("qmsg:%p illegal value of output columns %d", pQueryMsg, pQueryMsg->numOfOutputCols); return -1; } return 0; } -static char* createTableIdList(SQueryTableMsg* pQueryTableMsg, char* pMsg, SArray** pTableIdList) { - assert(pQueryTableMsg->numOfTables > 0); +static char* createTableIdList(SQueryTableMsg* pQueryMsg, char* pMsg, SArray** pTableIdList) { + assert(pQueryMsg->numOfTables > 0); - *pTableIdList = taosArrayInit(pQueryTableMsg->numOfTables, sizeof(STableIdInfo)); + *pTableIdList = taosArrayInit(pQueryMsg->numOfTables, sizeof(STableIdInfo)); STableIdInfo *pTableIdInfo = (STableIdInfo *)pMsg; pTableIdInfo->sid = htonl(pTableIdInfo->sid); @@ -5326,7 +5326,7 @@ static char* createTableIdList(SQueryTableMsg* pQueryTableMsg, char* pMsg, SArra taosArrayPush(*pTableIdList, pTableIdInfo); pMsg += sizeof(STableIdInfo); - for (int32_t j = 1; j < pQueryTableMsg->numOfTables; ++j) { + for (int32_t j = 1; j < pQueryMsg->numOfTables; ++j) { pTableIdInfo = (STableIdInfo *)pMsg; pTableIdInfo->sid = htonl(pTableIdInfo->sid); @@ -5341,49 +5341,47 @@ static char* createTableIdList(SQueryTableMsg* pQueryTableMsg, char* pMsg, SArra } /** - * pQueryTableMsg->head has been converted before this function is called. + * pQueryMsg->head has been converted before this function is called. * - * @param pQueryTableMsg + * @param pQueryMsg * @param pTableIdList * @param pExpr * @return */ -static int32_t convertQueryMsg(SQueryTableMsg *pQueryTableMsg, SArray **pTableIdList, SSqlFuncExprMsg ***pExpr, - wchar_t** tagCond) { - pQueryTableMsg->numOfTables = htonl(pQueryTableMsg->numOfTables); - - pQueryTableMsg->window.skey = htobe64(pQueryTableMsg->window.skey); - pQueryTableMsg->window.ekey = htobe64(pQueryTableMsg->window.ekey); - pQueryTableMsg->intervalTime = htobe64(pQueryTableMsg->intervalTime); - pQueryTableMsg->slidingTime = htobe64(pQueryTableMsg->slidingTime); +static int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SArray **pTableIdList, SSqlFuncExprMsg ***pExpr, + wchar_t** tagCond, char** nameCond) { + pQueryMsg->numOfTables = htonl(pQueryMsg->numOfTables); + + pQueryMsg->window.skey = htobe64(pQueryMsg->window.skey); + pQueryMsg->window.ekey = htobe64(pQueryMsg->window.ekey); + pQueryMsg->intervalTime = htobe64(pQueryMsg->intervalTime); + pQueryMsg->slidingTime = htobe64(pQueryMsg->slidingTime); + pQueryMsg->limit = htobe64(pQueryMsg->limit); + pQueryMsg->offset = htobe64(pQueryMsg->offset); - pQueryTableMsg->limit = htobe64(pQueryTableMsg->limit); - pQueryTableMsg->offset = htobe64(pQueryTableMsg->offset); - - pQueryTableMsg->order = htons(pQueryTableMsg->order); - pQueryTableMsg->orderColId = htons(pQueryTableMsg->orderColId); - - pQueryTableMsg->queryType = htons(pQueryTableMsg->queryType); - - pQueryTableMsg->numOfCols = htons(pQueryTableMsg->numOfCols); - pQueryTableMsg->numOfOutputCols = htons(pQueryTableMsg->numOfOutputCols); - pQueryTableMsg->numOfGroupCols = htons(pQueryTableMsg->numOfGroupCols); - pQueryTableMsg->tagCondLen = htons(pQueryTableMsg->tagCondLen); - - pQueryTableMsg->tsOffset = htonl(pQueryTableMsg->tsOffset); - pQueryTableMsg->tsLen = htonl(pQueryTableMsg->tsLen); - pQueryTableMsg->tsNumOfBlocks = htonl(pQueryTableMsg->tsNumOfBlocks); - pQueryTableMsg->tsOrder = htonl(pQueryTableMsg->tsOrder); + pQueryMsg->order = htons(pQueryMsg->order); + pQueryMsg->orderColId = htons(pQueryMsg->orderColId); + pQueryMsg->queryType = htons(pQueryMsg->queryType); + + pQueryMsg->numOfCols = htons(pQueryMsg->numOfCols); + pQueryMsg->numOfOutputCols = htons(pQueryMsg->numOfOutputCols); + pQueryMsg->numOfGroupCols = htons(pQueryMsg->numOfGroupCols); + pQueryMsg->tagCondLen = htons(pQueryMsg->tagCondLen); + pQueryMsg->nameCondLen = htons(pQueryMsg->nameCondLen); + pQueryMsg->tsOffset = htonl(pQueryMsg->tsOffset); + pQueryMsg->tsLen = htonl(pQueryMsg->tsLen); + pQueryMsg->tsNumOfBlocks = htonl(pQueryMsg->tsNumOfBlocks); + pQueryMsg->tsOrder = htonl(pQueryMsg->tsOrder); // query msg safety check - if (validateQueryMeterMsg(pQueryTableMsg) != 0) { + if (validateQueryMeterMsg(pQueryMsg) != 0) { return TSDB_CODE_INVALID_QUERY_MSG; } - char *pMsg = (char *)(pQueryTableMsg->colList) + sizeof(SColumnInfo) * pQueryTableMsg->numOfCols; + char *pMsg = (char *)(pQueryMsg->colList) + sizeof(SColumnInfo) * pQueryMsg->numOfCols; - for (int32_t col = 0; col < pQueryTableMsg->numOfCols; ++col) { - SColumnInfo* pColInfo = &pQueryTableMsg->colList[col]; + for (int32_t col = 0; col < pQueryMsg->numOfCols; ++col) { + SColumnInfo* pColInfo = &pQueryMsg->colList[col]; pColInfo->colId = htons(pColInfo->colId); pColInfo->type = htons(pColInfo->type); @@ -5423,10 +5421,10 @@ static int32_t convertQueryMsg(SQueryTableMsg *pQueryTableMsg, SArray **pTableId bool hasArithmeticFunction = false; - *pExpr = calloc(pQueryTableMsg->numOfOutputCols, POINTER_BYTES); + *pExpr = calloc(pQueryMsg->numOfOutputCols, POINTER_BYTES); SSqlFuncExprMsg *pExprMsg = (SSqlFuncExprMsg *)pMsg; - for (int32_t i = 0; i < pQueryTableMsg->numOfOutputCols; ++i) { + for (int32_t i = 0; i < pQueryMsg->numOfOutputCols; ++i) { (*pExpr)[i] = pExprMsg; pExprMsg->colInfo.colIdx = htons(pExprMsg->colInfo.colIdx); @@ -5457,7 +5455,7 @@ static int32_t convertQueryMsg(SQueryTableMsg *pQueryTableMsg, SArray **pTableId return TSDB_CODE_INVALID_QUERY_MSG; } } else { - if (!vnodeValidateExprColumnInfo(pQueryTableMsg, pExprMsg)) { + if (!vnodeValidateExprColumnInfo(pQueryMsg, pExprMsg)) { return TSDB_CODE_INVALID_QUERY_MSG; } } @@ -5465,55 +5463,59 @@ static int32_t convertQueryMsg(SQueryTableMsg *pQueryTableMsg, SArray **pTableId pExprMsg = (SSqlFuncExprMsg *)pMsg; } - pQueryTableMsg->colNameLen = htonl(pQueryTableMsg->colNameLen); + pQueryMsg->colNameLen = htonl(pQueryMsg->colNameLen); if (hasArithmeticFunction) { // column name array - assert(pQueryTableMsg->colNameLen > 0); - pQueryTableMsg->colNameList = (int64_t)pMsg; - pMsg += pQueryTableMsg->colNameLen; + assert(pQueryMsg->colNameLen > 0); + pQueryMsg->colNameList = (int64_t)pMsg; + pMsg += pQueryMsg->colNameLen; } - pMsg = createTableIdList(pQueryTableMsg, pMsg, pTableIdList); + pMsg = createTableIdList(pQueryMsg, pMsg, pTableIdList); - if (pQueryTableMsg->numOfGroupCols > 0) { // group by tag columns -// if (pQueryTableMsg->numOfGroupCols > 0) { -// pQueryTableMsg->groupbyTagIds = (uint64_t) & (pTagSchema[pQueryTableMsg->numOfTagsCols]); + if (pQueryMsg->numOfGroupCols > 0) { // group by tag columns +// if (pQueryMsg->numOfGroupCols > 0) { +// pQueryMsg->groupbyTagIds = (uint64_t) & (pTagSchema[pQueryMsg->numOfTagsCols]); // } else { -// pQueryTableMsg->groupbyTagIds = 0; +// pQueryMsg->groupbyTagIds = 0; // } - pQueryTableMsg->orderByIdx = htons(pQueryTableMsg->orderByIdx); - pQueryTableMsg->orderType = htons(pQueryTableMsg->orderType); + pQueryMsg->orderByIdx = htons(pQueryMsg->orderByIdx); + pQueryMsg->orderType = htons(pQueryMsg->orderType); - pMsg += sizeof(SColIndexEx) * pQueryTableMsg->numOfGroupCols; + pMsg += sizeof(SColIndexEx) * pQueryMsg->numOfGroupCols; } else { - pQueryTableMsg->groupbyTagIds = 0; + pQueryMsg->groupbyTagIds = 0; } - pQueryTableMsg->interpoType = htons(pQueryTableMsg->interpoType); - if (pQueryTableMsg->interpoType != TSDB_INTERPO_NONE) { - pQueryTableMsg->defaultVal = (uint64_t)(pMsg); + pQueryMsg->interpoType = htons(pQueryMsg->interpoType); + if (pQueryMsg->interpoType != TSDB_INTERPO_NONE) { + pQueryMsg->defaultVal = (uint64_t)(pMsg); int64_t *v = (int64_t *)pMsg; - for (int32_t i = 0; i < pQueryTableMsg->numOfOutputCols; ++i) { + for (int32_t i = 0; i < pQueryMsg->numOfOutputCols; ++i) { v[i] = htobe64(v[i]); } - pMsg += sizeof(int64_t) * pQueryTableMsg->numOfOutputCols; + pMsg += sizeof(int64_t) * pQueryMsg->numOfOutputCols; } // the tag query condition expression string is located at the end of query msg - if (pQueryTableMsg->tagCondLen > 0) { - *tagCond = calloc(1, pQueryTableMsg->tagCondLen * TSDB_NCHAR_SIZE); - memcpy(*tagCond, pMsg, pQueryTableMsg->tagCondLen * TSDB_NCHAR_SIZE); + if (pQueryMsg->tagCondLen > 0) { + *tagCond = calloc(1, pQueryMsg->tagCondLen * TSDB_NCHAR_SIZE); + memcpy(*tagCond, pMsg, pQueryMsg->tagCondLen * TSDB_NCHAR_SIZE); + } + + if (pQueryMsg->nameCondLen > 0) { + *nameCond = strndup(pMsg, pQueryMsg->nameCondLen); } dTrace("qmsg:%p query on %d meter(s), qrange:%" PRId64 "-%" PRId64 ", numOfGroupbyTagCols:%d, numOfTagCols:%d, " "timestamp order:%d, tags order:%d, tags order col:%d, numOfOutputCols:%d, numOfCols:%d, interval:%" PRId64 ", fillType:%d, comptslen:%d, limit:%" PRId64 ", offset:%" PRId64, - pQueryTableMsg, pQueryTableMsg->numOfTables, pQueryTableMsg->window.skey, pQueryTableMsg->window.ekey, - pQueryTableMsg->numOfGroupCols, pQueryTableMsg->order, pQueryTableMsg->orderType, - pQueryTableMsg->orderByIdx, pQueryTableMsg->numOfOutputCols, - pQueryTableMsg->numOfCols, pQueryTableMsg->intervalTime, pQueryTableMsg->interpoType, pQueryTableMsg->tsLen, - pQueryTableMsg->limit, pQueryTableMsg->offset); + pQueryMsg, pQueryMsg->numOfTables, pQueryMsg->window.skey, pQueryMsg->window.ekey, + pQueryMsg->numOfGroupCols, pQueryMsg->order, pQueryMsg->orderType, + pQueryMsg->orderByIdx, pQueryMsg->numOfOutputCols, + pQueryMsg->numOfCols, pQueryMsg->intervalTime, pQueryMsg->interpoType, pQueryMsg->tsLen, + pQueryMsg->limit, pQueryMsg->offset); return 0; } @@ -6047,54 +6049,59 @@ _error: return code; } -int32_t qCreateQueryInfo(void* tsdb, SQueryTableMsg *pQueryTableMsg, SQInfo **pQInfo) { - assert(pQueryTableMsg != NULL); +int32_t qCreateQueryInfo(void* tsdb, SQueryTableMsg *pQueryMsg, SQInfo **pQInfo) { + assert(pQueryMsg != NULL); int32_t code = TSDB_CODE_SUCCESS; SArray *pTableIdList = NULL; SSqlFuncExprMsg** pExprMsg = NULL; wchar_t* tagCond = NULL; + char* nameCond = NULL; - if ((code = convertQueryMsg(pQueryTableMsg, &pTableIdList, &pExprMsg, &tagCond)) != TSDB_CODE_SUCCESS) { + if ((code = convertQueryMsg(pQueryMsg, &pTableIdList, &pExprMsg, &tagCond, &nameCond)) != TSDB_CODE_SUCCESS) { return code; } - if (pQueryTableMsg->numOfTables <= 0) { - dError("Invalid number of tables to query, numOfTables:%d", pQueryTableMsg->numOfTables); + if (pQueryMsg->numOfTables <= 0) { + dError("Invalid number of tables to query, numOfTables:%d", pQueryMsg->numOfTables); code = TSDB_CODE_INVALID_QUERY_MSG; goto _query_over; } // todo check vnode status if (pTableIdList == NULL || taosArrayGetSize(pTableIdList) == 0) { - dError("qmsg:%p, SQueryTableMsg wrong format", pQueryTableMsg); + dError("qmsg:%p, SQueryTableMsg wrong format", pQueryMsg); code = TSDB_CODE_INVALID_QUERY_MSG; goto _query_over; } SSqlFunctionExpr *pExprs = NULL; - if ((code = createSqlFunctionExprFromMsg(pQueryTableMsg, &pExprs, pExprMsg)) != TSDB_CODE_SUCCESS) { + if ((code = createSqlFunctionExprFromMsg(pQueryMsg, &pExprs, pExprMsg)) != TSDB_CODE_SUCCESS) { goto _query_over; } - SSqlGroupbyExpr *pGroupbyExpr = createGroupbyExprFromMsg(pQueryTableMsg, &code); - if ((pGroupbyExpr == NULL && pQueryTableMsg->numOfGroupCols != 0) || code != TSDB_CODE_SUCCESS) { + SSqlGroupbyExpr *pGroupbyExpr = createGroupbyExprFromMsg(pQueryMsg, &code); + if ((pGroupbyExpr == NULL && pQueryMsg->numOfGroupCols != 0) || code != TSDB_CODE_SUCCESS) { goto _query_over; } // super table query - if ((pQueryTableMsg->queryType & TSDB_QUERY_TYPE_STABLE_QUERY) != 0) { + SArray* res = NULL; + if ((pQueryMsg->queryType & TSDB_QUERY_TYPE_STABLE_QUERY) != 0) { STableId* id = taosArrayGet(pTableIdList, 0); + id->uid = -1; - SArray* res = tsdbQueryTableList(tsdb, id->uid, tagCond, pQueryTableMsg->tagCondLen); + res = tsdbQueryTableList(tsdb, id->uid, tagCond, pQueryMsg->tagCondLen); if (taosArrayGetSize(res) == 0) { // no qualified table in stable query in this vnode code = TSDB_CODE_SUCCESS; goto _query_over; } + } else { + res = pTableIdList; } - code = createQInfo(pQueryTableMsg, pGroupbyExpr, pExprs, pTableIdList, tsdb, pQInfo); + code = createQInfo(pQueryMsg, pGroupbyExpr, pExprs, res, tsdb, pQInfo); _query_over: if (code != TSDB_CODE_SUCCESS) { @@ -6103,16 +6110,16 @@ _query_over: // if failed to add ref for all meters in this query, abort current query // if (code != TSDB_CODE_SUCCESS) { - // vnodeDecQueryRefCount(pQueryTableMsg, pMeterObjList, incNumber); + // vnodeDecQueryRefCount(pQueryMsg, pMeterObjList, incNumber); // } // - // tfree(pQueryTableMsg->pSqlFuncExprs); + // tfree(pQueryMsg->pSqlFuncExprs); // tfree(pMeterObjList); // ret = vnodeSendQueryRspMsg(pObj, code, pObj->qhandle); // - // tfree(pQueryTableMsg->pSidExtInfo); - // for(int32_t i = 0; i < pQueryTableMsg->numOfCols; ++i) { - // vnodeFreeColumnInfo(&pQueryTableMsg->colList[i]); + // tfree(pQueryMsg->pSidExtInfo); + // for(int32_t i = 0; i < pQueryMsg->numOfCols; ++i) { + // vnodeFreeColumnInfo(&pQueryMsg->colList[i]); // } // // atomic_fetch_add_32(&vnodeSelectReqNum, 1); diff --git a/src/vnode/tsdb/src/tsdbRead.c b/src/vnode/tsdb/src/tsdbRead.c index 0140e2e11e6a24c0f52a96a85ed199d3cbdbe8b9..3119fdc7ccc9287fea28b4364cad75e7faa08784 100644 --- a/src/vnode/tsdb/src/tsdbRead.c +++ b/src/vnode/tsdb/src/tsdbRead.c @@ -73,14 +73,15 @@ typedef struct SQueryFilesInfo { char dbFilePathPrefix[PATH_MAX]; } SQueryFilesInfo; -typedef struct STableQueryRec { +typedef struct STableQueryInfo { + STableId tableId; TSKEY lastKey; STable * pTableObj; int64_t offsetInHeaderFile; int32_t numOfBlocks; int32_t start; SCompBlock *pBlock; -} STableQueryRec; +} STableQueryInfo; typedef struct { SCompBlock *compBlock; @@ -89,7 +90,7 @@ typedef struct { typedef struct STableDataBlockInfoEx { SCompBlockFields pBlock; - STableQueryRec * pMeterDataInfo; + STableQueryInfo* pMeterDataInfo; int32_t blockIndex; int32_t groupIdx; /* number of group is less than the total number of meters */ } STableDataBlockInfoEx; @@ -100,12 +101,11 @@ typedef struct STsdbQueryHandle { SQueryHandlePos cur; // current position SQueryHandlePos start; // the start position, used for secondary/third iteration int32_t unzipBufSize; - char *unzipBuffer; - char *secondaryUnzipBuffer; + char *unzipBuffer; + char *secondaryUnzipBuffer; SDataBlockLoadInfo dataBlockLoadInfo; /* record current block load information */ SLoadCompBlockInfo compBlockLoadInfo; /* record current compblock information in SQuery */ - SQueryFilesInfo vnodeFileInfo; int16_t numOfRowsPerPage; @@ -113,21 +113,22 @@ typedef struct STsdbQueryHandle { int16_t order; STimeWindow window; // the primary query time window that applies to all queries int32_t blockBufferSize; - SCompBlock *pBlock; + SCompBlock* pBlock; int32_t numOfBlocks; SField ** pFields; SArray * pColumns; // column list, SColumnInfoEx array list - SArray * pTableIdList; // table id object list bool locateStart; int32_t realNumOfRows; bool loadDataAfterSeek; // load data after seek. - - STableDataBlockInfoEx *pDataBlockInfoEx; - STableQueryRec * pTableQueryInfo; - int32_t tableIndex; - bool isFirstSlot; - void * qinfo; // query info handle, for debug purpose + SArray* pTableQueryInfo; + int32_t activeIndex; + + int32_t tableIndex; + bool isFirstSlot; + void * qinfo; // query info handle, for debug purpose + SSkipListIterator* memIter; + STableDataBlockInfoEx *pDataBlockInfoEx; } STsdbQueryHandle; int32_t doAllocateBuf(STsdbQueryHandle *pQueryHandle, int32_t rowsPerFileBlock) { @@ -263,25 +264,27 @@ tsdb_query_handle_t *tsdbQueryByTableId(tsdb_repo_t* tsdb, STsdbQueryCond *pCond pQueryHandle->window = pCond->twindow; pQueryHandle->pTsdb = tsdb; - pQueryHandle->pTableIdList = idList; pQueryHandle->pColumns = pColumnInfo; pQueryHandle->loadDataAfterSeek = false; pQueryHandle->isFirstSlot = true; - // only support table query - assert(taosArrayGetSize(idList) == 1); - - pQueryHandle->pTableQueryInfo = calloc(1, sizeof(STableQueryRec)); - STableQueryRec* pTableQRec = pQueryHandle->pTableQueryInfo; - - pTableQRec->lastKey = pQueryHandle->window.skey; - - STableIdInfo* idInfo = taosArrayGet(pQueryHandle->pTableIdList, 0); - - STable *pTable = tsdbGetTableByUid(tsdbGetMeta(pQueryHandle->pTsdb), idInfo->uid); - assert(pTable != NULL); + size_t size = taosArrayGetSize(idList); + assert(size >= 1); + + pQueryHandle->pTableQueryInfo = taosArrayInit(size, sizeof(STableQueryInfo)); + for(int32_t i = 0; i < size; ++i) { + STableId id = *(STableId*) taosArrayGet(idList, i); + + STableQueryInfo info = { + .lastKey = pQueryHandle->window.skey, + .tableId = id, + .pTableObj = tsdbGetTableByUid(tsdbGetMeta(tsdb), id.uid), //todo this may be failed + }; + + taosArrayPush(pQueryHandle->pTableQueryInfo, &info); + } - pTableQRec->pTableObj = pTable; + pQueryHandle->activeIndex = 0; // malloc buffer in order to load data from file int32_t numOfCols = taosArrayGetSize(pColumnInfo); @@ -313,7 +316,9 @@ tsdb_query_handle_t *tsdbQueryByTableId(tsdb_repo_t* tsdb, STsdbQueryCond *pCond bool tsdbNextDataBlock(tsdb_query_handle_t *pQueryHandle) { STsdbQueryHandle* pHandle = (STsdbQueryHandle*) pQueryHandle; - STable *pTable = pHandle->pTableQueryInfo->pTableObj; + STableQueryInfo* pTableQInfo = taosArrayGet(pHandle->pTableQueryInfo, pHandle->activeIndex); + + STable *pTable = pTableQInfo->pTableObj; // no data in cache, abort if (pTable->mem == NULL && pTable->imem == NULL) { @@ -321,7 +326,7 @@ bool tsdbNextDataBlock(tsdb_query_handle_t *pQueryHandle) { } // all data in mem are checked already. - if (pHandle->pTableQueryInfo->lastKey > pTable->mem->keyLast) { + if (pTableQInfo->lastKey > pTable->mem->keyLast) { return false; } @@ -364,9 +369,9 @@ static int tsdbReadRowsFromCache(SSkipListIterator *pIter, TSKEY maxKey, int max // copy data from cache into data block SDataBlockInfo tsdbRetrieveDataBlockInfo(tsdb_query_handle_t *pQueryHandle) { STsdbQueryHandle* pHandle = (STsdbQueryHandle*) pQueryHandle; - STableIdInfo* idInfo = taosArrayGet(pHandle->pTableIdList, 0); - STable *pTable = pHandle->pTableQueryInfo->pTableObj; + STableQueryInfo* pTableQInfo = taosArrayGet(pHandle->pTableQueryInfo, pHandle->activeIndex); + STable *pTable = pTableQInfo->pTableObj; TSKEY skey = 0, ekey = 0; int32_t rows = 0; @@ -382,14 +387,14 @@ SDataBlockInfo tsdbRetrieveDataBlockInfo(tsdb_query_handle_t *pQueryHandle) { } SDataBlockInfo blockInfo = { - .uid = idInfo->uid, - .sid = idInfo->sid, + .uid = pTable->tableId.uid, + .sid = pTable->tableId.tid, .size = rows, .window = {.skey = skey, .ekey = ekey} }; // update the last key value - pHandle->pTableQueryInfo->lastKey = ekey + 1; + pTableQInfo->lastKey = ekey + 1; return blockInfo; } @@ -427,7 +432,9 @@ static SArray* createTableIdArrayList(struct STsdbRepo* tsdb, int64_t uid) { SSkipListIterator* iter = tSkipListCreateIter(pTable->pIndex); while(tSkipListIterNext(iter)) { - STable* t = *(STable**) tSkipListIterGet(iter); + SSkipListNode* pNode = tSkipListIterGet(iter); + STable* t = *(STable**) SL_GET_NODE_DATA(pNode); + taosArrayPush(pList, &t->tableId); } @@ -696,7 +703,7 @@ bool tSkipListNodeFilterCallback(const void* pNode, void* param) { return true; } -static int32_t mgmtFilterMeterByIndex(STable* pSTable, SArray* pRes, const char* pCond) { +static int32_t doQueryTableList(STable* pSTable, SArray* pRes, const char* pCond) { STColumn* stcol = schemaColAt(pSTable->tagSchema, 0); tSQLBinaryExpr* pExpr = NULL; @@ -736,7 +743,7 @@ SArray *tsdbQueryTableList(struct STsdbRepo* tsdb, int64_t uid, const wchar_t *p STable* pSTable = tsdbGetTableByUid(tsdbGetMeta(tsdb), uid); assert(pSTable != NULL); - if (mgmtFilterMeterByIndex(pSTable, result, str) == TSDB_CODE_SUCCESS) { + if (doQueryTableList(pSTable, result, str) == TSDB_CODE_SUCCESS) { return result; } }