Commit 0d963c5a authored by: H hjxilinx

[jira none]

......@@ -56,6 +56,7 @@ static int32_t tscToInteger(SSQLToken *pToken, int64_t *value, char **endPtr) {
radix = 2;
}
errno = 0;
*value = strtoll(pToken->z, endPtr, radix);
return numType;
......@@ -66,6 +67,8 @@ static int32_t tscToDouble(SSQLToken *pToken, double *value, char **endPtr) {
if (TK_ILLEGAL == numType) {
return numType;
}
errno = 0;
*value = strtod(pToken->z, endPtr);
return numType;
}
......
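Clearing errno before strtoll/strtod is what makes overflow detectable afterwards: both functions set errno to ERANGE only when the value is out of range and never reset it on success, so a stale value left by an earlier call would be indistinguishable from a real failure. A minimal sketch of the pattern (illustrative helper, not the TDengine parser itself):

    #include <errno.h>
    #include <stdlib.h>

    /* Parse a signed 64-bit integer; report overflow or trailing garbage. */
    static int parseInt64(const char *s, long long *out) {
      char *end = NULL;
      errno = 0;                     /* must be cleared: strtoll only sets errno on failure */
      *out = strtoll(s, &end, 10);
      if (end == s || *end != '\0') return -1;   /* no digits, or trailing garbage */
      if (errno == ERANGE) return -1;            /* overflow or underflow */
      return 0;
    }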
......@@ -2546,6 +2546,10 @@ int32_t setShowInfo(SSqlObj* pSql, struct SSqlInfo* pInfo) {
}
}
}else if (type == SHOW_VNODES) {
if (NULL == pInfo->pDCLInfo) {
return invalidSqlErrMsg(pCmd, "No specified ip of dnode");
}
// for "show vnodes", the payload may carry the ip address of a dnode
if (pInfo->pDCLInfo->nTokens > 0) {
SSQLToken* pDnodeIp = &pInfo->pDCLInfo->a[0];
......
......@@ -64,8 +64,8 @@ TAOS *taos_connect_imp(const char *ip, const char *user, const char *pass, const
#ifdef CLUSTER
if (ip && ip[0]) {
strcpy(tscMgmtIpList.ipstr[0], ip);
tscMgmtIpList.ip[0] = inet_addr(ip);
strcpy(tscMgmtIpList.ipstr[1], ip);
tscMgmtIpList.ip[1] = inet_addr(ip);
}
#else
if (ip && ip[0]) {
......
......@@ -68,6 +68,8 @@
#define HTTP_COMPRESS_IDENTITY 0
#define HTTP_COMPRESS_GZIP 2
#define HTTP_SESSION_ID_LEN (TSDB_USER_LEN * 2 + 1)
typedef enum {
HTTP_CONTEXT_STATE_READY,
HTTP_CONTEXT_STATE_HANDLING,
......@@ -83,7 +85,7 @@ typedef struct {
int expire;
int access;
void *taos;
char id[TSDB_USER_LEN];
char id[HTTP_SESSION_ID_LEN + 1];
} HttpSession;
typedef enum {
......
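HTTP_SESSION_ID_LEN is sized as TSDB_USER_LEN * 2 + 1 because the session id is later composed as "user.pass" (see httpCreateSession below): two fields plus the separating dot, with the extra +1 in the array declaration reserving room for the terminating NUL. A compile-time check of that arithmetic could look like this (a hedged sketch, assuming both fields are NUL-terminated strings of at most TSDB_USER_LEN - 1 characters):

    /* "<user>.<pass>\0" needs at most 2*(TSDB_USER_LEN-1) + 1 + 1 bytes. */
    typedef char http_session_id_fits[
        (HTTP_SESSION_ID_LEN + 1 >= 2 * (TSDB_USER_LEN - 1) + 2) ? 1 : -1];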
......@@ -50,6 +50,7 @@ bool httpParseBasicAuthToken(HttpContext *pContext, char *token, int len) {
return false;
}
strncpy(pContext->user, base64, (size_t)user_len);
pContext->user[user_len] = 0;
char *password = user + 1;
int pass_len = (int)((base64 + outlen) - password);
......@@ -60,6 +61,7 @@ bool httpParseBasicAuthToken(HttpContext *pContext, char *token, int len) {
return false;
}
strncpy(pContext->pass, password, (size_t)pass_len);
pContext->pass[pass_len] = 0;
free(base64);
httpTrace("context:%p, fd:%d, ip:%s, basic token parsed success, user:%s", pContext, pContext->fd, pContext->ipstr,
......
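The two added assignments matter because strncpy never appends a terminating NUL when the source is at least as long as the count, and user_len/pass_len are exact lengths taken from the decoded token, so the copied fields would otherwise run into whatever bytes follow them. A generic sketch of the safe idiom (illustrative helper, not part of the patch):

    #include <string.h>

    /* Copy at most cap-1 bytes and always NUL-terminate the destination. */
    static void copyBounded(char *dst, size_t cap, const char *src, size_t len) {
      if (len >= cap) len = cap - 1;   /* truncate instead of overflowing */
      memcpy(dst, src, len);
      dst[len] = '\0';
    }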
......@@ -69,7 +69,7 @@ char* httpMsg[] = {
"field value type should be number or string",
"field value is null", // 51
"parse basic auth token error",
"parse taosd auth token error",
"parse http auth token error",
"host type should be string",
// grafana
......
......@@ -41,8 +41,8 @@ void httpCreateSession(HttpContext *pContext, void *taos) {
pthread_mutex_lock(&server->serverMutex);
if (pContext->session != NULL && pContext->session == pContext->session->signature) {
httpTrace("context:%p, fd:%d, ip:%s, user:%s, set exist session:%p:%s:%p expired", pContext, pContext->fd,
pContext->ipstr, pContext->user, pContext->session, pContext->session->id, pContext->session->taos);
httpTrace("context:%p, fd:%d, ip:%s, user:%s, set exist session:%p:%p expired", pContext, pContext->fd,
pContext->ipstr, pContext->user, pContext->session, pContext->session->taos);
pContext->session->expire = 0;
pContext->session->access--;
}
......@@ -51,7 +51,7 @@ void httpCreateSession(HttpContext *pContext, void *taos) {
session.taos = taos;
session.expire = (int)taosGetTimestampSec() + server->sessionExpire;
session.access = 1;
strcpy(session.id, pContext->user);
snprintf(session.id, HTTP_SESSION_ID_LEN, "%s.%s", pContext->user, pContext->pass);
pContext->session = (HttpSession *)taosAddStrHash(server->pSessionHash, session.id, (char *)(&session));
if (pContext->session == NULL) {
httpError("context:%p, fd:%d, ip:%s, user:%s, error:%s", pContext, pContext->fd, pContext->ipstr, pContext->user,
......@@ -62,20 +62,23 @@ void httpCreateSession(HttpContext *pContext, void *taos) {
}
pContext->session->signature = pContext->session;
httpTrace("context:%p, fd:%d, ip:%s, user:%s, create a new session:%p:%s:%p", pContext, pContext->fd, pContext->ipstr,
pContext->user, pContext->session, pContext->session->id, pContext->session->taos);
httpTrace("context:%p, fd:%d, ip:%s, user:%s, create a new session:%p:%p", pContext, pContext->fd, pContext->ipstr,
pContext->user, pContext->session, pContext->session->taos);
pthread_mutex_unlock(&server->serverMutex);
}
void httpFetchSession(HttpContext *pContext) {
void httpFetchSessionImp(HttpContext *pContext) {
HttpServer *server = pContext->pThread->pServer;
pthread_mutex_lock(&server->serverMutex);
pContext->session = (HttpSession *)taosGetStrHashData(server->pSessionHash, pContext->user);
char sessionId[HTTP_SESSION_ID_LEN];
snprintf(sessionId, HTTP_SESSION_ID_LEN, "%s.%s", pContext->user, pContext->pass);
pContext->session = (HttpSession *)taosGetStrHashData(server->pSessionHash, sessionId);
if (pContext->session != NULL && pContext->session == pContext->session->signature) {
pContext->session->access++;
httpTrace("context:%p, fd:%d, ip:%s, user:%s, find an exist session:%p:%s:%p, access:%d, expire:%d",
pContext, pContext->fd, pContext->ipstr, pContext->user, pContext->session, pContext->session->id,
httpTrace("context:%p, fd:%d, ip:%s, user:%s, find an exist session:%p:%p, access:%d, expire:%d",
pContext, pContext->fd, pContext->ipstr, pContext->user, pContext->session,
pContext->session->taos, pContext->session->access, pContext->session->expire);
pContext->session->expire = (int)taosGetTimestampSec() + server->sessionExpire;
} else {
......@@ -86,6 +89,20 @@ void httpFetchSession(HttpContext *pContext) {
pthread_mutex_unlock(&server->serverMutex);
}
void httpFetchSession(HttpContext *pContext) {
if (pContext->session == NULL) {
httpFetchSessionImp(pContext);
} else {
char sessionId[HTTP_SESSION_ID_LEN];
snprintf(sessionId, HTTP_SESSION_ID_LEN, "%s.%s", pContext->user, pContext->pass);
if (strcmp(pContext->session->id, sessionId) != 0) {
httpError("context:%p, fd:%d, ip:%s, user:%s, password may be changed", pContext, pContext->fd, pContext->ipstr, pContext->user);
httpRestoreSession(pContext);
httpFetchSessionImp(pContext);
}
}
}
void httpRestoreSession(HttpContext *pContext) {
HttpServer * server = pContext->pThread->pServer;
......@@ -97,15 +114,16 @@ void httpRestoreSession(HttpContext *pContext) {
return;
}
session->access--;
httpTrace("context:%p, ip:%s, user:%s, restore session:%p:%s:%p, access:%d, expire:%d",
pContext, pContext->ipstr, pContext->user, session, session->id, session->taos,
httpTrace("context:%p, ip:%s, user:%s, restore session:%p:%p, access:%d, expire:%d",
pContext, pContext->ipstr, pContext->user, session, session->taos,
session->access, pContext->session->expire);
pContext->session = NULL;
pthread_mutex_unlock(&server->serverMutex);
}
void httpResetSession(char *session) {
HttpSession *pSession = (HttpSession *)session;
httpTrace("close session:%p:%s:%p", pSession, pSession->id, pSession->taos);
httpTrace("close session:%p:%p", pSession, pSession->taos);
if (pSession->taos != NULL) {
taos_close(pSession->taos);
pSession->taos = NULL;
......@@ -144,12 +162,12 @@ int httpSessionExpired(char *session) {
return 0; // un-expired, so return false
}
if (pSession->access > 0) {
httpTrace("session:%p:%s:%p is expired, but still access:%d", pSession, pSession->id, pSession->taos,
httpTrace("session:%p:%p is expired, but still access:%d", pSession, pSession->taos,
pSession->access);
return 0; // still used, so return false
}
httpTrace("need close session:%p:%s:%p for it expired, cur:%d, expire:%d, invertal:%d",
pSession, pSession->id, pSession->taos, cur, pSession->expire, cur - pSession->expire);
httpTrace("need close session:%p:%p for it expired, cur:%d, expire:%d, invertal:%d",
pSession, pSession->taos, cur, pSession->expire, cur - pSession->expire);
}
return 1;
......
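The net effect of this hunk is that the session hash is keyed by "user.pass" instead of the bare user name, so changing a password changes the key: httpFetchSession recomputes the key on every request and, if it no longer matches the cached session->id, releases the stale session and probes the hash again. Condensed from the diff above (same identifiers, no new API):

    char key[HTTP_SESSION_ID_LEN];
    snprintf(key, HTTP_SESSION_ID_LEN, "%s.%s", pContext->user, pContext->pass);
    if (pContext->session != NULL && strcmp(pContext->session->id, key) != 0) {
      httpRestoreSession(pContext);      /* password changed: drop the stale session */
      httpFetchSessionImp(pContext);     /* re-probe the hash with the new key */
    }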
......@@ -378,9 +378,7 @@ void httpProcessRequestCb(void *param, TAOS_RES *result, int code) {
}
void httpProcessRequest(HttpContext *pContext) {
if (pContext->session == NULL) {
httpFetchSession(pContext);
}
httpFetchSession(pContext);
if (pContext->session == NULL || pContext->session != pContext->session->signature ||
pContext->reqType == HTTP_REQTYPE_LOGIN) {
......
......@@ -476,6 +476,8 @@ int mgmtRetrieveVnodes(SShowObj *pShow, char *data, int rows, SConnObj *pConn) {
continue;
}
cols = 0;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
*(uint32_t *)pWrite = pVnode->vnode;
cols++;
......
......@@ -660,7 +660,7 @@ int mgmtCreateMeter(SDbObj *pDb, SCreateTableMsg *pCreate) {
pMeter->uid = (((uint64_t)pMeter->gid.vgId) << 40) + ((((uint64_t)pMeter->gid.sid) & ((1ul << 24) - 1ul)) << 16) +
((uint64_t)sdbVersion & ((1ul << 16) - 1ul));
mTrace("table:%s, create table in vgroup, vgId:%d sid:%d vnode:%d uid:%ld db:%s",
mTrace("table:%s, create table in vgroup, vgId:%d sid:%d vnode:%d uid:%llu db:%s",
pMeter->meterId, pVgroup->vgId, sid, pVgroup->vnodeGid[0].vnode, pMeter->uid, pDb->name);
} else {
pMeter->uid = (((uint64_t)pMeter->createdTime) << 16) + ((uint64_t)sdbVersion & ((1ul << 16) - 1ul));
......
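pMeter->uid is a uint64_t, so %ld (a signed long, only 32 bits on some platforms) is the wrong conversion specifier; %llu matches unsigned long long on the platforms TDengine targets, and the strictly portable spelling would go through PRIu64. For illustration only (not what the patch uses):

    #include <inttypes.h>
    #include <stdio.h>

    void printUid(uint64_t uid) {
      printf("uid:%" PRIu64 "\n", uid);   /* correct width regardless of long's size */
    }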
......@@ -372,13 +372,60 @@ void vnodeCancelCommit(SVnodeObj *pVnode) {
taosTmrReset(vnodeProcessCommitTimer, pVnode->cfg.commitTime * 1000, pVnode, vnodeTmrCtrl, &pVnode->commitTimer);
}
/* The vnode cache lock should be held before calling this interface
*/
SCacheBlock *vnodeGetFreeCacheBlock(SVnodeObj *pVnode) {
SCachePool *pPool = (SCachePool *)(pVnode->pCachePool);
SVnodeCfg *pCfg = &(pVnode->cfg);
SCacheBlock *pCacheBlock = NULL;
int skipped = 0;
while (1) {
pCacheBlock = (SCacheBlock *)(pPool->pMem[((int64_t)pPool->freeSlot)]);
if (pCacheBlock->blockId == 0) break;
if (pCacheBlock->notFree) {
pPool->freeSlot++;
pPool->freeSlot = pPool->freeSlot % pCfg->cacheNumOfBlocks.totalBlocks;
skipped++;
if (skipped > pPool->threshold) {
vnodeCreateCommitThread(pVnode);
pthread_mutex_unlock(&pPool->vmutex);
dError("vid:%d committing process is too slow, notFreeSlots:%d....", pVnode->vnode, pPool->notFreeSlots);
return NULL;
}
} else {
SMeterObj * pRelObj = pCacheBlock->pMeterObj;
SCacheInfo *pRelInfo = (SCacheInfo *)pRelObj->pCache;
int firstSlot = (pRelInfo->currentSlot - pRelInfo->numOfBlocks + 1 + pRelInfo->maxBlocks) % pRelInfo->maxBlocks;
pCacheBlock = pRelInfo->cacheBlocks[firstSlot];
if (pCacheBlock) {
pPool->freeSlot = pCacheBlock->index;
vnodeFreeCacheBlock(pCacheBlock);
break;
} else {
pPool->freeSlot = (pPool->freeSlot + 1) % pCfg->cacheNumOfBlocks.totalBlocks;
skipped++;
}
}
}
pCacheBlock = (SCacheBlock *)(pPool->pMem[pPool->freeSlot]);
pCacheBlock->index = pPool->freeSlot;
pCacheBlock->notFree = 1;
pPool->freeSlot = (pPool->freeSlot + 1) % pCfg->cacheNumOfBlocks.totalBlocks;
pPool->notFreeSlots++;
return pCacheBlock;
}
int vnodeAllocateCacheBlock(SMeterObj *pObj) {
int index;
SCachePool * pPool;
SCacheBlock *pCacheBlock;
SCacheInfo * pInfo;
SVnodeObj * pVnode;
int skipped = 0, commit = 0;
int commit = 0;
pVnode = vnodeList + pObj->vnode;
pPool = (SCachePool *)pVnode->pCachePool;
......@@ -406,45 +453,10 @@ int vnodeAllocateCacheBlock(SMeterObj *pObj) {
return -1;
}
while (1) {
pCacheBlock = (SCacheBlock *)(pPool->pMem[((int64_t)pPool->freeSlot)]);
if (pCacheBlock->blockId == 0) break;
if (pCacheBlock->notFree) {
pPool->freeSlot++;
pPool->freeSlot = pPool->freeSlot % pCfg->cacheNumOfBlocks.totalBlocks;
skipped++;
if (skipped > pPool->threshold) {
vnodeCreateCommitThread(pVnode);
pthread_mutex_unlock(&pPool->vmutex);
dError("vid:%d sid:%d id:%s, committing process is too slow, notFreeSlots:%d....",
pObj->vnode, pObj->sid, pObj->meterId, pPool->notFreeSlots);
return -1;
}
} else {
SMeterObj *pRelObj = pCacheBlock->pMeterObj;
SCacheInfo *pRelInfo = (SCacheInfo *)pRelObj->pCache;
int firstSlot = (pRelInfo->currentSlot - pRelInfo->numOfBlocks + 1 + pRelInfo->maxBlocks) % pRelInfo->maxBlocks;
pCacheBlock = pRelInfo->cacheBlocks[firstSlot];
if (pCacheBlock) {
pPool->freeSlot = pCacheBlock->index;
vnodeFreeCacheBlock(pCacheBlock);
break;
} else {
pPool->freeSlot = (pPool->freeSlot + 1) % pCfg->cacheNumOfBlocks.totalBlocks;
skipped++;
}
}
}
index = pPool->freeSlot;
pPool->freeSlot++;
pPool->freeSlot = pPool->freeSlot % pCfg->cacheNumOfBlocks.totalBlocks;
pPool->notFreeSlots++;
if ((pCacheBlock = vnodeGetFreeCacheBlock(pVnode)) == NULL) return -1;
index = pCacheBlock->index;
pCacheBlock->pMeterObj = pObj;
pCacheBlock->notFree = 1;
pCacheBlock->index = index;
pCacheBlock->offset[0] = ((char *)(pCacheBlock)) + sizeof(SCacheBlock) + pObj->numOfColumns * sizeof(char *);
for (int col = 1; col < pObj->numOfColumns; ++col)
......
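The allocation scan that used to live inside vnodeAllocateCacheBlock is lifted into vnodeGetFreeCacheBlock so the new import path (vnodeImport.c below declares it as extern) can reuse it: the function walks the block pool as a ring starting at freeSlot, skips blocks pinned for an ongoing commit (notFree), reclaims the owning meter's oldest block otherwise, and gives up once more than threshold slots were skipped so the commit thread can catch up. The scan shape, reduced to its core (simplified types, not the real pool structures):

    /* Return the index of a reclaimable slot, or -1 if committing is too slow. */
    static int scanRing(int freeSlot, int totalSlots, const char *pinned, int threshold) {
      int skipped = 0;
      while (pinned[freeSlot]) {
        freeSlot = (freeSlot + 1) % totalSlots;
        if (++skipped > threshold) return -1;   /* let the commit thread drain the pool */
      }
      return freeSlot;
    }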
......@@ -95,8 +95,8 @@ void vnodeGetDnameFromLname(char *lhead, char *ldata, char *llast, char *dhead,
}
void vnodeGetHeadTname(char *nHeadName, char *nLastName, int vnode, int fileId) {
sprintf(nHeadName, "%s/vnode%d/db/v%df%d.t", tsDirectory, vnode, vnode, fileId);
sprintf(nLastName, "%s/vnode%d/db/v%df%d.l", tsDirectory, vnode, vnode, fileId);
if (nHeadName != NULL) sprintf(nHeadName, "%s/vnode%d/db/v%df%d.t", tsDirectory, vnode, vnode, fileId);
if (nLastName != NULL) sprintf(nLastName, "%s/vnode%d/db/v%df%d.l", tsDirectory, vnode, vnode, fileId);
}
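Guarding both sprintf calls with NULL checks turns the two output buffers into optional parameters, which the import code added later in this commit relies on when it only needs the new head-file name:

    vnodeGetHeadTname(pVnode->nfn, NULL, pVnode->vnode, fid);   /* only the .t name is wanted */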
void vnodeCreateDataDirIfNeeded(int vnode, char *path) {
......@@ -180,29 +180,24 @@ int vnodeCreateEmptyCompFile(int vnode, int fileId) {
return 0;
}
int vnodeOpenCommitFiles(SVnodeObj *pVnode, int noTempLast) {
char name[TSDB_FILENAME_LEN];
char dHeadName[TSDB_FILENAME_LEN] = "\0";
char dLastName[TSDB_FILENAME_LEN] = "\0";
int len = 0;
struct stat filestat;
int vnode = pVnode->vnode;
int fileId, numOfFiles, filesAdded = 0;
SVnodeCfg * pCfg = &pVnode->cfg;
int vnodeCreateNeccessaryFiles(SVnodeObj *pVnode) {
int numOfFiles = 0, fileId, filesAdded = 0;
int vnode = pVnode->vnode;
SVnodeCfg *pCfg = &(pVnode->cfg);
if (pVnode->lastKeyOnFile == 0) {
if (pCfg->daysPerFile == 0) pCfg->daysPerFile = 10;
pVnode->fileId = pVnode->firstKey / tsMsPerDay[pVnode->cfg.precision] / pCfg->daysPerFile;
pVnode->lastKeyOnFile = (int64_t)(pVnode->fileId + 1) * pCfg->daysPerFile * tsMsPerDay[pVnode->cfg.precision] - 1;
pVnode->numOfFiles = 1;
vnodeCreateEmptyCompFile(vnode, pVnode->fileId);
if (vnodeCreateEmptyCompFile(vnode, pVnode->fileId) < 0) return -1;
}
numOfFiles = (pVnode->lastKeyOnFile - pVnode->commitFirstKey) / tsMsPerDay[pVnode->cfg.precision] / pCfg->daysPerFile;
if (pVnode->commitFirstKey > pVnode->lastKeyOnFile) numOfFiles = -1;
dTrace("vid:%d, commitFirstKey:%ld lastKeyOnFile:%ld numOfFiles:%d fileId:%d vnodeNumOfFiles:%d",
vnode, pVnode->commitFirstKey, pVnode->lastKeyOnFile, numOfFiles, pVnode->fileId, pVnode->numOfFiles);
dTrace("vid:%d, commitFirstKey:%ld lastKeyOnFile:%ld numOfFiles:%d fileId:%d vnodeNumOfFiles:%d", pVnode->vnode,
pVnode->commitFirstKey, pVnode->lastKeyOnFile, numOfFiles, pVnode->fileId, pVnode->numOfFiles);
if (numOfFiles >= pVnode->numOfFiles) {
// create empty header files backward
......@@ -214,7 +209,7 @@ int vnodeOpenCommitFiles(SVnodeObj *pVnode, int noTempLast) {
#ifdef CLUSTER
return vnodeRecoverFromPeer(pVnode, fileId);
#else
return -1;
return -1;
#endif
}
} else if (numOfFiles < 0) {
......@@ -224,7 +219,7 @@ int vnodeOpenCommitFiles(SVnodeObj *pVnode, int noTempLast) {
#ifdef CLUSTER
return vnodeRecoverFromPeer(pVnode, pVnode->fileId);
#else
return -1;
return -1;
#endif
pVnode->lastKeyOnFile += (int64_t)tsMsPerDay[pVnode->cfg.precision] * pCfg->daysPerFile;
filesAdded = 1;
......@@ -238,6 +233,24 @@ int vnodeOpenCommitFiles(SVnodeObj *pVnode, int noTempLast) {
pVnode->commitFileId = fileId;
pVnode->numOfFiles = pVnode->numOfFiles + filesAdded;
return 0;
}
int vnodeOpenCommitFiles(SVnodeObj *pVnode, int noTempLast) {
char name[TSDB_FILENAME_LEN];
char dHeadName[TSDB_FILENAME_LEN] = "\0";
char dLastName[TSDB_FILENAME_LEN] = "\0";
int len = 0;
struct stat filestat;
int vnode = pVnode->vnode;
int fileId, numOfFiles, filesAdded = 0;
SVnodeCfg * pCfg = &pVnode->cfg;
if (vnodeCreateNeccessaryFiles(pVnode) < 0) return -1;
fileId = pVnode->commitFileId;
dTrace("vid:%d, commit fileId:%d, commitLastKey:%ld, vnodeLastKey:%ld, lastKeyOnFile:%ld numOfFiles:%d",
vnode, fileId, pVnode->commitLastKey, pVnode->lastKey, pVnode->lastKeyOnFile, pVnode->numOfFiles);
......@@ -1817,7 +1830,15 @@ int vnodeInitFile(int vnode) {
pVnode->fmagic = (uint64_t *)calloc(pVnode->maxFiles + 1, sizeof(uint64_t));
int fileId = pVnode->fileId;
for (int i = 0; i < pVnode->numOfFiles; ++i) {
/*
* The number of files actually on disk may far exceed the number of files that need to exist
*/
if (pVnode->numOfFiles > pVnode->maxFiles) {
dError("vid:%d numOfFiles:%d should not larger than maxFiles:%d", vnode, pVnode->numOfFiles, pVnode->maxFiles);
}
int numOfFiles = MIN(pVnode->numOfFiles, pVnode->maxFiles);
for (int i = 0; i < numOfFiles; ++i) {
if (vnodeUpdateFileMagic(vnode, fileId) < 0) {
if (pVnode->cfg.replications > 1) {
pVnode->badFileId = fileId;
......
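Since fmagic is allocated with maxFiles + 1 slots, iterating numOfFiles times would run past the array whenever more files exist on disk than the retention window allows; clamping the loop bound with MIN and logging the inconsistency keeps the indexing in range. The guard in isolation (assuming the usual two-argument MIN macro):

    int n = MIN(pVnode->numOfFiles, pVnode->maxFiles);   /* never index past fmagic[maxFiles] */
    for (int i = 0; i < n; ++i) {
      /* ... vnodeUpdateFileMagic(vnode, fileId), advance fileId ... */
    }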
......@@ -16,27 +16,20 @@
#define _DEFAULT_SOURCE
#include "os.h"
#include "trpc.h"
#include "ttimer.h"
#include "vnode.h"
#include "vnodeMgmt.h"
#include "vnodeShell.h"
#include "vnodeShell.h"
#include "vnodeUtil.h"
#pragma GCC diagnostic ignored "-Wpointer-sign"
#pragma GCC diagnostic ignored "-Wint-conversion"
typedef struct {
SCompHeader *headList;
SCompInfo compInfo;
int last; // 0:last block in data file, 1:not the last block
int newBlocks;
int oldNumOfBlocks;
int64_t compInfoOffset; // offset for compInfo in head file
int64_t leftOffset; // copy from this offset to end of head file
int64_t hfdSize; // old head file size
} SHeadInfo;
extern void vnodeGetHeadTname(char *nHeadName, char *nLastName, int vnode, int fileId);
extern int vnodeReadColumnToMem(int fd, SCompBlock *pBlock, SField **fields, int col, char *data, int dataSize,
char *temp, char *buffer, int bufferSize);
extern int vnodeSendShellSubmitRspMsg(SShellObj *pObj, int code, int numOfPoints);
extern void vnodeGetHeadDataLname(char *headName, char *dataName, char *lastName, int vnode, int fileId);
extern int vnodeCreateEmptyCompFile(int vnode, int fileId);
extern int vnodeUpdateFreeSlot(SVnodeObj *pVnode);
extern SCacheBlock *vnodeGetFreeCacheBlock(SVnodeObj *pVnode);
extern int vnodeCreateNeccessaryFiles(SVnodeObj *pVnode);
#define KEY_AT_INDEX(payload, step, idx) (*(TSKEY *)((char *)(payload) + (step) * (idx)))
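KEY_AT_INDEX reads the timestamp at the head of row idx in a row-major submit payload where every row occupies step bytes and begins with its TSKEY; the import code below uses it to pick out the first and last keys of an incoming batch:

    TSKEY firstKey = KEY_AT_INDEX(payload, pObj->bytesPerPoint, 0);
    TSKEY lastKey  = KEY_AT_INDEX(payload, pObj->bytesPerPoint, rows - 1);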
typedef struct {
void * signature;
SShellObj *pShell;
......@@ -53,952 +46,1507 @@ typedef struct {
// only for file
int numOfPoints;
int fileId;
int64_t offset; // offset in data file
SData *sdata[TSDB_MAX_COLUMNS];
char *buffer;
char *payload;
char *opayload;
char * payload;
char * opayload; // allocated space for payload from client
int rows;
} SImportInfo;
int vnodeImportData(SMeterObj *pObj, SImportInfo *pImport);
typedef struct {
// in .head file
SCompHeader *pHeader;
size_t pHeaderSize;
int vnodeGetImportStartPart(SMeterObj *pObj, char *payload, int rows, TSKEY key1) {
int i;
SCompInfo compInfo;
SCompBlock *pBlocks;
// in .data file
int blockId;
uint8_t blockLoadState;
for (i = 0; i < rows; ++i) {
TSKEY key = *((TSKEY *)(payload + i * pObj->bytesPerPoint));
if (key >= key1) break;
}
SField *pField;
size_t pFieldSize;
return i;
}
SData *data[TSDB_MAX_COLUMNS];
char * buffer;
int vnodeGetImportEndPart(SMeterObj *pObj, char *payload, int rows, char **pStart, TSKEY key0) {
int i;
char *temp;
for (i = 0; i < rows; ++i) {
TSKEY key = *((TSKEY *)(payload + i * pObj->bytesPerPoint));
if (key > key0) break;
}
char * tempBuffer;
size_t tempBufferSize;
// Variables for sendfile
int64_t compInfoOffset;
int64_t nextNo0Offset; // next sid whose compInfoOffset > 0
int64_t hfSize;
int64_t driftOffset;
*pStart = payload + i * pObj->bytesPerPoint;
return rows - i;
}
int oldNumOfBlocks;
int newNumOfBlocks;
int last;
} SImportHandle;
int vnodeCloseFileForImport(SMeterObj *pObj, SHeadInfo *pHinfo) {
SVnodeObj *pVnode = &vnodeList[pObj->vnode];
SVnodeCfg *pCfg = &pVnode->cfg;
TSCKSUM chksum = 0;
typedef struct {
int slot;
int pos;
int oslot; // old slot
TSKEY nextKey;
} SBlockIter;
if (pHinfo->newBlocks == 0 || pHinfo->compInfoOffset == 0) return 0;
typedef struct {
int64_t spos;
int64_t epos;
int64_t totalRows;
char * offset[];
} SMergeBuffer;
if (pHinfo->oldNumOfBlocks == 0) twrite(pVnode->nfd, &chksum, sizeof(TSCKSUM));
int vnodeImportData(SMeterObj *pObj, SImportInfo *pImport);
int leftSize = pHinfo->hfdSize - pHinfo->leftOffset;
if (leftSize > 0) {
lseek(pVnode->hfd, pHinfo->leftOffset, SEEK_SET);
tsendfile(pVnode->nfd, pVnode->hfd, NULL, leftSize);
}
int vnodeFindKeyInCache(SImportInfo *pImport, int order) {
SMeterObj * pObj = pImport->pObj;
int code = 0;
SQuery query;
SCacheInfo *pInfo = (SCacheInfo *)pObj->pCache;
TSKEY key = order ? pImport->firstKey : pImport->lastKey;
memset(&query, 0, sizeof(query));
query.order.order = order;
query.skey = key;
query.ekey = order ? pImport->lastKey : pImport->firstKey;
vnodeSearchPointInCache(pObj, &query);
pHinfo->compInfo.numOfBlocks += pHinfo->newBlocks;
int offset = (pHinfo->compInfo.numOfBlocks - pHinfo->oldNumOfBlocks) * sizeof(SCompBlock);
if (pHinfo->oldNumOfBlocks == 0) offset += sizeof(SCompInfo) + sizeof(TSCKSUM);
if (query.slot < 0) {
pImport->slot = pInfo->commitSlot;
if (pInfo->commitPoint >= pObj->pointsPerBlock) pImport->slot = (pImport->slot + 1) % pInfo->maxBlocks;
pImport->pos = 0;
pImport->key = 0;
dTrace("vid:%d sid:%d id:%s, key:%ld, import to head of cache", pObj->vnode, pObj->sid, pObj->meterId, key);
code = 0;
} else {
pImport->slot = query.slot;
pImport->pos = query.pos;
pImport->key = query.key;
pHinfo->headList[pObj->sid].compInfoOffset = pHinfo->compInfoOffset;
for (int sid = pObj->sid + 1; sid < pCfg->maxSessions; ++sid) {
if (pHinfo->headList[sid].compInfoOffset) pHinfo->headList[sid].compInfoOffset += offset;
if (key != query.key) {
if (order == 0) {
// since pos is the position which has smaller key, data shall be imported after it
pImport->pos++;
if (pImport->pos >= pObj->pointsPerBlock) {
pImport->slot = (pImport->slot + 1) % pInfo->maxBlocks;
pImport->pos = 0;
}
} else {
if (pImport->pos < 0) pImport->pos = 0;
}
}
code = 0;
}
lseek(pVnode->nfd, TSDB_FILE_HEADER_LEN, SEEK_SET);
int tmsize = sizeof(SCompHeader) * pCfg->maxSessions + sizeof(TSCKSUM);
taosCalcChecksumAppend(0, (uint8_t *)pHinfo->headList, tmsize);
twrite(pVnode->nfd, pHinfo->headList, tmsize);
return code;
}
int size = pHinfo->compInfo.numOfBlocks * sizeof(SCompBlock);
char *buffer = malloc(size);
lseek(pVnode->nfd, pHinfo->compInfoOffset + sizeof(SCompInfo), SEEK_SET);
read(pVnode->nfd, buffer, size);
SCompBlock *pBlock = (SCompBlock *)(buffer + (pHinfo->compInfo.numOfBlocks - 1) * sizeof(SCompBlock));
void vnodeGetValidDataRange(int vnode, TSKEY now, TSKEY *minKey, TSKEY *maxKey) {
SVnodeObj *pVnode = vnodeList + vnode;
pHinfo->compInfo.uid = pObj->uid;
pHinfo->compInfo.delimiter = TSDB_VNODE_DELIMITER;
pHinfo->compInfo.last = pBlock->last;
int64_t delta = pVnode->cfg.daysPerFile * tsMsPerDay[pVnode->cfg.precision];
int fid = now / delta;
*minKey = (fid - pVnode->maxFiles + 1) * delta;
*maxKey = (fid + 2) * delta - 1;
return;
}
taosCalcChecksumAppend(0, (uint8_t *)(&pHinfo->compInfo), sizeof(SCompInfo));
lseek(pVnode->nfd, pHinfo->compInfoOffset, SEEK_SET);
twrite(pVnode->nfd, &pHinfo->compInfo, sizeof(SCompInfo));
int vnodeImportPoints(SMeterObj *pObj, char *cont, int contLen, char source, void *param, int sversion,
int *pNumOfPoints, TSKEY now) {
SSubmitMsg *pSubmit = (SSubmitMsg *)cont;
SVnodeObj * pVnode = vnodeList + pObj->vnode;
int rows = 0;
char * payload = NULL;
int code = TSDB_CODE_SUCCESS;
SCachePool *pPool = (SCachePool *)(pVnode->pCachePool);
SShellObj * pShell = (SShellObj *)param;
TSKEY firstKey, lastKey;
chksum = taosCalcChecksum(0, (uint8_t *)buffer, size);
lseek(pVnode->nfd, pHinfo->compInfoOffset + sizeof(SCompInfo) + size, SEEK_SET);
twrite(pVnode->nfd, &chksum, sizeof(TSCKSUM));
free(buffer);
payload = pSubmit->payLoad;
vnodeCloseCommitFiles(pVnode);
rows = htons(pSubmit->numOfRows);
assert(rows > 0);
int expectedLen = rows * pObj->bytesPerPoint + sizeof(pSubmit->numOfRows);
if (expectedLen != contLen) {
dError("vid:%d sid:%d id:%s, invalid import, expected:%d, contLen:%d", pObj->vnode, pObj->sid, pObj->meterId,
expectedLen, contLen);
return TSDB_CODE_WRONG_MSG_SIZE;
}
return 0;
}
// Check timestamp context.
TSKEY minKey = 0, maxKey = 0;
firstKey = KEY_AT_INDEX(payload, pObj->bytesPerPoint, 0);
lastKey = KEY_AT_INDEX(payload, pObj->bytesPerPoint, rows - 1);
assert(firstKey <= lastKey);
vnodeGetValidDataRange(pObj->vnode, now, &minKey, &maxKey);
if (firstKey < minKey || firstKey > maxKey || lastKey < minKey || lastKey > maxKey) {
dError(
"vid:%d sid:%d id:%s, invalid timestamp to import, rows:%d firstKey: %ld lastKey: %ld minAllowedKey:%ld "
"maxAllowedKey:%ld",
pObj->vnode, pObj->sid, pObj->meterId, rows, firstKey, lastKey, minKey, maxKey);
return TSDB_CODE_TIMESTAMP_OUT_OF_RANGE;
}
// forward to peers
if (pShell && pVnode->cfg.replications > 1) {
code = vnodeForwardToPeer(pObj, cont, contLen, TSDB_ACTION_IMPORT, sversion);
if (code != 0) return code;
}
int vnodeProcessLastBlock(SImportInfo *pImport, SHeadInfo *pHinfo, SData *data[]) {
SMeterObj *pObj = pImport->pObj;
SVnodeObj *pVnode = &vnodeList[pObj->vnode];
SCompBlock lastBlock;
int code = 0;
if (pHinfo->compInfo.last == 0) return 0;
// read into memory
uint64_t offset =
pHinfo->compInfoOffset + (pHinfo->compInfo.numOfBlocks - 1) * sizeof(SCompBlock) + sizeof(SCompInfo);
lseek(pVnode->hfd, offset, SEEK_SET);
read(pVnode->hfd, &lastBlock, sizeof(SCompBlock));
assert(lastBlock.last);
if (lastBlock.sversion != pObj->sversion) {
lseek(pVnode->lfd, lastBlock.offset, SEEK_SET);
lastBlock.offset = lseek(pVnode->dfd, 0, SEEK_END);
tsendfile(pVnode->dfd, pVnode->lfd, NULL, lastBlock.len);
lastBlock.last = 0;
lseek(pVnode->hfd, offset, SEEK_SET);
twrite(pVnode->hfd, &lastBlock, sizeof(SCompBlock));
} else {
vnodeReadLastBlockToMem(pObj, &lastBlock, data);
pHinfo->compInfo.numOfBlocks--;
code = lastBlock.numOfPoints;
if (pVnode->cfg.commitLog && source != TSDB_DATA_SOURCE_LOG) {
if (pVnode->logFd < 0) return TSDB_CODE_INVALID_COMMIT_LOG;
code = vnodeWriteToCommitLog(pObj, TSDB_ACTION_IMPORT, cont, contLen, sversion);
if (code != 0) return code;
}
return code;
}
/*
* The timestamps of all records in a submit payload are always in ascending order (guaranteed by the client), so only
* the first key needs to be checked here.
*/
if (firstKey > pObj->lastKey) { // Just call insert
code = vnodeInsertPoints(pObj, cont, contLen, TSDB_DATA_SOURCE_LOG, NULL, sversion, pNumOfPoints, now);
} else { // trigger import
if (sversion != pObj->sversion) {
dError("vid:%d sid:%d id:%s, invalid sversion, expected:%d received:%d", pObj->vnode, pObj->sid, pObj->meterId,
pObj->sversion, sversion);
return TSDB_CODE_OTHERS;
}
// check the table status for perform import historical data
if ((code = vnodeSetMeterInsertImportStateEx(pObj, TSDB_METER_STATE_IMPORTING)) != TSDB_CODE_SUCCESS) {
return code;
}
SImportInfo import = {0};
int vnodeOpenFileForImport(SImportInfo *pImport, char *payload, SHeadInfo *pHinfo, SData *data[]) {
SMeterObj *pObj = pImport->pObj;
SVnodeObj *pVnode = &vnodeList[pObj->vnode];
SVnodeCfg *pCfg = &pVnode->cfg;
TSKEY firstKey = *((TSKEY *)payload);
struct stat filestat;
int sid, rowsBefore = 0;
dTrace("vid:%d sid:%d id:%s, try to import %d rows data, firstKey:%ld, lastKey:%ld, object lastKey:%ld",
pObj->vnode, pObj->sid, pObj->meterId, rows, firstKey, lastKey, pObj->lastKey);
import.firstKey = firstKey;
import.lastKey = lastKey;
import.pObj = pObj;
import.pShell = pShell;
import.payload = payload;
import.rows = rows;
if (pVnode->nfd <= 0 || firstKey > pVnode->commitLastKey) {
if (pVnode->nfd > 0) vnodeCloseFileForImport(pObj, pHinfo);
// FIXME: mutex here seems meaningless and num here still can be changed
int32_t num = 0;
pthread_mutex_lock(&pVnode->vmutex);
num = pObj->numOfQueries;
pthread_mutex_unlock(&pVnode->vmutex);
pVnode->commitFirstKey = firstKey;
if (vnodeOpenCommitFiles(pVnode, pObj->sid) < 0) return -1;
int32_t commitInProcess = 0;
fstat(pVnode->hfd, &filestat);
pHinfo->hfdSize = filestat.st_size;
pHinfo->newBlocks = 0;
pHinfo->last = 1; // by default, new blocks are at the end of the block list
pthread_mutex_lock(&pPool->vmutex);
if (((commitInProcess = pPool->commitInProcess) == 1) || num > 0) {
// mutual exclusion with read (need to change here)
pthread_mutex_unlock(&pPool->vmutex);
vnodeClearMeterState(pObj, TSDB_METER_STATE_IMPORTING);
return TSDB_CODE_ACTION_IN_PROGRESS;
lseek(pVnode->hfd, TSDB_FILE_HEADER_LEN, SEEK_SET);
read(pVnode->hfd, pHinfo->headList, sizeof(SCompHeader) * pCfg->maxSessions);
} else {
pPool->commitInProcess = 1;
pthread_mutex_unlock(&pPool->vmutex);
code = vnodeImportData(pObj, &import);
*pNumOfPoints = import.importedRows;
}
pVnode->version++;
vnodeClearMeterState(pObj, TSDB_METER_STATE_IMPORTING);
}
return code;
}
if (pHinfo->headList[pObj->sid].compInfoOffset > 0) {
lseek(pVnode->hfd, pHinfo->headList[pObj->sid].compInfoOffset, SEEK_SET);
if (read(pVnode->hfd, &pHinfo->compInfo, sizeof(SCompInfo)) != sizeof(SCompInfo)) {
dError("vid:%d sid:%d, failed to read compInfo from file:%s", pObj->vnode, pObj->sid, pVnode->cfn);
return -1;
}
/* Function to search keys in a range
*
* Assumption: keys in payload are in ascending order
*
* @payload: data records, key in ascending order
* @step: bytes each record takes
* @rows: number of data records
* @skey: range start (inclusive)
* @ekey: range end (inclusive)
* @srow: output, index of the first record in the range
* @nrows: output, number of records in the range
*
* @return: 0 if records fall in the range
* -1 if no record falls in the range
*/
static int vnodeSearchKeyInRange(char *payload, int step, int rows, TSKEY skey, TSKEY ekey, int *srow, int *nrows) {
if (rows <= 0 || KEY_AT_INDEX(payload, step, 0) > ekey || KEY_AT_INDEX(payload, step, rows - 1) < skey || skey > ekey)
return -1;
if (pHinfo->compInfo.uid == pObj->uid) {
pHinfo->compInfoOffset = pHinfo->headList[pObj->sid].compInfoOffset;
pHinfo->leftOffset = pHinfo->headList[pObj->sid].compInfoOffset + sizeof(SCompInfo);
} else {
pHinfo->headList[pObj->sid].compInfoOffset = 0;
}
}
int left = 0;
int right = rows - 1;
int mid;
// Binary search the first key in payload >= skey
do {
mid = (left + right) / 2;
if (skey < KEY_AT_INDEX(payload, step, mid)) {
right = mid;
} else if (skey > KEY_AT_INDEX(payload, step, mid)) {
left = mid + 1;
} else {
break;
}
} while (left < right);
if ( pHinfo->headList[pObj->sid].compInfoOffset == 0 ) {
memset(&pHinfo->compInfo, 0, sizeof(SCompInfo));
pHinfo->compInfo.uid = pObj->uid;
if (skey <= KEY_AT_INDEX(payload, step, mid)) {
*srow = mid;
} else {
if (mid + 1 >= rows) {
return -1;
} else {
*srow = mid + 1;
}
}
for (sid = pObj->sid + 1; sid < pCfg->maxSessions; ++sid)
if (pHinfo->headList[sid].compInfoOffset > 0) break;
assert(skey <= KEY_AT_INDEX(payload, step, *srow));
pHinfo->compInfoOffset = (sid == pCfg->maxSessions) ? pHinfo->hfdSize : pHinfo->headList[sid].compInfoOffset;
pHinfo->leftOffset = pHinfo->compInfoOffset;
*nrows = 0;
for (int i = *srow; i < rows; i++) {
if (KEY_AT_INDEX(payload, step, i) <= ekey) {
(*nrows)++;
} else {
break;
}
}
pHinfo->oldNumOfBlocks = pHinfo->compInfo.numOfBlocks;
lseek(pVnode->hfd, 0, SEEK_SET);
lseek(pVnode->nfd, 0, SEEK_SET);
tsendfile(pVnode->nfd, pVnode->hfd, NULL, pHinfo->compInfoOffset);
twrite(pVnode->nfd, &pHinfo->compInfo, sizeof(SCompInfo));
if (pHinfo->headList[pObj->sid].compInfoOffset > 0) lseek(pVnode->hfd, sizeof(SCompInfo), SEEK_CUR);
if (*nrows == 0) return -1;
if (pVnode->commitFileId < pImport->fileId) {
if (pHinfo->compInfo.numOfBlocks > 0)
pHinfo->leftOffset += pHinfo->compInfo.numOfBlocks * sizeof(SCompBlock);
return 0;
}
rowsBefore = vnodeProcessLastBlock(pImport, pHinfo, data);
int vnodeOpenMinFilesForImport(int vnode, int fid) {
char dname[TSDB_FILENAME_LEN] = "\0";
SVnodeObj * pVnode = vnodeList + vnode;
struct stat filestat;
int minFileSize;
// copy all existing compBlockInfo
lseek(pVnode->hfd, pHinfo->compInfoOffset + sizeof(SCompInfo), SEEK_SET);
if (pHinfo->compInfo.numOfBlocks > 0)
tsendfile(pVnode->nfd, pVnode->hfd, NULL, pHinfo->compInfo.numOfBlocks * sizeof(SCompBlock));
minFileSize = TSDB_FILE_HEADER_LEN + sizeof(SCompHeader) * pVnode->cfg.maxSessions + sizeof(TSCKSUM);
} else if (pVnode->commitFileId == pImport->fileId) {
int slots = pImport->pos ? pImport->slot + 1 : pImport->slot;
pHinfo->leftOffset += slots * sizeof(SCompBlock);
vnodeGetHeadDataLname(pVnode->cfn, dname, pVnode->lfn, vnode, fid);
// check if last block is at last file, if it is, read into memory
if (pImport->pos == 0 && pHinfo->compInfo.numOfBlocks > 0 && pImport->slot == pHinfo->compInfo.numOfBlocks &&
pHinfo->compInfo.last) {
rowsBefore = vnodeProcessLastBlock(pImport, pHinfo, data);
if ( rowsBefore > 0 ) pImport->slot--;
}
// Open .head file
pVnode->hfd = open(pVnode->cfn, O_RDONLY);
if (pVnode->hfd < 0) {
dError("vid:%d, failed to open head file:%s, reason:%s", vnode, pVnode->cfn, strerror(errno));
taosLogError("vid:%d, failed to open head file:%s, reason:%s", vnode, pVnode->cfn, strerror(errno));
goto _error_open;
}
// this block will be replaced by new blocks
if (pImport->pos > 0) pHinfo->compInfo.numOfBlocks--;
fstat(pVnode->hfd, &filestat);
if (filestat.st_size < minFileSize) {
dError("vid:%d, head file:%s is corrupted", vnode, pVnode->cfn);
taosLogError("vid:%d, head file:%s corrupted", vnode, pVnode->cfn);
goto _error_open;
}
if (pImport->slot > 0) {
lseek(pVnode->hfd, pHinfo->compInfoOffset + sizeof(SCompInfo), SEEK_SET);
tsendfile(pVnode->nfd, pVnode->hfd, NULL, pImport->slot * sizeof(SCompBlock));
}
// Open .data file
pVnode->dfd = open(dname, O_RDWR);
if (pVnode->dfd < 0) {
dError("vid:%d, failed to open data file:%s, reason:%s", vnode, dname, strerror(errno));
taosLogError("vid:%d, failed to open data file:%s, reason:%s", vnode, dname, strerror(errno));
goto _error_open;
}
if (pImport->slot < pHinfo->compInfo.numOfBlocks)
pHinfo->last = 0; // new blocks are not at the end of block list
fstat(pVnode->dfd, &filestat);
if (filestat.st_size < TSDB_FILE_HEADER_LEN) {
dError("vid:%d, data file:%s corrupted", vnode, dname);
taosLogError("vid:%d, data file:%s corrupted", vnode, dname);
goto _error_open;
}
} else {
// nothing
// Open .last file
pVnode->lfd = open(pVnode->lfn, O_RDWR);
if (pVnode->lfd < 0) {
dError("vid:%d, failed to open last file:%s, reason:%s", vnode, pVnode->lfn, strerror(errno));
taosLogError("vid:%d, failed to open last file:%s, reason:%s", vnode, pVnode->lfn, strerror(errno));
goto _error_open;
}
pHinfo->last = 0; // new blocks are not at the end of block list
}
fstat(pVnode->lfd, &filestat);
if (filestat.st_size < TSDB_FILE_HEADER_LEN) {
dError("vid:%d, last file:%s corrupted", vnode, pVnode->lfn);
taosLogError("vid:%d, last file:%s corrupted", vnode, pVnode->lfn);
goto _error_open;
}
return rowsBefore;
}
return 0;
_error_open:
if (pVnode->hfd > 0) close(pVnode->hfd);
pVnode->hfd = 0;
extern int vnodeSendShellSubmitRspMsg(SShellObj *pObj, int code, int numOfPoints);
int vnodeImportToFile(SImportInfo *pImport);
if (pVnode->dfd > 0) close(pVnode->dfd);
pVnode->dfd = 0;
void vnodeProcessImportTimer(void *param, void *tmrId) {
SImportInfo *pImport = (SImportInfo *)param;
if (pImport == NULL || pImport->signature != param) {
dError("import timer is messed up, signature:%p", pImport);
return;
if (pVnode->lfd > 0) close(pVnode->lfd);
pVnode->lfd = 0;
return -1;
}
/* Function to open .t file and sendfile the first part
*/
int vnodeOpenTempFilesForImport(SImportHandle *pHandle, SMeterObj *pObj, int fid) {
char dHeadName[TSDB_FILENAME_LEN] = "\0";
SVnodeObj * pVnode = vnodeList + pObj->vnode;
struct stat filestat;
int sid;
// cfn: .head
if (readlink(pVnode->cfn, dHeadName, TSDB_FILENAME_LEN) < 0) return -1;
size_t len = strlen(dHeadName);
// switch head name
switch (dHeadName[len - 1]) {
case '0':
dHeadName[len - 1] = '1';
break;
case '1':
dHeadName[len - 1] = '0';
break;
default:
dError("vid: %d, fid: %d, head target filename not end with 0 or 1", pVnode->vnode, fid);
return -1;
}
SMeterObj *pObj = pImport->pObj;
SVnodeObj *pVnode = &vnodeList[pObj->vnode];
SCachePool *pPool = (SCachePool *)pVnode->pCachePool;
SShellObj *pShell = pImport->pShell;
vnodeGetHeadTname(pVnode->nfn, NULL, pVnode->vnode, fid);
if (symlink(dHeadName, pVnode->nfn) < 0) return -1;
pImport->retry++;
int32_t code = vnodeSetMeterInsertImportStateEx(pObj, TSDB_METER_STATE_IMPORTING);
if (code == TSDB_CODE_NOT_ACTIVE_TABLE) {
return;
pVnode->nfd = open(pVnode->nfn, O_RDWR | O_CREAT | O_TRUNC, S_IRWXU | S_IRWXG | S_IRWXO);
if (pVnode->nfd < 0) {
dError("vid:%d, failed to open new head file:%s, reason:%s", pVnode->vnode, pVnode->nfn, strerror(errno));
taosLogError("vid:%d, failed to open new head file:%s, reason:%s", pVnode->vnode, pVnode->nfn, strerror(errno));
return -1;
}
int32_t num = 0;
pthread_mutex_lock(&pVnode->vmutex);
num = pObj->numOfQueries;
pthread_mutex_unlock(&pVnode->vmutex);
fstat(pVnode->hfd, &filestat);
pHandle->hfSize = filestat.st_size;
//if the num == 0, it will never be increased before state is set to TSDB_METER_STATE_READY
int32_t commitInProcess = 0;
pthread_mutex_lock(&pPool->vmutex);
if (((commitInProcess = pPool->commitInProcess) == 1) || num > 0 || code == TSDB_CODE_ACTION_IN_PROGRESS) {
pthread_mutex_unlock(&pPool->vmutex);
vnodeClearMeterState(pObj, TSDB_METER_STATE_IMPORTING);
// Find the next sid whose compInfoOffset > 0
for (sid = pObj->sid + 1; sid < pVnode->cfg.maxSessions; sid++) {
if (pHandle->pHeader[sid].compInfoOffset > 0) break;
}
if (pImport->retry < 1000) {
dTrace("vid:%d sid:%d id:%s, import failed, retry later. commit in process or queries on it, or not ready."
"commitInProcess:%d, numOfQueries:%d, state:%d", pObj->vnode, pObj->sid, pObj->meterId,
commitInProcess, num, pObj->state);
pHandle->nextNo0Offset = (sid == pVnode->cfg.maxSessions) ? pHandle->hfSize : pHandle->pHeader[sid].compInfoOffset;
taosTmrStart(vnodeProcessImportTimer, 10, pImport, vnodeTmrCtrl);
return;
} else {
pShell->code = TSDB_CODE_TOO_SLOW;
}
// FIXME: sendfile the original part
// TODO: take the deleted-table case into consideration here; this function
// just assumes that case is handled before it is called
if (pHandle->pHeader[pObj->sid].compInfoOffset > 0) {
pHandle->compInfoOffset = pHandle->pHeader[pObj->sid].compInfoOffset;
} else {
pPool->commitInProcess = 1;
pthread_mutex_unlock(&pPool->vmutex);
int32_t ret = vnodeImportData(pObj, pImport);
if (pShell) {
pShell->code = ret;
pShell->numOfTotalPoints += pImport->importedRows;
}
pHandle->compInfoOffset = pHandle->nextNo0Offset;
}
vnodeClearMeterState(pObj, TSDB_METER_STATE_IMPORTING);
pVnode->version++;
assert(pHandle->compInfoOffset <= pHandle->hfSize);
// send response back to shell
if (pShell) {
pShell->count--;
if (pShell->count <= 0) vnodeSendShellSubmitRspMsg(pImport->pShell, pShell->code, pShell->numOfTotalPoints);
lseek(pVnode->hfd, 0, SEEK_SET);
lseek(pVnode->nfd, 0, SEEK_SET);
if (tsendfile(pVnode->nfd, pVnode->hfd, NULL, pHandle->compInfoOffset) < 0) {
return -1;
}
pImport->signature = NULL;
free(pImport->opayload);
free(pImport);
// Leave a SCompInfo space here
lseek(pVnode->nfd, sizeof(SCompInfo), SEEK_CUR);
return 0;
}
int vnodeImportToFile(SImportInfo *pImport) {
SMeterObj *pObj = pImport->pObj;
SVnodeObj *pVnode = &vnodeList[pObj->vnode];
SVnodeCfg *pCfg = &pVnode->cfg;
SHeadInfo headInfo;
int code = 0, col;
SCompBlock compBlock;
char * payload = pImport->payload;
int rows = pImport->rows;
SCachePool *pPool = (SCachePool *)pVnode->pCachePool;
TSKEY lastKey = *((TSKEY *)(payload + pObj->bytesPerPoint * (rows - 1)));
TSKEY firstKey = *((TSKEY *)payload);
memset(&headInfo, 0, sizeof(headInfo));
headInfo.headList = malloc(sizeof(SCompHeader) * pCfg->maxSessions + sizeof(TSCKSUM));
SData *cdata[TSDB_MAX_COLUMNS];
char *buffer1 =
malloc(pObj->bytesPerPoint * pCfg->rowsInFileBlock + (sizeof(SData) + EXTRA_BYTES + sizeof(TSCKSUM)) * pObj->numOfColumns);
cdata[0] = (SData *)buffer1;
typedef enum { DATA_LOAD_TIMESTAMP = 0x1, DATA_LOAD_OTHER_DATA = 0x2 } DataLoadMod;
SData *data[TSDB_MAX_COLUMNS];
char *buffer2 =
malloc(pObj->bytesPerPoint * pCfg->rowsInFileBlock + (sizeof(SData) + EXTRA_BYTES + sizeof(TSCKSUM)) * pObj->numOfColumns);
data[0] = (SData *)buffer2;
/* Function to load a block data at the requirement of mod
*/
static int vnodeLoadNeededBlockData(SMeterObj *pObj, SImportHandle *pHandle, int blockId, uint8_t loadMod, int *code) {
size_t size;
SCompBlock *pBlock = pHandle->pBlocks + blockId;
*code = TSDB_CODE_SUCCESS;
for (col = 1; col < pObj->numOfColumns; ++col) {
cdata[col] = (SData *)(((char *)cdata[col - 1]) + sizeof(SData) + EXTRA_BYTES +
pObj->pointsPerFileBlock * pObj->schema[col - 1].bytes + sizeof(TSCKSUM));
data[col] = (SData *)(((char *)data[col - 1]) + sizeof(SData) + EXTRA_BYTES +
pObj->pointsPerFileBlock * pObj->schema[col - 1].bytes + sizeof(TSCKSUM));
}
assert(pBlock->sversion == pObj->sversion);
int rowsBefore = 0;
int rowsRead = 0;
int rowsUnread = 0;
int leftRows = rows; // left number of rows of imported data
int row, rowsToWrite;
int64_t offset[TSDB_MAX_COLUMNS];
SVnodeObj *pVnode = vnodeList + pObj->vnode;
if (pImport->pos > 0) {
for (col = 0; col < pObj->numOfColumns; ++col)
memcpy(data[col]->data, pImport->sdata[col]->data, pImport->pos * pObj->schema[col].bytes);
int dfd = pBlock->last ? pVnode->lfd : pVnode->dfd;
rowsBefore = pImport->pos;
rowsRead = pImport->pos;
rowsUnread = pImport->numOfPoints - pImport->pos;
if (pHandle->blockId != blockId) {
pHandle->blockId = blockId;
pHandle->blockLoadState = 0;
}
dTrace("vid:%d sid:%d id:%s, %d rows data will be imported to file, firstKey:%ld lastKey:%ld",
pObj->vnode, pObj->sid, pObj->meterId, rows, firstKey, lastKey);
do {
if (leftRows > 0) {
code = vnodeOpenFileForImport(pImport, payload, &headInfo, data);
if (code < 0) goto _exit;
if (code > 0) {
rowsBefore = code;
code = 0;
};
} else {
// if payload is already imported, rows unread shall still be processed
rowsBefore = 0;
if (pHandle->blockLoadState == 0){ // Reload pField
size = sizeof(SField) * pBlock->numOfCols + sizeof(TSCKSUM);
if (pHandle->pFieldSize < size) {
pHandle->pField = (SField *)realloc((void *)(pHandle->pField), size);
if (pHandle->pField == NULL) {
dError("vid: %d, sid: %d, meterId: %s, failed to allocate memory, size: %ul", pObj->vnode, pObj->sid,
pObj->meterId, size);
*code = TSDB_CODE_SERV_OUT_OF_MEMORY;
return -1;
}
pHandle->pFieldSize = size;
}
int rowsToProcess = pObj->pointsPerFileBlock - rowsBefore;
if (rowsToProcess > leftRows) rowsToProcess = leftRows;
lseek(dfd, pBlock->offset, SEEK_SET);
if (read(dfd, (void *)(pHandle->pField), pHandle->pFieldSize) < 0) {
dError("vid:%d sid:%d meterId:%s, failed to read data file, size:%ld reason:%s", pVnode->vnode, pObj->sid,
pObj->meterId, pHandle->pFieldSize, strerror(errno));
*code = TSDB_CODE_FILE_CORRUPTED;
return -1;
}
for (col = 0; col < pObj->numOfColumns; ++col) {
offset[col] = data[col]->data + rowsBefore * pObj->schema[col].bytes;
if (!taosCheckChecksumWhole((uint8_t *)(pHandle->pField), pHandle->pFieldSize)) {
dError("vid:%d sid:%d meterId:%s, data file %s is broken since checksum mismatch", pVnode->vnode, pObj->sid,
pObj->meterId, pVnode->lfn);
*code = TSDB_CODE_FILE_CORRUPTED;
return -1;
}
}
row = 0;
if (leftRows > 0) {
for (row = 0; row < rowsToProcess; ++row) {
if (*((TSKEY *)payload) > pVnode->commitLastKey) break;
{ // Allocate necessary buffer
size = pObj->bytesPerPoint * pObj->pointsPerFileBlock +
(sizeof(SData) + EXTRA_BYTES + sizeof(TSCKSUM)) * pObj->numOfColumns;
if (pHandle->buffer == NULL) {
pHandle->buffer = malloc(size);
if (pHandle->buffer == NULL) {
dError("vid: %d, sid: %d, meterId: %s, failed to allocate memory, size: %ul", pObj->vnode, pObj->sid,
pObj->meterId, size);
*code = TSDB_CODE_SERV_OUT_OF_MEMORY;
return -1;
}
for (col = 0; col < pObj->numOfColumns; ++col) {
memcpy((void *)offset[col], payload, pObj->schema[col].bytes);
payload += pObj->schema[col].bytes;
offset[col] += pObj->schema[col].bytes;
}
// TODO: Init data
pHandle->data[0] = (SData *)(pHandle->buffer);
for (int col = 1; col < pObj->numOfColumns; col++) {
pHandle->data[col] = (SData *)((char *)(pHandle->data[col - 1]) + sizeof(SData) + EXTRA_BYTES +
sizeof(TSCKSUM) + pObj->pointsPerFileBlock * pObj->schema[col - 1].bytes);
}
}
leftRows -= row;
rowsToWrite = rowsBefore + row;
rowsBefore = 0;
if (leftRows == 0 && rowsUnread > 0) {
// copy the unread
int rowsToCopy = pObj->pointsPerFileBlock - rowsToWrite;
if (rowsToCopy > rowsUnread) rowsToCopy = rowsUnread;
for (col = 0; col < pObj->numOfColumns; ++col) {
int bytes = pObj->schema[col].bytes;
memcpy(data[col]->data + rowsToWrite * bytes, pImport->sdata[col]->data + rowsRead * bytes, rowsToCopy * bytes);
if (pHandle->temp == NULL) {
pHandle->temp = malloc(size);
if (pHandle->temp == NULL) {
dError("vid: %d, sid: %d, meterId: %s, failed to allocate memory, size: %ul", pObj->vnode, pObj->sid,
pObj->meterId, size);
*code = TSDB_CODE_SERV_OUT_OF_MEMORY;
return -1;
}
rowsRead += rowsToCopy;
rowsUnread -= rowsToCopy;
rowsToWrite += rowsToCopy;
}
for (col = 0; col < pObj->numOfColumns; ++col) {
data[col]->len = rowsToWrite * pObj->schema[col].bytes;
if (pHandle->tempBuffer == NULL) {
pHandle->tempBufferSize = pObj->maxBytes * pObj->pointsPerFileBlock + EXTRA_BYTES + sizeof(TSCKSUM);
pHandle->tempBuffer = malloc(pHandle->tempBufferSize);
if (pHandle->tempBuffer == NULL) {
dError("vid: %d, sid: %d, meterId: %s, failed to allocate memory, size: %ul", pObj->vnode, pObj->sid,
pObj->meterId, pHandle->tempBufferSize);
*code = TSDB_CODE_SERV_OUT_OF_MEMORY;
return -1;
}
}
}
compBlock.last = headInfo.last;
vnodeWriteBlockToFile(pObj, &compBlock, data, cdata, rowsToWrite);
twrite(pVnode->nfd, &compBlock, sizeof(SCompBlock));
rowsToWrite = 0;
headInfo.newBlocks++;
} while (leftRows > 0 || rowsUnread > 0);
if ((loadMod & DATA_LOAD_TIMESTAMP) &&
(~(pHandle->blockLoadState & DATA_LOAD_TIMESTAMP))) { // load only timestamp part
if (vnodeReadColumnToMem(dfd, pBlock, &(pHandle->pField), PRIMARYKEY_TIMESTAMP_COL_INDEX,
pHandle->data[PRIMARYKEY_TIMESTAMP_COL_INDEX]->data, sizeof(TSKEY) * pBlock->numOfPoints,
pHandle->temp, pHandle->tempBuffer, pHandle->tempBufferSize) < 0) {
*code = TSDB_CODE_FILE_CORRUPTED;
return -1;
}
if (compBlock.keyLast > pObj->lastKeyOnFile)
pObj->lastKeyOnFile = compBlock.keyLast;
pHandle->blockLoadState |= DATA_LOAD_TIMESTAMP;
}
vnodeCloseFileForImport(pObj, &headInfo);
dTrace("vid:%d sid:%d id:%s, %d rows data are imported to file", pObj->vnode, pObj->sid, pObj->meterId, rows);
if ((loadMod & DATA_LOAD_OTHER_DATA) && (~(pHandle->blockLoadState & DATA_LOAD_OTHER_DATA))) { // load other columns
for (int col = 1; col < pBlock->numOfCols; col++) {
if (vnodeReadColumnToMem(dfd, pBlock, &(pHandle->pField), col, pHandle->data[col]->data,
pBlock->numOfPoints * pObj->schema[col].bytes, pHandle->temp, pHandle->tempBuffer,
pHandle->tempBufferSize) < 0) {
*code = TSDB_CODE_FILE_CORRUPTED;
return -1;
}
}
SCacheInfo *pInfo = (SCacheInfo *)pObj->pCache;
pthread_mutex_lock(&pPool->vmutex);
pHandle->blockLoadState |= DATA_LOAD_OTHER_DATA;
}
if (pInfo->numOfBlocks > 0) {
int slot = (pInfo->currentSlot - pInfo->numOfBlocks + 1 + pInfo->maxBlocks) % pInfo->maxBlocks;
TSKEY firstKeyInCache = *((TSKEY *)(pInfo->cacheBlocks[slot]->offset[0]));
return 0;
}
// data may be in committed cache, cache shall be released
if (lastKey > firstKeyInCache) {
while (slot != pInfo->commitSlot) {
SCacheBlock *pCacheBlock = pInfo->cacheBlocks[slot];
vnodeFreeCacheBlock(pCacheBlock);
slot = (slot + 1 + pInfo->maxBlocks) % pInfo->maxBlocks;
static int vnodeCloseImportFiles(SMeterObj *pObj, SImportHandle *pHandle) {
SVnodeObj *pVnode = vnodeList + pObj->vnode;
char dpath[TSDB_FILENAME_LEN] = "\0";
SCompInfo compInfo;
__off_t offset = 0;
if (pVnode->nfd > 0) {
offset = lseek(pVnode->nfd, 0, SEEK_CUR);
assert(offset == pHandle->nextNo0Offset + pHandle->driftOffset);
{ // Write the SCompInfo part
compInfo.uid = pObj->uid;
compInfo.last = pHandle->last;
compInfo.numOfBlocks = pHandle->newNumOfBlocks + pHandle->oldNumOfBlocks;
compInfo.delimiter = TSDB_VNODE_DELIMITER;
taosCalcChecksumAppend(0, (uint8_t *)(&compInfo), sizeof(SCompInfo));
lseek(pVnode->nfd, pHandle->compInfoOffset, SEEK_SET);
if (twrite(pVnode->nfd, (void *)(&compInfo), sizeof(SCompInfo)) < 0) {
dError("vid:%d sid:%d meterId:%s, failed to wirte SCompInfo, reason:%s", pObj->vnode, pObj->sid, pObj->meterId,
strerror(errno));
return -1;
}
}
// last slot, the uncommitted slots shall be shifted, a cache block may have empty rows
SCacheBlock *pCacheBlock = pInfo->cacheBlocks[slot];
int points = pCacheBlock->numOfPoints - pInfo->commitPoint;
if (points > 0) {
for (int col = 0; col < pObj->numOfColumns; ++col) {
int size = points * pObj->schema[col].bytes;
memmove(pCacheBlock->offset[col], pCacheBlock->offset[col] + pObj->schema[col].bytes * pInfo->commitPoint, size);
}
// Write the rest of the SCompBlock part
if (pHandle->hfSize > pHandle->nextNo0Offset) {
lseek(pVnode->nfd, 0, SEEK_END);
lseek(pVnode->hfd, pHandle->nextNo0Offset, SEEK_SET);
if (tsendfile(pVnode->nfd, pVnode->hfd, NULL, pHandle->hfSize - pHandle->nextNo0Offset) < 0) {
dError("vid:%d sid:%d meterId:%s, failed to sendfile, size:%ld, reason:%s", pObj->vnode, pObj->sid,
pObj->meterId, pHandle->hfSize - pHandle->nextNo0Offset, strerror(errno));
return -1;
}
}
if (pInfo->commitPoint != pObj->pointsPerBlock) {
// commit point shall be set to 0 if last block is not full
pInfo->commitPoint = 0;
pCacheBlock->numOfPoints = points;
if (slot == pInfo->currentSlot) {
atomic_fetch_add_32(&pObj->freePoints, pInfo->commitPoint);
}
} else {
// if last block is full and committed
SCacheBlock *pCacheBlock = pInfo->cacheBlocks[slot];
if (pCacheBlock->pMeterObj == pObj) {
vnodeFreeCacheBlock(pCacheBlock);
}
// Write SCompHeader part
pHandle->pHeader[pObj->sid].compInfoOffset = pHandle->compInfoOffset;
for (int sid = pObj->sid + 1; sid < pVnode->cfg.maxSessions; ++sid) {
if (pHandle->pHeader[sid].compInfoOffset > 0) {
pHandle->pHeader[sid].compInfoOffset += pHandle->driftOffset;
}
}
taosCalcChecksumAppend(0, (uint8_t *)(pHandle->pHeader), pHandle->pHeaderSize);
lseek(pVnode->nfd, TSDB_FILE_HEADER_LEN, SEEK_SET);
if (twrite(pVnode->nfd, (void *)(pHandle->pHeader), pHandle->pHeaderSize) < 0) {
dError("vid:%d sid:%d meterId:%s, failed to wirte SCompHeader part, size:%ld, reason:%s", pObj->vnode, pObj->sid,
pObj->meterId, pHandle->pHeaderSize, strerror(errno));
return -1;
}
}
if (lastKey > pObj->lastKeyOnFile) pObj->lastKeyOnFile = lastKey;
// Close opened files
close(pVnode->dfd);
pVnode->dfd = 0;
pthread_mutex_unlock(&pPool->vmutex);
close(pVnode->hfd);
pVnode->hfd = 0;
_exit:
tfree(headInfo.headList);
tfree(buffer1);
tfree(buffer2);
tfree(pImport->buffer);
close(pVnode->lfd);
pVnode->lfd = 0;
return code;
if (pVnode->nfd > 0) {
close(pVnode->nfd);
pVnode->nfd = 0;
readlink(pVnode->cfn, dpath, TSDB_FILENAME_LEN);
rename(pVnode->nfn, pVnode->cfn);
remove(dpath);
}
return 0;
}
int vnodeImportToCache(SImportInfo *pImport, char *payload, int rows) {
SMeterObj *pObj = pImport->pObj;
SVnodeObj *pVnode = &vnodeList[pObj->vnode];
SVnodeCfg *pCfg = &pVnode->cfg;
int code = -1;
SCacheInfo *pInfo = (SCacheInfo *)pObj->pCache;
int slot, pos, row, col, points, tpoints;
static void vnodeConvertRowsToCols(SMeterObj *pObj, const char *payload, int rows, SData *data[], int rowOffset) {
int sdataRow;
int offset;
char *data[TSDB_MAX_COLUMNS], *current[TSDB_MAX_COLUMNS];
int slots = pInfo->unCommittedBlocks + 1;
int trows = slots * pObj->pointsPerBlock + rows; // max rows in buffer
int tsize = (trows / pObj->pointsPerBlock + 1) * pCfg->cacheBlockSize;
TSKEY firstKey = *((TSKEY *)payload);
TSKEY lastKey = *((TSKEY *)(payload + pObj->bytesPerPoint * (rows - 1)));
for (int row = 0; row < rows; ++row) {
sdataRow = row + rowOffset;
offset = 0;
for (int col = 0; col < pObj->numOfColumns; ++col) {
memcpy(data[col]->data + sdataRow * pObj->schema[col].bytes, payload + pObj->bytesPerPoint * row + offset,
pObj->schema[col].bytes);
if (pObj->freePoints < rows || pObj->freePoints < (pObj->pointsPerBlock << 1)) {
dError("vid:%d sid:%d id:%s, import failed, cache is full, freePoints:%d", pObj->vnode, pObj->sid, pObj->meterId,
pObj->freePoints);
pImport->importedRows = 0;
pImport->commit = 1;
code = TSDB_CODE_ACTION_IN_PROGRESS;
return code;
offset += pObj->schema[col].bytes;
}
}
}
assert(rows);
dTrace("vid:%d sid:%d id:%s, %d rows data will be imported to cache, firstKey:%ld lastKey:%ld",
pObj->vnode, pObj->sid, pObj->meterId, rows, firstKey, lastKey);
static int vnodeMergeDataIntoFile(SImportInfo *pImport, const char *payload, int rows, int fid) {
SMeterObj * pObj = (SMeterObj *)(pImport->pObj);
SVnodeObj * pVnode = vnodeList + pObj->vnode;
SImportHandle importHandle;
size_t size = 0;
SData * data[TSDB_MAX_COLUMNS];
char * buffer = NULL;
SData * cdata[TSDB_MAX_COLUMNS];
char * cbuffer = NULL;
SCompBlock compBlock;
TSCKSUM checksum = 0;
int pointsImported = 0;
int code = TSDB_CODE_SUCCESS;
SCachePool * pPool = (SCachePool *)pVnode->pCachePool;
SCacheInfo * pInfo = (SCacheInfo *)(pObj->pCache);
TSKEY lastKeyImported = 0;
TSKEY delta = pVnode->cfg.daysPerFile * tsMsPerDay[pVnode->cfg.precision];
TSKEY minFileKey = fid * delta;
TSKEY maxFileKey = minFileKey + delta - 1;
TSKEY firstKey = KEY_AT_INDEX(payload, pObj->bytesPerPoint, 0);
TSKEY lastKey = KEY_AT_INDEX(payload, pObj->bytesPerPoint, rows - 1);
assert(firstKey >= minFileKey && firstKey <= maxFileKey && lastKey >= minFileKey && lastKey <= maxFileKey);
// create neccessary files
pVnode->commitFirstKey = firstKey;
if (vnodeCreateNeccessaryFiles(pVnode) < 0) return TSDB_CODE_OTHERS;
assert(pVnode->commitFileId == fid);
// Open least files to import .head(hfd) .data(dfd) .last(lfd)
if (vnodeOpenMinFilesForImport(pObj->vnode, fid) < 0) return TSDB_CODE_FILE_CORRUPTED;
memset(&importHandle, 0, sizeof(SImportHandle));
{ // Load SCompHeader part from .head file
importHandle.pHeaderSize = sizeof(SCompHeader) * pVnode->cfg.maxSessions + sizeof(TSCKSUM);
importHandle.pHeader = (SCompHeader *)malloc(importHandle.pHeaderSize);
if (importHandle.pHeader == NULL) {
dError("vid: %d, sid: %d, meterId: %s, failed to allocate memory, size: %ul", pObj->vnode, pObj->sid,
pObj->meterId, importHandle.pHeaderSize);
code = TSDB_CODE_SERV_OUT_OF_MEMORY;
goto _error_merge;
}
pthread_mutex_lock(&(pVnode->vmutex));
if (firstKey < pVnode->firstKey) pVnode->firstKey = firstKey;
pthread_mutex_unlock(&(pVnode->vmutex));
lseek(pVnode->hfd, TSDB_FILE_HEADER_LEN, SEEK_SET);
if (read(pVnode->hfd, (void *)(importHandle.pHeader), importHandle.pHeaderSize) < importHandle.pHeaderSize) {
dError("vid: %d, sid: %d, meterId: %s, fid: %d failed to read SCompHeader part, reason:%s", pObj->vnode,
pObj->sid, pObj->meterId, fid, strerror(errno));
code = TSDB_CODE_FILE_CORRUPTED;
goto _error_merge;
}
char *buffer = malloc(tsize); // buffer to hold unCommitted data plus import data
data[0] = buffer;
current[0] = data[0];
for (col = 1; col < pObj->numOfColumns; ++col) {
data[col] = data[col - 1] + trows * pObj->schema[col - 1].bytes;
current[col] = data[col];
if (!taosCheckChecksumWhole((uint8_t *)(importHandle.pHeader), importHandle.pHeaderSize)) {
dError("vid: %d, sid: %d, meterId: %s, fid: %d SCompHeader part is broken", pObj->vnode, pObj->sid, pObj->meterId,
fid);
code = TSDB_CODE_FILE_CORRUPTED;
goto _error_merge;
}
}
// write import data into buffer first
for (row = 0; row < rows; ++row) {
for (col = 0; col < pObj->numOfColumns; ++col) {
memcpy(current[col], payload, pObj->schema[col].bytes);
payload += pObj->schema[col].bytes;
current[col] += pObj->schema[col].bytes;
{ // Initialize data[] and cdata[], which is used to hold data to write to data file
size = pObj->bytesPerPoint * pVnode->cfg.rowsInFileBlock + (sizeof(SData) + EXTRA_BYTES + sizeof(TSCKSUM)) * pObj->numOfColumns;
buffer = (char *)malloc(size);
if (buffer == NULL) {
dError("vid: %d, sid: %d, meterId: %s, failed to allocate memory, size: %ul", pObj->vnode, pObj->sid,
pObj->meterId, size);
code = TSDB_CODE_SERV_OUT_OF_MEMORY;
goto _error_merge;
}
}
// copy the overwritten data into buffer, merge cache blocks
tpoints = rows;
pos = pImport->pos;
slot = pImport->slot;
while (1) {
points = pInfo->cacheBlocks[slot]->numOfPoints - pos;
for (col = 0; col < pObj->numOfColumns; ++col) {
int size = points * pObj->schema[col].bytes;
memcpy(current[col], pInfo->cacheBlocks[slot]->offset[col] + pos * pObj->schema[col].bytes, size);
current[col] += size;
cbuffer = (char *)malloc(size);
if (cbuffer == NULL) {
dError("vid: %d, sid: %d, meterId: %s, failed to allocate memory, size: %ul", pObj->vnode, pObj->sid,
pObj->meterId, size);
code = TSDB_CODE_SERV_OUT_OF_MEMORY;
goto _error_merge;
}
pos = 0;
tpoints += points;
if (slot == pInfo->currentSlot) break;
slot = (slot + 1) % pInfo->maxBlocks;
}
data[0] = (SData *)buffer;
cdata[0] = (SData *)cbuffer;
for (col = 0; col < pObj->numOfColumns; ++col) current[col] = data[col];
pos = pImport->pos;
for (int col = 1; col < pObj->numOfColumns; col++) {
data[col] = (SData *)((char *)data[col - 1] + sizeof(SData) + EXTRA_BYTES + sizeof(TSCKSUM) +
pObj->pointsPerFileBlock * pObj->schema[col - 1].bytes);
cdata[col] = (SData *)((char *)cdata[col - 1] + sizeof(SData) + EXTRA_BYTES + sizeof(TSCKSUM) +
pObj->pointsPerFileBlock * pObj->schema[col - 1].bytes);
}
}
// write back to existing slots first
slot = pImport->slot;
while (1) {
points = (tpoints > pObj->pointsPerBlock - pos) ? pObj->pointsPerBlock - pos : tpoints;
SCacheBlock *pCacheBlock = pInfo->cacheBlocks[slot];
for (col = 0; col < pObj->numOfColumns; ++col) {
int size = points * pObj->schema[col].bytes;
memcpy(pCacheBlock->offset[col] + pos * pObj->schema[col].bytes, current[col], size);
current[col] += size;
if (importHandle.pHeader[pObj->sid].compInfoOffset == 0) { // No data in this file, just write it
_write_empty_point:
if (vnodeOpenTempFilesForImport(&importHandle, pObj, fid) < 0) {
code = TSDB_CODE_OTHERS;
goto _error_merge;
}
pCacheBlock->numOfPoints = points + pos;
pos = 0;
tpoints -= points;
if (tpoints == 0) {
// free the rest of cache blocks, since cache blocks are merged
int currentSlot = slot;
while (slot != pInfo->currentSlot) {
slot = (slot + 1) % pInfo->maxBlocks;
pCacheBlock = pInfo->cacheBlocks[slot];
vnodeFreeCacheBlock(pCacheBlock);
importHandle.oldNumOfBlocks = 0;
importHandle.driftOffset += sizeof(SCompInfo);
lastKeyImported = lastKey;
for (int rowsWritten = 0; rowsWritten < rows;) {
int rowsToWrite = MIN(pVnode->cfg.rowsInFileBlock, (rows - rowsWritten) /* the rows left */);
vnodeConvertRowsToCols(pObj, payload + rowsWritten * pObj->bytesPerPoint, rowsToWrite, data, 0);
pointsImported += rowsToWrite;
compBlock.last = 1;
if (vnodeWriteBlockToFile(pObj, &compBlock, data, cdata, rowsToWrite) < 0) {
// TODO: deal with ERROR here
}
pInfo->currentSlot = currentSlot;
slot = currentSlot; // make sure to exit from the while loop
importHandle.last = compBlock.last;
checksum = taosCalcChecksum(checksum, (uint8_t *)(&compBlock), sizeof(SCompBlock));
twrite(pVnode->nfd, &compBlock, sizeof(SCompBlock));
importHandle.newNumOfBlocks++;
importHandle.driftOffset += sizeof(SCompBlock);
rowsWritten += rowsToWrite;
}
twrite(pVnode->nfd, &checksum, sizeof(TSCKSUM));
importHandle.driftOffset += sizeof(TSCKSUM);
} else { // Else if there are old data in this file.
{ // load SCompInfo and SCompBlock part
lseek(pVnode->hfd, importHandle.pHeader[pObj->sid].compInfoOffset, SEEK_SET);
if (read(pVnode->hfd, (void *)(&(importHandle.compInfo)), sizeof(SCompInfo)) < sizeof(SCompInfo)) {
dError("vid:%d sid:%d meterId:%s, failed to read .head file, reason:%s", pVnode->vnode, pObj->sid,
pObj->meterId, strerror(errno));
code = TSDB_CODE_FILE_CORRUPTED;
goto _error_merge;
}
if (slot == pInfo->currentSlot) break;
slot = (slot + 1) % pInfo->maxBlocks;
}
if ((importHandle.compInfo.delimiter != TSDB_VNODE_DELIMITER) ||
(!taosCheckChecksumWhole((uint8_t *)(&(importHandle.compInfo)), sizeof(SCompInfo)))) {
dError("vid:%d sid:%d meterId:%s, .head file %s is broken, delemeter:%x", pVnode->vnode, pObj->sid,
pObj->meterId, pVnode->cfn, importHandle.compInfo.delimiter);
code = TSDB_CODE_FILE_CORRUPTED;
goto _error_merge;
}
// allocate new cache block if there are still data left
while (tpoints > 0) {
pImport->commit = vnodeAllocateCacheBlock(pObj);
if (pImport->commit < 0) goto _exit;
points = (tpoints > pObj->pointsPerBlock) ? pObj->pointsPerBlock : tpoints;
SCacheBlock *pCacheBlock = pInfo->cacheBlocks[pInfo->currentSlot];
for (col = 0; col < pObj->numOfColumns; ++col) {
int size = points * pObj->schema[col].bytes;
memcpy(pCacheBlock->offset[col] + pos * pObj->schema[col].bytes, current[col], size);
current[col] += size;
// Check the context of SCompInfo part
if (importHandle.compInfo.uid != pObj->uid) { // The data belongs to the other meter
goto _write_empty_point;
}
importHandle.oldNumOfBlocks = importHandle.compInfo.numOfBlocks;
importHandle.last = importHandle.compInfo.last;
size = sizeof(SCompBlock) * importHandle.compInfo.numOfBlocks + sizeof(TSCKSUM);
importHandle.pBlocks = (SCompBlock *)malloc(size);
if (importHandle.pBlocks == NULL) {
dError("vid:%d sid:%d meterId:%s, failed to allocate importHandle.pBlock, size:%ul", pVnode->vnode, pObj->sid,
pObj->meterId, size);
code = TSDB_CODE_SERV_OUT_OF_MEMORY;
goto _error_merge;
}
if (read(pVnode->hfd, (void *)(importHandle.pBlocks), size) < size) {
dError("vid:%d sid:%d meterId:%s, failed to read importHandle.pBlock, reason:%s", pVnode->vnode, pObj->sid,
pObj->meterId, strerror(errno));
code = TSDB_CODE_FILE_CORRUPTED;
goto _error_merge;
}
if (!taosCheckChecksumWhole((uint8_t *)(importHandle.pBlocks), size)) {
dError("vid:%d sid:%d meterId:%s, pBlock part is broken in %s", pVnode->vnode, pObj->sid, pObj->meterId,
pVnode->cfn);
code = TSDB_CODE_FILE_CORRUPTED;
goto _error_merge;
}
}
tpoints -= points;
pCacheBlock->numOfPoints = points;
}
code = 0;
atomic_fetch_sub_32(&pObj->freePoints, rows);
dTrace("vid:%d sid:%d id:%s, %d rows data are imported to cache", pObj->vnode, pObj->sid, pObj->meterId, rows);
/* Now we have _payload_, we have _importHandle.pBlocks_, just merge payload into the importHandle.pBlocks
*
* Input: payload, pObj->bytesPerPoint, rows, importHandle.pBlocks
*/
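// Merge sketch (following the cases handled below): payloadIter walks the new rows while blockIter walks
// the existing SCompBlock list; each payload key is either written ahead of all blocks, appended after the
// last block, or merged into the block that covers it, and duplicate keys are simply skipped.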
{
int payloadIter = 0;
SBlockIter blockIter = {0, 0, 0, 0};
while (1) {
if (payloadIter >= rows) { // payload end, break
// write the remaining blocks to the file
if (pVnode->nfd > 0) {
int blocksLeft = importHandle.compInfo.numOfBlocks - blockIter.oslot;
if (blocksLeft > 0) {
checksum = taosCalcChecksum(checksum, (uint8_t *)(importHandle.pBlocks + blockIter.oslot),
sizeof(SCompBlock) * blocksLeft);
if (twrite(pVnode->nfd, (void *)(importHandle.pBlocks + blockIter.oslot),
sizeof(SCompBlock) * blocksLeft) < 0) {
dError("vid:%d sid:%d meterId:%s, failed to write %s file, size:%ul, reason:%s", pVnode->vnode,
pObj->sid, pObj->meterId, pVnode->nfn, sizeof(SCompBlock) * blocksLeft, strerror(errno));
code = TSDB_CODE_OTHERS;
goto _error_merge;
}
}
if (twrite(pVnode->nfd, (void *)(&checksum), sizeof(TSCKSUM)) < 0) {
dError("vid:%d sid:%d meterId:%s, failed to write %s file, size:%ul, reason:%s", pVnode->vnode, pObj->sid,
pObj->meterId, pVnode->nfn, sizeof(TSCKSUM), strerror(errno));
code = TSDB_CODE_OTHERS;
goto _error_merge;
}
}
break;
}
_exit:
free(buffer);
return code;
}
if (blockIter.slot >= importHandle.compInfo.numOfBlocks) { // blocks end, break
// Should never come here
assert(false);
}
int vnodeFindKeyInFile(SImportInfo *pImport, int order) {
SMeterObj *pObj = pImport->pObj;
SVnodeObj *pVnode = &vnodeList[pObj->vnode];
int code = -1;
SQuery query;
SColumnInfoEx colList[TSDB_MAX_COLUMNS] = {0};
TSKEY key = KEY_AT_INDEX(payload, pObj->bytesPerPoint, payloadIter);
{ // Binary search the (slot, pos) which is >= key as well as nextKey
int left = blockIter.slot;
int right = importHandle.compInfo.numOfBlocks - 1;
TSKEY minKey = importHandle.pBlocks[left].keyFirst;
TSKEY maxKey = importHandle.pBlocks[right].keyLast;
assert(minKey <= maxKey);
if (key < minKey) { // Case 1. write just ahead the blockIter.slot
blockIter.slot = left;
blockIter.pos = 0;
blockIter.nextKey = minKey;
} else if (key > maxKey) { // Case 2. write to the end
if (importHandle.pBlocks[right].last) { // Case 2.1 last block in .last file, need to merge
assert(importHandle.last != 0);
importHandle.last = 0;
blockIter.slot = right;
blockIter.pos = importHandle.pBlocks[right].numOfPoints;
} else { // Case 2.2 just write after the last block
blockIter.slot = right + 1;
blockIter.pos = 0;
}
blockIter.nextKey = maxFileKey + 1;
} else { // Case 3. need to search the block for slot and pos
if (key == minKey || key == maxKey) {
payloadIter++;
continue;
}
// Here: minKey < key < maxKey
int mid;
TSKEY blockMinKey;
TSKEY blockMaxKey;
// Binary search the slot
do {
mid = (left + right) / 2;
blockMinKey = importHandle.pBlocks[mid].keyFirst;
blockMaxKey = importHandle.pBlocks[mid].keyLast;
assert(blockMinKey <= blockMaxKey);
if (key < blockMinKey) {
right = mid;
} else if (key > blockMaxKey) {
left = mid + 1;
} else { /* blockMinKey <= key <= blockMaxKey */
break;
}
} while (left < right);
if (key == blockMinKey || key == blockMaxKey) { // duplicate key
payloadIter++;
continue;
}
// Get the slot
if (key > blockMaxKey) { /* pos = 0 or pos = ? */
blockIter.slot = mid + 1;
} else { /* key < blockMinKey (pos = 0) || (key > blockMinKey && key < blockMaxKey) (pos=?) */
blockIter.slot = mid;
}
// Get the pos
assert(blockIter.slot < importHandle.compInfo.numOfBlocks);
if (key == importHandle.pBlocks[blockIter.slot].keyFirst ||
key == importHandle.pBlocks[blockIter.slot].keyLast) {
payloadIter++;
continue;
}
assert(key < importHandle.pBlocks[blockIter.slot].keyLast);
/* */
if (key < importHandle.pBlocks[blockIter.slot].keyFirst) {
blockIter.pos = 0;
blockIter.nextKey = importHandle.pBlocks[blockIter.slot].keyFirst;
} else {
SCompBlock *pBlock = importHandle.pBlocks + blockIter.slot;
if (pBlock->sversion != pObj->sversion) { /*TODO*/
}
if (vnodeLoadNeededBlockData(pObj, &importHandle, blockIter.slot, DATA_LOAD_TIMESTAMP, &code) < 0) {
goto _error_merge;
}
int pos = (*vnodeSearchKeyFunc[pObj->searchAlgorithm])(
importHandle.data[PRIMARYKEY_TIMESTAMP_COL_INDEX]->data, pBlock->numOfPoints, key, TSQL_SO_ASC);
assert(pos != 0);
if (KEY_AT_INDEX(importHandle.data[PRIMARYKEY_TIMESTAMP_COL_INDEX]->data, sizeof(TSKEY), pos) == key) {
payloadIter++;
continue;
}
blockIter.pos = pos;
blockIter.nextKey = (blockIter.slot + 1 < importHandle.compInfo.numOfBlocks)
? importHandle.pBlocks[blockIter.slot + 1].keyFirst
: maxFileKey + 1;
// Need to merge with this block
if (importHandle.pBlocks[blockIter.slot].last) { // this is to merge with the last block
assert((blockIter.slot == (importHandle.compInfo.numOfBlocks - 1)));
importHandle.last = 0;
}
}
}
}
TSKEY key = order ? pImport->firstKey : pImport->lastKey;
memset(&query, 0, sizeof(query));
query.order.order = order;
query.skey = key;
query.ekey = order ? INT64_MAX : 0;
query.colList = colList;
query.numOfCols = pObj->numOfColumns;
for (int16_t i = 0; i < pObj->numOfColumns; ++i) {
colList[i].data.colId = pObj->schema[i].colId;
colList[i].data.bytes = pObj->schema[i].bytes;
colList[i].data.type = pObj->schema[i].type;
colList[i].colIdx = i;
colList[i].colIdxInBuf = i;
}
int ret = vnodeSearchPointInFile(pObj, &query);
if (ret >= 0) {
if (query.slot < 0) {
pImport->slot = 0;
pImport->pos = 0;
pImport->key = 0;
pImport->fileId = pVnode->fileId - pVnode->numOfFiles + 1;
dTrace("vid:%d sid:%d id:%s, import to head of file", pObj->vnode, pObj->sid, pObj->meterId);
code = 0;
} else if (query.slot >= 0) {
code = 0;
pImport->slot = query.slot;
pImport->pos = query.pos;
pImport->key = query.key;
pImport->fileId = query.fileId;
SCompBlock *pBlock = &query.pBlock[query.slot];
pImport->numOfPoints = pBlock->numOfPoints;
if (pImport->key != key) {
if (order == 0) {
pImport->pos++;
if (pImport->pos >= pBlock->numOfPoints) {
pImport->slot++;
pImport->pos = 0;
// Open the new .t file if not opened yet.
if (pVnode->nfd <= 0) {
if (vnodeOpenTempFilesForImport(&importHandle, pObj, fid) < 0) {
code = TSDB_CODE_OTHERS;
goto _error_merge;
}
} else {
if (pImport->pos < 0) pImport->pos = 0;
}
}
if (pImport->key != key && pImport->pos > 0) {
if ( pObj->sversion != pBlock->sversion ) {
dError("vid:%d sid:%d id:%s, import sversion not matched, expected:%d received:%d", pObj->vnode, pObj->sid,
pObj->meterId, pBlock->sversion, pObj->sversion);
code = TSDB_CODE_OTHERS;
} else {
pImport->offset = pBlock->offset;
pImport->buffer =
malloc(pObj->bytesPerPoint * pVnode->cfg.rowsInFileBlock + sizeof(SData) * pObj->numOfColumns);
pImport->sdata[0] = (SData *)pImport->buffer;
for (int col = 1; col < pObj->numOfColumns; ++col)
pImport->sdata[col] = (SData *)(((char *)pImport->sdata[col - 1]) + sizeof(SData) +
pObj->pointsPerFileBlock * pObj->schema[col - 1].bytes);
code = vnodeReadCompBlockToMem(pObj, &query, pImport->sdata);
if (code < 0) {
code = -code;
tfree(pImport->buffer);
if (blockIter.slot > blockIter.oslot) { // write blocks in range [blockIter.oslot, blockIter.slot) to .t file
checksum = taosCalcChecksum(checksum, (uint8_t *)(importHandle.pBlocks + blockIter.oslot),
sizeof(SCompBlock) * (blockIter.slot - blockIter.oslot));
if (twrite(pVnode->nfd, (void *)(importHandle.pBlocks + blockIter.oslot),
sizeof(SCompBlock) * (blockIter.slot - blockIter.oslot)) < 0) {
dError("vid:%d sid:%d meterId:%s, failed to write %s file, size:%ul, reason:%s", pVnode->vnode, pObj->sid,
pObj->meterId, pVnode->nfn, sizeof(SCompBlock) * (blockIter.slot - blockIter.oslot),
strerror(errno));
code = TSDB_CODE_OTHERS;
goto _error_merge;
}
blockIter.oslot = blockIter.slot;
}
}
}
} else {
dError("vid:%d sid:%d id:%s, file is corrupted, import failed", pObj->vnode, pObj->sid, pObj->meterId);
code = -ret;
}
tclose(query.hfd);
tclose(query.dfd);
tclose(query.lfd);
vnodeFreeFields(&query);
tfree(query.pBlock);
if (blockIter.pos == 0) { // No need to merge
// copy payload part to data
int rowOffset = 0;
for (; payloadIter < rows; rowOffset++) {
if (KEY_AT_INDEX(payload, pObj->bytesPerPoint, payloadIter) >= blockIter.nextKey) break;
return code;
}
vnodeConvertRowsToCols(pObj, payload + pObj->bytesPerPoint * payloadIter, 1, data, rowOffset);
pointsImported++;
lastKeyImported = KEY_AT_INDEX(payload, pObj->bytesPerPoint, payloadIter);
payloadIter++;
}
int vnodeFindKeyInCache(SImportInfo *pImport, int order) {
SMeterObj *pObj = pImport->pObj;
int code = 0;
SQuery query;
SCacheInfo *pInfo = (SCacheInfo *)pObj->pCache;
// write directly to .data file
compBlock.last = 0;
if (vnodeWriteBlockToFile(pObj, &compBlock, data, cdata, rowOffset) < 0) {
// TODO: Deal with the ERROR here
}
TSKEY key = order ? pImport->firstKey : pImport->lastKey;
memset(&query, 0, sizeof(query));
query.order.order = order;
query.skey = key;
query.ekey = order ? pImport->lastKey : pImport->firstKey;
vnodeSearchPointInCache(pObj, &query);
checksum = taosCalcChecksum(checksum, (uint8_t *)(&compBlock), sizeof(SCompBlock));
if (twrite(pVnode->nfd, &compBlock, sizeof(SCompBlock)) < 0) {
// TODO : deal with the ERROR here
}
importHandle.newNumOfBlocks++;
importHandle.driftOffset += sizeof(SCompBlock);
} else { // Merge block and payload from payloadIter
if (query.slot < 0) {
pImport->slot = pInfo->commitSlot;
if (pInfo->commitPoint >= pObj->pointsPerBlock) pImport->slot = (pImport->slot + 1) % pInfo->maxBlocks;
pImport->pos = 0;
pImport->key = 0;
dTrace("vid:%d sid:%d id:%s, key:%ld, import to head of cache", pObj->vnode, pObj->sid, pObj->meterId, key);
code = 0;
} else {
pImport->slot = query.slot;
pImport->pos = query.pos;
pImport->key = query.key;
if (vnodeLoadNeededBlockData(pObj, &importHandle, blockIter.slot,
DATA_LOAD_TIMESTAMP | DATA_LOAD_OTHER_DATA, &code) < 0) { // Load necessary blocks
goto _error_merge;
}
if (key != query.key) {
if (order == 0) {
// since pos is the position which has smaller key, data shall be imported after it
pImport->pos++;
if (pImport->pos >= pObj->pointsPerBlock) {
pImport->slot = (pImport->slot + 1) % pInfo->maxBlocks;
pImport->pos = 0;
importHandle.oldNumOfBlocks--;
importHandle.driftOffset -= sizeof(SCompBlock);
int rowOffset = blockIter.pos; // counter for data
// Copy the front part
for (int col = 0; col < pObj->numOfColumns; col++) {
memcpy((void *)(data[col]->data), (void *)(importHandle.data[col]->data),
pObj->schema[col].bytes * blockIter.pos);
}
// Merge part
while (1) {
if (rowOffset >= pVnode->cfg.rowsInFileBlock) { // data full in a block to commit
compBlock.last = 0;
if (vnodeWriteBlockToFile(pObj, &compBlock, data, cdata, rowOffset) < 0) {
// TODO : deal with the ERROR here
}
checksum = taosCalcChecksum(checksum, (uint8_t *)(&compBlock), sizeof(SCompBlock));
if (twrite(pVnode->nfd, (void *)(&compBlock), sizeof(SCompBlock)) < 0) {
dError("vid:%d sid:%d meterId:%s, failed to write %s file, size:%ul, reason:%s", pVnode->vnode,
pObj->sid, pObj->meterId, pVnode->nfn, sizeof(SCompBlock), strerror(errno));
goto _error_merge;
}
importHandle.newNumOfBlocks++;
importHandle.driftOffset += sizeof(SCompBlock);
rowOffset = 0;
}
if ((payloadIter >= rows || KEY_AT_INDEX(payload, pObj->bytesPerPoint, payloadIter) >= blockIter.nextKey) &&
blockIter.pos >= importHandle.pBlocks[blockIter.slot].numOfPoints)
break;
if (payloadIter >= rows ||
KEY_AT_INDEX(payload, pObj->bytesPerPoint, payloadIter) >= blockIter.nextKey) { // payload end
for (int col = 0; col < pObj->numOfColumns; col++) {
memcpy(data[col]->data + rowOffset * pObj->schema[col].bytes,
importHandle.data[col]->data + pObj->schema[col].bytes * blockIter.pos, pObj->schema[col].bytes);
}
blockIter.pos++;
rowOffset++;
} else if (blockIter.pos >= importHandle.pBlocks[blockIter.slot].numOfPoints) { // block end
vnodeConvertRowsToCols(pObj, payload + pObj->bytesPerPoint * payloadIter, 1, data, rowOffset);
pointsImported++;
lastKeyImported = KEY_AT_INDEX(payload, pObj->bytesPerPoint, payloadIter);
payloadIter++;
rowOffset++;
} else {
if (KEY_AT_INDEX(payload, pObj->bytesPerPoint, payloadIter) ==
KEY_AT_INDEX(importHandle.data[PRIMARYKEY_TIMESTAMP_COL_INDEX]->data, sizeof(TSKEY),
blockIter.pos)) { // duplicate key
payloadIter++;
continue;
} else if (KEY_AT_INDEX(payload, pObj->bytesPerPoint, payloadIter) <
KEY_AT_INDEX(importHandle.data[PRIMARYKEY_TIMESTAMP_COL_INDEX]->data, sizeof(TSKEY),
blockIter.pos)) {
vnodeConvertRowsToCols(pObj, payload + pObj->bytesPerPoint * payloadIter, 1, data, rowOffset);
pointsImported++;
lastKeyImported = KEY_AT_INDEX(payload, pObj->bytesPerPoint, payloadIter);
payloadIter++;
rowOffset++;
} else {
for (int col = 0; col < pObj->numOfColumns; col++) {
memcpy(data[col]->data + rowOffset * pObj->schema[col].bytes,
importHandle.data[col]->data + pObj->schema[col].bytes * blockIter.pos,
pObj->schema[col].bytes);
}
blockIter.pos++;
rowOffset++;
}
}
}
if (rowOffset > 0) { // data full in a block to commit
compBlock.last = 0;
if (vnodeWriteBlockToFile(pObj, &compBlock, data, cdata, rowOffset) < 0) {
// TODO : deal with the ERROR here
}
checksum = taosCalcChecksum(checksum, (uint8_t *)(&compBlock), sizeof(SCompBlock));
if (twrite(pVnode->nfd, (void *)(&compBlock), sizeof(SCompBlock)) < 0) {
dError("vid:%d sid:%d meterId:%s, failed to write %s file, size:%ul, reason:%s", pVnode->vnode, pObj->sid,
pObj->meterId, pVnode->nfn, sizeof(SCompBlock), strerror(errno));
goto _error_merge;
}
importHandle.newNumOfBlocks++;
importHandle.driftOffset += sizeof(SCompBlock);
rowOffset = 0;
}
blockIter.slot++;
blockIter.oslot = blockIter.slot;
}
} else {
if (pImport->pos < 0) pImport->pos = 0;
}
}
code = 0;
}
return code;
}
// Write the SCompInfo part
if (vnodeCloseImportFiles(pObj, &importHandle) < 0) {
code = TSDB_CODE_OTHERS;
goto _error_merge;
}
int vnodeImportStartToCache(SImportInfo *pImport, char *payload, int rows) {
int code = 0;
SMeterObj *pObj = pImport->pObj;
pImport->importedRows += pointsImported;
code = vnodeFindKeyInCache(pImport, 1);
if (code != 0) return code;
pthread_mutex_lock(&(pPool->vmutex));
if (pInfo->numOfBlocks > 0) {
int slot = (pInfo->currentSlot - pInfo->numOfBlocks + 1 + pInfo->maxBlocks) % pInfo->maxBlocks;
TSKEY firstKeyInCache = *((TSKEY *)(pInfo->cacheBlocks[slot]->offset[0]));
if (pImport->key != pImport->firstKey) {
rows = vnodeGetImportStartPart(pObj, payload, rows, pImport->key);
pImport->importedRows = rows;
code = vnodeImportToCache(pImport, payload, rows);
} else {
dTrace("vid:%d sid:%d id:%s, data is already imported to cache, firstKey:%lld", pObj->vnode, pObj->sid, pObj->meterId, pImport->firstKey);
// data may be in committed cache, cache shall be released
if (lastKeyImported > firstKeyInCache) {
while (slot != pInfo->commitSlot) {
SCacheBlock *pCacheBlock = pInfo->cacheBlocks[slot];
vnodeFreeCacheBlock(pCacheBlock);
slot = (slot + 1 + pInfo->maxBlocks) % pInfo->maxBlocks;
}
if (pInfo->commitPoint == pObj->pointsPerBlock) {
if (pInfo->cacheBlocks[pInfo->commitSlot]->pMeterObj == pObj) {
vnodeFreeCacheBlock(pInfo->cacheBlocks[pInfo->commitSlot]);
}
}
}
}
pthread_mutex_unlock(&(pPool->vmutex));
// TODO: free the allocated memory
tfree(buffer);
tfree(cbuffer);
tfree(importHandle.pHeader);
tfree(importHandle.pBlocks);
tfree(importHandle.pField);
tfree(importHandle.buffer);
tfree(importHandle.temp);
tfree(importHandle.tempBuffer);
return code;
}
int vnodeImportStartToFile(SImportInfo *pImport, char *payload, int rows) {
int code = 0;
SMeterObj *pObj = pImport->pObj;
code = vnodeFindKeyInFile(pImport, 1);
if (code != 0) return code;
if (pImport->key != pImport->firstKey) {
pImport->payload = payload;
pImport->rows = vnodeGetImportStartPart(pObj, payload, rows, pImport->key);
pImport->importedRows = pImport->rows;
code = vnodeImportToFile(pImport);
} else {
dTrace("vid:%d sid:%d id:%s, data is already imported to file", pObj->vnode, pObj->sid, pObj->meterId);
_error_merge:
tfree(buffer);
tfree(cbuffer);
tfree(importHandle.pHeader);
tfree(importHandle.pBlocks);
tfree(importHandle.pField);
tfree(importHandle.buffer);
tfree(importHandle.temp);
tfree(importHandle.tempBuffer);
close(pVnode->dfd);
pVnode->dfd = 0;
close(pVnode->hfd);
pVnode->hfd = 0;
close(pVnode->lfd);
pVnode->lfd = 0;
if (pVnode->nfd > 0) {
close(pVnode->nfd);
pVnode->nfd = 0;
remove(pVnode->nfn);
}
return code;
}
int vnodeImportWholeToFile(SImportInfo *pImport, char *payload, int rows) {
int code = 0;
SMeterObj *pObj = pImport->pObj;
#define FORWARD_ITER(iter, step, slotLimit, posLimit) \
{ \
if ((iter.pos) + (step) < (posLimit)) { \
(iter.pos) = (iter.pos) + (step); \
} else { \
(iter.pos) = 0; \
(iter.slot) = ((iter.slot) + 1) % (slotLimit); \
} \
}
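// FORWARD_ITER advances a (slot, pos) cursor over the circular array of cache blocks: it moves pos forward
// by `step` while staying inside the current block, otherwise it resets pos to 0 and wraps to the next slot.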
code = vnodeFindKeyInFile(pImport, 0);
if (code != 0) return code;
int isCacheEnd(SBlockIter iter, SMeterObj *pMeter) {
SCacheInfo *pInfo = (SCacheInfo *)(pMeter->pCache);
int slot = 0;
int pos = 0;
if (pImport->key != pImport->lastKey) {
pImport->payload = payload;
pImport->rows = vnodeGetImportEndPart(pObj, payload, rows, &pImport->payload, pImport->key);
pImport->importedRows = pImport->rows;
code = vnodeImportToFile(pImport);
if (pInfo->cacheBlocks[pInfo->currentSlot]->numOfPoints == pMeter->pointsPerBlock) {
slot = (pInfo->currentSlot + 1) % (pInfo->maxBlocks);
pos = 0;
} else {
code = vnodeImportStartToFile(pImport, payload, rows);
slot = pInfo->currentSlot;
pos = pInfo->cacheBlocks[pInfo->currentSlot]->numOfPoints;
}
return code;
return ((iter.slot == slot) && (iter.pos == pos));
}
int vnodeImportWholeToCache(SImportInfo *pImport, char *payload, int rows) {
int code = 0;
SMeterObj *pObj = pImport->pObj;
static void vnodeFlushMergeBuffer(SMergeBuffer *pBuffer, SBlockIter *pWriteIter, SBlockIter *pCacheIter,
SMeterObj *pObj, SCacheInfo *pInfo, int checkBound) {
// Function to flush the merge buffer data to cache
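// It drains rows from the circular merge buffer (spos..epos) into the cache blocks starting at *pWriteIter;
// when checkBound is set it stops as soon as the write cursor reaches *pCacheIter, so cached rows that have
// not yet been consumed into the buffer are never overwritten.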
if (pWriteIter->pos == pObj->pointsPerBlock) {
pWriteIter->pos = 0;
pWriteIter->slot = (pWriteIter->slot + 1) % pInfo->maxBlocks;
}
code = vnodeFindKeyInCache(pImport, 0);
if (code != 0) return code;
while (pBuffer->spos != pBuffer->epos) {
if (checkBound && pWriteIter->slot == pCacheIter->slot && pWriteIter->pos == pCacheIter->pos) break;
for (int col = 0; col < pObj->numOfColumns; col++) {
memcpy(pInfo->cacheBlocks[pWriteIter->slot]->offset[col] + pObj->schema[col].bytes * pWriteIter->pos,
pBuffer->offset[col] + pObj->schema[col].bytes * pBuffer->spos, pObj->schema[col].bytes);
}
if (pImport->key != pImport->lastKey) {
char *pStart;
if ( pImport->key < pObj->lastKeyOnFile ) pImport->key = pObj->lastKeyOnFile;
rows = vnodeGetImportEndPart(pObj, payload, rows, &pStart, pImport->key);
pImport->importedRows = rows;
code = vnodeImportToCache(pImport, pStart, rows);
} else {
if (pImport->firstKey > pObj->lastKeyOnFile) {
code = vnodeImportStartToCache(pImport, payload, rows);
} else if (pImport->firstKey < pObj->lastKeyOnFile) {
code = vnodeImportStartToFile(pImport, payload, rows);
} else { // firstKey == pObj->lastKeyOnFile
dTrace("vid:%d sid:%d id:%s, data is already there", pObj->vnode, pObj->sid, pObj->meterId);
if (pWriteIter->pos + 1 < pObj->pointsPerBlock) {
(pWriteIter->pos)++;
} else {
pInfo->cacheBlocks[pWriteIter->slot]->numOfPoints = pWriteIter->pos + 1;
pWriteIter->slot = (pWriteIter->slot + 1) % pInfo->maxBlocks;
pWriteIter->pos = 0;
}
pBuffer->spos = (pBuffer->spos + 1) % pBuffer->totalRows;
}
return code;
if ((!checkBound) && pWriteIter->pos != 0) {
pInfo->cacheBlocks[pWriteIter->slot]->numOfPoints = pWriteIter->pos;
}
}
int vnodeImportPoints(SMeterObj *pObj, char *cont, int contLen, char source, void *param, int sversion,
int *pNumOfPoints, TSKEY now) {
SSubmitMsg *pSubmit = (SSubmitMsg *)cont;
SVnodeObj *pVnode = &vnodeList[pObj->vnode];
int rows;
char *payload;
int code = TSDB_CODE_ACTION_IN_PROGRESS;
SCachePool *pPool = (SCachePool *)pVnode->pCachePool;
SShellObj *pShell = (SShellObj *)param;
int pointsImported = 0;
rows = htons(pSubmit->numOfRows);
int expectedLen = rows * pObj->bytesPerPoint + sizeof(pSubmit->numOfRows);
if (expectedLen != contLen) {
dError("vid:%d sid:%d id:%s, invalid import, expected:%d, contLen:%d", pObj->vnode, pObj->sid, pObj->meterId,
expectedLen, contLen);
return TSDB_CODE_WRONG_MSG_SIZE;
int vnodeImportDataToCache(SImportInfo *pImport, const char *payload, const int rows) {
SMeterObj * pObj = pImport->pObj;
SVnodeObj * pVnode = vnodeList + pObj->vnode;
int code = -1;
SCacheInfo * pInfo = (SCacheInfo *)(pObj->pCache);
int payloadIter;
SCachePool * pPool = (SCachePool *)(pVnode->pCachePool);
int isCacheIterEnd = 0;
int spayloadIter = 0;
int isAppendData = 0;
int rowsImported = 0;
int totalRows = 0;
size_t size = 0;
SMergeBuffer *pBuffer = NULL;
TSKEY firstKey = KEY_AT_INDEX(payload, pObj->bytesPerPoint, 0);
TSKEY lastKey = KEY_AT_INDEX(payload, pObj->bytesPerPoint, rows - 1);
assert(firstKey <= lastKey && firstKey > pObj->lastKeyOnFile);
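// Precondition: every row in this payload is newer than the last key already flushed to disk, so the whole
// batch only needs to be merged into the in-memory cache blocks.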
// TODO: make this condition less strict
if (pObj->freePoints < rows || pObj->freePoints < (pObj->pointsPerBlock << 1)) { // No free room to hold the data
dError("vid:%d sid:%d id:%s, import failed, cache is full, freePoints:%d", pObj->vnode, pObj->sid, pObj->meterId,
pObj->freePoints);
pImport->importedRows = 0;
pImport->commit = 1;
code = TSDB_CODE_ACTION_IN_PROGRESS;
return code;
}
if (sversion != pObj->sversion) {
dError("vid:%d sid:%d id:%s, invalid sversion, expected:%d received:%d", pObj->vnode, pObj->sid, pObj->meterId,
pObj->sversion, sversion);
return TSDB_CODE_OTHERS;
if (pInfo->numOfBlocks == 0) {
if (vnodeAllocateCacheBlock(pObj) < 0) {
pImport->importedRows = 0;
pImport->commit = 1;
code = TSDB_CODE_ACTION_IN_PROGRESS;
return code;
}
}
payload = pSubmit->payLoad;
TSKEY firstKey = *(TSKEY *)payload;
TSKEY lastKey = *(TSKEY *)(payload + pObj->bytesPerPoint*(rows-1));
int cfid = now/pVnode->cfg.daysPerFile/tsMsPerDay[pVnode->cfg.precision];
TSKEY minAllowedKey = (cfid - pVnode->maxFiles + 1)*pVnode->cfg.daysPerFile*tsMsPerDay[pVnode->cfg.precision];
TSKEY maxAllowedKey = (cfid + 2)*pVnode->cfg.daysPerFile*tsMsPerDay[pVnode->cfg.precision] - 1;
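// cfid is the file id that "now" falls into; a key is accepted only if it lies between the start of the
// oldest retained file and the end of the file after the current one. For example, assuming millisecond
// precision, daysPerFile = 10 and maxFiles = 20, keys may reach back roughly 200 days and at most one
// extra 10-day file interval into the future.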
if (firstKey < minAllowedKey || firstKey > maxAllowedKey || lastKey < minAllowedKey || lastKey > maxAllowedKey) {
dError("vid:%d sid:%d id:%s, vnode lastKeyOnFile:%lld, data is out of range, rows:%d firstKey:%lld lastKey:%lld minAllowedKey:%lld maxAllowedKey:%lld",
pObj->vnode, pObj->sid, pObj->meterId, pVnode->lastKeyOnFile, rows, firstKey, lastKey, minAllowedKey, maxAllowedKey);
return TSDB_CODE_TIMESTAMP_OUT_OF_RANGE;
// Find the first importable record from payload
pImport->lastKey = lastKey;
for (payloadIter = 0; payloadIter < rows; payloadIter++) {
TSKEY key = KEY_AT_INDEX(payload, pObj->bytesPerPoint, payloadIter);
if (key == pObj->lastKey) continue;
if (key > pObj->lastKey) { // Just as insert
pImport->slot = pInfo->currentSlot;
pImport->pos = pInfo->cacheBlocks[pImport->slot]->numOfPoints;
isCacheIterEnd = 1;
break;
} else {
pImport->firstKey = key;
if (vnodeFindKeyInCache(pImport, 1) < 0) {
goto _exit;
}
if (pImport->firstKey != pImport->key) break;
}
}
// forward to peers
if (pShell && pVnode->cfg.replications > 1) {
code = vnodeForwardToPeer(pObj, cont, contLen, TSDB_ACTION_IMPORT, sversion);
if (code != 0) return code;
if (payloadIter == rows) {
pImport->importedRows = 0;
code = 0;
goto _exit;
}
if (pVnode->cfg.commitLog && source != TSDB_DATA_SOURCE_LOG) {
if (pVnode->logFd < 0) return TSDB_CODE_INVALID_COMMIT_LOG;
code = vnodeWriteToCommitLog(pObj, TSDB_ACTION_IMPORT, cont, contLen, sversion);
if (code != 0) return code;
spayloadIter = payloadIter;
if (pImport->pos == pObj->pointsPerBlock) assert(isCacheIterEnd);
// Allocate a new merge buffer work as buffer
totalRows = pObj->pointsPerBlock + rows - payloadIter + 1;
size = sizeof(SMergeBuffer) + sizeof(char *) * pObj->numOfColumns + pObj->bytesPerPoint * totalRows;
pBuffer = (SMergeBuffer *)malloc(size);
if (pBuffer == NULL) {
dError("vid:%d sid:%d meterId:%s, failed to allocate memory, size:%d", pObj->vnode, pObj->sid, pObj->meterId, size);
return TSDB_CODE_SERV_OUT_OF_MEMORY;
}
pBuffer->spos = 0;
pBuffer->epos = 0;
pBuffer->totalRows = totalRows;
pBuffer->offset[0] = (char *)pBuffer + sizeof(SMergeBuffer) + sizeof(char *) * pObj->numOfColumns;
for (int col = 1; col < pObj->numOfColumns; col++) {
pBuffer->offset[col] = pBuffer->offset[col - 1] + pObj->schema[col - 1].bytes * totalRows;
}
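// The merge buffer is one allocation laid out column-wise: after the SMergeBuffer header and the offset
// pointer array, each column owns a contiguous region of totalRows values. spos/epos form a ring, so up to
// totalRows - 1 rows can be buffered before vnodeFlushMergeBuffer has to drain it.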
if (*((TSKEY *)(pSubmit->payLoad + (rows - 1) * pObj->bytesPerPoint)) > pObj->lastKey) {
code = vnodeInsertPoints(pObj, cont, contLen, TSDB_DATA_SOURCE_LOG, NULL, pObj->sversion, &pointsImported, now);
// TODO: take pImport->pos = pObj->pointsPerBlock into consideration
{ // Do the merge staff
SBlockIter cacheIter = {pImport->slot, pImport->pos, 0, 0}; // Iter to traverse old cache data
SBlockIter writeIter = {pImport->slot, pImport->pos, 0, 0}; // Iter to write data to cache
int availPoints = pObj->pointsPerBlock - pInfo->cacheBlocks[pInfo->currentSlot]->numOfPoints;
if (pShell) {
pShell->code = code;
pShell->numOfTotalPoints += pointsImported;
}
} else {
SImportInfo *pNew, import;
assert(availPoints >= 0);
dTrace("vid:%d sid:%d id:%s, import %d rows data", pObj->vnode, pObj->sid, pObj->meterId, rows);
memset(&import, 0, sizeof(import));
import.firstKey = *((TSKEY *)(payload));
import.lastKey = *((TSKEY *)(pSubmit->payLoad + (rows - 1) * pObj->bytesPerPoint));
import.pObj = pObj;
import.pShell = pShell;
import.payload = payload;
import.rows = rows;
if ((code = vnodeSetMeterInsertImportStateEx(pObj, TSDB_METER_STATE_IMPORTING)) != TSDB_CODE_SUCCESS) {
return code;
}
int32_t num = 0;
pthread_mutex_lock(&pVnode->vmutex);
num = pObj->numOfQueries;
pthread_mutex_unlock(&pVnode->vmutex);
while (1) {
if ((payloadIter >= rows) && isCacheIterEnd) break;
int32_t commitInProcess = 0;
if ((pBuffer->epos + 1) % pBuffer->totalRows == pBuffer->spos) { // merge buffer is full, flush
vnodeFlushMergeBuffer(pBuffer, &writeIter, &cacheIter, pObj, pInfo, 1);
}
pthread_mutex_lock(&pPool->vmutex);
if (((commitInProcess = pPool->commitInProcess) == 1) || num > 0) {
pthread_mutex_unlock(&pPool->vmutex);
vnodeClearMeterState(pObj, TSDB_METER_STATE_IMPORTING);
pNew = (SImportInfo *)malloc(sizeof(SImportInfo));
memcpy(pNew, &import, sizeof(SImportInfo));
pNew->signature = pNew;
int payloadLen = contLen - sizeof(SSubmitMsg);
pNew->payload = malloc(payloadLen);
pNew->opayload = pNew->payload;
memcpy(pNew->payload, payload, payloadLen);
dTrace("vid:%d sid:%d id:%s, import later, commit in process:%d, numOfQueries:%d", pObj->vnode, pObj->sid,
pObj->meterId, commitInProcess, pObj->numOfQueries);
/*
* vnodeProcessImportTimer will set the import status for this table, so there is
* no need to set the import flag here
*/
taosTmrStart(vnodeProcessImportTimer, 10, pNew, vnodeTmrCtrl);
return 0;
} else {
pPool->commitInProcess = 1;
pthread_mutex_unlock(&pPool->vmutex);
int ret = vnodeImportData(pObj, &import);
if (pShell) {
pShell->code = ret;
pShell->numOfTotalPoints += import.importedRows;
TSKEY payloadKey = (payloadIter < rows) ? KEY_AT_INDEX(payload, pObj->bytesPerPoint, payloadIter) : INT64_MAX;
TSKEY cacheKey = (isCacheIterEnd) ? INT64_MAX : KEY_AT_INDEX(pInfo->cacheBlocks[cacheIter.slot]->offset[0], sizeof(TSKEY), cacheIter.pos);
if (cacheKey < payloadKey) { // if (payload end || (cacheIter not end && payloadKey > blockKey)), consume cache
for (int col = 0; col < pObj->numOfColumns; col++) {
memcpy(pBuffer->offset[col] + pObj->schema[col].bytes * pBuffer->epos,
pInfo->cacheBlocks[cacheIter.slot]->offset[col] + pObj->schema[col].bytes * cacheIter.pos,
pObj->schema[col].bytes);
}
FORWARD_ITER(cacheIter, 1, pInfo->maxBlocks, pObj->pointsPerBlock);
isCacheIterEnd = isCacheEnd(cacheIter, pObj);
} else if (cacheKey > payloadKey) { // cacheIter end || (payloadIter not end && payloadKey < blockKey), consume payload
if (availPoints == 0) { // Need to allocate a new cache block
pthread_mutex_lock(&(pPool->vmutex));
// TODO: Need to check if there are enough slots to hold a new one
SCacheBlock *pNewBlock = vnodeGetFreeCacheBlock(pVnode);
if (pNewBlock == NULL) { // Failed to allocate a new cache block, need to commit and loop over the remaining cache records
pthread_mutex_unlock(&(pPool->vmutex));
payloadIter = rows;
code = TSDB_CODE_ACTION_IN_PROGRESS;
pImport->commit = 1;
continue;
}
assert(pInfo->numOfBlocks <= pInfo->maxBlocks);
if (pInfo->numOfBlocks == pInfo->maxBlocks) {
vnodeFreeCacheBlock(pInfo->cacheBlocks[(pInfo->currentSlot + 1) % pInfo->maxBlocks]);
}
pNewBlock->pMeterObj = pObj;
pNewBlock->offset[0] = (char *)pNewBlock + sizeof(SCacheBlock) + sizeof(char *) * pObj->numOfColumns;
for (int col = 1; col < pObj->numOfColumns; col++)
pNewBlock->offset[col] = pNewBlock->offset[col - 1] + pObj->schema[col - 1].bytes * pObj->pointsPerBlock;
int newSlot = (writeIter.slot + 1) % pInfo->maxBlocks;
pInfo->blocks++;
int tblockId = pInfo->blocks;
if (writeIter.slot != pInfo->currentSlot) {
for (int tslot = pInfo->currentSlot; tslot != writeIter.slot;) {
int nextSlot = (tslot + 1) % pInfo->maxBlocks;
pInfo->cacheBlocks[nextSlot] = pInfo->cacheBlocks[tslot];
pInfo->cacheBlocks[nextSlot]->slot = nextSlot;
pInfo->cacheBlocks[nextSlot]->blockId = tblockId--;
tslot = (tslot - 1 + pInfo->maxBlocks) % pInfo->maxBlocks;
}
}
int index = pNewBlock->index;
if (cacheIter.slot == writeIter.slot) {
pNewBlock->numOfPoints = pInfo->cacheBlocks[cacheIter.slot]->numOfPoints;
int pointsLeft = pInfo->cacheBlocks[cacheIter.slot]->numOfPoints - cacheIter.pos;
if (pointsLeft > 0) {
for (int col = 0; col < pObj->numOfColumns; col++) {
memcpy((void *)(pNewBlock->offset[col] + pObj->schema[col].bytes*cacheIter.pos),
pInfo->cacheBlocks[cacheIter.slot]->offset[col] + pObj->schema[col].bytes * cacheIter.pos,
pObj->schema[col].bytes * pointsLeft);
}
}
}
pNewBlock->blockId = tblockId;
pNewBlock->slot = newSlot;
pNewBlock->index = index;
pInfo->cacheBlocks[newSlot] = pNewBlock;
pInfo->numOfBlocks++;
pInfo->unCommittedBlocks++;
pInfo->currentSlot = (pInfo->currentSlot + 1) % pInfo->maxBlocks;
pthread_mutex_unlock(&(pPool->vmutex));
cacheIter.slot = (cacheIter.slot + 1) % pInfo->maxBlocks;
// move a cache of data forward
availPoints = pObj->pointsPerBlock;
}
int offset = 0;
for (int col = 0; col < pObj->numOfColumns; col++) {
memcpy(pBuffer->offset[col] + pObj->schema[col].bytes * pBuffer->epos,
payload + pObj->bytesPerPoint * payloadIter + offset, pObj->schema[col].bytes);
offset += pObj->schema[col].bytes;
}
if (spayloadIter == payloadIter) {// update pVnode->firstKey
pthread_mutex_lock(&(pVnode->vmutex));
if (KEY_AT_INDEX(payload, pObj->bytesPerPoint, payloadIter) < pVnode->firstKey) pVnode->firstKey = firstKey;
pthread_mutex_unlock(&(pVnode->vmutex));
}
if (isCacheIterEnd) {
pObj->lastKey = KEY_AT_INDEX(payload, pObj->bytesPerPoint, payloadIter);
if (!isAppendData) isAppendData = 1;
}
rowsImported++;
availPoints--;
payloadIter++;
} else {
payloadIter++;
continue;
}
pBuffer->epos = (pBuffer->epos + 1) % pBuffer->totalRows;
}
if (pBuffer->spos != pBuffer->epos) { // Flush the remaining data in the merge buffer
vnodeFlushMergeBuffer(pBuffer, &writeIter, &cacheIter, pObj, pInfo, 0);
} else {
// Should never come here
assert(false);
}
if (isAppendData) {
pthread_mutex_lock(&(pVnode->vmutex));
if (pObj->lastKey > pVnode->lastKey) pVnode->lastKey = pObj->lastKey;
pthread_mutex_unlock(&(pVnode->vmutex));
}
}
pImport->importedRows += rowsImported;
atomic_fetch_sub_32(&(pObj->freePoints), rowsImported);
code = TSDB_CODE_SUCCESS;
_exit:
tfree(pBuffer);
return code;
}
int vnodeImportDataToFiles(SImportInfo *pImport, char *payload, const int rows) {
int code = 0;
// TODO : Check the correctness of pObj and pVnode
SMeterObj *pObj = (SMeterObj *)(pImport->pObj);
SVnodeObj *pVnode = vnodeList + pObj->vnode;
vnodeClearMeterState(pObj, TSDB_METER_STATE_IMPORTING);
pVnode->version++;
int64_t delta = pVnode->cfg.daysPerFile * tsMsPerDay[pVnode->cfg.precision];
int sfid = KEY_AT_INDEX(payload, pObj->bytesPerPoint, 0) / delta;
int efid = KEY_AT_INDEX(payload, pObj->bytesPerPoint, rows - 1) / delta;
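// Each data file covers `delta` time units (daysPerFile days in the vnode's precision). The payload may span
// several files, so the rows are partitioned by file id and merged one fid at a time in the loop below.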
if (pShell) {
pShell->count--;
if (pShell->count <= 0) vnodeSendShellSubmitRspMsg(pShell, pShell->code, pShell->numOfTotalPoints);
for (int fid = sfid; fid <= efid; fid++) {
TSKEY skey = fid * delta;
TSKEY ekey = skey + delta - 1;
int srow = 0, nrows = 0;
if (vnodeSearchKeyInRange(payload, pObj->bytesPerPoint, rows, skey, ekey, &srow, &nrows) < 0) continue;
assert(nrows > 0);
dTrace("vid:%d sid:%d meterId:%s, %d rows of data will be imported to file %d, srow:%d firstKey:%ld lastKey:%ld",
pObj->vnode, pObj->sid, pObj->meterId, nrows, fid, srow, KEY_AT_INDEX(payload, pObj->bytesPerPoint, srow),
KEY_AT_INDEX(payload, pObj->bytesPerPoint, (srow + nrows - 1)));
code = vnodeMergeDataIntoFile(pImport, payload + (srow * pObj->bytesPerPoint), nrows, fid);
if (code != TSDB_CODE_SUCCESS) break;
}
return 0;
return code;
}
// TODO: abort the procedure if the meter is going to be dropped
// TODO: add an offset in pShell so that already-handled messages are not processed repeatedly
int vnodeImportData(SMeterObj *pObj, SImportInfo *pImport) {
int code = 0;
int srow = 0, nrows = 0;
SVnodeObj * pVnode = vnodeList + pObj->vnode;
SCachePool *pPool = (SCachePool *)(pVnode->pCachePool);
// 1. import data in range (pObj->lastKeyOnFile, INT64_MAX) into cache
if (vnodeSearchKeyInRange(pImport->payload, pObj->bytesPerPoint, pImport->rows, pObj->lastKeyOnFile + 1, INT64_MAX,
&srow, &nrows) >= 0) {
assert(nrows > 0);
code = vnodeImportDataToCache(pImport, pImport->payload + pObj->bytesPerPoint * srow, nrows);
if (pImport->commit) { // Need to commit now
pPool->commitInProcess = 0;
vnodeProcessCommitTimer(pVnode, NULL);
return code;
}
if (pImport->lastKey > pObj->lastKeyOnFile) {
code = vnodeImportWholeToCache(pImport, pImport->payload, pImport->rows);
} else if (pImport->lastKey < pObj->lastKeyOnFile) {
code = vnodeImportWholeToFile(pImport, pImport->payload, pImport->rows);
} else { // lastKey == pObj->lastkeyOnFile
code = vnodeImportStartToFile(pImport, pImport->payload, pImport->rows);
if (code != TSDB_CODE_SUCCESS) return code;
}
SVnodeObj *pVnode = &vnodeList[pObj->vnode];
SCachePool *pPool = (SCachePool *)pVnode->pCachePool;
pPool->commitInProcess = 0;
// 2. import data (0, pObj->lastKeyOnFile) into files
if (vnodeSearchKeyInRange(pImport->payload, pObj->bytesPerPoint, pImport->rows, 0, pObj->lastKeyOnFile - 1, &srow,
&nrows) >= 0) {
assert(nrows > 0);
code = vnodeImportDataToFiles(pImport, pImport->payload + pObj->bytesPerPoint * srow, nrows);
}
if (pImport->commit) vnodeProcessCommitTimer(pVnode, NULL);
pPool->commitInProcess = 0;
return code;
}
......@@ -584,12 +584,12 @@ int vnodeInsertPoints(SMeterObj *pObj, char *cont, int contLen, char source, voi
if (pVnode->cfg.commitLog && source != TSDB_DATA_SOURCE_LOG) {
if (pVnode->logFd < 0) return TSDB_CODE_INVALID_COMMIT_LOG;
code = vnodeWriteToCommitLog(pObj, TSDB_ACTION_INSERT, cont, contLen, sversion);
if (code != 0) return code;
if (code != TSDB_CODE_SUCCESS) return code;
}
if (source == TSDB_DATA_SOURCE_SHELL && pVnode->cfg.replications > 1) {
code = vnodeForwardToPeer(pObj, cont, contLen, TSDB_ACTION_INSERT, sversion);
if (code != 0) return code;
if (code != TSDB_CODE_SUCCESS) return code;
}
if (pObj->sversion < sversion) {
......@@ -601,11 +601,11 @@ int vnodeInsertPoints(SMeterObj *pObj, char *cont, int contLen, char source, voi
}
pData = pSubmit->payLoad;
code = TSDB_CODE_SUCCESS;
TSKEY firstKey = *((TSKEY *)pData);
TSKEY lastKey = *((TSKEY *)(pData + pObj->bytesPerPoint * (numOfPoints - 1)));
int cfid = now/pVnode->cfg.daysPerFile/tsMsPerDay[pVnode->cfg.precision];
TSKEY minAllowedKey = (cfid - pVnode->maxFiles + 1)*pVnode->cfg.daysPerFile*tsMsPerDay[pVnode->cfg.precision];
TSKEY maxAllowedKey = (cfid + 2)*pVnode->cfg.daysPerFile*tsMsPerDay[pVnode->cfg.precision] - 2;
if (firstKey < minAllowedKey || firstKey > maxAllowedKey || lastKey < minAllowedKey || lastKey > maxAllowedKey) {
......@@ -619,7 +619,7 @@ int vnodeInsertPoints(SMeterObj *pObj, char *cont, int contLen, char source, voi
}
for (i = 0; i < numOfPoints; ++i) { // meter will be dropped, abort current insertion
if (pObj->state >= TSDB_METER_STATE_DELETING) {
if (vnodeIsMeterState(pObj, TSDB_METER_STATE_DELETING)) {
dWarn("vid:%d sid:%d id:%s, meter is dropped, abort insert, state:%d", pObj->vnode, pObj->sid, pObj->meterId,
pObj->state);
......@@ -648,6 +648,7 @@ int vnodeInsertPoints(SMeterObj *pObj, char *cont, int contLen, char source, voi
pData += pObj->bytesPerPoint;
points++;
}
atomic_fetch_add_64(&(pVnode->vnodeStatistic.pointsWritten), points * (pObj->numOfColumns - 1));
atomic_fetch_add_64(&(pVnode->vnodeStatistic.totalStorage), points * pObj->bytesPerPoint);
......@@ -660,6 +661,7 @@ int vnodeInsertPoints(SMeterObj *pObj, char *cont, int contLen, char source, voi
pVnode->version++;
pthread_mutex_unlock(&(pVnode->vmutex));
vnodeClearMeterState(pObj, TSDB_METER_STATE_INSERT);
_over:
......
......@@ -39,10 +39,21 @@ SShellObj **shellList = NULL;
int vnodeProcessRetrieveRequest(char *pMsg, int msgLen, SShellObj *pObj);
int vnodeProcessQueryRequest(char *pMsg, int msgLen, SShellObj *pObj);
int vnodeProcessShellSubmitRequest(char *pMsg, int msgLen, SShellObj *pObj);
static void vnodeProcessBatchSubmitTimer(void *param, void *tmrId);
int vnodeSelectReqNum = 0;
int vnodeInsertReqNum = 0;
typedef struct {
int32_t import;
int32_t vnode;
int32_t numOfSid;
int32_t ssid; // Start sid
SShellObj *pObj;
int64_t offset; // offset relative the blks
char blks[];
} SBatchSubmitInfo;
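// SBatchSubmitInfo keeps a private copy of the remaining submit blocks plus the resume position (ssid and the
// byte offset into blks), so an import that had to yield to an in-progress commit can be retried later from
// vnodeProcessBatchSubmitTimer without keeping the original message buffer alive.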
void *vnodeProcessMsgFromShell(char *msg, void *ahandle, void *thandle) {
int sid, vnode;
SShellObj *pObj = (SShellObj *)ahandle;
......@@ -249,6 +260,7 @@ int vnodeSendShellSubmitRspMsg(SShellObj *pObj, int code, int numOfPoints) {
char *pMsg, *pStart;
int msgLen;
dTrace("code:%d numOfTotalPoints:%d", code, numOfPoints);
pStart = taosBuildRspMsgWithSize(pObj->thandle, TSDB_MSG_TYPE_SUBMIT_RSP, 128);
if (pStart == NULL) return -1;
pMsg = pStart;
......@@ -280,6 +292,7 @@ int vnodeProcessQueryRequest(char *pMsg, int msgLen, SShellObj *pObj) {
}
if (pQueryMsg->numOfSids <= 0) {
dError("Invalid number of meters to query, numOfSids:%d", pQueryMsg->numOfSids);
code = TSDB_CODE_INVALID_QUERY_MSG;
goto _query_over;
}
......@@ -485,10 +498,83 @@ int vnodeProcessRetrieveRequest(char *pMsg, int msgLen, SShellObj *pObj) {
return msgLen;
}
static int vnodeCheckSubmitBlockContext(SShellSubmitBlock *pBlocks, SVnodeObj *pVnode) {
int32_t sid = htonl(pBlocks->sid);
uint64_t uid = htobe64(pBlocks->uid);
if (sid >= pVnode->cfg.maxSessions || sid <= 0) {
dError("vid:%d sid:%d, sid is out of range", sid);
return TSDB_CODE_INVALID_TABLE_ID;
}
SMeterObj *pMeterObj = pVnode->meterList[sid];
if (pMeterObj == NULL) {
dError("vid:%d sid:%d, not active table", pVnode->vnode, sid);
vnodeSendMeterCfgMsg(pVnode->vnode, sid);
return TSDB_CODE_NOT_ACTIVE_TABLE;
}
if (pMeterObj->uid != uid) {
dError("vid:%d sid:%d id:%s, uid:%lld, uid in msg:%lld, uid mismatch", pVnode->vnode, sid, pMeterObj->meterId,
pMeterObj->uid, uid);
return TSDB_CODE_INVALID_SUBMIT_MSG;
}
return TSDB_CODE_SUCCESS;
}
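// vnodeDoSubmitJob processes the submit blocks in [*ssid, esid); on return, *ssid and *ppBlocks point at the
// first unprocessed block, so the caller can resume exactly where the job stopped (for example when an import
// returns TSDB_CODE_ACTION_IN_PROGRESS).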
static int vnodeDoSubmitJob(SVnodeObj *pVnode, int import, int32_t *ssid, int32_t esid, SShellSubmitBlock **ppBlocks,
TSKEY now, SShellObj *pObj) {
SShellSubmitBlock *pBlocks = *ppBlocks;
int code = TSDB_CODE_SUCCESS;
int32_t numOfPoints = 0;
int32_t i = 0;
for (i = *ssid; i < esid; i++) {
numOfPoints = 0;
code = vnodeCheckSubmitBlockContext(pBlocks, pVnode);
if (code != TSDB_CODE_SUCCESS) break;
SMeterObj *pMeterObj = (SMeterObj *)(pVnode->meterList[htonl(pBlocks->sid)]);
// don't include sid and vid
int32_t subMsgLen = sizeof(pBlocks->numOfRows) + htons(pBlocks->numOfRows) * pMeterObj->bytesPerPoint;
int32_t sversion = htonl(pBlocks->sversion);
if (import) {
code = vnodeImportPoints(pMeterObj, (char *)&(pBlocks->numOfRows), subMsgLen, TSDB_DATA_SOURCE_SHELL, pObj,
sversion, &numOfPoints, now);
pObj->numOfTotalPoints += numOfPoints;
// records for one table should be located consecutively in the payload buffer, which is guaranteed by the client
if (code == TSDB_CODE_SUCCESS) {
pObj->count--;
}
} else {
code = vnodeInsertPoints(pMeterObj, (char *)&(pBlocks->numOfRows), subMsgLen, TSDB_DATA_SOURCE_SHELL, NULL,
sversion, &numOfPoints, now);
pObj->numOfTotalPoints += numOfPoints;
}
if (code != TSDB_CODE_SUCCESS) break;
pBlocks = (SShellSubmitBlock *)((char *)pBlocks + sizeof(SShellSubmitBlock) +
htons(pBlocks->numOfRows) * pMeterObj->bytesPerPoint);
}
*ssid = i;
*ppBlocks = pBlocks;
return code;
}
int vnodeProcessShellSubmitRequest(char *pMsg, int msgLen, SShellObj *pObj) {
int code = 0, ret = 0;
int32_t i = 0;
SShellSubmitMsg shellSubmit = *(SShellSubmitMsg *)pMsg;
SShellSubmitMsg *pSubmit = &shellSubmit;
SShellSubmitBlock *pBlocks = NULL;
pSubmit->vnode = htons(pSubmit->vnode);
pSubmit->numOfSid = htonl(pSubmit->numOfSid);
......@@ -526,67 +612,69 @@ int vnodeProcessShellSubmitRequest(char *pMsg, int msgLen, SShellObj *pObj) {
pObj->count = pSubmit->numOfSid; // for import
pObj->code = 0; // for import
pObj->numOfTotalPoints = 0; // for import
SShellSubmitBlock *pBlocks = (SShellSubmitBlock *)(pMsg + sizeof(SShellSubmitMsg));
pObj->numOfTotalPoints = 0;
int32_t numOfPoints = 0;
int32_t numOfTotalPoints = 0;
// Take the current time once here so it is not fetched repeatedly inside the loop.
TSKEY now = taosGetTimestamp(pVnode->cfg.precision);
for (int32_t i = 0; i < pSubmit->numOfSid; ++i) {
numOfPoints = 0;
pBlocks->sid = htonl(pBlocks->sid);
pBlocks->uid = htobe64(pBlocks->uid);
pBlocks = (SShellSubmitBlock *)(pMsg + sizeof(SShellSubmitMsg));
i = 0;
code = vnodeDoSubmitJob(pVnode, pSubmit->import, &i, pSubmit->numOfSid, &pBlocks, now, pObj);
if (pBlocks->sid >= pVnode->cfg.maxSessions || pBlocks->sid <= 0) {
dTrace("sid:%d is out of range", pBlocks->sid);
code = TSDB_CODE_INVALID_TABLE_ID;
goto _submit_over;
_submit_over:
ret = 0;
if (pSubmit->import) { // Import case
if (code == TSDB_CODE_ACTION_IN_PROGRESS) {
SBatchSubmitInfo *pSubmitInfo =
(SBatchSubmitInfo *)calloc(1, sizeof(SBatchSubmitInfo) + msgLen - sizeof(SShellSubmitMsg));
if (pSubmitInfo == NULL) {
code = TSDB_CODE_SERV_OUT_OF_MEMORY;
ret = vnodeSendShellSubmitRspMsg(pObj, code, pObj->numOfTotalPoints);
} else { // Start a timer to process the next part of request
pSubmitInfo->import = 1;
pSubmitInfo->vnode = pSubmit->vnode;
pSubmitInfo->numOfSid = pSubmit->numOfSid;
pSubmitInfo->ssid = i; // start from this position, not the initial position
pSubmitInfo->pObj = pObj;
pSubmitInfo->offset = ((char *)pBlocks) - (pMsg + sizeof(SShellSubmitMsg));
assert(pSubmitInfo->offset >= 0);
memcpy((void *)(pSubmitInfo->blks), (void *)(pMsg + sizeof(SShellSubmitMsg)), msgLen - sizeof(SShellSubmitMsg));
taosTmrStart(vnodeProcessBatchSubmitTimer, 10, (void *)pSubmitInfo, vnodeTmrCtrl);
}
} else {
if (code == TSDB_CODE_SUCCESS) assert(pObj->count == 0);
ret = vnodeSendShellSubmitRspMsg(pObj, code, pObj->numOfTotalPoints);
}
} else { // Insert case
ret = vnodeSendShellSubmitRspMsg(pObj, code, pObj->numOfTotalPoints);
}
int vnode = pSubmit->vnode;
int sid = pBlocks->sid;
SMeterObj *pMeterObj = vnodeList[vnode].meterList[sid];
if (pMeterObj == NULL) {
dError("vid:%d sid:%d, not active table", vnode, sid);
vnodeSendMeterCfgMsg(vnode, sid);
code = TSDB_CODE_NOT_ACTIVE_TABLE;
goto _submit_over;
}
atomic_fetch_add_32(&vnodeInsertReqNum, 1);
return ret;
}
if (pMeterObj->uid != pBlocks->uid) {
dError("vid:%d sid:%d, meterId:%s, uid:%lld, uid in msg:%lld, uid mismatch", vnode, sid, pMeterObj->meterId,
pMeterObj->uid, pBlocks->uid);
code = TSDB_CODE_INVALID_SUBMIT_MSG;
goto _submit_over;
}
static void vnodeProcessBatchSubmitTimer(void *param, void *tmrId) {
SBatchSubmitInfo *pSubmitInfo = (SBatchSubmitInfo *)param;
assert(pSubmitInfo != NULL && pSubmitInfo->import);
// don't include sid and vid
int subMsgLen = sizeof(pBlocks->numOfRows) + htons(pBlocks->numOfRows) * pMeterObj->bytesPerPoint;
int sversion = htonl(pBlocks->sversion);
int32_t i = 0;
int32_t code = TSDB_CODE_SUCCESS;
if (pSubmit->import) {
code = vnodeImportPoints(pMeterObj, (char *) &(pBlocks->numOfRows), subMsgLen, TSDB_DATA_SOURCE_SHELL, pObj,
sversion, &numOfPoints, now);
} else {
code = vnodeInsertPoints(pMeterObj, (char *) &(pBlocks->numOfRows), subMsgLen, TSDB_DATA_SOURCE_SHELL, NULL,
sversion, &numOfPoints, now);
}
SShellObj * pShell = pSubmitInfo->pObj;
SVnodeObj * pVnode = &vnodeList[pSubmitInfo->vnode];
SShellSubmitBlock *pBlocks = (SShellSubmitBlock *)(pSubmitInfo->blks + pSubmitInfo->offset);
TSKEY now = taosGetTimestamp(pVnode->cfg.precision);
i = pSubmitInfo->ssid;
if (code != TSDB_CODE_SUCCESS) {break;}
code = vnodeDoSubmitJob(pVnode, pSubmitInfo->import, &i, pSubmitInfo->numOfSid, &pBlocks, now, pShell);
numOfTotalPoints += numOfPoints;
pBlocks = (SShellSubmitBlock *)((char *)pBlocks + sizeof(SShellSubmitBlock) +
htons(pBlocks->numOfRows) * pMeterObj->bytesPerPoint);
if (code == TSDB_CODE_ACTION_IN_PROGRESS) {
pSubmitInfo->ssid = i;
pSubmitInfo->offset = ((char *)pBlocks) - pSubmitInfo->blks;
taosTmrStart(vnodeProcessBatchSubmitTimer, 10, (void *)pSubmitInfo, vnodeTmrCtrl);
} else {
if (code == TSDB_CODE_SUCCESS) assert(pShell->count == 0);
tfree(param);
vnodeSendShellSubmitRspMsg(pShell, code, pShell->numOfTotalPoints);
}
_submit_over:
// for import, send the submit response only when return code is not zero
if (pSubmit->import == 0 || code != 0) ret = vnodeSendShellSubmitRspMsg(pObj, code, numOfTotalPoints);
atomic_fetch_add_32(&vnodeInsertReqNum, 1);
return ret;
}
......@@ -21,6 +21,7 @@
#include "tsqlfunction.h"
#include "ttime.h"
#include "ttypes.h"
#include "tutil.h"
#pragma GCC diagnostic ignored "-Wformat"
......@@ -46,8 +47,7 @@ void getTmpfilePath(const char *fileNamePrefix, char *dstPath) {
strcpy(tmpPath, tmpDir);
strcat(tmpPath, tdengineTmpFileNamePrefix);
strcat(tmpPath, fileNamePrefix);
strcat(tmpPath, "-%u-%u");
strcat(tmpPath, "-%llu-%u");
snprintf(dstPath, MAX_TMPFILE_PATH_LENGTH, tmpPath, taosGetPthreadId(), atomic_add_fetch_32(&tmpFileSerialNum, 1));
}
......@@ -431,7 +431,8 @@ void tBucketIntHash(tMemBucket *pBucket, void *value, int16_t *segIdx, int16_t *
}
void tBucketDoubleHash(tMemBucket *pBucket, void *value, int16_t *segIdx, int16_t *slotIdx) {
double v = *(double *)value;
//double v = *(double *)value;
double v = GET_DOUBLE_VAL(value);
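// GET_DOUBLE_VAL replaces the raw pointer dereference, presumably so the value is read safely even when the
// buffer is not naturally aligned for a double on the target platform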
if (pBucket->nRange.dMinVal == DBL_MAX) {
/*
......@@ -675,7 +676,8 @@ void tMemBucketUpdateBoundingBox(MinMaxEntry *r, char *data, int32_t dataType) {
break;
};
case TSDB_DATA_TYPE_DOUBLE: {
double val = *(double *)data;
//double val = *(double *)data;
double val = GET_DOUBLE_VAL(data);
if (r->dMinVal > val) {
r->dMinVal = val;
}
......@@ -686,7 +688,8 @@ void tMemBucketUpdateBoundingBox(MinMaxEntry *r, char *data, int32_t dataType) {
break;
};
case TSDB_DATA_TYPE_FLOAT: {
double val = *(float *)data;
//double val = *(float *)data;
double val = GET_FLOAT_VAL(data);
if (r->dMinVal > val) {
r->dMinVal = val;
......@@ -734,12 +737,14 @@ void tMemBucketPut(tMemBucket *pBucket, void *data, int32_t numOfRows) {
break;
}
case TSDB_DATA_TYPE_DOUBLE: {
double val = *(double *)d;
//double val = *(double *)d;
double val = GET_DOUBLE_VAL(d);
(pBucket->HashFunc)(pBucket, &val, &segIdx, &slotIdx);
break;
}
case TSDB_DATA_TYPE_FLOAT: {
double val = *(float *)d;
//double val = *(float *)d;
double val = GET_FLOAT_VAL(d);
(pBucket->HashFunc)(pBucket, &val, &segIdx, &slotIdx);
break;
}
......@@ -840,16 +845,20 @@ static FORCE_INLINE int32_t columnValueAscendingComparator(char *f1, char *f2, i
return (first < second) ? -1 : 1;
};
case TSDB_DATA_TYPE_DOUBLE: {
double first = *(double *)f1;
double second = *(double *)f2;
//double first = *(double *)f1;
double first = GET_DOUBLE_VAL(f1);
//double second = *(double *)f2;
double second = GET_DOUBLE_VAL(f2);
if (first == second) {
return 0;
}
return (first < second) ? -1 : 1;
};
case TSDB_DATA_TYPE_FLOAT: {
float first = *(float *)f1;
float second = *(float *)f2;
//float first = *(float *)f1;
//float second = *(float *)f2;
float first = GET_FLOAT_VAL(f1);
float second = GET_FLOAT_VAL(f2);
if (first == second) {
return 0;
}
......@@ -1298,10 +1307,16 @@ double findOnlyResult(tMemBucket *pMemBucket) {
return *(int8_t *)pPage->data;
case TSDB_DATA_TYPE_BIGINT:
return (double)(*(int64_t *)pPage->data);
case TSDB_DATA_TYPE_DOUBLE:
return *(double *)pPage->data;
case TSDB_DATA_TYPE_FLOAT:
return *(float *)pPage->data;
case TSDB_DATA_TYPE_DOUBLE: {
double dv = GET_DOUBLE_VAL(pPage->data);
//return *(double *)pPage->data;
return dv;
}
case TSDB_DATA_TYPE_FLOAT: {
float fv = GET_FLOAT_VAL(pPage->data);
//return *(float *)pPage->data;
return fv;
}
default:
return 0;
}
......@@ -1788,13 +1803,17 @@ double getPercentileImpl(tMemBucket *pMemBucket, int32_t count, double fraction)
break;
};
case TSDB_DATA_TYPE_FLOAT: {
td = *(float *)thisVal;
nd = *(float *)nextVal;
//td = *(float *)thisVal;
//nd = *(float *)nextVal;
td = GET_FLOAT_VAL(thisVal);
nd = GET_FLOAT_VAL(nextVal);
break;
}
case TSDB_DATA_TYPE_DOUBLE: {
td = *(double *)thisVal;
nd = *(double *)nextVal;
//td = *(double *)thisVal;
td = GET_DOUBLE_VAL(thisVal);
//nd = *(double *)nextVal;
nd = GET_DOUBLE_VAL(nextVal);
break;
}
case TSDB_DATA_TYPE_BIGINT: {
......@@ -1831,15 +1850,17 @@ double getPercentileImpl(tMemBucket *pMemBucket, int32_t count, double fraction)
break;
};
case TSDB_DATA_TYPE_FLOAT: {
finalResult = *(float *)thisVal;
//finalResult = *(float *)thisVal;
finalResult = GET_FLOAT_VAL(thisVal);
break;
}
case TSDB_DATA_TYPE_DOUBLE: {
finalResult = *(double *)thisVal;
//finalResult = *(double *)thisVal;
finalResult = GET_DOUBLE_VAL(thisVal);
break;
}
case TSDB_DATA_TYPE_BIGINT: {
finalResult = (double)*(int64_t *)thisVal;
finalResult = (double)(*(int64_t *)thisVal);
break;
}
}
......