From 6bf39dfa3b6bdbf1afc622e18e4e13118d18178b Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Fri, 27 Nov 2020 17:16:49 +0800 Subject: [PATCH] TD-1455 TD-2196 --- src/inc/taoserror.h | 5 ++++- src/sync/src/syncRetrieve.c | 36 +++++++++++++++++++++--------------- 2 files changed, 25 insertions(+), 16 deletions(-) diff --git a/src/inc/taoserror.h b/src/inc/taoserror.h index 77ec5350ba..5fde6d40d5 100644 --- a/src/inc/taoserror.h +++ b/src/inc/taoserror.h @@ -261,6 +261,9 @@ TAOS_DEFINE_ERROR(TSDB_CODE_SYN_INVALID_CONFIG, 0, 0x0900, "Invalid Sy TAOS_DEFINE_ERROR(TSDB_CODE_SYN_NOT_ENABLED, 0, 0x0901, "Sync module not enabled") TAOS_DEFINE_ERROR(TSDB_CODE_SYN_INVALID_VERSION, 0, 0x0902, "Invalid Sync version") TAOS_DEFINE_ERROR(TSDB_CODE_SYN_CONFIRM_EXPIRED, 0, 0x0903, "Sync confirm expired") +TAOS_DEFINE_ERROR(TSDB_CODE_SYN_VND_COMMITING, 0, 0x0904, "Vnode is commiting") +TAOS_DEFINE_ERROR(TSDB_CODE_SYN_FILE_CHNAGED, 0, 0x0905, "Vnode file is changed") +TAOS_DEFINE_ERROR(TSDB_CODE_SYN_APP_ERROR, 0, 0x1000, "Unexpected generic error in sync") // wal TAOS_DEFINE_ERROR(TSDB_CODE_WAL_APP_ERROR, 0, 0x1000, "Unexpected generic error in wal") @@ -367,6 +370,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_HTTP_OP_TAG_VALUE_TOO_LONG, 0, 0x11A4, "tag value TAOS_DEFINE_ERROR(TSDB_CODE_HTTP_OP_VALUE_NULL, 0, 0x11A5, "value not find") TAOS_DEFINE_ERROR(TSDB_CODE_HTTP_OP_VALUE_TYPE, 0, 0x11A6, "value type should be boolean, number or string") +// odbc TAOS_DEFINE_ERROR(TSDB_CODE_ODBC_OOM, 0, 0x2100, "out of memory") TAOS_DEFINE_ERROR(TSDB_CODE_ODBC_CONV_CHAR_NOT_NUM, 0, 0x2101, "convertion not a valid literal input") TAOS_DEFINE_ERROR(TSDB_CODE_ODBC_CONV_UNDEF, 0, 0x2102, "convertion undefined") @@ -390,7 +394,6 @@ TAOS_DEFINE_ERROR(TSDB_CODE_ODBC_CONV_SRC_BAD_SEQ, 0, 0x2113, "src bad se TAOS_DEFINE_ERROR(TSDB_CODE_ODBC_CONV_SRC_INCOMPLETE, 0, 0x2114, "src incomplete") TAOS_DEFINE_ERROR(TSDB_CODE_ODBC_CONV_SRC_GENERAL, 0, 0x2115, "src general") - #ifdef TAOS_ERROR_C }; #endif diff --git a/src/sync/src/syncRetrieve.c b/src/sync/src/syncRetrieve.c index c956353a32..4559285a1f 100644 --- a/src/sync/src/syncRetrieve.c +++ b/src/sync/src/syncRetrieve.c @@ -16,6 +16,7 @@ #define _DEFAULT_SOURCE #include #include "os.h" +#include "taoserror.h" #include "tlog.h" #include "tutil.h" #include "tglobal.h" @@ -26,28 +27,31 @@ #include "syncInt.h" static int32_t syncAreFilesModified(SSyncNode *pNode, SSyncPeer *pPeer) { - if (pNode->getFileVersion == NULL) return 0; + if (pNode->getFileVersion == NULL) return TSDB_CODE_SUCCESS; uint64_t fver = 0; int32_t code = (*pNode->getFileVersion)(pNode->vgId, &fver); if (code != 0) { - sInfo("%s, file are modified while retrieve, lastver:%" PRIu64, pPeer->id, pPeer->lastVer); - return code; + sInfo("%s, vnode is commiting while retrieve, last fver:%" PRIu64, pPeer->id, pPeer->lastVer); + pPeer->fileChanged = 1; + return TSDB_CODE_SYN_VND_COMMITING; } if (fver != pPeer->lastVer) { - sInfo("%s, file are modified while retrieve, fver:%" PRIu64 " lastver:%" PRIu64, pPeer->id, fver, pPeer->lastVer); - return -1; + sInfo("%s, files are modified while retrieve, fver:%" PRIu64 ", last fver:%" PRIu64, pPeer->id, fver, pPeer->lastVer); + pPeer->fileChanged = 1; + return TSDB_CODE_SYN_FILE_CHNAGED; } - return 0; + pPeer->fileChanged = 0; + return TSDB_CODE_SUCCESS; } static int32_t syncRetrieveFile(SSyncPeer *pPeer) { SSyncNode *pNode = pPeer->pSyncNode; SFileInfo fileInfo = {0}; SFileAck fileAck = {0}; - int32_t code = -1; + int32_t code = TSDB_CODE_SYN_APP_ERROR; char name[TSDB_FILENAME_LEN * 2] = {0}; if (pNode->getFileVersion) (*pNode->getFileVersion)(pNode->vgId, &pPeer->lastVer); @@ -58,25 +62,27 @@ static int32_t syncRetrieveFile(SSyncPeer *pPeer) { fileInfo.magic = (*pNode->getFileInfo)(pNode->vgId, fileInfo.name, &fileInfo.index, TAOS_SYNC_MAX_INDEX, &fileInfo.size, &fileInfo.fversion); // fileInfo.size = htonl(size); - sDebug("%s, file:%s info will be sent, size:%" PRId64, pPeer->id, fileInfo.name, fileInfo.size); + sDebug("%s, file:%s info is sent, size:%" PRId64, pPeer->id, fileInfo.name, fileInfo.size); // send the file info int32_t ret = taosWriteMsg(pPeer->syncFd, &(fileInfo), sizeof(fileInfo)); if (ret < 0) { + code = TAOS_SYSTEM_ERROR(errno); sError("%s, failed to write file:%s info while retrieve file since %s", pPeer->id, fileInfo.name, strerror(errno)); break; } // if no file anymore, break if (fileInfo.magic == 0 || fileInfo.name[0] == 0) { + code = TSDB_CODE_SUCCESS; sDebug("%s, no more files to sync", pPeer->id); - code = 0; break; } // wait for the ack from peer ret = taosReadMsg(pPeer->syncFd, &fileAck, sizeof(fileAck)); if (ret < 0) { + code = TAOS_SYSTEM_ERROR(errno); sError("%s, failed to read file:%s ack while retrieve file since %s", pPeer->id, fileInfo.name, strerror(errno)); break; } @@ -97,6 +103,7 @@ static int32_t syncRetrieveFile(SSyncPeer *pPeer) { // send the file to peer int32_t sfd = open(name, O_RDONLY); if (sfd < 0) { + code = TAOS_SYSTEM_ERROR(errno); sError("%s, failed to open file:%s while retrieve file since %s", pPeer->id, fileInfo.name, strerror(errno)); break; } @@ -104,6 +111,7 @@ static int32_t syncRetrieveFile(SSyncPeer *pPeer) { ret = taosSendFile(pPeer->syncFd, sfd, NULL, fileInfo.size); close(sfd); if (ret < 0) { + code = TAOS_SYSTEM_ERROR(errno); sError("%s, failed to send file:%s while retrieve file since %s", pPeer->id, fileInfo.name, strerror(errno)); break; } @@ -112,11 +120,12 @@ static int32_t syncRetrieveFile(SSyncPeer *pPeer) { fileInfo.index++; // check if processed files are modified - if (syncAreFilesModified(pNode, pPeer) != 0) break; + code = syncAreFilesModified(pNode, pPeer); + if (code != TSDB_CODE_SUCCESS) break; } - if (code < 0) { - sError("%s, failed to retrieve file", pPeer->id); + if (code != TSDB_CODE_SUCCESS) { + sError("%s, failed to retrieve file since %s", pPeer->id, tstrerror(code)); } return code; @@ -368,8 +377,6 @@ static int32_t syncRetrieveWal(SSyncPeer *pPeer) { close(sfd); if (code < 0) break; - index++; - if (syncAreFilesModified(pNode, pPeer) != 0) break; } @@ -445,7 +452,6 @@ void *syncRetrieveData(void *param) { if (pNode->notifyFlowCtrl) (*pNode->notifyFlowCtrl)(pNode->vgId, pPeer->numOfRetrieves); - pPeer->fileChanged = 0; pPeer->syncFd = taosOpenTcpClientSocket(pPeer->ip, pPeer->port, 0); if (pPeer->syncFd < 0) { sError("%s, failed to open socket to sync", pPeer->id); -- GitLab