提交 d7f7e327 编写于 作者: H Hongze Cheng

Merge branch '3.0' of https://github.com/taosdata/TDengine into feature/vnode

...@@ -805,19 +805,19 @@ typedef struct { ...@@ -805,19 +805,19 @@ typedef struct {
int8_t replica; int8_t replica;
int8_t selfIndex; int8_t selfIndex;
SReplica replicas[TSDB_MAX_REPLICA]; SReplica replicas[TSDB_MAX_REPLICA];
} SCreateVnodeMsg, SAlterVnodeMsg; } SCreateVnodeReq, SAlterVnodeReq;
typedef struct { typedef struct {
int32_t vgId; int32_t vgId;
int32_t dnodeId; int32_t dnodeId;
char db[TSDB_DB_FNAME_LEN];
uint64_t dbUid; uint64_t dbUid;
} SDropVnodeMsg, SSyncVnodeMsg, SCompactVnodeMsg; char db[TSDB_DB_FNAME_LEN];
} SDropVnodeReq, SSyncVnodeReq, SCompactVnodeReq;
typedef struct { typedef struct {
int32_t vgId; int32_t vgId;
int8_t accessState; int8_t accessState;
} SAuthVnodeMsg; } SAuthVnodeReq;
typedef struct { typedef struct {
SMsgHead header; SMsgHead header;
......
...@@ -32,6 +32,8 @@ extern "C" { ...@@ -32,6 +32,8 @@ extern "C" {
/* ------------------------ TYPES EXPOSED ------------------------ */ /* ------------------------ TYPES EXPOSED ------------------------ */
typedef struct SVnode SVnode; typedef struct SVnode SVnode;
typedef struct SVnodeCfg { typedef struct SVnodeCfg {
int32_t vgId;
/** vnode buffer pool options */ /** vnode buffer pool options */
struct { struct {
/** write buffer size */ /** write buffer size */
......
...@@ -20,16 +20,16 @@ ...@@ -20,16 +20,16 @@
extern "C" { extern "C" {
#endif #endif
typedef void* qinfo_t; typedef void* qTaskInfo_t;
/** /**
* create the qinfo object according to QueryTableMsg * create the qinfo object according to QueryTableMsg
* @param tsdb * @param tsdb
* @param pQueryTableMsg * @param pQueryTableMsg
* @param qinfo * @param pTaskInfo
* @return * @return
*/ */
int32_t qCreateQueryInfo(void* tsdb, int32_t vgId, SQueryTableInfo* pQueryTableMsg, qinfo_t* qinfo, uint64_t qId); int32_t qCreateTask(void* tsdb, int32_t vgId, void* pQueryTableMsg, qTaskInfo_t* pTaskInfo, uint64_t qId);
/** /**
* the main query execution function, including query on both table and multiple tables, * the main query execution function, including query on both table and multiple tables,
...@@ -38,7 +38,7 @@ int32_t qCreateQueryInfo(void* tsdb, int32_t vgId, SQueryTableInfo* pQueryTableM ...@@ -38,7 +38,7 @@ int32_t qCreateQueryInfo(void* tsdb, int32_t vgId, SQueryTableInfo* pQueryTableM
* @param qinfo * @param qinfo
* @return * @return
*/ */
bool qTableQuery(qinfo_t qinfo, uint64_t *qId); bool qExecTask(qTaskInfo_t qinfo, uint64_t *qId);
/** /**
* Retrieve the produced results information, if current query is not paused or completed, * Retrieve the produced results information, if current query is not paused or completed,
...@@ -48,7 +48,7 @@ bool qTableQuery(qinfo_t qinfo, uint64_t *qId); ...@@ -48,7 +48,7 @@ bool qTableQuery(qinfo_t qinfo, uint64_t *qId);
* @param qinfo * @param qinfo
* @return * @return
*/ */
int32_t qRetrieveQueryResultInfo(qinfo_t qinfo, bool* buildRes, void* pRspContext); int32_t qRetrieveQueryResultInfo(qTaskInfo_t qinfo, bool* buildRes, void* pRspContext);
/** /**
* *
...@@ -60,41 +60,41 @@ int32_t qRetrieveQueryResultInfo(qinfo_t qinfo, bool* buildRes, void* pRspContex ...@@ -60,41 +60,41 @@ int32_t qRetrieveQueryResultInfo(qinfo_t qinfo, bool* buildRes, void* pRspContex
* @param contLen payload length * @param contLen payload length
* @return * @return
*/ */
int32_t qDumpRetrieveResult(qinfo_t qinfo, SRetrieveTableRsp** pRsp, int32_t* contLen, bool* continueExec); int32_t qDumpRetrieveResult(qTaskInfo_t qinfo, SRetrieveTableRsp** pRsp, int32_t* contLen, bool* continueExec);
/** /**
* return the transporter context (RPC) * return the transporter context (RPC)
* @param qinfo * @param qinfo
* @return * @return
*/ */
void* qGetResultRetrieveMsg(qinfo_t qinfo); void* qGetResultRetrieveMsg(qTaskInfo_t qinfo);
/** /**
* kill the ongoing query and free the query handle and corresponding resources automatically * kill the ongoing query and free the query handle and corresponding resources automatically
* @param qinfo qhandle * @param qinfo qhandle
* @return * @return
*/ */
int32_t qKillQuery(qinfo_t qinfo); int32_t qKillTask(qTaskInfo_t qinfo);
/** /**
* return whether query is completed or not * return whether query is completed or not
* @param qinfo * @param qinfo
* @return * @return
*/ */
int32_t qIsQueryCompleted(qinfo_t qinfo); int32_t qIsQueryCompleted(qTaskInfo_t qinfo);
/** /**
* destroy query info structure * destroy query info structure
* @param qHandle * @param qHandle
*/ */
void qDestroyQueryInfo(qinfo_t qHandle); void qDestroyTask(qTaskInfo_t qHandle);
/** /**
* Get the queried table uid * Get the queried table uid
* @param qHandle * @param qHandle
* @return * @return
*/ */
int64_t qGetQueriedTableUid(qinfo_t qHandle); int64_t qGetQueriedTableUid(qTaskInfo_t qHandle);
/** /**
* Extract the qualified table id list, and than pass them to the TSDB driver to load the required table data blocks. * Extract the qualified table id list, and than pass them to the TSDB driver to load the required table data blocks.
...@@ -121,7 +121,7 @@ int32_t qCreateTableGroupByGroupExpr(SArray* pTableIdList, TSKEY skey, STableGro ...@@ -121,7 +121,7 @@ int32_t qCreateTableGroupByGroupExpr(SArray* pTableIdList, TSKEY skey, STableGro
* @param type operation type: ADD|DROP * @param type operation type: ADD|DROP
* @return * @return
*/ */
int32_t qUpdateQueriedTableIdList(qinfo_t qinfo, int64_t uid, int32_t type); int32_t qUpdateQueriedTableIdList(qTaskInfo_t qinfo, int64_t uid, int32_t type);
//================================================================================================ //================================================================================================
// query handle management // query handle management
...@@ -130,13 +130,13 @@ int32_t qUpdateQueriedTableIdList(qinfo_t qinfo, int64_t uid, int32_t type); ...@@ -130,13 +130,13 @@ int32_t qUpdateQueriedTableIdList(qinfo_t qinfo, int64_t uid, int32_t type);
* @param vgId * @param vgId
* @return * @return
*/ */
void* qOpenQueryMgmt(int32_t vgId); void* qOpenTaskMgmt(int32_t vgId);
/** /**
* broadcast the close information and wait for all query stop. * broadcast the close information and wait for all query stop.
* @param pExecutor * @param pExecutor
*/ */
void qQueryMgmtNotifyClosed(void* pExecutor); void qTaskMgmtNotifyClosing(void* pExecutor);
/** /**
* Re-open the query handle management module when opening the vnode again. * Re-open the query handle management module when opening the vnode again.
...@@ -148,7 +148,7 @@ void qQueryMgmtReOpen(void *pExecutor); ...@@ -148,7 +148,7 @@ void qQueryMgmtReOpen(void *pExecutor);
* Close query mgmt and clean up resources. * Close query mgmt and clean up resources.
* @param pExecutor * @param pExecutor
*/ */
void qCleanupQueryMgmt(void* pExecutor); void qCleanupTaskMgmt(void* pExecutor);
/** /**
* Add the query into the query mgmt object * Add the query into the query mgmt object
...@@ -157,7 +157,7 @@ void qCleanupQueryMgmt(void* pExecutor); ...@@ -157,7 +157,7 @@ void qCleanupQueryMgmt(void* pExecutor);
* @param qInfo * @param qInfo
* @return * @return
*/ */
void** qRegisterQInfo(void* pMgmt, uint64_t qId, void *qInfo); void** qRegisterTask(void* pMgmt, uint64_t qId, void *qInfo);
/** /**
* acquire the query handle according to the key from query mgmt object. * acquire the query handle according to the key from query mgmt object.
...@@ -165,7 +165,7 @@ void** qRegisterQInfo(void* pMgmt, uint64_t qId, void *qInfo); ...@@ -165,7 +165,7 @@ void** qRegisterQInfo(void* pMgmt, uint64_t qId, void *qInfo);
* @param key * @param key
* @return * @return
*/ */
void** qAcquireQInfo(void* pMgmt, uint64_t key); void** qAcquireTask(void* pMgmt, uint64_t key);
/** /**
* release the query handle and decrease the reference count in cache * release the query handle and decrease the reference count in cache
...@@ -174,7 +174,7 @@ void** qAcquireQInfo(void* pMgmt, uint64_t key); ...@@ -174,7 +174,7 @@ void** qAcquireQInfo(void* pMgmt, uint64_t key);
* @param freeHandle * @param freeHandle
* @return * @return
*/ */
void** qReleaseQInfo(void* pMgmt, void* pQInfo); void** qReleaseTask(void* pMgmt, void* pQInfo, bool freeHandle);
/** /**
* De-register the query handle from the management module and free it immediately. * De-register the query handle from the management module and free it immediately.
......
...@@ -89,7 +89,7 @@ enum { ...@@ -89,7 +89,7 @@ enum {
}; };
enum { enum {
MASTER_SCAN = 0x0u, MAIN_SCAN = 0x0u,
REVERSE_SCAN = 0x1u, REVERSE_SCAN = 0x1u,
REPEAT_SCAN = 0x2u, //repeat scan belongs to the master scan REPEAT_SCAN = 0x2u, //repeat scan belongs to the master scan
MERGE_STAGE = 0x20u, MERGE_STAGE = 0x20u,
...@@ -183,7 +183,6 @@ typedef struct tExprNode { ...@@ -183,7 +183,6 @@ typedef struct tExprNode {
struct {// function node struct {// function node
char functionName[FUNCTIONS_NAME_MAX_LENGTH]; char functionName[FUNCTIONS_NAME_MAX_LENGTH];
// int32_t functionId;
int32_t num; int32_t num;
// Note that the attribute of pChild is not the parameter of function, it is the columns that involved in the // Note that the attribute of pChild is not the parameter of function, it is the columns that involved in the
......
...@@ -155,7 +155,7 @@ int32_t qCreateQueryDag(const struct SQueryNode* pQueryInfo, struct SQueryDag** ...@@ -155,7 +155,7 @@ int32_t qCreateQueryDag(const struct SQueryNode* pQueryInfo, struct SQueryDag**
// @subplan subplan to be schedule // @subplan subplan to be schedule
// @templateId templateId of a group of datasource subplans of this @subplan // @templateId templateId of a group of datasource subplans of this @subplan
// @ep one execution location of this group of datasource subplans // @ep one execution location of this group of datasource subplans
int32_t qSetSubplanExecutionNode(SSubplan* subplan, uint64_t templateId, SQueryNodeAddr* ep); void qSetSubplanExecutionNode(SSubplan* subplan, uint64_t templateId, SQueryNodeAddr* ep);
int32_t qExplainQuery(const struct SQueryNode* pQueryInfo, struct SEpSet* pQnode, char** str); int32_t qExplainQuery(const struct SQueryNode* pQueryInfo, struct SEpSet* pQnode, char** str);
......
...@@ -277,9 +277,12 @@ int32_t* taosGetErrno(); ...@@ -277,9 +277,12 @@ int32_t* taosGetErrno();
#define TSDB_CODE_DND_BNODE_INVALID_OPTION TAOS_DEF_ERROR_CODE(0, 0x0452) #define TSDB_CODE_DND_BNODE_INVALID_OPTION TAOS_DEF_ERROR_CODE(0, 0x0452)
#define TSDB_CODE_DND_BNODE_READ_FILE_ERROR TAOS_DEF_ERROR_CODE(0, 0x0453) #define TSDB_CODE_DND_BNODE_READ_FILE_ERROR TAOS_DEF_ERROR_CODE(0, 0x0453)
#define TSDB_CODE_DND_BNODE_WRITE_FILE_ERROR TAOS_DEF_ERROR_CODE(0, 0x0454) #define TSDB_CODE_DND_BNODE_WRITE_FILE_ERROR TAOS_DEF_ERROR_CODE(0, 0x0454)
#define TSDB_CODE_DND_VNODE_TOO_MANY_VNODES TAOS_DEF_ERROR_CODE(0, 0x0460) #define TSDB_CODE_DND_VNODE_ALREADY_DEPLOYED TAOS_DEF_ERROR_CODE(0, 0x0460)
#define TSDB_CODE_DND_VNODE_READ_FILE_ERROR TAOS_DEF_ERROR_CODE(0, 0x0461) #define TSDB_CODE_DND_VNODE_NOT_DEPLOYED TAOS_DEF_ERROR_CODE(0, 0x0461)
#define TSDB_CODE_DND_VNODE_WRITE_FILE_ERROR TAOS_DEF_ERROR_CODE(0, 0x0462) #define TSDB_CODE_DND_VNODE_INVALID_OPTION TAOS_DEF_ERROR_CODE(0, 0x0462)
#define TSDB_CODE_DND_VNODE_READ_FILE_ERROR TAOS_DEF_ERROR_CODE(0, 0x0463)
#define TSDB_CODE_DND_VNODE_WRITE_FILE_ERROR TAOS_DEF_ERROR_CODE(0, 0x0464)
#define TSDB_CODE_DND_VNODE_TOO_MANY_VNODES TAOS_DEF_ERROR_CODE(0, 0x0465)
// vnode // vnode
#define TSDB_CODE_VND_ACTION_IN_PROGRESS TAOS_DEF_ERROR_CODE(0, 0x0500) //"Action in progress") #define TSDB_CODE_VND_ACTION_IN_PROGRESS TAOS_DEF_ERROR_CODE(0, 0x0500) //"Action in progress")
......
...@@ -29,12 +29,12 @@ void dndProcessVnodeSyncMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet); ...@@ -29,12 +29,12 @@ void dndProcessVnodeSyncMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet);
void dndProcessVnodeQueryMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet); void dndProcessVnodeQueryMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet);
void dndProcessVnodeFetchMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet); void dndProcessVnodeFetchMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet);
int32_t dndProcessCreateVnodeReq(SDnode *pDnode, SRpcMsg *rpcMsg); int32_t dndProcessCreateVnodeReq(SDnode *pDnode, SRpcMsg *pReq);
int32_t dndProcessAlterVnodeReq(SDnode *pDnode, SRpcMsg *rpcMsg); int32_t dndProcessAlterVnodeReq(SDnode *pDnode, SRpcMsg *pReq);
int32_t dndProcessDropVnodeReq(SDnode *pDnode, SRpcMsg *rpcMsg); int32_t dndProcessDropVnodeReq(SDnode *pDnode, SRpcMsg *pReq);
int32_t dndProcessAuthVnodeReq(SDnode *pDnode, SRpcMsg *rpcMsg); int32_t dndProcessAuthVnodeReq(SDnode *pDnode, SRpcMsg *pReq);
int32_t dndProcessSyncVnodeReq(SDnode *pDnode, SRpcMsg *rpcMsg); int32_t dndProcessSyncVnodeReq(SDnode *pDnode, SRpcMsg *pReq);
int32_t dndProcessCompactVnodeReq(SDnode *pDnode, SRpcMsg *rpcMsg); int32_t dndProcessCompactVnodeReq(SDnode *pDnode, SRpcMsg *pReq);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -42,18 +42,13 @@ static SBnode *dndAcquireBnode(SDnode *pDnode) { ...@@ -42,18 +42,13 @@ static SBnode *dndAcquireBnode(SDnode *pDnode) {
} }
static void dndReleaseBnode(SDnode *pDnode, SBnode *pBnode) { static void dndReleaseBnode(SDnode *pDnode, SBnode *pBnode) {
SBnodeMgmt *pMgmt = &pDnode->bmgmt; if (pBnode == NULL) return;
int32_t refCount = 0;
SBnodeMgmt *pMgmt = &pDnode->bmgmt;
taosRLockLatch(&pMgmt->latch); taosRLockLatch(&pMgmt->latch);
if (pBnode != NULL) { int32_t refCount = atomic_sub_fetch_32(&pMgmt->refCount, 1);
refCount = atomic_sub_fetch_32(&pMgmt->refCount, 1);
}
taosRUnLockLatch(&pMgmt->latch); taosRUnLockLatch(&pMgmt->latch);
dTrace("release bnode, refCount:%d", refCount);
if (pBnode != NULL) {
dTrace("release bnode, refCount:%d", refCount);
}
} }
static int32_t dndReadBnodeFile(SDnode *pDnode) { static int32_t dndReadBnodeFile(SDnode *pDnode) {
......
...@@ -43,18 +43,13 @@ static SMnode *dndAcquireMnode(SDnode *pDnode) { ...@@ -43,18 +43,13 @@ static SMnode *dndAcquireMnode(SDnode *pDnode) {
} }
static void dndReleaseMnode(SDnode *pDnode, SMnode *pMnode) { static void dndReleaseMnode(SDnode *pDnode, SMnode *pMnode) {
SMnodeMgmt *pMgmt = &pDnode->mmgmt; if (pMnode == NULL) return;
int32_t refCount = 0;
SMnodeMgmt *pMgmt = &pDnode->mmgmt;
taosRLockLatch(&pMgmt->latch); taosRLockLatch(&pMgmt->latch);
if (pMnode != NULL) { int32_t refCount = atomic_sub_fetch_32(&pMgmt->refCount, 1);
refCount = atomic_sub_fetch_32(&pMgmt->refCount, 1);
}
taosRUnLockLatch(&pMgmt->latch); taosRUnLockLatch(&pMgmt->latch);
dTrace("release mnode, refCount:%d", refCount);
if (pMnode != NULL) {
dTrace("release mnode, refCount:%d", refCount);
}
} }
static int32_t dndReadMnodeFile(SDnode *pDnode) { static int32_t dndReadMnodeFile(SDnode *pDnode) {
......
...@@ -42,18 +42,13 @@ static SQnode *dndAcquireQnode(SDnode *pDnode) { ...@@ -42,18 +42,13 @@ static SQnode *dndAcquireQnode(SDnode *pDnode) {
} }
static void dndReleaseQnode(SDnode *pDnode, SQnode *pQnode) { static void dndReleaseQnode(SDnode *pDnode, SQnode *pQnode) {
SQnodeMgmt *pMgmt = &pDnode->qmgmt; if (pQnode == NULL) return;
int32_t refCount = 0;
SQnodeMgmt *pMgmt = &pDnode->qmgmt;
taosRLockLatch(&pMgmt->latch); taosRLockLatch(&pMgmt->latch);
if (pQnode != NULL) { int32_t refCount = atomic_sub_fetch_32(&pMgmt->refCount, 1);
refCount = atomic_sub_fetch_32(&pMgmt->refCount, 1);
}
taosRUnLockLatch(&pMgmt->latch); taosRUnLockLatch(&pMgmt->latch);
dTrace("release qnode, refCount:%d", refCount);
if (pQnode != NULL) {
dTrace("release qnode, refCount:%d", refCount);
}
} }
static int32_t dndReadQnodeFile(SDnode *pDnode) { static int32_t dndReadQnodeFile(SDnode *pDnode) {
......
...@@ -42,18 +42,13 @@ static SSnode *dndAcquireSnode(SDnode *pDnode) { ...@@ -42,18 +42,13 @@ static SSnode *dndAcquireSnode(SDnode *pDnode) {
} }
static void dndReleaseSnode(SDnode *pDnode, SSnode *pSnode) { static void dndReleaseSnode(SDnode *pDnode, SSnode *pSnode) {
SSnodeMgmt *pMgmt = &pDnode->smgmt; if (pSnode == NULL) return;
int32_t refCount = 0;
SSnodeMgmt *pMgmt = &pDnode->smgmt;
taosRLockLatch(&pMgmt->latch); taosRLockLatch(&pMgmt->latch);
if (pSnode != NULL) { int32_t refCount = atomic_sub_fetch_32(&pMgmt->refCount, 1);
refCount = atomic_sub_fetch_32(&pMgmt->refCount, 1);
}
taosRUnLockLatch(&pMgmt->latch); taosRUnLockLatch(&pMgmt->latch);
dTrace("release snode, refCount:%d", refCount);
if (pSnode != NULL) {
dTrace("release snode, refCount:%d", refCount);
}
} }
static int32_t dndReadSnodeFile(SDnode *pDnode) { static int32_t dndReadSnodeFile(SDnode *pDnode) {
......
...@@ -4,8 +4,5 @@ add_subdirectory(qnode) ...@@ -4,8 +4,5 @@ add_subdirectory(qnode)
add_subdirectory(bnode) add_subdirectory(bnode)
add_subdirectory(snode) add_subdirectory(snode)
add_subdirectory(mnode) add_subdirectory(mnode)
add_subdirectory(db) add_subdirectory(vnode)
add_subdirectory(stb)
add_subdirectory(vgroup)
add_subdirectory(sut) add_subdirectory(sut)
aux_source_directory(. VGROUP_SRC)
add_executable(dnode_test_vgroup ${VGROUP_SRC})
target_link_libraries(
dnode_test_vgroup
PUBLIC sut
)
add_test(
NAME dnode_test_vgroup
COMMAND dnode_test_vgroup
)
aux_source_directory(. VNODE_SRC)
add_executable(dnode_test_vnode ${VNODE_SRC})
target_link_libraries(
dnode_test_vnode
PUBLIC sut
)
add_test(
NAME dnode_test_vnode
COMMAND dnode_test_vnode
)
...@@ -11,9 +11,9 @@ ...@@ -11,9 +11,9 @@
#include "sut.h" #include "sut.h"
class DndTestVgroup : public ::testing::Test { class DndTestVnode : public ::testing::Test {
protected: protected:
static void SetUpTestSuite() { test.Init("/tmp/dnode_test_vgroup", 9150); } static void SetUpTestSuite() { test.Init("/tmp/dnode_test_vnode", 9150); }
static void TearDownTestSuite() { test.Cleanup(); } static void TearDownTestSuite() { test.Cleanup(); }
static Testbase test; static Testbase test;
...@@ -23,14 +23,14 @@ class DndTestVgroup : public ::testing::Test { ...@@ -23,14 +23,14 @@ class DndTestVgroup : public ::testing::Test {
void TearDown() override {} void TearDown() override {}
}; };
Testbase DndTestVgroup::test; Testbase DndTestVnode::test;
TEST_F(DndTestVgroup, 01_Create_Restart_Drop_Vnode) { TEST_F(DndTestVnode, 01_Create_Restart_Drop_Vnode) {
{ {
for (int i = 0; i < 3; ++i) { for (int i = 0; i < 3; ++i) {
int32_t contLen = sizeof(SCreateVnodeMsg); int32_t contLen = sizeof(SCreateVnodeReq);
SCreateVnodeMsg* pReq = (SCreateVnodeMsg*)rpcMallocCont(contLen); SCreateVnodeReq* pReq = (SCreateVnodeReq*)rpcMallocCont(contLen);
pReq->vgId = htonl(2); pReq->vgId = htonl(2);
pReq->dnodeId = htonl(1); pReq->dnodeId = htonl(1);
strcpy(pReq->db, "1.d1"); strcpy(pReq->db, "1.d1");
...@@ -68,9 +68,9 @@ TEST_F(DndTestVgroup, 01_Create_Restart_Drop_Vnode) { ...@@ -68,9 +68,9 @@ TEST_F(DndTestVgroup, 01_Create_Restart_Drop_Vnode) {
{ {
for (int i = 0; i < 3; ++i) { for (int i = 0; i < 3; ++i) {
int32_t contLen = sizeof(SAlterVnodeMsg); int32_t contLen = sizeof(SAlterVnodeReq);
SAlterVnodeMsg* pReq = (SAlterVnodeMsg*)rpcMallocCont(contLen); SAlterVnodeReq* pReq = (SAlterVnodeReq*)rpcMallocCont(contLen);
pReq->vgId = htonl(2); pReq->vgId = htonl(2);
pReq->dnodeId = htonl(1); pReq->dnodeId = htonl(1);
strcpy(pReq->db, "1.d1"); strcpy(pReq->db, "1.d1");
...@@ -108,9 +108,9 @@ TEST_F(DndTestVgroup, 01_Create_Restart_Drop_Vnode) { ...@@ -108,9 +108,9 @@ TEST_F(DndTestVgroup, 01_Create_Restart_Drop_Vnode) {
{ {
for (int i = 0; i < 3; ++i) { for (int i = 0; i < 3; ++i) {
int32_t contLen = sizeof(SDropVnodeMsg); int32_t contLen = sizeof(SDropVnodeReq);
SDropVnodeMsg* pReq = (SDropVnodeMsg*)rpcMallocCont(contLen); SDropVnodeReq* pReq = (SDropVnodeReq*)rpcMallocCont(contLen);
pReq->vgId = htonl(2); pReq->vgId = htonl(2);
pReq->dnodeId = htonl(1); pReq->dnodeId = htonl(1);
strcpy(pReq->db, "1.d1"); strcpy(pReq->db, "1.d1");
...@@ -118,7 +118,7 @@ TEST_F(DndTestVgroup, 01_Create_Restart_Drop_Vnode) { ...@@ -118,7 +118,7 @@ TEST_F(DndTestVgroup, 01_Create_Restart_Drop_Vnode) {
SRpcMsg rpcMsg = {0}; SRpcMsg rpcMsg = {0};
rpcMsg.pCont = pReq; rpcMsg.pCont = pReq;
rpcMsg.contLen = sizeof(SDropVnodeMsg); rpcMsg.contLen = sizeof(SDropVnodeReq);
rpcMsg.msgType = TDMT_DND_DROP_VNODE; rpcMsg.msgType = TDMT_DND_DROP_VNODE;
SRpcMsg* pRsp = test.SendReq(TDMT_DND_DROP_VNODE, pReq, contLen); SRpcMsg* pRsp = test.SendReq(TDMT_DND_DROP_VNODE, pReq, contLen);
......
...@@ -31,8 +31,8 @@ int32_t mndAllocVgroup(SMnode *pMnode, SDbObj *pDb, SVgObj **ppVgroups); ...@@ -31,8 +31,8 @@ int32_t mndAllocVgroup(SMnode *pMnode, SDbObj *pDb, SVgObj **ppVgroups);
SEpSet mndGetVgroupEpset(SMnode *pMnode, SVgObj *pVgroup); SEpSet mndGetVgroupEpset(SMnode *pMnode, SVgObj *pVgroup);
int32_t mndGetVnodesNum(SMnode *pMnode, int32_t dnodeId); int32_t mndGetVnodesNum(SMnode *pMnode, int32_t dnodeId);
SCreateVnodeMsg *mndBuildCreateVnodeMsg(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup); SCreateVnodeReq *mndBuildCreateVnodeMsg(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup);
SDropVnodeMsg *mndBuildDropVnodeMsg(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup); SDropVnodeReq *mndBuildDropVnodeMsg(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -331,11 +331,11 @@ static int32_t mndSetCreateDbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj ...@@ -331,11 +331,11 @@ static int32_t mndSetCreateDbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj
action.epSet = mndGetDnodeEpset(pDnode); action.epSet = mndGetDnodeEpset(pDnode);
mndReleaseDnode(pMnode, pDnode); mndReleaseDnode(pMnode, pDnode);
SCreateVnodeMsg *pMsg = mndBuildCreateVnodeMsg(pMnode, pDnode, pDb, pVgroup); SCreateVnodeReq *pMsg = mndBuildCreateVnodeMsg(pMnode, pDnode, pDb, pVgroup);
if (pMsg == NULL) return -1; if (pMsg == NULL) return -1;
action.pCont = pMsg; action.pCont = pMsg;
action.contLen = sizeof(SCreateVnodeMsg); action.contLen = sizeof(SCreateVnodeReq);
action.msgType = TDMT_DND_CREATE_VNODE; action.msgType = TDMT_DND_CREATE_VNODE;
if (mndTransAppendRedoAction(pTrans, &action) != 0) { if (mndTransAppendRedoAction(pTrans, &action) != 0) {
free(pMsg); free(pMsg);
...@@ -360,11 +360,11 @@ static int32_t mndSetCreateDbUndoActions(SMnode *pMnode, STrans *pTrans, SDbObj ...@@ -360,11 +360,11 @@ static int32_t mndSetCreateDbUndoActions(SMnode *pMnode, STrans *pTrans, SDbObj
action.epSet = mndGetDnodeEpset(pDnode); action.epSet = mndGetDnodeEpset(pDnode);
mndReleaseDnode(pMnode, pDnode); mndReleaseDnode(pMnode, pDnode);
SDropVnodeMsg *pMsg = mndBuildDropVnodeMsg(pMnode, pDnode, pDb, pVgroup); SDropVnodeReq *pMsg = mndBuildDropVnodeMsg(pMnode, pDnode, pDb, pVgroup);
if (pMsg == NULL) return -1; if (pMsg == NULL) return -1;
action.pCont = pMsg; action.pCont = pMsg;
action.contLen = sizeof(SDropVnodeMsg); action.contLen = sizeof(SDropVnodeReq);
action.msgType = TDMT_DND_DROP_VNODE; action.msgType = TDMT_DND_DROP_VNODE;
if (mndTransAppendUndoAction(pTrans, &action) != 0) { if (mndTransAppendUndoAction(pTrans, &action) != 0) {
free(pMsg); free(pMsg);
...@@ -593,11 +593,11 @@ static int32_t mndBuildUpdateVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj ...@@ -593,11 +593,11 @@ static int32_t mndBuildUpdateVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj
action.epSet = mndGetDnodeEpset(pDnode); action.epSet = mndGetDnodeEpset(pDnode);
mndReleaseDnode(pMnode, pDnode); mndReleaseDnode(pMnode, pDnode);
SAlterVnodeMsg *pMsg = (SAlterVnodeMsg *)mndBuildCreateVnodeMsg(pMnode, pDnode, pDb, pVgroup); SAlterVnodeReq *pMsg = (SAlterVnodeReq *)mndBuildCreateVnodeMsg(pMnode, pDnode, pDb, pVgroup);
if (pMsg == NULL) return -1; if (pMsg == NULL) return -1;
action.pCont = pMsg; action.pCont = pMsg;
action.contLen = sizeof(SAlterVnodeMsg); action.contLen = sizeof(SAlterVnodeReq);
action.msgType = TDMT_DND_ALTER_VNODE; action.msgType = TDMT_DND_ALTER_VNODE;
if (mndTransAppendRedoAction(pTrans, &action) != 0) { if (mndTransAppendRedoAction(pTrans, &action) != 0) {
free(pMsg); free(pMsg);
...@@ -757,11 +757,11 @@ static int32_t mndBuildDropVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj * ...@@ -757,11 +757,11 @@ static int32_t mndBuildDropVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj *
action.epSet = mndGetDnodeEpset(pDnode); action.epSet = mndGetDnodeEpset(pDnode);
mndReleaseDnode(pMnode, pDnode); mndReleaseDnode(pMnode, pDnode);
SDropVnodeMsg *pMsg = mndBuildDropVnodeMsg(pMnode, pDnode, pDb, pVgroup); SDropVnodeReq *pMsg = mndBuildDropVnodeMsg(pMnode, pDnode, pDb, pVgroup);
if (pMsg == NULL) return -1; if (pMsg == NULL) return -1;
action.pCont = pMsg; action.pCont = pMsg;
action.contLen = sizeof(SCreateVnodeMsg); action.contLen = sizeof(SCreateVnodeReq);
action.msgType = TDMT_DND_DROP_VNODE; action.msgType = TDMT_DND_DROP_VNODE;
if (mndTransAppendRedoAction(pTrans, &action) != 0) { if (mndTransAppendRedoAction(pTrans, &action) != 0) {
free(pMsg); free(pMsg);
......
...@@ -189,8 +189,8 @@ void mndReleaseVgroup(SMnode *pMnode, SVgObj *pVgroup) { ...@@ -189,8 +189,8 @@ void mndReleaseVgroup(SMnode *pMnode, SVgObj *pVgroup) {
sdbRelease(pSdb, pVgroup); sdbRelease(pSdb, pVgroup);
} }
SCreateVnodeMsg *mndBuildCreateVnodeMsg(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup) { SCreateVnodeReq *mndBuildCreateVnodeMsg(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup) {
SCreateVnodeMsg *pCreate = calloc(1, sizeof(SCreateVnodeMsg)); SCreateVnodeReq *pCreate = calloc(1, sizeof(SCreateVnodeReq));
if (pCreate == NULL) { if (pCreate == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL; return NULL;
...@@ -248,8 +248,8 @@ SCreateVnodeMsg *mndBuildCreateVnodeMsg(SMnode *pMnode, SDnodeObj *pDnode, SDbOb ...@@ -248,8 +248,8 @@ SCreateVnodeMsg *mndBuildCreateVnodeMsg(SMnode *pMnode, SDnodeObj *pDnode, SDbOb
return pCreate; return pCreate;
} }
SDropVnodeMsg *mndBuildDropVnodeMsg(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup) { SDropVnodeReq *mndBuildDropVnodeMsg(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup) {
SDropVnodeMsg *pDrop = calloc(1, sizeof(SDropVnodeMsg)); SDropVnodeReq *pDrop = calloc(1, sizeof(SDropVnodeReq));
if (pDrop == NULL) { if (pDrop == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL; return NULL;
......
...@@ -10,3 +10,5 @@ add_subdirectory(show) ...@@ -10,3 +10,5 @@ add_subdirectory(show)
add_subdirectory(profile) add_subdirectory(profile)
add_subdirectory(dnode) add_subdirectory(dnode)
add_subdirectory(mnode) add_subdirectory(mnode)
add_subdirectory(db)
add_subdirectory(stb)
aux_source_directory(. DB_SRC) aux_source_directory(. DB_SRC)
add_executable(dnode_test_db ${DB_SRC}) add_executable(mnode_test_db ${DB_SRC})
target_link_libraries( target_link_libraries(
dnode_test_db mnode_test_db
PUBLIC sut PUBLIC sut
) )
add_test( add_test(
NAME dnode_test_db NAME mnode_test_db
COMMAND dnode_test_db COMMAND mnode_test_db
) )
/** /**
* @file db.cpp * @file db.cpp
* @author slguan (slguan@taosdata.com) * @author slguan (slguan@taosdata.com)
* @brief DNODE module db-msg tests * @brief MNODE module db tests
* @version 0.1 * @version 1.0
* @date 2021-12-15 * @date 2022-01-11
* *
* @copyright Copyright (c) 2021 * @copyright Copyright (c) 2022
* *
*/ */
#include "sut.h" #include "sut.h"
class DndTestDb : public ::testing::Test { class MndTestDb : public ::testing::Test {
protected: protected:
static void SetUpTestSuite() { test.Init("/tmp/dnode_test_db", 9040); } static void SetUpTestSuite() { test.Init("/tmp/mnode_test_db", 9030); }
static void TearDownTestSuite() { test.Cleanup(); } static void TearDownTestSuite() { test.Cleanup(); }
static Testbase test; static Testbase test;
...@@ -23,9 +23,9 @@ class DndTestDb : public ::testing::Test { ...@@ -23,9 +23,9 @@ class DndTestDb : public ::testing::Test {
void TearDown() override {} void TearDown() override {}
}; };
Testbase DndTestDb::test; Testbase MndTestDb::test;
TEST_F(DndTestDb, 01_ShowDb) { TEST_F(MndTestDb, 01_ShowDb) {
test.SendShowMetaReq(TSDB_MGMT_TABLE_DB, ""); test.SendShowMetaReq(TSDB_MGMT_TABLE_DB, "");
CHECK_META("show databases", 18); CHECK_META("show databases", 18);
CHECK_SCHEMA(0, TSDB_DATA_TYPE_BINARY, TSDB_DB_NAME_LEN - 1 + VARSTR_HEADER_SIZE, "name"); CHECK_SCHEMA(0, TSDB_DATA_TYPE_BINARY, TSDB_DB_NAME_LEN - 1 + VARSTR_HEADER_SIZE, "name");
...@@ -51,7 +51,7 @@ TEST_F(DndTestDb, 01_ShowDb) { ...@@ -51,7 +51,7 @@ TEST_F(DndTestDb, 01_ShowDb) {
EXPECT_EQ(test.GetShowRows(), 0); EXPECT_EQ(test.GetShowRows(), 0);
} }
TEST_F(DndTestDb, 02_Create_Alter_Drop_Db) { TEST_F(MndTestDb, 02_Create_Alter_Drop_Db) {
{ {
int32_t contLen = sizeof(SCreateDbMsg); int32_t contLen = sizeof(SCreateDbMsg);
...@@ -211,7 +211,7 @@ TEST_F(DndTestDb, 02_Create_Alter_Drop_Db) { ...@@ -211,7 +211,7 @@ TEST_F(DndTestDb, 02_Create_Alter_Drop_Db) {
EXPECT_EQ(test.GetShowRows(), 0); EXPECT_EQ(test.GetShowRows(), 0);
} }
TEST_F(DndTestDb, 03_Create_Use_Restart_Use_Db) { TEST_F(MndTestDb, 03_Create_Use_Restart_Use_Db) {
{ {
int32_t contLen = sizeof(SCreateDbMsg); int32_t contLen = sizeof(SCreateDbMsg);
...@@ -281,7 +281,7 @@ TEST_F(DndTestDb, 03_Create_Use_Restart_Use_Db) { ...@@ -281,7 +281,7 @@ TEST_F(DndTestDb, 03_Create_Use_Restart_Use_Db) {
EXPECT_EQ(pInfo->numOfEps, 1); EXPECT_EQ(pInfo->numOfEps, 1);
SEpAddrMsg* pAddr = &pInfo->epAddr[0]; SEpAddrMsg* pAddr = &pInfo->epAddr[0];
pAddr->port = htons(pAddr->port); pAddr->port = htons(pAddr->port);
EXPECT_EQ(pAddr->port, 9040); EXPECT_EQ(pAddr->port, 9030);
EXPECT_STREQ(pAddr->fqdn, "localhost"); EXPECT_STREQ(pAddr->fqdn, "localhost");
} }
...@@ -297,7 +297,7 @@ TEST_F(DndTestDb, 03_Create_Use_Restart_Use_Db) { ...@@ -297,7 +297,7 @@ TEST_F(DndTestDb, 03_Create_Use_Restart_Use_Db) {
EXPECT_EQ(pInfo->numOfEps, 1); EXPECT_EQ(pInfo->numOfEps, 1);
SEpAddrMsg* pAddr = &pInfo->epAddr[0]; SEpAddrMsg* pAddr = &pInfo->epAddr[0];
pAddr->port = htons(pAddr->port); pAddr->port = htons(pAddr->port);
EXPECT_EQ(pAddr->port, 9040); EXPECT_EQ(pAddr->port, 9030);
EXPECT_STREQ(pAddr->fqdn, "localhost"); EXPECT_STREQ(pAddr->fqdn, "localhost");
} }
} }
......
/** /**
* @file stb.cpp * @file stb.cpp
* @author slguan (slguan@taosdata.com) * @author slguan (slguan@taosdata.com)
* @brief DNODE module db-msg tests * @brief MNODE module stb tests
* @version 0.1 * @version 1.0
* @date 2021-12-17 * @date 2022-01-12
* *
* @copyright Copyright (c) 2021 * @copyright Copyright (c) 2022
* *
*/ */
#include "sut.h" #include "sut.h"
class DndTestStb : public ::testing::Test { class MndTestStb : public ::testing::Test {
protected: protected:
static void SetUpTestSuite() { test.Init("/tmp/dnode_test_stb", 9101); } static void SetUpTestSuite() { test.Init("/tmp/mnode_test_stb", 9034); }
static void TearDownTestSuite() { test.Cleanup(); } static void TearDownTestSuite() { test.Cleanup(); }
static Testbase test; static Testbase test;
...@@ -23,9 +23,9 @@ class DndTestStb : public ::testing::Test { ...@@ -23,9 +23,9 @@ class DndTestStb : public ::testing::Test {
void TearDown() override {} void TearDown() override {}
}; };
Testbase DndTestStb::test; Testbase MndTestStb::test;
TEST_F(DndTestStb, 01_Create_Show_Meta_Drop_Restart_Stb) { TEST_F(MndTestStb, 01_Create_Show_Meta_Drop_Restart_Stb) {
{ {
int32_t contLen = sizeof(SCreateDbMsg); int32_t contLen = sizeof(SCreateDbMsg);
......
...@@ -24,9 +24,9 @@ SVnode *vnodeOpen(const char *path, const SVnodeCfg *pVnodeCfg, int32_t vid) { ...@@ -24,9 +24,9 @@ SVnode *vnodeOpen(const char *path, const SVnodeCfg *pVnodeCfg, int32_t vid) {
SVnode *pVnode = NULL; SVnode *pVnode = NULL;
// Set default options // Set default options
if (pVnodeCfg == NULL) { //if (pVnodeCfg == NULL) {
pVnodeCfg = &defaultVnodeOptions; pVnodeCfg = &defaultVnodeOptions;
} //}
// Validate options // Validate options
if (vnodeValidateOptions(pVnodeCfg) < 0) { if (vnodeValidateOptions(pVnodeCfg) < 0) {
......
...@@ -88,37 +88,37 @@ typedef struct SResultRowPool { ...@@ -88,37 +88,37 @@ typedef struct SResultRowPool {
SArray* pData; // SArray<void*> SArray* pData; // SArray<void*>
} SResultRowPool; } SResultRowPool;
struct SQueryAttr; struct STaskAttr;
struct SQueryRuntimeEnv; struct STaskRuntimeEnv;
struct SUdfInfo; struct SUdfInfo;
int32_t getOutputInterResultBufSize(struct SQueryAttr* pQueryAttr); int32_t getOutputInterResultBufSize(struct STaskAttr* pQueryAttr);
size_t getResultRowSize(struct SQueryRuntimeEnv* pRuntimeEnv); size_t getResultRowSize(struct STaskRuntimeEnv* pRuntimeEnv);
int32_t initResultRowInfo(SResultRowInfo* pResultRowInfo, int32_t size, int16_t type); int32_t initResultRowInfo(SResultRowInfo* pResultRowInfo, int32_t size, int16_t type);
void cleanupResultRowInfo(SResultRowInfo* pResultRowInfo); void cleanupResultRowInfo(SResultRowInfo* pResultRowInfo);
void resetResultRowInfo(struct SQueryRuntimeEnv* pRuntimeEnv, SResultRowInfo* pResultRowInfo); void resetResultRowInfo(struct STaskRuntimeEnv* pRuntimeEnv, SResultRowInfo* pResultRowInfo);
int32_t numOfClosedResultRows(SResultRowInfo* pResultRowInfo); int32_t numOfClosedResultRows(SResultRowInfo* pResultRowInfo);
void closeAllResultRows(SResultRowInfo* pResultRowInfo); void closeAllResultRows(SResultRowInfo* pResultRowInfo);
int32_t initResultRow(SResultRow *pResultRow); int32_t initResultRow(SResultRow *pResultRow);
void closeResultRow(SResultRowInfo* pResultRowInfo, int32_t slot); void closeResultRow(SResultRowInfo* pResultRowInfo, int32_t slot);
bool isResultRowClosed(SResultRowInfo *pResultRowInfo, int32_t slot); bool isResultRowClosed(SResultRowInfo *pResultRowInfo, int32_t slot);
void clearResultRow(struct SQueryRuntimeEnv* pRuntimeEnv, SResultRow* pResultRow, int16_t type); void clearResultRow(struct STaskRuntimeEnv* pRuntimeEnv, SResultRow* pResultRow, int16_t type);
struct SResultRowEntryInfo* getResultCell(const SResultRow* pRow, int32_t index, int32_t* offset); struct SResultRowEntryInfo* getResultCell(const SResultRow* pRow, int32_t index, int32_t* offset);
void* destroyQueryFuncExpr(SExprInfo* pExprInfo, int32_t numOfExpr); void* destroyQueryFuncExpr(SExprInfo* pExprInfo, int32_t numOfExpr);
void* freeColumnInfo(SColumnInfo* pColumnInfo, int32_t numOfCols); void* freeColumnInfo(SColumnInfo* pColumnInfo, int32_t numOfCols);
int32_t getRowNumForMultioutput(struct SQueryAttr* pQueryAttr, bool topBottomQuery, bool stable); int32_t getRowNumForMultioutput(struct STaskAttr* pQueryAttr, bool topBottomQuery, bool stable);
static FORCE_INLINE SResultRow *getResultRow(SResultRowInfo *pResultRowInfo, int32_t slot) { static FORCE_INLINE SResultRow *getResultRow(SResultRowInfo *pResultRowInfo, int32_t slot) {
assert(pResultRowInfo != NULL && slot >= 0 && slot < pResultRowInfo->size); assert(pResultRowInfo != NULL && slot >= 0 && slot < pResultRowInfo->size);
return pResultRowInfo->pResult[slot]; return pResultRowInfo->pResult[slot];
} }
static FORCE_INLINE char* getPosInResultPage(struct SQueryAttr* pQueryAttr, SFilePage* page, int32_t rowOffset, static FORCE_INLINE char* getPosInResultPage(struct STaskAttr* pQueryAttr, SFilePage* page, int32_t rowOffset,
int32_t offset) { int32_t offset) {
assert(rowOffset >= 0 && pQueryAttr != NULL); assert(rowOffset >= 0 && pQueryAttr != NULL);
...@@ -155,7 +155,7 @@ bool hasRemainData(SGroupResInfo* pGroupResInfo); ...@@ -155,7 +155,7 @@ bool hasRemainData(SGroupResInfo* pGroupResInfo);
bool incNextGroup(SGroupResInfo* pGroupResInfo); bool incNextGroup(SGroupResInfo* pGroupResInfo);
int32_t getNumOfTotalRes(SGroupResInfo* pGroupResInfo); int32_t getNumOfTotalRes(SGroupResInfo* pGroupResInfo);
int32_t mergeIntoGroupResult(SGroupResInfo* pGroupResInfo, struct SQueryRuntimeEnv *pRuntimeEnv, int32_t* offset); int32_t mergeIntoGroupResult(SGroupResInfo* pGroupResInfo, struct STaskRuntimeEnv *pRuntimeEnv, int32_t* offset);
int32_t initUdfInfo(struct SUdfInfo* pUdfInfo); int32_t initUdfInfo(struct SUdfInfo* pUdfInfo);
......
...@@ -21,13 +21,14 @@ ...@@ -21,13 +21,14 @@
#include "tvariant.h" #include "tvariant.h"
#include "thash.h" #include "thash.h"
//#include "parser.h"
#include "executil.h" #include "executil.h"
#include "taosdef.h" #include "taosdef.h"
#include "tarray.h" #include "tarray.h"
#include "tfilter.h" #include "tfilter.h"
#include "tlockfree.h" #include "tlockfree.h"
#include "tpagedfile.h" #include "tpagedfile.h"
#include "planner.h"
struct SColumnFilterElem; struct SColumnFilterElem;
...@@ -65,7 +66,6 @@ enum { ...@@ -65,7 +66,6 @@ enum {
QUERY_OVER = 0x4u, QUERY_OVER = 0x4u,
}; };
typedef struct SResultRowCell { typedef struct SResultRowCell {
uint64_t groupId; uint64_t groupId;
SResultRow *pRow; SResultRow *pRow;
...@@ -100,7 +100,7 @@ typedef struct STableQueryInfo { ...@@ -100,7 +100,7 @@ typedef struct STableQueryInfo {
TSKEY lastKey; TSKEY lastKey;
int32_t groupIndex; // group id in table list int32_t groupIndex; // group id in table list
SVariant tag; SVariant tag;
STimeWindow win; STimeWindow win; // todo remove it later
STSCursor cur; STSCursor cur;
void* pTable; // for retrieve the page id list void* pTable; // for retrieve the page id list
SResultRowInfo resInfo; SResultRowInfo resInfo;
...@@ -128,31 +128,34 @@ typedef struct { ...@@ -128,31 +128,34 @@ typedef struct {
int64_t sumRunTimes; int64_t sumRunTimes;
} SOperatorProfResult; } SOperatorProfResult;
typedef struct SQueryCostInfo { typedef struct STaskCostInfo {
uint64_t loadStatisTime; int64_t start;
uint64_t loadFileBlockTime; int64_t end;
uint64_t loadDataInCacheTime;
uint64_t loadStatisSize; uint64_t loadStatisTime;
uint64_t loadFileBlockSize; uint64_t loadFileBlockTime;
uint64_t loadDataInCacheSize; uint64_t loadDataInCacheTime;
uint64_t loadStatisSize;
uint64_t loadDataTime; uint64_t loadFileBlockSize;
uint64_t totalRows; uint64_t loadDataInCacheSize;
uint64_t totalCheckedRows;
uint32_t totalBlocks; uint64_t loadDataTime;
uint32_t loadBlocks; uint64_t totalRows;
uint32_t loadBlockStatis; uint64_t totalCheckedRows;
uint32_t discardBlocks; uint32_t totalBlocks;
uint64_t elapsedTime; uint32_t loadBlocks;
uint64_t firstStageMergeTime; uint32_t loadBlockStatis;
uint64_t winInfoSize; uint32_t discardBlocks;
uint64_t tableInfoSize; uint64_t elapsedTime;
uint64_t hashSize; uint64_t firstStageMergeTime;
uint64_t numOfTimeWindows; uint64_t winInfoSize;
uint64_t tableInfoSize;
SArray* queryProfEvents; //SArray<SQueryProfEvent> uint64_t hashSize;
SHashObj* operatorProfResults; //map<operator_type, SQueryProfEvent> uint64_t numOfTimeWindows;
} SQueryCostInfo;
SArray *queryProfEvents; //SArray<SQueryProfEvent>
SHashObj *operatorProfResults; //map<operator_type, SQueryProfEvent>
} STaskCostInfo;
typedef struct { typedef struct {
int64_t vgroupLimit; int64_t vgroupLimit;
...@@ -166,7 +169,7 @@ typedef struct { ...@@ -166,7 +169,7 @@ typedef struct {
// The basic query information extracted from the SQueryInfo tree to support the // The basic query information extracted from the SQueryInfo tree to support the
// execution of query in a data node. // execution of query in a data node.
typedef struct SQueryAttr { typedef struct STaskAttr {
SLimit limit; SLimit limit;
SLimit slimit; SLimit slimit;
...@@ -229,16 +232,40 @@ typedef struct SQueryAttr { ...@@ -229,16 +232,40 @@ typedef struct SQueryAttr {
STableGroupInfo tableGroupInfo; // table <tid, last_key> list SArray<STableKeyInfo> STableGroupInfo tableGroupInfo; // table <tid, last_key> list SArray<STableKeyInfo>
int32_t vgId; int32_t vgId;
SArray *pUdfInfo; // no need to free SArray *pUdfInfo; // no need to free
} SQueryAttr; } STaskAttr;
typedef SSDataBlock* (*__operator_fn_t)(void* param, bool* newgroup); typedef SSDataBlock* (*__operator_fn_t)(void* param, bool* newgroup);
typedef void (*__optr_cleanup_fn_t)(void* param, int32_t num); typedef void (*__optr_cleanup_fn_t)(void* param, int32_t num);
struct SOperatorInfo; struct SOperatorInfo;
typedef struct SQueryRuntimeEnv { typedef struct STaskIdInfo {
uint64_t queryId; // this is also a request id
uint64_t subplanId;
uint64_t templateId;
uint64_t taskId; // this is a subplan id
} STaskIdInfo;
typedef struct STaskInfo {
STaskIdInfo id;
char *content;
uint32_t status;
STimeWindow window;
STaskCostInfo cost;
int64_t owner; // if it is in execution
STableGroupInfo tableqinfoGroupInfo; // this is a group array list, including SArray<STableQueryInfo*> structure
pthread_mutex_t lock; // used to synchronize the rsp/query threads
// tsem_t ready;
// int32_t dataReady; // denote if query result is ready or not
// void* rspContext; // response context
char *sql; // query sql string
jmp_buf env;
} STaskInfo;
typedef struct STaskRuntimeEnv {
jmp_buf env; jmp_buf env;
SQueryAttr* pQueryAttr; STaskAttr* pQueryAttr;
uint32_t status; // query status uint32_t status; // query status
void* qinfo; void* qinfo;
uint8_t scanFlag; // denotes reversed scan of data or not uint8_t scanFlag; // denotes reversed scan of data or not
...@@ -271,7 +298,7 @@ typedef struct SQueryRuntimeEnv { ...@@ -271,7 +298,7 @@ typedef struct SQueryRuntimeEnv {
SRspResultInfo resultInfo; SRspResultInfo resultInfo;
SHashObj *pTableRetrieveTsMap; SHashObj *pTableRetrieveTsMap;
struct SUdfInfo *pUdfInfo; struct SUdfInfo *pUdfInfo;
} SQueryRuntimeEnv; } STaskRuntimeEnv;
enum { enum {
OP_IN_EXECUTING = 1, OP_IN_EXECUTING = 1,
...@@ -287,10 +314,11 @@ typedef struct SOperatorInfo { ...@@ -287,10 +314,11 @@ typedef struct SOperatorInfo {
char *name; // name, used to show the query execution plan char *name; // name, used to show the query execution plan
void *info; // extension attribution void *info; // extension attribution
SExprInfo *pExpr; SExprInfo *pExpr;
SQueryRuntimeEnv *pRuntimeEnv; STaskRuntimeEnv *pRuntimeEnv;
STaskInfo *pTaskInfo;
struct SOperatorInfo **upstream; // upstream pointer list struct SOperatorInfo **pDownstream; // downstram pointer list
int32_t numOfUpstream; // number of upstream. The value is always ONE expect for join operator int32_t numOfDownstream; // number of downstream. The value is always ONE expect for join operator
__operator_fn_t exec; __operator_fn_t exec;
__optr_cleanup_fn_t cleanup; __optr_cleanup_fn_t cleanup;
} SOperatorInfo; } SOperatorInfo;
...@@ -312,8 +340,8 @@ typedef struct SQInfo { ...@@ -312,8 +340,8 @@ typedef struct SQInfo {
int32_t code; // error code to returned to client int32_t code; // error code to returned to client
int64_t owner; // if it is in execution int64_t owner; // if it is in execution
SQueryRuntimeEnv runtimeEnv; STaskRuntimeEnv runtimeEnv;
SQueryAttr query; STaskAttr query;
void* pBuf; // allocated buffer for STableQueryInfo, sizeof(STableQueryInfo)*numOfTables; void* pBuf; // allocated buffer for STableQueryInfo, sizeof(STableQueryInfo)*numOfTables;
pthread_mutex_t lock; // used to synchronize the rsp/query threads pthread_mutex_t lock; // used to synchronize the rsp/query threads
...@@ -322,10 +350,10 @@ typedef struct SQInfo { ...@@ -322,10 +350,10 @@ typedef struct SQInfo {
void* rspContext; // response context void* rspContext; // response context
int64_t startExecTs; // start to exec timestamp int64_t startExecTs; // start to exec timestamp
char* sql; // query sql string char* sql; // query sql string
SQueryCostInfo summary; STaskCostInfo summary;
} SQInfo; } SQInfo;
typedef struct SQueryParam { typedef struct STaskParam {
char *sql; char *sql;
char *tagCond; char *tagCond;
char *colCond; char *colCond;
...@@ -345,7 +373,7 @@ typedef struct SQueryParam { ...@@ -345,7 +373,7 @@ typedef struct SQueryParam {
int32_t tableScanOperator; int32_t tableScanOperator;
SArray *pOperator; SArray *pOperator;
struct SUdfInfo *pUdfInfo; struct SUdfInfo *pUdfInfo;
} SQueryParam; } STaskParam;
typedef struct STableScanInfo { typedef struct STableScanInfo {
void *pQueryHandle; void *pQueryHandle;
...@@ -366,9 +394,12 @@ typedef struct STableScanInfo { ...@@ -366,9 +394,12 @@ typedef struct STableScanInfo {
SSDataBlock block; SSDataBlock block;
int32_t numOfOutput; int32_t numOfOutput;
int64_t elapsedTime; int64_t elapsedTime;
int32_t tableIndex; int32_t tableIndex;
int32_t prevGroupId; // previous table group id
int32_t prevGroupId; // previous table group id
int32_t scanFlag; // table scan flag to denote if it is a repeat/reverse/main scan
STimeWindow window;
} STableScanInfo; } STableScanInfo;
typedef struct STagScanInfo { typedef struct STagScanInfo {
...@@ -512,34 +543,34 @@ typedef struct SOrderOperatorInfo { ...@@ -512,34 +543,34 @@ typedef struct SOrderOperatorInfo {
void appendUpstream(SOperatorInfo* p, SOperatorInfo* pUpstream); void appendUpstream(SOperatorInfo* p, SOperatorInfo* pUpstream);
SOperatorInfo* createDataBlocksOptScanInfo(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv, int32_t repeatTime, int32_t reverseTime); SOperatorInfo* createDataBlocksOptScanInfo(void* pTsdbQueryHandle, STaskRuntimeEnv* pRuntimeEnv, int32_t repeatTime, int32_t reverseTime);
SOperatorInfo* createTableScanOperator(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv, int32_t repeatTime); SOperatorInfo* createTableScanOperator(void* pTsdbQueryHandle, int32_t order, int32_t numOfOutput, int32_t repeatTime);
SOperatorInfo* createTableSeqScanOperator(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv); SOperatorInfo* createTableSeqScanOperator(void* pTsdbQueryHandle, STaskRuntimeEnv* pRuntimeEnv);
SOperatorInfo* createAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput); SOperatorInfo* createAggregateOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput);
SOperatorInfo* createProjectOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput); SOperatorInfo* createProjectOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput);
SOperatorInfo* createLimitOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream); SOperatorInfo* createLimitOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream);
SOperatorInfo* createTimeIntervalOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput); SOperatorInfo* createTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput);
SOperatorInfo* createAllTimeIntervalOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput); SOperatorInfo* createAllTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput);
SOperatorInfo* createSWindowOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput); SOperatorInfo* createSWindowOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput);
SOperatorInfo* createFillOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput, bool multigroupResult); SOperatorInfo* createFillOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput, bool multigroupResult);
SOperatorInfo* createGroupbyOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput); SOperatorInfo* createGroupbyOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput);
SOperatorInfo* createMultiTableAggOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput); SOperatorInfo* createMultiTableAggOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput);
SOperatorInfo* createMultiTableTimeIntervalOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput); SOperatorInfo* createMultiTableTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput);
SOperatorInfo* createAllMultiTableTimeIntervalOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput); SOperatorInfo* createAllMultiTableTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput);
SOperatorInfo* createTagScanOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SExprInfo* pExpr, int32_t numOfOutput); SOperatorInfo* createTagScanOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SExprInfo* pExpr, int32_t numOfOutput);
SOperatorInfo* createDistinctOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput); SOperatorInfo* createDistinctOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput);
SOperatorInfo* createTableBlockInfoScanOperator(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv); SOperatorInfo* createTableBlockInfoScanOperator(void* pTsdbQueryHandle, STaskRuntimeEnv* pRuntimeEnv);
SOperatorInfo* createMultiwaySortOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SExprInfo* pExpr, int32_t numOfOutput, SOperatorInfo* createMultiwaySortOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SExprInfo* pExpr, int32_t numOfOutput,
int32_t numOfRows, void* merger); int32_t numOfRows, void* merger);
SOperatorInfo* createGlobalAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput, void* param, SArray* pUdfInfo, bool groupResultMixedUp); SOperatorInfo* createGlobalAggregateOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput, void* param, SArray* pUdfInfo, bool groupResultMixedUp);
SOperatorInfo* createStatewindowOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput); SOperatorInfo* createStatewindowOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput);
SOperatorInfo* createSLimitOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput, void* merger, bool multigroupResult); SOperatorInfo* createSLimitOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput, void* merger, bool multigroupResult);
SOperatorInfo* createFilterOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, SOperatorInfo* createFilterOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr,
int32_t numOfOutput, SColumnInfo* pCols, int32_t numOfFilter); int32_t numOfOutput, SColumnInfo* pCols, int32_t numOfFilter);
SOperatorInfo* createJoinOperatorInfo(SOperatorInfo** pUpstream, int32_t numOfUpstream, SSchema* pSchema, int32_t numOfOutput); SOperatorInfo* createJoinOperatorInfo(SOperatorInfo** pUpstream, int32_t numOfUpstream, SSchema* pSchema, int32_t numOfOutput);
SOperatorInfo* createOrderOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput, SOrder* pOrderVal); SOperatorInfo* createOrderOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput, SOrder* pOrderVal);
SSDataBlock* doGlobalAggregate(void* param, bool* newgroup); SSDataBlock* doGlobalAggregate(void* param, bool* newgroup);
SSDataBlock* doMultiwayMergeSort(void* param, bool* newgroup); SSDataBlock* doMultiwayMergeSort(void* param, bool* newgroup);
...@@ -561,8 +592,8 @@ void updateOutputBuf(SOptrBasicInfo* pBInfo, int32_t *bufCapacity, int32_t numOf ...@@ -561,8 +592,8 @@ void updateOutputBuf(SOptrBasicInfo* pBInfo, int32_t *bufCapacity, int32_t numOf
void clearOutputBuf(SOptrBasicInfo* pBInfo, int32_t *bufCapacity); void clearOutputBuf(SOptrBasicInfo* pBInfo, int32_t *bufCapacity);
void copyTsColoum(SSDataBlock* pRes, SQLFunctionCtx* pCtx, int32_t numOfOutput); void copyTsColoum(SSDataBlock* pRes, SQLFunctionCtx* pCtx, int32_t numOfOutput);
void freeParam(SQueryParam *param); void freeParam(STaskParam *param);
int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SQueryParam* param); int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, STaskParam* param);
int32_t createQueryFunc(SQueriedTableInfo* pTableInfo, int32_t numOfOutput, SExprInfo** pExprInfo, int32_t createQueryFunc(SQueriedTableInfo* pTableInfo, int32_t numOfOutput, SExprInfo** pExprInfo,
SSqlExpr** pExprMsg, SColumnInfo* pTagCols, int32_t queryType, void* pMsg, struct SUdfInfo* pUdfInfo); SSqlExpr** pExprMsg, SColumnInfo* pTagCols, int32_t queryType, void* pMsg, struct SUdfInfo* pUdfInfo);
...@@ -575,13 +606,13 @@ SGroupbyExpr *createGroupbyExprFromMsg(SQueryTableMsg *pQueryMsg, SColIndex *pCo ...@@ -575,13 +606,13 @@ SGroupbyExpr *createGroupbyExprFromMsg(SQueryTableMsg *pQueryMsg, SColIndex *pCo
SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SGroupbyExpr *pGroupbyExpr, SExprInfo *pExprs, SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SGroupbyExpr *pGroupbyExpr, SExprInfo *pExprs,
SExprInfo *pSecExprs, STableGroupInfo *pTableGroupInfo, SColumnInfo* pTagCols, SFilterInfo* pFilters, int32_t vgId, char* sql, uint64_t qId, struct SUdfInfo* pUdfInfo); SExprInfo *pSecExprs, STableGroupInfo *pTableGroupInfo, SColumnInfo* pTagCols, SFilterInfo* pFilters, int32_t vgId, char* sql, uint64_t qId, struct SUdfInfo* pUdfInfo);
int32_t initQInfo(STsBufInfo* pTsBufInfo, void* tsdb, void* sourceOptr, SQInfo* pQInfo, SQueryParam* param, char* start, int32_t initQInfo(STsBufInfo* pTsBufInfo, void* tsdb, void* sourceOptr, SQInfo* pQInfo, STaskParam* param, char* start,
int32_t prevResultLen, void* merger); int32_t prevResultLen, void* merger);
int32_t createFilterInfo(SQueryAttr* pQueryAttr, uint64_t qId); int32_t createFilterInfo(STaskAttr* pQueryAttr, uint64_t qId);
void freeColumnFilterInfo(SColumnFilterInfo* pFilter, int32_t numOfFilters); void freeColumnFilterInfo(SColumnFilterInfo* pFilter, int32_t numOfFilters);
STableQueryInfo *createTableQueryInfo(SQueryAttr* pQueryAttr, void* pTable, bool groupbyColumn, STimeWindow win, void* buf); STableQueryInfo *createTableQueryInfo(STaskAttr* pQueryAttr, void* pTable, bool groupbyColumn, STimeWindow win, void* buf);
STableQueryInfo* createTmpTableQueryInfo(STimeWindow win); STableQueryInfo* createTmpTableQueryInfo(STimeWindow win);
int32_t buildArithmeticExprFromMsg(SExprInfo *pArithExprInfo, void *pQueryMsg); int32_t buildArithmeticExprFromMsg(SExprInfo *pArithExprInfo, void *pQueryMsg);
...@@ -590,9 +621,9 @@ bool isQueryKilled(SQInfo *pQInfo); ...@@ -590,9 +621,9 @@ bool isQueryKilled(SQInfo *pQInfo);
int32_t checkForQueryBuf(size_t numOfTables); int32_t checkForQueryBuf(size_t numOfTables);
bool checkNeedToCompressQueryCol(SQInfo *pQInfo); bool checkNeedToCompressQueryCol(SQInfo *pQInfo);
bool doBuildResCheck(SQInfo* pQInfo); bool doBuildResCheck(SQInfo* pQInfo);
void setQueryStatus(SQueryRuntimeEnv *pRuntimeEnv, int8_t status); void setQueryStatus(STaskRuntimeEnv *pRuntimeEnv, int8_t status);
bool onlyQueryTags(SQueryAttr* pQueryAttr); bool onlyQueryTags(STaskAttr* pQueryAttr);
void destroyUdfInfo(struct SUdfInfo* pUdfInfo); void destroyUdfInfo(struct SUdfInfo* pUdfInfo);
bool isValidQInfo(void *param); bool isValidQInfo(void *param);
...@@ -607,8 +638,8 @@ void publishQueryAbortEvent(SQInfo* pQInfo, int32_t code); ...@@ -607,8 +638,8 @@ void publishQueryAbortEvent(SQInfo* pQInfo, int32_t code);
void calculateOperatorProfResults(SQInfo* pQInfo); void calculateOperatorProfResults(SQInfo* pQInfo);
void queryCostStatis(SQInfo *pQInfo); void queryCostStatis(SQInfo *pQInfo);
void freeQInfo(SQInfo *pQInfo); void doDestroyTask(SQInfo *pQInfo);
void freeQueryAttr(SQueryAttr *pQuery); void freeQueryAttr(STaskAttr *pQuery);
int32_t getMaximumIdleDurationSec(); int32_t getMaximumIdleDurationSec();
......
...@@ -30,7 +30,7 @@ typedef struct SCompSupporter { ...@@ -30,7 +30,7 @@ typedef struct SCompSupporter {
int32_t order; int32_t order;
} SCompSupporter; } SCompSupporter;
int32_t getRowNumForMultioutput(SQueryAttr* pQueryAttr, bool topBottomQuery, bool stable) { int32_t getRowNumForMultioutput(STaskAttr* pQueryAttr, bool topBottomQuery, bool stable) {
if (pQueryAttr && (!stable)) { if (pQueryAttr && (!stable)) {
for (int16_t i = 0; i < pQueryAttr->numOfOutput; ++i) { for (int16_t i = 0; i < pQueryAttr->numOfOutput; ++i) {
// if (pQueryAttr->pExpr1[i].base. == FUNCTION_TOP || pQueryAttr->pExpr1[i].base.functionId == FUNCTION_BOTTOM) { // if (pQueryAttr->pExpr1[i].base. == FUNCTION_TOP || pQueryAttr->pExpr1[i].base.functionId == FUNCTION_BOTTOM) {
...@@ -42,7 +42,7 @@ int32_t getRowNumForMultioutput(SQueryAttr* pQueryAttr, bool topBottomQuery, boo ...@@ -42,7 +42,7 @@ int32_t getRowNumForMultioutput(SQueryAttr* pQueryAttr, bool topBottomQuery, boo
return 1; return 1;
} }
int32_t getOutputInterResultBufSize(SQueryAttr* pQueryAttr) { int32_t getOutputInterResultBufSize(STaskAttr* pQueryAttr) {
int32_t size = 0; int32_t size = 0;
for (int32_t i = 0; i < pQueryAttr->numOfOutput; ++i) { for (int32_t i = 0; i < pQueryAttr->numOfOutput; ++i) {
...@@ -86,7 +86,7 @@ void cleanupResultRowInfo(SResultRowInfo *pResultRowInfo) { ...@@ -86,7 +86,7 @@ void cleanupResultRowInfo(SResultRowInfo *pResultRowInfo) {
tfree(pResultRowInfo->pResult); tfree(pResultRowInfo->pResult);
} }
void resetResultRowInfo(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInfo *pResultRowInfo) { void resetResultRowInfo(STaskRuntimeEnv *pRuntimeEnv, SResultRowInfo *pResultRowInfo) {
if (pResultRowInfo == NULL || pResultRowInfo->capacity == 0) { if (pResultRowInfo == NULL || pResultRowInfo->capacity == 0) {
return; return;
} }
...@@ -136,7 +136,7 @@ void closeResultRow(SResultRowInfo *pResultRowInfo, int32_t slot) { ...@@ -136,7 +136,7 @@ void closeResultRow(SResultRowInfo *pResultRowInfo, int32_t slot) {
getResultRow(pResultRowInfo, slot)->closed = true; getResultRow(pResultRowInfo, slot)->closed = true;
} }
void clearResultRow(SQueryRuntimeEnv *pRuntimeEnv, SResultRow *pResultRow, int16_t type) { void clearResultRow(STaskRuntimeEnv *pRuntimeEnv, SResultRow *pResultRow, int16_t type) {
if (pResultRow == NULL) { if (pResultRow == NULL) {
return; return;
} }
...@@ -174,8 +174,8 @@ struct SResultRowEntryInfo* getResultCell(const SResultRow* pRow, int32_t index, ...@@ -174,8 +174,8 @@ struct SResultRowEntryInfo* getResultCell(const SResultRow* pRow, int32_t index,
return NULL; return NULL;
} }
size_t getResultRowSize(SQueryRuntimeEnv* pRuntimeEnv) { size_t getResultRowSize(STaskRuntimeEnv* pRuntimeEnv) {
SQueryAttr* pQueryAttr = pRuntimeEnv->pQueryAttr; STaskAttr* pQueryAttr = pRuntimeEnv->pQueryAttr;
return 0; return 0;
// return (pQueryAttr->numOfOutput * sizeof(SResultRowEntryInfo)) + pQueryAttr->interBufSize + sizeof(SResultRow); // return (pQueryAttr->numOfOutput * sizeof(SResultRowEntryInfo)) + pQueryAttr->interBufSize + sizeof(SResultRow);
} }
...@@ -393,8 +393,8 @@ int32_t getNumOfTotalRes(SGroupResInfo* pGroupResInfo) { ...@@ -393,8 +393,8 @@ int32_t getNumOfTotalRes(SGroupResInfo* pGroupResInfo) {
return (int32_t) taosArrayGetSize(pGroupResInfo->pRows); return (int32_t) taosArrayGetSize(pGroupResInfo->pRows);
} }
static int64_t getNumOfResultWindowRes(SQueryRuntimeEnv* pRuntimeEnv, SResultRow *pResultRow, int32_t* rowCellInfoOffset) { static int64_t getNumOfResultWindowRes(STaskRuntimeEnv* pRuntimeEnv, SResultRow *pResultRow, int32_t* rowCellInfoOffset) {
SQueryAttr* pQueryAttr = pRuntimeEnv->pQueryAttr; STaskAttr* pQueryAttr = pRuntimeEnv->pQueryAttr;
for (int32_t j = 0; j < pQueryAttr->numOfOutput; ++j) { for (int32_t j = 0; j < pQueryAttr->numOfOutput; ++j) {
int32_t functionId = 0;//pQueryAttr->pExpr1[j].base.functionId; int32_t functionId = 0;//pQueryAttr->pExpr1[j].base.functionId;
...@@ -488,7 +488,7 @@ int32_t tsDescOrder(const void* p1, const void* p2) { ...@@ -488,7 +488,7 @@ int32_t tsDescOrder(const void* p1, const void* p2) {
} }
} }
void orderTheResultRows(SQueryRuntimeEnv* pRuntimeEnv) { void orderTheResultRows(STaskRuntimeEnv* pRuntimeEnv) {
__compar_fn_t fn = NULL; __compar_fn_t fn = NULL;
if (pRuntimeEnv->pQueryAttr->order.order == TSDB_ORDER_ASC) { if (pRuntimeEnv->pQueryAttr->order.order == TSDB_ORDER_ASC) {
fn = tsAscOrder; fn = tsAscOrder;
...@@ -499,7 +499,7 @@ void orderTheResultRows(SQueryRuntimeEnv* pRuntimeEnv) { ...@@ -499,7 +499,7 @@ void orderTheResultRows(SQueryRuntimeEnv* pRuntimeEnv) {
taosArraySort(pRuntimeEnv->pResultRowArrayList, fn); taosArraySort(pRuntimeEnv->pResultRowArrayList, fn);
} }
static int32_t mergeIntoGroupResultImplRv(SQueryRuntimeEnv *pRuntimeEnv, SGroupResInfo* pGroupResInfo, uint64_t groupId, int32_t* rowCellInfoOffset) { static int32_t mergeIntoGroupResultImplRv(STaskRuntimeEnv *pRuntimeEnv, SGroupResInfo* pGroupResInfo, uint64_t groupId, int32_t* rowCellInfoOffset) {
if (!pGroupResInfo->ordered) { if (!pGroupResInfo->ordered) {
orderTheResultRows(pRuntimeEnv); orderTheResultRows(pRuntimeEnv);
pGroupResInfo->ordered = true; pGroupResInfo->ordered = true;
...@@ -528,7 +528,7 @@ static int32_t mergeIntoGroupResultImplRv(SQueryRuntimeEnv *pRuntimeEnv, SGroupR ...@@ -528,7 +528,7 @@ static int32_t mergeIntoGroupResultImplRv(SQueryRuntimeEnv *pRuntimeEnv, SGroupR
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static UNUSED_FUNC int32_t mergeIntoGroupResultImpl(SQueryRuntimeEnv *pRuntimeEnv, SGroupResInfo* pGroupResInfo, SArray *pTableList, static UNUSED_FUNC int32_t mergeIntoGroupResultImpl(STaskRuntimeEnv *pRuntimeEnv, SGroupResInfo* pGroupResInfo, SArray *pTableList,
int32_t* rowCellInfoOffset) { int32_t* rowCellInfoOffset) {
bool ascQuery = QUERY_IS_ASC_QUERY(pRuntimeEnv->pQueryAttr); bool ascQuery = QUERY_IS_ASC_QUERY(pRuntimeEnv->pQueryAttr);
...@@ -630,7 +630,7 @@ static UNUSED_FUNC int32_t mergeIntoGroupResultImpl(SQueryRuntimeEnv *pRuntimeEn ...@@ -630,7 +630,7 @@ static UNUSED_FUNC int32_t mergeIntoGroupResultImpl(SQueryRuntimeEnv *pRuntimeEn
return code; return code;
} }
int32_t mergeIntoGroupResult(SGroupResInfo* pGroupResInfo, SQueryRuntimeEnv* pRuntimeEnv, int32_t* offset) { int32_t mergeIntoGroupResult(SGroupResInfo* pGroupResInfo, STaskRuntimeEnv* pRuntimeEnv, int32_t* offset) {
int64_t st = taosGetTimestampUs(); int64_t st = taosGetTimestampUs();
while (pGroupResInfo->currentGroup < pGroupResInfo->totalGroup) { while (pGroupResInfo->currentGroup < pGroupResInfo->totalGroup) {
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "os.h"
#include "tcache.h"
#include "tglobal.h"
#include "tmsg.h"
#include "exception.h"
#include "thash.h"
#include "executorimpl.h"
#include "executor.h"
#include "tlosertree.h"
#include "ttypes.h"
#include "query.h"
typedef struct STaskMgmt {
pthread_mutex_t lock;
SCacheObj *qinfoPool; // query handle pool
int32_t vgId;
bool closed;
} STaskMgmt;
static void taskMgmtKillTaskFn(void* handle, void* param1) {
void** fp = (void**)handle;
qKillTask(*fp);
}
static void freeqinfoFn(void *qhandle) {
void** handle = qhandle;
if (handle == NULL || *handle == NULL) {
return;
}
qKillTask(*handle);
qDestroyTask(*handle);
}
void freeParam(STaskParam *param) {
tfree(param->sql);
tfree(param->tagCond);
tfree(param->tbnameCond);
tfree(param->pTableIdList);
taosArrayDestroy(param->pOperator);
tfree(param->pExprs);
tfree(param->pSecExprs);
tfree(param->pExpr);
tfree(param->pSecExpr);
tfree(param->pGroupColIndex);
tfree(param->pTagColumnInfo);
tfree(param->pGroupbyExpr);
tfree(param->prevResult);
}
// todo parse json to get the operator tree.
int32_t qCreateTask(void* tsdb, int32_t vgId, void* pQueryMsg, qTaskInfo_t* pTaskInfo, uint64_t taskId) {
assert(pQueryMsg != NULL && tsdb != NULL);
int32_t code = TSDB_CODE_SUCCESS;
#if 0
STaskParam param = {0};
code = convertQueryMsg(pQueryMsg, &param);
if (code != TSDB_CODE_SUCCESS) {
goto _over;
}
if (pQueryMsg->numOfTables <= 0) {
qError("Invalid number of tables to query, numOfTables:%d", pQueryMsg->numOfTables);
code = TSDB_CODE_QRY_INVALID_MSG;
goto _over;
}
if (param.pTableIdList == NULL || taosArrayGetSize(param.pTableIdList) == 0) {
qError("qmsg:%p, SQueryTableMsg wrong format", pQueryMsg);
code = TSDB_CODE_QRY_INVALID_MSG;
goto _over;
}
SQueriedTableInfo info = { .numOfTags = pQueryMsg->numOfTags, .numOfCols = pQueryMsg->numOfCols, .colList = pQueryMsg->tableCols};
if ((code = createQueryFunc(&info, pQueryMsg->numOfOutput, &param.pExprs, param.pExpr, param.pTagColumnInfo,
pQueryMsg->queryType, pQueryMsg, param.pUdfInfo)) != TSDB_CODE_SUCCESS) {
goto _over;
}
if (param.pSecExpr != NULL) {
if ((code = createIndirectQueryFuncExprFromMsg(pQueryMsg, pQueryMsg->secondStageOutput, &param.pSecExprs, param.pSecExpr, param.pExprs, param.pUdfInfo)) != TSDB_CODE_SUCCESS) {
goto _over;
}
}
if (param.colCond != NULL) {
if ((code = createQueryFilter(param.colCond, pQueryMsg->colCondLen, &param.pFilters)) != TSDB_CODE_SUCCESS) {
goto _over;
}
}
param.pGroupbyExpr = createGroupbyExprFromMsg(pQueryMsg, param.pGroupColIndex, &code);
if ((param.pGroupbyExpr == NULL && pQueryMsg->numOfGroupCols != 0) || code != TSDB_CODE_SUCCESS) {
goto _over;
}
bool isSTableQuery = false;
STableGroupInfo tableGroupInfo = {0};
int64_t st = taosGetTimestampUs();
if (TSDB_QUERY_HAS_TYPE(pQueryMsg->queryType, TSDB_QUERY_TYPE_TABLE_QUERY)) {
STableIdInfo *id = taosArrayGet(param.pTableIdList, 0);
qDebug("qmsg:%p query normal table, uid:%"PRId64", tid:%d", pQueryMsg, id->uid, id->tid);
if ((code = tsdbGetOneTableGroup(tsdb, id->uid, pQueryMsg->window.skey, &tableGroupInfo)) != TSDB_CODE_SUCCESS) {
goto _over;
}
} else if (TSDB_QUERY_HAS_TYPE(pQueryMsg->queryType, TSDB_QUERY_TYPE_MULTITABLE_QUERY|TSDB_QUERY_TYPE_STABLE_QUERY)) {
isSTableQuery = true;
// also note there's possibility that only one table in the super table
if (!TSDB_QUERY_HAS_TYPE(pQueryMsg->queryType, TSDB_QUERY_TYPE_MULTITABLE_QUERY)) {
STableIdInfo *id = taosArrayGet(param.pTableIdList, 0);
// group by normal column, do not pass the group by condition to tsdb to group table into different group
int32_t numOfGroupByCols = pQueryMsg->numOfGroupCols;
if (pQueryMsg->numOfGroupCols == 1 && !TSDB_COL_IS_TAG(param.pGroupColIndex->flag)) {
numOfGroupByCols = 0;
}
qDebug("qmsg:%p query stable, uid:%"PRIu64", tid:%d", pQueryMsg, id->uid, id->tid);
code = tsdbQuerySTableByTagCond(tsdb, id->uid, pQueryMsg->window.skey, param.tagCond, pQueryMsg->tagCondLen,
pQueryMsg->tagNameRelType, param.tbnameCond, &tableGroupInfo, param.pGroupColIndex, numOfGroupByCols);
if (code != TSDB_CODE_SUCCESS) {
qError("qmsg:%p failed to query stable, reason: %s", pQueryMsg, tstrerror(code));
goto _over;
}
} else {
code = tsdbGetTableGroupFromIdList(tsdb, param.pTableIdList, &tableGroupInfo);
if (code != TSDB_CODE_SUCCESS) {
goto _over;
}
qDebug("qmsg:%p query on %u tables in one group from client", pQueryMsg, tableGroupInfo.numOfTables);
}
int64_t el = taosGetTimestampUs() - st;
qDebug("qmsg:%p tag filter completed, numOfTables:%u, elapsed time:%"PRId64"us", pQueryMsg, tableGroupInfo.numOfTables, el);
} else {
assert(0);
}
code = checkForQueryBuf(tableGroupInfo.numOfTables);
if (code != TSDB_CODE_SUCCESS) { // not enough query buffer, abort
goto _over;
}
assert(pQueryMsg->stableQuery == isSTableQuery);
(*pTaskInfo) = createQInfoImpl(pQueryMsg, param.pGroupbyExpr, param.pExprs, param.pSecExprs, &tableGroupInfo,
param.pTagColumnInfo, param.pFilters, vgId, param.sql, qId, param.pUdfInfo);
param.sql = NULL;
param.pExprs = NULL;
param.pSecExprs = NULL;
param.pGroupbyExpr = NULL;
param.pTagColumnInfo = NULL;
param.pFilters = NULL;
if ((*pTaskInfo) == NULL) {
code = TSDB_CODE_QRY_OUT_OF_MEMORY;
goto _over;
}
param.pUdfInfo = NULL;
code = initQInfo(&pQueryMsg->tsBuf, tsdb, NULL, *pTaskInfo, &param, (char*)pQueryMsg, pQueryMsg->prevResultLen, NULL);
_over:
if (param.pGroupbyExpr != NULL) {
taosArrayDestroy(param.pGroupbyExpr->columnInfo);
}
tfree(param.colCond);
destroyUdfInfo(param.pUdfInfo);
taosArrayDestroy(param.pTableIdList);
param.pTableIdList = NULL;
freeParam(&param);
for (int32_t i = 0; i < pQueryMsg->numOfCols; i++) {
SColumnInfo* column = pQueryMsg->tableCols + i;
freeColumnFilterInfo(column->flist.filterInfo, column->flist.numOfFilters);
}
filterFreeInfo(param.pFilters);
//pTaskInfo already freed in initQInfo, but *pTaskInfo may not pointer to null;
if (code != TSDB_CODE_SUCCESS) {
*pTaskInfo = NULL;
}
#endif
// if failed to add ref for all tables in this query, abort current query
return code;
}
#ifdef TEST_IMPL
// wait moment
int waitMoment(SQInfo* pQInfo){
if(pQInfo->sql) {
int ms = 0;
char* pcnt = strstr(pQInfo->sql, " count(*)");
if(pcnt) return 0;
char* pos = strstr(pQInfo->sql, " t_");
if(pos){
pos += 3;
ms = atoi(pos);
while(*pos >= '0' && *pos <= '9'){
pos ++;
}
char unit_char = *pos;
if(unit_char == 'h'){
ms *= 3600*1000;
} else if(unit_char == 'm'){
ms *= 60*1000;
} else if(unit_char == 's'){
ms *= 1000;
}
}
if(ms == 0) return 0;
printf("test wait sleep %dms. sql=%s ...\n", ms, pQInfo->sql);
if(ms < 1000) {
taosMsleep(ms);
} else {
int used_ms = 0;
while(used_ms < ms) {
taosMsleep(1000);
used_ms += 1000;
if(isQueryKilled(pQInfo)){
printf("test check query is canceled, sleep break.%s\n", pQInfo->sql);
break;
}
}
}
}
return 1;
}
#endif
bool qExecTask(qTaskInfo_t qinfo, uint64_t *qId) {
SQInfo *pQInfo = (SQInfo *)qinfo;
assert(pQInfo && pQInfo->signature == pQInfo);
int64_t threadId = taosGetSelfPthreadId();
int64_t curOwner = 0;
if ((curOwner = atomic_val_compare_exchange_64(&pQInfo->owner, 0, threadId)) != 0) {
qError("QInfo:0x%"PRIx64"-%p qhandle is now executed by thread:%p", pQInfo->qId, pQInfo, (void*) curOwner);
pQInfo->code = TSDB_CODE_QRY_IN_EXEC;
return false;
}
*qId = pQInfo->qId;
if(pQInfo->startExecTs == 0)
pQInfo->startExecTs = taosGetTimestampMs();
if (isQueryKilled(pQInfo)) {
qDebug("QInfo:0x%"PRIx64" it is already killed, abort", pQInfo->qId);
return doBuildResCheck(pQInfo);
}
STaskRuntimeEnv* pRuntimeEnv = &pQInfo->runtimeEnv;
if (pRuntimeEnv->tableqinfoGroupInfo.numOfTables == 0) {
qDebug("QInfo:0x%"PRIx64" no table exists for query, abort", pQInfo->qId);
// setTaskStatus(pRuntimeEnv, QUERY_COMPLETED);
return doBuildResCheck(pQInfo);
}
// error occurs, record the error code and return to client
int32_t ret = setjmp(pQInfo->runtimeEnv.env);
if (ret != TSDB_CODE_SUCCESS) {
publishQueryAbortEvent(pQInfo, ret);
pQInfo->code = ret;
qDebug("QInfo:0x%"PRIx64" query abort due to error/cancel occurs, code:%s", pQInfo->qId, tstrerror(pQInfo->code));
return doBuildResCheck(pQInfo);
}
qDebug("QInfo:0x%"PRIx64" query task is launched", pQInfo->qId);
bool newgroup = false;
publishOperatorProfEvent(pRuntimeEnv->proot, QUERY_PROF_BEFORE_OPERATOR_EXEC);
int64_t st = taosGetTimestampUs();
pRuntimeEnv->outputBuf = pRuntimeEnv->proot->exec(pRuntimeEnv->proot, &newgroup);
pQInfo->summary.elapsedTime += (taosGetTimestampUs() - st);
#ifdef TEST_IMPL
waitMoment(pQInfo);
#endif
publishOperatorProfEvent(pRuntimeEnv->proot, QUERY_PROF_AFTER_OPERATOR_EXEC);
pRuntimeEnv->resultInfo.total += GET_NUM_OF_RESULTS(pRuntimeEnv);
if (isQueryKilled(pQInfo)) {
qDebug("QInfo:0x%"PRIx64" query is killed", pQInfo->qId);
} else if (GET_NUM_OF_RESULTS(pRuntimeEnv) == 0) {
qDebug("QInfo:0x%"PRIx64" over, %u tables queried, total %"PRId64" rows returned", pQInfo->qId, pRuntimeEnv->tableqinfoGroupInfo.numOfTables,
pRuntimeEnv->resultInfo.total);
} else {
qDebug("QInfo:0x%"PRIx64" query paused, %d rows returned, total:%" PRId64 " rows", pQInfo->qId,
GET_NUM_OF_RESULTS(pRuntimeEnv), pRuntimeEnv->resultInfo.total);
}
return doBuildResCheck(pQInfo);
}
int32_t qRetrieveQueryResultInfo(qTaskInfo_t qinfo, bool* buildRes, void* pRspContext) {
SQInfo *pQInfo = (SQInfo *)qinfo;
if (pQInfo == NULL || !isValidQInfo(pQInfo)) {
qError("QInfo invalid qhandle");
return TSDB_CODE_QRY_INVALID_QHANDLE;
}
*buildRes = false;
if (IS_QUERY_KILLED(pQInfo)) {
qDebug("QInfo:0x%"PRIx64" query is killed, code:0x%08x", pQInfo->qId, pQInfo->code);
return pQInfo->code;
}
int32_t code = TSDB_CODE_SUCCESS;
if (tsRetrieveBlockingModel) {
pQInfo->rspContext = pRspContext;
tsem_wait(&pQInfo->ready);
*buildRes = true;
code = pQInfo->code;
} else {
STaskRuntimeEnv* pRuntimeEnv = &pQInfo->runtimeEnv;
STaskAttr *pQueryAttr = pQInfo->runtimeEnv.pQueryAttr;
pthread_mutex_lock(&pQInfo->lock);
assert(pQInfo->rspContext == NULL);
if (pQInfo->dataReady == QUERY_RESULT_READY) {
*buildRes = true;
qDebug("QInfo:0x%"PRIx64" retrieve result info, rowsize:%d, rows:%d, code:%s", pQInfo->qId, pQueryAttr->resultRowSize,
GET_NUM_OF_RESULTS(pRuntimeEnv), tstrerror(pQInfo->code));
} else {
*buildRes = false;
qDebug("QInfo:0x%"PRIx64" retrieve req set query return result after paused", pQInfo->qId);
pQInfo->rspContext = pRspContext;
assert(pQInfo->rspContext != NULL);
}
code = pQInfo->code;
pthread_mutex_unlock(&pQInfo->lock);
}
return code;
}
void* qGetResultRetrieveMsg(qTaskInfo_t qinfo) {
SQInfo* pQInfo = (SQInfo*) qinfo;
assert(pQInfo != NULL);
return pQInfo->rspContext;
}
int32_t qKillTask(qTaskInfo_t qinfo) {
SQInfo *pQInfo = (SQInfo *)qinfo;
if (pQInfo == NULL || !isValidQInfo(pQInfo)) {
return TSDB_CODE_QRY_INVALID_QHANDLE;
}
qDebug("QInfo:0x%"PRIx64" query killed", pQInfo->qId);
setQueryKilled(pQInfo);
// Wait for the query executing thread being stopped/
// Once the query is stopped, the owner of qHandle will be cleared immediately.
while (pQInfo->owner != 0) {
taosMsleep(100);
}
return TSDB_CODE_SUCCESS;
}
int32_t qIsTaskCompleted(qTaskInfo_t qinfo) {
SQInfo *pQInfo = (SQInfo *)qinfo;
if (pQInfo == NULL || !isValidQInfo(pQInfo)) {
return TSDB_CODE_QRY_INVALID_QHANDLE;
}
return isQueryKilled(pQInfo) || Q_STATUS_EQUAL(pQInfo->runtimeEnv.status, QUERY_OVER);
}
void qDestroyTask(qTaskInfo_t qHandle) {
SQInfo* pQInfo = (SQInfo*) qHandle;
if (!isValidQInfo(pQInfo)) {
return;
}
qDebug("QInfo:0x%"PRIx64" query completed", pQInfo->qId);
queryCostStatis(pQInfo); // print the query cost summary
doDestroyTask(pQInfo);
}
void* qOpenTaskMgmt(int32_t vgId) {
const int32_t refreshHandleInterval = 30; // every 30 seconds, refresh handle pool
char cacheName[128] = {0};
sprintf(cacheName, "qhandle_%d", vgId);
STaskMgmt* pTaskMgmt = calloc(1, sizeof(STaskMgmt));
if (pTaskMgmt == NULL) {
terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
return NULL;
}
pTaskMgmt->qinfoPool = taosCacheInit(TSDB_CACHE_PTR_KEY, refreshHandleInterval, true, freeqinfoFn, cacheName);
pTaskMgmt->closed = false;
pTaskMgmt->vgId = vgId;
pthread_mutex_init(&pTaskMgmt->lock, NULL);
qDebug("vgId:%d, open queryTaskMgmt success", vgId);
return pTaskMgmt;
}
void qTaskMgmtNotifyClosing(void* pQMgmt) {
if (pQMgmt == NULL) {
return;
}
STaskMgmt* pQueryMgmt = pQMgmt;
qInfo("vgId:%d, set querymgmt closed, wait for all queries cancelled", pQueryMgmt->vgId);
pthread_mutex_lock(&pQueryMgmt->lock);
pQueryMgmt->closed = true;
pthread_mutex_unlock(&pQueryMgmt->lock);
taosCacheRefresh(pQueryMgmt->qinfoPool, taskMgmtKillTaskFn, NULL);
}
void qQueryMgmtReOpen(void *pQMgmt) {
if (pQMgmt == NULL) {
return;
}
STaskMgmt *pQueryMgmt = pQMgmt;
qInfo("vgId:%d, set querymgmt reopen", pQueryMgmt->vgId);
pthread_mutex_lock(&pQueryMgmt->lock);
pQueryMgmt->closed = false;
pthread_mutex_unlock(&pQueryMgmt->lock);
}
void qCleanupTaskMgmt(void* pQMgmt) {
if (pQMgmt == NULL) {
return;
}
STaskMgmt* pQueryMgmt = pQMgmt;
int32_t vgId = pQueryMgmt->vgId;
assert(pQueryMgmt->closed);
SCacheObj* pqinfoPool = pQueryMgmt->qinfoPool;
pQueryMgmt->qinfoPool = NULL;
taosCacheCleanup(pqinfoPool);
pthread_mutex_destroy(&pQueryMgmt->lock);
tfree(pQueryMgmt);
qDebug("vgId:%d, queryMgmt cleanup completed", vgId);
}
void** qRegisterTask(void* pMgmt, uint64_t qId, void *qInfo) {
if (pMgmt == NULL) {
terrno = TSDB_CODE_VND_INVALID_VGROUP_ID;
return NULL;
}
STaskMgmt *pQueryMgmt = pMgmt;
if (pQueryMgmt->qinfoPool == NULL) {
qError("QInfo:0x%"PRIx64"-%p failed to add qhandle into qMgmt, since qMgmt is closed", qId, (void*)qInfo);
terrno = TSDB_CODE_VND_INVALID_VGROUP_ID;
return NULL;
}
pthread_mutex_lock(&pQueryMgmt->lock);
if (pQueryMgmt->closed) {
pthread_mutex_unlock(&pQueryMgmt->lock);
qError("QInfo:0x%"PRIx64"-%p failed to add qhandle into cache, since qMgmt is colsing", qId, (void*)qInfo);
terrno = TSDB_CODE_VND_INVALID_VGROUP_ID;
return NULL;
} else {
void** handle = taosCachePut(pQueryMgmt->qinfoPool, &qId, sizeof(qId), &qInfo, sizeof(TSDB_CACHE_PTR_TYPE),
(getMaximumIdleDurationSec()*1000));
pthread_mutex_unlock(&pQueryMgmt->lock);
return handle;
}
}
void** qAcquireTask(void* pMgmt, uint64_t _key) {
STaskMgmt *pQueryMgmt = pMgmt;
if (pQueryMgmt->closed) {
terrno = TSDB_CODE_VND_INVALID_VGROUP_ID;
return NULL;
}
if (pQueryMgmt->qinfoPool == NULL) {
terrno = TSDB_CODE_QRY_INVALID_QHANDLE;
return NULL;
}
void** handle = taosCacheAcquireByKey(pQueryMgmt->qinfoPool, &_key, sizeof(_key));
if (handle == NULL || *handle == NULL) {
terrno = TSDB_CODE_QRY_INVALID_QHANDLE;
return NULL;
} else {
return handle;
}
}
void** qReleaseTask(void* pMgmt, void* pQInfo, bool freeHandle) {
STaskMgmt *pQueryMgmt = pMgmt;
if (pQueryMgmt->qinfoPool == NULL) {
return NULL;
}
taosCacheRelease(pQueryMgmt->qinfoPool, pQInfo, freeHandle);
return 0;
}
#if 0
//kill by qid
int32_t qKillQueryByQId(void* pMgmt, int64_t qId, int32_t waitMs, int32_t waitCount) {
int32_t error = TSDB_CODE_SUCCESS;
void** handle = qAcquireTask(pMgmt, qId);
if(handle == NULL) return terrno;
SQInfo* pQInfo = (SQInfo*)(*handle);
if (pQInfo == NULL || !isValidQInfo(pQInfo)) {
return TSDB_CODE_QRY_INVALID_QHANDLE;
}
qWarn("QId:0x%"PRIx64" be killed(no memory commit).", pQInfo->qId);
setQueryKilled(pQInfo);
// wait query stop
int32_t loop = 0;
while (pQInfo->owner != 0) {
taosMsleep(waitMs);
if(loop++ > waitCount){
error = TSDB_CODE_FAILED;
break;
}
}
qReleaseTask(pMgmt, (void **)&handle, true);
return error;
}
#endif
\ No newline at end of file
...@@ -19,6 +19,7 @@ ...@@ -19,6 +19,7 @@
#include "index.h" #include "index.h"
#include "index_fst.h" #include "index_fst.h"
#include "taos.h" #include "taos.h"
#include "tchecksum.h"
#include "thash.h" #include "thash.h"
#include "tlog.h" #include "tlog.h"
......
...@@ -34,6 +34,7 @@ typedef struct WriterCtx { ...@@ -34,6 +34,7 @@ typedef struct WriterCtx {
int (*read)(struct WriterCtx* ctx, uint8_t* buf, int len); int (*read)(struct WriterCtx* ctx, uint8_t* buf, int len);
int (*flush)(struct WriterCtx* ctx); int (*flush)(struct WriterCtx* ctx);
int (*readFrom)(struct WriterCtx* ctx, uint8_t* buf, int len, int32_t offset); int (*readFrom)(struct WriterCtx* ctx, uint8_t* buf, int len, int32_t offset);
int (*size)(struct WriterCtx* ctx);
WriterType type; WriterType type;
union { union {
struct { struct {
......
...@@ -34,7 +34,10 @@ void indexInit() { ...@@ -34,7 +34,10 @@ void indexInit() {
// refactor later // refactor later
indexQhandle = taosInitScheduler(INDEX_QUEUE_SIZE, INDEX_NUM_OF_THREADS, "index"); indexQhandle = taosInitScheduler(INDEX_QUEUE_SIZE, INDEX_NUM_OF_THREADS, "index");
} }
void indexCleanUp() { taosCleanUpScheduler(indexQhandle); } void indexCleanUp() {
// refacto later
taosCleanUpScheduler(indexQhandle);
}
static int uidCompare(const void* a, const void* b) { static int uidCompare(const void* a, const void* b) {
// add more version compare // add more version compare
......
...@@ -21,7 +21,7 @@ ...@@ -21,7 +21,7 @@
#define MAX_INDEX_KEY_LEN 256 // test only, change later #define MAX_INDEX_KEY_LEN 256 // test only, change later
#define MEM_TERM_LIMIT 10 * 10000 #define MEM_TERM_LIMIT 10 * 10000
#define MEM_THRESHOLD 1024 * 1024 * 2 #define MEM_THRESHOLD 1024 * 1024
#define MEM_ESTIMATE_RADIO 1.5 #define MEM_ESTIMATE_RADIO 1.5
static void indexMemRef(MemTable* tbl); static void indexMemRef(MemTable* tbl);
......
...@@ -935,7 +935,10 @@ Fst* fstCreate(FstSlice* slice) { ...@@ -935,7 +935,10 @@ Fst* fstCreate(FstSlice* slice) {
uint32_t checkSum = 0; uint32_t checkSum = 0;
len -= sizeof(checkSum); len -= sizeof(checkSum);
taosDecodeFixedU32(buf + len, &checkSum); taosDecodeFixedU32(buf + len, &checkSum);
if (taosCheckChecksum(buf, len, checkSum)) {
// verify fst
return NULL;
}
CompiledAddr rootAddr; CompiledAddr rootAddr;
len -= sizeof(rootAddr); len -= sizeof(rootAddr);
taosDecodeFixedU64(buf + len, &rootAddr); taosDecodeFixedU64(buf + len, &rootAddr);
......
...@@ -59,6 +59,13 @@ static int writeCtxDoReadFrom(WriterCtx* ctx, uint8_t* buf, int len, int32_t off ...@@ -59,6 +59,13 @@ static int writeCtxDoReadFrom(WriterCtx* ctx, uint8_t* buf, int len, int32_t off
} }
return nRead; return nRead;
} }
static int writeCtxGetSize(WriterCtx* ctx) {
if (ctx->type == TFile && ctx->file.readOnly) {
// refactor later
return ctx->file.size;
}
return 0;
}
static int writeCtxDoFlush(WriterCtx* ctx) { static int writeCtxDoFlush(WriterCtx* ctx) {
if (ctx->type == TFile) { if (ctx->type == TFile) {
// taosFsyncFile(ctx->file.fd); // taosFsyncFile(ctx->file.fd);
...@@ -109,6 +116,7 @@ WriterCtx* writerCtxCreate(WriterType type, const char* path, bool readOnly, int ...@@ -109,6 +116,7 @@ WriterCtx* writerCtxCreate(WriterType type, const char* path, bool readOnly, int
ctx->read = writeCtxDoRead; ctx->read = writeCtxDoRead;
ctx->flush = writeCtxDoFlush; ctx->flush = writeCtxDoFlush;
ctx->readFrom = writeCtxDoReadFrom; ctx->readFrom = writeCtxDoReadFrom;
ctx->size = writeCtxGetSize;
ctx->offset = 0; ctx->offset = 0;
ctx->limit = capacity; ctx->limit = capacity;
...@@ -159,6 +167,8 @@ int fstCountingWriterWrite(FstCountingWriter* write, uint8_t* buf, uint32_t len) ...@@ -159,6 +167,8 @@ int fstCountingWriterWrite(FstCountingWriter* write, uint8_t* buf, uint32_t len)
int nWrite = ctx->write(ctx, buf, len); int nWrite = ctx->write(ctx, buf, len);
assert(nWrite == len); assert(nWrite == len);
write->count += len; write->count += len;
write->summer = taosCalcChecksum(write->summer, buf, len);
return len; return len;
} }
int fstCountingWriterRead(FstCountingWriter* write, uint8_t* buf, uint32_t len) { int fstCountingWriterRead(FstCountingWriter* write, uint8_t* buf, uint32_t len) {
...@@ -169,7 +179,10 @@ int fstCountingWriterRead(FstCountingWriter* write, uint8_t* buf, uint32_t len) ...@@ -169,7 +179,10 @@ int fstCountingWriterRead(FstCountingWriter* write, uint8_t* buf, uint32_t len)
return nRead; return nRead;
} }
uint32_t fstCountingWriterMaskedCheckSum(FstCountingWriter* write) { return 0; } uint32_t fstCountingWriterMaskedCheckSum(FstCountingWriter* write) {
// opt
return write->summer;
}
int fstCountingWriterFlush(FstCountingWriter* write) { int fstCountingWriterFlush(FstCountingWriter* write) {
WriterCtx* ctx = write->wrt; WriterCtx* ctx = write->wrt;
......
...@@ -21,8 +21,11 @@ p * ...@@ -21,8 +21,11 @@ p *
#include "index_fst_counting_writer.h" #include "index_fst_counting_writer.h"
#include "index_util.h" #include "index_util.h"
#include "taosdef.h" #include "taosdef.h"
#include "tcoding.h"
#include "tcompare.h" #include "tcompare.h"
const static uint64_t tfileMagicNumber = 0xdb4775248b80fb57ull;
typedef struct TFileFstIter { typedef struct TFileFstIter {
FstStreamBuilder* fb; FstStreamBuilder* fb;
StreamWithState* st; StreamWithState* st;
...@@ -40,9 +43,12 @@ static void tfileSerialTableIdsToBuf(char* buf, SArray* tableIds); ...@@ -40,9 +43,12 @@ static void tfileSerialTableIdsToBuf(char* buf, SArray* tableIds);
static int tfileWriteHeader(TFileWriter* writer); static int tfileWriteHeader(TFileWriter* writer);
static int tfileWriteFstOffset(TFileWriter* tw, int32_t offset); static int tfileWriteFstOffset(TFileWriter* tw, int32_t offset);
static int tfileWriteData(TFileWriter* write, TFileValue* tval); static int tfileWriteData(TFileWriter* write, TFileValue* tval);
static int tfileWriteFooter(TFileWriter* write);
// handle file corrupt later
static int tfileReaderLoadHeader(TFileReader* reader); static int tfileReaderLoadHeader(TFileReader* reader);
static int tfileReaderLoadFst(TFileReader* reader); static int tfileReaderLoadFst(TFileReader* reader);
static int tfileReaderVerify(TFileReader* reader);
static int tfileReaderLoadTableIds(TFileReader* reader, int32_t offset, SArray* result); static int tfileReaderLoadTableIds(TFileReader* reader, int32_t offset, SArray* result);
static SArray* tfileGetFileList(const char* path); static SArray* tfileGetFileList(const char* path);
...@@ -138,8 +144,14 @@ TFileReader* tfileReaderCreate(WriterCtx* ctx) { ...@@ -138,8 +144,14 @@ TFileReader* tfileReaderCreate(WriterCtx* ctx) {
TFileReader* reader = calloc(1, sizeof(TFileReader)); TFileReader* reader = calloc(1, sizeof(TFileReader));
if (reader == NULL) { return NULL; } if (reader == NULL) { return NULL; }
// T_REF_INC(reader);
reader->ctx = ctx; reader->ctx = ctx;
if (0 != tfileReaderVerify(reader)) {
tfileReaderDestroy(reader);
indexError("invalid tfile, suid: %" PRIu64 ", colName: %s", reader->header.suid, reader->header.colName);
return NULL;
}
// T_REF_INC(reader);
if (0 != tfileReaderLoadHeader(reader)) { if (0 != tfileReaderLoadHeader(reader)) {
tfileReaderDestroy(reader); tfileReaderDestroy(reader);
indexError("failed to load index header, suid: %" PRIu64 ", colName: %s", reader->header.suid, indexError("failed to load index header, suid: %" PRIu64 ", colName: %s", reader->header.suid,
...@@ -296,6 +308,8 @@ int tfileWriterPut(TFileWriter* tw, void* data, bool order) { ...@@ -296,6 +308,8 @@ int tfileWriterPut(TFileWriter* tw, void* data, bool order) {
fstBuilderFinish(tw->fb); fstBuilderFinish(tw->fb);
fstBuilderDestroy(tw->fb); fstBuilderDestroy(tw->fb);
tw->fb = NULL; tw->fb = NULL;
tfileWriteFooter(tw);
return 0; return 0;
} }
void tfileWriterClose(TFileWriter* tw) { void tfileWriterClose(TFileWriter* tw) {
...@@ -502,6 +516,14 @@ static int tfileWriteData(TFileWriter* write, TFileValue* tval) { ...@@ -502,6 +516,14 @@ static int tfileWriteData(TFileWriter* write, TFileValue* tval) {
} }
return 0; return 0;
} }
static int tfileWriteFooter(TFileWriter* write) {
char buf[sizeof(tfileMagicNumber) + 1] = {0};
void* pBuf = (void*)buf;
taosEncodeFixedU64((void**)(void*)&pBuf, tfileMagicNumber);
int nwrite = write->ctx->write(write->ctx, buf, strlen(buf));
assert(nwrite == sizeof(tfileMagicNumber));
return nwrite;
}
static int tfileReaderLoadHeader(TFileReader* reader) { static int tfileReaderLoadHeader(TFileReader* reader) {
// TODO simple tfile header later // TODO simple tfile header later
char buf[TFILE_HEADER_SIZE] = {0}; char buf[TFILE_HEADER_SIZE] = {0};
...@@ -527,9 +549,14 @@ static int tfileReaderLoadFst(TFileReader* reader) { ...@@ -527,9 +549,14 @@ static int tfileReaderLoadFst(TFileReader* reader) {
if (buf == NULL) { return -1; } if (buf == NULL) { return -1; }
WriterCtx* ctx = reader->ctx; WriterCtx* ctx = reader->ctx;
int32_t nread = ctx->readFrom(ctx, buf, FST_MAX_SIZE, reader->header.fstOffset); int size = ctx->size(ctx);
indexInfo("nread = %d, and fst offset=%d, filename: %s, size: %d ", nread, reader->header.fstOffset, ctx->file.buf,
ctx->file.size); int64_t ts = taosGetTimestampUs();
int32_t nread =
ctx->readFrom(ctx, buf, size - reader->header.fstOffset - sizeof(tfileMagicNumber), reader->header.fstOffset);
int64_t cost = taosGetTimestampUs() - ts;
indexInfo("nread = %d, and fst offset=%d, filename: %s, size: %d, time cost: %" PRId64 "us", nread,
reader->header.fstOffset, ctx->file.buf, ctx->file.size, cost);
// we assuse fst size less than FST_MAX_SIZE // we assuse fst size less than FST_MAX_SIZE
assert(nread > 0 && nread < FST_MAX_SIZE); assert(nread > 0 && nread < FST_MAX_SIZE);
...@@ -558,6 +585,25 @@ static int tfileReaderLoadTableIds(TFileReader* reader, int32_t offset, SArray* ...@@ -558,6 +585,25 @@ static int tfileReaderLoadTableIds(TFileReader* reader, int32_t offset, SArray*
free(buf); free(buf);
return 0; return 0;
} }
static int tfileReaderVerify(TFileReader* reader) {
// just validate header and Footer, file corrupted also shuild be verified later
WriterCtx* ctx = reader->ctx;
uint64_t tMagicNumber = 0;
char buf[sizeof(tMagicNumber) + 1] = {0};
int size = ctx->size(ctx);
if (size < sizeof(tMagicNumber) || size <= sizeof(reader->header)) {
return -1;
} else if (ctx->readFrom(ctx, buf, sizeof(tMagicNumber), size - sizeof(tMagicNumber)) != sizeof(tMagicNumber)) {
return -1;
}
taosDecodeFixedU64(buf, &tMagicNumber);
return tMagicNumber == tfileMagicNumber ? 0 : -1;
}
void tfileReaderRef(TFileReader* reader) { void tfileReaderRef(TFileReader* reader) {
if (reader == NULL) { return; } if (reader == NULL) { return; }
int ref = T_REF_INC(reader); int ref = T_REF_INC(reader);
......
#include <algorithm>
#include <iostream> #include <iostream>
#include <string> #include <string>
#include <thread> #include <thread>
...@@ -12,7 +13,6 @@ ...@@ -12,7 +13,6 @@
#include "index_tfile.h" #include "index_tfile.h"
#include "tskiplist.h" #include "tskiplist.h"
#include "tutil.h" #include "tutil.h"
void* callback(void* s) { return s; } void* callback(void* s) { return s; }
static std::string fileName = "/tmp/tindex.tindex"; static std::string fileName = "/tmp/tindex.tindex";
...@@ -293,7 +293,7 @@ void validateTFile(char* arg) { ...@@ -293,7 +293,7 @@ void validateTFile(char* arg) {
std::thread threads[NUM_OF_THREAD]; std::thread threads[NUM_OF_THREAD];
// std::vector<std::thread> threads; // std::vector<std::thread> threads;
TFileReader* reader = tfileReaderOpen(arg, 0, 999992, "tag1"); TFileReader* reader = tfileReaderOpen(arg, 0, 20000000, "tag1");
for (int i = 0; i < NUM_OF_THREAD; i++) { for (int i = 0; i < NUM_OF_THREAD; i++) {
threads[i] = std::thread(fst_get, reader->fst); threads[i] = std::thread(fst_get, reader->fst);
...@@ -306,13 +306,41 @@ void validateTFile(char* arg) { ...@@ -306,13 +306,41 @@ void validateTFile(char* arg) {
} }
tfCleanup(); tfCleanup();
} }
void iterTFileReader(char* path, char* ver) {
tfInit();
int version = atoi(ver);
TFileReader* reader = tfileReaderOpen(path, 0, version, "tag1");
Iterate* iter = tfileIteratorCreate(reader);
bool tn = iter ? iter->next(iter) : false;
int count = 0;
int termCount = 0;
while (tn == true) {
count++;
IterateValue* cv = iter->getValue(iter);
termCount += (int)taosArrayGetSize(cv->val);
printf("col val: %s, size: %d\n", cv->colVal, (int)taosArrayGetSize(cv->val));
tn = iter->next(iter);
}
printf("total size: %d\n term count: %d\n", count, termCount);
tfileIteratorDestroy(iter);
tfCleanup();
}
int main(int argc, char* argv[]) { int main(int argc, char* argv[]) {
// tool to check all kind of fst test // tool to check all kind of fst test
// if (argc > 1) { validateTFile(argv[1]); } // if (argc > 1) { validateTFile(argv[1]); }
if (argc > 2) {
// opt
iterTFileReader(argv[1], argv[2]);
}
// checkFstCheckIterator(); // checkFstCheckIterator();
// checkFstLongTerm(); // checkFstLongTerm();
// checkFstPrefixSearch(); // checkFstPrefixSearch();
checkMillonWriteAndReadOfFst(); // checkMillonWriteAndReadOfFst();
return 1; return 1;
} }
...@@ -665,14 +665,19 @@ class IndexObj { ...@@ -665,14 +665,19 @@ class IndexObj {
size_t numOfTable = 100 * 10000) { size_t numOfTable = 100 * 10000) {
std::string tColVal = colVal; std::string tColVal = colVal;
size_t colValSize = tColVal.size(); size_t colValSize = tColVal.size();
int skip = 100;
numOfTable /= skip;
for (int i = 0; i < numOfTable; i++) { for (int i = 0; i < numOfTable; i++) {
tColVal[i % colValSize] = 'a' + i % 26; for (int k = 0; k < 10 && k < colVal.size(); k++) {
// opt
tColVal[rand() % colValSize] = 'a' + k % 26;
}
SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
tColVal.c_str(), tColVal.size()); tColVal.c_str(), tColVal.size());
SIndexMultiTerm* terms = indexMultiTermCreate(); SIndexMultiTerm* terms = indexMultiTermCreate();
indexMultiTermAdd(terms, term); indexMultiTermAdd(terms, term);
for (size_t i = 0; i < 10; i++) { for (size_t j = 0; j < skip; j++) {
int ret = Put(terms, i); int ret = Put(terms, j);
assert(ret == 0); assert(ret == 0);
} }
indexMultiTermDestroy(terms); indexMultiTermDestroy(terms);
...@@ -939,10 +944,11 @@ TEST_F(IndexEnv2, testIndex_read_performance) { ...@@ -939,10 +944,11 @@ TEST_F(IndexEnv2, testIndex_read_performance) {
TEST_F(IndexEnv2, testIndexMultiTag) { TEST_F(IndexEnv2, testIndexMultiTag) {
std::string path = "/tmp/multi_tag"; std::string path = "/tmp/multi_tag";
if (index->Init(path) != 0) {} if (index->Init(path) != 0) {}
index->WriteMultiMillonData("tag1", "Hello", 100 * 10000); int64_t st = taosGetTimestampUs();
index->WriteMultiMillonData("tag2", "Test", 100 * 10000); int32_t num = 1000 * 10000;
index->WriteMultiMillonData("tag3", "Test", 100 * 10000); index->WriteMultiMillonData("tag1", "xxxxxxxxxxxxxxx", num);
index->WriteMultiMillonData("tag4", "Test", 100 * 10000); std::cout << "numOfRow: " << num << "\ttime cost:" << taosGetTimestampUs() - st << std::endl;
// index->WriteMultiMillonData("tag2", "xxxxxxxxxxxxxxxxxxxxxxxxx", 100 * 10000);
} }
TEST_F(IndexEnv2, testLongComVal) { TEST_F(IndexEnv2, testLongComVal) {
std::string path = "/tmp/long_colVal"; std::string path = "/tmp/long_colVal";
......
此差异已折叠。
此差异已折叠。
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册