未验证 提交 ac1f6021 编写于 作者: L Liu Jicong 提交者: GitHub

Merge pull request #13542 from taosdata/feature/stream

fix(stream): msg deserialize
...@@ -1849,11 +1849,12 @@ SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pTSchema, boo ...@@ -1849,11 +1849,12 @@ SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pTSchema, boo
} }
ret->length = htonl(ret->length); ret->length = htonl(ret->length);
taosArrayDestroy(tagArray); taosArrayDestroy(tagArray);
return ret; return ret;
} }
void blockCompressEncode(const SSDataBlock* pBlock, char* data, int32_t* dataLen, int32_t numOfCols, int8_t needCompress) { void blockCompressEncode(const SSDataBlock* pBlock, char* data, int32_t* dataLen, int32_t numOfCols,
int8_t needCompress) {
int32_t* actualLen = (int32_t*)data; int32_t* actualLen = (int32_t*)data;
data += sizeof(int32_t); data += sizeof(int32_t);
...@@ -1947,4 +1948,4 @@ const char* blockCompressDecode(SSDataBlock* pBlock, int32_t numOfCols, int32_t ...@@ -1947,4 +1948,4 @@ const char* blockCompressDecode(SSDataBlock* pBlock, int32_t numOfCols, int32_t
ASSERT(pStart - pData == dataLen); ASSERT(pStart - pData == dataLen);
return pStart; return pStart;
} }
\ No newline at end of file
...@@ -27,6 +27,8 @@ int32_t streamExec(SStreamTask* pTask, SMsgCb* pMsgCb); ...@@ -27,6 +27,8 @@ int32_t streamExec(SStreamTask* pTask, SMsgCb* pMsgCb);
int32_t streamSink1(SStreamTask* pTask, SMsgCb* pMsgCb); int32_t streamSink1(SStreamTask* pTask, SMsgCb* pMsgCb);
int32_t streamDispatch(SStreamTask* pTask, SMsgCb* pMsgCb, SStreamDataBlock* data); int32_t streamDispatch(SStreamTask* pTask, SMsgCb* pMsgCb, SStreamDataBlock* data);
int32_t streamDispatchReqToData(const SStreamDispatchReq* pReq, SStreamDataBlock* pData);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -35,18 +35,19 @@ int32_t streamTriggerByWrite(SStreamTask* pTask, int32_t vgId, SMsgCb* pMsgCb) { ...@@ -35,18 +35,19 @@ int32_t streamTriggerByWrite(SStreamTask* pTask, int32_t vgId, SMsgCb* pMsgCb) {
return 0; return 0;
} }
#if 1
int32_t streamTaskEnqueue(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg* pRsp) { int32_t streamTaskEnqueue(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg* pRsp) {
SStreamDataBlock* pBlock = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM); SStreamDataBlock* pData = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM);
int8_t status; int8_t status;
// enqueue // enqueue
if (pBlock != NULL) { if (pData != NULL) {
pBlock->type = STREAM_DATA_TYPE_SSDATA_BLOCK; pData->type = STREAM_DATA_TYPE_SSDATA_BLOCK;
pBlock->sourceVg = pReq->sourceVg; pData->sourceVg = pReq->sourceVg;
pBlock->blocks = pReq->data; // decode
/*pData->blocks = pReq->data;*/
/*pBlock->sourceVer = pReq->sourceVer;*/ /*pBlock->sourceVer = pReq->sourceVer;*/
if (streamTaskInput(pTask, (SStreamQueueItem*)pBlock) == 0) { streamDispatchReqToData(pReq, pData);
if (streamTaskInput(pTask, (SStreamQueueItem*)pData) == 0) {
status = TASK_INPUT_STATUS__NORMAL; status = TASK_INPUT_STATUS__NORMAL;
} else { } else {
status = TASK_INPUT_STATUS__FAILED; status = TASK_INPUT_STATUS__FAILED;
...@@ -68,7 +69,6 @@ int32_t streamTaskEnqueue(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg* ...@@ -68,7 +69,6 @@ int32_t streamTaskEnqueue(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg*
tmsgSendRsp(pRsp); tmsgSendRsp(pRsp);
return status == TASK_INPUT_STATUS__NORMAL ? 0 : -1; return status == TASK_INPUT_STATUS__NORMAL ? 0 : -1;
} }
#endif
int32_t streamProcessDispatchReq(SStreamTask* pTask, SMsgCb* pMsgCb, SStreamDispatchReq* pReq, SRpcMsg* pRsp) { int32_t streamProcessDispatchReq(SStreamTask* pTask, SMsgCb* pMsgCb, SStreamDispatchReq* pReq, SRpcMsg* pRsp) {
// 1. handle input // 1. handle input
......
...@@ -36,6 +36,29 @@ void* streamDataBlockDecode(const void* buf, SStreamDataBlock* pInput) { ...@@ -36,6 +36,29 @@ void* streamDataBlockDecode(const void* buf, SStreamDataBlock* pInput) {
} }
#endif #endif
int32_t streamDispatchReqToData(const SStreamDispatchReq* pReq, SStreamDataBlock* pData) {
int32_t blockNum = pReq->blockNum;
SArray* pArray = taosArrayInit(blockNum, sizeof(SSDataBlock));
if (pArray == NULL) {
return -1;
}
taosArraySetSize(pArray, blockNum);
ASSERT(pReq->blockNum == taosArrayGetSize(pReq->data));
ASSERT(pReq->blockNum == taosArrayGetSize(pReq->dataLen));
for (int32_t i = 0; i < blockNum; i++) {
int32_t len = *(int32_t*)taosArrayGet(pReq->dataLen, i);
SRetrieveTableRsp* pRetrieve = taosArrayGetP(pReq->data, i);
SSDataBlock* pDataBlock = taosArrayGet(pArray, i);
blockCompressDecode(pDataBlock, htonl(pRetrieve->numOfCols), htonl(pRetrieve->numOfRows), pRetrieve->data);
// TODO: refactor
pDataBlock->info.childId = pReq->sourceChildId;
}
pData->blocks = pArray;
return 0;
}
SStreamDataSubmit* streamDataSubmitNew(SSubmitReq* pReq) { SStreamDataSubmit* streamDataSubmitNew(SSubmitReq* pReq) {
SStreamDataSubmit* pDataSubmit = (SStreamDataSubmit*)taosAllocateQitem(sizeof(SStreamDataSubmit), DEF_QITEM); SStreamDataSubmit* pDataSubmit = (SStreamDataSubmit*)taosAllocateQitem(sizeof(SStreamDataSubmit), DEF_QITEM);
if (pDataSubmit == NULL) return NULL; if (pDataSubmit == NULL) return NULL;
......
...@@ -71,6 +71,7 @@ static int32_t streamAddBlockToDispatchMsg(const SSDataBlock* pBlock, SStreamDis ...@@ -71,6 +71,7 @@ static int32_t streamAddBlockToDispatchMsg(const SSDataBlock* pBlock, SStreamDis
pRetrieve->compressed = 0; pRetrieve->compressed = 0;
pRetrieve->completed = 1; pRetrieve->completed = 1;
pRetrieve->numOfRows = htonl(pBlock->info.rows); pRetrieve->numOfRows = htonl(pBlock->info.rows);
pRetrieve->numOfCols = htonl(pBlock->info.numOfCols);
int32_t actualLen = 0; int32_t actualLen = 0;
blockCompressEncode(pBlock, pRetrieve->data, &actualLen, pBlock->info.numOfCols, false); blockCompressEncode(pBlock, pRetrieve->data, &actualLen, pBlock->info.numOfCols, false);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册