提交 dbdd70ac 编写于 作者: S Shengliang Guan

TD-2196

上级 0a90950b
...@@ -65,7 +65,10 @@ static int32_t syncRestoreFile(SSyncPeer *pPeer, uint64_t *fversion) { ...@@ -65,7 +65,10 @@ static int32_t syncRestoreFile(SSyncPeer *pPeer, uint64_t *fversion) {
while (1) { while (1) {
// read file info // read file info
int32_t ret = taosReadMsg(pPeer->syncFd, &(minfo), sizeof(minfo)); int32_t ret = taosReadMsg(pPeer->syncFd, &(minfo), sizeof(minfo));
if (ret < 0) break; if (ret < 0) {
sError("%s, failed to read file info while restore file since %s", pPeer->id, strerror(errno));
break;
}
// if no more file from master, break; // if no more file from master, break;
if (minfo.name[0] == 0 || minfo.magic == 0) { if (minfo.name[0] == 0 || minfo.magic == 0) {
...@@ -92,8 +95,11 @@ static int32_t syncRestoreFile(SSyncPeer *pPeer, uint64_t *fversion) { ...@@ -92,8 +95,11 @@ static int32_t syncRestoreFile(SSyncPeer *pPeer, uint64_t *fversion) {
fileAck.sync = (sinfo.magic != minfo.magic || sinfo.name[0] == 0) ? 1 : 0; fileAck.sync = (sinfo.magic != minfo.magic || sinfo.name[0] == 0) ? 1 : 0;
// send file ack // send file ack
ret = taosWriteMsg(pPeer->syncFd, &(fileAck), sizeof(fileAck)); ret = taosWriteMsg(pPeer->syncFd, &fileAck, sizeof(fileAck));
if (ret < 0) break; if (ret < 0) {
sError("%s, failed to write file:%s ack while restore file since %s", pPeer->id, minfo.name, strerror(errno));
break;
}
// if sync is not required, continue // if sync is not required, continue
if (fileAck.sync == 0) { if (fileAck.sync == 0) {
...@@ -108,14 +114,17 @@ static int32_t syncRestoreFile(SSyncPeer *pPeer, uint64_t *fversion) { ...@@ -108,14 +114,17 @@ static int32_t syncRestoreFile(SSyncPeer *pPeer, uint64_t *fversion) {
int32_t dfd = open(name, O_WRONLY | O_CREAT | O_TRUNC, S_IRWXU | S_IRWXG | S_IRWXO); int32_t dfd = open(name, O_WRONLY | O_CREAT | O_TRUNC, S_IRWXU | S_IRWXG | S_IRWXO);
if (dfd < 0) { if (dfd < 0) {
sError("%s, failed to open file:%s", pPeer->id, name); sError("%s, failed to open file:%s while restore file since %s", pPeer->id, minfo.name, strerror(errno));
break; break;
} }
ret = taosCopyFds(pPeer->syncFd, dfd, minfo.size); ret = taosCopyFds(pPeer->syncFd, dfd, minfo.size);
fsync(dfd); fsync(dfd);
close(dfd); close(dfd);
if (ret < 0) break; if (ret < 0) {
sError("%s, failed to copy file:%s while restore file since %s", pPeer->id, minfo.name, strerror(errno));
break;
}
fileChanged = true; fileChanged = true;
sDebug("%s, %s is received, size:%" PRId64, pPeer->id, minfo.name, minfo.size); sDebug("%s, %s is received, size:%" PRId64, pPeer->id, minfo.name, minfo.size);
...@@ -125,6 +134,7 @@ static int32_t syncRestoreFile(SSyncPeer *pPeer, uint64_t *fversion) { ...@@ -125,6 +134,7 @@ static int32_t syncRestoreFile(SSyncPeer *pPeer, uint64_t *fversion) {
// data file is changed, code shall be set to 1 // data file is changed, code shall be set to 1
*fversion = minfo.fversion; *fversion = minfo.fversion;
code = 1; code = 1;
sDebug("%s, file changed while restore file", pPeer->id);
} }
if (code < 0) { if (code < 0) {
...@@ -146,15 +156,22 @@ static int32_t syncRestoreWal(SSyncPeer *pPeer) { ...@@ -146,15 +156,22 @@ static int32_t syncRestoreWal(SSyncPeer *pPeer) {
while (1) { while (1) {
ret = taosReadMsg(pPeer->syncFd, pHead, sizeof(SWalHead)); ret = taosReadMsg(pPeer->syncFd, pHead, sizeof(SWalHead));
if (ret < 0) break; if (ret < 0) {
sError("%s, failed to read walhead while restore wal since %s", pPeer->id, strerror(errno));
break;
}
if (pHead->len == 0) { if (pHead->len == 0) {
sDebug("%s, wal is synced over", pPeer->id);
code = 0; code = 0;
break; break;
} // wal sync over } // wal sync over
ret = taosReadMsg(pPeer->syncFd, pHead->cont, pHead->len); ret = taosReadMsg(pPeer->syncFd, pHead->cont, pHead->len);
if (ret < 0) break; if (ret < 0) {
sError("%s, failed to read walcont, len:%d while restore wal since %s", pPeer->id, pHead->len, strerror(errno));
break;
}
sTrace("%s, restore a record, qtype:wal len:%d hver:%" PRIu64, pPeer->id, pHead->len, pHead->version); sTrace("%s, restore a record, qtype:wal len:%d hver:%" PRIu64, pPeer->id, pHead->len, pHead->version);
......
...@@ -114,7 +114,10 @@ static int32_t syncRetrieveFile(SSyncPeer *pPeer) { ...@@ -114,7 +114,10 @@ static int32_t syncRetrieveFile(SSyncPeer *pPeer) {
// send the file info // send the file info
int32_t ret = taosWriteMsg(pPeer->syncFd, &(fileInfo), sizeof(fileInfo)); int32_t ret = taosWriteMsg(pPeer->syncFd, &(fileInfo), sizeof(fileInfo));
if (ret < 0) break; if (ret < 0) {
sError("%s, failed to write file:%s info while retrieve file since %s", pPeer->id, fileInfo.name, strerror(errno));
break;
}
// if no file anymore, break // if no file anymore, break
if (fileInfo.magic == 0 || fileInfo.name[0] == 0) { if (fileInfo.magic == 0 || fileInfo.name[0] == 0) {
...@@ -124,8 +127,11 @@ static int32_t syncRetrieveFile(SSyncPeer *pPeer) { ...@@ -124,8 +127,11 @@ static int32_t syncRetrieveFile(SSyncPeer *pPeer) {
} }
// wait for the ack from peer // wait for the ack from peer
ret = taosReadMsg(pPeer->syncFd, &(fileAck), sizeof(fileAck)); ret = taosReadMsg(pPeer->syncFd, &fileAck, sizeof(fileAck));
if (ret < 0) break; if (ret < 0) {
sError("%s, failed to read file:%s ack while retrieve file since %s", pPeer->id, fileInfo.name, strerror(errno));
break;
}
// set the peer sync version // set the peer sync version
pPeer->sversion = fileInfo.fversion; pPeer->sversion = fileInfo.fversion;
...@@ -134,7 +140,10 @@ static int32_t syncRetrieveFile(SSyncPeer *pPeer) { ...@@ -134,7 +140,10 @@ static int32_t syncRetrieveFile(SSyncPeer *pPeer) {
snprintf(name, sizeof(name), "%s/%s", pNode->path, fileInfo.name); snprintf(name, sizeof(name), "%s/%s", pNode->path, fileInfo.name);
// add the file into watch list // add the file into watch list
if (syncAddIntoWatchList(pPeer, name) < 0) break; if (syncAddIntoWatchList(pPeer, name) < 0) {
sError("%s, failed to watch file:%s while retrieve file since %s", pPeer->id, fileInfo.name, strerror(errno));
break;
}
// if sync is not required, continue // if sync is not required, continue
if (fileAck.sync == 0) { if (fileAck.sync == 0) {
...@@ -145,21 +154,30 @@ static int32_t syncRetrieveFile(SSyncPeer *pPeer) { ...@@ -145,21 +154,30 @@ static int32_t syncRetrieveFile(SSyncPeer *pPeer) {
// send the file to peer // send the file to peer
int32_t sfd = open(name, O_RDONLY); int32_t sfd = open(name, O_RDONLY);
if (sfd < 0) break; if (sfd < 0) {
sError("%s, failed to open file:%s while retrieve file since %s", pPeer->id, fileInfo.name, strerror(errno));
break;
}
ret = taosSendFile(pPeer->syncFd, sfd, NULL, fileInfo.size); ret = taosSendFile(pPeer->syncFd, sfd, NULL, fileInfo.size);
close(sfd); close(sfd);
if (ret < 0) break; if (ret < 0) {
sError("%s, failed to send file:%s while retrieve file since %s", pPeer->id, fileInfo.name, strerror(errno));
break;
}
sDebug("%s, %s is sent, size:%" PRId64, pPeer->id, name, fileInfo.size); sDebug("%s, %s is sent, size:%" PRId64, pPeer->id, name, fileInfo.size);
fileInfo.index++; fileInfo.index++;
// check if processed files are modified // check if processed files are modified
if (syncAreFilesModified(pPeer) != 0) break; if (syncAreFilesModified(pPeer) != 0) {
sInfo("%s, file:%s are modified while retrieve file since %s", pPeer->id, fileInfo.name, strerror(errno));
break;
}
} }
if (code < 0) { if (code < 0) {
sError("%s, failed to retrieve file since %s", pPeer->id, strerror(errno)); sError("%s, failed to retrieve file", pPeer->id);
} }
return code; return code;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册