diff --git a/include/common/taosdef.h b/include/common/taosdef.h index dd1661f87103b02f1bdfc3b3d0cd4eede53c41c0..e1f8832edf146f71deed320a21c24c5df5a25292 100644 --- a/include/common/taosdef.h +++ b/include/common/taosdef.h @@ -79,11 +79,11 @@ typedef enum { } ETsdbSmaType; typedef enum { - TSDB_RSMA_RETENTION_0 = 0, - TSDB_RSMA_RETENTION_1 = 1, - TSDB_RSMA_RETENTION_2 = 2, - TSDB_RSMA_RETENTION_MAX = 3 -} ERSmaRetention; + TSDB_RETENTION_L0 = 0, + TSDB_RETENTION_L1 = 1, + TSDB_RETENTION_L2 = 2, + TSDB_RETENTION_MAX = 3 +} ERetentionLevel; extern char *qtypeStr[]; diff --git a/include/common/tdatablock.h b/include/common/tdatablock.h index 0d5262fe291981a87502f6b64fcad03763ff7396..d86586a378827f2e131f9d8e4a508dfb05d56608 100644 --- a/include/common/tdatablock.h +++ b/include/common/tdatablock.h @@ -225,6 +225,9 @@ SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock, bool copyData); void blockDebugShowData(const SArray* dataBlocks); +void buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SArray* pDataBlocks, STSchema* pTSchema, int32_t vgId, + tb_uid_t uid, tb_uid_t suid); + static FORCE_INLINE int32_t blockGetEncodeSize(const SSDataBlock* pBlock) { return blockDataGetSerialMetaSize(pBlock) + blockDataGetSize(pBlock); } diff --git a/include/common/tmsgdef.h b/include/common/tmsgdef.h index b0439c722cb187da01ea5e14f451a3d11e1d17d1..36bef5e85a905bae1f4c84f71e352257791a600e 100644 --- a/include/common/tmsgdef.h +++ b/include/common/tmsgdef.h @@ -202,6 +202,7 @@ enum { TD_DEF_MSG_TYPE(TDMT_VND_CREATE_SMA, "vnode-create-sma", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_CANCEL_SMA, "vnode-cancel-sma", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_DROP_SMA, "vnode-drop-sma", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_VND_SUBMIT_RSMA, "vnode-submit-rsma", SSubmitReq, SSubmitRsp) // sync integration TD_DEF_MSG_TYPE(TDMT_VND_SYNC_TIMEOUT, "vnode-sync-timeout", NULL, NULL) diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index e703063dfa210a95469384d4b1d0ff8bd1f1632f..e4f344b88feca68d5fdc789cae88afed4bc3a8f5 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -187,17 +187,17 @@ static void doBitmapMerge(SColumnInfoData* pColumnInfoData, int32_t numOfRow1, c int32_t i = 0; uint8_t* start = (uint8_t*)&pColumnInfoData->nullbitmap[BitmapLen(numOfRow1)]; - int32_t overCount = BitmapLen(total) - BitmapLen(numOfRow1); - while (i < len) { // size limit of pSource->nullbitmap + int32_t overCount = BitmapLen(total) - BitmapLen(numOfRow1); + while (i < len) { // size limit of pSource->nullbitmap if (i >= 1) { - start[i - 1] |= (p[i] >> remindBits); //copy remind bits + start[i - 1] |= (p[i] >> remindBits); // copy remind bits } - if (i >= overCount) { // size limit of pColumnInfoData->nullbitmap + if (i >= overCount) { // size limit of pColumnInfoData->nullbitmap return; } - start[i] |= (p[i] << shiftBits); //copy shift bits + start[i] |= (p[i] << shiftBits); // copy shift bits i += 1; } } @@ -453,7 +453,6 @@ int32_t blockDataSplitRows(SSDataBlock* pBlock, bool hasVarCol, int32_t startInd // all fit in *stopIndex = numOfRows - 1; return TSDB_CODE_SUCCESS; - } SSDataBlock* blockDataExtractBlock(SSDataBlock* pBlock, int32_t startIndex, int32_t rowCount) { @@ -558,7 +557,7 @@ int32_t blockDataFromBuf(SSDataBlock* pBlock, const char* buf) { if (IS_VAR_DATA_TYPE(pCol->info.type)) { size_t metaSize = pBlock->info.rows * sizeof(int32_t); - char* tmp = taosMemoryRealloc(pCol->varmeta.offset, metaSize); // preview calloc is too small + char* tmp = taosMemoryRealloc(pCol->varmeta.offset, metaSize); // preview calloc is too small if (tmp == NULL) { return TSDB_CODE_OUT_OF_MEMORY; } @@ -940,8 +939,9 @@ int32_t blockDataSort(SSDataBlock* pDataBlock, SArray* pOrderInfo) { copyBackToBlock(pDataBlock, pCols); int64_t p4 = taosGetTimestampUs(); - uDebug("blockDataSort complex sort:%" PRId64 ", create:%" PRId64 ", assign:%" PRId64 ", copyback:%" PRId64 ", rows:%d\n", p1 - p0, p2 - p1, - p3 - p2, p4 - p3, rows); + uDebug("blockDataSort complex sort:%" PRId64 ", create:%" PRId64 ", assign:%" PRId64 ", copyback:%" PRId64 + ", rows:%d\n", + p1 - p0, p2 - p1, p3 - p2, p4 - p3, rows); destroyTupleIndex(index); return TSDB_CODE_SUCCESS; @@ -1178,7 +1178,7 @@ void* blockDataDestroy(SSDataBlock* pBlock) { } SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock, bool copyData) { - if(pDataBlock == NULL){ + if (pDataBlock == NULL) { return NULL; } @@ -1189,7 +1189,7 @@ SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock, bool copyData) { pBlock->info.numOfCols = numOfCols; pBlock->info.hasVarCol = pDataBlock->info.hasVarCol; - pBlock->info.rowSize = pDataBlock->info.rows; + pBlock->info.rowSize = pDataBlock->info.rows; for (int32_t i = 0; i < numOfCols; ++i) { SColumnInfoData colInfo = {0}; @@ -1219,7 +1219,7 @@ SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock, bool copyData) { } size_t blockDataGetCapacityInRow(const SSDataBlock* pBlock, size_t pageSize) { - return (int32_t) ((pageSize - blockDataGetSerialMetaSize(pBlock))/ blockDataGetSerialRowSize(pBlock)); + return (int32_t)((pageSize - blockDataGetSerialMetaSize(pBlock)) / blockDataGetSerialRowSize(pBlock)); } void colDataDestroy(SColumnInfoData* pColData) { @@ -1236,14 +1236,14 @@ static void doShiftBitmap(char* nullBitmap, size_t n, size_t total) { int32_t len = BitmapLen(total); int32_t newLen = BitmapLen(total - n); - if (n%8 == 0) { - memmove(nullBitmap, nullBitmap + n/8, newLen); + if (n % 8 == 0) { + memmove(nullBitmap, nullBitmap + n / 8, newLen); } else { int32_t tail = n % 8; int32_t i = 0; - uint8_t* p = (uint8_t*) nullBitmap; - while(i < len) { + uint8_t* p = (uint8_t*)nullBitmap; + while (i < len) { uint8_t v = p[i]; p[i] = 0; @@ -1270,7 +1270,7 @@ static void colDataTrimFirstNRows(SColumnInfoData* pColInfoData, size_t n, size_ } } -int32_t blockDataTrimFirstNRows(SSDataBlock *pBlock, size_t n) { +int32_t blockDataTrimFirstNRows(SSDataBlock* pBlock, size_t n) { if (n == 0) { return TSDB_CODE_SUCCESS; } @@ -1278,7 +1278,7 @@ int32_t blockDataTrimFirstNRows(SSDataBlock *pBlock, size_t n) { if (pBlock->info.rows <= n) { blockDataCleanup(pBlock); } else { - for(int32_t i = 0; i < pBlock->info.numOfCols; ++i) { + for (int32_t i = 0; i < pBlock->info.numOfCols; ++i) { SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i); colDataTrimFirstNRows(pColInfoData, n, pBlock->info.rows); } @@ -1464,3 +1464,122 @@ void blockDebugShowData(const SArray* dataBlocks) { } } +/** + * @brief TODO: Assume that the final generated result it less than 3M + * + * @param pReq + * @param pDataBlocks + * @param vgId + * @param uid set as parameter temporarily // TODO: remove this parameter, and the executor should set uid in + * SDataBlock->info.uid + * @param suid // TODO: check with Liao whether suid response is reasonable + * + * TODO: colId should be set + */ +void buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SArray* pDataBlocks, STSchema *pTSchema, int32_t vgId, tb_uid_t uid, + tb_uid_t suid) { + int32_t sz = taosArrayGetSize(pDataBlocks); + int32_t bufSize = sizeof(SSubmitReq); + for (int32_t i = 0; i < sz; ++i) { + SDataBlockInfo* pBlkInfo = &((SSDataBlock*)taosArrayGet(pDataBlocks, i))->info; + bufSize += pBlkInfo->rows * (TD_ROW_HEAD_LEN + pBlkInfo->rowSize + BitmapLen(pBlkInfo->numOfCols)); + bufSize += sizeof(SSubmitBlk); + } + + ASSERT(bufSize < 3 * 1024 * 1024); + + *pReq = taosMemoryCalloc(1, bufSize); + void* pDataBuf = *pReq; + + int32_t msgLen = sizeof(SSubmitReq); + int32_t numOfBlks = 0; + SRowBuilder rb = {0}; + tdSRowInit(&rb, 0); // TODO: use the latest version + + for (int32_t i = 0; i < sz; ++i) { + SSDataBlock* pDataBlock = taosArrayGet(pDataBlocks, i); + int32_t colNum = pDataBlock->info.numOfCols; + int32_t rows = pDataBlock->info.rows; + int32_t rowSize = pDataBlock->info.rowSize; + int64_t groupId = pDataBlock->info.groupId; + + if(rb.nCols != colNum) { + tdSRowSetTpInfo(&rb, colNum, pTSchema->flen); + } + + SSubmitBlk* pSubmitBlk = POINTER_SHIFT(pDataBuf, msgLen); + pSubmitBlk->suid = suid; + pSubmitBlk->uid = uid; + pSubmitBlk->numOfRows = rows; + + ++numOfBlks; + + msgLen += sizeof(SSubmitBlk); + int32_t dataLen = 0; + for (int32_t j = 0; j < rows; ++j) { // iterate by row + tdSRowResetBuf(&rb, POINTER_SHIFT(pDataBuf, msgLen)); // set row buf + printf("|"); + bool isStartKey = false; + for (int32_t k = 0; k < colNum; ++k) { // iterate by column + SColumnInfoData* pColInfoData = taosArrayGet(pDataBlock->pDataBlock, k); + void* var = POINTER_SHIFT(pColInfoData->pData, j * pColInfoData->info.bytes); + switch (pColInfoData->info.type) { + case TSDB_DATA_TYPE_TIMESTAMP: + if (!isStartKey) { + isStartKey = true; + tdAppendColValToRow(&rb, PRIMARYKEY_TIMESTAMP_COL_ID, TSDB_DATA_TYPE_TIMESTAMP, TD_VTYPE_NORM, var, true, 0, 0); + } else { + tdAppendColValToRow(&rb, 2, TSDB_DATA_TYPE_TIMESTAMP, TD_VTYPE_NORM, var, true, 8, k); + break; + } + break; + case TSDB_DATA_TYPE_NCHAR: { + tdAppendColValToRow(&rb, 2, TSDB_DATA_TYPE_NCHAR, TD_VTYPE_NORM, var, true, 8, k); + break; + } + case TSDB_DATA_TYPE_VARCHAR: { // TSDB_DATA_TYPE_BINARY + tdAppendColValToRow(&rb, 2, TSDB_DATA_TYPE_VARCHAR, TD_VTYPE_NORM, var, true, 8, k); + break; + } + case TSDB_DATA_TYPE_VARBINARY: + case TSDB_DATA_TYPE_DECIMAL: + case TSDB_DATA_TYPE_BLOB: + case TSDB_DATA_TYPE_MEDIUMBLOB: + printf("the column type %" PRIi16 " is defined but not implemented yet\n", pColInfoData->info.type); + TASSERT(0); + break; + default: + if (pColInfoData->info.type < TSDB_DATA_TYPE_MAX && pColInfoData->info.type > TSDB_DATA_TYPE_NULL) { + tdAppendColValToRow(&rb, 2, pColInfoData->info.type, TD_VTYPE_NORM, var, true, 8, k); + } else { + printf("the column type %" PRIi16 " is undefined\n", pColInfoData->info.type); + TASSERT(0); + } + break; + } + } + dataLen += TD_ROW_LEN(rb.pBuf); + } + pSubmitBlk->dataLen = dataLen; + msgLen += pSubmitBlk->dataLen; + } + + (*pReq)->length = msgLen; + + (*pReq)->header.vgId = htonl(vgId); + (*pReq)->header.contLen = htonl(msgLen); + (*pReq)->length = (*pReq)->header.contLen; + (*pReq)->numOfBlocks = htonl(numOfBlks); + SSubmitBlk* blk = (SSubmitBlk*)((*pReq) + 1); + while (numOfBlks--) { + int32_t dataLen = blk->dataLen; + blk->uid = htobe64(blk->uid); + blk->suid = htobe64(blk->suid); + blk->padding = htonl(blk->padding); + blk->sversion = htonl(blk->sversion); + blk->dataLen = htonl(blk->dataLen); + blk->schemaLen = htonl(blk->schemaLen); + blk->numOfRows = htons(blk->numOfRows); + blk = (SSubmitBlk*)(blk->data + dataLen); + } +} diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c index b63a7f5b32be3dc86e3119c6161ea47072c66770..e3816396c9bd766923a7346b934d8ee394e2004c 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c @@ -284,6 +284,7 @@ void vmInitMsgHandle(SMgmtWrapper *pWrapper) { dmSetMsgHandle(pWrapper, TDMT_VND_CREATE_SMA, vmProcessWriteMsg, DEFAULT_HANDLE); dmSetMsgHandle(pWrapper, TDMT_VND_CANCEL_SMA, vmProcessWriteMsg, DEFAULT_HANDLE); dmSetMsgHandle(pWrapper, TDMT_VND_DROP_SMA, vmProcessWriteMsg, DEFAULT_HANDLE); + dmSetMsgHandle(pWrapper, TDMT_VND_SUBMIT_RSMA, vmProcessWriteMsg, DEFAULT_HANDLE); dmSetMsgHandle(pWrapper, TDMT_VND_MQ_VG_CHANGE, (NodeMsgFp)vmProcessWriteMsg, DEFAULT_HANDLE); dmSetMsgHandle(pWrapper, TDMT_VND_CONSUME, vmProcessFetchMsg, DEFAULT_HANDLE); dmSetMsgHandle(pWrapper, TDMT_VND_TASK_DEPLOY, vmProcessWriteMsg, DEFAULT_HANDLE); diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index f4fccc2f497ff698eb627161f872133d25c6d414..62deb200c95e716c07dd0bc146cd44f9221f0123 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -37,7 +37,7 @@ #ifdef __cplusplus extern "C" { #endif -#define TSDB_VNODE_SMA_DEBUG // TODO: evaluate to remove the macro and the relative codes + // vnode typedef struct SVnode SVnode; typedef struct STsdbCfg STsdbCfg; // todo: remove @@ -145,7 +145,7 @@ struct STsdbCfg { int32_t keep2; // TODO: save to tsdb cfg file int8_t type; // ETsdbType - SRetention retentions[TSDB_RSMA_RETENTION_MAX]; + SRetention retentions[TSDB_RETENTION_MAX]; }; typedef enum { diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index 427ce8c1b63b7c8cbc5ded903c569da62669e47b..33ffe7eda6b20121c37c05188bdf9033f2928d60 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -72,6 +72,7 @@ struct STsdb { char *path; SVnode *pVnode; bool repoLocked; + int8_t level; // retention level TdThreadMutex mutex; STsdbMemTable *mem; STsdbMemTable *imem; diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index a9ba2e1de32e67aa0f1f6f036f58aa71d704e4a1..1c7f6034ee74599b545c8a0bce76b2aa9542a0d8 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -124,6 +124,7 @@ int32_t tsdbUpdateTbUidList(STsdb* pTsdb, STbUidStore* pUidStore); void tsdbUidStoreDestory(STbUidStore* pStore); void* tsdbUidStoreFree(STbUidStore* pStore); int32_t tsdbTriggerRSma(STsdb* pTsdb, void* pMsg, int32_t inputType); +int32_t tsdbProcessSubmitReq(STsdb* pTsdb, int64_t version, void* pReq); typedef struct { int8_t streamType; // sma or other @@ -181,6 +182,7 @@ struct SVnode { struct STbUidStore { tb_uid_t suid; + tb_uid_t uid; // TODO: just for debugging, remove when uid provided in SSDataBlock SArray* tbUids; SHashObj* uidHash; }; diff --git a/source/dnode/vnode/src/tsdb/tsdbFile.c b/source/dnode/vnode/src/tsdb/tsdbFile.c index b1af3e046161300b8d104dd2a4968ee418d7ea56..25566d6dada3fe3ea835653ee72fb75c69726b96 100644 --- a/source/dnode/vnode/src/tsdb/tsdbFile.c +++ b/source/dnode/vnode/src/tsdb/tsdbFile.c @@ -27,7 +27,13 @@ static const char *TSDB_FNAME_SUFFIX[] = { "rsma", // TSDB_FILE_RSMA }; -static void tsdbGetFilename(int vid, int fid, uint32_t ver, TSDB_FILE_T ftype, char *fname); +static const char *TSDB_DIR_NAME[] = { + "tsdb", + "rsma1", + "rsma2", +}; + +static void tsdbGetFilename(int vid, int fid, uint32_t ver, TSDB_FILE_T ftype, const char* dname, char *fname); // static int tsdbRollBackMFile(SMFile *pMFile); static int tsdbEncodeDFInfo(void **buf, SDFInfo *pInfo); static void *tsdbDecodeDFInfo(void *buf, SDFInfo *pInfo); @@ -45,7 +51,7 @@ void tsdbInitDFile(STsdb *pRepo, SDFile *pDFile, SDiskID did, int fid, uint32_t pDFile->info.magic = TSDB_FILE_INIT_MAGIC; pDFile->info.fver = tsdbGetDFSVersion(ftype); - tsdbGetFilename(REPO_ID(pRepo), fid, ver, ftype, fname); + tsdbGetFilename(REPO_ID(pRepo), fid, ver, ftype, TSDB_DIR_NAME[pRepo->level], fname); tfsInitFile(REPO_TFS(pRepo), &(pDFile->f), did, fname); } @@ -431,14 +437,15 @@ int tsdbParseDFilename(const char *fname, int *vid, int *fid, TSDB_FILE_T *ftype return 0; } -static void tsdbGetFilename(int vid, int fid, uint32_t ver, TSDB_FILE_T ftype, char *fname) { +static void tsdbGetFilename(int vid, int fid, uint32_t ver, TSDB_FILE_T ftype, const char *dname, char *fname) { ASSERT(ftype != TSDB_FILE_MAX); if (ftype < TSDB_FILE_MAX) { if (ver == 0) { - snprintf(fname, TSDB_FILENAME_LEN, "vnode/vnode%d/tsdb/data/v%df%d.%s", vid, vid, fid, TSDB_FNAME_SUFFIX[ftype]); + snprintf(fname, TSDB_FILENAME_LEN, "vnode/vnode%d/%s/data/v%df%d.%s", vid, dname, vid, fid, + TSDB_FNAME_SUFFIX[ftype]); } else { - snprintf(fname, TSDB_FILENAME_LEN, "vnode/vnode%d/tsdb/data/v%df%d.%s-ver%" PRIu32, vid, vid, fid, + snprintf(fname, TSDB_FILENAME_LEN, "vnode/vnode%d/%s/data/v%df%d.%s-ver%" PRIu32, vid, dname, vid, fid, TSDB_FNAME_SUFFIX[ftype], ver); } } else { diff --git a/source/dnode/vnode/src/tsdb/tsdbOpen.c b/source/dnode/vnode/src/tsdb/tsdbOpen.c index da363d9bc99b8d5aec832ba2ae65828b2dc7b352..e18c01dc016fc1d63d14ca49440f9cb1528d62fa 100644 --- a/source/dnode/vnode/src/tsdb/tsdbOpen.c +++ b/source/dnode/vnode/src/tsdb/tsdbOpen.c @@ -15,21 +15,21 @@ #include "tsdb.h" -static int tsdbOpenImpl(SVnode *pVnode, int8_t type, STsdb **ppTsdb, const char *dir); +static int tsdbOpenImpl(SVnode *pVnode, int8_t type, STsdb **ppTsdb, const char *dir, int8_t level); int tsdbOpen(SVnode *pVnode, int8_t type) { switch (type) { case TSDB_TYPE_TSDB: - return tsdbOpenImpl(pVnode, type, &VND_TSDB(pVnode), VNODE_TSDB_DIR); + return tsdbOpenImpl(pVnode, type, &VND_TSDB(pVnode), VNODE_TSDB_DIR, TSDB_RETENTION_L0); case TSDB_TYPE_TSMA: ASSERT(0); break; case TSDB_TYPE_RSMA_L0: - return tsdbOpenImpl(pVnode, type, &VND_RSMA0(pVnode), VNODE_TSDB_DIR); + return tsdbOpenImpl(pVnode, type, &VND_RSMA0(pVnode), VNODE_TSDB_DIR, TSDB_RETENTION_L0); case TSDB_TYPE_RSMA_L1: - return tsdbOpenImpl(pVnode, type, &VND_RSMA1(pVnode), VNODE_RSMA1_DIR); + return tsdbOpenImpl(pVnode, type, &VND_RSMA1(pVnode), VNODE_RSMA1_DIR, TSDB_RETENTION_L1); case TSDB_TYPE_RSMA_L2: - return tsdbOpenImpl(pVnode, type, &VND_RSMA2(pVnode), VNODE_RSMA2_DIR); + return tsdbOpenImpl(pVnode, type, &VND_RSMA2(pVnode), VNODE_RSMA2_DIR, TSDB_RETENTION_L2); default: ASSERT(0); break; @@ -37,7 +37,17 @@ int tsdbOpen(SVnode *pVnode, int8_t type) { return 0; } -int tsdbOpenImpl(SVnode *pVnode, int8_t type, STsdb **ppTsdb, const char *dir) { +/** + * @brief + * + * @param pVnode + * @param type + * @param ppTsdb + * @param dir + * @param level retention level + * @return int + */ +int tsdbOpenImpl(SVnode *pVnode, int8_t type, STsdb **ppTsdb, const char *dir, int8_t level) { STsdb *pTsdb = NULL; int slen = 0; @@ -55,6 +65,7 @@ int tsdbOpenImpl(SVnode *pVnode, int8_t type, STsdb **ppTsdb, const char *dir) { sprintf(pTsdb->path, "%s%s%s%s%s", tfsGetPrimaryPath(pVnode->pTfs), TD_DIRSEP, pVnode->path, TD_DIRSEP, dir); pTsdb->pVnode = pVnode; + pTsdb->level = level; pTsdb->repoLocked = false; taosThreadMutexInit(&pTsdb->mutex, NULL); pTsdb->fs = tsdbNewFS(REPO_CFG(pTsdb)); @@ -79,6 +90,7 @@ _err: int tsdbClose(STsdb *pTsdb) { if (pTsdb) { + // TODO: destroy mem/imem tsdbCloseFS(pTsdb); tsdbFreeFS(pTsdb->fs); taosMemoryFree(pTsdb); diff --git a/source/dnode/vnode/src/tsdb/tsdbSma.c b/source/dnode/vnode/src/tsdb/tsdbSma.c index d4532cc3ac03f5ba83677032ba9936b865c2ebb0..0e38deb17d865f535059ec8ae9ca8a32a73a488f 100644 --- a/source/dnode/vnode/src/tsdb/tsdbSma.c +++ b/source/dnode/vnode/src/tsdb/tsdbSma.c @@ -105,7 +105,7 @@ typedef struct { #define RSMA_TASK_INFO_HASH_SLOT 8 struct SRSmaInfo { - void *taskInfo[TSDB_RSMA_RETENTION_2]; // qTaskInfo_t + void *taskInfo[TSDB_RETENTION_L2]; // qTaskInfo_t }; struct SSmaStat { @@ -127,7 +127,7 @@ static FORCE_INLINE void tsdbFreeTaskHandle(qTaskInfo_t *taskHandle) { } static FORCE_INLINE void *tsdbFreeRSmaInfo(SRSmaInfo *pInfo) { - for (int32_t i = 0; i < TSDB_RSMA_RETENTION_MAX; ++i) { + for (int32_t i = 0; i < TSDB_RETENTION_MAX; ++i) { if (pInfo->taskInfo[i]) { tsdbFreeTaskHandle(pInfo->taskInfo[i]); } @@ -175,7 +175,7 @@ static int32_t tsdbInsertRSmaDataImpl(STsdb *pTsdb, const char *msg); static FORCE_INLINE int32_t tsdbUidStorePut(STbUidStore *pStore, tb_uid_t suid, tb_uid_t *uid); static FORCE_INLINE int32_t tsdbUpdateTbUidListImpl(STsdb *pTsdb, tb_uid_t *suid, SArray *tbUids); static FORCE_INLINE int32_t tsdbExecuteRSmaImpl(STsdb *pTsdb, const void *pMsg, int32_t inputType, - qTaskInfo_t *taskInfo, tb_uid_t suid, int8_t retention); + qTaskInfo_t *taskInfo, STSchema *pTSchema, tb_uid_t suid, tb_uid_t uid, int8_t level); // mgmt interface static int32_t tsdbDropTSmaDataImpl(STsdb *pTsdb, int64_t indexUid); @@ -1976,6 +1976,7 @@ static int32_t tsdbFetchSubmitReqSuids(SSubmitReq *pMsg, STbUidStore *pStore) { if (!pBlock) break; tsdbUidStorePut(pStore, msgIter.suid, NULL); + pStore->uid = msgIter.uid; // TODO: remove, just for debugging } if (terrno != TSDB_CODE_SUCCESS) return -1; @@ -1983,13 +1984,15 @@ static int32_t tsdbFetchSubmitReqSuids(SSubmitReq *pMsg, STbUidStore *pStore) { } static FORCE_INLINE int32_t tsdbExecuteRSmaImpl(STsdb *pTsdb, const void *pMsg, int32_t inputType, - qTaskInfo_t *taskInfo, tb_uid_t suid, int8_t retention) { + qTaskInfo_t *taskInfo, STSchema *pTSchema, tb_uid_t suid, + tb_uid_t uid, int8_t level) { SArray *pResult = NULL; - tsdbDebug("vgId:%d execute rsma %" PRIi8 " task for qTaskInfo:%p suid:%" PRIu64, REPO_ID(pTsdb), retention, taskInfo, + tsdbDebug("vgId:%d execute rsma %" PRIi8 " task for qTaskInfo:%p suid:%" PRIu64, REPO_ID(pTsdb), level, taskInfo, suid); + qSetStreamInput(taskInfo, pMsg, inputType); while (1) { - SSDataBlock *output; + SSDataBlock *output = NULL; uint64_t ts; if (qExecTask(taskInfo, &output, &ts) < 0) { ASSERT(false); @@ -2010,22 +2013,28 @@ static FORCE_INLINE int32_t tsdbExecuteRSmaImpl(STsdb *pTsdb, const void *pMsg, if (taosArrayGetSize(pResult) > 0) { blockDebugShowData(pResult); + STsdb *sinkTsdb = (level == TSDB_RETENTION_L1 ? pTsdb->pVnode->pRSma1 : pTsdb->pVnode->pRSma2); + SSubmitReq *pReq = NULL; + buildSubmitReqFromDataBlock(&pReq, pResult, pTSchema, TD_VID(pTsdb->pVnode),uid, suid); + tsdbProcessSubmitReq(sinkTsdb, INT64_MAX, pReq); } else { - tsdbWarn("vgId:%d no rsma_1 data generated since %s", REPO_ID(pTsdb), tstrerror(terrno)); + tsdbWarn("vgId:%d no rsma % " PRIi8 " data generated since %s", REPO_ID(pTsdb), level, tstrerror(terrno)); } - + taosArrayDestroy(pResult); return TSDB_CODE_SUCCESS; } -int32_t tsdbExecuteRSma(STsdb *pTsdb, const void *pMsg, int32_t inputType, tb_uid_t suid) { +static int32_t tsdbExecuteRSma(STsdb *pTsdb, const void *pMsg, int32_t inputType, tb_uid_t suid, tb_uid_t uid) { SSmaEnv *pEnv = REPO_RSMA_ENV(pTsdb); if (!pEnv) { // only applicable when rsma env exists return TSDB_CODE_SUCCESS; } + ASSERT(uid != 0); // TODO: remove later + SSmaStat *pStat = SMA_ENV_STAT(pEnv); SRSmaInfo *pRSmaInfo = NULL; @@ -2037,8 +2046,11 @@ int32_t tsdbExecuteRSma(STsdb *pTsdb, const void *pMsg, int32_t inputType, tb_ui } if (inputType == STREAM_DATA_TYPE_SUBMIT_BLOCK) { - tsdbExecuteRSmaImpl(pTsdb, pMsg, inputType, pRSmaInfo->taskInfo[0], suid, TSDB_RSMA_RETENTION_1); - tsdbExecuteRSmaImpl(pTsdb, pMsg, inputType, pRSmaInfo->taskInfo[1], suid, TSDB_RSMA_RETENTION_2); + // TODO: use the proper schema instead of 0, and cache STSchema in cache + STSchema *pTSchema = metaGetTbTSchema(pTsdb->pVnode->pMeta, suid, 0); + tsdbExecuteRSmaImpl(pTsdb, pMsg, inputType, pRSmaInfo->taskInfo[0], pTSchema, suid, uid, TSDB_RETENTION_L1); + tsdbExecuteRSmaImpl(pTsdb, pMsg, inputType, pRSmaInfo->taskInfo[1], pTSchema, suid, uid, TSDB_RETENTION_L2); + taosMemoryFree(pTSchema); } return TSDB_CODE_SUCCESS; @@ -2056,12 +2068,12 @@ int32_t tsdbTriggerRSma(STsdb *pTsdb, void *pMsg, int32_t inputType) { tsdbFetchSubmitReqSuids(pMsg, &uidStore); if (uidStore.suid != 0) { - tsdbExecuteRSma(pTsdb, pMsg, inputType, uidStore.suid); + tsdbExecuteRSma(pTsdb, pMsg, inputType, uidStore.suid, uidStore.uid); void *pIter = taosHashIterate(uidStore.uidHash, NULL); while (pIter) { tb_uid_t *pTbSuid = (tb_uid_t *)taosHashGetKey(pIter, NULL); - tsdbExecuteRSma(pTsdb, pMsg, inputType, *pTbSuid); + tsdbExecuteRSma(pTsdb, pMsg, inputType, *pTbSuid, 0); pIter = taosHashIterate(uidStore.uidHash, pIter); } diff --git a/source/dnode/vnode/src/vnd/vnodeCfg.c b/source/dnode/vnode/src/vnd/vnodeCfg.c index 8130a00e04f83f97d6f7618070e43af52a949f22..7eac0389d22858f94df2666acf5c7daa46cfee35 100644 --- a/source/dnode/vnode/src/vnd/vnodeCfg.c +++ b/source/dnode/vnode/src/vnd/vnodeCfg.c @@ -66,7 +66,6 @@ int vnodeEncodeConfig(const void *pObj, SJson *pJson) { if (tjsonAddIntegerToObject(pJson, "keep0", pCfg->tsdbCfg.keep0) < 0) return -1; if (tjsonAddIntegerToObject(pJson, "keep1", pCfg->tsdbCfg.keep1) < 0) return -1; if (tjsonAddIntegerToObject(pJson, "keep2", pCfg->tsdbCfg.keep2) < 0) return -1; -#ifdef TSDB_VNODE_SMA_DEBUG if (pCfg->tsdbCfg.retentions[0].freq > 0) { int32_t nRetention = 1; if (pCfg->tsdbCfg.retentions[1].freq > 0) { @@ -87,7 +86,6 @@ int vnodeEncodeConfig(const void *pObj, SJson *pJson) { tjsonAddItemToArray(pNodeRetentions, pNodeRetention); } } -#endif if (tjsonAddIntegerToObject(pJson, "wal.vgId", pCfg->walCfg.vgId) < 0) return -1; if (tjsonAddIntegerToObject(pJson, "wal.fsyncPeriod", pCfg->walCfg.fsyncPeriod) < 0) return -1; if (tjsonAddIntegerToObject(pJson, "wal.retentionPeriod", pCfg->walCfg.retentionPeriod) < 0) return -1; @@ -135,11 +133,11 @@ int vnodeDecodeConfig(const SJson *pJson, void *pObj) { if (tjsonGetNumberValue(pJson, "keep0", pCfg->tsdbCfg.keep0) < 0) return -1; if (tjsonGetNumberValue(pJson, "keep1", pCfg->tsdbCfg.keep1) < 0) return -1; if (tjsonGetNumberValue(pJson, "keep2", pCfg->tsdbCfg.keep2) < 0) return -1; -#ifdef TSDB_VNODE_SMA_DEBUG SJson *pNodeRetentions = tjsonGetObjectItem(pJson, "retentions"); - int nRetention = tjsonGetArraySize(pNodeRetentions); - ASSERT(nRetention <= TSDB_RSMA_RETENTION_MAX); - + int32_t nRetention = tjsonGetArraySize(pNodeRetentions); + if (nRetention > TSDB_RETENTION_MAX) { + nRetention = TSDB_RETENTION_MAX; + } for (int32_t i = 0; i < nRetention; ++i) { SJson *pNodeRetention = tjsonGetArrayItem(pNodeRetentions, i); ASSERT(pNodeRetention != NULL); @@ -148,7 +146,6 @@ int vnodeDecodeConfig(const SJson *pJson, void *pObj) { tjsonGetNumberValue(pNodeRetention, "keep", (pCfg->tsdbCfg.retentions)[i].keep); tjsonGetNumberValue(pNodeRetention, "keepUnit", (pCfg->tsdbCfg.retentions)[i].keepUnit); } -#endif if (tjsonGetNumberValue(pJson, "wal.vgId", pCfg->walCfg.vgId) < 0) return -1; if (tjsonGetNumberValue(pJson, "wal.fsyncPeriod", pCfg->walCfg.fsyncPeriod) < 0) return -1; if (tjsonGetNumberValue(pJson, "wal.retentionPeriod", pCfg->walCfg.retentionPeriod) < 0) return -1; diff --git a/source/dnode/vnode/src/vnd/vnodeCommit.c b/source/dnode/vnode/src/vnd/vnodeCommit.c index 7026673231a4b07865d009215dd3826b99e3d119..8bf68c5d29367010847e3fc93d7ec353b661ede5 100644 --- a/source/dnode/vnode/src/vnd/vnodeCommit.c +++ b/source/dnode/vnode/src/vnd/vnodeCommit.c @@ -47,9 +47,26 @@ int vnodeBegin(SVnode *pVnode) { } // begin tsdb - if (tsdbBegin(pVnode->pTsdb) < 0) { - vError("vgId:%d failed to begin tsdb since %s", TD_VID(pVnode), tstrerror(terrno)); - return -1; + if (vnodeIsRollup(pVnode)) { + if (tsdbBegin(VND_RSMA0(pVnode)) < 0) { + vError("vgId:%d failed to begin rsma0 since %s", TD_VID(pVnode), tstrerror(terrno)); + return -1; + } + + if (tsdbBegin(VND_RSMA1(pVnode)) < 0) { + vError("vgId:%d failed to begin rsma1 since %s", TD_VID(pVnode), tstrerror(terrno)); + return -1; + } + + if (tsdbBegin(VND_RSMA2(pVnode)) < 0) { + vError("vgId:%d failed to begin rsma2 since %s", TD_VID(pVnode), tstrerror(terrno)); + return -1; + } + } else { + if (tsdbBegin(pVnode->pTsdb) < 0) { + vError("vgId:%d failed to begin tsdb since %s", TD_VID(pVnode), tstrerror(terrno)); + return -1; + } } return 0; diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index eddd295e2ebc1df21e70fbac07281f2baad9c412..91a470b118731db07964660835a212e382fbb9d1 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -461,7 +461,7 @@ static int vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq, in SSubmitRsp rsp = {0}; pRsp->code = 0; - tsdbTriggerRSma(pVnode->pTsdb, pReq, STREAM_DATA_TYPE_SUBMIT_BLOCK); + // handle the request if (tsdbInsertData(pVnode->pTsdb, version, pSubmitReq, &rsp) < 0) { pRsp->code = terrno; @@ -470,12 +470,28 @@ static int vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq, in // pRsp->msgType = TDMT_VND_SUBMIT_RSP; // vnodeProcessSubmitReq(pVnode, ptr, pRsp); - // tsdbTriggerRSma(pVnode->pTsdb, pReq, STREAM_DATA_TYPE_SUBMIT_BLOCK); // encode the response (TODO) pRsp->pCont = rpcMallocCont(sizeof(SSubmitRsp)); memcpy(pRsp->pCont, &rsp, sizeof(rsp)); pRsp->contLen = sizeof(SSubmitRsp); + tsdbTriggerRSma(pVnode->pTsdb, pReq, STREAM_DATA_TYPE_SUBMIT_BLOCK); + return 0; } + +int32_t tsdbProcessSubmitReq(STsdb *pTsdb, int64_t version, void *pReq) { + if(!pReq) { + terrno = TSDB_CODE_INVALID_PTR; + return TSDB_CODE_FAILED; + } + + SSubmitReq *pSubmitReq = (SSubmitReq *)pReq; + + if (tsdbInsertData(pTsdb, version, pSubmitReq, NULL) < 0) { + return TSDB_CODE_FAILED; + } + + return TSDB_CODE_SUCCESS; +}