diff --git a/src/system/src/vnodeImport.c b/src/system/src/vnodeImport.c index 34a7a3b7d0efd54c2ec0d1068c05b066dde7b3d1..b7a5a71961368ffce7b23556007de39bd299d65d 100644 --- a/src/system/src/vnodeImport.c +++ b/src/system/src/vnodeImport.c @@ -169,7 +169,6 @@ int vnodeProcessLastBlock(SImportInfo *pImport, SHeadInfo *pHinfo, SData *data[] vnodeReadLastBlockToMem(pObj, &lastBlock, data); pHinfo->compInfo.numOfBlocks--; code = lastBlock.numOfPoints; - pImport->slot--; } return code; @@ -237,12 +236,14 @@ int vnodeOpenFileForImport(SImportInfo *pImport, char *payload, SHeadInfo *pHinf } else if (pVnode->commitFileId == pImport->fileId) { int slots = pImport->pos ? pImport->slot + 1 : pImport->slot; + assert(slots >= 0); pHinfo->leftOffset += slots * sizeof(SCompBlock); // check if last block is at last file, if it is, read into memory if (pImport->pos == 0 && pHinfo->compInfo.numOfBlocks > 0 && pImport->slot == pHinfo->compInfo.numOfBlocks && pHinfo->compInfo.last) { rowsBefore = vnodeProcessLastBlock(pImport, pHinfo, data); + if ( rowsBefore > 0 ) pImport->slot--; } // this block will be replaced by new blocks @@ -787,6 +788,8 @@ int vnodeImportStartToFile(SImportInfo *pImport, char *payload, int rows) { code = vnodeFindKeyInFile(pImport, 1); if (code != 0) return code; + assert(pImport->slot >= 0); + if (pImport->key != pImport->firstKey) { pImport->payload = payload; pImport->rows = vnodeGetImportStartPart(pObj, payload, rows, pImport->key); @@ -845,7 +848,7 @@ int vnodeImportWholeToCache(SImportInfo *pImport, char *payload, int rows) { } int vnodeImportPoints(SMeterObj *pObj, char *cont, int contLen, char source, void *param, int sversion, - int *pNumOfPoints) { + int *pNumOfPoints, TSKEY now) { SSubmitMsg *pSubmit = (SSubmitMsg *)cont; SVnodeObj *pVnode = &vnodeList[pObj->vnode]; int rows; @@ -872,7 +875,7 @@ int vnodeImportPoints(SMeterObj *pObj, char *cont, int contLen, char source, voi payload = pSubmit->payLoad; int firstId = (*(TSKEY *)payload)/pVnode->cfg.daysPerFile/tsMsPerDay[pVnode->cfg.precision]; int lastId = (*(TSKEY *)(payload+pObj->bytesPerPoint*(rows-1)))/pVnode->cfg.daysPerFile/tsMsPerDay[pVnode->cfg.precision]; - int cfile = taosGetTimestamp(pVnode->cfg.precision)/pVnode->cfg.daysPerFile/tsMsPerDay[pVnode->cfg.precision]; + int cfile = now/pVnode->cfg.daysPerFile/tsMsPerDay[pVnode->cfg.precision]; if ((firstId <= cfile - pVnode->maxFiles) || (firstId > cfile + 1) || (lastId <= cfile - pVnode->maxFiles) || (lastId > cfile + 1)) { dError("vid:%d sid:%d id:%s, invalid timestamp to import, firstKey: %ld lastKey: %ld", pObj->vnode, pObj->sid, pObj->meterId, *(TSKEY *)(payload), *(TSKEY *)(payload+pObj->bytesPerPoint*(rows-1))); @@ -888,7 +891,7 @@ int vnodeImportPoints(SMeterObj *pObj, char *cont, int contLen, char source, voi if (*((TSKEY *)(pSubmit->payLoad + (rows - 1) * pObj->bytesPerPoint)) > pObj->lastKey) { vnodeClearMeterState(pObj, TSDB_METER_STATE_IMPORTING); vnodeSetMeterState(pObj, TSDB_METER_STATE_INSERT); - code = vnodeInsertPoints(pObj, cont, contLen, TSDB_DATA_SOURCE_LOG, NULL, pObj->sversion, &pointsImported); + code = vnodeInsertPoints(pObj, cont, contLen, TSDB_DATA_SOURCE_LOG, NULL, pObj->sversion, &pointsImported, now); if (pShell) { pShell->code = code;