From aa55054e368aee22563a47c143e035108cb46248 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 19 Oct 2020 23:19:44 +0800 Subject: [PATCH] [td-1747] --- src/client/inc/tscUtil.h | 5 +- src/client/inc/tsclient.h | 1 + src/client/src/tscServer.c | 14 +++--- src/client/src/tscStream.c | 6 +-- src/client/src/tscSubquery.c | 8 +-- src/client/src/tscSystem.c | 8 +-- src/client/src/tscUtil.c | 95 +++++++++++++++++++++++++++++++----- src/mnode/src/mnodeTable.c | 4 +- 8 files changed, 111 insertions(+), 30 deletions(-) diff --git a/src/client/inc/tscUtil.h b/src/client/inc/tscUtil.h index 76a9bbac10..1bd86714c5 100644 --- a/src/client/inc/tscUtil.h +++ b/src/client/inc/tscUtil.h @@ -231,10 +231,11 @@ int tscGetTableMeta(SSqlObj* pSql, STableMetaInfo* pTableMetaInfo); int tscGetMeterMetaEx(SSqlObj* pSql, STableMetaInfo* pTableMetaInfo, bool createIfNotExists); void tscResetForNextRetrieve(SSqlRes* pRes); - -void tscAddTimestampColumn(SQueryInfo* pQueryInfo, int16_t functionId, int16_t tableIndex); void tscDoQuery(SSqlObj* pSql); +SVgroupsInfo* tscVgroupInfoClone(SVgroupsInfo *pInfo); +void* tscVgroupInfoClear(SVgroupsInfo *pInfo); +void tscSCMVgroupInfoCopy(SCMVgroupInfo* dst, const SCMVgroupInfo* src); /** * The create object function must be successful expect for the out of memory issue. * diff --git a/src/client/inc/tsclient.h b/src/client/inc/tsclient.h index f0d3e26a49..5d9abf852e 100644 --- a/src/client/inc/tsclient.h +++ b/src/client/inc/tsclient.h @@ -443,6 +443,7 @@ void tscPartiallyFreeSqlObj(SSqlObj *pSql); */ void tscFreeSqlObj(SSqlObj *pSql); void tscFreeRegisteredSqlObj(void *pSql); +void tscFreeTableMetaHelper(void *pTableMeta); void tscCloseTscObj(STscObj *pObj); diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index 8a50299d51..ff798a9f99 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -124,9 +124,11 @@ static void tscUpdateVgroupInfo(SSqlObj *pObj, SRpcEpSet *pEpSet) { pVgroupInfo->inUse = pEpSet->inUse; pVgroupInfo->numOfEps = pEpSet->numOfEps; for (int32_t i = 0; i < pVgroupInfo->numOfEps; i++) { - tstrncpy(pVgroupInfo->epAddr[i].fqdn, pEpSet->fqdn[i], TSDB_FQDN_LEN); + taosTFree(pVgroupInfo->epAddr[i].fqdn); + pVgroupInfo->epAddr[i].fqdn = strndup(pEpSet->fqdn[i], tListLen(pEpSet->fqdn[i])); pVgroupInfo->epAddr[i].port = pEpSet->port[i]; } + tscDebug("after: EndPoint in use: %d", pVgroupInfo->inUse); taosCorEndWrite(&pVgroupInfo->version); } @@ -1852,15 +1854,15 @@ int tscProcessSTableVgroupRsp(SSqlObj *pSql) { //just init, no need to lock SCMVgroupInfo *pVgroups = &pInfo->vgroupList->vgroups[j]; - SCMVgroupMsg *vgroupMsg = &pVgroupMsg->vgroups[j]; - pVgroups->vgId = htonl(vgroupMsg->vgId); - pVgroups->numOfEps = vgroupMsg->numOfEps; + SCMVgroupMsg *vmsg = &pVgroupMsg->vgroups[j]; + pVgroups->vgId = htonl(vmsg->vgId); + pVgroups->numOfEps = vmsg->numOfEps; assert(pVgroups->numOfEps >= 1 && pVgroups->vgId >= 1); for (int32_t k = 0; k < pVgroups->numOfEps; ++k) { - pVgroups->epAddr[k].port = htons(vgroupMsg->epAddr[k].port); - pVgroups->epAddr[k].fqdn = strndup(vgroupMsg->epAddr[k].fqdn, tListLen(vgroupMsg->epAddr[k].fqdn)); + pVgroups->epAddr[k].port = htons(vmsg->epAddr[k].port); + pVgroups->epAddr[k].fqdn = strndup(vmsg->epAddr[k].fqdn, tListLen(vmsg->epAddr[k].fqdn)); } } diff --git a/src/client/src/tscStream.c b/src/client/src/tscStream.c index 61614b56fb..0f67911bbe 100644 --- a/src/client/src/tscStream.c +++ b/src/client/src/tscStream.c @@ -168,8 +168,8 @@ static void tscProcessStreamQueryCallback(void *param, TAOS_RES *tres, int numOf STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pStream->pSql->cmd, 0, 0); taosCacheRelease(tscMetaCache, (void**)&(pTableMetaInfo->pTableMeta), true); - taosTFree(pTableMetaInfo->vgroupList); - + pTableMetaInfo->vgroupList = tscVgroupInfoClear(pTableMetaInfo->vgroupList); + tscSetRetryTimer(pStream, pStream->pSql, retryDelay); return; } @@ -275,7 +275,7 @@ static void tscProcessStreamRetrieveResult(void *param, TAOS_RES *res, int numOf tscFreeSqlResult(pSql); taosTFree(pSql->pSubs); pSql->subState.numOfSub = 0; - taosTFree(pTableMetaInfo->vgroupList); + pTableMetaInfo->vgroupList = tscVgroupInfoClear(pTableMetaInfo->vgroupList); tscSetNextLaunchTimer(pStream, pSql); } } diff --git a/src/client/src/tscSubquery.c b/src/client/src/tscSubquery.c index 49759bc4d3..7a2a242df5 100644 --- a/src/client/src/tscSubquery.c +++ b/src/client/src/tscSubquery.c @@ -449,7 +449,7 @@ void tscBuildVgroupTableInfo(SSqlObj* pSql, STableMetaInfo* pTableMetaInfo, SArr SVgroupTableInfo info = {{0}}; for (int32_t m = 0; m < pvg->numOfVgroups; ++m) { if (tt->vgId == pvg->vgroups[m].vgId) { - info.vgInfo = pvg->vgroups[m]; + tscSCMVgroupInfoCopy(&info.vgInfo, &pvg->vgroups[m]); break; } } @@ -1645,9 +1645,9 @@ static void tscAllDataRetrievedFromDnode(SRetrieveSupport *trsupport, SSqlObj* p // data in from current vnode is stored in cache and disk uint32_t numOfRowsFromSubquery = (uint32_t)(trsupport->pExtMemBuffer[idx]->numOfTotalElems + trsupport->localBuffer->num); - tscDebug("%p sub:%p all data retrieved from ep:%s, vgId:%d, numOfRows:%d, orderOfSub:%d", pParentSql, pSql, - pTableMetaInfo->vgroupList->vgroups[0].epAddr[0].fqdn, pTableMetaInfo->vgroupList->vgroups[0].vgId, - numOfRowsFromSubquery, idx); + SVgroupsInfo* vgroupsInfo = pTableMetaInfo->vgroupList; + tscDebug("%p sub:%p all data retrieved from ep:%s, vgId:%d, numOfRows:%d, orderOfSub:%d", pParentSql, pSql, + vgroupsInfo->vgroups[0].epAddr[0].fqdn, vgroupsInfo->vgroups[0].vgId, numOfRowsFromSubquery, idx); tColModelCompact(pDesc->pColumnModel, trsupport->localBuffer, pDesc->pColumnModel->capacity); diff --git a/src/client/src/tscSystem.c b/src/client/src/tscSystem.c index 4c5dbb079f..47c2d35a75 100644 --- a/src/client/src/tscSystem.c +++ b/src/client/src/tscSystem.c @@ -77,6 +77,7 @@ int32_t tscInitRpc(const char *user, const char *secretEncrypt, void **pDnodeCon return 0; } + void taos_init_imp(void) { char temp[128]; @@ -124,8 +125,9 @@ void taos_init_imp(void) { double factor = (tscEmbedded == 0)? 2.0:4.0; tscNumOfThreads = (int)(tsNumOfCores * tsNumOfThreadsPerCore / factor); - - if (tscNumOfThreads < 2) tscNumOfThreads = 2; + if (tscNumOfThreads < 2) { + tscNumOfThreads = 2; + } tscQhandle = taosInitScheduler(queueSize, tscNumOfThreads, "tsc"); if (NULL == tscQhandle) { @@ -140,7 +142,7 @@ void taos_init_imp(void) { int64_t refreshTime = 10; // 10 seconds by default if (tscMetaCache == NULL) { - tscMetaCache = taosCacheInit(TSDB_DATA_TYPE_BINARY, refreshTime, false, NULL, "tableMeta"); + tscMetaCache = taosCacheInit(TSDB_DATA_TYPE_BINARY, refreshTime, false, tscFreeTableMetaHelper, "tableMeta"); tscObjCache = taosCacheInit(TSDB_CACHE_PTR_KEY, refreshTime / 2, false, tscFreeRegisteredSqlObj, "sqlObj"); } diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index 0235f037bd..9e861d3f28 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -115,7 +115,7 @@ bool tscIsTwoStageSTableQuery(SQueryInfo* pQueryInfo, int32_t tableIndex) { // for select query super table, the super table vgroup list can not be null in any cases. if (pQueryInfo->command == TSDB_SQL_SELECT && UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) { - assert(pTableMetaInfo->vgroupList != NULL); +// assert(pTableMetaInfo->vgroupList != NULL); // if retrieve vgroupInfo failed, the value may be null } if ((pQueryInfo->type & TSDB_QUERY_TYPE_FREE_RESOURCE) == TSDB_QUERY_TYPE_FREE_RESOURCE) { @@ -408,6 +408,24 @@ void tscFreeRegisteredSqlObj(void *pSql) { } } +void tscFreeTableMetaHelper(void *pTableMeta) { + STableMeta* p = (STableMeta*) pTableMeta; + + int32_t numOfEps = p->vgroupInfo.numOfEps; + assert(numOfEps >= 0 && numOfEps <= TSDB_MAX_REPLICA); + + for(int32_t i = 0; i < numOfEps; ++i) { + taosTFree(p->vgroupInfo.epAddr[i].fqdn); + } + + int32_t numOfEps1 = p->corVgroupInfo.numOfEps; + assert(numOfEps1 >= 0 && numOfEps1 <= TSDB_MAX_REPLICA); + + for(int32_t i = 0; i < numOfEps1; ++i) { + taosTFree(p->corVgroupInfo.epAddr[i].fqdn); + } +} + void tscFreeSqlObj(SSqlObj* pSql) { if (pSql == NULL || pSql->signature != pSql) { return; @@ -1682,8 +1700,14 @@ void tscClearSubqueryInfo(SSqlCmd* pCmd) { void tscFreeVgroupTableInfo(SArray* pVgroupTables) { if (pVgroupTables != NULL) { - for (size_t i = 0; i < taosArrayGetSize(pVgroupTables); i++) { + size_t num = taosArrayGetSize(pVgroupTables); + for (size_t i = 0; i < num; i++) { SVgroupTableInfo* pInfo = taosArrayGet(pVgroupTables, i); + + for(int32_t j = 0; j < pInfo->vgInfo.numOfEps; ++j) { + taosTFree(pInfo->vgInfo.epAddr[j].fqdn); + } + taosArrayDestroy(pInfo->itemList); } taosArrayDestroy(pVgroupTables); @@ -1695,6 +1719,7 @@ void clearAllTableMetaInfo(SQueryInfo* pQueryInfo, const char* address, bool rem for(int32_t i = 0; i < pQueryInfo->numOfTables; ++i) { STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, i); + tscFreeVgroupTableInfo(pTableMetaInfo->pVgroupTables); tscClearTableMetaInfo(pTableMetaInfo, removeFromCache); free(pTableMetaInfo); @@ -1727,13 +1752,7 @@ STableMetaInfo* tscAddTableMetaInfo(SQueryInfo* pQueryInfo, const char* name, ST pTableMetaInfo->pTableMeta = pTableMeta; if (vgroupList != NULL) { - size_t size = sizeof(SVgroupsInfo) + sizeof(SCMVgroupInfo) * vgroupList->numOfVgroups; - pTableMetaInfo->vgroupList = malloc(size); - if (pTableMetaInfo->vgroupList == NULL) { - return NULL; - } - - memcpy(pTableMetaInfo->vgroupList, vgroupList, size); + pTableMetaInfo->vgroupList = tscVgroupInfoClone(vgroupList); } pTableMetaInfo->tagColList = taosArrayInit(4, POINTER_BYTES); @@ -1762,8 +1781,7 @@ void tscClearTableMetaInfo(STableMetaInfo* pTableMetaInfo, bool removeFromCache) taosCacheRelease(tscMetaCache, (void**)&(pTableMetaInfo->pTableMeta), removeFromCache); } - taosTFree(pTableMetaInfo->vgroupList); - + pTableMetaInfo->vgroupList = tscVgroupInfoClear(pTableMetaInfo->vgroupList); tscColumnListDestroy(pTableMetaInfo->tagColList); pTableMetaInfo->tagColList = NULL; } @@ -2403,3 +2421,58 @@ void tscClearSqlOwner(SSqlObj* pSql) { assert(taosCheckPthreadValid(pSql->owner)); atomic_store_64(&pSql->owner, 0); } + +SVgroupsInfo* tscVgroupInfoClone(SVgroupsInfo *vgroupList) { + if (vgroupList == NULL) { + return NULL; + } + + size_t size = sizeof(SVgroupsInfo) + sizeof(SCMVgroupInfo) * vgroupList->numOfVgroups; + SVgroupsInfo* pNew = calloc(1, size); + if (pNew == NULL) { + return NULL; + } + + pNew->numOfVgroups = vgroupList->numOfVgroups; + + for(int32_t i = 0; i < vgroupList->numOfVgroups; ++i) { + SCMVgroupInfo* pNewVInfo = &pNew->vgroups[i]; + + SCMVgroupInfo* pvInfo = &vgroupList->vgroups[i]; + pNewVInfo->vgId = pvInfo->vgId; + pNewVInfo->numOfEps = pvInfo->numOfEps; + + for(int32_t j = 0; j < pvInfo->numOfEps; ++j) { + pNewVInfo->epAddr[j].fqdn = strdup(pvInfo->epAddr[j].fqdn); + pNewVInfo->epAddr[j].port = pvInfo->epAddr[j].port; + } + } + + return pNew; +} + +void* tscVgroupInfoClear(SVgroupsInfo *vgroupList) { + if (vgroupList == NULL) { + return NULL; + } + + for(int32_t i = 0; i < vgroupList->numOfVgroups; ++i) { + SCMVgroupInfo* pVgroupInfo = &vgroupList->vgroups[i]; + + for(int32_t j = 0; j < pVgroupInfo->numOfEps; ++j) { + taosTFree(pVgroupInfo->epAddr[j].fqdn); + } + } + + taosTFree(vgroupList); + return NULL; +} + +void tscSCMVgroupInfoCopy(SCMVgroupInfo* dst, const SCMVgroupInfo* src) { + dst->vgId = src->vgId; + dst->numOfEps = src->numOfEps; + for(int32_t i = 0; i < dst->numOfEps; ++i) { + dst->epAddr[i].port = src->epAddr[i].port; + dst->epAddr[i].fqdn = strdup(src->epAddr[i].fqdn); + } +} diff --git a/src/mnode/src/mnodeTable.c b/src/mnode/src/mnodeTable.c index 9965047a99..143153be49 100644 --- a/src/mnode/src/mnodeTable.c +++ b/src/mnode/src/mnodeTable.c @@ -1472,7 +1472,7 @@ static int32_t mnodeProcessSuperTableVgroupMsg(SMnodeMsg *pMsg) { int32_t numOfTable = htonl(pInfo->numOfTables); // reserve space - int32_t contLen = sizeof(SCMSTableVgroupRspMsg) + 32 * sizeof(SCMVgroupInfo) + sizeof(SVgroupsMsg); + int32_t contLen = sizeof(SCMSTableVgroupRspMsg) + 32 * sizeof(SCMVgroupMsg) + sizeof(SVgroupsMsg); for (int32_t i = 0; i < numOfTable; ++i) { char *stableName = (char*)pInfo + sizeof(SCMSTableVgroupMsg) + (TSDB_TABLE_FNAME_LEN) * i; SSuperTableObj *pTable = mnodeGetSuperTable(stableName); @@ -1521,6 +1521,8 @@ static int32_t mnodeProcessSuperTableVgroupMsg(SMnodeMsg *pMsg) { if (pVgroup == NULL) continue; pVgroupMsg->vgroups[vgSize].vgId = htonl(pVgroup->vgId); + pVgroupMsg->vgroups[vgSize].numOfEps = 0; + for (int32_t vn = 0; vn < pVgroup->numOfVnodes; ++vn) { SDnodeObj *pDnode = pVgroup->vnodeGid[vn].pDnode; if (pDnode == NULL) break; -- GitLab