diff --git a/include/common/tmsg.h b/include/common/tmsg.h
index 73bc748705c49700d973b641dd0fd5a4534980fc..e6defca7243fb3d9337daeeeb4efe4fc861e3851 100644
--- a/include/common/tmsg.h
+++ b/include/common/tmsg.h
@@ -1332,7 +1332,7 @@ typedef struct {
typedef struct {
SMsgHead head;
char name[TSDB_TABLE_FNAME_LEN];
- int8_t ignoreNotExists;
+ int64_t suid;
} SVDropTbReq;
typedef struct {
diff --git a/source/dnode/mgmt/impl/src/dndVnodes.c b/source/dnode/mgmt/impl/src/dndVnodes.c
index 156009fa6822ee922170ac44b7e9aae0d8a39482..c174de9893a96d1efa4d4789d9f8b4ffd0adf15b 100644
--- a/source/dnode/mgmt/impl/src/dndVnodes.c
+++ b/source/dnode/mgmt/impl/src/dndVnodes.c
@@ -421,6 +421,10 @@ static int32_t dndOpenVnodes(SDnode *pDnode) {
pMgmt->totalVnodes = numOfVnodes;
int32_t threadNum = pDnode->env.numOfCores;
+#if 1
+ threadNum = 1;
+#endif
+
int32_t vnodesPerThread = numOfVnodes / threadNum + 1;
SVnodeThread *threads = calloc(threadNum, sizeof(SVnodeThread));
diff --git a/source/dnode/mnode/impl/src/mndProfile.c b/source/dnode/mnode/impl/src/mndProfile.c
index 79e5d9eae595eb95be701c3f0da8518cbd55f660..b458403dbf1f7a60becc9ba5d04c4f823eccf8cd 100644
--- a/source/dnode/mnode/impl/src/mndProfile.c
+++ b/source/dnode/mnode/impl/src/mndProfile.c
@@ -179,7 +179,12 @@ static void mndCancelGetNextConn(SMnode *pMnode, void *pIter) {
}
static int32_t mndProcessConnectReq(SMnodeMsg *pReq) {
- SMnode *pMnode = pReq->pMnode;
+ SMnode *pMnode = pReq->pMnode;
+ SUserObj *pUser = NULL;
+ SDbObj *pDb = NULL;
+ SConnObj *pConn = NULL;
+ int32_t code = -1;
+
SConnectReq *pConnReq = pReq->rpcMsg.pCont;
pConnReq->pid = htonl(pConnReq->pid);
pConnReq->startTime = htobe64(pConnReq->startTime);
@@ -187,54 +192,61 @@ static int32_t mndProcessConnectReq(SMnodeMsg *pReq) {
SRpcConnInfo info = {0};
if (rpcGetConnInfo(pReq->rpcMsg.handle, &info) != 0) {
mError("user:%s, failed to login while get connection info since %s", pReq->user, terrstr());
- return -1;
+ goto CONN_OVER;
}
char ip[30];
taosIp2String(info.clientIp, ip);
+ pUser = mndAcquireUser(pMnode, pReq->user);
+ if (pUser == NULL) {
+ mError("user:%s, failed to login while acquire user since %s", pReq->user, terrstr());
+ goto CONN_OVER;
+ }
+
if (pConnReq->db[0]) {
- snprintf(pReq->db, TSDB_DB_FNAME_LEN, "%d%s%s", pReq->acctId, TS_PATH_DELIMITER, pConnReq->db);
- SDbObj *pDb = mndAcquireDb(pMnode, pReq->db);
+ snprintf(pReq->db, TSDB_DB_FNAME_LEN, "%d%s%s", pUser->acctId, TS_PATH_DELIMITER, pConnReq->db);
+ pDb = mndAcquireDb(pMnode, pReq->db);
if (pDb == NULL) {
terrno = TSDB_CODE_MND_INVALID_DB;
mError("user:%s, failed to login from %s while use db:%s since %s", pReq->user, ip, pConnReq->db, terrstr());
- return -1;
+ goto CONN_OVER;
}
- mndReleaseDb(pMnode, pDb);
}
- SConnObj *pConn = mndCreateConn(pMnode, &info, pConnReq->pid, pConnReq->app, pConnReq->startTime);
+ pConn = mndCreateConn(pMnode, &info, pConnReq->pid, pConnReq->app, pConnReq->startTime);
if (pConn == NULL) {
mError("user:%s, failed to login from %s while create connection since %s", pReq->user, ip, terrstr());
- return -1;
+ goto CONN_OVER;
}
SConnectRsp *pRsp = rpcMallocCont(sizeof(SConnectRsp));
if (pRsp == NULL) {
- mndReleaseConn(pMnode, pConn);
terrno = TSDB_CODE_OUT_OF_MEMORY;
mError("user:%s, failed to login from %s while create rsp since %s", pReq->user, ip, terrstr());
- return -1;
- }
-
- SUserObj *pUser = mndAcquireUser(pMnode, pReq->user);
- if (pUser != NULL) {
- pRsp->acctId = htonl(pUser->acctId);
- pRsp->superUser = pUser->superUser;
- mndReleaseUser(pMnode, pUser);
+ goto CONN_OVER;
}
+ pRsp->acctId = htonl(pUser->acctId);
+ pRsp->superUser = pUser->superUser;
pRsp->clusterId = htobe64(pMnode->clusterId);
pRsp->connId = htonl(pConn->id);
mndGetMnodeEpSet(pMnode, &pRsp->epSet);
- mndReleaseConn(pMnode, pConn);
pReq->contLen = sizeof(SConnectRsp);
pReq->pCont = pRsp;
mDebug("user:%s, login from %s, conn:%d, app:%s", info.user, ip, pConn->id, pConnReq->app);
- return 0;
+
+ code = 0;
+
+CONN_OVER:
+
+ mndReleaseUser(pMnode, pUser);
+ mndReleaseDb(pMnode, pDb);
+ mndReleaseConn(pMnode, pConn);
+
+ return code;
}
static int32_t mndSaveQueryStreamList(SConnObj *pConn, SHeartBeatReq *pReq) {
@@ -258,33 +270,27 @@ static int32_t mndSaveQueryStreamList(SConnObj *pConn, SHeartBeatReq *pReq) {
}
static int32_t mndProcessHeartBeatReq(SMnodeMsg *pReq) {
- SMnode *pMnode = pReq->pMnode;
- char *batchReqStr = pReq->rpcMsg.pCont;
+ SMnode *pMnode = pReq->pMnode;
+ char *batchReqStr = pReq->rpcMsg.pCont;
SClientHbBatchReq batchReq = {0};
tDeserializeSClientHbBatchReq(batchReqStr, &batchReq);
SArray *pArray = batchReq.reqs;
- int sz = taosArrayGetSize(pArray);
+ int sz = taosArrayGetSize(pArray);
SClientHbBatchRsp batchRsp = {0};
batchRsp.rsps = taosArrayInit(0, sizeof(SClientHbRsp));
for (int i = 0; i < sz; i++) {
- SClientHbReq* pHbReq = taosArrayGet(pArray, i);
+ SClientHbReq *pHbReq = taosArrayGet(pArray, i);
if (pHbReq->connKey.hbType == HEARTBEAT_TYPE_QUERY) {
-
} else if (pHbReq->connKey.hbType == HEARTBEAT_TYPE_MQ) {
- SClientHbRsp rsp = {
- .status = 0,
- .connKey = pHbReq->connKey,
- .bodyLen = 0,
- .body = NULL
- };
+ SClientHbRsp rsp = {.status = 0, .connKey = pHbReq->connKey, .bodyLen = 0, .body = NULL};
taosArrayPush(batchRsp.rsps, &rsp);
}
}
int32_t tlen = tSerializeSClientHbBatchRsp(NULL, &batchRsp);
- void* buf = rpcMallocCont(tlen);
- void* bufCopy = buf;
+ void *buf = rpcMallocCont(tlen);
+ void *bufCopy = buf;
tSerializeSClientHbBatchRsp(&bufCopy, &batchRsp);
pReq->contLen = tlen;
pReq->pCont = buf;
diff --git a/source/dnode/mnode/impl/src/mndStb.c b/source/dnode/mnode/impl/src/mndStb.c
index ad8c16f826bf222eef9d04d7e4ce8384f55482c1..a6fd2a3c5852a87729e46393240e136bd58987aa 100644
--- a/source/dnode/mnode/impl/src/mndStb.c
+++ b/source/dnode/mnode/impl/src/mndStb.c
@@ -31,16 +31,16 @@ static SSdbRaw *mndStbActionEncode(SStbObj *pStb);
static SSdbRow *mndStbActionDecode(SSdbRaw *pRaw);
static int32_t mndStbActionInsert(SSdb *pSdb, SStbObj *pStb);
static int32_t mndStbActionDelete(SSdb *pSdb, SStbObj *pStb);
-static int32_t mndStbActionUpdate(SSdb *pSdb, SStbObj *pOldStb, SStbObj *pNewStb);
-static int32_t mndProcesSMCreateStbReq(SMnodeMsg *pMsg);
-static int32_t mndProcesSMAlterStbReq(SMnodeMsg *pMsg);
-static int32_t mndProcesSMDropStbReq(SMnodeMsg *pMsg);
-static int32_t mndProcessCreateStbInRsp(SMnodeMsg *pMsg);
-static int32_t mndProcessAlterStbInRsp(SMnodeMsg *pMsg);
-static int32_t mndProcessDropStbInRsp(SMnodeMsg *pMsg);
-static int32_t mndProcessStbMetaMsg(SMnodeMsg *pMsg);
-static int32_t mndGetStbMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaRsp *pMeta);
-static int32_t mndRetrieveStb(SMnodeMsg *pMsg, SShowObj *pShow, char *data, int32_t rows);
+static int32_t mndStbActionUpdate(SSdb *pSdb, SStbObj *pOld, SStbObj *pNew);
+static int32_t mndProcessMCreateStbReq(SMnodeMsg *pReq);
+static int32_t mndProcessMAlterStbReq(SMnodeMsg *pReq);
+static int32_t mndProcessMDropStbReq(SMnodeMsg *pReq);
+static int32_t mndProcessVCreateStbRsp(SMnodeMsg *pRsp);
+static int32_t mndProcessVAlterStbRsp(SMnodeMsg *pRsp);
+static int32_t mndProcessVDropStbRsp(SMnodeMsg *pRsp);
+static int32_t mndProcessStbMetaReq(SMnodeMsg *pReq);
+static int32_t mndGetStbMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta);
+static int32_t mndRetrieveStb(SMnodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows);
static void mndCancelGetNextStb(SMnode *pMnode, void *pIter);
int32_t mndInitStb(SMnode *pMnode) {
@@ -52,13 +52,13 @@ int32_t mndInitStb(SMnode *pMnode) {
.updateFp = (SdbUpdateFp)mndStbActionUpdate,
.deleteFp = (SdbDeleteFp)mndStbActionDelete};
- mndSetMsgHandle(pMnode, TDMT_MND_CREATE_STB, mndProcesSMCreateStbReq);
- mndSetMsgHandle(pMnode, TDMT_MND_ALTER_STB, mndProcesSMAlterStbReq);
- mndSetMsgHandle(pMnode, TDMT_MND_DROP_STB, mndProcesSMDropStbReq);
- mndSetMsgHandle(pMnode, TDMT_VND_CREATE_STB_RSP, mndProcessCreateStbInRsp);
- mndSetMsgHandle(pMnode, TDMT_VND_ALTER_STB_RSP, mndProcessAlterStbInRsp);
- mndSetMsgHandle(pMnode, TDMT_VND_DROP_STB_RSP, mndProcessDropStbInRsp);
- mndSetMsgHandle(pMnode, TDMT_MND_STB_META, mndProcessStbMetaMsg);
+ mndSetMsgHandle(pMnode, TDMT_MND_CREATE_STB, mndProcessMCreateStbReq);
+ mndSetMsgHandle(pMnode, TDMT_MND_ALTER_STB, mndProcessMAlterStbReq);
+ mndSetMsgHandle(pMnode, TDMT_MND_DROP_STB, mndProcessMDropStbReq);
+ mndSetMsgHandle(pMnode, TDMT_VND_CREATE_STB_RSP, mndProcessVCreateStbRsp);
+ mndSetMsgHandle(pMnode, TDMT_VND_ALTER_STB_RSP, mndProcessVAlterStbRsp);
+ mndSetMsgHandle(pMnode, TDMT_VND_DROP_STB_RSP, mndProcessVDropStbRsp);
+ mndSetMsgHandle(pMnode, TDMT_MND_STB_META, mndProcessStbMetaReq);
mndAddShowMetaHandle(pMnode, TSDB_MGMT_TABLE_STB, mndGetStbMeta);
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_STB, mndRetrieveStb);
@@ -177,27 +177,27 @@ static int32_t mndStbActionDelete(SSdb *pSdb, SStbObj *pStb) {
return 0;
}
-static int32_t mndStbActionUpdate(SSdb *pSdb, SStbObj *pOldStb, SStbObj *pNewStb) {
- mTrace("stb:%s, perform update action, old row:%p new row:%p", pOldStb->name, pOldStb, pNewStb);
- atomic_exchange_32(&pOldStb->updateTime, pNewStb->updateTime);
- atomic_exchange_32(&pOldStb->version, pNewStb->version);
+static int32_t mndStbActionUpdate(SSdb *pSdb, SStbObj *pOld, SStbObj *pNew) {
+ mTrace("stb:%s, perform update action, old row:%p new row:%p", pOld->name, pOld, pNew);
+ atomic_exchange_32(&pOld->updateTime, pNew->updateTime);
+ atomic_exchange_32(&pOld->version, pNew->version);
- taosWLockLatch(&pOldStb->lock);
- pOldStb->numOfColumns = pNewStb->numOfColumns;
- pOldStb->numOfTags = pNewStb->numOfTags;
- int32_t totalCols = pNewStb->numOfTags + pNewStb->numOfColumns;
+ taosWLockLatch(&pOld->lock);
+ pOld->numOfColumns = pNew->numOfColumns;
+ pOld->numOfTags = pNew->numOfTags;
+ int32_t totalCols = pNew->numOfTags + pNew->numOfColumns;
int32_t totalSize = totalCols * sizeof(SSchema);
- if (pOldStb->numOfTags + pOldStb->numOfColumns < totalCols) {
+ if (pOld->numOfTags + pOld->numOfColumns < totalCols) {
void *pSchema = malloc(totalSize);
if (pSchema != NULL) {
- free(pOldStb->pSchema);
- pOldStb->pSchema = pSchema;
+ free(pOld->pSchema);
+ pOld->pSchema = pSchema;
}
}
- memcpy(pOldStb->pSchema, pNewStb->pSchema, totalSize);
- taosWUnLockLatch(&pOldStb->lock);
+ memcpy(pOld->pSchema, pNew->pSchema, totalSize);
+ taosWUnLockLatch(&pOld->lock);
return 0;
}
@@ -215,7 +215,7 @@ void mndReleaseStb(SMnode *pMnode, SStbObj *pStb) {
sdbRelease(pSdb, pStb);
}
-static SDbObj *mndAcquireDbByStb(SMnode *pMnode, char *stbName) {
+static SDbObj *mndAcquireDbByStb(SMnode *pMnode, const char *stbName) {
SName name = {0};
tNameFromString(&name, stbName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
@@ -225,17 +225,17 @@ static SDbObj *mndAcquireDbByStb(SMnode *pMnode, char *stbName) {
return mndAcquireDb(pMnode, db);
}
-static void *mndBuildCreateStbMsg(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pStb, int *pContLen) {
+static void *mndBuildCreateStbReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pStb, int32_t *pContLen) {
SVCreateTbReq req;
void *buf;
- int bsize;
+ int32_t bsize;
SMsgHead *pMsgHead;
req.ver = 0;
SName name = {0};
- tNameFromString(&name, pStb->name, T_NAME_ACCT|T_NAME_DB|T_NAME_TABLE);
+ tNameFromString(&name, pStb->name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
- req.name = (char*) tNameGetTableName(&name);
+ req.name = (char *)tNameGetTableName(&name);
req.ttl = 0;
req.keep = 0;
req.type = TD_SUPER_TABLE;
@@ -264,7 +264,7 @@ static void *mndBuildCreateStbMsg(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pStb
return buf;
}
-static SVDropTbReq *mndBuildDropStbMsg(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pStb) {
+static SVDropTbReq *mndBuildDropStbReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pStb) {
int32_t contLen = sizeof(SVDropTbReq);
SVDropTbReq *pDrop = calloc(1, contLen);
@@ -276,12 +276,12 @@ static SVDropTbReq *mndBuildDropStbMsg(SMnode *pMnode, SVgObj *pVgroup, SStbObj
pDrop->head.contLen = htonl(contLen);
pDrop->head.vgId = htonl(pVgroup->vgId);
memcpy(pDrop->name, pStb->name, TSDB_TABLE_FNAME_LEN);
- // pDrop->suid = htobe64(pStb->uid);
+ pDrop->suid = htobe64(pStb->uid);
return pDrop;
}
-static int32_t mndCheckCreateStbMsg(SMCreateStbReq *pCreate) {
+static int32_t mndCheckCreateStbReq(SMCreateStbReq *pCreate) {
pCreate->numOfColumns = htonl(pCreate->numOfColumns);
pCreate->numOfTags = htonl(pCreate->numOfTags);
int32_t totalCols = pCreate->numOfColumns + pCreate->numOfTags;
@@ -356,15 +356,15 @@ static int32_t mndSetCreateStbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj
SSdb *pSdb = pMnode->pSdb;
SVgObj *pVgroup = NULL;
void *pIter = NULL;
- int contLen;
+ int32_t contLen;
while (1) {
pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
if (pIter == NULL) break;
if (pVgroup->dbUid != pDb->uid) continue;
- void *pMsg = mndBuildCreateStbMsg(pMnode, pVgroup, pStb, &contLen);
- if (pMsg == NULL) {
+ void *pReq = mndBuildCreateStbReq(pMnode, pVgroup, pStb, &contLen);
+ if (pReq == NULL) {
sdbCancelFetch(pSdb, pIter);
sdbRelease(pSdb, pVgroup);
terrno = TSDB_CODE_OUT_OF_MEMORY;
@@ -373,11 +373,11 @@ static int32_t mndSetCreateStbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj
STransAction action = {0};
action.epSet = mndGetVgroupEpset(pMnode, pVgroup);
- action.pCont = pMsg;
+ action.pCont = pReq;
action.contLen = contLen;
action.msgType = TDMT_VND_CREATE_STB;
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
- free(pMsg);
+ free(pReq);
sdbCancelFetch(pSdb, pIter);
sdbRelease(pSdb, pVgroup);
return -1;
@@ -398,8 +398,8 @@ static int32_t mndSetCreateStbUndoActions(SMnode *pMnode, STrans *pTrans, SDbObj
if (pIter == NULL) break;
if (pVgroup->dbUid != pDb->uid) continue;
- SVDropTbReq *pMsg = mndBuildDropStbMsg(pMnode, pVgroup, pStb);
- if (pMsg == NULL) {
+ SVDropTbReq *pReq = mndBuildDropStbReq(pMnode, pVgroup, pStb);
+ if (pReq == NULL) {
sdbCancelFetch(pSdb, pIter);
sdbRelease(pSdb, pVgroup);
terrno = TSDB_CODE_OUT_OF_MEMORY;
@@ -408,11 +408,11 @@ static int32_t mndSetCreateStbUndoActions(SMnode *pMnode, STrans *pTrans, SDbObj
STransAction action = {0};
action.epSet = mndGetVgroupEpset(pMnode, pVgroup);
- action.pCont = pMsg;
+ action.pCont = pReq;
action.contLen = sizeof(SVDropTbReq);
action.msgType = TDMT_VND_DROP_STB;
if (mndTransAppendUndoAction(pTrans, &action) != 0) {
- free(pMsg);
+ free(pReq);
sdbCancelFetch(pSdb, pIter);
sdbRelease(pSdb, pVgroup);
return -1;
@@ -423,7 +423,7 @@ static int32_t mndSetCreateStbUndoActions(SMnode *pMnode, STrans *pTrans, SDbObj
return 0;
}
-static int32_t mndCreateStb(SMnode *pMnode, SMnodeMsg *pMsg, SMCreateStbReq *pCreate, SDbObj *pDb) {
+static int32_t mndCreateStb(SMnode *pMnode, SMnodeMsg *pReq, SMCreateStbReq *pCreate, SDbObj *pDb) {
SStbObj stbObj = {0};
tstrncpy(stbObj.name, pCreate->name, TSDB_TABLE_FNAME_LEN);
tstrncpy(stbObj.db, pDb->name, TSDB_DB_FNAME_LEN);
@@ -449,43 +449,17 @@ static int32_t mndCreateStb(SMnode *pMnode, SMnodeMsg *pMsg, SMCreateStbReq *pCr
}
int32_t code = 0;
- STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, &pMsg->rpcMsg);
- if (pTrans == NULL) {
- mError("stb:%s, failed to create since %s", pCreate->name, terrstr());
- return -1;
- }
- mDebug("trans:%d, used to create stb:%s", pTrans->id, pCreate->name);
-
- if (mndSetCreateStbRedoLogs(pMnode, pTrans, pDb, &stbObj) != 0) {
- mError("trans:%d, failed to set redo log since %s", pTrans->id, terrstr());
- goto CREATE_STB_OVER;
- }
-
- if (mndSetCreateStbUndoLogs(pMnode, pTrans, pDb, &stbObj) != 0) {
- mError("trans:%d, failed to set undo log since %s", pTrans->id, terrstr());
- goto CREATE_STB_OVER;
- }
+ STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, &pReq->rpcMsg);
+ if (pTrans == NULL) goto CREATE_STB_OVER;
- if (mndSetCreateStbCommitLogs(pMnode, pTrans, pDb, &stbObj) != 0) {
- mError("trans:%d, failed to set commit log since %s", pTrans->id, terrstr());
- goto CREATE_STB_OVER;
- }
-
- if (mndSetCreateStbRedoActions(pMnode, pTrans, pDb, &stbObj) != 0) {
- mError("trans:%d, failed to set redo actions since %s", pTrans->id, terrstr());
- goto CREATE_STB_OVER;
- }
-
- if (mndSetCreateStbUndoActions(pMnode, pTrans, pDb, &stbObj) != 0) {
- mError("trans:%d, failed to set redo actions since %s", pTrans->id, terrstr());
- goto CREATE_STB_OVER;
- }
+ mDebug("trans:%d, used to create stb:%s", pTrans->id, pCreate->name);
- if (mndTransPrepare(pMnode, pTrans) != 0) {
- mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
- mndTransDrop(pTrans);
- return -1;
- }
+ if (mndSetCreateStbRedoLogs(pMnode, pTrans, pDb, &stbObj) != 0) goto CREATE_STB_OVER;
+ if (mndSetCreateStbUndoLogs(pMnode, pTrans, pDb, &stbObj) != 0) goto CREATE_STB_OVER;
+ if (mndSetCreateStbCommitLogs(pMnode, pTrans, pDb, &stbObj) != 0) goto CREATE_STB_OVER;
+ if (mndSetCreateStbRedoActions(pMnode, pTrans, pDb, &stbObj) != 0) goto CREATE_STB_OVER;
+ if (mndSetCreateStbUndoActions(pMnode, pTrans, pDb, &stbObj) != 0) goto CREATE_STB_OVER;
+ if (mndTransPrepare(pMnode, pTrans) != 0) goto CREATE_STB_OVER;
code = 0;
@@ -494,13 +468,13 @@ CREATE_STB_OVER:
return code;
}
-static int32_t mndProcesSMCreateStbReq(SMnodeMsg *pMsg) {
- SMnode *pMnode = pMsg->pMnode;
- SMCreateStbReq *pCreate = pMsg->rpcMsg.pCont;
+static int32_t mndProcessMCreateStbReq(SMnodeMsg *pReq) {
+ SMnode *pMnode = pReq->pMnode;
+ SMCreateStbReq *pCreate = pReq->rpcMsg.pCont;
mDebug("stb:%s, start to create", pCreate->name);
- if (mndCheckCreateStbMsg(pCreate) != 0) {
+ if (mndCheckCreateStbReq(pCreate) != 0) {
mError("stb:%s, failed to create since %s", pCreate->name, terrstr());
return -1;
}
@@ -536,7 +510,7 @@ static int32_t mndProcesSMCreateStbReq(SMnodeMsg *pMsg) {
return -1;
}
- int32_t code = mndCreateStb(pMnode, pMsg, pCreate, pDb);
+ int32_t code = mndCreateStb(pMnode, pReq, pCreate, pDb);
mndReleaseDb(pMnode, pDb);
if (code != 0) {
@@ -548,12 +522,12 @@ static int32_t mndProcesSMCreateStbReq(SMnodeMsg *pMsg) {
return TSDB_CODE_MND_ACTION_IN_PROGRESS;
}
-static int32_t mndProcessCreateStbInRsp(SMnodeMsg *pMsg) {
- mndTransProcessRsp(pMsg);
+static int32_t mndProcessVCreateStbRsp(SMnodeMsg *pRsp) {
+ mndTransProcessRsp(pRsp);
return 0;
}
-static int32_t mndCheckAlterStbMsg(SMAlterStbReq *pAlter) {
+static int32_t mndCheckAlterStbReq(SMAlterStbReq *pAlter) {
SSchema *pSchema = &pAlter->schema;
pSchema->colId = htonl(pSchema->colId);
pSchema->bytes = htonl(pSchema->bytes);
@@ -578,15 +552,15 @@ static int32_t mndCheckAlterStbMsg(SMAlterStbReq *pAlter) {
return 0;
}
-static int32_t mndUpdateStb(SMnode *pMnode, SMnodeMsg *pMsg, SStbObj *pOldStb, SStbObj *pNewStb) { return 0; }
+static int32_t mndUpdateStb(SMnode *pMnode, SMnodeMsg *pReq, SStbObj *pOld, SStbObj *pNew) { return 0; }
-static int32_t mndProcesSMAlterStbReq(SMnodeMsg *pMsg) {
- SMnode *pMnode = pMsg->pMnode;
- SMAlterStbReq *pAlter = pMsg->rpcMsg.pCont;
+static int32_t mndProcessMAlterStbReq(SMnodeMsg *pReq) {
+ SMnode *pMnode = pReq->pMnode;
+ SMAlterStbReq *pAlter = pReq->rpcMsg.pCont;
mDebug("stb:%s, start to alter", pAlter->name);
- if (mndCheckAlterStbMsg(pAlter) != 0) {
+ if (mndCheckAlterStbReq(pAlter) != 0) {
mError("stb:%s, failed to alter since %s", pAlter->name, terrstr());
return -1;
}
@@ -601,7 +575,7 @@ static int32_t mndProcesSMAlterStbReq(SMnodeMsg *pMsg) {
SStbObj stbObj = {0};
memcpy(&stbObj, pStb, sizeof(SStbObj));
- int32_t code = mndUpdateStb(pMnode, pMsg, pStb, &stbObj);
+ int32_t code = mndUpdateStb(pMnode, pReq, pStb, &stbObj);
mndReleaseStb(pMnode, pStb);
if (code != 0) {
@@ -612,8 +586,8 @@ static int32_t mndProcesSMAlterStbReq(SMnodeMsg *pMsg) {
return TSDB_CODE_MND_ACTION_IN_PROGRESS;
}
-static int32_t mndProcessAlterStbInRsp(SMnodeMsg *pMsg) {
- mndTransProcessRsp(pMsg);
+static int32_t mndProcessVAlterStbRsp(SMnodeMsg *pRsp) {
+ mndTransProcessRsp(pRsp);
return 0;
}
@@ -648,44 +622,19 @@ static int32_t mndSetDropStbRedoActions(SMnode *pMnode, STrans *pTrans, SStbObj
static int32_t mndSetDropStbUndoActions(SMnode *pMnode, STrans *pTrans, SStbObj *pStb) { return 0; }
-static int32_t mndDropStb(SMnode *pMnode, SMnodeMsg *pMsg, SStbObj *pStb) {
+static int32_t mndDropStb(SMnode *pMnode, SMnodeMsg *pReq, SStbObj *pStb) {
int32_t code = -1;
- STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, &pMsg->rpcMsg);
- if (pTrans == NULL) {
- mError("stb:%s, failed to drop since %s", pStb->name, terrstr());
- return -1;
- }
- mDebug("trans:%d, used to drop stb:%s", pTrans->id, pStb->name);
-
- if (mndSetDropStbRedoLogs(pMnode, pTrans, pStb) != 0) {
- mError("trans:%d, failed to set redo log since %s", pTrans->id, terrstr());
- goto DROP_STB_OVER;
- }
-
- if (mndSetDropStbUndoLogs(pMnode, pTrans, pStb) != 0) {
- mError("trans:%d, failed to set undo log since %s", pTrans->id, terrstr());
- goto DROP_STB_OVER;
- }
+ STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, &pReq->rpcMsg);
+ if (pTrans == NULL)goto DROP_STB_OVER;
- if (mndSetDropStbCommitLogs(pMnode, pTrans, pStb) != 0) {
- mError("trans:%d, failed to set commit log since %s", pTrans->id, terrstr());
- goto DROP_STB_OVER;
- }
-
- if (mndSetDropStbRedoActions(pMnode, pTrans, pStb) != 0) {
- mError("trans:%d, failed to set redo actions since %s", pTrans->id, terrstr());
- goto DROP_STB_OVER;
- }
-
- if (mndSetDropStbUndoActions(pMnode, pTrans, pStb) != 0) {
- mError("trans:%d, failed to set redo actions since %s", pTrans->id, terrstr());
- goto DROP_STB_OVER;
- }
+ mDebug("trans:%d, used to drop stb:%s", pTrans->id, pStb->name);
- if (mndTransPrepare(pMnode, pTrans) != 0) {
- mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
- goto DROP_STB_OVER;
- }
+ if (mndSetDropStbRedoLogs(pMnode, pTrans, pStb) != 0) goto DROP_STB_OVER;
+ if (mndSetDropStbUndoLogs(pMnode, pTrans, pStb) != 0) goto DROP_STB_OVER;
+ if (mndSetDropStbCommitLogs(pMnode, pTrans, pStb) != 0) goto DROP_STB_OVER;
+ if (mndSetDropStbRedoActions(pMnode, pTrans, pStb) != 0) goto DROP_STB_OVER;
+ if (mndSetDropStbUndoActions(pMnode, pTrans, pStb) != 0) goto DROP_STB_OVER;
+ if (mndTransPrepare(pMnode, pTrans) != 0) goto DROP_STB_OVER;
code = 0;
@@ -694,9 +643,9 @@ DROP_STB_OVER:
return 0;
}
-static int32_t mndProcesSMDropStbReq(SMnodeMsg *pMsg) {
- SMnode *pMnode = pMsg->pMnode;
- SMDropStbReq *pDrop = pMsg->rpcMsg.pCont;
+static int32_t mndProcessMDropStbReq(SMnodeMsg *pReq) {
+ SMnode *pMnode = pReq->pMnode;
+ SMDropStbReq *pDrop = pReq->rpcMsg.pCont;
mDebug("stb:%s, start to drop", pDrop->name);
@@ -712,7 +661,7 @@ static int32_t mndProcesSMDropStbReq(SMnodeMsg *pMsg) {
}
}
- int32_t code = mndDropStb(pMnode, pMsg, pStb);
+ int32_t code = mndDropStb(pMnode, pReq, pStb);
mndReleaseStb(pMnode, pStb);
if (code != 0) {
@@ -724,14 +673,14 @@ static int32_t mndProcesSMDropStbReq(SMnodeMsg *pMsg) {
return TSDB_CODE_MND_ACTION_IN_PROGRESS;
}
-static int32_t mndProcessDropStbInRsp(SMnodeMsg *pMsg) {
- mndTransProcessRsp(pMsg);
+static int32_t mndProcessVDropStbRsp(SMnodeMsg *pRsp) {
+ mndTransProcessRsp(pRsp);
return 0;
}
-static int32_t mndProcessStbMetaMsg(SMnodeMsg *pMsg) {
- SMnode *pMnode = pMsg->pMnode;
- STableInfoReq *pInfo = pMsg->rpcMsg.pCont;
+static int32_t mndProcessStbMetaReq(SMnodeMsg *pReq) {
+ SMnode *pMnode = pReq->pMnode;
+ STableInfoReq *pInfo = pReq->rpcMsg.pCont;
mDebug("stb:%s, start to retrieve meta", pInfo->tableFname);
@@ -786,8 +735,8 @@ static int32_t mndProcessStbMetaMsg(SMnodeMsg *pMsg) {
mndReleaseDb(pMnode, pDb);
mndReleaseStb(pMnode, pStb);
- pMsg->pCont = pMeta;
- pMsg->contLen = contLen;
+ pReq->pCont = pMeta;
+ pReq->contLen = contLen;
mDebug("stb:%s, meta is retrieved, cols:%d tags:%d", pInfo->tableFname, pStb->numOfColumns, pStb->numOfTags);
return 0;
@@ -820,8 +769,8 @@ static int32_t mndGetNumOfStbs(SMnode *pMnode, char *dbName, int32_t *pNumOfStbs
return 0;
}
-static int32_t mndGetStbMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaRsp *pMeta) {
- SMnode *pMnode = pMsg->pMnode;
+static int32_t mndGetStbMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta) {
+ SMnode *pMnode = pReq->pMnode;
SSdb *pSdb = pMnode->pSdb;
if (mndGetNumOfStbs(pMnode, pShow->db, &pShow->numOfRows) != 0) {
@@ -883,8 +832,8 @@ static void mndExtractTableName(char *tableId, char *name) {
}
}
-static int32_t mndRetrieveStb(SMnodeMsg *pMsg, SShowObj *pShow, char *data, int32_t rows) {
- SMnode *pMnode = pMsg->pMnode;
+static int32_t mndRetrieveStb(SMnodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows) {
+ SMnode *pMnode = pReq->pMnode;
SSdb *pSdb = pMnode->pSdb;
int32_t numOfRows = 0;
SStbObj *pStb = NULL;
diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c
index 86ba2d99a30799ae7d8907c928f12d2cd5e77423..bf804904164724f634f2785baabbbded569eaac9 100644
--- a/source/dnode/vnode/src/tsdb/tsdbRead.c
+++ b/source/dnode/vnode/src/tsdb/tsdbRead.c
@@ -85,7 +85,6 @@ enum {
typedef struct STableCheckInfo {
uint64_t tableId;
TSKEY lastKey;
- STable* pTableObj;
SBlockInfo* pCompInfo;
int32_t compSize;
int32_t numOfBlocks:29; // number of qualified data blocks not the original blocks
@@ -141,8 +140,6 @@ typedef struct STsdbReadHandle {
STableBlockInfo* pDataBlockInfo;
SDataCols *pDataCols; // in order to hold current file data block
int32_t allocSize; // allocated data block size
-// STsdb
-// STsdbMemTable * pMemTable;
SArray *defaultLoadColumn;// default load column
SDataBlockLoadInfo dataBlockLoadInfo; /* record current block load information */
SLoadCompBlockInfo compBlockLoadInfo; /* record current compblock information in SQueryAttr */
@@ -204,8 +201,8 @@ static SArray* getDefaultLoadColumns(STsdbReadHandle* pTsdbReadHandle, bool load
int16_t colId = *(int16_t*)taosArrayGet(pLocalIdList, 0);
// the primary timestamp column does not be included in the the specified load column list, add it
- if (loadTS && colId != 0) {
- int16_t columnId = 0;
+ if (loadTS && colId != PRIMARYKEY_TIMESTAMP_COL_ID) {
+ int16_t columnId = PRIMARYKEY_TIMESTAMP_COL_ID;
taosArrayInsert(pLocalIdList, 0, &columnId);
}
@@ -292,7 +289,7 @@ static SArray* createCheckInfoFromTableGroup(STsdbReadHandle* pTsdbReadHandle, S
for (int32_t j = 0; j < gsize; ++j) {
STableKeyInfo* pKeyInfo = (STableKeyInfo*) taosArrayGet(group, j);
- STableCheckInfo info = { .lastKey = pKeyInfo->lastKey, .pTableObj = pKeyInfo->pTable };
+ STableCheckInfo info = { .lastKey = pKeyInfo->lastKey};
// assert(info.pTableObj != NULL && (info.pTableObj->type == TSDB_NORMAL_TABLE ||
// info.pTableObj->type == TSDB_CHILD_TABLE || info.pTableObj->type == TSDB_STREAM_TABLE));
@@ -315,10 +312,9 @@ static SArray* createCheckInfoFromTableGroup(STsdbReadHandle* pTsdbReadHandle, S
// taosArraySort(pTableCheckInfo, tsdbCheckInfoCompar);
size_t gsize = taosArrayGetSize(pTableCheckInfo);
- for (int32_t i = 0; i < gsize; ++i) {
- STableCheckInfo* pInfo = (STableCheckInfo*) taosArrayGet(pTableCheckInfo, i);
- taosArrayPush(pTable, &pInfo->pTableObj);
- }
+// for (int32_t i = 0; i < gsize; ++i) {
+// STableCheckInfo* pInfo = (STableCheckInfo*) taosArrayGet(pTableCheckInfo, i);
+// }
*psTable = pTable;
return pTableCheckInfo;
@@ -347,15 +343,11 @@ static void resetCheckInfo(STsdbReadHandle* pTsdbReadHandle) {
// only one table, not need to sort again
static SArray* createCheckInfoFromCheckInfo(STableCheckInfo* pCheckInfo, TSKEY skey, SArray** psTable) {
SArray* pNew = taosArrayInit(1, sizeof(STableCheckInfo));
- SArray* pTable = taosArrayInit(1, sizeof(STable*));
- STableCheckInfo info = { .lastKey = skey, .pTableObj = pCheckInfo->pTableObj};
+ STableCheckInfo info = { .lastKey = skey};
info.tableId = pCheckInfo->tableId;
taosArrayPush(pNew, &info);
- taosArrayPush(pTable, &pCheckInfo->pTableObj);
-
- *psTable = pTable;
return pNew;
}
@@ -461,9 +453,6 @@ static STsdbReadHandle* tsdbQueryTablesImpl(STsdb* tsdb, STsdbQueryCond* pCond,
pReadHandle->defaultLoadColumn = getDefaultLoadColumns(pReadHandle, true);
}
-// STsdbMeta* pMeta = NULL;//tsdbGetMeta(tsdb);
-// assert(pMeta != NULL);
-
pReadHandle->pDataCols = tdNewDataCols(1000, pReadHandle->pTsdb->config.maxRowsPerFileBlock);
if (pReadHandle->pDataCols == NULL) {
tsdbError("%p failed to malloc buf for pDataCols, %"PRIu64, pReadHandle, pReadHandle->qId);
@@ -641,12 +630,6 @@ SArray* tsdbGetQueriedTableList(tsdbReadHandleT *pHandle) {
size_t size = taosArrayGetSize(pTsdbReadHandle->pTableCheckInfo);
SArray* res = taosArrayInit(size, POINTER_BYTES);
-
- for(int32_t i = 0; i < size; ++i) {
- STableCheckInfo* pCheckInfo = taosArrayGet(pTsdbReadHandle->pTableCheckInfo, i);
- taosArrayPush(res, &pCheckInfo->pTableObj);
- }
-
return res;
}
@@ -1049,7 +1032,10 @@ static int32_t loadBlockInfo(STsdbReadHandle * pTsdbReadHandle, int32_t index, i
STableCheckInfo* pCheckInfo = taosArrayGet(pTsdbReadHandle->pTableCheckInfo, index);
pCheckInfo->numOfBlocks = 0;
- if (tsdbSetReadTable(&pTsdbReadHandle->rhelper, pCheckInfo->pTableObj) != TSDB_CODE_SUCCESS) {
+ STable table = {.uid = pCheckInfo->tableId, .tid = pCheckInfo->tableId};
+ table.pSchema = metaGetTbTSchema(pTsdbReadHandle->pTsdb->pMeta, pCheckInfo->tableId, 0);
+
+ if (tsdbSetReadTable(&pTsdbReadHandle->rhelper, &table) != TSDB_CODE_SUCCESS) {
code = terrno;
return code;
}
@@ -1149,7 +1135,7 @@ static int32_t getFileCompInfo(STsdbReadHandle* pTsdbReadHandle, int32_t* numOfB
static int32_t doLoadFileDataBlock(STsdbReadHandle* pTsdbReadHandle, SBlock* pBlock, STableCheckInfo* pCheckInfo, int32_t slotIndex) {
int64_t st = taosGetTimestampUs();
- STSchema *pSchema = NULL;//tsdbGetTableSchema(pCheckInfo->pTableObj);
+ STSchema *pSchema = metaGetTbTSchema(pTsdbReadHandle->pTsdb->pMeta, pCheckInfo->tableId, 0);
int32_t code = tdInitDataCols(pTsdbReadHandle->pDataCols, pSchema);
if (code != TSDB_CODE_SUCCESS) {
tsdbError("%p failed to malloc buf for pDataCols, 0x%"PRIx64, pTsdbReadHandle, pTsdbReadHandle->qId);
@@ -1184,7 +1170,7 @@ static int32_t doLoadFileDataBlock(STsdbReadHandle* pTsdbReadHandle, SBlock* pBl
pBlockLoadInfo->fileGroup = pTsdbReadHandle->pFileGroup;
pBlockLoadInfo->slot = pTsdbReadHandle->cur.slot;
- pBlockLoadInfo->uid = pCheckInfo->pTableObj->uid;
+ pBlockLoadInfo->uid = pCheckInfo->tableId;
SDataCols* pCols = pTsdbReadHandle->rhelper.pDCols[0];
assert(pCols->numOfRows != 0 && pCols->numOfRows <= pBlock->numOfRows);
@@ -1878,7 +1864,7 @@ static void doMergeTwoLevelData(STsdbReadHandle* pTsdbReadHandle, STableCheckInf
int32_t step = ASCENDING_TRAVERSE(pTsdbReadHandle->order)? 1:-1;
int32_t numOfCols = (int32_t)(QH_GET_NUM_OF_COLS(pTsdbReadHandle));
- STable* pTable = pCheckInfo->pTableObj;
+ STable* pTable = NULL;
int32_t endPos = getEndPosInDataBlock(pTsdbReadHandle, &blockInfo);
tsdbDebug("%p uid:%" PRIu64" start merge data block, file block range:%"PRIu64"-%"PRIu64" rows:%d, start:%d,"
@@ -1932,7 +1918,7 @@ static void doMergeTwoLevelData(STsdbReadHandle* pTsdbReadHandle, STableCheckInf
rv2 = memRowVersion(row2);
}
- mergeTwoRowFromMem(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, numOfRows, row1, row2, numOfCols, pTable, pSchema1, pSchema2, true);
+ mergeTwoRowFromMem(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, numOfRows, row1, row2, numOfCols, pCheckInfo->tableId, pSchema1, pSchema2, true);
numOfRows += 1;
if (cur->win.skey == TSKEY_INITIAL_VAL) {
cur->win.skey = key;
@@ -1958,7 +1944,7 @@ static void doMergeTwoLevelData(STsdbReadHandle* pTsdbReadHandle, STableCheckInf
}
bool forceSetNull = pCfg->update != TD_ROW_PARTIAL_UPDATE;
- mergeTwoRowFromMem(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, numOfRows, row1, row2, numOfCols, pTable, pSchema1, pSchema2, forceSetNull);
+ mergeTwoRowFromMem(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, numOfRows, row1, row2, numOfCols, pCheckInfo->tableId, pSchema1, pSchema2, forceSetNull);
numOfRows += 1;
if (cur->win.skey == TSKEY_INITIAL_VAL) {
cur->win.skey = key;
@@ -2745,7 +2731,7 @@ static bool loadCachedLastRow(STsdbReadHandle* pTsdbReadHandle) {
// if (ret != TSDB_CODE_SUCCESS) {
// return false;
// }
- mergeTwoRowFromMem(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, 0, pRow, NULL, numOfCols, pCheckInfo->pTableObj, NULL, NULL, true);
+ mergeTwoRowFromMem(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, 0, pRow, NULL, numOfCols, pCheckInfo->tableId, NULL, NULL, true);
tfree(pRow);
// update the last key value
@@ -3389,14 +3375,14 @@ SArray* tsdbRetrieveDataBlock(tsdbReadHandleT* pTsdbReadHandle, SArray* pIdList)
if (pHandle->cur.mixBlock) {
return pHandle->pColumns;
} else {
- SDataBlockInfo binfo = {0};/*GET_FILE_DATA_BLOCK_INFO(pCheckInfo, pBlockInfo->compBlock);*/
+ SDataBlockInfo binfo = GET_FILE_DATA_BLOCK_INFO(pCheckInfo, pBlockInfo->compBlock);
assert(pHandle->realNumOfRows <= binfo.rows);
// data block has been loaded, todo extract method
SDataBlockLoadInfo* pBlockLoadInfo = &pHandle->dataBlockLoadInfo;
if (pBlockLoadInfo->slot == pHandle->cur.slot && pBlockLoadInfo->fileGroup->fid == pHandle->cur.fid &&
- pBlockLoadInfo->uid == pCheckInfo->pTableObj->tid) {
+ pBlockLoadInfo->uid == pCheckInfo->tableId) {
return pHandle->pColumns;
} else { // only load the file block
SBlock* pBlock = pBlockInfo->compBlock;
diff --git a/source/dnode/vnode/src/tsdb/tsdbReadImpl.c b/source/dnode/vnode/src/tsdb/tsdbReadImpl.c
index c4beac452d2eb3cd637e850396d4c48ac3a710ff..3dcbb7888b767988a13bf023daf68b78050f379e 100644
--- a/source/dnode/vnode/src/tsdb/tsdbReadImpl.c
+++ b/source/dnode/vnode/src/tsdb/tsdbReadImpl.c
@@ -551,7 +551,7 @@ static int tsdbCheckAndDecodeColumnData(SDataCol *pDataCol, void *content, int32
static int tsdbLoadBlockDataColsImpl(SReadH *pReadh, SBlock *pBlock, SDataCols *pDataCols, int16_t *colIds,
int numOfColIds) {
ASSERT(pBlock->numOfSubBlocks == 0 || pBlock->numOfSubBlocks == 1);
- ASSERT(colIds[0] == 0);
+ ASSERT(colIds[0] == PRIMARYKEY_TIMESTAMP_COL_ID);
SDFile * pDFile = (pBlock->last) ? TSDB_READ_LAST_FILE(pReadh) : TSDB_READ_DATA_FILE(pReadh);
SBlockCol blockCol = {0};
@@ -588,7 +588,7 @@ static int tsdbLoadBlockDataColsImpl(SReadH *pReadh, SBlock *pBlock, SDataCols *
if (pDataCol == NULL) continue;
ASSERT(pDataCol->colId == colId);
- if (colId == 0) { // load the key row
+ if (colId == PRIMARYKEY_TIMESTAMP_COL_ID) { // load the key row
blockCol.colId = colId;
blockCol.len = pBlock->keyLen;
blockCol.type = pDataCol->type;
diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c
index 93f792e6e3593853c842bb8f28f8ee786e9c9719..a515804234168510a33c109177ed07e3744f7cbc 100644
--- a/source/libs/executor/src/executorimpl.c
+++ b/source/libs/executor/src/executorimpl.c
@@ -4927,6 +4927,7 @@ static SSDataBlock* doLoadRemoteData(void* param, bool* newgroup) {
SResFetchReq *pMsg = calloc(1, sizeof(SResFetchReq));
if (NULL == pMsg) { // todo handle malloc error
+
}
SEpSet epSet;
@@ -7381,6 +7382,7 @@ int32_t doCreateExecTaskInfo(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, void* r
cond.numOfCols = taosArrayGetSize(pTableScanNode->scan.node.pTargets);
cond.colList = calloc(cond.numOfCols, sizeof(SColumnInfo));
cond.twindow = pTableScanNode->window;
+ cond.type = BLOCK_LOAD_OFFSET_SEQ_ORDER;
for(int32_t i = 0; i < cond.numOfCols; ++i) {
SExprInfo* pExprInfo = taosArrayGetP(pTableScanNode->scan.node.pTargets, i);
diff --git a/source/libs/transport/test/transportTests.cc b/source/libs/transport/test/transportTests.cc
index 468aeba8a9adad92d730bc251b3416eddea7e60f..151deaf29bfa183e40f3ad32d0ca9c8994ea6e39 100644
--- a/source/libs/transport/test/transportTests.cc
+++ b/source/libs/transport/test/transportTests.cc
@@ -13,23 +13,126 @@
* along with this program. If not, see .
*/
+#ifdef USE_UV
+
#include
#include
#include
#include
#include
+#include
#include "transportInt.h"
#include "trpc.h"
using namespace std;
-int main() {
- SRpcInit init = {.localPort = 6030, .label = "rpc", .numOfThreads = 5};
- void* p = rpcOpen(&init);
+struct QueueElem {
+ queue q;
+ int val;
+};
+class QueueObj {
+ public:
+ QueueObj() {
+ // avoid formate
+ QUEUE_INIT(&head);
+ }
+ void Push(QueueElem *el) {
+ // avoid formate
+ QUEUE_PUSH(&head, &el->q);
+ }
+ QueueElem *Pop() {
+ QueueElem *el = NULL;
+ if (!IsEmpty()) {
+ queue *h = QUEUE_HEAD(&head);
+ el = QUEUE_DATA(h, QueueElem, q);
+ QUEUE_REMOVE(&el->q);
+ }
+ return el;
+ }
+ bool IsEmpty() {
+ // avoid formate
+ return QUEUE_IS_EMPTY(&head);
+ }
+ void RmElem(QueueElem *el) {
+ // impl
+ QUEUE_REMOVE(&el->q);
+ }
+ void ForEach(std::vector &result) {
+ queue *h;
+ QUEUE_FOREACH(h, &head) {
+ // add more
+ QueueElem *el = QUEUE_DATA(h, QueueElem, q);
+ result.push_back(el->val);
+ }
+ }
+
+ private:
+ queue head;
+};
- while (1) {
- std::cout << "cron task" << std::endl;
- std::this_thread::sleep_for(std::chrono::milliseconds(10 * 1000));
+class QueueEnv : public ::testing::Test {
+ protected:
+ virtual void SetUp() {
+ // TODO
+ q = new QueueObj();
+ }
+ virtual void TearDown() {
+ delete q;
+ // formate
}
+ QueueObj *q;
+};
+
+TEST_F(QueueEnv, testPushAndPop) {
+ // add more test
+ assert(q->IsEmpty());
+
+ for (int i = 0; i < 100; i++) {
+ QueueElem *el = (QueueElem *)malloc(sizeof(QueueElem));
+ el->val = i;
+ q->Push(el);
+ }
+ int i = 0;
+ while (!q->IsEmpty()) {
+ QueueElem *el = q->Pop();
+ assert(el->val == i++);
+ free(el);
+ }
+ assert(q->IsEmpty());
+}
+TEST_F(QueueEnv, testRm) {
+ // add more test
+
+ std::vector set;
+ assert(q->IsEmpty());
+
+ for (int i = 0; i < 100; i++) {
+ QueueElem *el = (QueueElem *)malloc(sizeof(QueueElem));
+ el->val = i;
+ q->Push(el);
+ set.push_back(el);
+ }
+ for (int i = set.size() - 1; i >= 0; i--) {
+ QueueElem *el = set[i];
+ q->RmElem(el);
+ free(el);
+ }
+ assert(q->IsEmpty());
}
+TEST_F(QueueEnv, testIter) {
+ // add more test
+ assert(q->IsEmpty());
+ std::vector vals;
+ for (int i = 0; i < 100; i++) {
+ QueueElem *el = (QueueElem *)malloc(sizeof(QueueElem));
+ el->val = i;
+ q->Push(el);
+ vals.push_back(i);
+ }
+ std::vector result;
+ q->ForEach(result);
+ assert(result.size() == vals.size());
+}
+
+#endif