提交 2f9d0556 编写于 作者: H hjxilinx

[tbase-1282]

上级 952cbdb2
...@@ -95,6 +95,8 @@ SMeterSidExtInfo* tscGetMeterSidInfo(SVnodeSidList* pSidList, int32_t idx); ...@@ -95,6 +95,8 @@ SMeterSidExtInfo* tscGetMeterSidInfo(SVnodeSidList* pSidList, int32_t idx);
bool tscIsPointInterpQuery(SSqlCmd* pCmd); bool tscIsPointInterpQuery(SSqlCmd* pCmd);
bool tscIsTWAQuery(SSqlCmd* pCmd); bool tscIsTWAQuery(SSqlCmd* pCmd);
bool tscProjectionQueryOnMetric(SSqlCmd* pCmd); bool tscProjectionQueryOnMetric(SSqlCmd* pCmd);
bool tscProjectionQueryOnTable(SSqlCmd* pCmd);
bool tscIsTwoStageMergeMetricQuery(SSqlCmd* pCmd); bool tscIsTwoStageMergeMetricQuery(SSqlCmd* pCmd);
bool tscQueryOnMetric(SSqlCmd* pCmd); bool tscQueryOnMetric(SSqlCmd* pCmd);
bool tscQueryMetricTags(SSqlCmd* pCmd); bool tscQueryMetricTags(SSqlCmd* pCmd);
......
...@@ -477,6 +477,8 @@ void tscProcessMultiVnodesInsertForFile(SSqlObj *pSql); ...@@ -477,6 +477,8 @@ void tscProcessMultiVnodesInsertForFile(SSqlObj *pSql);
void tscKillMetricQuery(SSqlObj *pSql); void tscKillMetricQuery(SSqlObj *pSql);
void tscInitResObjForLocalQuery(SSqlObj *pObj, int32_t numOfRes, int32_t rowLen); void tscInitResObjForLocalQuery(SSqlObj *pObj, int32_t numOfRes, int32_t rowLen);
bool tscIsUpdateQuery(STscObj *pObj); bool tscIsUpdateQuery(STscObj *pObj);
bool tscHasReachLimitation(SSqlObj* pSql);
int32_t tscInvalidSQLErrMsg(char *msg, const char *additionalInfo, const char *sql); int32_t tscInvalidSQLErrMsg(char *msg, const char *additionalInfo, const char *sql);
// transfer SSqlInfo to SqlCmd struct // transfer SSqlInfo to SqlCmd struct
......
...@@ -112,8 +112,9 @@ static tSQLSyntaxNode *tSQLSyntaxNodeCreate(SSchema *pSchema, int32_t numOfCols, ...@@ -112,8 +112,9 @@ static tSQLSyntaxNode *tSQLSyntaxNodeCreate(SSchema *pSchema, int32_t numOfCols,
tSQLSyntaxNode *pNode = NULL; tSQLSyntaxNode *pNode = NULL;
if (pToken->type == TK_ID || pToken->type == TK_TBNAME) { if (pToken->type == TK_ID || pToken->type == TK_TBNAME) {
int32_t i = 0;
if (pToken->type == TK_ID) { if (pToken->type == TK_ID) {
int32_t i = 0;
do { do {
size_t len = strlen(pSchema[i].name); size_t len = strlen(pSchema[i].name);
if (strncmp(pToken->z, pSchema[i].name, pToken->n) == 0 && pToken->n == len) break; if (strncmp(pToken->z, pSchema[i].name, pToken->n) == 0 && pToken->n == len) break;
...@@ -326,8 +327,8 @@ static tSQLSyntaxNode *createSyntaxTree(SSchema *pSchema, int32_t numOfCols, cha ...@@ -326,8 +327,8 @@ static tSQLSyntaxNode *createSyntaxTree(SSchema *pSchema, int32_t numOfCols, cha
uint8_t localOptr = getBinaryExprOptr(&t0); uint8_t localOptr = getBinaryExprOptr(&t0);
if (localOptr == 0) { if (localOptr == 0) {
pError("not support binary operator:%d", t0.type); pError("not support binary operator:%d", t0.type);
free(pBinExpr);
return NULL; return NULL;
free(pBinExpr)
} }
return parseRemainStr(str, pBinExpr, pSchema, localOptr, numOfCols, i); return parseRemainStr(str, pBinExpr, pSchema, localOptr, numOfCols, i);
...@@ -652,8 +653,7 @@ void tSQLListTraverseOnResult(struct tSQLBinaryExpr *pExpr, bool (*fp)(tSkipList ...@@ -652,8 +653,7 @@ void tSQLListTraverseOnResult(struct tSQLBinaryExpr *pExpr, bool (*fp)(tSkipList
// brutal force search // brutal force search
int64_t num = pResult->num; int64_t num = pResult->num;
for (int32_t i = 0, j = 0; i < pResult->num; ++i) { for (int32_t i = 0, j = 0; i < pResult->num; ++i) {
//if (fp == NULL || (fp != NULL && fp(pResult->pRes[i], pExpr->info) == true)) { if (fp == NULL || (fp(pResult->pRes[i], pExpr->info) == true)) {
if (fp == NULL || (fp(pResult->pRes[i], pExpr->info) == true)) {
pResult->pRes[j++] = pResult->pRes[i]; pResult->pRes[j++] = pResult->pRes[i];
} else { } else {
num--; num--;
......
...@@ -13,9 +13,9 @@ ...@@ -13,9 +13,9 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include "tscJoinProcess.h"
#include "os.h" #include "os.h"
#include "tcache.h" #include "tcache.h"
#include "tscJoinProcess.h"
#include "tscUtil.h" #include "tscUtil.h"
#include "tsclient.h" #include "tsclient.h"
#include "tscompression.h" #include "tscompression.h"
...@@ -45,8 +45,8 @@ static bool doCompare(int32_t order, int64_t left, int64_t right) { ...@@ -45,8 +45,8 @@ static bool doCompare(int32_t order, int64_t left, int64_t right) {
} }
} }
static int64_t doTSBlockIntersect(SSqlObj* pSql, SJoinSubquerySupporter* pSupporter1, SJoinSubquerySupporter* pSupporter2, static int64_t doTSBlockIntersect(SSqlObj* pSql, SJoinSubquerySupporter* pSupporter1,
TSKEY* st, TSKEY* et) { SJoinSubquerySupporter* pSupporter2, TSKEY* st, TSKEY* et) {
STSBuf* output1 = tsBufCreate(true); STSBuf* output1 = tsBufCreate(true);
STSBuf* output2 = tsBufCreate(true); STSBuf* output2 = tsBufCreate(true);
...@@ -150,14 +150,15 @@ static int64_t doTSBlockIntersect(SSqlObj* pSql, SJoinSubquerySupporter* pSuppor ...@@ -150,14 +150,15 @@ static int64_t doTSBlockIntersect(SSqlObj* pSql, SJoinSubquerySupporter* pSuppor
tsBufDestory(pSupporter1->pTSBuf); tsBufDestory(pSupporter1->pTSBuf);
tsBufDestory(pSupporter2->pTSBuf); tsBufDestory(pSupporter2->pTSBuf);
tscTrace("%p input1:%lld, input2:%lld, final:%lld for secondary query after ts blocks intersecting", tscTrace("%p input1:%lld, input2:%lld, final:%lld for secondary query after ts blocks intersecting", pSql,
pSql, numOfInput1, numOfInput2, output1->numOfTotal); numOfInput1, numOfInput2, output1->numOfTotal);
return output1->numOfTotal; return output1->numOfTotal;
} }
//todo handle failed to create sub query // todo handle failed to create sub query
SJoinSubquerySupporter* tscCreateJoinSupporter(SSqlObj* pSql, SSubqueryState* pState, /*int32_t* numOfComplete, int32_t* gc,*/ int32_t index) { SJoinSubquerySupporter* tscCreateJoinSupporter(SSqlObj* pSql, SSubqueryState* pState,
/*int32_t* numOfComplete, int32_t* gc,*/ int32_t index) {
SJoinSubquerySupporter* pSupporter = calloc(1, sizeof(SJoinSubquerySupporter)); SJoinSubquerySupporter* pSupporter = calloc(1, sizeof(SJoinSubquerySupporter));
if (pSupporter == NULL) { if (pSupporter == NULL) {
return NULL; return NULL;
...@@ -241,8 +242,10 @@ int32_t tscLaunchSecondSubquery(SSqlObj* pSql) { ...@@ -241,8 +242,10 @@ int32_t tscLaunchSecondSubquery(SSqlObj* pSql) {
} }
// scan all subquery, if one sub query has only ts, ignore it // scan all subquery, if one sub query has only ts, ignore it
tscTrace("%p start to launch secondary subqueries, total:%d, only:%d needs to query, others are not retrieve in " tscTrace(
"select clause", pSql, pSql->numOfSubs, numOfSub); "%p start to launch secondary subqueries, total:%d, only:%d needs to query, others are not retrieve in "
"select clause",
pSql, pSql->numOfSubs, numOfSub);
int32_t j = 0; int32_t j = 0;
for (int32_t i = 0; i < pSql->numOfSubs; ++i) { for (int32_t i = 0; i < pSql->numOfSubs; ++i) {
...@@ -258,7 +261,7 @@ int32_t tscLaunchSecondSubquery(SSqlObj* pSql) { ...@@ -258,7 +261,7 @@ int32_t tscLaunchSecondSubquery(SSqlObj* pSql) {
SSqlObj* pNew = createSubqueryObj(pSql, (int16_t)i, tscJoinQueryCallback, pSupporter, NULL); SSqlObj* pNew = createSubqueryObj(pSql, (int16_t)i, tscJoinQueryCallback, pSupporter, NULL);
if (pNew == NULL) { if (pNew == NULL) {
pSql->numOfSubs = i; //revise the number of subquery pSql->numOfSubs = i; // revise the number of subquery
pSupporter->pState->numOfTotal = i; pSupporter->pState->numOfTotal = i;
pSupporter->pState->code = TSDB_CODE_CLI_OUT_OF_MEMORY; pSupporter->pState->code = TSDB_CODE_CLI_OUT_OF_MEMORY;
...@@ -296,14 +299,14 @@ int32_t tscLaunchSecondSubquery(SSqlObj* pSql) { ...@@ -296,14 +299,14 @@ int32_t tscLaunchSecondSubquery(SSqlObj* pSql) {
tscFieldInfoCalOffset(&pNew->cmd); tscFieldInfoCalOffset(&pNew->cmd);
SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfo(&pNew->cmd, 0); SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfo(&pNew->cmd, 0);
/* /*
* When handling the projection query, the offset value will be modified for table-table join, which is changed * When handling the projection query, the offset value will be modified for table-table join, which is changed
* during the timestamp intersection. * during the timestamp intersection.
*/ */
pSupporter->limit = pSql->cmd.limit; pSupporter->limit = pSql->cmd.limit;
pNew->cmd.limit = pSupporter->limit; pNew->cmd.limit = pSupporter->limit;
// fetch the join tag column // fetch the join tag column
if (UTIL_METER_IS_METRIC(pMeterMetaInfo)) { if (UTIL_METER_IS_METRIC(pMeterMetaInfo)) {
SSqlExpr* pExpr = tscSqlExprGet(&pNew->cmd, 0); SSqlExpr* pExpr = tscSqlExprGet(&pNew->cmd, 0);
...@@ -317,7 +320,7 @@ int32_t tscLaunchSecondSubquery(SSqlObj* pSql) { ...@@ -317,7 +320,7 @@ int32_t tscLaunchSecondSubquery(SSqlObj* pSql) {
#ifdef _DEBUG_VIEW #ifdef _DEBUG_VIEW
tscPrintSelectClause(&pNew->cmd); tscPrintSelectClause(&pNew->cmd);
#endif #endif
tscProcessSql(pNew); tscProcessSql(pNew);
} }
...@@ -388,10 +391,10 @@ static void joinRetrieveCallback(void* param, TAOS_RES* tres, int numOfRows) { ...@@ -388,10 +391,10 @@ static void joinRetrieveCallback(void* param, TAOS_RES* tres, int numOfRows) {
return; return;
} }
if (numOfRows > 0) { // write the data into disk if (numOfRows > 0) { // write the data into disk
fwrite(pSql->res.data, pSql->res.numOfRows, 1, pSupporter->f); fwrite(pSql->res.data, pSql->res.numOfRows, 1, pSupporter->f);
fclose(pSupporter->f); fclose(pSupporter->f);
STSBuf* pBuf = tsBufCreateFromFile(pSupporter->path, true); STSBuf* pBuf = tsBufCreateFromFile(pSupporter->path, true);
if (pBuf == NULL) { if (pBuf == NULL) {
tscError("%p invalid ts comp file from vnode, abort sub query, file size:%d", pSql, numOfRows); tscError("%p invalid ts comp file from vnode, abort sub query, file size:%d", pSql, numOfRows);
...@@ -405,9 +408,9 @@ static void joinRetrieveCallback(void* param, TAOS_RES* tres, int numOfRows) { ...@@ -405,9 +408,9 @@ static void joinRetrieveCallback(void* param, TAOS_RES* tres, int numOfRows) {
tscTrace("%p create tmp file for ts block:%s", pSql, pBuf->path); tscTrace("%p create tmp file for ts block:%s", pSql, pBuf->path);
pSupporter->pTSBuf = pBuf; pSupporter->pTSBuf = pBuf;
} else { } else {
assert(pSql->cmd.numOfTables == 1); // for subquery, only one metermetaInfo assert(pSql->cmd.numOfTables == 1); // for subquery, only one metermetaInfo
SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfo(&pSql->cmd, 0); SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfo(&pSql->cmd, 0);
tsBufMerge(pSupporter->pTSBuf, pBuf, pMeterMetaInfo->vnodeIndex); tsBufMerge(pSupporter->pTSBuf, pBuf, pMeterMetaInfo->vnodeIndex);
tsBufDestory(pBuf); tsBufDestory(pBuf);
} }
...@@ -418,26 +421,25 @@ static void joinRetrieveCallback(void* param, TAOS_RES* tres, int numOfRows) { ...@@ -418,26 +421,25 @@ static void joinRetrieveCallback(void* param, TAOS_RES* tres, int numOfRows) {
pSql->res.row = pSql->res.numOfRows; pSql->res.row = pSql->res.numOfRows;
taos_fetch_rows_a(tres, joinRetrieveCallback, param); taos_fetch_rows_a(tres, joinRetrieveCallback, param);
} else if (numOfRows == 0) { // no data from this vnode anymore } else if (numOfRows == 0) { // no data from this vnode anymore
if (tscProjectionQueryOnMetric(&pParentSql->cmd)) { if (tscProjectionQueryOnMetric(&pParentSql->cmd)) {
SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(&pSql->cmd, 0); SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfo(&pSql->cmd, 0);
assert(pSql->cmd.numOfTables == 1); assert(pSql->cmd.numOfTables == 1);
// for projection query, need to try next vnode // for projection query, need to try next vnode
if ((++pMeterMetaInfo->vnodeIndex) < pMeterMetaInfo->pMetricMeta->numOfVnodes) { if ((++pMeterMetaInfo->vnodeIndex) < pMeterMetaInfo->pMetricMeta->numOfVnodes) {
pSql->cmd.command = TSDB_SQL_SELECT; pSql->cmd.command = TSDB_SQL_SELECT;
pSql->fp = tscJoinQueryCallback; pSql->fp = tscJoinQueryCallback;
tscProcessSql(pSql); tscProcessSql(pSql);
return; return;
} }
} }
if (atomic_add_fetch_32(&pSupporter->pState->numOfCompleted, 1) >= pSupporter->pState->numOfTotal) {
if (atomic_add_fetch_32(&pSupporter->pState->numOfCompleted, 1) >= pSupporter->pState->numOfTotal) {
if (pSupporter->pState->code != TSDB_CODE_SUCCESS) { if (pSupporter->pState->code != TSDB_CODE_SUCCESS) {
tscTrace("%p sub:%p, numOfSub:%d, quit from further procedure due to other queries failure", pParentSql, tres, tscTrace("%p sub:%p, numOfSub:%d, quit from further procedure due to other queries failure", pParentSql, tres,
pSupporter->subqueryIndex); pSupporter->subqueryIndex);
doQuitSubquery(pParentSql); doQuitSubquery(pParentSql);
return; return;
} }
...@@ -471,31 +473,34 @@ static void joinRetrieveCallback(void* param, TAOS_RES* tres, int numOfRows) { ...@@ -471,31 +473,34 @@ static void joinRetrieveCallback(void* param, TAOS_RES* tres, int numOfRows) {
pSupporter->pState->code = numOfRows; pSupporter->pState->code = numOfRows;
tscError("%p retrieve failed, code:%d, index:%d", pSql, numOfRows, pSupporter->subqueryIndex); tscError("%p retrieve failed, code:%d, index:%d", pSql, numOfRows, pSupporter->subqueryIndex);
} }
if (numOfRows >= 0) {
pSql->res.numOfTotal += pSql->res.numOfRows;
}
if (tscProjectionQueryOnMetric(&pSql->cmd) && numOfRows == 0) { if (tscProjectionQueryOnMetric(&pSql->cmd) && numOfRows == 0) {
SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(&pSql->cmd, 0); SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfo(&pSql->cmd, 0);
assert(pSql->cmd.numOfTables == 1); assert(pSql->cmd.numOfTables == 1);
// for projection query, need to try next vnode if current vnode is exhausted // for projection query, need to try next vnode if current vnode is exhausted
if ((++pMeterMetaInfo->vnodeIndex) < pMeterMetaInfo->pMetricMeta->numOfVnodes) { if ((++pMeterMetaInfo->vnodeIndex) < pMeterMetaInfo->pMetricMeta->numOfVnodes) {
pSupporter->pState->numOfCompleted = 0; pSupporter->pState->numOfCompleted = 0;
pSupporter->pState->numOfTotal = 1; pSupporter->pState->numOfTotal = 1;
pSql->cmd.command = TSDB_SQL_SELECT; pSql->cmd.command = TSDB_SQL_SELECT;
pSql->fp = tscJoinQueryCallback; pSql->fp = tscJoinQueryCallback;
tscProcessSql(pSql); tscProcessSql(pSql);
return; return;
} }
} }
if (atomic_add_fetch_32(&pSupporter->pState->numOfCompleted, 1) >= pSupporter->pState->numOfTotal) { if (atomic_add_fetch_32(&pSupporter->pState->numOfCompleted, 1) >= pSupporter->pState->numOfTotal) {
assert(pSupporter->pState->numOfCompleted == pSupporter->pState->numOfTotal); assert(pSupporter->pState->numOfCompleted == pSupporter->pState->numOfTotal);
tscTrace("%p all %d secondary retrieves are completed, global code:%d", tres, pSupporter->pState->numOfTotal, tscTrace("%p all %d secondary retrieves are completed, global code:%d", tres, pSupporter->pState->numOfTotal,
pParentSql->res.code); pParentSql->res.code);
if (pSupporter->pState->code != TSDB_CODE_SUCCESS) { if (pSupporter->pState->code != TSDB_CODE_SUCCESS) {
pParentSql->res.code = abs(pSupporter->pState->code); pParentSql->res.code = abs(pSupporter->pState->code);
freeSubqueryObj(pParentSql); freeSubqueryObj(pParentSql);
...@@ -510,62 +515,68 @@ void tscFetchDatablockFromSubquery(SSqlObj* pSql) { ...@@ -510,62 +515,68 @@ void tscFetchDatablockFromSubquery(SSqlObj* pSql) {
int32_t numOfFetch = 0; int32_t numOfFetch = 0;
assert(pSql->numOfSubs >= 1); assert(pSql->numOfSubs >= 1);
for (int32_t i = 0; i < pSql->numOfSubs; ++i) { for (int32_t i = 0; i < pSql->numOfSubs; ++i) {
SSqlRes* pRes = &pSql->pSubs[i]->res; SSqlRes *pRes = &pSql->pSubs[i]->res;
SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfo(&pSql->pSubs[i]->cmd, 0); SSqlCmd *pCmd = &pSql->pSubs[i]->cmd;
if (UTIL_METER_IS_METRIC(pMeterMetaInfo)) { SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, 0);
if (pRes->row >= pRes->numOfRows && pMeterMetaInfo->vnodeIndex < pMeterMetaInfo->pMetricMeta->numOfVnodes) {
if (tscProjectionQueryOnMetric(pCmd)) {
if (pRes->row >= pRes->numOfRows && pMeterMetaInfo->vnodeIndex < pMeterMetaInfo->pMetricMeta->numOfVnodes &&
(!tscHasReachLimitation(pSql->pSubs[i]))) {
numOfFetch++; numOfFetch++;
} }
} else { } else {
if (pRes->row >= pRes->numOfRows) { if ((pRes->row >= pRes->numOfRows && (!tscHasReachLimitation(pSql->pSubs[i])) && tscProjectionQueryOnTable(pSql))
|| (pRes->numOfRows == 0)) {
numOfFetch++; numOfFetch++;
} }
} }
} }
if (numOfFetch > 0) { if (numOfFetch <= 0) {
tscTrace("%p retrieve data from %d subqueries", pSql, numOfFetch); return ;
}
SJoinSubquerySupporter* pSupporter = (SJoinSubquerySupporter*)pSql->pSubs[0]->param; // TODO multi-vnode retrieve for projection query with limitation has bugs, since the global limiation is not handled
pSupporter->pState->numOfTotal = numOfFetch; // wait for all subqueries completed tscTrace("%p retrieve data from %d subqueries", pSql, numOfFetch);
pSupporter->pState->numOfCompleted = 0;
for (int32_t i = 0; i < pSql->numOfSubs; ++i) { SJoinSubquerySupporter* pSupporter = (SJoinSubquerySupporter*)pSql->pSubs[0]->param;
SSqlObj* pSql1 = pSql->pSubs[i]; pSupporter->pState->numOfTotal = numOfFetch; // wait for all subqueries completed
pSupporter->pState->numOfCompleted = 0;
SSqlRes* pRes1 = &pSql1->res; for (int32_t i = 0; i < pSql->numOfSubs; ++i) {
SSqlCmd* pCmd1 = &pSql1->cmd; SSqlObj* pSql1 = pSql->pSubs[i];
pSupporter = (SJoinSubquerySupporter*)pSql1->param; SSqlRes* pRes1 = &pSql1->res;
SSqlCmd* pCmd1 = &pSql1->cmd;
// wait for all subqueries completed pSupporter = (SJoinSubquerySupporter*)pSql1->param;
pSupporter->pState->numOfTotal = numOfFetch;
assert(pRes1->numOfRows >= 0 && pCmd1->numOfTables == 1);
SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfo(pCmd1, 0);
if (pRes1->row >= pRes1->numOfRows) {
tscTrace("%p subquery:%p retrieve data from vnode, subquery:%d, vnodeIndex:%d", pSql, pSql1,
pSupporter->subqueryIndex, pMeterMetaInfo->vnodeIndex);
tscResetForNextRetrieve(pRes1); // wait for all subqueries completed
pSupporter->pState->numOfTotal = numOfFetch;
assert(pRes1->numOfRows >= 0 && pCmd1->numOfTables == 1);
pSql1->fp = joinRetrieveCallback; SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfo(pCmd1, 0);
if (pRes1->row >= pRes1->numOfRows) {
tscTrace("%p subquery:%p retrieve data from vnode, subquery:%d, vnodeIndex:%d", pSql, pSql1,
pSupporter->subqueryIndex, pMeterMetaInfo->vnodeIndex);
if (pCmd1->command < TSDB_SQL_LOCAL) { tscResetForNextRetrieve(pRes1);
pCmd1->command = (pCmd1->command > TSDB_SQL_MGMT) ? TSDB_SQL_RETRIEVE : TSDB_SQL_FETCH; pSql1->fp = joinRetrieveCallback;
}
tscProcessSql(pSql1); if (pCmd1->command < TSDB_SQL_LOCAL) {
pCmd1->command = (pCmd1->command > TSDB_SQL_MGMT) ? TSDB_SQL_RETRIEVE : TSDB_SQL_FETCH;
} }
}
// wait for all subquery completed tscProcessSql(pSql1);
tsem_wait(&pSql->rspSem); }
} }
// wait for all subquery completed
tsem_wait(&pSql->rspSem);
} }
// all subqueries return, set the result output index // all subqueries return, set the result output index
...@@ -574,11 +585,11 @@ void tscSetupOutputColumnIndex(SSqlObj* pSql) { ...@@ -574,11 +585,11 @@ void tscSetupOutputColumnIndex(SSqlObj* pSql) {
SSqlRes* pRes = &pSql->res; SSqlRes* pRes = &pSql->res;
tscTrace("%p all subquery response, retrieve data", pSql); tscTrace("%p all subquery response, retrieve data", pSql);
if (pRes->pColumnIndex != NULL) { if (pRes->pColumnIndex != NULL) {
return; // the column transfer support struct has been built return; // the column transfer support struct has been built
} }
pRes->pColumnIndex = calloc(1, sizeof(SColumnIndex) * pCmd->fieldsInfo.numOfOutputCols); pRes->pColumnIndex = calloc(1, sizeof(SColumnIndex) * pCmd->fieldsInfo.numOfOutputCols);
for (int32_t i = 0; i < pCmd->fieldsInfo.numOfOutputCols; ++i) { for (int32_t i = 0; i < pCmd->fieldsInfo.numOfOutputCols; ++i) {
...@@ -669,22 +680,23 @@ void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code) { ...@@ -669,22 +680,23 @@ void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code) {
tscSetupOutputColumnIndex(pParentSql); tscSetupOutputColumnIndex(pParentSql);
SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfo(&pSql->cmd, 0); SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfo(&pSql->cmd, 0);
/** /**
* if the query is a continue query (vnodeIndex > 0 for projection query) for next vnode, do the retrieval of data instead of returning to its invoker * if the query is a continue query (vnodeIndex > 0 for projection query) for next vnode, do the retrieval of
* data instead of returning to its invoker
*/ */
if (pMeterMetaInfo->vnodeIndex > 0 && tscProjectionQueryOnMetric(&pSql->cmd)) { if (pMeterMetaInfo->vnodeIndex > 0 && tscProjectionQueryOnMetric(&pSql->cmd)) {
assert(pMeterMetaInfo->vnodeIndex < pMeterMetaInfo->pMetricMeta->numOfVnodes); assert(pMeterMetaInfo->vnodeIndex < pMeterMetaInfo->pMetricMeta->numOfVnodes);
pSupporter->pState->numOfCompleted = 0; // reset the record value pSupporter->pState->numOfCompleted = 0; // reset the record value
pSql->fp = joinRetrieveCallback; // continue retrieve data pSql->fp = joinRetrieveCallback; // continue retrieve data
pSql->cmd.command = TSDB_SQL_FETCH; pSql->cmd.command = TSDB_SQL_FETCH;
tscProcessSql(pSql); tscProcessSql(pSql);
} else { // first retrieve from vnode during the secondary stage sub-query } else { // first retrieve from vnode during the secondary stage sub-query
if (pParentSql->fp == NULL) { if (pParentSql->fp == NULL) {
tsem_wait(&pParentSql->emptyRspSem); tsem_wait(&pParentSql->emptyRspSem);
tsem_wait(&pParentSql->emptyRspSem); tsem_wait(&pParentSql->emptyRspSem);
tsem_post(&pParentSql->rspSem); tsem_post(&pParentSql->rspSem);
} else { } else {
// set the command flag must be after the semaphore been correctly set. // set the command flag must be after the semaphore been correctly set.
...@@ -848,7 +860,7 @@ STSBuf* tsBufCreateFromFile(const char* path, bool autoDelete) { ...@@ -848,7 +860,7 @@ STSBuf* tsBufCreateFromFile(const char* path, bool autoDelete) {
size_t infoSize = sizeof(STSVnodeBlockInfo) * pTSBuf->numOfVnodes; size_t infoSize = sizeof(STSVnodeBlockInfo) * pTSBuf->numOfVnodes;
STSVnodeBlockInfo* buf = (STSVnodeBlockInfo*)calloc(1, infoSize); STSVnodeBlockInfo* buf = (STSVnodeBlockInfo*)calloc(1, infoSize);
int64_t pos = ftell(pTSBuf->f); int64_t pos = ftell(pTSBuf->f);
fread(buf, infoSize, 1, pTSBuf->f); fread(buf, infoSize, 1, pTSBuf->f);
// the length value for each vnode is not kept in file, so does not set the length value // the length value for each vnode is not kept in file, so does not set the length value
...@@ -864,17 +876,17 @@ STSBuf* tsBufCreateFromFile(const char* path, bool autoDelete) { ...@@ -864,17 +876,17 @@ STSBuf* tsBufCreateFromFile(const char* path, bool autoDelete) {
struct stat fileStat; struct stat fileStat;
fstat(fileno(pTSBuf->f), &fileStat); fstat(fileno(pTSBuf->f), &fileStat);
pTSBuf->fileSize = (uint32_t) fileStat.st_size; pTSBuf->fileSize = (uint32_t)fileStat.st_size;
tsBufResetPos(pTSBuf); tsBufResetPos(pTSBuf);
// ascending by default // ascending by default
pTSBuf->cur.order = TSQL_SO_ASC; pTSBuf->cur.order = TSQL_SO_ASC;
pTSBuf->autoDelete = autoDelete; pTSBuf->autoDelete = autoDelete;
tscTrace("create tsBuf from file:%s, fd:%d, size:%d, numOfVnode:%d, autoDelete:%d", pTSBuf->path, fileno(pTSBuf->f), tscTrace("create tsBuf from file:%s, fd:%d, size:%d, numOfVnode:%d, autoDelete:%d", pTSBuf->path, fileno(pTSBuf->f),
pTSBuf->fileSize, pTSBuf->numOfVnodes, pTSBuf->autoDelete); pTSBuf->fileSize, pTSBuf->numOfVnodes, pTSBuf->autoDelete);
return pTSBuf; return pTSBuf;
} }
...@@ -899,12 +911,11 @@ void tsBufDestory(STSBuf* pTSBuf) { ...@@ -899,12 +911,11 @@ void tsBufDestory(STSBuf* pTSBuf) {
} }
free(pTSBuf); free(pTSBuf);
} }
static STSVnodeBlockInfoEx* tsBufGetLastVnodeInfo(STSBuf* pTSBuf) { static STSVnodeBlockInfoEx* tsBufGetLastVnodeInfo(STSBuf* pTSBuf) {
int32_t last = pTSBuf->numOfVnodes - 1; int32_t last = pTSBuf->numOfVnodes - 1;
assert(last >= 0); assert(last >= 0);
return &pTSBuf->pData[last]; return &pTSBuf->pData[last];
} }
...@@ -944,9 +955,9 @@ static STSVnodeBlockInfoEx* addOneVnodeInfo(STSBuf* pTSBuf, int32_t vnodeId) { ...@@ -944,9 +955,9 @@ static STSVnodeBlockInfoEx* addOneVnodeInfo(STSBuf* pTSBuf, int32_t vnodeId) {
pTSBuf->numOfVnodes += 1; pTSBuf->numOfVnodes += 1;
// update the header info // update the header info
STSBufFileHeader header = STSBufFileHeader header = {
{.magic = TS_COMP_FILE_MAGIC, .numOfVnode = pTSBuf->numOfVnodes, .tsOrder = pTSBuf->tsOrder}; .magic = TS_COMP_FILE_MAGIC, .numOfVnode = pTSBuf->numOfVnodes, .tsOrder = pTSBuf->tsOrder};
STSBufUpdateHeader(pTSBuf, &header); STSBufUpdateHeader(pTSBuf, &header);
return tsBufGetLastVnodeInfo(pTSBuf); return tsBufGetLastVnodeInfo(pTSBuf);
} }
...@@ -994,9 +1005,9 @@ static void writeDataToDisk(STSBuf* pTSBuf) { ...@@ -994,9 +1005,9 @@ static void writeDataToDisk(STSBuf* pTSBuf) {
pTSBuf->fileSize += blockSize; pTSBuf->fileSize += blockSize;
pTSBuf->tsData.len = 0; pTSBuf->tsData.len = 0;
STSVnodeBlockInfoEx* pVnodeBlockInfoEx = tsBufGetLastVnodeInfo(pTSBuf); STSVnodeBlockInfoEx* pVnodeBlockInfoEx = tsBufGetLastVnodeInfo(pTSBuf);
pVnodeBlockInfoEx->info.compLen += blockSize; pVnodeBlockInfoEx->info.compLen += blockSize;
pVnodeBlockInfoEx->info.numOfBlocks += 1; pVnodeBlockInfoEx->info.numOfBlocks += 1;
...@@ -1250,9 +1261,9 @@ static void tsBufGetBlock(STSBuf* pTSBuf, int32_t vnodeIndex, int32_t blockIndex ...@@ -1250,9 +1261,9 @@ static void tsBufGetBlock(STSBuf* pTSBuf, int32_t vnodeIndex, int32_t blockIndex
} }
STSBlock* pBlock = &pTSBuf->block; STSBlock* pBlock = &pTSBuf->block;
size_t s = pBlock->numOfElem * TSDB_KEYSIZE; size_t s = pBlock->numOfElem * TSDB_KEYSIZE;
/* /*
* In order to accommodate all the qualified data, the actual buffer size for one block with identical tags value * In order to accommodate all the qualified data, the actual buffer size for one block with identical tags value
* may exceed the maximum allowed size during *tsBufAppend* function by invoking expandBuffer function * may exceed the maximum allowed size during *tsBufAppend* function by invoking expandBuffer function
...@@ -1260,7 +1271,7 @@ static void tsBufGetBlock(STSBuf* pTSBuf, int32_t vnodeIndex, int32_t blockIndex ...@@ -1260,7 +1271,7 @@ static void tsBufGetBlock(STSBuf* pTSBuf, int32_t vnodeIndex, int32_t blockIndex
if (s > pTSBuf->tsData.allocSize) { if (s > pTSBuf->tsData.allocSize) {
expandBuffer(&pTSBuf->tsData, s); expandBuffer(&pTSBuf->tsData, s);
} }
pTSBuf->tsData.len = pTSBuf->tsData.len =
tsDecompressTimestamp(pBlock->payload, pBlock->compLen, pBlock->numOfElem, pTSBuf->tsData.rawBuf, tsDecompressTimestamp(pBlock->payload, pBlock->compLen, pBlock->numOfElem, pTSBuf->tsData.rawBuf,
pTSBuf->tsData.allocSize, TWO_STAGE_COMP, pTSBuf->assistBuf, pTSBuf->bufSize); pTSBuf->tsData.allocSize, TWO_STAGE_COMP, pTSBuf->assistBuf, pTSBuf->bufSize);
...@@ -1307,20 +1318,20 @@ bool tsBufNextPos(STSBuf* pTSBuf) { ...@@ -1307,20 +1318,20 @@ bool tsBufNextPos(STSBuf* pTSBuf) {
if (pCur->vnodeIndex == -1) { if (pCur->vnodeIndex == -1) {
if (pCur->order == TSQL_SO_ASC) { if (pCur->order == TSQL_SO_ASC) {
tsBufGetBlock(pTSBuf, 0, 0); tsBufGetBlock(pTSBuf, 0, 0);
if (pTSBuf->block.numOfElem == 0) { // the whole list is empty, return if (pTSBuf->block.numOfElem == 0) { // the whole list is empty, return
tsBufResetPos(pTSBuf); tsBufResetPos(pTSBuf);
return false; return false;
} else { } else {
return true; return true;
} }
} else { // get the last timestamp record in the last block of the last vnode } else { // get the last timestamp record in the last block of the last vnode
assert(pTSBuf->numOfVnodes > 0); assert(pTSBuf->numOfVnodes > 0);
int32_t vnodeIndex = pTSBuf->numOfVnodes - 1; int32_t vnodeIndex = pTSBuf->numOfVnodes - 1;
pCur->vnodeIndex = vnodeIndex; pCur->vnodeIndex = vnodeIndex;
int32_t vnodeId = pTSBuf->pData[pCur->vnodeIndex].info.vnode; int32_t vnodeId = pTSBuf->pData[pCur->vnodeIndex].info.vnode;
STSVnodeBlockInfo* pBlockInfo = tsBufGetVnodeBlockInfo(pTSBuf, vnodeId); STSVnodeBlockInfo* pBlockInfo = tsBufGetVnodeBlockInfo(pTSBuf, vnodeId);
int32_t blockIndex = pBlockInfo->numOfBlocks - 1; int32_t blockIndex = pBlockInfo->numOfBlocks - 1;
...@@ -1397,8 +1408,6 @@ STSElem tsBufGetElem(STSBuf* pTSBuf) { ...@@ -1397,8 +1408,6 @@ STSElem tsBufGetElem(STSBuf* pTSBuf) {
return elem1; return elem1;
} }
/** /**
* current only support ts comp data from two vnode merge * current only support ts comp data from two vnode merge
* @param pDestBuf * @param pDestBuf
...@@ -1452,7 +1461,7 @@ int32_t tsBufMerge(STSBuf* pDestBuf, const STSBuf* pSrcBuf, int32_t vnodeId) { ...@@ -1452,7 +1461,7 @@ int32_t tsBufMerge(STSBuf* pDestBuf, const STSBuf* pSrcBuf, int32_t vnodeId) {
pDestBuf->numOfVnodes = newSize; pDestBuf->numOfVnodes = newSize;
} else { } else {
STSVnodeBlockInfoEx* pBlockInfoEx = tsBufGetLastVnodeInfo(pDestBuf); STSVnodeBlockInfoEx* pBlockInfoEx = tsBufGetLastVnodeInfo(pDestBuf);
pBlockInfoEx->len += pSrcBuf->pData[0].len; pBlockInfoEx->len += pSrcBuf->pData[0].len;
pBlockInfoEx->info.numOfBlocks += pSrcBuf->pData[0].info.numOfBlocks; pBlockInfoEx->info.numOfBlocks += pSrcBuf->pData[0].info.numOfBlocks;
pBlockInfoEx->info.compLen += pSrcBuf->pData[0].info.compLen; pBlockInfoEx->info.compLen += pSrcBuf->pData[0].info.compLen;
...@@ -1470,7 +1479,7 @@ int32_t tsBufMerge(STSBuf* pDestBuf, const STSBuf* pSrcBuf, int32_t vnodeId) { ...@@ -1470,7 +1479,7 @@ int32_t tsBufMerge(STSBuf* pDestBuf, const STSBuf* pSrcBuf, int32_t vnodeId) {
#else #else
ssize_t rc = fsendfile(pDestBuf->f, pSrcBuf->f, &offset, size); ssize_t rc = fsendfile(pDestBuf->f, pSrcBuf->f, &offset, size);
#endif #endif
if (rc == -1) { if (rc == -1) {
tscError("failed to merge tsBuf from:%s to %s, reason:%s\n", pSrcBuf->path, pDestBuf->path, strerror(errno)); tscError("failed to merge tsBuf from:%s to %s, reason:%s\n", pSrcBuf->path, pDestBuf->path, strerror(errno));
return -1; return -1;
...@@ -1482,18 +1491,18 @@ int32_t tsBufMerge(STSBuf* pDestBuf, const STSBuf* pSrcBuf, int32_t vnodeId) { ...@@ -1482,18 +1491,18 @@ int32_t tsBufMerge(STSBuf* pDestBuf, const STSBuf* pSrcBuf, int32_t vnodeId) {
} }
pDestBuf->numOfTotal += pSrcBuf->numOfTotal; pDestBuf->numOfTotal += pSrcBuf->numOfTotal;
int32_t oldSize = pDestBuf->fileSize; int32_t oldSize = pDestBuf->fileSize;
struct stat fileStat; struct stat fileStat;
fstat(fileno(pDestBuf->f), &fileStat); fstat(fileno(pDestBuf->f), &fileStat);
pDestBuf->fileSize = (uint32_t) fileStat.st_size; pDestBuf->fileSize = (uint32_t)fileStat.st_size;
assert(pDestBuf->fileSize == oldSize + size); assert(pDestBuf->fileSize == oldSize + size);
tscTrace("tsBuf merge success, %p, path:%s, fd:%d, file size:%d, numOfVnode:%d, autoDelete:%d", pDestBuf, pDestBuf->path, tscTrace("tsBuf merge success, %p, path:%s, fd:%d, file size:%d, numOfVnode:%d, autoDelete:%d", pDestBuf,
fileno(pDestBuf->f), pDestBuf->fileSize, pDestBuf->numOfVnodes, pDestBuf->autoDelete); pDestBuf->path, fileno(pDestBuf->f), pDestBuf->fileSize, pDestBuf->numOfVnodes, pDestBuf->autoDelete);
return 0; return 0;
} }
...@@ -1510,7 +1519,7 @@ STSBuf* tsBufCreateFromCompBlocks(const char* pData, int32_t numOfBlocks, int32_ ...@@ -1510,7 +1519,7 @@ STSBuf* tsBufCreateFromCompBlocks(const char* pData, int32_t numOfBlocks, int32_
TSBufUpdateVnodeInfo(pTSBuf, pTSBuf->numOfVnodes - 1, pBlockInfo); TSBufUpdateVnodeInfo(pTSBuf, pTSBuf->numOfVnodes - 1, pBlockInfo);
fseek(pTSBuf->f, pBlockInfo->offset, SEEK_SET); fseek(pTSBuf->f, pBlockInfo->offset, SEEK_SET);
fwrite((void*) pData, 1, len, pTSBuf->f); fwrite((void*)pData, 1, len, pTSBuf->f);
pTSBuf->fileSize += len; pTSBuf->fileSize += len;
pTSBuf->tsOrder = order; pTSBuf->tsOrder = order;
......
...@@ -5008,6 +5008,8 @@ int32_t parseLimitClause(SSqlObj* pSql, SQuerySQL* pQuerySql) { ...@@ -5008,6 +5008,8 @@ int32_t parseLimitClause(SSqlObj* pSql, SQuerySQL* pQuerySql) {
// handle the limit offset value, validate the limit // handle the limit offset value, validate the limit
pCmd->limit = pQuerySql->limit; pCmd->limit = pQuerySql->limit;
pCmd->globalLimit = pCmd->limit.limit;
pCmd->slimit = pQuerySql->slimit; pCmd->slimit = pQuerySql->slimit;
if (pCmd->slimit.offset < 0 || pCmd->limit.offset < 0) { if (pCmd->slimit.offset < 0 || pCmd->limit.offset < 0) {
......
...@@ -16,21 +16,21 @@ ...@@ -16,21 +16,21 @@
#include "os.h" #include "os.h"
#include "tcache.h" #include "tcache.h"
#include "tlog.h" #include "tlog.h"
#include "tnote.h"
#include "trpc.h" #include "trpc.h"
#include "tscJoinProcess.h" #include "tscJoinProcess.h"
#include "tscProfile.h" #include "tscProfile.h"
#include "tscSQLParser.h"
#include "tscSecondaryMerge.h" #include "tscSecondaryMerge.h"
#include "tscUtil.h" #include "tscUtil.h"
#include "tsclient.h" #include "tsclient.h"
#include "tscompression.h" #include "tscompression.h"
#include "tsocket.h" #include "tsocket.h"
#include "tscSQLParser.h"
#include "ttimer.h" #include "ttimer.h"
#include "tutil.h" #include "tutil.h"
#include "tnote.h"
TAOS *taos_connect_imp(const char *ip, const char *user, const char *pass, const char *db, uint16_t port, void (*fp)(void *, TAOS_RES *, int), TAOS *taos_connect_imp(const char *ip, const char *user, const char *pass, const char *db, uint16_t port,
void *param, void **taos) { void (*fp)(void *, TAOS_RES *, int), void *param, void **taos) {
STscObj *pObj; STscObj *pObj;
taos_init(); taos_init();
...@@ -81,7 +81,7 @@ TAOS *taos_connect_imp(const char *ip, const char *user, const char *pass, const ...@@ -81,7 +81,7 @@ TAOS *taos_connect_imp(const char *ip, const char *user, const char *pass, const
globalCode = TSDB_CODE_CLI_OUT_OF_MEMORY; globalCode = TSDB_CODE_CLI_OUT_OF_MEMORY;
return NULL; return NULL;
} }
memset(pObj, 0, sizeof(STscObj)); memset(pObj, 0, sizeof(STscObj));
pObj->signature = pObj; pObj->signature = pObj;
...@@ -113,7 +113,7 @@ TAOS *taos_connect_imp(const char *ip, const char *user, const char *pass, const ...@@ -113,7 +113,7 @@ TAOS *taos_connect_imp(const char *ip, const char *user, const char *pass, const
free(pObj); free(pObj);
return NULL; return NULL;
} }
memset(pSql, 0, sizeof(SSqlObj)); memset(pSql, 0, sizeof(SSqlObj));
pSql->pTscObj = pObj; pSql->pTscObj = pObj;
pSql->signature = pSql; pSql->signature = pSql;
...@@ -162,14 +162,14 @@ TAOS *taos_connect(const char *ip, const char *user, const char *pass, const cha ...@@ -162,14 +162,14 @@ TAOS *taos_connect(const char *ip, const char *user, const char *pass, const cha
void *taos = taos_connect_imp(ip, user, pass, db, port, NULL, NULL, NULL); void *taos = taos_connect_imp(ip, user, pass, db, port, NULL, NULL, NULL);
if (taos != NULL) { if (taos != NULL) {
STscObj* pObj = (STscObj*) taos; STscObj *pObj = (STscObj *)taos;
// version compare only requires the first 3 segments of the version string // version compare only requires the first 3 segments of the version string
int32_t comparedSegments = 3; int32_t comparedSegments = 3;
char client_version[64] = {0}; char client_version[64] = {0};
char server_version[64] = {0}; char server_version[64] = {0};
int clientVersionNumber[4] = {0}; int clientVersionNumber[4] = {0};
int serverVersionNumber[4] = {0}; int serverVersionNumber[4] = {0};
strcpy(client_version, version); strcpy(client_version, version);
strcpy(server_version, taos_get_server_info(taos)); strcpy(server_version, taos_get_server_info(taos));
...@@ -188,7 +188,7 @@ TAOS *taos_connect(const char *ip, const char *user, const char *pass, const cha ...@@ -188,7 +188,7 @@ TAOS *taos_connect(const char *ip, const char *user, const char *pass, const cha
return NULL; return NULL;
} }
for(int32_t i = 0; i < comparedSegments; ++i) { for (int32_t i = 0; i < comparedSegments; ++i) {
if (clientVersionNumber[i] != serverVersionNumber[i]) { if (clientVersionNumber[i] != serverVersionNumber[i]) {
tscError("taos:%p, the %d-th number of server version:%s not matched with client version:%s, close connection", tscError("taos:%p, the %d-th number of server version:%s not matched with client version:%s, close connection",
taos, i, server_version, version); taos, i, server_version, version);
...@@ -225,7 +225,7 @@ void taos_close(TAOS *taos) { ...@@ -225,7 +225,7 @@ void taos_close(TAOS *taos) {
} }
} }
int taos_query_imp(STscObj* pObj, SSqlObj* pSql) { int taos_query_imp(STscObj *pObj, SSqlObj *pSql) {
SSqlRes *pRes = &pSql->res; SSqlRes *pRes = &pSql->res;
pRes->numOfRows = 1; pRes->numOfRows = 1;
...@@ -251,7 +251,7 @@ int taos_query_imp(STscObj* pObj, SSqlObj* pSql) { ...@@ -251,7 +251,7 @@ int taos_query_imp(STscObj* pObj, SSqlObj* pSql) {
} else { } else {
tscError("%p SQL result:%d, %s pObj:%p", pSql, pRes->code, taos_errstr(pObj), pObj); tscError("%p SQL result:%d, %s pObj:%p", pSql, pRes->code, taos_errstr(pObj), pObj);
} }
if (pRes->code != TSDB_CODE_SUCCESS) { if (pRes->code != TSDB_CODE_SUCCESS) {
tscFreeSqlObjPartial(pSql); tscFreeSqlObjPartial(pSql);
} }
...@@ -271,9 +271,10 @@ int taos_query(TAOS *taos, const char *sqlstr) { ...@@ -271,9 +271,10 @@ int taos_query(TAOS *taos, const char *sqlstr) {
size_t sqlLen = strlen(sqlstr); size_t sqlLen = strlen(sqlstr);
if (sqlLen > tsMaxSQLStringLen) { if (sqlLen > tsMaxSQLStringLen) {
pRes->code = tscInvalidSQLErrMsg(pSql->cmd.payload, "sql too long", NULL); // set the additional error msg for invalid sql pRes->code =
tscInvalidSQLErrMsg(pSql->cmd.payload, "sql too long", NULL); // set the additional error msg for invalid sql
tscError("%p SQL result:%d, %s pObj:%p", pSql, pRes->code, taos_errstr(taos), pObj); tscError("%p SQL result:%d, %s pObj:%p", pSql, pRes->code, taos_errstr(taos), pObj);
return pRes->code; return pRes->code;
} }
...@@ -283,7 +284,7 @@ int taos_query(TAOS *taos, const char *sqlstr) { ...@@ -283,7 +284,7 @@ int taos_query(TAOS *taos, const char *sqlstr) {
if (sql == NULL) { if (sql == NULL) {
pRes->code = TSDB_CODE_CLI_OUT_OF_MEMORY; pRes->code = TSDB_CODE_CLI_OUT_OF_MEMORY;
tscError("%p failed to malloc sql string buffer, reason:%s", pSql, strerror(errno)); tscError("%p failed to malloc sql string buffer, reason:%s", pSql, strerror(errno));
tscError("%p SQL result:%d, %s pObj:%p", pSql, pRes->code, taos_errstr(taos), pObj); tscError("%p SQL result:%d, %s pObj:%p", pSql, pRes->code, taos_errstr(taos), pObj);
return pRes->code; return pRes->code;
} }
...@@ -451,59 +452,56 @@ static void **getOneRowFromBuf(SSqlObj *pSql) { ...@@ -451,59 +452,56 @@ static void **getOneRowFromBuf(SSqlObj *pSql) {
return pRes->tsrow; return pRes->tsrow;
} }
static void **tscJoinResultsetFromBuf(SSqlObj *pSql) { static bool tscHashRemainDataInSubqueryResultSet(SSqlObj *pSql) {
bool hasData = true;
SSqlCmd *pCmd = &pSql->cmd; SSqlCmd *pCmd = &pSql->cmd;
SSqlRes *pRes = &pSql->res;
while (1) { if (tscProjectionQueryOnMetric(pCmd)) {
bool hasData = true; bool allSubqueryExhausted = true;
if (tscProjectionQueryOnMetric(pCmd)) { for (int32_t i = 0; i < pSql->numOfSubs; ++i) {
bool allSubqueryExhausted = true; SSqlRes *pRes1 = &pSql->pSubs[i]->res;
SSqlCmd *pCmd1 = &pSql->pSubs[i]->cmd;
for (int32_t i = 0; i < pSql->numOfSubs; ++i) {
SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfo(&pSql->pSubs[i]->cmd, 0); SMeterMetaInfo *pMetaInfo = tscGetMeterMetaInfo(pCmd1, 0);
if (pMeterMetaInfo->vnodeIndex < pMeterMetaInfo->pMetricMeta->numOfVnodes) { assert(pCmd1->numOfTables == 1);
allSubqueryExhausted = false;
break; /*
} * if the global limitation is not reached, and current result has not exhausted, or next more vnodes are
} * available, go on
*/
hasData = !allSubqueryExhausted; if (pMetaInfo->vnodeIndex < pMetaInfo->pMetricMeta->numOfVnodes && pRes1->row < pRes1->numOfRows &&
} else { //otherwise, in case inner join, if any subquery exhausted, query completed. (!tscHasReachLimitation(pSql->pSubs[i]))) {
for (int32_t i = 0; i < pSql->numOfSubs; ++i) { allSubqueryExhausted = false;
SSqlRes *pRes1 = &pSql->pSubs[i]->res; break;
if (pRes1->numOfRows == 0) {
hasData = false;
break;
}
} }
} }
hasData = !allSubqueryExhausted;
} else { // otherwise, in case inner join, if any subquery exhausted, query completed.
for (int32_t i = 0; i < pSql->numOfSubs; ++i) { for (int32_t i = 0; i < pSql->numOfSubs; ++i) {
SSqlRes *pRes1 = &pSql->pSubs[i]->res; SSqlRes *pRes1 = &pSql->pSubs[i]->res;
SMeterMetaInfo* pMeterMeta = tscGetMeterMetaInfo(&pSql->pSubs[i]->cmd, 0);
if ((pRes1->row >= pRes1->numOfRows && tscHasReachLimitation(pSql->pSubs[i]) &&
if (tscProjectionQueryOnMetric(pCmd)) { tscProjectionQueryOnTable(&pSql->pSubs[i]->cmd)) ||
//For multi-vnode projection query, the results may locate in following vnode, so we needs to go on (pRes1->numOfRows == 0)) {
if (pMeterMeta->vnodeIndex < pMeterMeta->pMetricMeta->numOfVnodes) {
break; hasData = false;
} break;
} else { //otherwise, in case inner join, if any subquery exhausted, query completed.
if (pRes1->numOfRows == 0) {
hasData = false;
break;
}
} }
// if (pRes1->numOfRows == 0 && !tscProjectionQueryOnMetric(pCmd) ||
// (pMeterMeta->vnodeIndex >= pMeterMeta->pMetricMeta->numOfVnodes && )) {
// hasData = false;
// break;
// }
} }
}
return hasData;
}
static void **tscJoinResultsetFromBuf(SSqlObj *pSql) {
SSqlCmd *pCmd = &pSql->cmd;
SSqlRes *pRes = &pSql->res;
if (!hasData) { // free all sub sqlobj while (1) {
tscTrace("%p one subquery exhausted, free other %d subquery", pSql, pSql->numOfSubs - 1); if (!tscHashRemainDataInSubqueryResultSet(pSql)) { // free all sub sqlobj
tscTrace("%p at least one subquery exhausted, free all other %d subqueries", pSql, pSql->numOfSubs - 1);
SSubqueryState *pState = NULL; SSubqueryState *pState = NULL;
...@@ -525,21 +523,20 @@ static void **tscJoinResultsetFromBuf(SSqlObj *pSql) { ...@@ -525,21 +523,20 @@ static void **tscJoinResultsetFromBuf(SSqlObj *pSql) {
} }
bool success = false; bool success = false;
if (pSql->numOfSubs >= 2) { // do merge result if (pSql->numOfSubs >= 2) { // do merge result
SSqlRes *pRes1 = &pSql->pSubs[0]->res; SSqlRes *pRes1 = &pSql->pSubs[0]->res;
SSqlRes *pRes2 = &pSql->pSubs[1]->res; SSqlRes *pRes2 = &pSql->pSubs[1]->res;
if (pRes1->row < pRes1->numOfRows && pRes2->row < pRes2->numOfRows) { if (pRes1->row < pRes1->numOfRows && pRes2->row < pRes2->numOfRows) {
doSetResultRowData(pSql->pSubs[0]); doSetResultRowData(pSql->pSubs[0]);
doSetResultRowData(pSql->pSubs[1]); doSetResultRowData(pSql->pSubs[1]);
// TSKEY key1 = *(TSKEY *)pRes1->tsrow[0]; // TSKEY key1 = *(TSKEY *)pRes1->tsrow[0];
// TSKEY key2 = *(TSKEY *)pRes2->tsrow[0]; // TSKEY key2 = *(TSKEY *)pRes2->tsrow[0];
// printf("first:%lld, second:%lld\n", key1, key2); // printf("first:%lld, second:%lld\n", key1, key2);
success = true; success = true;
pRes1->row++; pRes1->row++;
pRes2->row++; pRes2->row++;
} }
} else { // only one subquery } else { // only one subquery
SSqlRes *pRes1 = &pSql->pSubs[0]->res; SSqlRes *pRes1 = &pSql->pSubs[0]->res;
doSetResultRowData(pSql->pSubs[0]); doSetResultRowData(pSql->pSubs[0]);
...@@ -547,7 +544,7 @@ static void **tscJoinResultsetFromBuf(SSqlObj *pSql) { ...@@ -547,7 +544,7 @@ static void **tscJoinResultsetFromBuf(SSqlObj *pSql) {
success = (pRes1->row++ < pRes1->numOfRows); success = (pRes1->row++ < pRes1->numOfRows);
} }
if (success) { if (success) { // current row of final output has been built, return to app
for (int32_t i = 0; i < pCmd->exprsInfo.numOfExprs; ++i) { for (int32_t i = 0; i < pCmd->exprsInfo.numOfExprs; ++i) {
int32_t tableIndex = pRes->pColumnIndex[i].tableIndex; int32_t tableIndex = pRes->pColumnIndex[i].tableIndex;
int32_t columnIndex = pRes->pColumnIndex[i].columnIndex; int32_t columnIndex = pRes->pColumnIndex[i].columnIndex;
...@@ -557,7 +554,7 @@ static void **tscJoinResultsetFromBuf(SSqlObj *pSql) { ...@@ -557,7 +554,7 @@ static void **tscJoinResultsetFromBuf(SSqlObj *pSql) {
} }
break; break;
} else { } else { // continue retrieve data from vnode
tscFetchDatablockFromSubquery(pSql); tscFetchDatablockFromSubquery(pSql);
if (pRes->code != TSDB_CODE_SUCCESS) { if (pRes->code != TSDB_CODE_SUCCESS) {
return NULL; return NULL;
...@@ -579,7 +576,7 @@ TAOS_ROW taos_fetch_row_impl(TAOS_RES *res) { ...@@ -579,7 +576,7 @@ TAOS_ROW taos_fetch_row_impl(TAOS_RES *res) {
if (pCmd->command == TSDB_SQL_METRIC_JOIN_RETRIEVE) { if (pCmd->command == TSDB_SQL_METRIC_JOIN_RETRIEVE) {
tscFetchDatablockFromSubquery(pSql); tscFetchDatablockFromSubquery(pSql);
if (pRes->code == TSDB_CODE_SUCCESS) { if (pRes->code == TSDB_CODE_SUCCESS) {
tscTrace("%p data from all subqueries have been retrieved to client", pSql); tscTrace("%p data from all subqueries have been retrieved to client", pSql);
return tscJoinResultsetFromBuf(pSql); return tscJoinResultsetFromBuf(pSql);
...@@ -625,7 +622,7 @@ TAOS_ROW taos_fetch_row(TAOS_RES *res) { ...@@ -625,7 +622,7 @@ TAOS_ROW taos_fetch_row(TAOS_RES *res) {
SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, 0); SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, 0);
// reach the maximum number of output rows, abort // reach the maximum number of output rows, abort
if (pCmd->globalLimit > 0 && pRes->numOfTotal >= pCmd->globalLimit) { if (tscHasReachLimitation(pSql)) {
return NULL; return NULL;
} }
...@@ -645,7 +642,7 @@ TAOS_ROW taos_fetch_row(TAOS_RES *res) { ...@@ -645,7 +642,7 @@ TAOS_ROW taos_fetch_row(TAOS_RES *res) {
* For super table join with projection query, if anyone of the subquery is exhausted, the query completed. * For super table join with projection query, if anyone of the subquery is exhausted, the query completed.
*/ */
pSql->numOfSubs = 0; pSql->numOfSubs = 0;
if ((++pMeterMetaInfo->vnodeIndex) < pMeterMetaInfo->pMetricMeta->numOfVnodes) { if ((++pMeterMetaInfo->vnodeIndex) < pMeterMetaInfo->pMetricMeta->numOfVnodes) {
pCmd->command = TSDB_SQL_SELECT; pCmd->command = TSDB_SQL_SELECT;
assert(pSql->fp == NULL); assert(pSql->fp == NULL);
...@@ -680,7 +677,7 @@ int taos_fetch_block(TAOS_RES *res, TAOS_ROW *rows) { ...@@ -680,7 +677,7 @@ int taos_fetch_block(TAOS_RES *res, TAOS_ROW *rows) {
nRows = taos_fetch_block_impl(res, rows); nRows = taos_fetch_block_impl(res, rows);
while (*rows == NULL && tscProjectionQueryOnMetric(pCmd)) { while (*rows == NULL && tscProjectionQueryOnMetric(pCmd)) {
/* reach the maximum number of output rows, abort */ /* reach the maximum number of output rows, abort */
if (pCmd->globalLimit > 0 && pRes->numOfTotal >= pCmd->globalLimit) { if (tscHasReachLimitation(pSql)) {
return 0; return 0;
} }
...@@ -690,7 +687,6 @@ int taos_fetch_block(TAOS_RES *res, TAOS_ROW *rows) { ...@@ -690,7 +687,6 @@ int taos_fetch_block(TAOS_RES *res, TAOS_ROW *rows) {
pCmd->limit.limit = pSql->cmd.globalLimit - pRes->numOfTotal; pCmd->limit.limit = pSql->cmd.globalLimit - pRes->numOfTotal;
pCmd->limit.offset = pRes->offset; pCmd->limit.offset = pRes->offset;
if ((++pMeterMetaInfo->vnodeIndex) < pMeterMetaInfo->pMetricMeta->numOfVnodes) { if ((++pMeterMetaInfo->vnodeIndex) < pMeterMetaInfo->pMetricMeta->numOfVnodes) {
pSql->cmd.command = TSDB_SQL_SELECT; pSql->cmd.command = TSDB_SQL_SELECT;
assert(pSql->fp == NULL); assert(pSql->fp == NULL);
...@@ -925,12 +921,11 @@ int taos_print_row(char *str, TAOS_ROW row, TAOS_FIELD *fields, int num_fields) ...@@ -925,12 +921,11 @@ int taos_print_row(char *str, TAOS_ROW row, TAOS_FIELD *fields, int num_fields)
size_t xlen = strlen(row[i]); size_t xlen = strlen(row[i]);
size_t trueLen = MIN(xlen, fields[i].bytes); size_t trueLen = MIN(xlen, fields[i].bytes);
memcpy(str + len, (char*) row[i], trueLen); memcpy(str + len, (char *)row[i], trueLen);
str[len + trueLen] = ' '; str[len + trueLen] = ' ';
len += (trueLen + 1); len += (trueLen + 1);
} } break;
break;
case TSDB_DATA_TYPE_TIMESTAMP: case TSDB_DATA_TYPE_TIMESTAMP:
len += sprintf(str + len, "%lld ", *((int64_t *)row[i])); len += sprintf(str + len, "%lld ", *((int64_t *)row[i]));
...@@ -987,7 +982,7 @@ int taos_validate_sql(TAOS *taos, const char *sql) { ...@@ -987,7 +982,7 @@ int taos_validate_sql(TAOS *taos, const char *sql) {
return code; return code;
} }
static int tscParseTblNameList(SSqlObj *pSql, const char* tblNameList, int32_t tblListLen) { static int tscParseTblNameList(SSqlObj *pSql, const char *tblNameList, int32_t tblListLen) {
// must before clean the sqlcmd object // must before clean the sqlcmd object
tscRemoveAllMeterMetaInfo(&pSql->cmd, false); tscRemoveAllMeterMetaInfo(&pSql->cmd, false);
tscCleanSqlCmd(&pSql->cmd); tscCleanSqlCmd(&pSql->cmd);
...@@ -998,11 +993,11 @@ static int tscParseTblNameList(SSqlObj *pSql, const char* tblNameList, int32_t t ...@@ -998,11 +993,11 @@ static int tscParseTblNameList(SSqlObj *pSql, const char* tblNameList, int32_t t
pCmd->count = 0; pCmd->count = 0;
int code = TSDB_CODE_INVALID_METER_ID; int code = TSDB_CODE_INVALID_METER_ID;
char *str = (char*) tblNameList; char *str = (char *)tblNameList;
SMeterMetaInfo *pMeterMetaInfo = tscAddEmptyMeterMetaInfo(pCmd); SMeterMetaInfo *pMeterMetaInfo = tscAddEmptyMeterMetaInfo(pCmd);
if ((code = tscAllocPayload(pCmd, tblListLen+16)) != TSDB_CODE_SUCCESS) { if ((code = tscAllocPayload(pCmd, tblListLen + 16)) != TSDB_CODE_SUCCESS) {
return code; return code;
} }
...@@ -1024,7 +1019,7 @@ static int tscParseTblNameList(SSqlObj *pSql, const char* tblNameList, int32_t t ...@@ -1024,7 +1019,7 @@ static int tscParseTblNameList(SSqlObj *pSql, const char* tblNameList, int32_t t
strtrim(tblName); strtrim(tblName);
len = (uint32_t)strlen(tblName); len = (uint32_t)strlen(tblName);
SSQLToken sToken = {.n = len, .type = TK_ID, .z = tblName}; SSQLToken sToken = {.n = len, .type = TK_ID, .z = tblName};
tSQLGetToken(tblName, &sToken.type); tSQLGetToken(tblName, &sToken.type);
...@@ -1068,7 +1063,7 @@ static int tscParseTblNameList(SSqlObj *pSql, const char* tblNameList, int32_t t ...@@ -1068,7 +1063,7 @@ static int tscParseTblNameList(SSqlObj *pSql, const char* tblNameList, int32_t t
} }
int taos_load_table_info(TAOS *taos, const char *tableNameList) { int taos_load_table_info(TAOS *taos, const char *tableNameList) {
const int32_t MAX_TABLE_NAME_LENGTH = 12*1024*1024; // 12MB list const int32_t MAX_TABLE_NAME_LENGTH = 12 * 1024 * 1024; // 12MB list
STscObj *pObj = (STscObj *)taos; STscObj *pObj = (STscObj *)taos;
if (pObj == NULL || pObj->signature != pObj) { if (pObj == NULL || pObj->signature != pObj) {
...@@ -1092,7 +1087,7 @@ int taos_load_table_info(TAOS *taos, const char *tableNameList) { ...@@ -1092,7 +1087,7 @@ int taos_load_table_info(TAOS *taos, const char *tableNameList) {
return pRes->code; return pRes->code;
} }
char* str = calloc(1, tblListLen + 1); char *str = calloc(1, tblListLen + 1);
if (str == NULL) { if (str == NULL) {
pRes->code = TSDB_CODE_CLI_OUT_OF_MEMORY; pRes->code = TSDB_CODE_CLI_OUT_OF_MEMORY;
tscError("%p failed to malloc sql string buffer", pSql); tscError("%p failed to malloc sql string buffer", pSql);
...@@ -1100,7 +1095,7 @@ int taos_load_table_info(TAOS *taos, const char *tableNameList) { ...@@ -1100,7 +1095,7 @@ int taos_load_table_info(TAOS *taos, const char *tableNameList) {
} }
strtolower(str, tableNameList); strtolower(str, tableNameList);
pRes->code = (uint8_t) tscParseTblNameList(pSql, str, tblListLen); pRes->code = (uint8_t)tscParseTblNameList(pSql, str, tblListLen);
/* /*
* set the qhandle to 0 before return in order to erase the qhandle value assigned in the previous successful query. * set the qhandle to 0 before return in order to erase the qhandle value assigned in the previous successful query.
......
...@@ -244,8 +244,7 @@ bool tscProjectionQueryOnMetric(SSqlCmd* pCmd) { ...@@ -244,8 +244,7 @@ bool tscProjectionQueryOnMetric(SSqlCmd* pCmd) {
//for project query, only the following two function is allowed //for project query, only the following two function is allowed
for (int32_t i = 0; i < pCmd->fieldsInfo.numOfOutputCols; ++i) { for (int32_t i = 0; i < pCmd->fieldsInfo.numOfOutputCols; ++i) {
SSqlExpr* pExpr = tscSqlExprGet(pCmd, i); int32_t functionId = tscSqlExprGet(pCmd, i)->functionId;
int32_t functionId = pExpr->functionId;
if (functionId != TSDB_FUNC_PRJ && functionId != TSDB_FUNC_TAGPRJ && if (functionId != TSDB_FUNC_PRJ && functionId != TSDB_FUNC_TAGPRJ &&
functionId != TSDB_FUNC_TAG && functionId != TSDB_FUNC_TS) { functionId != TSDB_FUNC_TAG && functionId != TSDB_FUNC_TS) {
return false; return false;
...@@ -255,6 +254,17 @@ bool tscProjectionQueryOnMetric(SSqlCmd* pCmd) { ...@@ -255,6 +254,17 @@ bool tscProjectionQueryOnMetric(SSqlCmd* pCmd) {
return true; return true;
} }
bool tscProjectionQueryOnTable(SSqlCmd* pCmd) {
for (int32_t i = 0; i < pCmd->fieldsInfo.numOfOutputCols; ++i) {
int32_t functionId = tscSqlExprGet(pCmd, i)->functionId;
if (functionId != TSDB_FUNC_PRJ && functionId != TSDB_FUNC_TS) {
return false;
}
}
return true;
}
bool tscIsPointInterpQuery(SSqlCmd* pCmd) { bool tscIsPointInterpQuery(SSqlCmd* pCmd) {
for (int32_t i = 0; i < pCmd->exprsInfo.numOfExprs; ++i) { for (int32_t i = 0; i < pCmd->exprsInfo.numOfExprs; ++i) {
SSqlExpr* pExpr = tscSqlExprGet(pCmd, i); SSqlExpr* pExpr = tscSqlExprGet(pCmd, i);
...@@ -1673,8 +1683,11 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void ...@@ -1673,8 +1683,11 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void
char key[TSDB_MAX_TAGS_LEN + 1] = {0}; char key[TSDB_MAX_TAGS_LEN + 1] = {0};
tscGetMetricMetaCacheKey(pCmd, key, pMetermetaInfo->pMeterMeta->uid); tscGetMetricMetaCacheKey(pCmd, key, pMetermetaInfo->pMeterMeta->uid);
printf("-----%s\n", key);
#ifdef _DEBUG_VIEW
printf("the metricmeta key is:%s\n", key);
#endif
char* name = pMeterMetaInfo->name; char* name = pMeterMetaInfo->name;
SMeterMetaInfo* pFinalInfo = NULL; SMeterMetaInfo* pFinalInfo = NULL;
...@@ -1768,3 +1781,12 @@ int32_t tscInvalidSQLErrMsg(char *msg, const char *additionalInfo, const char *s ...@@ -1768,3 +1781,12 @@ int32_t tscInvalidSQLErrMsg(char *msg, const char *additionalInfo, const char *s
return TSDB_CODE_INVALID_SQL; return TSDB_CODE_INVALID_SQL;
} }
bool tscHasReachLimitation(SSqlObj* pSql) {
assert(pSql != NULL && pSql->cmd.globalLimit != 0);
SSqlCmd* pCmd = &pSql->cmd;
SSqlRes* pRes = &pSql->res;
return (pCmd->globalLimit > 0 && pRes->numOfTotal >= pCmd->globalLimit);
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册