提交 5dc2c5cd 编写于 作者: H Haojun Liao

[td-225] refactor subquery codes

上级 3206bf96
......@@ -13,8 +13,8 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_TSCSECONARYMERGE_H
#define TDENGINE_TSCSECONARYMERGE_H
#ifndef TDENGINE_TSCLOCALMERGE_H
#define TDENGINE_TSCLOCALMERGE_H
#ifdef __cplusplus
extern "C" {
......@@ -27,14 +27,7 @@ extern "C" {
#include "tsclient.h"
#define MAX_NUM_OF_SUBQUERY_RETRY 3
/*
* @version 0.1
* @date 2018/01/05
* @author liaohj
* management of client-side reducer for metric query
*/
struct SQLFunctionCtx;
typedef struct SLocalDataSource {
......@@ -60,7 +53,6 @@ typedef struct SLocalReducer {
char * prevRowOfInput;
tFilePage * pResultBuf;
int32_t nResultBufSize;
// char * pBufForInterpo; // intermediate buffer for interpolation
tFilePage * pTempBuffer;
struct SQLFunctionCtx *pCtx;
int32_t rowSize; // size of each intermediate result.
......@@ -81,13 +73,8 @@ typedef struct SLocalReducer {
} SLocalReducer;
typedef struct SSubqueryState {
/*
* the number of completed retrieval subquery, once this value equals to numOfVnodes,
* all retrieval are completed.Local merge is launched.
*/
int32_t numOfCompleted;
int32_t numOfTotal; // number of total sub-queries
int32_t code; // code from subqueries
int32_t numOfRemain; // the number of remain unfinished subquery
int32_t numOfTotal; // the number of total sub-queries
uint64_t numOfRetrievedRows; // total number of points in this query
} SSubqueryState;
......@@ -128,4 +115,4 @@ int32_t tscDoLocalMerge(SSqlObj *pSql);
}
#endif
#endif // TDENGINE_TSCSECONARYMERGE_H
#endif // TDENGINE_TSCLOCALMERGE_H
......@@ -26,11 +26,9 @@ extern "C" {
void tscFetchDatablockFromSubquery(SSqlObj* pSql);
void tscSetupOutputColumnIndex(SSqlObj* pSql);
int32_t tscLaunchSecondPhaseSubqueries(SSqlObj* pSql);
void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code);
SJoinSupporter* tscCreateJoinSupporter(SSqlObj* pSql, SSubqueryState* pState, int32_t index);
void tscDestroyJoinSupporter(SJoinSupporter* pSupporter);
int32_t tscHandleMasterJoinQuery(SSqlObj* pSql);
......
......@@ -64,7 +64,8 @@ typedef struct SJoinSupporter {
SSubqueryState* pState;
SSqlObj* pObj; // parent SqlObj
int32_t subqueryIndex; // index of sub query
int64_t interval; // interval time
int64_t intervalTime; // interval time
int64_t slidingTime; // sliding time
SLimitVal limit; // limit info
uint64_t uid; // query meter uid
SArray* colList; // previous query information, no need to use this attribute, and the corresponding attribution
......
......@@ -12,12 +12,12 @@
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "os.h"
#include "tscSubquery.h"
#include <qast.h>
#include <tcompare.h>
#include <tschemautil.h>
#include "os.h"
#include "qast.h"
#include "tcompare.h"
#include "tschemautil.h"
#include "qtsbuf.h"
#include "tscLog.h"
#include "tsclient.h"
......@@ -169,7 +169,8 @@ SJoinSupporter* tscCreateJoinSupporter(SSqlObj* pSql, SSubqueryState* pState, in
pSupporter->subqueryIndex = index;
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, pSql->cmd.clauseIndex);
pSupporter->interval = pQueryInfo->intervalTime;
pSupporter->intervalTime = pQueryInfo->intervalTime;
pSupporter->slidingTime = pQueryInfo->slidingTime;
pSupporter->limit = pQueryInfo->limit;
STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, pSql->cmd.clauseIndex, index);
......@@ -186,7 +187,7 @@ SJoinSupporter* tscCreateJoinSupporter(SSqlObj* pSql, SSubqueryState* pState, in
return pSupporter;
}
void tscDestroyJoinSupporter(SJoinSupporter* pSupporter) {
static void tscDestroyJoinSupporter(SJoinSupporter* pSupporter) {
if (pSupporter == NULL) {
return;
}
......@@ -217,7 +218,7 @@ void tscDestroyJoinSupporter(SJoinSupporter* pSupporter) {
* primary timestamp column , the secondary query is not necessary
*
*/
bool needSecondaryQuery(SQueryInfo* pQueryInfo) {
static UNUSED_FUNC bool needSecondaryQuery(SQueryInfo* pQueryInfo) {
size_t numOfCols = taosArrayGetSize(pQueryInfo->colList);
for (int32_t i = 0; i < numOfCols; ++i) {
......@@ -233,14 +234,11 @@ bool needSecondaryQuery(SQueryInfo* pQueryInfo) {
/*
* launch secondary stage query to fetch the result that contains timestamp in set
*/
int32_t tscLaunchSecondPhaseSubqueries(SSqlObj* pSql) {
static int32_t tscLaunchSecondPhaseSubqueries(SSqlObj* pSql) {
int32_t numOfSub = 0;
SJoinSupporter* pSupporter = NULL;
/*
* If the columns are not involved in the final select clause,
* the corresponding query will not be issued.
*/
//If the columns are not involved in the final select clause, the corresponding query will not be issued.
for (int32_t i = 0; i < pSql->numOfSubs; ++i) {
pSupporter = pSql->pSubs[i]->param;
if (taosArrayGetSize(pSupporter->exprList) > 0) {
......@@ -251,13 +249,12 @@ int32_t tscLaunchSecondPhaseSubqueries(SSqlObj* pSql) {
assert(numOfSub > 0);
// scan all subquery, if one sub query has only ts, ignore it
tscTrace("%p start to launch secondary subquery, total:%d, only:%d needs to query, others are not retrieve in "
"select clause", pSql, pSql->numOfSubs, numOfSub);
tscTrace("%p start to launch secondary subquery, total:%d, only:%d needs to query", pSql, pSql->numOfSubs, numOfSub);
//the subqueries that do not actually launch the secondary query to virtual node is set as completed.
SSubqueryState* pState = pSupporter->pState;
pState->numOfTotal = pSql->numOfSubs;
pState->numOfCompleted = (pSql->numOfSubs - numOfSub);
pState->numOfRemain = numOfSub;
bool success = true;
......@@ -301,7 +298,8 @@ int32_t tscLaunchSecondPhaseSubqueries(SSqlObj* pSql) {
// set the second stage sub query for join process
TSDB_QUERY_SET_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_JOIN_SEC_STAGE);
pQueryInfo->intervalTime = pSupporter->interval;
pQueryInfo->intervalTime = pSupporter->intervalTime;
pQueryInfo->slidingTime = pSupporter->slidingTime;
pQueryInfo->groupbyExpr = pSupporter->groupbyExpr;
tscTagCondCopy(&pQueryInfo->tagCond, &pSupporter->tagCond);
......@@ -375,6 +373,7 @@ int32_t tscLaunchSecondPhaseSubqueries(SSqlObj* pSql) {
if (pSql->pSubs[i] == NULL) {
continue;
}
tscDoQuery(pSql->pSubs[i]);
}
......@@ -405,11 +404,9 @@ void freeJoinSubqueryObj(SSqlObj* pSql) {
}
static void quitAllSubquery(SSqlObj* pSqlObj, SJoinSupporter* pSupporter) {
int32_t numOfTotal = pSupporter->pState->numOfTotal;
int32_t finished = atomic_add_fetch_32(&pSupporter->pState->numOfCompleted, 1);
pSqlObj->res.code = pSupporter->pState->code;
if (finished >= numOfTotal) {
assert(pSupporter->pState->numOfRemain > 0);
if (atomic_sub_fetch_32(&pSupporter->pState->numOfRemain, 1) <= 0) {
tscError("%p all subquery return and query failed, global code:%d", pSqlObj, pSqlObj->res.code);
freeJoinSubqueryObj(pSqlObj);
}
......@@ -421,7 +418,7 @@ static void updateQueryTimeRange(SQueryInfo* pQueryInfo, STimeWindow* win) {
pQueryInfo->window = *win;
}
static void tSIntersectionAndLaunchSecQuery(SJoinSupporter* pSupporter, SSqlObj* pSql) {
static UNUSED_FUNC void tSIntersectionAndLaunchSecQuery(SJoinSupporter* pSupporter, SSqlObj* pSql) {
SSqlObj* pParentSql = pSupporter->pObj;
SQueryInfo* pParentQueryInfo = tscGetQueryInfoDetail(&pParentSql->cmd, pParentSql->cmd.clauseIndex);
......@@ -443,22 +440,7 @@ static void tSIntersectionAndLaunchSecQuery(SJoinSupporter* pSupporter, SSqlObj*
// return;
// }
// }
int32_t numOfTotal = pSupporter->pState->numOfTotal;
int32_t finished = atomic_add_fetch_32(&pSupporter->pState->numOfCompleted, 1);
if (finished >= numOfTotal) {
assert(finished == numOfTotal);
if (pSupporter->pState->code != TSDB_CODE_SUCCESS) {
tscTrace("%p sub:%p, numOfSub:%d, quit from further procedure due to other queries failure", pParentSql, pSql,
pSupporter->subqueryIndex);
freeJoinSubqueryObj(pParentSql);
return;
}
tscTrace("%p all subqueries retrieve ts complete, do ts block intersect", pParentSql);
SJoinSupporter* p1 = pParentSql->pSubs[0]->param;
SJoinSupporter* p2 = pParentSql->pSubs[1]->param;
......@@ -471,7 +453,6 @@ static void tSIntersectionAndLaunchSecQuery(SJoinSupporter* pSupporter, SSqlObj*
updateQueryTimeRange(pParentQueryInfo, &win);
tscLaunchSecondPhaseSubqueries(pParentSql);
}
}
}
int32_t tscCompareTidTags(const void* p1, const void* p2) {
......@@ -545,11 +526,13 @@ static void issueTSCompQuery(SSqlObj* pSql, SJoinSupporter* pSupporter, SSqlObj*
tscAddSpecialColumnForSelect(pQueryInfo, 0, TSDB_FUNC_TS_COMP, &index, &colSchema, TSDB_COL_NORMAL);
// set the tags value for ts_comp function
SSqlExpr *pExpr = tscSqlExprGet(pQueryInfo, 0);
int16_t tagColId = tscGetJoinTagColIdByUid(&pSupporter->tagCond, pTableMetaInfo->pTableMeta->uid);
pExpr->param->i64Key = tagColId;
pExpr->numOfParams = 1;
if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) {
SSqlExpr *pExpr = tscSqlExprGet(pQueryInfo, 0);
int16_t tagColId = tscGetJoinTagColIdByUid(&pSupporter->tagCond, pTableMetaInfo->pTableMeta->uid);
pExpr->param->i64Key = tagColId;
pExpr->numOfParams = 1;
}
// add the filter tag column
if (pSupporter->colList != NULL) {
size_t s = taosArrayGetSize(pSupporter->colList);
......@@ -575,7 +558,7 @@ static void issueTSCompQuery(SSqlObj* pSql, SJoinSupporter* pSupporter, SSqlObj*
tscProcessSql(pSql);
}
static bool checkForIdenticalTagVal(SQueryInfo* pQueryInfo, SJoinSupporter* p1, void* pSql) {
static bool checkForDuplicateTagVal(SQueryInfo* pQueryInfo, SJoinSupporter* p1, SSqlObj* pPSqlObj) {
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
SSchema* pSchema = tscGetTableTagSchema(pTableMetaInfo->pTableMeta);// todo: tags mismatch, tags not completed
......@@ -587,8 +570,8 @@ static bool checkForIdenticalTagVal(SQueryInfo* pQueryInfo, SJoinSupporter* p1,
STidTags* p = (STidTags*) varDataVal(p1->pIdTagList + i * p1->tagSize);
if (doCompare(prev->tag, p->tag, pColSchema->type, pColSchema->bytes) == 0) {
tscError("%p join tags have same value for different table, free all sub SqlObj and quit", pSql);
p1->pState->code = TSDB_CODE_QRY_DUP_JOIN_KEY;
tscError("%p join tags have same value for different table, free all sub SqlObj and quit", pPSqlObj);
pPSqlObj->res.code = TSDB_CODE_QRY_DUP_JOIN_KEY;
return false;
}
}
......@@ -596,8 +579,8 @@ static bool checkForIdenticalTagVal(SQueryInfo* pQueryInfo, SJoinSupporter* p1,
return true;
}
static void getIntersectionOfTagVal(SQueryInfo* pQueryInfo, SSqlObj* pParentSql, SArray** s1, SArray** s2) {
tscTrace("%p all subqueries retrieve tags complete, do tags match", pParentSql);
static void getIntersectionOfTableTuple(SQueryInfo* pQueryInfo, SSqlObj* pParentSql, SArray** s1, SArray** s2) {
tscTrace("%p all subqueries retrieve <tid, tags> complete, do tags match", pParentSql);
SJoinSupporter* p1 = pParentSql->pSubs[0]->param;
SJoinSupporter* p2 = pParentSql->pSubs[1]->param;
......@@ -613,7 +596,7 @@ static void getIntersectionOfTagVal(SQueryInfo* pQueryInfo, SSqlObj* pParentSql,
*s1 = taosArrayInit(p1->num, p1->tagSize);
*s2 = taosArrayInit(p2->num, p2->tagSize);
if (!(checkForIdenticalTagVal(pQueryInfo, p1, pParentSql) && checkForIdenticalTagVal(pQueryInfo, p2, pParentSql))) {
if (!(checkForDuplicateTagVal(pQueryInfo, p1, pParentSql) && checkForDuplicateTagVal(pQueryInfo, p2, pParentSql))) {
freeJoinSubqueryObj(pParentSql);
pParentSql->res.code = TSDB_CODE_QRY_DUP_JOIN_KEY;
tscQueueAsyncRes(pParentSql);
......@@ -642,7 +625,7 @@ static void getIntersectionOfTagVal(SQueryInfo* pQueryInfo, SSqlObj* pParentSql,
}
}
static void joinRetrieveCallback(void* param, TAOS_RES* tres, int numOfRows) {
static void tidTagRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRows) {
SJoinSupporter* pSupporter = (SJoinSupporter*)param;
SSqlObj* pParentSql = pSupporter->pObj;
......@@ -652,239 +635,287 @@ static void joinRetrieveCallback(void* param, TAOS_RES* tres, int numOfRows) {
SSqlRes* pRes = &pSql->res;
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
assert(TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_TAG_FILTER_QUERY));
// response of tag retrieve
if (TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_TAG_FILTER_QUERY)) {
// todo handle error
// check for the error code firstly
if (taos_errno(pSql) != TSDB_CODE_SUCCESS) {
// todo retry if other subqueries are not failed
if (numOfRows == 0 || pRes->completed) {
if (numOfRows > 0) {
size_t validLen = pSupporter->tagSize * pRes->numOfRows;
assert(numOfRows < 0 && numOfRows == taos_errno(pSql));
tscError("%p sub query failed, code:%s, index:%d", pSql, tstrerror(numOfRows), pSupporter->subqueryIndex);
size_t length = pSupporter->totalLen + validLen;
char* tmp = realloc(pSupporter->pIdTagList, length);
assert(tmp != NULL);
pSupporter->pIdTagList = tmp;
pParentSql->res.code = numOfRows;
quitAllSubquery(pParentSql, pSupporter);
memcpy(pSupporter->pIdTagList + pSupporter->totalLen,pRes->data, validLen);
pSupporter->totalLen += validLen;
pSupporter->num += pRes->numOfRows;
}
tscQueueAsyncRes(pParentSql);
return;
}
// <tid + tag> tuples have been retrieved to client, try <tid + tag> tuples from the next vnode
if (hasMoreVnodesToTry(pSql)) {
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
// keep the results in memory
if (numOfRows > 0) {
size_t validLen = pSupporter->tagSize * pRes->numOfRows;
size_t length = pSupporter->totalLen + validLen;
int32_t totalVgroups = pTableMetaInfo->vgroupList->numOfVgroups;
pTableMetaInfo->vgroupIndex += 1;
assert(pTableMetaInfo->vgroupIndex < totalVgroups);
// todo handle memory error
char* tmp = realloc(pSupporter->pIdTagList, length);
if (tmp == NULL) {
tscError("%p failed to malloc memory", pSql);
tscTrace("%p tid_tag from vgroup index:%d completed, try next vgroup:%d. total vgroups:%d. current numOfRes:%d",
pSql, pTableMetaInfo->vgroupIndex - 1, pTableMetaInfo->vgroupIndex, totalVgroups,
pSupporter->num);
pParentSql->res.code = TAOS_SYSTEM_ERROR(errno);
quitAllSubquery(pParentSql, pSupporter);
pCmd->command = TSDB_SQL_SELECT;
tscResetForNextRetrieve(&pSql->res);
tscQueueAsyncRes(pParentSql);
return;
}
// set the callback function
pSql->fp = tscJoinQueryCallback;
tscProcessSql(pSql);
return;
}
pSupporter->pIdTagList = tmp;
int32_t numOfTotal = pSupporter->pState->numOfTotal;
int32_t finished = atomic_add_fetch_32(&pSupporter->pState->numOfCompleted, 1);
if (finished < numOfTotal) {
return;
}
memcpy(pSupporter->pIdTagList + pSupporter->totalLen, pRes->data, validLen);
pSupporter->totalLen += validLen;
pSupporter->num += pRes->numOfRows;
// all subquery are returned, start to compare the tags
assert(finished == numOfTotal);
// query not completed, continue to retrieve tid + tag tuples
if (!pRes->completed) {
taos_fetch_rows_a(tres, tidTagRetrieveCallback, param);
return;
}
}
SArray *s1 = NULL, *s2 = NULL;
getIntersectionOfTagVal(pQueryInfo, pParentSql, &s1, &s2);
// data in current vnode has all returned to client, try next vnode if exits
// <tid + tag> tuples have been retrieved to client, try <tid + tag> tuples from the next vnode
if (hasMoreVnodesToTry(pSql)) {
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
if (taosArrayGetSize(s1) == 0 || taosArrayGetSize(s2) == 0) { // no results,return.
tscTrace("%p free all sub SqlObj and quit", pParentSql);
freeJoinSubqueryObj(pParentSql);
return;
} else {
SSqlCmd* pSubCmd1 = &pParentSql->pSubs[0]->cmd;
SSqlCmd* pSubCmd2 = &pParentSql->pSubs[1]->cmd;
int32_t totalVgroups = pTableMetaInfo->vgroupList->numOfVgroups;
pTableMetaInfo->vgroupIndex += 1;
assert(pTableMetaInfo->vgroupIndex < totalVgroups);
SQueryInfo* pQueryInfo1 = tscGetQueryInfoDetail(pSubCmd1, 0);
STableMetaInfo* pTableMetaInfo1 = tscGetMetaInfo(pQueryInfo1, 0);
tscBuildVgroupTableInfo(pParentSql, pTableMetaInfo1, s1);
tscTrace("%p tid_tag from vgroup index:%d completed, try next vgroup:%d. total vgroups:%d. current numOfRes:%d",
pSql, pTableMetaInfo->vgroupIndex - 1, pTableMetaInfo->vgroupIndex, totalVgroups, pSupporter->num);
SQueryInfo* pQueryInfo2 = tscGetQueryInfoDetail(pSubCmd2, 0);
STableMetaInfo* pTableMetaInfo2 = tscGetMetaInfo(pQueryInfo2, 0);
tscBuildVgroupTableInfo(pParentSql, pTableMetaInfo2, s2);
pCmd->command = TSDB_SQL_SELECT;
tscResetForNextRetrieve(&pSql->res);
pSupporter->pState->numOfCompleted = 0;
pSupporter->pState->code = 0;
pSupporter->pState->numOfTotal = 2;
// set the callback function
pSql->fp = tscJoinQueryCallback;
tscProcessSql(pSql);
return;
}
for (int32_t m = 0; m < pParentSql->numOfSubs; ++m) {
SSqlObj* psub = pParentSql->pSubs[m];
issueTSCompQuery(psub, psub->param, pParentSql);
}
}
// no data exists in next vnode, mark the <tid, tags> query completed
// only when there is no subquery exits any more, proceeds to get the intersect of the <tid, tags> tuple sets.
if (atomic_sub_fetch_32(&pSupporter->pState->numOfRemain, 1) > 0) {
return;
}
} else {
if (numOfRows < 0) { // error
pSupporter->pState->code = numOfRows;
quitAllSubquery(pParentSql, pSupporter);
SArray *s1 = NULL, *s2 = NULL;
getIntersectionOfTableTuple(pQueryInfo, pParentSql, &s1, &s2);
if (taosArrayGetSize(s1) == 0 || taosArrayGetSize(s2) == 0) { // no results,return.
tscTrace("%p free all sub SqlObj and quit", pParentSql);
freeJoinSubqueryObj(pParentSql);
return;
}
pParentSql->res.code = numOfRows;
tscQueueAsyncRes(pParentSql);
return;
}
// proceed to for ts_comp query
SSqlCmd* pSubCmd1 = &pParentSql->pSubs[0]->cmd;
SSqlCmd* pSubCmd2 = &pParentSql->pSubs[1]->cmd;
size_t length = pSupporter->totalLen + pRes->rspLen;
assert(length > 0);
SQueryInfo* pQueryInfo1 = tscGetQueryInfoDetail(pSubCmd1, 0);
STableMetaInfo* pTableMetaInfo1 = tscGetMetaInfo(pQueryInfo1, 0);
tscBuildVgroupTableInfo(pParentSql, pTableMetaInfo1, s1);
char* tmp = realloc(pSupporter->pIdTagList, length);
assert(tmp != NULL);
SQueryInfo* pQueryInfo2 = tscGetQueryInfoDetail(pSubCmd2, 0);
STableMetaInfo* pTableMetaInfo2 = tscGetMetaInfo(pQueryInfo2, 0);
tscBuildVgroupTableInfo(pParentSql, pTableMetaInfo2, s2);
pSupporter->pIdTagList = tmp;
pSupporter->pState->numOfTotal = 2;
pSupporter->pState->numOfRemain = pSupporter->pState->numOfTotal;
memcpy(pSupporter->pIdTagList, pRes->data, pRes->rspLen);
pSupporter->totalLen += pRes->rspLen;
pSupporter->num += pRes->numOfRows;
for (int32_t m = 0; m < pParentSql->numOfSubs; ++m) {
SSqlObj* sub = pParentSql->pSubs[m];
issueTSCompQuery(sub, sub->param, pParentSql);
}
}
// continue retrieve data from vnode
taos_fetch_rows_a(tres, joinRetrieveCallback, param);
}
static void tsCompRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRows) {
SJoinSupporter* pSupporter = (SJoinSupporter*)param;
SSqlObj* pParentSql = pSupporter->pObj;
SSqlObj* pSql = (SSqlObj*)tres;
SSqlCmd* pCmd = &pSql->cmd;
SSqlRes* pRes = &pSql->res;
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
assert(!TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_JOIN_SEC_STAGE));
// check for the error code firstly
if (taos_errno(pSql) != TSDB_CODE_SUCCESS) {
// todo retry if other subqueries are not failed yet
assert(numOfRows < 0 && numOfRows == taos_errno(pSql));
tscError("%p sub query failed, code:%s, index:%d", pSql, tstrerror(numOfRows), pSupporter->subqueryIndex);
pParentSql->res.code = numOfRows;
quitAllSubquery(pParentSql, pSupporter);
tscQueueAsyncRes(pParentSql);
return;
}
if (!TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_JOIN_SEC_STAGE)) {
if (numOfRows < 0) {
tscError("%p sub query failed, code:%s, index:%d", pSql, tstrerror(numOfRows), pSupporter->subqueryIndex);
pSupporter->pState->code = numOfRows;
quitAllSubquery(pParentSql, pSupporter);
if (numOfRows > 0) { // write the compressed timestamp to disk file
fwrite(pRes->data, pRes->numOfRows, 1, pSupporter->f);
fclose(pSupporter->f);
pSupporter->f = NULL;
STSBuf* pBuf = tsBufCreateFromFile(pSupporter->path, true);
if (pBuf == NULL) { // in error process, close the fd
tscError("%p invalid ts comp file from vnode, abort subquery, file size:%d", pSql, numOfRows);
pParentSql->res.code = TAOS_SYSTEM_ERROR(errno);
tscQueueAsyncRes(pParentSql);
return;
}
if (numOfRows > 0) { // write the compressed timestamp to disk file
fwrite(pRes->data, pRes->numOfRows, 1, pSupporter->f);
fclose(pSupporter->f);
pSupporter->f = NULL;
STSBuf* pBuf = tsBufCreateFromFile(pSupporter->path, true);
if (pBuf == NULL) { // in error process, close the fd
tscError("%p invalid ts comp file from vnode, abort subquery, file size:%d", pSql, numOfRows);
if (pSupporter->pTSBuf == NULL) {
tscTrace("%p create tmp file for ts block:%s, size:%d bytes", pSql, pBuf->path, numOfRows);
pSupporter->pTSBuf = pBuf;
} else {
assert(pQueryInfo->numOfTables == 1); // for subquery, only one
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
pSupporter->pState->code = TSDB_CODE_TSC_APP_ERROR; // todo set the informative code
quitAllSubquery(pParentSql, pSupporter);
return;
}
tsBufMerge(pSupporter->pTSBuf, pBuf, pTableMetaInfo->vgroupIndex);
tsBufDestory(pBuf);
}
if (pSupporter->pTSBuf == NULL) {
tscTrace("%p create tmp file for ts block:%s, size:%d bytes", pSql, pBuf->path, numOfRows);
pSupporter->pTSBuf = pBuf;
} else {
assert(pQueryInfo->numOfTables == 1); // for subquery, only one
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
// continue to retrieve ts-comp data from vnode
if (!pRes->completed) {
getTmpfilePath("ts-join", pSupporter->path);
pSupporter->f = fopen(pSupporter->path, "w");
pRes->row = pRes->numOfRows;
tsBufMerge(pSupporter->pTSBuf, pBuf, pTableMetaInfo->vgroupIndex);
tsBufDestory(pBuf);
}
taos_fetch_rows_a(tres, tsCompRetrieveCallback, param);
return;
}
}
if (pRes->completed) {
if (hasMoreVnodesToTry(pSql)) {
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
if (hasMoreVnodesToTry(pSql)) {
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
int32_t totalVgroups = pTableMetaInfo->vgroupList->numOfVgroups;
pTableMetaInfo->vgroupIndex += 1;
assert(pTableMetaInfo->vgroupIndex < totalVgroups);
int32_t totalVgroups = pTableMetaInfo->vgroupList->numOfVgroups;
pTableMetaInfo->vgroupIndex += 1;
assert(pTableMetaInfo->vgroupIndex < totalVgroups);
tscTrace("%p results from vgroup index:%d completed, try next vgroup:%d. total vgroups:%d. current numOfRes:%d",
pSql, pTableMetaInfo->vgroupIndex - 1, pTableMetaInfo->vgroupIndex, totalVgroups,
pRes->numOfClauseTotal);
tscTrace("%p results from vgroup index:%d completed, try next vgroup:%d. total vgroups:%d. current numOfRes:%d",
pSql, pTableMetaInfo->vgroupIndex - 1, pTableMetaInfo->vgroupIndex, totalVgroups,
pRes->numOfClauseTotal);
pCmd->command = TSDB_SQL_SELECT;
tscResetForNextRetrieve(&pSql->res);
pCmd->command = TSDB_SQL_SELECT;
tscResetForNextRetrieve(&pSql->res);
assert(pSupporter->f == NULL);
getTmpfilePath("ts-join", pSupporter->path);
pSupporter->f = fopen(pSupporter->path, "w");
pRes->row = pRes->numOfRows;
assert(pSupporter->f == NULL);
getTmpfilePath("ts-join", pSupporter->path);
// TODO check for failure
pSupporter->f = fopen(pSupporter->path, "w");
pRes->row = pRes->numOfRows;
// set the callback function
pSql->fp = tscJoinQueryCallback;
tscProcessSql(pSql);
return;
} else {
tSIntersectionAndLaunchSecQuery(pSupporter, pSql);
}
// set the callback function
pSql->fp = tscJoinQueryCallback;
tscProcessSql(pSql);
return;
}
} else { // open a new file to save the incoming result
getTmpfilePath("ts-join", pSupporter->path);
pSupporter->f = fopen(pSupporter->path, "w");
pRes->row = pRes->numOfRows;
if (atomic_sub_fetch_32(&pSupporter->pState->numOfRemain, 1) > 0) {
return;
}
taos_fetch_rows_a(tres, joinRetrieveCallback, param);
}
} else { // secondary stage retrieve, driven by taos_fetch_row or other functions
if (numOfRows < 0) {
pSupporter->pState->code = numOfRows;
tscError("%p retrieve failed, code:%s, index:%d", pSql, tstrerror(numOfRows), pSupporter->subqueryIndex);
}
tscTrace("%p all subquery retrieve ts complete, do ts block intersect", pParentSql);
if (numOfRows >= 0) {
pRes->numOfTotal += pRes->numOfRows;
}
// proceeds to launched secondary query to retrieve final data
SJoinSupporter* p1 = pParentSql->pSubs[0]->param;
SJoinSupporter* p2 = pParentSql->pSubs[1]->param;
if (tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0) && numOfRows == 0) {
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
assert(pQueryInfo->numOfTables == 1);
STimeWindow win = TSWINDOW_INITIALIZER;
int64_t num = doTSBlockIntersect(pParentSql, p1, p2, &win);
if (num <= 0) { // no result during ts intersect
tscTrace("%p no results generated in ts intersection, free all sub SqlObj and quit", pParentSql);
freeJoinSubqueryObj(pParentSql);
return;
}
// launch the query the retrieve actual results from vnode along with the filtered timestamp
SQueryInfo* pPQueryInfo = tscGetQueryInfoDetail(&pParentSql->cmd, pParentSql->cmd.clauseIndex);
updateQueryTimeRange(pPQueryInfo, &win);
tscLaunchSecondPhaseSubqueries(pParentSql);
}
// for projection query, need to try next vnode if current vnode is exhausted
if ((++pTableMetaInfo->vgroupIndex) < pTableMetaInfo->vgroupList->numOfVgroups) {
pSupporter->pState->numOfCompleted = 0;
pSupporter->pState->numOfTotal = 1;
static void joinRetrieveFinalResCallback(void* param, TAOS_RES* tres, int numOfRows) {
SJoinSupporter* pSupporter = (SJoinSupporter*)param;
pSql->cmd.command = TSDB_SQL_SELECT;
pSql->fp = tscJoinQueryCallback;
tscProcessSql(pSql);
SSqlObj* pParentSql = pSupporter->pObj;
SSubqueryState* pState = pSupporter->pState;
return;
}
}
SSqlObj* pSql = (SSqlObj*)tres;
SSqlCmd* pCmd = &pSql->cmd;
SSqlRes* pRes = &pSql->res;
int32_t numOfTotal = pSupporter->pState->numOfTotal;
int32_t finished = atomic_add_fetch_32(&pSupporter->pState->numOfCompleted, 1);
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
if (finished >= numOfTotal) {
assert(finished == numOfTotal);
tscTrace("%p all %d secondary subquery retrieves completed, global code:%d", tres, numOfTotal,
pParentSql->res.code);
// TODO put to async res?
if (taos_errno(pSql) != TSDB_CODE_SUCCESS) {
assert(numOfRows == taos_errno(pSql));
if (pSupporter->pState->code != TSDB_CODE_SUCCESS) {
pParentSql->res.code = pSupporter->pState->code;
freeJoinSubqueryObj(pParentSql);
pParentSql->res.completed = true;
}
pParentSql->res.code = numOfRows;
tscError("%p retrieve failed, index:%d, code:%s", pSql, pSupporter->subqueryIndex, tstrerror(numOfRows));
}
// update the records for each subquery in parent sql object.
for (int32_t i = 0; i < pParentSql->numOfSubs; ++i) {
if (pParentSql->pSubs[i] == NULL) {
continue;
}
if (numOfRows >= 0) {
pRes->numOfTotal += pRes->numOfRows;
}
SSqlRes* pRes1 = &pParentSql->pSubs[i]->res;
pRes1->numOfClauseTotal += pRes1->numOfRows;
}
if (tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0) && numOfRows == 0) {
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
assert(pQueryInfo->numOfTables == 1);
// data has retrieved to client, build the join results
tscBuildResFromSubqueries(pParentSql);
} else {
tscTrace("%p sub:%p completed, completed:%d, total:%d", pParentSql, tres, finished, numOfTotal);
// for projection query, need to try next vnode if current vnode is exhausted
if ((++pTableMetaInfo->vgroupIndex) < pTableMetaInfo->vgroupList->numOfVgroups) {
pState->numOfRemain = 1;
pState->numOfTotal = 1;
pSql->cmd.command = TSDB_SQL_SELECT;
pSql->fp = tscJoinQueryCallback;
tscProcessSql(pSql);
return;
}
}
if (atomic_sub_fetch_32(&pState->numOfRemain, 1) > 0) {
tscTrace("%p sub:%p completed, remain:%d, total:%d", pParentSql, tres, pState->numOfRemain, pState->numOfTotal);
return;
}
tscTrace("%p all %d secondary subqueries retrieval completed, code:%d", tres, pState->numOfTotal, pParentSql->res.code);
if (pParentSql->res.code != TSDB_CODE_SUCCESS) {
freeJoinSubqueryObj(pParentSql);
pParentSql->res.completed = true;
}
// update the records for each subquery in parent sql object.
for (int32_t i = 0; i < pParentSql->numOfSubs; ++i) {
if (pParentSql->pSubs[i] == NULL) {
continue;
}
SSqlRes* pRes1 = &pParentSql->pSubs[i]->res;
pRes1->numOfClauseTotal += pRes1->numOfRows;
}
// data has retrieved to client, build the join results
tscBuildResFromSubqueries(pParentSql);
}
static SJoinSupporter* tscUpdateSubqueryStatus(SSqlObj* pSql, int32_t numOfFetch) {
......@@ -902,7 +933,7 @@ static SJoinSupporter* tscUpdateSubqueryStatus(SSqlObj* pSql, int32_t numOfFetch
}
pState->numOfTotal = pSql->numOfSubs;
pState->numOfCompleted = pSql->numOfSubs - numOfFetch;
pState->numOfRemain = numOfFetch;
return pSupporter;
}
......@@ -990,7 +1021,7 @@ void tscFetchDatablockFromSubquery(SSqlObj* pSql) {
pSupporter->subqueryIndex, pTableMetaInfo->vgroupIndex);
tscResetForNextRetrieve(pRes1);
pSql1->fp = joinRetrieveCallback;
pSql1->fp = joinRetrieveFinalResCallback;
if (pCmd1->command < TSDB_SQL_LOCAL) {
pCmd1->command = (pCmd1->command > TSDB_SQL_MGMT) ? TSDB_SQL_RETRIEVE : TSDB_SQL_FETCH;
......@@ -1008,8 +1039,9 @@ void tscSetupOutputColumnIndex(SSqlObj* pSql) {
tscTrace("%p all subquery response, retrieve data", pSql);
// the column transfer support struct has been built
if (pRes->pColumnIndex != NULL) {
return; // the column transfer support struct has been built
return;
}
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
......@@ -1045,68 +1077,76 @@ void tscSetupOutputColumnIndex(SSqlObj* pSql) {
void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code) {
SSqlObj* pSql = (SSqlObj*)tres;
SJoinSupporter* pSupporter = (SJoinSupporter*)param;
SSqlObj* pParentSql = pSupporter->pObj;
// There is only one subquery and table for each subquery.
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
assert(pQueryInfo->numOfTables == 1 && pSql->cmd.numOfClause == 1);
if (!TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_JOIN_SEC_STAGE)) {
if (code != TSDB_CODE_SUCCESS) { // direct call joinRetrieveCallback and set the error code
joinRetrieveCallback(param, pSql, code);
} else { // first stage query, continue to retrieve compressed time stamp data
pSql->fp = joinRetrieveCallback;
pSql->cmd.command = TSDB_SQL_FETCH;
tscProcessSql(pSql);
}
} else { // second stage join subquery
SSqlObj* pParentSql = pSupporter->pObj;
// retrieve actual query results from vnode during the second stage join subquery
if (pParentSql->res.code != TSDB_CODE_SUCCESS) {
tscError("%p abort query due to other subquery failure. code:%d, global code:%d", pSql, code, pParentSql->res.code);
quitAllSubquery(pParentSql, pSupporter);
tscQueueAsyncRes(pParentSql);
if (pSupporter->pState->code != TSDB_CODE_SUCCESS) {
tscError("%p abort query due to other subquery failure. code:%d, global code:%d", pSql, code,
pSupporter->pState->code);
quitAllSubquery(pParentSql, pSupporter);
return;
}
return;
}
// TODO here retry is required, not directly returns to client
if (taos_errno(pSql) != TSDB_CODE_SUCCESS) {
assert(taos_errno(pSql) == code);
if (code != TSDB_CODE_SUCCESS) {
tscError("%p sub query failed, code:%s, set global code:%s, index:%d", pSql, tstrerror(code), tstrerror(code),
pSupporter->subqueryIndex);
pSupporter->pState->code = code;
quitAllSubquery(pParentSql, pSupporter);
tscError("%p abort query, code:%d, global code:%d", pSql, code, pParentSql->res.code);
pParentSql->res.code = code;
quitAllSubquery(pParentSql, pSupporter);
tscQueueAsyncRes(pParentSql);
return;
}
// retrieve <tid, tag> tuples from vnode
if (TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_TAG_FILTER_QUERY)) {
pSql->fp = tidTagRetrieveCallback;
pSql->cmd.command = TSDB_SQL_FETCH;
tscProcessSql(pSql);
return;
}
// retrieve ts_comp info from vnode
if (!TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_JOIN_SEC_STAGE)) {
pSql->fp = tsCompRetrieveCallback;
pSql->cmd.command = TSDB_SQL_FETCH;
tscProcessSql(pSql);
return;
}
// wait for the other subqueries response from vnode
if (atomic_sub_fetch_32(&pSupporter->pState->numOfRemain, 1) > 0) {
return;
}
tscSetupOutputColumnIndex(pParentSql);
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
/**
* if the query is a continue query (vgroupIndex > 0 for projection query) for next vnode, do the retrieval of
* data instead of returning to its invoker
*/
if (pTableMetaInfo->vgroupIndex > 0 && tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0)) {
pSupporter->pState->numOfRemain = pSupporter->pState->numOfTotal; // reset the record value
pSql->fp = joinRetrieveFinalResCallback; // continue retrieve data
pSql->cmd.command = TSDB_SQL_FETCH;
tscProcessSql(pSql);
} else { // first retrieve from vnode during the secondary stage sub-query
// set the command flag must be after the semaphore been correctly set.
if (pParentSql->res.code == TSDB_CODE_SUCCESS) {
(*pParentSql->fp)(pParentSql->param, pParentSql, 0);
} else {
int32_t numOfTotal = pSupporter->pState->numOfTotal;
int32_t finished = atomic_add_fetch_32(&pSupporter->pState->numOfCompleted, 1);
if (finished >= numOfTotal) {
assert(finished == numOfTotal);
tscSetupOutputColumnIndex(pParentSql);
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
/**
* if the query is a continue query (vgroupIndex > 0 for projection query) for next vnode, do the retrieval of
* data instead of returning to its invoker
*/
if (pTableMetaInfo->vgroupIndex > 0 && tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0)) {
pSupporter->pState->numOfCompleted = 0; // reset the record value
pSql->fp = joinRetrieveCallback; // continue retrieve data
pSql->cmd.command = TSDB_SQL_FETCH;
tscProcessSql(pSql);
} else { // first retrieve from vnode during the secondary stage sub-query
// set the command flag must be after the semaphore been correctly set.
if (pParentSql->res.code == TSDB_CODE_SUCCESS) {
(*pParentSql->fp)(pParentSql->param, pParentSql, 0);
} else {
tscQueueAsyncRes(pParentSql);
}
}
}
tscQueueAsyncRes(pParentSql);
}
}
}
......@@ -1257,14 +1297,15 @@ int32_t tscHandleMasterJoinQuery(SSqlObj* pSql) {
SSubqueryState *pState = calloc(1, sizeof(SSubqueryState));
pState->numOfTotal = pQueryInfo->numOfTables;
pState->numOfRemain = pState->numOfTotal;
tscTrace("%p start launch subquery, total:%d", pSql, pQueryInfo->numOfTables);
for (int32_t i = 0; i < pQueryInfo->numOfTables; ++i) {
SJoinSupporter *pSupporter = tscCreateJoinSupporter(pSql, pState, i);
if (pSupporter == NULL) { // failed to create support struct, abort current query
tscError("%p tableIndex:%d, failed to allocate join support object, abort further query", pSql, i);
pState->numOfCompleted = pQueryInfo->numOfTables - i - 1;
pState->numOfRemain = i;
pSql->res.code = TSDB_CODE_TSC_OUT_OF_MEMORY;
return pSql->res.code;
......@@ -1342,6 +1383,8 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) {
tscTrace("%p retrieved query data from %d vnode(s)", pSql, pSql->numOfSubs);
SSubqueryState *pState = calloc(1, sizeof(SSubqueryState));
pState->numOfTotal = pSql->numOfSubs;
pState->numOfRemain = pSql->numOfSubs;
pRes->code = TSDB_CODE_SUCCESS;
int32_t i = 0;
......@@ -1436,7 +1479,7 @@ static void tscFreeSubSqlObj(SRetrieveSupport *trsupport, SSqlObj *pSql) {
static void tscRetrieveFromDnodeCallBack(void *param, TAOS_RES *tres, int numOfRows);
static void tscHandleSubqueryError(SRetrieveSupport *trsupport, SSqlObj *pSql, int numOfRows);
static void tscAbortFurtherRetryRetrieval(SRetrieveSupport *trsupport, TAOS_RES *tres, int32_t errCode) {
static void tscAbortFurtherRetryRetrieval(SRetrieveSupport *trsupport, TAOS_RES *tres, int32_t code) {
// set no disk space error info
#ifdef WINDOWS
LPVOID lpMsgBuf;
......@@ -1448,44 +1491,44 @@ static void tscAbortFurtherRetryRetrieval(SRetrieveSupport *trsupport, TAOS_RES
#else
tscError("sub:%p failed to flush data to disk:reason:%s", tres, strerror(errno));
#endif
trsupport->pState->code = -errCode;
SSqlObj* pParentSql = trsupport->pParentSqlObj;
pParentSql->res.code = code;
trsupport->numOfRetry = MAX_NUM_OF_SUBQUERY_RETRY;
pthread_mutex_unlock(&trsupport->queryMutex);
tscHandleSubqueryError(trsupport, tres, trsupport->pState->code);
tscHandleSubqueryError(trsupport, tres, pParentSql->res.code);
}
void tscHandleSubqueryError(SRetrieveSupport *trsupport, SSqlObj *pSql, int numOfRows) {
SSqlObj *pPObj = trsupport->pParentSqlObj;
SSqlObj *pParentSql = trsupport->pParentSqlObj;
int32_t subqueryIndex = trsupport->subqueryIndex;
assert(pSql != NULL);
SSubqueryState* pState = trsupport->pState;
assert(pState->numOfCompleted < pState->numOfTotal && pState->numOfCompleted >= 0 &&
pPObj->numOfSubs == pState->numOfTotal);
assert(pState->numOfRemain <= pState->numOfTotal && pState->numOfRemain >= 0 && pParentSql->numOfSubs == pState->numOfTotal);
// retrieved in subquery failed. OR query cancelled in retrieve phase.
if (pState->code == TSDB_CODE_SUCCESS && pPObj->res.code != TSDB_CODE_SUCCESS) {
pState->code = pPObj->res.code;
if (taos_errno(pSql) == TSDB_CODE_SUCCESS && pParentSql->res.code != TSDB_CODE_SUCCESS) {
/*
* kill current sub-query connection, which may retrieve data from vnodes;
* Here we get: pPObj->res.code == TSDB_CODE_TSC_QUERY_CANCELLED
*/
pSql->res.numOfRows = 0;
trsupport->numOfRetry = MAX_NUM_OF_SUBQUERY_RETRY; // disable retry efforts
tscTrace("%p query is cancelled, sub:%p, orderOfSub:%d abort retrieve, code:%d", trsupport->pParentSqlObj, pSql,
subqueryIndex, pState->code);
tscTrace("%p query is cancelled, sub:%p, orderOfSub:%d abort retrieve, code:%d", pParentSql, pSql,
subqueryIndex, pParentSql->res.code);
}
if (numOfRows >= 0) { // current query is successful, but other sub query failed, still abort current query.
tscTrace("%p sub:%p retrieve numOfRows:%d,orderOfSub:%d", pPObj, pSql, numOfRows, subqueryIndex);
tscError("%p sub:%p abort further retrieval due to other queries failure,orderOfSub:%d,code:%d", pPObj, pSql,
subqueryIndex, pState->code);
tscTrace("%p sub:%p retrieve numOfRows:%d,orderOfSub:%d", pParentSql, pSql, numOfRows, subqueryIndex);
tscError("%p sub:%p abort further retrieval due to other queries failure,orderOfSub:%d,code:%d", pParentSql, pSql,
subqueryIndex, pParentSql->res.code);
} else {
if (trsupport->numOfRetry++ < MAX_NUM_OF_SUBQUERY_RETRY && pState->code == TSDB_CODE_SUCCESS) {
if (trsupport->numOfRetry++ < MAX_NUM_OF_SUBQUERY_RETRY && pParentSql->res.code == TSDB_CODE_SUCCESS) {
/*
* current query failed, and the retry count is less than the available
* count, retry query clear previous retrieved data, then launch a new sub query
......@@ -1503,8 +1546,8 @@ void tscHandleSubqueryError(SRetrieveSupport *trsupport, SSqlObj *pSql, int numO
if (pNew == NULL) {
tscError("%p sub:%p failed to create new subquery sqlObj due to out of memory, abort retry",
trsupport->pParentSqlObj, pSql);
pState->code = TSDB_CODE_TSC_OUT_OF_MEMORY;
pParentSql->res.code = TSDB_CODE_TSC_OUT_OF_MEMORY;
trsupport->numOfRetry = MAX_NUM_OF_SUBQUERY_RETRY;
return;
}
......@@ -1512,24 +1555,24 @@ void tscHandleSubqueryError(SRetrieveSupport *trsupport, SSqlObj *pSql, int numO
tscProcessSql(pNew);
return;
} else { // reach the maximum retry count, abort
atomic_val_compare_exchange_32(&pState->code, TSDB_CODE_SUCCESS, numOfRows);
tscError("%p sub:%p retrieve failed,code:%s,orderOfSub:%d failed.no more retry,set global code:%s", pPObj, pSql,
tstrerror(numOfRows), subqueryIndex, tstrerror(pState->code));
atomic_val_compare_exchange_32(&pParentSql->res.code, TSDB_CODE_SUCCESS, numOfRows);
tscError("%p sub:%p retrieve failed,code:%s,orderOfSub:%d failed.no more retry,set global code:%s", pParentSql, pSql,
tstrerror(numOfRows), subqueryIndex, tstrerror(pParentSql->res.code));
}
}
int32_t numOfTotal = pState->numOfTotal;
int32_t finished = atomic_add_fetch_32(&pState->numOfCompleted, 1);
if (finished < numOfTotal) {
tscTrace("%p sub:%p orderOfSub:%d freed, finished subqueries:%d", pPObj, pSql, trsupport->subqueryIndex, finished);
int32_t remain = -1;
if ((remain = atomic_sub_fetch_32(&pState->numOfRemain, 1)) > 0) {
tscTrace("%p sub:%p orderOfSub:%d freed, finished subqueries:%d", pParentSql, pSql, trsupport->subqueryIndex,
pState->numOfTotal - remain);
return tscFreeSubSqlObj(trsupport, pSql);
}
// all subqueries are failed
tscError("%p retrieve from %d vnode(s) completed,code:%s.FAILED.", pPObj, pState->numOfTotal, tstrerror(pState->code));
pPObj->res.code = pState->code;
tscError("%p retrieve from %d vnode(s) completed,code:%s.FAILED.", pParentSql, pState->numOfTotal,
tstrerror(pParentSql->res.code));
// release allocated resource
tscLocalReducerEnvDestroy(trsupport->pExtMemBuffer, trsupport->pOrderDescriptor, trsupport->pFinalColModel,
pState->numOfTotal);
......@@ -1538,13 +1581,13 @@ void tscHandleSubqueryError(SRetrieveSupport *trsupport, SSqlObj *pSql, int numO
tscFreeSubSqlObj(trsupport, pSql);
// in case of second stage join subquery, invoke its callback function instead of regular QueueAsyncRes
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pPObj->cmd, 0);
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pParentSql->cmd, 0);
if (!TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_JOIN_SEC_STAGE)) {
(*pPObj->fp)(pPObj->param, pPObj, pPObj->res.code);
(*pParentSql->fp)(pParentSql->param, pParentSql, pParentSql->res.code);
} else { // regular super table query
if (pPObj->res.code != TSDB_CODE_SUCCESS) {
tscQueueAsyncRes(pPObj);
if (pParentSql->res.code != TSDB_CODE_SUCCESS) {
tscQueueAsyncRes(pParentSql);
}
}
}
......@@ -1590,14 +1633,11 @@ static void tscAllDataRetrievedFromDnode(SRetrieveSupport *trsupport, SSqlObj* p
return tscAbortFurtherRetryRetrieval(trsupport, pSql, TSDB_CODE_TSC_NO_DISKSPACE);
}
// keep this value local variable, since the pState variable may be released by other threads, if atomic_add opertion
// increases the finished value up to pState->numOfTotal value, which means all subqueries are completed.
// In this case, the comparsion between finished value and released pState->numOfTotal is not safe.
int32_t numOfTotal = pState->numOfTotal;
int32_t finished = atomic_add_fetch_32(&pState->numOfCompleted, 1);
if (finished < numOfTotal) {
tscTrace("%p sub:%p orderOfSub:%d freed, finished subqueries:%d", pPObj, pSql, trsupport->subqueryIndex, finished);
int32_t remain = -1;
if ((remain = atomic_sub_fetch_32(&pState->numOfRemain, 1)) > 0) {
tscTrace("%p sub:%p orderOfSub:%d freed, finished subqueries:%d", pPObj, pSql, trsupport->subqueryIndex,
pState->numOfTotal - remain);
return tscFreeSubSqlObj(trsupport, pSql);
}
......@@ -1632,10 +1672,10 @@ static void tscAllDataRetrievedFromDnode(SRetrieveSupport *trsupport, SSqlObj* p
static void tscRetrieveFromDnodeCallBack(void *param, TAOS_RES *tres, int numOfRows) {
SRetrieveSupport *trsupport = (SRetrieveSupport *)param;
int32_t idx = trsupport->subqueryIndex;
SSqlObj * pPObj = trsupport->pParentSqlObj;
tOrderDescriptor *pDesc = trsupport->pOrderDescriptor;
int32_t idx = trsupport->subqueryIndex;
SSqlObj * pPObj = trsupport->pParentSqlObj;
SSqlObj *pSql = (SSqlObj *)tres;
if (pSql == NULL) { // sql object has been released in error process, return immediately
tscTrace("%p subquery has been released, idx:%d, abort", pPObj, idx);
......@@ -1643,13 +1683,12 @@ static void tscRetrieveFromDnodeCallBack(void *param, TAOS_RES *tres, int numOfR
}
SSubqueryState* pState = trsupport->pState;
assert(pState->numOfCompleted < pState->numOfTotal && pState->numOfCompleted >= 0 &&
pPObj->numOfSubs == pState->numOfTotal);
assert(pState->numOfRemain <= pState->numOfTotal && pState->numOfRemain >= 0 && pPObj->numOfSubs == pState->numOfTotal);
// query process and cancel query process may execute at the same time
pthread_mutex_lock(&trsupport->queryMutex);
if (numOfRows < 0 || pState->code < 0 || pPObj->res.code != TSDB_CODE_SUCCESS) {
if (numOfRows < 0 || pPObj->res.code != TSDB_CODE_SUCCESS) {
return tscHandleSubqueryError(trsupport, pSql, numOfRows);
}
......@@ -1738,18 +1777,16 @@ void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code) {
SCMVgroupInfo* pVgroup = &pTableMetaInfo->vgroupList->vgroups[0];
SSubqueryState* pState = trsupport->pState;
assert(pState->numOfCompleted < pState->numOfTotal && pState->numOfCompleted >= 0 &&
pParentSql->numOfSubs == pState->numOfTotal);
if (pParentSql->res.code != TSDB_CODE_SUCCESS || pState->code != TSDB_CODE_SUCCESS) {
assert(pState->numOfRemain <= pState->numOfTotal && pState->numOfRemain >= 0 && pParentSql->numOfSubs == pState->numOfTotal);
// todo set error code
if (pParentSql->res.code != TSDB_CODE_SUCCESS) {
// stable query is killed, abort further retry
trsupport->numOfRetry = MAX_NUM_OF_SUBQUERY_RETRY;
if (pParentSql->res.code != TSDB_CODE_SUCCESS) {
code = pParentSql->res.code;
} else {
code = pState->code;
}
tscTrace("%p query cancelled or failed, sub:%p, orderOfSub:%d abort, code:%s", pParentSql, pSql,
......@@ -1766,7 +1803,7 @@ void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code) {
if (code != TSDB_CODE_SUCCESS) {
if (trsupport->numOfRetry++ >= MAX_NUM_OF_SUBQUERY_RETRY) {
tscTrace("%p sub:%p reach the max retry times, set global code:%s", pParentSql, pSql, tstrerror(code));
atomic_val_compare_exchange_32(&pState->code, 0, code);
atomic_val_compare_exchange_32(&pParentSql->res.code, TSDB_CODE_SUCCESS, code);
} else { // does not reach the maximum retry time, go on
tscTrace("%p sub:%p failed code:%s, retry:%d", pParentSql, pSql, tstrerror(code), trsupport->numOfRetry);
......@@ -1775,8 +1812,8 @@ void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code) {
if (pNew == NULL) {
tscError("%p sub:%p failed to create new subquery due to out of memory, abort retry, vgId:%d, orderOfSub:%d",
trsupport->pParentSqlObj, pSql, pVgroup->vgId, trsupport->subqueryIndex);
pState->code = TSDB_CODE_TSC_OUT_OF_MEMORY;
pParentSql->res.code = TSDB_CODE_TSC_OUT_OF_MEMORY;
trsupport->numOfRetry = MAX_NUM_OF_SUBQUERY_RETRY;
} else {
SQueryInfo *pNewQueryInfo = tscGetQueryInfoDetail(&pNew->cmd, 0);
......@@ -1789,11 +1826,11 @@ void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code) {
}
}
if (pState->code != TSDB_CODE_SUCCESS) { // at least one peer subquery failed, abort current query
if (pParentSql->res.code != TSDB_CODE_SUCCESS) { // at least one peer subquery failed, abort current query
tscTrace("%p sub:%p query failed,ip:%u,vgId:%d,orderOfSub:%d,global code:%d", pParentSql, pSql,
pVgroup->ipAddr[0].fqdn, pVgroup->vgId, trsupport->subqueryIndex, pState->code);
pVgroup->ipAddr[0].fqdn, pVgroup->vgId, trsupport->subqueryIndex, pParentSql->res.code);
tscHandleSubqueryError(param, tres, pState->code);
tscHandleSubqueryError(param, tres, pParentSql->res.code);
} else { // success, proceed to retrieve data from dnode
tscTrace("%p sub:%p query complete, ip:%s, vgId:%d, orderOfSub:%d, retrieve data", trsupport->pParentSqlObj, pSql,
pVgroup->ipAddr[0].fqdn, pVgroup->vgId, trsupport->subqueryIndex);
......@@ -1813,8 +1850,7 @@ static void multiVnodeInsertMerge(void* param, TAOS_RES* tres, int numOfRows) {
SSqlCmd* pParentCmd = &pParentObj->cmd;
SSubqueryState* pState = pSupporter->pState;
int32_t total = pState->numOfTotal;
// increase the total inserted rows
if (numOfRows > 0) {
pParentObj->res.numOfRows += numOfRows;
......@@ -1826,8 +1862,7 @@ static void multiVnodeInsertMerge(void* param, TAOS_RES* tres, int numOfRows) {
}
taos_free_result(tres);
int32_t completed = atomic_add_fetch_32(&pState->numOfCompleted, 1);
if (completed < total) {
if (atomic_sub_fetch_32(&pState->numOfRemain, 1) > 0) {
return;
}
......@@ -2157,5 +2192,3 @@ static bool tscHasRemainDataInSubqueryResultSet(SSqlObj *pSql) {
return hasData;
}
......@@ -1216,7 +1216,6 @@ static int32_t tableApplyFunctionsOnBlock(SQueryRuntimeEnv *pRuntimeEnv, SDataBl
// interval query with limit applied
int32_t numOfRes = 0;
if (isIntervalQuery(pQuery)) {
numOfRes = doCheckQueryCompleted(pRuntimeEnv, lastKey, pWindowResInfo);
} else {
......
......@@ -15,7 +15,6 @@
#include <stdint.h>
#include <pthread.h>
#include <errno.h>
#include <stdlib.h>
#include <string.h>
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册