提交 834e6b57 编写于 作者: C Cary Xu

feat: rollup refactor

上级 20085fea
......@@ -230,12 +230,12 @@ typedef struct {
int32_t totalLen;
int32_t len;
// head of SSubmitBlk
int64_t uid; // table unique id
int64_t suid; // stable id
int32_t sversion; // data schema version
int32_t dataLen; // data part length, not including the SSubmitBlk head
int32_t schemaLen; // schema length, if length is 0, no schema exists
int16_t numOfRows; // total number of rows in current submit block
// int64_t uid; // table unique id
// int64_t suid; // stable id
// int32_t sversion; // data schema version
// int32_t dataLen; // data part length, not including the SSubmitBlk head
// int32_t schemaLen; // schema length, if length is 0, no schema exists
// int16_t numOfRows; // total number of rows in current submit block
// head of SSubmitBlk
const void* pMsg;
} SSubmitMsgIter;
......@@ -249,10 +249,10 @@ STSRow* tGetSubmitBlkNext(SSubmitBlkIter* pIter);
// 1) use tInitSubmitMsgIterEx firstly as not decrease the merge conflicts
// 2) replace tInitSubmitMsgIterEx with tInitSubmitMsgIter later
// 3) finally, rename tInitSubmitMsgIterEx to tInitSubmitMsgIter
int32_t tInitSubmitMsgIterEx(const SSubmitReq* pMsg, SSubmitMsgIter* pIter);
int32_t tGetSubmitMsgNextEx(SSubmitMsgIter* pIter, SSubmitBlk** pPBlock);
int32_t tInitSubmitBlkIterEx(SSubmitMsgIter* pMsgIter, SSubmitBlk* pBlock, SSubmitBlkIter* pIter);
STSRow* tGetSubmitBlkNextEx(SSubmitBlkIter* pIter);
// int32_t tInitSubmitMsgIterEx(const SSubmitReq* pMsg, SSubmitMsgIter* pIter);
// int32_t tGetSubmitMsgNextEx(SSubmitMsgIter* pIter, SSubmitBlk** pPBlock);
// int32_t tInitSubmitBlkIterEx(SSubmitMsgIter* pMsgIter, SSubmitBlk* pBlock, SSubmitBlkIter* pIter);
// STSRow* tGetSubmitBlkNextEx(SSubmitBlkIter* pIter);
typedef struct {
int32_t index; // index of failed block in submit blocks
......
......@@ -93,7 +93,7 @@ STSRow *tGetSubmitBlkNext(SSubmitBlkIter *pIter) {
return row;
}
}
#if 0
// TODO: KEEP one suite of iterator API finally.
// 1) use tInitSubmitMsgIterEx firstly as not decrease the merge conflicts
// 2) replace tInitSubmitMsgIterEx with tInitSubmitMsgIter later
......@@ -173,7 +173,7 @@ STSRow *tGetSubmitBlkNextEx(SSubmitBlkIter *pIter) {
return row;
}
}
#endif
int32_t tEncodeSEpSet(SCoder *pEncoder, const SEpSet *pEp) {
if (tEncodeI8(pEncoder, pEp->inUse) < 0) return -1;
if (tEncodeI8(pEncoder, pEp->numOfEps) < 0) return -1;
......
......@@ -56,7 +56,7 @@ int32_t tsdbInsertTSmaData(STsdb *pTsdb, int64_t indexUid, const char *msg);
int32_t tsdbDropTSmaData(STsdb *pTsdb, int64_t indexUid);
int32_t tsdbInsertRSmaData(STsdb *pTsdb, char *msg);
void tsdbCleanupReadHandle(tsdbReaderT queryHandle);
int tdScanAndConvertSubmitMsg(SSubmitReq *pMsg);
typedef enum {
TSDB_FILE_HEAD = 0, // .head
TSDB_FILE_DATA, // .data
......
......@@ -55,7 +55,7 @@ void *tsdbUidStoreFree(STbUidStore *pStore);
int32_t tsdbRegisterRSma(STsdb *pTsdb, SMeta *pMeta, SVCreateTbReq *pReq);
int32_t tsdbFetchTbUidList(void *pTsdb, void **result, void *suid, void *uid);
int32_t tsdbUpdateTbUidList(STsdb *pTsdb, STbUidStore *pUidStore);
int32_t tsdbTriggerRSma(STsdb *pTsdb, SMeta *pMeta, const void *pMsg, int32_t inputType);
int32_t tsdbTriggerRSma(STsdb *pTsdb, SMeta *pMeta, void *pMsg, int32_t inputType);
#ifdef __cplusplus
}
......
......@@ -33,15 +33,24 @@ STqReadHandle* tqInitSubmitMsgScanner(SMeta* pMeta) {
int32_t tqReadHandleSetMsg(STqReadHandle* pReadHandle, SSubmitReq* pMsg, int64_t ver) {
pReadHandle->pMsg = pMsg;
pMsg->length = htonl(pMsg->length);
pMsg->numOfBlocks = htonl(pMsg->numOfBlocks);
// iterate
if (tInitSubmitMsgIterEx(pMsg, &pReadHandle->msgIter) < 0) return -1;
// iterate and convert
if (tInitSubmitMsgIter(pMsg, &pReadHandle->msgIter) < 0) return -1;
while (true) {
if (tGetSubmitMsgNextEx(&pReadHandle->msgIter, &pReadHandle->pBlock) < 0) return -1;
if (tGetSubmitMsgNext(&pReadHandle->msgIter, &pReadHandle->pBlock) < 0) return -1;
if (pReadHandle->pBlock == NULL) break;
pReadHandle->pBlock->uid = htobe64(pReadHandle->pBlock->uid);
pReadHandle->pBlock->suid = htobe64(pReadHandle->pBlock->suid);
pReadHandle->pBlock->sversion = htonl(pReadHandle->pBlock->sversion);
pReadHandle->pBlock->dataLen = htonl(pReadHandle->pBlock->dataLen);
pReadHandle->pBlock->schemaLen = htonl(pReadHandle->pBlock->schemaLen);
pReadHandle->pBlock->numOfRows = htons(pReadHandle->pBlock->numOfRows);
}
if (tInitSubmitMsgIterEx(pMsg, &pReadHandle->msgIter) < 0) return -1;
if (tInitSubmitMsgIter(pMsg, &pReadHandle->msgIter) < 0) return -1;
pReadHandle->ver = ver;
memset(&pReadHandle->blkIter, 0, sizeof(SSubmitBlkIter));
return 0;
......@@ -49,7 +58,7 @@ int32_t tqReadHandleSetMsg(STqReadHandle* pReadHandle, SSubmitReq* pMsg, int64_t
bool tqNextDataBlock(STqReadHandle* pHandle) {
while (1) {
if (tGetSubmitMsgNextEx(&pHandle->msgIter, &pHandle->pBlock) < 0) {
if (tGetSubmitMsgNext(&pHandle->msgIter, &pHandle->pBlock) < 0) {
return false;
}
if (pHandle->pBlock == NULL) return false;
......@@ -57,8 +66,7 @@ bool tqNextDataBlock(STqReadHandle* pHandle) {
/*pHandle->pBlock->uid = htobe64(pHandle->pBlock->uid);*/
/*if (pHandle->tbUid == pHandle->pBlock->uid) {*/
ASSERT(pHandle->tbIdHash);
// void* ret = taosHashGet(pHandle->tbIdHash, &pHandle->pBlock->uid, sizeof(int64_t));
void* ret = taosHashGet(pHandle->tbIdHash, &pHandle->msgIter.uid, sizeof(int64_t));
void* ret = taosHashGet(pHandle->tbIdHash, &pHandle->pBlock->uid, sizeof(int64_t));
if (ret != NULL) {
/*printf("retrieve one tb %ld\n", pHandle->pBlock->uid);*/
/*pHandle->pBlock->tid = htonl(pHandle->pBlock->tid);*/
......@@ -80,14 +88,14 @@ int32_t tqRetrieveDataBlock(SArray** ppCols, STqReadHandle* pHandle, uint64_t* p
// TODO set to real sversion
int32_t sversion = 0;
if (pHandle->sver != sversion) {
pHandle->pSchema = metaGetTbTSchema(pHandle->pVnodeMeta, pHandle->msgIter.uid, sversion);
pHandle->pSchema = metaGetTbTSchema(pHandle->pVnodeMeta, pHandle->pBlock->uid, sversion);
tb_uid_t quid;
STbCfg* pTbCfg = metaGetTbInfoByUid(pHandle->pVnodeMeta, pHandle->msgIter.uid);
STbCfg* pTbCfg = metaGetTbInfoByUid(pHandle->pVnodeMeta, pHandle->pBlock->uid);
if (pTbCfg->type == META_CHILD_TABLE) {
quid = pTbCfg->ctbCfg.suid;
} else {
quid = pHandle->msgIter.uid;
quid = pHandle->pBlock->uid;
}
pHandle->pSchemaWrapper = metaGetTableSchema(pHandle->pVnodeMeta, quid, sversion, true);
pHandle->sver = sversion;
......@@ -96,7 +104,7 @@ int32_t tqRetrieveDataBlock(SArray** ppCols, STqReadHandle* pHandle, uint64_t* p
STSchema* pTschema = pHandle->pSchema;
SSchemaWrapper* pSchemaWrapper = pHandle->pSchemaWrapper;
*pNumOfRows = pHandle->msgIter.numOfRows;
*pNumOfRows = pHandle->pBlock->numOfRows;
int32_t colNumNeed = taosArrayGetSize(pHandle->pColIdList);
if (colNumNeed > pSchemaWrapper->nCols) {
......@@ -143,8 +151,8 @@ int32_t tqRetrieveDataBlock(SArray** ppCols, STqReadHandle* pHandle, uint64_t* p
tdSTSRowIterInit(&iter, pTschema);
STSRow* row;
int32_t curRow = 0;
tInitSubmitBlkIterEx(&pHandle->msgIter, pHandle->pBlock, &pHandle->blkIter);
while ((row = tGetSubmitBlkNextEx(&pHandle->blkIter)) != NULL) {
tInitSubmitBlkIter(pHandle->pBlock, &pHandle->blkIter);
while ((row = tGetSubmitBlkNext(&pHandle->blkIter)) != NULL) {
tdSTSRowIterReset(&iter, row);
// get all wanted col of that block
for (int32_t i = 0; i < colActual; i++) {
......
......@@ -227,6 +227,34 @@ int tsdbLoadDataFromCache(STable *pTable, SSkipListIterator *pIter, TSKEY maxKey
return 0;
}
int32_t tdScanAndConvertSubmitMsg(SSubmitReq *pMsg) {
ASSERT(pMsg != NULL);
SSubmitMsgIter msgIter = {0};
SSubmitBlk *pBlock = NULL;
SSubmitBlkIter blkIter = {0};
STSRow *row = NULL;
terrno = TSDB_CODE_SUCCESS;
pMsg->length = htonl(pMsg->length);
pMsg->numOfBlocks = htonl(pMsg->numOfBlocks);
if (tInitSubmitMsgIter(pMsg, &msgIter) < 0) return -1;
while (true) {
if (tGetSubmitMsgNext(&msgIter, &pBlock) < 0) return -1;
if (pBlock == NULL) break;
pBlock->uid = htobe64(pBlock->uid);
pBlock->suid = htobe64(pBlock->suid);
pBlock->sversion = htonl(pBlock->sversion);
pBlock->dataLen = htonl(pBlock->dataLen);
pBlock->schemaLen = htonl(pBlock->schemaLen);
pBlock->numOfRows = htons(pBlock->numOfRows);
}
if (terrno != TSDB_CODE_SUCCESS) return -1;
return 0;
}
static int tsdbScanAndConvertSubmitMsg(STsdb *pTsdb, SSubmitReq *pMsg) {
ASSERT(pMsg != NULL);
// STsdbMeta * pMeta = pTsdb->tsdbMeta;
......
......@@ -3636,6 +3636,8 @@ int32_t tsdbQuerySTableByTagCond(void* pMeta, uint64_t uid, TSKEY skey, const ch
tsdbError("%p failed to get stable, uid:%" PRIu64 ", TID:0x%" PRIx64 " QID:0x%" PRIx64, pMeta, uid, taskId, reqId);
terrno = TSDB_CODE_TDB_INVALID_TABLE_ID;
goto _error;
} else {
tsdbDebug("%p succeed to get stable, uid:%" PRIu64 ", TID:0x%" PRIx64 " QID:0x%" PRIx64, pMeta, uid, taskId, reqId);
}
if (pTbCfg->type != META_SUPER_TABLE) {
......
......@@ -678,9 +678,9 @@ int32_t tsdbUpdateExpiredWindowImpl(STsdb *pTsdb, SSubmitReq *pMsg, int64_t vers
return TSDB_CODE_FAILED;
}
// if (tdScanAndConvertSubmitMsg(pMsg) != TSDB_CODE_SUCCESS) {
// return TSDB_CODE_FAILED;
// }
if (tdScanAndConvertSubmitMsg(pMsg) != TSDB_CODE_SUCCESS) {
return TSDB_CODE_FAILED;
}
if (tsdbCheckAndInitSmaEnv(pTsdb, TSDB_SMA_TYPE_TIME_RANGE) != TSDB_CODE_SUCCESS) {
terrno = TSDB_CODE_TDB_INIT_FAILED;
......@@ -705,25 +705,25 @@ int32_t tsdbUpdateExpiredWindowImpl(STsdb *pTsdb, SSubmitReq *pMsg, int64_t vers
SInterval interval = {0};
TSKEY lastWinSKey = INT64_MIN;
if (tInitSubmitMsgIterEx(pMsg, &msgIter) != TSDB_CODE_SUCCESS) {
if (tInitSubmitMsgIter(pMsg, &msgIter) != TSDB_CODE_SUCCESS) {
return TSDB_CODE_FAILED;
}
while (true) {
tGetSubmitMsgNextEx(&msgIter, &pBlock);
tGetSubmitMsgNext(&msgIter, &pBlock);
if (!pBlock) break;
STSmaWrapper *pSW = NULL;
STSma *pTSma = NULL;
SSubmitBlkIter blkIter = {0};
if (tInitSubmitBlkIterEx(&msgIter, pBlock, &blkIter) != TSDB_CODE_SUCCESS) {
if (tInitSubmitBlkIter(pBlock, &blkIter) != TSDB_CODE_SUCCESS) {
pSW = tdFreeTSmaWrapper(pSW);
break;
}
while (true) {
STSRow *row = tGetSubmitBlkNextEx(&blkIter);
STSRow *row = tGetSubmitBlkNext(&blkIter);
if (!row) {
tdFreeTSmaWrapper(pSW);
break;
......@@ -1763,6 +1763,8 @@ int32_t tsdbRegisterRSma(STsdb *pTsdb, SMeta *pMeta, SVCreateTbReq *pReq) {
if (taosHashPut(SMA_STAT_INFO_HASH(pStat), &pReq->stbCfg.suid, sizeof(tb_uid_t), &pRSmaInfo, sizeof(pRSmaInfo)) !=
TSDB_CODE_SUCCESS) {
return TSDB_CODE_FAILED;
} else {
tsdbDebug("vgId:%d register rsma info succeed for suid:%" PRIi64, REPO_ID(pTsdb), pReq->stbCfg.suid);
}
return TSDB_CODE_SUCCESS;
......@@ -1915,10 +1917,17 @@ static FORCE_INLINE int32_t tsdbUpdateTbUidListImpl(STsdb *pTsdb, tb_uid_t *suid
if (pRSmaInfo->taskInfo[0] && (qUpdateQualifiedTableId(pRSmaInfo->taskInfo[0], tbUids, true) != 0)) {
tsdbError("vgId:%d update tbUidList failed for uid:%" PRIi64 " since %s", REPO_ID(pTsdb), *suid, terrstr(terrno));
return TSDB_CODE_FAILED;
} else {
tsdbDebug("vgId:%d update tbUidList succeed for qTaskInfo:%p with suid:%" PRIi64 ", uid:%" PRIi64, REPO_ID(pTsdb),
pRSmaInfo->taskInfo[0], *suid, *(int64_t *)taosArrayGet(tbUids, 0));
}
if (pRSmaInfo->taskInfo[1] && (qUpdateQualifiedTableId(pRSmaInfo->taskInfo[1], tbUids, true) != 0)) {
tsdbError("vgId:%d update tbUidList failed for uid:%" PRIi64 " since %s", REPO_ID(pTsdb), *suid, terrstr(terrno));
return TSDB_CODE_FAILED;
} else {
tsdbDebug("vgId:%d update tbUidList succeed for qTaskInfo:%p with suid:%" PRIi64 ", uid:%" PRIi64, REPO_ID(pTsdb),
pRSmaInfo->taskInfo[1], *suid, *(int64_t *)taosArrayGet(tbUids, 0));
}
return TSDB_CODE_SUCCESS;
......@@ -1955,7 +1964,7 @@ int32_t tsdbUpdateTbUidList(STsdb *pTsdb, STbUidStore *pStore) {
return TSDB_CODE_SUCCESS;
}
static int32_t tsdbFetchSubmitReqSuids(const SSubmitReq *pMsg, STbUidStore *pStore) {
static int32_t tsdbFetchSubmitReqSuids(SSubmitReq *pMsg, STbUidStore *pStore) {
ASSERT(pMsg != NULL);
SSubmitMsgIter msgIter = {0};
SSubmitBlk *pBlock = NULL;
......@@ -1966,11 +1975,12 @@ static int32_t tsdbFetchSubmitReqSuids(const SSubmitReq *pMsg, STbUidStore *pSto
// pMsg->length = htonl(pMsg->length);
// pMsg->numOfBlocks = htonl(pMsg->numOfBlocks);
if (tInitSubmitMsgIterEx(pMsg, &msgIter) < 0) return -1;
if (tInitSubmitMsgIter(pMsg, &msgIter) < 0) return -1;
while (true) {
if (tGetSubmitMsgNextEx(&msgIter, &pBlock) < 0) return -1;
if (tGetSubmitMsgNext(&msgIter, &pBlock) < 0) return -1;
if (!pBlock) break;
tsdbUidStorePut(pStore, msgIter.suid, NULL);
tsdbUidStorePut(pStore, pBlock->suid, NULL);
}
if (terrno != TSDB_CODE_SUCCESS) return -1;
......@@ -2039,7 +2049,7 @@ int32_t tsdbExecuteRSma(STsdb *pTsdb, SMeta *pMeta, const void *pMsg, int32_t in
return TSDB_CODE_SUCCESS;
}
int32_t tsdbTriggerRSma(STsdb *pTsdb, SMeta *pMeta, const void *pMsg, int32_t inputType) {
int32_t tsdbTriggerRSma(STsdb *pTsdb, SMeta *pMeta, void *pMsg, int32_t inputType) {
SSmaEnv *pEnv = REPO_RSMA_ENV(pTsdb);
if (!pEnv) {
// only applicable when rsma env exists
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册