From 524bb214e758eccb1219e6ec4ca52774cdc33c43 Mon Sep 17 00:00:00 2001 From: liuyao <54liuyao@163.com> Date: Mon, 10 Apr 2023 19:18:56 +0800 Subject: [PATCH] delete invalid code --- source/dnode/vnode/src/tq/tqSink.c | 251 ----------------------------- 1 file changed, 251 deletions(-) diff --git a/source/dnode/vnode/src/tq/tqSink.c b/source/dnode/vnode/src/tq/tqSink.c index 93132293b1..aaf3dc3936 100644 --- a/source/dnode/vnode/src/tq/tqSink.c +++ b/source/dnode/vnode/src/tq/tqSink.c @@ -71,257 +71,6 @@ int32_t tqBuildDeleteReq(SVnode* pVnode, const char* stbFullName, const SSDataBl return 0; } -void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, int64_t ver, void* data) { - const SArray* pBlocks = (const SArray*)data; - SVnode* pVnode = (SVnode*)vnode; - int64_t suid = pTask->tbSink.stbUid; - char* stbFullName = pTask->tbSink.stbFullName; - STSchema* pTSchema = pTask->tbSink.pTSchema; - SSchemaWrapper* pSchemaWrapper = pTask->tbSink.pSchemaWrapper; - - int32_t blockSz = taosArrayGetSize(pBlocks); - - SArray* tagArray = taosArrayInit(1, sizeof(STagVal)); - if (!tagArray) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return; - } - - tqDebug("vgId:%d, task %d write into table, block num: %d", TD_VID(pVnode), pTask->taskId, blockSz); - for (int32_t i = 0; i < blockSz; i++) { - bool createTb = true; - SSDataBlock* pDataBlock = taosArrayGet(pBlocks, i); - if (pDataBlock->info.type == STREAM_DELETE_RESULT) { - SBatchDeleteReq deleteReq = {0}; - deleteReq.deleteReqs = taosArrayInit(0, sizeof(SSingleDeleteReq)); - deleteReq.suid = suid; - tqBuildDeleteReq(pVnode, stbFullName, pDataBlock, &deleteReq); - if (taosArrayGetSize(deleteReq.deleteReqs) == 0) { - taosArrayDestroy(deleteReq.deleteReqs); - continue; - } - - int32_t len; - int32_t code; - tEncodeSize(tEncodeSBatchDeleteReq, &deleteReq, len, code); - if (code < 0) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return; - } - SEncoder encoder; - void* serializedDeleteReq = rpcMallocCont(len + sizeof(SMsgHead)); - void* abuf = POINTER_SHIFT(serializedDeleteReq, sizeof(SMsgHead)); - tEncoderInit(&encoder, abuf, len); - tEncodeSBatchDeleteReq(&encoder, &deleteReq); - tEncoderClear(&encoder); - taosArrayDestroy(deleteReq.deleteReqs); - - ((SMsgHead*)serializedDeleteReq)->vgId = pVnode->config.vgId; - - SRpcMsg msg = { - .msgType = TDMT_VND_BATCH_DEL, - .pCont = serializedDeleteReq, - .contLen = len + sizeof(SMsgHead), - }; - if (tmsgPutToQueue(&pVnode->msgCb, WRITE_QUEUE, &msg) != 0) { - tqDebug("failed to put delete req into write-queue since %s", terrstr()); - } - } else { - char* ctbName = NULL; - // set child table name - if (pDataBlock->info.parTbName[0]) { - ctbName = taosStrdup(pDataBlock->info.parTbName); - } else { - ctbName = buildCtbNameByGroupId(stbFullName, pDataBlock->info.id.groupId); - } - - int32_t schemaLen = 0; - void* schemaStr = NULL; - - int64_t uid = 0; - SMetaReader mr = {0}; - metaReaderInit(&mr, pVnode->pMeta, 0); - if (metaGetTableEntryByName(&mr, ctbName) < 0) { - metaReaderClear(&mr); - tqDebug("vgId:%d, stream write into %s, table auto created", TD_VID(pVnode), ctbName); - - SVCreateTbReq createTbReq = {0}; - - // set const - createTbReq.flags = 0; - createTbReq.type = TSDB_CHILD_TABLE; - createTbReq.ctb.suid = suid; - - // set super table name - SName name = {0}; - tNameFromString(&name, stbFullName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE); - createTbReq.ctb.stbName = taosStrdup((char*)tNameGetTableName(&name)); // taosStrdup(stbFullName); - createTbReq.name = ctbName; - ctbName = NULL; - - // set tag content - taosArrayClear(tagArray); - STagVal tagVal = { - .cid = taosArrayGetSize(pDataBlock->pDataBlock) + 1, - .type = TSDB_DATA_TYPE_UBIGINT, - .i64 = (int64_t)pDataBlock->info.id.groupId, - }; - taosArrayPush(tagArray, &tagVal); - createTbReq.ctb.tagNum = taosArrayGetSize(tagArray); - - STag* pTag = NULL; - tTagNew(tagArray, 1, false, &pTag); - if (pTag == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - taosArrayDestroy(tagArray); - tdDestroySVCreateTbReq(&createTbReq); - return; - } - createTbReq.ctb.pTag = (uint8_t*)pTag; - - // set tag name - SArray* tagName = taosArrayInit(1, TSDB_COL_NAME_LEN); - char tagNameStr[TSDB_COL_NAME_LEN] = {0}; - strcpy(tagNameStr, "group_id"); - taosArrayPush(tagName, tagNameStr); - createTbReq.ctb.tagName = tagName; - - int32_t code; - tEncodeSize(tEncodeSVCreateTbReq, &createTbReq, schemaLen, code); - if (code < 0) { - tdDestroySVCreateTbReq(&createTbReq); - taosArrayDestroy(tagArray); - return; - } - - // set schema str - schemaStr = taosMemoryMalloc(schemaLen); - if (schemaStr == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - tdDestroySVCreateTbReq(&createTbReq); - taosArrayDestroy(tagArray); - return; - } - - SEncoder encoder = {0}; - tEncoderInit(&encoder, schemaStr, schemaLen); - code = tEncodeSVCreateTbReq(&encoder, &createTbReq); - if (code < 0) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - tdDestroySVCreateTbReq(&createTbReq); - taosArrayDestroy(tagArray); - tEncoderClear(&encoder); - taosMemoryFree(schemaStr); - return; - } - tEncoderClear(&encoder); - tdDestroySVCreateTbReq(&createTbReq); - } else { - if (mr.me.type != TSDB_CHILD_TABLE) { - tqError("vgId:%d, failed to write into %s, since table type incorrect, type %d", TD_VID(pVnode), ctbName, - mr.me.type); - metaReaderClear(&mr); - taosMemoryFree(ctbName); - continue; - } - if (mr.me.ctbEntry.suid != suid) { - tqError("vgId:%d, failed to write into %s, since suid mismatch, expect suid: %" PRId64 - ", actual suid %" PRId64 "", - TD_VID(pVnode), ctbName, suid, mr.me.ctbEntry.suid); - metaReaderClear(&mr); - taosMemoryFree(ctbName); - continue; - } - - createTb = false; - uid = mr.me.uid; - metaReaderClear(&mr); - - tqDebug("vgId:%d, stream write, table %s, uid %" PRId64 " already exist, skip create", TD_VID(pVnode), ctbName, - uid); - - taosMemoryFreeClear(ctbName); - } - - int32_t cap = sizeof(SSubmitReq); - - int32_t rows = pDataBlock->info.rows; - int32_t maxLen = TD_ROW_MAX_BYTES_FROM_SCHEMA(pTSchema); - - cap += sizeof(SSubmitBlk) + schemaLen + rows * maxLen; - - SSubmitReq* pSubmit = rpcMallocCont(cap); - pSubmit->header.vgId = pVnode->config.vgId; - pSubmit->length = sizeof(SSubmitReq); - pSubmit->numOfBlocks = htonl(1); - - SSubmitBlk* blkHead = POINTER_SHIFT(pSubmit, sizeof(SSubmitReq)); - - blkHead->numOfRows = htonl(pDataBlock->info.rows); - blkHead->sversion = htonl(pTSchema->version); - blkHead->suid = htobe64(suid); - // uid is assigned by vnode - blkHead->uid = 0; - blkHead->schemaLen = 0; - - tqDebug("tq sink pipe1, convert block2 %d, rows: %d", i, rows); - - int32_t dataLen = 0; - void* blkSchema = POINTER_SHIFT(blkHead, sizeof(SSubmitBlk)); - STSRow* rowData = blkSchema; - if (createTb) { - memcpy(blkSchema, schemaStr, schemaLen); - blkHead->schemaLen = htonl(schemaLen); - rowData = POINTER_SHIFT(blkSchema, schemaLen); - } else { - blkHead->uid = htobe64(uid); - } - - taosMemoryFreeClear(schemaStr); - - for (int32_t j = 0; j < rows; j++) { - SRowBuilder rb = {0}; - tdSRowInit(&rb, pTSchema->version); - tdSRowSetTpInfo(&rb, pTSchema->numOfCols, pTSchema->flen); - tdSRowResetBuf(&rb, rowData); - - for (int32_t k = 0; k < pTSchema->numOfCols; k++) { - const STColumn* pColumn = &pTSchema->columns[k]; - SColumnInfoData* pColData = taosArrayGet(pDataBlock->pDataBlock, k); - if (colDataIsNull_s(pColData, j)) { - tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NULL, NULL, false, pColumn->offset, k); - } else { - void* colData = colDataGetData(pColData, j); - if (k == 0) { - tqDebug("tq sink pipe1, row %d ts %" PRId64, j, *(int64_t*)colData); - } - tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NORM, colData, true, pColumn->offset, k); - } - } - tdSRowEnd(&rb); - int32_t rowLen = TD_ROW_LEN(rowData); - rowData = POINTER_SHIFT(rowData, rowLen); - dataLen += rowLen; - } - blkHead->dataLen = htonl(dataLen); - - pSubmit->length += sizeof(SSubmitBlk) + schemaLen + dataLen; - pSubmit->length = htonl(pSubmit->length); - - SRpcMsg msg = { - .msgType = TDMT_VND_SUBMIT, - .pCont = pSubmit, - .contLen = ntohl(pSubmit->length), - }; - - if (tmsgPutToQueue(&pVnode->msgCb, WRITE_QUEUE, &msg) != 0) { - tqDebug("failed to put into write-queue since %s", terrstr()); - } - } - } - taosArrayDestroy(tagArray); -} - static int32_t encodeCreateChildTableForRPC(SVCreateTbBatchReq* pReqs, int32_t vgId, void** pBuf, int32_t* contLen) { int32_t ret = 0; -- GitLab