提交 173abdee 编写于 作者: C Cary Xu

refactor

上级 86fb5708
...@@ -229,6 +229,14 @@ typedef struct { ...@@ -229,6 +229,14 @@ typedef struct {
typedef struct { typedef struct {
int32_t totalLen; int32_t totalLen;
int32_t len; int32_t len;
// head of SSubmitBlk
int64_t uid; // table unique id
int64_t suid; // stable id
int32_t sversion; // data schema version
int32_t dataLen; // data part length, not including the SSubmitBlk head
int32_t schemaLen; // schema length, if length is 0, no schema exists
int16_t numOfRows; // total number of rows in current submit block
// head of SSubmitBlk
const void* pMsg; const void* pMsg;
} SSubmitMsgIter; } SSubmitMsgIter;
...@@ -237,6 +245,15 @@ int32_t tGetSubmitMsgNext(SSubmitMsgIter* pIter, SSubmitBlk** pPBlock); ...@@ -237,6 +245,15 @@ int32_t tGetSubmitMsgNext(SSubmitMsgIter* pIter, SSubmitBlk** pPBlock);
int32_t tInitSubmitBlkIter(SSubmitBlk* pBlock, SSubmitBlkIter* pIter); int32_t tInitSubmitBlkIter(SSubmitBlk* pBlock, SSubmitBlkIter* pIter);
STSRow* tGetSubmitBlkNext(SSubmitBlkIter* pIter); STSRow* tGetSubmitBlkNext(SSubmitBlkIter* pIter);
// TODO: KEEP one suite of iterator API finally.
// 1) use tInitSubmitMsgIterEx firstly as not decrease the merge conflicts
// 2) replace tInitSubmitMsgIterEx with tInitSubmitMsgIter later
// 3) finally, rename tInitSubmitMsgIterEx to tInitSubmitMsgIter
int32_t tInitSubmitMsgIterEx(const SSubmitReq* pMsg, SSubmitMsgIter* pIter);
int32_t tGetSubmitMsgNextEx(SSubmitMsgIter* pIter, SSubmitBlk** pPBlock);
int32_t tInitSubmitBlkIterEx(SSubmitMsgIter* pMsgIter, SSubmitBlk* pBlock, SSubmitBlkIter* pIter);
STSRow* tGetSubmitBlkNextEx(SSubmitBlkIter* pIter);
typedef struct { typedef struct {
int32_t index; // index of failed block in submit blocks int32_t index; // index of failed block in submit blocks
int32_t vnode; // vnode index of failed block int32_t vnode; // vnode index of failed block
......
...@@ -57,7 +57,7 @@ qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, void* streamReadHandle); ...@@ -57,7 +57,7 @@ qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, void* streamReadHandle);
* @param type * @param type
* @return * @return
*/ */
int32_t qSetStreamInput(qTaskInfo_t tinfo, const void* input, int32_t type, bool converted); int32_t qSetStreamInput(qTaskInfo_t tinfo, const void* input, int32_t type);
/** /**
* Set multiple input data blocks for the stream scan. * Set multiple input data blocks for the stream scan.
...@@ -67,7 +67,7 @@ int32_t qSetStreamInput(qTaskInfo_t tinfo, const void* input, int32_t type, bool ...@@ -67,7 +67,7 @@ int32_t qSetStreamInput(qTaskInfo_t tinfo, const void* input, int32_t type, bool
* @param type * @param type
* @return * @return
*/ */
int32_t qSetMultiStreamInput(qTaskInfo_t tinfo, const void* pBlocks, size_t numOfBlocks, int32_t type, bool converted); int32_t qSetMultiStreamInput(qTaskInfo_t tinfo, const void* pBlocks, size_t numOfBlocks, int32_t type);
/** /**
* Update the table id list, add or remove. * Update the table id list, add or remove.
......
...@@ -94,6 +94,86 @@ STSRow *tGetSubmitBlkNext(SSubmitBlkIter *pIter) { ...@@ -94,6 +94,86 @@ STSRow *tGetSubmitBlkNext(SSubmitBlkIter *pIter) {
} }
} }
// TODO: KEEP one suite of iterator API finally.
// 1) use tInitSubmitMsgIterEx firstly as not decrease the merge conflicts
// 2) replace tInitSubmitMsgIterEx with tInitSubmitMsgIter later
// 3) finally, rename tInitSubmitMsgIterEx to tInitSubmitMsgIter
int32_t tInitSubmitMsgIterEx(const SSubmitReq *pMsg, SSubmitMsgIter *pIter) {
if (pMsg == NULL) {
terrno = TSDB_CODE_TDB_SUBMIT_MSG_MSSED_UP;
return -1;
}
pIter->totalLen = htonl(pMsg->length);
ASSERT(pIter->totalLen > 0);
pIter->len = 0;
pIter->pMsg = pMsg;
if (pIter->totalLen <= sizeof(SSubmitReq)) {
terrno = TSDB_CODE_TDB_SUBMIT_MSG_MSSED_UP;
return -1;
}
return 0;
}
int32_t tGetSubmitMsgNextEx(SSubmitMsgIter *pIter, SSubmitBlk **pPBlock) {
ASSERT(pIter->len >= 0);
if (pIter->len == 0) {
pIter->len += sizeof(SSubmitReq);
} else {
if (pIter->len >= pIter->totalLen) {
ASSERT(0);
}
SSubmitBlk *pSubmitBlk = (SSubmitBlk *)POINTER_SHIFT(pIter->pMsg, pIter->len);
pIter->len += (sizeof(SSubmitBlk) + pIter->dataLen + pIter->schemaLen);
ASSERT(pIter->len > 0);
}
if (pIter->len > pIter->totalLen) {
terrno = TSDB_CODE_TDB_SUBMIT_MSG_MSSED_UP;
*pPBlock = NULL;
return -1;
}
if (pIter->len == pIter->totalLen) {
*pPBlock = NULL;
} else {
*pPBlock = (SSubmitBlk *)POINTER_SHIFT(pIter->pMsg, pIter->len);
pIter->uid = htobe64((*pPBlock)->uid);
pIter->suid = htobe64((*pPBlock)->suid);
pIter->sversion = htonl((*pPBlock)->sversion);
pIter->dataLen = htonl((*pPBlock)->dataLen);
pIter->schemaLen = htonl((*pPBlock)->schemaLen);
pIter->numOfRows = htons((*pPBlock)->numOfRows);
}
return 0;
}
int32_t tInitSubmitBlkIterEx(SSubmitMsgIter *pMsgIter, SSubmitBlk *pBlock, SSubmitBlkIter *pIter) {
if (pMsgIter->dataLen <= 0) return -1;
pIter->totalLen = pMsgIter->dataLen;
pIter->len = 0;
pIter->row = (STSRow *)(pBlock->data + pMsgIter->schemaLen);
return 0;
}
STSRow *tGetSubmitBlkNextEx(SSubmitBlkIter *pIter) {
STSRow *row = pIter->row;
if (pIter->len >= pIter->totalLen) {
return NULL;
} else {
pIter->len += TD_ROW_LEN(row);
if (pIter->len < pIter->totalLen) {
pIter->row = POINTER_SHIFT(row, TD_ROW_LEN(row));
}
return row;
}
}
int32_t tEncodeSEpSet(SCoder *pEncoder, const SEpSet *pEp) { int32_t tEncodeSEpSet(SCoder *pEncoder, const SEpSet *pEp) {
if (tEncodeI8(pEncoder, pEp->inUse) < 0) return -1; if (tEncodeI8(pEncoder, pEp->inUse) < 0) return -1;
if (tEncodeI8(pEncoder, pEp->numOfEps) < 0) return -1; if (tEncodeI8(pEncoder, pEp->numOfEps) < 0) return -1;
...@@ -126,7 +206,6 @@ int32_t tDecodeSQueryNodeAddr(SCoder *pDecoder, SQueryNodeAddr *pAddr) { ...@@ -126,7 +206,6 @@ int32_t tDecodeSQueryNodeAddr(SCoder *pDecoder, SQueryNodeAddr *pAddr) {
return 0; return 0;
} }
int32_t taosEncodeSEpSet(void **buf, const SEpSet *pEp) { int32_t taosEncodeSEpSet(void **buf, const SEpSet *pEp) {
int32_t tlen = 0; int32_t tlen = 0;
tlen += taosEncodeFixedI8(buf, pEp->inUse); tlen += taosEncodeFixedI8(buf, pEp->inUse);
......
...@@ -112,7 +112,6 @@ void tqReadHandleSetColIdList(STqReadHandle *pReadHandle, SArray *pColIdList) ...@@ -112,7 +112,6 @@ void tqReadHandleSetColIdList(STqReadHandle *pReadHandle, SArray *pColIdList)
int tqReadHandleSetTbUidList(STqReadHandle *pHandle, const SArray *tbUidList); int tqReadHandleSetTbUidList(STqReadHandle *pHandle, const SArray *tbUidList);
int tqReadHandleAddTbUidList(STqReadHandle *pHandle, const SArray *tbUidList); int tqReadHandleAddTbUidList(STqReadHandle *pHandle, const SArray *tbUidList);
int32_t tqReadHandleSetMsg(STqReadHandle *pHandle, SSubmitReq *pMsg, int64_t ver); int32_t tqReadHandleSetMsg(STqReadHandle *pHandle, SSubmitReq *pMsg, int64_t ver);
int32_t tqReadHandleSetMsgEx(STqReadHandle *pHandle, SSubmitReq *pMsg, int64_t ver);
bool tqNextDataBlock(STqReadHandle *pHandle); bool tqNextDataBlock(STqReadHandle *pHandle);
int32_t tqRetrieveDataBlock(SArray **ppCols, STqReadHandle *pHandle, uint64_t *pGroupId, int32_t *pNumOfRows, int32_t tqRetrieveDataBlock(SArray **ppCols, STqReadHandle *pHandle, uint64_t *pGroupId, int32_t *pNumOfRows,
int16_t *pNumOfCols); int16_t *pNumOfCols);
......
...@@ -56,7 +56,6 @@ int32_t tsdbInsertTSmaData(STsdb *pTsdb, int64_t indexUid, const char *msg); ...@@ -56,7 +56,6 @@ int32_t tsdbInsertTSmaData(STsdb *pTsdb, int64_t indexUid, const char *msg);
int32_t tsdbDropTSmaData(STsdb *pTsdb, int64_t indexUid); int32_t tsdbDropTSmaData(STsdb *pTsdb, int64_t indexUid);
int32_t tsdbInsertRSmaData(STsdb *pTsdb, char *msg); int32_t tsdbInsertRSmaData(STsdb *pTsdb, char *msg);
void tsdbCleanupReadHandle(tsdbReaderT queryHandle); void tsdbCleanupReadHandle(tsdbReaderT queryHandle);
int32_t tdScanAndConvertSubmitMsg(SSubmitReq *pMsg);
typedef enum { typedef enum {
TSDB_FILE_HEAD = 0, // .head TSDB_FILE_HEAD = 0, // .head
......
...@@ -48,7 +48,8 @@ static FORCE_INLINE int32_t tsdbUidStoreInit(STbUidStore **pStore) { ...@@ -48,7 +48,8 @@ static FORCE_INLINE int32_t tsdbUidStoreInit(STbUidStore **pStore) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t tsdbUidStorePut(STbUidStore *pStore, tb_uid_t suid, tb_uid_t uid); int32_t tsdbUidStorePut(STbUidStore *pStore, tb_uid_t suid, tb_uid_t *uid);
void tsdbUidStoreDestory(STbUidStore *pStore);
void *tsdbUidStoreFree(STbUidStore *pStore); void *tsdbUidStoreFree(STbUidStore *pStore);
int32_t tsdbRegisterRSma(STsdb *pTsdb, SMeta *pMeta, SVCreateTbReq *pReq); int32_t tsdbRegisterRSma(STsdb *pTsdb, SMeta *pMeta, SVCreateTbReq *pReq);
......
...@@ -83,7 +83,7 @@ int32_t tqPushMsgNew(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_ ...@@ -83,7 +83,7 @@ int32_t tqPushMsgNew(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_
if (pExec->subType == TOPIC_SUB_TYPE__TABLE) { if (pExec->subType == TOPIC_SUB_TYPE__TABLE) {
qTaskInfo_t task = pExec->task[workerId]; qTaskInfo_t task = pExec->task[workerId];
ASSERT(task); ASSERT(task);
qSetStreamInput(task, pReq, STREAM_DATA_TYPE_SUBMIT_BLOCK, false); qSetStreamInput(task, pReq, STREAM_DATA_TYPE_SUBMIT_BLOCK);
while (1) { while (1) {
SSDataBlock* pDataBlock = NULL; SSDataBlock* pDataBlock = NULL;
uint64_t ts = 0; uint64_t ts = 0;
...@@ -454,7 +454,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { ...@@ -454,7 +454,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
if (pExec->subType == TOPIC_SUB_TYPE__TABLE) { if (pExec->subType == TOPIC_SUB_TYPE__TABLE) {
qTaskInfo_t task = pExec->task[workerId]; qTaskInfo_t task = pExec->task[workerId];
ASSERT(task); ASSERT(task);
qSetStreamInput(task, pCont, STREAM_DATA_TYPE_SUBMIT_BLOCK, false); qSetStreamInput(task, pCont, STREAM_DATA_TYPE_SUBMIT_BLOCK);
while (1) { while (1) {
SSDataBlock* pDataBlock = NULL; SSDataBlock* pDataBlock = NULL;
uint64_t ts = 0; uint64_t ts = 0;
...@@ -648,7 +648,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { ...@@ -648,7 +648,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
SSubmitReq* pCont = (SSubmitReq*)&pHead->body; SSubmitReq* pCont = (SSubmitReq*)&pHead->body;
qTaskInfo_t task = pTopic->buffer.output[workerId].task; qTaskInfo_t task = pTopic->buffer.output[workerId].task;
ASSERT(task); ASSERT(task);
qSetStreamInput(task, pCont, STREAM_DATA_TYPE_SUBMIT_BLOCK, false); qSetStreamInput(task, pCont, STREAM_DATA_TYPE_SUBMIT_BLOCK);
SArray* pRes = taosArrayInit(0, sizeof(SSDataBlock)); SArray* pRes = taosArrayInit(0, sizeof(SSDataBlock));
while (1) { while (1) {
SSDataBlock* pDataBlock = NULL; SSDataBlock* pDataBlock = NULL;
......
...@@ -33,40 +33,15 @@ STqReadHandle* tqInitSubmitMsgScanner(SMeta* pMeta) { ...@@ -33,40 +33,15 @@ STqReadHandle* tqInitSubmitMsgScanner(SMeta* pMeta) {
int32_t tqReadHandleSetMsg(STqReadHandle* pReadHandle, SSubmitReq* pMsg, int64_t ver) { int32_t tqReadHandleSetMsg(STqReadHandle* pReadHandle, SSubmitReq* pMsg, int64_t ver) {
pReadHandle->pMsg = pMsg; pReadHandle->pMsg = pMsg;
pMsg->length = htonl(pMsg->length);
pMsg->numOfBlocks = htonl(pMsg->numOfBlocks);
// iterate and convert
if (tInitSubmitMsgIter(pMsg, &pReadHandle->msgIter) < 0) return -1;
while (true) {
if (tGetSubmitMsgNext(&pReadHandle->msgIter, &pReadHandle->pBlock) < 0) return -1;
if (pReadHandle->pBlock == NULL) break;
pReadHandle->pBlock->uid = htobe64(pReadHandle->pBlock->uid);
pReadHandle->pBlock->suid = htobe64(pReadHandle->pBlock->suid);
pReadHandle->pBlock->sversion = htonl(pReadHandle->pBlock->sversion);
pReadHandle->pBlock->dataLen = htonl(pReadHandle->pBlock->dataLen);
pReadHandle->pBlock->schemaLen = htonl(pReadHandle->pBlock->schemaLen);
pReadHandle->pBlock->numOfRows = htons(pReadHandle->pBlock->numOfRows);
}
if (tInitSubmitMsgIter(pMsg, &pReadHandle->msgIter) < 0) return -1;
pReadHandle->ver = ver;
memset(&pReadHandle->blkIter, 0, sizeof(SSubmitBlkIter));
return 0;
}
int32_t tqReadHandleSetMsgEx(STqReadHandle* pReadHandle, SSubmitReq* pMsg, int64_t ver) {
pReadHandle->pMsg = pMsg;
// iterate // iterate
if (tInitSubmitMsgIter(pMsg, &pReadHandle->msgIter) < 0) return -1; if (tInitSubmitMsgIterEx(pMsg, &pReadHandle->msgIter) < 0) return -1;
while (true) { while (true) {
if (tGetSubmitMsgNext(&pReadHandle->msgIter, &pReadHandle->pBlock) < 0) return -1; if (tGetSubmitMsgNextEx(&pReadHandle->msgIter, &pReadHandle->pBlock) < 0) return -1;
if (pReadHandle->pBlock == NULL) break; if (pReadHandle->pBlock == NULL) break;
} }
if (tInitSubmitMsgIter(pMsg, &pReadHandle->msgIter) < 0) return -1; if (tInitSubmitMsgIterEx(pMsg, &pReadHandle->msgIter) < 0) return -1;
pReadHandle->ver = ver; pReadHandle->ver = ver;
memset(&pReadHandle->blkIter, 0, sizeof(SSubmitBlkIter)); memset(&pReadHandle->blkIter, 0, sizeof(SSubmitBlkIter));
return 0; return 0;
...@@ -74,7 +49,7 @@ int32_t tqReadHandleSetMsgEx(STqReadHandle* pReadHandle, SSubmitReq* pMsg, int64 ...@@ -74,7 +49,7 @@ int32_t tqReadHandleSetMsgEx(STqReadHandle* pReadHandle, SSubmitReq* pMsg, int64
bool tqNextDataBlock(STqReadHandle* pHandle) { bool tqNextDataBlock(STqReadHandle* pHandle) {
while (1) { while (1) {
if (tGetSubmitMsgNext(&pHandle->msgIter, &pHandle->pBlock) < 0) { if (tGetSubmitMsgNextEx(&pHandle->msgIter, &pHandle->pBlock) < 0) {
return false; return false;
} }
if (pHandle->pBlock == NULL) return false; if (pHandle->pBlock == NULL) return false;
...@@ -82,7 +57,8 @@ bool tqNextDataBlock(STqReadHandle* pHandle) { ...@@ -82,7 +57,8 @@ bool tqNextDataBlock(STqReadHandle* pHandle) {
/*pHandle->pBlock->uid = htobe64(pHandle->pBlock->uid);*/ /*pHandle->pBlock->uid = htobe64(pHandle->pBlock->uid);*/
/*if (pHandle->tbUid == pHandle->pBlock->uid) {*/ /*if (pHandle->tbUid == pHandle->pBlock->uid) {*/
ASSERT(pHandle->tbIdHash); ASSERT(pHandle->tbIdHash);
void* ret = taosHashGet(pHandle->tbIdHash, &pHandle->pBlock->uid, sizeof(int64_t)); // void* ret = taosHashGet(pHandle->tbIdHash, &pHandle->pBlock->uid, sizeof(int64_t));
void* ret = taosHashGet(pHandle->tbIdHash, &pHandle->msgIter.uid, sizeof(int64_t));
if (ret != NULL) { if (ret != NULL) {
/*printf("retrieve one tb %ld\n", pHandle->pBlock->uid);*/ /*printf("retrieve one tb %ld\n", pHandle->pBlock->uid);*/
/*pHandle->pBlock->tid = htonl(pHandle->pBlock->tid);*/ /*pHandle->pBlock->tid = htonl(pHandle->pBlock->tid);*/
...@@ -104,14 +80,14 @@ int32_t tqRetrieveDataBlock(SArray** ppCols, STqReadHandle* pHandle, uint64_t* p ...@@ -104,14 +80,14 @@ int32_t tqRetrieveDataBlock(SArray** ppCols, STqReadHandle* pHandle, uint64_t* p
// TODO set to real sversion // TODO set to real sversion
int32_t sversion = 0; int32_t sversion = 0;
if (pHandle->sver != sversion) { if (pHandle->sver != sversion) {
pHandle->pSchema = metaGetTbTSchema(pHandle->pVnodeMeta, pHandle->pBlock->uid, sversion); pHandle->pSchema = metaGetTbTSchema(pHandle->pVnodeMeta, pHandle->msgIter.uid, sversion);
tb_uid_t quid; tb_uid_t quid;
STbCfg* pTbCfg = metaGetTbInfoByUid(pHandle->pVnodeMeta, pHandle->pBlock->uid); STbCfg* pTbCfg = metaGetTbInfoByUid(pHandle->pVnodeMeta, pHandle->msgIter.uid);
if (pTbCfg->type == META_CHILD_TABLE) { if (pTbCfg->type == META_CHILD_TABLE) {
quid = pTbCfg->ctbCfg.suid; quid = pTbCfg->ctbCfg.suid;
} else { } else {
quid = pHandle->pBlock->uid; quid = pHandle->msgIter.uid;
} }
pHandle->pSchemaWrapper = metaGetTableSchema(pHandle->pVnodeMeta, quid, sversion, true); pHandle->pSchemaWrapper = metaGetTableSchema(pHandle->pVnodeMeta, quid, sversion, true);
pHandle->sver = sversion; pHandle->sver = sversion;
...@@ -120,7 +96,7 @@ int32_t tqRetrieveDataBlock(SArray** ppCols, STqReadHandle* pHandle, uint64_t* p ...@@ -120,7 +96,7 @@ int32_t tqRetrieveDataBlock(SArray** ppCols, STqReadHandle* pHandle, uint64_t* p
STSchema* pTschema = pHandle->pSchema; STSchema* pTschema = pHandle->pSchema;
SSchemaWrapper* pSchemaWrapper = pHandle->pSchemaWrapper; SSchemaWrapper* pSchemaWrapper = pHandle->pSchemaWrapper;
*pNumOfRows = pHandle->pBlock->numOfRows; *pNumOfRows = pHandle->msgIter.numOfRows;
int32_t colNumNeed = taosArrayGetSize(pHandle->pColIdList); int32_t colNumNeed = taosArrayGetSize(pHandle->pColIdList);
if (colNumNeed > pSchemaWrapper->nCols) { if (colNumNeed > pSchemaWrapper->nCols) {
...@@ -167,8 +143,8 @@ int32_t tqRetrieveDataBlock(SArray** ppCols, STqReadHandle* pHandle, uint64_t* p ...@@ -167,8 +143,8 @@ int32_t tqRetrieveDataBlock(SArray** ppCols, STqReadHandle* pHandle, uint64_t* p
tdSTSRowIterInit(&iter, pTschema); tdSTSRowIterInit(&iter, pTschema);
STSRow* row; STSRow* row;
int32_t curRow = 0; int32_t curRow = 0;
tInitSubmitBlkIter(pHandle->pBlock, &pHandle->blkIter); tInitSubmitBlkIterEx(&pHandle->msgIter, pHandle->pBlock, &pHandle->blkIter);
while ((row = tGetSubmitBlkNext(&pHandle->blkIter)) != NULL) { while ((row = tGetSubmitBlkNextEx(&pHandle->blkIter)) != NULL) {
tdSTSRowIterReset(&iter, row); tdSTSRowIterReset(&iter, row);
// get all wanted col of that block // get all wanted col of that block
for (int32_t i = 0; i < colActual; i++) { for (int32_t i = 0; i < colActual; i++) {
......
...@@ -227,34 +227,6 @@ int tsdbLoadDataFromCache(STable *pTable, SSkipListIterator *pIter, TSKEY maxKey ...@@ -227,34 +227,6 @@ int tsdbLoadDataFromCache(STable *pTable, SSkipListIterator *pIter, TSKEY maxKey
return 0; return 0;
} }
int32_t tdScanAndConvertSubmitMsg(SSubmitReq *pMsg) {
ASSERT(pMsg != NULL);
SSubmitMsgIter msgIter = {0};
SSubmitBlk *pBlock = NULL;
SSubmitBlkIter blkIter = {0};
STSRow *row = NULL;
terrno = TSDB_CODE_SUCCESS;
pMsg->length = htonl(pMsg->length);
pMsg->numOfBlocks = htonl(pMsg->numOfBlocks);
if (tInitSubmitMsgIter(pMsg, &msgIter) < 0) return -1;
while (true) {
if (tGetSubmitMsgNext(&msgIter, &pBlock) < 0) return -1;
if (pBlock == NULL) break;
pBlock->uid = htobe64(pBlock->uid);
pBlock->suid = htobe64(pBlock->suid);
pBlock->sversion = htonl(pBlock->sversion);
pBlock->dataLen = htonl(pBlock->dataLen);
pBlock->schemaLen = htonl(pBlock->schemaLen);
pBlock->numOfRows = htons(pBlock->numOfRows);
}
if (terrno != TSDB_CODE_SUCCESS) return -1;
return 0;
}
static int tsdbScanAndConvertSubmitMsg(STsdb *pTsdb, SSubmitReq *pMsg) { static int tsdbScanAndConvertSubmitMsg(STsdb *pTsdb, SSubmitReq *pMsg) {
ASSERT(pMsg != NULL); ASSERT(pMsg != NULL);
// STsdbMeta * pMeta = pTsdb->tsdbMeta; // STsdbMeta * pMeta = pTsdb->tsdbMeta;
......
...@@ -83,7 +83,7 @@ int vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRpcMsg ...@@ -83,7 +83,7 @@ int vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRpcMsg
case TDMT_VND_SUBMIT: case TDMT_VND_SUBMIT:
pRsp->msgType = TDMT_VND_SUBMIT_RSP; pRsp->msgType = TDMT_VND_SUBMIT_RSP;
vnodeProcessSubmitReq(pVnode, ptr, pRsp); vnodeProcessSubmitReq(pVnode, ptr, pRsp);
tsdbTriggerRSma(pVnode->pTsdb, pVnode->pMeta, ptr, STREAM_DATA_TYPE_SUBMIT_BLOCK); tsdbTriggerRSma(pVnode->pTsdb, pVnode->pMeta, pMsg->pCont, STREAM_DATA_TYPE_SUBMIT_BLOCK);
break; break;
case TDMT_VND_MQ_VG_CHANGE: case TDMT_VND_MQ_VG_CHANGE:
if (tqProcessVgChangeReq(pVnode->pTq, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), if (tqProcessVgChangeReq(pVnode->pTq, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)),
...@@ -348,6 +348,8 @@ static int vnodeProcessCreateTbReq(SVnode *pVnode, SRpcMsg *pMsg, void *pReq, SR ...@@ -348,6 +348,8 @@ static int vnodeProcessCreateTbReq(SVnode *pVnode, SRpcMsg *pMsg, void *pReq, SR
} }
} }
tsdbUpdateTbUidList(pVnode->pTsdb, ddlHandle.result);
vTrace("vgId:%d process create %" PRIzu " tables", TD_VID(pVnode), taosArrayGetSize(vCreateTbBatchReq.pArray)); vTrace("vgId:%d process create %" PRIzu " tables", TD_VID(pVnode), taosArrayGetSize(vCreateTbBatchReq.pArray));
taosArrayDestroy(vCreateTbBatchReq.pArray); taosArrayDestroy(vCreateTbBatchReq.pArray);
if (vCreateTbBatchRsp.rspList) { if (vCreateTbBatchRsp.rspList) {
...@@ -360,8 +362,6 @@ static int vnodeProcessCreateTbReq(SVnode *pVnode, SRpcMsg *pMsg, void *pReq, SR ...@@ -360,8 +362,6 @@ static int vnodeProcessCreateTbReq(SVnode *pVnode, SRpcMsg *pMsg, void *pReq, SR
pRsp->contLen = contLen; pRsp->contLen = contLen;
} }
tsdbUpdateTbUidList(pVnode->pTsdb, ddlHandle.result);
return 0; return 0;
} }
......
...@@ -407,7 +407,7 @@ TEST(testCase, tSma_Data_Insert_Query_Test) { ...@@ -407,7 +407,7 @@ TEST(testCase, tSma_Data_Insert_Query_Test) {
} }
} }
EXPECT_EQ(tdScanAndConvertSubmitMsg(pMsg), TSDB_CODE_SUCCESS); // EXPECT_EQ(tdScanAndConvertSubmitMsg(pMsg), TSDB_CODE_SUCCESS);
EXPECT_EQ(tsdbUpdateSmaWindow(pTsdb, pMsg, 0), 0); EXPECT_EQ(tsdbUpdateSmaWindow(pTsdb, pMsg, 0), 0);
......
...@@ -19,7 +19,7 @@ ...@@ -19,7 +19,7 @@
#include "tdatablock.h" #include "tdatablock.h"
#include "vnode.h" #include "vnode.h"
static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t numOfBlocks, int32_t type, char* id, bool converted) { static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t numOfBlocks, int32_t type, char* id) {
ASSERT(pOperator != NULL); ASSERT(pOperator != NULL);
if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) { if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
if (pOperator->numOfDownstream == 0) { if (pOperator->numOfDownstream == 0) {
...@@ -32,7 +32,7 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu ...@@ -32,7 +32,7 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu
return TSDB_CODE_QRY_APP_ERROR; return TSDB_CODE_QRY_APP_ERROR;
} }
pOperator->status = OP_NOT_OPENED; pOperator->status = OP_NOT_OPENED;
return doSetStreamBlock(pOperator->pDownstream[0], input, numOfBlocks, type, id, converted); return doSetStreamBlock(pOperator->pDownstream[0], input, numOfBlocks, type, id);
} else { } else {
pOperator->status = OP_NOT_OPENED; pOperator->status = OP_NOT_OPENED;
...@@ -46,13 +46,8 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu ...@@ -46,13 +46,8 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu
} }
if (type == STREAM_DATA_TYPE_SUBMIT_BLOCK) { if (type == STREAM_DATA_TYPE_SUBMIT_BLOCK) {
if (converted) { if (tqReadHandleSetMsg(pInfo->readerHandle, input, 0) < 0) {
if (tqReadHandleSetMsgEx(pInfo->readerHandle, input, 0) < 0) { qError("submit msg messed up when initing stream block, %s" PRIx64, id);
qError("converted: submit msg messed up when initing stream block, %s" PRIx64, id);
return TSDB_CODE_QRY_APP_ERROR;
}
} else if (tqReadHandleSetMsg(pInfo->readerHandle, input, 0) < 0) {
qError("raw msg: submit msg messed up when initing stream block, %s" PRIx64, id);
return TSDB_CODE_QRY_APP_ERROR; return TSDB_CODE_QRY_APP_ERROR;
} }
} else { } else {
...@@ -72,11 +67,11 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu ...@@ -72,11 +67,11 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu
} }
} }
int32_t qSetStreamInput(qTaskInfo_t tinfo, const void* input, int32_t type, bool converted) { int32_t qSetStreamInput(qTaskInfo_t tinfo, const void* input, int32_t type) {
return qSetMultiStreamInput(tinfo, input, 1, type, converted); return qSetMultiStreamInput(tinfo, input, 1, type);
} }
int32_t qSetMultiStreamInput(qTaskInfo_t tinfo, const void* pBlocks, size_t numOfBlocks, int32_t type, bool converted) { int32_t qSetMultiStreamInput(qTaskInfo_t tinfo, const void* pBlocks, size_t numOfBlocks, int32_t type) {
if (tinfo == NULL) { if (tinfo == NULL) {
return TSDB_CODE_QRY_APP_ERROR; return TSDB_CODE_QRY_APP_ERROR;
} }
...@@ -87,7 +82,7 @@ int32_t qSetMultiStreamInput(qTaskInfo_t tinfo, const void* pBlocks, size_t numO ...@@ -87,7 +82,7 @@ int32_t qSetMultiStreamInput(qTaskInfo_t tinfo, const void* pBlocks, size_t numO
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
int32_t code = doSetStreamBlock(pTaskInfo->pRoot, (void**)pBlocks, numOfBlocks, type, GET_TASKID(pTaskInfo), converted); int32_t code = doSetStreamBlock(pTaskInfo->pRoot, (void**)pBlocks, numOfBlocks, type, GET_TASKID(pTaskInfo));
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
qError("%s failed to set the stream block data", GET_TASKID(pTaskInfo)); qError("%s failed to set the stream block data", GET_TASKID(pTaskInfo));
} else { } else {
......
...@@ -110,7 +110,7 @@ int32_t streamExecTask(SStreamTask* pTask, SMsgCb* pMsgCb, const void* input, in ...@@ -110,7 +110,7 @@ int32_t streamExecTask(SStreamTask* pTask, SMsgCb* pMsgCb, const void* input, in
return -1; return -1;
} }
if (inputType == STREAM_DATA_TYPE_SUBMIT_BLOCK) { if (inputType == STREAM_DATA_TYPE_SUBMIT_BLOCK) {
qSetStreamInput(exec, input, inputType, true); qSetStreamInput(exec, input, inputType);
while (1) { while (1) {
SSDataBlock* output; SSDataBlock* output;
uint64_t ts; uint64_t ts;
...@@ -128,7 +128,7 @@ int32_t streamExecTask(SStreamTask* pTask, SMsgCb* pMsgCb, const void* input, in ...@@ -128,7 +128,7 @@ int32_t streamExecTask(SStreamTask* pTask, SMsgCb* pMsgCb, const void* input, in
/*for (int32_t i = 0; i < sz; i++) {*/ /*for (int32_t i = 0; i < sz; i++) {*/
/*SSDataBlock* pBlock = taosArrayGet(blocks, i);*/ /*SSDataBlock* pBlock = taosArrayGet(blocks, i);*/
/*qSetStreamInput(exec, pBlock, inputType);*/ /*qSetStreamInput(exec, pBlock, inputType);*/
qSetMultiStreamInput(exec, blocks->pData, blocks->size, STREAM_DATA_TYPE_SSDATA_BLOCK, true); qSetMultiStreamInput(exec, blocks->pData, blocks->size, STREAM_DATA_TYPE_SSDATA_BLOCK);
while (1) { while (1) {
SSDataBlock* output; SSDataBlock* output;
uint64_t ts; uint64_t ts;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册