From 2e5ce3ebfe53d07703c10de033666c46ea6a464c Mon Sep 17 00:00:00 2001 From: liuyq-617 Date: Thu, 1 Jul 2021 06:28:22 +0000 Subject: [PATCH] adaption for cacheLast --- src/client/src/tscUtil.c | 2 +- src/common/inc/tdataformat.h | 44 ++++++++++++++--- src/common/src/tdataformat.c | 56 ++++------------------ src/inc/ttype.h | 2 +- src/tsdb/src/tsdbMain.c | 22 ++++----- src/tsdb/src/tsdbMemTable.c | 26 +++++++--- src/tsdb/src/tsdbRead.c | 92 +++++++++++++++++------------------- 7 files changed, 125 insertions(+), 119 deletions(-) diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index c05c5e3d3b..a688d34d41 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -1667,7 +1667,7 @@ int32_t tscGetDataBlockFromList(SHashObj* pHashList, int64_t id, int32_t size, i // pBuilder->size = 0; // } -#define KvRowNColsThresh 1 // default value: 128 TODO: for test, restore to default value after test finished +#define KvRowNColsThresh 1 // default value: 1200 TODO: for test, restore to default value after test finished static FORCE_INLINE uint8_t tdRowTypeJudger(SSchema* pSchema, void* pData, int32_t nCols, int32_t flen, uint16_t* nColsNotNull) { diff --git a/src/common/inc/tdataformat.h b/src/common/inc/tdataformat.h index ca8b4d33da..c2460ec7da 100644 --- a/src/common/inc/tdataformat.h +++ b/src/common/inc/tdataformat.h @@ -280,13 +280,43 @@ typedef struct SDataCol { static FORCE_INLINE void dataColReset(SDataCol *pDataCol) { pDataCol->len = 0; } void dataColInit(SDataCol *pDataCol, STColumn *pCol, void **pBuf, int maxPoints); -void dataColAppendVal(SDataCol *pCol, const void *value, int numOfRows, int maxPoints); void dataColSetOffset(SDataCol *pCol, int nEle); bool isNEleNull(SDataCol *pCol, int nEle); void dataColSetNEleNull(SDataCol *pCol, int nEle, int maxPoints); -static const void *tdGetNullVal(int8_t type) { +FORCE_INLINE void dataColAppendVal(SDataCol *pCol, const void *value, int numOfRows, int maxPoints); +// value from timestamp should be TKEY here instead of TSKEY +FORCE_INLINE void dataColAppendVal(SDataCol *pCol, const void *value, int numOfRows, int maxPoints) { + ASSERT(pCol != NULL && value != NULL); + + if (pCol->len == 0) { + if (isNull(value, pCol->type)) { + // all null value yet, just return + return; + } + + if (numOfRows > 0) { + // Find the first not null value, fill all previous values as NULL + dataColSetNEleNull(pCol, numOfRows, maxPoints); + } + } + + if (IS_VAR_DATA_TYPE(pCol->type)) { + // set offset + pCol->dataOff[numOfRows] = pCol->len; + // Copy data + memcpy(POINTER_SHIFT(pCol->pData, pCol->len), value, varDataTLen(value)); + // Update the length + pCol->len += varDataTLen(value); + } else { + ASSERT(pCol->len == TYPE_BYTES[pCol->type] * numOfRows); + memcpy(POINTER_SHIFT(pCol->pData, pCol->len), value, pCol->bytes); + pCol->len += pCol->bytes; + } +} + +static FORCE_INLINE const void *tdGetNullVal(int8_t type) { switch (type) { case TSDB_DATA_TYPE_BOOL: return &BoolNull; @@ -460,6 +490,10 @@ static FORCE_INLINE void *tdGetKVRowValOfCol(SKVRow row, int16_t colId) { return kvRowColVal(row, (SColIdx *)ret); } +static FORCE_INLINE void *tdGetKVRowIdxOfCol(SKVRow row, int16_t colId) { + return taosbsearch(&colId, kvRowColIdx(row), kvRowNCols(row), sizeof(SColIdx), comparTagId, TD_EQ); +} + // offset here not include kvRow header length static FORCE_INLINE int tdAppendKvColVal(SKVRow row, const void *value, int16_t colId, int8_t type, int32_t offset) { ASSERT(value != NULL); @@ -582,15 +616,13 @@ static FORCE_INLINE int tdAddColToKVRow(SKVRowBuilder *pBuilder, int16_t colId, #define memRowDeleted(r) TKEY_IS_DELETED(memRowTKey(r)) // NOTE: offset here including the header size -static FORCE_INLINE void *tdGetKvRowDataOfCol(void *row, int8_t type, int32_t offset) { - return POINTER_SHIFT(row, offset); -} +static FORCE_INLINE void *tdGetKvRowDataOfCol(void *row, int32_t offset) { return POINTER_SHIFT(row, offset); } // NOTE: offset here including the header size static FORCE_INLINE void *tdGetMemRowDataOfCol(void *row, int8_t type, int32_t offset) { if (isDataRow(row)) { return tdGetRowDataOfCol(row, type, offset); } else if (isKvRow(row)) { - return tdGetKvRowDataOfCol(row, type, offset); + return tdGetKvRowDataOfCol(row, offset); } else { ASSERT(0); } diff --git a/src/common/src/tdataformat.c b/src/common/src/tdataformat.c index 74f9893608..d346b5c463 100644 --- a/src/common/src/tdataformat.c +++ b/src/common/src/tdataformat.c @@ -240,7 +240,7 @@ void dataColInit(SDataCol *pDataCol, STColumn *pCol, void **pBuf, int maxPoints) *pBuf = POINTER_SHIFT(*pBuf, pDataCol->spaceSize); } } - +#if 0 // value from timestamp should be TKEY here instead of TSKEY void dataColAppendVal(SDataCol *pCol, const void *value, int numOfRows, int maxPoints) { ASSERT(pCol != NULL && value != NULL); @@ -270,6 +270,7 @@ void dataColAppendVal(SDataCol *pCol, const void *value, int numOfRows, int maxP pCol->len += pCol->bytes; } } +#endif bool isNEleNull(SDataCol *pCol, int nEle) { for (int i = 0; i < nEle; i++) { @@ -278,7 +279,7 @@ bool isNEleNull(SDataCol *pCol, int nEle) { return true; } -void dataColSetNullAt(SDataCol *pCol, int index) { +FORCE_INLINE void dataColSetNullAt(SDataCol *pCol, int index) { if (IS_VAR_DATA_TYPE(pCol->type)) { pCol->dataOff[index] = pCol->len; char *ptr = POINTER_SHIFT(pCol->pData, pCol->len); @@ -475,38 +476,6 @@ static void tdAppendDataRowToDataCol(SDataRow row, STSchema *pSchema, SDataCols pCols->numOfRows++; } -// static void tdGetKVRowColInfo(const STSchema *pSchema, SColIdx *pColIdx, int nRowCols, STColumn *pSTColumn, -// int *nColMatched) { -// int nSchema = schemaNCols(pSchema); -// int iCol = 0; -// int iSchema = 0; -// int nColMatch = 0; -// SColIdx * pIdx = pColIdx; -// const STColumn *pColumn = NULL; - -// while (iCol < nRowCols && iSchema < nSchema) { -// pColumn = &pSchema->columns[iSchema]; -// if (pIdx->colId == pColumn->colId) { -// pSTColumn[nColMatch].colId = pIdx->colId; -// pSTColumn[nColMatch].type = pColumn->type; -// pSTColumn[nColMatch].bytes = pColumn->bytes; -// pSTColumn[nColMatch].offset = pIdx->offset; - -// pIdx += sizeof(SColIdx); - -// ++iCol; -// ++iSchema; -// ++nColMatch; -// } else if (pIdx->colId > pColumn->colId) { -// ++iSchema; -// } else { -// pIdx += sizeof(SColIdx); -// ++iCol; -// } -// } -// *nColMatched = nColMatch; -// } - static void tdAppendKvRowToDataCol(SKVRow row, STSchema *pSchema, SDataCols *pCols) { ASSERT(pCols->numOfRows == 0 || dataColsKeyLast(pCols) < kvRowKey(row)); @@ -523,33 +492,28 @@ static void tdAppendKvRowToDataCol(SKVRow row, STSchema *pSchema, SDataCols *pCo } } } else { - int nRowCols = kvRowNCols(row); - // int nRowColsMatched = 0; - // STColumn stColumn[nRowCols]; - // tdGetKVRowColInfo(pSchema, kvRowColIdx(row), nRowCols, stColumn, &nRowColsMatched); - // uInfo("prop:kvRow: nRowCols=%d, nRowColsMatched=%d, nSchemaCols=%d", nRowCols, nRowColsMatched, - // schemaNCols(pSchema)); + int nRowCols = kvRowNCols(row); while (dcol < pCols->numOfCols) { SDataCol *pDataCol = &(pCols->cols[dcol]); if (rcol >= nRowCols || rcol >= schemaNCols(pSchema)) { dataColAppendVal(pDataCol, tdGetNullVal(pDataCol->type), pCols->numOfRows, pCols->maxPoints); - dcol++; + ++dcol; continue; } SColIdx *colIdx = kvRowColIdxAt(row, rcol); if (colIdx->colId == pDataCol->colId) { - void *value = tdGetKvRowDataOfCol(row, pDataCol->type, colIdx->offset); + void *value = tdGetKvRowDataOfCol(row, colIdx->offset); dataColAppendVal(pDataCol, value, pCols->numOfRows, pCols->maxPoints); - dcol++; - rcol++; + ++dcol; + ++rcol; } else if (colIdx->colId < pDataCol->colId) { - rcol++; + ++rcol; } else { dataColAppendVal(pDataCol, tdGetNullVal(pDataCol->type), pCols->numOfRows, pCols->maxPoints); - dcol++; + ++dcol; } } } diff --git a/src/inc/ttype.h b/src/inc/ttype.h index 80b8ddcd4e..b858d2a7f5 100644 --- a/src/inc/ttype.h +++ b/src/inc/ttype.h @@ -121,7 +121,7 @@ typedef struct tstr { #define IS_VALID_UINT(_t) ((_t) >= 0 && (_t) < UINT32_MAX) #define IS_VALID_UBIGINT(_t) ((_t) >= 0 && (_t) < UINT64_MAX) -static FORCE_INLINE bool isNull(const char *val, int32_t type) { +FORCE_INLINE bool isNull(const char *val, int32_t type) { switch (type) { case TSDB_DATA_TYPE_BOOL: return *(uint8_t *)val == TSDB_DATA_BOOL_NULL; diff --git a/src/tsdb/src/tsdbMain.c b/src/tsdb/src/tsdbMain.c index c0c65cecd5..b34308fd76 100644 --- a/src/tsdb/src/tsdbMain.c +++ b/src/tsdb/src/tsdbMain.c @@ -661,7 +661,9 @@ static int tsdbRestoreLastColumns(STsdbRepo *pRepo, STable *pTable, SReadH* pRea err = -1; goto out; } - tdInitDataRow(POINTER_SHIFT(row, TD_MEM_ROW_TYPE_SIZE), pSchema); + + memRowSetType(row, SMEM_ROW_DATA); + tdInitDataRow(memRowBody(row), pSchema); // first load block index info if (tsdbLoadBlockInfo(pReadh, NULL) < 0) { @@ -718,10 +720,9 @@ static int tsdbRestoreLastColumns(STsdbRepo *pRepo, STable *pTable, SReadH* pRea // OK,let's load row from backward to get not-null column for (int32_t rowId = pBlock->numOfRows - 1; rowId >= 0; rowId--) { SDataCol *pDataCol = pReadh->pDCols[0]->cols + i; - tdAppendColVal(POINTER_SHIFT(row, TD_MEM_ROW_TYPE_SIZE), tdGetColDataOfRow(pDataCol, rowId), pCol->type, - pCol->bytes, pCol->offset); + tdAppendColVal(memRowBody(row), tdGetColDataOfRow(pDataCol, rowId), pCol->type, pCol->bytes, pCol->offset); //SDataCol *pDataCol = readh.pDCols[0]->cols + j; - void *value = tdGetMemRowDataOfCol(row, (int8_t)pCol->type, TD_MEM_ROW_HEAD_SIZE + pCol->offset); + void *value = tdGetRowDataOfCol(memRowBody(row), (int8_t)pCol->type, TD_DATA_ROW_HEAD_SIZE + pCol->offset); if (isNull(value, pCol->type)) { continue; } @@ -741,8 +742,7 @@ static int tsdbRestoreLastColumns(STsdbRepo *pRepo, STable *pTable, SReadH* pRea // save row ts(in column 0) pDataCol = pReadh->pDCols[0]->cols + 0; pCol = schemaColAt(pSchema, 0); - tdAppendColVal(POINTER_SHIFT(row, TD_MEM_ROW_TYPE_SIZE), tdGetColDataOfRow(pDataCol, rowId), pCol->type, - pCol->bytes, pCol->offset); + tdAppendColVal(memRowBody(row), tdGetColDataOfRow(pDataCol, rowId), pCol->type, pCol->bytes, pCol->offset); pLastCol->ts = memRowKey(row); pTable->restoreColumnNum += 1; @@ -779,18 +779,18 @@ static int tsdbRestoreLastRow(STsdbRepo *pRepo, STable *pTable, SReadH* pReadh, // Get the data in row STSchema *pSchema = tsdbGetTableSchema(pTable); - pTable->lastRow = taosTMalloc(dataRowMaxBytesFromSchema(pSchema)); + pTable->lastRow = taosTMalloc(memRowMaxBytesFromSchema(pSchema)); if (pTable->lastRow == NULL) { terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; return -1; } - - tdInitDataRow(pTable->lastRow, pSchema); + memRowSetType(pTable->lastRow, SMEM_ROW_DATA); + tdInitDataRow(memRowBody(pTable->lastRow), pSchema); for (int icol = 0; icol < schemaNCols(pSchema); icol++) { STColumn *pCol = schemaColAt(pSchema, icol); SDataCol *pDataCol = pReadh->pDCols[0]->cols + icol; - tdAppendColVal(pTable->lastRow, tdGetColDataOfRow(pDataCol, pBlock->numOfRows - 1), pCol->type, pCol->bytes, - pCol->offset); + tdAppendColVal(memRowBody(pTable->lastRow), tdGetColDataOfRow(pDataCol, pBlock->numOfRows - 1), pCol->type, + pCol->bytes, pCol->offset); } return 0; diff --git a/src/tsdb/src/tsdbMemTable.c b/src/tsdb/src/tsdbMemTable.c index 7967462ffe..41e28ac909 100644 --- a/src/tsdb/src/tsdbMemTable.c +++ b/src/tsdb/src/tsdbMemTable.c @@ -933,12 +933,12 @@ static int tsdbInsertDataToTableImpl(STsdbRepo *pRepo, STable *pTable, void **ro tSkipListPutBatch(pTableData->pData, rows, rowCounter); int64_t dsize = SL_SIZE(pTableData->pData) - osize; - if (pMemTable->keyFirst > dataRowKey(rows[0])) pMemTable->keyFirst = dataRowKey(rows[0]); - if (pMemTable->keyLast < dataRowKey(rows[rowCounter - 1])) pMemTable->keyLast = dataRowKey(rows[rowCounter - 1]); + if (pMemTable->keyFirst > memRowKey(rows[0])) pMemTable->keyFirst = memRowKey(rows[0]); + if (pMemTable->keyLast < memRowKey(rows[rowCounter - 1])) pMemTable->keyLast = memRowKey(rows[rowCounter - 1]); pMemTable->numOfRows += dsize; - if (pTableData->keyFirst > dataRowKey(rows[0])) pTableData->keyFirst = dataRowKey(rows[0]); - if (pTableData->keyLast < dataRowKey(rows[rowCounter - 1])) pTableData->keyLast = dataRowKey(rows[rowCounter - 1]); + if (pTableData->keyFirst > memRowKey(rows[0])) pTableData->keyFirst = memRowKey(rows[0]); + if (pTableData->keyLast < memRowKey(rows[rowCounter - 1])) pTableData->keyLast = memRowKey(rows[rowCounter - 1]); pTableData->numOfRows += dsize; // update table latest info @@ -1004,6 +1004,8 @@ static void updateTableLatestColumn(STsdbRepo *pRepo, STable *pTable, SMemRow ro SDataCol *pLatestCols = pTable->lastCols; + bool isDataRow = isDataRow(row); + void *rowBody = memRowBody(row); for (int16_t j = 0; j < schemaNCols(pSchema); j++) { STColumn *pTCol = schemaColAt(pSchema, j); // ignore not exist colId @@ -1012,8 +1014,20 @@ static void updateTableLatestColumn(STsdbRepo *pRepo, STable *pTable, SMemRow ro continue; } - void *value = tdGetMemRowDataOfCol(row, (int8_t)pTCol->type, TD_MEM_ROW_HEAD_SIZE + pSchema->columns[j].offset); - if (isNull(value, pTCol->type)) { + void *value = NULL; + + if (isDataRow) { + value = tdGetRowDataOfCol(rowBody, (int8_t)pTCol->type, TD_DATA_ROW_HEAD_SIZE + pSchema->columns[j].offset); + } else { + // SKVRow + SColIdx *pColIdx = tdGetKVRowIdxOfCol(rowBody, pTCol->colId); + if(pColIdx) { +value = tdGetKvRowDataOfCol(rowBody, pColIdx->offset); + } + + } + + if ((value == NULL) || isNull(value, pTCol->type)) { continue; } diff --git a/src/tsdb/src/tsdbRead.c b/src/tsdb/src/tsdbRead.c index 27502f67f7..395598f527 100644 --- a/src/tsdb/src/tsdbRead.c +++ b/src/tsdb/src/tsdbRead.c @@ -1548,56 +1548,52 @@ static void copyOneRowFromMem(STsdbQueryHandle* pQueryHandle, int32_t capacity, } if (pColIdx->colId == pColInfo->info.colId) { - STColumn* pSTColumn = tdGetColOfID(pSchema, pColIdx->colId); - if (pSTColumn != NULL) { - // offset of pColIdx including the TD_KV_ROW_HEAD_SIZE - void* value = tdGetKvRowDataOfCol(kvRow, pSTColumn->type, pColIdx->offset); - switch (pColInfo->info.type) { - case TSDB_DATA_TYPE_BINARY: - case TSDB_DATA_TYPE_NCHAR: - memcpy(pData, value, varDataTLen(value)); - break; - case TSDB_DATA_TYPE_NULL: - case TSDB_DATA_TYPE_BOOL: - case TSDB_DATA_TYPE_TINYINT: - case TSDB_DATA_TYPE_UTINYINT: - *(uint8_t*)pData = *(uint8_t*)value; - break; - case TSDB_DATA_TYPE_SMALLINT: - case TSDB_DATA_TYPE_USMALLINT: - *(uint16_t*)pData = *(uint16_t*)value; - break; - case TSDB_DATA_TYPE_INT: - case TSDB_DATA_TYPE_UINT: - *(uint32_t*)pData = *(uint32_t*)value; - break; - case TSDB_DATA_TYPE_BIGINT: - case TSDB_DATA_TYPE_UBIGINT: - *(uint64_t*)pData = *(uint64_t*)value; - break; - case TSDB_DATA_TYPE_FLOAT: - SET_FLOAT_PTR(pData, value); - break; - case TSDB_DATA_TYPE_DOUBLE: - SET_DOUBLE_PTR(pData, value); - break; - case TSDB_DATA_TYPE_TIMESTAMP: - if (pColInfo->info.colId == PRIMARYKEY_TIMESTAMP_COL_INDEX) { - *(TSKEY*)pData = tdGetKey(*(TKEY*)value); - } else { - *(TSKEY*)pData = *(TSKEY*)value; - } - break; - default: - memcpy(pData, value, pColInfo->info.bytes); - } - ++k; - ++i; - continue; + // offset of pColIdx for SKVRow including the TD_KV_ROW_HEAD_SIZE + void* value = tdGetKvRowDataOfCol(kvRow, pColIdx->offset); + switch (pColInfo->info.type) { + case TSDB_DATA_TYPE_BINARY: + case TSDB_DATA_TYPE_NCHAR: + memcpy(pData, value, varDataTLen(value)); + break; + case TSDB_DATA_TYPE_NULL: + case TSDB_DATA_TYPE_BOOL: + case TSDB_DATA_TYPE_TINYINT: + case TSDB_DATA_TYPE_UTINYINT: + *(uint8_t*)pData = *(uint8_t*)value; + break; + case TSDB_DATA_TYPE_SMALLINT: + case TSDB_DATA_TYPE_USMALLINT: + *(uint16_t*)pData = *(uint16_t*)value; + break; + case TSDB_DATA_TYPE_INT: + case TSDB_DATA_TYPE_UINT: + *(uint32_t*)pData = *(uint32_t*)value; + break; + case TSDB_DATA_TYPE_BIGINT: + case TSDB_DATA_TYPE_UBIGINT: + *(uint64_t*)pData = *(uint64_t*)value; + break; + case TSDB_DATA_TYPE_FLOAT: + SET_FLOAT_PTR(pData, value); + break; + case TSDB_DATA_TYPE_DOUBLE: + SET_DOUBLE_PTR(pData, value); + break; + case TSDB_DATA_TYPE_TIMESTAMP: + if (pColInfo->info.colId == PRIMARYKEY_TIMESTAMP_COL_INDEX) { + *(TSKEY*)pData = tdGetKey(*(TKEY*)value); + } else { + *(TSKEY*)pData = *(TSKEY*)value; + } + break; + default: + memcpy(pData, value, pColInfo->info.bytes); } - ++k; // pSTColumn is NULL + ++k; + ++i; + continue; } - // If (pColInfo->info.colId < pColIdx->colId) or pSTColumn is NULL, it is a NULL data + // If (pColInfo->info.colId < pColIdx->colId), it is NULL data if (pColInfo->info.type == TSDB_DATA_TYPE_BINARY || pColInfo->info.type == TSDB_DATA_TYPE_NCHAR) { setVardataNull(pData, pColInfo->info.type); } else { -- GitLab