提交 71b723e8 编写于 作者: C Cary Xu

Merge branch '3.0' into feature/TD-13066-3.0

......@@ -17,6 +17,7 @@
#define _TD_INDEX_H_
#include "os.h"
#include "taoserror.h"
#include "tarray.h"
#ifdef __cplusplus
......@@ -41,11 +42,12 @@ typedef enum {
UPDATE_VALUE, // update index column value
ADD_INDEX, // add index on specify column
DROP_INDEX, // drop existed index
DROP_SATBLE // drop stable
DROP_SATBLE, // drop stable
DEFAULT // query
} SIndexOperOnColumn;
typedef enum { MUST = 0, SHOULD = 1, NOT = 2 } EIndexOperatorType;
typedef enum { QUERY_TERM = 0, QUERY_PREFIX = 1, QUERY_SUFFIX = 2, QUERY_REGEX = 3, QUERY_RANGE = 4 } EIndexQueryType;
typedef enum { MUST = 0, SHOULD, NOT } EIndexOperatorType;
typedef enum { QUERY_TERM = 0, QUERY_PREFIX, QUERY_SUFFIX, QUERY_REGEX, QUERY_RANGE } EIndexQueryType;
/*
* create multi query
......@@ -166,8 +168,8 @@ void indexOptsDestroy(SIndexOpts* opts);
* @param:
*/
SIndexTerm* indexTermCreate(int64_t suid, SIndexOperOnColumn operType, uint8_t colType, const char* colName,
int32_t nColName, const char* colVal, int32_t nColVal);
SIndexTerm* indexTermCreate(int64_t suid, SIndexOperOnColumn operType, int8_t qType, uint8_t colType,
const char* colName, int32_t nColName, const char* colVal, int32_t nColVal);
void indexTermDestroy(SIndexTerm* p);
/*
......
......@@ -264,7 +264,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_MND_TRANS_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x03D0)
#define TSDB_CODE_MND_TRANS_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x03D1)
#define TSDB_CODE_MND_TRANS_INVALID_STAGE TAOS_DEF_ERROR_CODE(0, 0x03D2)
#define TSDB_CODE_MND_TRANS_CANT_PARALLEL TAOS_DEF_ERROR_CODE(0, 0x03D4)
#define TSDB_CODE_MND_TRANS_CAN_NOT_PARALLEL TAOS_DEF_ERROR_CODE(0, 0x03D4)
// mnode-mq
#define TSDB_CODE_MND_TOPIC_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x03E0)
......
......@@ -57,11 +57,11 @@ typedef enum {
TRN_STAGE_PREPARE = 0,
TRN_STAGE_REDO_LOG = 1,
TRN_STAGE_REDO_ACTION = 2,
TRN_STAGE_COMMIT = 3,
TRN_STAGE_COMMIT_LOG = 4,
TRN_STAGE_UNDO_ACTION = 5,
TRN_STAGE_UNDO_LOG = 6,
TRN_STAGE_ROLLBACK = 7,
TRN_STAGE_ROLLBACK = 3,
TRN_STAGE_UNDO_ACTION = 4,
TRN_STAGE_UNDO_LOG = 5,
TRN_STAGE_COMMIT = 6,
TRN_STAGE_COMMIT_LOG = 7,
TRN_STAGE_FINISHED = 8
} ETrnStage;
......@@ -72,6 +72,7 @@ typedef enum {
TRN_TYPE_DROP_USER = 1003,
TRN_TYPE_CREATE_FUNC = 1004,
TRN_TYPE_DROP_FUNC = 1005,
TRN_TYPE_CREATE_SNODE = 1006,
TRN_TYPE_DROP_SNODE = 1007,
TRN_TYPE_CREATE_QNODE = 1008,
......@@ -91,10 +92,12 @@ typedef enum {
TRN_TYPE_CONSUMER_LOST = 1022,
TRN_TYPE_CONSUMER_RECOVER = 1023,
TRN_TYPE_BASIC_SCOPE_END,
TRN_TYPE_GLOBAL_SCOPE = 2000,
TRN_TYPE_CREATE_DNODE = 2001,
TRN_TYPE_DROP_DNODE = 2002,
TRN_TYPE_GLOBAL_SCOPE_END,
TRN_TYPE_DB_SCOPE = 3000,
TRN_TYPE_CREATE_DB = 3001,
TRN_TYPE_ALTER_DB = 3002,
......@@ -102,6 +105,7 @@ typedef enum {
TRN_TYPE_SPLIT_VGROUP = 3004,
TRN_TYPE_MERGE_VGROUP = 3015,
TRN_TYPE_DB_SCOPE_END,
TRN_TYPE_STB_SCOPE = 4000,
TRN_TYPE_CREATE_STB = 4001,
TRN_TYPE_ALTER_STB = 4002,
......@@ -131,7 +135,7 @@ typedef struct {
int32_t id;
ETrnStage stage;
ETrnPolicy policy;
ETrnType transType;
ETrnType type;
int32_t code;
int32_t failedTimes;
void* rpcHandle;
......
......@@ -24,9 +24,12 @@ extern "C" {
int32_t mndInitUser(SMnode *pMnode);
void mndCleanupUser(SMnode *pMnode);
SUserObj *mndAcquireUser(SMnode *pMnode, char *userName);
SUserObj *mndAcquireUser(SMnode *pMnode, const char *userName);
void mndReleaseUser(SMnode *pMnode, SUserObj *pUser);
// for trans test
SSdbRaw *mndUserActionEncode(SUserObj *pUser);
#ifdef __cplusplus
}
#endif
......
......@@ -25,7 +25,6 @@
#define USER_RESERVE_SIZE 64
static int32_t mndCreateDefaultUsers(SMnode *pMnode);
static SSdbRaw *mndUserActionEncode(SUserObj *pUser);
static SSdbRow *mndUserActionDecode(SSdbRaw *pRaw);
static int32_t mndUserActionInsert(SSdb *pSdb, SUserObj *pUser);
static int32_t mndUserActionDelete(SSdb *pSdb, SUserObj *pUser);
......@@ -90,7 +89,7 @@ static int32_t mndCreateDefaultUsers(SMnode *pMnode) {
return 0;
}
static SSdbRaw *mndUserActionEncode(SUserObj *pUser) {
SSdbRaw *mndUserActionEncode(SUserObj *pUser) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
int32_t numOfReadDbs = taosHashGetSize(pUser->readDbs);
......@@ -238,7 +237,7 @@ static int32_t mndUserActionUpdate(SSdb *pSdb, SUserObj *pOld, SUserObj *pNew) {
return 0;
}
SUserObj *mndAcquireUser(SMnode *pMnode, char *userName) {
SUserObj *mndAcquireUser(SMnode *pMnode, const char *userName) {
SSdb *pSdb = pMnode->pSdb;
SUserObj *pUser = sdbAcquire(pSdb, SDB_USER, userName);
if (pUser == NULL) {
......
aux_source_directory(. MNODE_TRANS_TEST_SRC)
add_executable(transTest ${MNODE_TRANS_TEST_SRC})
add_executable(transTest1 "")
target_sources(transTest1
PRIVATE
"${CMAKE_CURRENT_SOURCE_DIR}/trans1.cpp"
)
target_link_libraries(
transTest
transTest1
PUBLIC sut
)
target_include_directories(
transTest1
PUBLIC "${TD_SOURCE_DIR}/include/dnode/mnode"
PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/../../inc"
)
add_test(
NAME transTest1
COMMAND transTest1
)
add_executable(transTest2 "")
target_sources(transTest2
PRIVATE
"${CMAKE_CURRENT_SOURCE_DIR}/trans2.cpp"
)
target_link_libraries(
transTest2
PUBLIC dnode mnode gtest_main
)
target_include_directories(
transTest2
PUBLIC "${TD_SOURCE_DIR}/include/dnode/mnode"
PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/../../inc"
)
add_test(
NAME transTest
COMMAND transTest
NAME transTest2
COMMAND transTest2
)
......@@ -11,10 +11,10 @@
#include "sut.h"
class MndTestTrans : public ::testing::Test {
class MndTestTrans1 : public ::testing::Test {
protected:
static void SetUpTestSuite() {
test.Init("/tmp/mnode_test_trans", 9013);
test.Init("/tmp/mnode_test_trans1", 9013);
const char* fqdn = "localhost";
const char* firstEp = "localhost:9013";
server2.Start("/tmp/mnode_test_trans2", fqdn, 9020, firstEp);
......@@ -26,7 +26,7 @@ class MndTestTrans : public ::testing::Test {
}
static void KillThenRestartServer() {
char file[PATH_MAX] = "/tmp/mnode_test_trans/mnode/data/sdb.data";
char file[PATH_MAX] = "/tmp/mnode_test_trans1/mnode/data/sdb.data";
TdFilePtr pFile = taosOpenFile(file, TD_FILE_READ);
int32_t size = 3 * 1024 * 1024;
void* buffer = taosMemoryMalloc(size);
......@@ -60,10 +60,10 @@ class MndTestTrans : public ::testing::Test {
void TearDown() override {}
};
Testbase MndTestTrans::test;
TestServer MndTestTrans::server2;
Testbase MndTestTrans1::test;
TestServer MndTestTrans1::server2;
TEST_F(MndTestTrans, 00_Create_User_Crash) {
TEST_F(MndTestTrans1, 00_Create_User_Crash) {
{
test.SendShowReq(TSDB_MGMT_TABLE_TRANS, "trans", "");
EXPECT_EQ(test.GetShowRows(), 0);
......@@ -83,7 +83,7 @@ TEST_F(MndTestTrans, 00_Create_User_Crash) {
}
}
TEST_F(MndTestTrans, 01_Create_User_Crash) {
TEST_F(MndTestTrans1, 01_Create_User_Crash) {
{
SCreateUserReq createReq = {0};
strcpy(createReq.user, "u1");
......@@ -107,7 +107,7 @@ TEST_F(MndTestTrans, 01_Create_User_Crash) {
EXPECT_EQ(test.GetShowRows(), 2);
}
TEST_F(MndTestTrans, 02_Create_Qnode1_Crash) {
TEST_F(MndTestTrans1, 02_Create_Qnode1_Crash) {
{
SMCreateQnodeReq createReq = {0};
createReq.dnodeId = 1;
......@@ -142,7 +142,7 @@ TEST_F(MndTestTrans, 02_Create_Qnode1_Crash) {
}
}
TEST_F(MndTestTrans, 03_Create_Qnode2_Crash) {
TEST_F(MndTestTrans1, 03_Create_Qnode2_Crash) {
{
SCreateDnodeReq createReq = {0};
strcpy(createReq.fqdn, "localhost");
......
/**
* @file trans.cpp
* @author slguan (slguan@taosdata.com)
* @brief MNODE module trans tests
* @version 1.0
* @date 2022-05-02
*
* @copyright Copyright (c) 2022
*
*/
#include <gtest/gtest.h>
#include "mndTrans.h"
#include "mndUser.h"
#include "tcache.h"
void reportStartup(SMgmtWrapper *pWrapper, const char *name, const char *desc) {}
class MndTestTrans2 : public ::testing::Test {
protected:
static void SetUpTestSuite() {
dDebugFlag = 143;
vDebugFlag = 0;
mDebugFlag = 207;
cDebugFlag = 0;
jniDebugFlag = 0;
tmrDebugFlag = 135;
uDebugFlag = 135;
rpcDebugFlag = 143;
qDebugFlag = 0;
wDebugFlag = 0;
sDebugFlag = 0;
tsdbDebugFlag = 0;
tsLogEmbedded = 1;
tsAsyncLog = 0;
const char *logpath = "/tmp/td";
taosRemoveDir(logpath);
taosMkDir(logpath);
tstrncpy(tsLogDir, logpath, PATH_MAX);
if (taosInitLog("taosdlog", 1) != 0) {
printf("failed to init log file\n");
}
walInit();
static SMsgCb msgCb = {0};
msgCb.reportStartupFp = reportStartup;
msgCb.pWrapper = (SMgmtWrapper *)(&msgCb); // hack
tmsgSetDefaultMsgCb(&msgCb);
SMnodeOpt opt = {0};
opt.deploy = 1;
opt.replica = 1;
opt.replicas[0].id = 1;
opt.replicas[0].port = 9040;
strcpy(opt.replicas[0].fqdn, "localhost");
opt.msgCb = msgCb;
const char *mnodepath = "/tmp/mnode_test_trans";
taosRemoveDir(mnodepath);
pMnode = mndOpen(mnodepath, &opt);
}
static void TearDownTestSuite() {
mndClose(pMnode);
walCleanUp();
taosCloseLog();
taosStopCacheRefreshWorker();
}
static SMnode *pMnode;
public:
void SetUp() override {}
void TearDown() override {}
void CreateUser(const char *user) {
SUserObj userObj = {0};
taosEncryptPass_c((uint8_t *)"taosdata", strlen("taosdata"), userObj.pass);
tstrncpy(userObj.user, user, TSDB_USER_LEN);
tstrncpy(userObj.acct, "root", TSDB_USER_LEN);
userObj.createdTime = taosGetTimestampMs();
userObj.updateTime = userObj.createdTime;
userObj.superUser = 1;
SRpcMsg rpcMsg = {0};
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_TYPE_CREATE_USER, &rpcMsg);
SSdbRaw *pRedoRaw = mndUserActionEncode(&userObj);
mndTransAppendRedolog(pTrans, pRedoRaw);
sdbSetRawStatus(pRedoRaw, SDB_STATUS_READY);
char *param = strdup("====> test param <=====");
mndTransSetCb(pTrans, TEST_TRANS_START_FUNC, TEST_TRANS_STOP_FUNC, param, strlen(param) + 1);
mndTransPrepare(pMnode, pTrans);
mndTransDrop(pTrans);
}
};
SMnode *MndTestTrans2::pMnode;
TEST_F(MndTestTrans2, 01_CbFunc) {
ASSERT_NE(pMnode, nullptr);
const char *user1 = "test1";
CreateUser(user1);
SUserObj *pUser1 = mndAcquireUser(pMnode, user1);
ASSERT_NE(pUser1, nullptr);
}
......@@ -155,7 +155,6 @@ static int32_t checkForCachedLast(STsdbReadHandle* pTsdbReadHandle);
static void changeQueryHandleForInterpQuery(tsdbReaderT pHandle);
static void doMergeTwoLevelData(STsdbReadHandle* pTsdbReadHandle, STableCheckInfo* pCheckInfo, SBlock* pBlock);
static int32_t binarySearchForKey(char* pValue, int num, TSKEY key, int order);
static int32_t tsdbReadRowsFromCache(STableCheckInfo* pCheckInfo, TSKEY maxKey, int maxRowsToRead, STimeWindow* win,
STsdbReadHandle* pTsdbReadHandle);
static int32_t tsdbCheckInfoCompar(const void* key1, const void* key2);
......@@ -1337,6 +1336,8 @@ static int32_t handleDataMergeIfNeeded(STsdbReadHandle* pTsdbReadHandle, SBlock*
return code;
}
static int32_t binarySearchForKey(char* pValue, int num, TSKEY key, int order);
static int32_t loadFileDataBlock(STsdbReadHandle* pTsdbReadHandle, SBlock* pBlock, STableCheckInfo* pCheckInfo,
bool* exists) {
SQueryFilePos* cur = &pTsdbReadHandle->cur;
......
......@@ -73,7 +73,7 @@ typedef struct SResKeyPos {
} SResKeyPos;
typedef struct SResultRowInfo {
SResultRowPosition *pPosition;
SResultRowPosition *pPosition; // todo remove this
int32_t size; // number of result set
int32_t capacity; // max capacity
SResultRowPosition cur;
......
......@@ -20,6 +20,12 @@
extern "C" {
#endif
typedef struct {
char* pData;
bool isNull;
int16_t type;
int32_t bytes;
} SGroupKeys, SStateKeys;
#ifdef __cplusplus
}
......
......@@ -40,6 +40,7 @@ extern "C" {
#include "tpagedbuf.h"
#include "vnode.h"
#include "executorInt.h"
typedef int32_t (*__block_search_fn_t)(char* data, int32_t num, int64_t key, int32_t order);
......@@ -118,6 +119,17 @@ typedef struct SLimit {
int64_t offset;
} SLimit;
typedef struct SFileBlockLoadRecorder {
uint64_t totalRows;
uint64_t totalCheckedRows;
uint32_t totalBlocks;
uint32_t loadBlocks;
uint32_t loadBlockStatis;
uint32_t skipBlocks;
uint32_t filterOutBlocks;
uint64_t elapsedTime;
} SFileBlockLoadRecorder;
typedef struct STaskCostInfo {
int64_t created;
int64_t start;
......@@ -131,14 +143,10 @@ typedef struct STaskCostInfo {
uint64_t loadDataInCacheSize;
uint64_t loadDataTime;
uint64_t totalRows;
uint64_t totalCheckedRows;
uint32_t totalBlocks;
uint32_t loadBlocks;
uint32_t loadBlockStatis;
uint32_t skipBlocks;
uint32_t filterOutBlocks;
SFileBlockLoadRecorder* pRecoder;
uint64_t elapsedTime;
uint64_t firstStageMergeTime;
uint64_t winInfoSize;
uint64_t tableInfoSize;
......@@ -196,7 +204,7 @@ typedef bool (*__optr_decode_fn_t)(struct SOperatorInfo* pOperator, struct SAggS
struct SOptrBasicInfo* pInfo, char* result, int32_t length);
typedef int32_t (*__optr_open_fn_t)(struct SOperatorInfo* pOptr);
typedef SSDataBlock* (*__optr_fn_t)(struct SOperatorInfo* pOptr, bool* newgroup);
typedef SSDataBlock* (*__optr_fn_t)(struct SOperatorInfo* pOptr);
typedef void (*__optr_close_fn_t)(void* param, int32_t num);
typedef int32_t (*__optr_get_explain_fn_t)(struct SOperatorInfo* pOptr, void** pOptrExplain);
......@@ -267,7 +275,7 @@ typedef struct SOperatorFpSet {
typedef struct SOperatorInfo {
uint8_t operatorType;
bool blockingOptr; // block operator or not
bool blocking; // block operator or not
uint8_t status; // denote if current operator is completed
int32_t numOfOutput; // number of columns of the current operator results
char* name; // name, used to show the query execution plan
......@@ -332,17 +340,14 @@ typedef struct SScanInfo {
typedef struct STableScanInfo {
void* dataReader;
int32_t numOfBlocks; // extract basic running information.
int32_t numOfSkipped;
int32_t numOfBlockStatis;
SFileBlockLoadRecorder readRecorder;
int64_t numOfRows;
int64_t elapsedTime;
int32_t prevGroupId; // previous table group id
// int32_t prevGroupId; // previous table group id
SScanInfo scanInfo;
int32_t current;
SNode* pFilterNode; // filter operator info
SqlFunctionCtx* pCtx; // next operator query context
int32_t scanTimes;
SNode* pFilterNode; // filter info, which is push down by optimizer
SqlFunctionCtx* pCtx; // which belongs to the direct upstream operator operator query context
SResultRowInfo* pResultRowInfo;
int32_t* rowCellInfoOffset;
SExprInfo* pExpr;
......@@ -396,7 +401,6 @@ typedef struct SSysTableScanInfo {
SArray* scanCols; // SArray<int16_t> scan column id list
SName name;
SSDataBlock* pRes;
int32_t capacity;
int64_t numOfBlocks; // extract basic running information.
SLoadRemoteDataInfo loadInfo;
} SSysTableScanInfo;
......@@ -424,7 +428,7 @@ typedef struct STimeWindowSupp {
SColumnInfoData timeWindowData; // query time window info for scalar function execution.
} STimeWindowAggSupp;
typedef struct STableIntervalOperatorInfo {
typedef struct SIntervalAggOperatorInfo {
SOptrBasicInfo binfo; // basic info
SGroupResInfo groupResInfo; // multiple results build supporter
SInterval interval; // interval info
......@@ -439,7 +443,7 @@ typedef struct STableIntervalOperatorInfo {
SArray* pUpdatedWindow; // updated time window due to the input data block from the downstream operator.
STimeWindowAggSupp twAggSup;
struct SFillInfo* pFillInfo; // fill info
} STableIntervalOperatorInfo;
} SIntervalAggOperatorInfo;
typedef struct SAggOperatorInfo {
SOptrBasicInfo binfo;
......@@ -478,16 +482,8 @@ typedef struct SFillOperatorInfo {
void** p;
SSDataBlock* existNewGroupBlock;
bool multigroupResult;
SInterval intervalInfo;
} SFillOperatorInfo;
typedef struct {
char* pData;
bool isNull;
int16_t type;
int32_t bytes;
} SGroupKeys, SStateKeys;
typedef struct SGroupbyOperatorInfo {
SOptrBasicInfo binfo;
SArray* pGroupCols; // group by columns, SArray<SColumn>
......@@ -540,6 +536,7 @@ typedef struct SSessionAggOperatorInfo {
SWindowRowsSup winSup;
bool reptScan; // next round scan
int64_t gap; // session window gap
int32_t tsSlotId; // primary timestamp slot id
STimeWindowAggSupp twAggSup;
} SSessionAggOperatorInfo;
......@@ -557,6 +554,7 @@ typedef struct SStateWindowOperatorInfo {
int32_t colIndex; // start row index
bool hasKey;
SStateKeys stateKey;
int32_t tsSlotId; // primary timestamp column slot id
STimeWindowAggSupp twAggSup;
// bool reptScan;
} SStateWindowOperatorInfo;
......@@ -613,6 +611,9 @@ typedef struct SJoinOperatorInfo {
SNode *pOnCondition;
} SJoinOperatorInfo;
#define OPTR_IS_OPENED(_optr) (((_optr)->status & OP_OPENED) == OP_OPENED)
#define OPTR_SET_OPENED(_optr) ((_optr)->status |= OP_OPENED)
SOperatorFpSet createOperatorFpSet(__optr_open_fn_t openFn, __optr_fn_t nextFn, __optr_fn_t streamFn,
__optr_fn_t cleanup, __optr_close_fn_t closeFn, __optr_encode_fn_t encode,
__optr_decode_fn_t decode, __optr_get_explain_fn_t explain);
......@@ -623,8 +624,8 @@ int32_t appendDownstream(SOperatorInfo* p, SOperatorInfo** pDownstream, int32_t
int32_t initAggInfo(SOptrBasicInfo* pBasicInfo, SAggSupporter* pAggSup, SExprInfo* pExprInfo, int32_t numOfCols,
SSDataBlock* pResultBlock, size_t keyBufSize, const char* pkey);
void initResultSizeInfo(SOperatorInfo* pOperator, int32_t numOfRows);
void doBuildResultDatablock(SSDataBlock* pBlock, SGroupResInfo* pGroupResInfo, SExprInfo* pExprInfo,
SDiskbasedBuf* pBuf, int32_t* rowCellOffset, SqlFunctionCtx* pCtx);
void doBuildResultDatablock(SOptrBasicInfo *pbInfo, SGroupResInfo* pGroupResInfo, SExprInfo* pExprInfo, SDiskbasedBuf* pBuf);
void finalizeMultiTupleQueryResult(SqlFunctionCtx* pCtx, int32_t numOfOutput, SDiskbasedBuf* pBuf,
SResultRowInfo* pResultRowInfo, int32_t* rowCellInfoOffset);
void doApplyFunctions(SqlFunctionCtx* pCtx, STimeWindow* pWin, SColumnInfoData* pTimeWindowData, int32_t offset,
......@@ -642,6 +643,16 @@ void doSetOperatorCompleted(SOperatorInfo* pOperator);
void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock);
SqlFunctionCtx* createSqlFunctionCtx(SExprInfo* pExprInfo, int32_t numOfOutput, int32_t** rowCellInfoOffset);
void relocateColumnData(SSDataBlock* pBlock, const SArray* pColMatchInfo, SArray* pCols);
void initExecTimeWindowInfo(SColumnInfoData* pColData, STimeWindow* pQueryWindow);
void cleanupAggSup(SAggSupporter* pAggSup);
void destroyBasicOperatorInfo(void* param, int32_t numOfOutput);
void setResultRowInitCtx(SResultRow* pResult, SqlFunctionCtx* pCtx, int32_t numOfOutput,
int32_t* rowCellInfoOffset);
SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pResultRowInfo,
char* pData, int16_t bytes, bool masterscan, uint64_t groupId,
SExecTaskInfo* pTaskInfo, bool isIntervalQuery, SAggSupporter* pSup);
SOperatorInfo* createExchangeOperatorInfo(const SNodeList* pSources, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo);
......@@ -663,10 +674,12 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo*
SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlotId,
STimeWindowAggSupp *pTwAggSupp, const STableGroupInfo* pTableGroupInfo, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlotId,
STimeWindowAggSupp *pTwAggSupp, const STableGroupInfo* pTableGroupInfo, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
SSDataBlock* pResBlock, int64_t gap, STimeWindowAggSupp *pTwAggSupp, SExecTaskInfo* pTaskInfo);
SSDataBlock* pResBlock, int64_t gap, int32_t tsSlotId, STimeWindowAggSupp* pTwAggSupp,
SExecTaskInfo* pTaskInfo);
SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
SSDataBlock* pResultBlock, SArray* pGroupColList, SNode* pCondition,
SExprInfo* pScalarExprInfo, int32_t numOfScalarExpr, SExecTaskInfo* pTaskInfo,
......@@ -676,14 +689,15 @@ SOperatorInfo* createStreamScanOperatorInfo(void* streamReadHandle, SSDataBlock*
SArray* pTableIdList, SExecTaskInfo* pTaskInfo, SNode* pConditions);
SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfCols,
SInterval* pInterval, SSDataBlock* pResBlock, int32_t fillType, char* fillVal,
SInterval* pInterval, STimeWindow* pWindow, SSDataBlock* pResBlock, int32_t fillType, SNodeListNode* fillVal,
bool multigroupResult, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfCols,
SSDataBlock* pResBlock, STimeWindowAggSupp *pTwAggSupp, SExecTaskInfo* pTaskInfo);
SSDataBlock* pResBlock, STimeWindowAggSupp *pTwAggSupp, int32_t tsSlotId, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
SSDataBlock* pResultBlock, SArray* pGroupColList, SExecTaskInfo* pTaskInfo,
const STableGroupInfo* pTableGroupInfo);
SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
SSDataBlock* pResultBlock, SExecTaskInfo* pTaskInfo);
......@@ -704,7 +718,7 @@ void setInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SSDataBlo
void finalizeQueryResult(SqlFunctionCtx* pCtx, int32_t numOfOutput);
void copyTsColoum(SSDataBlock* pRes, SqlFunctionCtx* pCtx, int32_t numOfOutput);
STableQueryInfo* createTableQueryInfo(void* buf, bool groupbyColumn, STimeWindow win);
STableQueryInfo* createTableQueryInfo(void* buf, STimeWindow win);
bool isTaskKilled(SExecTaskInfo* pTaskInfo);
int32_t checkForQueryBuf(size_t numOfTables);
......
......@@ -27,13 +27,12 @@ extern "C" {
struct SSDataBlock;
typedef struct SFillColInfo {
// STColumn col; // column info
SResSchema col;
int16_t functionId; // sql function id
SExprInfo *pExpr;
// SResSchema schema;
// int16_t functionId; // sql function id
int16_t flag; // column flag: TAG COLUMN|NORMAL COLUMN
int16_t tagIndex; // index of current tag in SFillTagColInfo array list
int32_t offset;
union {int64_t i; double d;} val;
SVariant fillVal;
} SFillColInfo;
typedef struct {
......@@ -56,9 +55,10 @@ typedef struct SFillInfo {
int32_t numOfCols; // number of columns, including the tags columns
int32_t rowSize; // size of each row
SInterval interval;
char * prevValues; // previous row of data, to generate the interpolation results
char * nextValues; // next row of data
char** pData; // original result data block involved in filling data
SArray *prev;
SArray *next;
SSDataBlock *pSrcBlock;
int32_t alloc; // data buffer size in rows
SFillColInfo* pFillCol; // column info for fill operations
......@@ -72,7 +72,7 @@ int64_t getNumOfResultsAfterFillGap(SFillInfo* pFillInfo, int64_t ekey, int32_t
void taosFillSetStartInfo(struct SFillInfo* pFillInfo, int32_t numOfRows, TSKEY endKey);
void taosResetFillInfo(struct SFillInfo* pFillInfo, TSKEY startTimestamp);
void taosFillSetInputDataBlock(struct SFillInfo* pFillInfo, const struct SSDataBlock* pInput);
struct SFillColInfo* createFillColInfo(SExprInfo* pExpr, int32_t numOfOutput, const struct SValueNode* val);
struct SFillColInfo* createFillColInfo(SExprInfo* pExpr, int32_t numOfOutput, const struct SNodeListNode* val);
bool taosFillHasMoreResults(struct SFillInfo* pFillInfo);
SFillInfo* taosCreateFillInfo(int32_t order, TSKEY skey, int32_t numOfTags, int32_t capacity, int32_t numOfCols,
......@@ -80,7 +80,7 @@ SFillInfo* taosCreateFillInfo(int32_t order, TSKEY skey, int32_t numOfTags, int3
struct SFillColInfo* pCol, const char* id);
void* taosDestroyFillInfo(struct SFillInfo *pFillInfo);
int64_t taosFillResultDataBlock(struct SFillInfo* pFillInfo, void** output, int32_t capacity);
int64_t taosFillResultDataBlock(struct SFillInfo* pFillInfo, SSDataBlock* p, int32_t capacity);
int64_t getFillInfoStart(struct SFillInfo *pFillInfo);
......
......@@ -154,14 +154,12 @@ int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t *useconds) {
qDebug("%s execTask is launched", GET_TASKID(pTaskInfo));
bool newgroup = false;
publishOperatorProfEvent(pTaskInfo->pRoot, QUERY_PROF_BEFORE_OPERATOR_EXEC);
int64_t st = 0;
st = taosGetTimestampUs();
*pRes = pTaskInfo->pRoot->fpSet.getNextFn(pTaskInfo->pRoot, &newgroup);
int64_t st = taosGetTimestampUs();
*pRes = pTaskInfo->pRoot->fpSet.getNextFn(pTaskInfo->pRoot);
uint64_t el = (taosGetTimestampUs() - st);
pTaskInfo->cost.elapsedTime += el;
publishOperatorProfEvent(pTaskInfo->pRoot, QUERY_PROF_AFTER_OPERATOR_EXEC);
......
......@@ -256,7 +256,7 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
}
}
static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator, bool* newgroup) {
static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator) {
if (pOperator->status == OP_EXEC_DONE) {
return NULL;
}
......@@ -265,7 +265,7 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator, bool* newgrou
SSDataBlock* pRes = pInfo->binfo.pRes;
if (pOperator->status == OP_RES_TO_RETURN) {
doBuildResultDatablock(pRes, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf, pInfo->binfo.rowCellInfoOffset, pInfo->binfo.pCtx);
doBuildResultDatablock(&pInfo->binfo, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf);
if (pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) {
pOperator->status = OP_EXEC_DONE;
}
......@@ -277,7 +277,7 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator, bool* newgrou
while (1) {
publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC);
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream, newgroup);
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
publishOperatorProfEvent(downstream, QUERY_PROF_AFTER_OPERATOR_EXEC);
if (pBlock == NULL) {
break;
......@@ -311,7 +311,7 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator, bool* newgrou
initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, false);
while(1) {
doBuildResultDatablock(pRes, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf, pInfo->binfo.rowCellInfoOffset, pInfo->binfo.pCtx);
doBuildResultDatablock(&pInfo->binfo, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf);
doFilter(pInfo->pCondition, pRes);
bool hasRemain = hasRemainDataInCurrentGroup(&pInfo->groupResInfo);
......@@ -353,7 +353,7 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pEx
initResultRowInfo(&pInfo->binfo.resultRowInfo, 8);
pOperator->name = "GroupbyAggOperator";
pOperator->blockingOptr = true;
pOperator->blocking = true;
pOperator->status = OP_NOT_OPENED;
// pOperator->operatorType = OP_Groupby;
pOperator->pExpr = pExprInfo;
......@@ -537,11 +537,12 @@ static SSDataBlock* buildPartitionResult(SOperatorInfo* pOperator) {
pInfo->pageIndex += 1;
blockDataUpdateTsWindow(pInfo->binfo.pRes);
pInfo->binfo.pRes->info.groupId = pGroupInfo->groupId;
return pInfo->binfo.pRes;
}
static SSDataBlock* hashPartition(SOperatorInfo* pOperator, bool* newgroup) {
static SSDataBlock* hashPartition(SOperatorInfo* pOperator) {
if (pOperator->status == OP_EXEC_DONE) {
return NULL;
}
......@@ -558,7 +559,7 @@ static SSDataBlock* hashPartition(SOperatorInfo* pOperator, bool* newgroup) {
while (1) {
publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC);
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream, newgroup);
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
publishOperatorProfEvent(downstream, QUERY_PROF_AFTER_OPERATOR_EXEC);
if (pBlock == NULL) {
break;
......@@ -611,7 +612,7 @@ SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SExprInfo*
}
pOperator->name = "PartitionOperator";
pOperator->blockingOptr = true;
pOperator->blocking = true;
pOperator->status = OP_NOT_OPENED;
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_PARTITION;
pInfo->binfo.pRes = pResultBlock;
......
......@@ -55,13 +55,27 @@ typedef struct SIFParam {
SArray *result;
char * condValue;
uint8_t colValType;
col_id_t colId;
int64_t suid; // add later
char dbName[TSDB_DB_NAME_LEN];
char colName[TSDB_COL_NAME_LEN];
} SIFParam;
static int32_t sifGetFuncFromSql(EOperatorType src, EIndexQueryType *dst) {
if (src == OP_TYPE_GREATER_THAN || src == OP_TYPE_GREATER_EQUAL || src == OP_TYPE_LOWER_THAN ||
src == OP_TYPE_LOWER_EQUAL) {
*dst = QUERY_RANGE;
} else if (src == OP_TYPE_EQUAL) {
*dst = QUERY_TERM;
} else if (src == OP_TYPE_LIKE || src == OP_TYPE_MATCH || src == OP_TYPE_NMATCH) {
*dst = QUERY_REGEX;
} else {
return TSDB_CODE_QRY_INVALID_INPUT;
}
return TSDB_CODE_SUCCESS;
}
typedef int32_t (*sif_func_t)(SIFParam *left, SIFParam *rigth, SIFParam *output);
// construct tag filter operator later
static void destroyTagFilterOperatorInfo(void *param) {
......@@ -145,10 +159,11 @@ static int32_t sifInitParam(SNode *node, SIFParam *param, SIFCtx *ctx) {
SColumnNode *cn = (SColumnNode *)node;
/*only support tag column*/
SIF_ERR_RET(sifValidateColumn(cn));
param->colId = cn->colId;
param->colValType = cn->node.resType.type;
memcpy(param->dbName, cn->dbName, sizeof(cn->dbName));
memcpy(param->colName, cn->colName, sizeof(cn->colName));
break;
}
case QUERY_NODE_NODE_LIST: {
......@@ -231,61 +246,76 @@ static int32_t sifExecFunction(SFunctionNode *node, SIFCtx *ctx, SIFParam *outpu
qError("index-filter not support buildin function");
return TSDB_CODE_QRY_INVALID_INPUT;
}
static int32_t sifIndex(SIFParam *left, SIFParam *right, int8_t operType, SIFParam *output) {
SIndexMultiTermQuery *mq = indexMultiTermQueryCreate(MUST);
return TSDB_CODE_SUCCESS;
static int32_t sifDoIndex(SIFParam *left, SIFParam *right, int8_t operType, SIFParam *output) {
SIndexTerm *tm = indexTermCreate(left->suid, DEFAULT, operType, left->colValType, left->colName,
strlen(left->colName), right->condValue, strlen(right->condValue));
if (operType == OP_TYPE_LOWER_EQUAL || operType == OP_TYPE_GREATER_EQUAL || operType == OP_TYPE_GREATER_THAN ||
operType == OP_TYPE_LOWER_THAN) {
}
if (tm == NULL) {
return TSDB_CODE_QRY_OUT_OF_MEMORY;
}
SIndexMultiTermQuery *mtm = indexMultiTermQueryCreate(MUST);
EIndexQueryType qtype = 0;
SIF_ERR_RET(sifGetFuncFromSql(operType, &qtype));
indexMultiTermQueryAdd(mtm, tm, qtype);
int ret = indexSearch(NULL, mtm, output->result);
indexMultiTermQueryDestroy(mtm);
return ret;
}
static int32_t sifLessThanFunc(SIFParam *left, SIFParam *right, SIFParam *output) {
int id = OP_TYPE_LOWER_THAN;
return sifIndex(left, right, id, output);
return sifDoIndex(left, right, id, output);
}
static int32_t sifLessEqualFunc(SIFParam *left, SIFParam *right, SIFParam *output) {
int id = OP_TYPE_LOWER_EQUAL;
return sifIndex(left, right, id, output);
return sifDoIndex(left, right, id, output);
}
static int32_t sifGreaterThanFunc(SIFParam *left, SIFParam *right, SIFParam *output) {
int id = OP_TYPE_GREATER_THAN;
return sifIndex(left, right, id, output);
return sifDoIndex(left, right, id, output);
}
static int32_t sifGreaterEqualFunc(SIFParam *left, SIFParam *right, SIFParam *output) {
int id = OP_TYPE_GREATER_EQUAL;
return sifIndex(left, right, id, output);
return sifDoIndex(left, right, id, output);
}
static int32_t sifEqualFunc(SIFParam *left, SIFParam *right, SIFParam *output) {
int id = OP_TYPE_EQUAL;
return sifIndex(left, right, id, output);
return sifDoIndex(left, right, id, output);
}
static int32_t sifNotEqualFunc(SIFParam *left, SIFParam *right, SIFParam *output) {
int id = OP_TYPE_NOT_EQUAL;
return sifIndex(left, right, id, output);
return sifDoIndex(left, right, id, output);
}
static int32_t sifInFunc(SIFParam *left, SIFParam *right, SIFParam *output) {
int id = OP_TYPE_IN;
return sifIndex(left, right, id, output);
return sifDoIndex(left, right, id, output);
}
static int32_t sifNotInFunc(SIFParam *left, SIFParam *right, SIFParam *output) {
int id = OP_TYPE_NOT_IN;
return sifIndex(left, right, id, output);
return sifDoIndex(left, right, id, output);
}
static int32_t sifLikeFunc(SIFParam *left, SIFParam *right, SIFParam *output) {
int id = OP_TYPE_LIKE;
return sifIndex(left, right, id, output);
return sifDoIndex(left, right, id, output);
}
static int32_t sifNotLikeFunc(SIFParam *left, SIFParam *right, SIFParam *output) {
int id = OP_TYPE_NOT_LIKE;
return sifIndex(left, right, id, output);
return sifDoIndex(left, right, id, output);
}
static int32_t sifMatchFunc(SIFParam *left, SIFParam *right, SIFParam *output) {
int id = OP_TYPE_MATCH;
return sifIndex(left, right, id, output);
return sifDoIndex(left, right, id, output);
}
static int32_t sifNotMatchFunc(SIFParam *left, SIFParam *right, SIFParam *output) {
int id = OP_TYPE_NMATCH;
return sifIndex(left, right, id, output);
return sifDoIndex(left, right, id, output);
}
static int32_t sifDefaultFunc(SIFParam *left, SIFParam *right, SIFParam *output) {
// add more except
......@@ -460,6 +490,7 @@ static int32_t sifCalculate(SNode *pNode, SIFParam *pDst) {
qError("index-filter failed to taosHashInit");
return TSDB_CODE_QRY_OUT_OF_MEMORY;
}
nodesWalkExprPostOrder(pNode, sifCalcWalker, &ctx);
SIF_ERR_RET(ctx.code);
......@@ -498,6 +529,7 @@ SIdxFltStatus idxGetFltStatus(SNode *pFilterNode) {
if (pFilterNode == NULL) {
return SFLT_NOT_INDEX;
}
// impl later
return SFLT_ACCURATE_INDEX;
}
此差异已折叠。
此差异已折叠。
......@@ -55,7 +55,7 @@ typedef struct SDummyInputInfo {
SSDataBlock* pBlock;
} SDummyInputInfo;
SSDataBlock* getDummyBlock(SOperatorInfo* pOperator, bool* newgroup) {
SSDataBlock* getDummyBlock(SOperatorInfo* pOperator) {
SDummyInputInfo* pInfo = static_cast<SDummyInputInfo*>(pOperator->info);
if (pInfo->current >= pInfo->totalPages) {
return NULL;
......@@ -121,7 +121,7 @@ SSDataBlock* getDummyBlock(SOperatorInfo* pOperator, bool* newgroup) {
return pBlock;
}
SSDataBlock* get2ColsDummyBlock(SOperatorInfo* pOperator, bool* newgroup) {
SSDataBlock* get2ColsDummyBlock(SOperatorInfo* pOperator) {
SDummyInputInfo* pInfo = static_cast<SDummyInputInfo*>(pOperator->info);
if (pInfo->current >= pInfo->totalPages) {
return NULL;
......
......@@ -78,6 +78,8 @@ typedef struct SDiffInfo {
int64_t i64;
double d64;
} prev;
int64_t prevTs;
} SDiffInfo;
typedef struct SSpreadInfo {
......@@ -1196,9 +1198,6 @@ int32_t diffFunction(SqlFunctionCtx* pCtx) {
bool isFirstBlock = (pDiffInfo->hasPrev == false);
int32_t numOfElems = 0;
int32_t step = GET_FORWARD_DIRECTION_FACTOR(pCtx->order);
// int32_t i = (pCtx->order == TSDB_ORDER_ASC) ? 0 : pCtx->size - 1;
SColumnInfoData* pTsOutput = pCtx->pTsOutput;
TSKEY* tsList = (int64_t*)pInput->pPTS->pData;
......@@ -1206,7 +1205,8 @@ int32_t diffFunction(SqlFunctionCtx* pCtx) {
switch (pInputCol->info.type) {
case TSDB_DATA_TYPE_INT: {
SColumnInfoData* pOutput = (SColumnInfoData*)pCtx->pOutput;
for (int32_t i = pInput->startRowIndex; i < pInput->numOfRows + pInput->startRowIndex; i += step) {
if (pCtx->order == TSDB_ORDER_ASC) {
for (int32_t i = pInput->startRowIndex; i < pInput->numOfRows + pInput->startRowIndex; i += 1) {
int32_t pos = startOffset + (isFirstBlock ? (numOfElems - 1) : numOfElems);
if (colDataIsNull_f(pInputCol->nullbitmap, i)) {
if (pDiffInfo->includeNull) {
......@@ -1238,12 +1238,53 @@ int32_t diffFunction(SqlFunctionCtx* pCtx) {
pDiffInfo->hasPrev = true;
numOfElems++;
}
} else {
for (int32_t i = pInput->startRowIndex; i < pInput->numOfRows + pInput->startRowIndex; i += 1) {
int32_t v = *(int32_t*)colDataGetData(pInputCol, i);
int32_t pos = startOffset + numOfElems;
// there is a row of previous data block to be handled in the first place.
if (pDiffInfo->hasPrev) {
int32_t delta = (int32_t)(pDiffInfo->prev.i64 - v); // direct previous may be null
if (delta < 0 && pDiffInfo->ignoreNegative) {
colDataSetNull_f(pOutput->nullbitmap, pos);
} else {
colDataAppendInt32(pOutput, pos, &delta);
}
if (pTsOutput != NULL) {
colDataAppendInt64(pTsOutput, pos, &pDiffInfo->prevTs);
}
pDiffInfo->hasPrev = false;
}
// it is not the last row of current block
if (i < pInput->numOfRows + pInput->startRowIndex - 1) {
int32_t next = *(int32_t*)colDataGetData(pInputCol, i + 1);
int32_t delta = v - next; // direct previous may be null
colDataAppendInt32(pOutput, pos, &delta);
if (pTsOutput != NULL) {
colDataAppendInt64(pTsOutput, pos, &tsList[i]);
}
} else {
pDiffInfo->prev.i64 = v;
if (pTsOutput != NULL) {
pDiffInfo->prevTs = tsList[i];
}
pDiffInfo->hasPrev = true;
}
numOfElems++;
}
}
break;
}
case TSDB_DATA_TYPE_BIGINT: {
SColumnInfoData* pOutput = (SColumnInfoData*)pCtx->pOutput;
for (int32_t i = pInput->startRowIndex; i < pInput->numOfRows + pInput->startRowIndex; i += step) {
for (int32_t i = pInput->startRowIndex; i < pInput->numOfRows + pInput->startRowIndex; i += 1) {
if (colDataIsNull_f(pInputCol->nullbitmap, i)) {
continue;
}
......@@ -1378,7 +1419,7 @@ int32_t diffFunction(SqlFunctionCtx* pCtx) {
}
// initial value is not set yet
if (!pDiffInfo->hasPrev || numOfElems <= 0) {
if (numOfElems <= 0) {
/*
* 1. current block and blocks before are full of null
* 2. current block may be null value
......@@ -1386,15 +1427,7 @@ int32_t diffFunction(SqlFunctionCtx* pCtx) {
assert(pCtx->hasNull);
return 0;
} else {
// for (int t = 0; t < pCtx->tagInfo.numOfTagCols; ++t) {
// SqlFunctionCtx* tagCtx = pCtx->tagInfo.pTagCtxList[t];
// if (tagCtx->functionId == TSDB_FUNC_TAG_DUMMY) {
// aAggs[TSDB_FUNC_TAGPRJ].xFunction(tagCtx);
// }
// }
int32_t forwardStep = (isFirstBlock) ? numOfElems - 1 : numOfElems;
return forwardStep;
return (isFirstBlock) ? numOfElems - 1 : numOfElems;
}
}
......
......@@ -86,6 +86,7 @@ typedef struct SIndexTerm {
int32_t nColName;
char* colVal;
int32_t nColVal;
int8_t qType; // just use for range
} SIndexTerm;
typedef struct SIndexTermQuery {
......
......@@ -175,55 +175,19 @@ int indexPut(SIndex* index, SIndexMultiTerm* fVals, uint64_t uid) {
return 0;
}
int indexSearch(SIndex* index, SIndexMultiTermQuery* multiQuerys, SArray* result) {
#ifdef USE_LUCENE
EIndexOperatorType opera = multiQuerys->opera;
int nQuery = taosArrayGetSize(multiQuerys->query);
char** fields = taosMemoryMalloc(sizeof(char*) * nQuery);
char** keys = taosMemoryMalloc(sizeof(char*) * nQuery);
int* types = taosMemoryMalloc(sizeof(int) * nQuery);
for (int i = 0; i < nQuery; i++) {
SIndexTermQuery* p = taosArrayGet(multiQuerys->query, i);
SIndexTerm* term = p->field_value;
fields[i] = taosMemoryCalloc(1, term->nKey + 1);
keys[i] = taosMemoryCalloc(1, term->nVal + 1);
memcpy(fields[i], term->key, term->nKey);
memcpy(keys[i], term->val, term->nVal);
types[i] = (int)(p->type);
}
int* tResult = NULL;
int tsz = 0;
index_multi_search(index->index, (const char**)fields, (const char**)keys, types, nQuery, opera, &tResult, &tsz);
for (int i = 0; i < tsz; i++) {
taosArrayPush(result, &tResult[i]);
}
for (int i = 0; i < nQuery; i++) {
taosMemoryFree(fields[i]);
taosMemoryFree(keys[i]);
}
taosMemoryFree(fields);
taosMemoryFree(keys);
taosMemoryFree(types);
#endif
#ifdef USE_INVERTED_INDEX
EIndexOperatorType opera = multiQuerys->opera; // relation of querys
SArray* interResults = taosArrayInit(4, POINTER_BYTES);
SArray* iRslts = taosArrayInit(4, POINTER_BYTES);
int nQuery = taosArrayGetSize(multiQuerys->query);
for (size_t i = 0; i < nQuery; i++) {
SIndexTermQuery* qTerm = taosArrayGet(multiQuerys->query, i);
SArray* tResult = NULL;
indexTermSearch(index, qTerm, &tResult);
taosArrayPush(interResults, (void*)&tResult);
SIndexTermQuery* qterm = taosArrayGet(multiQuerys->query, i);
SArray* trslt = NULL;
indexTermSearch(index, qterm, &trslt);
taosArrayPush(iRslts, (void*)&trslt);
}
indexMergeFinalResults(interResults, opera, result);
indexInterResultsDestroy(interResults);
indexMergeFinalResults(iRslts, opera, result);
indexInterResultsDestroy(iRslts);
#endif
return 0;
......@@ -280,8 +244,8 @@ int indexMultiTermQueryAdd(SIndexMultiTermQuery* pQuery, SIndexTerm* term, EInde
return 0;
}
SIndexTerm* indexTermCreate(int64_t suid, SIndexOperOnColumn oper, uint8_t colType, const char* colName,
int32_t nColName, const char* colVal, int32_t nColVal) {
SIndexTerm* indexTermCreate(int64_t suid, SIndexOperOnColumn oper, int8_t queryType, uint8_t colType,
const char* colName, int32_t nColName, const char* colVal, int32_t nColVal) {
SIndexTerm* tm = (SIndexTerm*)taosMemoryCalloc(1, (sizeof(SIndexTerm)));
if (tm == NULL) {
return NULL;
......@@ -298,6 +262,7 @@ SIndexTerm* indexTermCreate(int64_t suid, SIndexOperOnColumn oper, uint8_t colTy
tm->colVal = (char*)taosMemoryCalloc(1, nColVal + 1);
memcpy(tm->colVal, colVal, nColVal);
tm->nColVal = nColVal;
tm->qType = queryType;
return tm;
}
......
......@@ -34,9 +34,64 @@ static char* indexCacheTermGet(const void* pData);
static MemTable* indexInternalCacheCreate(int8_t type);
static int32_t cacheSearchTerm(void* cache, CacheTerm* ct, SIdxTempResult* tr, STermValueType* s);
static int32_t cacheSearchPrefix(void* cache, CacheTerm* ct, SIdxTempResult* tr, STermValueType* s);
static int32_t cacheSearchSuffix(void* cache, CacheTerm* ct, SIdxTempResult* tr, STermValueType* s);
static int32_t cacheSearchRegex(void* cache, CacheTerm* ct, SIdxTempResult* tr, STermValueType* s);
static int32_t cacheSearchRange(void* cache, CacheTerm* ct, SIdxTempResult* tr, STermValueType* s);
static int32_t (*cacheSearch[])(void* cache, CacheTerm* ct, SIdxTempResult* tr, STermValueType* s) = {
cacheSearchTerm, cacheSearchPrefix, cacheSearchSuffix, cacheSearchRegex, cacheSearchRange};
static void doMergeWork(SSchedMsg* msg);
static bool indexCacheIteratorNext(Iterate* itera);
static int32_t cacheSearchTerm(void* cache, CacheTerm* ct, SIdxTempResult* tr, STermValueType* s) {
if (cache == NULL) {
return 0;
}
MemTable* mem = cache;
char* key = indexCacheTermGet(ct);
SSkipListIterator* iter = tSkipListCreateIterFromVal(mem->mem, key, TSDB_DATA_TYPE_BINARY, TSDB_ORDER_ASC);
while (tSkipListIterNext(iter)) {
SSkipListNode* node = tSkipListIterGet(iter);
if (node == NULL) {
break;
}
CacheTerm* c = (CacheTerm*)SL_GET_NODE_DATA(node);
if (0 == strcmp(c->colVal, ct->colVal)) {
if (c->operaType == ADD_VALUE) {
INDEX_MERGE_ADD_DEL(tr->deled, tr->added, c->uid)
// taosArrayPush(result, &c->uid);
*s = kTypeValue;
} else if (c->operaType == DEL_VALUE) {
INDEX_MERGE_ADD_DEL(tr->added, tr->deled, c->uid)
}
} else {
break;
}
}
tSkipListDestroyIter(iter);
return 0;
}
static int32_t cacheSearchPrefix(void* cache, CacheTerm* ct, SIdxTempResult* tr, STermValueType* s) {
// impl later
return 0;
}
static int32_t cacheSearchSuffix(void* cache, CacheTerm* ct, SIdxTempResult* tr, STermValueType* s) {
// impl later
return 0;
}
static int32_t cacheSearchRegex(void* cache, CacheTerm* ct, SIdxTempResult* tr, STermValueType* s) {
// impl later
return 0;
}
static int32_t cacheSearchRange(void* cache, CacheTerm* ct, SIdxTempResult* tr, STermValueType* s) {
// impl later
return 0;
}
static IterateValue* indexCacheIteratorGetValue(Iterate* iter);
IndexCache* indexCacheCreate(SIndex* idx, uint64_t suid, const char* colName, int8_t type) {
......@@ -263,33 +318,7 @@ static int indexQueryMem(MemTable* mem, CacheTerm* ct, EIndexQueryType qtype, SI
if (mem == NULL) {
return 0;
}
char* key = indexCacheTermGet(ct);
SSkipListIterator* iter = tSkipListCreateIterFromVal(mem->mem, key, TSDB_DATA_TYPE_BINARY, TSDB_ORDER_ASC);
while (tSkipListIterNext(iter)) {
SSkipListNode* node = tSkipListIterGet(iter);
if (node != NULL) {
CacheTerm* c = (CacheTerm*)SL_GET_NODE_DATA(node);
if (qtype == QUERY_TERM) {
if (0 == strcmp(c->colVal, ct->colVal)) {
if (c->operaType == ADD_VALUE) {
INDEX_MERGE_ADD_DEL(tr->deled, tr->added, c->uid)
// taosArrayPush(result, &c->uid);
*s = kTypeValue;
} else if (c->operaType == DEL_VALUE) {
INDEX_MERGE_ADD_DEL(tr->added, tr->deled, c->uid)
}
} else {
break;
}
} else if (qtype == QUERY_PREFIX) {
} else if (qtype == QUERY_SUFFIX) {
} else if (qtype == QUERY_RANGE) {
}
}
}
tSkipListDestroyIter(iter);
return 0;
return cacheSearch[qtype](mem, ct, tr, s);
}
int indexCacheSearch(void* cache, SIndexTermQuery* query, SIdxTempResult* result, STermValueType* s) {
int64_t st = taosGetTimestampUs();
......
此差异已折叠。
......@@ -40,7 +40,7 @@ TEST_F(JsonEnv, testWrite) {
{
std::string colName("test");
std::string colVal("ab");
SIndexTerm* term = indexTermCreate(1, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
SIndexTerm* term = indexTermCreate(1, ADD_VALUE, 0, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
colVal.c_str(), colVal.size());
SIndexMultiTerm* terms = indexMultiTermCreate();
......@@ -53,7 +53,7 @@ TEST_F(JsonEnv, testWrite) {
{
std::string colName("voltage");
std::string colVal("ab1");
SIndexTerm* term = indexTermCreate(1, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
SIndexTerm* term = indexTermCreate(1, ADD_VALUE, 0, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
colVal.c_str(), colVal.size());
SIndexMultiTerm* terms = indexMultiTermCreate();
......@@ -66,7 +66,7 @@ TEST_F(JsonEnv, testWrite) {
{
std::string colName("voltage");
std::string colVal("123");
SIndexTerm* term = indexTermCreate(1, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
SIndexTerm* term = indexTermCreate(1, ADD_VALUE, 0, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
colVal.c_str(), colVal.size());
SIndexMultiTerm* terms = indexMultiTermCreate();
......@@ -81,7 +81,7 @@ TEST_F(JsonEnv, testWrite) {
std::string colVal("ab");
SIndexMultiTermQuery* mq = indexMultiTermQueryCreate(MUST);
SIndexTerm* q = indexTermCreate(1, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
SIndexTerm* q = indexTermCreate(1, ADD_VALUE, 0, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
colVal.c_str(), colVal.size());
SArray* result = taosArrayInit(1, sizeof(uint64_t));
......@@ -95,7 +95,7 @@ TEST_F(JsonEnv, testWriteMillonData) {
{
std::string colName("test");
std::string colVal("ab");
SIndexTerm* term = indexTermCreate(1, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
SIndexTerm* term = indexTermCreate(1, ADD_VALUE, 0, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
colVal.c_str(), colVal.size());
SIndexMultiTerm* terms = indexMultiTermCreate();
......@@ -110,7 +110,7 @@ TEST_F(JsonEnv, testWriteMillonData) {
std::string colVal("abxxxxxxxxxxxx");
for (int i = 0; i < 1000; i++) {
colVal[i % colVal.size()] = '0' + i % 128;
SIndexTerm* term = indexTermCreate(1, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
SIndexTerm* term = indexTermCreate(1, ADD_VALUE, 0, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
colVal.c_str(), colVal.size());
SIndexMultiTerm* terms = indexMultiTermCreate();
......@@ -124,7 +124,7 @@ TEST_F(JsonEnv, testWriteMillonData) {
{
std::string colName("voltagefdadfa");
std::string colVal("abxxxxxxxxxxxx");
SIndexTerm* term = indexTermCreate(1, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
SIndexTerm* term = indexTermCreate(1, ADD_VALUE, 0, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
colVal.c_str(), colVal.size());
SIndexMultiTerm* terms = indexMultiTermCreate();
......@@ -139,7 +139,7 @@ TEST_F(JsonEnv, testWriteMillonData) {
std::string colVal("ab");
SIndexMultiTermQuery* mq = indexMultiTermQueryCreate(MUST);
SIndexTerm* q = indexTermCreate(1, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
SIndexTerm* q = indexTermCreate(1, ADD_VALUE, 0, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
colVal.c_str(), colVal.size());
SArray* result = taosArrayInit(1, sizeof(uint64_t));
......
......@@ -1521,7 +1521,7 @@ static int32_t physiFillNodeToJson(const void* pObj, SJson* pJson) {
static int32_t jsonToPhysiFillNode(const SJson* pJson, void* pObj) {
SFillPhysiNode* pNode = (SFillPhysiNode*)pObj;
int32_t code = jsonToPhysiWindowNode(pJson, pObj);
int32_t code = jsonToPhysicPlanNode(pJson, pObj);
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetNumberValue(pJson, jkFillPhysiPlanMode, pNode->mode);
}
......
......@@ -945,7 +945,8 @@ int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, int8_t taskType, int8_t ex
code = qStringToSubplan(qwMsg->msg, &plan);
if (TSDB_CODE_SUCCESS != code) {
QW_TASK_ELOG("task string to subplan failed, code:%x - %s", code, tstrerror(code));
code = TSDB_CODE_INVALID_MSG;
QW_TASK_ELOG("task physical plan to subplan failed, code:%x - %s", code, tstrerror(code));
QW_ERR_JRET(code);
}
......
......@@ -271,7 +271,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_FUNC_RETRIEVE, "Invalid func retrieve
TAOS_DEFINE_ERROR(TSDB_CODE_MND_TRANS_ALREADY_EXIST, "Transaction already exists")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_TRANS_NOT_EXIST, "Transaction not exists")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_TRANS_INVALID_STAGE, "Invalid stage to kill")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_TRANS_CANT_PARALLEL, "Invalid stage to kill")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_TRANS_CAN_NOT_PARALLEL, "Conflicting transaction not completed")
// mnode-mq
TAOS_DEFINE_ERROR(TSDB_CODE_MND_TOPIC_ALREADY_EXIST, "Topic already exists")
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册