diff --git a/src/system/detail/src/vnodeCache.c b/src/system/detail/src/vnodeCache.c index 8b51bc460923fa5dee6bad1c4d75e91789d59cec..ac7e19524aea72cfeb7f428fa44e48a80de8a8a3 100644 --- a/src/system/detail/src/vnodeCache.c +++ b/src/system/detail/src/vnodeCache.c @@ -372,13 +372,60 @@ void vnodeCancelCommit(SVnodeObj *pVnode) { taosTmrReset(vnodeProcessCommitTimer, pVnode->cfg.commitTime * 1000, pVnode, vnodeTmrCtrl, &pVnode->commitTimer); } +/* The vnode cache lock should be hold before calling this interface + */ +SCacheBlock *vnodeGetFreeCacheBlock(SVnodeObj *pVnode) { + SCachePool *pPool = (SCachePool *)(pVnode->pCachePool); + SVnodeCfg *pCfg = &(pVnode->cfg); + SCacheBlock *pCacheBlock = NULL; + int skipped = 0; + + while (1) { + pCacheBlock = (SCacheBlock *)(pPool->pMem[((int64_t)pPool->freeSlot)]); + if (pCacheBlock->blockId == 0) break; + + if (pCacheBlock->notFree) { + pPool->freeSlot++; + pPool->freeSlot = pPool->freeSlot % pCfg->cacheNumOfBlocks.totalBlocks; + skipped++; + if (skipped > pPool->threshold) { + vnodeCreateCommitThread(pVnode); + pthread_mutex_unlock(&pPool->vmutex); + dError("vid:%d committing process is too slow, notFreeSlots:%d....", pVnode->vnode, pPool->notFreeSlots); + return NULL; + } + } else { + SMeterObj * pRelObj = pCacheBlock->pMeterObj; + SCacheInfo *pRelInfo = (SCacheInfo *)pRelObj->pCache; + int firstSlot = (pRelInfo->currentSlot - pRelInfo->numOfBlocks + 1 + pRelInfo->maxBlocks) % pRelInfo->maxBlocks; + pCacheBlock = pRelInfo->cacheBlocks[firstSlot]; + if (pCacheBlock) { + pPool->freeSlot = pCacheBlock->index; + vnodeFreeCacheBlock(pCacheBlock); + break; + } else { + pPool->freeSlot = (pPool->freeSlot + 1) % pCfg->cacheNumOfBlocks.totalBlocks; + skipped++; + } + } + } + + pCacheBlock = (SCacheBlock *)(pPool->pMem[pPool->freeSlot]); + pCacheBlock->index = pPool->freeSlot; + pCacheBlock->notFree = 1; + pPool->freeSlot = (pPool->freeSlot + 1) % pCfg->cacheNumOfBlocks.totalBlocks; + pPool->notFreeSlots++; + + return pCacheBlock; +} + int vnodeAllocateCacheBlock(SMeterObj *pObj) { int index; SCachePool * pPool; SCacheBlock *pCacheBlock; SCacheInfo * pInfo; SVnodeObj * pVnode; - int skipped = 0, commit = 0; + int commit = 0; pVnode = vnodeList + pObj->vnode; pPool = (SCachePool *)pVnode->pCachePool; @@ -406,45 +453,10 @@ int vnodeAllocateCacheBlock(SMeterObj *pObj) { return -1; } - while (1) { - pCacheBlock = (SCacheBlock *)(pPool->pMem[((int64_t)pPool->freeSlot)]); - if (pCacheBlock->blockId == 0) break; - - if (pCacheBlock->notFree) { - pPool->freeSlot++; - pPool->freeSlot = pPool->freeSlot % pCfg->cacheNumOfBlocks.totalBlocks; - skipped++; - if (skipped > pPool->threshold) { - vnodeCreateCommitThread(pVnode); - pthread_mutex_unlock(&pPool->vmutex); - dError("vid:%d sid:%d id:%s, committing process is too slow, notFreeSlots:%d....", - pObj->vnode, pObj->sid, pObj->meterId, pPool->notFreeSlots); - return -1; - } - } else { - SMeterObj *pRelObj = pCacheBlock->pMeterObj; - SCacheInfo *pRelInfo = (SCacheInfo *)pRelObj->pCache; - int firstSlot = (pRelInfo->currentSlot - pRelInfo->numOfBlocks + 1 + pRelInfo->maxBlocks) % pRelInfo->maxBlocks; - pCacheBlock = pRelInfo->cacheBlocks[firstSlot]; - if (pCacheBlock) { - pPool->freeSlot = pCacheBlock->index; - vnodeFreeCacheBlock(pCacheBlock); - break; - } else { - pPool->freeSlot = (pPool->freeSlot + 1) % pCfg->cacheNumOfBlocks.totalBlocks; - skipped++; - } - } - } - - index = pPool->freeSlot; - pPool->freeSlot++; - pPool->freeSlot = pPool->freeSlot % pCfg->cacheNumOfBlocks.totalBlocks; - pPool->notFreeSlots++; + if 
((pCacheBlock = vnodeGetFreeCacheBlock(pVnode)) == NULL) return -1; + index = pCacheBlock->index; pCacheBlock->pMeterObj = pObj; - pCacheBlock->notFree = 1; - pCacheBlock->index = index; pCacheBlock->offset[0] = ((char *)(pCacheBlock)) + sizeof(SCacheBlock) + pObj->numOfColumns * sizeof(char *); for (int col = 1; col < pObj->numOfColumns; ++col) diff --git a/src/system/detail/src/vnodeFile.c b/src/system/detail/src/vnodeFile.c index df94c883ace04237577bb905fe0b2b758fd7f2ab..f95ef0176534321c33cd2768ba806963e4c14f1b 100644 --- a/src/system/detail/src/vnodeFile.c +++ b/src/system/detail/src/vnodeFile.c @@ -103,8 +103,8 @@ void vnodeGetDnameFromLname(char *lhead, char *ldata, char *llast, char *dhead, } void vnodeGetHeadTname(char *nHeadName, char *nLastName, int vnode, int fileId) { - sprintf(nHeadName, "%s/vnode%d/db/v%df%d.t", tsDirectory, vnode, vnode, fileId); - sprintf(nLastName, "%s/vnode%d/db/v%df%d.l", tsDirectory, vnode, vnode, fileId); + if (nHeadName != NULL) sprintf(nHeadName, "%s/vnode%d/db/v%df%d.t", tsDirectory, vnode, vnode, fileId); + if (nLastName != NULL) sprintf(nLastName, "%s/vnode%d/db/v%df%d.l", tsDirectory, vnode, vnode, fileId); } void vnodeCreateDataDirIfNeeded(int vnode, char *path) { diff --git a/src/system/detail/src/vnodeImport.c b/src/system/detail/src/vnodeImport.c index f50b6f49461def53da870f448e6d4aabd213ed95..96aeb99e20fca4c702e59ce078c23acd03a2b0bc 100644 --- a/src/system/detail/src/vnodeImport.c +++ b/src/system/detail/src/vnodeImport.c @@ -15,31 +15,24 @@ #define _DEFAULT_SOURCE #include +#include +#include #include -#include #include -#include "trpc.h" -#include "ttimer.h" #include "vnode.h" -#include "vnodeMgmt.h" -#include "vnodeShell.h" -#include "vnodeShell.h" #include "vnodeUtil.h" -#pragma GCC diagnostic ignored "-Wpointer-sign" -#pragma GCC diagnostic ignored "-Wint-conversion" -typedef struct { - SCompHeader *headList; - SCompInfo compInfo; - int last; // 0:last block in data file, 1:not the last block - int newBlocks; - int oldNumOfBlocks; - int64_t compInfoOffset; // offset for compInfo in head file - int64_t leftOffset; // copy from this offset to end of head file - int64_t hfdSize; // old head file size -} SHeadInfo; +extern void vnodeGetHeadTname(char *nHeadName, char *nLastName, int vnode, int fileId); +extern int vnodeReadColumnToMem(int fd, SCompBlock *pBlock, SField **fields, int col, char *data, int dataSize, + char *temp, char *buffer, int bufferSize); +extern int vnodeSendShellSubmitRspMsg(SShellObj *pObj, int code, int numOfPoints); +extern void vnodeGetHeadDataLname(char *headName, char *dataName, char *lastName, int vnode, int fileId); +extern int vnodeCreateEmptyCompFile(int vnode, int fileId); +extern int vnodeUpdateFreeSlot(SVnodeObj *pVnode); +extern SCacheBlock *vnodeGetFreeCacheBlock(SVnodeObj *pVnode); +#define KEY_AT_INDEX(payload, step, idx) (*(TSKEY *)((char *)(payload) + (step) * (idx))) typedef struct { void * signature; SShellObj *pShell; @@ -56,225 +49,112 @@ typedef struct { // only for file int numOfPoints; - int fileId; int64_t offset; // offset in data file - SData *sdata[TSDB_MAX_COLUMNS]; - char *buffer; - char *payload; - char *opayload; + char * payload; + char * opayload; // allocated space for payload from client int rows; } SImportInfo; -int vnodeImportData(SMeterObj *pObj, SImportInfo *pImport); - -int vnodeGetImportStartPart(SMeterObj *pObj, char *payload, int rows, TSKEY key1) { - int i; - - for (i = 0; i < rows; ++i) { - TSKEY key = *((TSKEY *)(payload + i * pObj->bytesPerPoint)); - if (key >= key1) 
break; - } - - return i; -} - -int vnodeGetImportEndPart(SMeterObj *pObj, char *payload, int rows, char **pStart, TSKEY key0) { - int i; - - for (i = 0; i < rows; ++i) { - TSKEY key = *((TSKEY *)(payload + i * pObj->bytesPerPoint)); - if (key > key0) break; - } - - *pStart = payload + i * pObj->bytesPerPoint; - return rows - i; -} - -int vnodeCloseFileForImport(SMeterObj *pObj, SHeadInfo *pHinfo) { - SVnodeObj *pVnode = &vnodeList[pObj->vnode]; - SVnodeCfg *pCfg = &pVnode->cfg; - TSCKSUM chksum = 0; - - if (pHinfo->newBlocks == 0 || pHinfo->compInfoOffset == 0) return 0; +typedef struct { + // in .head file + SCompHeader *pHeader; + size_t pHeaderSize; - if (pHinfo->oldNumOfBlocks == 0) twrite(pVnode->nfd, &chksum, sizeof(TSCKSUM)); + SCompInfo compInfo; + SCompBlock *pBlocks; + // in .data file + int blockId; + uint8_t blockLoadState; - int leftSize = pHinfo->hfdSize - pHinfo->leftOffset; - if (leftSize > 0) { - lseek(pVnode->hfd, pHinfo->leftOffset, SEEK_SET); - tsendfile(pVnode->nfd, pVnode->hfd, NULL, leftSize); - } + SField *pField; + size_t pFieldSize; - pHinfo->compInfo.numOfBlocks += pHinfo->newBlocks; - int offset = (pHinfo->compInfo.numOfBlocks - pHinfo->oldNumOfBlocks) * sizeof(SCompBlock); - if (pHinfo->oldNumOfBlocks == 0) offset += sizeof(SCompInfo) + sizeof(TSCKSUM); + SData *data[TSDB_MAX_COLUMNS]; + char * buffer; - pHinfo->headList[pObj->sid].compInfoOffset = pHinfo->compInfoOffset; - for (int sid = pObj->sid + 1; sid < pCfg->maxSessions; ++sid) { - if (pHinfo->headList[sid].compInfoOffset) pHinfo->headList[sid].compInfoOffset += offset; - } + char *temp; - lseek(pVnode->nfd, TSDB_FILE_HEADER_LEN, SEEK_SET); - int tmsize = sizeof(SCompHeader) * pCfg->maxSessions + sizeof(TSCKSUM); - taosCalcChecksumAppend(0, (uint8_t *)pHinfo->headList, tmsize); - twrite(pVnode->nfd, pHinfo->headList, tmsize); + char * tempBuffer; + size_t tempBufferSize; + // Variables for sendfile + int64_t compInfoOffset; + int64_t nextNo0Offset; // next sid whose compInfoOffset > 0 + int64_t hfSize; + int64_t driftOffset; - int size = pHinfo->compInfo.numOfBlocks * sizeof(SCompBlock); - char *buffer = malloc(size); - lseek(pVnode->nfd, pHinfo->compInfoOffset + sizeof(SCompInfo), SEEK_SET); - read(pVnode->nfd, buffer, size); - SCompBlock *pBlock = (SCompBlock *)(buffer + (pHinfo->compInfo.numOfBlocks - 1) * sizeof(SCompBlock)); + int oldNumOfBlocks; + int newNumOfBlocks; + int last; +} SImportHandle; - pHinfo->compInfo.uid = pObj->uid; - pHinfo->compInfo.delimiter = TSDB_VNODE_DELIMITER; - pHinfo->compInfo.last = pBlock->last; +typedef struct { + int slot; + int pos; + int oslot; // old slot + TSKEY nextKey; +} SBlockIter; - taosCalcChecksumAppend(0, (uint8_t *)(&pHinfo->compInfo), sizeof(SCompInfo)); - lseek(pVnode->nfd, pHinfo->compInfoOffset, SEEK_SET); - twrite(pVnode->nfd, &pHinfo->compInfo, sizeof(SCompInfo)); +typedef struct { + int64_t spos; + int64_t epos; + int64_t totalRows; + char * offset[]; +} SMergeBuffer; - chksum = taosCalcChecksum(0, (uint8_t *)buffer, size); - lseek(pVnode->nfd, pHinfo->compInfoOffset + sizeof(SCompInfo) + size, SEEK_SET); - twrite(pVnode->nfd, &chksum, sizeof(TSCKSUM)); - free(buffer); +int vnodeImportData(SMeterObj *pObj, SImportInfo *pImport); - vnodeCloseCommitFiles(pVnode); +int vnodeFindKeyInCache(SImportInfo *pImport, int order) { + SMeterObj * pObj = pImport->pObj; + int code = 0; + SQuery query; + SCacheInfo *pInfo = (SCacheInfo *)pObj->pCache; - return 0; -} + TSKEY key = order ? 
pImport->firstKey : pImport->lastKey; + memset(&query, 0, sizeof(query)); + query.order.order = order; + query.skey = key; + query.ekey = order ? pImport->lastKey : pImport->firstKey; + vnodeSearchPointInCache(pObj, &query); -int vnodeProcessLastBlock(SImportInfo *pImport, SHeadInfo *pHinfo, SData *data[]) { - SMeterObj *pObj = pImport->pObj; - SVnodeObj *pVnode = &vnodeList[pObj->vnode]; - SCompBlock lastBlock; - int code = 0; - - if (pHinfo->compInfo.last == 0) return 0; - - // read into memory - uint64_t offset = - pHinfo->compInfoOffset + (pHinfo->compInfo.numOfBlocks - 1) * sizeof(SCompBlock) + sizeof(SCompInfo); - lseek(pVnode->hfd, offset, SEEK_SET); - read(pVnode->hfd, &lastBlock, sizeof(SCompBlock)); - assert(lastBlock.last); - - if (lastBlock.sversion != pObj->sversion) { - lseek(pVnode->lfd, lastBlock.offset, SEEK_SET); - lastBlock.offset = lseek(pVnode->dfd, 0, SEEK_END); - tsendfile(pVnode->dfd, pVnode->lfd, NULL, lastBlock.len); - - lastBlock.last = 0; - lseek(pVnode->hfd, offset, SEEK_SET); - twrite(pVnode->hfd, &lastBlock, sizeof(SCompBlock)); + if (query.slot < 0) { + pImport->slot = pInfo->commitSlot; + if (pInfo->commitPoint >= pObj->pointsPerBlock) pImport->slot = (pImport->slot + 1) % pInfo->maxBlocks; + pImport->pos = 0; + pImport->key = 0; + dTrace("vid:%d sid:%d id:%s, key:%ld, import to head of cache", pObj->vnode, pObj->sid, pObj->meterId, key); + code = 0; } else { - vnodeReadLastBlockToMem(pObj, &lastBlock, data); - pHinfo->compInfo.numOfBlocks--; - code = lastBlock.numOfPoints; - } - - return code; -} - -int vnodeOpenFileForImport(SImportInfo *pImport, char *payload, SHeadInfo *pHinfo, SData *data[]) { - SMeterObj *pObj = pImport->pObj; - SVnodeObj *pVnode = &vnodeList[pObj->vnode]; - SVnodeCfg *pCfg = &pVnode->cfg; - TSKEY firstKey = *((TSKEY *)payload); - struct stat filestat; - int sid, rowsBefore = 0; - - if (pVnode->nfd <= 0 || firstKey > pVnode->commitLastKey) { - if (pVnode->nfd > 0) vnodeCloseFileForImport(pObj, pHinfo); - - pVnode->commitFirstKey = firstKey; - if (vnodeOpenCommitFiles(pVnode, pObj->sid) < 0) return -1; - - fstat(pVnode->hfd, &filestat); - pHinfo->hfdSize = filestat.st_size; - pHinfo->newBlocks = 0; - pHinfo->last = 1; // by default, new blockes are at the end of block list - - lseek(pVnode->hfd, TSDB_FILE_HEADER_LEN, SEEK_SET); - read(pVnode->hfd, pHinfo->headList, sizeof(SCompHeader) * pCfg->maxSessions); - - if (pHinfo->headList[pObj->sid].compInfoOffset > 0) { - lseek(pVnode->hfd, pHinfo->headList[pObj->sid].compInfoOffset, SEEK_SET); - if (read(pVnode->hfd, &pHinfo->compInfo, sizeof(SCompInfo)) != sizeof(SCompInfo)) { - dError("vid:%d sid:%d, failed to read compInfo from file:%s", pObj->vnode, pObj->sid, pVnode->cfn); - return -1; - } + pImport->slot = query.slot; + pImport->pos = query.pos; + pImport->key = query.key; - if (pHinfo->compInfo.uid == pObj->uid) { - pHinfo->compInfoOffset = pHinfo->headList[pObj->sid].compInfoOffset; - pHinfo->leftOffset = pHinfo->headList[pObj->sid].compInfoOffset + sizeof(SCompInfo); + if (key != query.key) { + if (order == 0) { + // since pos is the position which has smaller key, data shall be imported after it + pImport->pos++; + if (pImport->pos >= pObj->pointsPerBlock) { + pImport->slot = (pImport->slot + 1) % pInfo->maxBlocks; + pImport->pos = 0; + } } else { - pHinfo->headList[pObj->sid].compInfoOffset = 0; - } - } - - if ( pHinfo->headList[pObj->sid].compInfoOffset == 0 ) { - memset(&pHinfo->compInfo, 0, sizeof(SCompInfo)); - pHinfo->compInfo.uid = pObj->uid; - - for (sid = pObj->sid + 1; 
sid < pCfg->maxSessions; ++sid) - if (pHinfo->headList[sid].compInfoOffset > 0) break; - - pHinfo->compInfoOffset = (sid == pCfg->maxSessions) ? pHinfo->hfdSize : pHinfo->headList[sid].compInfoOffset; - pHinfo->leftOffset = pHinfo->compInfoOffset; - } - - pHinfo->oldNumOfBlocks = pHinfo->compInfo.numOfBlocks; - lseek(pVnode->hfd, 0, SEEK_SET); - lseek(pVnode->nfd, 0, SEEK_SET); - tsendfile(pVnode->nfd, pVnode->hfd, NULL, pHinfo->compInfoOffset); - twrite(pVnode->nfd, &pHinfo->compInfo, sizeof(SCompInfo)); - if (pHinfo->headList[pObj->sid].compInfoOffset > 0) lseek(pVnode->hfd, sizeof(SCompInfo), SEEK_CUR); - - if (pVnode->commitFileId < pImport->fileId) { - if (pHinfo->compInfo.numOfBlocks > 0) - pHinfo->leftOffset += pHinfo->compInfo.numOfBlocks * sizeof(SCompBlock); - - rowsBefore = vnodeProcessLastBlock(pImport, pHinfo, data); - - // copy all existing compBlockInfo - lseek(pVnode->hfd, pHinfo->compInfoOffset + sizeof(SCompInfo), SEEK_SET); - if (pHinfo->compInfo.numOfBlocks > 0) - tsendfile(pVnode->nfd, pVnode->hfd, NULL, pHinfo->compInfo.numOfBlocks * sizeof(SCompBlock)); - - } else if (pVnode->commitFileId == pImport->fileId) { - int slots = pImport->pos ? pImport->slot + 1 : pImport->slot; - pHinfo->leftOffset += slots * sizeof(SCompBlock); - - // check if last block is at last file, if it is, read into memory - if (pImport->pos == 0 && pHinfo->compInfo.numOfBlocks > 0 && pImport->slot == pHinfo->compInfo.numOfBlocks && - pHinfo->compInfo.last) { - rowsBefore = vnodeProcessLastBlock(pImport, pHinfo, data); - if ( rowsBefore > 0 ) pImport->slot--; - } - - // this block will be replaced by new blocks - if (pImport->pos > 0) pHinfo->compInfo.numOfBlocks--; - - if (pImport->slot > 0) { - lseek(pVnode->hfd, pHinfo->compInfoOffset + sizeof(SCompInfo), SEEK_SET); - tsendfile(pVnode->nfd, pVnode->hfd, NULL, pImport->slot * sizeof(SCompBlock)); + if (pImport->pos < 0) pImport->pos = 0; } - - if (pImport->slot < pHinfo->compInfo.numOfBlocks) - pHinfo->last = 0; // new blocks are not at the end of block list - - } else { - // nothing - - pHinfo->last = 0; // new blocks are not at the end of block list } + code = 0; } - return rowsBefore; + return code; } -extern int vnodeSendShellSubmitRspMsg(SShellObj *pObj, int code, int numOfPoints); -int vnodeImportToFile(SImportInfo *pImport); +void vnodeGetValidDataRange(int vnode, TSKEY now, TSKEY *minKey, TSKEY *maxKey) { + SVnodeObj *pVnode = vnodeList + vnode; + + int64_t delta = pVnode->cfg.daysPerFile * tsMsPerDay[pVnode->cfg.precision]; + int fid = now / delta; + *minKey = (fid - pVnode->maxFiles + 1) * delta; + *maxKey = (fid + 2) * delta - 1; + return; +} void vnodeProcessImportTimer(void *param, void *tmrId) { SImportInfo *pImport = (SImportInfo *)param; @@ -283,18 +163,18 @@ void vnodeProcessImportTimer(void *param, void *tmrId) { return; } - SMeterObj *pObj = pImport->pObj; - SVnodeObj *pVnode = &vnodeList[pObj->vnode]; + SMeterObj * pObj = pImport->pObj; + SVnodeObj * pVnode = &vnodeList[pObj->vnode]; SCachePool *pPool = (SCachePool *)pVnode->pCachePool; - SShellObj *pShell = pImport->pShell; + SShellObj * pShell = pImport->pShell; pImport->retry++; - //slow query will block the import operation + // slow query will block the import operation int32_t state = vnodeSetMeterState(pObj, TSDB_METER_STATE_IMPORTING); if (state >= TSDB_METER_STATE_DELETING) { - dError("vid:%d sid:%d id:%s, meter is deleted, failed to import, state:%d", - pObj->vnode, pObj->sid, pObj->meterId, state); + dError("vid:%d sid:%d id:%s, meter is deleted, failed to import, 
state:%d", pObj->vnode, pObj->sid, pObj->meterId, + state); return; } @@ -303,7 +183,7 @@ void vnodeProcessImportTimer(void *param, void *tmrId) { num = pObj->numOfQueries; pthread_mutex_unlock(&pVnode->vmutex); - //if the num == 0, it will never be increased before state is set to TSDB_METER_STATE_READY + // if the num == 0, it will never be increased before state is set to TSDB_METER_STATE_READY int32_t commitInProcess = 0; pthread_mutex_lock(&pPool->vmutex); if (((commitInProcess = pPool->commitInProcess) == 1) || num > 0 || state != TSDB_METER_STATE_READY) { @@ -311,9 +191,10 @@ void vnodeProcessImportTimer(void *param, void *tmrId) { vnodeClearMeterState(pObj, TSDB_METER_STATE_IMPORTING); if (pImport->retry < 1000) { - dTrace("vid:%d sid:%d id:%s, import failed, retry later. commit in process or queries on it, or not ready." - "commitInProcess:%d, numOfQueries:%d, state:%d", pObj->vnode, pObj->sid, pObj->meterId, - commitInProcess, num, state); + dTrace( + "vid:%d sid:%d id:%s, import failed, retry later. commit in process or queries on it, or not ready." + "commitInProcess:%d, numOfQueries:%d, state:%d", + pObj->vnode, pObj->sid, pObj->meterId, commitInProcess, num, state); taosTmrStart(vnodeProcessImportTimer, 10, pImport, vnodeTmrCtrl); return; @@ -345,646 +226,1430 @@ void vnodeProcessImportTimer(void *param, void *tmrId) { free(pImport); } -int vnodeImportToFile(SImportInfo *pImport) { - SMeterObj *pObj = pImport->pObj; - SVnodeObj *pVnode = &vnodeList[pObj->vnode]; - SVnodeCfg *pCfg = &pVnode->cfg; - SHeadInfo headInfo; - int code = 0, col; - SCompBlock compBlock; - char * payload = pImport->payload; - int rows = pImport->rows; - SCachePool *pPool = (SCachePool *)pVnode->pCachePool; +int vnodeImportPoints(SMeterObj *pObj, char *cont, int contLen, char source, void *param, int sversion, + int *pNumOfPoints, TSKEY now) { + SSubmitMsg *pSubmit = (SSubmitMsg *)cont; + SVnodeObj * pVnode = vnodeList + pObj->vnode; + int rows; + char * payload; + int code = TSDB_CODE_ACTION_IN_PROGRESS; + SCachePool *pPool = (SCachePool *)(pVnode->pCachePool); + SShellObj * pShell = (SShellObj *)param; + int pointsImported = 0; + TSKEY minKey, maxKey; - TSKEY lastKey = *((TSKEY *)(payload + pObj->bytesPerPoint * (rows - 1))); - TSKEY firstKey = *((TSKEY *)payload); - memset(&headInfo, 0, sizeof(headInfo)); - headInfo.headList = malloc(sizeof(SCompHeader) * pCfg->maxSessions + sizeof(TSCKSUM)); + rows = htons(pSubmit->numOfRows); + int expectedLen = rows * pObj->bytesPerPoint + sizeof(pSubmit->numOfRows); + if (expectedLen != contLen) { + dError("vid:%d sid:%d id:%s, invalid import, expected:%d, contLen:%d", pObj->vnode, pObj->sid, pObj->meterId, + expectedLen, contLen); + return TSDB_CODE_WRONG_MSG_SIZE; + } - SData *cdata[TSDB_MAX_COLUMNS]; - char *buffer1 = - malloc(pObj->bytesPerPoint * pCfg->rowsInFileBlock + (sizeof(SData) + EXTRA_BYTES) * pObj->numOfColumns); - cdata[0] = (SData *)buffer1; + // FIXME: check sversion here should not be here (Take import convert to insert case into consideration) + if (sversion != pObj->sversion) { + dError("vid:%d sid:%d id:%s, invalid sversion, expected:%d received:%d", pObj->vnode, pObj->sid, pObj->meterId, + pObj->sversion, sversion); + return TSDB_CODE_OTHERS; + } - SData *data[TSDB_MAX_COLUMNS]; - char *buffer2 = - malloc(pObj->bytesPerPoint * pCfg->rowsInFileBlock + (sizeof(SData) + EXTRA_BYTES) * pObj->numOfColumns); - data[0] = (SData *)buffer2; + // Check timestamp context. 
+ payload = pSubmit->payLoad; + TSKEY firstKey = KEY_AT_INDEX(payload, pObj->bytesPerPoint, 0); + TSKEY lastKey = KEY_AT_INDEX(payload, pObj->bytesPerPoint, rows - 1); + assert(firstKey <= lastKey); + vnodeGetValidDataRange(pObj->vnode, now, &minKey, &maxKey); + if (firstKey < minKey || firstKey > maxKey || lastKey < minKey || lastKey > maxKey) { + dError( + "vid:%d sid:%d id:%s, invalid timestamp to import, rows:%d firstKey: %ld lastKey: %ld minAllowedKey:%ld " + "maxAllowedKey:%ld", + pObj->vnode, pObj->sid, pObj->meterId, rows, firstKey, lastKey, minKey, maxKey); + return TSDB_CODE_TIMESTAMP_OUT_OF_RANGE; + } - for (col = 1; col < pObj->numOfColumns; ++col) { - cdata[col] = (SData *)(((char *)cdata[col - 1]) + sizeof(SData) + EXTRA_BYTES + - pObj->pointsPerFileBlock * pObj->schema[col - 1].bytes); - data[col] = (SData *)(((char *)data[col - 1]) + sizeof(SData) + EXTRA_BYTES + - pObj->pointsPerFileBlock * pObj->schema[col - 1].bytes); + // FIXME: Commit log here is invalid (Take retry into consideration) + if (pVnode->cfg.commitLog && source != TSDB_DATA_SOURCE_LOG) { + if (pVnode->logFd < 0) return TSDB_CODE_INVALID_COMMIT_LOG; + code = vnodeWriteToCommitLog(pObj, TSDB_ACTION_IMPORT, cont, contLen, sversion); + if (code != 0) return code; } - int rowsBefore = 0; - int rowsRead = 0; - int rowsUnread = 0; - int leftRows = rows; // left number of rows of imported data - int row, rowsToWrite; - int64_t offset[TSDB_MAX_COLUMNS]; + if (firstKey > pObj->lastKey) { // Just call insert + vnodeClearMeterState(pObj, TSDB_METER_STATE_IMPORTING); + vnodeSetMeterState(pObj, TSDB_METER_STATE_INSERT); + code = vnodeInsertPoints(pObj, cont, contLen, TSDB_DATA_SOURCE_LOG, NULL, pObj->sversion, &pointsImported, now); - if (pImport->pos > 0) { - for (col = 0; col < pObj->numOfColumns; ++col) - memcpy(data[col]->data, pImport->sdata[col]->data, pImport->pos * pObj->schema[col].bytes); + if (pShell) { + pShell->code = code; + pShell->numOfTotalPoints += pointsImported; + } - rowsBefore = pImport->pos; - rowsRead = pImport->pos; - rowsUnread = pImport->numOfPoints - pImport->pos; - } + vnodeClearMeterState(pObj, TSDB_METER_STATE_INSERT); + } else { // trigger import + SImportInfo *pNew, import; - dTrace("vid:%d sid:%d id:%s, %d rows data will be imported to file, firstKey:%ld lastKey:%ld", - pObj->vnode, pObj->sid, pObj->meterId, rows, firstKey, lastKey); - do { - if (leftRows > 0) { - code = vnodeOpenFileForImport(pImport, payload, &headInfo, data); - if (code < 0) goto _exit; - if (code > 0) { - rowsBefore = code; - code = 0; - }; - } else { - // if payload is already imported, rows unread shall still be processed - rowsBefore = 0; - } + dTrace("vid:%d sid:%d id:%s, try to import %d rows data, firstKey:%ld, lastKey:%ld, object lastKey:%ld", + pObj->vnode, pObj->sid, pObj->meterId, rows, firstKey, lastKey, pObj->lastKey); + memset(&import, 0, sizeof(import)); + import.firstKey = firstKey; + import.lastKey = lastKey; + import.pObj = pObj; + import.pShell = pShell; + import.payload = payload; + import.rows = rows; - int rowsToProcess = pObj->pointsPerFileBlock - rowsBefore; - if (rowsToProcess > leftRows) rowsToProcess = leftRows; + // FIXME: mutex here seems meaningless and num here still can + // be changed + int32_t num = 0; + pthread_mutex_lock(&pVnode->vmutex); + num = pObj->numOfQueries; + pthread_mutex_unlock(&pVnode->vmutex); - for (col = 0; col < pObj->numOfColumns; ++col) { - offset[col] = data[col]->data + rowsBefore * pObj->schema[col].bytes; - } + int32_t commitInProcess = 0; - row = 0; - if 
(leftRows > 0) { - for (row = 0; row < rowsToProcess; ++row) { - if (*((TSKEY *)payload) > pVnode->commitLastKey) break; + pthread_mutex_lock(&pPool->vmutex); + if (((commitInProcess = pPool->commitInProcess) == 1) || + num > 0) { // mutual exclusion with read (need to change here) + pthread_mutex_unlock(&pPool->vmutex); - for (col = 0; col < pObj->numOfColumns; ++col) { - memcpy((void *)offset[col], payload, pObj->schema[col].bytes); - payload += pObj->schema[col].bytes; - offset[col] += pObj->schema[col].bytes; - } + pNew = (SImportInfo *)malloc(sizeof(SImportInfo)); + memcpy(pNew, &import, sizeof(SImportInfo)); + pNew->signature = pNew; + int payloadLen = contLen - sizeof(SSubmitMsg); + pNew->payload = malloc(payloadLen); + pNew->opayload = pNew->payload; + memcpy(pNew->payload, payload, payloadLen); + + dTrace("vid:%d sid:%d id:%s, import later, commit in process:%d, numOfQueries:%d", pObj->vnode, pObj->sid, + pObj->meterId, commitInProcess, pObj->numOfQueries); + + taosTmrStart(vnodeProcessImportTimer, 10, pNew, vnodeTmrCtrl); + return 0; + } else { + pPool->commitInProcess = 1; + pthread_mutex_unlock(&pPool->vmutex); + int code = vnodeImportData(pObj, &import); + if (pShell) { + pShell->code = code; + pShell->numOfTotalPoints += import.importedRows; } } + } - leftRows -= row; - rowsToWrite = rowsBefore + row; - rowsBefore = 0; + // How about the retry? Will this also cause vnode version++? + pVnode->version++; - if (leftRows == 0 && rowsUnread > 0) { - // copy the unread - int rowsToCopy = pObj->pointsPerFileBlock - rowsToWrite; - if (rowsToCopy > rowsUnread) rowsToCopy = rowsUnread; + if (pShell) { + pShell->count--; + if (pShell->count <= 0) vnodeSendShellSubmitRspMsg(pShell, pShell->code, pShell->numOfTotalPoints); + } - for (col = 0; col < pObj->numOfColumns; ++col) { - int bytes = pObj->schema[col].bytes; - memcpy(data[col]->data + rowsToWrite * bytes, pImport->sdata[col]->data + rowsRead * bytes, rowsToCopy * bytes); - } + return 0; +} - rowsRead += rowsToCopy; - rowsUnread -= rowsToCopy; - rowsToWrite += rowsToCopy; - } +/* Function to search keys in a range + * + * Assumption: keys in payload are in ascending order + * + * @payload: data records, key in ascending order + * @step: bytes each record takes + * @rows: number of data records + * @skey: range start (included) + * @ekey: range end (included) + * @srows: rtype, start index of records + * @nrows: rtype, number of records in range + * + * @rtype: 0 means find data in the range + * -1 means find no data in the range + */ +static int vnodeSearchKeyInRange(char *payload, int step, int rows, TSKEY skey, TSKEY ekey, int *srow, int *nrows) { + if (rows <= 0 || KEY_AT_INDEX(payload, step, 0) > ekey || KEY_AT_INDEX(payload, step, rows - 1) < skey || skey > ekey) + return -1; - for (col = 0; col < pObj->numOfColumns; ++col) { - data[col]->len = rowsToWrite * pObj->schema[col].bytes; + int left = 0; + int right = rows - 1; + int mid; + + // Binary search the first key in payload >= skey + do { + mid = (left + right) / 2; + if (skey < KEY_AT_INDEX(payload, step, mid)) { + right = mid; + } else if (skey > KEY_AT_INDEX(payload, step, mid)) { + left = mid + 1; + } else { + break; } + } while (left < right); - compBlock.last = headInfo.last; - vnodeWriteBlockToFile(pObj, &compBlock, data, cdata, rowsToWrite); - twrite(pVnode->nfd, &compBlock, sizeof(SCompBlock)); + if (skey <= KEY_AT_INDEX(payload, step, mid)) { + *srow = mid; + } else { + if (mid + 1 >= rows) { + return -1; + } else { + *srow = mid + 1; + } + } - rowsToWrite = 0; - 
headInfo.newBlocks++; + assert(skey <= KEY_AT_INDEX(payload, step, *srow)); - } while (leftRows > 0 || rowsUnread > 0); + *nrows = 0; + for (int i = *srow; i < rows; i++) { + if (KEY_AT_INDEX(payload, step, i) <= ekey) { + (*nrows)++; + } else { + break; + } + } - if (compBlock.keyLast > pObj->lastKeyOnFile) - pObj->lastKeyOnFile = compBlock.keyLast; + if (*nrows == 0) return -1; - vnodeCloseFileForImport(pObj, &headInfo); - dTrace("vid:%d sid:%d id:%s, %d rows data are imported to file", pObj->vnode, pObj->sid, pObj->meterId, rows); + return 0; +} - SCacheInfo *pInfo = (SCacheInfo *)pObj->pCache; - pthread_mutex_lock(&pPool->vmutex); +int vnodeOpenMinFilesForImport(int vnode, int fid) { + char dname[TSDB_FILENAME_LEN] = "\0"; + SVnodeObj * pVnode = vnodeList + vnode; + struct stat filestat; + int minFileSize; - if (pInfo->numOfBlocks > 0) { - int slot = (pInfo->currentSlot - pInfo->numOfBlocks + 1 + pInfo->maxBlocks) % pInfo->maxBlocks; - TSKEY firstKeyInCache = *((TSKEY *)(pInfo->cacheBlocks[slot]->offset[0])); + minFileSize = TSDB_FILE_HEADER_LEN + sizeof(SCompHeader) * pVnode->cfg.maxSessions + sizeof(TSCKSUM); - // data may be in commited cache, cache shall be released - if (lastKey > firstKeyInCache) { - while (slot != pInfo->commitSlot) { - SCacheBlock *pCacheBlock = pInfo->cacheBlocks[slot]; - vnodeFreeCacheBlock(pCacheBlock); - slot = (slot + 1 + pInfo->maxBlocks) % pInfo->maxBlocks; - } + vnodeGetHeadDataLname(pVnode->cfn, dname, pVnode->lfn, vnode, fid); - // last slot, the uncommitted slots shall be shifted - SCacheBlock *pCacheBlock = pInfo->cacheBlocks[slot]; - int points = pCacheBlock->numOfPoints - pInfo->commitPoint; - if (points > 0) { - for (int col = 0; col < pObj->numOfColumns; ++col) { - int size = points * pObj->schema[col].bytes; - memmove(pCacheBlock->offset[col], pCacheBlock->offset[col] + pObj->schema[col].bytes * pInfo->commitPoint, size); - } - } + // Open .head file + pVnode->hfd = open(pVnode->cfn, O_RDONLY); + if (pVnode->hfd < 0) { + dError("vid:%d, failed to open head file:%s, reason:%s", vnode, pVnode->cfn, strerror(errno)); + taosLogError("vid:%d, failed to open head file:%s, reason:%s", vnode, pVnode->cfn, strerror(errno)); + goto _error_open; + } - if (pInfo->commitPoint != pObj->pointsPerBlock) { - // commit point shall be set to 0 if last block is not full - pInfo->commitPoint = 0; - pCacheBlock->numOfPoints = points; - if (slot == pInfo->currentSlot) { - __sync_fetch_and_add(&pObj->freePoints, pInfo->commitPoint); - } - } else { - // if last block is full and committed - SCacheBlock *pCacheBlock = pInfo->cacheBlocks[slot]; - if (pCacheBlock->pMeterObj == pObj) { - vnodeFreeCacheBlock(pCacheBlock); - } - } - } + fstat(pVnode->hfd, &filestat); + if (filestat.st_size < minFileSize) { + dError("vid:%d, head file:%s is corrupted", vnode, pVnode->cfn); + taosLogError("vid:%d, head file:%s corrupted", vnode, pVnode->cfn); + goto _error_open; } - if (lastKey > pObj->lastKeyOnFile) pObj->lastKeyOnFile = lastKey; + // Open .data file + pVnode->dfd = open(dname, O_RDWR); + if (pVnode->dfd < 0) { + dError("vid:%d, failed to open data file:%s, reason:%s", vnode, dname, strerror(errno)); + taosLogError("vid:%d, failed to open data file:%s, reason:%s", vnode, dname, strerror(errno)); + goto _error_open; + } - pthread_mutex_unlock(&pPool->vmutex); + fstat(pVnode->dfd, &filestat); + if (filestat.st_size < TSDB_FILE_HEADER_LEN) { + dError("vid:%d, data file:%s corrupted", vnode, dname); + taosLogError("vid:%d, data file:%s corrupted", vnode, dname); + goto 
_error_open; + } -_exit: - tfree(headInfo.headList); - tfree(buffer1); - tfree(buffer2); - tfree(pImport->buffer); + // Open .last file + pVnode->lfd = open(pVnode->lfn, O_RDWR); + if (pVnode->lfd < 0) { + dError("vid:%d, failed to open last file:%s, reason:%s", vnode, pVnode->lfn, strerror(errno)); + taosLogError("vid:%d, failed to open last file:%s, reason:%s", vnode, pVnode->lfn, strerror(errno)); + goto _error_open; + } - return code; -} + fstat(pVnode->lfd, &filestat); + if (filestat.st_size < TSDB_FILE_HEADER_LEN) { + dError("vid:%d, last file:%s corrupted", vnode, pVnode->lfn); + taosLogError("vid:%d, last file:%s corrupted", vnode, pVnode->lfn); + goto _error_open; + } -int vnodeImportToCache(SImportInfo *pImport, char *payload, int rows) { - SMeterObj *pObj = pImport->pObj; - SVnodeObj *pVnode = &vnodeList[pObj->vnode]; - SVnodeCfg *pCfg = &pVnode->cfg; - int code = -1; - SCacheInfo *pInfo = (SCacheInfo *)pObj->pCache; - int slot, pos, row, col, points, tpoints; + return 0; - char *data[TSDB_MAX_COLUMNS], *current[TSDB_MAX_COLUMNS]; - int slots = pInfo->unCommittedBlocks + 1; - int trows = slots * pObj->pointsPerBlock + rows; // max rows in buffer - int tsize = (trows / pObj->pointsPerBlock + 1) * pCfg->cacheBlockSize; - TSKEY firstKey = *((TSKEY *)payload); - TSKEY lastKey = *((TSKEY *)(payload + pObj->bytesPerPoint * (rows - 1))); +_error_open: + if (pVnode->hfd > 0) close(pVnode->hfd); + pVnode->hfd = 0; - if (pObj->freePoints < rows || pObj->freePoints < (pObj->pointsPerBlock << 1)) { - dError("vid:%d sid:%d id:%s, import failed, cache is full, freePoints:%d", pObj->vnode, pObj->sid, pObj->meterId, - pObj->freePoints); - pImport->importedRows = 0; - pImport->commit = 1; - code = TSDB_CODE_ACTION_IN_PROGRESS; - return code; - } + if (pVnode->dfd > 0) close(pVnode->dfd); + pVnode->dfd = 0; - dTrace("vid:%d sid:%d id:%s, %d rows data will be imported to cache, firstKey:%ld lastKey:%ld", - pObj->vnode, pObj->sid, pObj->meterId, rows, firstKey, lastKey); + if (pVnode->lfd > 0) close(pVnode->lfd); + pVnode->lfd = 0; - pthread_mutex_lock(&(pVnode->vmutex)); - if (firstKey < pVnode->firstKey) pVnode->firstKey = firstKey; - pthread_mutex_unlock(&(pVnode->vmutex)); + return -1; +} - char *buffer = malloc(tsize); // buffer to hold unCommitted data plus import data - data[0] = buffer; - current[0] = data[0]; - for (col = 1; col < pObj->numOfColumns; ++col) { - data[col] = data[col - 1] + trows * pObj->schema[col - 1].bytes; - current[col] = data[col]; +/* Function to open .t file and sendfile the first part + */ +int vnodeOpenTempFilesForImport(SImportHandle *pHandle, SMeterObj *pObj, int fid) { + char dHeadName[TSDB_FILENAME_LEN] = "\0"; + SVnodeObj * pVnode = vnodeList + pObj->vnode; + struct stat filestat; + int sid; + + // cfn: .head + if (readlink(pVnode->cfn, dHeadName, TSDB_FILENAME_LEN) < 0) return -1; + + size_t len = strlen(dHeadName); + // switch head name + switch (dHeadName[len - 1]) { + case '0': + dHeadName[len - 1] = '1'; + break; + case '1': + dHeadName[len - 1] = '0'; + break; + default: + dError("vid: %d, fid: %d, head target filename not end with 0 or 1", pVnode->vnode, fid); + return -1; } - // write import data into buffer first - for (row = 0; row < rows; ++row) { - for (col = 0; col < pObj->numOfColumns; ++col) { - memcpy(current[col], payload, pObj->schema[col].bytes); - payload += pObj->schema[col].bytes; - current[col] += pObj->schema[col].bytes; - } + vnodeGetHeadTname(pVnode->nfn, NULL, pVnode->vnode, fid); + symlink(dHeadName, pVnode->nfn); + + pVnode->nfd = 
open(pVnode->nfn, O_RDWR | O_CREAT | O_TRUNC, S_IRWXU | S_IRWXG | S_IRWXO); + if (pVnode->nfd < 0) { + dError("vid:%d, failed to open new head file:%s, reason:%s", pVnode->vnode, pVnode->nfn, strerror(errno)); + taosLogError("vid:%d, failed to open new head file:%s, reason:%s", pVnode->vnode, pVnode->nfn, strerror(errno)); + return -1; } - // copy the overwritten data into buffer - tpoints = rows; - pos = pImport->pos; - slot = pImport->slot; - while (1) { - points = pInfo->cacheBlocks[slot]->numOfPoints - pos; - for (col = 0; col < pObj->numOfColumns; ++col) { - int size = points * pObj->schema[col].bytes; - memcpy(current[col], pInfo->cacheBlocks[slot]->offset[col] + pos * pObj->schema[col].bytes, size); - current[col] += size; - } - pos = 0; - tpoints += points; + fstat(pVnode->hfd, &filestat); + pHandle->hfSize = filestat.st_size; - if (slot == pInfo->currentSlot) break; - slot = (slot + 1) % pInfo->maxBlocks; + // Find the next sid whose compInfoOffset > 0 + for (sid = pObj->sid + 1; sid < pVnode->cfg.maxSessions; sid++) { + if (pHandle->pHeader[sid].compInfoOffset > 0) break; } - for (col = 0; col < pObj->numOfColumns; ++col) current[col] = data[col]; - pos = pImport->pos; - - // write back to existing slots first - slot = pImport->slot; - while (1) { - points = (tpoints > pObj->pointsPerBlock - pos) ? pObj->pointsPerBlock - pos : tpoints; - SCacheBlock *pCacheBlock = pInfo->cacheBlocks[slot]; - for (col = 0; col < pObj->numOfColumns; ++col) { - int size = points * pObj->schema[col].bytes; - memcpy(pCacheBlock->offset[col] + pos * pObj->schema[col].bytes, current[col], size); - current[col] += size; - } - pCacheBlock->numOfPoints = points + pos; - pos = 0; - tpoints -= points; + pHandle->nextNo0Offset = (sid == pVnode->cfg.maxSessions) ? pHandle->hfSize : pHandle->pHeader[sid].compInfoOffset; - if (slot == pInfo->currentSlot) break; - slot = (slot + 1) % pInfo->maxBlocks; + // FIXME: sendfile the original part + // TODO: Here, we need to take the deleted table case in consideration, this function + // just assume the case is handled before calling this function + if (pHandle->pHeader[pObj->sid].compInfoOffset > 0) { + pHandle->compInfoOffset = pHandle->pHeader[pObj->sid].compInfoOffset; + } else { + pHandle->compInfoOffset = pHandle->nextNo0Offset; } - // allocate new cache block if there are still data left - while (tpoints > 0) { - pImport->commit = vnodeAllocateCacheBlock(pObj); - if (pImport->commit < 0) goto _exit; - points = (tpoints > pObj->pointsPerBlock) ? 
pObj->pointsPerBlock : tpoints; - SCacheBlock *pCacheBlock = pInfo->cacheBlocks[pInfo->currentSlot]; - for (col = 0; col < pObj->numOfColumns; ++col) { - int size = points * pObj->schema[col].bytes; - memcpy(pCacheBlock->offset[col] + pos * pObj->schema[col].bytes, current[col], size); - current[col] += size; - } - tpoints -= points; - pCacheBlock->numOfPoints = points; + assert(pHandle->compInfoOffset <= pHandle->hfSize); + + lseek(pVnode->hfd, 0, SEEK_SET); + lseek(pVnode->nfd, 0, SEEK_SET); + if (tsendfile(pVnode->nfd, pVnode->hfd, NULL, pHandle->compInfoOffset) < 0) { + // TODO : deal with ERROR here } - code = 0; - __sync_fetch_and_sub(&pObj->freePoints, rows); - dTrace("vid:%d sid:%d id:%s, %d rows data are imported to cache", pObj->vnode, pObj->sid, pObj->meterId, rows); + // Leave a SCompInfo space here + lseek(pVnode->nfd, sizeof(SCompInfo), SEEK_CUR); -_exit: - free(buffer); - return code; + return 0; } -int vnodeFindKeyInFile(SImportInfo *pImport, int order) { - SMeterObj *pObj = pImport->pObj; - SVnodeObj *pVnode = &vnodeList[pObj->vnode]; - int code = -1; - SQuery query; - SColumnInfoEx colList[TSDB_MAX_COLUMNS] = {0}; +typedef enum { DATA_LOAD_TIMESTAMP = 0x1, DATA_LOAD_OTHER_DATA = 0x2 } DataLoadMod; - TSKEY key = order ? pImport->firstKey : pImport->lastKey; - memset(&query, 0, sizeof(query)); - query.order.order = order; - query.skey = key; - query.ekey = order ? INT64_MAX : 0; - query.colList = colList; - query.numOfCols = pObj->numOfColumns; - - for (int16_t i = 0; i < pObj->numOfColumns; ++i) { - colList[i].data.colId = pObj->schema[i].colId; - colList[i].data.bytes = pObj->schema[i].bytes; - colList[i].data.type = pObj->schema[i].type; - - colList[i].colIdx = i; - colList[i].colIdxInBuf = i; - } - - int ret = vnodeSearchPointInFile(pObj, &query); - - if (ret >= 0) { - if (query.slot < 0) { - pImport->slot = 0; - pImport->pos = 0; - pImport->key = 0; - pImport->fileId = pVnode->fileId - pVnode->numOfFiles + 1; - dTrace("vid:%d sid:%d id:%s, import to head of file", pObj->vnode, pObj->sid, pObj->meterId); - code = 0; - } else if (query.slot >= 0) { - code = 0; - pImport->slot = query.slot; - pImport->pos = query.pos; - pImport->key = query.key; - pImport->fileId = query.fileId; - SCompBlock *pBlock = &query.pBlock[query.slot]; - pImport->numOfPoints = pBlock->numOfPoints; - - if (pImport->key != key) { - if (order == 0) { - pImport->pos++; - - if (pImport->pos >= pBlock->numOfPoints) { - pImport->slot++; - pImport->pos = 0; - } - } else { - if (pImport->pos < 0) pImport->pos = 0; - } - } +/* Function to load a block data at the requirement of mod + */ +static int vnodeLoadNeededBlockData(SMeterObj *pObj, SImportHandle *pHandle, int blockId, uint8_t loadMod) { + size_t size; + int code = 0; + SCompBlock *pBlock = pHandle->pBlocks + blockId; - if (pImport->key != key && pImport->pos > 0) { - if ( pObj->sversion != pBlock->sversion ) { - dError("vid:%d sid:%d id:%s, import sversion not matached, expected:%d received:%d", pObj->vnode, pObj->sid, - pBlock->sversion, pObj->sversion); - code = TSDB_CODE_OTHERS; - } else { - pImport->offset = pBlock->offset; - - pImport->buffer = - malloc(pObj->bytesPerPoint * pVnode->cfg.rowsInFileBlock + sizeof(SData) * pObj->numOfColumns); - pImport->sdata[0] = (SData *)pImport->buffer; - for (int col = 1; col < pObj->numOfColumns; ++col) - pImport->sdata[col] = (SData *)(((char *)pImport->sdata[col - 1]) + sizeof(SData) + - pObj->pointsPerFileBlock * pObj->schema[col - 1].bytes); - - code = vnodeReadCompBlockToMem(pObj, &query, 
pImport->sdata); - if (code < 0) { - code = -code; - tfree(pImport->buffer); - } - } + assert(pBlock->sversion == pObj->sversion); + + SVnodeObj *pVnode = vnodeList + pObj->vnode; + + int dfd = pBlock->last ? pVnode->lfd : pVnode->dfd; + + if (pHandle->blockId != blockId) { + pHandle->blockId = blockId; + pHandle->blockLoadState = 0; + } + + if (pHandle->blockLoadState == 0){ // Reload pField + size = sizeof(SField) * pBlock->numOfCols + sizeof(TSCKSUM); + if (pHandle->pFieldSize < size) { + pHandle->pField = (SField *)realloc((void *)(pHandle->pField), size); + if (pHandle->pField == NULL) { + dError("vid: %d, sid: %d, meterId: %s, failed to allocate memory, size: %ul", pObj->vnode, pObj->sid, + pObj->meterId, size); + return -1; } + pHandle->pFieldSize = size; } - } else { - dError("vid:%d sid:%d id:%s, file is corrupted, import failed", pObj->vnode, pObj->sid, pObj->meterId); - code = -ret; - } - tclose(query.hfd); - tclose(query.dfd); - tclose(query.lfd); - vnodeFreeFields(&query); - tfree(query.pBlock); + lseek(dfd, pBlock->offset, SEEK_SET); + if (read(dfd, (void *)(pHandle->pField), pHandle->pFieldSize) < 0) { + dError("vid:%d sid:%d meterId:%s, failed to read data file, size:%ld reason:%s", pVnode->vnode, pObj->sid, + pObj->meterId, pHandle->pFieldSize, strerror(errno)); + return -1; + } - return code; -} + if (!taosCheckChecksumWhole((uint8_t *)(pHandle->pField), pHandle->pFieldSize)) { + dError("vid:%d sid:%d meterId:%s, data file %s is broken since checksum mismatch", pVnode->vnode, pObj->sid, + pObj->meterId, pVnode->lfn); + return -1; + } + } -int vnodeFindKeyInCache(SImportInfo *pImport, int order) { - SMeterObj *pObj = pImport->pObj; - int code = 0; - SQuery query; - SCacheInfo *pInfo = (SCacheInfo *)pObj->pCache; + { // Allocate necessary buffer + size = pObj->bytesPerPoint * pObj->pointsPerFileBlock + (sizeof(SData) + EXTRA_BYTES) * pObj->numOfColumns; + if (pHandle->buffer == NULL) { + pHandle->buffer = malloc(size); + if (pHandle->buffer == NULL) { + dError("vid: %d, sid: %d, meterId: %s, failed to allocate memory, size: %ul", pObj->vnode, pObj->sid, + pObj->meterId, size); + return -1; + } - TSKEY key = order ? pImport->firstKey : pImport->lastKey; - memset(&query, 0, sizeof(query)); - query.order.order = order; - query.skey = key; - query.ekey = order ? 
pImport->lastKey : pImport->firstKey; - vnodeSearchPointInCache(pObj, &query); + // TODO: Init data + pHandle->data[0] = (SData *)(pHandle->buffer); + for (int col = 1; col < pObj->numOfColumns; col++) { + pHandle->data[col] = (SData *)((char *)(pHandle->data[col - 1]) + sizeof(SData) + EXTRA_BYTES + + pObj->pointsPerFileBlock * pObj->schema[col - 1].bytes); + } + } - if (query.slot < 0) { - pImport->slot = pInfo->commitSlot; - if (pInfo->commitPoint >= pObj->pointsPerBlock) pImport->slot = (pImport->slot + 1) % pInfo->maxBlocks; - pImport->pos = 0; - pImport->key = 0; - dTrace("vid:%d sid:%d id:%s, key:%ld, import to head of cache", pObj->vnode, pObj->sid, pObj->meterId, key); - code = 0; - } else { - pImport->slot = query.slot; - pImport->pos = query.pos; - pImport->key = query.key; + if (pHandle->temp == NULL) { + pHandle->temp = malloc(size); + if (pHandle->temp == NULL) { + dError("vid: %d, sid: %d, meterId: %s, failed to allocate memory, size: %ul", pObj->vnode, pObj->sid, + pObj->meterId, size); + return -1; + } + } - if (key != query.key) { - if (order == 0) { - // since pos is the position which has smaller key, data shall be imported after it - pImport->pos++; - if (pImport->pos >= pObj->pointsPerBlock) { - pImport->slot = (pImport->slot + 1) % pInfo->maxBlocks; - pImport->pos = 0; - } - } else { - if (pImport->pos < 0) pImport->pos = 0; + if (pHandle->tempBuffer == NULL) { + pHandle->tempBufferSize = pObj->maxBytes + EXTRA_BYTES; + pHandle->tempBuffer = malloc(pHandle->tempBufferSize); + if (pHandle->tempBuffer == NULL) { + dError("vid: %d, sid: %d, meterId: %s, failed to allocate memory, size: %ul", pObj->vnode, pObj->sid, + pObj->meterId, pHandle->tempBufferSize); + return -1; } } - code = 0; } - return code; -} + if ((loadMod & DATA_LOAD_TIMESTAMP) && + (~(pHandle->blockLoadState & DATA_LOAD_TIMESTAMP))) { // load only timestamp part + code = + vnodeReadColumnToMem(dfd, pBlock, &(pHandle->pField), PRIMARYKEY_TIMESTAMP_COL_INDEX, + pHandle->data[PRIMARYKEY_TIMESTAMP_COL_INDEX]->data, sizeof(TSKEY) * pBlock->numOfPoints, + pHandle->temp, pHandle->tempBuffer, pHandle->tempBufferSize); -int vnodeImportStartToCache(SImportInfo *pImport, char *payload, int rows) { - int code = 0; - SMeterObj *pObj = pImport->pObj; + if (code != 0) return -1; + pHandle->blockLoadState |= DATA_LOAD_TIMESTAMP; + } - code = vnodeFindKeyInCache(pImport, 1); - if (code != 0) return code; + if ((loadMod & DATA_LOAD_OTHER_DATA) && (~(pHandle->blockLoadState & DATA_LOAD_OTHER_DATA))) { // load other columns + for (int col = 1; col < pBlock->numOfCols; col++) { + code = vnodeReadColumnToMem(dfd, pBlock, &(pHandle->pField), col, pHandle->data[col]->data, + pBlock->numOfPoints * pObj->schema[col].bytes, pHandle->temp, pHandle->tempBuffer, + pHandle->tempBufferSize); + if (code != 0) return -1; + } - if (pImport->key != pImport->firstKey) { - rows = vnodeGetImportStartPart(pObj, payload, rows, pImport->key); - pImport->importedRows = rows; - code = vnodeImportToCache(pImport, payload, rows); - } else { - dTrace("vid:%d sid:%d id:%s, data is already imported to cache", pObj->vnode, pObj->sid, pObj->meterId); + pHandle->blockLoadState |= DATA_LOAD_OTHER_DATA; } - return code; + return 0; } -int vnodeImportStartToFile(SImportInfo *pImport, char *payload, int rows) { - int code = 0; - SMeterObj *pObj = pImport->pObj; +static int vnodeCloseImportFiles(SMeterObj *pObj, SImportHandle *pHandle) { + SVnodeObj *pVnode = vnodeList + pObj->vnode; + char dpath[TSDB_FILENAME_LEN] = "\0"; + SCompInfo compInfo; + __off_t 
offset = 0; + + if (pVnode->nfd > 0) { + offset = lseek(pVnode->nfd, 0, SEEK_CUR); + assert(offset == pHandle->nextNo0Offset + pHandle->driftOffset); + + { // Write the SCompInfo part + compInfo.uid = pObj->uid; + compInfo.last = pHandle->last; + compInfo.numOfBlocks = pHandle->newNumOfBlocks + pHandle->oldNumOfBlocks; + compInfo.delimiter = TSDB_VNODE_DELIMITER; + taosCalcChecksumAppend(0, (uint8_t *)(&compInfo), sizeof(SCompInfo)); + + lseek(pVnode->nfd, pHandle->compInfoOffset, SEEK_SET); + if (twrite(pVnode->nfd, (void *)(&compInfo), sizeof(SCompInfo)) < 0) { + dError("vid:%d sid:%d meterId:%s, failed to wirte SCompInfo, reason:%s", pObj->vnode, pObj->sid, pObj->meterId, + strerror(errno)); + return -1; + } + } - code = vnodeFindKeyInFile(pImport, 1); - if (code != 0) return code; + // Write the rest of the SCompBlock part + if (pHandle->hfSize > pHandle->nextNo0Offset) { + lseek(pVnode->nfd, 0, SEEK_END); + lseek(pVnode->hfd, pHandle->nextNo0Offset, SEEK_SET); + if (tsendfile(pVnode->nfd, pVnode->hfd, NULL, pHandle->hfSize - pHandle->nextNo0Offset) < 0) { + dError("vid:%d sid:%d meterId:%s, failed to sendfile, size:%ld, reason:%s", pObj->vnode, pObj->sid, + pObj->meterId, pHandle->hfSize - pHandle->nextNo0Offset, strerror(errno)); + return -1; + } + } - if (pImport->key != pImport->firstKey) { - pImport->payload = payload; - pImport->rows = vnodeGetImportStartPart(pObj, payload, rows, pImport->key); - pImport->importedRows = pImport->rows; - code = vnodeImportToFile(pImport); - } else { - dTrace("vid:%d sid:%d id:%s, data is already imported to file", pObj->vnode, pObj->sid, pObj->meterId); + // Write SCompHeader part + pHandle->pHeader[pObj->sid].compInfoOffset = pHandle->compInfoOffset; + for (int sid = pObj->sid + 1; sid < pVnode->cfg.maxSessions; ++sid) { + if (pHandle->pHeader[sid].compInfoOffset > 0) { + pHandle->pHeader[sid].compInfoOffset += pHandle->driftOffset; + } + } + + taosCalcChecksumAppend(0, (uint8_t *)(pHandle->pHeader), pHandle->pHeaderSize); + lseek(pVnode->nfd, TSDB_FILE_HEADER_LEN, SEEK_SET); + if (twrite(pVnode->nfd, (void *)(pHandle->pHeader), pHandle->pHeaderSize) < 0) { + dError("vid:%d sid:%d meterId:%s, failed to wirte SCompHeader part, size:%ld, reason:%s", pObj->vnode, pObj->sid, + pObj->meterId, pHandle->pHeaderSize, strerror(errno)); + return -1; + } } - return code; -} + // Close opened files + close(pVnode->dfd); + pVnode->dfd = 0; -int vnodeImportWholeToFile(SImportInfo *pImport, char *payload, int rows) { - int code = 0; - SMeterObj *pObj = pImport->pObj; + close(pVnode->hfd); + pVnode->hfd = 0; - code = vnodeFindKeyInFile(pImport, 0); - if (code != 0) return code; + close(pVnode->lfd); + pVnode->lfd = 0; - if (pImport->key != pImport->lastKey) { - pImport->payload = payload; - pImport->rows = vnodeGetImportEndPart(pObj, payload, rows, &pImport->payload, pImport->key); - pImport->importedRows = pImport->rows; - code = vnodeImportToFile(pImport); - } else { - code = vnodeImportStartToFile(pImport, payload, rows); + if (pVnode->nfd > 0) { + close(pVnode->nfd); + pVnode->nfd = 0; + + readlink(pVnode->cfn, dpath, TSDB_FILENAME_LEN); + rename(pVnode->nfn, pVnode->cfn); + remove(dpath); } - return code; + return 0; } -int vnodeImportWholeToCache(SImportInfo *pImport, char *payload, int rows) { - int code = 0; - SMeterObj *pObj = pImport->pObj; +void vnodeConvertRowsToCols(SMeterObj *pObj, const char *payload, int rows, SData *data[], int rowOffset) { + int sdataRow; + int offset; - code = vnodeFindKeyInCache(pImport, 0); - if (code != 0) return code; + 
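+  /*
+   * Each imported record in the payload is row-major: one timestamp followed by
+   * the column values, packed into pObj->bytesPerPoint bytes. The loop below
+   * scatters every record into the per-column SData buffers (column-major),
+   * writing at row (row + rowOffset) so that rows already staged in data[] are
+   * left untouched.
+   */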
for (int row = 0; row < rows; ++row) { + sdataRow = row + rowOffset; + offset = 0; + for (int col = 0; col < pObj->numOfColumns; ++col) { + memcpy(data[col]->data + sdataRow * pObj->schema[col].bytes, payload + pObj->bytesPerPoint * row + offset, + pObj->schema[col].bytes); - if (pImport->key != pImport->lastKey) { - char *pStart; - if ( pImport->key < pObj->lastKeyOnFile ) pImport->key = pObj->lastKeyOnFile; - rows = vnodeGetImportEndPart(pObj, payload, rows, &pStart, pImport->key); - pImport->importedRows = rows; - code = vnodeImportToCache(pImport, pStart, rows); - } else { - if (pImport->firstKey > pObj->lastKeyOnFile) { - code = vnodeImportStartToCache(pImport, payload, rows); - } else if (pImport->firstKey < pObj->lastKeyOnFile) { - code = vnodeImportStartToFile(pImport, payload, rows); - } else { // firstKey == pObj->lastKeyOnFile - dTrace("vid:%d sid:%d id:%s, data is already there", pObj->vnode, pObj->sid, pObj->meterId); + offset += pObj->schema[col].bytes; } } +} - return code; +// TODO : Check the correctness +int vnodeCreateNeccessaryFiles(SVnodeObj *pVnode) { + int numOfFiles = 0, fileId, filesAdded = 0; + int vnode = pVnode->vnode; + SVnodeCfg *pCfg = &(pVnode->cfg); + + if (pVnode->lastKeyOnFile == 0) { + if (pCfg->daysPerFile == 0) pCfg->daysPerFile = 10; + pVnode->fileId = pVnode->firstKey / tsMsPerDay[pVnode->cfg.precision] / pCfg->daysPerFile; + pVnode->lastKeyOnFile = (long)(pVnode->fileId + 1) * pCfg->daysPerFile * tsMsPerDay[pVnode->cfg.precision] - 1; + pVnode->numOfFiles = 1; + if (vnodeCreateEmptyCompFile(vnode, pVnode->fileId) < 0) return -1; + } + + numOfFiles = (pVnode->lastKeyOnFile - pVnode->commitFirstKey) / tsMsPerDay[pVnode->cfg.precision] / pCfg->daysPerFile; + if (pVnode->commitFirstKey > pVnode->lastKeyOnFile) numOfFiles = -1; + + dTrace("vid:%d, commitFirstKey:%ld lastKeyOnFile:%ld numOfFiles:%d fileId:%d vnodeNumOfFiles:%d", pVnode->vnode, + pVnode->commitFirstKey, pVnode->lastKeyOnFile, numOfFiles, pVnode->fileId, pVnode->numOfFiles); + + if (numOfFiles >= pVnode->numOfFiles) { + // create empty header files backward + filesAdded = numOfFiles - pVnode->numOfFiles + 1; + for (int i = 0; i < filesAdded; ++i) { + fileId = pVnode->fileId - pVnode->numOfFiles - i; + if (vnodeCreateEmptyCompFile(vnode, fileId) < 0) return -1; + } + } else if (numOfFiles < 0) { + // create empty header files forward + pVnode->fileId++; + if (vnodeCreateEmptyCompFile(vnode, pVnode->fileId) < 0) return -1; + pVnode->lastKeyOnFile += (long)tsMsPerDay[pVnode->cfg.precision] * pCfg->daysPerFile; + filesAdded = 1; + numOfFiles = 0; // hacker way + } + + fileId = pVnode->fileId - numOfFiles; + pVnode->commitLastKey = + pVnode->lastKeyOnFile - (long)numOfFiles * tsMsPerDay[pVnode->cfg.precision] * pCfg->daysPerFile; + pVnode->commitFirstKey = pVnode->commitLastKey - (long)tsMsPerDay[pVnode->cfg.precision] * pCfg->daysPerFile + 1; + pVnode->commitFileId = fileId; + pVnode->numOfFiles = pVnode->numOfFiles + filesAdded; + + return 0; } -int vnodeImportPoints(SMeterObj *pObj, char *cont, int contLen, char source, void *param, int sversion, - int *pNumOfPoints, TSKEY now) { - SSubmitMsg *pSubmit = (SSubmitMsg *)cont; - SVnodeObj *pVnode = &vnodeList[pObj->vnode]; - int rows; - char *payload; - int code = TSDB_CODE_ACTION_IN_PROGRESS; - SCachePool *pPool = (SCachePool *)pVnode->pCachePool; - SShellObj *pShell = (SShellObj *)param; - int pointsImported = 0; +static int vnodeMergeDataIntoFile(SImportInfo *pImport, const char *payload, int rows, int fid) { + SMeterObj * pObj = (SMeterObj 
*)(pImport->pObj); + SVnodeObj * pVnode = vnodeList + pObj->vnode; + SImportHandle importHandle; + size_t size = 0; + SData * data[TSDB_MAX_COLUMNS]; + char * buffer = NULL; + SData * cdata[TSDB_MAX_COLUMNS]; + char * cbuffer = NULL; + SCompBlock compBlock; + TSCKSUM checksum = 0; + int pointsImported = 0; + + TSKEY delta = pVnode->cfg.daysPerFile * tsMsPerDay[pVnode->cfg.precision]; + TSKEY minFileKey = fid * delta; + TSKEY maxFileKey = minFileKey + delta - 1; + TSKEY firstKey = KEY_AT_INDEX(payload, pObj->bytesPerPoint, 0); + TSKEY lastKey = KEY_AT_INDEX(payload, pObj->bytesPerPoint, rows - 1); + + assert(firstKey >= minFileKey && firstKey <= maxFileKey && lastKey >= minFileKey && lastKey <= maxFileKey); + + // create neccessary files + pVnode->commitFirstKey = firstKey; + if (vnodeCreateNeccessaryFiles(pVnode) < 0) return -1; + + assert(pVnode->commitFileId == fid); + + // Open least files to import .head(hfd) .data(dfd) .last(lfd) + if (vnodeOpenMinFilesForImport(pObj->vnode, fid) < 0) return -1; + + memset(&importHandle, 0, sizeof(SImportHandle)); + + { // Load SCompHeader part from .head file + importHandle.pHeaderSize = sizeof(SCompHeader) * pVnode->cfg.maxSessions + sizeof(TSCKSUM); + importHandle.pHeader = (SCompHeader *)malloc(importHandle.pHeaderSize); + if (importHandle.pHeader == NULL) { + dError("vid: %d, sid: %d, meterId: %s, failed to allocate memory, size: %ul", pObj->vnode, pObj->sid, + pObj->meterId, importHandle.pHeaderSize); + goto _error_merge; + } - rows = htons(pSubmit->numOfRows); - int expectedLen = rows * pObj->bytesPerPoint + sizeof(pSubmit->numOfRows); - if (expectedLen != contLen) { - dError("vid:%d sid:%d id:%s, invalid import, expected:%d, contLen:%d", pObj->vnode, pObj->sid, pObj->meterId, - expectedLen, contLen); - return TSDB_CODE_WRONG_MSG_SIZE; + lseek(pVnode->hfd, TSDB_FILE_HEADER_LEN, SEEK_SET); + if (read(pVnode->hfd, (void *)(importHandle.pHeader), importHandle.pHeaderSize) < importHandle.pHeaderSize) { + dError("vid: %d, sid: %d, meterId: %s, fid: %d failed to read SCompHeader part, reason:%s", pObj->vnode, + pObj->sid, pObj->meterId, fid, strerror(errno)); + goto _error_merge; + } + + if (!taosCheckChecksumWhole((uint8_t *)(importHandle.pHeader), importHandle.pHeaderSize)) { + dError("vid: %d, sid: %d, meterId: %s, fid: %d SCompHeader part is broken", pObj->vnode, pObj->sid, pObj->meterId, + fid); + goto _error_merge; + } } - if (sversion != pObj->sversion) { - dError("vid:%d sid:%d id:%s, invalid sversion, expected:%d received:%d", pObj->vnode, pObj->sid, pObj->meterId, - pObj->sversion, sversion); - return TSDB_CODE_OTHERS; + { // Initialize data[] and cdata[], which is used to hold data to write to data file + size = pObj->bytesPerPoint * pVnode->cfg.rowsInFileBlock + (sizeof(SData) + EXTRA_BYTES) * pObj->numOfColumns; + + buffer = (char *)malloc(size); + if (buffer == NULL) { + dError("vid: %d, sid: %d, meterId: %s, failed to allocate memory, size: %ul", pObj->vnode, pObj->sid, + pObj->meterId, size); + goto _error_merge; + } + + cbuffer = (char *)malloc(size); + if (cbuffer == NULL) { + dError("vid: %d, sid: %d, meterId: %s, failed to allocate memory, size: %ul", pObj->vnode, pObj->sid, + pObj->meterId, size); + goto _error_merge; + } + + data[0] = (SData *)buffer; + cdata[0] = (SData *)cbuffer; + + for (int col = 1; col < pObj->numOfColumns; col++) { + data[col] = (SData *)((char *)data[col - 1] + sizeof(SData) + EXTRA_BYTES + + pObj->pointsPerFileBlock * pObj->schema[col - 1].bytes); + cdata[col] = (SData *)((char *)cdata[col - 1] + 
sizeof(SData) + EXTRA_BYTES + + pObj->pointsPerFileBlock * pObj->schema[col - 1].bytes); + } } - payload = pSubmit->payLoad; - TSKEY firstKey = *(TSKEY *)payload; - TSKEY lastKey = *(TSKEY *)(payload + pObj->bytesPerPoint*(rows-1)); - int cfid = now/pVnode->cfg.daysPerFile/tsMsPerDay[pVnode->cfg.precision]; - TSKEY minAllowedKey = (cfid - pVnode->maxFiles + 1)*pVnode->cfg.daysPerFile*tsMsPerDay[pVnode->cfg.precision]; - TSKEY maxAllowedKey = (cfid + 2)*pVnode->cfg.daysPerFile*tsMsPerDay[pVnode->cfg.precision] - 1; - if (firstKey < minAllowedKey || firstKey > maxAllowedKey || lastKey < minAllowedKey || lastKey > maxAllowedKey) { - dError("vid:%d sid:%d id:%s, vnode lastKeyOnFile:%lld, data is out of range, rows:%d firstKey:%lld lastKey:%lld minAllowedKey:%lld maxAllowedKey:%lld", - pObj->vnode, pObj->sid, pObj->meterId, pVnode->lastKeyOnFile, rows, firstKey, lastKey, minAllowedKey, maxAllowedKey); - return TSDB_CODE_TIMESTAMP_OUT_OF_RANGE; + if (importHandle.pHeader[pObj->sid].compInfoOffset == 0) { // No data in this file, just write it + _write_empty_point: + if (vnodeOpenTempFilesForImport(&importHandle, pObj, fid) < 0) { + goto _error_merge; + } + importHandle.oldNumOfBlocks = 0; + importHandle.driftOffset += sizeof(SCompInfo); + + for (int rowsWritten = 0; rowsWritten < rows;) { + int rowsToWrite = MIN(pVnode->cfg.rowsInFileBlock, (rows - rowsWritten) /* the rows left */); + vnodeConvertRowsToCols(pObj, payload + rowsWritten * pObj->bytesPerPoint, rowsToWrite, data, 0); + pointsImported += rowsToWrite; + + // TODO : Write the block to the file + compBlock.last = 1; + if (vnodeWriteBlockToFile(pObj, &compBlock, data, cdata, rowsToWrite) < 0) { + // TODO: deal with ERROR here + } + + importHandle.last = compBlock.last; + + checksum = taosCalcChecksum(checksum, (uint8_t *)(&compBlock), sizeof(SCompBlock)); + twrite(pVnode->nfd, &compBlock, sizeof(SCompBlock)); + importHandle.newNumOfBlocks++; + importHandle.driftOffset += sizeof(SCompBlock); + + rowsWritten += rowsToWrite; + } + twrite(pVnode->nfd, &checksum, sizeof(TSCKSUM)); + importHandle.driftOffset += sizeof(TSCKSUM); + } else { // Else if there are old data in this file. 
+ { // load SCompInfo and SCompBlock part + lseek(pVnode->hfd, importHandle.pHeader[pObj->sid].compInfoOffset, SEEK_SET); + if (read(pVnode->hfd, (void *)(&(importHandle.compInfo)), sizeof(SCompInfo)) < sizeof(SCompInfo)) { + dError("vid:%d sid:%d meterId:%s, failed to read .head file, reason:%s", pVnode->vnode, pObj->sid, + pObj->meterId, strerror(errno)); + goto _error_merge; + } + + if ((importHandle.compInfo.delimiter != TSDB_VNODE_DELIMITER) || + (!taosCheckChecksumWhole((uint8_t *)(&(importHandle.compInfo)), sizeof(SCompInfo)))) { + dError("vid:%d sid:%d meterId:%s, .head file %s is broken, delemeter:%x", pVnode->vnode, pObj->sid, + pObj->meterId, pVnode->cfn, importHandle.compInfo.delimiter); + goto _error_merge; + } + + { // Check the context of SCompInfo part + if (importHandle.compInfo.uid != pObj->uid) { // The data belongs to the other meter + goto _write_empty_point; + } + } + + importHandle.oldNumOfBlocks = importHandle.compInfo.numOfBlocks; + importHandle.last = importHandle.compInfo.last; + + size = sizeof(SCompBlock) * importHandle.compInfo.numOfBlocks + sizeof(TSCKSUM); + importHandle.pBlocks = (SCompBlock *)malloc(size); + if (importHandle.pBlocks == NULL) { + dError("vid:%d sid:%d meterId:%s, failed to allocate importHandle.pBlock, size:%ul", pVnode->vnode, pObj->sid, + pObj->meterId, size); + goto _error_merge; + } + + if (read(pVnode->hfd, (void *)(importHandle.pBlocks), size) < size) { + dError("vid:%d sid:%d meterId:%s, failed to read importHandle.pBlock, reason:%s", pVnode->vnode, pObj->sid, + pObj->meterId, strerror(errno)); + goto _error_merge; + } + + if (!taosCheckChecksumWhole((uint8_t *)(importHandle.pBlocks), size)) { + dError("vid:%d sid:%d meterId:%s, pBlock part is broken in %s", pVnode->vnode, pObj->sid, pObj->meterId, + pVnode->cfn); + goto _error_merge; + } + } + + /* Now we have _payload_, we have _importHandle.pBlocks_, just merge payload into the importHandle.pBlocks + * + * Input: payload, pObj->bytesPerBlock, rows, importHandle.pBlocks + */ + { + int payloadIter = 0; + SBlockIter blockIter = {0, 0, 0, 0}; + + while (1) { + if (payloadIter >= rows) { // payload end, break + // write the remaining blocks to the file + if (pVnode->nfd > 0) { + int blocksLeft = importHandle.compInfo.numOfBlocks - blockIter.oslot; + if (blocksLeft > 0) { + checksum = taosCalcChecksum(checksum, (uint8_t *)(importHandle.pBlocks + blockIter.oslot), + sizeof(SCompBlock) * blocksLeft); + if (twrite(pVnode->nfd, (void *)(importHandle.pBlocks + blockIter.oslot), + sizeof(SCompBlock) * blocksLeft) < 0) { + dError("vid:%d sid:%d meterId:%s, failed to write %s file, size:%ul, reason:%s", pVnode->vnode, + pObj->sid, pObj->meterId, pVnode->nfn, sizeof(SCompBlock) * blocksLeft, strerror(errno)); + goto _error_merge; + } + } + + if (twrite(pVnode->nfd, (void *)(&checksum), sizeof(TSCKSUM)) < 0) { + dError("vid:%d sid:%d meterId:%s, failed to write %s file, size:%ul, reason:%s", pVnode->vnode, pObj->sid, + pObj->meterId, pVnode->nfn, sizeof(TSCKSUM), strerror(errno)); + goto _error_merge; + } + } + break; + } + + if (blockIter.slot >= importHandle.compInfo.numOfBlocks) { // blocks end, break + assert(false); + + // Should never come here + int rowsLeft = rows - payloadIter; + if (pVnode->nfd > 0 && rowsLeft > 0) { + // TODO : Convert into while here + vnodeConvertRowsToCols(pObj, payload + pObj->bytesPerPoint * payloadIter, rowsLeft, data, 0); + pointsImported++; + + assert(importHandle.last == 0); + + compBlock.last = 1; + if (vnodeWriteBlockToFile(pObj, &compBlock, data, cdata, 
rows - payloadIter) < 0) { + // TODO : + } + + checksum = taosCalcChecksum(checksum, (uint8_t *)(&compBlock), sizeof(SCompBlock)); + importHandle.newNumOfBlocks++; + importHandle.driftOffset += sizeof(SCompBlock); + importHandle.last = compBlock.last; + twrite(pVnode->nfd, (void *)(&compBlock), sizeof(SCompBlock)); + twrite(pVnode->nfd, (void *)(&checksum), sizeof(TSCKSUM)); + } + break; + } + + TSKEY key = KEY_AT_INDEX(payload, pObj->bytesPerPoint, payloadIter); + + { // Binary search the (slot, pos) which is >= key as well as nextKey + int left = blockIter.slot; + int right = importHandle.compInfo.numOfBlocks - 1; + TSKEY minKey = importHandle.pBlocks[left].keyFirst; + TSKEY maxKey = importHandle.pBlocks[right].keyLast; + + assert(minKey <= maxKey); + + if (key < minKey) { // Case 1. write just ahead the blockIter.slot + blockIter.slot = left; + blockIter.pos = 0; + blockIter.nextKey = minKey; + } else if (key > maxKey) { // Case 2. write to the end + if (importHandle.pBlocks[right].last) { // Case 2.1 last block in .last file, need to merge + assert(importHandle.last != 0); + importHandle.last = 0; + blockIter.slot = right; + blockIter.pos = importHandle.pBlocks[right].numOfPoints; + } else { // Case 2.2 just write after the last block + blockIter.slot = right + 1; + blockIter.pos = 0; + } + blockIter.nextKey = maxFileKey + 1; + } else { // Case 3. need to search the block for slot and pos + if (key == minKey || key == maxKey) { + payloadIter++; + continue; + } + + // Here: minKey < key < maxKey + + int mid; + TSKEY blockMinKey; + TSKEY blockMaxKey; + + // Binary search the slot + do { + mid = (left + right) / 2; + blockMinKey = importHandle.pBlocks[mid].keyFirst; + blockMaxKey = importHandle.pBlocks[mid].keyLast; + + assert(blockMinKey <= blockMaxKey); + + if (key < blockMinKey) { + right = mid; + } else if (key > blockMaxKey) { + left = mid + 1; + } else { /* blockMinKey <= key <= blockMaxKey */ + break; + } + } while (left < right); + + if (key == blockMinKey || key == blockMaxKey) { // duplicate key + payloadIter++; + continue; + } + + // Get the slot + if (key > blockMaxKey) { /* pos = 0 or pos = ? */ + blockIter.slot = mid + 1; + } else { /* key < blockMinKey (pos = 0) || (key > blockMinKey && key < blockMaxKey) (pos=?) */ + blockIter.slot = mid; + } + + // Get the pos + assert(blockIter.slot < importHandle.compInfo.numOfBlocks); + + if (key == importHandle.pBlocks[blockIter.slot].keyFirst || + key == importHandle.pBlocks[blockIter.slot].keyLast) { + payloadIter++; + continue; + } + + assert(key < importHandle.pBlocks[blockIter.slot].keyLast); + + /* */ + if (key < importHandle.pBlocks[blockIter.slot].keyFirst) { + blockIter.pos = 0; + blockIter.nextKey = importHandle.pBlocks[blockIter.slot].keyFirst; + } else { + SCompBlock *pBlock = importHandle.pBlocks + blockIter.slot; + if (pBlock->sversion != pObj->sversion) { /*TODO*/ + } + if (vnodeLoadNeededBlockData(pObj, &importHandle, blockIter.slot, DATA_LOAD_TIMESTAMP) < 0) { + } + int pos = (*vnodeSearchKeyFunc[pObj->searchAlgorithm])( + importHandle.data[PRIMARYKEY_TIMESTAMP_COL_INDEX]->data, pBlock->numOfPoints, key, TSQL_SO_ASC); + assert(pos != 0); + if (KEY_AT_INDEX(importHandle.data[PRIMARYKEY_TIMESTAMP_COL_INDEX]->data, sizeof(TSKEY), pos) == key) { + payloadIter++; + continue; + } + + blockIter.pos = pos; + blockIter.nextKey = (blockIter.slot + 1 < importHandle.compInfo.numOfBlocks) + ? 
importHandle.pBlocks[blockIter.slot + 1].keyFirst + : maxFileKey + 1; + // Need to merge with this block + if (importHandle.pBlocks[blockIter.slot].last) { // this is to merge with the last block + assert((blockIter.slot == (importHandle.compInfo.numOfBlocks - 1))); + importHandle.last = 0; + } + } + } + } + + // Open the new .t file if not opened yet. + if (pVnode->nfd <= 0) { + if (vnodeOpenTempFilesForImport(&importHandle, pObj, fid) < 0) { + goto _error_merge; + } + } + + if (blockIter.slot > blockIter.oslot) { // write blocks in range [blockIter.oslot, blockIter.slot) to .t file + checksum = taosCalcChecksum(checksum, (uint8_t *)(importHandle.pBlocks + blockIter.oslot), + sizeof(SCompBlock) * (blockIter.slot - blockIter.oslot)); + if (twrite(pVnode->nfd, (void *)(importHandle.pBlocks + blockIter.oslot), + sizeof(SCompBlock) * (blockIter.slot - blockIter.oslot)) < 0) { + dError("vid:%d sid:%d meterId:%s, failed to write %s file, size:%ul, reason:%s", pVnode->vnode, pObj->sid, + pObj->meterId, pVnode->nfn, sizeof(SCompBlock) * (blockIter.slot - blockIter.oslot), + strerror(errno)); + goto _error_merge; + } + + blockIter.oslot = blockIter.slot; + } + + if (blockIter.pos == 0) { // No need to merge + // copy payload part to data + int rowOffset = 0; + for (; payloadIter < rows; rowOffset++) { + if (KEY_AT_INDEX(payload, pObj->bytesPerPoint, payloadIter) >= blockIter.nextKey) break; + + vnodeConvertRowsToCols(pObj, payload + pObj->bytesPerPoint * payloadIter, 1, data, rowOffset); + pointsImported++; + payloadIter++; + } + + // write directly to .data file + compBlock.last = 0; + if (vnodeWriteBlockToFile(pObj, &compBlock, data, cdata, rowOffset) < 0) { + // TODO: Deal with the ERROR here + } + + checksum = taosCalcChecksum(checksum, (uint8_t *)(&compBlock), sizeof(SCompBlock)); + if (twrite(pVnode->nfd, &compBlock, sizeof(SCompBlock)) < 0) { + // TODO : deal with the ERROR here + } + importHandle.newNumOfBlocks++; + importHandle.driftOffset += sizeof(SCompBlock); + } else { // Merge block and payload from payloadIter + + if (vnodeLoadNeededBlockData(pObj, &importHandle, blockIter.slot, + DATA_LOAD_TIMESTAMP | DATA_LOAD_OTHER_DATA) < 0) { // Load neccessary blocks + goto _error_merge; + } + + importHandle.oldNumOfBlocks--; + importHandle.driftOffset -= sizeof(SCompBlock); + + int rowOffset = blockIter.pos; // counter for data + + // Copy the front part + for (int col = 0; col < pObj->numOfColumns; col++) { + memcpy((void *)(data[col]->data), (void *)(importHandle.data[col]->data), + pObj->schema[col].bytes * blockIter.pos); + } + + // Merge part + while (1) { + if (rowOffset >= pVnode->cfg.rowsInFileBlock) { // data full in a block to commit + compBlock.last = 0; + if (vnodeWriteBlockToFile(pObj, &compBlock, data, cdata, rowOffset) < 0) { + // TODO : deal with the ERROR here + } + + checksum = taosCalcChecksum(checksum, (uint8_t *)(&compBlock), sizeof(SCompBlock)); + if (twrite(pVnode->nfd, (void *)(&compBlock), sizeof(SCompBlock)) < 0) { + dError("vid:%d sid:%d meterId:%s, failed to write %s file, size:%ul, reason:%s", pVnode->vnode, + pObj->sid, pObj->meterId, pVnode->nfn, sizeof(SCompBlock), strerror(errno)); + goto _error_merge; + } + importHandle.newNumOfBlocks++; + importHandle.driftOffset += sizeof(SCompBlock); + rowOffset = 0; + } + + if ((payloadIter >= rows || KEY_AT_INDEX(payload, pObj->bytesPerPoint, payloadIter) >= blockIter.nextKey) && + blockIter.pos >= importHandle.pBlocks[blockIter.slot].numOfPoints) + break; + + if (payloadIter >= rows || + KEY_AT_INDEX(payload, 
pObj->bytesPerPoint, payloadIter) >= blockIter.nextKey) { // payload end + for (int col = 0; col < pObj->numOfColumns; col++) { + memcpy(data[col]->data + rowOffset * pObj->schema[col].bytes, + importHandle.data[col]->data + pObj->schema[col].bytes * blockIter.pos, pObj->schema[col].bytes); + } + blockIter.pos++; + rowOffset++; + } else if (blockIter.pos >= importHandle.pBlocks[blockIter.slot].numOfPoints) { // block end + vnodeConvertRowsToCols(pObj, payload + pObj->bytesPerPoint * payloadIter, 1, data, rowOffset); + pointsImported++; + payloadIter++; + rowOffset++; + } else { + if (KEY_AT_INDEX(payload, pObj->bytesPerPoint, payloadIter) == + KEY_AT_INDEX(importHandle.data[PRIMARYKEY_TIMESTAMP_COL_INDEX]->data, sizeof(TSKEY), + blockIter.pos)) { // duplicate key + payloadIter++; + continue; + } else if (KEY_AT_INDEX(payload, pObj->bytesPerPoint, payloadIter) < + KEY_AT_INDEX(importHandle.data[PRIMARYKEY_TIMESTAMP_COL_INDEX]->data, sizeof(TSKEY), + blockIter.pos)) { + vnodeConvertRowsToCols(pObj, payload + pObj->bytesPerPoint * payloadIter, 1, data, rowOffset); + pointsImported++; + payloadIter++; + rowOffset++; + } else { + for (int col = 0; col < pObj->numOfColumns; col++) { + memcpy(data[col]->data + rowOffset * pObj->schema[col].bytes, + importHandle.data[col]->data + pObj->schema[col].bytes * blockIter.pos, + pObj->schema[col].bytes); + } + blockIter.pos++; + rowOffset++; + } + } + } + if (rowOffset > 0) { // data full in a block to commit + compBlock.last = 0; + if (vnodeWriteBlockToFile(pObj, &compBlock, data, cdata, rowOffset) < 0) { + // TODO : deal with the ERROR here + } + + checksum = taosCalcChecksum(checksum, (uint8_t *)(&compBlock), sizeof(SCompBlock)); + if (twrite(pVnode->nfd, (void *)(&compBlock), sizeof(SCompBlock)) < 0) { + dError("vid:%d sid:%d meterId:%s, failed to write %s file, size:%ul, reason:%s", pVnode->vnode, pObj->sid, + pObj->meterId, pVnode->nfn, sizeof(SCompBlock), strerror(errno)); + goto _error_merge; + } + importHandle.newNumOfBlocks++; + importHandle.driftOffset += sizeof(SCompBlock); + rowOffset = 0; + } + + blockIter.slot++; + blockIter.oslot = blockIter.slot; + } + } + } } - // forward to peers - if (pShell && pVnode->cfg.replications > 1) { - code = vnodeForwardToPeer(pObj, cont, contLen, TSDB_ACTION_IMPORT, sversion); - if (code != 0) return code; + // Write the SCompInfo part + if (vnodeCloseImportFiles(pObj, &importHandle) < 0) { + goto _error_merge; } - if (pVnode->cfg.commitLog && source != TSDB_DATA_SOURCE_LOG) { - if (pVnode->logFd < 0) return TSDB_CODE_INVALID_COMMIT_LOG; - code = vnodeWriteToCommitLog(pObj, TSDB_ACTION_IMPORT, cont, contLen, sversion); - if (code != 0) return code; + pImport->importedRows += pointsImported; + + // TODO: free the allocated memory + tfree(buffer); + tfree(cbuffer); + tfree(importHandle.pHeader); + tfree(importHandle.pBlocks); + tfree(importHandle.pField); + tfree(importHandle.buffer); + tfree(importHandle.temp); + tfree(importHandle.tempBuffer); + + return 0; + +_error_merge: + tfree(buffer); + tfree(cbuffer); + tfree(importHandle.pHeader); + tfree(importHandle.pBlocks); + tfree(importHandle.pField); + tfree(importHandle.buffer); + tfree(importHandle.temp); + tfree(importHandle.tempBuffer); + + close(pVnode->dfd); + pVnode->dfd = 0; + + close(pVnode->hfd); + pVnode->hfd = 0; + + close(pVnode->lfd); + pVnode->lfd = 0; + + if (pVnode->nfd > 0) { + close(pVnode->nfd); + pVnode->nfd = 0; + remove(pVnode->nfn); } - if (*((TSKEY *)(pSubmit->payLoad + (rows - 1) * pObj->bytesPerPoint)) > pObj->lastKey) { - 
vnodeClearMeterState(pObj, TSDB_METER_STATE_IMPORTING); - vnodeSetMeterState(pObj, TSDB_METER_STATE_INSERT); - code = vnodeInsertPoints(pObj, cont, contLen, TSDB_DATA_SOURCE_LOG, NULL, pObj->sversion, &pointsImported, now); + return -1; +} - if (pShell) { - pShell->code = code; - pShell->numOfTotalPoints += pointsImported; - } +#define FORWARD_ITER(iter, step, slotLimit, posLimit) \ + { \ + if ((iter.pos) + (step) < (posLimit)) { \ + (iter.pos) = (iter.pos) + (step); \ + } else { \ + (iter.pos) = 0; \ + (iter.slot) = ((iter.slot) + 1) % (slotLimit); \ + } \ + } - vnodeClearMeterState(pObj, TSDB_METER_STATE_INSERT); +int isCacheEnd(SBlockIter iter, SMeterObj *pMeter) { + SCacheInfo *pInfo = (SCacheInfo *)(pMeter->pCache); + int slot = 0; + int pos = 0; + + if (pInfo->cacheBlocks[pInfo->currentSlot]->numOfPoints == pMeter->pointsPerBlock) { + slot = (pInfo->currentSlot + 1) % (pInfo->maxBlocks); + pos = 0; } else { - SImportInfo *pNew, import; + slot = pInfo->currentSlot; + pos = pInfo->cacheBlocks[pInfo->currentSlot]->numOfPoints; + } + return ((iter.slot == slot) && (iter.pos == pos)); +} - dTrace("vid:%d sid:%d id:%s, import %d rows data", pObj->vnode, pObj->sid, pObj->meterId, rows); - memset(&import, 0, sizeof(import)); - import.firstKey = *((TSKEY *)(payload)); - import.lastKey = *((TSKEY *)(pSubmit->payLoad + (rows - 1) * pObj->bytesPerPoint)); - import.pObj = pObj; - import.pShell = pShell; - import.payload = payload; - import.rows = rows; +int vnodeImportDataToCache(SImportInfo *pImport, const char *payload, const int rows) { + SMeterObj * pObj = pImport->pObj; + SVnodeObj * pVnode = vnodeList + pObj->vnode; + int code = -1; + SCacheInfo * pInfo = (SCacheInfo *)(pObj->pCache); + int payloadIter; + SCachePool * pPool = pVnode->pCachePool; + int isCacheIterEnd = 0; + int spayloadIter = 0; + int isAppendData = 0; + int rowsImported = 0; + int totalRows = 0; + size_t size = 0; + SMergeBuffer *pBuffer = NULL; + + TSKEY firstKey = KEY_AT_INDEX(payload, pObj->bytesPerPoint, 0); + TSKEY lastKey = KEY_AT_INDEX(payload, pObj->bytesPerPoint, rows - 1); + + assert(firstKey <= lastKey && firstKey > pObj->lastKeyOnFile); + + // TODO: make this condition less strict + if (pObj->freePoints < rows || pObj->freePoints < (pObj->pointsPerBlock << 1)) { // No free room to hold the data + dError("vid:%d sid:%d id:%s, import failed, cache is full, freePoints:%d", pObj->vnode, pObj->sid, pObj->meterId, + pObj->freePoints); + pImport->importedRows = 0; + pImport->commit = 1; + code = TSDB_CODE_ACTION_IN_PROGRESS; + return code; + } - int32_t num = 0; - pthread_mutex_lock(&pVnode->vmutex); - num = pObj->numOfQueries; - pthread_mutex_unlock(&pVnode->vmutex); + if (pInfo->numOfBlocks == 0) { + if (vnodeAllocateCacheBlock(pObj) < 0) { + // TODO: deal with the ERROR here + } + } - int32_t commitInProcess = 0; + // Find the first importable record from payload + pImport->lastKey = lastKey; + for (payloadIter = 0; payloadIter < rows; payloadIter++) { + TSKEY key = KEY_AT_INDEX(payload, pObj->bytesPerPoint, payloadIter); + if (key == pObj->lastKey) continue; + if (key > pObj->lastKey) { // Just as insert + pImport->slot = pInfo->currentSlot; + pImport->pos = pInfo->cacheBlocks[pImport->slot]->numOfPoints; + isCacheIterEnd = 1; + break; + } else { + pImport->firstKey = key; + if (vnodeFindKeyInCache(pImport, 1) < 0) { + goto _exit; + } - pthread_mutex_lock(&pPool->vmutex); - if (((commitInProcess = pPool->commitInProcess) == 1) || num > 0) { - pthread_mutex_unlock(&pPool->vmutex); + if (pImport->firstKey != 
pImport->key) break; + } + } - pNew = (SImportInfo *)malloc(sizeof(SImportInfo)); - memcpy(pNew, &import, sizeof(SImportInfo)); - pNew->signature = pNew; - int payloadLen = contLen - sizeof(SSubmitMsg); - pNew->payload = malloc(payloadLen); - pNew->opayload = pNew->payload; - memcpy(pNew->payload, payload, payloadLen); + if (payloadIter == rows) { + pImport->importedRows = 0; + code = 0; + goto _exit; + } - dTrace("vid:%d sid:%d id:%s, import later, commit in process:%d, numOfQueries:%d", pObj->vnode, pObj->sid, - pObj->meterId, commitInProcess, pObj->numOfQueries); + spayloadIter = payloadIter; + if (pImport->pos == pObj->pointsPerBlock) assert(isCacheIterEnd); - taosTmrStart(vnodeProcessImportTimer, 10, pNew, vnodeTmrCtrl); - return 0; - } else { - pPool->commitInProcess = 1; - pthread_mutex_unlock(&pPool->vmutex); - int code = vnodeImportData(pObj, &import); - if (pShell) { - pShell->code = code; - pShell->numOfTotalPoints += import.importedRows; + // Allocate a new merge buffer work as buffer + totalRows = pObj->pointsPerBlock + rows - payloadIter + 1; + size = sizeof(SMergeBuffer) + sizeof(char *) * pObj->numOfColumns + pObj->bytesPerPoint * totalRows; + pBuffer = (SMergeBuffer *)malloc(size); + if (pBuffer == NULL) { + dError("vid:%d sid:%d meterId:%s, failed to allocate memory, size:%d", pObj->vnode, pObj->sid, pObj->meterId, size); + return code; + } + pBuffer->spos = 0; + pBuffer->epos = 0; + pBuffer->totalRows = totalRows; + pBuffer->offset[0] = (char *)pBuffer + sizeof(SMergeBuffer) + sizeof(char *) * pObj->numOfColumns; + for (int col = 1; col < pObj->numOfColumns; col++) { + pBuffer->offset[col] = pBuffer->offset[col - 1] + pObj->schema[col - 1].bytes * totalRows; + } + + // TODO: take pImport->pos = pObj->pointsPerBlock into consideration + { // Do the merge staff + SBlockIter cacheIter = {pImport->slot, pImport->pos, 0, 0}; // Iter to traverse old cache data + SBlockIter writeIter = {pImport->slot, pImport->pos, 0, 0}; // Iter to write data to cache + int availPoints = pObj->pointsPerBlock - pInfo->cacheBlocks[pInfo->currentSlot]->numOfPoints; + + assert(availPoints >= 0); + + while (1) { + if ((payloadIter >= rows) && isCacheIterEnd) break; + + if ((pBuffer->epos + 1) % pBuffer->totalRows == pBuffer->spos) { // merge buffer is full, flush + if (writeIter.pos == pObj->pointsPerBlock) { + writeIter.pos = 0; + writeIter.slot = (writeIter.slot + 1) % pInfo->maxBlocks; + } + + while (pBuffer->spos != pBuffer->epos) { + if (writeIter.slot == cacheIter.slot && writeIter.pos == cacheIter.pos) break; + for (int col = 0; col < pObj->numOfColumns; col++) { + memcpy(pInfo->cacheBlocks[writeIter.slot]->offset[col] + pObj->schema[col].bytes * writeIter.pos, + pBuffer->offset[col] + pObj->schema[col].bytes * pBuffer->spos, pObj->schema[col].bytes); + } + + if (writeIter.pos + 1 < pObj->pointsPerBlock) { + writeIter.pos++; + } else { + pInfo->cacheBlocks[writeIter.slot]->numOfPoints = writeIter.pos + 1; + writeIter.slot = (writeIter.slot + 1) % pInfo->maxBlocks; + writeIter.pos = 0; + } + + pBuffer->spos = (pBuffer->spos + 1) % pBuffer->totalRows; + } + } + + if ((payloadIter >= rows) || + ((!isCacheIterEnd) && + (KEY_AT_INDEX(payload, pObj->bytesPerPoint, payloadIter) > + KEY_AT_INDEX(pInfo->cacheBlocks[cacheIter.slot]->offset[0], sizeof(TSKEY), + cacheIter.pos)))) { // if (payload end || (cacheIter not end && payloadKey > blockKey)) + for (int col = 0; col < pObj->numOfColumns; col++) { + memcpy(pBuffer->offset[col] + pObj->schema[col].bytes * pBuffer->epos, + 
pInfo->cacheBlocks[cacheIter.slot]->offset[col] + pObj->schema[col].bytes * cacheIter.pos, + pObj->schema[col].bytes); + } + FORWARD_ITER(cacheIter, 1, pInfo->maxBlocks, pObj->pointsPerBlock); + isCacheIterEnd = isCacheEnd(cacheIter, pObj); + } else if ((isCacheIterEnd) || + ((payloadIter < rows) && + (KEY_AT_INDEX(payload, pObj->bytesPerPoint, payloadIter) < + KEY_AT_INDEX(pInfo->cacheBlocks[cacheIter.slot]->offset[0], sizeof(TSKEY), + cacheIter.pos)))) { // cacheIter end || (payloadIter not end && payloadKey < blockKey) + if (availPoints == 0) { // Need to allocate a new cache block + pthread_mutex_lock(&(pPool->vmutex)); + SCacheBlock *pNewBlock = vnodeGetFreeCacheBlock(pVnode); + if (pNewBlock == NULL) { // Failed to allocate a new cache block + pthread_mutex_unlock(&(pPool->vmutex)); + payloadIter = rows; + code = TSDB_CODE_ACTION_IN_PROGRESS; + pImport->commit = 1; + continue; + } + + pNewBlock->pMeterObj = pObj; + pNewBlock->offset[0] = (char *)pNewBlock + sizeof(SCacheBlock) + sizeof(char *) * pObj->numOfColumns; + for (int col = 1; col < pObj->numOfColumns; col++) + pNewBlock->offset[col] = pNewBlock->offset[col - 1] + pObj->schema[col - 1].bytes * pObj->pointsPerBlock; + + int newSlot = (writeIter.slot + 1) % pInfo->maxBlocks; + pInfo->blocks++; + int tblockId = pInfo->blocks; + + if (writeIter.slot != pInfo->currentSlot) { + for (int tslot = pInfo->currentSlot; tslot != writeIter.slot;) { + int nextSlot = (tslot + 1) % pInfo->maxBlocks; + pInfo->cacheBlocks[nextSlot] = pInfo->cacheBlocks[tslot]; + pInfo->cacheBlocks[nextSlot]->slot = nextSlot; + pInfo->cacheBlocks[nextSlot]->blockId = tblockId--; + tslot = (tslot - 1 + pInfo->maxBlocks) % pInfo->maxBlocks; + } + } + + int index = pNewBlock->index; + if (cacheIter.slot == writeIter.slot) { + pNewBlock->numOfPoints = pInfo->cacheBlocks[cacheIter.slot]->numOfPoints; + int pointsLeft = pInfo->cacheBlocks[cacheIter.slot]->numOfPoints - cacheIter.pos; + if (pointsLeft > 0) { + for (int col = 0; col < pObj->numOfColumns; col++) { + memcpy((void *)(pNewBlock->offset[col]), + pInfo->cacheBlocks[cacheIter.slot]->offset[col] + pObj->schema[col].bytes * cacheIter.pos, + pObj->schema[col].bytes * pointsLeft); + } + } + } + pNewBlock->blockId = tblockId; + pNewBlock->slot = newSlot; + pNewBlock->index = index; + pInfo->cacheBlocks[newSlot] = pNewBlock; + pInfo->numOfBlocks++; + pInfo->unCommittedBlocks++; + pInfo->currentSlot = (pInfo->currentSlot + 1) % pInfo->maxBlocks; + pthread_mutex_unlock(&(pPool->vmutex)); + cacheIter.slot = (cacheIter.slot + 1) % pInfo->maxBlocks; + // move a cache of data forward + availPoints = pObj->pointsPerBlock; + } + + int offset = 0; + for (int col = 0; col < pObj->numOfColumns; col++) { + memcpy(pBuffer->offset[col] + pObj->schema[col].bytes * pBuffer->epos, + payload + pObj->bytesPerPoint * payloadIter + offset, pObj->schema[col].bytes); + offset += pObj->schema[col].bytes; + } + if (spayloadIter == payloadIter) {// update pVnode->firstKey + pthread_mutex_lock(&(pVnode->vmutex)); + if (KEY_AT_INDEX(payload, pObj->bytesPerPoint, payloadIter) < pVnode->firstKey) pVnode->firstKey = firstKey; + pthread_mutex_unlock(&(pVnode->vmutex)); + } + if (isCacheIterEnd) { + pObj->lastKey = KEY_AT_INDEX(payload, pObj->bytesPerPoint, payloadIter); + if (!isAppendData) isAppendData = 1; + } + + rowsImported++; + availPoints--; + payloadIter++; + + } else { + payloadIter++; + continue; } + pBuffer->epos = (pBuffer->epos + 1) % pBuffer->totalRows; + } + + if (pBuffer->spos != pBuffer->epos) { + if (writeIter.pos == 
pObj->pointsPerBlock) { + writeIter.pos = 0; + writeIter.slot = (writeIter.slot + 1) % pInfo->maxBlocks; + } + while (pBuffer->spos != pBuffer->epos) { + for (int col = 0; col < pObj->numOfColumns; col++) { + memcpy(pInfo->cacheBlocks[writeIter.slot]->offset[col] + pObj->schema[col].bytes * writeIter.pos, + pBuffer->offset[col] + pObj->schema[col].bytes * pBuffer->spos, pObj->schema[col].bytes); + } + + if (writeIter.pos + 1 < pObj->pointsPerBlock) { + writeIter.pos++; + } else { + pInfo->cacheBlocks[writeIter.slot]->numOfPoints = writeIter.pos + 1; + writeIter.slot = (writeIter.slot + 1) % pInfo->maxBlocks; + writeIter.pos = 0; + } + + pBuffer->spos = (pBuffer->spos + 1) % pBuffer->totalRows; + } + + if (writeIter.pos != 0) pInfo->cacheBlocks[writeIter.slot]->numOfPoints = writeIter.pos; + } + + if (isAppendData) { + pthread_mutex_lock(&(pVnode->vmutex)); + if (pObj->lastKey > pVnode->lastKey) pVnode->lastKey = pObj->lastKey; + pthread_mutex_unlock(&(pVnode->vmutex)); } } + pImport->importedRows += rowsImported; - pVnode->version++; + code = 0; - if (pShell) { - pShell->count--; - if (pShell->count <= 0) vnodeSendShellSubmitRspMsg(pShell, pShell->code, pShell->numOfTotalPoints); +_exit: + tfree(pBuffer); + return code; +} + +int vnodeImportDataToFiles(SImportInfo *pImport, char *payload, const int rows) { + int code = 0; + // TODO : Check the correctness of pObj and pVnode + SMeterObj *pObj = (SMeterObj *)(pImport->pObj); + SVnodeObj *pVnode = vnodeList + pObj->vnode; + + int64_t delta = pVnode->cfg.daysPerFile * tsMsPerDay[pVnode->cfg.precision]; + int sfid = KEY_AT_INDEX(payload, pObj->bytesPerPoint, 0) / delta; + int efid = KEY_AT_INDEX(payload, pObj->bytesPerPoint, rows - 1) / delta; + + for (int fid = sfid; fid <= efid; fid++) { + TSKEY skey = fid * delta; + TSKEY ekey = skey + delta - 1; + int srow = 0, nrows = 0; + + if (vnodeSearchKeyInRange(payload, pObj->bytesPerPoint, rows, skey, ekey, &srow, &nrows) < 0) continue; + + assert(nrows > 0); + + dTrace("vid:%d sid:%d meterId:%s, %d rows of data will be imported to file %d, srow:%d firstKey:%ld lastKey:%ld", + pObj->vnode, pObj->sid, pObj->meterId, nrows, fid, srow, KEY_AT_INDEX(payload, pObj->bytesPerPoint, srow), + KEY_AT_INDEX(payload, pObj->bytesPerPoint, (srow + nrows - 1))); + + code = vnodeMergeDataIntoFile(pImport, payload + (srow * pObj->bytesPerPoint), nrows, fid); + if (code != 0) break; } - return 0; + return code; } -//todo abort from the procedure if the meter is going to be dropped +// TODO : add offset in pShell to make it avoid repeatedly deal with messages int vnodeImportData(SMeterObj *pObj, SImportInfo *pImport) { - int code = 0; + int code = 0; + int srow = 0, nrows = 0; + SVnodeObj * pVnode = vnodeList + pObj->vnode; + SCachePool *pPool = (SCachePool *)(pVnode->pCachePool); + + // 1. 
import data in range (pObj->lastKeyOnFile, INT64_MAX) into cache + if (vnodeSearchKeyInRange(pImport->payload, pObj->bytesPerPoint, pImport->rows, pObj->lastKeyOnFile + 1, INT64_MAX, + &srow, &nrows) >= 0) { + code = vnodeImportDataToCache(pImport, pImport->payload + pObj->bytesPerPoint * srow, nrows); + if (pImport->commit) { // Need to commit now + pPool->commitInProcess = 0; + vnodeProcessCommitTimer(pVnode, NULL); + return code; + } - if (pImport->lastKey > pObj->lastKeyOnFile) { - code = vnodeImportWholeToCache(pImport, pImport->payload, pImport->rows); - } else if (pImport->lastKey < pObj->lastKeyOnFile) { - code = vnodeImportWholeToFile(pImport, pImport->payload, pImport->rows); - } else { // lastKey == pObj->lastkeyOnFile - code = vnodeImportStartToFile(pImport, pImport->payload, pImport->rows); + if (code != 0) return code; } - SVnodeObj *pVnode = &vnodeList[pObj->vnode]; - SCachePool *pPool = (SCachePool *)pVnode->pCachePool; - pPool->commitInProcess = 0; + // 2. import data (0, pObj->lastKeyOnFile) into files + if (vnodeSearchKeyInRange(pImport->payload, pObj->bytesPerPoint, pImport->rows, 0, pObj->lastKeyOnFile - 1, &srow, + &nrows) >= 0) { + code = vnodeImportDataToFiles(pImport, pImport->payload + pObj->bytesPerPoint * srow, nrows); + } - if (pImport->commit) vnodeProcessCommitTimer(pVnode, NULL); + pPool->commitInProcess = 0; return code; }
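
Note on the file-window arithmetic used above: vnodeMergeDataIntoFile() and vnodeImportDataToFiles() derive the target data file purely from the timestamp, with delta = daysPerFile * tsMsPerDay[precision], fid = key / delta, and the file's admissible keys bounded by [fid * delta, fid * delta + delta - 1]. The following is a minimal, self-contained sketch of just that arithmetic, not part of the patch; fileRangeForKey and its parameters are illustrative names only.

#include <stdint.h>
#include <stdio.h>

typedef int64_t TSKEY;

/* Given a timestamp, compute which data file it lands in and that file's key
 * window, mirroring delta/minFileKey/maxFileKey in vnodeMergeDataIntoFile(). */
static void fileRangeForKey(TSKEY key, int64_t daysPerFile, int64_t msPerDay,
                            int *fid, TSKEY *minKey, TSKEY *maxKey) {
  int64_t delta = daysPerFile * msPerDay;  /* key span covered by one file   */
  *fid    = (int)(key / delta);            /* file id owning this key        */
  *minKey = (TSKEY)(*fid) * delta;         /* first key the file may contain */
  *maxKey = *minKey + delta - 1;           /* last key the file may contain  */
}

int main(void) {
  int   fid;
  TSKEY minKey, maxKey;
  /* 10 days per file, millisecond precision: 86400000 ms per day */
  fileRangeForKey(1500000000000LL, 10, 86400000LL, &fid, &minKey, &maxKey);
  printf("fid:%d keys:[%lld, %lld]\n", fid, (long long)minKey, (long long)maxKey);
  return 0;
}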
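
Note on the staging buffer in vnodeImportDataToCache(): merged rows are held in a circular buffer addressed by spos/epos over totalRows slots, with the usual one-empty-slot convention — (epos + 1) % totalRows == spos means full, epos == spos means empty — which is why the buffer is sized as pointsPerBlock + (rows - payloadIter) + 1. A minimal sketch of that index discipline only, assuming the per-column memcpy into cache blocks is handled as in the patch; RingIdx and its helpers are illustrative names, not the patch's API.

#include <stdio.h>

/* One slot is always left empty so that full and empty states can be told apart. */
typedef struct {
  int spos;       /* next slot to drain into the cache blocks   */
  int epos;       /* next slot to fill with a merged row        */
  int totalRows;  /* capacity in slots, including the sentinel  */
} RingIdx;

static int  ringIsFull(const RingIdx *r)  { return (r->epos + 1) % r->totalRows == r->spos; }
static int  ringIsEmpty(const RingIdx *r) { return r->epos == r->spos; }
static void ringPush(RingIdx *r)          { r->epos = (r->epos + 1) % r->totalRows; }
static void ringPop(RingIdx *r)           { r->spos = (r->spos + 1) % r->totalRows; }

int main(void) {
  RingIdx r = {0, 0, 4};                   /* holds at most 3 rows before a flush */
  while (!ringIsFull(&r)) ringPush(&r);    /* stage merged rows                   */
  while (!ringIsEmpty(&r)) ringPop(&r);    /* drain them to their final blocks    */
  printf("staged and drained %d rows\n", r.totalRows - 1);
  return 0;
}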
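
Note on the payload layout: the payload is row-major, each row is bytesPerPoint bytes and begins with its TSKEY, so KEY_AT_INDEX() is a plain byte-offset read. vnodeImportDataToFiles() uses vnodeSearchKeyInRange() to cut the sorted payload into per-file runs; the effect is equivalent to the sketch below, where splitRowsByFile and KEY_AT are hypothetical helpers shown only to make the addressing explicit, under the assumption that the payload is sorted by key.

#include <stdint.h>
#include <stdio.h>
#include <string.h>

typedef int64_t TSKEY;

/* Same addressing idea as KEY_AT_INDEX in the patch: row idx starts at byte
 * step * idx and begins with its timestamp. */
#define KEY_AT(payload, step, idx) (*(TSKEY *)((char *)(payload) + (size_t)(step) * (idx)))

/* Hypothetical helper: group consecutive rows of a key-sorted payload by file id. */
static void splitRowsByFile(const char *payload, int bytesPerPoint, int rows, int64_t delta) {
  int srow = 0;
  while (srow < rows) {
    int   fid  = (int)(KEY_AT(payload, bytesPerPoint, srow) / delta);
    TSKEY ekey = ((TSKEY)fid + 1) * delta - 1;   /* last key this file may hold */
    int   erow = srow;
    while (erow + 1 < rows && KEY_AT(payload, bytesPerPoint, erow + 1) <= ekey) erow++;
    printf("file %d gets rows [%d, %d]\n", fid, srow, erow);
    srow = erow + 1;
  }
}

int main(void) {
  enum { BYTES_PER_POINT = 16, ROWS = 4 };
  char  payload[BYTES_PER_POINT * ROWS];
  TSKEY keys[ROWS] = {10, 20, 110, 130};         /* sorted keys spanning two files */

  memset(payload, 0, sizeof(payload));
  for (int i = 0; i < ROWS; ++i) memcpy(payload + i * BYTES_PER_POINT, &keys[i], sizeof(TSKEY));

  splitRowsByFile(payload, BYTES_PER_POINT, ROWS, 100 /* delta: key span per file */);
  return 0;
}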