From c269b6d474cdc4d2d1dc1cd4ca2785b3dfcf471a Mon Sep 17 00:00:00 2001 From: hjxilinx Date: Fri, 27 Mar 2020 01:59:26 +0800 Subject: [PATCH] [TD-32] fix error in retrieve data --- src/client/inc/tscUtil.h | 6 +- src/client/inc/tsclient.h | 5 +- src/client/src/tscAsync.c | 4 +- src/client/src/tscLocal.c | 7 +- src/client/src/tscParseInsert.c | 8 +- src/client/src/tscSQLParser.c | 32 +++-- src/client/src/tscSchemaUtil.c | 2 +- src/client/src/tscSecondaryMerge.c | 10 +- src/client/src/tscServer.c | 214 +++++++++++++---------------- src/client/src/tscSql.c | 10 +- src/client/src/tscStream.c | 2 +- src/client/src/tscSub.c | 32 ++--- src/client/src/tscSubquery.c | 50 +++---- src/client/src/tscUtil.c | 46 ++++--- src/dnode/src/dnodeRead.c | 2 +- src/inc/taosmsg.h | 136 +++++++++--------- src/mnode/src/mgmtSuperTable.c | 6 +- src/query/inc/qsqltype.h | 2 +- src/query/inc/queryExecutor.h | 8 +- src/query/src/queryExecutor.c | 136 ++++++++---------- src/vnode/tsdb/src/tsdbMeta.c | 23 +++- src/vnode/tsdb/src/tsdbRead.c | 5 +- 22 files changed, 364 insertions(+), 382 deletions(-) diff --git a/src/client/inc/tscUtil.h b/src/client/inc/tscUtil.h index 25a5db6118..402b6b3706 100644 --- a/src/client/inc/tscUtil.h +++ b/src/client/inc/tscUtil.h @@ -106,7 +106,7 @@ bool tscProjectionQueryOnTable(SQueryInfo* pQueryInfo); bool tscIsTwoStageSTableQuery(SQueryInfo* pQueryInfo, int32_t tableIndex); bool tscQueryOnMetric(SSqlCmd* pCmd); -bool tscQueryMetricTags(SQueryInfo* pQueryInfo); +bool tscQueryTags(SQueryInfo* pQueryInfo); bool tscIsSelectivityWithTagQuery(SSqlCmd* pCmd); void tscAddSpecialColumnForSelect(SQueryInfo* pQueryInfo, int32_t outputColIndex, int16_t functionId, SColumnIndex* pIndex, @@ -176,7 +176,7 @@ void tscIncStreamExecutionCount(void* pStream); bool tscValidateColumnId(STableMetaInfo* pTableMetaInfo, int32_t colId); // get starter position of metric query condition (query on tags) in SSqlCmd.payload -SCond* tsGetMetricQueryCondPos(STagCond* pCond, uint64_t tableIndex); +SCond* tsGetSTableQueryCondPos(STagCond* pCond, uint64_t tableIndex); void tsSetMetricQueryCond(STagCond* pTagCond, uint64_t uid, const char* str); void tscTagCondCopy(STagCond* dest, const STagCond* src); @@ -207,7 +207,7 @@ void tscFreeSubqueryInfo(SSqlCmd* pCmd); void tscClearSubqueryInfo(SSqlCmd* pCmd); void tscGetMetricMetaCacheKey(SQueryInfo* pQueryInfo, char* keyStr, uint64_t uid); -int tscGetMetricMeta(SSqlObj* pSql, int32_t clauseIndex); +int tscGetSTableVgroupInfo(SSqlObj* pSql, int32_t clauseIndex); int tscGetTableMeta(SSqlObj* pSql, STableMetaInfo* pTableMetaInfo); int tscGetMeterMetaEx(SSqlObj* pSql, STableMetaInfo* pTableMetaInfo, bool createIfNotExists); diff --git a/src/client/inc/tsclient.h b/src/client/inc/tsclient.h index fdc0ae9095..9cb43a9f36 100644 --- a/src/client/inc/tsclient.h +++ b/src/client/inc/tsclient.h @@ -70,8 +70,9 @@ typedef struct STableMeta { typedef struct STableMetaInfo { STableMeta * pTableMeta; // table meta, cached in client side and acquried by name - SSuperTableMeta *pMetricMeta; // metricmeta - +// SSuperTableMeta *pMetricMeta; // metricmeta + SArray* vgroupIdList; + /* * 1. keep the vnode index during the multi-vnode super table projection query * 2. keep the vnode index for multi-vnode insertion diff --git a/src/client/src/tscAsync.c b/src/client/src/tscAsync.c index 1b9b0a365f..366faa15dc 100644 --- a/src/client/src/tscAsync.c +++ b/src/client/src/tscAsync.c @@ -459,7 +459,7 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) { if (code == TSDB_CODE_ACTION_IN_PROGRESS) return; - code = tscGetMetricMeta(pSql, 0); + code = tscGetSTableVgroupInfo(pSql, 0); pRes->code = code; if (code == TSDB_CODE_ACTION_IN_PROGRESS) return; @@ -489,7 +489,7 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) { if (code == TSDB_CODE_ACTION_IN_PROGRESS) return; if (code == TSDB_CODE_SUCCESS && UTIL_TABLE_IS_SUPERTABLE(pTableMetaInfo)) { - code = tscGetMetricMeta(pSql, pCmd->clauseIndex); + code = tscGetSTableVgroupInfo(pSql, pCmd->clauseIndex); pRes->code = code; if (code == TSDB_CODE_ACTION_IN_PROGRESS) return; diff --git a/src/client/src/tscLocal.c b/src/client/src/tscLocal.c index 46f3ab6687..42b04d7a16 100644 --- a/src/client/src/tscLocal.c +++ b/src/client/src/tscLocal.c @@ -286,6 +286,7 @@ static int32_t tscProcessDescribeTable(SSqlObj *pSql) { // todo add order support static int tscBuildMetricTagProjectionResult(SSqlObj *pSql) { +#if 0 // the result structure has been completed in sql parse, so we // only need to reorganize the results in the column format SSqlCmd * pCmd = &pSql->cmd; @@ -337,6 +338,7 @@ static int tscBuildMetricTagProjectionResult(SSqlObj *pSql) { } } +#endif return 0; } @@ -345,7 +347,7 @@ static int tscBuildMetricTagSqlFunctionResult(SSqlObj *pSql) { SSqlRes *pRes = &pSql->res; SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0); - +#if 0 SSuperTableMeta *pMetricMeta = tscGetMetaInfo(pQueryInfo, 0)->pMetricMeta; int32_t totalNumOfResults = 1; // count function only produce one result int32_t rowLen = tscGetResRowLength(pQueryInfo); @@ -369,7 +371,8 @@ static int tscBuildMetricTagSqlFunctionResult(SSqlObj *pSql) { } rowIdx++; } - +#endif + return 0; } diff --git a/src/client/src/tscParseInsert.c b/src/client/src/tscParseInsert.c index 5651e5aa38..9c7a3e0bf0 100644 --- a/src/client/src/tscParseInsert.c +++ b/src/client/src/tscParseInsert.c @@ -1295,11 +1295,11 @@ int tsParseInsertSql(SSqlObj *pSql) { int tsParseSql(SSqlObj *pSql, bool multiVnodeInsertion) { int32_t ret = TSDB_CODE_SUCCESS; - if (NULL == pSql->asyncTblPos) { - tscCleanSqlCmd(&pSql->cmd); - } else { +// if (NULL == pSql->asyncTblPos) { +// tscCleanSqlCmd(&pSql->cmd); +// } else { tscTrace("continue parse sql: %s", pSql->asyncTblPos); - } +// } if (tscIsInsertOrImportData(pSql->sqlstr)) { /* diff --git a/src/client/src/tscSQLParser.c b/src/client/src/tscSQLParser.c index 34f043a15b..6d5d08292d 100644 --- a/src/client/src/tscSQLParser.c +++ b/src/client/src/tscSQLParser.c @@ -209,9 +209,14 @@ int32_t tscToSQLCmd(SSqlObj* pSql, struct SSqlInfo* pInfo) { } int32_t code = tscGetQueryInfoDetailSafely(pCmd, pCmd->clauseIndex, &pQueryInfo); - assert(pQueryInfo->numOfTables == 0); +// assert(pQueryInfo->numOfTables == 0); - STableMetaInfo* pTableMetaInfo = tscAddEmptyMetaInfo(pQueryInfo); + STableMetaInfo* pTableMetaInfo = NULL; + if (pQueryInfo->numOfTables == 0) { + pTableMetaInfo = tscAddEmptyMetaInfo(pQueryInfo); + } else { + pTableMetaInfo = &pQueryInfo->pTableMetaInfo[0]; + } pCmd->command = pInfo->type; @@ -639,7 +644,7 @@ int32_t parseIntervalClause(SQueryInfo* pQueryInfo, SQuerySQL* pQuerySql) { * check invalid SQL: * select tbname, tags_fields from super_table_name interval(1s) */ - if (tscQueryMetricTags(pQueryInfo) && pQueryInfo->intervalTime > 0) { + if (tscQueryTags(pQueryInfo) && pQueryInfo->intervalTime > 0) { return invalidSqlErrMsg(pQueryInfo->msg, msg1); } @@ -746,7 +751,7 @@ int32_t setMeterID(STableMetaInfo* pTableMetaInfo, SSQLToken* pzTableName, SSqlO tscClearMeterMetaInfo(pTableMetaInfo, false); } } else { - assert(pTableMetaInfo->pTableMeta == NULL && pTableMetaInfo->pMetricMeta == NULL); + assert(pTableMetaInfo->pTableMeta == NULL); } tfree(oldName); @@ -1252,7 +1257,7 @@ int32_t parseSelectClause(SSqlCmd* pCmd, int32_t clauseIndex, tSQLExprList* pSel STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); int32_t numOfCols = tscGetNumOfColumns(pTableMetaInfo->pTableMeta); - if (tscQueryMetricTags(pQueryInfo)) { // local handle the metric tag query + if (tscQueryTags(pQueryInfo)) { // local handle the metric tag query pCmd->count = numOfCols; // the number of meter schema, tricky. pQueryInfo->command = TSDB_SQL_RETRIEVE_TAGS; } @@ -1293,7 +1298,7 @@ SSqlExpr* doAddProjectCol(SQueryInfo* pQueryInfo, int32_t outputIndex, int32_t c int16_t functionId = (int16_t)((colIdx >= numOfCols) ? TSDB_FUNC_TAGPRJ : TSDB_FUNC_PRJ); if (functionId == TSDB_FUNC_TAGPRJ) { - addRequiredTagColumn(pQueryInfo, colIdx - numOfCols, tableIndex); +// addRequiredTagColumn(pQueryInfo, colIdx - numOfCols, tableIndex); pQueryInfo->type = TSDB_QUERY_TYPE_STABLE_QUERY; } else { pQueryInfo->type = TSDB_QUERY_TYPE_PROJECTION_QUERY; @@ -1396,8 +1401,7 @@ static int32_t doAddProjectionExprAndResultFields(SQueryInfo* pQueryInfo, SColum SColumnList ids = {0}; ids.ids[0] = *pIndex; - // tag columns do not add to source list - ids.num = (j >= tscGetNumOfColumns(pTableMeta)) ? 0 : 1; + ids.num = 1; insertResultField(pQueryInfo, startPos + j, &ids, pSchema[j].bytes, pSchema[j].type, pSchema[j].name, pExpr); } @@ -4666,14 +4670,13 @@ int32_t parseLimitClause(SQueryInfo* pQueryInfo, int32_t clauseIndex, SQuerySQL* * And then launching multiple async-queries against all qualified virtual nodes, during the first-stage * query operation. */ - int32_t code = tscGetMetricMeta(pSql, clauseIndex); + int32_t code = tscGetSTableVgroupInfo(pSql, clauseIndex); if (code != TSDB_CODE_SUCCESS) { return code; } // No tables included. No results generated. Query results are empty. - SSuperTableMeta* pMetricMeta = pTableMetaInfo->pMetricMeta; - if (pTableMetaInfo->pTableMeta == NULL || pMetricMeta == NULL || pMetricMeta->numOfTables == 0) { + if (pTableMetaInfo->pTableMeta == NULL) { tscTrace("%p no table in metricmeta, no output result", pSql); pQueryInfo->command = TSDB_SQL_RETRIEVE_EMPTY_RESULT; } @@ -5687,6 +5690,13 @@ int32_t doCheckForQuery(SSqlObj* pSql, SQuerySQL* pQuerySql, int32_t index) { } assert(pQueryInfo->numOfTables == pQuerySql->from->nExpr); + + if (UTIL_TABLE_IS_SUPERTABLE(pTableMetaInfo)) { + int32_t code = tscGetSTableVgroupInfo(pSql, index); + if (code != TSDB_CODE_SUCCESS) { + return code; + } + } // parse the group by clause in the first place if (parseGroupbyClause(pQueryInfo, pQuerySql->pGroupby, pCmd) != TSDB_CODE_SUCCESS) { diff --git a/src/client/src/tscSchemaUtil.c b/src/client/src/tscSchemaUtil.c index 66776e36f7..2492352284 100644 --- a/src/client/src/tscSchemaUtil.c +++ b/src/client/src/tscSchemaUtil.c @@ -32,7 +32,7 @@ int32_t tscGetNumOfTags(const STableMeta* pTableMeta) { } if (pTableMeta->tableType == TSDB_SUPER_TABLE || pTableMeta->tableType == TSDB_CHILD_TABLE) { - assert(tinfo.numOfTags > 0); + assert(tinfo.numOfTags >= 0); return tinfo.numOfTags; } diff --git a/src/client/src/tscSecondaryMerge.c b/src/client/src/tscSecondaryMerge.c index 0ee8afd53c..d69f6d295f 100644 --- a/src/client/src/tscSecondaryMerge.c +++ b/src/client/src/tscSecondaryMerge.c @@ -605,7 +605,7 @@ int32_t tscLocalReducerEnvCreate(SSqlObj *pSql, tExtMemBuffer ***pMemBuffer, tOr SQueryInfo * pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); - (*pMemBuffer) = (tExtMemBuffer **)malloc(POINTER_BYTES * pTableMetaInfo->pMetricMeta->numOfVnodes); +// (*pMemBuffer) = (tExtMemBuffer **)malloc(POINTER_BYTES * pTableMetaInfo->pMetricMeta->numOfVnodes); if (*pMemBuffer == NULL) { tscError("%p failed to allocate memory", pSql); pRes->code = TSDB_CODE_CLI_OUT_OF_MEMORY; @@ -636,10 +636,10 @@ int32_t tscLocalReducerEnvCreate(SSqlObj *pSql, tExtMemBuffer ***pMemBuffer, tOr pModel = createColumnModel(pSchema, pQueryInfo->exprsInfo.numOfExprs, capacity); - for (int32_t i = 0; i < pTableMetaInfo->pMetricMeta->numOfVnodes; ++i) { - (*pMemBuffer)[i] = createExtMemBuffer(nBufferSizes, rlen, pModel); - (*pMemBuffer)[i]->flushModel = MULTIPLE_APPEND_MODEL; - } +// for (int32_t i = 0; i < pTableMetaInfo->pMetricMeta->numOfVnodes; ++i) { +// (*pMemBuffer)[i] = createExtMemBuffer(nBufferSizes, rlen, pModel); +// (*pMemBuffer)[i]->flushModel = MULTIPLE_APPEND_MODEL; +// } if (createOrderDescriptor(pOrderDesc, pCmd, pModel) != TSDB_CODE_SUCCESS) { pRes->code = TSDB_CODE_CLI_OUT_OF_MEMORY; diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index 5826d02cf2..b2bd9110e7 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -35,7 +35,6 @@ SRpcIpSet tscDnodeIpSet; int (*tscBuildMsg[TSDB_SQL_MAX])(SSqlObj *pSql, SSqlInfo *pInfo) = {0}; int (*tscProcessMsgRsp[TSDB_SQL_MAX])(SSqlObj *pSql); -void (*tscUpdateVnodeMsg[TSDB_SQL_MAX])(SSqlObj *pSql, char *buf); void tscProcessActivityTimer(void *handle, void *tmrId); int tscKeepConn[TSDB_SQL_MAX] = {0}; TSKEY tscGetSubscriptionProgress(void* sub, int64_t uid); @@ -188,7 +187,7 @@ int tscSendMsgToServer(SSqlObj *pSql) { .pCont = pMsg, .contLen = pSql->cmd.payloadLen, .handle = pSql, - .code = 0 + .code = 0 }; rpcSendRequest(pVnodeConn, pSql->ipList, &rpcMsg); } else { @@ -315,7 +314,7 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg) { pRes->numOfRows += pMsg->affectedRows; tscTrace("%p cmd:%d code:%d, inserted rows:%d, rsp len:%d", pSql, pCmd->command, pRes->code, - *(int32_t *)pRes->pRsp, pRes->rspLen); + pMsg->affectedRows, pRes->rspLen); } else { tscTrace("%p cmd:%d code:%s rsp len:%d", pSql, pCmd->command, tstrerror(pRes->code), pRes->rspLen); } @@ -361,7 +360,7 @@ int doProcessSql(SSqlObj *pSql) { pCmd->command == TSDB_SQL_CONNECT || pCmd->command == TSDB_SQL_HB || pCmd->command == TSDB_SQL_META || - pCmd->command == TSDB_SQL_METRIC) { + pCmd->command == TSDB_SQL_STABLEVGROUP) { tscBuildMsg[pCmd->command](pSql, NULL); } @@ -509,22 +508,6 @@ int tscBuildRetrieveMsg(SSqlObj *pSql, SSqlInfo *pInfo) { return TSDB_CODE_SUCCESS; } -void tscUpdateVnodeInSubmitMsg(SSqlObj *pSql, char *buf) { - //SSubmitMsg *pShellMsg; - //char * pMsg; - //STableMetaInfo * pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, pSql->cmd.clauseIndex, 0); - - //STableMeta *pTableMeta = pTableMetaInfo->pTableMeta; - - //pMsg = buf + tsRpcHeadSize; - - //TODO set iplist - //pShellMsg = (SSubmitMsg *)pMsg; - //pShellMsg->vnode = htons(pTableMeta->vpeerDesc[pSql->index].vnode); - //tscTrace("%p update submit msg vnode:%s:%d", pSql, taosIpStr(pTableMeta->vpeerDesc[pSql->index].ip), - // htons(pShellMsg->vnode)); -} - int tscBuildSubmitMsg(SSqlObj *pSql, SSqlInfo *pInfo) { SSubmitMsg *pShellMsg; char * pMsg, *pStart; @@ -554,24 +537,6 @@ int tscBuildSubmitMsg(SSqlObj *pSql, SSqlInfo *pInfo) { return TSDB_CODE_SUCCESS; } -void tscUpdateVnodeInQueryMsg(SSqlObj *pSql, char *buf) { - //TODO -// SSqlCmd * pCmd = &pSql->cmd; -// STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0); -// -// char * pStart = buf + tsRpcHeadSize; -// SQueryTableMsg *pQueryMsg = (SQueryTableMsg *)pStart; -// -// if (UTIL_TABLE_IS_NOMRAL_TABLE(pTableMetaInfo)) { // pColumnModel == NULL, query on meter -// STableMeta *pTableMeta = pTableMetaInfo->pTableMeta; -// pQueryMsg->vnode = htons(pTableMeta->vpeerDesc[pSql->index].vnode); -// } else { // query on metric -// SSuperTableMeta * pMetricMeta = pTableMetaInfo->pMetricMeta; -// SVnodeSidList *pVnodeSidList = tscGetVnodeSidList(pMetricMeta, pTableMetaInfo->vnodeIndex); -// pQueryMsg->vnode = htons(pVnodeSidList->vpeerDesc[pSql->index].vnode); -// } -} - /* * for meter query, simply return the size <= 1k * for metric query, estimate size according to meter tags @@ -589,7 +554,10 @@ static int32_t tscEstimateQueryMsgSize(SSqlCmd *pCmd, int32_t clauseIndex) { if (!UTIL_TABLE_IS_SUPERTABLE(pTableMetaInfo)) { return MIN_QUERY_MSG_PKT_SIZE + minMsgSize() + sizeof(SQueryTableMsg) + srcColListSize + exprSize; } - + + int32_t size = 4096; + +#if 0 SSuperTableMeta *pMetricMeta = pTableMetaInfo->pMetricMeta; SVnodeSidList *pVnodeSidList = tscGetVnodeSidList(pMetricMeta, pTableMetaInfo->vnodeIndex); @@ -600,45 +568,23 @@ static int32_t tscEstimateQueryMsgSize(SSqlCmd *pCmd, int32_t clauseIndex) { if (pQueryInfo->tsBuf != NULL) { size += pQueryInfo->tsBuf->fileSize; } - +#endif + return size; } -static char *doSerializeTableInfo(SSqlObj *pSql, int32_t numOfTables, int32_t vgId, char *pMsg) { +static char *doSerializeTableInfo(SSqlObj *pSql, int32_t vgId, char *pMsg) { STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, pSql->cmd.clauseIndex, 0); STableMeta * pTableMeta = pTableMetaInfo->pTableMeta; - SSuperTableMeta *pMetricMeta = pTableMetaInfo->pMetricMeta; - - tscTrace("%p vgId:%d, query on %d tables", pSql, vgId, numOfTables); - if (UTIL_TABLE_IS_NOMRAL_TABLE(pTableMetaInfo)) { -#ifdef _DEBUG_VIEW - tscTrace("%p sid:%d, uid:%" PRIu64, pSql, pTableMetaInfo->pTableMeta->sid, pTableMetaInfo->pTableMeta->uid); -#endif - STableIdInfo *pTableIdInfo = (STableIdInfo *)pMsg; - pTableIdInfo->sid = htonl(pTableMeta->sid); - pTableIdInfo->uid = htobe64(pTableMeta->uid); - pTableIdInfo->key = htobe64(tscGetSubscriptionProgress(pSql->pSubscription, pTableMeta->uid)); - pMsg += sizeof(STableIdInfo); - } else { - SVnodeSidList *pVnodeSidList = tscGetVnodeSidList(pMetricMeta, pTableMetaInfo->vnodeIndex); - - for (int32_t i = 0; i < numOfTables; ++i) { - STableIdInfo *pTableIdInfo = (STableIdInfo *)pMsg; - STableIdInfo *pQueryMeterInfo = tscGetMeterSidInfo(pVnodeSidList, i); - - pTableIdInfo->sid = htonl(pQueryMeterInfo->sid); - pTableIdInfo->uid = htobe64(pQueryMeterInfo->uid); - pTableIdInfo->key = htobe64(tscGetSubscriptionProgress(pSql->pSubscription, pQueryMeterInfo->uid)); - - pMsg += sizeof(STableIdInfo); - -#ifdef _DEBUG_VIEW - tscTrace("%p sid:%d, uid:%" PRId64, pSql, pQueryMeterInfo->sid, pQueryMeterInfo->uid); -#endif - } - } - + tscTrace("%p vgId:%d, query on table:%s, uid:%" PRIu64, pSql, vgId, pTableMetaInfo->name, pTableMeta->uid); + + STableIdInfo *pTableIdInfo = (STableIdInfo *)pMsg; + pTableIdInfo->sid = htonl(pTableMeta->sid); + pTableIdInfo->uid = htobe64(pTableMeta->uid); + pTableIdInfo->key = htobe64(tscGetSubscriptionProgress(pSql->pSubscription, pTableMeta->uid)); + + pMsg += sizeof(STableIdInfo); return pMsg; } @@ -655,7 +601,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); STableMeta * pTableMeta = pTableMetaInfo->pTableMeta; - SSuperTableMeta *pMetricMeta = pTableMetaInfo->pMetricMeta; +// SSuperTableMeta *pMetricMeta = pTableMetaInfo->pMetricMeta; if (pQueryInfo->colList.numOfCols <= 0) { tscError("%p illegal value of numOfCols in query msg: %d", pSql, tscGetNumOfColumns(pTableMeta)); @@ -678,7 +624,10 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { tscError("%p error vnodeIdx:%d", pSql, pTableMetaInfo->vnodeIndex); return -1; } - + + uint32_t vnodeId = 1; + +#if 0 SVnodeSidList *pVnodeSidList = tscGetVnodeSidList(pMetricMeta, pTableMetaInfo->vnodeIndex); uint32_t vnodeId = pVnodeSidList->vpeerDesc[pVnodeSidList->index].vnode; @@ -687,9 +636,11 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { tscError("%p vid:%d,error numOfTables in query message:%d", pSql, vnodeId, numOfTables); return -1; // error } - +#endif + tscTrace("%p query on vid:%d, number of tables:%d", pSql, vnodeId, numOfTables); - pQueryMsg->head.vgId = htons(vnodeId); + pQueryMsg->head.vgId = htonl(vnodeId); + numOfTables = 1; } pQueryMsg->numOfTables = htonl(numOfTables); @@ -723,24 +674,23 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { } pQueryMsg->numOfGroupCols = htons(pQueryInfo->groupbyExpr.numOfGroupCols); - if (UTIL_TABLE_IS_NOMRAL_TABLE(pTableMetaInfo)) { // query on meter pQueryMsg->tagLength = 0; } else { // query on super table - pQueryMsg->tagLength = htons(pMetricMeta->tagLen); + pQueryMsg->tagLength = htons(0); } pQueryMsg->queryType = htons(pQueryInfo->type); pQueryMsg->numOfOutputCols = htons(pQueryInfo->exprsInfo.numOfExprs); - if (pQueryInfo->fieldsInfo.numOfOutputCols < 0) { - tscError("%p illegal value of number of output columns in query msg: %d", pSql, - pQueryInfo->fieldsInfo.numOfOutputCols); + int32_t numOfOutput = pQueryInfo->fieldsInfo.numOfOutputCols; + if (numOfOutput < 0) { + tscError("%p illegal value of number of output columns in query msg: %d", pSql, numOfOutput); return -1; } // set column list ids - char * pMsg = (char *)(pQueryMsg->colList) + pQueryInfo->colList.numOfCols * sizeof(SColumnInfo); + char *pMsg = (char *)(pQueryMsg->colList) + pQueryInfo->colList.numOfCols * sizeof(SColumnInfo); SSchema *pSchema = tscGetTableSchema(pTableMeta); for (int32_t i = 0; i < pQueryInfo->colList.numOfCols; ++i) { @@ -848,7 +798,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { pQueryMsg->colNameLen = htonl(len); // serialize the table info (sid, uid, tags) - pMsg = doSerializeTableInfo(pSql, numOfTables, htons(pQueryMsg->head.vgId), pMsg); + pMsg = doSerializeTableInfo(pSql, htons(pQueryMsg->head.vgId), pMsg); SSqlGroupbyExpr *pGroupbyExpr = &pQueryInfo->groupbyExpr; if (pGroupbyExpr->numOfGroupCols != 0) { @@ -1443,11 +1393,15 @@ int tscProcessTagRetrieveRsp(SSqlObj *pSql) { STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); int32_t numOfRes = 0; +#if 0 if (tscSqlExprGet(pQueryInfo, 0)->functionId == TSDB_FUNC_TAGPRJ) { numOfRes = pTableMetaInfo->pMetricMeta->numOfTables; } else { numOfRes = 1; // for count function, there is only one output. } + +#endif + return tscLocalResultCommonBuilder(pSql, numOfRes); } @@ -1533,8 +1487,7 @@ int tscBuildTableMetaMsg(SSqlObj *pSql, SSqlInfo *pInfo) { pMsg += sizeof(STagData); } - msgLen = pMsg - (char*)pInfoMsg; - pCmd->payloadLen = msgLen; + pCmd->payloadLen = pMsg - (char*)pInfoMsg;; pCmd->msgType = TSDB_MSG_TYPE_CM_TABLE_META; tfree(tmpData); @@ -1608,7 +1561,9 @@ static int32_t tscEstimateMetricMetaMsgSize(SSqlCmd *pCmd) { return MAX(len, TSDB_DEFAULT_PAYLOAD_SIZE); } -int tscBuildMetricMetaMsg(SSqlObj *pSql, SSqlInfo *pInfo) { +int tscBuildSTableVgroupMsg(SSqlObj *pSql, SSqlInfo *pInfo) { + +#if 0 SSuperTableMetaMsg *pMetaMsg; char * pMsg, *pStart; int msgLen = 0; @@ -1671,13 +1626,13 @@ int tscBuildMetricMetaMsg(SSqlObj *pSql, SSqlInfo *pInfo) { // convert to unicode before sending to mnode for metric query int32_t condLen = 0; if (pTagCond->numOfTagCond > 0) { - SCond *pCond = tsGetMetricQueryCondPos(pTagCond, uid); + SCond *pCond = tsGetSTableQueryCondPos(pTagCond, uid); if (pCond != NULL && pCond->cond != NULL) { condLen = strlen(pCond->cond) + 1; bool ret = taosMbsToUcs4(pCond->cond, condLen, pMsg, condLen * TSDB_NCHAR_SIZE); if (!ret) { - tscError("%p mbs to ucs4 failed:%s", pSql, tsGetMetricQueryCondPos(pTagCond, uid)); + tscError("%p mbs to ucs4 failed:%s", pSql, tsGetSTableQueryCondPos(pTagCond, uid)); return 0; } } @@ -1749,7 +1704,18 @@ int tscBuildMetricMetaMsg(SSqlObj *pSql, SSqlInfo *pInfo) { pCmd->payloadLen = msgLen; pCmd->msgType = TSDB_MSG_TYPE_CM_STABLE_VGROUP; assert(msgLen + minMsgSize() <= size); +#endif + SSqlCmd *pCmd = &pSql->cmd; + + STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0); + + SCMSTableVgroupMsg *pStableVgroupMsg = (SCMSTableVgroupMsg *) pCmd->payload; + strncpy(pStableVgroupMsg->tableId, pTableMetaInfo->name, tListLen(pStableVgroupMsg->tableId)); + + pCmd->msgType = TSDB_MSG_TYPE_CM_STABLE_VGROUP; + pCmd->payloadLen = sizeof(SCMSTableVgroupMsg); + return TSDB_CODE_SUCCESS; } @@ -1989,22 +1955,11 @@ int tscProcessMultiMeterMetaRsp(SSqlObj *pSql) { return TSDB_CODE_SUCCESS; } -int tscProcessMetricMetaRsp(SSqlObj *pSql) { - SSuperTableMeta *pMeta; - uint8_t ieType; +int tscProcessSTableVgroupRsp(SSqlObj *pSql) { +#if 0 void ** metricMetaList = NULL; int32_t * sizes = NULL; - - char *rsp = pSql->res.pRsp; - - ieType = *rsp; - if (ieType != TSDB_IE_TYPE_META) { - tscError("invalid ie type:%d", ieType); - return TSDB_CODE_INVALID_IE; - } - - rsp++; - + int32_t num = htons(*(int16_t *)rsp); rsp += sizeof(int16_t); @@ -2088,7 +2043,6 @@ int tscProcessMetricMetaRsp(SSqlObj *pSql) { // release the used metricmeta taosCacheRelease(tscCacheHandle, (void **)&(pTableMetaInfo->pMetricMeta), false); - pTableMetaInfo->pMetricMeta = (SSuperTableMeta *)taosCachePut(tscCacheHandle, name, (char *)metricMetaList[i], sizes[i], tsMetricMetaKeepTimer); tfree(metricMetaList[i]); @@ -2108,7 +2062,23 @@ _error_clean: free(sizes); free(metricMetaList); - +#endif + + SCMSTableVgroupRspMsg *pStableVgroup = (SCMSTableVgroupRspMsg *)pSql->res.pRsp; + pStableVgroup->numOfDnodes = htonl(pStableVgroup->numOfDnodes); + + SSqlObj* pparent = pSql->param; + assert(pparent != NULL); + + SSqlCmd* pCmd = &pparent->cmd; + STableMetaInfo* pInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0); + pInfo->vgroupIdList = taosArrayInit(pStableVgroup->numOfDnodes, sizeof(int32_t)); + + // todo opt performance + for(int32_t i = 0; i < pStableVgroup->numOfDnodes; ++i) { + taosArrayPush(pInfo->vgroupIdList, &pStableVgroup->dnodeIps[i]); + } + return pSql->res.code; } @@ -2234,7 +2204,7 @@ int tscProcessDropTableRsp(SSqlObj *pSql) { if (pTableMetaInfo->pTableMeta) { taosCacheRelease(tscCacheHandle, (void **)&(pTableMetaInfo->pTableMeta), true); - taosCacheRelease(tscCacheHandle, (void **)&(pTableMetaInfo->pMetricMeta), true); +// taosCacheRelease(tscCacheHandle, (void **)&(pTableMetaInfo->pMetricMeta), true); } return 0; @@ -2255,7 +2225,7 @@ int tscProcessAlterTableMsgRsp(SSqlObj *pSql) { bool isSuperTable = UTIL_TABLE_IS_SUPERTABLE(pTableMetaInfo); taosCacheRelease(tscCacheHandle, (void **)&(pTableMetaInfo->pTableMeta), true); - taosCacheRelease(tscCacheHandle, (void **)&(pTableMetaInfo->pMetricMeta), true); +// taosCacheRelease(tscCacheHandle, (void **)&(pTableMetaInfo->pMetricMeta), true); if (isSuperTable) { // if it is a super table, reset whole query cache tscTrace("%p reset query cache since table:%s is stable", pSql, pTableMetaInfo->name); @@ -2459,30 +2429,34 @@ int tscRenewMeterMeta(SSqlObj *pSql, char *tableId) { return code; } -int tscGetMetricMeta(SSqlObj *pSql, int32_t clauseIndex) { +int tscGetSTableVgroupInfo(SSqlObj *pSql, int32_t clauseIndex) { int code = TSDB_CODE_NETWORK_UNAVAIL; SSqlCmd *pCmd = &pSql->cmd; - /* - * the query condition is serialized into pCmd->payload, we need to rebuild key for metricmeta info in cache. - */ - bool required = false; + //the query condition is serialized into pCmd->payload, we need to rebuild key for stable meta info in cache. + bool required = false; SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, clauseIndex); + if (pQueryInfo->pTableMetaInfo[0]->vgroupIdList != NULL) { + return TSDB_CODE_SUCCESS; + } + +#if 0 + for (int32_t i = 0; i < pQueryInfo->numOfTables; ++i) { char tagstr[TSDB_MAX_TAGS_LEN + 1] = {0}; STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, i); tscGetMetricMetaCacheKey(pQueryInfo, tagstr, pTableMetaInfo->pTableMeta->uid); - taosCacheRelease(tscCacheHandle, (void **)&(pTableMetaInfo->pMetricMeta), false); +// taosCacheRelease(tscCacheHandle, (void **)&(pTableMetaInfo->pMetricMeta), false); SSuperTableMeta *ppMeta = (SSuperTableMeta *)taosCacheAcquireByName(tscCacheHandle, tagstr); if (ppMeta == NULL) { required = true; break; } else { - pTableMetaInfo->pMetricMeta = ppMeta; +// pTableMetaInfo->pMetricMeta = ppMeta; } } @@ -2490,12 +2464,13 @@ int tscGetMetricMeta(SSqlObj *pSql, int32_t clauseIndex) { if (!required) { return TSDB_CODE_SUCCESS; } - +#endif + SSqlObj *pNew = calloc(1, sizeof(SSqlObj)); pNew->pTscObj = pSql->pTscObj; pNew->signature = pNew; - pNew->cmd.command = TSDB_SQL_METRIC; + pNew->cmd.command = TSDB_SQL_STABLEVGROUP; SQueryInfo *pNewQueryInfo = NULL; if ((code = tscGetQueryInfoDetailSafely(&pNew->cmd, 0, &pNewQueryInfo)) != TSDB_CODE_SUCCESS) { @@ -2532,7 +2507,7 @@ int tscGetMetricMeta(SSqlObj *pSql, int32_t clauseIndex) { // tscFreeSubqueryInfo(pCmd); // } - tscTrace("%p allocate new pSqlObj:%p to get metricMeta", pSql, pNew); + tscTrace("%p allocate new pSqlObj:%p to get stable vgroupInfo", pSql, pNew); pNew->fp = tscTableMetaCallBack; pNew->param = pSql; code = tscProcessSql(pNew); @@ -2569,7 +2544,7 @@ void tscInitMsgsFp() { tscBuildMsg[TSDB_SQL_CONNECT] = tscBuildConnectMsg; tscBuildMsg[TSDB_SQL_USE_DB] = tscBuildUseDbMsg; tscBuildMsg[TSDB_SQL_META] = tscBuildTableMetaMsg; - tscBuildMsg[TSDB_SQL_METRIC] = tscBuildMetricMetaMsg; + tscBuildMsg[TSDB_SQL_STABLEVGROUP] = tscBuildSTableVgroupMsg; tscBuildMsg[TSDB_SQL_MULTI_META] = tscBuildMultiMeterMetaMsg; tscBuildMsg[TSDB_SQL_HB] = tscBuildHeartBeatMsg; @@ -2587,7 +2562,7 @@ void tscInitMsgsFp() { tscProcessMsgRsp[TSDB_SQL_CONNECT] = tscProcessConnectRsp; tscProcessMsgRsp[TSDB_SQL_USE_DB] = tscProcessUseDbRsp; tscProcessMsgRsp[TSDB_SQL_META] = tscProcessTableMetaRsp; - tscProcessMsgRsp[TSDB_SQL_METRIC] = tscProcessMetricMetaRsp; + tscProcessMsgRsp[TSDB_SQL_STABLEVGROUP] = tscProcessSTableVgroupRsp; tscProcessMsgRsp[TSDB_SQL_MULTI_META] = tscProcessMultiMeterMetaRsp; tscProcessMsgRsp[TSDB_SQL_SHOW] = tscProcessShowRsp; @@ -2613,7 +2588,4 @@ void tscInitMsgsFp() { tscKeepConn[TSDB_SQL_SELECT] = 1; tscKeepConn[TSDB_SQL_FETCH] = 1; tscKeepConn[TSDB_SQL_HB] = 1; - - tscUpdateVnodeMsg[TSDB_SQL_SELECT] = tscUpdateVnodeInQueryMsg; - tscUpdateVnodeMsg[TSDB_SQL_INSERT] = tscUpdateVnodeInSubmitMsg; } diff --git a/src/client/src/tscSql.c b/src/client/src/tscSql.c index eb46d6101c..e4b07a0f64 100644 --- a/src/client/src/tscSql.c +++ b/src/client/src/tscSql.c @@ -546,11 +546,11 @@ static bool tscHashRemainDataInSubqueryResultSet(SSqlObj *pSql) { * if the global limitation is not reached, and current result has not exhausted, or next more vnodes are * available, goes on */ - if (pMetaInfo->vnodeIndex < pMetaInfo->pMetricMeta->numOfVnodes && pRes1->row < pRes1->numOfRows && - (!tscHasReachLimitation(pQueryInfo1, pRes1))) { - allSubqueryExhausted = false; - break; - } +// if (pMetaInfo->vnodeIndex < pMetaInfo->pMetricMeta->numOfVnodes && pRes1->row < pRes1->numOfRows && +// (!tscHasReachLimitation(pQueryInfo1, pRes1))) { +// allSubqueryExhausted = false; +// break; +// } } hasData = !allSubqueryExhausted; diff --git a/src/client/src/tscStream.c b/src/client/src/tscStream.c index 0b464c362b..4fadad5021 100644 --- a/src/client/src/tscStream.c +++ b/src/client/src/tscStream.c @@ -80,7 +80,7 @@ static void tscProcessStreamLaunchQuery(SSchedMsg *pMsg) { if (code == TSDB_CODE_ACTION_IN_PROGRESS) return; if (code == 0 && UTIL_TABLE_IS_SUPERTABLE(pTableMetaInfo)) { - code = tscGetMetricMeta(pSql, 0); + code = tscGetSTableVgroupInfo(pSql, 0); pSql->res.code = code; if (code == TSDB_CODE_ACTION_IN_PROGRESS) return; diff --git a/src/client/src/tscSub.c b/src/client/src/tscSub.c index 014b0c5cb7..616bcbba50 100644 --- a/src/client/src/tscSub.c +++ b/src/client/src/tscSub.c @@ -178,11 +178,11 @@ int tscUpdateSubscription(STscObj* pObj, SSub* pSub) { STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, 0, 0); int numOfTables = 0; if (!UTIL_TABLE_IS_NOMRAL_TABLE(pTableMetaInfo)) { - SSuperTableMeta* pMetricMeta = pTableMetaInfo->pMetricMeta; - for (int32_t i = 0; i < pMetricMeta->numOfVnodes; i++) { - SVnodeSidList *pVnodeSidList = tscGetVnodeSidList(pMetricMeta, i); - numOfTables += pVnodeSidList->numOfSids; - } +// SSuperTableMeta* pMetricMeta = pTableMetaInfo->pMetricMeta; +// for (int32_t i = 0; i < pMetricMeta->numOfVnodes; i++) { +// SVnodeSidList *pVnodeSidList = tscGetVnodeSidList(pMetricMeta, i); +// numOfTables += pVnodeSidList->numOfSids; +// } } SSubscriptionProgress* progress = (SSubscriptionProgress*)calloc(numOfTables, sizeof(SSubscriptionProgress)); @@ -197,17 +197,17 @@ int tscUpdateSubscription(STscObj* pObj, SSub* pSub) { progress[0].uid = uid; progress[0].key = tscGetSubscriptionProgress(pSub, uid); } else { - SSuperTableMeta* pMetricMeta = pTableMetaInfo->pMetricMeta; - numOfTables = 0; - for (int32_t i = 0; i < pMetricMeta->numOfVnodes; i++) { - SVnodeSidList *pVnodeSidList = tscGetVnodeSidList(pMetricMeta, i); - for (int32_t j = 0; j < pVnodeSidList->numOfSids; j++) { - STableIdInfo *pTableMetaInfo = tscGetMeterSidInfo(pVnodeSidList, j); - int64_t uid = pTableMetaInfo->uid; - progress[numOfTables].uid = uid; - progress[numOfTables++].key = tscGetSubscriptionProgress(pSub, uid); - } - } +// SSuperTableMeta* pMetricMeta = pTableMetaInfo->pMetricMeta; +// numOfTables = 0; +// for (int32_t i = 0; i < pMetricMeta->numOfVnodes; i++) { +// SVnodeSidList *pVnodeSidList = tscGetVnodeSidList(pMetricMeta, i); +// for (int32_t j = 0; j < pVnodeSidList->numOfSids; j++) { +// STableIdInfo *pTableMetaInfo = tscGetMeterSidInfo(pVnodeSidList, j); +// int64_t uid = pTableMetaInfo->uid; +// progress[numOfTables].uid = uid; +// progress[numOfTables++].key = tscGetSubscriptionProgress(pSub, uid); +// } +// } qsort(progress, numOfTables, sizeof(SSubscriptionProgress), tscCompareSubscriptionProgress); } diff --git a/src/client/src/tscSubquery.c b/src/client/src/tscSubquery.c index 1df977d996..21ce270466 100644 --- a/src/client/src/tscSubquery.c +++ b/src/client/src/tscSubquery.c @@ -476,7 +476,8 @@ static void joinRetrieveCallback(void* param, TAOS_RES* tres, int numOfRows) { assert(pQueryInfo->numOfTables == 1); // for projection query, need to try next vnode - int32_t totalVnode = pTableMetaInfo->pMetricMeta->numOfVnodes; +// int32_t totalVnode = pTableMetaInfo->pMetricMeta->numOfVnodes; + int32_t totalVnode = 0; if ((++pTableMetaInfo->vnodeIndex) < totalVnode) { tscTrace("%p current vnode:%d exhausted, try next:%d. total vnode:%d. current numOfRes:%d", pSql, pTableMetaInfo->vnodeIndex - 1, pTableMetaInfo->vnodeIndex, totalVnode, pRes->numOfTotal); @@ -541,16 +542,16 @@ static void joinRetrieveCallback(void* param, TAOS_RES* tres, int numOfRows) { assert(pQueryInfo->numOfTables == 1); // for projection query, need to try next vnode if current vnode is exhausted - if ((++pTableMetaInfo->vnodeIndex) < pTableMetaInfo->pMetricMeta->numOfVnodes) { - pSupporter->pState->numOfCompleted = 0; - pSupporter->pState->numOfTotal = 1; - - pSql->cmd.command = TSDB_SQL_SELECT; - pSql->fp = tscJoinQueryCallback; - tscProcessSql(pSql); - - return; - } +// if ((++pTableMetaInfo->vnodeIndex) < pTableMetaInfo->pMetricMeta->numOfVnodes) { +// pSupporter->pState->numOfCompleted = 0; +// pSupporter->pState->numOfTotal = 1; +// +// pSql->cmd.command = TSDB_SQL_SELECT; +// pSql->fp = tscJoinQueryCallback; +// tscProcessSql(pSql); +// +// return; +// } } int32_t numOfTotal = pSupporter->pState->numOfTotal; @@ -608,10 +609,10 @@ void tscFetchDatablockFromSubquery(SSqlObj* pSql) { STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); if (tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0)) { - if (pRes->row >= pRes->numOfRows && pTableMetaInfo->vnodeIndex < pTableMetaInfo->pMetricMeta->numOfVnodes && - (!tscHasReachLimitation(pQueryInfo, pRes))) { - numOfFetch++; - } +// if (pRes->row >= pRes->numOfRows && pTableMetaInfo->vnodeIndex < pTableMetaInfo->pMetricMeta->numOfVnodes && +// (!tscHasReachLimitation(pQueryInfo, pRes))) { +// numOfFetch++; +// } } else { if (pRes->row >= pRes->numOfRows && (!tscHasReachLimitation(pQueryInfo, pRes))) { numOfFetch++; @@ -788,7 +789,7 @@ void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code) { * data instead of returning to its invoker */ if (pTableMetaInfo->vnodeIndex > 0 && tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0)) { - assert(pTableMetaInfo->vnodeIndex < pTableMetaInfo->pMetricMeta->numOfVnodes); +// assert(pTableMetaInfo->vnodeIndex < pTableMetaInfo->pMetricMeta->numOfVnodes); pSupporter->pState->numOfCompleted = 0; // reset the record value pSql->fp = joinRetrieveCallback; // continue retrieve data @@ -1009,7 +1010,9 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) { SQueryInfo * pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); - int32_t numOfSubQueries = pTableMetaInfo->pMetricMeta->numOfVnodes; + + int32_t numOfSubQueries = 0; +// int32_t numOfSubQueries = pTableMetaInfo->pMetricMeta->numOfVnodes; assert(numOfSubQueries > 0); int32_t ret = tscLocalReducerEnvCreate(pSql, &pMemoryBuf, &pDesc, &pModel, nBufferSize); @@ -1260,7 +1263,8 @@ static void tscRetrieveFromVnodeCallBack(void *param, TAOS_RES *tres, int numOfR STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); - SVnodeSidList *vnodeInfo = tscGetVnodeSidList(pTableMetaInfo->pMetricMeta, idx); +// SVnodeSidList *vnodeInfo = tscGetVnodeSidList(pTableMetaInfo->pMetricMeta, idx); + SVnodeSidList *vnodeInfo = 0; SVnodeDesc * pSvd = &vnodeInfo->vpeerDesc[vnodeInfo->index]; if (numOfRows > 0) { @@ -1409,10 +1413,10 @@ void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code) { SVnodeSidList *vnodeInfo = NULL; SVnodeDesc * pSvd = NULL; - if (pTableMetaInfo->pMetricMeta != NULL) { - vnodeInfo = tscGetVnodeSidList(pTableMetaInfo->pMetricMeta, idx); - pSvd = &vnodeInfo->vpeerDesc[vnodeInfo->index]; - } +// if (pTableMetaInfo->pMetricMeta != NULL) { +// vnodeInfo = tscGetVnodeSidList(pTableMetaInfo->pMetricMeta, idx); +// pSvd = &vnodeInfo->vpeerDesc[vnodeInfo->index]; +// } SSubqueryState* pState = trsupport->pState; assert(pState->numOfCompleted < pState->numOfTotal && pState->numOfCompleted >= 0 && @@ -1453,7 +1457,7 @@ void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code) { trsupport->numOfRetry = MAX_NUM_OF_SUBQUERY_RETRY; } else { SQueryInfo *pNewQueryInfo = tscGetQueryInfoDetail(&pNew->cmd, 0); - assert(pNewQueryInfo->pTableMetaInfo[0]->pTableMeta != NULL && pNewQueryInfo->pTableMetaInfo[0]->pMetricMeta != NULL); +// assert(pNewQueryInfo->pTableMetaInfo[0]->pTableMeta != NULL && pNewQueryInfo->pTableMetaInfo[0]->pMetricMeta != NULL); tscProcessSql(pNew); return; } diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index e33fdc70a4..8df84744ce 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -53,7 +53,7 @@ void tscGetMetricMetaCacheKey(SQueryInfo* pQueryInfo, char* str, uint64_t uid) { const int32_t maxKeySize = TSDB_MAX_TAGS_LEN; // allowed max key size - SCond* cond = tsGetMetricQueryCondPos(pTagCond, uid); + SCond* cond = tsGetSTableQueryCondPos(pTagCond, uid); char join[512] = {0}; if (pTagCond->joinInfo.hasJoin) { @@ -92,7 +92,7 @@ void tscGetMetricMetaCacheKey(SQueryInfo* pQueryInfo, char* str, uint64_t uid) { free(tmp); } -SCond* tsGetMetricQueryCondPos(STagCond* pTagCond, uint64_t uid) { +SCond* tsGetSTableQueryCondPos(STagCond* pTagCond, uint64_t uid) { for (int32_t i = 0; i < TSDB_MAX_JOIN_TABLE_NUM; ++i) { if (uid == pTagCond->cond[i].uid) { return &pTagCond->cond[i]; @@ -122,7 +122,7 @@ bool tscQueryOnMetric(SSqlCmd* pCmd) { (pCmd->msgType == TSDB_MSG_TYPE_QUERY); } -bool tscQueryMetricTags(SQueryInfo* pQueryInfo) { +bool tscQueryTags(SQueryInfo* pQueryInfo) { for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutputCols; ++i) { if (tscSqlExprGet(pQueryInfo, i)->functionId != TSDB_FUNC_TAGPRJ) { return false; @@ -172,6 +172,7 @@ void tscGetDBInfoFromMeterId(char* tableId, char* db) { } SVnodeSidList* tscGetVnodeSidList(SSuperTableMeta* pMetricmeta, int32_t vnodeIdx) { +#if 0 if (pMetricmeta == NULL) { tscError("illegal metricmeta"); return 0; @@ -189,6 +190,8 @@ SVnodeSidList* tscGetVnodeSidList(SSuperTableMeta* pMetricmeta, int32_t vnodeIdx } return (SVnodeSidList*)(pMetricmeta->list[vnodeIdx] + (char*)pMetricmeta); +#endif + } STableIdInfo* tscGetMeterSidInfo(SVnodeSidList* pSidList, int32_t idx) { @@ -221,12 +224,12 @@ bool tscIsTwoStageSTableQuery(SQueryInfo* pQueryInfo, int32_t tableIndex) { // for select query super table, the metricmeta can not be null in any cases. if (pQueryInfo->command == TSDB_SQL_SELECT && UTIL_TABLE_IS_SUPERTABLE(pTableMetaInfo)) { - assert(pTableMetaInfo->pMetricMeta != NULL); +// assert(pTableMetaInfo->pMetricMeta != NULL); } - if (pTableMetaInfo->pMetricMeta == NULL) { - return false; - } +// if (pTableMetaInfo->pMetricMeta == NULL) { +// return false; +// } if ((pQueryInfo->type & TSDB_QUERY_TYPE_FREE_RESOURCE) == TSDB_QUERY_TYPE_FREE_RESOURCE) { return false; @@ -259,12 +262,12 @@ bool tscIsProjectionQueryOnSTable(SQueryInfo* pQueryInfo, int32_t tableIndex) { } // only query on tag, not a projection query - if (tscQueryMetricTags(pQueryInfo)) { + if (tscQueryTags(pQueryInfo)) { return false; } // for project query, only the following two function is allowed - for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutputCols; ++i) { + for (int32_t i = 0; i < pQueryInfo->exprsInfo.numOfExprs; ++i) { int32_t functionId = tscSqlExprGet(pQueryInfo, i)->functionId; if (functionId != TSDB_FUNC_PRJ && functionId != TSDB_FUNC_TAGPRJ && functionId != TSDB_FUNC_TAG && functionId != TSDB_FUNC_TS && functionId != TSDB_FUNC_ARITHM) { @@ -1925,7 +1928,7 @@ STableMetaInfo* tscAddMeterMetaInfo(SQueryInfo* pQueryInfo, const char* name, ST } pTableMetaInfo->pTableMeta = pTableMeta; - pTableMetaInfo->pMetricMeta = pMetricMeta; +// pTableMetaInfo->pMetricMeta = pMetricMeta; pTableMetaInfo->numOfTags = numOfTags; if (tags != NULL) { @@ -1975,7 +1978,7 @@ void tscClearMeterMetaInfo(STableMetaInfo* pTableMetaInfo, bool removeFromCache) } taosCacheRelease(tscCacheHandle, (void**)&(pTableMetaInfo->pTableMeta), removeFromCache); - taosCacheRelease(tscCacheHandle, (void**)&(pTableMetaInfo->pMetricMeta), removeFromCache); +// taosCacheRelease(tscCacheHandle, (void**)&(pTableMetaInfo->pMetricMeta), removeFromCache); } void tscResetForNextRetrieve(SSqlRes* pRes) { @@ -2114,15 +2117,15 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void STableMetaInfo* pPrevInfo = tscGetTableMetaInfoFromCmd(&pPrevSql->cmd, pPrevSql->cmd.clauseIndex, 0); STableMeta* pPrevMeterMeta = taosCacheTransfer(tscCacheHandle, (void**)&pPrevInfo->pTableMeta); - SSuperTableMeta* pPrevMetricMeta = taosCacheTransfer(tscCacheHandle, (void**)&pPrevInfo->pMetricMeta); +// SSuperTableMeta* pPrevMetricMeta = taosCacheTransfer(tscCacheHandle, (void**)&pPrevInfo->pMetricMeta); - pFinalInfo = tscAddMeterMetaInfo(pNewQueryInfo, name, pPrevMeterMeta, pPrevMetricMeta, pTableMetaInfo->numOfTags, - pTableMetaInfo->tagColumnIndex); +// pFinalInfo = tscAddMeterMetaInfo(pNewQueryInfo, name, pPrevMeterMeta, pPrevMetricMeta, pTableMetaInfo->numOfTags, +// pTableMetaInfo->tagColumnIndex); } assert(pFinalInfo->pTableMeta != NULL && pNewQueryInfo->numOfTables == 1); if (UTIL_TABLE_IS_SUPERTABLE(pTableMetaInfo)) { - assert(pFinalInfo->pMetricMeta != NULL); +// assert(pFinalInfo->pMetricMeta != NULL); } tscTrace( @@ -2222,13 +2225,13 @@ bool hasMoreVnodesToTry(SSqlObj* pSql) { SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); - if (!UTIL_TABLE_IS_SUPERTABLE(pTableMetaInfo) || (pTableMetaInfo->pMetricMeta == NULL)) { +// if (!UTIL_TABLE_IS_SUPERTABLE(pTableMetaInfo) || (pTableMetaInfo->pMetricMeta == NULL)) { return false; - } +// } - int32_t totalVnode = pTableMetaInfo->pMetricMeta->numOfVnodes; - return pRes->numOfRows == 0 && tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0) && - (!tscHasReachLimitation(pQueryInfo, pRes)) && (pTableMetaInfo->vnodeIndex < totalVnode - 1); +// int32_t totalVnode = pTableMetaInfo->pMetricMeta->numOfVnodes; +// return pRes->numOfRows == 0 && tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0) && +// (!tscHasReachLimitation(pQueryInfo, pRes)) && (pTableMetaInfo->vnodeIndex < totalVnode - 1); } void tscTryQueryNextVnode(SSqlObj* pSql, __async_cb_func_t fp) { @@ -2244,7 +2247,8 @@ void tscTryQueryNextVnode(SSqlObj* pSql, __async_cb_func_t fp) { assert(pRes->numOfRows == 0 && tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0) && !tscHasReachLimitation(pQueryInfo, pRes)); STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); - int32_t totalVnode = pTableMetaInfo->pMetricMeta->numOfVnodes; + int32_t totalVnode = 0; +// int32_t totalVnode = pTableMetaInfo->pMetricMeta->numOfVnodes; while (++pTableMetaInfo->vnodeIndex < totalVnode) { tscTrace("%p current vnode:%d exhausted, try next:%d. total vnode:%d. current numOfRes:%d", pSql, diff --git a/src/dnode/src/dnodeRead.c b/src/dnode/src/dnodeRead.c index a23336630e..965727c9d0 100644 --- a/src/dnode/src/dnodeRead.c +++ b/src/dnode/src/dnodeRead.c @@ -261,7 +261,7 @@ static void dnodeProcessQueryMsg(void *pVnode, SReadMsg *pMsg) { SQInfo* pQInfo = NULL; if (pMsg->contLen != 0) { void* tsdb = dnodeGetVnodeTsdb(pVnode); - int32_t code = qCreateQueryInfo(tsdb, pQueryTableMsg, NULL, &pQInfo); + int32_t code = qCreateQueryInfo(tsdb, pQueryTableMsg, &pQInfo); SQueryTableRsp *pRsp = (SQueryTableRsp *) rpcMallocCont(sizeof(SQueryTableRsp)); pRsp->code = code; diff --git a/src/inc/taosmsg.h b/src/inc/taosmsg.h index 838b43da1e..1345d80fe5 100644 --- a/src/inc/taosmsg.h +++ b/src/inc/taosmsg.h @@ -28,83 +28,83 @@ extern "C" { #include "trpc.h" // message type -#define TSDB_MSG_TYPE_REG 1 -#define TSDB_MSG_TYPE_REG_RSP 2 -#define TSDB_MSG_TYPE_SUBMIT 3 -#define TSDB_MSG_TYPE_SUBMIT_RSP 4 -#define TSDB_MSG_TYPE_QUERY 5 -#define TSDB_MSG_TYPE_QUERY_RSP 6 -#define TSDB_MSG_TYPE_RETRIEVE 7 -#define TSDB_MSG_TYPE_RETRIEVE_RSP 8 +#define TSDB_MSG_TYPE_REG 1 +#define TSDB_MSG_TYPE_REG_RSP 2 +#define TSDB_MSG_TYPE_SUBMIT 3 +#define TSDB_MSG_TYPE_SUBMIT_RSP 4 +#define TSDB_MSG_TYPE_QUERY 5 +#define TSDB_MSG_TYPE_QUERY_RSP 6 +#define TSDB_MSG_TYPE_RETRIEVE 7 +#define TSDB_MSG_TYPE_RETRIEVE_RSP 8 // message from mnode to dnode -#define TSDB_MSG_TYPE_MD_CREATE_TABLE 9 +#define TSDB_MSG_TYPE_MD_CREATE_TABLE 9 #define TSDB_MSG_TYPE_MD_CREATE_TABLE_RSP 10 -#define TSDB_MSG_TYPE_MD_DROP_TABLE 11 -#define TSDB_MSG_TYPE_MD_DROP_TABLE_RSP 12 -#define TSDB_MSG_TYPE_MD_ALTER_TABLE 13 -#define TSDB_MSG_TYPE_MD_ALTER_TABLE_RSP 14 -#define TSDB_MSG_TYPE_MD_CREATE_VNODE 15 +#define TSDB_MSG_TYPE_MD_DROP_TABLE 11 +#define TSDB_MSG_TYPE_MD_DROP_TABLE_RSP 12 +#define TSDB_MSG_TYPE_MD_ALTER_TABLE 13 +#define TSDB_MSG_TYPE_MD_ALTER_TABLE_RSP 14 +#define TSDB_MSG_TYPE_MD_CREATE_VNODE 15 #define TSDB_MSG_TYPE_MD_CREATE_VNODE_RSP 16 -#define TSDB_MSG_TYPE_MD_DROP_VNODE 17 -#define TSDB_MSG_TYPE_MD_DROP_VNODE_RSP 18 -#define TSDB_MSG_TYPE_MD_ALTER_VNODE 19 -#define TSDB_MSG_TYPE_MD_ALTER_VNODE_RSP 20 -#define TSDB_MSG_TYPE_MD_DROP_STABLE 21 -#define TSDB_MSG_TYPE_MD_DROP_STABLE_RSP 22 -#define TSDB_MSG_TYPE_MD_ALTER_STREAM 23 +#define TSDB_MSG_TYPE_MD_DROP_VNODE 17 +#define TSDB_MSG_TYPE_MD_DROP_VNODE_RSP 18 +#define TSDB_MSG_TYPE_MD_ALTER_VNODE 19 +#define TSDB_MSG_TYPE_MD_ALTER_VNODE_RSP 20 +#define TSDB_MSG_TYPE_MD_DROP_STABLE 21 +#define TSDB_MSG_TYPE_MD_DROP_STABLE_RSP 22 +#define TSDB_MSG_TYPE_MD_ALTER_STREAM 23 #define TSDB_MSG_TYPE_MD_ALTER_STREAM_RSP 24 -#define TSDB_MSG_TYPE_MD_CONFIG_DNODE 25 +#define TSDB_MSG_TYPE_MD_CONFIG_DNODE 25 #define TSDB_MSG_TYPE_MD_CONFIG_DNODE_RSP 26 // message from client to mnode -#define TSDB_MSG_TYPE_CM_CONNECT 31 -#define TSDB_MSG_TYPE_CM_CONNECT_RSP 32 -#define TSDB_MSG_TYPE_CM_CREATE_ACCT 33 -#define TSDB_MSG_TYPE_CM_CREATE_ACCT_RSP 34 -#define TSDB_MSG_TYPE_CM_ALTER_ACCT 35 -#define TSDB_MSG_TYPE_CM_ALTER_ACCT_RSP 36 -#define TSDB_MSG_TYPE_CM_DROP_ACCT 37 -#define TSDB_MSG_TYPE_CM_DROP_ACCT_RSP 38 -#define TSDB_MSG_TYPE_CM_CREATE_USER 39 -#define TSDB_MSG_TYPE_CM_CREATE_USER_RSP 40 -#define TSDB_MSG_TYPE_CM_ALTER_USER 41 -#define TSDB_MSG_TYPE_CM_ALTER_USER_RSP 42 -#define TSDB_MSG_TYPE_CM_DROP_USER 43 -#define TSDB_MSG_TYPE_CM_DROP_USER_RSP 44 -#define TSDB_MSG_TYPE_CM_CREATE_DNODE 45 +#define TSDB_MSG_TYPE_CM_CONNECT 31 +#define TSDB_MSG_TYPE_CM_CONNECT_RSP 32 +#define TSDB_MSG_TYPE_CM_CREATE_ACCT 33 +#define TSDB_MSG_TYPE_CM_CREATE_ACCT_RSP 34 +#define TSDB_MSG_TYPE_CM_ALTER_ACCT 35 +#define TSDB_MSG_TYPE_CM_ALTER_ACCT_RSP 36 +#define TSDB_MSG_TYPE_CM_DROP_ACCT 37 +#define TSDB_MSG_TYPE_CM_DROP_ACCT_RSP 38 +#define TSDB_MSG_TYPE_CM_CREATE_USER 39 +#define TSDB_MSG_TYPE_CM_CREATE_USER_RSP 40 +#define TSDB_MSG_TYPE_CM_ALTER_USER 41 +#define TSDB_MSG_TYPE_CM_ALTER_USER_RSP 42 +#define TSDB_MSG_TYPE_CM_DROP_USER 43 +#define TSDB_MSG_TYPE_CM_DROP_USER_RSP 44 +#define TSDB_MSG_TYPE_CM_CREATE_DNODE 45 #define TSDB_MSG_TYPE_CM_CREATE_DNODE_RSP 46 -#define TSDB_MSG_TYPE_CM_DROP_DNODE 47 -#define TSDB_MSG_TYPE_CM_DROP_DNODE_RSP 48 -#define TSDB_MSG_TYPE_CM_CONFIG_DNODE TSDB_MSG_TYPE_MD_CONFIG_DNODE +#define TSDB_MSG_TYPE_CM_DROP_DNODE 47 +#define TSDB_MSG_TYPE_CM_DROP_DNODE_RSP 48 +#define TSDB_MSG_TYPE_CM_CONFIG_DNODE TSDB_MSG_TYPE_MD_CONFIG_DNODE #define TSDB_MSG_TYPE_CM_CONFIG_DNODE_RSP TSDB_MSG_TYPE_MD_CONFIG_DNODE_RSP -#define TSDB_MSG_TYPE_CM_CREATE_DB 49 -#define TSDB_MSG_TYPE_CM_CREATE_DB_RSP 50 -#define TSDB_MSG_TYPE_CM_DROP_DB 51 -#define TSDB_MSG_TYPE_CM_DROP_DB_RSP 52 -#define TSDB_MSG_TYPE_CM_USE_DB 53 -#define TSDB_MSG_TYPE_CM_USE_DB_RSP 54 -#define TSDB_MSG_TYPE_CM_ALTER_DB 55 -#define TSDB_MSG_TYPE_CM_ALTER_DB_RSP 56 -#define TSDB_MSG_TYPE_CM_CREATE_TABLE 57 +#define TSDB_MSG_TYPE_CM_CREATE_DB 49 +#define TSDB_MSG_TYPE_CM_CREATE_DB_RSP 50 +#define TSDB_MSG_TYPE_CM_DROP_DB 51 +#define TSDB_MSG_TYPE_CM_DROP_DB_RSP 52 +#define TSDB_MSG_TYPE_CM_USE_DB 53 +#define TSDB_MSG_TYPE_CM_USE_DB_RSP 54 +#define TSDB_MSG_TYPE_CM_ALTER_DB 55 +#define TSDB_MSG_TYPE_CM_ALTER_DB_RSP 56 +#define TSDB_MSG_TYPE_CM_CREATE_TABLE 57 #define TSDB_MSG_TYPE_CM_CREATE_TABLE_RSP 58 -#define TSDB_MSG_TYPE_CM_DROP_TABLE 59 -#define TSDB_MSG_TYPE_CM_DROP_TABLE_RSP 60 -#define TSDB_MSG_TYPE_CM_ALTER_TABLE 61 -#define TSDB_MSG_TYPE_CM_ALTER_TABLE_RSP 62 -#define TSDB_MSG_TYPE_CM_TABLE_META 63 -#define TSDB_MSG_TYPE_CM_TABLE_META_RSP 64 -#define TSDB_MSG_TYPE_CM_STABLE_VGROUP 65 +#define TSDB_MSG_TYPE_CM_DROP_TABLE 59 +#define TSDB_MSG_TYPE_CM_DROP_TABLE_RSP 60 +#define TSDB_MSG_TYPE_CM_ALTER_TABLE 61 +#define TSDB_MSG_TYPE_CM_ALTER_TABLE_RSP 62 +#define TSDB_MSG_TYPE_CM_TABLE_META 63 +#define TSDB_MSG_TYPE_CM_TABLE_META_RSP 64 +#define TSDB_MSG_TYPE_CM_STABLE_VGROUP 65 #define TSDB_MSG_TYPE_CM_STABLE_VGROUP_RSP 66 -#define TSDB_MSG_TYPE_CM_TABLES_META 67 -#define TSDB_MSG_TYPE_CM_TABLES_META_RSP 68 -#define TSDB_MSG_TYPE_CM_ALTER_STREAM 69 +#define TSDB_MSG_TYPE_CM_TABLES_META 67 +#define TSDB_MSG_TYPE_CM_TABLES_META_RSP 68 +#define TSDB_MSG_TYPE_CM_ALTER_STREAM 69 #define TSDB_MSG_TYPE_CM_ALTER_STREAM_RSP 70 -#define TSDB_MSG_TYPE_CM_SHOW 71 -#define TSDB_MSG_TYPE_CM_SHOW_RSP 72 -#define TSDB_MSG_TYPE_CM_KILL_QUERY 73 -#define TSDB_MSG_TYPE_CM_KILL_QUERY_RSP 74 -#define TSDB_MSG_TYPE_CM_KILL_STREAM 75 +#define TSDB_MSG_TYPE_CM_SHOW 71 +#define TSDB_MSG_TYPE_CM_SHOW_RSP 72 +#define TSDB_MSG_TYPE_CM_KILL_QUERY 73 +#define TSDB_MSG_TYPE_CM_KILL_QUERY_RSP 74 +#define TSDB_MSG_TYPE_CM_KILL_STREAM 75 #define TSDB_MSG_TYPE_CM_KILL_STREAM_RSP 76 #define TSDB_MSG_TYPE_CM_KILL_CONN 77 #define TSDB_MSG_TYPE_CM_KILL_CONN_RSP 78 @@ -622,14 +622,14 @@ typedef struct { char tableIds[]; } SCMMultiTableInfoMsg; -typedef struct { - char tableId[TSDB_TABLE_ID_LEN + 1]; -} SCMSuperTableInfoMsg; +typedef struct SCMSTableVgroupMsg { + char tableId[TSDB_TABLE_ID_LEN]; +} SCMSTableVgroupMsg; typedef struct { int32_t numOfDnodes; uint32_t dnodeIps[]; -} SCMSuperTableInfoRsp; +} SCMSTableVgroupRspMsg; typedef struct { int16_t elemLen; diff --git a/src/mnode/src/mgmtSuperTable.c b/src/mnode/src/mgmtSuperTable.c index 44e8014dad..306d414cbc 100644 --- a/src/mnode/src/mgmtSuperTable.c +++ b/src/mnode/src/mgmtSuperTable.c @@ -216,7 +216,7 @@ void* mgmtGetSuperTable(char *tableId) { } static void *mgmtGetSuperTableVgroup(SSuperTableObj *pStable) { - SCMSuperTableInfoRsp *rsp = rpcMallocCont(sizeof(SCMSuperTableInfoRsp) + sizeof(uint32_t) * mgmtGetDnodesNum()); + SCMSTableVgroupRspMsg *rsp = rpcMallocCont(sizeof(SCMSTableVgroupRspMsg) + sizeof(uint32_t) * mgmtGetDnodesNum()); rsp->numOfDnodes = htonl(1); rsp->dnodeIps[0] = htonl(inet_addr(tsPrivateIp)); return rsp; @@ -628,14 +628,14 @@ void mgmtGetSuperTableMeta(SQueuedMsg *pMsg, SSuperTableObj *pTable) { } static void mgmtProcessSuperTableVgroupMsg(SQueuedMsg *pMsg) { - SCMSuperTableInfoMsg *pInfo = pMsg->pCont; + SCMSTableVgroupMsg *pInfo = pMsg->pCont; STableInfo *pTable = mgmtGetSuperTable(pInfo->tableId); if (pTable == NULL) { mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_INVALID_TABLE); return; } - SCMSuperTableInfoRsp *pRsp = mgmtGetSuperTableVgroup((SSuperTableObj *) pTable); + SCMSTableVgroupRspMsg *pRsp = mgmtGetSuperTableVgroup((SSuperTableObj *) pTable); if (pRsp != NULL) { int32_t msgLen = sizeof(SSuperTableObj) + htonl(pRsp->numOfDnodes) * sizeof(int32_t); SRpcMsg rpcRsp = {0}; diff --git a/src/query/inc/qsqltype.h b/src/query/inc/qsqltype.h index 5d85ab4b80..ad9affa1cc 100644 --- a/src/query/inc/qsqltype.h +++ b/src/query/inc/qsqltype.h @@ -54,7 +54,7 @@ enum _sql_type { TSDB_SQL_CONNECT, TSDB_SQL_USE_DB, // 30 TSDB_SQL_META, - TSDB_SQL_METRIC, + TSDB_SQL_STABLEVGROUP, TSDB_SQL_MULTI_META, TSDB_SQL_HB, diff --git a/src/query/inc/queryExecutor.h b/src/query/inc/queryExecutor.h index 16f2006d6f..dc7aff105f 100644 --- a/src/query/inc/queryExecutor.h +++ b/src/query/inc/queryExecutor.h @@ -203,7 +203,7 @@ typedef struct SQInfo { * @param pQInfo * @return */ -int32_t qCreateQueryInfo(void* pVnode, SQueryTableMsg* pQueryTableMsg, void* param, SQInfo** pQInfo); +int32_t qCreateQueryInfo(void* pVnode, SQueryTableMsg* pQueryTableMsg, SQInfo** pQInfo); /** * query on single table @@ -211,12 +211,6 @@ int32_t qCreateQueryInfo(void* pVnode, SQueryTableMsg* pQueryTableMsg, void* par */ void qTableQuery(SQInfo* pQInfo); -/** - * query on super table - * @param pReadMsg - */ -void qSuperTableQuery(void* pReadMsg); - /** * wait for the query completed, and retrieve final results to client * @param pQInfo diff --git a/src/query/src/queryExecutor.c b/src/query/src/queryExecutor.c index e9923c6fd8..a1167ff440 100644 --- a/src/query/src/queryExecutor.c +++ b/src/query/src/queryExecutor.c @@ -5128,137 +5128,99 @@ static void tableIntervalProcessor(SQInfo *pQInfo) { // pQInfo->size - pQInfo->pointsInterpo, pQInfo->pointsInterpo, pQInfo->pointsReturned); } -void qTableQuery(SQInfo *pQInfo) { - if (pQInfo == NULL || pQInfo->signature != pQInfo) { - dTrace("%p freed abort query", pQInfo); - return; - } - - SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv; - - SQuery *pQuery = pRuntimeEnv->pQuery; - if (isQueryKilled(pQInfo)) { - dTrace("QInfo:%p it is already killed, abort", pQInfo); - return; - } +static void singleTableQueryImpl(SQInfo* pQInfo) { + SQueryRuntimeEnv* pRuntimeEnv = &pQInfo->runtimeEnv; + SQuery* pQuery = pRuntimeEnv->pQuery; - dTrace("QInfo:%p query task is launched", pQInfo); - if (vnodeHasRemainResults(pQInfo)) { /* * There are remain results that are not returned due to result interpolation * So, we do keep in this procedure instead of launching retrieve procedure for next results. */ int32_t numOfInterpo = 0; - int32_t remain = taosNumOfRemainPoints(&pRuntimeEnv->interpoInfo); pQuery->rec.size = vnodeQueryResultInterpolate(pQInfo, (tFilePage **)pQuery->sdata, - (tFilePage **)pRuntimeEnv->pInterpoBuf, remain, &numOfInterpo); - + (tFilePage **)pRuntimeEnv->pInterpoBuf, remain, &numOfInterpo); + doRevisedResultsByLimit(pQInfo); - + pQInfo->pointsInterpo += numOfInterpo; pQuery->rec.size += pQuery->rec.size; - - // dTrace("QInfo:%p %d points returned %d points interpo, totalRead:%d totalInterpo:%d totalReturn:%d", - // pQInfo, pQuery->size, numOfInterpo, pQInfo->size, pQInfo->pointsInterpo, - // pQInfo->pointsReturned); + + dTrace("QInfo:%p current:%d returned, total:%d", pQInfo, pQuery->rec.size, pQuery->rec.total); sem_post(&pQInfo->dataReady); return; } - + // here we have scan all qualified data in both data file and cache if (Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED)) { // continue to get push data from the group result if (isGroupbyNormalCol(pQuery->pGroupbyExpr) || - (pQuery->intervalTime > 0 && pQuery->rec.total < pQuery->limit.limit)) { + ((isIntervalQuery(pQuery) && pQuery->rec.total < pQuery->limit.limit))) { + // todo limit the output for interval query? pQuery->rec.size = 0; pQInfo->subgroupIdx = 0; // always start from 0 - + if (pRuntimeEnv->windowResInfo.size > 0) { copyFromWindowResToSData(pQInfo, pRuntimeEnv->windowResInfo.pResult); pQuery->rec.size += pQuery->rec.size; - + clearFirstNTimeWindow(pRuntimeEnv, pQInfo->subgroupIdx); - + if (pQuery->rec.size > 0) { - // dTrace("QInfo:%p vid:%d sid:%d id:%s, %d points returned %d from group results, totalRead:%d - // totalReturn:%d", - // pQInfo, pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId, pQuery->size, - // pQInfo->size, pQInfo->pointsInterpo, pQInfo->pointsReturned); - + dTrace("QInfo:%p %d rows returned from group results, total:%d", pQInfo, pQuery->rec.size, pQuery->rec.total); sem_post(&pQInfo->dataReady); return; } } } - - // dTrace("QInfo:%p vid:%d sid:%d id:%s, query over, %d points are returned", pQInfo, pMeterObj->vnode, - // pMeterObj->sid, - // pMeterObj->meterId, pQInfo->size); - + + dTrace("QInfo:%p query over, %d rows are returned", pQInfo, pQuery->rec.total); // vnodePrintQueryStatistics(pSupporter); sem_post(&pQInfo->dataReady); return; } - + // number of points returned during this query pQuery->rec.size = 0; - int64_t st = taosGetTimestampUs(); - + // group by normal column, sliding window query, interval query are handled by interval query processor if (pQuery->intervalTime != 0 || isGroupbyNormalCol(pQuery->pGroupbyExpr)) { // interval (down sampling operation) - // assert(pQuery->checkBufferInLoop == 0 && pQuery->pointsOffset == pQuery->pointsToRead); tableIntervalProcessor(pQInfo); } else { if (isFixedOutputQuery(pQuery)) { assert(pQuery->checkBufferInLoop == 0); - + tableFixedOutputProcessor(pQInfo); } else { // diff/add/multiply/subtract/division assert(pQuery->checkBufferInLoop == 1); tableMultiOutputProcessor(pQInfo); } } - + // record the total elapsed time pQInfo->elapsedTime += (taosGetTimestampUs() - st); - + /* check if query is killed or not */ if (isQueryKilled(pQInfo)) { dTrace("QInfo:%p query is killed", pQInfo); } else { dTrace("QInfo:%p query task completed, %d points are returned", pQInfo, pQuery->rec.size); } - + sem_post(&pQInfo->dataReady); - // vnodeDecRefCount(pQInfo); } -void qSuperTableQuery(void *pReadMsg) { -// SQInfo *pQInfo = (SQInfo *)pMsg->ahandle; -// -// if (pQInfo == NULL) { -// return; -// } - -// if (pQInfo->killed) { -// vnodeDecRefCount(pQInfo); -// dTrace("QInfo:%p it is already killed, abort", pQInfo); -// return; -// } - -// assert(pQInfo->refCount >= 1); -#if 0 - SQuery *pQuery = &pQInfo->runtimeEnv.pQuery; +void multiTableQueryImpl(SQInfo* pQInfo) { + SQuery* pQuery = pQInfo->runtimeEnv.pQuery; pQuery->rec.size = 0; int64_t st = taosGetTimestampUs(); + if (pQuery->intervalTime > 0 || (isFixedOutputQuery(pQuery) && (!isPointInterpoQuery(pQuery)) && !isGroupbyNormalCol(pQuery->pGroupbyExpr))) { - assert(pQuery->checkBufferInLoop == 0); vnodeMultiMeterQueryProcessor(pQInfo); } else { assert((pQuery->checkBufferInLoop == 1 && pQuery->intervalTime == 0) || isPointInterpoQuery(pQuery) || @@ -5267,23 +5229,40 @@ void qSuperTableQuery(void *pReadMsg) { vnodeSTableSeqProcessor(pQInfo); } - /* record the total elapsed time */ + // record the total elapsed time pQInfo->elapsedTime += (taosGetTimestampUs() - st); - pQuery->status = isQueryKilled(pQInfo) ? 1 : 0; - -// taosInterpoSetStartInfo(&pQInfo->runtimeEnv.interpoInfo, pQuery->size, -// pQInfo->query.interpoType); +// taosInterpoSetStartInfo(&pQInfo->runtimeEnv.interpoInfo, pQuery->size, pQInfo->query.interpoType); if (pQuery->rec.size == 0) { -// pQInfo->over = 1; -// dTrace("QInfo:%p over, %d meters queried, %d points are returned", pQInfo, pSupporter->numOfMeters, -// pQInfo->size); + int32_t numOfTables = taosArrayGetSize(pQInfo->pTableIdList); + dTrace("QInfo:%p over, %d tables queried, %d points are returned", pQInfo, numOfTables, pQuery->rec.total); // vnodePrintQueryStatistics(pSupporter); } sem_post(&pQInfo->dataReady); -// vnodeDecRefCount(pQInfo); -#endif +} + +void qTableQuery(SQInfo *pQInfo) { + if (pQInfo == NULL || pQInfo->signature != pQInfo) { + dTrace("%p freed abort query", pQInfo); + return; + } + + if (isQueryKilled(pQInfo)) { + dTrace("QInfo:%p it is already killed, abort", pQInfo); + return; + } + + dTrace("QInfo:%p query task is launched", pQInfo); + + int32_t numOfTables = taosArrayGetSize(pQInfo->pTableIdList); + if (numOfTables == 1) { + singleTableQueryImpl(pQInfo); + } else { + multiTableQueryImpl(pQInfo); + } + + // vnodeDecRefCount(pQInfo); } static int32_t getColumnIndexInSource(SQueryTableMsg *pQueryTableMsg, SSqlFuncExprMsg *pExprMsg) { @@ -6064,7 +6043,7 @@ _error: return code; } -int32_t qCreateQueryInfo(void* tsdb, SQueryTableMsg *pQueryTableMsg, void* param, SQInfo **pQInfo) { +int32_t qCreateQueryInfo(void* tsdb, SQueryTableMsg *pQueryTableMsg, SQInfo **pQInfo) { assert(pQueryTableMsg != NULL); int32_t code = TSDB_CODE_SUCCESS; @@ -6098,12 +6077,7 @@ int32_t qCreateQueryInfo(void* tsdb, SQueryTableMsg *pQueryTableMsg, void* param goto _query_over; } - if (QUERY_IS_STABLE_QUERY(pQueryTableMsg->queryType)) { - // pObj->qhandle = vnodeQueryOnMultiMeters(pMeterObjList, pGroupbyExpr, pExprs, pQueryTableMsg, &code); - } else { - code = createQInfo(pQueryTableMsg, pGroupbyExpr, pExprs, pTableIdList, tsdb, pQInfo); -// (*pQInfo)->param = param; - } + code = createQInfo(pQueryTableMsg, pGroupbyExpr, pExprs, pTableIdList, tsdb, pQInfo); _query_over: if (code != TSDB_CODE_SUCCESS) { diff --git a/src/vnode/tsdb/src/tsdbMeta.c b/src/vnode/tsdb/src/tsdbMeta.c index 72c1667fe1..397807f547 100644 --- a/src/vnode/tsdb/src/tsdbMeta.c +++ b/src/vnode/tsdb/src/tsdbMeta.c @@ -373,8 +373,27 @@ static int tsdbAddTableIntoMap(STsdbMeta *pMeta, STable *pTable) { return 0; } static int tsdbAddTableIntoIndex(STsdbMeta *pMeta, STable *pTable) { - assert(pTable->type == TSDB_CHILD_TABLE); - // TODO + assert(pTable->type == TSDB_CHILD_TABLE && pTable != NULL); + STable* pSTable = tsdbGetTableByUid(pMeta, pTable->superUid); + assert(pSTable != NULL); + + int32_t level = 0; + int32_t headSize = 0; + + // first tag column + STColumn* s = pSTable->tagSchema->columns[0]; //??? + + tSkipListRandNodeInfo(pSTable->pIndex, &level, &headSize); + SSkipListNode* pNode = calloc(1, headSize + s->bytes + POINTER_BYTES); + pNode->level = level; + + SSkipList* list = pSTable->pIndex; + + memcpy(SL_GET_NODE_KEY(list, pNode), dataRowTuple(pTable->tagVal), s->columns[0].bytes); + memcpy(SL_GET_NODE_DATA(pNode), &pTable, POINTER_BYTES); + + tSkipListPut(list, pNode); + return 0; } diff --git a/src/vnode/tsdb/src/tsdbRead.c b/src/vnode/tsdb/src/tsdbRead.c index 2dc61f5107..fc196bb62f 100644 --- a/src/vnode/tsdb/src/tsdbRead.c +++ b/src/vnode/tsdb/src/tsdbRead.c @@ -275,8 +275,9 @@ tsdb_query_handle_t *tsdbQueryByTableId(tsdb_repo_t* tsdb, STsdbQueryCond *pCond STableIdInfo* idInfo = taosArrayGet(pQueryHandle->pTableIdList, 0); - STableId tableId = {.uid = idInfo->uid, .tid = idInfo->sid}; - STable *pTable = tsdbIsValidTableToInsert(tsdbGetMeta(pQueryHandle->pTsdb), tableId); + STable *pTable = tsdbGetTableByUid(tsdbGetMeta(pQueryHandle->pTsdb), idInfo->uid); + assert(pTable != NULL); + pTableQRec->pTableObj = pTable; // malloc buffer in order to load data from file -- GitLab