From 8de14a0c7d05c58b347e655cd7b22e565e5a9ddd Mon Sep 17 00:00:00 2001 From: hjxilinx Date: Fri, 6 Dec 2019 10:08:42 +0800 Subject: [PATCH] [tbase-1282] --- src/client/inc/tscSecondaryMerge.h | 2 +- src/client/inc/tscUtil.h | 69 ++++++++------- src/client/inc/tsclient.h | 74 ++++++++-------- src/client/src/tscAsync.c | 31 ++++--- src/client/src/tscJoinProcess.c | 130 ++++++++++++++++++++++------- src/client/src/tscParseInsert.c | 15 ++-- src/client/src/tscPrepare.c | 8 +- src/client/src/tscServer.c | 74 ++++++++-------- src/client/src/tscSql.c | 16 +++- src/client/src/tscUtil.c | 28 ++++--- 10 files changed, 268 insertions(+), 179 deletions(-) diff --git a/src/client/inc/tscSecondaryMerge.h b/src/client/inc/tscSecondaryMerge.h index 4c95994dfa..0c6472f6b3 100644 --- a/src/client/inc/tscSecondaryMerge.h +++ b/src/client/inc/tscSecondaryMerge.h @@ -94,7 +94,7 @@ typedef struct SRetrieveSupport { tOrderDescriptor *pOrderDescriptor; tColModel * pFinalColModel; // colModel for final result SSubqueryState * pState; - int32_t vnodeIdx; // index of current vnode in vnode list + int32_t subqueryIndex; // index of current vnode in vnode list SSqlObj * pParentSqlObj; tFilePage * localBuffer; // temp buffer, there is a buffer for each vnode to uint32_t numOfRetry; // record the number of retry times diff --git a/src/client/inc/tscUtil.h b/src/client/inc/tscUtil.h index da4ebc4e9c..41e1389c8e 100644 --- a/src/client/inc/tscUtil.h +++ b/src/client/inc/tscUtil.h @@ -26,11 +26,12 @@ extern "C" { #include #include #include "textbuffer.h" +#include "tscSecondaryMerge.h" #include "tsclient.h" #include "tsdb.h" -#include "tscSecondaryMerge.h" -#define UTIL_METER_IS_METRIC(metaInfo) (((metaInfo)->pMeterMeta != NULL) && ((metaInfo)->pMeterMeta->meterType == TSDB_METER_METRIC)) +#define UTIL_METER_IS_METRIC(metaInfo) \ + (((metaInfo)->pMeterMeta != NULL) && ((metaInfo)->pMeterMeta->meterType == TSDB_METER_METRIC)) #define UTIL_METER_IS_NOMRAL_METER(metaInfo) (!(UTIL_METER_IS_METRIC(metaInfo))) #define UTIL_METER_IS_CREATE_FROM_METRIC(metaInfo) \ (((metaInfo)->pMeterMeta != NULL) && ((metaInfo)->pMeterMeta->meterType == TSDB_METER_MTABLE)) @@ -62,28 +63,27 @@ typedef struct SJoinSubquerySupporter { SFieldInfo fieldsInfo; STagCond tagCond; SSqlGroupbyExpr groupbyExpr; - - struct STSBuf* pTSBuf; - - FILE* f; - char path[PATH_MAX]; + struct STSBuf* pTSBuf; // the TSBuf struct that holds the compressed timestamp array + FILE* f; // temporary file in order to create TSBuf + char path[PATH_MAX]; // temporary file path } SJoinSubquerySupporter; -void tscDestroyDataBlock(STableDataBlocks* pDataBlock); +void tscDestroyDataBlock(STableDataBlocks* pDataBlock); STableDataBlocks* tscCreateDataBlock(int32_t size); -void tscAppendDataBlock(SDataBlockList* pList, STableDataBlocks* pBlocks); -SParamInfo* tscAddParamToDataBlock(STableDataBlocks* pDataBlock, char type, uint8_t timePrec, short bytes, uint32_t offset); - -SDataBlockList* tscCreateBlockArrayList(); -void* tscDestroyBlockArrayList(SDataBlockList* pList); -int32_t tscCopyDataBlockToPayload(SSqlObj* pSql, STableDataBlocks* pDataBlock); -void tscFreeUnusedDataBlocks(SDataBlockList* pList); -int32_t tscMergeTableDataBlocks(SSqlObj* pSql, SDataBlockList* pDataList); +void tscAppendDataBlock(SDataBlockList* pList, STableDataBlocks* pBlocks); +SParamInfo* tscAddParamToDataBlock(STableDataBlocks* pDataBlock, char type, uint8_t timePrec, short bytes, + uint32_t offset); + +SDataBlockList* tscCreateBlockArrayList(); +void* tscDestroyBlockArrayList(SDataBlockList* pList); +int32_t tscCopyDataBlockToPayload(SSqlObj* pSql, STableDataBlocks* pDataBlock); +void tscFreeUnusedDataBlocks(SDataBlockList* pList); +int32_t tscMergeTableDataBlocks(SSqlObj* pSql, SDataBlockList* pDataList); STableDataBlocks* tscGetDataBlockFromList(void* pHashList, SDataBlockList* pDataBlockList, int64_t id, int32_t size, int32_t startOffset, int32_t rowSize, char* tableId); STableDataBlocks* tscCreateDataBlockEx(size_t size, int32_t rowSize, int32_t startOffset, char* name); -SVnodeSidList* tscGetVnodeSidList(SMetricMeta* pMetricmeta, int32_t vnodeIdx); +SVnodeSidList* tscGetVnodeSidList(SMetricMeta* pMetricmeta, int32_t vnodeIdx); SMeterSidExtInfo* tscGetMeterSidInfo(SVnodeSidList* pSidList, int32_t idx); /** @@ -108,7 +108,7 @@ void tscAddSpecialColumnForSelect(SSqlCmd* pCmd, int32_t outputColIndex, int16_t void addRequiredTagColumn(SSqlCmd* pCmd, int32_t tagColIndex, int32_t tableIndex); int32_t setMeterID(SSqlObj* pSql, SSQLToken* pzTableName, int32_t tableIndex); -void tscClearInterpInfo(SSqlCmd* pCmd); +void tscClearInterpInfo(SSqlCmd* pCmd); bool tscIsInsertOrImportData(char* sqlstr); @@ -128,9 +128,9 @@ void tscFieldInfoCopy(SFieldInfo* src, SFieldInfo* dst, const int32_t* indexList void tscFieldInfoCopyAll(SFieldInfo* src, SFieldInfo* dst); TAOS_FIELD* tscFieldInfoGetField(SSqlCmd* pCmd, int32_t index); -int16_t tscFieldInfoGetOffset(SSqlCmd* pCmd, int32_t index); -int32_t tscGetResRowLength(SSqlCmd* pCmd); -void tscClearFieldInfo(SFieldInfo* pFieldInfo); +int16_t tscFieldInfoGetOffset(SSqlCmd* pCmd, int32_t index); +int32_t tscGetResRowLength(SSqlCmd* pCmd); +void tscClearFieldInfo(SFieldInfo* pFieldInfo); void addExprParams(SSqlExpr* pExpr, char* argument, int32_t type, int32_t bytes, int16_t tableIndex); @@ -142,15 +142,15 @@ SSqlExpr* tscSqlExprUpdate(SSqlCmd* pCmd, int32_t index, int16_t functionId, int int16_t size); SSqlExpr* tscSqlExprGet(SSqlCmd* pCmd, int32_t index); -void tscSqlExprCopy(SSqlExprInfo* dst, const SSqlExprInfo* src, uint64_t uid); +void tscSqlExprCopy(SSqlExprInfo* dst, const SSqlExprInfo* src, uint64_t uid); SColumnBase* tscColumnBaseInfoInsert(SSqlCmd* pCmd, SColumnIndex* colIndex); -void tscColumnFilterInfoCopy(SColumnFilterInfo* dst, const SColumnFilterInfo* src); -void tscColumnBaseCopy(SColumnBase* dst, const SColumnBase* src); +void tscColumnFilterInfoCopy(SColumnFilterInfo* dst, const SColumnFilterInfo* src); +void tscColumnBaseCopy(SColumnBase* dst, const SColumnBase* src); -void tscColumnBaseInfoCopy(SColumnBaseInfo* dst, const SColumnBaseInfo* src, int16_t tableIndex); +void tscColumnBaseInfoCopy(SColumnBaseInfo* dst, const SColumnBaseInfo* src, int16_t tableIndex); SColumnBase* tscColumnBaseInfoGet(SColumnBaseInfo* pColumnBaseInfo, int32_t index); -void tscColumnBaseInfoUpdateTableIndex(SColumnBaseInfo* pColList, int16_t tableIndex); +void tscColumnBaseInfoUpdateTableIndex(SColumnBaseInfo* pColList, int16_t tableIndex); void tscColumnBaseInfoReserve(SColumnBaseInfo* pColumnBaseInfo, int32_t size); void tscColumnBaseInfoDestroy(SColumnBaseInfo* pColumnBaseInfo); @@ -163,7 +163,7 @@ bool tscValidateColumnId(SSqlCmd* pCmd, int32_t colId); // get starter position of metric query condition (query on tags) in SSqlCmd.payload SCond* tsGetMetricQueryCondPos(STagCond* pCond, uint64_t tableIndex); -void tsSetMetricQueryCond(STagCond* pTagCond, uint64_t uid, const char* str); +void tsSetMetricQueryCond(STagCond* pTagCond, uint64_t uid, const char* str); void tscTagCondCopy(STagCond* dest, const STagCond* src); void tscTagCondRelease(STagCond* pCond); @@ -176,19 +176,19 @@ bool tscShouldFreeHeatBeat(SSqlObj* pHb); void tscCleanSqlCmd(SSqlCmd* pCmd); bool tscShouldFreeAsyncSqlObj(SSqlObj* pSql); -void tscRemoveAllMeterMetaInfo(SSqlCmd* pCmd, bool removeFromCache); +void tscRemoveAllMeterMetaInfo(SSqlCmd* pCmd, bool removeFromCache); SMeterMetaInfo* tscGetMeterMetaInfo(SSqlCmd* pCmd, int32_t index); SMeterMetaInfo* tscGetMeterMetaInfoByUid(SSqlCmd* pCmd, uint64_t uid, int32_t* index); -void tscClearMeterMetaInfo(SMeterMetaInfo* pMeterMetaInfo, bool removeFromCache); +void tscClearMeterMetaInfo(SMeterMetaInfo* pMeterMetaInfo, bool removeFromCache); SMeterMetaInfo* tscAddMeterMetaInfo(SSqlCmd* pCmd, const char* name, SMeterMeta* pMeterMeta, SMetricMeta* pMetricMeta, int16_t numOfTags, int16_t* tags); SMeterMetaInfo* tscAddEmptyMeterMetaInfo(SSqlCmd* pCmd); void tscGetMetricMetaCacheKey(SSqlCmd* pCmd, char* keyStr, uint64_t uid); -int tscGetMetricMeta(SSqlObj* pSql); -int tscGetMeterMeta(SSqlObj* pSql, char* meterId, int32_t tableIndex); -int tscGetMeterMetaEx(SSqlObj* pSql, char* meterId, bool createIfNotExists); +int tscGetMetricMeta(SSqlObj* pSql); +int tscGetMeterMeta(SSqlObj* pSql, char* meterId, int32_t tableIndex); +int tscGetMeterMetaEx(SSqlObj* pSql, char* meterId, bool createIfNotExists); void tscResetForNextRetrieve(SSqlRes* pRes); @@ -212,9 +212,8 @@ void tscDoQuery(SSqlObj* pSql); * @param pPrevSql * @return */ -SSqlObj* createSubqueryObj(SSqlObj* pSql, int32_t vnodeIndex, int16_t tableIndex, void (*fp)(), void* param, - SSqlObj* pPrevSql); -void addGroupInfoForSubquery(SSqlObj* pParentObj, SSqlObj* pSql, int32_t tableIndex); +SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void* param, SSqlObj* pPrevSql); +void addGroupInfoForSubquery(SSqlObj* pParentObj, SSqlObj* pSql, int32_t tableIndex); void doAddGroupColumnForSubquery(SSqlCmd* pCmd, int32_t tagIndex); diff --git a/src/client/inc/tsclient.h b/src/client/inc/tsclient.h index d439ba9929..4101cbfc9e 100644 --- a/src/client/inc/tsclient.h +++ b/src/client/inc/tsclient.h @@ -107,22 +107,25 @@ enum _sql_cmd { struct SSqlInfo; typedef struct SSqlGroupbyExpr { - int16_t tableIndex; - + int16_t tableIndex; int16_t numOfGroupCols; SColIndexEx columnInfo[TSDB_MAX_TAGS]; // group by columns information - - int16_t orderIndex; // order by column index - int16_t orderType; // order by type: asc/desc + int16_t orderIndex; // order by column index + int16_t orderType; // order by type: asc/desc } SSqlGroupbyExpr; typedef struct SMeterMetaInfo { - SMeterMeta * pMeterMeta; // metermeta - SMetricMeta *pMetricMeta; // metricmeta - - char name[TSDB_METER_ID_LEN + 1]; - int16_t numOfTags; // total required tags in query, including groupby tags - int16_t tagColumnIndex[TSDB_MAX_TAGS]; // clause + tag projection + SMeterMeta * pMeterMeta; // metermeta + SMetricMeta *pMetricMeta; // metricmeta + + /* + * 1. keep the vnode index during the multi-vnode super table projection query + * 2. keep the vnode index for multi-vnode insertion + */ + int32_t vnodeIndex; + char name[TSDB_METER_ID_LEN + 1]; // table(super table) name + int16_t numOfTags; // total required tags in query, including groupby tags + int16_t tagColumnIndex[TSDB_MAX_TAGS]; // clause + tag projection } SMeterMetaInfo; /* the structure for sql function in select clause */ @@ -188,7 +191,7 @@ typedef struct SString { typedef struct SCond { uint64_t uid; - char* cond; + char * cond; } SCond; typedef struct SJoinNode { @@ -262,15 +265,15 @@ typedef struct SDataBlockList { typedef struct { SOrderVal order; int command; - int count;// TODO refactor + int count; // TODO refactor union { - bool existsCheck; // check if the table exists - int8_t showType; // show command type + bool existsCheck; // check if the table exists + int8_t showType; // show command type }; - + int8_t isInsertFromFile; // load data from file or not - bool import; // import/insert type + bool import; // import/insert type char msgType; uint16_t type; // query type char intervalTimeUnit; @@ -296,7 +299,6 @@ typedef struct { SLimitVal slimit; int64_t globalLimit; STagCond tagCond; - int16_t vnodeIdx; // vnode index in pMetricMeta for metric query int16_t interpoType; // interpolate type int16_t numOfTables; @@ -366,25 +368,23 @@ typedef struct _sql_obj { STscObj *pTscObj; void (*fp)(); void (*fetchFp)(); - void * param; - uint32_t ip; - short vnode; - int64_t stime; - uint32_t queryId; - void * thandle; - void * pStream; - char * sqlstr; - char retry; - char maxRetry; - char index; - char freed : 4; - char listed : 4; - tsem_t rspSem; - tsem_t emptyRspSem; - - SSqlCmd cmd; - SSqlRes res; - + void * param; + uint32_t ip; + short vnode; + int64_t stime; + uint32_t queryId; + void * thandle; + void * pStream; + char * sqlstr; + char retry; + char maxRetry; + char index; + char freed : 4; + char listed : 4; + tsem_t rspSem; + tsem_t emptyRspSem; + SSqlCmd cmd; + SSqlRes res; char numOfSubs; struct _sql_obj **pSubs; struct _sql_obj * prev, *next; diff --git a/src/client/src/tscAsync.c b/src/client/src/tscAsync.c index abf91e7c43..1268844d77 100644 --- a/src/client/src/tscAsync.c +++ b/src/client/src/tscAsync.c @@ -121,7 +121,8 @@ static void tscProcessAsyncFetchRowsProxy(void *param, TAOS_RES *tres, int numOf // sequentially retrieve data from remain vnodes first, query vnode specified by vnodeIdx if (numOfRows == 0 && tscProjectionQueryOnMetric(pCmd)) { // vnode is denoted by vnodeIdx, continue to query vnode specified by vnodeIdx - assert(pCmd->vnodeIdx >= 0); + SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, 0); + assert(pMeterMetaInfo->vnodeIndex >= 0); /* reach the maximum number of output rows, abort */ if (pCmd->globalLimit > 0 && pRes->numOfTotal >= pCmd->globalLimit) { @@ -133,8 +134,8 @@ static void tscProcessAsyncFetchRowsProxy(void *param, TAOS_RES *tres, int numOf pCmd->limit.limit = pCmd->globalLimit - pRes->numOfTotal; pCmd->limit.offset = pRes->offset; - if ((++(pCmd->vnodeIdx)) < tscGetMeterMetaInfo(pCmd, 0)->pMetricMeta->numOfVnodes) { - tscTrace("%p retrieve data from next vnode:%d", pSql, pCmd->vnodeIdx); + if ((++(pMeterMetaInfo->vnodeIndex)) < pMeterMetaInfo->pMetricMeta->numOfVnodes) { + tscTrace("%p retrieve data from next vnode:%d", pSql, pMeterMetaInfo->vnodeIndex); pSql->cmd.command = TSDB_SQL_SELECT; // reset flag to launch query first. @@ -272,7 +273,8 @@ void tscProcessAsyncRetrieve(void *param, TAOS_RES *tres, int numOfRows) { /* * vnode is denoted by vnodeIdx, continue to query vnode specified by vnodeIdx till all vnode have been retrieved */ - assert(pCmd->vnodeIdx >= 1); + SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, 0); + assert(pMeterMetaInfo->vnodeIndex >= 0); /* reach the maximum number of output rows, abort */ if (pCmd->globalLimit > 0 && pRes->numOfTotal >= pCmd->globalLimit) { @@ -283,7 +285,7 @@ void tscProcessAsyncRetrieve(void *param, TAOS_RES *tres, int numOfRows) { /* update the limit value according to current retrieval results */ pCmd->limit.limit = pCmd->globalLimit - pRes->numOfTotal; - if ((++pCmd->vnodeIdx) <= tscGetMeterMetaInfo(pCmd, 0)->pMetricMeta->numOfVnodes) { + if ((++pMeterMetaInfo->vnodeIndex) <= pMeterMetaInfo->pMetricMeta->numOfVnodes) { pSql->cmd.command = TSDB_SQL_SELECT; // reset flag to launch query first. tscResetForNextRetrieve(pRes); @@ -404,9 +406,12 @@ void tscAsyncInsertMultiVnodesProxy(void *param, TAOS_RES *tres, int numOfRows) int32_t code = TSDB_CODE_SUCCESS; assert(!pCmd->isInsertFromFile && pSql->signature == pSql); - + + SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, 0); + assert(pCmd->numOfTables == 1); + SDataBlockList *pDataBlocks = pCmd->pDataBlocks; - if (pDataBlocks == NULL || pCmd->vnodeIdx >= pDataBlocks->nSize) { + if (pDataBlocks == NULL || pMeterMetaInfo->vnodeIndex >= pDataBlocks->nSize) { // restore user defined fp pSql->fp = pSql->fetchFp; tscTrace("%p Async insertion completed, destroy data block list", pSql); @@ -418,17 +423,17 @@ void tscAsyncInsertMultiVnodesProxy(void *param, TAOS_RES *tres, int numOfRows) (*pSql->fp)(pSql->param, tres, numOfRows); } else { do { - code = tscCopyDataBlockToPayload(pSql, pDataBlocks->pData[pCmd->vnodeIdx++]); + code = tscCopyDataBlockToPayload(pSql, pDataBlocks->pData[pMeterMetaInfo->vnodeIndex++]); if (code != TSDB_CODE_SUCCESS) { tscTrace("%p prepare submit data block failed in async insertion, vnodeIdx:%d, total:%d, code:%d", - pSql, pCmd->vnodeIdx - 1, pDataBlocks->nSize, code); + pSql, pMeterMetaInfo->vnodeIndex - 1, pDataBlocks->nSize, code); } - } while (code != TSDB_CODE_SUCCESS && pCmd->vnodeIdx < pDataBlocks->nSize); + } while (code != TSDB_CODE_SUCCESS && pMeterMetaInfo->vnodeIndex < pDataBlocks->nSize); // build submit msg may fail if (code == TSDB_CODE_SUCCESS) { - tscTrace("%p async insertion, vnodeIdx:%d, total:%d", pSql, pCmd->vnodeIdx - 1, pDataBlocks->nSize); + tscTrace("%p async insertion, vnodeIdx:%d, total:%d", pSql, pMeterMetaInfo->vnodeIndex - 1, pDataBlocks->nSize); tscProcessSql(pSql); } } @@ -484,11 +489,11 @@ void tscMeterMetaCallBack(void *param, TAOS_RES *res, int code) { // check if it is a sub-query of metric query first, if true, enter another routine if ((pSql->cmd.type & TSDB_QUERY_TYPE_STABLE_SUBQUERY) == TSDB_QUERY_TYPE_STABLE_SUBQUERY) { SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, 0); - assert(pMeterMetaInfo->pMeterMeta->numOfTags != 0 && pCmd->vnodeIdx >= 0 && pSql->param != NULL); + assert(pMeterMetaInfo->pMeterMeta->numOfTags != 0 && pMeterMetaInfo->vnodeIndex >= 0 && pSql->param != NULL); SRetrieveSupport *trs = (SRetrieveSupport *)pSql->param; SSqlObj * pParObj = trs->pParentSqlObj; - assert(pParObj->signature == pParObj && trs->vnodeIdx == pCmd->vnodeIdx && + assert(pParObj->signature == pParObj && trs->subqueryIndex == pMeterMetaInfo->vnodeIndex && pMeterMetaInfo->pMeterMeta->numOfTags != 0); tscTrace("%p get metricMeta during metric query successfully", pSql); diff --git a/src/client/src/tscJoinProcess.c b/src/client/src/tscJoinProcess.c index ed44d54066..b470d84440 100644 --- a/src/client/src/tscJoinProcess.c +++ b/src/client/src/tscJoinProcess.c @@ -150,7 +150,7 @@ static int64_t doTSBlockIntersect(SSqlObj* pSql, SJoinSubquerySupporter* pSuppor tsBufDestory(pSupporter1->pTSBuf); tsBufDestory(pSupporter2->pTSBuf); - tscTrace("%p input1:%lld, input2:%lld, %lld for secondary query after ts blocks intersecting", + tscTrace("%p input1:%lld, input2:%lld, final:%lld for secondary query after ts blocks intersecting", pSql, numOfInput1, numOfInput2, output1->numOfTotal); return output1->numOfTotal; @@ -239,15 +239,20 @@ int32_t tscLaunchSecondSubquery(SSqlObj* pSql) { pSupporter = pSql->pSubs[i]->param; pSupporter->pState->numOfCompleted = 0; + /* + * If the columns are not involved in the final select clause, the secondary query will not be launched + * for the subquery. + */ if (pSupporter->exprsInfo.numOfExprs > 0) { ++numOfSub; } } // scan all subquery, if one sub query has only ts, ignore it - int32_t j = 0; - tscTrace("%p start to launch secondary subqueries: %d", pSql, pSql->numOfSubs); + tscTrace("%p start to launch secondary subqueries, total:%d, only:%d needs to query, others are not retrieve in " + "select clause", pSql, pSql->numOfSubs, numOfSub); + int32_t j = 0; for (int32_t i = 0; i < pSql->numOfSubs; ++i) { SSqlObj* pSub = pSql->pSubs[i]; pSupporter = pSub->param; @@ -259,15 +264,14 @@ int32_t tscLaunchSecondSubquery(SSqlObj* pSql) { continue; } - SSqlObj* pNew = createSubqueryObj(pSql, 0, (int16_t)i, tscJoinQueryCallback, pSupporter, NULL); + SSqlObj* pNew = createSubqueryObj(pSql, (int16_t)i, tscJoinQueryCallback, pSupporter, NULL); if (pNew == NULL) { pSql->numOfSubs = i; //revise the number of subquery pSupporter->pState->numOfTotal = i; pSupporter->pState->code = TSDB_CODE_CLI_OUT_OF_MEMORY; tscDestroyJoinSupporter(pSupporter); - - return NULL; + return 0; } tscFreeSqlCmdData(&pNew->cmd); @@ -386,8 +390,8 @@ static void joinRetrieveCallback(void* param, TAOS_RES* tres, int numOfRows) { if (numOfRows > 0) { // write the data into disk fwrite(pSql->res.data, pSql->res.numOfRows, 1, pSupporter->f); - fflush(pSupporter->f); - + fclose(pSupporter->f); + STSBuf* pBuf = tsBufCreateFromFile(pSupporter->path, true); if (pBuf == NULL) { tscError("%p invalid ts comp file from vnode, abort sub query, file size:%d", pSql, numOfRows); @@ -401,7 +405,10 @@ static void joinRetrieveCallback(void* param, TAOS_RES* tres, int numOfRows) { tscTrace("%p create tmp file for ts block:%s", pSql, pBuf->path); pSupporter->pTSBuf = pBuf; } else { - tsBufMerge(pSupporter->pTSBuf, pBuf, pSql->cmd.vnodeIdx); + assert(pSql->cmd.numOfTables == 1); // for subquery, only one metermetaInfo + SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfo(&pSql->cmd, 0); + + tsBufMerge(pSupporter->pTSBuf, pBuf, pMeterMetaInfo->vnodeIndex); tsBufDestory(pBuf); } @@ -412,6 +419,20 @@ static void joinRetrieveCallback(void* param, TAOS_RES* tres, int numOfRows) { taos_fetch_rows_a(tres, joinRetrieveCallback, param); } else if (numOfRows == 0) { // no data from this vnode anymore + if (tscProjectionQueryOnMetric(&pParentSql->cmd)) { + SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(&pSql->cmd, 0); + assert(pSql->cmd.numOfTables == 1); + + // for projection query, need to try next vnode + if ((++pMeterMetaInfo->vnodeIndex) < pMeterMetaInfo->pMetricMeta->numOfVnodes) { + pSql->cmd.command = TSDB_SQL_SELECT; + pSql->fp = tscJoinQueryCallback; + tscProcessSql(pSql); + + return; + } + } + if (atomic_add_fetch_32(&pSupporter->pState->numOfCompleted, 1) >= pSupporter->pState->numOfTotal) { if (pSupporter->pState->code != TSDB_CODE_SUCCESS) { @@ -466,6 +487,8 @@ static void joinRetrieveCallback(void* param, TAOS_RES* tres, int numOfRows) { void tscFetchDatablockFromSubquery(SSqlObj* pSql) { int32_t numOfFetch = 0; + assert(pSql->numOfSubs >= 1); + for (int32_t i = 0; i < pSql->numOfSubs; ++i) { SJoinSubquerySupporter* pSupporter = (SJoinSubquerySupporter*)pSql->pSubs[i]->param; @@ -731,7 +754,7 @@ STSBuf* tsBufCreateFromFile(const char* path, bool autoDelete) { strncpy(pTSBuf->path, path, PATH_MAX); - pTSBuf->f = fopen(pTSBuf->path, "r"); + pTSBuf->f = fopen(pTSBuf->path, "r+"); if (pTSBuf->f == NULL) { return NULL; } @@ -797,6 +820,10 @@ STSBuf* tsBufCreateFromFile(const char* path, bool autoDelete) { pTSBuf->cur.order = TSQL_SO_ASC; pTSBuf->autoDelete = autoDelete; + + tscTrace("create tsBuf from file:%s, fd:%d, size:%d, numOfVnode:%d, autoDelete:%d", pTSBuf->path, fileno(pTSBuf->f), + pTSBuf->fileSize, pTSBuf->numOfVnodes, pTSBuf->autoDelete); + return pTSBuf; } @@ -814,10 +841,21 @@ void tsBufDestory(STSBuf* pTSBuf) { fclose(pTSBuf->f); if (pTSBuf->autoDelete) { + tscTrace("tsBuf %p destroyed, delete tmp file:%s", pTSBuf, pTSBuf->path); unlink(pTSBuf->path); + } else { + tscTrace("tsBuf %p destroyed, tmp file:%s, remains", pTSBuf, pTSBuf->path); } free(pTSBuf); + +} + +static STSVnodeBlockInfoEx* tsBufGetLastVnodeInfo(STSBuf* pTSBuf) { + int32_t last = pTSBuf->numOfVnodes - 1; + + assert(last >= 0); + return &pTSBuf->pData[last]; } static STSVnodeBlockInfoEx* addOneVnodeInfo(STSBuf* pTSBuf, int32_t vnodeId) { @@ -836,10 +874,10 @@ static STSVnodeBlockInfoEx* addOneVnodeInfo(STSBuf* pTSBuf, int32_t vnodeId) { } if (pTSBuf->numOfVnodes > 0) { - STSVnodeBlockInfo* pPrevBlockInfo = &pTSBuf->pData[pTSBuf->numOfVnodes - 1].info; + STSVnodeBlockInfoEx* pPrevBlockInfoEx = tsBufGetLastVnodeInfo(pTSBuf); // update prev vnode length info in file - TSBufUpdateVnodeInfo(pTSBuf, pTSBuf->numOfVnodes - 1, pPrevBlockInfo); + TSBufUpdateVnodeInfo(pTSBuf, pTSBuf->numOfVnodes - 1, &pPrevBlockInfoEx->info); } // set initial value for vnode block @@ -855,11 +893,11 @@ static STSVnodeBlockInfoEx* addOneVnodeInfo(STSBuf* pTSBuf, int32_t vnodeId) { pTSBuf->numOfVnodes += 1; // update the header info - STSBufFileHeader header = { - .magic = TS_COMP_FILE_MAGIC, .numOfVnode = pTSBuf->numOfVnodes, .tsOrder = pTSBuf->tsOrder}; + STSBufFileHeader header = + {.magic = TS_COMP_FILE_MAGIC, .numOfVnode = pTSBuf->numOfVnodes, .tsOrder = pTSBuf->tsOrder}; + STSBufUpdateHeader(pTSBuf, &header); - - return &pTSBuf->pData[pTSBuf->numOfVnodes - 1]; + return tsBufGetLastVnodeInfo(pTSBuf); } static void shrinkBuffer(STSList* ptsData) { @@ -905,9 +943,11 @@ static void writeDataToDisk(STSBuf* pTSBuf) { pTSBuf->fileSize += blockSize; pTSBuf->tsData.len = 0; - - pTSBuf->pData[pTSBuf->numOfVnodes - 1].info.compLen += blockSize; - pTSBuf->pData[pTSBuf->numOfVnodes - 1].info.numOfBlocks += 1; + + STSVnodeBlockInfoEx* pVnodeBlockInfoEx = tsBufGetLastVnodeInfo(pTSBuf); + + pVnodeBlockInfoEx->info.compLen += blockSize; + pVnodeBlockInfoEx->info.numOfBlocks += 1; shrinkBuffer(&pTSBuf->tsData); } @@ -1008,13 +1048,13 @@ void tsBufAppend(STSBuf* pTSBuf, int32_t vnodeId, int64_t tag, const char* pData STSVnodeBlockInfoEx* pBlockInfo = NULL; STSList* ptsData = &pTSBuf->tsData; - if (pTSBuf->numOfVnodes == 0 || pTSBuf->pData[pTSBuf->numOfVnodes - 1].info.vnode != vnodeId) { + if (pTSBuf->numOfVnodes == 0 || tsBufGetLastVnodeInfo(pTSBuf)->info.vnode != vnodeId) { writeDataToDisk(pTSBuf); shrinkBuffer(ptsData); pBlockInfo = addOneVnodeInfo(pTSBuf, vnodeId); } else { - pBlockInfo = &pTSBuf->pData[pTSBuf->numOfVnodes - 1]; + pBlockInfo = tsBufGetLastVnodeInfo(pTSBuf); } assert(pBlockInfo->info.vnode == vnodeId); @@ -1037,6 +1077,8 @@ void tsBufAppend(STSBuf* pTSBuf, int32_t vnodeId, int64_t tag, const char* pData pTSBuf->numOfTotal += len / TSDB_KEYSIZE; + // the size of raw data exceeds the size of the default prepared buffer, so + // during getBufBlock, the output buffer needs to be large enough. if (ptsData->len >= ptsData->threshold) { writeDataToDisk(pTSBuf); shrinkBuffer(ptsData); @@ -1053,10 +1095,10 @@ void tsBufFlush(STSBuf* pTSBuf) { writeDataToDisk(pTSBuf); shrinkBuffer(&pTSBuf->tsData); - STSVnodeBlockInfo* pBlockInfo = &pTSBuf->pData[pTSBuf->numOfVnodes - 1].info; + STSVnodeBlockInfoEx* pBlockInfoEx = tsBufGetLastVnodeInfo(pTSBuf); // update prev vnode length info in file - TSBufUpdateVnodeInfo(pTSBuf, pTSBuf->numOfVnodes - 1, pBlockInfo); + TSBufUpdateVnodeInfo(pTSBuf, pTSBuf->numOfVnodes - 1, &pBlockInfoEx->info); // save the ts order into header STSBufFileHeader header = { @@ -1157,11 +1199,22 @@ static void tsBufGetBlock(STSBuf* pTSBuf, int32_t vnodeIndex, int32_t blockIndex } STSBlock* pBlock = &pTSBuf->block; + + size_t s = pBlock->numOfElem * TSDB_KEYSIZE; + + /* + * In order to accommodate all the qualified data, the actual buffer size for one block with identical tags value + * may exceed the maximum allowed size during *tsBufAppend* function by invoking expandBuffer function + */ + if (s > pTSBuf->tsData.allocSize) { + expandBuffer(&pTSBuf->tsData, s); + } + pTSBuf->tsData.len = tsDecompressTimestamp(pBlock->payload, pBlock->compLen, pBlock->numOfElem, pTSBuf->tsData.rawBuf, pTSBuf->tsData.allocSize, TWO_STAGE_COMP, pTSBuf->assistBuf, pTSBuf->bufSize); - assert(pTSBuf->tsData.len / TSDB_KEYSIZE == pBlock->numOfElem); + assert((pTSBuf->tsData.len / TSDB_KEYSIZE == pBlock->numOfElem) && (pTSBuf->tsData.allocSize >= pTSBuf->tsData.len)); pCur->vnodeIndex = vnodeIndex; pCur->blockIndex = blockIndex; @@ -1293,6 +1346,8 @@ STSElem tsBufGetElem(STSBuf* pTSBuf) { return elem1; } + + /** * current only support ts comp data from two vnode merge * @param pDestBuf @@ -1318,7 +1373,7 @@ int32_t tsBufMerge(STSBuf* pDestBuf, const STSBuf* pSrcBuf, int32_t vnodeId) { tsBufFlush(pDestBuf); // compared with the last vnode id - if (vnodeId != pDestBuf->pData[pDestBuf->numOfVnodes - 1].info.vnode) { + if (vnodeId != tsBufGetLastVnodeInfo(pDestBuf)->info.vnode) { int32_t oldSize = pDestBuf->numOfVnodes; int32_t newSize = oldSize + pSrcBuf->numOfVnodes; @@ -1345,36 +1400,49 @@ int32_t tsBufMerge(STSBuf* pDestBuf, const STSBuf* pSrcBuf, int32_t vnodeId) { pDestBuf->numOfVnodes = newSize; } else { - STSVnodeBlockInfoEx* pBlockInfoEx = &pDestBuf->pData[pDestBuf->numOfVnodes - 1]; + STSVnodeBlockInfoEx* pBlockInfoEx = tsBufGetLastVnodeInfo(pDestBuf); + pBlockInfoEx->len += pSrcBuf->pData[0].len; pBlockInfoEx->info.numOfBlocks += pSrcBuf->pData[0].info.numOfBlocks; pBlockInfoEx->info.compLen += pSrcBuf->pData[0].info.compLen; pBlockInfoEx->info.vnode = vnodeId; } - int64_t r = fseek(pDestBuf->f, 0, SEEK_END); + int32_t r = fseek(pDestBuf->f, 0, SEEK_END); assert(r == 0); int64_t offset = getDataStartOffset(); int32_t size = pSrcBuf->fileSize - offset; #ifdef LINUX - ssize_t rc = sendfile(fileno(pDestBuf->f), fileno(pSrcBuf->f), &offset, size); + ssize_t rc = tsendfile(fileno(pDestBuf->f), fileno(pSrcBuf->f), &offset, size); #else ssize_t rc = fsendfile(pDestBuf->f, pSrcBuf->f, &offset, size); #endif + if (rc == -1) { - printf("%s\n", strerror(errno)); + tscError("failed to merge tsBuf from:%s to %s, reason:%s\n", pSrcBuf->path, pDestBuf->path, strerror(errno)); return -1; } if (rc != size) { - printf("%s\n", strerror(errno)); + tscError("failed to merge tsBuf from:%s to %s, reason:%s\n", pSrcBuf->path, pDestBuf->path, strerror(errno)); return -1; } pDestBuf->numOfTotal += pSrcBuf->numOfTotal; - + + int32_t oldSize = pDestBuf->fileSize; + + struct stat fileStat; + fstat(fileno(pDestBuf->f), &fileStat); + pDestBuf->fileSize = (uint32_t) fileStat.st_size; + + assert(pDestBuf->fileSize == oldSize + size); + + tscTrace("tsBuf merge success, %p, path:%s, fd:%d, file size:%d, vnode:%d, autoDelete:%d", pDestBuf, pDestBuf->path, + fileno(pDestBuf->f), pDestBuf->fileSize, pDestBuf->numOfVnodes, pDestBuf->autoDelete); + return 0; } diff --git a/src/client/src/tscParseInsert.c b/src/client/src/tscParseInsert.c index 58cfcda17e..572b65f364 100644 --- a/src/client/src/tscParseInsert.c +++ b/src/client/src/tscParseInsert.c @@ -498,10 +498,11 @@ int tsParseValues(char **str, STableDataBlocks *pDataBlock, SMeterMeta *pMeterMe *str += index; if (numOfRows >= maxRows || pDataBlock->size + pMeterMeta->rowSize >= pDataBlock->nAllocSize) { int32_t tSize = tscAllocateMemIfNeed(pDataBlock, pMeterMeta->rowSize); - if (0 == tSize) { + if (0 == tSize) { //TODO pass the correct error code to client strcpy(error, "client out of memory"); return -1; } + maxRows += tSize; } @@ -1060,8 +1061,10 @@ int doParserInsertSql(SSqlObj *pSql, char *str) { goto _error_clean; } + SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, 0); + // set the next sent data vnode index in data block arraylist - pCmd->vnodeIdx = 1; + pMeterMetaInfo->vnodeIndex = 1; } else { pCmd->pDataBlocks = tscDestroyBlockArrayList(pCmd->pDataBlocks); } @@ -1279,19 +1282,19 @@ void tscProcessMultiVnodesInsert(SSqlObj *pSql) { int32_t code = TSDB_CODE_SUCCESS; /* the first block has been sent to server in processSQL function */ - assert(pCmd->isInsertFromFile != -1 && pCmd->vnodeIdx >= 1 && pCmd->pDataBlocks != NULL); + assert(pCmd->isInsertFromFile != -1 && pMeterMetaInfo->vnodeIndex >= 1 && pCmd->pDataBlocks != NULL); - if (pCmd->vnodeIdx < pCmd->pDataBlocks->nSize) { + if (pMeterMetaInfo->vnodeIndex < pCmd->pDataBlocks->nSize) { SDataBlockList *pDataBlocks = pCmd->pDataBlocks; - for (int32_t i = pCmd->vnodeIdx; i < pDataBlocks->nSize; ++i) { + for (int32_t i = pMeterMetaInfo->vnodeIndex; i < pDataBlocks->nSize; ++i) { pDataBlock = pDataBlocks->pData[i]; if (pDataBlock == NULL) { continue; } if ((code = tscCopyDataBlockToPayload(pSql, pDataBlock)) != TSDB_CODE_SUCCESS) { - tscTrace("%p build submit data block failed, vnodeIdx:%d, total:%d", pSql, pCmd->vnodeIdx, pDataBlocks->nSize); + tscTrace("%p build submit data block failed, vnodeIdx:%d, total:%d", pSql, pMeterMetaInfo->vnodeIndex, pDataBlocks->nSize); continue; } diff --git a/src/client/src/tscPrepare.c b/src/client/src/tscPrepare.c index 532baec205..7e62afefe6 100644 --- a/src/client/src/tscPrepare.c +++ b/src/client/src/tscPrepare.c @@ -409,7 +409,9 @@ static int insertStmtReset(STscStmt* pStmt) { } } pCmd->batchSize = 0; - pCmd->vnodeIdx = 0; + + SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, 0); + pMeterMetaInfo->vnodeIndex = 0; return TSDB_CODE_SUCCESS; } @@ -422,6 +424,8 @@ static int insertStmtExecute(STscStmt* stmt) { ++pCmd->batchSize; } + SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, 0); + if (pCmd->pDataBlocks->nSize > 0) { // merge according to vgid int code = tscMergeTableDataBlocks(stmt->pSql, pCmd->pDataBlocks); @@ -436,7 +440,7 @@ static int insertStmtExecute(STscStmt* stmt) { } // set the next sent data vnode index in data block arraylist - pCmd->vnodeIdx = 1; + pMeterMetaInfo->vnodeIndex = 1; } else { pCmd->pDataBlocks = tscDestroyBlockArrayList(pCmd->pDataBlocks); } diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index bf175c5540..81ef5d13e6 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -222,7 +222,7 @@ void tscGetConnToVnode(SSqlObj *pSql, uint8_t *pCode) { SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, 0); if (UTIL_METER_IS_METRIC(pMeterMetaInfo)) { // multiple vnode query - SVnodeSidList *vnodeList = tscGetVnodeSidList(pMeterMetaInfo->pMetricMeta, pCmd->vnodeIdx); + SVnodeSidList *vnodeList = tscGetVnodeSidList(pMeterMetaInfo->pMetricMeta, pMeterMetaInfo->vnodeIndex); if (vnodeList != NULL) { pVPeersDesc = vnodeList->vpeerDesc; } @@ -528,7 +528,7 @@ void *tscProcessMsgFromServer(char *msg, void *ahandle, void *thandle) { if (pMeterMetaInfo->pMeterMeta) // it may be deleted pMeterMetaInfo->pMeterMeta->index = pSql->index; } else { - SVnodeSidList *pVnodeSidList = tscGetVnodeSidList(pMeterMetaInfo->pMetricMeta, pSql->cmd.vnodeIdx); + SVnodeSidList *pVnodeSidList = tscGetVnodeSidList(pMeterMetaInfo->pMetricMeta, pMeterMetaInfo->vnodeIndex); pVnodeSidList->index = pSql->index; } } else { @@ -639,7 +639,7 @@ static SSqlObj *tscCreateSqlObjForSubquery(SSqlObj *pSql, SRetrieveSupport *trsu static int tscLaunchMetricSubQueries(SSqlObj *pSql); // todo merge with callback -int32_t tscLaunchJoinSubquery(SSqlObj *pSql, int16_t tableIndex, int16_t vnodeIdx, SJoinSubquerySupporter *pSupporter) { +int32_t tscLaunchJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSubquerySupporter *pSupporter) { SSqlCmd *pCmd = &pSql->cmd; pSql->res.qhandle = 0x1; @@ -652,12 +652,13 @@ int32_t tscLaunchJoinSubquery(SSqlObj *pSql, int16_t tableIndex, int16_t vnodeId } } - SSqlObj *pNew = createSubqueryObj(pSql, vnodeIdx, tableIndex, tscJoinQueryCallback, pSupporter, NULL); + SSqlObj *pNew = createSubqueryObj(pSql, tableIndex, tscJoinQueryCallback, pSupporter, NULL); if (pNew == NULL) { return TSDB_CODE_CLI_OUT_OF_MEMORY; } - + pSql->pSubs[pSql->numOfSubs++] = pNew; + assert(pSql->numOfSubs <= pSupporter->pState->numOfTotal); if (QUERY_IS_JOIN_QUERY(pCmd->type)) { addGroupInfoForSubquery(pSql, pNew, tableIndex); @@ -774,7 +775,7 @@ int tscProcessSql(SSqlObj *pSql) { pSql->index = pMeterMetaInfo->pMeterMeta->index; } else { // it must be the parent SSqlObj for super table query if ((pSql->cmd.type & TSDB_QUERY_TYPE_SUBQUERY) != 0) { - int32_t idx = pSql->cmd.vnodeIdx; + int32_t idx = pMeterMetaInfo->vnodeIndex; SVnodeSidList *pSidList = tscGetVnodeSidList(pMeterMetaInfo->pMetricMeta, idx); pSql->index = pSidList->index; } @@ -802,7 +803,7 @@ int tscProcessSql(SSqlObj *pSql) { return pSql->res.code; } - int32_t code = tscLaunchJoinSubquery(pSql, i, 0, pSupporter); + int32_t code = tscLaunchJoinSubquery(pSql, i, pSupporter); if (code != TSDB_CODE_SUCCESS) { // failed to create subquery object, quit query tscDestroyJoinSupporter(pSupporter); pSql->res.code = TSDB_CODE_CLI_OUT_OF_MEMORY; @@ -944,7 +945,7 @@ int tscLaunchMetricSubQueries(SSqlObj *pSql) { trs->pOrderDescriptor = pDesc; trs->pState = pState; trs->localBuffer = (tFilePage *)calloc(1, nBufferSize + sizeof(tFilePage)); - trs->vnodeIdx = i; + trs->subqueryIndex = i; trs->pParentSqlObj = pSql; trs->pFinalColModel = pModel; @@ -971,7 +972,7 @@ int tscLaunchMetricSubQueries(SSqlObj *pSql) { pNew->cmd.tsBuf = tsBufClone(pSql->cmd.tsBuf); } - tscTrace("%p sub:%p launch subquery.orderOfSub:%d", pSql, pNew, pNew->cmd.vnodeIdx); + tscTrace("%p sub:%p launch subquery.orderOfSub:%d", pSql, pNew, trs->subqueryIndex); tscProcessSql(pNew); } @@ -1020,7 +1021,7 @@ static void tscAbortFurtherRetryRetrieval(SRetrieveSupport *trsupport, TAOS_RES static void tscHandleSubRetrievalError(SRetrieveSupport *trsupport, SSqlObj *pSql, int numOfRows) { SSqlObj *pPObj = trsupport->pParentSqlObj; - int32_t idx = trsupport->vnodeIdx; + int32_t subqueryIndex = trsupport->subqueryIndex; assert(pSql != NULL); @@ -1035,27 +1036,27 @@ static void tscHandleSubRetrievalError(SRetrieveSupport *trsupport, SSqlObj *pSq pSql->res.numOfRows = 0; trsupport->numOfRetry = MAX_NUM_OF_SUBQUERY_RETRY; // disable retry efforts tscTrace("%p query is cancelled, sub:%p, orderOfSub:%d abort retrieve, code:%d", trsupport->pParentSqlObj, pSql, - trsupport->vnodeIdx, trsupport->pState->code); + subqueryIndex, trsupport->pState->code); } if (numOfRows >= 0) { // current query is successful, but other sub query failed, still abort current query. - tscTrace("%p sub:%p retrieve numOfRows:%d,orderOfSub:%d", pPObj, pSql, numOfRows, idx); - tscError("%p sub:%p abort further retrieval due to other queries failure,orderOfSub:%d,code:%d", pPObj, pSql, idx, - trsupport->pState->code); + tscTrace("%p sub:%p retrieve numOfRows:%d,orderOfSub:%d", pPObj, pSql, numOfRows, subqueryIndex); + tscError("%p sub:%p abort further retrieval due to other queries failure,orderOfSub:%d,code:%d", pPObj, pSql, + subqueryIndex, trsupport->pState->code); } else { if (trsupport->numOfRetry++ < MAX_NUM_OF_SUBQUERY_RETRY && trsupport->pState->code == TSDB_CODE_SUCCESS) { /* * current query failed, and the retry count is less than the available * count, retry query clear previous retrieved data, then launch a new sub query */ - tExtMemBufferClear(trsupport->pExtMemBuffer[idx]); + tExtMemBufferClear(trsupport->pExtMemBuffer[subqueryIndex]); // clear local saved number of results trsupport->localBuffer->numOfElems = 0; pthread_mutex_unlock(&trsupport->queryMutex); tscTrace("%p sub:%p retrieve failed, code:%d, orderOfSub:%d, retry:%d", trsupport->pParentSqlObj, pSql, numOfRows, - idx, trsupport->numOfRetry); + subqueryIndex, trsupport->numOfRetry); SSqlObj *pNew = tscCreateSqlObjForSubquery(trsupport->pParentSqlObj, trsupport, pSql); if (pNew == NULL) { @@ -1072,7 +1073,7 @@ static void tscHandleSubRetrievalError(SRetrieveSupport *trsupport, SSqlObj *pSq } else { // reach the maximum retry count, abort atomic_val_compare_exchange_32(&trsupport->pState->code, TSDB_CODE_SUCCESS, numOfRows); tscError("%p sub:%p retrieve failed,code:%d,orderOfSub:%d failed.no more retry,set global code:%d", pPObj, pSql, - numOfRows, idx, trsupport->pState->code); + numOfRows, subqueryIndex, trsupport->pState->code); } } @@ -1115,13 +1116,12 @@ static void tscHandleSubRetrievalError(SRetrieveSupport *trsupport, SSqlObj *pSq void tscRetrieveFromVnodeCallBack(void *param, TAOS_RES *tres, int numOfRows) { SRetrieveSupport *trsupport = (SRetrieveSupport *)param; - int32_t idx = trsupport->vnodeIdx; + int32_t idx = trsupport->subqueryIndex; SSqlObj * pPObj = trsupport->pParentSqlObj; tOrderDescriptor *pDesc = trsupport->pOrderDescriptor; SSqlObj *pSql = (SSqlObj *)tres; - if (pSql == NULL) { - /* sql object has been released in error process, return immediately */ + if (pSql == NULL) { // sql object has been released in error process, return immediately tscTrace("%p subquery has been released, idx:%d, abort", pPObj, idx); return; } @@ -1172,7 +1172,7 @@ void tscRetrieveFromVnodeCallBack(void *param, TAOS_RES *tres, int numOfRows) { } else { // all data has been retrieved to client /* data in from current vnode is stored in cache and disk */ uint32_t numOfRowsFromVnode = - trsupport->pExtMemBuffer[pCmd->vnodeIdx]->numOfAllElems + trsupport->localBuffer->numOfElems; + trsupport->pExtMemBuffer[idx]->numOfAllElems + trsupport->localBuffer->numOfElems; tscTrace("%p sub:%p all data retrieved from ip:%u,vid:%d, numOfRows:%d, orderOfSub:%d", pPObj, pSql, pSvd->ip, pSvd->vnode, numOfRowsFromVnode, idx); @@ -1285,10 +1285,10 @@ void tscKillMetricQuery(SSqlObj *pSql) { static void tscRetrieveDataRes(void *param, TAOS_RES *tres, int retCode); static SSqlObj *tscCreateSqlObjForSubquery(SSqlObj *pSql, SRetrieveSupport *trsupport, SSqlObj *prevSqlObj) { - SSqlObj *pNew = createSubqueryObj(pSql, trsupport->vnodeIdx, 0, tscRetrieveDataRes, trsupport, prevSqlObj); + SSqlObj *pNew = createSubqueryObj(pSql, 0, tscRetrieveDataRes, trsupport, prevSqlObj); if (pNew != NULL) { // the sub query of two-stage super table query pNew->cmd.type |= TSDB_QUERY_TYPE_STABLE_SUBQUERY; - pSql->pSubs[trsupport->vnodeIdx] = pNew; + pSql->pSubs[trsupport->subqueryIndex] = pNew; } return pNew; @@ -1298,8 +1298,8 @@ void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code) { SRetrieveSupport *trsupport = (SRetrieveSupport *)param; SSqlObj * pSql = (SSqlObj *)tres; - int32_t idx = pSql->cmd.vnodeIdx; SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(&pSql->cmd, 0); + int32_t idx = pMeterMetaInfo->vnodeIndex; SVnodeSidList *vnodeInfo = NULL; SVPeerDesc * pSvd = NULL; @@ -1317,7 +1317,7 @@ void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code) { code = trsupport->pState->code; } tscTrace("%p query cancelled or failed, sub:%p, orderOfSub:%d abort, code:%d", trsupport->pParentSqlObj, pSql, - trsupport->vnodeIdx, code); + trsupport->subqueryIndex, code); } /* @@ -1337,7 +1337,7 @@ void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code) { SSqlObj *pNew = tscCreateSqlObjForSubquery(trsupport->pParentSqlObj, trsupport, pSql); if (pNew == NULL) { tscError("%p sub:%p failed to create new subquery due to out of memory, abort retry, vid:%d, orderOfSub:%d", - trsupport->pParentSqlObj, pSql, pSvd->vnode, trsupport->vnodeIdx); + trsupport->pParentSqlObj, pSql, pSvd->vnode, trsupport->subqueryIndex); trsupport->pState->code = -TSDB_CODE_CLI_OUT_OF_MEMORY; trsupport->numOfRetry = MAX_NUM_OF_SUBQUERY_RETRY; @@ -1353,17 +1353,17 @@ void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code) { if (vnodeInfo != NULL) { tscTrace("%p sub:%p query failed,ip:%u,vid:%d,orderOfSub:%d,global code:%d", trsupport->pParentSqlObj, pSql, vnodeInfo->vpeerDesc[vnodeInfo->index].ip, vnodeInfo->vpeerDesc[vnodeInfo->index].vnode, - trsupport->vnodeIdx, trsupport->pState->code); + trsupport->subqueryIndex, trsupport->pState->code); } else { tscTrace("%p sub:%p query failed,orderOfSub:%d,global code:%d", trsupport->pParentSqlObj, pSql, - trsupport->vnodeIdx, trsupport->pState->code); + trsupport->subqueryIndex, trsupport->pState->code); } tscRetrieveFromVnodeCallBack(param, tres, trsupport->pState->code); } else { // success, proceed to retrieve data from dnode tscTrace("%p sub:%p query complete,ip:%u,vid:%d,orderOfSub:%d,retrieve data", trsupport->pParentSqlObj, pSql, vnodeInfo->vpeerDesc[vnodeInfo->index].ip, vnodeInfo->vpeerDesc[vnodeInfo->index].vnode, - trsupport->vnodeIdx); + trsupport->subqueryIndex); taos_fetch_rows_a(tres, tscRetrieveFromVnodeCallBack, param); } @@ -1438,7 +1438,7 @@ void tscUpdateVnodeInQueryMsg(SSqlObj *pSql, char *buf) { pQueryMsg->vnode = htons(pMeterMeta->vpeerDesc[pSql->index].vnode); } else { // query on metric SMetricMeta * pMetricMeta = pMeterMetaInfo->pMetricMeta; - SVnodeSidList *pVnodeSidList = tscGetVnodeSidList(pMetricMeta, pCmd->vnodeIdx); + SVnodeSidList *pVnodeSidList = tscGetVnodeSidList(pMetricMeta, pMeterMetaInfo->vnodeIndex); pQueryMsg->vnode = htons(pVnodeSidList->vpeerDesc[pSql->index].vnode); } } @@ -1461,7 +1461,7 @@ static int32_t tscEstimateQueryMsgSize(SSqlCmd *pCmd) { SMetricMeta *pMetricMeta = pMeterMetaInfo->pMetricMeta; - SVnodeSidList *pVnodeSidList = tscGetVnodeSidList(pMetricMeta, pCmd->vnodeIdx); + SVnodeSidList *pVnodeSidList = tscGetVnodeSidList(pMetricMeta, pMeterMetaInfo->vnodeIndex); int32_t meterInfoSize = (pMetricMeta->tagLen + sizeof(SMeterSidExtInfo)) * pVnodeSidList->numOfSids; int32_t outputColumnSize = pCmd->fieldsInfo.numOfOutputCols * sizeof(SSqlFuncExprMsg); @@ -1506,12 +1506,12 @@ int tscBuildQueryMsg(SSqlObj *pSql) { pQueryMsg->numOfTagsCols = 0; } else { // query on metric SMetricMeta *pMetricMeta = pMeterMetaInfo->pMetricMeta; - if (pCmd->vnodeIdx < 0) { - tscError("%p error vnodeIdx:%d", pSql, pCmd->vnodeIdx); + if (pMeterMetaInfo->vnodeIndex < 0) { + tscError("%p error vnodeIdx:%d", pSql, pMeterMetaInfo->vnodeIndex); return -1; } - SVnodeSidList *pVnodeSidList = tscGetVnodeSidList(pMetricMeta, pCmd->vnodeIdx); + SVnodeSidList *pVnodeSidList = tscGetVnodeSidList(pMetricMeta, pMeterMetaInfo->vnodeIndex); uint32_t vnodeId = pVnodeSidList->vpeerDesc[pVnodeSidList->index].vnode; numOfMeters = pVnodeSidList->numOfSids; @@ -1693,7 +1693,7 @@ int tscBuildQueryMsg(SSqlObj *pSql) { pQueryMsg->colNameLen = htonl(len); // set sids list - tscTrace("%p vid:%d, query on %d meters", pSql, pSql->cmd.vnodeIdx, numOfMeters); + tscTrace("%p vid:%d, query on %d meters", pSql, htons(pQueryMsg->vnode), numOfMeters); if (UTIL_METER_IS_NOMRAL_METER(pMeterMetaInfo)) { #ifdef _DEBUG_VIEW @@ -1703,7 +1703,7 @@ int tscBuildQueryMsg(SSqlObj *pSql) { pSMeterTagInfo->sid = htonl(pMeterMeta->sid); pMsg += sizeof(SMeterSidExtInfo); } else { - SVnodeSidList *pVnodeSidList = tscGetVnodeSidList(pMetricMeta, pCmd->vnodeIdx); + SVnodeSidList *pVnodeSidList = tscGetVnodeSidList(pMetricMeta, pMeterMetaInfo->vnodeIndex); for (int32_t i = 0; i < numOfMeters; ++i) { SMeterSidExtInfo *pMeterTagInfo = (SMeterSidExtInfo *)pMsg; @@ -1774,7 +1774,7 @@ int tscBuildQueryMsg(SSqlObj *pSql) { int32_t numOfBlocks = 0; if (pCmd->tsBuf != NULL) { - STSVnodeBlockInfo *pBlockInfo = tsBufGetVnodeBlockInfo(pCmd->tsBuf, pCmd->vnodeIdx); + STSVnodeBlockInfo *pBlockInfo = tsBufGetVnodeBlockInfo(pCmd->tsBuf, pMeterMetaInfo->vnodeIndex); assert(QUERY_IS_JOIN_QUERY(pCmd->type) && pBlockInfo != NULL); // this query should not be sent // todo refactor diff --git a/src/client/src/tscSql.c b/src/client/src/tscSql.c index 4d7f2734a9..fe097b15d9 100644 --- a/src/client/src/tscSql.c +++ b/src/client/src/tscSql.c @@ -609,7 +609,15 @@ TAOS_ROW taos_fetch_row(TAOS_RES *res) { assert((pRes->offset >= 0 && pRes->numOfRows == 0) || (pRes->offset == 0 && pRes->numOfRows >= 0)); - if ((++pCmd->vnodeIdx) < pMeterMetaInfo->pMetricMeta->numOfVnodes) { + /* + * For project query with super table join, the numOfSub is equalled to the number of all subqueries, so + * we need to reset the value of numOfSubs to be 0. + * + * For super table join with projection query, if anyone of the subquery is exhausted, the query completed. + */ + pSql->numOfSubs = 0; + + if ((++pMeterMetaInfo->vnodeIndex) < pMeterMetaInfo->pMetricMeta->numOfVnodes) { pCmd->command = TSDB_SQL_SELECT; assert(pSql->fp == NULL); tscProcessSql(pSql); @@ -617,7 +625,7 @@ TAOS_ROW taos_fetch_row(TAOS_RES *res) { } // check!!! - if (rows != NULL || pCmd->vnodeIdx >= pMeterMetaInfo->pMetricMeta->numOfVnodes) { + if (rows != NULL || pMeterMetaInfo->vnodeIndex >= pMeterMetaInfo->pMetricMeta->numOfVnodes) { break; } } @@ -654,7 +662,7 @@ int taos_fetch_block(TAOS_RES *res, TAOS_ROW *rows) { pCmd->limit.offset = pRes->offset; - if ((++pSql->cmd.vnodeIdx) < pMeterMetaInfo->pMetricMeta->numOfVnodes) { + if ((++pMeterMetaInfo->vnodeIndex) < pMeterMetaInfo->pMetricMeta->numOfVnodes) { pSql->cmd.command = TSDB_SQL_SELECT; assert(pSql->fp == NULL); tscProcessSql(pSql); @@ -662,7 +670,7 @@ int taos_fetch_block(TAOS_RES *res, TAOS_ROW *rows) { } // check!!! - if (*rows != NULL || pCmd->vnodeIdx >= pMeterMetaInfo->pMetricMeta->numOfVnodes) { + if (*rows != NULL || pMeterMetaInfo->vnodeIndex >= pMeterMetaInfo->pMetricMeta->numOfVnodes) { break; } } diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index 7fd3d7706b..5ca55a486f 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -1474,7 +1474,11 @@ bool tscShouldFreeAsyncSqlObj(SSqlObj* pSql) { * data blocks have been submit to vnode. */ SDataBlockList* pDataBlocks = pCmd->pDataBlocks; - if (pDataBlocks == NULL || pCmd->vnodeIdx >= pDataBlocks->nSize) { + + SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfo(&pSql->cmd, 0); + assert(pSql->cmd.numOfTables == 1); + + if (pDataBlocks == NULL || pMeterMetaInfo->vnodeIndex >= pDataBlocks->nSize) { tscTrace("%p object should be release since all data blocks have been submit", pSql); return true; } else { @@ -1487,10 +1491,11 @@ bool tscShouldFreeAsyncSqlObj(SSqlObj* pSql) { } SMeterMetaInfo* tscGetMeterMetaInfo(SSqlCmd* pCmd, int32_t index) { - if (pCmd == NULL || index >= pCmd->numOfTables || index < 0) { + if (pCmd == NULL || pCmd->numOfTables == 0) { return NULL; } + assert(index >= 0 && index <= pCmd->numOfTables && pCmd->pMeterInfo != NULL); return pCmd->pMeterInfo[index]; } @@ -1587,13 +1592,13 @@ void tscResetForNextRetrieve(SSqlRes* pRes) { pRes->numOfRows = 0; } -SSqlObj* createSubqueryObj(SSqlObj* pSql, int32_t vnodeIndex, int16_t tableIndex, void (*fp)(), void* param, - SSqlObj* pPrevSql) { +SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void* param, SSqlObj* pPrevSql) { SSqlCmd* pCmd = &pSql->cmd; + SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, tableIndex); SSqlObj* pNew = (SSqlObj*)calloc(1, sizeof(SSqlObj)); if (pNew == NULL) { - tscError("%p new subquery failed, vnodeIdx:%d, tableIndex:%d", pSql, vnodeIndex, tableIndex); + tscError("%p new subquery failed, tableIndex:%d, vnodeIndex:%d", pSql, tableIndex, pMeterMetaInfo->vnodeIndex); return NULL; } @@ -1602,7 +1607,7 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int32_t vnodeIndex, int16_t tableIndex pNew->sqlstr = strdup(pSql->sqlstr); if (pNew->sqlstr == NULL) { - tscError("%p new subquery failed, vnodeIdx:%d, tableIndex:%d", pSql, vnodeIndex, tableIndex); + tscError("%p new subquery failed, tableIndex:%d, vnodeIndex:%d", pSql, tableIndex, pMeterMetaInfo->vnodeIndex); free(pNew); return NULL; @@ -1627,15 +1632,13 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int32_t vnodeIndex, int16_t tableIndex tscTagCondCopy(&pNew->cmd.tagCond, &pCmd->tagCond); if (tscAllocPayload(&pNew->cmd, TSDB_DEFAULT_PAYLOAD_SIZE) != TSDB_CODE_SUCCESS) { - tscError("%p new subquery failed, vnodeIdx:%d, tableIndex:%d", pSql, vnodeIndex, tableIndex); + tscError("%p new subquery failed, tableIndex:%d, vnodeIndex:%d", pSql, tableIndex, pMeterMetaInfo->vnodeIndex); tscFreeSqlObj(pNew); return NULL; } tscColumnBaseInfoCopy(&pNew->cmd.colList, &pCmd->colList, (int16_t)tableIndex); - - SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, tableIndex); - + // set the correct query type if (pPrevSql != NULL) { pNew->cmd.type = pPrevSql->cmd.type; @@ -1666,7 +1669,6 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int32_t vnodeIndex, int16_t tableIndex pNew->fp = fp; pNew->param = param; - pNew->cmd.vnodeIdx = vnodeIndex; SMeterMetaInfo* pMetermetaInfo = tscGetMeterMetaInfo(pCmd, tableIndex); char key[TSDB_MAX_TAGS_LEN + 1] = {0}; @@ -1695,8 +1697,8 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int32_t vnodeIndex, int16_t tableIndex assert(pFinalInfo->pMetricMeta != NULL); } - tscTrace("%p new subquery %p, vnodeIdx:%d, tableIndex:%d, type:%d", pSql, pNew, vnodeIndex, tableIndex, - pNew->cmd.type); + tscTrace("%p new subquery %p, tableIndex:%d, vnodeIdx:%d, type:%d", pSql, pNew, tableIndex, + pMeterMetaInfo->vnodeIndex, pNew->cmd.type); return pNew; } -- GitLab