From f3a1f84a7442f367a14135dfc560cc6d2d909dd4 Mon Sep 17 00:00:00 2001 From: Cary Xu Date: Wed, 30 Jun 2021 10:39:42 +0800 Subject: [PATCH] support SKVRow for 4096 columns --- src/client/inc/tscUtil.h | 4 +- src/client/src/tscUtil.c | 65 ++++++------ src/common/inc/tdataformat.h | 39 ++++++-- src/tsdb/src/tsdbRead.c | 187 ++++++++++++++++++++++++++++++++++- 4 files changed, 254 insertions(+), 41 deletions(-) diff --git a/src/client/inc/tscUtil.h b/src/client/inc/tscUtil.h index 9d9eacb332..ca00d03343 100644 --- a/src/client/inc/tscUtil.h +++ b/src/client/inc/tscUtil.h @@ -346,8 +346,8 @@ typedef struct { int16_t sversion; int32_t flen; // for SKVRow - int16_t tCols; - int16_t nCols; + uint16_t tCols; + uint16_t nCols; SColIdx* pColIdx; uint16_t alloc; uint16_t size; diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index c7241860bc..e6444fb4a4 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -1664,7 +1664,7 @@ void tdResetMemRowBuilder(SMemRowBuilder* pBuilder) { } #define KvRowNullColRatio 0.75 // If nullable column ratio larger than 0.75, utilize SKVRow, otherwise SDataRow. -#define KvRowNColsThresh 4096 // default value: 32 +#define KvRowNColsThresh 1 // default value: 32 static FORCE_INLINE uint8_t tdRowTypeJudger(SSchema* pSchema, void* pData, int32_t nCols, int32_t flen, uint16_t* nColsNotNull) { @@ -1672,22 +1672,22 @@ static FORCE_INLINE uint8_t tdRowTypeJudger(SSchema* pSchema, void* pData, int32 if (nCols < KvRowNColsThresh) { return SMEM_ROW_DATA; } - int32_t dataRowLen = flen; - int32_t kvRowLen = 0; + int32_t dataRowLength = flen; + int32_t kvRowLength = 0; uint16_t nColsNull = 0; char* p = (char*)pData; for (int i = 0; i < nCols; ++i) { if (IS_VAR_DATA_TYPE(pSchema[i].type)) { - dataRowLen += varDataTLen(p); + dataRowLength += varDataTLen(p); if (!isNull(p, pSchema[i].type)) { - kvRowLen += sizeof(SColIdx) + varDataTLen(p); + kvRowLength += (sizeof(SColIdx) + varDataTLen(p)); } else { ++nColsNull; } } else { if (!isNull(p, pSchema[i].type)) { - kvRowLen += sizeof(SColIdx) + varDataTLen(p); + kvRowLength += (sizeof(SColIdx) + TYPE_BYTES[pSchema[i].type]); } else { ++nColsNull; } @@ -1697,9 +1697,9 @@ static FORCE_INLINE uint8_t tdRowTypeJudger(SSchema* pSchema, void* pData, int32 p += pSchema[i].bytes; } - tscDebug("prop:nColsNull %d, nCols: %d, kvRowLen: %d, dataRowLen: %d", nColsNull, nCols, kvRowLen, dataRowLen); + tscInfo("prop:nColsNull %d, nCols: %d, kvRowLen: %d, dataRowLen: %d", nColsNull, nCols, kvRowLength, dataRowLength); - if (kvRowLen < dataRowLen) { + if (kvRowLength < dataRowLength) { if (nColsNotNull) { *nColsNotNull = nCols - nColsNull; } @@ -1713,20 +1713,24 @@ SMemRow tdGetMemRowFromBuilder(SMemRowBuilder* pBuilder) { SSchema* pSchema = pBuilder->pSchema; char* p = (char*)pBuilder->buf; + if(pBuilder->nCols <= 0){ + return NULL; + } + uint16_t nColsNotNull = 0; uint8_t memRowType = tdRowTypeJudger(pSchema, p, pBuilder->nCols, pBuilder->flen, &nColsNotNull); tscDebug("prop:memType is %d", memRowType); - memRowType = SMEM_ROW_DATA; + SMemRow* memRow = (SMemRow)pBuilder->pDataBlock; memRowSetType(memRow, memRowType); if (memRowType == SMEM_ROW_DATA) { - int toffset = 0; SDataRow trow = (SDataRow)memRowBody(memRow); dataRowSetLen(trow, (uint16_t)(TD_DATA_ROW_HEAD_SIZE + pBuilder->flen)); dataRowSetVersion(trow, pBuilder->sversion); + int toffset = 0; p = (char*)pBuilder->buf; for (int32_t j = 0; j < pBuilder->nCols; ++j) { tdAppendColVal(trow, p, pSchema[j].type, pSchema[j].bytes, toffset); @@ -1734,35 +1738,34 @@ SMemRow tdGetMemRowFromBuilder(SMemRowBuilder* pBuilder) { p += pSchema[j].bytes; } pBuilder->buf = p; - } else { + } else if (memRowType == SMEM_ROW_KV) { + ASSERT(nColsNotNull < pBuilder->nCols); + uint16_t tlen = TD_KV_ROW_HEAD_SIZE + sizeof(SColIdx) * nColsNotNull + pBuilder->size; - SKVRow row = (SKVRow)pBuilder->pDataBlock; + SKVRow kvRow = (SKVRow)memRowBody(memRow); - kvRowSetNCols(row, nColsNotNull); - kvRowSetLen(row, tlen); + kvRowSetNCols(kvRow, nColsNotNull); + kvRowSetLen(kvRow, tlen); - memcpy(kvRowColIdx(row), pBuilder->pColIdx, sizeof(SColIdx) * pBuilder->nCols); - memcpy(kvRowValues(row), pBuilder->buf, pBuilder->size); + int toffset = 0; + p = (char*)pBuilder->buf; + for (int32_t j = 0; j < pBuilder->nCols; ++j) { + if(!isNull(p, pSchema[j].type)) { + tdAppendKvColVal(kvRow, p, pSchema[j].colId, pSchema[j].type, toffset); + toffset += sizeof(SColIdx); + } + p += pSchema[j].bytes; + } + pBuilder->buf = p; + + } else { + ASSERT(0); } pBuilder->pDataBlock = (char*)pBuilder->pDataBlock + memRowTLen(memRow); // next row pBuilder->pSubmitBlk->dataLen += memRowTLen(memRow); - // int tlen = sizeof(SColIdx) * pBuilder->nCols + pBuilder->size; - // if (tlen == 0) return NULL; - - // tlen += TD_KV_ROW_HEAD_SIZE; - - // SKVRow row = malloc(tlen); - // if (row == NULL) return NULL; - - // kvRowSetNCols(row, pBuilder->nCols); - // kvRowSetLen(row, tlen); - - // memcpy(kvRowColIdx(row), pBuilder->pColIdx, sizeof(SColIdx) * pBuilder->nCols); - // memcpy(kvRowValues(row), pBuilder->buf, pBuilder->size); - - return NULL; + return memRow; } // Erase the empty space reserved for binary data diff --git a/src/common/inc/tdataformat.h b/src/common/inc/tdataformat.h index 294d8b2cf8..bdb1d78cdb 100644 --- a/src/common/inc/tdataformat.h +++ b/src/common/inc/tdataformat.h @@ -404,10 +404,10 @@ typedef struct { uint16_t offset; } SColIdx; -#define TD_KV_ROW_HEAD_SIZE (2 * sizeof(int16_t)) +#define TD_KV_ROW_HEAD_SIZE (sizeof(uint16_t) + sizeof(int16_t)) #define kvRowLen(r) (*(uint16_t *)(r)) -#define kvRowNCols(r) (*(int16_t *)POINTER_SHIFT(r, sizeof(int16_t))) +#define kvRowNCols(r) (*(int16_t *)POINTER_SHIFT(r, sizeof(uint16_t))) #define kvRowSetLen(r, len) kvRowLen(r) = (len) #define kvRowSetNCols(r, n) kvRowNCols(r) = (n) #define kvRowColIdx(r) (SColIdx *)POINTER_SHIFT(r, TD_KV_ROW_HEAD_SIZE) @@ -445,6 +445,33 @@ static FORCE_INLINE void *tdGetKVRowValOfCol(SKVRow row, int16_t colId) { return kvRowColVal(row, (SColIdx *)ret); } +// offset here not include kvRow header length +static FORCE_INLINE int tdAppendKvColVal(SKVRow row, const void *value, int16_t colId, int8_t type, int32_t offset) { + ASSERT(value != NULL); + int32_t toffset = offset + TD_KV_ROW_HEAD_SIZE; + SColIdx *pColIdx = (SColIdx *)POINTER_SHIFT(row, toffset); + char * ptr = (char *)POINTER_SHIFT(row, kvRowLen(row)); + + pColIdx->colId = colId; + pColIdx->offset = kvRowLen(row); + + if (IS_VAR_DATA_TYPE(type)) { + memcpy(ptr, value, varDataTLen(value)); + kvRowLen(row) += varDataTLen(value); + } else { + if (offset == 0) { + ASSERT(type == TSDB_DATA_TYPE_TIMESTAMP); + TKEY tvalue = tdGetTKEY(*(TSKEY *)value); + memcpy(ptr, (void *)(&tvalue), TYPE_BYTES[type]); + } else { + memcpy(ptr, value, TYPE_BYTES[type]); + } + kvRowLen(row) += TYPE_BYTES[type]; + } + + return 0; +} + // ----------------- K-V data row builder typedef struct { int16_t tCols; @@ -540,10 +567,10 @@ static FORCE_INLINE int tdAddColToKVRow(SKVRowBuilder *pBuilder, int16_t colId, #define memRowMaxBytesFromSchema(s) (schemaTLen(s) + TD_MEM_ROW_HEAD_SIZE) #define memRowDeleted(r) TKEY_IS_DELETED(memRowTKey(r)) -#define memRowNCols(r) (*(int16_t *)POINTER_SHIFT(r, TD_MEM_ROW_TYPE_SIZE + sizeof(int16_t))) // for SKVRow -#define memRowSetNCols(r, n) memRowNCols(r) = (n) // for SKVRow -#define memRowColIdx(r) (SColIdx *)POINTER_SHIFT(r, TD_MEM_ROW_HEAD_SIZE) // for SKVRow -#define memRowValues(r) POINTER_SHIFT(r, TD_MEM_ROW_HEAD_SIZE + sizeof(SColIdx) * memRowNCols(r)) // for SKVRow +// #define memRowNCols(r) (*(int16_t *)POINTER_SHIFT(r, TD_MEM_ROW_TYPE_SIZE + sizeof(int16_t))) // for SKVRow +// #define memRowSetNCols(r, n) memRowNCols(r) = (n) // for SKVRow +// #define memRowColIdx(r) (SColIdx *)POINTER_SHIFT(r, TD_MEM_ROW_HEAD_SIZE) // for SKVRow +// #define memRowValues(r) POINTER_SHIFT(r, TD_MEM_ROW_HEAD_SIZE + sizeof(SColIdx) * memRowNCols(r)) // for SKVRow // NOTE: offset here including the header size static FORCE_INLINE void *tdGetRowDataOfCol(void *row, int8_t type, int32_t offset) { diff --git a/src/tsdb/src/tsdbRead.c b/src/tsdb/src/tsdbRead.c index 32232e6815..fffe5b28ef 100644 --- a/src/tsdb/src/tsdbRead.c +++ b/src/tsdb/src/tsdbRead.c @@ -1377,12 +1377,12 @@ int32_t doCopyRowsFromFileBlock(STsdbQueryHandle* pQueryHandle, int32_t capacity return numOfRows + num; } - +#if 0 static void copyOneRowFromMem(STsdbQueryHandle* pQueryHandle, int32_t capacity, int32_t numOfRows, SMemRow row, int32_t numOfCols, STable* pTable, STSchema* pSchema) { char* pData = NULL; - // the schema version info is embeded in SDataRow + // the schema version info is embedded in SDataRow, and use latest schema version for SKVRow int32_t numOfRowCols = 0; if (pSchema == NULL) { pSchema = tsdbGetTableSchemaByVersion(pTable, memRowVersion(row)); @@ -1477,7 +1477,190 @@ static void copyOneRowFromMem(STsdbQueryHandle* pQueryHandle, int32_t capacity, i++; } } +#endif + +static void copyOneRowFromMem(STsdbQueryHandle* pQueryHandle, int32_t capacity, int32_t numOfRows, SMemRow row, + int32_t numOfCols, STable* pTable, STSchema* pSchema) { + char* pData = NULL; + + // the schema version info is embedded in SDataRow, and use latest schema version for SKVRow + int32_t numOfRowCols = 0; + if (pSchema == NULL) { + pSchema = tsdbGetTableSchemaByVersion(pTable, memRowVersion(row)); + numOfRowCols = schemaNCols(pSchema); + } else { + numOfRowCols = schemaNCols(pSchema); + } + int32_t i = 0; + + if (isDataRow(row)) { + int32_t j = 0; + while (i < numOfCols && j < numOfRowCols) { + SColumnInfoData* pColInfo = taosArrayGet(pQueryHandle->pColumns, i); + if (pSchema->columns[j].colId < pColInfo->info.colId) { + j++; + continue; + } + + if (ASCENDING_TRAVERSE(pQueryHandle->order)) { + pData = (char*)pColInfo->pData + numOfRows * pColInfo->info.bytes; + } else { + pData = (char*)pColInfo->pData + (capacity - numOfRows - 1) * pColInfo->info.bytes; + } + + if (pSchema->columns[j].colId == pColInfo->info.colId) { + void* value = + tdGetRowDataOfCol(row, (int8_t)pColInfo->info.type, TD_MEM_ROW_HEAD_SIZE + pSchema->columns[j].offset); + switch (pColInfo->info.type) { + case TSDB_DATA_TYPE_BINARY: + case TSDB_DATA_TYPE_NCHAR: + memcpy(pData, value, varDataTLen(value)); + break; + case TSDB_DATA_TYPE_NULL: + case TSDB_DATA_TYPE_BOOL: + case TSDB_DATA_TYPE_TINYINT: + case TSDB_DATA_TYPE_UTINYINT: + *(uint8_t*)pData = *(uint8_t*)value; + break; + case TSDB_DATA_TYPE_SMALLINT: + case TSDB_DATA_TYPE_USMALLINT: + *(uint16_t*)pData = *(uint16_t*)value; + break; + case TSDB_DATA_TYPE_INT: + case TSDB_DATA_TYPE_UINT: + *(uint32_t*)pData = *(uint32_t*)value; + break; + case TSDB_DATA_TYPE_BIGINT: + case TSDB_DATA_TYPE_UBIGINT: + *(uint64_t*)pData = *(uint64_t*)value; + break; + case TSDB_DATA_TYPE_FLOAT: + SET_FLOAT_PTR(pData, value); + break; + case TSDB_DATA_TYPE_DOUBLE: + SET_DOUBLE_PTR(pData, value); + break; + case TSDB_DATA_TYPE_TIMESTAMP: + if (pColInfo->info.colId == PRIMARYKEY_TIMESTAMP_COL_INDEX) { + *(TSKEY*)pData = tdGetKey(*(TKEY*)value); + } else { + *(TSKEY*)pData = *(TSKEY*)value; + } + break; + default: + memcpy(pData, value, pColInfo->info.bytes); + } + + j++; + i++; + } else { // pColInfo->info.colId < pSchema->columns[j].colId, it is a NULL data + if (pColInfo->info.type == TSDB_DATA_TYPE_BINARY || pColInfo->info.type == TSDB_DATA_TYPE_NCHAR) { + setVardataNull(pData, pColInfo->info.type); + } else { + setNull(pData, pColInfo->info.type, pColInfo->info.bytes); + } + i++; + } + } + } else if (isKvRow(row)) { + SKVRow kvRow = memRowBody(row); + int32_t k = 0; + int32_t nKvRowCols = kvRowNCols(kvRow); + + while (i < numOfCols && k < nKvRowCols) { + SColumnInfoData* pColInfo = taosArrayGet(pQueryHandle->pColumns, i); + SColIdx* pColIdx = kvRowColIdxAt(kvRow, k); + + if (pColIdx->colId < pColInfo->info.colId) { + ++k; + continue; + } + + if (ASCENDING_TRAVERSE(pQueryHandle->order)) { + pData = (char*)pColInfo->pData + numOfRows * pColInfo->info.bytes; + } else { + pData = (char*)pColInfo->pData + (capacity - numOfRows - 1) * pColInfo->info.bytes; + } + + if (pColIdx->colId == pColInfo->info.colId) { + STColumn* pSTColumn = tdGetColOfID(pSchema, pColIdx->colId); + if (pSTColumn != NULL) { + void* value = tdGetKvRowDataOfCol(row, pSTColumn->type, TD_MEM_ROW_HEAD_SIZE + pColIdx->offset); + switch (pColInfo->info.type) { + case TSDB_DATA_TYPE_BINARY: + case TSDB_DATA_TYPE_NCHAR: + memcpy(pData, value, varDataTLen(value)); + break; + case TSDB_DATA_TYPE_NULL: + case TSDB_DATA_TYPE_BOOL: + case TSDB_DATA_TYPE_TINYINT: + case TSDB_DATA_TYPE_UTINYINT: + *(uint8_t*)pData = *(uint8_t*)value; + break; + case TSDB_DATA_TYPE_SMALLINT: + case TSDB_DATA_TYPE_USMALLINT: + *(uint16_t*)pData = *(uint16_t*)value; + break; + case TSDB_DATA_TYPE_INT: + case TSDB_DATA_TYPE_UINT: + *(uint32_t*)pData = *(uint32_t*)value; + break; + case TSDB_DATA_TYPE_BIGINT: + case TSDB_DATA_TYPE_UBIGINT: + *(uint64_t*)pData = *(uint64_t*)value; + break; + case TSDB_DATA_TYPE_FLOAT: + SET_FLOAT_PTR(pData, value); + break; + case TSDB_DATA_TYPE_DOUBLE: + SET_DOUBLE_PTR(pData, value); + break; + case TSDB_DATA_TYPE_TIMESTAMP: + if (pColInfo->info.colId == PRIMARYKEY_TIMESTAMP_COL_INDEX) { + *(TSKEY*)pData = tdGetKey(*(TKEY*)value); + } else { + *(TSKEY*)pData = *(TSKEY*)value; + } + break; + default: + memcpy(pData, value, pColInfo->info.bytes); + } + ++k; + ++i; + continue; + } + ++k; // pSTColumn is NULL + } + // If (pColInfo->info.colId < pColIdx->colId) or pSTColumn is NULL, it is a NULL data + if (pColInfo->info.type == TSDB_DATA_TYPE_BINARY || pColInfo->info.type == TSDB_DATA_TYPE_NCHAR) { + setVardataNull(pData, pColInfo->info.type); + } else { + setNull(pData, pColInfo->info.type, pColInfo->info.bytes); + } + ++i; + } + } else { + ASSERT(0); + } + + while (i < numOfCols) { // the remain columns are all null data + SColumnInfoData* pColInfo = taosArrayGet(pQueryHandle->pColumns, i); + if (ASCENDING_TRAVERSE(pQueryHandle->order)) { + pData = (char*)pColInfo->pData + numOfRows * pColInfo->info.bytes; + } else { + pData = (char*)pColInfo->pData + (capacity - numOfRows - 1) * pColInfo->info.bytes; + } + + if (pColInfo->info.type == TSDB_DATA_TYPE_BINARY || pColInfo->info.type == TSDB_DATA_TYPE_NCHAR) { + setVardataNull(pData, pColInfo->info.type); + } else { + setNull(pData, pColInfo->info.type, pColInfo->info.bytes); + } + + i++; + } +} static void moveDataToFront(STsdbQueryHandle* pQueryHandle, int32_t numOfRows, int32_t numOfCols) { if (numOfRows == 0 || ASCENDING_TRAVERSE(pQueryHandle->order)) { return; -- GitLab