提交 7b82bd5e 编写于 作者: H Hongze Cheng

Merge branch '3.0' of https://github.com/taosdata/TDengine into enh/tsdb_optimize

...@@ -2,7 +2,7 @@ ...@@ -2,7 +2,7 @@
# taos-tools # taos-tools
ExternalProject_Add(taos-tools ExternalProject_Add(taos-tools
GIT_REPOSITORY https://github.com/taosdata/taos-tools.git GIT_REPOSITORY https://github.com/taosdata/taos-tools.git
GIT_TAG d11f210 GIT_TAG 2864326
SOURCE_DIR "${TD_SOURCE_DIR}/tools/taos-tools" SOURCE_DIR "${TD_SOURCE_DIR}/tools/taos-tools"
BINARY_DIR "" BINARY_DIR ""
#BUILD_IN_SOURCE TRUE #BUILD_IN_SOURCE TRUE
......
...@@ -171,7 +171,7 @@ If you want to perform event window based query on the result set of a sub-query ...@@ -171,7 +171,7 @@ If you want to perform event window based query on the result set of a sub-query
For example, the diagram below illustrates the event windows generated by the query below: For example, the diagram below illustrates the event windows generated by the query below:
```sql ```sql
select _wstart, _wend, count(*) from t start with c1 > 0 end with c2 < 10 select _wstart, _wend, count(*) from t event_window start with c1 > 0 end with c2 < 10
``` ```
![Event Window Illustration](./event_window.webp) ![Event Window Illustration](./event_window.webp)
......
...@@ -15,14 +15,14 @@ About details of installing TDenine, please refer to [Installation Guide](../../ ...@@ -15,14 +15,14 @@ About details of installing TDenine, please refer to [Installation Guide](../../
## Uninstall ## Uninstall
<Tabs> <Tabs>
<TabItem label="Uninstall apt-get" value="aptremove"> <TabItem label="Uninstall by apt-get" value="aptremove">
Apt-get package of TDengine can be uninstalled as below: Uninstall package of TDengine by apt-get can be uninstalled as below:
```bash ```bash
$ sudo apt-get remove tdengine $ sudo apt-get remove tdengine
Reading package lists... Done Reading package lists... Done
Building dependency tree Building dependency tree
Reading state information... Done Reading state information... Done
The following packages will be REMOVED: The following packages will be REMOVED:
tdengine tdengine
...@@ -35,7 +35,7 @@ TDengine is removed successfully! ...@@ -35,7 +35,7 @@ TDengine is removed successfully!
``` ```
Apt-get package of taosTools can be uninstalled as below: If you have installed taos-tools, please uninstall it first before uninstall TDengine. The command of uninstall is following:
``` ```
$ sudo apt remove taostools $ sudo apt remove taostools
...@@ -168,7 +168,7 @@ Upgrading a running server is much more complex. First please check the version ...@@ -168,7 +168,7 @@ Upgrading a running server is much more complex. First please check the version
- Stop the cluster of TDengine - Stop the cluster of TDengine
- Uninstall old version and install new version - Uninstall old version and install new version
- Start the cluster of TDengine - Start the cluster of TDengine
- Execute simple queries, such as the ones executed prior to installing the new package, to make sure there is no data loss - Execute simple queries, such as the ones executed prior to installing the new package, to make sure there is no data loss
- Run some simple data insertion statements to make sure the cluster works well - Run some simple data insertion statements to make sure the cluster works well
- Restore business services - Restore business services
......
...@@ -163,7 +163,7 @@ SELECT COUNT(*), FIRST(ts) FROM temp_tb_1 SESSION(ts, tol_val); ...@@ -163,7 +163,7 @@ SELECT COUNT(*), FIRST(ts) FROM temp_tb_1 SESSION(ts, tol_val);
以下面的 SQL 语句为例,事件窗口切分如图所示: 以下面的 SQL 语句为例,事件窗口切分如图所示:
```sql ```sql
select _wstart, _wend, count(*) from t start with c1 > 0 end with c2 < 10 select _wstart, _wend, count(*) from t event_window start with c1 > 0 end with c2 < 10
``` ```
![TDengine Database 事件窗口示意图](./event_window.webp) ![TDengine Database 事件窗口示意图](./event_window.webp)
......
...@@ -1670,10 +1670,10 @@ int32_t tDeserializeSBalanceVgroupLeaderReq(void* buf, int32_t bufLen, SBalanceV ...@@ -1670,10 +1670,10 @@ int32_t tDeserializeSBalanceVgroupLeaderReq(void* buf, int32_t bufLen, SBalanceV
typedef struct { typedef struct {
int32_t vgId; int32_t vgId;
} SForceElectionReq; } SForceBecomeFollowerReq;
int32_t tSerializeSForceElectionReq(void* buf, int32_t bufLen, SForceElectionReq* pReq); int32_t tSerializeSForceBecomeFollowerReq(void* buf, int32_t bufLen, SForceBecomeFollowerReq* pReq);
int32_t tDeserializeSForceElectionReq(void* buf, int32_t bufLen, SForceElectionReq* pReq); int32_t tDeserializeSForceBecomeFollowerReq(void* buf, int32_t bufLen, SForceBecomeFollowerReq* pReq);
typedef struct { typedef struct {
int32_t vgId; int32_t vgId;
......
...@@ -83,7 +83,6 @@ enum { ...@@ -83,7 +83,6 @@ enum {
TD_DEF_MSG_TYPE(TDMT_DND_CONFIG_DNODE, "config-dnode", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_DND_CONFIG_DNODE, "config-dnode", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_DND_SYSTABLE_RETRIEVE, "dnode-retrieve", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_DND_SYSTABLE_RETRIEVE, "dnode-retrieve", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_DND_MAX_MSG, "dnd-max", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_DND_MAX_MSG, "dnd-max", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_DND_FORCE_ELECTION, "balance-force-election", NULL, NULL)
TD_NEW_MSG_SEG(TDMT_MND_MSG) TD_NEW_MSG_SEG(TDMT_MND_MSG)
TD_DEF_MSG_TYPE(TDMT_MND_CONNECT, "connect", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_CONNECT, "connect", NULL, NULL)
...@@ -166,7 +165,6 @@ enum { ...@@ -166,7 +165,6 @@ enum {
TD_DEF_MSG_TYPE(TDMT_MND_AUTH, "auth", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_AUTH, "auth", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_APPLY_MSG, "mnode-apply", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_APPLY_MSG, "mnode-apply", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_BALANCE_VGROUP, "balance-vgroup", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_BALANCE_VGROUP, "balance-vgroup", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_BALANCE_VGROUP_LEADER, "balance-vgroup-leader", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_MERGE_VGROUP, "merge-vgroup", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_MERGE_VGROUP, "merge-vgroup", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_REDISTRIBUTE_VGROUP, "redistribute-vgroup", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_REDISTRIBUTE_VGROUP, "redistribute-vgroup", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_SPLIT_VGROUP, "split-vgroup", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_SPLIT_VGROUP, "split-vgroup", NULL, NULL)
...@@ -177,6 +175,7 @@ enum { ...@@ -177,6 +175,7 @@ enum {
// TD_DEF_MSG_TYPE(TDMT_MND_STREAM_CHECKPOINT_TIMER, "stream-checkpoint-tmr", NULL, NULL) // TD_DEF_MSG_TYPE(TDMT_MND_STREAM_CHECKPOINT_TIMER, "stream-checkpoint-tmr", NULL, NULL)
// TD_DEF_MSG_TYPE(TDMT_MND_STREAM_BEGIN_CHECKPOINT, "stream-begin-checkpoint", NULL, NULL) // TD_DEF_MSG_TYPE(TDMT_MND_STREAM_BEGIN_CHECKPOINT, "stream-begin-checkpoint", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_MAX_MSG, "mnd-max", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_MAX_MSG, "mnd-max", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_BALANCE_VGROUP_LEADER, "balance-vgroup-leader", NULL, NULL)
TD_NEW_MSG_SEG(TDMT_VND_MSG) TD_NEW_MSG_SEG(TDMT_VND_MSG)
TD_DEF_MSG_TYPE(TDMT_VND_SUBMIT, "submit", SSubmitReq, SSubmitRsp) TD_DEF_MSG_TYPE(TDMT_VND_SUBMIT, "submit", SSubmitReq, SSubmitRsp)
...@@ -288,6 +287,7 @@ enum { ...@@ -288,6 +287,7 @@ enum {
TD_DEF_MSG_TYPE(TDMT_SYNC_PRE_SNAPSHOT, "sync-pre-snapshot", NULL, NULL) // no longer used TD_DEF_MSG_TYPE(TDMT_SYNC_PRE_SNAPSHOT, "sync-pre-snapshot", NULL, NULL) // no longer used
TD_DEF_MSG_TYPE(TDMT_SYNC_PRE_SNAPSHOT_REPLY, "sync-pre-snapshot-reply", NULL, NULL) // no longer used TD_DEF_MSG_TYPE(TDMT_SYNC_PRE_SNAPSHOT_REPLY, "sync-pre-snapshot-reply", NULL, NULL) // no longer used
TD_DEF_MSG_TYPE(TDMT_SYNC_MAX_MSG, "sync-max", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_SYNC_MAX_MSG, "sync-max", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_SYNC_FORCE_FOLLOWER, "sync-force-become-follower", NULL, NULL)
TD_NEW_MSG_SEG(TDMT_VND_STREAM_MSG) TD_NEW_MSG_SEG(TDMT_VND_STREAM_MSG)
TD_DEF_MSG_TYPE(TDMT_VND_STREAM_TRIGGER, "vnode-stream-trigger", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_STREAM_TRIGGER, "vnode-stream-trigger", NULL, NULL)
......
...@@ -246,7 +246,7 @@ bool syncIsReadyForRead(int64_t rid); ...@@ -246,7 +246,7 @@ bool syncIsReadyForRead(int64_t rid);
bool syncSnapshotSending(int64_t rid); bool syncSnapshotSending(int64_t rid);
bool syncSnapshotRecving(int64_t rid); bool syncSnapshotRecving(int64_t rid);
int32_t syncSendTimeoutRsp(int64_t rid, int64_t seq); int32_t syncSendTimeoutRsp(int64_t rid, int64_t seq);
int32_t syncLeaderForceElection(int64_t rid); int32_t syncForceBecomeFollower(SSyncNode* ths, const SRpcMsg* pRpcMsg);
SSyncState syncGetState(int64_t rid); SSyncState syncGetState(int64_t rid);
void syncGetRetryEpSet(int64_t rid, SEpSet* pEpSet); void syncGetRetryEpSet(int64_t rid, SEpSet* pEpSet);
......
...@@ -223,6 +223,7 @@ int32_t* taosGetErrno(); ...@@ -223,6 +223,7 @@ int32_t* taosGetErrno();
// #define TSDB_CODE_MND_NOT_READY TAOS_DEF_ERROR_CODE(0, 0x033C) // 2.x // #define TSDB_CODE_MND_NOT_READY TAOS_DEF_ERROR_CODE(0, 0x033C) // 2.x
// #define TSDB_CODE_MND_DNODE_ID_NOT_CONFIGUREDTAOS_DEF_ERROR_CODE(0, 0x033D) // 2.x // #define TSDB_CODE_MND_DNODE_ID_NOT_CONFIGUREDTAOS_DEF_ERROR_CODE(0, 0x033D) // 2.x
// #define TSDB_CODE_MND_DNODE_EP_NOT_CONFIGUREDTAOS_DEF_ERROR_CODE(0, 0x033E) // 2.x // #define TSDB_CODE_MND_DNODE_EP_NOT_CONFIGUREDTAOS_DEF_ERROR_CODE(0, 0x033E) // 2.x
#define TSDB_CODE_MND_DNODE_DIFF_CLUSTER TAOS_DEF_ERROR_CODE(0, 0x033F) // internal
// mnode-acct // mnode-acct
#define TSDB_CODE_MND_ACCT_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x0340) #define TSDB_CODE_MND_ACCT_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x0340)
......
...@@ -4569,7 +4569,7 @@ int32_t tDeserializeSSplitVgroupReq(void *buf, int32_t bufLen, SSplitVgroupReq * ...@@ -4569,7 +4569,7 @@ int32_t tDeserializeSSplitVgroupReq(void *buf, int32_t bufLen, SSplitVgroupReq *
return 0; return 0;
} }
int32_t tSerializeSForceElectionReq(void *buf, int32_t bufLen, SForceElectionReq *pReq) { int32_t tSerializeSForceBecomeFollowerReq(void *buf, int32_t bufLen, SForceBecomeFollowerReq *pReq) {
SEncoder encoder = {0}; SEncoder encoder = {0};
tEncoderInit(&encoder, buf, bufLen); tEncoderInit(&encoder, buf, bufLen);
...@@ -4582,7 +4582,7 @@ int32_t tSerializeSForceElectionReq(void *buf, int32_t bufLen, SForceElectionReq ...@@ -4582,7 +4582,7 @@ int32_t tSerializeSForceElectionReq(void *buf, int32_t bufLen, SForceElectionReq
return tlen; return tlen;
} }
int32_t tDeserializeSForceElectionReq(void *buf, int32_t bufLen, SForceElectionReq *pReq) { int32_t tDeserializeSForceBecomeFollowerReq(void *buf, int32_t bufLen, SForceBecomeFollowerReq *pReq) {
SDecoder decoder = {0}; SDecoder decoder = {0};
tDecoderInit(&decoder, buf, bufLen); tDecoderInit(&decoder, buf, bufLen);
......
...@@ -92,7 +92,6 @@ SArray *mmGetMsgHandles() { ...@@ -92,7 +92,6 @@ SArray *mmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_DND_CREATE_VNODE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_DND_CREATE_VNODE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_DND_DROP_VNODE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_DND_DROP_VNODE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_DND_CONFIG_DNODE_RSP, mmPutMsgToReadQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_DND_CONFIG_DNODE_RSP, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_DND_FORCE_ELECTION_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_CONNECT, mmPutMsgToReadQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_CONNECT, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_ACCT, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_ACCT, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
...@@ -205,6 +204,7 @@ SArray *mmGetMsgHandles() { ...@@ -205,6 +204,7 @@ SArray *mmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_SYNC_PRE_SNAPSHOT_REPLY, mmPutMsgToSyncQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SYNC_PRE_SNAPSHOT_REPLY, mmPutMsgToSyncQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SYNC_HEARTBEAT, mmPutMsgToSyncCtrlQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SYNC_HEARTBEAT, mmPutMsgToSyncCtrlQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SYNC_HEARTBEAT_REPLY, mmPutMsgToSyncCtrlQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SYNC_HEARTBEAT_REPLY, mmPutMsgToSyncCtrlQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SYNC_FORCE_FOLLOWER_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
code = 0; code = 0;
......
...@@ -86,7 +86,6 @@ void vmCloseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode, bool commitAndRemo ...@@ -86,7 +86,6 @@ void vmCloseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode, bool commitAndRemo
// vmHandle.c // vmHandle.c
SArray *vmGetMsgHandles(); SArray *vmGetMsgHandles();
int32_t vmProcessCreateVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg); int32_t vmProcessCreateVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t vmProcessForceElectionReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t vmProcessDropVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg); int32_t vmProcessDropVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t vmProcessAlterVnodeReplicaReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg); int32_t vmProcessAlterVnodeReplicaReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t vmProcessDisableVnodeWriteReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg); int32_t vmProcessDisableVnodeWriteReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg);
......
...@@ -304,26 +304,6 @@ int32_t vmProcessDisableVnodeWriteReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { ...@@ -304,26 +304,6 @@ int32_t vmProcessDisableVnodeWriteReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
return 0; return 0;
} }
int32_t vmProcessForceElectionReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg){
SForceElectionReq req = {0};
if (tDeserializeSForceElectionReq(pMsg->pCont, pMsg->contLen, &req) != 0) {
terrno = TSDB_CODE_INVALID_MSG;
return -1;
}
SVnodeObj *pVnode = vmAcquireVnode(pMgmt, req.vgId);
if (pVnode == NULL) {
dError("vgId:%d, failed to alter hashrange since %s", req.vgId, terrstr());
terrno = TSDB_CODE_VND_NOT_EXIST;
return -1;
}
vnodeForceElection(pVnode->pImpl);
return 0;
}
int32_t vmProcessAlterHashRangeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { int32_t vmProcessAlterHashRangeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
SAlterVnodeHashRangeReq req = {0}; SAlterVnodeHashRangeReq req = {0};
if (tDeserializeSAlterVnodeHashRangeReq(pMsg->pCont, pMsg->contLen, &req) != 0) { if (tDeserializeSAlterVnodeHashRangeReq(pMsg->pCont, pMsg->contLen, &req) != 0) {
...@@ -568,7 +548,6 @@ SArray *vmGetMsgHandles() { ...@@ -568,7 +548,6 @@ SArray *vmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_VND_TRIM, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_TRIM, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_DND_CREATE_VNODE, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_DND_CREATE_VNODE, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_DND_DROP_VNODE, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_DND_DROP_VNODE, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_DND_FORCE_ELECTION, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SYNC_TIMEOUT, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SYNC_TIMEOUT, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SYNC_CLIENT_REQUEST, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SYNC_CLIENT_REQUEST, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
...@@ -586,6 +565,7 @@ SArray *vmGetMsgHandles() { ...@@ -586,6 +565,7 @@ SArray *vmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_SYNC_HEARTBEAT, vmPutMsgToSyncCtrlQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SYNC_HEARTBEAT, vmPutMsgToSyncCtrlQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SYNC_HEARTBEAT_REPLY, vmPutMsgToSyncCtrlQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SYNC_HEARTBEAT_REPLY, vmPutMsgToSyncCtrlQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SYNC_FORCE_FOLLOWER, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
code = 0; code = 0;
......
...@@ -37,9 +37,6 @@ static void vmProcessMgmtQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) { ...@@ -37,9 +37,6 @@ static void vmProcessMgmtQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
case TDMT_DND_CREATE_VNODE: case TDMT_DND_CREATE_VNODE:
code = vmProcessCreateVnodeReq(pMgmt, pMsg); code = vmProcessCreateVnodeReq(pMgmt, pMsg);
break; break;
case TDMT_DND_FORCE_ELECTION:
code = vmProcessForceElectionReq(pMgmt, pMsg);
break;
case TDMT_DND_DROP_VNODE: case TDMT_DND_DROP_VNODE:
code = vmProcessDropVnodeReq(pMgmt, pMsg); code = vmProcessDropVnodeReq(pMgmt, pMsg);
break; break;
......
...@@ -25,6 +25,7 @@ ...@@ -25,6 +25,7 @@
#include "mndUser.h" #include "mndUser.h"
#include "mndVgroup.h" #include "mndVgroup.h"
#include "tmisce.h" #include "tmisce.h"
#include "mndCluster.h"
#define TSDB_DNODE_VER_NUMBER 1 #define TSDB_DNODE_VER_NUMBER 1
#define TSDB_DNODE_RESERVE_SIZE 64 #define TSDB_DNODE_RESERVE_SIZE 64
...@@ -366,6 +367,14 @@ static int32_t mndProcessStatusReq(SRpcMsg *pReq) { ...@@ -366,6 +367,14 @@ static int32_t mndProcessStatusReq(SRpcMsg *pReq) {
goto _OVER; goto _OVER;
} }
int64_t clusterid = mndGetClusterId(pMnode);
if (statusReq.clusterId != 0 && statusReq.clusterId != clusterid) {
code = TSDB_CODE_MND_DNODE_DIFF_CLUSTER;
mWarn("dnode:%d, %s, its clusterid:%" PRId64 " differ from current cluster:%" PRId64 ", code:0x%x",
statusReq.dnodeId, statusReq.dnodeEp, statusReq.clusterId, clusterid, code);
goto _OVER;
}
if (statusReq.dnodeId == 0) { if (statusReq.dnodeId == 0) {
pDnode = mndAcquireDnodeByEp(pMnode, statusReq.dnodeEp); pDnode = mndAcquireDnodeByEp(pMnode, statusReq.dnodeEp);
if (pDnode == NULL) { if (pDnode == NULL) {
......
...@@ -61,7 +61,7 @@ int32_t mndInitVgroup(SMnode *pMnode) { ...@@ -61,7 +61,7 @@ int32_t mndInitVgroup(SMnode *pMnode) {
mndSetMsgHandle(pMnode, TDMT_DND_DROP_VNODE_RSP, mndTransProcessRsp); mndSetMsgHandle(pMnode, TDMT_DND_DROP_VNODE_RSP, mndTransProcessRsp);
mndSetMsgHandle(pMnode, TDMT_VND_COMPACT_RSP, mndTransProcessRsp); mndSetMsgHandle(pMnode, TDMT_VND_COMPACT_RSP, mndTransProcessRsp);
mndSetMsgHandle(pMnode, TDMT_VND_DISABLE_WRITE_RSP, mndTransProcessRsp); mndSetMsgHandle(pMnode, TDMT_VND_DISABLE_WRITE_RSP, mndTransProcessRsp);
mndSetMsgHandle(pMnode, TDMT_DND_FORCE_ELECTION_RSP, mndTransProcessRsp); mndSetMsgHandle(pMnode, TDMT_SYNC_FORCE_FOLLOWER_RSP, mndTransProcessRsp);
mndSetMsgHandle(pMnode, TDMT_MND_REDISTRIBUTE_VGROUP, mndProcessRedistributeVgroupMsg); mndSetMsgHandle(pMnode, TDMT_MND_REDISTRIBUTE_VGROUP, mndProcessRedistributeVgroupMsg);
mndSetMsgHandle(pMnode, TDMT_MND_SPLIT_VGROUP, mndProcessSplitVgroupMsg); mndSetMsgHandle(pMnode, TDMT_MND_SPLIT_VGROUP, mndProcessSplitVgroupMsg);
...@@ -1779,17 +1779,18 @@ _OVER: ...@@ -1779,17 +1779,18 @@ _OVER:
return code; return code;
} }
static void *mndBuildSForceElectionReq(SMnode *pMnode, SVgObj *pVgroup, int32_t dnodeId, static void *mndBuildSForceBecomeFollowerReq(SMnode *pMnode, SVgObj *pVgroup, int32_t dnodeId,
int32_t *pContLen) { int32_t *pContLen) {
SForceElectionReq balanceReq = { SForceBecomeFollowerReq balanceReq = {
.vgId = pVgroup->vgId, .vgId = pVgroup->vgId,
}; };
int32_t contLen = tSerializeSForceElectionReq(NULL, 0, &balanceReq); int32_t contLen = tSerializeSForceBecomeFollowerReq(NULL, 0, &balanceReq);
if (contLen < 0) { if (contLen < 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL; return NULL;
} }
contLen += sizeof(SMsgHead);
void *pReq = taosMemoryMalloc(contLen); void *pReq = taosMemoryMalloc(contLen);
if (pReq == NULL) { if (pReq == NULL) {
...@@ -1797,9 +1798,13 @@ static void *mndBuildSForceElectionReq(SMnode *pMnode, SVgObj *pVgroup, int32_t ...@@ -1797,9 +1798,13 @@ static void *mndBuildSForceElectionReq(SMnode *pMnode, SVgObj *pVgroup, int32_t
return NULL; return NULL;
} }
tSerializeSForceElectionReq((char *)pReq, contLen, &balanceReq); SMsgHead *pHead = pReq;
pHead->contLen = htonl(contLen);
pHead->vgId = htonl(pVgroup->vgId);
tSerializeSForceBecomeFollowerReq((char *)pReq + sizeof(SMsgHead), contLen, &balanceReq);
*pContLen = contLen; *pContLen = contLen;
return pReq; return pReq;
} }
int32_t mndAddBalanceVgroupLeaderAction(SMnode *pMnode, STrans *pTrans, SVgObj *pVgroup, int32_t dnodeId) { int32_t mndAddBalanceVgroupLeaderAction(SMnode *pMnode, STrans *pTrans, SVgObj *pVgroup, int32_t dnodeId) {
...@@ -1811,12 +1816,12 @@ int32_t mndAddBalanceVgroupLeaderAction(SMnode *pMnode, STrans *pTrans, SVgObj * ...@@ -1811,12 +1816,12 @@ int32_t mndAddBalanceVgroupLeaderAction(SMnode *pMnode, STrans *pTrans, SVgObj *
mndReleaseDnode(pMnode, pDnode); mndReleaseDnode(pMnode, pDnode);
int32_t contLen = 0; int32_t contLen = 0;
void *pReq = mndBuildSForceElectionReq(pMnode, pVgroup, dnodeId, &contLen); void *pReq = mndBuildSForceBecomeFollowerReq(pMnode, pVgroup, dnodeId, &contLen);
if (pReq == NULL) return -1; if (pReq == NULL) return -1;
action.pCont = pReq; action.pCont = pReq;
action.contLen = contLen; action.contLen = contLen;
action.msgType = TDMT_DND_FORCE_ELECTION; action.msgType = TDMT_SYNC_FORCE_FOLLOWER;
if (mndTransAppendRedoAction(pTrans, &action) != 0) { if (mndTransAppendRedoAction(pTrans, &action) != 0) {
taosMemoryFree(pReq); taosMemoryFree(pReq);
...@@ -1832,13 +1837,20 @@ int32_t mndAddVgroupBalanceToTrans(SMnode *pMnode, SVgObj *pVgroup, STrans *pTra ...@@ -1832,13 +1837,20 @@ int32_t mndAddVgroupBalanceToTrans(SMnode *pMnode, SVgObj *pVgroup, STrans *pTra
int32_t vgid = pVgroup->vgId; int32_t vgid = pVgroup->vgId;
int8_t replica = pVgroup->replica; int8_t replica = pVgroup->replica;
if(pVgroup->replica <= 1) { if(pVgroup->replica <= 1) {
mInfo("trans:%d, vgid:%d no need to balance, replica:%d", pTrans->id, vgid, replica); mInfo("trans:%d, vgid:%d no need to balance, replica:%d", pTrans->id, vgid, replica);
return -1; return -1;
} }
int32_t index = vgid%replica; int32_t dnodeId = pVgroup->vnodeGid[0].dnodeId;
int32_t dnodeId = pVgroup->vnodeGid[index].dnodeId;
for(int i = 0; i < replica; i++)
{
if(pVgroup->vnodeGid[i].syncState == TAOS_SYNC_STATE_LEADER){
dnodeId = pVgroup->vnodeGid[i].dnodeId;
break;
}
}
bool exist = false; bool exist = false;
bool online = false; bool online = false;
......
...@@ -56,7 +56,6 @@ void vnodeDestroy(const char *path, STfs *pTfs); ...@@ -56,7 +56,6 @@ void vnodeDestroy(const char *path, STfs *pTfs);
SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb); SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb);
void vnodePreClose(SVnode *pVnode); void vnodePreClose(SVnode *pVnode);
void vnodePostClose(SVnode *pVnode); void vnodePostClose(SVnode *pVnode);
void vnodeForceElection(SVnode *pVnode);
void vnodeSyncCheckTimeout(SVnode *pVnode); void vnodeSyncCheckTimeout(SVnode *pVnode);
void vnodeClose(SVnode *pVnode); void vnodeClose(SVnode *pVnode);
int32_t vnodeSyncCommit(SVnode *pVnode); int32_t vnodeSyncCommit(SVnode *pVnode);
......
...@@ -380,10 +380,6 @@ void vnodePreClose(SVnode *pVnode) { ...@@ -380,10 +380,6 @@ void vnodePreClose(SVnode *pVnode) {
void vnodePostClose(SVnode *pVnode) { vnodeSyncPostClose(pVnode); } void vnodePostClose(SVnode *pVnode) { vnodeSyncPostClose(pVnode); }
void vnodeForceElection(SVnode *pVnode) {
syncLeaderForceElection(pVnode->sync);
}
void vnodeClose(SVnode *pVnode) { void vnodeClose(SVnode *pVnode) {
if (pVnode) { if (pVnode) {
tsem_wait(&pVnode->canCommit); tsem_wait(&pVnode->canCommit);
......
...@@ -1210,14 +1210,19 @@ int32_t udfAggFinalize(struct SqlFunctionCtx *pCtx, SSDataBlock *pBlock) { ...@@ -1210,14 +1210,19 @@ int32_t udfAggFinalize(struct SqlFunctionCtx *pCtx, SSDataBlock *pBlock) {
fnError("udfAggFinalize error. doCallUdfAggFinalize step. udf code:%d", udfCallCode); fnError("udfAggFinalize error. doCallUdfAggFinalize step. udf code:%d", udfCallCode);
GET_RES_INFO(pCtx)->numOfRes = 0; GET_RES_INFO(pCtx)->numOfRes = 0;
} else { } else {
if (resultBuf.bufLen <= session->bytes) { if (resultBuf.numOfResult == 0) {
memcpy(udfRes->finalResBuf, resultBuf.buf, resultBuf.bufLen); udfRes->finalResNum = 0;
udfRes->finalResNum = resultBuf.numOfResult;
GET_RES_INFO(pCtx)->numOfRes = udfRes->finalResNum;
} else {
fnError("udfc inter buf size %d is greater than function output size %d", resultBuf.bufLen, session->bytes);
GET_RES_INFO(pCtx)->numOfRes = 0; GET_RES_INFO(pCtx)->numOfRes = 0;
udfCallCode = TSDB_CODE_UDF_INVALID_OUTPUT_TYPE; } else {
if (resultBuf.bufLen <= session->bytes) {
memcpy(udfRes->finalResBuf, resultBuf.buf, resultBuf.bufLen);
udfRes->finalResNum = resultBuf.numOfResult;
GET_RES_INFO(pCtx)->numOfRes = udfRes->finalResNum;
} else {
fnError("udfc inter buf size %d is greater than function output size %d", resultBuf.bufLen, session->bytes);
GET_RES_INFO(pCtx)->numOfRes = 0;
udfCallCode = TSDB_CODE_UDF_INVALID_OUTPUT_TYPE;
}
} }
} }
......
...@@ -37,6 +37,7 @@ ...@@ -37,6 +37,7 @@
#include "syncVoteMgr.h" #include "syncVoteMgr.h"
#include "tglobal.h" #include "tglobal.h"
#include "tref.h" #include "tref.h"
#include "syncUtil.h"
static void syncNodeEqPingTimer(void* param, void* tmrId); static void syncNodeEqPingTimer(void* param, void* tmrId);
static void syncNodeEqElectTimer(void* param, void* tmrId); static void syncNodeEqElectTimer(void* param, void* tmrId);
...@@ -206,6 +207,9 @@ int32_t syncProcessMsg(int64_t rid, SRpcMsg* pMsg) { ...@@ -206,6 +207,9 @@ int32_t syncProcessMsg(int64_t rid, SRpcMsg* pMsg) {
case TDMT_SYNC_LOCAL_CMD: case TDMT_SYNC_LOCAL_CMD:
code = syncNodeOnLocalCmd(pSyncNode, pMsg); code = syncNodeOnLocalCmd(pSyncNode, pMsg);
break; break;
case TDMT_SYNC_FORCE_FOLLOWER:
code = syncForceBecomeFollower(pSyncNode, pMsg);
break;
default: default:
terrno = TSDB_CODE_MSG_NOT_PROCESSED; terrno = TSDB_CODE_MSG_NOT_PROCESSED;
code = -1; code = -1;
...@@ -228,13 +232,18 @@ int32_t syncLeaderTransfer(int64_t rid) { ...@@ -228,13 +232,18 @@ int32_t syncLeaderTransfer(int64_t rid) {
return ret; return ret;
} }
int32_t syncLeaderForceElection(int64_t rid) { int32_t syncForceBecomeFollower(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
SSyncNode* pSyncNode = syncNodeAcquire(rid); syncNodeBecomeFollower(ths, "force election");
if (pSyncNode == NULL) return -1;
int32_t ret = syncNodeElect(pSyncNode); SRpcMsg rsp = {
syncNodeRelease(pSyncNode); .code = 0,
return ret; .pCont = pRpcMsg->info.rsp,
.contLen = pRpcMsg->info.rspLen,
.info = pRpcMsg->info,
};
tmsgSendRsp(&rsp);
return 0;
} }
int32_t syncSendTimeoutRsp(int64_t rid, int64_t seq) { int32_t syncSendTimeoutRsp(int64_t rid, int64_t seq) {
...@@ -2295,6 +2304,14 @@ int32_t syncNodeOnHeartbeat(SSyncNode* ths, const SRpcMsg* pRpcMsg) { ...@@ -2295,6 +2304,14 @@ int32_t syncNodeOnHeartbeat(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
int64_t timeDiff = tsMs - pMsg->timeStamp; int64_t timeDiff = tsMs - pMsg->timeStamp;
syncLogRecvHeartbeat(ths, pMsg, timeDiff, tbuf); syncLogRecvHeartbeat(ths, pMsg, timeDiff, tbuf);
if (!syncNodeInRaftGroup(ths, &pMsg->srcId)) {
sWarn(
"vgId:%d, drop heartbeat msg from dnode:%d, because it come from another cluster:%d, differ from current "
"cluster:%d",
ths->vgId, DID(&(pMsg->srcId)), CID(&(pMsg->srcId)), CID(&(ths->myRaftId)));
return 0;
}
SRpcMsg rpcMsg = {0}; SRpcMsg rpcMsg = {0};
(void)syncBuildHeartbeatReply(&rpcMsg, ths->vgId); (void)syncBuildHeartbeatReply(&rpcMsg, ths->vgId);
SyncTerm currentTerm = raftStoreGetTerm(ths); SyncTerm currentTerm = raftStoreGetTerm(ths);
......
...@@ -20,6 +20,7 @@ ...@@ -20,6 +20,7 @@
#include "syncRaftStore.h" #include "syncRaftStore.h"
#include "syncUtil.h" #include "syncUtil.h"
#include "syncVoteMgr.h" #include "syncVoteMgr.h"
#include "syncUtil.h"
// TLA+ Spec // TLA+ Spec
// HandleRequestVoteRequest(i, j, m) == // HandleRequestVoteRequest(i, j, m) ==
......
...@@ -413,12 +413,16 @@ struct tm *taosLocalTime(const time_t *timep, struct tm *result) { ...@@ -413,12 +413,16 @@ struct tm *taosLocalTime(const time_t *timep, struct tm *result) {
} }
#ifdef WINDOWS #ifdef WINDOWS
if (*timep < 0) { if (*timep < 0) {
return NULL;
// TODO: bugs in following code
SYSTEMTIME ss, s; SYSTEMTIME ss, s;
FILETIME ff, f; FILETIME ff, f;
LARGE_INTEGER offset; LARGE_INTEGER offset;
struct tm tm1; struct tm tm1;
time_t tt = 0; time_t tt = 0;
localtime_s(&tm1, &tt); if (localtime_s(&tm1, &tt) != 0 ) {
return NULL;
}
ss.wYear = tm1.tm_year + 1900; ss.wYear = tm1.tm_year + 1900;
ss.wMonth = tm1.tm_mon + 1; ss.wMonth = tm1.tm_mon + 1;
ss.wDay = tm1.tm_mday; ss.wDay = tm1.tm_mday;
...@@ -444,7 +448,9 @@ struct tm *taosLocalTime(const time_t *timep, struct tm *result) { ...@@ -444,7 +448,9 @@ struct tm *taosLocalTime(const time_t *timep, struct tm *result) {
result->tm_yday = 0; result->tm_yday = 0;
result->tm_isdst = 0; result->tm_isdst = 0;
} else { } else {
localtime_s(result, timep); if (localtime_s(result, timep) != 0) {
return NULL;
}
} }
#else #else
localtime_r(timep, result); localtime_r(timep, result);
...@@ -469,12 +475,16 @@ struct tm *taosLocalTimeNolock(struct tm *result, const time_t *timep, int dst) ...@@ -469,12 +475,16 @@ struct tm *taosLocalTimeNolock(struct tm *result, const time_t *timep, int dst)
} }
#ifdef WINDOWS #ifdef WINDOWS
if (*timep < 0) { if (*timep < 0) {
return NULL;
// TODO: bugs in following code
SYSTEMTIME ss, s; SYSTEMTIME ss, s;
FILETIME ff, f; FILETIME ff, f;
LARGE_INTEGER offset; LARGE_INTEGER offset;
struct tm tm1; struct tm tm1;
time_t tt = 0; time_t tt = 0;
localtime_s(&tm1, &tt); if (localtime_s(&tm1, &tt) != 0) {
return NULL;
}
ss.wYear = tm1.tm_year + 1900; ss.wYear = tm1.tm_year + 1900;
ss.wMonth = tm1.tm_mon + 1; ss.wMonth = tm1.tm_mon + 1;
ss.wDay = tm1.tm_mday; ss.wDay = tm1.tm_mday;
...@@ -500,7 +510,9 @@ struct tm *taosLocalTimeNolock(struct tm *result, const time_t *timep, int dst) ...@@ -500,7 +510,9 @@ struct tm *taosLocalTimeNolock(struct tm *result, const time_t *timep, int dst)
result->tm_yday = 0; result->tm_yday = 0;
result->tm_isdst = 0; result->tm_isdst = 0;
} else { } else {
localtime_s(result, timep); if (localtime_s(result, timep) != 0) {
return NULL;
}
} }
#elif defined(LINUX) #elif defined(LINUX)
time_t secsMin = 60, secsHour = 3600, secsDay = 3600 * 24; time_t secsMin = 60, secsHour = 3600, secsDay = 3600 * 24;
......
...@@ -291,7 +291,10 @@ char *shellFormatTimestamp(char *buf, int64_t val, int32_t precision) { ...@@ -291,7 +291,10 @@ char *shellFormatTimestamp(char *buf, int64_t val, int32_t precision) {
} }
struct tm ptm = {0}; struct tm ptm = {0};
taosLocalTime(&tt, &ptm); if (taosLocalTime(&tt, &ptm) == NULL) {
sprintf(buf, "NaN");
return buf;
}
size_t pos = strftime(buf, 35, "%Y-%m-%d %H:%M:%S", &ptm); size_t pos = strftime(buf, 35, "%Y-%m-%d %H:%M:%S", &ptm);
if (precision == TSDB_TIME_PRECISION_NANO) { if (precision == TSDB_TIME_PRECISION_NANO) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册