提交 aa55054e 编写于 作者: H Haojun Liao

[td-1747]

上级 2cac39d6
......@@ -231,10 +231,11 @@ int tscGetTableMeta(SSqlObj* pSql, STableMetaInfo* pTableMetaInfo);
int tscGetMeterMetaEx(SSqlObj* pSql, STableMetaInfo* pTableMetaInfo, bool createIfNotExists);
void tscResetForNextRetrieve(SSqlRes* pRes);
void tscAddTimestampColumn(SQueryInfo* pQueryInfo, int16_t functionId, int16_t tableIndex);
void tscDoQuery(SSqlObj* pSql);
SVgroupsInfo* tscVgroupInfoClone(SVgroupsInfo *pInfo);
void* tscVgroupInfoClear(SVgroupsInfo *pInfo);
void tscSCMVgroupInfoCopy(SCMVgroupInfo* dst, const SCMVgroupInfo* src);
/**
* The create object function must be successful expect for the out of memory issue.
*
......
......@@ -443,6 +443,7 @@ void tscPartiallyFreeSqlObj(SSqlObj *pSql);
*/
void tscFreeSqlObj(SSqlObj *pSql);
void tscFreeRegisteredSqlObj(void *pSql);
void tscFreeTableMetaHelper(void *pTableMeta);
void tscCloseTscObj(STscObj *pObj);
......
......@@ -124,9 +124,11 @@ static void tscUpdateVgroupInfo(SSqlObj *pObj, SRpcEpSet *pEpSet) {
pVgroupInfo->inUse = pEpSet->inUse;
pVgroupInfo->numOfEps = pEpSet->numOfEps;
for (int32_t i = 0; i < pVgroupInfo->numOfEps; i++) {
tstrncpy(pVgroupInfo->epAddr[i].fqdn, pEpSet->fqdn[i], TSDB_FQDN_LEN);
taosTFree(pVgroupInfo->epAddr[i].fqdn);
pVgroupInfo->epAddr[i].fqdn = strndup(pEpSet->fqdn[i], tListLen(pEpSet->fqdn[i]));
pVgroupInfo->epAddr[i].port = pEpSet->port[i];
}
tscDebug("after: EndPoint in use: %d", pVgroupInfo->inUse);
taosCorEndWrite(&pVgroupInfo->version);
}
......@@ -1852,15 +1854,15 @@ int tscProcessSTableVgroupRsp(SSqlObj *pSql) {
//just init, no need to lock
SCMVgroupInfo *pVgroups = &pInfo->vgroupList->vgroups[j];
SCMVgroupMsg *vgroupMsg = &pVgroupMsg->vgroups[j];
pVgroups->vgId = htonl(vgroupMsg->vgId);
pVgroups->numOfEps = vgroupMsg->numOfEps;
SCMVgroupMsg *vmsg = &pVgroupMsg->vgroups[j];
pVgroups->vgId = htonl(vmsg->vgId);
pVgroups->numOfEps = vmsg->numOfEps;
assert(pVgroups->numOfEps >= 1 && pVgroups->vgId >= 1);
for (int32_t k = 0; k < pVgroups->numOfEps; ++k) {
pVgroups->epAddr[k].port = htons(vgroupMsg->epAddr[k].port);
pVgroups->epAddr[k].fqdn = strndup(vgroupMsg->epAddr[k].fqdn, tListLen(vgroupMsg->epAddr[k].fqdn));
pVgroups->epAddr[k].port = htons(vmsg->epAddr[k].port);
pVgroups->epAddr[k].fqdn = strndup(vmsg->epAddr[k].fqdn, tListLen(vmsg->epAddr[k].fqdn));
}
}
......
......@@ -168,7 +168,7 @@ static void tscProcessStreamQueryCallback(void *param, TAOS_RES *tres, int numOf
STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pStream->pSql->cmd, 0, 0);
taosCacheRelease(tscMetaCache, (void**)&(pTableMetaInfo->pTableMeta), true);
taosTFree(pTableMetaInfo->vgroupList);
pTableMetaInfo->vgroupList = tscVgroupInfoClear(pTableMetaInfo->vgroupList);
tscSetRetryTimer(pStream, pStream->pSql, retryDelay);
return;
......@@ -275,7 +275,7 @@ static void tscProcessStreamRetrieveResult(void *param, TAOS_RES *res, int numOf
tscFreeSqlResult(pSql);
taosTFree(pSql->pSubs);
pSql->subState.numOfSub = 0;
taosTFree(pTableMetaInfo->vgroupList);
pTableMetaInfo->vgroupList = tscVgroupInfoClear(pTableMetaInfo->vgroupList);
tscSetNextLaunchTimer(pStream, pSql);
}
}
......
......@@ -449,7 +449,7 @@ void tscBuildVgroupTableInfo(SSqlObj* pSql, STableMetaInfo* pTableMetaInfo, SArr
SVgroupTableInfo info = {{0}};
for (int32_t m = 0; m < pvg->numOfVgroups; ++m) {
if (tt->vgId == pvg->vgroups[m].vgId) {
info.vgInfo = pvg->vgroups[m];
tscSCMVgroupInfoCopy(&info.vgInfo, &pvg->vgroups[m]);
break;
}
}
......@@ -1645,9 +1645,9 @@ static void tscAllDataRetrievedFromDnode(SRetrieveSupport *trsupport, SSqlObj* p
// data in from current vnode is stored in cache and disk
uint32_t numOfRowsFromSubquery = (uint32_t)(trsupport->pExtMemBuffer[idx]->numOfTotalElems + trsupport->localBuffer->num);
SVgroupsInfo* vgroupsInfo = pTableMetaInfo->vgroupList;
tscDebug("%p sub:%p all data retrieved from ep:%s, vgId:%d, numOfRows:%d, orderOfSub:%d", pParentSql, pSql,
pTableMetaInfo->vgroupList->vgroups[0].epAddr[0].fqdn, pTableMetaInfo->vgroupList->vgroups[0].vgId,
numOfRowsFromSubquery, idx);
vgroupsInfo->vgroups[0].epAddr[0].fqdn, vgroupsInfo->vgroups[0].vgId, numOfRowsFromSubquery, idx);
tColModelCompact(pDesc->pColumnModel, trsupport->localBuffer, pDesc->pColumnModel->capacity);
......
......@@ -77,6 +77,7 @@ int32_t tscInitRpc(const char *user, const char *secretEncrypt, void **pDnodeCon
return 0;
}
void taos_init_imp(void) {
char temp[128];
......@@ -124,8 +125,9 @@ void taos_init_imp(void) {
double factor = (tscEmbedded == 0)? 2.0:4.0;
tscNumOfThreads = (int)(tsNumOfCores * tsNumOfThreadsPerCore / factor);
if (tscNumOfThreads < 2) tscNumOfThreads = 2;
if (tscNumOfThreads < 2) {
tscNumOfThreads = 2;
}
tscQhandle = taosInitScheduler(queueSize, tscNumOfThreads, "tsc");
if (NULL == tscQhandle) {
......@@ -140,7 +142,7 @@ void taos_init_imp(void) {
int64_t refreshTime = 10; // 10 seconds by default
if (tscMetaCache == NULL) {
tscMetaCache = taosCacheInit(TSDB_DATA_TYPE_BINARY, refreshTime, false, NULL, "tableMeta");
tscMetaCache = taosCacheInit(TSDB_DATA_TYPE_BINARY, refreshTime, false, tscFreeTableMetaHelper, "tableMeta");
tscObjCache = taosCacheInit(TSDB_CACHE_PTR_KEY, refreshTime / 2, false, tscFreeRegisteredSqlObj, "sqlObj");
}
......
......@@ -115,7 +115,7 @@ bool tscIsTwoStageSTableQuery(SQueryInfo* pQueryInfo, int32_t tableIndex) {
// for select query super table, the super table vgroup list can not be null in any cases.
if (pQueryInfo->command == TSDB_SQL_SELECT && UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) {
assert(pTableMetaInfo->vgroupList != NULL);
// assert(pTableMetaInfo->vgroupList != NULL); // if retrieve vgroupInfo failed, the value may be null
}
if ((pQueryInfo->type & TSDB_QUERY_TYPE_FREE_RESOURCE) == TSDB_QUERY_TYPE_FREE_RESOURCE) {
......@@ -408,6 +408,24 @@ void tscFreeRegisteredSqlObj(void *pSql) {
}
}
void tscFreeTableMetaHelper(void *pTableMeta) {
STableMeta* p = (STableMeta*) pTableMeta;
int32_t numOfEps = p->vgroupInfo.numOfEps;
assert(numOfEps >= 0 && numOfEps <= TSDB_MAX_REPLICA);
for(int32_t i = 0; i < numOfEps; ++i) {
taosTFree(p->vgroupInfo.epAddr[i].fqdn);
}
int32_t numOfEps1 = p->corVgroupInfo.numOfEps;
assert(numOfEps1 >= 0 && numOfEps1 <= TSDB_MAX_REPLICA);
for(int32_t i = 0; i < numOfEps1; ++i) {
taosTFree(p->corVgroupInfo.epAddr[i].fqdn);
}
}
void tscFreeSqlObj(SSqlObj* pSql) {
if (pSql == NULL || pSql->signature != pSql) {
return;
......@@ -1682,8 +1700,14 @@ void tscClearSubqueryInfo(SSqlCmd* pCmd) {
void tscFreeVgroupTableInfo(SArray* pVgroupTables) {
if (pVgroupTables != NULL) {
for (size_t i = 0; i < taosArrayGetSize(pVgroupTables); i++) {
size_t num = taosArrayGetSize(pVgroupTables);
for (size_t i = 0; i < num; i++) {
SVgroupTableInfo* pInfo = taosArrayGet(pVgroupTables, i);
for(int32_t j = 0; j < pInfo->vgInfo.numOfEps; ++j) {
taosTFree(pInfo->vgInfo.epAddr[j].fqdn);
}
taosArrayDestroy(pInfo->itemList);
}
taosArrayDestroy(pVgroupTables);
......@@ -1695,6 +1719,7 @@ void clearAllTableMetaInfo(SQueryInfo* pQueryInfo, const char* address, bool rem
for(int32_t i = 0; i < pQueryInfo->numOfTables; ++i) {
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, i);
tscFreeVgroupTableInfo(pTableMetaInfo->pVgroupTables);
tscClearTableMetaInfo(pTableMetaInfo, removeFromCache);
free(pTableMetaInfo);
......@@ -1727,13 +1752,7 @@ STableMetaInfo* tscAddTableMetaInfo(SQueryInfo* pQueryInfo, const char* name, ST
pTableMetaInfo->pTableMeta = pTableMeta;
if (vgroupList != NULL) {
size_t size = sizeof(SVgroupsInfo) + sizeof(SCMVgroupInfo) * vgroupList->numOfVgroups;
pTableMetaInfo->vgroupList = malloc(size);
if (pTableMetaInfo->vgroupList == NULL) {
return NULL;
}
memcpy(pTableMetaInfo->vgroupList, vgroupList, size);
pTableMetaInfo->vgroupList = tscVgroupInfoClone(vgroupList);
}
pTableMetaInfo->tagColList = taosArrayInit(4, POINTER_BYTES);
......@@ -1762,8 +1781,7 @@ void tscClearTableMetaInfo(STableMetaInfo* pTableMetaInfo, bool removeFromCache)
taosCacheRelease(tscMetaCache, (void**)&(pTableMetaInfo->pTableMeta), removeFromCache);
}
taosTFree(pTableMetaInfo->vgroupList);
pTableMetaInfo->vgroupList = tscVgroupInfoClear(pTableMetaInfo->vgroupList);
tscColumnListDestroy(pTableMetaInfo->tagColList);
pTableMetaInfo->tagColList = NULL;
}
......@@ -2403,3 +2421,58 @@ void tscClearSqlOwner(SSqlObj* pSql) {
assert(taosCheckPthreadValid(pSql->owner));
atomic_store_64(&pSql->owner, 0);
}
SVgroupsInfo* tscVgroupInfoClone(SVgroupsInfo *vgroupList) {
if (vgroupList == NULL) {
return NULL;
}
size_t size = sizeof(SVgroupsInfo) + sizeof(SCMVgroupInfo) * vgroupList->numOfVgroups;
SVgroupsInfo* pNew = calloc(1, size);
if (pNew == NULL) {
return NULL;
}
pNew->numOfVgroups = vgroupList->numOfVgroups;
for(int32_t i = 0; i < vgroupList->numOfVgroups; ++i) {
SCMVgroupInfo* pNewVInfo = &pNew->vgroups[i];
SCMVgroupInfo* pvInfo = &vgroupList->vgroups[i];
pNewVInfo->vgId = pvInfo->vgId;
pNewVInfo->numOfEps = pvInfo->numOfEps;
for(int32_t j = 0; j < pvInfo->numOfEps; ++j) {
pNewVInfo->epAddr[j].fqdn = strdup(pvInfo->epAddr[j].fqdn);
pNewVInfo->epAddr[j].port = pvInfo->epAddr[j].port;
}
}
return pNew;
}
void* tscVgroupInfoClear(SVgroupsInfo *vgroupList) {
if (vgroupList == NULL) {
return NULL;
}
for(int32_t i = 0; i < vgroupList->numOfVgroups; ++i) {
SCMVgroupInfo* pVgroupInfo = &vgroupList->vgroups[i];
for(int32_t j = 0; j < pVgroupInfo->numOfEps; ++j) {
taosTFree(pVgroupInfo->epAddr[j].fqdn);
}
}
taosTFree(vgroupList);
return NULL;
}
void tscSCMVgroupInfoCopy(SCMVgroupInfo* dst, const SCMVgroupInfo* src) {
dst->vgId = src->vgId;
dst->numOfEps = src->numOfEps;
for(int32_t i = 0; i < dst->numOfEps; ++i) {
dst->epAddr[i].port = src->epAddr[i].port;
dst->epAddr[i].fqdn = strdup(src->epAddr[i].fqdn);
}
}
......@@ -1472,7 +1472,7 @@ static int32_t mnodeProcessSuperTableVgroupMsg(SMnodeMsg *pMsg) {
int32_t numOfTable = htonl(pInfo->numOfTables);
// reserve space
int32_t contLen = sizeof(SCMSTableVgroupRspMsg) + 32 * sizeof(SCMVgroupInfo) + sizeof(SVgroupsMsg);
int32_t contLen = sizeof(SCMSTableVgroupRspMsg) + 32 * sizeof(SCMVgroupMsg) + sizeof(SVgroupsMsg);
for (int32_t i = 0; i < numOfTable; ++i) {
char *stableName = (char*)pInfo + sizeof(SCMSTableVgroupMsg) + (TSDB_TABLE_FNAME_LEN) * i;
SSuperTableObj *pTable = mnodeGetSuperTable(stableName);
......@@ -1521,6 +1521,8 @@ static int32_t mnodeProcessSuperTableVgroupMsg(SMnodeMsg *pMsg) {
if (pVgroup == NULL) continue;
pVgroupMsg->vgroups[vgSize].vgId = htonl(pVgroup->vgId);
pVgroupMsg->vgroups[vgSize].numOfEps = 0;
for (int32_t vn = 0; vn < pVgroup->numOfVnodes; ++vn) {
SDnodeObj *pDnode = pVgroup->vnodeGid[vn].pDnode;
if (pDnode == NULL) break;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册