From a5706ea7e5c35ca4fc9a528730a344ded32b8577 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Thu, 5 May 2022 18:57:09 +0800 Subject: [PATCH] feat(stream): insert data --- example/src/tstream.c | 7 +- include/common/tdatablock.h | 25 ++--- include/common/tdataformat.h | 15 ++- include/common/tmsg.h | 4 +- include/common/trow.h | 38 ++++---- include/libs/stream/tstream.h | 4 +- source/common/src/tdatablock.c | 102 +++++++++++++++++---- source/common/src/tdataformat.c | 47 +++++++++- source/common/src/tmsg.c | 19 ++++ source/dnode/mnode/impl/inc/mndDef.h | 1 + source/dnode/mnode/impl/inc/mndOffset.h | 2 + source/dnode/mnode/impl/inc/mndSubscribe.h | 2 + source/dnode/mnode/impl/src/mndDb.c | 8 +- source/dnode/mnode/impl/src/mndOffset.c | 33 +++++++ source/dnode/mnode/impl/src/mndScheduler.c | 4 + source/dnode/vnode/src/tq/tq.c | 6 ++ source/libs/stream/src/tstream.c | 14 ++- 17 files changed, 261 insertions(+), 70 deletions(-) diff --git a/example/src/tstream.c b/example/src/tstream.c index 77a002499e..5bd833213d 100644 --- a/example/src/tstream.c +++ b/example/src/tstream.c @@ -81,9 +81,10 @@ int32_t create_stream() { /*const char* sql = "select min(k), max(k), sum(k) as sum_of_k from st1";*/ /*const char* sql = "select sum(k) from tu1 interval(10m)";*/ /*pRes = tmq_create_stream(pConn, "stream1", "out1", sql);*/ - pRes = taos_query( - pConn, - "create stream stream1 trigger window_close as select min(k), max(k), sum(k) as sum_of_k from tu1 interval(10m)"); + pRes = + taos_query(pConn, + "create stream stream1 trigger window_close as select _wstartts, min(k), max(k), sum(k) as sum_of_k " + "from tu1 interval(10m)"); if (taos_errno(pRes) != 0) { printf("failed to create stream stream1, reason:%s\n", taos_errstr(pRes)); return -1; diff --git a/include/common/tdatablock.h b/include/common/tdatablock.h index 71e2e4fba3..a597123cd7 100644 --- a/include/common/tdatablock.h +++ b/include/common/tdatablock.h @@ -54,8 +54,8 @@ SEpSet getEpSet_s(SCorEpSet* pEpSet); BMCharPos(bm_, r_) |= (1u << (7u - BitPos(r_))); \ } while (0) -#define colDataIsNull_var(pColumnInfoData, row) (pColumnInfoData->varmeta.offset[row] == -1) -#define colDataSetNull_var(pColumnInfoData, row) (pColumnInfoData->varmeta.offset[row] = -1) +#define colDataIsNull_var(pColumnInfoData, row) (pColumnInfoData->varmeta.offset[row] == -1) +#define colDataSetNull_var(pColumnInfoData, row) (pColumnInfoData->varmeta.offset[row] = -1) #define BitmapLen(_n) (((_n) + ((1 << NBIT) - 1)) >> NBIT) @@ -63,16 +63,15 @@ SEpSet getEpSet_s(SCorEpSet* pEpSet); #define colDataGetNumData(p1_, r_) ((p1_)->pData + ((r_) * (p1_)->info.bytes)) // SColumnInfoData, rowNumber -#define colDataGetData(p1_, r_) \ - ((IS_VAR_DATA_TYPE((p1_)->info.type)) ? colDataGetVarData(p1_, r_) \ - : colDataGetNumData(p1_, r_)) +#define colDataGetData(p1_, r_) \ + ((IS_VAR_DATA_TYPE((p1_)->info.type)) ? colDataGetVarData(p1_, r_) : colDataGetNumData(p1_, r_)) static FORCE_INLINE bool colDataIsNull_s(const SColumnInfoData* pColumnInfoData, uint32_t row) { - if (pColumnInfoData->info.type == TSDB_DATA_TYPE_JSON){ - if(colDataIsNull_var(pColumnInfoData, row)){ + if (pColumnInfoData->info.type == TSDB_DATA_TYPE_JSON) { + if (colDataIsNull_var(pColumnInfoData, row)) { return true; } - char *data = colDataGetVarData(pColumnInfoData, row); + char* data = colDataGetVarData(pColumnInfoData, row); return (*data == TSDB_DATA_TYPE_NULL); } @@ -80,7 +79,7 @@ static FORCE_INLINE bool colDataIsNull_s(const SColumnInfoData* pColumnInfoData, return false; } - if (pColumnInfoData->info.type== TSDB_DATA_TYPE_VARCHAR || pColumnInfoData->info.type == TSDB_DATA_TYPE_NCHAR) { + if (pColumnInfoData->info.type == TSDB_DATA_TYPE_VARCHAR || pColumnInfoData->info.type == TSDB_DATA_TYPE_NCHAR) { return colDataIsNull_var(pColumnInfoData, row); } else { if (pColumnInfoData->nullbitmap == NULL) { @@ -132,7 +131,7 @@ static FORCE_INLINE void colDataAppendNULL(SColumnInfoData* pColumnInfoData, uin static FORCE_INLINE void colDataAppendNNULL(SColumnInfoData* pColumnInfoData, uint32_t start, size_t nRows) { if (IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) { for (int32_t i = start; i < start + nRows; ++i) { - colDataSetNull_var(pColumnInfoData,i); // it is a null value of VAR type. + colDataSetNull_var(pColumnInfoData, i); // it is a null value of VAR type. } } else { for (int32_t i = start; i < start + nRows; ++i) { @@ -225,6 +224,8 @@ SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock, bool copyData); void blockDebugShowData(const SArray* dataBlocks); +SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pSchema); + static FORCE_INLINE int32_t blockGetEncodeSize(const SSDataBlock* pBlock) { return blockDataGetSerialMetaSize(pBlock) + blockDataGetSize(pBlock); } @@ -238,10 +239,10 @@ static FORCE_INLINE int32_t blockCompressColData(SColumnInfoData* pColRes, int32 static FORCE_INLINE void blockCompressEncode(const SSDataBlock* pBlock, char* data, int32_t* dataLen, int32_t numOfCols, int8_t needCompress) { - int32_t* actualLen = (int32_t*) data; + int32_t* actualLen = (int32_t*)data; data += sizeof(int32_t); - uint64_t* groupId = (uint64_t*) data; + uint64_t* groupId = (uint64_t*)data; data += sizeof(uint64_t); int32_t* colSizes = (int32_t*)data; diff --git a/include/common/tdataformat.h b/include/common/tdataformat.h index a0f4351bbf..a4c693c2e2 100644 --- a/include/common/tdataformat.h +++ b/include/common/tdataformat.h @@ -61,11 +61,11 @@ extern "C" { // ----------------- TSDB COLUMN DEFINITION #pragma pack(push, 1) typedef struct { - col_id_t colId; // column ID(start from PRIMARYKEY_TIMESTAMP_COL_ID(1)) - int32_t type : 8; // column type - int32_t bytes : 24; // column bytes (0~16M) - int32_t sma : 8; // block SMA: 0, no SMA, 1, sum/min/max, 2, ... - int32_t offset : 24; // point offset in STpRow after the header part. + col_id_t colId; // column ID(start from PRIMARYKEY_TIMESTAMP_COL_ID(1)) + int8_t type; // column type + int8_t sma; // block SMA: 0, no SMA, 1, sum/min/max, 2, ... + int32_t bytes; // column bytes (0~16M) + int32_t offset; // point offset in STpRow after the header part. } STColumn; #pragma pack(pop) @@ -387,8 +387,6 @@ typedef struct SDataCol { TSKEY ts; // only used in last NULL column } SDataCol; - - #define isAllRowsNull(pCol) ((pCol)->len == 0) #define isAllRowsNone(pCol) ((pCol)->len == 0) static FORCE_INLINE void dataColReset(SDataCol *pDataCol) { pDataCol->len = 0; } @@ -482,7 +480,8 @@ void tdResetDataCols(SDataCols *pCols); int32_t tdInitDataCols(SDataCols *pCols, STSchema *pSchema); SDataCols *tdDupDataCols(SDataCols *pCols, bool keepData); SDataCols *tdFreeDataCols(SDataCols *pCols); -int32_t tdMergeDataCols(SDataCols *target, SDataCols *source, int32_t rowsToMerge, int32_t *pOffset, bool forceSetNull, TDRowVerT maxVer); +int32_t tdMergeDataCols(SDataCols *target, SDataCols *source, int32_t rowsToMerge, int32_t *pOffset, bool forceSetNull, + TDRowVerT maxVer); // ----------------- K-V data row structure /* |<-------------------------------------- len -------------------------------------------->| diff --git a/include/common/tmsg.h b/include/common/tmsg.h index ea605090f9..b85b8ceb7a 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -245,6 +245,8 @@ int32_t tInitSubmitMsgIter(const SSubmitReq* pMsg, SSubmitMsgIter* pIter); int32_t tGetSubmitMsgNext(SSubmitMsgIter* pIter, SSubmitBlk** pPBlock); int32_t tInitSubmitBlkIter(SSubmitBlk* pBlock, SSubmitBlkIter* pIter); STSRow* tGetSubmitBlkNext(SSubmitBlkIter* pIter); +// for debug +int32_t tPrintFixedSchemaSubmitReq(const SSubmitReq* pReq, STSchema* pSchema); // TODO: KEEP one suite of iterator API finally. // 1) use tInitSubmitMsgIterEx firstly as not decrease the merge conflicts @@ -2137,7 +2139,7 @@ static FORCE_INLINE int32_t tDecodeSSchemaWrapper(SCoder* pDecoder, SSchemaWrapp if (tDecodeI32v(pDecoder, &pSW->nCols) < 0) return -1; if (tDecodeI32v(pDecoder, &pSW->sver) < 0) return -1; - pSW->pSchema = (SSchema*)tCoderMalloc(pDecoder, sizeof(SSchema) * pSW->nCols); + pSW->pSchema = (SSchema*)taosMemoryCalloc(pSW->nCols, sizeof(SSchema)); if (pSW->pSchema == NULL) return -1; for (int32_t i = 0; i < pSW->nCols; i++) { if (tDecodeSSchema(pDecoder, &pSW->pSchema[i]) < 0) return -1; diff --git a/include/common/trow.h b/include/common/trow.h index 02f84b372e..8b1d612433 100644 --- a/include/common/trow.h +++ b/include/common/trow.h @@ -622,7 +622,6 @@ static FORCE_INLINE int32_t tdSRowSetTpInfo(SRowBuilder *pBuilder, int32_t nCols return TSDB_CODE_SUCCESS; } - /** * @brief To judge row type: STpRow/SKvRow * @@ -758,7 +757,6 @@ static int32_t tdSRowGetBuf(SRowBuilder *pBuilder, void *pBuf) { return TSDB_CODE_SUCCESS; } - /** * @brief 由调用方管理存储空间的分配及释放,一次输入多个参数 * @@ -1250,16 +1248,16 @@ static FORCE_INLINE int32_t tdGetColDataOfRow(SCellVal *pVal, SDataCol *pCol, in } /** - * @brief - * - * @param pRow - * @param colId - * @param colType - * @param flen - * @param offset + * @brief + * + * @param pRow + * @param colId + * @param colType + * @param flen + * @param offset * @param colIdx start from 0 - * @param pVal - * @return FORCE_INLINE + * @param pVal + * @return FORCE_INLINE */ static FORCE_INLINE bool tdSTpRowGetVal(STSRow *pRow, col_id_t colId, col_type_t colType, int32_t flen, uint32_t offset, col_id_t colIdx, SCellVal *pVal) { @@ -1273,14 +1271,14 @@ static FORCE_INLINE bool tdSTpRowGetVal(STSRow *pRow, col_id_t colId, col_type_t } /** - * @brief - * - * @param pRow - * @param colId - * @param offset + * @brief + * + * @param pRow + * @param colId + * @param offset * @param colIdx start from 0 - * @param pVal - * @return FORCE_INLINE + * @param pVal + * @return FORCE_INLINE */ static FORCE_INLINE bool tdSKvRowGetVal(STSRow *pRow, col_id_t colId, uint32_t offset, col_id_t colIdx, SCellVal *pVal) { @@ -1397,14 +1395,14 @@ static void tdSCellValPrint(SCellVal *pVal, int8_t colType) { } } -static void tdSRowPrint(STSRow *row, STSchema *pSchema, const char* tag) { +static void tdSRowPrint(STSRow *row, STSchema *pSchema, const char *tag) { STSRowIter iter = {0}; tdSTSRowIterInit(&iter, pSchema); tdSTSRowIterReset(&iter, row); printf("%s >>>", tag); for (int i = 0; i < pSchema->numOfCols; ++i) { STColumn *stCol = pSchema->columns + i; - SCellVal sVal = { 255, NULL}; + SCellVal sVal = {255, NULL}; if (!tdSTSRowIterNext(&iter, stCol->colId, stCol->type, &sVal)) { break; } diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 2e5574511b..42180833df 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -70,8 +70,10 @@ typedef struct { } STaskDispatcherShuffle; typedef struct { - int8_t reserved; + int8_t reserved; + SSchemaWrapper* pSchemaWrapper; // not applicable to encoder and decoder + STSchema* pTSchema; SHashObj* pHash; // groupId to tbuid } STaskSinkTb; diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index 4a6fef4626..84db728ce4 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -194,17 +194,17 @@ static void doBitmapMerge(SColumnInfoData* pColumnInfoData, int32_t numOfRow1, c int32_t i = 0; uint8_t* start = (uint8_t*)&pColumnInfoData->nullbitmap[BitmapLen(numOfRow1)]; - int32_t overCount = BitmapLen(total) - BitmapLen(numOfRow1); - while (i < len) { // size limit of pSource->nullbitmap + int32_t overCount = BitmapLen(total) - BitmapLen(numOfRow1); + while (i < len) { // size limit of pSource->nullbitmap if (i >= 1) { - start[i - 1] |= (p[i] >> remindBits); //copy remind bits + start[i - 1] |= (p[i] >> remindBits); // copy remind bits } - if (i >= overCount) { // size limit of pColumnInfoData->nullbitmap + if (i >= overCount) { // size limit of pColumnInfoData->nullbitmap return; } - start[i] |= (p[i] << shiftBits); //copy shift bits + start[i] |= (p[i] << shiftBits); // copy shift bits i += 1; } } @@ -354,7 +354,7 @@ int32_t blockDataMerge(SSDataBlock* pDest, const SSDataBlock* pSrc, SArray* pInd int32_t numOfCols = pDest->info.numOfCols; for (int32_t i = 0; i < numOfCols; ++i) { int32_t mapIndex = i; - if(pIndexMap) { + if (pIndexMap) { mapIndex = *(int32_t*)taosArrayGet(pIndexMap, i); } SColumnInfoData* pCol2 = taosArrayGet(pDest->pDataBlock, i); @@ -451,7 +451,6 @@ int32_t blockDataSplitRows(SSDataBlock* pBlock, bool hasVarCol, int32_t startInd // all fit in *stopIndex = numOfRows - 1; return TSDB_CODE_SUCCESS; - } SSDataBlock* blockDataExtractBlock(SSDataBlock* pBlock, int32_t startIndex, int32_t rowCount) { @@ -556,7 +555,7 @@ int32_t blockDataFromBuf(SSDataBlock* pBlock, const char* buf) { if (IS_VAR_DATA_TYPE(pCol->info.type)) { size_t metaSize = pBlock->info.rows * sizeof(int32_t); - char* tmp = taosMemoryRealloc(pCol->varmeta.offset, metaSize); // preview calloc is too small + char* tmp = taosMemoryRealloc(pCol->varmeta.offset, metaSize); // preview calloc is too small if (tmp == NULL) { return TSDB_CODE_OUT_OF_MEMORY; } @@ -938,8 +937,9 @@ int32_t blockDataSort(SSDataBlock* pDataBlock, SArray* pOrderInfo) { copyBackToBlock(pDataBlock, pCols); int64_t p4 = taosGetTimestampUs(); - uDebug("blockDataSort complex sort:%" PRId64 ", create:%" PRId64 ", assign:%" PRId64 ", copyback:%" PRId64 ", rows:%d\n", p1 - p0, p2 - p1, - p3 - p2, p4 - p3, rows); + uDebug("blockDataSort complex sort:%" PRId64 ", create:%" PRId64 ", assign:%" PRId64 ", copyback:%" PRId64 + ", rows:%d\n", + p1 - p0, p2 - p1, p3 - p2, p4 - p3, rows); destroyTupleIndex(index); return TSDB_CODE_SUCCESS; @@ -1176,7 +1176,7 @@ void* blockDataDestroy(SSDataBlock* pBlock) { } SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock, bool copyData) { - if(pDataBlock == NULL){ + if (pDataBlock == NULL) { return NULL; } @@ -1187,7 +1187,7 @@ SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock, bool copyData) { pBlock->info.numOfCols = numOfCols; pBlock->info.hasVarCol = pDataBlock->info.hasVarCol; - pBlock->info.rowSize = pDataBlock->info.rows; + pBlock->info.rowSize = pDataBlock->info.rows; for (int32_t i = 0; i < numOfCols; ++i) { SColumnInfoData colInfo = {0}; @@ -1217,7 +1217,7 @@ SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock, bool copyData) { } size_t blockDataGetCapacityInRow(const SSDataBlock* pBlock, size_t pageSize) { - return (int32_t) ((pageSize - blockDataGetSerialMetaSize(pBlock))/ blockDataGetSerialRowSize(pBlock)); + return (int32_t)((pageSize - blockDataGetSerialMetaSize(pBlock)) / blockDataGetSerialRowSize(pBlock)); } void colDataDestroy(SColumnInfoData* pColData) { @@ -1234,14 +1234,14 @@ static void doShiftBitmap(char* nullBitmap, size_t n, size_t total) { int32_t len = BitmapLen(total); int32_t newLen = BitmapLen(total - n); - if (n%8 == 0) { - memmove(nullBitmap, nullBitmap + n/8, newLen); + if (n % 8 == 0) { + memmove(nullBitmap, nullBitmap + n / 8, newLen); } else { int32_t tail = n % 8; int32_t i = 0; - uint8_t* p = (uint8_t*) nullBitmap; - while(i < len) { + uint8_t* p = (uint8_t*)nullBitmap; + while (i < len) { uint8_t v = p[i]; p[i] = 0; @@ -1268,7 +1268,7 @@ static void colDataTrimFirstNRows(SColumnInfoData* pColInfoData, size_t n, size_ } } -int32_t blockDataTrimFirstNRows(SSDataBlock *pBlock, size_t n) { +int32_t blockDataTrimFirstNRows(SSDataBlock* pBlock, size_t n) { if (n == 0) { return TSDB_CODE_SUCCESS; } @@ -1276,7 +1276,7 @@ int32_t blockDataTrimFirstNRows(SSDataBlock *pBlock, size_t n) { if (pBlock->info.rows <= n) { blockDataCleanup(pBlock); } else { - for(int32_t i = 0; i < pBlock->info.numOfCols; ++i) { + for (int32_t i = 0; i < pBlock->info.numOfCols; ++i) { SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i); colDataTrimFirstNRows(pColInfoData, n, pBlock->info.rows); } @@ -1462,3 +1462,67 @@ void blockDebugShowData(const SArray* dataBlocks) { } } +SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pTSchema) { + SSubmitReq* ret = NULL; + + // cal size + int32_t cap = sizeof(SSubmitReq); + int32_t sz = taosArrayGetSize(pBlocks); + for (int32_t i = 0; i < sz; i++) { + SSDataBlock* pDataBlock = taosArrayGet(pBlocks, i); + int32_t rows = pDataBlock->info.rows; + // TODO min + int32_t rowSize = pDataBlock->info.rowSize; + int32_t maxLen = TD_ROW_MAX_BYTES_FROM_SCHEMA(pTSchema); + cap += sizeof(SSubmitBlk) + rows * maxLen; + } + + // assign data + ret = taosMemoryCalloc(1, cap); + ret->version = htonl(1); + ret->length = htonl(cap - sizeof(SSubmitReq)); + ret->numOfBlocks = htonl(sz); + + void* submitBlk = POINTER_SHIFT(ret, sizeof(SSubmitReq)); + for (int32_t i = 0; i < sz; i++) { + SSDataBlock* pDataBlock = taosArrayGet(pBlocks, i); + + SSubmitBlk* blkHead = submitBlk; + blkHead->numOfRows = htons(pDataBlock->info.rows); + blkHead->schemaLen = 0; + blkHead->sversion = htonl(pTSchema->version); + // TODO + blkHead->suid = 0; + blkHead->uid = htobe64(pDataBlock->info.uid); + + int32_t rows = pDataBlock->info.rows; + int32_t maxLen = TD_ROW_MAX_BYTES_FROM_SCHEMA(pTSchema); + /*blkHead->dataLen = htonl(rows * maxLen);*/ + blkHead->dataLen = 0; + + void* blockData = POINTER_SHIFT(submitBlk, sizeof(SSubmitBlk)); + STSRow* rowData = blockData; + + for (int32_t j = 0; j < pDataBlock->info.rows; j++) { + SRowBuilder rb = {0}; + tdSRowInit(&rb, pTSchema->version); + tdSRowSetTpInfo(&rb, pTSchema->numOfCols, pTSchema->flen); + tdSRowResetBuf(&rb, rowData); + + for (int32_t k = 0; k < pTSchema->numOfCols; k++) { + const STColumn* pColumn = &pTSchema->columns[k]; + SColumnInfoData* pColData = taosArrayGet(pDataBlock->pDataBlock, k); + void* data = colDataGetData(pColData, j); + tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NORM, data, true, pColumn->offset, k); + } + int32_t rowLen = TD_ROW_LEN(rowData); + rowData = POINTER_SHIFT(rowData, rowLen); + blkHead->dataLen += rowLen; + } + int32_t len = blkHead->dataLen; + blkHead->dataLen = htonl(len); + blkHead = POINTER_SHIFT(blkHead, len); + } + + return ret; +} diff --git a/source/common/src/tdataformat.c b/source/common/src/tdataformat.c index 2ed08ac81f..f5fa1c6491 100644 --- a/source/common/src/tdataformat.c +++ b/source/common/src/tdataformat.c @@ -16,6 +16,7 @@ #define _DEFAULT_SOURCE #include "tdataformat.h" #include "tcoding.h" +#include "tdatablock.h" #include "tlog.h" static void dataColSetNEleNull(SDataCol *pCol, int nEle); @@ -128,6 +129,50 @@ void *tdDecodeSchema(void *buf, STSchema **pRSchema) { return buf; } +#if 0 +int32_t tEncodeSTColumn(SCoder *pEncoder, const STColumn *pCol) { + if (tEncodeI16(pEncoder, pCol->colId) < 0) return -1; + if (tEncodeI8(pEncoder, pCol->type) < 0) return -1; + if (tEncodeI8(pEncoder, pCol->sma) < 0) return -1; + if (tEncodeI32(pEncoder, pCol->bytes) < 0) return -1; + if (tEncodeI32(pEncoder, pCol->offset) < 0) return -1; + return pEncoder->pos; +} + +int32_t tDecodeSTColumn(SCoder *pDecoder, STColumn *pCol) { + if (tDecodeI16(pDecoder, &pCol->colId) < 0) return -1; + if (tDecodeI8(pDecoder, &pCol->type) < 0) return -1; + if (tDecodeI8(pDecoder, &pCol->sma) < 0) return -1; + if (tDecodeI32(pDecoder, &pCol->bytes) < 0) return -1; + if (tDecodeI32(pDecoder, &pCol->offset) < 0) return -1; + return 0; +} + +int32_t tEncodeSchema(SCoder *pEncoder, const STSchema *pSchema) { + if (tEncodeI32(pEncoder, pSchema->numOfCols) < 0) return -1; + if (tEncodeI16(pEncoder, pSchema->version) < 0) return -1; + if (tEncodeU16(pEncoder, pSchema->flen) < 0) return -1; + if (tEncodeI32(pEncoder, pSchema->vlen) < 0) return -1; + if (tEncodeI32(pEncoder, pSchema->tlen) < 0) return -1; + + for (int32_t i = 0; i < schemaNCols(pSchema); i++) { + const STColumn *pCol = schemaColAt(pSchema, i); + if (tEncodeSTColumn(pEncoder, pCol) < 0) return -1; + } + return 0; +} + +int32_t tDecodeSchema(SCoder *pDecoder, STSchema *pSchema) { + if (tDecodeI32(pDecoder, &pSchema->numOfCols) < 0) return -1; + if (tDecodeI16(pDecoder, &pSchema->version) < 0) return -1; + if (tDecodeU16(pDecoder, &pSchema->flen) < 0) return -1; + if (tDecodeI32(pDecoder, &pSchema->vlen) < 0) return -1; + if (tDecodeI32(pDecoder, &pSchema->tlen) < 0) return -1; + + return 0; +} +#endif + int tdInitTSchemaBuilder(STSchemaBuilder *pBuilder, schema_ver_t version) { if (pBuilder == NULL) return -1; @@ -908,4 +953,4 @@ SMemRow mergeTwoMemRows(void *buffer, SMemRow row1, SMemRow row2, STSchema *pSch taosArrayDestroy(stashRow); return buffer; } -#endif \ No newline at end of file +#endif diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 874e09b403..fc993511ab 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -174,6 +174,25 @@ STSRow *tGetSubmitBlkNextEx(SSubmitBlkIter *pIter) { } } +int32_t tPrintFixedSchemaSubmitReq(const SSubmitReq *pReq, STSchema *pTschema) { + SSubmitMsgIter msgIter = {0}; + if (tInitSubmitMsgIterEx(pReq, &msgIter) < 0) return -1; + while (true) { + SSubmitBlk *pBlock = NULL; + if (tGetSubmitMsgNextEx(&msgIter, &pBlock) < 0) return -1; + if (pBlock == NULL) break; + SSubmitBlkIter blkIter = {0}; + tInitSubmitBlkIterEx(&msgIter, pBlock, &blkIter); + STSRowIter rowIter = {0}; + tdSTSRowIterInit(&rowIter, pTschema); + STSRow *row; + while ((row = tGetSubmitBlkNextEx(&blkIter)) != NULL) { + tdSRowPrint(row, pTschema, "stream"); + } + } + return 0; +} + int32_t tEncodeSEpSet(SCoder *pEncoder, const SEpSet *pEp) { if (tEncodeI8(pEncoder, pEp->inUse) < 0) return -1; if (tEncodeI8(pEncoder, pEp->numOfEps) < 0) return -1; diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index 263fd0bad5..456a370db6 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -415,6 +415,7 @@ typedef struct { typedef struct { char key[TSDB_PARTITION_KEY_LEN]; + int64_t dbUid; int64_t offset; } SMqOffsetObj; diff --git a/source/dnode/mnode/impl/inc/mndOffset.h b/source/dnode/mnode/impl/inc/mndOffset.h index 556bfe6a53..b468496165 100644 --- a/source/dnode/mnode/impl/inc/mndOffset.h +++ b/source/dnode/mnode/impl/inc/mndOffset.h @@ -37,6 +37,8 @@ static FORCE_INLINE int32_t mndMakePartitionKey(char *key, const char *cgroup, c return snprintf(key, TSDB_PARTITION_KEY_LEN, "%d:%s:%s", vgId, cgroup, topicName); } +int32_t mndDropOffsetByDB(SMnode *pMnode, STrans *pTrans, SDbObj *pDb); + #ifdef __cplusplus } #endif diff --git a/source/dnode/mnode/impl/inc/mndSubscribe.h b/source/dnode/mnode/impl/inc/mndSubscribe.h index 7915006f27..5ca672e8dd 100644 --- a/source/dnode/mnode/impl/inc/mndSubscribe.h +++ b/source/dnode/mnode/impl/inc/mndSubscribe.h @@ -31,6 +31,8 @@ void mndReleaseSubscribe(SMnode *pMnode, SMqSubscribeObj *pSub); int32_t mndMakeSubscribeKey(char *key, const char *cgroup, const char *topicName); +int32_t mndDropSubByDB(SMnode *pMnode, STrans *pTrans, SDbObj *pDb); + #ifdef __cplusplus } #endif diff --git a/source/dnode/mnode/impl/src/mndDb.c b/source/dnode/mnode/impl/src/mndDb.c index 442bfb5c8a..347e959d3c 100644 --- a/source/dnode/mnode/impl/src/mndDb.c +++ b/source/dnode/mnode/impl/src/mndDb.c @@ -17,9 +17,12 @@ #include "mndDb.h" #include "mndAuth.h" #include "mndDnode.h" +#include "mndOffset.h" #include "mndShow.h" #include "mndSma.h" #include "mndStb.h" +#include "mndSubscribe.h" +#include "mndTopic.h" #include "mndTrans.h" #include "mndUser.h" #include "mndVgroup.h" @@ -1027,6 +1030,9 @@ static int32_t mndDropDb(SMnode *pMnode, SNodeMsg *pReq, SDbObj *pDb) { if (mndSetDropDbRedoLogs(pMnode, pTrans, pDb) != 0) goto _OVER; if (mndSetDropDbCommitLogs(pMnode, pTrans, pDb) != 0) goto _OVER; + /*if (mndDropOffsetByDB(pMnode, pTrans, pDb) != 0) goto _OVER;*/ + /*if (mndDropSubByDB(pMnode, pTrans, pDb) != 0) goto _OVER;*/ + /*if (mndDropTopicByDB(pMnode, pTrans, pDb) != 0) goto _OVER;*/ if (mndSetDropDbRedoActions(pMnode, pTrans, pDb) != 0) goto _OVER; int32_t rspLen = 0; @@ -1387,7 +1393,7 @@ static void dumpDbInfoData(SSDataBlock *pBlock, SDbObj *pDb, SShowObj *pShow, in bool sysDb) { int32_t cols = 0; - int32_t bytes = pShow->pMeta->pSchemas[cols].bytes; + int32_t bytes = pShow->pMeta->pSchemas[cols].bytes; char *buf = taosMemoryMalloc(bytes); const char *name = mndGetDbStr(pDb->name); if (name != NULL) { diff --git a/source/dnode/mnode/impl/src/mndOffset.c b/source/dnode/mnode/impl/src/mndOffset.c index f075348704..e0ba4076f5 100644 --- a/source/dnode/mnode/impl/src/mndOffset.c +++ b/source/dnode/mnode/impl/src/mndOffset.c @@ -231,3 +231,36 @@ static void mndCancelGetNextOffset(SMnode *pMnode, void *pIter) { SSdb *pSdb = pMnode->pSdb; sdbCancelFetch(pSdb, pIter); } + +static int32_t mndSetDropOffsetCommitLogs(SMnode *pMnode, STrans *pTrans, SMqOffsetObj *pOffset) { + SSdbRaw *pCommitRaw = mndOffsetActionEncode(pOffset); + if (pCommitRaw == NULL) return -1; + if (mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) return -1; + if (sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED) != 0) return -1; + return 0; +} + +int32_t mndDropOffsetByDB(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) { + int32_t code = -1; + SSdb *pSdb = pMnode->pSdb; + + void *pIter = NULL; + SMqOffsetObj *pOffset = NULL; + while (1) { + pIter = sdbFetch(pSdb, SDB_SUBSCRIBE, pIter, (void **)&pOffset); + if (pIter == NULL) break; + + if (pOffset->dbUid != pDb->uid) { + sdbRelease(pSdb, pOffset); + continue; + } + + if (mndSetDropOffsetCommitLogs(pMnode, pTrans, pOffset) < 0) { + goto END; + } + } + + code = 0; +END: + return code; +} diff --git a/source/dnode/mnode/impl/src/mndScheduler.c b/source/dnode/mnode/impl/src/mndScheduler.c index b760dd6aa8..9274d35b8b 100644 --- a/source/dnode/mnode/impl/src/mndScheduler.c +++ b/source/dnode/mnode/impl/src/mndScheduler.c @@ -204,6 +204,8 @@ int32_t mndAddShuffledSinkToStream(SMnode* pMnode, STrans* pTrans, SStreamObj* p pTask->smaSink.smaId = pStream->smaId; } else { pTask->sinkType = TASK_SINK__TABLE; + pTask->tbSink.pSchemaWrapper = tCloneSSchemaWrapper(&pStream->outputSchema); + ASSERT(pTask->tbSink.pSchemaWrapper); } // dispatch @@ -242,6 +244,7 @@ int32_t mndAddFixedSinkToStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStr pTask->smaSink.smaId = pStream->smaId; } else { pTask->sinkType = TASK_SINK__TABLE; + pTask->tbSink.pSchemaWrapper = tCloneSSchemaWrapper(&pStream->outputSchema); } // // dispatch @@ -316,6 +319,7 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) { pTask->smaSink.smaId = pStream->smaId; } else { pTask->sinkType = TASK_SINK__TABLE; + pTask->tbSink.pSchemaWrapper = tCloneSSchemaWrapper(&pStream->outputSchema); } #endif } diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 411d5c451c..261b4aa8ae 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -926,6 +926,12 @@ int32_t tqProcessTaskDeploy(STQ* pTq, char* msg, int32_t msgLen) { pTask->ahandle = pTq->pVnode; if (pTask->sinkType == TASK_SINK__SMA) { pTask->smaSink.smaHandle = smaHandleRes; + } else if (pTask->sinkType == TASK_SINK__TABLE) { + ASSERT(pTask->tbSink.pSchemaWrapper); + ASSERT(pTask->tbSink.pSchemaWrapper->pSchema); + pTask->tbSink.pTSchema = + tdGetSTSChemaFromSSChema(&pTask->tbSink.pSchemaWrapper->pSchema, pTask->tbSink.pSchemaWrapper->nCols); + ASSERT(pTask->tbSink.pTSchema); } taosHashPut(pTq->pStreamTasks, &pTask->taskId, sizeof(int32_t), pTask, sizeof(SStreamTask)); diff --git a/source/libs/stream/src/tstream.c b/source/libs/stream/src/tstream.c index cf3cd162aa..b8198364c2 100644 --- a/source/libs/stream/src/tstream.c +++ b/source/libs/stream/src/tstream.c @@ -152,8 +152,10 @@ int32_t streamExecTask(SStreamTask* pTask, SMsgCb* pMsgCb, const void* input, in // sink if (pTask->sinkType == TASK_SINK__TABLE) { - // - blockDebugShowData(pRes); + /*blockDebugShowData(pRes);*/ + ASSERT(pTask->tbSink.pTSchema); + SSubmitReq* pReq = tdBlockToSubmit(pRes, pTask->tbSink.pTSchema); + tPrintFixedSchemaSubmitReq(pReq, pTask->tbSink.pTSchema); } else if (pTask->sinkType == TASK_SINK__SMA) { pTask->smaSink.smaHandle(pTask->ahandle, pTask->smaSink.smaId, pRes); // @@ -274,7 +276,8 @@ int32_t tEncodeSStreamTask(SCoder* pEncoder, const SStreamTask* pTask) { } if (pTask->sinkType == TASK_SINK__TABLE) { - if (tEncodeI8(pEncoder, pTask->tbSink.reserved) < 0) return -1; + /*if (tEncodeI8(pEncoder, pTask->tbSink.reserved) < 0) return -1;*/ + if (tEncodeSSchemaWrapper(pEncoder, pTask->tbSink.pSchemaWrapper) < 0) return -1; } else if (pTask->sinkType == TASK_SINK__SMA) { if (tEncodeI64(pEncoder, pTask->smaSink.smaId) < 0) return -1; } else if (pTask->sinkType == TASK_SINK__FETCH) { @@ -318,7 +321,10 @@ int32_t tDecodeSStreamTask(SCoder* pDecoder, SStreamTask* pTask) { } if (pTask->sinkType == TASK_SINK__TABLE) { - if (tDecodeI8(pDecoder, &pTask->tbSink.reserved) < 0) return -1; + /*if (tDecodeI8(pDecoder, &pTask->tbSink.reserved) < 0) return -1;*/ + pTask->tbSink.pSchemaWrapper = taosMemoryCalloc(1, sizeof(SSchemaWrapper)); + if (pTask->tbSink.pSchemaWrapper == NULL) return -1; + if (tDecodeSSchemaWrapper(pDecoder, pTask->tbSink.pSchemaWrapper) < 0) return -1; } else if (pTask->sinkType == TASK_SINK__SMA) { if (tDecodeI64(pDecoder, &pTask->smaSink.smaId) < 0) return -1; } else if (pTask->sinkType == TASK_SINK__FETCH) { -- GitLab