提交 d5cd2d36 编写于 作者: X Xiaoyu Wang

feat: compatible with older versions of wal

上级 b7f4b93d
...@@ -207,8 +207,8 @@ int32_t tqPushMsgNew(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_ ...@@ -207,8 +207,8 @@ int32_t tqPushMsgNew(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_
#endif #endif
int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver) { int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver) {
void* pReq = POINTER_SHIFT(msg, sizeof(SMsgHead)); void* pReq = POINTER_SHIFT(msg, sizeof(SSubmitReq2Msg));
int32_t len = msgLen - sizeof(SMsgHead); int32_t len = msgLen - sizeof(SSubmitReq2Msg);
tqDebug("vgId:%d, tq push msg ver %" PRId64 ", type: %s, p head %p, p body %p, len %d", pTq->pVnode->config.vgId, ver, tqDebug("vgId:%d, tq push msg ver %" PRId64 ", type: %s, p head %p, p body %p, len %d", pTq->pVnode->config.vgId, ver,
TMSG_INFO(msgType), msg, pReq, len); TMSG_INFO(msgType), msg, pReq, len);
......
...@@ -71,7 +71,6 @@ int32_t tqBuildDeleteReq(SVnode* pVnode, const char* stbFullName, const SSDataBl ...@@ -71,7 +71,6 @@ int32_t tqBuildDeleteReq(SVnode* pVnode, const char* stbFullName, const SSDataBl
return 0; return 0;
} }
void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, int64_t ver, void* data) { void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, int64_t ver, void* data) {
const SArray* pBlocks = (const SArray*)data; const SArray* pBlocks = (const SArray*)data;
SVnode* pVnode = (SVnode*)vnode; SVnode* pVnode = (SVnode*)vnode;
...@@ -324,7 +323,7 @@ void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, int64_t ver, void* d ...@@ -324,7 +323,7 @@ void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, int64_t ver, void* d
} }
static int32_t encodeCreateChildTableForRPC(SVCreateTbBatchReq* pReqs, int32_t vgId, void** pBuf, int32_t* contLen) { static int32_t encodeCreateChildTableForRPC(SVCreateTbBatchReq* pReqs, int32_t vgId, void** pBuf, int32_t* contLen) {
int32_t ret = 0; int32_t ret = 0;
tEncodeSize(tEncodeSVCreateTbBatchReq, pReqs, *contLen, ret); tEncodeSize(tEncodeSVCreateTbBatchReq, pReqs, *contLen, ret);
if (ret < 0) { if (ret < 0) {
...@@ -340,7 +339,7 @@ static int32_t encodeCreateChildTableForRPC(SVCreateTbBatchReq* pReqs, int32_t v ...@@ -340,7 +339,7 @@ static int32_t encodeCreateChildTableForRPC(SVCreateTbBatchReq* pReqs, int32_t v
((SMsgHead*)(*pBuf))->vgId = vgId; ((SMsgHead*)(*pBuf))->vgId = vgId;
((SMsgHead*)(*pBuf))->contLen = htonl(*contLen); ((SMsgHead*)(*pBuf))->contLen = htonl(*contLen);
SEncoder coder = {0}; SEncoder coder = {0};
tEncoderInit(&coder, POINTER_SHIFT(*pBuf, sizeof(SMsgHead)), (*contLen) - sizeof(SMsgHead) ); tEncoderInit(&coder, POINTER_SHIFT(*pBuf, sizeof(SMsgHead)), (*contLen) - sizeof(SMsgHead));
if (tEncodeSVCreateTbBatchReq(&coder, pReqs) < 0) { if (tEncodeSVCreateTbBatchReq(&coder, pReqs) < 0) {
rpcFreeCont(*pBuf); rpcFreeCont(*pBuf);
*pBuf = NULL; *pBuf = NULL;
...@@ -440,7 +439,7 @@ void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void* ...@@ -440,7 +439,7 @@ void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void*
goto _end; goto _end;
} }
for (int32_t rowId = 0; rowId < rows; rowId++) { for (int32_t rowId = 0; rowId < rows; rowId++) {
SVCreateTbReq createTbReq = {0}; SVCreateTbReq createTbReq = {0};
SVCreateTbReq* pCreateTbReq = &createTbReq; SVCreateTbReq* pCreateTbReq = &createTbReq;
if (!pCreateTbReq) { if (!pCreateTbReq) {
goto _end; goto _end;
...@@ -482,9 +481,9 @@ void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void* ...@@ -482,9 +481,9 @@ void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void*
} }
for (int32_t tagId = UD_TAG_COLUMN_INDEX, step = 1; tagId < size; tagId++, step++) { for (int32_t tagId = UD_TAG_COLUMN_INDEX, step = 1; tagId < size; tagId++, step++) {
SColumnInfoData* pTagData = taosArrayGet(pDataBlock->pDataBlock, tagId); SColumnInfoData* pTagData = taosArrayGet(pDataBlock->pDataBlock, tagId);
STagVal tagVal = { STagVal tagVal = {
.cid = pTSchema->numOfCols + step, .cid = pTSchema->numOfCols + step,
.type = pTagData->info.type, .type = pTagData->info.type,
}; };
void* pData = colDataGetData(pTagData, rowId); void* pData = colDataGetData(pTagData, rowId);
if (colDataIsNull_s(pTagData, rowId)) { if (colDataIsNull_s(pTagData, rowId)) {
...@@ -514,7 +513,7 @@ void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void* ...@@ -514,7 +513,7 @@ void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void*
SColumnInfoData* pTbColInfo = taosArrayGet(pDataBlock->pDataBlock, UD_TABLE_NAME_COLUMN_INDEX); SColumnInfoData* pTbColInfo = taosArrayGet(pDataBlock->pDataBlock, UD_TABLE_NAME_COLUMN_INDEX);
if (colDataIsNull_s(pTbColInfo, rowId)) { if (colDataIsNull_s(pTbColInfo, rowId)) {
SColumnInfoData* pGpIdColInfo = taosArrayGet(pDataBlock->pDataBlock, UD_GROUPID_COLUMN_INDEX); SColumnInfoData* pGpIdColInfo = taosArrayGet(pDataBlock->pDataBlock, UD_GROUPID_COLUMN_INDEX);
void* pGpIdData = colDataGetData(pGpIdColInfo, rowId); void* pGpIdData = colDataGetData(pGpIdColInfo, rowId);
pCreateTbReq->name = buildCtbNameByGroupId(stbFullName, *(uint64_t*)pGpIdData); pCreateTbReq->name = buildCtbNameByGroupId(stbFullName, *(uint64_t*)pGpIdData);
} else { } else {
void* pTbData = colDataGetData(pTbColInfo, rowId); void* pTbData = colDataGetData(pTbColInfo, rowId);
...@@ -639,16 +638,16 @@ void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void* ...@@ -639,16 +638,16 @@ void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void*
taosArrayClear(pVals); taosArrayClear(pVals);
int32_t dataIndex = 0; int32_t dataIndex = 0;
for (int32_t k = 0; k < pTSchema->numOfCols; k++) { for (int32_t k = 0; k < pTSchema->numOfCols; k++) {
const STColumn* pCol = &pTSchema->columns[k]; const STColumn* pCol = &pTSchema->columns[k];
if (k == 0) { if (k == 0) {
SColumnInfoData* pColData = taosArrayGet(pDataBlock->pDataBlock, dataIndex); SColumnInfoData* pColData = taosArrayGet(pDataBlock->pDataBlock, dataIndex);
void* colData = colDataGetData(pColData, j); void* colData = colDataGetData(pColData, j);
tqDebug("tq sink pipe2, row %d, col %d ts %" PRId64, j, k, *(int64_t*)colData); tqDebug("tq sink pipe2, row %d, col %d ts %" PRId64, j, k, *(int64_t*)colData);
} }
if (IS_SET_NULL(pCol)) { if (IS_SET_NULL(pCol)) {
SColVal cv = COL_VAL_NULL(pCol->colId, pCol->type); SColVal cv = COL_VAL_NULL(pCol->colId, pCol->type);
taosArrayPush(pVals, &cv); taosArrayPush(pVals, &cv);
} else{ } else {
SColumnInfoData* pColData = taosArrayGet(pDataBlock->pDataBlock, dataIndex); SColumnInfoData* pColData = taosArrayGet(pDataBlock->pDataBlock, dataIndex);
if (colDataIsNull_s(pColData, j)) { if (colDataIsNull_s(pColData, j)) {
SColVal cv = COL_VAL_NULL(pCol->colId, pCol->type); SColVal cv = COL_VAL_NULL(pCol->colId, pCol->type);
...@@ -692,14 +691,15 @@ void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void* ...@@ -692,14 +691,15 @@ void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void*
int32_t code; int32_t code;
tEncodeSize(tEncodeSSubmitReq2, &submitReq, len, code); tEncodeSize(tEncodeSSubmitReq2, &submitReq, len, code);
SEncoder encoder; SEncoder encoder;
len += sizeof(SMsgHead); len += sizeof(SSubmitReq2Msg);
pBuf = rpcMallocCont(len); pBuf = rpcMallocCont(len);
if (NULL == pBuf) { if (NULL == pBuf) {
goto _end; goto _end;
} }
((SMsgHead*)pBuf)->vgId = TD_VID(pVnode); ((SSubmitReq2Msg*)pBuf)->header.vgId = TD_VID(pVnode);
((SMsgHead*)pBuf)->contLen = htonl(len); ((SSubmitReq2Msg*)pBuf)->header.contLen = htonl(len);
tEncoderInit(&encoder, POINTER_SHIFT(pBuf, sizeof(SMsgHead)), len - sizeof(SMsgHead)); ((SSubmitReq2Msg*)pBuf)->version = htobe64(1);
tEncoderInit(&encoder, POINTER_SHIFT(pBuf, sizeof(SSubmitReq2Msg)), len - sizeof(SSubmitReq2Msg));
if (tEncodeSSubmitReq2(&encoder, &submitReq) < 0) { if (tEncodeSSubmitReq2(&encoder, &submitReq) < 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
tqError("failed to encode submit req since %s", terrstr()); tqError("failed to encode submit req since %s", terrstr());
......
...@@ -129,14 +129,15 @@ static int32_t submitReqToMsg(int32_t vgId, SSubmitReq2* pReq, void** pData, int ...@@ -129,14 +129,15 @@ static int32_t submitReqToMsg(int32_t vgId, SSubmitReq2* pReq, void** pData, int
tEncodeSize(tEncodeSSubmitReq2, pReq, len, code); tEncodeSize(tEncodeSSubmitReq2, pReq, len, code);
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
SEncoder encoder; SEncoder encoder;
len += sizeof(SMsgHead); len += sizeof(SSubmitReq2Msg);
pBuf = taosMemoryMalloc(len); pBuf = taosMemoryMalloc(len);
if (NULL == pBuf) { if (NULL == pBuf) {
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
} }
((SMsgHead*)pBuf)->vgId = htonl(vgId); ((SSubmitReq2Msg*)pBuf)->header.vgId = htonl(vgId);
((SMsgHead*)pBuf)->contLen = htonl(len); ((SSubmitReq2Msg*)pBuf)->header.contLen = htonl(len);
tEncoderInit(&encoder, POINTER_SHIFT(pBuf, sizeof(SMsgHead)), len - sizeof(SMsgHead)); ((SSubmitReq2Msg*)pBuf)->version = htobe64(1);
tEncoderInit(&encoder, POINTER_SHIFT(pBuf, sizeof(SSubmitReq2Msg)), len - sizeof(SSubmitReq2Msg));
code = tEncodeSSubmitReq2(&encoder, pReq); code = tEncodeSSubmitReq2(&encoder, pReq);
tEncoderClear(&encoder); tEncoderClear(&encoder);
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册