提交 1a84d071 编写于 作者: L Liu Jicong

fix(stream): msg deserialize

上级 858868d7
...@@ -99,7 +99,7 @@ void colDataTrim(SColumnInfoData* pColumnInfoData) { ...@@ -99,7 +99,7 @@ void colDataTrim(SColumnInfoData* pColumnInfoData) {
// TODO // TODO
} }
int32_t getJsonValueLen(const char *data) { int32_t getJsonValueLen(const char* data) {
int32_t dataLen = 0; int32_t dataLen = 0;
if (*data == TSDB_DATA_TYPE_NULL) { if (*data == TSDB_DATA_TYPE_NULL) {
dataLen = CHAR_BYTES; dataLen = CHAR_BYTES;
...@@ -137,7 +137,7 @@ int32_t colDataAppend(SColumnInfoData* pColumnInfoData, uint32_t currentRow, con ...@@ -137,7 +137,7 @@ int32_t colDataAppend(SColumnInfoData* pColumnInfoData, uint32_t currentRow, con
int32_t dataLen = 0; int32_t dataLen = 0;
if (type == TSDB_DATA_TYPE_JSON) { if (type == TSDB_DATA_TYPE_JSON) {
dataLen = getJsonValueLen(pData); dataLen = getJsonValueLen(pData);
}else { } else {
dataLen = varDataTLen(pData); dataLen = varDataTLen(pData);
} }
...@@ -1301,7 +1301,7 @@ static void doShiftBitmap(char* nullBitmap, size_t n, size_t total) { ...@@ -1301,7 +1301,7 @@ static void doShiftBitmap(char* nullBitmap, size_t n, size_t total) {
} }
} else if (n > 8) { } else if (n > 8) {
int32_t gap = len - newLen; int32_t gap = len - newLen;
while(i < newLen) { while (i < newLen) {
uint8_t v = p[i + gap]; uint8_t v = p[i + gap];
p[i] = (v << tail); p[i] = (v << tail);
...@@ -1316,7 +1316,6 @@ static void doShiftBitmap(char* nullBitmap, size_t n, size_t total) { ...@@ -1316,7 +1316,6 @@ static void doShiftBitmap(char* nullBitmap, size_t n, size_t total) {
} }
} }
static void colDataTrimFirstNRows(SColumnInfoData* pColInfoData, size_t n, size_t total) { static void colDataTrimFirstNRows(SColumnInfoData* pColInfoData, size_t n, size_t total) {
if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) { if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) {
memmove(pColInfoData->varmeta.offset, &pColInfoData->varmeta.offset[n], (total - n) * sizeof(int32_t)); memmove(pColInfoData->varmeta.offset, &pColInfoData->varmeta.offset[n], (total - n) * sizeof(int32_t));
...@@ -1544,7 +1543,8 @@ void blockDebugShowData(const SArray* dataBlocks, const char* flag) { ...@@ -1544,7 +1543,8 @@ void blockDebugShowData(const SArray* dataBlocks, const char* flag) {
* *
* TODO: colId should be set * TODO: colId should be set
*/ */
int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SArray* pDataBlocks, STSchema* pTSchema, int32_t vgId, tb_uid_t suid) { int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SArray* pDataBlocks, STSchema* pTSchema, int32_t vgId,
tb_uid_t suid) {
int32_t sz = taosArrayGetSize(pDataBlocks); int32_t sz = taosArrayGetSize(pDataBlocks);
int32_t bufSize = sizeof(SSubmitReq); int32_t bufSize = sizeof(SSubmitReq);
for (int32_t i = 0; i < sz; ++i) { for (int32_t i = 0; i < sz; ++i) {
...@@ -1600,15 +1600,18 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SArray* pDataBlocks ...@@ -1600,15 +1600,18 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SArray* pDataBlocks
offset, k); offset, k);
} else { } else {
tdAppendColValToRow(&rb, PRIMARYKEY_TIMESTAMP_COL_ID + k, TSDB_DATA_TYPE_TIMESTAMP, TD_VTYPE_NORM, var, true, offset, k); tdAppendColValToRow(&rb, PRIMARYKEY_TIMESTAMP_COL_ID + k, TSDB_DATA_TYPE_TIMESTAMP, TD_VTYPE_NORM, var,
true, offset, k);
} }
break; break;
case TSDB_DATA_TYPE_NCHAR: { case TSDB_DATA_TYPE_NCHAR: {
tdAppendColValToRow(&rb, PRIMARYKEY_TIMESTAMP_COL_ID + k, TSDB_DATA_TYPE_NCHAR, TD_VTYPE_NORM, var, true, offset, k); tdAppendColValToRow(&rb, PRIMARYKEY_TIMESTAMP_COL_ID + k, TSDB_DATA_TYPE_NCHAR, TD_VTYPE_NORM, var, true,
offset, k);
break; break;
} }
case TSDB_DATA_TYPE_VARCHAR: { // TSDB_DATA_TYPE_BINARY case TSDB_DATA_TYPE_VARCHAR: { // TSDB_DATA_TYPE_BINARY
tdAppendColValToRow(&rb, PRIMARYKEY_TIMESTAMP_COL_ID + k, TSDB_DATA_TYPE_VARCHAR, TD_VTYPE_NORM, var, true, offset, k); tdAppendColValToRow(&rb, PRIMARYKEY_TIMESTAMP_COL_ID + k, TSDB_DATA_TYPE_VARCHAR, TD_VTYPE_NORM, var, true,
offset, k);
break; break;
} }
case TSDB_DATA_TYPE_VARBINARY: case TSDB_DATA_TYPE_VARBINARY:
...@@ -1620,7 +1623,8 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SArray* pDataBlocks ...@@ -1620,7 +1623,8 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SArray* pDataBlocks
break; break;
default: default:
if (pColInfoData->info.type < TSDB_DATA_TYPE_MAX && pColInfoData->info.type > TSDB_DATA_TYPE_NULL) { if (pColInfoData->info.type < TSDB_DATA_TYPE_MAX && pColInfoData->info.type > TSDB_DATA_TYPE_NULL) {
tdAppendColValToRow(&rb, PRIMARYKEY_TIMESTAMP_COL_ID + k, pColInfoData->info.type, TD_VTYPE_NORM, var, true, offset, k); tdAppendColValToRow(&rb, PRIMARYKEY_TIMESTAMP_COL_ID + k, pColInfoData->info.type, TD_VTYPE_NORM, var,
true, offset, k);
} else { } else {
uError("the column type %" PRIi16 " is undefined\n", pColInfoData->info.type); uError("the column type %" PRIi16 " is undefined\n", pColInfoData->info.type);
TASSERT(0); TASSERT(0);
...@@ -1667,7 +1671,7 @@ SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pTSchema, boo ...@@ -1667,7 +1671,7 @@ SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pTSchema, boo
const char* stbFullName, int32_t vgId) { const char* stbFullName, int32_t vgId) {
SSubmitReq* ret = NULL; SSubmitReq* ret = NULL;
SArray* tagArray = taosArrayInit(1, sizeof(STagVal)); SArray* tagArray = taosArrayInit(1, sizeof(STagVal));
if(!tagArray) { if (!tagArray) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL; return NULL;
} }
...@@ -1692,8 +1696,6 @@ SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pTSchema, boo ...@@ -1692,8 +1696,6 @@ SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pTSchema, boo
createTbReq.type = TSDB_CHILD_TABLE; createTbReq.type = TSDB_CHILD_TABLE;
createTbReq.ctb.suid = suid; createTbReq.ctb.suid = suid;
STagVal tagVal = {.cid = 1, STagVal tagVal = {.cid = 1,
.type = TSDB_DATA_TYPE_UBIGINT, .type = TSDB_DATA_TYPE_UBIGINT,
.pData = (uint8_t*)&pDataBlock->info.groupId, .pData = (uint8_t*)&pDataBlock->info.groupId,
...@@ -1835,7 +1837,8 @@ SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pTSchema, boo ...@@ -1835,7 +1837,8 @@ SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pTSchema, boo
return ret; return ret;
} }
void blockCompressEncode(const SSDataBlock* pBlock, char* data, int32_t* dataLen, int32_t numOfCols, int8_t needCompress) { void blockCompressEncode(const SSDataBlock* pBlock, char* data, int32_t* dataLen, int32_t numOfCols,
int8_t needCompress) {
int32_t* actualLen = (int32_t*)data; int32_t* actualLen = (int32_t*)data;
data += sizeof(int32_t); data += sizeof(int32_t);
......
...@@ -27,6 +27,8 @@ int32_t streamExec(SStreamTask* pTask, SMsgCb* pMsgCb); ...@@ -27,6 +27,8 @@ int32_t streamExec(SStreamTask* pTask, SMsgCb* pMsgCb);
int32_t streamSink1(SStreamTask* pTask, SMsgCb* pMsgCb); int32_t streamSink1(SStreamTask* pTask, SMsgCb* pMsgCb);
int32_t streamDispatch(SStreamTask* pTask, SMsgCb* pMsgCb, SStreamDataBlock* data); int32_t streamDispatch(SStreamTask* pTask, SMsgCb* pMsgCb, SStreamDataBlock* data);
int32_t streamDispatchReqToData(const SStreamDispatchReq* pReq, SStreamDataBlock* pData);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -35,18 +35,19 @@ int32_t streamTriggerByWrite(SStreamTask* pTask, int32_t vgId, SMsgCb* pMsgCb) { ...@@ -35,18 +35,19 @@ int32_t streamTriggerByWrite(SStreamTask* pTask, int32_t vgId, SMsgCb* pMsgCb) {
return 0; return 0;
} }
#if 1
int32_t streamTaskEnqueue(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg* pRsp) { int32_t streamTaskEnqueue(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg* pRsp) {
SStreamDataBlock* pBlock = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM); SStreamDataBlock* pData = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM);
int8_t status; int8_t status;
// enqueue // enqueue
if (pBlock != NULL) { if (pData != NULL) {
pBlock->type = STREAM_DATA_TYPE_SSDATA_BLOCK; pData->type = STREAM_DATA_TYPE_SSDATA_BLOCK;
pBlock->sourceVg = pReq->sourceVg; pData->sourceVg = pReq->sourceVg;
pBlock->blocks = pReq->data; // decode
/*pData->blocks = pReq->data;*/
/*pBlock->sourceVer = pReq->sourceVer;*/ /*pBlock->sourceVer = pReq->sourceVer;*/
if (streamTaskInput(pTask, (SStreamQueueItem*)pBlock) == 0) { streamDispatchReqToData(pReq, pData);
if (streamTaskInput(pTask, (SStreamQueueItem*)pData) == 0) {
status = TASK_INPUT_STATUS__NORMAL; status = TASK_INPUT_STATUS__NORMAL;
} else { } else {
status = TASK_INPUT_STATUS__FAILED; status = TASK_INPUT_STATUS__FAILED;
...@@ -68,7 +69,6 @@ int32_t streamTaskEnqueue(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg* ...@@ -68,7 +69,6 @@ int32_t streamTaskEnqueue(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg*
tmsgSendRsp(pRsp); tmsgSendRsp(pRsp);
return status == TASK_INPUT_STATUS__NORMAL ? 0 : -1; return status == TASK_INPUT_STATUS__NORMAL ? 0 : -1;
} }
#endif
int32_t streamProcessDispatchReq(SStreamTask* pTask, SMsgCb* pMsgCb, SStreamDispatchReq* pReq, SRpcMsg* pRsp) { int32_t streamProcessDispatchReq(SStreamTask* pTask, SMsgCb* pMsgCb, SStreamDispatchReq* pReq, SRpcMsg* pRsp) {
// 1. handle input // 1. handle input
......
...@@ -36,6 +36,29 @@ void* streamDataBlockDecode(const void* buf, SStreamDataBlock* pInput) { ...@@ -36,6 +36,29 @@ void* streamDataBlockDecode(const void* buf, SStreamDataBlock* pInput) {
} }
#endif #endif
int32_t streamDispatchReqToData(const SStreamDispatchReq* pReq, SStreamDataBlock* pData) {
int32_t blockNum = pReq->blockNum;
SArray* pArray = taosArrayInit(blockNum, sizeof(SSDataBlock));
if (pArray == NULL) {
return -1;
}
taosArraySetSize(pArray, blockNum);
ASSERT(pReq->blockNum == taosArrayGetSize(pReq->data));
ASSERT(pReq->blockNum == taosArrayGetSize(pReq->dataLen));
for (int32_t i = 0; i < blockNum; i++) {
int32_t len = *(int32_t*)taosArrayGet(pReq->dataLen, i);
SRetrieveTableRsp* pRetrieve = taosArrayGetP(pReq->data, i);
SSDataBlock* pDataBlock = taosArrayGet(pArray, i);
blockCompressDecode(pDataBlock, htonl(pRetrieve->numOfCols), htonl(pRetrieve->numOfRows), pRetrieve->data);
// TODO: refactor
pDataBlock->info.childId = pReq->sourceChildId;
}
pData->blocks = pArray;
return 0;
}
SStreamDataSubmit* streamDataSubmitNew(SSubmitReq* pReq) { SStreamDataSubmit* streamDataSubmitNew(SSubmitReq* pReq) {
SStreamDataSubmit* pDataSubmit = (SStreamDataSubmit*)taosAllocateQitem(sizeof(SStreamDataSubmit), DEF_QITEM); SStreamDataSubmit* pDataSubmit = (SStreamDataSubmit*)taosAllocateQitem(sizeof(SStreamDataSubmit), DEF_QITEM);
if (pDataSubmit == NULL) return NULL; if (pDataSubmit == NULL) return NULL;
......
...@@ -71,6 +71,7 @@ static int32_t streamAddBlockToDispatchMsg(const SSDataBlock* pBlock, SStreamDis ...@@ -71,6 +71,7 @@ static int32_t streamAddBlockToDispatchMsg(const SSDataBlock* pBlock, SStreamDis
pRetrieve->compressed = 0; pRetrieve->compressed = 0;
pRetrieve->completed = 1; pRetrieve->completed = 1;
pRetrieve->numOfRows = htonl(pBlock->info.rows); pRetrieve->numOfRows = htonl(pBlock->info.rows);
pRetrieve->numOfCols = htonl(pBlock->info.numOfCols);
int32_t actualLen = 0; int32_t actualLen = 0;
blockCompressEncode(pBlock, pRetrieve->data, &actualLen, pBlock->info.numOfCols, false); blockCompressEncode(pBlock, pRetrieve->data, &actualLen, pBlock->info.numOfCols, false);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册