未验证 提交 9d8b9691 编写于 作者: S slguan 提交者: GitHub

Merge pull request #879 from taosdata/feature/liaohj

Feature/liaohj
......@@ -94,7 +94,7 @@ typedef struct SRetrieveSupport {
tOrderDescriptor *pOrderDescriptor;
tColModel * pFinalColModel; // colModel for final result
SSubqueryState * pState;
int32_t vnodeIdx; // index of current vnode in vnode list
int32_t subqueryIndex; // index of current vnode in vnode list
SSqlObj * pParentSqlObj;
tFilePage * localBuffer; // temp buffer, there is a buffer for each vnode to
uint32_t numOfRetry; // record the number of retry times
......
......@@ -23,14 +23,14 @@ extern "C" {
/*
* @date 2018/09/30
*/
#include <limits.h>
#include <stdio.h>
#include "os.h"
#include "textbuffer.h"
#include "tscSecondaryMerge.h"
#include "tsclient.h"
#include "tsdb.h"
#include "tscSecondaryMerge.h"
#define UTIL_METER_IS_METRIC(metaInfo) (((metaInfo)->pMeterMeta != NULL) && ((metaInfo)->pMeterMeta->meterType == TSDB_METER_METRIC))
#define UTIL_METER_IS_METRIC(metaInfo) \
(((metaInfo)->pMeterMeta != NULL) && ((metaInfo)->pMeterMeta->meterType == TSDB_METER_METRIC))
#define UTIL_METER_IS_NOMRAL_METER(metaInfo) (!(UTIL_METER_IS_METRIC(metaInfo)))
#define UTIL_METER_IS_CREATE_FROM_METRIC(metaInfo) \
(((metaInfo)->pMeterMeta != NULL) && ((metaInfo)->pMeterMeta->meterType == TSDB_METER_MTABLE))
......@@ -52,7 +52,6 @@ typedef struct SParsedDataColInfo {
typedef struct SJoinSubquerySupporter {
SSubqueryState* pState;
SSqlObj* pObj; // parent SqlObj
bool hasMore; // has data from vnode to fetch
int32_t subqueryIndex; // index of sub query
int64_t interval; // interval time
SLimitVal limit; // limit info
......@@ -62,28 +61,27 @@ typedef struct SJoinSubquerySupporter {
SFieldInfo fieldsInfo;
STagCond tagCond;
SSqlGroupbyExpr groupbyExpr;
struct STSBuf* pTSBuf;
FILE* f;
char path[PATH_MAX];
struct STSBuf* pTSBuf; // the TSBuf struct that holds the compressed timestamp array
FILE* f; // temporary file in order to create TSBuf
char path[PATH_MAX]; // temporary file path
} SJoinSubquerySupporter;
void tscDestroyDataBlock(STableDataBlocks* pDataBlock);
void tscDestroyDataBlock(STableDataBlocks* pDataBlock);
STableDataBlocks* tscCreateDataBlock(int32_t size);
void tscAppendDataBlock(SDataBlockList* pList, STableDataBlocks* pBlocks);
SParamInfo* tscAddParamToDataBlock(STableDataBlocks* pDataBlock, char type, uint8_t timePrec, short bytes, uint32_t offset);
SDataBlockList* tscCreateBlockArrayList();
void* tscDestroyBlockArrayList(SDataBlockList* pList);
int32_t tscCopyDataBlockToPayload(SSqlObj* pSql, STableDataBlocks* pDataBlock);
void tscFreeUnusedDataBlocks(SDataBlockList* pList);
int32_t tscMergeTableDataBlocks(SSqlObj* pSql, SDataBlockList* pDataList);
void tscAppendDataBlock(SDataBlockList* pList, STableDataBlocks* pBlocks);
SParamInfo* tscAddParamToDataBlock(STableDataBlocks* pDataBlock, char type, uint8_t timePrec, short bytes,
uint32_t offset);
SDataBlockList* tscCreateBlockArrayList();
void* tscDestroyBlockArrayList(SDataBlockList* pList);
int32_t tscCopyDataBlockToPayload(SSqlObj* pSql, STableDataBlocks* pDataBlock);
void tscFreeUnusedDataBlocks(SDataBlockList* pList);
int32_t tscMergeTableDataBlocks(SSqlObj* pSql, SDataBlockList* pDataList);
STableDataBlocks* tscGetDataBlockFromList(void* pHashList, SDataBlockList* pDataBlockList, int64_t id, int32_t size,
int32_t startOffset, int32_t rowSize, char* tableId);
STableDataBlocks* tscCreateDataBlockEx(size_t size, int32_t rowSize, int32_t startOffset, char* name);
SVnodeSidList* tscGetVnodeSidList(SMetricMeta* pMetricmeta, int32_t vnodeIdx);
SVnodeSidList* tscGetVnodeSidList(SMetricMeta* pMetricmeta, int32_t vnodeIdx);
SMeterSidExtInfo* tscGetMeterSidInfo(SVnodeSidList* pSidList, int32_t idx);
/**
......@@ -97,6 +95,8 @@ SMeterSidExtInfo* tscGetMeterSidInfo(SVnodeSidList* pSidList, int32_t idx);
bool tscIsPointInterpQuery(SSqlCmd* pCmd);
bool tscIsTWAQuery(SSqlCmd* pCmd);
bool tscProjectionQueryOnMetric(SSqlCmd* pCmd);
bool tscProjectionQueryOnTable(SSqlCmd* pCmd);
bool tscIsTwoStageMergeMetricQuery(SSqlCmd* pCmd);
bool tscQueryOnMetric(SSqlCmd* pCmd);
bool tscQueryMetricTags(SSqlCmd* pCmd);
......@@ -108,7 +108,7 @@ void tscAddSpecialColumnForSelect(SSqlCmd* pCmd, int32_t outputColIndex, int16_t
void addRequiredTagColumn(SSqlCmd* pCmd, int32_t tagColIndex, int32_t tableIndex);
int32_t setMeterID(SSqlObj* pSql, SSQLToken* pzTableName, int32_t tableIndex);
void tscClearInterpInfo(SSqlCmd* pCmd);
void tscClearInterpInfo(SSqlCmd* pCmd);
bool tscIsInsertOrImportData(char* sqlstr);
......@@ -128,9 +128,9 @@ void tscFieldInfoCopy(SFieldInfo* src, SFieldInfo* dst, const int32_t* indexList
void tscFieldInfoCopyAll(SFieldInfo* src, SFieldInfo* dst);
TAOS_FIELD* tscFieldInfoGetField(SSqlCmd* pCmd, int32_t index);
int16_t tscFieldInfoGetOffset(SSqlCmd* pCmd, int32_t index);
int32_t tscGetResRowLength(SSqlCmd* pCmd);
void tscClearFieldInfo(SFieldInfo* pFieldInfo);
int16_t tscFieldInfoGetOffset(SSqlCmd* pCmd, int32_t index);
int32_t tscGetResRowLength(SSqlCmd* pCmd);
void tscClearFieldInfo(SFieldInfo* pFieldInfo);
void addExprParams(SSqlExpr* pExpr, char* argument, int32_t type, int32_t bytes, int16_t tableIndex);
......@@ -142,15 +142,15 @@ SSqlExpr* tscSqlExprUpdate(SSqlCmd* pCmd, int32_t index, int16_t functionId, int
int16_t size);
SSqlExpr* tscSqlExprGet(SSqlCmd* pCmd, int32_t index);
void tscSqlExprCopy(SSqlExprInfo* dst, const SSqlExprInfo* src, uint64_t uid);
void tscSqlExprCopy(SSqlExprInfo* dst, const SSqlExprInfo* src, uint64_t uid);
SColumnBase* tscColumnBaseInfoInsert(SSqlCmd* pCmd, SColumnIndex* colIndex);
void tscColumnFilterInfoCopy(SColumnFilterInfo* dst, const SColumnFilterInfo* src);
void tscColumnBaseCopy(SColumnBase* dst, const SColumnBase* src);
void tscColumnFilterInfoCopy(SColumnFilterInfo* dst, const SColumnFilterInfo* src);
void tscColumnBaseCopy(SColumnBase* dst, const SColumnBase* src);
void tscColumnBaseInfoCopy(SColumnBaseInfo* dst, const SColumnBaseInfo* src, int16_t tableIndex);
void tscColumnBaseInfoCopy(SColumnBaseInfo* dst, const SColumnBaseInfo* src, int16_t tableIndex);
SColumnBase* tscColumnBaseInfoGet(SColumnBaseInfo* pColumnBaseInfo, int32_t index);
void tscColumnBaseInfoUpdateTableIndex(SColumnBaseInfo* pColList, int16_t tableIndex);
void tscColumnBaseInfoUpdateTableIndex(SColumnBaseInfo* pColList, int16_t tableIndex);
void tscColumnBaseInfoReserve(SColumnBaseInfo* pColumnBaseInfo, int32_t size);
void tscColumnBaseInfoDestroy(SColumnBaseInfo* pColumnBaseInfo);
......@@ -163,11 +163,10 @@ bool tscValidateColumnId(SSqlCmd* pCmd, int32_t colId);
// get starter position of metric query condition (query on tags) in SSqlCmd.payload
SCond* tsGetMetricQueryCondPos(STagCond* pCond, uint64_t tableIndex);
void tsSetMetricQueryCond(STagCond* pTagCond, uint64_t uid, const char* str);
void tsSetMetricQueryCond(STagCond* pTagCond, uint64_t uid, const char* str);
void tscTagCondCopy(STagCond* dest, const STagCond* src);
void tscTagCondRelease(STagCond* pCond);
void tscTagCondSetQueryCondType(STagCond* pCond, int16_t type);
void tscGetSrcColumnInfo(SSrcColumnInfo* pColInfo, SSqlCmd* pCmd);
......@@ -176,19 +175,19 @@ bool tscShouldFreeHeatBeat(SSqlObj* pHb);
void tscCleanSqlCmd(SSqlCmd* pCmd);
bool tscShouldFreeAsyncSqlObj(SSqlObj* pSql);
void tscRemoveAllMeterMetaInfo(SSqlCmd* pCmd, bool removeFromCache);
void tscRemoveAllMeterMetaInfo(SSqlCmd* pCmd, bool removeFromCache);
SMeterMetaInfo* tscGetMeterMetaInfo(SSqlCmd* pCmd, int32_t index);
SMeterMetaInfo* tscGetMeterMetaInfoByUid(SSqlCmd* pCmd, uint64_t uid, int32_t* index);
void tscClearMeterMetaInfo(SMeterMetaInfo* pMeterMetaInfo, bool removeFromCache);
void tscClearMeterMetaInfo(SMeterMetaInfo* pMeterMetaInfo, bool removeFromCache);
SMeterMetaInfo* tscAddMeterMetaInfo(SSqlCmd* pCmd, const char* name, SMeterMeta* pMeterMeta, SMetricMeta* pMetricMeta,
int16_t numOfTags, int16_t* tags);
SMeterMetaInfo* tscAddEmptyMeterMetaInfo(SSqlCmd* pCmd);
void tscGetMetricMetaCacheKey(SSqlCmd* pCmd, char* keyStr, uint64_t uid);
int tscGetMetricMeta(SSqlObj* pSql);
int tscGetMeterMeta(SSqlObj* pSql, char* meterId, int32_t tableIndex);
int tscGetMeterMetaEx(SSqlObj* pSql, char* meterId, bool createIfNotExists);
int tscGetMetricMeta(SSqlObj* pSql);
int tscGetMeterMeta(SSqlObj* pSql, char* meterId, int32_t tableIndex);
int tscGetMeterMetaEx(SSqlObj* pSql, char* meterId, bool createIfNotExists);
void tscResetForNextRetrieve(SSqlRes* pRes);
......@@ -212,9 +211,8 @@ void tscDoQuery(SSqlObj* pSql);
* @param pPrevSql
* @return
*/
SSqlObj* createSubqueryObj(SSqlObj* pSql, int32_t vnodeIndex, int16_t tableIndex, void (*fp)(), void* param,
SSqlObj* pPrevSql);
void addGroupInfoForSubquery(SSqlObj* pParentObj, SSqlObj* pSql, int32_t tableIndex);
SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void* param, SSqlObj* pPrevSql);
void addGroupInfoForSubquery(SSqlObj* pParentObj, SSqlObj* pSql, int32_t tableIndex);
void doAddGroupColumnForSubquery(SSqlCmd* pCmd, int32_t tagIndex);
......@@ -224,6 +222,9 @@ TAOS* taos_connect_a(char* ip, char* user, char* pass, char* db, uint16_t port,
void* param, void** taos);
void sortRemoveDuplicates(STableDataBlocks* dataBuf);
void tscPrintSelectClause(SSqlCmd* pCmd);
#ifdef __cplusplus
}
#endif
......
......@@ -107,22 +107,25 @@ enum _sql_cmd {
struct SSqlInfo;
typedef struct SSqlGroupbyExpr {
int16_t tableIndex;
int16_t tableIndex;
int16_t numOfGroupCols;
SColIndexEx columnInfo[TSDB_MAX_TAGS]; // group by columns information
int16_t orderIndex; // order by column index
int16_t orderType; // order by type: asc/desc
int16_t orderIndex; // order by column index
int16_t orderType; // order by type: asc/desc
} SSqlGroupbyExpr;
typedef struct SMeterMetaInfo {
SMeterMeta * pMeterMeta; // metermeta
SMetricMeta *pMetricMeta; // metricmeta
char name[TSDB_METER_ID_LEN + 1];
int16_t numOfTags; // total required tags in query, including groupby tags
int16_t tagColumnIndex[TSDB_MAX_TAGS]; // clause + tag projection
SMeterMeta * pMeterMeta; // metermeta
SMetricMeta *pMetricMeta; // metricmeta
/*
* 1. keep the vnode index during the multi-vnode super table projection query
* 2. keep the vnode index for multi-vnode insertion
*/
int32_t vnodeIndex;
char name[TSDB_METER_ID_LEN + 1]; // table(super table) name
int16_t numOfTags; // total required tags in query, including groupby tags
int16_t tagColumnIndex[TSDB_MAX_TAGS]; // clause + tag projection
} SMeterMetaInfo;
/* the structure for sql function in select clause */
......@@ -188,7 +191,7 @@ typedef struct SString {
typedef struct SCond {
uint64_t uid;
char* cond;
char * cond;
} SCond;
typedef struct SJoinNode {
......@@ -262,15 +265,15 @@ typedef struct SDataBlockList {
typedef struct {
SOrderVal order;
int command;
int count;// TODO refactor
int count; // TODO refactor
union {
bool existsCheck; // check if the table exists
int8_t showType; // show command type
bool existsCheck; // check if the table exists
int8_t showType; // show command type
};
int8_t isInsertFromFile; // load data from file or not
bool import; // import/insert type
bool import; // import/insert type
char msgType;
uint16_t type; // query type
char intervalTimeUnit;
......@@ -296,7 +299,6 @@ typedef struct {
SLimitVal slimit;
int64_t globalLimit;
STagCond tagCond;
int16_t vnodeIdx; // vnode index in pMetricMeta for metric query
int16_t interpoType; // interpolate type
int16_t numOfTables;
......@@ -366,25 +368,23 @@ typedef struct _sql_obj {
STscObj *pTscObj;
void (*fp)();
void (*fetchFp)();
void * param;
uint32_t ip;
short vnode;
int64_t stime;
uint32_t queryId;
void * thandle;
void * pStream;
char * sqlstr;
char retry;
char maxRetry;
char index;
char freed : 4;
char listed : 4;
tsem_t rspSem;
tsem_t emptyRspSem;
SSqlCmd cmd;
SSqlRes res;
void * param;
uint32_t ip;
short vnode;
int64_t stime;
uint32_t queryId;
void * thandle;
void * pStream;
char * sqlstr;
char retry;
char maxRetry;
char index;
char freed : 4;
char listed : 4;
tsem_t rspSem;
tsem_t emptyRspSem;
SSqlCmd cmd;
SSqlRes res;
char numOfSubs;
struct _sql_obj **pSubs;
struct _sql_obj * prev, *next;
......@@ -477,6 +477,8 @@ void tscProcessMultiVnodesInsertForFile(SSqlObj *pSql);
void tscKillMetricQuery(SSqlObj *pSql);
void tscInitResObjForLocalQuery(SSqlObj *pObj, int32_t numOfRes, int32_t rowLen);
bool tscIsUpdateQuery(STscObj *pObj);
bool tscHasReachLimitation(SSqlObj* pSql);
int32_t tscInvalidSQLErrMsg(char *msg, const char *additionalInfo, const char *sql);
// transfer SSqlInfo to SqlCmd struct
......
......@@ -112,7 +112,7 @@ static tSQLSyntaxNode *tSQLSyntaxNodeCreate(SSchema *pSchema, int32_t numOfCols,
tSQLSyntaxNode *pNode = NULL;
if (pToken->type == TK_ID || pToken->type == TK_TBNAME) {
int32_t i = 0;
int32_t i = 0;
if (pToken->type == TK_ID) {
do {
size_t len = strlen(pSchema[i].name);
......@@ -652,8 +652,7 @@ void tSQLListTraverseOnResult(struct tSQLBinaryExpr *pExpr, bool (*fp)(tSkipList
// brutal force search
int64_t num = pResult->num;
for (int32_t i = 0, j = 0; i < pResult->num; ++i) {
//if (fp == NULL || (fp != NULL && fp(pResult->pRes[i], pExpr->info) == true)) {
if (fp == NULL || (fp(pResult->pRes[i], pExpr->info) == true)) {
if (fp == NULL || (fp(pResult->pRes[i], pExpr->info) == true)) {
pResult->pRes[j++] = pResult->pRes[i];
} else {
num--;
......
......@@ -121,7 +121,8 @@ static void tscProcessAsyncFetchRowsProxy(void *param, TAOS_RES *tres, int numOf
// sequentially retrieve data from remain vnodes first, query vnode specified by vnodeIdx
if (numOfRows == 0 && tscProjectionQueryOnMetric(pCmd)) {
// vnode is denoted by vnodeIdx, continue to query vnode specified by vnodeIdx
assert(pCmd->vnodeIdx >= 0);
SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, 0);
assert(pMeterMetaInfo->vnodeIndex >= 0);
/* reach the maximum number of output rows, abort */
if (pCmd->globalLimit > 0 && pRes->numOfTotal >= pCmd->globalLimit) {
......@@ -133,8 +134,8 @@ static void tscProcessAsyncFetchRowsProxy(void *param, TAOS_RES *tres, int numOf
pCmd->limit.limit = pCmd->globalLimit - pRes->numOfTotal;
pCmd->limit.offset = pRes->offset;
if ((++(pCmd->vnodeIdx)) < tscGetMeterMetaInfo(pCmd, 0)->pMetricMeta->numOfVnodes) {
tscTrace("%p retrieve data from next vnode:%d", pSql, pCmd->vnodeIdx);
if ((++(pMeterMetaInfo->vnodeIndex)) < pMeterMetaInfo->pMetricMeta->numOfVnodes) {
tscTrace("%p retrieve data from next vnode:%d", pSql, pMeterMetaInfo->vnodeIndex);
pSql->cmd.command = TSDB_SQL_SELECT; // reset flag to launch query first.
......@@ -271,7 +272,8 @@ void tscProcessAsyncRetrieve(void *param, TAOS_RES *tres, int numOfRows) {
/*
* vnode is denoted by vnodeIdx, continue to query vnode specified by vnodeIdx till all vnode have been retrieved
*/
assert(pCmd->vnodeIdx >= 1);
SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, 0);
assert(pMeterMetaInfo->vnodeIndex >= 0);
/* reach the maximum number of output rows, abort */
if (pCmd->globalLimit > 0 && pRes->numOfTotal >= pCmd->globalLimit) {
......@@ -282,7 +284,7 @@ void tscProcessAsyncRetrieve(void *param, TAOS_RES *tres, int numOfRows) {
/* update the limit value according to current retrieval results */
pCmd->limit.limit = pCmd->globalLimit - pRes->numOfTotal;
if ((++pCmd->vnodeIdx) <= tscGetMeterMetaInfo(pCmd, 0)->pMetricMeta->numOfVnodes) {
if ((++pMeterMetaInfo->vnodeIndex) <= pMeterMetaInfo->pMetricMeta->numOfVnodes) {
pSql->cmd.command = TSDB_SQL_SELECT; // reset flag to launch query first.
tscResetForNextRetrieve(pRes);
......@@ -403,9 +405,12 @@ void tscAsyncInsertMultiVnodesProxy(void *param, TAOS_RES *tres, int numOfRows)
int32_t code = TSDB_CODE_SUCCESS;
assert(!pCmd->isInsertFromFile && pSql->signature == pSql);
SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, 0);
assert(pCmd->numOfTables == 1);
SDataBlockList *pDataBlocks = pCmd->pDataBlocks;
if (pDataBlocks == NULL || pCmd->vnodeIdx >= pDataBlocks->nSize) {
if (pDataBlocks == NULL || pMeterMetaInfo->vnodeIndex >= pDataBlocks->nSize) {
// restore user defined fp
pSql->fp = pSql->fetchFp;
tscTrace("%p Async insertion completed, destroy data block list", pSql);
......@@ -417,17 +422,17 @@ void tscAsyncInsertMultiVnodesProxy(void *param, TAOS_RES *tres, int numOfRows)
(*pSql->fp)(pSql->param, tres, numOfRows);
} else {
do {
code = tscCopyDataBlockToPayload(pSql, pDataBlocks->pData[pCmd->vnodeIdx++]);
code = tscCopyDataBlockToPayload(pSql, pDataBlocks->pData[pMeterMetaInfo->vnodeIndex++]);
if (code != TSDB_CODE_SUCCESS) {
tscTrace("%p prepare submit data block failed in async insertion, vnodeIdx:%d, total:%d, code:%d",
pSql, pCmd->vnodeIdx - 1, pDataBlocks->nSize, code);
pSql, pMeterMetaInfo->vnodeIndex - 1, pDataBlocks->nSize, code);
}
} while (code != TSDB_CODE_SUCCESS && pCmd->vnodeIdx < pDataBlocks->nSize);
} while (code != TSDB_CODE_SUCCESS && pMeterMetaInfo->vnodeIndex < pDataBlocks->nSize);
// build submit msg may fail
if (code == TSDB_CODE_SUCCESS) {
tscTrace("%p async insertion, vnodeIdx:%d, total:%d", pSql, pCmd->vnodeIdx - 1, pDataBlocks->nSize);
tscTrace("%p async insertion, vnodeIdx:%d, total:%d", pSql, pMeterMetaInfo->vnodeIndex - 1, pDataBlocks->nSize);
tscProcessSql(pSql);
}
}
......@@ -483,11 +488,11 @@ void tscMeterMetaCallBack(void *param, TAOS_RES *res, int code) {
// check if it is a sub-query of metric query first, if true, enter another routine
if ((pSql->cmd.type & TSDB_QUERY_TYPE_STABLE_SUBQUERY) == TSDB_QUERY_TYPE_STABLE_SUBQUERY) {
SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, 0);
assert(pMeterMetaInfo->pMeterMeta->numOfTags != 0 && pCmd->vnodeIdx >= 0 && pSql->param != NULL);
assert(pMeterMetaInfo->pMeterMeta->numOfTags != 0 && pMeterMetaInfo->vnodeIndex >= 0 && pSql->param != NULL);
SRetrieveSupport *trs = (SRetrieveSupport *)pSql->param;
SSqlObj * pParObj = trs->pParentSqlObj;
assert(pParObj->signature == pParObj && trs->vnodeIdx == pCmd->vnodeIdx &&
assert(pParObj->signature == pParObj && trs->subqueryIndex == pMeterMetaInfo->vnodeIndex &&
pMeterMetaInfo->pMeterMeta->numOfTags != 0);
tscTrace("%p get metricMeta during metric query successfully", pSql);
......
......@@ -13,9 +13,9 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "tscJoinProcess.h"
#include "os.h"
#include "tcache.h"
#include "tscJoinProcess.h"
#include "tscUtil.h"
#include "tsclient.h"
#include "tscompression.h"
......@@ -45,8 +45,8 @@ static bool doCompare(int32_t order, int64_t left, int64_t right) {
}
}
static int64_t doTSBlockIntersect(SSqlObj* pSql, SJoinSubquerySupporter* pSupporter1, SJoinSubquerySupporter* pSupporter2,
TSKEY* st, TSKEY* et) {
static int64_t doTSBlockIntersect(SSqlObj* pSql, SJoinSubquerySupporter* pSupporter1,
SJoinSubquerySupporter* pSupporter2, TSKEY* st, TSKEY* et) {
STSBuf* output1 = tsBufCreate(true);
STSBuf* output2 = tsBufCreate(true);
......@@ -150,22 +150,21 @@ static int64_t doTSBlockIntersect(SSqlObj* pSql, SJoinSubquerySupporter* pSuppor
tsBufDestory(pSupporter1->pTSBuf);
tsBufDestory(pSupporter2->pTSBuf);
tscTrace("%p input1:%lld, input2:%lld, %lld for secondary query after ts blocks intersecting",
pSql, numOfInput1, numOfInput2, output1->numOfTotal);
tscTrace("%p input1:%lld, input2:%lld, final:%lld for secondary query after ts blocks intersecting", pSql,
numOfInput1, numOfInput2, output1->numOfTotal);
return output1->numOfTotal;
}
//todo handle failed to create sub query
SJoinSubquerySupporter* tscCreateJoinSupporter(SSqlObj* pSql, SSubqueryState* pState, /*int32_t* numOfComplete, int32_t* gc,*/ int32_t index) {
// todo handle failed to create sub query
SJoinSubquerySupporter* tscCreateJoinSupporter(SSqlObj* pSql, SSubqueryState* pState,
/*int32_t* numOfComplete, int32_t* gc,*/ int32_t index) {
SJoinSubquerySupporter* pSupporter = calloc(1, sizeof(SJoinSubquerySupporter));
if (pSupporter == NULL) {
return NULL;
}
pSupporter->pObj = pSql;
pSupporter->hasMore = true;
pSupporter->pState = pState;
pSupporter->subqueryIndex = index;
......@@ -226,12 +225,6 @@ bool needSecondaryQuery(SSqlObj* pSql) {
* launch secondary stage query to fetch the result that contains timestamp in set
*/
int32_t tscLaunchSecondSubquery(SSqlObj* pSql) {
// TODO not launch secondary stage query
// if (!needSecondaryQuery(pSql)) {
// return;
// }
// sub query may not be necessary
int32_t numOfSub = 0;
SJoinSubquerySupporter* pSupporter = NULL;
......@@ -239,15 +232,22 @@ int32_t tscLaunchSecondSubquery(SSqlObj* pSql) {
pSupporter = pSql->pSubs[i]->param;
pSupporter->pState->numOfCompleted = 0;
/*
* If the columns are not involved in the final select clause, the secondary query will not be launched
* for the subquery.
*/
if (pSupporter->exprsInfo.numOfExprs > 0) {
++numOfSub;
}
}
// scan all subquery, if one sub query has only ts, ignore it
int32_t j = 0;
tscTrace("%p start to launch secondary subqueries: %d", pSql, pSql->numOfSubs);
tscTrace(
"%p start to launch secondary subqueries, total:%d, only:%d needs to query, others are not retrieve in "
"select clause",
pSql, pSql->numOfSubs, numOfSub);
int32_t j = 0;
for (int32_t i = 0; i < pSql->numOfSubs; ++i) {
SSqlObj* pSub = pSql->pSubs[i];
pSupporter = pSub->param;
......@@ -259,15 +259,14 @@ int32_t tscLaunchSecondSubquery(SSqlObj* pSql) {
continue;
}
SSqlObj* pNew = createSubqueryObj(pSql, 0, (int16_t)i, tscJoinQueryCallback, pSupporter, NULL);
SSqlObj* pNew = createSubqueryObj(pSql, (int16_t)i, tscJoinQueryCallback, pSupporter, NULL);
if (pNew == NULL) {
pSql->numOfSubs = i; //revise the number of subquery
pSql->numOfSubs = i; // revise the number of subquery
pSupporter->pState->numOfTotal = i;
pSupporter->pState->code = TSDB_CODE_CLI_OUT_OF_MEMORY;
tscDestroyJoinSupporter(pSupporter);
return NULL;
return 0;
}
tscFreeSqlCmdData(&pNew->cmd);
......@@ -282,7 +281,6 @@ int32_t tscLaunchSecondSubquery(SSqlObj* pSql) {
pNew->cmd.type |= TSDB_QUERY_TYPE_JOIN_SEC_STAGE;
pNew->cmd.nAggTimeInterval = pSupporter->interval;
pNew->cmd.limit = pSupporter->limit;
pNew->cmd.groupbyExpr = pSupporter->groupbyExpr;
tscColumnBaseInfoCopy(&pNew->cmd.colList, &pSupporter->colList, 0);
......@@ -302,6 +300,13 @@ int32_t tscLaunchSecondSubquery(SSqlObj* pSql) {
SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfo(&pNew->cmd, 0);
/*
* When handling the projection query, the offset value will be modified for table-table join, which is changed
* during the timestamp intersection.
*/
pSupporter->limit = pSql->cmd.limit;
pNew->cmd.limit = pSupporter->limit;
// fetch the join tag column
if (UTIL_METER_IS_METRIC(pMeterMetaInfo)) {
SSqlExpr* pExpr = tscSqlExprGet(&pNew->cmd, 0);
......@@ -310,10 +315,12 @@ int32_t tscLaunchSecondSubquery(SSqlObj* pSql) {
int16_t tagColIndex = tscGetJoinTagColIndexByUid(&pNew->cmd.tagCond, pMeterMetaInfo->pMeterMeta->uid);
pExpr->param[0].i64Key = tagColIndex;
pExpr->numOfParams = 1;
addRequiredTagColumn(&pNew->cmd, tagColIndex, 0);
}
#ifdef _DEBUG_VIEW
tscPrintSelectClause(&pNew->cmd);
#endif
tscProcessSql(pNew);
}
......@@ -384,9 +391,9 @@ static void joinRetrieveCallback(void* param, TAOS_RES* tres, int numOfRows) {
return;
}
if (numOfRows > 0) { // write the data into disk
if (numOfRows > 0) { // write the data into disk
fwrite(pSql->res.data, pSql->res.numOfRows, 1, pSupporter->f);
fflush(pSupporter->f);
fclose(pSupporter->f);
STSBuf* pBuf = tsBufCreateFromFile(pSupporter->path, true);
if (pBuf == NULL) {
......@@ -401,7 +408,10 @@ static void joinRetrieveCallback(void* param, TAOS_RES* tres, int numOfRows) {
tscTrace("%p create tmp file for ts block:%s", pSql, pBuf->path);
pSupporter->pTSBuf = pBuf;
} else {
tsBufMerge(pSupporter->pTSBuf, pBuf, pSql->cmd.vnodeIdx);
assert(pSql->cmd.numOfTables == 1); // for subquery, only one metermetaInfo
SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfo(&pSql->cmd, 0);
tsBufMerge(pSupporter->pTSBuf, pBuf, pMeterMetaInfo->vnodeIndex);
tsBufDestory(pBuf);
}
......@@ -411,12 +421,25 @@ static void joinRetrieveCallback(void* param, TAOS_RES* tres, int numOfRows) {
pSql->res.row = pSql->res.numOfRows;
taos_fetch_rows_a(tres, joinRetrieveCallback, param);
} else if (numOfRows == 0) { // no data from this vnode anymore
if (atomic_add_fetch_32(&pSupporter->pState->numOfCompleted, 1) >= pSupporter->pState->numOfTotal) {
} else if (numOfRows == 0) { // no data from this vnode anymore
if (tscProjectionQueryOnMetric(&pParentSql->cmd)) {
SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfo(&pSql->cmd, 0);
assert(pSql->cmd.numOfTables == 1);
// for projection query, need to try next vnode
if ((++pMeterMetaInfo->vnodeIndex) < pMeterMetaInfo->pMetricMeta->numOfVnodes) {
pSql->cmd.command = TSDB_SQL_SELECT;
pSql->fp = tscJoinQueryCallback;
tscProcessSql(pSql);
return;
}
}
if (atomic_add_fetch_32(&pSupporter->pState->numOfCompleted, 1) >= pSupporter->pState->numOfTotal) {
if (pSupporter->pState->code != TSDB_CODE_SUCCESS) {
tscTrace("%p sub:%p, numOfSub:%d, quit from further procedure due to other queries failure", pParentSql, tres,
pSupporter->subqueryIndex);
pSupporter->subqueryIndex);
doQuitSubquery(pParentSql);
return;
}
......@@ -451,8 +474,33 @@ static void joinRetrieveCallback(void* param, TAOS_RES* tres, int numOfRows) {
tscError("%p retrieve failed, code:%d, index:%d", pSql, numOfRows, pSupporter->subqueryIndex);
}
if (numOfRows >= 0) {
pSql->res.numOfTotal += pSql->res.numOfRows;
}
if (tscProjectionQueryOnMetric(&pSql->cmd) && numOfRows == 0) {
SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfo(&pSql->cmd, 0);
assert(pSql->cmd.numOfTables == 1);
// for projection query, need to try next vnode if current vnode is exhausted
if ((++pMeterMetaInfo->vnodeIndex) < pMeterMetaInfo->pMetricMeta->numOfVnodes) {
pSupporter->pState->numOfCompleted = 0;
pSupporter->pState->numOfTotal = 1;
pSql->cmd.command = TSDB_SQL_SELECT;
pSql->fp = tscJoinQueryCallback;
tscProcessSql(pSql);
return;
}
}
if (atomic_add_fetch_32(&pSupporter->pState->numOfCompleted, 1) >= pSupporter->pState->numOfTotal) {
tscTrace("%p secondary retrieve completed, global code:%d", tres, pParentSql->res.code);
assert(pSupporter->pState->numOfCompleted == pSupporter->pState->numOfTotal);
tscTrace("%p all %d secondary retrieves are completed, global code:%d", tres, pSupporter->pState->numOfTotal,
pParentSql->res.code);
if (pSupporter->pState->code != TSDB_CODE_SUCCESS) {
pParentSql->res.code = abs(pSupporter->pState->code);
freeSubqueryObj(pParentSql);
......@@ -466,50 +514,69 @@ static void joinRetrieveCallback(void* param, TAOS_RES* tres, int numOfRows) {
void tscFetchDatablockFromSubquery(SSqlObj* pSql) {
int32_t numOfFetch = 0;
for (int32_t i = 0; i < pSql->numOfSubs; ++i) {
SJoinSubquerySupporter* pSupporter = (SJoinSubquerySupporter*)pSql->pSubs[i]->param;
assert(pSql->numOfSubs >= 1);
SSqlRes* pRes = &pSql->pSubs[i]->res;
if (pRes->row >= pRes->numOfRows && pSupporter->hasMore) {
numOfFetch++;
for (int32_t i = 0; i < pSql->numOfSubs; ++i) {
SSqlRes *pRes = &pSql->pSubs[i]->res;
SSqlCmd *pCmd = &pSql->pSubs[i]->cmd;
SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, 0);
if (tscProjectionQueryOnMetric(pCmd)) {
if (pRes->row >= pRes->numOfRows && pMeterMetaInfo->vnodeIndex < pMeterMetaInfo->pMetricMeta->numOfVnodes &&
(!tscHasReachLimitation(pSql->pSubs[i]))) {
numOfFetch++;
}
} else {
if ((pRes->row >= pRes->numOfRows && (!tscHasReachLimitation(pSql->pSubs[i])) && tscProjectionQueryOnTable(pSql))
|| (pRes->numOfRows == 0)) {
numOfFetch++;
}
}
}
if (numOfFetch > 0) {
tscTrace("%p retrieve data from %d subqueries", pSql, numOfFetch);
if (numOfFetch <= 0) {
return ;
}
SJoinSubquerySupporter* pSupporter = (SJoinSubquerySupporter*)pSql->pSubs[0]->param;
pSupporter->pState->numOfTotal = numOfFetch; // wait for all subqueries completed
pSupporter->pState->numOfCompleted = 0;
// TODO multi-vnode retrieve for projection query with limitation has bugs, since the global limiation is not handled
tscTrace("%p retrieve data from %d subqueries", pSql, numOfFetch);
for (int32_t i = 0; i < pSql->numOfSubs; ++i) {
SSqlObj* pSql1 = pSql->pSubs[i];
SJoinSubquerySupporter* pSupporter = (SJoinSubquerySupporter*)pSql->pSubs[0]->param;
pSupporter->pState->numOfTotal = numOfFetch; // wait for all subqueries completed
pSupporter->pState->numOfCompleted = 0;
SSqlRes* pRes1 = &pSql1->res;
SSqlCmd* pCmd1 = &pSql1->cmd;
for (int32_t i = 0; i < pSql->numOfSubs; ++i) {
SSqlObj* pSql1 = pSql->pSubs[i];
pSupporter = (SJoinSubquerySupporter*)pSql1->param;
SSqlRes* pRes1 = &pSql1->res;
SSqlCmd* pCmd1 = &pSql1->cmd;
// wait for all subqueries completed
pSupporter->pState->numOfTotal = numOfFetch;
if (pRes1->row >= pRes1->numOfRows && pSupporter->hasMore) {
tscTrace("%p subquery:%p retrieve data from vnode, index:%d", pSql, pSql1, pSupporter->subqueryIndex);
pSupporter = (SJoinSubquerySupporter*)pSql1->param;
tscResetForNextRetrieve(pRes1);
// wait for all subqueries completed
pSupporter->pState->numOfTotal = numOfFetch;
assert(pRes1->numOfRows >= 0 && pCmd1->numOfTables == 1);
pSql1->fp = joinRetrieveCallback;
SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfo(pCmd1, 0);
if (pRes1->row >= pRes1->numOfRows) {
tscTrace("%p subquery:%p retrieve data from vnode, subquery:%d, vnodeIndex:%d", pSql, pSql1,
pSupporter->subqueryIndex, pMeterMetaInfo->vnodeIndex);
if (pCmd1->command < TSDB_SQL_LOCAL) {
pCmd1->command = (pCmd1->command > TSDB_SQL_MGMT) ? TSDB_SQL_RETRIEVE : TSDB_SQL_FETCH;
}
tscResetForNextRetrieve(pRes1);
pSql1->fp = joinRetrieveCallback;
tscProcessSql(pSql1);
if (pCmd1->command < TSDB_SQL_LOCAL) {
pCmd1->command = (pCmd1->command > TSDB_SQL_MGMT) ? TSDB_SQL_RETRIEVE : TSDB_SQL_FETCH;
}
}
// wait for all subquery completed
tsem_wait(&pSql->rspSem);
tscProcessSql(pSql1);
}
}
// wait for all subquery completed
tsem_wait(&pSql->rspSem);
}
// all subqueries return, set the result output index
......@@ -519,6 +586,10 @@ void tscSetupOutputColumnIndex(SSqlObj* pSql) {
tscTrace("%p all subquery response, retrieve data", pSql);
if (pRes->pColumnIndex != NULL) {
return; // the column transfer support struct has been built
}
pRes->pColumnIndex = calloc(1, sizeof(SColumnIndex) * pCmd->fieldsInfo.numOfOutputCols);
for (int32_t i = 0; i < pCmd->fieldsInfo.numOfOutputCols; ++i) {
......@@ -608,20 +679,35 @@ void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code) {
if (atomic_add_fetch_32(&pSupporter->pState->numOfCompleted, 1) >= pSupporter->pState->numOfTotal) {
tscSetupOutputColumnIndex(pParentSql);
if (pParentSql->fp == NULL) {
tsem_wait(&pParentSql->emptyRspSem);
tsem_wait(&pParentSql->emptyRspSem);
tsem_post(&pParentSql->rspSem);
} else {
// set the command flag must be after the semaphore been correctly set.
// pPObj->cmd.command = TSDB_SQL_RETRIEVE_METRIC;
// if (pPObj->res.code == TSDB_CODE_SUCCESS) {
// (*pPObj->fp)(pPObj->param, pPObj, 0);
// } else {
// tscQueueAsyncRes(pPObj);
// }
assert(0);
SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfo(&pSql->cmd, 0);
/**
* if the query is a continue query (vnodeIndex > 0 for projection query) for next vnode, do the retrieval of
* data instead of returning to its invoker
*/
if (pMeterMetaInfo->vnodeIndex > 0 && tscProjectionQueryOnMetric(&pSql->cmd)) {
assert(pMeterMetaInfo->vnodeIndex < pMeterMetaInfo->pMetricMeta->numOfVnodes);
pSupporter->pState->numOfCompleted = 0; // reset the record value
pSql->fp = joinRetrieveCallback; // continue retrieve data
pSql->cmd.command = TSDB_SQL_FETCH;
tscProcessSql(pSql);
} else { // first retrieve from vnode during the secondary stage sub-query
if (pParentSql->fp == NULL) {
tsem_wait(&pParentSql->emptyRspSem);
tsem_wait(&pParentSql->emptyRspSem);
tsem_post(&pParentSql->rspSem);
} else {
// set the command flag must be after the semaphore been correctly set.
// pPObj->cmd.command = TSDB_SQL_RETRIEVE_METRIC;
// if (pPObj->res.code == TSDB_CODE_SUCCESS) {
// (*pPObj->fp)(pPObj->param, pPObj, 0);
// } else {
// tscQueueAsyncRes(pPObj);
// }
assert(0);
}
}
}
}
......@@ -731,7 +817,7 @@ STSBuf* tsBufCreateFromFile(const char* path, bool autoDelete) {
strncpy(pTSBuf->path, path, PATH_MAX);
pTSBuf->f = fopen(pTSBuf->path, "r");
pTSBuf->f = fopen(pTSBuf->path, "r+");
if (pTSBuf->f == NULL) {
return NULL;
}
......@@ -774,7 +860,7 @@ STSBuf* tsBufCreateFromFile(const char* path, bool autoDelete) {
size_t infoSize = sizeof(STSVnodeBlockInfo) * pTSBuf->numOfVnodes;
STSVnodeBlockInfo* buf = (STSVnodeBlockInfo*)calloc(1, infoSize);
int64_t pos = ftell(pTSBuf->f);
int64_t pos = ftell(pTSBuf->f);
fread(buf, infoSize, 1, pTSBuf->f);
// the length value for each vnode is not kept in file, so does not set the length value
......@@ -790,13 +876,17 @@ STSBuf* tsBufCreateFromFile(const char* path, bool autoDelete) {
struct stat fileStat;
fstat(fileno(pTSBuf->f), &fileStat);
pTSBuf->fileSize = (uint32_t) fileStat.st_size;
pTSBuf->fileSize = (uint32_t)fileStat.st_size;
tsBufResetPos(pTSBuf);
// ascending by default
pTSBuf->cur.order = TSQL_SO_ASC;
pTSBuf->autoDelete = autoDelete;
tscTrace("create tsBuf from file:%s, fd:%d, size:%d, numOfVnode:%d, autoDelete:%d", pTSBuf->path, fileno(pTSBuf->f),
pTSBuf->fileSize, pTSBuf->numOfVnodes, pTSBuf->autoDelete);
return pTSBuf;
}
......@@ -814,12 +904,22 @@ void tsBufDestory(STSBuf* pTSBuf) {
fclose(pTSBuf->f);
if (pTSBuf->autoDelete) {
tscTrace("tsBuf %p destroyed, delete tmp file:%s", pTSBuf, pTSBuf->path);
unlink(pTSBuf->path);
} else {
tscTrace("tsBuf %p destroyed, tmp file:%s, remains", pTSBuf, pTSBuf->path);
}
free(pTSBuf);
}
static STSVnodeBlockInfoEx* tsBufGetLastVnodeInfo(STSBuf* pTSBuf) {
int32_t last = pTSBuf->numOfVnodes - 1;
assert(last >= 0);
return &pTSBuf->pData[last];
}
static STSVnodeBlockInfoEx* addOneVnodeInfo(STSBuf* pTSBuf, int32_t vnodeId) {
if (pTSBuf->numOfAlloc <= pTSBuf->numOfVnodes) {
uint32_t newSize = (uint32_t)(pTSBuf->numOfAlloc * 1.5);
......@@ -836,10 +936,10 @@ static STSVnodeBlockInfoEx* addOneVnodeInfo(STSBuf* pTSBuf, int32_t vnodeId) {
}
if (pTSBuf->numOfVnodes > 0) {
STSVnodeBlockInfo* pPrevBlockInfo = &pTSBuf->pData[pTSBuf->numOfVnodes - 1].info;
STSVnodeBlockInfoEx* pPrevBlockInfoEx = tsBufGetLastVnodeInfo(pTSBuf);
// update prev vnode length info in file
TSBufUpdateVnodeInfo(pTSBuf, pTSBuf->numOfVnodes - 1, pPrevBlockInfo);
TSBufUpdateVnodeInfo(pTSBuf, pTSBuf->numOfVnodes - 1, &pPrevBlockInfoEx->info);
}
// set initial value for vnode block
......@@ -857,9 +957,9 @@ static STSVnodeBlockInfoEx* addOneVnodeInfo(STSBuf* pTSBuf, int32_t vnodeId) {
// update the header info
STSBufFileHeader header = {
.magic = TS_COMP_FILE_MAGIC, .numOfVnode = pTSBuf->numOfVnodes, .tsOrder = pTSBuf->tsOrder};
STSBufUpdateHeader(pTSBuf, &header);
return &pTSBuf->pData[pTSBuf->numOfVnodes - 1];
STSBufUpdateHeader(pTSBuf, &header);
return tsBufGetLastVnodeInfo(pTSBuf);
}
static void shrinkBuffer(STSList* ptsData) {
......@@ -906,8 +1006,10 @@ static void writeDataToDisk(STSBuf* pTSBuf) {
pTSBuf->tsData.len = 0;
pTSBuf->pData[pTSBuf->numOfVnodes - 1].info.compLen += blockSize;
pTSBuf->pData[pTSBuf->numOfVnodes - 1].info.numOfBlocks += 1;
STSVnodeBlockInfoEx* pVnodeBlockInfoEx = tsBufGetLastVnodeInfo(pTSBuf);
pVnodeBlockInfoEx->info.compLen += blockSize;
pVnodeBlockInfoEx->info.numOfBlocks += 1;
shrinkBuffer(&pTSBuf->tsData);
}
......@@ -1008,13 +1110,13 @@ void tsBufAppend(STSBuf* pTSBuf, int32_t vnodeId, int64_t tag, const char* pData
STSVnodeBlockInfoEx* pBlockInfo = NULL;
STSList* ptsData = &pTSBuf->tsData;
if (pTSBuf->numOfVnodes == 0 || pTSBuf->pData[pTSBuf->numOfVnodes - 1].info.vnode != vnodeId) {
if (pTSBuf->numOfVnodes == 0 || tsBufGetLastVnodeInfo(pTSBuf)->info.vnode != vnodeId) {
writeDataToDisk(pTSBuf);
shrinkBuffer(ptsData);
pBlockInfo = addOneVnodeInfo(pTSBuf, vnodeId);
} else {
pBlockInfo = &pTSBuf->pData[pTSBuf->numOfVnodes - 1];
pBlockInfo = tsBufGetLastVnodeInfo(pTSBuf);
}
assert(pBlockInfo->info.vnode == vnodeId);
......@@ -1037,6 +1139,8 @@ void tsBufAppend(STSBuf* pTSBuf, int32_t vnodeId, int64_t tag, const char* pData
pTSBuf->numOfTotal += len / TSDB_KEYSIZE;
// the size of raw data exceeds the size of the default prepared buffer, so
// during getBufBlock, the output buffer needs to be large enough.
if (ptsData->len >= ptsData->threshold) {
writeDataToDisk(pTSBuf);
shrinkBuffer(ptsData);
......@@ -1053,10 +1157,10 @@ void tsBufFlush(STSBuf* pTSBuf) {
writeDataToDisk(pTSBuf);
shrinkBuffer(&pTSBuf->tsData);
STSVnodeBlockInfo* pBlockInfo = &pTSBuf->pData[pTSBuf->numOfVnodes - 1].info;
STSVnodeBlockInfoEx* pBlockInfoEx = tsBufGetLastVnodeInfo(pTSBuf);
// update prev vnode length info in file
TSBufUpdateVnodeInfo(pTSBuf, pTSBuf->numOfVnodes - 1, pBlockInfo);
TSBufUpdateVnodeInfo(pTSBuf, pTSBuf->numOfVnodes - 1, &pBlockInfoEx->info);
// save the ts order into header
STSBufFileHeader header = {
......@@ -1157,11 +1261,22 @@ static void tsBufGetBlock(STSBuf* pTSBuf, int32_t vnodeIndex, int32_t blockIndex
}
STSBlock* pBlock = &pTSBuf->block;
size_t s = pBlock->numOfElem * TSDB_KEYSIZE;
/*
* In order to accommodate all the qualified data, the actual buffer size for one block with identical tags value
* may exceed the maximum allowed size during *tsBufAppend* function by invoking expandBuffer function
*/
if (s > pTSBuf->tsData.allocSize) {
expandBuffer(&pTSBuf->tsData, s);
}
pTSBuf->tsData.len =
tsDecompressTimestamp(pBlock->payload, pBlock->compLen, pBlock->numOfElem, pTSBuf->tsData.rawBuf,
pTSBuf->tsData.allocSize, TWO_STAGE_COMP, pTSBuf->assistBuf, pTSBuf->bufSize);
assert(pTSBuf->tsData.len / TSDB_KEYSIZE == pBlock->numOfElem);
assert((pTSBuf->tsData.len / TSDB_KEYSIZE == pBlock->numOfElem) && (pTSBuf->tsData.allocSize >= pTSBuf->tsData.len));
pCur->vnodeIndex = vnodeIndex;
pCur->blockIndex = blockIndex;
......@@ -1203,20 +1318,20 @@ bool tsBufNextPos(STSBuf* pTSBuf) {
if (pCur->vnodeIndex == -1) {
if (pCur->order == TSQL_SO_ASC) {
tsBufGetBlock(pTSBuf, 0, 0);
if (pTSBuf->block.numOfElem == 0) { // the whole list is empty, return
if (pTSBuf->block.numOfElem == 0) { // the whole list is empty, return
tsBufResetPos(pTSBuf);
return false;
} else {
return true;
}
} else { // get the last timestamp record in the last block of the last vnode
} else { // get the last timestamp record in the last block of the last vnode
assert(pTSBuf->numOfVnodes > 0);
int32_t vnodeIndex = pTSBuf->numOfVnodes - 1;
pCur->vnodeIndex = vnodeIndex;
int32_t vnodeId = pTSBuf->pData[pCur->vnodeIndex].info.vnode;
STSVnodeBlockInfo* pBlockInfo = tsBufGetVnodeBlockInfo(pTSBuf, vnodeId);
int32_t blockIndex = pBlockInfo->numOfBlocks - 1;
......@@ -1318,7 +1433,7 @@ int32_t tsBufMerge(STSBuf* pDestBuf, const STSBuf* pSrcBuf, int32_t vnodeId) {
tsBufFlush(pDestBuf);
// compared with the last vnode id
if (vnodeId != pDestBuf->pData[pDestBuf->numOfVnodes - 1].info.vnode) {
if (vnodeId != tsBufGetLastVnodeInfo(pDestBuf)->info.vnode) {
int32_t oldSize = pDestBuf->numOfVnodes;
int32_t newSize = oldSize + pSrcBuf->numOfVnodes;
......@@ -1345,36 +1460,49 @@ int32_t tsBufMerge(STSBuf* pDestBuf, const STSBuf* pSrcBuf, int32_t vnodeId) {
pDestBuf->numOfVnodes = newSize;
} else {
STSVnodeBlockInfoEx* pBlockInfoEx = &pDestBuf->pData[pDestBuf->numOfVnodes - 1];
STSVnodeBlockInfoEx* pBlockInfoEx = tsBufGetLastVnodeInfo(pDestBuf);
pBlockInfoEx->len += pSrcBuf->pData[0].len;
pBlockInfoEx->info.numOfBlocks += pSrcBuf->pData[0].info.numOfBlocks;
pBlockInfoEx->info.compLen += pSrcBuf->pData[0].info.compLen;
pBlockInfoEx->info.vnode = vnodeId;
}
int64_t r = fseek(pDestBuf->f, 0, SEEK_END);
int32_t r = fseek(pDestBuf->f, 0, SEEK_END);
assert(r == 0);
int64_t offset = getDataStartOffset();
int32_t size = pSrcBuf->fileSize - offset;
#ifdef LINUX
ssize_t rc = sendfile(fileno(pDestBuf->f), fileno(pSrcBuf->f), &offset, size);
ssize_t rc = tsendfile(fileno(pDestBuf->f), fileno(pSrcBuf->f), &offset, size);
#else
ssize_t rc = fsendfile(pDestBuf->f, pSrcBuf->f, &offset, size);
#endif
if (rc == -1) {
printf("%s\n", strerror(errno));
tscError("failed to merge tsBuf from:%s to %s, reason:%s\n", pSrcBuf->path, pDestBuf->path, strerror(errno));
return -1;
}
if (rc != size) {
printf("%s\n", strerror(errno));
tscError("failed to merge tsBuf from:%s to %s, reason:%s\n", pSrcBuf->path, pDestBuf->path, strerror(errno));
return -1;
}
pDestBuf->numOfTotal += pSrcBuf->numOfTotal;
int32_t oldSize = pDestBuf->fileSize;
struct stat fileStat;
fstat(fileno(pDestBuf->f), &fileStat);
pDestBuf->fileSize = (uint32_t)fileStat.st_size;
assert(pDestBuf->fileSize == oldSize + size);
tscTrace("tsBuf merge success, %p, path:%s, fd:%d, file size:%d, numOfVnode:%d, autoDelete:%d", pDestBuf,
pDestBuf->path, fileno(pDestBuf->f), pDestBuf->fileSize, pDestBuf->numOfVnodes, pDestBuf->autoDelete);
return 0;
}
......@@ -1391,7 +1519,7 @@ STSBuf* tsBufCreateFromCompBlocks(const char* pData, int32_t numOfBlocks, int32_
TSBufUpdateVnodeInfo(pTSBuf, pTSBuf->numOfVnodes - 1, pBlockInfo);
fseek(pTSBuf->f, pBlockInfo->offset, SEEK_SET);
fwrite((void*) pData, 1, len, pTSBuf->f);
fwrite((void*)pData, 1, len, pTSBuf->f);
pTSBuf->fileSize += len;
pTSBuf->tsOrder = order;
......
......@@ -519,11 +519,12 @@ int tsParseValues(char **str, STableDataBlocks *pDataBlock, SMeterMeta *pMeterMe
*str += index;
if (numOfRows >= maxRows || pDataBlock->size + pMeterMeta->rowSize >= pDataBlock->nAllocSize) {
int32_t tSize = tscAllocateMemIfNeed(pDataBlock, pMeterMeta->rowSize);
if (0 == tSize) {
if (0 == tSize) { //TODO pass the correct error code to client
strcpy(error, "client out of memory");
*code = TSDB_CODE_CLI_OUT_OF_MEMORY;
return -1;
}
maxRows += tSize;
}
......@@ -1091,8 +1092,10 @@ int doParserInsertSql(SSqlObj *pSql, char *str) {
goto _error_clean;
}
SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, 0);
// set the next sent data vnode index in data block arraylist
pCmd->vnodeIdx = 1;
pMeterMetaInfo->vnodeIndex = 1;
} else {
pCmd->pDataBlocks = tscDestroyBlockArrayList(pCmd->pDataBlocks);
}
......@@ -1310,19 +1313,19 @@ void tscProcessMultiVnodesInsert(SSqlObj *pSql) {
int32_t code = TSDB_CODE_SUCCESS;
/* the first block has been sent to server in processSQL function */
assert(pCmd->isInsertFromFile != -1 && pCmd->vnodeIdx >= 1 && pCmd->pDataBlocks != NULL);
assert(pCmd->isInsertFromFile != -1 && pMeterMetaInfo->vnodeIndex >= 1 && pCmd->pDataBlocks != NULL);
if (pCmd->vnodeIdx < pCmd->pDataBlocks->nSize) {
if (pMeterMetaInfo->vnodeIndex < pCmd->pDataBlocks->nSize) {
SDataBlockList *pDataBlocks = pCmd->pDataBlocks;
for (int32_t i = pCmd->vnodeIdx; i < pDataBlocks->nSize; ++i) {
for (int32_t i = pMeterMetaInfo->vnodeIndex; i < pDataBlocks->nSize; ++i) {
pDataBlock = pDataBlocks->pData[i];
if (pDataBlock == NULL) {
continue;
}
if ((code = tscCopyDataBlockToPayload(pSql, pDataBlock)) != TSDB_CODE_SUCCESS) {
tscTrace("%p build submit data block failed, vnodeIdx:%d, total:%d", pSql, pCmd->vnodeIdx, pDataBlocks->nSize);
tscTrace("%p build submit data block failed, vnodeIdx:%d, total:%d", pSql, pMeterMetaInfo->vnodeIndex, pDataBlocks->nSize);
continue;
}
......
......@@ -409,7 +409,9 @@ static int insertStmtReset(STscStmt* pStmt) {
}
}
pCmd->batchSize = 0;
pCmd->vnodeIdx = 0;
SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, 0);
pMeterMetaInfo->vnodeIndex = 0;
return TSDB_CODE_SUCCESS;
}
......@@ -422,6 +424,8 @@ static int insertStmtExecute(STscStmt* stmt) {
++pCmd->batchSize;
}
SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, 0);
if (pCmd->pDataBlocks->nSize > 0) {
// merge according to vgid
int code = tscMergeTableDataBlocks(stmt->pSql, pCmd->pDataBlocks);
......@@ -436,7 +440,7 @@ static int insertStmtExecute(STscStmt* stmt) {
}
// set the next sent data vnode index in data block arraylist
pCmd->vnodeIdx = 1;
pMeterMetaInfo->vnodeIndex = 1;
} else {
pCmd->pDataBlocks = tscDestroyBlockArrayList(pCmd->pDataBlocks);
}
......
......@@ -1020,7 +1020,10 @@ int32_t tscToSQLCmd(SSqlObj* pSql, struct SSqlInfo* pInfo) {
}
setColumnOffsetValueInResultset(pCmd);
updateTagColumnIndex(pCmd, 0);
for(int32_t i = 0; i < pCmd->numOfTables; ++i) {
updateTagColumnIndex(pCmd, i);
}
break;
}
......@@ -1796,12 +1799,11 @@ int32_t addProjectionExprAndResultField(SSqlCmd* pCmd, tSQLExprItem* pItem) {
}
if (index.columnIndex == TSDB_TBNAME_COLUMN_INDEX) {
SColumnIndex index1 = {0, TSDB_TBNAME_COLUMN_INDEX};
SSchema colSchema = {.type = TSDB_DATA_TYPE_BINARY, .bytes = TSDB_METER_NAME_LEN};
strcpy(colSchema.name, TSQL_TBNAME_L);
pCmd->type = TSDB_QUERY_TYPE_STABLE_QUERY;
tscAddSpecialColumnForSelect(pCmd, startPos, TSDB_FUNC_TAGPRJ, &index1, &colSchema, true);
tscAddSpecialColumnForSelect(pCmd, startPos, TSDB_FUNC_TAGPRJ, &index, &colSchema, true);
} else {
SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, index.tableIndex);
SMeterMeta* pMeterMeta = pMeterMetaInfo->pMeterMeta;
......@@ -2739,15 +2741,20 @@ static bool functionCompatibleCheck(SSqlCmd* pCmd) {
void updateTagColumnIndex(SSqlCmd* pCmd, int32_t tableIndex) {
SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, tableIndex);
// update tags column index for group by tags
for (int32_t i = 0; i < pCmd->groupbyExpr.numOfGroupCols; ++i) {
int32_t index = pCmd->groupbyExpr.columnInfo[i].colIdx;
for (int32_t j = 0; j < pMeterMetaInfo->numOfTags; ++j) {
int32_t tagColIndex = pMeterMetaInfo->tagColumnIndex[j];
if (tagColIndex == index) {
pCmd->groupbyExpr.columnInfo[i].colIdx = j;
break;
/*
* update tags column index for group by tags
* group by columns belong to this table
*/
if (pCmd->groupbyExpr.numOfGroupCols > 0 && pCmd->groupbyExpr.tableIndex == tableIndex) {
for (int32_t i = 0; i < pCmd->groupbyExpr.numOfGroupCols; ++i) {
int32_t index = pCmd->groupbyExpr.columnInfo[i].colIdx;
for (int32_t j = 0; j < pMeterMetaInfo->numOfTags; ++j) {
int32_t tagColIndex = pMeterMetaInfo->tagColumnIndex[j];
if (tagColIndex == index) {
pCmd->groupbyExpr.columnInfo[i].colIdx = j;
break;
}
}
}
}
......@@ -2755,9 +2762,15 @@ void updateTagColumnIndex(SSqlCmd* pCmd, int32_t tableIndex) {
// update tags column index for expression
for (int32_t i = 0; i < pCmd->exprsInfo.numOfExprs; ++i) {
SSqlExpr* pExpr = tscSqlExprGet(pCmd, i);
if (!TSDB_COL_IS_TAG(pExpr->colInfo.flag)) { // not tags, continue
continue;
}
// not belongs to this table
if (pExpr->uid != pMeterMetaInfo->pMeterMeta->uid) {
continue;
}
for (int32_t j = 0; j < pMeterMetaInfo->numOfTags; ++j) {
if (pExpr->colInfo.colIdx == pMeterMetaInfo->tagColumnIndex[j]) {
......@@ -2766,6 +2779,32 @@ void updateTagColumnIndex(SSqlCmd* pCmd, int32_t tableIndex) {
}
}
}
// update join condition tag column index
SJoinInfo* pJoinInfo = &pCmd->tagCond.joinInfo;
if (!pJoinInfo->hasJoin) { // not join query
return;
}
assert(pJoinInfo->left.uid != pJoinInfo->right.uid);
// the join condition expression node belongs to this table(super table)
if (pMeterMetaInfo->pMeterMeta->uid == pJoinInfo->left.uid) {
for(int32_t i = 0; i < pMeterMetaInfo->numOfTags; ++i) {
if (pJoinInfo->left.tagCol == pMeterMetaInfo->tagColumnIndex[i]) {
pJoinInfo->left.tagCol = i;
}
}
}
if (pMeterMetaInfo->pMeterMeta->uid == pJoinInfo->right.uid) {
for(int32_t i = 0; i < pMeterMetaInfo->numOfTags; ++i) {
if (pJoinInfo->right.tagCol == pMeterMetaInfo->tagColumnIndex[i]) {
pJoinInfo->right.tagCol = i;
}
}
}
}
int32_t parseGroupbyClause(SSqlCmd* pCmd, tVariantList* pList) {
......@@ -2987,8 +3026,6 @@ typedef struct SCondExpr {
static int32_t getTimeRange(int64_t* stime, int64_t* etime, tSQLExpr* pRight, int32_t optr, int16_t timePrecision);
static int32_t doParseWhereClause(SSqlObj* pSql, tSQLExpr** pExpr, SCondExpr* condExpr);
static int32_t tSQLExprNodeToString(tSQLExpr* pExpr, char** str) {
if (pExpr->nSQLOptr == TK_ID) { // column name
strncpy(*str, pExpr->colInfo.z, pExpr->colInfo.n);
......@@ -4018,129 +4055,128 @@ static void cleanQueryExpr(SCondExpr* pCondExpr) {
}
}
int32_t parseWhereClause(SSqlObj* pSql, tSQLExpr** pExpr) {
SSqlCmd* pCmd = &pSql->cmd;
if (pExpr == NULL) {
return TSDB_CODE_SUCCESS;
}
pCmd->stime = 0;
pCmd->etime = INT64_MAX;
int32_t ret = TSDB_CODE_SUCCESS;
const char* msg1 = "invalid expression";
SCondExpr condExpr = {0};
if ((*pExpr)->pLeft == NULL || (*pExpr)->pRight == NULL) {
return invalidSqlErrMsg(pCmd, msg1);
}
ret = doParseWhereClause(pSql, pExpr, &condExpr);
if (ret != TSDB_CODE_SUCCESS) {
return ret;
}
static void doAddJoinTagsColumnsIntoTagList(SSqlCmd* pCmd, SCondExpr* pCondExpr) {
SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, 0);
if (QUERY_IS_JOIN_QUERY(pCmd->type) && UTIL_METER_IS_METRIC(pMeterMetaInfo)) {
SColumnIndex index = {0};
getColumnIndexByNameEx(&condExpr.pJoinExpr->pLeft->colInfo, pCmd, &index);
getColumnIndexByNameEx(&pCondExpr->pJoinExpr->pLeft->colInfo, pCmd, &index);
pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, index.tableIndex);
int32_t columnInfo = index.columnIndex - pMeterMetaInfo->pMeterMeta->numOfColumns;
addRequiredTagColumn(pCmd, columnInfo, index.tableIndex);
getColumnIndexByNameEx(&condExpr.pJoinExpr->pRight->colInfo, pCmd, &index);
getColumnIndexByNameEx(&pCondExpr->pJoinExpr->pRight->colInfo, pCmd, &index);
pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, index.tableIndex);
columnInfo = index.columnIndex - pMeterMetaInfo->pMeterMeta->numOfColumns;
addRequiredTagColumn(pCmd, columnInfo, index.tableIndex);
}
}
cleanQueryExpr(&condExpr);
static int32_t getTagQueryCondExpr(SSqlCmd* pCmd, SCondExpr* pCondExpr, tSQLExpr** pExpr) {
int32_t ret = TSDB_CODE_SUCCESS;
if (pCondExpr->pTagCond != NULL) {
for (int32_t i = 0; i < pCmd->numOfTables; ++i) {
tSQLExpr* p1 = extractExprForSTable(pExpr, pCmd, i);
SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, i);
char c[TSDB_MAX_TAGS_LEN] = {0};
char* str = c;
if ((ret = getTagCondString(pCmd, p1, &str)) != TSDB_CODE_SUCCESS) {
return ret;
}
tsSetMetricQueryCond(&pCmd->tagCond, pMeterMetaInfo->pMeterMeta->uid, c);
doCompactQueryExpr(pExpr);
tSQLExprDestroy(p1);
}
pCondExpr->pTagCond = NULL;
}
return ret;
}
int32_t doParseWhereClause(SSqlObj* pSql, tSQLExpr** pExpr, SCondExpr* condExpr) {
int32_t parseWhereClause(SSqlObj* pSql, tSQLExpr** pExpr) {
if (pExpr == NULL) {
return TSDB_CODE_SUCCESS;
}
const char* msg = "invalid filter expression";
int32_t type = 0;
const char* msg1 = "invalid expression";
int32_t ret = TSDB_CODE_SUCCESS;
SSqlCmd* pCmd = &pSql->cmd;
pCmd->stime = 0;
pCmd->etime = INT64_MAX;
/*
* tags query condition may be larger than 512bytes, therefore, we need to prepare enough large space
*/
//tags query condition may be larger than 512bytes, therefore, we need to prepare enough large space
SStringBuilder sb = {0};
SCondExpr condExpr = {0};
int32_t ret = TSDB_CODE_SUCCESS;
if ((ret = getQueryCondExpr(pCmd, pExpr, condExpr, &type, (*pExpr)->nSQLOptr)) != TSDB_CODE_SUCCESS) {
return ret;
if ((*pExpr)->pLeft == NULL || (*pExpr)->pRight == NULL) {
return invalidSqlErrMsg(pCmd, msg1);
}
int32_t type = 0;
if ((ret = getQueryCondExpr(pCmd, pExpr, &condExpr, &type, (*pExpr)->nSQLOptr)) != TSDB_CODE_SUCCESS) {
return ret;
}
doCompactQueryExpr(pExpr);
// after expression compact, the expression tree is only include tag query condition
condExpr->pTagCond = (*pExpr);
condExpr.pTagCond = (*pExpr);
// 1. check if it is a join query
if ((ret = validateJoinExpr(pCmd, condExpr)) != TSDB_CODE_SUCCESS) {
if ((ret = validateJoinExpr(pCmd, &condExpr)) != TSDB_CODE_SUCCESS) {
return ret;
}
// 2. get the query time range
if ((ret = getTimeRangeFromExpr(pCmd, condExpr->pTimewindow)) != TSDB_CODE_SUCCESS) {
if ((ret = getTimeRangeFromExpr(pCmd, condExpr.pTimewindow)) != TSDB_CODE_SUCCESS) {
return ret;
}
// 3. get the tag query condition
if (condExpr->pTagCond != NULL) {
for (int32_t i = 0; i < pCmd->numOfTables; ++i) {
tSQLExpr* p1 = extractExprForSTable(pExpr, pCmd, i);
SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, i);
char c[TSDB_MAX_TAGS_LEN] = {0};
char* str = c;
if ((ret = getTagCondString(pCmd, p1, &str)) != TSDB_CODE_SUCCESS) {
return ret;
}
tsSetMetricQueryCond(&pCmd->tagCond, pMeterMetaInfo->pMeterMeta->uid, c);
doCompactQueryExpr(pExpr);
tSQLExprDestroy(p1);
}
condExpr->pTagCond = NULL;
if ((ret = getTagQueryCondExpr(pCmd, &condExpr, pExpr)) != TSDB_CODE_SUCCESS) {
return ret;
}
// 4. get the table name query condition
if ((ret = getTablenameCond(pCmd, condExpr->pTableCond, &sb)) != TSDB_CODE_SUCCESS) {
if ((ret = getTablenameCond(pCmd, condExpr.pTableCond, &sb)) != TSDB_CODE_SUCCESS) {
return ret;
}
// 5. other column query condition
if ((ret = getColumnQueryCondInfo(pCmd, condExpr->pColumnCond, TK_AND)) != TSDB_CODE_SUCCESS) {
if ((ret = getColumnQueryCondInfo(pCmd, condExpr.pColumnCond, TK_AND)) != TSDB_CODE_SUCCESS) {
return ret;
}
// 6. join condition
if ((ret = getJoinCondInfo(pSql, condExpr->pJoinExpr)) != TSDB_CODE_SUCCESS) {
if ((ret = getJoinCondInfo(pSql, condExpr.pJoinExpr)) != TSDB_CODE_SUCCESS) {
return ret;
}
// 7. query condition for table name
pCmd->tagCond.relType = (condExpr->relType == TK_AND) ? TSDB_RELATION_AND : TSDB_RELATION_OR;
pCmd->tagCond.relType = (condExpr.relType == TK_AND) ? TSDB_RELATION_AND : TSDB_RELATION_OR;
ret = setTableCondForMetricQuery(pSql, condExpr->pTableCond, condExpr->tableCondIndex, &sb);
ret = setTableCondForMetricQuery(pSql, condExpr.pTableCond, condExpr.tableCondIndex, &sb);
taosStringBuilderDestroy(&sb);
if (!validateFilterExpr(pCmd)) {
return invalidSqlErrMsg(pCmd, msg);
}
doAddJoinTagsColumnsIntoTagList(pCmd, &condExpr);
cleanQueryExpr(&condExpr);
return ret;
}
......@@ -4972,6 +5008,8 @@ int32_t parseLimitClause(SSqlObj* pSql, SQuerySQL* pQuerySql) {
// handle the limit offset value, validate the limit
pCmd->limit = pQuerySql->limit;
pCmd->globalLimit = pCmd->limit.limit;
pCmd->slimit = pQuerySql->slimit;
if (pCmd->slimit.offset < 0 || pCmd->limit.offset < 0) {
......@@ -5684,3 +5722,30 @@ int32_t tscCheckCreateDbParams(SSqlCmd* pCmd, SCreateDbMsg *pCreate) {
return TSDB_CODE_SUCCESS;
}
// for debug purpose
void tscPrintSelectClause(SSqlCmd* pCmd) {
if (pCmd == NULL || pCmd->exprsInfo.numOfExprs == 0) {
return;
}
char* str = calloc(1, 10240);
int32_t offset = 0;
offset += sprintf(str, "%d [", pCmd->exprsInfo.numOfExprs);
for(int32_t i = 0; i < pCmd->exprsInfo.numOfExprs; ++i) {
SSqlExpr* pExpr = tscSqlExprGet(pCmd, i);
int32_t size = sprintf(str + offset, "%s(%d)", aAggs[pExpr->functionId].aName, pExpr->colInfo.colId);
offset += size;
if (i < pCmd->exprsInfo.numOfExprs - 1) {
str[offset++] = ',';
}
}
str[offset] = ']';
printf("%s\n", str);
free(str);
}
......@@ -222,7 +222,7 @@ void tscGetConnToVnode(SSqlObj *pSql, uint8_t *pCode) {
SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, 0);
if (UTIL_METER_IS_METRIC(pMeterMetaInfo)) { // multiple vnode query
SVnodeSidList *vnodeList = tscGetVnodeSidList(pMeterMetaInfo->pMetricMeta, pCmd->vnodeIdx);
SVnodeSidList *vnodeList = tscGetVnodeSidList(pMeterMetaInfo->pMetricMeta, pMeterMetaInfo->vnodeIndex);
if (vnodeList != NULL) {
pVPeersDesc = vnodeList->vpeerDesc;
}
......@@ -528,7 +528,7 @@ void *tscProcessMsgFromServer(char *msg, void *ahandle, void *thandle) {
if (pMeterMetaInfo->pMeterMeta) // it may be deleted
pMeterMetaInfo->pMeterMeta->index = pSql->index;
} else {
SVnodeSidList *pVnodeSidList = tscGetVnodeSidList(pMeterMetaInfo->pMetricMeta, pSql->cmd.vnodeIdx);
SVnodeSidList *pVnodeSidList = tscGetVnodeSidList(pMeterMetaInfo->pMetricMeta, pMeterMetaInfo->vnodeIndex);
pVnodeSidList->index = pSql->index;
}
} else {
......@@ -639,7 +639,7 @@ static SSqlObj *tscCreateSqlObjForSubquery(SSqlObj *pSql, SRetrieveSupport *trsu
static int tscLaunchMetricSubQueries(SSqlObj *pSql);
// todo merge with callback
int32_t tscLaunchJoinSubquery(SSqlObj *pSql, int16_t tableIndex, int16_t vnodeIdx, SJoinSubquerySupporter *pSupporter) {
int32_t tscLaunchJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSubquerySupporter *pSupporter) {
SSqlCmd *pCmd = &pSql->cmd;
pSql->res.qhandle = 0x1;
......@@ -652,12 +652,13 @@ int32_t tscLaunchJoinSubquery(SSqlObj *pSql, int16_t tableIndex, int16_t vnodeId
}
}
SSqlObj *pNew = createSubqueryObj(pSql, vnodeIdx, tableIndex, tscJoinQueryCallback, pSupporter, NULL);
SSqlObj *pNew = createSubqueryObj(pSql, tableIndex, tscJoinQueryCallback, pSupporter, NULL);
if (pNew == NULL) {
return TSDB_CODE_CLI_OUT_OF_MEMORY;
}
pSql->pSubs[pSql->numOfSubs++] = pNew;
assert(pSql->numOfSubs <= pSupporter->pState->numOfTotal);
if (QUERY_IS_JOIN_QUERY(pCmd->type)) {
addGroupInfoForSubquery(pSql, pNew, tableIndex);
......@@ -694,8 +695,6 @@ int32_t tscLaunchJoinSubquery(SSqlObj *pSql, int16_t tableIndex, int16_t vnodeId
pExpr->param->i64Key = tagColIndex;
pExpr->numOfParams = 1;
addRequiredTagColumn(pCmd, tagColIndex, 0);
// add the filter tag column
for (int32_t i = 0; i < pSupporter->colList.numOfCols; ++i) {
SColumnBase *pColBase = &pSupporter->colList.pColList[i];
......@@ -707,7 +706,11 @@ int32_t tscLaunchJoinSubquery(SSqlObj *pSql, int16_t tableIndex, int16_t vnodeId
} else {
pNew->cmd.type |= TSDB_QUERY_TYPE_SUBQUERY;
}
#ifdef _DEBUG_VIEW
tscPrintSelectClause(&pNew->cmd);
#endif
return tscProcessSql(pNew);
}
......@@ -774,7 +777,7 @@ int tscProcessSql(SSqlObj *pSql) {
pSql->index = pMeterMetaInfo->pMeterMeta->index;
} else { // it must be the parent SSqlObj for super table query
if ((pSql->cmd.type & TSDB_QUERY_TYPE_SUBQUERY) != 0) {
int32_t idx = pSql->cmd.vnodeIdx;
int32_t idx = pMeterMetaInfo->vnodeIndex;
SVnodeSidList *pSidList = tscGetVnodeSidList(pMeterMetaInfo->pMetricMeta, idx);
pSql->index = pSidList->index;
}
......@@ -802,7 +805,7 @@ int tscProcessSql(SSqlObj *pSql) {
return pSql->res.code;
}
int32_t code = tscLaunchJoinSubquery(pSql, i, 0, pSupporter);
int32_t code = tscLaunchJoinSubquery(pSql, i, pSupporter);
if (code != TSDB_CODE_SUCCESS) { // failed to create subquery object, quit query
tscDestroyJoinSupporter(pSupporter);
pSql->res.code = TSDB_CODE_CLI_OUT_OF_MEMORY;
......@@ -944,7 +947,7 @@ int tscLaunchMetricSubQueries(SSqlObj *pSql) {
trs->pOrderDescriptor = pDesc;
trs->pState = pState;
trs->localBuffer = (tFilePage *)calloc(1, nBufferSize + sizeof(tFilePage));
trs->vnodeIdx = i;
trs->subqueryIndex = i;
trs->pParentSqlObj = pSql;
trs->pFinalColModel = pModel;
......@@ -971,7 +974,7 @@ int tscLaunchMetricSubQueries(SSqlObj *pSql) {
pNew->cmd.tsBuf = tsBufClone(pSql->cmd.tsBuf);
}
tscTrace("%p sub:%p launch subquery.orderOfSub:%d", pSql, pNew, pNew->cmd.vnodeIdx);
tscTrace("%p sub:%p launch subquery.orderOfSub:%d", pSql, pNew, trs->subqueryIndex);
tscProcessSql(pNew);
}
......@@ -1020,7 +1023,7 @@ static void tscAbortFurtherRetryRetrieval(SRetrieveSupport *trsupport, TAOS_RES
static void tscHandleSubRetrievalError(SRetrieveSupport *trsupport, SSqlObj *pSql, int numOfRows) {
SSqlObj *pPObj = trsupport->pParentSqlObj;
int32_t idx = trsupport->vnodeIdx;
int32_t subqueryIndex = trsupport->subqueryIndex;
assert(pSql != NULL);
......@@ -1035,27 +1038,27 @@ static void tscHandleSubRetrievalError(SRetrieveSupport *trsupport, SSqlObj *pSq
pSql->res.numOfRows = 0;
trsupport->numOfRetry = MAX_NUM_OF_SUBQUERY_RETRY; // disable retry efforts
tscTrace("%p query is cancelled, sub:%p, orderOfSub:%d abort retrieve, code:%d", trsupport->pParentSqlObj, pSql,
trsupport->vnodeIdx, trsupport->pState->code);
subqueryIndex, trsupport->pState->code);
}
if (numOfRows >= 0) { // current query is successful, but other sub query failed, still abort current query.
tscTrace("%p sub:%p retrieve numOfRows:%d,orderOfSub:%d", pPObj, pSql, numOfRows, idx);
tscError("%p sub:%p abort further retrieval due to other queries failure,orderOfSub:%d,code:%d", pPObj, pSql, idx,
trsupport->pState->code);
tscTrace("%p sub:%p retrieve numOfRows:%d,orderOfSub:%d", pPObj, pSql, numOfRows, subqueryIndex);
tscError("%p sub:%p abort further retrieval due to other queries failure,orderOfSub:%d,code:%d", pPObj, pSql,
subqueryIndex, trsupport->pState->code);
} else {
if (trsupport->numOfRetry++ < MAX_NUM_OF_SUBQUERY_RETRY && trsupport->pState->code == TSDB_CODE_SUCCESS) {
/*
* current query failed, and the retry count is less than the available
* count, retry query clear previous retrieved data, then launch a new sub query
*/
tExtMemBufferClear(trsupport->pExtMemBuffer[idx]);
tExtMemBufferClear(trsupport->pExtMemBuffer[subqueryIndex]);
// clear local saved number of results
trsupport->localBuffer->numOfElems = 0;
pthread_mutex_unlock(&trsupport->queryMutex);
tscTrace("%p sub:%p retrieve failed, code:%d, orderOfSub:%d, retry:%d", trsupport->pParentSqlObj, pSql, numOfRows,
idx, trsupport->numOfRetry);
subqueryIndex, trsupport->numOfRetry);
SSqlObj *pNew = tscCreateSqlObjForSubquery(trsupport->pParentSqlObj, trsupport, pSql);
if (pNew == NULL) {
......@@ -1072,7 +1075,7 @@ static void tscHandleSubRetrievalError(SRetrieveSupport *trsupport, SSqlObj *pSq
} else { // reach the maximum retry count, abort
atomic_val_compare_exchange_32(&trsupport->pState->code, TSDB_CODE_SUCCESS, numOfRows);
tscError("%p sub:%p retrieve failed,code:%d,orderOfSub:%d failed.no more retry,set global code:%d", pPObj, pSql,
numOfRows, idx, trsupport->pState->code);
numOfRows, subqueryIndex, trsupport->pState->code);
}
}
......@@ -1115,13 +1118,12 @@ static void tscHandleSubRetrievalError(SRetrieveSupport *trsupport, SSqlObj *pSq
void tscRetrieveFromVnodeCallBack(void *param, TAOS_RES *tres, int numOfRows) {
SRetrieveSupport *trsupport = (SRetrieveSupport *)param;
int32_t idx = trsupport->vnodeIdx;
int32_t idx = trsupport->subqueryIndex;
SSqlObj * pPObj = trsupport->pParentSqlObj;
tOrderDescriptor *pDesc = trsupport->pOrderDescriptor;
SSqlObj *pSql = (SSqlObj *)tres;
if (pSql == NULL) {
/* sql object has been released in error process, return immediately */
if (pSql == NULL) { // sql object has been released in error process, return immediately
tscTrace("%p subquery has been released, idx:%d, abort", pPObj, idx);
return;
}
......@@ -1172,7 +1174,7 @@ void tscRetrieveFromVnodeCallBack(void *param, TAOS_RES *tres, int numOfRows) {
} else { // all data has been retrieved to client
/* data in from current vnode is stored in cache and disk */
uint32_t numOfRowsFromVnode =
trsupport->pExtMemBuffer[pCmd->vnodeIdx]->numOfAllElems + trsupport->localBuffer->numOfElems;
trsupport->pExtMemBuffer[idx]->numOfAllElems + trsupport->localBuffer->numOfElems;
tscTrace("%p sub:%p all data retrieved from ip:%u,vid:%d, numOfRows:%d, orderOfSub:%d", pPObj, pSql, pSvd->ip,
pSvd->vnode, numOfRowsFromVnode, idx);
......@@ -1285,10 +1287,10 @@ void tscKillMetricQuery(SSqlObj *pSql) {
static void tscRetrieveDataRes(void *param, TAOS_RES *tres, int retCode);
static SSqlObj *tscCreateSqlObjForSubquery(SSqlObj *pSql, SRetrieveSupport *trsupport, SSqlObj *prevSqlObj) {
SSqlObj *pNew = createSubqueryObj(pSql, trsupport->vnodeIdx, 0, tscRetrieveDataRes, trsupport, prevSqlObj);
SSqlObj *pNew = createSubqueryObj(pSql, 0, tscRetrieveDataRes, trsupport, prevSqlObj);
if (pNew != NULL) { // the sub query of two-stage super table query
pNew->cmd.type |= TSDB_QUERY_TYPE_STABLE_SUBQUERY;
pSql->pSubs[trsupport->vnodeIdx] = pNew;
pSql->pSubs[trsupport->subqueryIndex] = pNew;
}
return pNew;
......@@ -1298,8 +1300,8 @@ void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code) {
SRetrieveSupport *trsupport = (SRetrieveSupport *)param;
SSqlObj * pSql = (SSqlObj *)tres;
int32_t idx = pSql->cmd.vnodeIdx;
SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(&pSql->cmd, 0);
int32_t idx = pMeterMetaInfo->vnodeIndex;
SVnodeSidList *vnodeInfo = NULL;
SVPeerDesc * pSvd = NULL;
......@@ -1317,7 +1319,7 @@ void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code) {
code = trsupport->pState->code;
}
tscTrace("%p query cancelled or failed, sub:%p, orderOfSub:%d abort, code:%d", trsupport->pParentSqlObj, pSql,
trsupport->vnodeIdx, code);
trsupport->subqueryIndex, code);
}
/*
......@@ -1337,7 +1339,7 @@ void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code) {
SSqlObj *pNew = tscCreateSqlObjForSubquery(trsupport->pParentSqlObj, trsupport, pSql);
if (pNew == NULL) {
tscError("%p sub:%p failed to create new subquery due to out of memory, abort retry, vid:%d, orderOfSub:%d",
trsupport->pParentSqlObj, pSql, pSvd->vnode, trsupport->vnodeIdx);
trsupport->pParentSqlObj, pSql, pSvd->vnode, trsupport->subqueryIndex);
trsupport->pState->code = -TSDB_CODE_CLI_OUT_OF_MEMORY;
trsupport->numOfRetry = MAX_NUM_OF_SUBQUERY_RETRY;
......@@ -1353,17 +1355,17 @@ void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code) {
if (vnodeInfo != NULL) {
tscTrace("%p sub:%p query failed,ip:%u,vid:%d,orderOfSub:%d,global code:%d", trsupport->pParentSqlObj, pSql,
vnodeInfo->vpeerDesc[vnodeInfo->index].ip, vnodeInfo->vpeerDesc[vnodeInfo->index].vnode,
trsupport->vnodeIdx, trsupport->pState->code);
trsupport->subqueryIndex, trsupport->pState->code);
} else {
tscTrace("%p sub:%p query failed,orderOfSub:%d,global code:%d", trsupport->pParentSqlObj, pSql,
trsupport->vnodeIdx, trsupport->pState->code);
trsupport->subqueryIndex, trsupport->pState->code);
}
tscRetrieveFromVnodeCallBack(param, tres, trsupport->pState->code);
} else { // success, proceed to retrieve data from dnode
tscTrace("%p sub:%p query complete,ip:%u,vid:%d,orderOfSub:%d,retrieve data", trsupport->pParentSqlObj, pSql,
vnodeInfo->vpeerDesc[vnodeInfo->index].ip, vnodeInfo->vpeerDesc[vnodeInfo->index].vnode,
trsupport->vnodeIdx);
trsupport->subqueryIndex);
taos_fetch_rows_a(tres, tscRetrieveFromVnodeCallBack, param);
}
......@@ -1438,7 +1440,7 @@ void tscUpdateVnodeInQueryMsg(SSqlObj *pSql, char *buf) {
pQueryMsg->vnode = htons(pMeterMeta->vpeerDesc[pSql->index].vnode);
} else { // query on metric
SMetricMeta * pMetricMeta = pMeterMetaInfo->pMetricMeta;
SVnodeSidList *pVnodeSidList = tscGetVnodeSidList(pMetricMeta, pCmd->vnodeIdx);
SVnodeSidList *pVnodeSidList = tscGetVnodeSidList(pMetricMeta, pMeterMetaInfo->vnodeIndex);
pQueryMsg->vnode = htons(pVnodeSidList->vpeerDesc[pSql->index].vnode);
}
}
......@@ -1461,7 +1463,7 @@ static int32_t tscEstimateQueryMsgSize(SSqlCmd *pCmd) {
SMetricMeta *pMetricMeta = pMeterMetaInfo->pMetricMeta;
SVnodeSidList *pVnodeSidList = tscGetVnodeSidList(pMetricMeta, pCmd->vnodeIdx);
SVnodeSidList *pVnodeSidList = tscGetVnodeSidList(pMetricMeta, pMeterMetaInfo->vnodeIndex);
int32_t meterInfoSize = (pMetricMeta->tagLen + sizeof(SMeterSidExtInfo)) * pVnodeSidList->numOfSids;
int32_t outputColumnSize = pCmd->fieldsInfo.numOfOutputCols * sizeof(SSqlFuncExprMsg);
......@@ -1506,12 +1508,12 @@ int tscBuildQueryMsg(SSqlObj *pSql) {
pQueryMsg->numOfTagsCols = 0;
} else { // query on metric
SMetricMeta *pMetricMeta = pMeterMetaInfo->pMetricMeta;
if (pCmd->vnodeIdx < 0) {
tscError("%p error vnodeIdx:%d", pSql, pCmd->vnodeIdx);
if (pMeterMetaInfo->vnodeIndex < 0) {
tscError("%p error vnodeIdx:%d", pSql, pMeterMetaInfo->vnodeIndex);
return -1;
}
SVnodeSidList *pVnodeSidList = tscGetVnodeSidList(pMetricMeta, pCmd->vnodeIdx);
SVnodeSidList *pVnodeSidList = tscGetVnodeSidList(pMetricMeta, pMeterMetaInfo->vnodeIndex);
uint32_t vnodeId = pVnodeSidList->vpeerDesc[pVnodeSidList->index].vnode;
numOfMeters = pVnodeSidList->numOfSids;
......@@ -1693,7 +1695,7 @@ int tscBuildQueryMsg(SSqlObj *pSql) {
pQueryMsg->colNameLen = htonl(len);
// set sids list
tscTrace("%p vid:%d, query on %d meters", pSql, pSql->cmd.vnodeIdx, numOfMeters);
tscTrace("%p vid:%d, query on %d meters", pSql, htons(pQueryMsg->vnode), numOfMeters);
if (UTIL_METER_IS_NOMRAL_METER(pMeterMetaInfo)) {
#ifdef _DEBUG_VIEW
......@@ -1703,7 +1705,7 @@ int tscBuildQueryMsg(SSqlObj *pSql) {
pSMeterTagInfo->sid = htonl(pMeterMeta->sid);
pMsg += sizeof(SMeterSidExtInfo);
} else {
SVnodeSidList *pVnodeSidList = tscGetVnodeSidList(pMetricMeta, pCmd->vnodeIdx);
SVnodeSidList *pVnodeSidList = tscGetVnodeSidList(pMetricMeta, pMeterMetaInfo->vnodeIndex);
for (int32_t i = 0; i < numOfMeters; ++i) {
SMeterSidExtInfo *pMeterTagInfo = (SMeterSidExtInfo *)pMsg;
......@@ -1774,7 +1776,7 @@ int tscBuildQueryMsg(SSqlObj *pSql) {
int32_t numOfBlocks = 0;
if (pCmd->tsBuf != NULL) {
STSVnodeBlockInfo *pBlockInfo = tsBufGetVnodeBlockInfo(pCmd->tsBuf, pCmd->vnodeIdx);
STSVnodeBlockInfo *pBlockInfo = tsBufGetVnodeBlockInfo(pCmd->tsBuf, pMeterMetaInfo->vnodeIndex);
assert(QUERY_IS_JOIN_QUERY(pCmd->type) && pBlockInfo != NULL); // this query should not be sent
// todo refactor
......
......@@ -16,21 +16,21 @@
#include "os.h"
#include "tcache.h"
#include "tlog.h"
#include "tnote.h"
#include "trpc.h"
#include "tscJoinProcess.h"
#include "tscProfile.h"
#include "tscSQLParser.h"
#include "tscSecondaryMerge.h"
#include "tscUtil.h"
#include "tsclient.h"
#include "tscompression.h"
#include "tsocket.h"
#include "tscSQLParser.h"
#include "ttimer.h"
#include "tutil.h"
#include "tnote.h"
TAOS *taos_connect_imp(const char *ip, const char *user, const char *pass, const char *db, uint16_t port, void (*fp)(void *, TAOS_RES *, int),
void *param, void **taos) {
TAOS *taos_connect_imp(const char *ip, const char *user, const char *pass, const char *db, uint16_t port,
void (*fp)(void *, TAOS_RES *, int), void *param, void **taos) {
STscObj *pObj;
taos_init();
......@@ -81,7 +81,7 @@ TAOS *taos_connect_imp(const char *ip, const char *user, const char *pass, const
globalCode = TSDB_CODE_CLI_OUT_OF_MEMORY;
return NULL;
}
memset(pObj, 0, sizeof(STscObj));
pObj->signature = pObj;
......@@ -113,7 +113,7 @@ TAOS *taos_connect_imp(const char *ip, const char *user, const char *pass, const
free(pObj);
return NULL;
}
memset(pSql, 0, sizeof(SSqlObj));
pSql->pTscObj = pObj;
pSql->signature = pSql;
......@@ -162,14 +162,14 @@ TAOS *taos_connect(const char *ip, const char *user, const char *pass, const cha
void *taos = taos_connect_imp(ip, user, pass, db, port, NULL, NULL, NULL);
if (taos != NULL) {
STscObj* pObj = (STscObj*) taos;
STscObj *pObj = (STscObj *)taos;
// version compare only requires the first 3 segments of the version string
int32_t comparedSegments = 3;
char client_version[64] = {0};
char server_version[64] = {0};
int clientVersionNumber[4] = {0};
int serverVersionNumber[4] = {0};
char client_version[64] = {0};
char server_version[64] = {0};
int clientVersionNumber[4] = {0};
int serverVersionNumber[4] = {0};
strcpy(client_version, version);
strcpy(server_version, taos_get_server_info(taos));
......@@ -188,7 +188,7 @@ TAOS *taos_connect(const char *ip, const char *user, const char *pass, const cha
return NULL;
}
for(int32_t i = 0; i < comparedSegments; ++i) {
for (int32_t i = 0; i < comparedSegments; ++i) {
if (clientVersionNumber[i] != serverVersionNumber[i]) {
tscError("taos:%p, the %d-th number of server version:%s not matched with client version:%s, close connection",
taos, i, server_version, version);
......@@ -225,7 +225,7 @@ void taos_close(TAOS *taos) {
}
}
int taos_query_imp(STscObj* pObj, SSqlObj* pSql) {
int taos_query_imp(STscObj *pObj, SSqlObj *pSql) {
SSqlRes *pRes = &pSql->res;
pRes->numOfRows = 1;
......@@ -251,7 +251,7 @@ int taos_query_imp(STscObj* pObj, SSqlObj* pSql) {
} else {
tscError("%p SQL result:%d, %s pObj:%p", pSql, pRes->code, taos_errstr(pObj), pObj);
}
if (pRes->code != TSDB_CODE_SUCCESS) {
tscFreeSqlObjPartial(pSql);
}
......@@ -271,9 +271,10 @@ int taos_query(TAOS *taos, const char *sqlstr) {
size_t sqlLen = strlen(sqlstr);
if (sqlLen > tsMaxSQLStringLen) {
pRes->code = tscInvalidSQLErrMsg(pSql->cmd.payload, "sql too long", NULL); // set the additional error msg for invalid sql
pRes->code =
tscInvalidSQLErrMsg(pSql->cmd.payload, "sql too long", NULL); // set the additional error msg for invalid sql
tscError("%p SQL result:%d, %s pObj:%p", pSql, pRes->code, taos_errstr(taos), pObj);
return pRes->code;
}
......@@ -283,7 +284,7 @@ int taos_query(TAOS *taos, const char *sqlstr) {
if (sql == NULL) {
pRes->code = TSDB_CODE_CLI_OUT_OF_MEMORY;
tscError("%p failed to malloc sql string buffer, reason:%s", pSql, strerror(errno));
tscError("%p SQL result:%d, %s pObj:%p", pSql, pRes->code, taos_errstr(taos), pObj);
return pRes->code;
}
......@@ -451,25 +452,56 @@ static void **getOneRowFromBuf(SSqlObj *pSql) {
return pRes->tsrow;
}
static void **tscJoinResultsetFromBuf(SSqlObj *pSql) {
static bool tscHashRemainDataInSubqueryResultSet(SSqlObj *pSql) {
bool hasData = true;
SSqlCmd *pCmd = &pSql->cmd;
SSqlRes *pRes = &pSql->res;
while (1) {
bool hasData = true;
if (tscProjectionQueryOnMetric(pCmd)) {
bool allSubqueryExhausted = true;
for (int32_t i = 0; i < pSql->numOfSubs; ++i) {
SSqlRes *pRes1 = &pSql->pSubs[i]->res;
SSqlCmd *pCmd1 = &pSql->pSubs[i]->cmd;
SMeterMetaInfo *pMetaInfo = tscGetMeterMetaInfo(pCmd1, 0);
assert(pCmd1->numOfTables == 1);
/*
* if the global limitation is not reached, and current result has not exhausted, or next more vnodes are
* available, go on
*/
if (pMetaInfo->vnodeIndex < pMetaInfo->pMetricMeta->numOfVnodes && pRes1->row < pRes1->numOfRows &&
(!tscHasReachLimitation(pSql->pSubs[i]))) {
allSubqueryExhausted = false;
break;
}
}
// in case inner join, if any subquery exhausted, query completed
if (pRes1->numOfRows == 0) {
hasData = !allSubqueryExhausted;
} else { // otherwise, in case inner join, if any subquery exhausted, query completed.
for (int32_t i = 0; i < pSql->numOfSubs; ++i) {
SSqlRes *pRes1 = &pSql->pSubs[i]->res;
if ((pRes1->row >= pRes1->numOfRows && tscHasReachLimitation(pSql->pSubs[i]) &&
tscProjectionQueryOnTable(&pSql->pSubs[i]->cmd)) ||
(pRes1->numOfRows == 0)) {
hasData = false;
break;
}
}
}
return hasData;
}
static void **tscJoinResultsetFromBuf(SSqlObj *pSql) {
SSqlCmd *pCmd = &pSql->cmd;
SSqlRes *pRes = &pSql->res;
if (!hasData) { // free all sub sqlobj
tscTrace("%p one subquery exhausted, free other %d subquery", pSql, pSql->numOfSubs - 1);
while (1) {
if (!tscHashRemainDataInSubqueryResultSet(pSql)) { // free all sub sqlobj
tscTrace("%p at least one subquery exhausted, free all other %d subqueries", pSql, pSql->numOfSubs - 1);
SSubqueryState *pState = NULL;
......@@ -487,41 +519,32 @@ static void **tscJoinResultsetFromBuf(SSqlObj *pSql) {
}
if (pRes->tsrow == NULL) {
pRes->tsrow = malloc(sizeof(void *) * pCmd->exprsInfo.numOfExprs);
pRes->tsrow = malloc(POINTER_BYTES * pCmd->exprsInfo.numOfExprs);
}
bool success = false;
if (pSql->numOfSubs >= 2) {
// do merge result
if (pSql->numOfSubs >= 2) { // do merge result
SSqlRes *pRes1 = &pSql->pSubs[0]->res;
SSqlRes *pRes2 = &pSql->pSubs[1]->res;
while (pRes1->row < pRes1->numOfRows && pRes2->row < pRes2->numOfRows) {
if (pRes1->row < pRes1->numOfRows && pRes2->row < pRes2->numOfRows) {
doSetResultRowData(pSql->pSubs[0]);
doSetResultRowData(pSql->pSubs[1]);
TSKEY key1 = *(TSKEY *)pRes1->tsrow[0];
TSKEY key2 = *(TSKEY *)pRes2->tsrow[0];
if (key1 == key2) {
success = true;
pRes1->row++;
pRes2->row++;
break;
} else if (key1 < key2) {
pRes1->row++;
} else if (key1 > key2) {
pRes2->row++;
}
// TSKEY key1 = *(TSKEY *)pRes1->tsrow[0];
// TSKEY key2 = *(TSKEY *)pRes2->tsrow[0];
// printf("first:%lld, second:%lld\n", key1, key2);
success = true;
pRes1->row++;
pRes2->row++;
}
} else {
} else { // only one subquery
SSqlRes *pRes1 = &pSql->pSubs[0]->res;
doSetResultRowData(pSql->pSubs[0]);
success = (pRes1->row++ < pRes1->numOfRows);
}
if (success) {
if (success) { // current row of final output has been built, return to app
for (int32_t i = 0; i < pCmd->exprsInfo.numOfExprs; ++i) {
int32_t tableIndex = pRes->pColumnIndex[i].tableIndex;
int32_t columnIndex = pRes->pColumnIndex[i].columnIndex;
......@@ -531,7 +554,7 @@ static void **tscJoinResultsetFromBuf(SSqlObj *pSql) {
}
break;
} else {
} else { // continue retrieve data from vnode
tscFetchDatablockFromSubquery(pSql);
if (pRes->code != TSDB_CODE_SUCCESS) {
return NULL;
......@@ -553,9 +576,12 @@ TAOS_ROW taos_fetch_row_impl(TAOS_RES *res) {
if (pCmd->command == TSDB_SQL_METRIC_JOIN_RETRIEVE) {
tscFetchDatablockFromSubquery(pSql);
if (pRes->code == TSDB_CODE_SUCCESS) {
tscTrace("%p data from all subqueries have been retrieved to client", pSql);
return tscJoinResultsetFromBuf(pSql);
} else {
tscTrace("%p retrieve data from subquery failed, code:%d", pSql, pRes->code);
return NULL;
}
......@@ -596,7 +622,7 @@ TAOS_ROW taos_fetch_row(TAOS_RES *res) {
SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, 0);
// reach the maximum number of output rows, abort
if (pCmd->globalLimit > 0 && pRes->numOfTotal >= pCmd->globalLimit) {
if (tscHasReachLimitation(pSql)) {
return NULL;
}
......@@ -609,7 +635,15 @@ TAOS_ROW taos_fetch_row(TAOS_RES *res) {
assert((pRes->offset >= 0 && pRes->numOfRows == 0) || (pRes->offset == 0 && pRes->numOfRows >= 0));
if ((++pCmd->vnodeIdx) < pMeterMetaInfo->pMetricMeta->numOfVnodes) {
/*
* For project query with super table join, the numOfSub is equalled to the number of all subqueries, so
* we need to reset the value of numOfSubs to be 0.
*
* For super table join with projection query, if anyone of the subquery is exhausted, the query completed.
*/
pSql->numOfSubs = 0;
if ((++pMeterMetaInfo->vnodeIndex) < pMeterMetaInfo->pMetricMeta->numOfVnodes) {
pCmd->command = TSDB_SQL_SELECT;
assert(pSql->fp == NULL);
tscProcessSql(pSql);
......@@ -617,7 +651,7 @@ TAOS_ROW taos_fetch_row(TAOS_RES *res) {
}
// check!!!
if (rows != NULL || pCmd->vnodeIdx >= pMeterMetaInfo->pMetricMeta->numOfVnodes) {
if (rows != NULL || pMeterMetaInfo->vnodeIndex >= pMeterMetaInfo->pMetricMeta->numOfVnodes) {
break;
}
}
......@@ -643,7 +677,7 @@ int taos_fetch_block(TAOS_RES *res, TAOS_ROW *rows) {
nRows = taos_fetch_block_impl(res, rows);
while (*rows == NULL && tscProjectionQueryOnMetric(pCmd)) {
/* reach the maximum number of output rows, abort */
if (pCmd->globalLimit > 0 && pRes->numOfTotal >= pCmd->globalLimit) {
if (tscHasReachLimitation(pSql)) {
return 0;
}
......@@ -653,8 +687,7 @@ int taos_fetch_block(TAOS_RES *res, TAOS_ROW *rows) {
pCmd->limit.limit = pSql->cmd.globalLimit - pRes->numOfTotal;
pCmd->limit.offset = pRes->offset;
if ((++pSql->cmd.vnodeIdx) < pMeterMetaInfo->pMetricMeta->numOfVnodes) {
if ((++pMeterMetaInfo->vnodeIndex) < pMeterMetaInfo->pMetricMeta->numOfVnodes) {
pSql->cmd.command = TSDB_SQL_SELECT;
assert(pSql->fp == NULL);
tscProcessSql(pSql);
......@@ -662,7 +695,7 @@ int taos_fetch_block(TAOS_RES *res, TAOS_ROW *rows) {
}
// check!!!
if (*rows != NULL || pCmd->vnodeIdx >= pMeterMetaInfo->pMetricMeta->numOfVnodes) {
if (*rows != NULL || pMeterMetaInfo->vnodeIndex >= pMeterMetaInfo->pMetricMeta->numOfVnodes) {
break;
}
}
......@@ -888,12 +921,11 @@ int taos_print_row(char *str, TAOS_ROW row, TAOS_FIELD *fields, int num_fields)
size_t xlen = strlen(row[i]);
size_t trueLen = MIN(xlen, fields[i].bytes);
memcpy(str + len, (char*) row[i], trueLen);
memcpy(str + len, (char *)row[i], trueLen);
str[len + trueLen] = ' ';
len += (trueLen + 1);
}
break;
} break;
case TSDB_DATA_TYPE_TIMESTAMP:
len += sprintf(str + len, "%lld ", *((int64_t *)row[i]));
......@@ -950,7 +982,7 @@ int taos_validate_sql(TAOS *taos, const char *sql) {
return code;
}
static int tscParseTblNameList(SSqlObj *pSql, const char* tblNameList, int32_t tblListLen) {
static int tscParseTblNameList(SSqlObj *pSql, const char *tblNameList, int32_t tblListLen) {
// must before clean the sqlcmd object
tscRemoveAllMeterMetaInfo(&pSql->cmd, false);
tscCleanSqlCmd(&pSql->cmd);
......@@ -961,11 +993,11 @@ static int tscParseTblNameList(SSqlObj *pSql, const char* tblNameList, int32_t t
pCmd->count = 0;
int code = TSDB_CODE_INVALID_METER_ID;
char *str = (char*) tblNameList;
char *str = (char *)tblNameList;
SMeterMetaInfo *pMeterMetaInfo = tscAddEmptyMeterMetaInfo(pCmd);
if ((code = tscAllocPayload(pCmd, tblListLen+16)) != TSDB_CODE_SUCCESS) {
if ((code = tscAllocPayload(pCmd, tblListLen + 16)) != TSDB_CODE_SUCCESS) {
return code;
}
......@@ -987,7 +1019,7 @@ static int tscParseTblNameList(SSqlObj *pSql, const char* tblNameList, int32_t t
strtrim(tblName);
len = (uint32_t)strlen(tblName);
SSQLToken sToken = {.n = len, .type = TK_ID, .z = tblName};
tSQLGetToken(tblName, &sToken.type);
......@@ -1031,7 +1063,7 @@ static int tscParseTblNameList(SSqlObj *pSql, const char* tblNameList, int32_t t
}
int taos_load_table_info(TAOS *taos, const char *tableNameList) {
const int32_t MAX_TABLE_NAME_LENGTH = 12*1024*1024; // 12MB list
const int32_t MAX_TABLE_NAME_LENGTH = 12 * 1024 * 1024; // 12MB list
STscObj *pObj = (STscObj *)taos;
if (pObj == NULL || pObj->signature != pObj) {
......@@ -1055,7 +1087,7 @@ int taos_load_table_info(TAOS *taos, const char *tableNameList) {
return pRes->code;
}
char* str = calloc(1, tblListLen + 1);
char *str = calloc(1, tblListLen + 1);
if (str == NULL) {
pRes->code = TSDB_CODE_CLI_OUT_OF_MEMORY;
tscError("%p failed to malloc sql string buffer", pSql);
......@@ -1063,7 +1095,7 @@ int taos_load_table_info(TAOS *taos, const char *tableNameList) {
}
strtolower(str, tableNameList);
pRes->code = (uint8_t) tscParseTblNameList(pSql, str, tblListLen);
pRes->code = (uint8_t)tscParseTblNameList(pSql, str, tblListLen);
/*
* set the qhandle to 0 before return in order to erase the qhandle value assigned in the previous successful query.
......
......@@ -244,8 +244,7 @@ bool tscProjectionQueryOnMetric(SSqlCmd* pCmd) {
//for project query, only the following two function is allowed
for (int32_t i = 0; i < pCmd->fieldsInfo.numOfOutputCols; ++i) {
SSqlExpr* pExpr = tscSqlExprGet(pCmd, i);
int32_t functionId = pExpr->functionId;
int32_t functionId = tscSqlExprGet(pCmd, i)->functionId;
if (functionId != TSDB_FUNC_PRJ && functionId != TSDB_FUNC_TAGPRJ &&
functionId != TSDB_FUNC_TAG && functionId != TSDB_FUNC_TS) {
return false;
......@@ -255,6 +254,17 @@ bool tscProjectionQueryOnMetric(SSqlCmd* pCmd) {
return true;
}
bool tscProjectionQueryOnTable(SSqlCmd* pCmd) {
for (int32_t i = 0; i < pCmd->fieldsInfo.numOfOutputCols; ++i) {
int32_t functionId = tscSqlExprGet(pCmd, i)->functionId;
if (functionId != TSDB_FUNC_PRJ && functionId != TSDB_FUNC_TS) {
return false;
}
}
return true;
}
bool tscIsPointInterpQuery(SSqlCmd* pCmd) {
for (int32_t i = 0; i < pCmd->exprsInfo.numOfExprs; ++i) {
SSqlExpr* pExpr = tscSqlExprGet(pCmd, i);
......@@ -1474,7 +1484,11 @@ bool tscShouldFreeAsyncSqlObj(SSqlObj* pSql) {
* data blocks have been submit to vnode.
*/
SDataBlockList* pDataBlocks = pCmd->pDataBlocks;
if (pDataBlocks == NULL || pCmd->vnodeIdx >= pDataBlocks->nSize) {
SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfo(&pSql->cmd, 0);
assert(pSql->cmd.numOfTables == 1);
if (pDataBlocks == NULL || pMeterMetaInfo->vnodeIndex >= pDataBlocks->nSize) {
tscTrace("%p object should be release since all data blocks have been submit", pSql);
return true;
} else {
......@@ -1487,10 +1501,11 @@ bool tscShouldFreeAsyncSqlObj(SSqlObj* pSql) {
}
SMeterMetaInfo* tscGetMeterMetaInfo(SSqlCmd* pCmd, int32_t index) {
if (pCmd == NULL || index >= pCmd->numOfTables || index < 0) {
if (pCmd == NULL || pCmd->numOfTables == 0) {
return NULL;
}
assert(index >= 0 && index <= pCmd->numOfTables && pCmd->pMeterInfo != NULL);
return pCmd->pMeterInfo[index];
}
......@@ -1533,7 +1548,7 @@ SMeterMetaInfo* tscAddMeterMetaInfo(SSqlCmd* pCmd, const char* name, SMeterMeta*
pMeterMetaInfo->numOfTags = numOfTags;
if (tags != NULL) {
memcpy(pMeterMetaInfo->tagColumnIndex, tags, sizeof(int16_t) * numOfTags);
memcpy(pMeterMetaInfo->tagColumnIndex, tags, sizeof(pMeterMetaInfo->tagColumnIndex[0]) * numOfTags);
}
pCmd->numOfTables += 1;
......@@ -1587,13 +1602,13 @@ void tscResetForNextRetrieve(SSqlRes* pRes) {
pRes->numOfRows = 0;
}
SSqlObj* createSubqueryObj(SSqlObj* pSql, int32_t vnodeIndex, int16_t tableIndex, void (*fp)(), void* param,
SSqlObj* pPrevSql) {
SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void* param, SSqlObj* pPrevSql) {
SSqlCmd* pCmd = &pSql->cmd;
SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, tableIndex);
SSqlObj* pNew = (SSqlObj*)calloc(1, sizeof(SSqlObj));
if (pNew == NULL) {
tscError("%p new subquery failed, vnodeIdx:%d, tableIndex:%d", pSql, vnodeIndex, tableIndex);
tscError("%p new subquery failed, tableIndex:%d, vnodeIndex:%d", pSql, tableIndex, pMeterMetaInfo->vnodeIndex);
return NULL;
}
......@@ -1602,7 +1617,7 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int32_t vnodeIndex, int16_t tableIndex
pNew->sqlstr = strdup(pSql->sqlstr);
if (pNew->sqlstr == NULL) {
tscError("%p new subquery failed, vnodeIdx:%d, tableIndex:%d", pSql, vnodeIndex, tableIndex);
tscError("%p new subquery failed, tableIndex:%d, vnodeIndex:%d", pSql, tableIndex, pMeterMetaInfo->vnodeIndex);
free(pNew);
return NULL;
......@@ -1627,15 +1642,13 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int32_t vnodeIndex, int16_t tableIndex
tscTagCondCopy(&pNew->cmd.tagCond, &pCmd->tagCond);
if (tscAllocPayload(&pNew->cmd, TSDB_DEFAULT_PAYLOAD_SIZE) != TSDB_CODE_SUCCESS) {
tscError("%p new subquery failed, vnodeIdx:%d, tableIndex:%d", pSql, vnodeIndex, tableIndex);
tscError("%p new subquery failed, tableIndex:%d, vnodeIndex:%d", pSql, tableIndex, pMeterMetaInfo->vnodeIndex);
tscFreeSqlObj(pNew);
return NULL;
}
tscColumnBaseInfoCopy(&pNew->cmd.colList, &pCmd->colList, (int16_t)tableIndex);
SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, tableIndex);
// set the correct query type
if (pPrevSql != NULL) {
pNew->cmd.type = pPrevSql->cmd.type;
......@@ -1666,12 +1679,15 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int32_t vnodeIndex, int16_t tableIndex
pNew->fp = fp;
pNew->param = param;
pNew->cmd.vnodeIdx = vnodeIndex;
SMeterMetaInfo* pMetermetaInfo = tscGetMeterMetaInfo(pCmd, tableIndex);
char key[TSDB_MAX_TAGS_LEN + 1] = {0};
tscGetMetricMetaCacheKey(pCmd, key, pMetermetaInfo->pMeterMeta->uid);
#ifdef _DEBUG_VIEW
printf("the metricmeta key is:%s\n", key);
#endif
char* name = pMeterMetaInfo->name;
SMeterMetaInfo* pFinalInfo = NULL;
......@@ -1695,8 +1711,8 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int32_t vnodeIndex, int16_t tableIndex
assert(pFinalInfo->pMetricMeta != NULL);
}
tscTrace("%p new subquery %p, vnodeIdx:%d, tableIndex:%d, type:%d", pSql, pNew, vnodeIndex, tableIndex,
pNew->cmd.type);
tscTrace("%p new subquery %p, tableIndex:%d, vnodeIdx:%d, type:%d", pSql, pNew, tableIndex,
pMeterMetaInfo->vnodeIndex, pNew->cmd.type);
return pNew;
}
......@@ -1765,3 +1781,12 @@ int32_t tscInvalidSQLErrMsg(char *msg, const char *additionalInfo, const char *s
return TSDB_CODE_INVALID_SQL;
}
bool tscHasReachLimitation(SSqlObj* pSql) {
assert(pSql != NULL && pSql->cmd.globalLimit != 0);
SSqlCmd* pCmd = &pSql->cmd;
SSqlRes* pRes = &pSql->res;
return (pCmd->globalLimit > 0 && pRes->numOfTotal >= pCmd->globalLimit);
}
......@@ -978,12 +978,19 @@ int mgmtProcessCreateTableMsg(char *pMsg, int msgLen, SConnObj *pConn) {
if (code == 1) {
//mTrace("table:%s, wait vgroup create finish", pCreate->meterId, code);
}
else if (code != 0) {
mError("table:%s, failed to create table, code:%d", pCreate->meterId, code);
} else if (code != TSDB_CODE_SUCCESS) {
if (code == TSDB_CODE_TABLE_ALREADY_EXIST) { // table already created when the second attempt to create table
STabObj* pMeter = mgmtGetMeter(pCreate->meterId);
assert(pMeter != NULL);
mWarn("table:%s, table already created, failed to create table, ts:%lld, code:%d", pCreate->meterId,
pMeter->createdTime, code);
} else { // other errors
mError("table:%s, failed to create table, code:%d", pCreate->meterId, code);
}
} else {
mTrace("table:%s, table is created by %s", pCreate->meterId, pConn->pUser->user);
//mLPrint("meter:%s is created by %s", pCreate->meterId, pConn->pUser->user);
}
taosSendSimpleRsp(pConn->thandle, TSDB_MSG_TYPE_CREATE_TABLE_RSP, code);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册