diff --git a/src/system/inc/vnode.h b/src/system/inc/vnode.h index 72f7d8e6a0466f9f545bcee605bb688aa70e353e..fa118cd1ec9bad83c1f022f43a829e3ba9e3fa96 100644 --- a/src/system/inc/vnode.h +++ b/src/system/inc/vnode.h @@ -419,10 +419,6 @@ void vnodeCommitOver(SVnodeObj *pVnode); TSKEY vnodeGetFirstKey(int vnode); -int vnodeSyncRetrieveCache(int vnode, int fd); - -int vnodeSyncRestoreCache(int vnode, int fd); - pthread_t vnodeCreateCommitThread(SVnodeObj *pVnode); void vnodeCancelCommit(SVnodeObj *pVnode); @@ -448,10 +444,6 @@ void *vnodeCommitToFile(void *param); void *vnodeCommitMultiToFile(SVnodeObj *pVnode, int ssid, int esid); -int vnodeSyncRetrieveFile(int vnode, int fd, uint32_t fileId, uint64_t *fmagic); - -int vnodeSyncRestoreFile(int vnode, int sfd); - int vnodeWriteBlockToFile(SMeterObj *pObj, SCompBlock *pBlock, SData *data[], SData *cdata[], int pointsRead); int vnodeSearchPointInFile(SMeterObj *pObj, SQuery *pQuery); @@ -477,14 +469,8 @@ void *vnodeGetMeterPeerConnection(SMeterObj *pObj, int index); int vnodeForwardToPeer(SMeterObj *pObj, char *msg, int msgLen, char action, int sversion); -void vnodeCloseAllSyncFds(int vnode); - void vnodeConfigVPeers(int vnode, int numOfPeers, SVPeerDesc peerDesc[]); -void vnodeStartSyncProcess(SVnodeObj *pVnode); - -void vnodeCancelSync(int vnode); - void vnodeListPeerStatus(char *buffer); void vnodeCheckOwnStatus(SVnodeObj *pVnode); diff --git a/src/system/src/dnodeMgmt.c b/src/system/src/dnodeMgmt.c index 96bdeeb214285899e2e057a3b3497f9598093fc5..28e723f3f08668e74d32dd2dbf56f0fd4966de86 100644 --- a/src/system/src/dnodeMgmt.c +++ b/src/system/src/dnodeMgmt.c @@ -113,7 +113,7 @@ int vnodeProcessCreateMeterRequest(char *pMsg) { pVnode = vnodeList + vid; if (pVnode->cfg.maxSessions <= 0) { dError("vid:%d, not activated", vid); - code = TSDB_CODE_INVALID_SESSION_ID; + code = TSDB_CODE_NOT_ACTIVE_SESSION; goto _over; } @@ -215,7 +215,7 @@ int vnodeProcessCreateMeterMsg(char *pMsg) { if (pVnode->pCachePool == NULL) { dError("vid:%d is not activated yet", pCreate->vnode); vnodeSendVpeerCfgMsg(pCreate->vnode); - code = TSDB_CODE_INVALID_SESSION_ID; + code = TSDB_CODE_NOT_ACTIVE_SESSION; goto _create_over; } diff --git a/src/system/src/dnodeSystem.c b/src/system/src/dnodeSystem.c index 7d7c3ee6aa90f647d95c148f0170625907c23ff4..bc08d5b3c2a5e2f26fa7b9cc8c2dc2d20283469e 100644 --- a/src/system/src/dnodeSystem.c +++ b/src/system/src/dnodeSystem.c @@ -44,7 +44,7 @@ void dnodeInitModules() { tsModule[TSDB_MOD_HTTP].cleanUpFp = httpCleanUpSystem; tsModule[TSDB_MOD_HTTP].startFp = httpStartSystem; tsModule[TSDB_MOD_HTTP].stopFp = httpStopSystem; - tsModule[TSDB_MOD_HTTP].num = tsEnableHttpModule ? -1 : 0; + tsModule[TSDB_MOD_HTTP].num = (tsEnableHttpModule == 1) ? -1 : 0; tsModule[TSDB_MOD_HTTP].curNum = 0; tsModule[TSDB_MOD_HTTP].equalVnodeNum = 0; @@ -53,7 +53,7 @@ void dnodeInitModules() { tsModule[TSDB_MOD_MONITOR].cleanUpFp = monitorCleanUpSystem; tsModule[TSDB_MOD_MONITOR].startFp = monitorStartSystem; tsModule[TSDB_MOD_MONITOR].stopFp = monitorStopSystem; - tsModule[TSDB_MOD_MONITOR].num = tsEnableMonitorModule ? -1 : 0; + tsModule[TSDB_MOD_MONITOR].num = (tsEnableMonitorModule == 1) ? -1 : 0; tsModule[TSDB_MOD_MONITOR].curNum = 0; tsModule[TSDB_MOD_MONITOR].equalVnodeNum = 0; } diff --git a/src/system/src/vnodeFile.c b/src/system/src/vnodeFile.c index 5fa3a4ad1fa738cb0becf25b5ae170c4eeb8bd94..98bc482d3a51fcbf31a437dd77f3995dede05ab5 100644 --- a/src/system/src/vnodeFile.c +++ b/src/system/src/vnodeFile.c @@ -408,7 +408,6 @@ void vnodeCloseCommitFiles(SVnodeObj *pVnode) { char dpath[TSDB_FILENAME_LEN] = "\0"; int fileId; int ret; - int file_removed = 0; close(pVnode->nfd); pVnode->nfd = 0; @@ -449,14 +448,15 @@ void vnodeCloseCommitFiles(SVnodeObj *pVnode) { dTrace("vid:%d, %s and %s is saved", pVnode->vnode, pVnode->cfn, pVnode->lfn); - if (pVnode->numOfFiles > pVnode->maxFiles) { - fileId = pVnode->fileId - pVnode->numOfFiles + 1; + // Retention policy here + fileId = pVnode->fileId - pVnode->numOfFiles + 1; + int cfile = taosGetTimestamp(pVnode->cfg.precision)/pVnode->cfg.daysPerFile/tsMsPerDay[pVnode->cfg.precision]; + while (fileId <= cfile - pVnode->maxFiles) { vnodeRemoveFile(pVnode->vnode, fileId); pVnode->numOfFiles--; - file_removed = 1; + fileId++; } - if (!file_removed) vnodeUpdateFileMagic(pVnode->vnode, pVnode->commitFileId); vnodeSaveAllMeterObjToFile(pVnode->vnode); return; diff --git a/src/system/src/vnodeImport.c b/src/system/src/vnodeImport.c index 1f25141e48fec9f75aba2da86cf30500a1d4ccef..a49c9a56194988cd106a5ec819ed92657ee7dd92 100644 --- a/src/system/src/vnodeImport.c +++ b/src/system/src/vnodeImport.c @@ -870,10 +870,13 @@ int vnodeImportPoints(SMeterObj *pObj, char *cont, int contLen, char source, voi } payload = pSubmit->payLoad; - if (pVnode->lastKeyOnFile > pVnode->cfg.daysToKeep * tsMsPerDay[pVnode->cfg.precision] + *((TSKEY *)(payload))) { - dError("vid:%d sid:%d id:%s, vnode lastKeyOnFile:%lld, data is too old to import, key:%lld", - pObj->vnode, pObj->sid, pObj->meterId, pVnode->lastKeyOnFile, *(TSKEY *)(payload)); - return TSDB_CODE_OTHERS; + int firstId = (*(TSKEY *)payload)/pVnode->cfg.daysPerFile/tsMsPerDay[pVnode->cfg.precision]; + int lastId = (*(TSKEY *)(payload+pObj->bytesPerPoint*(rows-1)))/pVnode->cfg.daysPerFile/tsMsPerDay[pVnode->cfg.precision]; + int cfile = taosGetTimestamp(pVnode->cfg.precision)/pVnode->cfg.daysPerFile/tsMsPerDay[pVnode->cfg.precision]; + if ((firstId <= cfile - pVnode->maxFiles) || (firstId > cfile + 1) || (lastId <= cfile - pVnode->maxFiles) || (lastId > cfile + 1)) { + dError("vid:%d sid:%d id:%s, invalid timestamp to import, firstKey: %ld lastKey: %ld", + pObj->vnode, pObj->sid, pObj->meterId, *(TSKEY *)(payload), *(TSKEY *)(payload+pObj->bytesPerPoint*(rows-1))); + return TSDB_CODE_TIMESTAMP_OUT_OF_RANGE; } if ( pVnode->cfg.commitLog && source != TSDB_DATA_SOURCE_LOG) {