提交 5b711788 编写于 作者: C Cary Xu

feat: rollup data storage

上级 c7ca57f5
......@@ -225,7 +225,7 @@ SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock, bool copyData);
void blockDebugShowData(const SArray* dataBlocks);
void buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SArray* pDataBlocks, STSchema* pTSchema, int32_t vgId,
int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SArray* pDataBlocks, STSchema* pTSchema, int32_t vgId,
tb_uid_t uid, tb_uid_t suid);
static FORCE_INLINE int32_t blockGetEncodeSize(const SSDataBlock* pBlock) {
......
......@@ -1476,7 +1476,7 @@ void blockDebugShowData(const SArray* dataBlocks) {
*
* TODO: colId should be set
*/
void buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SArray* pDataBlocks, STSchema *pTSchema, int32_t vgId, tb_uid_t uid,
int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SArray* pDataBlocks, STSchema *pTSchema, int32_t vgId, tb_uid_t uid,
tb_uid_t suid) {
int32_t sz = taosArrayGetSize(pDataBlocks);
int32_t bufSize = sizeof(SSubmitReq);
......@@ -1489,6 +1489,10 @@ void buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SArray* pDataBlocks, S
ASSERT(bufSize < 3 * 1024 * 1024);
*pReq = taosMemoryCalloc(1, bufSize);
if(!(*pReq)) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return TSDB_CODE_FAILED;
}
void* pDataBuf = *pReq;
int32_t msgLen = sizeof(SSubmitReq);
......@@ -1582,4 +1586,6 @@ void buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SArray* pDataBlocks, S
blk->numOfRows = htons(blk->numOfRows);
blk = (SSubmitBlk*)(blk->data + dataLen);
}
return TSDB_CODE_SUCCESS;
}
......@@ -27,7 +27,7 @@ static const char *TSDB_FNAME_SUFFIX[] = {
"rsma", // TSDB_FILE_RSMA
};
static const char *TSDB_DIR_NAME[] = {
static const char *TSDB_LEVEL_DNAME[] = {
"tsdb",
"rsma1",
"rsma2",
......@@ -51,7 +51,7 @@ void tsdbInitDFile(STsdb *pRepo, SDFile *pDFile, SDiskID did, int fid, uint32_t
pDFile->info.magic = TSDB_FILE_INIT_MAGIC;
pDFile->info.fver = tsdbGetDFSVersion(ftype);
tsdbGetFilename(REPO_ID(pRepo), fid, ver, ftype, TSDB_DIR_NAME[pRepo->level], fname);
tsdbGetFilename(REPO_ID(pRepo), fid, ver, ftype, TSDB_LEVEL_DNAME[pRepo->level], fname);
tfsInitFile(REPO_TFS(pRepo), &(pDFile->f), did, fname);
}
......
......@@ -175,7 +175,8 @@ static int32_t tsdbInsertRSmaDataImpl(STsdb *pTsdb, const char *msg);
static FORCE_INLINE int32_t tsdbUidStorePut(STbUidStore *pStore, tb_uid_t suid, tb_uid_t *uid);
static FORCE_INLINE int32_t tsdbUpdateTbUidListImpl(STsdb *pTsdb, tb_uid_t *suid, SArray *tbUids);
static FORCE_INLINE int32_t tsdbExecuteRSmaImpl(STsdb *pTsdb, const void *pMsg, int32_t inputType,
qTaskInfo_t *taskInfo, STSchema *pTSchema, tb_uid_t suid, tb_uid_t uid, int8_t level);
qTaskInfo_t *taskInfo, STSchema *pTSchema, tb_uid_t suid, tb_uid_t uid,
int8_t level);
// mgmt interface
static int32_t tsdbDropTSmaDataImpl(STsdb *pTsdb, int64_t indexUid);
......@@ -1976,7 +1977,7 @@ static int32_t tsdbFetchSubmitReqSuids(SSubmitReq *pMsg, STbUidStore *pStore) {
if (!pBlock) break;
tsdbUidStorePut(pStore, msgIter.suid, NULL);
pStore->uid = msgIter.uid; // TODO: remove, just for debugging
pStore->uid = msgIter.uid; // TODO: remove, just for debugging
}
if (terrno != TSDB_CODE_SUCCESS) return -1;
......@@ -1984,8 +1985,8 @@ static int32_t tsdbFetchSubmitReqSuids(SSubmitReq *pMsg, STbUidStore *pStore) {
}
static FORCE_INLINE int32_t tsdbExecuteRSmaImpl(STsdb *pTsdb, const void *pMsg, int32_t inputType,
qTaskInfo_t *taskInfo, STSchema *pTSchema, tb_uid_t suid,
tb_uid_t uid, int8_t level) {
qTaskInfo_t *taskInfo, STSchema *pTSchema, tb_uid_t suid, tb_uid_t uid,
int8_t level) {
SArray *pResult = NULL;
tsdbDebug("vgId:%d execute rsma %" PRIi8 " task for qTaskInfo:%p suid:%" PRIu64, REPO_ID(pTsdb), level, taskInfo,
suid);
......@@ -2013,10 +2014,18 @@ static FORCE_INLINE int32_t tsdbExecuteRSmaImpl(STsdb *pTsdb, const void *pMsg,
if (taosArrayGetSize(pResult) > 0) {
blockDebugShowData(pResult);
STsdb *sinkTsdb = (level == TSDB_RETENTION_L1 ? pTsdb->pVnode->pRSma1 : pTsdb->pVnode->pRSma2);
STsdb *sinkTsdb = (level == TSDB_RETENTION_L1 ? pTsdb->pVnode->pRSma1 : pTsdb->pVnode->pRSma2);
SSubmitReq *pReq = NULL;
buildSubmitReqFromDataBlock(&pReq, pResult, pTSchema, TD_VID(pTsdb->pVnode),uid, suid);
tsdbProcessSubmitReq(sinkTsdb, INT64_MAX, pReq);
if (buildSubmitReqFromDataBlock(&pReq, pResult, pTSchema, TD_VID(pTsdb->pVnode), uid, suid) != 0) {
taosArrayDestroy(pResult);
return TSDB_CODE_FAILED;
}
if (tsdbProcessSubmitReq(sinkTsdb, INT64_MAX, pReq) != 0) {
taosArrayDestroy(pResult);
taosMemoryFreeClear(pReq);
return TSDB_CODE_FAILED;
}
taosMemoryFreeClear(pReq);
} else {
tsdbWarn("vgId:%d no rsma % " PRIi8 " data generated since %s", REPO_ID(pTsdb), level, tstrerror(terrno));
}
......@@ -2033,7 +2042,7 @@ static int32_t tsdbExecuteRSma(STsdb *pTsdb, const void *pMsg, int32_t inputType
return TSDB_CODE_SUCCESS;
}
ASSERT(uid != 0); // TODO: remove later
ASSERT(uid != 0); // TODO: remove later
SSmaStat *pStat = SMA_ENV_STAT(pEnv);
SRSmaInfo *pRSmaInfo = NULL;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册