diff --git a/include/common/tdatablock.h b/include/common/tdatablock.h index af46535c948a2aa0be0282f580b68ade9955c8fa..709462a7449dade291fc06166b7fc08e80f96cb4 100644 --- a/include/common/tdatablock.h +++ b/include/common/tdatablock.h @@ -234,9 +234,6 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SArray* pDataBlocks char* buildCtbNameByGroupId(const char* stbName, uint64_t groupId); -SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pSchema, bool createTb, int64_t suid, - const char* stbFullName, int32_t vgId); - static FORCE_INLINE int32_t blockGetEncodeSize(const SSDataBlock* pBlock) { return blockDataGetSerialMetaSize(pBlock->info.numOfCols) + blockDataGetSize(pBlock); } diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index 4d1f5a34b185538c6eddfd92c242cb403c3dd397..b4ece426b398bff07515343b58ccdcc38ca52b58 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -1728,173 +1728,6 @@ char* buildCtbNameByGroupId(const char* stbName, uint64_t groupId) { return rname.childTableName; } -SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pTSchema, bool createTb, int64_t suid, - const char* stbFullName, int32_t vgId) { - SSubmitReq* ret = NULL; - SArray* tagArray = taosArrayInit(1, sizeof(STagVal)); - if (!tagArray) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return NULL; - } - - // cal size - int32_t cap = sizeof(SSubmitReq); - int32_t sz = taosArrayGetSize(pBlocks); - for (int32_t i = 0; i < sz; i++) { - SSDataBlock* pDataBlock = taosArrayGet(pBlocks, i); - int32_t rows = pDataBlock->info.rows; - // TODO min - int32_t rowSize = pDataBlock->info.rowSize; - int32_t maxLen = TD_ROW_MAX_BYTES_FROM_SCHEMA(pTSchema); - int32_t schemaLen = 0; - - if (createTb) { - SVCreateTbReq createTbReq = {0}; - char* cname = buildCtbNameByGroupId(stbFullName, pDataBlock->info.groupId); - createTbReq.name = cname; - createTbReq.flags = 0; - createTbReq.type = TSDB_CHILD_TABLE; - createTbReq.ctb.suid = suid; - - STagVal tagVal = {.cid = pDataBlock->info.numOfCols + 1, - .type = TSDB_DATA_TYPE_UBIGINT, - .pData = (uint8_t*)&pDataBlock->info.groupId, - .nData = sizeof(uint64_t)}; - STag* pTag = NULL; - taosArrayClear(tagArray); - taosArrayPush(tagArray, &tagVal); - tTagNew(tagArray, 1, false, &pTag); - if (pTag == NULL) { - tdDestroySVCreateTbReq(&createTbReq); - taosArrayDestroy(tagArray); - return NULL; - } - createTbReq.ctb.pTag = (uint8_t*)pTag; - - int32_t code; - tEncodeSize(tEncodeSVCreateTbReq, &createTbReq, schemaLen, code); - - tdDestroySVCreateTbReq(&createTbReq); - if (code < 0) { - taosArrayDestroy(tagArray); - return NULL; - } - } - - cap += sizeof(SSubmitBlk) + schemaLen + rows * maxLen; - } - - // assign data - // TODO - ret = taosMemoryCalloc(1, cap + 46); - ret = POINTER_SHIFT(ret, 46); - ret->header.vgId = vgId; - ret->version = htonl(1); - ret->length = sizeof(SSubmitReq); - ret->numOfBlocks = htonl(sz); - - void* submitBlk = POINTER_SHIFT(ret, sizeof(SSubmitReq)); - for (int32_t i = 0; i < sz; i++) { - SSDataBlock* pDataBlock = taosArrayGet(pBlocks, i); - - SSubmitBlk* blkHead = submitBlk; - blkHead->numOfRows = htons(pDataBlock->info.rows); - blkHead->sversion = htonl(pTSchema->version); - // TODO - blkHead->suid = htobe64(suid); - // uid is assigned by vnode - blkHead->uid = 0; - - int32_t rows = pDataBlock->info.rows; - /*int32_t maxLen = TD_ROW_MAX_BYTES_FROM_SCHEMA(pTSchema);*/ - /*blkHead->dataLen = htonl(rows * maxLen);*/ - blkHead->dataLen = 0; - - void* blockData = POINTER_SHIFT(submitBlk, sizeof(SSubmitBlk)); - - int32_t schemaLen = 0; - if (createTb) { - SVCreateTbReq createTbReq = {0}; - char* cname = buildCtbNameByGroupId(stbFullName, pDataBlock->info.groupId); - createTbReq.name = cname; - createTbReq.flags = 0; - createTbReq.type = TSDB_CHILD_TABLE; - createTbReq.ctb.suid = suid; - - STagVal tagVal = {.cid = pDataBlock->info.numOfCols + 1, - .type = TSDB_DATA_TYPE_UBIGINT, - .pData = (uint8_t*)&pDataBlock->info.groupId, - .nData = sizeof(uint64_t)}; - taosArrayClear(tagArray); - taosArrayPush(tagArray, &tagVal); - STag* pTag = NULL; - tTagNew(tagArray, 1, false, &pTag); - if (pTag == NULL) { - tdDestroySVCreateTbReq(&createTbReq); - taosArrayDestroy(tagArray); - taosMemoryFreeClear(ret); - return NULL; - } - createTbReq.ctb.pTag = (uint8_t*)pTag; - - int32_t code; - tEncodeSize(tEncodeSVCreateTbReq, &createTbReq, schemaLen, code); - if (code < 0) { - tdDestroySVCreateTbReq(&createTbReq); - taosArrayDestroy(tagArray); - taosMemoryFreeClear(ret); - return NULL; - } - - SEncoder encoder = {0}; - tEncoderInit(&encoder, blockData, schemaLen); - code = tEncodeSVCreateTbReq(&encoder, &createTbReq); - tEncoderClear(&encoder); - tdDestroySVCreateTbReq(&createTbReq); - - if (code < 0) { - taosArrayDestroy(tagArray); - taosMemoryFreeClear(ret); - return NULL; - } - } - blkHead->schemaLen = htonl(schemaLen); - - STSRow* rowData = POINTER_SHIFT(blockData, schemaLen); - - for (int32_t j = 0; j < rows; j++) { - SRowBuilder rb = {0}; - tdSRowInit(&rb, pTSchema->version); - tdSRowSetTpInfo(&rb, pTSchema->numOfCols, pTSchema->flen); - tdSRowResetBuf(&rb, rowData); - - for (int32_t k = 0; k < pTSchema->numOfCols; k++) { - const STColumn* pColumn = &pTSchema->columns[k]; - SColumnInfoData* pColData = taosArrayGet(pDataBlock->pDataBlock, k); - if (colDataIsNull_s(pColData, j)) { - tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NONE, NULL, false, pColumn->offset, k); - } else { - void* data = colDataGetData(pColData, j); - tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NORM, data, true, pColumn->offset, k); - } - } - int32_t rowLen = TD_ROW_LEN(rowData); - rowData = POINTER_SHIFT(rowData, rowLen); - blkHead->dataLen += rowLen; - } - int32_t dataLen = blkHead->dataLen; - blkHead->dataLen = htonl(dataLen); - - ret->length += sizeof(SSubmitBlk) + schemaLen + dataLen; - blkHead = POINTER_SHIFT(blkHead, schemaLen + dataLen); - /*submitBlk = blkHead;*/ - } - - ret->length = htonl(ret->length); - taosArrayDestroy(tagArray); - return ret; -} - void blockCompressEncode(const SSDataBlock* pBlock, char* data, int32_t* dataLen, int32_t numOfCols, int8_t needCompress) { // todo extract method diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 22685c1e194d35dce4b079b5edfd6ad9c2ca351c..9f34ae39c06fe284d4c2402fea04b703f70adf4c 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -47,7 +47,7 @@ void tqCleanUp() { } STQ* tqOpen(const char* path, SVnode* pVnode, SWal* pWal) { - STQ* pTq = taosMemoryMalloc(sizeof(STQ)); + STQ* pTq = taosMemoryCalloc(1, sizeof(STQ)); if (pTq == NULL) { terrno = TSDB_CODE_TQ_OUT_OF_MEMORY; return NULL; diff --git a/source/dnode/vnode/src/tq/tqOffset.c b/source/dnode/vnode/src/tq/tqOffset.c index 2db49d7cf53f284aeac1bc44cb3db787a50aff94..41444b72881f493039e48e35a2884cf52b7d5724 100644 --- a/source/dnode/vnode/src/tq/tqOffset.c +++ b/source/dnode/vnode/src/tq/tqOffset.c @@ -17,21 +17,33 @@ #include "tq.h" struct STqOffsetStore { + char* fname; STQ* pTq; SHashObj* pHash; // SHashObj }; +static char* buildFileName(const char* path) { + int32_t len = strlen(path); + char* fname = taosMemoryCalloc(1, len + 20); + snprintf(fname, len + 20, "%s/offset", path); + return fname; +} + STqOffsetStore* tqOffsetOpen(STQ* pTq) { STqOffsetStore* pStore = taosMemoryCalloc(1, sizeof(STqOffsetStore)); if (pStore == NULL) { return NULL; } + pStore->pTq = pTq; + pTq->pOffsetStore = pStore; + pStore->pHash = taosHashInit(64, MurmurHash3_32, true, HASH_NO_LOCK); if (pStore->pHash == NULL) { if (pStore->pHash) taosHashCleanup(pStore->pHash); return NULL; } - TdFilePtr pFile = taosOpenFile(pStore->pTq->path, TD_FILE_READ); + char* fname = buildFileName(pStore->pTq->path); + TdFilePtr pFile = taosOpenFile(fname, TD_FILE_READ); if (pFile != NULL) { STqOffsetHead head = {0}; int64_t code; @@ -65,6 +77,7 @@ STqOffsetStore* tqOffsetOpen(STQ* pTq) { } taosCloseFile(&pFile); + taosMemoryFree(fname); } return pStore; } @@ -85,7 +98,8 @@ int32_t tqOffsetWrite(STqOffsetStore* pStore, const STqOffset* pOffset) { int32_t tqOffsetSnapshot(STqOffsetStore* pStore) { // open file // TODO file name should be with a version - TdFilePtr pFile = taosOpenFile(pStore->pTq->path, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND); + char* fname = buildFileName(pStore->pTq->path); + TdFilePtr pFile = taosOpenFile(fname, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND); if (pFile == NULL) { ASSERT(0); return -1; @@ -124,5 +138,6 @@ int32_t tqOffsetSnapshot(STqOffsetStore* pStore) { } // close and rename file taosCloseFile(&pFile); + taosMemoryFree(fname); return 0; } diff --git a/source/dnode/vnode/src/tq/tqSink.c b/source/dnode/vnode/src/tq/tqSink.c index 5c0bf971fb8702ffbb73ed92feb8c97d1f4032d1..391a00844004e97a2dda7b57304706b66d831f75 100644 --- a/source/dnode/vnode/src/tq/tqSink.c +++ b/source/dnode/vnode/src/tq/tqSink.c @@ -15,6 +15,175 @@ #include "tq.h" +static SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pSchema, bool createTb, int64_t suid, + const char* stbFullName, int32_t vgId); + +static SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pTSchema, bool createTb, int64_t suid, + const char* stbFullName, int32_t vgId) { + SSubmitReq* ret = NULL; + SArray* tagArray = taosArrayInit(1, sizeof(STagVal)); + if (!tagArray) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return NULL; + } + + // cal size + int32_t cap = sizeof(SSubmitReq); + int32_t sz = taosArrayGetSize(pBlocks); + for (int32_t i = 0; i < sz; i++) { + SSDataBlock* pDataBlock = taosArrayGet(pBlocks, i); + int32_t rows = pDataBlock->info.rows; + // TODO min + int32_t rowSize = pDataBlock->info.rowSize; + int32_t maxLen = TD_ROW_MAX_BYTES_FROM_SCHEMA(pTSchema); + int32_t schemaLen = 0; + + if (createTb) { + SVCreateTbReq createTbReq = {0}; + char* cname = buildCtbNameByGroupId(stbFullName, pDataBlock->info.groupId); + createTbReq.name = cname; + createTbReq.flags = 0; + createTbReq.type = TSDB_CHILD_TABLE; + createTbReq.ctb.suid = suid; + + STagVal tagVal = {.cid = pDataBlock->info.numOfCols + 1, + .type = TSDB_DATA_TYPE_UBIGINT, + .pData = (uint8_t*)&pDataBlock->info.groupId, + .nData = sizeof(uint64_t)}; + STag* pTag = NULL; + taosArrayClear(tagArray); + taosArrayPush(tagArray, &tagVal); + tTagNew(tagArray, 1, false, &pTag); + if (pTag == NULL) { + tdDestroySVCreateTbReq(&createTbReq); + taosArrayDestroy(tagArray); + return NULL; + } + createTbReq.ctb.pTag = (uint8_t*)pTag; + + int32_t code; + tEncodeSize(tEncodeSVCreateTbReq, &createTbReq, schemaLen, code); + + tdDestroySVCreateTbReq(&createTbReq); + if (code < 0) { + taosArrayDestroy(tagArray); + return NULL; + } + } + + cap += sizeof(SSubmitBlk) + schemaLen + rows * maxLen; + } + + // assign data + // TODO + ret = rpcMallocCont(cap); + ret->header.vgId = vgId; + ret->version = htonl(1); + ret->length = sizeof(SSubmitReq); + ret->numOfBlocks = htonl(sz); + + void* submitBlk = POINTER_SHIFT(ret, sizeof(SSubmitReq)); + for (int32_t i = 0; i < sz; i++) { + SSDataBlock* pDataBlock = taosArrayGet(pBlocks, i); + + SSubmitBlk* blkHead = submitBlk; + blkHead->numOfRows = htons(pDataBlock->info.rows); + blkHead->sversion = htonl(pTSchema->version); + // TODO + blkHead->suid = htobe64(suid); + // uid is assigned by vnode + blkHead->uid = 0; + + int32_t rows = pDataBlock->info.rows; + /*int32_t maxLen = TD_ROW_MAX_BYTES_FROM_SCHEMA(pTSchema);*/ + /*blkHead->dataLen = htonl(rows * maxLen);*/ + blkHead->dataLen = 0; + + void* blockData = POINTER_SHIFT(submitBlk, sizeof(SSubmitBlk)); + + int32_t schemaLen = 0; + if (createTb) { + SVCreateTbReq createTbReq = {0}; + char* cname = buildCtbNameByGroupId(stbFullName, pDataBlock->info.groupId); + createTbReq.name = cname; + createTbReq.flags = 0; + createTbReq.type = TSDB_CHILD_TABLE; + createTbReq.ctb.suid = suid; + + STagVal tagVal = {.cid = pDataBlock->info.numOfCols + 1, + .type = TSDB_DATA_TYPE_UBIGINT, + .pData = (uint8_t*)&pDataBlock->info.groupId, + .nData = sizeof(uint64_t)}; + taosArrayClear(tagArray); + taosArrayPush(tagArray, &tagVal); + STag* pTag = NULL; + tTagNew(tagArray, 1, false, &pTag); + if (pTag == NULL) { + tdDestroySVCreateTbReq(&createTbReq); + taosArrayDestroy(tagArray); + taosMemoryFreeClear(ret); + return NULL; + } + createTbReq.ctb.pTag = (uint8_t*)pTag; + + int32_t code; + tEncodeSize(tEncodeSVCreateTbReq, &createTbReq, schemaLen, code); + if (code < 0) { + tdDestroySVCreateTbReq(&createTbReq); + taosArrayDestroy(tagArray); + taosMemoryFreeClear(ret); + return NULL; + } + + SEncoder encoder = {0}; + tEncoderInit(&encoder, blockData, schemaLen); + code = tEncodeSVCreateTbReq(&encoder, &createTbReq); + tEncoderClear(&encoder); + tdDestroySVCreateTbReq(&createTbReq); + + if (code < 0) { + taosArrayDestroy(tagArray); + taosMemoryFreeClear(ret); + return NULL; + } + } + blkHead->schemaLen = htonl(schemaLen); + + STSRow* rowData = POINTER_SHIFT(blockData, schemaLen); + + for (int32_t j = 0; j < rows; j++) { + SRowBuilder rb = {0}; + tdSRowInit(&rb, pTSchema->version); + tdSRowSetTpInfo(&rb, pTSchema->numOfCols, pTSchema->flen); + tdSRowResetBuf(&rb, rowData); + + for (int32_t k = 0; k < pTSchema->numOfCols; k++) { + const STColumn* pColumn = &pTSchema->columns[k]; + SColumnInfoData* pColData = taosArrayGet(pDataBlock->pDataBlock, k); + if (colDataIsNull_s(pColData, j)) { + tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NONE, NULL, false, pColumn->offset, k); + } else { + void* data = colDataGetData(pColData, j); + tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NORM, data, true, pColumn->offset, k); + } + } + int32_t rowLen = TD_ROW_LEN(rowData); + rowData = POINTER_SHIFT(rowData, rowLen); + blkHead->dataLen += rowLen; + } + int32_t dataLen = blkHead->dataLen; + blkHead->dataLen = htonl(dataLen); + + ret->length += sizeof(SSubmitBlk) + schemaLen + dataLen; + blkHead = POINTER_SHIFT(blkHead, schemaLen + dataLen); + /*submitBlk = blkHead;*/ + } + + ret->length = htonl(ret->length); + taosArrayDestroy(tagArray); + return ret; +} + void tqTableSink(SStreamTask* pTask, void* vnode, int64_t ver, void* data) { const SArray* pRes = (const SArray*)data; SVnode* pVnode = (SVnode*)vnode;