提交 5c921ca8 编写于 作者: S slguan

Merge branch 'develop' into 2.0

......@@ -211,6 +211,12 @@
# whether to enable HTTP compression transmission
# httpEnableCompress 0
# the delayed time for launching each continuous query. 10% of the whole computing time window by default.
# streamCompDelayRatio 0.1
# the max allowed delayed time for launching continuous query. 20ms by default
# tsMaxStreamComputDelay 20000
# whether the telegraf table name contains the number of tags and the number of fields
# telegrafUseFieldNum 0
......
......@@ -27,7 +27,7 @@ void tscFetchDatablockFromSubquery(SSqlObj* pSql);
void tscGetQualifiedTSList(SSqlObj* pSql, SJoinSubquerySupporter* p1, SJoinSubquerySupporter* p2, int32_t* num);
void tscSetupOutputColumnIndex(SSqlObj* pSql);
int32_t tscLaunchSecondSubquery(SSqlObj* pSql);
int32_t tscLaunchSecondPhaseSubqueries(SSqlObj* pSql);
void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code);
SJoinSubquerySupporter* tscCreateJoinSupporter(SSqlObj* pSql, SSubqueryState* pState, int32_t index);
......@@ -121,7 +121,7 @@ STSBuf* tsBufCreate(bool autoDelete);
STSBuf* tsBufCreateFromFile(const char* path, bool autoDelete);
STSBuf* tsBufCreateFromCompBlocks(const char* pData, int32_t numOfBlocks, int32_t len, int32_t tsOrder);
void tsBufDestory(STSBuf* pTSBuf);
void* tsBufDestory(STSBuf* pTSBuf);
void tsBufAppend(STSBuf* pTSBuf, int32_t vnodeId, int64_t tag, const char* pData, int32_t len);
int32_t tsBufMerge(STSBuf* pDestBuf, const STSBuf* pSrcBuf, int32_t vnodeIdx);
......
......@@ -21,9 +21,70 @@ extern "C" {
#endif
#include "taos.h"
#include "taosmsg.h"
#include "tsqldef.h"
#include "ttypes.h"
#include "taosmsg.h"
enum _sql_cmd {
TSDB_SQL_SELECT = 1,
TSDB_SQL_FETCH,
TSDB_SQL_INSERT,
TSDB_SQL_MGMT, // the SQL below is for mgmt node
TSDB_SQL_CREATE_DB,
TSDB_SQL_CREATE_TABLE,
TSDB_SQL_DROP_DB,
TSDB_SQL_DROP_TABLE,
TSDB_SQL_CREATE_ACCT,
TSDB_SQL_CREATE_USER, //10
TSDB_SQL_DROP_ACCT,
TSDB_SQL_DROP_USER,
TSDB_SQL_ALTER_USER,
TSDB_SQL_ALTER_ACCT,
TSDB_SQL_ALTER_TABLE,
TSDB_SQL_ALTER_DB,
TSDB_SQL_CREATE_MNODE,
TSDB_SQL_DROP_MNODE,
TSDB_SQL_CREATE_DNODE,
TSDB_SQL_DROP_DNODE, // 20
TSDB_SQL_CFG_DNODE,
TSDB_SQL_CFG_MNODE,
TSDB_SQL_SHOW,
TSDB_SQL_RETRIEVE,
TSDB_SQL_KILL_QUERY,
TSDB_SQL_KILL_STREAM,
TSDB_SQL_KILL_CONNECTION,
TSDB_SQL_READ, // SQL below is for read operation
TSDB_SQL_CONNECT,
TSDB_SQL_USE_DB, // 30
TSDB_SQL_META,
TSDB_SQL_METRIC,
TSDB_SQL_MULTI_META,
TSDB_SQL_HB,
TSDB_SQL_LOCAL, // SQL below for client local
TSDB_SQL_DESCRIBE_TABLE,
TSDB_SQL_RETRIEVE_METRIC,
TSDB_SQL_METRIC_JOIN_RETRIEVE,
TSDB_SQL_RETRIEVE_TAGS,
/*
* build empty result instead of accessing dnode to fetch result
* reset the client cache
*/
TSDB_SQL_RETRIEVE_EMPTY_RESULT, //40
TSDB_SQL_RESET_CACHE,
TSDB_SQL_SERV_STATUS,
TSDB_SQL_CURRENT_DB,
TSDB_SQL_SERV_VERSION,
TSDB_SQL_CLI_VERSION,
TSDB_SQL_CURRENT_USER,
TSDB_SQL_CFG_LOCAL,
TSDB_SQL_MAX //48
};
#define MAX_TOKEN_LEN 30
......@@ -72,72 +133,12 @@ typedef struct tFieldList {
TAOS_FIELD *p;
} tFieldList;
// sql operation type
// create table operation type
enum TSQL_TYPE {
TSQL_CREATE_NORMAL_METER = 0x01,
TSQL_CREATE_NORMAL_METRIC = 0x02,
TSQL_CREATE_METER_FROM_METRIC = 0x04,
TSQL_CREATE_STREAM = 0x08,
TSQL_QUERY_METER = 0x10,
TSQL_INSERT = 0x20,
DROP_DNODE = 0x40,
DROP_DATABASE = 0x41,
DROP_TABLE = 0x42,
DROP_USER = 0x43,
DROP_ACCOUNT = 0x44,
USE_DATABASE = 0x50,
// show operation
SHOW_DATABASES = 0x60,
SHOW_TABLES = 0x61,
SHOW_STABLES = 0x62,
SHOW_MNODES = 0x63,
SHOW_DNODES = 0x64,
SHOW_ACCOUNTS = 0x65,
SHOW_USERS = 0x66,
SHOW_VGROUPS = 0x67,
SHOW_QUERIES = 0x68,
SHOW_STREAMS = 0x69,
SHOW_CONFIGS = 0x6a,
SHOW_SCORES = 0x6b,
SHOW_MODULES = 0x6c,
SHOW_CONNECTIONS = 0x6d,
SHOW_GRANTS = 0x6e,
SHOW_VNODES = 0x6f,
// create dnode
CREATE_DNODE = 0x80,
CREATE_DATABASE = 0x81,
CREATE_USER = 0x82,
CREATE_ACCOUNT = 0x83,
DESCRIBE_TABLE = 0x90,
ALTER_USER_PASSWD = 0xA0,
ALTER_USER_PRIVILEGES = 0xA1,
ALTER_DNODE = 0xA2,
ALTER_LOCAL = 0xA3,
ALTER_DATABASE = 0xA4,
ALTER_ACCT = 0xA5,
// reset operation
RESET_QUERY_CACHE = 0xB0,
// alter tags
ALTER_TABLE_TAGS_ADD = 0xC0,
ALTER_TABLE_TAGS_DROP = 0xC1,
ALTER_TABLE_TAGS_CHG = 0xC2,
ALTER_TABLE_TAGS_SET = 0xC4,
// alter table column
ALTER_TABLE_ADD_COLUMN = 0xD0,
ALTER_TABLE_DROP_COLUMN = 0xD1,
KILL_QUERY = 0xD2,
KILL_STREAM = 0xD3,
KILL_CONNECTION = 0xD4,
TSQL_CREATE_TABLE = 0x1,
TSQL_CREATE_STABLE = 0x2,
TSQL_CREATE_TABLE_FROM_STABLE = 0x3,
TSQL_CREATE_STREAM = 0x4,
};
typedef struct SQuerySQL {
......@@ -158,32 +159,30 @@ typedef struct SCreateTableSQL {
struct SSQLToken name; // meter name, create table [meterName] xxx
bool existCheck;
int8_t type; // create normal table/from super table/ stream
struct {
tFieldList *pTagColumns; // for normal table, pTagColumns = NULL;
tFieldList *pColumns;
} colInfo;
struct {
SSQLToken metricName; // metric name, for using clause
SSQLToken stableName; // super table name, for using clause
tVariantList *pTagVals; // create by using metric, tag value
STagData tagdata;
} usingInfo;
SQuerySQL *pSelect;
} SCreateTableSQL;
typedef struct SAlterTableSQL {
SSQLToken name;
int16_t type;
STagData tagData;
tFieldList * pAddColumns;
SSQLToken dropTagToken;
tVariantList *varList; // set t=val or: change src dst
} SAlterTableSQL;
typedef struct SInsertSQL {
SSQLToken name;
struct tSQLExprListList *pValue;
} SInsertSQL;
typedef struct SCreateDBInfo {
SSQLToken dbname;
int32_t replica;
......@@ -204,40 +203,67 @@ typedef struct SCreateDBInfo {
} SCreateDBInfo;
typedef struct SCreateAcctSQL {
int32_t users;
int32_t dbs;
int32_t tseries;
int32_t streams;
int32_t pps;
int64_t storage;
int64_t qtime;
int32_t conns;
int32_t maxUsers;
int32_t maxDbs;
int32_t maxTimeSeries;
int32_t maxStreams;
int32_t maxPointsPerSecond;
int64_t maxStorage;
int64_t maxQueryTime;
int32_t maxConnections;
SSQLToken stat;
} SCreateAcctSQL;
typedef struct SShowInfo {
uint8_t showType;
SSQLToken prefix;
SSQLToken pattern;
} SShowInfo;
typedef struct SUserInfo {
SSQLToken user;
SSQLToken passwd;
// bool hasPasswd;
SSQLToken privilege;
// bool hasPrivilege;
int16_t type;
} SUserInfo;
typedef struct tDCLSQL {
int32_t nTokens; /* Number of expressions on the list */
int32_t nAlloc; /* Number of entries allocated below */
SSQLToken *a; /* one entry for element */
bool existsCheck;
union {
SCreateDBInfo dbOpt;
SCreateAcctSQL acctOpt;
SShowInfo showOpt;
SSQLToken ip;
};
SUserInfo user;
} tDCLSQL;
typedef struct SSubclauseInfo { // "UNION" multiple select sub-clause
SQuerySQL **pClause;
int32_t numOfClause;
} SSubclauseInfo;
typedef struct SSqlInfo {
int32_t sqlType;
bool validSql;
int32_t type;
bool valid;
union {
SCreateTableSQL *pCreateTableInfo;
SInsertSQL * pInsertInfo;
SAlterTableSQL * pAlterInfo;
SQuerySQL * pQueryInfo;
tDCLSQL * pDCLInfo;
};
SSubclauseInfo subclauseInfo;
char pzErrMsg[256];
} SSqlInfo;
......@@ -338,7 +364,7 @@ SQuerySQL *tSetQuerySQLElems(SSQLToken *pSelectToken, tSQLExprList *pSelection,
SCreateTableSQL *tSetCreateSQLElems(tFieldList *pCols, tFieldList *pTags, SSQLToken *pMetricName,
tVariantList *pTagVals, SQuerySQL *pSelect, int32_t type);
void tSQLExprDestroy(tSQLExpr *);
void tSQLExprNodeDestroy(tSQLExpr *pExpr);
tSQLExpr *tSQLExprNodeClone(tSQLExpr *pExpr);
......@@ -346,23 +372,31 @@ SAlterTableSQL *tAlterTableSQLElems(SSQLToken *pMeterName, tFieldList *pCols, tV
tSQLExprListList *tSQLListListAppend(tSQLExprListList *pList, tSQLExprList *pExprList);
void tSetInsertSQLElems(SSqlInfo *pInfo, SSQLToken *pName, tSQLExprListList *pList);
void destroyAllSelectClause(SSubclauseInfo *pSql);
void doDestroyQuerySql(SQuerySQL *pSql);
void destroyQuerySql(SQuerySQL *pSql);
SSqlInfo * setSQLInfo(SSqlInfo *pInfo, void *pSqlExprInfo, SSQLToken *pMeterName, int32_t type);
SSubclauseInfo *setSubclause(SSubclauseInfo *pClause, void *pSqlExprInfo);
void setSQLInfo(SSqlInfo *pInfo, void *pSqlExprInfo, SSQLToken *pMeterName, int32_t type);
SSubclauseInfo *appendSelectClause(SSubclauseInfo *pInfo, void *pSubclause);
void setCreatedMeterName(SSqlInfo *pInfo, SSQLToken *pMeterName, SSQLToken *pIfNotExists);
void SQLInfoDestroy(SSqlInfo *pInfo);
void setDCLSQLElems(SSqlInfo *pInfo, int32_t type, int32_t nParams, ...);
void setDropDBTableInfo(SSqlInfo *pInfo, int32_t type, SSQLToken* pToken, SSQLToken* existsCheck);
void setShowOptions(SSqlInfo *pInfo, int32_t type, SSQLToken* prefix, SSQLToken* pPatterns);
tDCLSQL *tTokenListAppend(tDCLSQL *pTokenList, SSQLToken *pToken);
void setCreateDBSQL(SSqlInfo *pInfo, int32_t type, SSQLToken *pToken, SCreateDBInfo *pDB, SSQLToken *pIgExists);
void setCreateAcctSQL(SSqlInfo *pInfo, int32_t type, SSQLToken *pName, SSQLToken *pPwd, SCreateAcctSQL *pAcctInfo);
void setCreateUserSQL(SSqlInfo *pInfo, SSQLToken *pName, SSQLToken *pPasswd);
void setKillSQL(SSqlInfo *pInfo, int32_t type, SSQLToken *ip);
void setAlterUserSQL(SSqlInfo *pInfo, int16_t type, SSQLToken *pName, SSQLToken* pPwd, SSQLToken *pPrivilege);
void setDefaultCreateDbOption(SCreateDBInfo *pDBInfo);
// prefix show db.tables;
......
......@@ -120,7 +120,7 @@ void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrd
void tscDestroyLocalReducer(SSqlObj *pSql);
int32_t tscLocalDoReduce(SSqlObj *pSql);
int32_t tscDoLocalreduce(SSqlObj *pSql);
#ifdef __cplusplus
}
......
......@@ -29,9 +29,9 @@ extern "C" {
#include "tsclient.h"
#include "tsdb.h"
#define UTIL_METER_IS_METRIC(metaInfo) \
#define UTIL_METER_IS_SUPERTABLE(metaInfo) \
(((metaInfo)->pMeterMeta != NULL) && ((metaInfo)->pMeterMeta->meterType == TSDB_METER_METRIC))
#define UTIL_METER_IS_NOMRAL_METER(metaInfo) (!(UTIL_METER_IS_METRIC(metaInfo)))
#define UTIL_METER_IS_NOMRAL_METER(metaInfo) (!(UTIL_METER_IS_SUPERTABLE(metaInfo)))
#define UTIL_METER_IS_CREATE_FROM_METRIC(metaInfo) \
(((metaInfo)->pMeterMeta != NULL) && ((metaInfo)->pMeterMeta->meterType == TSDB_METER_MTABLE))
......@@ -67,7 +67,7 @@ typedef struct SJoinSubquerySupporter {
} SJoinSubquerySupporter;
int32_t tscCreateDataBlock(size_t initialSize, int32_t rowSize, int32_t startOffset, const char* name,
STableDataBlocks** dataBlocks);
SMeterMeta* pMeterMeta, STableDataBlocks** dataBlocks);
void tscAppendDataBlock(SDataBlockList* pList, STableDataBlocks* pBlocks);
void tscDestroyDataBlock(STableDataBlocks* pDataBlock);
......@@ -81,7 +81,7 @@ int32_t tscCopyDataBlockToPayload(SSqlObj* pSql, STableDataBlocks* pDataBlock);
void tscFreeUnusedDataBlocks(SDataBlockList* pList);
int32_t tscMergeTableDataBlocks(SSqlObj* pSql, SDataBlockList* pDataList);
int32_t tscGetDataBlockFromList(void* pHashList, SDataBlockList* pDataBlockList, int64_t id, int32_t size,
int32_t startOffset, int32_t rowSize, const char* tableId,
int32_t startOffset, int32_t rowSize, const char* tableId, SMeterMeta* pMeterMeta,
STableDataBlocks** dataBlocks);
SVnodeSidList* tscGetVnodeSidList(SMetricMeta* pMetricmeta, int32_t vnodeIdx);
......@@ -95,23 +95,27 @@ SMeterSidExtInfo* tscGetMeterSidInfo(SVnodeSidList* pSidList, int32_t idx);
* @param pSql sql object
* @return
*/
bool tscIsPointInterpQuery(SSqlCmd* pCmd);
bool tscIsTWAQuery(SSqlCmd* pCmd);
bool tscProjectionQueryOnMetric(SSqlCmd* pCmd);
bool tscProjectionQueryOnTable(SSqlCmd* pCmd);
bool tscIsPointInterpQuery(SQueryInfo* pQueryInfo);
bool tscIsTWAQuery(SQueryInfo* pQueryInfo);
bool tscNonOrderedProjectionQueryOnSTable(SQueryInfo *pQueryInfo, int32_t tableIndex);
bool tscOrderedProjectionQueryOnSTable(SQueryInfo* pQueryInfo, int32_t tableIndex);
bool tscIsProjectionQueryOnSTable(SQueryInfo* pQueryInfo, int32_t tableIndex);
bool tscIsTwoStageMergeMetricQuery(SSqlCmd* pCmd);
bool tscProjectionQueryOnTable(SQueryInfo* pQueryInfo);
bool tscIsTwoStageMergeMetricQuery(SQueryInfo* pQueryInfo, int32_t tableIndex);
bool tscQueryOnMetric(SSqlCmd* pCmd);
bool tscQueryMetricTags(SSqlCmd* pCmd);
bool tscQueryMetricTags(SQueryInfo* pQueryInfo);
bool tscIsSelectivityWithTagQuery(SSqlCmd* pCmd);
void tscAddSpecialColumnForSelect(SSqlCmd* pCmd, int32_t outputColIndex, int16_t functionId, SColumnIndex* pIndex,
void tscAddSpecialColumnForSelect(SQueryInfo* pQueryInfo, int32_t outputColIndex, int16_t functionId, SColumnIndex* pIndex,
SSchema* pColSchema, int16_t isTag);
void addRequiredTagColumn(SSqlCmd* pCmd, int32_t tagColIndex, int32_t tableIndex);
void addRequiredTagColumn(SQueryInfo* pQueryInfo, int32_t tagColIndex, int32_t tableIndex);
int32_t setMeterID(SSqlObj* pSql, SSQLToken* pzTableName, int32_t tableIndex);
void tscClearInterpInfo(SSqlCmd* pCmd);
int32_t setMeterID(SMeterMetaInfo* pMeterMetaInfo, SSQLToken* pzTableName, SSqlObj* pSql);
void tscClearInterpInfo(SQueryInfo* pQueryInfo);
bool tscIsInsertOrImportData(char* sqlstr);
......@@ -125,29 +129,33 @@ void tscFieldInfoSetValFromField(SFieldInfo* pFieldInfo, int32_t index, TAOS_FIE
void tscFieldInfoSetValue(SFieldInfo* pFieldInfo, int32_t index, int8_t type, const char* name, int16_t bytes);
void tscFieldInfoUpdateVisible(SFieldInfo* pFieldInfo, int32_t index, bool visible);
void tscFieldInfoCalOffset(SSqlCmd* pCmd);
void tscFieldInfoUpdateOffset(SSqlCmd* pCmd);
void tscFieldInfoCalOffset(SQueryInfo* pQueryInfo);
void tscFieldInfoUpdateOffsetForInterResult(SQueryInfo* pQueryInfo);
void tscFieldInfoCopy(SFieldInfo* src, SFieldInfo* dst, const int32_t* indexList, int32_t size);
void tscFieldInfoCopyAll(SFieldInfo* src, SFieldInfo* dst);
void tscFieldInfoCopyAll(SFieldInfo* dst, SFieldInfo* src);
TAOS_FIELD* tscFieldInfoGetField(SSqlCmd* pCmd, int32_t index);
int16_t tscFieldInfoGetOffset(SSqlCmd* pCmd, int32_t index);
int32_t tscGetResRowLength(SSqlCmd* pCmd);
TAOS_FIELD* tscFieldInfoGetField(SQueryInfo* pQueryInfo, int32_t index);
int16_t tscFieldInfoGetOffset(SQueryInfo* pQueryInfo, int32_t index);
int32_t tscGetResRowLength(SQueryInfo* pQueryInfo);
void tscClearFieldInfo(SFieldInfo* pFieldInfo);
int32_t tscNumOfFields(SQueryInfo* pQueryInfo);
int32_t tscFieldInfoCompare(SFieldInfo* pFieldInfo1, SFieldInfo* pFieldInfo2);
void addExprParams(SSqlExpr* pExpr, char* argument, int32_t type, int32_t bytes, int16_t tableIndex);
SSqlExpr* tscSqlExprInsert(SSqlCmd* pCmd, int32_t index, int16_t functionId, SColumnIndex* pColIndex, int16_t type,
SSqlExpr* tscSqlExprInsert(SQueryInfo* pQueryInfo, int32_t index, int16_t functionId, SColumnIndex* pColIndex, int16_t type,
int16_t size, int16_t interSize);
SSqlExpr* tscSqlExprInsertEmpty(SSqlCmd* pCmd, int32_t index, int16_t functionId);
SSqlExpr* tscSqlExprInsertEmpty(SQueryInfo* pQueryInfo, int32_t index, int16_t functionId);
SSqlExpr* tscSqlExprUpdate(SSqlCmd* pCmd, int32_t index, int16_t functionId, int16_t srcColumnIndex, int16_t type,
SSqlExpr* tscSqlExprUpdate(SQueryInfo* pQueryInfo, int32_t index, int16_t functionId, int16_t srcColumnIndex, int16_t type,
int16_t size);
SSqlExpr* tscSqlExprGet(SSqlCmd* pCmd, int32_t index);
SSqlExpr* tscSqlExprGet(SQueryInfo* pQueryInfo, int32_t index);
void tscSqlExprCopy(SSqlExprInfo* dst, const SSqlExprInfo* src, uint64_t uid);
void* tscSqlExprDestroy(SSqlExpr* pExpr);
void tscSqlExprInfoDestroy(SSqlExprInfo* pExprInfo);
SColumnBase* tscColumnBaseInfoInsert(SSqlCmd* pCmd, SColumnIndex* colIndex);
SColumnBase* tscColumnBaseInfoInsert(SQueryInfo* pQueryInfo, SColumnIndex* colIndex);
void tscColumnFilterInfoCopy(SColumnFilterInfo* dst, const SColumnFilterInfo* src);
void tscColumnBaseCopy(SColumnBase* dst, const SColumnBase* src);
......@@ -162,7 +170,7 @@ int32_t tscValidateName(SSQLToken* pToken);
void tscIncStreamExecutionCount(void* pStream);
bool tscValidateColumnId(SSqlCmd* pCmd, int32_t colId);
bool tscValidateColumnId(SMeterMetaInfo* pMeterMetaInfo, int32_t colId);
// get starter position of metric query condition (query on tags) in SSqlCmd.payload
SCond* tsGetMetricQueryCondPos(STagCond* pCond, uint64_t tableIndex);
......@@ -171,30 +179,38 @@ void tsSetMetricQueryCond(STagCond* pTagCond, uint64_t uid, const char* str);
void tscTagCondCopy(STagCond* dest, const STagCond* src);
void tscTagCondRelease(STagCond* pCond);
void tscGetSrcColumnInfo(SSrcColumnInfo* pColInfo, SSqlCmd* pCmd);
void tscGetSrcColumnInfo(SSrcColumnInfo* pColInfo, SQueryInfo* pQueryInfo);
void tscSetFreeHeatBeat(STscObj* pObj);
bool tscShouldFreeHeatBeat(SSqlObj* pHb);
void tscCleanSqlCmd(SSqlCmd* pCmd);
bool tscShouldFreeAsyncSqlObj(SSqlObj* pSql);
void tscRemoveAllMeterMetaInfo(SSqlCmd* pCmd, bool removeFromCache);
SMeterMetaInfo* tscGetMeterMetaInfo(SSqlCmd* pCmd, int32_t index);
SMeterMetaInfo* tscGetMeterMetaInfoByUid(SSqlCmd* pCmd, uint64_t uid, int32_t* index);
void tscRemoveAllMeterMetaInfo(SQueryInfo* pQueryInfo, const char* address, bool removeFromCache);
SMeterMetaInfo* tscGetMeterMetaInfo(SSqlCmd *pCmd, int32_t subClauseIndex, int32_t tableIndex);
SMeterMetaInfo* tscGetMeterMetaInfoFromQueryInfo(SQueryInfo *pQueryInfo, int32_t tableIndex);
SQueryInfo *tscGetQueryInfoDetail(SSqlCmd* pCmd, int32_t subClauseIndex);
int32_t tscGetQueryInfoDetailSafely(SSqlCmd *pCmd, int32_t subClauseIndex, SQueryInfo** pQueryInfo);
SMeterMetaInfo* tscGetMeterMetaInfoByUid(SQueryInfo* pQueryInfo, uint64_t uid, int32_t* index);
void tscClearMeterMetaInfo(SMeterMetaInfo* pMeterMetaInfo, bool removeFromCache);
SMeterMetaInfo* tscAddMeterMetaInfo(SSqlCmd* pCmd, const char* name, SMeterMeta* pMeterMeta, SMetricMeta* pMetricMeta,
SMeterMetaInfo* tscAddMeterMetaInfo(SQueryInfo* pQueryInfo, const char* name, SMeterMeta* pMeterMeta, SMetricMeta* pMetricMeta,
int16_t numOfTags, int16_t* tags);
SMeterMetaInfo* tscAddEmptyMeterMetaInfo(SSqlCmd* pCmd);
SMeterMetaInfo* tscAddEmptyMeterMetaInfo(SQueryInfo *pQueryInfo);
int32_t tscAddSubqueryInfo(SSqlCmd *pCmd);
void tscFreeSubqueryInfo(SSqlCmd* pCmd);
void tscClearSubqueryInfo(SSqlCmd* pCmd);
void tscGetMetricMetaCacheKey(SSqlCmd* pCmd, char* keyStr, uint64_t uid);
int tscGetMetricMeta(SSqlObj* pSql);
int tscGetMeterMeta(SSqlObj* pSql, char* meterId, int32_t tableIndex);
int tscGetMeterMetaEx(SSqlObj* pSql, char* meterId, bool createIfNotExists);
void tscGetMetricMetaCacheKey(SQueryInfo* pQueryInfo, char* keyStr, uint64_t uid);
int tscGetMetricMeta(SSqlObj* pSql, int32_t clauseIndex);
int tscGetMeterMeta(SSqlObj* pSql, SMeterMetaInfo* pMeterMetaInfo);
int tscGetMeterMetaEx(SSqlObj* pSql, SMeterMetaInfo* pMeterMetaInfo, bool createIfNotExists);
void tscResetForNextRetrieve(SSqlRes* pRes);
void tscAddTimestampColumn(SSqlCmd* pCmd, int16_t functionId, int16_t tableIndex);
void tscAddTimestampColumn(SQueryInfo* pQueryInfo, int16_t functionId, int16_t tableIndex);
void tscDoQuery(SSqlObj* pSql);
/**
......@@ -215,9 +231,9 @@ void tscDoQuery(SSqlObj* pSql);
* @return
*/
SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void* param, SSqlObj* pPrevSql);
void addGroupInfoForSubquery(SSqlObj* pParentObj, SSqlObj* pSql, int32_t tableIndex);
void addGroupInfoForSubquery(SSqlObj* pParentObj, SSqlObj* pSql, int32_t subClauseIndex, int32_t tableIndex);
void doAddGroupColumnForSubquery(SSqlCmd* pCmd, int32_t tagIndex);
void doAddGroupColumnForSubquery(SQueryInfo* pQueryInfo, int32_t tagIndex);
int16_t tscGetJoinTagColIndexByUid(STagCond* pTagCond, uint64_t uid);
......@@ -226,7 +242,13 @@ TAOS* taos_connect_a(char* ip, char* user, char* pass, char* db, uint16_t port,
void sortRemoveDuplicates(STableDataBlocks* dataBuf);
void tscPrintSelectClause(SSqlCmd* pCmd);
void tscPrintSelectClause(SSqlObj* pSql, int32_t subClauseIndex);
bool hasMoreVnodesToTry(SSqlObj *pSql);
void tscTryQueryNextVnode(SSqlObj *pSql, __async_cb_func_t fp);
void tscAsyncQuerySingleRowForNextVnode(void *param, TAOS_RES *tres, int numOfRows);
void tscTryQueryNextClause(SSqlObj* pSql, void (*queryFp)());
#ifdef __cplusplus
}
......
......@@ -20,14 +20,6 @@
extern "C" {
#endif
#include <errno.h>
#include <pthread.h>
#include <semaphore.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include "os.h"
#include "taos.h"
#include "taosmsg.h"
......@@ -39,69 +31,8 @@ extern "C" {
#include "tsqlfunction.h"
#include "tutil.h"
#define TSC_GET_RESPTR_BASE(res, cmd, col, ord) \
((res->data + tscFieldInfoGetOffset(cmd, col) * res->numOfRows) + \
(1 - ord.order) * (res->numOfRows - 1) * tscFieldInfoGetField(cmd, col)->bytes)
enum _sql_cmd {
TSDB_SQL_SELECT,
TSDB_SQL_FETCH,
TSDB_SQL_INSERT,
TSDB_SQL_MGMT, // the SQL below is for mgmt node
TSDB_SQL_CREATE_DB,
TSDB_SQL_CREATE_TABLE,
TSDB_SQL_DROP_DB,
TSDB_SQL_DROP_TABLE,
TSDB_SQL_CREATE_ACCT,
TSDB_SQL_CREATE_USER,
TSDB_SQL_DROP_ACCT, // 10
TSDB_SQL_DROP_USER,
TSDB_SQL_ALTER_USER,
TSDB_SQL_ALTER_ACCT,
TSDB_SQL_ALTER_TABLE,
TSDB_SQL_ALTER_DB,
TSDB_SQL_CREATE_MNODE,
TSDB_SQL_DROP_MNODE,
TSDB_SQL_CREATE_DNODE,
TSDB_SQL_DROP_DNODE,
TSDB_SQL_CFG_DNODE, // 20
TSDB_SQL_CFG_MNODE,
TSDB_SQL_SHOW,
TSDB_SQL_RETRIEVE,
TSDB_SQL_KILL_QUERY,
TSDB_SQL_KILL_STREAM,
TSDB_SQL_KILL_CONNECTION,
TSDB_SQL_READ, // SQL below is for read operation
TSDB_SQL_CONNECT,
TSDB_SQL_USE_DB,
TSDB_SQL_META, // 30
TSDB_SQL_METRIC,
TSDB_SQL_MULTI_META,
TSDB_SQL_HB,
TSDB_SQL_LOCAL, // SQL below for client local
TSDB_SQL_DESCRIBE_TABLE,
TSDB_SQL_RETRIEVE_METRIC,
TSDB_SQL_METRIC_JOIN_RETRIEVE,
TSDB_SQL_RETRIEVE_TAGS,
/*
* build empty result instead of accessing dnode to fetch result
* reset the client cache
*/
TSDB_SQL_RETRIEVE_EMPTY_RESULT,
TSDB_SQL_RESET_CACHE, // 40
TSDB_SQL_SERV_STATUS,
TSDB_SQL_CURRENT_DB,
TSDB_SQL_SERV_VERSION,
TSDB_SQL_CLI_VERSION,
TSDB_SQL_CURRENT_USER,
TSDB_SQL_CFG_LOCAL,
TSDB_SQL_MAX
};
#define TSC_GET_RESPTR_BASE(res, _queryinfo, col, ord) \
(res->data + tscFieldInfoGetOffset(_queryinfo, col) * res->numOfRows)
// forward declaration
struct SSqlInfo;
......@@ -182,13 +113,6 @@ typedef struct SColumnBaseInfo {
struct SLocalReducer;
// todo move to utility
typedef struct SString {
int32_t alloc;
int32_t n;
char * z;
} SString;
typedef struct SCond {
uint64_t uid;
char * cond;
......@@ -246,7 +170,7 @@ typedef struct STableDataBlocks {
* the metermeta for current table, the metermeta will be used during submit stage, keep a ref
* to avoid it to be removed from cache
*/
SMeterMeta* pMeterMeta;
SMeterMeta *pMeterMeta;
union {
char *filename;
......@@ -268,53 +192,69 @@ typedef struct SDataBlockList {
STableDataBlocks **pData;
} SDataBlockList;
typedef struct {
SOrderVal order;
int command;
int count; // TODO refactor
union {
bool existsCheck; // check if the table exists
int8_t showType; // show command type
};
int8_t isParseFinish;
int8_t isInsertFromFile; // load data from file or not
bool import; // import/insert type
uint8_t msgType;
uint16_t type; // query type
typedef struct SQueryInfo {
int16_t command; // the command may be different for each subclause, so keep it seperately.
uint16_t type; // query/insert/import type
char intervalTimeUnit;
int64_t etime, stime;
int64_t nAggTimeInterval; // aggregation time interval
int64_t nSlidingTime; // sliding window in mseconds
SSqlGroupbyExpr groupbyExpr; // group by tags info
/*
* use to keep short request msg and error msg, in such case, SSqlCmd->payload == SSqlCmd->ext;
* create table/query/insert operations will exceed the TSDB_SQLCMD_SIZE.
*
* In such cases, allocate the memory dynamically, and need to free the memory
*/
uint32_t allocSize;
char * payload;
int payloadLen;
short numOfCols;
SColumnBaseInfo colList;
SFieldInfo fieldsInfo;
SSqlExprInfo exprsInfo;
SLimitVal limit;
SLimitVal slimit;
int64_t globalLimit;
STagCond tagCond;
SOrderVal order;
int16_t interpoType; // interpolate type
int16_t numOfTables;
// submit data blocks branched according to vnode
SDataBlockList * pDataBlocks;
SMeterMetaInfo **pMeterInfo;
struct STSBuf * tsBuf;
// todo use dynamic allocated memory for defaultVal
int64_t defaultVal[TSDB_MAX_COLUMNS]; // default value for interpolation
int64_t * defaultVal; // default value for interpolation
char * msg; // pointer to the pCmd->payload to keep error message temporarily
int64_t clauseLimit; // limit for current sub clause
// offset value in the original sql expression, NOT sent to virtual node, only applied at client side
int64_t prjOffset;
} SQueryInfo;
// data source from sql string or from file
enum {
DATA_FROM_SQL_STRING = 1,
DATA_FROM_DATA_FILE = 2,
};
typedef struct {
int command;
uint8_t msgType;
union {
bool existsCheck; // check if the table exists or not
bool inStream; // denote if current sql is executed in stream or not
bool createOnDemand; // if the table is missing, on-the-fly create it. during getmeterMeta
int8_t dataSourceType; // load data from file or not
};
union {
int32_t count;
int32_t numOfTablesInSubmit;
};
int32_t clauseIndex; // index of multiple subclause query
int8_t isParseFinish;
short numOfCols;
uint32_t allocSize;
char * payload;
int payloadLen;
SQueryInfo **pQueryInfo;
int32_t numOfClause;
// submit data blocks branched according to vnode
SDataBlockList *pDataBlocks;
// for parameter ('?') binding and batch processing
int32_t batchSize;
......@@ -330,8 +270,10 @@ struct STSBuf;
typedef struct {
uint8_t code;
int numOfRows; // num of results in current retrieved
int numOfTotal; // num of total results
int64_t numOfRows; // num of results in current retrieved
int64_t numOfTotal; // num of total results
int64_t numOfTotalInCurrentClause; // num of total result in current subclause
char * pRsp;
int rspType;
int rspLen;
......@@ -394,9 +336,9 @@ typedef struct _sql_obj {
tsem_t emptyRspSem;
SSqlCmd cmd;
SSqlRes res;
uint16_t numOfSubs;
char* asyncTblPos;
void* pTableHashList;
uint8_t numOfSubs;
char * asyncTblPos;
void * pTableHashList;
struct _sql_obj **pSubs;
struct _sql_obj * prev, *next;
} SSqlObj;
......@@ -436,9 +378,11 @@ typedef struct {
} SIpStrList;
// tscSql API
int tsParseSql(SSqlObj *pSql, char *acct, char *db, bool multiVnodeInsertion);
int tsParseSql(SSqlObj *pSql, bool multiVnodeInsertion);
void tscInitMsgs();
extern int (*tscBuildMsg[TSDB_SQL_MAX])(SSqlObj *pSql, SSqlInfo *pInfo);
void *tscProcessMsgFromServer(char *msg, void *ahandle, void *thandle);
int tscProcessSql(SSqlObj *pSql);
......@@ -457,15 +401,16 @@ int taos_retrieve(TAOS_RES *res);
* transfer function for metric query in stream computing, the function need to be change
* before send query message to vnode
*/
int32_t tscTansformSQLFunctionForMetricQuery(SSqlCmd *pCmd);
void tscRestoreSQLFunctionForMetricQuery(SSqlCmd *pCmd);
int32_t tscTansformSQLFunctionForSTableQuery(SQueryInfo *pQueryInfo);
void tscRestoreSQLFunctionForMetricQuery(SQueryInfo *pQueryInfo);
void tscClearSqlMetaInfoForce(SSqlCmd *pCmd);
int32_t tscCreateResPointerInfo(SSqlCmd *pCmd, SSqlRes *pRes);
int32_t tscCreateResPointerInfo(SSqlRes *pRes, SQueryInfo *pQueryInfo);
void tscDestroyResPointerInfo(SSqlRes *pRes);
void tscFreeSqlCmdData(SSqlCmd *pCmd);
void tscFreeResData(SSqlObj* pSql);
/**
* free query result of the sql object
......@@ -490,11 +435,13 @@ void tscFreeSqlObj(SSqlObj *pObj);
void tscCloseTscObj(STscObj *pObj);
void tscProcessMultiVnodesInsert(SSqlObj *pSql);
void tscProcessMultiVnodesInsertForFile(SSqlObj *pSql);
void tscProcessMultiVnodesInsertFromFile(SSqlObj *pSql);
void tscKillMetricQuery(SSqlObj *pSql);
void tscInitResObjForLocalQuery(SSqlObj *pObj, int32_t numOfRes, int32_t rowLen);
bool tscIsUpdateQuery(STscObj *pObj);
bool tscHasReachLimitation(SSqlObj* pSql);
bool tscHasReachLimitation(SQueryInfo *pQueryInfo, SSqlRes *pRes);
char *tscGetErrorMsgPayload(SSqlCmd *pCmd);
int32_t tscInvalidSQLErrMsg(char *msg, const char *additionalInfo, const char *sql);
......@@ -516,6 +463,8 @@ extern int tsInsertHeadSize;
extern int tscNumOfThreads;
extern SIpStrList tscMgmtIpList;
typedef void (*__async_cb_func_t)(void *param, TAOS_RES *tres, int numOfRows);
#ifdef __cplusplus
}
#endif
......
此差异已折叠。
......@@ -17,6 +17,7 @@
#include "taosmsg.h"
#include "tast.h"
#include "tlog.h"
#include "tscSQLParser.h"
#include "tscSyntaxtreefunction.h"
#include "tschemautil.h"
#include "tsdb.h"
......@@ -26,7 +27,6 @@
#include "tstoken.h"
#include "ttypes.h"
#include "tutil.h"
#include "tscSQLParser.h"
/*
*
......@@ -115,6 +115,9 @@ static tSQLSyntaxNode *tSQLSyntaxNodeCreate(SSchema *pSchema, int32_t numOfCols,
int32_t i = 0;
if (pToken->type == TK_ID) {
do {
SSQLToken tableToken = {0};
extractTableNameFromToken(pToken, &tableToken);
size_t len = strlen(pSchema[i].name);
if (strncmp(pToken->z, pSchema[i].name, pToken->n) == 0 && pToken->n == len) break;
} while (++i < numOfCols);
......
......@@ -26,19 +26,18 @@
#include "tutil.h"
#include "tnote.h"
void tscProcessFetchRow(SSchedMsg *pMsg);
void tscProcessAsyncRetrieve(void *param, TAOS_RES *tres, int numOfRows);
static void tscProcessAsyncRetrieveNextVnode(void *param, TAOS_RES *tres, int numOfRows);
static void tscProcessAsyncContinueRetrieve(void *param, TAOS_RES *tres, int numOfRows);
static void tscProcessFetchRow(SSchedMsg *pMsg);
static void tscAsyncQueryRowsForNextVnode(void *param, TAOS_RES *tres, int numOfRows);
static void tscProcessAsyncRetrieveImpl(void *param, TAOS_RES *tres, int numOfRows, void (*fp)());
/*
* proxy function to perform sequentially query&retrieve operation.
* If sql queries upon metric and two-stage merge procedure is not needed,
* it will sequentially query&retrieve data for all vnodes in pCmd->pMetricMeta
* Proxy function to perform sequentially query&retrieve operation.
* If sql queries upon a super table and two-stage merge procedure is not involved (when employ the projection
* query), it will sequentially query&retrieve data for all vnodes
*/
static void tscProcessAsyncFetchRowsProxy(void *param, TAOS_RES *tres, int numOfRows);
static void tscAsyncFetchRowsProxy(void *param, TAOS_RES *tres, int numOfRows);
static void tscAsyncFetchSingleRowProxy(void *param, TAOS_RES *tres, int numOfRows);
// TODO return the correct error code to client in tscQueueAsyncError
void taos_query_a(TAOS *taos, const char *sqlstr, void (*fp)(void *, TAOS_RES *, int), void *param) {
......@@ -81,7 +80,6 @@ void taos_query_a(TAOS *taos, const char *sqlstr, void (*fp)(void *, TAOS_RES *,
return;
}
pSql->sqlstr = malloc(sqlLen + 1);
if (pSql->sqlstr == NULL) {
tscError("%p failed to malloc sql string buffer", pSql);
......@@ -97,7 +95,7 @@ void taos_query_a(TAOS *taos, const char *sqlstr, void (*fp)(void *, TAOS_RES *,
strtolower(pSql->sqlstr, sqlstr);
tscDump("%p pObj:%p, Async SQL: %s", pSql, pObj, pSql->sqlstr);
int32_t code = tsParseSql(pSql, pObj->acctId, pObj->db, true);
int32_t code = tsParseSql(pSql, true);
if (code == TSDB_CODE_ACTION_IN_PROGRESS) return;
if (code != TSDB_CODE_SUCCESS) {
......@@ -109,7 +107,7 @@ void taos_query_a(TAOS *taos, const char *sqlstr, void (*fp)(void *, TAOS_RES *,
tscDoQuery(pSql);
}
static void tscProcessAsyncFetchRowsProxy(void *param, TAOS_RES *tres, int numOfRows) {
static void tscAsyncFetchRowsProxy(void *param, TAOS_RES *tres, int numOfRows) {
if (tres == NULL) {
return;
}
......@@ -118,36 +116,32 @@ static void tscProcessAsyncFetchRowsProxy(void *param, TAOS_RES *tres, int numOf
SSqlRes *pRes = &pSql->res;
SSqlCmd *pCmd = &pSql->cmd;
// sequentially retrieve data from remain vnodes first, query vnode specified by vnodeIdx
if (numOfRows == 0 && tscProjectionQueryOnMetric(pCmd)) {
// vnode is denoted by vnodeIdx, continue to query vnode specified by vnodeIdx
SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, 0);
assert(pMeterMetaInfo->vnodeIndex >= 0);
/* reach the maximum number of output rows, abort */
if (pCmd->globalLimit > 0 && pRes->numOfTotal >= pCmd->globalLimit) {
(*pSql->fetchFp)(param, tres, 0);
if (numOfRows == 0) {
if (hasMoreVnodesToTry(pSql)) { // sequentially retrieve data from remain vnodes.
tscTryQueryNextVnode(pSql, tscAsyncQueryRowsForNextVnode);
} else {
/*
* all available virtual node has been checked already, now we need to check
* for the next subclause queries
*/
if (pCmd->clauseIndex < pCmd->numOfClause - 1) {
tscTryQueryNextClause(pSql, tscAsyncQueryRowsForNextVnode);
return;
}
/* update the limit value according to current retrieval results */
pCmd->limit.limit = pCmd->globalLimit - pRes->numOfTotal;
pCmd->limit.offset = pRes->offset;
if ((++(pMeterMetaInfo->vnodeIndex)) < pMeterMetaInfo->pMetricMeta->numOfVnodes) {
tscTrace("%p retrieve data from next vnode:%d", pSql, pMeterMetaInfo->vnodeIndex);
pSql->cmd.command = TSDB_SQL_SELECT; // reset flag to launch query first.
/*
* 1. has reach the limitation
* 2. no remain virtual nodes to be retrieved anymore
*/
(*pSql->fetchFp)(param, pSql, 0);
}
tscResetForNextRetrieve(pRes);
pSql->fp = tscProcessAsyncRetrieveNextVnode;
tscProcessSql(pSql);
return;
}
} else { // localreducer has handle this situation
// local reducer has handle this situation during super table non-projection query.
if (pCmd->command != TSDB_SQL_RETRIEVE_METRIC) {
pRes->numOfTotal += pRes->numOfRows;
}
pRes->numOfTotalInCurrentClause += pRes->numOfRows;
}
(*pSql->fetchFp)(param, tres, numOfRows);
......@@ -164,7 +158,7 @@ static void tscProcessAsyncRetrieveImpl(void *param, TAOS_RES *tres, int numOfRo
SSqlCmd *pCmd = &pSql->cmd;
SSqlRes *pRes = &pSql->res;
if (pRes->qhandle == 0 || numOfRows != 0) {
if ((pRes->qhandle == 0 || numOfRows != 0) && pCmd->command < TSDB_SQL_LOCAL) {
if (pRes->qhandle == 0) {
tscError("qhandle is NULL");
} else {
......@@ -183,14 +177,18 @@ static void tscProcessAsyncRetrieveImpl(void *param, TAOS_RES *tres, int numOfRo
}
/*
* retrieve callback for fetch rows proxy. It serves as the callback function of querying vnode
* retrieve callback for fetch rows proxy.
* The below two functions both serve as the callback function of query virtual node.
* query callback first, and then followed by retrieve callback
*/
static void tscProcessAsyncRetrieveNextVnode(void *param, TAOS_RES *tres, int numOfRows) {
tscProcessAsyncRetrieveImpl(param, tres, numOfRows, tscProcessAsyncFetchRowsProxy);
static void tscAsyncQueryRowsForNextVnode(void *param, TAOS_RES *tres, int numOfRows) {
// query completed, continue to retrieve
tscProcessAsyncRetrieveImpl(param, tres, numOfRows, tscAsyncFetchRowsProxy);
}
static void tscProcessAsyncContinueRetrieve(void *param, TAOS_RES *tres, int numOfRows) {
tscProcessAsyncRetrieveImpl(param, tres, numOfRows, tscProcessAsyncRetrieve);
void tscAsyncQuerySingleRowForNextVnode(void *param, TAOS_RES *tres, int numOfRows) {
// query completed, continue to retrieve
tscProcessAsyncRetrieveImpl(param, tres, numOfRows, tscAsyncFetchSingleRowProxy);
}
void taos_fetch_rows_a(TAOS_RES *taosa, void (*fp)(void *, TAOS_RES *, int), void *param) {
......@@ -213,7 +211,7 @@ void taos_fetch_rows_a(TAOS_RES *taosa, void (*fp)(void *, TAOS_RES *, int), voi
// user-defined callback function is stored in fetchFp
pSql->fetchFp = fp;
pSql->fp = tscProcessAsyncFetchRowsProxy;
pSql->fp = tscAsyncFetchRowsProxy;
pSql->param = param;
tscResetForNextRetrieve(pRes);
......@@ -248,8 +246,12 @@ void taos_fetch_row_a(TAOS_RES *taosa, void (*fp)(void *, TAOS_RES *, TAOS_ROW),
if (pRes->row >= pRes->numOfRows) {
tscResetForNextRetrieve(pRes);
pSql->fp = tscProcessAsyncRetrieve;
pSql->fp = tscAsyncFetchSingleRowProxy;
if (pCmd->command != TSDB_SQL_RETRIEVE_METRIC && pCmd->command < TSDB_SQL_LOCAL) {
pCmd->command = (pCmd->command > TSDB_SQL_MGMT) ? TSDB_SQL_RETRIEVE : TSDB_SQL_FETCH;
}
tscProcessSql(pSql);
} else {
SSchedMsg schedMsg;
......@@ -261,47 +263,31 @@ void taos_fetch_row_a(TAOS_RES *taosa, void (*fp)(void *, TAOS_RES *, TAOS_ROW),
}
}
void tscProcessAsyncRetrieve(void *param, TAOS_RES *tres, int numOfRows) {
void tscAsyncFetchSingleRowProxy(void *param, TAOS_RES *tres, int numOfRows) {
SSqlObj *pSql = (SSqlObj *)tres;
SSqlRes *pRes = &pSql->res;
SSqlCmd *pCmd = &pSql->cmd;
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
if (numOfRows == 0) {
// sequentially retrieve data from remain vnodes.
if (tscProjectionQueryOnMetric(pCmd)) {
if (hasMoreVnodesToTry(pSql)) { // sequentially retrieve data from remain vnodes.
tscTryQueryNextVnode(pSql, tscAsyncQuerySingleRowForNextVnode);
} else {
/*
* vnode is denoted by vnodeIdx, continue to query vnode specified by vnodeIdx till all vnode have been retrieved
* 1. has reach the limitation
* 2. no remain virtual nodes to be retrieved anymore
*/
SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, 0);
assert(pMeterMetaInfo->vnodeIndex >= 0);
/* reach the maximum number of output rows, abort */
if (pCmd->globalLimit > 0 && pRes->numOfTotal >= pCmd->globalLimit) {
(*pSql->fetchFp)(pSql->param, pSql, NULL);
return;
}
/* update the limit value according to current retrieval results */
pCmd->limit.limit = pCmd->globalLimit - pRes->numOfTotal;
if ((++pMeterMetaInfo->vnodeIndex) <= pMeterMetaInfo->pMetricMeta->numOfVnodes) {
pSql->cmd.command = TSDB_SQL_SELECT; // reset flag to launch query first.
tscResetForNextRetrieve(pRes);
pSql->fp = tscProcessAsyncContinueRetrieve;
tscProcessSql(pSql);
return;
}
} else {
(*pSql->fetchFp)(pSql->param, pSql, NULL);
}
} else {
for (int i = 0; i < pCmd->numOfCols; ++i)
pRes->tsrow[i] = TSC_GET_RESPTR_BASE(pRes, pCmd, i, pCmd->order) + pRes->bytes[i] * pRes->row;
pRes->tsrow[i] = TSC_GET_RESPTR_BASE(pRes, pQueryInfo, i, pQueryInfo->order) + pRes->bytes[i] * pRes->row;
pRes->row++;
(*pSql->fetchFp)(pSql->param, pSql, pSql->res.tsrow);
}
}
void tscProcessFetchRow(SSchedMsg *pMsg) {
......@@ -309,10 +295,13 @@ void tscProcessFetchRow(SSchedMsg *pMsg) {
SSqlRes *pRes = &pSql->res;
SSqlCmd *pCmd = &pSql->cmd;
for (int i = 0; i < pCmd->numOfCols; ++i)
pRes->tsrow[i] = TSC_GET_RESPTR_BASE(pRes, pCmd, i, pCmd->order) + pRes->bytes[i] * pRes->row;
pRes->row++;
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
for (int i = 0; i < pCmd->numOfCols; ++i) {
pRes->tsrow[i] = TSC_GET_RESPTR_BASE(pRes, pQueryInfo, i, pQueryInfo->order) + pRes->bytes[i] * pRes->row;
}
pRes->row++;
(*pSql->fetchFp)(pSql->param, pSql, pRes->tsrow);
}
......@@ -371,7 +360,7 @@ void tscQueueAsyncRes(SSqlObj *pSql) {
tscTrace("%p SqlObj is freed, not add into queue async res", pSql);
return;
} else {
tscTrace("%p add into queued async res, code:%d", pSql, pSql->res.code);
tscError("%p add into queued async res, code:%d", pSql, pSql->res.code);
}
SSchedMsg schedMsg;
......@@ -404,10 +393,13 @@ void tscAsyncInsertMultiVnodesProxy(void *param, TAOS_RES *tres, int numOfRows)
SSqlCmd *pCmd = &pSql->cmd;
int32_t code = TSDB_CODE_SUCCESS;
assert(!pCmd->isInsertFromFile && pSql->signature == pSql);
assert(pCmd->dataSourceType != 0 && pSql->signature == pSql);
int32_t index = 0;
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, index);
SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, 0);
assert(pCmd->numOfTables == 1);
SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0);
assert(pQueryInfo->numOfTables == 1 || pQueryInfo->numOfTables == 2);
SDataBlockList *pDataBlocks = pCmd->pDataBlocks;
if (pDataBlocks == NULL || pMeterMetaInfo->vnodeIndex >= pDataBlocks->nSize) {
......@@ -444,7 +436,6 @@ void tscMeterMetaCallBack(void *param, TAOS_RES *res, int code) {
SSqlObj *pSql = (SSqlObj *)param;
if (pSql == NULL || pSql->signature != pSql) return;
STscObj *pObj = pSql->pTscObj;
SSqlCmd *pCmd = &pSql->cmd;
SSqlRes *pRes = &pSql->res;
......@@ -465,9 +456,10 @@ void tscMeterMetaCallBack(void *param, TAOS_RES *res, int code) {
tscTrace("%p renew meterMeta successfully, command:%d, code:%d, thandle:%p, retry:%d",
pSql, pSql->cmd.command, pSql->res.code, pSql->thandle, pSql->retry);
assert(tscGetMeterMetaInfo(&pSql->cmd, 0)->pMeterMeta == NULL);
tscGetMeterMeta(pSql, tscGetMeterMetaInfo(&pSql->cmd, 0)->name, 0);
SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfo(&pSql->cmd, 0, 0);
assert(pMeterMetaInfo->pMeterMeta == NULL);
tscGetMeterMeta(pSql, pMeterMetaInfo);
code = tscSendMsgToServer(pSql);
if (code != 0) {
pRes->code = code;
......@@ -485,24 +477,27 @@ void tscMeterMetaCallBack(void *param, TAOS_RES *res, int code) {
}
if (pSql->pStream == NULL) {
// check if it is a sub-query of metric query first, if true, enter another routine
if ((pSql->cmd.type & TSDB_QUERY_TYPE_STABLE_SUBQUERY) == TSDB_QUERY_TYPE_STABLE_SUBQUERY) {
SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, 0);
// check if it is a sub-query of super table query first, if true, enter another routine
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
if ((pQueryInfo->type & TSDB_QUERY_TYPE_STABLE_SUBQUERY) == TSDB_QUERY_TYPE_STABLE_SUBQUERY) {
SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0);
assert(pMeterMetaInfo->pMeterMeta->numOfTags != 0 && pMeterMetaInfo->vnodeIndex >= 0 && pSql->param != NULL);
SRetrieveSupport *trs = (SRetrieveSupport *)pSql->param;
SSqlObj * pParObj = trs->pParentSqlObj;
assert(pParObj->signature == pParObj && trs->subqueryIndex == pMeterMetaInfo->vnodeIndex &&
pMeterMetaInfo->pMeterMeta->numOfTags != 0);
tscTrace("%p get metricMeta during metric query successfully", pSql);
tscTrace("%p get metricMeta during super table query successfully", pSql);
code = tscGetMeterMeta(pSql, tscGetMeterMetaInfo(&pSql->cmd, 0)->name, 0);
code = tscGetMeterMeta(pSql, pMeterMetaInfo);
pRes->code = code;
if (code == TSDB_CODE_ACTION_IN_PROGRESS) return;
code = tscGetMetricMeta(pSql);
code = tscGetMetricMeta(pSql, 0);
pRes->code = code;
if (code == TSDB_CODE_ACTION_IN_PROGRESS) return;
......@@ -510,8 +505,8 @@ void tscMeterMetaCallBack(void *param, TAOS_RES *res, int code) {
if (pCmd->isParseFinish) {
tscTrace("%p resend data to vnode in metermeta callback since sql has been parsed completed", pSql);
SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, 0);
code = tscGetMeterMeta(pSql, pMeterMetaInfo->name, 0);
SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, pCmd->clauseIndex, 0);
code = tscGetMeterMeta(pSql, pMeterMetaInfo);
assert(code == TSDB_CODE_SUCCESS);
if (pMeterMetaInfo->pMeterMeta) {
......@@ -519,28 +514,28 @@ void tscMeterMetaCallBack(void *param, TAOS_RES *res, int code) {
if (code == TSDB_CODE_SUCCESS) return;
}
} else {
code = tsParseSql(pSql, pObj->acctId, pObj->db, false);
code = tsParseSql(pSql, false);
if (code == TSDB_CODE_ACTION_IN_PROGRESS) return;
}
}
} else { // stream computing
SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, 0);
code = tscGetMeterMeta(pSql, pMeterMetaInfo->name, 0);
SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, pCmd->clauseIndex, 0);
code = tscGetMeterMeta(pSql, pMeterMetaInfo);
pRes->code = code;
if (code == TSDB_CODE_ACTION_IN_PROGRESS) return;
pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, 0);
if (code == TSDB_CODE_SUCCESS && UTIL_METER_IS_METRIC(pMeterMetaInfo)) {
code = tscGetMetricMeta(pSql);
if (code == TSDB_CODE_SUCCESS && UTIL_METER_IS_SUPERTABLE(pMeterMetaInfo)) {
code = tscGetMetricMeta(pSql, pCmd->clauseIndex);
pRes->code = code;
if (code == TSDB_CODE_ACTION_IN_PROGRESS) return;
}
}
if (code != 0) {
if (code != TSDB_CODE_SUCCESS) {
pSql->res.code = code;
tscQueueAsyncRes(pSql);
return;
}
......@@ -549,10 +544,12 @@ void tscMeterMetaCallBack(void *param, TAOS_RES *res, int code) {
tscTrace("%p stream:%p meta is updated, start new query, command:%d", pSql, pSql->pStream, pSql->cmd.command);
/*
* NOTE:
* transfer the sql function for metric query before get meter/metric meta,
* transfer the sql function for super table query before get meter/metric meta,
* since in callback functions, only tscProcessSql(pStream->pSql) is executed!
*/
tscTansformSQLFunctionForMetricQuery(&pSql->cmd);
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
tscTansformSQLFunctionForSTableQuery(pQueryInfo);
tscIncStreamExecutionCount(pSql->pStream);
} else {
tscTrace("%p get meterMeta/metricMeta successfully", pSql);
......
......@@ -72,6 +72,8 @@ for (int32_t i = 0; i < (ctx)->tagInfo.numOfTagCols; ++i) { \
void noop1(SQLFunctionCtx *UNUSED_PARAM(pCtx)) {}
void noop2(SQLFunctionCtx *UNUSED_PARAM(pCtx), int32_t UNUSED_PARAM(index)) {}
void doFinalizer(SQLFunctionCtx *pCtx) { resetResultInfo(GET_RES_INFO(pCtx)); }
typedef struct tValuePair {
tVariant v;
int64_t timestamp;
......@@ -356,7 +358,7 @@ static void function_finalizer(SQLFunctionCtx *pCtx) {
setNull(pCtx->aOutputBuf, pCtx->outputType, pCtx->outputBytes);
}
resetResultInfo(GET_RES_INFO(pCtx));
doFinalizer(pCtx);
}
/*
......@@ -889,6 +891,7 @@ static void avg_finalizer(SQLFunctionCtx *pCtx) {
// cannot set the numOfIteratedElems again since it is set during previous iteration
GET_RES_INFO(pCtx)->numOfRes = 1;
doFinalizer(pCtx);
}
/////////////////////////////////////////////////////////////////////////////////////////////
......@@ -910,6 +913,16 @@ static void minMax_function(SQLFunctionCtx *pCtx, char *pOutput, int32_t isMin,
index = pCtx->preAggVals.maxIndex;
}
/**
* NOTE: work around the bug caused by invalid pre-calculated function.
* Here the selectivity + ts will not return correct value.
*
* The following codes of 3 lines will be removed later.
*/
if (index < 0 || index >= pCtx->size + pCtx->startOffset) {
index = 0;
}
TSKEY key = pCtx->ptsList[index];
if (pCtx->inputType >= TSDB_DATA_TYPE_TINYINT && pCtx->inputType <= TSDB_DATA_TYPE_BIGINT) {
......@@ -1424,7 +1437,7 @@ static void stddev_finalizer(SQLFunctionCtx *pCtx) {
SET_VAL(pCtx, 1, 1);
}
resetResultInfo(GET_RES_INFO(pCtx));
doFinalizer(pCtx);
}
//////////////////////////////////////////////////////////////////////////////////////
......@@ -1456,7 +1469,9 @@ static void first_function(SQLFunctionCtx *pCtx) {
}
memcpy(pCtx->aOutputBuf, data, pCtx->inputBytes);
DO_UPDATE_TAG_COLUMNS(pCtx, i);
TSKEY k = pCtx->ptsList[i];
DO_UPDATE_TAG_COLUMNS(pCtx, k);
SResultInfo *pInfo = GET_RES_INFO(pCtx);
pInfo->hasResult = DATA_SET_FLAG;
......@@ -1824,7 +1839,7 @@ static void last_row_finalizer(SQLFunctionCtx *pCtx) {
}
GET_RES_INFO(pCtx)->numOfRes = 1;
resetResultInfo(GET_RES_INFO(pCtx));
doFinalizer(pCtx);
}
//////////////////////////////////////////////////////////////////////////////////
......@@ -2005,15 +2020,8 @@ static void copyTopBotRes(SQLFunctionCtx *pCtx, int32_t type) {
STopBotInfo *pRes = pResInfo->interResultBuf;
tValuePair **tvp = pRes->res;
int32_t step = 0;
// in case of second stage merge, always use incremental output.
if (pCtx->currentStage == SECONDARY_STAGE_MERGE) {
step = QUERY_ASC_FORWARD_STEP;
} else {
step = GET_FORWARD_DIRECTION_FACTOR(pCtx->order);
}
int32_t step = QUERY_ASC_FORWARD_STEP;
int32_t len = GET_RES_INFO(pCtx)->numOfRes;
switch (type) {
......@@ -2393,7 +2401,7 @@ static void top_bottom_func_finalizer(SQLFunctionCtx *pCtx) {
GET_TRUE_DATA_TYPE();
copyTopBotRes(pCtx, type);
resetResultInfo(pResInfo);
doFinalizer(pCtx);
}
///////////////////////////////////////////////////////////////////////////////////////////////
......@@ -2470,7 +2478,7 @@ static void percentile_finalizer(SQLFunctionCtx *pCtx) {
tOrderDescDestroy(pMemBucket->pOrderDesc);
tMemBucketDestroy(pMemBucket);
resetResultInfo(GET_RES_INFO(pCtx));
doFinalizer(pCtx);
}
//////////////////////////////////////////////////////////////////////////////////
......@@ -2679,7 +2687,7 @@ static void apercentile_finalizer(SQLFunctionCtx *pCtx) {
}
}
resetResultInfo(pResInfo);
doFinalizer(pCtx);
}
/////////////////////////////////////////////////////////////////////////////////
......@@ -2859,7 +2867,7 @@ static void leastsquares_finalizer(SQLFunctionCtx *pCtx) {
param[1][2] /= param[1][1];
sprintf(pCtx->aOutputBuf, "(%lf, %lf)", param[0][2], param[1][2]);
resetResultInfo(GET_RES_INFO(pCtx));
doFinalizer(pCtx);
}
static void date_col_output_function(SQLFunctionCtx *pCtx) {
......@@ -2878,17 +2886,17 @@ static FORCE_INLINE void date_col_output_function_f(SQLFunctionCtx *pCtx, int32_
static void col_project_function(SQLFunctionCtx *pCtx) {
INC_INIT_VAL(pCtx, pCtx->size);
char *pDest = 0;
char *pData = GET_INPUT_CHAR(pCtx);
if (pCtx->order == TSQL_SO_ASC) {
pDest = pCtx->aOutputBuf;
memcpy(pCtx->aOutputBuf, pData, (size_t)pCtx->size * pCtx->inputBytes);
} else {
pDest = pCtx->aOutputBuf - (pCtx->size - 1) * pCtx->inputBytes;
for(int32_t i = 0; i < pCtx->size; ++i) {
memcpy(pCtx->aOutputBuf + (pCtx->size - 1 - i) * pCtx->inputBytes, pData + i * pCtx->inputBytes,
pCtx->inputBytes);
}
}
char *pData = GET_INPUT_CHAR(pCtx);
memcpy(pDest, pData, (size_t)pCtx->size * pCtx->inputBytes);
pCtx->aOutputBuf += pCtx->size * pCtx->outputBytes * GET_FORWARD_DIRECTION_FACTOR(pCtx->order);
pCtx->aOutputBuf += pCtx->size * pCtx->outputBytes;
}
static void col_project_function_f(SQLFunctionCtx *pCtx, int32_t index) {
......@@ -2903,7 +2911,7 @@ static void col_project_function_f(SQLFunctionCtx *pCtx, int32_t index) {
char *pData = GET_INPUT_CHAR_INDEX(pCtx, index);
memcpy(pCtx->aOutputBuf, pData, pCtx->inputBytes);
pCtx->aOutputBuf += pCtx->inputBytes * GET_FORWARD_DIRECTION_FACTOR(pCtx->order);
pCtx->aOutputBuf += pCtx->inputBytes/* * GET_FORWARD_DIRECTION_FACTOR(pCtx->order)*/;
}
/**
......@@ -2915,18 +2923,17 @@ static void tag_project_function(SQLFunctionCtx *pCtx) {
INC_INIT_VAL(pCtx, pCtx->size);
assert(pCtx->inputBytes == pCtx->outputBytes);
int32_t factor = GET_FORWARD_DIRECTION_FACTOR(pCtx->order);
for (int32_t i = 0; i < pCtx->size; ++i) {
tVariantDump(&pCtx->tag, pCtx->aOutputBuf, pCtx->outputType);
pCtx->aOutputBuf += pCtx->outputBytes * factor;
pCtx->aOutputBuf += pCtx->outputBytes;
}
}
static void tag_project_function_f(SQLFunctionCtx *pCtx, int32_t index) {
INC_INIT_VAL(pCtx, 1);
tVariantDump(&pCtx->tag, pCtx->aOutputBuf, pCtx->tag.nType);
pCtx->aOutputBuf += pCtx->outputBytes * GET_FORWARD_DIRECTION_FACTOR(pCtx->order);
pCtx->aOutputBuf += pCtx->outputBytes;
}
/**
......@@ -2975,8 +2982,8 @@ static void diff_function(SQLFunctionCtx *pCtx) {
int32_t notNullElems = 0;
int32_t step = GET_FORWARD_DIRECTION_FACTOR(pCtx->order);
int32_t i = (pCtx->order == TSQL_SO_ASC) ? 0 : pCtx->size - 1;
TSKEY * pTimestamp = pCtx->ptsOutputBuf;
switch (pCtx->inputType) {
......@@ -2996,14 +3003,14 @@ static void diff_function(SQLFunctionCtx *pCtx) {
*pOutput = pData[i] - pCtx->param[1].i64Key;
*pTimestamp = pCtx->ptsList[i];
pOutput += step;
pTimestamp += step;
pOutput += 1;
pTimestamp += 1;
} else {
*pOutput = pData[i] - pData[i - step];
*pTimestamp = pCtx->ptsList[i];
pOutput += step;
pTimestamp += step;
pOutput += 1;
pTimestamp += 1;
}
pCtx->param[1].i64Key = pData[i];
......@@ -3028,14 +3035,14 @@ static void diff_function(SQLFunctionCtx *pCtx) {
*pOutput = pData[i] - pCtx->param[1].i64Key;
*pTimestamp = pCtx->ptsList[i];
pOutput += step;
pTimestamp += step;
pOutput += 1;
pTimestamp += 1;
} else {
*pOutput = pData[i] - pData[i - step];
*pTimestamp = pCtx->ptsList[i];
pOutput += step;
pTimestamp += step;
pOutput += 1;
pTimestamp += 1;
}
pCtx->param[1].i64Key = pData[i];
......@@ -3059,13 +3066,13 @@ static void diff_function(SQLFunctionCtx *pCtx) {
} else if ((i == 0 && pCtx->order == TSQL_SO_ASC) || (i == pCtx->size - 1 && pCtx->order == TSQL_SO_DESC)) {
*pOutput = pData[i] - pCtx->param[1].dKey;
*pTimestamp = pCtx->ptsList[i];
pOutput += step;
pTimestamp += step;
pOutput += 1;
pTimestamp += 1;
} else {
*pOutput = pData[i] - pData[i - step];
*pTimestamp = pCtx->ptsList[i];
pOutput += step;
pTimestamp += step;
pOutput += 1;
pTimestamp += 1;
}
pCtx->param[1].dKey = pData[i];
......@@ -3089,13 +3096,15 @@ static void diff_function(SQLFunctionCtx *pCtx) {
} else if ((i == 0 && pCtx->order == TSQL_SO_ASC) || (i == pCtx->size - 1 && pCtx->order == TSQL_SO_DESC)) {
*pOutput = pData[i] - pCtx->param[1].dKey;
*pTimestamp = pCtx->ptsList[i];
pOutput += step;
pTimestamp += step;
pOutput += 1;
pTimestamp += 1;
} else {
*pOutput = pData[i] - pData[i - step];
*pTimestamp = pCtx->ptsList[i];
pOutput += step;
pTimestamp += step;
pOutput += 1;
pTimestamp += 1;
}
// keep the last value, the remain may be all null
......@@ -3120,13 +3129,14 @@ static void diff_function(SQLFunctionCtx *pCtx) {
} else if ((i == 0 && pCtx->order == TSQL_SO_ASC) || (i == pCtx->size - 1 && pCtx->order == TSQL_SO_DESC)) {
*pOutput = pData[i] - pCtx->param[1].i64Key;
*pTimestamp = pCtx->ptsList[i];
pOutput += step;
pTimestamp += step;
pOutput += 1;
pTimestamp += 1;
} else {
*pOutput = pData[i] - pData[i - step];
*pTimestamp = pCtx->ptsList[i];
pOutput += step;
pTimestamp += step;
pOutput += 1;
pTimestamp += 1;
}
pCtx->param[1].i64Key = pData[i];
......@@ -3150,13 +3160,15 @@ static void diff_function(SQLFunctionCtx *pCtx) {
} else if ((i == 0 && pCtx->order == TSQL_SO_ASC) || (i == pCtx->size - 1 && pCtx->order == TSQL_SO_DESC)) {
*pOutput = pData[i] - pCtx->param[1].i64Key;
*pTimestamp = pCtx->ptsList[i];
pOutput += step;
pTimestamp += step;
pOutput += 1;
pTimestamp += 1;
} else {
*pOutput = pData[i] - pData[i - step];
*pTimestamp = pCtx->ptsList[i];
pOutput += step;
pTimestamp += step;
pOutput += 1;
pTimestamp += 1;
}
pCtx->param[1].i64Key = pData[i];
......@@ -3181,8 +3193,8 @@ static void diff_function(SQLFunctionCtx *pCtx) {
GET_RES_INFO(pCtx)->numOfRes += forwardStep;
pCtx->aOutputBuf = pCtx->aOutputBuf + forwardStep * pCtx->outputBytes * step;
pCtx->ptsOutputBuf = (char *)pCtx->ptsOutputBuf + forwardStep * TSDB_KEYSIZE * step;
pCtx->aOutputBuf += forwardStep * pCtx->outputBytes;
pCtx->ptsOutputBuf += forwardStep * TSDB_KEYSIZE;
}
}
......@@ -3209,7 +3221,7 @@ static void diff_function_f(SQLFunctionCtx *pCtx, int32_t index) {
GET_RES_INFO(pCtx)->numOfRes += 1;
}
int32_t step = GET_FORWARD_DIRECTION_FACTOR(pCtx->order);
int32_t step = 1/*GET_FORWARD_DIRECTION_FACTOR(pCtx->order)*/;
switch (pCtx->inputType) {
case TSDB_DATA_TYPE_INT: {
......@@ -3277,7 +3289,8 @@ static void arithmetic_function(SQLFunctionCtx *pCtx) {
tSQLBinaryExprCalcTraverse(sas->pExpr->pBinExprInfo.pBinExpr, pCtx->size, pCtx->aOutputBuf, sas, pCtx->order,
arithmetic_callback_function);
pCtx->aOutputBuf += pCtx->outputBytes * pCtx->size * GET_FORWARD_DIRECTION_FACTOR(pCtx->order);
pCtx->aOutputBuf += pCtx->outputBytes * pCtx->size/* * GET_FORWARD_DIRECTION_FACTOR(pCtx->order)*/;
pCtx->param[1].pz = NULL;
}
static void arithmetic_function_f(SQLFunctionCtx *pCtx, int32_t index) {
......@@ -3288,7 +3301,7 @@ static void arithmetic_function_f(SQLFunctionCtx *pCtx, int32_t index) {
tSQLBinaryExprCalcTraverse(sas->pExpr->pBinExprInfo.pBinExpr, 1, pCtx->aOutputBuf, sas, pCtx->order,
arithmetic_callback_function);
pCtx->aOutputBuf += pCtx->outputBytes * GET_FORWARD_DIRECTION_FACTOR(pCtx->order);
pCtx->aOutputBuf += pCtx->outputBytes/* * GET_FORWARD_DIRECTION_FACTOR(pCtx->order)*/;
}
#define LIST_MINMAX_N(ctx, minOutput, maxOutput, elemCnt, data, type, tsdbType, numOfNotNullElem) \
......@@ -3504,7 +3517,6 @@ void spread_func_sec_merge(SQLFunctionCtx *pCtx) {
pCtx->param[3].dKey = pData->max;
}
// pCtx->numOfIteratedElems += 1;
GET_RES_INFO(pCtx)->hasResult = DATA_SET_FLAG;
}
......@@ -3537,8 +3549,7 @@ void spread_function_finalizer(SQLFunctionCtx *pCtx) {
*(double *)pCtx->aOutputBuf = pInfo->max - pInfo->min;
}
// SET_VAL(pCtx, pCtx->numOfIteratedElems, 1);
resetResultInfo(GET_RES_INFO(pCtx));
GET_RES_INFO(pCtx)->numOfRes = 1; // todo add test case
}
/*
......@@ -4171,7 +4182,7 @@ void twa_function_finalizer(SQLFunctionCtx *pCtx) {
}
GET_RES_INFO(pCtx)->numOfRes = 1;
resetResultInfo(GET_RES_INFO(pCtx));
doFinalizer(pCtx);
}
/**
......@@ -4333,7 +4344,7 @@ static void ts_comp_finalize(SQLFunctionCtx *pCtx) {
strcpy(pCtx->aOutputBuf, pTSbuf->path);
tsBufDestory(pTSbuf);
resetResultInfo(GET_RES_INFO(pCtx));
doFinalizer(pCtx);
}
/*
......@@ -4373,7 +4384,7 @@ SQLAggFuncElem aAggs[28] = {{
count_function,
count_function_f,
no_next_step,
noop1,
doFinalizer,
count_func_merge,
count_func_merge,
count_load_data_info,
......@@ -4616,7 +4627,7 @@ SQLAggFuncElem aAggs[28] = {{
date_col_output_function,
date_col_output_function_f,
no_next_step,
noop1,
doFinalizer,
copy_function,
copy_function,
no_data_info,
......@@ -4631,7 +4642,7 @@ SQLAggFuncElem aAggs[28] = {{
noop1,
noop2,
no_next_step,
noop1,
doFinalizer,
copy_function,
copy_function,
data_req_load_info,
......@@ -4646,7 +4657,7 @@ SQLAggFuncElem aAggs[28] = {{
tag_function,
noop2,
no_next_step,
noop1,
doFinalizer,
copy_function,
copy_function,
no_data_info,
......@@ -4676,7 +4687,7 @@ SQLAggFuncElem aAggs[28] = {{
tag_function,
tag_function_f,
no_next_step,
noop1,
doFinalizer,
copy_function,
copy_function,
no_data_info,
......@@ -4691,7 +4702,7 @@ SQLAggFuncElem aAggs[28] = {{
col_project_function,
col_project_function_f,
no_next_step,
noop1,
doFinalizer,
copy_function,
copy_function,
data_req_load_info,
......@@ -4706,7 +4717,7 @@ SQLAggFuncElem aAggs[28] = {{
tag_project_function,
tag_project_function_f,
no_next_step,
noop1,
doFinalizer,
copy_function,
copy_function,
no_data_info,
......@@ -4721,7 +4732,7 @@ SQLAggFuncElem aAggs[28] = {{
arithmetic_function,
arithmetic_function_f,
no_next_step,
noop1,
doFinalizer,
copy_function,
copy_function,
data_req_load_info,
......@@ -4736,7 +4747,7 @@ SQLAggFuncElem aAggs[28] = {{
diff_function,
diff_function_f,
no_next_step,
noop1,
doFinalizer,
noop1,
noop1,
data_req_load_info,
......@@ -4782,7 +4793,7 @@ SQLAggFuncElem aAggs[28] = {{
interp_function,
do_sum_f, // todo filter handle
no_next_step,
noop1,
doFinalizer,
noop1,
copy_function,
no_data_info,
......
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
......@@ -125,7 +125,7 @@ extern "C" {
#define TSDB_CODE_BATCH_SIZE_TOO_BIG 104
#define TSDB_CODE_TIMESTAMP_OUT_OF_RANGE 105
#define TSDB_CODE_INVALID_QUERY_MSG 106 // failed to validate the sql expression msg by vnode
#define TSDB_CODE_CACHE_BLOCK_TS_DISORDERED 107 // time stamp in cache block is disordered
#define TSDB_CODE_SORTED_RES_TOO_MANY 107 // too many result for ordered super table projection query
#define TSDB_CODE_FILE_BLOCK_TS_DISORDERED 108 // time stamp in file block is disordered
#define TSDB_CODE_INVALID_COMMIT_LOG 109 // commit log init failed
#define TSDB_CODE_SERV_NO_DISKSPACE 110
......
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
......@@ -162,8 +162,8 @@ typedef struct SExtTagsInfo {
// sql function runtime context
typedef struct SQLFunctionCtx {
int32_t startOffset;
int32_t size;
int32_t order;
int32_t size; // number of rows
int32_t order; // asc|desc
int32_t scanFlag; // TODO merge with currentStage
int16_t inputType;
......
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册