提交 09c39cd2 编写于 作者: H Haojun Liao

[td-1373] fix memory leak

上级 c6b30184
......@@ -58,6 +58,8 @@ static int64_t doTSBlockIntersect(SSqlObj* pSql, SJoinSupporter* pSupporter1, SJ
pSubQueryInfo1->tsBuf = output1;
pSubQueryInfo2->tsBuf = output2;
TSKEY st = taosGetTimestampUs();
// no result generated, return directly
if (pSupporter1->pTSBuf == NULL || pSupporter2->pTSBuf == NULL) {
tscDebug("%p at least one ts-comp is empty, 0 for secondary query after ts blocks intersecting", pSql);
......@@ -123,6 +125,9 @@ static int64_t doTSBlockIntersect(SSqlObj* pSql, SJoinSupporter* pSupporter1, SJ
tsBufAppend(output1, elem1.vnode, &elem1.tag, (const char*)&elem1.ts, sizeof(elem1.ts));
tsBufAppend(output2, elem2.vnode, &elem2.tag, (const char*)&elem2.ts, sizeof(elem2.ts));
tVariantDestroy(&elem1.tag);
tVariantDestroy(&elem2.tag);
} else {
pLimit->offset -= 1;
}
......@@ -157,9 +162,10 @@ static int64_t doTSBlockIntersect(SSqlObj* pSql, SJoinSupporter* pSupporter1, SJ
tsBufDestroy(pSupporter1->pTSBuf);
tsBufDestroy(pSupporter2->pTSBuf);
TSKEY et = taosGetTimestampUs();
tscDebug("%p input1:%" PRId64 ", input2:%" PRId64 ", final:%" PRId64 " in %d vnodes for secondary query after ts blocks "
"intersecting, skey:%" PRId64 ", ekey:%" PRId64 ", numOfVnode:%d", pSql, numOfInput1, numOfInput2, output1->numOfTotal,
output1->numOfVnodes, win->skey, win->ekey, tsBufGetNumOfVnodes(output1));
"intersecting, skey:%" PRId64 ", ekey:%" PRId64 ", numOfVnode:%d, elasped time:%"PRId64" us", pSql, numOfInput1, numOfInput2, output1->numOfTotal,
output1->numOfVnodes, win->skey, win->ekey, tsBufGetNumOfVnodes(output1), et - st);
return output1->numOfTotal;
}
......@@ -952,11 +958,21 @@ static void joinRetrieveFinalResCallback(void* param, TAOS_RES* tres, int numOfR
// update the records for each subquery in parent sql object.
for (int32_t i = 0; i < pState->numOfSub; ++i) {
if (pParentSql->pSubs[i] == NULL) {
tscDebug("%p %p sub:%d not retrieve data", pParentSql, NULL, i);
continue;
}
SSqlRes* pRes1 = &pParentSql->pSubs[i]->res;
pRes1->numOfClauseTotal += pRes1->numOfRows;
if (pRes1->row > 0 && pRes1->numOfRows > 0) {
tscDebug("%p sub:%p index:%d numOfRows:%"PRId64" total:%"PRId64 " (not retrieve)", pParentSql, pParentSql->pSubs[i], i,
pRes1->numOfRows, pRes1->numOfTotal);
assert(pRes1->row < pRes1->numOfRows);
} else {
pRes1->numOfClauseTotal += pRes1->numOfRows;
tscDebug("%p sub:%p index:%d numOfRows:%"PRId64" total:%"PRId64, pParentSql, pParentSql->pSubs[i], i,
pRes1->numOfRows, pRes1->numOfTotal);
}
}
// data has retrieved to client, build the join results
......@@ -998,8 +1014,11 @@ void tscFetchDatablockFromSubquery(SSqlObj* pSql) {
if (!tscHasReachLimitation(pQueryInfo, pRes)) {
if (pRes->row >= pRes->numOfRows) {
// no data left in current result buffer
hasData = false;
// The current query is completed for the active vnode, try next vnode if exists
// If it is completed, no need to fetch anymore.
if (!pRes->completed) {
numOfFetch++;
}
......@@ -1016,20 +1035,24 @@ void tscFetchDatablockFromSubquery(SSqlObj* pSql) {
if (hasData) {
tscBuildResFromSubqueries(pSql);
return;
} else if (numOfFetch <= 0) {
}
// If at least one subquery is completed in current vnode, try the next vnode in case of multi-vnode
// super table projection query.
if (numOfFetch <= 0) {
bool tryNextVnode = false;
SSqlObj* pp = pSql->pSubs[0];
SQueryInfo* pi = tscGetQueryInfoDetail(&pp->cmd, 0);
if (tscNonOrderedProjectionQueryOnSTable(pi, 0)) {
// get the number of subquery that need to retrieve the next vnode.
if (tscNonOrderedProjectionQueryOnSTable(pi, 0)) {
for(int32_t i = 0; i < pSql->subState.numOfSub; ++i) {
if (pSql->pSubs[i] != NULL) {
SSqlObj* pSub = pSql->pSubs[i];
if (pSub != NULL && pSub->res.row >= pSub->res.numOfRows && pSub->res.completed) {
pSql->subState.numOfRemain++;
}
}
}
for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) {
......@@ -1040,7 +1063,7 @@ void tscFetchDatablockFromSubquery(SSqlObj* pSql) {
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSub->cmd, 0);
if (tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0)) {
if (tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0) && pSub->res.row >= pSub->res.numOfRows && pSub->res.completed) {
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
assert(pQueryInfo->numOfTables == 1);
......@@ -1068,11 +1091,11 @@ void tscFetchDatablockFromSubquery(SSqlObj* pSql) {
if (tryNextVnode) {
return;
} else {
pSql->res.completed = true;
freeJoinSubqueryObj(pSql);
}
pSql->res.completed = true;
freeJoinSubqueryObj(pSql);
if (pSql->res.code == TSDB_CODE_SUCCESS) {
(*pSql->fp)(pSql->param, pSql, 0);
} else {
......@@ -2130,7 +2153,7 @@ static char* getResultBlockPosition(SSqlCmd* pCmd, SSqlRes* pRes, int32_t column
assert(pInfo->pSqlExpr != NULL);
*bytes = pInfo->pSqlExpr->resBytes;
char* pData = pRes->data + pInfo->pSqlExpr->offset * pRes->numOfRows;
char* pData = pRes->data + pInfo->pSqlExpr->offset * pRes->numOfRows + pRes->row * (*bytes);
return pData;
}
......@@ -2142,11 +2165,13 @@ static void doBuildResFromSubqueries(SSqlObj* pSql) {
int32_t numOfRes = INT32_MAX;
for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) {
if (pSql->pSubs[i] == NULL) {
SSqlObj* pSub = pSql->pSubs[i];
if (pSub == NULL) {
continue;
}
numOfRes = (int32_t)(MIN(numOfRes, pSql->pSubs[i]->res.numOfRows));
int32_t remain = pSub->res.numOfRows - pSub->res.row;
numOfRes = (int32_t)(MIN(numOfRes, remain));
}
if (numOfRes == 0) {
......@@ -2172,14 +2197,23 @@ static void doBuildResFromSubqueries(SSqlObj* pSql) {
size_t numOfExprs = tscSqlExprNumOfExprs(pQueryInfo);
for(int32_t i = 0; i < numOfExprs; ++i) {
SColumnIndex* pIndex = &pRes->pColumnIndex[i];
SSqlRes *pRes1 = &pSql->pSubs[pIndex->tableIndex]->res;
SSqlCmd *pCmd1 = &pSql->pSubs[pIndex->tableIndex]->cmd;
SSqlRes* pRes1 = &pSql->pSubs[pIndex->tableIndex]->res;
SSqlCmd* pCmd1 = &pSql->pSubs[pIndex->tableIndex]->cmd;
char* pData = getResultBlockPosition(pCmd1, pRes1, pIndex->columnIndex, &bytes);
memcpy(data, pData, bytes * numOfRes);
data += bytes * numOfRes;
pRes1->row = numOfRes;
}
for(int32_t i = 0; i < pSql->subState.numOfSub; ++i) {
SSqlObj* pSub = pSql->pSubs[i];
if (pSub == NULL) {
continue;
}
pSub->res.row += numOfRes;
assert(pSub->res.row <= pSub->res.numOfRows);
}
pRes->numOfRows = numOfRes;
......
......@@ -400,11 +400,11 @@ void tsBufAppend(STSBuf* pTSBuf, int32_t vnodeId, tVariant* tag, const char* pDa
if ((tVariantCompare(&pTSBuf->block.tag, tag) != 0) && ptsData->len > 0) {
// new arrived data with different tags value, save current value into disk first
writeDataToDisk(pTSBuf);
tVariantAssign(&pTSBuf->block.tag, tag);
} else {
expandBuffer(ptsData, len);
}
tVariantAssign(&pTSBuf->block.tag, tag);
memcpy(ptsData->rawBuf + ptsData->len, pData, (size_t)len);
// todo check return value
......@@ -662,7 +662,7 @@ bool tsBufNextPos(STSBuf* pTSBuf) {
return false;
}
int32_t blockIndex = pCur->order == TSDB_ORDER_ASC ? 0 : pBlockInfo->numOfBlocks - 1;
int32_t blockIndex = (pCur->order == TSDB_ORDER_ASC) ? 0 : (pBlockInfo->numOfBlocks - 1);
tsBufGetBlock(pTSBuf, pCur->vgroupIndex + step, blockIndex);
break;
......@@ -688,8 +688,7 @@ void tsBufResetPos(STSBuf* pTSBuf) {
}
STSElem tsBufGetElem(STSBuf* pTSBuf) {
STSElem elem1 = {.vnode = -1};
STSElem elem1 = {.vnode = -1};
if (pTSBuf == NULL) {
return elem1;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册