diff --git a/src/client/inc/tscUtil.h b/src/client/inc/tscUtil.h index 9ea6ba7c3f22df5a177a9e6f4aa1996b296fcd1a..3deb4c463fc77e9b7dd8613514d954df96fd2337 100644 --- a/src/client/inc/tscUtil.h +++ b/src/client/inc/tscUtil.h @@ -95,6 +95,8 @@ SMeterSidExtInfo* tscGetMeterSidInfo(SVnodeSidList* pSidList, int32_t idx); bool tscIsPointInterpQuery(SSqlCmd* pCmd); bool tscIsTWAQuery(SSqlCmd* pCmd); bool tscProjectionQueryOnMetric(SSqlCmd* pCmd); +bool tscProjectionQueryOnTable(SSqlCmd* pCmd); + bool tscIsTwoStageMergeMetricQuery(SSqlCmd* pCmd); bool tscQueryOnMetric(SSqlCmd* pCmd); bool tscQueryMetricTags(SSqlCmd* pCmd); diff --git a/src/client/inc/tsclient.h b/src/client/inc/tsclient.h index 4101cbfc9e3a713dc9af5a6ff67412db5e9d58e1..b36d2362dacd0a08a2adb5169c162ae4039daf9b 100644 --- a/src/client/inc/tsclient.h +++ b/src/client/inc/tsclient.h @@ -477,6 +477,8 @@ void tscProcessMultiVnodesInsertForFile(SSqlObj *pSql); void tscKillMetricQuery(SSqlObj *pSql); void tscInitResObjForLocalQuery(SSqlObj *pObj, int32_t numOfRes, int32_t rowLen); bool tscIsUpdateQuery(STscObj *pObj); +bool tscHasReachLimitation(SSqlObj* pSql); + int32_t tscInvalidSQLErrMsg(char *msg, const char *additionalInfo, const char *sql); // transfer SSqlInfo to SqlCmd struct diff --git a/src/client/src/tscAst.c b/src/client/src/tscAst.c index 6e72fa84389054e2047dbe26a87496aea38a9282..003a3aaf7a8bfef56954a0821746271ec6f8a799 100644 --- a/src/client/src/tscAst.c +++ b/src/client/src/tscAst.c @@ -112,8 +112,9 @@ static tSQLSyntaxNode *tSQLSyntaxNodeCreate(SSchema *pSchema, int32_t numOfCols, tSQLSyntaxNode *pNode = NULL; if (pToken->type == TK_ID || pToken->type == TK_TBNAME) { + int32_t i = 0; + if (pToken->type == TK_ID) { - int32_t i = 0; do { size_t len = strlen(pSchema[i].name); if (strncmp(pToken->z, pSchema[i].name, pToken->n) == 0 && pToken->n == len) break; @@ -326,8 +327,8 @@ static tSQLSyntaxNode *createSyntaxTree(SSchema *pSchema, int32_t numOfCols, cha uint8_t localOptr = getBinaryExprOptr(&t0); if (localOptr == 0) { pError("not support binary operator:%d", t0.type); + free(pBinExpr); return NULL; - free(pBinExpr) } return parseRemainStr(str, pBinExpr, pSchema, localOptr, numOfCols, i); @@ -652,8 +653,7 @@ void tSQLListTraverseOnResult(struct tSQLBinaryExpr *pExpr, bool (*fp)(tSkipList // brutal force search int64_t num = pResult->num; for (int32_t i = 0, j = 0; i < pResult->num; ++i) { - //if (fp == NULL || (fp != NULL && fp(pResult->pRes[i], pExpr->info) == true)) { - if (fp == NULL || (fp(pResult->pRes[i], pExpr->info) == true)) { + if (fp == NULL || (fp(pResult->pRes[i], pExpr->info) == true)) { pResult->pRes[j++] = pResult->pRes[i]; } else { num--; diff --git a/src/client/src/tscJoinProcess.c b/src/client/src/tscJoinProcess.c index 3126c3a867dc03bc14b5dcb7be18c31cb22fdda0..1e7355d1b174a722be93eda3b8e0a926520709a4 100644 --- a/src/client/src/tscJoinProcess.c +++ b/src/client/src/tscJoinProcess.c @@ -13,9 +13,9 @@ * along with this program. If not, see . */ +#include "tscJoinProcess.h" #include "os.h" #include "tcache.h" -#include "tscJoinProcess.h" #include "tscUtil.h" #include "tsclient.h" #include "tscompression.h" @@ -45,8 +45,8 @@ static bool doCompare(int32_t order, int64_t left, int64_t right) { } } -static int64_t doTSBlockIntersect(SSqlObj* pSql, SJoinSubquerySupporter* pSupporter1, SJoinSubquerySupporter* pSupporter2, - TSKEY* st, TSKEY* et) { +static int64_t doTSBlockIntersect(SSqlObj* pSql, SJoinSubquerySupporter* pSupporter1, + SJoinSubquerySupporter* pSupporter2, TSKEY* st, TSKEY* et) { STSBuf* output1 = tsBufCreate(true); STSBuf* output2 = tsBufCreate(true); @@ -150,14 +150,15 @@ static int64_t doTSBlockIntersect(SSqlObj* pSql, SJoinSubquerySupporter* pSuppor tsBufDestory(pSupporter1->pTSBuf); tsBufDestory(pSupporter2->pTSBuf); - tscTrace("%p input1:%lld, input2:%lld, final:%lld for secondary query after ts blocks intersecting", - pSql, numOfInput1, numOfInput2, output1->numOfTotal); + tscTrace("%p input1:%lld, input2:%lld, final:%lld for secondary query after ts blocks intersecting", pSql, + numOfInput1, numOfInput2, output1->numOfTotal); return output1->numOfTotal; } -//todo handle failed to create sub query -SJoinSubquerySupporter* tscCreateJoinSupporter(SSqlObj* pSql, SSubqueryState* pState, /*int32_t* numOfComplete, int32_t* gc,*/ int32_t index) { +// todo handle failed to create sub query +SJoinSubquerySupporter* tscCreateJoinSupporter(SSqlObj* pSql, SSubqueryState* pState, + /*int32_t* numOfComplete, int32_t* gc,*/ int32_t index) { SJoinSubquerySupporter* pSupporter = calloc(1, sizeof(SJoinSubquerySupporter)); if (pSupporter == NULL) { return NULL; @@ -241,8 +242,10 @@ int32_t tscLaunchSecondSubquery(SSqlObj* pSql) { } // scan all subquery, if one sub query has only ts, ignore it - tscTrace("%p start to launch secondary subqueries, total:%d, only:%d needs to query, others are not retrieve in " - "select clause", pSql, pSql->numOfSubs, numOfSub); + tscTrace( + "%p start to launch secondary subqueries, total:%d, only:%d needs to query, others are not retrieve in " + "select clause", + pSql, pSql->numOfSubs, numOfSub); int32_t j = 0; for (int32_t i = 0; i < pSql->numOfSubs; ++i) { @@ -258,7 +261,7 @@ int32_t tscLaunchSecondSubquery(SSqlObj* pSql) { SSqlObj* pNew = createSubqueryObj(pSql, (int16_t)i, tscJoinQueryCallback, pSupporter, NULL); if (pNew == NULL) { - pSql->numOfSubs = i; //revise the number of subquery + pSql->numOfSubs = i; // revise the number of subquery pSupporter->pState->numOfTotal = i; pSupporter->pState->code = TSDB_CODE_CLI_OUT_OF_MEMORY; @@ -296,14 +299,14 @@ int32_t tscLaunchSecondSubquery(SSqlObj* pSql) { tscFieldInfoCalOffset(&pNew->cmd); SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfo(&pNew->cmd, 0); - + /* * When handling the projection query, the offset value will be modified for table-table join, which is changed * during the timestamp intersection. */ pSupporter->limit = pSql->cmd.limit; pNew->cmd.limit = pSupporter->limit; - + // fetch the join tag column if (UTIL_METER_IS_METRIC(pMeterMetaInfo)) { SSqlExpr* pExpr = tscSqlExprGet(&pNew->cmd, 0); @@ -317,7 +320,7 @@ int32_t tscLaunchSecondSubquery(SSqlObj* pSql) { #ifdef _DEBUG_VIEW tscPrintSelectClause(&pNew->cmd); #endif - + tscProcessSql(pNew); } @@ -388,10 +391,10 @@ static void joinRetrieveCallback(void* param, TAOS_RES* tres, int numOfRows) { return; } - if (numOfRows > 0) { // write the data into disk + if (numOfRows > 0) { // write the data into disk fwrite(pSql->res.data, pSql->res.numOfRows, 1, pSupporter->f); fclose(pSupporter->f); - + STSBuf* pBuf = tsBufCreateFromFile(pSupporter->path, true); if (pBuf == NULL) { tscError("%p invalid ts comp file from vnode, abort sub query, file size:%d", pSql, numOfRows); @@ -405,9 +408,9 @@ static void joinRetrieveCallback(void* param, TAOS_RES* tres, int numOfRows) { tscTrace("%p create tmp file for ts block:%s", pSql, pBuf->path); pSupporter->pTSBuf = pBuf; } else { - assert(pSql->cmd.numOfTables == 1); // for subquery, only one metermetaInfo + assert(pSql->cmd.numOfTables == 1); // for subquery, only one metermetaInfo SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfo(&pSql->cmd, 0); - + tsBufMerge(pSupporter->pTSBuf, pBuf, pMeterMetaInfo->vnodeIndex); tsBufDestory(pBuf); } @@ -418,26 +421,25 @@ static void joinRetrieveCallback(void* param, TAOS_RES* tres, int numOfRows) { pSql->res.row = pSql->res.numOfRows; taos_fetch_rows_a(tres, joinRetrieveCallback, param); - } else if (numOfRows == 0) { // no data from this vnode anymore + } else if (numOfRows == 0) { // no data from this vnode anymore if (tscProjectionQueryOnMetric(&pParentSql->cmd)) { - SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(&pSql->cmd, 0); + SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfo(&pSql->cmd, 0); assert(pSql->cmd.numOfTables == 1); - + // for projection query, need to try next vnode if ((++pMeterMetaInfo->vnodeIndex) < pMeterMetaInfo->pMetricMeta->numOfVnodes) { pSql->cmd.command = TSDB_SQL_SELECT; pSql->fp = tscJoinQueryCallback; tscProcessSql(pSql); - + return; } } - - if (atomic_add_fetch_32(&pSupporter->pState->numOfCompleted, 1) >= pSupporter->pState->numOfTotal) { + if (atomic_add_fetch_32(&pSupporter->pState->numOfCompleted, 1) >= pSupporter->pState->numOfTotal) { if (pSupporter->pState->code != TSDB_CODE_SUCCESS) { tscTrace("%p sub:%p, numOfSub:%d, quit from further procedure due to other queries failure", pParentSql, tres, - pSupporter->subqueryIndex); + pSupporter->subqueryIndex); doQuitSubquery(pParentSql); return; } @@ -471,31 +473,34 @@ static void joinRetrieveCallback(void* param, TAOS_RES* tres, int numOfRows) { pSupporter->pState->code = numOfRows; tscError("%p retrieve failed, code:%d, index:%d", pSql, numOfRows, pSupporter->subqueryIndex); } - + + if (numOfRows >= 0) { + pSql->res.numOfTotal += pSql->res.numOfRows; + } + if (tscProjectionQueryOnMetric(&pSql->cmd) && numOfRows == 0) { - SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(&pSql->cmd, 0); + SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfo(&pSql->cmd, 0); assert(pSql->cmd.numOfTables == 1); - + // for projection query, need to try next vnode if current vnode is exhausted if ((++pMeterMetaInfo->vnodeIndex) < pMeterMetaInfo->pMetricMeta->numOfVnodes) { - pSupporter->pState->numOfCompleted = 0; pSupporter->pState->numOfTotal = 1; - + pSql->cmd.command = TSDB_SQL_SELECT; pSql->fp = tscJoinQueryCallback; tscProcessSql(pSql); - + return; } } - + if (atomic_add_fetch_32(&pSupporter->pState->numOfCompleted, 1) >= pSupporter->pState->numOfTotal) { assert(pSupporter->pState->numOfCompleted == pSupporter->pState->numOfTotal); - + tscTrace("%p all %d secondary retrieves are completed, global code:%d", tres, pSupporter->pState->numOfTotal, - pParentSql->res.code); - + pParentSql->res.code); + if (pSupporter->pState->code != TSDB_CODE_SUCCESS) { pParentSql->res.code = abs(pSupporter->pState->code); freeSubqueryObj(pParentSql); @@ -510,62 +515,68 @@ void tscFetchDatablockFromSubquery(SSqlObj* pSql) { int32_t numOfFetch = 0; assert(pSql->numOfSubs >= 1); - + for (int32_t i = 0; i < pSql->numOfSubs; ++i) { - SSqlRes* pRes = &pSql->pSubs[i]->res; - SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfo(&pSql->pSubs[i]->cmd, 0); - - if (UTIL_METER_IS_METRIC(pMeterMetaInfo)) { - if (pRes->row >= pRes->numOfRows && pMeterMetaInfo->vnodeIndex < pMeterMetaInfo->pMetricMeta->numOfVnodes) { + SSqlRes *pRes = &pSql->pSubs[i]->res; + SSqlCmd *pCmd = &pSql->pSubs[i]->cmd; + + SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, 0); + + if (tscProjectionQueryOnMetric(pCmd)) { + if (pRes->row >= pRes->numOfRows && pMeterMetaInfo->vnodeIndex < pMeterMetaInfo->pMetricMeta->numOfVnodes && + (!tscHasReachLimitation(pSql->pSubs[i]))) { numOfFetch++; } } else { - if (pRes->row >= pRes->numOfRows) { + if ((pRes->row >= pRes->numOfRows && (!tscHasReachLimitation(pSql->pSubs[i])) && tscProjectionQueryOnTable(pSql)) + || (pRes->numOfRows == 0)) { numOfFetch++; } } } - if (numOfFetch > 0) { - tscTrace("%p retrieve data from %d subqueries", pSql, numOfFetch); + if (numOfFetch <= 0) { + return ; + } - SJoinSubquerySupporter* pSupporter = (SJoinSubquerySupporter*)pSql->pSubs[0]->param; - pSupporter->pState->numOfTotal = numOfFetch; // wait for all subqueries completed - pSupporter->pState->numOfCompleted = 0; + // TODO multi-vnode retrieve for projection query with limitation has bugs, since the global limiation is not handled + tscTrace("%p retrieve data from %d subqueries", pSql, numOfFetch); - for (int32_t i = 0; i < pSql->numOfSubs; ++i) { - SSqlObj* pSql1 = pSql->pSubs[i]; + SJoinSubquerySupporter* pSupporter = (SJoinSubquerySupporter*)pSql->pSubs[0]->param; + pSupporter->pState->numOfTotal = numOfFetch; // wait for all subqueries completed + pSupporter->pState->numOfCompleted = 0; - SSqlRes* pRes1 = &pSql1->res; - SSqlCmd* pCmd1 = &pSql1->cmd; + for (int32_t i = 0; i < pSql->numOfSubs; ++i) { + SSqlObj* pSql1 = pSql->pSubs[i]; - pSupporter = (SJoinSubquerySupporter*)pSql1->param; + SSqlRes* pRes1 = &pSql1->res; + SSqlCmd* pCmd1 = &pSql1->cmd; - // wait for all subqueries completed - pSupporter->pState->numOfTotal = numOfFetch; - - assert(pRes1->numOfRows >= 0 && pCmd1->numOfTables == 1); - - SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfo(pCmd1, 0); - if (pRes1->row >= pRes1->numOfRows) { - tscTrace("%p subquery:%p retrieve data from vnode, subquery:%d, vnodeIndex:%d", pSql, pSql1, - pSupporter->subqueryIndex, pMeterMetaInfo->vnodeIndex); + pSupporter = (SJoinSubquerySupporter*)pSql1->param; - tscResetForNextRetrieve(pRes1); + // wait for all subqueries completed + pSupporter->pState->numOfTotal = numOfFetch; + assert(pRes1->numOfRows >= 0 && pCmd1->numOfTables == 1); - pSql1->fp = joinRetrieveCallback; + SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfo(pCmd1, 0); + + if (pRes1->row >= pRes1->numOfRows) { + tscTrace("%p subquery:%p retrieve data from vnode, subquery:%d, vnodeIndex:%d", pSql, pSql1, + pSupporter->subqueryIndex, pMeterMetaInfo->vnodeIndex); - if (pCmd1->command < TSDB_SQL_LOCAL) { - pCmd1->command = (pCmd1->command > TSDB_SQL_MGMT) ? TSDB_SQL_RETRIEVE : TSDB_SQL_FETCH; - } + tscResetForNextRetrieve(pRes1); + pSql1->fp = joinRetrieveCallback; - tscProcessSql(pSql1); + if (pCmd1->command < TSDB_SQL_LOCAL) { + pCmd1->command = (pCmd1->command > TSDB_SQL_MGMT) ? TSDB_SQL_RETRIEVE : TSDB_SQL_FETCH; } - } - // wait for all subquery completed - tsem_wait(&pSql->rspSem); + tscProcessSql(pSql1); + } } + + // wait for all subquery completed + tsem_wait(&pSql->rspSem); } // all subqueries return, set the result output index @@ -574,11 +585,11 @@ void tscSetupOutputColumnIndex(SSqlObj* pSql) { SSqlRes* pRes = &pSql->res; tscTrace("%p all subquery response, retrieve data", pSql); - + if (pRes->pColumnIndex != NULL) { return; // the column transfer support struct has been built } - + pRes->pColumnIndex = calloc(1, sizeof(SColumnIndex) * pCmd->fieldsInfo.numOfOutputCols); for (int32_t i = 0; i < pCmd->fieldsInfo.numOfOutputCols; ++i) { @@ -669,22 +680,23 @@ void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code) { tscSetupOutputColumnIndex(pParentSql); SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfo(&pSql->cmd, 0); - + /** - * if the query is a continue query (vnodeIndex > 0 for projection query) for next vnode, do the retrieval of data instead of returning to its invoker + * if the query is a continue query (vnodeIndex > 0 for projection query) for next vnode, do the retrieval of + * data instead of returning to its invoker */ if (pMeterMetaInfo->vnodeIndex > 0 && tscProjectionQueryOnMetric(&pSql->cmd)) { assert(pMeterMetaInfo->vnodeIndex < pMeterMetaInfo->pMetricMeta->numOfVnodes); pSupporter->pState->numOfCompleted = 0; // reset the record value - + pSql->fp = joinRetrieveCallback; // continue retrieve data pSql->cmd.command = TSDB_SQL_FETCH; tscProcessSql(pSql); - } else { // first retrieve from vnode during the secondary stage sub-query + } else { // first retrieve from vnode during the secondary stage sub-query if (pParentSql->fp == NULL) { tsem_wait(&pParentSql->emptyRspSem); tsem_wait(&pParentSql->emptyRspSem); - + tsem_post(&pParentSql->rspSem); } else { // set the command flag must be after the semaphore been correctly set. @@ -848,7 +860,7 @@ STSBuf* tsBufCreateFromFile(const char* path, bool autoDelete) { size_t infoSize = sizeof(STSVnodeBlockInfo) * pTSBuf->numOfVnodes; STSVnodeBlockInfo* buf = (STSVnodeBlockInfo*)calloc(1, infoSize); - int64_t pos = ftell(pTSBuf->f); + int64_t pos = ftell(pTSBuf->f); fread(buf, infoSize, 1, pTSBuf->f); // the length value for each vnode is not kept in file, so does not set the length value @@ -864,17 +876,17 @@ STSBuf* tsBufCreateFromFile(const char* path, bool autoDelete) { struct stat fileStat; fstat(fileno(pTSBuf->f), &fileStat); - pTSBuf->fileSize = (uint32_t) fileStat.st_size; + pTSBuf->fileSize = (uint32_t)fileStat.st_size; tsBufResetPos(pTSBuf); // ascending by default pTSBuf->cur.order = TSQL_SO_ASC; pTSBuf->autoDelete = autoDelete; - + tscTrace("create tsBuf from file:%s, fd:%d, size:%d, numOfVnode:%d, autoDelete:%d", pTSBuf->path, fileno(pTSBuf->f), - pTSBuf->fileSize, pTSBuf->numOfVnodes, pTSBuf->autoDelete); - + pTSBuf->fileSize, pTSBuf->numOfVnodes, pTSBuf->autoDelete); + return pTSBuf; } @@ -899,12 +911,11 @@ void tsBufDestory(STSBuf* pTSBuf) { } free(pTSBuf); - } static STSVnodeBlockInfoEx* tsBufGetLastVnodeInfo(STSBuf* pTSBuf) { int32_t last = pTSBuf->numOfVnodes - 1; - + assert(last >= 0); return &pTSBuf->pData[last]; } @@ -944,9 +955,9 @@ static STSVnodeBlockInfoEx* addOneVnodeInfo(STSBuf* pTSBuf, int32_t vnodeId) { pTSBuf->numOfVnodes += 1; // update the header info - STSBufFileHeader header = - {.magic = TS_COMP_FILE_MAGIC, .numOfVnode = pTSBuf->numOfVnodes, .tsOrder = pTSBuf->tsOrder}; - + STSBufFileHeader header = { + .magic = TS_COMP_FILE_MAGIC, .numOfVnode = pTSBuf->numOfVnodes, .tsOrder = pTSBuf->tsOrder}; + STSBufUpdateHeader(pTSBuf, &header); return tsBufGetLastVnodeInfo(pTSBuf); } @@ -994,9 +1005,9 @@ static void writeDataToDisk(STSBuf* pTSBuf) { pTSBuf->fileSize += blockSize; pTSBuf->tsData.len = 0; - + STSVnodeBlockInfoEx* pVnodeBlockInfoEx = tsBufGetLastVnodeInfo(pTSBuf); - + pVnodeBlockInfoEx->info.compLen += blockSize; pVnodeBlockInfoEx->info.numOfBlocks += 1; @@ -1250,9 +1261,9 @@ static void tsBufGetBlock(STSBuf* pTSBuf, int32_t vnodeIndex, int32_t blockIndex } STSBlock* pBlock = &pTSBuf->block; - + size_t s = pBlock->numOfElem * TSDB_KEYSIZE; - + /* * In order to accommodate all the qualified data, the actual buffer size for one block with identical tags value * may exceed the maximum allowed size during *tsBufAppend* function by invoking expandBuffer function @@ -1260,7 +1271,7 @@ static void tsBufGetBlock(STSBuf* pTSBuf, int32_t vnodeIndex, int32_t blockIndex if (s > pTSBuf->tsData.allocSize) { expandBuffer(&pTSBuf->tsData, s); } - + pTSBuf->tsData.len = tsDecompressTimestamp(pBlock->payload, pBlock->compLen, pBlock->numOfElem, pTSBuf->tsData.rawBuf, pTSBuf->tsData.allocSize, TWO_STAGE_COMP, pTSBuf->assistBuf, pTSBuf->bufSize); @@ -1307,20 +1318,20 @@ bool tsBufNextPos(STSBuf* pTSBuf) { if (pCur->vnodeIndex == -1) { if (pCur->order == TSQL_SO_ASC) { tsBufGetBlock(pTSBuf, 0, 0); - - if (pTSBuf->block.numOfElem == 0) { // the whole list is empty, return + + if (pTSBuf->block.numOfElem == 0) { // the whole list is empty, return tsBufResetPos(pTSBuf); return false; } else { return true; } - - } else { // get the last timestamp record in the last block of the last vnode + + } else { // get the last timestamp record in the last block of the last vnode assert(pTSBuf->numOfVnodes > 0); - + int32_t vnodeIndex = pTSBuf->numOfVnodes - 1; pCur->vnodeIndex = vnodeIndex; - + int32_t vnodeId = pTSBuf->pData[pCur->vnodeIndex].info.vnode; STSVnodeBlockInfo* pBlockInfo = tsBufGetVnodeBlockInfo(pTSBuf, vnodeId); int32_t blockIndex = pBlockInfo->numOfBlocks - 1; @@ -1397,8 +1408,6 @@ STSElem tsBufGetElem(STSBuf* pTSBuf) { return elem1; } - - /** * current only support ts comp data from two vnode merge * @param pDestBuf @@ -1452,7 +1461,7 @@ int32_t tsBufMerge(STSBuf* pDestBuf, const STSBuf* pSrcBuf, int32_t vnodeId) { pDestBuf->numOfVnodes = newSize; } else { STSVnodeBlockInfoEx* pBlockInfoEx = tsBufGetLastVnodeInfo(pDestBuf); - + pBlockInfoEx->len += pSrcBuf->pData[0].len; pBlockInfoEx->info.numOfBlocks += pSrcBuf->pData[0].info.numOfBlocks; pBlockInfoEx->info.compLen += pSrcBuf->pData[0].info.compLen; @@ -1470,7 +1479,7 @@ int32_t tsBufMerge(STSBuf* pDestBuf, const STSBuf* pSrcBuf, int32_t vnodeId) { #else ssize_t rc = fsendfile(pDestBuf->f, pSrcBuf->f, &offset, size); #endif - + if (rc == -1) { tscError("failed to merge tsBuf from:%s to %s, reason:%s\n", pSrcBuf->path, pDestBuf->path, strerror(errno)); return -1; @@ -1482,18 +1491,18 @@ int32_t tsBufMerge(STSBuf* pDestBuf, const STSBuf* pSrcBuf, int32_t vnodeId) { } pDestBuf->numOfTotal += pSrcBuf->numOfTotal; - + int32_t oldSize = pDestBuf->fileSize; - + struct stat fileStat; fstat(fileno(pDestBuf->f), &fileStat); - pDestBuf->fileSize = (uint32_t) fileStat.st_size; - + pDestBuf->fileSize = (uint32_t)fileStat.st_size; + assert(pDestBuf->fileSize == oldSize + size); - - tscTrace("tsBuf merge success, %p, path:%s, fd:%d, file size:%d, numOfVnode:%d, autoDelete:%d", pDestBuf, pDestBuf->path, - fileno(pDestBuf->f), pDestBuf->fileSize, pDestBuf->numOfVnodes, pDestBuf->autoDelete); - + + tscTrace("tsBuf merge success, %p, path:%s, fd:%d, file size:%d, numOfVnode:%d, autoDelete:%d", pDestBuf, + pDestBuf->path, fileno(pDestBuf->f), pDestBuf->fileSize, pDestBuf->numOfVnodes, pDestBuf->autoDelete); + return 0; } @@ -1510,7 +1519,7 @@ STSBuf* tsBufCreateFromCompBlocks(const char* pData, int32_t numOfBlocks, int32_ TSBufUpdateVnodeInfo(pTSBuf, pTSBuf->numOfVnodes - 1, pBlockInfo); fseek(pTSBuf->f, pBlockInfo->offset, SEEK_SET); - fwrite((void*) pData, 1, len, pTSBuf->f); + fwrite((void*)pData, 1, len, pTSBuf->f); pTSBuf->fileSize += len; pTSBuf->tsOrder = order; diff --git a/src/client/src/tscSQLParser.c b/src/client/src/tscSQLParser.c index 128cc7f3a195c90c0f85aabdc4535c3b27da7984..752c5d123f78155bd501d0c008560edca8da2a40 100644 --- a/src/client/src/tscSQLParser.c +++ b/src/client/src/tscSQLParser.c @@ -5008,6 +5008,8 @@ int32_t parseLimitClause(SSqlObj* pSql, SQuerySQL* pQuerySql) { // handle the limit offset value, validate the limit pCmd->limit = pQuerySql->limit; + pCmd->globalLimit = pCmd->limit.limit; + pCmd->slimit = pQuerySql->slimit; if (pCmd->slimit.offset < 0 || pCmd->limit.offset < 0) { diff --git a/src/client/src/tscSql.c b/src/client/src/tscSql.c index f3d34075822d445ef737bcca6b46784cff197602..04f9fc0aa62b01d2875909376ec9a626e59246be 100644 --- a/src/client/src/tscSql.c +++ b/src/client/src/tscSql.c @@ -16,21 +16,21 @@ #include "os.h" #include "tcache.h" #include "tlog.h" +#include "tnote.h" #include "trpc.h" #include "tscJoinProcess.h" #include "tscProfile.h" +#include "tscSQLParser.h" #include "tscSecondaryMerge.h" #include "tscUtil.h" #include "tsclient.h" #include "tscompression.h" #include "tsocket.h" -#include "tscSQLParser.h" #include "ttimer.h" #include "tutil.h" -#include "tnote.h" -TAOS *taos_connect_imp(const char *ip, const char *user, const char *pass, const char *db, uint16_t port, void (*fp)(void *, TAOS_RES *, int), - void *param, void **taos) { +TAOS *taos_connect_imp(const char *ip, const char *user, const char *pass, const char *db, uint16_t port, + void (*fp)(void *, TAOS_RES *, int), void *param, void **taos) { STscObj *pObj; taos_init(); @@ -81,7 +81,7 @@ TAOS *taos_connect_imp(const char *ip, const char *user, const char *pass, const globalCode = TSDB_CODE_CLI_OUT_OF_MEMORY; return NULL; } - + memset(pObj, 0, sizeof(STscObj)); pObj->signature = pObj; @@ -113,7 +113,7 @@ TAOS *taos_connect_imp(const char *ip, const char *user, const char *pass, const free(pObj); return NULL; } - + memset(pSql, 0, sizeof(SSqlObj)); pSql->pTscObj = pObj; pSql->signature = pSql; @@ -162,14 +162,14 @@ TAOS *taos_connect(const char *ip, const char *user, const char *pass, const cha void *taos = taos_connect_imp(ip, user, pass, db, port, NULL, NULL, NULL); if (taos != NULL) { - STscObj* pObj = (STscObj*) taos; + STscObj *pObj = (STscObj *)taos; // version compare only requires the first 3 segments of the version string int32_t comparedSegments = 3; - char client_version[64] = {0}; - char server_version[64] = {0}; - int clientVersionNumber[4] = {0}; - int serverVersionNumber[4] = {0}; + char client_version[64] = {0}; + char server_version[64] = {0}; + int clientVersionNumber[4] = {0}; + int serverVersionNumber[4] = {0}; strcpy(client_version, version); strcpy(server_version, taos_get_server_info(taos)); @@ -188,7 +188,7 @@ TAOS *taos_connect(const char *ip, const char *user, const char *pass, const cha return NULL; } - for(int32_t i = 0; i < comparedSegments; ++i) { + for (int32_t i = 0; i < comparedSegments; ++i) { if (clientVersionNumber[i] != serverVersionNumber[i]) { tscError("taos:%p, the %d-th number of server version:%s not matched with client version:%s, close connection", taos, i, server_version, version); @@ -225,7 +225,7 @@ void taos_close(TAOS *taos) { } } -int taos_query_imp(STscObj* pObj, SSqlObj* pSql) { +int taos_query_imp(STscObj *pObj, SSqlObj *pSql) { SSqlRes *pRes = &pSql->res; pRes->numOfRows = 1; @@ -251,7 +251,7 @@ int taos_query_imp(STscObj* pObj, SSqlObj* pSql) { } else { tscError("%p SQL result:%d, %s pObj:%p", pSql, pRes->code, taos_errstr(pObj), pObj); } - + if (pRes->code != TSDB_CODE_SUCCESS) { tscFreeSqlObjPartial(pSql); } @@ -271,9 +271,10 @@ int taos_query(TAOS *taos, const char *sqlstr) { size_t sqlLen = strlen(sqlstr); if (sqlLen > tsMaxSQLStringLen) { - pRes->code = tscInvalidSQLErrMsg(pSql->cmd.payload, "sql too long", NULL); // set the additional error msg for invalid sql + pRes->code = + tscInvalidSQLErrMsg(pSql->cmd.payload, "sql too long", NULL); // set the additional error msg for invalid sql tscError("%p SQL result:%d, %s pObj:%p", pSql, pRes->code, taos_errstr(taos), pObj); - + return pRes->code; } @@ -283,7 +284,7 @@ int taos_query(TAOS *taos, const char *sqlstr) { if (sql == NULL) { pRes->code = TSDB_CODE_CLI_OUT_OF_MEMORY; tscError("%p failed to malloc sql string buffer, reason:%s", pSql, strerror(errno)); - + tscError("%p SQL result:%d, %s pObj:%p", pSql, pRes->code, taos_errstr(taos), pObj); return pRes->code; } @@ -451,59 +452,56 @@ static void **getOneRowFromBuf(SSqlObj *pSql) { return pRes->tsrow; } -static void **tscJoinResultsetFromBuf(SSqlObj *pSql) { +static bool tscHashRemainDataInSubqueryResultSet(SSqlObj *pSql) { + bool hasData = true; SSqlCmd *pCmd = &pSql->cmd; - SSqlRes *pRes = &pSql->res; - while (1) { - bool hasData = true; + if (tscProjectionQueryOnMetric(pCmd)) { + bool allSubqueryExhausted = true; - if (tscProjectionQueryOnMetric(pCmd)) { - bool allSubqueryExhausted = true; - - for (int32_t i = 0; i < pSql->numOfSubs; ++i) { - SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfo(&pSql->pSubs[i]->cmd, 0); - if (pMeterMetaInfo->vnodeIndex < pMeterMetaInfo->pMetricMeta->numOfVnodes) { - allSubqueryExhausted = false; - break; - } - } - - hasData = !allSubqueryExhausted; - } else { //otherwise, in case inner join, if any subquery exhausted, query completed. - for (int32_t i = 0; i < pSql->numOfSubs; ++i) { - SSqlRes *pRes1 = &pSql->pSubs[i]->res; - if (pRes1->numOfRows == 0) { - hasData = false; - break; - } + for (int32_t i = 0; i < pSql->numOfSubs; ++i) { + SSqlRes *pRes1 = &pSql->pSubs[i]->res; + SSqlCmd *pCmd1 = &pSql->pSubs[i]->cmd; + + SMeterMetaInfo *pMetaInfo = tscGetMeterMetaInfo(pCmd1, 0); + assert(pCmd1->numOfTables == 1); + + /* + * if the global limitation is not reached, and current result has not exhausted, or next more vnodes are + * available, go on + */ + if (pMetaInfo->vnodeIndex < pMetaInfo->pMetricMeta->numOfVnodes && pRes1->row < pRes1->numOfRows && + (!tscHasReachLimitation(pSql->pSubs[i]))) { + allSubqueryExhausted = false; + break; } } - + + hasData = !allSubqueryExhausted; + } else { // otherwise, in case inner join, if any subquery exhausted, query completed. for (int32_t i = 0; i < pSql->numOfSubs; ++i) { SSqlRes *pRes1 = &pSql->pSubs[i]->res; - SMeterMetaInfo* pMeterMeta = tscGetMeterMetaInfo(&pSql->pSubs[i]->cmd, 0); - - if (tscProjectionQueryOnMetric(pCmd)) { - //For multi-vnode projection query, the results may locate in following vnode, so we needs to go on - if (pMeterMeta->vnodeIndex < pMeterMeta->pMetricMeta->numOfVnodes) { - break; - } - } else { //otherwise, in case inner join, if any subquery exhausted, query completed. - if (pRes1->numOfRows == 0) { - hasData = false; - break; - } + + if ((pRes1->row >= pRes1->numOfRows && tscHasReachLimitation(pSql->pSubs[i]) && + tscProjectionQueryOnTable(&pSql->pSubs[i]->cmd)) || + (pRes1->numOfRows == 0)) { + + hasData = false; + break; } -// if (pRes1->numOfRows == 0 && !tscProjectionQueryOnMetric(pCmd) || -// (pMeterMeta->vnodeIndex >= pMeterMeta->pMetricMeta->numOfVnodes && )) { -// hasData = false; -// break; -// } } + } + + return hasData; +} + +static void **tscJoinResultsetFromBuf(SSqlObj *pSql) { + SSqlCmd *pCmd = &pSql->cmd; + SSqlRes *pRes = &pSql->res; - if (!hasData) { // free all sub sqlobj - tscTrace("%p one subquery exhausted, free other %d subquery", pSql, pSql->numOfSubs - 1); + while (1) { + if (!tscHashRemainDataInSubqueryResultSet(pSql)) { // free all sub sqlobj + tscTrace("%p at least one subquery exhausted, free all other %d subqueries", pSql, pSql->numOfSubs - 1); SSubqueryState *pState = NULL; @@ -525,21 +523,20 @@ static void **tscJoinResultsetFromBuf(SSqlObj *pSql) { } bool success = false; - if (pSql->numOfSubs >= 2) { // do merge result + if (pSql->numOfSubs >= 2) { // do merge result SSqlRes *pRes1 = &pSql->pSubs[0]->res; SSqlRes *pRes2 = &pSql->pSubs[1]->res; if (pRes1->row < pRes1->numOfRows && pRes2->row < pRes2->numOfRows) { doSetResultRowData(pSql->pSubs[0]); doSetResultRowData(pSql->pSubs[1]); -// TSKEY key1 = *(TSKEY *)pRes1->tsrow[0]; -// TSKEY key2 = *(TSKEY *)pRes2->tsrow[0]; -// printf("first:%lld, second:%lld\n", key1, key2); + // TSKEY key1 = *(TSKEY *)pRes1->tsrow[0]; + // TSKEY key2 = *(TSKEY *)pRes2->tsrow[0]; + // printf("first:%lld, second:%lld\n", key1, key2); success = true; pRes1->row++; pRes2->row++; } - } else { // only one subquery SSqlRes *pRes1 = &pSql->pSubs[0]->res; doSetResultRowData(pSql->pSubs[0]); @@ -547,7 +544,7 @@ static void **tscJoinResultsetFromBuf(SSqlObj *pSql) { success = (pRes1->row++ < pRes1->numOfRows); } - if (success) { + if (success) { // current row of final output has been built, return to app for (int32_t i = 0; i < pCmd->exprsInfo.numOfExprs; ++i) { int32_t tableIndex = pRes->pColumnIndex[i].tableIndex; int32_t columnIndex = pRes->pColumnIndex[i].columnIndex; @@ -557,7 +554,7 @@ static void **tscJoinResultsetFromBuf(SSqlObj *pSql) { } break; - } else { + } else { // continue retrieve data from vnode tscFetchDatablockFromSubquery(pSql); if (pRes->code != TSDB_CODE_SUCCESS) { return NULL; @@ -579,7 +576,7 @@ TAOS_ROW taos_fetch_row_impl(TAOS_RES *res) { if (pCmd->command == TSDB_SQL_METRIC_JOIN_RETRIEVE) { tscFetchDatablockFromSubquery(pSql); - + if (pRes->code == TSDB_CODE_SUCCESS) { tscTrace("%p data from all subqueries have been retrieved to client", pSql); return tscJoinResultsetFromBuf(pSql); @@ -625,7 +622,7 @@ TAOS_ROW taos_fetch_row(TAOS_RES *res) { SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, 0); // reach the maximum number of output rows, abort - if (pCmd->globalLimit > 0 && pRes->numOfTotal >= pCmd->globalLimit) { + if (tscHasReachLimitation(pSql)) { return NULL; } @@ -645,7 +642,7 @@ TAOS_ROW taos_fetch_row(TAOS_RES *res) { * For super table join with projection query, if anyone of the subquery is exhausted, the query completed. */ pSql->numOfSubs = 0; - + if ((++pMeterMetaInfo->vnodeIndex) < pMeterMetaInfo->pMetricMeta->numOfVnodes) { pCmd->command = TSDB_SQL_SELECT; assert(pSql->fp == NULL); @@ -680,7 +677,7 @@ int taos_fetch_block(TAOS_RES *res, TAOS_ROW *rows) { nRows = taos_fetch_block_impl(res, rows); while (*rows == NULL && tscProjectionQueryOnMetric(pCmd)) { /* reach the maximum number of output rows, abort */ - if (pCmd->globalLimit > 0 && pRes->numOfTotal >= pCmd->globalLimit) { + if (tscHasReachLimitation(pSql)) { return 0; } @@ -690,7 +687,6 @@ int taos_fetch_block(TAOS_RES *res, TAOS_ROW *rows) { pCmd->limit.limit = pSql->cmd.globalLimit - pRes->numOfTotal; pCmd->limit.offset = pRes->offset; - if ((++pMeterMetaInfo->vnodeIndex) < pMeterMetaInfo->pMetricMeta->numOfVnodes) { pSql->cmd.command = TSDB_SQL_SELECT; assert(pSql->fp == NULL); @@ -925,12 +921,11 @@ int taos_print_row(char *str, TAOS_ROW row, TAOS_FIELD *fields, int num_fields) size_t xlen = strlen(row[i]); size_t trueLen = MIN(xlen, fields[i].bytes); - memcpy(str + len, (char*) row[i], trueLen); + memcpy(str + len, (char *)row[i], trueLen); str[len + trueLen] = ' '; len += (trueLen + 1); - } - break; + } break; case TSDB_DATA_TYPE_TIMESTAMP: len += sprintf(str + len, "%lld ", *((int64_t *)row[i])); @@ -987,7 +982,7 @@ int taos_validate_sql(TAOS *taos, const char *sql) { return code; } -static int tscParseTblNameList(SSqlObj *pSql, const char* tblNameList, int32_t tblListLen) { +static int tscParseTblNameList(SSqlObj *pSql, const char *tblNameList, int32_t tblListLen) { // must before clean the sqlcmd object tscRemoveAllMeterMetaInfo(&pSql->cmd, false); tscCleanSqlCmd(&pSql->cmd); @@ -998,11 +993,11 @@ static int tscParseTblNameList(SSqlObj *pSql, const char* tblNameList, int32_t t pCmd->count = 0; int code = TSDB_CODE_INVALID_METER_ID; - char *str = (char*) tblNameList; + char *str = (char *)tblNameList; SMeterMetaInfo *pMeterMetaInfo = tscAddEmptyMeterMetaInfo(pCmd); - if ((code = tscAllocPayload(pCmd, tblListLen+16)) != TSDB_CODE_SUCCESS) { + if ((code = tscAllocPayload(pCmd, tblListLen + 16)) != TSDB_CODE_SUCCESS) { return code; } @@ -1024,7 +1019,7 @@ static int tscParseTblNameList(SSqlObj *pSql, const char* tblNameList, int32_t t strtrim(tblName); len = (uint32_t)strlen(tblName); - + SSQLToken sToken = {.n = len, .type = TK_ID, .z = tblName}; tSQLGetToken(tblName, &sToken.type); @@ -1068,7 +1063,7 @@ static int tscParseTblNameList(SSqlObj *pSql, const char* tblNameList, int32_t t } int taos_load_table_info(TAOS *taos, const char *tableNameList) { - const int32_t MAX_TABLE_NAME_LENGTH = 12*1024*1024; // 12MB list + const int32_t MAX_TABLE_NAME_LENGTH = 12 * 1024 * 1024; // 12MB list STscObj *pObj = (STscObj *)taos; if (pObj == NULL || pObj->signature != pObj) { @@ -1092,7 +1087,7 @@ int taos_load_table_info(TAOS *taos, const char *tableNameList) { return pRes->code; } - char* str = calloc(1, tblListLen + 1); + char *str = calloc(1, tblListLen + 1); if (str == NULL) { pRes->code = TSDB_CODE_CLI_OUT_OF_MEMORY; tscError("%p failed to malloc sql string buffer", pSql); @@ -1100,7 +1095,7 @@ int taos_load_table_info(TAOS *taos, const char *tableNameList) { } strtolower(str, tableNameList); - pRes->code = (uint8_t) tscParseTblNameList(pSql, str, tblListLen); + pRes->code = (uint8_t)tscParseTblNameList(pSql, str, tblListLen); /* * set the qhandle to 0 before return in order to erase the qhandle value assigned in the previous successful query. diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index 4521bcb156aa232c98ab365ec713d1ef26edc02d..d0da79651e6c73cad34b56ed346d67c7c2b97e56 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -244,8 +244,7 @@ bool tscProjectionQueryOnMetric(SSqlCmd* pCmd) { //for project query, only the following two function is allowed for (int32_t i = 0; i < pCmd->fieldsInfo.numOfOutputCols; ++i) { - SSqlExpr* pExpr = tscSqlExprGet(pCmd, i); - int32_t functionId = pExpr->functionId; + int32_t functionId = tscSqlExprGet(pCmd, i)->functionId; if (functionId != TSDB_FUNC_PRJ && functionId != TSDB_FUNC_TAGPRJ && functionId != TSDB_FUNC_TAG && functionId != TSDB_FUNC_TS) { return false; @@ -255,6 +254,17 @@ bool tscProjectionQueryOnMetric(SSqlCmd* pCmd) { return true; } +bool tscProjectionQueryOnTable(SSqlCmd* pCmd) { + for (int32_t i = 0; i < pCmd->fieldsInfo.numOfOutputCols; ++i) { + int32_t functionId = tscSqlExprGet(pCmd, i)->functionId; + if (functionId != TSDB_FUNC_PRJ && functionId != TSDB_FUNC_TS) { + return false; + } + } + + return true; +} + bool tscIsPointInterpQuery(SSqlCmd* pCmd) { for (int32_t i = 0; i < pCmd->exprsInfo.numOfExprs; ++i) { SSqlExpr* pExpr = tscSqlExprGet(pCmd, i); @@ -1673,8 +1683,11 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void char key[TSDB_MAX_TAGS_LEN + 1] = {0}; tscGetMetricMetaCacheKey(pCmd, key, pMetermetaInfo->pMeterMeta->uid); - printf("-----%s\n", key); - + +#ifdef _DEBUG_VIEW + printf("the metricmeta key is:%s\n", key); +#endif + char* name = pMeterMetaInfo->name; SMeterMetaInfo* pFinalInfo = NULL; @@ -1768,3 +1781,12 @@ int32_t tscInvalidSQLErrMsg(char *msg, const char *additionalInfo, const char *s return TSDB_CODE_INVALID_SQL; } +bool tscHasReachLimitation(SSqlObj* pSql) { + assert(pSql != NULL && pSql->cmd.globalLimit != 0); + + SSqlCmd* pCmd = &pSql->cmd; + SSqlRes* pRes = &pSql->res; + + return (pCmd->globalLimit > 0 && pRes->numOfTotal >= pCmd->globalLimit); +} +