提交 f3a1f84a 编写于 作者: C Cary Xu

support SKVRow for 4096 columns

上级 0c7f46e4
......@@ -346,8 +346,8 @@ typedef struct {
int16_t sversion;
int32_t flen;
// for SKVRow
int16_t tCols;
int16_t nCols;
uint16_t tCols;
uint16_t nCols;
SColIdx* pColIdx;
uint16_t alloc;
uint16_t size;
......
......@@ -1664,7 +1664,7 @@ void tdResetMemRowBuilder(SMemRowBuilder* pBuilder) {
}
#define KvRowNullColRatio 0.75 // If nullable column ratio larger than 0.75, utilize SKVRow, otherwise SDataRow.
#define KvRowNColsThresh 4096 // default value: 32
#define KvRowNColsThresh 1 // default value: 32
static FORCE_INLINE uint8_t tdRowTypeJudger(SSchema* pSchema, void* pData, int32_t nCols, int32_t flen,
uint16_t* nColsNotNull) {
......@@ -1672,22 +1672,22 @@ static FORCE_INLINE uint8_t tdRowTypeJudger(SSchema* pSchema, void* pData, int32
if (nCols < KvRowNColsThresh) {
return SMEM_ROW_DATA;
}
int32_t dataRowLen = flen;
int32_t kvRowLen = 0;
int32_t dataRowLength = flen;
int32_t kvRowLength = 0;
uint16_t nColsNull = 0;
char* p = (char*)pData;
for (int i = 0; i < nCols; ++i) {
if (IS_VAR_DATA_TYPE(pSchema[i].type)) {
dataRowLen += varDataTLen(p);
dataRowLength += varDataTLen(p);
if (!isNull(p, pSchema[i].type)) {
kvRowLen += sizeof(SColIdx) + varDataTLen(p);
kvRowLength += (sizeof(SColIdx) + varDataTLen(p));
} else {
++nColsNull;
}
} else {
if (!isNull(p, pSchema[i].type)) {
kvRowLen += sizeof(SColIdx) + varDataTLen(p);
kvRowLength += (sizeof(SColIdx) + TYPE_BYTES[pSchema[i].type]);
} else {
++nColsNull;
}
......@@ -1697,9 +1697,9 @@ static FORCE_INLINE uint8_t tdRowTypeJudger(SSchema* pSchema, void* pData, int32
p += pSchema[i].bytes;
}
tscDebug("prop:nColsNull %d, nCols: %d, kvRowLen: %d, dataRowLen: %d", nColsNull, nCols, kvRowLen, dataRowLen);
tscInfo("prop:nColsNull %d, nCols: %d, kvRowLen: %d, dataRowLen: %d", nColsNull, nCols, kvRowLength, dataRowLength);
if (kvRowLen < dataRowLen) {
if (kvRowLength < dataRowLength) {
if (nColsNotNull) {
*nColsNotNull = nCols - nColsNull;
}
......@@ -1713,20 +1713,24 @@ SMemRow tdGetMemRowFromBuilder(SMemRowBuilder* pBuilder) {
SSchema* pSchema = pBuilder->pSchema;
char* p = (char*)pBuilder->buf;
if(pBuilder->nCols <= 0){
return NULL;
}
uint16_t nColsNotNull = 0;
uint8_t memRowType = tdRowTypeJudger(pSchema, p, pBuilder->nCols, pBuilder->flen, &nColsNotNull);
tscDebug("prop:memType is %d", memRowType);
memRowType = SMEM_ROW_DATA;
SMemRow* memRow = (SMemRow)pBuilder->pDataBlock;
memRowSetType(memRow, memRowType);
if (memRowType == SMEM_ROW_DATA) {
int toffset = 0;
SDataRow trow = (SDataRow)memRowBody(memRow);
dataRowSetLen(trow, (uint16_t)(TD_DATA_ROW_HEAD_SIZE + pBuilder->flen));
dataRowSetVersion(trow, pBuilder->sversion);
int toffset = 0;
p = (char*)pBuilder->buf;
for (int32_t j = 0; j < pBuilder->nCols; ++j) {
tdAppendColVal(trow, p, pSchema[j].type, pSchema[j].bytes, toffset);
......@@ -1734,35 +1738,34 @@ SMemRow tdGetMemRowFromBuilder(SMemRowBuilder* pBuilder) {
p += pSchema[j].bytes;
}
pBuilder->buf = p;
} else {
} else if (memRowType == SMEM_ROW_KV) {
ASSERT(nColsNotNull < pBuilder->nCols);
uint16_t tlen = TD_KV_ROW_HEAD_SIZE + sizeof(SColIdx) * nColsNotNull + pBuilder->size;
SKVRow row = (SKVRow)pBuilder->pDataBlock;
SKVRow kvRow = (SKVRow)memRowBody(memRow);
kvRowSetNCols(row, nColsNotNull);
kvRowSetLen(row, tlen);
kvRowSetNCols(kvRow, nColsNotNull);
kvRowSetLen(kvRow, tlen);
memcpy(kvRowColIdx(row), pBuilder->pColIdx, sizeof(SColIdx) * pBuilder->nCols);
memcpy(kvRowValues(row), pBuilder->buf, pBuilder->size);
int toffset = 0;
p = (char*)pBuilder->buf;
for (int32_t j = 0; j < pBuilder->nCols; ++j) {
if(!isNull(p, pSchema[j].type)) {
tdAppendKvColVal(kvRow, p, pSchema[j].colId, pSchema[j].type, toffset);
toffset += sizeof(SColIdx);
}
p += pSchema[j].bytes;
}
pBuilder->buf = p;
} else {
ASSERT(0);
}
pBuilder->pDataBlock = (char*)pBuilder->pDataBlock + memRowTLen(memRow); // next row
pBuilder->pSubmitBlk->dataLen += memRowTLen(memRow);
// int tlen = sizeof(SColIdx) * pBuilder->nCols + pBuilder->size;
// if (tlen == 0) return NULL;
// tlen += TD_KV_ROW_HEAD_SIZE;
// SKVRow row = malloc(tlen);
// if (row == NULL) return NULL;
// kvRowSetNCols(row, pBuilder->nCols);
// kvRowSetLen(row, tlen);
// memcpy(kvRowColIdx(row), pBuilder->pColIdx, sizeof(SColIdx) * pBuilder->nCols);
// memcpy(kvRowValues(row), pBuilder->buf, pBuilder->size);
return NULL;
return memRow;
}
// Erase the empty space reserved for binary data
......
......@@ -404,10 +404,10 @@ typedef struct {
uint16_t offset;
} SColIdx;
#define TD_KV_ROW_HEAD_SIZE (2 * sizeof(int16_t))
#define TD_KV_ROW_HEAD_SIZE (sizeof(uint16_t) + sizeof(int16_t))
#define kvRowLen(r) (*(uint16_t *)(r))
#define kvRowNCols(r) (*(int16_t *)POINTER_SHIFT(r, sizeof(int16_t)))
#define kvRowNCols(r) (*(int16_t *)POINTER_SHIFT(r, sizeof(uint16_t)))
#define kvRowSetLen(r, len) kvRowLen(r) = (len)
#define kvRowSetNCols(r, n) kvRowNCols(r) = (n)
#define kvRowColIdx(r) (SColIdx *)POINTER_SHIFT(r, TD_KV_ROW_HEAD_SIZE)
......@@ -445,6 +445,33 @@ static FORCE_INLINE void *tdGetKVRowValOfCol(SKVRow row, int16_t colId) {
return kvRowColVal(row, (SColIdx *)ret);
}
// offset here not include kvRow header length
static FORCE_INLINE int tdAppendKvColVal(SKVRow row, const void *value, int16_t colId, int8_t type, int32_t offset) {
ASSERT(value != NULL);
int32_t toffset = offset + TD_KV_ROW_HEAD_SIZE;
SColIdx *pColIdx = (SColIdx *)POINTER_SHIFT(row, toffset);
char * ptr = (char *)POINTER_SHIFT(row, kvRowLen(row));
pColIdx->colId = colId;
pColIdx->offset = kvRowLen(row);
if (IS_VAR_DATA_TYPE(type)) {
memcpy(ptr, value, varDataTLen(value));
kvRowLen(row) += varDataTLen(value);
} else {
if (offset == 0) {
ASSERT(type == TSDB_DATA_TYPE_TIMESTAMP);
TKEY tvalue = tdGetTKEY(*(TSKEY *)value);
memcpy(ptr, (void *)(&tvalue), TYPE_BYTES[type]);
} else {
memcpy(ptr, value, TYPE_BYTES[type]);
}
kvRowLen(row) += TYPE_BYTES[type];
}
return 0;
}
// ----------------- K-V data row builder
typedef struct {
int16_t tCols;
......@@ -540,10 +567,10 @@ static FORCE_INLINE int tdAddColToKVRow(SKVRowBuilder *pBuilder, int16_t colId,
#define memRowMaxBytesFromSchema(s) (schemaTLen(s) + TD_MEM_ROW_HEAD_SIZE)
#define memRowDeleted(r) TKEY_IS_DELETED(memRowTKey(r))
#define memRowNCols(r) (*(int16_t *)POINTER_SHIFT(r, TD_MEM_ROW_TYPE_SIZE + sizeof(int16_t))) // for SKVRow
#define memRowSetNCols(r, n) memRowNCols(r) = (n) // for SKVRow
#define memRowColIdx(r) (SColIdx *)POINTER_SHIFT(r, TD_MEM_ROW_HEAD_SIZE) // for SKVRow
#define memRowValues(r) POINTER_SHIFT(r, TD_MEM_ROW_HEAD_SIZE + sizeof(SColIdx) * memRowNCols(r)) // for SKVRow
// #define memRowNCols(r) (*(int16_t *)POINTER_SHIFT(r, TD_MEM_ROW_TYPE_SIZE + sizeof(int16_t))) // for SKVRow
// #define memRowSetNCols(r, n) memRowNCols(r) = (n) // for SKVRow
// #define memRowColIdx(r) (SColIdx *)POINTER_SHIFT(r, TD_MEM_ROW_HEAD_SIZE) // for SKVRow
// #define memRowValues(r) POINTER_SHIFT(r, TD_MEM_ROW_HEAD_SIZE + sizeof(SColIdx) * memRowNCols(r)) // for SKVRow
// NOTE: offset here including the header size
static FORCE_INLINE void *tdGetRowDataOfCol(void *row, int8_t type, int32_t offset) {
......
......@@ -1377,12 +1377,12 @@ int32_t doCopyRowsFromFileBlock(STsdbQueryHandle* pQueryHandle, int32_t capacity
return numOfRows + num;
}
#if 0
static void copyOneRowFromMem(STsdbQueryHandle* pQueryHandle, int32_t capacity, int32_t numOfRows, SMemRow row,
int32_t numOfCols, STable* pTable, STSchema* pSchema) {
char* pData = NULL;
// the schema version info is embeded in SDataRow
// the schema version info is embedded in SDataRow, and use latest schema version for SKVRow
int32_t numOfRowCols = 0;
if (pSchema == NULL) {
pSchema = tsdbGetTableSchemaByVersion(pTable, memRowVersion(row));
......@@ -1477,7 +1477,190 @@ static void copyOneRowFromMem(STsdbQueryHandle* pQueryHandle, int32_t capacity,
i++;
}
}
#endif
static void copyOneRowFromMem(STsdbQueryHandle* pQueryHandle, int32_t capacity, int32_t numOfRows, SMemRow row,
int32_t numOfCols, STable* pTable, STSchema* pSchema) {
char* pData = NULL;
// the schema version info is embedded in SDataRow, and use latest schema version for SKVRow
int32_t numOfRowCols = 0;
if (pSchema == NULL) {
pSchema = tsdbGetTableSchemaByVersion(pTable, memRowVersion(row));
numOfRowCols = schemaNCols(pSchema);
} else {
numOfRowCols = schemaNCols(pSchema);
}
int32_t i = 0;
if (isDataRow(row)) {
int32_t j = 0;
while (i < numOfCols && j < numOfRowCols) {
SColumnInfoData* pColInfo = taosArrayGet(pQueryHandle->pColumns, i);
if (pSchema->columns[j].colId < pColInfo->info.colId) {
j++;
continue;
}
if (ASCENDING_TRAVERSE(pQueryHandle->order)) {
pData = (char*)pColInfo->pData + numOfRows * pColInfo->info.bytes;
} else {
pData = (char*)pColInfo->pData + (capacity - numOfRows - 1) * pColInfo->info.bytes;
}
if (pSchema->columns[j].colId == pColInfo->info.colId) {
void* value =
tdGetRowDataOfCol(row, (int8_t)pColInfo->info.type, TD_MEM_ROW_HEAD_SIZE + pSchema->columns[j].offset);
switch (pColInfo->info.type) {
case TSDB_DATA_TYPE_BINARY:
case TSDB_DATA_TYPE_NCHAR:
memcpy(pData, value, varDataTLen(value));
break;
case TSDB_DATA_TYPE_NULL:
case TSDB_DATA_TYPE_BOOL:
case TSDB_DATA_TYPE_TINYINT:
case TSDB_DATA_TYPE_UTINYINT:
*(uint8_t*)pData = *(uint8_t*)value;
break;
case TSDB_DATA_TYPE_SMALLINT:
case TSDB_DATA_TYPE_USMALLINT:
*(uint16_t*)pData = *(uint16_t*)value;
break;
case TSDB_DATA_TYPE_INT:
case TSDB_DATA_TYPE_UINT:
*(uint32_t*)pData = *(uint32_t*)value;
break;
case TSDB_DATA_TYPE_BIGINT:
case TSDB_DATA_TYPE_UBIGINT:
*(uint64_t*)pData = *(uint64_t*)value;
break;
case TSDB_DATA_TYPE_FLOAT:
SET_FLOAT_PTR(pData, value);
break;
case TSDB_DATA_TYPE_DOUBLE:
SET_DOUBLE_PTR(pData, value);
break;
case TSDB_DATA_TYPE_TIMESTAMP:
if (pColInfo->info.colId == PRIMARYKEY_TIMESTAMP_COL_INDEX) {
*(TSKEY*)pData = tdGetKey(*(TKEY*)value);
} else {
*(TSKEY*)pData = *(TSKEY*)value;
}
break;
default:
memcpy(pData, value, pColInfo->info.bytes);
}
j++;
i++;
} else { // pColInfo->info.colId < pSchema->columns[j].colId, it is a NULL data
if (pColInfo->info.type == TSDB_DATA_TYPE_BINARY || pColInfo->info.type == TSDB_DATA_TYPE_NCHAR) {
setVardataNull(pData, pColInfo->info.type);
} else {
setNull(pData, pColInfo->info.type, pColInfo->info.bytes);
}
i++;
}
}
} else if (isKvRow(row)) {
SKVRow kvRow = memRowBody(row);
int32_t k = 0;
int32_t nKvRowCols = kvRowNCols(kvRow);
while (i < numOfCols && k < nKvRowCols) {
SColumnInfoData* pColInfo = taosArrayGet(pQueryHandle->pColumns, i);
SColIdx* pColIdx = kvRowColIdxAt(kvRow, k);
if (pColIdx->colId < pColInfo->info.colId) {
++k;
continue;
}
if (ASCENDING_TRAVERSE(pQueryHandle->order)) {
pData = (char*)pColInfo->pData + numOfRows * pColInfo->info.bytes;
} else {
pData = (char*)pColInfo->pData + (capacity - numOfRows - 1) * pColInfo->info.bytes;
}
if (pColIdx->colId == pColInfo->info.colId) {
STColumn* pSTColumn = tdGetColOfID(pSchema, pColIdx->colId);
if (pSTColumn != NULL) {
void* value = tdGetKvRowDataOfCol(row, pSTColumn->type, TD_MEM_ROW_HEAD_SIZE + pColIdx->offset);
switch (pColInfo->info.type) {
case TSDB_DATA_TYPE_BINARY:
case TSDB_DATA_TYPE_NCHAR:
memcpy(pData, value, varDataTLen(value));
break;
case TSDB_DATA_TYPE_NULL:
case TSDB_DATA_TYPE_BOOL:
case TSDB_DATA_TYPE_TINYINT:
case TSDB_DATA_TYPE_UTINYINT:
*(uint8_t*)pData = *(uint8_t*)value;
break;
case TSDB_DATA_TYPE_SMALLINT:
case TSDB_DATA_TYPE_USMALLINT:
*(uint16_t*)pData = *(uint16_t*)value;
break;
case TSDB_DATA_TYPE_INT:
case TSDB_DATA_TYPE_UINT:
*(uint32_t*)pData = *(uint32_t*)value;
break;
case TSDB_DATA_TYPE_BIGINT:
case TSDB_DATA_TYPE_UBIGINT:
*(uint64_t*)pData = *(uint64_t*)value;
break;
case TSDB_DATA_TYPE_FLOAT:
SET_FLOAT_PTR(pData, value);
break;
case TSDB_DATA_TYPE_DOUBLE:
SET_DOUBLE_PTR(pData, value);
break;
case TSDB_DATA_TYPE_TIMESTAMP:
if (pColInfo->info.colId == PRIMARYKEY_TIMESTAMP_COL_INDEX) {
*(TSKEY*)pData = tdGetKey(*(TKEY*)value);
} else {
*(TSKEY*)pData = *(TSKEY*)value;
}
break;
default:
memcpy(pData, value, pColInfo->info.bytes);
}
++k;
++i;
continue;
}
++k; // pSTColumn is NULL
}
// If (pColInfo->info.colId < pColIdx->colId) or pSTColumn is NULL, it is a NULL data
if (pColInfo->info.type == TSDB_DATA_TYPE_BINARY || pColInfo->info.type == TSDB_DATA_TYPE_NCHAR) {
setVardataNull(pData, pColInfo->info.type);
} else {
setNull(pData, pColInfo->info.type, pColInfo->info.bytes);
}
++i;
}
} else {
ASSERT(0);
}
while (i < numOfCols) { // the remain columns are all null data
SColumnInfoData* pColInfo = taosArrayGet(pQueryHandle->pColumns, i);
if (ASCENDING_TRAVERSE(pQueryHandle->order)) {
pData = (char*)pColInfo->pData + numOfRows * pColInfo->info.bytes;
} else {
pData = (char*)pColInfo->pData + (capacity - numOfRows - 1) * pColInfo->info.bytes;
}
if (pColInfo->info.type == TSDB_DATA_TYPE_BINARY || pColInfo->info.type == TSDB_DATA_TYPE_NCHAR) {
setVardataNull(pData, pColInfo->info.type);
} else {
setNull(pData, pColInfo->info.type, pColInfo->info.bytes);
}
i++;
}
}
static void moveDataToFront(STsdbQueryHandle* pQueryHandle, int32_t numOfRows, int32_t numOfCols) {
if (numOfRows == 0 || ASCENDING_TRAVERSE(pQueryHandle->order)) {
return;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册