From 3d9cc764898ca38ec281cd30244b47d2dea7cf5c Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Thu, 8 Dec 2022 15:56:17 +0800 Subject: [PATCH] add uid check --- source/dnode/vnode/src/tq/tqSink.c | 98 ++++++++++++++++++------------ 1 file changed, 60 insertions(+), 38 deletions(-) diff --git a/source/dnode/vnode/src/tq/tqSink.c b/source/dnode/vnode/src/tq/tqSink.c index 3e772b1e1c..a35c2c3ef0 100644 --- a/source/dnode/vnode/src/tq/tqSink.c +++ b/source/dnode/vnode/src/tq/tqSink.c @@ -750,12 +750,12 @@ void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, int64_t ver, void* d } void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void* data) { - const SArray* pBlocks = (const SArray*)data; - SVnode* pVnode = (SVnode*)vnode; - int64_t suid = pTask->tbSink.stbUid; - char* stbFullName = pTask->tbSink.stbFullName; - STSchema* pTSchema = pTask->tbSink.pTSchema; - SSchemaWrapper* pSchemaWrapper = pTask->tbSink.pSchemaWrapper; + const SArray* pBlocks = (const SArray*)data; + SVnode* pVnode = (SVnode*)vnode; + int64_t suid = pTask->tbSink.stbUid; + char* stbFullName = pTask->tbSink.stbFullName; + STSchema* pTSchema = pTask->tbSink.pTSchema; + /*SSchemaWrapper* pSchemaWrapper = pTask->tbSink.pSchemaWrapper;*/ int32_t blockSz = taosArrayGetSize(pBlocks); @@ -780,7 +780,6 @@ void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void* } for (int32_t i = 0; i < blockSz; i++) { - bool createTb = true; SSDataBlock* pDataBlock = taosArrayGet(pBlocks, i); if (pDataBlock->info.type == STREAM_DELETE_RESULT) { SBatchDeleteReq deleteReq = {0}; @@ -818,13 +817,31 @@ void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void* tqDebug("failed to put delete req into write-queue since %s", terrstr()); } } else { - SSubmitTbData* pTbData = (SSubmitTbData*)taosMemoryCalloc(1, sizeof(SSubmitTbData)); - if (!pTbData) { - terrno = TSDB_CODE_OUT_OF_MEMORY; + SSubmitTbData tbData = {0}; + int32_t rows = pDataBlock->info.rows; + tqDebug("tq sink, convert block1 %d, rows: %d", i, rows); + + if (!(tbData.aRowP = taosArrayInit(rows, sizeof(SRow*)))) { goto _end; } - // create table req - if (createTb) { + + tbData.suid = suid; + tbData.uid = 0; // uid is assigned by vnode + tbData.sver = pTSchema->version; + + char* ctbName = NULL; + if (pDataBlock->info.parTbName[0]) { + ctbName = strdup(pDataBlock->info.parTbName); + } else { + ctbName = buildCtbNameByGroupId(stbFullName, pDataBlock->info.id.groupId); + } + + SMetaReader mr = {0}; + metaReaderInit(&mr, pVnode->pMeta, 0); + if (metaGetTableEntryByName(&mr, ctbName) < 0) { + metaReaderClear(&mr); + tqDebug("vgId:%d, stream write into %s, table auto created", TD_VID(pVnode), ctbName); + SVCreateTbReq* pCreateTbReq = NULL; if (!(pCreateTbReq = taosMemoryCalloc(1, sizeof(SVCreateStbReq)))) { @@ -867,31 +884,36 @@ void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void* pCreateTbReq->ctb.tagName = tagName; // set table name - if (pDataBlock->info.parTbName[0]) { - pCreateTbReq->name = strdup(pDataBlock->info.parTbName); - } else { - pCreateTbReq->name = buildCtbNameByGroupId(stbFullName, pDataBlock->info.id.groupId); - } - pTbData->pCreateTbReq = pCreateTbReq; - pTbData->flags = SUBMIT_REQ_AUTO_CREATE_TABLE; - } + pCreateTbReq->name = ctbName; + ctbName = NULL; - pTbData->suid = suid; - pTbData->uid = 0; // uid is assigned by vnode - pTbData->sver = pTSchema->version; + tbData.pCreateTbReq = pCreateTbReq; + tbData.flags = SUBMIT_REQ_AUTO_CREATE_TABLE; + } else { + if (mr.me.type != TSDB_CHILD_TABLE) { + tqError("vgId:%d, failed to write into %s, since table type incorrect, type %d", TD_VID(pVnode), ctbName, + mr.me.type); + metaReaderClear(&mr); + taosMemoryFree(ctbName); + continue; + } - int32_t rows = pDataBlock->info.rows; + if (mr.me.ctbEntry.suid != suid) { + tqError("vgId:%d, failed to write into %s, since suid mismatch, expect suid: %" PRId64 + ", actual suid %" PRId64 "", + TD_VID(pVnode), ctbName, suid, mr.me.ctbEntry.suid); + metaReaderClear(&mr); + taosMemoryFree(ctbName); + } - if (!(pTbData->aRowP = taosArrayInit(rows, sizeof(SRow*)))) { - taosMemoryFreeClear(pTbData); - goto _end; + tbData.uid = mr.me.uid; + metaReaderClear(&mr); + taosMemoryFreeClear(ctbName); } - tqDebug("tq sink, convert block1 %d, rows: %d", i, rows); - + // rows if (!pVals && !(pVals = taosArrayInit(pTSchema->numOfCols, sizeof(SColVal)))) { - taosArrayDestroy(pTbData->aRowP); - taosMemoryFree(pTbData); + taosArrayDestroy(tbData.aRowP); goto _end; } @@ -904,14 +926,15 @@ void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void* SColVal cv = COL_VAL_NULL(pCol->colId, pCol->type); taosArrayPush(pVals, &cv); } else { - void* data = colDataGetData(pColData, j); + void* colData = colDataGetData(pColData, j); if (IS_STR_DATA_TYPE(pCol->type)) { - SValue sv = (SValue){.nData = varDataLen(data), .pData = varDataVal(data)}; // address copy, no value + SValue sv = + (SValue){.nData = varDataLen(colData), .pData = varDataVal(colData)}; // address copy, no value SColVal cv = COL_VAL_VALUE(pCol->colId, pCol->type, sv); taosArrayPush(pVals, &cv); } else { SValue sv; - memcpy(&sv.val, data, tDataTypes[pCol->type].bytes); + memcpy(&sv.val, colData, tDataTypes[pCol->type].bytes); SColVal cv = COL_VAL_VALUE(pCol->colId, pCol->type, sv); taosArrayPush(pVals, &cv); } @@ -919,15 +942,14 @@ void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void* } SRow* pRow = NULL; if ((terrno = tRowBuild(pVals, (STSchema*)pTSchema, &pRow)) < 0) { - tDestroySSubmitTbData(pTbData, TSDB_MSG_FLG_ENCODE); + tDestroySSubmitTbData(&tbData, TSDB_MSG_FLG_ENCODE); goto _end; } ASSERT(pRow); - taosArrayPush(pTbData->aRowP, &pRow); + taosArrayPush(tbData.aRowP, &pRow); } - taosArrayPush(pReq->aSubmitTbData, pTbData); - taosMemoryFree(pTbData); + taosArrayPush(pReq->aSubmitTbData, &tbData); // encode int32_t len; -- GitLab