提交 3b22abb1 编写于 作者: H Haojun Liao

[td-255] refactor code.

上级 6d659815
......@@ -13,8 +13,8 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_TSCLOCALMERGE_H
#define TDENGINE_TSCLOCALMERGE_H
#ifndef TDENGINE_TSCGLOBALMERGE_H
#define TDENGINE_TSCGLOBALMERGE_H
#ifdef __cplusplus
extern "C" {
......@@ -24,7 +24,7 @@ extern "C" {
#include "qFill.h"
#include "taosmsg.h"
#include "tlosertree.h"
#include "tsclient.h"
#include "qExecutor.h"
#define MAX_NUM_OF_SUBQUERY_RETRY 3
......@@ -38,7 +38,7 @@ typedef struct SLocalDataSource {
tFilePage filePage;
} SLocalDataSource;
typedef struct SLocalMerger {
typedef struct SGlobalMerger {
SLocalDataSource **pLocalDataSrc;
int32_t numOfBuffer;
int32_t numOfCompleted;
......@@ -48,20 +48,22 @@ typedef struct SLocalMerger {
tOrderDescriptor *pDesc;
tExtMemBuffer **pExtMemBuffer; // disk-based buffer
char *buf; // temp buffer
} SLocalMerger;
} SGlobalMerger;
struct SSqlObj;
typedef struct SRetrieveSupport {
tExtMemBuffer ** pExtMemBuffer; // for build loser tree
tOrderDescriptor *pOrderDescriptor;
int32_t subqueryIndex; // index of current vnode in vnode list
SSqlObj * pParentSql;
struct SSqlObj *pParentSql;
tFilePage * localBuffer; // temp buffer, there is a buffer for each vnode to
uint32_t numOfRetry; // record the number of retry times
} SRetrieveSupport;
int32_t tscLocalReducerEnvCreate(SQueryInfo* pQueryInfo, tExtMemBuffer ***pMemBuffer, int32_t numOfSub, tOrderDescriptor **pDesc, uint32_t nBufferSize, int64_t id);
int32_t tscCreateGlobalMergerEnv(SQueryInfo* pQueryInfo, tExtMemBuffer ***pMemBuffer, int32_t numOfSub, tOrderDescriptor **pDesc, uint32_t nBufferSize, int64_t id);
void tscLocalReducerEnvDestroy(tExtMemBuffer **pMemBuffer, tOrderDescriptor *pDesc, int32_t numOfVnodes);
void tscDestroyGlobalMergerEnv(tExtMemBuffer **pMemBuffer, tOrderDescriptor *pDesc, int32_t numOfVnodes);
int32_t saveToBuffer(tExtMemBuffer *pMemoryBuf, tOrderDescriptor *pDesc, tFilePage *pPage, void *data,
int32_t numOfRows, int32_t orderType);
......@@ -71,13 +73,13 @@ int32_t tscFlushTmpBuffer(tExtMemBuffer *pMemoryBuf, tOrderDescriptor *pDesc, tF
/*
* create local reducer to launch the second-stage reduce process at client site
*/
int32_t tscCreateLocalMerger(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrderDescriptor *pDesc,
SQueryInfo *pQueryInfo, SLocalMerger **pMerger, int64_t id);
int32_t tscCreateGlobalMerger(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrderDescriptor *pDesc,
SQueryInfo *pQueryInfo, SGlobalMerger **pMerger, int64_t id);
void tscDestroyLocalMerger(SLocalMerger* pLocalMerger);
void tscDestroyGlobalMerger(SGlobalMerger* pMerger);
#ifdef __cplusplus
}
#endif
#endif // TDENGINE_TSCLOCALMERGE_H
#endif // TDENGINE_TSCGLOBALMERGE_H
......@@ -20,13 +20,13 @@
extern "C" {
#endif
#include "tsched.h"
#include "exception.h"
#include "os.h"
#include "qExtbuffer.h"
#include "taosdef.h"
#include "tbuffer.h"
#include "tscLocalMerge.h"
#include "tscGlobalmerge.h"
#include "tsched.h"
#include "tsclient.h"
#define UTIL_TABLE_IS_SUPER_TABLE(metaInfo) \
......@@ -63,7 +63,7 @@ typedef struct SJoinSupporter {
SArray* exprList;
SFieldInfo fieldsInfo;
STagCond tagCond;
SGroupbyExpr groupInfo; // group by info
SGroupbyExpr groupInfo; // group by info
struct STSBuf* pTSBuf; // the TSBuf struct that holds the compressed timestamp array
FILE* f; // temporary file in order to create TSBuf
char path[PATH_MAX]; // temporary file path, todo dynamic allocate memory
......@@ -108,7 +108,7 @@ void* tscDestroyBlockArrayList(SArray* pDataBlockList);
void* tscDestroyBlockHashTable(SHashObj* pBlockHashTable, bool removeMeta);
int32_t tscCopyDataBlockToPayload(SSqlObj* pSql, STableDataBlocks* pDataBlock);
int32_t tscMergeTableDataBlocks(SSqlObj* pSql, bool freeBlockMap);
int32_t tscMergeTableDataBlocks(SInsertStatementParam *pInsertParam, bool freeBlockMap);
int32_t tscGetDataBlockFromList(SHashObj* pHashList, int64_t id, int32_t size, int32_t startOffset, int32_t rowSize, SName* pName, STableMeta* pTableMeta,
STableDataBlocks** dataBlocks, SArray* pBlockList);
......@@ -282,6 +282,7 @@ void tscSVgroupInfoCopy(SVgroupInfo* dst, const SVgroupInfo* src);
SSqlObj* createSimpleSubObj(SSqlObj* pSql, __async_cb_func_t fp, void* param, int32_t cmd);
void registerSqlObj(SSqlObj* pSql);
void tscInitResForMerge(SSqlRes* pRes);
SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, __async_cb_func_t fp, void* param, int32_t cmd, SSqlObj* pPrevSql);
void addGroupInfoForSubquery(SSqlObj* pParentObj, SSqlObj* pSql, int32_t subClauseIndex, int32_t tableIndex);
......@@ -326,12 +327,15 @@ SVgroupsInfo* tscVgroupsInfoDup(SVgroupsInfo* pVgroupsInfo);
int32_t tscCreateQueryFromQueryInfo(SQueryInfo* pQueryInfo, SQueryAttr* pQueryAttr, void* addr);
void tsCreateSQLFunctionCtx(SQueryInfo* pQueryInfo, SQLFunctionCtx* pCtx, SSchema* pSchema);
void* createQInfoFromQueryNode(SQueryInfo* pQueryInfo, SExprInfo* pExprs, STableGroupInfo* pTableGroupInfo, SOperatorInfo* pOperator, char* sql, void* addr, int32_t stage);
void* createQInfoFromQueryNode(SQueryInfo* pQueryInfo, STableGroupInfo* pTableGroupInfo, SOperatorInfo* pOperator, char* sql, void* addr, int32_t stage);
void* malloc_throw(size_t size);
void* calloc_throw(size_t nmemb, size_t size);
char* strdup_throw(const char* str);
bool vgroupInfoIdentical(SNewVgroupInfo *pExisted, SVgroupMsg* src);
SNewVgroupInfo createNewVgroupInfo(SVgroupMsg *pVgroupMsg);
#ifdef __cplusplus
}
#endif
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_TSCHEMAUTIL_H
#define TDENGINE_TSCHEMAUTIL_H
#ifdef __cplusplus
extern "C" {
#endif
#include "taosmsg.h"
#include "tsclient.h"
#include "ttoken.h"
/**
* get the number of tags of this table
* @param pTableMeta
* @return
*/
int32_t tscGetNumOfTags(const STableMeta* pTableMeta);
/**
* get the number of columns of this table
* @param pTableMeta
* @return
*/
int32_t tscGetNumOfColumns(const STableMeta* pTableMeta);
/**
* get the basic info of this table
* @param pTableMeta
* @return
*/
STableComInfo tscGetTableInfo(const STableMeta* pTableMeta);
/**
* get the schema
* @param pTableMeta
* @return
*/
SSchema* tscGetTableSchema(const STableMeta* pTableMeta);
/**
* get the tag schema
* @param pMeta
* @return
*/
SSchema *tscGetTableTagSchema(const STableMeta *pMeta);
/**
* get the column schema according to the column index
* @param pMeta
* @param colIndex
* @return
*/
SSchema *tscGetTableColumnSchema(const STableMeta *pMeta, int32_t colIndex);
/**
* get the column schema according to the column id
* @param pTableMeta
* @param colId
* @return
*/
SSchema* tscGetColumnSchemaById(STableMeta* pTableMeta, int16_t colId);
/**
* create the table meta from the msg
* @param pTableMetaMsg
* @param size size of the table meta
* @return
*/
STableMeta* tscCreateTableMetaFromMsg(STableMetaMsg* pTableMetaMsg);
bool vgroupInfoIdentical(SNewVgroupInfo *pExisted, SVgroupMsg* src);
SNewVgroupInfo createNewVgroupInfo(SVgroupMsg *pVgroupMsg);
#ifdef __cplusplus
}
#endif
#endif // TDENGINE_TSCHEMAUTIL_H
......@@ -40,17 +40,9 @@ extern "C" {
// forward declaration
struct SSqlInfo;
struct SLocalMerger;
typedef void (*__async_cb_func_t)(void *param, TAOS_RES *tres, int32_t numOfRows);
typedef struct STableComInfo {
uint8_t numOfTags;
uint8_t precision;
int16_t numOfColumns;
int32_t rowSize;
} STableComInfo;
typedef struct SNewVgroupInfo {
int32_t vgId;
int8_t inUse;
......@@ -66,34 +58,6 @@ typedef struct CChildTableMeta {
uint64_t suid; // super table id
} CChildTableMeta;
typedef struct STableMeta {
int32_t vgId;
STableId id;
uint8_t tableType;
char sTableName[TSDB_TABLE_FNAME_LEN]; // super table name
uint64_t suid; // super table id
int16_t sversion;
int16_t tversion;
STableComInfo tableInfo;
SSchema schema[]; // if the table is TSDB_CHILD_TABLE, schema is acquired by super table meta info
} STableMeta;
typedef struct STableMetaInfo {
STableMeta *pTableMeta; // table meta, cached in client side and acquired by name
uint32_t tableMetaSize;
SVgroupsInfo *vgroupList;
SArray *pVgroupTables; // SArray<SVgroupTableInfo>
/*
* 1. keep the vgroup index during the multi-vnode super table projection query
* 2. keep the vgroup index for multi-vnode insertion
*/
int32_t vgroupIndex;
SName name;
char aliasName[TSDB_TABLE_NAME_LEN]; // alias name of table specified in query sql
SArray *tagColList; // SArray<SColumn*>, involved tag columns
} STableMetaInfo;
typedef struct SColumnIndex {
int16_t tableIndex;
int16_t columnIndex;
......@@ -111,44 +75,6 @@ typedef struct SInternalField {
SExprInfo *pExpr;
} SInternalField;
typedef struct SFieldInfo {
int16_t numOfOutput; // number of column in result
TAOS_FIELD* final;
SArray *internalField; // SArray<SInternalField>
} SFieldInfo;
typedef struct SCond {
uint64_t uid;
int32_t len; // length of tag query condition data
char * cond;
} SCond;
typedef struct SJoinNode {
uint64_t uid;
int16_t tagColId;
SArray* tsJoin;
SArray* tagJoin;
} SJoinNode;
typedef struct SJoinInfo {
bool hasJoin;
SJoinNode *joinTables[TSDB_MAX_JOIN_TABLE_NUM];
} SJoinInfo;
typedef struct STagCond {
// relation between tbname list and query condition, including : TK_AND or TK_OR
int16_t relType;
// tbname query condition, only support tbname query condition on one table
SCond tbnameCond;
// join condition, only support two tables join currently
SJoinInfo joinInfo;
// for different table, the query condition must be seperated
SArray *pCond;
} STagCond;
typedef struct SParamInfo {
int32_t idx;
uint8_t type;
......@@ -191,57 +117,6 @@ typedef struct STableDataBlocks {
SParamInfo *params;
} STableDataBlocks;
typedef struct SQueryInfo {
int16_t command; // the command may be different for each subclause, so keep it seperately.
uint32_t type; // query/insert type
STimeWindow window; // the whole query time window
SInterval interval; // tumble time window
SSessionWindow sessionWindow; // session time window
SGroupbyExpr groupbyExpr; // groupby tags info
SArray * colList; // SArray<SColumn*>
SFieldInfo fieldsInfo;
SArray * exprList; // SArray<SExprInfo*>
SArray * exprList1; // final exprlist in case of arithmetic expression exists
SLimitVal limit;
SLimitVal slimit;
STagCond tagCond;
SOrderVal order;
int16_t fillType; // final result fill type
int16_t numOfTables;
STableMetaInfo **pTableMetaInfo;
struct STSBuf *tsBuf;
int64_t * fillVal; // default value for fill
char * msg; // pointer to the pCmd->payload to keep error message temporarily
int64_t clauseLimit; // limit for current sub clause
int64_t prjOffset; // offset value in the original sql expression, only applied at client side
int64_t vgroupLimit; // table limit in case of super table projection query + global order + limit
int32_t udColumnId; // current user-defined constant output field column id, monotonically decreases from TSDB_UD_COLUMN_INDEX
int16_t resColumnId; // result column id
bool distinctTag; // distinct tag or not
int32_t round; // 0/1/....
int32_t bufLen;
char* buf;
SQInfo* pQInfo; // global merge operator
SQueryAttr* pQueryAttr; // query object
struct SQueryInfo *sibling; // sibling
SArray *pUpstream; // SArray<struct SQueryInfo>
struct SQueryInfo *pDownstream;
int32_t havingFieldNum;
bool stableQuery;
bool groupbyColumn;
bool simpleAgg;
bool arithmeticOnAgg;
bool projectionQuery;
bool hasFilter;
bool onlyTagQuery;
} SQueryInfo;
typedef struct {
STableMeta *pTableMeta;
SVgroupsInfo *pVgroupInfo;
......@@ -255,9 +130,13 @@ typedef struct SInsertStatementParam {
int8_t schemaAttached; // denote if submit block is built with table schema or not
STagData tagData; // NOTE: pTagData->data is used as a variant length array
int32_t batchSize; // for parameter ('?') binding and batch processing
int32_t numOfParams;
char msg[512]; // error message
char *sql; // current sql statement position
uint32_t insertType; // insert data from [file|sql statement| bound statement]
uint64_t objectId; // sql object id
char *sql; // current sql statement position
} SInsertStatementParam;
// TODO extract sql parser supporter
......@@ -266,13 +145,9 @@ typedef struct {
uint8_t msgType;
SInsertStatementParam insertParam;
char reserve1[3]; // fix bus error on arm32
union {
int32_t count;
};
int32_t count; // todo remove it
char * curSql; // current sql, resume position of sql after parsing paused
char reserve2[3]; // fix bus error on arm32
int16_t numOfCols;
char reserve3[2]; // fix bus error on arm32
uint32_t allocSize;
......@@ -283,8 +158,6 @@ typedef struct {
SQueryInfo *pQueryInfo;
SQueryInfo *active; // current active query info
int32_t batchSize; // for parameter ('?') binding and batch processing
int32_t numOfParams;
STagData tagData; // NOTE: pTagData->data is used as a variant length array
int32_t resColumnId;
} SSqlCmd;
......@@ -320,7 +193,7 @@ typedef struct {
TAOS_FIELD* final;
SArithmeticSupport *pArithSup; // support the arithmetic expression calculation on agg functions
struct SLocalMerger *pLocalMerger;
struct SGlobalMerger *pMerger;
} SSqlRes;
typedef struct {
......@@ -447,7 +320,7 @@ void tscSetResRawPtr(SSqlRes* pRes, SQueryInfo* pQueryInfo);
void tscSetResRawPtrRv(SSqlRes* pRes, SQueryInfo* pQueryInfo, SSDataBlock* pBlock);
void handleDownstreamOperator(SSqlObj** pSqlList, int32_t numOfUpstream, SQueryInfo* px, SSqlRes* pOutput);
void destroyTableNameList(SSqlCmd* pCmd);
void destroyTableNameList(SInsertStatementParam* pInsertParam);
void tscResetSqlCmd(SSqlCmd *pCmd, bool removeMeta);
......@@ -479,7 +352,7 @@ void waitForQueryRsp(void *param, TAOS_RES *tres, int code);
void doAsyncQuery(STscObj *pObj, SSqlObj *pSql, __async_cb_func_t fp, void *param, const char *sqlstr, size_t sqlLen);
void tscImportDataFromFile(SSqlObj *pSql);
void tscInitResObjForLocalQuery(SSqlObj *pObj, int32_t numOfRes, int32_t rowLen);
struct SGlobalMerger* tscInitResObjForLocalQuery(int32_t numOfRes, int32_t rowLen, uint64_t id);
bool tscIsUpdateQuery(SSqlObj* pSql);
char* tscGetSqlStr(SSqlObj* pSql);
bool tscIsQueryWithLimit(SSqlObj* pSql);
......@@ -489,7 +362,7 @@ void tscSetBoundColumnInfo(SParsedDataColInfo *pColInfo, SSchema *pSchema, int32
char *tscGetErrorMsgPayload(SSqlCmd *pCmd);
int32_t tscInvalidSQLErrMsg(char *msg, const char *additionalInfo, const char *sql);
int32_t tscInvalidOperationMsg(char *msg, const char *additionalInfo, const char *sql);
int32_t tscSQLSyntaxErrMsg(char* msg, const char* additionalInfo, const char* sql);
int32_t tscValidateSqlInfo(SSqlObj *pSql, struct SSqlInfo *pInfo);
......
......@@ -22,7 +22,7 @@
#include "tscSubquery.h"
#include "tscUtil.h"
#include "tsched.h"
#include "tschemautil.h"
#include "qTableMeta.h"
#include "tsclient.h"
static void tscAsyncQueryRowsForNextVnode(void *param, TAOS_RES *tres, int numOfRows);
......@@ -58,7 +58,6 @@ void doAsyncQuery(STscObj* pObj, SSqlObj* pSql, __async_cb_func_t fp, void* para
strntolower(pSql->sqlstr, sqlstr, (int32_t)sqlLen);
tscDebugL("0x%"PRIx64" SQL: %s", pSql->self, pSql->sqlstr);
pCmd->curSql = pSql->sqlstr;
pCmd->resColumnId = TSDB_RES_COL_ID;
int32_t code = tsParseSql(pSql, true);
......@@ -221,7 +220,7 @@ void taos_fetch_rows_a(TAOS_RES *tres, __async_cb_func_t fp, void *param) {
tscResetForNextRetrieve(pRes);
// handle the sub queries of join query
// handle outer query based on the already retrieved nest query results.
SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd);
if (pQueryInfo->pUpstream != NULL && taosArrayGetSize(pQueryInfo->pUpstream) > 0) {
SSchedMsg schedMsg = {0};
......
......@@ -13,15 +13,19 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "tscLocalMerge.h"
#include "tscSubquery.h"
#include "os.h"
#include "texpr.h"
#include "tlosertree.h"
#include "tscGlobalmerge.h"
#include "tscSubquery.h"
#include "tscLog.h"
#include "tsclient.h"
#include "qUtil.h"
#define COLMODEL_GET_VAL(data, schema, rowId, colId) \
(data + (schema)->pFields[colId].offset * ((schema)->capacity) + (rowId) * (schema)->pFields[colId].field.bytes)
typedef struct SCompareParam {
SLocalDataSource **pLocalData;
tOrderDescriptor * pDesc;
......@@ -29,9 +33,18 @@ typedef struct SCompareParam {
int32_t groupOrderType;
} SCompareParam;
bool needToMergeRv(SSDataBlock* pBlock, SArray* columnIndex, int32_t index, char **buf);
static bool needToMerge(SSDataBlock* pBlock, SArray* columnIndexList, int32_t index, char **buf) {
int32_t ret = 0;
size_t size = taosArrayGetSize(columnIndexList);
if (size > 0) {
ret = compare_aRv(pBlock, columnIndexList, (int32_t) size, index, buf, TSDB_ORDER_ASC);
}
// if ret == 0, means the result belongs to the same group
return (ret == 0);
}
int32_t treeComparator(const void *pLeft, const void *pRight, void *param) {
static int32_t treeComparator(const void *pLeft, const void *pRight, void *param) {
int32_t pLeftIdx = *(int32_t *)pLeft;
int32_t pRightIdx = *(int32_t *)pRight;
......@@ -57,16 +70,16 @@ int32_t treeComparator(const void *pLeft, const void *pRight, void *param) {
}
}
int32_t tscCreateLocalMerger(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrderDescriptor *pDesc,
SQueryInfo* pQueryInfo, SLocalMerger **pMerger, int64_t id) {
int32_t tscCreateGlobalMerger(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrderDescriptor *pDesc,
SQueryInfo* pQueryInfo, SGlobalMerger **pMerger, int64_t id) {
if (pMemBuffer == NULL) {
tscLocalReducerEnvDestroy(pMemBuffer, pDesc, numOfBuffer);
tscDestroyGlobalMergerEnv(pMemBuffer, pDesc, numOfBuffer);
tscError("0x%"PRIx64" %p pMemBuffer is NULL", id, pMemBuffer);
return TSDB_CODE_TSC_APP_ERROR;
}
if (pDesc->pColumnModel == NULL) {
tscLocalReducerEnvDestroy(pMemBuffer, pDesc, numOfBuffer);
tscDestroyGlobalMergerEnv(pMemBuffer, pDesc, numOfBuffer);
tscError("0x%"PRIx64" no local buffer or intermediate result format model", id);
return TSDB_CODE_TSC_APP_ERROR;
}
......@@ -83,7 +96,7 @@ int32_t tscCreateLocalMerger(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tO
}
if (numOfFlush == 0 || numOfBuffer == 0) {
tscLocalReducerEnvDestroy(pMemBuffer, pDesc, numOfBuffer);
tscDestroyGlobalMergerEnv(pMemBuffer, pDesc, numOfBuffer);
tscDebug("0x%"PRIx64" no data to retrieve", id);
return TSDB_CODE_SUCCESS;
}
......@@ -92,15 +105,15 @@ int32_t tscCreateLocalMerger(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tO
tscError("0x%"PRIx64" Invalid value of buffer capacity %d and page size %d ", id, pDesc->pColumnModel->capacity,
pMemBuffer[0]->pageSize);
tscLocalReducerEnvDestroy(pMemBuffer, pDesc, numOfBuffer);
tscDestroyGlobalMergerEnv(pMemBuffer, pDesc, numOfBuffer);
return TSDB_CODE_TSC_APP_ERROR;
}
*pMerger = (SLocalMerger *) calloc(1, sizeof(SLocalMerger));
*pMerger = (SGlobalMerger *) calloc(1, sizeof(SGlobalMerger));
if ((*pMerger) == NULL) {
tscError("0x%"PRIx64" failed to create local merge structure, out of memory", id);
tscLocalReducerEnvDestroy(pMemBuffer, pDesc, numOfBuffer);
tscDestroyGlobalMergerEnv(pMemBuffer, pDesc, numOfBuffer);
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
......@@ -160,7 +173,7 @@ int32_t tscCreateLocalMerger(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tO
// no data actually, no need to merge result.
if (idx == 0) {
tscDebug("0x%"PRIx64" retrieved no data", id);
tscLocalReducerEnvDestroy(pMemBuffer, pDesc, numOfBuffer);
tscDestroyGlobalMergerEnv(pMemBuffer, pDesc, numOfBuffer);
return TSDB_CODE_SUCCESS;
}
......@@ -198,7 +211,7 @@ int32_t tscCreateLocalMerger(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tO
}
// restore the limitation value at the last stage
if (tscOrderedProjectionQueryOnSTable(pQueryInfo, 0)) {
if (pQueryInfo->orderProjectQuery) {
pQueryInfo->limit.limit = pQueryInfo->clauseLimit;
pQueryInfo->limit.offset = pQueryInfo->prjOffset;
}
......@@ -297,28 +310,28 @@ int32_t saveToBuffer(tExtMemBuffer *pMemoryBuf, tOrderDescriptor *pDesc, tFilePa
return 0;
}
void tscDestroyLocalMerger(SLocalMerger* pLocalMerger) {
if (pLocalMerger == NULL) {
void tscDestroyGlobalMerger(SGlobalMerger* pMerger) {
if (pMerger == NULL) {
return;
}
for (int32_t i = 0; i < pLocalMerger->numOfBuffer; ++i) {
tfree(pLocalMerger->pLocalDataSrc[i]);
for (int32_t i = 0; i < pMerger->numOfBuffer; ++i) {
tfree(pMerger->pLocalDataSrc[i]);
}
pLocalMerger->numOfBuffer = 0;
tscLocalReducerEnvDestroy(pLocalMerger->pExtMemBuffer, pLocalMerger->pDesc, pLocalMerger->numOfVnode);
pMerger->numOfBuffer = 0;
tscDestroyGlobalMergerEnv(pMerger->pExtMemBuffer, pMerger->pDesc, pMerger->numOfVnode);
pLocalMerger->numOfCompleted = 0;
pMerger->numOfCompleted = 0;
if (pLocalMerger->pLoserTree) {
tfree(pLocalMerger->pLoserTree->param);
tfree(pLocalMerger->pLoserTree);
if (pMerger->pLoserTree) {
tfree(pMerger->pLoserTree->param);
tfree(pMerger->pLoserTree);
}
tfree(pLocalMerger->buf);
tfree(pLocalMerger->pLocalDataSrc);
free(pLocalMerger);
tfree(pMerger->buf);
tfree(pMerger->pLocalDataSrc);
free(pMerger);
}
static int32_t createOrderDescriptor(tOrderDescriptor **pOrderDesc, SQueryInfo* pQueryInfo, SColumnModel *pModel) {
......@@ -329,7 +342,7 @@ static int32_t createOrderDescriptor(tOrderDescriptor **pOrderDesc, SQueryInfo*
}
// primary timestamp column is involved in final result
if (pQueryInfo->interval.interval != 0 || tscOrderedProjectionQueryOnSTable(pQueryInfo, 0)) {
if (pQueryInfo->interval.interval != 0 || pQueryInfo->orderProjectQuery) {
numOfGroupByCols++;
}
......@@ -392,7 +405,7 @@ static int32_t createOrderDescriptor(tOrderDescriptor **pOrderDesc, SQueryInfo*
}
}
int32_t tscLocalReducerEnvCreate(SQueryInfo *pQueryInfo, tExtMemBuffer ***pMemBuffer, int32_t numOfSub,
int32_t tscCreateGlobalMergerEnv(SQueryInfo *pQueryInfo, tExtMemBuffer ***pMemBuffer, int32_t numOfSub,
tOrderDescriptor **pOrderDesc, uint32_t nBufferSizes, int64_t id) {
SSchema *pSchema = NULL;
SColumnModel *pModel = NULL;
......@@ -456,7 +469,7 @@ int32_t tscLocalReducerEnvCreate(SQueryInfo *pQueryInfo, tExtMemBuffer ***pMemBu
* @param pDesc
* @param numOfVnodes
*/
void tscLocalReducerEnvDestroy(tExtMemBuffer **pMemBuffer, tOrderDescriptor *pDesc, int32_t numOfVnodes) {
void tscDestroyGlobalMergerEnv(tExtMemBuffer **pMemBuffer, tOrderDescriptor *pDesc, int32_t numOfVnodes) {
tOrderDescDestroy(pDesc);
for (int32_t i = 0; i < numOfVnodes; ++i) {
pMemBuffer[i] = destoryExtMemBuffer(pMemBuffer[i]);
......@@ -467,12 +480,12 @@ void tscLocalReducerEnvDestroy(tExtMemBuffer **pMemBuffer, tOrderDescriptor *pDe
/**
*
* @param pLocalMerge
* @param pMerger
* @param pOneInterDataSrc
* @param treeList
* @return the number of remain input source. if ret == 0, all data has been handled
*/
int32_t loadNewDataFromDiskFor(SLocalMerger *pLocalMerge, SLocalDataSource *pOneInterDataSrc,
int32_t loadNewDataFromDiskFor(SGlobalMerger *pMerger, SLocalDataSource *pOneInterDataSrc,
bool *needAdjustLoserTree) {
pOneInterDataSrc->rowIdx = 0;
pOneInterDataSrc->pageId += 1;
......@@ -489,17 +502,17 @@ int32_t loadNewDataFromDiskFor(SLocalMerger *pLocalMerge, SLocalDataSource *pOne
#endif
*needAdjustLoserTree = true;
} else {
pLocalMerge->numOfCompleted += 1;
pMerger->numOfCompleted += 1;
pOneInterDataSrc->rowIdx = -1;
pOneInterDataSrc->pageId = -1;
*needAdjustLoserTree = true;
}
return pLocalMerge->numOfBuffer;
return pMerger->numOfBuffer;
}
void adjustLoserTreeFromNewData(SLocalMerger *pLocalMerge, SLocalDataSource *pOneInterDataSrc,
void adjustLoserTreeFromNewData(SGlobalMerger *pMerger, SLocalDataSource *pOneInterDataSrc,
SLoserTreeInfo *pTree) {
/*
* load a new data page into memory for intermediate dataset source,
......@@ -507,7 +520,7 @@ void adjustLoserTreeFromNewData(SLocalMerger *pLocalMerge, SLocalDataSource *pOn
*/
bool needToAdjust = true;
if (pOneInterDataSrc->filePage.num <= pOneInterDataSrc->rowIdx) {
loadNewDataFromDiskFor(pLocalMerge, pOneInterDataSrc, &needToAdjust);
loadNewDataFromDiskFor(pMerger, pOneInterDataSrc, &needToAdjust);
}
/*
......@@ -515,7 +528,7 @@ void adjustLoserTreeFromNewData(SLocalMerger *pLocalMerge, SLocalDataSource *pOn
* if the loser tree is rebuild completed, we do not need to adjust
*/
if (needToAdjust) {
int32_t leafNodeIdx = pTree->pNode[0].index + pLocalMerge->numOfBuffer;
int32_t leafNodeIdx = pTree->pNode[0].index + pMerger->numOfBuffer;
#ifdef _DEBUG_VIEW
printf("before adjust:\t");
......@@ -567,7 +580,7 @@ static void setTagValueForMultipleRows(SQLFunctionCtx* pCtx, int32_t numOfOutput
}
}
static void doExecuteFinalMergeRv(SOperatorInfo* pOperator, int32_t numOfExpr, SSDataBlock* pBlock) {
static void doExecuteFinalMerge(SOperatorInfo* pOperator, int32_t numOfExpr, SSDataBlock* pBlock) {
SMultiwayMergeInfo* pInfo = pOperator->info;
SQLFunctionCtx* pCtx = pInfo->binfo.pCtx;
......@@ -579,7 +592,7 @@ static void doExecuteFinalMergeRv(SOperatorInfo* pOperator, int32_t numOfExpr, S
for(int32_t i = 0; i < pBlock->info.rows; ++i) {
if (pInfo->hasPrev) {
if (needToMergeRv(pBlock, pInfo->orderColumnList, i, pInfo->prevRow)) {
if (needToMerge(pBlock, pInfo->orderColumnList, i, pInfo->prevRow)) {
for (int32_t j = 0; j < numOfExpr; ++j) {
pCtx[j].pInput = add[j] + pCtx[j].inputBytes * i;
}
......@@ -654,45 +667,27 @@ static void doExecuteFinalMergeRv(SOperatorInfo* pOperator, int32_t numOfExpr, S
tfree(add);
}
bool needToMergeRv(SSDataBlock* pBlock, SArray* columnIndexList, int32_t index, char **buf) {
int32_t ret = 0;
size_t size = taosArrayGetSize(columnIndexList);
if (size > 0) {
ret = compare_aRv(pBlock, columnIndexList, (int32_t) size, index, buf, TSDB_ORDER_ASC);
}
// if ret == 0, means the result belongs to the same group
return (ret == 0);
static bool isAllSourcesCompleted(SGlobalMerger *pMerger) {
return (pMerger->numOfBuffer == pMerger->numOfCompleted);
}
static bool isAllSourcesCompleted(SLocalMerger *pLocalMerge) {
return (pLocalMerge->numOfBuffer == pLocalMerge->numOfCompleted);
}
void tscInitResObjForLocalQuery(SSqlObj *pSql, int32_t numOfRes, int32_t rowLen) {
SSqlRes *pRes = &pSql->res;
if (pRes->pLocalMerger != NULL) {
tscDestroyLocalMerger(pRes->pLocalMerger);
pRes->pLocalMerger = NULL;
tscDebug("0x%"PRIx64" free local reducer finished", pSql->self);
SGlobalMerger* tscInitResObjForLocalQuery(int32_t numOfRes, int32_t rowLen, uint64_t id) {
SGlobalMerger *pMerger = calloc(1, sizeof(SGlobalMerger));
if (pMerger == NULL) {
tscDebug("0x%"PRIx64" free local reducer finished", id);
return NULL;
}
pRes->qId = 1; // hack to pass the safety check in fetch_row function
pRes->numOfRows = 0;
pRes->row = 0;
pRes->rspType = 0; // used as a flag to denote if taos_retrieved() has been called yet
pRes->pLocalMerger = (SLocalMerger *)calloc(1, sizeof(SLocalMerger));
/*
* One more byte space is required, since the sprintf function needs one additional space to put '\0' at
* the end of string
*/
size_t size = numOfRes * rowLen + 1;
pRes->pLocalMerger->buf = calloc(1, size);
pRes->data = pRes->pLocalMerger->buf;
pMerger->buf = calloc(1, size);
return pMerger;
}
// todo remove it
int32_t doArithmeticCalculate(SQueryInfo* pQueryInfo, tFilePage* pOutput, int32_t rowSize, int32_t finalRowSize) {
int32_t maxRowSize = MAX(rowSize, finalRowSize);
char* pbuf = calloc(1, (size_t)(pOutput->num * maxRowSize));
......@@ -736,9 +731,6 @@ int32_t doArithmeticCalculate(SQueryInfo* pQueryInfo, tFilePage* pOutput, int32_
return offset;
}
#define COLMODEL_GET_VAL(data, schema, rowId, colId) \
(data + (schema)->pFields[colId].offset * ((schema)->capacity) + (rowId) * (schema)->pFields[colId].field.bytes)
static void appendOneRowToDataBlock(SSDataBlock *pBlock, char *buf, SColumnModel *pModel, int32_t rowIndex,
int32_t maxRows) {
for (int32_t i = 0; i < pBlock->info.numOfCols; ++i) {
......@@ -760,7 +752,7 @@ SSDataBlock* doMultiwayMergeSort(void* param, bool* newgroup) {
SMultiwayMergeInfo *pInfo = pOperator->info;
SLocalMerger *pMerger = pInfo->pMerge;
SGlobalMerger *pMerger = pInfo->pMerge;
SLoserTreeInfo *pTree = pMerger->pLoserTree;
pInfo->binfo.pRes->info.rows = 0;
......@@ -844,7 +836,7 @@ SSDataBlock* doMultiwayMergeSort(void* param, bool* newgroup) {
return (pInfo->binfo.pRes->info.rows > 0)? pInfo->binfo.pRes:NULL;
}
static bool isSameGroupRv(SArray* orderColumnList, SSDataBlock* pBlock, char** dataCols) {
static bool isSameGroup(SArray* orderColumnList, SSDataBlock* pBlock, char** dataCols) {
int32_t numOfCols = (int32_t) taosArrayGetSize(orderColumnList);
for (int32_t i = 0; i < numOfCols; ++i) {
SColIndex *pIndex = taosArrayGet(orderColumnList, i);
......@@ -892,7 +884,7 @@ SSDataBlock* doGlobalAggregate(void* param, bool* newgroup) {
}
}
doExecuteFinalMergeRv(pOperator, pOperator->numOfOutput, pAggInfo->pExistBlock);
doExecuteFinalMerge(pOperator, pOperator->numOfOutput, pAggInfo->pExistBlock);
savePrevOrderColumns(pAggInfo->currentGroupColData, pAggInfo->groupColumnList, pAggInfo->pExistBlock, 0,
&pAggInfo->hasGroupColData);
......@@ -913,7 +905,7 @@ SSDataBlock* doGlobalAggregate(void* param, bool* newgroup) {
}
if (pAggInfo->hasGroupColData) {
bool sameGroup = isSameGroupRv(pAggInfo->groupColumnList, pBlock, pAggInfo->currentGroupColData);
bool sameGroup = isSameGroup(pAggInfo->groupColumnList, pBlock, pAggInfo->currentGroupColData);
if (!sameGroup) {
*newgroup = true;
pAggInfo->hasDataBlockForNewGroup = true;
......@@ -927,7 +919,7 @@ SSDataBlock* doGlobalAggregate(void* param, bool* newgroup) {
setInputDataBlock(pOperator, pAggInfo->binfo.pCtx, pBlock, TSDB_ORDER_ASC);
updateOutputBuf(&pAggInfo->binfo, &pAggInfo->bufCapacity, pBlock->info.rows * pAggInfo->resultRowFactor);
doExecuteFinalMergeRv(pOperator, pOperator->numOfOutput, pBlock);
doExecuteFinalMerge(pOperator, pOperator->numOfOutput, pBlock);
savePrevOrderColumns(pAggInfo->currentGroupColData, pAggInfo->groupColumnList, pBlock, 0, &pAggInfo->hasGroupColData);
handleData = true;
}
......
......@@ -20,7 +20,7 @@
#include "tname.h"
#include "tscLog.h"
#include "tscUtil.h"
#include "tschemautil.h"
#include "qTableMeta.h"
#include "tsclient.h"
#include "taos.h"
#include "tscSubquery.h"
......@@ -71,7 +71,9 @@ static int32_t tscSetValueToResObj(SSqlObj *pSql, int32_t rowLen) {
numOfRows = numOfRows + tscGetNumOfTags(pMeta);
}
tscInitResObjForLocalQuery(pSql, totalNumOfRows, rowLen);
pSql->res.pMerger = tscInitResObjForLocalQuery(totalNumOfRows, rowLen, pSql->self);
tscInitResForMerge(&pSql->res);
SSchema *pSchema = tscGetTableSchema(pMeta);
for (int32_t i = 0; i < numOfRows; ++i) {
......@@ -433,7 +435,8 @@ static int32_t tscSCreateSetValueToResObj(SSqlObj *pSql, int32_t rowLen, const c
if (strlen(ddl) == 0) {
}
tscInitResObjForLocalQuery(pSql, numOfRows, rowLen);
pSql->res.pMerger = tscInitResObjForLocalQuery(numOfRows, rowLen, pSql->self);
tscInitResForMerge(&pSql->res);
TAOS_FIELD *pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, 0);
char* dst = pRes->data + tscFieldInfoGetOffset(pQueryInfo, 0) * numOfRows;
......@@ -882,7 +885,8 @@ void tscSetLocalQueryResult(SSqlObj *pSql, const char *val, const char *columnNa
TAOS_FIELD f = tscCreateField((int8_t)type, columnName, (int16_t)valueLength);
tscFieldInfoAppend(&pQueryInfo->fieldsInfo, &f);
tscInitResObjForLocalQuery(pSql, 1, (int32_t)valueLength);
pSql->res.pMerger = tscInitResObjForLocalQuery(1, (int32_t)valueLength, pSql->self);
tscInitResForMerge(&pSql->res);
SInternalField* pInfo = tscFieldInfoGetInternalField(&pQueryInfo->fieldsInfo, 0);
pInfo->pExpr = taosArrayGetP(pQueryInfo->exprList, 0);
......
此差异已折叠。
......@@ -1081,7 +1081,7 @@ static int insertStmtExecute(STscStmt* stmt) {
fillTablesColumnsNull(stmt->pSql);
int code = tscMergeTableDataBlocks(stmt->pSql, false);
int code = tscMergeTableDataBlocks(&stmt->pSql->cmd.insertParam, false);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
......@@ -1181,7 +1181,7 @@ static int insertBatchStmtExecute(STscStmt* pStmt) {
fillTablesColumnsNull(pStmt->pSql);
if ((code = tscMergeTableDataBlocks(pStmt->pSql, false)) != TSDB_CODE_SUCCESS) {
if ((code = tscMergeTableDataBlocks(&pStmt->pSql->cmd.insertParam, false)) != TSDB_CODE_SUCCESS) {
return code;
}
......@@ -1286,7 +1286,7 @@ int taos_stmt_prepare(TAOS_STMT* stmt, const char* sql, unsigned long length) {
if (tscIsInsertData(pSql->sqlstr)) {
pStmt->isInsert = true;
pSql->cmd.numOfParams = 0;
pSql->cmd.insertParam.numOfParams = 0;
pSql->cmd.batchSize = 0;
registerSqlObj(pSql);
......@@ -1298,7 +1298,7 @@ int taos_stmt_prepare(TAOS_STMT* stmt, const char* sql, unsigned long length) {
}
int32_t index = 0;
SStrToken sToken = tStrGetToken(pCmd->curSql, &index, false);
SStrToken sToken = tStrGetToken(pCmd->insertParam.sql, &index, false);
if (sToken.n == 0) {
return TSDB_CODE_TSC_INVALID_OPERATION;
......@@ -1388,7 +1388,7 @@ int taos_stmt_set_tbname(TAOS_STMT* stmt, const char* name) {
tscDebug("0x%"PRIx64" SQL: %s", pSql->self, pSql->sqlstr);
pSql->cmd.numOfParams = 0;
pSql->cmd.insertParam.numOfParams = 0;
pSql->cmd.batchSize = 0;
if (taosHashGetSize(pCmd->insertParam.pTableBlockHashList) > 0) {
......@@ -1667,8 +1667,8 @@ int taos_stmt_num_params(TAOS_STMT *stmt, int *nums) {
if (pStmt->isInsert) {
SSqlObj* pSql = pStmt->pSql;
SSqlCmd *pCmd = &pSql->cmd;
*nums = pCmd->numOfParams;
SSqlCmd *pCmd = &pSql->cmd;
*nums = pCmd->insertParam.numOfParams;
return TSDB_CODE_SUCCESS;
} else {
SNormalStmt* normal = &pStmt->normal;
......
......@@ -28,7 +28,7 @@
#include "tname.h"
#include "tscLog.h"
#include "tscUtil.h"
#include "tschemautil.h"
#include "qTableMeta.h"
#include "tsclient.h"
#include "tstrbuild.h"
#include "ttoken.h"
......@@ -195,7 +195,7 @@ static bool validateDebugFlag(int32_t v) {
* is not needed in the final error message.
*/
static int32_t invalidOperationMsg(char* dstBuffer, const char* errMsg) {
return tscInvalidSQLErrMsg(dstBuffer, errMsg, NULL);
return tscInvalidOperationMsg(dstBuffer, errMsg, NULL);
}
static int setColumnFilterInfoForTimestamp(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, tVariant* pVar) {
......@@ -6610,7 +6610,7 @@ int32_t doCheckForCreateFromStable(SSqlObj* pSql, SSqlInfo* pInfo) {
if (!findColumnIndex) {
tdDestroyKVRowBuilder(&kvRowBuilder);
return tscInvalidSQLErrMsg(pCmd->payload, "invalid tag name", sToken->z);
return tscInvalidOperationMsg(pCmd->payload, "invalid tag name", sToken->z);
}
}
} else {
......@@ -7619,6 +7619,7 @@ int32_t validateSqlNode(SSqlObj* pSql, SSqlNode* pSqlNode, SQueryInfo* pQueryInf
pQueryInfo->groupbyColumn = tscGroupbyColumn(pQueryInfo);
pQueryInfo->arithmeticOnAgg = tsIsArithmeticQueryOnAggResult(pQueryInfo);
pQueryInfo->orderProjectQuery = tscOrderedProjectionQueryOnSTable(pQueryInfo, 0);
SExprInfo** p = NULL;
int32_t numOfExpr = 0;
......
......@@ -15,15 +15,15 @@
#include "os.h"
#include "tcmdtype.h"
#include "tlockfree.h"
#include "trpc.h"
#include "tscLocalMerge.h"
#include "tscGlobalmerge.h"
#include "tscLog.h"
#include "tscProfile.h"
#include "tscUtil.h"
#include "tschemautil.h"
#include "qTableMeta.h"
#include "tsclient.h"
#include "ttimer.h"
#include "tlockfree.h"
#include "qPlan.h"
int (*tscBuildMsg[TSDB_SQL_MAX])(SSqlObj *pSql, SSqlInfo *pInfo) = {0};
......@@ -1598,7 +1598,7 @@ int tscProcessRetrieveLocalMergeRsp(SSqlObj *pSql) {
return code;
}
if (pRes->pLocalMerger == NULL) { // no result from subquery, so abort here directly.
if (pRes->pMerger == NULL) { // no result from subquery, so abort here directly.
(*pSql->fp)(pSql->param, pSql, pRes->numOfRows);
return code;
}
......@@ -1615,15 +1615,7 @@ int tscProcessRetrieveLocalMergeRsp(SSqlObj *pSql) {
taosArrayPush(group, &tableKeyInfo);
taosArrayPush(tableGroupInfo.pGroupList, &group);
// todo remove it
SExprInfo* list = calloc(tscNumOfExprs(pQueryInfo), sizeof(SExprInfo));
for(int32_t i = 0; i < tscNumOfExprs(pQueryInfo); ++i) {
SExprInfo* pExprInfo = tscExprGet(pQueryInfo, i);
list[i] = *pExprInfo;
}
pQueryInfo->pQInfo = createQInfoFromQueryNode(pQueryInfo, list, &tableGroupInfo, NULL, NULL, pRes->pLocalMerger, MERGE_STAGE);
tfree(list);
pQueryInfo->pQInfo = createQInfoFromQueryNode(pQueryInfo, &tableGroupInfo, NULL, NULL, pRes->pMerger, MERGE_STAGE);
}
uint64_t localQueryId = 0;
......@@ -2402,8 +2394,8 @@ static int32_t getTableMetaFromMnode(SSqlObj *pSql, STableMetaInfo *pTableMetaIn
char *pMsg = (char *)pInfoMsg + sizeof(STableInfoMsg);
// tag data exists
if (autocreate && pSql->cmd.tagData.dataLen != 0) {
pMsg = serializeTagData(&pSql->cmd.tagData, pMsg);
if (autocreate && pSql->cmd.insertParam.tagData.dataLen != 0) {
pMsg = serializeTagData(&pSql->cmd.insertParam.tagData, pMsg);
}
pNew->cmd.payloadLen = (int32_t)(pMsg - (char*)pInfoMsg);
......
......@@ -13,7 +13,6 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <tschemautil.h>
#include "os.h"
#include "taosmsg.h"
#include "tscLog.h"
......
......@@ -21,7 +21,7 @@
#include "tcompare.h"
#include "tscLog.h"
#include "tscSubquery.h"
#include "tschemautil.h"
#include "qTableMeta.h"
#include "tsclient.h"
#include "qUtil.h"
#include "qPlan.h"
......@@ -604,7 +604,6 @@ static int32_t tscLaunchRealSubqueries(SSqlObj* pSql) {
int16_t funcId = pExpr->base.functionId;
// add the invisible timestamp column
printf("--------read:%p\n", pExpr);
if ((pExpr->base.colInfo.colId != PRIMARYKEY_TIMESTAMP_COL_INDEX) ||
(funcId != TSDB_FUNC_TS && funcId != TSDB_FUNC_TS_DUMMY && funcId != TSDB_FUNC_PRJ)) {
......@@ -2447,7 +2446,7 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) {
assert(pState->numOfSub > 0);
int32_t ret = tscLocalReducerEnvCreate(pQueryInfo, &pMemoryBuf, pSql->subState.numOfSub, &pDesc, nBufferSize, pSql->self);
int32_t ret = tscCreateGlobalMergerEnv(pQueryInfo, &pMemoryBuf, pSql->subState.numOfSub, &pDesc, nBufferSize, pSql->self);
if (ret != 0) {
pRes->code = ret;
tscAsyncResultOnError(pSql);
......@@ -2460,7 +2459,7 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) {
if (pSql->pSubs == NULL) {
tfree(pSql->pSubs);
pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY;
tscLocalReducerEnvDestroy(pMemoryBuf, pDesc,pState->numOfSub);
tscDestroyGlobalMergerEnv(pMemoryBuf, pDesc,pState->numOfSub);
tscAsyncResultOnError(pSql);
return ret;
......@@ -2527,13 +2526,13 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) {
tscError("0x%"PRIx64" failed to prepare subquery structure and launch subqueries", pSql->self);
pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY;
tscLocalReducerEnvDestroy(pMemoryBuf, pDesc, pState->numOfSub);
tscDestroyGlobalMergerEnv(pMemoryBuf, pDesc, pState->numOfSub);
doCleanupSubqueries(pSql, i);
return pRes->code; // free all allocated resource
}
if (pRes->code == TSDB_CODE_TSC_QUERY_CANCELLED) {
tscLocalReducerEnvDestroy(pMemoryBuf, pDesc, pState->numOfSub);
tscDestroyGlobalMergerEnv(pMemoryBuf, pDesc, pState->numOfSub);
doCleanupSubqueries(pSql, i);
return pRes->code;
}
......@@ -2698,7 +2697,7 @@ void tscHandleSubqueryError(SRetrieveSupport *trsupport, SSqlObj *pSql, int numO
tstrerror(pParentSql->res.code));
// release allocated resource
tscLocalReducerEnvDestroy(trsupport->pExtMemBuffer, trsupport->pOrderDescriptor,
tscDestroyGlobalMergerEnv(trsupport->pExtMemBuffer, trsupport->pOrderDescriptor,
pState->numOfSub);
tscFreeRetrieveSup(pSql);
......@@ -2773,7 +2772,7 @@ static void tscAllDataRetrievedFromDnode(SRetrieveSupport *trsupport, SSqlObj* p
SQueryInfo *pPQueryInfo = tscGetQueryInfo(&pParentSql->cmd);
tscClearInterpInfo(pPQueryInfo);
code = tscCreateLocalMerger(trsupport->pExtMemBuffer, pState->numOfSub, pDesc, pPQueryInfo, &pParentSql->res.pLocalMerger, pParentSql->self);
code = tscCreateGlobalMerger(trsupport->pExtMemBuffer, pState->numOfSub, pDesc, pPQueryInfo, &pParentSql->res.pMerger, pParentSql->self);
pParentSql->res.code = code;
if (code == TSDB_CODE_SUCCESS && trsupport->pExtMemBuffer == NULL) {
......@@ -3500,9 +3499,8 @@ static UNUSED_FUNC bool tscHasRemainDataInSubqueryResultSet(SSqlObj *pSql) {
return hasData;
}
// todo remove pExprs
void* createQInfoFromQueryNode(SQueryInfo* pQueryInfo, SExprInfo* pExprs, STableGroupInfo* pTableGroupInfo,
SOperatorInfo* pSourceOperator, char* sql, void* merger, int32_t stage) {
void* createQInfoFromQueryNode(SQueryInfo* pQueryInfo, STableGroupInfo* pTableGroupInfo, SOperatorInfo* pSourceOperator,
char* sql, void* merger, int32_t stage) {
assert(pQueryInfo != NULL);
SQInfo *pQInfo = (SQInfo *)calloc(1, sizeof(SQInfo));
if (pQInfo == NULL) {
......
......@@ -14,18 +14,18 @@
*/
#include "tscUtil.h"
#include "tsched.h"
#include "hash.h"
#include "os.h"
#include "taosmsg.h"
#include "texpr.h"
#include "tkey.h"
#include "tmd5.h"
#include "tscLocalMerge.h"
#include "tscGlobalmerge.h"
#include "tscLog.h"
#include "tscProfile.h"
#include "tscSubquery.h"
#include "tschemautil.h"
#include "tsched.h"
#include "qTableMeta.h"
#include "tsclient.h"
#include "ttimer.h"
#include "ttokendef.h"
......@@ -976,16 +976,7 @@ void handleDownstreamOperator(SSqlObj** pSqlObjList, int32_t numOfUpstream, SQue
if (px->pQInfo == NULL) {
SColumnInfo* pColumnInfo = extractColumnInfoFromResult(px->colList);
int32_t numOfOutput = (int32_t)tscNumOfExprs(px);
int32_t numOfCols = (int32_t)taosArrayGetSize(px->colList);
SQueriedTableInfo info = {
.colList = pColumnInfo,
.numOfCols = numOfCols,
};
SSchema* pSchema = tscGetTableSchema(px->pTableMetaInfo[0]->pTableMeta);
STableGroupInfo tableGroupInfo = {
.numOfTables = 1,
.pGroupList = taosArrayInit(1, POINTER_BYTES),
......@@ -1065,23 +1056,9 @@ void handleDownstreamOperator(SSqlObj** pSqlObjList, int32_t numOfUpstream, SQue
memcpy(schema, pSchema, numOfCol1*sizeof(SSchema));
}
SExprInfo* exprInfo = NULL;
/*int32_t code = */ createQueryFunc(&info, numOfOutput, &exprInfo, px->exprList->pData, NULL, px->type, NULL);
for(int32_t i = 0; i < numOfOutput; ++i) {
SExprInfo* pex = taosArrayGetP(px->exprList, i);
int32_t colId = pex->base.colInfo.colId;
for(int32_t j = 0; j < pSourceOperator->numOfOutput; ++j) {
if (colId == schema[j].colId) {
pex->base.colInfo.colIndex = j;
break;
}
}
}
px->pQInfo = createQInfoFromQueryNode(px, exprInfo, &tableGroupInfo, pSourceOperator, NULL, NULL, MASTER_SCAN);
px->pQInfo = createQInfoFromQueryNode(px, &tableGroupInfo, pSourceOperator, NULL, NULL, MASTER_SCAN);
tfree(pColumnInfo);
tfree(schema);
tfree(exprInfo);
}
uint64_t qId = 0;
......@@ -1158,31 +1135,34 @@ void tscFreeQueryInfo(SSqlCmd* pCmd, bool removeMeta) {
pCmd->active = NULL;
}
void destroyTableNameList(SSqlCmd* pCmd) {
if (pCmd->insertParam.numOfTables == 0) {
assert(pCmd->insertParam.pTableNameList == NULL);
void destroyTableNameList(SInsertStatementParam* pInsertParam) {
if (pInsertParam->numOfTables == 0) {
assert(pInsertParam->pTableNameList == NULL);
return;
}
for(int32_t i = 0; i < pCmd->insertParam.numOfTables; ++i) {
tfree(pCmd->insertParam.pTableNameList[i]);
for(int32_t i = 0; i < pInsertParam->numOfTables; ++i) {
tfree(pInsertParam->pTableNameList[i]);
}
pCmd->insertParam.numOfTables = 0;
tfree(pCmd->insertParam.pTableNameList);
pInsertParam->numOfTables = 0;
tfree(pInsertParam->pTableNameList);
}
void tscResetSqlCmd(SSqlCmd* pCmd, bool clearCachedMeta) {
pCmd->command = 0;
pCmd->numOfCols = 0;
pCmd->count = 0;
pCmd->curSql = NULL;
pCmd->msgType = 0;
destroyTableNameList(pCmd);
pCmd->insertParam.sql = NULL;
destroyTableNameList(&pCmd->insertParam);
pCmd->insertParam.pTableBlockHashList = tscDestroyBlockHashTable(pCmd->insertParam.pTableBlockHashList, clearCachedMeta);
pCmd->insertParam.pDataBlocks = tscDestroyBlockArrayList(pCmd->insertParam.pDataBlocks);
tfree(pCmd->insertParam.tagData.data);
pCmd->insertParam.tagData.dataLen = 0;
tscFreeQueryInfo(pCmd, clearCachedMeta);
if (pCmd->pTableMetaMap != NULL) {
......@@ -1201,8 +1181,8 @@ void tscResetSqlCmd(SSqlCmd* pCmd, bool clearCachedMeta) {
void tscFreeSqlResult(SSqlObj* pSql) {
SSqlRes* pRes = &pSql->res;
tscDestroyLocalMerger(pRes->pLocalMerger);
pRes->pLocalMerger = NULL;
tscDestroyGlobalMerger(pRes->pMerger);
pRes->pMerger = NULL;
tscDestroyResPointerInfo(pRes);
memset(&pSql->res, 0, sizeof(SSqlRes));
......@@ -1295,9 +1275,6 @@ void tscFreeSqlObj(SSqlObj* pSql) {
tscFreeSqlResult(pSql);
tscResetSqlCmd(pCmd, false);
tfree(pCmd->tagData.data);
pCmd->tagData.dataLen = 0;
memset(pCmd->payload, 0, (size_t)pCmd->allocSize);
tfree(pCmd->payload);
pCmd->allocSize = 0;
......@@ -1598,37 +1575,36 @@ static int32_t getRowExpandSize(STableMeta* pTableMeta) {
return result;
}
static void extractTableNameList(SSqlCmd* pCmd, bool freeBlockMap) {
pCmd->insertParam.numOfTables = (int32_t) taosHashGetSize(pCmd->insertParam.pTableBlockHashList);
if (pCmd->insertParam.pTableNameList == NULL) {
pCmd->insertParam.pTableNameList = calloc(pCmd->insertParam.numOfTables, POINTER_BYTES);
static void extractTableNameList(SInsertStatementParam *pInsertParam, bool freeBlockMap) {
pInsertParam->numOfTables = (int32_t) taosHashGetSize(pInsertParam->pTableBlockHashList);
if (pInsertParam->pTableNameList == NULL) {
pInsertParam->pTableNameList = calloc(pInsertParam->numOfTables, POINTER_BYTES);
} else {
memset(pCmd->insertParam.pTableNameList, 0, pCmd->insertParam.numOfTables * POINTER_BYTES);
memset(pInsertParam->pTableNameList, 0, pInsertParam->numOfTables * POINTER_BYTES);
}
STableDataBlocks **p1 = taosHashIterate(pCmd->insertParam.pTableBlockHashList, NULL);
STableDataBlocks **p1 = taosHashIterate(pInsertParam->pTableBlockHashList, NULL);
int32_t i = 0;
while(p1) {
STableDataBlocks* pBlocks = *p1;
tfree(pCmd->insertParam.pTableNameList[i]);
tfree(pInsertParam->pTableNameList[i]);
pCmd->insertParam.pTableNameList[i++] = tNameDup(&pBlocks->tableName);
p1 = taosHashIterate(pCmd->insertParam.pTableBlockHashList, p1);
pInsertParam->pTableNameList[i++] = tNameDup(&pBlocks->tableName);
p1 = taosHashIterate(pInsertParam->pTableBlockHashList, p1);
}
if (freeBlockMap) {
pCmd->insertParam.pTableBlockHashList = tscDestroyBlockHashTable(pCmd->insertParam.pTableBlockHashList, false);
pInsertParam->pTableBlockHashList = tscDestroyBlockHashTable(pInsertParam->pTableBlockHashList, false);
}
}
int32_t tscMergeTableDataBlocks(SSqlObj* pSql, bool freeBlockMap) {
int32_t tscMergeTableDataBlocks(SInsertStatementParam *pInsertParam, bool freeBlockMap) {
const int INSERT_HEAD_SIZE = sizeof(SMsgDesc) + sizeof(SSubmitMsg);
SSqlCmd* pCmd = &pSql->cmd;
void* pVnodeDataBlockHashList = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, false);
SArray* pVnodeDataBlockList = taosArrayInit(8, POINTER_BYTES);
STableDataBlocks** p = taosHashIterate(pCmd->insertParam.pTableBlockHashList, NULL);
STableDataBlocks** p = taosHashIterate(pInsertParam->pTableBlockHashList, NULL);
STableDataBlocks* pOneTableBlock = *p;
while(pOneTableBlock) {
......@@ -1641,7 +1617,7 @@ int32_t tscMergeTableDataBlocks(SSqlObj* pSql, bool freeBlockMap) {
int32_t ret = tscGetDataBlockFromList(pVnodeDataBlockHashList, pOneTableBlock->vgId, TSDB_PAYLOAD_SIZE,
INSERT_HEAD_SIZE, 0, &pOneTableBlock->tableName, pOneTableBlock->pTableMeta, &dataBuf, pVnodeDataBlockList);
if (ret != TSDB_CODE_SUCCESS) {
tscError("0x%"PRIx64" failed to prepare the data block buffer for merging table data, code:%d", pSql->self, ret);
tscError("0x%"PRIx64" failed to prepare the data block buffer for merging table data, code:%d", pInsertParam->objectId, ret);
taosHashCleanup(pVnodeDataBlockHashList);
tscDestroyBlockArrayList(pVnodeDataBlockList);
return ret;
......@@ -1659,7 +1635,7 @@ int32_t tscMergeTableDataBlocks(SSqlObj* pSql, bool freeBlockMap) {
dataBuf->pData = tmp;
memset(dataBuf->pData + dataBuf->size, 0, dataBuf->nAllocSize - dataBuf->size);
} else { // failed to allocate memory, free already allocated memory and return error code
tscError("0x%"PRIx64" failed to allocate memory for merging submit block, size:%d", pSql->self, dataBuf->nAllocSize);
tscError("0x%"PRIx64" failed to allocate memory for merging submit block, size:%d", pInsertParam->objectId, dataBuf->nAllocSize);
taosHashCleanup(pVnodeDataBlockHashList);
tscDestroyBlockArrayList(pVnodeDataBlockList);
......@@ -1672,7 +1648,7 @@ int32_t tscMergeTableDataBlocks(SSqlObj* pSql, bool freeBlockMap) {
tscSortRemoveDataBlockDupRows(pOneTableBlock);
char* ekey = (char*)pBlocks->data + pOneTableBlock->rowSize*(pBlocks->numOfRows-1);
tscDebug("0x%"PRIx64" name:%s, name:%d rows:%d sversion:%d skey:%" PRId64 ", ekey:%" PRId64, pSql->self, tNameGetTableName(&pOneTableBlock->tableName),
tscDebug("0x%"PRIx64" name:%s, name:%d rows:%d sversion:%d skey:%" PRId64 ", ekey:%" PRId64, pInsertParam->objectId, tNameGetTableName(&pOneTableBlock->tableName),
pBlocks->tid, pBlocks->numOfRows, pBlocks->sversion, GET_INT64_VAL(pBlocks->data), GET_INT64_VAL(ekey));
int32_t len = pBlocks->numOfRows * (pOneTableBlock->rowSize + expandSize) + sizeof(STColumn) * tscGetNumOfColumns(pOneTableBlock->pTableMeta);
......@@ -1684,7 +1660,7 @@ int32_t tscMergeTableDataBlocks(SSqlObj* pSql, bool freeBlockMap) {
pBlocks->schemaLen = 0;
// erase the empty space reserved for binary data
int32_t finalLen = trimDataBlock(dataBuf->pData + dataBuf->size, pOneTableBlock, pCmd->insertParam.schemaAttached);
int32_t finalLen = trimDataBlock(dataBuf->pData + dataBuf->size, pOneTableBlock, pInsertParam->schemaAttached);
assert(finalLen <= len);
dataBuf->size += (finalLen + sizeof(SSubmitBlk));
......@@ -1696,10 +1672,10 @@ int32_t tscMergeTableDataBlocks(SSqlObj* pSql, bool freeBlockMap) {
pBlocks->numOfRows = 0;
}else {
tscDebug("0x%"PRIx64" table %s data block is empty", pSql->self, pOneTableBlock->tableName.tname);
tscDebug("0x%"PRIx64" table %s data block is empty", pInsertParam->objectId, pOneTableBlock->tableName.tname);
}
p = taosHashIterate(pCmd->insertParam.pTableBlockHashList, p);
p = taosHashIterate(pInsertParam->pTableBlockHashList, p);
if (p == NULL) {
break;
}
......@@ -1707,10 +1683,10 @@ int32_t tscMergeTableDataBlocks(SSqlObj* pSql, bool freeBlockMap) {
pOneTableBlock = *p;
}
extractTableNameList(pCmd, freeBlockMap);
extractTableNameList(pInsertParam, freeBlockMap);
// free the table data blocks;
pCmd->insertParam.pDataBlocks = pVnodeDataBlockList;
pInsertParam->pDataBlocks = pVnodeDataBlockList;
taosHashCleanup(pVnodeDataBlockHashList);
return TSDB_CODE_SUCCESS;
......@@ -3039,6 +3015,15 @@ void tscResetForNextRetrieve(SSqlRes* pRes) {
pRes->numOfRows = 0;
}
void tscInitResForMerge(SSqlRes* pRes) {
pRes->qId = 1; // hack to pass the safety check in fetch_row function
pRes->rspType = 0; // used as a flag to denote if taos_retrieved() has been called yet
tscResetForNextRetrieve(pRes);
assert(pRes->pMerger != NULL);
pRes->data = pRes->pMerger->buf;
}
void registerSqlObj(SSqlObj* pSql) {
taosAcquireRef(tscRefId, pSql->pTscObj->rid);
pSql->self = taosAddRef(tscObjRef, pSql);
......@@ -3060,14 +3045,6 @@ SSqlObj* createSimpleSubObj(SSqlObj* pSql, __async_cb_func_t fp, void* param, in
SSqlCmd* pCmd = &pNew->cmd;
pCmd->command = cmd;
int32_t code = copyTagData(&pNew->cmd.tagData, &pSql->cmd.tagData);
if (code != TSDB_CODE_SUCCESS) {
tscError("0x%"PRIx64" new subquery failed, unable to malloc tag data, tableIndex:%d", pSql->self, 0);
free(pNew);
return NULL;
}
if (tscAddQueryInfo(pCmd) != TSDB_CODE_SUCCESS) {
#ifdef __APPLE__
// to satisfy later tsem_destroy in taos_free_result
......@@ -3163,8 +3140,6 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, __async_cb_func_t
pnCmd->insertParam.numOfTables = 0;
pnCmd->insertParam.pTableNameList = NULL;
pnCmd->insertParam.pTableBlockHashList = NULL;
pnCmd->tagData.data = NULL;
pnCmd->tagData.dataLen = 0;
if (tscAddQueryInfo(pnCmd) != TSDB_CODE_SUCCESS) {
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
......@@ -3548,10 +3523,10 @@ int32_t tscSQLSyntaxErrMsg(char* msg, const char* additionalInfo, const char* s
return TSDB_CODE_TSC_SQL_SYNTAX_ERROR;
}
int32_t tscInvalidSQLErrMsg(char* msg, const char* additionalInfo, const char* sql) {
const char* msgFormat1 = "invalid SQL: %s";
const char* msgFormat2 = "invalid SQL: \'%s\' (%s)";
const char* msgFormat3 = "invalid SQL: \'%s\'";
int32_t tscInvalidOperationMsg(char* msg, const char* additionalInfo, const char* sql) {
const char* msgFormat1 = "invalid operation: %s";
const char* msgFormat2 = "invalid operation: \'%s\' (%s)";
const char* msgFormat3 = "invalid operation: \'%s\'";
const int32_t BACKWARD_CHAR_STEP = 0;
......@@ -4351,3 +4326,38 @@ int tscTransferTableNameList(SSqlObj *pSql, const char *pNameList, int32_t lengt
return TSDB_CODE_SUCCESS;
}
bool vgroupInfoIdentical(SNewVgroupInfo *pExisted, SVgroupMsg* src) {
assert(pExisted != NULL && src != NULL);
if (pExisted->numOfEps != src->numOfEps) {
return false;
}
for(int32_t i = 0; i < pExisted->numOfEps; ++i) {
if (pExisted->ep[i].port != src->epAddr[i].port) {
return false;
}
if (strncmp(pExisted->ep[i].fqdn, src->epAddr[i].fqdn, tListLen(pExisted->ep[i].fqdn)) != 0) {
return false;
}
}
return true;
}
SNewVgroupInfo createNewVgroupInfo(SVgroupMsg *pVgroupMsg) {
assert(pVgroupMsg != NULL);
SNewVgroupInfo info = {0};
info.numOfEps = pVgroupMsg->numOfEps;
info.vgId = pVgroupMsg->vgId;
info.inUse = 0; // 0 is the default value of inUse in case of multiple replica
assert(info.numOfEps >= 1 && info.vgId >= 1);
for(int32_t i = 0; i < pVgroupMsg->numOfEps; ++i) {
tstrncpy(info.ep[i].fqdn, pVgroupMsg->epAddr[i].fqdn, TSDB_FQDN_LEN);
info.ep[i].port = pVgroupMsg->epAddr[i].port;
}
return info;
}
\ No newline at end of file
......@@ -22,6 +22,7 @@
#include "qFill.h"
#include "qResultbuf.h"
#include "qSqlparser.h"
#include "qTableMeta.h"
#include "qTsbuf.h"
#include "query.h"
#include "taosdef.h"
......@@ -70,14 +71,6 @@ typedef struct SResultRowPool {
SArray* pData; // SArray<void*>
} SResultRowPool;
typedef struct SGroupbyExpr {
int16_t tableIndex;
SArray* columnInfo; // SArray<SColIndex>, group by columns information
int16_t numOfGroupCols; // todo remove it
int16_t orderIndex; // order by column index
int16_t orderType; // order by type: asc/desc
} SGroupbyExpr;
typedef struct SResultRow {
int32_t pageId; // pageId & rowId is the position of current result in disk-based output buffer
int32_t offset:29; // row index in buffer page
......@@ -216,7 +209,7 @@ typedef struct SQueryAttr {
int32_t intermediateResultRowSize; // intermediate result row size, in case of top-k query.
int32_t maxTableColumnWidth;
int32_t tagLen; // tag value length of current query
SGroupbyExpr* pGroupbyExpr;
SGroupbyExpr *pGroupbyExpr;
SExprInfo* pExpr1;
SExprInfo* pExpr2;
......@@ -475,10 +468,10 @@ typedef struct SDistinctOperatorInfo {
int64_t outputCapacity;
} SDistinctOperatorInfo;
struct SLocalMerger;
struct SGlobalMerger;
typedef struct SMultiwayMergeInfo {
struct SLocalMerger *pMerge;
struct SGlobalMerger *pMerge;
SOptrBasicInfo binfo;
int32_t bufCapacity;
int64_t seed;
......
......@@ -16,6 +16,8 @@
#ifndef TDENGINE_QPLAN_H
#define TDENGINE_QPLAN_H
#include "qExecutor.h"
struct SQueryInfo;
typedef struct SQueryNodeBasicInfo {
......
......@@ -142,7 +142,7 @@ typedef struct SCreateTableSql {
} colInfo;
SArray *childTableInfo; // SArray<SCreatedTableInfo>
SSqlNode *pSelect;
SSqlNode *pSelect;
} SCreateTableSql;
typedef struct SAlterTableInfo {
......@@ -258,7 +258,6 @@ SArray *tVariantListInsert(SArray *pList, tVariant *pVar, uint8_t sortOrder, int
SArray *tVariantListAppendToken(SArray *pList, SStrToken *pAliasToken, uint8_t sortOrder);
SRelationInfo *setTableNameList(SRelationInfo* pFromInfo, SStrToken *pName, SStrToken* pAlias);
//SRelationInfo *setSubquery(SRelationInfo* pFromInfo, SRelElementPair* p);
void *destroyRelationInfo(SRelationInfo* pFromInfo);
SRelationInfo *addSubqueryElem(SRelationInfo* pRelationInfo, SArray* pSub, SStrToken* pAlias);
......
#ifndef TDENGINE_QTABLEUTIL_H
#define TDENGINE_QTABLEUTIL_H
#include "tsdb.h" //todo tsdb should not be here
#include "qSqlparser.h"
typedef struct SFieldInfo {
int16_t numOfOutput; // number of column in result
TAOS_FIELD* final;
SArray *internalField; // SArray<SInternalField>
} SFieldInfo;
typedef struct SCond {
uint64_t uid;
int32_t len; // length of tag query condition data
char * cond;
} SCond;
typedef struct SJoinNode {
uint64_t uid;
int16_t tagColId;
SArray* tsJoin;
SArray* tagJoin;
} SJoinNode;
typedef struct SJoinInfo {
bool hasJoin;
SJoinNode *joinTables[TSDB_MAX_JOIN_TABLE_NUM];
} SJoinInfo;
typedef struct STagCond {
// relation between tbname list and query condition, including : TK_AND or TK_OR
int16_t relType;
// tbname query condition, only support tbname query condition on one table
SCond tbnameCond;
// join condition, only support two tables join currently
SJoinInfo joinInfo;
// for different table, the query condition must be seperated
SArray *pCond;
} STagCond;
typedef struct SGroupbyExpr {
int16_t tableIndex;
SArray* columnInfo; // SArray<SColIndex>, group by columns information
int16_t numOfGroupCols; // todo remove it
int16_t orderIndex; // order by column index
int16_t orderType; // order by type: asc/desc
} SGroupbyExpr;
typedef struct STableComInfo {
uint8_t numOfTags;
uint8_t precision;
int16_t numOfColumns;
int32_t rowSize;
} STableComInfo;
typedef struct STableMeta {
int32_t vgId;
STableId id;
uint8_t tableType;
char sTableName[TSDB_TABLE_FNAME_LEN]; // super table name
uint64_t suid; // super table id
int16_t sversion;
int16_t tversion;
STableComInfo tableInfo;
SSchema schema[]; // if the table is TSDB_CHILD_TABLE, schema is acquired by super table meta info
} STableMeta;
typedef struct STableMetaInfo {
STableMeta *pTableMeta; // table meta, cached in client side and acquired by name
uint32_t tableMetaSize;
SVgroupsInfo *vgroupList;
SArray *pVgroupTables; // SArray<SVgroupTableInfo>
/*
* 1. keep the vgroup index during the multi-vnode super table projection query
* 2. keep the vgroup index for multi-vnode insertion
*/
int32_t vgroupIndex;
SName name;
char aliasName[TSDB_TABLE_NAME_LEN]; // alias name of table specified in query sql
SArray *tagColList; // SArray<SColumn*>, involved tag columns
} STableMetaInfo;
struct SQInfo; // global merge operator
struct SQueryAttr; // query object
typedef struct SQueryInfo {
int16_t command; // the command may be different for each subclause, so keep it seperately.
uint32_t type; // query/insert type
STimeWindow window; // the whole query time window
SInterval interval; // tumble time window
SSessionWindow sessionWindow; // session time window
SGroupbyExpr groupbyExpr; // groupby tags info
SArray * colList; // SArray<SColumn*>
SFieldInfo fieldsInfo;
SArray * exprList; // SArray<SExprInfo*>
SArray * exprList1; // final exprlist in case of arithmetic expression exists
SLimitVal limit;
SLimitVal slimit;
STagCond tagCond;
SOrderVal order;
int16_t fillType; // final result fill type
int16_t numOfTables;
STableMetaInfo **pTableMetaInfo;
struct STSBuf *tsBuf;
int64_t * fillVal; // default value for fill
char * msg; // pointer to the pCmd->payload to keep error message temporarily
int64_t clauseLimit; // limit for current sub clause
int64_t prjOffset; // offset value in the original sql expression, only applied at client side
int64_t vgroupLimit; // table limit in case of super table projection query + global order + limit
int32_t udColumnId; // current user-defined constant output field column id, monotonically decreases from TSDB_UD_COLUMN_INDEX
int16_t resColumnId; // result column id
bool distinctTag; // distinct tag or not
int32_t round; // 0/1/....
int32_t bufLen;
char* buf;
struct SQInfo* pQInfo; // global merge operator
struct SQueryAttr* pQueryAttr; // query object
struct SQueryInfo *sibling; // sibling
SArray *pUpstream; // SArray<struct SQueryInfo>
struct SQueryInfo *pDownstream;
int32_t havingFieldNum;
bool stableQuery;
bool groupbyColumn;
bool simpleAgg;
bool arithmeticOnAgg;
bool projectionQuery;
bool hasFilter;
bool onlyTagQuery;
bool orderProjectQuery;
} SQueryInfo;
/**
* get the number of tags of this table
* @param pTableMeta
* @return
*/
int32_t tscGetNumOfTags(const STableMeta* pTableMeta);
/**
* get the number of columns of this table
* @param pTableMeta
* @return
*/
int32_t tscGetNumOfColumns(const STableMeta* pTableMeta);
/**
* get the basic info of this table
* @param pTableMeta
* @return
*/
STableComInfo tscGetTableInfo(const STableMeta* pTableMeta);
/**
* get the schema
* @param pTableMeta
* @return
*/
SSchema* tscGetTableSchema(const STableMeta* pTableMeta);
/**
* get the tag schema
* @param pMeta
* @return
*/
SSchema *tscGetTableTagSchema(const STableMeta *pMeta);
/**
* get the column schema according to the column index
* @param pMeta
* @param colIndex
* @return
*/
SSchema *tscGetTableColumnSchema(const STableMeta *pMeta, int32_t colIndex);
/**
* get the column schema according to the column id
* @param pTableMeta
* @param colId
* @return
*/
SSchema* tscGetColumnSchemaById(STableMeta* pTableMeta, int16_t colId);
/**
* create the table meta from the msg
* @param pTableMetaMsg
* @param size size of the table meta
* @return
*/
STableMeta* tscCreateTableMetaFromMsg(STableMetaMsg* pTableMetaMsg);
#endif // TDENGINE_QTABLEUTIL_H
#include "os.h"
#include "tschemautil.h"
#include "qTableMeta.h"
#include "qPlan.h"
#include "qExecutor.h"
#include "qUtil.h"
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "os.h"
#include "taosmsg.h"
#include "tschemautil.h"
#include "qTableMeta.h"
#include "ttokendef.h"
#include "taosdef.h"
#include "tutil.h"
#include "tsclient.h"
int32_t tscGetNumOfTags(const STableMeta* pTableMeta) {
assert(pTableMeta != NULL);
STableComInfo tinfo = tscGetTableInfo(pTableMeta);
if (pTableMeta->tableType == TSDB_NORMAL_TABLE) {
assert(tinfo.numOfTags == 0);
return 0;
}
if (pTableMeta->tableType == TSDB_SUPER_TABLE || pTableMeta->tableType == TSDB_CHILD_TABLE) {
return tinfo.numOfTags;
}
assert(tinfo.numOfTags == 0);
return 0;
}
int32_t tscGetNumOfColumns(const STableMeta* pTableMeta) {
assert(pTableMeta != NULL);
// table created according to super table, use data from super table
STableComInfo tinfo = tscGetTableInfo(pTableMeta);
return tinfo.numOfColumns;
......@@ -54,10 +39,10 @@ SSchema *tscGetTableSchema(const STableMeta *pTableMeta) {
SSchema* tscGetTableTagSchema(const STableMeta* pTableMeta) {
assert(pTableMeta != NULL && (pTableMeta->tableType == TSDB_SUPER_TABLE || pTableMeta->tableType == TSDB_CHILD_TABLE));
STableComInfo tinfo = tscGetTableInfo(pTableMeta);
assert(tinfo.numOfTags > 0);
return tscGetTableColumnSchema(pTableMeta, tinfo.numOfColumns);
}
......@@ -68,7 +53,7 @@ STableComInfo tscGetTableInfo(const STableMeta* pTableMeta) {
SSchema* tscGetTableColumnSchema(const STableMeta* pTableMeta, int32_t colIndex) {
assert(pTableMeta != NULL);
SSchema* pSchema = (SSchema*) pTableMeta->schema;
return &pSchema[colIndex];
}
......@@ -88,7 +73,7 @@ SSchema* tscGetColumnSchemaById(STableMeta* pTableMeta, int16_t colId) {
STableMeta* tscCreateTableMetaFromMsg(STableMetaMsg* pTableMetaMsg) {
assert(pTableMetaMsg != NULL && pTableMetaMsg->numOfColumns >= 2 && pTableMetaMsg->numOfTags >= 0);
int32_t schemaSize = (pTableMetaMsg->numOfColumns + pTableMetaMsg->numOfTags) * sizeof(SSchema);
STableMeta* pTableMeta = calloc(1, sizeof(STableMeta) + schemaSize);
......@@ -97,11 +82,11 @@ STableMeta* tscCreateTableMetaFromMsg(STableMetaMsg* pTableMetaMsg) {
pTableMeta->suid = pTableMetaMsg->suid;
pTableMeta->tableInfo = (STableComInfo) {
.numOfTags = pTableMetaMsg->numOfTags,
.precision = pTableMetaMsg->precision,
.numOfColumns = pTableMetaMsg->numOfColumns,
.numOfTags = pTableMetaMsg->numOfTags,
.precision = pTableMetaMsg->precision,
.numOfColumns = pTableMetaMsg->numOfColumns,
};
pTableMeta->id.tid = pTableMetaMsg->tid;
pTableMeta->id.uid = pTableMetaMsg->uid;
......@@ -109,69 +94,14 @@ STableMeta* tscCreateTableMetaFromMsg(STableMetaMsg* pTableMetaMsg) {
pTableMeta->tversion = pTableMetaMsg->tversion;
tstrncpy(pTableMeta->sTableName, pTableMetaMsg->sTableName, TSDB_TABLE_FNAME_LEN);
memcpy(pTableMeta->schema, pTableMetaMsg->schema, schemaSize);
int32_t numOfTotalCols = pTableMeta->tableInfo.numOfColumns;
for(int32_t i = 0; i < numOfTotalCols; ++i) {
pTableMeta->tableInfo.rowSize += pTableMeta->schema[i].bytes;
}
return pTableMeta;
}
bool vgroupInfoIdentical(SNewVgroupInfo *pExisted, SVgroupMsg* src) {
assert(pExisted != NULL && src != NULL);
if (pExisted->numOfEps != src->numOfEps) {
return false;
}
for(int32_t i = 0; i < pExisted->numOfEps; ++i) {
if (pExisted->ep[i].port != src->epAddr[i].port) {
return false;
}
if (strncmp(pExisted->ep[i].fqdn, src->epAddr[i].fqdn, tListLen(pExisted->ep[i].fqdn)) != 0) {
return false;
}
}
return true;
}
SNewVgroupInfo createNewVgroupInfo(SVgroupMsg *pVgroupMsg) {
assert(pVgroupMsg != NULL);
SNewVgroupInfo info = {0};
info.numOfEps = pVgroupMsg->numOfEps;
info.vgId = pVgroupMsg->vgId;
info.inUse = 0; // 0 is the default value of inUse in case of multiple replica
assert(info.numOfEps >= 1 && info.vgId >= 1);
for(int32_t i = 0; i < pVgroupMsg->numOfEps; ++i) {
tstrncpy(info.ep[i].fqdn, pVgroupMsg->epAddr[i].fqdn, TSDB_FQDN_LEN);
info.ep[i].port = pVgroupMsg->epAddr[i].port;
}
return info;
}
// todo refactor
UNUSED_FUNC static FORCE_INLINE char* skipSegments(char* input, char delim, int32_t num) {
for (int32_t i = 0; i < num; ++i) {
while (*input != 0 && *input++ != delim) {
};
}
return input;
}
UNUSED_FUNC static FORCE_INLINE size_t copy(char* dst, const char* src, char delimiter) {
size_t len = 0;
while (*src != delimiter && *src != 0) {
*dst++ = *src++;
len++;
}
return len;
return pTableMeta;
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册