未验证 提交 46c732db 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #4600 from taosdata/feature/wal

Feature/wal
...@@ -267,6 +267,12 @@ TAOS_DEFINE_ERROR(TSDB_CODE_SYN_NOT_ENABLED, 0, 0x0901, "Sync modul ...@@ -267,6 +267,12 @@ TAOS_DEFINE_ERROR(TSDB_CODE_SYN_NOT_ENABLED, 0, 0x0901, "Sync modul
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_INVALID_VERSION, 0, 0x0902, "Invalid Sync version") TAOS_DEFINE_ERROR(TSDB_CODE_SYN_INVALID_VERSION, 0, 0x0902, "Invalid Sync version")
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_CONFIRM_EXPIRED, 0, 0x0903, "Sync confirm expired") TAOS_DEFINE_ERROR(TSDB_CODE_SYN_CONFIRM_EXPIRED, 0, 0x0903, "Sync confirm expired")
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_TOO_MANY_FWDINFO, 0, 0x0904, "Too many sync fwd infos") TAOS_DEFINE_ERROR(TSDB_CODE_SYN_TOO_MANY_FWDINFO, 0, 0x0904, "Too many sync fwd infos")
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_MISMATCHED_PROTOCOL, 0, 0x0905, "Mismatched protocol")
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_MISMATCHED_CLUSTERID, 0, 0x0906, "Mismatched clusterId")
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_MISMATCHED_SIGNATURE, 0, 0x0907, "Mismatched signature")
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_INVALID_CHECKSUM, 0, 0x0908, "Invalid msg checksum")
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_INVALID_MSGLEN, 0, 0x0909, "Invalid msg length")
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_INVALID_MSGTYPE, 0, 0x090A, "Invalid msg type")
// wal // wal
TAOS_DEFINE_ERROR(TSDB_CODE_WAL_APP_ERROR, 0, 0x1000, "Unexpected generic error in wal") TAOS_DEFINE_ERROR(TSDB_CODE_WAL_APP_ERROR, 0, 0x1000, "Unexpected generic error in wal")
......
...@@ -119,10 +119,6 @@ int32_t syncGetNodesRole(int64_t rid, SNodesRole *); ...@@ -119,10 +119,6 @@ int32_t syncGetNodesRole(int64_t rid, SNodesRole *);
extern char *syncRole[]; extern char *syncRole[];
//global configurable parameters //global configurable parameters
extern int32_t tsMaxSyncNum;
extern int32_t tsSyncTcpThreads;
extern int32_t tsSyncTimer;
extern int32_t tsMaxFwdInfo;
extern int32_t sDebugFlag; extern int32_t sDebugFlag;
extern char tsArbitrator[]; extern char tsArbitrator[];
extern uint16_t tsSyncPort; extern uint16_t tsSyncPort;
......
...@@ -48,7 +48,7 @@ typedef struct { ...@@ -48,7 +48,7 @@ typedef struct {
void * pVnode; void * pVnode;
SRpcMsg rpcMsg; SRpcMsg rpcMsg;
SRspRet rspRet; SRspRet rspRet;
char reserveForSync[16]; char reserveForSync[24];
SWalHead pHead[]; SWalHead pHead[];
} SVWriteMsg; } SVWriteMsg;
......
...@@ -59,7 +59,7 @@ typedef struct SSdbRow { ...@@ -59,7 +59,7 @@ typedef struct SSdbRow {
SMnodeMsg *pMsg; SMnodeMsg *pMsg;
int32_t (*fpReq)(SMnodeMsg *pMsg); int32_t (*fpReq)(SMnodeMsg *pMsg);
int32_t (*fpRsp)(SMnodeMsg *pMsg, int32_t code); int32_t (*fpRsp)(SMnodeMsg *pMsg, int32_t code);
char reserveForSync[16]; char reserveForSync[24];
SWalHead pHead[]; SWalHead pHead[];
} SSdbRow; } SSdbRow;
......
...@@ -5,12 +5,12 @@ INCLUDE_DIRECTORIES(inc) ...@@ -5,12 +5,12 @@ INCLUDE_DIRECTORIES(inc)
AUX_SOURCE_DIRECTORY(src SRC) AUX_SOURCE_DIRECTORY(src SRC)
IF (TD_LINUX) IF (TD_LINUX)
LIST(REMOVE_ITEM SRC src/tarbitrator.c) LIST(REMOVE_ITEM SRC src/syncArbitrator.c)
ADD_LIBRARY(sync ${SRC}) ADD_LIBRARY(sync ${SRC})
TARGET_LINK_LIBRARIES(sync tutil pthread common) TARGET_LINK_LIBRARIES(sync tutil pthread common)
LIST(APPEND BIN_SRC src/tarbitrator.c) LIST(APPEND BIN_SRC src/syncArbitrator.c)
LIST(APPEND BIN_SRC src/taosTcpPool.c) LIST(APPEND BIN_SRC src/syncTcp.c)
ADD_EXECUTABLE(tarbitrator ${BIN_SRC}) ADD_EXECUTABLE(tarbitrator ${BIN_SRC})
TARGET_LINK_LIBRARIES(tarbitrator sync common osdetail tutil) TARGET_LINK_LIBRARIES(tarbitrator sync common osdetail tutil)
......
...@@ -13,12 +13,14 @@ ...@@ -13,12 +13,14 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#ifndef TDENGINE_SYNCINT_H #ifndef TDENGINE_SYNC_INT_H
#define TDENGINE_SYNCINT_H #define TDENGINE_SYNC_INT_H
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
#endif #endif
#include "syncMsg.h"
#include "twal.h"
#define sFatal(...) { if (sDebugFlag & DEBUG_FATAL) { taosPrintLog("SYN FATAL ", sDebugFlag, __VA_ARGS__); }} #define sFatal(...) { if (sDebugFlag & DEBUG_FATAL) { taosPrintLog("SYN FATAL ", sDebugFlag, __VA_ARGS__); }}
#define sError(...) { if (sDebugFlag & DEBUG_ERROR) { taosPrintLog("SYN ERROR ", sDebugFlag, __VA_ARGS__); }} #define sError(...) { if (sDebugFlag & DEBUG_ERROR) { taosPrintLog("SYN ERROR ", sDebugFlag, __VA_ARGS__); }}
...@@ -27,86 +29,22 @@ extern "C" { ...@@ -27,86 +29,22 @@ extern "C" {
#define sDebug(...) { if (sDebugFlag & DEBUG_DEBUG) { taosPrintLog("SYN ", sDebugFlag, __VA_ARGS__); }} #define sDebug(...) { if (sDebugFlag & DEBUG_DEBUG) { taosPrintLog("SYN ", sDebugFlag, __VA_ARGS__); }}
#define sTrace(...) { if (sDebugFlag & DEBUG_TRACE) { taosPrintLog("SYN ", sDebugFlag, __VA_ARGS__); }} #define sTrace(...) { if (sDebugFlag & DEBUG_TRACE) { taosPrintLog("SYN ", sDebugFlag, __VA_ARGS__); }}
typedef enum { #define SYNC_TCP_THREADS 2
TAOS_SMSG_SYNC_DATA = 1, #define SYNC_MAX_NUM 2
TAOS_SMSG_FORWARD = 2,
TAOS_SMSG_FORWARD_RSP = 3,
TAOS_SMSG_SYNC_REQ = 4,
TAOS_SMSG_SYNC_RSP = 5,
TAOS_SMSG_SYNC_MUST = 6,
TAOS_SMSG_STATUS = 7,
TAOS_SMSG_SYNC_DATA_RSP = 8,
} ESyncMsgType;
#define SYNC_MAX_SIZE (TSDB_MAX_WAL_SIZE + sizeof(SWalHead) + sizeof(SSyncHead) + 16) #define SYNC_MAX_SIZE (TSDB_MAX_WAL_SIZE + sizeof(SWalHead) + sizeof(SSyncHead) + 16)
#define SYNC_RECV_BUFFER_SIZE (5*1024*1024) #define SYNC_RECV_BUFFER_SIZE (5*1024*1024)
#define SYNC_MAX_FWDS 512
#define SYNC_FWD_TIMER 300 #define SYNC_FWD_TIMER 300
#define SYNC_ROLE_TIMER 10000 #define SYNC_ROLE_TIMER 15000 // ms
#define SYNC_WAIT_AFTER_CHOOSE_MASTER 3 #define SYNC_CHECK_INTERVAL 1 // ms
#define SYNC_WAIT_AFTER_CHOOSE_MASTER 10 // ms
#define nodeRole pNode->peerInfo[pNode->selfIndex]->role #define nodeRole pNode->peerInfo[pNode->selfIndex]->role
#define nodeVersion pNode->peerInfo[pNode->selfIndex]->version #define nodeVersion pNode->peerInfo[pNode->selfIndex]->version
#define nodeSStatus pNode->peerInfo[pNode->selfIndex]->sstatus #define nodeSStatus pNode->peerInfo[pNode->selfIndex]->sstatus
#pragma pack(push, 1)
typedef struct {
char type; // msg type
char pversion; // protocol version
char reserved[6]; // not used
int32_t vgId; // vg ID
int32_t len; // content length, does not include head
// char cont[]; // message content starts from here
} SSyncHead;
typedef struct {
SSyncHead syncHead;
uint16_t port;
uint16_t tranId;
char fqdn[TSDB_FQDN_LEN];
int32_t sourceId; // only for arbitrator
} SFirstPkt;
typedef struct {
int8_t sync;
int8_t reserved;
uint16_t tranId;
} SFirstPktRsp;
typedef struct {
int8_t role;
uint64_t version;
} SPeerStatus;
typedef struct {
int8_t role;
int8_t ack;
int8_t type;
int8_t reserved[3];
uint16_t tranId;
uint64_t version;
SPeerStatus peersStatus[];
} SPeersStatus;
typedef struct {
char name[TSDB_FILENAME_LEN];
uint32_t magic;
uint32_t index;
uint64_t fversion;
int64_t size;
} SFileInfo;
typedef struct {
int8_t sync;
} SFileAck;
typedef struct {
uint64_t version;
int32_t code;
} SFwdRsp;
#pragma pack(pop)
typedef struct { typedef struct {
char * buffer; char * buffer;
int32_t bufferSize; int32_t bufferSize;
...@@ -190,7 +128,6 @@ void syncRestartConnection(SSyncPeer *pPeer); ...@@ -190,7 +128,6 @@ void syncRestartConnection(SSyncPeer *pPeer);
void syncBroadcastStatus(SSyncNode *pNode); void syncBroadcastStatus(SSyncNode *pNode);
void syncAddPeerRef(SSyncPeer *pPeer); void syncAddPeerRef(SSyncPeer *pPeer);
int32_t syncDecPeerRef(SSyncPeer *pPeer); int32_t syncDecPeerRef(SSyncPeer *pPeer);
uint16_t syncGenTranId();
#ifdef __cplusplus #ifdef __cplusplus
} }
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_SYNC_MSG_H
#define TDENGINE_SYNC_MSG_H
#ifdef __cplusplus
extern "C" {
#endif
#include "tsync.h"
typedef enum {
TAOS_SMSG_START = 0,
TAOS_SMSG_SYNC_DATA = 1,
TAOS_SMSG_SYNC_DATA_RSP = 2,
TAOS_SMSG_SYNC_FWD = 3,
TAOS_SMSG_SYNC_FWD_RSP = 4,
TAOS_SMSG_SYNC_REQ = 5,
TAOS_SMSG_SYNC_REQ_RSP = 6,
TAOS_SMSG_SYNC_MUST = 7,
TAOS_SMSG_SYNC_MUST_RSP = 8,
TAOS_SMSG_STATUS = 9,
TAOS_SMSG_STATUS_RSP = 10,
TAOS_SMSG_SETUP = 11,
TAOS_SMSG_SETUP_RSP = 12,
TAOS_SMSG_SYNC_FILE = 13,
TAOS_SMSG_SYNC_FILE_RSP = 14,
TAOS_SMSG_END = 15,
} ESyncMsgType;
typedef enum {
SYNC_STATUS_BROADCAST,
SYNC_STATUS_BROADCAST_RSP,
SYNC_STATUS_SETUP_CONN,
SYNC_STATUS_SETUP_CONN_RSP,
SYNC_STATUS_EXCHANGE_DATA,
SYNC_STATUS_EXCHANGE_DATA_RSP,
SYNC_STATUS_CHECK_ROLE,
SYNC_STATUS_CHECK_ROLE_RSP
} ESyncStatusType;
#pragma pack(push, 1)
typedef struct {
int8_t type; // msg type
int8_t protocol; // protocol version
uint16_t signature; // fixed value
int32_t code; //
int32_t cId; // cluster Id
int32_t vgId; // vg ID
int32_t len; // content length, does not include head
uint32_t cksum;
} SSyncHead;
typedef struct {
SSyncHead head;
uint16_t port;
uint16_t tranId;
int32_t sourceId; // only for arbitrator
char fqdn[TSDB_FQDN_LEN];
} SSyncMsg;
typedef struct {
SSyncHead head;
int8_t sync;
int8_t reserved;
uint16_t tranId;
int8_t reserverd[4];
} SSyncRsp;
typedef struct {
int8_t role;
uint64_t version;
} SPeerStatus;
typedef struct {
SSyncHead head;
int8_t role;
int8_t ack;
int8_t type;
int8_t reserved[3];
uint16_t tranId;
uint64_t version;
SPeerStatus peersStatus[TAOS_SYNC_MAX_REPLICA];
} SPeersStatus;
typedef struct {
SSyncHead head;
char name[TSDB_FILENAME_LEN];
uint32_t magic;
uint32_t index;
uint64_t fversion;
int64_t size;
} SFileInfo;
typedef struct {
SSyncHead head;
int8_t sync;
} SFileAck;
typedef struct {
SSyncHead head;
uint64_t version;
int32_t code;
} SFwdRsp;
#pragma pack(pop)
#define SYNC_PROTOCOL_VERSION 1
#define SYNC_SIGNATURE ((uint16_t)(0xCDEF))
extern char *statusType[];
uint16_t syncGenTranId();
int32_t syncCheckHead(SSyncHead *pHead);
void syncBuildSyncFwdMsg(SSyncHead *pHead, int32_t vgId, int32_t len);
void syncBuildSyncFwdRsp(SFwdRsp *pMsg, int32_t vgId, uint64_t version, int32_t code);
void syncBuildSyncReqMsg(SSyncMsg *pMsg, int32_t vgId);
void syncBuildSyncDataMsg(SSyncMsg *pMsg, int32_t vgId);
void syncBuildSyncSetupMsg(SSyncMsg *pMsg, int32_t vgId);
void syncBuildPeersStatus(SPeersStatus *pMsg, int32_t vgId);
void syncBuildFileAck(SFileAck *pMsg, int32_t vgId);
void syncBuildFileInfo(SFileInfo *pMsg, int32_t vgId);
#ifdef __cplusplus
}
#endif
#endif // TDENGINE_VNODEPEER_H
...@@ -13,16 +13,13 @@ ...@@ -13,16 +13,13 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#ifndef TDENGINE_TCP_POOL_H #ifndef TDENGINE_SYNC_TCP_POOL_H
#define TDENGINE_TCP_POOL_H #define TDENGINE_SYNC_TCP_POOL_H
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
#endif #endif
typedef void *ttpool_h;
typedef void *tthread_h;
typedef struct { typedef struct {
int32_t numOfThreads; int32_t numOfThreads;
uint32_t serverIp; uint32_t serverIp;
...@@ -33,10 +30,10 @@ typedef struct { ...@@ -33,10 +30,10 @@ typedef struct {
void (*processIncomingConn)(int32_t fd, uint32_t ip); void (*processIncomingConn)(int32_t fd, uint32_t ip);
} SPoolInfo; } SPoolInfo;
ttpool_h taosOpenTcpThreadPool(SPoolInfo *pInfo); void *syncOpenTcpThreadPool(SPoolInfo *pInfo);
void taosCloseTcpThreadPool(ttpool_h); void syncCloseTcpThreadPool(void *);
void * taosAllocateTcpConn(void *, void *ahandle, int32_t connFd); void *syncAllocateTcpConn(void *, void *ahandle, int32_t connFd);
void taosFreeTcpConn(void *); void syncFreeTcpConn(void *);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -22,17 +22,17 @@ ...@@ -22,17 +22,17 @@
#include "tsocket.h" #include "tsocket.h"
#include "tglobal.h" #include "tglobal.h"
#include "taoserror.h" #include "taoserror.h"
#include "taosTcpPool.h"
#include "twal.h" #include "twal.h"
#include "tsync.h" #include "tsync.h"
#include "syncInt.h" #include "syncInt.h"
#include "syncTcp.h"
static void arbSignalHandler(int32_t signum, siginfo_t *sigInfo, void *context); static void arbSignalHandler(int32_t signum, siginfo_t *sigInfo, void *context);
static void arbProcessIncommingConnection(int32_t connFd, uint32_t sourceIp); static void arbProcessIncommingConnection(int32_t connFd, uint32_t sourceIp);
static void arbProcessBrokenLink(void *param); static void arbProcessBrokenLink(void *param);
static int32_t arbProcessPeerMsg(void *param, void *buffer); static int32_t arbProcessPeerMsg(void *param, void *buffer);
static tsem_t tsArbSem; static tsem_t tsArbSem;
static ttpool_h tsArbTcpPool; static void * tsArbTcpPool;
typedef struct { typedef struct {
char id[TSDB_EP_LEN + 24]; char id[TSDB_EP_LEN + 24];
...@@ -90,7 +90,7 @@ int32_t main(int32_t argc, char *argv[]) { ...@@ -90,7 +90,7 @@ int32_t main(int32_t argc, char *argv[]) {
info.processBrokenLink = arbProcessBrokenLink; info.processBrokenLink = arbProcessBrokenLink;
info.processIncomingMsg = arbProcessPeerMsg; info.processIncomingMsg = arbProcessPeerMsg;
info.processIncomingConn = arbProcessIncommingConnection; info.processIncomingConn = arbProcessIncommingConnection;
tsArbTcpPool = taosOpenTcpThreadPool(&info); tsArbTcpPool = syncOpenTcpThreadPool(&info);
if (tsArbTcpPool == NULL) { if (tsArbTcpPool == NULL) {
sDebug("failed to open TCP thread pool, exit..."); sDebug("failed to open TCP thread pool, exit...");
...@@ -101,8 +101,8 @@ int32_t main(int32_t argc, char *argv[]) { ...@@ -101,8 +101,8 @@ int32_t main(int32_t argc, char *argv[]) {
tsem_wait(&tsArbSem); tsem_wait(&tsArbSem);
taosCloseTcpThreadPool(tsArbTcpPool); syncCloseTcpThreadPool(tsArbTcpPool);
sInfo("TAOS arbitrator is shut down\n"); sInfo("TAOS arbitrator is shut down");
closelog(); closelog();
return 0; return 0;
...@@ -113,9 +113,9 @@ static void arbProcessIncommingConnection(int32_t connFd, uint32_t sourceIp) { ...@@ -113,9 +113,9 @@ static void arbProcessIncommingConnection(int32_t connFd, uint32_t sourceIp) {
tinet_ntoa(ipstr, sourceIp); tinet_ntoa(ipstr, sourceIp);
sDebug("peer TCP connection from ip:%s", ipstr); sDebug("peer TCP connection from ip:%s", ipstr);
SFirstPkt firstPkt; SSyncMsg msg;
if (taosReadMsg(connFd, &firstPkt, sizeof(firstPkt)) != sizeof(firstPkt)) { if (taosReadMsg(connFd, &msg, sizeof(SSyncMsg)) != sizeof(SSyncMsg)) {
sError("failed to read peer first pkt from ip:%s since %s", ipstr, strerror(errno)); sError("failed to read peer sync msg from ip:%s since %s", ipstr, strerror(errno));
taosCloseSocket(connFd); taosCloseSocket(connFd);
return; return;
} }
...@@ -127,9 +127,9 @@ static void arbProcessIncommingConnection(int32_t connFd, uint32_t sourceIp) { ...@@ -127,9 +127,9 @@ static void arbProcessIncommingConnection(int32_t connFd, uint32_t sourceIp) {
return; return;
} }
firstPkt.fqdn[sizeof(firstPkt.fqdn) - 1] = 0; msg.fqdn[TSDB_FQDN_LEN - 1] = 0;
snprintf(pNode->id, sizeof(pNode->id), "vgId:%d, peer:%s:%d", firstPkt.sourceId, firstPkt.fqdn, firstPkt.port); snprintf(pNode->id, sizeof(pNode->id), "vgId:%d, peer:%s:%d", msg.sourceId, msg.fqdn, msg.port);
if (firstPkt.syncHead.vgId) { if (msg.head.vgId) {
sDebug("%s, vgId in head is not zero, close the connection", pNode->id); sDebug("%s, vgId in head is not zero, close the connection", pNode->id);
tfree(pNode); tfree(pNode);
taosCloseSocket(connFd); taosCloseSocket(connFd);
...@@ -138,7 +138,7 @@ static void arbProcessIncommingConnection(int32_t connFd, uint32_t sourceIp) { ...@@ -138,7 +138,7 @@ static void arbProcessIncommingConnection(int32_t connFd, uint32_t sourceIp) {
sDebug("%s, arbitrator request is accepted", pNode->id); sDebug("%s, arbitrator request is accepted", pNode->id);
pNode->nodeFd = connFd; pNode->nodeFd = connFd;
pNode->pConn = taosAllocateTcpConn(tsArbTcpPool, pNode, connFd); pNode->pConn = syncAllocateTcpConn(tsArbTcpPool, pNode, connFd);
return; return;
} }
...@@ -156,8 +156,8 @@ static int32_t arbProcessPeerMsg(void *param, void *buffer) { ...@@ -156,8 +156,8 @@ static int32_t arbProcessPeerMsg(void *param, void *buffer) {
int32_t bytes = 0; int32_t bytes = 0;
char * cont = (char *)buffer; char * cont = (char *)buffer;
int32_t hlen = taosReadMsg(pNode->nodeFd, &head, sizeof(head)); int32_t hlen = taosReadMsg(pNode->nodeFd, &head, sizeof(SSyncHead));
if (hlen != sizeof(head)) { if (hlen != sizeof(SSyncHead)) {
sDebug("%s, failed to read msg, hlen:%d", pNode->id, hlen); sDebug("%s, failed to read msg, hlen:%d", pNode->id, hlen);
return -1; return -1;
} }
......
...@@ -23,25 +23,18 @@ ...@@ -23,25 +23,18 @@
#include "tsocket.h" #include "tsocket.h"
#include "tglobal.h" #include "tglobal.h"
#include "taoserror.h" #include "taoserror.h"
#include "taosTcpPool.h"
#include "tqueue.h" #include "tqueue.h"
#include "twal.h" #include "twal.h"
#include "tsync.h" #include "tsync.h"
#include "syncTcp.h"
#include "syncInt.h" #include "syncInt.h"
// global configurable int32_t tsSyncNum = 0; // number of sync in process in whole system
int32_t tsMaxSyncNum = 2; char tsNodeFqdn[TSDB_FQDN_LEN] = {0};
int32_t tsSyncTcpThreads = 2;
int32_t tsMaxFwdInfo = 512;
int32_t tsSyncTimer = 1;
// module global, not configurable static void * tsTcpPool = NULL;
int32_t tsSyncNum; // number of sync in process in whole system
char tsNodeFqdn[TSDB_FQDN_LEN];
static ttpool_h tsTcpPool;
static void * tsSyncTmrCtrl = NULL; static void * tsSyncTmrCtrl = NULL;
static void * tsVgIdHash; static void * tsVgIdHash = NULL;
static int32_t tsSyncRefId = -1; static int32_t tsSyncRefId = -1;
// local functions // local functions
...@@ -80,36 +73,10 @@ char *syncStatus[] = { ...@@ -80,36 +73,10 @@ char *syncStatus[] = {
"invalid" "invalid"
}; };
typedef enum {
SYNC_STATUS_BROADCAST,
SYNC_STATUS_BROADCAST_RSP,
SYNC_STATUS_SETUP_CONN,
SYNC_STATUS_SETUP_CONN_RSP,
SYNC_STATUS_EXCHANGE_DATA,
SYNC_STATUS_EXCHANGE_DATA_RSP,
SYNC_STATUS_CHECK_ROLE,
SYNC_STATUS_CHECK_ROLE_RSP
} ESyncStatusType;
char *statusType[] = {
"broadcast",
"broadcast-rsp",
"setup-conn",
"setup-conn-rsp",
"exchange-data",
"exchange-data-rsp",
"check-role",
"check-role-rsp"
};
uint16_t syncGenTranId() {
return taosRand() & 0XFFFF;
}
int32_t syncInit() { int32_t syncInit() {
SPoolInfo info = {0}; SPoolInfo info = {0};
info.numOfThreads = tsSyncTcpThreads; info.numOfThreads = SYNC_TCP_THREADS;
info.serverIp = 0; info.serverIp = 0;
info.port = tsSyncPort; info.port = tsSyncPort;
info.bufferSize = SYNC_MAX_SIZE; info.bufferSize = SYNC_MAX_SIZE;
...@@ -117,7 +84,7 @@ int32_t syncInit() { ...@@ -117,7 +84,7 @@ int32_t syncInit() {
info.processIncomingMsg = syncProcessPeerMsg; info.processIncomingMsg = syncProcessPeerMsg;
info.processIncomingConn = syncProcessIncommingConnection; info.processIncomingConn = syncProcessIncommingConnection;
tsTcpPool = taosOpenTcpThreadPool(&info); tsTcpPool = syncOpenTcpThreadPool(&info);
if (tsTcpPool == NULL) { if (tsTcpPool == NULL) {
sError("failed to init tcpPool"); sError("failed to init tcpPool");
return -1; return -1;
...@@ -126,16 +93,16 @@ int32_t syncInit() { ...@@ -126,16 +93,16 @@ int32_t syncInit() {
tsSyncTmrCtrl = taosTmrInit(1000, 50, 10000, "SYNC"); tsSyncTmrCtrl = taosTmrInit(1000, 50, 10000, "SYNC");
if (tsSyncTmrCtrl == NULL) { if (tsSyncTmrCtrl == NULL) {
sError("failed to init tmrCtrl"); sError("failed to init tmrCtrl");
taosCloseTcpThreadPool(tsTcpPool); syncCloseTcpThreadPool(tsTcpPool);
tsTcpPool = NULL; tsTcpPool = NULL;
return -1; return -1;
} }
tsVgIdHash = taosHashInit(TSDB_MIN_VNODES, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK); tsVgIdHash = taosHashInit(TSDB_MIN_VNODES, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
if (tsVgIdHash == NULL) { if (tsVgIdHash == NULL) {
sError("failed to init tsVgIdHash"); sError("failed to init vgIdHash");
taosTmrCleanUp(tsSyncTmrCtrl); taosTmrCleanUp(tsSyncTmrCtrl);
taosCloseTcpThreadPool(tsTcpPool); syncCloseTcpThreadPool(tsTcpPool);
tsTcpPool = NULL; tsTcpPool = NULL;
tsSyncTmrCtrl = NULL; tsSyncTmrCtrl = NULL;
return -1; return -1;
...@@ -155,7 +122,7 @@ int32_t syncInit() { ...@@ -155,7 +122,7 @@ int32_t syncInit() {
void syncCleanUp() { void syncCleanUp() {
if (tsTcpPool) { if (tsTcpPool) {
taosCloseTcpThreadPool(tsTcpPool); syncCloseTcpThreadPool(tsTcpPool);
tsTcpPool = NULL; tsTcpPool = NULL;
} }
...@@ -236,7 +203,7 @@ int64_t syncStart(const SSyncInfo *pInfo) { ...@@ -236,7 +203,7 @@ int64_t syncStart(const SSyncInfo *pInfo) {
sInfo("vgId:%d, %d replicas are configured, quorum:%d role:%s", pNode->vgId, pNode->replica, pNode->quorum, sInfo("vgId:%d, %d replicas are configured, quorum:%d role:%s", pNode->vgId, pNode->replica, pNode->quorum,
syncRole[nodeRole]); syncRole[nodeRole]);
pNode->pSyncFwds = calloc(sizeof(SSyncFwds) + tsMaxFwdInfo * sizeof(SFwdInfo), 1); pNode->pSyncFwds = calloc(sizeof(SSyncFwds) + SYNC_MAX_FWDS * sizeof(SFwdInfo), 1);
if (pNode->pSyncFwds == NULL) { if (pNode->pSyncFwds == NULL) {
sError("vgId:%d, no memory to allocate syncFwds", pNode->vgId); sError("vgId:%d, no memory to allocate syncFwds", pNode->vgId);
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
...@@ -336,6 +303,11 @@ int32_t syncReconfig(int64_t rid, const SSyncCfg *pNewCfg) { ...@@ -336,6 +303,11 @@ int32_t syncReconfig(int64_t rid, const SSyncCfg *pNewCfg) {
newPeers[i] = pNode->peerInfo[j]; newPeers[i] = pNode->peerInfo[j];
} }
if (newPeers[i] == NULL) {
sError("vgId:%d, failed to reconfig", pNode->vgId);
return TSDB_CODE_SYN_INVALID_CONFIG;
}
if ((strcmp(pNewNode->nodeFqdn, tsNodeFqdn) == 0) && (pNewNode->nodePort == tsSyncPort)) { if ((strcmp(pNewNode->nodeFqdn, tsNodeFqdn) == 0) && (pNewNode->nodePort == tsSyncPort)) {
pNode->selfIndex = i; pNode->selfIndex = i;
} }
...@@ -384,21 +356,13 @@ void syncConfirmForward(int64_t rid, uint64_t version, int32_t code) { ...@@ -384,21 +356,13 @@ void syncConfirmForward(int64_t rid, uint64_t version, int32_t code) {
SSyncPeer *pPeer = pNode->pMaster; SSyncPeer *pPeer = pNode->pMaster;
if (pPeer && pNode->quorum > 1) { if (pPeer && pNode->quorum > 1) {
char msg[sizeof(SSyncHead) + sizeof(SFwdRsp)] = {0}; SFwdRsp rsp;
syncBuildSyncFwdRsp(&rsp, pNode->vgId, version, code);
SSyncHead *pHead = (SSyncHead *)msg;
pHead->type = TAOS_SMSG_FORWARD_RSP;
pHead->len = sizeof(SFwdRsp);
SFwdRsp *pFwdRsp = (SFwdRsp *)(msg + sizeof(SSyncHead)); if (taosWriteMsg(pPeer->peerFd, &rsp, sizeof(SFwdRsp)) == sizeof(SFwdRsp)) {
pFwdRsp->version = version; sTrace("%s, forward-rsp is sent, code:0x%x hver:%" PRIu64, pPeer->id, code, version);
pFwdRsp->code = code;
int32_t msgLen = sizeof(SSyncHead) + sizeof(SFwdRsp);
if (taosWriteMsg(pPeer->peerFd, msg, msgLen) == msgLen) {
sTrace("%s, forward-rsp is sent, code:%x hver:%" PRIu64, pPeer->id, code, version);
} else { } else {
sDebug("%s, failed to send forward ack, restart", pPeer->id); sDebug("%s, failed to send forward-rsp, restart", pPeer->id);
syncRestartConnection(pPeer); syncRestartConnection(pPeer);
} }
} }
...@@ -509,7 +473,7 @@ static void syncClosePeerConn(SSyncPeer *pPeer) { ...@@ -509,7 +473,7 @@ static void syncClosePeerConn(SSyncPeer *pPeer) {
taosClose(pPeer->syncFd); taosClose(pPeer->syncFd);
if (pPeer->peerFd >= 0) { if (pPeer->peerFd >= 0) {
pPeer->peerFd = -1; pPeer->peerFd = -1;
taosFreeTcpConn(pPeer->pConn); syncFreeTcpConn(pPeer->pConn);
} }
} }
...@@ -779,7 +743,7 @@ static void syncRestartPeer(SSyncPeer *pPeer) { ...@@ -779,7 +743,7 @@ static void syncRestartPeer(SSyncPeer *pPeer) {
int32_t ret = strcmp(pPeer->fqdn, tsNodeFqdn); int32_t ret = strcmp(pPeer->fqdn, tsNodeFqdn);
if (ret > 0 || (ret == 0 && pPeer->port > tsSyncPort)) { if (ret > 0 || (ret == 0 && pPeer->port > tsSyncPort)) {
sDebug("%s, check peer connection in 1000 ms", pPeer->id); sDebug("%s, check peer connection in 1000 ms", pPeer->id);
taosTmrReset(syncCheckPeerConnection, tsSyncTimer * 1000, pPeer, tsSyncTmrCtrl, &pPeer->timer); taosTmrReset(syncCheckPeerConnection, SYNC_CHECK_INTERVAL, pPeer, tsSyncTmrCtrl, &pPeer->timer);
} }
} }
...@@ -857,7 +821,7 @@ static void syncRecoverFromMaster(SSyncPeer *pPeer) { ...@@ -857,7 +821,7 @@ static void syncRecoverFromMaster(SSyncPeer *pPeer) {
taosTmrStopA(&pPeer->timer); taosTmrStopA(&pPeer->timer);
// Ensure the sync of mnode not interrupted // Ensure the sync of mnode not interrupted
if (pNode->vgId != 1 && tsSyncNum >= tsMaxSyncNum) { if (pNode->vgId != 1 && tsSyncNum >= SYNC_MAX_NUM) {
sInfo("%s, %d syncs are in process, try later", pPeer->id, tsSyncNum); sInfo("%s, %d syncs are in process, try later", pPeer->id, tsSyncNum);
taosTmrReset(syncTryRecoverFromMaster, 500 + (pNode->vgId * 10) % 200, pPeer, tsSyncTmrCtrl, &pPeer->timer); taosTmrReset(syncTryRecoverFromMaster, 500 + (pNode->vgId * 10) % 200, pPeer, tsSyncTmrCtrl, &pPeer->timer);
return; return;
...@@ -865,56 +829,42 @@ static void syncRecoverFromMaster(SSyncPeer *pPeer) { ...@@ -865,56 +829,42 @@ static void syncRecoverFromMaster(SSyncPeer *pPeer) {
sDebug("%s, try to sync", pPeer->id); sDebug("%s, try to sync", pPeer->id);
SFirstPkt firstPkt; SSyncMsg msg;
memset(&firstPkt, 0, sizeof(firstPkt)); syncBuildSyncReqMsg(&msg, pNode->vgId);
firstPkt.syncHead.type = TAOS_SMSG_SYNC_REQ;
firstPkt.syncHead.vgId = pNode->vgId; taosTmrReset(syncNotStarted, SYNC_CHECK_INTERVAL, pPeer, tsSyncTmrCtrl, &pPeer->timer);
firstPkt.syncHead.len = sizeof(firstPkt) - sizeof(SSyncHead);
firstPkt.tranId = syncGenTranId(); if (taosWriteMsg(pPeer->peerFd, &msg, sizeof(SSyncMsg)) != sizeof(SSyncMsg)) {
tstrncpy(firstPkt.fqdn, tsNodeFqdn, sizeof(firstPkt.fqdn));
firstPkt.port = tsSyncPort;
taosTmrReset(syncNotStarted, tsSyncTimer * 1000, pPeer, tsSyncTmrCtrl, &pPeer->timer);
if (taosWriteMsg(pPeer->peerFd, &firstPkt, sizeof(firstPkt)) != sizeof(firstPkt)) {
sError("%s, failed to send sync-req to peer", pPeer->id); sError("%s, failed to send sync-req to peer", pPeer->id);
} else { } else {
sInfo("%s, sync-req is sent to peer, tranId:%u, sstatus:%s", pPeer->id, firstPkt.tranId, syncStatus[nodeSStatus]); sInfo("%s, sync-req is sent to peer, tranId:%u, sstatus:%s", pPeer->id, msg.tranId, syncStatus[nodeSStatus]);
} }
} }
static void syncProcessFwdResponse(char *cont, SSyncPeer *pPeer) { static void syncProcessFwdResponse(SFwdRsp *pFwdRsp, SSyncPeer *pPeer) {
SSyncNode *pNode = pPeer->pSyncNode; SSyncNode *pNode = pPeer->pSyncNode;
SFwdRsp * pFwdRsp = (SFwdRsp *)cont;
SSyncFwds *pSyncFwds = pNode->pSyncFwds; SSyncFwds *pSyncFwds = pNode->pSyncFwds;
SFwdInfo * pFwdInfo; SFwdInfo * pFwdInfo;
sTrace("%s, forward-rsp is received, code:%x hver:%" PRIu64, pPeer->id, pFwdRsp->code, pFwdRsp->version); sTrace("%s, forward-rsp is received, code:%x hver:%" PRIu64, pPeer->id, pFwdRsp->code, pFwdRsp->version);
SFwdInfo *pFirst = pSyncFwds->fwdInfo + pSyncFwds->first; SFwdInfo *pFirst = pSyncFwds->fwdInfo + pSyncFwds->first;
bool found = false;
if (pFirst->version <= pFwdRsp->version && pSyncFwds->fwds > 0) { if (pFirst->version <= pFwdRsp->version && pSyncFwds->fwds > 0) {
// find the forwardInfo from first // find the forwardInfo from first
for (int32_t i = 0; i < pSyncFwds->fwds; ++i) { for (int32_t i = 0; i < pSyncFwds->fwds; ++i) {
pFwdInfo = pSyncFwds->fwdInfo + (i + pSyncFwds->first) % tsMaxFwdInfo; pFwdInfo = pSyncFwds->fwdInfo + (i + pSyncFwds->first) % SYNC_MAX_FWDS;
if (pFwdRsp->version == pFwdInfo->version) { if (pFwdRsp->version == pFwdInfo->version) {
found = true;
syncProcessFwdAck(pNode, pFwdInfo, pFwdRsp->code); syncProcessFwdAck(pNode, pFwdInfo, pFwdRsp->code);
syncRemoveConfirmedFwdInfo(pNode); syncRemoveConfirmedFwdInfo(pNode);
break; return;
} }
} }
} }
if (!found) {
sTrace("%s, forward-rsp not found first:%d fwds:%d, code:%x hver:%" PRIu64, pPeer->id, pSyncFwds->first,
pSyncFwds->fwds, pFwdRsp->code, pFwdRsp->version);
syncProcessFwdAck(pNode, pFwdInfo, pFwdRsp->code);
}
} }
static void syncProcessForwardFromPeer(char *cont, SSyncPeer *pPeer) { static void syncProcessForwardFromPeer(char *cont, SSyncPeer *pPeer) {
SSyncNode *pNode = pPeer->pSyncNode; SSyncNode *pNode = pPeer->pSyncNode;
SWalHead * pHead = (SWalHead *)cont; SWalHead * pHead = (SWalHead *)(cont + sizeof(SSyncHead));
sTrace("%s, forward is received, hver:%" PRIu64 ", len:%d", pPeer->id, pHead->version, pHead->len); sTrace("%s, forward is received, hver:%" PRIu64 ", len:%d", pPeer->id, pHead->version, pHead->len);
...@@ -931,9 +881,8 @@ static void syncProcessForwardFromPeer(char *cont, SSyncPeer *pPeer) { ...@@ -931,9 +881,8 @@ static void syncProcessForwardFromPeer(char *cont, SSyncPeer *pPeer) {
} }
} }
static void syncProcessPeersStatusMsg(char *cont, SSyncPeer *pPeer) { static void syncProcessPeersStatusMsg(SPeersStatus *pPeersStatus, SSyncPeer *pPeer) {
SSyncNode * pNode = pPeer->pSyncNode; SSyncNode *pNode = pPeer->pSyncNode;
SPeersStatus *pPeersStatus = (SPeersStatus *)cont;
sDebug("%s, status is received, self:%s:%s:%" PRIu64 ", peer:%s:%" PRIu64 ", ack:%d tranId:%u type:%s pfd:%d", sDebug("%s, status is received, self:%s:%s:%" PRIu64 ", peer:%s:%" PRIu64 ", ack:%d tranId:%u type:%s pfd:%d",
pPeer->id, syncRole[nodeRole], syncStatus[nodeSStatus], nodeVersion, syncRole[pPeersStatus->role], pPeer->id, syncRole[nodeRole], syncStatus[nodeSStatus], nodeVersion, syncRole[pPeersStatus->role],
...@@ -947,23 +896,22 @@ static void syncProcessPeersStatusMsg(char *cont, SSyncPeer *pPeer) { ...@@ -947,23 +896,22 @@ static void syncProcessPeersStatusMsg(char *cont, SSyncPeer *pPeer) {
} }
} }
static int32_t syncReadPeerMsg(SSyncPeer *pPeer, SSyncHead *pHead, char *cont) { static int32_t syncReadPeerMsg(SSyncPeer *pPeer, SSyncHead *pHead) {
if (pPeer->peerFd < 0) return -1; if (pPeer->peerFd < 0) return -1;
int32_t hlen = taosReadMsg(pPeer->peerFd, pHead, sizeof(SSyncHead)); int32_t hlen = taosReadMsg(pPeer->peerFd, pHead, sizeof(SSyncHead));
if (hlen != sizeof(SSyncHead)) { if (hlen != sizeof(SSyncHead)) {
sDebug("%s, failed to read msg, hlen:%d", pPeer->id, hlen); sDebug("%s, failed to read msg since %s, hlen:%d", pPeer->id, tstrerror(errno), hlen);
return -1; return -1;
} }
// head.len = htonl(head.len); int32_t code = syncCheckHead(pHead);
if (pHead->len < 0) { if (code != 0) {
sError("%s, invalid pkt length, hlen:%d", pPeer->id, pHead->len); sError("%s, failed to check msg head since %s, type:%d", pPeer->id, tstrerror(code), pHead->type);
return -1; return -1;
} }
assert(pHead->len <= TSDB_MAX_WAL_SIZE); int32_t bytes = taosReadMsg(pPeer->peerFd, (char *)pHead + sizeof(SSyncHead), pHead->len);
int32_t bytes = taosReadMsg(pPeer->peerFd, cont, pHead->len);
if (bytes != pHead->len) { if (bytes != pHead->len) {
sError("%s, failed to read, bytes:%d len:%d", pPeer->id, bytes, pHead->len); sError("%s, failed to read, bytes:%d len:%d", pPeer->id, bytes, pHead->len);
return -1; return -1;
...@@ -974,23 +922,22 @@ static int32_t syncReadPeerMsg(SSyncPeer *pPeer, SSyncHead *pHead, char *cont) { ...@@ -974,23 +922,22 @@ static int32_t syncReadPeerMsg(SSyncPeer *pPeer, SSyncHead *pHead, char *cont) {
static int32_t syncProcessPeerMsg(void *param, void *buffer) { static int32_t syncProcessPeerMsg(void *param, void *buffer) {
SSyncPeer *pPeer = param; SSyncPeer *pPeer = param;
SSyncHead head; SSyncHead *pHead = buffer;
char * cont = buffer;
SSyncNode *pNode = pPeer->pSyncNode; SSyncNode *pNode = pPeer->pSyncNode;
pthread_mutex_lock(&pNode->mutex); pthread_mutex_lock(&pNode->mutex);
int32_t code = syncReadPeerMsg(pPeer, &head, cont); int32_t code = syncReadPeerMsg(pPeer, pHead);
if (code == 0) { if (code == 0) {
if (head.type == TAOS_SMSG_FORWARD) { if (pHead->type == TAOS_SMSG_SYNC_FWD) {
syncProcessForwardFromPeer(cont, pPeer); syncProcessForwardFromPeer(buffer, pPeer);
} else if (head.type == TAOS_SMSG_FORWARD_RSP) { } else if (pHead->type == TAOS_SMSG_SYNC_FWD_RSP) {
syncProcessFwdResponse(cont, pPeer); syncProcessFwdResponse(buffer, pPeer);
} else if (head.type == TAOS_SMSG_SYNC_REQ) { } else if (pHead->type == TAOS_SMSG_SYNC_REQ) {
syncProcessSyncRequest(cont, pPeer); syncProcessSyncRequest(buffer, pPeer);
} else if (head.type == TAOS_SMSG_STATUS) { } else if (pHead->type == TAOS_SMSG_STATUS) {
syncProcessPeersStatusMsg(cont, pPeer); syncProcessPeersStatusMsg(buffer, pPeer);
} }
} }
...@@ -999,36 +946,30 @@ static int32_t syncProcessPeerMsg(void *param, void *buffer) { ...@@ -999,36 +946,30 @@ static int32_t syncProcessPeerMsg(void *param, void *buffer) {
return code; return code;
} }
#define statusMsgLen sizeof(SSyncHead) + sizeof(SPeersStatus) + sizeof(SPeerStatus) * TAOS_SYNC_MAX_REPLICA
static void syncSendPeersStatusMsgToPeer(SSyncPeer *pPeer, char ack, int8_t type, uint16_t tranId) { static void syncSendPeersStatusMsgToPeer(SSyncPeer *pPeer, char ack, int8_t type, uint16_t tranId) {
SSyncNode *pNode = pPeer->pSyncNode;
char msg[statusMsgLen] = {0};
if (pPeer->peerFd < 0 || pPeer->ip == 0) return; if (pPeer->peerFd < 0 || pPeer->ip == 0) return;
SSyncHead * pHead = (SSyncHead *)msg; SSyncNode * pNode = pPeer->pSyncNode;
SPeersStatus *pPeersStatus = (SPeersStatus *)(msg + sizeof(SSyncHead)); SPeersStatus msg;
pHead->type = TAOS_SMSG_STATUS; memset(&msg, 0, sizeof(SPeersStatus));
pHead->len = statusMsgLen - sizeof(SSyncHead); syncBuildPeersStatus(&msg, pNode->vgId);
pPeersStatus->version = nodeVersion; msg.role = nodeRole;
pPeersStatus->role = nodeRole; msg.ack = ack;
pPeersStatus->ack = ack; msg.type = type;
pPeersStatus->type = type; msg.tranId = tranId;
pPeersStatus->tranId = tranId; msg.version = nodeVersion;
for (int32_t i = 0; i < pNode->replica; ++i) { for (int32_t i = 0; i < pNode->replica; ++i) {
pPeersStatus->peersStatus[i].role = pNode->peerInfo[i]->role; msg.peersStatus[i].role = pNode->peerInfo[i]->role;
pPeersStatus->peersStatus[i].version = pNode->peerInfo[i]->version; msg.peersStatus[i].version = pNode->peerInfo[i]->version;
} }
if (taosWriteMsg(pPeer->peerFd, msg, statusMsgLen) == statusMsgLen) { if (taosWriteMsg(pPeer->peerFd, &msg, sizeof(SPeersStatus)) == sizeof(SPeersStatus)) {
sDebug("%s, status is sent, self:%s:%s:%" PRIu64 ", peer:%s:%s:%" PRIu64 ", ack:%d tranId:%u type:%s pfd:%d", sDebug("%s, status is sent, self:%s:%s:%" PRIu64 ", peer:%s:%s:%" PRIu64 ", ack:%d tranId:%u type:%s pfd:%d",
pPeer->id, syncRole[nodeRole], syncStatus[nodeSStatus], nodeVersion, syncRole[pPeer->role], pPeer->id, syncRole[nodeRole], syncStatus[nodeSStatus], nodeVersion, syncRole[pPeer->role],
syncStatus[pPeer->sstatus], pPeer->version, pPeersStatus->ack, pPeersStatus->tranId, syncStatus[pPeer->sstatus], pPeer->version, ack, tranId, statusType[type], pPeer->peerFd);
statusType[pPeersStatus->type], pPeer->peerFd);
} else { } else {
sDebug("%s, failed to send status msg, restart", pPeer->id); sDebug("%s, failed to send status msg, restart", pPeer->id);
syncRestartConnection(pPeer); syncRestartConnection(pPeer);
...@@ -1048,29 +989,23 @@ static void syncSetupPeerConnection(SSyncPeer *pPeer) { ...@@ -1048,29 +989,23 @@ static void syncSetupPeerConnection(SSyncPeer *pPeer) {
int32_t connFd = taosOpenTcpClientSocket(pPeer->ip, pPeer->port, 0); int32_t connFd = taosOpenTcpClientSocket(pPeer->ip, pPeer->port, 0);
if (connFd < 0) { if (connFd < 0) {
sDebug("%s, failed to open tcp socket since %s", pPeer->id, strerror(errno)); sDebug("%s, failed to open tcp socket since %s", pPeer->id, strerror(errno));
taosTmrReset(syncCheckPeerConnection, tsSyncTimer * 1000, pPeer, tsSyncTmrCtrl, &pPeer->timer); taosTmrReset(syncCheckPeerConnection, SYNC_CHECK_INTERVAL, pPeer, tsSyncTmrCtrl, &pPeer->timer);
return; return;
} }
SFirstPkt firstPkt; SSyncMsg msg;
memset(&firstPkt, 0, sizeof(firstPkt)); syncBuildSyncSetupMsg(&msg, pPeer->nodeId ? pNode->vgId : 0);
firstPkt.syncHead.vgId = pPeer->nodeId ? pNode->vgId : 0;
firstPkt.syncHead.type = TAOS_SMSG_STATUS;
tstrncpy(firstPkt.fqdn, tsNodeFqdn, sizeof(firstPkt.fqdn));
firstPkt.port = tsSyncPort;
firstPkt.tranId = syncGenTranId();
firstPkt.sourceId = pNode->vgId; // tell arbitrator its vgId
if (taosWriteMsg(connFd, &firstPkt, sizeof(firstPkt)) == sizeof(firstPkt)) { if (taosWriteMsg(connFd, &msg, sizeof(SSyncMsg)) == sizeof(SSyncMsg)) {
sDebug("%s, connection to peer server is setup, pfd:%d sfd:%d tranId:%u", pPeer->id, connFd, pPeer->syncFd, firstPkt.tranId); sDebug("%s, connection to peer server is setup, pfd:%d sfd:%d tranId:%u", pPeer->id, connFd, pPeer->syncFd, msg.tranId);
pPeer->peerFd = connFd; pPeer->peerFd = connFd;
pPeer->role = TAOS_SYNC_ROLE_UNSYNCED; pPeer->role = TAOS_SYNC_ROLE_UNSYNCED;
pPeer->pConn = taosAllocateTcpConn(tsTcpPool, pPeer, connFd); pPeer->pConn = syncAllocateTcpConn(tsTcpPool, pPeer, connFd);
syncAddPeerRef(pPeer); syncAddPeerRef(pPeer);
} else { } else {
sDebug("%s, failed to setup peer connection to server since %s, try later", pPeer->id, strerror(errno)); sDebug("%s, failed to setup peer connection to server since %s, try later", pPeer->id, strerror(errno));
taosClose(connFd); taosClose(connFd);
taosTmrReset(syncCheckPeerConnection, tsSyncTimer * 1000, pPeer, tsSyncTmrCtrl, &pPeer->timer); taosTmrReset(syncCheckPeerConnection, SYNC_CHECK_INTERVAL, pPeer, tsSyncTmrCtrl, &pPeer->timer);
} }
} }
...@@ -1116,14 +1051,21 @@ static void syncProcessIncommingConnection(int32_t connFd, uint32_t sourceIp) { ...@@ -1116,14 +1051,21 @@ static void syncProcessIncommingConnection(int32_t connFd, uint32_t sourceIp) {
tinet_ntoa(ipstr, sourceIp); tinet_ntoa(ipstr, sourceIp);
sDebug("peer TCP connection from ip:%s", ipstr); sDebug("peer TCP connection from ip:%s", ipstr);
SFirstPkt firstPkt; SSyncMsg msg;
if (taosReadMsg(connFd, &firstPkt, sizeof(firstPkt)) != sizeof(firstPkt)) { if (taosReadMsg(connFd, &msg, sizeof(SSyncMsg)) != sizeof(SSyncMsg)) {
sError("failed to read peer first pkt from ip:%s since %s", ipstr, strerror(errno)); sError("failed to read peer sync msg from ip:%s since %s", ipstr, strerror(errno));
taosCloseSocket(connFd); taosCloseSocket(connFd);
return; return;
} }
int32_t vgId = firstPkt.syncHead.vgId; int32_t code = syncCheckHead((SSyncHead *)(&msg));
if (code != 0) {
sError("failed to check peer sync msg from ip:%s since %s", ipstr, strerror(code));
taosCloseSocket(connFd);
return;
}
int32_t vgId = msg.head.vgId;
SSyncNode **ppNode = taosHashGet(tsVgIdHash, &vgId, sizeof(int32_t)); SSyncNode **ppNode = taosHashGet(tsVgIdHash, &vgId, sizeof(int32_t));
if (ppNode == NULL || *ppNode == NULL) { if (ppNode == NULL || *ppNode == NULL) {
sError("vgId:%d, vgId could not be found", vgId); sError("vgId:%d, vgId could not be found", vgId);
...@@ -1131,7 +1073,7 @@ static void syncProcessIncommingConnection(int32_t connFd, uint32_t sourceIp) { ...@@ -1131,7 +1073,7 @@ static void syncProcessIncommingConnection(int32_t connFd, uint32_t sourceIp) {
return; return;
} }
sDebug("vgId:%d, firstPkt is received, tranId:%u", vgId, firstPkt.tranId); sDebug("vgId:%d, sync msg is received, tranId:%u", vgId, msg.tranId);
SSyncNode *pNode = *ppNode; SSyncNode *pNode = *ppNode;
pthread_mutex_lock(&pNode->mutex); pthread_mutex_lock(&pNode->mutex);
...@@ -1139,27 +1081,27 @@ static void syncProcessIncommingConnection(int32_t connFd, uint32_t sourceIp) { ...@@ -1139,27 +1081,27 @@ static void syncProcessIncommingConnection(int32_t connFd, uint32_t sourceIp) {
SSyncPeer *pPeer; SSyncPeer *pPeer;
for (i = 0; i < pNode->replica; ++i) { for (i = 0; i < pNode->replica; ++i) {
pPeer = pNode->peerInfo[i]; pPeer = pNode->peerInfo[i];
if (pPeer && (strcmp(pPeer->fqdn, firstPkt.fqdn) == 0) && (pPeer->port == firstPkt.port)) break; if (pPeer && (strcmp(pPeer->fqdn, msg.fqdn) == 0) && (pPeer->port == msg.port)) break;
} }
pPeer = (i < pNode->replica) ? pNode->peerInfo[i] : NULL; pPeer = (i < pNode->replica) ? pNode->peerInfo[i] : NULL;
if (pPeer == NULL) { if (pPeer == NULL) {
sError("vgId:%d, peer:%s:%u not configured", pNode->vgId, firstPkt.fqdn, firstPkt.port); sError("vgId:%d, peer:%s:%u not configured", pNode->vgId, msg.fqdn, msg.port);
taosCloseSocket(connFd); taosCloseSocket(connFd);
// syncSendVpeerCfgMsg(sync); // syncSendVpeerCfgMsg(sync);
} else { } else {
// first packet tells what kind of link // first packet tells what kind of link
if (firstPkt.syncHead.type == TAOS_SMSG_SYNC_DATA) { if (msg.head.type == TAOS_SMSG_SYNC_DATA) {
pPeer->syncFd = connFd; pPeer->syncFd = connFd;
nodeSStatus = TAOS_SYNC_STATUS_START; nodeSStatus = TAOS_SYNC_STATUS_START;
sInfo("%s, sync-data pkt from master is received, tranId:%u, set sstatus:%s", pPeer->id, firstPkt.tranId, sInfo("%s, sync-data msg from master is received, tranId:%u, set sstatus:%s", pPeer->id, msg.tranId,
syncStatus[nodeSStatus]); syncStatus[nodeSStatus]);
syncCreateRestoreDataThread(pPeer); syncCreateRestoreDataThread(pPeer);
} else { } else {
sDebug("%s, TCP connection is up, pfd:%d sfd:%d, old pfd:%d", pPeer->id, connFd, pPeer->syncFd, pPeer->peerFd); sDebug("%s, TCP connection is up, pfd:%d sfd:%d, old pfd:%d", pPeer->id, connFd, pPeer->syncFd, pPeer->peerFd);
syncClosePeerConn(pPeer); syncClosePeerConn(pPeer);
pPeer->peerFd = connFd; pPeer->peerFd = connFd;
pPeer->pConn = taosAllocateTcpConn(tsTcpPool, pPeer, connFd); pPeer->pConn = syncAllocateTcpConn(tsTcpPool, pPeer, connFd);
syncAddPeerRef(pPeer); syncAddPeerRef(pPeer);
sDebug("%s, ready to exchange data", pPeer->id); sDebug("%s, ready to exchange data", pPeer->id);
syncSendPeersStatusMsgToPeer(pPeer, 1, SYNC_STATUS_EXCHANGE_DATA, syncGenTranId()); syncSendPeersStatusMsgToPeer(pPeer, 1, SYNC_STATUS_EXCHANGE_DATA, syncGenTranId());
...@@ -1192,15 +1134,15 @@ static int32_t syncSaveFwdInfo(SSyncNode *pNode, uint64_t version, void *mhandle ...@@ -1192,15 +1134,15 @@ static int32_t syncSaveFwdInfo(SSyncNode *pNode, uint64_t version, void *mhandle
SSyncFwds *pSyncFwds = pNode->pSyncFwds; SSyncFwds *pSyncFwds = pNode->pSyncFwds;
int64_t time = taosGetTimestampMs(); int64_t time = taosGetTimestampMs();
if (pSyncFwds->fwds >= tsMaxFwdInfo) { if (pSyncFwds->fwds >= SYNC_MAX_FWDS) {
// pSyncFwds->first = (pSyncFwds->first + 1) % tsMaxFwdInfo; // pSyncFwds->first = (pSyncFwds->first + 1) % SYNC_MAX_FWDS;
// pSyncFwds->fwds--; // pSyncFwds->fwds--;
sError("vgId:%d, failed to save fwd info, hver:%" PRIu64 " fwds:%d", pNode->vgId, version, pSyncFwds->fwds); sError("vgId:%d, failed to save fwd info, hver:%" PRIu64 " fwds:%d", pNode->vgId, version, pSyncFwds->fwds);
return TSDB_CODE_SYN_TOO_MANY_FWDINFO; return TSDB_CODE_SYN_TOO_MANY_FWDINFO;
} }
if (pSyncFwds->fwds > 0) { if (pSyncFwds->fwds > 0) {
pSyncFwds->last = (pSyncFwds->last + 1) % tsMaxFwdInfo; pSyncFwds->last = (pSyncFwds->last + 1) % SYNC_MAX_FWDS;
} }
SFwdInfo *pFwdInfo = pSyncFwds->fwdInfo + pSyncFwds->last; SFwdInfo *pFwdInfo = pSyncFwds->fwdInfo + pSyncFwds->last;
...@@ -1223,7 +1165,7 @@ static void syncRemoveConfirmedFwdInfo(SSyncNode *pNode) { ...@@ -1223,7 +1165,7 @@ static void syncRemoveConfirmedFwdInfo(SSyncNode *pNode) {
SFwdInfo *pFwdInfo = pSyncFwds->fwdInfo + pSyncFwds->first; SFwdInfo *pFwdInfo = pSyncFwds->fwdInfo + pSyncFwds->first;
if (pFwdInfo->confirmed == 0) break; if (pFwdInfo->confirmed == 0) break;
pSyncFwds->first = (pSyncFwds->first + 1) % tsMaxFwdInfo; pSyncFwds->first = (pSyncFwds->first + 1) % SYNC_MAX_FWDS;
pSyncFwds->fwds--; pSyncFwds->fwds--;
if (pSyncFwds->fwds == 0) pSyncFwds->first = pSyncFwds->last; if (pSyncFwds->fwds == 0) pSyncFwds->first = pSyncFwds->last;
sTrace("vgId:%d, fwd info is removed, hver:%" PRIu64 " fwds:%d", pNode->vgId, pFwdInfo->version, pSyncFwds->fwds); sTrace("vgId:%d, fwd info is removed, hver:%" PRIu64 " fwds:%d", pNode->vgId, pFwdInfo->version, pSyncFwds->fwds);
...@@ -1248,7 +1190,7 @@ static void syncProcessFwdAck(SSyncNode *pNode, SFwdInfo *pFwdInfo, int32_t code ...@@ -1248,7 +1190,7 @@ static void syncProcessFwdAck(SSyncNode *pNode, SFwdInfo *pFwdInfo, int32_t code
} }
if (confirm && pFwdInfo->confirmed == 0) { if (confirm && pFwdInfo->confirmed == 0) {
sTrace("vgId:%d, forward is confirmed, hver:%" PRIu64 " code:%x", pNode->vgId, pFwdInfo->version, pFwdInfo->code); sTrace("vgId:%d, forward is confirmed, hver:%" PRIu64 " code:0x%x", pNode->vgId, pFwdInfo->version, pFwdInfo->code);
(*pNode->confirmForward)(pNode->vgId, pFwdInfo->mhandle, pFwdInfo->code); (*pNode->confirmForward)(pNode->vgId, pFwdInfo->mhandle, pFwdInfo->code);
pFwdInfo->confirmed = 1; pFwdInfo->confirmed = 1;
} }
...@@ -1289,7 +1231,7 @@ static void syncMonitorFwdInfos(void *param, void *tmrId) { ...@@ -1289,7 +1231,7 @@ static void syncMonitorFwdInfos(void *param, void *tmrId) {
if (pSyncFwds->fwds > 0) { if (pSyncFwds->fwds > 0) {
pthread_mutex_lock(&pNode->mutex); pthread_mutex_lock(&pNode->mutex);
for (int32_t i = 0; i < pSyncFwds->fwds; ++i) { for (int32_t i = 0; i < pSyncFwds->fwds; ++i) {
SFwdInfo *pFwdInfo = pSyncFwds->fwdInfo + (pSyncFwds->first + i) % tsMaxFwdInfo; SFwdInfo *pFwdInfo = pSyncFwds->fwdInfo + (pSyncFwds->first + i) % SYNC_MAX_FWDS;
if (ABS(time - pFwdInfo->time) < 2000) break; if (ABS(time - pFwdInfo->time) < 2000) break;
sDebug("vgId:%d, forward info expired, hver:%" PRIu64 " curtime:%" PRIu64 " savetime:%" PRIu64, pNode->vgId, sDebug("vgId:%d, forward info expired, hver:%" PRIu64 " curtime:%" PRIu64 " savetime:%" PRIu64, pNode->vgId,
...@@ -1334,14 +1276,12 @@ static int32_t syncForwardToPeerImpl(SSyncNode *pNode, void *data, void *mhandle ...@@ -1334,14 +1276,12 @@ static int32_t syncForwardToPeerImpl(SSyncNode *pNode, void *data, void *mhandle
if (pNode->replica == 1 || nodeRole != TAOS_SYNC_ROLE_MASTER) return 0; if (pNode->replica == 1 || nodeRole != TAOS_SYNC_ROLE_MASTER) return 0;
// only pkt from RPC or CQ can be forwarded // only msg from RPC or CQ can be forwarded
if (qtype != TAOS_QTYPE_RPC && qtype != TAOS_QTYPE_CQ) return 0; if (qtype != TAOS_QTYPE_RPC && qtype != TAOS_QTYPE_CQ) return 0;
// a hacker way to improve the performance // a hacker way to improve the performance
pSyncHead = (SSyncHead *)(((char *)pWalHead) - sizeof(SSyncHead)); pSyncHead = (SSyncHead *)(((char *)pWalHead) - sizeof(SSyncHead));
pSyncHead->type = TAOS_SMSG_FORWARD; syncBuildSyncFwdMsg(pSyncHead, pNode->vgId, sizeof(SWalHead) + pWalHead->len);
pSyncHead->pversion = 0;
pSyncHead->len = sizeof(SWalHead) + pWalHead->len;
fwdLen = pSyncHead->len + sizeof(SSyncHead); // include the WAL and SYNC head fwdLen = pSyncHead->len + sizeof(SSyncHead); // include the WAL and SYNC head
pthread_mutex_lock(&pNode->mutex); pthread_mutex_lock(&pNode->mutex);
...@@ -1371,4 +1311,3 @@ static int32_t syncForwardToPeerImpl(SSyncNode *pNode, void *data, void *mhandle ...@@ -1371,4 +1311,3 @@ static int32_t syncForwardToPeerImpl(SSyncNode *pNode, void *data, void *mhandle
return code; return code;
} }
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
#include "os.h"
#include "taoserror.h"
#include "tglobal.h"
#include "tchecksum.h"
#include "syncInt.h"
char *statusType[] = {
"broadcast",
"broadcast-rsp",
"setup-conn",
"setup-conn-rsp",
"exchange-data",
"exchange-data-rsp",
"check-role",
"check-role-rsp"
};
uint16_t syncGenTranId() {
return taosRand() & 0XFFFF;
}
static void syncBuildHead(SSyncHead *pHead) {
pHead->protocol = SYNC_PROTOCOL_VERSION;
pHead->signature = SYNC_SIGNATURE;
pHead->code = 0;
pHead->cId = 0;
taosCalcChecksumAppend(0, (uint8_t *)pHead, sizeof(SSyncHead));
}
int32_t syncCheckHead(SSyncHead *pHead) {
if (pHead->protocol != SYNC_PROTOCOL_VERSION) return TSDB_CODE_SYN_MISMATCHED_PROTOCOL;
if (pHead->signature != SYNC_SIGNATURE) return TSDB_CODE_SYN_MISMATCHED_SIGNATURE;
if (pHead->cId != 0) return TSDB_CODE_SYN_MISMATCHED_CLUSTERID;
if (pHead->len <= 0 || pHead->len > TSDB_MAX_WAL_SIZE) return TSDB_CODE_SYN_INVALID_MSGLEN;
if (pHead->type <= TAOS_SMSG_START || pHead->type >= TAOS_SMSG_END) return TSDB_CODE_SYN_INVALID_MSGTYPE;
if (!taosCheckChecksumWhole((uint8_t *)pHead, sizeof(SSyncHead))) return TSDB_CODE_SYN_INVALID_CHECKSUM;
return TSDB_CODE_SUCCESS;
}
void syncBuildSyncFwdMsg(SSyncHead *pHead, int32_t vgId, int32_t len) {
pHead->type = TAOS_SMSG_SYNC_FWD;
pHead->vgId = vgId;
pHead->len = len;
syncBuildHead(pHead);
}
void syncBuildSyncFwdRsp(SFwdRsp *pMsg, int32_t vgId, uint64_t version, int32_t code) {
pMsg->head.type = TAOS_SMSG_SYNC_FWD_RSP;
pMsg->head.vgId = vgId;
pMsg->head.len = sizeof(SFwdRsp) - sizeof(SSyncHead);
syncBuildHead(&pMsg->head);
pMsg->version = version;
pMsg->code = code;
}
static void syncBuildMsg(SSyncMsg *pMsg, int32_t vgId, ESyncMsgType type) {
pMsg->head.type = type;
pMsg->head.vgId = vgId;
pMsg->head.len = sizeof(SSyncMsg) - sizeof(SSyncHead);
syncBuildHead(&pMsg->head);
pMsg->port = tsSyncPort;
pMsg->tranId = syncGenTranId();
pMsg->sourceId = vgId;
tstrncpy(pMsg->fqdn, tsNodeFqdn, TSDB_FQDN_LEN);
}
void syncBuildSyncReqMsg(SSyncMsg *pMsg, int32_t vgId) { syncBuildMsg(pMsg, vgId, TAOS_SMSG_SYNC_REQ); }
void syncBuildSyncDataMsg(SSyncMsg *pMsg, int32_t vgId) { syncBuildMsg(pMsg, vgId, TAOS_SMSG_SYNC_DATA); }
void syncBuildSyncSetupMsg(SSyncMsg *pMsg, int32_t vgId) { syncBuildMsg(pMsg, vgId, TAOS_SMSG_SETUP); }
void syncBuildPeersStatus(SPeersStatus *pMsg, int32_t vgId) {
pMsg->head.type = TAOS_SMSG_STATUS;
pMsg->head.vgId = vgId;
pMsg->head.len = sizeof(SPeersStatus) - sizeof(SSyncHead);
syncBuildHead(&pMsg->head);
}
void syncBuildFileAck(SFileAck *pMsg, int32_t vgId) {
pMsg->head.type = TAOS_SMSG_SYNC_FILE_RSP;
pMsg->head.vgId = vgId;
pMsg->head.len = sizeof(SFileAck) - sizeof(SSyncHead);
syncBuildHead(&pMsg->head);
}
void syncBuildFileInfo(SFileInfo *pMsg, int32_t vgId) {
pMsg->head.type = TAOS_SMSG_SYNC_FILE;
pMsg->head.vgId = vgId;
pMsg->head.len = sizeof(SFileInfo) - sizeof(SSyncHead);
syncBuildHead(&pMsg->head);
}
\ No newline at end of file
...@@ -56,7 +56,7 @@ static int32_t syncRestoreFile(SSyncPeer *pPeer, uint64_t *fversion) { ...@@ -56,7 +56,7 @@ static int32_t syncRestoreFile(SSyncPeer *pPeer, uint64_t *fversion) {
SSyncNode *pNode = pPeer->pSyncNode; SSyncNode *pNode = pPeer->pSyncNode;
SFileInfo minfo; memset(&minfo, 0, sizeof(SFileInfo)); /* = {0}; */ SFileInfo minfo; memset(&minfo, 0, sizeof(SFileInfo)); /* = {0}; */
SFileInfo sinfo; memset(&sinfo, 0, sizeof(SFileInfo)); /* = {0}; */ SFileInfo sinfo; memset(&sinfo, 0, sizeof(SFileInfo)); /* = {0}; */
SFileAck fileAck = {0}; SFileAck fileAck; memset(&fileAck, 0, sizeof(SFileAck));
int32_t code = -1; int32_t code = -1;
char name[TSDB_FILENAME_LEN * 2] = {0}; char name[TSDB_FILENAME_LEN * 2] = {0};
uint32_t pindex = 0; // index in last restore uint32_t pindex = 0; // index in last restore
...@@ -69,7 +69,14 @@ static int32_t syncRestoreFile(SSyncPeer *pPeer, uint64_t *fversion) { ...@@ -69,7 +69,14 @@ static int32_t syncRestoreFile(SSyncPeer *pPeer, uint64_t *fversion) {
minfo.index = -1; minfo.index = -1;
int32_t ret = taosReadMsg(pPeer->syncFd, &minfo, sizeof(SFileInfo)); int32_t ret = taosReadMsg(pPeer->syncFd, &minfo, sizeof(SFileInfo));
if (ret != sizeof(SFileInfo) || minfo.index == -1) { if (ret != sizeof(SFileInfo) || minfo.index == -1) {
sError("%s, failed to read file info while restore file since %s", pPeer->id, strerror(errno)); sError("%s, failed to read fileinfo while restore file since %s", pPeer->id, strerror(errno));
break;
}
assert(ret == sizeof(SFileInfo));
ret = syncCheckHead((SSyncHead *)(&minfo));
if (ret != 0) {
sError("%s, failed to check fileinfo while restore file since %s", pPeer->id, strerror(ret));
break; break;
} }
...@@ -94,12 +101,13 @@ static int32_t syncRestoreFile(SSyncPeer *pPeer, uint64_t *fversion) { ...@@ -94,12 +101,13 @@ static int32_t syncRestoreFile(SSyncPeer *pPeer, uint64_t *fversion) {
&sinfo.fversion); &sinfo.fversion);
// if file not there or magic is not the same, file shall be synced // if file not there or magic is not the same, file shall be synced
memset(&fileAck, 0, sizeof(fileAck)); memset(&fileAck, 0, sizeof(SFileAck));
syncBuildFileAck(&fileAck, pNode->vgId);
fileAck.sync = (sinfo.magic != minfo.magic || sinfo.name[0] == 0) ? 1 : 0; fileAck.sync = (sinfo.magic != minfo.magic || sinfo.name[0] == 0) ? 1 : 0;
// send file ack // send file ack
ret = taosWriteMsg(pPeer->syncFd, &fileAck, sizeof(fileAck)); ret = taosWriteMsg(pPeer->syncFd, &fileAck, sizeof(SFileAck));
if (ret != sizeof(fileAck)) { if (ret != sizeof(SFileAck)) {
sError("%s, failed to write file:%s ack while restore file since %s", pPeer->id, minfo.name, strerror(errno)); sError("%s, failed to write file:%s ack while restore file since %s", pPeer->id, minfo.name, strerror(errno));
break; break;
} }
...@@ -289,12 +297,12 @@ static int32_t syncRestoreDataStepByStep(SSyncPeer *pPeer) { ...@@ -289,12 +297,12 @@ static int32_t syncRestoreDataStepByStep(SSyncPeer *pPeer) {
uint64_t fversion = 0; uint64_t fversion = 0;
sInfo("%s, start to restore, sstatus:%s", pPeer->id, syncStatus[pPeer->sstatus]); sInfo("%s, start to restore, sstatus:%s", pPeer->id, syncStatus[pPeer->sstatus]);
SFirstPktRsp firstPktRsp = {.sync = 1, .tranId = syncGenTranId()}; SSyncRsp rsp = {.sync = 1, .tranId = syncGenTranId()};
if (taosWriteMsg(pPeer->syncFd, &firstPktRsp, sizeof(SFirstPktRsp)) != sizeof(SFirstPktRsp)) { if (taosWriteMsg(pPeer->syncFd, &rsp, sizeof(SSyncRsp)) != sizeof(SSyncRsp)) {
sError("%s, failed to send sync firstPkt rsp since %s", pPeer->id, strerror(errno)); sError("%s, failed to send sync rsp since %s", pPeer->id, strerror(errno));
return -1; return -1;
} }
sDebug("%s, send firstPktRsp to peer, tranId:%u", pPeer->id, firstPktRsp.tranId); sDebug("%s, send sync rsp to peer, tranId:%u", pPeer->id, rsp.tranId);
sInfo("%s, start to restore file, set sstatus:%s", pPeer->id, syncStatus[nodeSStatus]); sInfo("%s, start to restore file, set sstatus:%s", pPeer->id, syncStatus[nodeSStatus]);
int32_t code = syncRestoreFile(pPeer, &fversion); int32_t code = syncRestoreFile(pPeer, &fversion);
......
...@@ -88,7 +88,7 @@ static bool syncAreFilesModified(SSyncNode *pNode, SSyncPeer *pPeer) { ...@@ -88,7 +88,7 @@ static bool syncAreFilesModified(SSyncNode *pNode, SSyncPeer *pPeer) {
static int32_t syncRetrieveFile(SSyncPeer *pPeer) { static int32_t syncRetrieveFile(SSyncPeer *pPeer) {
SSyncNode *pNode = pPeer->pSyncNode; SSyncNode *pNode = pPeer->pSyncNode;
SFileInfo fileInfo; memset(&fileInfo, 0, sizeof(SFileInfo)); SFileInfo fileInfo; memset(&fileInfo, 0, sizeof(SFileInfo));
SFileAck fileAck = {0}; SFileAck fileAck; memset(&fileAck, 0, sizeof(SFileAck));
int32_t code = -1; int32_t code = -1;
char name[TSDB_FILENAME_LEN * 2] = {0}; char name[TSDB_FILENAME_LEN * 2] = {0};
...@@ -103,11 +103,12 @@ static int32_t syncRetrieveFile(SSyncPeer *pPeer) { ...@@ -103,11 +103,12 @@ static int32_t syncRetrieveFile(SSyncPeer *pPeer) {
fileInfo.size = 0; fileInfo.size = 0;
fileInfo.magic = (*pNode->getFileInfo)(pNode->vgId, fileInfo.name, &fileInfo.index, TAOS_SYNC_MAX_INDEX, fileInfo.magic = (*pNode->getFileInfo)(pNode->vgId, fileInfo.name, &fileInfo.index, TAOS_SYNC_MAX_INDEX,
&fileInfo.size, &fileInfo.fversion); &fileInfo.size, &fileInfo.fversion);
syncBuildFileInfo(&fileInfo, pNode->vgId);
sDebug("%s, file:%s info is sent, size:%" PRId64, pPeer->id, fileInfo.name, fileInfo.size); sDebug("%s, file:%s info is sent, size:%" PRId64, pPeer->id, fileInfo.name, fileInfo.size);
// send the file info // send the file info
int32_t ret = taosWriteMsg(pPeer->syncFd, &(fileInfo), sizeof(fileInfo)); int32_t ret = taosWriteMsg(pPeer->syncFd, &(fileInfo), sizeof(SFileInfo));
if (ret != sizeof(fileInfo)) { if (ret != sizeof(SFileInfo)) {
code = -1; code = -1;
sError("%s, failed to write file:%s info while retrieve file since %s", pPeer->id, fileInfo.name, strerror(errno)); sError("%s, failed to write file:%s info while retrieve file since %s", pPeer->id, fileInfo.name, strerror(errno));
break; break;
...@@ -128,6 +129,13 @@ static int32_t syncRetrieveFile(SSyncPeer *pPeer) { ...@@ -128,6 +129,13 @@ static int32_t syncRetrieveFile(SSyncPeer *pPeer) {
break; break;
} }
ret = syncCheckHead((SSyncHead*)(&fileAck));
if (ret != 0) {
code = -1;
sError("%s, failed to check file:%s ack while retrieve file since %s", pPeer->id, fileInfo.name, strerror(ret));
break;
}
// set the peer sync version // set the peer sync version
pPeer->sversion = fileInfo.fversion; pPeer->sversion = fileInfo.fversion;
...@@ -405,27 +413,22 @@ static int32_t syncRetrieveWal(SSyncPeer *pPeer) { ...@@ -405,27 +413,22 @@ static int32_t syncRetrieveWal(SSyncPeer *pPeer) {
static int32_t syncRetrieveFirstPkt(SSyncPeer *pPeer) { static int32_t syncRetrieveFirstPkt(SSyncPeer *pPeer) {
SSyncNode *pNode = pPeer->pSyncNode; SSyncNode *pNode = pPeer->pSyncNode;
SFirstPkt firstPkt; SSyncMsg msg;
memset(&firstPkt, 0, sizeof(firstPkt)); syncBuildSyncDataMsg(&msg, pNode->vgId);
firstPkt.syncHead.type = TAOS_SMSG_SYNC_DATA;
firstPkt.syncHead.vgId = pNode->vgId;
firstPkt.tranId = syncGenTranId();
tstrncpy(firstPkt.fqdn, tsNodeFqdn, sizeof(firstPkt.fqdn));
firstPkt.port = tsSyncPort;
if (taosWriteMsg(pPeer->syncFd, &firstPkt, sizeof(firstPkt)) != sizeof(firstPkt)) { if (taosWriteMsg(pPeer->syncFd, &msg, sizeof(SSyncMsg)) != sizeof(SSyncMsg)) {
sError("%s, failed to send sync firstPkt since %s, tranId:%u", pPeer->id, strerror(errno), firstPkt.tranId); sError("%s, failed to send sync-data msg since %s, tranId:%u", pPeer->id, strerror(errno), msg.tranId);
return -1; return -1;
} }
sDebug("%s, send sync-data pkt to peer, tranId:%u", pPeer->id, firstPkt.tranId); sDebug("%s, send sync-data msg to peer, tranId:%u", pPeer->id, msg.tranId);
SFirstPktRsp firstPktRsp; SSyncRsp rsp;
if (taosReadMsg(pPeer->syncFd, &firstPktRsp, sizeof(SFirstPktRsp)) != sizeof(SFirstPktRsp)) { if (taosReadMsg(pPeer->syncFd, &rsp, sizeof(SSyncRsp)) != sizeof(SSyncRsp)) {
sError("%s, failed to read sync firstPkt rsp since %s, tranId:%u", pPeer->id, strerror(errno), firstPkt.tranId); sError("%s, failed to read sync-data rsp since %s, tranId:%u", pPeer->id, strerror(errno), msg.tranId);
return -1; return -1;
} }
sDebug("%s, recv firstPktRsp from peer, tranId:%u", pPeer->id, firstPkt.tranId); sDebug("%s, recv sync-data rsp from peer, tranId:%u rsp-tranId:%u", pPeer->id, msg.tranId, rsp.tranId);
return 0; return 0;
} }
......
...@@ -19,10 +19,10 @@ ...@@ -19,10 +19,10 @@
#include "tutil.h" #include "tutil.h"
#include "tsocket.h" #include "tsocket.h"
#include "taoserror.h" #include "taoserror.h"
#include "taosTcpPool.h"
#include "twal.h" #include "twal.h"
#include "tsync.h" #include "tsync.h"
#include "syncInt.h" #include "syncInt.h"
#include "syncTcp.h"
typedef struct SThreadObj { typedef struct SThreadObj {
pthread_t thread; pthread_t thread;
...@@ -47,12 +47,12 @@ typedef struct { ...@@ -47,12 +47,12 @@ typedef struct {
int32_t closedByApp; int32_t closedByApp;
} SConnObj; } SConnObj;
static void *taosAcceptPeerTcpConnection(void *argv); static void *syncAcceptPeerTcpConnection(void *argv);
static void *taosProcessTcpData(void *param); static void *syncProcessTcpData(void *param);
static void taosStopPoolThread(SThreadObj *pThread); static void syncStopPoolThread(SThreadObj *pThread);
static SThreadObj *taosGetTcpThread(SPoolObj *pPool); static SThreadObj *syncGetTcpThread(SPoolObj *pPool);
void *taosOpenTcpThreadPool(SPoolInfo *pInfo) { void *syncOpenTcpThreadPool(SPoolInfo *pInfo) {
pthread_attr_t thattr; pthread_attr_t thattr;
SPoolObj *pPool = calloc(sizeof(SPoolObj), 1); SPoolObj *pPool = calloc(sizeof(SPoolObj), 1);
...@@ -80,7 +80,7 @@ void *taosOpenTcpThreadPool(SPoolInfo *pInfo) { ...@@ -80,7 +80,7 @@ void *taosOpenTcpThreadPool(SPoolInfo *pInfo) {
pthread_attr_init(&thattr); pthread_attr_init(&thattr);
pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE); pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE);
if (pthread_create(&(pPool->thread), &thattr, (void *)taosAcceptPeerTcpConnection, pPool) != 0) { if (pthread_create(&(pPool->thread), &thattr, (void *)syncAcceptPeerTcpConnection, pPool) != 0) {
sError("failed to create accept thread for TCP server since %s", strerror(errno)); sError("failed to create accept thread for TCP server since %s", strerror(errno));
close(pPool->acceptFd); close(pPool->acceptFd);
tfree(pPool->pThread); tfree(pPool->pThread);
...@@ -94,7 +94,7 @@ void *taosOpenTcpThreadPool(SPoolInfo *pInfo) { ...@@ -94,7 +94,7 @@ void *taosOpenTcpThreadPool(SPoolInfo *pInfo) {
return pPool; return pPool;
} }
void taosCloseTcpThreadPool(void *param) { void syncCloseTcpThreadPool(void *param) {
SPoolObj * pPool = param; SPoolObj * pPool = param;
SThreadObj *pThread; SThreadObj *pThread;
...@@ -103,7 +103,7 @@ void taosCloseTcpThreadPool(void *param) { ...@@ -103,7 +103,7 @@ void taosCloseTcpThreadPool(void *param) {
for (int32_t i = 0; i < pPool->info.numOfThreads; ++i) { for (int32_t i = 0; i < pPool->info.numOfThreads; ++i) {
pThread = pPool->pThread[i]; pThread = pPool->pThread[i];
if (pThread) taosStopPoolThread(pThread); if (pThread) syncStopPoolThread(pThread);
} }
sDebug("%p TCP pool is closed", pPool); sDebug("%p TCP pool is closed", pPool);
...@@ -112,7 +112,7 @@ void taosCloseTcpThreadPool(void *param) { ...@@ -112,7 +112,7 @@ void taosCloseTcpThreadPool(void *param) {
tfree(pPool); tfree(pPool);
} }
void *taosAllocateTcpConn(void *param, void *pPeer, int32_t connFd) { void *syncAllocateTcpConn(void *param, void *pPeer, int32_t connFd) {
struct epoll_event event; struct epoll_event event;
SPoolObj *pPool = param; SPoolObj *pPool = param;
...@@ -122,7 +122,7 @@ void *taosAllocateTcpConn(void *param, void *pPeer, int32_t connFd) { ...@@ -122,7 +122,7 @@ void *taosAllocateTcpConn(void *param, void *pPeer, int32_t connFd) {
return NULL; return NULL;
} }
SThreadObj *pThread = taosGetTcpThread(pPool); SThreadObj *pThread = syncGetTcpThread(pPool);
if (pThread == NULL) { if (pThread == NULL) {
tfree(pConn); tfree(pConn);
return NULL; return NULL;
...@@ -149,7 +149,7 @@ void *taosAllocateTcpConn(void *param, void *pPeer, int32_t connFd) { ...@@ -149,7 +149,7 @@ void *taosAllocateTcpConn(void *param, void *pPeer, int32_t connFd) {
return pConn; return pConn;
} }
void taosFreeTcpConn(void *param) { void syncFreeTcpConn(void *param) {
SConnObj * pConn = param; SConnObj * pConn = param;
SThreadObj *pThread = pConn->pThread; SThreadObj *pThread = pConn->pThread;
...@@ -175,7 +175,7 @@ static void taosProcessBrokenLink(SConnObj *pConn) { ...@@ -175,7 +175,7 @@ static void taosProcessBrokenLink(SConnObj *pConn) {
#define maxEvents 10 #define maxEvents 10
static void *taosProcessTcpData(void *param) { static void *syncProcessTcpData(void *param) {
SThreadObj *pThread = (SThreadObj *)param; SThreadObj *pThread = (SThreadObj *)param;
SPoolObj * pPool = pThread->pPool; SPoolObj * pPool = pThread->pPool;
SPoolInfo * pInfo = &pPool->info; SPoolInfo * pInfo = &pPool->info;
...@@ -222,7 +222,7 @@ static void *taosProcessTcpData(void *param) { ...@@ -222,7 +222,7 @@ static void *taosProcessTcpData(void *param) {
if (pConn->closedByApp == 0) { if (pConn->closedByApp == 0) {
if ((*pInfo->processIncomingMsg)(pConn->ahandle, buffer) < 0) { if ((*pInfo->processIncomingMsg)(pConn->ahandle, buffer) < 0) {
taosFreeTcpConn(pConn); syncFreeTcpConn(pConn);
continue; continue;
} }
} }
...@@ -239,7 +239,7 @@ static void *taosProcessTcpData(void *param) { ...@@ -239,7 +239,7 @@ static void *taosProcessTcpData(void *param) {
return NULL; return NULL;
} }
static void *taosAcceptPeerTcpConnection(void *argv) { static void *syncAcceptPeerTcpConnection(void *argv) {
SPoolObj * pPool = (SPoolObj *)argv; SPoolObj * pPool = (SPoolObj *)argv;
SPoolInfo *pInfo = &pPool->info; SPoolInfo *pInfo = &pPool->info;
...@@ -268,7 +268,7 @@ static void *taosAcceptPeerTcpConnection(void *argv) { ...@@ -268,7 +268,7 @@ static void *taosAcceptPeerTcpConnection(void *argv) {
return NULL; return NULL;
} }
static SThreadObj *taosGetTcpThread(SPoolObj *pPool) { static SThreadObj *syncGetTcpThread(SPoolObj *pPool) {
SThreadObj *pThread = pPool->pThread[pPool->nextId]; SThreadObj *pThread = pPool->pThread[pPool->nextId];
if (pThread) return pThread; if (pThread) return pThread;
...@@ -286,7 +286,7 @@ static SThreadObj *taosGetTcpThread(SPoolObj *pPool) { ...@@ -286,7 +286,7 @@ static SThreadObj *taosGetTcpThread(SPoolObj *pPool) {
pthread_attr_t thattr; pthread_attr_t thattr;
pthread_attr_init(&thattr); pthread_attr_init(&thattr);
pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE); pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE);
int32_t ret = pthread_create(&(pThread->thread), &thattr, (void *)taosProcessTcpData, pThread); int32_t ret = pthread_create(&(pThread->thread), &thattr, (void *)syncProcessTcpData, pThread);
pthread_attr_destroy(&thattr); pthread_attr_destroy(&thattr);
if (ret != 0) { if (ret != 0) {
...@@ -303,7 +303,7 @@ static SThreadObj *taosGetTcpThread(SPoolObj *pPool) { ...@@ -303,7 +303,7 @@ static SThreadObj *taosGetTcpThread(SPoolObj *pPool) {
return pThread; return pThread;
} }
static void taosStopPoolThread(SThreadObj *pThread) { static void syncStopPoolThread(SThreadObj *pThread) {
pthread_t thread = pThread->thread; pthread_t thread = pThread->thread;
if (!taosCheckPthreadValid(thread)) { if (!taosCheckPthreadValid(thread)) {
return; return;
......
...@@ -100,7 +100,7 @@ int processRpcMsg(void *item) { ...@@ -100,7 +100,7 @@ int processRpcMsg(void *item) {
pHead->msgType = pMsg->msgType; pHead->msgType = pMsg->msgType;
pHead->len = pMsg->contLen; pHead->len = pMsg->contLen;
uDebug("ver:%" PRIu64 ", pkt from client processed", pHead->version); uDebug("ver:%" PRIu64 ", rsp from client processed", pHead->version);
writeIntoWal(pHead); writeIntoWal(pHead);
syncForwardToPeer(syncHandle, pHead, item, TAOS_QTYPE_RPC); syncForwardToPeer(syncHandle, pHead, item, TAOS_QTYPE_RPC);
...@@ -275,7 +275,7 @@ int getWalInfo(int32_t vgId, char *name, int64_t *index) { ...@@ -275,7 +275,7 @@ int getWalInfo(int32_t vgId, char *name, int64_t *index) {
int writeToCache(int32_t vgId, void *data, int type) { int writeToCache(int32_t vgId, void *data, int type) {
SWalHead *pHead = data; SWalHead *pHead = data;
uDebug("pkt from peer is received, ver:%" PRIu64 " len:%d type:%d", pHead->version, pHead->len, type); uDebug("rsp from peer is received, ver:%" PRIu64 " len:%d type:%d", pHead->version, pHead->len, type);
int msgSize = pHead->len + sizeof(SWalHead); int msgSize = pHead->len + sizeof(SWalHead);
void *pMsg = taosAllocateQitem(msgSize); void *pMsg = taosAllocateQitem(msgSize);
......
...@@ -110,6 +110,7 @@ sql insert into d1.t1 values(1589529000012, 2) ...@@ -110,6 +110,7 @@ sql insert into d1.t1 values(1589529000012, 2)
sql insert into d2.t2 values(1589529000022, 2) sql insert into d2.t2 values(1589529000022, 2)
sql insert into d3.t3 values(1589529000032, 2) sql insert into d3.t3 values(1589529000032, 2)
sql insert into d4.t4 values(1589529000042, 2) sql insert into d4.t4 values(1589529000042, 2)
sleep 1000
sql select * from d1.t1 sql select * from d1.t1
if $rows != 2 then if $rows != 2 then
...@@ -141,6 +142,7 @@ sql insert into d1.t1 values(1589529000013, 3) ...@@ -141,6 +142,7 @@ sql insert into d1.t1 values(1589529000013, 3)
sql insert into d2.t2 values(1589529000023, 3) sql insert into d2.t2 values(1589529000023, 3)
sql insert into d3.t3 values(1589529000033, 3) sql insert into d3.t3 values(1589529000033, 3)
sql insert into d4.t4 values(1589529000043, 3) sql insert into d4.t4 values(1589529000043, 3)
sleep 1000
sql select * from d1.t1 sql select * from d1.t1
if $rows != 3 then if $rows != 3 then
...@@ -172,6 +174,7 @@ sql insert into d1.t1 values(1589529000014, 4) ...@@ -172,6 +174,7 @@ sql insert into d1.t1 values(1589529000014, 4)
sql insert into d2.t2 values(1589529000024, 4) sql insert into d2.t2 values(1589529000024, 4)
sql insert into d3.t3 values(1589529000034, 4) sql insert into d3.t3 values(1589529000034, 4)
sql insert into d4.t4 values(1589529000044, 4) sql insert into d4.t4 values(1589529000044, 4)
sleep 1000
sql select * from d1.t1 sql select * from d1.t1
print select * from d1.t1 $rows print select * from d1.t1 $rows
...@@ -207,6 +210,7 @@ sql insert into d1.t1 values(1589529000015, 5) ...@@ -207,6 +210,7 @@ sql insert into d1.t1 values(1589529000015, 5)
sql insert into d2.t2 values(1589529000025, 5) sql insert into d2.t2 values(1589529000025, 5)
sql insert into d3.t3 values(1589529000035, 5) sql insert into d3.t3 values(1589529000035, 5)
sql insert into d4.t4 values(1589529000045, 5) sql insert into d4.t4 values(1589529000045, 5)
sleep 1000
sql select * from d1.t1 sql select * from d1.t1
if $rows != 5 then if $rows != 5 then
...@@ -238,6 +242,7 @@ sql insert into d1.t1 values(1589529000016, 6) ...@@ -238,6 +242,7 @@ sql insert into d1.t1 values(1589529000016, 6)
sql insert into d2.t2 values(1589529000026, 6) sql insert into d2.t2 values(1589529000026, 6)
sql insert into d3.t3 values(1589529000036, 6) sql insert into d3.t3 values(1589529000036, 6)
sql insert into d4.t4 values(1589529000046, 6) sql insert into d4.t4 values(1589529000046, 6)
sleep 1000
sql select * from d1.t1 sql select * from d1.t1
if $rows != 6 then if $rows != 6 then
......
...@@ -110,6 +110,7 @@ sql insert into d1.t1 values(1588262400002, 2) ...@@ -110,6 +110,7 @@ sql insert into d1.t1 values(1588262400002, 2)
sql insert into d2.t2 values(1588262400002, 2) sql insert into d2.t2 values(1588262400002, 2)
sql insert into d3.t3 values(1588262400002, 2) sql insert into d3.t3 values(1588262400002, 2)
sql insert into d4.t4 values(1588262400002, 2) sql insert into d4.t4 values(1588262400002, 2)
sleep 1000
sql select * from d1.t1 sql select * from d1.t1
if $rows != 2 then if $rows != 2 then
...@@ -142,6 +143,7 @@ sql insert into d1.t1 values(1588262400003, 3) ...@@ -142,6 +143,7 @@ sql insert into d1.t1 values(1588262400003, 3)
sql insert into d2.t2 values(1588262400003, 3) sql insert into d2.t2 values(1588262400003, 3)
sql insert into d3.t3 values(1588262400003, 3) sql insert into d3.t3 values(1588262400003, 3)
sql insert into d4.t4 values(1588262400003, 3) sql insert into d4.t4 values(1588262400003, 3)
sleep 1000
sql select * from d1.t1 sql select * from d1.t1
if $rows != 3 then if $rows != 3 then
...@@ -173,6 +175,7 @@ sql insert into d1.t1 values(1588262400004, 4) ...@@ -173,6 +175,7 @@ sql insert into d1.t1 values(1588262400004, 4)
sql insert into d2.t2 values(1588262400004, 4) sql insert into d2.t2 values(1588262400004, 4)
sql insert into d3.t3 values(1588262400004, 4) sql insert into d3.t3 values(1588262400004, 4)
sql insert into d4.t4 values(1588262400004, 4) sql insert into d4.t4 values(1588262400004, 4)
sleep 1000
sql select * from d1.t1 sql select * from d1.t1
if $rows != 4 then if $rows != 4 then
...@@ -204,6 +207,7 @@ sql insert into d1.t1 values(1588262400005, 5) ...@@ -204,6 +207,7 @@ sql insert into d1.t1 values(1588262400005, 5)
sql insert into d2.t2 values(1588262400005, 5) sql insert into d2.t2 values(1588262400005, 5)
sql insert into d3.t3 values(1588262400005, 5) sql insert into d3.t3 values(1588262400005, 5)
sql insert into d4.t4 values(1588262400005, 5) sql insert into d4.t4 values(1588262400005, 5)
sleep 1000
sql select * from d1.t1 sql select * from d1.t1
if $rows != 5 then if $rows != 5 then
...@@ -235,6 +239,7 @@ sql insert into d1.t1 values(1588262400006, 6) ...@@ -235,6 +239,7 @@ sql insert into d1.t1 values(1588262400006, 6)
sql insert into d2.t2 values(1588262400006, 6) sql insert into d2.t2 values(1588262400006, 6)
sql insert into d3.t3 values(1588262400006, 6) sql insert into d3.t3 values(1588262400006, 6)
sql insert into d4.t4 values(1588262400006, 6) sql insert into d4.t4 values(1588262400006, 6)
sleep 1000
sql select * from d1.t1 sql select * from d1.t1
if $rows != 6 then if $rows != 6 then
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册