提交 30da7560 编写于 作者: S Shengliang Guan

Merge remote-tracking branch 'origin/3.0' into fix/mnode

......@@ -241,6 +241,7 @@ typedef struct {
int32_t schemaLen; // schema length, if length is 0, no schema exists
int16_t numOfRows; // total number of rows in current submit block
// head of SSubmitBlk
int32_t numOfBlocks;
const void* pMsg;
} SSubmitMsgIter;
......
......@@ -67,7 +67,6 @@ enum {
enum {
#endif
// Requests handled by DNODE
TD_NEW_MSG_SEG(TDMT_DND_MSG)
TD_DEF_MSG_TYPE(TDMT_DND_CREATE_MNODE, "dnode-create-mnode", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_DND_DROP_MNODE, "dnode-drop-mnode", NULL, NULL)
......@@ -82,8 +81,7 @@ enum {
TD_DEF_MSG_TYPE(TDMT_DND_SERVER_STATUS, "server-status", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_DND_NET_TEST, "net-test", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_DND_CONFIG_DNODE, "config-dnode", NULL, NULL)
// Requests handled by MNODE
TD_NEW_MSG_SEG(TDMT_MND_MSG)
TD_DEF_MSG_TYPE(TDMT_MND_CONNECT, "connect", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_CREATE_ACCT, "create-acct", NULL, NULL)
......@@ -117,6 +115,7 @@ enum {
TD_DEF_MSG_TYPE(TDMT_MND_SYNC_DB, "sync-db", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_COMPACT_DB, "compact-db", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_GET_DB_CFG, "get-db-cfg", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_VGROUP_LIST, "vgroup-list", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_CREATE_FUNC, "create-func", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_RETRIEVE_FUNC, "retrieve-func", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_DROP_FUNC, "drop-func", NULL, NULL)
......@@ -126,39 +125,37 @@ enum {
TD_DEF_MSG_TYPE(TDMT_MND_TABLE_META, "table-meta", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_CREATE_SMA, "create-sma", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_DROP_SMA, "drop-sma", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_VGROUP_LIST, "vgroup-list", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_KILL_QUERY, "kill-query", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_KILL_CONN, "kill-conn", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_HEARTBEAT, "heartbeat", SClientHbBatchReq, SClientHbBatchRsp)
TD_DEF_MSG_TYPE(TDMT_MND_SHOW, "show", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_SYSTABLE_RETRIEVE, "systable-retrieve", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_STATUS, "status", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_TELEM_TIMER, "telem-tmr", SMTimerReq, SMTimerReq)
TD_DEF_MSG_TYPE(TDMT_MND_TRANS_TIMER, "trans-tmr", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_KILL_TRANS, "kill-trans", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_GRANT, "grant", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_AUTH, "auth", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_CREATE_TOPIC, "create-topic", SMCreateTopicReq, SMCreateTopicRsp)
TD_DEF_MSG_TYPE(TDMT_MND_ALTER_TOPIC, "alter-topic", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_DROP_TOPIC, "drop-topic", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_SUBSCRIBE, "subscribe", SCMSubscribeReq, SCMSubscribeRsp)
TD_DEF_MSG_TYPE(TDMT_MND_MQ_ASK_EP, "mq-ask-ep", SMqAskEpReq, SMqAskEpRsp)
TD_DEF_MSG_TYPE(TDMT_MND_MQ_TIMER, "mq-tmr", SMTimerReq, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_MQ_CONSUMER_LOST, "mq-consumer-lost", SMqConsumerLostMsg, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_MQ_CONSUMER_RECOVER, "mq-consumer-recover", SMqConsumerRecoverMsg, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_MQ_DO_REBALANCE, "mq-do-rebalance", SMqDoRebalanceMsg, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_MQ_DROP_CGROUP, "mq-drop-cgroup", SMqDropCGroupReq, SMqDropCGroupRsp)
TD_DEF_MSG_TYPE(TDMT_MND_MQ_COMMIT_OFFSET, "mq-commit-offset", SMqCMCommitOffsetReq, SMqCMCommitOffsetRsp)
TD_DEF_MSG_TYPE(TDMT_MND_CREATE_STREAM, "create-stream", SCMCreateStreamReq, SCMCreateStreamRsp)
TD_DEF_MSG_TYPE(TDMT_MND_ALTER_STREAM, "alter-stream", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_DROP_STREAM, "drop-stream", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_CREATE_INDEX, "create-index", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_DROP_INDEX, "drop-index", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_GET_INDEX, "get-index", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_CONFIRM_WRITE, "confirm-write", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_APPLY_MSG, "apply-msg", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_CREATE_TOPIC, "create-topic", SMCreateTopicReq, SMCreateTopicRsp)
TD_DEF_MSG_TYPE(TDMT_MND_ALTER_TOPIC, "alter-topic", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_DROP_TOPIC, "drop-topic", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_SUBSCRIBE, "subscribe", SCMSubscribeReq, SCMSubscribeRsp)
TD_DEF_MSG_TYPE(TDMT_MND_MQ_ASK_EP, "ask-ep", SMqAskEpReq, SMqAskEpRsp)
TD_DEF_MSG_TYPE(TDMT_MND_MQ_CONSUMER_LOST, "consumer-lost", SMqConsumerLostMsg, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_MQ_CONSUMER_RECOVER, "consumer-recover", SMqConsumerRecoverMsg, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_MQ_DO_REBALANCE, "do-rebalance", SMqDoRebalanceMsg, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_MQ_DROP_CGROUP, "drop-cgroup", SMqDropCGroupReq, SMqDropCGroupRsp)
TD_DEF_MSG_TYPE(TDMT_MND_MQ_COMMIT_OFFSET, "commit-offset", SMqCMCommitOffsetReq, SMqCMCommitOffsetRsp)
TD_DEF_MSG_TYPE(TDMT_MND_MQ_TIMER, "mq-tmr", SMTimerReq, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_TELEM_TIMER, "telem-tmr", SMTimerReq, SMTimerReq)
TD_DEF_MSG_TYPE(TDMT_MND_TRANS_TIMER, "trans-tmr", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_KILL_TRANS, "kill-trans", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_KILL_QUERY, "kill-query", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_KILL_CONN, "kill-conn", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_HEARTBEAT, "heartbeat", SClientHbBatchReq, SClientHbBatchRsp)
TD_DEF_MSG_TYPE(TDMT_MND_STATUS, "status", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_SHOW, "show", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_SYSTABLE_RETRIEVE, "retrieve", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_GRANT, "grant", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_AUTH, "auth", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_CONFIRM_WRITE, "mnode-confirm-write", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_APPLY_MSG, "mnode-apply-msg", NULL, NULL)
// Requests handled by VNODE
TD_NEW_MSG_SEG(TDMT_VND_MSG)
TD_DEF_MSG_TYPE(TDMT_VND_SUBMIT, "submit", SSubmitReq, SSubmitRsp)
TD_DEF_MSG_TYPE(TDMT_VND_QUERY, "query", NULL, NULL)
......@@ -186,65 +183,37 @@ enum {
TD_DEF_MSG_TYPE(TDMT_VND_QUERY_CONTINUE, "vnode-query-continue", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_QUERY_HEARTBEAT, "vnode-query-heartbeat", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_EXPLAIN, "vnode-explain", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_SUBSCRIBE, "vnode-subscribe", SMVSubscribeReq, SMVSubscribeRsp)
TD_DEF_MSG_TYPE(TDMT_VND_CONSUME, "vnode-consume", SMqPollReq, SMqDataBlkRsp)
TD_DEF_MSG_TYPE(TDMT_VND_TASK_DEPLOY, "vnode-task-deploy", SStreamTaskDeployReq, SStreamTaskDeployRsp)
TD_DEF_MSG_TYPE(TDMT_VND_STREAM_TRIGGER, "vnode-stream-trigger", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_TASK_RUN, "vnode-stream-task-run", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_TASK_DISPATCH, "vnode-stream-task-dispatch", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_TASK_RECOVER, "vnode-stream-task-recover", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_CREATE_SMA, "vnode-create-sma", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_CANCEL_SMA, "vnode-cancel-sma", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_DROP_SMA, "vnode-drop-sma", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_SUBMIT_RSMA, "vnode-submit-rsma", SSubmitReq, SSubmitRsp)
TD_DEF_MSG_TYPE(TDMT_VND_GET_TSMA_EXP_WNDS, "vnode-get-tsma-expired-windows", SVGetTsmaExpWndsReq, SVGetTsmaExpWndsRsp)
TD_DEF_MSG_TYPE(TDMT_VND_SYNC_TIMEOUT, "sync-timeout", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_SYNC_PING, "sync-ping", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_SYNC_PING_REPLY, "sync-ping-reply", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_SYNC_CLIENT_REQUEST, "sync-client-request", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_SYNC_CLIENT_REQUEST_REPLY, "sync-client-request-reply", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_SYNC_REQUEST_VOTE, "sync-request-vote", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_SYNC_REQUEST_VOTE_REPLY, "sync-request-vote-reply", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_SYNC_APPEND_ENTRIES, "sync-append-entries", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_SYNC_APPEND_ENTRIES_REPLY, "sync-append-entries-reply", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_SYNC_NOOP, "sync-noop", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_SYNC_UNKNOWN, "sync-unknown", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_SYNC_COMMON_RESPONSE, "sync-common-response", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_SYNC_APPLY_MSG, "sync-apply-msg", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_SYNC_CONFIG_CHANGE, "sync-config-change", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_ALTER_CONFIG, "alter-config", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_ALTER_REPLICA, "alter-replica", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_START_WRITE, "start-write", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_STOP_WRITE, "stop-write", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_CONFIRM_WRITE, "confirm-write", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_COMPACT, "compact", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_DELETE, "delete-data", SVDeleteReq, SVDeleteRsp)
// Requests handled by QNODE
TD_NEW_MSG_SEG(TDMT_QND_MSG)
// Requests handled by SNODE
TD_NEW_MSG_SEG(TDMT_SND_MSG)
TD_DEF_MSG_TYPE(TDMT_SND_TASK_DEPLOY, "snode-task-deploy", SStreamTaskDeployReq, SStreamTaskDeployRsp)
//TD_DEF_MSG_TYPE(TDMT_SND_TASK_EXEC, "snode-task-exec", SStreamTaskExecReq, SStreamTaskExecRsp)
//TD_DEF_MSG_TYPE(TDMT_SND_TASK_PIPE_EXEC, "snode-task-pipe-exec", SStreamTaskExecReq, SStreamTaskExecRsp)
//TD_DEF_MSG_TYPE(TDMT_SND_TASK_MERGE_EXEC, "snode-task-merge-exec", SStreamTaskExecReq, SStreamTaskExecRsp)
TD_DEF_MSG_TYPE(TDMT_SND_TASK_RUN, "snode-stream-task-run", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_SND_TASK_DISPATCH, "snode-stream-task-dispatch", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_SND_TASK_RECOVER, "snode-stream-task-recover", NULL, NULL)
// Requests handled by SCHEDULER
TD_NEW_MSG_SEG(TDMT_SCH_MSG)
TD_DEF_MSG_TYPE(TDMT_SCH_LINK_BROKEN, "scheduler-link-broken", NULL, NULL)
// Monitor info exchange between processes
TD_DEF_MSG_TYPE(TDMT_SCH_LINK_BROKEN, "link-broken", NULL, NULL)
TD_NEW_MSG_SEG(TDMT_MON_MSG)
TD_DEF_MSG_TYPE(TDMT_MON_MM_INFO, "monitor-minfo", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MON_VM_INFO, "monitor-vinfo", NULL, NULL)
......@@ -254,6 +223,22 @@ enum {
TD_DEF_MSG_TYPE(TDMT_MON_VM_LOAD, "monitor-vload", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MON_MM_LOAD, "monitor-mload", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MON_QM_LOAD, "monitor-qload", NULL, NULL)
TD_NEW_MSG_SEG(TDMT_SYNC_MSG)
TD_DEF_MSG_TYPE(TDMT_SYNC_TIMEOUT, "sync-timeout", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_SYNC_PING, "sync-ping", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_SYNC_PING_REPLY, "sync-ping-reply", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_SYNC_CLIENT_REQUEST, "sync-client-request", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_SYNC_CLIENT_REQUEST_REPLY, "sync-client-request-reply", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_SYNC_REQUEST_VOTE, "sync-request-vote", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_SYNC_REQUEST_VOTE_REPLY, "sync-request-vote-reply", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_SYNC_APPEND_ENTRIES, "sync-append-entries", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_SYNC_APPEND_ENTRIES_REPLY, "sync-append-entries-reply", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_SYNC_NOOP, "sync-noop", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_SYNC_UNKNOWN, "sync-unknown", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_SYNC_COMMON_RESPONSE, "sync-common-response", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_SYNC_APPLY_MSG, "sync-apply-msg", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_SYNC_CONFIG_CHANGE, "sync-config-change", NULL, NULL)
#if defined(TD_MSG_NUMBER_)
TDMT_MAX
......
......@@ -1087,7 +1087,7 @@ void* doAsyncFetchRow(SRequestObj* pRequest, bool setupOneRowPtr, bool convertUc
tsem_wait(&pParam->sem);
}
if (pRequest->code == TSDB_CODE_SUCCESS && setupOneRowPtr) {
if (pRequest->code == TSDB_CODE_SUCCESS && pResultInfo->numOfRows > 0 && setupOneRowPtr) {
doSetOneRowPtr(pResultInfo);
pResultInfo->current += 1;
}
......
......@@ -35,6 +35,7 @@ int32_t tInitSubmitMsgIter(const SSubmitReq *pMsg, SSubmitMsgIter *pIter) {
}
pIter->totalLen = htonl(pMsg->length);
pIter->numOfBlocks = htonl(pMsg->numOfBlocks);
ASSERT(pIter->totalLen > 0);
pIter->len = 0;
pIter->pMsg = pMsg;
......
......@@ -223,15 +223,15 @@ SArray *mmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_REPLICA_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_COMPACT_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_SYNC_TIMEOUT, mmPutNodeMsgToSyncQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_SYNC_PING, mmPutNodeMsgToSyncQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_SYNC_PING_REPLY, mmPutNodeMsgToSyncQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_SYNC_CLIENT_REQUEST, mmPutNodeMsgToSyncQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_SYNC_CLIENT_REQUEST_REPLY, mmPutNodeMsgToSyncQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_SYNC_REQUEST_VOTE, mmPutNodeMsgToSyncQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_SYNC_REQUEST_VOTE_REPLY, mmPutNodeMsgToSyncQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_SYNC_APPEND_ENTRIES, mmPutNodeMsgToSyncQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_SYNC_APPEND_ENTRIES_REPLY, mmPutNodeMsgToSyncQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SYNC_TIMEOUT, mmPutNodeMsgToSyncQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SYNC_PING, mmPutNodeMsgToSyncQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SYNC_PING_REPLY, mmPutNodeMsgToSyncQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SYNC_CLIENT_REQUEST, mmPutNodeMsgToSyncQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SYNC_CLIENT_REQUEST_REPLY, mmPutNodeMsgToSyncQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SYNC_REQUEST_VOTE, mmPutNodeMsgToSyncQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SYNC_REQUEST_VOTE_REPLY, mmPutNodeMsgToSyncQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SYNC_APPEND_ENTRIES, mmPutNodeMsgToSyncQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SYNC_APPEND_ENTRIES_REPLY, mmPutNodeMsgToSyncQueue, 1) == NULL) goto _OVER;
code = 0;
......
......@@ -365,15 +365,15 @@ SArray *vmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_DND_CREATE_VNODE, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_DND_DROP_VNODE, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_SYNC_TIMEOUT, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_SYNC_PING, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_SYNC_PING_REPLY, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_SYNC_CLIENT_REQUEST, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_SYNC_CLIENT_REQUEST_REPLY, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_SYNC_REQUEST_VOTE, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_SYNC_REQUEST_VOTE_REPLY, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_SYNC_APPEND_ENTRIES, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_SYNC_APPEND_ENTRIES_REPLY, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SYNC_TIMEOUT, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SYNC_PING, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SYNC_PING_REPLY, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SYNC_CLIENT_REQUEST, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SYNC_CLIENT_REQUEST_REPLY, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SYNC_REQUEST_VOTE, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SYNC_REQUEST_VOTE_REPLY, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SYNC_APPEND_ENTRIES, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SYNC_APPEND_ENTRIES_REPLY, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
code = 0;
......
......@@ -163,7 +163,7 @@ static void vmProcessApplyQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO
SRpcMsg rsp = {0};
// get original rpc msg
assert(pMsg->msgType == TDMT_VND_SYNC_APPLY_MSG);
assert(pMsg->msgType == TDMT_SYNC_APPLY_MSG);
SyncApplyMsg *pSyncApplyMsg = syncApplyMsgFromRpcMsg2(pMsg);
syncApplyMsgLog2("==vmProcessApplyQueue==", pSyncApplyMsg);
SRpcMsg originalRpcMsg;
......
......@@ -376,35 +376,35 @@ int32_t mndProcessSyncMsg(SRpcMsg *pMsg) {
syncRpcMsgLog2(logBuf, pMsg);
taosMemoryFree(syncNodeStr);
if (pMsg->msgType == TDMT_VND_SYNC_TIMEOUT) {
if (pMsg->msgType == TDMT_SYNC_TIMEOUT) {
SyncTimeout *pSyncMsg = syncTimeoutFromRpcMsg2(pMsg);
code = syncNodeOnTimeoutCb(pSyncNode, pSyncMsg);
syncTimeoutDestroy(pSyncMsg);
} else if (pMsg->msgType == TDMT_VND_SYNC_PING) {
} else if (pMsg->msgType == TDMT_SYNC_PING) {
SyncPing *pSyncMsg = syncPingFromRpcMsg2(pMsg);
code = syncNodeOnPingCb(pSyncNode, pSyncMsg);
syncPingDestroy(pSyncMsg);
} else if (pMsg->msgType == TDMT_VND_SYNC_PING_REPLY) {
} else if (pMsg->msgType == TDMT_SYNC_PING_REPLY) {
SyncPingReply *pSyncMsg = syncPingReplyFromRpcMsg2(pMsg);
code = syncNodeOnPingReplyCb(pSyncNode, pSyncMsg);
syncPingReplyDestroy(pSyncMsg);
} else if (pMsg->msgType == TDMT_VND_SYNC_CLIENT_REQUEST) {
} else if (pMsg->msgType == TDMT_SYNC_CLIENT_REQUEST) {
SyncClientRequest *pSyncMsg = syncClientRequestFromRpcMsg2(pMsg);
code = syncNodeOnClientRequestCb(pSyncNode, pSyncMsg);
syncClientRequestDestroy(pSyncMsg);
} else if (pMsg->msgType == TDMT_VND_SYNC_REQUEST_VOTE) {
} else if (pMsg->msgType == TDMT_SYNC_REQUEST_VOTE) {
SyncRequestVote *pSyncMsg = syncRequestVoteFromRpcMsg2(pMsg);
code = syncNodeOnRequestVoteCb(pSyncNode, pSyncMsg);
syncRequestVoteDestroy(pSyncMsg);
} else if (pMsg->msgType == TDMT_VND_SYNC_REQUEST_VOTE_REPLY) {
} else if (pMsg->msgType == TDMT_SYNC_REQUEST_VOTE_REPLY) {
SyncRequestVoteReply *pSyncMsg = syncRequestVoteReplyFromRpcMsg2(pMsg);
code = syncNodeOnRequestVoteReplyCb(pSyncNode, pSyncMsg);
syncRequestVoteReplyDestroy(pSyncMsg);
} else if (pMsg->msgType == TDMT_VND_SYNC_APPEND_ENTRIES) {
} else if (pMsg->msgType == TDMT_SYNC_APPEND_ENTRIES) {
SyncAppendEntries *pSyncMsg = syncAppendEntriesFromRpcMsg2(pMsg);
code = syncNodeOnAppendEntriesCb(pSyncNode, pSyncMsg);
syncAppendEntriesDestroy(pSyncMsg);
} else if (pMsg->msgType == TDMT_VND_SYNC_APPEND_ENTRIES_REPLY) {
} else if (pMsg->msgType == TDMT_SYNC_APPEND_ENTRIES_REPLY) {
SyncAppendEntriesReply *pSyncMsg = syncAppendEntriesReplyFromRpcMsg2(pMsg);
code = syncNodeOnAppendEntriesReplyCb(pSyncNode, pSyncMsg);
syncAppendEntriesReplyDestroy(pSyncMsg);
......
......@@ -279,56 +279,56 @@ int vnodeProcessSyncReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
SRpcMsg *pRpcMsg = pMsg;
if (pRpcMsg->msgType == TDMT_VND_SYNC_TIMEOUT) {
if (pRpcMsg->msgType == TDMT_SYNC_TIMEOUT) {
SyncTimeout *pSyncMsg = syncTimeoutFromRpcMsg2(pRpcMsg);
assert(pSyncMsg != NULL);
ret = syncNodeOnTimeoutCb(pSyncNode, pSyncMsg);
syncTimeoutDestroy(pSyncMsg);
} else if (pRpcMsg->msgType == TDMT_VND_SYNC_PING) {
} else if (pRpcMsg->msgType == TDMT_SYNC_PING) {
SyncPing *pSyncMsg = syncPingFromRpcMsg2(pRpcMsg);
assert(pSyncMsg != NULL);
ret = syncNodeOnPingCb(pSyncNode, pSyncMsg);
syncPingDestroy(pSyncMsg);
} else if (pRpcMsg->msgType == TDMT_VND_SYNC_PING_REPLY) {
} else if (pRpcMsg->msgType == TDMT_SYNC_PING_REPLY) {
SyncPingReply *pSyncMsg = syncPingReplyFromRpcMsg2(pRpcMsg);
assert(pSyncMsg != NULL);
ret = syncNodeOnPingReplyCb(pSyncNode, pSyncMsg);
syncPingReplyDestroy(pSyncMsg);
} else if (pRpcMsg->msgType == TDMT_VND_SYNC_CLIENT_REQUEST) {
} else if (pRpcMsg->msgType == TDMT_SYNC_CLIENT_REQUEST) {
SyncClientRequest *pSyncMsg = syncClientRequestFromRpcMsg2(pRpcMsg);
assert(pSyncMsg != NULL);
ret = syncNodeOnClientRequestCb(pSyncNode, pSyncMsg);
syncClientRequestDestroy(pSyncMsg);
} else if (pRpcMsg->msgType == TDMT_VND_SYNC_REQUEST_VOTE) {
} else if (pRpcMsg->msgType == TDMT_SYNC_REQUEST_VOTE) {
SyncRequestVote *pSyncMsg = syncRequestVoteFromRpcMsg2(pRpcMsg);
assert(pSyncMsg != NULL);
ret = syncNodeOnRequestVoteCb(pSyncNode, pSyncMsg);
syncRequestVoteDestroy(pSyncMsg);
} else if (pRpcMsg->msgType == TDMT_VND_SYNC_REQUEST_VOTE_REPLY) {
} else if (pRpcMsg->msgType == TDMT_SYNC_REQUEST_VOTE_REPLY) {
SyncRequestVoteReply *pSyncMsg = syncRequestVoteReplyFromRpcMsg2(pRpcMsg);
assert(pSyncMsg != NULL);
ret = syncNodeOnRequestVoteReplyCb(pSyncNode, pSyncMsg);
syncRequestVoteReplyDestroy(pSyncMsg);
} else if (pRpcMsg->msgType == TDMT_VND_SYNC_APPEND_ENTRIES) {
} else if (pRpcMsg->msgType == TDMT_SYNC_APPEND_ENTRIES) {
SyncAppendEntries *pSyncMsg = syncAppendEntriesFromRpcMsg2(pRpcMsg);
assert(pSyncMsg != NULL);
ret = syncNodeOnAppendEntriesCb(pSyncNode, pSyncMsg);
syncAppendEntriesDestroy(pSyncMsg);
} else if (pRpcMsg->msgType == TDMT_VND_SYNC_APPEND_ENTRIES_REPLY) {
} else if (pRpcMsg->msgType == TDMT_SYNC_APPEND_ENTRIES_REPLY) {
SyncAppendEntriesReply *pSyncMsg = syncAppendEntriesReplyFromRpcMsg2(pRpcMsg);
assert(pSyncMsg != NULL);
......@@ -715,8 +715,8 @@ static int vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq, in
goto _exit;
}
submitRsp.pArray = taosArrayInit(pSubmitReq->numOfBlocks, sizeof(SSubmitBlkRsp));
newTbUids = taosArrayInit(pSubmitReq->numOfBlocks, sizeof(int64_t));
submitRsp.pArray = taosArrayInit(msgIter.numOfBlocks, sizeof(SSubmitBlkRsp));
newTbUids = taosArrayInit(msgIter.numOfBlocks, sizeof(int64_t));
if (!submitRsp.pArray) {
pRsp->code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit;
......
......@@ -328,6 +328,18 @@ int32_t ctgInitJob(CTG_PARAMS, SCtgJob** job, uint64_t reqId, const SCatalogReq*
CTG_ERR_JRET(ctgInitGetQnodeTask(pJob, taskIdx++));
}
pJob->refId = taosAddRef(gCtgMgmt.jobPool, pJob);
if (pJob->refId < 0) {
ctgError("add job to ref failed, error: %s", tstrerror(terrno));
CTG_ERR_JRET(terrno);
}
taosAcquireRef(gCtgMgmt.jobPool, pJob->refId);
qDebug("QID:%" PRIx64 ", job %" PRIx64 " initialized, task num %d", pJob->queryId, pJob->refId, *taskNum);
return TSDB_CODE_SUCCESS;
_return:
taosMemoryFreeClear(*job);
CTG_RET(code);
......
......@@ -226,6 +226,9 @@ static SColumnNode* createColumnByFunc(const SFunctionNode* pFunc) {
}
bool fmIsDistExecFunc(int32_t funcId) {
if (fmIsUserDefinedFunc(funcId)) {
return false;
}
if (!fmIsVectorFunc(funcId)) {
return true;
}
......
......@@ -199,6 +199,22 @@ static int32_t collectMetaKeyFromCreateMultiTable(SCollectMetaKeyCxt* pCxt, SCre
return code;
}
static int32_t collectMetaKeyFromDropTable(SCollectMetaKeyCxt* pCxt, SDropTableStmt* pStmt) {
int32_t code = TSDB_CODE_SUCCESS;
SNode* pNode = NULL;
FOREACH(pNode, pStmt->pTables) {
SDropTableClause* pClause = (SDropTableClause*)pNode;
code = reserveTableMetaInCache(pCxt->pParseCxt->acctId, pClause->dbName, pClause->tableName, pCxt->pMetaCache);
if (TSDB_CODE_SUCCESS == code) {
code = reserveTableVgroupInCache(pCxt->pParseCxt->acctId, pClause->dbName, pClause->tableName, pCxt->pMetaCache);
}
if (TSDB_CODE_SUCCESS != code) {
break;
}
}
return code;
}
static int32_t collectMetaKeyFromAlterTable(SCollectMetaKeyCxt* pCxt, SAlterTableStmt* pStmt) {
int32_t code = reserveTableMetaInCache(pCxt->pParseCxt->acctId, pStmt->dbName, pStmt->tableName, pCxt->pMetaCache);
if (TSDB_CODE_SUCCESS == code) {
......@@ -341,6 +357,8 @@ static int32_t collectMetaKeyFromQuery(SCollectMetaKeyCxt* pCxt, SNode* pStmt) {
return collectMetaKeyFromCreateTable(pCxt, (SCreateTableStmt*)pStmt);
case QUERY_NODE_CREATE_MULTI_TABLE_STMT:
return collectMetaKeyFromCreateMultiTable(pCxt, (SCreateMultiTableStmt*)pStmt);
case QUERY_NODE_DROP_TABLE_STMT:
return collectMetaKeyFromDropTable(pCxt, (SDropTableStmt*)pStmt);
case QUERY_NODE_ALTER_TABLE_STMT:
return collectMetaKeyFromAlterTable(pCxt, (SAlterTableStmt*)pStmt);
case QUERY_NODE_USE_DATABASE_STMT:
......
......@@ -24,7 +24,7 @@ class ParserInitialDTest : public ParserDdlTest {};
// todo delete
// todo desc
// todo describe
// todo drop account
// todo DROP account
TEST_F(ParserInitialDTest, dropBnode) {
useDb("root", "test");
......@@ -62,51 +62,61 @@ TEST_F(ParserInitialDTest, dropCGroup) {
run("DROP CONSUMER GROUP IF EXISTS cg1 ON tp1");
}
// todo drop database
// todo drop dnode
// todo drop function
// todo DROP database
// todo DROP dnode
// todo DROP function
TEST_F(ParserInitialDTest, dropIndex) {
useDb("root", "test");
run("drop index index1 on t1");
run("DROP index index1 on t1");
}
TEST_F(ParserInitialDTest, dropMnode) {
useDb("root", "test");
run("drop mnode on dnode 1");
run("DROP mnode on dnode 1");
}
TEST_F(ParserInitialDTest, dropQnode) {
useDb("root", "test");
run("drop qnode on dnode 1");
run("DROP qnode on dnode 1");
}
TEST_F(ParserInitialDTest, dropSnode) {
useDb("root", "test");
run("drop snode on dnode 1");
run("DROP snode on dnode 1");
}
// todo drop stable
// todo drop stream
// todo drop table
TEST_F(ParserInitialDTest, dropSTable) {
useDb("root", "test");
run("DROP STABLE st1");
}
// todo DROP stream
TEST_F(ParserInitialDTest, dropTable) {
useDb("root", "test");
run("DROP TABLE t1");
}
TEST_F(ParserInitialDTest, dropTopic) {
useDb("root", "test");
run("drop topic tp1");
run("DROP topic tp1");
run("drop topic if exists tp1");
run("DROP topic if exists tp1");
}
TEST_F(ParserInitialDTest, dropUser) {
login("root");
useDb("root", "test");
run("drop user wxy");
run("DROP user wxy");
}
} // namespace ParserTest
......@@ -151,8 +151,8 @@ static bool stbSplHasMultiTbScan(bool streamQuery, SLogicNode* pNode) {
static bool stbSplNeedSplit(bool streamQuery, SLogicNode* pNode) {
switch (nodeType(pNode)) {
// case QUERY_NODE_LOGIC_PLAN_AGG:
// return !stbSplHasGatherExecFunc(((SAggLogicNode*)pNode)->pAggFuncs) && stbSplHasMultiTbScan(pNode);
case QUERY_NODE_LOGIC_PLAN_AGG:
return !stbSplHasGatherExecFunc(((SAggLogicNode*)pNode)->pAggFuncs) && stbSplHasMultiTbScan(streamQuery, pNode);
case QUERY_NODE_LOGIC_PLAN_WINDOW: {
SWindowLogicNode* pWindow = (SWindowLogicNode*)pNode;
if (WINDOW_TYPE_INTERVAL != pWindow->winType) {
......@@ -161,7 +161,7 @@ static bool stbSplNeedSplit(bool streamQuery, SLogicNode* pNode) {
return !stbSplHasGatherExecFunc(pWindow->pFuncs) && stbSplHasMultiTbScan(streamQuery, pNode);
}
// case QUERY_NODE_LOGIC_PLAN_SORT:
// return stbSplHasMultiTbScan(pNode);
// return stbSplHasMultiTbScan(streamQuery, pNode);
case QUERY_NODE_LOGIC_PLAN_SCAN:
return stbSplIsMultiTbScan(streamQuery, (SScanLogicNode*)pNode);
default:
......@@ -295,7 +295,8 @@ static int32_t stbSplCreatePartWindowNode(SWindowLogicNode* pMergeWindow, SLogic
return code;
}
static int32_t stbSplCreateMergeNode(SSplitContext* pCxt, SLogicNode* pParent, SLogicNode* pPartChild) {
static int32_t stbSplCreateMergeNode(SSplitContext* pCxt, SLogicNode* pParent, SNodeList* pMergeKeys,
SLogicNode* pPartChild) {
SMergeLogicNode* pMerge = nodesMakeNode(QUERY_NODE_LOGIC_PLAN_MERGE);
if (NULL == pMerge) {
return TSDB_CODE_OUT_OF_MEMORY;
......@@ -304,25 +305,28 @@ static int32_t stbSplCreateMergeNode(SSplitContext* pCxt, SLogicNode* pParent, S
pMerge->srcGroupId = pCxt->groupId;
pMerge->node.pParent = pParent;
pMerge->node.precision = pPartChild->precision;
int32_t code = nodesListMakeStrictAppend(&pMerge->pMergeKeys, nodesCloneNode(((SWindowLogicNode*)pParent)->pTspk));
if (TSDB_CODE_SUCCESS == code) {
pMerge->node.pTargets = nodesCloneList(pPartChild->pTargets);
if (NULL == pMerge->node.pTargets) {
code = TSDB_CODE_OUT_OF_MEMORY;
}
}
if (TSDB_CODE_SUCCESS == code) {
code = nodesListMakeAppend(&pParent->pChildren, pMerge);
pMerge->pMergeKeys = pMergeKeys;
pMerge->node.pTargets = nodesCloneList(pPartChild->pTargets);
if (NULL == pMerge->node.pTargets) {
nodesDestroyNode(pMerge);
return TSDB_CODE_OUT_OF_MEMORY;
}
return code;
return nodesListMakeAppend(&pParent->pChildren, pMerge);
}
static int32_t stbSplSplitWindowNodeForBatch(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
SLogicNode* pPartWindow = NULL;
int32_t code = stbSplCreatePartWindowNode((SWindowLogicNode*)pInfo->pSplitNode, &pPartWindow);
if (TSDB_CODE_SUCCESS == code) {
code = stbSplCreateMergeNode(pCxt, pInfo->pSplitNode, pPartWindow);
SNodeList* pMergeKeys = NULL;
code = nodesListMakeStrictAppend(&pMergeKeys, nodesCloneNode(((SWindowLogicNode*)pInfo->pSplitNode)->pTspk));
if (TSDB_CODE_SUCCESS == code) {
code = stbSplCreateMergeNode(pCxt, pInfo->pSplitNode, pMergeKeys, pPartWindow);
}
if (TSDB_CODE_SUCCESS != code) {
nodesDestroyList(pMergeKeys);
}
}
if (TSDB_CODE_SUCCESS == code) {
code = nodesListMakeStrictAppend(&pInfo->pSubplan->pChildren,
......@@ -365,6 +369,124 @@ static int32_t stbSplSplitWindowNode(SSplitContext* pCxt, SStableSplitInfo* pInf
}
}
static int32_t stbSplCreatePartAggNode(SAggLogicNode* pMergeAgg, SLogicNode** pOutput) {
SNodeList* pFunc = pMergeAgg->pAggFuncs;
pMergeAgg->pAggFuncs = NULL;
SNodeList* pGroupKeys = pMergeAgg->pGroupKeys;
pMergeAgg->pGroupKeys = NULL;
SNodeList* pTargets = pMergeAgg->node.pTargets;
pMergeAgg->node.pTargets = NULL;
SNodeList* pChildren = pMergeAgg->node.pChildren;
pMergeAgg->node.pChildren = NULL;
int32_t code = TSDB_CODE_SUCCESS;
SAggLogicNode* pPartAgg = nodesCloneNode(pMergeAgg);
if (NULL == pPartAgg) {
code = TSDB_CODE_OUT_OF_MEMORY;
}
if (TSDB_CODE_SUCCESS == code && NULL != pGroupKeys) {
pPartAgg->pGroupKeys = pGroupKeys;
code = createColumnByRewriteExps(pPartAgg->pGroupKeys, &pPartAgg->node.pTargets);
}
if (TSDB_CODE_SUCCESS == code && NULL != pGroupKeys) {
pMergeAgg->pGroupKeys = nodesCloneList(pPartAgg->node.pTargets);
if (NULL == pMergeAgg->pGroupKeys) {
code = TSDB_CODE_OUT_OF_MEMORY;
}
}
if (TSDB_CODE_SUCCESS == code) {
pMergeAgg->node.pTargets = pTargets;
pPartAgg->node.pChildren = pChildren;
code = stbSplRewriteFuns(pFunc, &pPartAgg->pAggFuncs, &pMergeAgg->pAggFuncs);
}
if (TSDB_CODE_SUCCESS == code) {
code = createColumnByRewriteExps(pPartAgg->pAggFuncs, &pPartAgg->node.pTargets);
}
nodesDestroyList(pFunc);
if (TSDB_CODE_SUCCESS == code) {
*pOutput = (SLogicNode*)pPartAgg;
} else {
nodesDestroyNode(pPartAgg);
}
return code;
}
static int32_t stbSplSplitAggNode(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
SLogicNode* pPartAgg = NULL;
int32_t code = stbSplCreatePartAggNode((SAggLogicNode*)pInfo->pSplitNode, &pPartAgg);
if (TSDB_CODE_SUCCESS == code) {
code = stbSplCreateExchangeNode(pCxt, pInfo->pSplitNode, pPartAgg);
}
if (TSDB_CODE_SUCCESS == code) {
code = nodesListMakeStrictAppend(&pInfo->pSubplan->pChildren,
splCreateScanSubplan(pCxt, pPartAgg, SPLIT_FLAG_STABLE_SPLIT));
}
pInfo->pSubplan->subplanType = SUBPLAN_TYPE_MERGE;
return code;
}
static int32_t stbSplCreatePartSortNode(SSortLogicNode* pMergeSort, SLogicNode** pOutput) {
SNodeList* pSortKeys = pMergeSort->pSortKeys;
pMergeSort->pSortKeys = NULL;
SNodeList* pTargets = pMergeSort->node.pTargets;
pMergeSort->node.pTargets = NULL;
SNodeList* pChildren = pMergeSort->node.pChildren;
pMergeSort->node.pChildren = NULL;
int32_t code = TSDB_CODE_SUCCESS;
SSortLogicNode* pPartSort = nodesCloneNode(pMergeSort);
if (NULL == pPartSort) {
code = TSDB_CODE_OUT_OF_MEMORY;
}
pMergeSort->node.pTargets = pTargets;
pPartSort->node.pChildren = pChildren;
if (TSDB_CODE_SUCCESS == code) {
pPartSort->pSortKeys = pSortKeys;
code = createColumnByRewriteExps(pPartSort->pSortKeys, &pPartSort->node.pTargets);
}
if (TSDB_CODE_SUCCESS == code) {
pMergeSort->pSortKeys = nodesCloneList(pPartSort->node.pTargets);
if (NULL == pMergeSort->pSortKeys) {
code = TSDB_CODE_OUT_OF_MEMORY;
}
}
if (TSDB_CODE_SUCCESS == code) {
*pOutput = (SLogicNode*)pPartSort;
} else {
nodesDestroyNode(pPartSort);
}
return code;
}
static int32_t stbSplSplitSortNode(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
SLogicNode* pPartSort = NULL;
int32_t code = stbSplCreatePartSortNode((SSortLogicNode*)pInfo->pSplitNode, &pPartSort);
if (TSDB_CODE_SUCCESS == code) {
SNodeList* pMergeKeys = nodesCloneList(((SSortLogicNode*)pInfo->pSplitNode)->pSortKeys);
if (NULL != pMergeKeys) {
code = stbSplCreateMergeNode(pCxt, pInfo->pSplitNode, pMergeKeys, pPartSort);
if (TSDB_CODE_SUCCESS != code) {
nodesDestroyList(pMergeKeys);
}
} else {
code = TSDB_CODE_OUT_OF_MEMORY;
}
}
if (TSDB_CODE_SUCCESS == code) {
code = nodesListMakeStrictAppend(&pInfo->pSubplan->pChildren,
splCreateScanSubplan(pCxt, pPartSort, SPLIT_FLAG_STABLE_SPLIT));
}
pInfo->pSubplan->subplanType = SUBPLAN_TYPE_MERGE;
return code;
}
static int32_t stbSplSplitScanNode(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
int32_t code = splCreateExchangeNodeForSubplan(pCxt, pInfo->pSubplan, pInfo->pSplitNode, SUBPLAN_TYPE_MERGE);
if (TSDB_CODE_SUCCESS == code) {
......@@ -386,9 +508,15 @@ static int32_t stableSplit(SSplitContext* pCxt, SLogicSubplan* pSubplan) {
int32_t code = TSDB_CODE_SUCCESS;
switch (nodeType(info.pSplitNode)) {
case QUERY_NODE_LOGIC_PLAN_AGG:
code = stbSplSplitAggNode(pCxt, &info);
break;
case QUERY_NODE_LOGIC_PLAN_WINDOW:
code = stbSplSplitWindowNode(pCxt, &info);
break;
case QUERY_NODE_LOGIC_PLAN_SORT:
code = stbSplSplitSortNode(pCxt, &info);
break;
case QUERY_NODE_LOGIC_PLAN_SCAN:
code = stbSplSplitScanNode(pCxt, &info);
break;
......
......@@ -67,3 +67,11 @@ TEST_F(PlanGroupByTest, selectFunc) {
run("SELECT MAX(c1), c2 FROM t1 GROUP BY c3");
run("SELECT MAX(c1), t1.* FROM t1 GROUP BY c3");
}
TEST_F(PlanGroupByTest, stable) {
useDb("root", "test");
run("SELECT COUNT(*) FROM st1");
run("SELECT COUNT(*) FROM st1 GROUP BY c1");
}
......@@ -23,20 +23,27 @@ class PlanOrderByTest : public PlannerTestBase {};
TEST_F(PlanOrderByTest, basic) {
useDb("root", "test");
// order by key is in the projection list
run("select c1 from t1 order by c1");
// order by key is not in the projection list
run("select c1 from t1 order by c2");
// ORDER BY key is in the projection list
run("SELECT c1 FROM t1 ORDER BY c1");
// ORDER BY key is not in the projection list
run("SELECT c1 FROM t1 ORDER BY c2");
}
TEST_F(PlanOrderByTest, expr) {
useDb("root", "test");
run("select * from t1 order by c1 + 10, c2");
run("SELECT * FROM t1 ORDER BY c1 + 10, c2");
}
TEST_F(PlanOrderByTest, nullsOrder) {
useDb("root", "test");
run("select * from t1 order by c1 desc nulls first");
run("SELECT * FROM t1 ORDER BY c1 DESC NULLS FIRST");
}
TEST_F(PlanOrderByTest, stable) {
useDb("root", "test");
// ORDER BY key is in the projection list
run("SELECT c1 FROM st1 ORDER BY c1");
}
......@@ -200,7 +200,7 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
SSyncRaftEntry* pRollBackEntry = ths->pLogStore->getEntry(ths->pLogStore, index);
assert(pRollBackEntry != NULL);
// if (pRollBackEntry->msgType != TDMT_VND_SYNC_NOOP) {
// if (pRollBackEntry->msgType != TDMT_SYNC_NOOP) {
if (syncUtilUserRollback(pRollBackEntry->msgType)) {
SRpcMsg rpcMsg;
syncEntry2OriginalRpc(pRollBackEntry, &rpcMsg);
......@@ -229,7 +229,7 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
SRpcMsg rpcMsg;
syncEntry2OriginalRpc(pAppendEntry, &rpcMsg);
if (ths->pFsm != NULL) {
// if (ths->pFsm->FpPreCommitCb != NULL && pAppendEntry->originalRpcType != TDMT_VND_SYNC_NOOP) {
// if (ths->pFsm->FpPreCommitCb != NULL && pAppendEntry->originalRpcType != TDMT_SYNC_NOOP) {
if (ths->pFsm->FpPreCommitCb != NULL && syncUtilUserPreCommit(pAppendEntry->originalRpcType)) {
SFsmCbMeta cbMeta;
cbMeta.index = pAppendEntry->index;
......@@ -261,7 +261,7 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
SRpcMsg rpcMsg;
syncEntry2OriginalRpc(pAppendEntry, &rpcMsg);
if (ths->pFsm != NULL) {
// if (ths->pFsm->FpPreCommitCb != NULL && pAppendEntry->originalRpcType != TDMT_VND_SYNC_NOOP) {
// if (ths->pFsm->FpPreCommitCb != NULL && pAppendEntry->originalRpcType != TDMT_SYNC_NOOP) {
if (ths->pFsm->FpPreCommitCb != NULL && syncUtilUserPreCommit(pAppendEntry->originalRpcType)) {
SFsmCbMeta cbMeta;
cbMeta.index = pAppendEntry->index;
......@@ -346,7 +346,7 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
}
// config change
if (pEntry->originalRpcType == TDMT_VND_SYNC_CONFIG_CHANGE) {
if (pEntry->originalRpcType == TDMT_SYNC_CONFIG_CHANGE) {
SSyncCfg oldSyncCfg = ths->pRaftCfg->cfg;
SSyncCfg newSyncCfg;
......
......@@ -124,7 +124,7 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) {
}
// config change
if (pEntry->originalRpcType == TDMT_VND_SYNC_CONFIG_CHANGE) {
if (pEntry->originalRpcType == TDMT_SYNC_CONFIG_CHANGE) {
SSyncCfg oldSyncCfg = pSyncNode->pRaftCfg->cfg;
SSyncCfg newSyncCfg;
......
......@@ -256,7 +256,7 @@ static void *syncIOConsumerFunc(void *param) {
syncRpcMsgLog2((char *)"==syncIOConsumerFunc==", pRpcMsg);
// use switch case instead of if else
if (pRpcMsg->msgType == TDMT_VND_SYNC_PING) {
if (pRpcMsg->msgType == TDMT_SYNC_PING) {
if (io->FpOnSyncPing != NULL) {
SyncPing *pSyncMsg = syncPingFromRpcMsg2(pRpcMsg);
assert(pSyncMsg != NULL);
......@@ -264,7 +264,7 @@ static void *syncIOConsumerFunc(void *param) {
syncPingDestroy(pSyncMsg);
}
} else if (pRpcMsg->msgType == TDMT_VND_SYNC_PING_REPLY) {
} else if (pRpcMsg->msgType == TDMT_SYNC_PING_REPLY) {
if (io->FpOnSyncPingReply != NULL) {
SyncPingReply *pSyncMsg = syncPingReplyFromRpcMsg2(pRpcMsg);
assert(pSyncMsg != NULL);
......@@ -272,7 +272,7 @@ static void *syncIOConsumerFunc(void *param) {
syncPingReplyDestroy(pSyncMsg);
}
} else if (pRpcMsg->msgType == TDMT_VND_SYNC_CLIENT_REQUEST) {
} else if (pRpcMsg->msgType == TDMT_SYNC_CLIENT_REQUEST) {
if (io->FpOnSyncClientRequest != NULL) {
SyncClientRequest *pSyncMsg = syncClientRequestFromRpcMsg2(pRpcMsg);
assert(pSyncMsg != NULL);
......@@ -280,7 +280,7 @@ static void *syncIOConsumerFunc(void *param) {
syncClientRequestDestroy(pSyncMsg);
}
} else if (pRpcMsg->msgType == TDMT_VND_SYNC_REQUEST_VOTE) {
} else if (pRpcMsg->msgType == TDMT_SYNC_REQUEST_VOTE) {
if (io->FpOnSyncRequestVote != NULL) {
SyncRequestVote *pSyncMsg = syncRequestVoteFromRpcMsg2(pRpcMsg);
assert(pSyncMsg != NULL);
......@@ -288,7 +288,7 @@ static void *syncIOConsumerFunc(void *param) {
syncRequestVoteDestroy(pSyncMsg);
}
} else if (pRpcMsg->msgType == TDMT_VND_SYNC_REQUEST_VOTE_REPLY) {
} else if (pRpcMsg->msgType == TDMT_SYNC_REQUEST_VOTE_REPLY) {
if (io->FpOnSyncRequestVoteReply != NULL) {
SyncRequestVoteReply *pSyncMsg = syncRequestVoteReplyFromRpcMsg2(pRpcMsg);
assert(pSyncMsg != NULL);
......@@ -296,7 +296,7 @@ static void *syncIOConsumerFunc(void *param) {
syncRequestVoteReplyDestroy(pSyncMsg);
}
} else if (pRpcMsg->msgType == TDMT_VND_SYNC_APPEND_ENTRIES) {
} else if (pRpcMsg->msgType == TDMT_SYNC_APPEND_ENTRIES) {
if (io->FpOnSyncAppendEntries != NULL) {
SyncAppendEntries *pSyncMsg = syncAppendEntriesFromRpcMsg2(pRpcMsg);
assert(pSyncMsg != NULL);
......@@ -304,7 +304,7 @@ static void *syncIOConsumerFunc(void *param) {
syncAppendEntriesDestroy(pSyncMsg);
}
} else if (pRpcMsg->msgType == TDMT_VND_SYNC_APPEND_ENTRIES_REPLY) {
} else if (pRpcMsg->msgType == TDMT_SYNC_APPEND_ENTRIES_REPLY) {
if (io->FpOnSyncAppendEntriesReply != NULL) {
SyncAppendEntriesReply *pSyncMsg = syncAppendEntriesReplyFromRpcMsg2(pRpcMsg);
assert(pSyncMsg != NULL);
......@@ -312,7 +312,7 @@ static void *syncIOConsumerFunc(void *param) {
syncAppendEntriesReplyDestroy(pSyncMsg);
}
} else if (pRpcMsg->msgType == TDMT_VND_SYNC_TIMEOUT) {
} else if (pRpcMsg->msgType == TDMT_SYNC_TIMEOUT) {
if (io->FpOnSyncTimeout != NULL) {
SyncTimeout *pSyncMsg = syncTimeoutFromRpcMsg2(pRpcMsg);
assert(pSyncMsg != NULL);
......@@ -365,7 +365,7 @@ static void syncIOProcessRequest(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet) {
}
static void syncIOProcessReply(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet) {
if (pMsg->msgType == TDMT_VND_SYNC_COMMON_RESPONSE) {
if (pMsg->msgType == TDMT_SYNC_COMMON_RESPONSE) {
sTrace("==syncIOProcessReply==");
} else {
syncRpcMsgLog2((char *)"==syncIOProcessReply==", pMsg);
......
......@@ -174,7 +174,7 @@ int32_t syncReconfig(int64_t rid, const SSyncCfg* pSyncCfg) {
sInfo("==syncReconfig== newconfig:%s", configChange);
SRpcMsg rpcMsg = {0};
rpcMsg.msgType = TDMT_VND_SYNC_CONFIG_CHANGE;
rpcMsg.msgType = TDMT_SYNC_CONFIG_CHANGE;
rpcMsg.info.noResp = 1;
rpcMsg.contLen = strlen(configChange) + 1;
rpcMsg.pCont = rpcMallocCont(rpcMsg.contLen);
......@@ -1399,7 +1399,7 @@ int32_t syncNodeOnClientRequestCb(SSyncNode* ths, SyncClientRequest* pMsg) {
syncEntry2OriginalRpc(pEntry, &rpcMsg);
if (ths->pFsm != NULL) {
// if (ths->pFsm->FpPreCommitCb != NULL && pEntry->originalRpcType != TDMT_VND_SYNC_NOOP) {
// if (ths->pFsm->FpPreCommitCb != NULL && pEntry->originalRpcType != TDMT_SYNC_NOOP) {
if (ths->pFsm->FpPreCommitCb != NULL && syncUtilUserPreCommit(pEntry->originalRpcType)) {
SFsmCbMeta cbMeta;
cbMeta.index = pEntry->index;
......@@ -1421,7 +1421,7 @@ int32_t syncNodeOnClientRequestCb(SSyncNode* ths, SyncClientRequest* pMsg) {
syncEntry2OriginalRpc(pEntry, &rpcMsg);
if (ths->pFsm != NULL) {
// if (ths->pFsm->FpPreCommitCb != NULL && pEntry->originalRpcType != TDMT_VND_SYNC_NOOP) {
// if (ths->pFsm->FpPreCommitCb != NULL && pEntry->originalRpcType != TDMT_SYNC_NOOP) {
if (ths->pFsm->FpPreCommitCb != NULL && syncUtilUserPreCommit(pEntry->originalRpcType)) {
SFsmCbMeta cbMeta;
cbMeta.index = pEntry->index;
......
......@@ -22,50 +22,50 @@ cJSON* syncRpcMsg2Json(SRpcMsg* pRpcMsg) {
cJSON* pRoot;
// in compiler optimization, switch case = if else constants
if (pRpcMsg->msgType == TDMT_VND_SYNC_TIMEOUT) {
if (pRpcMsg->msgType == TDMT_SYNC_TIMEOUT) {
SyncTimeout* pSyncMsg = syncTimeoutDeserialize2(pRpcMsg->pCont, pRpcMsg->contLen);
pRoot = syncTimeout2Json(pSyncMsg);
syncTimeoutDestroy(pSyncMsg);
} else if (pRpcMsg->msgType == TDMT_VND_SYNC_PING) {
} else if (pRpcMsg->msgType == TDMT_SYNC_PING) {
SyncPing* pSyncMsg = syncPingDeserialize2(pRpcMsg->pCont, pRpcMsg->contLen);
pRoot = syncPing2Json(pSyncMsg);
syncPingDestroy(pSyncMsg);
} else if (pRpcMsg->msgType == TDMT_VND_SYNC_PING_REPLY) {
} else if (pRpcMsg->msgType == TDMT_SYNC_PING_REPLY) {
SyncPingReply* pSyncMsg = syncPingReplyDeserialize2(pRpcMsg->pCont, pRpcMsg->contLen);
pRoot = syncPingReply2Json(pSyncMsg);
syncPingReplyDestroy(pSyncMsg);
} else if (pRpcMsg->msgType == TDMT_VND_SYNC_CLIENT_REQUEST) {
} else if (pRpcMsg->msgType == TDMT_SYNC_CLIENT_REQUEST) {
SyncClientRequest* pSyncMsg = syncClientRequestDeserialize2(pRpcMsg->pCont, pRpcMsg->contLen);
pRoot = syncClientRequest2Json(pSyncMsg);
syncClientRequestDestroy(pSyncMsg);
} else if (pRpcMsg->msgType == TDMT_VND_SYNC_CLIENT_REQUEST_REPLY) {
} else if (pRpcMsg->msgType == TDMT_SYNC_CLIENT_REQUEST_REPLY) {
pRoot = syncRpcUnknownMsg2Json();
} else if (pRpcMsg->msgType == TDMT_VND_SYNC_REQUEST_VOTE) {
} else if (pRpcMsg->msgType == TDMT_SYNC_REQUEST_VOTE) {
SyncRequestVote* pSyncMsg = syncRequestVoteDeserialize2(pRpcMsg->pCont, pRpcMsg->contLen);
pRoot = syncRequestVote2Json(pSyncMsg);
syncRequestVoteDestroy(pSyncMsg);
} else if (pRpcMsg->msgType == TDMT_VND_SYNC_REQUEST_VOTE_REPLY) {
} else if (pRpcMsg->msgType == TDMT_SYNC_REQUEST_VOTE_REPLY) {
SyncRequestVoteReply* pSyncMsg = syncRequestVoteReplyDeserialize2(pRpcMsg->pCont, pRpcMsg->contLen);
pRoot = syncRequestVoteReply2Json(pSyncMsg);
syncRequestVoteReplyDestroy(pSyncMsg);
} else if (pRpcMsg->msgType == TDMT_VND_SYNC_APPEND_ENTRIES) {
} else if (pRpcMsg->msgType == TDMT_SYNC_APPEND_ENTRIES) {
SyncAppendEntries* pSyncMsg = syncAppendEntriesDeserialize2(pRpcMsg->pCont, pRpcMsg->contLen);
pRoot = syncAppendEntries2Json(pSyncMsg);
syncAppendEntriesDestroy(pSyncMsg);
} else if (pRpcMsg->msgType == TDMT_VND_SYNC_APPEND_ENTRIES_REPLY) {
} else if (pRpcMsg->msgType == TDMT_SYNC_APPEND_ENTRIES_REPLY) {
SyncAppendEntriesReply* pSyncMsg = syncAppendEntriesReplyDeserialize2(pRpcMsg->pCont, pRpcMsg->contLen);
pRoot = syncAppendEntriesReply2Json(pSyncMsg);
syncAppendEntriesReplyDestroy(pSyncMsg);
} else if (pRpcMsg->msgType == TDMT_VND_SYNC_COMMON_RESPONSE) {
} else if (pRpcMsg->msgType == TDMT_SYNC_COMMON_RESPONSE) {
pRoot = cJSON_CreateObject();
char* s;
s = syncUtilprintBin((char*)(pRpcMsg->pCont), pRpcMsg->contLen);
......@@ -98,7 +98,7 @@ cJSON* syncRpcMsg2Json(SRpcMsg* pRpcMsg) {
cJSON* syncRpcUnknownMsg2Json() {
cJSON* pRoot = cJSON_CreateObject();
cJSON_AddNumberToObject(pRoot, "msgType", TDMT_VND_SYNC_UNKNOWN);
cJSON_AddNumberToObject(pRoot, "msgType", TDMT_SYNC_UNKNOWN);
cJSON_AddStringToObject(pRoot, "data", "unknown message");
cJSON* pJson = cJSON_CreateObject();
......@@ -146,7 +146,7 @@ SyncTimeout* syncTimeoutBuild() {
SyncTimeout* pMsg = taosMemoryMalloc(bytes);
memset(pMsg, 0, bytes);
pMsg->bytes = bytes;
pMsg->msgType = TDMT_VND_SYNC_TIMEOUT;
pMsg->msgType = TDMT_SYNC_TIMEOUT;
return pMsg;
}
......@@ -275,7 +275,7 @@ SyncPing* syncPingBuild(uint32_t dataLen) {
SyncPing* pMsg = taosMemoryMalloc(bytes);
memset(pMsg, 0, bytes);
pMsg->bytes = bytes;
pMsg->msgType = TDMT_VND_SYNC_PING;
pMsg->msgType = TDMT_SYNC_PING;
pMsg->dataLen = dataLen;
return pMsg;
}
......@@ -535,7 +535,7 @@ SyncPingReply* syncPingReplyBuild(uint32_t dataLen) {
SyncPingReply* pMsg = taosMemoryMalloc(bytes);
memset(pMsg, 0, bytes);
pMsg->bytes = bytes;
pMsg->msgType = TDMT_VND_SYNC_PING_REPLY;
pMsg->msgType = TDMT_SYNC_PING_REPLY;
pMsg->dataLen = dataLen;
return pMsg;
}
......@@ -795,7 +795,7 @@ SyncClientRequest* syncClientRequestBuild(uint32_t dataLen) {
SyncClientRequest* pMsg = taosMemoryMalloc(bytes);
memset(pMsg, 0, bytes);
pMsg->bytes = bytes;
pMsg->msgType = TDMT_VND_SYNC_CLIENT_REQUEST;
pMsg->msgType = TDMT_SYNC_CLIENT_REQUEST;
pMsg->seqNum = 0;
pMsg->isWeak = false;
pMsg->dataLen = dataLen;
......@@ -937,7 +937,7 @@ SyncRequestVote* syncRequestVoteBuild(int32_t vgId) {
memset(pMsg, 0, bytes);
pMsg->bytes = bytes;
pMsg->vgId = vgId;
pMsg->msgType = TDMT_VND_SYNC_REQUEST_VOTE;
pMsg->msgType = TDMT_SYNC_REQUEST_VOTE;
return pMsg;
}
......@@ -1086,7 +1086,7 @@ SyncRequestVoteReply* syncRequestVoteReplyBuild(int32_t vgId) {
memset(pMsg, 0, bytes);
pMsg->bytes = bytes;
pMsg->vgId = vgId;
pMsg->msgType = TDMT_VND_SYNC_REQUEST_VOTE_REPLY;
pMsg->msgType = TDMT_SYNC_REQUEST_VOTE_REPLY;
return pMsg;
}
......@@ -1232,7 +1232,7 @@ SyncAppendEntries* syncAppendEntriesBuild(uint32_t dataLen, int32_t vgId) {
memset(pMsg, 0, bytes);
pMsg->bytes = bytes;
pMsg->vgId = vgId;
pMsg->msgType = TDMT_VND_SYNC_APPEND_ENTRIES;
pMsg->msgType = TDMT_SYNC_APPEND_ENTRIES;
pMsg->dataLen = dataLen;
return pMsg;
}
......@@ -1398,7 +1398,7 @@ SyncAppendEntriesReply* syncAppendEntriesReplyBuild(int32_t vgId) {
memset(pMsg, 0, bytes);
pMsg->bytes = bytes;
pMsg->vgId = vgId;
pMsg->msgType = TDMT_VND_SYNC_APPEND_ENTRIES_REPLY;
pMsg->msgType = TDMT_SYNC_APPEND_ENTRIES_REPLY;
return pMsg;
}
......@@ -1546,7 +1546,7 @@ SyncApplyMsg* syncApplyMsgBuild(uint32_t dataLen) {
SyncApplyMsg* pMsg = taosMemoryMalloc(bytes);
memset(pMsg, 0, bytes);
pMsg->bytes = bytes;
pMsg->msgType = TDMT_VND_SYNC_APPLY_MSG;
pMsg->msgType = TDMT_SYNC_APPLY_MSG;
pMsg->dataLen = dataLen;
return pMsg;
}
......
......@@ -59,14 +59,14 @@ SSyncRaftEntry* syncEntryBuildNoop(SyncTerm term, SyncIndex index, int32_t vgId)
memset(&rpcMsg, 0, sizeof(SRpcMsg));
rpcMsg.contLen = head.contLen;
rpcMsg.pCont = rpcMallocCont(rpcMsg.contLen);
rpcMsg.msgType = TDMT_VND_SYNC_NOOP;
rpcMsg.msgType = TDMT_SYNC_NOOP;
memcpy(rpcMsg.pCont, &head, sizeof(head));
SSyncRaftEntry* pEntry = syncEntryBuild(rpcMsg.contLen);
assert(pEntry != NULL);
pEntry->msgType = TDMT_VND_SYNC_CLIENT_REQUEST;
pEntry->originalRpcType = TDMT_VND_SYNC_NOOP;
pEntry->msgType = TDMT_SYNC_CLIENT_REQUEST;
pEntry->originalRpcType = TDMT_SYNC_NOOP;
pEntry->seqNum = 0;
pEntry->isWeak = 0;
pEntry->term = term;
......
......@@ -104,7 +104,7 @@ SSyncRaftEntry* logStoreGetEntry(SSyncLogStore* pLogStore, SyncIndex index) {
SSyncRaftEntry* pEntry = syncEntryBuild(pWalHandle->pHead->head.bodyLen);
assert(pEntry != NULL);
pEntry->msgType = TDMT_VND_SYNC_CLIENT_REQUEST;
pEntry->msgType = TDMT_SYNC_CLIENT_REQUEST;
pEntry->originalRpcType = pWalHandle->pHead->head.msgType;
pEntry->seqNum = pWalHandle->pHead->head.syncMeta.seqNum;
pEntry->isWeak = pWalHandle->pHead->head.syncMeta.isWeek;
......
......@@ -215,28 +215,28 @@ void syncUtilMsgNtoH(void* msg) {
}
bool syncUtilIsData(tmsg_t msgType) {
if (msgType == TDMT_VND_SYNC_NOOP || msgType == TDMT_VND_SYNC_CONFIG_CHANGE) {
if (msgType == TDMT_SYNC_NOOP || msgType == TDMT_SYNC_CONFIG_CHANGE) {
return false;
}
return true;
}
bool syncUtilUserPreCommit(tmsg_t msgType) {
if (msgType != TDMT_VND_SYNC_NOOP && msgType != TDMT_VND_SYNC_CONFIG_CHANGE) {
if (msgType != TDMT_SYNC_NOOP && msgType != TDMT_SYNC_CONFIG_CHANGE) {
return true;
}
return false;
}
bool syncUtilUserCommit(tmsg_t msgType) {
if (msgType != TDMT_VND_SYNC_NOOP && msgType != TDMT_VND_SYNC_CONFIG_CHANGE) {
if (msgType != TDMT_SYNC_NOOP && msgType != TDMT_SYNC_CONFIG_CHANGE) {
return true;
}
return false;
}
bool syncUtilUserRollback(tmsg_t msgType) {
if (msgType != TDMT_VND_SYNC_NOOP && msgType != TDMT_VND_SYNC_CONFIG_CHANGE) {
if (msgType != TDMT_SYNC_NOOP && msgType != TDMT_SYNC_CONFIG_CHANGE) {
return true;
}
return false;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册