From fe77479a565cb415118aea681893ed7181bfc870 Mon Sep 17 00:00:00 2001 From: Cary Xu Date: Thu, 1 Jul 2021 16:01:33 +0800 Subject: [PATCH] CQ adaption to SMemRow --- src/client/inc/tscUtil.h | 35 ----------------------------------- src/client/src/tscUtil.c | 22 ++-------------------- src/common/inc/tdataformat.h | 35 ++++++++++++++++++----------------- src/cq/src/cqMain.c | 8 +++++--- src/query/src/qAggMain.c | 4 ++-- src/tsdb/src/tsdbCommit.c | 1 - src/tsdb/src/tsdbMemTable.c | 5 ++--- src/tsdb/src/tsdbRead.c | 2 +- 8 files changed, 30 insertions(+), 82 deletions(-) diff --git a/src/client/inc/tscUtil.h b/src/client/inc/tscUtil.h index 3c5cb4f23a..93684fa02c 100644 --- a/src/client/inc/tscUtil.h +++ b/src/client/inc/tscUtil.h @@ -348,10 +348,7 @@ typedef struct { int16_t sversion; int32_t flen; // for SKVRow - uint16_t tCols; uint16_t nCols; - SColIdx* pColIdx; - uint16_t alloc; uint16_t size; void* buf; @@ -359,40 +356,8 @@ typedef struct { SSubmitBlk* pSubmitBlk; } SMemRowBuilder; -// int tdInitMemRowBuilder(SMemRowBuilder* pBuilder); -// void tdDestroyMemRowBuilder(SMemRowBuilder* pBuilder); -// void tdResetMemRowBuilder(SMemRowBuilder* pBuilder); SMemRow tdGenMemRowFromBuilder(SMemRowBuilder* pBuilder); -// static FORCE_INLINE int tdAddColToMemRow(SMemRowBuilder* pBuilder, int16_t colId, int8_t type, void* value) { -// // TODO - -// if (pBuilder->nCols >= pBuilder->tCols) { -// pBuilder->tCols *= 2; -// pBuilder->pColIdx = (SColIdx*)realloc((void*)(pBuilder->pColIdx), sizeof(SColIdx) * pBuilder->tCols); -// if (pBuilder->pColIdx == NULL) return -1; -// } - -// pBuilder->pColIdx[pBuilder->nCols].colId = colId; -// pBuilder->pColIdx[pBuilder->nCols].offset = pBuilder->size; - -// pBuilder->nCols++; - -// int tlen = IS_VAR_DATA_TYPE(type) ? varDataTLen(value) : TYPE_BYTES[type]; -// if (tlen > pBuilder->alloc - pBuilder->size) { -// while (tlen > pBuilder->alloc - pBuilder->size) { -// pBuilder->alloc *= 2; -// } -// pBuilder->buf = realloc(pBuilder->buf, pBuilder->alloc); -// if (pBuilder->buf == NULL) return -1; -// } - -// memcpy(POINTER_SHIFT(pBuilder->buf, pBuilder->size), value, tlen); -// pBuilder->size += tlen; - -// return 0; -// } - #ifdef __cplusplus } #endif diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index a688d34d41..d51ff3d3ed 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -1716,6 +1716,7 @@ static FORCE_INLINE uint8_t tdRowTypeJudger(SSchema* pSchema, void* pData, int32 SMemRow tdGenMemRowFromBuilder(SMemRowBuilder* pBuilder) { SSchema* pSchema = pBuilder->pSchema; char* p = (char*)pBuilder->buf; + int toffset = 0; if(pBuilder->nCols <= 0){ return NULL; @@ -1723,7 +1724,6 @@ SMemRow tdGenMemRowFromBuilder(SMemRowBuilder* pBuilder) { uint16_t nColsNotNull = 0; uint8_t memRowType = tdRowTypeJudger(pSchema, p, pBuilder->nCols, pBuilder->flen, &nColsNotNull); - tscDebug("prop:memType is %d", memRowType); SMemRow* memRow = (SMemRow)pBuilder->pDataBlock; memRowSetType(memRow, memRowType); @@ -1733,7 +1733,6 @@ SMemRow tdGenMemRowFromBuilder(SMemRowBuilder* pBuilder) { dataRowSetLen(trow, (uint16_t)(TD_DATA_ROW_HEAD_SIZE + pBuilder->flen)); dataRowSetVersion(trow, pBuilder->sversion); - int toffset = 0; p = (char*)pBuilder->buf; for (int32_t j = 0; j < pBuilder->nCols; ++j) { tdAppendColVal(trow, p, pSchema[j].type, pSchema[j].bytes, toffset); @@ -1748,7 +1747,6 @@ SMemRow tdGenMemRowFromBuilder(SMemRowBuilder* pBuilder) { kvRowSetLen(kvRow, tlen); kvRowSetNCols(kvRow, nColsNotNull); - int toffset = 0; p = (char*)pBuilder->buf; for (int32_t j = 0; j < pBuilder->nCols; ++j) { if(!isNull(p, pSchema[j].type)) { @@ -1821,22 +1819,6 @@ static int trimDataBlock(void* pDataBlock, STableDataBlocks* pTableDataBlock, bo mRowBuilder.size = 0; for (int32_t i = 0; i < numOfRows; ++i) { -#if 0 - SDataRow trow = (SDataRow)pDataBlock; // generate each SDataRow one by one - dataRowSetLen(trow, (uint16_t)(TD_DATA_ROW_HEAD_SIZE + flen)); - dataRowSetVersion(trow, pTableMeta->sversion); - - // scan each column data and generate the data row - int toffset = 0; - for (int32_t j = 0; j < tinfo.numOfColumns; j++) { - tdAppendColVal(trow, p, pSchema[j].type, pSchema[j].bytes, toffset); - toffset += TYPE_BYTES[pSchema[j].type]; - p += pSchema[j].bytes; - } - - pDataBlock = (char*)pDataBlock + dataRowLen(trow); // next SDataRow - pBlock->dataLen += dataRowLen(trow); // SSubmitBlk data length -#endif tdGenMemRowFromBuilder(&mRowBuilder); } @@ -1849,7 +1831,7 @@ static int trimDataBlock(void* pDataBlock, STableDataBlocks* pTableDataBlock, bo static int32_t getRowExpandSize(STableMeta* pTableMeta) { int32_t result = TD_DATA_ROW_HEAD_SIZE; - int32_t columns = tscGetNumOfColumns(pTableMeta); + int32_t columns = tscGetNumOfColumns(pTableMeta); SSchema* pSchema = tscGetTableSchema(pTableMeta); for(int32_t i = 0; i < columns; i++) { if (IS_VAR_DATA_TYPE((pSchema + i)->type)) { diff --git a/src/common/inc/tdataformat.h b/src/common/inc/tdataformat.h index c2460ec7da..4ab44a7918 100644 --- a/src/common/inc/tdataformat.h +++ b/src/common/inc/tdataformat.h @@ -73,7 +73,7 @@ typedef struct { int8_t type; // Column type int16_t colId; // column ID uint16_t bytes; // column bytes - uint16_t offset; // point offset in SDataRow/SKVRow after the header part. + uint16_t offset; // point offset in SDataRow after the header part. } STColumn; #define colType(col) ((col)->type) @@ -184,20 +184,6 @@ static FORCE_INLINE int tkeyComparFn(const void *tkey1, const void *tkey2) { return 0; } } -// ----------------- Sequential Data row structure - -/* A sequential data row, the format is like below: - * |<--------------------+--------------------------- len ---------------------------------->| - * |<-- Head -->|<--------- flen -------------->| | - * +---------------------+---------------------------------+---------------------------------+ - * | uint16_t | int16_t | | | - * +----------+----------+---------------------------------+---------------------------------+ - * | len | sversion | First part | Second part | - * +----------+----------+---------------------------------+---------------------------------+ - * - * NOTE: timestamp in this row structure is TKEY instead of TSKEY - */ -typedef void *SDataRow; /* A memory data row, the format is like below: *|---------+---------------------+--------------------------- len ---------------------------------->| *|<- type->|<-- Head -->|<--------- flen -------------->| | @@ -211,10 +197,25 @@ typedef void *SDataRow; */ typedef void *SMemRow; +// ----------------- Data row structure + +/* A data row, the format is like below: + * |<--------------------+--------------------------- len ---------------------------------->| + * |<-- Head -->|<--------- flen -------------->| | + * +---------------------+---------------------------------+---------------------------------+ + * | uint16_t | int16_t | | | + * +----------+----------+---------------------------------+---------------------------------+ + * | len | sversion | First part | Second part | + * +----------+----------+---------------------------------+---------------------------------+ + * + * NOTE: timestamp in this row structure is TKEY instead of TSKEY + */ +typedef void *SDataRow; + #define TD_DATA_ROW_HEAD_SIZE (sizeof(uint16_t) + sizeof(int16_t)) #define dataRowLen(r) (*(uint16_t *)(r)) -#define dataRowVersion(r) (*(int16_t *)POINTER_SHIFT(r, sizeof(int16_t))) +#define dataRowVersion(r) *(int16_t *)POINTER_SHIFT(r, sizeof(int16_t)) #define dataRowTuple(r) POINTER_SHIFT(r, TD_DATA_ROW_HEAD_SIZE) #define dataRowTKey(r) (*(TKEY *)(dataRowTuple(r))) #define dataRowKey(r) tdGetKey(dataRowTKey(r)) @@ -254,7 +255,7 @@ static FORCE_INLINE int tdAppendColVal(SDataRow row, const void *value, int8_t t } // NOTE: offset here including the header size -static FORCE_INLINE void *tdGetRowDataOfCol(void *row, int8_t type, int32_t offset) { +static FORCE_INLINE void *tdGetRowDataOfCol(SDataRow *row, int8_t type, int32_t offset) { if (IS_VAR_DATA_TYPE(type)) { return POINTER_SHIFT(row, *(VarDataOffsetT *)POINTER_SHIFT(row, offset)); } else { diff --git a/src/cq/src/cqMain.c b/src/cq/src/cqMain.c index a460b1d619..6fe530dce5 100644 --- a/src/cq/src/cqMain.c +++ b/src/cq/src/cqMain.c @@ -476,7 +476,7 @@ static void cqProcessStreamRes(void *param, TAOS_RES *tres, TAOS_ROW row) { cDebug("vgId:%d, id:%d CQ:%s stream result is ready", pContext->vgId, pObj->tid, pObj->sqlStr); - int32_t size = sizeof(SWalHead) + sizeof(SSubmitMsg) + sizeof(SSubmitBlk) + TD_DATA_ROW_HEAD_SIZE + pObj->rowSize; + int32_t size = sizeof(SWalHead) + sizeof(SSubmitMsg) + sizeof(SSubmitBlk) + TD_MEM_ROW_HEAD_SIZE + pObj->rowSize; char *buffer = calloc(size, 1); SWalHead *pHead = (SWalHead *)buffer; @@ -484,7 +484,9 @@ static void cqProcessStreamRes(void *param, TAOS_RES *tres, TAOS_ROW row) { SSubmitBlk *pBlk = (SSubmitBlk *) (buffer + sizeof(SWalHead) + sizeof(SSubmitMsg)); SMemRow trow = (SMemRow)pBlk->data; - tdInitDataRow(POINTER_SHIFT(trow, TD_MEM_ROW_TYPE_SIZE), pSchema); + SDataRow dataRow = (SDataRow)memRowBody(trow); + memRowSetType(trow, SMEM_ROW_DATA); + tdInitDataRow(dataRow, pSchema); for (int32_t i = 0; i < pSchema->numOfCols; i++) { STColumn *c = pSchema->columns + i; @@ -500,7 +502,7 @@ static void cqProcessStreamRes(void *param, TAOS_RES *tres, TAOS_ROW row) { memcpy((char *)val + sizeof(VarDataLenT), buf, len); varDataLen(val) = len; } - tdAppendColVal(POINTER_SHIFT(trow, TD_MEM_ROW_TYPE_SIZE), val, c->type, c->bytes, c->offset); + tdAppendColVal(dataRow, val, c->type, c->bytes, c->offset); } pBlk->dataLen = htonl(memRowTLen(trow)); pBlk->schemaLen = 0; diff --git a/src/query/src/qAggMain.c b/src/query/src/qAggMain.c index 676e5b6ce6..ef1408ab28 100644 --- a/src/query/src/qAggMain.c +++ b/src/query/src/qAggMain.c @@ -4002,7 +4002,7 @@ void blockInfo_func(SQLFunctionCtx* pCtx) { int32_t len = *(int32_t*) pCtx->pInput; blockDistInfoFromBinary((char*)pCtx->pInput + sizeof(int32_t), len, pDist); - pDist->rowSize = (int16_t) pCtx->param[0].i64; + pDist->rowSize = (uint16_t)pCtx->param[0].i64; memcpy(pCtx->pOutput, pCtx->pInput, sizeof(int32_t) + len); @@ -4149,7 +4149,7 @@ void blockinfo_func_finalizer(SQLFunctionCtx* pCtx) { SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx); STableBlockDist* pDist = (STableBlockDist*) GET_ROWCELL_INTERBUF(pResInfo); - pDist->rowSize = (int16_t)pCtx->param[0].i64; + pDist->rowSize = (uint16_t)pCtx->param[0].i64; generateBlockDistResult(pDist, pCtx->pOutput); // cannot set the numOfIteratedElems again since it is set during previous iteration diff --git a/src/tsdb/src/tsdbCommit.c b/src/tsdb/src/tsdbCommit.c index b566f2095b..5700b87d5e 100644 --- a/src/tsdb/src/tsdbCommit.c +++ b/src/tsdb/src/tsdbCommit.c @@ -920,7 +920,6 @@ int tsdbWriteBlockImpl(STsdbRepo *pRepo, STable *pTable, SDFile *pDFile, SDataCo SDataCol * pDataCol = pDataCols->cols + ncol; SBlockCol *pBlockCol = pBlockData->cols + nColsNotAllNull; - // if (isNEleNull(pDataCol, rowsToWrite)) { // all data to commit are NULL, just ignore it if (isAllRowOfColNull(pDataCol)) { // all data to commit are NULL, just ignore it continue; } diff --git a/src/tsdb/src/tsdbMemTable.c b/src/tsdb/src/tsdbMemTable.c index 41e28ac909..4314514105 100644 --- a/src/tsdb/src/tsdbMemTable.c +++ b/src/tsdb/src/tsdbMemTable.c @@ -1021,10 +1021,9 @@ static void updateTableLatestColumn(STsdbRepo *pRepo, STable *pTable, SMemRow ro } else { // SKVRow SColIdx *pColIdx = tdGetKVRowIdxOfCol(rowBody, pTCol->colId); - if(pColIdx) { -value = tdGetKvRowDataOfCol(rowBody, pColIdx->offset); + if (pColIdx) { + value = tdGetKvRowDataOfCol(rowBody, pColIdx->offset); } - } if ((value == NULL) || isNull(value, pTCol->type)) { diff --git a/src/tsdb/src/tsdbRead.c b/src/tsdb/src/tsdbRead.c index 395598f527..195e9f8584 100644 --- a/src/tsdb/src/tsdbRead.c +++ b/src/tsdb/src/tsdbRead.c @@ -145,7 +145,7 @@ typedef struct STableGroupSupporter { static STimeWindow updateLastrowForEachGroup(STableGroupInfo *groupList); static int32_t checkForCachedLastRow(STsdbQueryHandle* pQueryHandle, STableGroupInfo *groupList); static int32_t checkForCachedLast(STsdbQueryHandle* pQueryHandle); -static int32_t tsdbGetCachedLastRow(STable* pTable, SMemRow* pRes, TSKEY* lastKey); +static int32_t tsdbGetCachedLastRow(STable* pTable, SMemRow* pRes, TSKEY* lastKey); static void changeQueryHandleForInterpQuery(TsdbQueryHandleT pHandle); static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo* pCheckInfo, SBlock* pBlock); -- GitLab