提交 ad9101d3 编写于 作者: C Cary Xu

feat: rollup sma data gen

上级 1d42f790
...@@ -230,12 +230,12 @@ typedef struct { ...@@ -230,12 +230,12 @@ typedef struct {
int32_t totalLen; int32_t totalLen;
int32_t len; int32_t len;
// head of SSubmitBlk // head of SSubmitBlk
// int64_t uid; // table unique id int64_t uid; // table unique id
// int64_t suid; // stable id int64_t suid; // stable id
// int32_t sversion; // data schema version int32_t sversion; // data schema version
// int32_t dataLen; // data part length, not including the SSubmitBlk head int32_t dataLen; // data part length, not including the SSubmitBlk head
// int32_t schemaLen; // schema length, if length is 0, no schema exists int32_t schemaLen; // schema length, if length is 0, no schema exists
// int16_t numOfRows; // total number of rows in current submit block int16_t numOfRows; // total number of rows in current submit block
// head of SSubmitBlk // head of SSubmitBlk
const void* pMsg; const void* pMsg;
} SSubmitMsgIter; } SSubmitMsgIter;
...@@ -249,10 +249,10 @@ STSRow* tGetSubmitBlkNext(SSubmitBlkIter* pIter); ...@@ -249,10 +249,10 @@ STSRow* tGetSubmitBlkNext(SSubmitBlkIter* pIter);
// 1) use tInitSubmitMsgIterEx firstly as not decrease the merge conflicts // 1) use tInitSubmitMsgIterEx firstly as not decrease the merge conflicts
// 2) replace tInitSubmitMsgIterEx with tInitSubmitMsgIter later // 2) replace tInitSubmitMsgIterEx with tInitSubmitMsgIter later
// 3) finally, rename tInitSubmitMsgIterEx to tInitSubmitMsgIter // 3) finally, rename tInitSubmitMsgIterEx to tInitSubmitMsgIter
// int32_t tInitSubmitMsgIterEx(const SSubmitReq* pMsg, SSubmitMsgIter* pIter); int32_t tInitSubmitMsgIterEx(const SSubmitReq* pMsg, SSubmitMsgIter* pIter);
// int32_t tGetSubmitMsgNextEx(SSubmitMsgIter* pIter, SSubmitBlk** pPBlock); int32_t tGetSubmitMsgNextEx(SSubmitMsgIter* pIter, SSubmitBlk** pPBlock);
// int32_t tInitSubmitBlkIterEx(SSubmitMsgIter* pMsgIter, SSubmitBlk* pBlock, SSubmitBlkIter* pIter); int32_t tInitSubmitBlkIterEx(SSubmitMsgIter* pMsgIter, SSubmitBlk* pBlock, SSubmitBlkIter* pIter);
// STSRow* tGetSubmitBlkNextEx(SSubmitBlkIter* pIter); STSRow* tGetSubmitBlkNextEx(SSubmitBlkIter* pIter);
typedef struct { typedef struct {
int32_t index; // index of failed block in submit blocks int32_t index; // index of failed block in submit blocks
......
...@@ -93,7 +93,7 @@ STSRow *tGetSubmitBlkNext(SSubmitBlkIter *pIter) { ...@@ -93,7 +93,7 @@ STSRow *tGetSubmitBlkNext(SSubmitBlkIter *pIter) {
return row; return row;
} }
} }
#if 0
// TODO: KEEP one suite of iterator API finally. // TODO: KEEP one suite of iterator API finally.
// 1) use tInitSubmitMsgIterEx firstly as not decrease the merge conflicts // 1) use tInitSubmitMsgIterEx firstly as not decrease the merge conflicts
// 2) replace tInitSubmitMsgIterEx with tInitSubmitMsgIter later // 2) replace tInitSubmitMsgIterEx with tInitSubmitMsgIter later
...@@ -173,7 +173,7 @@ STSRow *tGetSubmitBlkNextEx(SSubmitBlkIter *pIter) { ...@@ -173,7 +173,7 @@ STSRow *tGetSubmitBlkNextEx(SSubmitBlkIter *pIter) {
return row; return row;
} }
} }
#endif
int32_t tEncodeSEpSet(SCoder *pEncoder, const SEpSet *pEp) { int32_t tEncodeSEpSet(SCoder *pEncoder, const SEpSet *pEp) {
if (tEncodeI8(pEncoder, pEp->inUse) < 0) return -1; if (tEncodeI8(pEncoder, pEp->inUse) < 0) return -1;
if (tEncodeI8(pEncoder, pEp->numOfEps) < 0) return -1; if (tEncodeI8(pEncoder, pEp->numOfEps) < 0) return -1;
......
...@@ -56,7 +56,6 @@ int32_t tsdbInsertTSmaData(STsdb *pTsdb, int64_t indexUid, const char *msg); ...@@ -56,7 +56,6 @@ int32_t tsdbInsertTSmaData(STsdb *pTsdb, int64_t indexUid, const char *msg);
int32_t tsdbDropTSmaData(STsdb *pTsdb, int64_t indexUid); int32_t tsdbDropTSmaData(STsdb *pTsdb, int64_t indexUid);
int32_t tsdbInsertRSmaData(STsdb *pTsdb, char *msg); int32_t tsdbInsertRSmaData(STsdb *pTsdb, char *msg);
void tsdbCleanupReadHandle(tsdbReaderT queryHandle); void tsdbCleanupReadHandle(tsdbReaderT queryHandle);
int32_t tdScanAndConvertSubmitMsg(SSubmitReq *pMsg);
typedef enum { typedef enum {
TSDB_FILE_HEAD = 0, // .head TSDB_FILE_HEAD = 0, // .head
TSDB_FILE_DATA, // .data TSDB_FILE_DATA, // .data
......
...@@ -197,11 +197,10 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver) ...@@ -197,11 +197,10 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver)
} }
memcpy(data, msg, msgLen); memcpy(data, msg, msgLen);
if (msgType == TDMT_VND_SUBMIT) { // make sure msgType == TDMT_VND_SUBMIT
if (tsdbUpdateSmaWindow(pTq->pVnode->pTsdb, msg, ver) != 0) { if (tsdbUpdateSmaWindow(pTq->pVnode->pTsdb, msg, ver) != 0) {
return -1; return -1;
} }
}
SRpcMsg req = { SRpcMsg req = {
.msgType = TDMT_VND_STREAM_TRIGGER, .msgType = TDMT_VND_STREAM_TRIGGER,
......
...@@ -33,24 +33,24 @@ STqReadHandle* tqInitSubmitMsgScanner(SMeta* pMeta) { ...@@ -33,24 +33,24 @@ STqReadHandle* tqInitSubmitMsgScanner(SMeta* pMeta) {
int32_t tqReadHandleSetMsg(STqReadHandle* pReadHandle, SSubmitReq* pMsg, int64_t ver) { int32_t tqReadHandleSetMsg(STqReadHandle* pReadHandle, SSubmitReq* pMsg, int64_t ver) {
pReadHandle->pMsg = pMsg; pReadHandle->pMsg = pMsg;
pMsg->length = htonl(pMsg->length); // pMsg->length = htonl(pMsg->length);
pMsg->numOfBlocks = htonl(pMsg->numOfBlocks); // pMsg->numOfBlocks = htonl(pMsg->numOfBlocks);
// iterate and convert // iterate and convert
if (tInitSubmitMsgIter(pMsg, &pReadHandle->msgIter) < 0) return -1; if (tInitSubmitMsgIterEx(pMsg, &pReadHandle->msgIter) < 0) return -1;
while (true) { while (true) {
if (tGetSubmitMsgNext(&pReadHandle->msgIter, &pReadHandle->pBlock) < 0) return -1; if (tGetSubmitMsgNextEx(&pReadHandle->msgIter, &pReadHandle->pBlock) < 0) return -1;
if (pReadHandle->pBlock == NULL) break; if (pReadHandle->pBlock == NULL) break;
pReadHandle->pBlock->uid = htobe64(pReadHandle->pBlock->uid); // pReadHandle->pBlock->uid = htobe64(pReadHandle->pBlock->uid);
pReadHandle->pBlock->suid = htobe64(pReadHandle->pBlock->suid); // pReadHandle->pBlock->suid = htobe64(pReadHandle->pBlock->suid);
pReadHandle->pBlock->sversion = htonl(pReadHandle->pBlock->sversion); // pReadHandle->pBlock->sversion = htonl(pReadHandle->pBlock->sversion);
pReadHandle->pBlock->dataLen = htonl(pReadHandle->pBlock->dataLen); // pReadHandle->pBlock->dataLen = htonl(pReadHandle->pBlock->dataLen);
pReadHandle->pBlock->schemaLen = htonl(pReadHandle->pBlock->schemaLen); // pReadHandle->pBlock->schemaLen = htonl(pReadHandle->pBlock->schemaLen);
pReadHandle->pBlock->numOfRows = htons(pReadHandle->pBlock->numOfRows); // pReadHandle->pBlock->numOfRows = htons(pReadHandle->pBlock->numOfRows);
} }
if (tInitSubmitMsgIter(pMsg, &pReadHandle->msgIter) < 0) return -1; if (tInitSubmitMsgIterEx(pMsg, &pReadHandle->msgIter) < 0) return -1;
pReadHandle->ver = ver; pReadHandle->ver = ver;
memset(&pReadHandle->blkIter, 0, sizeof(SSubmitBlkIter)); memset(&pReadHandle->blkIter, 0, sizeof(SSubmitBlkIter));
return 0; return 0;
...@@ -58,7 +58,7 @@ int32_t tqReadHandleSetMsg(STqReadHandle* pReadHandle, SSubmitReq* pMsg, int64_t ...@@ -58,7 +58,7 @@ int32_t tqReadHandleSetMsg(STqReadHandle* pReadHandle, SSubmitReq* pMsg, int64_t
bool tqNextDataBlock(STqReadHandle* pHandle) { bool tqNextDataBlock(STqReadHandle* pHandle) {
while (1) { while (1) {
if (tGetSubmitMsgNext(&pHandle->msgIter, &pHandle->pBlock) < 0) { if (tGetSubmitMsgNextEx(&pHandle->msgIter, &pHandle->pBlock) < 0) {
return false; return false;
} }
if (pHandle->pBlock == NULL) return false; if (pHandle->pBlock == NULL) return false;
...@@ -66,7 +66,7 @@ bool tqNextDataBlock(STqReadHandle* pHandle) { ...@@ -66,7 +66,7 @@ bool tqNextDataBlock(STqReadHandle* pHandle) {
/*pHandle->pBlock->uid = htobe64(pHandle->pBlock->uid);*/ /*pHandle->pBlock->uid = htobe64(pHandle->pBlock->uid);*/
/*if (pHandle->tbUid == pHandle->pBlock->uid) {*/ /*if (pHandle->tbUid == pHandle->pBlock->uid) {*/
ASSERT(pHandle->tbIdHash); ASSERT(pHandle->tbIdHash);
void* ret = taosHashGet(pHandle->tbIdHash, &pHandle->pBlock->uid, sizeof(int64_t)); void* ret = taosHashGet(pHandle->tbIdHash, &pHandle->msgIter.uid, sizeof(int64_t));
if (ret != NULL) { if (ret != NULL) {
/*printf("retrieve one tb %ld\n", pHandle->pBlock->uid);*/ /*printf("retrieve one tb %ld\n", pHandle->pBlock->uid);*/
/*pHandle->pBlock->tid = htonl(pHandle->pBlock->tid);*/ /*pHandle->pBlock->tid = htonl(pHandle->pBlock->tid);*/
...@@ -88,16 +88,18 @@ int32_t tqRetrieveDataBlock(SArray** ppCols, STqReadHandle* pHandle, uint64_t* p ...@@ -88,16 +88,18 @@ int32_t tqRetrieveDataBlock(SArray** ppCols, STqReadHandle* pHandle, uint64_t* p
// TODO set to real sversion // TODO set to real sversion
int32_t sversion = 0; int32_t sversion = 0;
if (pHandle->sver != sversion) { if (pHandle->sver != sversion) {
pHandle->pSchema = metaGetTbTSchema(pHandle->pVnodeMeta, pHandle->pBlock->uid, sversion); pHandle->pSchema = metaGetTbTSchema(pHandle->pVnodeMeta, pHandle->msgIter.uid, sversion);
#if 0
tb_uid_t quid; tb_uid_t quid;
STbCfg* pTbCfg = metaGetTbInfoByUid(pHandle->pVnodeMeta, pHandle->pBlock->uid); STbCfg* pTbCfg = metaGetTbInfoByUid(pHandle->pVnodeMeta, pHandle->msgIter.uid);
if (pTbCfg->type == META_CHILD_TABLE) { if (pTbCfg->type == META_CHILD_TABLE) {
quid = pTbCfg->ctbCfg.suid; quid = pTbCfg->ctbCfg.suid;
} else { } else {
quid = pHandle->pBlock->uid; quid = pHandle->msgIter.uid;
} }
pHandle->pSchemaWrapper = metaGetTableSchema(pHandle->pVnodeMeta, quid, sversion, true); pHandle->pSchemaWrapper = metaGetTableSchema(pHandle->pVnodeMeta, quid, sversion, true);
#endif
pHandle->pSchemaWrapper = metaGetTableSchema(pHandle->pVnodeMeta, pHandle->msgIter.suid, sversion, true);
pHandle->sver = sversion; pHandle->sver = sversion;
} }
...@@ -151,8 +153,8 @@ int32_t tqRetrieveDataBlock(SArray** ppCols, STqReadHandle* pHandle, uint64_t* p ...@@ -151,8 +153,8 @@ int32_t tqRetrieveDataBlock(SArray** ppCols, STqReadHandle* pHandle, uint64_t* p
tdSTSRowIterInit(&iter, pTschema); tdSTSRowIterInit(&iter, pTschema);
STSRow* row; STSRow* row;
int32_t curRow = 0; int32_t curRow = 0;
tInitSubmitBlkIter(pHandle->pBlock, &pHandle->blkIter); tInitSubmitBlkIterEx(&pHandle->msgIter, pHandle->pBlock, &pHandle->blkIter);
while ((row = tGetSubmitBlkNext(&pHandle->blkIter)) != NULL) { while ((row = tGetSubmitBlkNextEx(&pHandle->blkIter)) != NULL) {
tdSTSRowIterReset(&iter, row); tdSTSRowIterReset(&iter, row);
// get all wanted col of that block // get all wanted col of that block
for (int32_t i = 0; i < colActual; i++) { for (int32_t i = 0; i < colActual; i++) {
......
...@@ -678,9 +678,6 @@ int32_t tsdbUpdateExpiredWindowImpl(STsdb *pTsdb, SSubmitReq *pMsg, int64_t vers ...@@ -678,9 +678,6 @@ int32_t tsdbUpdateExpiredWindowImpl(STsdb *pTsdb, SSubmitReq *pMsg, int64_t vers
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
} }
if (tdScanAndConvertSubmitMsg(pMsg) != TSDB_CODE_SUCCESS) {
return TSDB_CODE_FAILED;
}
if (tsdbCheckAndInitSmaEnv(pTsdb, TSDB_SMA_TYPE_TIME_RANGE) != TSDB_CODE_SUCCESS) { if (tsdbCheckAndInitSmaEnv(pTsdb, TSDB_SMA_TYPE_TIME_RANGE) != TSDB_CODE_SUCCESS) {
terrno = TSDB_CODE_TDB_INIT_FAILED; terrno = TSDB_CODE_TDB_INIT_FAILED;
...@@ -705,25 +702,25 @@ int32_t tsdbUpdateExpiredWindowImpl(STsdb *pTsdb, SSubmitReq *pMsg, int64_t vers ...@@ -705,25 +702,25 @@ int32_t tsdbUpdateExpiredWindowImpl(STsdb *pTsdb, SSubmitReq *pMsg, int64_t vers
SInterval interval = {0}; SInterval interval = {0};
TSKEY lastWinSKey = INT64_MIN; TSKEY lastWinSKey = INT64_MIN;
if (tInitSubmitMsgIter(pMsg, &msgIter) != TSDB_CODE_SUCCESS) { if (tInitSubmitMsgIterEx(pMsg, &msgIter) != TSDB_CODE_SUCCESS) {
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
} }
while (true) { while (true) {
tGetSubmitMsgNext(&msgIter, &pBlock); tGetSubmitMsgNextEx(&msgIter, &pBlock);
if (!pBlock) break; if (!pBlock) break;
STSmaWrapper *pSW = NULL; STSmaWrapper *pSW = NULL;
STSma *pTSma = NULL; STSma *pTSma = NULL;
SSubmitBlkIter blkIter = {0}; SSubmitBlkIter blkIter = {0};
if (tInitSubmitBlkIter(pBlock, &blkIter) != TSDB_CODE_SUCCESS) { if (tInitSubmitBlkIterEx(&msgIter, pBlock, &blkIter) != TSDB_CODE_SUCCESS) {
pSW = tdFreeTSmaWrapper(pSW); pSW = tdFreeTSmaWrapper(pSW);
break; break;
} }
while (true) { while (true) {
STSRow *row = tGetSubmitBlkNext(&blkIter); STSRow *row = tGetSubmitBlkNextEx(&blkIter);
if (!row) { if (!row) {
tdFreeTSmaWrapper(pSW); tdFreeTSmaWrapper(pSW);
break; break;
...@@ -1791,7 +1788,7 @@ int32_t tsdbUidStorePut(STbUidStore *pStore, tb_uid_t suid, tb_uid_t *uid) { ...@@ -1791,7 +1788,7 @@ int32_t tsdbUidStorePut(STbUidStore *pStore, tb_uid_t suid, tb_uid_t *uid) {
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
} }
} }
if (!taosArrayPush(pStore->tbUids, &uid)) { if (!taosArrayPush(pStore->tbUids, uid)) {
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
} }
} }
...@@ -1806,14 +1803,14 @@ int32_t tsdbUidStorePut(STbUidStore *pStore, tb_uid_t suid, tb_uid_t *uid) { ...@@ -1806,14 +1803,14 @@ int32_t tsdbUidStorePut(STbUidStore *pStore, tb_uid_t suid, tb_uid_t *uid) {
if (uid) { if (uid) {
SArray *uidArray = taosHashGet(pStore->uidHash, &suid, sizeof(tb_uid_t)); SArray *uidArray = taosHashGet(pStore->uidHash, &suid, sizeof(tb_uid_t));
if (uidArray && ((uidArray = *(SArray **)uidArray))) { if (uidArray && ((uidArray = *(SArray **)uidArray))) {
taosArrayPush(uidArray, &uid); taosArrayPush(uidArray, uid);
} else { } else {
SArray *pUidArray = taosArrayInit(1, sizeof(tb_uid_t)); SArray *pUidArray = taosArrayInit(1, sizeof(tb_uid_t));
if (!pUidArray) { if (!pUidArray) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
} }
if (!taosArrayPush(pUidArray, &uid)) { if (!taosArrayPush(pUidArray, uid)) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
} }
...@@ -1975,12 +1972,12 @@ static int32_t tsdbFetchSubmitReqSuids(SSubmitReq *pMsg, STbUidStore *pStore) { ...@@ -1975,12 +1972,12 @@ static int32_t tsdbFetchSubmitReqSuids(SSubmitReq *pMsg, STbUidStore *pStore) {
// pMsg->length = htonl(pMsg->length); // pMsg->length = htonl(pMsg->length);
// pMsg->numOfBlocks = htonl(pMsg->numOfBlocks); // pMsg->numOfBlocks = htonl(pMsg->numOfBlocks);
if (tInitSubmitMsgIter(pMsg, &msgIter) < 0) return -1; if (tInitSubmitMsgIterEx(pMsg, &msgIter) < 0) return -1;
while (true) { while (true) {
if (tGetSubmitMsgNext(&msgIter, &pBlock) < 0) return -1; if (tGetSubmitMsgNextEx(&msgIter, &pBlock) < 0) return -1;
if (!pBlock) break; if (!pBlock) break;
tsdbUidStorePut(pStore, pBlock->suid, NULL); tsdbUidStorePut(pStore, msgIter.suid, NULL);
} }
if (terrno != TSDB_CODE_SUCCESS) return -1; if (terrno != TSDB_CODE_SUCCESS) return -1;
...@@ -2014,6 +2011,8 @@ int32_t tsdbExecuteRSma(STsdb *pTsdb, SMeta *pMeta, const void *pMsg, int32_t in ...@@ -2014,6 +2011,8 @@ int32_t tsdbExecuteRSma(STsdb *pTsdb, SMeta *pMeta, const void *pMsg, int32_t in
if (inputType == STREAM_DATA_TYPE_SUBMIT_BLOCK) { if (inputType == STREAM_DATA_TYPE_SUBMIT_BLOCK) {
if (pRSmaInfo->taskInfo[0]) { if (pRSmaInfo->taskInfo[0]) {
tsdbDebug("vgId:%d execute rsma task for qTaskInfo:%p suid:%" PRIu64, REPO_ID(pTsdb), pRSmaInfo->taskInfo[0],
*suid);
qSetStreamInput(pRSmaInfo->taskInfo[0], pMsg, inputType); qSetStreamInput(pRSmaInfo->taskInfo[0], pMsg, inputType);
while (1) { while (1) {
SSDataBlock *output; SSDataBlock *output;
...@@ -2026,7 +2025,11 @@ int32_t tsdbExecuteRSma(STsdb *pTsdb, SMeta *pMeta, const void *pMsg, int32_t in ...@@ -2026,7 +2025,11 @@ int32_t tsdbExecuteRSma(STsdb *pTsdb, SMeta *pMeta, const void *pMsg, int32_t in
} }
taosArrayPush(pResult, output); taosArrayPush(pResult, output);
} }
if (taosArrayGetSize(pResult) > 0) {
blockDebugShowData(pResult); blockDebugShowData(pResult);
} else {
tsdbWarn("vgId:%d no sma data generated since %s", REPO_ID(pTsdb), tstrerror(terrno));
}
} }
// if (pRSmaInfo->taskInfo[1]) { // if (pRSmaInfo->taskInfo[1]) {
......
...@@ -81,9 +81,9 @@ int vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRpcMsg ...@@ -81,9 +81,9 @@ int vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRpcMsg
case TDMT_VND_DROP_TABLE: case TDMT_VND_DROP_TABLE:
break; break;
case TDMT_VND_SUBMIT: case TDMT_VND_SUBMIT:
tsdbTriggerRSma(pVnode->pTsdb, pVnode->pMeta, ptr, STREAM_DATA_TYPE_SUBMIT_BLOCK);
pRsp->msgType = TDMT_VND_SUBMIT_RSP; pRsp->msgType = TDMT_VND_SUBMIT_RSP;
vnodeProcessSubmitReq(pVnode, ptr, pRsp); vnodeProcessSubmitReq(pVnode, ptr, pRsp);
tsdbTriggerRSma(pVnode->pTsdb, pVnode->pMeta, ptr, STREAM_DATA_TYPE_SUBMIT_BLOCK);
break; break;
case TDMT_VND_MQ_VG_CHANGE: case TDMT_VND_MQ_VG_CHANGE:
if (tqProcessVgChangeReq(pVnode->pTq, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), if (tqProcessVgChangeReq(pVnode->pTq, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)),
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册