From 9326bee43064f07b312da32d20b5a44d85390886 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 13 May 2021 10:54:28 +0800 Subject: [PATCH] [td-4151] code refactor. --- src/client/inc/tscUtil.h | 2 +- src/client/inc/tsclient.h | 5 ++ src/client/src/tscSQLParser.c | 40 ++++++----- src/client/src/tscServer.c | 126 +++++++++++++++++++++++++++------- src/inc/taosmsg.h | 2 +- src/mnode/src/mnodeTable.c | 97 +++++++++----------------- src/query/src/qAggMain.c | 2 +- src/query/src/qPlan.c | 2 +- src/tsdb/src/tsdbCommit.c | 2 +- src/util/inc/tarray.h | 12 +++- src/util/src/tarray.c | 6 +- 11 files changed, 178 insertions(+), 118 deletions(-) diff --git a/src/client/inc/tscUtil.h b/src/client/inc/tscUtil.h index 79220efb1a..adfaec20c4 100644 --- a/src/client/inc/tscUtil.h +++ b/src/client/inc/tscUtil.h @@ -305,7 +305,7 @@ void tscTryQueryNextVnode(SSqlObj *pSql, __async_cb_func_t fp); void tscAsyncQuerySingleRowForNextVnode(void *param, TAOS_RES *tres, int numOfRows); void tscTryQueryNextClause(SSqlObj* pSql, __async_cb_func_t fp); int tscSetMgmtEpSetFromCfg(const char *first, const char *second, SRpcCorEpSet *corEpSet); -int32_t getMultiTableMetaFromMnode(SSqlObj *pSql, SArray* pNameList, bool loadVgroupInfo); +int32_t getMultiTableMetaFromMnode(SSqlObj *pSql, SArray* pNameList, SArray* pVgroupList); int tscTransferTableNameList(SSqlObj *pSql, const char *pNameList, int32_t length); bool tscSetSqlOwner(SSqlObj* pSql); diff --git a/src/client/inc/tsclient.h b/src/client/inc/tsclient.h index 294cbc595a..8bced2c9f5 100644 --- a/src/client/inc/tsclient.h +++ b/src/client/inc/tsclient.h @@ -246,6 +246,11 @@ typedef struct SQueryInfo { bool onlyTagQuery; } SQueryInfo; +typedef struct { + STableMeta *pTableMeta; + SVgroupsInfo *pVgroupInfo; +} STableMetaVgroupInfo; + typedef struct { int command; uint8_t msgType; diff --git a/src/client/src/tscSQLParser.c b/src/client/src/tscSQLParser.c index 2f22727521..387a348f34 100644 --- a/src/client/src/tscSQLParser.c +++ b/src/client/src/tscSQLParser.c @@ -5607,10 +5607,8 @@ int32_t validateLimitNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSqlNode* pSqlN * And then launching multiple async-queries against all qualified virtual nodes, during the first-stage * query operation. */ - int32_t code = tscGetSTableVgroupInfo(pSql, pQueryInfo); - if (code != TSDB_CODE_SUCCESS) { - return code; - } +// assert(allVgroupInfoRetrieved(pQueryInfo)); + // No tables included. No results generated. Query results are empty. if (pTableMetaInfo->vgroupList->numOfVgroups == 0) { @@ -7172,6 +7170,8 @@ int32_t loadAllTableMeta(SSqlObj* pSql, struct SSqlInfo* pInfo) { STableMeta* pTableMeta = calloc(1, maxSize); SArray* plist = taosArrayInit(4, POINTER_BYTES); + SArray* pVgroupList = taosArrayInit(4, POINTER_BYTES); + for(int32_t i = 0; i < numOfTables; ++i) { SName* pname = taosArrayGet(tableNameList, i); tNameExtractFullName(pname, name); @@ -7183,14 +7183,19 @@ int32_t loadAllTableMeta(SSqlObj* pSql, struct SSqlInfo* pInfo) { if (pTableMeta->id.uid > 0) { if (pTableMeta->tableType == TSDB_CHILD_TABLE) { int32_t code = tscCreateTableMetaFromCChildMeta(pTableMeta, name); - if (code != TSDB_CODE_SUCCESS) { - // add to retrieve list + if (code != TSDB_CODE_SUCCESS) { // add to retrieve list continue; } + } else if (pTableMeta->tableType == TSDB_SUPER_TABLE) { + // the vgroup list of a super table is not kept in local buffer, so here need retrieve it + // from the mnode each time + char* t = strdup(name); + taosArrayPush(pVgroupList, &t); } STableMeta* pMeta = tscTableMetaDup(pTableMeta); - taosHashPut(pCmd->pTableMetaMap, name, strlen(name), &pMeta, POINTER_BYTES); + STableMetaVgroupInfo p = {.pTableMeta = pMeta,}; + taosHashPut(pCmd->pTableMetaMap, name, strlen(name), &p, sizeof(STableMetaVgroupInfo)); } else {// add to the retrieve table meta array list. char* t = strdup(name); taosArrayPush(plist, &t); @@ -7199,7 +7204,7 @@ int32_t loadAllTableMeta(SSqlObj* pSql, struct SSqlInfo* pInfo) { // load the table meta for a given table name list if (taosArrayGetSize(plist) > 0) { - int32_t code = getMultiTableMetaFromMnode(pSql, plist, true); + int32_t code = getMultiTableMetaFromMnode(pSql, plist, pVgroupList); taosArrayDestroyEx(plist, freeElem); return code; @@ -7256,7 +7261,11 @@ static int32_t doLoadAllTableMeta(SSqlObj* pSql, SQueryInfo* pQueryInfo, SSqlNod } const char* name = tNameGetTableName(&pTableMetaInfo->name); - pTableMetaInfo->pTableMeta = taosHashGet(pCmd->pTableMetaMap, name, strlen(name)); + STableMetaVgroupInfo* p = taosHashGet(pCmd->pTableMetaMap, name, strlen(name)); + + pTableMetaInfo->pTableMeta = p->pTableMeta; + pTableMetaInfo->vgroupList = p->pVgroupInfo; + assert(pTableMetaInfo->pTableMeta != NULL); if (code != TSDB_CODE_SUCCESS) { @@ -7394,16 +7403,9 @@ int32_t validateSqlNode(SSqlObj* pSql, SSqlNode* pSqlNode, SQueryInfo* pQueryInf } bool isSTable = UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo); - if (isSTable) { - code = tscGetSTableVgroupInfo(pSql, pQueryInfo); // TODO refactor: getTablemeta along with vgroupInfo - if (code != TSDB_CODE_SUCCESS) { - return code; - } - TSDB_QUERY_SET_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_STABLE_QUERY); - } else { - TSDB_QUERY_SET_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_TABLE_QUERY); - } + int32_t type = isSTable? TSDB_QUERY_TYPE_STABLE_QUERY:TSDB_QUERY_TYPE_TABLE_QUERY; + TSDB_QUERY_SET_TYPE(pQueryInfo->type, type); // parse the group by clause in the first place if (validateGroupbyNode(pQueryInfo, pSqlNode->pGroupby, pCmd) != TSDB_CODE_SUCCESS) { @@ -7527,7 +7529,7 @@ int32_t validateSqlNode(SSqlObj* pSql, SSqlNode* pSqlNode, SQueryInfo* pQueryInf pQueryInfo->exprList1 = taosArrayInit(4, POINTER_BYTES); } - taosArrayPushBatch(pQueryInfo->exprList1, (void*) p, numOfExpr); + taosArrayAddBatch(pQueryInfo->exprList1, (void*) p, numOfExpr); } #if 0 diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index 23ae03bdc3..1b5a0b2445 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -1829,7 +1829,7 @@ static int32_t tableMetaMsgConvert(STableMetaMsg* pMetaMsg) { pMetaMsg->vgroup.vgId = htonl(pMetaMsg->vgroup.vgId); pMetaMsg->uid = htobe64(pMetaMsg->uid); - pMetaMsg->contLen = htons(pMetaMsg->contLen); +// pMetaMsg->contLen = htonl(pMetaMsg->contLen); pMetaMsg->numOfColumns = htons(pMetaMsg->numOfColumns); if ((pMetaMsg->tableType != TSDB_SUPER_TABLE) && @@ -1941,16 +1941,61 @@ int tscProcessTableMetaRsp(SSqlObj *pSql) { return TSDB_CODE_SUCCESS; } -/* - * multi table meta rsp pkg format: - * | SMultiTableInfoMsg | SMeterMeta0 | SSchema0 | SMeterMeta1 | SSchema1 | SMeterMeta2 | SSchema2 - * | 4B - */ +static SVgroupsInfo* createVgroupInfoFromMsg(char* pMsg, int32_t* size, uint64_t id) { + SVgroupsMsg *pVgroupMsg = (SVgroupsMsg *) pMsg; + pVgroupMsg->numOfVgroups = htonl(pVgroupMsg->numOfVgroups); + + *size = (int32_t) (sizeof(SVgroupMsg) * pVgroupMsg->numOfVgroups + sizeof(SVgroupsMsg)); + + size_t vgroupsz = sizeof(SVgroupInfo) * pVgroupMsg->numOfVgroups + sizeof(SVgroupsInfo); + SVgroupsInfo* pVgroupInfo = calloc(1, vgroupsz); + assert(pVgroupInfo != NULL); + + pVgroupInfo->numOfVgroups = pVgroupMsg->numOfVgroups; + if (pVgroupInfo->numOfVgroups <= 0) { + tscDebug("0x%"PRIx64" empty vgroup info, no corresponding tables for stable", id); + } else { + for (int32_t j = 0; j < pVgroupInfo->numOfVgroups; ++j) { + // just init, no need to lock + SVgroupInfo *pVgroup = &pVgroupInfo->vgroups[j]; + + SVgroupMsg *vmsg = &pVgroupMsg->vgroups[j]; + vmsg->vgId = htonl(vmsg->vgId); + vmsg->numOfEps = vmsg->numOfEps; + for (int32_t k = 0; k < vmsg->numOfEps; ++k) { + vmsg->epAddr[k].port = htons(vmsg->epAddr[k].port); + } + + SNewVgroupInfo newVi = createNewVgroupInfo(vmsg); + pVgroup->numOfEps = newVi.numOfEps; + pVgroup->vgId = newVi.vgId; + for (int32_t k = 0; k < vmsg->numOfEps; ++k) { + pVgroup->epAddr[k].port = newVi.ep[k].port; + pVgroup->epAddr[k].fqdn = strndup(newVi.ep[k].fqdn, TSDB_FQDN_LEN); + } + + // check if current buffer contains the vgroup info. + // If not, add it + SNewVgroupInfo existVgroupInfo = {.inUse = -1}; + taosHashGetClone(tscVgroupMap, &newVi.vgId, sizeof(newVi.vgId), NULL, &existVgroupInfo, sizeof(SNewVgroupInfo)); + + if (((existVgroupInfo.inUse >= 0) && !vgroupInfoIdentical(&existVgroupInfo, vmsg)) || + (existVgroupInfo.inUse < 0)) { // vgroup info exists, compare with it + taosHashPut(tscVgroupMap, &newVi.vgId, sizeof(newVi.vgId), &newVi, sizeof(newVi)); + tscDebug("0x%"PRIx64" add new VgroupInfo, vgId:%d, total cached:%d", id, newVi.vgId, (int32_t) taosHashGetSize(tscVgroupMap)); + } + } + } + + return pVgroupInfo; +} + int tscProcessMultiTableMetaRsp(SSqlObj *pSql) { char *rsp = pSql->res.pRsp; SMultiTableMeta *pMultiMeta = (SMultiTableMeta *)rsp; pMultiMeta->numOfTables = htonl(pMultiMeta->numOfTables); + pMultiMeta->numOfVgroup = htonl(pMultiMeta->numOfVgroup); rsp += sizeof(SMultiTableMeta); @@ -1959,8 +2004,9 @@ int tscProcessMultiTableMetaRsp(SSqlObj *pSql) { SHashObj *pSet = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK); + char* pMsg = pMultiMeta->meta; for (int32_t i = 0; i < pMultiMeta->numOfTables; i++) { - STableMetaMsg *pMetaMsg = (STableMetaMsg *)pMultiMeta->meta; + STableMetaMsg *pMetaMsg = (STableMetaMsg *)pMsg; int32_t code = tableMetaMsgConvert(pMetaMsg); if (code != TSDB_CODE_SUCCESS) { return code; @@ -1972,13 +2018,14 @@ int tscProcessMultiTableMetaRsp(SSqlObj *pSql) { return TSDB_CODE_TSC_INVALID_VALUE; } - int32_t t = tscGetTableMetaSize(pTableMeta); - SName sn = {0}; tNameFromString(&sn, pMetaMsg->tableFname, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE); + const char* tableName = tNameGetTableName(&sn); int32_t keyLen = strlen(tableName); - taosHashPut(pParentCmd->pTableMetaMap, tableName, keyLen, pTableMeta, t); + + STableMetaVgroupInfo p = {.pTableMeta = pTableMeta,}; + taosHashPut(pParentCmd->pTableMetaMap, tableName, keyLen, &p, sizeof(STableMetaVgroupInfo)); bool addToBuf = false; if (taosHashGet(pSet, &pMetaMsg->uid, sizeof(pMetaMsg->uid)) == NULL) { @@ -1991,10 +2038,26 @@ int tscProcessMultiTableMetaRsp(SSqlObj *pSql) { // if the vgroup is not updated in current process, update it. int64_t vgId = pMetaMsg->vgroup.vgId; - if (taosHashGet(pSet, &vgId, sizeof(vgId)) == NULL) { + if (pTableMeta->tableType != TSDB_SUPER_TABLE && taosHashGet(pSet, &vgId, sizeof(vgId)) == NULL) { doUpdateVgroupInfo(pTableMeta, &pMetaMsg->vgroup); taosHashPut(pSet, &vgId, sizeof(vgId), "", 0); } + + pMsg += pMetaMsg->contLen; + } + + if (pMultiMeta->numOfVgroup > 0) { + char* name = pMsg; + pMsg += TSDB_TABLE_NAME_LEN; + + STableMetaVgroupInfo* p = taosHashGet(pParentCmd->pTableMetaMap, name, strnlen(name, TSDB_TABLE_NAME_LEN)); + assert(p != NULL); + + int32_t size = 0; + SVgroupsInfo* pVgroupInfo = createVgroupInfoFromMsg(pMsg, &size, pSql->self); + + p->pVgroupInfo = pVgroupInfo; + pMsg += size; } pSql->res.code = TSDB_CODE_SUCCESS; @@ -2024,12 +2087,12 @@ int tscProcessSTableVgroupRsp(SSqlObj *pSql) { for(int32_t i = 0; i < pStableVgroup->numOfTables; ++i) { STableMetaInfo *pInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, i); - SVgroupsMsg * pVgroupMsg = (SVgroupsMsg *) pMsg; + SVgroupsMsg *pVgroupMsg = (SVgroupsMsg *) pMsg; pVgroupMsg->numOfVgroups = htonl(pVgroupMsg->numOfVgroups); - size_t size = sizeof(SVgroupMsg) * pVgroupMsg->numOfVgroups + sizeof(SVgroupsMsg); - - size_t vgroupsz = sizeof(SVgroupInfo) * pVgroupMsg->numOfVgroups + sizeof(SVgroupsInfo); + int32_t size = 0; + pInfo->vgroupList = createVgroupInfoFromMsg(pMsg, &size, pSql->self); + /* size_t vgroupsz = sizeof(SVgroupInfo) * pVgroupMsg->numOfVgroups + sizeof(SVgroupsInfo); pInfo->vgroupList = calloc(1, vgroupsz); assert(pInfo->vgroupList != NULL); @@ -2068,7 +2131,7 @@ int tscProcessSTableVgroupRsp(SSqlObj *pSql) { } } } - +*/ pMsg += size; } @@ -2403,7 +2466,7 @@ static int32_t getTableMetaFromMnode(SSqlObj *pSql, STableMetaInfo *pTableMetaIn return code; } -int32_t getMultiTableMetaFromMnode(SSqlObj *pSql, SArray* pNameList, bool loadVgroupInfo) { +int32_t getMultiTableMetaFromMnode(SSqlObj *pSql, SArray* pNameList, SArray* pVgroupNameList) { SSqlObj *pNew = calloc(1, sizeof(SSqlObj)); if (NULL == pNew) { tscError("0x%"PRIx64" failed to allocate sqlobj to get multiple table meta", pSql->self); @@ -2414,8 +2477,10 @@ int32_t getMultiTableMetaFromMnode(SSqlObj *pSql, SArray* pNameList, bool loadVg pNew->signature = pNew; pNew->cmd.command = TSDB_SQL_MULTI_META; - int32_t numOfTables = taosArrayGetSize(pNameList); - int32_t size = numOfTables * TSDB_TABLE_FNAME_LEN + sizeof(SMultiTableInfoMsg); + int32_t numOfTable = (int32_t) taosArrayGetSize(pNameList); + int32_t numOfVgroupList = (int32_t) taosArrayGetSize(pVgroupNameList); + + int32_t size = (numOfTable + numOfVgroupList) * TSDB_TABLE_FNAME_LEN + sizeof(SMultiTableInfoMsg); if (TSDB_CODE_SUCCESS != tscAllocPayload(&pNew->cmd, size)) { tscError("0x%"PRIx64" malloc failed for payload to get table meta", pSql->self); tscFreeSqlObj(pNew); @@ -2423,14 +2488,14 @@ int32_t getMultiTableMetaFromMnode(SSqlObj *pSql, SArray* pNameList, bool loadVg } SMultiTableInfoMsg* pInfo = (SMultiTableInfoMsg*) pNew->cmd.payload; - pInfo->loadVgroup = htonl(loadVgroupInfo? 1:0); - pInfo->numOfTables = htonl(numOfTables); + pInfo->numOfTables = htonl((uint32_t) taosArrayGetSize(pNameList)); + pInfo->numOfVgroups = htonl((uint32_t) taosArrayGetSize(pVgroupNameList)); char* start = pInfo->tableNames; int32_t len = 0; - for(int32_t i = 0; i < numOfTables; ++i) { + for(int32_t i = 0; i < numOfTable; ++i) { char* name = taosArrayGetP(pNameList, i); - if (i < numOfTables - 1) { + if (i < numOfTable - 1 || numOfVgroupList > 0) { len = sprintf(start, "%s,", name); } else { len = sprintf(start, "%s", name); @@ -2439,12 +2504,23 @@ int32_t getMultiTableMetaFromMnode(SSqlObj *pSql, SArray* pNameList, bool loadVg start += len; } + for(int32_t i = 0; i < numOfVgroupList; ++i) { + char* name = taosArrayGetP(pVgroupNameList, i); + if (i < numOfVgroupList - 1) { + len = sprintf(start, "%s, ", name); + } else { + len = sprintf(start, "%s", name); + } + + start += len; + } + pNew->cmd.payloadLen = (start - pInfo->tableNames) + sizeof(SMultiTableInfoMsg); pNew->cmd.msgType = TSDB_MSG_TYPE_CM_TABLES_META; registerSqlObj(pNew); - tscDebug("0x%"PRIx64" new pSqlObj:0x%"PRIx64" to get %d tableMeta, loadVgroup:%d, msg size:%d", pSql->self, - pNew->self, numOfTables, loadVgroupInfo, pNew->cmd.payloadLen); + tscDebug("0x%"PRIx64" new pSqlObj:0x%"PRIx64" to get %d tableMeta, vgroupInfo:%d, msg size:%d", pSql->self, + pNew->self, numOfTable, numOfVgroupList, pNew->cmd.payloadLen); pNew->fp = tscTableMetaCallBack; pNew->param = (void *)pSql->self; diff --git a/src/inc/taosmsg.h b/src/inc/taosmsg.h index 9e96e78cc9..cc45774b15 100644 --- a/src/inc/taosmsg.h +++ b/src/inc/taosmsg.h @@ -703,7 +703,7 @@ typedef struct { } STableInfoMsg; typedef struct { - int32_t loadVgroup; + int32_t numOfVgroups; int32_t numOfTables; char tableNames[]; } SMultiTableInfoMsg; diff --git a/src/mnode/src/mnodeTable.c b/src/mnode/src/mnodeTable.c index 3f2e4473a2..dc1b2072de 100644 --- a/src/mnode/src/mnodeTable.c +++ b/src/mnode/src/mnodeTable.c @@ -1724,16 +1724,19 @@ static int32_t calculateVgroupMsgLength(SSTableVgroupMsg* pInfo, int32_t numOfTa return contLen; } -static char* serializeVgroupInfo(SSTableObj *pTable, char* msg, SMnodeMsg* pMsgBody, void* handle) { - *(uint64_t*)msg = htobe64(pTable->uid); - msg += sizeof(sizeof(pTable->uid)); +static char* serializeVgroupInfo(SSTableObj *pTable, char* name, char* msg, SMnodeMsg* pMsgBody, void* handle) { + SName sn = {0}; + tNameFromString(&sn, name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE); + const char* tableName = tNameGetTableName(&sn); + + strncpy(msg, tableName, TSDB_TABLE_NAME_LEN); + msg += TSDB_TABLE_NAME_LEN; if (pTable->vgHash == NULL) { - mDebug("msg:%p, app:%p stable:%s, no vgroup exist while get stable vgroup info", pMsgBody, handle, stableName); + mDebug("msg:%p, app:%p stable:%s, no vgroup exist while get stable vgroup info", pMsgBody, handle, name); mnodeDecTableRef(pTable); // even this super table has no corresponding table, still return - int64_t uid = htobe64(pTable->uid); SVgroupsMsg *pVgroupMsg = (SVgroupsMsg *)msg; pVgroupMsg->numOfVgroups = 0; @@ -1805,59 +1808,8 @@ static int32_t mnodeProcessSuperTableVgroupMsg(SMnodeMsg *pMsg) { continue; } - msg = serializeVgroupInfo(pTable, msg, pMsg, pMsg->rpcMsg.ahandle); + msg = serializeVgroupInfo(pTable, stableName, msg, pMsg, pMsg->rpcMsg.ahandle); pRsp->numOfTables++; - -// if (pTable->vgHash == NULL) { -// mDebug("msg:%p, app:%p stable:%s, no vgroup exist while get stable vgroup info", pMsg, pMsg->rpcMsg.ahandle, -// stableName); -// mnodeDecTableRef(pTable); -// -// // even this super table has no corresponding table, still return -// pRsp->numOfTables++; -// -// SVgroupsMsg *pVgroupMsg = (SVgroupsMsg *)msg; -// pVgroupMsg->numOfVgroups = 0; -// -// msg += sizeof(SVgroupsMsg); -// } else { -// SVgroupsMsg *pVgroupMsg = (SVgroupsMsg *)msg; -// mDebug("msg:%p, app:%p stable:%s, hash:%p sizeOfVgList:%d will be returned", pMsg, pMsg->rpcMsg.ahandle, -// pTable->info.tableId, pTable->vgHash, taosHashGetSize(pTable->vgHash)); -// -// int32_t *pVgId = taosHashIterate(pTable->vgHash, NULL); -// int32_t vgSize = 0; -// while (pVgId) { -// SVgObj *pVgroup = mnodeGetVgroup(*pVgId); -// pVgId = taosHashIterate(pTable->vgHash, pVgId); -// if (pVgroup == NULL) continue; -// -// pVgroupMsg->vgroups[vgSize].vgId = htonl(pVgroup->vgId); -// pVgroupMsg->vgroups[vgSize].numOfEps = 0; -// -// for (int32_t vn = 0; vn < pVgroup->numOfVnodes; ++vn) { -// SDnodeObj *pDnode = pVgroup->vnodeGid[vn].pDnode; -// if (pDnode == NULL) break; -// -// tstrncpy(pVgroupMsg->vgroups[vgSize].epAddr[vn].fqdn, pDnode->dnodeFqdn, TSDB_FQDN_LEN); -// pVgroupMsg->vgroups[vgSize].epAddr[vn].port = htons(pDnode->dnodePort); -// -// pVgroupMsg->vgroups[vgSize].numOfEps++; -// } -// -// vgSize++; -// mnodeDecVgroupRef(pVgroup); -// } -// -// taosHashCancelIterate(pTable->vgHash, pVgId); -// mnodeDecTableRef(pTable); -// -// pVgroupMsg->numOfVgroups = htonl(vgSize); -// -// // one table is done, try the next table -// msg += sizeof(SVgroupsMsg) + vgSize * sizeof(SVgroupMsg); -// pRsp->numOfTables++; -// } } if (pRsp->numOfTables != numOfTable) { @@ -2887,8 +2839,9 @@ static void mnodeProcessAlterTableRsp(SRpcMsg *rpcMsg) { static int32_t mnodeProcessMultiTableMetaMsg(SMnodeMsg *pMsg) { SMultiTableInfoMsg *pInfo = pMsg->rpcMsg.pCont; - pInfo->numOfTables = htonl(pInfo->numOfTables); - pInfo->loadVgroup = htonl(pInfo->loadVgroup); + + pInfo->numOfTables = htonl(pInfo->numOfTables); + pInfo->numOfVgroups = htonl(pInfo->numOfVgroups); // first malloc 80KB, subsequent reallocation will expand the size as twice of the original size int32_t totalMallocLen = sizeof(STableMetaMsg) + sizeof(SSchema) * (TSDB_MAX_TAGS + TSDB_MAX_COLUMNS + 16); @@ -2925,8 +2878,8 @@ static int32_t mnodeProcessMultiTableMetaMsg(SMnodeMsg *pMsg) { return TSDB_CODE_APP_NOT_READY; } - int availLen = totalMallocLen - pMultiMeta->contLen; - if (availLen <= sizeof(STableMetaMsg) + sizeof(SSchema) * (TSDB_MAX_TAGS + TSDB_MAX_COLUMNS + 16)) { + int remain = totalMallocLen - pMultiMeta->contLen; + if (remain <= sizeof(STableMetaMsg) + sizeof(SSchema) * (TSDB_MAX_TAGS + TSDB_MAX_COLUMNS + 16)) { totalMallocLen *= 2; pMultiMeta = rpcReallocCont(pMultiMeta, totalMallocLen); if (pMultiMeta == NULL) { @@ -2942,7 +2895,9 @@ static int32_t mnodeProcessMultiTableMetaMsg(SMnodeMsg *pMsg) { code = mnodeDoGetChildTableMeta(pMsg, pMeta); } else { code = mnodeDoGetSuperTableMeta(pMsg, pMeta); - taosArrayPush(pList, fullName); // keep the super table full name + + // keep the full name for each super table for retrieve vgroup list + taosArrayPush(pList, &fullName); } if (code == TSDB_CODE_SUCCESS) { @@ -2955,16 +2910,26 @@ static int32_t mnodeProcessMultiTableMetaMsg(SMnodeMsg *pMsg) { char* msg = (char*) pMultiMeta + pMultiMeta->contLen; - for(int32_t i = 0; i < taosArrayGetSize(pList); ++i) { - char* name = taosArrayGet(pList, i); + // add the additional super table names that needs the vgroup info + for(int32_t i = 0; i < pInfo->numOfVgroups; ++i) { + char *fullName = (char *)(pInfo->tableNames + (i + pInfo->numOfTables) * TSDB_TABLE_FNAME_LEN); + taosArrayPush(pList, fullName); + } + + // add the pVgroupList into the pList + int32_t numOfStable = (int32_t) taosArrayGetSize(pList); + pMultiMeta->numOfVgroup = htonl(numOfStable); + + for(int32_t i = 0; i < numOfStable; ++i) { + char* name = taosArrayGetP(pList, i); SSTableObj *pTable = mnodeGetSuperTable(name); if (pTable == NULL) { - mError("msg:%p, app:%p stable:%s, not exist while get stable vgroup info", pMsg, pMsg->rpcMsg.ahandle, stableName); + mError("msg:%p, app:%p stable:%s, not exist while get stable vgroup info", pMsg, pMsg->rpcMsg.ahandle, name); mnodeDecTableRef(pTable); continue; } - msg = serializeVgroupInfo(pTable, msg, pMsg, pMsg->rpcMsg.ahandle); + msg = serializeVgroupInfo(pTable, name, msg, pMsg, pMsg->rpcMsg.ahandle); } pMultiMeta->contLen = (msg - (char*) pMultiMeta); diff --git a/src/query/src/qAggMain.c b/src/query/src/qAggMain.c index 844817c144..6218494b48 100644 --- a/src/query/src/qAggMain.c +++ b/src/query/src/qAggMain.c @@ -4779,7 +4779,7 @@ static void mergeTableBlockDist(STableBlockDist* pDist, const STableBlockDist* p pDist->dataBlockInfos = taosArrayInit(4, sizeof(SFileBlockInfo)); } - taosArrayPushBatch(pDist->dataBlockInfos, pSrc->dataBlockInfos->pData, (int32_t) taosArrayGetSize(pSrc->dataBlockInfos)); + taosArrayAddBatch(pDist->dataBlockInfos, pSrc->dataBlockInfos->pData, (int32_t) taosArrayGetSize(pSrc->dataBlockInfos)); } void block_func_merge(SQLFunctionCtx* pCtx) { diff --git a/src/query/src/qPlan.c b/src/query/src/qPlan.c index c46257ba9e..9176b4a06b 100644 --- a/src/query/src/qPlan.c +++ b/src/query/src/qPlan.c @@ -215,7 +215,7 @@ SArray* createQueryPlanImpl(SQueryInfo* pQueryInfo) { for(int32_t i = 0; i < size; ++i) { SQueryInfo* pq = taosArrayGet(pQueryInfo->pUpstream, i); SArray* p = createQueryPlanImpl(pq); - taosArrayPushBatch(upstream, p->pData, (int32_t) taosArrayGetSize(p)); + taosArrayAddBatch(upstream, p->pData, (int32_t) taosArrayGetSize(p)); } } diff --git a/src/tsdb/src/tsdbCommit.c b/src/tsdb/src/tsdbCommit.c index 4351cce51c..4cbd3c10f3 100644 --- a/src/tsdb/src/tsdbCommit.c +++ b/src/tsdb/src/tsdbCommit.c @@ -1149,7 +1149,7 @@ static int tsdbCommitAddBlock(SCommitH *pCommith, const SBlock *pSupBlock, const return -1; } - if (pSubBlocks && taosArrayPushBatch(pCommith->aSubBlk, pSubBlocks, nSubBlocks) == NULL) { + if (pSubBlocks && taosArrayAddBatch(pCommith->aSubBlk, pSubBlocks, nSubBlocks) == NULL) { terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; return -1; } diff --git a/src/util/inc/tarray.h b/src/util/inc/tarray.h index f2e268c2d4..bc25776caa 100644 --- a/src/util/inc/tarray.h +++ b/src/util/inc/tarray.h @@ -50,7 +50,15 @@ void* taosArrayInit(size_t size, size_t elemSize); * @param nEles * @return */ -void *taosArrayPushBatch(SArray *pArray, const void *pData, int nEles); +void *taosArrayAddBatch(SArray *pArray, const void *pData, int nEles); + +/** + * add all element from the source array list into the destination + * @param pArray + * @param pInput + * @return + */ +void* taosArrayAddAll(SArray* pArray, const SArray* pInput); /** * @@ -59,7 +67,7 @@ void *taosArrayPushBatch(SArray *pArray, const void *pData, int nEles); * @return */ static FORCE_INLINE void* taosArrayPush(SArray* pArray, const void* pData) { - return taosArrayPushBatch(pArray, pData, 1); + return taosArrayAddBatch(pArray, pData, 1); } /** diff --git a/src/util/src/tarray.c b/src/util/src/tarray.c index 4dde5dbba2..87433b4d4e 100644 --- a/src/util/src/tarray.c +++ b/src/util/src/tarray.c @@ -55,7 +55,7 @@ static int32_t taosArrayResize(SArray* pArray) { return 0; } -void* taosArrayPushBatch(SArray* pArray, const void* pData, int nEles) { +void* taosArrayAddBatch(SArray* pArray, const void* pData, int nEles) { if (pArray == NULL || pData == NULL) { return NULL; } @@ -81,6 +81,10 @@ void* taosArrayPushBatch(SArray* pArray, const void* pData, int nEles) { return dst; } +void* taosArrayAddAll(SArray* pArray, const SArray* pInput) { + return taosArrayAddBatch(pArray, pInput->pData, (int32_t) taosArrayGetSize(pInput)); +} + void* taosArrayPop(SArray* pArray) { assert( pArray != NULL ); -- GitLab