提交 374bcb86 编写于 作者: H Haojun Liao

[td-225] refactor code.

上级 457a7010
...@@ -1415,7 +1415,7 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) { ...@@ -1415,7 +1415,7 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) {
} }
trs->subqueryIndex = i; trs->subqueryIndex = i;
trs->pParentSqlObj = pSql; trs->pParentSql = pSql;
trs->pFinalColModel = pModel; trs->pFinalColModel = pModel;
pthread_mutexattr_t mutexattr; pthread_mutexattr_t mutexattr;
...@@ -1500,7 +1500,7 @@ static void tscAbortFurtherRetryRetrieval(SRetrieveSupport *trsupport, TAOS_RES ...@@ -1500,7 +1500,7 @@ static void tscAbortFurtherRetryRetrieval(SRetrieveSupport *trsupport, TAOS_RES
tscError("sub:%p failed to flush data to disk, reason:%s", tres, tstrerror(code)); tscError("sub:%p failed to flush data to disk, reason:%s", tres, tstrerror(code));
#endif #endif
SSqlObj* pParentSql = trsupport->pParentSqlObj; SSqlObj* pParentSql = trsupport->pParentSql;
pParentSql->res.code = code; pParentSql->res.code = code;
trsupport->numOfRetry = MAX_NUM_OF_SUBQUERY_RETRY; trsupport->numOfRetry = MAX_NUM_OF_SUBQUERY_RETRY;
...@@ -1509,8 +1509,45 @@ static void tscAbortFurtherRetryRetrieval(SRetrieveSupport *trsupport, TAOS_RES ...@@ -1509,8 +1509,45 @@ static void tscAbortFurtherRetryRetrieval(SRetrieveSupport *trsupport, TAOS_RES
tscHandleSubqueryError(trsupport, tres, pParentSql->res.code); tscHandleSubqueryError(trsupport, tres, pParentSql->res.code);
} }
/*
 * Retry a sub query that failed while its retry budget is not yet exhausted:
 * drop everything buffered so far for this sub query, then launch a fresh
 * sub query in its place.
 *
 * Returns the result of tscProcessSql() for the new sub query, or
 * TSDB_CODE_TSC_OUT_OF_MEMORY (also stored in the parent's res.code, with the
 * retry counter forced to MAX_NUM_OF_SUBQUERY_RETRY) when no new sub query
 * object could be created.
 *
 * NOTE(review): queryMutex is released here unconditionally -- confirm that
 * every caller holds it on entry.
 */
static int32_t tscReissueSubquery(SRetrieveSupport *trsupport, SSqlObj *pSql, int32_t code) {
  SSqlObj *pParentSql = trsupport->pParentSql;
  int32_t  subqueryIndex = trsupport->subqueryIndex;

  // vgroup info is only consumed by the error log further down
  STableMetaInfo *pMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0);
  SCMVgroupInfo  *pVgInfo = &pMetaInfo->vgroupList->vgroups[0];

  // throw away the rows already collected by the failed attempt,
  // both in the external buffer and in the local buffer counter
  tExtMemBufferClear(trsupport->pExtMemBuffer[subqueryIndex]);
  trsupport->localBuffer->num = 0;

  pthread_mutex_unlock(&trsupport->queryMutex);

  tscTrace("%p sub:%p retrieve failed, code:%s, orderOfSub:%d, retry:%d", trsupport->pParentSql, pSql,
           tstrerror(code), subqueryIndex, trsupport->numOfRetry);

  SSqlObj *pRetrySql = tscCreateSqlObjForSubquery(trsupport->pParentSql, trsupport, pSql);

  // todo add to async res or not??
  if (pRetrySql != NULL) {
    // the failed sub query is released before its replacement is issued
    taos_free_result(pSql);
    return tscProcessSql(pRetrySql);
  }

  // no replacement available: record the failure on the parent and stop retrying
  tscError("%p sub:%p failed to create new subquery due to out of memory, abort retry, vgId:%d, orderOfSub:%d",
           trsupport->pParentSql, pSql, pVgInfo->vgId, trsupport->subqueryIndex);

  pParentSql->res.code = TSDB_CODE_TSC_OUT_OF_MEMORY;
  trsupport->numOfRetry = MAX_NUM_OF_SUBQUERY_RETRY;
  return pParentSql->res.code;
}
void tscHandleSubqueryError(SRetrieveSupport *trsupport, SSqlObj *pSql, int numOfRows) { void tscHandleSubqueryError(SRetrieveSupport *trsupport, SSqlObj *pSql, int numOfRows) {
SSqlObj *pParentSql = trsupport->pParentSqlObj; SSqlObj *pParentSql = trsupport->pParentSql;
int32_t subqueryIndex = trsupport->subqueryIndex; int32_t subqueryIndex = trsupport->subqueryIndex;
assert(pSql != NULL); assert(pSql != NULL);
...@@ -1536,31 +1573,9 @@ void tscHandleSubqueryError(SRetrieveSupport *trsupport, SSqlObj *pSql, int numO ...@@ -1536,31 +1573,9 @@ void tscHandleSubqueryError(SRetrieveSupport *trsupport, SSqlObj *pSql, int numO
subqueryIndex, pParentSql->res.code); subqueryIndex, pParentSql->res.code);
} else { } else {
if (trsupport->numOfRetry++ < MAX_NUM_OF_SUBQUERY_RETRY && pParentSql->res.code == TSDB_CODE_SUCCESS) { if (trsupport->numOfRetry++ < MAX_NUM_OF_SUBQUERY_RETRY && pParentSql->res.code == TSDB_CODE_SUCCESS) {
/* if (tscReissueSubquery(trsupport, pSql, numOfRows) == TSDB_CODE_SUCCESS) {
* current query failed, and the retry count is less than the available
* count, retry query clear previous retrieved data, then launch a new sub query
*/
tExtMemBufferClear(trsupport->pExtMemBuffer[subqueryIndex]);
// clear local saved number of results
trsupport->localBuffer->num = 0;
pthread_mutex_unlock(&trsupport->queryMutex);
tscDebug("%p sub:%p retrieve failed, code:%s, orderOfSub:%d, retry:%d", trsupport->pParentSqlObj, pSql,
tstrerror(numOfRows), subqueryIndex, trsupport->numOfRetry);
SSqlObj *pNew = tscCreateSqlObjForSubquery(trsupport->pParentSqlObj, trsupport, pSql);
if (pNew == NULL) {
tscError("%p sub:%p failed to create new subquery sqlObj due to out of memory, abort retry",
trsupport->pParentSqlObj, pSql);
pParentSql->res.code = TSDB_CODE_TSC_OUT_OF_MEMORY;
trsupport->numOfRetry = MAX_NUM_OF_SUBQUERY_RETRY;
return; return;
} }
tscProcessSql(pNew);
return;
} else { // reach the maximum retry count, abort } else { // reach the maximum retry count, abort
atomic_val_compare_exchange_32(&pParentSql->res.code, TSDB_CODE_SUCCESS, numOfRows); atomic_val_compare_exchange_32(&pParentSql->res.code, TSDB_CODE_SUCCESS, numOfRows);
tscError("%p sub:%p retrieve failed,code:%s,orderOfSub:%d failed.no more retry,set global code:%s", pParentSql, pSql, tscError("%p sub:%p retrieve failed,code:%s,orderOfSub:%d failed.no more retry,set global code:%s", pParentSql, pSql,
...@@ -1601,7 +1616,7 @@ void tscHandleSubqueryError(SRetrieveSupport *trsupport, SSqlObj *pSql, int numO ...@@ -1601,7 +1616,7 @@ void tscHandleSubqueryError(SRetrieveSupport *trsupport, SSqlObj *pSql, int numO
static void tscAllDataRetrievedFromDnode(SRetrieveSupport *trsupport, SSqlObj* pSql) { static void tscAllDataRetrievedFromDnode(SRetrieveSupport *trsupport, SSqlObj* pSql) {
int32_t idx = trsupport->subqueryIndex; int32_t idx = trsupport->subqueryIndex;
SSqlObj * pPObj = trsupport->pParentSqlObj; SSqlObj * pParentSql = trsupport->pParentSql;
tOrderDescriptor *pDesc = trsupport->pOrderDescriptor; tOrderDescriptor *pDesc = trsupport->pOrderDescriptor;
SSubqueryState* pState = trsupport->pState; SSubqueryState* pState = trsupport->pState;
...@@ -1611,7 +1626,7 @@ static void tscAllDataRetrievedFromDnode(SRetrieveSupport *trsupport, SSqlObj* p ...@@ -1611,7 +1626,7 @@ static void tscAllDataRetrievedFromDnode(SRetrieveSupport *trsupport, SSqlObj* p
// data in from current vnode is stored in cache and disk // data in from current vnode is stored in cache and disk
uint32_t numOfRowsFromSubquery = trsupport->pExtMemBuffer[idx]->numOfTotalElems + trsupport->localBuffer->num; uint32_t numOfRowsFromSubquery = trsupport->pExtMemBuffer[idx]->numOfTotalElems + trsupport->localBuffer->num;
tscDebug("%p sub:%p all data retrieved from ip:%s, vgId:%d, numOfRows:%d, orderOfSub:%d", pPObj, pSql, tscDebug("%p sub:%p all data retrieved from ip:%s, vgId:%d, numOfRows:%d, orderOfSub:%d", pParentSql, pSql,
pTableMetaInfo->vgroupList->vgroups[0].ipAddr[0].fqdn, pTableMetaInfo->vgroupList->vgroups[0].vgId, pTableMetaInfo->vgroupList->vgroups[0].ipAddr[0].fqdn, pTableMetaInfo->vgroupList->vgroups[0].vgId,
numOfRowsFromSubquery, idx); numOfRowsFromSubquery, idx);
...@@ -1625,15 +1640,14 @@ static void tscAllDataRetrievedFromDnode(SRetrieveSupport *trsupport, SSqlObj* p ...@@ -1625,15 +1640,14 @@ static void tscAllDataRetrievedFromDnode(SRetrieveSupport *trsupport, SSqlObj* p
trsupport->localBuffer->num, colInfo); trsupport->localBuffer->num, colInfo);
#endif #endif
if (tsTotalTmpDirGB != 0 && tsAvailTmpDirGB < tsMinimalTmpDirGB) { if (tsTotalTmpDirGB != 0 && tsAvailTmpDirectorySpace < tsReservedTmpDirectorySpace) {
tscError("%p sub:%p client disk space remain %.3f GB, need at least %.3f GB, stop query", pPObj, pSql, tscError("%p sub:%p client disk space remain %.3f GB, need at least %.3f GB, stop query", pParentSql, pSql,
tsAvailTmpDirGB, tsMinimalTmpDirGB); tsAvailTmpDirectorySpace, tsReservedTmpDirectorySpace);
tscAbortFurtherRetryRetrieval(trsupport, pSql, TSDB_CODE_TSC_NO_DISKSPACE); return tscAbortFurtherRetryRetrieval(trsupport, pSql, TSDB_CODE_TSC_NO_DISKSPACE);
return;
} }
// each result for a vnode is ordered as an independant list, // each result for a vnode is ordered as an independant list,
// then used as an input of loser tree for disk-based merge routine // then used as an input of loser tree for disk-based merge
int32_t code = tscFlushTmpBuffer(trsupport->pExtMemBuffer[idx], pDesc, trsupport->localBuffer, pQueryInfo->groupbyExpr.orderType); int32_t code = tscFlushTmpBuffer(trsupport->pExtMemBuffer[idx], pDesc, trsupport->localBuffer, pQueryInfo->groupbyExpr.orderType);
if (code != 0) { // set no disk space error info, and abort retry if (code != 0) { // set no disk space error info, and abort retry
return tscAbortFurtherRetryRetrieval(trsupport, pSql, code); return tscAbortFurtherRetryRetrieval(trsupport, pSql, code);
...@@ -1641,7 +1655,7 @@ static void tscAllDataRetrievedFromDnode(SRetrieveSupport *trsupport, SSqlObj* p ...@@ -1641,7 +1655,7 @@ static void tscAllDataRetrievedFromDnode(SRetrieveSupport *trsupport, SSqlObj* p
int32_t remain = -1; int32_t remain = -1;
if ((remain = atomic_sub_fetch_32(&pState->numOfRemain, 1)) > 0) { if ((remain = atomic_sub_fetch_32(&pState->numOfRemain, 1)) > 0) {
tscDebug("%p sub:%p orderOfSub:%d freed, finished subqueries:%d", pPObj, pSql, trsupport->subqueryIndex, tscDebug("%p sub:%p orderOfSub:%d freed, finished subqueries:%d", pParentSql, pSql, trsupport->subqueryIndex,
pState->numOfTotal - remain); pState->numOfTotal - remain);
return tscFreeSubSqlObj(trsupport, pSql); return tscFreeSubSqlObj(trsupport, pSql);
...@@ -1650,29 +1664,29 @@ static void tscAllDataRetrievedFromDnode(SRetrieveSupport *trsupport, SSqlObj* p ...@@ -1650,29 +1664,29 @@ static void tscAllDataRetrievedFromDnode(SRetrieveSupport *trsupport, SSqlObj* p
// all sub-queries are returned, start to local merge process // all sub-queries are returned, start to local merge process
pDesc->pColumnModel->capacity = trsupport->pExtMemBuffer[idx]->numOfElemsPerPage; pDesc->pColumnModel->capacity = trsupport->pExtMemBuffer[idx]->numOfElemsPerPage;
tscDebug("%p retrieve from %d vnodes completed.final NumOfRows:%" PRId64 ",start to build loser tree", pPObj, tscDebug("%p retrieve from %d vnodes completed.final NumOfRows:%" PRId64 ",start to build loser tree", pParentSql,
pState->numOfTotal, pState->numOfRetrievedRows); pState->numOfTotal, pState->numOfRetrievedRows);
SQueryInfo *pPQueryInfo = tscGetQueryInfoDetail(&pPObj->cmd, 0); SQueryInfo *pPQueryInfo = tscGetQueryInfoDetail(&pParentSql->cmd, 0);
tscClearInterpInfo(pPQueryInfo); tscClearInterpInfo(pPQueryInfo);
tscCreateLocalReducer(trsupport->pExtMemBuffer, pState->numOfTotal, pDesc, trsupport->pFinalColModel, pPObj); tscCreateLocalReducer(trsupport->pExtMemBuffer, pState->numOfTotal, pDesc, trsupport->pFinalColModel, pParentSql);
tscDebug("%p build loser tree completed", pPObj); tscDebug("%p build loser tree completed", pParentSql);
pPObj->res.precision = pSql->res.precision; pParentSql->res.precision = pSql->res.precision;
pPObj->res.numOfRows = 0; pParentSql->res.numOfRows = 0;
pPObj->res.row = 0; pParentSql->res.row = 0;
// only free once // only free once
tfree(trsupport->pState); tfree(trsupport->pState);
tscFreeSubSqlObj(trsupport, pSql); tscFreeSubSqlObj(trsupport, pSql);
// set the command flag must be after the semaphore been correctly set. // set the command flag must be after the semaphore been correctly set.
pPObj->cmd.command = TSDB_SQL_RETRIEVE_LOCALMERGE; pParentSql->cmd.command = TSDB_SQL_RETRIEVE_LOCALMERGE;
if (pPObj->res.code == TSDB_CODE_SUCCESS) { if (pParentSql->res.code == TSDB_CODE_SUCCESS) {
(*pPObj->fp)(pPObj->param, pPObj, 0); (*pParentSql->fp)(pParentSql->param, pParentSql, 0);
} else { } else {
tscQueueAsyncRes(pPObj); tscQueueAsyncRes(pParentSql);
} }
} }
...@@ -1680,22 +1694,48 @@ static void tscRetrieveFromDnodeCallBack(void *param, TAOS_RES *tres, int numOfR ...@@ -1680,22 +1694,48 @@ static void tscRetrieveFromDnodeCallBack(void *param, TAOS_RES *tres, int numOfR
SRetrieveSupport *trsupport = (SRetrieveSupport *)param; SRetrieveSupport *trsupport = (SRetrieveSupport *)param;
tOrderDescriptor *pDesc = trsupport->pOrderDescriptor; tOrderDescriptor *pDesc = trsupport->pOrderDescriptor;
int32_t idx = trsupport->subqueryIndex; int32_t idx = trsupport->subqueryIndex;
SSqlObj * pPObj = trsupport->pParentSqlObj; SSqlObj * pParentSql = trsupport->pParentSql;
SSqlObj *pSql = (SSqlObj *)tres; SSqlObj *pSql = (SSqlObj *)tres;
if (pSql == NULL) { // sql object has been released in error process, return immediately if (pSql == NULL) { // sql object has been released in error process, return immediately
tscDebug("%p subquery has been released, idx:%d, abort", pPObj, idx); tscDebug("%p subquery has been released, idx:%d, abort", pParentSql, idx);
return; return;
} }
SSubqueryState* pState = trsupport->pState; SSubqueryState* pState = trsupport->pState;
assert(pState->numOfRemain <= pState->numOfTotal && pState->numOfRemain >= 0 && pPObj->numOfSubs == pState->numOfTotal); assert(pState->numOfRemain <= pState->numOfTotal && pState->numOfRemain >= 0 && pParentSql->numOfSubs == pState->numOfTotal);
// query process and cancel query process may execute at the same time // query process and cancel query process may execute at the same time
pthread_mutex_lock(&trsupport->queryMutex); pthread_mutex_lock(&trsupport->queryMutex);
if (numOfRows < 0 || pPObj->res.code != TSDB_CODE_SUCCESS) { STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0);
return tscHandleSubqueryError(trsupport, pSql, numOfRows); SCMVgroupInfo* pVgroup = &pTableMetaInfo->vgroupList->vgroups[0];
if (pParentSql->res.code != TSDB_CODE_SUCCESS) {
trsupport->numOfRetry = MAX_NUM_OF_SUBQUERY_RETRY;
tscTrace("%p query cancelled or failed, sub:%p, vgId:%d, orderOfSub:%d, code:%s, global code:%s",
pParentSql, pSql, pVgroup->vgId, trsupport->subqueryIndex, tstrerror(numOfRows), tstrerror(pParentSql->res.code));
tscHandleSubqueryError(param, tres, numOfRows);
return;
}
if (taos_errno(pSql) != TSDB_CODE_SUCCESS) {
assert(numOfRows == taos_errno(pSql));
if (trsupport->numOfRetry++ < MAX_NUM_OF_SUBQUERY_RETRY) {
tscTrace("%p sub:%p failed code:%s, retry:%d", pParentSql, pSql, tstrerror(numOfRows), trsupport->numOfRetry);
if (tscReissueSubquery(trsupport, pSql, numOfRows) == TSDB_CODE_SUCCESS) {
return;
}
} else {
tscTrace("%p sub:%p reach the max retry times, set global code:%s", pParentSql, pSql, tstrerror(numOfRows));
atomic_val_compare_exchange_32(&pParentSql->res.code, TSDB_CODE_SUCCESS, numOfRows); // set global code and abort
}
tscHandleSubqueryError(param, tres, numOfRows);
return;
} }
SSqlRes * pRes = &pSql->res; SSqlRes * pRes = &pSql->res;
...@@ -1705,14 +1745,13 @@ static void tscRetrieveFromDnodeCallBack(void *param, TAOS_RES *tres, int numOfR ...@@ -1705,14 +1745,13 @@ static void tscRetrieveFromDnodeCallBack(void *param, TAOS_RES *tres, int numOfR
assert(pRes->numOfRows == numOfRows); assert(pRes->numOfRows == numOfRows);
int64_t num = atomic_add_fetch_64(&pState->numOfRetrievedRows, numOfRows); int64_t num = atomic_add_fetch_64(&pState->numOfRetrievedRows, numOfRows);
tscDebug("%p sub:%p retrieve numOfRows:%" PRId64 " totalNumOfRows:%" PRIu64 " from ip:%s, orderOfSub:%d", pPObj, pSql, tscDebug("%p sub:%p retrieve numOfRows:%" PRId64 " totalNumOfRows:%" PRIu64 " from ip:%s, orderOfSub:%d", pParentSql, pSql,
pRes->numOfRows, pState->numOfRetrievedRows, pSql->ipList.fqdn[pSql->ipList.inUse], idx); pRes->numOfRows, pState->numOfRetrievedRows, pSql->ipList.fqdn[pSql->ipList.inUse], idx);
if (num > tsMaxNumOfOrderedResults && tscIsProjectionQueryOnSTable(pQueryInfo, 0)) { if (num > tsMaxNumOfOrderedResults && tscIsProjectionQueryOnSTable(pQueryInfo, 0)) {
tscError("%p sub:%p num of OrderedRes is too many, max allowed:%" PRId32 " , current:%" PRId64, tscError("%p sub:%p num of OrderedRes is too many, max allowed:%" PRId32 " , current:%" PRId64,
pPObj, pSql, tsMaxNumOfOrderedResults, num); pParentSql, pSql, tsMaxNumOfOrderedResults, num);
tscAbortFurtherRetryRetrieval(trsupport, tres, TSDB_CODE_TSC_SORTED_RES_TOO_MANY); return tscAbortFurtherRetryRetrieval(trsupport, tres, TSDB_CODE_TSC_SORTED_RES_TOO_MANY);
return;
} }
#ifdef _DEBUG_VIEW #ifdef _DEBUG_VIEW
...@@ -1723,11 +1762,11 @@ static void tscRetrieveFromDnodeCallBack(void *param, TAOS_RES *tres, int numOfR ...@@ -1723,11 +1762,11 @@ static void tscRetrieveFromDnodeCallBack(void *param, TAOS_RES *tres, int numOfR
tColModelDisplayEx(pDesc->pColumnModel, pRes->data, pRes->numOfRows, pRes->numOfRows, colInfo); tColModelDisplayEx(pDesc->pColumnModel, pRes->data, pRes->numOfRows, pRes->numOfRows, colInfo);
#endif #endif
if (tsTotalTmpDirGB != 0 && tsAvailTmpDirGB < tsMinimalTmpDirGB) { // no disk space for tmp directory
tscError("%p sub:%p client disk space remain %.3f GB, need at least %.3f GB, stop query", pPObj, pSql, if (tsTotalTmpDirGB != 0 && tsAvailTmpDirectorySpace < tsReservedTmpDirectorySpace) {
tsAvailTmpDirGB, tsMinimalTmpDirGB); tscError("%p sub:%p client disk space remain %.3f GB, need at least %.3f GB, stop query", pParentSql, pSql,
tscAbortFurtherRetryRetrieval(trsupport, tres, TSDB_CODE_TSC_NO_DISKSPACE); tsAvailTmpDirectorySpace, tsReservedTmpDirectorySpace);
return; return tscAbortFurtherRetryRetrieval(trsupport, tres, TSDB_CODE_TSC_NO_DISKSPACE);
} }
int32_t ret = saveToBuffer(trsupport->pExtMemBuffer[idx], pDesc, trsupport->localBuffer, pRes->data, int32_t ret = saveToBuffer(trsupport->pExtMemBuffer[idx], pDesc, trsupport->localBuffer, pRes->data,
...@@ -1772,72 +1811,50 @@ static SSqlObj *tscCreateSqlObjForSubquery(SSqlObj *pSql, SRetrieveSupport *trsu ...@@ -1772,72 +1811,50 @@ static SSqlObj *tscCreateSqlObjForSubquery(SSqlObj *pSql, SRetrieveSupport *trsu
void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code) { void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code) {
SRetrieveSupport *trsupport = (SRetrieveSupport *) param; SRetrieveSupport *trsupport = (SRetrieveSupport *) param;
SSqlObj* pParentSql = trsupport->pParentSqlObj; SSqlObj* pParentSql = trsupport->pParentSql;
SSqlObj* pSql = (SSqlObj *) tres; SSqlObj* pSql = (SSqlObj *) tres;
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0); SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
assert(pSql->cmd.numOfClause == 1 && pQueryInfo->numOfTables == 1); assert(pSql->cmd.numOfClause == 1 && pQueryInfo->numOfTables == 1);
STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0); STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0);
SCMVgroupInfo* pVgroup = &pTableMetaInfo->vgroupList->vgroups[0]; SCMVgroupInfo* pVgroup = &pTableMetaInfo->vgroupList->vgroups[trsupport->subqueryIndex];
SSubqueryState* pState = trsupport->pState; // stable query killed or other subquery failed, all query stopped
assert(pState->numOfRemain <= pState->numOfTotal && pState->numOfRemain >= 0 && pParentSql->numOfSubs == pState->numOfTotal);
// todo set error code
if (pParentSql->res.code != TSDB_CODE_SUCCESS) { if (pParentSql->res.code != TSDB_CODE_SUCCESS) {
// stable query is killed, abort further retry
trsupport->numOfRetry = MAX_NUM_OF_SUBQUERY_RETRY; trsupport->numOfRetry = MAX_NUM_OF_SUBQUERY_RETRY;
tscTrace("%p query cancelled or failed, sub:%p, vgId:%d, orderOfSub:%d, code:%s, global code:%s",
pParentSql, pSql, pVgroup->vgId, trsupport->subqueryIndex, tstrerror(code), tstrerror(pParentSql->res.code));
if (pParentSql->res.code != TSDB_CODE_SUCCESS) { tscHandleSubqueryError(param, tres, code);
code = pParentSql->res.code; return;
}
tscDebug("%p query cancelled or failed, sub:%p, orderOfSub:%d abort, code:%s", pParentSql, pSql,
trsupport->subqueryIndex, tstrerror(code));
} }
/* /*
* if a query on a vnode is failed, all retrieve operations from vnode that occurs later * if a subquery on a vnode failed, all retrieve operations from vnode that occurs later
* than this one are actually not necessary, we simply call the tscRetrieveFromDnodeCallBack * than this one are actually not necessary, we simply call the tscRetrieveFromDnodeCallBack
* function to abort current and remain retrieve process. * function to abort current and remain retrieve process.
* *
* NOTE: thread safe is required. * NOTE: thread safe is required.
*/ */
if (code != TSDB_CODE_SUCCESS) { if (taos_errno(pSql) != TSDB_CODE_SUCCESS) {
if (trsupport->numOfRetry++ >= MAX_NUM_OF_SUBQUERY_RETRY) { assert(code == taos_errno(pSql));
tscDebug("%p sub:%p reach the max retry times, set global code:%s", pParentSql, pSql, tstrerror(code));
atomic_val_compare_exchange_32(&pParentSql->res.code, TSDB_CODE_SUCCESS, code);
} else { // does not reach the maximum retry time, go on
tscDebug("%p sub:%p failed code:%s, retry:%d", pParentSql, pSql, tstrerror(code), trsupport->numOfRetry);
SSqlObj *pNew = tscCreateSqlObjForSubquery(pParentSql, trsupport, pSql);
if (pNew == NULL) {
tscError("%p sub:%p failed to create new subquery due to out of memory, abort retry, vgId:%d, orderOfSub:%d",
trsupport->pParentSqlObj, pSql, pVgroup->vgId, trsupport->subqueryIndex);
pParentSql->res.code = TSDB_CODE_TSC_OUT_OF_MEMORY;
trsupport->numOfRetry = MAX_NUM_OF_SUBQUERY_RETRY;
} else {
SQueryInfo *pNewQueryInfo = tscGetQueryInfoDetail(&pNew->cmd, 0);
assert(pNewQueryInfo->pTableMetaInfo[0]->pTableMeta != NULL);
taos_free_result(pSql); if (trsupport->numOfRetry++ < MAX_NUM_OF_SUBQUERY_RETRY) {
tscProcessSql(pNew); tscTrace("%p sub:%p failed code:%s, retry:%d", pParentSql, pSql, tstrerror(code), trsupport->numOfRetry);
if (tscReissueSubquery(trsupport, pSql, code) == TSDB_CODE_SUCCESS) {
return; return;
} }
} } else {
tscTrace("%p sub:%p reach the max retry times, set global code:%s", pParentSql, pSql, tstrerror(code));
atomic_val_compare_exchange_32(&pParentSql->res.code, TSDB_CODE_SUCCESS, code); // set global code and abort
} }
if (pParentSql->res.code != TSDB_CODE_SUCCESS) { // at least one peer subquery failed, abort current query
tscDebug("%p sub:%p query failed,ip:%s,vgId:%d,orderOfSub:%d,global code:%d", pParentSql, pSql,
pVgroup->ipAddr[0].fqdn, pVgroup->vgId, trsupport->subqueryIndex, pParentSql->res.code);
tscHandleSubqueryError(param, tres, pParentSql->res.code); tscHandleSubqueryError(param, tres, pParentSql->res.code);
} else { // success, proceed to retrieve data from dnode return;
tscDebug("%p sub:%p query complete, ip:%s, vgId:%d, orderOfSub:%d, retrieve data", trsupport->pParentSqlObj, pSql, }
tscTrace("%p sub:%p query complete, ip:%s, vgId:%d, orderOfSub:%d, retrieve data", trsupport->pParentSql, pSql,
pVgroup->ipAddr[0].fqdn, pVgroup->vgId, trsupport->subqueryIndex); pVgroup->ipAddr[0].fqdn, pVgroup->vgId, trsupport->subqueryIndex);
if (pSql->res.qhandle == 0) { // qhandle is NULL, code is TSDB_CODE_SUCCESS means no results generated from this vnode if (pSql->res.qhandle == 0) { // qhandle is NULL, code is TSDB_CODE_SUCCESS means no results generated from this vnode
...@@ -1845,8 +1862,6 @@ void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code) { ...@@ -1845,8 +1862,6 @@ void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code) {
} else { } else {
taos_fetch_rows_a(tres, tscRetrieveFromDnodeCallBack, param); taos_fetch_rows_a(tres, tscRetrieveFromDnodeCallBack, param);
} }
}
} }
static void multiVnodeInsertFinalize(void* param, TAOS_RES* tres, int numOfRows) { static void multiVnodeInsertFinalize(void* param, TAOS_RES* tres, int numOfRows) {
......
...@@ -128,10 +128,10 @@ extern float tsTotalLogDirGB; ...@@ -128,10 +128,10 @@ extern float tsTotalLogDirGB;
extern float tsTotalTmpDirGB; extern float tsTotalTmpDirGB;
extern float tsTotalDataDirGB; extern float tsTotalDataDirGB;
extern float tsAvailLogDirGB; extern float tsAvailLogDirGB;
extern float tsAvailTmpDirGB; extern float tsAvailTmpDirectorySpace;
extern float tsAvailDataDirGB; extern float tsAvailDataDirGB;
extern float tsMinimalLogDirGB; extern float tsMinimalLogDirGB;
extern float tsMinimalTmpDirGB; extern float tsReservedTmpDirectorySpace;
extern float tsMinimalDataDirGB; extern float tsMinimalDataDirGB;
extern int32_t tsTotalMemoryMB; extern int32_t tsTotalMemoryMB;
extern int32_t tsVersion; extern int32_t tsVersion;
......
...@@ -170,9 +170,9 @@ int64_t tsStreamMax; ...@@ -170,9 +170,9 @@ int64_t tsStreamMax;
int32_t tsNumOfCores = 1; int32_t tsNumOfCores = 1;
float tsTotalTmpDirGB = 0; float tsTotalTmpDirGB = 0;
float tsTotalDataDirGB = 0; float tsTotalDataDirGB = 0;
float tsAvailTmpDirGB = 0; float tsAvailTmpDirectorySpace = 0;
float tsAvailDataDirGB = 0; float tsAvailDataDirGB = 0;
float tsMinimalTmpDirGB = 0.1; float tsReservedTmpDirectorySpace = 0.1;
float tsMinimalDataDirGB = 0.5; float tsMinimalDataDirGB = 0.5;
int32_t tsTotalMemoryMB = 0; int32_t tsTotalMemoryMB = 0;
int32_t tsVersion = 0; int32_t tsVersion = 0;
...@@ -807,7 +807,7 @@ static void doInitGlobalConfig() { ...@@ -807,7 +807,7 @@ static void doInitGlobalConfig() {
taosInitConfigOption(cfg); taosInitConfigOption(cfg);
cfg.option = "minimalTmpDirGB"; cfg.option = "minimalTmpDirGB";
cfg.ptr = &tsMinimalTmpDirGB; cfg.ptr = &tsReservedTmpDirectorySpace;
cfg.valType = TAOS_CFG_VTYPE_FLOAT; cfg.valType = TAOS_CFG_VTYPE_FLOAT;
cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW; cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW;
cfg.minValue = 0.001; cfg.minValue = 0.001;
......
...@@ -326,12 +326,12 @@ bool taosGetDisk() { ...@@ -326,12 +326,12 @@ bool taosGetDisk() {
if (statvfs("/tmp", &info)) { if (statvfs("/tmp", &info)) {
//tsTotalTmpDirGB = 0; //tsTotalTmpDirGB = 0;
//tsAvailTmpDirGB = 0; //tsAvailTmpDirectorySpace = 0;
uError("failed to get disk size, tmpDir:/tmp errno:%s", strerror(errno)); uError("failed to get disk size, tmpDir:/tmp errno:%s", strerror(errno));
return false; return false;
} else { } else {
tsTotalTmpDirGB = (float)((double)info.f_blocks * (double)info.f_frsize / unit); tsTotalTmpDirGB = (float)((double)info.f_blocks * (double)info.f_frsize / unit);
tsAvailTmpDirGB = (float)((double)info.f_bavail * (double)info.f_frsize / unit); tsAvailTmpDirectorySpace = (float)((double)info.f_bavail * (double)info.f_frsize / unit);
} }
return true; return true;
......
...@@ -631,5 +631,5 @@ void exprSerializeTest2() { ...@@ -631,5 +631,5 @@ void exprSerializeTest2() {
} }
} // namespace } // namespace
TEST(testCase, astTest) { TEST(testCase, astTest) {
exprSerializeTest2(); // exprSerializeTest2();
} }
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册