提交 9ca3c242 编写于 作者: S slguan

fix the issue #201

上级 f192397c
...@@ -169,7 +169,6 @@ int vnodeProcessLastBlock(SImportInfo *pImport, SHeadInfo *pHinfo, SData *data[] ...@@ -169,7 +169,6 @@ int vnodeProcessLastBlock(SImportInfo *pImport, SHeadInfo *pHinfo, SData *data[]
vnodeReadLastBlockToMem(pObj, &lastBlock, data); vnodeReadLastBlockToMem(pObj, &lastBlock, data);
pHinfo->compInfo.numOfBlocks--; pHinfo->compInfo.numOfBlocks--;
code = lastBlock.numOfPoints; code = lastBlock.numOfPoints;
pImport->slot--;
} }
return code; return code;
...@@ -237,12 +236,14 @@ int vnodeOpenFileForImport(SImportInfo *pImport, char *payload, SHeadInfo *pHinf ...@@ -237,12 +236,14 @@ int vnodeOpenFileForImport(SImportInfo *pImport, char *payload, SHeadInfo *pHinf
} else if (pVnode->commitFileId == pImport->fileId) { } else if (pVnode->commitFileId == pImport->fileId) {
int slots = pImport->pos ? pImport->slot + 1 : pImport->slot; int slots = pImport->pos ? pImport->slot + 1 : pImport->slot;
assert(slots >= 0);
pHinfo->leftOffset += slots * sizeof(SCompBlock); pHinfo->leftOffset += slots * sizeof(SCompBlock);
// check if last block is at last file, if it is, read into memory // check if last block is at last file, if it is, read into memory
if (pImport->pos == 0 && pHinfo->compInfo.numOfBlocks > 0 && pImport->slot == pHinfo->compInfo.numOfBlocks && if (pImport->pos == 0 && pHinfo->compInfo.numOfBlocks > 0 && pImport->slot == pHinfo->compInfo.numOfBlocks &&
pHinfo->compInfo.last) { pHinfo->compInfo.last) {
rowsBefore = vnodeProcessLastBlock(pImport, pHinfo, data); rowsBefore = vnodeProcessLastBlock(pImport, pHinfo, data);
if ( rowsBefore > 0 ) pImport->slot--;
} }
// this block will be replaced by new blocks // this block will be replaced by new blocks
...@@ -787,6 +788,8 @@ int vnodeImportStartToFile(SImportInfo *pImport, char *payload, int rows) { ...@@ -787,6 +788,8 @@ int vnodeImportStartToFile(SImportInfo *pImport, char *payload, int rows) {
code = vnodeFindKeyInFile(pImport, 1); code = vnodeFindKeyInFile(pImport, 1);
if (code != 0) return code; if (code != 0) return code;
assert(pImport->slot >= 0);
if (pImport->key != pImport->firstKey) { if (pImport->key != pImport->firstKey) {
pImport->payload = payload; pImport->payload = payload;
pImport->rows = vnodeGetImportStartPart(pObj, payload, rows, pImport->key); pImport->rows = vnodeGetImportStartPart(pObj, payload, rows, pImport->key);
...@@ -845,7 +848,7 @@ int vnodeImportWholeToCache(SImportInfo *pImport, char *payload, int rows) { ...@@ -845,7 +848,7 @@ int vnodeImportWholeToCache(SImportInfo *pImport, char *payload, int rows) {
} }
int vnodeImportPoints(SMeterObj *pObj, char *cont, int contLen, char source, void *param, int sversion, int vnodeImportPoints(SMeterObj *pObj, char *cont, int contLen, char source, void *param, int sversion,
int *pNumOfPoints) { int *pNumOfPoints, TSKEY now) {
SSubmitMsg *pSubmit = (SSubmitMsg *)cont; SSubmitMsg *pSubmit = (SSubmitMsg *)cont;
SVnodeObj *pVnode = &vnodeList[pObj->vnode]; SVnodeObj *pVnode = &vnodeList[pObj->vnode];
int rows; int rows;
...@@ -872,7 +875,7 @@ int vnodeImportPoints(SMeterObj *pObj, char *cont, int contLen, char source, voi ...@@ -872,7 +875,7 @@ int vnodeImportPoints(SMeterObj *pObj, char *cont, int contLen, char source, voi
payload = pSubmit->payLoad; payload = pSubmit->payLoad;
int firstId = (*(TSKEY *)payload)/pVnode->cfg.daysPerFile/tsMsPerDay[pVnode->cfg.precision]; int firstId = (*(TSKEY *)payload)/pVnode->cfg.daysPerFile/tsMsPerDay[pVnode->cfg.precision];
int lastId = (*(TSKEY *)(payload+pObj->bytesPerPoint*(rows-1)))/pVnode->cfg.daysPerFile/tsMsPerDay[pVnode->cfg.precision]; int lastId = (*(TSKEY *)(payload+pObj->bytesPerPoint*(rows-1)))/pVnode->cfg.daysPerFile/tsMsPerDay[pVnode->cfg.precision];
int cfile = taosGetTimestamp(pVnode->cfg.precision)/pVnode->cfg.daysPerFile/tsMsPerDay[pVnode->cfg.precision]; int cfile = now/pVnode->cfg.daysPerFile/tsMsPerDay[pVnode->cfg.precision];
if ((firstId <= cfile - pVnode->maxFiles) || (firstId > cfile + 1) || (lastId <= cfile - pVnode->maxFiles) || (lastId > cfile + 1)) { if ((firstId <= cfile - pVnode->maxFiles) || (firstId > cfile + 1) || (lastId <= cfile - pVnode->maxFiles) || (lastId > cfile + 1)) {
dError("vid:%d sid:%d id:%s, invalid timestamp to import, firstKey: %ld lastKey: %ld", dError("vid:%d sid:%d id:%s, invalid timestamp to import, firstKey: %ld lastKey: %ld",
pObj->vnode, pObj->sid, pObj->meterId, *(TSKEY *)(payload), *(TSKEY *)(payload+pObj->bytesPerPoint*(rows-1))); pObj->vnode, pObj->sid, pObj->meterId, *(TSKEY *)(payload), *(TSKEY *)(payload+pObj->bytesPerPoint*(rows-1)));
...@@ -888,7 +891,7 @@ int vnodeImportPoints(SMeterObj *pObj, char *cont, int contLen, char source, voi ...@@ -888,7 +891,7 @@ int vnodeImportPoints(SMeterObj *pObj, char *cont, int contLen, char source, voi
if (*((TSKEY *)(pSubmit->payLoad + (rows - 1) * pObj->bytesPerPoint)) > pObj->lastKey) { if (*((TSKEY *)(pSubmit->payLoad + (rows - 1) * pObj->bytesPerPoint)) > pObj->lastKey) {
vnodeClearMeterState(pObj, TSDB_METER_STATE_IMPORTING); vnodeClearMeterState(pObj, TSDB_METER_STATE_IMPORTING);
vnodeSetMeterState(pObj, TSDB_METER_STATE_INSERT); vnodeSetMeterState(pObj, TSDB_METER_STATE_INSERT);
code = vnodeInsertPoints(pObj, cont, contLen, TSDB_DATA_SOURCE_LOG, NULL, pObj->sversion, &pointsImported); code = vnodeInsertPoints(pObj, cont, contLen, TSDB_DATA_SOURCE_LOG, NULL, pObj->sversion, &pointsImported, now);
if (pShell) { if (pShell) {
pShell->code = code; pShell->code = code;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册