提交 2e5ce3eb 编写于 作者: L liuyq-617

adaption for cacheLast

上级 1c4d38d0
......@@ -1667,7 +1667,7 @@ int32_t tscGetDataBlockFromList(SHashObj* pHashList, int64_t id, int32_t size, i
// pBuilder->size = 0;
// }
#define KvRowNColsThresh 1 // default value: 128 TODO: for test, restore to default value after test finished
#define KvRowNColsThresh 1 // default value: 1200 TODO: for test, restore to default value after test finished
static FORCE_INLINE uint8_t tdRowTypeJudger(SSchema* pSchema, void* pData, int32_t nCols, int32_t flen,
uint16_t* nColsNotNull) {
......
......@@ -280,13 +280,43 @@ typedef struct SDataCol {
static FORCE_INLINE void dataColReset(SDataCol *pDataCol) { pDataCol->len = 0; }
void dataColInit(SDataCol *pDataCol, STColumn *pCol, void **pBuf, int maxPoints);
void dataColAppendVal(SDataCol *pCol, const void *value, int numOfRows, int maxPoints);
void dataColSetOffset(SDataCol *pCol, int nEle);
bool isNEleNull(SDataCol *pCol, int nEle);
void dataColSetNEleNull(SDataCol *pCol, int nEle, int maxPoints);
static const void *tdGetNullVal(int8_t type) {
FORCE_INLINE void dataColAppendVal(SDataCol *pCol, const void *value, int numOfRows, int maxPoints);
// value from timestamp should be TKEY here instead of TSKEY
FORCE_INLINE void dataColAppendVal(SDataCol *pCol, const void *value, int numOfRows, int maxPoints) {
ASSERT(pCol != NULL && value != NULL);
if (pCol->len == 0) {
if (isNull(value, pCol->type)) {
// all null value yet, just return
return;
}
if (numOfRows > 0) {
// Find the first not null value, fill all previous values as NULL
dataColSetNEleNull(pCol, numOfRows, maxPoints);
}
}
if (IS_VAR_DATA_TYPE(pCol->type)) {
// set offset
pCol->dataOff[numOfRows] = pCol->len;
// Copy data
memcpy(POINTER_SHIFT(pCol->pData, pCol->len), value, varDataTLen(value));
// Update the length
pCol->len += varDataTLen(value);
} else {
ASSERT(pCol->len == TYPE_BYTES[pCol->type] * numOfRows);
memcpy(POINTER_SHIFT(pCol->pData, pCol->len), value, pCol->bytes);
pCol->len += pCol->bytes;
}
}
static FORCE_INLINE const void *tdGetNullVal(int8_t type) {
switch (type) {
case TSDB_DATA_TYPE_BOOL:
return &BoolNull;
......@@ -460,6 +490,10 @@ static FORCE_INLINE void *tdGetKVRowValOfCol(SKVRow row, int16_t colId) {
return kvRowColVal(row, (SColIdx *)ret);
}
static FORCE_INLINE void *tdGetKVRowIdxOfCol(SKVRow row, int16_t colId) {
return taosbsearch(&colId, kvRowColIdx(row), kvRowNCols(row), sizeof(SColIdx), comparTagId, TD_EQ);
}
// offset here not include kvRow header length
static FORCE_INLINE int tdAppendKvColVal(SKVRow row, const void *value, int16_t colId, int8_t type, int32_t offset) {
ASSERT(value != NULL);
......@@ -582,15 +616,13 @@ static FORCE_INLINE int tdAddColToKVRow(SKVRowBuilder *pBuilder, int16_t colId,
#define memRowDeleted(r) TKEY_IS_DELETED(memRowTKey(r))
// NOTE: offset here including the header size
static FORCE_INLINE void *tdGetKvRowDataOfCol(void *row, int8_t type, int32_t offset) {
return POINTER_SHIFT(row, offset);
}
static FORCE_INLINE void *tdGetKvRowDataOfCol(void *row, int32_t offset) { return POINTER_SHIFT(row, offset); }
// NOTE: offset here including the header size
static FORCE_INLINE void *tdGetMemRowDataOfCol(void *row, int8_t type, int32_t offset) {
if (isDataRow(row)) {
return tdGetRowDataOfCol(row, type, offset);
} else if (isKvRow(row)) {
return tdGetKvRowDataOfCol(row, type, offset);
return tdGetKvRowDataOfCol(row, offset);
} else {
ASSERT(0);
}
......
......@@ -240,7 +240,7 @@ void dataColInit(SDataCol *pDataCol, STColumn *pCol, void **pBuf, int maxPoints)
*pBuf = POINTER_SHIFT(*pBuf, pDataCol->spaceSize);
}
}
#if 0
// value from timestamp should be TKEY here instead of TSKEY
void dataColAppendVal(SDataCol *pCol, const void *value, int numOfRows, int maxPoints) {
ASSERT(pCol != NULL && value != NULL);
......@@ -270,6 +270,7 @@ void dataColAppendVal(SDataCol *pCol, const void *value, int numOfRows, int maxP
pCol->len += pCol->bytes;
}
}
#endif
bool isNEleNull(SDataCol *pCol, int nEle) {
for (int i = 0; i < nEle; i++) {
......@@ -278,7 +279,7 @@ bool isNEleNull(SDataCol *pCol, int nEle) {
return true;
}
void dataColSetNullAt(SDataCol *pCol, int index) {
FORCE_INLINE void dataColSetNullAt(SDataCol *pCol, int index) {
if (IS_VAR_DATA_TYPE(pCol->type)) {
pCol->dataOff[index] = pCol->len;
char *ptr = POINTER_SHIFT(pCol->pData, pCol->len);
......@@ -475,38 +476,6 @@ static void tdAppendDataRowToDataCol(SDataRow row, STSchema *pSchema, SDataCols
pCols->numOfRows++;
}
// static void tdGetKVRowColInfo(const STSchema *pSchema, SColIdx *pColIdx, int nRowCols, STColumn *pSTColumn,
// int *nColMatched) {
// int nSchema = schemaNCols(pSchema);
// int iCol = 0;
// int iSchema = 0;
// int nColMatch = 0;
// SColIdx * pIdx = pColIdx;
// const STColumn *pColumn = NULL;
// while (iCol < nRowCols && iSchema < nSchema) {
// pColumn = &pSchema->columns[iSchema];
// if (pIdx->colId == pColumn->colId) {
// pSTColumn[nColMatch].colId = pIdx->colId;
// pSTColumn[nColMatch].type = pColumn->type;
// pSTColumn[nColMatch].bytes = pColumn->bytes;
// pSTColumn[nColMatch].offset = pIdx->offset;
// pIdx += sizeof(SColIdx);
// ++iCol;
// ++iSchema;
// ++nColMatch;
// } else if (pIdx->colId > pColumn->colId) {
// ++iSchema;
// } else {
// pIdx += sizeof(SColIdx);
// ++iCol;
// }
// }
// *nColMatched = nColMatch;
// }
static void tdAppendKvRowToDataCol(SKVRow row, STSchema *pSchema, SDataCols *pCols) {
ASSERT(pCols->numOfRows == 0 || dataColsKeyLast(pCols) < kvRowKey(row));
......@@ -523,33 +492,28 @@ static void tdAppendKvRowToDataCol(SKVRow row, STSchema *pSchema, SDataCols *pCo
}
}
} else {
int nRowCols = kvRowNCols(row);
// int nRowColsMatched = 0;
// STColumn stColumn[nRowCols];
// tdGetKVRowColInfo(pSchema, kvRowColIdx(row), nRowCols, stColumn, &nRowColsMatched);
// uInfo("prop:kvRow: nRowCols=%d, nRowColsMatched=%d, nSchemaCols=%d", nRowCols, nRowColsMatched,
// schemaNCols(pSchema));
int nRowCols = kvRowNCols(row);
while (dcol < pCols->numOfCols) {
SDataCol *pDataCol = &(pCols->cols[dcol]);
if (rcol >= nRowCols || rcol >= schemaNCols(pSchema)) {
dataColAppendVal(pDataCol, tdGetNullVal(pDataCol->type), pCols->numOfRows, pCols->maxPoints);
dcol++;
++dcol;
continue;
}
SColIdx *colIdx = kvRowColIdxAt(row, rcol);
if (colIdx->colId == pDataCol->colId) {
void *value = tdGetKvRowDataOfCol(row, pDataCol->type, colIdx->offset);
void *value = tdGetKvRowDataOfCol(row, colIdx->offset);
dataColAppendVal(pDataCol, value, pCols->numOfRows, pCols->maxPoints);
dcol++;
rcol++;
++dcol;
++rcol;
} else if (colIdx->colId < pDataCol->colId) {
rcol++;
++rcol;
} else {
dataColAppendVal(pDataCol, tdGetNullVal(pDataCol->type), pCols->numOfRows, pCols->maxPoints);
dcol++;
++dcol;
}
}
}
......
......@@ -121,7 +121,7 @@ typedef struct tstr {
#define IS_VALID_UINT(_t) ((_t) >= 0 && (_t) < UINT32_MAX)
#define IS_VALID_UBIGINT(_t) ((_t) >= 0 && (_t) < UINT64_MAX)
static FORCE_INLINE bool isNull(const char *val, int32_t type) {
FORCE_INLINE bool isNull(const char *val, int32_t type) {
switch (type) {
case TSDB_DATA_TYPE_BOOL:
return *(uint8_t *)val == TSDB_DATA_BOOL_NULL;
......
......@@ -661,7 +661,9 @@ static int tsdbRestoreLastColumns(STsdbRepo *pRepo, STable *pTable, SReadH* pRea
err = -1;
goto out;
}
tdInitDataRow(POINTER_SHIFT(row, TD_MEM_ROW_TYPE_SIZE), pSchema);
memRowSetType(row, SMEM_ROW_DATA);
tdInitDataRow(memRowBody(row), pSchema);
// first load block index info
if (tsdbLoadBlockInfo(pReadh, NULL) < 0) {
......@@ -718,10 +720,9 @@ static int tsdbRestoreLastColumns(STsdbRepo *pRepo, STable *pTable, SReadH* pRea
// OK,let's load row from backward to get not-null column
for (int32_t rowId = pBlock->numOfRows - 1; rowId >= 0; rowId--) {
SDataCol *pDataCol = pReadh->pDCols[0]->cols + i;
tdAppendColVal(POINTER_SHIFT(row, TD_MEM_ROW_TYPE_SIZE), tdGetColDataOfRow(pDataCol, rowId), pCol->type,
pCol->bytes, pCol->offset);
tdAppendColVal(memRowBody(row), tdGetColDataOfRow(pDataCol, rowId), pCol->type, pCol->bytes, pCol->offset);
//SDataCol *pDataCol = readh.pDCols[0]->cols + j;
void *value = tdGetMemRowDataOfCol(row, (int8_t)pCol->type, TD_MEM_ROW_HEAD_SIZE + pCol->offset);
void *value = tdGetRowDataOfCol(memRowBody(row), (int8_t)pCol->type, TD_DATA_ROW_HEAD_SIZE + pCol->offset);
if (isNull(value, pCol->type)) {
continue;
}
......@@ -741,8 +742,7 @@ static int tsdbRestoreLastColumns(STsdbRepo *pRepo, STable *pTable, SReadH* pRea
// save row ts(in column 0)
pDataCol = pReadh->pDCols[0]->cols + 0;
pCol = schemaColAt(pSchema, 0);
tdAppendColVal(POINTER_SHIFT(row, TD_MEM_ROW_TYPE_SIZE), tdGetColDataOfRow(pDataCol, rowId), pCol->type,
pCol->bytes, pCol->offset);
tdAppendColVal(memRowBody(row), tdGetColDataOfRow(pDataCol, rowId), pCol->type, pCol->bytes, pCol->offset);
pLastCol->ts = memRowKey(row);
pTable->restoreColumnNum += 1;
......@@ -779,18 +779,18 @@ static int tsdbRestoreLastRow(STsdbRepo *pRepo, STable *pTable, SReadH* pReadh,
// Get the data in row
STSchema *pSchema = tsdbGetTableSchema(pTable);
pTable->lastRow = taosTMalloc(dataRowMaxBytesFromSchema(pSchema));
pTable->lastRow = taosTMalloc(memRowMaxBytesFromSchema(pSchema));
if (pTable->lastRow == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
return -1;
}
tdInitDataRow(pTable->lastRow, pSchema);
memRowSetType(pTable->lastRow, SMEM_ROW_DATA);
tdInitDataRow(memRowBody(pTable->lastRow), pSchema);
for (int icol = 0; icol < schemaNCols(pSchema); icol++) {
STColumn *pCol = schemaColAt(pSchema, icol);
SDataCol *pDataCol = pReadh->pDCols[0]->cols + icol;
tdAppendColVal(pTable->lastRow, tdGetColDataOfRow(pDataCol, pBlock->numOfRows - 1), pCol->type, pCol->bytes,
pCol->offset);
tdAppendColVal(memRowBody(pTable->lastRow), tdGetColDataOfRow(pDataCol, pBlock->numOfRows - 1), pCol->type,
pCol->bytes, pCol->offset);
}
return 0;
......
......@@ -933,12 +933,12 @@ static int tsdbInsertDataToTableImpl(STsdbRepo *pRepo, STable *pTable, void **ro
tSkipListPutBatch(pTableData->pData, rows, rowCounter);
int64_t dsize = SL_SIZE(pTableData->pData) - osize;
if (pMemTable->keyFirst > dataRowKey(rows[0])) pMemTable->keyFirst = dataRowKey(rows[0]);
if (pMemTable->keyLast < dataRowKey(rows[rowCounter - 1])) pMemTable->keyLast = dataRowKey(rows[rowCounter - 1]);
if (pMemTable->keyFirst > memRowKey(rows[0])) pMemTable->keyFirst = memRowKey(rows[0]);
if (pMemTable->keyLast < memRowKey(rows[rowCounter - 1])) pMemTable->keyLast = memRowKey(rows[rowCounter - 1]);
pMemTable->numOfRows += dsize;
if (pTableData->keyFirst > dataRowKey(rows[0])) pTableData->keyFirst = dataRowKey(rows[0]);
if (pTableData->keyLast < dataRowKey(rows[rowCounter - 1])) pTableData->keyLast = dataRowKey(rows[rowCounter - 1]);
if (pTableData->keyFirst > memRowKey(rows[0])) pTableData->keyFirst = memRowKey(rows[0]);
if (pTableData->keyLast < memRowKey(rows[rowCounter - 1])) pTableData->keyLast = memRowKey(rows[rowCounter - 1]);
pTableData->numOfRows += dsize;
// update table latest info
......@@ -1004,6 +1004,8 @@ static void updateTableLatestColumn(STsdbRepo *pRepo, STable *pTable, SMemRow ro
SDataCol *pLatestCols = pTable->lastCols;
bool isDataRow = isDataRow(row);
void *rowBody = memRowBody(row);
for (int16_t j = 0; j < schemaNCols(pSchema); j++) {
STColumn *pTCol = schemaColAt(pSchema, j);
// ignore not exist colId
......@@ -1012,8 +1014,20 @@ static void updateTableLatestColumn(STsdbRepo *pRepo, STable *pTable, SMemRow ro
continue;
}
void *value = tdGetMemRowDataOfCol(row, (int8_t)pTCol->type, TD_MEM_ROW_HEAD_SIZE + pSchema->columns[j].offset);
if (isNull(value, pTCol->type)) {
void *value = NULL;
if (isDataRow) {
value = tdGetRowDataOfCol(rowBody, (int8_t)pTCol->type, TD_DATA_ROW_HEAD_SIZE + pSchema->columns[j].offset);
} else {
// SKVRow
SColIdx *pColIdx = tdGetKVRowIdxOfCol(rowBody, pTCol->colId);
if(pColIdx) {
value = tdGetKvRowDataOfCol(rowBody, pColIdx->offset);
}
}
if ((value == NULL) || isNull(value, pTCol->type)) {
continue;
}
......
......@@ -1548,56 +1548,52 @@ static void copyOneRowFromMem(STsdbQueryHandle* pQueryHandle, int32_t capacity,
}
if (pColIdx->colId == pColInfo->info.colId) {
STColumn* pSTColumn = tdGetColOfID(pSchema, pColIdx->colId);
if (pSTColumn != NULL) {
// offset of pColIdx including the TD_KV_ROW_HEAD_SIZE
void* value = tdGetKvRowDataOfCol(kvRow, pSTColumn->type, pColIdx->offset);
switch (pColInfo->info.type) {
case TSDB_DATA_TYPE_BINARY:
case TSDB_DATA_TYPE_NCHAR:
memcpy(pData, value, varDataTLen(value));
break;
case TSDB_DATA_TYPE_NULL:
case TSDB_DATA_TYPE_BOOL:
case TSDB_DATA_TYPE_TINYINT:
case TSDB_DATA_TYPE_UTINYINT:
*(uint8_t*)pData = *(uint8_t*)value;
break;
case TSDB_DATA_TYPE_SMALLINT:
case TSDB_DATA_TYPE_USMALLINT:
*(uint16_t*)pData = *(uint16_t*)value;
break;
case TSDB_DATA_TYPE_INT:
case TSDB_DATA_TYPE_UINT:
*(uint32_t*)pData = *(uint32_t*)value;
break;
case TSDB_DATA_TYPE_BIGINT:
case TSDB_DATA_TYPE_UBIGINT:
*(uint64_t*)pData = *(uint64_t*)value;
break;
case TSDB_DATA_TYPE_FLOAT:
SET_FLOAT_PTR(pData, value);
break;
case TSDB_DATA_TYPE_DOUBLE:
SET_DOUBLE_PTR(pData, value);
break;
case TSDB_DATA_TYPE_TIMESTAMP:
if (pColInfo->info.colId == PRIMARYKEY_TIMESTAMP_COL_INDEX) {
*(TSKEY*)pData = tdGetKey(*(TKEY*)value);
} else {
*(TSKEY*)pData = *(TSKEY*)value;
}
break;
default:
memcpy(pData, value, pColInfo->info.bytes);
}
++k;
++i;
continue;
// offset of pColIdx for SKVRow including the TD_KV_ROW_HEAD_SIZE
void* value = tdGetKvRowDataOfCol(kvRow, pColIdx->offset);
switch (pColInfo->info.type) {
case TSDB_DATA_TYPE_BINARY:
case TSDB_DATA_TYPE_NCHAR:
memcpy(pData, value, varDataTLen(value));
break;
case TSDB_DATA_TYPE_NULL:
case TSDB_DATA_TYPE_BOOL:
case TSDB_DATA_TYPE_TINYINT:
case TSDB_DATA_TYPE_UTINYINT:
*(uint8_t*)pData = *(uint8_t*)value;
break;
case TSDB_DATA_TYPE_SMALLINT:
case TSDB_DATA_TYPE_USMALLINT:
*(uint16_t*)pData = *(uint16_t*)value;
break;
case TSDB_DATA_TYPE_INT:
case TSDB_DATA_TYPE_UINT:
*(uint32_t*)pData = *(uint32_t*)value;
break;
case TSDB_DATA_TYPE_BIGINT:
case TSDB_DATA_TYPE_UBIGINT:
*(uint64_t*)pData = *(uint64_t*)value;
break;
case TSDB_DATA_TYPE_FLOAT:
SET_FLOAT_PTR(pData, value);
break;
case TSDB_DATA_TYPE_DOUBLE:
SET_DOUBLE_PTR(pData, value);
break;
case TSDB_DATA_TYPE_TIMESTAMP:
if (pColInfo->info.colId == PRIMARYKEY_TIMESTAMP_COL_INDEX) {
*(TSKEY*)pData = tdGetKey(*(TKEY*)value);
} else {
*(TSKEY*)pData = *(TSKEY*)value;
}
break;
default:
memcpy(pData, value, pColInfo->info.bytes);
}
++k; // pSTColumn is NULL
++k;
++i;
continue;
}
// If (pColInfo->info.colId < pColIdx->colId) or pSTColumn is NULL, it is a NULL data
// If (pColInfo->info.colId < pColIdx->colId), it is NULL data
if (pColInfo->info.type == TSDB_DATA_TYPE_BINARY || pColInfo->info.type == TSDB_DATA_TYPE_NCHAR) {
setVardataNull(pData, pColInfo->info.type);
} else {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册