未验证 提交 f4c196a4 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #4184 from taosdata/feature/wal

Feature/wal
......@@ -431,6 +431,8 @@ void tsDataSwap(void *pLeft, void *pRight, int32_t type, int32_t size, void* buf
#define TSDB_PORT_HTTP 11
#define TSDB_PORT_ARBITRATOR 12
#define TSDB_MAX_WAL_SIZE (1024*1024)
typedef enum {
TAOS_QTYPE_RPC = 0,
TAOS_QTYPE_FWD = 1,
......
......@@ -237,7 +237,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_QRY_NOT_READY, 0, 0x0707, "Query not
TAOS_DEFINE_ERROR(TSDB_CODE_QRY_HAS_RSP, 0, 0x0708, "Query should response")
TAOS_DEFINE_ERROR(TSDB_CODE_QRY_IN_EXEC, 0, 0x0709, "Multiple retrieval of this query")
TAOS_DEFINE_ERROR(TSDB_CODE_QRY_TOO_MANY_TIMEWINDOW, 0, 0x070A, "Too many time window in query")
TAOS_DEFINE_ERROR(TSDB_CODE_QRY_NOT_ENOUGH_BUFFER, 0, 0x070B, "Query buffer limit has reached")
TAOS_DEFINE_ERROR(TSDB_CODE_QRY_NOT_ENOUGH_BUFFER, 0, 0x070B, "Query buffer limit has reached")
// grant
TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_EXPIRED, 0, 0x0800, "License expired")
......@@ -261,6 +261,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_SYN_INVALID_VERSION, 0, 0x0902, "Invalid Sy
// wal
TAOS_DEFINE_ERROR(TSDB_CODE_WAL_APP_ERROR, 0, 0x1000, "Unexpected generic error in wal")
TAOS_DEFINE_ERROR(TSDB_CODE_WAL_FILE_CORRUPTED, 0, 0x1001, "WAL file is corrupted")
TAOS_DEFINE_ERROR(TSDB_CODE_WAL_SIZE_LIMIT, 0, 0x1002, "WAL size exceeds limit")
// http
TAOS_DEFINE_ERROR(TSDB_CODE_HTTP_SERVER_OFFLINE, 0, 0x1100, "http server is not onlin")
......
......@@ -59,6 +59,7 @@ int32_t walAlter(twalh pWal, SWalCfg *pCfg);
void walStop(twalh);
void walClose(twalh);
int32_t walRenew(twalh);
void walRemoveOldFiles(twalh);
int32_t walWrite(twalh, SWalHead *);
void walFsync(twalh, bool forceFsync);
int32_t walRestore(twalh, void *pVnode, FWalWrite writeFp);
......
......@@ -35,6 +35,8 @@ extern "C" {
#define TAOS_SMSG_SYNC_MUST 6
#define TAOS_SMSG_STATUS 7
#define SYNC_MAX_SIZE (TSDB_MAX_WAL_SIZE + sizeof(SWalHead) + sizeof(SSyncHead) + 16)
#define nodeRole pNode->peerInfo[pNode->selfIndex]->role
#define nodeVersion pNode->peerInfo[pNode->selfIndex]->version
#define nodeSStatus pNode->peerInfo[pNode->selfIndex]->sstatus
......
......@@ -79,7 +79,7 @@ int32_t syncInit() {
info.numOfThreads = tsSyncTcpThreads;
info.serverIp = 0;
info.port = tsSyncPort;
info.bufferSize = 640000;
info.bufferSize = SYNC_MAX_SIZE;
info.processBrokenLink = syncProcessBrokenLink;
info.processIncomingMsg = syncProcessPeerMsg;
info.processIncomingConn = syncProcessIncommingConnection;
......@@ -850,7 +850,7 @@ static void syncProcessForwardFromPeer(char *cont, SSyncPeer *pPeer) {
SSyncNode *pNode = pPeer->pSyncNode;
SWalHead * pHead = (SWalHead *)cont;
sDebug("%s, forward is received, ver:%" PRIu64, pPeer->id, pHead->version);
sDebug("%s, forward is received, hver:%" PRIu64 ", len:%d", pPeer->id, pHead->version, pHead->len);
if (nodeRole == TAOS_SYNC_ROLE_SLAVE) {
// nodeVersion = pHead->version;
......@@ -859,7 +859,7 @@ static void syncProcessForwardFromPeer(char *cont, SSyncPeer *pPeer) {
if (nodeSStatus != TAOS_SYNC_STATUS_INIT) {
syncSaveIntoBuffer(pPeer, pHead);
} else {
sError("%s, forward discarded, ver:%" PRIu64, pPeer->id, pHead->version);
sError("%s, forward discarded, hver:%" PRIu64, pPeer->id, pHead->version);
}
}
}
......@@ -890,10 +890,11 @@ static int32_t syncReadPeerMsg(SSyncPeer *pPeer, SSyncHead *pHead, char *cont) {
// head.len = htonl(head.len);
if (pHead->len < 0) {
sError("%s, invalid pkt length, len:%d", pPeer->id, pHead->len);
sError("%s, invalid pkt length, hlen:%d", pPeer->id, pHead->len);
return -1;
}
assert(pHead->len <= TSDB_MAX_WAL_SIZE);
int32_t bytes = taosReadMsg(pPeer->peerFd, cont, pHead->len);
if (bytes != pHead->len) {
sError("%s, failed to read, bytes:%d len:%d", pPeer->id, bytes, pHead->len);
......
......@@ -244,7 +244,7 @@ static int32_t syncCheckLastWalChanges(SSyncPeer *pPeer, uint32_t *pEvent) {
}
static int32_t syncRetrieveLastWal(SSyncPeer *pPeer, char *name, uint64_t fversion, int64_t offset, uint32_t *pEvent) {
SWalHead *pHead = malloc(640000);
SWalHead *pHead = malloc(SYNC_MAX_SIZE);
int32_t code = -1;
int32_t bytes = 0;
int32_t sfd;
......
......@@ -86,7 +86,7 @@ int32_t main(int32_t argc, char *argv[]) {
info.numOfThreads = 1;
info.serverIp = 0;
info.port = tsArbitratorPort;
info.bufferSize = 640000;
info.bufferSize = SYNC_MAX_SIZE;
info.processBrokenLink = arbProcessBrokenLink;
info.processIncomingMsg = arbProcessPeerMsg;
info.processIncomingConn = arbProcessIncommingConnection;
......
......@@ -583,6 +583,7 @@ static int vnodeProcessTsdbStatus(void *arg, int status) {
if (status == TSDB_STATUS_COMMIT_OVER) {
vDebug("vgId:%d, commit over, fver:%" PRIu64 " vver:%" PRIu64, pVnode->vgId, pVnode->fversion, pVnode->version);
walRemoveOldFiles(pVnode->wal);
return vnodeSaveVersion(pVnode);
}
......
......@@ -217,6 +217,11 @@ int32_t vnodeWriteToWQueue(void *vparam, void *wparam, int32_t qtype, void *rpar
if (code != TSDB_CODE_SUCCESS) return code;
}
if (pHead->len > TSDB_MAX_WAL_SIZE) {
vError("vgId:%d, wal len:%d exceeds limit, hver:%" PRIu64, pVnode->vgId, pHead->len, pHead->version);
return TSDB_CODE_WAL_SIZE_LIMIT;
}
int32_t size = sizeof(SVWriteMsg) + sizeof(SWalHead) + pHead->len;
SVWriteMsg *pWrite = taosAllocateQitem(size);
if (pWrite == NULL) {
......
......@@ -34,7 +34,7 @@ extern int32_t wDebugFlag;
#define WAL_PREFIX "wal"
#define WAL_PREFIX_LEN 3
#define WAL_REFRESH_MS 1000
#define WAL_MAX_SIZE (1024 * 1024)
#define WAL_MAX_SIZE (TSDB_MAX_WAL_SIZE + sizeof(SWalHead) + 16)
#define WAL_SIGNATURE ((uint32_t)(0xFAFBFDFE))
#define WAL_PATH_LEN (TSDB_FILENAME_LEN + 12)
#define WAL_FILE_LEN (TSDB_FILENAME_LEN + 32)
......
......@@ -135,7 +135,7 @@ void walClose(void *handle) {
if (remove(pWal->name) < 0) {
wError("vgId:%d, wal:%p file:%s, failed to remove", pWal->vgId, pWal, pWal->name);
} else {
wDebug("vgId:%d, wal:%p file:%s, it is removed", pWal->vgId, pWal, pWal->name);
wInfo("vgId:%d, wal:%p file:%s, it is removed", pWal->vgId, pWal, pWal->name);
}
}
} else {
......
......@@ -58,24 +58,32 @@ int32_t walRenew(void *handle) {
wDebug("vgId:%d, file:%s, it is created", pWal->vgId, pWal->name);
}
if (pWal->keep != TAOS_WAL_KEEP) {
// remove the oldest wal file
int64_t oldFileId = -1;
if (walGetOldFile(pWal, pWal->fileId, WAL_FILE_NUM, &oldFileId) == 0) {
char walName[WAL_FILE_LEN] = {0};
snprintf(walName, sizeof(walName), "%s/%s%" PRId64, pWal->path, WAL_PREFIX, oldFileId);
if (remove(walName) < 0) {
wError("vgId:%d, file:%s, failed to remove since %s", pWal->vgId, walName, strerror(errno));
} else {
wDebug("vgId:%d, file:%s, it is removed", pWal->vgId, walName);
}
pthread_mutex_unlock(&pWal->mutex);
return code;
}
void walRemoveOldFiles(void *handle) {
SWal *pWal = handle;
if (pWal == NULL) return;
if (pWal->keep == TAOS_WAL_KEEP) return;
pthread_mutex_lock(&pWal->mutex);
// remove the oldest wal file
int64_t oldFileId = -1;
if (walGetOldFile(pWal, pWal->fileId, WAL_FILE_NUM, &oldFileId) == 0) {
char walName[WAL_FILE_LEN] = {0};
snprintf(walName, sizeof(walName), "%s/%s%" PRId64, pWal->path, WAL_PREFIX, oldFileId);
if (remove(walName) < 0) {
wError("vgId:%d, file:%s, failed to remove since %s", pWal->vgId, walName, strerror(errno));
} else {
wInfo("vgId:%d, file:%s, it is removed", pWal->vgId, walName);
}
}
pthread_mutex_unlock(&pWal->mutex);
return code;
}
int32_t walWrite(void *handle, SWalHead *pHead) {
......
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
print ============== deploy
system sh/exec.sh -n dnode1 -s start
sleep 3001
sql connect
sql create database d1
sql use d1
sql create table t1 (ts timestamp, i int)
sql insert into t1 values(now, 1);
print =============== step3
sleep 3000
sql select * from t1;
print rows: $rows
if $rows != 1 then
return -1
endi
system sh/exec.sh -n dnode1 -s stop -x SIGKILL
sleep 3000
print =============== step4
system sh/exec.sh -n dnode1 -s start -x SIGKILL
sleep 3000
sql select * from t1;
print rows: $rows
if $rows != 1 then
return -1
endi
system sh/exec.sh -n dnode1 -s stop -x SIGKILL
sleep 3000
print =============== step5
system sh/exec.sh -n dnode1 -s start -x SIGKILL
sleep 3000
sql select * from t1;
print rows: $rows
if $rows != 1 then
return -1
endi
system sh/exec.sh -n dnode1 -s stop -x SIGKILL
sleep 3000
print =============== step6
system sh/exec.sh -n dnode1 -s start -x SIGKILL
sleep 3000
sql select * from t1;
print rows: $rows
if $rows != 1 then
return -1
endi
system sh/exec.sh -n dnode1 -s stop -x SIGKILL
sleep 3000
print =============== step7
system sh/exec.sh -n dnode1 -s start -x SIGKILL
sleep 3000
sql select * from t1;
print rows: $rows
if $rows != 1 then
return -1
endi
system sh/exec.sh -n dnode1 -s stop -x SIGKILL
sleep 3000
print =============== step8
system sh/exec.sh -n dnode1 -s start -x SIGKILL
sleep 3000
sql select * from t1;
print rows: $rows
if $rows != 1 then
return -1
endi
system sh/exec.sh -n dnode1 -s stop -x SIGKILL
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/deploy.sh -n dnode2 -i 2
system sh/deploy.sh -n dnode3 -i 3
system sh/cfg.sh -n dnode1 -c numOfMnodes -v 3
system sh/cfg.sh -n dnode2 -c numOfMnodes -v 3
system sh/cfg.sh -n dnode3 -c numOfMnodes -v 3
system sh/cfg.sh -n dnode1 -c mnodeEqualVnodeNum -v 4
system sh/cfg.sh -n dnode2 -c mnodeEqualVnodeNum -v 4
system sh/cfg.sh -n dnode3 -c mnodeEqualVnodeNum -v 4
system sh/cfg.sh -n dnode1 -c http -v 1
system sh/cfg.sh -n dnode2 -c http -v 1
system sh/cfg.sh -n dnode3 -c http -v 1
system sh/cfg.sh -n dnode1 -c maxTablesPerVnode -v 20000
system sh/cfg.sh -n dnode2 -c maxTablesPerVnode -v 20000
system sh/cfg.sh -n dnode3 -c maxTablesPerVnode -v 20000
system sh/cfg.sh -n dnode1 -c replica -v 3
system sh/cfg.sh -n dnode2 -c replica -v 3
system sh/cfg.sh -n dnode3 -c replica -v 3
system sh/cfg.sh -n dnode1 -c maxSQLLength -v 940032
system sh/cfg.sh -n dnode2 -c maxSQLLength -v 940032
system sh/cfg.sh -n dnode3 -c maxSQLLength -v 940032
print ============== deploy
system sh/exec.sh -n dnode1 -s start
sleep 5001
sql connect
sql create dnode $hostname2
sql create dnode $hostname3
system sh/exec.sh -n dnode2 -s start
system sh/exec.sh -n dnode3 -s start
print =============== step1
$x = 0
show1:
$x = $x + 1
sleep 2000
if $x == 5 then
return -1
endi
sql show mnodes -x show1
$mnode1Role = $data2_1
print mnode1Role $mnode1Role
$mnode2Role = $data2_2
print mnode2Role $mnode2Role
$mnode3Role = $data2_3
print mnode3Role $mnode3Role
if $mnode1Role != master then
goto show1
endi
if $mnode2Role != slave then
goto show1
endi
if $mnode3Role != slave then
goto show1
endi
print =============== step2
sql create database d1 replica 3
sql use d1
sql create table table_rest (ts timestamp, i int)
print sql length is 870KB
restful d1 table_rest 1591072800 30000
restful d1 table_rest 1591172800 30000
restful d1 table_rest 1591272800 30000
restful d1 table_rest 1591372800 30000
restful d1 table_rest 1591472800 30000
restful d1 table_rest 1591572800 30000
restful d1 table_rest 1591672800 30000
restful d1 table_rest 1591772800 30000
restful d1 table_rest 1591872800 30000
restful d1 table_rest 1591972800 30000
sql select * from table_rest;
print rows: $rows
if $rows != 300000 then
return -1
endi
print =============== step3
system sh/exec.sh -n dnode1 -s stop -x SIGINT
sleep 5000
sql select * from table_rest;
print rows: $rows
if $rows != 300000 then
return -1
endi
system sh/exec.sh -n dnode1 -s start -x SIGINT
sleep 5000
print =============== step4
system sh/exec.sh -n dnode2 -s stop -x SIGINT
sleep 5000
sql select * from table_rest;
print rows: $rows
if $rows != 300000 then
return -1
endi
system sh/exec.sh -n dnode2 -s start -x SIGINT
sleep 5000
print =============== step5
system sh/exec.sh -n dnode3 -s stop -x SIGINT
sleep 5000
sql select * from table_rest;
print rows: $rows
if $rows != 300000 then
return -1
endi
system sh/exec.sh -n dnode1 -s stop -x SIGINT
system sh/exec.sh -n dnode2 -s stop -x SIGINT
system sh/exec.sh -n dnode3 -s stop -x SIGINT
\ No newline at end of file
......@@ -236,6 +236,9 @@ cd ../../../debug; make
./test.sh -f general/vector/table_query.sim
./test.sh -f general/vector/table_time.sim
./test.sh -f general/wal/sync.sim
./test.sh -f general/wal/kill.sim
./test.sh -f unique/account/account_create.sim
./test.sh -f unique/account/account_delete.sim
./test.sh -f unique/account/account_len.sim
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册