diff --git a/src/client/inc/tscSecondaryMerge.h b/src/client/inc/tscSecondaryMerge.h index 4c95994dfab1ea1c5a259ed7f13ddd91b91c6e1e..0c6472f6b367857edbdc92a08e0bc8a263572ee1 100644 --- a/src/client/inc/tscSecondaryMerge.h +++ b/src/client/inc/tscSecondaryMerge.h @@ -94,7 +94,7 @@ typedef struct SRetrieveSupport { tOrderDescriptor *pOrderDescriptor; tColModel * pFinalColModel; // colModel for final result SSubqueryState * pState; - int32_t vnodeIdx; // index of current vnode in vnode list + int32_t subqueryIndex; // index of current vnode in vnode list SSqlObj * pParentSqlObj; tFilePage * localBuffer; // temp buffer, there is a buffer for each vnode to uint32_t numOfRetry; // record the number of retry times diff --git a/src/client/inc/tscUtil.h b/src/client/inc/tscUtil.h index da4ebc4e9cadbeda7755ad35d57889035ffcd0b2..3deb4c463fc77e9b7dd8613514d954df96fd2337 100644 --- a/src/client/inc/tscUtil.h +++ b/src/client/inc/tscUtil.h @@ -23,14 +23,14 @@ extern "C" { /* * @date 2018/09/30 */ -#include -#include +#include "os.h" #include "textbuffer.h" +#include "tscSecondaryMerge.h" #include "tsclient.h" #include "tsdb.h" -#include "tscSecondaryMerge.h" -#define UTIL_METER_IS_METRIC(metaInfo) (((metaInfo)->pMeterMeta != NULL) && ((metaInfo)->pMeterMeta->meterType == TSDB_METER_METRIC)) +#define UTIL_METER_IS_METRIC(metaInfo) \ + (((metaInfo)->pMeterMeta != NULL) && ((metaInfo)->pMeterMeta->meterType == TSDB_METER_METRIC)) #define UTIL_METER_IS_NOMRAL_METER(metaInfo) (!(UTIL_METER_IS_METRIC(metaInfo))) #define UTIL_METER_IS_CREATE_FROM_METRIC(metaInfo) \ (((metaInfo)->pMeterMeta != NULL) && ((metaInfo)->pMeterMeta->meterType == TSDB_METER_MTABLE)) @@ -52,7 +52,6 @@ typedef struct SParsedDataColInfo { typedef struct SJoinSubquerySupporter { SSubqueryState* pState; SSqlObj* pObj; // parent SqlObj - bool hasMore; // has data from vnode to fetch int32_t subqueryIndex; // index of sub query int64_t interval; // interval time SLimitVal limit; // limit info @@ -62,28 +61,27 @@ typedef struct SJoinSubquerySupporter { SFieldInfo fieldsInfo; STagCond tagCond; SSqlGroupbyExpr groupbyExpr; - - struct STSBuf* pTSBuf; - - FILE* f; - char path[PATH_MAX]; + struct STSBuf* pTSBuf; // the TSBuf struct that holds the compressed timestamp array + FILE* f; // temporary file in order to create TSBuf + char path[PATH_MAX]; // temporary file path } SJoinSubquerySupporter; -void tscDestroyDataBlock(STableDataBlocks* pDataBlock); +void tscDestroyDataBlock(STableDataBlocks* pDataBlock); STableDataBlocks* tscCreateDataBlock(int32_t size); -void tscAppendDataBlock(SDataBlockList* pList, STableDataBlocks* pBlocks); -SParamInfo* tscAddParamToDataBlock(STableDataBlocks* pDataBlock, char type, uint8_t timePrec, short bytes, uint32_t offset); - -SDataBlockList* tscCreateBlockArrayList(); -void* tscDestroyBlockArrayList(SDataBlockList* pList); -int32_t tscCopyDataBlockToPayload(SSqlObj* pSql, STableDataBlocks* pDataBlock); -void tscFreeUnusedDataBlocks(SDataBlockList* pList); -int32_t tscMergeTableDataBlocks(SSqlObj* pSql, SDataBlockList* pDataList); +void tscAppendDataBlock(SDataBlockList* pList, STableDataBlocks* pBlocks); +SParamInfo* tscAddParamToDataBlock(STableDataBlocks* pDataBlock, char type, uint8_t timePrec, short bytes, + uint32_t offset); + +SDataBlockList* tscCreateBlockArrayList(); +void* tscDestroyBlockArrayList(SDataBlockList* pList); +int32_t tscCopyDataBlockToPayload(SSqlObj* pSql, STableDataBlocks* pDataBlock); +void tscFreeUnusedDataBlocks(SDataBlockList* pList); +int32_t tscMergeTableDataBlocks(SSqlObj* pSql, SDataBlockList* pDataList); STableDataBlocks* tscGetDataBlockFromList(void* pHashList, SDataBlockList* pDataBlockList, int64_t id, int32_t size, int32_t startOffset, int32_t rowSize, char* tableId); STableDataBlocks* tscCreateDataBlockEx(size_t size, int32_t rowSize, int32_t startOffset, char* name); -SVnodeSidList* tscGetVnodeSidList(SMetricMeta* pMetricmeta, int32_t vnodeIdx); +SVnodeSidList* tscGetVnodeSidList(SMetricMeta* pMetricmeta, int32_t vnodeIdx); SMeterSidExtInfo* tscGetMeterSidInfo(SVnodeSidList* pSidList, int32_t idx); /** @@ -97,6 +95,8 @@ SMeterSidExtInfo* tscGetMeterSidInfo(SVnodeSidList* pSidList, int32_t idx); bool tscIsPointInterpQuery(SSqlCmd* pCmd); bool tscIsTWAQuery(SSqlCmd* pCmd); bool tscProjectionQueryOnMetric(SSqlCmd* pCmd); +bool tscProjectionQueryOnTable(SSqlCmd* pCmd); + bool tscIsTwoStageMergeMetricQuery(SSqlCmd* pCmd); bool tscQueryOnMetric(SSqlCmd* pCmd); bool tscQueryMetricTags(SSqlCmd* pCmd); @@ -108,7 +108,7 @@ void tscAddSpecialColumnForSelect(SSqlCmd* pCmd, int32_t outputColIndex, int16_t void addRequiredTagColumn(SSqlCmd* pCmd, int32_t tagColIndex, int32_t tableIndex); int32_t setMeterID(SSqlObj* pSql, SSQLToken* pzTableName, int32_t tableIndex); -void tscClearInterpInfo(SSqlCmd* pCmd); +void tscClearInterpInfo(SSqlCmd* pCmd); bool tscIsInsertOrImportData(char* sqlstr); @@ -128,9 +128,9 @@ void tscFieldInfoCopy(SFieldInfo* src, SFieldInfo* dst, const int32_t* indexList void tscFieldInfoCopyAll(SFieldInfo* src, SFieldInfo* dst); TAOS_FIELD* tscFieldInfoGetField(SSqlCmd* pCmd, int32_t index); -int16_t tscFieldInfoGetOffset(SSqlCmd* pCmd, int32_t index); -int32_t tscGetResRowLength(SSqlCmd* pCmd); -void tscClearFieldInfo(SFieldInfo* pFieldInfo); +int16_t tscFieldInfoGetOffset(SSqlCmd* pCmd, int32_t index); +int32_t tscGetResRowLength(SSqlCmd* pCmd); +void tscClearFieldInfo(SFieldInfo* pFieldInfo); void addExprParams(SSqlExpr* pExpr, char* argument, int32_t type, int32_t bytes, int16_t tableIndex); @@ -142,15 +142,15 @@ SSqlExpr* tscSqlExprUpdate(SSqlCmd* pCmd, int32_t index, int16_t functionId, int int16_t size); SSqlExpr* tscSqlExprGet(SSqlCmd* pCmd, int32_t index); -void tscSqlExprCopy(SSqlExprInfo* dst, const SSqlExprInfo* src, uint64_t uid); +void tscSqlExprCopy(SSqlExprInfo* dst, const SSqlExprInfo* src, uint64_t uid); SColumnBase* tscColumnBaseInfoInsert(SSqlCmd* pCmd, SColumnIndex* colIndex); -void tscColumnFilterInfoCopy(SColumnFilterInfo* dst, const SColumnFilterInfo* src); -void tscColumnBaseCopy(SColumnBase* dst, const SColumnBase* src); +void tscColumnFilterInfoCopy(SColumnFilterInfo* dst, const SColumnFilterInfo* src); +void tscColumnBaseCopy(SColumnBase* dst, const SColumnBase* src); -void tscColumnBaseInfoCopy(SColumnBaseInfo* dst, const SColumnBaseInfo* src, int16_t tableIndex); +void tscColumnBaseInfoCopy(SColumnBaseInfo* dst, const SColumnBaseInfo* src, int16_t tableIndex); SColumnBase* tscColumnBaseInfoGet(SColumnBaseInfo* pColumnBaseInfo, int32_t index); -void tscColumnBaseInfoUpdateTableIndex(SColumnBaseInfo* pColList, int16_t tableIndex); +void tscColumnBaseInfoUpdateTableIndex(SColumnBaseInfo* pColList, int16_t tableIndex); void tscColumnBaseInfoReserve(SColumnBaseInfo* pColumnBaseInfo, int32_t size); void tscColumnBaseInfoDestroy(SColumnBaseInfo* pColumnBaseInfo); @@ -163,11 +163,10 @@ bool tscValidateColumnId(SSqlCmd* pCmd, int32_t colId); // get starter position of metric query condition (query on tags) in SSqlCmd.payload SCond* tsGetMetricQueryCondPos(STagCond* pCond, uint64_t tableIndex); -void tsSetMetricQueryCond(STagCond* pTagCond, uint64_t uid, const char* str); +void tsSetMetricQueryCond(STagCond* pTagCond, uint64_t uid, const char* str); void tscTagCondCopy(STagCond* dest, const STagCond* src); void tscTagCondRelease(STagCond* pCond); -void tscTagCondSetQueryCondType(STagCond* pCond, int16_t type); void tscGetSrcColumnInfo(SSrcColumnInfo* pColInfo, SSqlCmd* pCmd); @@ -176,19 +175,19 @@ bool tscShouldFreeHeatBeat(SSqlObj* pHb); void tscCleanSqlCmd(SSqlCmd* pCmd); bool tscShouldFreeAsyncSqlObj(SSqlObj* pSql); -void tscRemoveAllMeterMetaInfo(SSqlCmd* pCmd, bool removeFromCache); +void tscRemoveAllMeterMetaInfo(SSqlCmd* pCmd, bool removeFromCache); SMeterMetaInfo* tscGetMeterMetaInfo(SSqlCmd* pCmd, int32_t index); SMeterMetaInfo* tscGetMeterMetaInfoByUid(SSqlCmd* pCmd, uint64_t uid, int32_t* index); -void tscClearMeterMetaInfo(SMeterMetaInfo* pMeterMetaInfo, bool removeFromCache); +void tscClearMeterMetaInfo(SMeterMetaInfo* pMeterMetaInfo, bool removeFromCache); SMeterMetaInfo* tscAddMeterMetaInfo(SSqlCmd* pCmd, const char* name, SMeterMeta* pMeterMeta, SMetricMeta* pMetricMeta, int16_t numOfTags, int16_t* tags); SMeterMetaInfo* tscAddEmptyMeterMetaInfo(SSqlCmd* pCmd); void tscGetMetricMetaCacheKey(SSqlCmd* pCmd, char* keyStr, uint64_t uid); -int tscGetMetricMeta(SSqlObj* pSql); -int tscGetMeterMeta(SSqlObj* pSql, char* meterId, int32_t tableIndex); -int tscGetMeterMetaEx(SSqlObj* pSql, char* meterId, bool createIfNotExists); +int tscGetMetricMeta(SSqlObj* pSql); +int tscGetMeterMeta(SSqlObj* pSql, char* meterId, int32_t tableIndex); +int tscGetMeterMetaEx(SSqlObj* pSql, char* meterId, bool createIfNotExists); void tscResetForNextRetrieve(SSqlRes* pRes); @@ -212,9 +211,8 @@ void tscDoQuery(SSqlObj* pSql); * @param pPrevSql * @return */ -SSqlObj* createSubqueryObj(SSqlObj* pSql, int32_t vnodeIndex, int16_t tableIndex, void (*fp)(), void* param, - SSqlObj* pPrevSql); -void addGroupInfoForSubquery(SSqlObj* pParentObj, SSqlObj* pSql, int32_t tableIndex); +SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void* param, SSqlObj* pPrevSql); +void addGroupInfoForSubquery(SSqlObj* pParentObj, SSqlObj* pSql, int32_t tableIndex); void doAddGroupColumnForSubquery(SSqlCmd* pCmd, int32_t tagIndex); @@ -224,6 +222,9 @@ TAOS* taos_connect_a(char* ip, char* user, char* pass, char* db, uint16_t port, void* param, void** taos); void sortRemoveDuplicates(STableDataBlocks* dataBuf); + +void tscPrintSelectClause(SSqlCmd* pCmd); + #ifdef __cplusplus } #endif diff --git a/src/client/inc/tsclient.h b/src/client/inc/tsclient.h index d439ba9929ac7c58eb08d946cc7289fd1d8dbee7..b36d2362dacd0a08a2adb5169c162ae4039daf9b 100644 --- a/src/client/inc/tsclient.h +++ b/src/client/inc/tsclient.h @@ -107,22 +107,25 @@ enum _sql_cmd { struct SSqlInfo; typedef struct SSqlGroupbyExpr { - int16_t tableIndex; - + int16_t tableIndex; int16_t numOfGroupCols; SColIndexEx columnInfo[TSDB_MAX_TAGS]; // group by columns information - - int16_t orderIndex; // order by column index - int16_t orderType; // order by type: asc/desc + int16_t orderIndex; // order by column index + int16_t orderType; // order by type: asc/desc } SSqlGroupbyExpr; typedef struct SMeterMetaInfo { - SMeterMeta * pMeterMeta; // metermeta - SMetricMeta *pMetricMeta; // metricmeta - - char name[TSDB_METER_ID_LEN + 1]; - int16_t numOfTags; // total required tags in query, including groupby tags - int16_t tagColumnIndex[TSDB_MAX_TAGS]; // clause + tag projection + SMeterMeta * pMeterMeta; // metermeta + SMetricMeta *pMetricMeta; // metricmeta + + /* + * 1. keep the vnode index during the multi-vnode super table projection query + * 2. keep the vnode index for multi-vnode insertion + */ + int32_t vnodeIndex; + char name[TSDB_METER_ID_LEN + 1]; // table(super table) name + int16_t numOfTags; // total required tags in query, including groupby tags + int16_t tagColumnIndex[TSDB_MAX_TAGS]; // clause + tag projection } SMeterMetaInfo; /* the structure for sql function in select clause */ @@ -188,7 +191,7 @@ typedef struct SString { typedef struct SCond { uint64_t uid; - char* cond; + char * cond; } SCond; typedef struct SJoinNode { @@ -262,15 +265,15 @@ typedef struct SDataBlockList { typedef struct { SOrderVal order; int command; - int count;// TODO refactor + int count; // TODO refactor union { - bool existsCheck; // check if the table exists - int8_t showType; // show command type + bool existsCheck; // check if the table exists + int8_t showType; // show command type }; - + int8_t isInsertFromFile; // load data from file or not - bool import; // import/insert type + bool import; // import/insert type char msgType; uint16_t type; // query type char intervalTimeUnit; @@ -296,7 +299,6 @@ typedef struct { SLimitVal slimit; int64_t globalLimit; STagCond tagCond; - int16_t vnodeIdx; // vnode index in pMetricMeta for metric query int16_t interpoType; // interpolate type int16_t numOfTables; @@ -366,25 +368,23 @@ typedef struct _sql_obj { STscObj *pTscObj; void (*fp)(); void (*fetchFp)(); - void * param; - uint32_t ip; - short vnode; - int64_t stime; - uint32_t queryId; - void * thandle; - void * pStream; - char * sqlstr; - char retry; - char maxRetry; - char index; - char freed : 4; - char listed : 4; - tsem_t rspSem; - tsem_t emptyRspSem; - - SSqlCmd cmd; - SSqlRes res; - + void * param; + uint32_t ip; + short vnode; + int64_t stime; + uint32_t queryId; + void * thandle; + void * pStream; + char * sqlstr; + char retry; + char maxRetry; + char index; + char freed : 4; + char listed : 4; + tsem_t rspSem; + tsem_t emptyRspSem; + SSqlCmd cmd; + SSqlRes res; char numOfSubs; struct _sql_obj **pSubs; struct _sql_obj * prev, *next; @@ -477,6 +477,8 @@ void tscProcessMultiVnodesInsertForFile(SSqlObj *pSql); void tscKillMetricQuery(SSqlObj *pSql); void tscInitResObjForLocalQuery(SSqlObj *pObj, int32_t numOfRes, int32_t rowLen); bool tscIsUpdateQuery(STscObj *pObj); +bool tscHasReachLimitation(SSqlObj* pSql); + int32_t tscInvalidSQLErrMsg(char *msg, const char *additionalInfo, const char *sql); // transfer SSqlInfo to SqlCmd struct diff --git a/src/client/src/tscAst.c b/src/client/src/tscAst.c index 1e0fac4dd2b76f3049cb9e9e3c6db6ec8cfc051b..a06a07524828aede3378e3f5487d729b1e8326a8 100644 --- a/src/client/src/tscAst.c +++ b/src/client/src/tscAst.c @@ -112,7 +112,7 @@ static tSQLSyntaxNode *tSQLSyntaxNodeCreate(SSchema *pSchema, int32_t numOfCols, tSQLSyntaxNode *pNode = NULL; if (pToken->type == TK_ID || pToken->type == TK_TBNAME) { - int32_t i = 0; + int32_t i = 0; if (pToken->type == TK_ID) { do { size_t len = strlen(pSchema[i].name); @@ -652,8 +652,7 @@ void tSQLListTraverseOnResult(struct tSQLBinaryExpr *pExpr, bool (*fp)(tSkipList // brutal force search int64_t num = pResult->num; for (int32_t i = 0, j = 0; i < pResult->num; ++i) { - //if (fp == NULL || (fp != NULL && fp(pResult->pRes[i], pExpr->info) == true)) { - if (fp == NULL || (fp(pResult->pRes[i], pExpr->info) == true)) { + if (fp == NULL || (fp(pResult->pRes[i], pExpr->info) == true)) { pResult->pRes[j++] = pResult->pRes[i]; } else { num--; diff --git a/src/client/src/tscAsync.c b/src/client/src/tscAsync.c index 61db9602cf0f75cc2fc2f59430b6fc59e66bda82..99b9b571d7ffe513e87206c5cd0c5d380318ca95 100644 --- a/src/client/src/tscAsync.c +++ b/src/client/src/tscAsync.c @@ -121,7 +121,8 @@ static void tscProcessAsyncFetchRowsProxy(void *param, TAOS_RES *tres, int numOf // sequentially retrieve data from remain vnodes first, query vnode specified by vnodeIdx if (numOfRows == 0 && tscProjectionQueryOnMetric(pCmd)) { // vnode is denoted by vnodeIdx, continue to query vnode specified by vnodeIdx - assert(pCmd->vnodeIdx >= 0); + SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, 0); + assert(pMeterMetaInfo->vnodeIndex >= 0); /* reach the maximum number of output rows, abort */ if (pCmd->globalLimit > 0 && pRes->numOfTotal >= pCmd->globalLimit) { @@ -133,8 +134,8 @@ static void tscProcessAsyncFetchRowsProxy(void *param, TAOS_RES *tres, int numOf pCmd->limit.limit = pCmd->globalLimit - pRes->numOfTotal; pCmd->limit.offset = pRes->offset; - if ((++(pCmd->vnodeIdx)) < tscGetMeterMetaInfo(pCmd, 0)->pMetricMeta->numOfVnodes) { - tscTrace("%p retrieve data from next vnode:%d", pSql, pCmd->vnodeIdx); + if ((++(pMeterMetaInfo->vnodeIndex)) < pMeterMetaInfo->pMetricMeta->numOfVnodes) { + tscTrace("%p retrieve data from next vnode:%d", pSql, pMeterMetaInfo->vnodeIndex); pSql->cmd.command = TSDB_SQL_SELECT; // reset flag to launch query first. @@ -271,7 +272,8 @@ void tscProcessAsyncRetrieve(void *param, TAOS_RES *tres, int numOfRows) { /* * vnode is denoted by vnodeIdx, continue to query vnode specified by vnodeIdx till all vnode have been retrieved */ - assert(pCmd->vnodeIdx >= 1); + SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, 0); + assert(pMeterMetaInfo->vnodeIndex >= 0); /* reach the maximum number of output rows, abort */ if (pCmd->globalLimit > 0 && pRes->numOfTotal >= pCmd->globalLimit) { @@ -282,7 +284,7 @@ void tscProcessAsyncRetrieve(void *param, TAOS_RES *tres, int numOfRows) { /* update the limit value according to current retrieval results */ pCmd->limit.limit = pCmd->globalLimit - pRes->numOfTotal; - if ((++pCmd->vnodeIdx) <= tscGetMeterMetaInfo(pCmd, 0)->pMetricMeta->numOfVnodes) { + if ((++pMeterMetaInfo->vnodeIndex) <= pMeterMetaInfo->pMetricMeta->numOfVnodes) { pSql->cmd.command = TSDB_SQL_SELECT; // reset flag to launch query first. tscResetForNextRetrieve(pRes); @@ -403,9 +405,12 @@ void tscAsyncInsertMultiVnodesProxy(void *param, TAOS_RES *tres, int numOfRows) int32_t code = TSDB_CODE_SUCCESS; assert(!pCmd->isInsertFromFile && pSql->signature == pSql); - + + SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, 0); + assert(pCmd->numOfTables == 1); + SDataBlockList *pDataBlocks = pCmd->pDataBlocks; - if (pDataBlocks == NULL || pCmd->vnodeIdx >= pDataBlocks->nSize) { + if (pDataBlocks == NULL || pMeterMetaInfo->vnodeIndex >= pDataBlocks->nSize) { // restore user defined fp pSql->fp = pSql->fetchFp; tscTrace("%p Async insertion completed, destroy data block list", pSql); @@ -417,17 +422,17 @@ void tscAsyncInsertMultiVnodesProxy(void *param, TAOS_RES *tres, int numOfRows) (*pSql->fp)(pSql->param, tres, numOfRows); } else { do { - code = tscCopyDataBlockToPayload(pSql, pDataBlocks->pData[pCmd->vnodeIdx++]); + code = tscCopyDataBlockToPayload(pSql, pDataBlocks->pData[pMeterMetaInfo->vnodeIndex++]); if (code != TSDB_CODE_SUCCESS) { tscTrace("%p prepare submit data block failed in async insertion, vnodeIdx:%d, total:%d, code:%d", - pSql, pCmd->vnodeIdx - 1, pDataBlocks->nSize, code); + pSql, pMeterMetaInfo->vnodeIndex - 1, pDataBlocks->nSize, code); } - } while (code != TSDB_CODE_SUCCESS && pCmd->vnodeIdx < pDataBlocks->nSize); + } while (code != TSDB_CODE_SUCCESS && pMeterMetaInfo->vnodeIndex < pDataBlocks->nSize); // build submit msg may fail if (code == TSDB_CODE_SUCCESS) { - tscTrace("%p async insertion, vnodeIdx:%d, total:%d", pSql, pCmd->vnodeIdx - 1, pDataBlocks->nSize); + tscTrace("%p async insertion, vnodeIdx:%d, total:%d", pSql, pMeterMetaInfo->vnodeIndex - 1, pDataBlocks->nSize); tscProcessSql(pSql); } } @@ -483,11 +488,11 @@ void tscMeterMetaCallBack(void *param, TAOS_RES *res, int code) { // check if it is a sub-query of metric query first, if true, enter another routine if ((pSql->cmd.type & TSDB_QUERY_TYPE_STABLE_SUBQUERY) == TSDB_QUERY_TYPE_STABLE_SUBQUERY) { SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, 0); - assert(pMeterMetaInfo->pMeterMeta->numOfTags != 0 && pCmd->vnodeIdx >= 0 && pSql->param != NULL); + assert(pMeterMetaInfo->pMeterMeta->numOfTags != 0 && pMeterMetaInfo->vnodeIndex >= 0 && pSql->param != NULL); SRetrieveSupport *trs = (SRetrieveSupport *)pSql->param; SSqlObj * pParObj = trs->pParentSqlObj; - assert(pParObj->signature == pParObj && trs->vnodeIdx == pCmd->vnodeIdx && + assert(pParObj->signature == pParObj && trs->subqueryIndex == pMeterMetaInfo->vnodeIndex && pMeterMetaInfo->pMeterMeta->numOfTags != 0); tscTrace("%p get metricMeta during metric query successfully", pSql); diff --git a/src/client/src/tscJoinProcess.c b/src/client/src/tscJoinProcess.c index ed44d540667284e6af66e184152d0b2fe0a27dc3..1e7355d1b174a722be93eda3b8e0a926520709a4 100644 --- a/src/client/src/tscJoinProcess.c +++ b/src/client/src/tscJoinProcess.c @@ -13,9 +13,9 @@ * along with this program. If not, see . */ +#include "tscJoinProcess.h" #include "os.h" #include "tcache.h" -#include "tscJoinProcess.h" #include "tscUtil.h" #include "tsclient.h" #include "tscompression.h" @@ -45,8 +45,8 @@ static bool doCompare(int32_t order, int64_t left, int64_t right) { } } -static int64_t doTSBlockIntersect(SSqlObj* pSql, SJoinSubquerySupporter* pSupporter1, SJoinSubquerySupporter* pSupporter2, - TSKEY* st, TSKEY* et) { +static int64_t doTSBlockIntersect(SSqlObj* pSql, SJoinSubquerySupporter* pSupporter1, + SJoinSubquerySupporter* pSupporter2, TSKEY* st, TSKEY* et) { STSBuf* output1 = tsBufCreate(true); STSBuf* output2 = tsBufCreate(true); @@ -150,22 +150,21 @@ static int64_t doTSBlockIntersect(SSqlObj* pSql, SJoinSubquerySupporter* pSuppor tsBufDestory(pSupporter1->pTSBuf); tsBufDestory(pSupporter2->pTSBuf); - tscTrace("%p input1:%lld, input2:%lld, %lld for secondary query after ts blocks intersecting", - pSql, numOfInput1, numOfInput2, output1->numOfTotal); + tscTrace("%p input1:%lld, input2:%lld, final:%lld for secondary query after ts blocks intersecting", pSql, + numOfInput1, numOfInput2, output1->numOfTotal); return output1->numOfTotal; } -//todo handle failed to create sub query -SJoinSubquerySupporter* tscCreateJoinSupporter(SSqlObj* pSql, SSubqueryState* pState, /*int32_t* numOfComplete, int32_t* gc,*/ int32_t index) { +// todo handle failed to create sub query +SJoinSubquerySupporter* tscCreateJoinSupporter(SSqlObj* pSql, SSubqueryState* pState, + /*int32_t* numOfComplete, int32_t* gc,*/ int32_t index) { SJoinSubquerySupporter* pSupporter = calloc(1, sizeof(SJoinSubquerySupporter)); if (pSupporter == NULL) { return NULL; } pSupporter->pObj = pSql; - pSupporter->hasMore = true; - pSupporter->pState = pState; pSupporter->subqueryIndex = index; @@ -226,12 +225,6 @@ bool needSecondaryQuery(SSqlObj* pSql) { * launch secondary stage query to fetch the result that contains timestamp in set */ int32_t tscLaunchSecondSubquery(SSqlObj* pSql) { - // TODO not launch secondary stage query - // if (!needSecondaryQuery(pSql)) { - // return; - // } - - // sub query may not be necessary int32_t numOfSub = 0; SJoinSubquerySupporter* pSupporter = NULL; @@ -239,15 +232,22 @@ int32_t tscLaunchSecondSubquery(SSqlObj* pSql) { pSupporter = pSql->pSubs[i]->param; pSupporter->pState->numOfCompleted = 0; + /* + * If the columns are not involved in the final select clause, the secondary query will not be launched + * for the subquery. + */ if (pSupporter->exprsInfo.numOfExprs > 0) { ++numOfSub; } } // scan all subquery, if one sub query has only ts, ignore it - int32_t j = 0; - tscTrace("%p start to launch secondary subqueries: %d", pSql, pSql->numOfSubs); + tscTrace( + "%p start to launch secondary subqueries, total:%d, only:%d needs to query, others are not retrieve in " + "select clause", + pSql, pSql->numOfSubs, numOfSub); + int32_t j = 0; for (int32_t i = 0; i < pSql->numOfSubs; ++i) { SSqlObj* pSub = pSql->pSubs[i]; pSupporter = pSub->param; @@ -259,15 +259,14 @@ int32_t tscLaunchSecondSubquery(SSqlObj* pSql) { continue; } - SSqlObj* pNew = createSubqueryObj(pSql, 0, (int16_t)i, tscJoinQueryCallback, pSupporter, NULL); + SSqlObj* pNew = createSubqueryObj(pSql, (int16_t)i, tscJoinQueryCallback, pSupporter, NULL); if (pNew == NULL) { - pSql->numOfSubs = i; //revise the number of subquery + pSql->numOfSubs = i; // revise the number of subquery pSupporter->pState->numOfTotal = i; pSupporter->pState->code = TSDB_CODE_CLI_OUT_OF_MEMORY; tscDestroyJoinSupporter(pSupporter); - - return NULL; + return 0; } tscFreeSqlCmdData(&pNew->cmd); @@ -282,7 +281,6 @@ int32_t tscLaunchSecondSubquery(SSqlObj* pSql) { pNew->cmd.type |= TSDB_QUERY_TYPE_JOIN_SEC_STAGE; pNew->cmd.nAggTimeInterval = pSupporter->interval; - pNew->cmd.limit = pSupporter->limit; pNew->cmd.groupbyExpr = pSupporter->groupbyExpr; tscColumnBaseInfoCopy(&pNew->cmd.colList, &pSupporter->colList, 0); @@ -302,6 +300,13 @@ int32_t tscLaunchSecondSubquery(SSqlObj* pSql) { SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfo(&pNew->cmd, 0); + /* + * When handling the projection query, the offset value will be modified for table-table join, which is changed + * during the timestamp intersection. + */ + pSupporter->limit = pSql->cmd.limit; + pNew->cmd.limit = pSupporter->limit; + // fetch the join tag column if (UTIL_METER_IS_METRIC(pMeterMetaInfo)) { SSqlExpr* pExpr = tscSqlExprGet(&pNew->cmd, 0); @@ -310,10 +315,12 @@ int32_t tscLaunchSecondSubquery(SSqlObj* pSql) { int16_t tagColIndex = tscGetJoinTagColIndexByUid(&pNew->cmd.tagCond, pMeterMetaInfo->pMeterMeta->uid); pExpr->param[0].i64Key = tagColIndex; pExpr->numOfParams = 1; - - addRequiredTagColumn(&pNew->cmd, tagColIndex, 0); } +#ifdef _DEBUG_VIEW + tscPrintSelectClause(&pNew->cmd); +#endif + tscProcessSql(pNew); } @@ -384,9 +391,9 @@ static void joinRetrieveCallback(void* param, TAOS_RES* tres, int numOfRows) { return; } - if (numOfRows > 0) { // write the data into disk + if (numOfRows > 0) { // write the data into disk fwrite(pSql->res.data, pSql->res.numOfRows, 1, pSupporter->f); - fflush(pSupporter->f); + fclose(pSupporter->f); STSBuf* pBuf = tsBufCreateFromFile(pSupporter->path, true); if (pBuf == NULL) { @@ -401,7 +408,10 @@ static void joinRetrieveCallback(void* param, TAOS_RES* tres, int numOfRows) { tscTrace("%p create tmp file for ts block:%s", pSql, pBuf->path); pSupporter->pTSBuf = pBuf; } else { - tsBufMerge(pSupporter->pTSBuf, pBuf, pSql->cmd.vnodeIdx); + assert(pSql->cmd.numOfTables == 1); // for subquery, only one metermetaInfo + SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfo(&pSql->cmd, 0); + + tsBufMerge(pSupporter->pTSBuf, pBuf, pMeterMetaInfo->vnodeIndex); tsBufDestory(pBuf); } @@ -411,12 +421,25 @@ static void joinRetrieveCallback(void* param, TAOS_RES* tres, int numOfRows) { pSql->res.row = pSql->res.numOfRows; taos_fetch_rows_a(tres, joinRetrieveCallback, param); - } else if (numOfRows == 0) { // no data from this vnode anymore - if (atomic_add_fetch_32(&pSupporter->pState->numOfCompleted, 1) >= pSupporter->pState->numOfTotal) { + } else if (numOfRows == 0) { // no data from this vnode anymore + if (tscProjectionQueryOnMetric(&pParentSql->cmd)) { + SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfo(&pSql->cmd, 0); + assert(pSql->cmd.numOfTables == 1); + + // for projection query, need to try next vnode + if ((++pMeterMetaInfo->vnodeIndex) < pMeterMetaInfo->pMetricMeta->numOfVnodes) { + pSql->cmd.command = TSDB_SQL_SELECT; + pSql->fp = tscJoinQueryCallback; + tscProcessSql(pSql); + + return; + } + } + if (atomic_add_fetch_32(&pSupporter->pState->numOfCompleted, 1) >= pSupporter->pState->numOfTotal) { if (pSupporter->pState->code != TSDB_CODE_SUCCESS) { tscTrace("%p sub:%p, numOfSub:%d, quit from further procedure due to other queries failure", pParentSql, tres, - pSupporter->subqueryIndex); + pSupporter->subqueryIndex); doQuitSubquery(pParentSql); return; } @@ -451,8 +474,33 @@ static void joinRetrieveCallback(void* param, TAOS_RES* tres, int numOfRows) { tscError("%p retrieve failed, code:%d, index:%d", pSql, numOfRows, pSupporter->subqueryIndex); } + if (numOfRows >= 0) { + pSql->res.numOfTotal += pSql->res.numOfRows; + } + + if (tscProjectionQueryOnMetric(&pSql->cmd) && numOfRows == 0) { + SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfo(&pSql->cmd, 0); + assert(pSql->cmd.numOfTables == 1); + + // for projection query, need to try next vnode if current vnode is exhausted + if ((++pMeterMetaInfo->vnodeIndex) < pMeterMetaInfo->pMetricMeta->numOfVnodes) { + pSupporter->pState->numOfCompleted = 0; + pSupporter->pState->numOfTotal = 1; + + pSql->cmd.command = TSDB_SQL_SELECT; + pSql->fp = tscJoinQueryCallback; + tscProcessSql(pSql); + + return; + } + } + if (atomic_add_fetch_32(&pSupporter->pState->numOfCompleted, 1) >= pSupporter->pState->numOfTotal) { - tscTrace("%p secondary retrieve completed, global code:%d", tres, pParentSql->res.code); + assert(pSupporter->pState->numOfCompleted == pSupporter->pState->numOfTotal); + + tscTrace("%p all %d secondary retrieves are completed, global code:%d", tres, pSupporter->pState->numOfTotal, + pParentSql->res.code); + if (pSupporter->pState->code != TSDB_CODE_SUCCESS) { pParentSql->res.code = abs(pSupporter->pState->code); freeSubqueryObj(pParentSql); @@ -466,50 +514,69 @@ static void joinRetrieveCallback(void* param, TAOS_RES* tres, int numOfRows) { void tscFetchDatablockFromSubquery(SSqlObj* pSql) { int32_t numOfFetch = 0; - for (int32_t i = 0; i < pSql->numOfSubs; ++i) { - SJoinSubquerySupporter* pSupporter = (SJoinSubquerySupporter*)pSql->pSubs[i]->param; + assert(pSql->numOfSubs >= 1); - SSqlRes* pRes = &pSql->pSubs[i]->res; - if (pRes->row >= pRes->numOfRows && pSupporter->hasMore) { - numOfFetch++; + for (int32_t i = 0; i < pSql->numOfSubs; ++i) { + SSqlRes *pRes = &pSql->pSubs[i]->res; + SSqlCmd *pCmd = &pSql->pSubs[i]->cmd; + + SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, 0); + + if (tscProjectionQueryOnMetric(pCmd)) { + if (pRes->row >= pRes->numOfRows && pMeterMetaInfo->vnodeIndex < pMeterMetaInfo->pMetricMeta->numOfVnodes && + (!tscHasReachLimitation(pSql->pSubs[i]))) { + numOfFetch++; + } + } else { + if ((pRes->row >= pRes->numOfRows && (!tscHasReachLimitation(pSql->pSubs[i])) && tscProjectionQueryOnTable(pSql)) + || (pRes->numOfRows == 0)) { + numOfFetch++; + } } } - if (numOfFetch > 0) { - tscTrace("%p retrieve data from %d subqueries", pSql, numOfFetch); + if (numOfFetch <= 0) { + return ; + } - SJoinSubquerySupporter* pSupporter = (SJoinSubquerySupporter*)pSql->pSubs[0]->param; - pSupporter->pState->numOfTotal = numOfFetch; // wait for all subqueries completed - pSupporter->pState->numOfCompleted = 0; + // TODO multi-vnode retrieve for projection query with limitation has bugs, since the global limiation is not handled + tscTrace("%p retrieve data from %d subqueries", pSql, numOfFetch); - for (int32_t i = 0; i < pSql->numOfSubs; ++i) { - SSqlObj* pSql1 = pSql->pSubs[i]; + SJoinSubquerySupporter* pSupporter = (SJoinSubquerySupporter*)pSql->pSubs[0]->param; + pSupporter->pState->numOfTotal = numOfFetch; // wait for all subqueries completed + pSupporter->pState->numOfCompleted = 0; - SSqlRes* pRes1 = &pSql1->res; - SSqlCmd* pCmd1 = &pSql1->cmd; + for (int32_t i = 0; i < pSql->numOfSubs; ++i) { + SSqlObj* pSql1 = pSql->pSubs[i]; - pSupporter = (SJoinSubquerySupporter*)pSql1->param; + SSqlRes* pRes1 = &pSql1->res; + SSqlCmd* pCmd1 = &pSql1->cmd; - // wait for all subqueries completed - pSupporter->pState->numOfTotal = numOfFetch; - if (pRes1->row >= pRes1->numOfRows && pSupporter->hasMore) { - tscTrace("%p subquery:%p retrieve data from vnode, index:%d", pSql, pSql1, pSupporter->subqueryIndex); + pSupporter = (SJoinSubquerySupporter*)pSql1->param; - tscResetForNextRetrieve(pRes1); + // wait for all subqueries completed + pSupporter->pState->numOfTotal = numOfFetch; + assert(pRes1->numOfRows >= 0 && pCmd1->numOfTables == 1); - pSql1->fp = joinRetrieveCallback; + SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfo(pCmd1, 0); + + if (pRes1->row >= pRes1->numOfRows) { + tscTrace("%p subquery:%p retrieve data from vnode, subquery:%d, vnodeIndex:%d", pSql, pSql1, + pSupporter->subqueryIndex, pMeterMetaInfo->vnodeIndex); - if (pCmd1->command < TSDB_SQL_LOCAL) { - pCmd1->command = (pCmd1->command > TSDB_SQL_MGMT) ? TSDB_SQL_RETRIEVE : TSDB_SQL_FETCH; - } + tscResetForNextRetrieve(pRes1); + pSql1->fp = joinRetrieveCallback; - tscProcessSql(pSql1); + if (pCmd1->command < TSDB_SQL_LOCAL) { + pCmd1->command = (pCmd1->command > TSDB_SQL_MGMT) ? TSDB_SQL_RETRIEVE : TSDB_SQL_FETCH; } - } - // wait for all subquery completed - tsem_wait(&pSql->rspSem); + tscProcessSql(pSql1); + } } + + // wait for all subquery completed + tsem_wait(&pSql->rspSem); } // all subqueries return, set the result output index @@ -519,6 +586,10 @@ void tscSetupOutputColumnIndex(SSqlObj* pSql) { tscTrace("%p all subquery response, retrieve data", pSql); + if (pRes->pColumnIndex != NULL) { + return; // the column transfer support struct has been built + } + pRes->pColumnIndex = calloc(1, sizeof(SColumnIndex) * pCmd->fieldsInfo.numOfOutputCols); for (int32_t i = 0; i < pCmd->fieldsInfo.numOfOutputCols; ++i) { @@ -608,20 +679,35 @@ void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code) { if (atomic_add_fetch_32(&pSupporter->pState->numOfCompleted, 1) >= pSupporter->pState->numOfTotal) { tscSetupOutputColumnIndex(pParentSql); - if (pParentSql->fp == NULL) { - tsem_wait(&pParentSql->emptyRspSem); - tsem_wait(&pParentSql->emptyRspSem); - - tsem_post(&pParentSql->rspSem); - } else { - // set the command flag must be after the semaphore been correctly set. - // pPObj->cmd.command = TSDB_SQL_RETRIEVE_METRIC; - // if (pPObj->res.code == TSDB_CODE_SUCCESS) { - // (*pPObj->fp)(pPObj->param, pPObj, 0); - // } else { - // tscQueueAsyncRes(pPObj); - // } - assert(0); + SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfo(&pSql->cmd, 0); + + /** + * if the query is a continue query (vnodeIndex > 0 for projection query) for next vnode, do the retrieval of + * data instead of returning to its invoker + */ + if (pMeterMetaInfo->vnodeIndex > 0 && tscProjectionQueryOnMetric(&pSql->cmd)) { + assert(pMeterMetaInfo->vnodeIndex < pMeterMetaInfo->pMetricMeta->numOfVnodes); + pSupporter->pState->numOfCompleted = 0; // reset the record value + + pSql->fp = joinRetrieveCallback; // continue retrieve data + pSql->cmd.command = TSDB_SQL_FETCH; + tscProcessSql(pSql); + } else { // first retrieve from vnode during the secondary stage sub-query + if (pParentSql->fp == NULL) { + tsem_wait(&pParentSql->emptyRspSem); + tsem_wait(&pParentSql->emptyRspSem); + + tsem_post(&pParentSql->rspSem); + } else { + // set the command flag must be after the semaphore been correctly set. + // pPObj->cmd.command = TSDB_SQL_RETRIEVE_METRIC; + // if (pPObj->res.code == TSDB_CODE_SUCCESS) { + // (*pPObj->fp)(pPObj->param, pPObj, 0); + // } else { + // tscQueueAsyncRes(pPObj); + // } + assert(0); + } } } } @@ -731,7 +817,7 @@ STSBuf* tsBufCreateFromFile(const char* path, bool autoDelete) { strncpy(pTSBuf->path, path, PATH_MAX); - pTSBuf->f = fopen(pTSBuf->path, "r"); + pTSBuf->f = fopen(pTSBuf->path, "r+"); if (pTSBuf->f == NULL) { return NULL; } @@ -774,7 +860,7 @@ STSBuf* tsBufCreateFromFile(const char* path, bool autoDelete) { size_t infoSize = sizeof(STSVnodeBlockInfo) * pTSBuf->numOfVnodes; STSVnodeBlockInfo* buf = (STSVnodeBlockInfo*)calloc(1, infoSize); - int64_t pos = ftell(pTSBuf->f); + int64_t pos = ftell(pTSBuf->f); fread(buf, infoSize, 1, pTSBuf->f); // the length value for each vnode is not kept in file, so does not set the length value @@ -790,13 +876,17 @@ STSBuf* tsBufCreateFromFile(const char* path, bool autoDelete) { struct stat fileStat; fstat(fileno(pTSBuf->f), &fileStat); - pTSBuf->fileSize = (uint32_t) fileStat.st_size; + pTSBuf->fileSize = (uint32_t)fileStat.st_size; tsBufResetPos(pTSBuf); // ascending by default pTSBuf->cur.order = TSQL_SO_ASC; pTSBuf->autoDelete = autoDelete; + + tscTrace("create tsBuf from file:%s, fd:%d, size:%d, numOfVnode:%d, autoDelete:%d", pTSBuf->path, fileno(pTSBuf->f), + pTSBuf->fileSize, pTSBuf->numOfVnodes, pTSBuf->autoDelete); + return pTSBuf; } @@ -814,12 +904,22 @@ void tsBufDestory(STSBuf* pTSBuf) { fclose(pTSBuf->f); if (pTSBuf->autoDelete) { + tscTrace("tsBuf %p destroyed, delete tmp file:%s", pTSBuf, pTSBuf->path); unlink(pTSBuf->path); + } else { + tscTrace("tsBuf %p destroyed, tmp file:%s, remains", pTSBuf, pTSBuf->path); } free(pTSBuf); } +static STSVnodeBlockInfoEx* tsBufGetLastVnodeInfo(STSBuf* pTSBuf) { + int32_t last = pTSBuf->numOfVnodes - 1; + + assert(last >= 0); + return &pTSBuf->pData[last]; +} + static STSVnodeBlockInfoEx* addOneVnodeInfo(STSBuf* pTSBuf, int32_t vnodeId) { if (pTSBuf->numOfAlloc <= pTSBuf->numOfVnodes) { uint32_t newSize = (uint32_t)(pTSBuf->numOfAlloc * 1.5); @@ -836,10 +936,10 @@ static STSVnodeBlockInfoEx* addOneVnodeInfo(STSBuf* pTSBuf, int32_t vnodeId) { } if (pTSBuf->numOfVnodes > 0) { - STSVnodeBlockInfo* pPrevBlockInfo = &pTSBuf->pData[pTSBuf->numOfVnodes - 1].info; + STSVnodeBlockInfoEx* pPrevBlockInfoEx = tsBufGetLastVnodeInfo(pTSBuf); // update prev vnode length info in file - TSBufUpdateVnodeInfo(pTSBuf, pTSBuf->numOfVnodes - 1, pPrevBlockInfo); + TSBufUpdateVnodeInfo(pTSBuf, pTSBuf->numOfVnodes - 1, &pPrevBlockInfoEx->info); } // set initial value for vnode block @@ -857,9 +957,9 @@ static STSVnodeBlockInfoEx* addOneVnodeInfo(STSBuf* pTSBuf, int32_t vnodeId) { // update the header info STSBufFileHeader header = { .magic = TS_COMP_FILE_MAGIC, .numOfVnode = pTSBuf->numOfVnodes, .tsOrder = pTSBuf->tsOrder}; - STSBufUpdateHeader(pTSBuf, &header); - return &pTSBuf->pData[pTSBuf->numOfVnodes - 1]; + STSBufUpdateHeader(pTSBuf, &header); + return tsBufGetLastVnodeInfo(pTSBuf); } static void shrinkBuffer(STSList* ptsData) { @@ -906,8 +1006,10 @@ static void writeDataToDisk(STSBuf* pTSBuf) { pTSBuf->tsData.len = 0; - pTSBuf->pData[pTSBuf->numOfVnodes - 1].info.compLen += blockSize; - pTSBuf->pData[pTSBuf->numOfVnodes - 1].info.numOfBlocks += 1; + STSVnodeBlockInfoEx* pVnodeBlockInfoEx = tsBufGetLastVnodeInfo(pTSBuf); + + pVnodeBlockInfoEx->info.compLen += blockSize; + pVnodeBlockInfoEx->info.numOfBlocks += 1; shrinkBuffer(&pTSBuf->tsData); } @@ -1008,13 +1110,13 @@ void tsBufAppend(STSBuf* pTSBuf, int32_t vnodeId, int64_t tag, const char* pData STSVnodeBlockInfoEx* pBlockInfo = NULL; STSList* ptsData = &pTSBuf->tsData; - if (pTSBuf->numOfVnodes == 0 || pTSBuf->pData[pTSBuf->numOfVnodes - 1].info.vnode != vnodeId) { + if (pTSBuf->numOfVnodes == 0 || tsBufGetLastVnodeInfo(pTSBuf)->info.vnode != vnodeId) { writeDataToDisk(pTSBuf); shrinkBuffer(ptsData); pBlockInfo = addOneVnodeInfo(pTSBuf, vnodeId); } else { - pBlockInfo = &pTSBuf->pData[pTSBuf->numOfVnodes - 1]; + pBlockInfo = tsBufGetLastVnodeInfo(pTSBuf); } assert(pBlockInfo->info.vnode == vnodeId); @@ -1037,6 +1139,8 @@ void tsBufAppend(STSBuf* pTSBuf, int32_t vnodeId, int64_t tag, const char* pData pTSBuf->numOfTotal += len / TSDB_KEYSIZE; + // the size of raw data exceeds the size of the default prepared buffer, so + // during getBufBlock, the output buffer needs to be large enough. if (ptsData->len >= ptsData->threshold) { writeDataToDisk(pTSBuf); shrinkBuffer(ptsData); @@ -1053,10 +1157,10 @@ void tsBufFlush(STSBuf* pTSBuf) { writeDataToDisk(pTSBuf); shrinkBuffer(&pTSBuf->tsData); - STSVnodeBlockInfo* pBlockInfo = &pTSBuf->pData[pTSBuf->numOfVnodes - 1].info; + STSVnodeBlockInfoEx* pBlockInfoEx = tsBufGetLastVnodeInfo(pTSBuf); // update prev vnode length info in file - TSBufUpdateVnodeInfo(pTSBuf, pTSBuf->numOfVnodes - 1, pBlockInfo); + TSBufUpdateVnodeInfo(pTSBuf, pTSBuf->numOfVnodes - 1, &pBlockInfoEx->info); // save the ts order into header STSBufFileHeader header = { @@ -1157,11 +1261,22 @@ static void tsBufGetBlock(STSBuf* pTSBuf, int32_t vnodeIndex, int32_t blockIndex } STSBlock* pBlock = &pTSBuf->block; + + size_t s = pBlock->numOfElem * TSDB_KEYSIZE; + + /* + * In order to accommodate all the qualified data, the actual buffer size for one block with identical tags value + * may exceed the maximum allowed size during *tsBufAppend* function by invoking expandBuffer function + */ + if (s > pTSBuf->tsData.allocSize) { + expandBuffer(&pTSBuf->tsData, s); + } + pTSBuf->tsData.len = tsDecompressTimestamp(pBlock->payload, pBlock->compLen, pBlock->numOfElem, pTSBuf->tsData.rawBuf, pTSBuf->tsData.allocSize, TWO_STAGE_COMP, pTSBuf->assistBuf, pTSBuf->bufSize); - assert(pTSBuf->tsData.len / TSDB_KEYSIZE == pBlock->numOfElem); + assert((pTSBuf->tsData.len / TSDB_KEYSIZE == pBlock->numOfElem) && (pTSBuf->tsData.allocSize >= pTSBuf->tsData.len)); pCur->vnodeIndex = vnodeIndex; pCur->blockIndex = blockIndex; @@ -1203,20 +1318,20 @@ bool tsBufNextPos(STSBuf* pTSBuf) { if (pCur->vnodeIndex == -1) { if (pCur->order == TSQL_SO_ASC) { tsBufGetBlock(pTSBuf, 0, 0); - - if (pTSBuf->block.numOfElem == 0) { // the whole list is empty, return + + if (pTSBuf->block.numOfElem == 0) { // the whole list is empty, return tsBufResetPos(pTSBuf); return false; } else { return true; } - - } else { // get the last timestamp record in the last block of the last vnode + + } else { // get the last timestamp record in the last block of the last vnode assert(pTSBuf->numOfVnodes > 0); - + int32_t vnodeIndex = pTSBuf->numOfVnodes - 1; pCur->vnodeIndex = vnodeIndex; - + int32_t vnodeId = pTSBuf->pData[pCur->vnodeIndex].info.vnode; STSVnodeBlockInfo* pBlockInfo = tsBufGetVnodeBlockInfo(pTSBuf, vnodeId); int32_t blockIndex = pBlockInfo->numOfBlocks - 1; @@ -1318,7 +1433,7 @@ int32_t tsBufMerge(STSBuf* pDestBuf, const STSBuf* pSrcBuf, int32_t vnodeId) { tsBufFlush(pDestBuf); // compared with the last vnode id - if (vnodeId != pDestBuf->pData[pDestBuf->numOfVnodes - 1].info.vnode) { + if (vnodeId != tsBufGetLastVnodeInfo(pDestBuf)->info.vnode) { int32_t oldSize = pDestBuf->numOfVnodes; int32_t newSize = oldSize + pSrcBuf->numOfVnodes; @@ -1345,36 +1460,49 @@ int32_t tsBufMerge(STSBuf* pDestBuf, const STSBuf* pSrcBuf, int32_t vnodeId) { pDestBuf->numOfVnodes = newSize; } else { - STSVnodeBlockInfoEx* pBlockInfoEx = &pDestBuf->pData[pDestBuf->numOfVnodes - 1]; + STSVnodeBlockInfoEx* pBlockInfoEx = tsBufGetLastVnodeInfo(pDestBuf); + pBlockInfoEx->len += pSrcBuf->pData[0].len; pBlockInfoEx->info.numOfBlocks += pSrcBuf->pData[0].info.numOfBlocks; pBlockInfoEx->info.compLen += pSrcBuf->pData[0].info.compLen; pBlockInfoEx->info.vnode = vnodeId; } - int64_t r = fseek(pDestBuf->f, 0, SEEK_END); + int32_t r = fseek(pDestBuf->f, 0, SEEK_END); assert(r == 0); int64_t offset = getDataStartOffset(); int32_t size = pSrcBuf->fileSize - offset; #ifdef LINUX - ssize_t rc = sendfile(fileno(pDestBuf->f), fileno(pSrcBuf->f), &offset, size); + ssize_t rc = tsendfile(fileno(pDestBuf->f), fileno(pSrcBuf->f), &offset, size); #else ssize_t rc = fsendfile(pDestBuf->f, pSrcBuf->f, &offset, size); #endif + if (rc == -1) { - printf("%s\n", strerror(errno)); + tscError("failed to merge tsBuf from:%s to %s, reason:%s\n", pSrcBuf->path, pDestBuf->path, strerror(errno)); return -1; } if (rc != size) { - printf("%s\n", strerror(errno)); + tscError("failed to merge tsBuf from:%s to %s, reason:%s\n", pSrcBuf->path, pDestBuf->path, strerror(errno)); return -1; } pDestBuf->numOfTotal += pSrcBuf->numOfTotal; + int32_t oldSize = pDestBuf->fileSize; + + struct stat fileStat; + fstat(fileno(pDestBuf->f), &fileStat); + pDestBuf->fileSize = (uint32_t)fileStat.st_size; + + assert(pDestBuf->fileSize == oldSize + size); + + tscTrace("tsBuf merge success, %p, path:%s, fd:%d, file size:%d, numOfVnode:%d, autoDelete:%d", pDestBuf, + pDestBuf->path, fileno(pDestBuf->f), pDestBuf->fileSize, pDestBuf->numOfVnodes, pDestBuf->autoDelete); + return 0; } @@ -1391,7 +1519,7 @@ STSBuf* tsBufCreateFromCompBlocks(const char* pData, int32_t numOfBlocks, int32_ TSBufUpdateVnodeInfo(pTSBuf, pTSBuf->numOfVnodes - 1, pBlockInfo); fseek(pTSBuf->f, pBlockInfo->offset, SEEK_SET); - fwrite((void*) pData, 1, len, pTSBuf->f); + fwrite((void*)pData, 1, len, pTSBuf->f); pTSBuf->fileSize += len; pTSBuf->tsOrder = order; diff --git a/src/client/src/tscParseInsert.c b/src/client/src/tscParseInsert.c index 48423417e782a860c9c95d8e08e14b37b4a22a91..d4ab40b4c7bd049c92eb97612fcff089a7370232 100644 --- a/src/client/src/tscParseInsert.c +++ b/src/client/src/tscParseInsert.c @@ -519,11 +519,12 @@ int tsParseValues(char **str, STableDataBlocks *pDataBlock, SMeterMeta *pMeterMe *str += index; if (numOfRows >= maxRows || pDataBlock->size + pMeterMeta->rowSize >= pDataBlock->nAllocSize) { int32_t tSize = tscAllocateMemIfNeed(pDataBlock, pMeterMeta->rowSize); - if (0 == tSize) { + if (0 == tSize) { //TODO pass the correct error code to client strcpy(error, "client out of memory"); *code = TSDB_CODE_CLI_OUT_OF_MEMORY; return -1; } + maxRows += tSize; } @@ -1091,8 +1092,10 @@ int doParserInsertSql(SSqlObj *pSql, char *str) { goto _error_clean; } + SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, 0); + // set the next sent data vnode index in data block arraylist - pCmd->vnodeIdx = 1; + pMeterMetaInfo->vnodeIndex = 1; } else { pCmd->pDataBlocks = tscDestroyBlockArrayList(pCmd->pDataBlocks); } @@ -1310,19 +1313,19 @@ void tscProcessMultiVnodesInsert(SSqlObj *pSql) { int32_t code = TSDB_CODE_SUCCESS; /* the first block has been sent to server in processSQL function */ - assert(pCmd->isInsertFromFile != -1 && pCmd->vnodeIdx >= 1 && pCmd->pDataBlocks != NULL); + assert(pCmd->isInsertFromFile != -1 && pMeterMetaInfo->vnodeIndex >= 1 && pCmd->pDataBlocks != NULL); - if (pCmd->vnodeIdx < pCmd->pDataBlocks->nSize) { + if (pMeterMetaInfo->vnodeIndex < pCmd->pDataBlocks->nSize) { SDataBlockList *pDataBlocks = pCmd->pDataBlocks; - for (int32_t i = pCmd->vnodeIdx; i < pDataBlocks->nSize; ++i) { + for (int32_t i = pMeterMetaInfo->vnodeIndex; i < pDataBlocks->nSize; ++i) { pDataBlock = pDataBlocks->pData[i]; if (pDataBlock == NULL) { continue; } if ((code = tscCopyDataBlockToPayload(pSql, pDataBlock)) != TSDB_CODE_SUCCESS) { - tscTrace("%p build submit data block failed, vnodeIdx:%d, total:%d", pSql, pCmd->vnodeIdx, pDataBlocks->nSize); + tscTrace("%p build submit data block failed, vnodeIdx:%d, total:%d", pSql, pMeterMetaInfo->vnodeIndex, pDataBlocks->nSize); continue; } diff --git a/src/client/src/tscPrepare.c b/src/client/src/tscPrepare.c index 532baec20545b941433268d15323c9f29a5bb27f..7e62afefe6f4feb95d46e13e25c614ee8cd60629 100644 --- a/src/client/src/tscPrepare.c +++ b/src/client/src/tscPrepare.c @@ -409,7 +409,9 @@ static int insertStmtReset(STscStmt* pStmt) { } } pCmd->batchSize = 0; - pCmd->vnodeIdx = 0; + + SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, 0); + pMeterMetaInfo->vnodeIndex = 0; return TSDB_CODE_SUCCESS; } @@ -422,6 +424,8 @@ static int insertStmtExecute(STscStmt* stmt) { ++pCmd->batchSize; } + SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, 0); + if (pCmd->pDataBlocks->nSize > 0) { // merge according to vgid int code = tscMergeTableDataBlocks(stmt->pSql, pCmd->pDataBlocks); @@ -436,7 +440,7 @@ static int insertStmtExecute(STscStmt* stmt) { } // set the next sent data vnode index in data block arraylist - pCmd->vnodeIdx = 1; + pMeterMetaInfo->vnodeIndex = 1; } else { pCmd->pDataBlocks = tscDestroyBlockArrayList(pCmd->pDataBlocks); } diff --git a/src/client/src/tscSQLParser.c b/src/client/src/tscSQLParser.c index d0aa290d3183dd549f3065863037e6d80db0d4ae..752c5d123f78155bd501d0c008560edca8da2a40 100644 --- a/src/client/src/tscSQLParser.c +++ b/src/client/src/tscSQLParser.c @@ -1020,7 +1020,10 @@ int32_t tscToSQLCmd(SSqlObj* pSql, struct SSqlInfo* pInfo) { } setColumnOffsetValueInResultset(pCmd); - updateTagColumnIndex(pCmd, 0); + + for(int32_t i = 0; i < pCmd->numOfTables; ++i) { + updateTagColumnIndex(pCmd, i); + } break; } @@ -1796,12 +1799,11 @@ int32_t addProjectionExprAndResultField(SSqlCmd* pCmd, tSQLExprItem* pItem) { } if (index.columnIndex == TSDB_TBNAME_COLUMN_INDEX) { - SColumnIndex index1 = {0, TSDB_TBNAME_COLUMN_INDEX}; SSchema colSchema = {.type = TSDB_DATA_TYPE_BINARY, .bytes = TSDB_METER_NAME_LEN}; strcpy(colSchema.name, TSQL_TBNAME_L); pCmd->type = TSDB_QUERY_TYPE_STABLE_QUERY; - tscAddSpecialColumnForSelect(pCmd, startPos, TSDB_FUNC_TAGPRJ, &index1, &colSchema, true); + tscAddSpecialColumnForSelect(pCmd, startPos, TSDB_FUNC_TAGPRJ, &index, &colSchema, true); } else { SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, index.tableIndex); SMeterMeta* pMeterMeta = pMeterMetaInfo->pMeterMeta; @@ -2739,15 +2741,20 @@ static bool functionCompatibleCheck(SSqlCmd* pCmd) { void updateTagColumnIndex(SSqlCmd* pCmd, int32_t tableIndex) { SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, tableIndex); - // update tags column index for group by tags - for (int32_t i = 0; i < pCmd->groupbyExpr.numOfGroupCols; ++i) { - int32_t index = pCmd->groupbyExpr.columnInfo[i].colIdx; - - for (int32_t j = 0; j < pMeterMetaInfo->numOfTags; ++j) { - int32_t tagColIndex = pMeterMetaInfo->tagColumnIndex[j]; - if (tagColIndex == index) { - pCmd->groupbyExpr.columnInfo[i].colIdx = j; - break; + /* + * update tags column index for group by tags + * group by columns belong to this table + */ + if (pCmd->groupbyExpr.numOfGroupCols > 0 && pCmd->groupbyExpr.tableIndex == tableIndex) { + for (int32_t i = 0; i < pCmd->groupbyExpr.numOfGroupCols; ++i) { + int32_t index = pCmd->groupbyExpr.columnInfo[i].colIdx; + + for (int32_t j = 0; j < pMeterMetaInfo->numOfTags; ++j) { + int32_t tagColIndex = pMeterMetaInfo->tagColumnIndex[j]; + if (tagColIndex == index) { + pCmd->groupbyExpr.columnInfo[i].colIdx = j; + break; + } } } } @@ -2755,9 +2762,15 @@ void updateTagColumnIndex(SSqlCmd* pCmd, int32_t tableIndex) { // update tags column index for expression for (int32_t i = 0; i < pCmd->exprsInfo.numOfExprs; ++i) { SSqlExpr* pExpr = tscSqlExprGet(pCmd, i); + if (!TSDB_COL_IS_TAG(pExpr->colInfo.flag)) { // not tags, continue continue; } + + // not belongs to this table + if (pExpr->uid != pMeterMetaInfo->pMeterMeta->uid) { + continue; + } for (int32_t j = 0; j < pMeterMetaInfo->numOfTags; ++j) { if (pExpr->colInfo.colIdx == pMeterMetaInfo->tagColumnIndex[j]) { @@ -2766,6 +2779,32 @@ void updateTagColumnIndex(SSqlCmd* pCmd, int32_t tableIndex) { } } } + + // update join condition tag column index + SJoinInfo* pJoinInfo = &pCmd->tagCond.joinInfo; + if (!pJoinInfo->hasJoin) { // not join query + return; + } + + assert(pJoinInfo->left.uid != pJoinInfo->right.uid); + + // the join condition expression node belongs to this table(super table) + if (pMeterMetaInfo->pMeterMeta->uid == pJoinInfo->left.uid) { + for(int32_t i = 0; i < pMeterMetaInfo->numOfTags; ++i) { + if (pJoinInfo->left.tagCol == pMeterMetaInfo->tagColumnIndex[i]) { + pJoinInfo->left.tagCol = i; + } + } + } + + if (pMeterMetaInfo->pMeterMeta->uid == pJoinInfo->right.uid) { + for(int32_t i = 0; i < pMeterMetaInfo->numOfTags; ++i) { + if (pJoinInfo->right.tagCol == pMeterMetaInfo->tagColumnIndex[i]) { + pJoinInfo->right.tagCol = i; + } + } + } + } int32_t parseGroupbyClause(SSqlCmd* pCmd, tVariantList* pList) { @@ -2987,8 +3026,6 @@ typedef struct SCondExpr { static int32_t getTimeRange(int64_t* stime, int64_t* etime, tSQLExpr* pRight, int32_t optr, int16_t timePrecision); -static int32_t doParseWhereClause(SSqlObj* pSql, tSQLExpr** pExpr, SCondExpr* condExpr); - static int32_t tSQLExprNodeToString(tSQLExpr* pExpr, char** str) { if (pExpr->nSQLOptr == TK_ID) { // column name strncpy(*str, pExpr->colInfo.z, pExpr->colInfo.n); @@ -4018,129 +4055,128 @@ static void cleanQueryExpr(SCondExpr* pCondExpr) { } } -int32_t parseWhereClause(SSqlObj* pSql, tSQLExpr** pExpr) { - SSqlCmd* pCmd = &pSql->cmd; - - if (pExpr == NULL) { - return TSDB_CODE_SUCCESS; - } - - pCmd->stime = 0; - pCmd->etime = INT64_MAX; - - int32_t ret = TSDB_CODE_SUCCESS; - - const char* msg1 = "invalid expression"; - SCondExpr condExpr = {0}; - - if ((*pExpr)->pLeft == NULL || (*pExpr)->pRight == NULL) { - return invalidSqlErrMsg(pCmd, msg1); - } - - ret = doParseWhereClause(pSql, pExpr, &condExpr); - if (ret != TSDB_CODE_SUCCESS) { - return ret; - } +static void doAddJoinTagsColumnsIntoTagList(SSqlCmd* pCmd, SCondExpr* pCondExpr) { SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, 0); if (QUERY_IS_JOIN_QUERY(pCmd->type) && UTIL_METER_IS_METRIC(pMeterMetaInfo)) { SColumnIndex index = {0}; - - getColumnIndexByNameEx(&condExpr.pJoinExpr->pLeft->colInfo, pCmd, &index); + + getColumnIndexByNameEx(&pCondExpr->pJoinExpr->pLeft->colInfo, pCmd, &index); pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, index.tableIndex); - + int32_t columnInfo = index.columnIndex - pMeterMetaInfo->pMeterMeta->numOfColumns; addRequiredTagColumn(pCmd, columnInfo, index.tableIndex); - - getColumnIndexByNameEx(&condExpr.pJoinExpr->pRight->colInfo, pCmd, &index); + + getColumnIndexByNameEx(&pCondExpr->pJoinExpr->pRight->colInfo, pCmd, &index); pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, index.tableIndex); - + columnInfo = index.columnIndex - pMeterMetaInfo->pMeterMeta->numOfColumns; addRequiredTagColumn(pCmd, columnInfo, index.tableIndex); } +} - cleanQueryExpr(&condExpr); +static int32_t getTagQueryCondExpr(SSqlCmd* pCmd, SCondExpr* pCondExpr, tSQLExpr** pExpr) { + int32_t ret = TSDB_CODE_SUCCESS; + + if (pCondExpr->pTagCond != NULL) { + for (int32_t i = 0; i < pCmd->numOfTables; ++i) { + tSQLExpr* p1 = extractExprForSTable(pExpr, pCmd, i); + + SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, i); + + char c[TSDB_MAX_TAGS_LEN] = {0}; + char* str = c; + + if ((ret = getTagCondString(pCmd, p1, &str)) != TSDB_CODE_SUCCESS) { + return ret; + } + + tsSetMetricQueryCond(&pCmd->tagCond, pMeterMetaInfo->pMeterMeta->uid, c); + + doCompactQueryExpr(pExpr); + tSQLExprDestroy(p1); + } + + pCondExpr->pTagCond = NULL; + } + return ret; } - -int32_t doParseWhereClause(SSqlObj* pSql, tSQLExpr** pExpr, SCondExpr* condExpr) { +int32_t parseWhereClause(SSqlObj* pSql, tSQLExpr** pExpr) { + if (pExpr == NULL) { + return TSDB_CODE_SUCCESS; + } + const char* msg = "invalid filter expression"; - - int32_t type = 0; + const char* msg1 = "invalid expression"; + + int32_t ret = TSDB_CODE_SUCCESS; + SSqlCmd* pCmd = &pSql->cmd; + pCmd->stime = 0; + pCmd->etime = INT64_MAX; - /* - * tags query condition may be larger than 512bytes, therefore, we need to prepare enough large space - */ + //tags query condition may be larger than 512bytes, therefore, we need to prepare enough large space SStringBuilder sb = {0}; + SCondExpr condExpr = {0}; - int32_t ret = TSDB_CODE_SUCCESS; - if ((ret = getQueryCondExpr(pCmd, pExpr, condExpr, &type, (*pExpr)->nSQLOptr)) != TSDB_CODE_SUCCESS) { - return ret; + if ((*pExpr)->pLeft == NULL || (*pExpr)->pRight == NULL) { + return invalidSqlErrMsg(pCmd, msg1); } + int32_t type = 0; + if ((ret = getQueryCondExpr(pCmd, pExpr, &condExpr, &type, (*pExpr)->nSQLOptr)) != TSDB_CODE_SUCCESS) { + return ret; + } + doCompactQueryExpr(pExpr); - + // after expression compact, the expression tree is only include tag query condition - condExpr->pTagCond = (*pExpr); - + condExpr.pTagCond = (*pExpr); + // 1. check if it is a join query - if ((ret = validateJoinExpr(pCmd, condExpr)) != TSDB_CODE_SUCCESS) { + if ((ret = validateJoinExpr(pCmd, &condExpr)) != TSDB_CODE_SUCCESS) { return ret; } - + // 2. get the query time range - if ((ret = getTimeRangeFromExpr(pCmd, condExpr->pTimewindow)) != TSDB_CODE_SUCCESS) { + if ((ret = getTimeRangeFromExpr(pCmd, condExpr.pTimewindow)) != TSDB_CODE_SUCCESS) { return ret; } - + // 3. get the tag query condition - if (condExpr->pTagCond != NULL) { - for (int32_t i = 0; i < pCmd->numOfTables; ++i) { - tSQLExpr* p1 = extractExprForSTable(pExpr, pCmd, i); - - SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, i); - - char c[TSDB_MAX_TAGS_LEN] = {0}; - char* str = c; - if ((ret = getTagCondString(pCmd, p1, &str)) != TSDB_CODE_SUCCESS) { - return ret; - } - - tsSetMetricQueryCond(&pCmd->tagCond, pMeterMetaInfo->pMeterMeta->uid, c); - - doCompactQueryExpr(pExpr); - tSQLExprDestroy(p1); - } - - condExpr->pTagCond = NULL; + if ((ret = getTagQueryCondExpr(pCmd, &condExpr, pExpr)) != TSDB_CODE_SUCCESS) { + return ret; } - + // 4. get the table name query condition - if ((ret = getTablenameCond(pCmd, condExpr->pTableCond, &sb)) != TSDB_CODE_SUCCESS) { + if ((ret = getTablenameCond(pCmd, condExpr.pTableCond, &sb)) != TSDB_CODE_SUCCESS) { return ret; } - + // 5. other column query condition - if ((ret = getColumnQueryCondInfo(pCmd, condExpr->pColumnCond, TK_AND)) != TSDB_CODE_SUCCESS) { + if ((ret = getColumnQueryCondInfo(pCmd, condExpr.pColumnCond, TK_AND)) != TSDB_CODE_SUCCESS) { return ret; } - + // 6. join condition - if ((ret = getJoinCondInfo(pSql, condExpr->pJoinExpr)) != TSDB_CODE_SUCCESS) { + if ((ret = getJoinCondInfo(pSql, condExpr.pJoinExpr)) != TSDB_CODE_SUCCESS) { return ret; } - + // 7. query condition for table name - pCmd->tagCond.relType = (condExpr->relType == TK_AND) ? TSDB_RELATION_AND : TSDB_RELATION_OR; + pCmd->tagCond.relType = (condExpr.relType == TK_AND) ? TSDB_RELATION_AND : TSDB_RELATION_OR; - ret = setTableCondForMetricQuery(pSql, condExpr->pTableCond, condExpr->tableCondIndex, &sb); + ret = setTableCondForMetricQuery(pSql, condExpr.pTableCond, condExpr.tableCondIndex, &sb); taosStringBuilderDestroy(&sb); if (!validateFilterExpr(pCmd)) { return invalidSqlErrMsg(pCmd, msg); } - + + doAddJoinTagsColumnsIntoTagList(pCmd, &condExpr); + + cleanQueryExpr(&condExpr); return ret; } @@ -4972,6 +5008,8 @@ int32_t parseLimitClause(SSqlObj* pSql, SQuerySQL* pQuerySql) { // handle the limit offset value, validate the limit pCmd->limit = pQuerySql->limit; + pCmd->globalLimit = pCmd->limit.limit; + pCmd->slimit = pQuerySql->slimit; if (pCmd->slimit.offset < 0 || pCmd->limit.offset < 0) { @@ -5684,3 +5722,30 @@ int32_t tscCheckCreateDbParams(SSqlCmd* pCmd, SCreateDbMsg *pCreate) { return TSDB_CODE_SUCCESS; } + +// for debug purpose +void tscPrintSelectClause(SSqlCmd* pCmd) { + if (pCmd == NULL || pCmd->exprsInfo.numOfExprs == 0) { + return; + } + + char* str = calloc(1, 10240); + int32_t offset = 0; + + offset += sprintf(str, "%d [", pCmd->exprsInfo.numOfExprs); + for(int32_t i = 0; i < pCmd->exprsInfo.numOfExprs; ++i) { + SSqlExpr* pExpr = tscSqlExprGet(pCmd, i); + + int32_t size = sprintf(str + offset, "%s(%d)", aAggs[pExpr->functionId].aName, pExpr->colInfo.colId); + offset += size; + + if (i < pCmd->exprsInfo.numOfExprs - 1) { + str[offset++] = ','; + } + } + + str[offset] = ']'; + printf("%s\n", str); + + free(str); +} diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index bf175c55405d940e0d8da9e6552436193c13d0f5..1805eac38dcd1414fbcd4af6f758ce7778ebcc79 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -222,7 +222,7 @@ void tscGetConnToVnode(SSqlObj *pSql, uint8_t *pCode) { SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, 0); if (UTIL_METER_IS_METRIC(pMeterMetaInfo)) { // multiple vnode query - SVnodeSidList *vnodeList = tscGetVnodeSidList(pMeterMetaInfo->pMetricMeta, pCmd->vnodeIdx); + SVnodeSidList *vnodeList = tscGetVnodeSidList(pMeterMetaInfo->pMetricMeta, pMeterMetaInfo->vnodeIndex); if (vnodeList != NULL) { pVPeersDesc = vnodeList->vpeerDesc; } @@ -528,7 +528,7 @@ void *tscProcessMsgFromServer(char *msg, void *ahandle, void *thandle) { if (pMeterMetaInfo->pMeterMeta) // it may be deleted pMeterMetaInfo->pMeterMeta->index = pSql->index; } else { - SVnodeSidList *pVnodeSidList = tscGetVnodeSidList(pMeterMetaInfo->pMetricMeta, pSql->cmd.vnodeIdx); + SVnodeSidList *pVnodeSidList = tscGetVnodeSidList(pMeterMetaInfo->pMetricMeta, pMeterMetaInfo->vnodeIndex); pVnodeSidList->index = pSql->index; } } else { @@ -639,7 +639,7 @@ static SSqlObj *tscCreateSqlObjForSubquery(SSqlObj *pSql, SRetrieveSupport *trsu static int tscLaunchMetricSubQueries(SSqlObj *pSql); // todo merge with callback -int32_t tscLaunchJoinSubquery(SSqlObj *pSql, int16_t tableIndex, int16_t vnodeIdx, SJoinSubquerySupporter *pSupporter) { +int32_t tscLaunchJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSubquerySupporter *pSupporter) { SSqlCmd *pCmd = &pSql->cmd; pSql->res.qhandle = 0x1; @@ -652,12 +652,13 @@ int32_t tscLaunchJoinSubquery(SSqlObj *pSql, int16_t tableIndex, int16_t vnodeId } } - SSqlObj *pNew = createSubqueryObj(pSql, vnodeIdx, tableIndex, tscJoinQueryCallback, pSupporter, NULL); + SSqlObj *pNew = createSubqueryObj(pSql, tableIndex, tscJoinQueryCallback, pSupporter, NULL); if (pNew == NULL) { return TSDB_CODE_CLI_OUT_OF_MEMORY; } - + pSql->pSubs[pSql->numOfSubs++] = pNew; + assert(pSql->numOfSubs <= pSupporter->pState->numOfTotal); if (QUERY_IS_JOIN_QUERY(pCmd->type)) { addGroupInfoForSubquery(pSql, pNew, tableIndex); @@ -694,8 +695,6 @@ int32_t tscLaunchJoinSubquery(SSqlObj *pSql, int16_t tableIndex, int16_t vnodeId pExpr->param->i64Key = tagColIndex; pExpr->numOfParams = 1; - addRequiredTagColumn(pCmd, tagColIndex, 0); - // add the filter tag column for (int32_t i = 0; i < pSupporter->colList.numOfCols; ++i) { SColumnBase *pColBase = &pSupporter->colList.pColList[i]; @@ -707,7 +706,11 @@ int32_t tscLaunchJoinSubquery(SSqlObj *pSql, int16_t tableIndex, int16_t vnodeId } else { pNew->cmd.type |= TSDB_QUERY_TYPE_SUBQUERY; } - + +#ifdef _DEBUG_VIEW + tscPrintSelectClause(&pNew->cmd); +#endif + return tscProcessSql(pNew); } @@ -774,7 +777,7 @@ int tscProcessSql(SSqlObj *pSql) { pSql->index = pMeterMetaInfo->pMeterMeta->index; } else { // it must be the parent SSqlObj for super table query if ((pSql->cmd.type & TSDB_QUERY_TYPE_SUBQUERY) != 0) { - int32_t idx = pSql->cmd.vnodeIdx; + int32_t idx = pMeterMetaInfo->vnodeIndex; SVnodeSidList *pSidList = tscGetVnodeSidList(pMeterMetaInfo->pMetricMeta, idx); pSql->index = pSidList->index; } @@ -802,7 +805,7 @@ int tscProcessSql(SSqlObj *pSql) { return pSql->res.code; } - int32_t code = tscLaunchJoinSubquery(pSql, i, 0, pSupporter); + int32_t code = tscLaunchJoinSubquery(pSql, i, pSupporter); if (code != TSDB_CODE_SUCCESS) { // failed to create subquery object, quit query tscDestroyJoinSupporter(pSupporter); pSql->res.code = TSDB_CODE_CLI_OUT_OF_MEMORY; @@ -944,7 +947,7 @@ int tscLaunchMetricSubQueries(SSqlObj *pSql) { trs->pOrderDescriptor = pDesc; trs->pState = pState; trs->localBuffer = (tFilePage *)calloc(1, nBufferSize + sizeof(tFilePage)); - trs->vnodeIdx = i; + trs->subqueryIndex = i; trs->pParentSqlObj = pSql; trs->pFinalColModel = pModel; @@ -971,7 +974,7 @@ int tscLaunchMetricSubQueries(SSqlObj *pSql) { pNew->cmd.tsBuf = tsBufClone(pSql->cmd.tsBuf); } - tscTrace("%p sub:%p launch subquery.orderOfSub:%d", pSql, pNew, pNew->cmd.vnodeIdx); + tscTrace("%p sub:%p launch subquery.orderOfSub:%d", pSql, pNew, trs->subqueryIndex); tscProcessSql(pNew); } @@ -1020,7 +1023,7 @@ static void tscAbortFurtherRetryRetrieval(SRetrieveSupport *trsupport, TAOS_RES static void tscHandleSubRetrievalError(SRetrieveSupport *trsupport, SSqlObj *pSql, int numOfRows) { SSqlObj *pPObj = trsupport->pParentSqlObj; - int32_t idx = trsupport->vnodeIdx; + int32_t subqueryIndex = trsupport->subqueryIndex; assert(pSql != NULL); @@ -1035,27 +1038,27 @@ static void tscHandleSubRetrievalError(SRetrieveSupport *trsupport, SSqlObj *pSq pSql->res.numOfRows = 0; trsupport->numOfRetry = MAX_NUM_OF_SUBQUERY_RETRY; // disable retry efforts tscTrace("%p query is cancelled, sub:%p, orderOfSub:%d abort retrieve, code:%d", trsupport->pParentSqlObj, pSql, - trsupport->vnodeIdx, trsupport->pState->code); + subqueryIndex, trsupport->pState->code); } if (numOfRows >= 0) { // current query is successful, but other sub query failed, still abort current query. - tscTrace("%p sub:%p retrieve numOfRows:%d,orderOfSub:%d", pPObj, pSql, numOfRows, idx); - tscError("%p sub:%p abort further retrieval due to other queries failure,orderOfSub:%d,code:%d", pPObj, pSql, idx, - trsupport->pState->code); + tscTrace("%p sub:%p retrieve numOfRows:%d,orderOfSub:%d", pPObj, pSql, numOfRows, subqueryIndex); + tscError("%p sub:%p abort further retrieval due to other queries failure,orderOfSub:%d,code:%d", pPObj, pSql, + subqueryIndex, trsupport->pState->code); } else { if (trsupport->numOfRetry++ < MAX_NUM_OF_SUBQUERY_RETRY && trsupport->pState->code == TSDB_CODE_SUCCESS) { /* * current query failed, and the retry count is less than the available * count, retry query clear previous retrieved data, then launch a new sub query */ - tExtMemBufferClear(trsupport->pExtMemBuffer[idx]); + tExtMemBufferClear(trsupport->pExtMemBuffer[subqueryIndex]); // clear local saved number of results trsupport->localBuffer->numOfElems = 0; pthread_mutex_unlock(&trsupport->queryMutex); tscTrace("%p sub:%p retrieve failed, code:%d, orderOfSub:%d, retry:%d", trsupport->pParentSqlObj, pSql, numOfRows, - idx, trsupport->numOfRetry); + subqueryIndex, trsupport->numOfRetry); SSqlObj *pNew = tscCreateSqlObjForSubquery(trsupport->pParentSqlObj, trsupport, pSql); if (pNew == NULL) { @@ -1072,7 +1075,7 @@ static void tscHandleSubRetrievalError(SRetrieveSupport *trsupport, SSqlObj *pSq } else { // reach the maximum retry count, abort atomic_val_compare_exchange_32(&trsupport->pState->code, TSDB_CODE_SUCCESS, numOfRows); tscError("%p sub:%p retrieve failed,code:%d,orderOfSub:%d failed.no more retry,set global code:%d", pPObj, pSql, - numOfRows, idx, trsupport->pState->code); + numOfRows, subqueryIndex, trsupport->pState->code); } } @@ -1115,13 +1118,12 @@ static void tscHandleSubRetrievalError(SRetrieveSupport *trsupport, SSqlObj *pSq void tscRetrieveFromVnodeCallBack(void *param, TAOS_RES *tres, int numOfRows) { SRetrieveSupport *trsupport = (SRetrieveSupport *)param; - int32_t idx = trsupport->vnodeIdx; + int32_t idx = trsupport->subqueryIndex; SSqlObj * pPObj = trsupport->pParentSqlObj; tOrderDescriptor *pDesc = trsupport->pOrderDescriptor; SSqlObj *pSql = (SSqlObj *)tres; - if (pSql == NULL) { - /* sql object has been released in error process, return immediately */ + if (pSql == NULL) { // sql object has been released in error process, return immediately tscTrace("%p subquery has been released, idx:%d, abort", pPObj, idx); return; } @@ -1172,7 +1174,7 @@ void tscRetrieveFromVnodeCallBack(void *param, TAOS_RES *tres, int numOfRows) { } else { // all data has been retrieved to client /* data in from current vnode is stored in cache and disk */ uint32_t numOfRowsFromVnode = - trsupport->pExtMemBuffer[pCmd->vnodeIdx]->numOfAllElems + trsupport->localBuffer->numOfElems; + trsupport->pExtMemBuffer[idx]->numOfAllElems + trsupport->localBuffer->numOfElems; tscTrace("%p sub:%p all data retrieved from ip:%u,vid:%d, numOfRows:%d, orderOfSub:%d", pPObj, pSql, pSvd->ip, pSvd->vnode, numOfRowsFromVnode, idx); @@ -1285,10 +1287,10 @@ void tscKillMetricQuery(SSqlObj *pSql) { static void tscRetrieveDataRes(void *param, TAOS_RES *tres, int retCode); static SSqlObj *tscCreateSqlObjForSubquery(SSqlObj *pSql, SRetrieveSupport *trsupport, SSqlObj *prevSqlObj) { - SSqlObj *pNew = createSubqueryObj(pSql, trsupport->vnodeIdx, 0, tscRetrieveDataRes, trsupport, prevSqlObj); + SSqlObj *pNew = createSubqueryObj(pSql, 0, tscRetrieveDataRes, trsupport, prevSqlObj); if (pNew != NULL) { // the sub query of two-stage super table query pNew->cmd.type |= TSDB_QUERY_TYPE_STABLE_SUBQUERY; - pSql->pSubs[trsupport->vnodeIdx] = pNew; + pSql->pSubs[trsupport->subqueryIndex] = pNew; } return pNew; @@ -1298,8 +1300,8 @@ void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code) { SRetrieveSupport *trsupport = (SRetrieveSupport *)param; SSqlObj * pSql = (SSqlObj *)tres; - int32_t idx = pSql->cmd.vnodeIdx; SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(&pSql->cmd, 0); + int32_t idx = pMeterMetaInfo->vnodeIndex; SVnodeSidList *vnodeInfo = NULL; SVPeerDesc * pSvd = NULL; @@ -1317,7 +1319,7 @@ void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code) { code = trsupport->pState->code; } tscTrace("%p query cancelled or failed, sub:%p, orderOfSub:%d abort, code:%d", trsupport->pParentSqlObj, pSql, - trsupport->vnodeIdx, code); + trsupport->subqueryIndex, code); } /* @@ -1337,7 +1339,7 @@ void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code) { SSqlObj *pNew = tscCreateSqlObjForSubquery(trsupport->pParentSqlObj, trsupport, pSql); if (pNew == NULL) { tscError("%p sub:%p failed to create new subquery due to out of memory, abort retry, vid:%d, orderOfSub:%d", - trsupport->pParentSqlObj, pSql, pSvd->vnode, trsupport->vnodeIdx); + trsupport->pParentSqlObj, pSql, pSvd->vnode, trsupport->subqueryIndex); trsupport->pState->code = -TSDB_CODE_CLI_OUT_OF_MEMORY; trsupport->numOfRetry = MAX_NUM_OF_SUBQUERY_RETRY; @@ -1353,17 +1355,17 @@ void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code) { if (vnodeInfo != NULL) { tscTrace("%p sub:%p query failed,ip:%u,vid:%d,orderOfSub:%d,global code:%d", trsupport->pParentSqlObj, pSql, vnodeInfo->vpeerDesc[vnodeInfo->index].ip, vnodeInfo->vpeerDesc[vnodeInfo->index].vnode, - trsupport->vnodeIdx, trsupport->pState->code); + trsupport->subqueryIndex, trsupport->pState->code); } else { tscTrace("%p sub:%p query failed,orderOfSub:%d,global code:%d", trsupport->pParentSqlObj, pSql, - trsupport->vnodeIdx, trsupport->pState->code); + trsupport->subqueryIndex, trsupport->pState->code); } tscRetrieveFromVnodeCallBack(param, tres, trsupport->pState->code); } else { // success, proceed to retrieve data from dnode tscTrace("%p sub:%p query complete,ip:%u,vid:%d,orderOfSub:%d,retrieve data", trsupport->pParentSqlObj, pSql, vnodeInfo->vpeerDesc[vnodeInfo->index].ip, vnodeInfo->vpeerDesc[vnodeInfo->index].vnode, - trsupport->vnodeIdx); + trsupport->subqueryIndex); taos_fetch_rows_a(tres, tscRetrieveFromVnodeCallBack, param); } @@ -1438,7 +1440,7 @@ void tscUpdateVnodeInQueryMsg(SSqlObj *pSql, char *buf) { pQueryMsg->vnode = htons(pMeterMeta->vpeerDesc[pSql->index].vnode); } else { // query on metric SMetricMeta * pMetricMeta = pMeterMetaInfo->pMetricMeta; - SVnodeSidList *pVnodeSidList = tscGetVnodeSidList(pMetricMeta, pCmd->vnodeIdx); + SVnodeSidList *pVnodeSidList = tscGetVnodeSidList(pMetricMeta, pMeterMetaInfo->vnodeIndex); pQueryMsg->vnode = htons(pVnodeSidList->vpeerDesc[pSql->index].vnode); } } @@ -1461,7 +1463,7 @@ static int32_t tscEstimateQueryMsgSize(SSqlCmd *pCmd) { SMetricMeta *pMetricMeta = pMeterMetaInfo->pMetricMeta; - SVnodeSidList *pVnodeSidList = tscGetVnodeSidList(pMetricMeta, pCmd->vnodeIdx); + SVnodeSidList *pVnodeSidList = tscGetVnodeSidList(pMetricMeta, pMeterMetaInfo->vnodeIndex); int32_t meterInfoSize = (pMetricMeta->tagLen + sizeof(SMeterSidExtInfo)) * pVnodeSidList->numOfSids; int32_t outputColumnSize = pCmd->fieldsInfo.numOfOutputCols * sizeof(SSqlFuncExprMsg); @@ -1506,12 +1508,12 @@ int tscBuildQueryMsg(SSqlObj *pSql) { pQueryMsg->numOfTagsCols = 0; } else { // query on metric SMetricMeta *pMetricMeta = pMeterMetaInfo->pMetricMeta; - if (pCmd->vnodeIdx < 0) { - tscError("%p error vnodeIdx:%d", pSql, pCmd->vnodeIdx); + if (pMeterMetaInfo->vnodeIndex < 0) { + tscError("%p error vnodeIdx:%d", pSql, pMeterMetaInfo->vnodeIndex); return -1; } - SVnodeSidList *pVnodeSidList = tscGetVnodeSidList(pMetricMeta, pCmd->vnodeIdx); + SVnodeSidList *pVnodeSidList = tscGetVnodeSidList(pMetricMeta, pMeterMetaInfo->vnodeIndex); uint32_t vnodeId = pVnodeSidList->vpeerDesc[pVnodeSidList->index].vnode; numOfMeters = pVnodeSidList->numOfSids; @@ -1693,7 +1695,7 @@ int tscBuildQueryMsg(SSqlObj *pSql) { pQueryMsg->colNameLen = htonl(len); // set sids list - tscTrace("%p vid:%d, query on %d meters", pSql, pSql->cmd.vnodeIdx, numOfMeters); + tscTrace("%p vid:%d, query on %d meters", pSql, htons(pQueryMsg->vnode), numOfMeters); if (UTIL_METER_IS_NOMRAL_METER(pMeterMetaInfo)) { #ifdef _DEBUG_VIEW @@ -1703,7 +1705,7 @@ int tscBuildQueryMsg(SSqlObj *pSql) { pSMeterTagInfo->sid = htonl(pMeterMeta->sid); pMsg += sizeof(SMeterSidExtInfo); } else { - SVnodeSidList *pVnodeSidList = tscGetVnodeSidList(pMetricMeta, pCmd->vnodeIdx); + SVnodeSidList *pVnodeSidList = tscGetVnodeSidList(pMetricMeta, pMeterMetaInfo->vnodeIndex); for (int32_t i = 0; i < numOfMeters; ++i) { SMeterSidExtInfo *pMeterTagInfo = (SMeterSidExtInfo *)pMsg; @@ -1774,7 +1776,7 @@ int tscBuildQueryMsg(SSqlObj *pSql) { int32_t numOfBlocks = 0; if (pCmd->tsBuf != NULL) { - STSVnodeBlockInfo *pBlockInfo = tsBufGetVnodeBlockInfo(pCmd->tsBuf, pCmd->vnodeIdx); + STSVnodeBlockInfo *pBlockInfo = tsBufGetVnodeBlockInfo(pCmd->tsBuf, pMeterMetaInfo->vnodeIndex); assert(QUERY_IS_JOIN_QUERY(pCmd->type) && pBlockInfo != NULL); // this query should not be sent // todo refactor diff --git a/src/client/src/tscSql.c b/src/client/src/tscSql.c index 4d7f2734a940500f04af0c2cf1abb4ac8f6e6335..04f9fc0aa62b01d2875909376ec9a626e59246be 100644 --- a/src/client/src/tscSql.c +++ b/src/client/src/tscSql.c @@ -16,21 +16,21 @@ #include "os.h" #include "tcache.h" #include "tlog.h" +#include "tnote.h" #include "trpc.h" #include "tscJoinProcess.h" #include "tscProfile.h" +#include "tscSQLParser.h" #include "tscSecondaryMerge.h" #include "tscUtil.h" #include "tsclient.h" #include "tscompression.h" #include "tsocket.h" -#include "tscSQLParser.h" #include "ttimer.h" #include "tutil.h" -#include "tnote.h" -TAOS *taos_connect_imp(const char *ip, const char *user, const char *pass, const char *db, uint16_t port, void (*fp)(void *, TAOS_RES *, int), - void *param, void **taos) { +TAOS *taos_connect_imp(const char *ip, const char *user, const char *pass, const char *db, uint16_t port, + void (*fp)(void *, TAOS_RES *, int), void *param, void **taos) { STscObj *pObj; taos_init(); @@ -81,7 +81,7 @@ TAOS *taos_connect_imp(const char *ip, const char *user, const char *pass, const globalCode = TSDB_CODE_CLI_OUT_OF_MEMORY; return NULL; } - + memset(pObj, 0, sizeof(STscObj)); pObj->signature = pObj; @@ -113,7 +113,7 @@ TAOS *taos_connect_imp(const char *ip, const char *user, const char *pass, const free(pObj); return NULL; } - + memset(pSql, 0, sizeof(SSqlObj)); pSql->pTscObj = pObj; pSql->signature = pSql; @@ -162,14 +162,14 @@ TAOS *taos_connect(const char *ip, const char *user, const char *pass, const cha void *taos = taos_connect_imp(ip, user, pass, db, port, NULL, NULL, NULL); if (taos != NULL) { - STscObj* pObj = (STscObj*) taos; + STscObj *pObj = (STscObj *)taos; // version compare only requires the first 3 segments of the version string int32_t comparedSegments = 3; - char client_version[64] = {0}; - char server_version[64] = {0}; - int clientVersionNumber[4] = {0}; - int serverVersionNumber[4] = {0}; + char client_version[64] = {0}; + char server_version[64] = {0}; + int clientVersionNumber[4] = {0}; + int serverVersionNumber[4] = {0}; strcpy(client_version, version); strcpy(server_version, taos_get_server_info(taos)); @@ -188,7 +188,7 @@ TAOS *taos_connect(const char *ip, const char *user, const char *pass, const cha return NULL; } - for(int32_t i = 0; i < comparedSegments; ++i) { + for (int32_t i = 0; i < comparedSegments; ++i) { if (clientVersionNumber[i] != serverVersionNumber[i]) { tscError("taos:%p, the %d-th number of server version:%s not matched with client version:%s, close connection", taos, i, server_version, version); @@ -225,7 +225,7 @@ void taos_close(TAOS *taos) { } } -int taos_query_imp(STscObj* pObj, SSqlObj* pSql) { +int taos_query_imp(STscObj *pObj, SSqlObj *pSql) { SSqlRes *pRes = &pSql->res; pRes->numOfRows = 1; @@ -251,7 +251,7 @@ int taos_query_imp(STscObj* pObj, SSqlObj* pSql) { } else { tscError("%p SQL result:%d, %s pObj:%p", pSql, pRes->code, taos_errstr(pObj), pObj); } - + if (pRes->code != TSDB_CODE_SUCCESS) { tscFreeSqlObjPartial(pSql); } @@ -271,9 +271,10 @@ int taos_query(TAOS *taos, const char *sqlstr) { size_t sqlLen = strlen(sqlstr); if (sqlLen > tsMaxSQLStringLen) { - pRes->code = tscInvalidSQLErrMsg(pSql->cmd.payload, "sql too long", NULL); // set the additional error msg for invalid sql + pRes->code = + tscInvalidSQLErrMsg(pSql->cmd.payload, "sql too long", NULL); // set the additional error msg for invalid sql tscError("%p SQL result:%d, %s pObj:%p", pSql, pRes->code, taos_errstr(taos), pObj); - + return pRes->code; } @@ -283,7 +284,7 @@ int taos_query(TAOS *taos, const char *sqlstr) { if (sql == NULL) { pRes->code = TSDB_CODE_CLI_OUT_OF_MEMORY; tscError("%p failed to malloc sql string buffer, reason:%s", pSql, strerror(errno)); - + tscError("%p SQL result:%d, %s pObj:%p", pSql, pRes->code, taos_errstr(taos), pObj); return pRes->code; } @@ -451,25 +452,56 @@ static void **getOneRowFromBuf(SSqlObj *pSql) { return pRes->tsrow; } -static void **tscJoinResultsetFromBuf(SSqlObj *pSql) { +static bool tscHashRemainDataInSubqueryResultSet(SSqlObj *pSql) { + bool hasData = true; SSqlCmd *pCmd = &pSql->cmd; - SSqlRes *pRes = &pSql->res; - while (1) { - bool hasData = true; + if (tscProjectionQueryOnMetric(pCmd)) { + bool allSubqueryExhausted = true; for (int32_t i = 0; i < pSql->numOfSubs; ++i) { SSqlRes *pRes1 = &pSql->pSubs[i]->res; + SSqlCmd *pCmd1 = &pSql->pSubs[i]->cmd; + + SMeterMetaInfo *pMetaInfo = tscGetMeterMetaInfo(pCmd1, 0); + assert(pCmd1->numOfTables == 1); + + /* + * if the global limitation is not reached, and current result has not exhausted, or next more vnodes are + * available, go on + */ + if (pMetaInfo->vnodeIndex < pMetaInfo->pMetricMeta->numOfVnodes && pRes1->row < pRes1->numOfRows && + (!tscHasReachLimitation(pSql->pSubs[i]))) { + allSubqueryExhausted = false; + break; + } + } - // in case inner join, if any subquery exhausted, query completed - if (pRes1->numOfRows == 0) { + hasData = !allSubqueryExhausted; + } else { // otherwise, in case inner join, if any subquery exhausted, query completed. + for (int32_t i = 0; i < pSql->numOfSubs; ++i) { + SSqlRes *pRes1 = &pSql->pSubs[i]->res; + + if ((pRes1->row >= pRes1->numOfRows && tscHasReachLimitation(pSql->pSubs[i]) && + tscProjectionQueryOnTable(&pSql->pSubs[i]->cmd)) || + (pRes1->numOfRows == 0)) { + hasData = false; break; } } + } + + return hasData; +} + +static void **tscJoinResultsetFromBuf(SSqlObj *pSql) { + SSqlCmd *pCmd = &pSql->cmd; + SSqlRes *pRes = &pSql->res; - if (!hasData) { // free all sub sqlobj - tscTrace("%p one subquery exhausted, free other %d subquery", pSql, pSql->numOfSubs - 1); + while (1) { + if (!tscHashRemainDataInSubqueryResultSet(pSql)) { // free all sub sqlobj + tscTrace("%p at least one subquery exhausted, free all other %d subqueries", pSql, pSql->numOfSubs - 1); SSubqueryState *pState = NULL; @@ -487,41 +519,32 @@ static void **tscJoinResultsetFromBuf(SSqlObj *pSql) { } if (pRes->tsrow == NULL) { - pRes->tsrow = malloc(sizeof(void *) * pCmd->exprsInfo.numOfExprs); + pRes->tsrow = malloc(POINTER_BYTES * pCmd->exprsInfo.numOfExprs); } bool success = false; - if (pSql->numOfSubs >= 2) { - // do merge result + if (pSql->numOfSubs >= 2) { // do merge result SSqlRes *pRes1 = &pSql->pSubs[0]->res; SSqlRes *pRes2 = &pSql->pSubs[1]->res; - while (pRes1->row < pRes1->numOfRows && pRes2->row < pRes2->numOfRows) { + if (pRes1->row < pRes1->numOfRows && pRes2->row < pRes2->numOfRows) { doSetResultRowData(pSql->pSubs[0]); doSetResultRowData(pSql->pSubs[1]); - - TSKEY key1 = *(TSKEY *)pRes1->tsrow[0]; - TSKEY key2 = *(TSKEY *)pRes2->tsrow[0]; - - if (key1 == key2) { - success = true; - pRes1->row++; - pRes2->row++; - break; - } else if (key1 < key2) { - pRes1->row++; - } else if (key1 > key2) { - pRes2->row++; - } + // TSKEY key1 = *(TSKEY *)pRes1->tsrow[0]; + // TSKEY key2 = *(TSKEY *)pRes2->tsrow[0]; + // printf("first:%lld, second:%lld\n", key1, key2); + success = true; + pRes1->row++; + pRes2->row++; } - } else { + } else { // only one subquery SSqlRes *pRes1 = &pSql->pSubs[0]->res; doSetResultRowData(pSql->pSubs[0]); success = (pRes1->row++ < pRes1->numOfRows); } - if (success) { + if (success) { // current row of final output has been built, return to app for (int32_t i = 0; i < pCmd->exprsInfo.numOfExprs; ++i) { int32_t tableIndex = pRes->pColumnIndex[i].tableIndex; int32_t columnIndex = pRes->pColumnIndex[i].columnIndex; @@ -531,7 +554,7 @@ static void **tscJoinResultsetFromBuf(SSqlObj *pSql) { } break; - } else { + } else { // continue retrieve data from vnode tscFetchDatablockFromSubquery(pSql); if (pRes->code != TSDB_CODE_SUCCESS) { return NULL; @@ -553,9 +576,12 @@ TAOS_ROW taos_fetch_row_impl(TAOS_RES *res) { if (pCmd->command == TSDB_SQL_METRIC_JOIN_RETRIEVE) { tscFetchDatablockFromSubquery(pSql); + if (pRes->code == TSDB_CODE_SUCCESS) { + tscTrace("%p data from all subqueries have been retrieved to client", pSql); return tscJoinResultsetFromBuf(pSql); } else { + tscTrace("%p retrieve data from subquery failed, code:%d", pSql, pRes->code); return NULL; } @@ -596,7 +622,7 @@ TAOS_ROW taos_fetch_row(TAOS_RES *res) { SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, 0); // reach the maximum number of output rows, abort - if (pCmd->globalLimit > 0 && pRes->numOfTotal >= pCmd->globalLimit) { + if (tscHasReachLimitation(pSql)) { return NULL; } @@ -609,7 +635,15 @@ TAOS_ROW taos_fetch_row(TAOS_RES *res) { assert((pRes->offset >= 0 && pRes->numOfRows == 0) || (pRes->offset == 0 && pRes->numOfRows >= 0)); - if ((++pCmd->vnodeIdx) < pMeterMetaInfo->pMetricMeta->numOfVnodes) { + /* + * For project query with super table join, the numOfSub is equalled to the number of all subqueries, so + * we need to reset the value of numOfSubs to be 0. + * + * For super table join with projection query, if anyone of the subquery is exhausted, the query completed. + */ + pSql->numOfSubs = 0; + + if ((++pMeterMetaInfo->vnodeIndex) < pMeterMetaInfo->pMetricMeta->numOfVnodes) { pCmd->command = TSDB_SQL_SELECT; assert(pSql->fp == NULL); tscProcessSql(pSql); @@ -617,7 +651,7 @@ TAOS_ROW taos_fetch_row(TAOS_RES *res) { } // check!!! - if (rows != NULL || pCmd->vnodeIdx >= pMeterMetaInfo->pMetricMeta->numOfVnodes) { + if (rows != NULL || pMeterMetaInfo->vnodeIndex >= pMeterMetaInfo->pMetricMeta->numOfVnodes) { break; } } @@ -643,7 +677,7 @@ int taos_fetch_block(TAOS_RES *res, TAOS_ROW *rows) { nRows = taos_fetch_block_impl(res, rows); while (*rows == NULL && tscProjectionQueryOnMetric(pCmd)) { /* reach the maximum number of output rows, abort */ - if (pCmd->globalLimit > 0 && pRes->numOfTotal >= pCmd->globalLimit) { + if (tscHasReachLimitation(pSql)) { return 0; } @@ -653,8 +687,7 @@ int taos_fetch_block(TAOS_RES *res, TAOS_ROW *rows) { pCmd->limit.limit = pSql->cmd.globalLimit - pRes->numOfTotal; pCmd->limit.offset = pRes->offset; - - if ((++pSql->cmd.vnodeIdx) < pMeterMetaInfo->pMetricMeta->numOfVnodes) { + if ((++pMeterMetaInfo->vnodeIndex) < pMeterMetaInfo->pMetricMeta->numOfVnodes) { pSql->cmd.command = TSDB_SQL_SELECT; assert(pSql->fp == NULL); tscProcessSql(pSql); @@ -662,7 +695,7 @@ int taos_fetch_block(TAOS_RES *res, TAOS_ROW *rows) { } // check!!! - if (*rows != NULL || pCmd->vnodeIdx >= pMeterMetaInfo->pMetricMeta->numOfVnodes) { + if (*rows != NULL || pMeterMetaInfo->vnodeIndex >= pMeterMetaInfo->pMetricMeta->numOfVnodes) { break; } } @@ -888,12 +921,11 @@ int taos_print_row(char *str, TAOS_ROW row, TAOS_FIELD *fields, int num_fields) size_t xlen = strlen(row[i]); size_t trueLen = MIN(xlen, fields[i].bytes); - memcpy(str + len, (char*) row[i], trueLen); + memcpy(str + len, (char *)row[i], trueLen); str[len + trueLen] = ' '; len += (trueLen + 1); - } - break; + } break; case TSDB_DATA_TYPE_TIMESTAMP: len += sprintf(str + len, "%lld ", *((int64_t *)row[i])); @@ -950,7 +982,7 @@ int taos_validate_sql(TAOS *taos, const char *sql) { return code; } -static int tscParseTblNameList(SSqlObj *pSql, const char* tblNameList, int32_t tblListLen) { +static int tscParseTblNameList(SSqlObj *pSql, const char *tblNameList, int32_t tblListLen) { // must before clean the sqlcmd object tscRemoveAllMeterMetaInfo(&pSql->cmd, false); tscCleanSqlCmd(&pSql->cmd); @@ -961,11 +993,11 @@ static int tscParseTblNameList(SSqlObj *pSql, const char* tblNameList, int32_t t pCmd->count = 0; int code = TSDB_CODE_INVALID_METER_ID; - char *str = (char*) tblNameList; + char *str = (char *)tblNameList; SMeterMetaInfo *pMeterMetaInfo = tscAddEmptyMeterMetaInfo(pCmd); - if ((code = tscAllocPayload(pCmd, tblListLen+16)) != TSDB_CODE_SUCCESS) { + if ((code = tscAllocPayload(pCmd, tblListLen + 16)) != TSDB_CODE_SUCCESS) { return code; } @@ -987,7 +1019,7 @@ static int tscParseTblNameList(SSqlObj *pSql, const char* tblNameList, int32_t t strtrim(tblName); len = (uint32_t)strlen(tblName); - + SSQLToken sToken = {.n = len, .type = TK_ID, .z = tblName}; tSQLGetToken(tblName, &sToken.type); @@ -1031,7 +1063,7 @@ static int tscParseTblNameList(SSqlObj *pSql, const char* tblNameList, int32_t t } int taos_load_table_info(TAOS *taos, const char *tableNameList) { - const int32_t MAX_TABLE_NAME_LENGTH = 12*1024*1024; // 12MB list + const int32_t MAX_TABLE_NAME_LENGTH = 12 * 1024 * 1024; // 12MB list STscObj *pObj = (STscObj *)taos; if (pObj == NULL || pObj->signature != pObj) { @@ -1055,7 +1087,7 @@ int taos_load_table_info(TAOS *taos, const char *tableNameList) { return pRes->code; } - char* str = calloc(1, tblListLen + 1); + char *str = calloc(1, tblListLen + 1); if (str == NULL) { pRes->code = TSDB_CODE_CLI_OUT_OF_MEMORY; tscError("%p failed to malloc sql string buffer", pSql); @@ -1063,7 +1095,7 @@ int taos_load_table_info(TAOS *taos, const char *tableNameList) { } strtolower(str, tableNameList); - pRes->code = (uint8_t) tscParseTblNameList(pSql, str, tblListLen); + pRes->code = (uint8_t)tscParseTblNameList(pSql, str, tblListLen); /* * set the qhandle to 0 before return in order to erase the qhandle value assigned in the previous successful query. diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index 7fd3d7706bc4bf7234593b29f51bff7fc04f417d..d0da79651e6c73cad34b56ed346d67c7c2b97e56 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -244,8 +244,7 @@ bool tscProjectionQueryOnMetric(SSqlCmd* pCmd) { //for project query, only the following two function is allowed for (int32_t i = 0; i < pCmd->fieldsInfo.numOfOutputCols; ++i) { - SSqlExpr* pExpr = tscSqlExprGet(pCmd, i); - int32_t functionId = pExpr->functionId; + int32_t functionId = tscSqlExprGet(pCmd, i)->functionId; if (functionId != TSDB_FUNC_PRJ && functionId != TSDB_FUNC_TAGPRJ && functionId != TSDB_FUNC_TAG && functionId != TSDB_FUNC_TS) { return false; @@ -255,6 +254,17 @@ bool tscProjectionQueryOnMetric(SSqlCmd* pCmd) { return true; } +bool tscProjectionQueryOnTable(SSqlCmd* pCmd) { + for (int32_t i = 0; i < pCmd->fieldsInfo.numOfOutputCols; ++i) { + int32_t functionId = tscSqlExprGet(pCmd, i)->functionId; + if (functionId != TSDB_FUNC_PRJ && functionId != TSDB_FUNC_TS) { + return false; + } + } + + return true; +} + bool tscIsPointInterpQuery(SSqlCmd* pCmd) { for (int32_t i = 0; i < pCmd->exprsInfo.numOfExprs; ++i) { SSqlExpr* pExpr = tscSqlExprGet(pCmd, i); @@ -1474,7 +1484,11 @@ bool tscShouldFreeAsyncSqlObj(SSqlObj* pSql) { * data blocks have been submit to vnode. */ SDataBlockList* pDataBlocks = pCmd->pDataBlocks; - if (pDataBlocks == NULL || pCmd->vnodeIdx >= pDataBlocks->nSize) { + + SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfo(&pSql->cmd, 0); + assert(pSql->cmd.numOfTables == 1); + + if (pDataBlocks == NULL || pMeterMetaInfo->vnodeIndex >= pDataBlocks->nSize) { tscTrace("%p object should be release since all data blocks have been submit", pSql); return true; } else { @@ -1487,10 +1501,11 @@ bool tscShouldFreeAsyncSqlObj(SSqlObj* pSql) { } SMeterMetaInfo* tscGetMeterMetaInfo(SSqlCmd* pCmd, int32_t index) { - if (pCmd == NULL || index >= pCmd->numOfTables || index < 0) { + if (pCmd == NULL || pCmd->numOfTables == 0) { return NULL; } + assert(index >= 0 && index <= pCmd->numOfTables && pCmd->pMeterInfo != NULL); return pCmd->pMeterInfo[index]; } @@ -1533,7 +1548,7 @@ SMeterMetaInfo* tscAddMeterMetaInfo(SSqlCmd* pCmd, const char* name, SMeterMeta* pMeterMetaInfo->numOfTags = numOfTags; if (tags != NULL) { - memcpy(pMeterMetaInfo->tagColumnIndex, tags, sizeof(int16_t) * numOfTags); + memcpy(pMeterMetaInfo->tagColumnIndex, tags, sizeof(pMeterMetaInfo->tagColumnIndex[0]) * numOfTags); } pCmd->numOfTables += 1; @@ -1587,13 +1602,13 @@ void tscResetForNextRetrieve(SSqlRes* pRes) { pRes->numOfRows = 0; } -SSqlObj* createSubqueryObj(SSqlObj* pSql, int32_t vnodeIndex, int16_t tableIndex, void (*fp)(), void* param, - SSqlObj* pPrevSql) { +SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void* param, SSqlObj* pPrevSql) { SSqlCmd* pCmd = &pSql->cmd; + SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, tableIndex); SSqlObj* pNew = (SSqlObj*)calloc(1, sizeof(SSqlObj)); if (pNew == NULL) { - tscError("%p new subquery failed, vnodeIdx:%d, tableIndex:%d", pSql, vnodeIndex, tableIndex); + tscError("%p new subquery failed, tableIndex:%d, vnodeIndex:%d", pSql, tableIndex, pMeterMetaInfo->vnodeIndex); return NULL; } @@ -1602,7 +1617,7 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int32_t vnodeIndex, int16_t tableIndex pNew->sqlstr = strdup(pSql->sqlstr); if (pNew->sqlstr == NULL) { - tscError("%p new subquery failed, vnodeIdx:%d, tableIndex:%d", pSql, vnodeIndex, tableIndex); + tscError("%p new subquery failed, tableIndex:%d, vnodeIndex:%d", pSql, tableIndex, pMeterMetaInfo->vnodeIndex); free(pNew); return NULL; @@ -1627,15 +1642,13 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int32_t vnodeIndex, int16_t tableIndex tscTagCondCopy(&pNew->cmd.tagCond, &pCmd->tagCond); if (tscAllocPayload(&pNew->cmd, TSDB_DEFAULT_PAYLOAD_SIZE) != TSDB_CODE_SUCCESS) { - tscError("%p new subquery failed, vnodeIdx:%d, tableIndex:%d", pSql, vnodeIndex, tableIndex); + tscError("%p new subquery failed, tableIndex:%d, vnodeIndex:%d", pSql, tableIndex, pMeterMetaInfo->vnodeIndex); tscFreeSqlObj(pNew); return NULL; } tscColumnBaseInfoCopy(&pNew->cmd.colList, &pCmd->colList, (int16_t)tableIndex); - - SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, tableIndex); - + // set the correct query type if (pPrevSql != NULL) { pNew->cmd.type = pPrevSql->cmd.type; @@ -1666,12 +1679,15 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int32_t vnodeIndex, int16_t tableIndex pNew->fp = fp; pNew->param = param; - pNew->cmd.vnodeIdx = vnodeIndex; SMeterMetaInfo* pMetermetaInfo = tscGetMeterMetaInfo(pCmd, tableIndex); char key[TSDB_MAX_TAGS_LEN + 1] = {0}; tscGetMetricMetaCacheKey(pCmd, key, pMetermetaInfo->pMeterMeta->uid); - + +#ifdef _DEBUG_VIEW + printf("the metricmeta key is:%s\n", key); +#endif + char* name = pMeterMetaInfo->name; SMeterMetaInfo* pFinalInfo = NULL; @@ -1695,8 +1711,8 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int32_t vnodeIndex, int16_t tableIndex assert(pFinalInfo->pMetricMeta != NULL); } - tscTrace("%p new subquery %p, vnodeIdx:%d, tableIndex:%d, type:%d", pSql, pNew, vnodeIndex, tableIndex, - pNew->cmd.type); + tscTrace("%p new subquery %p, tableIndex:%d, vnodeIdx:%d, type:%d", pSql, pNew, tableIndex, + pMeterMetaInfo->vnodeIndex, pNew->cmd.type); return pNew; } @@ -1765,3 +1781,12 @@ int32_t tscInvalidSQLErrMsg(char *msg, const char *additionalInfo, const char *s return TSDB_CODE_INVALID_SQL; } +bool tscHasReachLimitation(SSqlObj* pSql) { + assert(pSql != NULL && pSql->cmd.globalLimit != 0); + + SSqlCmd* pCmd = &pSql->cmd; + SSqlRes* pRes = &pSql->res; + + return (pCmd->globalLimit > 0 && pRes->numOfTotal >= pCmd->globalLimit); +} + diff --git a/src/system/detail/src/mgmtShell.c b/src/system/detail/src/mgmtShell.c index 6084c5489d8ff446468af1dc5c72db881d542915..fe1932b24a123c00bc8e3c4e66f3d0f567df7c36 100644 --- a/src/system/detail/src/mgmtShell.c +++ b/src/system/detail/src/mgmtShell.c @@ -978,12 +978,19 @@ int mgmtProcessCreateTableMsg(char *pMsg, int msgLen, SConnObj *pConn) { if (code == 1) { //mTrace("table:%s, wait vgroup create finish", pCreate->meterId, code); - } - else if (code != 0) { - mError("table:%s, failed to create table, code:%d", pCreate->meterId, code); + } else if (code != TSDB_CODE_SUCCESS) { + if (code == TSDB_CODE_TABLE_ALREADY_EXIST) { // table already created when the second attempt to create table + + STabObj* pMeter = mgmtGetMeter(pCreate->meterId); + assert(pMeter != NULL); + + mWarn("table:%s, table already created, failed to create table, ts:%lld, code:%d", pCreate->meterId, + pMeter->createdTime, code); + } else { // other errors + mError("table:%s, failed to create table, code:%d", pCreate->meterId, code); + } } else { mTrace("table:%s, table is created by %s", pCreate->meterId, pConn->pUser->user); - //mLPrint("meter:%s is created by %s", pCreate->meterId, pConn->pUser->user); } taosSendSimpleRsp(pConn->thandle, TSDB_MSG_TYPE_CREATE_TABLE_RSP, code);