未验证 提交 dcdb04fb 编写于 作者: S slguan 提交者: GitHub

Merge pull request #1178 from taosdata/feature/liaohj

Feature/liaohj
...@@ -211,6 +211,12 @@ ...@@ -211,6 +211,12 @@
# whether to enable HTTP compression transmission # whether to enable HTTP compression transmission
# httpEnableCompress 0 # httpEnableCompress 0
# the delayed time for launching each continuous query. 10% of the whole computing time window by default.
# streamCompDelayRatio 0.1
# the max allowed delayed time for launching continuous query. 20ms by default
# tsMaxStreamComputDelay 20000
# whether the telegraf table name contains the number of tags and the number of fields # whether the telegraf table name contains the number of tags and the number of fields
# telegrafUseFieldNum 0 # telegrafUseFieldNum 0
......
...@@ -27,7 +27,7 @@ void tscFetchDatablockFromSubquery(SSqlObj* pSql); ...@@ -27,7 +27,7 @@ void tscFetchDatablockFromSubquery(SSqlObj* pSql);
void tscGetQualifiedTSList(SSqlObj* pSql, SJoinSubquerySupporter* p1, SJoinSubquerySupporter* p2, int32_t* num); void tscGetQualifiedTSList(SSqlObj* pSql, SJoinSubquerySupporter* p1, SJoinSubquerySupporter* p2, int32_t* num);
void tscSetupOutputColumnIndex(SSqlObj* pSql); void tscSetupOutputColumnIndex(SSqlObj* pSql);
int32_t tscLaunchSecondSubquery(SSqlObj* pSql); int32_t tscLaunchSecondPhaseSubqueries(SSqlObj* pSql);
void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code); void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code);
SJoinSubquerySupporter* tscCreateJoinSupporter(SSqlObj* pSql, SSubqueryState* pState, int32_t index); SJoinSubquerySupporter* tscCreateJoinSupporter(SSqlObj* pSql, SSubqueryState* pState, int32_t index);
...@@ -121,7 +121,7 @@ STSBuf* tsBufCreate(bool autoDelete); ...@@ -121,7 +121,7 @@ STSBuf* tsBufCreate(bool autoDelete);
STSBuf* tsBufCreateFromFile(const char* path, bool autoDelete); STSBuf* tsBufCreateFromFile(const char* path, bool autoDelete);
STSBuf* tsBufCreateFromCompBlocks(const char* pData, int32_t numOfBlocks, int32_t len, int32_t tsOrder); STSBuf* tsBufCreateFromCompBlocks(const char* pData, int32_t numOfBlocks, int32_t len, int32_t tsOrder);
void tsBufDestory(STSBuf* pTSBuf); void* tsBufDestory(STSBuf* pTSBuf);
void tsBufAppend(STSBuf* pTSBuf, int32_t vnodeId, int64_t tag, const char* pData, int32_t len); void tsBufAppend(STSBuf* pTSBuf, int32_t vnodeId, int64_t tag, const char* pData, int32_t len);
int32_t tsBufMerge(STSBuf* pDestBuf, const STSBuf* pSrcBuf, int32_t vnodeIdx); int32_t tsBufMerge(STSBuf* pDestBuf, const STSBuf* pSrcBuf, int32_t vnodeIdx);
......
...@@ -21,9 +21,70 @@ extern "C" { ...@@ -21,9 +21,70 @@ extern "C" {
#endif #endif
#include "taos.h" #include "taos.h"
#include "taosmsg.h"
#include "tsqldef.h" #include "tsqldef.h"
#include "ttypes.h" #include "ttypes.h"
#include "taosmsg.h"
enum _sql_cmd {
TSDB_SQL_SELECT = 1,
TSDB_SQL_FETCH,
TSDB_SQL_INSERT,
TSDB_SQL_MGMT, // the SQL below is for mgmt node
TSDB_SQL_CREATE_DB,
TSDB_SQL_CREATE_TABLE,
TSDB_SQL_DROP_DB,
TSDB_SQL_DROP_TABLE,
TSDB_SQL_CREATE_ACCT,
TSDB_SQL_CREATE_USER, //10
TSDB_SQL_DROP_ACCT,
TSDB_SQL_DROP_USER,
TSDB_SQL_ALTER_USER,
TSDB_SQL_ALTER_ACCT,
TSDB_SQL_ALTER_TABLE,
TSDB_SQL_ALTER_DB,
TSDB_SQL_CREATE_MNODE,
TSDB_SQL_DROP_MNODE,
TSDB_SQL_CREATE_DNODE,
TSDB_SQL_DROP_DNODE, // 20
TSDB_SQL_CFG_DNODE,
TSDB_SQL_CFG_MNODE,
TSDB_SQL_SHOW,
TSDB_SQL_RETRIEVE,
TSDB_SQL_KILL_QUERY,
TSDB_SQL_KILL_STREAM,
TSDB_SQL_KILL_CONNECTION,
TSDB_SQL_READ, // SQL below is for read operation
TSDB_SQL_CONNECT,
TSDB_SQL_USE_DB, // 30
TSDB_SQL_META,
TSDB_SQL_METRIC,
TSDB_SQL_MULTI_META,
TSDB_SQL_HB,
TSDB_SQL_LOCAL, // SQL below for client local
TSDB_SQL_DESCRIBE_TABLE,
TSDB_SQL_RETRIEVE_METRIC,
TSDB_SQL_METRIC_JOIN_RETRIEVE,
TSDB_SQL_RETRIEVE_TAGS,
/*
* build empty result instead of accessing dnode to fetch result
* reset the client cache
*/
TSDB_SQL_RETRIEVE_EMPTY_RESULT, //40
TSDB_SQL_RESET_CACHE,
TSDB_SQL_SERV_STATUS,
TSDB_SQL_CURRENT_DB,
TSDB_SQL_SERV_VERSION,
TSDB_SQL_CLI_VERSION,
TSDB_SQL_CURRENT_USER,
TSDB_SQL_CFG_LOCAL,
TSDB_SQL_MAX //48
};
#define MAX_TOKEN_LEN 30 #define MAX_TOKEN_LEN 30
...@@ -72,72 +133,12 @@ typedef struct tFieldList { ...@@ -72,72 +133,12 @@ typedef struct tFieldList {
TAOS_FIELD *p; TAOS_FIELD *p;
} tFieldList; } tFieldList;
// sql operation type // create table operation type
enum TSQL_TYPE { enum TSQL_TYPE {
TSQL_CREATE_NORMAL_METER = 0x01, TSQL_CREATE_TABLE = 0x1,
TSQL_CREATE_NORMAL_METRIC = 0x02, TSQL_CREATE_STABLE = 0x2,
TSQL_CREATE_METER_FROM_METRIC = 0x04, TSQL_CREATE_TABLE_FROM_STABLE = 0x3,
TSQL_CREATE_STREAM = 0x08, TSQL_CREATE_STREAM = 0x4,
TSQL_QUERY_METER = 0x10,
TSQL_INSERT = 0x20,
DROP_DNODE = 0x40,
DROP_DATABASE = 0x41,
DROP_TABLE = 0x42,
DROP_USER = 0x43,
DROP_ACCOUNT = 0x44,
USE_DATABASE = 0x50,
// show operation
SHOW_DATABASES = 0x60,
SHOW_TABLES = 0x61,
SHOW_STABLES = 0x62,
SHOW_MNODES = 0x63,
SHOW_DNODES = 0x64,
SHOW_ACCOUNTS = 0x65,
SHOW_USERS = 0x66,
SHOW_VGROUPS = 0x67,
SHOW_QUERIES = 0x68,
SHOW_STREAMS = 0x69,
SHOW_CONFIGS = 0x6a,
SHOW_SCORES = 0x6b,
SHOW_MODULES = 0x6c,
SHOW_CONNECTIONS = 0x6d,
SHOW_GRANTS = 0x6e,
SHOW_VNODES = 0x6f,
// create dnode
CREATE_DNODE = 0x80,
CREATE_DATABASE = 0x81,
CREATE_USER = 0x82,
CREATE_ACCOUNT = 0x83,
DESCRIBE_TABLE = 0x90,
ALTER_USER_PASSWD = 0xA0,
ALTER_USER_PRIVILEGES = 0xA1,
ALTER_DNODE = 0xA2,
ALTER_LOCAL = 0xA3,
ALTER_DATABASE = 0xA4,
ALTER_ACCT = 0xA5,
// reset operation
RESET_QUERY_CACHE = 0xB0,
// alter tags
ALTER_TABLE_TAGS_ADD = 0xC0,
ALTER_TABLE_TAGS_DROP = 0xC1,
ALTER_TABLE_TAGS_CHG = 0xC2,
ALTER_TABLE_TAGS_SET = 0xC4,
// alter table column
ALTER_TABLE_ADD_COLUMN = 0xD0,
ALTER_TABLE_DROP_COLUMN = 0xD1,
KILL_QUERY = 0xD2,
KILL_STREAM = 0xD3,
KILL_CONNECTION = 0xD4,
}; };
typedef struct SQuerySQL { typedef struct SQuerySQL {
...@@ -158,32 +159,30 @@ typedef struct SCreateTableSQL { ...@@ -158,32 +159,30 @@ typedef struct SCreateTableSQL {
struct SSQLToken name; // meter name, create table [meterName] xxx struct SSQLToken name; // meter name, create table [meterName] xxx
bool existCheck; bool existCheck;
int8_t type; // create normal table/from super table/ stream
struct { struct {
tFieldList *pTagColumns; // for normal table, pTagColumns = NULL; tFieldList *pTagColumns; // for normal table, pTagColumns = NULL;
tFieldList *pColumns; tFieldList *pColumns;
} colInfo; } colInfo;
struct { struct {
SSQLToken metricName; // metric name, for using clause SSQLToken stableName; // super table name, for using clause
tVariantList *pTagVals; // create by using metric, tag value tVariantList *pTagVals; // create by using metric, tag value
STagData tagdata;
} usingInfo; } usingInfo;
SQuerySQL *pSelect; SQuerySQL *pSelect;
} SCreateTableSQL; } SCreateTableSQL;
typedef struct SAlterTableSQL { typedef struct SAlterTableSQL {
SSQLToken name; SSQLToken name;
int16_t type;
STagData tagData;
tFieldList * pAddColumns; tFieldList * pAddColumns;
SSQLToken dropTagToken;
tVariantList *varList; // set t=val or: change src dst tVariantList *varList; // set t=val or: change src dst
} SAlterTableSQL; } SAlterTableSQL;
typedef struct SInsertSQL {
SSQLToken name;
struct tSQLExprListList *pValue;
} SInsertSQL;
typedef struct SCreateDBInfo { typedef struct SCreateDBInfo {
SSQLToken dbname; SSQLToken dbname;
int32_t replica; int32_t replica;
...@@ -204,40 +203,67 @@ typedef struct SCreateDBInfo { ...@@ -204,40 +203,67 @@ typedef struct SCreateDBInfo {
} SCreateDBInfo; } SCreateDBInfo;
typedef struct SCreateAcctSQL { typedef struct SCreateAcctSQL {
int32_t users; int32_t maxUsers;
int32_t dbs; int32_t maxDbs;
int32_t tseries; int32_t maxTimeSeries;
int32_t streams; int32_t maxStreams;
int32_t pps; int32_t maxPointsPerSecond;
int64_t storage; int64_t maxStorage;
int64_t qtime; int64_t maxQueryTime;
int32_t conns; int32_t maxConnections;
SSQLToken stat; SSQLToken stat;
} SCreateAcctSQL; } SCreateAcctSQL;
typedef struct SShowInfo {
uint8_t showType;
SSQLToken prefix;
SSQLToken pattern;
} SShowInfo;
typedef struct SUserInfo {
SSQLToken user;
SSQLToken passwd;
// bool hasPasswd;
SSQLToken privilege;
// bool hasPrivilege;
int16_t type;
} SUserInfo;
typedef struct tDCLSQL { typedef struct tDCLSQL {
int32_t nTokens; /* Number of expressions on the list */ int32_t nTokens; /* Number of expressions on the list */
int32_t nAlloc; /* Number of entries allocated below */ int32_t nAlloc; /* Number of entries allocated below */
SSQLToken *a; /* one entry for element */ SSQLToken *a; /* one entry for element */
bool existsCheck;
union { union {
SCreateDBInfo dbOpt; SCreateDBInfo dbOpt;
SCreateAcctSQL acctOpt; SCreateAcctSQL acctOpt;
SShowInfo showOpt;
SSQLToken ip;
}; };
SUserInfo user;
} tDCLSQL; } tDCLSQL;
typedef struct SSubclauseInfo { // "UNION" multiple select sub-clause
SQuerySQL **pClause;
int32_t numOfClause;
} SSubclauseInfo;
typedef struct SSqlInfo { typedef struct SSqlInfo {
int32_t sqlType; int32_t type;
bool validSql; bool valid;
union { union {
SCreateTableSQL *pCreateTableInfo; SCreateTableSQL *pCreateTableInfo;
SInsertSQL * pInsertInfo;
SAlterTableSQL * pAlterInfo; SAlterTableSQL * pAlterInfo;
SQuerySQL * pQueryInfo;
tDCLSQL * pDCLInfo; tDCLSQL * pDCLInfo;
}; };
SSubclauseInfo subclauseInfo;
char pzErrMsg[256]; char pzErrMsg[256];
} SSqlInfo; } SSqlInfo;
...@@ -338,7 +364,7 @@ SQuerySQL *tSetQuerySQLElems(SSQLToken *pSelectToken, tSQLExprList *pSelection, ...@@ -338,7 +364,7 @@ SQuerySQL *tSetQuerySQLElems(SSQLToken *pSelectToken, tSQLExprList *pSelection,
SCreateTableSQL *tSetCreateSQLElems(tFieldList *pCols, tFieldList *pTags, SSQLToken *pMetricName, SCreateTableSQL *tSetCreateSQLElems(tFieldList *pCols, tFieldList *pTags, SSQLToken *pMetricName,
tVariantList *pTagVals, SQuerySQL *pSelect, int32_t type); tVariantList *pTagVals, SQuerySQL *pSelect, int32_t type);
void tSQLExprDestroy(tSQLExpr *);
void tSQLExprNodeDestroy(tSQLExpr *pExpr); void tSQLExprNodeDestroy(tSQLExpr *pExpr);
tSQLExpr *tSQLExprNodeClone(tSQLExpr *pExpr); tSQLExpr *tSQLExprNodeClone(tSQLExpr *pExpr);
...@@ -346,23 +372,31 @@ SAlterTableSQL *tAlterTableSQLElems(SSQLToken *pMeterName, tFieldList *pCols, tV ...@@ -346,23 +372,31 @@ SAlterTableSQL *tAlterTableSQLElems(SSQLToken *pMeterName, tFieldList *pCols, tV
tSQLExprListList *tSQLListListAppend(tSQLExprListList *pList, tSQLExprList *pExprList); tSQLExprListList *tSQLListListAppend(tSQLExprListList *pList, tSQLExprList *pExprList);
void tSetInsertSQLElems(SSqlInfo *pInfo, SSQLToken *pName, tSQLExprListList *pList); void destroyAllSelectClause(SSubclauseInfo *pSql);
void doDestroyQuerySql(SQuerySQL *pSql);
void destroyQuerySql(SQuerySQL *pSql); SSqlInfo * setSQLInfo(SSqlInfo *pInfo, void *pSqlExprInfo, SSQLToken *pMeterName, int32_t type);
SSubclauseInfo *setSubclause(SSubclauseInfo *pClause, void *pSqlExprInfo);
void setSQLInfo(SSqlInfo *pInfo, void *pSqlExprInfo, SSQLToken *pMeterName, int32_t type); SSubclauseInfo *appendSelectClause(SSubclauseInfo *pInfo, void *pSubclause);
void setCreatedMeterName(SSqlInfo *pInfo, SSQLToken *pMeterName, SSQLToken *pIfNotExists); void setCreatedMeterName(SSqlInfo *pInfo, SSQLToken *pMeterName, SSQLToken *pIfNotExists);
void SQLInfoDestroy(SSqlInfo *pInfo); void SQLInfoDestroy(SSqlInfo *pInfo);
void setDCLSQLElems(SSqlInfo *pInfo, int32_t type, int32_t nParams, ...); void setDCLSQLElems(SSqlInfo *pInfo, int32_t type, int32_t nParams, ...);
void setDropDBTableInfo(SSqlInfo *pInfo, int32_t type, SSQLToken* pToken, SSQLToken* existsCheck);
void setShowOptions(SSqlInfo *pInfo, int32_t type, SSQLToken* prefix, SSQLToken* pPatterns);
tDCLSQL *tTokenListAppend(tDCLSQL *pTokenList, SSQLToken *pToken); tDCLSQL *tTokenListAppend(tDCLSQL *pTokenList, SSQLToken *pToken);
void setCreateDBSQL(SSqlInfo *pInfo, int32_t type, SSQLToken *pToken, SCreateDBInfo *pDB, SSQLToken *pIgExists); void setCreateDBSQL(SSqlInfo *pInfo, int32_t type, SSQLToken *pToken, SCreateDBInfo *pDB, SSQLToken *pIgExists);
void setCreateAcctSQL(SSqlInfo *pInfo, int32_t type, SSQLToken *pName, SSQLToken *pPwd, SCreateAcctSQL *pAcctInfo); void setCreateAcctSQL(SSqlInfo *pInfo, int32_t type, SSQLToken *pName, SSQLToken *pPwd, SCreateAcctSQL *pAcctInfo);
void setCreateUserSQL(SSqlInfo *pInfo, SSQLToken *pName, SSQLToken *pPasswd);
void setKillSQL(SSqlInfo *pInfo, int32_t type, SSQLToken *ip);
void setAlterUserSQL(SSqlInfo *pInfo, int16_t type, SSQLToken *pName, SSQLToken* pPwd, SSQLToken *pPrivilege);
void setDefaultCreateDbOption(SCreateDBInfo *pDBInfo); void setDefaultCreateDbOption(SCreateDBInfo *pDBInfo);
// prefix show db.tables; // prefix show db.tables;
......
...@@ -120,7 +120,7 @@ void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrd ...@@ -120,7 +120,7 @@ void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrd
void tscDestroyLocalReducer(SSqlObj *pSql); void tscDestroyLocalReducer(SSqlObj *pSql);
int32_t tscLocalDoReduce(SSqlObj *pSql); int32_t tscDoLocalreduce(SSqlObj *pSql);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -29,9 +29,9 @@ extern "C" { ...@@ -29,9 +29,9 @@ extern "C" {
#include "tsclient.h" #include "tsclient.h"
#include "tsdb.h" #include "tsdb.h"
#define UTIL_METER_IS_METRIC(metaInfo) \ #define UTIL_METER_IS_SUPERTABLE(metaInfo) \
(((metaInfo)->pMeterMeta != NULL) && ((metaInfo)->pMeterMeta->meterType == TSDB_METER_METRIC)) (((metaInfo)->pMeterMeta != NULL) && ((metaInfo)->pMeterMeta->meterType == TSDB_METER_METRIC))
#define UTIL_METER_IS_NOMRAL_METER(metaInfo) (!(UTIL_METER_IS_METRIC(metaInfo))) #define UTIL_METER_IS_NOMRAL_METER(metaInfo) (!(UTIL_METER_IS_SUPERTABLE(metaInfo)))
#define UTIL_METER_IS_CREATE_FROM_METRIC(metaInfo) \ #define UTIL_METER_IS_CREATE_FROM_METRIC(metaInfo) \
(((metaInfo)->pMeterMeta != NULL) && ((metaInfo)->pMeterMeta->meterType == TSDB_METER_MTABLE)) (((metaInfo)->pMeterMeta != NULL) && ((metaInfo)->pMeterMeta->meterType == TSDB_METER_MTABLE))
...@@ -67,7 +67,7 @@ typedef struct SJoinSubquerySupporter { ...@@ -67,7 +67,7 @@ typedef struct SJoinSubquerySupporter {
} SJoinSubquerySupporter; } SJoinSubquerySupporter;
int32_t tscCreateDataBlock(size_t initialSize, int32_t rowSize, int32_t startOffset, const char* name, int32_t tscCreateDataBlock(size_t initialSize, int32_t rowSize, int32_t startOffset, const char* name,
STableDataBlocks** dataBlocks); SMeterMeta* pMeterMeta, STableDataBlocks** dataBlocks);
void tscAppendDataBlock(SDataBlockList* pList, STableDataBlocks* pBlocks); void tscAppendDataBlock(SDataBlockList* pList, STableDataBlocks* pBlocks);
void tscDestroyDataBlock(STableDataBlocks* pDataBlock); void tscDestroyDataBlock(STableDataBlocks* pDataBlock);
...@@ -81,7 +81,7 @@ int32_t tscCopyDataBlockToPayload(SSqlObj* pSql, STableDataBlocks* pDataBlock); ...@@ -81,7 +81,7 @@ int32_t tscCopyDataBlockToPayload(SSqlObj* pSql, STableDataBlocks* pDataBlock);
void tscFreeUnusedDataBlocks(SDataBlockList* pList); void tscFreeUnusedDataBlocks(SDataBlockList* pList);
int32_t tscMergeTableDataBlocks(SSqlObj* pSql, SDataBlockList* pDataList); int32_t tscMergeTableDataBlocks(SSqlObj* pSql, SDataBlockList* pDataList);
int32_t tscGetDataBlockFromList(void* pHashList, SDataBlockList* pDataBlockList, int64_t id, int32_t size, int32_t tscGetDataBlockFromList(void* pHashList, SDataBlockList* pDataBlockList, int64_t id, int32_t size,
int32_t startOffset, int32_t rowSize, const char* tableId, int32_t startOffset, int32_t rowSize, const char* tableId, SMeterMeta* pMeterMeta,
STableDataBlocks** dataBlocks); STableDataBlocks** dataBlocks);
SVnodeSidList* tscGetVnodeSidList(SMetricMeta* pMetricmeta, int32_t vnodeIdx); SVnodeSidList* tscGetVnodeSidList(SMetricMeta* pMetricmeta, int32_t vnodeIdx);
...@@ -95,23 +95,27 @@ SMeterSidExtInfo* tscGetMeterSidInfo(SVnodeSidList* pSidList, int32_t idx); ...@@ -95,23 +95,27 @@ SMeterSidExtInfo* tscGetMeterSidInfo(SVnodeSidList* pSidList, int32_t idx);
* @param pSql sql object * @param pSql sql object
* @return * @return
*/ */
bool tscIsPointInterpQuery(SSqlCmd* pCmd); bool tscIsPointInterpQuery(SQueryInfo* pQueryInfo);
bool tscIsTWAQuery(SSqlCmd* pCmd); bool tscIsTWAQuery(SQueryInfo* pQueryInfo);
bool tscProjectionQueryOnMetric(SSqlCmd* pCmd);
bool tscProjectionQueryOnTable(SSqlCmd* pCmd); bool tscNonOrderedProjectionQueryOnSTable(SQueryInfo *pQueryInfo, int32_t tableIndex);
bool tscOrderedProjectionQueryOnSTable(SQueryInfo* pQueryInfo, int32_t tableIndex);
bool tscIsProjectionQueryOnSTable(SQueryInfo* pQueryInfo, int32_t tableIndex);
bool tscIsTwoStageMergeMetricQuery(SSqlCmd* pCmd); bool tscProjectionQueryOnTable(SQueryInfo* pQueryInfo);
bool tscIsTwoStageMergeMetricQuery(SQueryInfo* pQueryInfo, int32_t tableIndex);
bool tscQueryOnMetric(SSqlCmd* pCmd); bool tscQueryOnMetric(SSqlCmd* pCmd);
bool tscQueryMetricTags(SSqlCmd* pCmd); bool tscQueryMetricTags(SQueryInfo* pQueryInfo);
bool tscIsSelectivityWithTagQuery(SSqlCmd* pCmd); bool tscIsSelectivityWithTagQuery(SSqlCmd* pCmd);
void tscAddSpecialColumnForSelect(SSqlCmd* pCmd, int32_t outputColIndex, int16_t functionId, SColumnIndex* pIndex, void tscAddSpecialColumnForSelect(SQueryInfo* pQueryInfo, int32_t outputColIndex, int16_t functionId, SColumnIndex* pIndex,
SSchema* pColSchema, int16_t isTag); SSchema* pColSchema, int16_t isTag);
void addRequiredTagColumn(SSqlCmd* pCmd, int32_t tagColIndex, int32_t tableIndex); void addRequiredTagColumn(SQueryInfo* pQueryInfo, int32_t tagColIndex, int32_t tableIndex);
int32_t setMeterID(SSqlObj* pSql, SSQLToken* pzTableName, int32_t tableIndex); int32_t setMeterID(SMeterMetaInfo* pMeterMetaInfo, SSQLToken* pzTableName, SSqlObj* pSql);
void tscClearInterpInfo(SSqlCmd* pCmd); void tscClearInterpInfo(SQueryInfo* pQueryInfo);
bool tscIsInsertOrImportData(char* sqlstr); bool tscIsInsertOrImportData(char* sqlstr);
...@@ -125,29 +129,33 @@ void tscFieldInfoSetValFromField(SFieldInfo* pFieldInfo, int32_t index, TAOS_FIE ...@@ -125,29 +129,33 @@ void tscFieldInfoSetValFromField(SFieldInfo* pFieldInfo, int32_t index, TAOS_FIE
void tscFieldInfoSetValue(SFieldInfo* pFieldInfo, int32_t index, int8_t type, const char* name, int16_t bytes); void tscFieldInfoSetValue(SFieldInfo* pFieldInfo, int32_t index, int8_t type, const char* name, int16_t bytes);
void tscFieldInfoUpdateVisible(SFieldInfo* pFieldInfo, int32_t index, bool visible); void tscFieldInfoUpdateVisible(SFieldInfo* pFieldInfo, int32_t index, bool visible);
void tscFieldInfoCalOffset(SSqlCmd* pCmd); void tscFieldInfoCalOffset(SQueryInfo* pQueryInfo);
void tscFieldInfoUpdateOffset(SSqlCmd* pCmd); void tscFieldInfoUpdateOffsetForInterResult(SQueryInfo* pQueryInfo);
void tscFieldInfoCopy(SFieldInfo* src, SFieldInfo* dst, const int32_t* indexList, int32_t size); void tscFieldInfoCopy(SFieldInfo* src, SFieldInfo* dst, const int32_t* indexList, int32_t size);
void tscFieldInfoCopyAll(SFieldInfo* src, SFieldInfo* dst); void tscFieldInfoCopyAll(SFieldInfo* dst, SFieldInfo* src);
TAOS_FIELD* tscFieldInfoGetField(SSqlCmd* pCmd, int32_t index); TAOS_FIELD* tscFieldInfoGetField(SQueryInfo* pQueryInfo, int32_t index);
int16_t tscFieldInfoGetOffset(SSqlCmd* pCmd, int32_t index); int16_t tscFieldInfoGetOffset(SQueryInfo* pQueryInfo, int32_t index);
int32_t tscGetResRowLength(SSqlCmd* pCmd); int32_t tscGetResRowLength(SQueryInfo* pQueryInfo);
void tscClearFieldInfo(SFieldInfo* pFieldInfo); void tscClearFieldInfo(SFieldInfo* pFieldInfo);
int32_t tscNumOfFields(SQueryInfo* pQueryInfo);
int32_t tscFieldInfoCompare(SFieldInfo* pFieldInfo1, SFieldInfo* pFieldInfo2);
void addExprParams(SSqlExpr* pExpr, char* argument, int32_t type, int32_t bytes, int16_t tableIndex); void addExprParams(SSqlExpr* pExpr, char* argument, int32_t type, int32_t bytes, int16_t tableIndex);
SSqlExpr* tscSqlExprInsert(SSqlCmd* pCmd, int32_t index, int16_t functionId, SColumnIndex* pColIndex, int16_t type, SSqlExpr* tscSqlExprInsert(SQueryInfo* pQueryInfo, int32_t index, int16_t functionId, SColumnIndex* pColIndex, int16_t type,
int16_t size, int16_t interSize); int16_t size, int16_t interSize);
SSqlExpr* tscSqlExprInsertEmpty(SSqlCmd* pCmd, int32_t index, int16_t functionId); SSqlExpr* tscSqlExprInsertEmpty(SQueryInfo* pQueryInfo, int32_t index, int16_t functionId);
SSqlExpr* tscSqlExprUpdate(SSqlCmd* pCmd, int32_t index, int16_t functionId, int16_t srcColumnIndex, int16_t type, SSqlExpr* tscSqlExprUpdate(SQueryInfo* pQueryInfo, int32_t index, int16_t functionId, int16_t srcColumnIndex, int16_t type,
int16_t size); int16_t size);
SSqlExpr* tscSqlExprGet(SSqlCmd* pCmd, int32_t index); SSqlExpr* tscSqlExprGet(SQueryInfo* pQueryInfo, int32_t index);
void tscSqlExprCopy(SSqlExprInfo* dst, const SSqlExprInfo* src, uint64_t uid); void tscSqlExprCopy(SSqlExprInfo* dst, const SSqlExprInfo* src, uint64_t uid);
void* tscSqlExprDestroy(SSqlExpr* pExpr);
void tscSqlExprInfoDestroy(SSqlExprInfo* pExprInfo);
SColumnBase* tscColumnBaseInfoInsert(SSqlCmd* pCmd, SColumnIndex* colIndex); SColumnBase* tscColumnBaseInfoInsert(SQueryInfo* pQueryInfo, SColumnIndex* colIndex);
void tscColumnFilterInfoCopy(SColumnFilterInfo* dst, const SColumnFilterInfo* src); void tscColumnFilterInfoCopy(SColumnFilterInfo* dst, const SColumnFilterInfo* src);
void tscColumnBaseCopy(SColumnBase* dst, const SColumnBase* src); void tscColumnBaseCopy(SColumnBase* dst, const SColumnBase* src);
...@@ -162,7 +170,7 @@ int32_t tscValidateName(SSQLToken* pToken); ...@@ -162,7 +170,7 @@ int32_t tscValidateName(SSQLToken* pToken);
void tscIncStreamExecutionCount(void* pStream); void tscIncStreamExecutionCount(void* pStream);
bool tscValidateColumnId(SSqlCmd* pCmd, int32_t colId); bool tscValidateColumnId(SMeterMetaInfo* pMeterMetaInfo, int32_t colId);
// get starter position of metric query condition (query on tags) in SSqlCmd.payload // get starter position of metric query condition (query on tags) in SSqlCmd.payload
SCond* tsGetMetricQueryCondPos(STagCond* pCond, uint64_t tableIndex); SCond* tsGetMetricQueryCondPos(STagCond* pCond, uint64_t tableIndex);
...@@ -171,30 +179,38 @@ void tsSetMetricQueryCond(STagCond* pTagCond, uint64_t uid, const char* str); ...@@ -171,30 +179,38 @@ void tsSetMetricQueryCond(STagCond* pTagCond, uint64_t uid, const char* str);
void tscTagCondCopy(STagCond* dest, const STagCond* src); void tscTagCondCopy(STagCond* dest, const STagCond* src);
void tscTagCondRelease(STagCond* pCond); void tscTagCondRelease(STagCond* pCond);
void tscGetSrcColumnInfo(SSrcColumnInfo* pColInfo, SSqlCmd* pCmd); void tscGetSrcColumnInfo(SSrcColumnInfo* pColInfo, SQueryInfo* pQueryInfo);
void tscSetFreeHeatBeat(STscObj* pObj); void tscSetFreeHeatBeat(STscObj* pObj);
bool tscShouldFreeHeatBeat(SSqlObj* pHb); bool tscShouldFreeHeatBeat(SSqlObj* pHb);
void tscCleanSqlCmd(SSqlCmd* pCmd); void tscCleanSqlCmd(SSqlCmd* pCmd);
bool tscShouldFreeAsyncSqlObj(SSqlObj* pSql); bool tscShouldFreeAsyncSqlObj(SSqlObj* pSql);
void tscRemoveAllMeterMetaInfo(SSqlCmd* pCmd, bool removeFromCache); void tscRemoveAllMeterMetaInfo(SQueryInfo* pQueryInfo, const char* address, bool removeFromCache);
SMeterMetaInfo* tscGetMeterMetaInfo(SSqlCmd* pCmd, int32_t index); SMeterMetaInfo* tscGetMeterMetaInfo(SSqlCmd *pCmd, int32_t subClauseIndex, int32_t tableIndex);
SMeterMetaInfo* tscGetMeterMetaInfoByUid(SSqlCmd* pCmd, uint64_t uid, int32_t* index); SMeterMetaInfo* tscGetMeterMetaInfoFromQueryInfo(SQueryInfo *pQueryInfo, int32_t tableIndex);
SQueryInfo *tscGetQueryInfoDetail(SSqlCmd* pCmd, int32_t subClauseIndex);
int32_t tscGetQueryInfoDetailSafely(SSqlCmd *pCmd, int32_t subClauseIndex, SQueryInfo** pQueryInfo);
SMeterMetaInfo* tscGetMeterMetaInfoByUid(SQueryInfo* pQueryInfo, uint64_t uid, int32_t* index);
void tscClearMeterMetaInfo(SMeterMetaInfo* pMeterMetaInfo, bool removeFromCache); void tscClearMeterMetaInfo(SMeterMetaInfo* pMeterMetaInfo, bool removeFromCache);
SMeterMetaInfo* tscAddMeterMetaInfo(SSqlCmd* pCmd, const char* name, SMeterMeta* pMeterMeta, SMetricMeta* pMetricMeta, SMeterMetaInfo* tscAddMeterMetaInfo(SQueryInfo* pQueryInfo, const char* name, SMeterMeta* pMeterMeta, SMetricMeta* pMetricMeta,
int16_t numOfTags, int16_t* tags); int16_t numOfTags, int16_t* tags);
SMeterMetaInfo* tscAddEmptyMeterMetaInfo(SSqlCmd* pCmd); SMeterMetaInfo* tscAddEmptyMeterMetaInfo(SQueryInfo *pQueryInfo);
int32_t tscAddSubqueryInfo(SSqlCmd *pCmd);
void tscFreeSubqueryInfo(SSqlCmd* pCmd);
void tscClearSubqueryInfo(SSqlCmd* pCmd);
void tscGetMetricMetaCacheKey(SSqlCmd* pCmd, char* keyStr, uint64_t uid); void tscGetMetricMetaCacheKey(SQueryInfo* pQueryInfo, char* keyStr, uint64_t uid);
int tscGetMetricMeta(SSqlObj* pSql); int tscGetMetricMeta(SSqlObj* pSql, int32_t clauseIndex);
int tscGetMeterMeta(SSqlObj* pSql, char* meterId, int32_t tableIndex); int tscGetMeterMeta(SSqlObj* pSql, SMeterMetaInfo* pMeterMetaInfo);
int tscGetMeterMetaEx(SSqlObj* pSql, char* meterId, bool createIfNotExists); int tscGetMeterMetaEx(SSqlObj* pSql, SMeterMetaInfo* pMeterMetaInfo, bool createIfNotExists);
void tscResetForNextRetrieve(SSqlRes* pRes); void tscResetForNextRetrieve(SSqlRes* pRes);
void tscAddTimestampColumn(SSqlCmd* pCmd, int16_t functionId, int16_t tableIndex); void tscAddTimestampColumn(SQueryInfo* pQueryInfo, int16_t functionId, int16_t tableIndex);
void tscDoQuery(SSqlObj* pSql); void tscDoQuery(SSqlObj* pSql);
/** /**
...@@ -215,9 +231,9 @@ void tscDoQuery(SSqlObj* pSql); ...@@ -215,9 +231,9 @@ void tscDoQuery(SSqlObj* pSql);
* @return * @return
*/ */
SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void* param, SSqlObj* pPrevSql); SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void* param, SSqlObj* pPrevSql);
void addGroupInfoForSubquery(SSqlObj* pParentObj, SSqlObj* pSql, int32_t tableIndex); void addGroupInfoForSubquery(SSqlObj* pParentObj, SSqlObj* pSql, int32_t subClauseIndex, int32_t tableIndex);
void doAddGroupColumnForSubquery(SSqlCmd* pCmd, int32_t tagIndex); void doAddGroupColumnForSubquery(SQueryInfo* pQueryInfo, int32_t tagIndex);
int16_t tscGetJoinTagColIndexByUid(STagCond* pTagCond, uint64_t uid); int16_t tscGetJoinTagColIndexByUid(STagCond* pTagCond, uint64_t uid);
...@@ -226,7 +242,13 @@ TAOS* taos_connect_a(char* ip, char* user, char* pass, char* db, uint16_t port, ...@@ -226,7 +242,13 @@ TAOS* taos_connect_a(char* ip, char* user, char* pass, char* db, uint16_t port,
void sortRemoveDuplicates(STableDataBlocks* dataBuf); void sortRemoveDuplicates(STableDataBlocks* dataBuf);
void tscPrintSelectClause(SSqlCmd* pCmd); void tscPrintSelectClause(SSqlObj* pSql, int32_t subClauseIndex);
bool hasMoreVnodesToTry(SSqlObj *pSql);
void tscTryQueryNextVnode(SSqlObj *pSql, __async_cb_func_t fp);
void tscAsyncQuerySingleRowForNextVnode(void *param, TAOS_RES *tres, int numOfRows);
void tscTryQueryNextClause(SSqlObj* pSql, void (*queryFp)());
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -20,14 +20,6 @@ ...@@ -20,14 +20,6 @@
extern "C" { extern "C" {
#endif #endif
#include <errno.h>
#include <pthread.h>
#include <semaphore.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include "os.h" #include "os.h"
#include "taos.h" #include "taos.h"
#include "taosmsg.h" #include "taosmsg.h"
...@@ -39,69 +31,8 @@ extern "C" { ...@@ -39,69 +31,8 @@ extern "C" {
#include "tsqlfunction.h" #include "tsqlfunction.h"
#include "tutil.h" #include "tutil.h"
#define TSC_GET_RESPTR_BASE(res, cmd, col, ord) \ #define TSC_GET_RESPTR_BASE(res, _queryinfo, col, ord) \
((res->data + tscFieldInfoGetOffset(cmd, col) * res->numOfRows) + \ (res->data + tscFieldInfoGetOffset(_queryinfo, col) * res->numOfRows)
(1 - ord.order) * (res->numOfRows - 1) * tscFieldInfoGetField(cmd, col)->bytes)
enum _sql_cmd {
TSDB_SQL_SELECT,
TSDB_SQL_FETCH,
TSDB_SQL_INSERT,
TSDB_SQL_MGMT, // the SQL below is for mgmt node
TSDB_SQL_CREATE_DB,
TSDB_SQL_CREATE_TABLE,
TSDB_SQL_DROP_DB,
TSDB_SQL_DROP_TABLE,
TSDB_SQL_CREATE_ACCT,
TSDB_SQL_CREATE_USER,
TSDB_SQL_DROP_ACCT, // 10
TSDB_SQL_DROP_USER,
TSDB_SQL_ALTER_USER,
TSDB_SQL_ALTER_ACCT,
TSDB_SQL_ALTER_TABLE,
TSDB_SQL_ALTER_DB,
TSDB_SQL_CREATE_MNODE,
TSDB_SQL_DROP_MNODE,
TSDB_SQL_CREATE_DNODE,
TSDB_SQL_DROP_DNODE,
TSDB_SQL_CFG_DNODE, // 20
TSDB_SQL_CFG_MNODE,
TSDB_SQL_SHOW,
TSDB_SQL_RETRIEVE,
TSDB_SQL_KILL_QUERY,
TSDB_SQL_KILL_STREAM,
TSDB_SQL_KILL_CONNECTION,
TSDB_SQL_READ, // SQL below is for read operation
TSDB_SQL_CONNECT,
TSDB_SQL_USE_DB,
TSDB_SQL_META, // 30
TSDB_SQL_METRIC,
TSDB_SQL_MULTI_META,
TSDB_SQL_HB,
TSDB_SQL_LOCAL, // SQL below for client local
TSDB_SQL_DESCRIBE_TABLE,
TSDB_SQL_RETRIEVE_METRIC,
TSDB_SQL_METRIC_JOIN_RETRIEVE,
TSDB_SQL_RETRIEVE_TAGS,
/*
* build empty result instead of accessing dnode to fetch result
* reset the client cache
*/
TSDB_SQL_RETRIEVE_EMPTY_RESULT,
TSDB_SQL_RESET_CACHE, // 40
TSDB_SQL_SERV_STATUS,
TSDB_SQL_CURRENT_DB,
TSDB_SQL_SERV_VERSION,
TSDB_SQL_CLI_VERSION,
TSDB_SQL_CURRENT_USER,
TSDB_SQL_CFG_LOCAL,
TSDB_SQL_MAX
};
// forward declaration // forward declaration
struct SSqlInfo; struct SSqlInfo;
...@@ -182,13 +113,6 @@ typedef struct SColumnBaseInfo { ...@@ -182,13 +113,6 @@ typedef struct SColumnBaseInfo {
struct SLocalReducer; struct SLocalReducer;
// todo move to utility
typedef struct SString {
int32_t alloc;
int32_t n;
char * z;
} SString;
typedef struct SCond { typedef struct SCond {
uint64_t uid; uint64_t uid;
char * cond; char * cond;
...@@ -246,7 +170,7 @@ typedef struct STableDataBlocks { ...@@ -246,7 +170,7 @@ typedef struct STableDataBlocks {
* the metermeta for current table, the metermeta will be used during submit stage, keep a ref * the metermeta for current table, the metermeta will be used during submit stage, keep a ref
* to avoid it to be removed from cache * to avoid it to be removed from cache
*/ */
SMeterMeta* pMeterMeta; SMeterMeta *pMeterMeta;
union { union {
char *filename; char *filename;
...@@ -268,53 +192,69 @@ typedef struct SDataBlockList { ...@@ -268,53 +192,69 @@ typedef struct SDataBlockList {
STableDataBlocks **pData; STableDataBlocks **pData;
} SDataBlockList; } SDataBlockList;
typedef struct { typedef struct SQueryInfo {
SOrderVal order; int16_t command; // the command may be different for each subclause, so keep it seperately.
int command; uint16_t type; // query/insert/import type
int count; // TODO refactor
union {
bool existsCheck; // check if the table exists
int8_t showType; // show command type
};
int8_t isParseFinish;
int8_t isInsertFromFile; // load data from file or not
bool import; // import/insert type
uint8_t msgType;
uint16_t type; // query type
char intervalTimeUnit; char intervalTimeUnit;
int64_t etime, stime; int64_t etime, stime;
int64_t nAggTimeInterval; // aggregation time interval int64_t nAggTimeInterval; // aggregation time interval
int64_t nSlidingTime; // sliding window in mseconds int64_t nSlidingTime; // sliding window in mseconds
SSqlGroupbyExpr groupbyExpr; // group by tags info SSqlGroupbyExpr groupbyExpr; // group by tags info
/*
* use to keep short request msg and error msg, in such case, SSqlCmd->payload == SSqlCmd->ext;
* create table/query/insert operations will exceed the TSDB_SQLCMD_SIZE.
*
* In such cases, allocate the memory dynamically, and need to free the memory
*/
uint32_t allocSize;
char * payload;
int payloadLen;
short numOfCols;
SColumnBaseInfo colList; SColumnBaseInfo colList;
SFieldInfo fieldsInfo; SFieldInfo fieldsInfo;
SSqlExprInfo exprsInfo; SSqlExprInfo exprsInfo;
SLimitVal limit; SLimitVal limit;
SLimitVal slimit; SLimitVal slimit;
int64_t globalLimit;
STagCond tagCond; STagCond tagCond;
SOrderVal order;
int16_t interpoType; // interpolate type int16_t interpoType; // interpolate type
int16_t numOfTables; int16_t numOfTables;
// submit data blocks branched according to vnode
SDataBlockList * pDataBlocks;
SMeterMetaInfo **pMeterInfo; SMeterMetaInfo **pMeterInfo;
struct STSBuf * tsBuf; struct STSBuf * tsBuf;
// todo use dynamic allocated memory for defaultVal int64_t * defaultVal; // default value for interpolation
int64_t defaultVal[TSDB_MAX_COLUMNS]; // default value for interpolation char * msg; // pointer to the pCmd->payload to keep error message temporarily
int64_t clauseLimit; // limit for current sub clause
// offset value in the original sql expression, NOT sent to virtual node, only applied at client side
int64_t prjOffset;
} SQueryInfo;
// data source from sql string or from file
enum {
DATA_FROM_SQL_STRING = 1,
DATA_FROM_DATA_FILE = 2,
};
typedef struct {
int command;
uint8_t msgType;
union {
bool existsCheck; // check if the table exists or not
bool inStream; // denote if current sql is executed in stream or not
bool createOnDemand; // if the table is missing, on-the-fly create it. during getmeterMeta
int8_t dataSourceType; // load data from file or not
};
union {
int32_t count;
int32_t numOfTablesInSubmit;
};
int32_t clauseIndex; // index of multiple subclause query
int8_t isParseFinish;
short numOfCols;
uint32_t allocSize;
char * payload;
int payloadLen;
SQueryInfo **pQueryInfo;
int32_t numOfClause;
// submit data blocks branched according to vnode
SDataBlockList *pDataBlocks;
// for parameter ('?') binding and batch processing // for parameter ('?') binding and batch processing
int32_t batchSize; int32_t batchSize;
...@@ -330,8 +270,10 @@ struct STSBuf; ...@@ -330,8 +270,10 @@ struct STSBuf;
typedef struct { typedef struct {
uint8_t code; uint8_t code;
int numOfRows; // num of results in current retrieved int64_t numOfRows; // num of results in current retrieved
int numOfTotal; // num of total results int64_t numOfTotal; // num of total results
int64_t numOfTotalInCurrentClause; // num of total result in current subclause
char * pRsp; char * pRsp;
int rspType; int rspType;
int rspLen; int rspLen;
...@@ -394,9 +336,9 @@ typedef struct _sql_obj { ...@@ -394,9 +336,9 @@ typedef struct _sql_obj {
tsem_t emptyRspSem; tsem_t emptyRspSem;
SSqlCmd cmd; SSqlCmd cmd;
SSqlRes res; SSqlRes res;
uint16_t numOfSubs; uint8_t numOfSubs;
char* asyncTblPos; char * asyncTblPos;
void* pTableHashList; void * pTableHashList;
struct _sql_obj **pSubs; struct _sql_obj **pSubs;
struct _sql_obj * prev, *next; struct _sql_obj * prev, *next;
} SSqlObj; } SSqlObj;
...@@ -436,9 +378,11 @@ typedef struct { ...@@ -436,9 +378,11 @@ typedef struct {
} SIpStrList; } SIpStrList;
// tscSql API // tscSql API
int tsParseSql(SSqlObj *pSql, char *acct, char *db, bool multiVnodeInsertion); int tsParseSql(SSqlObj *pSql, bool multiVnodeInsertion);
void tscInitMsgs(); void tscInitMsgs();
extern int (*tscBuildMsg[TSDB_SQL_MAX])(SSqlObj *pSql, SSqlInfo *pInfo);
void *tscProcessMsgFromServer(char *msg, void *ahandle, void *thandle); void *tscProcessMsgFromServer(char *msg, void *ahandle, void *thandle);
int tscProcessSql(SSqlObj *pSql); int tscProcessSql(SSqlObj *pSql);
...@@ -457,15 +401,16 @@ int taos_retrieve(TAOS_RES *res); ...@@ -457,15 +401,16 @@ int taos_retrieve(TAOS_RES *res);
* transfer function for metric query in stream computing, the function need to be change * transfer function for metric query in stream computing, the function need to be change
* before send query message to vnode * before send query message to vnode
*/ */
int32_t tscTansformSQLFunctionForMetricQuery(SSqlCmd *pCmd); int32_t tscTansformSQLFunctionForSTableQuery(SQueryInfo *pQueryInfo);
void tscRestoreSQLFunctionForMetricQuery(SSqlCmd *pCmd); void tscRestoreSQLFunctionForMetricQuery(SQueryInfo *pQueryInfo);
void tscClearSqlMetaInfoForce(SSqlCmd *pCmd); void tscClearSqlMetaInfoForce(SSqlCmd *pCmd);
int32_t tscCreateResPointerInfo(SSqlCmd *pCmd, SSqlRes *pRes); int32_t tscCreateResPointerInfo(SSqlRes *pRes, SQueryInfo *pQueryInfo);
void tscDestroyResPointerInfo(SSqlRes *pRes); void tscDestroyResPointerInfo(SSqlRes *pRes);
void tscFreeSqlCmdData(SSqlCmd *pCmd); void tscFreeSqlCmdData(SSqlCmd *pCmd);
void tscFreeResData(SSqlObj* pSql);
/** /**
* free query result of the sql object * free query result of the sql object
...@@ -490,11 +435,13 @@ void tscFreeSqlObj(SSqlObj *pObj); ...@@ -490,11 +435,13 @@ void tscFreeSqlObj(SSqlObj *pObj);
void tscCloseTscObj(STscObj *pObj); void tscCloseTscObj(STscObj *pObj);
void tscProcessMultiVnodesInsert(SSqlObj *pSql); void tscProcessMultiVnodesInsert(SSqlObj *pSql);
void tscProcessMultiVnodesInsertForFile(SSqlObj *pSql); void tscProcessMultiVnodesInsertFromFile(SSqlObj *pSql);
void tscKillMetricQuery(SSqlObj *pSql); void tscKillMetricQuery(SSqlObj *pSql);
void tscInitResObjForLocalQuery(SSqlObj *pObj, int32_t numOfRes, int32_t rowLen); void tscInitResObjForLocalQuery(SSqlObj *pObj, int32_t numOfRes, int32_t rowLen);
bool tscIsUpdateQuery(STscObj *pObj); bool tscIsUpdateQuery(STscObj *pObj);
bool tscHasReachLimitation(SSqlObj* pSql); bool tscHasReachLimitation(SQueryInfo *pQueryInfo, SSqlRes *pRes);
char *tscGetErrorMsgPayload(SSqlCmd *pCmd);
int32_t tscInvalidSQLErrMsg(char *msg, const char *additionalInfo, const char *sql); int32_t tscInvalidSQLErrMsg(char *msg, const char *additionalInfo, const char *sql);
...@@ -516,6 +463,8 @@ extern int tsInsertHeadSize; ...@@ -516,6 +463,8 @@ extern int tsInsertHeadSize;
extern int tscNumOfThreads; extern int tscNumOfThreads;
extern SIpStrList tscMgmtIpList; extern SIpStrList tscMgmtIpList;
typedef void (*__async_cb_func_t)(void *param, TAOS_RES *tres, int numOfRows);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
此差异已折叠。
...@@ -17,6 +17,7 @@ ...@@ -17,6 +17,7 @@
#include "taosmsg.h" #include "taosmsg.h"
#include "tast.h" #include "tast.h"
#include "tlog.h" #include "tlog.h"
#include "tscSQLParser.h"
#include "tscSyntaxtreefunction.h" #include "tscSyntaxtreefunction.h"
#include "tschemautil.h" #include "tschemautil.h"
#include "tsdb.h" #include "tsdb.h"
...@@ -26,7 +27,6 @@ ...@@ -26,7 +27,6 @@
#include "tstoken.h" #include "tstoken.h"
#include "ttypes.h" #include "ttypes.h"
#include "tutil.h" #include "tutil.h"
#include "tscSQLParser.h"
/* /*
* *
...@@ -115,6 +115,9 @@ static tSQLSyntaxNode *tSQLSyntaxNodeCreate(SSchema *pSchema, int32_t numOfCols, ...@@ -115,6 +115,9 @@ static tSQLSyntaxNode *tSQLSyntaxNodeCreate(SSchema *pSchema, int32_t numOfCols,
int32_t i = 0; int32_t i = 0;
if (pToken->type == TK_ID) { if (pToken->type == TK_ID) {
do { do {
SSQLToken tableToken = {0};
extractTableNameFromToken(pToken, &tableToken);
size_t len = strlen(pSchema[i].name); size_t len = strlen(pSchema[i].name);
if (strncmp(pToken->z, pSchema[i].name, pToken->n) == 0 && pToken->n == len) break; if (strncmp(pToken->z, pSchema[i].name, pToken->n) == 0 && pToken->n == len) break;
} while (++i < numOfCols); } while (++i < numOfCols);
......
...@@ -26,19 +26,18 @@ ...@@ -26,19 +26,18 @@
#include "tutil.h" #include "tutil.h"
#include "tnote.h" #include "tnote.h"
void tscProcessFetchRow(SSchedMsg *pMsg); static void tscProcessFetchRow(SSchedMsg *pMsg);
void tscProcessAsyncRetrieve(void *param, TAOS_RES *tres, int numOfRows); static void tscAsyncQueryRowsForNextVnode(void *param, TAOS_RES *tres, int numOfRows);
static void tscProcessAsyncRetrieveNextVnode(void *param, TAOS_RES *tres, int numOfRows);
static void tscProcessAsyncContinueRetrieve(void *param, TAOS_RES *tres, int numOfRows);
static void tscProcessAsyncRetrieveImpl(void *param, TAOS_RES *tres, int numOfRows, void (*fp)()); static void tscProcessAsyncRetrieveImpl(void *param, TAOS_RES *tres, int numOfRows, void (*fp)());
/* /*
* proxy function to perform sequentially query&retrieve operation. * Proxy function to perform sequentially query&retrieve operation.
* If sql queries upon metric and two-stage merge procedure is not needed, * If sql queries upon a super table and two-stage merge procedure is not involved (when employ the projection
* it will sequentially query&retrieve data for all vnodes in pCmd->pMetricMeta * query), it will sequentially query&retrieve data for all vnodes
*/ */
static void tscProcessAsyncFetchRowsProxy(void *param, TAOS_RES *tres, int numOfRows); static void tscAsyncFetchRowsProxy(void *param, TAOS_RES *tres, int numOfRows);
static void tscAsyncFetchSingleRowProxy(void *param, TAOS_RES *tres, int numOfRows);
// TODO return the correct error code to client in tscQueueAsyncError // TODO return the correct error code to client in tscQueueAsyncError
void taos_query_a(TAOS *taos, const char *sqlstr, void (*fp)(void *, TAOS_RES *, int), void *param) { void taos_query_a(TAOS *taos, const char *sqlstr, void (*fp)(void *, TAOS_RES *, int), void *param) {
...@@ -81,7 +80,6 @@ void taos_query_a(TAOS *taos, const char *sqlstr, void (*fp)(void *, TAOS_RES *, ...@@ -81,7 +80,6 @@ void taos_query_a(TAOS *taos, const char *sqlstr, void (*fp)(void *, TAOS_RES *,
return; return;
} }
pSql->sqlstr = malloc(sqlLen + 1); pSql->sqlstr = malloc(sqlLen + 1);
if (pSql->sqlstr == NULL) { if (pSql->sqlstr == NULL) {
tscError("%p failed to malloc sql string buffer", pSql); tscError("%p failed to malloc sql string buffer", pSql);
...@@ -97,7 +95,7 @@ void taos_query_a(TAOS *taos, const char *sqlstr, void (*fp)(void *, TAOS_RES *, ...@@ -97,7 +95,7 @@ void taos_query_a(TAOS *taos, const char *sqlstr, void (*fp)(void *, TAOS_RES *,
strtolower(pSql->sqlstr, sqlstr); strtolower(pSql->sqlstr, sqlstr);
tscDump("%p pObj:%p, Async SQL: %s", pSql, pObj, pSql->sqlstr); tscDump("%p pObj:%p, Async SQL: %s", pSql, pObj, pSql->sqlstr);
int32_t code = tsParseSql(pSql, pObj->acctId, pObj->db, true); int32_t code = tsParseSql(pSql, true);
if (code == TSDB_CODE_ACTION_IN_PROGRESS) return; if (code == TSDB_CODE_ACTION_IN_PROGRESS) return;
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
...@@ -109,7 +107,7 @@ void taos_query_a(TAOS *taos, const char *sqlstr, void (*fp)(void *, TAOS_RES *, ...@@ -109,7 +107,7 @@ void taos_query_a(TAOS *taos, const char *sqlstr, void (*fp)(void *, TAOS_RES *,
tscDoQuery(pSql); tscDoQuery(pSql);
} }
static void tscProcessAsyncFetchRowsProxy(void *param, TAOS_RES *tres, int numOfRows) { static void tscAsyncFetchRowsProxy(void *param, TAOS_RES *tres, int numOfRows) {
if (tres == NULL) { if (tres == NULL) {
return; return;
} }
...@@ -118,36 +116,32 @@ static void tscProcessAsyncFetchRowsProxy(void *param, TAOS_RES *tres, int numOf ...@@ -118,36 +116,32 @@ static void tscProcessAsyncFetchRowsProxy(void *param, TAOS_RES *tres, int numOf
SSqlRes *pRes = &pSql->res; SSqlRes *pRes = &pSql->res;
SSqlCmd *pCmd = &pSql->cmd; SSqlCmd *pCmd = &pSql->cmd;
// sequentially retrieve data from remain vnodes first, query vnode specified by vnodeIdx if (numOfRows == 0) {
if (numOfRows == 0 && tscProjectionQueryOnMetric(pCmd)) { if (hasMoreVnodesToTry(pSql)) { // sequentially retrieve data from remain vnodes.
// vnode is denoted by vnodeIdx, continue to query vnode specified by vnodeIdx tscTryQueryNextVnode(pSql, tscAsyncQueryRowsForNextVnode);
SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, 0); } else {
assert(pMeterMetaInfo->vnodeIndex >= 0); /*
* all available virtual node has been checked already, now we need to check
/* reach the maximum number of output rows, abort */ * for the next subclause queries
if (pCmd->globalLimit > 0 && pRes->numOfTotal >= pCmd->globalLimit) { */
(*pSql->fetchFp)(param, tres, 0); if (pCmd->clauseIndex < pCmd->numOfClause - 1) {
tscTryQueryNextClause(pSql, tscAsyncQueryRowsForNextVnode);
return; return;
} }
/* update the limit value according to current retrieval results */ /*
pCmd->limit.limit = pCmd->globalLimit - pRes->numOfTotal; * 1. has reach the limitation
pCmd->limit.offset = pRes->offset; * 2. no remain virtual nodes to be retrieved anymore
*/
if ((++(pMeterMetaInfo->vnodeIndex)) < pMeterMetaInfo->pMetricMeta->numOfVnodes) { (*pSql->fetchFp)(param, pSql, 0);
tscTrace("%p retrieve data from next vnode:%d", pSql, pMeterMetaInfo->vnodeIndex); }
pSql->cmd.command = TSDB_SQL_SELECT; // reset flag to launch query first.
tscResetForNextRetrieve(pRes);
pSql->fp = tscProcessAsyncRetrieveNextVnode;
tscProcessSql(pSql);
return; return;
} }
} else { // localreducer has handle this situation
// local reducer has handle this situation during super table non-projection query.
if (pCmd->command != TSDB_SQL_RETRIEVE_METRIC) { if (pCmd->command != TSDB_SQL_RETRIEVE_METRIC) {
pRes->numOfTotal += pRes->numOfRows; pRes->numOfTotalInCurrentClause += pRes->numOfRows;
}
} }
(*pSql->fetchFp)(param, tres, numOfRows); (*pSql->fetchFp)(param, tres, numOfRows);
...@@ -164,7 +158,7 @@ static void tscProcessAsyncRetrieveImpl(void *param, TAOS_RES *tres, int numOfRo ...@@ -164,7 +158,7 @@ static void tscProcessAsyncRetrieveImpl(void *param, TAOS_RES *tres, int numOfRo
SSqlCmd *pCmd = &pSql->cmd; SSqlCmd *pCmd = &pSql->cmd;
SSqlRes *pRes = &pSql->res; SSqlRes *pRes = &pSql->res;
if (pRes->qhandle == 0 || numOfRows != 0) { if ((pRes->qhandle == 0 || numOfRows != 0) && pCmd->command < TSDB_SQL_LOCAL) {
if (pRes->qhandle == 0) { if (pRes->qhandle == 0) {
tscError("qhandle is NULL"); tscError("qhandle is NULL");
} else { } else {
...@@ -183,14 +177,18 @@ static void tscProcessAsyncRetrieveImpl(void *param, TAOS_RES *tres, int numOfRo ...@@ -183,14 +177,18 @@ static void tscProcessAsyncRetrieveImpl(void *param, TAOS_RES *tres, int numOfRo
} }
/* /*
* retrieve callback for fetch rows proxy. It serves as the callback function of querying vnode * retrieve callback for fetch rows proxy.
* The below two functions both serve as the callback function of query virtual node.
* query callback first, and then followed by retrieve callback
*/ */
static void tscProcessAsyncRetrieveNextVnode(void *param, TAOS_RES *tres, int numOfRows) { static void tscAsyncQueryRowsForNextVnode(void *param, TAOS_RES *tres, int numOfRows) {
tscProcessAsyncRetrieveImpl(param, tres, numOfRows, tscProcessAsyncFetchRowsProxy); // query completed, continue to retrieve
tscProcessAsyncRetrieveImpl(param, tres, numOfRows, tscAsyncFetchRowsProxy);
} }
static void tscProcessAsyncContinueRetrieve(void *param, TAOS_RES *tres, int numOfRows) { void tscAsyncQuerySingleRowForNextVnode(void *param, TAOS_RES *tres, int numOfRows) {
tscProcessAsyncRetrieveImpl(param, tres, numOfRows, tscProcessAsyncRetrieve); // query completed, continue to retrieve
tscProcessAsyncRetrieveImpl(param, tres, numOfRows, tscAsyncFetchSingleRowProxy);
} }
void taos_fetch_rows_a(TAOS_RES *taosa, void (*fp)(void *, TAOS_RES *, int), void *param) { void taos_fetch_rows_a(TAOS_RES *taosa, void (*fp)(void *, TAOS_RES *, int), void *param) {
...@@ -213,7 +211,7 @@ void taos_fetch_rows_a(TAOS_RES *taosa, void (*fp)(void *, TAOS_RES *, int), voi ...@@ -213,7 +211,7 @@ void taos_fetch_rows_a(TAOS_RES *taosa, void (*fp)(void *, TAOS_RES *, int), voi
// user-defined callback function is stored in fetchFp // user-defined callback function is stored in fetchFp
pSql->fetchFp = fp; pSql->fetchFp = fp;
pSql->fp = tscProcessAsyncFetchRowsProxy; pSql->fp = tscAsyncFetchRowsProxy;
pSql->param = param; pSql->param = param;
tscResetForNextRetrieve(pRes); tscResetForNextRetrieve(pRes);
...@@ -248,8 +246,12 @@ void taos_fetch_row_a(TAOS_RES *taosa, void (*fp)(void *, TAOS_RES *, TAOS_ROW), ...@@ -248,8 +246,12 @@ void taos_fetch_row_a(TAOS_RES *taosa, void (*fp)(void *, TAOS_RES *, TAOS_ROW),
if (pRes->row >= pRes->numOfRows) { if (pRes->row >= pRes->numOfRows) {
tscResetForNextRetrieve(pRes); tscResetForNextRetrieve(pRes);
pSql->fp = tscProcessAsyncRetrieve; pSql->fp = tscAsyncFetchSingleRowProxy;
if (pCmd->command != TSDB_SQL_RETRIEVE_METRIC && pCmd->command < TSDB_SQL_LOCAL) {
pCmd->command = (pCmd->command > TSDB_SQL_MGMT) ? TSDB_SQL_RETRIEVE : TSDB_SQL_FETCH; pCmd->command = (pCmd->command > TSDB_SQL_MGMT) ? TSDB_SQL_RETRIEVE : TSDB_SQL_FETCH;
}
tscProcessSql(pSql); tscProcessSql(pSql);
} else { } else {
SSchedMsg schedMsg; SSchedMsg schedMsg;
...@@ -261,47 +263,31 @@ void taos_fetch_row_a(TAOS_RES *taosa, void (*fp)(void *, TAOS_RES *, TAOS_ROW), ...@@ -261,47 +263,31 @@ void taos_fetch_row_a(TAOS_RES *taosa, void (*fp)(void *, TAOS_RES *, TAOS_ROW),
} }
} }
void tscProcessAsyncRetrieve(void *param, TAOS_RES *tres, int numOfRows) { void tscAsyncFetchSingleRowProxy(void *param, TAOS_RES *tres, int numOfRows) {
SSqlObj *pSql = (SSqlObj *)tres; SSqlObj *pSql = (SSqlObj *)tres;
SSqlRes *pRes = &pSql->res; SSqlRes *pRes = &pSql->res;
SSqlCmd *pCmd = &pSql->cmd; SSqlCmd *pCmd = &pSql->cmd;
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
if (numOfRows == 0) { if (numOfRows == 0) {
// sequentially retrieve data from remain vnodes. if (hasMoreVnodesToTry(pSql)) { // sequentially retrieve data from remain vnodes.
if (tscProjectionQueryOnMetric(pCmd)) { tscTryQueryNextVnode(pSql, tscAsyncQuerySingleRowForNextVnode);
} else {
/* /*
* vnode is denoted by vnodeIdx, continue to query vnode specified by vnodeIdx till all vnode have been retrieved * 1. has reach the limitation
* 2. no remain virtual nodes to be retrieved anymore
*/ */
SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, 0);
assert(pMeterMetaInfo->vnodeIndex >= 0);
/* reach the maximum number of output rows, abort */
if (pCmd->globalLimit > 0 && pRes->numOfTotal >= pCmd->globalLimit) {
(*pSql->fetchFp)(pSql->param, pSql, NULL); (*pSql->fetchFp)(pSql->param, pSql, NULL);
return;
} }
/* update the limit value according to current retrieval results */
pCmd->limit.limit = pCmd->globalLimit - pRes->numOfTotal;
if ((++pMeterMetaInfo->vnodeIndex) <= pMeterMetaInfo->pMetricMeta->numOfVnodes) {
pSql->cmd.command = TSDB_SQL_SELECT; // reset flag to launch query first.
tscResetForNextRetrieve(pRes);
pSql->fp = tscProcessAsyncContinueRetrieve;
tscProcessSql(pSql);
return; return;
} }
} else {
(*pSql->fetchFp)(pSql->param, pSql, NULL);
}
} else {
for (int i = 0; i < pCmd->numOfCols; ++i) for (int i = 0; i < pCmd->numOfCols; ++i)
pRes->tsrow[i] = TSC_GET_RESPTR_BASE(pRes, pCmd, i, pCmd->order) + pRes->bytes[i] * pRes->row; pRes->tsrow[i] = TSC_GET_RESPTR_BASE(pRes, pQueryInfo, i, pQueryInfo->order) + pRes->bytes[i] * pRes->row;
pRes->row++; pRes->row++;
(*pSql->fetchFp)(pSql->param, pSql, pSql->res.tsrow); (*pSql->fetchFp)(pSql->param, pSql, pSql->res.tsrow);
}
} }
void tscProcessFetchRow(SSchedMsg *pMsg) { void tscProcessFetchRow(SSchedMsg *pMsg) {
...@@ -309,10 +295,13 @@ void tscProcessFetchRow(SSchedMsg *pMsg) { ...@@ -309,10 +295,13 @@ void tscProcessFetchRow(SSchedMsg *pMsg) {
SSqlRes *pRes = &pSql->res; SSqlRes *pRes = &pSql->res;
SSqlCmd *pCmd = &pSql->cmd; SSqlCmd *pCmd = &pSql->cmd;
for (int i = 0; i < pCmd->numOfCols; ++i) SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
pRes->tsrow[i] = TSC_GET_RESPTR_BASE(pRes, pCmd, i, pCmd->order) + pRes->bytes[i] * pRes->row;
pRes->row++; for (int i = 0; i < pCmd->numOfCols; ++i) {
pRes->tsrow[i] = TSC_GET_RESPTR_BASE(pRes, pQueryInfo, i, pQueryInfo->order) + pRes->bytes[i] * pRes->row;
}
pRes->row++;
(*pSql->fetchFp)(pSql->param, pSql, pRes->tsrow); (*pSql->fetchFp)(pSql->param, pSql, pRes->tsrow);
} }
...@@ -371,7 +360,7 @@ void tscQueueAsyncRes(SSqlObj *pSql) { ...@@ -371,7 +360,7 @@ void tscQueueAsyncRes(SSqlObj *pSql) {
tscTrace("%p SqlObj is freed, not add into queue async res", pSql); tscTrace("%p SqlObj is freed, not add into queue async res", pSql);
return; return;
} else { } else {
tscTrace("%p add into queued async res, code:%d", pSql, pSql->res.code); tscError("%p add into queued async res, code:%d", pSql, pSql->res.code);
} }
SSchedMsg schedMsg; SSchedMsg schedMsg;
...@@ -404,10 +393,13 @@ void tscAsyncInsertMultiVnodesProxy(void *param, TAOS_RES *tres, int numOfRows) ...@@ -404,10 +393,13 @@ void tscAsyncInsertMultiVnodesProxy(void *param, TAOS_RES *tres, int numOfRows)
SSqlCmd *pCmd = &pSql->cmd; SSqlCmd *pCmd = &pSql->cmd;
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
assert(!pCmd->isInsertFromFile && pSql->signature == pSql); assert(pCmd->dataSourceType != 0 && pSql->signature == pSql);
int32_t index = 0;
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, index);
SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, 0); SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0);
assert(pCmd->numOfTables == 1); assert(pQueryInfo->numOfTables == 1 || pQueryInfo->numOfTables == 2);
SDataBlockList *pDataBlocks = pCmd->pDataBlocks; SDataBlockList *pDataBlocks = pCmd->pDataBlocks;
if (pDataBlocks == NULL || pMeterMetaInfo->vnodeIndex >= pDataBlocks->nSize) { if (pDataBlocks == NULL || pMeterMetaInfo->vnodeIndex >= pDataBlocks->nSize) {
...@@ -444,7 +436,6 @@ void tscMeterMetaCallBack(void *param, TAOS_RES *res, int code) { ...@@ -444,7 +436,6 @@ void tscMeterMetaCallBack(void *param, TAOS_RES *res, int code) {
SSqlObj *pSql = (SSqlObj *)param; SSqlObj *pSql = (SSqlObj *)param;
if (pSql == NULL || pSql->signature != pSql) return; if (pSql == NULL || pSql->signature != pSql) return;
STscObj *pObj = pSql->pTscObj;
SSqlCmd *pCmd = &pSql->cmd; SSqlCmd *pCmd = &pSql->cmd;
SSqlRes *pRes = &pSql->res; SSqlRes *pRes = &pSql->res;
...@@ -465,9 +456,10 @@ void tscMeterMetaCallBack(void *param, TAOS_RES *res, int code) { ...@@ -465,9 +456,10 @@ void tscMeterMetaCallBack(void *param, TAOS_RES *res, int code) {
tscTrace("%p renew meterMeta successfully, command:%d, code:%d, thandle:%p, retry:%d", tscTrace("%p renew meterMeta successfully, command:%d, code:%d, thandle:%p, retry:%d",
pSql, pSql->cmd.command, pSql->res.code, pSql->thandle, pSql->retry); pSql, pSql->cmd.command, pSql->res.code, pSql->thandle, pSql->retry);
assert(tscGetMeterMetaInfo(&pSql->cmd, 0)->pMeterMeta == NULL); SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfo(&pSql->cmd, 0, 0);
tscGetMeterMeta(pSql, tscGetMeterMetaInfo(&pSql->cmd, 0)->name, 0); assert(pMeterMetaInfo->pMeterMeta == NULL);
tscGetMeterMeta(pSql, pMeterMetaInfo);
code = tscSendMsgToServer(pSql); code = tscSendMsgToServer(pSql);
if (code != 0) { if (code != 0) {
pRes->code = code; pRes->code = code;
...@@ -485,24 +477,27 @@ void tscMeterMetaCallBack(void *param, TAOS_RES *res, int code) { ...@@ -485,24 +477,27 @@ void tscMeterMetaCallBack(void *param, TAOS_RES *res, int code) {
} }
if (pSql->pStream == NULL) { if (pSql->pStream == NULL) {
// check if it is a sub-query of metric query first, if true, enter another routine // check if it is a sub-query of super table query first, if true, enter another routine
if ((pSql->cmd.type & TSDB_QUERY_TYPE_STABLE_SUBQUERY) == TSDB_QUERY_TYPE_STABLE_SUBQUERY) { SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, 0);
if ((pQueryInfo->type & TSDB_QUERY_TYPE_STABLE_SUBQUERY) == TSDB_QUERY_TYPE_STABLE_SUBQUERY) {
SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0);
assert(pMeterMetaInfo->pMeterMeta->numOfTags != 0 && pMeterMetaInfo->vnodeIndex >= 0 && pSql->param != NULL); assert(pMeterMetaInfo->pMeterMeta->numOfTags != 0 && pMeterMetaInfo->vnodeIndex >= 0 && pSql->param != NULL);
SRetrieveSupport *trs = (SRetrieveSupport *)pSql->param; SRetrieveSupport *trs = (SRetrieveSupport *)pSql->param;
SSqlObj * pParObj = trs->pParentSqlObj; SSqlObj * pParObj = trs->pParentSqlObj;
assert(pParObj->signature == pParObj && trs->subqueryIndex == pMeterMetaInfo->vnodeIndex && assert(pParObj->signature == pParObj && trs->subqueryIndex == pMeterMetaInfo->vnodeIndex &&
pMeterMetaInfo->pMeterMeta->numOfTags != 0); pMeterMetaInfo->pMeterMeta->numOfTags != 0);
tscTrace("%p get metricMeta during metric query successfully", pSql); tscTrace("%p get metricMeta during super table query successfully", pSql);
code = tscGetMeterMeta(pSql, tscGetMeterMetaInfo(&pSql->cmd, 0)->name, 0); code = tscGetMeterMeta(pSql, pMeterMetaInfo);
pRes->code = code; pRes->code = code;
if (code == TSDB_CODE_ACTION_IN_PROGRESS) return; if (code == TSDB_CODE_ACTION_IN_PROGRESS) return;
code = tscGetMetricMeta(pSql); code = tscGetMetricMeta(pSql, 0);
pRes->code = code; pRes->code = code;
if (code == TSDB_CODE_ACTION_IN_PROGRESS) return; if (code == TSDB_CODE_ACTION_IN_PROGRESS) return;
...@@ -510,8 +505,8 @@ void tscMeterMetaCallBack(void *param, TAOS_RES *res, int code) { ...@@ -510,8 +505,8 @@ void tscMeterMetaCallBack(void *param, TAOS_RES *res, int code) {
if (pCmd->isParseFinish) { if (pCmd->isParseFinish) {
tscTrace("%p resend data to vnode in metermeta callback since sql has been parsed completed", pSql); tscTrace("%p resend data to vnode in metermeta callback since sql has been parsed completed", pSql);
SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, 0); SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, pCmd->clauseIndex, 0);
code = tscGetMeterMeta(pSql, pMeterMetaInfo->name, 0); code = tscGetMeterMeta(pSql, pMeterMetaInfo);
assert(code == TSDB_CODE_SUCCESS); assert(code == TSDB_CODE_SUCCESS);
if (pMeterMetaInfo->pMeterMeta) { if (pMeterMetaInfo->pMeterMeta) {
...@@ -519,28 +514,28 @@ void tscMeterMetaCallBack(void *param, TAOS_RES *res, int code) { ...@@ -519,28 +514,28 @@ void tscMeterMetaCallBack(void *param, TAOS_RES *res, int code) {
if (code == TSDB_CODE_SUCCESS) return; if (code == TSDB_CODE_SUCCESS) return;
} }
} else { } else {
code = tsParseSql(pSql, pObj->acctId, pObj->db, false); code = tsParseSql(pSql, false);
if (code == TSDB_CODE_ACTION_IN_PROGRESS) return; if (code == TSDB_CODE_ACTION_IN_PROGRESS) return;
} }
} }
} else { // stream computing } else { // stream computing
SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, 0); SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, pCmd->clauseIndex, 0);
code = tscGetMeterMeta(pSql, pMeterMetaInfo->name, 0); code = tscGetMeterMeta(pSql, pMeterMetaInfo);
pRes->code = code; pRes->code = code;
if (code == TSDB_CODE_ACTION_IN_PROGRESS) return; if (code == TSDB_CODE_ACTION_IN_PROGRESS) return;
pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, 0); if (code == TSDB_CODE_SUCCESS && UTIL_METER_IS_SUPERTABLE(pMeterMetaInfo)) {
if (code == TSDB_CODE_SUCCESS && UTIL_METER_IS_METRIC(pMeterMetaInfo)) { code = tscGetMetricMeta(pSql, pCmd->clauseIndex);
code = tscGetMetricMeta(pSql);
pRes->code = code; pRes->code = code;
if (code == TSDB_CODE_ACTION_IN_PROGRESS) return; if (code == TSDB_CODE_ACTION_IN_PROGRESS) return;
} }
} }
if (code != 0) { if (code != TSDB_CODE_SUCCESS) {
pSql->res.code = code;
tscQueueAsyncRes(pSql); tscQueueAsyncRes(pSql);
return; return;
} }
...@@ -549,10 +544,12 @@ void tscMeterMetaCallBack(void *param, TAOS_RES *res, int code) { ...@@ -549,10 +544,12 @@ void tscMeterMetaCallBack(void *param, TAOS_RES *res, int code) {
tscTrace("%p stream:%p meta is updated, start new query, command:%d", pSql, pSql->pStream, pSql->cmd.command); tscTrace("%p stream:%p meta is updated, start new query, command:%d", pSql, pSql->pStream, pSql->cmd.command);
/* /*
* NOTE: * NOTE:
* transfer the sql function for metric query before get meter/metric meta, * transfer the sql function for super table query before get meter/metric meta,
* since in callback functions, only tscProcessSql(pStream->pSql) is executed! * since in callback functions, only tscProcessSql(pStream->pSql) is executed!
*/ */
tscTansformSQLFunctionForMetricQuery(&pSql->cmd); SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
tscTansformSQLFunctionForSTableQuery(pQueryInfo);
tscIncStreamExecutionCount(pSql->pStream); tscIncStreamExecutionCount(pSql->pStream);
} else { } else {
tscTrace("%p get meterMeta/metricMeta successfully", pSql); tscTrace("%p get meterMeta/metricMeta successfully", pSql);
......
...@@ -72,6 +72,8 @@ for (int32_t i = 0; i < (ctx)->tagInfo.numOfTagCols; ++i) { \ ...@@ -72,6 +72,8 @@ for (int32_t i = 0; i < (ctx)->tagInfo.numOfTagCols; ++i) { \
void noop1(SQLFunctionCtx *UNUSED_PARAM(pCtx)) {} void noop1(SQLFunctionCtx *UNUSED_PARAM(pCtx)) {}
void noop2(SQLFunctionCtx *UNUSED_PARAM(pCtx), int32_t UNUSED_PARAM(index)) {} void noop2(SQLFunctionCtx *UNUSED_PARAM(pCtx), int32_t UNUSED_PARAM(index)) {}
void doFinalizer(SQLFunctionCtx *pCtx) { resetResultInfo(GET_RES_INFO(pCtx)); }
typedef struct tValuePair { typedef struct tValuePair {
tVariant v; tVariant v;
int64_t timestamp; int64_t timestamp;
...@@ -356,7 +358,7 @@ static void function_finalizer(SQLFunctionCtx *pCtx) { ...@@ -356,7 +358,7 @@ static void function_finalizer(SQLFunctionCtx *pCtx) {
setNull(pCtx->aOutputBuf, pCtx->outputType, pCtx->outputBytes); setNull(pCtx->aOutputBuf, pCtx->outputType, pCtx->outputBytes);
} }
resetResultInfo(GET_RES_INFO(pCtx)); doFinalizer(pCtx);
} }
/* /*
...@@ -889,6 +891,7 @@ static void avg_finalizer(SQLFunctionCtx *pCtx) { ...@@ -889,6 +891,7 @@ static void avg_finalizer(SQLFunctionCtx *pCtx) {
// cannot set the numOfIteratedElems again since it is set during previous iteration // cannot set the numOfIteratedElems again since it is set during previous iteration
GET_RES_INFO(pCtx)->numOfRes = 1; GET_RES_INFO(pCtx)->numOfRes = 1;
doFinalizer(pCtx);
} }
///////////////////////////////////////////////////////////////////////////////////////////// /////////////////////////////////////////////////////////////////////////////////////////////
...@@ -910,6 +913,16 @@ static void minMax_function(SQLFunctionCtx *pCtx, char *pOutput, int32_t isMin, ...@@ -910,6 +913,16 @@ static void minMax_function(SQLFunctionCtx *pCtx, char *pOutput, int32_t isMin,
index = pCtx->preAggVals.maxIndex; index = pCtx->preAggVals.maxIndex;
} }
/**
* NOTE: work around the bug caused by invalid pre-calculated function.
* Here the selectivity + ts will not return correct value.
*
* The following codes of 3 lines will be removed later.
*/
if (index < 0 || index >= pCtx->size + pCtx->startOffset) {
index = 0;
}
TSKEY key = pCtx->ptsList[index]; TSKEY key = pCtx->ptsList[index];
if (pCtx->inputType >= TSDB_DATA_TYPE_TINYINT && pCtx->inputType <= TSDB_DATA_TYPE_BIGINT) { if (pCtx->inputType >= TSDB_DATA_TYPE_TINYINT && pCtx->inputType <= TSDB_DATA_TYPE_BIGINT) {
...@@ -1424,7 +1437,7 @@ static void stddev_finalizer(SQLFunctionCtx *pCtx) { ...@@ -1424,7 +1437,7 @@ static void stddev_finalizer(SQLFunctionCtx *pCtx) {
SET_VAL(pCtx, 1, 1); SET_VAL(pCtx, 1, 1);
} }
resetResultInfo(GET_RES_INFO(pCtx)); doFinalizer(pCtx);
} }
////////////////////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////////////////////
...@@ -1456,7 +1469,9 @@ static void first_function(SQLFunctionCtx *pCtx) { ...@@ -1456,7 +1469,9 @@ static void first_function(SQLFunctionCtx *pCtx) {
} }
memcpy(pCtx->aOutputBuf, data, pCtx->inputBytes); memcpy(pCtx->aOutputBuf, data, pCtx->inputBytes);
DO_UPDATE_TAG_COLUMNS(pCtx, i);
TSKEY k = pCtx->ptsList[i];
DO_UPDATE_TAG_COLUMNS(pCtx, k);
SResultInfo *pInfo = GET_RES_INFO(pCtx); SResultInfo *pInfo = GET_RES_INFO(pCtx);
pInfo->hasResult = DATA_SET_FLAG; pInfo->hasResult = DATA_SET_FLAG;
...@@ -1824,7 +1839,7 @@ static void last_row_finalizer(SQLFunctionCtx *pCtx) { ...@@ -1824,7 +1839,7 @@ static void last_row_finalizer(SQLFunctionCtx *pCtx) {
} }
GET_RES_INFO(pCtx)->numOfRes = 1; GET_RES_INFO(pCtx)->numOfRes = 1;
resetResultInfo(GET_RES_INFO(pCtx)); doFinalizer(pCtx);
} }
////////////////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////////////////
...@@ -2005,15 +2020,8 @@ static void copyTopBotRes(SQLFunctionCtx *pCtx, int32_t type) { ...@@ -2005,15 +2020,8 @@ static void copyTopBotRes(SQLFunctionCtx *pCtx, int32_t type) {
STopBotInfo *pRes = pResInfo->interResultBuf; STopBotInfo *pRes = pResInfo->interResultBuf;
tValuePair **tvp = pRes->res; tValuePair **tvp = pRes->res;
int32_t step = 0;
// in case of second stage merge, always use incremental output.
if (pCtx->currentStage == SECONDARY_STAGE_MERGE) {
step = QUERY_ASC_FORWARD_STEP;
} else {
step = GET_FORWARD_DIRECTION_FACTOR(pCtx->order);
}
int32_t step = QUERY_ASC_FORWARD_STEP;
int32_t len = GET_RES_INFO(pCtx)->numOfRes; int32_t len = GET_RES_INFO(pCtx)->numOfRes;
switch (type) { switch (type) {
...@@ -2393,7 +2401,7 @@ static void top_bottom_func_finalizer(SQLFunctionCtx *pCtx) { ...@@ -2393,7 +2401,7 @@ static void top_bottom_func_finalizer(SQLFunctionCtx *pCtx) {
GET_TRUE_DATA_TYPE(); GET_TRUE_DATA_TYPE();
copyTopBotRes(pCtx, type); copyTopBotRes(pCtx, type);
resetResultInfo(pResInfo); doFinalizer(pCtx);
} }
/////////////////////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////////////////////
...@@ -2470,7 +2478,7 @@ static void percentile_finalizer(SQLFunctionCtx *pCtx) { ...@@ -2470,7 +2478,7 @@ static void percentile_finalizer(SQLFunctionCtx *pCtx) {
tOrderDescDestroy(pMemBucket->pOrderDesc); tOrderDescDestroy(pMemBucket->pOrderDesc);
tMemBucketDestroy(pMemBucket); tMemBucketDestroy(pMemBucket);
resetResultInfo(GET_RES_INFO(pCtx)); doFinalizer(pCtx);
} }
////////////////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////////////////
...@@ -2679,7 +2687,7 @@ static void apercentile_finalizer(SQLFunctionCtx *pCtx) { ...@@ -2679,7 +2687,7 @@ static void apercentile_finalizer(SQLFunctionCtx *pCtx) {
} }
} }
resetResultInfo(pResInfo); doFinalizer(pCtx);
} }
///////////////////////////////////////////////////////////////////////////////// /////////////////////////////////////////////////////////////////////////////////
...@@ -2859,7 +2867,7 @@ static void leastsquares_finalizer(SQLFunctionCtx *pCtx) { ...@@ -2859,7 +2867,7 @@ static void leastsquares_finalizer(SQLFunctionCtx *pCtx) {
param[1][2] /= param[1][1]; param[1][2] /= param[1][1];
sprintf(pCtx->aOutputBuf, "(%lf, %lf)", param[0][2], param[1][2]); sprintf(pCtx->aOutputBuf, "(%lf, %lf)", param[0][2], param[1][2]);
resetResultInfo(GET_RES_INFO(pCtx)); doFinalizer(pCtx);
} }
static void date_col_output_function(SQLFunctionCtx *pCtx) { static void date_col_output_function(SQLFunctionCtx *pCtx) {
...@@ -2878,17 +2886,17 @@ static FORCE_INLINE void date_col_output_function_f(SQLFunctionCtx *pCtx, int32_ ...@@ -2878,17 +2886,17 @@ static FORCE_INLINE void date_col_output_function_f(SQLFunctionCtx *pCtx, int32_
static void col_project_function(SQLFunctionCtx *pCtx) { static void col_project_function(SQLFunctionCtx *pCtx) {
INC_INIT_VAL(pCtx, pCtx->size); INC_INIT_VAL(pCtx, pCtx->size);
char *pDest = 0; char *pData = GET_INPUT_CHAR(pCtx);
if (pCtx->order == TSQL_SO_ASC) { if (pCtx->order == TSQL_SO_ASC) {
pDest = pCtx->aOutputBuf; memcpy(pCtx->aOutputBuf, pData, (size_t)pCtx->size * pCtx->inputBytes);
} else { } else {
pDest = pCtx->aOutputBuf - (pCtx->size - 1) * pCtx->inputBytes; for(int32_t i = 0; i < pCtx->size; ++i) {
memcpy(pCtx->aOutputBuf + (pCtx->size - 1 - i) * pCtx->inputBytes, pData + i * pCtx->inputBytes,
pCtx->inputBytes);
}
} }
char *pData = GET_INPUT_CHAR(pCtx); pCtx->aOutputBuf += pCtx->size * pCtx->outputBytes;
memcpy(pDest, pData, (size_t)pCtx->size * pCtx->inputBytes);
pCtx->aOutputBuf += pCtx->size * pCtx->outputBytes * GET_FORWARD_DIRECTION_FACTOR(pCtx->order);
} }
static void col_project_function_f(SQLFunctionCtx *pCtx, int32_t index) { static void col_project_function_f(SQLFunctionCtx *pCtx, int32_t index) {
...@@ -2903,7 +2911,7 @@ static void col_project_function_f(SQLFunctionCtx *pCtx, int32_t index) { ...@@ -2903,7 +2911,7 @@ static void col_project_function_f(SQLFunctionCtx *pCtx, int32_t index) {
char *pData = GET_INPUT_CHAR_INDEX(pCtx, index); char *pData = GET_INPUT_CHAR_INDEX(pCtx, index);
memcpy(pCtx->aOutputBuf, pData, pCtx->inputBytes); memcpy(pCtx->aOutputBuf, pData, pCtx->inputBytes);
pCtx->aOutputBuf += pCtx->inputBytes * GET_FORWARD_DIRECTION_FACTOR(pCtx->order); pCtx->aOutputBuf += pCtx->inputBytes/* * GET_FORWARD_DIRECTION_FACTOR(pCtx->order)*/;
} }
/** /**
...@@ -2915,18 +2923,17 @@ static void tag_project_function(SQLFunctionCtx *pCtx) { ...@@ -2915,18 +2923,17 @@ static void tag_project_function(SQLFunctionCtx *pCtx) {
INC_INIT_VAL(pCtx, pCtx->size); INC_INIT_VAL(pCtx, pCtx->size);
assert(pCtx->inputBytes == pCtx->outputBytes); assert(pCtx->inputBytes == pCtx->outputBytes);
int32_t factor = GET_FORWARD_DIRECTION_FACTOR(pCtx->order);
for (int32_t i = 0; i < pCtx->size; ++i) { for (int32_t i = 0; i < pCtx->size; ++i) {
tVariantDump(&pCtx->tag, pCtx->aOutputBuf, pCtx->outputType); tVariantDump(&pCtx->tag, pCtx->aOutputBuf, pCtx->outputType);
pCtx->aOutputBuf += pCtx->outputBytes * factor; pCtx->aOutputBuf += pCtx->outputBytes;
} }
} }
static void tag_project_function_f(SQLFunctionCtx *pCtx, int32_t index) { static void tag_project_function_f(SQLFunctionCtx *pCtx, int32_t index) {
INC_INIT_VAL(pCtx, 1); INC_INIT_VAL(pCtx, 1);
tVariantDump(&pCtx->tag, pCtx->aOutputBuf, pCtx->tag.nType); tVariantDump(&pCtx->tag, pCtx->aOutputBuf, pCtx->tag.nType);
pCtx->aOutputBuf += pCtx->outputBytes * GET_FORWARD_DIRECTION_FACTOR(pCtx->order); pCtx->aOutputBuf += pCtx->outputBytes;
} }
/** /**
...@@ -2975,8 +2982,8 @@ static void diff_function(SQLFunctionCtx *pCtx) { ...@@ -2975,8 +2982,8 @@ static void diff_function(SQLFunctionCtx *pCtx) {
int32_t notNullElems = 0; int32_t notNullElems = 0;
int32_t step = GET_FORWARD_DIRECTION_FACTOR(pCtx->order); int32_t step = GET_FORWARD_DIRECTION_FACTOR(pCtx->order);
int32_t i = (pCtx->order == TSQL_SO_ASC) ? 0 : pCtx->size - 1; int32_t i = (pCtx->order == TSQL_SO_ASC) ? 0 : pCtx->size - 1;
TSKEY * pTimestamp = pCtx->ptsOutputBuf; TSKEY * pTimestamp = pCtx->ptsOutputBuf;
switch (pCtx->inputType) { switch (pCtx->inputType) {
...@@ -2996,14 +3003,14 @@ static void diff_function(SQLFunctionCtx *pCtx) { ...@@ -2996,14 +3003,14 @@ static void diff_function(SQLFunctionCtx *pCtx) {
*pOutput = pData[i] - pCtx->param[1].i64Key; *pOutput = pData[i] - pCtx->param[1].i64Key;
*pTimestamp = pCtx->ptsList[i]; *pTimestamp = pCtx->ptsList[i];
pOutput += step; pOutput += 1;
pTimestamp += step; pTimestamp += 1;
} else { } else {
*pOutput = pData[i] - pData[i - step]; *pOutput = pData[i] - pData[i - step];
*pTimestamp = pCtx->ptsList[i]; *pTimestamp = pCtx->ptsList[i];
pOutput += step; pOutput += 1;
pTimestamp += step; pTimestamp += 1;
} }
pCtx->param[1].i64Key = pData[i]; pCtx->param[1].i64Key = pData[i];
...@@ -3028,14 +3035,14 @@ static void diff_function(SQLFunctionCtx *pCtx) { ...@@ -3028,14 +3035,14 @@ static void diff_function(SQLFunctionCtx *pCtx) {
*pOutput = pData[i] - pCtx->param[1].i64Key; *pOutput = pData[i] - pCtx->param[1].i64Key;
*pTimestamp = pCtx->ptsList[i]; *pTimestamp = pCtx->ptsList[i];
pOutput += step; pOutput += 1;
pTimestamp += step; pTimestamp += 1;
} else { } else {
*pOutput = pData[i] - pData[i - step]; *pOutput = pData[i] - pData[i - step];
*pTimestamp = pCtx->ptsList[i]; *pTimestamp = pCtx->ptsList[i];
pOutput += step; pOutput += 1;
pTimestamp += step; pTimestamp += 1;
} }
pCtx->param[1].i64Key = pData[i]; pCtx->param[1].i64Key = pData[i];
...@@ -3059,13 +3066,13 @@ static void diff_function(SQLFunctionCtx *pCtx) { ...@@ -3059,13 +3066,13 @@ static void diff_function(SQLFunctionCtx *pCtx) {
} else if ((i == 0 && pCtx->order == TSQL_SO_ASC) || (i == pCtx->size - 1 && pCtx->order == TSQL_SO_DESC)) { } else if ((i == 0 && pCtx->order == TSQL_SO_ASC) || (i == pCtx->size - 1 && pCtx->order == TSQL_SO_DESC)) {
*pOutput = pData[i] - pCtx->param[1].dKey; *pOutput = pData[i] - pCtx->param[1].dKey;
*pTimestamp = pCtx->ptsList[i]; *pTimestamp = pCtx->ptsList[i];
pOutput += step; pOutput += 1;
pTimestamp += step; pTimestamp += 1;
} else { } else {
*pOutput = pData[i] - pData[i - step]; *pOutput = pData[i] - pData[i - step];
*pTimestamp = pCtx->ptsList[i]; *pTimestamp = pCtx->ptsList[i];
pOutput += step; pOutput += 1;
pTimestamp += step; pTimestamp += 1;
} }
pCtx->param[1].dKey = pData[i]; pCtx->param[1].dKey = pData[i];
...@@ -3089,13 +3096,15 @@ static void diff_function(SQLFunctionCtx *pCtx) { ...@@ -3089,13 +3096,15 @@ static void diff_function(SQLFunctionCtx *pCtx) {
} else if ((i == 0 && pCtx->order == TSQL_SO_ASC) || (i == pCtx->size - 1 && pCtx->order == TSQL_SO_DESC)) { } else if ((i == 0 && pCtx->order == TSQL_SO_ASC) || (i == pCtx->size - 1 && pCtx->order == TSQL_SO_DESC)) {
*pOutput = pData[i] - pCtx->param[1].dKey; *pOutput = pData[i] - pCtx->param[1].dKey;
*pTimestamp = pCtx->ptsList[i]; *pTimestamp = pCtx->ptsList[i];
pOutput += step;
pTimestamp += step; pOutput += 1;
pTimestamp += 1;
} else { } else {
*pOutput = pData[i] - pData[i - step]; *pOutput = pData[i] - pData[i - step];
*pTimestamp = pCtx->ptsList[i]; *pTimestamp = pCtx->ptsList[i];
pOutput += step;
pTimestamp += step; pOutput += 1;
pTimestamp += 1;
} }
// keep the last value, the remain may be all null // keep the last value, the remain may be all null
...@@ -3120,13 +3129,14 @@ static void diff_function(SQLFunctionCtx *pCtx) { ...@@ -3120,13 +3129,14 @@ static void diff_function(SQLFunctionCtx *pCtx) {
} else if ((i == 0 && pCtx->order == TSQL_SO_ASC) || (i == pCtx->size - 1 && pCtx->order == TSQL_SO_DESC)) { } else if ((i == 0 && pCtx->order == TSQL_SO_ASC) || (i == pCtx->size - 1 && pCtx->order == TSQL_SO_DESC)) {
*pOutput = pData[i] - pCtx->param[1].i64Key; *pOutput = pData[i] - pCtx->param[1].i64Key;
*pTimestamp = pCtx->ptsList[i]; *pTimestamp = pCtx->ptsList[i];
pOutput += step; pOutput += 1;
pTimestamp += step; pTimestamp += 1;
} else { } else {
*pOutput = pData[i] - pData[i - step]; *pOutput = pData[i] - pData[i - step];
*pTimestamp = pCtx->ptsList[i]; *pTimestamp = pCtx->ptsList[i];
pOutput += step;
pTimestamp += step; pOutput += 1;
pTimestamp += 1;
} }
pCtx->param[1].i64Key = pData[i]; pCtx->param[1].i64Key = pData[i];
...@@ -3150,13 +3160,15 @@ static void diff_function(SQLFunctionCtx *pCtx) { ...@@ -3150,13 +3160,15 @@ static void diff_function(SQLFunctionCtx *pCtx) {
} else if ((i == 0 && pCtx->order == TSQL_SO_ASC) || (i == pCtx->size - 1 && pCtx->order == TSQL_SO_DESC)) { } else if ((i == 0 && pCtx->order == TSQL_SO_ASC) || (i == pCtx->size - 1 && pCtx->order == TSQL_SO_DESC)) {
*pOutput = pData[i] - pCtx->param[1].i64Key; *pOutput = pData[i] - pCtx->param[1].i64Key;
*pTimestamp = pCtx->ptsList[i]; *pTimestamp = pCtx->ptsList[i];
pOutput += step;
pTimestamp += step; pOutput += 1;
pTimestamp += 1;
} else { } else {
*pOutput = pData[i] - pData[i - step]; *pOutput = pData[i] - pData[i - step];
*pTimestamp = pCtx->ptsList[i]; *pTimestamp = pCtx->ptsList[i];
pOutput += step;
pTimestamp += step; pOutput += 1;
pTimestamp += 1;
} }
pCtx->param[1].i64Key = pData[i]; pCtx->param[1].i64Key = pData[i];
...@@ -3181,8 +3193,8 @@ static void diff_function(SQLFunctionCtx *pCtx) { ...@@ -3181,8 +3193,8 @@ static void diff_function(SQLFunctionCtx *pCtx) {
GET_RES_INFO(pCtx)->numOfRes += forwardStep; GET_RES_INFO(pCtx)->numOfRes += forwardStep;
pCtx->aOutputBuf = pCtx->aOutputBuf + forwardStep * pCtx->outputBytes * step; pCtx->aOutputBuf += forwardStep * pCtx->outputBytes;
pCtx->ptsOutputBuf = (char *)pCtx->ptsOutputBuf + forwardStep * TSDB_KEYSIZE * step; pCtx->ptsOutputBuf += forwardStep * TSDB_KEYSIZE;
} }
} }
...@@ -3209,7 +3221,7 @@ static void diff_function_f(SQLFunctionCtx *pCtx, int32_t index) { ...@@ -3209,7 +3221,7 @@ static void diff_function_f(SQLFunctionCtx *pCtx, int32_t index) {
GET_RES_INFO(pCtx)->numOfRes += 1; GET_RES_INFO(pCtx)->numOfRes += 1;
} }
int32_t step = GET_FORWARD_DIRECTION_FACTOR(pCtx->order); int32_t step = 1/*GET_FORWARD_DIRECTION_FACTOR(pCtx->order)*/;
switch (pCtx->inputType) { switch (pCtx->inputType) {
case TSDB_DATA_TYPE_INT: { case TSDB_DATA_TYPE_INT: {
...@@ -3277,7 +3289,8 @@ static void arithmetic_function(SQLFunctionCtx *pCtx) { ...@@ -3277,7 +3289,8 @@ static void arithmetic_function(SQLFunctionCtx *pCtx) {
tSQLBinaryExprCalcTraverse(sas->pExpr->pBinExprInfo.pBinExpr, pCtx->size, pCtx->aOutputBuf, sas, pCtx->order, tSQLBinaryExprCalcTraverse(sas->pExpr->pBinExprInfo.pBinExpr, pCtx->size, pCtx->aOutputBuf, sas, pCtx->order,
arithmetic_callback_function); arithmetic_callback_function);
pCtx->aOutputBuf += pCtx->outputBytes * pCtx->size * GET_FORWARD_DIRECTION_FACTOR(pCtx->order); pCtx->aOutputBuf += pCtx->outputBytes * pCtx->size/* * GET_FORWARD_DIRECTION_FACTOR(pCtx->order)*/;
pCtx->param[1].pz = NULL;
} }
static void arithmetic_function_f(SQLFunctionCtx *pCtx, int32_t index) { static void arithmetic_function_f(SQLFunctionCtx *pCtx, int32_t index) {
...@@ -3288,7 +3301,7 @@ static void arithmetic_function_f(SQLFunctionCtx *pCtx, int32_t index) { ...@@ -3288,7 +3301,7 @@ static void arithmetic_function_f(SQLFunctionCtx *pCtx, int32_t index) {
tSQLBinaryExprCalcTraverse(sas->pExpr->pBinExprInfo.pBinExpr, 1, pCtx->aOutputBuf, sas, pCtx->order, tSQLBinaryExprCalcTraverse(sas->pExpr->pBinExprInfo.pBinExpr, 1, pCtx->aOutputBuf, sas, pCtx->order,
arithmetic_callback_function); arithmetic_callback_function);
pCtx->aOutputBuf += pCtx->outputBytes * GET_FORWARD_DIRECTION_FACTOR(pCtx->order); pCtx->aOutputBuf += pCtx->outputBytes/* * GET_FORWARD_DIRECTION_FACTOR(pCtx->order)*/;
} }
#define LIST_MINMAX_N(ctx, minOutput, maxOutput, elemCnt, data, type, tsdbType, numOfNotNullElem) \ #define LIST_MINMAX_N(ctx, minOutput, maxOutput, elemCnt, data, type, tsdbType, numOfNotNullElem) \
...@@ -3504,7 +3517,6 @@ void spread_func_sec_merge(SQLFunctionCtx *pCtx) { ...@@ -3504,7 +3517,6 @@ void spread_func_sec_merge(SQLFunctionCtx *pCtx) {
pCtx->param[3].dKey = pData->max; pCtx->param[3].dKey = pData->max;
} }
// pCtx->numOfIteratedElems += 1;
GET_RES_INFO(pCtx)->hasResult = DATA_SET_FLAG; GET_RES_INFO(pCtx)->hasResult = DATA_SET_FLAG;
} }
...@@ -3537,8 +3549,7 @@ void spread_function_finalizer(SQLFunctionCtx *pCtx) { ...@@ -3537,8 +3549,7 @@ void spread_function_finalizer(SQLFunctionCtx *pCtx) {
*(double *)pCtx->aOutputBuf = pInfo->max - pInfo->min; *(double *)pCtx->aOutputBuf = pInfo->max - pInfo->min;
} }
// SET_VAL(pCtx, pCtx->numOfIteratedElems, 1); GET_RES_INFO(pCtx)->numOfRes = 1; // todo add test case
resetResultInfo(GET_RES_INFO(pCtx));
} }
/* /*
...@@ -4171,7 +4182,7 @@ void twa_function_finalizer(SQLFunctionCtx *pCtx) { ...@@ -4171,7 +4182,7 @@ void twa_function_finalizer(SQLFunctionCtx *pCtx) {
} }
GET_RES_INFO(pCtx)->numOfRes = 1; GET_RES_INFO(pCtx)->numOfRes = 1;
resetResultInfo(GET_RES_INFO(pCtx)); doFinalizer(pCtx);
} }
/** /**
...@@ -4333,7 +4344,7 @@ static void ts_comp_finalize(SQLFunctionCtx *pCtx) { ...@@ -4333,7 +4344,7 @@ static void ts_comp_finalize(SQLFunctionCtx *pCtx) {
strcpy(pCtx->aOutputBuf, pTSbuf->path); strcpy(pCtx->aOutputBuf, pTSbuf->path);
tsBufDestory(pTSbuf); tsBufDestory(pTSbuf);
resetResultInfo(GET_RES_INFO(pCtx)); doFinalizer(pCtx);
} }
/* /*
...@@ -4373,7 +4384,7 @@ SQLAggFuncElem aAggs[28] = {{ ...@@ -4373,7 +4384,7 @@ SQLAggFuncElem aAggs[28] = {{
count_function, count_function,
count_function_f, count_function_f,
no_next_step, no_next_step,
noop1, doFinalizer,
count_func_merge, count_func_merge,
count_func_merge, count_func_merge,
count_load_data_info, count_load_data_info,
...@@ -4616,7 +4627,7 @@ SQLAggFuncElem aAggs[28] = {{ ...@@ -4616,7 +4627,7 @@ SQLAggFuncElem aAggs[28] = {{
date_col_output_function, date_col_output_function,
date_col_output_function_f, date_col_output_function_f,
no_next_step, no_next_step,
noop1, doFinalizer,
copy_function, copy_function,
copy_function, copy_function,
no_data_info, no_data_info,
...@@ -4631,7 +4642,7 @@ SQLAggFuncElem aAggs[28] = {{ ...@@ -4631,7 +4642,7 @@ SQLAggFuncElem aAggs[28] = {{
noop1, noop1,
noop2, noop2,
no_next_step, no_next_step,
noop1, doFinalizer,
copy_function, copy_function,
copy_function, copy_function,
data_req_load_info, data_req_load_info,
...@@ -4646,7 +4657,7 @@ SQLAggFuncElem aAggs[28] = {{ ...@@ -4646,7 +4657,7 @@ SQLAggFuncElem aAggs[28] = {{
tag_function, tag_function,
noop2, noop2,
no_next_step, no_next_step,
noop1, doFinalizer,
copy_function, copy_function,
copy_function, copy_function,
no_data_info, no_data_info,
...@@ -4676,7 +4687,7 @@ SQLAggFuncElem aAggs[28] = {{ ...@@ -4676,7 +4687,7 @@ SQLAggFuncElem aAggs[28] = {{
tag_function, tag_function,
tag_function_f, tag_function_f,
no_next_step, no_next_step,
noop1, doFinalizer,
copy_function, copy_function,
copy_function, copy_function,
no_data_info, no_data_info,
...@@ -4691,7 +4702,7 @@ SQLAggFuncElem aAggs[28] = {{ ...@@ -4691,7 +4702,7 @@ SQLAggFuncElem aAggs[28] = {{
col_project_function, col_project_function,
col_project_function_f, col_project_function_f,
no_next_step, no_next_step,
noop1, doFinalizer,
copy_function, copy_function,
copy_function, copy_function,
data_req_load_info, data_req_load_info,
...@@ -4706,7 +4717,7 @@ SQLAggFuncElem aAggs[28] = {{ ...@@ -4706,7 +4717,7 @@ SQLAggFuncElem aAggs[28] = {{
tag_project_function, tag_project_function,
tag_project_function_f, tag_project_function_f,
no_next_step, no_next_step,
noop1, doFinalizer,
copy_function, copy_function,
copy_function, copy_function,
no_data_info, no_data_info,
...@@ -4721,7 +4732,7 @@ SQLAggFuncElem aAggs[28] = {{ ...@@ -4721,7 +4732,7 @@ SQLAggFuncElem aAggs[28] = {{
arithmetic_function, arithmetic_function,
arithmetic_function_f, arithmetic_function_f,
no_next_step, no_next_step,
noop1, doFinalizer,
copy_function, copy_function,
copy_function, copy_function,
data_req_load_info, data_req_load_info,
...@@ -4736,7 +4747,7 @@ SQLAggFuncElem aAggs[28] = {{ ...@@ -4736,7 +4747,7 @@ SQLAggFuncElem aAggs[28] = {{
diff_function, diff_function,
diff_function_f, diff_function_f,
no_next_step, no_next_step,
noop1, doFinalizer,
noop1, noop1,
noop1, noop1,
data_req_load_info, data_req_load_info,
...@@ -4782,7 +4793,7 @@ SQLAggFuncElem aAggs[28] = {{ ...@@ -4782,7 +4793,7 @@ SQLAggFuncElem aAggs[28] = {{
interp_function, interp_function,
do_sum_f, // todo filter handle do_sum_f, // todo filter handle
no_next_step, no_next_step,
noop1, doFinalizer,
noop1, noop1,
copy_function, copy_function,
no_data_info, no_data_info,
......
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
...@@ -125,7 +125,7 @@ extern "C" { ...@@ -125,7 +125,7 @@ extern "C" {
#define TSDB_CODE_BATCH_SIZE_TOO_BIG 104 #define TSDB_CODE_BATCH_SIZE_TOO_BIG 104
#define TSDB_CODE_TIMESTAMP_OUT_OF_RANGE 105 #define TSDB_CODE_TIMESTAMP_OUT_OF_RANGE 105
#define TSDB_CODE_INVALID_QUERY_MSG 106 // failed to validate the sql expression msg by vnode #define TSDB_CODE_INVALID_QUERY_MSG 106 // failed to validate the sql expression msg by vnode
#define TSDB_CODE_CACHE_BLOCK_TS_DISORDERED 107 // time stamp in cache block is disordered #define TSDB_CODE_SORTED_RES_TOO_MANY 107 // too many result for ordered super table projection query
#define TSDB_CODE_FILE_BLOCK_TS_DISORDERED 108 // time stamp in file block is disordered #define TSDB_CODE_FILE_BLOCK_TS_DISORDERED 108 // time stamp in file block is disordered
#define TSDB_CODE_INVALID_COMMIT_LOG 109 // commit log init failed #define TSDB_CODE_INVALID_COMMIT_LOG 109 // commit log init failed
#define TSDB_CODE_SERV_NO_DISKSPACE 110 #define TSDB_CODE_SERV_NO_DISKSPACE 110
......
此差异已折叠。
此差异已折叠。
...@@ -127,6 +127,7 @@ extern int tsEnableMonitorModule; ...@@ -127,6 +127,7 @@ extern int tsEnableMonitorModule;
extern int tsRestRowLimit; extern int tsRestRowLimit;
extern int tsCompressMsgSize; extern int tsCompressMsgSize;
extern int tsMaxSQLStringLen; extern int tsMaxSQLStringLen;
extern int tsMaxNumOfOrderedResults;
extern char tsSocketType[4]; extern char tsSocketType[4];
...@@ -136,6 +137,7 @@ extern int tsMinIntervalTime; ...@@ -136,6 +137,7 @@ extern int tsMinIntervalTime;
extern int tsMaxStreamComputDelay; extern int tsMaxStreamComputDelay;
extern int tsStreamCompStartDelay; extern int tsStreamCompStartDelay;
extern int tsStreamCompRetryDelay; extern int tsStreamCompRetryDelay;
extern float tsStreamComputDelayRatio; // the delayed computing ration of the whole time window
extern int tsProjectExecInterval; extern int tsProjectExecInterval;
extern int64_t tsMaxRetentWindow; extern int64_t tsMaxRetentWindow;
......
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册