diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index 73f7c2bcc4115ba784ae98b2b74e46a74f8cb263..51005e3c30b0a00fce5a8ddb10735549f07c5261 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -709,6 +709,7 @@ void tmqSendHbReq(void* param, void* tmrId) { int64_t refId = *(int64_t*)param; tmq_t* tmq = taosAcquireRef(tmqMgmt.rsetId, refId); if (tmq == NULL) { + taosMemoryFree(param); return; } int64_t consumerId = tmq->consumerId; @@ -939,10 +940,9 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) { return NULL; } - int64_t* pRefId = taosMemoryMalloc(sizeof(int64_t)); - *pRefId = pTmq->refId; - if (pTmq->hbBgEnable) { + int64_t* pRefId = taosMemoryMalloc(sizeof(int64_t)); + *pRefId = pTmq->refId; pTmq->hbLiveTimer = taosTmrStart(tmqSendHbReq, 1000, pRefId, tmqMgmt.timer); } diff --git a/source/dnode/vnode/src/inc/tq.h b/source/dnode/vnode/src/inc/tq.h index f96afe6fba1b081d9511a3fdd6503be73fa1bdc3..69e6cdce9f25159247ffc4b331d7a7ef701e58fa 100644 --- a/source/dnode/vnode/src/inc/tq.h +++ b/source/dnode/vnode/src/inc/tq.h @@ -115,7 +115,6 @@ typedef struct { typedef struct { SMqDataRsp dataRsp; - SMqRspHead rspHead; char subKey[TSDB_SUBSCRIBE_KEY_LEN]; SRpcHandleInfo pInfo; } STqPushEntry; @@ -183,6 +182,7 @@ int32_t tqOffsetCommitFile(STqOffsetStore* pStore); // tqSink void tqTableSink(SStreamTask* pTask, void* vnode, int64_t ver, void* data); +void tqTableSink1(SStreamTask* pTask, void* vnode, int64_t ver, void* data); // tqOffset char* tqOffsetBuildFName(const char* path, int32_t ver); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index aaa57d49edaa30a423239722890b373fb9a9e26e..5e1cc150637f9acd2d6ff9e60717911cd4f252f6 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -192,7 +192,7 @@ int32_t tqPushDataRsp(STQ* pTq, STqPushEntry* pPushEntry) { return -1; } - memcpy(buf, &pPushEntry->rspHead, sizeof(SMqRspHead)); + memcpy(buf, &pPushEntry->dataRsp.head, sizeof(SMqRspHead)); void* abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead)); @@ -215,7 +215,7 @@ int32_t tqPushDataRsp(STQ* pTq, STqPushEntry* pPushEntry) { tFormatOffset(buf1, 80, &pRsp->reqOffset); tFormatOffset(buf2, 80, &pRsp->rspOffset); tqDebug("vgId:%d, from consumer:%" PRId64 ", (epoch %d) push rsp, block num: %d, reqOffset:%s, rspOffset:%s", - TD_VID(pTq->pVnode), pPushEntry->rspHead.consumerId, pRsp->head.epoch, pRsp->blockNum, buf1, buf2); + TD_VID(pTq->pVnode), pRsp->head.consumerId, pRsp->head.epoch, pRsp->blockNum, buf1, buf2); return 0; } @@ -560,9 +560,9 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { memcpy(pPushEntry->subKey, pHandle->subKey, TSDB_SUBSCRIBE_KEY_LEN); dataRsp.withTbName = 0; memcpy(&pPushEntry->dataRsp, &dataRsp, sizeof(SMqDataRsp)); - pPushEntry->rspHead.consumerId = consumerId; - pPushEntry->rspHead.epoch = reqEpoch; - pPushEntry->rspHead.mqMsgType = TMQ_MSG_TYPE__POLL_RSP; + pPushEntry->dataRsp.head.consumerId = consumerId; + pPushEntry->dataRsp.head.epoch = reqEpoch; + pPushEntry->dataRsp.head.mqMsgType = TMQ_MSG_TYPE__POLL_RSP; taosHashPut(pTq->pPushMgr, pHandle->subKey, strlen(pHandle->subKey) + 1, &pPushEntry, sizeof(void*)); tqDebug("tmq poll: consumer %ld, subkey %s, vg %d save handle to push mgr", consumerId, pHandle->subKey, TD_VID(pTq->pVnode)); @@ -924,7 +924,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask) { pTask->smaSink.smaSink = smaHandleRes; } else if (pTask->outputType == TASK_OUTPUT__TABLE) { pTask->tbSink.vnode = pTq->pVnode; - pTask->tbSink.tbSinkFunc = tqTableSink; + pTask->tbSink.tbSinkFunc = tqTableSink1; ASSERT(pTask->tbSink.pSchemaWrapper); ASSERT(pTask->tbSink.pSchemaWrapper->pSchema); diff --git a/source/dnode/vnode/src/tq/tqSink.c b/source/dnode/vnode/src/tq/tqSink.c index 521d12fdab18f44f63832622eb7b28934761e35c..76bfa72caa70e90c78be6ce63b1cbadd09df75ec 100644 --- a/source/dnode/vnode/src/tq/tqSink.c +++ b/source/dnode/vnode/src/tq/tqSink.c @@ -284,6 +284,212 @@ SSubmitReq* tqBlockToSubmit(SVnode* pVnode, const SArray* pBlocks, const STSchem return ret; } +void tqTableSink1(SStreamTask* pTask, void* vnode, int64_t ver, void* data) { + const SArray* pBlocks = (const SArray*)data; + SVnode* pVnode = (SVnode*)vnode; + int64_t suid = pTask->tbSink.stbUid; + char* stbFullName = pTask->tbSink.stbFullName; + STSchema* pTSchema = pTask->tbSink.pTSchema; + SSchemaWrapper* pSchemaWrapper = pTask->tbSink.pSchemaWrapper; + + int32_t blockSz = taosArrayGetSize(pBlocks); + bool createTb = true; + + SArray* tagArray = taosArrayInit(1, sizeof(STagVal)); + if (!tagArray) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return; + } + + tqDebug("vgId:%d, task %d write into table, block num: %d", TD_VID(pVnode), pTask->taskId, blockSz); + for (int32_t i = 0; i < blockSz; i++) { + SSDataBlock* pDataBlock = taosArrayGet(pBlocks, i); + if (pDataBlock->info.type == STREAM_DELETE_RESULT) { + SBatchDeleteReq deleteReq = {0}; + deleteReq.deleteReqs = taosArrayInit(0, sizeof(SSingleDeleteReq)); + deleteReq.suid = suid; + tqBuildDeleteReq(pVnode, stbFullName, pDataBlock, &deleteReq); + + int32_t len; + int32_t code; + tEncodeSize(tEncodeSBatchDeleteReq, &deleteReq, len, code); + if (code < 0) { + // + ASSERT(0); + } + SEncoder encoder; + void* serializedDeleteReq = rpcMallocCont(len + sizeof(SMsgHead)); + void* abuf = POINTER_SHIFT(serializedDeleteReq, sizeof(SMsgHead)); + tEncoderInit(&encoder, abuf, len); + tEncodeSBatchDeleteReq(&encoder, &deleteReq); + tEncoderClear(&encoder); + taosArrayDestroy(deleteReq.deleteReqs); + + ((SMsgHead*)serializedDeleteReq)->vgId = pVnode->config.vgId; + + SRpcMsg msg = { + .msgType = TDMT_VND_BATCH_DEL, + .pCont = serializedDeleteReq, + .contLen = len + sizeof(SMsgHead), + }; + if (tmsgPutToQueue(&pVnode->msgCb, WRITE_QUEUE, &msg) != 0) { + rpcFreeCont(serializedDeleteReq); + tqDebug("failed to put delete req into write-queue since %s", terrstr()); + } + } else { + SVCreateTbReq createTbReq = {0}; + + // set const + createTbReq.flags = 0; + createTbReq.type = TSDB_CHILD_TABLE; + createTbReq.ctb.suid = suid; + + // set super table name + SName name = {0}; + tNameFromString(&name, stbFullName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE); + createTbReq.ctb.stbName = strdup((char*)tNameGetTableName(&name)); // strdup(stbFullName); + + // set tag content + taosArrayClear(tagArray); + STagVal tagVal = { + .cid = taosArrayGetSize(pDataBlock->pDataBlock) + 1, + .type = TSDB_DATA_TYPE_UBIGINT, + .i64 = (int64_t)pDataBlock->info.groupId, + }; + taosArrayPush(tagArray, &tagVal); + createTbReq.ctb.tagNum = taosArrayGetSize(tagArray); + + STag* pTag = NULL; + tTagNew(tagArray, 1, false, &pTag); + if (pTag == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + taosArrayDestroy(tagArray); + tdDestroySVCreateTbReq(&createTbReq); + return; + } + createTbReq.ctb.pTag = (uint8_t*)pTag; + + // set tag name + SArray* tagName = taosArrayInit(1, TSDB_COL_NAME_LEN); + char tagNameStr[TSDB_COL_NAME_LEN] = {0}; + strcpy(tagNameStr, "group_id"); + taosArrayPush(tagName, tagNameStr); + createTbReq.ctb.tagName = tagName; + + // set table name + if (pDataBlock->info.parTbName[0]) { + createTbReq.name = strdup(pDataBlock->info.parTbName); + } else { + createTbReq.name = buildCtbNameByGroupId(stbFullName, pDataBlock->info.groupId); + } + + int32_t schemaLen; + int32_t code; + tEncodeSize(tEncodeSVCreateTbReq, &createTbReq, schemaLen, code); + if (code < 0) { + tdDestroySVCreateTbReq(&createTbReq); + taosArrayDestroy(tagArray); + return; + } + + // save schema str + void* schemaStr = taosMemoryMalloc(schemaLen); + if (schemaStr == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + tdDestroySVCreateTbReq(&createTbReq); + taosArrayDestroy(tagArray); + return; + } + + SEncoder encoder = {0}; + tEncoderInit(&encoder, schemaStr, schemaLen); + code = tEncodeSVCreateTbReq(&encoder, &createTbReq); + if (code < 0) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + tdDestroySVCreateTbReq(&createTbReq); + taosArrayDestroy(tagArray); + tEncoderClear(&encoder); + taosMemoryFree(schemaStr); + return; + } + tEncoderClear(&encoder); + tdDestroySVCreateTbReq(&createTbReq); + + int32_t cap = sizeof(SSubmitReq); + + int32_t rows = pDataBlock->info.rows; + int32_t maxLen = TD_ROW_MAX_BYTES_FROM_SCHEMA(pTSchema); + + cap += sizeof(SSubmitBlk) + schemaLen + rows * maxLen; + + SSubmitReq* ret = rpcMallocCont(cap); + ret->header.vgId = pVnode->config.vgId; + ret->length = sizeof(SSubmitReq); + ret->numOfBlocks = htonl(1); + + SSubmitBlk* blkHead = POINTER_SHIFT(ret, sizeof(SSubmitReq)); + + blkHead->numOfRows = htonl(pDataBlock->info.rows); + blkHead->sversion = htonl(pTSchema->version); + blkHead->suid = htobe64(suid); + // uid is assigned by vnode + blkHead->uid = 0; + blkHead->schemaLen = 0; + + tqDebug("tq sink, convert block %d, rows: %d", i, rows); + + int32_t dataLen = 0; + void* blkSchema = POINTER_SHIFT(blkHead, sizeof(SSubmitBlk)); + STSRow* rowData = blkSchema; + if (createTb) { + memcpy(blkSchema, schemaStr, schemaLen); + blkHead->schemaLen = htonl(schemaLen); + rowData = POINTER_SHIFT(blkSchema, schemaLen); + } + + taosMemoryFree(schemaStr); + + for (int32_t j = 0; j < rows; j++) { + SRowBuilder rb = {0}; + tdSRowInit(&rb, pTSchema->version); + tdSRowSetTpInfo(&rb, pTSchema->numOfCols, pTSchema->flen); + tdSRowResetBuf(&rb, rowData); + + for (int32_t k = 0; k < pTSchema->numOfCols; k++) { + const STColumn* pColumn = &pTSchema->columns[k]; + SColumnInfoData* pColData = taosArrayGet(pDataBlock->pDataBlock, k); + if (colDataIsNull_s(pColData, j)) { + tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NULL, NULL, false, pColumn->offset, k); + } else { + void* colData = colDataGetData(pColData, j); + tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NORM, colData, true, pColumn->offset, k); + } + } + tdSRowEnd(&rb); + int32_t rowLen = TD_ROW_LEN(rowData); + rowData = POINTER_SHIFT(rowData, rowLen); + dataLen += rowLen; + } + blkHead->dataLen = htonl(dataLen); + + ret->length += sizeof(SSubmitBlk) + schemaLen + dataLen; + ret->length = htonl(ret->length); + + SRpcMsg msg = { + .msgType = TDMT_VND_SUBMIT, + .pCont = ret, + .contLen = ntohl(ret->length), + }; + + if (tmsgPutToQueue(&pVnode->msgCb, WRITE_QUEUE, &msg) != 0) { + rpcFreeCont(ret); + tqDebug("failed to put into write-queue since %s", terrstr()); + } + } + } + taosArrayDestroy(tagArray); +} + void tqTableSink(SStreamTask* pTask, void* vnode, int64_t ver, void* data) { const SArray* pRes = (const SArray*)data; SVnode* pVnode = (SVnode*)vnode; diff --git a/tests/system-test/7-tmq/tmqAutoCreateTbl.py b/tests/system-test/7-tmq/tmqAutoCreateTbl.py index a613f11267ef769ef9162543a5818a0881f23abd..de95fef148aa29316e0a0a967ddf7e2acc787905 100644 --- a/tests/system-test/7-tmq/tmqAutoCreateTbl.py +++ b/tests/system-test/7-tmq/tmqAutoCreateTbl.py @@ -52,7 +52,7 @@ class TDTestCase: paraDict['rowsPerTbl'] = self.rowsPerTbl tmqCom.initConsumerTable() - tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=paraDict["vgroups"],replica=1) + tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=paraDict["vgroups"],replica=1, wal_retention_size=-1,wal_retention_period=-1) tdLog.info("create stb") tmqCom.create_stable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"]) # tdLog.info("create ctb")