提交 e00987e6 编写于 作者: H Hongze Cheng

Merge branch 'develop' into feature/TD-2004

...@@ -431,6 +431,8 @@ void tsDataSwap(void *pLeft, void *pRight, int32_t type, int32_t size, void* buf ...@@ -431,6 +431,8 @@ void tsDataSwap(void *pLeft, void *pRight, int32_t type, int32_t size, void* buf
#define TSDB_PORT_HTTP 11 #define TSDB_PORT_HTTP 11
#define TSDB_PORT_ARBITRATOR 12 #define TSDB_PORT_ARBITRATOR 12
#define TSDB_MAX_WAL_SIZE (1024*1024)
typedef enum { typedef enum {
TAOS_QTYPE_RPC = 0, TAOS_QTYPE_RPC = 0,
TAOS_QTYPE_FWD = 1, TAOS_QTYPE_FWD = 1,
......
...@@ -261,6 +261,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_SYN_INVALID_VERSION, 0, 0x0902, "Invalid Sy ...@@ -261,6 +261,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_SYN_INVALID_VERSION, 0, 0x0902, "Invalid Sy
// wal // wal
TAOS_DEFINE_ERROR(TSDB_CODE_WAL_APP_ERROR, 0, 0x1000, "Unexpected generic error in wal") TAOS_DEFINE_ERROR(TSDB_CODE_WAL_APP_ERROR, 0, 0x1000, "Unexpected generic error in wal")
TAOS_DEFINE_ERROR(TSDB_CODE_WAL_FILE_CORRUPTED, 0, 0x1001, "WAL file is corrupted") TAOS_DEFINE_ERROR(TSDB_CODE_WAL_FILE_CORRUPTED, 0, 0x1001, "WAL file is corrupted")
TAOS_DEFINE_ERROR(TSDB_CODE_WAL_SIZE_LIMIT, 0, 0x1002, "WAL size exceeds limit")
// http // http
TAOS_DEFINE_ERROR(TSDB_CODE_HTTP_SERVER_OFFLINE, 0, 0x1100, "http server is not onlin") TAOS_DEFINE_ERROR(TSDB_CODE_HTTP_SERVER_OFFLINE, 0, 0x1100, "http server is not onlin")
......
...@@ -59,6 +59,7 @@ int32_t walAlter(twalh pWal, SWalCfg *pCfg); ...@@ -59,6 +59,7 @@ int32_t walAlter(twalh pWal, SWalCfg *pCfg);
void walStop(twalh); void walStop(twalh);
void walClose(twalh); void walClose(twalh);
int32_t walRenew(twalh); int32_t walRenew(twalh);
void walRemoveOldFiles(twalh);
int32_t walWrite(twalh, SWalHead *); int32_t walWrite(twalh, SWalHead *);
void walFsync(twalh, bool forceFsync); void walFsync(twalh, bool forceFsync);
int32_t walRestore(twalh, void *pVnode, FWalWrite writeFp); int32_t walRestore(twalh, void *pVnode, FWalWrite writeFp);
......
...@@ -35,6 +35,8 @@ extern "C" { ...@@ -35,6 +35,8 @@ extern "C" {
#define TAOS_SMSG_SYNC_MUST 6 #define TAOS_SMSG_SYNC_MUST 6
#define TAOS_SMSG_STATUS 7 #define TAOS_SMSG_STATUS 7
#define SYNC_MAX_SIZE (TSDB_MAX_WAL_SIZE + sizeof(SWalHead) + sizeof(SSyncHead) + 16)
#define nodeRole pNode->peerInfo[pNode->selfIndex]->role #define nodeRole pNode->peerInfo[pNode->selfIndex]->role
#define nodeVersion pNode->peerInfo[pNode->selfIndex]->version #define nodeVersion pNode->peerInfo[pNode->selfIndex]->version
#define nodeSStatus pNode->peerInfo[pNode->selfIndex]->sstatus #define nodeSStatus pNode->peerInfo[pNode->selfIndex]->sstatus
......
...@@ -79,7 +79,7 @@ int32_t syncInit() { ...@@ -79,7 +79,7 @@ int32_t syncInit() {
info.numOfThreads = tsSyncTcpThreads; info.numOfThreads = tsSyncTcpThreads;
info.serverIp = 0; info.serverIp = 0;
info.port = tsSyncPort; info.port = tsSyncPort;
info.bufferSize = 640000; info.bufferSize = SYNC_MAX_SIZE;
info.processBrokenLink = syncProcessBrokenLink; info.processBrokenLink = syncProcessBrokenLink;
info.processIncomingMsg = syncProcessPeerMsg; info.processIncomingMsg = syncProcessPeerMsg;
info.processIncomingConn = syncProcessIncommingConnection; info.processIncomingConn = syncProcessIncommingConnection;
...@@ -850,7 +850,7 @@ static void syncProcessForwardFromPeer(char *cont, SSyncPeer *pPeer) { ...@@ -850,7 +850,7 @@ static void syncProcessForwardFromPeer(char *cont, SSyncPeer *pPeer) {
SSyncNode *pNode = pPeer->pSyncNode; SSyncNode *pNode = pPeer->pSyncNode;
SWalHead * pHead = (SWalHead *)cont; SWalHead * pHead = (SWalHead *)cont;
sDebug("%s, forward is received, ver:%" PRIu64, pPeer->id, pHead->version); sDebug("%s, forward is received, hver:%" PRIu64 ", len:%d", pPeer->id, pHead->version, pHead->len);
if (nodeRole == TAOS_SYNC_ROLE_SLAVE) { if (nodeRole == TAOS_SYNC_ROLE_SLAVE) {
// nodeVersion = pHead->version; // nodeVersion = pHead->version;
...@@ -859,7 +859,7 @@ static void syncProcessForwardFromPeer(char *cont, SSyncPeer *pPeer) { ...@@ -859,7 +859,7 @@ static void syncProcessForwardFromPeer(char *cont, SSyncPeer *pPeer) {
if (nodeSStatus != TAOS_SYNC_STATUS_INIT) { if (nodeSStatus != TAOS_SYNC_STATUS_INIT) {
syncSaveIntoBuffer(pPeer, pHead); syncSaveIntoBuffer(pPeer, pHead);
} else { } else {
sError("%s, forward discarded, ver:%" PRIu64, pPeer->id, pHead->version); sError("%s, forward discarded, hver:%" PRIu64, pPeer->id, pHead->version);
} }
} }
} }
...@@ -890,10 +890,11 @@ static int32_t syncReadPeerMsg(SSyncPeer *pPeer, SSyncHead *pHead, char *cont) { ...@@ -890,10 +890,11 @@ static int32_t syncReadPeerMsg(SSyncPeer *pPeer, SSyncHead *pHead, char *cont) {
// head.len = htonl(head.len); // head.len = htonl(head.len);
if (pHead->len < 0) { if (pHead->len < 0) {
sError("%s, invalid pkt length, len:%d", pPeer->id, pHead->len); sError("%s, invalid pkt length, hlen:%d", pPeer->id, pHead->len);
return -1; return -1;
} }
assert(pHead->len <= TSDB_MAX_WAL_SIZE);
int32_t bytes = taosReadMsg(pPeer->peerFd, cont, pHead->len); int32_t bytes = taosReadMsg(pPeer->peerFd, cont, pHead->len);
if (bytes != pHead->len) { if (bytes != pHead->len) {
sError("%s, failed to read, bytes:%d len:%d", pPeer->id, bytes, pHead->len); sError("%s, failed to read, bytes:%d len:%d", pPeer->id, bytes, pHead->len);
......
...@@ -244,7 +244,7 @@ static int32_t syncCheckLastWalChanges(SSyncPeer *pPeer, uint32_t *pEvent) { ...@@ -244,7 +244,7 @@ static int32_t syncCheckLastWalChanges(SSyncPeer *pPeer, uint32_t *pEvent) {
} }
static int32_t syncRetrieveLastWal(SSyncPeer *pPeer, char *name, uint64_t fversion, int64_t offset, uint32_t *pEvent) { static int32_t syncRetrieveLastWal(SSyncPeer *pPeer, char *name, uint64_t fversion, int64_t offset, uint32_t *pEvent) {
SWalHead *pHead = malloc(640000); SWalHead *pHead = malloc(SYNC_MAX_SIZE);
int32_t code = -1; int32_t code = -1;
int32_t bytes = 0; int32_t bytes = 0;
int32_t sfd; int32_t sfd;
......
...@@ -86,7 +86,7 @@ int32_t main(int32_t argc, char *argv[]) { ...@@ -86,7 +86,7 @@ int32_t main(int32_t argc, char *argv[]) {
info.numOfThreads = 1; info.numOfThreads = 1;
info.serverIp = 0; info.serverIp = 0;
info.port = tsArbitratorPort; info.port = tsArbitratorPort;
info.bufferSize = 640000; info.bufferSize = SYNC_MAX_SIZE;
info.processBrokenLink = arbProcessBrokenLink; info.processBrokenLink = arbProcessBrokenLink;
info.processIncomingMsg = arbProcessPeerMsg; info.processIncomingMsg = arbProcessPeerMsg;
info.processIncomingConn = arbProcessIncommingConnection; info.processIncomingConn = arbProcessIncommingConnection;
......
...@@ -592,6 +592,7 @@ static int vnodeProcessTsdbStatus(void *arg, int status) { ...@@ -592,6 +592,7 @@ static int vnodeProcessTsdbStatus(void *arg, int status) {
if (status == TSDB_STATUS_COMMIT_OVER) { if (status == TSDB_STATUS_COMMIT_OVER) {
vDebug("vgId:%d, commit over, fver:%" PRIu64 " vver:%" PRIu64, pVnode->vgId, pVnode->fversion, pVnode->version); vDebug("vgId:%d, commit over, fver:%" PRIu64 " vver:%" PRIu64, pVnode->vgId, pVnode->fversion, pVnode->version);
walRemoveOldFiles(pVnode->wal);
return vnodeSaveVersion(pVnode); return vnodeSaveVersion(pVnode);
} }
......
...@@ -217,6 +217,11 @@ int32_t vnodeWriteToWQueue(void *vparam, void *wparam, int32_t qtype, void *rpar ...@@ -217,6 +217,11 @@ int32_t vnodeWriteToWQueue(void *vparam, void *wparam, int32_t qtype, void *rpar
if (code != TSDB_CODE_SUCCESS) return code; if (code != TSDB_CODE_SUCCESS) return code;
} }
if (pHead->len > TSDB_MAX_WAL_SIZE) {
vError("vgId:%d, wal len:%d exceeds limit, hver:%" PRIu64, pVnode->vgId, pHead->len, pHead->version);
return TSDB_CODE_WAL_SIZE_LIMIT;
}
int32_t size = sizeof(SVWriteMsg) + sizeof(SWalHead) + pHead->len; int32_t size = sizeof(SVWriteMsg) + sizeof(SWalHead) + pHead->len;
SVWriteMsg *pWrite = taosAllocateQitem(size); SVWriteMsg *pWrite = taosAllocateQitem(size);
if (pWrite == NULL) { if (pWrite == NULL) {
......
...@@ -34,7 +34,7 @@ extern int32_t wDebugFlag; ...@@ -34,7 +34,7 @@ extern int32_t wDebugFlag;
#define WAL_PREFIX "wal" #define WAL_PREFIX "wal"
#define WAL_PREFIX_LEN 3 #define WAL_PREFIX_LEN 3
#define WAL_REFRESH_MS 1000 #define WAL_REFRESH_MS 1000
#define WAL_MAX_SIZE (1024 * 1024) #define WAL_MAX_SIZE (TSDB_MAX_WAL_SIZE + sizeof(SWalHead) + 16)
#define WAL_SIGNATURE ((uint32_t)(0xFAFBFDFE)) #define WAL_SIGNATURE ((uint32_t)(0xFAFBFDFE))
#define WAL_PATH_LEN (TSDB_FILENAME_LEN + 12) #define WAL_PATH_LEN (TSDB_FILENAME_LEN + 12)
#define WAL_FILE_LEN (TSDB_FILENAME_LEN + 32) #define WAL_FILE_LEN (TSDB_FILENAME_LEN + 32)
......
...@@ -135,7 +135,7 @@ void walClose(void *handle) { ...@@ -135,7 +135,7 @@ void walClose(void *handle) {
if (remove(pWal->name) < 0) { if (remove(pWal->name) < 0) {
wError("vgId:%d, wal:%p file:%s, failed to remove", pWal->vgId, pWal, pWal->name); wError("vgId:%d, wal:%p file:%s, failed to remove", pWal->vgId, pWal, pWal->name);
} else { } else {
wDebug("vgId:%d, wal:%p file:%s, it is removed", pWal->vgId, pWal, pWal->name); wInfo("vgId:%d, wal:%p file:%s, it is removed", pWal->vgId, pWal, pWal->name);
} }
} }
} else { } else {
......
...@@ -58,7 +58,18 @@ int32_t walRenew(void *handle) { ...@@ -58,7 +58,18 @@ int32_t walRenew(void *handle) {
wDebug("vgId:%d, file:%s, it is created", pWal->vgId, pWal->name); wDebug("vgId:%d, file:%s, it is created", pWal->vgId, pWal->name);
} }
if (pWal->keep != TAOS_WAL_KEEP) { pthread_mutex_unlock(&pWal->mutex);
return code;
}
void walRemoveOldFiles(void *handle) {
SWal *pWal = handle;
if (pWal == NULL) return;
if (pWal->keep == TAOS_WAL_KEEP) return;
pthread_mutex_lock(&pWal->mutex);
// remove the oldest wal file // remove the oldest wal file
int64_t oldFileId = -1; int64_t oldFileId = -1;
if (walGetOldFile(pWal, pWal->fileId, WAL_FILE_NUM, &oldFileId) == 0) { if (walGetOldFile(pWal, pWal->fileId, WAL_FILE_NUM, &oldFileId) == 0) {
...@@ -68,14 +79,11 @@ int32_t walRenew(void *handle) { ...@@ -68,14 +79,11 @@ int32_t walRenew(void *handle) {
if (remove(walName) < 0) { if (remove(walName) < 0) {
wError("vgId:%d, file:%s, failed to remove since %s", pWal->vgId, walName, strerror(errno)); wError("vgId:%d, file:%s, failed to remove since %s", pWal->vgId, walName, strerror(errno));
} else { } else {
wDebug("vgId:%d, file:%s, it is removed", pWal->vgId, walName); wInfo("vgId:%d, file:%s, it is removed", pWal->vgId, walName);
}
} }
} }
pthread_mutex_unlock(&pWal->mutex); pthread_mutex_unlock(&pWal->mutex);
return code;
} }
int32_t walWrite(void *handle, SWalHead *pHead) { int32_t walWrite(void *handle, SWalHead *pHead) {
......
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
print ============== deploy
system sh/exec.sh -n dnode1 -s start
sleep 3001
sql connect
sql create database d1
sql use d1
sql create table t1 (ts timestamp, i int)
sql insert into t1 values(now, 1);
print =============== step3
sleep 3000
sql select * from t1;
print rows: $rows
if $rows != 1 then
return -1
endi
system sh/exec.sh -n dnode1 -s stop -x SIGKILL
sleep 3000
print =============== step4
system sh/exec.sh -n dnode1 -s start -x SIGKILL
sleep 3000
sql select * from t1;
print rows: $rows
if $rows != 1 then
return -1
endi
system sh/exec.sh -n dnode1 -s stop -x SIGKILL
sleep 3000
print =============== step5
system sh/exec.sh -n dnode1 -s start -x SIGKILL
sleep 3000
sql select * from t1;
print rows: $rows
if $rows != 1 then
return -1
endi
system sh/exec.sh -n dnode1 -s stop -x SIGKILL
sleep 3000
print =============== step6
system sh/exec.sh -n dnode1 -s start -x SIGKILL
sleep 3000
sql select * from t1;
print rows: $rows
if $rows != 1 then
return -1
endi
system sh/exec.sh -n dnode1 -s stop -x SIGKILL
sleep 3000
print =============== step7
system sh/exec.sh -n dnode1 -s start -x SIGKILL
sleep 3000
sql select * from t1;
print rows: $rows
if $rows != 1 then
return -1
endi
system sh/exec.sh -n dnode1 -s stop -x SIGKILL
sleep 3000
print =============== step8
system sh/exec.sh -n dnode1 -s start -x SIGKILL
sleep 3000
sql select * from t1;
print rows: $rows
if $rows != 1 then
return -1
endi
system sh/exec.sh -n dnode1 -s stop -x SIGKILL
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/deploy.sh -n dnode2 -i 2
system sh/deploy.sh -n dnode3 -i 3
system sh/cfg.sh -n dnode1 -c numOfMnodes -v 3
system sh/cfg.sh -n dnode2 -c numOfMnodes -v 3
system sh/cfg.sh -n dnode3 -c numOfMnodes -v 3
system sh/cfg.sh -n dnode1 -c mnodeEqualVnodeNum -v 4
system sh/cfg.sh -n dnode2 -c mnodeEqualVnodeNum -v 4
system sh/cfg.sh -n dnode3 -c mnodeEqualVnodeNum -v 4
system sh/cfg.sh -n dnode1 -c http -v 1
system sh/cfg.sh -n dnode2 -c http -v 1
system sh/cfg.sh -n dnode3 -c http -v 1
system sh/cfg.sh -n dnode1 -c maxTablesPerVnode -v 20000
system sh/cfg.sh -n dnode2 -c maxTablesPerVnode -v 20000
system sh/cfg.sh -n dnode3 -c maxTablesPerVnode -v 20000
system sh/cfg.sh -n dnode1 -c replica -v 3
system sh/cfg.sh -n dnode2 -c replica -v 3
system sh/cfg.sh -n dnode3 -c replica -v 3
system sh/cfg.sh -n dnode1 -c maxSQLLength -v 940032
system sh/cfg.sh -n dnode2 -c maxSQLLength -v 940032
system sh/cfg.sh -n dnode3 -c maxSQLLength -v 940032
print ============== deploy
system sh/exec.sh -n dnode1 -s start
sleep 5001
sql connect
sql create dnode $hostname2
sql create dnode $hostname3
system sh/exec.sh -n dnode2 -s start
system sh/exec.sh -n dnode3 -s start
print =============== step1
$x = 0
show1:
$x = $x + 1
sleep 2000
if $x == 5 then
return -1
endi
sql show mnodes -x show1
$mnode1Role = $data2_1
print mnode1Role $mnode1Role
$mnode2Role = $data2_2
print mnode2Role $mnode2Role
$mnode3Role = $data2_3
print mnode3Role $mnode3Role
if $mnode1Role != master then
goto show1
endi
if $mnode2Role != slave then
goto show1
endi
if $mnode3Role != slave then
goto show1
endi
print =============== step2
sql create database d1 replica 3
sql use d1
sql create table table_rest (ts timestamp, i int)
print sql length is 870KB
restful d1 table_rest 1591072800 30000
restful d1 table_rest 1591172800 30000
restful d1 table_rest 1591272800 30000
restful d1 table_rest 1591372800 30000
restful d1 table_rest 1591472800 30000
restful d1 table_rest 1591572800 30000
restful d1 table_rest 1591672800 30000
restful d1 table_rest 1591772800 30000
restful d1 table_rest 1591872800 30000
restful d1 table_rest 1591972800 30000
sql select * from table_rest;
print rows: $rows
if $rows != 300000 then
return -1
endi
print =============== step3
system sh/exec.sh -n dnode1 -s stop -x SIGINT
sleep 5000
sql select * from table_rest;
print rows: $rows
if $rows != 300000 then
return -1
endi
system sh/exec.sh -n dnode1 -s start -x SIGINT
sleep 5000
print =============== step4
system sh/exec.sh -n dnode2 -s stop -x SIGINT
sleep 5000
sql select * from table_rest;
print rows: $rows
if $rows != 300000 then
return -1
endi
system sh/exec.sh -n dnode2 -s start -x SIGINT
sleep 5000
print =============== step5
system sh/exec.sh -n dnode3 -s stop -x SIGINT
sleep 5000
sql select * from table_rest;
print rows: $rows
if $rows != 300000 then
return -1
endi
system sh/exec.sh -n dnode1 -s stop -x SIGINT
system sh/exec.sh -n dnode2 -s stop -x SIGINT
system sh/exec.sh -n dnode3 -s stop -x SIGINT
\ No newline at end of file
...@@ -236,6 +236,9 @@ cd ../../../debug; make ...@@ -236,6 +236,9 @@ cd ../../../debug; make
./test.sh -f general/vector/table_query.sim ./test.sh -f general/vector/table_query.sim
./test.sh -f general/vector/table_time.sim ./test.sh -f general/vector/table_time.sim
./test.sh -f general/wal/sync.sim
./test.sh -f general/wal/kill.sim
./test.sh -f unique/account/account_create.sim ./test.sh -f unique/account/account_create.sim
./test.sh -f unique/account/account_delete.sim ./test.sh -f unique/account/account_delete.sim
./test.sh -f unique/account/account_len.sim ./test.sh -f unique/account/account_len.sim
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册