未验证 提交 22fe4b19 编写于 作者: J Jeff Tao 提交者: GitHub

Merge pull request #792 from taosdata/feature/mergeimport

Feature/mergeimport
...@@ -372,13 +372,60 @@ void vnodeCancelCommit(SVnodeObj *pVnode) { ...@@ -372,13 +372,60 @@ void vnodeCancelCommit(SVnodeObj *pVnode) {
taosTmrReset(vnodeProcessCommitTimer, pVnode->cfg.commitTime * 1000, pVnode, vnodeTmrCtrl, &pVnode->commitTimer); taosTmrReset(vnodeProcessCommitTimer, pVnode->cfg.commitTime * 1000, pVnode, vnodeTmrCtrl, &pVnode->commitTimer);
} }
/* The vnode cache lock should be hold before calling this interface
*/
SCacheBlock *vnodeGetFreeCacheBlock(SVnodeObj *pVnode) {
SCachePool *pPool = (SCachePool *)(pVnode->pCachePool);
SVnodeCfg *pCfg = &(pVnode->cfg);
SCacheBlock *pCacheBlock = NULL;
int skipped = 0;
while (1) {
pCacheBlock = (SCacheBlock *)(pPool->pMem[((int64_t)pPool->freeSlot)]);
if (pCacheBlock->blockId == 0) break;
if (pCacheBlock->notFree) {
pPool->freeSlot++;
pPool->freeSlot = pPool->freeSlot % pCfg->cacheNumOfBlocks.totalBlocks;
skipped++;
if (skipped > pPool->threshold) {
vnodeCreateCommitThread(pVnode);
pthread_mutex_unlock(&pPool->vmutex);
dError("vid:%d committing process is too slow, notFreeSlots:%d....", pVnode->vnode, pPool->notFreeSlots);
return NULL;
}
} else {
SMeterObj * pRelObj = pCacheBlock->pMeterObj;
SCacheInfo *pRelInfo = (SCacheInfo *)pRelObj->pCache;
int firstSlot = (pRelInfo->currentSlot - pRelInfo->numOfBlocks + 1 + pRelInfo->maxBlocks) % pRelInfo->maxBlocks;
pCacheBlock = pRelInfo->cacheBlocks[firstSlot];
if (pCacheBlock) {
pPool->freeSlot = pCacheBlock->index;
vnodeFreeCacheBlock(pCacheBlock);
break;
} else {
pPool->freeSlot = (pPool->freeSlot + 1) % pCfg->cacheNumOfBlocks.totalBlocks;
skipped++;
}
}
}
pCacheBlock = (SCacheBlock *)(pPool->pMem[pPool->freeSlot]);
pCacheBlock->index = pPool->freeSlot;
pCacheBlock->notFree = 1;
pPool->freeSlot = (pPool->freeSlot + 1) % pCfg->cacheNumOfBlocks.totalBlocks;
pPool->notFreeSlots++;
return pCacheBlock;
}
int vnodeAllocateCacheBlock(SMeterObj *pObj) { int vnodeAllocateCacheBlock(SMeterObj *pObj) {
int index; int index;
SCachePool * pPool; SCachePool * pPool;
SCacheBlock *pCacheBlock; SCacheBlock *pCacheBlock;
SCacheInfo * pInfo; SCacheInfo * pInfo;
SVnodeObj * pVnode; SVnodeObj * pVnode;
int skipped = 0, commit = 0; int commit = 0;
pVnode = vnodeList + pObj->vnode; pVnode = vnodeList + pObj->vnode;
pPool = (SCachePool *)pVnode->pCachePool; pPool = (SCachePool *)pVnode->pCachePool;
...@@ -406,45 +453,10 @@ int vnodeAllocateCacheBlock(SMeterObj *pObj) { ...@@ -406,45 +453,10 @@ int vnodeAllocateCacheBlock(SMeterObj *pObj) {
return -1; return -1;
} }
while (1) { if ((pCacheBlock = vnodeGetFreeCacheBlock(pVnode)) == NULL) return -1;
pCacheBlock = (SCacheBlock *)(pPool->pMem[((int64_t)pPool->freeSlot)]);
if (pCacheBlock->blockId == 0) break;
if (pCacheBlock->notFree) {
pPool->freeSlot++;
pPool->freeSlot = pPool->freeSlot % pCfg->cacheNumOfBlocks.totalBlocks;
skipped++;
if (skipped > pPool->threshold) {
vnodeCreateCommitThread(pVnode);
pthread_mutex_unlock(&pPool->vmutex);
dError("vid:%d sid:%d id:%s, committing process is too slow, notFreeSlots:%d....",
pObj->vnode, pObj->sid, pObj->meterId, pPool->notFreeSlots);
return -1;
}
} else {
SMeterObj *pRelObj = pCacheBlock->pMeterObj;
SCacheInfo *pRelInfo = (SCacheInfo *)pRelObj->pCache;
int firstSlot = (pRelInfo->currentSlot - pRelInfo->numOfBlocks + 1 + pRelInfo->maxBlocks) % pRelInfo->maxBlocks;
pCacheBlock = pRelInfo->cacheBlocks[firstSlot];
if (pCacheBlock) {
pPool->freeSlot = pCacheBlock->index;
vnodeFreeCacheBlock(pCacheBlock);
break;
} else {
pPool->freeSlot = (pPool->freeSlot + 1) % pCfg->cacheNumOfBlocks.totalBlocks;
skipped++;
}
}
}
index = pPool->freeSlot;
pPool->freeSlot++;
pPool->freeSlot = pPool->freeSlot % pCfg->cacheNumOfBlocks.totalBlocks;
pPool->notFreeSlots++;
index = pCacheBlock->index;
pCacheBlock->pMeterObj = pObj; pCacheBlock->pMeterObj = pObj;
pCacheBlock->notFree = 1;
pCacheBlock->index = index;
pCacheBlock->offset[0] = ((char *)(pCacheBlock)) + sizeof(SCacheBlock) + pObj->numOfColumns * sizeof(char *); pCacheBlock->offset[0] = ((char *)(pCacheBlock)) + sizeof(SCacheBlock) + pObj->numOfColumns * sizeof(char *);
for (int col = 1; col < pObj->numOfColumns; ++col) for (int col = 1; col < pObj->numOfColumns; ++col)
......
...@@ -95,8 +95,8 @@ void vnodeGetDnameFromLname(char *lhead, char *ldata, char *llast, char *dhead, ...@@ -95,8 +95,8 @@ void vnodeGetDnameFromLname(char *lhead, char *ldata, char *llast, char *dhead,
} }
void vnodeGetHeadTname(char *nHeadName, char *nLastName, int vnode, int fileId) { void vnodeGetHeadTname(char *nHeadName, char *nLastName, int vnode, int fileId) {
sprintf(nHeadName, "%s/vnode%d/db/v%df%d.t", tsDirectory, vnode, vnode, fileId); if (nHeadName != NULL) sprintf(nHeadName, "%s/vnode%d/db/v%df%d.t", tsDirectory, vnode, vnode, fileId);
sprintf(nLastName, "%s/vnode%d/db/v%df%d.l", tsDirectory, vnode, vnode, fileId); if (nLastName != NULL) sprintf(nLastName, "%s/vnode%d/db/v%df%d.l", tsDirectory, vnode, vnode, fileId);
} }
void vnodeCreateDataDirIfNeeded(int vnode, char *path) { void vnodeCreateDataDirIfNeeded(int vnode, char *path) {
...@@ -180,29 +180,24 @@ int vnodeCreateEmptyCompFile(int vnode, int fileId) { ...@@ -180,29 +180,24 @@ int vnodeCreateEmptyCompFile(int vnode, int fileId) {
return 0; return 0;
} }
int vnodeOpenCommitFiles(SVnodeObj *pVnode, int noTempLast) { int vnodeCreateNeccessaryFiles(SVnodeObj *pVnode) {
char name[TSDB_FILENAME_LEN]; int numOfFiles = 0, fileId, filesAdded = 0;
char dHeadName[TSDB_FILENAME_LEN] = "\0"; int vnode = pVnode->vnode;
char dLastName[TSDB_FILENAME_LEN] = "\0"; SVnodeCfg *pCfg = &(pVnode->cfg);
int len = 0;
struct stat filestat;
int vnode = pVnode->vnode;
int fileId, numOfFiles, filesAdded = 0;
SVnodeCfg * pCfg = &pVnode->cfg;
if (pVnode->lastKeyOnFile == 0) { if (pVnode->lastKeyOnFile == 0) {
if (pCfg->daysPerFile == 0) pCfg->daysPerFile = 10; if (pCfg->daysPerFile == 0) pCfg->daysPerFile = 10;
pVnode->fileId = pVnode->firstKey / tsMsPerDay[pVnode->cfg.precision] / pCfg->daysPerFile; pVnode->fileId = pVnode->firstKey / tsMsPerDay[pVnode->cfg.precision] / pCfg->daysPerFile;
pVnode->lastKeyOnFile = (int64_t)(pVnode->fileId + 1) * pCfg->daysPerFile * tsMsPerDay[pVnode->cfg.precision] - 1; pVnode->lastKeyOnFile = (int64_t)(pVnode->fileId + 1) * pCfg->daysPerFile * tsMsPerDay[pVnode->cfg.precision] - 1;
pVnode->numOfFiles = 1; pVnode->numOfFiles = 1;
vnodeCreateEmptyCompFile(vnode, pVnode->fileId); if (vnodeCreateEmptyCompFile(vnode, pVnode->fileId) < 0) return -1;
} }
numOfFiles = (pVnode->lastKeyOnFile - pVnode->commitFirstKey) / tsMsPerDay[pVnode->cfg.precision] / pCfg->daysPerFile; numOfFiles = (pVnode->lastKeyOnFile - pVnode->commitFirstKey) / tsMsPerDay[pVnode->cfg.precision] / pCfg->daysPerFile;
if (pVnode->commitFirstKey > pVnode->lastKeyOnFile) numOfFiles = -1; if (pVnode->commitFirstKey > pVnode->lastKeyOnFile) numOfFiles = -1;
dTrace("vid:%d, commitFirstKey:%ld lastKeyOnFile:%ld numOfFiles:%d fileId:%d vnodeNumOfFiles:%d", dTrace("vid:%d, commitFirstKey:%ld lastKeyOnFile:%ld numOfFiles:%d fileId:%d vnodeNumOfFiles:%d", pVnode->vnode,
vnode, pVnode->commitFirstKey, pVnode->lastKeyOnFile, numOfFiles, pVnode->fileId, pVnode->numOfFiles); pVnode->commitFirstKey, pVnode->lastKeyOnFile, numOfFiles, pVnode->fileId, pVnode->numOfFiles);
if (numOfFiles >= pVnode->numOfFiles) { if (numOfFiles >= pVnode->numOfFiles) {
// create empty header files backward // create empty header files backward
...@@ -214,7 +209,7 @@ int vnodeOpenCommitFiles(SVnodeObj *pVnode, int noTempLast) { ...@@ -214,7 +209,7 @@ int vnodeOpenCommitFiles(SVnodeObj *pVnode, int noTempLast) {
#ifdef CLUSTER #ifdef CLUSTER
return vnodeRecoverFromPeer(pVnode, fileId); return vnodeRecoverFromPeer(pVnode, fileId);
#else #else
return -1; return -1;
#endif #endif
} }
} else if (numOfFiles < 0) { } else if (numOfFiles < 0) {
...@@ -224,7 +219,7 @@ int vnodeOpenCommitFiles(SVnodeObj *pVnode, int noTempLast) { ...@@ -224,7 +219,7 @@ int vnodeOpenCommitFiles(SVnodeObj *pVnode, int noTempLast) {
#ifdef CLUSTER #ifdef CLUSTER
return vnodeRecoverFromPeer(pVnode, pVnode->fileId); return vnodeRecoverFromPeer(pVnode, pVnode->fileId);
#else #else
return -1; return -1;
#endif #endif
pVnode->lastKeyOnFile += (int64_t)tsMsPerDay[pVnode->cfg.precision] * pCfg->daysPerFile; pVnode->lastKeyOnFile += (int64_t)tsMsPerDay[pVnode->cfg.precision] * pCfg->daysPerFile;
filesAdded = 1; filesAdded = 1;
...@@ -238,6 +233,24 @@ int vnodeOpenCommitFiles(SVnodeObj *pVnode, int noTempLast) { ...@@ -238,6 +233,24 @@ int vnodeOpenCommitFiles(SVnodeObj *pVnode, int noTempLast) {
pVnode->commitFileId = fileId; pVnode->commitFileId = fileId;
pVnode->numOfFiles = pVnode->numOfFiles + filesAdded; pVnode->numOfFiles = pVnode->numOfFiles + filesAdded;
return 0;
}
int vnodeOpenCommitFiles(SVnodeObj *pVnode, int noTempLast) {
char name[TSDB_FILENAME_LEN];
char dHeadName[TSDB_FILENAME_LEN] = "\0";
char dLastName[TSDB_FILENAME_LEN] = "\0";
int len = 0;
struct stat filestat;
int vnode = pVnode->vnode;
int fileId, numOfFiles, filesAdded = 0;
SVnodeCfg * pCfg = &pVnode->cfg;
if (vnodeCreateNeccessaryFiles(pVnode) < 0) return -1;
fileId = pVnode->commitFileId;
dTrace("vid:%d, commit fileId:%d, commitLastKey:%ld, vnodeLastKey:%ld, lastKeyOnFile:%ld numOfFiles:%d", dTrace("vid:%d, commit fileId:%d, commitLastKey:%ld, vnodeLastKey:%ld, lastKeyOnFile:%ld numOfFiles:%d",
vnode, fileId, pVnode->commitLastKey, pVnode->lastKey, pVnode->lastKeyOnFile, pVnode->numOfFiles); vnode, fileId, pVnode->commitLastKey, pVnode->lastKey, pVnode->lastKeyOnFile, pVnode->numOfFiles);
......
...@@ -14,29 +14,26 @@ ...@@ -14,29 +14,26 @@
*/ */
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "os.h" #include <arpa/inet.h>
#include <fcntl.h>
#include <string.h>
#include <sys/stat.h>
#include <unistd.h>
#include "trpc.h"
#include "ttimer.h"
#include "vnode.h" #include "vnode.h"
#include "vnodeMgmt.h"
#include "vnodeShell.h"
#include "vnodeShell.h"
#include "vnodeUtil.h" #include "vnodeUtil.h"
#pragma GCC diagnostic ignored "-Wpointer-sign"
#pragma GCC diagnostic ignored "-Wint-conversion"
typedef struct {
SCompHeader *headList;
SCompInfo compInfo;
int last; // 0:last block in data file, 1:not the last block
int newBlocks;
int oldNumOfBlocks;
int64_t compInfoOffset; // offset for compInfo in head file
int64_t leftOffset; // copy from this offset to end of head file
int64_t hfdSize; // old head file size
} SHeadInfo;
extern void vnodeGetHeadTname(char *nHeadName, char *nLastName, int vnode, int fileId);
extern int vnodeReadColumnToMem(int fd, SCompBlock *pBlock, SField **fields, int col, char *data, int dataSize,
char *temp, char *buffer, int bufferSize);
extern int vnodeSendShellSubmitRspMsg(SShellObj *pObj, int code, int numOfPoints);
extern void vnodeGetHeadDataLname(char *headName, char *dataName, char *lastName, int vnode, int fileId);
extern int vnodeCreateEmptyCompFile(int vnode, int fileId);
extern int vnodeUpdateFreeSlot(SVnodeObj *pVnode);
extern SCacheBlock *vnodeGetFreeCacheBlock(SVnodeObj *pVnode);
extern int vnodeCreateNeccessaryFiles(SVnodeObj *pVnode);
#define KEY_AT_INDEX(payload, step, idx) (*(TSKEY *)((char *)(payload) + (step) * (idx)))
typedef struct { typedef struct {
void * signature; void * signature;
SShellObj *pShell; SShellObj *pShell;
...@@ -53,952 +50,1507 @@ typedef struct { ...@@ -53,952 +50,1507 @@ typedef struct {
// only for file // only for file
int numOfPoints; int numOfPoints;
int fileId;
int64_t offset; // offset in data file int64_t offset; // offset in data file
SData *sdata[TSDB_MAX_COLUMNS]; char * payload;
char *buffer; char * opayload; // allocated space for payload from client
char *payload;
char *opayload;
int rows; int rows;
} SImportInfo; } SImportInfo;
int vnodeImportData(SMeterObj *pObj, SImportInfo *pImport); typedef struct {
// in .head file
SCompHeader *pHeader;
size_t pHeaderSize;
int vnodeGetImportStartPart(SMeterObj *pObj, char *payload, int rows, TSKEY key1) { SCompInfo compInfo;
int i; SCompBlock *pBlocks;
// in .data file
int blockId;
uint8_t blockLoadState;
for (i = 0; i < rows; ++i) { SField *pField;
TSKEY key = *((TSKEY *)(payload + i * pObj->bytesPerPoint)); size_t pFieldSize;
if (key >= key1) break;
}
return i; SData *data[TSDB_MAX_COLUMNS];
} char * buffer;
int vnodeGetImportEndPart(SMeterObj *pObj, char *payload, int rows, char **pStart, TSKEY key0) { char *temp;
int i;
for (i = 0; i < rows; ++i) { char * tempBuffer;
TSKEY key = *((TSKEY *)(payload + i * pObj->bytesPerPoint)); size_t tempBufferSize;
if (key > key0) break; // Variables for sendfile
} int64_t compInfoOffset;
int64_t nextNo0Offset; // next sid whose compInfoOffset > 0
int64_t hfSize;
int64_t driftOffset;
*pStart = payload + i * pObj->bytesPerPoint; int oldNumOfBlocks;
return rows - i; int newNumOfBlocks;
} int last;
} SImportHandle;
int vnodeCloseFileForImport(SMeterObj *pObj, SHeadInfo *pHinfo) { typedef struct {
SVnodeObj *pVnode = &vnodeList[pObj->vnode]; int slot;
SVnodeCfg *pCfg = &pVnode->cfg; int pos;
TSCKSUM chksum = 0; int oslot; // old slot
TSKEY nextKey;
} SBlockIter;
if (pHinfo->newBlocks == 0 || pHinfo->compInfoOffset == 0) return 0; typedef struct {
int64_t spos;
int64_t epos;
int64_t totalRows;
char * offset[];
} SMergeBuffer;
if (pHinfo->oldNumOfBlocks == 0) twrite(pVnode->nfd, &chksum, sizeof(TSCKSUM)); int vnodeImportData(SMeterObj *pObj, SImportInfo *pImport);
int leftSize = pHinfo->hfdSize - pHinfo->leftOffset; int vnodeFindKeyInCache(SImportInfo *pImport, int order) {
if (leftSize > 0) { SMeterObj * pObj = pImport->pObj;
lseek(pVnode->hfd, pHinfo->leftOffset, SEEK_SET); int code = 0;
tsendfile(pVnode->nfd, pVnode->hfd, NULL, leftSize); SQuery query;
} SCacheInfo *pInfo = (SCacheInfo *)pObj->pCache;
TSKEY key = order ? pImport->firstKey : pImport->lastKey;
memset(&query, 0, sizeof(query));
query.order.order = order;
query.skey = key;
query.ekey = order ? pImport->lastKey : pImport->firstKey;
vnodeSearchPointInCache(pObj, &query);
pHinfo->compInfo.numOfBlocks += pHinfo->newBlocks; if (query.slot < 0) {
int offset = (pHinfo->compInfo.numOfBlocks - pHinfo->oldNumOfBlocks) * sizeof(SCompBlock); pImport->slot = pInfo->commitSlot;
if (pHinfo->oldNumOfBlocks == 0) offset += sizeof(SCompInfo) + sizeof(TSCKSUM); if (pInfo->commitPoint >= pObj->pointsPerBlock) pImport->slot = (pImport->slot + 1) % pInfo->maxBlocks;
pImport->pos = 0;
pImport->key = 0;
dTrace("vid:%d sid:%d id:%s, key:%ld, import to head of cache", pObj->vnode, pObj->sid, pObj->meterId, key);
code = 0;
} else {
pImport->slot = query.slot;
pImport->pos = query.pos;
pImport->key = query.key;
pHinfo->headList[pObj->sid].compInfoOffset = pHinfo->compInfoOffset; if (key != query.key) {
for (int sid = pObj->sid + 1; sid < pCfg->maxSessions; ++sid) { if (order == 0) {
if (pHinfo->headList[sid].compInfoOffset) pHinfo->headList[sid].compInfoOffset += offset; // since pos is the position which has smaller key, data shall be imported after it
pImport->pos++;
if (pImport->pos >= pObj->pointsPerBlock) {
pImport->slot = (pImport->slot + 1) % pInfo->maxBlocks;
pImport->pos = 0;
}
} else {
if (pImport->pos < 0) pImport->pos = 0;
}
}
code = 0;
} }
lseek(pVnode->nfd, TSDB_FILE_HEADER_LEN, SEEK_SET); return code;
int tmsize = sizeof(SCompHeader) * pCfg->maxSessions + sizeof(TSCKSUM); }
taosCalcChecksumAppend(0, (uint8_t *)pHinfo->headList, tmsize);
twrite(pVnode->nfd, pHinfo->headList, tmsize);
int size = pHinfo->compInfo.numOfBlocks * sizeof(SCompBlock); void vnodeGetValidDataRange(int vnode, TSKEY now, TSKEY *minKey, TSKEY *maxKey) {
char *buffer = malloc(size); SVnodeObj *pVnode = vnodeList + vnode;
lseek(pVnode->nfd, pHinfo->compInfoOffset + sizeof(SCompInfo), SEEK_SET);
read(pVnode->nfd, buffer, size);
SCompBlock *pBlock = (SCompBlock *)(buffer + (pHinfo->compInfo.numOfBlocks - 1) * sizeof(SCompBlock));
pHinfo->compInfo.uid = pObj->uid; int64_t delta = pVnode->cfg.daysPerFile * tsMsPerDay[pVnode->cfg.precision];
pHinfo->compInfo.delimiter = TSDB_VNODE_DELIMITER; int fid = now / delta;
pHinfo->compInfo.last = pBlock->last; *minKey = (fid - pVnode->maxFiles + 1) * delta;
*maxKey = (fid + 2) * delta - 1;
return;
}
taosCalcChecksumAppend(0, (uint8_t *)(&pHinfo->compInfo), sizeof(SCompInfo)); int vnodeImportPoints(SMeterObj *pObj, char *cont, int contLen, char source, void *param, int sversion,
lseek(pVnode->nfd, pHinfo->compInfoOffset, SEEK_SET); int *pNumOfPoints, TSKEY now) {
twrite(pVnode->nfd, &pHinfo->compInfo, sizeof(SCompInfo)); SSubmitMsg *pSubmit = (SSubmitMsg *)cont;
SVnodeObj * pVnode = vnodeList + pObj->vnode;
int rows = 0;
char * payload = NULL;
int code = TSDB_CODE_SUCCESS;
SCachePool *pPool = (SCachePool *)(pVnode->pCachePool);
SShellObj * pShell = (SShellObj *)param;
TSKEY firstKey, lastKey;
chksum = taosCalcChecksum(0, (uint8_t *)buffer, size); payload = pSubmit->payLoad;
lseek(pVnode->nfd, pHinfo->compInfoOffset + sizeof(SCompInfo) + size, SEEK_SET);
twrite(pVnode->nfd, &chksum, sizeof(TSCKSUM));
free(buffer);
vnodeCloseCommitFiles(pVnode); rows = htons(pSubmit->numOfRows);
assert(rows > 0);
int expectedLen = rows * pObj->bytesPerPoint + sizeof(pSubmit->numOfRows);
if (expectedLen != contLen) {
dError("vid:%d sid:%d id:%s, invalid import, expected:%d, contLen:%d", pObj->vnode, pObj->sid, pObj->meterId,
expectedLen, contLen);
return TSDB_CODE_WRONG_MSG_SIZE;
}
return 0; // Check timestamp context.
} TSKEY minKey = 0, maxKey = 0;
firstKey = KEY_AT_INDEX(payload, pObj->bytesPerPoint, 0);
lastKey = KEY_AT_INDEX(payload, pObj->bytesPerPoint, rows - 1);
assert(firstKey <= lastKey);
vnodeGetValidDataRange(pObj->vnode, now, &minKey, &maxKey);
if (firstKey < minKey || firstKey > maxKey || lastKey < minKey || lastKey > maxKey) {
dError(
"vid:%d sid:%d id:%s, invalid timestamp to import, rows:%d firstKey: %ld lastKey: %ld minAllowedKey:%ld "
"maxAllowedKey:%ld",
pObj->vnode, pObj->sid, pObj->meterId, rows, firstKey, lastKey, minKey, maxKey);
return TSDB_CODE_TIMESTAMP_OUT_OF_RANGE;
}
// forward to peers
if (pShell && pVnode->cfg.replications > 1) {
code = vnodeForwardToPeer(pObj, cont, contLen, TSDB_ACTION_IMPORT, sversion);
if (code != 0) return code;
}
int vnodeProcessLastBlock(SImportInfo *pImport, SHeadInfo *pHinfo, SData *data[]) { if (pVnode->cfg.commitLog && source != TSDB_DATA_SOURCE_LOG) {
SMeterObj *pObj = pImport->pObj; if (pVnode->logFd < 0) return TSDB_CODE_INVALID_COMMIT_LOG;
SVnodeObj *pVnode = &vnodeList[pObj->vnode]; code = vnodeWriteToCommitLog(pObj, TSDB_ACTION_IMPORT, cont, contLen, sversion);
SCompBlock lastBlock; if (code != 0) return code;
int code = 0;
if (pHinfo->compInfo.last == 0) return 0;
// read into memory
uint64_t offset =
pHinfo->compInfoOffset + (pHinfo->compInfo.numOfBlocks - 1) * sizeof(SCompBlock) + sizeof(SCompInfo);
lseek(pVnode->hfd, offset, SEEK_SET);
read(pVnode->hfd, &lastBlock, sizeof(SCompBlock));
assert(lastBlock.last);
if (lastBlock.sversion != pObj->sversion) {
lseek(pVnode->lfd, lastBlock.offset, SEEK_SET);
lastBlock.offset = lseek(pVnode->dfd, 0, SEEK_END);
tsendfile(pVnode->dfd, pVnode->lfd, NULL, lastBlock.len);
lastBlock.last = 0;
lseek(pVnode->hfd, offset, SEEK_SET);
twrite(pVnode->hfd, &lastBlock, sizeof(SCompBlock));
} else {
vnodeReadLastBlockToMem(pObj, &lastBlock, data);
pHinfo->compInfo.numOfBlocks--;
code = lastBlock.numOfPoints;
} }
return code; /*
} * The timestamp of all records in a submit payload are always in ascending order, guaranteed by client, so here only
* the first key.
*/
if (firstKey > pObj->lastKey) { // Just call insert
code = vnodeInsertPoints(pObj, cont, contLen, TSDB_DATA_SOURCE_LOG, NULL, sversion, pNumOfPoints, now);
} else { // trigger import
if (sversion != pObj->sversion) {
dError("vid:%d sid:%d id:%s, invalid sversion, expected:%d received:%d", pObj->vnode, pObj->sid, pObj->meterId,
pObj->sversion, sversion);
return TSDB_CODE_OTHERS;
}
// check the table status for perform import historical data
if ((code = vnodeSetMeterInsertImportStateEx(pObj, TSDB_METER_STATE_IMPORTING)) != TSDB_CODE_SUCCESS) {
return code;
}
SImportInfo import = {0};
int vnodeOpenFileForImport(SImportInfo *pImport, char *payload, SHeadInfo *pHinfo, SData *data[]) { dTrace("vid:%d sid:%d id:%s, try to import %d rows data, firstKey:%ld, lastKey:%ld, object lastKey:%ld",
SMeterObj *pObj = pImport->pObj; pObj->vnode, pObj->sid, pObj->meterId, rows, firstKey, lastKey, pObj->lastKey);
SVnodeObj *pVnode = &vnodeList[pObj->vnode];
SVnodeCfg *pCfg = &pVnode->cfg; import.firstKey = firstKey;
TSKEY firstKey = *((TSKEY *)payload); import.lastKey = lastKey;
struct stat filestat; import.pObj = pObj;
int sid, rowsBefore = 0; import.pShell = pShell;
import.payload = payload;
import.rows = rows;
if (pVnode->nfd <= 0 || firstKey > pVnode->commitLastKey) { // FIXME: mutex here seems meaningless and num here still can be changed
if (pVnode->nfd > 0) vnodeCloseFileForImport(pObj, pHinfo); int32_t num = 0;
pthread_mutex_lock(&pVnode->vmutex);
num = pObj->numOfQueries;
pthread_mutex_unlock(&pVnode->vmutex);
pVnode->commitFirstKey = firstKey; int32_t commitInProcess = 0;
if (vnodeOpenCommitFiles(pVnode, pObj->sid) < 0) return -1;
fstat(pVnode->hfd, &filestat); pthread_mutex_lock(&pPool->vmutex);
pHinfo->hfdSize = filestat.st_size; if (((commitInProcess = pPool->commitInProcess) == 1) || num > 0) {
pHinfo->newBlocks = 0; // mutual exclusion with read (need to change here)
pHinfo->last = 1; // by default, new blockes are at the end of block list pthread_mutex_unlock(&pPool->vmutex);
vnodeClearMeterState(pObj, TSDB_METER_STATE_IMPORTING);
return TSDB_CODE_ACTION_IN_PROGRESS;
lseek(pVnode->hfd, TSDB_FILE_HEADER_LEN, SEEK_SET); } else {
read(pVnode->hfd, pHinfo->headList, sizeof(SCompHeader) * pCfg->maxSessions); pPool->commitInProcess = 1;
pthread_mutex_unlock(&pPool->vmutex);
code = vnodeImportData(pObj, &import);
*pNumOfPoints = import.importedRows;
}
pVnode->version++;
vnodeClearMeterState(pObj, TSDB_METER_STATE_IMPORTING);
}
return code;
}
if (pHinfo->headList[pObj->sid].compInfoOffset > 0) { /* Function to search keys in a range
lseek(pVnode->hfd, pHinfo->headList[pObj->sid].compInfoOffset, SEEK_SET); *
if (read(pVnode->hfd, &pHinfo->compInfo, sizeof(SCompInfo)) != sizeof(SCompInfo)) { * Assumption: keys in payload are in ascending order
dError("vid:%d sid:%d, failed to read compInfo from file:%s", pObj->vnode, pObj->sid, pVnode->cfn); *
return -1; * @payload: data records, key in ascending order
} * @step: bytes each record takes
* @rows: number of data records
* @skey: range start (included)
* @ekey: range end (included)
* @srows: rtype, start index of records
* @nrows: rtype, number of records in range
*
* @rtype: 0 means find data in the range
* -1 means find no data in the range
*/
static int vnodeSearchKeyInRange(char *payload, int step, int rows, TSKEY skey, TSKEY ekey, int *srow, int *nrows) {
if (rows <= 0 || KEY_AT_INDEX(payload, step, 0) > ekey || KEY_AT_INDEX(payload, step, rows - 1) < skey || skey > ekey)
return -1;
if (pHinfo->compInfo.uid == pObj->uid) { int left = 0;
pHinfo->compInfoOffset = pHinfo->headList[pObj->sid].compInfoOffset; int right = rows - 1;
pHinfo->leftOffset = pHinfo->headList[pObj->sid].compInfoOffset + sizeof(SCompInfo); int mid;
} else {
pHinfo->headList[pObj->sid].compInfoOffset = 0; // Binary search the first key in payload >= skey
} do {
} mid = (left + right) / 2;
if (skey < KEY_AT_INDEX(payload, step, mid)) {
right = mid;
} else if (skey > KEY_AT_INDEX(payload, step, mid)) {
left = mid + 1;
} else {
break;
}
} while (left < right);
if ( pHinfo->headList[pObj->sid].compInfoOffset == 0 ) { if (skey <= KEY_AT_INDEX(payload, step, mid)) {
memset(&pHinfo->compInfo, 0, sizeof(SCompInfo)); *srow = mid;
pHinfo->compInfo.uid = pObj->uid; } else {
if (mid + 1 >= rows) {
return -1;
} else {
*srow = mid + 1;
}
}
for (sid = pObj->sid + 1; sid < pCfg->maxSessions; ++sid) assert(skey <= KEY_AT_INDEX(payload, step, *srow));
if (pHinfo->headList[sid].compInfoOffset > 0) break;
pHinfo->compInfoOffset = (sid == pCfg->maxSessions) ? pHinfo->hfdSize : pHinfo->headList[sid].compInfoOffset; *nrows = 0;
pHinfo->leftOffset = pHinfo->compInfoOffset; for (int i = *srow; i < rows; i++) {
if (KEY_AT_INDEX(payload, step, i) <= ekey) {
(*nrows)++;
} else {
break;
} }
}
pHinfo->oldNumOfBlocks = pHinfo->compInfo.numOfBlocks; if (*nrows == 0) return -1;
lseek(pVnode->hfd, 0, SEEK_SET);
lseek(pVnode->nfd, 0, SEEK_SET);
tsendfile(pVnode->nfd, pVnode->hfd, NULL, pHinfo->compInfoOffset);
twrite(pVnode->nfd, &pHinfo->compInfo, sizeof(SCompInfo));
if (pHinfo->headList[pObj->sid].compInfoOffset > 0) lseek(pVnode->hfd, sizeof(SCompInfo), SEEK_CUR);
if (pVnode->commitFileId < pImport->fileId) { return 0;
if (pHinfo->compInfo.numOfBlocks > 0) }
pHinfo->leftOffset += pHinfo->compInfo.numOfBlocks * sizeof(SCompBlock);
rowsBefore = vnodeProcessLastBlock(pImport, pHinfo, data); int vnodeOpenMinFilesForImport(int vnode, int fid) {
char dname[TSDB_FILENAME_LEN] = "\0";
SVnodeObj * pVnode = vnodeList + vnode;
struct stat filestat;
int minFileSize;
// copy all existing compBlockInfo minFileSize = TSDB_FILE_HEADER_LEN + sizeof(SCompHeader) * pVnode->cfg.maxSessions + sizeof(TSCKSUM);
lseek(pVnode->hfd, pHinfo->compInfoOffset + sizeof(SCompInfo), SEEK_SET);
if (pHinfo->compInfo.numOfBlocks > 0)
tsendfile(pVnode->nfd, pVnode->hfd, NULL, pHinfo->compInfo.numOfBlocks * sizeof(SCompBlock));
} else if (pVnode->commitFileId == pImport->fileId) { vnodeGetHeadDataLname(pVnode->cfn, dname, pVnode->lfn, vnode, fid);
int slots = pImport->pos ? pImport->slot + 1 : pImport->slot;
pHinfo->leftOffset += slots * sizeof(SCompBlock);
// check if last block is at last file, if it is, read into memory // Open .head file
if (pImport->pos == 0 && pHinfo->compInfo.numOfBlocks > 0 && pImport->slot == pHinfo->compInfo.numOfBlocks && pVnode->hfd = open(pVnode->cfn, O_RDONLY);
pHinfo->compInfo.last) { if (pVnode->hfd < 0) {
rowsBefore = vnodeProcessLastBlock(pImport, pHinfo, data); dError("vid:%d, failed to open head file:%s, reason:%s", vnode, pVnode->cfn, strerror(errno));
if ( rowsBefore > 0 ) pImport->slot--; taosLogError("vid:%d, failed to open head file:%s, reason:%s", vnode, pVnode->cfn, strerror(errno));
} goto _error_open;
}
// this block will be replaced by new blocks fstat(pVnode->hfd, &filestat);
if (pImport->pos > 0) pHinfo->compInfo.numOfBlocks--; if (filestat.st_size < minFileSize) {
dError("vid:%d, head file:%s is corrupted", vnode, pVnode->cfn);
taosLogError("vid:%d, head file:%s corrupted", vnode, pVnode->cfn);
goto _error_open;
}
if (pImport->slot > 0) { // Open .data file
lseek(pVnode->hfd, pHinfo->compInfoOffset + sizeof(SCompInfo), SEEK_SET); pVnode->dfd = open(dname, O_RDWR);
tsendfile(pVnode->nfd, pVnode->hfd, NULL, pImport->slot * sizeof(SCompBlock)); if (pVnode->dfd < 0) {
} dError("vid:%d, failed to open data file:%s, reason:%s", vnode, dname, strerror(errno));
taosLogError("vid:%d, failed to open data file:%s, reason:%s", vnode, dname, strerror(errno));
goto _error_open;
}
if (pImport->slot < pHinfo->compInfo.numOfBlocks) fstat(pVnode->dfd, &filestat);
pHinfo->last = 0; // new blocks are not at the end of block list if (filestat.st_size < TSDB_FILE_HEADER_LEN) {
dError("vid:%d, data file:%s corrupted", vnode, dname);
taosLogError("vid:%d, data file:%s corrupted", vnode, dname);
goto _error_open;
}
} else { // Open .last file
// nothing pVnode->lfd = open(pVnode->lfn, O_RDWR);
if (pVnode->lfd < 0) {
dError("vid:%d, failed to open last file:%s, reason:%s", vnode, pVnode->lfn, strerror(errno));
taosLogError("vid:%d, failed to open last file:%s, reason:%s", vnode, pVnode->lfn, strerror(errno));
goto _error_open;
}
pHinfo->last = 0; // new blocks are not at the end of block list fstat(pVnode->lfd, &filestat);
} if (filestat.st_size < TSDB_FILE_HEADER_LEN) {
dError("vid:%d, last file:%s corrupted", vnode, pVnode->lfn);
taosLogError("vid:%d, last file:%s corrupted", vnode, pVnode->lfn);
goto _error_open;
} }
return rowsBefore; return 0;
}
_error_open:
if (pVnode->hfd > 0) close(pVnode->hfd);
pVnode->hfd = 0;
extern int vnodeSendShellSubmitRspMsg(SShellObj *pObj, int code, int numOfPoints); if (pVnode->dfd > 0) close(pVnode->dfd);
int vnodeImportToFile(SImportInfo *pImport); pVnode->dfd = 0;
void vnodeProcessImportTimer(void *param, void *tmrId) { if (pVnode->lfd > 0) close(pVnode->lfd);
SImportInfo *pImport = (SImportInfo *)param; pVnode->lfd = 0;
if (pImport == NULL || pImport->signature != param) {
dError("import timer is messed up, signature:%p", pImport); return -1;
return; }
/* Function to open .t file and sendfile the first part
*/
int vnodeOpenTempFilesForImport(SImportHandle *pHandle, SMeterObj *pObj, int fid) {
char dHeadName[TSDB_FILENAME_LEN] = "\0";
SVnodeObj * pVnode = vnodeList + pObj->vnode;
struct stat filestat;
int sid;
// cfn: .head
if (readlink(pVnode->cfn, dHeadName, TSDB_FILENAME_LEN) < 0) return -1;
size_t len = strlen(dHeadName);
// switch head name
switch (dHeadName[len - 1]) {
case '0':
dHeadName[len - 1] = '1';
break;
case '1':
dHeadName[len - 1] = '0';
break;
default:
dError("vid: %d, fid: %d, head target filename not end with 0 or 1", pVnode->vnode, fid);
return -1;
} }
SMeterObj *pObj = pImport->pObj; vnodeGetHeadTname(pVnode->nfn, NULL, pVnode->vnode, fid);
SVnodeObj *pVnode = &vnodeList[pObj->vnode]; if (symlink(dHeadName, pVnode->nfn) < 0) return -1;
SCachePool *pPool = (SCachePool *)pVnode->pCachePool;
SShellObj *pShell = pImport->pShell;
pImport->retry++; pVnode->nfd = open(pVnode->nfn, O_RDWR | O_CREAT | O_TRUNC, S_IRWXU | S_IRWXG | S_IRWXO);
if (pVnode->nfd < 0) {
int32_t code = vnodeSetMeterInsertImportStateEx(pObj, TSDB_METER_STATE_IMPORTING); dError("vid:%d, failed to open new head file:%s, reason:%s", pVnode->vnode, pVnode->nfn, strerror(errno));
if (code == TSDB_CODE_NOT_ACTIVE_TABLE) { taosLogError("vid:%d, failed to open new head file:%s, reason:%s", pVnode->vnode, pVnode->nfn, strerror(errno));
return; return -1;
} }
int32_t num = 0; fstat(pVnode->hfd, &filestat);
pthread_mutex_lock(&pVnode->vmutex); pHandle->hfSize = filestat.st_size;
num = pObj->numOfQueries;
pthread_mutex_unlock(&pVnode->vmutex);
//if the num == 0, it will never be increased before state is set to TSDB_METER_STATE_READY // Find the next sid whose compInfoOffset > 0
int32_t commitInProcess = 0; for (sid = pObj->sid + 1; sid < pVnode->cfg.maxSessions; sid++) {
pthread_mutex_lock(&pPool->vmutex); if (pHandle->pHeader[sid].compInfoOffset > 0) break;
if (((commitInProcess = pPool->commitInProcess) == 1) || num > 0 || code == TSDB_CODE_ACTION_IN_PROGRESS) { }
pthread_mutex_unlock(&pPool->vmutex);
vnodeClearMeterState(pObj, TSDB_METER_STATE_IMPORTING);
if (pImport->retry < 1000) { pHandle->nextNo0Offset = (sid == pVnode->cfg.maxSessions) ? pHandle->hfSize : pHandle->pHeader[sid].compInfoOffset;
dTrace("vid:%d sid:%d id:%s, import failed, retry later. commit in process or queries on it, or not ready."
"commitInProcess:%d, numOfQueries:%d, state:%d", pObj->vnode, pObj->sid, pObj->meterId,
commitInProcess, num, pObj->state);
taosTmrStart(vnodeProcessImportTimer, 10, pImport, vnodeTmrCtrl); // FIXME: sendfile the original part
return; // TODO: Here, we need to take the deleted table case in consideration, this function
} else { // just assume the case is handled before calling this function
pShell->code = TSDB_CODE_TOO_SLOW; if (pHandle->pHeader[pObj->sid].compInfoOffset > 0) {
} pHandle->compInfoOffset = pHandle->pHeader[pObj->sid].compInfoOffset;
} else { } else {
pPool->commitInProcess = 1; pHandle->compInfoOffset = pHandle->nextNo0Offset;
pthread_mutex_unlock(&pPool->vmutex);
int32_t ret = vnodeImportData(pObj, pImport);
if (pShell) {
pShell->code = ret;
pShell->numOfTotalPoints += pImport->importedRows;
}
} }
vnodeClearMeterState(pObj, TSDB_METER_STATE_IMPORTING); assert(pHandle->compInfoOffset <= pHandle->hfSize);
pVnode->version++;
// send response back to shell lseek(pVnode->hfd, 0, SEEK_SET);
if (pShell) { lseek(pVnode->nfd, 0, SEEK_SET);
pShell->count--; if (tsendfile(pVnode->nfd, pVnode->hfd, NULL, pHandle->compInfoOffset) < 0) {
if (pShell->count <= 0) vnodeSendShellSubmitRspMsg(pImport->pShell, pShell->code, pShell->numOfTotalPoints); return -1;
} }
pImport->signature = NULL; // Leave a SCompInfo space here
free(pImport->opayload); lseek(pVnode->nfd, sizeof(SCompInfo), SEEK_CUR);
free(pImport);
return 0;
} }
int vnodeImportToFile(SImportInfo *pImport) { typedef enum { DATA_LOAD_TIMESTAMP = 0x1, DATA_LOAD_OTHER_DATA = 0x2 } DataLoadMod;
SMeterObj *pObj = pImport->pObj;
SVnodeObj *pVnode = &vnodeList[pObj->vnode];
SVnodeCfg *pCfg = &pVnode->cfg;
SHeadInfo headInfo;
int code = 0, col;
SCompBlock compBlock;
char * payload = pImport->payload;
int rows = pImport->rows;
SCachePool *pPool = (SCachePool *)pVnode->pCachePool;
TSKEY lastKey = *((TSKEY *)(payload + pObj->bytesPerPoint * (rows - 1)));
TSKEY firstKey = *((TSKEY *)payload);
memset(&headInfo, 0, sizeof(headInfo));
headInfo.headList = malloc(sizeof(SCompHeader) * pCfg->maxSessions + sizeof(TSCKSUM));
SData *cdata[TSDB_MAX_COLUMNS];
char *buffer1 =
malloc(pObj->bytesPerPoint * pCfg->rowsInFileBlock + (sizeof(SData) + EXTRA_BYTES + sizeof(TSCKSUM)) * pObj->numOfColumns);
cdata[0] = (SData *)buffer1;
SData *data[TSDB_MAX_COLUMNS]; /* Function to load a block data at the requirement of mod
char *buffer2 = */
malloc(pObj->bytesPerPoint * pCfg->rowsInFileBlock + (sizeof(SData) + EXTRA_BYTES + sizeof(TSCKSUM)) * pObj->numOfColumns); static int vnodeLoadNeededBlockData(SMeterObj *pObj, SImportHandle *pHandle, int blockId, uint8_t loadMod, int *code) {
data[0] = (SData *)buffer2; size_t size;
SCompBlock *pBlock = pHandle->pBlocks + blockId;
*code = TSDB_CODE_SUCCESS;
for (col = 1; col < pObj->numOfColumns; ++col) { assert(pBlock->sversion == pObj->sversion);
cdata[col] = (SData *)(((char *)cdata[col - 1]) + sizeof(SData) + EXTRA_BYTES +
pObj->pointsPerFileBlock * pObj->schema[col - 1].bytes + sizeof(TSCKSUM));
data[col] = (SData *)(((char *)data[col - 1]) + sizeof(SData) + EXTRA_BYTES +
pObj->pointsPerFileBlock * pObj->schema[col - 1].bytes + sizeof(TSCKSUM));
}
int rowsBefore = 0; SVnodeObj *pVnode = vnodeList + pObj->vnode;
int rowsRead = 0;
int rowsUnread = 0;
int leftRows = rows; // left number of rows of imported data
int row, rowsToWrite;
int64_t offset[TSDB_MAX_COLUMNS];
if (pImport->pos > 0) { int dfd = pBlock->last ? pVnode->lfd : pVnode->dfd;
for (col = 0; col < pObj->numOfColumns; ++col)
memcpy(data[col]->data, pImport->sdata[col]->data, pImport->pos * pObj->schema[col].bytes);
rowsBefore = pImport->pos; if (pHandle->blockId != blockId) {
rowsRead = pImport->pos; pHandle->blockId = blockId;
rowsUnread = pImport->numOfPoints - pImport->pos; pHandle->blockLoadState = 0;
} }
dTrace("vid:%d sid:%d id:%s, %d rows data will be imported to file, firstKey:%ld lastKey:%ld", if (pHandle->blockLoadState == 0){ // Reload pField
pObj->vnode, pObj->sid, pObj->meterId, rows, firstKey, lastKey); size = sizeof(SField) * pBlock->numOfCols + sizeof(TSCKSUM);
do { if (pHandle->pFieldSize < size) {
if (leftRows > 0) { pHandle->pField = (SField *)realloc((void *)(pHandle->pField), size);
code = vnodeOpenFileForImport(pImport, payload, &headInfo, data); if (pHandle->pField == NULL) {
if (code < 0) goto _exit; dError("vid: %d, sid: %d, meterId: %s, failed to allocate memory, size: %ul", pObj->vnode, pObj->sid,
if (code > 0) { pObj->meterId, size);
rowsBefore = code; *code = TSDB_CODE_SERV_OUT_OF_MEMORY;
code = 0; return -1;
}; }
} else { pHandle->pFieldSize = size;
// if payload is already imported, rows unread shall still be processed
rowsBefore = 0;
} }
int rowsToProcess = pObj->pointsPerFileBlock - rowsBefore; lseek(dfd, pBlock->offset, SEEK_SET);
if (rowsToProcess > leftRows) rowsToProcess = leftRows; if (read(dfd, (void *)(pHandle->pField), pHandle->pFieldSize) < 0) {
dError("vid:%d sid:%d meterId:%s, failed to read data file, size:%ld reason:%s", pVnode->vnode, pObj->sid,
pObj->meterId, pHandle->pFieldSize, strerror(errno));
*code = TSDB_CODE_FILE_CORRUPTED;
return -1;
}
for (col = 0; col < pObj->numOfColumns; ++col) { if (!taosCheckChecksumWhole((uint8_t *)(pHandle->pField), pHandle->pFieldSize)) {
offset[col] = data[col]->data + rowsBefore * pObj->schema[col].bytes; dError("vid:%d sid:%d meterId:%s, data file %s is broken since checksum mismatch", pVnode->vnode, pObj->sid,
pObj->meterId, pVnode->lfn);
*code = TSDB_CODE_FILE_CORRUPTED;
return -1;
} }
}
row = 0; { // Allocate necessary buffer
if (leftRows > 0) { size = pObj->bytesPerPoint * pObj->pointsPerFileBlock +
for (row = 0; row < rowsToProcess; ++row) { (sizeof(SData) + EXTRA_BYTES + sizeof(TSCKSUM)) * pObj->numOfColumns;
if (*((TSKEY *)payload) > pVnode->commitLastKey) break; if (pHandle->buffer == NULL) {
pHandle->buffer = malloc(size);
if (pHandle->buffer == NULL) {
dError("vid: %d, sid: %d, meterId: %s, failed to allocate memory, size: %ul", pObj->vnode, pObj->sid,
pObj->meterId, size);
*code = TSDB_CODE_SERV_OUT_OF_MEMORY;
return -1;
}
for (col = 0; col < pObj->numOfColumns; ++col) { // TODO: Init data
memcpy((void *)offset[col], payload, pObj->schema[col].bytes); pHandle->data[0] = (SData *)(pHandle->buffer);
payload += pObj->schema[col].bytes; for (int col = 1; col < pObj->numOfColumns; col++) {
offset[col] += pObj->schema[col].bytes; pHandle->data[col] = (SData *)((char *)(pHandle->data[col - 1]) + sizeof(SData) + EXTRA_BYTES +
} sizeof(TSCKSUM) + pObj->pointsPerFileBlock * pObj->schema[col - 1].bytes);
} }
} }
leftRows -= row; if (pHandle->temp == NULL) {
rowsToWrite = rowsBefore + row; pHandle->temp = malloc(size);
rowsBefore = 0; if (pHandle->temp == NULL) {
dError("vid: %d, sid: %d, meterId: %s, failed to allocate memory, size: %ul", pObj->vnode, pObj->sid,
if (leftRows == 0 && rowsUnread > 0) { pObj->meterId, size);
// copy the unread *code = TSDB_CODE_SERV_OUT_OF_MEMORY;
int rowsToCopy = pObj->pointsPerFileBlock - rowsToWrite; return -1;
if (rowsToCopy > rowsUnread) rowsToCopy = rowsUnread;
for (col = 0; col < pObj->numOfColumns; ++col) {
int bytes = pObj->schema[col].bytes;
memcpy(data[col]->data + rowsToWrite * bytes, pImport->sdata[col]->data + rowsRead * bytes, rowsToCopy * bytes);
} }
rowsRead += rowsToCopy;
rowsUnread -= rowsToCopy;
rowsToWrite += rowsToCopy;
} }
for (col = 0; col < pObj->numOfColumns; ++col) { if (pHandle->tempBuffer == NULL) {
data[col]->len = rowsToWrite * pObj->schema[col].bytes; pHandle->tempBufferSize = pObj->maxBytes * pObj->pointsPerFileBlock + EXTRA_BYTES + sizeof(TSCKSUM);
pHandle->tempBuffer = malloc(pHandle->tempBufferSize);
if (pHandle->tempBuffer == NULL) {
dError("vid: %d, sid: %d, meterId: %s, failed to allocate memory, size: %ul", pObj->vnode, pObj->sid,
pObj->meterId, pHandle->tempBufferSize);
*code = TSDB_CODE_SERV_OUT_OF_MEMORY;
return -1;
}
} }
}
compBlock.last = headInfo.last; if ((loadMod & DATA_LOAD_TIMESTAMP) &&
vnodeWriteBlockToFile(pObj, &compBlock, data, cdata, rowsToWrite); (~(pHandle->blockLoadState & DATA_LOAD_TIMESTAMP))) { // load only timestamp part
twrite(pVnode->nfd, &compBlock, sizeof(SCompBlock)); if (vnodeReadColumnToMem(dfd, pBlock, &(pHandle->pField), PRIMARYKEY_TIMESTAMP_COL_INDEX,
pHandle->data[PRIMARYKEY_TIMESTAMP_COL_INDEX]->data, sizeof(TSKEY) * pBlock->numOfPoints,
rowsToWrite = 0; pHandle->temp, pHandle->tempBuffer, pHandle->tempBufferSize) < 0) {
headInfo.newBlocks++; *code = TSDB_CODE_FILE_CORRUPTED;
return -1;
} while (leftRows > 0 || rowsUnread > 0); }
if (compBlock.keyLast > pObj->lastKeyOnFile) pHandle->blockLoadState |= DATA_LOAD_TIMESTAMP;
pObj->lastKeyOnFile = compBlock.keyLast; }
vnodeCloseFileForImport(pObj, &headInfo); if ((loadMod & DATA_LOAD_OTHER_DATA) && (~(pHandle->blockLoadState & DATA_LOAD_OTHER_DATA))) { // load other columns
dTrace("vid:%d sid:%d id:%s, %d rows data are imported to file", pObj->vnode, pObj->sid, pObj->meterId, rows); for (int col = 1; col < pBlock->numOfCols; col++) {
if (vnodeReadColumnToMem(dfd, pBlock, &(pHandle->pField), col, pHandle->data[col]->data,
pBlock->numOfPoints * pObj->schema[col].bytes, pHandle->temp, pHandle->tempBuffer,
pHandle->tempBufferSize) < 0) {
*code = TSDB_CODE_FILE_CORRUPTED;
return -1;
}
}
SCacheInfo *pInfo = (SCacheInfo *)pObj->pCache; pHandle->blockLoadState |= DATA_LOAD_OTHER_DATA;
pthread_mutex_lock(&pPool->vmutex); }
if (pInfo->numOfBlocks > 0) { return 0;
int slot = (pInfo->currentSlot - pInfo->numOfBlocks + 1 + pInfo->maxBlocks) % pInfo->maxBlocks; }
TSKEY firstKeyInCache = *((TSKEY *)(pInfo->cacheBlocks[slot]->offset[0]));
// data may be in commited cache, cache shall be released static int vnodeCloseImportFiles(SMeterObj *pObj, SImportHandle *pHandle) {
if (lastKey > firstKeyInCache) { SVnodeObj *pVnode = vnodeList + pObj->vnode;
while (slot != pInfo->commitSlot) { char dpath[TSDB_FILENAME_LEN] = "\0";
SCacheBlock *pCacheBlock = pInfo->cacheBlocks[slot]; SCompInfo compInfo;
vnodeFreeCacheBlock(pCacheBlock); __off_t offset = 0;
slot = (slot + 1 + pInfo->maxBlocks) % pInfo->maxBlocks;
if (pVnode->nfd > 0) {
offset = lseek(pVnode->nfd, 0, SEEK_CUR);
assert(offset == pHandle->nextNo0Offset + pHandle->driftOffset);
{ // Write the SCompInfo part
compInfo.uid = pObj->uid;
compInfo.last = pHandle->last;
compInfo.numOfBlocks = pHandle->newNumOfBlocks + pHandle->oldNumOfBlocks;
compInfo.delimiter = TSDB_VNODE_DELIMITER;
taosCalcChecksumAppend(0, (uint8_t *)(&compInfo), sizeof(SCompInfo));
lseek(pVnode->nfd, pHandle->compInfoOffset, SEEK_SET);
if (twrite(pVnode->nfd, (void *)(&compInfo), sizeof(SCompInfo)) < 0) {
dError("vid:%d sid:%d meterId:%s, failed to wirte SCompInfo, reason:%s", pObj->vnode, pObj->sid, pObj->meterId,
strerror(errno));
return -1;
} }
}
// last slot, the uncommitted slots shall be shifted, a cache block may have empty rows // Write the rest of the SCompBlock part
SCacheBlock *pCacheBlock = pInfo->cacheBlocks[slot]; if (pHandle->hfSize > pHandle->nextNo0Offset) {
int points = pCacheBlock->numOfPoints - pInfo->commitPoint; lseek(pVnode->nfd, 0, SEEK_END);
if (points > 0) { lseek(pVnode->hfd, pHandle->nextNo0Offset, SEEK_SET);
for (int col = 0; col < pObj->numOfColumns; ++col) { if (tsendfile(pVnode->nfd, pVnode->hfd, NULL, pHandle->hfSize - pHandle->nextNo0Offset) < 0) {
int size = points * pObj->schema[col].bytes; dError("vid:%d sid:%d meterId:%s, failed to sendfile, size:%ld, reason:%s", pObj->vnode, pObj->sid,
memmove(pCacheBlock->offset[col], pCacheBlock->offset[col] + pObj->schema[col].bytes * pInfo->commitPoint, size); pObj->meterId, pHandle->hfSize - pHandle->nextNo0Offset, strerror(errno));
} return -1;
} }
}
if (pInfo->commitPoint != pObj->pointsPerBlock) { // Write SCompHeader part
// commit point shall be set to 0 if last block is not full pHandle->pHeader[pObj->sid].compInfoOffset = pHandle->compInfoOffset;
pInfo->commitPoint = 0; for (int sid = pObj->sid + 1; sid < pVnode->cfg.maxSessions; ++sid) {
pCacheBlock->numOfPoints = points; if (pHandle->pHeader[sid].compInfoOffset > 0) {
if (slot == pInfo->currentSlot) { pHandle->pHeader[sid].compInfoOffset += pHandle->driftOffset;
atomic_fetch_add_32(&pObj->freePoints, pInfo->commitPoint);
}
} else {
// if last block is full and committed
SCacheBlock *pCacheBlock = pInfo->cacheBlocks[slot];
if (pCacheBlock->pMeterObj == pObj) {
vnodeFreeCacheBlock(pCacheBlock);
}
} }
} }
taosCalcChecksumAppend(0, (uint8_t *)(pHandle->pHeader), pHandle->pHeaderSize);
lseek(pVnode->nfd, TSDB_FILE_HEADER_LEN, SEEK_SET);
if (twrite(pVnode->nfd, (void *)(pHandle->pHeader), pHandle->pHeaderSize) < 0) {
dError("vid:%d sid:%d meterId:%s, failed to wirte SCompHeader part, size:%ld, reason:%s", pObj->vnode, pObj->sid,
pObj->meterId, pHandle->pHeaderSize, strerror(errno));
return -1;
}
} }
if (lastKey > pObj->lastKeyOnFile) pObj->lastKeyOnFile = lastKey; // Close opened files
close(pVnode->dfd);
pVnode->dfd = 0;
pthread_mutex_unlock(&pPool->vmutex); close(pVnode->hfd);
pVnode->hfd = 0;
_exit: close(pVnode->lfd);
tfree(headInfo.headList); pVnode->lfd = 0;
tfree(buffer1);
tfree(buffer2);
tfree(pImport->buffer);
return code; if (pVnode->nfd > 0) {
close(pVnode->nfd);
pVnode->nfd = 0;
readlink(pVnode->cfn, dpath, TSDB_FILENAME_LEN);
rename(pVnode->nfn, pVnode->cfn);
remove(dpath);
}
return 0;
} }
int vnodeImportToCache(SImportInfo *pImport, char *payload, int rows) { static void vnodeConvertRowsToCols(SMeterObj *pObj, const char *payload, int rows, SData *data[], int rowOffset) {
SMeterObj *pObj = pImport->pObj; int sdataRow;
SVnodeObj *pVnode = &vnodeList[pObj->vnode]; int offset;
SVnodeCfg *pCfg = &pVnode->cfg;
int code = -1;
SCacheInfo *pInfo = (SCacheInfo *)pObj->pCache;
int slot, pos, row, col, points, tpoints;
char *data[TSDB_MAX_COLUMNS], *current[TSDB_MAX_COLUMNS]; for (int row = 0; row < rows; ++row) {
int slots = pInfo->unCommittedBlocks + 1; sdataRow = row + rowOffset;
int trows = slots * pObj->pointsPerBlock + rows; // max rows in buffer offset = 0;
int tsize = (trows / pObj->pointsPerBlock + 1) * pCfg->cacheBlockSize; for (int col = 0; col < pObj->numOfColumns; ++col) {
TSKEY firstKey = *((TSKEY *)payload); memcpy(data[col]->data + sdataRow * pObj->schema[col].bytes, payload + pObj->bytesPerPoint * row + offset,
TSKEY lastKey = *((TSKEY *)(payload + pObj->bytesPerPoint * (rows - 1))); pObj->schema[col].bytes);
if (pObj->freePoints < rows || pObj->freePoints < (pObj->pointsPerBlock << 1)) { offset += pObj->schema[col].bytes;
dError("vid:%d sid:%d id:%s, import failed, cache is full, freePoints:%d", pObj->vnode, pObj->sid, pObj->meterId, }
pObj->freePoints);
pImport->importedRows = 0;
pImport->commit = 1;
code = TSDB_CODE_ACTION_IN_PROGRESS;
return code;
} }
}
assert(rows); static int vnodeMergeDataIntoFile(SImportInfo *pImport, const char *payload, int rows, int fid) {
dTrace("vid:%d sid:%d id:%s, %d rows data will be imported to cache, firstKey:%ld lastKey:%ld", SMeterObj * pObj = (SMeterObj *)(pImport->pObj);
pObj->vnode, pObj->sid, pObj->meterId, rows, firstKey, lastKey); SVnodeObj * pVnode = vnodeList + pObj->vnode;
SImportHandle importHandle;
size_t size = 0;
SData * data[TSDB_MAX_COLUMNS];
char * buffer = NULL;
SData * cdata[TSDB_MAX_COLUMNS];
char * cbuffer = NULL;
SCompBlock compBlock;
TSCKSUM checksum = 0;
int pointsImported = 0;
int code = TSDB_CODE_SUCCESS;
SCachePool * pPool = (SCachePool *)pVnode->pCachePool;
SCacheInfo * pInfo = (SCacheInfo *)(pObj->pCache);
TSKEY lastKeyImported = 0;
TSKEY delta = pVnode->cfg.daysPerFile * tsMsPerDay[pVnode->cfg.precision];
TSKEY minFileKey = fid * delta;
TSKEY maxFileKey = minFileKey + delta - 1;
TSKEY firstKey = KEY_AT_INDEX(payload, pObj->bytesPerPoint, 0);
TSKEY lastKey = KEY_AT_INDEX(payload, pObj->bytesPerPoint, rows - 1);
assert(firstKey >= minFileKey && firstKey <= maxFileKey && lastKey >= minFileKey && lastKey <= maxFileKey);
// create neccessary files
pVnode->commitFirstKey = firstKey;
if (vnodeCreateNeccessaryFiles(pVnode) < 0) return TSDB_CODE_OTHERS;
assert(pVnode->commitFileId == fid);
// Open least files to import .head(hfd) .data(dfd) .last(lfd)
if (vnodeOpenMinFilesForImport(pObj->vnode, fid) < 0) return TSDB_CODE_FILE_CORRUPTED;
memset(&importHandle, 0, sizeof(SImportHandle));
{ // Load SCompHeader part from .head file
importHandle.pHeaderSize = sizeof(SCompHeader) * pVnode->cfg.maxSessions + sizeof(TSCKSUM);
importHandle.pHeader = (SCompHeader *)malloc(importHandle.pHeaderSize);
if (importHandle.pHeader == NULL) {
dError("vid: %d, sid: %d, meterId: %s, failed to allocate memory, size: %ul", pObj->vnode, pObj->sid,
pObj->meterId, importHandle.pHeaderSize);
code = TSDB_CODE_SERV_OUT_OF_MEMORY;
goto _error_merge;
}
pthread_mutex_lock(&(pVnode->vmutex)); lseek(pVnode->hfd, TSDB_FILE_HEADER_LEN, SEEK_SET);
if (firstKey < pVnode->firstKey) pVnode->firstKey = firstKey; if (read(pVnode->hfd, (void *)(importHandle.pHeader), importHandle.pHeaderSize) < importHandle.pHeaderSize) {
pthread_mutex_unlock(&(pVnode->vmutex)); dError("vid: %d, sid: %d, meterId: %s, fid: %d failed to read SCompHeader part, reason:%s", pObj->vnode,
pObj->sid, pObj->meterId, fid, strerror(errno));
code = TSDB_CODE_FILE_CORRUPTED;
goto _error_merge;
}
char *buffer = malloc(tsize); // buffer to hold unCommitted data plus import data if (!taosCheckChecksumWhole((uint8_t *)(importHandle.pHeader), importHandle.pHeaderSize)) {
data[0] = buffer; dError("vid: %d, sid: %d, meterId: %s, fid: %d SCompHeader part is broken", pObj->vnode, pObj->sid, pObj->meterId,
current[0] = data[0]; fid);
for (col = 1; col < pObj->numOfColumns; ++col) { code = TSDB_CODE_FILE_CORRUPTED;
data[col] = data[col - 1] + trows * pObj->schema[col - 1].bytes; goto _error_merge;
current[col] = data[col]; }
} }
// write import data into buffer first { // Initialize data[] and cdata[], which is used to hold data to write to data file
for (row = 0; row < rows; ++row) { size = pObj->bytesPerPoint * pVnode->cfg.rowsInFileBlock + (sizeof(SData) + EXTRA_BYTES + sizeof(TSCKSUM)) * pObj->numOfColumns;
for (col = 0; col < pObj->numOfColumns; ++col) {
memcpy(current[col], payload, pObj->schema[col].bytes); buffer = (char *)malloc(size);
payload += pObj->schema[col].bytes; if (buffer == NULL) {
current[col] += pObj->schema[col].bytes; dError("vid: %d, sid: %d, meterId: %s, failed to allocate memory, size: %ul", pObj->vnode, pObj->sid,
pObj->meterId, size);
code = TSDB_CODE_SERV_OUT_OF_MEMORY;
goto _error_merge;
} }
}
// copy the overwritten data into buffer, merge cache blocks cbuffer = (char *)malloc(size);
tpoints = rows; if (cbuffer == NULL) {
pos = pImport->pos; dError("vid: %d, sid: %d, meterId: %s, failed to allocate memory, size: %ul", pObj->vnode, pObj->sid,
slot = pImport->slot; pObj->meterId, size);
while (1) { code = TSDB_CODE_SERV_OUT_OF_MEMORY;
points = pInfo->cacheBlocks[slot]->numOfPoints - pos; goto _error_merge;
for (col = 0; col < pObj->numOfColumns; ++col) {
int size = points * pObj->schema[col].bytes;
memcpy(current[col], pInfo->cacheBlocks[slot]->offset[col] + pos * pObj->schema[col].bytes, size);
current[col] += size;
} }
pos = 0;
tpoints += points;
if (slot == pInfo->currentSlot) break; data[0] = (SData *)buffer;
slot = (slot + 1) % pInfo->maxBlocks; cdata[0] = (SData *)cbuffer;
}
for (col = 0; col < pObj->numOfColumns; ++col) current[col] = data[col]; for (int col = 1; col < pObj->numOfColumns; col++) {
pos = pImport->pos; data[col] = (SData *)((char *)data[col - 1] + sizeof(SData) + EXTRA_BYTES + sizeof(TSCKSUM) +
pObj->pointsPerFileBlock * pObj->schema[col - 1].bytes);
cdata[col] = (SData *)((char *)cdata[col - 1] + sizeof(SData) + EXTRA_BYTES + sizeof(TSCKSUM) +
pObj->pointsPerFileBlock * pObj->schema[col - 1].bytes);
}
}
// write back to existing slots first if (importHandle.pHeader[pObj->sid].compInfoOffset == 0) { // No data in this file, just write it
slot = pImport->slot; _write_empty_point:
while (1) { if (vnodeOpenTempFilesForImport(&importHandle, pObj, fid) < 0) {
points = (tpoints > pObj->pointsPerBlock - pos) ? pObj->pointsPerBlock - pos : tpoints; code = TSDB_CODE_OTHERS;
SCacheBlock *pCacheBlock = pInfo->cacheBlocks[slot]; goto _error_merge;
for (col = 0; col < pObj->numOfColumns; ++col) {
int size = points * pObj->schema[col].bytes;
memcpy(pCacheBlock->offset[col] + pos * pObj->schema[col].bytes, current[col], size);
current[col] += size;
} }
pCacheBlock->numOfPoints = points + pos; importHandle.oldNumOfBlocks = 0;
pos = 0; importHandle.driftOffset += sizeof(SCompInfo);
tpoints -= points; lastKeyImported = lastKey;
if (tpoints == 0) { for (int rowsWritten = 0; rowsWritten < rows;) {
// free the rest of cache blocks, since cache blocks are merged int rowsToWrite = MIN(pVnode->cfg.rowsInFileBlock, (rows - rowsWritten) /* the rows left */);
int currentSlot = slot; vnodeConvertRowsToCols(pObj, payload + rowsWritten * pObj->bytesPerPoint, rowsToWrite, data, 0);
while (slot != pInfo->currentSlot) { pointsImported += rowsToWrite;
slot = (slot + 1) % pInfo->maxBlocks;
pCacheBlock = pInfo->cacheBlocks[slot]; compBlock.last = 1;
vnodeFreeCacheBlock(pCacheBlock); if (vnodeWriteBlockToFile(pObj, &compBlock, data, cdata, rowsToWrite) < 0) {
// TODO: deal with ERROR here
} }
pInfo->currentSlot = currentSlot; importHandle.last = compBlock.last;
slot = currentSlot; // make sure to exit from the while loop
checksum = taosCalcChecksum(checksum, (uint8_t *)(&compBlock), sizeof(SCompBlock));
twrite(pVnode->nfd, &compBlock, sizeof(SCompBlock));
importHandle.newNumOfBlocks++;
importHandle.driftOffset += sizeof(SCompBlock);
rowsWritten += rowsToWrite;
} }
twrite(pVnode->nfd, &checksum, sizeof(TSCKSUM));
importHandle.driftOffset += sizeof(TSCKSUM);
} else { // Else if there are old data in this file.
{ // load SCompInfo and SCompBlock part
lseek(pVnode->hfd, importHandle.pHeader[pObj->sid].compInfoOffset, SEEK_SET);
if (read(pVnode->hfd, (void *)(&(importHandle.compInfo)), sizeof(SCompInfo)) < sizeof(SCompInfo)) {
dError("vid:%d sid:%d meterId:%s, failed to read .head file, reason:%s", pVnode->vnode, pObj->sid,
pObj->meterId, strerror(errno));
code = TSDB_CODE_FILE_CORRUPTED;
goto _error_merge;
}
if (slot == pInfo->currentSlot) break; if ((importHandle.compInfo.delimiter != TSDB_VNODE_DELIMITER) ||
slot = (slot + 1) % pInfo->maxBlocks; (!taosCheckChecksumWhole((uint8_t *)(&(importHandle.compInfo)), sizeof(SCompInfo)))) {
} dError("vid:%d sid:%d meterId:%s, .head file %s is broken, delemeter:%x", pVnode->vnode, pObj->sid,
pObj->meterId, pVnode->cfn, importHandle.compInfo.delimiter);
code = TSDB_CODE_FILE_CORRUPTED;
goto _error_merge;
}
// allocate new cache block if there are still data left // Check the context of SCompInfo part
while (tpoints > 0) { if (importHandle.compInfo.uid != pObj->uid) { // The data belongs to the other meter
pImport->commit = vnodeAllocateCacheBlock(pObj); goto _write_empty_point;
if (pImport->commit < 0) goto _exit; }
points = (tpoints > pObj->pointsPerBlock) ? pObj->pointsPerBlock : tpoints;
SCacheBlock *pCacheBlock = pInfo->cacheBlocks[pInfo->currentSlot]; importHandle.oldNumOfBlocks = importHandle.compInfo.numOfBlocks;
for (col = 0; col < pObj->numOfColumns; ++col) { importHandle.last = importHandle.compInfo.last;
int size = points * pObj->schema[col].bytes;
memcpy(pCacheBlock->offset[col] + pos * pObj->schema[col].bytes, current[col], size); size = sizeof(SCompBlock) * importHandle.compInfo.numOfBlocks + sizeof(TSCKSUM);
current[col] += size; importHandle.pBlocks = (SCompBlock *)malloc(size);
if (importHandle.pBlocks == NULL) {
dError("vid:%d sid:%d meterId:%s, failed to allocate importHandle.pBlock, size:%ul", pVnode->vnode, pObj->sid,
pObj->meterId, size);
code = TSDB_CODE_SERV_OUT_OF_MEMORY;
goto _error_merge;
}
if (read(pVnode->hfd, (void *)(importHandle.pBlocks), size) < size) {
dError("vid:%d sid:%d meterId:%s, failed to read importHandle.pBlock, reason:%s", pVnode->vnode, pObj->sid,
pObj->meterId, strerror(errno));
code = TSDB_CODE_FILE_CORRUPTED;
goto _error_merge;
}
if (!taosCheckChecksumWhole((uint8_t *)(importHandle.pBlocks), size)) {
dError("vid:%d sid:%d meterId:%s, pBlock part is broken in %s", pVnode->vnode, pObj->sid, pObj->meterId,
pVnode->cfn);
code = TSDB_CODE_FILE_CORRUPTED;
goto _error_merge;
}
} }
tpoints -= points;
pCacheBlock->numOfPoints = points;
}
code = 0; /* Now we have _payload_, we have _importHandle.pBlocks_, just merge payload into the importHandle.pBlocks
atomic_fetch_sub_32(&pObj->freePoints, rows); *
dTrace("vid:%d sid:%d id:%s, %d rows data are imported to cache", pObj->vnode, pObj->sid, pObj->meterId, rows); * Input: payload, pObj->bytesPerBlock, rows, importHandle.pBlocks
*/
{
int payloadIter = 0;
SBlockIter blockIter = {0, 0, 0, 0};
while (1) {
if (payloadIter >= rows) { // payload end, break
// write the remaining blocks to the file
if (pVnode->nfd > 0) {
int blocksLeft = importHandle.compInfo.numOfBlocks - blockIter.oslot;
if (blocksLeft > 0) {
checksum = taosCalcChecksum(checksum, (uint8_t *)(importHandle.pBlocks + blockIter.oslot),
sizeof(SCompBlock) * blocksLeft);
if (twrite(pVnode->nfd, (void *)(importHandle.pBlocks + blockIter.oslot),
sizeof(SCompBlock) * blocksLeft) < 0) {
dError("vid:%d sid:%d meterId:%s, failed to write %s file, size:%ul, reason:%s", pVnode->vnode,
pObj->sid, pObj->meterId, pVnode->nfn, sizeof(SCompBlock) * blocksLeft, strerror(errno));
code = TSDB_CODE_OTHERS;
goto _error_merge;
}
}
if (twrite(pVnode->nfd, (void *)(&checksum), sizeof(TSCKSUM)) < 0) {
dError("vid:%d sid:%d meterId:%s, failed to write %s file, size:%ul, reason:%s", pVnode->vnode, pObj->sid,
pObj->meterId, pVnode->nfn, sizeof(TSCKSUM), strerror(errno));
code = TSDB_CODE_OTHERS;
goto _error_merge;
}
}
break;
}
_exit: if (blockIter.slot >= importHandle.compInfo.numOfBlocks) { // blocks end, break
free(buffer); // Should never come here
return code; assert(false);
} }
int vnodeFindKeyInFile(SImportInfo *pImport, int order) { TSKEY key = KEY_AT_INDEX(payload, pObj->bytesPerPoint, payloadIter);
SMeterObj *pObj = pImport->pObj;
SVnodeObj *pVnode = &vnodeList[pObj->vnode]; { // Binary search the (slot, pos) which is >= key as well as nextKey
int code = -1; int left = blockIter.slot;
SQuery query; int right = importHandle.compInfo.numOfBlocks - 1;
SColumnInfoEx colList[TSDB_MAX_COLUMNS] = {0}; TSKEY minKey = importHandle.pBlocks[left].keyFirst;
TSKEY maxKey = importHandle.pBlocks[right].keyLast;
assert(minKey <= maxKey);
if (key < minKey) { // Case 1. write just ahead the blockIter.slot
blockIter.slot = left;
blockIter.pos = 0;
blockIter.nextKey = minKey;
} else if (key > maxKey) { // Case 2. write to the end
if (importHandle.pBlocks[right].last) { // Case 2.1 last block in .last file, need to merge
assert(importHandle.last != 0);
importHandle.last = 0;
blockIter.slot = right;
blockIter.pos = importHandle.pBlocks[right].numOfPoints;
} else { // Case 2.2 just write after the last block
blockIter.slot = right + 1;
blockIter.pos = 0;
}
blockIter.nextKey = maxFileKey + 1;
} else { // Case 3. need to search the block for slot and pos
if (key == minKey || key == maxKey) {
payloadIter++;
continue;
}
// Here: minKey < key < maxKey
int mid;
TSKEY blockMinKey;
TSKEY blockMaxKey;
// Binary search the slot
do {
mid = (left + right) / 2;
blockMinKey = importHandle.pBlocks[mid].keyFirst;
blockMaxKey = importHandle.pBlocks[mid].keyLast;
assert(blockMinKey <= blockMaxKey);
if (key < blockMinKey) {
right = mid;
} else if (key > blockMaxKey) {
left = mid + 1;
} else { /* blockMinKey <= key <= blockMaxKey */
break;
}
} while (left < right);
if (key == blockMinKey || key == blockMaxKey) { // duplicate key
payloadIter++;
continue;
}
// Get the slot
if (key > blockMaxKey) { /* pos = 0 or pos = ? */
blockIter.slot = mid + 1;
} else { /* key < blockMinKey (pos = 0) || (key > blockMinKey && key < blockMaxKey) (pos=?) */
blockIter.slot = mid;
}
// Get the pos
assert(blockIter.slot < importHandle.compInfo.numOfBlocks);
if (key == importHandle.pBlocks[blockIter.slot].keyFirst ||
key == importHandle.pBlocks[blockIter.slot].keyLast) {
payloadIter++;
continue;
}
assert(key < importHandle.pBlocks[blockIter.slot].keyLast);
/* */
if (key < importHandle.pBlocks[blockIter.slot].keyFirst) {
blockIter.pos = 0;
blockIter.nextKey = importHandle.pBlocks[blockIter.slot].keyFirst;
} else {
SCompBlock *pBlock = importHandle.pBlocks + blockIter.slot;
if (pBlock->sversion != pObj->sversion) { /*TODO*/
}
if (vnodeLoadNeededBlockData(pObj, &importHandle, blockIter.slot, DATA_LOAD_TIMESTAMP, &code) < 0) {
goto _error_merge;
}
int pos = (*vnodeSearchKeyFunc[pObj->searchAlgorithm])(
importHandle.data[PRIMARYKEY_TIMESTAMP_COL_INDEX]->data, pBlock->numOfPoints, key, TSQL_SO_ASC);
assert(pos != 0);
if (KEY_AT_INDEX(importHandle.data[PRIMARYKEY_TIMESTAMP_COL_INDEX]->data, sizeof(TSKEY), pos) == key) {
payloadIter++;
continue;
}
blockIter.pos = pos;
blockIter.nextKey = (blockIter.slot + 1 < importHandle.compInfo.numOfBlocks)
? importHandle.pBlocks[blockIter.slot + 1].keyFirst
: maxFileKey + 1;
// Need to merge with this block
if (importHandle.pBlocks[blockIter.slot].last) { // this is to merge with the last block
assert((blockIter.slot == (importHandle.compInfo.numOfBlocks - 1)));
importHandle.last = 0;
}
}
}
}
TSKEY key = order ? pImport->firstKey : pImport->lastKey; // Open the new .t file if not opened yet.
memset(&query, 0, sizeof(query)); if (pVnode->nfd <= 0) {
query.order.order = order; if (vnodeOpenTempFilesForImport(&importHandle, pObj, fid) < 0) {
query.skey = key; code = TSDB_CODE_OTHERS;
query.ekey = order ? INT64_MAX : 0; goto _error_merge;
query.colList = colList;
query.numOfCols = pObj->numOfColumns;
for (int16_t i = 0; i < pObj->numOfColumns; ++i) {
colList[i].data.colId = pObj->schema[i].colId;
colList[i].data.bytes = pObj->schema[i].bytes;
colList[i].data.type = pObj->schema[i].type;
colList[i].colIdx = i;
colList[i].colIdxInBuf = i;
}
int ret = vnodeSearchPointInFile(pObj, &query);
if (ret >= 0) {
if (query.slot < 0) {
pImport->slot = 0;
pImport->pos = 0;
pImport->key = 0;
pImport->fileId = pVnode->fileId - pVnode->numOfFiles + 1;
dTrace("vid:%d sid:%d id:%s, import to head of file", pObj->vnode, pObj->sid, pObj->meterId);
code = 0;
} else if (query.slot >= 0) {
code = 0;
pImport->slot = query.slot;
pImport->pos = query.pos;
pImport->key = query.key;
pImport->fileId = query.fileId;
SCompBlock *pBlock = &query.pBlock[query.slot];
pImport->numOfPoints = pBlock->numOfPoints;
if (pImport->key != key) {
if (order == 0) {
pImport->pos++;
if (pImport->pos >= pBlock->numOfPoints) {
pImport->slot++;
pImport->pos = 0;
} }
} else {
if (pImport->pos < 0) pImport->pos = 0;
} }
}
if (pImport->key != key && pImport->pos > 0) { if (blockIter.slot > blockIter.oslot) { // write blocks in range [blockIter.oslot, blockIter.slot) to .t file
if ( pObj->sversion != pBlock->sversion ) { checksum = taosCalcChecksum(checksum, (uint8_t *)(importHandle.pBlocks + blockIter.oslot),
dError("vid:%d sid:%d id:%s, import sversion not matched, expected:%d received:%d", pObj->vnode, pObj->sid, sizeof(SCompBlock) * (blockIter.slot - blockIter.oslot));
pObj->meterId, pBlock->sversion, pObj->sversion); if (twrite(pVnode->nfd, (void *)(importHandle.pBlocks + blockIter.oslot),
code = TSDB_CODE_OTHERS; sizeof(SCompBlock) * (blockIter.slot - blockIter.oslot)) < 0) {
} else { dError("vid:%d sid:%d meterId:%s, failed to write %s file, size:%ul, reason:%s", pVnode->vnode, pObj->sid,
pImport->offset = pBlock->offset; pObj->meterId, pVnode->nfn, sizeof(SCompBlock) * (blockIter.slot - blockIter.oslot),
strerror(errno));
pImport->buffer = code = TSDB_CODE_OTHERS;
malloc(pObj->bytesPerPoint * pVnode->cfg.rowsInFileBlock + sizeof(SData) * pObj->numOfColumns); goto _error_merge;
pImport->sdata[0] = (SData *)pImport->buffer;
for (int col = 1; col < pObj->numOfColumns; ++col)
pImport->sdata[col] = (SData *)(((char *)pImport->sdata[col - 1]) + sizeof(SData) +
pObj->pointsPerFileBlock * pObj->schema[col - 1].bytes);
code = vnodeReadCompBlockToMem(pObj, &query, pImport->sdata);
if (code < 0) {
code = -code;
tfree(pImport->buffer);
} }
blockIter.oslot = blockIter.slot;
} }
}
}
} else {
dError("vid:%d sid:%d id:%s, file is corrupted, import failed", pObj->vnode, pObj->sid, pObj->meterId);
code = -ret;
}
tclose(query.hfd); if (blockIter.pos == 0) { // No need to merge
tclose(query.dfd); // copy payload part to data
tclose(query.lfd); int rowOffset = 0;
vnodeFreeFields(&query); for (; payloadIter < rows; rowOffset++) {
tfree(query.pBlock); if (KEY_AT_INDEX(payload, pObj->bytesPerPoint, payloadIter) >= blockIter.nextKey) break;
return code; vnodeConvertRowsToCols(pObj, payload + pObj->bytesPerPoint * payloadIter, 1, data, rowOffset);
} pointsImported++;
lastKeyImported = KEY_AT_INDEX(payload, pObj->bytesPerPoint, payloadIter);
payloadIter++;
}
int vnodeFindKeyInCache(SImportInfo *pImport, int order) { // write directly to .data file
SMeterObj *pObj = pImport->pObj; compBlock.last = 0;
int code = 0; if (vnodeWriteBlockToFile(pObj, &compBlock, data, cdata, rowOffset) < 0) {
SQuery query; // TODO: Deal with the ERROR here
SCacheInfo *pInfo = (SCacheInfo *)pObj->pCache; }
TSKEY key = order ? pImport->firstKey : pImport->lastKey; checksum = taosCalcChecksum(checksum, (uint8_t *)(&compBlock), sizeof(SCompBlock));
memset(&query, 0, sizeof(query)); if (twrite(pVnode->nfd, &compBlock, sizeof(SCompBlock)) < 0) {
query.order.order = order; // TODO : deal with the ERROR here
query.skey = key; }
query.ekey = order ? pImport->lastKey : pImport->firstKey; importHandle.newNumOfBlocks++;
vnodeSearchPointInCache(pObj, &query); importHandle.driftOffset += sizeof(SCompBlock);
} else { // Merge block and payload from payloadIter
if (query.slot < 0) { if (vnodeLoadNeededBlockData(pObj, &importHandle, blockIter.slot,
pImport->slot = pInfo->commitSlot; DATA_LOAD_TIMESTAMP | DATA_LOAD_OTHER_DATA, &code) < 0) { // Load neccessary blocks
if (pInfo->commitPoint >= pObj->pointsPerBlock) pImport->slot = (pImport->slot + 1) % pInfo->maxBlocks; goto _error_merge;
pImport->pos = 0; }
pImport->key = 0;
dTrace("vid:%d sid:%d id:%s, key:%ld, import to head of cache", pObj->vnode, pObj->sid, pObj->meterId, key);
code = 0;
} else {
pImport->slot = query.slot;
pImport->pos = query.pos;
pImport->key = query.key;
if (key != query.key) { importHandle.oldNumOfBlocks--;
if (order == 0) { importHandle.driftOffset -= sizeof(SCompBlock);
// since pos is the position which has smaller key, data shall be imported after it
pImport->pos++; int rowOffset = blockIter.pos; // counter for data
if (pImport->pos >= pObj->pointsPerBlock) {
pImport->slot = (pImport->slot + 1) % pInfo->maxBlocks; // Copy the front part
pImport->pos = 0; for (int col = 0; col < pObj->numOfColumns; col++) {
memcpy((void *)(data[col]->data), (void *)(importHandle.data[col]->data),
pObj->schema[col].bytes * blockIter.pos);
}
// Merge part
while (1) {
if (rowOffset >= pVnode->cfg.rowsInFileBlock) { // data full in a block to commit
compBlock.last = 0;
if (vnodeWriteBlockToFile(pObj, &compBlock, data, cdata, rowOffset) < 0) {
// TODO : deal with the ERROR here
}
checksum = taosCalcChecksum(checksum, (uint8_t *)(&compBlock), sizeof(SCompBlock));
if (twrite(pVnode->nfd, (void *)(&compBlock), sizeof(SCompBlock)) < 0) {
dError("vid:%d sid:%d meterId:%s, failed to write %s file, size:%ul, reason:%s", pVnode->vnode,
pObj->sid, pObj->meterId, pVnode->nfn, sizeof(SCompBlock), strerror(errno));
goto _error_merge;
}
importHandle.newNumOfBlocks++;
importHandle.driftOffset += sizeof(SCompBlock);
rowOffset = 0;
}
if ((payloadIter >= rows || KEY_AT_INDEX(payload, pObj->bytesPerPoint, payloadIter) >= blockIter.nextKey) &&
blockIter.pos >= importHandle.pBlocks[blockIter.slot].numOfPoints)
break;
if (payloadIter >= rows ||
KEY_AT_INDEX(payload, pObj->bytesPerPoint, payloadIter) >= blockIter.nextKey) { // payload end
for (int col = 0; col < pObj->numOfColumns; col++) {
memcpy(data[col]->data + rowOffset * pObj->schema[col].bytes,
importHandle.data[col]->data + pObj->schema[col].bytes * blockIter.pos, pObj->schema[col].bytes);
}
blockIter.pos++;
rowOffset++;
} else if (blockIter.pos >= importHandle.pBlocks[blockIter.slot].numOfPoints) { // block end
vnodeConvertRowsToCols(pObj, payload + pObj->bytesPerPoint * payloadIter, 1, data, rowOffset);
pointsImported++;
lastKeyImported = KEY_AT_INDEX(payload, pObj->bytesPerPoint, payloadIter);
payloadIter++;
rowOffset++;
} else {
if (KEY_AT_INDEX(payload, pObj->bytesPerPoint, payloadIter) ==
KEY_AT_INDEX(importHandle.data[PRIMARYKEY_TIMESTAMP_COL_INDEX]->data, sizeof(TSKEY),
blockIter.pos)) { // duplicate key
payloadIter++;
continue;
} else if (KEY_AT_INDEX(payload, pObj->bytesPerPoint, payloadIter) <
KEY_AT_INDEX(importHandle.data[PRIMARYKEY_TIMESTAMP_COL_INDEX]->data, sizeof(TSKEY),
blockIter.pos)) {
vnodeConvertRowsToCols(pObj, payload + pObj->bytesPerPoint * payloadIter, 1, data, rowOffset);
pointsImported++;
lastKeyImported = KEY_AT_INDEX(payload, pObj->bytesPerPoint, payloadIter);
payloadIter++;
rowOffset++;
} else {
for (int col = 0; col < pObj->numOfColumns; col++) {
memcpy(data[col]->data + rowOffset * pObj->schema[col].bytes,
importHandle.data[col]->data + pObj->schema[col].bytes * blockIter.pos,
pObj->schema[col].bytes);
}
blockIter.pos++;
rowOffset++;
}
}
}
if (rowOffset > 0) { // data full in a block to commit
compBlock.last = 0;
if (vnodeWriteBlockToFile(pObj, &compBlock, data, cdata, rowOffset) < 0) {
// TODO : deal with the ERROR here
}
checksum = taosCalcChecksum(checksum, (uint8_t *)(&compBlock), sizeof(SCompBlock));
if (twrite(pVnode->nfd, (void *)(&compBlock), sizeof(SCompBlock)) < 0) {
dError("vid:%d sid:%d meterId:%s, failed to write %s file, size:%ul, reason:%s", pVnode->vnode, pObj->sid,
pObj->meterId, pVnode->nfn, sizeof(SCompBlock), strerror(errno));
goto _error_merge;
}
importHandle.newNumOfBlocks++;
importHandle.driftOffset += sizeof(SCompBlock);
rowOffset = 0;
}
blockIter.slot++;
blockIter.oslot = blockIter.slot;
} }
} else {
if (pImport->pos < 0) pImport->pos = 0;
} }
} }
code = 0;
} }
return code; // Write the SCompInfo part
} if (vnodeCloseImportFiles(pObj, &importHandle) < 0) {
code = TSDB_CODE_OTHERS;
goto _error_merge;
}
int vnodeImportStartToCache(SImportInfo *pImport, char *payload, int rows) { pImport->importedRows += pointsImported;
int code = 0;
SMeterObj *pObj = pImport->pObj;
code = vnodeFindKeyInCache(pImport, 1); pthread_mutex_lock(&(pPool->vmutex));
if (code != 0) return code; if (pInfo->numOfBlocks > 0) {
int slot = (pInfo->currentSlot - pInfo->numOfBlocks + 1 + pInfo->maxBlocks) % pInfo->maxBlocks;
TSKEY firstKeyInCache = *((TSKEY *)(pInfo->cacheBlocks[slot]->offset[0]));
if (pImport->key != pImport->firstKey) { // data may be in commited cache, cache shall be released
rows = vnodeGetImportStartPart(pObj, payload, rows, pImport->key); if (lastKeyImported > firstKeyInCache) {
pImport->importedRows = rows; while (slot != pInfo->commitSlot) {
code = vnodeImportToCache(pImport, payload, rows); SCacheBlock *pCacheBlock = pInfo->cacheBlocks[slot];
} else { vnodeFreeCacheBlock(pCacheBlock);
dTrace("vid:%d sid:%d id:%s, data is already imported to cache, firstKey:%lld", pObj->vnode, pObj->sid, pObj->meterId, pImport->firstKey); slot = (slot + 1 + pInfo->maxBlocks) % pInfo->maxBlocks;
}
if (pInfo->commitPoint == pObj->pointsPerBlock) {
if (pInfo->cacheBlocks[pInfo->commitSlot]->pMeterObj == pObj) {
vnodeFreeCacheBlock(pInfo->cacheBlocks[pInfo->commitSlot]);
}
}
}
} }
pthread_mutex_unlock(&(pPool->vmutex));
// TODO: free the allocated memory
tfree(buffer);
tfree(cbuffer);
tfree(importHandle.pHeader);
tfree(importHandle.pBlocks);
tfree(importHandle.pField);
tfree(importHandle.buffer);
tfree(importHandle.temp);
tfree(importHandle.tempBuffer);
return code; return code;
}
int vnodeImportStartToFile(SImportInfo *pImport, char *payload, int rows) { _error_merge:
int code = 0; tfree(buffer);
SMeterObj *pObj = pImport->pObj; tfree(cbuffer);
tfree(importHandle.pHeader);
code = vnodeFindKeyInFile(pImport, 1); tfree(importHandle.pBlocks);
if (code != 0) return code; tfree(importHandle.pField);
tfree(importHandle.buffer);
if (pImport->key != pImport->firstKey) { tfree(importHandle.temp);
pImport->payload = payload; tfree(importHandle.tempBuffer);
pImport->rows = vnodeGetImportStartPart(pObj, payload, rows, pImport->key);
pImport->importedRows = pImport->rows; close(pVnode->dfd);
code = vnodeImportToFile(pImport); pVnode->dfd = 0;
} else {
dTrace("vid:%d sid:%d id:%s, data is already imported to file", pObj->vnode, pObj->sid, pObj->meterId); close(pVnode->hfd);
pVnode->hfd = 0;
close(pVnode->lfd);
pVnode->lfd = 0;
if (pVnode->nfd > 0) {
close(pVnode->nfd);
pVnode->nfd = 0;
remove(pVnode->nfn);
} }
return code; return code;
} }
int vnodeImportWholeToFile(SImportInfo *pImport, char *payload, int rows) { #define FORWARD_ITER(iter, step, slotLimit, posLimit) \
int code = 0; { \
SMeterObj *pObj = pImport->pObj; if ((iter.pos) + (step) < (posLimit)) { \
(iter.pos) = (iter.pos) + (step); \
} else { \
(iter.pos) = 0; \
(iter.slot) = ((iter.slot) + 1) % (slotLimit); \
} \
}
code = vnodeFindKeyInFile(pImport, 0); int isCacheEnd(SBlockIter iter, SMeterObj *pMeter) {
if (code != 0) return code; SCacheInfo *pInfo = (SCacheInfo *)(pMeter->pCache);
int slot = 0;
int pos = 0;
if (pImport->key != pImport->lastKey) { if (pInfo->cacheBlocks[pInfo->currentSlot]->numOfPoints == pMeter->pointsPerBlock) {
pImport->payload = payload; slot = (pInfo->currentSlot + 1) % (pInfo->maxBlocks);
pImport->rows = vnodeGetImportEndPart(pObj, payload, rows, &pImport->payload, pImport->key); pos = 0;
pImport->importedRows = pImport->rows;
code = vnodeImportToFile(pImport);
} else { } else {
code = vnodeImportStartToFile(pImport, payload, rows); slot = pInfo->currentSlot;
pos = pInfo->cacheBlocks[pInfo->currentSlot]->numOfPoints;
} }
return ((iter.slot == slot) && (iter.pos == pos));
return code;
} }
int vnodeImportWholeToCache(SImportInfo *pImport, char *payload, int rows) { static void vnodeFlushMergeBuffer(SMergeBuffer *pBuffer, SBlockIter *pWriteIter, SBlockIter *pCacheIter,
int code = 0; SMeterObj *pObj, SCacheInfo *pInfo, int checkBound) {
SMeterObj *pObj = pImport->pObj; // Function to flush the merge buffer data to cache
if (pWriteIter->pos == pObj->pointsPerBlock) {
pWriteIter->pos = 0;
pWriteIter->slot = (pWriteIter->slot + 1) % pInfo->maxBlocks;
}
code = vnodeFindKeyInCache(pImport, 0); while (pBuffer->spos != pBuffer->epos) {
if (code != 0) return code; if (checkBound && pWriteIter->slot == pCacheIter->slot && pWriteIter->pos == pCacheIter->pos) break;
for (int col = 0; col < pObj->numOfColumns; col++) {
memcpy(pInfo->cacheBlocks[pWriteIter->slot]->offset[col] + pObj->schema[col].bytes * pWriteIter->pos,
pBuffer->offset[col] + pObj->schema[col].bytes * pBuffer->spos, pObj->schema[col].bytes);
}
if (pImport->key != pImport->lastKey) { if (pWriteIter->pos + 1 < pObj->pointsPerBlock) {
char *pStart; (pWriteIter->pos)++;
if ( pImport->key < pObj->lastKeyOnFile ) pImport->key = pObj->lastKeyOnFile; } else {
rows = vnodeGetImportEndPart(pObj, payload, rows, &pStart, pImport->key); pInfo->cacheBlocks[pWriteIter->slot]->numOfPoints = pWriteIter->pos + 1;
pImport->importedRows = rows; pWriteIter->slot = (pWriteIter->slot + 1) % pInfo->maxBlocks;
code = vnodeImportToCache(pImport, pStart, rows); pWriteIter->pos = 0;
} else {
if (pImport->firstKey > pObj->lastKeyOnFile) {
code = vnodeImportStartToCache(pImport, payload, rows);
} else if (pImport->firstKey < pObj->lastKeyOnFile) {
code = vnodeImportStartToFile(pImport, payload, rows);
} else { // firstKey == pObj->lastKeyOnFile
dTrace("vid:%d sid:%d id:%s, data is already there", pObj->vnode, pObj->sid, pObj->meterId);
} }
pBuffer->spos = (pBuffer->spos + 1) % pBuffer->totalRows;
} }
return code; if ((!checkBound) && pWriteIter->pos != 0) {
pInfo->cacheBlocks[pWriteIter->slot]->numOfPoints = pWriteIter->pos;
}
} }
int vnodeImportPoints(SMeterObj *pObj, char *cont, int contLen, char source, void *param, int sversion, int vnodeImportDataToCache(SImportInfo *pImport, const char *payload, const int rows) {
int *pNumOfPoints, TSKEY now) { SMeterObj * pObj = pImport->pObj;
SSubmitMsg *pSubmit = (SSubmitMsg *)cont; SVnodeObj * pVnode = vnodeList + pObj->vnode;
SVnodeObj *pVnode = &vnodeList[pObj->vnode]; int code = -1;
int rows; SCacheInfo * pInfo = (SCacheInfo *)(pObj->pCache);
char *payload; int payloadIter;
int code = TSDB_CODE_ACTION_IN_PROGRESS; SCachePool * pPool = (SCachePool *)(pVnode->pCachePool);
SCachePool *pPool = (SCachePool *)pVnode->pCachePool; int isCacheIterEnd = 0;
SShellObj *pShell = (SShellObj *)param; int spayloadIter = 0;
int pointsImported = 0; int isAppendData = 0;
int rowsImported = 0;
rows = htons(pSubmit->numOfRows); int totalRows = 0;
int expectedLen = rows * pObj->bytesPerPoint + sizeof(pSubmit->numOfRows); size_t size = 0;
if (expectedLen != contLen) { SMergeBuffer *pBuffer = NULL;
dError("vid:%d sid:%d id:%s, invalid import, expected:%d, contLen:%d", pObj->vnode, pObj->sid, pObj->meterId,
expectedLen, contLen); TSKEY firstKey = KEY_AT_INDEX(payload, pObj->bytesPerPoint, 0);
return TSDB_CODE_WRONG_MSG_SIZE; TSKEY lastKey = KEY_AT_INDEX(payload, pObj->bytesPerPoint, rows - 1);
assert(firstKey <= lastKey && firstKey > pObj->lastKeyOnFile);
// TODO: make this condition less strict
if (pObj->freePoints < rows || pObj->freePoints < (pObj->pointsPerBlock << 1)) { // No free room to hold the data
dError("vid:%d sid:%d id:%s, import failed, cache is full, freePoints:%d", pObj->vnode, pObj->sid, pObj->meterId,
pObj->freePoints);
pImport->importedRows = 0;
pImport->commit = 1;
code = TSDB_CODE_ACTION_IN_PROGRESS;
return code;
} }
if (sversion != pObj->sversion) { if (pInfo->numOfBlocks == 0) {
dError("vid:%d sid:%d id:%s, invalid sversion, expected:%d received:%d", pObj->vnode, pObj->sid, pObj->meterId, if (vnodeAllocateCacheBlock(pObj) < 0) {
pObj->sversion, sversion); pImport->importedRows = 0;
return TSDB_CODE_OTHERS; pImport->commit = 1;
code = TSDB_CODE_ACTION_IN_PROGRESS;
return code;
}
} }
payload = pSubmit->payLoad; // Find the first importable record from payload
TSKEY firstKey = *(TSKEY *)payload; pImport->lastKey = lastKey;
TSKEY lastKey = *(TSKEY *)(payload + pObj->bytesPerPoint*(rows-1)); for (payloadIter = 0; payloadIter < rows; payloadIter++) {
int cfid = now/pVnode->cfg.daysPerFile/tsMsPerDay[pVnode->cfg.precision]; TSKEY key = KEY_AT_INDEX(payload, pObj->bytesPerPoint, payloadIter);
TSKEY minAllowedKey = (cfid - pVnode->maxFiles + 1)*pVnode->cfg.daysPerFile*tsMsPerDay[pVnode->cfg.precision]; if (key == pObj->lastKey) continue;
TSKEY maxAllowedKey = (cfid + 2)*pVnode->cfg.daysPerFile*tsMsPerDay[pVnode->cfg.precision] - 1; if (key > pObj->lastKey) { // Just as insert
if (firstKey < minAllowedKey || firstKey > maxAllowedKey || lastKey < minAllowedKey || lastKey > maxAllowedKey) { pImport->slot = pInfo->currentSlot;
dError("vid:%d sid:%d id:%s, vnode lastKeyOnFile:%lld, data is out of range, rows:%d firstKey:%lld lastKey:%lld minAllowedKey:%lld maxAllowedKey:%lld", pImport->pos = pInfo->cacheBlocks[pImport->slot]->numOfPoints;
pObj->vnode, pObj->sid, pObj->meterId, pVnode->lastKeyOnFile, rows, firstKey, lastKey, minAllowedKey, maxAllowedKey); isCacheIterEnd = 1;
return TSDB_CODE_TIMESTAMP_OUT_OF_RANGE; break;
} else {
pImport->firstKey = key;
if (vnodeFindKeyInCache(pImport, 1) < 0) {
goto _exit;
}
if (pImport->firstKey != pImport->key) break;
}
} }
// forward to peers if (payloadIter == rows) {
if (pShell && pVnode->cfg.replications > 1) { pImport->importedRows = 0;
code = vnodeForwardToPeer(pObj, cont, contLen, TSDB_ACTION_IMPORT, sversion); code = 0;
if (code != 0) return code; goto _exit;
} }
if (pVnode->cfg.commitLog && source != TSDB_DATA_SOURCE_LOG) { spayloadIter = payloadIter;
if (pVnode->logFd < 0) return TSDB_CODE_INVALID_COMMIT_LOG; if (pImport->pos == pObj->pointsPerBlock) assert(isCacheIterEnd);
code = vnodeWriteToCommitLog(pObj, TSDB_ACTION_IMPORT, cont, contLen, sversion);
if (code != 0) return code; // Allocate a new merge buffer work as buffer
totalRows = pObj->pointsPerBlock + rows - payloadIter + 1;
size = sizeof(SMergeBuffer) + sizeof(char *) * pObj->numOfColumns + pObj->bytesPerPoint * totalRows;
pBuffer = (SMergeBuffer *)malloc(size);
if (pBuffer == NULL) {
dError("vid:%d sid:%d meterId:%s, failed to allocate memory, size:%d", pObj->vnode, pObj->sid, pObj->meterId, size);
return TSDB_CODE_SERV_OUT_OF_MEMORY;
}
pBuffer->spos = 0;
pBuffer->epos = 0;
pBuffer->totalRows = totalRows;
pBuffer->offset[0] = (char *)pBuffer + sizeof(SMergeBuffer) + sizeof(char *) * pObj->numOfColumns;
for (int col = 1; col < pObj->numOfColumns; col++) {
pBuffer->offset[col] = pBuffer->offset[col - 1] + pObj->schema[col - 1].bytes * totalRows;
} }
if (*((TSKEY *)(pSubmit->payLoad + (rows - 1) * pObj->bytesPerPoint)) > pObj->lastKey) { // TODO: take pImport->pos = pObj->pointsPerBlock into consideration
code = vnodeInsertPoints(pObj, cont, contLen, TSDB_DATA_SOURCE_LOG, NULL, pObj->sversion, &pointsImported, now); { // Do the merge staff
SBlockIter cacheIter = {pImport->slot, pImport->pos, 0, 0}; // Iter to traverse old cache data
SBlockIter writeIter = {pImport->slot, pImport->pos, 0, 0}; // Iter to write data to cache
int availPoints = pObj->pointsPerBlock - pInfo->cacheBlocks[pInfo->currentSlot]->numOfPoints;
if (pShell) { assert(availPoints >= 0);
pShell->code = code;
pShell->numOfTotalPoints += pointsImported;
}
} else {
SImportInfo *pNew, import;
dTrace("vid:%d sid:%d id:%s, import %d rows data", pObj->vnode, pObj->sid, pObj->meterId, rows); while (1) {
memset(&import, 0, sizeof(import)); if ((payloadIter >= rows) && isCacheIterEnd) break;
import.firstKey = *((TSKEY *)(payload));
import.lastKey = *((TSKEY *)(pSubmit->payLoad + (rows - 1) * pObj->bytesPerPoint));
import.pObj = pObj;
import.pShell = pShell;
import.payload = payload;
import.rows = rows;
if ((code = vnodeSetMeterInsertImportStateEx(pObj, TSDB_METER_STATE_IMPORTING)) != TSDB_CODE_SUCCESS) {
return code;
}
int32_t num = 0;
pthread_mutex_lock(&pVnode->vmutex);
num = pObj->numOfQueries;
pthread_mutex_unlock(&pVnode->vmutex);
int32_t commitInProcess = 0; if ((pBuffer->epos + 1) % pBuffer->totalRows == pBuffer->spos) { // merge buffer is full, flush
vnodeFlushMergeBuffer(pBuffer, &writeIter, &cacheIter, pObj, pInfo, 1);
}
pthread_mutex_lock(&pPool->vmutex); TSKEY payloadKey = (payloadIter < rows) ? KEY_AT_INDEX(payload, pObj->bytesPerPoint, payloadIter) : INT64_MAX;
if (((commitInProcess = pPool->commitInProcess) == 1) || num > 0) { TSKEY cacheKey = (isCacheIterEnd) ? INT64_MAX : KEY_AT_INDEX(pInfo->cacheBlocks[cacheIter.slot]->offset[0], sizeof(TSKEY), cacheIter.pos);
pthread_mutex_unlock(&pPool->vmutex);
vnodeClearMeterState(pObj, TSDB_METER_STATE_IMPORTING); if (cacheKey < payloadKey) { // if (payload end || (cacheIter not end && payloadKey > blockKey)), consume cache
for (int col = 0; col < pObj->numOfColumns; col++) {
pNew = (SImportInfo *)malloc(sizeof(SImportInfo)); memcpy(pBuffer->offset[col] + pObj->schema[col].bytes * pBuffer->epos,
memcpy(pNew, &import, sizeof(SImportInfo)); pInfo->cacheBlocks[cacheIter.slot]->offset[col] + pObj->schema[col].bytes * cacheIter.pos,
pNew->signature = pNew; pObj->schema[col].bytes);
int payloadLen = contLen - sizeof(SSubmitMsg); }
pNew->payload = malloc(payloadLen); FORWARD_ITER(cacheIter, 1, pInfo->maxBlocks, pObj->pointsPerBlock);
pNew->opayload = pNew->payload; isCacheIterEnd = isCacheEnd(cacheIter, pObj);
memcpy(pNew->payload, payload, payloadLen); } else if (cacheKey > payloadKey) { // cacheIter end || (payloadIter not end && payloadKey < blockKey), consume payload
if (availPoints == 0) { // Need to allocate a new cache block
dTrace("vid:%d sid:%d id:%s, import later, commit in process:%d, numOfQueries:%d", pObj->vnode, pObj->sid, pthread_mutex_lock(&(pPool->vmutex));
pObj->meterId, commitInProcess, pObj->numOfQueries); // TODO: Need to check if there are enough slots to hold a new one
SCacheBlock *pNewBlock = vnodeGetFreeCacheBlock(pVnode);
/* if (pNewBlock == NULL) { // Failed to allocate a new cache block, need to commit and loop over the remaining cache records
* vnodeProcessImportTimer will set the import status for this table, so need to pthread_mutex_unlock(&(pPool->vmutex));
* set the import flag here payloadIter = rows;
*/ code = TSDB_CODE_ACTION_IN_PROGRESS;
taosTmrStart(vnodeProcessImportTimer, 10, pNew, vnodeTmrCtrl); pImport->commit = 1;
return 0; continue;
} else { }
pPool->commitInProcess = 1;
pthread_mutex_unlock(&pPool->vmutex); assert(pInfo->numOfBlocks <= pInfo->maxBlocks);
if (pInfo->numOfBlocks == pInfo->maxBlocks) {
int ret = vnodeImportData(pObj, &import); vnodeFreeCacheBlock(pInfo->cacheBlocks[(pInfo->currentSlot + 1) % pInfo->maxBlocks]);
if (pShell) { }
pShell->code = ret;
pShell->numOfTotalPoints += import.importedRows; pNewBlock->pMeterObj = pObj;
pNewBlock->offset[0] = (char *)pNewBlock + sizeof(SCacheBlock) + sizeof(char *) * pObj->numOfColumns;
for (int col = 1; col < pObj->numOfColumns; col++)
pNewBlock->offset[col] = pNewBlock->offset[col - 1] + pObj->schema[col - 1].bytes * pObj->pointsPerBlock;
int newSlot = (writeIter.slot + 1) % pInfo->maxBlocks;
pInfo->blocks++;
int tblockId = pInfo->blocks;
if (writeIter.slot != pInfo->currentSlot) {
for (int tslot = pInfo->currentSlot; tslot != writeIter.slot;) {
int nextSlot = (tslot + 1) % pInfo->maxBlocks;
pInfo->cacheBlocks[nextSlot] = pInfo->cacheBlocks[tslot];
pInfo->cacheBlocks[nextSlot]->slot = nextSlot;
pInfo->cacheBlocks[nextSlot]->blockId = tblockId--;
tslot = (tslot - 1 + pInfo->maxBlocks) % pInfo->maxBlocks;
}
}
int index = pNewBlock->index;
if (cacheIter.slot == writeIter.slot) {
pNewBlock->numOfPoints = pInfo->cacheBlocks[cacheIter.slot]->numOfPoints;
int pointsLeft = pInfo->cacheBlocks[cacheIter.slot]->numOfPoints - cacheIter.pos;
if (pointsLeft > 0) {
for (int col = 0; col < pObj->numOfColumns; col++) {
memcpy((void *)(pNewBlock->offset[col] + pObj->schema[col].bytes*cacheIter.pos),
pInfo->cacheBlocks[cacheIter.slot]->offset[col] + pObj->schema[col].bytes * cacheIter.pos,
pObj->schema[col].bytes * pointsLeft);
}
}
}
pNewBlock->blockId = tblockId;
pNewBlock->slot = newSlot;
pNewBlock->index = index;
pInfo->cacheBlocks[newSlot] = pNewBlock;
pInfo->numOfBlocks++;
pInfo->unCommittedBlocks++;
pInfo->currentSlot = (pInfo->currentSlot + 1) % pInfo->maxBlocks;
pthread_mutex_unlock(&(pPool->vmutex));
cacheIter.slot = (cacheIter.slot + 1) % pInfo->maxBlocks;
// move a cache of data forward
availPoints = pObj->pointsPerBlock;
}
int offset = 0;
for (int col = 0; col < pObj->numOfColumns; col++) {
memcpy(pBuffer->offset[col] + pObj->schema[col].bytes * pBuffer->epos,
payload + pObj->bytesPerPoint * payloadIter + offset, pObj->schema[col].bytes);
offset += pObj->schema[col].bytes;
}
if (spayloadIter == payloadIter) {// update pVnode->firstKey
pthread_mutex_lock(&(pVnode->vmutex));
if (KEY_AT_INDEX(payload, pObj->bytesPerPoint, payloadIter) < pVnode->firstKey) pVnode->firstKey = firstKey;
pthread_mutex_unlock(&(pVnode->vmutex));
}
if (isCacheIterEnd) {
pObj->lastKey = KEY_AT_INDEX(payload, pObj->bytesPerPoint, payloadIter);
if (!isAppendData) isAppendData = 1;
}
rowsImported++;
availPoints--;
payloadIter++;
} else {
payloadIter++;
continue;
} }
pBuffer->epos = (pBuffer->epos + 1) % pBuffer->totalRows;
}
if (pBuffer->spos != pBuffer->epos) { // Flush the remaining data in the merge buffer
vnodeFlushMergeBuffer(pBuffer, &writeIter, &cacheIter, pObj, pInfo, 0);
} else {
// Should never come here
assert(false);
}
if (isAppendData) {
pthread_mutex_lock(&(pVnode->vmutex));
if (pObj->lastKey > pVnode->lastKey) pVnode->lastKey = pObj->lastKey;
pthread_mutex_unlock(&(pVnode->vmutex));
} }
} }
pImport->importedRows += rowsImported;
atomic_fetch_sub_32(&(pObj->freePoints), rowsImported);
code = TSDB_CODE_SUCCESS;
_exit:
tfree(pBuffer);
return code;
}
int vnodeImportDataToFiles(SImportInfo *pImport, char *payload, const int rows) {
int code = 0;
// TODO : Check the correctness of pObj and pVnode
SMeterObj *pObj = (SMeterObj *)(pImport->pObj);
SVnodeObj *pVnode = vnodeList + pObj->vnode;
vnodeClearMeterState(pObj, TSDB_METER_STATE_IMPORTING); int64_t delta = pVnode->cfg.daysPerFile * tsMsPerDay[pVnode->cfg.precision];
pVnode->version++; int sfid = KEY_AT_INDEX(payload, pObj->bytesPerPoint, 0) / delta;
int efid = KEY_AT_INDEX(payload, pObj->bytesPerPoint, rows - 1) / delta;
if (pShell) { for (int fid = sfid; fid <= efid; fid++) {
pShell->count--; TSKEY skey = fid * delta;
if (pShell->count <= 0) vnodeSendShellSubmitRspMsg(pShell, pShell->code, pShell->numOfTotalPoints); TSKEY ekey = skey + delta - 1;
int srow = 0, nrows = 0;
if (vnodeSearchKeyInRange(payload, pObj->bytesPerPoint, rows, skey, ekey, &srow, &nrows) < 0) continue;
assert(nrows > 0);
dTrace("vid:%d sid:%d meterId:%s, %d rows of data will be imported to file %d, srow:%d firstKey:%ld lastKey:%ld",
pObj->vnode, pObj->sid, pObj->meterId, nrows, fid, srow, KEY_AT_INDEX(payload, pObj->bytesPerPoint, srow),
KEY_AT_INDEX(payload, pObj->bytesPerPoint, (srow + nrows - 1)));
code = vnodeMergeDataIntoFile(pImport, payload + (srow * pObj->bytesPerPoint), nrows, fid);
if (code != TSDB_CODE_SUCCESS) break;
} }
return 0; return code;
} }
//todo abort from the procedure if the meter is going to be dropped // TODO : add offset in pShell to make it avoid repeatedly deal with messages
int vnodeImportData(SMeterObj *pObj, SImportInfo *pImport) { int vnodeImportData(SMeterObj *pObj, SImportInfo *pImport) {
int code = 0; int code = 0;
int srow = 0, nrows = 0;
SVnodeObj * pVnode = vnodeList + pObj->vnode;
SCachePool *pPool = (SCachePool *)(pVnode->pCachePool);
// 1. import data in range (pObj->lastKeyOnFile, INT64_MAX) into cache
if (vnodeSearchKeyInRange(pImport->payload, pObj->bytesPerPoint, pImport->rows, pObj->lastKeyOnFile + 1, INT64_MAX,
&srow, &nrows) >= 0) {
assert(nrows > 0);
code = vnodeImportDataToCache(pImport, pImport->payload + pObj->bytesPerPoint * srow, nrows);
if (pImport->commit) { // Need to commit now
pPool->commitInProcess = 0;
vnodeProcessCommitTimer(pVnode, NULL);
return code;
}
if (pImport->lastKey > pObj->lastKeyOnFile) { if (code != TSDB_CODE_SUCCESS) return code;
code = vnodeImportWholeToCache(pImport, pImport->payload, pImport->rows);
} else if (pImport->lastKey < pObj->lastKeyOnFile) {
code = vnodeImportWholeToFile(pImport, pImport->payload, pImport->rows);
} else { // lastKey == pObj->lastkeyOnFile
code = vnodeImportStartToFile(pImport, pImport->payload, pImport->rows);
} }
SVnodeObj *pVnode = &vnodeList[pObj->vnode]; // 2. import data (0, pObj->lastKeyOnFile) into files
SCachePool *pPool = (SCachePool *)pVnode->pCachePool; if (vnodeSearchKeyInRange(pImport->payload, pObj->bytesPerPoint, pImport->rows, 0, pObj->lastKeyOnFile - 1, &srow,
pPool->commitInProcess = 0; &nrows) >= 0) {
assert(nrows > 0);
code = vnodeImportDataToFiles(pImport, pImport->payload + pObj->bytesPerPoint * srow, nrows);
}
if (pImport->commit) vnodeProcessCommitTimer(pVnode, NULL); pPool->commitInProcess = 0;
return code; return code;
} }
...@@ -584,12 +584,12 @@ int vnodeInsertPoints(SMeterObj *pObj, char *cont, int contLen, char source, voi ...@@ -584,12 +584,12 @@ int vnodeInsertPoints(SMeterObj *pObj, char *cont, int contLen, char source, voi
if (pVnode->cfg.commitLog && source != TSDB_DATA_SOURCE_LOG) { if (pVnode->cfg.commitLog && source != TSDB_DATA_SOURCE_LOG) {
if (pVnode->logFd < 0) return TSDB_CODE_INVALID_COMMIT_LOG; if (pVnode->logFd < 0) return TSDB_CODE_INVALID_COMMIT_LOG;
code = vnodeWriteToCommitLog(pObj, TSDB_ACTION_INSERT, cont, contLen, sversion); code = vnodeWriteToCommitLog(pObj, TSDB_ACTION_INSERT, cont, contLen, sversion);
if (code != 0) return code; if (code != TSDB_CODE_SUCCESS) return code;
} }
if (source == TSDB_DATA_SOURCE_SHELL && pVnode->cfg.replications > 1) { if (source == TSDB_DATA_SOURCE_SHELL && pVnode->cfg.replications > 1) {
code = vnodeForwardToPeer(pObj, cont, contLen, TSDB_ACTION_INSERT, sversion); code = vnodeForwardToPeer(pObj, cont, contLen, TSDB_ACTION_INSERT, sversion);
if (code != 0) return code; if (code != TSDB_CODE_SUCCESS) return code;
} }
if (pObj->sversion < sversion) { if (pObj->sversion < sversion) {
...@@ -601,11 +601,11 @@ int vnodeInsertPoints(SMeterObj *pObj, char *cont, int contLen, char source, voi ...@@ -601,11 +601,11 @@ int vnodeInsertPoints(SMeterObj *pObj, char *cont, int contLen, char source, voi
} }
pData = pSubmit->payLoad; pData = pSubmit->payLoad;
code = TSDB_CODE_SUCCESS;
TSKEY firstKey = *((TSKEY *)pData); TSKEY firstKey = *((TSKEY *)pData);
TSKEY lastKey = *((TSKEY *)(pData + pObj->bytesPerPoint * (numOfPoints - 1))); TSKEY lastKey = *((TSKEY *)(pData + pObj->bytesPerPoint * (numOfPoints - 1)));
int cfid = now/pVnode->cfg.daysPerFile/tsMsPerDay[pVnode->cfg.precision]; int cfid = now/pVnode->cfg.daysPerFile/tsMsPerDay[pVnode->cfg.precision];
TSKEY minAllowedKey = (cfid - pVnode->maxFiles + 1)*pVnode->cfg.daysPerFile*tsMsPerDay[pVnode->cfg.precision]; TSKEY minAllowedKey = (cfid - pVnode->maxFiles + 1)*pVnode->cfg.daysPerFile*tsMsPerDay[pVnode->cfg.precision];
TSKEY maxAllowedKey = (cfid + 2)*pVnode->cfg.daysPerFile*tsMsPerDay[pVnode->cfg.precision] - 2; TSKEY maxAllowedKey = (cfid + 2)*pVnode->cfg.daysPerFile*tsMsPerDay[pVnode->cfg.precision] - 2;
if (firstKey < minAllowedKey || firstKey > maxAllowedKey || lastKey < minAllowedKey || lastKey > maxAllowedKey) { if (firstKey < minAllowedKey || firstKey > maxAllowedKey || lastKey < minAllowedKey || lastKey > maxAllowedKey) {
...@@ -619,7 +619,7 @@ int vnodeInsertPoints(SMeterObj *pObj, char *cont, int contLen, char source, voi ...@@ -619,7 +619,7 @@ int vnodeInsertPoints(SMeterObj *pObj, char *cont, int contLen, char source, voi
} }
for (i = 0; i < numOfPoints; ++i) { // meter will be dropped, abort current insertion for (i = 0; i < numOfPoints; ++i) { // meter will be dropped, abort current insertion
if (pObj->state >= TSDB_METER_STATE_DELETING) { if (vnodeIsMeterState(pObj, TSDB_METER_STATE_DELETING)) {
dWarn("vid:%d sid:%d id:%s, meter is dropped, abort insert, state:%d", pObj->vnode, pObj->sid, pObj->meterId, dWarn("vid:%d sid:%d id:%s, meter is dropped, abort insert, state:%d", pObj->vnode, pObj->sid, pObj->meterId,
pObj->state); pObj->state);
...@@ -648,6 +648,7 @@ int vnodeInsertPoints(SMeterObj *pObj, char *cont, int contLen, char source, voi ...@@ -648,6 +648,7 @@ int vnodeInsertPoints(SMeterObj *pObj, char *cont, int contLen, char source, voi
pData += pObj->bytesPerPoint; pData += pObj->bytesPerPoint;
points++; points++;
} }
atomic_fetch_add_64(&(pVnode->vnodeStatistic.pointsWritten), points * (pObj->numOfColumns - 1)); atomic_fetch_add_64(&(pVnode->vnodeStatistic.pointsWritten), points * (pObj->numOfColumns - 1));
atomic_fetch_add_64(&(pVnode->vnodeStatistic.totalStorage), points * pObj->bytesPerPoint); atomic_fetch_add_64(&(pVnode->vnodeStatistic.totalStorage), points * pObj->bytesPerPoint);
...@@ -660,6 +661,7 @@ int vnodeInsertPoints(SMeterObj *pObj, char *cont, int contLen, char source, voi ...@@ -660,6 +661,7 @@ int vnodeInsertPoints(SMeterObj *pObj, char *cont, int contLen, char source, voi
pVnode->version++; pVnode->version++;
pthread_mutex_unlock(&(pVnode->vmutex)); pthread_mutex_unlock(&(pVnode->vmutex));
vnodeClearMeterState(pObj, TSDB_METER_STATE_INSERT); vnodeClearMeterState(pObj, TSDB_METER_STATE_INSERT);
_over: _over:
......
...@@ -39,10 +39,21 @@ SShellObj **shellList = NULL; ...@@ -39,10 +39,21 @@ SShellObj **shellList = NULL;
int vnodeProcessRetrieveRequest(char *pMsg, int msgLen, SShellObj *pObj); int vnodeProcessRetrieveRequest(char *pMsg, int msgLen, SShellObj *pObj);
int vnodeProcessQueryRequest(char *pMsg, int msgLen, SShellObj *pObj); int vnodeProcessQueryRequest(char *pMsg, int msgLen, SShellObj *pObj);
int vnodeProcessShellSubmitRequest(char *pMsg, int msgLen, SShellObj *pObj); int vnodeProcessShellSubmitRequest(char *pMsg, int msgLen, SShellObj *pObj);
static void vnodeProcessBatchSubmitTimer(void *param, void *tmrId);
int vnodeSelectReqNum = 0; int vnodeSelectReqNum = 0;
int vnodeInsertReqNum = 0; int vnodeInsertReqNum = 0;
typedef struct {
int32_t import;
int32_t vnode;
int32_t numOfSid;
int32_t ssid; // Start sid
SShellObj *pObj;
int64_t offset; // offset relative the blks
char blks[];
} SBatchSubmitInfo;
void *vnodeProcessMsgFromShell(char *msg, void *ahandle, void *thandle) { void *vnodeProcessMsgFromShell(char *msg, void *ahandle, void *thandle) {
int sid, vnode; int sid, vnode;
SShellObj *pObj = (SShellObj *)ahandle; SShellObj *pObj = (SShellObj *)ahandle;
...@@ -249,6 +260,7 @@ int vnodeSendShellSubmitRspMsg(SShellObj *pObj, int code, int numOfPoints) { ...@@ -249,6 +260,7 @@ int vnodeSendShellSubmitRspMsg(SShellObj *pObj, int code, int numOfPoints) {
char *pMsg, *pStart; char *pMsg, *pStart;
int msgLen; int msgLen;
dTrace("code:%d numOfTotalPoints:%d", code, numOfPoints);
pStart = taosBuildRspMsgWithSize(pObj->thandle, TSDB_MSG_TYPE_SUBMIT_RSP, 128); pStart = taosBuildRspMsgWithSize(pObj->thandle, TSDB_MSG_TYPE_SUBMIT_RSP, 128);
if (pStart == NULL) return -1; if (pStart == NULL) return -1;
pMsg = pStart; pMsg = pStart;
...@@ -280,6 +292,7 @@ int vnodeProcessQueryRequest(char *pMsg, int msgLen, SShellObj *pObj) { ...@@ -280,6 +292,7 @@ int vnodeProcessQueryRequest(char *pMsg, int msgLen, SShellObj *pObj) {
} }
if (pQueryMsg->numOfSids <= 0) { if (pQueryMsg->numOfSids <= 0) {
dError("Invalid number of meters to query, numOfSids:%d", pQueryMsg->numOfSids);
code = TSDB_CODE_INVALID_QUERY_MSG; code = TSDB_CODE_INVALID_QUERY_MSG;
goto _query_over; goto _query_over;
} }
...@@ -485,10 +498,83 @@ int vnodeProcessRetrieveRequest(char *pMsg, int msgLen, SShellObj *pObj) { ...@@ -485,10 +498,83 @@ int vnodeProcessRetrieveRequest(char *pMsg, int msgLen, SShellObj *pObj) {
return msgLen; return msgLen;
} }
static int vnodeCheckSubmitBlockContext(SShellSubmitBlock *pBlocks, SVnodeObj *pVnode) {
int32_t sid = htonl(pBlocks->sid);
uint64_t uid = htobe64(pBlocks->uid);
if (sid >= pVnode->cfg.maxSessions || sid <= 0) {
dError("sid:%d is out of range", sid);
return TSDB_CODE_INVALID_TABLE_ID;
}
SMeterObj *pMeterObj = pVnode->meterList[sid];
if (pMeterObj == NULL) {
dError("vid:%d sid:%d, not active table", pVnode->vnode, sid);
vnodeSendMeterCfgMsg(pVnode->vnode, sid);
return TSDB_CODE_NOT_ACTIVE_TABLE;
}
if (pMeterObj->uid != uid) {
dError("vid:%d sid:%d, meterId:%s, uid:%lld, uid in msg:%lld, uid mismatch", pVnode->vnode, sid, pMeterObj->meterId,
pMeterObj->uid, uid);
return TSDB_CODE_INVALID_SUBMIT_MSG;
}
return TSDB_CODE_SUCCESS;
}
static int vnodeDoSubmitJob(SVnodeObj *pVnode, int import, int32_t *ssid, int32_t esid, SShellSubmitBlock **ppBlocks,
TSKEY now, SShellObj *pObj) {
SShellSubmitBlock *pBlocks = *ppBlocks;
int code = TSDB_CODE_SUCCESS;
int32_t numOfPoints = 0;
int32_t i = 0;
for (i = *ssid; i < esid; i++) {
numOfPoints = 0;
code = vnodeCheckSubmitBlockContext(pBlocks, pVnode);
if (code != TSDB_CODE_SUCCESS) break;
SMeterObj *pMeterObj = (SMeterObj *)(pVnode->meterList[htonl(pBlocks->sid)]);
// dont include sid, vid
int32_t subMsgLen = sizeof(pBlocks->numOfRows) + htons(pBlocks->numOfRows) * pMeterObj->bytesPerPoint;
int32_t sversion = htonl(pBlocks->sversion);
if (import) {
code = vnodeImportPoints(pMeterObj, (char *)&(pBlocks->numOfRows), subMsgLen, TSDB_DATA_SOURCE_SHELL, pObj,
sversion, &numOfPoints, now);
pObj->numOfTotalPoints += numOfPoints;
// records for one table should be consecutive located in the payload buffer, which is guaranteed by client
if (code == TSDB_CODE_SUCCESS) {
pObj->count--;
}
} else {
code = vnodeInsertPoints(pMeterObj, (char *)&(pBlocks->numOfRows), subMsgLen, TSDB_DATA_SOURCE_SHELL, NULL,
sversion, &numOfPoints, now);
pObj->numOfTotalPoints += numOfPoints;
}
if (code != TSDB_CODE_SUCCESS) break;
pBlocks = (SShellSubmitBlock *)((char *)pBlocks + sizeof(SShellSubmitBlock) +
htons(pBlocks->numOfRows) * pMeterObj->bytesPerPoint);
}
*ssid = i;
*ppBlocks = pBlocks;
return code;
}
int vnodeProcessShellSubmitRequest(char *pMsg, int msgLen, SShellObj *pObj) { int vnodeProcessShellSubmitRequest(char *pMsg, int msgLen, SShellObj *pObj) {
int code = 0, ret = 0; int code = 0, ret = 0;
int32_t i = 0;
SShellSubmitMsg shellSubmit = *(SShellSubmitMsg *)pMsg; SShellSubmitMsg shellSubmit = *(SShellSubmitMsg *)pMsg;
SShellSubmitMsg *pSubmit = &shellSubmit; SShellSubmitMsg *pSubmit = &shellSubmit;
SShellSubmitBlock *pBlocks = NULL;
pSubmit->vnode = htons(pSubmit->vnode); pSubmit->vnode = htons(pSubmit->vnode);
pSubmit->numOfSid = htonl(pSubmit->numOfSid); pSubmit->numOfSid = htonl(pSubmit->numOfSid);
...@@ -526,67 +612,69 @@ int vnodeProcessShellSubmitRequest(char *pMsg, int msgLen, SShellObj *pObj) { ...@@ -526,67 +612,69 @@ int vnodeProcessShellSubmitRequest(char *pMsg, int msgLen, SShellObj *pObj) {
pObj->count = pSubmit->numOfSid; // for import pObj->count = pSubmit->numOfSid; // for import
pObj->code = 0; // for import pObj->code = 0; // for import
pObj->numOfTotalPoints = 0; // for import pObj->numOfTotalPoints = 0;
SShellSubmitBlock *pBlocks = (SShellSubmitBlock *)(pMsg + sizeof(SShellSubmitMsg));
int32_t numOfPoints = 0;
int32_t numOfTotalPoints = 0;
// We take current time here to avoid it in the for loop.
TSKEY now = taosGetTimestamp(pVnode->cfg.precision); TSKEY now = taosGetTimestamp(pVnode->cfg.precision);
for (int32_t i = 0; i < pSubmit->numOfSid; ++i) { pBlocks = (SShellSubmitBlock *)(pMsg + sizeof(SShellSubmitMsg));
numOfPoints = 0; i = 0;
code = vnodeDoSubmitJob(pVnode, pSubmit->import, &i, pSubmit->numOfSid, &pBlocks, now, pObj);
pBlocks->sid = htonl(pBlocks->sid);
pBlocks->uid = htobe64(pBlocks->uid);
if (pBlocks->sid >= pVnode->cfg.maxSessions || pBlocks->sid <= 0) { _submit_over:
dTrace("sid:%d is out of range", pBlocks->sid); ret = 0;
code = TSDB_CODE_INVALID_TABLE_ID; if (pSubmit->import) { // Import case
goto _submit_over; if (code == TSDB_CODE_ACTION_IN_PROGRESS) {
SBatchSubmitInfo *pSubmitInfo =
(SBatchSubmitInfo *)calloc(1, sizeof(SBatchSubmitInfo) + msgLen - sizeof(SShellSubmitMsg));
if (pSubmitInfo == NULL) {
code = TSDB_CODE_SERV_OUT_OF_MEMORY;
ret = vnodeSendShellSubmitRspMsg(pObj, code, pObj->numOfTotalPoints);
} else { // Start a timer to process the next part of request
pSubmitInfo->import = 1;
pSubmitInfo->vnode = pSubmit->vnode;
pSubmitInfo->numOfSid = pSubmit->numOfSid;
pSubmitInfo->ssid = i; // start from this position, not the initial position
pSubmitInfo->pObj = pObj;
pSubmitInfo->offset = ((char *)pBlocks) - (pMsg + sizeof(SShellSubmitMsg));
assert(pSubmitInfo->offset >= 0);
memcpy((void *)(pSubmitInfo->blks), (void *)(pMsg + sizeof(SShellSubmitMsg)), msgLen - sizeof(SShellSubmitMsg));
taosTmrStart(vnodeProcessBatchSubmitTimer, 10, (void *)pSubmitInfo, vnodeTmrCtrl);
}
} else {
if (code == TSDB_CODE_SUCCESS) assert(pObj->count == 0);
ret = vnodeSendShellSubmitRspMsg(pObj, code, pObj->numOfTotalPoints);
} }
} else { // Insert case
ret = vnodeSendShellSubmitRspMsg(pObj, code, pObj->numOfTotalPoints);
}
int vnode = pSubmit->vnode; atomic_fetch_add_32(&vnodeInsertReqNum, 1);
int sid = pBlocks->sid; return ret;
}
SMeterObj *pMeterObj = vnodeList[vnode].meterList[sid];
if (pMeterObj == NULL) {
dError("vid:%d sid:%d, no active table", vnode, sid);
vnodeSendMeterCfgMsg(vnode, sid);
code = TSDB_CODE_NOT_ACTIVE_TABLE;
goto _submit_over;
}
if (pMeterObj->uid != pBlocks->uid) { static void vnodeProcessBatchSubmitTimer(void *param, void *tmrId) {
dError("vid:%d sid:%d, meterId:%s, uid:%lld, uid in msg:%lld, uid mismatch", vnode, sid, pMeterObj->meterId, SBatchSubmitInfo *pSubmitInfo = (SBatchSubmitInfo *)param;
pMeterObj->uid, pBlocks->uid); assert(pSubmitInfo != NULL && pSubmitInfo->import);
code = TSDB_CODE_INVALID_SUBMIT_MSG;
goto _submit_over;
}
// dont include sid, vid int32_t i = 0;
int subMsgLen = sizeof(pBlocks->numOfRows) + htons(pBlocks->numOfRows) * pMeterObj->bytesPerPoint; int32_t code = TSDB_CODE_SUCCESS;
int sversion = htonl(pBlocks->sversion);
if (pSubmit->import) { SShellObj * pShell = pSubmitInfo->pObj;
code = vnodeImportPoints(pMeterObj, (char *) &(pBlocks->numOfRows), subMsgLen, TSDB_DATA_SOURCE_SHELL, pObj, SVnodeObj * pVnode = &vnodeList[pSubmitInfo->vnode];
sversion, &numOfPoints, now); SShellSubmitBlock *pBlocks = (SShellSubmitBlock *)(pSubmitInfo->blks + pSubmitInfo->offset);
} else { TSKEY now = taosGetTimestamp(pVnode->cfg.precision);
code = vnodeInsertPoints(pMeterObj, (char *) &(pBlocks->numOfRows), subMsgLen, TSDB_DATA_SOURCE_SHELL, NULL, i = pSubmitInfo->ssid;
sversion, &numOfPoints, now);
}
if (code != TSDB_CODE_SUCCESS) {break;} code = vnodeDoSubmitJob(pVnode, pSubmitInfo->import, &i, pSubmitInfo->numOfSid, &pBlocks, now, pShell);
numOfTotalPoints += numOfPoints; if (code == TSDB_CODE_ACTION_IN_PROGRESS) {
pBlocks = (SShellSubmitBlock *)((char *)pBlocks + sizeof(SShellSubmitBlock) + pSubmitInfo->ssid = i;
htons(pBlocks->numOfRows) * pMeterObj->bytesPerPoint); pSubmitInfo->offset = ((char *)pBlocks) - pSubmitInfo->blks;
taosTmrStart(vnodeProcessBatchSubmitTimer, 10, (void *)pSubmitInfo, vnodeTmrCtrl);
} else {
if (code == TSDB_CODE_SUCCESS) assert(pShell->count == 0);
tfree(param);
vnodeSendShellSubmitRspMsg(pShell, code, pShell->numOfTotalPoints);
} }
_submit_over:
// for import, send the submit response only when return code is not zero
if (pSubmit->import == 0 || code != 0) ret = vnodeSendShellSubmitRspMsg(pObj, code, numOfTotalPoints);
atomic_fetch_add_32(&vnodeInsertReqNum, 1);
return ret;
} }
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册