diff --git a/src/client/src/tscSQLParser.c b/src/client/src/tscSQLParser.c index 06bf90a122d8724ecf48ad3fe7b44ef974e2d450..2f227275210e884e3e88aed7828538d2c1bda21c 100644 --- a/src/client/src/tscSQLParser.c +++ b/src/client/src/tscSQLParser.c @@ -619,7 +619,10 @@ int32_t tscToSQLCmd(SSqlObj* pSql, struct SSqlInfo* pInfo) { case TSDB_SQL_SELECT: { const char* msg1 = "columns in select clause not identical"; - loadAllTableMeta(pSql, pInfo); + int32_t code = loadAllTableMeta(pSql, pInfo); + if (code != TSDB_CODE_SUCCESS) { + return code; + } SQueryInfo* pCurrent = pCmd->pQueryInfo; for(int32_t i = 0; i < pCmd->clauseIndex; ++i) { @@ -7128,7 +7131,7 @@ static int32_t getTableNameFromSubquery(SSqlNode* pSqlNode, SArray* tableNameLis } static void freeElem(void* p) { - tfree(p); + tfree(*(char**)p); } int32_t loadAllTableMeta(SSqlObj* pSql, struct SSqlInfo* pInfo) { @@ -7252,7 +7255,10 @@ static int32_t doLoadAllTableMeta(SSqlObj* pSql, SQueryInfo* pQueryInfo, SSqlNod strncpy(pTableMetaInfo->aliasName, tNameGetTableName(&pTableMetaInfo->name), tListLen(pTableMetaInfo->aliasName)); } - code = tscGetTableMeta(pSql, pTableMetaInfo); + const char* name = tNameGetTableName(&pTableMetaInfo->name); + pTableMetaInfo->pTableMeta = taosHashGet(pCmd->pTableMetaMap, name, strlen(name)); + assert(pTableMetaInfo->pTableMeta != NULL); + if (code != TSDB_CODE_SUCCESS) { return code; } @@ -7376,13 +7382,13 @@ int32_t validateSqlNode(SSqlObj* pSql, SSqlNode* pSqlNode, SQueryInfo* pQueryInf } else { pQueryInfo->command = TSDB_SQL_SELECT; - size_t fromSize = taosArrayGetSize(pSqlNode->from->list); - if (fromSize > TSDB_MAX_JOIN_TABLE_NUM) { + size_t numOfTables = taosArrayGetSize(pSqlNode->from->list); + if (numOfTables > TSDB_MAX_JOIN_TABLE_NUM) { return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg2); } // set all query tables, which are maybe more than one. - code = doLoadAllTableMeta(pSql, pQueryInfo, pSqlNode, (int32_t) fromSize); + code = doLoadAllTableMeta(pSql, pQueryInfo, pSqlNode, (int32_t) numOfTables); if (code != TSDB_CODE_SUCCESS) { return code; } diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index 1c4e924d0268061fd1457bd96777fbf2be30ada5..23ae03bdc3d56c29159a9182b4fc9bea0b7816c6 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -1822,18 +1822,16 @@ int tscBuildHeartBeatMsg(SSqlObj *pSql, SSqlInfo *pInfo) { return TSDB_CODE_SUCCESS; } -int tscProcessTableMetaRsp(SSqlObj *pSql) { - STableMetaMsg *pMetaMsg = (STableMetaMsg *)pSql->res.pRsp; - +static int32_t tableMetaMsgConvert(STableMetaMsg* pMetaMsg) { pMetaMsg->tid = htonl(pMetaMsg->tid); pMetaMsg->sversion = htons(pMetaMsg->sversion); pMetaMsg->tversion = htons(pMetaMsg->tversion); pMetaMsg->vgroup.vgId = htonl(pMetaMsg->vgroup.vgId); - + pMetaMsg->uid = htobe64(pMetaMsg->uid); pMetaMsg->contLen = htons(pMetaMsg->contLen); pMetaMsg->numOfColumns = htons(pMetaMsg->numOfColumns); - + if ((pMetaMsg->tableType != TSDB_SUPER_TABLE) && (pMetaMsg->tid <= 0 || pMetaMsg->vgroup.vgId < 2 || pMetaMsg->vgroup.numOfEps <= 0)) { tscError("invalid value in table numOfEps:%d, vgId:%d tid:%d, name:%s", pMetaMsg->vgroup.numOfEps, pMetaMsg->vgroup.vgId, @@ -1868,21 +1866,34 @@ int tscProcessTableMetaRsp(SSqlObj *pSql) { pSchema++; } - - STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0); - assert(pTableMetaInfo->pTableMeta == NULL); - STableMeta* pTableMeta = tscCreateTableMetaFromMsg(pMetaMsg); - if (!tIsValidSchema(pTableMeta->schema, pTableMeta->tableInfo.numOfColumns, pTableMeta->tableInfo.numOfTags)) { - tscError("0x%"PRIx64" invalid table meta from mnode, name:%s", pSql->self, tNameGetTableName(&pTableMetaInfo->name)); - return TSDB_CODE_TSC_INVALID_VALUE; + return TSDB_CODE_SUCCESS; +} + +// update the vgroupInfo if needed +static void doUpdateVgroupInfo(STableMeta *pTableMeta, SVgroupMsg *pVgroupMsg) { + if (pTableMeta->vgId > 0) { + int32_t vgId = pTableMeta->vgId; + assert(pTableMeta->tableType != TSDB_SUPER_TABLE); + + SNewVgroupInfo vgroupInfo = {.inUse = -1}; + taosHashGetClone(tscVgroupMap, &vgId, sizeof(vgId), NULL, &vgroupInfo, sizeof(SNewVgroupInfo)); + + // vgroup info exists, compare with it + if (((vgroupInfo.inUse >= 0) && !vgroupInfoIdentical(&vgroupInfo, pVgroupMsg)) || (vgroupInfo.inUse < 0)) { + vgroupInfo = createNewVgroupInfo(pVgroupMsg); + taosHashPut(tscVgroupMap, &vgId, sizeof(vgId), &vgroupInfo, sizeof(vgroupInfo)); + tscDebug("add new VgroupInfo, vgId:%d, total cached:%d", vgId, (int32_t) taosHashGetSize(tscVgroupMap)); + } } +} +static void doAddTableMetaLocalBuf(STableMeta* pTableMeta, STableMetaMsg* pMetaMsg, bool updateSTable) { if (pTableMeta->tableType == TSDB_CHILD_TABLE) { - // check if super table hashmap or not + // add or update the corresponding super table meta data info int32_t len = (int32_t) strnlen(pTableMeta->sTableName, TSDB_TABLE_FNAME_LEN); - // super tableMeta data alreay exists, create it according to tableMeta and add it to hash map + // The super tableMeta already exists, create it according to tableMeta and add it to hash map STableMeta* pSupTableMeta = createSuperTableMeta(pMetaMsg); uint32_t size = tscGetTableMetaSize(pSupTableMeta); @@ -1892,37 +1903,37 @@ int tscProcessTableMetaRsp(SSqlObj *pSql) { tfree(pSupTableMeta); CChildTableMeta* cMeta = tscCreateChildMeta(pTableMeta); - - char name[TSDB_TABLE_FNAME_LEN] = {0}; - tNameExtractFullName(&pTableMetaInfo->name, name); - - taosHashPut(tscTableMetaInfo, name, strlen(name), cMeta, sizeof(CChildTableMeta)); + taosHashPut(tscTableMetaInfo, pMetaMsg->tableFname, strlen(pMetaMsg->tableFname), cMeta, sizeof(CChildTableMeta)); tfree(cMeta); } else { uint32_t s = tscGetTableMetaSize(pTableMeta); - - char name[TSDB_TABLE_FNAME_LEN] = {0}; - tNameExtractFullName(&pTableMetaInfo->name, name); - - taosHashPut(tscTableMetaInfo, name, strlen(name), pTableMeta, s); + taosHashPut(tscTableMetaInfo, pMetaMsg->tableFname, strlen(pMetaMsg->tableFname), pTableMeta, s); } +} - // update the vgroupInfo if needed - if (pTableMeta->vgId > 0) { - int32_t vgId = pTableMeta->vgId; - assert(pTableMeta->tableType != TSDB_SUPER_TABLE); +int tscProcessTableMetaRsp(SSqlObj *pSql) { + STableMetaMsg *pMetaMsg = (STableMetaMsg *)pSql->res.pRsp; + int32_t code = tableMetaMsgConvert(pMetaMsg); + if (code != TSDB_CODE_SUCCESS) { + return code; + } - SNewVgroupInfo vgroupInfo = {.inUse = -1}; - taosHashGetClone(tscVgroupMap, &vgId, sizeof(vgId), NULL, &vgroupInfo, sizeof(SNewVgroupInfo)); + STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0); + assert(pTableMetaInfo->pTableMeta == NULL); - if (((vgroupInfo.inUse >= 0) && !vgroupInfoIdentical(&vgroupInfo, &pMetaMsg->vgroup)) || - (vgroupInfo.inUse < 0)) { // vgroup info exists, compare with it - vgroupInfo = createNewVgroupInfo(&pMetaMsg->vgroup); - taosHashPut(tscVgroupMap, &vgId, sizeof(vgId), &vgroupInfo, sizeof(vgroupInfo)); - tscDebug("add new VgroupInfo, vgId:%d, total cached:%d", vgId, (int32_t) taosHashGetSize(tscVgroupMap)); - } + STableMeta* pTableMeta = tscCreateTableMetaFromMsg(pMetaMsg); + if (!tIsValidSchema(pTableMeta->schema, pTableMeta->tableInfo.numOfColumns, pTableMeta->tableInfo.numOfTags)) { + tscError("0x%"PRIx64" invalid table meta from mnode, name:%s", pSql->self, tNameGetTableName(&pTableMetaInfo->name)); + return TSDB_CODE_TSC_INVALID_VALUE; } + char name[TSDB_TABLE_FNAME_LEN] = {0}; + tNameExtractFullName(&pTableMetaInfo->name, name); + assert(strncmp(pMetaMsg->tableFname, name, tListLen(pMetaMsg->tableFname)) == 0); + + doAddTableMetaLocalBuf(pTableMeta, pMetaMsg, true); + doUpdateVgroupInfo(pTableMeta, &pMetaMsg->vgroup); + tscDebug("0x%"PRIx64" recv table meta, uid:%" PRIu64 ", tid:%d, name:%s", pSql->self, pTableMeta->id.uid, pTableMeta->id.tid, tNameGetTableName(&pTableMetaInfo->name)); @@ -1930,109 +1941,66 @@ int tscProcessTableMetaRsp(SSqlObj *pSql) { return TSDB_CODE_SUCCESS; } -/** +/* * multi table meta rsp pkg format: - * | STaosRsp | SMultiTableInfoMsg | SMeterMeta0 | SSchema0 | SMeterMeta1 | SSchema1 | SMeterMeta2 | SSchema2 - * |...... 1B 4B - **/ -int tscProcessMultiMeterMetaRsp(SSqlObj *pSql) { -#if 0 + * | SMultiTableInfoMsg | SMeterMeta0 | SSchema0 | SMeterMeta1 | SSchema1 | SMeterMeta2 | SSchema2 + * | 4B + */ +int tscProcessMultiTableMetaRsp(SSqlObj *pSql) { char *rsp = pSql->res.pRsp; - ieType = *rsp; - if (ieType != TSDB_IE_TYPE_META) { - tscError("invalid ie type:%d", ieType); - pSql->res.code = TSDB_CODE_TSC_INVALID_IE; - pSql->res.numOfTotal = 0; - return TSDB_CODE_TSC_APP_ERROR; - } + SMultiTableMeta *pMultiMeta = (SMultiTableMeta *)rsp; + pMultiMeta->numOfTables = htonl(pMultiMeta->numOfTables); + + rsp += sizeof(SMultiTableMeta); + + SSqlObj* pParentSql = (SSqlObj*)taosAcquireRef(tscObjRef, (int64_t)pSql->param); + SSqlCmd *pParentCmd = &pParentSql->cmd; + + SHashObj *pSet = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK); - rsp++; + for (int32_t i = 0; i < pMultiMeta->numOfTables; i++) { + STableMetaMsg *pMetaMsg = (STableMetaMsg *)pMultiMeta->meta; + int32_t code = tableMetaMsgConvert(pMetaMsg); + if (code != TSDB_CODE_SUCCESS) { + return code; + } - SMultiTableInfoMsg *pInfo = (SMultiTableInfoMsg *)rsp; - totalNum = htonl(pInfo->numOfTables); - rsp += sizeof(SMultiTableInfoMsg); + STableMeta* pTableMeta = tscCreateTableMetaFromMsg(pMetaMsg); + if (!tIsValidSchema(pTableMeta->schema, pTableMeta->tableInfo.numOfColumns, pTableMeta->tableInfo.numOfTags)) { + tscError("0x%"PRIx64" invalid table meta from mnode, name:%s", pSql->self, pMetaMsg->tableFname); + return TSDB_CODE_TSC_INVALID_VALUE; + } - for (i = 0; i < totalNum; i++) { - SMultiTableMeta *pMultiMeta = (SMultiTableMeta *)rsp; - STableMeta * pMeta = pMultiMeta->metas; + int32_t t = tscGetTableMetaSize(pTableMeta); - pMeta->sid = htonl(pMeta->sid); - pMeta->sversion = htons(pMeta->sversion); - pMeta->vgId = htonl(pMeta->vgId); - pMeta->uid = htobe64(pMeta->uid); + SName sn = {0}; + tNameFromString(&sn, pMetaMsg->tableFname, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE); + const char* tableName = tNameGetTableName(&sn); + int32_t keyLen = strlen(tableName); + taosHashPut(pParentCmd->pTableMetaMap, tableName, keyLen, pTableMeta, t); - if (pMeta->sid <= 0 || pMeta->vgId < 0) { - tscError("invalid meter vgId:%d, sid%d", pMeta->vgId, pMeta->sid); - pSql->res.code = TSDB_CODE_TSC_INVALID_VALUE; - pSql->res.numOfTotal = i; - return TSDB_CODE_TSC_APP_ERROR; + bool addToBuf = false; + if (taosHashGet(pSet, &pMetaMsg->uid, sizeof(pMetaMsg->uid)) == NULL) { + addToBuf = true; + taosHashPut(pSet, &pMetaMsg->uid, sizeof(pMetaMsg->uid), "", 0); } - // pMeta->numOfColumns = htons(pMeta->numOfColumns); - // - // if (pMeta->numOfTags > TSDB_MAX_TAGS || pMeta->numOfTags < 0) { - // tscError("invalid tag value count:%d", pMeta->numOfTags); - // pSql->res.code = TSDB_CODE_TSC_INVALID_VALUE; - // pSql->res.numOfTotal = i; - // return TSDB_CODE_TSC_APP_ERROR; - // } - // - // if (pMeta->numOfTags > TSDB_MAX_TAGS || pMeta->numOfTags < 0) { - // tscError("invalid numOfTags:%d", pMeta->numOfTags); - // pSql->res.code = TSDB_CODE_TSC_INVALID_VALUE; - // pSql->res.numOfTotal = i; - // return TSDB_CODE_TSC_APP_ERROR; - // } - // - // if (pMeta->numOfColumns > TSDB_MAX_COLUMNS || pMeta->numOfColumns < 0) { - // tscError("invalid numOfColumns:%d", pMeta->numOfColumns); - // pSql->res.code = TSDB_CODE_TSC_INVALID_VALUE; - // pSql->res.numOfTotal = i; - // return TSDB_CODE_TSC_APP_ERROR; - // } - // - // for (int j = 0; j < TSDB_REPLICA_MAX_NUM; ++j) { - // pMeta->vpeerDesc[j].vnode = htonl(pMeta->vpeerDesc[j].vnode); - // } - // - // pMeta->rowSize = 0; - // rsp += sizeof(SMultiTableMeta); - // pSchema = (SSchema *)rsp; - // - // int32_t numOfTotalCols = pMeta->numOfColumns + pMeta->numOfTags; - // for (int j = 0; j < numOfTotalCols; ++j) { - // pSchema->bytes = htons(pSchema->bytes); - // pSchema->colId = htons(pSchema->colId); - // - // // ignore the tags length - // if (j < pMeta->numOfColumns) { - // pMeta->rowSize += pSchema->bytes; - // } - // pSchema++; - // } - // - // rsp += numOfTotalCols * sizeof(SSchema); - // - // int32_t tagLen = 0; - // SSchema *pTagsSchema = tscGetTableTagSchema(pMeta); - // - // if (pMeta->tableType == TSDB_CHILD_TABLE) { - // for (int32_t j = 0; j < pMeta->numOfTags; ++j) { - // tagLen += pTagsSchema[j].bytes; - // } - // } - // - // rsp += tagLen; - // int32_t size = (int32_t)(rsp - ((char *)pMeta)); // Consistent with STableMeta in cache - // } + // create the tableMeta and add it into the TableMeta map + doAddTableMetaLocalBuf(pTableMeta, pMetaMsg, addToBuf); + + // if the vgroup is not updated in current process, update it. + int64_t vgId = pMetaMsg->vgroup.vgId; + if (taosHashGet(pSet, &vgId, sizeof(vgId)) == NULL) { + doUpdateVgroupInfo(pTableMeta, &pMetaMsg->vgroup); + taosHashPut(pSet, &vgId, sizeof(vgId), "", 0); + } } - + pSql->res.code = TSDB_CODE_SUCCESS; - pSql->res.numOfTotal = i; - tscDebug("0x%"PRIx64" load multi-metermeta resp from complete num:%d", pSql->self, pSql->res.numOfTotal); -#endif - + pSql->res.numOfTotal = pMultiMeta->numOfTables; + tscDebug("0x%"PRIx64" load multi-tableMeta resp from complete numOfTables:%d", pSql->self, pMultiMeta->numOfTables); + return TSDB_CODE_SUCCESS; } @@ -2670,7 +2638,7 @@ void tscInitMsgsFp() { tscProcessMsgRsp[TSDB_SQL_USE_DB] = tscProcessUseDbRsp; tscProcessMsgRsp[TSDB_SQL_META] = tscProcessTableMetaRsp; tscProcessMsgRsp[TSDB_SQL_STABLEVGROUP] = tscProcessSTableVgroupRsp; - tscProcessMsgRsp[TSDB_SQL_MULTI_META] = tscProcessMultiMeterMetaRsp; + tscProcessMsgRsp[TSDB_SQL_MULTI_META] = tscProcessMultiTableMetaRsp; tscProcessMsgRsp[TSDB_SQL_SHOW] = tscProcessShowRsp; tscProcessMsgRsp[TSDB_SQL_RETRIEVE] = tscProcessRetrieveRspFromNode; // rsp handled by same function. diff --git a/src/inc/taosmsg.h b/src/inc/taosmsg.h index 8c47896a340dfaeb8755874738768ba04e946c15..9e96e78cc96d12c450da6a52b826b503556d409e 100644 --- a/src/inc/taosmsg.h +++ b/src/inc/taosmsg.h @@ -754,6 +754,7 @@ typedef struct STableMetaMsg { typedef struct SMultiTableMeta { int32_t numOfTables; + int32_t numOfVgroup; int32_t contLen; char meta[]; } SMultiTableMeta; diff --git a/src/mnode/src/mnodeTable.c b/src/mnode/src/mnodeTable.c index 0f08ec478a052dac3cfec7c3394876280763b84f..3f2e4473a2230bd3ebee1dcc59b3188630cf4d09 100644 --- a/src/mnode/src/mnodeTable.c +++ b/src/mnode/src/mnodeTable.c @@ -1709,11 +1709,7 @@ static int32_t mnodeGetSuperTableMeta(SMnodeMsg *pMsg) { return TSDB_CODE_SUCCESS; } -static int32_t mnodeProcessSuperTableVgroupMsg(SMnodeMsg *pMsg) { - SSTableVgroupMsg *pInfo = pMsg->rpcMsg.pCont; - int32_t numOfTable = htonl(pInfo->numOfTables); - - // reserve space +static int32_t calculateVgroupMsgLength(SSTableVgroupMsg* pInfo, int32_t numOfTable) { int32_t contLen = sizeof(SSTableVgroupRspMsg) + 32 * sizeof(SVgroupMsg) + sizeof(SVgroupsMsg); for (int32_t i = 0; i < numOfTable; ++i) { char *stableName = (char *)pInfo + sizeof(SSTableVgroupMsg) + (TSDB_TABLE_FNAME_LEN)*i; @@ -1725,6 +1721,72 @@ static int32_t mnodeProcessSuperTableVgroupMsg(SMnodeMsg *pMsg) { mnodeDecTableRef(pTable); } + return contLen; +} + +static char* serializeVgroupInfo(SSTableObj *pTable, char* msg, SMnodeMsg* pMsgBody, void* handle) { + *(uint64_t*)msg = htobe64(pTable->uid); + msg += sizeof(sizeof(pTable->uid)); + + if (pTable->vgHash == NULL) { + mDebug("msg:%p, app:%p stable:%s, no vgroup exist while get stable vgroup info", pMsgBody, handle, stableName); + mnodeDecTableRef(pTable); + + // even this super table has no corresponding table, still return + int64_t uid = htobe64(pTable->uid); + SVgroupsMsg *pVgroupMsg = (SVgroupsMsg *)msg; + pVgroupMsg->numOfVgroups = 0; + + msg += sizeof(SVgroupsMsg); + } else { + SVgroupsMsg *pVgroupMsg = (SVgroupsMsg *)msg; + mDebug("msg:%p, app:%p stable:%s, hash:%p sizeOfVgList:%d will be returned", pMsgBody, handle, + pTable->info.tableId, pTable->vgHash, taosHashGetSize(pTable->vgHash)); + + int32_t *pVgId = taosHashIterate(pTable->vgHash, NULL); + int32_t vgSize = 0; + while (pVgId) { + SVgObj *pVgroup = mnodeGetVgroup(*pVgId); + pVgId = taosHashIterate(pTable->vgHash, pVgId); + if (pVgroup == NULL) { + continue; + } + + pVgroupMsg->vgroups[vgSize].vgId = htonl(pVgroup->vgId); + pVgroupMsg->vgroups[vgSize].numOfEps = 0; + + for (int32_t vn = 0; vn < pVgroup->numOfVnodes; ++vn) { + SDnodeObj *pDnode = pVgroup->vnodeGid[vn].pDnode; + if (pDnode == NULL) break; + + tstrncpy(pVgroupMsg->vgroups[vgSize].epAddr[vn].fqdn, pDnode->dnodeFqdn, TSDB_FQDN_LEN); + pVgroupMsg->vgroups[vgSize].epAddr[vn].port = htons(pDnode->dnodePort); + + pVgroupMsg->vgroups[vgSize].numOfEps++; + } + + vgSize++; + mnodeDecVgroupRef(pVgroup); + } + + taosHashCancelIterate(pTable->vgHash, pVgId); + mnodeDecTableRef(pTable); + + pVgroupMsg->numOfVgroups = htonl(vgSize); + + // one table is done, try the next table + msg += sizeof(SVgroupsMsg) + vgSize * sizeof(SVgroupMsg); + } + + return msg; +} + +static int32_t mnodeProcessSuperTableVgroupMsg(SMnodeMsg *pMsg) { + SSTableVgroupMsg *pInfo = pMsg->rpcMsg.pCont; + int32_t numOfTable = htonl(pInfo->numOfTables); + + // calculate the required space. + int32_t contLen = calculateVgroupMsgLength(pInfo, numOfTable); SSTableVgroupRspMsg *pRsp = rpcMallocCont(contLen); if (pRsp == NULL) { return TSDB_CODE_MND_OUT_OF_MEMORY; @@ -1735,62 +1797,67 @@ static int32_t mnodeProcessSuperTableVgroupMsg(SMnodeMsg *pMsg) { for (int32_t i = 0; i < numOfTable; ++i) { char *stableName = (char *)pInfo + sizeof(SSTableVgroupMsg) + (TSDB_TABLE_FNAME_LEN)*i; + SSTableObj *pTable = mnodeGetSuperTable(stableName); if (pTable == NULL) { mError("msg:%p, app:%p stable:%s, not exist while get stable vgroup info", pMsg, pMsg->rpcMsg.ahandle, stableName); mnodeDecTableRef(pTable); continue; } - if (pTable->vgHash == NULL) { - mDebug("msg:%p, app:%p stable:%s, no vgroup exist while get stable vgroup info", pMsg, pMsg->rpcMsg.ahandle, - stableName); - mnodeDecTableRef(pTable); - - // even this super table has no corresponding table, still return - pRsp->numOfTables++; - - SVgroupsMsg *pVgroupMsg = (SVgroupsMsg *)msg; - pVgroupMsg->numOfVgroups = 0; - - msg += sizeof(SVgroupsMsg); - } else { - SVgroupsMsg *pVgroupMsg = (SVgroupsMsg *)msg; - mDebug("msg:%p, app:%p stable:%s, hash:%p sizeOfVgList:%d will be returned", pMsg, pMsg->rpcMsg.ahandle, - pTable->info.tableId, pTable->vgHash, taosHashGetSize(pTable->vgHash)); - - int32_t *pVgId = taosHashIterate(pTable->vgHash, NULL); - int32_t vgSize = 0; - while (pVgId) { - SVgObj *pVgroup = mnodeGetVgroup(*pVgId); - pVgId = taosHashIterate(pTable->vgHash, pVgId); - if (pVgroup == NULL) continue; - pVgroupMsg->vgroups[vgSize].vgId = htonl(pVgroup->vgId); - pVgroupMsg->vgroups[vgSize].numOfEps = 0; - - for (int32_t vn = 0; vn < pVgroup->numOfVnodes; ++vn) { - SDnodeObj *pDnode = pVgroup->vnodeGid[vn].pDnode; - if (pDnode == NULL) break; - - tstrncpy(pVgroupMsg->vgroups[vgSize].epAddr[vn].fqdn, pDnode->dnodeFqdn, TSDB_FQDN_LEN); - pVgroupMsg->vgroups[vgSize].epAddr[vn].port = htons(pDnode->dnodePort); - - pVgroupMsg->vgroups[vgSize].numOfEps++; - } - - vgSize++; - mnodeDecVgroupRef(pVgroup); - } - - taosHashCancelIterate(pTable->vgHash, pVgId); - mnodeDecTableRef(pTable); - - pVgroupMsg->numOfVgroups = htonl(vgSize); - - // one table is done, try the next table - msg += sizeof(SVgroupsMsg) + vgSize * sizeof(SVgroupMsg); - pRsp->numOfTables++; - } + msg = serializeVgroupInfo(pTable, msg, pMsg, pMsg->rpcMsg.ahandle); + pRsp->numOfTables++; + +// if (pTable->vgHash == NULL) { +// mDebug("msg:%p, app:%p stable:%s, no vgroup exist while get stable vgroup info", pMsg, pMsg->rpcMsg.ahandle, +// stableName); +// mnodeDecTableRef(pTable); +// +// // even this super table has no corresponding table, still return +// pRsp->numOfTables++; +// +// SVgroupsMsg *pVgroupMsg = (SVgroupsMsg *)msg; +// pVgroupMsg->numOfVgroups = 0; +// +// msg += sizeof(SVgroupsMsg); +// } else { +// SVgroupsMsg *pVgroupMsg = (SVgroupsMsg *)msg; +// mDebug("msg:%p, app:%p stable:%s, hash:%p sizeOfVgList:%d will be returned", pMsg, pMsg->rpcMsg.ahandle, +// pTable->info.tableId, pTable->vgHash, taosHashGetSize(pTable->vgHash)); +// +// int32_t *pVgId = taosHashIterate(pTable->vgHash, NULL); +// int32_t vgSize = 0; +// while (pVgId) { +// SVgObj *pVgroup = mnodeGetVgroup(*pVgId); +// pVgId = taosHashIterate(pTable->vgHash, pVgId); +// if (pVgroup == NULL) continue; +// +// pVgroupMsg->vgroups[vgSize].vgId = htonl(pVgroup->vgId); +// pVgroupMsg->vgroups[vgSize].numOfEps = 0; +// +// for (int32_t vn = 0; vn < pVgroup->numOfVnodes; ++vn) { +// SDnodeObj *pDnode = pVgroup->vnodeGid[vn].pDnode; +// if (pDnode == NULL) break; +// +// tstrncpy(pVgroupMsg->vgroups[vgSize].epAddr[vn].fqdn, pDnode->dnodeFqdn, TSDB_FQDN_LEN); +// pVgroupMsg->vgroups[vgSize].epAddr[vn].port = htons(pDnode->dnodePort); +// +// pVgroupMsg->vgroups[vgSize].numOfEps++; +// } +// +// vgSize++; +// mnodeDecVgroupRef(pVgroup); +// } +// +// taosHashCancelIterate(pTable->vgHash, pVgId); +// mnodeDecTableRef(pTable); +// +// pVgroupMsg->numOfVgroups = htonl(vgSize); +// +// // one table is done, try the next table +// msg += sizeof(SVgroupsMsg) + vgSize * sizeof(SVgroupMsg); +// pRsp->numOfTables++; +// } } if (pRsp->numOfTables != numOfTable) { @@ -2818,14 +2885,13 @@ static void mnodeProcessAlterTableRsp(SRpcMsg *rpcMsg) { } } -//TODO. set the vgroup info for the super table static int32_t mnodeProcessMultiTableMetaMsg(SMnodeMsg *pMsg) { SMultiTableInfoMsg *pInfo = pMsg->rpcMsg.pCont; pInfo->numOfTables = htonl(pInfo->numOfTables); pInfo->loadVgroup = htonl(pInfo->loadVgroup); - // first malloc 4KB, subsequent reallocation will expand the size as twice of the original size - int32_t totalMallocLen = 4 * 1024; + // first malloc 80KB, subsequent reallocation will expand the size as twice of the original size + int32_t totalMallocLen = sizeof(STableMetaMsg) + sizeof(SSchema) * (TSDB_MAX_TAGS + TSDB_MAX_COLUMNS + 16); SMultiTableMeta *pMultiMeta = rpcMallocCont(totalMallocLen); if (pMultiMeta == NULL) { @@ -2835,13 +2901,17 @@ static int32_t mnodeProcessMultiTableMetaMsg(SMnodeMsg *pMsg) { pMultiMeta->contLen = sizeof(SMultiTableMeta); pMultiMeta->numOfTables = 0; + SArray* pList = taosArrayInit(4, POINTER_BYTES); + for (int32_t t = 0; t < pInfo->numOfTables; ++t) { char *fullName = (char *)(pInfo->tableNames + t * TSDB_TABLE_FNAME_LEN); if (pMsg->pTable == NULL) { pMsg->pTable = mnodeGetTable(fullName); - if (pMsg->pTable == NULL) { // TODO: return error to client? - continue; + if (pMsg->pTable == NULL) { + rpcFreeCont(pMultiMeta); + mError("msg:%p, app:%p table:%s, failed to get table meta, table not exist", pMsg, pMsg->rpcMsg.ahandle, fullName); + return TSDB_CODE_MND_INVALID_TABLE_NAME; } } @@ -2850,8 +2920,9 @@ static int32_t mnodeProcessMultiTableMetaMsg(SMnodeMsg *pMsg) { } if (pMsg->pDb == NULL || pMsg->pDb->status != TSDB_DB_STATUS_READY) { - mnodeDecTableRef(pMsg->pTable); // TODO: return error to client? - continue; + rpcFreeCont(pMultiMeta); + mnodeDecTableRef(pMsg->pTable); + return TSDB_CODE_APP_NOT_READY; } int availLen = totalMallocLen - pMultiMeta->contLen; @@ -2864,13 +2935,14 @@ static int32_t mnodeProcessMultiTableMetaMsg(SMnodeMsg *pMsg) { } } - STableMetaMsg *pMeta = (STableMetaMsg *)(pMultiMeta->meta + pMultiMeta->contLen); + STableMetaMsg *pMeta = (STableMetaMsg *)((char*) pMultiMeta + pMultiMeta->contLen); int32_t code = 0; if (pMsg->pTable->type != TSDB_SUPER_TABLE) { code = mnodeDoGetChildTableMeta(pMsg, pMeta); } else { code = mnodeDoGetSuperTableMeta(pMsg, pMeta); + taosArrayPush(pList, fullName); // keep the super table full name } if (code == TSDB_CODE_SUCCESS) { @@ -2881,6 +2953,23 @@ static int32_t mnodeProcessMultiTableMetaMsg(SMnodeMsg *pMsg) { mnodeDecTableRef(pMsg->pTable); } + char* msg = (char*) pMultiMeta + pMultiMeta->contLen; + + for(int32_t i = 0; i < taosArrayGetSize(pList); ++i) { + char* name = taosArrayGet(pList, i); + SSTableObj *pTable = mnodeGetSuperTable(name); + if (pTable == NULL) { + mError("msg:%p, app:%p stable:%s, not exist while get stable vgroup info", pMsg, pMsg->rpcMsg.ahandle, stableName); + mnodeDecTableRef(pTable); + continue; + } + + msg = serializeVgroupInfo(pTable, msg, pMsg, pMsg->rpcMsg.ahandle); + } + + pMultiMeta->contLen = (msg - (char*) pMultiMeta); + + pMultiMeta->numOfTables = htonl(pMultiMeta->numOfTables); pMsg->rpcRsp.rsp = pMultiMeta; pMsg->rpcRsp.len = pMultiMeta->contLen;