未验证 提交 3fedb004 编写于 作者: H Hongze Cheng 提交者: GitHub

Merge pull request #11529 from taosdata/feature/vnode_refact

Feature/vnode refact
......@@ -91,9 +91,9 @@ static void vmGenerateVnodeCfg(SCreateVnodeReq *pCreate, SVnodeCfg *pCfg) {
pCfg->keep = pCreate->daysToKeep0;
pCfg->streamMode = pCreate->streamMode;
pCfg->isWeak = true;
pCfg->tsdbCfg.keep = pCreate->daysToKeep0;
pCfg->tsdbCfg.keep1 = pCreate->daysToKeep2;
pCfg->tsdbCfg.keep2 = pCreate->daysToKeep0;
pCfg->tsdbCfg.keep0 = pCreate->daysToKeep2;
pCfg->tsdbCfg.keep1 = pCreate->daysToKeep0;
pCfg->tsdbCfg.lruCacheSize = pCreate->cacheBlockSize;
pCfg->tsdbCfg.retentions = pCreate->pRetensions;
pCfg->metaCfg.lruSize = pCreate->cacheBlockSize;
......@@ -121,6 +121,8 @@ static void vmGenerateWrapperCfg(SVnodesMgmt *pMgmt, SCreateVnodeReq *pCreate, S
int32_t vmProcessCreateVnodeReq(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) {
SRpcMsg *pReq = &pMsg->rpcMsg;
SCreateVnodeReq createReq = {0};
char path[TSDB_FILENAME_LEN];
if (tDeserializeSCreateVnodeReq(pReq->pCont, pReq->contLen, &createReq) != 0) {
terrno = TSDB_CODE_INVALID_MSG;
return -1;
......@@ -143,6 +145,14 @@ int32_t vmProcessCreateVnodeReq(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) {
return -1;
}
// create vnode
snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, vnodeCfg.vgId);
if (vnodeCreate(path, &vnodeCfg, pMgmt->pTfs) < 0) {
tFreeSCreateVnodeReq(&createReq);
dError("vgId:%d, failed to create vnode since %s", createReq.vgId, terrstr());
return -1;
}
SMsgCb msgCb = pMgmt->pDnode->data.msgCb;
msgCb.pWrapper = pMgmt->pWrapper;
msgCb.queueFps[QUERY_QUEUE] = vmPutMsgToQueryQueue;
......
......@@ -124,12 +124,12 @@ struct STsdbCfg {
int8_t precision;
int8_t update;
int8_t compression;
int32_t daysPerFile;
int32_t minRowsPerFileBlock;
int32_t maxRowsPerFileBlock;
int32_t keep;
int32_t keep1;
int32_t days;
int32_t minRows;
int32_t maxRows;
int32_t keep2;
int32_t keep0;
int32_t keep1;
uint64_t lruCacheSize;
SArray *retentions;
};
......
......@@ -43,6 +43,7 @@ int vnodeGetTableMeta(SVnode* pVnode, SRpcMsg* pMsg);
// vnodeCommit ====================
int vnodeSaveInfo(const char* dir, const SVnodeInfo* pCfg);
int vnodeCommitInfo(const char* dir, const SVnodeInfo* pInfo);
int vnodeLoadInfo(const char* dir, SVnodeInfo* pInfo);
#if 1
// SVBufPool
......
......@@ -27,6 +27,7 @@
#include "tdbInt.h"
#include "tfs.h"
#include "tglobal.h"
#include "tjson.h"
#include "tlist.h"
#include "tlockfree.h"
#include "tlosertree.h"
......
......@@ -55,7 +55,7 @@ typedef struct {
#define TSDB_COMMIT_BUF(ch) TSDB_READ_BUF(&((ch)->readh))
#define TSDB_COMMIT_COMP_BUF(ch) TSDB_READ_COMP_BUF(&((ch)->readh))
#define TSDB_COMMIT_EXBUF(ch) TSDB_READ_EXBUF(&((ch)->readh))
#define TSDB_COMMIT_DEFAULT_ROWS(ch) TSDB_DEFAULT_BLOCK_ROWS(TSDB_COMMIT_REPO(ch)->config.maxRowsPerFileBlock)
#define TSDB_COMMIT_DEFAULT_ROWS(ch) TSDB_DEFAULT_BLOCK_ROWS(TSDB_COMMIT_REPO(ch)->config.maxRows)
#define TSDB_COMMIT_TXN_VERSION(ch) FS_TXN_VERSION(REPO_FS(TSDB_COMMIT_REPO(ch)))
static void tsdbStartCommit(STsdb *pRepo);
......@@ -217,14 +217,14 @@ void tsdbGetRtnSnap(STsdb *pRepo, SRtn *pRtn) {
TSKEY minKey, midKey, maxKey, now;
now = taosGetTimestamp(pCfg->precision);
minKey = now - pCfg->keep * tsTickPerDay[pCfg->precision];
midKey = now - pCfg->keep2 * tsTickPerDay[pCfg->precision];
maxKey = now - pCfg->keep1 * tsTickPerDay[pCfg->precision];
minKey = now - pCfg->keep2 * tsTickPerDay[pCfg->precision];
midKey = now - pCfg->keep1 * tsTickPerDay[pCfg->precision];
maxKey = now - pCfg->keep0 * tsTickPerDay[pCfg->precision];
pRtn->minKey = minKey;
pRtn->minFid = (int)(TSDB_KEY_FID(minKey, pCfg->daysPerFile, pCfg->precision));
pRtn->midFid = (int)(TSDB_KEY_FID(midKey, pCfg->daysPerFile, pCfg->precision));
pRtn->maxFid = (int)(TSDB_KEY_FID(maxKey, pCfg->daysPerFile, pCfg->precision));
pRtn->minFid = (int)(TSDB_KEY_FID(minKey, pCfg->days, pCfg->precision));
pRtn->midFid = (int)(TSDB_KEY_FID(midKey, pCfg->days, pCfg->precision));
pRtn->maxFid = (int)(TSDB_KEY_FID(maxKey, pCfg->days, pCfg->precision));
tsdbDebug("vgId:%d now:%" PRId64 " minKey:%" PRId64 " minFid:%d, midFid:%d, maxFid:%d", REPO_ID(pRepo), now, minKey,
pRtn->minFid, pRtn->midFid, pRtn->maxFid);
}
......@@ -286,7 +286,7 @@ static int tsdbInitCommitH(SCommitH *pCommith, STsdb *pRepo) {
return -1;
}
pCommith->pDataCols = tdNewDataCols(0, pCfg->maxRowsPerFileBlock);
pCommith->pDataCols = tdNewDataCols(0, pCfg->maxRows);
if (pCommith->pDataCols == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
tsdbDestroyCommitH(pCommith);
......@@ -319,7 +319,7 @@ static int tsdbNextCommitFid(SCommitH *pCommith) {
if (nextKey == TSDB_DATA_TIMESTAMP_NULL) {
continue;
} else {
int tfid = (int)(TSDB_KEY_FID(nextKey, pCfg->daysPerFile, pCfg->precision));
int tfid = (int)(TSDB_KEY_FID(nextKey, pCfg->days, pCfg->precision));
if (fid == TSDB_IVLD_FID || fid > tfid) {
fid = tfid;
}
......@@ -346,7 +346,7 @@ static int tsdbCommitToFile(SCommitH *pCommith, SDFileSet *pSet, int fid) {
ASSERT(pSet == NULL || pSet->fid == fid);
tsdbResetCommitFile(pCommith);
tsdbGetFidKeyRange(pCfg->daysPerFile, pCfg->precision, fid, &(pCommith->minKey), &(pCommith->maxKey));
tsdbGetFidKeyRange(pCfg->days, pCfg->precision, fid, &(pCommith->minKey), &(pCommith->maxKey));
// Set and open files
if (tsdbSetAndOpenCommitFile(pCommith, pSet, fid) < 0) {
......@@ -1210,8 +1210,8 @@ int tsdbWriteBlockImpl(STsdb *pRepo, STable *pTable, SDFile *pDFile, SDFile *pDF
int64_t offset = 0, offsetAggr = 0;
int rowsToWrite = pDataCols->numOfRows;
ASSERT(rowsToWrite > 0 && rowsToWrite <= pCfg->maxRowsPerFileBlock);
ASSERT((!isLast) || rowsToWrite < pCfg->minRowsPerFileBlock);
ASSERT(rowsToWrite > 0 && rowsToWrite <= pCfg->maxRows);
ASSERT((!isLast) || rowsToWrite < pCfg->minRows);
// Make buffer space
if (tsdbMakeRoom(ppBuf, tsdbBlockStatisSize(pDataCols->numOfCols, SBlockVerLatest)) < 0) {
......@@ -1460,7 +1460,7 @@ static int tsdbCommitMemData(SCommitH *pCommith, SCommitIter *pIter, TSKEY keyLi
if (pCommith->pDataCols->numOfRows <= 0) break;
if (toData || pCommith->pDataCols->numOfRows >= pCfg->minRowsPerFileBlock) {
if (toData || pCommith->pDataCols->numOfRows >= pCfg->minRows) {
pDFile = TSDB_COMMIT_DATA_FILE(pCommith);
isLast = false;
} else {
......@@ -1619,7 +1619,7 @@ static int tsdbMergeBlockData(SCommitH *pCommith, SCommitIter *pIter, SDataCols
if (pCommith->pDataCols->numOfRows == 0) break;
if (isLastOneBlock) {
if (pCommith->pDataCols->numOfRows < pCfg->minRowsPerFileBlock) {
if (pCommith->pDataCols->numOfRows < pCfg->minRows) {
pDFile = TSDB_COMMIT_LAST_FILE(pCommith);
isLast = true;
} else {
......@@ -1667,7 +1667,8 @@ static void tsdbLoadAndMergeFromCache(SDataCols *pDataCols, int *iter, SCommitIt
if (tdGetColDataOfRow(&sVal, pDataCols->cols + i, *iter, pDataCols->bitmapMode) < 0) {
TASSERT(0);
}
tdAppendValToDataCol(pTarget->cols + i, sVal.valType, sVal.val, pTarget->numOfRows, pTarget->maxPoints, pTarget->bitmapMode);
tdAppendValToDataCol(pTarget->cols + i, sVal.valType, sVal.val, pTarget->numOfRows, pTarget->maxPoints,
pTarget->bitmapMode);
}
++pTarget->numOfRows;
......@@ -1774,11 +1775,11 @@ static bool tsdbCanAddSubBlock(SCommitH *pCommith, SBlock *pBlock, SMergeInfo *p
ASSERT(mergeRows > 0);
if (pBlock->numOfSubBlocks < TSDB_MAX_SUBBLOCKS && pInfo->nOperations <= pCfg->maxRowsPerFileBlock) {
if (pBlock->numOfSubBlocks < TSDB_MAX_SUBBLOCKS && pInfo->nOperations <= pCfg->maxRows) {
if (pBlock->last) {
if (pCommith->isLFileSame && mergeRows < pCfg->minRowsPerFileBlock) return true;
if (pCommith->isLFileSame && mergeRows < pCfg->minRows) return true;
} else {
if (pCommith->isDFileSame && mergeRows <= pCfg->maxRowsPerFileBlock) return true;
if (pCommith->isDFileSame && mergeRows <= pCfg->maxRows) return true;
}
}
......
......@@ -190,8 +190,8 @@ static int tsdbAddDFileSetToStatus(SFSStatus *pStatus, const SDFileSet *pSet) {
// ================== STsdbFS
STsdbFS *tsdbNewFS(const STsdbCfg *pCfg) {
int keep = pCfg->keep;
int days = pCfg->daysPerFile;
int keep = pCfg->keep2;
int days = pCfg->days;
int maxFSet = TSDB_MAX_FSETS(keep, days);
STsdbFS *pfs;
......
......@@ -19,9 +19,9 @@ static int tsdbScanAndConvertSubmitMsg(STsdb *pTsdb, SSubmitReq *pMsg);
static int tsdbMemTableInsertTbData(STsdb *pRepo, SSubmitBlk *pBlock, int32_t *pAffectedRows);
static STbData *tsdbNewTbData(tb_uid_t uid);
static void tsdbFreeTbData(STbData *pTbData);
static char * tsdbGetTsTupleKey(const void *data);
static char *tsdbGetTsTupleKey(const void *data);
static int tsdbTbDataComp(const void *arg1, const void *arg2);
static char * tsdbTbDataGetUid(const void *arg);
static char *tsdbTbDataGetUid(const void *arg);
static int tsdbAppendTableRowToCols(STable *pTable, SDataCols *pCols, STSchema **ppSchema, STSRow *row);
STsdbMemTable *tsdbNewMemTable(STsdb *pTsdb) {
......@@ -74,7 +74,7 @@ void tsdbFreeMemTable(STsdb *pTsdb, STsdbMemTable *pMemTable) {
}
int tsdbMemTableInsert(STsdb *pTsdb, STsdbMemTable *pMemTable, SSubmitReq *pMsg, SSubmitRsp *pRsp) {
SSubmitBlk * pBlock = NULL;
SSubmitBlk *pBlock = NULL;
SSubmitMsgIter msgIter = {0};
int32_t affectedrows = 0, numOfRows = 0;
......@@ -119,12 +119,12 @@ int tsdbLoadDataFromCache(STable *pTable, SSkipListIterator *pIter, TSKEY maxKey
TKEY *filterKeys, int nFilterKeys, bool keepDup, SMergeInfo *pMergeInfo) {
ASSERT(maxRowsToRead > 0 && nFilterKeys >= 0);
if (pIter == NULL) return 0;
STSchema * pSchema = NULL;
STSchema *pSchema = NULL;
TSKEY rowKey = 0;
TSKEY fKey = 0;
bool isRowDel = false;
int filterIter = 0;
STSRow * row = NULL;
STSRow *row = NULL;
SMergeInfo mInfo;
if (pMergeInfo == NULL) pMergeInfo = &mInfo;
......@@ -259,12 +259,12 @@ static int tsdbScanAndConvertSubmitMsg(STsdb *pTsdb, SSubmitReq *pMsg) {
ASSERT(pMsg != NULL);
// STsdbMeta * pMeta = pTsdb->tsdbMeta;
SSubmitMsgIter msgIter = {0};
SSubmitBlk * pBlock = NULL;
SSubmitBlk *pBlock = NULL;
SSubmitBlkIter blkIter = {0};
STSRow * row = NULL;
STSRow *row = NULL;
TSKEY now = taosGetTimestamp(pTsdb->config.precision);
TSKEY minKey = now - tsTickPerDay[pTsdb->config.precision] * pTsdb->config.keep;
TSKEY maxKey = now + tsTickPerDay[pTsdb->config.precision] * pTsdb->config.daysPerFile;
TSKEY minKey = now - tsTickPerDay[pTsdb->config.precision] * pTsdb->config.keep2;
TSKEY maxKey = now + tsTickPerDay[pTsdb->config.precision] * pTsdb->config.days;
terrno = TSDB_CODE_SUCCESS;
pMsg->length = htonl(pMsg->length);
......@@ -332,9 +332,9 @@ static int tsdbMemTableInsertTbData(STsdb *pTsdb, SSubmitBlk *pBlock, int32_t *p
// STable *pTable = NULL;
SSubmitBlkIter blkIter = {0};
STsdbMemTable *pMemTable = pTsdb->mem;
void * tptr;
STbData * pTbData;
STSRow * row;
void *tptr;
STbData *pTbData;
STSRow *row;
TSKEY keyMin;
TSKEY keyMax;
......@@ -504,7 +504,7 @@ int tsdbInsertDataToMemTable(STsdbMemTable *pMemTable, SSubmitReq *pMsg) {
#include "tskiplist.h"
#define TSDB_DATA_SKIPLIST_LEVEL 5
#define TSDB_MAX_INSERT_BATCH 512
#define TSDB_MAX_INSERT_BATCH 512
typedef struct {
int32_t totalLen;
......
......@@ -17,12 +17,12 @@
const STsdbCfg defautlTsdbOptions = {.precision = 0,
.lruCacheSize = 0,
.daysPerFile = 10,
.minRowsPerFileBlock = 100,
.maxRowsPerFileBlock = 4096,
.keep = 3650,
.keep1 = 3650,
.days = 10,
.minRows = 100,
.maxRows = 4096,
.keep2 = 3650,
.keep0 = 3650,
.keep1 = 3650,
.update = 0,
.compression = TWO_STAGE_COMP};
......
......@@ -314,7 +314,7 @@ static int64_t getEarliestValidTimestamp(STsdb* pTsdb) {
STsdbCfg* pCfg = &pTsdb->config;
int64_t now = taosGetTimestamp(pCfg->precision);
return now - (tsTickPerDay[pCfg->precision] * pCfg->keep) + 1; // needs to add one tick
return now - (tsTickPerDay[pCfg->precision] * pCfg->keep2) + 1; // needs to add one tick
}
static void setQueryTimewindow(STsdbReadHandle* pTsdbReadHandle, STsdbQueryCond* pCond) {
......@@ -404,7 +404,7 @@ static STsdbReadHandle* tsdbQueryTablesImpl(STsdb* tsdb, STsdbQueryCond* pCond,
pReadHandle->defaultLoadColumn = getDefaultLoadColumns(pReadHandle, true);
}
pReadHandle->pDataCols = tdNewDataCols(1000, pReadHandle->pTsdb->config.maxRowsPerFileBlock);
pReadHandle->pDataCols = tdNewDataCols(1000, pReadHandle->pTsdb->config.maxRows);
if (pReadHandle->pDataCols == NULL) {
tsdbError("%p failed to malloc buf for pDataCols, %s", pReadHandle, pReadHandle->idStr);
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
......@@ -2199,7 +2199,7 @@ static int32_t getFirstFileDataBlock(STsdbReadHandle* pTsdbReadHandle, bool* exi
break;
}
tsdbGetFidKeyRange(pCfg->daysPerFile, pCfg->precision, pTsdbReadHandle->pFileGroup->fid, &win.skey, &win.ekey);
tsdbGetFidKeyRange(pCfg->days, pCfg->precision, pTsdbReadHandle->pFileGroup->fid, &win.skey, &win.ekey);
// current file are not overlapped with query time window, ignore remain files
if ((ASCENDING_TRAVERSE(pTsdbReadHandle->order) && win.skey > pTsdbReadHandle->window.ekey) ||
......@@ -2295,7 +2295,7 @@ int32_t tsdbGetFileBlocksDistInfo(tsdbReaderT* queryHandle, STableBlockDistInfo*
// find the start data block in file
pTsdbReadHandle->locateStart = true;
STsdbCfg* pCfg = &pTsdbReadHandle->pTsdb->config;
int32_t fid = getFileIdFromKey(pTsdbReadHandle->window.skey, pCfg->daysPerFile, pCfg->precision);
int32_t fid = getFileIdFromKey(pTsdbReadHandle->window.skey, pCfg->days, pCfg->precision);
tsdbRLockFS(pFileHandle);
tsdbFSIterInit(&pTsdbReadHandle->fileIter, pFileHandle, pTsdbReadHandle->order);
......@@ -2321,7 +2321,7 @@ int32_t tsdbGetFileBlocksDistInfo(tsdbReaderT* queryHandle, STableBlockDistInfo*
break;
}
tsdbGetFidKeyRange(pCfg->daysPerFile, pCfg->precision, pTsdbReadHandle->pFileGroup->fid, &win.skey, &win.ekey);
tsdbGetFidKeyRange(pCfg->days, pCfg->precision, pTsdbReadHandle->pFileGroup->fid, &win.skey, &win.ekey);
// current file are not overlapped with query time window, ignore remain files
if ((ascTraverse && win.skey > pTsdbReadHandle->window.ekey) ||
......@@ -2396,7 +2396,7 @@ static int32_t getDataBlocksInFiles(STsdbReadHandle* pTsdbReadHandle, bool* exis
if (!pTsdbReadHandle->locateStart) {
pTsdbReadHandle->locateStart = true;
STsdbCfg* pCfg = &pTsdbReadHandle->pTsdb->config;
int32_t fid = getFileIdFromKey(pTsdbReadHandle->window.skey, pCfg->daysPerFile, pCfg->precision);
int32_t fid = getFileIdFromKey(pTsdbReadHandle->window.skey, pCfg->days, pCfg->precision);
tsdbRLockFS(pFileHandle);
tsdbFSIterInit(&pTsdbReadHandle->fileIter, pFileHandle, pTsdbReadHandle->order);
......
......@@ -43,14 +43,14 @@ int tsdbInitReadH(SReadH *pReadh, STsdb *pRepo) {
return -1;
}
pReadh->pDCols[0] = tdNewDataCols(0, pCfg->maxRowsPerFileBlock);
pReadh->pDCols[0] = tdNewDataCols(0, pCfg->maxRows);
if (pReadh->pDCols[0] == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
tsdbDestroyReadH(pReadh);
return -1;
}
pReadh->pDCols[1] = tdNewDataCols(0, pCfg->maxRowsPerFileBlock);
pReadh->pDCols[1] = tdNewDataCols(0, pCfg->maxRows);
if (pReadh->pDCols[1] == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
tsdbDestroyReadH(pReadh);
......@@ -276,8 +276,8 @@ int tsdbLoadBlockData(SReadH *pReadh, SBlock *pBlock, SBlockInfo *pBlkInfo) {
return 0;
}
int tsdbLoadBlockDataCols(SReadH *pReadh, SBlock *pBlock, SBlockInfo *pBlkInfo, const int16_t *colIds,
int numOfColsIds, bool mergeBitmap) {
int tsdbLoadBlockDataCols(SReadH *pReadh, SBlock *pBlock, SBlockInfo *pBlkInfo, const int16_t *colIds, int numOfColsIds,
bool mergeBitmap) {
ASSERT(pBlock->numOfSubBlocks > 0);
int8_t update = pReadh->pRepo->config.update;
......@@ -513,7 +513,7 @@ static int tsdbLoadBlockDataImpl(SReadH *pReadh, SBlock *pBlock, SDataCols *pDat
tdResetDataCols(pDataCols);
if(tsdbIsSupBlock(pBlock)) {
if (tsdbIsSupBlock(pBlock)) {
tdDataColsSetBitmapI(pDataCols);
}
......@@ -710,7 +710,7 @@ static int tsdbLoadBlockDataColsImpl(SReadH *pReadh, SBlock *pBlock, SDataCols *
tdResetDataCols(pDataCols);
if(tsdbIsSupBlock(pBlock)) {
if (tsdbIsSupBlock(pBlock)) {
tdDataColsSetBitmapI(pDataCols);
}
......@@ -836,7 +836,7 @@ static int tsdbLoadColData(SReadH *pReadh, SDFile *pDFile, SBlock *pBlock, SBloc
}
if (tsdbCheckAndDecodeColumnData(pDataCol, pReadh->pBuf, pBlockCol->len, pBlockCol->blen, pBlock->algorithm,
pBlock->numOfRows, tLenBitmap, pCfg->maxRowsPerFileBlock, pReadh->pCBuf,
pBlock->numOfRows, tLenBitmap, pCfg->maxRows, pReadh->pCBuf,
(int32_t)taosTSizeof(pReadh->pCBuf)) < 0) {
tsdbError("vgId:%d file %s is broken at column %d offset %" PRId64, REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pDFile),
pBlockCol->colId, offset);
......
......@@ -106,7 +106,8 @@ struct SSmaStat {
// expired window
static int32_t tsdbUpdateExpiredWindowImpl(STsdb *pTsdb, SSubmitReq *pMsg, int64_t version);
static int32_t tsdbSetExpiredWindow(STsdb *pTsdb, SHashObj *pItemsHash, int64_t indexUid, int64_t winSKey, int64_t version);
static int32_t tsdbSetExpiredWindow(STsdb *pTsdb, SHashObj *pItemsHash, int64_t indexUid, int64_t winSKey,
int64_t version);
static int32_t tsdbInitSmaStat(SSmaStat **pSmaStat);
static void *tsdbFreeSmaStatItem(SSmaStatItem *pSmaStatItem);
static int32_t tsdbDestroySmaState(SSmaStat *pSmaStat);
......@@ -197,7 +198,7 @@ static SPoolMem *openPool() {
static void clearPool(SPoolMem *pPool) {
if (!pPool) return;
SPoolMem *pMem;
do {
......@@ -544,7 +545,8 @@ static int32_t tsdbCheckAndInitSmaEnv(STsdb *pTsdb, int8_t smaType) {
return TSDB_CODE_SUCCESS;
};
static int32_t tsdbSetExpiredWindow(STsdb *pTsdb, SHashObj *pItemsHash, int64_t indexUid, int64_t winSKey, int64_t version) {
static int32_t tsdbSetExpiredWindow(STsdb *pTsdb, SHashObj *pItemsHash, int64_t indexUid, int64_t winSKey,
int64_t version) {
SSmaStatItem *pItem = taosHashGet(pItemsHash, &indexUid, sizeof(indexUid));
if (pItem == NULL) {
// TODO: use TSDB_SMA_STAT_EXPIRED and update by stream computing later
......@@ -946,7 +948,7 @@ static int32_t tsdbSetTSmaDataFile(STSmaWriteH *pSmaH, int64_t indexUid, int32_t
*/
static int32_t tsdbGetTSmaDays(STsdb *pTsdb, int64_t interval, int32_t storageLevel) {
STsdbCfg *pCfg = REPO_CFG(pTsdb);
int32_t daysPerFile = pCfg->daysPerFile;
int32_t daysPerFile = pCfg->days;
if (storageLevel == SMA_STORAGE_LEVEL_TSDB) {
int32_t days = SMA_STORAGE_TSDB_TIMES * (interval / tsTickPerDay[pCfg->precision]);
......
......@@ -18,8 +18,8 @@
#define VND_INFO_FNAME "vnode.json"
#define VND_INFO_FNAME_TMP "vnode_tmp.json"
static int vnodeEncodeInfo(const SVnodeInfo *pInfo, uint8_t **ppData, int *len);
static int vnodeDecodeInfo(uint8_t *pData, int len, SVnodeInfo *pInfo);
static int vnodeEncodeInfo(const SVnodeInfo *pInfo, char **ppData);
static int vnodeDecodeInfo(uint8_t *pData, SVnodeInfo *pInfo);
static int vnodeStartCommit(SVnode *pVnode);
static int vnodeEndCommit(SVnode *pVnode);
static int vnodeCommit(void *arg);
......@@ -28,16 +28,14 @@ static void vnodeWaitCommit(SVnode *pVnode);
int vnodeSaveInfo(const char *dir, const SVnodeInfo *pInfo) {
char fname[TSDB_FILENAME_LEN];
TdFilePtr pFile;
uint8_t *data;
int len;
char *data;
snprintf(fname, TSDB_FILENAME_LEN, "%s%s%s", dir, TD_DIRSEP, VND_INFO_FNAME_TMP);
// encode info
data = NULL;
len = 0;
if (vnodeEncodeInfo(pInfo, &data, &len) < 0) {
if (vnodeEncodeInfo(pInfo, &data) < 0) {
return -1;
}
......@@ -48,7 +46,7 @@ int vnodeSaveInfo(const char *dir, const SVnodeInfo *pInfo) {
return -1;
}
if (taosWriteFile(pFile, data, len) < 0) {
if (taosWriteFile(pFile, data, strlen(data)) < 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
......@@ -90,9 +88,53 @@ int vnodeCommitInfo(const char *dir, const SVnodeInfo *pInfo) {
return 0;
}
int vnodeLoadInfo(const char *dir) {
// TODO
int vnodeLoadInfo(const char *dir, SVnodeInfo *pInfo) {
char fname[TSDB_FILENAME_LEN];
TdFilePtr pFile = NULL;
char *pData = NULL;
int64_t size;
snprintf(fname, TSDB_FILENAME_LEN, "%s%s%s", dir, TD_DIRSEP, VND_INFO_FNAME);
// read info
pFile = taosOpenFile(fname, TD_FILE_READ);
if (pFile == NULL) {
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
if (taosFStatFile(pFile, &size, NULL) < 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
pData = taosMemoryMalloc(size);
if (pData == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
if (taosReadFile(pFile, pData, size) < 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
taosCloseFile(&pFile);
// decode info
if (vnodeDecodeInfo(pData, pInfo) < 0) {
taosMemoryFree(pData);
return -1;
}
taosMemoryFree(pData);
return 0;
_err:
taosCloseFile(&pFile);
taosMemoryFree(pData);
return -1;
}
int vnodeAsyncCommit(SVnode *pVnode) {
......@@ -137,12 +179,131 @@ static int vnodeEndCommit(SVnode *pVnode) {
static FORCE_INLINE void vnodeWaitCommit(SVnode *pVnode) { tsem_wait(&pVnode->canCommit); }
static int vnodeEncodeInfo(const SVnodeInfo *pInfo, uint8_t **ppData, int *len) {
// TODO
static int vnodeEncodeConfig(const void *pObj, SJson *pJson) {
const SVnodeCfg *pCfg = (SVnodeCfg *)pObj;
if (tjsonAddIntegerToObject(pJson, "vgId", pCfg->vgId) < 0) return -1;
if (tjsonAddIntegerToObject(pJson, "dbId", pCfg->dbId) < 0) return -1;
if (tjsonAddIntegerToObject(pJson, "wsize", pCfg->wsize) < 0) return -1;
if (tjsonAddIntegerToObject(pJson, "ssize", pCfg->ssize) < 0) return -1;
if (tjsonAddIntegerToObject(pJson, "lsize", pCfg->lsize) < 0) return -1;
if (tjsonAddIntegerToObject(pJson, "isHeap", pCfg->isHeapAllocator) < 0) return -1;
if (tjsonAddIntegerToObject(pJson, "ttl", pCfg->ttl) < 0) return -1;
if (tjsonAddIntegerToObject(pJson, "keep", pCfg->keep) < 0) return -1;
if (tjsonAddIntegerToObject(pJson, "streamMode", pCfg->streamMode) < 0) return -1;
if (tjsonAddIntegerToObject(pJson, "isWeak", pCfg->isWeak) < 0) return -1;
if (tjsonAddIntegerToObject(pJson, "precision", pCfg->tsdbCfg.precision) < 0) return -1;
if (tjsonAddIntegerToObject(pJson, "update", pCfg->tsdbCfg.update) < 0) return -1;
if (tjsonAddIntegerToObject(pJson, "compression", pCfg->tsdbCfg.compression) < 0) return -1;
if (tjsonAddIntegerToObject(pJson, "daysPerFile", pCfg->tsdbCfg.days) < 0) return -1;
if (tjsonAddIntegerToObject(pJson, "minRows", pCfg->tsdbCfg.minRows) < 0) return -1;
if (tjsonAddIntegerToObject(pJson, "maxRows", pCfg->tsdbCfg.maxRows) < 0) return -1;
if (tjsonAddIntegerToObject(pJson, "keep0", pCfg->tsdbCfg.keep0) < 0) return -1;
if (tjsonAddIntegerToObject(pJson, "keep1", pCfg->tsdbCfg.keep1) < 0) return -1;
if (tjsonAddIntegerToObject(pJson, "keep2", pCfg->tsdbCfg.keep2) < 0) return -1;
if (tjsonAddIntegerToObject(pJson, "lruCacheSize", pCfg->tsdbCfg.lruCacheSize) < 0) return -1;
return 0;
}
static int vnodeDecodeInfo(uint8_t *pData, int len, SVnodeInfo *pInfo) {
// TODO
static int vnodeDecodeConfig(const SJson *pJson, void *pObj) {
SVnodeCfg *pCfg = (SVnodeCfg *)pObj;
if (tjsonGetNumberValue(pJson, "vgId", pCfg->vgId) < 0) return -1;
if (tjsonGetNumberValue(pJson, "dbId", pCfg->dbId) < 0) return -1;
if (tjsonGetNumberValue(pJson, "wsize", pCfg->wsize) < 0) return -1;
if (tjsonGetNumberValue(pJson, "ssize", pCfg->ssize) < 0) return -1;
if (tjsonGetNumberValue(pJson, "lsize", pCfg->lsize) < 0) return -1;
if (tjsonGetNumberValue(pJson, "isHeap", pCfg->isHeapAllocator) < 0) return -1;
if (tjsonGetNumberValue(pJson, "ttl", pCfg->ttl) < 0) return -1;
if (tjsonGetNumberValue(pJson, "keep", pCfg->keep) < 0) return -1;
if (tjsonGetNumberValue(pJson, "streamMode", pCfg->streamMode) < 0) return -1;
if (tjsonGetNumberValue(pJson, "isWeak", pCfg->isWeak) < 0) return -1;
if (tjsonGetNumberValue(pJson, "precision", pCfg->tsdbCfg.precision) < 0) return -1;
if (tjsonGetNumberValue(pJson, "update", pCfg->tsdbCfg.update) < 0) return -1;
if (tjsonGetNumberValue(pJson, "compression", pCfg->tsdbCfg.compression) < 0) return -1;
if (tjsonGetNumberValue(pJson, "daysPerFile", pCfg->tsdbCfg.days) < 0) return -1;
if (tjsonGetNumberValue(pJson, "minRows", pCfg->tsdbCfg.minRows) < 0) return -1;
if (tjsonGetNumberValue(pJson, "maxRows", pCfg->tsdbCfg.maxRows) < 0) return -1;
if (tjsonGetNumberValue(pJson, "keep0", pCfg->tsdbCfg.keep0) < 0) return -1;
if (tjsonGetNumberValue(pJson, "keep1", pCfg->tsdbCfg.keep1) < 0) return -1;
if (tjsonGetNumberValue(pJson, "keep2", pCfg->tsdbCfg.keep2) < 0) return -1;
if (tjsonGetNumberValue(pJson, "lruCacheSize", pCfg->tsdbCfg.lruCacheSize) < 0) return -1;
return 0;
}
static int vnodeEncodeState(const void *pObj, SJson *pJson) {
const SVState *pState = (SVState *)pObj;
if (tjsonAddIntegerToObject(pJson, "commit version", pState->committed) < 0) return -1;
return 0;
}
static int vnodeDecodeState(const SJson *pJson, void *pObj) {
SVState *pState = (SVState *)pObj;
if (tjsonGetNumberValue(pJson, "commit version", pState->committed) < 0) return -1;
return 0;
}
static int vnodeEncodeInfo(const SVnodeInfo *pInfo, char **ppData) {
SJson *pJson;
char *pData;
*ppData = NULL;
pJson = tjsonCreateObject();
if (pJson == NULL) {
return -1;
}
if (tjsonAddObject(pJson, "config", vnodeEncodeConfig, (void *)&pInfo->config) < 0) {
goto _err;
}
if (tjsonAddObject(pJson, "state", vnodeEncodeState, (void *)&pInfo->state) < 0) {
goto _err;
}
pData = tjsonToString(pJson);
if (pData == NULL) {
goto _err;
}
tjsonDelete(pJson);
*ppData = pData;
return 0;
_err:
tjsonDelete(pJson);
return -1;
}
static int vnodeDecodeInfo(uint8_t *pData, SVnodeInfo *pInfo) {
SJson *pJson = NULL;
pJson = tjsonCreateObject();
if (pJson == NULL) {
return -1;
}
if (tjsonToObject(pJson, "config", vnodeDecodeConfig, (void *)&pInfo->config) < 0) {
goto _err;
}
if (tjsonToObject(pJson, "state", vnodeDecodeState, (void *)&pInfo->state) < 0) {
goto _err;
}
tjsonDelete(pJson);
return 0;
_err:
tjsonDelete(pJson);
return -1;
}
......@@ -46,6 +46,8 @@ int vnodeCreate(const char *path, SVnodeCfg *pCfg, STfs *pTfs) {
return -1;
}
vInfo("vgId: %d vnode is created", pCfg->vgId);
return 0;
}
......
......@@ -14,12 +14,12 @@
*/
#include "catalog.h"
#include "command.h"
#include "query.h"
#include "schedulerInt.h"
#include "tmsg.h"
#include "tref.h"
#include "trpc.h"
#include "command.h"
SSchedulerMgmt schMgmt = {0};
......@@ -68,8 +68,8 @@ int32_t schInitTask(SSchJob *pJob, SSchTask *pTask, SSubplan *pPlan, SSchLevel *
}
int32_t schInitJob(SSchJob **pSchJob, SQueryPlan *pDag, void *transport, SArray *pNodeList, const char *sql,
int64_t startTs, bool syncSchedule) {
int32_t code = 0;
int64_t startTs, bool syncSchedule) {
int32_t code = 0;
SSchJob *pJob = taosMemoryCalloc(1, sizeof(SSchJob));
if (NULL == pJob) {
qError("QID:%" PRIx64 " calloc %d failed", pDag->queryId, (int32_t)sizeof(SSchJob));
......@@ -141,7 +141,6 @@ _return:
SCH_RET(code);
}
void schFreeRpcCtx(SRpcCtx *pCtx) {
if (NULL == pCtx) {
return;
......@@ -1047,12 +1046,12 @@ _return:
int32_t schProcessOnExplainDone(SSchJob *pJob, SSchTask *pTask, SRetrieveTableRsp *pRsp) {
SCH_TASK_DLOG("got explain rsp, rows:%d, complete:%d", htonl(pRsp->numOfRows), pRsp->completed);
atomic_store_32(&pJob->resNumOfRows, htonl(pRsp->numOfRows));
atomic_store_ptr(&pJob->resData, pRsp);
SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_SUCCEED);
schProcessOnDataFetched(pJob);
return TSDB_CODE_SUCCESS;
......@@ -1146,7 +1145,7 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch
if (NULL == msg) {
SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
}
if (!SCH_IS_EXPLAIN_JOB(pJob)) {
SCH_TASK_ELOG("invalid msg received for none explain query, msg type:%s", TMSG_INFO(msgType));
SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
......@@ -1180,13 +1179,13 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch
}
if (SCH_IS_EXPLAIN_JOB(pJob)) {
if (rsp->completed) {
if (rsp->completed) {
SRetrieveTableRsp *pRsp = NULL;
SCH_ERR_JRET(qExecExplainEnd(pJob->explainCtx, &pRsp));
if (pRsp) {
SCH_ERR_JRET(schProcessOnExplainDone(pJob, pTask, pRsp));
}
return TSDB_CODE_SUCCESS;
}
......@@ -1238,23 +1237,24 @@ _return:
}
int32_t schGetTaskFromTaskList(SHashObj *pTaskList, uint64_t taskId, SSchTask **pTask) {
int32_t s = taosHashGetSize(pTaskList);
if (s <= 0) {
return TSDB_CODE_SUCCESS;
}
SSchTask **task = taosHashGet(pTaskList, &taskId, sizeof(taskId));
if (NULL == task || NULL == (*task)) {
return TSDB_CODE_SUCCESS;
}
int32_t s = taosHashGetSize(pTaskList);
if (s <= 0) {
return TSDB_CODE_SUCCESS;
}
SSchTask **task = taosHashGet(pTaskList, &taskId, sizeof(taskId));
if (NULL == task || NULL == (*task)) {
return TSDB_CODE_SUCCESS;
}
*pTask = *task;
*pTask = *task;
return TSDB_CODE_SUCCESS;
return TSDB_CODE_SUCCESS;
}
int32_t schUpdateTaskExecNodeHandle(SSchTask *pTask, void *handle, int32_t rspCode) {
if (rspCode || NULL == pTask->execNodes || taosArrayGetSize(pTask->execNodes) > 1 || taosArrayGetSize(pTask->execNodes) <= 0) {
if (rspCode || NULL == pTask->execNodes || taosArrayGetSize(pTask->execNodes) > 1 ||
taosArrayGetSize(pTask->execNodes) <= 0) {
return TSDB_CODE_SUCCESS;
}
......@@ -1264,7 +1264,6 @@ int32_t schUpdateTaskExecNodeHandle(SSchTask *pTask, void *handle, int32_t rspCo
return TSDB_CODE_SUCCESS;
}
int32_t schHandleCallback(void *param, const SDataBuf *pMsg, int32_t msgType, int32_t rspCode) {
int32_t code = 0;
SSchTaskCallbackParam *pParam = (SSchTaskCallbackParam *)param;
......@@ -1282,13 +1281,15 @@ int32_t schHandleCallback(void *param, const SDataBuf *pMsg, int32_t msgType, in
if (TDMT_VND_EXPLAIN_RSP == msgType) {
schGetTaskFromTaskList(pJob->succTasks, pParam->taskId, &pTask);
} else {
SCH_JOB_ELOG("task not found in execTask list, refId:%" PRIx64 ", taskId:%" PRIx64, pParam->refId, pParam->taskId);
SCH_JOB_ELOG("task not found in execTask list, refId:%" PRIx64 ", taskId:%" PRIx64, pParam->refId,
pParam->taskId);
SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR);
}
}
if (NULL == pTask) {
SCH_JOB_ELOG("task not found in execList & succList, refId:%" PRIx64 ", taskId:%" PRIx64, pParam->refId, pParam->taskId);
SCH_JOB_ELOG("task not found in execList & succList, refId:%" PRIx64 ", taskId:%" PRIx64, pParam->refId,
pParam->taskId);
SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR);
}
......@@ -1444,7 +1445,7 @@ int32_t schGetCallbackFp(int32_t msgType, __async_send_cb_fn_t *fp) {
}
int32_t schGenerateTaskCallBackAHandle(SSchJob *pJob, SSchTask *pTask, int32_t msgType, SMsgSendInfo **pMsgSendInfo) {
int32_t code = 0;
int32_t code = 0;
SMsgSendInfo *msgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
if (NULL == msgSendInfo) {
SCH_TASK_ELOG("calloc %d failed", (int32_t)sizeof(SMsgSendInfo));
......@@ -1565,7 +1566,7 @@ _return:
}
int32_t schMakeQueryRpcCtx(SSchJob *pJob, SSchTask *pTask, SRpcCtx *pCtx) {
int32_t code = 0;
int32_t code = 0;
SMsgSendInfo *pReadyMsgSendInfo = NULL;
SMsgSendInfo *pExplainMsgSendInfo = NULL;
......@@ -1578,7 +1579,7 @@ int32_t schMakeQueryRpcCtx(SSchJob *pJob, SSchTask *pTask, SRpcCtx *pCtx) {
SCH_ERR_JRET(schGenerateTaskCallBackAHandle(pJob, pTask, TDMT_VND_RES_READY, &pReadyMsgSendInfo));
SCH_ERR_JRET(schGenerateTaskCallBackAHandle(pJob, pTask, TDMT_VND_EXPLAIN, &pExplainMsgSendInfo));
int32_t msgType = TDMT_VND_RES_READY_RSP;
int32_t msgType = TDMT_VND_RES_READY_RSP;
SRpcCtxVal ctxVal = {.val = pReadyMsgSendInfo, .clone = schCloneSMsgSendInfo, .freeFunc = schFreeRpcCtxVal};
if (taosHashPut(pCtx->args, &msgType, sizeof(msgType), &ctxVal, sizeof(ctxVal))) {
SCH_TASK_ELOG("taosHashPut msg %d to rpcCtx failed", msgType);
......@@ -1599,7 +1600,7 @@ int32_t schMakeQueryRpcCtx(SSchJob *pJob, SSchTask *pTask, SRpcCtx *pCtx) {
_return:
taosHashCleanup(pCtx->args);
if (pReadyMsgSendInfo) {
taosMemoryFreeClear(pReadyMsgSendInfo->param);
taosMemoryFreeClear(pReadyMsgSendInfo);
......@@ -1818,7 +1819,7 @@ _return:
taosMemoryFreeClear(pMsgSendInfo->param);
taosMemoryFreeClear(pMsgSendInfo);
}
SCH_RET(code);
}
......@@ -2319,7 +2320,7 @@ _return:
}
int32_t schExecStaticExplain(void *transport, SArray *pNodeList, SQueryPlan *pDag, int64_t *job, const char *sql,
bool syncSchedule) {
bool syncSchedule) {
qDebug("QID:0x%" PRIx64 " job started", pDag->queryId);
int32_t code = 0;
......@@ -2608,7 +2609,7 @@ int32_t schedulerFetchRows(int64_t job, void **pData) {
if (!(pJob->attr.explainMode == EXPLAIN_MODE_STATIC)) {
SCH_ERR_JRET(schFetchFromRemote(pJob));
tsem_wait(&pJob->rspSem);
}
}
} else {
SCH_JOB_ELOG("job status error for fetch, status:%s", jobTaskStatusStr(status));
SCH_ERR_JRET(TSDB_CODE_SCH_STATUS_ERROR);
......@@ -2670,11 +2671,11 @@ int32_t schedulerGetTasksStatus(int64_t job, SArray *pSub) {
for (int32_t i = pJob->levelNum - 1; i >= 0; --i) {
SSchLevel *pLevel = taosArrayGet(pJob->levels, i);
for (int32_t m = 0; m < pLevel->taskNum; ++m) {
SSchTask *pTask = taosArrayGet(pLevel->subTasks, m);
SSchTask *pTask = taosArrayGet(pLevel->subTasks, m);
SQuerySubDesc subDesc = {.tid = pTask->taskId, .status = pTask->status};
taosArrayPush(pSub, &subDesc);
}
}
......@@ -2682,7 +2683,6 @@ int32_t schedulerGetTasksStatus(int64_t job, SArray *pSub) {
return TSDB_CODE_SUCCESS;
}
int32_t scheduleCancelJob(int64_t job) {
SSchJob *pJob = schAcquireJob(job);
if (NULL == pJob) {
......@@ -2734,15 +2734,17 @@ void schedulerFreeTaskList(SArray *taskList) {
void schedulerDestroy(void) {
if (schMgmt.jobRef) {
SSchJob *pJob = taosIterateRef(schMgmt.jobRef, 0);
int64_t refId = 0;
while (pJob) {
refId = pJob->refId;
taosRemoveRef(schMgmt.jobRef, pJob->refId);
pJob = taosIterateRef(schMgmt.jobRef, pJob->refId);
pJob = taosIterateRef(schMgmt.jobRef, refId);
}
taosCloseRef(schMgmt.jobRef);
schMgmt.jobRef = 0;
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册