提交 82706e1b 编写于 作者: H hjxilinx

add the union support in sql parser: fix bugs and refactor some codes. #1032. [TBASE-1140]

上级 d99b94a5
...@@ -238,6 +238,11 @@ void sortRemoveDuplicates(STableDataBlocks* dataBuf); ...@@ -238,6 +238,11 @@ void sortRemoveDuplicates(STableDataBlocks* dataBuf);
void tscPrintSelectClause(SSqlCmd* pCmd, int32_t subClauseIndex); void tscPrintSelectClause(SSqlCmd* pCmd, int32_t subClauseIndex);
bool hasMoreVnodesToTry(SSqlObj *pSql);
void tscTryQueryNextVnode(SSqlObj *pSql, __async_cb_func_t fp);
void tscAsyncQuerySingleRowForNextVnode(void *param, TAOS_RES *tres, int numOfRows);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -457,6 +457,8 @@ extern int tsInsertHeadSize; ...@@ -457,6 +457,8 @@ extern int tsInsertHeadSize;
extern int tscNumOfThreads; extern int tscNumOfThreads;
extern SIpStrList tscMgmtIpList; extern SIpStrList tscMgmtIpList;
typedef void (*__async_cb_func_t)(void *param, TAOS_RES *tres, int numOfRows);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -17,6 +17,7 @@ ...@@ -17,6 +17,7 @@
#include "taosmsg.h" #include "taosmsg.h"
#include "tast.h" #include "tast.h"
#include "tlog.h" #include "tlog.h"
#include "tscSQLParser.h"
#include "tscSyntaxtreefunction.h" #include "tscSyntaxtreefunction.h"
#include "tschemautil.h" #include "tschemautil.h"
#include "tsdb.h" #include "tsdb.h"
...@@ -26,7 +27,6 @@ ...@@ -26,7 +27,6 @@
#include "tstoken.h" #include "tstoken.h"
#include "ttypes.h" #include "ttypes.h"
#include "tutil.h" #include "tutil.h"
#include "tscSQLParser.h"
/* /*
* *
......
...@@ -26,19 +26,18 @@ ...@@ -26,19 +26,18 @@
#include "tutil.h" #include "tutil.h"
#include "tnote.h" #include "tnote.h"
void tscProcessFetchRow(SSchedMsg *pMsg); static void tscProcessFetchRow(SSchedMsg *pMsg);
void tscProcessAsyncRetrieve(void *param, TAOS_RES *tres, int numOfRows); static void tscAsyncQueryRowsForNextVnode(void *param, TAOS_RES *tres, int numOfRows);
static void tscProcessAsyncRetrieveNextVnode(void *param, TAOS_RES *tres, int numOfRows);
static void tscProcessAsyncContinueRetrieve(void *param, TAOS_RES *tres, int numOfRows);
static void tscProcessAsyncRetrieveImpl(void *param, TAOS_RES *tres, int numOfRows, void (*fp)()); static void tscProcessAsyncRetrieveImpl(void *param, TAOS_RES *tres, int numOfRows, void (*fp)());
/* /*
* proxy function to perform sequentially query&retrieve operation. * Proxy function to perform sequentially query&retrieve operation.
* If sql queries upon metric and two-stage merge procedure is not needed, * If sql queries upon a super table and two-stage merge procedure is not involved (when employ the projection
* it will sequentially query&retrieve data for all vnodes in pCmd->pMetricMeta * query), it will sequentially query&retrieve data for all vnodes
*/ */
static void tscProcessAsyncFetchRowsProxy(void *param, TAOS_RES *tres, int numOfRows); static void tscProcessAsyncFetchRowsProxy(void *param, TAOS_RES *tres, int numOfRows);
static void tscProcessAsyncFetchSingleRowProxy(void *param, TAOS_RES *tres, int numOfRows);
// TODO return the correct error code to client in tscQueueAsyncError // TODO return the correct error code to client in tscQueueAsyncError
void taos_query_a(TAOS *taos, const char *sqlstr, void (*fp)(void *, TAOS_RES *, int), void *param) { void taos_query_a(TAOS *taos, const char *sqlstr, void (*fp)(void *, TAOS_RES *, int), void *param) {
...@@ -118,39 +117,24 @@ static void tscProcessAsyncFetchRowsProxy(void *param, TAOS_RES *tres, int numOf ...@@ -118,39 +117,24 @@ static void tscProcessAsyncFetchRowsProxy(void *param, TAOS_RES *tres, int numOf
SSqlRes *pRes = &pSql->res; SSqlRes *pRes = &pSql->res;
SSqlCmd *pCmd = &pSql->cmd; SSqlCmd *pCmd = &pSql->cmd;
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); if (numOfRows == 0) {
if (hasMoreVnodesToTry(pSql)) { // sequentially retrieve data from remain vnodes.
// sequentially retrieve data from remain vnodes first, query vnode specified by vnodeIdx tscTryQueryNextVnode(pSql, tscAsyncQueryRowsForNextVnode);
if (numOfRows == 0 && tscProjectionQueryOnSTable(pQueryInfo, 0)) { } else {
// vnode is denoted by vnodeIdx, continue to query vnode specified by vnodeIdx /*
SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0); * 1. has reach the limitation
assert(pMeterMetaInfo->vnodeIndex >= 0); * 2. no remain virtual nodes to be retrieved anymore
*/
/* reach the maximum number of output rows, abort */ (*pSql->fetchFp)(param, pSql, 0);
if (pCmd->globalLimit > 0 && pRes->numOfTotal >= pCmd->globalLimit) {
(*pSql->fetchFp)(param, tres, 0);
return;
} }
/* update the limit value according to current retrieval results */
pQueryInfo->limit.limit = pCmd->globalLimit - pRes->numOfTotal;
pQueryInfo->limit.offset = pRes->offset;
if ((++(pMeterMetaInfo->vnodeIndex)) < pMeterMetaInfo->pMetricMeta->numOfVnodes) {
tscTrace("%p retrieve data from next vnode:%d", pSql, pMeterMetaInfo->vnodeIndex);
pSql->cmd.command = TSDB_SQL_SELECT; // reset flag to launch query first.
tscResetForNextRetrieve(pRes);
pSql->fp = tscProcessAsyncRetrieveNextVnode;
tscProcessSql(pSql);
return; return;
} }
} else { // localreducer has handle this situation
// local reducer has handle this situation during super table non-projection query.
if (pCmd->command != TSDB_SQL_RETRIEVE_METRIC) { if (pCmd->command != TSDB_SQL_RETRIEVE_METRIC) {
pRes->numOfTotal += pRes->numOfRows; pRes->numOfTotal += pRes->numOfRows;
} }
}
(*pSql->fetchFp)(param, tres, numOfRows); (*pSql->fetchFp)(param, tres, numOfRows);
} }
...@@ -185,14 +169,18 @@ static void tscProcessAsyncRetrieveImpl(void *param, TAOS_RES *tres, int numOfRo ...@@ -185,14 +169,18 @@ static void tscProcessAsyncRetrieveImpl(void *param, TAOS_RES *tres, int numOfRo
} }
/* /*
* retrieve callback for fetch rows proxy. It serves as the callback function of querying vnode * retrieve callback for fetch rows proxy.
* The below two functions both serve as the callback function of query virtual node.
* query callback first, and then followed by retrieve callback
*/ */
static void tscProcessAsyncRetrieveNextVnode(void *param, TAOS_RES *tres, int numOfRows) { static void tscAsyncQueryRowsForNextVnode(void *param, TAOS_RES *tres, int numOfRows) {
// query completed, continue to retrieve
tscProcessAsyncRetrieveImpl(param, tres, numOfRows, tscProcessAsyncFetchRowsProxy); tscProcessAsyncRetrieveImpl(param, tres, numOfRows, tscProcessAsyncFetchRowsProxy);
} }
static void tscProcessAsyncContinueRetrieve(void *param, TAOS_RES *tres, int numOfRows) { void tscAsyncQuerySingleRowForNextVnode(void *param, TAOS_RES *tres, int numOfRows) {
tscProcessAsyncRetrieveImpl(param, tres, numOfRows, tscProcessAsyncRetrieve); // query completed, continue to retrieve
tscProcessAsyncRetrieveImpl(param, tres, numOfRows, tscProcessAsyncFetchSingleRowProxy);
} }
void taos_fetch_rows_a(TAOS_RES *taosa, void (*fp)(void *, TAOS_RES *, int), void *param) { void taos_fetch_rows_a(TAOS_RES *taosa, void (*fp)(void *, TAOS_RES *, int), void *param) {
...@@ -250,8 +238,12 @@ void taos_fetch_row_a(TAOS_RES *taosa, void (*fp)(void *, TAOS_RES *, TAOS_ROW), ...@@ -250,8 +238,12 @@ void taos_fetch_row_a(TAOS_RES *taosa, void (*fp)(void *, TAOS_RES *, TAOS_ROW),
if (pRes->row >= pRes->numOfRows) { if (pRes->row >= pRes->numOfRows) {
tscResetForNextRetrieve(pRes); tscResetForNextRetrieve(pRes);
pSql->fp = tscProcessAsyncRetrieve; pSql->fp = tscProcessAsyncFetchSingleRowProxy;
if (pCmd->command != TSDB_SQL_RETRIEVE_METRIC && pCmd->command < TSDB_SQL_LOCAL) {
pCmd->command = (pCmd->command > TSDB_SQL_MGMT) ? TSDB_SQL_RETRIEVE : TSDB_SQL_FETCH; pCmd->command = (pCmd->command > TSDB_SQL_MGMT) ? TSDB_SQL_RETRIEVE : TSDB_SQL_FETCH;
}
tscProcessSql(pSql); tscProcessSql(pSql);
} else { } else {
SSchedMsg schedMsg; SSchedMsg schedMsg;
...@@ -263,49 +255,31 @@ void taos_fetch_row_a(TAOS_RES *taosa, void (*fp)(void *, TAOS_RES *, TAOS_ROW), ...@@ -263,49 +255,31 @@ void taos_fetch_row_a(TAOS_RES *taosa, void (*fp)(void *, TAOS_RES *, TAOS_ROW),
} }
} }
void tscProcessAsyncRetrieve(void *param, TAOS_RES *tres, int numOfRows) { void tscProcessAsyncFetchSingleRowProxy(void *param, TAOS_RES *tres, int numOfRows) {
SSqlObj *pSql = (SSqlObj *)tres; SSqlObj *pSql = (SSqlObj *)tres;
SSqlRes *pRes = &pSql->res; SSqlRes *pRes = &pSql->res;
SSqlCmd *pCmd = &pSql->cmd; SSqlCmd *pCmd = &pSql->cmd;
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
if (numOfRows == 0) { if (numOfRows == 0) {
// sequentially retrieve data from remain vnodes. if (hasMoreVnodesToTry(pSql)) { // sequentially retrieve data from remain vnodes.
if (tscProjectionQueryOnSTable(pQueryInfo, 0)) { tscTryQueryNextVnode(pSql, tscAsyncQuerySingleRowForNextVnode);
} else {
/* /*
* vnode is denoted by vnodeIdx, continue to query vnode specified by vnodeIdx till all vnode have been retrieved * 1. has reach the limitation
* 2. no remain virtual nodes to be retrieved anymore
*/ */
SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0);
assert(pMeterMetaInfo->vnodeIndex >= 0);
/* reach the maximum number of output rows, abort */
if (pCmd->globalLimit > 0 && pRes->numOfTotal >= pCmd->globalLimit) {
(*pSql->fetchFp)(pSql->param, pSql, NULL); (*pSql->fetchFp)(pSql->param, pSql, NULL);
return;
} }
/* update the limit value according to current retrieval results */
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
pQueryInfo->limit.limit = pCmd->globalLimit - pRes->numOfTotal;
if ((++pMeterMetaInfo->vnodeIndex) <= pMeterMetaInfo->pMetricMeta->numOfVnodes) {
pSql->cmd.command = TSDB_SQL_SELECT; // reset flag to launch query first.
tscResetForNextRetrieve(pRes);
pSql->fp = tscProcessAsyncContinueRetrieve;
tscProcessSql(pSql);
return; return;
} }
} else {
(*pSql->fetchFp)(pSql->param, pSql, NULL);
}
} else {
for (int i = 0; i < pCmd->numOfCols; ++i) for (int i = 0; i < pCmd->numOfCols; ++i)
pRes->tsrow[i] = TSC_GET_RESPTR_BASE(pRes, pQueryInfo, i, pQueryInfo->order) + pRes->bytes[i] * pRes->row; pRes->tsrow[i] = TSC_GET_RESPTR_BASE(pRes, pQueryInfo, i, pQueryInfo->order) + pRes->bytes[i] * pRes->row;
pRes->row++; pRes->row++;
(*pSql->fetchFp)(pSql->param, pSql, pSql->res.tsrow); (*pSql->fetchFp)(pSql->param, pSql, pSql->res.tsrow);
}
} }
void tscProcessFetchRow(SSchedMsg *pMsg) { void tscProcessFetchRow(SSchedMsg *pMsg) {
...@@ -314,12 +288,12 @@ void tscProcessFetchRow(SSchedMsg *pMsg) { ...@@ -314,12 +288,12 @@ void tscProcessFetchRow(SSchedMsg *pMsg) {
SSqlCmd *pCmd = &pSql->cmd; SSqlCmd *pCmd = &pSql->cmd;
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
assert(pCmd->numOfCols == pQueryInfo->fieldsInfo.numOfOutputCols);
for (int i = 0; i < pCmd->numOfCols; ++i) for (int i = 0; i < pCmd->numOfCols; ++i) {
pRes->tsrow[i] = TSC_GET_RESPTR_BASE(pRes, pQueryInfo, i, pQueryInfo->order) + pRes->bytes[i] * pRes->row; pRes->tsrow[i] = TSC_GET_RESPTR_BASE(pRes, pQueryInfo, i, pQueryInfo->order) + pRes->bytes[i] * pRes->row;
pRes->row++; }
pRes->row++;
(*pSql->fetchFp)(pSql->param, pSql, pRes->tsrow); (*pSql->fetchFp)(pSql->param, pSql, pRes->tsrow);
} }
......
...@@ -164,8 +164,7 @@ static int64_t doTSBlockIntersect(SSqlObj* pSql, SJoinSubquerySupporter* pSuppor ...@@ -164,8 +164,7 @@ static int64_t doTSBlockIntersect(SSqlObj* pSql, SJoinSubquerySupporter* pSuppor
} }
// todo handle failed to create sub query // todo handle failed to create sub query
SJoinSubquerySupporter* tscCreateJoinSupporter(SSqlObj* pSql, SSubqueryState* pState, SJoinSubquerySupporter* tscCreateJoinSupporter(SSqlObj* pSql, SSubqueryState* pState, int32_t index) {
/*int32_t* numOfComplete, int32_t* gc,*/ int32_t index) {
SJoinSubquerySupporter* pSupporter = calloc(1, sizeof(SJoinSubquerySupporter)); SJoinSubquerySupporter* pSupporter = calloc(1, sizeof(SJoinSubquerySupporter));
if (pSupporter == NULL) { if (pSupporter == NULL) {
return NULL; return NULL;
...@@ -175,14 +174,16 @@ SJoinSubquerySupporter* tscCreateJoinSupporter(SSqlObj* pSql, SSubqueryState* pS ...@@ -175,14 +174,16 @@ SJoinSubquerySupporter* tscCreateJoinSupporter(SSqlObj* pSql, SSubqueryState* pS
pSupporter->pState = pState; pSupporter->pState = pState;
pSupporter->subqueryIndex = index; pSupporter->subqueryIndex = index;
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0); SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, pSql->cmd.clauseIndex);
pSupporter->interval = pQueryInfo->nAggTimeInterval; pSupporter->interval = pQueryInfo->nAggTimeInterval;
pSupporter->limit = pQueryInfo->limit; pSupporter->limit = pQueryInfo->limit;
SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfo(&pSql->cmd, 0, index); SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfo(&pSql->cmd, pSql->cmd.clauseIndex, index);
pSupporter->uid = pMeterMetaInfo->pMeterMeta->uid; pSupporter->uid = pMeterMetaInfo->pMeterMeta->uid;
assert (pSupporter->uid != 0);
getTmpfilePath("join-", pSupporter->path); getTmpfilePath("join-", pSupporter->path);
pSupporter->f = fopen(pSupporter->path, "w"); pSupporter->f = fopen(pSupporter->path, "w");
...@@ -236,37 +237,47 @@ int32_t tscLaunchSecondSubquery(SSqlObj* pSql) { ...@@ -236,37 +237,47 @@ int32_t tscLaunchSecondSubquery(SSqlObj* pSql) {
int32_t numOfSub = 0; int32_t numOfSub = 0;
SJoinSubquerySupporter* pSupporter = NULL; SJoinSubquerySupporter* pSupporter = NULL;
for (int32_t i = 0; i < pSql->numOfSubs; ++i) {
pSupporter = pSql->pSubs[i]->param;
pSupporter->pState->numOfCompleted = 0;
/* /*
* If the columns are not involved in the final select clause, the secondary query will not be launched * If the columns are not involved in the final select clause, the secondary query will not be launched
* for the subquery. * for the subquery.
*/ */
SSubqueryState* pState = NULL;
for (int32_t i = 0; i < pSql->numOfSubs; ++i) {
pSupporter = pSql->pSubs[i]->param;
if (pSupporter->exprsInfo.numOfExprs > 0) { if (pSupporter->exprsInfo.numOfExprs > 0) {
++numOfSub; ++numOfSub;
} }
} }
assert(numOfSub > 0);
// scan all subquery, if one sub query has only ts, ignore it // scan all subquery, if one sub query has only ts, ignore it
tscTrace( tscTrace("%p start to launch secondary subqueries, total:%d, only:%d needs to query, others are not retrieve in "
"%p start to launch secondary subqueries, total:%d, only:%d needs to query, others are not retrieve in " "select clause", pSql, pSql->numOfSubs, numOfSub);
"select clause",
pSql, pSql->numOfSubs, numOfSub); /*
* the subqueries that do not actually launch the secondary query to virtual node is set as completed.
*/
pState = pSupporter->pState;
pState->numOfTotal = pSql->numOfSubs;
pState->numOfCompleted = (pSql->numOfSubs - numOfSub);
int32_t j = 0;
for (int32_t i = 0; i < pSql->numOfSubs; ++i) { for (int32_t i = 0; i < pSql->numOfSubs; ++i) {
SSqlObj* pSub = pSql->pSubs[i]; SSqlObj* pSub = pSql->pSubs[i];
pSupporter = pSub->param; pSupporter = pSub->param;
pSupporter->pState->numOfTotal = numOfSub;
if (pSupporter->exprsInfo.numOfExprs == 0) { if (pSupporter->exprsInfo.numOfExprs == 0) {
tscTrace("%p subquery %d, not need to launch query, ignore it", pSql, i);
tscDestroyJoinSupporter(pSupporter); tscDestroyJoinSupporter(pSupporter);
taos_free_result(pSub); tscFreeSqlObj(pSub);
pSql->pSubs[i] = NULL;
continue; continue;
} }
// todo refactor to avoid the memory problem handling
SSqlObj* pNew = createSubqueryObj(pSql, (int16_t)i, tscJoinQueryCallback, pSupporter, NULL); SSqlObj* pNew = createSubqueryObj(pSql, (int16_t)i, tscJoinQueryCallback, pSupporter, NULL);
if (pNew == NULL) { if (pNew == NULL) {
pSql->numOfSubs = i; // revise the number of subquery pSql->numOfSubs = i; // revise the number of subquery
...@@ -279,7 +290,8 @@ int32_t tscLaunchSecondSubquery(SSqlObj* pSql) { ...@@ -279,7 +290,8 @@ int32_t tscLaunchSecondSubquery(SSqlObj* pSql) {
tscClearSubqueryInfo(&pNew->cmd); tscClearSubqueryInfo(&pNew->cmd);
pSql->pSubs[j++] = pNew; pSql->pSubs[i] = pNew;
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pNew->cmd, 0); SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pNew->cmd, 0);
SQueryInfo* pSubQueryInfo = tscGetQueryInfoDetail(&pSub->cmd, 0); SQueryInfo* pSubQueryInfo = tscGetQueryInfoDetail(&pSub->cmd, 0);
...@@ -307,6 +319,7 @@ int32_t tscLaunchSecondSubquery(SSqlObj* pSql) { ...@@ -307,6 +319,7 @@ int32_t tscLaunchSecondSubquery(SSqlObj* pSql) {
// todo refactor function name // todo refactor function name
SQueryInfo* pNewQueryInfo = tscGetQueryInfoDetail(&pNew->cmd, 0); SQueryInfo* pNewQueryInfo = tscGetQueryInfoDetail(&pNew->cmd, 0);
assert(pNew->numOfSubs == 0 && pNew->cmd.numOfClause == 1 && pNewQueryInfo->numOfTables == 1);
tscAddTimestampColumn(pQueryInfo, TSDB_FUNC_TS, 0); tscAddTimestampColumn(pQueryInfo, TSDB_FUNC_TS, 0);
tscFieldInfoCalOffset(pNewQueryInfo); tscFieldInfoCalOffset(pNewQueryInfo);
...@@ -334,12 +347,15 @@ int32_t tscLaunchSecondSubquery(SSqlObj* pSql) { ...@@ -334,12 +347,15 @@ int32_t tscLaunchSecondSubquery(SSqlObj* pSql) {
tscPrintSelectClause(&pNew->cmd, 0); tscPrintSelectClause(&pNew->cmd, 0);
#endif #endif
tscTrace("%p subquery:%p tableIndex:%d, vnodeIdx:%d, type:%d, transfer to ts_comp query to retrieve timestamps, "
"exprInfo:%d, colList:%d, fieldsInfo:%d, name:%s",
pSql, pNew, 0, pMeterMetaInfo->vnodeIndex, pNewQueryInfo->type,
pNewQueryInfo->exprsInfo.numOfExprs, pNewQueryInfo->colList.numOfCols,
pNewQueryInfo->fieldsInfo.numOfOutputCols, pNewQueryInfo->pMeterInfo[0]->name);
tscProcessSql(pNew); tscProcessSql(pNew);
} }
// revise the number of subs
pSql->numOfSubs = j;
return 0; return 0;
} }
...@@ -537,13 +553,35 @@ static void joinRetrieveCallback(void* param, TAOS_RES* tres, int numOfRows) { ...@@ -537,13 +553,35 @@ static void joinRetrieveCallback(void* param, TAOS_RES* tres, int numOfRows) {
} }
} }
static SJoinSubquerySupporter* tscUpdateSubqueryStatus(SSqlObj* pSql, int32_t numOfFetch) {
int32_t notInvolved = 0;
SJoinSubquerySupporter* pSupporter = NULL;
SSubqueryState* pState = NULL;
for(int32_t i = 0; i < pSql->numOfSubs; ++i) {
if (pSql->pSubs[i] == NULL) {
notInvolved++;
} else {
pSupporter = (SJoinSubquerySupporter*)pSql->pSubs[i]->param;
pState = pSupporter->pState;
}
}
pState->numOfTotal = pSql->numOfSubs;
pState->numOfCompleted = pSql->numOfSubs - numOfFetch;
return pSupporter;
}
void tscFetchDatablockFromSubquery(SSqlObj* pSql) { void tscFetchDatablockFromSubquery(SSqlObj* pSql) {
int32_t numOfFetch = 0; int32_t numOfFetch = 0;
assert(pSql->numOfSubs >= 1); assert(pSql->numOfSubs >= 1);
for (int32_t i = 0; i < pSql->numOfSubs; ++i) { for (int32_t i = 0; i < pSql->numOfSubs; ++i) {
if (pSql->pSubs[i] == NULL) { // this subquery does not need to involve in secondary query
continue;
}
SSqlRes *pRes = &pSql->pSubs[i]->res; SSqlRes *pRes = &pSql->pSubs[i]->res;
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->pSubs[i]->cmd, 0); SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->pSubs[i]->cmd, 0);
...@@ -562,18 +600,19 @@ void tscFetchDatablockFromSubquery(SSqlObj* pSql) { ...@@ -562,18 +600,19 @@ void tscFetchDatablockFromSubquery(SSqlObj* pSql) {
} }
if (numOfFetch <= 0) { if (numOfFetch <= 0) {
return ; return;
} }
// TODO multi-vnode retrieve for projection query with limitation has bugs, since the global limiation is not handled // TODO multi-vnode retrieve for projection query with limitation has bugs, since the global limiation is not handled
tscTrace("%p retrieve data from %d subqueries", pSql, numOfFetch); tscTrace("%p retrieve data from %d subqueries", pSql, numOfFetch);
SJoinSubquerySupporter* pSupporter = (SJoinSubquerySupporter*)pSql->pSubs[0]->param; SJoinSubquerySupporter* pSupporter = tscUpdateSubqueryStatus(pSql, numOfFetch);
pSupporter->pState->numOfTotal = numOfFetch; // wait for all subqueries completed
pSupporter->pState->numOfCompleted = 0;
for (int32_t i = 0; i < pSql->numOfSubs; ++i) { for (int32_t i = 0; i < pSql->numOfSubs; ++i) {
SSqlObj* pSql1 = pSql->pSubs[i]; SSqlObj* pSql1 = pSql->pSubs[i];
if (pSql1 == NULL) {
continue;
}
SSqlRes* pRes1 = &pSql1->res; SSqlRes* pRes1 = &pSql1->res;
SSqlCmd* pCmd1 = &pSql1->cmd; SSqlCmd* pCmd1 = &pSql1->cmd;
...@@ -581,9 +620,7 @@ void tscFetchDatablockFromSubquery(SSqlObj* pSql) { ...@@ -581,9 +620,7 @@ void tscFetchDatablockFromSubquery(SSqlObj* pSql) {
pSupporter = (SJoinSubquerySupporter*)pSql1->param; pSupporter = (SJoinSubquerySupporter*)pSql1->param;
// wait for all subqueries completed // wait for all subqueries completed
pSupporter->pState->numOfTotal = numOfFetch;
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd1, 0); SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd1, 0);
assert(pRes1->numOfRows >= 0 && pQueryInfo->numOfTables == 1); assert(pRes1->numOfRows >= 0 && pQueryInfo->numOfTables == 1);
SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0); SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0);
...@@ -605,6 +642,16 @@ void tscFetchDatablockFromSubquery(SSqlObj* pSql) { ...@@ -605,6 +642,16 @@ void tscFetchDatablockFromSubquery(SSqlObj* pSql) {
// wait for all subquery completed // wait for all subquery completed
tsem_wait(&pSql->rspSem); tsem_wait(&pSql->rspSem);
// update the records for each subquery
for(int32_t i = 0; i < pSql->numOfSubs; ++i) {
if (pSql->pSubs[i] == NULL) {
continue;
}
SSqlRes* pRes1 = &pSql->pSubs[i]->res;
pRes1->numOfTotalInCurrentClause += pRes1->numOfRows;
}
} }
// all subqueries return, set the result output index // all subqueries return, set the result output index
...@@ -618,7 +665,7 @@ void tscSetupOutputColumnIndex(SSqlObj* pSql) { ...@@ -618,7 +665,7 @@ void tscSetupOutputColumnIndex(SSqlObj* pSql) {
return; // the column transfer support struct has been built return; // the column transfer support struct has been built
} }
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0); SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
pRes->pColumnIndex = calloc(1, sizeof(SColumnIndex) * pQueryInfo->fieldsInfo.numOfOutputCols); pRes->pColumnIndex = calloc(1, sizeof(SColumnIndex) * pQueryInfo->fieldsInfo.numOfOutputCols);
for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutputCols; ++i) { for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutputCols; ++i) {
...@@ -626,15 +673,15 @@ void tscSetupOutputColumnIndex(SSqlObj* pSql) { ...@@ -626,15 +673,15 @@ void tscSetupOutputColumnIndex(SSqlObj* pSql) {
int32_t tableIndexOfSub = -1; int32_t tableIndexOfSub = -1;
for (int32_t j = 0; j < pQueryInfo->numOfTables; ++j) { for (int32_t j = 0; j < pQueryInfo->numOfTables; ++j) {
SSqlObj* pSub = pSql->pSubs[j]; SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, j);
SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfo(&pSub->cmd, 0, 0);
if (pMeterMetaInfo->pMeterMeta->uid == pExpr->uid) { if (pMeterMetaInfo->pMeterMeta->uid == pExpr->uid) {
tableIndexOfSub = j; tableIndexOfSub = j;
break; break;
} }
} }
assert(tableIndexOfSub >= 0 && tableIndexOfSub < pQueryInfo->numOfTables);
SSqlCmd* pSubCmd = &pSql->pSubs[tableIndexOfSub]->cmd; SSqlCmd* pSubCmd = &pSql->pSubs[tableIndexOfSub]->cmd;
SQueryInfo* pSubQueryInfo = tscGetQueryInfoDetail(pSubCmd, 0); SQueryInfo* pSubQueryInfo = tscGetQueryInfoDetail(pSubCmd, 0);
...@@ -710,7 +757,6 @@ void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code) { ...@@ -710,7 +757,6 @@ void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code) {
if (atomic_add_fetch_32(&pSupporter->pState->numOfCompleted, 1) >= pSupporter->pState->numOfTotal) { if (atomic_add_fetch_32(&pSupporter->pState->numOfCompleted, 1) >= pSupporter->pState->numOfTotal) {
tscSetupOutputColumnIndex(pParentSql); tscSetupOutputColumnIndex(pParentSql);
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0); SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0);
/** /**
......
...@@ -989,7 +989,6 @@ int doParseInsertSql(SSqlObj *pSql, char *str) { ...@@ -989,7 +989,6 @@ int doParseInsertSql(SSqlObj *pSql, char *str) {
pMeterMetaInfo = tscAddEmptyMeterMetaInfo(pQueryInfo); pMeterMetaInfo = tscAddEmptyMeterMetaInfo(pQueryInfo);
} else { } else {
pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0); pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0);
// assert(pQueryInfo->numOfTables == 1);
} }
if ((code = tscAllocPayload(pCmd, TSDB_PAYLOAD_SIZE)) != TSDB_CODE_SUCCESS) { if ((code = tscAllocPayload(pCmd, TSDB_PAYLOAD_SIZE)) != TSDB_CODE_SUCCESS) {
...@@ -1053,6 +1052,8 @@ int doParseInsertSql(SSqlObj *pSql, char *str) { ...@@ -1053,6 +1052,8 @@ int doParseInsertSql(SSqlObj *pSql, char *str) {
} }
void *fp = pSql->fp; void *fp = pSql->fp;
ptrdiff_t pos = pSql->asyncTblPos - pSql->sqlstr;
if ((code = tscCheckIfCreateTable(&str, pSql)) != TSDB_CODE_SUCCESS) { if ((code = tscCheckIfCreateTable(&str, pSql)) != TSDB_CODE_SUCCESS) {
/* /*
* For async insert, after get the metermeta from server, the sql string will not be * For async insert, after get the metermeta from server, the sql string will not be
...@@ -1062,7 +1063,7 @@ int doParseInsertSql(SSqlObj *pSql, char *str) { ...@@ -1062,7 +1063,7 @@ int doParseInsertSql(SSqlObj *pSql, char *str) {
*/ */
if (fp != NULL) { if (fp != NULL) {
if (TSDB_CODE_ACTION_IN_PROGRESS == code) { if (TSDB_CODE_ACTION_IN_PROGRESS == code) {
tscTrace("async insert and waiting to get meter meta, then continue parse sql: %s", pSql->asyncTblPos); tscTrace("async insert and waiting to get meter meta, then continue parse sql from offset: %" PRId64, pos);
return code; return code;
} }
...@@ -1277,11 +1278,8 @@ int tsParseInsertSql(SSqlObj *pSql) { ...@@ -1277,11 +1278,8 @@ int tsParseInsertSql(SSqlObj *pSql) {
SQueryInfo *pQueryInfo = NULL; SQueryInfo *pQueryInfo = NULL;
tscGetQueryInfoDetailSafely(pCmd, pCmd->clauseIndex, &pQueryInfo); tscGetQueryInfoDetailSafely(pCmd, pCmd->clauseIndex, &pQueryInfo);
if (sToken.type == TK_INSERT) { uint16_t type = (sToken.type == TK_INSERT)? TSDB_QUERY_TYPE_INSERT:TSDB_QUERY_TYPE_IMPORT;
TSDB_QUERY_SET_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_INSERT); TSDB_QUERY_SET_TYPE(pQueryInfo->type, type);
} else {
TSDB_QUERY_SET_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_IMPORT);
}
sToken = tStrGetToken(pSql->sqlstr, &index, false, 0, NULL); sToken = tStrGetToken(pSql->sqlstr, &index, false, 0, NULL);
if (sToken.type != TK_INTO) { if (sToken.type != TK_INTO) {
......
...@@ -42,12 +42,6 @@ typedef struct SColumnList { ...@@ -42,12 +42,6 @@ typedef struct SColumnList {
SColumnIndex ids[TSDB_MAX_COLUMNS]; SColumnIndex ids[TSDB_MAX_COLUMNS];
} SColumnList; } SColumnList;
typedef struct SColumnIdListRes {
SSchema* pSchema;
int32_t numOfCols;
SColumnList list;
} SColumnIdListRes;
static SSqlExpr* doAddProjectCol(SQueryInfo* pQueryInfo, int32_t outputIndex, int32_t colIdx, int32_t tableIndex); static SSqlExpr* doAddProjectCol(SQueryInfo* pQueryInfo, int32_t outputIndex, int32_t colIdx, int32_t tableIndex);
static int32_t setShowInfo(SSqlObj* pSql, SSqlInfo* pInfo); static int32_t setShowInfo(SSqlObj* pSql, SSqlInfo* pInfo);
...@@ -518,6 +512,8 @@ int32_t tscToSQLCmd(SSqlObj* pSql, struct SSqlInfo* pInfo) { ...@@ -518,6 +512,8 @@ int32_t tscToSQLCmd(SSqlObj* pSql, struct SSqlInfo* pInfo) {
if ((code = doCheckForQuery(pSql, pQuerySql, i)) != TSDB_CODE_SUCCESS) { if ((code = doCheckForQuery(pSql, pQuerySql, i)) != TSDB_CODE_SUCCESS) {
return code; return code;
} }
tscPrintSelectClause(pCmd, i);
} }
// set the command/global limit parameters from the first subclause to the sqlcmd object // set the command/global limit parameters from the first subclause to the sqlcmd object
...@@ -1078,9 +1074,10 @@ static void extractColumnNameFromString(tSQLExprItem* pItem) { ...@@ -1078,9 +1074,10 @@ static void extractColumnNameFromString(tSQLExprItem* pItem) {
int32_t parseSelectClause(SSqlCmd* pCmd, int32_t clauseIndex, tSQLExprList* pSelection, bool isSTable) { int32_t parseSelectClause(SSqlCmd* pCmd, int32_t clauseIndex, tSQLExprList* pSelection, bool isSTable) {
assert(pSelection != NULL && pCmd != NULL); assert(pSelection != NULL && pCmd != NULL);
const char* msg1 = "invalid column name/illegal column type in arithmetic expression"; const char* msg1 = "invalid column name, or illegal column type";
const char* msg2 = "functions can not be mixed up"; const char* msg2 = "functions can not be mixed up";
const char* msg3 = "not support query expression"; const char* msg3 = "not support query expression";
const char* msg4 = "columns from different table mixed up in arithmetic expression";
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, clauseIndex); SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, clauseIndex);
...@@ -1112,18 +1109,18 @@ int32_t parseSelectClause(SSqlCmd* pCmd, int32_t clauseIndex, tSQLExprList* pSel ...@@ -1112,18 +1109,18 @@ int32_t parseSelectClause(SSqlCmd* pCmd, int32_t clauseIndex, tSQLExprList* pSel
} else if (pItem->pNode->nSQLOptr >= TK_PLUS && pItem->pNode->nSQLOptr <= TK_REM) { } else if (pItem->pNode->nSQLOptr >= TK_PLUS && pItem->pNode->nSQLOptr <= TK_REM) {
// arithmetic function in select // arithmetic function in select
SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0); SColumnList columnList = {0};
SSchema* pSchema = tsGetSchema(pMeterMetaInfo->pMeterMeta); if (validateArithmeticSQLExpr(pItem->pNode, pQueryInfo, &columnList) != TSDB_CODE_SUCCESS) {
SColumnIdListRes columnList = {.pSchema = pSchema, .numOfCols = pMeterMetaInfo->pMeterMeta->numOfColumns,
.list = {0}};
int32_t ret =
validateArithmeticSQLExpr(pItem->pNode, pQueryInfo, &columnList.list);
if (ret != TSDB_CODE_SUCCESS) {
return invalidSqlErrMsg(pQueryInfo->msg, msg1); return invalidSqlErrMsg(pQueryInfo->msg, msg1);
} }
int32_t tableIndex = columnList.ids[0].tableIndex;
for(int32_t f = 1; f < columnList.num; ++f) {
if (columnList.ids[f].tableIndex != tableIndex) {
return invalidSqlErrMsg(pQueryInfo->msg, msg4);
}
}
char arithmeticExprStr[1024] = {0}; char arithmeticExprStr[1024] = {0};
char* p = arithmeticExprStr; char* p = arithmeticExprStr;
...@@ -1132,10 +1129,10 @@ int32_t parseSelectClause(SSqlCmd* pCmd, int32_t clauseIndex, tSQLExprList* pSel ...@@ -1132,10 +1129,10 @@ int32_t parseSelectClause(SSqlCmd* pCmd, int32_t clauseIndex, tSQLExprList* pSel
} }
// expr string is set as the parameter of function // expr string is set as the parameter of function
SColumnIndex index = {0}; SColumnIndex index = {.tableIndex = tableIndex};
SSqlExpr* pExpr = tscSqlExprInsert(pQueryInfo, outputIndex, TSDB_FUNC_ARITHM, &index, TSDB_DATA_TYPE_DOUBLE, SSqlExpr* pExpr = tscSqlExprInsert(pQueryInfo, outputIndex, TSDB_FUNC_ARITHM, &index, TSDB_DATA_TYPE_DOUBLE,
sizeof(double), sizeof(double)); sizeof(double), sizeof(double));
addExprParams(pExpr, arithmeticExprStr, TSDB_DATA_TYPE_BINARY, strlen(arithmeticExprStr), 0); addExprParams(pExpr, arithmeticExprStr, TSDB_DATA_TYPE_BINARY, strlen(arithmeticExprStr), index.tableIndex);
/* todo alias name should use the original sql string */ /* todo alias name should use the original sql string */
if (pItem->aliasName != NULL) { if (pItem->aliasName != NULL) {
...@@ -1144,7 +1141,7 @@ int32_t parseSelectClause(SSqlCmd* pCmd, int32_t clauseIndex, tSQLExprList* pSel ...@@ -1144,7 +1141,7 @@ int32_t parseSelectClause(SSqlCmd* pCmd, int32_t clauseIndex, tSQLExprList* pSel
strncpy(pExpr->aliasName, arithmeticExprStr, TSDB_COL_NAME_LEN); strncpy(pExpr->aliasName, arithmeticExprStr, TSDB_COL_NAME_LEN);
} }
insertResultField(pQueryInfo, i, &columnList.list, sizeof(double), TSDB_DATA_TYPE_DOUBLE, pExpr->aliasName); insertResultField(pQueryInfo, i, &columnList, sizeof(double), TSDB_DATA_TYPE_DOUBLE, pExpr->aliasName);
} else { } else {
/* /*
* not support such expression * not support such expression
...@@ -2883,7 +2880,7 @@ static int32_t validateSQLExpr(tSQLExpr* pExpr, SQueryInfo* pQueryInfo, SColumnL ...@@ -2883,7 +2880,7 @@ static int32_t validateSQLExpr(tSQLExpr* pExpr, SQueryInfo* pQueryInfo, SColumnL
return TSDB_CODE_INVALID_SQL; return TSDB_CODE_INVALID_SQL;
} }
pList->ids[pList->num++].columnIndex = index.columnIndex; pList->ids[pList->num++] = index;
} else if (pExpr->nSQLOptr == TK_FLOAT && (isnan(pExpr->val.dKey) || isinf(pExpr->val.dKey))) { } else if (pExpr->nSQLOptr == TK_FLOAT && (isnan(pExpr->val.dKey) || isinf(pExpr->val.dKey))) {
return TSDB_CODE_INVALID_SQL; return TSDB_CODE_INVALID_SQL;
} else if (pExpr->nSQLOptr >= TK_MIN && pExpr->nSQLOptr <= TK_LAST_ROW) { } else if (pExpr->nSQLOptr >= TK_MIN && pExpr->nSQLOptr <= TK_LAST_ROW) {
...@@ -5142,7 +5139,8 @@ void tscPrintSelectClause(SSqlCmd* pCmd, int32_t subClauseIndex) { ...@@ -5142,7 +5139,8 @@ void tscPrintSelectClause(SSqlCmd* pCmd, int32_t subClauseIndex) {
for (int32_t i = 0; i < pQueryInfo->exprsInfo.numOfExprs; ++i) { for (int32_t i = 0; i < pQueryInfo->exprsInfo.numOfExprs; ++i) {
SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, i); SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, i);
int32_t size = sprintf(str + offset, "%s(%d)", aAggs[pExpr->functionId].aName, pExpr->colInfo.colId); int32_t size = sprintf(str + offset, "%s(uid:%" PRId64 ", %d)", aAggs[pExpr->functionId].aName,
pExpr->uid, pExpr->colInfo.colId);
offset += size; offset += size;
if (i < pQueryInfo->exprsInfo.numOfExprs - 1) { if (i < pQueryInfo->exprsInfo.numOfExprs - 1) {
......
...@@ -725,6 +725,13 @@ int32_t tscLaunchJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSubquerySu ...@@ -725,6 +725,13 @@ int32_t tscLaunchJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSubquerySu
pNewQueryInfo->colList.numOfCols++; pNewQueryInfo->colList.numOfCols++;
} }
} }
tscTrace("%p subquery:%p tableIndex:%d, vnodeIdx:%d, type:%d, transfer to ts_comp query to retrieve timestamps, "
"exprInfo:%d, colList:%d, fieldsInfo:%d, name:%s",
pSql, pNew, tableIndex, pMeterMetaInfo->vnodeIndex, pNewQueryInfo->type,
pNewQueryInfo->exprsInfo.numOfExprs, pNewQueryInfo->colList.numOfCols,
pNewQueryInfo->fieldsInfo.numOfOutputCols, pNewQueryInfo->pMeterInfo[0]->name);
} else { } else {
SQueryInfo *pNewQueryInfo = tscGetQueryInfoDetail(&pNew->cmd, 0); SQueryInfo *pNewQueryInfo = tscGetQueryInfoDetail(&pNew->cmd, 0);
pNewQueryInfo->type |= TSDB_QUERY_TYPE_SUBQUERY; pNewQueryInfo->type |= TSDB_QUERY_TYPE_SUBQUERY;
...@@ -734,6 +741,7 @@ int32_t tscLaunchJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSubquerySu ...@@ -734,6 +741,7 @@ int32_t tscLaunchJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSubquerySu
tscPrintSelectClause(&pNew->cmd, 0); tscPrintSelectClause(&pNew->cmd, 0);
#endif #endif
return tscProcessSql(pNew); return tscProcessSql(pNew);
} }
......
...@@ -446,6 +446,10 @@ static bool tscHashRemainDataInSubqueryResultSet(SSqlObj *pSql) { ...@@ -446,6 +446,10 @@ static bool tscHashRemainDataInSubqueryResultSet(SSqlObj *pSql) {
bool allSubqueryExhausted = true; bool allSubqueryExhausted = true;
for (int32_t i = 0; i < pSql->numOfSubs; ++i) { for (int32_t i = 0; i < pSql->numOfSubs; ++i) {
if (pSql->pSubs[i] == NULL) {
continue;
}
SSqlRes *pRes1 = &pSql->pSubs[i]->res; SSqlRes *pRes1 = &pSql->pSubs[i]->res;
SSqlCmd *pCmd1 = &pSql->pSubs[i]->cmd; SSqlCmd *pCmd1 = &pSql->pSubs[i]->cmd;
...@@ -456,7 +460,7 @@ static bool tscHashRemainDataInSubqueryResultSet(SSqlObj *pSql) { ...@@ -456,7 +460,7 @@ static bool tscHashRemainDataInSubqueryResultSet(SSqlObj *pSql) {
/* /*
* if the global limitation is not reached, and current result has not exhausted, or next more vnodes are * if the global limitation is not reached, and current result has not exhausted, or next more vnodes are
* available, go on * available, goes on
*/ */
if (pMetaInfo->vnodeIndex < pMetaInfo->pMetricMeta->numOfVnodes && pRes1->row < pRes1->numOfRows && if (pMetaInfo->vnodeIndex < pMetaInfo->pMetricMeta->numOfVnodes && pRes1->row < pRes1->numOfRows &&
(!tscHasReachLimitation(pQueryInfo1, pRes1))) { (!tscHasReachLimitation(pQueryInfo1, pRes1))) {
...@@ -483,37 +487,25 @@ static bool tscHashRemainDataInSubqueryResultSet(SSqlObj *pSql) { ...@@ -483,37 +487,25 @@ static bool tscHashRemainDataInSubqueryResultSet(SSqlObj *pSql) {
return hasData; return hasData;
} }
static void **tscJoinResultsetFromBuf(SSqlObj *pSql) { static void **tscBuildResFromSubqueries(SSqlObj *pSql) {
SSqlRes *pRes = &pSql->res; SSqlRes *pRes = &pSql->res;
while (1) { while (1) {
if (!tscHashRemainDataInSubqueryResultSet(pSql)) { // free all sub sqlobj SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, pSql->cmd.clauseIndex);
tscTrace("%p at least one subquery exhausted, free all other %d subqueries", pSql, pSql->numOfSubs - 1); if (pRes->tsrow == NULL) {
pRes->tsrow = malloc(POINTER_BYTES * pQueryInfo->exprsInfo.numOfExprs);
}
SSubqueryState *pState = NULL; bool success = false;
int32_t numOfTableHasRes = 0;
for (int32_t i = 0; i < pSql->numOfSubs; ++i) { for (int32_t i = 0; i < pSql->numOfSubs; ++i) {
SSqlObj *pChildObj = pSql->pSubs[i]; if (pSql->pSubs[i] != 0) {
numOfTableHasRes++;
SJoinSubquerySupporter *pSupporter = (SJoinSubquerySupporter *)pChildObj->param;
pState = pSupporter->pState;
tscDestroyJoinSupporter(pChildObj->param);
taos_free_result(pChildObj);
}
free(pState);
return NULL;
} }
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
if (pRes->tsrow == NULL) {
pRes->tsrow = malloc(POINTER_BYTES * pQueryInfo->exprsInfo.numOfExprs);
} }
bool success = false; if (numOfTableHasRes >= 2) { // do merge result
if (pSql->numOfSubs >= 2) { // do merge result
SSqlRes *pRes1 = &pSql->pSubs[0]->res; SSqlRes *pRes1 = &pSql->pSubs[0]->res;
SSqlRes *pRes2 = &pSql->pSubs[1]->res; SSqlRes *pRes2 = &pSql->pSubs[1]->res;
...@@ -528,8 +520,13 @@ static void **tscJoinResultsetFromBuf(SSqlObj *pSql) { ...@@ -528,8 +520,13 @@ static void **tscJoinResultsetFromBuf(SSqlObj *pSql) {
pRes2->row++; pRes2->row++;
} }
} else { // only one subquery } else { // only one subquery
SSqlRes *pRes1 = &pSql->pSubs[0]->res; SSqlObj *pSub = pSql->pSubs[0];
doSetResultRowData(pSql->pSubs[0]); if (pSub == NULL) {
pSub = pSql->pSubs[1];
}
SSqlRes *pRes1 = &pSub->res;
doSetResultRowData(pSub);
success = (pRes1->row++ < pRes1->numOfRows); success = (pRes1->row++ < pRes1->numOfRows);
} }
...@@ -543,104 +540,40 @@ static void **tscJoinResultsetFromBuf(SSqlObj *pSql) { ...@@ -543,104 +540,40 @@ static void **tscJoinResultsetFromBuf(SSqlObj *pSql) {
pRes->tsrow[i] = pRes1->tsrow[columnIndex]; pRes->tsrow[i] = pRes1->tsrow[columnIndex];
} }
pRes->numOfTotalInCurrentClause++;
break; break;
} else { // continue retrieve data from vnode } else { // continue retrieve data from vnode
tscFetchDatablockFromSubquery(pSql); if (!tscHashRemainDataInSubqueryResultSet(pSql)) { // free all sub sqlobj
if (pRes->code != TSDB_CODE_SUCCESS) { tscTrace("%p at least one subquery exhausted, free all other %d subqueries", pSql, pSql->numOfSubs - 1);
return NULL;
}
}
}
return pRes->tsrow;
}
/**
* If current vnode query does not return results anymore (pRes->numOfRows == 0), try the next vnode if exists,
* in case of multi-vnode super table projection query and the result does not reach the limitation.
*/
static bool hasMoreVnodesToTry(SSqlObj *pSql) {
SSqlCmd *pCmd = &pSql->cmd;
SSqlRes *pRes = &pSql->res;
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
return pRes->numOfRows == 0 && tscProjectionQueryOnSTable(pQueryInfo, 0) && !tscHasReachLimitation(pQueryInfo, pRes);
}
static void tscTryQueryNextVnode(SSqlObj *pSql) {
SSqlCmd *pCmd = &pSql->cmd;
SSqlRes *pRes = &pSql->res;
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
/*
* no result returned from the current virtual node anymore, try the next vnode if exists
* if case of: multi-vnode super table projection query
*/
assert(pRes->numOfRows == 0 && tscProjectionQueryOnSTable(pQueryInfo, 0) && !tscHasReachLimitation(pQueryInfo, pRes));
SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0);
int32_t totalVnode = pMeterMetaInfo->pMetricMeta->numOfVnodes;
while (++pMeterMetaInfo->vnodeIndex < totalVnode) { SSubqueryState *pState = NULL;
tscTrace("%p current vnode:%d exhausted, try next:%d. total vnode:%d. current numOfRes:%d", pSql,
pMeterMetaInfo->vnodeIndex - 1, pMeterMetaInfo->vnodeIndex, totalVnode, pRes->numOfTotalInCurrentClause);
/* for (int32_t i = 0; i < pSql->numOfSubs; ++i) {
* update the limit and offset value for the query on the next vnode, SSqlObj *pChildObj = pSql->pSubs[i];
* according to current retrieval results if (pChildObj == NULL) {
* continue;
* NOTE:
* if the pRes->offset is larger than 0, the start returned position has not reached yet.
* Therefore, the pRes->numOfRows, as well as pRes->numOfTotalInCurrentClause, must be 0.
* The pRes->offset value will be updated by virtual node, during query execution.
*/
if (pQueryInfo->clauseLimit >= 0) {
pQueryInfo->limit.limit = pQueryInfo->clauseLimit - pRes->numOfTotalInCurrentClause;
} }
pQueryInfo->limit.offset = pRes->offset; SJoinSubquerySupporter *pSupporter = (SJoinSubquerySupporter *)pChildObj->param;
pState = pSupporter->pState;
assert((pRes->offset >= 0 && pRes->numOfRows == 0) || (pRes->offset == 0 && pRes->numOfRows >= 0));
tscTrace("%p new query to next vnode, vnode index:%d, limit:%" PRId64 ", offset:%" PRId64 ", glimit:%" PRId64, pSql,
pMeterMetaInfo->vnodeIndex, pQueryInfo->limit.limit, pQueryInfo->limit.offset, pQueryInfo->clauseLimit);
/*
* For project query with super table join, the numOfSub is equalled to the number of all subqueries.
* Therefore, we need to reset the value of numOfSubs to be 0.
*
* For super table join with projection query, if anyone of the subquery is exhausted, the query completed.
*/
pSql->numOfSubs = 0;
pCmd->command = TSDB_SQL_SELECT;
assert(pSql->fp == NULL);
int32_t ret = tscProcessSql(pSql); // todo check for failure tscDestroyJoinSupporter(pChildObj->param);
if (ret != TSDB_CODE_SUCCESS) { taos_free_result(pChildObj);
pSql->res.code = ret;
return;
} }
// retrieve data free(pState);
assert(pCmd->command == TSDB_SQL_SELECT); return NULL;
pCmd->command = TSDB_SQL_FETCH;
if ((ret = tscProcessSql(pSql)) != TSDB_CODE_SUCCESS) {
pSql->res.code = ret;
return;
} }
// if the result from current virtual node are empty, try next if exists. otherwise, return the results. tscFetchDatablockFromSubquery(pSql);
if (pRes->numOfRows > 0) { if (pRes->code != TSDB_CODE_SUCCESS) {
break; return NULL;
} }
} }
if (pRes->numOfRows == 0) {
tscTrace("%p all vnodes exhausted, prj query completed. total res:%d", pSql, totalVnode, pRes->numOfTotal);
} }
return pRes->tsrow;
} }
TAOS_ROW taos_fetch_row_impl(TAOS_RES *res) { TAOS_ROW taos_fetch_row_impl(TAOS_RES *res) {
...@@ -657,7 +590,7 @@ TAOS_ROW taos_fetch_row_impl(TAOS_RES *res) { ...@@ -657,7 +590,7 @@ TAOS_ROW taos_fetch_row_impl(TAOS_RES *res) {
if (pRes->code == TSDB_CODE_SUCCESS) { if (pRes->code == TSDB_CODE_SUCCESS) {
tscTrace("%p data from all subqueries have been retrieved to client", pSql); tscTrace("%p data from all subqueries have been retrieved to client", pSql);
return tscJoinResultsetFromBuf(pSql); return tscBuildResFromSubqueries(pSql);
} else { } else {
tscTrace("%p retrieve data from subquery failed, code:%d", pSql, pRes->code); tscTrace("%p retrieve data from subquery failed, code:%d", pSql, pRes->code);
return NULL; return NULL;
...@@ -679,7 +612,7 @@ TAOS_ROW taos_fetch_row_impl(TAOS_RES *res) { ...@@ -679,7 +612,7 @@ TAOS_ROW taos_fetch_row_impl(TAOS_RES *res) {
tscProcessSql(pSql); // retrieve data from virtual node tscProcessSql(pSql); // retrieve data from virtual node
if (hasMoreVnodesToTry(pSql)) { if (hasMoreVnodesToTry(pSql)) {
tscTryQueryNextVnode(pSql); tscTryQueryNextVnode(pSql, NULL);
} }
/* /*
...@@ -716,7 +649,7 @@ TAOS_ROW taos_fetch_row(TAOS_RES *res) { ...@@ -716,7 +649,7 @@ TAOS_ROW taos_fetch_row(TAOS_RES *res) {
// current subclause is completed, try the next subclause // current subclause is completed, try the next subclause
while (rows == NULL && pCmd->clauseIndex < pCmd->numOfClause - 1) { while (rows == NULL && pCmd->clauseIndex < pCmd->numOfClause - 1) {
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
pSql->cmd.command = pQueryInfo->command; pSql->cmd.command = pQueryInfo->command;
pCmd->clauseIndex++; pCmd->clauseIndex++;
...@@ -759,7 +692,7 @@ int taos_fetch_block(TAOS_RES *res, TAOS_ROW *rows) { ...@@ -759,7 +692,7 @@ int taos_fetch_block(TAOS_RES *res, TAOS_ROW *rows) {
// current subclause is completed, try the next subclause // current subclause is completed, try the next subclause
while (rows == NULL && pCmd->clauseIndex < pCmd->numOfClause - 1) { while (rows == NULL && pCmd->clauseIndex < pCmd->numOfClause - 1) {
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
pSql->cmd.command = pQueryInfo->command; pSql->cmd.command = pQueryInfo->command;
pCmd->clauseIndex++; pCmd->clauseIndex++;
...@@ -822,6 +755,11 @@ void taos_free_result(TAOS_RES *res) { ...@@ -822,6 +755,11 @@ void taos_free_result(TAOS_RES *res) {
// set freeFlag to 1 in retrieve message if there are un-retrieved results // set freeFlag to 1 in retrieve message if there are un-retrieved results
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0); SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
if (pQueryInfo == NULL) {
tscFreeSqlObjPartial(pSql);
return;
}
pQueryInfo->type = TSDB_QUERY_TYPE_FREE_RESOURCE; pQueryInfo->type = TSDB_QUERY_TYPE_FREE_RESOURCE;
SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0); SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0);
......
...@@ -1888,7 +1888,6 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void ...@@ -1888,7 +1888,6 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void
#endif #endif
char* name = pMeterMetaInfo->name; char* name = pMeterMetaInfo->name;
SMeterMetaInfo* pFinalInfo = NULL; SMeterMetaInfo* pFinalInfo = NULL;
if (pPrevSql == NULL) { if (pPrevSql == NULL) {
...@@ -1907,13 +1906,16 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void ...@@ -1907,13 +1906,16 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void
pMeterMetaInfo->tagColumnIndex); pMeterMetaInfo->tagColumnIndex);
} }
assert(pFinalInfo->pMeterMeta != NULL); assert(pFinalInfo->pMeterMeta != NULL && pNewQueryInfo->numOfTables == 1);
if (UTIL_METER_IS_SUPERTABLE(pMeterMetaInfo)) { if (UTIL_METER_IS_SUPERTABLE(pMeterMetaInfo)) {
assert(pFinalInfo->pMetricMeta != NULL); assert(pFinalInfo->pMetricMeta != NULL);
} }
tscTrace("%p new subquery %p, tableIndex:%d, vnodeIdx:%d, type:%d", pSql, pNew, tableIndex, tscTrace("%p new subquery %p, tableIndex:%d, vnodeIdx:%d, type:%d, exprInfo:%d, colList:%d,"
pMeterMetaInfo->vnodeIndex, pNewQueryInfo->type); "fieldInfo:%d, name:%s", pSql, pNew, tableIndex,
pMeterMetaInfo->vnodeIndex, pNewQueryInfo->type, pNewQueryInfo->exprsInfo.numOfExprs, pNewQueryInfo->colList.numOfCols,
pNewQueryInfo->fieldsInfo.numOfOutputCols, pFinalInfo->name);
return pNew; return pNew;
} }
...@@ -1990,3 +1992,102 @@ bool tscHasReachLimitation(SQueryInfo* pQueryInfo, SSqlRes* pRes) { ...@@ -1990,3 +1992,102 @@ bool tscHasReachLimitation(SQueryInfo* pQueryInfo, SSqlRes* pRes) {
} }
char* tscGetErrorMsgPayload(SSqlCmd* pCmd) { return pCmd->payload; } char* tscGetErrorMsgPayload(SSqlCmd* pCmd) { return pCmd->payload; }
/**
* If current vnode query does not return results anymore (pRes->numOfRows == 0), try the next vnode if exists,
* in case of multi-vnode super table projection query and the result does not reach the limitation.
*/
bool hasMoreVnodesToTry(SSqlObj *pSql) {
SSqlCmd *pCmd = &pSql->cmd;
SSqlRes *pRes = &pSql->res;
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
return pRes->numOfRows == 0 && tscProjectionQueryOnSTable(pQueryInfo, 0) && !tscHasReachLimitation(pQueryInfo, pRes);
}
void tscTryQueryNextVnode(SSqlObj *pSql, __async_cb_func_t fp) {
SSqlCmd *pCmd = &pSql->cmd;
SSqlRes *pRes = &pSql->res;
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
/*
* no result returned from the current virtual node anymore, try the next vnode if exists
* if case of: multi-vnode super table projection query
*/
assert(pRes->numOfRows == 0 && tscProjectionQueryOnSTable(pQueryInfo, 0) && !tscHasReachLimitation(pQueryInfo, pRes));
SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0);
int32_t totalVnode = pMeterMetaInfo->pMetricMeta->numOfVnodes;
while (++pMeterMetaInfo->vnodeIndex < totalVnode) {
tscTrace("%p current vnode:%d exhausted, try next:%d. total vnode:%d. current numOfRes:%d", pSql,
pMeterMetaInfo->vnodeIndex - 1, pMeterMetaInfo->vnodeIndex, totalVnode, pRes->numOfTotalInCurrentClause);
/*
* update the limit and offset value for the query on the next vnode,
* according to current retrieval results
*
* NOTE:
* if the pRes->offset is larger than 0, the start returned position has not reached yet.
* Therefore, the pRes->numOfRows, as well as pRes->numOfTotalInCurrentClause, must be 0.
* The pRes->offset value will be updated by virtual node, during query execution.
*/
if (pQueryInfo->clauseLimit >= 0) {
pQueryInfo->limit.limit = pQueryInfo->clauseLimit - pRes->numOfTotalInCurrentClause;
}
pQueryInfo->limit.offset = pRes->offset;
assert((pRes->offset >= 0 && pRes->numOfRows == 0) || (pRes->offset == 0 && pRes->numOfRows >= 0));
tscTrace("%p new query to next vnode, vnode index:%d, limit:%" PRId64 ", offset:%" PRId64 ", glimit:%" PRId64, pSql,
pMeterMetaInfo->vnodeIndex, pQueryInfo->limit.limit, pQueryInfo->limit.offset, pQueryInfo->clauseLimit);
/*
* For project query with super table join, the numOfSub is equalled to the number of all subqueries.
* Therefore, we need to reset the value of numOfSubs to be 0.
*
* For super table join with projection query, if anyone of the subquery is exhausted, the query completed.
*/
pSql->numOfSubs = 0;
pCmd->command = TSDB_SQL_SELECT;
tscResetForNextRetrieve(pRes);
// in case of async query, set the callback function
void* fp1 = pSql->fp;
if (fp1 != NULL) {
assert(fp != NULL);
pSql->fp = fp;
}
int32_t ret = tscProcessSql(pSql); // todo check for failure
if (fp != NULL) {
return;
}
if (ret != TSDB_CODE_SUCCESS) {
pSql->res.code = ret;
return;
}
// retrieve data
assert(pCmd->command == TSDB_SQL_SELECT);
pCmd->command = TSDB_SQL_FETCH;
if ((ret = tscProcessSql(pSql)) != TSDB_CODE_SUCCESS) {
pSql->res.code = ret;
return;
}
// if the result from current virtual node are empty, try next if exists. otherwise, return the results.
if (pRes->numOfRows > 0) {
break;
}
}
if (pRes->numOfRows == 0) {
tscTrace("%p all vnodes exhausted, prj query completed. total res:%d", pSql, totalVnode, pRes->numOfTotal);
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册