提交 dcc175ed 编写于 作者: H hjxilinx

prepare all subqueries object before launch subqueries. and add some check for...

prepare all subqueries object before launch subqueries. and add some check for global status object  during super table query.
上级 d1c31282
......@@ -870,28 +870,26 @@ int tscProcessSql(SSqlObj *pSql) {
return doProcessSql(pSql);
}
static void doCleanupSubqueries(SSqlObj *pSql, int32_t vnodeIndex, int32_t numOfVnodes, SRetrieveSupport *pTrs,
tOrderDescriptor *pDesc, tColModel *pModel, tExtMemBuffer **pMemoryBuf,
SSubqueryState *pState) {
pSql->cmd.command = TSDB_SQL_RETRIEVE_METRIC;
pSql->res.code = TSDB_CODE_CLI_OUT_OF_MEMORY;
/*
* if i > 0, at least one sub query is issued, the allocated resource is
* freed by it when subquery completed.
*/
if (vnodeIndex == 0) {
tscLocalReducerEnvDestroy(pMemoryBuf, pDesc, pModel, numOfVnodes);
tfree(pState);
if (pTrs != NULL) {
tfree(pTrs->localBuffer);
pthread_mutex_unlock(&pTrs->queryMutex);
pthread_mutex_destroy(&pTrs->queryMutex);
tfree(pTrs);
}
static void doCleanupSubqueries(SSqlObj *pSql, int32_t numOfSubs, SSubqueryState* pState) {
assert(numOfSubs <= pSql->numOfSubs && numOfSubs >= 0 && pState != NULL);
for(int32_t i = 0; i < numOfSubs; ++i) {
SSqlObj* pSub = pSql->pSubs[i];
assert(pSub != NULL);
SRetrieveSupport* pSupport = pSub->param;
tfree(pSupport->localBuffer);
pthread_mutex_unlock(&pSupport->queryMutex);
pthread_mutex_destroy(&pSupport->queryMutex);
tfree(pSupport);
tscFreeSqlObj(pSub);
}
free(pState);
}
int tscLaunchMetricSubQueries(SSqlObj *pSql) {
......@@ -911,8 +909,9 @@ int tscLaunchMetricSubQueries(SSqlObj *pSql) {
const uint32_t nBufferSize = (1 << 16); // 64KB
SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(&pSql->cmd, 0);
int32_t numOfVnodes = pMeterMetaInfo->pMetricMeta->numOfVnodes;
assert(numOfVnodes > 0);
int32_t numOfSubQueries = pMeterMetaInfo->pMetricMeta->numOfVnodes;
assert(numOfSubQueries > 0);
int32_t ret = tscLocalReducerEnvCreate(pSql, &pMemoryBuf, &pDesc, &pModel, nBufferSize);
if (ret != 0) {
......@@ -923,36 +922,33 @@ int tscLaunchMetricSubQueries(SSqlObj *pSql) {
return pRes->code;
}
pSql->pSubs = malloc(POINTER_BYTES * numOfVnodes);
pSql->numOfSubs = numOfVnodes;
pSql->pSubs = calloc(numOfSubQueries, POINTER_BYTES);
pSql->numOfSubs = numOfSubQueries;
tscTrace("%p retrieved query data from %d vnode(s)", pSql, numOfVnodes);
tscTrace("%p retrieved query data from %d vnode(s)", pSql, numOfSubQueries);
SSubqueryState *pState = calloc(1, sizeof(SSubqueryState));
pState->numOfTotal = numOfVnodes;
pState->numOfTotal = numOfSubQueries;
pRes->code = TSDB_CODE_SUCCESS;
for (int32_t i = 0; i < numOfVnodes; ++i) {
if (pRes->code == TSDB_CODE_QUERY_CANCELLED || pRes->code == TSDB_CODE_CLI_OUT_OF_MEMORY) {
/*
* during launch sub queries, if the master query is cancelled. the remain is ignored and set the retrieveDoneRec
* to the value of remaining not built sub-queries. So, the already issued sub queries can successfully free
* allocated resources.
*/
pState->numOfCompleted = (numOfVnodes - i);
doCleanupSubqueries(pSql, i, numOfVnodes, NULL, pDesc, pModel, pMemoryBuf, pState);
if (i == 0) {
return pSql->res.code;
}
int32_t i = 0;
for (; i < numOfSubQueries; ++i) {
SRetrieveSupport *trs = (SRetrieveSupport *)calloc(1, sizeof(SRetrieveSupport));
if (trs == NULL) {
tscError("%p failed to malloc buffer for SRetrieveSupport, orderOfSub:%d, reason:%s", pSql, i, strerror(errno));
break;
}
SRetrieveSupport *trs = (SRetrieveSupport *)calloc(1, sizeof(SRetrieveSupport));
trs->pExtMemBuffer = pMemoryBuf;
trs->pOrderDescriptor = pDesc;
trs->pState = pState;
trs->localBuffer = (tFilePage *)calloc(1, nBufferSize + sizeof(tFilePage));
if (trs->localBuffer == NULL) {
tscError("%p failed to malloc buffer for local buffer, orderOfSub:%d, reason:%s", pSql, i, strerror(errno));
tfree(trs);
break;
}
trs->subqueryIndex = i;
trs->pParentSqlObj = pSql;
trs->pFinalColModel = pModel;
......@@ -963,15 +959,10 @@ int tscLaunchMetricSubQueries(SSqlObj *pSql) {
pthread_mutexattr_destroy(&mutexattr);
SSqlObj *pNew = tscCreateSqlObjForSubquery(pSql, trs, NULL);
if (pNew == NULL) {
pState->numOfCompleted = (numOfVnodes - i);
doCleanupSubqueries(pSql, i, numOfVnodes, trs, pDesc, pModel, pMemoryBuf, pState);
if (i == 0) {
return pSql->res.code;
}
tscError("%p failed to malloc buffer for subObj, orderOfSub:%d, reason:%s", pSql, i, strerror(errno));
tfree(trs->localBuffer);
tfree(trs);
break;
}
......@@ -979,9 +970,31 @@ int tscLaunchMetricSubQueries(SSqlObj *pSql) {
if (pSql->cmd.tsBuf) {
pNew->cmd.tsBuf = tsBufClone(pSql->cmd.tsBuf);
}
tscTrace("%p sub:%p launch subquery.orderOfSub:%d", pSql, pNew, trs->subqueryIndex);
tscProcessSql(pNew);
tscTrace("%p sub:%p create subquery success. orderOfSub:%d", pSql, pNew, trs->subqueryIndex);
}
if (i < numOfSubQueries) {
tscError("%p failed to prepare subquery structure and launch subqueries", pSql);
pRes->code = TSDB_CODE_CLI_OUT_OF_MEMORY;
tscLocalReducerEnvDestroy(pMemoryBuf, pDesc, pModel, numOfSubQueries);
doCleanupSubqueries(pSql, i, pState);
return pRes->code; // free all allocated resource
}
if (pRes->code == TSDB_CODE_QUERY_CANCELLED) {
tscLocalReducerEnvDestroy(pMemoryBuf, pDesc, pModel, numOfSubQueries);
doCleanupSubqueries(pSql, i, pState);
return pRes->code;
}
for(int32_t j = 0; j < numOfSubQueries; ++j) {
SSqlObj* pSub = pSql->pSubs[j];
SRetrieveSupport* pSupport = pSub->param;
tscTrace("%p sub:%p launch subquery, orderOfSub:%d.", pSql, pSub, pSupport->subqueryIndex);
tscProcessSql(pSub);
}
return TSDB_CODE_SUCCESS;
......@@ -1032,10 +1045,13 @@ static void tscHandleSubRetrievalError(SRetrieveSupport *trsupport, SSqlObj *pSq
int32_t subqueryIndex = trsupport->subqueryIndex;
assert(pSql != NULL);
SSubqueryState* pState = trsupport->pState;
assert(pState->numOfCompleted < pState->numOfTotal && pState->numOfCompleted >= 0 &&
pPObj->numOfSubs == pState->numOfTotal);
/* retrieved in subquery failed. OR query cancelled in retrieve phase. */
if (trsupport->pState->code == TSDB_CODE_SUCCESS && pPObj->res.code != TSDB_CODE_SUCCESS) {
trsupport->pState->code = -(int)pPObj->res.code;
if (pState->code == TSDB_CODE_SUCCESS && pPObj->res.code != TSDB_CODE_SUCCESS) {
pState->code = -(int)pPObj->res.code;
/*
* kill current sub-query connection, which may retrieve data from vnodes;
......@@ -1044,15 +1060,15 @@ static void tscHandleSubRetrievalError(SRetrieveSupport *trsupport, SSqlObj *pSq
pSql->res.numOfRows = 0;
trsupport->numOfRetry = MAX_NUM_OF_SUBQUERY_RETRY; // disable retry efforts
tscTrace("%p query is cancelled, sub:%p, orderOfSub:%d abort retrieve, code:%d", trsupport->pParentSqlObj, pSql,
subqueryIndex, trsupport->pState->code);
subqueryIndex, pState->code);
}
if (numOfRows >= 0) { // current query is successful, but other sub query failed, still abort current query.
tscTrace("%p sub:%p retrieve numOfRows:%d,orderOfSub:%d", pPObj, pSql, numOfRows, subqueryIndex);
tscError("%p sub:%p abort further retrieval due to other queries failure,orderOfSub:%d,code:%d", pPObj, pSql,
subqueryIndex, trsupport->pState->code);
subqueryIndex, pState->code);
} else {
if (trsupport->numOfRetry++ < MAX_NUM_OF_SUBQUERY_RETRY && trsupport->pState->code == TSDB_CODE_SUCCESS) {
if (trsupport->numOfRetry++ < MAX_NUM_OF_SUBQUERY_RETRY && pState->code == TSDB_CODE_SUCCESS) {
/*
* current query failed, and the retry count is less than the available
* count, retry query clear previous retrieved data, then launch a new sub query
......@@ -1071,7 +1087,7 @@ static void tscHandleSubRetrievalError(SRetrieveSupport *trsupport, SSqlObj *pSq
tscError("%p sub:%p failed to create new subquery sqlobj due to out of memory, abort retry",
trsupport->pParentSqlObj, pSql);
trsupport->pState->code = TSDB_CODE_CLI_OUT_OF_MEMORY;
pState->code = TSDB_CODE_CLI_OUT_OF_MEMORY;
trsupport->numOfRetry = MAX_NUM_OF_SUBQUERY_RETRY;
return;
}
......@@ -1079,24 +1095,26 @@ static void tscHandleSubRetrievalError(SRetrieveSupport *trsupport, SSqlObj *pSq
tscProcessSql(pNew);
return;
} else { // reach the maximum retry count, abort
atomic_val_compare_exchange_32(&trsupport->pState->code, TSDB_CODE_SUCCESS, numOfRows);
atomic_val_compare_exchange_32(&pState->code, TSDB_CODE_SUCCESS, numOfRows);
tscError("%p sub:%p retrieve failed,code:%d,orderOfSub:%d failed.no more retry,set global code:%d", pPObj, pSql,
numOfRows, subqueryIndex, trsupport->pState->code);
numOfRows, subqueryIndex, pState->code);
}
}
if (atomic_add_fetch_32(&trsupport->pState->numOfCompleted, 1) < trsupport->pState->numOfTotal) {
int32_t finished = atomic_add_fetch_32(&pState->numOfCompleted, 1);
if (finished < pState->numOfTotal) {
tscTrace("%p sub:%p orderOfSub:%d freed, finished subqueries:%d", pPObj, pSql, trsupport->subqueryIndex, finished);
return tscFreeSubSqlObj(trsupport, pSql);
}
// all subqueries are failed
tscError("%p retrieve from %d vnode(s) completed,code:%d.FAILED.", pPObj, trsupport->pState->numOfTotal,
trsupport->pState->code);
pPObj->res.code = -(trsupport->pState->code);
tscError("%p retrieve from %d vnode(s) completed,code:%d.FAILED.", pPObj, pState->numOfTotal,
pState->code);
pPObj->res.code = -(pState->code);
// release allocated resource
tscLocalReducerEnvDestroy(trsupport->pExtMemBuffer, trsupport->pOrderDescriptor, trsupport->pFinalColModel,
trsupport->pState->numOfTotal);
pState->numOfTotal);
tfree(trsupport->pState);
tscFreeSubSqlObj(trsupport, pSql);
......@@ -1134,10 +1152,14 @@ void tscRetrieveFromVnodeCallBack(void *param, TAOS_RES *tres, int numOfRows) {
return;
}
SSubqueryState* pState = trsupport->pState;
assert(pState->numOfCompleted < pState->numOfTotal && pState->numOfCompleted >= 0 &&
pPObj->numOfSubs == pState->numOfTotal);
// query process and cancel query process may execute at the same time
pthread_mutex_lock(&trsupport->queryMutex);
if (numOfRows < 0 || trsupport->pState->code < 0 || pPObj->res.code != TSDB_CODE_SUCCESS) {
if (numOfRows < 0 || pState->code < 0 || pPObj->res.code != TSDB_CODE_SUCCESS) {
return tscHandleSubRetrievalError(trsupport, pSql, numOfRows);
}
......@@ -1150,10 +1172,10 @@ void tscRetrieveFromVnodeCallBack(void *param, TAOS_RES *tres, int numOfRows) {
if (numOfRows > 0) {
assert(pRes->numOfRows == numOfRows);
atomic_add_fetch_64(&trsupport->pState->numOfRetrievedRows, numOfRows);
atomic_add_fetch_64(&pState->numOfRetrievedRows, numOfRows);
tscTrace("%p sub:%p retrieve numOfRows:%d totalNumOfRows:%d from ip:%u,vid:%d,orderOfSub:%d", pPObj, pSql,
pRes->numOfRows, trsupport->pState->numOfRetrievedRows, pSvd->ip, pSvd->vnode, idx);
pRes->numOfRows, pState->numOfRetrievedRows, pSvd->ip, pSvd->vnode, idx);
#ifdef _DEBUG_VIEW
printf("received data from vnode: %d rows\n", pRes->numOfRows);
......@@ -1208,8 +1230,10 @@ void tscRetrieveFromVnodeCallBack(void *param, TAOS_RES *tres, int numOfRows) {
/* set no disk space error info, and abort retry */
return tscAbortFurtherRetryRetrieval(trsupport, tres, TSDB_CODE_CLI_NO_DISKSPACE);
}
if (atomic_add_fetch_32(&trsupport->pState->numOfCompleted, 1) < trsupport->pState->numOfTotal) {
int32_t finished = atomic_add_fetch_32(&pState->numOfCompleted, 1);
if (finished < pState->numOfTotal) {
tscTrace("%p sub:%p orderOfSub:%d freed, finished subqueries:%d", pPObj, pSql, trsupport->subqueryIndex, finished);
return tscFreeSubSqlObj(trsupport, pSql);
}
......@@ -1217,10 +1241,10 @@ void tscRetrieveFromVnodeCallBack(void *param, TAOS_RES *tres, int numOfRows) {
pDesc->pSchema->maxCapacity = trsupport->pExtMemBuffer[idx]->numOfElemsPerPage;
tscTrace("%p retrieve from %d vnodes completed.final NumOfRows:%d,start to build loser tree", pPObj,
trsupport->pState->numOfTotal, trsupport->pState->numOfCompleted);
pState->numOfTotal, pState->numOfCompleted);
tscClearInterpInfo(&pPObj->cmd);
tscCreateLocalReducer(trsupport->pExtMemBuffer, trsupport->pState->numOfTotal, pDesc, trsupport->pFinalColModel,
tscCreateLocalReducer(trsupport->pExtMemBuffer, pState->numOfTotal, pDesc, trsupport->pFinalColModel,
&pPObj->cmd, &pPObj->res);
tscTrace("%p build loser tree completed", pPObj);
......@@ -1229,7 +1253,8 @@ void tscRetrieveFromVnodeCallBack(void *param, TAOS_RES *tres, int numOfRows) {
pPObj->res.row = 0;
// only free once
free(trsupport->pState);
tfree(trsupport->pState);
tscFreeSubSqlObj(trsupport, pSql);
if (pPObj->fp == NULL) {
......@@ -1322,13 +1347,17 @@ void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code) {
pSvd = &vnodeInfo->vpeerDesc[vnodeInfo->index];
}
if (trsupport->pParentSqlObj->res.code != TSDB_CODE_SUCCESS || trsupport->pState->code != TSDB_CODE_SUCCESS) {
SSubqueryState* pState = trsupport->pState;
assert(pState->numOfCompleted < pState->numOfTotal && pState->numOfCompleted >= 0 &&
trsupport->pParentSqlObj->numOfSubs == pState->numOfTotal);
if (trsupport->pParentSqlObj->res.code != TSDB_CODE_SUCCESS || pState->code != TSDB_CODE_SUCCESS) {
// metric query is killed, Note: code must be less than 0
trsupport->numOfRetry = MAX_NUM_OF_SUBQUERY_RETRY;
if (trsupport->pParentSqlObj->res.code != TSDB_CODE_SUCCESS) {
code = -(int)(trsupport->pParentSqlObj->res.code);
} else {
code = trsupport->pState->code;
code = pState->code;
}
tscTrace("%p query cancelled or failed, sub:%p, orderOfSub:%d abort, code:%d", trsupport->pParentSqlObj, pSql,
trsupport->subqueryIndex, code);
......@@ -1344,7 +1373,7 @@ void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code) {
if (code != TSDB_CODE_SUCCESS) {
if (trsupport->numOfRetry++ >= MAX_NUM_OF_SUBQUERY_RETRY) {
tscTrace("%p sub:%p reach the max retry count,set global code:%d", trsupport->pParentSqlObj, pSql, code);
atomic_val_compare_exchange_32(&trsupport->pState->code, 0, code);
atomic_val_compare_exchange_32(&pState->code, 0, code);
} else { // does not reach the maximum retry count, go on
tscTrace("%p sub:%p failed code:%d, retry:%d", trsupport->pParentSqlObj, pSql, code, trsupport->numOfRetry);
......@@ -1353,7 +1382,7 @@ void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code) {
tscError("%p sub:%p failed to create new subquery due to out of memory, abort retry, vid:%d, orderOfSub:%d",
trsupport->pParentSqlObj, pSql, pSvd->vnode, trsupport->subqueryIndex);
trsupport->pState->code = -TSDB_CODE_CLI_OUT_OF_MEMORY;
pState->code = -TSDB_CODE_CLI_OUT_OF_MEMORY;
trsupport->numOfRetry = MAX_NUM_OF_SUBQUERY_RETRY;
} else {
assert(pNew->cmd.pMeterInfo[0]->pMeterMeta != NULL && pNew->cmd.pMeterInfo[0]->pMetricMeta != NULL);
......@@ -1363,17 +1392,17 @@ void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code) {
}
}
if (trsupport->pState->code != TSDB_CODE_SUCCESS) { // failed, abort
if (pState->code != TSDB_CODE_SUCCESS) { // failed, abort
if (vnodeInfo != NULL) {
tscTrace("%p sub:%p query failed,ip:%u,vid:%d,orderOfSub:%d,global code:%d", trsupport->pParentSqlObj, pSql,
vnodeInfo->vpeerDesc[vnodeInfo->index].ip, vnodeInfo->vpeerDesc[vnodeInfo->index].vnode,
trsupport->subqueryIndex, trsupport->pState->code);
trsupport->subqueryIndex, pState->code);
} else {
tscTrace("%p sub:%p query failed,orderOfSub:%d,global code:%d", trsupport->pParentSqlObj, pSql,
trsupport->subqueryIndex, trsupport->pState->code);
trsupport->subqueryIndex, pState->code);
}
tscRetrieveFromVnodeCallBack(param, tres, trsupport->pState->code);
tscRetrieveFromVnodeCallBack(param, tres, pState->code);
} else { // success, proceed to retrieve data from dnode
tscTrace("%p sub:%p query complete,ip:%u,vid:%d,orderOfSub:%d,retrieve data", trsupport->pParentSqlObj, pSql,
vnodeInfo->vpeerDesc[vnodeInfo->index].ip, vnodeInfo->vpeerDesc[vnodeInfo->index].vnode,
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册