Commit c012704a authored by H Hongze Cheng

Submit the first version of merge import

Parent d3941fe9
@@ -372,13 +372,60 @@ void vnodeCancelCommit(SVnodeObj *pVnode) {
taosTmrReset(vnodeProcessCommitTimer, pVnode->cfg.commitTime * 1000, pVnode, vnodeTmrCtrl, &pVnode->commitTimer);
}
/* The vnode cache lock should be held before calling this interface
*/
SCacheBlock *vnodeGetFreeCacheBlock(SVnodeObj *pVnode) {
SCachePool *pPool = (SCachePool *)(pVnode->pCachePool);
SVnodeCfg *pCfg = &(pVnode->cfg);
SCacheBlock *pCacheBlock = NULL;
int skipped = 0;
while (1) {
pCacheBlock = (SCacheBlock *)(pPool->pMem[((int64_t)pPool->freeSlot)]);
if (pCacheBlock->blockId == 0) break;
if (pCacheBlock->notFree) {
pPool->freeSlot++;
pPool->freeSlot = pPool->freeSlot % pCfg->cacheNumOfBlocks.totalBlocks;
skipped++;
if (skipped > pPool->threshold) {
vnodeCreateCommitThread(pVnode);
pthread_mutex_unlock(&pPool->vmutex);
dError("vid:%d committing process is too slow, notFreeSlots:%d....", pVnode->vnode, pPool->notFreeSlots);
return NULL;
}
} else {
SMeterObj * pRelObj = pCacheBlock->pMeterObj;
SCacheInfo *pRelInfo = (SCacheInfo *)pRelObj->pCache;
int firstSlot = (pRelInfo->currentSlot - pRelInfo->numOfBlocks + 1 + pRelInfo->maxBlocks) % pRelInfo->maxBlocks;
pCacheBlock = pRelInfo->cacheBlocks[firstSlot];
if (pCacheBlock) {
pPool->freeSlot = pCacheBlock->index;
vnodeFreeCacheBlock(pCacheBlock);
break;
} else {
pPool->freeSlot = (pPool->freeSlot + 1) % pCfg->cacheNumOfBlocks.totalBlocks;
skipped++;
}
}
}
pCacheBlock = (SCacheBlock *)(pPool->pMem[pPool->freeSlot]);
pCacheBlock->index = pPool->freeSlot;
pCacheBlock->notFree = 1;
pPool->freeSlot = (pPool->freeSlot + 1) % pCfg->cacheNumOfBlocks.totalBlocks;
pPool->notFreeSlots++;
return pCacheBlock;
}
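/* A minimal usage sketch (names mirror vnodeAllocateCacheBlock below). Note the
 * asymmetric lock discipline: on the NULL path this function has already started the
 * commit thread and released pPool->vmutex, so the caller must not unlock again.
 *
 *   pthread_mutex_lock(&pPool->vmutex);
 *   SCacheBlock *pBlock = vnodeGetFreeCacheBlock(pVnode);
 *   if (pBlock == NULL) return -1;             // vmutex was released inside the call
 *   ...                                        // fill pBlock, then
 *   pthread_mutex_unlock(&pPool->vmutex);
 */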
int vnodeAllocateCacheBlock(SMeterObj *pObj) {
int index;
SCachePool * pPool;
SCacheBlock *pCacheBlock;
SCacheInfo * pInfo;
SVnodeObj * pVnode;
int skipped = 0, commit = 0;
int commit = 0;
pVnode = vnodeList + pObj->vnode;
pPool = (SCachePool *)pVnode->pCachePool;
@@ -406,45 +453,10 @@ int vnodeAllocateCacheBlock(SMeterObj *pObj) {
return -1;
}
while (1) {
pCacheBlock = (SCacheBlock *)(pPool->pMem[((int64_t)pPool->freeSlot)]);
if (pCacheBlock->blockId == 0) break;
if (pCacheBlock->notFree) {
pPool->freeSlot++;
pPool->freeSlot = pPool->freeSlot % pCfg->cacheNumOfBlocks.totalBlocks;
skipped++;
if (skipped > pPool->threshold) {
vnodeCreateCommitThread(pVnode);
pthread_mutex_unlock(&pPool->vmutex);
dError("vid:%d sid:%d id:%s, committing process is too slow, notFreeSlots:%d....",
pObj->vnode, pObj->sid, pObj->meterId, pPool->notFreeSlots);
return -1;
}
} else {
SMeterObj *pRelObj = pCacheBlock->pMeterObj;
SCacheInfo *pRelInfo = (SCacheInfo *)pRelObj->pCache;
int firstSlot = (pRelInfo->currentSlot - pRelInfo->numOfBlocks + 1 + pRelInfo->maxBlocks) % pRelInfo->maxBlocks;
pCacheBlock = pRelInfo->cacheBlocks[firstSlot];
if (pCacheBlock) {
pPool->freeSlot = pCacheBlock->index;
vnodeFreeCacheBlock(pCacheBlock);
break;
} else {
pPool->freeSlot = (pPool->freeSlot + 1) % pCfg->cacheNumOfBlocks.totalBlocks;
skipped++;
}
}
}
index = pPool->freeSlot;
pPool->freeSlot++;
pPool->freeSlot = pPool->freeSlot % pCfg->cacheNumOfBlocks.totalBlocks;
pPool->notFreeSlots++;
if ((pCacheBlock = vnodeGetFreeCacheBlock(pVnode)) == NULL) return -1;
index = pCacheBlock->index;
pCacheBlock->pMeterObj = pObj;
pCacheBlock->notFree = 1;
pCacheBlock->index = index;
pCacheBlock->offset[0] = ((char *)(pCacheBlock)) + sizeof(SCacheBlock) + pObj->numOfColumns * sizeof(char *);
for (int col = 1; col < pObj->numOfColumns; ++col)
@@ -103,8 +103,8 @@ void vnodeGetDnameFromLname(char *lhead, char *ldata, char *llast, char *dhead,
}
void vnodeGetHeadTname(char *nHeadName, char *nLastName, int vnode, int fileId) {
sprintf(nHeadName, "%s/vnode%d/db/v%df%d.t", tsDirectory, vnode, vnode, fileId);
sprintf(nLastName, "%s/vnode%d/db/v%df%d.l", tsDirectory, vnode, vnode, fileId);
if (nHeadName != NULL) sprintf(nHeadName, "%s/vnode%d/db/v%df%d.t", tsDirectory, vnode, vnode, fileId);
if (nLastName != NULL) sprintf(nLastName, "%s/vnode%d/db/v%df%d.l", tsDirectory, vnode, vnode, fileId);
}
void vnodeCreateDataDirIfNeeded(int vnode, char *path) {
@@ -15,31 +15,24 @@
#define _DEFAULT_SOURCE
#include <arpa/inet.h>
#include <fcntl.h>
#include <string.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <unistd.h>
#include "trpc.h"
#include "ttimer.h"
#include "vnode.h"
#include "vnodeMgmt.h"
#include "vnodeShell.h"
#include "vnodeShell.h"
#include "vnodeUtil.h"
#pragma GCC diagnostic ignored "-Wpointer-sign"
#pragma GCC diagnostic ignored "-Wint-conversion"
typedef struct {
SCompHeader *headList;
SCompInfo compInfo;
int last; // 0:last block in data file, 1:not the last block
int newBlocks;
int oldNumOfBlocks;
int64_t compInfoOffset; // offset for compInfo in head file
int64_t leftOffset; // copy from this offset to end of head file
int64_t hfdSize; // old head file size
} SHeadInfo;
extern void vnodeGetHeadTname(char *nHeadName, char *nLastName, int vnode, int fileId);
extern int vnodeReadColumnToMem(int fd, SCompBlock *pBlock, SField **fields, int col, char *data, int dataSize,
char *temp, char *buffer, int bufferSize);
extern int vnodeSendShellSubmitRspMsg(SShellObj *pObj, int code, int numOfPoints);
extern void vnodeGetHeadDataLname(char *headName, char *dataName, char *lastName, int vnode, int fileId);
extern int vnodeCreateEmptyCompFile(int vnode, int fileId);
extern int vnodeUpdateFreeSlot(SVnodeObj *pVnode);
extern SCacheBlock *vnodeGetFreeCacheBlock(SVnodeObj *pVnode);
#define KEY_AT_INDEX(payload, step, idx) (*(TSKEY *)((char *)(payload) + (step) * (idx)))
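/* A minimal sketch of how KEY_AT_INDEX is used, assuming a row-major submit payload in
 * which every record occupies pObj->bytesPerPoint bytes and begins with its TSKEY; the
 * wrapper below is hypothetical and only illustrates the macro. */
static inline TSKEY vnodeKeyAtRow(SMeterObj *pObj, char *payload, int idx) {
  // e.g. idx == 0 yields the first key of the payload, idx == rows - 1 the last key
  return KEY_AT_INDEX(payload, pObj->bytesPerPoint, idx);
}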
typedef struct {
void * signature;
SShellObj *pShell;
@@ -56,226 +49,113 @@ typedef struct {
// only for file
int numOfPoints;
int fileId;
int64_t offset; // offset in data file
SData *sdata[TSDB_MAX_COLUMNS];
char *buffer;
char *payload;
char *opayload;
char * payload;
char * opayload; // allocated space for payload from client
int rows;
} SImportInfo;
int vnodeImportData(SMeterObj *pObj, SImportInfo *pImport);
int vnodeGetImportStartPart(SMeterObj *pObj, char *payload, int rows, TSKEY key1) {
int i;
for (i = 0; i < rows; ++i) {
TSKEY key = *((TSKEY *)(payload + i * pObj->bytesPerPoint));
if (key >= key1) break;
}
return i;
}
int vnodeGetImportEndPart(SMeterObj *pObj, char *payload, int rows, char **pStart, TSKEY key0) {
int i;
for (i = 0; i < rows; ++i) {
TSKEY key = *((TSKEY *)(payload + i * pObj->bytesPerPoint));
if (key > key0) break;
}
*pStart = payload + i * pObj->bytesPerPoint;
return rows - i;
}
int vnodeCloseFileForImport(SMeterObj *pObj, SHeadInfo *pHinfo) {
SVnodeObj *pVnode = &vnodeList[pObj->vnode];
SVnodeCfg *pCfg = &pVnode->cfg;
TSCKSUM chksum = 0;
if (pHinfo->newBlocks == 0 || pHinfo->compInfoOffset == 0) return 0;
if (pHinfo->oldNumOfBlocks == 0) twrite(pVnode->nfd, &chksum, sizeof(TSCKSUM));
int leftSize = pHinfo->hfdSize - pHinfo->leftOffset;
if (leftSize > 0) {
lseek(pVnode->hfd, pHinfo->leftOffset, SEEK_SET);
tsendfile(pVnode->nfd, pVnode->hfd, NULL, leftSize);
}
typedef struct {
// in .head file
SCompHeader *pHeader;
size_t pHeaderSize;
pHinfo->compInfo.numOfBlocks += pHinfo->newBlocks;
int offset = (pHinfo->compInfo.numOfBlocks - pHinfo->oldNumOfBlocks) * sizeof(SCompBlock);
if (pHinfo->oldNumOfBlocks == 0) offset += sizeof(SCompInfo) + sizeof(TSCKSUM);
SCompInfo compInfo;
SCompBlock *pBlocks;
// in .data file
int blockId;
uint8_t blockLoadState;
pHinfo->headList[pObj->sid].compInfoOffset = pHinfo->compInfoOffset;
for (int sid = pObj->sid + 1; sid < pCfg->maxSessions; ++sid) {
if (pHinfo->headList[sid].compInfoOffset) pHinfo->headList[sid].compInfoOffset += offset;
}
SField *pField;
size_t pFieldSize;
lseek(pVnode->nfd, TSDB_FILE_HEADER_LEN, SEEK_SET);
int tmsize = sizeof(SCompHeader) * pCfg->maxSessions + sizeof(TSCKSUM);
taosCalcChecksumAppend(0, (uint8_t *)pHinfo->headList, tmsize);
twrite(pVnode->nfd, pHinfo->headList, tmsize);
SData *data[TSDB_MAX_COLUMNS];
char * buffer;
int size = pHinfo->compInfo.numOfBlocks * sizeof(SCompBlock);
char *buffer = malloc(size);
lseek(pVnode->nfd, pHinfo->compInfoOffset + sizeof(SCompInfo), SEEK_SET);
read(pVnode->nfd, buffer, size);
SCompBlock *pBlock = (SCompBlock *)(buffer + (pHinfo->compInfo.numOfBlocks - 1) * sizeof(SCompBlock));
char *temp;
pHinfo->compInfo.uid = pObj->uid;
pHinfo->compInfo.delimiter = TSDB_VNODE_DELIMITER;
pHinfo->compInfo.last = pBlock->last;
char * tempBuffer;
size_t tempBufferSize;
// Variables for sendfile
int64_t compInfoOffset;
int64_t nextNo0Offset; // next sid whose compInfoOffset > 0
int64_t hfSize;
int64_t driftOffset;
taosCalcChecksumAppend(0, (uint8_t *)(&pHinfo->compInfo), sizeof(SCompInfo));
lseek(pVnode->nfd, pHinfo->compInfoOffset, SEEK_SET);
twrite(pVnode->nfd, &pHinfo->compInfo, sizeof(SCompInfo));
int oldNumOfBlocks;
int newNumOfBlocks;
int last;
} SImportHandle;
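/* Reading of the merge-import flow around this handle (a summary of the code below):
 * vnodeOpenTempFilesForImport() records hfSize/nextNo0Offset/compInfoOffset and
 * sendfiles the untouched head-file prefix into the new .t head file; driftOffset
 * accumulates the bytes added for new SCompInfo/SCompBlock/TSCKSUM entries;
 * vnodeLoadNeededBlockData() lazily loads one block's pField and data[] per blockId;
 * vnodeCloseImportFiles() then rewrites SCompInfo, the trailing SCompBlock part and the
 * shifted SCompHeader table before renaming the .t file over the .head link. */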
chksum = taosCalcChecksum(0, (uint8_t *)buffer, size);
lseek(pVnode->nfd, pHinfo->compInfoOffset + sizeof(SCompInfo) + size, SEEK_SET);
twrite(pVnode->nfd, &chksum, sizeof(TSCKSUM));
free(buffer);
typedef struct {
int slot;
int pos;
int oslot; // old slot
TSKEY nextKey;
} SBlockIter;
vnodeCloseCommitFiles(pVnode);
typedef struct {
int64_t spos;
int64_t epos;
int64_t totalRows;
char * offset[];
} SMergeBuffer;
return 0;
}
int vnodeImportData(SMeterObj *pObj, SImportInfo *pImport);
int vnodeProcessLastBlock(SImportInfo *pImport, SHeadInfo *pHinfo, SData *data[]) {
SMeterObj *pObj = pImport->pObj;
SVnodeObj *pVnode = &vnodeList[pObj->vnode];
SCompBlock lastBlock;
int vnodeFindKeyInCache(SImportInfo *pImport, int order) {
SMeterObj * pObj = pImport->pObj;
int code = 0;
SQuery query;
SCacheInfo *pInfo = (SCacheInfo *)pObj->pCache;
if (pHinfo->compInfo.last == 0) return 0;
// read into memory
uint64_t offset =
pHinfo->compInfoOffset + (pHinfo->compInfo.numOfBlocks - 1) * sizeof(SCompBlock) + sizeof(SCompInfo);
lseek(pVnode->hfd, offset, SEEK_SET);
read(pVnode->hfd, &lastBlock, sizeof(SCompBlock));
assert(lastBlock.last);
if (lastBlock.sversion != pObj->sversion) {
lseek(pVnode->lfd, lastBlock.offset, SEEK_SET);
lastBlock.offset = lseek(pVnode->dfd, 0, SEEK_END);
tsendfile(pVnode->dfd, pVnode->lfd, NULL, lastBlock.len);
TSKEY key = order ? pImport->firstKey : pImport->lastKey;
memset(&query, 0, sizeof(query));
query.order.order = order;
query.skey = key;
query.ekey = order ? pImport->lastKey : pImport->firstKey;
vnodeSearchPointInCache(pObj, &query);
lastBlock.last = 0;
lseek(pVnode->hfd, offset, SEEK_SET);
twrite(pVnode->hfd, &lastBlock, sizeof(SCompBlock));
if (query.slot < 0) {
pImport->slot = pInfo->commitSlot;
if (pInfo->commitPoint >= pObj->pointsPerBlock) pImport->slot = (pImport->slot + 1) % pInfo->maxBlocks;
pImport->pos = 0;
pImport->key = 0;
dTrace("vid:%d sid:%d id:%s, key:%ld, import to head of cache", pObj->vnode, pObj->sid, pObj->meterId, key);
code = 0;
} else {
vnodeReadLastBlockToMem(pObj, &lastBlock, data);
pHinfo->compInfo.numOfBlocks--;
code = lastBlock.numOfPoints;
}
return code;
}
int vnodeOpenFileForImport(SImportInfo *pImport, char *payload, SHeadInfo *pHinfo, SData *data[]) {
SMeterObj *pObj = pImport->pObj;
SVnodeObj *pVnode = &vnodeList[pObj->vnode];
SVnodeCfg *pCfg = &pVnode->cfg;
TSKEY firstKey = *((TSKEY *)payload);
struct stat filestat;
int sid, rowsBefore = 0;
if (pVnode->nfd <= 0 || firstKey > pVnode->commitLastKey) {
if (pVnode->nfd > 0) vnodeCloseFileForImport(pObj, pHinfo);
pVnode->commitFirstKey = firstKey;
if (vnodeOpenCommitFiles(pVnode, pObj->sid) < 0) return -1;
fstat(pVnode->hfd, &filestat);
pHinfo->hfdSize = filestat.st_size;
pHinfo->newBlocks = 0;
pHinfo->last = 1; // by default, new blocks are at the end of block list
lseek(pVnode->hfd, TSDB_FILE_HEADER_LEN, SEEK_SET);
read(pVnode->hfd, pHinfo->headList, sizeof(SCompHeader) * pCfg->maxSessions);
pImport->slot = query.slot;
pImport->pos = query.pos;
pImport->key = query.key;
if (pHinfo->headList[pObj->sid].compInfoOffset > 0) {
lseek(pVnode->hfd, pHinfo->headList[pObj->sid].compInfoOffset, SEEK_SET);
if (read(pVnode->hfd, &pHinfo->compInfo, sizeof(SCompInfo)) != sizeof(SCompInfo)) {
dError("vid:%d sid:%d, failed to read compInfo from file:%s", pObj->vnode, pObj->sid, pVnode->cfn);
return -1;
if (key != query.key) {
if (order == 0) {
// since pos is the position which has smaller key, data shall be imported after it
pImport->pos++;
if (pImport->pos >= pObj->pointsPerBlock) {
pImport->slot = (pImport->slot + 1) % pInfo->maxBlocks;
pImport->pos = 0;
}
if (pHinfo->compInfo.uid == pObj->uid) {
pHinfo->compInfoOffset = pHinfo->headList[pObj->sid].compInfoOffset;
pHinfo->leftOffset = pHinfo->headList[pObj->sid].compInfoOffset + sizeof(SCompInfo);
} else {
pHinfo->headList[pObj->sid].compInfoOffset = 0;
}
}
if ( pHinfo->headList[pObj->sid].compInfoOffset == 0 ) {
memset(&pHinfo->compInfo, 0, sizeof(SCompInfo));
pHinfo->compInfo.uid = pObj->uid;
for (sid = pObj->sid + 1; sid < pCfg->maxSessions; ++sid)
if (pHinfo->headList[sid].compInfoOffset > 0) break;
pHinfo->compInfoOffset = (sid == pCfg->maxSessions) ? pHinfo->hfdSize : pHinfo->headList[sid].compInfoOffset;
pHinfo->leftOffset = pHinfo->compInfoOffset;
if (pImport->pos < 0) pImport->pos = 0;
}
pHinfo->oldNumOfBlocks = pHinfo->compInfo.numOfBlocks;
lseek(pVnode->hfd, 0, SEEK_SET);
lseek(pVnode->nfd, 0, SEEK_SET);
tsendfile(pVnode->nfd, pVnode->hfd, NULL, pHinfo->compInfoOffset);
twrite(pVnode->nfd, &pHinfo->compInfo, sizeof(SCompInfo));
if (pHinfo->headList[pObj->sid].compInfoOffset > 0) lseek(pVnode->hfd, sizeof(SCompInfo), SEEK_CUR);
if (pVnode->commitFileId < pImport->fileId) {
if (pHinfo->compInfo.numOfBlocks > 0)
pHinfo->leftOffset += pHinfo->compInfo.numOfBlocks * sizeof(SCompBlock);
rowsBefore = vnodeProcessLastBlock(pImport, pHinfo, data);
// copy all existing compBlockInfo
lseek(pVnode->hfd, pHinfo->compInfoOffset + sizeof(SCompInfo), SEEK_SET);
if (pHinfo->compInfo.numOfBlocks > 0)
tsendfile(pVnode->nfd, pVnode->hfd, NULL, pHinfo->compInfo.numOfBlocks * sizeof(SCompBlock));
} else if (pVnode->commitFileId == pImport->fileId) {
int slots = pImport->pos ? pImport->slot + 1 : pImport->slot;
pHinfo->leftOffset += slots * sizeof(SCompBlock);
// check if last block is at last file, if it is, read into memory
if (pImport->pos == 0 && pHinfo->compInfo.numOfBlocks > 0 && pImport->slot == pHinfo->compInfo.numOfBlocks &&
pHinfo->compInfo.last) {
rowsBefore = vnodeProcessLastBlock(pImport, pHinfo, data);
if ( rowsBefore > 0 ) pImport->slot--;
}
// this block will be replaced by new blocks
if (pImport->pos > 0) pHinfo->compInfo.numOfBlocks--;
if (pImport->slot > 0) {
lseek(pVnode->hfd, pHinfo->compInfoOffset + sizeof(SCompInfo), SEEK_SET);
tsendfile(pVnode->nfd, pVnode->hfd, NULL, pImport->slot * sizeof(SCompBlock));
code = 0;
}
if (pImport->slot < pHinfo->compInfo.numOfBlocks)
pHinfo->last = 0; // new blocks are not at the end of block list
} else {
// nothing
return code;
}
pHinfo->last = 0; // new blocks are not at the end of block list
}
}
void vnodeGetValidDataRange(int vnode, TSKEY now, TSKEY *minKey, TSKEY *maxKey) {
SVnodeObj *pVnode = vnodeList + vnode;
return rowsBefore;
int64_t delta = pVnode->cfg.daysPerFile * tsMsPerDay[pVnode->cfg.precision];
int fid = now / delta;
*minKey = (fid - pVnode->maxFiles + 1) * delta;
*maxKey = (fid + 2) * delta - 1;
return;
}
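/* A minimal sketch, assuming firstKey/lastKey were already read from the payload with
 * KEY_AT_INDEX: reject an import whose timestamps fall outside the window this vnode's
 * files can hold. It mirrors the check in vnodeImportPoints below; the helper name is
 * hypothetical, not part of this commit. */
static int vnodeCheckKeysInRange(int vnode, TSKEY now, TSKEY firstKey, TSKEY lastKey) {
  TSKEY minKey, maxKey;
  vnodeGetValidDataRange(vnode, now, &minKey, &maxKey);
  if (firstKey < minKey || firstKey > maxKey || lastKey < minKey || lastKey > maxKey) {
    return TSDB_CODE_TIMESTAMP_OUT_OF_RANGE;  // same code vnodeImportPoints returns
  }
  return 0;  // keys are within [minKey, maxKey]
}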
extern int vnodeSendShellSubmitRspMsg(SShellObj *pObj, int code, int numOfPoints);
int vnodeImportToFile(SImportInfo *pImport);
void vnodeProcessImportTimer(void *param, void *tmrId) {
SImportInfo *pImport = (SImportInfo *)param;
if (pImport == NULL || pImport->signature != param) {
@@ -283,18 +163,18 @@ void vnodeProcessImportTimer(void *param, void *tmrId) {
return;
}
SMeterObj *pObj = pImport->pObj;
SVnodeObj *pVnode = &vnodeList[pObj->vnode];
SMeterObj * pObj = pImport->pObj;
SVnodeObj * pVnode = &vnodeList[pObj->vnode];
SCachePool *pPool = (SCachePool *)pVnode->pCachePool;
SShellObj *pShell = pImport->pShell;
SShellObj * pShell = pImport->pShell;
pImport->retry++;
//slow query will block the import operation
// slow query will block the import operation
int32_t state = vnodeSetMeterState(pObj, TSDB_METER_STATE_IMPORTING);
if (state >= TSDB_METER_STATE_DELETING) {
dError("vid:%d sid:%d id:%s, meter is deleted, failed to import, state:%d",
pObj->vnode, pObj->sid, pObj->meterId, state);
dError("vid:%d sid:%d id:%s, meter is deleted, failed to import, state:%d", pObj->vnode, pObj->sid, pObj->meterId,
state);
return;
}
@@ -303,7 +183,7 @@ void vnodeProcessImportTimer(void *param, void *tmrId) {
num = pObj->numOfQueries;
pthread_mutex_unlock(&pVnode->vmutex);
//if the num == 0, it will never be increased before state is set to TSDB_METER_STATE_READY
// if the num == 0, it will never be increased before state is set to TSDB_METER_STATE_READY
int32_t commitInProcess = 0;
pthread_mutex_lock(&pPool->vmutex);
if (((commitInProcess = pPool->commitInProcess) == 1) || num > 0 || state != TSDB_METER_STATE_READY) {
@@ -311,9 +191,10 @@ void vnodeProcessImportTimer(void *param, void *tmrId) {
vnodeClearMeterState(pObj, TSDB_METER_STATE_IMPORTING);
if (pImport->retry < 1000) {
dTrace("vid:%d sid:%d id:%s, import failed, retry later. commit in process or queries on it, or not ready."
"commitInProcess:%d, numOfQueries:%d, state:%d", pObj->vnode, pObj->sid, pObj->meterId,
commitInProcess, num, state);
dTrace(
"vid:%d sid:%d id:%s, import failed, retry later. commit in process or queries on it, or not ready."
"commitInProcess:%d, numOfQueries:%d, state:%d",
pObj->vnode, pObj->sid, pObj->meterId, commitInProcess, num, state);
taosTmrStart(vnodeProcessImportTimer, 10, pImport, vnodeTmrCtrl);
return;
@@ -345,646 +226,1430 @@ void vnodeProcessImportTimer(void *param, void *tmrId) {
free(pImport);
}
int vnodeImportToFile(SImportInfo *pImport) {
SMeterObj *pObj = pImport->pObj;
SVnodeObj *pVnode = &vnodeList[pObj->vnode];
SVnodeCfg *pCfg = &pVnode->cfg;
SHeadInfo headInfo;
int code = 0, col;
SCompBlock compBlock;
char * payload = pImport->payload;
int rows = pImport->rows;
SCachePool *pPool = (SCachePool *)pVnode->pCachePool;
TSKEY lastKey = *((TSKEY *)(payload + pObj->bytesPerPoint * (rows - 1)));
TSKEY firstKey = *((TSKEY *)payload);
memset(&headInfo, 0, sizeof(headInfo));
headInfo.headList = malloc(sizeof(SCompHeader) * pCfg->maxSessions + sizeof(TSCKSUM));
int vnodeImportPoints(SMeterObj *pObj, char *cont, int contLen, char source, void *param, int sversion,
int *pNumOfPoints, TSKEY now) {
SSubmitMsg *pSubmit = (SSubmitMsg *)cont;
SVnodeObj * pVnode = vnodeList + pObj->vnode;
int rows;
char * payload;
int code = TSDB_CODE_ACTION_IN_PROGRESS;
SCachePool *pPool = (SCachePool *)(pVnode->pCachePool);
SShellObj * pShell = (SShellObj *)param;
int pointsImported = 0;
TSKEY minKey, maxKey;
SData *cdata[TSDB_MAX_COLUMNS];
char *buffer1 =
malloc(pObj->bytesPerPoint * pCfg->rowsInFileBlock + (sizeof(SData) + EXTRA_BYTES) * pObj->numOfColumns);
cdata[0] = (SData *)buffer1;
rows = htons(pSubmit->numOfRows);
int expectedLen = rows * pObj->bytesPerPoint + sizeof(pSubmit->numOfRows);
if (expectedLen != contLen) {
dError("vid:%d sid:%d id:%s, invalid import, expected:%d, contLen:%d", pObj->vnode, pObj->sid, pObj->meterId,
expectedLen, contLen);
return TSDB_CODE_WRONG_MSG_SIZE;
}
SData *data[TSDB_MAX_COLUMNS];
char *buffer2 =
malloc(pObj->bytesPerPoint * pCfg->rowsInFileBlock + (sizeof(SData) + EXTRA_BYTES) * pObj->numOfColumns);
data[0] = (SData *)buffer2;
// FIXME: the sversion check should not be here (take the import-converted-to-insert case into consideration)
if (sversion != pObj->sversion) {
dError("vid:%d sid:%d id:%s, invalid sversion, expected:%d received:%d", pObj->vnode, pObj->sid, pObj->meterId,
pObj->sversion, sversion);
return TSDB_CODE_OTHERS;
}
for (col = 1; col < pObj->numOfColumns; ++col) {
cdata[col] = (SData *)(((char *)cdata[col - 1]) + sizeof(SData) + EXTRA_BYTES +
pObj->pointsPerFileBlock * pObj->schema[col - 1].bytes);
data[col] = (SData *)(((char *)data[col - 1]) + sizeof(SData) + EXTRA_BYTES +
pObj->pointsPerFileBlock * pObj->schema[col - 1].bytes);
// Check timestamp context.
payload = pSubmit->payLoad;
TSKEY firstKey = KEY_AT_INDEX(payload, pObj->bytesPerPoint, 0);
TSKEY lastKey = KEY_AT_INDEX(payload, pObj->bytesPerPoint, rows - 1);
assert(firstKey <= lastKey);
vnodeGetValidDataRange(pObj->vnode, now, &minKey, &maxKey);
if (firstKey < minKey || firstKey > maxKey || lastKey < minKey || lastKey > maxKey) {
dError(
"vid:%d sid:%d id:%s, invalid timestamp to import, rows:%d firstKey: %ld lastKey: %ld minAllowedKey:%ld "
"maxAllowedKey:%ld",
pObj->vnode, pObj->sid, pObj->meterId, rows, firstKey, lastKey, minKey, maxKey);
return TSDB_CODE_TIMESTAMP_OUT_OF_RANGE;
}
int rowsBefore = 0;
int rowsRead = 0;
int rowsUnread = 0;
int leftRows = rows; // left number of rows of imported data
int row, rowsToWrite;
int64_t offset[TSDB_MAX_COLUMNS];
// FIXME: Commit log here is invalid (Take retry into consideration)
if (pVnode->cfg.commitLog && source != TSDB_DATA_SOURCE_LOG) {
if (pVnode->logFd < 0) return TSDB_CODE_INVALID_COMMIT_LOG;
code = vnodeWriteToCommitLog(pObj, TSDB_ACTION_IMPORT, cont, contLen, sversion);
if (code != 0) return code;
}
if (pImport->pos > 0) {
for (col = 0; col < pObj->numOfColumns; ++col)
memcpy(data[col]->data, pImport->sdata[col]->data, pImport->pos * pObj->schema[col].bytes);
if (firstKey > pObj->lastKey) { // Just call insert
vnodeClearMeterState(pObj, TSDB_METER_STATE_IMPORTING);
vnodeSetMeterState(pObj, TSDB_METER_STATE_INSERT);
code = vnodeInsertPoints(pObj, cont, contLen, TSDB_DATA_SOURCE_LOG, NULL, pObj->sversion, &pointsImported, now);
rowsBefore = pImport->pos;
rowsRead = pImport->pos;
rowsUnread = pImport->numOfPoints - pImport->pos;
if (pShell) {
pShell->code = code;
pShell->numOfTotalPoints += pointsImported;
}
dTrace("vid:%d sid:%d id:%s, %d rows data will be imported to file, firstKey:%ld lastKey:%ld",
pObj->vnode, pObj->sid, pObj->meterId, rows, firstKey, lastKey);
do {
if (leftRows > 0) {
code = vnodeOpenFileForImport(pImport, payload, &headInfo, data);
if (code < 0) goto _exit;
if (code > 0) {
rowsBefore = code;
code = 0;
};
} else {
// if payload is already imported, rows unread shall still be processed
rowsBefore = 0;
}
vnodeClearMeterState(pObj, TSDB_METER_STATE_INSERT);
} else { // trigger import
SImportInfo *pNew, import;
int rowsToProcess = pObj->pointsPerFileBlock - rowsBefore;
if (rowsToProcess > leftRows) rowsToProcess = leftRows;
dTrace("vid:%d sid:%d id:%s, try to import %d rows data, firstKey:%ld, lastKey:%ld, object lastKey:%ld",
pObj->vnode, pObj->sid, pObj->meterId, rows, firstKey, lastKey, pObj->lastKey);
memset(&import, 0, sizeof(import));
import.firstKey = firstKey;
import.lastKey = lastKey;
import.pObj = pObj;
import.pShell = pShell;
import.payload = payload;
import.rows = rows;
for (col = 0; col < pObj->numOfColumns; ++col) {
offset[col] = data[col]->data + rowsBefore * pObj->schema[col].bytes;
}
// FIXME: mutex here seems meaningless and num here still can
// be changed
int32_t num = 0;
pthread_mutex_lock(&pVnode->vmutex);
num = pObj->numOfQueries;
pthread_mutex_unlock(&pVnode->vmutex);
row = 0;
if (leftRows > 0) {
for (row = 0; row < rowsToProcess; ++row) {
if (*((TSKEY *)payload) > pVnode->commitLastKey) break;
int32_t commitInProcess = 0;
for (col = 0; col < pObj->numOfColumns; ++col) {
memcpy((void *)offset[col], payload, pObj->schema[col].bytes);
payload += pObj->schema[col].bytes;
offset[col] += pObj->schema[col].bytes;
}
}
}
pthread_mutex_lock(&pPool->vmutex);
if (((commitInProcess = pPool->commitInProcess) == 1) ||
num > 0) { // mutual exclusion with read (need to change here)
pthread_mutex_unlock(&pPool->vmutex);
leftRows -= row;
rowsToWrite = rowsBefore + row;
rowsBefore = 0;
pNew = (SImportInfo *)malloc(sizeof(SImportInfo));
memcpy(pNew, &import, sizeof(SImportInfo));
pNew->signature = pNew;
int payloadLen = contLen - sizeof(SSubmitMsg);
pNew->payload = malloc(payloadLen);
pNew->opayload = pNew->payload;
memcpy(pNew->payload, payload, payloadLen);
if (leftRows == 0 && rowsUnread > 0) {
// copy the unread
int rowsToCopy = pObj->pointsPerFileBlock - rowsToWrite;
if (rowsToCopy > rowsUnread) rowsToCopy = rowsUnread;
dTrace("vid:%d sid:%d id:%s, import later, commit in process:%d, numOfQueries:%d", pObj->vnode, pObj->sid,
pObj->meterId, commitInProcess, pObj->numOfQueries);
for (col = 0; col < pObj->numOfColumns; ++col) {
int bytes = pObj->schema[col].bytes;
memcpy(data[col]->data + rowsToWrite * bytes, pImport->sdata[col]->data + rowsRead * bytes, rowsToCopy * bytes);
taosTmrStart(vnodeProcessImportTimer, 10, pNew, vnodeTmrCtrl);
return 0;
} else {
pPool->commitInProcess = 1;
pthread_mutex_unlock(&pPool->vmutex);
int code = vnodeImportData(pObj, &import);
if (pShell) {
pShell->code = code;
pShell->numOfTotalPoints += import.importedRows;
}
rowsRead += rowsToCopy;
rowsUnread -= rowsToCopy;
rowsToWrite += rowsToCopy;
}
for (col = 0; col < pObj->numOfColumns; ++col) {
data[col]->len = rowsToWrite * pObj->schema[col].bytes;
}
compBlock.last = headInfo.last;
vnodeWriteBlockToFile(pObj, &compBlock, data, cdata, rowsToWrite);
twrite(pVnode->nfd, &compBlock, sizeof(SCompBlock));
rowsToWrite = 0;
headInfo.newBlocks++;
} while (leftRows > 0 || rowsUnread > 0);
// How about the retry? Will this also cause vnode version++?
pVnode->version++;
if (compBlock.keyLast > pObj->lastKeyOnFile)
pObj->lastKeyOnFile = compBlock.keyLast;
if (pShell) {
pShell->count--;
if (pShell->count <= 0) vnodeSendShellSubmitRspMsg(pShell, pShell->code, pShell->numOfTotalPoints);
}
vnodeCloseFileForImport(pObj, &headInfo);
dTrace("vid:%d sid:%d id:%s, %d rows data are imported to file", pObj->vnode, pObj->sid, pObj->meterId, rows);
return 0;
}
SCacheInfo *pInfo = (SCacheInfo *)pObj->pCache;
pthread_mutex_lock(&pPool->vmutex);
/* Function to search keys in a range
*
* Assumption: keys in payload are in ascending order
*
* @payload: data records, key in ascending order
* @step: bytes each record takes
* @rows: number of data records
* @skey: range start (included)
* @ekey: range end (included)
* @srow: output, start index of the records that fall in the range
* @nrows: output, number of records in the range
*
* @return: 0 if at least one record falls in the range
*          -1 if no record falls in the range
*/
static int vnodeSearchKeyInRange(char *payload, int step, int rows, TSKEY skey, TSKEY ekey, int *srow, int *nrows) {
if (rows <= 0 || KEY_AT_INDEX(payload, step, 0) > ekey || KEY_AT_INDEX(payload, step, rows - 1) < skey || skey > ekey)
return -1;
if (pInfo->numOfBlocks > 0) {
int slot = (pInfo->currentSlot - pInfo->numOfBlocks + 1 + pInfo->maxBlocks) % pInfo->maxBlocks;
TSKEY firstKeyInCache = *((TSKEY *)(pInfo->cacheBlocks[slot]->offset[0]));
int left = 0;
int right = rows - 1;
int mid;
// data may be in the committed cache; the cache shall be released
if (lastKey > firstKeyInCache) {
while (slot != pInfo->commitSlot) {
SCacheBlock *pCacheBlock = pInfo->cacheBlocks[slot];
vnodeFreeCacheBlock(pCacheBlock);
slot = (slot + 1 + pInfo->maxBlocks) % pInfo->maxBlocks;
// Binary search the first key in payload >= skey
do {
mid = (left + right) / 2;
if (skey < KEY_AT_INDEX(payload, step, mid)) {
right = mid;
} else if (skey > KEY_AT_INDEX(payload, step, mid)) {
left = mid + 1;
} else {
break;
}
} while (left < right);
// last slot, the uncommitted slots shall be shifted
SCacheBlock *pCacheBlock = pInfo->cacheBlocks[slot];
int points = pCacheBlock->numOfPoints - pInfo->commitPoint;
if (points > 0) {
for (int col = 0; col < pObj->numOfColumns; ++col) {
int size = points * pObj->schema[col].bytes;
memmove(pCacheBlock->offset[col], pCacheBlock->offset[col] + pObj->schema[col].bytes * pInfo->commitPoint, size);
if (skey <= KEY_AT_INDEX(payload, step, mid)) {
*srow = mid;
} else {
if (mid + 1 >= rows) {
return -1;
} else {
*srow = mid + 1;
}
}
if (pInfo->commitPoint != pObj->pointsPerBlock) {
// commit point shall be set to 0 if last block is not full
pInfo->commitPoint = 0;
pCacheBlock->numOfPoints = points;
if (slot == pInfo->currentSlot) {
__sync_fetch_and_add(&pObj->freePoints, pInfo->commitPoint);
}
assert(skey <= KEY_AT_INDEX(payload, step, *srow));
*nrows = 0;
for (int i = *srow; i < rows; i++) {
if (KEY_AT_INDEX(payload, step, i) <= ekey) {
(*nrows)++;
} else {
// if last block is full and committed
SCacheBlock *pCacheBlock = pInfo->cacheBlocks[slot];
if (pCacheBlock->pMeterObj == pObj) {
vnodeFreeCacheBlock(pCacheBlock);
break;
}
}
}
}
if (lastKey > pObj->lastKeyOnFile) pObj->lastKeyOnFile = lastKey;
pthread_mutex_unlock(&pPool->vmutex);
_exit:
tfree(headInfo.headList);
tfree(buffer1);
tfree(buffer2);
tfree(pImport->buffer);
if (*nrows == 0) return -1;
return code;
return 0;
}
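/* A usage sketch for vnodeSearchKeyInRange, assuming an ascending row-major payload as
 * documented above; the wrapper name and the skey/ekey window are illustrative only. */
static int vnodeCountRowsInWindow(SMeterObj *pObj, char *payload, int rows, TSKEY skey, TSKEY ekey) {
  int srow = 0, nrows = 0;
  // a return of -1 means no record key lies inside [skey, ekey]
  if (vnodeSearchKeyInRange(payload, pObj->bytesPerPoint, rows, skey, ekey, &srow, &nrows) < 0) return 0;
  return nrows;  // rows payload[srow .. srow + nrows - 1] fall inside the window
}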
int vnodeImportToCache(SImportInfo *pImport, char *payload, int rows) {
SMeterObj *pObj = pImport->pObj;
SVnodeObj *pVnode = &vnodeList[pObj->vnode];
SVnodeCfg *pCfg = &pVnode->cfg;
int code = -1;
SCacheInfo *pInfo = (SCacheInfo *)pObj->pCache;
int slot, pos, row, col, points, tpoints;
char *data[TSDB_MAX_COLUMNS], *current[TSDB_MAX_COLUMNS];
int slots = pInfo->unCommittedBlocks + 1;
int trows = slots * pObj->pointsPerBlock + rows; // max rows in buffer
int tsize = (trows / pObj->pointsPerBlock + 1) * pCfg->cacheBlockSize;
TSKEY firstKey = *((TSKEY *)payload);
TSKEY lastKey = *((TSKEY *)(payload + pObj->bytesPerPoint * (rows - 1)));
if (pObj->freePoints < rows || pObj->freePoints < (pObj->pointsPerBlock << 1)) {
dError("vid:%d sid:%d id:%s, import failed, cache is full, freePoints:%d", pObj->vnode, pObj->sid, pObj->meterId,
pObj->freePoints);
pImport->importedRows = 0;
pImport->commit = 1;
code = TSDB_CODE_ACTION_IN_PROGRESS;
return code;
}
int vnodeOpenMinFilesForImport(int vnode, int fid) {
char dname[TSDB_FILENAME_LEN] = "\0";
SVnodeObj * pVnode = vnodeList + vnode;
struct stat filestat;
int minFileSize;
dTrace("vid:%d sid:%d id:%s, %d rows data will be imported to cache, firstKey:%ld lastKey:%ld",
pObj->vnode, pObj->sid, pObj->meterId, rows, firstKey, lastKey);
minFileSize = TSDB_FILE_HEADER_LEN + sizeof(SCompHeader) * pVnode->cfg.maxSessions + sizeof(TSCKSUM);
pthread_mutex_lock(&(pVnode->vmutex));
if (firstKey < pVnode->firstKey) pVnode->firstKey = firstKey;
pthread_mutex_unlock(&(pVnode->vmutex));
vnodeGetHeadDataLname(pVnode->cfn, dname, pVnode->lfn, vnode, fid);
char *buffer = malloc(tsize); // buffer to hold unCommitted data plus import data
data[0] = buffer;
current[0] = data[0];
for (col = 1; col < pObj->numOfColumns; ++col) {
data[col] = data[col - 1] + trows * pObj->schema[col - 1].bytes;
current[col] = data[col];
// Open .head file
pVnode->hfd = open(pVnode->cfn, O_RDONLY);
if (pVnode->hfd < 0) {
dError("vid:%d, failed to open head file:%s, reason:%s", vnode, pVnode->cfn, strerror(errno));
taosLogError("vid:%d, failed to open head file:%s, reason:%s", vnode, pVnode->cfn, strerror(errno));
goto _error_open;
}
// write import data into buffer first
for (row = 0; row < rows; ++row) {
for (col = 0; col < pObj->numOfColumns; ++col) {
memcpy(current[col], payload, pObj->schema[col].bytes);
payload += pObj->schema[col].bytes;
current[col] += pObj->schema[col].bytes;
fstat(pVnode->hfd, &filestat);
if (filestat.st_size < minFileSize) {
dError("vid:%d, head file:%s is corrupted", vnode, pVnode->cfn);
taosLogError("vid:%d, head file:%s corrupted", vnode, pVnode->cfn);
goto _error_open;
}
// Open .data file
pVnode->dfd = open(dname, O_RDWR);
if (pVnode->dfd < 0) {
dError("vid:%d, failed to open data file:%s, reason:%s", vnode, dname, strerror(errno));
taosLogError("vid:%d, failed to open data file:%s, reason:%s", vnode, dname, strerror(errno));
goto _error_open;
}
// copy the overwritten data into buffer
tpoints = rows;
pos = pImport->pos;
slot = pImport->slot;
while (1) {
points = pInfo->cacheBlocks[slot]->numOfPoints - pos;
for (col = 0; col < pObj->numOfColumns; ++col) {
int size = points * pObj->schema[col].bytes;
memcpy(current[col], pInfo->cacheBlocks[slot]->offset[col] + pos * pObj->schema[col].bytes, size);
current[col] += size;
fstat(pVnode->dfd, &filestat);
if (filestat.st_size < TSDB_FILE_HEADER_LEN) {
dError("vid:%d, data file:%s corrupted", vnode, dname);
taosLogError("vid:%d, data file:%s corrupted", vnode, dname);
goto _error_open;
}
pos = 0;
tpoints += points;
if (slot == pInfo->currentSlot) break;
slot = (slot + 1) % pInfo->maxBlocks;
// Open .last file
pVnode->lfd = open(pVnode->lfn, O_RDWR);
if (pVnode->lfd < 0) {
dError("vid:%d, failed to open last file:%s, reason:%s", vnode, pVnode->lfn, strerror(errno));
taosLogError("vid:%d, failed to open last file:%s, reason:%s", vnode, pVnode->lfn, strerror(errno));
goto _error_open;
}
for (col = 0; col < pObj->numOfColumns; ++col) current[col] = data[col];
pos = pImport->pos;
fstat(pVnode->lfd, &filestat);
if (filestat.st_size < TSDB_FILE_HEADER_LEN) {
dError("vid:%d, last file:%s corrupted", vnode, pVnode->lfn);
taosLogError("vid:%d, last file:%s corrupted", vnode, pVnode->lfn);
goto _error_open;
}
// write back to existing slots first
slot = pImport->slot;
while (1) {
points = (tpoints > pObj->pointsPerBlock - pos) ? pObj->pointsPerBlock - pos : tpoints;
SCacheBlock *pCacheBlock = pInfo->cacheBlocks[slot];
for (col = 0; col < pObj->numOfColumns; ++col) {
int size = points * pObj->schema[col].bytes;
memcpy(pCacheBlock->offset[col] + pos * pObj->schema[col].bytes, current[col], size);
current[col] += size;
}
pCacheBlock->numOfPoints = points + pos;
pos = 0;
tpoints -= points;
return 0;
if (slot == pInfo->currentSlot) break;
slot = (slot + 1) % pInfo->maxBlocks;
}
_error_open:
if (pVnode->hfd > 0) close(pVnode->hfd);
pVnode->hfd = 0;
// allocate new cache block if there are still data left
while (tpoints > 0) {
pImport->commit = vnodeAllocateCacheBlock(pObj);
if (pImport->commit < 0) goto _exit;
points = (tpoints > pObj->pointsPerBlock) ? pObj->pointsPerBlock : tpoints;
SCacheBlock *pCacheBlock = pInfo->cacheBlocks[pInfo->currentSlot];
for (col = 0; col < pObj->numOfColumns; ++col) {
int size = points * pObj->schema[col].bytes;
memcpy(pCacheBlock->offset[col] + pos * pObj->schema[col].bytes, current[col], size);
current[col] += size;
}
tpoints -= points;
pCacheBlock->numOfPoints = points;
}
if (pVnode->dfd > 0) close(pVnode->dfd);
pVnode->dfd = 0;
code = 0;
__sync_fetch_and_sub(&pObj->freePoints, rows);
dTrace("vid:%d sid:%d id:%s, %d rows data are imported to cache", pObj->vnode, pObj->sid, pObj->meterId, rows);
if (pVnode->lfd > 0) close(pVnode->lfd);
pVnode->lfd = 0;
_exit:
free(buffer);
return code;
return -1;
}
int vnodeFindKeyInFile(SImportInfo *pImport, int order) {
SMeterObj *pObj = pImport->pObj;
SVnodeObj *pVnode = &vnodeList[pObj->vnode];
int code = -1;
SQuery query;
SColumnInfoEx colList[TSDB_MAX_COLUMNS] = {0};
/* Function to open .t file and sendfile the first part
*/
int vnodeOpenTempFilesForImport(SImportHandle *pHandle, SMeterObj *pObj, int fid) {
char dHeadName[TSDB_FILENAME_LEN] = "\0";
SVnodeObj * pVnode = vnodeList + pObj->vnode;
struct stat filestat;
int sid;
// cfn: .head
if (readlink(pVnode->cfn, dHeadName, TSDB_FILENAME_LEN) < 0) return -1;
size_t len = strlen(dHeadName);
// switch head name
switch (dHeadName[len - 1]) {
case '0':
dHeadName[len - 1] = '1';
break;
case '1':
dHeadName[len - 1] = '0';
break;
default:
dError("vid: %d, fid: %d, head target filename not end with 0 or 1", pVnode->vnode, fid);
return -1;
}
TSKEY key = order ? pImport->firstKey : pImport->lastKey;
memset(&query, 0, sizeof(query));
query.order.order = order;
query.skey = key;
query.ekey = order ? INT64_MAX : 0;
query.colList = colList;
query.numOfCols = pObj->numOfColumns;
vnodeGetHeadTname(pVnode->nfn, NULL, pVnode->vnode, fid);
symlink(dHeadName, pVnode->nfn);
pVnode->nfd = open(pVnode->nfn, O_RDWR | O_CREAT | O_TRUNC, S_IRWXU | S_IRWXG | S_IRWXO);
if (pVnode->nfd < 0) {
dError("vid:%d, failed to open new head file:%s, reason:%s", pVnode->vnode, pVnode->nfn, strerror(errno));
taosLogError("vid:%d, failed to open new head file:%s, reason:%s", pVnode->vnode, pVnode->nfn, strerror(errno));
return -1;
}
for (int16_t i = 0; i < pObj->numOfColumns; ++i) {
colList[i].data.colId = pObj->schema[i].colId;
colList[i].data.bytes = pObj->schema[i].bytes;
colList[i].data.type = pObj->schema[i].type;
fstat(pVnode->hfd, &filestat);
pHandle->hfSize = filestat.st_size;
colList[i].colIdx = i;
colList[i].colIdxInBuf = i;
// Find the next sid whose compInfoOffset > 0
for (sid = pObj->sid + 1; sid < pVnode->cfg.maxSessions; sid++) {
if (pHandle->pHeader[sid].compInfoOffset > 0) break;
}
int ret = vnodeSearchPointInFile(pObj, &query);
pHandle->nextNo0Offset = (sid == pVnode->cfg.maxSessions) ? pHandle->hfSize : pHandle->pHeader[sid].compInfoOffset;
if (ret >= 0) {
if (query.slot < 0) {
pImport->slot = 0;
pImport->pos = 0;
pImport->key = 0;
pImport->fileId = pVnode->fileId - pVnode->numOfFiles + 1;
dTrace("vid:%d sid:%d id:%s, import to head of file", pObj->vnode, pObj->sid, pObj->meterId);
code = 0;
} else if (query.slot >= 0) {
code = 0;
pImport->slot = query.slot;
pImport->pos = query.pos;
pImport->key = query.key;
pImport->fileId = query.fileId;
SCompBlock *pBlock = &query.pBlock[query.slot];
pImport->numOfPoints = pBlock->numOfPoints;
// FIXME: sendfile the original part
// TODO: here we need to take the deleted-table case into consideration; this function
// just assumes that case is handled before it is called
if (pHandle->pHeader[pObj->sid].compInfoOffset > 0) {
pHandle->compInfoOffset = pHandle->pHeader[pObj->sid].compInfoOffset;
} else {
pHandle->compInfoOffset = pHandle->nextNo0Offset;
}
if (pImport->key != key) {
if (order == 0) {
pImport->pos++;
assert(pHandle->compInfoOffset <= pHandle->hfSize);
if (pImport->pos >= pBlock->numOfPoints) {
pImport->slot++;
pImport->pos = 0;
lseek(pVnode->hfd, 0, SEEK_SET);
lseek(pVnode->nfd, 0, SEEK_SET);
if (tsendfile(pVnode->nfd, pVnode->hfd, NULL, pHandle->compInfoOffset) < 0) {
// TODO : deal with ERROR here
}
} else {
if (pImport->pos < 0) pImport->pos = 0;
// Leave a SCompInfo space here
lseek(pVnode->nfd, sizeof(SCompInfo), SEEK_CUR);
return 0;
}
typedef enum { DATA_LOAD_TIMESTAMP = 0x1, DATA_LOAD_OTHER_DATA = 0x2 } DataLoadMod;
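/* How the load-mode bits are meant to combine (an assumption drawn from the checks in
 * vnodeLoadNeededBlockData below):
 *   timestamps only : vnodeLoadNeededBlockData(pObj, pHandle, slot, DATA_LOAD_TIMESTAMP)
 *   whole block     : vnodeLoadNeededBlockData(pObj, pHandle, slot, DATA_LOAD_TIMESTAMP | DATA_LOAD_OTHER_DATA)
 * blockLoadState remembers which parts of blockId are already resident, so repeated
 * calls for the same block only read the still-missing columns. */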
/* Function to load a block's data as required by the load mode (loadMod)
 */
static int vnodeLoadNeededBlockData(SMeterObj *pObj, SImportHandle *pHandle, int blockId, uint8_t loadMod) {
size_t size;
int code = 0;
SCompBlock *pBlock = pHandle->pBlocks + blockId;
assert(pBlock->sversion == pObj->sversion);
SVnodeObj *pVnode = vnodeList + pObj->vnode;
int dfd = pBlock->last ? pVnode->lfd : pVnode->dfd;
if (pHandle->blockId != blockId) {
pHandle->blockId = blockId;
pHandle->blockLoadState = 0;
}
if (pHandle->blockLoadState == 0){ // Reload pField
size = sizeof(SField) * pBlock->numOfCols + sizeof(TSCKSUM);
if (pHandle->pFieldSize < size) {
pHandle->pField = (SField *)realloc((void *)(pHandle->pField), size);
if (pHandle->pField == NULL) {
dError("vid: %d, sid: %d, meterId: %s, failed to allocate memory, size: %ul", pObj->vnode, pObj->sid,
pObj->meterId, size);
return -1;
}
pHandle->pFieldSize = size;
}
if (pImport->key != key && pImport->pos > 0) {
if ( pObj->sversion != pBlock->sversion ) {
dError("vid:%d sid:%d id:%s, import sversion not matached, expected:%d received:%d", pObj->vnode, pObj->sid,
pBlock->sversion, pObj->sversion);
code = TSDB_CODE_OTHERS;
} else {
pImport->offset = pBlock->offset;
lseek(dfd, pBlock->offset, SEEK_SET);
if (read(dfd, (void *)(pHandle->pField), pHandle->pFieldSize) < 0) {
dError("vid:%d sid:%d meterId:%s, failed to read data file, size:%ld reason:%s", pVnode->vnode, pObj->sid,
pObj->meterId, pHandle->pFieldSize, strerror(errno));
return -1;
}
if (!taosCheckChecksumWhole((uint8_t *)(pHandle->pField), pHandle->pFieldSize)) {
dError("vid:%d sid:%d meterId:%s, data file %s is broken since checksum mismatch", pVnode->vnode, pObj->sid,
pObj->meterId, pVnode->lfn);
return -1;
}
}
pImport->buffer =
malloc(pObj->bytesPerPoint * pVnode->cfg.rowsInFileBlock + sizeof(SData) * pObj->numOfColumns);
pImport->sdata[0] = (SData *)pImport->buffer;
for (int col = 1; col < pObj->numOfColumns; ++col)
pImport->sdata[col] = (SData *)(((char *)pImport->sdata[col - 1]) + sizeof(SData) +
{ // Allocate necessary buffer
size = pObj->bytesPerPoint * pObj->pointsPerFileBlock + (sizeof(SData) + EXTRA_BYTES) * pObj->numOfColumns;
if (pHandle->buffer == NULL) {
pHandle->buffer = malloc(size);
if (pHandle->buffer == NULL) {
dError("vid: %d, sid: %d, meterId: %s, failed to allocate memory, size: %ul", pObj->vnode, pObj->sid,
pObj->meterId, size);
return -1;
}
// TODO: Init data
pHandle->data[0] = (SData *)(pHandle->buffer);
for (int col = 1; col < pObj->numOfColumns; col++) {
pHandle->data[col] = (SData *)((char *)(pHandle->data[col - 1]) + sizeof(SData) + EXTRA_BYTES +
pObj->pointsPerFileBlock * pObj->schema[col - 1].bytes);
}
}
code = vnodeReadCompBlockToMem(pObj, &query, pImport->sdata);
if (code < 0) {
code = -code;
tfree(pImport->buffer);
if (pHandle->temp == NULL) {
pHandle->temp = malloc(size);
if (pHandle->temp == NULL) {
dError("vid: %d, sid: %d, meterId: %s, failed to allocate memory, size: %ul", pObj->vnode, pObj->sid,
pObj->meterId, size);
return -1;
}
}
if (pHandle->tempBuffer == NULL) {
pHandle->tempBufferSize = pObj->maxBytes + EXTRA_BYTES;
pHandle->tempBuffer = malloc(pHandle->tempBufferSize);
if (pHandle->tempBuffer == NULL) {
dError("vid: %d, sid: %d, meterId: %s, failed to allocate memory, size: %ul", pObj->vnode, pObj->sid,
pObj->meterId, pHandle->tempBufferSize);
return -1;
}
}
} else {
dError("vid:%d sid:%d id:%s, file is corrupted, import failed", pObj->vnode, pObj->sid, pObj->meterId);
code = -ret;
}
tclose(query.hfd);
tclose(query.dfd);
tclose(query.lfd);
vnodeFreeFields(&query);
tfree(query.pBlock);
if ((loadMod & DATA_LOAD_TIMESTAMP) &&
(!(pHandle->blockLoadState & DATA_LOAD_TIMESTAMP))) { // load only timestamp part
code =
vnodeReadColumnToMem(dfd, pBlock, &(pHandle->pField), PRIMARYKEY_TIMESTAMP_COL_INDEX,
pHandle->data[PRIMARYKEY_TIMESTAMP_COL_INDEX]->data, sizeof(TSKEY) * pBlock->numOfPoints,
pHandle->temp, pHandle->tempBuffer, pHandle->tempBufferSize);
return code;
}
if (code != 0) return -1;
pHandle->blockLoadState |= DATA_LOAD_TIMESTAMP;
}
int vnodeFindKeyInCache(SImportInfo *pImport, int order) {
SMeterObj *pObj = pImport->pObj;
int code = 0;
SQuery query;
SCacheInfo *pInfo = (SCacheInfo *)pObj->pCache;
if ((loadMod & DATA_LOAD_OTHER_DATA) && (!(pHandle->blockLoadState & DATA_LOAD_OTHER_DATA))) { // load other columns
for (int col = 1; col < pBlock->numOfCols; col++) {
code = vnodeReadColumnToMem(dfd, pBlock, &(pHandle->pField), col, pHandle->data[col]->data,
pBlock->numOfPoints * pObj->schema[col].bytes, pHandle->temp, pHandle->tempBuffer,
pHandle->tempBufferSize);
if (code != 0) return -1;
}
TSKEY key = order ? pImport->firstKey : pImport->lastKey;
memset(&query, 0, sizeof(query));
query.order.order = order;
query.skey = key;
query.ekey = order ? pImport->lastKey : pImport->firstKey;
vnodeSearchPointInCache(pObj, &query);
pHandle->blockLoadState |= DATA_LOAD_OTHER_DATA;
}
if (query.slot < 0) {
pImport->slot = pInfo->commitSlot;
if (pInfo->commitPoint >= pObj->pointsPerBlock) pImport->slot = (pImport->slot + 1) % pInfo->maxBlocks;
pImport->pos = 0;
pImport->key = 0;
dTrace("vid:%d sid:%d id:%s, key:%ld, import to head of cache", pObj->vnode, pObj->sid, pObj->meterId, key);
code = 0;
} else {
pImport->slot = query.slot;
pImport->pos = query.pos;
pImport->key = query.key;
return 0;
}
if (key != query.key) {
if (order == 0) {
// since pos is the position which has smaller key, data shall be imported after it
pImport->pos++;
if (pImport->pos >= pObj->pointsPerBlock) {
pImport->slot = (pImport->slot + 1) % pInfo->maxBlocks;
pImport->pos = 0;
static int vnodeCloseImportFiles(SMeterObj *pObj, SImportHandle *pHandle) {
SVnodeObj *pVnode = vnodeList + pObj->vnode;
char dpath[TSDB_FILENAME_LEN] = "\0";
SCompInfo compInfo;
__off_t offset = 0;
if (pVnode->nfd > 0) {
offset = lseek(pVnode->nfd, 0, SEEK_CUR);
assert(offset == pHandle->nextNo0Offset + pHandle->driftOffset);
{ // Write the SCompInfo part
compInfo.uid = pObj->uid;
compInfo.last = pHandle->last;
compInfo.numOfBlocks = pHandle->newNumOfBlocks + pHandle->oldNumOfBlocks;
compInfo.delimiter = TSDB_VNODE_DELIMITER;
taosCalcChecksumAppend(0, (uint8_t *)(&compInfo), sizeof(SCompInfo));
lseek(pVnode->nfd, pHandle->compInfoOffset, SEEK_SET);
if (twrite(pVnode->nfd, (void *)(&compInfo), sizeof(SCompInfo)) < 0) {
dError("vid:%d sid:%d meterId:%s, failed to wirte SCompInfo, reason:%s", pObj->vnode, pObj->sid, pObj->meterId,
strerror(errno));
return -1;
}
} else {
if (pImport->pos < 0) pImport->pos = 0;
}
// Write the rest of the SCompBlock part
if (pHandle->hfSize > pHandle->nextNo0Offset) {
lseek(pVnode->nfd, 0, SEEK_END);
lseek(pVnode->hfd, pHandle->nextNo0Offset, SEEK_SET);
if (tsendfile(pVnode->nfd, pVnode->hfd, NULL, pHandle->hfSize - pHandle->nextNo0Offset) < 0) {
dError("vid:%d sid:%d meterId:%s, failed to sendfile, size:%ld, reason:%s", pObj->vnode, pObj->sid,
pObj->meterId, pHandle->hfSize - pHandle->nextNo0Offset, strerror(errno));
return -1;
}
code = 0;
}
return code;
}
// Write SCompHeader part
pHandle->pHeader[pObj->sid].compInfoOffset = pHandle->compInfoOffset;
for (int sid = pObj->sid + 1; sid < pVnode->cfg.maxSessions; ++sid) {
if (pHandle->pHeader[sid].compInfoOffset > 0) {
pHandle->pHeader[sid].compInfoOffset += pHandle->driftOffset;
}
}
int vnodeImportStartToCache(SImportInfo *pImport, char *payload, int rows) {
int code = 0;
SMeterObj *pObj = pImport->pObj;
taosCalcChecksumAppend(0, (uint8_t *)(pHandle->pHeader), pHandle->pHeaderSize);
lseek(pVnode->nfd, TSDB_FILE_HEADER_LEN, SEEK_SET);
if (twrite(pVnode->nfd, (void *)(pHandle->pHeader), pHandle->pHeaderSize) < 0) {
dError("vid:%d sid:%d meterId:%s, failed to wirte SCompHeader part, size:%ld, reason:%s", pObj->vnode, pObj->sid,
pObj->meterId, pHandle->pHeaderSize, strerror(errno));
return -1;
}
}
code = vnodeFindKeyInCache(pImport, 1);
if (code != 0) return code;
// Close opened files
close(pVnode->dfd);
pVnode->dfd = 0;
if (pImport->key != pImport->firstKey) {
rows = vnodeGetImportStartPart(pObj, payload, rows, pImport->key);
pImport->importedRows = rows;
code = vnodeImportToCache(pImport, payload, rows);
} else {
dTrace("vid:%d sid:%d id:%s, data is already imported to cache", pObj->vnode, pObj->sid, pObj->meterId);
close(pVnode->hfd);
pVnode->hfd = 0;
close(pVnode->lfd);
pVnode->lfd = 0;
if (pVnode->nfd > 0) {
close(pVnode->nfd);
pVnode->nfd = 0;
readlink(pVnode->cfn, dpath, TSDB_FILENAME_LEN);
rename(pVnode->nfn, pVnode->cfn);
remove(dpath);
}
return code;
return 0;
}
int vnodeImportStartToFile(SImportInfo *pImport, char *payload, int rows) {
int code = 0;
SMeterObj *pObj = pImport->pObj;
void vnodeConvertRowsToCols(SMeterObj *pObj, const char *payload, int rows, SData *data[], int rowOffset) {
int sdataRow;
int offset;
code = vnodeFindKeyInFile(pImport, 1);
if (code != 0) return code;
for (int row = 0; row < rows; ++row) {
sdataRow = row + rowOffset;
offset = 0;
for (int col = 0; col < pObj->numOfColumns; ++col) {
memcpy(data[col]->data + sdataRow * pObj->schema[col].bytes, payload + pObj->bytesPerPoint * row + offset,
pObj->schema[col].bytes);
if (pImport->key != pImport->firstKey) {
pImport->payload = payload;
pImport->rows = vnodeGetImportStartPart(pObj, payload, rows, pImport->key);
pImport->importedRows = pImport->rows;
code = vnodeImportToFile(pImport);
} else {
dTrace("vid:%d sid:%d id:%s, data is already imported to file", pObj->vnode, pObj->sid, pObj->meterId);
offset += pObj->schema[col].bytes;
}
}
}
return code;
// TODO : Check the correctness
int vnodeCreateNeccessaryFiles(SVnodeObj *pVnode) {
int numOfFiles = 0, fileId, filesAdded = 0;
int vnode = pVnode->vnode;
SVnodeCfg *pCfg = &(pVnode->cfg);
if (pVnode->lastKeyOnFile == 0) {
if (pCfg->daysPerFile == 0) pCfg->daysPerFile = 10;
pVnode->fileId = pVnode->firstKey / tsMsPerDay[pVnode->cfg.precision] / pCfg->daysPerFile;
pVnode->lastKeyOnFile = (long)(pVnode->fileId + 1) * pCfg->daysPerFile * tsMsPerDay[pVnode->cfg.precision] - 1;
pVnode->numOfFiles = 1;
if (vnodeCreateEmptyCompFile(vnode, pVnode->fileId) < 0) return -1;
}
numOfFiles = (pVnode->lastKeyOnFile - pVnode->commitFirstKey) / tsMsPerDay[pVnode->cfg.precision] / pCfg->daysPerFile;
if (pVnode->commitFirstKey > pVnode->lastKeyOnFile) numOfFiles = -1;
dTrace("vid:%d, commitFirstKey:%ld lastKeyOnFile:%ld numOfFiles:%d fileId:%d vnodeNumOfFiles:%d", pVnode->vnode,
pVnode->commitFirstKey, pVnode->lastKeyOnFile, numOfFiles, pVnode->fileId, pVnode->numOfFiles);
if (numOfFiles >= pVnode->numOfFiles) {
// create empty header files backward
filesAdded = numOfFiles - pVnode->numOfFiles + 1;
for (int i = 0; i < filesAdded; ++i) {
fileId = pVnode->fileId - pVnode->numOfFiles - i;
if (vnodeCreateEmptyCompFile(vnode, fileId) < 0) return -1;
}
} else if (numOfFiles < 0) {
// create empty header files forward
pVnode->fileId++;
if (vnodeCreateEmptyCompFile(vnode, pVnode->fileId) < 0) return -1;
pVnode->lastKeyOnFile += (long)tsMsPerDay[pVnode->cfg.precision] * pCfg->daysPerFile;
filesAdded = 1;
numOfFiles = 0; // hacky way
}
fileId = pVnode->fileId - numOfFiles;
pVnode->commitLastKey =
pVnode->lastKeyOnFile - (long)numOfFiles * tsMsPerDay[pVnode->cfg.precision] * pCfg->daysPerFile;
pVnode->commitFirstKey = pVnode->commitLastKey - (long)tsMsPerDay[pVnode->cfg.precision] * pCfg->daysPerFile + 1;
pVnode->commitFileId = fileId;
pVnode->numOfFiles = pVnode->numOfFiles + filesAdded;
return 0;
}
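/* Worked recap of the commit window computed above (delta = daysPerFile * tsMsPerDay[precision]):
 *   numOfFiles     = (lastKeyOnFile - commitFirstKey) / delta
 *   commitFileId   = fileId - numOfFiles
 *   commitLastKey  = lastKeyOnFile - numOfFiles * delta
 *   commitFirstKey = commitLastKey - delta + 1
 * so every key in [commitFirstKey, commitLastKey] maps to file commitFileId. */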
int vnodeImportWholeToFile(SImportInfo *pImport, char *payload, int rows) {
int code = 0;
SMeterObj *pObj = pImport->pObj;
static int vnodeMergeDataIntoFile(SImportInfo *pImport, const char *payload, int rows, int fid) {
SMeterObj * pObj = (SMeterObj *)(pImport->pObj);
SVnodeObj * pVnode = vnodeList + pObj->vnode;
SImportHandle importHandle;
size_t size = 0;
SData * data[TSDB_MAX_COLUMNS];
char * buffer = NULL;
SData * cdata[TSDB_MAX_COLUMNS];
char * cbuffer = NULL;
SCompBlock compBlock;
TSCKSUM checksum = 0;
int pointsImported = 0;
code = vnodeFindKeyInFile(pImport, 0);
if (code != 0) return code;
TSKEY delta = pVnode->cfg.daysPerFile * tsMsPerDay[pVnode->cfg.precision];
TSKEY minFileKey = fid * delta;
TSKEY maxFileKey = minFileKey + delta - 1;
TSKEY firstKey = KEY_AT_INDEX(payload, pObj->bytesPerPoint, 0);
TSKEY lastKey = KEY_AT_INDEX(payload, pObj->bytesPerPoint, rows - 1);
assert(firstKey >= minFileKey && firstKey <= maxFileKey && lastKey >= minFileKey && lastKey <= maxFileKey);
// create necessary files
pVnode->commitFirstKey = firstKey;
if (vnodeCreateNeccessaryFiles(pVnode) < 0) return -1;
assert(pVnode->commitFileId == fid);
// Open the minimum set of files needed for import: .head(hfd) .data(dfd) .last(lfd)
if (vnodeOpenMinFilesForImport(pObj->vnode, fid) < 0) return -1;
memset(&importHandle, 0, sizeof(SImportHandle));
{ // Load SCompHeader part from .head file
importHandle.pHeaderSize = sizeof(SCompHeader) * pVnode->cfg.maxSessions + sizeof(TSCKSUM);
importHandle.pHeader = (SCompHeader *)malloc(importHandle.pHeaderSize);
if (importHandle.pHeader == NULL) {
dError("vid: %d, sid: %d, meterId: %s, failed to allocate memory, size: %ul", pObj->vnode, pObj->sid,
pObj->meterId, importHandle.pHeaderSize);
goto _error_merge;
}
lseek(pVnode->hfd, TSDB_FILE_HEADER_LEN, SEEK_SET);
if (read(pVnode->hfd, (void *)(importHandle.pHeader), importHandle.pHeaderSize) < importHandle.pHeaderSize) {
dError("vid: %d, sid: %d, meterId: %s, fid: %d failed to read SCompHeader part, reason:%s", pObj->vnode,
pObj->sid, pObj->meterId, fid, strerror(errno));
goto _error_merge;
}
if (!taosCheckChecksumWhole((uint8_t *)(importHandle.pHeader), importHandle.pHeaderSize)) {
dError("vid: %d, sid: %d, meterId: %s, fid: %d SCompHeader part is broken", pObj->vnode, pObj->sid, pObj->meterId,
fid);
goto _error_merge;
}
}
{ // Initialize data[] and cdata[], which is used to hold data to write to data file
size = pObj->bytesPerPoint * pVnode->cfg.rowsInFileBlock + (sizeof(SData) + EXTRA_BYTES) * pObj->numOfColumns;
buffer = (char *)malloc(size);
if (buffer == NULL) {
dError("vid: %d, sid: %d, meterId: %s, failed to allocate memory, size: %ul", pObj->vnode, pObj->sid,
pObj->meterId, size);
goto _error_merge;
}
cbuffer = (char *)malloc(size);
if (cbuffer == NULL) {
dError("vid: %d, sid: %d, meterId: %s, failed to allocate memory, size: %ul", pObj->vnode, pObj->sid,
pObj->meterId, size);
goto _error_merge;
}
data[0] = (SData *)buffer;
cdata[0] = (SData *)cbuffer;
for (int col = 1; col < pObj->numOfColumns; col++) {
data[col] = (SData *)((char *)data[col - 1] + sizeof(SData) + EXTRA_BYTES +
pObj->pointsPerFileBlock * pObj->schema[col - 1].bytes);
cdata[col] = (SData *)((char *)cdata[col - 1] + sizeof(SData) + EXTRA_BYTES +
pObj->pointsPerFileBlock * pObj->schema[col - 1].bytes);
}
}
if (importHandle.pHeader[pObj->sid].compInfoOffset == 0) { // No data in this file, just write it
_write_empty_point:
if (vnodeOpenTempFilesForImport(&importHandle, pObj, fid) < 0) {
goto _error_merge;
}
importHandle.oldNumOfBlocks = 0;
importHandle.driftOffset += sizeof(SCompInfo);
for (int rowsWritten = 0; rowsWritten < rows;) {
int rowsToWrite = MIN(pVnode->cfg.rowsInFileBlock, (rows - rowsWritten) /* the rows left */);
vnodeConvertRowsToCols(pObj, payload + rowsWritten * pObj->bytesPerPoint, rowsToWrite, data, 0);
pointsImported += rowsToWrite;
// TODO : Write the block to the file
compBlock.last = 1;
if (vnodeWriteBlockToFile(pObj, &compBlock, data, cdata, rowsToWrite) < 0) {
// TODO: deal with ERROR here
}
importHandle.last = compBlock.last;
checksum = taosCalcChecksum(checksum, (uint8_t *)(&compBlock), sizeof(SCompBlock));
twrite(pVnode->nfd, &compBlock, sizeof(SCompBlock));
importHandle.newNumOfBlocks++;
importHandle.driftOffset += sizeof(SCompBlock);
rowsWritten += rowsToWrite;
}
twrite(pVnode->nfd, &checksum, sizeof(TSCKSUM));
importHandle.driftOffset += sizeof(TSCKSUM);
} else { // Else if there are old data in this file.
{ // load SCompInfo and SCompBlock part
lseek(pVnode->hfd, importHandle.pHeader[pObj->sid].compInfoOffset, SEEK_SET);
if (read(pVnode->hfd, (void *)(&(importHandle.compInfo)), sizeof(SCompInfo)) < sizeof(SCompInfo)) {
dError("vid:%d sid:%d meterId:%s, failed to read .head file, reason:%s", pVnode->vnode, pObj->sid,
pObj->meterId, strerror(errno));
goto _error_merge;
}
if ((importHandle.compInfo.delimiter != TSDB_VNODE_DELIMITER) ||
(!taosCheckChecksumWhole((uint8_t *)(&(importHandle.compInfo)), sizeof(SCompInfo)))) {
dError("vid:%d sid:%d meterId:%s, .head file %s is broken, delemeter:%x", pVnode->vnode, pObj->sid,
pObj->meterId, pVnode->cfn, importHandle.compInfo.delimiter);
goto _error_merge;
}
{ // Check the context of SCompInfo part
if (importHandle.compInfo.uid != pObj->uid) { // The data belongs to the other meter
goto _write_empty_point;
}
}
importHandle.oldNumOfBlocks = importHandle.compInfo.numOfBlocks;
importHandle.last = importHandle.compInfo.last;
size = sizeof(SCompBlock) * importHandle.compInfo.numOfBlocks + sizeof(TSCKSUM);
importHandle.pBlocks = (SCompBlock *)malloc(size);
if (importHandle.pBlocks == NULL) {
dError("vid:%d sid:%d meterId:%s, failed to allocate importHandle.pBlock, size:%ul", pVnode->vnode, pObj->sid,
pObj->meterId, size);
goto _error_merge;
}
if (read(pVnode->hfd, (void *)(importHandle.pBlocks), size) < size) {
dError("vid:%d sid:%d meterId:%s, failed to read importHandle.pBlock, reason:%s", pVnode->vnode, pObj->sid,
pObj->meterId, strerror(errno));
goto _error_merge;
}
if (!taosCheckChecksumWhole((uint8_t *)(importHandle.pBlocks), size)) {
dError("vid:%d sid:%d meterId:%s, pBlock part is broken in %s", pVnode->vnode, pObj->sid, pObj->meterId,
pVnode->cfn);
goto _error_merge;
}
}
/* Now that we have _payload_ and _importHandle.pBlocks_, merge the payload into importHandle.pBlocks
*
* Input: payload, pObj->bytesPerPoint, rows, importHandle.pBlocks
*/
{
int payloadIter = 0;
SBlockIter blockIter = {0, 0, 0, 0};
while (1) {
if (payloadIter >= rows) { // payload end, break
// write the remaining blocks to the file
if (pVnode->nfd > 0) {
int blocksLeft = importHandle.compInfo.numOfBlocks - blockIter.oslot;
if (blocksLeft > 0) {
checksum = taosCalcChecksum(checksum, (uint8_t *)(importHandle.pBlocks + blockIter.oslot),
sizeof(SCompBlock) * blocksLeft);
if (twrite(pVnode->nfd, (void *)(importHandle.pBlocks + blockIter.oslot),
sizeof(SCompBlock) * blocksLeft) < 0) {
dError("vid:%d sid:%d meterId:%s, failed to write %s file, size:%ul, reason:%s", pVnode->vnode,
pObj->sid, pObj->meterId, pVnode->nfn, sizeof(SCompBlock) * blocksLeft, strerror(errno));
goto _error_merge;
}
}
if (twrite(pVnode->nfd, (void *)(&checksum), sizeof(TSCKSUM)) < 0) {
dError("vid:%d sid:%d meterId:%s, failed to write %s file, size:%ul, reason:%s", pVnode->vnode, pObj->sid,
pObj->meterId, pVnode->nfn, sizeof(TSCKSUM), strerror(errno));
goto _error_merge;
}
}
break;
}
if (blockIter.slot >= importHandle.compInfo.numOfBlocks) { // blocks end, break
assert(false);
// Should never come here
int rowsLeft = rows - payloadIter;
if (pVnode->nfd > 0 && rowsLeft > 0) {
// TODO : Convert into while here
vnodeConvertRowsToCols(pObj, payload + pObj->bytesPerPoint * payloadIter, rowsLeft, data, 0);
pointsImported++;
assert(importHandle.last == 0);
compBlock.last = 1;
if (vnodeWriteBlockToFile(pObj, &compBlock, data, cdata, rows - payloadIter) < 0) {
// TODO :
}
checksum = taosCalcChecksum(checksum, (uint8_t *)(&compBlock), sizeof(SCompBlock));
importHandle.newNumOfBlocks++;
importHandle.driftOffset += sizeof(SCompBlock);
importHandle.last = compBlock.last;
twrite(pVnode->nfd, (void *)(&compBlock), sizeof(SCompBlock));
twrite(pVnode->nfd, (void *)(&checksum), sizeof(TSCKSUM));
}
break;
}
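// Locate where this payload key falls among the existing blocks: before all of them, after all
// of them (possibly merging with a trailing .last block), or inside the range, in which case a
// binary search picks the block and a key search picks the position within it.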
TSKEY key = KEY_AT_INDEX(payload, pObj->bytesPerPoint, payloadIter);
{ // Binary search the (slot, pos) which is >= key as well as nextKey
int left = blockIter.slot;
int right = importHandle.compInfo.numOfBlocks - 1;
TSKEY minKey = importHandle.pBlocks[left].keyFirst;
TSKEY maxKey = importHandle.pBlocks[right].keyLast;
assert(minKey <= maxKey);
if (key < minKey) { // Case 1. write just ahead the blockIter.slot
blockIter.slot = left;
blockIter.pos = 0;
blockIter.nextKey = minKey;
} else if (key > maxKey) { // Case 2. write to the end
if (importHandle.pBlocks[right].last) { // Case 2.1 last block in .last file, need to merge
assert(importHandle.last != 0);
importHandle.last = 0;
blockIter.slot = right;
blockIter.pos = importHandle.pBlocks[right].numOfPoints;
} else { // Case 2.2 just write after the last block
blockIter.slot = right + 1;
blockIter.pos = 0;
}
blockIter.nextKey = maxFileKey + 1;
} else { // Case 3. need to search the block for slot and pos
if (key == minKey || key == maxKey) {
payloadIter++;
continue;
}
// Here: minKey < key < maxKey
int mid;
TSKEY blockMinKey;
TSKEY blockMaxKey;
// Binary search the slot
do {
mid = (left + right) / 2;
blockMinKey = importHandle.pBlocks[mid].keyFirst;
blockMaxKey = importHandle.pBlocks[mid].keyLast;
assert(blockMinKey <= blockMaxKey);
if (key < blockMinKey) {
right = mid;
} else if (key > blockMaxKey) {
left = mid + 1;
} else { /* blockMinKey <= key <= blockMaxKey */
break;
}
} while (left < right);
if (key == blockMinKey || key == blockMaxKey) { // duplicate key
payloadIter++;
continue;
}
// Get the slot
if (key > blockMaxKey) { /* pos = 0 or pos = ? */
blockIter.slot = mid + 1;
} else { /* key < blockMinKey (pos = 0) || (key > blockMinKey && key < blockMaxKey) (pos=?) */
blockIter.slot = mid;
}
// Get the pos
assert(blockIter.slot < importHandle.compInfo.numOfBlocks);
if (key == importHandle.pBlocks[blockIter.slot].keyFirst ||
key == importHandle.pBlocks[blockIter.slot].keyLast) {
payloadIter++;
continue;
}
assert(key < importHandle.pBlocks[blockIter.slot].keyLast);
/* */
if (key < importHandle.pBlocks[blockIter.slot].keyFirst) {
blockIter.pos = 0;
blockIter.nextKey = importHandle.pBlocks[blockIter.slot].keyFirst;
} else {
SCompBlock *pBlock = importHandle.pBlocks + blockIter.slot;
if (pBlock->sversion != pObj->sversion) { /*TODO*/
}
if (vnodeLoadNeededBlockData(pObj, &importHandle, blockIter.slot, DATA_LOAD_TIMESTAMP) < 0) {
}
int pos = (*vnodeSearchKeyFunc[pObj->searchAlgorithm])(
importHandle.data[PRIMARYKEY_TIMESTAMP_COL_INDEX]->data, pBlock->numOfPoints, key, TSQL_SO_ASC);
assert(pos != 0);
if (KEY_AT_INDEX(importHandle.data[PRIMARYKEY_TIMESTAMP_COL_INDEX]->data, sizeof(TSKEY), pos) == key) {
payloadIter++;
continue;
}
blockIter.pos = pos;
blockIter.nextKey = (blockIter.slot + 1 < importHandle.compInfo.numOfBlocks)
? importHandle.pBlocks[blockIter.slot + 1].keyFirst
: maxFileKey + 1;
// Need to merge with this block
if (importHandle.pBlocks[blockIter.slot].last) { // this is to merge with the last block
assert((blockIter.slot == (importHandle.compInfo.numOfBlocks - 1)));
importHandle.last = 0;
}
}
}
}
// Open the new .t file if not opened yet.
if (pVnode->nfd <= 0) {
if (vnodeOpenTempFilesForImport(&importHandle, pObj, fid) < 0) {
goto _error_merge;
}
}
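// Old blocks lying entirely before the insert position are copied to the .t file unchanged;
// only the block at blockIter.slot has to be merged and re-written when the key falls inside it.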
if (blockIter.slot > blockIter.oslot) { // write blocks in range [blockIter.oslot, blockIter.slot) to .t file
checksum = taosCalcChecksum(checksum, (uint8_t *)(importHandle.pBlocks + blockIter.oslot),
sizeof(SCompBlock) * (blockIter.slot - blockIter.oslot));
if (twrite(pVnode->nfd, (void *)(importHandle.pBlocks + blockIter.oslot),
sizeof(SCompBlock) * (blockIter.slot - blockIter.oslot)) < 0) {
dError("vid:%d sid:%d meterId:%s, failed to write %s file, size:%ul, reason:%s", pVnode->vnode, pObj->sid,
pObj->meterId, pVnode->nfn, sizeof(SCompBlock) * (blockIter.slot - blockIter.oslot),
strerror(errno));
goto _error_merge;
}
blockIter.oslot = blockIter.slot;
}
if (blockIter.pos == 0) { // No need to merge
// copy payload part to data
int rowOffset = 0;
for (; payloadIter < rows; rowOffset++) {
if (KEY_AT_INDEX(payload, pObj->bytesPerPoint, payloadIter) >= blockIter.nextKey) break;
vnodeConvertRowsToCols(pObj, payload + pObj->bytesPerPoint * payloadIter, 1, data, rowOffset);
pointsImported++;
payloadIter++;
}
// write directly to .data file
compBlock.last = 0;
if (vnodeWriteBlockToFile(pObj, &compBlock, data, cdata, rowOffset) < 0) {
// TODO: Deal with the ERROR here
}
checksum = taosCalcChecksum(checksum, (uint8_t *)(&compBlock), sizeof(SCompBlock));
if (twrite(pVnode->nfd, &compBlock, sizeof(SCompBlock)) < 0) {
// TODO : deal with the ERROR here
}
importHandle.newNumOfBlocks++;
importHandle.driftOffset += sizeof(SCompBlock);
} else { // Merge block and payload from payloadIter
if (vnodeLoadNeededBlockData(pObj, &importHandle, blockIter.slot,
DATA_LOAD_TIMESTAMP | DATA_LOAD_OTHER_DATA) < 0) { // Load necessary blocks
goto _error_merge;
}
importHandle.oldNumOfBlocks--;
importHandle.driftOffset -= sizeof(SCompBlock);
int rowOffset = blockIter.pos; // counter for data
// Copy the front part
for (int col = 0; col < pObj->numOfColumns; col++) {
memcpy((void *)(data[col]->data), (void *)(importHandle.data[col]->data),
pObj->schema[col].bytes * blockIter.pos);
}
// Merge part
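// Rows are taken in timestamp order, alternating between the loaded block (importHandle.data)
// and the payload; whenever rowsInFileBlock rows have accumulated they are written out as a
// full block, and the remainder is flushed as the last block after the loop.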
while (1) {
if (rowOffset >= pVnode->cfg.rowsInFileBlock) { // data full in a block to commit
compBlock.last = 0;
if (vnodeWriteBlockToFile(pObj, &compBlock, data, cdata, rowOffset) < 0) {
// TODO : deal with the ERROR here
}
checksum = taosCalcChecksum(checksum, (uint8_t *)(&compBlock), sizeof(SCompBlock));
if (twrite(pVnode->nfd, (void *)(&compBlock), sizeof(SCompBlock)) < 0) {
dError("vid:%d sid:%d meterId:%s, failed to write %s file, size:%ul, reason:%s", pVnode->vnode,
pObj->sid, pObj->meterId, pVnode->nfn, sizeof(SCompBlock), strerror(errno));
goto _error_merge;
}
importHandle.newNumOfBlocks++;
importHandle.driftOffset += sizeof(SCompBlock);
rowOffset = 0;
}
if ((payloadIter >= rows || KEY_AT_INDEX(payload, pObj->bytesPerPoint, payloadIter) >= blockIter.nextKey) &&
blockIter.pos >= importHandle.pBlocks[blockIter.slot].numOfPoints)
break;
if (payloadIter >= rows ||
KEY_AT_INDEX(payload, pObj->bytesPerPoint, payloadIter) >= blockIter.nextKey) { // payload end
for (int col = 0; col < pObj->numOfColumns; col++) {
memcpy(data[col]->data + rowOffset * pObj->schema[col].bytes,
importHandle.data[col]->data + pObj->schema[col].bytes * blockIter.pos, pObj->schema[col].bytes);
}
blockIter.pos++;
rowOffset++;
} else if (blockIter.pos >= importHandle.pBlocks[blockIter.slot].numOfPoints) { // block end
vnodeConvertRowsToCols(pObj, payload + pObj->bytesPerPoint * payloadIter, 1, data, rowOffset);
pointsImported++;
payloadIter++;
rowOffset++;
} else {
if (KEY_AT_INDEX(payload, pObj->bytesPerPoint, payloadIter) ==
KEY_AT_INDEX(importHandle.data[PRIMARYKEY_TIMESTAMP_COL_INDEX]->data, sizeof(TSKEY),
blockIter.pos)) { // duplicate key
payloadIter++;
continue;
} else if (KEY_AT_INDEX(payload, pObj->bytesPerPoint, payloadIter) <
KEY_AT_INDEX(importHandle.data[PRIMARYKEY_TIMESTAMP_COL_INDEX]->data, sizeof(TSKEY),
blockIter.pos)) {
vnodeConvertRowsToCols(pObj, payload + pObj->bytesPerPoint * payloadIter, 1, data, rowOffset);
pointsImported++;
payloadIter++;
rowOffset++;
} else {
for (int col = 0; col < pObj->numOfColumns; col++) {
memcpy(data[col]->data + rowOffset * pObj->schema[col].bytes,
importHandle.data[col]->data + pObj->schema[col].bytes * blockIter.pos,
pObj->schema[col].bytes);
}
blockIter.pos++;
rowOffset++;
}
}
}
if (rowOffset > 0) { // flush the remaining merged rows as the last block
compBlock.last = 0;
if (vnodeWriteBlockToFile(pObj, &compBlock, data, cdata, rowOffset) < 0) {
// TODO : deal with the ERROR here
}
checksum = taosCalcChecksum(checksum, (uint8_t *)(&compBlock), sizeof(SCompBlock));
if (twrite(pVnode->nfd, (void *)(&compBlock), sizeof(SCompBlock)) < 0) {
dError("vid:%d sid:%d meterId:%s, failed to write %s file, size:%ul, reason:%s", pVnode->vnode, pObj->sid,
pObj->meterId, pVnode->nfn, sizeof(SCompBlock), strerror(errno));
goto _error_merge;
}
importHandle.newNumOfBlocks++;
importHandle.driftOffset += sizeof(SCompBlock);
rowOffset = 0;
}
blockIter.slot++;
blockIter.oslot = blockIter.slot;
}
}
}
}
// Write the SCompInfo part
if (vnodeCloseImportFiles(pObj, &importHandle) < 0) {
goto _error_merge;
}
pImport->importedRows += pointsImported;
// TODO: free the allocated memory
tfree(buffer);
tfree(cbuffer);
tfree(importHandle.pHeader);
tfree(importHandle.pBlocks);
tfree(importHandle.pField);
tfree(importHandle.buffer);
tfree(importHandle.temp);
tfree(importHandle.tempBuffer);
return 0;
_error_merge:
tfree(buffer);
tfree(cbuffer);
tfree(importHandle.pHeader);
tfree(importHandle.pBlocks);
tfree(importHandle.pField);
tfree(importHandle.buffer);
tfree(importHandle.temp);
tfree(importHandle.tempBuffer);
close(pVnode->dfd);
pVnode->dfd = 0;
close(pVnode->hfd);
pVnode->hfd = 0;
close(pVnode->lfd);
pVnode->lfd = 0;
if (pVnode->nfd > 0) {
close(pVnode->nfd);
pVnode->nfd = 0;
remove(pVnode->nfn);
}
return -1;
}
#define FORWARD_ITER(iter, step, slotLimit, posLimit) \
{ \
if ((iter.pos) + (step) < (posLimit)) { \
(iter.pos) = (iter.pos) + (step); \
} else { \
(iter.pos) = 0; \
(iter.slot) = ((iter.slot) + 1) % (slotLimit); \
} \
}
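// FORWARD_ITER advances a cache iterator by `step` rows, wrapping to the start of the next
// cache block when the current block is exhausted. Illustrative use (not from this commit),
// assuming iter, pInfo and pObj are in scope:
//   FORWARD_ITER(iter, 1, pInfo->maxBlocks, pObj->pointsPerBlock);
//   if (isCacheEnd(iter, pObj)) { /* reached the current cache write position */ }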
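// isCacheEnd() reports whether the iterator has reached the cache write position, i.e. the
// slot/pos where the next inserted row would go.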
int isCacheEnd(SBlockIter iter, SMeterObj *pMeter) {
SCacheInfo *pInfo = (SCacheInfo *)(pMeter->pCache);
int slot = 0;
int pos = 0;
if (pInfo->cacheBlocks[pInfo->currentSlot]->numOfPoints == pMeter->pointsPerBlock) {
slot = (pInfo->currentSlot + 1) % (pInfo->maxBlocks);
pos = 0;
} else {
slot = pInfo->currentSlot;
pos = pInfo->cacheBlocks[pInfo->currentSlot]->numOfPoints;
}
return ((iter.slot == slot) && (iter.pos == pos));
}
int vnodeImportDataToCache(SImportInfo *pImport, const char *payload, const int rows) {
SMeterObj * pObj = pImport->pObj;
SVnodeObj * pVnode = vnodeList + pObj->vnode;
int code = -1;
SCacheInfo * pInfo = (SCacheInfo *)(pObj->pCache);
int payloadIter;
SCachePool * pPool = pVnode->pCachePool;
int isCacheIterEnd = 0;
int spayloadIter = 0;
int isAppendData = 0;
int rowsImported = 0;
int totalRows = 0;
size_t size = 0;
SMergeBuffer *pBuffer = NULL;
TSKEY firstKey = KEY_AT_INDEX(payload, pObj->bytesPerPoint, 0);
TSKEY lastKey = KEY_AT_INDEX(payload, pObj->bytesPerPoint, rows - 1);
assert(firstKey <= lastKey && firstKey > pObj->lastKeyOnFile);
// TODO: make this condition less strict
if (pObj->freePoints < rows || pObj->freePoints < (pObj->pointsPerBlock << 1)) { // No free room to hold the data
dError("vid:%d sid:%d id:%s, import failed, cache is full, freePoints:%d", pObj->vnode, pObj->sid, pObj->meterId,
pObj->freePoints);
pImport->importedRows = 0;
pImport->commit = 1;
code = TSDB_CODE_ACTION_IN_PROGRESS;
return code;
}
if (pInfo->numOfBlocks == 0) {
if (vnodeAllocateCacheBlock(pObj) < 0) {
// TODO: deal with the ERROR here
}
}
// Find the first importable record from payload
pImport->lastKey = lastKey;
for (payloadIter = 0; payloadIter < rows; payloadIter++) {
TSKEY key = KEY_AT_INDEX(payload, pObj->bytesPerPoint, payloadIter);
if (key == pObj->lastKey) continue;
if (key > pObj->lastKey) { // Just as insert
pImport->slot = pInfo->currentSlot;
pImport->pos = pInfo->cacheBlocks[pImport->slot]->numOfPoints;
isCacheIterEnd = 1;
break;
} else {
pImport->firstKey = key;
if (vnodeFindKeyInCache(pImport, 1) < 0) {
goto _exit;
}
if (pImport->firstKey != pImport->key) break;
}
}
if (payloadIter == rows) {
pImport->importedRows = 0;
code = 0;
goto _exit;
}
spayloadIter = payloadIter;
if (pImport->pos == pObj->pointsPerBlock) assert(isCacheIterEnd);
// Allocate a new merge buffer work as buffer
totalRows = pObj->pointsPerBlock + rows - payloadIter + 1;
size = sizeof(SMergeBuffer) + sizeof(char *) * pObj->numOfColumns + pObj->bytesPerPoint * totalRows;
pBuffer = (SMergeBuffer *)malloc(size);
if (pBuffer == NULL) {
dError("vid:%d sid:%d meterId:%s, failed to allocate memory, size:%d", pObj->vnode, pObj->sid, pObj->meterId, size);
return code;
}
pBuffer->spos = 0;
pBuffer->epos = 0;
pBuffer->totalRows = totalRows;
pBuffer->offset[0] = (char *)pBuffer + sizeof(SMergeBuffer) + sizeof(char *) * pObj->numOfColumns;
for (int col = 1; col < pObj->numOfColumns; col++) {
pBuffer->offset[col] = pBuffer->offset[col - 1] + pObj->schema[col - 1].bytes * totalRows;
}
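// The merge buffer is a circular row buffer (one block plus the remaining payload rows plus one
// spare slot) used to stage merged rows before they are written back into cache blocks, so that
// rows not yet read from the cache are never overwritten.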
// TODO: take pImport->pos = pObj->pointsPerBlock into consideration
{ // Do the merge work
SBlockIter cacheIter = {pImport->slot, pImport->pos, 0, 0}; // Iter to traverse old cache data
SBlockIter writeIter = {pImport->slot, pImport->pos, 0, 0}; // Iter to write data to cache
int availPoints = pObj->pointsPerBlock - pInfo->cacheBlocks[pInfo->currentSlot]->numOfPoints;
assert(availPoints >= 0);
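// cacheIter reads existing cache rows from the insert position onwards, writeIter writes the
// merged result back; availPoints counts the free rows left in the cache block currently being
// written.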
while (1) {
if ((payloadIter >= rows) && isCacheIterEnd) break;
if ((pBuffer->epos + 1) % pBuffer->totalRows == pBuffer->spos) { // merge buffer is full, flush
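// Flush buffered rows back into the cache, but never past cacheIter: rows at or after the read
// position have not been copied into the merge buffer yet.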
if (writeIter.pos == pObj->pointsPerBlock) {
writeIter.pos = 0;
writeIter.slot = (writeIter.slot + 1) % pInfo->maxBlocks;
}
while (pBuffer->spos != pBuffer->epos) {
if (writeIter.slot == cacheIter.slot && writeIter.pos == cacheIter.pos) break;
for (int col = 0; col < pObj->numOfColumns; col++) {
memcpy(pInfo->cacheBlocks[writeIter.slot]->offset[col] + pObj->schema[col].bytes * writeIter.pos,
pBuffer->offset[col] + pObj->schema[col].bytes * pBuffer->spos, pObj->schema[col].bytes);
}
if (writeIter.pos + 1 < pObj->pointsPerBlock) {
writeIter.pos++;
} else {
pInfo->cacheBlocks[writeIter.slot]->numOfPoints = writeIter.pos + 1;
writeIter.slot = (writeIter.slot + 1) % pInfo->maxBlocks;
writeIter.pos = 0;
}
pBuffer->spos = (pBuffer->spos + 1) % pBuffer->totalRows;
}
}
if ((payloadIter >= rows) ||
((!isCacheIterEnd) &&
(KEY_AT_INDEX(payload, pObj->bytesPerPoint, payloadIter) >
KEY_AT_INDEX(pInfo->cacheBlocks[cacheIter.slot]->offset[0], sizeof(TSKEY),
cacheIter.pos)))) { // if (payload end || (cacheIter not end && payloadKey > blockKey))
for (int col = 0; col < pObj->numOfColumns; col++) {
memcpy(pBuffer->offset[col] + pObj->schema[col].bytes * pBuffer->epos,
pInfo->cacheBlocks[cacheIter.slot]->offset[col] + pObj->schema[col].bytes * cacheIter.pos,
pObj->schema[col].bytes);
}
FORWARD_ITER(cacheIter, 1, pInfo->maxBlocks, pObj->pointsPerBlock);
isCacheIterEnd = isCacheEnd(cacheIter, pObj);
} else if ((isCacheIterEnd) ||
((payloadIter < rows) &&
(KEY_AT_INDEX(payload, pObj->bytesPerPoint, payloadIter) <
KEY_AT_INDEX(pInfo->cacheBlocks[cacheIter.slot]->offset[0], sizeof(TSKEY),
cacheIter.pos)))) { // cacheIter end || (payloadIter not end && payloadKey < blockKey)
if (availPoints == 0) { // Need to allocate a new cache block
pthread_mutex_lock(&(pPool->vmutex));
SCacheBlock *pNewBlock = vnodeGetFreeCacheBlock(pVnode);
if (pNewBlock == NULL) { // Failed to allocate a new cache block
pthread_mutex_unlock(&(pPool->vmutex));
payloadIter = rows;
code = TSDB_CODE_ACTION_IN_PROGRESS;
pImport->commit = 1;
continue;
}
dTrace("vid:%d sid:%d id:%s, import %d rows data", pObj->vnode, pObj->sid, pObj->meterId, rows);
memset(&import, 0, sizeof(import));
import.firstKey = *((TSKEY *)(payload));
import.lastKey = *((TSKEY *)(pSubmit->payLoad + (rows - 1) * pObj->bytesPerPoint));
import.pObj = pObj;
import.pShell = pShell;
import.payload = payload;
import.rows = rows;
pNewBlock->pMeterObj = pObj;
pNewBlock->offset[0] = (char *)pNewBlock + sizeof(SCacheBlock) + sizeof(char *) * pObj->numOfColumns;
for (int col = 1; col < pObj->numOfColumns; col++)
pNewBlock->offset[col] = pNewBlock->offset[col - 1] + pObj->schema[col - 1].bytes * pObj->pointsPerBlock;
int newSlot = (writeIter.slot + 1) % pInfo->maxBlocks;
pInfo->blocks++;
int tblockId = pInfo->blocks;
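// A free cache block has been obtained; below, blocks between writeIter.slot and the current
// slot are shifted one slot forward to open slot newSlot for the new block.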
if (writeIter.slot != pInfo->currentSlot) {
for (int tslot = pInfo->currentSlot; tslot != writeIter.slot;) {
int nextSlot = (tslot + 1) % pInfo->maxBlocks;
pInfo->cacheBlocks[nextSlot] = pInfo->cacheBlocks[tslot];
pInfo->cacheBlocks[nextSlot]->slot = nextSlot;
pInfo->cacheBlocks[nextSlot]->blockId = tblockId--;
tslot = (tslot - 1 + pInfo->maxBlocks) % pInfo->maxBlocks;
}
}
pthread_mutex_lock(&pPool->vmutex);
if (((commitInProcess = pPool->commitInProcess) == 1) || num > 0) {
pthread_mutex_unlock(&pPool->vmutex);
int index = pNewBlock->index;
if (cacheIter.slot == writeIter.slot) {
pNewBlock->numOfPoints = pInfo->cacheBlocks[cacheIter.slot]->numOfPoints;
int pointsLeft = pInfo->cacheBlocks[cacheIter.slot]->numOfPoints - cacheIter.pos;
if (pointsLeft > 0) {
for (int col = 0; col < pObj->numOfColumns; col++) {
memcpy((void *)(pNewBlock->offset[col]),
pInfo->cacheBlocks[cacheIter.slot]->offset[col] + pObj->schema[col].bytes * cacheIter.pos,
pObj->schema[col].bytes * pointsLeft);
}
}
}
pNewBlock->blockId = tblockId;
pNewBlock->slot = newSlot;
pNewBlock->index = index;
pInfo->cacheBlocks[newSlot] = pNewBlock;
pInfo->numOfBlocks++;
pInfo->unCommittedBlocks++;
pInfo->currentSlot = (pInfo->currentSlot + 1) % pInfo->maxBlocks;
pthread_mutex_unlock(&(pPool->vmutex));
cacheIter.slot = (cacheIter.slot + 1) % pInfo->maxBlocks;
// move a cache of data forward
availPoints = pObj->pointsPerBlock;
}
int offset = 0;
for (int col = 0; col < pObj->numOfColumns; col++) {
memcpy(pBuffer->offset[col] + pObj->schema[col].bytes * pBuffer->epos,
payload + pObj->bytesPerPoint * payloadIter + offset, pObj->schema[col].bytes);
offset += pObj->schema[col].bytes;
}
if (spayloadIter == payloadIter) {// update pVnode->firstKey
pthread_mutex_lock(&(pVnode->vmutex));
if (KEY_AT_INDEX(payload, pObj->bytesPerPoint, payloadIter) < pVnode->firstKey) pVnode->firstKey = firstKey;
pthread_mutex_unlock(&(pVnode->vmutex));
}
if (isCacheIterEnd) {
pObj->lastKey = KEY_AT_INDEX(payload, pObj->bytesPerPoint, payloadIter);
if (!isAppendData) isAppendData = 1;
}
dTrace("vid:%d sid:%d id:%s, import later, commit in process:%d, numOfQueries:%d", pObj->vnode, pObj->sid,
pObj->meterId, commitInProcess, pObj->numOfQueries);
rowsImported++;
availPoints--;
payloadIter++;
} else {
payloadIter++;
continue;
}
pBuffer->epos = (pBuffer->epos + 1) % pBuffer->totalRows;
}
if (pBuffer->spos != pBuffer->epos) {
if (writeIter.pos == pObj->pointsPerBlock) {
writeIter.pos = 0;
writeIter.slot = (writeIter.slot + 1) % pInfo->maxBlocks;
}
while (pBuffer->spos != pBuffer->epos) {
for (int col = 0; col < pObj->numOfColumns; col++) {
memcpy(pInfo->cacheBlocks[writeIter.slot]->offset[col] + pObj->schema[col].bytes * writeIter.pos,
pBuffer->offset[col] + pObj->schema[col].bytes * pBuffer->spos, pObj->schema[col].bytes);
}
pVnode->version++;
if (writeIter.pos + 1 < pObj->pointsPerBlock) {
writeIter.pos++;
} else {
pInfo->cacheBlocks[writeIter.slot]->numOfPoints = writeIter.pos + 1;
writeIter.slot = (writeIter.slot + 1) % pInfo->maxBlocks;
writeIter.pos = 0;
}
pBuffer->spos = (pBuffer->spos + 1) % pBuffer->totalRows;
}
if (writeIter.pos != 0) pInfo->cacheBlocks[writeIter.slot]->numOfPoints = writeIter.pos;
}
if (isAppendData) {
pthread_mutex_lock(&(pVnode->vmutex));
if (pObj->lastKey > pVnode->lastKey) pVnode->lastKey = pObj->lastKey;
pthread_mutex_unlock(&(pVnode->vmutex));
}
}
pImport->importedRows += rowsImported;
code = 0;
_exit:
tfree(pBuffer);
return code;
}
//todo abort from the procedure if the meter is going to be dropped
int vnodeImportDataToFiles(SImportInfo *pImport, char *payload, const int rows) {
int code = 0;
// TODO : Check the correctness of pObj and pVnode
SMeterObj *pObj = (SMeterObj *)(pImport->pObj);
SVnodeObj *pVnode = vnodeList + pObj->vnode;
int64_t delta = pVnode->cfg.daysPerFile * tsMsPerDay[pVnode->cfg.precision];
int sfid = KEY_AT_INDEX(payload, pObj->bytesPerPoint, 0) / delta;
int efid = KEY_AT_INDEX(payload, pObj->bytesPerPoint, rows - 1) / delta;
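// The payload is split by file id (one data file per daysPerFile interval) and each slice is
// merged into its own file.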
for (int fid = sfid; fid <= efid; fid++) {
TSKEY skey = fid * delta;
TSKEY ekey = skey + delta - 1;
int srow = 0, nrows = 0;
if (vnodeSearchKeyInRange(payload, pObj->bytesPerPoint, rows, skey, ekey, &srow, &nrows) < 0) continue;
assert(nrows > 0);
dTrace("vid:%d sid:%d meterId:%s, %d rows of data will be imported to file %d, srow:%d firstKey:%ld lastKey:%ld",
pObj->vnode, pObj->sid, pObj->meterId, nrows, fid, srow, KEY_AT_INDEX(payload, pObj->bytesPerPoint, srow),
KEY_AT_INDEX(payload, pObj->bytesPerPoint, (srow + nrows - 1)));
code = vnodeMergeDataIntoFile(pImport, payload + (srow * pObj->bytesPerPoint), nrows, fid);
if (code != 0) break;
}
SVnodeObj *pVnode = &vnodeList[pObj->vnode];
SCachePool *pPool = (SCachePool *)pVnode->pCachePool;
return code;
}
// TODO : add offset in pShell to make it avoid repeatedly deal with messages
int vnodeImportData(SMeterObj *pObj, SImportInfo *pImport) {
int code = 0;
int srow = 0, nrows = 0;
SVnodeObj * pVnode = vnodeList + pObj->vnode;
SCachePool *pPool = (SCachePool *)(pVnode->pCachePool);
// 1. import data in range (pObj->lastKeyOnFile, INT64_MAX) into cache
if (vnodeSearchKeyInRange(pImport->payload, pObj->bytesPerPoint, pImport->rows, pObj->lastKeyOnFile + 1, INT64_MAX,
&srow, &nrows) >= 0) {
code = vnodeImportDataToCache(pImport, pImport->payload + pObj->bytesPerPoint * srow, nrows);
if (pImport->commit) { // Need to commit now
pPool->commitInProcess = 0;
vnodeProcessCommitTimer(pVnode, NULL);
return code;
}
if (code != 0) return code;
}
// 2. import data (0, pObj->lastKeyOnFile) into files
if (vnodeSearchKeyInRange(pImport->payload, pObj->bytesPerPoint, pImport->rows, 0, pObj->lastKeyOnFile - 1, &srow,
&nrows) >= 0) {
code = vnodeImportDataToFiles(pImport, pImport->payload + pObj->bytesPerPoint * srow, nrows);
}
pPool->commitInProcess = 0;
return code;
}