提交 73b41cb4 编写于 作者: S Steven Li

Merge remote-tracking branch 'origin/develop' into feature/python-test-no-sudo

上级 e27ec1f2 905e2820
ver ver- ver- ver- ver- ver- ver- ver- ver- ver- ver- ver- ver- ver- ver- ver- ver- ver- ver- ver- ver- ver- ver- ver- ver- ver- ver- ver- ver- ver- ver- ver- ver- ver- ver- ver- ver- ver- ver- ver- ver- ver- ver- ver- ver- ver- ver- ver- ver- ver- ver- ver- ver- ver- ver- ver- ver- ver- ver- ver- ver- ver- ver- ver- ver- ver- ver- ver- ver- ver- ver- ver- ver- ver- ver- ver- ver- ver- ver- ver- ver- ver- ver- ver- ver- ver- ver- ver- ver- ver- ver- ver- ver- ver- ver- ver- ver- ver- ver- ver- ver- ver- ver- ver- ver- ver- ver- ver- ver- ver- ver- ver- ver- ver- ver- ver- ver- ver- ver- ver- ver- ver- ver- ver- ver- ver- ver- ver- ver- ver- ver- ver- ver- ver- ver- ver- ver- ver- ver- ver- ver- ver- ver- ver- ver- ver- ver- ver- ver- ver- ver- ver- ver- ver- ver- ver- ver- ver- ver- ver- ver- ver- ver- ver- ver- ver- ver- ver- ver- ver- ver- ver-2.1.0 ver- ver- ver- ver- ver- ver- ver- ver- ver- ver- ver- ver- ver- ver- ver- ver- ver- ver- ver- ver- ver- ver- ver- ver- ver- ver- ver- ver- ver- ver- ver- ver- ver- ver- ver- ver- ver- ver- ver- ver- ver- ver- ver- ver- ver- ver- ver- ver- ver- ver- ver- ver- ver- ver- ver- ver- ver- ver- ver- ver- ver- tq- release/ver- mac-enter-test autoaddcol_07161651 autoaddcol_07161108 autoaddcol_07150626 autoaddcol_0720_1434 autoaddcol_0719_1122 autoaddcol_0718_2208 TDB-ver0.8 TDB-ver-0.7
......@@ -33,6 +33,7 @@ Target/
# Doxygen Generated files
......@@ -26,10 +26,6 @@ matrix:
- python3-setuptools
- valgrind
- sudo apt update -y -qq
- sudo apt install -y net-tools python-pip python-setuptools python3-pip python3-setuptools valgrind
- mkdir debug
......@@ -44,7 +40,7 @@ matrix:
cd ${TRAVIS_BUILD_DIR}/debug
sudo make install || travis_terminate $?
make install || travis_terminate $?
pip install --user ${TRAVIS_BUILD_DIR}/src/connector/python/linux/python2/
pip3 install --user ${TRAVIS_BUILD_DIR}/src/connector/python/linux/python3/
......@@ -63,11 +59,14 @@ matrix:
tail -10 mem-error-out.txt
defiMemError=`grep -m 1 'definitely lost' mem-error-out.txt | awk '{print $7}'`
memError=`grep -m 1 'ERROR SUMMARY' mem-error-out.txt | awk '{print $4}'`
if [ -n "$memError" ]; then
if [ "$memError" -gt 23 ]; then
echo -e "${RED} ## Memory errors number valgrind reports is $memError. More than our threshold! ## ${NC} "
if [ "$memError" -gt 16 ] || [ "$defiMemError" -gt 0 ]; then
echo -e "${RED} ## Memory errors number valgrind reports is $memError.\
Definitely lost is $defiMemError. More than our threshold! ## ${NC}"
travis_terminate $memError
......@@ -131,10 +130,6 @@ matrix:
- python3-setuptools
- lcov
- sudo apt update -y -qq
- sudo apt install -y net-tools python-pip python-setuptools python3-pip python3-setuptools lcov
- mkdir debug
......@@ -149,7 +144,7 @@ matrix:
cd ${TRAVIS_BUILD_DIR}/debug
sudo make install || travis_terminate $?
make install || travis_terminate $?
pip install --user ${TRAVIS_BUILD_DIR}/src/connector/python/linux/python2/
pip3 install --user ${TRAVIS_BUILD_DIR}/src/connector/python/linux/python3/
......@@ -162,7 +157,7 @@ matrix:
travis_terminate $?
sudo pkill taosd
pkill taosd
sleep 1
......@@ -43,7 +43,7 @@ lib_files="${build_dir}/lib/libtaos.so.${version}"
header_files="${code_dir}/inc/taos.h ${code_dir}/inc/taoserror.h"
# Init file
......@@ -15,4 +15,4 @@ ADD_SUBDIRECTORY(vnode)
......@@ -57,8 +57,8 @@ typedef struct SJoinSubquerySupporter {
int64_t interval; // interval time
SLimitVal limit; // limit info
uint64_t uid; // query meter uid
SColumnBaseInfo colList; // previous query information
SSqlExprInfo exprsInfo;
SArray* colList; // previous query information
SArray* exprsInfo;
SFieldInfo fieldsInfo;
STagCond tagCond;
SSqlGroupbyExpr groupbyExpr;
......@@ -69,8 +69,9 @@ typedef struct SJoinSubquerySupporter {
int32_t tscCreateDataBlock(size_t initialSize, int32_t rowSize, int32_t startOffset, const char* name,
STableMeta* pTableMeta, STableDataBlocks** dataBlocks);
void tscAppendDataBlock(SDataBlockList* pList, STableDataBlocks* pBlocks);
void tscDestroyDataBlock(STableDataBlocks* pDataBlock);
void tscAppendDataBlock(SDataBlockList* pList, STableDataBlocks* pBlocks);
void tscDestroyDataBlock(STableDataBlocks* pDataBlock);
void tscSortRemoveDataBlockDupRows(STableDataBlocks* dataBuf);
SParamInfo* tscAddParamToDataBlock(STableDataBlocks* pDataBlock, char type, uint8_t timePrec, short bytes,
uint32_t offset);
......@@ -85,8 +86,7 @@ int32_t tscGetDataBlockFromList(void* pHashList, SDataBlockList* pDataBlockList,
int32_t startOffset, int32_t rowSize, const char* tableId, STableMeta* pTableMeta,
STableDataBlocks** dataBlocks);
SVnodeSidList* tscGetVnodeSidList(SSuperTableMeta* pMetricmeta, int32_t vnodeIdx);
STableIdInfo* tscGetMeterSidInfo(SVnodeSidList* pSidList, int32_t idx);
UNUSED_FUNC STableIdInfo* tscGetMeterSidInfo(SVnodeSidList* pSidList, int32_t idx);
......@@ -106,69 +106,68 @@ bool tscIsProjectionQueryOnSTable(SQueryInfo* pQueryInfo, int32_t tableIndex);
bool tscProjectionQueryOnTable(SQueryInfo* pQueryInfo);
bool tscIsTwoStageSTableQuery(SQueryInfo* pQueryInfo, int32_t tableIndex);
bool tscQueryOnMetric(SSqlCmd* pCmd);
bool tscQueryOnSTable(SSqlCmd* pCmd);
bool tscQueryTags(SQueryInfo* pQueryInfo);
bool tscIsSelectivityWithTagQuery(SSqlCmd* pCmd);
void tscAddSpecialColumnForSelect(SQueryInfo* pQueryInfo, int32_t outputColIndex, int16_t functionId, SColumnIndex* pIndex,
SSchema* pColSchema, int16_t isTag);
void addRequiredTagColumn(SQueryInfo* pQueryInfo, int32_t tagColIndex, int32_t tableIndex);
//void addRequiredTagColumn(SQueryInfo* pQueryInfo, int32_t tagColIndex, int32_t tableIndex);
void addRequiredTagColumn(STableMetaInfo* pTableMetaInfo, SColumnIndex* index);
int32_t setMeterID(STableMetaInfo* pTableMetaInfo, SSQLToken* pzTableName, SSqlObj* pSql);
int32_t tscSetTableId(STableMetaInfo* pTableMetaInfo, SSQLToken* pzTableName, SSqlObj* pSql);
void tscClearInterpInfo(SQueryInfo* pQueryInfo);
bool tscIsInsertOrImportData(char* sqlstr);
/* use for keep current db info temporarily, for handle table with db prefix */
// todo remove it
void tscGetDBInfoFromMeterId(char* tableId, char* db);
int tscAllocPayload(SSqlCmd* pCmd, int size);
void tscFieldInfoSetValFromSchema(SFieldInfo* pFieldInfo, int32_t index, SSchema* pSchema);
void tscFieldInfoSetValFromField(SFieldInfo* pFieldInfo, int32_t index, TAOS_FIELD* pField);
void tscFieldInfoSetValue(SFieldInfo* pFieldInfo, int32_t index, int8_t type, const char* name, int16_t bytes);
void tscFieldInfoUpdateVisible(SFieldInfo* pFieldInfo, int32_t index, bool visible);
void tscFieldInfoSetExpr(SFieldInfo* pFieldInfo, int32_t index, SSqlExpr* pExpr);
void tscFieldInfoSetBinExpr(SFieldInfo* pFieldInfo, int32_t index, SSqlFunctionExpr* pExpr);
TAOS_FIELD tscCreateField(int8_t type, const char* name, int16_t bytes);
void tscFieldInfoCalOffset(SQueryInfo* pQueryInfo);
void tscFieldInfoCopy(SFieldInfo* src, SFieldInfo* dst, const int32_t* indexList, int32_t size);
void tscFieldInfoCopyAll(SFieldInfo* dst, SFieldInfo* src);
SFieldSupInfo* tscFieldInfoAppend(SFieldInfo* pFieldInfo, TAOS_FIELD* pField);
SFieldSupInfo* tscFieldInfoInsert(SFieldInfo* pFieldInfo, int32_t index, TAOS_FIELD* field);
SFieldSupInfo* tscFieldInfoGetSupp(SFieldInfo* pFieldInfo, int32_t index);
TAOS_FIELD* tscFieldInfoGetField(SFieldInfo* pFieldInfo, int32_t index);
void tscFieldInfoUpdateOffset(SQueryInfo* pQueryInfo);
void tscFieldInfoCopy(SFieldInfo* dst, const SFieldInfo* src);
void tscFieldInfoUpdateOffsetForInterResult(SQueryInfo* pQueryInfo);
TAOS_FIELD* tscFieldInfoGetField(SQueryInfo* pQueryInfo, int32_t index);
int16_t tscFieldInfoGetOffset(SQueryInfo* pQueryInfo, int32_t index);
int32_t tscGetResRowLength(SQueryInfo* pQueryInfo);
void tscClearFieldInfo(SFieldInfo* pFieldInfo);
int16_t tscFieldInfoGetOffset(SQueryInfo* pQueryInfo, int32_t index);
void tscFieldInfoClear(SFieldInfo* pFieldInfo);
int32_t tscNumOfFields(SQueryInfo* pQueryInfo);
int32_t tscFieldInfoCompare(SFieldInfo* pFieldInfo1, SFieldInfo* pFieldInfo2);
int32_t tscFieldInfoCompare(const SFieldInfo* pFieldInfo1, const SFieldInfo* pFieldInfo2);
void addExprParams(SSqlExpr* pExpr, char* argument, int32_t type, int32_t bytes, int16_t tableIndex);
int32_t tscGetResRowLength(SArray* pExprList);
SSqlExpr* tscSqlExprInsert(SQueryInfo* pQueryInfo, int32_t index, int16_t functionId, SColumnIndex* pColIndex, int16_t type,
int16_t size, int16_t interSize);
SSqlExpr* tscSqlExprInsertEmpty(SQueryInfo* pQueryInfo, int32_t index, int16_t functionId);
int16_t size, int16_t interSize, bool isTagCol);
SSqlExpr* tscSqlExprAppend(SQueryInfo* pQueryInfo, int16_t functionId, SColumnIndex* pColIndex, int16_t type,
int16_t size, int16_t interSize, bool isTagCol);
SSqlExpr* tscSqlExprUpdate(SQueryInfo* pQueryInfo, int32_t index, int16_t functionId, int16_t srcColumnIndex, int16_t type,
int16_t size);
int32_t tscSqlExprNumOfExprs(SQueryInfo* pQueryInfo);
SSqlExpr* tscSqlExprGet(SQueryInfo* pQueryInfo, int32_t index);
void tscSqlExprCopy(SSqlExprInfo* dst, const SSqlExprInfo* src, uint64_t uid, bool deepcopy);
void* tscSqlExprDestroy(SSqlExpr* pExpr);
void tscSqlExprInfoDestroy(SSqlExprInfo* pExprInfo);
SArray* tscSqlExprCopy(const SArray* src, uint64_t uid, bool deepcopy);
void tscSqlExprInfoDestroy(SArray* pExprInfo);
SColumnBase* tscColumnBaseInfoInsert(SQueryInfo* pQueryInfo, SColumnIndex* colIndex);
void tscColumnFilterInfoCopy(SColumnFilterInfo* dst, const SColumnFilterInfo* src);
void tscColumnBaseCopy(SColumnBase* dst, const SColumnBase* src);
SColumn* tscColumnClone(const SColumn* src);
SColumn* tscColumnListInsert(SArray* pColList, SColumnIndex* colIndex);
void tscColumnListCopy(SArray* dst, const SArray* src, int16_t tableIndex);
void tscColumnListDestroy(SArray* pColList);
void tscColumnBaseInfoCopy(SColumnBaseInfo* dst, const SColumnBaseInfo* src, int16_t tableIndex);
SColumnBase* tscColumnBaseInfoGet(SColumnBaseInfo* pColumnBaseInfo, int32_t index);
void tscColumnBaseInfoUpdateTableIndex(SColumnBaseInfo* pColList, int16_t tableIndex);
void tscColumnBaseInfoReserve(SColumnBaseInfo* pColumnBaseInfo, int32_t size);
void tscColumnBaseInfoDestroy(SColumnBaseInfo* pColumnBaseInfo);
SColumnFilterInfo* tscFilterInfoClone(const SColumnFilterInfo* src, int32_t numOfFilters);
int32_t tscValidateName(SSQLToken* pToken);
......@@ -188,27 +187,25 @@ void tscGetSrcColumnInfo(SSrcColumnInfo* pColInfo, SQueryInfo* pQueryInfo);
void tscSetFreeHeatBeat(STscObj* pObj);
bool tscShouldFreeHeatBeat(SSqlObj* pHb);
void tscCleanSqlCmd(SSqlCmd* pCmd);
bool tscShouldFreeAsyncSqlObj(SSqlObj* pSql);
bool tscShouldBeFreed(SSqlObj* pSql);
void tscRemoveAllMeterMetaInfo(SQueryInfo* pQueryInfo, const char* address, bool removeFromCache);
void tscClearAllTableMetaInfo(SQueryInfo* pQueryInfo, const char* address, bool removeFromCache);
STableMetaInfo* tscGetTableMetaInfoFromCmd(SSqlCmd *pCmd, int32_t subClauseIndex, int32_t tableIndex);
STableMetaInfo* tscGetMetaInfo(SQueryInfo *pQueryInfo, int32_t tableIndex);
SQueryInfo *tscGetQueryInfoDetail(SSqlCmd* pCmd, int32_t subClauseIndex);
int32_t tscGetQueryInfoDetailSafely(SSqlCmd *pCmd, int32_t subClauseIndex, SQueryInfo** pQueryInfo);
STableMetaInfo* tscGetMeterMetaInfoByUid(SQueryInfo* pQueryInfo, uint64_t uid, int32_t* index);
void tscClearMeterMetaInfo(STableMetaInfo* pTableMetaInfo, bool removeFromCache);
void tscClearTableMetaInfo(STableMetaInfo* pTableMetaInfo, bool removeFromCache);
STableMetaInfo* tscAddTableMetaInfo(SQueryInfo* pQueryInfo, const char* name, STableMeta* pTableMeta,
SVgroupsInfo* vgroupList, int16_t numOfTags, int16_t* tags);
SVgroupsInfo* vgroupList, SArray* pTagCols);
STableMetaInfo* tscAddEmptyMetaInfo(SQueryInfo *pQueryInfo);
int32_t tscAddSubqueryInfo(SSqlCmd *pCmd);
void tscFreeSubqueryInfo(SSqlCmd* pCmd);
void tscFreeQueryInfo(SSqlCmd* pCmd);
void tscClearSubqueryInfo(SSqlCmd* pCmd);
void tscGetMetricMetaCacheKey(SQueryInfo* pQueryInfo, char* keyStr, uint64_t uid);
int tscGetSTableVgroupInfo(SSqlObj* pSql, int32_t clauseIndex);
int tscGetTableMeta(SSqlObj* pSql, STableMetaInfo* pTableMetaInfo);
int tscGetMeterMetaEx(SSqlObj* pSql, STableMetaInfo* pTableMetaInfo, bool createIfNotExists);
......@@ -242,11 +239,6 @@ void doAddGroupColumnForSubquery(SQueryInfo* pQueryInfo, int32_t tagIndex);
int16_t tscGetJoinTagColIndexByUid(STagCond* pTagCond, uint64_t uid);
TAOS* taos_connect_a(char* ip, char* user, char* pass, char* db, uint16_t port, void (*fp)(void*, TAOS_RES*, int),
void* param, void** taos);
void sortRemoveDuplicates(STableDataBlocks* dataBuf);
void tscPrintSelectClause(SSqlObj* pSql, int32_t subClauseIndex);
bool hasMoreVnodesToTry(SSqlObj *pSql);
......@@ -22,32 +22,30 @@ extern "C" {
#include "os.h"
#include "qsqlparser.h"
#include "qsqltype.h"
#include "qtsbuf.h"
#include "taos.h"
#include "taosdef.h"
#include "taosmsg.h"
#include "tarray.h"
#include "tglobal.h"
#include "trpc.h"
#include "tsqlfunction.h"
#include "tutil.h"
#define TSC_GET_RESPTR_BASE(res, _queryinfo, col) (res->data + ((_queryinfo)->fieldsInfo.pSqlExpr[col]->offset) * res->numOfRows)
#include "qsqlparser.h"
#include "qsqltype.h"
#include "qtsbuf.h"
#include "queryExecutor.h"
// forward declaration
struct SSqlInfo;
struct SLocalReducer;
typedef SCMSTableVgroupRspMsg SVgroupsInfo;
// data source from sql string or from file
enum {
typedef struct SSqlGroupbyExpr {
int16_t tableIndex;
int16_t numOfGroupCols;
SColIndex columnInfo[TSDB_MAX_TAGS]; // group by columns information
int16_t orderIndex; // order by column index
int16_t orderType; // order by type: asc/desc
} SSqlGroupbyExpr;
typedef SCMSTableVgroupRspMsg SVgroupsInfo;
typedef struct STableComInfo {
uint8_t numOfTags;
......@@ -57,42 +55,44 @@ typedef struct STableComInfo {
} STableComInfo;
typedef struct STableMeta {
//super table if it is created according to super table, otherwise, tableInfo is used
union { struct STableMeta* pSTable; STableComInfo tableInfo; };
uint8_t tableType;
int16_t sversion;
// super table if it is created according to super table, otherwise, tableInfo is used
union {
struct STableMeta *pSTable;
STableComInfo tableInfo;
uint8_t tableType;
int16_t sversion;
SCMVgroupInfo vgroupInfo;
int32_t sid; // the index of one table in a virtual node
uint64_t uid; // unique id of a table
SSchema schema[]; // if the table is TSDB_CHILD_TABLE, schema is acquired by super table meta info
int32_t sid; // the index of one table in a virtual node
uint64_t uid; // unique id of a table
SSchema schema[]; // if the table is TSDB_CHILD_TABLE, schema is acquired by super table meta info
} STableMeta;
typedef struct STableMetaInfo {
STableMeta * pTableMeta; // table meta, cached in client side and acquried by name
SVgroupsInfo* vgroupList;
STableMeta * pTableMeta; // table meta, cached in client side and acquried by name
SVgroupsInfo *vgroupList;
* 1. keep the vnode index during the multi-vnode super table projection query
* 2. keep the vnode index for multi-vnode insertion
int32_t vgroupIndex;
char name[TSDB_TABLE_ID_LEN]; // (super) table name
int16_t numOfTags; // total required tags in query, including groupby tags
int16_t tagColumnIndex[TSDB_MAX_TAGS]; // clause + tag projection
SArray* tagColList; // SArray<SColumn*>, involved tag columns
} STableMetaInfo;
/* the structure for sql function in select clause */
typedef struct SSqlExpr {
char aliasName[TSDB_COL_NAME_LEN]; // as aliasName
SColIndex colInfo;
int64_t uid; // refactor use the pointer
int16_t functionId; // function id in aAgg array
int16_t resType; // return value type
int16_t resBytes; // length of return value
int16_t interResBytes; // inter result buffer size
int16_t numOfParams; // argument value of each function
tVariant param[3]; // parameters are not more than 3
int32_t offset; // sub result column value of arithmetic expression.
char aliasName[TSDB_COL_NAME_LEN]; // as aliasName
SColIndex colInfo;
int64_t uid; // refactor use the pointer
int16_t functionId; // function id in aAgg array
int16_t resType; // return value type
int16_t resBytes; // length of return value
int16_t interResBytes; // inter result buffer size
int16_t numOfParams; // argument value of each function
tVariant param[3]; // parameters are not more than 3
int32_t offset; // sub result column value of arithmetic expression.
} SSqlExpr;
typedef struct SColumnIndex {
......@@ -100,46 +100,27 @@ typedef struct SColumnIndex {
int16_t columnIndex;
} SColumnIndex;
typedef struct SFieldInfo {
int16_t numOfOutputCols; // number of column in result
int16_t numOfAlloc; // allocated size
TAOS_FIELD *pFields;
typedef struct SFieldSupInfo {
bool visible;
SArithExprInfo *pArithExprInfo;
SSqlExpr * pSqlExpr;
} SFieldSupInfo;
* define if this column is belong to the queried result, it may be add by parser to faciliate
* the query process
* NOTE: these hidden columns always locate at the end of the output columns
bool * pVisibleCols;
int32_t numOfHiddenCols; // the number of column not belongs to the queried result columns
SSqlFunctionExpr** pExpr; // used for aggregation arithmetic express,such as count(*)+count(*)
SSqlExpr** pSqlExpr;
typedef struct SFieldInfo {
int16_t numOfOutput; // number of column in result
SArray *pFields; // SArray<TAOS_FIELD>
SArray *pSupportInfo; // SArray<SFieldSupInfo>
} SFieldInfo;
typedef struct SSqlExprInfo {
int16_t numOfAlloc;
int16_t numOfExprs;
SSqlExpr** pExprs;
} SSqlExprInfo;
typedef struct SColumnBase {
typedef struct SColumn {
SColumnIndex colIndex;
int32_t numOfFilters;
SColumnFilterInfo *filterInfo;
} SColumnBase;
typedef struct SColumnBaseInfo {
int16_t numOfAlloc;
int16_t numOfCols;
SColumnBase *pColList;
} SColumnBaseInfo;
struct SLocalReducer;
} SColumn;
typedef struct SCond {
uint64_t uid;
int32_t len; // length of tag query condition data
int32_t len; // length of tag query condition data
char * cond;
} SCond;
......@@ -166,7 +147,7 @@ typedef struct STagCond {
SJoinInfo joinInfo;
// for different table, the query condition must be seperated
SArray* pCond;
SArray *pCond;
} STagCond;
typedef struct SParamInfo {
......@@ -207,7 +188,7 @@ typedef struct STableDataBlocks {
SParamInfo *params;
} STableDataBlocks;
typedef struct SDataBlockList {
typedef struct SDataBlockList { // todo remove
uint32_t nSize;
uint32_t nAlloc;
STableDataBlocks **pData;
......@@ -218,14 +199,14 @@ typedef struct SQueryInfo {
uint16_t type; // query/insert/import type
char slidingTimeUnit;
int64_t etime, stime;
STimeWindow window;
int64_t intervalTime; // aggregation time interval
int64_t slidingTime; // sliding window in mseconds
SSqlGroupbyExpr groupbyExpr; // group by tags info
SColumnBaseInfo colList;
SArray * colList; // SArray<SColumn*>
SFieldInfo fieldsInfo;
SSqlExprInfo exprsInfo;
SArray * exprsInfo; // SArray<SSqlExpr*>
SLimitVal limit;
SLimitVal slimit;
STagCond tagCond;
......@@ -239,22 +220,16 @@ typedef struct SQueryInfo {
int64_t clauseLimit; // limit for current sub clause
// offset value in the original sql expression, NOT sent to virtual node, only applied at client side
int64_t prjOffset;
int64_t prjOffset;
} SQueryInfo;
// data source from sql string or from file
enum {
typedef struct {
int command;
uint8_t msgType;
union {
bool existsCheck; // check if the table exists or not
bool autoCreated; // if the table is missing, on-the-fly create it. during getmeterMeta
bool autoCreated; // if the table is missing, on-the-fly create it. during getmeterMeta
int8_t dataSourceType; // load data from file or not
......@@ -271,11 +246,11 @@ typedef struct {
int32_t payloadLen;
SQueryInfo **pQueryInfo;
int32_t numOfClause;
SDataBlockList *pDataBlocks; // submit data blocks after parsing sql
char * curSql; // current sql, resume position of sql after parsing paused
void * pTableList; // referred table involved in sql
SDataBlockList *pDataBlocks; // submit data blocks after parsing sql
char * curSql; // current sql, resume position of sql after parsing paused
void * pTableList; // referred table involved in sql
// for parameter ('?') binding and batch processing
int32_t batchSize;
int32_t numOfParams;
......@@ -286,50 +261,49 @@ typedef struct SResRec {
int numOfTotal;
} SResRec;
struct STSBuf;
typedef struct {
int64_t numOfRows; // num of results in current retrieved
int64_t numOfTotal; // num of total results
int64_t numOfTotalInCurrentClause; // num of total result in current subclause
char * pRsp;
int32_t rspType;
int32_t rspLen;
uint64_t qhandle;
int64_t uid;
int64_t useconds;
int64_t offset; // offset value from vnode during projection query of stable
int32_t row;
int16_t numOfCols;
int16_t precision;
bool completed;
int32_t code;
int32_t numOfGroups;
SResRec * pGroupRec;
char * data;
void ** tsrow;
char ** buffer; // Buffer used to put multibytes encoded using unicode (wchar_t)
SColumnIndex *pColumnIndex;
int64_t numOfRows; // num of results in current retrieved
int64_t numOfTotal; // num of total results
int64_t numOfTotalInCurrentClause; // num of total result in current subclause
char * pRsp;
int32_t rspType;
int32_t rspLen;
uint64_t qhandle;
int64_t uid;
int64_t useconds;
int64_t offset; // offset value from vnode during projection query of stable
int32_t row;
int16_t numOfCols;
int16_t precision;
bool completed;
int32_t code;
int32_t numOfGroups;
SResRec * pGroupRec;
char * data;
void ** tsrow;
char ** buffer; // Buffer used to put multibytes encoded using unicode (wchar_t)
SColumnIndex * pColumnIndex;
struct SLocalReducer *pLocalReducer;
} SSqlRes;
typedef struct STscObj {
void * signature;
void * pTimer;
char mgmtIp[TSDB_USER_LEN];
uint16_t mgmtPort;
char user[TSDB_USER_LEN];
char pass[TSDB_KEY_LEN];
char acctId[TSDB_DB_NAME_LEN];
char sversion[TSDB_VERSION_LEN];
char writeAuth : 1;
char superAuth : 1;
struct SSqlObj *pSql;
struct SSqlObj *pHb;
struct SSqlObj *sqlList;
void * signature;
void * pTimer;
char mgmtIp[TSDB_USER_LEN];
uint16_t mgmtPort;
char user[TSDB_USER_LEN];
char pass[TSDB_KEY_LEN];
char acctId[TSDB_DB_NAME_LEN];
char sversion[TSDB_VERSION_LEN];
char writeAuth : 1;
char superAuth : 1;
void* pMgmtConn;
struct SSqlObj * pSql;
struct SSqlObj * pHb;
struct SSqlObj * sqlList;
struct SSqlStream *streamList;
pthread_mutex_t mutex;
pthread_mutex_t mutex;
} STscObj;
typedef struct SSqlObj {
......@@ -337,23 +311,23 @@ typedef struct SSqlObj {
STscObj *pTscObj;
void (*fp)();
void (*fetchFp)();
void * param;
uint32_t ip;
short vnode;
int64_t stime;
uint32_t queryId;
void * pStream;
void * pSubscription;
char * sqlstr;
char retry;
char maxRetry;
SRpcIpSet ipList;
char freed : 4;
char listed : 4;
tsem_t rspSem;
SSqlCmd cmd;
SSqlRes res;
uint8_t numOfSubs;
void * param;
uint32_t ip;
short vnode;
int64_t stime;
uint32_t queryId;
void * pStream;
void * pSubscription;
char * sqlstr;
char retry;
char maxRetry;
SRpcIpSet ipList;
char freed : 4;
char listed : 4;
tsem_t rspSem;
SSqlCmd cmd;
SSqlRes res;
uint8_t numOfSubs;
struct SSqlObj **pSubs;
struct SSqlObj * prev, *next;
} SSqlObj;
......@@ -386,14 +360,11 @@ typedef struct SSqlStream {
struct SSqlStream *prev, *next;
} SSqlStream;
int32_t tscInitRpc(const char *user, const char *secret);
int32_t tscInitRpc(const char *user, const char *secret, void** pMgmtConn);
void tscInitMsgsFp();
// tscSql API
int tsParseSql(SSqlObj *pSql, bool multiVnodeInsertion);
void tscInitMsgsFp();
extern int (*tscBuildMsg[TSDB_SQL_MAX])(SSqlObj *pSql, SSqlInfo *pInfo);
void tscProcessMsgFromServer(SRpcMsg *rpcMsg);
int tscProcessSql(SSqlObj *pSql);
......@@ -406,12 +377,8 @@ int tscProcessLocalCmd(SSqlObj *pSql);
int tscCfgDynamicOptions(char *msg);
int taos_retrieve(TAOS_RES *res);
* transfer function for metric query in stream computing, the function need to be change
* before send query message to vnode
int32_t tscTansformSQLFunctionForSTableQuery(SQueryInfo *pQueryInfo);
void tscRestoreSQLFunctionForMetricQuery(SQueryInfo *pQueryInfo);
int32_t tscTansformSQLFuncForSTableQuery(SQueryInfo *pQueryInfo);
void tscRestoreSQLFuncForSTableQuery(SQueryInfo *pQueryInfo);
int32_t tscCreateResPointerInfo(SSqlRes *pRes, SQueryInfo *pQueryInfo);
void tscDestroyResPointerInfo(SSqlRes *pRes);
......@@ -441,10 +408,13 @@ void tscFreeSqlObj(SSqlObj *pObj);
void tscCloseTscObj(STscObj *pObj);
void doAsyncQuery(STscObj* pObj, SSqlObj* pSql, void (*fp)(), void* param, const char* sqlstr, size_t sqlLen);
TAOS *taos_connect_a(char *ip, char *user, char *pass, char *db, uint16_t port, void (*fp)(void *, TAOS_RES *, int),
void *param, void **taos);
void doAsyncQuery(STscObj *pObj, SSqlObj *pSql, void (*fp)(), void *param, const char *sqlstr, size_t sqlLen);
void tscProcessMultiVnodesInsertFromFile(SSqlObj *pSql);
void tscKillMetricQuery(SSqlObj *pSql);
void tscKillSTableQuery(SSqlObj *pSql);
void tscInitResObjForLocalQuery(SSqlObj *pObj, int32_t numOfRes, int32_t rowLen);
bool tscIsUpdateQuery(STscObj *pObj);
bool tscHasReachLimitation(SQueryInfo *pQueryInfo, SSqlRes *pRes);
......@@ -453,19 +423,20 @@ char *tscGetErrorMsgPayload(SSqlCmd *pCmd);
int32_t tscInvalidSQLErrMsg(char *msg, const char *additionalInfo, const char *sql);
void tscQueueAsyncFreeResult(SSqlObj *pSql);
int32_t tscToSQLCmd(SSqlObj* pSql, struct SSqlInfo* pInfo);
extern void * pVnodeConn;
extern void * pTscMgmtConn;
extern void * tscCacheHandle;
extern int slaveIndex;
extern void * tscTmr;
extern void * tscQhandle;
extern int tscKeepConn[];
extern int tsInsertHeadSize;
extern int tscNumOfThreads;
extern SRpcIpSet tscMgmtIpSet;
void tscQueueAsyncFreeResult(SSqlObj *pSql);
int32_t tscToSQLCmd(SSqlObj *pSql, struct SSqlInfo *pInfo);
char * tscGetResultColumnChr(SSqlRes *pRes, SQueryInfo *pQueryInfo, int32_t column);
extern void * pVnodeConn;
extern void * tscCacheHandle;
extern void * tscTmr;
extern void * tscQhandle;
extern int tscKeepConn[];
extern int tsInsertHeadSize;
extern int tscNumOfThreads;
extern SRpcIpSet tscMgmtIpSet;
extern int (*tscBuildMsg[TSDB_SQL_MAX])(SSqlObj *pSql, SSqlInfo *pInfo);
typedef void (*__async_cb_func_t)(void *param, TAOS_RES *tres, int numOfRows);
......@@ -281,7 +281,7 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_executeQueryImp(J
int code = taos_query(tscon, dst);
if (code != 0) {
jniError("jobj:%p, conn:%p, code:%d, msg:%s", jobj, tscon, code, taos_errstr(tscon));
jniError("jobj:%p, conn:%p, code:%s, msg:%s", jobj, tscon, tstrerror(code), taos_errstr(tscon));
} else {
......@@ -290,9 +290,9 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_executeQueryImp(J
if (pSql->cmd.command == TSDB_SQL_INSERT) {
affectRows = taos_affected_rows(tscon);
jniTrace("jobj:%p, conn:%p, code:%d, affect rows:%d", jobj, tscon, code, affectRows);
jniTrace("jobj:%p, conn:%p, code:%s, affect rows:%d", jobj, tscon, tstrerror(code), affectRows);
} else {
jniTrace("jobj:%p, conn:%p, code:%d", jobj, tscon, code);
jniTrace("jobj:%p, conn:%p, code:%s", jobj, tscon, tstrerror(code));
......@@ -46,7 +46,7 @@ void doAsyncQuery(STscObj* pObj, SSqlObj* pSql, void (*fp)(), void* param, const
pSql->signature = pSql;
pSql->param = param;
pSql->pTscObj = pObj;
pSql->maxRetry = 1;
pSql->maxRetry = TSDB_MAX_REPLICA_NUM;
pSql->fp = fp;
......@@ -287,9 +287,9 @@ void tscAsyncFetchSingleRowProxy(void *param, TAOS_RES *tres, int numOfRows) {
for (int i = 0; i < pCmd->numOfCols; ++i){
SSqlExpr* pExpr = pQueryInfo->fieldsInfo.pSqlExpr[i];
if (pExpr != NULL) {
pRes->tsrow[i] = TSC_GET_RESPTR_BASE(pRes, pQueryInfo, i) + pExpr->resBytes * pRes->row;
SFieldSupInfo* pSup = taosArrayGet(pQueryInfo->fieldsInfo.pSupportInfo, i);
if (pSup->pSqlExpr != NULL) {
// pRes->tsrow[i] = TSC_GET_RESPTR_BASE(pRes, pQueryInfo, i) + pSup->pSqlExpr->resBytes * pRes->row;
} else {
//todo add
......@@ -308,11 +308,12 @@ void tscProcessFetchRow(SSchedMsg *pMsg) {
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
for (int i = 0; i < pCmd->numOfCols; ++i) {
SSqlExpr* pExpr = pQueryInfo->fieldsInfo.pSqlExpr[i];
if (pExpr != NULL) {
pRes->tsrow[i] = TSC_GET_RESPTR_BASE(pRes, pQueryInfo, i) + pExpr->resBytes * pRes->row;
SFieldSupInfo* pSup = taosArrayGet(pQueryInfo->fieldsInfo.pSupportInfo, i);
if (pSup->pSqlExpr != NULL) {
pRes->tsrow[i] = tscGetResultColumnChr(pRes, pQueryInfo, i);
} else {
//todo add
// todo add
......@@ -332,7 +333,7 @@ void tscProcessAsyncRes(SSchedMsg *pMsg) {
int code = pRes->code;
// in case of async insert, restore the user specified callback function
bool shouldFree = tscShouldFreeAsyncSqlObj(pSql);
bool shouldFree = tscShouldBeFreed(pSql);
if (cmd == TSDB_SQL_INSERT) {
assert(pSql->fp != NULL);
......@@ -487,7 +488,7 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) {
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
} else {
tscTrace("%p get tableMeta successfully", pSql);
......@@ -2940,7 +2940,7 @@ static void col_project_function_f(SQLFunctionCtx *pCtx, int32_t index) {
char *pData = GET_INPUT_CHAR_INDEX(pCtx, index);
memcpy(pCtx->aOutputBuf, pData, pCtx->inputBytes);
pCtx->aOutputBuf += pCtx->inputBytes/* * GET_FORWARD_DIRECTION_FACTOR(pCtx->order)*/;
pCtx->aOutputBuf += pCtx->inputBytes;
......@@ -3294,29 +3294,26 @@ static void diff_function_f(SQLFunctionCtx *pCtx, int32_t index) {
char *arithmetic_callback_function(void *param, char *name, int32_t colId) {
char *getArithColumnData(void *param, const char* name, int32_t colId) {
SArithmeticSupport *pSupport = (SArithmeticSupport *)param;
SSqlFunctionExpr *pExpr = pSupport->pExpr;
int32_t colIndex = -1;
for (int32_t i = 0; i < pExpr->binExprInfo.numOfCols; ++i) {
if (colId == pExpr->binExprInfo.pReqColumns[i].colId) {
colIndex = pExpr->binExprInfo.pReqColumns[i].colIndex;
int32_t index = -1;
for (int32_t i = 0; i < pSupport->numOfCols; ++i) {
if (colId == pSupport->colList[i].colId) {
index = i;
assert(colIndex >= 0 && colId >= 0);
return pSupport->data[colIndex] + pSupport->offset * pSupport->elemSize[colIndex];
assert(index >= 0 && colId >= 0);
return pSupport->data[index] + pSupport->offset * pSupport->colList[index].bytes;
static void arithmetic_function(SQLFunctionCtx *pCtx) {
GET_RES_INFO(pCtx)->numOfRes += pCtx->size;
SArithmeticSupport *sas = (SArithmeticSupport *)pCtx->param[1].pz;
tSQLBinaryExprCalcTraverse(sas->pExpr->binExprInfo.pBinExpr, pCtx->size, pCtx->aOutputBuf, sas, pCtx->order,
tExprTreeCalcTraverse(sas->pArithExpr->pExpr, pCtx->size, pCtx->aOutputBuf, sas, pCtx->order, getArithColumnData);
pCtx->aOutputBuf += pCtx->outputBytes * pCtx->size;
pCtx->param[1].pz = NULL;
......@@ -3327,10 +3324,9 @@ static void arithmetic_function_f(SQLFunctionCtx *pCtx, int32_t index) {
SArithmeticSupport *sas = (SArithmeticSupport *)pCtx->param[1].pz;
sas->offset = index;
tSQLBinaryExprCalcTraverse(sas->pExpr->binExprInfo.pBinExpr, 1, pCtx->aOutputBuf, sas, pCtx->order,
tExprTreeCalcTraverse(sas->pArithExpr->pExpr, 1, pCtx->aOutputBuf, sas, pCtx->order, getArithColumnData);
pCtx->aOutputBuf += pCtx->outputBytes/* * GET_FORWARD_DIRECTION_FACTOR(pCtx->order)*/;
pCtx->aOutputBuf += pCtx->outputBytes;
#define LIST_MINMAX_N(ctx, minOutput, maxOutput, elemCnt, data, type, tsdbType, numOfNotNullElem) \
......@@ -130,13 +130,13 @@ static int32_t tscSetValueToResObj(SSqlObj *pSql, int32_t rowLen) {
SSchema *pSchema = tscGetTableSchema(pMeta);
for (int32_t i = 0; i < numOfRows; ++i) {
TAOS_FIELD *pField = tscFieldInfoGetField(pQueryInfo, 0);
TAOS_FIELD *pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, 0);
strncpy(pRes->data + tscFieldInfoGetOffset(pQueryInfo, 0) * totalNumOfRows + pField->bytes * i, pSchema[i].name,
char *type = tDataTypeDesc[pSchema[i].type].aName;
pField = tscFieldInfoGetField(pQueryInfo, 1);
pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, 1);
strncpy(pRes->data + tscFieldInfoGetOffset(pQueryInfo, 1) * totalNumOfRows + pField->bytes * i, type, pField->bytes);
int32_t bytes = pSchema[i].bytes;
......@@ -144,10 +144,10 @@ static int32_t tscSetValueToResObj(SSqlObj *pSql, int32_t rowLen) {
bytes = bytes / TSDB_NCHAR_SIZE;
pField = tscFieldInfoGetField(pQueryInfo, 2);
pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, 2);
*(int32_t *)(pRes->data + tscFieldInfoGetOffset(pQueryInfo, 2) * totalNumOfRows + pField->bytes * i) = bytes;
pField = tscFieldInfoGetField(pQueryInfo, 3);
pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, 3);
if (i >= tscGetNumOfColumns(pMeta) && tscGetNumOfTags(pMeta) != 0) {
strncpy(pRes->data + tscFieldInfoGetOffset(pQueryInfo, 3) * totalNumOfRows + pField->bytes * i, "tag",
strlen("tag") + 1);
......@@ -162,18 +162,18 @@ static int32_t tscSetValueToResObj(SSqlObj *pSql, int32_t rowLen) {
char *pTagValue = tsGetTagsValue(pMeta);
for (int32_t i = numOfRows; i < totalNumOfRows; ++i) {
// field name
TAOS_FIELD *pField = tscFieldInfoGetField(pQueryInfo, 0);
TAOS_FIELD *pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, 0);
strncpy(pRes->data + tscFieldInfoGetOffset(pQueryInfo, 0) * totalNumOfRows + pField->bytes * i, pSchema[i].name,
// type name
pField = tscFieldInfoGetField(pQueryInfo, 1);
pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, 1);
char *type = tDataTypeDesc[pSchema[i].type].aName;
strncpy(pRes->data + tscFieldInfoGetOffset(pQueryInfo, 1) * totalNumOfRows + pField->bytes * i, type, pField->bytes);
// type length
int32_t bytes = pSchema[i].bytes;
pField = tscFieldInfoGetField(pQueryInfo, 2);
pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, 2);
if (pSchema[i].type == TSDB_DATA_TYPE_NCHAR) {
bytes = bytes / TSDB_NCHAR_SIZE;
......@@ -181,7 +181,7 @@ static int32_t tscSetValueToResObj(SSqlObj *pSql, int32_t rowLen) {
*(int32_t *)(pRes->data + tscFieldInfoGetOffset(pQueryInfo, 2) * totalNumOfRows + pField->bytes * i) = bytes;
// tag value
pField = tscFieldInfoGetField(pQueryInfo, 3);
pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, 3);
char *target = pRes->data + tscFieldInfoGetOffset(pQueryInfo, 3) * totalNumOfRows + pField->bytes * i;
if (isNull(pTagValue, pSchema[i].type)) {
......@@ -236,31 +236,51 @@ static int32_t tscSetValueToResObj(SSqlObj *pSql, int32_t rowLen) {
static int32_t tscBuildMeterSchemaResultFields(SSqlObj *pSql, int32_t numOfCols, int32_t typeColLength,
int32_t noteColLength) {
int32_t rowLen = 0;
SSqlCmd *pCmd = &pSql->cmd;
pCmd->numOfCols = numOfCols;
SColumnIndex index = {0};
pSql->cmd.numOfCols = numOfCols;
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
pQueryInfo->order.order = TSDB_ORDER_ASC;
tscFieldInfoSetValue(&pQueryInfo->fieldsInfo, 0, TSDB_DATA_TYPE_BINARY, "Field", TSDB_COL_NAME_LEN);
strncpy(f.name, "Field", TSDB_COL_NAME_LEN);
SFieldSupInfo* pInfo = tscFieldInfoAppend(&pQueryInfo->fieldsInfo, &f);
pInfo->pSqlExpr = tscSqlExprAppend(pQueryInfo, TSDB_FUNC_TS_DUMMY, &index, TSDB_DATA_TYPE_BINARY, TSDB_COL_NAME_LEN,
tscFieldInfoSetValue(&pQueryInfo->fieldsInfo, 1, TSDB_DATA_TYPE_BINARY, "Type", typeColLength);
f.bytes = typeColLength;
strncpy(f.name, "Type", TSDB_COL_NAME_LEN);
pInfo = tscFieldInfoAppend(&pQueryInfo->fieldsInfo, &f);
pInfo->pSqlExpr = tscSqlExprAppend(pQueryInfo, TSDB_FUNC_TS_DUMMY, &index, TSDB_DATA_TYPE_BINARY, typeColLength,
typeColLength, false);
rowLen += typeColLength;
tscFieldInfoSetValue(&pQueryInfo->fieldsInfo, 2, TSDB_DATA_TYPE_INT, "Length", sizeof(int32_t));
f.bytes = sizeof(int32_t);
strncpy(f.name, "Length", TSDB_COL_NAME_LEN);
pInfo = tscFieldInfoAppend(&pQueryInfo->fieldsInfo, &f);
pInfo->pSqlExpr = tscSqlExprAppend(pQueryInfo, TSDB_FUNC_TS_DUMMY, &index, TSDB_DATA_TYPE_INT, sizeof(int32_t),
sizeof(int32_t), false);
rowLen += sizeof(int32_t);
tscFieldInfoSetValue(&pQueryInfo->fieldsInfo, 3, TSDB_DATA_TYPE_BINARY, "Note", noteColLength);
rowLen += noteColLength;
f.bytes = noteColLength;
strncpy(f.name, "Note", TSDB_COL_NAME_LEN);
//set the sqlexpr part
SColumnIndex index = {0};
pQueryInfo->fieldsInfo.pSqlExpr[0] = tscSqlExprInsert(pQueryInfo, 0, TSDB_FUNC_TS_DUMMY, &index, TSDB_DATA_TYPE_BINARY, TSDB_COL_NAME_LEN, TSDB_COL_NAME_LEN);
pQueryInfo->fieldsInfo.pSqlExpr[1] = tscSqlExprInsert(pQueryInfo, 1, TSDB_FUNC_TS_DUMMY, &index, TSDB_DATA_TYPE_BINARY, typeColLength, typeColLength);
pQueryInfo->fieldsInfo.pSqlExpr[2] = tscSqlExprInsert(pQueryInfo, 2, TSDB_FUNC_TS_DUMMY, &index, TSDB_DATA_TYPE_INT, sizeof(int32_t), sizeof(int32_t));
pQueryInfo->fieldsInfo.pSqlExpr[3] = tscSqlExprInsert(pQueryInfo, 3, TSDB_FUNC_TS_DUMMY, &index, TSDB_DATA_TYPE_BINARY, noteColLength, noteColLength);
pInfo = tscFieldInfoAppend(&pQueryInfo->fieldsInfo, &f);
pInfo->pSqlExpr = tscSqlExprAppend(pQueryInfo, TSDB_FUNC_TS_DUMMY, &index, TSDB_DATA_TYPE_BINARY, noteColLength,
noteColLength, false);
rowLen += noteColLength;
return rowLen;
......@@ -280,7 +300,7 @@ static int32_t tscProcessDescribeTable(SSqlObj *pSql) {
int32_t rowLen =
tscBuildMeterSchemaResultFields(pSql, NUM_OF_DESCRIBE_TABLE_COLUMNS, TYPE_COLUMN_LENGTH, note_field_length);
return tscSetValueToResObj(pSql, rowLen);
......@@ -310,7 +330,7 @@ static int tscBuildMetricTagProjectionResult(SSqlObj *pSql) {
int32_t totalNumOfResults = pMetricMeta->numOfTables;
int32_t rowLen = tscGetResRowLength(pQueryInfo);
int32_t rowLen = tscGetResRowLength(pQueryInfo->exprsInfo);
tscInitResObjForLocalQuery(pSql, totalNumOfResults, rowLen);
......@@ -321,7 +341,7 @@ static int tscBuildMetricTagProjectionResult(SSqlObj *pSql) {
for (int32_t j = 0; j < pSidList->numOfSids; ++j) {
STableIdInfo *pSidExt = tscGetMeterSidInfo(pSidList, j);
for (int32_t k = 0; k < pQueryInfo->fieldsInfo.numOfOutputCols; ++k) {
for (int32_t k = 0; k < pQueryInfo->fieldsInfo.numOfOutput; ++k) {
SColIndex *pColIndex = &tscSqlExprGet(pQueryInfo, k)->colInfo;
int16_t offsetId = pColIndex->colIdx;
......@@ -329,7 +349,7 @@ static int tscBuildMetricTagProjectionResult(SSqlObj *pSql) {
char * val = NULL;//pSidExt->tags + vOffset[offsetId];
TAOS_FIELD *pField = tscFieldInfoGetField(pQueryInfo, k);
TAOS_FIELD *pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, k);
memcpy(pRes->data + tscFieldInfoGetOffset(pQueryInfo, k) * totalNumOfResults + pField->bytes * rowIdx, val,
......@@ -350,17 +370,17 @@ static int tscBuildMetricTagSqlFunctionResult(SSqlObj *pSql) {
#if 0
SSuperTableMeta *pMetricMeta = tscGetMetaInfo(pQueryInfo, 0)->pMetricMeta;
int32_t totalNumOfResults = 1; // count function only produce one result
int32_t rowLen = tscGetResRowLength(pQueryInfo);
int32_t rowLen = tscGetResRowLength(pQueryInfo->exprsInfo);
tscInitResObjForLocalQuery(pSql, totalNumOfResults, rowLen);
int32_t rowIdx = 0;
for (int32_t i = 0; i < totalNumOfResults; ++i) {
for (int32_t k = 0; k < pQueryInfo->fieldsInfo.numOfOutputCols; ++k) {
for (int32_t k = 0; k < pQueryInfo->fieldsInfo.numOfOutput; ++k) {
SSqlExpr *pExpr = tscSqlExprGet(pQueryInfo, i);
if (pExpr->colInfo.colIdx == -1 && pExpr->functionId == TSDB_FUNC_COUNT) {
TAOS_FIELD *pField = tscFieldInfoGetField(pQueryInfo, k);
TAOS_FIELD *pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, k);
memcpy(pRes->data + tscFieldInfoGetOffset(pQueryInfo, i) * totalNumOfResults + pField->bytes * rowIdx,
&pMetricMeta->numOfTables, sizeof(pMetricMeta->numOfTables));
......@@ -388,7 +408,7 @@ static int tscProcessQueryTags(SSqlObj *pSql) {
return pSql->res.code;
SSqlExpr *pExpr = tscSqlExprGet(pQueryInfo, 0);
SSqlExpr *pExpr = taosArrayGetP(pQueryInfo->exprsInfo, 0);
if (pExpr->functionId == TSDB_FUNC_COUNT) {
return tscBuildMetricTagSqlFunctionResult(pSql);
} else {
......@@ -399,7 +419,7 @@ static int tscProcessQueryTags(SSqlObj *pSql) {
static void tscProcessCurrentUser(SSqlObj *pSql) {
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, 0);
SSqlExpr* pExpr = taosArrayGetP(pQueryInfo->exprsInfo, 0);
tscSetLocalQueryResult(pSql, pSql->pTscObj->user, pExpr->aliasName, TSDB_USER_LEN);
......@@ -414,7 +434,7 @@ static void tscProcessCurrentDB(SSqlObj *pSql) {
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, 0);
SSqlExpr* pExpr = taosArrayGetP(pQueryInfo->exprsInfo, 0);
tscSetLocalQueryResult(pSql, db, pExpr->aliasName, TSDB_DB_NAME_LEN);
......@@ -422,14 +442,14 @@ static void tscProcessServerVer(SSqlObj *pSql) {
const char* v = pSql->pTscObj->sversion;
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, 0);
SSqlExpr* pExpr = taosArrayGetP(pQueryInfo->exprsInfo, 0);
tscSetLocalQueryResult(pSql, v, pExpr->aliasName, tListLen(pSql->pTscObj->sversion));
static void tscProcessClientVer(SSqlObj *pSql) {
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, 0);
SSqlExpr* pExpr = taosArrayGetP(pQueryInfo->exprsInfo, 0);
tscSetLocalQueryResult(pSql, version, pExpr->aliasName, strlen(version));
......@@ -449,7 +469,7 @@ static void tscProcessServStatus(SSqlObj *pSql) {
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, 0);
SSqlExpr* pExpr = taosArrayGetP(pQueryInfo->exprsInfo, 0);
tscSetLocalQueryResult(pSql, "1", pExpr->aliasName, 2);
......@@ -462,13 +482,16 @@ void tscSetLocalQueryResult(SSqlObj *pSql, const char *val, const char *columnNa
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
pQueryInfo->order.order = TSDB_ORDER_ASC;
TAOS_FIELD f = tscCreateField(TSDB_DATA_TYPE_BINARY, columnName, valueLength);
tscFieldInfoAppend(&pQueryInfo->fieldsInfo, &f);
tscFieldInfoSetValue(&pQueryInfo->fieldsInfo, 0, TSDB_DATA_TYPE_BINARY, columnName, valueLength);
tscInitResObjForLocalQuery(pSql, 1, valueLength);
TAOS_FIELD *pField = tscFieldInfoGetField(pQueryInfo, 0);
pQueryInfo->fieldsInfo.pSqlExpr[0] = pQueryInfo->exprsInfo.pExprs[0];
TAOS_FIELD *pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, 0);
SFieldSupInfo* pInfo = tscFieldInfoGetSupp(&pQueryInfo->fieldsInfo, 0);
pInfo->pSqlExpr = taosArrayGetP(pQueryInfo->exprsInfo, 0);
strncpy(pRes->data, val, pField->bytes);
......@@ -613,7 +613,7 @@ static void tsSetBlockInfo(SSubmitBlk *pBlocks, const STableMeta *pTableMeta, in
// data block is disordered, sort it in ascending order
void sortRemoveDuplicates(STableDataBlocks *dataBuf) {
void tscSortRemoveDataBlockDupRows(STableDataBlocks *dataBuf) {
SSubmitBlk *pBlocks = (SSubmitBlk *)dataBuf->pData;
// size is less than the total size, since duplicated rows may be removed yet.
......@@ -779,7 +779,7 @@ static int32_t tscCheckIfCreateTable(char **sqlstr, SSqlObj *pSql) {
STableMetaInfo *pSTableMeterMetaInfo = tscGetMetaInfo(pQueryInfo, STABLE_INDEX);
setMeterID(pSTableMeterMetaInfo, &sToken, pSql);
tscSetTableId(pSTableMeterMetaInfo, &sToken, pSql);
strncpy(pTag->name, pSTableMeterMetaInfo->name, TSDB_TABLE_ID_LEN);
code = tscGetTableMeta(pSql, pSTableMeterMetaInfo);
......@@ -922,7 +922,7 @@ static int32_t tscCheckIfCreateTable(char **sqlstr, SSqlObj *pSql) {
return tscInvalidSQLErrMsg(pCmd->payload, "invalid table name", *sqlstr);
int32_t ret = setMeterID(pTableMetaInfo, &tableToken, pSql);
int32_t ret = tscSetTableId(pTableMetaInfo, &tableToken, pSql);
if (ret != TSDB_CODE_SUCCESS) {
return ret;
......@@ -1059,7 +1059,7 @@ int doParseInsertSql(SSqlObj *pSql, char *str) {
goto _error_clean;
if ((code = setMeterID(pTableMetaInfo, &sToken, pSql)) != TSDB_CODE_SUCCESS) {
if ((code = tscSetTableId(pTableMetaInfo, &sToken, pSql)) != TSDB_CODE_SUCCESS) {
goto _error_clean;
......@@ -209,15 +209,15 @@ void tscKillStream(STscObj *pObj, uint32_t killId) {
char *tscBuildQueryStreamDesc(char *pMsg, STscObj *pObj) {
SQqueryList *pQList = (SQqueryList *)pMsg;
char * pMax = pMsg + TSDB_PAYLOAD_SIZE - 256;
SQueryDesc *pQdesc = pQList->qdesc;
SQqueryList *pQList = (SQqueryList *)pMsg;
pQList->numOfQueries = 0;
SQueryDesc *pQdesc = (SQueryDesc*)(pMsg + sizeof(SQqueryList));
// We extract the lock to tscBuildHeartBeatMsg function.
/* pthread_mutex_lock (&pObj->mutex); */
pMsg += sizeof(SQqueryList);
SSqlObj *pSql = pObj->sqlList;
while (pSql) {
......@@ -244,8 +244,9 @@ char *tscBuildQueryStreamDesc(char *pMsg, STscObj *pObj) {
SStreamList *pSList = (SStreamList *)pMsg;
SStreamDesc *pSdesc = pSList->sdesc;
pSList->numOfStreams = 0;
SStreamDesc *pSdesc = (SStreamDesc*) (pMsg + sizeof(SStreamList));
pMsg += sizeof(SStreamList);
SSqlStream *pStream = pObj->streamList;
......@@ -161,9 +161,9 @@ STableMeta* tscCreateTableMetaFromMsg(STableMetaMsg* pTableMetaMsg, size_t* size
pTableMeta->tableType = pTableMetaMsg->tableType;
pTableMeta->tableInfo = (STableComInfo) {
.numOfTags = pTableMetaMsg->numOfTags,
.numOfTags = pTableMetaMsg->numOfTags,
.precision = pTableMetaMsg->precision,
.numOfColumns = pTableMetaMsg->numOfColumns,
.precision = pTableMetaMsg->precision
pTableMeta->sid = pTableMetaMsg->sid;
......@@ -172,7 +172,7 @@ STableMeta* tscCreateTableMetaFromMsg(STableMetaMsg* pTableMetaMsg, size_t* size
memcpy(pTableMeta->schema, pTableMetaMsg->schema, schemaSize);
int32_t numOfTotalCols = pTableMeta->tableInfo.numOfColumns + pTableMeta->tableInfo.numOfTags;
int32_t numOfTotalCols = pTableMeta->tableInfo.numOfColumns;
for(int32_t i = 0; i < numOfTotalCols; ++i) {
pTableMeta->tableInfo.rowSize += pTableMeta->schema[i].bytes;
......@@ -61,7 +61,9 @@ static void tscInitSqlContext(SSqlCmd *pCmd, SSqlRes *pRes, SLocalReducer *pRedu
* merge requirement. So, the final result in pRes structure is formatted in accordance with the pCmd object.
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
for (int32_t i = 0; i < pQueryInfo->exprsInfo.numOfExprs; ++i) {
size_t size = tscSqlExprNumOfExprs(pQueryInfo);
for (int32_t i = 0; i < size; ++i) {
SQLFunctionCtx *pCtx = &pReducer->pCtx[i];
SSqlExpr * pExpr = tscSqlExprGet(pQueryInfo, i);
......@@ -108,10 +110,10 @@ static void tscInitSqlContext(SSqlCmd *pCmd, SSqlRes *pRes, SLocalReducer *pRedu
int16_t n = 0;
int16_t tagLen = 0;
SQLFunctionCtx **pTagCtx = calloc(pQueryInfo->fieldsInfo.numOfOutputCols, POINTER_BYTES);
SQLFunctionCtx **pTagCtx = calloc(pQueryInfo->fieldsInfo.numOfOutput, POINTER_BYTES);
SQLFunctionCtx *pCtx = NULL;
for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutputCols; ++i) {
for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutput; ++i) {
SSqlExpr *pExpr = tscSqlExprGet(pQueryInfo, i);
if (pExpr->functionId == TSDB_FUNC_TAG_DUMMY || pExpr->functionId == TSDB_FUNC_TS_DUMMY) {
tagLen += pExpr->resBytes;
......@@ -254,12 +256,13 @@ void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrd
// the input data format follows the old format, but output in a new format.
// so, all the input must be parsed as old format
pReducer->pCtx = (SQLFunctionCtx *)calloc(pQueryInfo->exprsInfo.numOfExprs, sizeof(SQLFunctionCtx));
size_t size = tscSqlExprNumOfExprs(pQueryInfo);
pReducer->pCtx = (SQLFunctionCtx *)calloc(size, sizeof(SQLFunctionCtx));
pReducer->rowSize = pMemBuffer[0]->nElemSize;
if (pReducer->rowSize > pMemBuffer[0]->pageSize) {
assert(false); // todo fixed row size is larger than the minimum page size;
......@@ -279,7 +282,7 @@ void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrd
pReducer->nResultBufSize = pMemBuffer[0]->pageSize * 16;
pReducer->pResultBuf = (tFilePage *)calloc(1, pReducer->nResultBufSize + sizeof(tFilePage));
int32_t finalRowLength = tscGetResRowLength(pQueryInfo);
int32_t finalRowLength = tscGetResRowLength(pQueryInfo->exprsInfo);
pReducer->resColModel = finalmodel;
pReducer->resColModel->capacity = pReducer->nResultBufSize / finalRowLength;
assert(finalRowLength <= pReducer->rowSize);
......@@ -301,7 +304,7 @@ void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrd
pReducer->pTempBuffer->numOfElems = 0;
pReducer->pResInfo = calloc((size_t)pQueryInfo->exprsInfo.numOfExprs, sizeof(SResultInfo));
pReducer->pResInfo = calloc(size, sizeof(SResultInfo));
tscCreateResPointerInfo(pRes, pQueryInfo);
tscInitSqlContext(pCmd, pRes, pReducer, pDesc);
......@@ -324,7 +327,7 @@ void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrd
STableComInfo tinfo = tscGetTableInfo(pTableMetaInfo->pTableMeta);
int16_t prec = tinfo.precision;
int64_t stime = (pQueryInfo->stime < pQueryInfo->etime) ? pQueryInfo->stime : pQueryInfo->etime;
int64_t stime = (pQueryInfo->window.skey < pQueryInfo->window.ekey) ? pQueryInfo->window.skey : pQueryInfo->window.ekey;
int64_t revisedSTime =
taosGetIntervalStartTimestamp(stime, pQueryInfo->intervalTime, pQueryInfo->slidingTimeUnit, prec);
......@@ -332,7 +335,7 @@ void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrd
taosInitInterpoInfo(pInterpoInfo, pQueryInfo->order.order, revisedSTime, pQueryInfo->groupbyExpr.numOfGroupCols,
int32_t startIndex = pQueryInfo->fieldsInfo.numOfOutputCols - pQueryInfo->groupbyExpr.numOfGroupCols;
int32_t startIndex = pQueryInfo->fieldsInfo.numOfOutput - pQueryInfo->groupbyExpr.numOfGroupCols;
if (pQueryInfo->groupbyExpr.numOfGroupCols > 0) {
pInterpoInfo->pTags[0] = (char *)pInterpoInfo->pTags + POINTER_BYTES * pQueryInfo->groupbyExpr.numOfGroupCols;
......@@ -462,7 +465,7 @@ void tscDestroyLocalReducer(SSqlObj *pSql) {
if (pLocalReducer->pCtx != NULL) {
for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutputCols; ++i) {
for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutput; ++i) {
SQLFunctionCtx *pCtx = &pLocalReducer->pCtx[i];
......@@ -480,7 +483,7 @@ void tscDestroyLocalReducer(SSqlObj *pSql) {
if (pLocalReducer->pResInfo != NULL) {
for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutputCols; ++i) {
for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutput; ++i) {
......@@ -532,7 +535,7 @@ static int32_t createOrderDescriptor(tOrderDescriptor **pOrderDesc, SSqlCmd *pCm
if (numOfGroupByCols > 0) {
int32_t startCols = pQueryInfo->fieldsInfo.numOfOutputCols - pQueryInfo->groupbyExpr.numOfGroupCols;
int32_t startCols = pQueryInfo->fieldsInfo.numOfOutput - pQueryInfo->groupbyExpr.numOfGroupCols;
// tags value locate at the last columns
for (int32_t i = 0; i < pQueryInfo->groupbyExpr.numOfGroupCols; ++i) {
......@@ -612,8 +615,10 @@ int32_t tscLocalReducerEnvCreate(SSqlObj *pSql, tExtMemBuffer ***pMemBuffer, tOr
return pRes->code;
pSchema = (SSchema *)calloc(1, sizeof(SSchema) * pQueryInfo->exprsInfo.numOfExprs);
size_t size = tscSqlExprNumOfExprs(pQueryInfo);
pSchema = (SSchema *)calloc(1, sizeof(SSchema) * size);
if (pSchema == NULL) {
tscError("%p failed to allocate memory", pSql);
......@@ -621,7 +626,7 @@ int32_t tscLocalReducerEnvCreate(SSqlObj *pSql, tExtMemBuffer ***pMemBuffer, tOr
int32_t rlen = 0;
for (int32_t i = 0; i < pQueryInfo->exprsInfo.numOfExprs; ++i) {
for (int32_t i = 0; i < size; ++i) {
SSqlExpr *pExpr = tscSqlExprGet(pQueryInfo, i);
pSchema[i].bytes = pExpr->resBytes;
......@@ -634,8 +639,8 @@ int32_t tscLocalReducerEnvCreate(SSqlObj *pSql, tExtMemBuffer ***pMemBuffer, tOr
if (rlen != 0) {
capacity = nBufferSizes / rlen;
pModel = createColumnModel(pSchema, pQueryInfo->exprsInfo.numOfExprs, capacity);
pModel = createColumnModel(pSchema, size, capacity);
size_t numOfSubs = pTableMetaInfo->vgroupList->numOfVgroups;
for (int32_t i = 0; i < numOfSubs; ++i) {
......@@ -649,8 +654,8 @@ int32_t tscLocalReducerEnvCreate(SSqlObj *pSql, tExtMemBuffer ***pMemBuffer, tOr
// final result depends on the fields number
memset(pSchema, 0, sizeof(SSchema) * pQueryInfo->exprsInfo.numOfExprs);
for (int32_t i = 0; i < pQueryInfo->exprsInfo.numOfExprs; ++i) {
memset(pSchema, 0, sizeof(SSchema) * size);
for (int32_t i = 0; i < size; ++i) {
SSqlExpr *pExpr = tscSqlExprGet(pQueryInfo, i);
SSchema *p1 = tscGetTableColumnSchema(pTableMetaInfo->pTableMeta, pExpr->colInfo.colIndex);
......@@ -683,8 +688,8 @@ int32_t tscLocalReducerEnvCreate(SSqlObj *pSql, tExtMemBuffer ***pMemBuffer, tOr
pSchema[i].bytes = bytes;
strcpy(pSchema[i].name, pModel->pFields[i].field.name);
*pFinalModel = createColumnModel(pSchema, pQueryInfo->exprsInfo.numOfExprs, capacity);
*pFinalModel = createColumnModel(pSchema, size, capacity);
......@@ -782,7 +787,7 @@ void savePrevRecordAndSetupInterpoInfo(SLocalReducer *pLocalReducer, SQueryInfo
STableComInfo tinfo = tscGetTableInfo(pTableMetaInfo->pTableMeta);
int16_t prec = tinfo.precision;
int64_t stime = (pQueryInfo->stime < pQueryInfo->etime) ? pQueryInfo->stime : pQueryInfo->etime;
int64_t stime = (pQueryInfo->window.skey < pQueryInfo->window.ekey) ? pQueryInfo->window.skey : pQueryInfo->window.ekey;
int64_t revisedSTime =
taosGetIntervalStartTimestamp(stime, pQueryInfo->intervalTime, pQueryInfo->slidingTimeUnit, prec);
......@@ -800,7 +805,7 @@ void savePrevRecordAndSetupInterpoInfo(SLocalReducer *pLocalReducer, SQueryInfo
// static void reversedCopyResultToDstBuf(SQueryInfo* pQueryInfo, SSqlRes *pRes, tFilePage *pFinalDataPage) {
// for (int32_t i = 0; i < pQueryInfo->exprsInfo.numOfExprs; ++i) {
// TAOS_FIELD *pField = tscFieldInfoGetField(pQueryInfo, i);
// TAOS_FIELD *pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, i);
// int32_t offset = tscFieldInfoGetOffset(pQueryInfo, i);
// char * src = pFinalDataPage->data + (pRes->numOfRows - 1) * pField->bytes + pRes->numOfRows * offset;
......@@ -817,8 +822,10 @@ void savePrevRecordAndSetupInterpoInfo(SLocalReducer *pLocalReducer, SQueryInfo
static void reversedCopyFromInterpolationToDstBuf(SQueryInfo *pQueryInfo, SSqlRes *pRes, tFilePage **pResPages,
SLocalReducer *pLocalReducer) {
for (int32_t i = 0; i < pQueryInfo->exprsInfo.numOfExprs; ++i) {
TAOS_FIELD *pField = tscFieldInfoGetField(pQueryInfo, i);
size_t size = tscSqlExprNumOfExprs(pQueryInfo);
for (int32_t i = 0; i < size; ++i) {
TAOS_FIELD *pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, i);
int32_t offset = tscFieldInfoGetOffset(pQueryInfo, i);
assert(offset == getColumnModelOffset(pLocalReducer->resColModel, i));
......@@ -894,7 +901,7 @@ static void doInterpolateResult(SSqlObj *pSql, SLocalReducer *pLocalReducer, boo
savePrevRecordAndSetupInterpoInfo(pLocalReducer, pQueryInfo, &pLocalReducer->interpolationInfo);
int32_t rowSize = tscGetResRowLength(pQueryInfo);
int32_t rowSize = tscGetResRowLength(pQueryInfo->exprsInfo);
memcpy(pRes->data, pFinalDataPage->data, pRes->numOfRows * rowSize);
pFinalDataPage->numOfElems = 0;
......@@ -905,18 +912,18 @@ static void doInterpolateResult(SSqlObj *pSql, SLocalReducer *pLocalReducer, boo
SInterpolationInfo *pInterpoInfo = &pLocalReducer->interpolationInfo;
int64_t actualETime = (pQueryInfo->stime < pQueryInfo->etime) ? pQueryInfo->etime : pQueryInfo->stime;
int64_t actualETime = (pQueryInfo->window.skey < pQueryInfo->window.ekey) ? pQueryInfo->window.ekey : pQueryInfo->window.skey;
tFilePage **pResPages = malloc(POINTER_BYTES * pQueryInfo->fieldsInfo.numOfOutputCols);
for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutputCols; ++i) {
TAOS_FIELD *pField = tscFieldInfoGetField(pQueryInfo, i);
tFilePage **pResPages = malloc(POINTER_BYTES * pQueryInfo->fieldsInfo.numOfOutput);
for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutput; ++i) {
TAOS_FIELD *pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, i);
pResPages[i] = calloc(1, sizeof(tFilePage) + pField->bytes * pLocalReducer->resColModel->capacity);
char ** srcData = (char **)malloc((POINTER_BYTES + sizeof(int32_t)) * pQueryInfo->fieldsInfo.numOfOutputCols);
int32_t *functions = (int32_t *)((char *)srcData + pQueryInfo->fieldsInfo.numOfOutputCols * sizeof(void *));
char ** srcData = (char **)malloc((POINTER_BYTES + sizeof(int32_t)) * pQueryInfo->fieldsInfo.numOfOutput);
int32_t *functions = (int32_t *)((char *)srcData + pQueryInfo->fieldsInfo.numOfOutput * sizeof(void *));
for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutputCols; ++i) {
for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutput; ++i) {
srcData[i] =
pLocalReducer->pBufForInterpo + tscFieldInfoGetOffset(pQueryInfo, i) * pInterpoInfo->numOfRawDataInRows;
functions[i] = tscSqlExprGet(pQueryInfo, i)->functionId;
......@@ -943,8 +950,8 @@ static void doInterpolateResult(SSqlObj *pSql, SLocalReducer *pLocalReducer, boo
newRows -= pQueryInfo->limit.offset;
if (pQueryInfo->limit.offset > 0) {
for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutputCols; ++i) {
TAOS_FIELD *pField = tscFieldInfoGetField(pQueryInfo, i);
for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutput; ++i) {
TAOS_FIELD *pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, i);
memmove(pResPages[i]->data, pResPages[i]->data + pField->bytes * pQueryInfo->limit.offset,
newRows * pField->bytes);
......@@ -992,8 +999,8 @@ static void doInterpolateResult(SSqlObj *pSql, SLocalReducer *pLocalReducer, boo
if (pQueryInfo->order.order == TSDB_ORDER_ASC) {
for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutputCols; ++i) {
TAOS_FIELD *pField = tscFieldInfoGetField(pQueryInfo, i);
for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutput; ++i) {
TAOS_FIELD *pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, i);
int16_t offset = getColumnModelOffset(pLocalReducer->resColModel, i);
memcpy(pRes->data + offset * pRes->numOfRows, pResPages[i]->data, pField->bytes * pRes->numOfRows);
......@@ -1003,7 +1010,7 @@ static void doInterpolateResult(SSqlObj *pSql, SLocalReducer *pLocalReducer, boo
pFinalDataPage->numOfElems = 0;
for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutputCols; ++i) {
for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutput; ++i) {
......@@ -1030,8 +1037,9 @@ static void savePreviousRow(SLocalReducer *pLocalReducer, tFilePage *tmpBuffer)
static void doExecuteSecondaryMerge(SSqlCmd *pCmd, SLocalReducer *pLocalReducer, bool needInit) {
// the tag columns need to be set before all functions execution
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
size_t size = tscSqlExprNumOfExprs(pQueryInfo);
for (int32_t j = 0; j < pQueryInfo->exprsInfo.numOfExprs; ++j) {
for (int32_t j = 0; j < size; ++j) {
SSqlExpr * pExpr = tscSqlExprGet(pQueryInfo, j);
SQLFunctionCtx *pCtx = &pLocalReducer->pCtx[j];
......@@ -1051,7 +1059,7 @@ static void doExecuteSecondaryMerge(SSqlCmd *pCmd, SLocalReducer *pLocalReducer,
for (int32_t j = 0; j < pQueryInfo->exprsInfo.numOfExprs; ++j) {
for (int32_t j = 0; j < size; ++j) {
int32_t functionId = tscSqlExprGet(pQueryInfo, j)->functionId;
if (functionId == TSDB_FUNC_TAG_DUMMY || functionId == TSDB_FUNC_TS_DUMMY) {
......@@ -1071,8 +1079,9 @@ static void handleUnprocessedRow(SSqlCmd *pCmd, SLocalReducer *pLocalReducer, tF
static int64_t getNumOfResultLocal(SQueryInfo *pQueryInfo, SQLFunctionCtx *pCtx) {
int64_t maxOutput = 0;
for (int32_t j = 0; j < pQueryInfo->exprsInfo.numOfExprs; ++j) {
size_t size = tscSqlExprNumOfExprs(pQueryInfo);
for (int32_t j = 0; j < size; ++j) {
// SSqlExpr* pExpr = pQueryInfo->fieldsInfo.pSqlExpr[j];
// if (pExpr == NULL) {
// assert(pQueryInfo->fieldsInfo.pExpr[j] != NULL);
......@@ -1107,7 +1116,9 @@ static int64_t getNumOfResultLocal(SQueryInfo *pQueryInfo, SQLFunctionCtx *pCtx)
static void fillMultiRowsOfTagsVal(SQueryInfo *pQueryInfo, int32_t numOfRes, SLocalReducer *pLocalReducer) {
int32_t maxBufSize = 0; // find the max tags column length to prepare the buffer
for (int32_t k = 0; k < pQueryInfo->exprsInfo.numOfExprs; ++k) {
size_t size = tscSqlExprNumOfExprs(pQueryInfo);
for (int32_t k = 0; k < size; ++k) {
SSqlExpr *pExpr = tscSqlExprGet(pQueryInfo, k);
if (maxBufSize < pExpr->resBytes && pExpr->functionId == TSDB_FUNC_TAG) {
maxBufSize = pExpr->resBytes;
......@@ -1117,7 +1128,7 @@ static void fillMultiRowsOfTagsVal(SQueryInfo *pQueryInfo, int32_t numOfRes, SLo
assert(maxBufSize >= 0);
char *buf = malloc((size_t)maxBufSize);
for (int32_t k = 0; k < pQueryInfo->exprsInfo.numOfExprs; ++k) {
for (int32_t k = 0; k < size; ++k) {
SSqlExpr *pExpr = tscSqlExprGet(pQueryInfo, k);
if (pExpr->functionId != TSDB_FUNC_TAG) {
......@@ -1139,7 +1150,9 @@ static void fillMultiRowsOfTagsVal(SQueryInfo *pQueryInfo, int32_t numOfRes, SLo
int32_t finalizeRes(SQueryInfo *pQueryInfo, SLocalReducer *pLocalReducer) {
for (int32_t k = 0; k < pQueryInfo->exprsInfo.numOfExprs; ++k) {
size_t size = tscSqlExprNumOfExprs(pQueryInfo);
for (int32_t k = 0; k < size; ++k) {
SSqlExpr *pExpr = tscSqlExprGet(pQueryInfo, k);
......@@ -1242,7 +1255,7 @@ bool doGenerateFinalResults(SSqlObj *pSql, SLocalReducer *pLocalReducer, bool no
SInterpolationInfo *pInterpoInfo = &pLocalReducer->interpolationInfo;
int32_t startIndex = pQueryInfo->fieldsInfo.numOfOutputCols - pQueryInfo->groupbyExpr.numOfGroupCols;
int32_t startIndex = pQueryInfo->fieldsInfo.numOfOutput - pQueryInfo->groupbyExpr.numOfGroupCols;
for (int32_t i = 0; i < pQueryInfo->groupbyExpr.numOfGroupCols; ++i) {
int16_t offset = getColumnModelOffset(pModel, startIndex + i);
......@@ -1258,7 +1271,7 @@ bool doGenerateFinalResults(SSqlObj *pSql, SLocalReducer *pLocalReducer, bool no
void resetOutputBuf(SQueryInfo *pQueryInfo, SLocalReducer *pLocalReducer) { // reset output buffer to the beginning
for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutputCols; ++i) {
for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutput; ++i) {
pLocalReducer->pCtx[i].aOutputBuf =
pLocalReducer->pResultBuf->data + tscFieldInfoGetOffset(pQueryInfo, i) * pLocalReducer->resColModel->capacity;
......@@ -1282,7 +1295,7 @@ static void resetEnvForNewResultset(SSqlRes *pRes, SSqlCmd *pCmd, SLocalReducer
// for group result interpolation, do not return if not data is generated
if (pQueryInfo->interpoType != TSDB_INTERPO_NONE) {
int64_t stime = (pQueryInfo->stime < pQueryInfo->etime) ? pQueryInfo->stime : pQueryInfo->etime;
int64_t stime = (pQueryInfo->window.skey < pQueryInfo->window.ekey) ? pQueryInfo->window.skey : pQueryInfo->window.ekey;
int64_t newTime =
taosGetIntervalStartTimestamp(stime, pQueryInfo->intervalTime, pQueryInfo->slidingTimeUnit, precision);
......@@ -1348,7 +1361,7 @@ static bool doHandleLastRemainData(SSqlObj *pSql) {
prevGroupCompleted) {
// if interpoType == TSDB_INTERPO_NONE, return directly
if (pQueryInfo->interpoType != TSDB_INTERPO_NONE) {
int64_t etime = (pQueryInfo->stime < pQueryInfo->etime) ? pQueryInfo->etime : pQueryInfo->stime;
int64_t etime = (pQueryInfo->window.skey < pQueryInfo->window.ekey) ? pQueryInfo->window.ekey : pQueryInfo->window.skey;
etime = taosGetRevisedEndKey(etime, pQueryInfo->order.order, pQueryInfo->intervalTime,
pQueryInfo->slidingTimeUnit, precision);
......@@ -1386,8 +1399,9 @@ static void doProcessResultInNextWindow(SSqlObj *pSql, int32_t numOfRes) {
SLocalReducer *pLocalReducer = pRes->pLocalReducer;
SQueryInfo * pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
size_t size = tscSqlExprNumOfExprs(pQueryInfo);
for (int32_t k = 0; k < pQueryInfo->exprsInfo.numOfExprs; ++k) {
for (int32_t k = 0; k < size; ++k) {
SSqlExpr * pExpr = tscSqlExprGet(pQueryInfo, k);
SQLFunctionCtx *pCtx = &pLocalReducer->pCtx[k];
......@@ -66,7 +66,8 @@ STscObj *taosConnectImpl(const char *ip, const char *user, const char *pass, con
return NULL;
if (tscInitRpc(user, pass) != 0) {
void* pMgmtConn = NULL;
if (tscInitRpc(user, pass, &pMgmtConn) != 0) {
return NULL;
......@@ -118,6 +119,7 @@ STscObj *taosConnectImpl(const char *ip, const char *user, const char *pass, con
strtolower(pObj->db, tmp);
pObj->pMgmtConn = pMgmtConn;
pthread_mutex_init(&pObj->mutex, NULL);
SSqlObj *pSql = (SSqlObj *)calloc(1, sizeof(SSqlObj));
......@@ -129,7 +131,7 @@ STscObj *taosConnectImpl(const char *ip, const char *user, const char *pass, con
pSql->pTscObj = pObj;
pSql->signature = pSql;
pSql->maxRetry = TSDB_REPLICA_MAX_NUM;
pSql->maxRetry = TSDB_MAX_REPLICA_NUM;
tsem_init(&pSql->rspSem, 0, 0);
......@@ -326,16 +328,21 @@ int taos_num_fields(TAOS_RES *res) {
SSqlObj *pSql = (SSqlObj *)res;
if (pSql == NULL || pSql->signature != pSql) return 0;
int32_t num = 0;
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
if (pQueryInfo == NULL) {
return 0;
return num;
SFieldInfo *pFieldsInfo = &pQueryInfo->fieldsInfo;
if (pFieldsInfo)
return (pFieldsInfo->numOfOutputCols - pFieldsInfo->numOfHiddenCols);
return 0;
size_t numOfCols = tscNumOfFields(pQueryInfo);
for(int32_t i = 0; i < numOfCols; ++i) {
SFieldSupInfo* pInfo = taosArrayGet(pQueryInfo->fieldsInfo.pSupportInfo, i);
if (pInfo->visible) {
return num;
int taos_field_count(TAOS *taos) {
......@@ -357,11 +364,16 @@ TAOS_FIELD *taos_fetch_fields(TAOS_RES *res) {
if (pSql == NULL || pSql->signature != pSql) return 0;
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
if (pQueryInfo)
return pQueryInfo->fieldsInfo.pFields;
if (pQueryInfo == NULL) {
return NULL;
size_t numOfCols = tscNumOfFields(pQueryInfo);
if (numOfCols == 0) {
return NULL;
return pQueryInfo->fieldsInfo.pFields->pData;
int taos_retrieve(TAOS_RES *res) {
......@@ -414,8 +426,8 @@ int taos_fetch_block_impl(TAOS_RES *res, TAOS_ROW *rows) {
if (pQueryInfo == NULL)
return 0;
for (int i = 0; i < pQueryInfo->fieldsInfo.numOfOutputCols; ++i) {
pRes->tsrow[i] = TSC_GET_RESPTR_BASE(pRes, pQueryInfo, i);
for (int i = 0; i < pQueryInfo->fieldsInfo.numOfOutput; ++i) {
pRes->tsrow[i] = tscGetResultColumnChr(pRes, pQueryInfo, i);
*rows = pRes->tsrow;
......@@ -446,20 +458,21 @@ static void transferNcharData(SSqlObj *pSql, int32_t columnIndex, TAOS_FIELD *pF
static char *getArithemicInputSrc(void *param, char *name, int32_t colId) {
SArithmeticSupport *pSupport = (SArithmeticSupport *)param;
SSqlFunctionExpr * pExpr = pSupport->pExpr;
int32_t index = -1;
for (int32_t i = 0; i < pExpr->binExprInfo.numOfCols; ++i) {
if (strcmp(name, pExpr->binExprInfo.pReqColumns[i].name) == 0) {
index = i;
assert(index >= 0 && index < pExpr->binExprInfo.numOfCols);
return pSupport->data[index] + pSupport->offset * pSupport->elemSize[index];
static char *getArithemicInputSrc(void *param, const char *name, int32_t colId) {
// SArithmeticSupport *pSupport = (SArithmeticSupport *)param;
// SArithExprInfo * pExpr = pSupport->pArithExpr;
// int32_t index = -1;
// for (int32_t i = 0; i < pExpr->numOfCols; ++i) {
// if (strcmp(name, pExpr->colList[i].name) == 0) {
// index = i;
// break;
// }
// }
// assert(index >= 0 && index < pExpr->numOfCols);
// return pSupport->data[index] + pSupport->offset * pSupport->elemSize[index];
return 0;
static void **doSetResultRowData(SSqlObj *pSql) {
......@@ -476,7 +489,8 @@ static void **doSetResultRowData(SSqlObj *pSql) {
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
//todo refactor move away
for(int32_t k = 0; k < pQueryInfo->exprsInfo.numOfExprs; ++k) {
size_t numOfExprs = tscSqlExprNumOfExprs(pQueryInfo);
for(int32_t k = 0; k < numOfExprs; ++k) {
SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, k);
if (k > 0) {
......@@ -487,9 +501,9 @@ static void **doSetResultRowData(SSqlObj *pSql) {
int32_t num = 0;
for (int i = 0; i < tscNumOfFields(pQueryInfo); ++i) {
if (pQueryInfo->fieldsInfo.pSqlExpr[i] != NULL) {
SSqlExpr* pExpr = pQueryInfo->fieldsInfo.pSqlExpr[i];
pRes->tsrow[i] = TSC_GET_RESPTR_BASE(pRes, pQueryInfo, i) + pExpr->resBytes * pRes->row;
SFieldSupInfo* pInfo = tscFieldInfoGetSupp(&pQueryInfo->fieldsInfo, i);
if (pInfo->pSqlExpr != NULL) {
pRes->tsrow[i] = tscGetResultColumnChr(pRes, pQueryInfo, i) + pInfo->pSqlExpr->resBytes * pRes->row;
} else {
......@@ -499,37 +513,37 @@ static void **doSetResultRowData(SSqlObj *pSql) {
TAOS_FIELD *pField = tscFieldInfoGetField(pQueryInfo, i);
TAOS_FIELD *pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, i);
transferNcharData(pSql, i, pField);
// calculate the result from serveral other columns
if (pQueryInfo->fieldsInfo.pExpr != NULL && pQueryInfo->fieldsInfo.pExpr[i] != NULL) {
// calculate the result from several other columns
if (pInfo->pArithExprInfo != NULL) {
SArithmeticSupport *sas = (SArithmeticSupport *)calloc(1, sizeof(SArithmeticSupport));
sas->offset = 0;
sas->pExpr = pQueryInfo->fieldsInfo.pExpr[i];
sas->pArithExpr = pInfo->pArithExprInfo;
sas->numOfCols = sas->pExpr->binExprInfo.numOfCols;
// sas->numOfCols = sas->pArithExpr->numOfCols;
if (pRes->buffer[i] == NULL) {
pRes->buffer[i] = malloc(tscFieldInfoGetField(pQueryInfo, i)->bytes);
pRes->buffer[i] = malloc(tscFieldInfoGetField(&pQueryInfo->fieldsInfo, i)->bytes);
for(int32_t k = 0; k < sas->numOfCols; ++k) {
int32_t columnIndex = sas->pExpr->binExprInfo.pReqColumns[k].colIndex;
SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, columnIndex);
sas->elemSize[k] = pExpr->resBytes;
sas->data[k] = (pRes->data + pRes->numOfRows* pExpr->offset) + pRes->row*pExpr->resBytes;
// int32_t columnIndex = sas->pArithExpr->colList[k].colIndex;
// SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, columnIndex);
// sas->elemSize[k] = pExpr->resBytes;
// sas->data[k] = (pRes->data + pRes->numOfRows* pExpr->offset) + pRes->row*pExpr->resBytes;
tSQLBinaryExprCalcTraverse(sas->pExpr->binExprInfo.pBinExpr, 1, pRes->buffer[i], sas, TSDB_ORDER_ASC, getArithemicInputSrc);
tExprTreeCalcTraverse(sas->pArithExpr->pExpr, 1, pRes->buffer[i], sas, TSDB_ORDER_ASC, getArithemicInputSrc);
pRes->tsrow[i] = pRes->buffer[i];
free(sas); //todo optimization
assert(num <= pQueryInfo->fieldsInfo.numOfOutputCols);
assert(num <= pQueryInfo->fieldsInfo.numOfOutput);
pRes->row++; // index increase one-step
return pRes->tsrow;
......@@ -591,11 +605,13 @@ static bool tscHashRemainDataInSubqueryResultSet(SSqlObj *pSql) {
static UNUSED_FUNC void **tscBuildResFromSubqueries(SSqlObj *pSql) {
SSqlRes *pRes = &pSql->res;
while (1) {
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, pSql->cmd.clauseIndex);
size_t numOfExprs = tscSqlExprNumOfExprs(pQueryInfo);
if (pRes->tsrow == NULL) {
pRes->tsrow = calloc(pQueryInfo->exprsInfo.numOfExprs, POINTER_BYTES);
pRes->tsrow = calloc(numOfExprs, POINTER_BYTES);
bool success = false;
......@@ -619,7 +635,7 @@ static UNUSED_FUNC void **tscBuildResFromSubqueries(SSqlObj *pSql) {
if (success) { // current row of final output has been built, return to app
for (int32_t i = 0; i < pQueryInfo->exprsInfo.numOfExprs; ++i) {
for (int32_t i = 0; i < numOfExprs; ++i) {
int32_t tableIndex = pRes->pColumnIndex[i].tableIndex;
int32_t columnIndex = pRes->pColumnIndex[i].columnIndex;
......@@ -949,7 +965,7 @@ void taos_stop_query(TAOS_RES *res) {
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
if (tscIsTwoStageSTableQuery(pQueryInfo, 0)) {
......@@ -1126,7 +1142,7 @@ static int tscParseTblNameList(SSqlObj *pSql, const char *tblNameList, int32_t t
return code;
if ((code = setMeterID(pTableMetaInfo, &sToken, pSql)) != TSDB_CODE_SUCCESS) {
if ((code = tscSetTableId(pTableMetaInfo, &sToken, pSql)) != TSDB_CODE_SUCCESS) {
return code;
......@@ -36,7 +36,7 @@ static int64_t getDelayValueAfterTimewindowClosed(SSqlStream* pStream, int64_t l
static bool isProjectStream(SQueryInfo* pQueryInfo) {
for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutputCols; ++i) {
for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutput; ++i) {
SSqlExpr *pExpr = tscSqlExprGet(pQueryInfo, i);
if (pExpr->functionId != TSDB_FUNC_PRJ) {
return false;
......@@ -86,7 +86,7 @@ static void tscProcessStreamLaunchQuery(SSchedMsg *pMsg) {
if (code == TSDB_CODE_ACTION_IN_PROGRESS) return;
// failed to get meter/metric meta, retry in 10sec.
if (code != TSDB_CODE_SUCCESS) {
......@@ -116,18 +116,18 @@ static void tscProcessStreamTimer(void *handle, void *tmrId) {
if (isProjectStream(pQueryInfo)) {
* pQueryInfo->etime, which is the start time, does not change in case of
* pQueryInfo->window.ekey, which is the start time, does not change in case of
* repeat first execution, once the first execution failed.
pQueryInfo->stime = pStream->stime; // start time
pQueryInfo->window.skey = pStream->stime; // start time
pQueryInfo->etime = taosGetTimestamp(pStream->precision); // end time
if (pQueryInfo->etime > pStream->etime) {
pQueryInfo->etime = pStream->etime;
pQueryInfo->window.ekey = taosGetTimestamp(pStream->precision); // end time
if (pQueryInfo->window.ekey > pStream->etime) {
pQueryInfo->window.ekey = pStream->etime;
} else {
pQueryInfo->stime = pStream->stime - pStream->interval;
pQueryInfo->etime = pStream->stime - 1;
pQueryInfo->window.skey = pStream->stime - pStream->interval;
pQueryInfo->window.ekey = pStream->stime - 1;
// launch stream computing in a new thread
......@@ -147,7 +147,7 @@ static void tscProcessStreamQueryCallback(void *param, TAOS_RES *tres, int numOf
STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pStream->pSql->cmd, 0, 0);
tscClearMeterMetaInfo(pTableMetaInfo, true);
tscClearTableMetaInfo(pTableMetaInfo, true);
tscSetRetryTimer(pStream, pStream->pSql, retryDelay);
......@@ -177,7 +177,7 @@ static void tscProcessStreamRetrieveResult(void *param, TAOS_RES *res, int numOf
if (pSql == NULL || numOfRows < 0) {
int64_t retryDelayTime = tscGetRetryDelayTime(pStream->slidingTime, pStream->precision);
tscError("%p stream:%p, retrieve data failed, code:%d, retry in %" PRId64 "ms", pSql, pStream, numOfRows, retryDelayTime);
tscClearMeterMetaInfo(pTableMetaInfo, true);
tscClearTableMetaInfo(pTableMetaInfo, true);
tscSetRetryTimer(pStream, pStream->pSql, retryDelayTime);
......@@ -219,9 +219,9 @@ static void tscProcessStreamRetrieveResult(void *param, TAOS_RES *res, int numOf
void *oldPtr = pSql->res.data;
pSql->res.data = tmpRes;
for (int32_t i = 1; i < pQueryInfo->fieldsInfo.numOfOutputCols; ++i) {
for (int32_t i = 1; i < pQueryInfo->fieldsInfo.numOfOutput; ++i) {
int16_t offset = tscFieldInfoGetOffset(pQueryInfo, i);
TAOS_FIELD *pField = tscFieldInfoGetField(pQueryInfo, i);
TAOS_FIELD *pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, i);
assignVal(pSql->res.data + offset, (char *)(&pQueryInfo->defaultVal[i]), pField->bytes, pField->type);
row[i] = pSql->res.data + offset;
......@@ -231,7 +231,7 @@ static void tscProcessStreamRetrieveResult(void *param, TAOS_RES *res, int numOf
row[0] = pRes->data;
// char result[512] = {0};
// taos_print_row(result, row, pQueryInfo->fieldsInfo.pFields, pQueryInfo->fieldsInfo.numOfOutputCols);
// taos_print_row(result, row, pQueryInfo->fieldsInfo.pFields, pQueryInfo->fieldsInfo.numOfOutput);
// tscPrint("%p stream:%p query result: %s", pSql, pStream, result);
tscTrace("%p stream:%p fetch result", pSql, pStream);
......@@ -259,7 +259,7 @@ static void tscProcessStreamRetrieveResult(void *param, TAOS_RES *res, int numOf
// release the metric/meter meta information reference, so data in cache can be updated
tscClearMeterMetaInfo(pTableMetaInfo, false);
tscClearTableMetaInfo(pTableMetaInfo, false);
tscSetNextLaunchTimer(pStream, pSql);
......@@ -425,10 +425,10 @@ static int64_t tscGetStreamStartTimestamp(SSqlObj *pSql, SSqlStream *pStream, in
pStream->slidingTime = tsProjectExecInterval;
if (stime != 0) { // first projection start from the latest event timestamp
assert(stime >= pQueryInfo->stime);
assert(stime >= pQueryInfo->window.skey);
stime += 1; // exclude the last records from table
} else {
stime = pQueryInfo->stime;
stime = pQueryInfo->window.skey;
} else { // timewindow based aggregation stream
if (stime == 0) { // no data in meter till now
......@@ -548,7 +548,7 @@ TAOS_STREAM *taos_open_stream(TAOS *taos, const char *sqlstr, void (*fp)(void *p
pStream->precision = tinfo.precision;
pStream->ctime = taosGetTimestamp(pStream->precision);
pStream->etime = pQueryInfo->etime;
pStream->etime = pQueryInfo->window.ekey;
pSql->pStream = pStream;
......@@ -190,10 +190,10 @@ void tscDestroyJoinSupporter(SJoinSubquerySupporter* pSupporter) {
if (pSupporter->f != NULL) {
......@@ -211,9 +211,11 @@ void tscDestroyJoinSupporter(SJoinSubquerySupporter* pSupporter) {
bool needSecondaryQuery(SQueryInfo* pQueryInfo) {
for (int32_t i = 0; i < pQueryInfo->colList.numOfCols; ++i) {
SColumnBase* pBase = tscColumnBaseInfoGet(&pQueryInfo->colList, i);
if (pBase->colIndex.columnIndex != PRIMARYKEY_TIMESTAMP_COL_INDEX) {
size_t numOfCols = taosArrayGetSize(pQueryInfo->colList);
for (int32_t i = 0; i < numOfCols; ++i) {
SColumn* base = taosArrayGet(pQueryInfo->colList, i);
if (base->colIndex.columnIndex != PRIMARYKEY_TIMESTAMP_COL_INDEX) {
return true;
......@@ -236,7 +238,7 @@ int32_t tscLaunchSecondPhaseSubqueries(SSqlObj* pSql) {
for (int32_t i = 0; i < pSql->numOfSubs; ++i) {
pSupporter = pSql->pSubs[i]->param;
if (pSupporter->exprsInfo.numOfExprs > 0) {
if (taosArrayGetSize(pSupporter->exprsInfo) > 0) {
......@@ -262,7 +264,7 @@ int32_t tscLaunchSecondPhaseSubqueries(SSqlObj* pSql) {
pSupporter = pPrevSub->param;
if (pSupporter->exprsInfo.numOfExprs == 0) {
if (taosArrayGetSize(pSupporter->exprsInfo) == 0) {
tscTrace("%p subIndex: %d, not need to launch query, ignore it", pSql, i);
......@@ -277,7 +279,7 @@ int32_t tscLaunchSecondPhaseSubqueries(SSqlObj* pSql) {
pSubQueryInfo->tsBuf = NULL;
// free result for async object will also free sqlObj
assert(pSubQueryInfo->exprsInfo.numOfExprs == 1); // ts_comp query only requires one resutl columns
assert(tscSqlExprNumOfExprs(pSubQueryInfo) == 1); // ts_comp query only requires one resutl columns
SSqlObj *pNew = createSubqueryObj(pSql, (int16_t) i, tscJoinQueryCallback, pSupporter, TSDB_SQL_SELECT, NULL);
......@@ -299,27 +301,27 @@ int32_t tscLaunchSecondPhaseSubqueries(SSqlObj* pSql) {
pQueryInfo->intervalTime = pSupporter->interval;
pQueryInfo->groupbyExpr = pSupporter->groupbyExpr;
tscColumnBaseInfoCopy(&pQueryInfo->colList, &pSupporter->colList, 0);
tscColumnListCopy(pQueryInfo->colList, pSupporter->colList, 0);
tscTagCondCopy(&pQueryInfo->tagCond, &pSupporter->tagCond);
tscSqlExprCopy(&pQueryInfo->exprsInfo, &pSupporter->exprsInfo, pSupporter->uid, false);
tscFieldInfoCopyAll(&pQueryInfo->fieldsInfo, &pSupporter->fieldsInfo);
pQueryInfo->exprsInfo = tscSqlExprCopy(pSupporter->exprsInfo, pSupporter->uid, false);
tscFieldInfoCopy(&pQueryInfo->fieldsInfo, &pSupporter->fieldsInfo);
pSupporter->exprsInfo.numOfExprs = 0;
pSupporter->fieldsInfo.numOfOutputCols = 0;
pSupporter->fieldsInfo.numOfOutput = 0;
* if the first column of the secondary query is not ts function, add this function.
* Because this column is required to filter with timestamp after intersecting.
if (pSupporter->exprsInfo.pExprs[0]->functionId != TSDB_FUNC_TS) {
SSqlExpr* pExpr = taosArrayGet(pSupporter->exprsInfo, 0);
if (pExpr->functionId != TSDB_FUNC_TS) {
tscAddTimestampColumn(pQueryInfo, TSDB_FUNC_TS, 0);
SQueryInfo *pNewQueryInfo = tscGetQueryInfoDetail(&pNew->cmd, 0);
assert(pNew->numOfSubs == 0 && pNew->cmd.numOfClause == 1 && pNewQueryInfo->numOfTables == 1);
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pNewQueryInfo, 0);
......@@ -342,10 +344,11 @@ int32_t tscLaunchSecondPhaseSubqueries(SSqlObj* pSql) {
tscPrintSelectClause(pNew, 0);
size_t numOfCols = taosArrayGetSize(pNewQueryInfo->colList);
tscTrace("%p subquery:%p tableIndex:%d, vgroupIndex:%d, type:%d, exprInfo:%d, colList:%d, fieldsInfo:%d, name:%s",
pSql, pNew, 0, pTableMetaInfo->vgroupIndex, pNewQueryInfo->type,
pNewQueryInfo->exprsInfo.numOfExprs, pNewQueryInfo->colList.numOfCols,
pNewQueryInfo->fieldsInfo.numOfOutputCols, pNewQueryInfo->pTableMetaInfo[0]->name);
taosArrayGetSize(pNewQueryInfo->exprsInfo), numOfCols,
pNewQueryInfo->fieldsInfo.numOfOutput, pNewQueryInfo->pTableMetaInfo[0]->name);
//prepare the subqueries object failed, abort
......@@ -413,10 +416,10 @@ static void quitAllSubquery(SSqlObj* pSqlObj, SJoinSubquerySupporter* pSupporter
// update the query time range according to the join results on timestamp
static void updateQueryTimeRange(SQueryInfo* pQueryInfo, int64_t st, int64_t et) {
assert(pQueryInfo->stime <= st && pQueryInfo->etime >= et);
assert(pQueryInfo->window.skey <= st && pQueryInfo->window.ekey >= et);
pQueryInfo->stime = st;
pQueryInfo->etime = et;
pQueryInfo->window.skey = st;
pQueryInfo->window.ekey = et;
static void joinRetrieveCallback(void* param, TAOS_RES* tres, int numOfRows) {
......@@ -688,9 +691,9 @@ void tscSetupOutputColumnIndex(SSqlObj* pSql) {
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
pRes->pColumnIndex = calloc(1, sizeof(SColumnIndex) * pQueryInfo->fieldsInfo.numOfOutputCols);
pRes->pColumnIndex = calloc(1, sizeof(SColumnIndex) * pQueryInfo->fieldsInfo.numOfOutput);
for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutputCols; ++i) {
for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutput; ++i) {
SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, i);
int32_t tableIndexOfSub = -1;
......@@ -707,7 +710,8 @@ void tscSetupOutputColumnIndex(SSqlObj* pSql) {
SSqlCmd* pSubCmd = &pSql->pSubs[tableIndexOfSub]->cmd;
SQueryInfo* pSubQueryInfo = tscGetQueryInfoDetail(pSubCmd, 0);
for (int32_t k = 0; k < pSubQueryInfo->exprsInfo.numOfExprs; ++k) {
size_t numOfExprs = taosArrayGetSize(pSubQueryInfo->exprsInfo);
for (int32_t k = 0; k < numOfExprs; ++k) {
SSqlExpr* pSubExpr = tscSqlExprGet(pSubQueryInfo, k);
if (pExpr->functionId == pSubExpr->functionId && pExpr->colInfo.colId == pSubExpr->colInfo.colId) {
pRes->pColumnIndex[i] = (SColumnIndex){.tableIndex = tableIndexOfSub, .columnIndex = k};
......@@ -723,11 +727,6 @@ void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code) {
// int32_t idx = pSql->cmd.vnodeIdx;
// SVnodeSidList *vnodeInfo = NULL;
// if (pTableMetaInfo->pMetricMeta != NULL) {
// vnodeInfo = tscGetVnodeSidList(pTableMetaInfo->pMetricMeta, idx - 1);
// }
SJoinSubquerySupporter* pSupporter = (SJoinSubquerySupporter*)param;
// if (atomic_add_fetch_32(pSupporter->numOfComplete, 1) >=
......@@ -850,11 +849,17 @@ int32_t tscLaunchJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSubquerySu
SQueryInfo *pNewQueryInfo = tscGetQueryInfoDetail(&pNew->cmd, 0);
assert(pNewQueryInfo != NULL);
tscColumnBaseInfoUpdateTableIndex(&pNewQueryInfo->colList, 0);
tscColumnBaseInfoCopy(&pSupporter->colList, &pNewQueryInfo->colList, 0);
// update the table index
size_t num = taosArrayGetSize(pNewQueryInfo->colList);
for (int32_t i = 0; i < num; ++i) {
SColumn* pCol = taosArrayGetP(pNewQueryInfo->colList, i);
pCol->colIndex.tableIndex = 0;
tscSqlExprCopy(&pSupporter->exprsInfo, &pNewQueryInfo->exprsInfo, pSupporter->uid, false);
tscFieldInfoCopyAll(&pSupporter->fieldsInfo, &pNewQueryInfo->fieldsInfo);
tscColumnListCopy(pSupporter->colList, pNewQueryInfo->colList, 0);
pSupporter->exprsInfo = tscSqlExprCopy(pNewQueryInfo->exprsInfo, pSupporter->uid, false);
tscFieldInfoCopy(&pSupporter->fieldsInfo, &pNewQueryInfo->fieldsInfo);
tscTagCondCopy(&pSupporter->tagCond, &pNewQueryInfo->tagCond);
......@@ -867,8 +872,7 @@ int32_t tscLaunchJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSubquerySu
memset(&pNewQueryInfo->groupbyExpr, 0, sizeof(SSqlGroupbyExpr));
// this data needs to be transfer to support struct
pNewQueryInfo->fieldsInfo.numOfOutputCols = 0;
pNewQueryInfo->exprsInfo.numOfExprs = 0;
pNewQueryInfo->fieldsInfo.numOfOutput = 0;
// set the ts,tags that involved in join, as the output column of intermediate result
......@@ -888,27 +892,26 @@ int32_t tscLaunchJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSubquerySu
pExpr->numOfParams = 1;
// add the filter tag column
for (int32_t i = 0; i < pSupporter->colList.numOfCols; ++i) {
SColumnBase *pColBase = &pSupporter->colList.pColList[i];
if (pColBase->numOfFilters > 0) { // copy to the pNew->cmd.colList if it is filtered.
tscColumnBaseCopy(&pNewQueryInfo->colList.pColList[pNewQueryInfo->colList.numOfCols], pColBase);
size_t s = taosArrayGetSize(pSupporter->colList);
for (int32_t i = 0; i < s; ++i) {
SColumn *pCol = taosArrayGetP(pSupporter->colList, i);
if (pCol->numOfFilters > 0) { // copy to the pNew->cmd.colList if it is filtered.
SColumn* p = tscColumnClone(pCol);
taosArrayPush(pNewQueryInfo->colList, &p);
tscTrace("%p subquery:%p tableIndex:%d, vnodeIdx:%d, type:%d, transfer to ts_comp query to retrieve timestamps, "
"exprInfo:%d, colList:%d, fieldsInfo:%d, name:%s",
pSql, pNew, tableIndex, pTableMetaInfo->vgroupIndex, pNewQueryInfo->type,
pNewQueryInfo->exprsInfo.numOfExprs, pNewQueryInfo->colList.numOfCols,
pNewQueryInfo->fieldsInfo.numOfOutputCols, pNewQueryInfo->pTableMetaInfo[0]->name);
tscPrintSelectClause(pNew, 0);
size_t numOfCols = taosArrayGetSize(pNewQueryInfo->colList);
tscTrace("%p subquery:%p tableIndex:%d, vnodeIdx:%d, type:%d, transfer to ts_comp query to retrieve timestamps, "
"exprInfo:%d, colList:%d, fieldsInfo:%d, name:%s",
pSql, pNew, tableIndex, pTableMetaInfo->vgroupIndex, pNewQueryInfo->type,
pNewQueryInfo->exprsInfo.numOfExprs, pNewQueryInfo->colList.numOfCols,
pNewQueryInfo->fieldsInfo.numOfOutputCols, pNewQueryInfo->pTableMetaInfo[0]->name);
tscSqlExprNumOfExprs(pNewQueryInfo), numOfCols,
pNewQueryInfo->fieldsInfo.numOfOutput, pNewQueryInfo->pTableMetaInfo[0]->name);
tscPrintSelectClause(pNew, 0);
} else {
SQueryInfo *pNewQueryInfo = tscGetQueryInfoDetail(&pNew->cmd, 0);
pNewQueryInfo->type |= TSDB_QUERY_TYPE_SUBQUERY;
......@@ -1352,7 +1355,7 @@ static void tscRetrieveFromDnodeCallBack(void *param, TAOS_RES *tres, int numOfR
#ifdef _DEBUG_VIEW
printf("received data from vnode: %d rows\n", pRes->numOfRows);
printf("received data from vnode: %"PRIu64" rows\n", pRes->numOfRows);
SSrcColumnInfo colInfo[256] = {0};
tscGetSrcColumnInfo(colInfo, pQueryInfo);
......@@ -17,8 +17,6 @@
#include "taosmsg.h"
#include "tcache.h"
#include "trpc.h"
#include "taosdef.h"
#include "tsocket.h"
#include "tsystem.h"
#include "ttime.h"
#include "ttimer.h"
......@@ -33,16 +31,11 @@
// global, not configurable
void * pVnodeConn;
void * pVMeterConn;
void * pTscMgmtConn;
void * pSlaveConn;
void * tscCacheHandle;
int slaveIndex;
void * tscTmr;
void * tscQhandle;
void * tscCheckDiskUsageTmr;
int tsInsertHeadSize;
char tsLastUser[TSDB_USER_LEN + 1];
int tscNumOfThreads;
......@@ -50,12 +43,12 @@ static pthread_once_t tscinit = PTHREAD_ONCE_INIT;
void taosInitNote(int numOfNoteLines, int maxNotes, char* lable);
void tscUpdateIpSet(void *ahandle, SRpcIpSet *pIpSet);
void tscCheckDiskUsage(void *para, void *unused) {
void tscCheckDiskUsage(void *UNUSED_PARAM(para), void* UNUSED_PARAM(param)) {
taosTmrReset(tscCheckDiskUsage, 1000, NULL, tscTmr, &tscCheckDiskUsageTmr);
int32_t tscInitRpc(const char *user, const char *secret) {
int32_t tscInitRpc(const char *user, const char *secret, void** pMgmtConn) {
SRpcInit rpcInit;
char secretEncrypt[32] = {0};
taosEncryptPass((uint8_t *)secret, strlen(secret), secretEncrypt);
......@@ -67,7 +60,6 @@ int32_t tscInitRpc(const char *user, const char *secret) {
rpcInit.label = "TSC-vnode";
rpcInit.numOfThreads = tscNumOfThreads;
rpcInit.cfp = tscProcessMsgFromServer;
rpcInit.ufp = tscUpdateIpSet;
rpcInit.sessions = tsMaxVnodeConnections;
rpcInit.connType = TAOS_CONN_CLIENT;
rpcInit.user = (char*)user;
......@@ -82,30 +74,24 @@ int32_t tscInitRpc(const char *user, const char *secret) {
// not stop service, switch users
if (strcmp(tsLastUser, user) != 0 && pTscMgmtConn != NULL) {
tscTrace("switch user from %s to %s", user, tsLastUser);
pTscMgmtConn = NULL;
if (pTscMgmtConn == NULL) {
if (*pMgmtConn == NULL) {
memset(&rpcInit, 0, sizeof(rpcInit));
rpcInit.localIp = tsLocalIp;
rpcInit.localPort = 0;
rpcInit.label = "TSC-mgmt";
rpcInit.numOfThreads = 1;
rpcInit.cfp = tscProcessMsgFromServer;
rpcInit.ufp = tscUpdateIpSet;
rpcInit.sessions = tsMaxMgmtConnections;
rpcInit.connType = TAOS_CONN_CLIENT;
rpcInit.idleTime = 2000;
rpcInit.user = (char*)user;
rpcInit.ckey = "key";
rpcInit.spi = 1;
rpcInit.secret = secretEncrypt;
strcpy(tsLastUser, user);
pTscMgmtConn = rpcOpen(&rpcInit);
if (pTscMgmtConn == NULL) {
*pMgmtConn = rpcOpen(&rpcInit);
if (*pMgmtConn == NULL) {
tscError("failed to init connection to mgmt");
return -1;
......@@ -167,7 +153,6 @@ void taos_init_imp() {
slaveIndex = rand();
int queueSize = tsMaxVnodeConnections + tsMaxMeterConnections + tsMaxMgmtConnections + tsMaxMgmtConnections;
if (tscEmbedded == 0) {
......@@ -219,11 +204,6 @@ void taos_cleanup() {
pVnodeConn = NULL;
if (pTscMgmtConn != NULL) {
pTscMgmtConn = NULL;
......@@ -395,7 +375,6 @@ static int taos_options_imp(TSDB_OPTION option, const char *pStr) {
return 0;
int taos_options(TSDB_OPTION option, const void *arg, ...) {
static int32_t lock = 0;
......@@ -48,11 +48,9 @@ extern int32_t tsEnableCoreFile;
extern int32_t tsTotalMemoryMB;
extern int32_t tsVersion;
extern int tscEmbedded;
extern int32_t tscEmbedded;
extern int64_t tsMsPerDay[2];
extern char tsMasterIp[];
extern char tsSecondIp[];
extern uint16_t tsMnodeDnodePort;
......@@ -61,94 +59,90 @@ extern uint16_t tsDnodeShellPort;
extern uint16_t tsDnodeMnodePort;
extern uint16_t tsSyncPort;
extern int tsStatusInterval;
extern int tsShellActivityTimer;
extern int tsVnodePeerHBTimer;
extern int tsMgmtPeerHBTimer;
extern int tsMeterMetaKeepTimer;
extern int tsMetricMetaKeepTimer;
extern float tsNumOfThreadsPerCore;
extern float tsRatioOfQueryThreads;
extern char tsPublicIp[];
extern char tsPrivateIp[];
extern short tsNumOfVnodesPerCore;
extern short tsNumOfTotalVnodes;
extern short tsCheckHeaderFile;
extern int32_t tsStatusInterval;
extern int32_t tsShellActivityTimer;
extern int32_t tsVnodePeerHBTimer;
extern int32_t tsMgmtPeerHBTimer;
extern int32_t tsMeterMetaKeepTimer;
extern int32_t tsMetricMetaKeepTimer;
extern float tsNumOfThreadsPerCore;
extern float tsRatioOfQueryThreads;
extern char tsPublicIp[];
extern char tsPrivateIp[];
extern int16_t tsNumOfVnodesPerCore;
extern int16_t tsNumOfTotalVnodes;
extern uint32_t tsPublicIpInt;
extern short tsAffectedRowsMod;
extern int tsSessionsPerVnode;
extern int tsAverageCacheBlocks;
extern int tsCacheBlockSize;
extern int tsRowsInFileBlock;
extern float tsFileBlockMinPercent;
extern short tsNumOfBlocksPerMeter;
extern short tsCommitTime; // seconds
extern short tsCommitLog;
extern short tsAsyncLog;
extern short tsCompression;
extern short tsDaysPerFile;
extern int tsDaysToKeep;
extern int tsReplications;
extern int tsNumOfMPeers;
extern int tsMaxShellConns;
extern int tsMaxTables;
extern int32_t tsCacheBlockSize;
extern int32_t tsTotalBlocks;
extern int32_t tsTablesPerVnode;
extern int16_t tsDaysPerFile;
extern int32_t tsDaysToKeep;
extern int32_t tsMinRowsInFileBlock;
extern int32_t tsMaxRowsInFileBlock;
extern int16_t tsCommitTime; // seconds
extern int32_t tsTimePrecision;
extern int16_t tsCompression;
extern int16_t tsCommitLog;
extern int32_t tsReplications;
extern int16_t tsAffectedRowsMod;
extern int32_t tsNumOfMPeers;
extern int32_t tsMaxShellConns;
extern int32_t tsMaxTables;
extern char tsLocalIp[];
extern char tsDefaultDB[];
extern char tsDefaultUser[];
extern char tsDefaultPass[];
extern int tsMaxMeterConnections;
extern int tsMaxVnodeConnections;
extern int tsMaxMgmtConnections;
extern int tsBalanceMonitorInterval;
extern int tsBalanceStartInterval;
extern int tsBalancePolicy;
extern int tsOfflineThreshold;
extern int tsMgmtEqualVnodeNum;
extern int tsEnableHttpModule;
extern int tsEnableMonitorModule;
extern int tsRestRowLimit;
extern int tsCompressMsgSize;
extern int tsMaxSQLStringLen;
extern int tsMaxNumOfOrderedResults;
extern int32_t tsMaxMeterConnections;
extern int32_t tsMaxVnodeConnections;
extern int32_t tsMaxMgmtConnections;
extern int32_t tsBalanceMonitorInterval;
extern int32_t tsBalanceStartInterval;
extern int32_t tsOfflineThreshold;
extern int32_t tsMgmtEqualVnodeNum;
extern int32_t tsEnableHttpModule;
extern int32_t tsEnableMonitorModule;
extern int32_t tsRestRowLimit;
extern int32_t tsMaxSQLStringLen;
extern int32_t tsCompressMsgSize;
extern int32_t tsMaxNumOfOrderedResults;
extern char tsSocketType[4];
extern int tsTimePrecision;
extern int tsMinSlidingTime;
extern int tsMinIntervalTime;
extern int tsMaxStreamComputDelay;
extern int tsStreamCompStartDelay;
extern int tsStreamCompRetryDelay;
extern int32_t tsMinSlidingTime;
extern int32_t tsMinIntervalTime;
extern int32_t tsMaxStreamComputDelay;
extern int32_t tsStreamCompStartDelay;
extern int32_t tsStreamCompRetryDelay;
extern float tsStreamComputDelayRatio; // the delayed computing ration of the whole time window
extern int tsProjectExecInterval;
extern int64_t tsMaxRetentWindow;
extern char tsHttpIp[];
extern char tsHttpIp[];
extern uint16_t tsHttpPort;
extern int tsHttpCacheSessions;
extern int tsHttpSessionExpire;
extern int tsHttpMaxThreads;
extern int tsHttpEnableCompress;
extern int tsHttpEnableRecordSql;
extern int tsTelegrafUseFieldNum;
extern int tsTscEnableRecordSql;
extern int tsAnyIp;
extern char tsMonitorDbName[];
extern char tsInternalPass[];
extern int tsMonitorInterval;
extern int tsNumOfLogLines;
extern int32_t tsHttpCacheSessions;
extern int32_t tsHttpSessionExpire;
extern int32_t tsHttpMaxThreads;
extern int32_t tsHttpEnableCompress;
extern int32_t tsHttpEnableRecordSql;
extern int32_t tsTelegrafUseFieldNum;
extern int32_t tsTscEnableRecordSql;
extern int32_t tsAnyIp;
extern char tsMonitorDbName[];
extern char tsInternalPass[];
extern int32_t tsMonitorInterval;
extern int32_t tsAsyncLog;
extern int32_t tsNumOfLogLines;
extern int32_t ddebugFlag;
extern int32_t mdebugFlag;
extern int32_t cdebugFlag;
......@@ -74,43 +74,40 @@ int32_t tsVnodePeerHBTimer = 1; // second
int32_t tsMgmtPeerHBTimer = 1; // second
int32_t tsMeterMetaKeepTimer = 7200; // second
int32_t tsMetricMetaKeepTimer = 600; // second
int tsRpcTimer = 300;
int tsRpcMaxTime = 600; // seconds;
int32_t tsRpcTimer = 300;
int32_t tsRpcMaxTime = 600; // seconds;
float tsNumOfThreadsPerCore = 1.0;
float tsRatioOfQueryThreads = 0.5;
char tsPublicIp[TSDB_IPv4ADDR_LEN] = {0};
char tsPrivateIp[TSDB_IPv4ADDR_LEN] = {0};
float tsNumOfThreadsPerCore = 1.0;
float tsRatioOfQueryThreads = 0.5;
char tsPublicIp[TSDB_IPv4ADDR_LEN] = {0};
char tsPrivateIp[TSDB_IPv4ADDR_LEN] = {0};
int16_t tsNumOfVnodesPerCore = 8;
int16_t tsNumOfTotalVnodes = TSDB_INVALID_VNODE_NUM;
int16_t tsCheckHeaderFile = 0;
#ifdef _TD_ARM_32_
int32_t tsSessionsPerVnode = 100;
int32_t tsTablesPerVnode = 100;
int32_t tsSessionsPerVnode = 1000;
int32_t tsTablesPerVnode = TSDB_DEFAULT_TABLES;
int32_t tsCacheBlockSize = 16384; // 256 columns
int32_t tsAverageCacheBlocks = TSDB_DEFAULT_AVG_BLOCKS;
int32_t tsCacheBlockSize = TSDB_DEFAULT_CACHE_BLOCK_SIZE;
int32_t tsTotalBlocks = TSDB_DEFAULT_TOTAL_BLOCKS;
int16_t tsDaysPerFile = TSDB_DEFAULT_DAYS_PER_FILE;
int32_t tsDaysToKeep = TSDB_DEFAULT_KEEP;
int32_t tsMinRowsInFileBlock = TSDB_DEFAULT_MIN_ROW_FBLOCK;
int32_t tsMaxRowsInFileBlock = TSDB_DEFAULT_MAX_ROW_FBLOCK;
int16_t tsCommitTime = TSDB_DEFAULT_COMMIT_TIME; // seconds
int32_t tsTimePrecision = TSDB_DEFAULT_PRECISION;
int16_t tsCompression = TSDB_DEFAULT_COMP_LEVEL;
int16_t tsCommitLog = TSDB_DEFAULT_CLOG_LEVEL;
int32_t tsReplications = TSDB_DEFAULT_REPLICA_NUM;
* Change the meaning of affected rows:
* 0: affected rows not include those duplicate records
* 1: affected rows include those duplicate records
int16_t tsAffectedRowsMod = 0;
int32_t tsRowsInFileBlock = 4096;
float tsFileBlockMinPercent = 0.05;
int16_t tsNumOfBlocksPerMeter = 100;
int16_t tsCommitTime = 3600; // seconds
int16_t tsCommitLog = 1;
int16_t tsCompression = TSDB_MAX_COMPRESSION_LEVEL;
int16_t tsDaysPerFile = 10;
int32_t tsDaysToKeep = 3650;
int32_t tsReplications = TSDB_REPLICA_MIN_NUM;
int32_t tsNumOfMPeers = 3;
int32_t tsMaxShellConns = 2000;
int32_t tsMaxTables = 100000;
......@@ -125,15 +122,16 @@ int32_t tsMaxVnodeConnections = 10000;
int32_t tsBalanceMonitorInterval = 2; // seconds
int32_t tsBalanceStartInterval = 300; // seconds
int32_t tsBalancePolicy = 0; // 1-use sys.montor
int32_t tsOfflineThreshold = 864000; // seconds 10days
int32_t tsMgmtEqualVnodeNum = 4;
int32_t tsEnableHttpModule = 1;
int32_t tsEnableMonitorModule = 0;
int32_t tsRestRowLimit = 10240;
int32_t tsMaxSQLStringLen = TSDB_MAX_SQL_LEN;
int32_t tsNumOfLogLines = 10000000;
int32_t mdebugFlag = 135;
int32_t sdbDebugFlag = 135;
int32_t ddebugFlag = 131;
......@@ -146,7 +144,6 @@ int32_t qdebugFlag = 131;
int32_t rpcDebugFlag = 131;
int32_t uDebugFlag = 131;
int32_t debugFlag = 131;
int tsNumOfLogLines = 10000000;
// the maximum number of results for projection query on super table that are returned from
// one virtual node, to order according to timestamp
......@@ -165,9 +162,6 @@ int32_t tsCompressMsgSize = -1;
// use UDP by default[option: udp, tcp]
char tsSocketType[4] = "udp";
// time precision, millisecond by default
int32_t tsTimePrecision = TSDB_TIME_PRECISION_MILLI;
// 10 ms for sliding time, the value will changed in case of time precision changed
int32_t tsMinSlidingTime = 10;
......@@ -575,16 +569,6 @@ static void doInitGlobalConfig() {
cfg.option = "ctime";
cfg.ptr = &tsCommitTime;
cfg.valType = TAOS_CFG_VTYPE_INT16;
cfg.minValue = 30;
cfg.maxValue = 40960;
cfg.ptrLength = 0;
cfg.option = "statusInterval";
cfg.ptr = &tsStatusInterval;
cfg.valType = TAOS_CFG_VTYPE_INT32;
......@@ -686,32 +670,42 @@ static void doInitGlobalConfig() {
// database configs
cfg.option = "clog";
cfg.ptr = &tsCommitLog;
cfg.valType = TAOS_CFG_VTYPE_INT16;
cfg.option = "tables";
cfg.ptr = &tsTablesPerVnode;
cfg.valType = TAOS_CFG_VTYPE_INT32;
cfg.minValue = 0;
cfg.maxValue = 2;
cfg.minValue = TSDB_MIN_TABLES;
cfg.maxValue = TSDB_MAX_TABLES;
cfg.ptrLength = 0;
cfg.unitType = TAOS_CFG_UTYPE_NONE;
cfg.option = "comp";
cfg.ptr = &tsCompression;
cfg.valType = TAOS_CFG_VTYPE_INT16;
cfg.option = "cache";
cfg.ptr = &tsCacheBlockSize;
cfg.valType = TAOS_CFG_VTYPE_INT32;
cfg.minValue = 0;
cfg.maxValue = 2;
cfg.ptrLength = 0;
cfg.unitType = TAOS_CFG_UTYPE_NONE;
cfg.unitType = TAOS_CFG_UTYPE_BYTE;
cfg.option = "blocks";
cfg.ptr = &tsTotalBlocks;
cfg.valType = TAOS_CFG_VTYPE_INT32;
cfg.ptrLength = 0;
cfg.unitType = TAOS_CFG_UTYPE_BYTE;
cfg.option = "days";
cfg.ptr = &tsDaysPerFile;
cfg.valType = TAOS_CFG_VTYPE_INT16;
cfg.minValue = 1;
cfg.maxValue = 365;
cfg.minValue = TSDB_MIN_DAYS_PER_FILE;
cfg.maxValue = TSDB_MAX_DAYS_PER_FILE;
cfg.ptrLength = 0;
cfg.unitType = TAOS_CFG_UTYPE_NONE;
......@@ -720,78 +714,68 @@ static void doInitGlobalConfig() {
cfg.ptr = &tsDaysToKeep;
cfg.valType = TAOS_CFG_VTYPE_INT32;
cfg.minValue = 1;
cfg.maxValue = 365000;
cfg.minValue = TSDB_MIN_KEEP;
cfg.maxValue = TSDB_MAX_KEEP;
cfg.ptrLength = 0;
cfg.unitType = TAOS_CFG_UTYPE_NONE;
cfg.option = "replica";
cfg.ptr = &tsReplications;
cfg.option = "minRows";
cfg.ptr = &tsMinRowsInFileBlock;
cfg.valType = TAOS_CFG_VTYPE_INT32;
cfg.minValue = 1;
cfg.maxValue = 3;
cfg.ptrLength = 0;
cfg.unitType = TAOS_CFG_UTYPE_NONE;
cfg.option = "tables";
cfg.ptr = &tsSessionsPerVnode;
cfg.option = "maxRows";
cfg.ptr = &tsMaxRowsInFileBlock;
cfg.valType = TAOS_CFG_VTYPE_INT32;
cfg.ptrLength = 0;
cfg.unitType = TAOS_CFG_UTYPE_NONE;
cfg.option = "cache";
cfg.ptr = &tsCacheBlockSize;
cfg.valType = TAOS_CFG_VTYPE_INT32;
cfg.option = "ctime";
cfg.ptr = &tsCommitTime;
cfg.valType = TAOS_CFG_VTYPE_INT16;
cfg.minValue = 100;
cfg.maxValue = 1048576;
cfg.minValue = TSDB_MIN_COMMIT_TIME;
cfg.maxValue = TSDB_MAX_COMMIT_TIME;
cfg.ptrLength = 0;
cfg.unitType = TAOS_CFG_UTYPE_BYTE;
cfg.option = "rows";
cfg.ptr = &tsRowsInFileBlock;
cfg.valType = TAOS_CFG_VTYPE_INT32;
cfg.option = "comp";
cfg.ptr = &tsCompression;
cfg.valType = TAOS_CFG_VTYPE_INT16;
cfg.minValue = 200;
cfg.maxValue = 1048576;
cfg.ptrLength = 0;
cfg.unitType = TAOS_CFG_UTYPE_NONE;
cfg.option = "fileBlockMinPercent";
cfg.ptr = &tsFileBlockMinPercent;
cfg.minValue = 0;
cfg.maxValue = 1.0;
cfg.minValue = TSDB_MIN_COMP_LEVEL;
cfg.maxValue = TSDB_MAX_COMP_LEVEL;
cfg.ptrLength = 0;
cfg.unitType = TAOS_CFG_UTYPE_NONE;
cfg.option = "ablocks";
cfg.ptr = &tsAverageCacheBlocks;
cfg.valType = TAOS_CFG_VTYPE_INT32;
cfg.option = "clog";
cfg.ptr = &tsCommitLog;
cfg.valType = TAOS_CFG_VTYPE_INT16;
cfg.minValue = TSDB_MIN_AVG_BLOCKS;
cfg.maxValue = TSDB_MAX_AVG_BLOCKS;
cfg.minValue = TSDB_MIN_CLOG_LEVEL;
cfg.maxValue = TSDB_MAX_CLOG_LEVEL;
cfg.ptrLength = 0;
cfg.unitType = TAOS_CFG_UTYPE_NONE;
cfg.option = "tblocks";
cfg.ptr = &tsNumOfBlocksPerMeter;
cfg.valType = TAOS_CFG_VTYPE_INT16;
cfg.option = "replica";
cfg.ptr = &tsReplications;
cfg.valType = TAOS_CFG_VTYPE_INT32;
cfg.minValue = 32;
cfg.maxValue = 4096;
cfg.minValue = TSDB_MIN_REPLICA_NUM;
cfg.maxValue = TSDB_MAX_REPLICA_NUM;
cfg.ptrLength = 0;
cfg.unitType = TAOS_CFG_UTYPE_NONE;
......@@ -316,6 +316,9 @@ class CTaosInterface(object):
blocks = [None] * len(fields)
for i in range(len(fields)):
data = ctypes.cast(pblock, ctypes.POINTER(ctypes.c_void_p))[i]
if data == None:
blocks[i] = [None] * num_of_rows
if fields[i]['type'] not in _CONVERT_FUNC:
raise DatabaseError("Invalid data type returned from database")
......@@ -316,6 +316,9 @@ class CTaosInterface(object):
blocks = [None] * len(fields)
for i in range(len(fields)):
data = ctypes.cast(pblock, ctypes.POINTER(ctypes.c_void_p))[i]
if data == None:
blocks[i] = [None] * num_of_rows
if fields[i]['type'] not in _CONVERT_FUNC:
raise DatabaseError("Invalid data type returned from database")
......@@ -316,6 +316,9 @@ class CTaosInterface(object):
blocks = [None] * len(fields)
for i in range(len(fields)):
data = ctypes.cast(pblock, ctypes.POINTER(ctypes.c_void_p))[i]
if data == None:
blocks[i] = [None] * num_of_rows
if fields[i]['type'] not in _CONVERT_FUNC:
raise DatabaseError("Invalid data type returned from database")
......@@ -316,6 +316,9 @@ class CTaosInterface(object):
blocks = [None] * len(fields)
for i in range(len(fields)):
data = ctypes.cast(pblock, ctypes.POINTER(ctypes.c_void_p))[i]
if data == None:
blocks[i] = [None] * num_of_rows
if fields[i]['type'] not in _CONVERT_FUNC:
raise DatabaseError("Invalid data type returned from database")
......@@ -39,10 +39,6 @@ extern int32_t ddebugFlag;
#define dPrint(...) \
{ taosPrintLog("DND ", 255, __VA_ARGS__); }
#define dLError(...) taosLogError(__VA_ARGS__) dError(__VA_ARGS__)
#define dLWarn(...) taosLogWarn(__VA_ARGS__) dWarn(__VA_ARGS__)
#define dLPrint(...) taosLogPrint(__VA_ARGS__) dPrint(__VA_ARGS__)
#ifdef __cplusplus
......@@ -128,14 +128,16 @@ static void dnodeCloseVnodes() {
static int32_t dnodeProcessCreateVnodeMsg(SRpcMsg *rpcMsg) {
SMDCreateVnodeMsg *pCreate = rpcMsg->pCont;
pCreate->cfg.vgId = htonl(pCreate->cfg.vgId);
pCreate->cfg.cfgVersion = htonl(pCreate->cfg.cfgVersion);
pCreate->cfg.maxTables = htonl(pCreate->cfg.maxTables);
pCreate->cfg.maxCacheSize = htobe64(pCreate->cfg.maxCacheSize);
pCreate->cfg.minRowsPerFileBlock = htonl(pCreate->cfg.minRowsPerFileBlock);
pCreate->cfg.maxRowsPerFileBlock = htonl(pCreate->cfg.maxRowsPerFileBlock);
pCreate->cfg.cacheBlockSize = htonl(pCreate->cfg.cacheBlockSize);
pCreate->cfg.totalBlocks = htonl(pCreate->cfg.totalBlocks);
pCreate->cfg.daysPerFile = htonl(pCreate->cfg.daysPerFile);
pCreate->cfg.daysToKeep1 = htonl(pCreate->cfg.daysToKeep1);
pCreate->cfg.daysToKeep2 = htonl(pCreate->cfg.daysToKeep2);
pCreate->cfg.daysToKeep = htonl(pCreate->cfg.daysToKeep);
pCreate->cfg.minRowsPerFileBlock = htonl(pCreate->cfg.minRowsPerFileBlock);
pCreate->cfg.maxRowsPerFileBlock = htonl(pCreate->cfg.maxRowsPerFileBlock);
pCreate->cfg.commitTime = htonl(pCreate->cfg.commitTime);
pCreate->cfg.arbitratorIp = htonl(pCreate->cfg.arbitratorIp);
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
想要评论请 注册