diff --git a/include/common/tmsgdef.h b/include/common/tmsgdef.h index fe9ffda69cb82f4c0d2113218fb44b4889158730..2cf188866bd0486fe1d44f72b9cce87245160c89 100644 --- a/include/common/tmsgdef.h +++ b/include/common/tmsgdef.h @@ -67,7 +67,6 @@ enum { enum { #endif - // Requests handled by DNODE TD_NEW_MSG_SEG(TDMT_DND_MSG) TD_DEF_MSG_TYPE(TDMT_DND_CREATE_MNODE, "dnode-create-mnode", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_DND_DROP_MNODE, "dnode-drop-mnode", NULL, NULL) @@ -82,8 +81,7 @@ enum { TD_DEF_MSG_TYPE(TDMT_DND_SERVER_STATUS, "server-status", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_DND_NET_TEST, "net-test", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_DND_CONFIG_DNODE, "config-dnode", NULL, NULL) - - // Requests handled by MNODE + TD_NEW_MSG_SEG(TDMT_MND_MSG) TD_DEF_MSG_TYPE(TDMT_MND_CONNECT, "connect", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_CREATE_ACCT, "create-acct", NULL, NULL) @@ -95,7 +93,6 @@ enum { TD_DEF_MSG_TYPE(TDMT_MND_GET_USER_AUTH, "get-user-auth", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_CREATE_DNODE, "create-dnode", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_CONFIG_DNODE, "config-dnode", NULL, NULL) - TD_DEF_MSG_TYPE(TDMT_MND_ALTER_DNODE, "alter-dnode", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_DROP_DNODE, "drop-dnode", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_CREATE_MNODE, "create-mnode", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_ALTER_MNODE, "alter-mnode", NULL, NULL) @@ -103,6 +100,7 @@ enum { TD_DEF_MSG_TYPE(TDMT_MND_CREATE_QNODE, "create-qnode", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_ALTER_QNODE, "alter-qnode", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_DROP_QNODE, "drop-qnode", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_MND_QNODE_LIST, "qnode-list", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_CREATE_SNODE, "create-snode", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_ALTER_SNODE, "alter-snode", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_DROP_SNODE, "drop-snode", NULL, NULL) @@ -115,52 +113,53 @@ enum { TD_DEF_MSG_TYPE(TDMT_MND_ALTER_DB, "alter-db", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_SYNC_DB, "sync-db", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_COMPACT_DB, "compact-db", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_MND_GET_DB_CFG, "get-db-cfg", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_MND_VGROUP_LIST, "vgroup-list", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_CREATE_FUNC, "create-func", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_RETRIEVE_FUNC, "retrieve-func", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_DROP_FUNC, "drop-func", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_CREATE_STB, "create-stb", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_ALTER_STB, "alter-stb", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_DROP_STB, "drop-stb", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_MND_TABLE_META, "table-meta", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_CREATE_SMA, "create-sma", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_DROP_SMA, "drop-sma", NULL, NULL) - TD_DEF_MSG_TYPE(TDMT_MND_TABLE_META, "table-meta", NULL, NULL) - TD_DEF_MSG_TYPE(TDMT_MND_VGROUP_LIST, "vgroup-list", NULL, NULL) - TD_DEF_MSG_TYPE(TDMT_MND_QNODE_LIST, "qnode-list", NULL, NULL) - TD_DEF_MSG_TYPE(TDMT_MND_KILL_QUERY, "kill-query", NULL, NULL) - TD_DEF_MSG_TYPE(TDMT_MND_KILL_CONN, "kill-conn", NULL, NULL) - TD_DEF_MSG_TYPE(TDMT_MND_HEARTBEAT, "heartbeat", SClientHbBatchReq, SClientHbBatchRsp) - TD_DEF_MSG_TYPE(TDMT_MND_SHOW, "show", NULL, NULL) - TD_DEF_MSG_TYPE(TDMT_MND_SYSTABLE_RETRIEVE, "systable-retrieve", NULL, NULL) - TD_DEF_MSG_TYPE(TDMT_MND_STATUS, "status", NULL, NULL) - TD_DEF_MSG_TYPE(TDMT_MND_TRANS_TIMER, "trans-tmr", NULL, NULL) - TD_DEF_MSG_TYPE(TDMT_MND_KILL_TRANS, "kill-trans", NULL, NULL) - TD_DEF_MSG_TYPE(TDMT_MND_TELEM_TIMER, "telem-tmr", SMTimerReq, SMTimerReq) - TD_DEF_MSG_TYPE(TDMT_MND_GRANT, "grant", NULL, NULL) - TD_DEF_MSG_TYPE(TDMT_MND_AUTH, "auth", NULL, NULL) - TD_DEF_MSG_TYPE(TDMT_MND_CREATE_TOPIC, "create-topic", SMCreateTopicReq, SMCreateTopicRsp) - TD_DEF_MSG_TYPE(TDMT_MND_ALTER_TOPIC, "alter-topic", NULL, NULL) - TD_DEF_MSG_TYPE(TDMT_MND_DROP_TOPIC, "drop-topic", NULL, NULL) - TD_DEF_MSG_TYPE(TDMT_MND_SUBSCRIBE, "subscribe", SCMSubscribeReq, SCMSubscribeRsp) - TD_DEF_MSG_TYPE(TDMT_MND_MQ_ASK_EP, "mq-ask-ep", SMqAskEpReq, SMqAskEpRsp) - TD_DEF_MSG_TYPE(TDMT_MND_MQ_TIMER, "mq-tmr", SMTimerReq, NULL) - TD_DEF_MSG_TYPE(TDMT_MND_MQ_CONSUMER_LOST, "mq-consumer-lost", SMqConsumerLostMsg, NULL) - TD_DEF_MSG_TYPE(TDMT_MND_MQ_CONSUMER_RECOVER, "mq-consumer-recover", SMqConsumerRecoverMsg, NULL) - TD_DEF_MSG_TYPE(TDMT_MND_MQ_DO_REBALANCE, "mq-do-rebalance", SMqDoRebalanceMsg, NULL) - TD_DEF_MSG_TYPE(TDMT_MND_MQ_DROP_CGROUP, "mq-drop-cgroup", SMqDropCGroupReq, SMqDropCGroupRsp) - TD_DEF_MSG_TYPE(TDMT_MND_MQ_COMMIT_OFFSET, "mq-commit-offset", SMqCMCommitOffsetReq, SMqCMCommitOffsetRsp) TD_DEF_MSG_TYPE(TDMT_MND_CREATE_STREAM, "create-stream", SCMCreateStreamReq, SCMCreateStreamRsp) TD_DEF_MSG_TYPE(TDMT_MND_ALTER_STREAM, "alter-stream", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_DROP_STREAM, "drop-stream", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_CREATE_INDEX, "create-index", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_DROP_INDEX, "drop-index", NULL, NULL) - TD_DEF_MSG_TYPE(TDMT_MND_GET_DB_CFG, "get-db-cfg", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_GET_INDEX, "get-index", NULL, NULL) - TD_DEF_MSG_TYPE(TDMT_MND_APPLY_MSG, "apply-msg", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_MND_CREATE_TOPIC, "create-topic", SMCreateTopicReq, SMCreateTopicRsp) + TD_DEF_MSG_TYPE(TDMT_MND_ALTER_TOPIC, "alter-topic", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_MND_DROP_TOPIC, "drop-topic", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_MND_SUBSCRIBE, "subscribe", SCMSubscribeReq, SCMSubscribeRsp) + TD_DEF_MSG_TYPE(TDMT_MND_MQ_ASK_EP, "ask-ep", SMqAskEpReq, SMqAskEpRsp) + TD_DEF_MSG_TYPE(TDMT_MND_MQ_CONSUMER_LOST, "consumer-lost", SMqConsumerLostMsg, NULL) + TD_DEF_MSG_TYPE(TDMT_MND_MQ_CONSUMER_RECOVER, "consumer-recover", SMqConsumerRecoverMsg, NULL) + TD_DEF_MSG_TYPE(TDMT_MND_MQ_DO_REBALANCE, "do-rebalance", SMqDoRebalanceMsg, NULL) + TD_DEF_MSG_TYPE(TDMT_MND_MQ_DROP_CGROUP, "drop-cgroup", SMqDropCGroupReq, SMqDropCGroupRsp) + TD_DEF_MSG_TYPE(TDMT_MND_MQ_COMMIT_OFFSET, "commit-offset", SMqCMCommitOffsetReq, SMqCMCommitOffsetRsp) + TD_DEF_MSG_TYPE(TDMT_MND_MQ_TIMER, "mq-tmr", SMTimerReq, NULL) + TD_DEF_MSG_TYPE(TDMT_MND_TELEM_TIMER, "telem-tmr", SMTimerReq, SMTimerReq) + TD_DEF_MSG_TYPE(TDMT_MND_TRANS_TIMER, "trans-tmr", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_MND_KILL_TRANS, "kill-trans", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_MND_KILL_QUERY, "kill-query", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_MND_KILL_CONN, "kill-conn", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_MND_HEARTBEAT, "heartbeat", SClientHbBatchReq, SClientHbBatchRsp) + TD_DEF_MSG_TYPE(TDMT_MND_STATUS, "status", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_MND_SHOW, "show", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_MND_SYSTABLE_RETRIEVE, "retrieve", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_MND_GRANT, "grant", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_MND_AUTH, "auth", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_MND_CONFIRM_WRITE, "mnode-confirm-write", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_MND_APPLY_MSG, "mnode-apply-msg", NULL, NULL) - // Requests handled by VNODE TD_NEW_MSG_SEG(TDMT_VND_MSG) TD_DEF_MSG_TYPE(TDMT_VND_SUBMIT, "submit", SSubmitReq, SSubmitRsp) TD_DEF_MSG_TYPE(TDMT_VND_QUERY, "query", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_VND_QUERY_CONTINUE, "query-continue", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_VND_QUERY_HEARTBEAT, "query-heartbeat", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_FETCH, "fetch", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_CREATE_TABLE, "create-table", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_ALTER_TABLE, "alter-table", NULL, NULL) @@ -182,67 +181,38 @@ enum { TD_DEF_MSG_TYPE(TDMT_VND_CREATE_TOPIC, "vnode-create-topic", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_ALTER_TOPIC, "vnode-alter-topic", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_DROP_TOPIC, "vnode-drop-topic", NULL, NULL) -// TD_DEF_MSG_TYPE(TDMT_VND_SHOW_TABLES, "vnode-show-tables", SVShowTablesReq, SVShowTablesRsp) -// TD_DEF_MSG_TYPE(TDMT_VND_SHOW_TABLES_FETCH, "vnode-show-tables-fetch", SVShowTablesFetchReq, SVShowTablesFetchRsp) - TD_DEF_MSG_TYPE(TDMT_VND_QUERY_CONTINUE, "vnode-query-continue", NULL, NULL) - TD_DEF_MSG_TYPE(TDMT_VND_QUERY_HEARTBEAT, "vnode-query-heartbeat", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_EXPLAIN, "vnode-explain", NULL, NULL) - TD_DEF_MSG_TYPE(TDMT_VND_SUBSCRIBE, "vnode-subscribe", SMVSubscribeReq, SMVSubscribeRsp) TD_DEF_MSG_TYPE(TDMT_VND_CONSUME, "vnode-consume", SMqPollReq, SMqDataBlkRsp) TD_DEF_MSG_TYPE(TDMT_VND_TASK_DEPLOY, "vnode-task-deploy", SStreamTaskDeployReq, SStreamTaskDeployRsp) TD_DEF_MSG_TYPE(TDMT_VND_STREAM_TRIGGER, "vnode-stream-trigger", NULL, NULL) - TD_DEF_MSG_TYPE(TDMT_VND_TASK_RUN, "vnode-stream-task-run", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_TASK_DISPATCH, "vnode-stream-task-dispatch", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_TASK_RECOVER, "vnode-stream-task-recover", NULL, NULL) - TD_DEF_MSG_TYPE(TDMT_VND_CREATE_SMA, "vnode-create-sma", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_CANCEL_SMA, "vnode-cancel-sma", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_DROP_SMA, "vnode-drop-sma", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_SUBMIT_RSMA, "vnode-submit-rsma", SSubmitReq, SSubmitRsp) TD_DEF_MSG_TYPE(TDMT_VND_GET_TSMA_EXP_WNDS, "vnode-get-tsma-expired-windows", SVGetTsmaExpWndsReq, SVGetTsmaExpWndsRsp) + TD_DEF_MSG_TYPE(TDMT_VND_ALTER_CONFIG, "alter-config", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_VND_ALTER_REPLICA, "alter-replica", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_VND_START_WRITE, "start-write", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_VND_STOP_WRITE, "stop-write", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_VND_CONFIRM_WRITE, "confirm-write", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_VND_COMPACT, "compact", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_VND_DELETE, "delete-data", SVDeleteReq, SVDeleteRsp) - TD_DEF_MSG_TYPE(TDMT_VND_SYNC_TIMEOUT, "vnode-sync-timeout", NULL, NULL) - TD_DEF_MSG_TYPE(TDMT_VND_SYNC_PING, "vnode-sync-ping", NULL, NULL) - TD_DEF_MSG_TYPE(TDMT_VND_SYNC_PING_REPLY, "vnode-sync-ping-reply", NULL, NULL) - TD_DEF_MSG_TYPE(TDMT_VND_SYNC_CLIENT_REQUEST, "vnode-sync-client-request", NULL, NULL) - TD_DEF_MSG_TYPE(TDMT_VND_SYNC_CLIENT_REQUEST_REPLY, "vnode-sync-client-request-reply", NULL, NULL) - TD_DEF_MSG_TYPE(TDMT_VND_SYNC_REQUEST_VOTE, "vnode-sync-request-vote", NULL, NULL) - TD_DEF_MSG_TYPE(TDMT_VND_SYNC_REQUEST_VOTE_REPLY, "vnode-sync-request-vote-reply", NULL, NULL) - TD_DEF_MSG_TYPE(TDMT_VND_SYNC_APPEND_ENTRIES, "vnode-sync-append-entries", NULL, NULL) - TD_DEF_MSG_TYPE(TDMT_VND_SYNC_APPEND_ENTRIES_REPLY, "vnode-sync-append-entries-reply", NULL, NULL) - TD_DEF_MSG_TYPE(TDMT_VND_SYNC_NOOP, "vnode-sync-noop", NULL, NULL) - TD_DEF_MSG_TYPE(TDMT_VND_SYNC_UNKNOWN, "vnode-sync-unknown", NULL, NULL) - TD_DEF_MSG_TYPE(TDMT_VND_SYNC_COMMON_RESPONSE, "vnode-sync-common-response", NULL, NULL) - TD_DEF_MSG_TYPE(TDMT_VND_SYNC_APPLY_MSG, "vnode-sync-apply-msg", NULL, NULL) - TD_DEF_MSG_TYPE(TDMT_VND_SYNC_CONFIG_CHANGE, "vnode-sync-config-change", NULL, NULL) - - TD_DEF_MSG_TYPE(TDMT_VND_ALTER_CONFIG, "vnode-alter-config", NULL, NULL) - TD_DEF_MSG_TYPE(TDMT_VND_ALTER_REPLICA, "vnode-alter-replica", NULL, NULL) - TD_DEF_MSG_TYPE(TDMT_VND_COMPACT, "vnode-compact", NULL, NULL) - - TD_DEF_MSG_TYPE(TDMT_VND_DELETE, "vnode-delete-data", SVDeleteReq, SVDeleteRsp) - - // Requests handled by QNODE TD_NEW_MSG_SEG(TDMT_QND_MSG) - // Requests handled by SNODE TD_NEW_MSG_SEG(TDMT_SND_MSG) TD_DEF_MSG_TYPE(TDMT_SND_TASK_DEPLOY, "snode-task-deploy", SStreamTaskDeployReq, SStreamTaskDeployRsp) - //TD_DEF_MSG_TYPE(TDMT_SND_TASK_EXEC, "snode-task-exec", SStreamTaskExecReq, SStreamTaskExecRsp) - //TD_DEF_MSG_TYPE(TDMT_SND_TASK_PIPE_EXEC, "snode-task-pipe-exec", SStreamTaskExecReq, SStreamTaskExecRsp) - //TD_DEF_MSG_TYPE(TDMT_SND_TASK_MERGE_EXEC, "snode-task-merge-exec", SStreamTaskExecReq, SStreamTaskExecRsp) - TD_DEF_MSG_TYPE(TDMT_SND_TASK_RUN, "snode-stream-task-run", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_SND_TASK_DISPATCH, "snode-stream-task-dispatch", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_SND_TASK_RECOVER, "snode-stream-task-recover", NULL, NULL) - // Requests handled by SCHEDULER TD_NEW_MSG_SEG(TDMT_SCH_MSG) - TD_DEF_MSG_TYPE(TDMT_SCH_LINK_BROKEN, "scheduler-link-broken", NULL, NULL) - - // Monitor info exchange between processes + TD_DEF_MSG_TYPE(TDMT_SCH_LINK_BROKEN, "link-broken", NULL, NULL) + TD_NEW_MSG_SEG(TDMT_MON_MSG) TD_DEF_MSG_TYPE(TDMT_MON_MM_INFO, "monitor-minfo", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MON_VM_INFO, "monitor-vinfo", NULL, NULL) @@ -252,6 +222,22 @@ enum { TD_DEF_MSG_TYPE(TDMT_MON_VM_LOAD, "monitor-vload", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MON_MM_LOAD, "monitor-mload", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MON_QM_LOAD, "monitor-qload", NULL, NULL) + + TD_NEW_MSG_SEG(TDMT_SYNC_MSG) + TD_DEF_MSG_TYPE(TDMT_SYNC_TIMEOUT, "sync-timeout", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_SYNC_PING, "sync-ping", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_SYNC_PING_REPLY, "sync-ping-reply", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_SYNC_CLIENT_REQUEST, "sync-client-request", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_SYNC_CLIENT_REQUEST_REPLY, "sync-client-request-reply", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_SYNC_REQUEST_VOTE, "sync-request-vote", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_SYNC_REQUEST_VOTE_REPLY, "sync-request-vote-reply", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_SYNC_APPEND_ENTRIES, "sync-append-entries", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_SYNC_APPEND_ENTRIES_REPLY, "sync-append-entries-reply", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_SYNC_NOOP, "sync-noop", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_SYNC_UNKNOWN, "sync-unknown", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_SYNC_COMMON_RESPONSE, "sync-common-response", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_SYNC_APPLY_MSG, "sync-apply-msg", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_SYNC_CONFIG_CHANGE, "sync-config-change", NULL, NULL) #if defined(TD_MSG_NUMBER_) TDMT_MAX diff --git a/include/libs/nodes/plannodes.h b/include/libs/nodes/plannodes.h index eb37316402538a1c89c7f3c7dbf0fa21b57843c0..01b8d5c51e9139d3079bdb5975dba63a7e39591c 100644 --- a/include/libs/nodes/plannodes.h +++ b/include/libs/nodes/plannodes.h @@ -229,6 +229,7 @@ typedef struct STableScanPhysiNode { double ratio; int32_t dataRequired; SNodeList* pDynamicScanFuncs; + SNodeList* pPartitionKeys; int64_t interval; int64_t offset; int64_t sliding; diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index 9762ac6ba5d1eb2c729bcfb4cc26f642ec0e2846..6694954fa26c26048eef051067e7c217cab919cd 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -1101,7 +1101,7 @@ void* doAsyncFetchRow(SRequestObj* pRequest, bool setupOneRowPtr, bool convertUc tsem_wait(&pParam->sem); } - if (pRequest->code == TSDB_CODE_SUCCESS && setupOneRowPtr) { + if (pRequest->code == TSDB_CODE_SUCCESS && pResultInfo->numOfRows > 0 && setupOneRowPtr) { doSetOneRowPtr(pResultInfo); pResultInfo->current += 1; } diff --git a/source/common/src/systable.c b/source/common/src/systable.c index 5b8d214cb412bd8b439880157afbc61e07025af2..2b59354d609775c6db7b2b1383aadbce4718d7c6 100644 --- a/source/common/src/systable.c +++ b/source/common/src/systable.c @@ -87,11 +87,12 @@ static const SSysDbTableSchema userDBSchema[] = { {.name = "wal", .bytes = 1, .type = TSDB_DATA_TYPE_TINYINT}, {.name = "fsync", .bytes = 4, .type = TSDB_DATA_TYPE_INT}, {.name = "comp", .bytes = 1, .type = TSDB_DATA_TYPE_TINYINT}, - {.name = "cachelast", .bytes = 1, .type = TSDB_DATA_TYPE_TINYINT}, + {.name = "cache_model", .bytes = 1, .type = TSDB_DATA_TYPE_TINYINT}, {.name = "precision", .bytes = 2 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, - {.name = "single_stable", .bytes = 1, .type = TSDB_DATA_TYPE_TINYINT}, + {.name = "single_stable_model", .bytes = 1, .type = TSDB_DATA_TYPE_BOOL}, {.name = "status", .bytes = 10 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, - {.name = "schemaless", .bytes = 1, .type = TSDB_DATA_TYPE_TINYINT}, + {.name = "schemaless", .bytes = 1, .type = TSDB_DATA_TYPE_BOOL}, + {.name = "retension", .bytes = 60 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, // {.name = "update", .bytes = 1, .type = TSDB_DATA_TYPE_TINYINT}, // disable update }; diff --git a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c index 8d0d503d8f6a09a7a73233bad9816ce7023a4d53..4a79513736fddf733feee97106cb6e9e2c048954 100644 --- a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c +++ b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c @@ -129,12 +129,7 @@ SArray *mmGetMsgHandles() { SArray *pArray = taosArrayInit(64, sizeof(SMgmtHandle)); if (pArray == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_MON_MM_INFO, mmPutNodeMsgToMonitorQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_MON_MM_LOAD, mmPutNodeMsgToMonitorQueue, 0) == NULL) goto _OVER; - - // Requests handled by DNODE if (dmSetMgmtHandle(pArray, TDMT_DND_CREATE_MNODE_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_MND_ALTER_MNODE_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_DND_DROP_MNODE_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_DND_CREATE_QNODE_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_DND_DROP_QNODE_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; @@ -146,7 +141,6 @@ SArray *mmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_DND_DROP_VNODE_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_DND_CONFIG_DNODE_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; - // Requests handled by MNODE if (dmSetMgmtHandle(pArray, TDMT_MND_CONNECT, mmPutNodeMsgToReadQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_ACCT, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_ALTER_ACCT, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; @@ -159,6 +153,8 @@ SArray *mmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_MND_CONFIG_DNODE, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_DNODE, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_MNODE, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_ALTER_MNODE, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_ALTER_MNODE_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_MNODE, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_QNODE, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_QNODE, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; @@ -172,66 +168,66 @@ SArray *mmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_MND_USE_DB, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_ALTER_DB, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_COMPACT_DB, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_GET_DB_CFG, mmPutNodeMsgToReadQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_VGROUP_LIST, mmPutNodeMsgToReadQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_FUNC, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_RETRIEVE_FUNC, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_FUNC, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_STB, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_ALTER_STB, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_STB, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_TABLE_META, mmPutNodeMsgToReadQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_SMA, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_SMA, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_MND_TABLE_META, mmPutNodeMsgToReadQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_MND_VGROUP_LIST, mmPutNodeMsgToReadQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_MND_KILL_QUERY, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_MND_KILL_CONN, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_MND_HEARTBEAT, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_MND_SYSTABLE_RETRIEVE, mmPutNodeMsgToReadQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_MND_STATUS, mmPutNodeMsgToReadQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_MND_KILL_TRANS, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_MND_GRANT, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_MND_AUTH, mmPutNodeMsgToReadQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_MND_ALTER_MNODE, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_STREAM, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_GET_INDEX, mmPutNodeMsgToReadQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_TOPIC, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_ALTER_TOPIC, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_TOPIC, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_SUBSCRIBE, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_MND_MQ_COMMIT_OFFSET, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_MQ_ASK_EP, mmPutNodeMsgToReadQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_MQ_DROP_CGROUP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_MQ_DROP_CGROUP_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_VG_CHANGE_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_VG_DELETE_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_STREAM, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_VND_TASK_DEPLOY_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_MND_GET_DB_CFG, mmPutNodeMsgToReadQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_MND_GET_INDEX, mmPutNodeMsgToReadQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_MQ_COMMIT_OFFSET, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_KILL_TRANS, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_KILL_QUERY, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_KILL_CONN, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_HEARTBEAT, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_STATUS, mmPutNodeMsgToReadQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_SYSTABLE_RETRIEVE, mmPutNodeMsgToReadQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_GRANT, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_AUTH, mmPutNodeMsgToReadQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_CONFIRM_WRITE, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; - // Requests handled by VNODE + if (dmSetMgmtHandle(pArray, TDMT_VND_QUERY, mmPutNodeMsgToQueryQueue, 1) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_QUERY_CONTINUE, mmPutNodeMsgToQueryQueue, 1) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_QUERY_HEARTBEAT, mmPutNodeMsgToQueryQueue, 1) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_FETCH, mmPutNodeMsgToQueryQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_CREATE_STB_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_STB_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_STB_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_CREATE_SMA_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_SMA_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; - - if (dmSetMgmtHandle(pArray, TDMT_VND_QUERY, mmPutNodeMsgToQueryQueue, 1) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_VND_QUERY_CONTINUE, mmPutNodeMsgToQueryQueue, 1) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_VND_FETCH, mmPutNodeMsgToQueryQueue, 1) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_VG_CHANGE_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_VG_DELETE_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_TASK, mmPutNodeMsgToQueryQueue, 1) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_VND_QUERY_HEARTBEAT, mmPutNodeMsgToQueryQueue, 1) == NULL) goto _OVER; - + if (dmSetMgmtHandle(pArray, TDMT_VND_TASK_DEPLOY_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIG_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_REPLICA_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_COMPACT_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_VND_SYNC_TIMEOUT, mmPutNodeMsgToSyncQueue, 1) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_VND_SYNC_PING, mmPutNodeMsgToSyncQueue, 1) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_VND_SYNC_PING_REPLY, mmPutNodeMsgToSyncQueue, 1) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_VND_SYNC_CLIENT_REQUEST, mmPutNodeMsgToSyncQueue, 1) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_VND_SYNC_CLIENT_REQUEST_REPLY, mmPutNodeMsgToSyncQueue, 1) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_VND_SYNC_REQUEST_VOTE, mmPutNodeMsgToSyncQueue, 1) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_VND_SYNC_REQUEST_VOTE_REPLY, mmPutNodeMsgToSyncQueue, 1) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_VND_SYNC_APPEND_ENTRIES, mmPutNodeMsgToSyncQueue, 1) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_VND_SYNC_APPEND_ENTRIES_REPLY, mmPutNodeMsgToSyncQueue, 1) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_SYNC_TIMEOUT, mmPutNodeMsgToSyncQueue, 1) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_SYNC_PING, mmPutNodeMsgToSyncQueue, 1) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_SYNC_PING_REPLY, mmPutNodeMsgToSyncQueue, 1) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_SYNC_CLIENT_REQUEST, mmPutNodeMsgToSyncQueue, 1) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_SYNC_CLIENT_REQUEST_REPLY, mmPutNodeMsgToSyncQueue, 1) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_SYNC_REQUEST_VOTE, mmPutNodeMsgToSyncQueue, 1) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_SYNC_REQUEST_VOTE_REPLY, mmPutNodeMsgToSyncQueue, 1) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_SYNC_APPEND_ENTRIES, mmPutNodeMsgToSyncQueue, 1) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_SYNC_APPEND_ENTRIES_REPLY, mmPutNodeMsgToSyncQueue, 1) == NULL) goto _OVER; + + if (dmSetMgmtHandle(pArray, TDMT_MON_MM_INFO, mmPutNodeMsgToMonitorQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MON_MM_LOAD, mmPutNodeMsgToMonitorQueue, 0) == NULL) goto _OVER; code = 0; diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c index f08ac6b9d00f06d76a388fa78027b165c3060cda..0819f79cf92139e6e9eaa734237df15de4340d1e 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c @@ -365,15 +365,15 @@ SArray *vmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_DND_CREATE_VNODE, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_DND_DROP_VNODE, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_VND_SYNC_TIMEOUT, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_VND_SYNC_PING, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_VND_SYNC_PING_REPLY, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_VND_SYNC_CLIENT_REQUEST, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_VND_SYNC_CLIENT_REQUEST_REPLY, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_VND_SYNC_REQUEST_VOTE, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_VND_SYNC_REQUEST_VOTE_REPLY, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_VND_SYNC_APPEND_ENTRIES, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_VND_SYNC_APPEND_ENTRIES_REPLY, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_SYNC_TIMEOUT, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_SYNC_PING, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_SYNC_PING_REPLY, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_SYNC_CLIENT_REQUEST, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_SYNC_CLIENT_REQUEST_REPLY, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_SYNC_REQUEST_VOTE, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_SYNC_REQUEST_VOTE_REPLY, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_SYNC_APPEND_ENTRIES, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_SYNC_APPEND_ENTRIES_REPLY, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER; code = 0; diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c index e714b9ad5c531be26adffa4774390082a8c0ad72..b1c04fd8cf0a78537950c922a570ee22d357d6ef 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c @@ -163,7 +163,7 @@ static void vmProcessApplyQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO SRpcMsg rsp = {0}; // get original rpc msg - assert(pMsg->msgType == TDMT_VND_SYNC_APPLY_MSG); + assert(pMsg->msgType == TDMT_SYNC_APPLY_MSG); SyncApplyMsg *pSyncApplyMsg = syncApplyMsgFromRpcMsg2(pMsg); syncApplyMsgLog2("==vmProcessApplyQueue==", pSyncApplyMsg); SRpcMsg originalRpcMsg; diff --git a/source/dnode/mnode/impl/src/mndDb.c b/source/dnode/mnode/impl/src/mndDb.c index a2a925b0bd23ce62fc95114a7439091a47d90797..bbd80d765a876c671e2131fe3a17e22233acac02 100644 --- a/source/dnode/mnode/impl/src/mndDb.c +++ b/source/dnode/mnode/impl/src/mndDb.c @@ -1431,6 +1431,52 @@ const char *mndGetDbStr(const char *src) { return pos; } +int64_t getValOfDiffPrecision(int8_t unit, int64_t val) { + int64_t v = 0; + switch(unit) { + case 's': v = val / 1000; break; + case 'm': v = val / tsTickPerMin[TSDB_TIME_PRECISION_MILLI]; break; + case 'h': v = val / (tsTickPerMin[TSDB_TIME_PRECISION_MILLI] * 60); break; + case 'd': v = val / (tsTickPerMin[TSDB_TIME_PRECISION_MILLI] * 24 * 60); break; + case 'w': v = val / (tsTickPerMin[TSDB_TIME_PRECISION_MILLI] * 24 * 60 * 7); break; + default: + break; + } + + return v; +} + +char* buildRetension(SArray* pRetension) { + size_t size = taosArrayGetSize(pRetension); + if (size == 0) { + return NULL; + } + + char* p1 = taosMemoryCalloc(1, 100); + SRetention* p = taosArrayGet(pRetension, 0); + + int32_t len = 2; + + int64_t v1 = getValOfDiffPrecision(p->freqUnit, p->freq); + int64_t v2 = getValOfDiffPrecision(p->keepUnit, p->keep); + len += sprintf(p1 + len, "%"PRId64"%c:%"PRId64"%c,", v1, p->freqUnit, v2, p->keepUnit); + + p = taosArrayGet(pRetension, 1); + + v1 = getValOfDiffPrecision(p->freqUnit, p->freq); + v2 = getValOfDiffPrecision(p->keepUnit, p->keep); + len += sprintf(p1 + len, "%"PRId64"%c:%"PRId64"%c,", v1, p->freqUnit, v2, p->keepUnit); + + p = taosArrayGet(pRetension, 2); + + v1 = getValOfDiffPrecision(p->freqUnit, p->freq); + v2 = getValOfDiffPrecision(p->keepUnit, p->keep); + len += sprintf(p1 + len, "%"PRId64"%c:%"PRId64"%c", v1, p->freqUnit, v2, p->keepUnit); + + varDataSetLen(p1, len); + return p1; +} + static void dumpDbInfoData(SSDataBlock *pBlock, SDbObj *pDb, SShowObj *pShow, int32_t rows, int64_t numOfTables, bool sysDb) { int32_t cols = 0; @@ -1478,7 +1524,7 @@ static void dumpDbInfoData(SSDataBlock *pBlock, SDbObj *pDb, SShowObj *pShow, in pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); colDataAppend(pColInfo, rows, (const char *)&pDb->cfg.replications, false); - const char *src = pDb->cfg.strict ? "strict" : "nostrict"; + const char *src = pDb->cfg.strict ? "strict" : "no_strict"; char strict[24] = {0}; STR_WITH_SIZE_TO_VARSTR(strict, src, strlen(src)); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); @@ -1526,7 +1572,21 @@ static void dumpDbInfoData(SSDataBlock *pBlock, SDbObj *pDb, SShowObj *pShow, in colDataAppend(pColInfo, rows, (const char *)&pDb->cfg.compression, false); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); - colDataAppend(pColInfo, rows, (const char *)&pDb->cfg.cacheLastRow, false); + + STR_WITH_SIZE_TO_VARSTR(strict, src, strlen(src)); +#if 0 + char cacheModel[24] = {0}; + bool null = false; + if (pDb->cfg.cacheLastRow == 0) { + STR_TO_VARSTR(cacheModel, "no_cache"); + } else if (pDb->cfg.cacheLastRow == 1) { + STR_TO_VARSTR(cacheModel, "last_row_cache") + } else { + null = true; + } + colDataAppend(pColInfo, rows, cacheModel, null); +#endif + colDataAppend(pColInfo, rows, (const char*) &pDb->cfg.cacheLastRow, false); char *prec = NULL; switch (pDb->cfg.precision) { @@ -1555,8 +1615,18 @@ static void dumpDbInfoData(SSDataBlock *pBlock, SDbObj *pDb, SShowObj *pShow, in pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); colDataAppend(pColInfo, rows, (const char *)statusB, false); - pColInfo = taosArrayGet(pBlock->pDataBlock, cols); + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); colDataAppend(pColInfo, rows, (const char *)&pDb->cfg.schemaless, false); + + char* p = buildRetension(pDb->cfg.pRetensions); + + pColInfo = taosArrayGet(pBlock->pDataBlock, cols); + if (p == NULL) { + colDataAppendNULL(pColInfo, rows); + } else { + colDataAppend(pColInfo, rows, (const char *)p, false); + taosMemoryFree(p); + } } } diff --git a/source/dnode/mnode/impl/src/mndMain.c b/source/dnode/mnode/impl/src/mndMain.c index 3a3fd7ebdb5ac8f56a64ea5b0169dfeda8cd3b97..d989ba6ec39f64b4660b1469c817909ad96f4ee7 100644 --- a/source/dnode/mnode/impl/src/mndMain.c +++ b/source/dnode/mnode/impl/src/mndMain.c @@ -376,35 +376,35 @@ int32_t mndProcessSyncMsg(SRpcMsg *pMsg) { syncRpcMsgLog2(logBuf, pMsg); taosMemoryFree(syncNodeStr); - if (pMsg->msgType == TDMT_VND_SYNC_TIMEOUT) { + if (pMsg->msgType == TDMT_SYNC_TIMEOUT) { SyncTimeout *pSyncMsg = syncTimeoutFromRpcMsg2(pMsg); code = syncNodeOnTimeoutCb(pSyncNode, pSyncMsg); syncTimeoutDestroy(pSyncMsg); - } else if (pMsg->msgType == TDMT_VND_SYNC_PING) { + } else if (pMsg->msgType == TDMT_SYNC_PING) { SyncPing *pSyncMsg = syncPingFromRpcMsg2(pMsg); code = syncNodeOnPingCb(pSyncNode, pSyncMsg); syncPingDestroy(pSyncMsg); - } else if (pMsg->msgType == TDMT_VND_SYNC_PING_REPLY) { + } else if (pMsg->msgType == TDMT_SYNC_PING_REPLY) { SyncPingReply *pSyncMsg = syncPingReplyFromRpcMsg2(pMsg); code = syncNodeOnPingReplyCb(pSyncNode, pSyncMsg); syncPingReplyDestroy(pSyncMsg); - } else if (pMsg->msgType == TDMT_VND_SYNC_CLIENT_REQUEST) { + } else if (pMsg->msgType == TDMT_SYNC_CLIENT_REQUEST) { SyncClientRequest *pSyncMsg = syncClientRequestFromRpcMsg2(pMsg); code = syncNodeOnClientRequestCb(pSyncNode, pSyncMsg); syncClientRequestDestroy(pSyncMsg); - } else if (pMsg->msgType == TDMT_VND_SYNC_REQUEST_VOTE) { + } else if (pMsg->msgType == TDMT_SYNC_REQUEST_VOTE) { SyncRequestVote *pSyncMsg = syncRequestVoteFromRpcMsg2(pMsg); code = syncNodeOnRequestVoteCb(pSyncNode, pSyncMsg); syncRequestVoteDestroy(pSyncMsg); - } else if (pMsg->msgType == TDMT_VND_SYNC_REQUEST_VOTE_REPLY) { + } else if (pMsg->msgType == TDMT_SYNC_REQUEST_VOTE_REPLY) { SyncRequestVoteReply *pSyncMsg = syncRequestVoteReplyFromRpcMsg2(pMsg); code = syncNodeOnRequestVoteReplyCb(pSyncNode, pSyncMsg); syncRequestVoteReplyDestroy(pSyncMsg); - } else if (pMsg->msgType == TDMT_VND_SYNC_APPEND_ENTRIES) { + } else if (pMsg->msgType == TDMT_SYNC_APPEND_ENTRIES) { SyncAppendEntries *pSyncMsg = syncAppendEntriesFromRpcMsg2(pMsg); code = syncNodeOnAppendEntriesCb(pSyncNode, pSyncMsg); syncAppendEntriesDestroy(pSyncMsg); - } else if (pMsg->msgType == TDMT_VND_SYNC_APPEND_ENTRIES_REPLY) { + } else if (pMsg->msgType == TDMT_SYNC_APPEND_ENTRIES_REPLY) { SyncAppendEntriesReply *pSyncMsg = syncAppendEntriesReplyFromRpcMsg2(pMsg); code = syncNodeOnAppendEntriesReplyCb(pSyncNode, pSyncMsg); syncAppendEntriesReplyDestroy(pSyncMsg); @@ -625,7 +625,7 @@ int32_t mndAcquireRpcRef(SMnode *pMnode) { code = -1; } else { int32_t ref = atomic_add_fetch_32(&pMnode->rpcRef, 1); - mTrace("mnode rpc is acquired, ref:%d", ref); + // mTrace("mnode rpc is acquired, ref:%d", ref); } taosThreadRwlockUnlock(&pMnode->lock); return code; @@ -634,7 +634,7 @@ int32_t mndAcquireRpcRef(SMnode *pMnode) { void mndReleaseRpcRef(SMnode *pMnode) { taosThreadRwlockRdlock(&pMnode->lock); int32_t ref = atomic_sub_fetch_32(&pMnode->rpcRef, 1); - mTrace("mnode rpc is released, ref:%d", ref); + // mTrace("mnode rpc is released, ref:%d", ref); taosThreadRwlockUnlock(&pMnode->lock); } @@ -675,7 +675,7 @@ int32_t mndAcquireSyncRef(SMnode *pMnode) { code = -1; } else { int32_t ref = atomic_add_fetch_32(&pMnode->syncRef, 1); - mTrace("mnode sync is acquired, ref:%d", ref); + // mTrace("mnode sync is acquired, ref:%d", ref); } taosThreadRwlockUnlock(&pMnode->lock); return code; @@ -684,6 +684,6 @@ int32_t mndAcquireSyncRef(SMnode *pMnode) { void mndReleaseSyncRef(SMnode *pMnode) { taosThreadRwlockRdlock(&pMnode->lock); int32_t ref = atomic_sub_fetch_32(&pMnode->syncRef, 1); - mTrace("mnode sync is released, ref:%d", ref); + // mTrace("mnode sync is released, ref:%d", ref); taosThreadRwlockUnlock(&pMnode->lock); } diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index b6de5eaf66946aeba65cab58b3c7c1830dae1096..537f2e09643cb70e16b66f87ceb8e28d9500eb3c 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -287,56 +287,56 @@ int vnodeProcessSyncReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { SRpcMsg *pRpcMsg = pMsg; - if (pRpcMsg->msgType == TDMT_VND_SYNC_TIMEOUT) { + if (pRpcMsg->msgType == TDMT_SYNC_TIMEOUT) { SyncTimeout *pSyncMsg = syncTimeoutFromRpcMsg2(pRpcMsg); assert(pSyncMsg != NULL); ret = syncNodeOnTimeoutCb(pSyncNode, pSyncMsg); syncTimeoutDestroy(pSyncMsg); - } else if (pRpcMsg->msgType == TDMT_VND_SYNC_PING) { + } else if (pRpcMsg->msgType == TDMT_SYNC_PING) { SyncPing *pSyncMsg = syncPingFromRpcMsg2(pRpcMsg); assert(pSyncMsg != NULL); ret = syncNodeOnPingCb(pSyncNode, pSyncMsg); syncPingDestroy(pSyncMsg); - } else if (pRpcMsg->msgType == TDMT_VND_SYNC_PING_REPLY) { + } else if (pRpcMsg->msgType == TDMT_SYNC_PING_REPLY) { SyncPingReply *pSyncMsg = syncPingReplyFromRpcMsg2(pRpcMsg); assert(pSyncMsg != NULL); ret = syncNodeOnPingReplyCb(pSyncNode, pSyncMsg); syncPingReplyDestroy(pSyncMsg); - } else if (pRpcMsg->msgType == TDMT_VND_SYNC_CLIENT_REQUEST) { + } else if (pRpcMsg->msgType == TDMT_SYNC_CLIENT_REQUEST) { SyncClientRequest *pSyncMsg = syncClientRequestFromRpcMsg2(pRpcMsg); assert(pSyncMsg != NULL); ret = syncNodeOnClientRequestCb(pSyncNode, pSyncMsg); syncClientRequestDestroy(pSyncMsg); - } else if (pRpcMsg->msgType == TDMT_VND_SYNC_REQUEST_VOTE) { + } else if (pRpcMsg->msgType == TDMT_SYNC_REQUEST_VOTE) { SyncRequestVote *pSyncMsg = syncRequestVoteFromRpcMsg2(pRpcMsg); assert(pSyncMsg != NULL); ret = syncNodeOnRequestVoteCb(pSyncNode, pSyncMsg); syncRequestVoteDestroy(pSyncMsg); - } else if (pRpcMsg->msgType == TDMT_VND_SYNC_REQUEST_VOTE_REPLY) { + } else if (pRpcMsg->msgType == TDMT_SYNC_REQUEST_VOTE_REPLY) { SyncRequestVoteReply *pSyncMsg = syncRequestVoteReplyFromRpcMsg2(pRpcMsg); assert(pSyncMsg != NULL); ret = syncNodeOnRequestVoteReplyCb(pSyncNode, pSyncMsg); syncRequestVoteReplyDestroy(pSyncMsg); - } else if (pRpcMsg->msgType == TDMT_VND_SYNC_APPEND_ENTRIES) { + } else if (pRpcMsg->msgType == TDMT_SYNC_APPEND_ENTRIES) { SyncAppendEntries *pSyncMsg = syncAppendEntriesFromRpcMsg2(pRpcMsg); assert(pSyncMsg != NULL); ret = syncNodeOnAppendEntriesCb(pSyncNode, pSyncMsg); syncAppendEntriesDestroy(pSyncMsg); - } else if (pRpcMsg->msgType == TDMT_VND_SYNC_APPEND_ENTRIES_REPLY) { + } else if (pRpcMsg->msgType == TDMT_SYNC_APPEND_ENTRIES_REPLY) { SyncAppendEntriesReply *pSyncMsg = syncAppendEntriesReplyFromRpcMsg2(pRpcMsg); assert(pSyncMsg != NULL); diff --git a/source/libs/catalog/src/ctgAsync.c b/source/libs/catalog/src/ctgAsync.c index 47ef45d7d743e7f63240593bda16a75edc808eef..0e5febf10d309688f9fc11ef9913ea7dd297405f 100644 --- a/source/libs/catalog/src/ctgAsync.c +++ b/source/libs/catalog/src/ctgAsync.c @@ -334,6 +334,18 @@ int32_t ctgInitJob(CTG_PARAMS, SCtgJob** job, uint64_t reqId, const SCatalogReq* CTG_ERR_JRET(ctgInitGetQnodeTask(pJob, taskIdx++)); } + pJob->refId = taosAddRef(gCtgMgmt.jobPool, pJob); + if (pJob->refId < 0) { + ctgError("add job to ref failed, error: %s", tstrerror(terrno)); + CTG_ERR_JRET(terrno); + } + + taosAcquireRef(gCtgMgmt.jobPool, pJob->refId); + + qDebug("QID:%" PRIx64 ", job %" PRIx64 " initialized, task num %d", pJob->queryId, pJob->refId, *taskNum); + return TSDB_CODE_SUCCESS; + + _return: taosMemoryFreeClear(*job); CTG_RET(code); diff --git a/source/libs/executor/inc/executorInt.h b/source/libs/executor/inc/executorInt.h index f2f0bc20551336b8bfa14414697b9b5299ea6a49..88f308710ec5022c1ad1b83a38bfa0e4dd4e53a6 100644 --- a/source/libs/executor/inc/executorInt.h +++ b/source/libs/executor/inc/executorInt.h @@ -27,6 +27,10 @@ typedef struct { int32_t bytes; } SGroupKeys, SStateKeys; +int32_t initGroupOptrInfo(SArray** pGroupColVals, int32_t* keyLen, char** keyBuf, const SArray* pGroupColList); +uint64_t calcGroupId(char* pData, int32_t len); +void recordNewGroupKeys(SArray* pGroupCols, SArray* pGroupColVals, SSDataBlock* pBlock, int32_t rowIndex); +int32_t buildGroupKeys(void* pKey, const SArray* pGroupColVals); #ifdef __cplusplus } #endif diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index e7a3390cf3114b07be4439b23f346e11ded0f78f..60be28167d53cd0b23e65787d37f91d38281e95a 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -333,6 +333,12 @@ typedef struct STableScanInfo { double sampleRatio; // data block sample ratio, 1 by default SInterval interval; // if the upstream is an interval operator, the interval info is also kept here to get the time window to check if current data block needs to be loaded. + SArray* pGroupCols; + SArray* pGroupColVals; // current group column values, SArray + char* keyBuf; // group by keys for hash + int32_t groupKeyLen; // total group by column width + SHashObj* pGroupSet; // quick locate the window object for each result + int32_t curTWinIdx; } STableScanInfo; @@ -727,7 +733,7 @@ SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pR char* pData, int16_t bytes, bool masterscan, uint64_t groupId, SExecTaskInfo* pTaskInfo, bool isIntervalQuery, SAggSupporter* pSup); -SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, tsdbReaderT pDataReader, SReadHandle* pHandle, SExecTaskInfo* pTaskInfo); +SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, tsdbReaderT pDataReader, SReadHandle* pHandle, SArray* groupKyes, SExecTaskInfo* pTaskInfo); SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock, SExprInfo* pScalarExprInfo, int32_t numOfScalarExpr, SExecTaskInfo* pTaskInfo); diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 3c46f46e198bd1fae7ebd6173df64b8dec0e6737..8388253975fe69c7c328003adbcb61975bd66fbd 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -4387,9 +4387,9 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo if (pDataReader == NULL && terrno != 0) { return NULL; } - + SArray* groupKyes = extractPartitionColInfo(pTableScanNode->pPartitionKeys); extractTableSchemaVersion(pHandle, pTableScanNode->scan.uid, pTaskInfo); - SOperatorInfo* pOperator = createTableScanOperatorInfo(pTableScanNode, pDataReader, pHandle, pTaskInfo); + SOperatorInfo* pOperator = createTableScanOperatorInfo(pTableScanNode, pDataReader, pHandle, groupKyes, pTaskInfo); STableScanInfo* pScanInfo = pOperator->info; pTaskInfo->cost.pRecoder = &pScanInfo->readRecorder; diff --git a/source/libs/executor/src/groupoperator.c b/source/libs/executor/src/groupoperator.c index 8c3a0c0e6e712ad07a381b3baa709b095ba955fb..3691024096d869a5d8527c0a95cbf5837e558a9c 100644 --- a/source/libs/executor/src/groupoperator.c +++ b/source/libs/executor/src/groupoperator.c @@ -24,10 +24,10 @@ #include "tcompare.h" #include "thash.h" #include "ttypes.h" +#include "executorInt.h" static int32_t* setupColumnOffset(const SSDataBlock* pBlock, int32_t rowCapacity); static void* getCurrentDataGroupInfo(const SPartitionOperatorInfo* pInfo, SDataGroupInfo** pGroupInfo, int32_t len); -static uint64_t calcGroupId(char* pData, int32_t len); static void destroyGroupOperatorInfo(void* param, int32_t numOfOutput) { SGroupbyOperatorInfo* pInfo = (SGroupbyOperatorInfo*)param; @@ -37,7 +37,7 @@ static void destroyGroupOperatorInfo(void* param, int32_t numOfOutput) { taosArrayDestroy(pInfo->pGroupColVals); } -static int32_t initGroupOptrInfo(SArray** pGroupColVals, int32_t* keyLen, char** keyBuf, const SArray* pGroupColList) { +int32_t initGroupOptrInfo(SArray** pGroupColVals, int32_t* keyLen, char** keyBuf, const SArray* pGroupColList) { *pGroupColVals = taosArrayInit(4, sizeof(SGroupKeys)); if ((*pGroupColVals) == NULL) { return TSDB_CODE_OUT_OF_MEMORY; @@ -110,7 +110,7 @@ static bool groupKeyCompare(SArray* pGroupCols, SArray* pGroupColVals, SSDataBlo return true; } -static void recordNewGroupKeys(SArray* pGroupCols, SArray* pGroupColVals, SSDataBlock* pBlock, int32_t rowIndex) { +void recordNewGroupKeys(SArray* pGroupCols, SArray* pGroupColVals, SSDataBlock* pBlock, int32_t rowIndex) { SColumnDataAgg* pColAgg = NULL; size_t numOfGroupCols = taosArrayGetSize(pGroupCols); @@ -139,7 +139,7 @@ static void recordNewGroupKeys(SArray* pGroupCols, SArray* pGroupColVals, SSData } } -static int32_t buildGroupKeys(void* pKey, const SArray* pGroupColVals) { +int32_t buildGroupKeys(void* pKey, const SArray* pGroupColVals) { ASSERT(pKey != NULL); size_t numOfGroupCols = taosArrayGetSize(pGroupColVals); @@ -409,7 +409,6 @@ static void doHashPartition(SOperatorInfo* pOperator, SSDataBlock* pBlock) { SPartitionOperatorInfo* pInfo = pOperator->info; - int32_t numOfGroupCols = taosArrayGetSize(pInfo->pGroupCols); for (int32_t j = 0; j < pBlock->info.rows; ++j) { recordNewGroupKeys(pInfo->pGroupCols, pInfo->pGroupColVals, pBlock, j); int32_t len = buildGroupKeys(pInfo->keyBuf, pInfo->pGroupColVals); @@ -622,8 +621,13 @@ static void destroyPartitionOperatorInfo(void* param, int32_t numOfOutput) { SPartitionOperatorInfo* pInfo = (SPartitionOperatorInfo*)param; doDestroyBasicInfo(&pInfo->binfo, numOfOutput); taosArrayDestroy(pInfo->pGroupCols); + for(int i = 0; i < taosArrayGetSize(pInfo->pGroupColVals); i++){ + SGroupKeys key = *(SGroupKeys*)taosArrayGet(pInfo->pGroupColVals, i); + taosMemoryFree(key.pData); + } taosArrayDestroy(pInfo->pGroupColVals); taosMemoryFree(pInfo->keyBuf); + taosHashCleanup(pInfo->pGroupSet); taosMemoryFree(pInfo->columnOffset); } diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 348d85943e9ce306347a5530617131b6e00cf89a..0774ccec1cdbc71666df5a2940f1435276765132 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -33,6 +33,8 @@ #include "ttypes.h" #include "vnode.h" +#include "executorInt.h" + #define SET_REVERSE_SCAN_FLAG(_info) ((_info)->scanFlag = REVERSE_SCAN) #define SWITCH_ORDER(n) (((n) = ((n) == TSDB_ORDER_ASC) ? TSDB_ORDER_DESC : TSDB_ORDER_ASC)) @@ -373,6 +375,17 @@ static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) { longjmp(pOperator->pTaskInfo->env, code); } + recordNewGroupKeys(pTableScanInfo->pGroupCols, pTableScanInfo->pGroupColVals, pBlock, 0); + int32_t len = buildGroupKeys(pTableScanInfo->keyBuf, pTableScanInfo->pGroupColVals); + + uint64_t *groupId = taosHashGet(pTableScanInfo->pGroupSet, pTableScanInfo->keyBuf, len); + if (groupId) { + pBlock->info.groupId = *groupId; + }else if(len != 0){ + pBlock->info.groupId = calcGroupId(pTableScanInfo->keyBuf, len); + taosHashPut(pTableScanInfo->pGroupSet, pTableScanInfo->keyBuf, len, &pBlock->info.groupId, sizeof(uint64_t)); + } + // current block is filter out according to filter condition, continue load the next block if (status == FUNC_DATA_REQUIRED_FILTEROUT || pBlock->info.rows == 0) { continue; @@ -497,21 +510,25 @@ static void destroyTableScanOperatorInfo(void* param, int32_t numOfOutput) { taosMemoryFree(pTableScanInfo->pResBlock); tsdbCleanupReadHandle(pTableScanInfo->dataReader); + taosArrayDestroy(pTableScanInfo->pGroupCols); + for(int i = 0; i < taosArrayGetSize(pTableScanInfo->pGroupColVals); i++){ + SGroupKeys key = *(SGroupKeys*)taosArrayGet(pTableScanInfo->pGroupColVals, i); + taosMemoryFree(key.pData); + } + taosArrayDestroy(pTableScanInfo->pGroupColVals); + taosMemoryFree(pTableScanInfo->keyBuf); + taosHashCleanup(pTableScanInfo->pGroupSet); if (pTableScanInfo->pColMatchInfo != NULL) { taosArrayDestroy(pTableScanInfo->pColMatchInfo); } } SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, tsdbReaderT pDataReader, - SReadHandle* readHandle, SExecTaskInfo* pTaskInfo) { + SReadHandle* readHandle, SArray* groupKyes, SExecTaskInfo* pTaskInfo) { STableScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STableScanInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); if (pInfo == NULL || pOperator == NULL) { - taosMemoryFreeClear(pInfo); - taosMemoryFreeClear(pOperator); - - pTaskInfo->code = TSDB_CODE_QRY_OUT_OF_MEMORY; - return NULL; + goto _error; } SDataBlockDescNode* pDescNode = pTableScanNode->scan.node.pOutputDataBlockDesc; @@ -522,7 +539,7 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, int32_t code = initQueryTableDataCond(&pInfo->cond, pTableScanNode); if (code != TSDB_CODE_SUCCESS) { - return NULL; + goto _error; } if (pTableScanNode->scan.pScanPseudoCols != NULL) { @@ -552,6 +569,18 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, pOperator->numOfExprs = numOfCols; pOperator->pTaskInfo = pTaskInfo; + // for table group + pInfo->pGroupCols = groupKyes; + _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); + pInfo->pGroupSet = taosHashInit(100, hashFn, false, HASH_NO_LOCK); + if (pInfo->pGroupSet == NULL) { + goto _error; + } + code = initGroupOptrInfo(&pInfo->pGroupColVals, &pInfo->groupKeyLen, &pInfo->keyBuf, groupKyes); + if (code != TSDB_CODE_SUCCESS) { + goto _error; + } + pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doTableScan, NULL, NULL, destroyTableScanOperatorInfo, NULL, NULL, getTableScannerExecInfo); @@ -559,6 +588,12 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, pOperator->cost.openCost = 0; return pOperator; + +_error: + pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY; + taosMemoryFreeClear(pInfo); + taosMemoryFreeClear(pOperator); + return NULL; } SOperatorInfo* createTableSeqScanOperatorInfo(void* pReadHandle, SExecTaskInfo* pTaskInfo) { @@ -901,7 +936,7 @@ SOperatorInfo* createStreamScanOperatorInfo(void* pDataReader, SReadHandle* pHan SScanPhysiNode* pScanPhyNode = &pTableScanNode->scan; SDataBlockDescNode* pDescNode = pScanPhyNode->node.pOutputDataBlockDesc; - SOperatorInfo* pTableScanDummy = createTableScanOperatorInfo(pTableScanNode, pDataReader, pHandle, pTaskInfo); + SOperatorInfo* pTableScanDummy = createTableScanOperatorInfo(pTableScanNode, pDataReader, pHandle, NULL, pTaskInfo); STableScanInfo* pSTInfo = (STableScanInfo*)pTableScanDummy->info; diff --git a/source/libs/function/src/functionMgt.c b/source/libs/function/src/functionMgt.c index 611ae8d81fdc681c28936456b5b46c0a7e09d4c0..67efdc5c0b91266500e3146b3c2d76350f28087d 100644 --- a/source/libs/function/src/functionMgt.c +++ b/source/libs/function/src/functionMgt.c @@ -226,6 +226,9 @@ static SColumnNode* createColumnByFunc(const SFunctionNode* pFunc) { } bool fmIsDistExecFunc(int32_t funcId) { + if (fmIsUserDefinedFunc(funcId)) { + return false; + } if (!fmIsVectorFunc(funcId)) { return true; } diff --git a/source/libs/parser/src/parAstParser.c b/source/libs/parser/src/parAstParser.c index 18c1341da86bb2306cc6cb2e5b1036e9bbba7a8e..7abe2442610bbe85e823fb3fb85843351f8e52e8 100644 --- a/source/libs/parser/src/parAstParser.c +++ b/source/libs/parser/src/parAstParser.c @@ -199,6 +199,22 @@ static int32_t collectMetaKeyFromCreateMultiTable(SCollectMetaKeyCxt* pCxt, SCre return code; } +static int32_t collectMetaKeyFromDropTable(SCollectMetaKeyCxt* pCxt, SDropTableStmt* pStmt) { + int32_t code = TSDB_CODE_SUCCESS; + SNode* pNode = NULL; + FOREACH(pNode, pStmt->pTables) { + SDropTableClause* pClause = (SDropTableClause*)pNode; + code = reserveTableMetaInCache(pCxt->pParseCxt->acctId, pClause->dbName, pClause->tableName, pCxt->pMetaCache); + if (TSDB_CODE_SUCCESS == code) { + code = reserveTableVgroupInCache(pCxt->pParseCxt->acctId, pClause->dbName, pClause->tableName, pCxt->pMetaCache); + } + if (TSDB_CODE_SUCCESS != code) { + break; + } + } + return code; +} + static int32_t collectMetaKeyFromAlterTable(SCollectMetaKeyCxt* pCxt, SAlterTableStmt* pStmt) { int32_t code = reserveTableMetaInCache(pCxt->pParseCxt->acctId, pStmt->dbName, pStmt->tableName, pCxt->pMetaCache); if (TSDB_CODE_SUCCESS == code) { @@ -341,6 +357,8 @@ static int32_t collectMetaKeyFromQuery(SCollectMetaKeyCxt* pCxt, SNode* pStmt) { return collectMetaKeyFromCreateTable(pCxt, (SCreateTableStmt*)pStmt); case QUERY_NODE_CREATE_MULTI_TABLE_STMT: return collectMetaKeyFromCreateMultiTable(pCxt, (SCreateMultiTableStmt*)pStmt); + case QUERY_NODE_DROP_TABLE_STMT: + return collectMetaKeyFromDropTable(pCxt, (SDropTableStmt*)pStmt); case QUERY_NODE_ALTER_TABLE_STMT: return collectMetaKeyFromAlterTable(pCxt, (SAlterTableStmt*)pStmt); case QUERY_NODE_USE_DATABASE_STMT: diff --git a/source/libs/parser/test/parInitialDTest.cpp b/source/libs/parser/test/parInitialDTest.cpp index 5ad427d964ad1dc47a4fed64b51f89257ae53da6..1b1b930851880c86282584df57cb32ca4bef9f95 100644 --- a/source/libs/parser/test/parInitialDTest.cpp +++ b/source/libs/parser/test/parInitialDTest.cpp @@ -24,7 +24,7 @@ class ParserInitialDTest : public ParserDdlTest {}; // todo delete // todo desc // todo describe -// todo drop account +// todo DROP account TEST_F(ParserInitialDTest, dropBnode) { useDb("root", "test"); @@ -62,51 +62,61 @@ TEST_F(ParserInitialDTest, dropCGroup) { run("DROP CONSUMER GROUP IF EXISTS cg1 ON tp1"); } -// todo drop database -// todo drop dnode -// todo drop function +// todo DROP database +// todo DROP dnode +// todo DROP function TEST_F(ParserInitialDTest, dropIndex) { useDb("root", "test"); - run("drop index index1 on t1"); + run("DROP index index1 on t1"); } TEST_F(ParserInitialDTest, dropMnode) { useDb("root", "test"); - run("drop mnode on dnode 1"); + run("DROP mnode on dnode 1"); } TEST_F(ParserInitialDTest, dropQnode) { useDb("root", "test"); - run("drop qnode on dnode 1"); + run("DROP qnode on dnode 1"); } TEST_F(ParserInitialDTest, dropSnode) { useDb("root", "test"); - run("drop snode on dnode 1"); + run("DROP snode on dnode 1"); } -// todo drop stable -// todo drop stream -// todo drop table +TEST_F(ParserInitialDTest, dropSTable) { + useDb("root", "test"); + + run("DROP STABLE st1"); +} + +// todo DROP stream + +TEST_F(ParserInitialDTest, dropTable) { + useDb("root", "test"); + + run("DROP TABLE t1"); +} TEST_F(ParserInitialDTest, dropTopic) { useDb("root", "test"); - run("drop topic tp1"); + run("DROP topic tp1"); - run("drop topic if exists tp1"); + run("DROP topic if exists tp1"); } TEST_F(ParserInitialDTest, dropUser) { login("root"); useDb("root", "test"); - run("drop user wxy"); + run("DROP user wxy"); } } // namespace ParserTest diff --git a/source/libs/planner/src/planSpliter.c b/source/libs/planner/src/planSpliter.c index e3c8b82e3988bd7943815a645332920f9d31d08b..a3d508690880c1d229c65137014f87d8efa733a9 100644 --- a/source/libs/planner/src/planSpliter.c +++ b/source/libs/planner/src/planSpliter.c @@ -151,8 +151,8 @@ static bool stbSplHasMultiTbScan(bool streamQuery, SLogicNode* pNode) { static bool stbSplNeedSplit(bool streamQuery, SLogicNode* pNode) { switch (nodeType(pNode)) { - // case QUERY_NODE_LOGIC_PLAN_AGG: - // return !stbSplHasGatherExecFunc(((SAggLogicNode*)pNode)->pAggFuncs) && stbSplHasMultiTbScan(pNode); + case QUERY_NODE_LOGIC_PLAN_AGG: + return !stbSplHasGatherExecFunc(((SAggLogicNode*)pNode)->pAggFuncs) && stbSplHasMultiTbScan(streamQuery, pNode); case QUERY_NODE_LOGIC_PLAN_WINDOW: { SWindowLogicNode* pWindow = (SWindowLogicNode*)pNode; if (WINDOW_TYPE_INTERVAL != pWindow->winType) { @@ -161,7 +161,7 @@ static bool stbSplNeedSplit(bool streamQuery, SLogicNode* pNode) { return !stbSplHasGatherExecFunc(pWindow->pFuncs) && stbSplHasMultiTbScan(streamQuery, pNode); } // case QUERY_NODE_LOGIC_PLAN_SORT: - // return stbSplHasMultiTbScan(pNode); + // return stbSplHasMultiTbScan(streamQuery, pNode); case QUERY_NODE_LOGIC_PLAN_SCAN: return stbSplIsMultiTbScan(streamQuery, (SScanLogicNode*)pNode); default: @@ -295,7 +295,8 @@ static int32_t stbSplCreatePartWindowNode(SWindowLogicNode* pMergeWindow, SLogic return code; } -static int32_t stbSplCreateMergeNode(SSplitContext* pCxt, SLogicNode* pParent, SLogicNode* pPartChild) { +static int32_t stbSplCreateMergeNode(SSplitContext* pCxt, SLogicNode* pParent, SNodeList* pMergeKeys, + SLogicNode* pPartChild) { SMergeLogicNode* pMerge = nodesMakeNode(QUERY_NODE_LOGIC_PLAN_MERGE); if (NULL == pMerge) { return TSDB_CODE_OUT_OF_MEMORY; @@ -304,25 +305,28 @@ static int32_t stbSplCreateMergeNode(SSplitContext* pCxt, SLogicNode* pParent, S pMerge->srcGroupId = pCxt->groupId; pMerge->node.pParent = pParent; pMerge->node.precision = pPartChild->precision; - int32_t code = nodesListMakeStrictAppend(&pMerge->pMergeKeys, nodesCloneNode(((SWindowLogicNode*)pParent)->pTspk)); - if (TSDB_CODE_SUCCESS == code) { - pMerge->node.pTargets = nodesCloneList(pPartChild->pTargets); - if (NULL == pMerge->node.pTargets) { - code = TSDB_CODE_OUT_OF_MEMORY; - } - } - if (TSDB_CODE_SUCCESS == code) { - code = nodesListMakeAppend(&pParent->pChildren, pMerge); + pMerge->pMergeKeys = pMergeKeys; + pMerge->node.pTargets = nodesCloneList(pPartChild->pTargets); + if (NULL == pMerge->node.pTargets) { + nodesDestroyNode(pMerge); + return TSDB_CODE_OUT_OF_MEMORY; } - return code; + return nodesListMakeAppend(&pParent->pChildren, pMerge); } static int32_t stbSplSplitWindowNodeForBatch(SSplitContext* pCxt, SStableSplitInfo* pInfo) { SLogicNode* pPartWindow = NULL; int32_t code = stbSplCreatePartWindowNode((SWindowLogicNode*)pInfo->pSplitNode, &pPartWindow); if (TSDB_CODE_SUCCESS == code) { - code = stbSplCreateMergeNode(pCxt, pInfo->pSplitNode, pPartWindow); + SNodeList* pMergeKeys = NULL; + code = nodesListMakeStrictAppend(&pMergeKeys, nodesCloneNode(((SWindowLogicNode*)pInfo->pSplitNode)->pTspk)); + if (TSDB_CODE_SUCCESS == code) { + code = stbSplCreateMergeNode(pCxt, pInfo->pSplitNode, pMergeKeys, pPartWindow); + } + if (TSDB_CODE_SUCCESS != code) { + nodesDestroyList(pMergeKeys); + } } if (TSDB_CODE_SUCCESS == code) { code = nodesListMakeStrictAppend(&pInfo->pSubplan->pChildren, @@ -365,6 +369,124 @@ static int32_t stbSplSplitWindowNode(SSplitContext* pCxt, SStableSplitInfo* pInf } } +static int32_t stbSplCreatePartAggNode(SAggLogicNode* pMergeAgg, SLogicNode** pOutput) { + SNodeList* pFunc = pMergeAgg->pAggFuncs; + pMergeAgg->pAggFuncs = NULL; + SNodeList* pGroupKeys = pMergeAgg->pGroupKeys; + pMergeAgg->pGroupKeys = NULL; + SNodeList* pTargets = pMergeAgg->node.pTargets; + pMergeAgg->node.pTargets = NULL; + SNodeList* pChildren = pMergeAgg->node.pChildren; + pMergeAgg->node.pChildren = NULL; + + int32_t code = TSDB_CODE_SUCCESS; + SAggLogicNode* pPartAgg = nodesCloneNode(pMergeAgg); + if (NULL == pPartAgg) { + code = TSDB_CODE_OUT_OF_MEMORY; + } + + if (TSDB_CODE_SUCCESS == code && NULL != pGroupKeys) { + pPartAgg->pGroupKeys = pGroupKeys; + code = createColumnByRewriteExps(pPartAgg->pGroupKeys, &pPartAgg->node.pTargets); + } + if (TSDB_CODE_SUCCESS == code && NULL != pGroupKeys) { + pMergeAgg->pGroupKeys = nodesCloneList(pPartAgg->node.pTargets); + if (NULL == pMergeAgg->pGroupKeys) { + code = TSDB_CODE_OUT_OF_MEMORY; + } + } + if (TSDB_CODE_SUCCESS == code) { + pMergeAgg->node.pTargets = pTargets; + pPartAgg->node.pChildren = pChildren; + + code = stbSplRewriteFuns(pFunc, &pPartAgg->pAggFuncs, &pMergeAgg->pAggFuncs); + } + if (TSDB_CODE_SUCCESS == code) { + code = createColumnByRewriteExps(pPartAgg->pAggFuncs, &pPartAgg->node.pTargets); + } + + nodesDestroyList(pFunc); + if (TSDB_CODE_SUCCESS == code) { + *pOutput = (SLogicNode*)pPartAgg; + } else { + nodesDestroyNode(pPartAgg); + } + + return code; +} + +static int32_t stbSplSplitAggNode(SSplitContext* pCxt, SStableSplitInfo* pInfo) { + SLogicNode* pPartAgg = NULL; + int32_t code = stbSplCreatePartAggNode((SAggLogicNode*)pInfo->pSplitNode, &pPartAgg); + if (TSDB_CODE_SUCCESS == code) { + code = stbSplCreateExchangeNode(pCxt, pInfo->pSplitNode, pPartAgg); + } + if (TSDB_CODE_SUCCESS == code) { + code = nodesListMakeStrictAppend(&pInfo->pSubplan->pChildren, + splCreateScanSubplan(pCxt, pPartAgg, SPLIT_FLAG_STABLE_SPLIT)); + } + pInfo->pSubplan->subplanType = SUBPLAN_TYPE_MERGE; + return code; +} + +static int32_t stbSplCreatePartSortNode(SSortLogicNode* pMergeSort, SLogicNode** pOutput) { + SNodeList* pSortKeys = pMergeSort->pSortKeys; + pMergeSort->pSortKeys = NULL; + SNodeList* pTargets = pMergeSort->node.pTargets; + pMergeSort->node.pTargets = NULL; + SNodeList* pChildren = pMergeSort->node.pChildren; + pMergeSort->node.pChildren = NULL; + + int32_t code = TSDB_CODE_SUCCESS; + SSortLogicNode* pPartSort = nodesCloneNode(pMergeSort); + if (NULL == pPartSort) { + code = TSDB_CODE_OUT_OF_MEMORY; + } + + pMergeSort->node.pTargets = pTargets; + pPartSort->node.pChildren = pChildren; + if (TSDB_CODE_SUCCESS == code) { + pPartSort->pSortKeys = pSortKeys; + code = createColumnByRewriteExps(pPartSort->pSortKeys, &pPartSort->node.pTargets); + } + if (TSDB_CODE_SUCCESS == code) { + pMergeSort->pSortKeys = nodesCloneList(pPartSort->node.pTargets); + if (NULL == pMergeSort->pSortKeys) { + code = TSDB_CODE_OUT_OF_MEMORY; + } + } + + if (TSDB_CODE_SUCCESS == code) { + *pOutput = (SLogicNode*)pPartSort; + } else { + nodesDestroyNode(pPartSort); + } + + return code; +} + +static int32_t stbSplSplitSortNode(SSplitContext* pCxt, SStableSplitInfo* pInfo) { + SLogicNode* pPartSort = NULL; + int32_t code = stbSplCreatePartSortNode((SSortLogicNode*)pInfo->pSplitNode, &pPartSort); + if (TSDB_CODE_SUCCESS == code) { + SNodeList* pMergeKeys = nodesCloneList(((SSortLogicNode*)pInfo->pSplitNode)->pSortKeys); + if (NULL != pMergeKeys) { + code = stbSplCreateMergeNode(pCxt, pInfo->pSplitNode, pMergeKeys, pPartSort); + if (TSDB_CODE_SUCCESS != code) { + nodesDestroyList(pMergeKeys); + } + } else { + code = TSDB_CODE_OUT_OF_MEMORY; + } + } + if (TSDB_CODE_SUCCESS == code) { + code = nodesListMakeStrictAppend(&pInfo->pSubplan->pChildren, + splCreateScanSubplan(pCxt, pPartSort, SPLIT_FLAG_STABLE_SPLIT)); + } + pInfo->pSubplan->subplanType = SUBPLAN_TYPE_MERGE; + return code; +} + static int32_t stbSplSplitScanNode(SSplitContext* pCxt, SStableSplitInfo* pInfo) { int32_t code = splCreateExchangeNodeForSubplan(pCxt, pInfo->pSubplan, pInfo->pSplitNode, SUBPLAN_TYPE_MERGE); if (TSDB_CODE_SUCCESS == code) { @@ -386,9 +508,15 @@ static int32_t stableSplit(SSplitContext* pCxt, SLogicSubplan* pSubplan) { int32_t code = TSDB_CODE_SUCCESS; switch (nodeType(info.pSplitNode)) { + case QUERY_NODE_LOGIC_PLAN_AGG: + code = stbSplSplitAggNode(pCxt, &info); + break; case QUERY_NODE_LOGIC_PLAN_WINDOW: code = stbSplSplitWindowNode(pCxt, &info); break; + case QUERY_NODE_LOGIC_PLAN_SORT: + code = stbSplSplitSortNode(pCxt, &info); + break; case QUERY_NODE_LOGIC_PLAN_SCAN: code = stbSplSplitScanNode(pCxt, &info); break; diff --git a/source/libs/planner/test/planGroupByTest.cpp b/source/libs/planner/test/planGroupByTest.cpp index cf516034707e53a230d4e2e7af6fb81c3b8aaecf..201df2efde6236c5ae68aaedf04625a2c4acda19 100644 --- a/source/libs/planner/test/planGroupByTest.cpp +++ b/source/libs/planner/test/planGroupByTest.cpp @@ -67,3 +67,11 @@ TEST_F(PlanGroupByTest, selectFunc) { run("SELECT MAX(c1), c2 FROM t1 GROUP BY c3"); run("SELECT MAX(c1), t1.* FROM t1 GROUP BY c3"); } + +TEST_F(PlanGroupByTest, stable) { + useDb("root", "test"); + + run("SELECT COUNT(*) FROM st1"); + + run("SELECT COUNT(*) FROM st1 GROUP BY c1"); +} diff --git a/source/libs/planner/test/planOrderByTest.cpp b/source/libs/planner/test/planOrderByTest.cpp index d95d1bdf1d429d9ea6c5e62c5a280dbb5c6de477..ef6f438b55c60f89e96de40c9eeeff71f3f2fcdf 100644 --- a/source/libs/planner/test/planOrderByTest.cpp +++ b/source/libs/planner/test/planOrderByTest.cpp @@ -23,20 +23,27 @@ class PlanOrderByTest : public PlannerTestBase {}; TEST_F(PlanOrderByTest, basic) { useDb("root", "test"); - // order by key is in the projection list - run("select c1 from t1 order by c1"); - // order by key is not in the projection list - run("select c1 from t1 order by c2"); + // ORDER BY key is in the projection list + run("SELECT c1 FROM t1 ORDER BY c1"); + // ORDER BY key is not in the projection list + run("SELECT c1 FROM t1 ORDER BY c2"); } TEST_F(PlanOrderByTest, expr) { useDb("root", "test"); - run("select * from t1 order by c1 + 10, c2"); + run("SELECT * FROM t1 ORDER BY c1 + 10, c2"); } TEST_F(PlanOrderByTest, nullsOrder) { useDb("root", "test"); - run("select * from t1 order by c1 desc nulls first"); + run("SELECT * FROM t1 ORDER BY c1 DESC NULLS FIRST"); +} + +TEST_F(PlanOrderByTest, stable) { + useDb("root", "test"); + + // ORDER BY key is in the projection list + run("SELECT c1 FROM st1 ORDER BY c1"); } diff --git a/source/libs/sync/src/syncAppendEntries.c b/source/libs/sync/src/syncAppendEntries.c index 3afe7b15e213c0da3760c7a8ef1f313d145cd31f..89dcd8a476b7c88397b47cdb56618fac1c5863b1 100644 --- a/source/libs/sync/src/syncAppendEntries.c +++ b/source/libs/sync/src/syncAppendEntries.c @@ -200,7 +200,7 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { SSyncRaftEntry* pRollBackEntry = ths->pLogStore->getEntry(ths->pLogStore, index); assert(pRollBackEntry != NULL); - // if (pRollBackEntry->msgType != TDMT_VND_SYNC_NOOP) { + // if (pRollBackEntry->msgType != TDMT_SYNC_NOOP) { if (syncUtilUserRollback(pRollBackEntry->msgType)) { SRpcMsg rpcMsg; syncEntry2OriginalRpc(pRollBackEntry, &rpcMsg); @@ -229,7 +229,7 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { SRpcMsg rpcMsg; syncEntry2OriginalRpc(pAppendEntry, &rpcMsg); if (ths->pFsm != NULL) { - // if (ths->pFsm->FpPreCommitCb != NULL && pAppendEntry->originalRpcType != TDMT_VND_SYNC_NOOP) { + // if (ths->pFsm->FpPreCommitCb != NULL && pAppendEntry->originalRpcType != TDMT_SYNC_NOOP) { if (ths->pFsm->FpPreCommitCb != NULL && syncUtilUserPreCommit(pAppendEntry->originalRpcType)) { SFsmCbMeta cbMeta; cbMeta.index = pAppendEntry->index; @@ -261,7 +261,7 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { SRpcMsg rpcMsg; syncEntry2OriginalRpc(pAppendEntry, &rpcMsg); if (ths->pFsm != NULL) { - // if (ths->pFsm->FpPreCommitCb != NULL && pAppendEntry->originalRpcType != TDMT_VND_SYNC_NOOP) { + // if (ths->pFsm->FpPreCommitCb != NULL && pAppendEntry->originalRpcType != TDMT_SYNC_NOOP) { if (ths->pFsm->FpPreCommitCb != NULL && syncUtilUserPreCommit(pAppendEntry->originalRpcType)) { SFsmCbMeta cbMeta; cbMeta.index = pAppendEntry->index; @@ -346,7 +346,7 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { } // config change - if (pEntry->originalRpcType == TDMT_VND_SYNC_CONFIG_CHANGE) { + if (pEntry->originalRpcType == TDMT_SYNC_CONFIG_CHANGE) { SSyncCfg oldSyncCfg = ths->pRaftCfg->cfg; SSyncCfg newSyncCfg; diff --git a/source/libs/sync/src/syncCommit.c b/source/libs/sync/src/syncCommit.c index 4a1a40a2d7ddd47d9d6ec30a683f284dacc70fa7..c6376495a4775f5f4264f3d7960e3995f610cc87 100644 --- a/source/libs/sync/src/syncCommit.c +++ b/source/libs/sync/src/syncCommit.c @@ -124,7 +124,7 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) { } // config change - if (pEntry->originalRpcType == TDMT_VND_SYNC_CONFIG_CHANGE) { + if (pEntry->originalRpcType == TDMT_SYNC_CONFIG_CHANGE) { SSyncCfg oldSyncCfg = pSyncNode->pRaftCfg->cfg; SSyncCfg newSyncCfg; diff --git a/source/libs/sync/src/syncIO.c b/source/libs/sync/src/syncIO.c index e30a39e6342c4b7df77ee9cfdbe4f29333e36c16..aa8484de998ff327ab34293d793a974b5e92a05a 100644 --- a/source/libs/sync/src/syncIO.c +++ b/source/libs/sync/src/syncIO.c @@ -256,7 +256,7 @@ static void *syncIOConsumerFunc(void *param) { syncRpcMsgLog2((char *)"==syncIOConsumerFunc==", pRpcMsg); // use switch case instead of if else - if (pRpcMsg->msgType == TDMT_VND_SYNC_PING) { + if (pRpcMsg->msgType == TDMT_SYNC_PING) { if (io->FpOnSyncPing != NULL) { SyncPing *pSyncMsg = syncPingFromRpcMsg2(pRpcMsg); assert(pSyncMsg != NULL); @@ -264,7 +264,7 @@ static void *syncIOConsumerFunc(void *param) { syncPingDestroy(pSyncMsg); } - } else if (pRpcMsg->msgType == TDMT_VND_SYNC_PING_REPLY) { + } else if (pRpcMsg->msgType == TDMT_SYNC_PING_REPLY) { if (io->FpOnSyncPingReply != NULL) { SyncPingReply *pSyncMsg = syncPingReplyFromRpcMsg2(pRpcMsg); assert(pSyncMsg != NULL); @@ -272,7 +272,7 @@ static void *syncIOConsumerFunc(void *param) { syncPingReplyDestroy(pSyncMsg); } - } else if (pRpcMsg->msgType == TDMT_VND_SYNC_CLIENT_REQUEST) { + } else if (pRpcMsg->msgType == TDMT_SYNC_CLIENT_REQUEST) { if (io->FpOnSyncClientRequest != NULL) { SyncClientRequest *pSyncMsg = syncClientRequestFromRpcMsg2(pRpcMsg); assert(pSyncMsg != NULL); @@ -280,7 +280,7 @@ static void *syncIOConsumerFunc(void *param) { syncClientRequestDestroy(pSyncMsg); } - } else if (pRpcMsg->msgType == TDMT_VND_SYNC_REQUEST_VOTE) { + } else if (pRpcMsg->msgType == TDMT_SYNC_REQUEST_VOTE) { if (io->FpOnSyncRequestVote != NULL) { SyncRequestVote *pSyncMsg = syncRequestVoteFromRpcMsg2(pRpcMsg); assert(pSyncMsg != NULL); @@ -288,7 +288,7 @@ static void *syncIOConsumerFunc(void *param) { syncRequestVoteDestroy(pSyncMsg); } - } else if (pRpcMsg->msgType == TDMT_VND_SYNC_REQUEST_VOTE_REPLY) { + } else if (pRpcMsg->msgType == TDMT_SYNC_REQUEST_VOTE_REPLY) { if (io->FpOnSyncRequestVoteReply != NULL) { SyncRequestVoteReply *pSyncMsg = syncRequestVoteReplyFromRpcMsg2(pRpcMsg); assert(pSyncMsg != NULL); @@ -296,7 +296,7 @@ static void *syncIOConsumerFunc(void *param) { syncRequestVoteReplyDestroy(pSyncMsg); } - } else if (pRpcMsg->msgType == TDMT_VND_SYNC_APPEND_ENTRIES) { + } else if (pRpcMsg->msgType == TDMT_SYNC_APPEND_ENTRIES) { if (io->FpOnSyncAppendEntries != NULL) { SyncAppendEntries *pSyncMsg = syncAppendEntriesFromRpcMsg2(pRpcMsg); assert(pSyncMsg != NULL); @@ -304,7 +304,7 @@ static void *syncIOConsumerFunc(void *param) { syncAppendEntriesDestroy(pSyncMsg); } - } else if (pRpcMsg->msgType == TDMT_VND_SYNC_APPEND_ENTRIES_REPLY) { + } else if (pRpcMsg->msgType == TDMT_SYNC_APPEND_ENTRIES_REPLY) { if (io->FpOnSyncAppendEntriesReply != NULL) { SyncAppendEntriesReply *pSyncMsg = syncAppendEntriesReplyFromRpcMsg2(pRpcMsg); assert(pSyncMsg != NULL); @@ -312,7 +312,7 @@ static void *syncIOConsumerFunc(void *param) { syncAppendEntriesReplyDestroy(pSyncMsg); } - } else if (pRpcMsg->msgType == TDMT_VND_SYNC_TIMEOUT) { + } else if (pRpcMsg->msgType == TDMT_SYNC_TIMEOUT) { if (io->FpOnSyncTimeout != NULL) { SyncTimeout *pSyncMsg = syncTimeoutFromRpcMsg2(pRpcMsg); assert(pSyncMsg != NULL); @@ -365,7 +365,7 @@ static void syncIOProcessRequest(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet) { } static void syncIOProcessReply(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet) { - if (pMsg->msgType == TDMT_VND_SYNC_COMMON_RESPONSE) { + if (pMsg->msgType == TDMT_SYNC_COMMON_RESPONSE) { sTrace("==syncIOProcessReply=="); } else { syncRpcMsgLog2((char *)"==syncIOProcessReply==", pMsg); diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 66806dbd0c8e6b40ac9331884ff0447263f0eaaa..d522b829dd8620c856316626c8c62c7e75ba83a0 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -174,7 +174,7 @@ int32_t syncReconfig(int64_t rid, const SSyncCfg* pSyncCfg) { sInfo("==syncReconfig== newconfig:%s", configChange); SRpcMsg rpcMsg = {0}; - rpcMsg.msgType = TDMT_VND_SYNC_CONFIG_CHANGE; + rpcMsg.msgType = TDMT_SYNC_CONFIG_CHANGE; rpcMsg.info.noResp = 1; rpcMsg.contLen = strlen(configChange) + 1; rpcMsg.pCont = rpcMallocCont(rpcMsg.contLen); @@ -1399,7 +1399,7 @@ int32_t syncNodeOnClientRequestCb(SSyncNode* ths, SyncClientRequest* pMsg) { syncEntry2OriginalRpc(pEntry, &rpcMsg); if (ths->pFsm != NULL) { - // if (ths->pFsm->FpPreCommitCb != NULL && pEntry->originalRpcType != TDMT_VND_SYNC_NOOP) { + // if (ths->pFsm->FpPreCommitCb != NULL && pEntry->originalRpcType != TDMT_SYNC_NOOP) { if (ths->pFsm->FpPreCommitCb != NULL && syncUtilUserPreCommit(pEntry->originalRpcType)) { SFsmCbMeta cbMeta; cbMeta.index = pEntry->index; @@ -1421,7 +1421,7 @@ int32_t syncNodeOnClientRequestCb(SSyncNode* ths, SyncClientRequest* pMsg) { syncEntry2OriginalRpc(pEntry, &rpcMsg); if (ths->pFsm != NULL) { - // if (ths->pFsm->FpPreCommitCb != NULL && pEntry->originalRpcType != TDMT_VND_SYNC_NOOP) { + // if (ths->pFsm->FpPreCommitCb != NULL && pEntry->originalRpcType != TDMT_SYNC_NOOP) { if (ths->pFsm->FpPreCommitCb != NULL && syncUtilUserPreCommit(pEntry->originalRpcType)) { SFsmCbMeta cbMeta; cbMeta.index = pEntry->index; diff --git a/source/libs/sync/src/syncMessage.c b/source/libs/sync/src/syncMessage.c index fae069f2e6b13c0073c6309f889dc7f8f92c8c6e..6871e6b3ed3d37c45833b6133e6430923ca5d727 100644 --- a/source/libs/sync/src/syncMessage.c +++ b/source/libs/sync/src/syncMessage.c @@ -22,50 +22,50 @@ cJSON* syncRpcMsg2Json(SRpcMsg* pRpcMsg) { cJSON* pRoot; // in compiler optimization, switch case = if else constants - if (pRpcMsg->msgType == TDMT_VND_SYNC_TIMEOUT) { + if (pRpcMsg->msgType == TDMT_SYNC_TIMEOUT) { SyncTimeout* pSyncMsg = syncTimeoutDeserialize2(pRpcMsg->pCont, pRpcMsg->contLen); pRoot = syncTimeout2Json(pSyncMsg); syncTimeoutDestroy(pSyncMsg); - } else if (pRpcMsg->msgType == TDMT_VND_SYNC_PING) { + } else if (pRpcMsg->msgType == TDMT_SYNC_PING) { SyncPing* pSyncMsg = syncPingDeserialize2(pRpcMsg->pCont, pRpcMsg->contLen); pRoot = syncPing2Json(pSyncMsg); syncPingDestroy(pSyncMsg); - } else if (pRpcMsg->msgType == TDMT_VND_SYNC_PING_REPLY) { + } else if (pRpcMsg->msgType == TDMT_SYNC_PING_REPLY) { SyncPingReply* pSyncMsg = syncPingReplyDeserialize2(pRpcMsg->pCont, pRpcMsg->contLen); pRoot = syncPingReply2Json(pSyncMsg); syncPingReplyDestroy(pSyncMsg); - } else if (pRpcMsg->msgType == TDMT_VND_SYNC_CLIENT_REQUEST) { + } else if (pRpcMsg->msgType == TDMT_SYNC_CLIENT_REQUEST) { SyncClientRequest* pSyncMsg = syncClientRequestDeserialize2(pRpcMsg->pCont, pRpcMsg->contLen); pRoot = syncClientRequest2Json(pSyncMsg); syncClientRequestDestroy(pSyncMsg); - } else if (pRpcMsg->msgType == TDMT_VND_SYNC_CLIENT_REQUEST_REPLY) { + } else if (pRpcMsg->msgType == TDMT_SYNC_CLIENT_REQUEST_REPLY) { pRoot = syncRpcUnknownMsg2Json(); - } else if (pRpcMsg->msgType == TDMT_VND_SYNC_REQUEST_VOTE) { + } else if (pRpcMsg->msgType == TDMT_SYNC_REQUEST_VOTE) { SyncRequestVote* pSyncMsg = syncRequestVoteDeserialize2(pRpcMsg->pCont, pRpcMsg->contLen); pRoot = syncRequestVote2Json(pSyncMsg); syncRequestVoteDestroy(pSyncMsg); - } else if (pRpcMsg->msgType == TDMT_VND_SYNC_REQUEST_VOTE_REPLY) { + } else if (pRpcMsg->msgType == TDMT_SYNC_REQUEST_VOTE_REPLY) { SyncRequestVoteReply* pSyncMsg = syncRequestVoteReplyDeserialize2(pRpcMsg->pCont, pRpcMsg->contLen); pRoot = syncRequestVoteReply2Json(pSyncMsg); syncRequestVoteReplyDestroy(pSyncMsg); - } else if (pRpcMsg->msgType == TDMT_VND_SYNC_APPEND_ENTRIES) { + } else if (pRpcMsg->msgType == TDMT_SYNC_APPEND_ENTRIES) { SyncAppendEntries* pSyncMsg = syncAppendEntriesDeserialize2(pRpcMsg->pCont, pRpcMsg->contLen); pRoot = syncAppendEntries2Json(pSyncMsg); syncAppendEntriesDestroy(pSyncMsg); - } else if (pRpcMsg->msgType == TDMT_VND_SYNC_APPEND_ENTRIES_REPLY) { + } else if (pRpcMsg->msgType == TDMT_SYNC_APPEND_ENTRIES_REPLY) { SyncAppendEntriesReply* pSyncMsg = syncAppendEntriesReplyDeserialize2(pRpcMsg->pCont, pRpcMsg->contLen); pRoot = syncAppendEntriesReply2Json(pSyncMsg); syncAppendEntriesReplyDestroy(pSyncMsg); - } else if (pRpcMsg->msgType == TDMT_VND_SYNC_COMMON_RESPONSE) { + } else if (pRpcMsg->msgType == TDMT_SYNC_COMMON_RESPONSE) { pRoot = cJSON_CreateObject(); char* s; s = syncUtilprintBin((char*)(pRpcMsg->pCont), pRpcMsg->contLen); @@ -98,7 +98,7 @@ cJSON* syncRpcMsg2Json(SRpcMsg* pRpcMsg) { cJSON* syncRpcUnknownMsg2Json() { cJSON* pRoot = cJSON_CreateObject(); - cJSON_AddNumberToObject(pRoot, "msgType", TDMT_VND_SYNC_UNKNOWN); + cJSON_AddNumberToObject(pRoot, "msgType", TDMT_SYNC_UNKNOWN); cJSON_AddStringToObject(pRoot, "data", "unknown message"); cJSON* pJson = cJSON_CreateObject(); @@ -146,7 +146,7 @@ SyncTimeout* syncTimeoutBuild() { SyncTimeout* pMsg = taosMemoryMalloc(bytes); memset(pMsg, 0, bytes); pMsg->bytes = bytes; - pMsg->msgType = TDMT_VND_SYNC_TIMEOUT; + pMsg->msgType = TDMT_SYNC_TIMEOUT; return pMsg; } @@ -275,7 +275,7 @@ SyncPing* syncPingBuild(uint32_t dataLen) { SyncPing* pMsg = taosMemoryMalloc(bytes); memset(pMsg, 0, bytes); pMsg->bytes = bytes; - pMsg->msgType = TDMT_VND_SYNC_PING; + pMsg->msgType = TDMT_SYNC_PING; pMsg->dataLen = dataLen; return pMsg; } @@ -535,7 +535,7 @@ SyncPingReply* syncPingReplyBuild(uint32_t dataLen) { SyncPingReply* pMsg = taosMemoryMalloc(bytes); memset(pMsg, 0, bytes); pMsg->bytes = bytes; - pMsg->msgType = TDMT_VND_SYNC_PING_REPLY; + pMsg->msgType = TDMT_SYNC_PING_REPLY; pMsg->dataLen = dataLen; return pMsg; } @@ -795,7 +795,7 @@ SyncClientRequest* syncClientRequestBuild(uint32_t dataLen) { SyncClientRequest* pMsg = taosMemoryMalloc(bytes); memset(pMsg, 0, bytes); pMsg->bytes = bytes; - pMsg->msgType = TDMT_VND_SYNC_CLIENT_REQUEST; + pMsg->msgType = TDMT_SYNC_CLIENT_REQUEST; pMsg->seqNum = 0; pMsg->isWeak = false; pMsg->dataLen = dataLen; @@ -937,7 +937,7 @@ SyncRequestVote* syncRequestVoteBuild(int32_t vgId) { memset(pMsg, 0, bytes); pMsg->bytes = bytes; pMsg->vgId = vgId; - pMsg->msgType = TDMT_VND_SYNC_REQUEST_VOTE; + pMsg->msgType = TDMT_SYNC_REQUEST_VOTE; return pMsg; } @@ -1086,7 +1086,7 @@ SyncRequestVoteReply* syncRequestVoteReplyBuild(int32_t vgId) { memset(pMsg, 0, bytes); pMsg->bytes = bytes; pMsg->vgId = vgId; - pMsg->msgType = TDMT_VND_SYNC_REQUEST_VOTE_REPLY; + pMsg->msgType = TDMT_SYNC_REQUEST_VOTE_REPLY; return pMsg; } @@ -1232,7 +1232,7 @@ SyncAppendEntries* syncAppendEntriesBuild(uint32_t dataLen, int32_t vgId) { memset(pMsg, 0, bytes); pMsg->bytes = bytes; pMsg->vgId = vgId; - pMsg->msgType = TDMT_VND_SYNC_APPEND_ENTRIES; + pMsg->msgType = TDMT_SYNC_APPEND_ENTRIES; pMsg->dataLen = dataLen; return pMsg; } @@ -1398,7 +1398,7 @@ SyncAppendEntriesReply* syncAppendEntriesReplyBuild(int32_t vgId) { memset(pMsg, 0, bytes); pMsg->bytes = bytes; pMsg->vgId = vgId; - pMsg->msgType = TDMT_VND_SYNC_APPEND_ENTRIES_REPLY; + pMsg->msgType = TDMT_SYNC_APPEND_ENTRIES_REPLY; return pMsg; } @@ -1546,7 +1546,7 @@ SyncApplyMsg* syncApplyMsgBuild(uint32_t dataLen) { SyncApplyMsg* pMsg = taosMemoryMalloc(bytes); memset(pMsg, 0, bytes); pMsg->bytes = bytes; - pMsg->msgType = TDMT_VND_SYNC_APPLY_MSG; + pMsg->msgType = TDMT_SYNC_APPLY_MSG; pMsg->dataLen = dataLen; return pMsg; } diff --git a/source/libs/sync/src/syncRaftEntry.c b/source/libs/sync/src/syncRaftEntry.c index 8755f71654382f3913a3c81b6ee1e9b6e91dbb69..05a2dbaa3f36b00cd9202844d65f97f022ba22ae 100644 --- a/source/libs/sync/src/syncRaftEntry.c +++ b/source/libs/sync/src/syncRaftEntry.c @@ -59,14 +59,14 @@ SSyncRaftEntry* syncEntryBuildNoop(SyncTerm term, SyncIndex index, int32_t vgId) memset(&rpcMsg, 0, sizeof(SRpcMsg)); rpcMsg.contLen = head.contLen; rpcMsg.pCont = rpcMallocCont(rpcMsg.contLen); - rpcMsg.msgType = TDMT_VND_SYNC_NOOP; + rpcMsg.msgType = TDMT_SYNC_NOOP; memcpy(rpcMsg.pCont, &head, sizeof(head)); SSyncRaftEntry* pEntry = syncEntryBuild(rpcMsg.contLen); assert(pEntry != NULL); - pEntry->msgType = TDMT_VND_SYNC_CLIENT_REQUEST; - pEntry->originalRpcType = TDMT_VND_SYNC_NOOP; + pEntry->msgType = TDMT_SYNC_CLIENT_REQUEST; + pEntry->originalRpcType = TDMT_SYNC_NOOP; pEntry->seqNum = 0; pEntry->isWeak = 0; pEntry->term = term; diff --git a/source/libs/sync/src/syncRaftLog.c b/source/libs/sync/src/syncRaftLog.c index a6397f8cba24694d6f36847af5e877c72bd1a920..b353ed85db27888483f4de249f86206d4b195c0e 100644 --- a/source/libs/sync/src/syncRaftLog.c +++ b/source/libs/sync/src/syncRaftLog.c @@ -104,7 +104,7 @@ SSyncRaftEntry* logStoreGetEntry(SSyncLogStore* pLogStore, SyncIndex index) { SSyncRaftEntry* pEntry = syncEntryBuild(pWalHandle->pHead->head.bodyLen); assert(pEntry != NULL); - pEntry->msgType = TDMT_VND_SYNC_CLIENT_REQUEST; + pEntry->msgType = TDMT_SYNC_CLIENT_REQUEST; pEntry->originalRpcType = pWalHandle->pHead->head.msgType; pEntry->seqNum = pWalHandle->pHead->head.syncMeta.seqNum; pEntry->isWeak = pWalHandle->pHead->head.syncMeta.isWeek; diff --git a/source/libs/sync/src/syncUtil.c b/source/libs/sync/src/syncUtil.c index d754acd9f831ac18ce7e28b5ef2fda4b2d8650db..48567b75c27b8c13aaefc6d029c4c7a957b23c1a 100644 --- a/source/libs/sync/src/syncUtil.c +++ b/source/libs/sync/src/syncUtil.c @@ -215,28 +215,28 @@ void syncUtilMsgNtoH(void* msg) { } bool syncUtilIsData(tmsg_t msgType) { - if (msgType == TDMT_VND_SYNC_NOOP || msgType == TDMT_VND_SYNC_CONFIG_CHANGE) { + if (msgType == TDMT_SYNC_NOOP || msgType == TDMT_SYNC_CONFIG_CHANGE) { return false; } return true; } bool syncUtilUserPreCommit(tmsg_t msgType) { - if (msgType != TDMT_VND_SYNC_NOOP && msgType != TDMT_VND_SYNC_CONFIG_CHANGE) { + if (msgType != TDMT_SYNC_NOOP && msgType != TDMT_SYNC_CONFIG_CHANGE) { return true; } return false; } bool syncUtilUserCommit(tmsg_t msgType) { - if (msgType != TDMT_VND_SYNC_NOOP && msgType != TDMT_VND_SYNC_CONFIG_CHANGE) { + if (msgType != TDMT_SYNC_NOOP && msgType != TDMT_SYNC_CONFIG_CHANGE) { return true; } return false; } bool syncUtilUserRollback(tmsg_t msgType) { - if (msgType != TDMT_VND_SYNC_NOOP && msgType != TDMT_VND_SYNC_CONFIG_CHANGE) { + if (msgType != TDMT_SYNC_NOOP && msgType != TDMT_SYNC_CONFIG_CHANGE) { return true; } return false; diff --git a/tests/script/tsim/db/alter_option.sim b/tests/script/tsim/db/alter_option.sim index f3adb4535ec0a2b6dae2de6a277ab3a913bb711a..744451150429ce110f2caf69af5e059765325f26 100644 --- a/tests/script/tsim/db/alter_option.sim +++ b/tests/script/tsim/db/alter_option.sim @@ -89,7 +89,7 @@ endi if $data4_db != 3 then # replica return -1 endi -if $data5_db != nostrict then # strict +if $data5_db != no_strict then # strict return -1 endi if $data6_db != 345600 then # days diff --git a/tests/script/tsim/db/create_all_options.sim b/tests/script/tsim/db/create_all_options.sim index d7d72c4a1df1116d563c2f9283994e9807eac9d7..88f0378d618229ee6edd4ccf7879793453305746 100644 --- a/tests/script/tsim/db/create_all_options.sim +++ b/tests/script/tsim/db/create_all_options.sim @@ -110,7 +110,7 @@ if $data4_db != 1 then # replica print expect 1, actual: $data4_db return -1 endi -if $data5_db != nostrict then # strict +if $data5_db != no_strict then # strict return -1 endi if $data6_db != 14400 then # days diff --git a/tests/script/tsim/testsuit.sim b/tests/script/tsim/testsuit.sim index 883dc1e7895015626c2f3a185fabfc38cf86a449..b2ca1e03456c6414459a72999180e9d57ca104c9 100644 --- a/tests/script/tsim/testsuit.sim +++ b/tests/script/tsim/testsuit.sim @@ -1,45 +1,45 @@ -run tsim/user/pass_alter.sim -run tsim/user/basic1.sim -run tsim/user/privilege2.sim -run tsim/user/user_len.sim -run tsim/user/privilege1.sim -run tsim/user/pass_len.sim -run tsim/table/basic1.sim -run tsim/trans/lossdata1.sim -run tsim/trans/create_db.sim -run tsim/stable/alter_metrics.sim -run tsim/stable/tag_modify.sim -run tsim/stable/alter_comment.sim -run tsim/stable/column_drop.sim -run tsim/stable/column_modify.sim -run tsim/stable/tag_rename.sim -run tsim/stable/vnode3.sim -run tsim/stable/metrics.sim -run tsim/stable/alter_insert2.sim -run tsim/stable/show.sim -run tsim/stable/alter_import.sim -run tsim/stable/tag_add.sim -run tsim/stable/tag_drop.sim -run tsim/stable/column_add.sim -run tsim/stable/alter_count.sim -run tsim/stable/values.sim -run tsim/stable/dnode3.sim -run tsim/stable/alter_insert1.sim -run tsim/stable/refcount.sim -run tsim/stable/disk.sim -run tsim/db/basic1.sim -run tsim/db/basic3.sim -run tsim/db/basic7.sim -run tsim/db/basic6.sim -run tsim/db/create_all_options.sim -run tsim/db/basic2.sim -run tsim/db/error1.sim -run tsim/db/taosdlog.sim -run tsim/db/alter_option.sim -run tsim/mnode/basic1.sim -run tsim/mnode/basic3.sim -run tsim/mnode/basic2.sim -run tsim/parser/fourArithmetic-basic.sim +#run tsim/user/pass_alter.sim +#run tsim/user/basic1.sim +#run tsim/user/privilege2.sim +#run tsim/user/user_len.sim +#run tsim/user/privilege1.sim +##run tsim/user/pass_len.sim +##run tsim/table/basic1.sim +#run tsim/trans/lossdata1.sim +#run tsim/trans/create_db.sim +##run tsim/stable/alter_metrics.sim +##run tsim/stable/tag_modify.sim +##run tsim/stable/alter_comment.sim +##run tsim/stable/column_drop.sim +##run tsim/stable/column_modify.sim +##run tsim/stable/tag_rename.sim +##run tsim/stable/vnode3.sim +##run tsim/stable/metrics.sim +##run tsim/stable/alter_insert2.sim +##run tsim/stable/show.sim +##run tsim/stable/alter_import.sim +##run tsim/stable/tag_add.sim +##run tsim/stable/tag_drop.sim +##run tsim/stable/column_add.sim +##run tsim/stable/alter_count.sim +##run tsim/stable/values.sim +##run tsim/stable/dnode3.sim +##run tsim/stable/alter_insert1.sim +##run tsim/stable/refcount.sim +##run tsim/stable/disk.sim +#run tsim/db/basic1.sim +#run tsim/db/basic3.sim +##run tsim/db/basic7.sim +#run tsim/db/basic6.sim +#run tsim/db/create_all_options.sim +#run tsim/db/basic2.sim +#run tsim/db/error1.sim +#run tsim/db/taosdlog.sim +#run tsim/db/alter_option.sim +#run tsim/mnode/basic1.sim +#run tsim/mnode/basic3.sim +#run tsim/mnode/basic2.sim +#run tsim/parser/fourArithmetic-basic.sim run tsim/parser/groupby-basic.sim run tsim/snode/basic1.sim run tsim/query/time_process.sim