提交 0c7f46e4 编写于 作者: C Cary Xu

utilize SMemRow

上级 280f8062
...@@ -339,6 +339,58 @@ char* strdup_throw(const char* str); ...@@ -339,6 +339,58 @@ char* strdup_throw(const char* str);
bool vgroupInfoIdentical(SNewVgroupInfo *pExisted, SVgroupMsg* src); bool vgroupInfoIdentical(SNewVgroupInfo *pExisted, SVgroupMsg* src);
SNewVgroupInfo createNewVgroupInfo(SVgroupMsg *pVgroupMsg); SNewVgroupInfo createNewVgroupInfo(SVgroupMsg *pVgroupMsg);
typedef struct {
// for SDataRow
STSchema* pTSchema;
SSchema* pSchema;
int16_t sversion;
int32_t flen;
// for SKVRow
int16_t tCols;
int16_t nCols;
SColIdx* pColIdx;
uint16_t alloc;
uint16_t size;
void* buf;
void* pDataBlock;
SSubmitBlk* pSubmitBlk;
} SMemRowBuilder;
int tdInitMemRowBuilder(SMemRowBuilder* pBuilder);
void tdDestroyMemRowBuilder(SMemRowBuilder* pBuilder);
void tdResetMemRowBuilder(SMemRowBuilder* pBuilder);
SMemRow tdGetMemRowFromBuilder(SMemRowBuilder* pBuilder);
static FORCE_INLINE int tdAddColToMemRow(SMemRowBuilder* pBuilder, int16_t colId, int8_t type, void* value) {
// TODO
if (pBuilder->nCols >= pBuilder->tCols) {
pBuilder->tCols *= 2;
pBuilder->pColIdx = (SColIdx*)realloc((void*)(pBuilder->pColIdx), sizeof(SColIdx) * pBuilder->tCols);
if (pBuilder->pColIdx == NULL) return -1;
}
pBuilder->pColIdx[pBuilder->nCols].colId = colId;
pBuilder->pColIdx[pBuilder->nCols].offset = pBuilder->size;
pBuilder->nCols++;
int tlen = IS_VAR_DATA_TYPE(type) ? varDataTLen(value) : TYPE_BYTES[type];
if (tlen > pBuilder->alloc - pBuilder->size) {
while (tlen > pBuilder->alloc - pBuilder->size) {
pBuilder->alloc *= 2;
}
pBuilder->buf = realloc(pBuilder->buf, pBuilder->alloc);
if (pBuilder->buf == NULL) return -1;
}
memcpy(POINTER_SHIFT(pBuilder->buf, pBuilder->size), value, tlen);
pBuilder->size += tlen;
return 0;
}
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -14,7 +14,7 @@ ...@@ -14,7 +14,7 @@
*/ */
#include "tscUtil.h" #include "tscUtil.h"
#include "hash.h" #include "hash.h"
#include "os.h" #include "os.h"
#include "taosmsg.h" #include "taosmsg.h"
#include "texpr.h" #include "texpr.h"
...@@ -1636,6 +1636,136 @@ int32_t tscGetDataBlockFromList(SHashObj* pHashList, int64_t id, int32_t size, i ...@@ -1636,6 +1636,136 @@ int32_t tscGetDataBlockFromList(SHashObj* pHashList, int64_t id, int32_t size, i
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int tdInitMemRowBuilder(SMemRowBuilder* pBuilder) {
pBuilder->pSchema = NULL;
pBuilder->sversion = 0;
pBuilder->tCols = 128;
pBuilder->nCols = 0;
pBuilder->pColIdx = (SColIdx*)malloc(sizeof(SColIdx) * pBuilder->tCols);
if (pBuilder->pColIdx == NULL) return -1;
pBuilder->alloc = 1024;
pBuilder->size = 0;
pBuilder->buf = malloc(pBuilder->alloc);
if (pBuilder->buf == NULL) {
free(pBuilder->pColIdx);
return -1;
}
return 0;
}
void tdDestroyMemRowBuilder(SMemRowBuilder* pBuilder) {
tfree(pBuilder->pColIdx);
tfree(pBuilder->buf);
}
void tdResetMemRowBuilder(SMemRowBuilder* pBuilder) {
pBuilder->nCols = 0;
pBuilder->size = 0;
}
#define KvRowNullColRatio 0.75 // If nullable column ratio larger than 0.75, utilize SKVRow, otherwise SDataRow.
#define KvRowNColsThresh 4096 // default value: 32
static FORCE_INLINE uint8_t tdRowTypeJudger(SSchema* pSchema, void* pData, int32_t nCols, int32_t flen,
uint16_t* nColsNotNull) {
ASSERT(pData != NULL);
if (nCols < KvRowNColsThresh) {
return SMEM_ROW_DATA;
}
int32_t dataRowLen = flen;
int32_t kvRowLen = 0;
uint16_t nColsNull = 0;
char* p = (char*)pData;
for (int i = 0; i < nCols; ++i) {
if (IS_VAR_DATA_TYPE(pSchema[i].type)) {
dataRowLen += varDataTLen(p);
if (!isNull(p, pSchema[i].type)) {
kvRowLen += sizeof(SColIdx) + varDataTLen(p);
} else {
++nColsNull;
}
} else {
if (!isNull(p, pSchema[i].type)) {
kvRowLen += sizeof(SColIdx) + varDataTLen(p);
} else {
++nColsNull;
}
}
// next column
p += pSchema[i].bytes;
}
tscDebug("prop:nColsNull %d, nCols: %d, kvRowLen: %d, dataRowLen: %d", nColsNull, nCols, kvRowLen, dataRowLen);
if (kvRowLen < dataRowLen) {
if (nColsNotNull) {
*nColsNotNull = nCols - nColsNull;
}
return SMEM_ROW_KV;
}
return SMEM_ROW_DATA;
}
SMemRow tdGetMemRowFromBuilder(SMemRowBuilder* pBuilder) {
SSchema* pSchema = pBuilder->pSchema;
char* p = (char*)pBuilder->buf;
uint16_t nColsNotNull = 0;
uint8_t memRowType = tdRowTypeJudger(pSchema, p, pBuilder->nCols, pBuilder->flen, &nColsNotNull);
tscDebug("prop:memType is %d", memRowType);
memRowType = SMEM_ROW_DATA;
SMemRow* memRow = (SMemRow)pBuilder->pDataBlock;
memRowSetType(memRow, memRowType);
if (memRowType == SMEM_ROW_DATA) {
int toffset = 0;
SDataRow trow = (SDataRow)memRowBody(memRow);
dataRowSetLen(trow, (uint16_t)(TD_DATA_ROW_HEAD_SIZE + pBuilder->flen));
dataRowSetVersion(trow, pBuilder->sversion);
p = (char*)pBuilder->buf;
for (int32_t j = 0; j < pBuilder->nCols; ++j) {
tdAppendColVal(trow, p, pSchema[j].type, pSchema[j].bytes, toffset);
toffset += TYPE_BYTES[pSchema[j].type];
p += pSchema[j].bytes;
}
pBuilder->buf = p;
} else {
uint16_t tlen = TD_KV_ROW_HEAD_SIZE + sizeof(SColIdx) * nColsNotNull + pBuilder->size;
SKVRow row = (SKVRow)pBuilder->pDataBlock;
kvRowSetNCols(row, nColsNotNull);
kvRowSetLen(row, tlen);
memcpy(kvRowColIdx(row), pBuilder->pColIdx, sizeof(SColIdx) * pBuilder->nCols);
memcpy(kvRowValues(row), pBuilder->buf, pBuilder->size);
}
pBuilder->pDataBlock = (char*)pBuilder->pDataBlock + memRowTLen(memRow); // next row
pBuilder->pSubmitBlk->dataLen += memRowTLen(memRow);
// int tlen = sizeof(SColIdx) * pBuilder->nCols + pBuilder->size;
// if (tlen == 0) return NULL;
// tlen += TD_KV_ROW_HEAD_SIZE;
// SKVRow row = malloc(tlen);
// if (row == NULL) return NULL;
// kvRowSetNCols(row, pBuilder->nCols);
// kvRowSetLen(row, tlen);
// memcpy(kvRowColIdx(row), pBuilder->pColIdx, sizeof(SColIdx) * pBuilder->nCols);
// memcpy(kvRowValues(row), pBuilder->buf, pBuilder->size);
return NULL;
}
// Erase the empty space reserved for binary data
static int trimDataBlock(void* pDataBlock, STableDataBlocks* pTableDataBlock, bool includeSchema) { static int trimDataBlock(void* pDataBlock, STableDataBlocks* pTableDataBlock, bool includeSchema) {
// TODO: optimize this function, handle the case while binary is not presented // TODO: optimize this function, handle the case while binary is not presented
STableMeta* pTableMeta = pTableDataBlock->pTableMeta; STableMeta* pTableMeta = pTableDataBlock->pTableMeta;
...@@ -1675,12 +1805,24 @@ static int trimDataBlock(void* pDataBlock, STableDataBlocks* pTableDataBlock, bo ...@@ -1675,12 +1805,24 @@ static int trimDataBlock(void* pDataBlock, STableDataBlocks* pTableDataBlock, bo
char* p = pTableDataBlock->pData + sizeof(SSubmitBlk); char* p = pTableDataBlock->pData + sizeof(SSubmitBlk);
pBlock->dataLen = 0; pBlock->dataLen = 0;
int32_t numOfRows = htons(pBlock->numOfRows); int32_t numOfRows = htons(pBlock->numOfRows);
SMemRowBuilder mRowBuilder;
mRowBuilder.pSchema = pSchema;
mRowBuilder.sversion = pTableMeta->sversion;
mRowBuilder.flen = flen;
mRowBuilder.nCols = tinfo.numOfColumns;
mRowBuilder.pDataBlock = pDataBlock;
mRowBuilder.pSubmitBlk = pBlock;
mRowBuilder.buf = p;
mRowBuilder.size = 0;
for (int32_t i = 0; i < numOfRows; ++i) { for (int32_t i = 0; i < numOfRows; ++i) {
SDataRow trow = (SDataRow) pDataBlock; #if 0
SDataRow trow = (SDataRow)pDataBlock; // generate each SDataRow one by one
dataRowSetLen(trow, (uint16_t)(TD_DATA_ROW_HEAD_SIZE + flen)); dataRowSetLen(trow, (uint16_t)(TD_DATA_ROW_HEAD_SIZE + flen));
dataRowSetVersion(trow, pTableMeta->sversion); dataRowSetVersion(trow, pTableMeta->sversion);
// scan each column data and generate the data row
int toffset = 0; int toffset = 0;
for (int32_t j = 0; j < tinfo.numOfColumns; j++) { for (int32_t j = 0; j < tinfo.numOfColumns; j++) {
tdAppendColVal(trow, p, pSchema[j].type, pSchema[j].bytes, toffset); tdAppendColVal(trow, p, pSchema[j].type, pSchema[j].bytes, toffset);
...@@ -1688,8 +1830,10 @@ static int trimDataBlock(void* pDataBlock, STableDataBlocks* pTableDataBlock, bo ...@@ -1688,8 +1830,10 @@ static int trimDataBlock(void* pDataBlock, STableDataBlocks* pTableDataBlock, bo
p += pSchema[j].bytes; p += pSchema[j].bytes;
} }
pDataBlock = (char*)pDataBlock + dataRowLen(trow); pDataBlock = (char*)pDataBlock + dataRowLen(trow); // next SDataRow
pBlock->dataLen += dataRowLen(trow); pBlock->dataLen += dataRowLen(trow); // SSubmitBlk data length
#endif
tdGetMemRowFromBuilder(&mRowBuilder);
} }
int32_t len = pBlock->dataLen + pBlock->schemaLen; int32_t len = pBlock->dataLen + pBlock->schemaLen;
...@@ -1701,13 +1845,14 @@ static int trimDataBlock(void* pDataBlock, STableDataBlocks* pTableDataBlock, bo ...@@ -1701,13 +1845,14 @@ static int trimDataBlock(void* pDataBlock, STableDataBlocks* pTableDataBlock, bo
static int32_t getRowExpandSize(STableMeta* pTableMeta) { static int32_t getRowExpandSize(STableMeta* pTableMeta) {
int32_t result = TD_DATA_ROW_HEAD_SIZE; int32_t result = TD_DATA_ROW_HEAD_SIZE;
int32_t columns = tscGetNumOfColumns(pTableMeta); int32_t columns = tscGetNumOfColumns(pTableMeta);
SSchema* pSchema = tscGetTableSchema(pTableMeta); SSchema* pSchema = tscGetTableSchema(pTableMeta);
for(int32_t i = 0; i < columns; i++) { for(int32_t i = 0; i < columns; i++) {
if (IS_VAR_DATA_TYPE((pSchema + i)->type)) { if (IS_VAR_DATA_TYPE((pSchema + i)->type)) {
result += TYPE_BYTES[TSDB_DATA_TYPE_BINARY]; result += TYPE_BYTES[TSDB_DATA_TYPE_BINARY];
} }
} }
result += TD_MEM_ROW_TYPE_SIZE; // add len of SMemRow flag
return result; return result;
} }
......
...@@ -24,6 +24,31 @@ ...@@ -24,6 +24,31 @@
extern "C" { extern "C" {
#endif #endif
typedef struct {
VarDataLenT len;
uint8_t data;
} SBinaryNullT;
typedef struct {
VarDataLenT len;
uint32_t data;
} SNCharNullT;
extern const uint8_t BoolNull;
extern const uint8_t TinyintNull;
extern const uint16_t SmallintNull;
extern const uint32_t IntNull;
extern const uint64_t BigintNull;
extern const uint64_t TimestampNull;
extern const uint8_t UTinyintNull;
extern const uint16_t USmallintNull;
extern const uint32_t UIntNull;
extern const uint64_t UBigintNull;
extern const uint32_t FloatNull;
extern const uint64_t DoubleNull;
extern const SBinaryNullT BinaryNull;
extern const SNCharNullT NcharNull;
#define STR_TO_VARSTR(x, str) \ #define STR_TO_VARSTR(x, str) \
do { \ do { \
VarDataLenT __len = (VarDataLenT)strlen(str); \ VarDataLenT __len = (VarDataLenT)strlen(str); \
...@@ -45,10 +70,10 @@ extern "C" { ...@@ -45,10 +70,10 @@ extern "C" {
// ----------------- TSDB COLUMN DEFINITION // ----------------- TSDB COLUMN DEFINITION
typedef struct { typedef struct {
int8_t type; // Column type int8_t type; // Column type
int16_t colId; // column ID int16_t colId; // column ID
int16_t bytes; // column bytes uint16_t bytes; // column bytes
int16_t offset; // point offset in SDataRow after the header part uint16_t offset; // point offset in SDataRow/SKVRow after the header part.
} STColumn; } STColumn;
#define colType(col) ((col)->type) #define colType(col) ((col)->type)
...@@ -159,9 +184,9 @@ static FORCE_INLINE int tkeyComparFn(const void *tkey1, const void *tkey2) { ...@@ -159,9 +184,9 @@ static FORCE_INLINE int tkeyComparFn(const void *tkey1, const void *tkey2) {
return 0; return 0;
} }
} }
// ----------------- Data row structure // ----------------- Sequential Data row structure
/* A data row, the format is like below: /* A sequential data row, the format is like below:
* |<--------------------+--------------------------- len ---------------------------------->| * |<--------------------+--------------------------- len ---------------------------------->|
* |<-- Head -->|<--------- flen -------------->| | * |<-- Head -->|<--------- flen -------------->| |
* +---------------------+---------------------------------+---------------------------------+ * +---------------------+---------------------------------+---------------------------------+
...@@ -173,6 +198,18 @@ static FORCE_INLINE int tkeyComparFn(const void *tkey1, const void *tkey2) { ...@@ -173,6 +198,18 @@ static FORCE_INLINE int tkeyComparFn(const void *tkey1, const void *tkey2) {
* NOTE: timestamp in this row structure is TKEY instead of TSKEY * NOTE: timestamp in this row structure is TKEY instead of TSKEY
*/ */
typedef void *SDataRow; typedef void *SDataRow;
/* A memory data row, the format is like below:
*|---------+---------------------+--------------------------- len ---------------------------------->|
*|<- type->|<-- Head -->|<--------- flen -------------->| |
*|---------+---------------------+---------------------------------+---------------------------------+
*| uint8_t | uint16_t | int16_t | | |
*|---------+----------+----------+---------------------------------+---------------------------------+
*| flag | len | sversion | First part | Second part |
*|---------+----------+----------+---------------------------------+---------------------------------+
*
* NOTE: timestamp in this row structure is TKEY instead of TSKEY
*/
typedef void *SMemRow;
#define TD_DATA_ROW_HEAD_SIZE (sizeof(uint16_t) + sizeof(int16_t)) #define TD_DATA_ROW_HEAD_SIZE (sizeof(uint16_t) + sizeof(int16_t))
...@@ -187,13 +224,14 @@ typedef void *SDataRow; ...@@ -187,13 +224,14 @@ typedef void *SDataRow;
#define dataRowMaxBytesFromSchema(s) (schemaTLen(s) + TD_DATA_ROW_HEAD_SIZE) #define dataRowMaxBytesFromSchema(s) (schemaTLen(s) + TD_DATA_ROW_HEAD_SIZE)
#define dataRowDeleted(r) TKEY_IS_DELETED(dataRowTKey(r)) #define dataRowDeleted(r) TKEY_IS_DELETED(dataRowTKey(r))
SDataRow tdNewDataRowFromSchema(STSchema *pSchema); // SDataRow tdNewDataRowFromSchema(STSchema *pSchema);
void tdFreeDataRow(SDataRow row); // void tdFreeDataRow(SDataRow row);
void tdInitDataRow(SDataRow row, STSchema *pSchema); void tdInitDataRow(SDataRow row, STSchema *pSchema);
SDataRow tdDataRowDup(SDataRow row); // SDataRow tdDataRowDup(SDataRow row);
SMemRow tdMemRowDup(SMemRow row);
// offset here not include dataRow header length // offset here not include dataRow header length
static FORCE_INLINE int tdAppendColVal(SDataRow row, void *value, int8_t type, int32_t bytes, int32_t offset) { static FORCE_INLINE int tdAppendColVal(SDataRow row, const void *value, int8_t type, int32_t bytes, int32_t offset) {
ASSERT(value != NULL); ASSERT(value != NULL);
int32_t toffset = offset + TD_DATA_ROW_HEAD_SIZE; int32_t toffset = offset + TD_DATA_ROW_HEAD_SIZE;
char * ptr = (char *)POINTER_SHIFT(row, dataRowLen(row)); char * ptr = (char *)POINTER_SHIFT(row, dataRowLen(row));
...@@ -215,15 +253,6 @@ static FORCE_INLINE int tdAppendColVal(SDataRow row, void *value, int8_t type, i ...@@ -215,15 +253,6 @@ static FORCE_INLINE int tdAppendColVal(SDataRow row, void *value, int8_t type, i
return 0; return 0;
} }
// NOTE: offset here including the header size
static FORCE_INLINE void *tdGetRowDataOfCol(SDataRow row, int8_t type, int32_t offset) {
if (IS_VAR_DATA_TYPE(type)) {
return POINTER_SHIFT(row, *(VarDataOffsetT *)POINTER_SHIFT(row, offset));
} else {
return POINTER_SHIFT(row, offset);
}
}
// ----------------- Data column structure // ----------------- Data column structure
typedef struct SDataCol { typedef struct SDataCol {
int8_t type; // column type int8_t type; // column type
...@@ -237,17 +266,56 @@ typedef struct SDataCol { ...@@ -237,17 +266,56 @@ typedef struct SDataCol {
TSKEY ts; // only used in last NULL column TSKEY ts; // only used in last NULL column
} SDataCol; } SDataCol;
#define isAllRowOfColNull(pCol) ((pCol)->len == 0)
static FORCE_INLINE void dataColReset(SDataCol *pDataCol) { pDataCol->len = 0; } static FORCE_INLINE void dataColReset(SDataCol *pDataCol) { pDataCol->len = 0; }
void dataColInit(SDataCol *pDataCol, STColumn *pCol, void **pBuf, int maxPoints); void dataColInit(SDataCol *pDataCol, STColumn *pCol, void **pBuf, int maxPoints);
void dataColAppendVal(SDataCol *pCol, void *value, int numOfRows, int maxPoints); void dataColAppendVal(SDataCol *pCol, const void *value, int numOfRows, int maxPoints);
void dataColSetOffset(SDataCol *pCol, int nEle); void dataColSetOffset(SDataCol *pCol, int nEle);
bool isNEleNull(SDataCol *pCol, int nEle); bool isNEleNull(SDataCol *pCol, int nEle);
void dataColSetNEleNull(SDataCol *pCol, int nEle, int maxPoints); void dataColSetNEleNull(SDataCol *pCol, int nEle, int maxPoints);
static const void *tdGetNullVal(int8_t type) {
switch (type) {
case TSDB_DATA_TYPE_BOOL:
return &BoolNull;
case TSDB_DATA_TYPE_TINYINT:
return &TinyintNull;
case TSDB_DATA_TYPE_SMALLINT:
return &SmallintNull;
case TSDB_DATA_TYPE_INT:
return &IntNull;
case TSDB_DATA_TYPE_BIGINT:
return &BigintNull;
case TSDB_DATA_TYPE_FLOAT:
return &FloatNull;
case TSDB_DATA_TYPE_DOUBLE:
return &DoubleNull;
case TSDB_DATA_TYPE_BINARY:
return &BinaryNull;
case TSDB_DATA_TYPE_TIMESTAMP:
return &TimestampNull;
case TSDB_DATA_TYPE_NCHAR:
return &NcharNull;
case TSDB_DATA_TYPE_UTINYINT:
return &UTinyintNull;
case TSDB_DATA_TYPE_USMALLINT:
return &USmallintNull;
case TSDB_DATA_TYPE_UINT:
return &UIntNull;
case TSDB_DATA_TYPE_UBIGINT:
return &UBigintNull;
default:
ASSERT(0);
return NULL;
}
}
// Get the data pointer from a column-wised data // Get the data pointer from a column-wised data
static FORCE_INLINE void *tdGetColDataOfRow(SDataCol *pCol, int row) { static FORCE_INLINE const void *tdGetColDataOfRow(SDataCol *pCol, int row) {
if (isAllRowOfColNull(pCol)) {
return tdGetNullVal(pCol->type);
}
if (IS_VAR_DATA_TYPE(pCol->type)) { if (IS_VAR_DATA_TYPE(pCol->type)) {
return POINTER_SHIFT(pCol->pData, pCol->dataOff[row]); return POINTER_SHIFT(pCol->pData, pCol->dataOff[row]);
} else { } else {
...@@ -279,7 +347,7 @@ typedef struct { ...@@ -279,7 +347,7 @@ typedef struct {
} SDataCols; } SDataCols;
#define keyCol(pCols) (&((pCols)->cols[0])) // Key column #define keyCol(pCols) (&((pCols)->cols[0])) // Key column
#define dataColsTKeyAt(pCols, idx) ((TKEY *)(keyCol(pCols)->pData))[(idx)] #define dataColsTKeyAt(pCols, idx) ((TKEY *)(keyCol(pCols)->pData))[(idx)] // the idx row of column-wised data
#define dataColsKeyAt(pCols, idx) tdGetKey(dataColsTKeyAt(pCols, idx)) #define dataColsKeyAt(pCols, idx) tdGetKey(dataColsTKeyAt(pCols, idx))
static FORCE_INLINE TKEY dataColsTKeyFirst(SDataCols *pCols) { static FORCE_INLINE TKEY dataColsTKeyFirst(SDataCols *pCols) {
if (pCols->numOfRows) { if (pCols->numOfRows) {
...@@ -318,13 +386,13 @@ void tdResetDataCols(SDataCols *pCols); ...@@ -318,13 +386,13 @@ void tdResetDataCols(SDataCols *pCols);
int tdInitDataCols(SDataCols *pCols, STSchema *pSchema); int tdInitDataCols(SDataCols *pCols, STSchema *pSchema);
SDataCols *tdDupDataCols(SDataCols *pCols, bool keepData); SDataCols *tdDupDataCols(SDataCols *pCols, bool keepData);
SDataCols *tdFreeDataCols(SDataCols *pCols); SDataCols *tdFreeDataCols(SDataCols *pCols);
void tdAppendDataRowToDataCol(SDataRow row, STSchema *pSchema, SDataCols *pCols); void tdAppendMemRowToDataCol(SMemRow row, STSchema *pSchema, SDataCols *pCols);
int tdMergeDataCols(SDataCols *target, SDataCols *source, int rowsToMerge, int *pOffset); int tdMergeDataCols(SDataCols *target, SDataCols *source, int rowsToMerge, int *pOffset);
// ----------------- K-V data row structure // ----------------- K-V data row structure
/* /*
* +----------+----------+---------------------------------+---------------------------------+ * +----------+----------+---------------------------------+---------------------------------+
* | int16_t | int16_t | | | * | uint16_t | int16_t | | |
* +----------+----------+---------------------------------+---------------------------------+ * +----------+----------+---------------------------------+---------------------------------+
* | len | ncols | cols index | data part | * | len | ncols | cols index | data part |
* +----------+----------+---------------------------------+---------------------------------+ * +----------+----------+---------------------------------+---------------------------------+
...@@ -332,13 +400,13 @@ int tdMergeDataCols(SDataCols *target, SDataCols *source, int rowsToMerge ...@@ -332,13 +400,13 @@ int tdMergeDataCols(SDataCols *target, SDataCols *source, int rowsToMerge
typedef void *SKVRow; typedef void *SKVRow;
typedef struct { typedef struct {
int16_t colId; int16_t colId;
int16_t offset; uint16_t offset;
} SColIdx; } SColIdx;
#define TD_KV_ROW_HEAD_SIZE (2 * sizeof(int16_t)) #define TD_KV_ROW_HEAD_SIZE (2 * sizeof(int16_t))
#define kvRowLen(r) (*(int16_t *)(r)) #define kvRowLen(r) (*(uint16_t *)(r))
#define kvRowNCols(r) (*(int16_t *)POINTER_SHIFT(r, sizeof(int16_t))) #define kvRowNCols(r) (*(int16_t *)POINTER_SHIFT(r, sizeof(int16_t)))
#define kvRowSetLen(r, len) kvRowLen(r) = (len) #define kvRowSetLen(r, len) kvRowLen(r) = (len)
#define kvRowSetNCols(r, n) kvRowNCols(r) = (n) #define kvRowSetNCols(r, n) kvRowNCols(r) = (n)
...@@ -349,6 +417,11 @@ typedef struct { ...@@ -349,6 +417,11 @@ typedef struct {
#define kvRowColIdxAt(r, i) (kvRowColIdx(r) + (i)) #define kvRowColIdxAt(r, i) (kvRowColIdx(r) + (i))
#define kvRowFree(r) tfree(r) #define kvRowFree(r) tfree(r)
#define kvRowEnd(r) POINTER_SHIFT(r, kvRowLen(r)) #define kvRowEnd(r) POINTER_SHIFT(r, kvRowLen(r))
#define kvRowVersion(r) (-1)
#define kvRowTKey(r) (*(TKEY *)(kvRowValues(r)))
#define kvRowKey(r) tdGetKey(kvRowTKey(r))
#define kvRowDeleted(r) TKEY_IS_DELETED(kvRowTKey(r))
SKVRow tdKVRowDup(SKVRow row); SKVRow tdKVRowDup(SKVRow row);
int tdSetKVRowDataOfCol(SKVRow *orow, int16_t colId, int8_t type, void *value); int tdSetKVRowDataOfCol(SKVRow *orow, int16_t colId, int8_t type, void *value);
...@@ -377,8 +450,8 @@ typedef struct { ...@@ -377,8 +450,8 @@ typedef struct {
int16_t tCols; int16_t tCols;
int16_t nCols; int16_t nCols;
SColIdx *pColIdx; SColIdx *pColIdx;
int16_t alloc; uint16_t alloc;
int16_t size; uint16_t size;
void * buf; void * buf;
} SKVRowBuilder; } SKVRowBuilder;
...@@ -414,6 +487,94 @@ static FORCE_INLINE int tdAddColToKVRow(SKVRowBuilder *pBuilder, int16_t colId, ...@@ -414,6 +487,94 @@ static FORCE_INLINE int tdAddColToKVRow(SKVRowBuilder *pBuilder, int16_t colId,
return 0; return 0;
} }
// ----------------- Data row structure
// ----------------- Sequential Data row structure
/* A sequential data row, the format is like below:
* |<--------------------+--------------------------- len ---------------------------------->|
* |<-- Head -->|<--------- flen -------------->| |
* +---------------------+---------------------------------+---------------------------------+
* | uint16_t | int16_t | | |
* +----------+----------+---------------------------------+---------------------------------+
* | len | sversion | First part | Second part |
* +----------+----------+---------------------------------+---------------------------------+
*
* NOTE: timestamp in this row structure is TKEY instead of TSKEY
*/
// ----------------- K-V data row structure
/*
* +----------+----------+---------------------------------+---------------------------------+
* | int16_t | int16_t | | |
* +----------+----------+---------------------------------+---------------------------------+
* | len | ncols | cols index | data part |
* +----------+----------+---------------------------------+---------------------------------+
*/
#define TD_MEM_ROW_TYPE_SIZE sizeof(uint8_t)
#define TD_MEM_ROW_HEAD_SIZE (TD_MEM_ROW_TYPE_SIZE + sizeof(uint16_t) + sizeof(int16_t))
#define SMEM_ROW_DATA 0U // SDataRow
#define SMEM_ROW_KV 1U // SKVRow
#define TD_DO_NOTHING \
do { \
} while (0)
#define memRowType(r) (*(uint8_t *)(r))
#define memRowBody(r) POINTER_SHIFT(r, TD_MEM_ROW_TYPE_SIZE)
#define memRowLen(r) (*(uint16_t *)POINTER_SHIFT(r, TD_MEM_ROW_TYPE_SIZE))
#define memRowTLen(r) (*(uint16_t *)POINTER_SHIFT(r, TD_MEM_ROW_TYPE_SIZE) + (uint16_t)TD_MEM_ROW_TYPE_SIZE)
#define isDataRow(r) (SMEM_ROW_DATA == memRowType(r))
#define isKvRow(r) (SMEM_ROW_KV == memRowType(r))
#define memRowVersion(r) (isDataRow(r) ? dataRowVersion(memRowBody(r)) : kvRowVersion(r)) // schema version
#define memRowTuple(r) (isDataRow(r) ? dataRowTuple(memRowBody(r)) : kvRowValues(memRowBody(r)))
#define memRowTKey(r) (isDataRow(r) ? dataRowTKey(memRowBody(r)) : kvRowTKey(memRowBody(r)))
#define memRowKey(r) (isDataRow(r) ? dataRowKey(memRowBody(r)) : kvRowKey(memRowBody(r)))
#define memRowSetType(r, t) (memRowType(r) = (t))
#define memRowSetLen(r, l) (memRowLen(r) = (l))
#define memRowSetVersion(r, v) (isDataRow(r) ? dataRowSetVersion(r, v) : TD_DO_NOTHING)
#define memRowCpy(dst, r) memcpy((dst), (r), memRowTLen(r))
#define memRowMaxBytesFromSchema(s) (schemaTLen(s) + TD_MEM_ROW_HEAD_SIZE)
#define memRowDeleted(r) TKEY_IS_DELETED(memRowTKey(r))
#define memRowNCols(r) (*(int16_t *)POINTER_SHIFT(r, TD_MEM_ROW_TYPE_SIZE + sizeof(int16_t))) // for SKVRow
#define memRowSetNCols(r, n) memRowNCols(r) = (n) // for SKVRow
#define memRowColIdx(r) (SColIdx *)POINTER_SHIFT(r, TD_MEM_ROW_HEAD_SIZE) // for SKVRow
#define memRowValues(r) POINTER_SHIFT(r, TD_MEM_ROW_HEAD_SIZE + sizeof(SColIdx) * memRowNCols(r)) // for SKVRow
// NOTE: offset here including the header size
static FORCE_INLINE void *tdGetRowDataOfCol(void *row, int8_t type, int32_t offset) {
if (IS_VAR_DATA_TYPE(type)) {
return POINTER_SHIFT(row, *(VarDataOffsetT *)POINTER_SHIFT(row, offset));
} else {
return POINTER_SHIFT(row, offset);
}
return NULL;
}
static FORCE_INLINE void *tdGetKvRowDataOfCol(void *row, int8_t type, int32_t offset) {
return POINTER_SHIFT(row, offset);
}
static FORCE_INLINE void *tdGetMemRowDataOfCol(void *row, int8_t type, int32_t offset) {
if (isDataRow(row)) {
return tdGetRowDataOfCol(row, type, offset);
} else if (isKvRow(row)) {
return tdGetKvRowDataOfCol(row, type, offset);
} else {
ASSERT(0);
}
return NULL;
}
// #define kvRowCpy(dst, r) memcpy((dst), (r), kvRowLen(r))
// #define kvRowColVal(r, colIdx) POINTER_SHIFT(kvRowValues(r), (colIdx)->offset)
// #define kvRowColIdxAt(r, i) (kvRowColIdx(r) + (i))
// #define kvRowFree(r) tfree(r)
// #define kvRowEnd(r) POINTER_SHIFT(r, kvRowLen(r))
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -18,6 +18,21 @@ ...@@ -18,6 +18,21 @@
#include "tcoding.h" #include "tcoding.h"
#include "wchar.h" #include "wchar.h"
const uint8_t BoolNull = TSDB_DATA_BOOL_NULL;
const uint8_t TinyintNull = TSDB_DATA_TINYINT_NULL;
const uint16_t SmallintNull = TSDB_DATA_SMALLINT_NULL;
const uint32_t IntNull = TSDB_DATA_INT_NULL;
const uint64_t BigintNull = TSDB_DATA_BIGINT_NULL;
const uint64_t TimestampNull = TSDB_DATA_BIGINT_NULL;
const uint8_t UTinyintNull = TSDB_DATA_UTINYINT_NULL;
const uint16_t USmallintNull = TSDB_DATA_USMALLINT_NULL;
const uint32_t UIntNull = TSDB_DATA_UINT_NULL;
const uint64_t UBigintNull = TSDB_DATA_UBIGINT_NULL;
const uint32_t FloatNull = TSDB_DATA_FLOAT_NULL;
const uint64_t DoubleNull = TSDB_DATA_DOUBLE_NULL;
const SBinaryNullT BinaryNull = {1, TSDB_DATA_BINARY_NULL};
const SNCharNullT NcharNull = {4, TSDB_DATA_NCHAR_NULL};
static void tdMergeTwoDataCols(SDataCols *target, SDataCols *src1, int *iter1, int limit1, SDataCols *src2, int *iter2, static void tdMergeTwoDataCols(SDataCols *target, SDataCols *src1, int *iter1, int limit1, SDataCols *src2, int *iter2,
int limit2, int tRows); int limit2, int tRows);
...@@ -173,36 +188,43 @@ void tdInitDataRow(SDataRow row, STSchema *pSchema) { ...@@ -173,36 +188,43 @@ void tdInitDataRow(SDataRow row, STSchema *pSchema) {
dataRowSetVersion(row, schemaVersion(pSchema)); dataRowSetVersion(row, schemaVersion(pSchema));
} }
SDataRow tdNewDataRowFromSchema(STSchema *pSchema) { // SDataRow tdNewDataRowFromSchema(STSchema *pSchema) {
int32_t size = dataRowMaxBytesFromSchema(pSchema); // int32_t size = dataRowMaxBytesFromSchema(pSchema);
SDataRow row = malloc(size); // SDataRow row = malloc(size);
if (row == NULL) return NULL; // if (row == NULL) return NULL;
tdInitDataRow(row, pSchema); // tdInitDataRow(row, pSchema);
return row; // return row;
} // }
/** /**
* Free the SDataRow object * Free the SDataRow object
*/ */
void tdFreeDataRow(SDataRow row) { // void tdFreeDataRow(SDataRow row) {
if (row) free(row); // if (row) free(row);
} // }
// SDataRow tdDataRowDup(SDataRow row) {
// SDataRow trow = malloc(dataRowLen(row));
// if (trow == NULL) return NULL;
SDataRow tdDataRowDup(SDataRow row) { // dataRowCpy(trow, row);
SDataRow trow = malloc(dataRowLen(row)); // return trow;
// }
SMemRow tdMemRowDup(SMemRow row) {
SMemRow trow = malloc(memRowTLen(row));
if (trow == NULL) return NULL; if (trow == NULL) return NULL;
dataRowCpy(trow, row); memRowCpy(trow, row);
return trow; return trow;
} }
void dataColInit(SDataCol *pDataCol, STColumn *pCol, void **pBuf, int maxPoints) { void dataColInit(SDataCol *pDataCol, STColumn *pCol, void **pBuf, int maxPoints) {
pDataCol->type = colType(pCol); pDataCol->type = colType(pCol);
pDataCol->colId = colColId(pCol); pDataCol->colId = colColId(pCol);
pDataCol->bytes = colBytes(pCol); pDataCol->bytes = colBytes(pCol);
pDataCol->offset = colOffset(pCol) + TD_DATA_ROW_HEAD_SIZE; pDataCol->offset = colOffset(pCol) + TD_MEM_ROW_HEAD_SIZE;
pDataCol->len = 0; pDataCol->len = 0;
if (IS_VAR_DATA_TYPE(pDataCol->type)) { if (IS_VAR_DATA_TYPE(pDataCol->type)) {
...@@ -219,9 +241,21 @@ void dataColInit(SDataCol *pDataCol, STColumn *pCol, void **pBuf, int maxPoints) ...@@ -219,9 +241,21 @@ void dataColInit(SDataCol *pDataCol, STColumn *pCol, void **pBuf, int maxPoints)
} }
// value from timestamp should be TKEY here instead of TSKEY // value from timestamp should be TKEY here instead of TSKEY
void dataColAppendVal(SDataCol *pCol, void *value, int numOfRows, int maxPoints) { void dataColAppendVal(SDataCol *pCol, const void *value, int numOfRows, int maxPoints) {
ASSERT(pCol != NULL && value != NULL); ASSERT(pCol != NULL && value != NULL);
if (pCol->len == 0) {
if (isNull(value, pCol->type)) {
// all null value yet, just return
return;
}
if (numOfRows > 0) {
// Find the first not null value, fill all previous values as NULL
dataColSetNEleNull(pCol, numOfRows, maxPoints);
}
}
if (IS_VAR_DATA_TYPE(pCol->type)) { if (IS_VAR_DATA_TYPE(pCol->type)) {
// set offset // set offset
pCol->dataOff[numOfRows] = pCol->len; pCol->dataOff[numOfRows] = pCol->len;
...@@ -399,11 +433,10 @@ void tdResetDataCols(SDataCols *pCols) { ...@@ -399,11 +433,10 @@ void tdResetDataCols(SDataCols *pCols) {
} }
} }
} }
static void tdAppendDataRowToDataCol(SDataRow row, STSchema *pSchema, SDataCols *pCols) {
void tdAppendDataRowToDataCol(SDataRow row, STSchema *pSchema, SDataCols *pCols) {
ASSERT(pCols->numOfRows == 0 || dataColsKeyLast(pCols) < dataRowKey(row)); ASSERT(pCols->numOfRows == 0 || dataColsKeyLast(pCols) < dataRowKey(row));
int rcol = 0; int rcol = 0; // rowCol
int dcol = 0; int dcol = 0;
if (dataRowDeleted(row)) { if (dataRowDeleted(row)) {
...@@ -419,7 +452,7 @@ void tdAppendDataRowToDataCol(SDataRow row, STSchema *pSchema, SDataCols *pCols) ...@@ -419,7 +452,7 @@ void tdAppendDataRowToDataCol(SDataRow row, STSchema *pSchema, SDataCols *pCols)
while (dcol < pCols->numOfCols) { while (dcol < pCols->numOfCols) {
SDataCol *pDataCol = &(pCols->cols[dcol]); SDataCol *pDataCol = &(pCols->cols[dcol]);
if (rcol >= schemaNCols(pSchema)) { if (rcol >= schemaNCols(pSchema)) {
dataColSetNullAt(pDataCol, pCols->numOfRows); dataColAppendVal(pDataCol, tdGetNullVal(pDataCol->type), pCols->numOfRows, pCols->maxPoints);
dcol++; dcol++;
continue; continue;
} }
...@@ -432,8 +465,90 @@ void tdAppendDataRowToDataCol(SDataRow row, STSchema *pSchema, SDataCols *pCols) ...@@ -432,8 +465,90 @@ void tdAppendDataRowToDataCol(SDataRow row, STSchema *pSchema, SDataCols *pCols)
rcol++; rcol++;
} else if (pRowCol->colId < pDataCol->colId) { } else if (pRowCol->colId < pDataCol->colId) {
rcol++; rcol++;
} else {
dataColAppendVal(pDataCol, tdGetNullVal(pDataCol->type), pCols->numOfRows, pCols->maxPoints);
dcol++;
}
}
}
pCols->numOfRows++;
}
static void tdGetKVRowColInfo(const STSchema *pSchema, SColIdx *pColIdx, int nRowCols, STColumn *pSTColumn,
int *nColMatched) {
int nSchema = schemaNCols(pSchema);
int iCol = 0;
int iSchema = 0;
int nColMatch = 0;
SColIdx * pIdx = pColIdx;
const STColumn *pColumn = NULL;
while (iCol < nRowCols && iSchema < nSchema) {
pColumn = &pSchema->columns[iSchema];
if (pIdx->colId == pColumn->colId) {
pSTColumn[nColMatch].colId = pIdx->colId;
pSTColumn[nColMatch].type = pColumn->type;
pSTColumn[nColMatch].bytes = pColumn->bytes;
pSTColumn[nColMatch].offset = pIdx->offset;
pIdx += sizeof(SColIdx);
++iCol;
++iSchema;
++nColMatch;
} else if (pIdx->colId > pColumn->colId) {
++iSchema;
} else {
pIdx += sizeof(SColIdx);
++iCol;
}
}
*nColMatched = nColMatch;
}
static void tdAppendKvRowToDataCol(SKVRow row, STSchema *pSchema, SDataCols *pCols) {
ASSERT(pCols->numOfRows == 0 || dataColsKeyLast(pCols) < kvRowKey(row));
int rcol = 0;
int dcol = 0;
if (kvRowDeleted(row)) {
for (; dcol < pCols->numOfCols; dcol++) {
SDataCol *pDataCol = &(pCols->cols[dcol]);
if (dcol == 0) {
dataColAppendVal(pDataCol, kvRowValues(row), pCols->numOfRows, pCols->maxPoints);
} else { } else {
dataColSetNullAt(pDataCol, pCols->numOfRows); dataColSetNullAt(pDataCol, pCols->numOfRows);
}
}
} else {
int nRowCols = kvRowNCols(row);
int nRowColsMatched = 0;
STColumn stColumn[nRowCols];
tdGetKVRowColInfo(pSchema, kvRowColIdx(row), nRowCols, stColumn, &nRowColsMatched);
uDebug("kvRow: nRowCols=%d, nRowColsMatched=%d, nSchemaCols=%d", nRowCols, nRowColsMatched, schemaNCols(pSchema));
while (dcol < pCols->numOfCols) {
SDataCol *pDataCol = &(pCols->cols[dcol]);
if (rcol >= nRowColsMatched || rcol >= schemaNCols(pSchema)) {
dataColAppendVal(pDataCol, tdGetNullVal(pDataCol->type), pCols->numOfRows, pCols->maxPoints);
dcol++;
continue;
}
SColIdx *colIdx = kvRowColIdxAt(row, rcol);
if (colIdx->colId == pDataCol->colId) {
ASSERT(pDataCol->type == stColumn[rcol].type);
void *value = tdGetKvRowDataOfCol(row, pDataCol->type, stColumn[rcol].offset + TD_KV_ROW_HEAD_SIZE);
dataColAppendVal(pDataCol, value, pCols->numOfRows, pCols->maxPoints);
dcol++;
rcol++;
} else if (colIdx->colId < pDataCol->colId) {
rcol++;
} else {
dataColAppendVal(pDataCol, tdGetNullVal(pDataCol->type), pCols->numOfRows, pCols->maxPoints);
dcol++; dcol++;
} }
} }
...@@ -441,6 +556,16 @@ void tdAppendDataRowToDataCol(SDataRow row, STSchema *pSchema, SDataCols *pCols) ...@@ -441,6 +556,16 @@ void tdAppendDataRowToDataCol(SDataRow row, STSchema *pSchema, SDataCols *pCols)
pCols->numOfRows++; pCols->numOfRows++;
} }
void tdAppendMemRowToDataCol(SMemRow row, STSchema *pSchema, SDataCols *pCols) {
if (isDataRow(row)) {
tdAppendDataRowToDataCol(memRowBody(row), pSchema, pCols);
} else if (isKvRow(row)) {
tdAppendKvRowToDataCol(memRowBody(row), pSchema, pCols);
} else {
ASSERT(0);
}
}
int tdMergeDataCols(SDataCols *target, SDataCols *source, int rowsToMerge, int *pOffset) { int tdMergeDataCols(SDataCols *target, SDataCols *source, int rowsToMerge, int *pOffset) {
ASSERT(rowsToMerge > 0 && rowsToMerge <= source->numOfRows); ASSERT(rowsToMerge > 0 && rowsToMerge <= source->numOfRows);
ASSERT(target->numOfCols == source->numOfCols); ASSERT(target->numOfCols == source->numOfCols);
...@@ -559,11 +684,11 @@ int tdSetKVRowDataOfCol(SKVRow *orow, int16_t colId, int8_t type, void *value) { ...@@ -559,11 +684,11 @@ int tdSetKVRowDataOfCol(SKVRow *orow, int16_t colId, int8_t type, void *value) {
void * ptr = taosbsearch(&colId, kvRowColIdx(row), kvRowNCols(row), sizeof(SColIdx), comparTagId, TD_GE); void * ptr = taosbsearch(&colId, kvRowColIdx(row), kvRowNCols(row), sizeof(SColIdx), comparTagId, TD_GE);
if (ptr == NULL || ((SColIdx *)ptr)->colId > colId) { // need to add a column value to the row if (ptr == NULL || ((SColIdx *)ptr)->colId > colId) { // need to add a column value to the row
int diff = IS_VAR_DATA_TYPE(type) ? varDataTLen(value) : TYPE_BYTES[type]; uint16_t diff = IS_VAR_DATA_TYPE(type) ? varDataTLen(value) : TYPE_BYTES[type];
nrow = malloc(kvRowLen(row) + sizeof(SColIdx) + diff); nrow = malloc(kvRowLen(row) + sizeof(SColIdx) + diff);
if (nrow == NULL) return -1; if (nrow == NULL) return -1;
kvRowSetLen(nrow, kvRowLen(row) + (int16_t)sizeof(SColIdx) + diff); kvRowSetLen(nrow, kvRowLen(row) + (uint16_t)sizeof(SColIdx) + diff);
kvRowSetNCols(nrow, kvRowNCols(row) + 1); kvRowSetNCols(nrow, kvRowNCols(row) + 1);
if (ptr == NULL) { if (ptr == NULL) {
...@@ -605,8 +730,8 @@ int tdSetKVRowDataOfCol(SKVRow *orow, int16_t colId, int8_t type, void *value) { ...@@ -605,8 +730,8 @@ int tdSetKVRowDataOfCol(SKVRow *orow, int16_t colId, int8_t type, void *value) {
if (varDataTLen(value) == varDataTLen(pOldVal)) { // just update the column value in place if (varDataTLen(value) == varDataTLen(pOldVal)) { // just update the column value in place
memcpy(pOldVal, value, varDataTLen(value)); memcpy(pOldVal, value, varDataTLen(value));
} else { // need to reallocate the memory } else { // need to reallocate the memory
int16_t diff = varDataTLen(value) - varDataTLen(pOldVal); uint16_t diff = varDataTLen(value) - varDataTLen(pOldVal);
int16_t nlen = kvRowLen(row) + diff; uint16_t nlen = kvRowLen(row) + diff;
ASSERT(nlen > 0); ASSERT(nlen > 0);
nrow = malloc(nlen); nrow = malloc(nlen);
if (nrow == NULL) return -1; if (nrow == NULL) return -1;
...@@ -693,7 +818,7 @@ void tdResetKVRowBuilder(SKVRowBuilder *pBuilder) { ...@@ -693,7 +818,7 @@ void tdResetKVRowBuilder(SKVRowBuilder *pBuilder) {
} }
SKVRow tdGetKVRowFromBuilder(SKVRowBuilder *pBuilder) { SKVRow tdGetKVRowFromBuilder(SKVRowBuilder *pBuilder) {
int tlen = sizeof(SColIdx) * pBuilder->nCols + pBuilder->size; uint16_t tlen = sizeof(SColIdx) * pBuilder->nCols + pBuilder->size;
if (tlen == 0) return NULL; if (tlen == 0) return NULL;
tlen += TD_KV_ROW_HEAD_SIZE; tlen += TD_KV_ROW_HEAD_SIZE;
...@@ -709,3 +834,4 @@ SKVRow tdGetKVRowFromBuilder(SKVRowBuilder *pBuilder) { ...@@ -709,3 +834,4 @@ SKVRow tdGetKVRowFromBuilder(SKVRowBuilder *pBuilder) {
return row; return row;
} }
...@@ -483,8 +483,8 @@ static void cqProcessStreamRes(void *param, TAOS_RES *tres, TAOS_ROW row) { ...@@ -483,8 +483,8 @@ static void cqProcessStreamRes(void *param, TAOS_RES *tres, TAOS_ROW row) {
SSubmitMsg *pMsg = (SSubmitMsg *) (buffer + sizeof(SWalHead)); SSubmitMsg *pMsg = (SSubmitMsg *) (buffer + sizeof(SWalHead));
SSubmitBlk *pBlk = (SSubmitBlk *) (buffer + sizeof(SWalHead) + sizeof(SSubmitMsg)); SSubmitBlk *pBlk = (SSubmitBlk *) (buffer + sizeof(SWalHead) + sizeof(SSubmitMsg));
SDataRow trow = (SDataRow)pBlk->data; SMemRow trow = (SMemRow)pBlk->data;
tdInitDataRow(trow, pSchema); tdInitDataRow(POINTER_SHIFT(trow, TD_MEM_ROW_TYPE_SIZE), pSchema);
for (int32_t i = 0; i < pSchema->numOfCols; i++) { for (int32_t i = 0; i < pSchema->numOfCols; i++) {
STColumn *c = pSchema->columns + i; STColumn *c = pSchema->columns + i;
...@@ -500,9 +500,9 @@ static void cqProcessStreamRes(void *param, TAOS_RES *tres, TAOS_ROW row) { ...@@ -500,9 +500,9 @@ static void cqProcessStreamRes(void *param, TAOS_RES *tres, TAOS_ROW row) {
memcpy((char *)val + sizeof(VarDataLenT), buf, len); memcpy((char *)val + sizeof(VarDataLenT), buf, len);
varDataLen(val) = len; varDataLen(val) = len;
} }
tdAppendColVal(trow, val, c->type, c->bytes, c->offset); tdAppendColVal(POINTER_SHIFT(trow, TD_MEM_ROW_TYPE_SIZE), val, c->type, c->bytes, c->offset);
} }
pBlk->dataLen = htonl(dataRowLen(trow)); pBlk->dataLen = htonl(memRowTLen(trow));
pBlk->schemaLen = 0; pBlk->schemaLen = 0;
pBlk->uid = htobe64(pObj->uid); pBlk->uid = htobe64(pObj->uid);
...@@ -511,7 +511,7 @@ static void cqProcessStreamRes(void *param, TAOS_RES *tres, TAOS_ROW row) { ...@@ -511,7 +511,7 @@ static void cqProcessStreamRes(void *param, TAOS_RES *tres, TAOS_ROW row) {
pBlk->sversion = htonl(pSchema->version); pBlk->sversion = htonl(pSchema->version);
pBlk->padding = 0; pBlk->padding = 0;
pHead->len = sizeof(SSubmitMsg) + sizeof(SSubmitBlk) + dataRowLen(trow); pHead->len = sizeof(SSubmitMsg) + sizeof(SSubmitBlk) + memRowTLen(trow);
pMsg->header.vgId = htonl(pContext->vgId); pMsg->header.vgId = htonl(pContext->vgId);
pMsg->header.contLen = htonl(pHead->len); pMsg->header.contLen = htonl(pHead->len);
......
...@@ -11,7 +11,7 @@ extern "C" { ...@@ -11,7 +11,7 @@ extern "C" {
// ----------------- For variable data types such as TSDB_DATA_TYPE_BINARY and TSDB_DATA_TYPE_NCHAR // ----------------- For variable data types such as TSDB_DATA_TYPE_BINARY and TSDB_DATA_TYPE_NCHAR
typedef int32_t VarDataOffsetT; typedef int32_t VarDataOffsetT;
typedef int16_t VarDataLenT; typedef int16_t VarDataLenT; // maxDataLen: 32767
typedef struct tstr { typedef struct tstr {
VarDataLenT len; VarDataLenT len;
......
...@@ -71,27 +71,27 @@ int tsdbLoadDataFromCache(STable* pTable, SSkipListIterator* pIter, TSKEY maxK ...@@ -71,27 +71,27 @@ int tsdbLoadDataFromCache(STable* pTable, SSkipListIterator* pIter, TSKEY maxK
TKEY* filterKeys, int nFilterKeys, bool keepDup, SMergeInfo* pMergeInfo); TKEY* filterKeys, int nFilterKeys, bool keepDup, SMergeInfo* pMergeInfo);
void* tsdbCommitData(STsdbRepo* pRepo); void* tsdbCommitData(STsdbRepo* pRepo);
static FORCE_INLINE SDataRow tsdbNextIterRow(SSkipListIterator* pIter) { static FORCE_INLINE SMemRow tsdbNextIterRow(SSkipListIterator* pIter) {
if (pIter == NULL) return NULL; if (pIter == NULL) return NULL;
SSkipListNode* node = tSkipListIterGet(pIter); SSkipListNode* node = tSkipListIterGet(pIter);
if (node == NULL) return NULL; if (node == NULL) return NULL;
return (SDataRow)SL_GET_NODE_DATA(node); return (SMemRow)SL_GET_NODE_DATA(node);
} }
static FORCE_INLINE TSKEY tsdbNextIterKey(SSkipListIterator* pIter) { static FORCE_INLINE TSKEY tsdbNextIterKey(SSkipListIterator* pIter) {
SDataRow row = tsdbNextIterRow(pIter); SMemRow row = tsdbNextIterRow(pIter);
if (row == NULL) return TSDB_DATA_TIMESTAMP_NULL; if (row == NULL) return TSDB_DATA_TIMESTAMP_NULL;
return dataRowKey(row); return memRowKey(row);
} }
static FORCE_INLINE TKEY tsdbNextIterTKey(SSkipListIterator* pIter) { static FORCE_INLINE TKEY tsdbNextIterTKey(SSkipListIterator* pIter) {
SDataRow row = tsdbNextIterRow(pIter); SMemRow row = tsdbNextIterRow(pIter);
if (row == NULL) return TKEY_NULL; if (row == NULL) return TKEY_NULL;
return dataRowTKey(row); return memRowTKey(row);
} }
#endif /* _TD_TSDB_MEMTABLE_H_ */ #endif /* _TD_TSDB_MEMTABLE_H_ */
\ No newline at end of file
...@@ -32,7 +32,7 @@ typedef struct STable { ...@@ -32,7 +32,7 @@ typedef struct STable {
void* eventHandler; // TODO void* eventHandler; // TODO
void* streamHandler; // TODO void* streamHandler; // TODO
TSKEY lastKey; TSKEY lastKey;
SDataRow lastRow; SMemRow lastRow;
char* sql; char* sql;
void* cqhandle; void* cqhandle;
SRWLatch latch; // TODO: implementa latch functions SRWLatch latch; // TODO: implementa latch functions
...@@ -148,7 +148,7 @@ static FORCE_INLINE STSchema *tsdbGetTableTagSchema(STable *pTable) { ...@@ -148,7 +148,7 @@ static FORCE_INLINE STSchema *tsdbGetTableTagSchema(STable *pTable) {
} }
static FORCE_INLINE TSKEY tsdbGetTableLastKeyImpl(STable* pTable) { static FORCE_INLINE TSKEY tsdbGetTableLastKeyImpl(STable* pTable) {
ASSERT(pTable->lastRow == NULL || pTable->lastKey == dataRowKey(pTable->lastRow)); ASSERT((pTable->lastRow == NULL) || (pTable->lastKey == memRowKey(pTable->lastRow)));
return pTable->lastKey; return pTable->lastKey;
} }
......
...@@ -920,7 +920,8 @@ int tsdbWriteBlockImpl(STsdbRepo *pRepo, STable *pTable, SDFile *pDFile, SDataCo ...@@ -920,7 +920,8 @@ int tsdbWriteBlockImpl(STsdbRepo *pRepo, STable *pTable, SDFile *pDFile, SDataCo
SDataCol * pDataCol = pDataCols->cols + ncol; SDataCol * pDataCol = pDataCols->cols + ncol;
SBlockCol *pBlockCol = pBlockData->cols + nColsNotAllNull; SBlockCol *pBlockCol = pBlockData->cols + nColsNotAllNull;
if (isNEleNull(pDataCol, rowsToWrite)) { // all data to commit are NULL, just ignore it // if (isNEleNull(pDataCol, rowsToWrite)) { // all data to commit are NULL, just ignore it
if (isAllRowOfColNull(pDataCol)) { // all data to commit are NULL, just ignore it
continue; continue;
} }
...@@ -1264,12 +1265,12 @@ static void tsdbLoadAndMergeFromCache(SDataCols *pDataCols, int *iter, SCommitIt ...@@ -1264,12 +1265,12 @@ static void tsdbLoadAndMergeFromCache(SDataCols *pDataCols, int *iter, SCommitIt
while (true) { while (true) {
key1 = (*iter >= pDataCols->numOfRows) ? INT64_MAX : dataColsKeyAt(pDataCols, *iter); key1 = (*iter >= pDataCols->numOfRows) ? INT64_MAX : dataColsKeyAt(pDataCols, *iter);
bool isRowDel = false; bool isRowDel = false;
SDataRow row = tsdbNextIterRow(pCommitIter->pIter); SMemRow row = tsdbNextIterRow(pCommitIter->pIter);
if (row == NULL || dataRowKey(row) > maxKey) { if (row == NULL || memRowKey(row) > maxKey) {
key2 = INT64_MAX; key2 = INT64_MAX;
} else { } else {
key2 = dataRowKey(row); key2 = memRowKey(row);
isRowDel = dataRowDeleted(row); isRowDel = memRowDeleted(row);
} }
if (key1 == INT64_MAX && key2 == INT64_MAX) break; if (key1 == INT64_MAX && key2 == INT64_MAX) break;
...@@ -1284,24 +1285,24 @@ static void tsdbLoadAndMergeFromCache(SDataCols *pDataCols, int *iter, SCommitIt ...@@ -1284,24 +1285,24 @@ static void tsdbLoadAndMergeFromCache(SDataCols *pDataCols, int *iter, SCommitIt
(*iter)++; (*iter)++;
} else if (key1 > key2) { } else if (key1 > key2) {
if (!isRowDel) { if (!isRowDel) {
if (pSchema == NULL || schemaVersion(pSchema) != dataRowVersion(row)) { if (pSchema == NULL || schemaVersion(pSchema) != memRowVersion(row)) {
pSchema = tsdbGetTableSchemaImpl(pCommitIter->pTable, false, false, dataRowVersion(row)); pSchema = tsdbGetTableSchemaImpl(pCommitIter->pTable, false, false, memRowVersion(row));
ASSERT(pSchema != NULL); ASSERT(pSchema != NULL);
} }
tdAppendDataRowToDataCol(row, pSchema, pTarget); tdAppendMemRowToDataCol(row, pSchema, pTarget);
} }
tSkipListIterNext(pCommitIter->pIter); tSkipListIterNext(pCommitIter->pIter);
} else { } else {
if (update) { if (update) {
if (!isRowDel) { if (!isRowDel) {
if (pSchema == NULL || schemaVersion(pSchema) != dataRowVersion(row)) { if (pSchema == NULL || schemaVersion(pSchema) != memRowVersion(row)) {
pSchema = tsdbGetTableSchemaImpl(pCommitIter->pTable, false, false, dataRowVersion(row)); pSchema = tsdbGetTableSchemaImpl(pCommitIter->pTable, false, false, memRowVersion(row));
ASSERT(pSchema != NULL); ASSERT(pSchema != NULL);
} }
tdAppendDataRowToDataCol(row, pSchema, pTarget); tdAppendMemRowToDataCol(row, pSchema, pTarget);
} }
} else { } else {
ASSERT(!isRowDel); ASSERT(!isRowDel);
......
...@@ -639,7 +639,7 @@ static int tsdbRestoreLastColumns(STsdbRepo *pRepo, STable *pTable, SReadH* pRea ...@@ -639,7 +639,7 @@ static int tsdbRestoreLastColumns(STsdbRepo *pRepo, STable *pTable, SReadH* pRea
int numColumns; int numColumns;
int32_t blockIdx; int32_t blockIdx;
SDataStatis* pBlockStatis = NULL; SDataStatis* pBlockStatis = NULL;
SDataRow row = NULL; SMemRow row = NULL;
// restore last column data with last schema // restore last column data with last schema
int err = 0; int err = 0;
...@@ -655,13 +655,13 @@ static int tsdbRestoreLastColumns(STsdbRepo *pRepo, STable *pTable, SReadH* pRea ...@@ -655,13 +655,13 @@ static int tsdbRestoreLastColumns(STsdbRepo *pRepo, STable *pTable, SReadH* pRea
} }
} }
row = taosTMalloc(dataRowMaxBytesFromSchema(pSchema)); row = taosTMalloc(memRowMaxBytesFromSchema(pSchema));
if (row == NULL) { if (row == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
err = -1; err = -1;
goto out; goto out;
} }
tdInitDataRow(row, pSchema); tdInitDataRow(POINTER_SHIFT(row, TD_MEM_ROW_TYPE_SIZE), pSchema);
// first load block index info // first load block index info
if (tsdbLoadBlockInfo(pReadh, NULL) < 0) { if (tsdbLoadBlockInfo(pReadh, NULL) < 0) {
...@@ -718,9 +718,10 @@ static int tsdbRestoreLastColumns(STsdbRepo *pRepo, STable *pTable, SReadH* pRea ...@@ -718,9 +718,10 @@ static int tsdbRestoreLastColumns(STsdbRepo *pRepo, STable *pTable, SReadH* pRea
// OK,let's load row from backward to get not-null column // OK,let's load row from backward to get not-null column
for (int32_t rowId = pBlock->numOfRows - 1; rowId >= 0; rowId--) { for (int32_t rowId = pBlock->numOfRows - 1; rowId >= 0; rowId--) {
SDataCol *pDataCol = pReadh->pDCols[0]->cols + i; SDataCol *pDataCol = pReadh->pDCols[0]->cols + i;
tdAppendColVal(row, tdGetColDataOfRow(pDataCol, rowId), pCol->type, pCol->bytes, pCol->offset); tdAppendColVal(POINTER_SHIFT(row, TD_MEM_ROW_TYPE_SIZE), tdGetColDataOfRow(pDataCol, rowId), pCol->type,
pCol->bytes, pCol->offset);
//SDataCol *pDataCol = readh.pDCols[0]->cols + j; //SDataCol *pDataCol = readh.pDCols[0]->cols + j;
void* value = tdGetRowDataOfCol(row, (int8_t)pCol->type, TD_DATA_ROW_HEAD_SIZE + pCol->offset); void *value = tdGetMemRowDataOfCol(row, (int8_t)pCol->type, TD_MEM_ROW_HEAD_SIZE + pCol->offset);
if (isNull(value, pCol->type)) { if (isNull(value, pCol->type)) {
continue; continue;
} }
...@@ -740,8 +741,9 @@ static int tsdbRestoreLastColumns(STsdbRepo *pRepo, STable *pTable, SReadH* pRea ...@@ -740,8 +741,9 @@ static int tsdbRestoreLastColumns(STsdbRepo *pRepo, STable *pTable, SReadH* pRea
// save row ts(in column 0) // save row ts(in column 0)
pDataCol = pReadh->pDCols[0]->cols + 0; pDataCol = pReadh->pDCols[0]->cols + 0;
pCol = schemaColAt(pSchema, 0); pCol = schemaColAt(pSchema, 0);
tdAppendColVal(row, tdGetColDataOfRow(pDataCol, rowId), pCol->type, pCol->bytes, pCol->offset); tdAppendColVal(POINTER_SHIFT(row, TD_MEM_ROW_TYPE_SIZE), tdGetColDataOfRow(pDataCol, rowId), pCol->type,
pLastCol->ts = dataRowKey(row); pCol->bytes, pCol->offset);
pLastCol->ts = memRowKey(row);
pTable->restoreColumnNum += 1; pTable->restoreColumnNum += 1;
......
...@@ -21,7 +21,7 @@ ...@@ -21,7 +21,7 @@
typedef struct { typedef struct {
int32_t totalLen; int32_t totalLen;
int32_t len; int32_t len;
SDataRow row; SMemRow row;
} SSubmitBlkIter; } SSubmitBlkIter;
typedef struct { typedef struct {
...@@ -36,20 +36,19 @@ static STableData *tsdbNewTableData(STsdbCfg *pCfg, STable *pTable); ...@@ -36,20 +36,19 @@ static STableData *tsdbNewTableData(STsdbCfg *pCfg, STable *pTable);
static void tsdbFreeTableData(STableData *pTableData); static void tsdbFreeTableData(STableData *pTableData);
static char * tsdbGetTsTupleKey(const void *data); static char * tsdbGetTsTupleKey(const void *data);
static int tsdbAdjustMemMaxTables(SMemTable *pMemTable, int maxTables); static int tsdbAdjustMemMaxTables(SMemTable *pMemTable, int maxTables);
static int tsdbAppendTableRowToCols(STable *pTable, SDataCols *pCols, STSchema **ppSchema, SDataRow row); static int tsdbAppendTableRowToCols(STable *pTable, SDataCols *pCols, STSchema **ppSchema, SMemRow row);
static int tsdbInitSubmitBlkIter(SSubmitBlk *pBlock, SSubmitBlkIter *pIter); static int tsdbInitSubmitBlkIter(SSubmitBlk *pBlock, SSubmitBlkIter *pIter);
static SDataRow tsdbGetSubmitBlkNext(SSubmitBlkIter *pIter); static SMemRow tsdbGetSubmitBlkNext(SSubmitBlkIter *pIter);
static int tsdbScanAndConvertSubmitMsg(STsdbRepo *pRepo, SSubmitMsg *pMsg); static int tsdbScanAndConvertSubmitMsg(STsdbRepo *pRepo, SSubmitMsg *pMsg);
static int tsdbInsertDataToTable(STsdbRepo *pRepo, SSubmitBlk *pBlock, int32_t *affectedrows); static int tsdbInsertDataToTable(STsdbRepo *pRepo, SSubmitBlk *pBlock, int32_t *affectedrows);
static int tsdbCopyRowToMem(STsdbRepo *pRepo, SDataRow row, STable *pTable, void **ppRow); static int tsdbCopyRowToMem(STsdbRepo *pRepo, SMemRow row, STable *pTable, void **ppRow);
static int tsdbInitSubmitMsgIter(SSubmitMsg *pMsg, SSubmitMsgIter *pIter); static int tsdbInitSubmitMsgIter(SSubmitMsg *pMsg, SSubmitMsgIter *pIter);
static int tsdbGetSubmitMsgNext(SSubmitMsgIter *pIter, SSubmitBlk **pPBlock); static int tsdbGetSubmitMsgNext(SSubmitMsgIter *pIter, SSubmitBlk **pPBlock);
static int tsdbCheckTableSchema(STsdbRepo *pRepo, SSubmitBlk *pBlock, STable *pTable); static int tsdbCheckTableSchema(STsdbRepo *pRepo, SSubmitBlk *pBlock, STable *pTable);
static int tsdbInsertDataToTableImpl(STsdbRepo *pRepo, STable *pTable, void **rows, int rowCounter); static int tsdbInsertDataToTableImpl(STsdbRepo *pRepo, STable *pTable, void **rows, int rowCounter);
static void tsdbFreeRows(STsdbRepo *pRepo, void **rows, int rowCounter); static void tsdbFreeRows(STsdbRepo *pRepo, void **rows, int rowCounter);
static int tsdbUpdateTableLatestInfo(STsdbRepo *pRepo, STable *pTable, SDataRow row); static int tsdbUpdateTableLatestInfo(STsdbRepo *pRepo, STable *pTable, SMemRow row);
static FORCE_INLINE int tsdbCheckRowRange(STsdbRepo *pRepo, STable *pTable, SMemRow row, TSKEY minKey, TSKEY maxKey,
static FORCE_INLINE int tsdbCheckRowRange(STsdbRepo *pRepo, STable *pTable, SDataRow row, TSKEY minKey, TSKEY maxKey,
TSKEY now); TSKEY now);
int32_t tsdbInsertData(STsdbRepo *repo, SSubmitMsg *pMsg, SShellSubmitRspMsg *pRsp) { int32_t tsdbInsertData(STsdbRepo *repo, SSubmitMsg *pMsg, SShellSubmitRspMsg *pRsp) {
...@@ -354,7 +353,7 @@ int tsdbLoadDataFromCache(STable *pTable, SSkipListIterator *pIter, TSKEY maxKey ...@@ -354,7 +353,7 @@ int tsdbLoadDataFromCache(STable *pTable, SSkipListIterator *pIter, TSKEY maxKey
TSKEY fKey = 0; TSKEY fKey = 0;
bool isRowDel = false; bool isRowDel = false;
int filterIter = 0; int filterIter = 0;
SDataRow row = NULL; SMemRow row = NULL;
SMergeInfo mInfo; SMergeInfo mInfo;
if (pMergeInfo == NULL) pMergeInfo = &mInfo; if (pMergeInfo == NULL) pMergeInfo = &mInfo;
...@@ -365,12 +364,12 @@ int tsdbLoadDataFromCache(STable *pTable, SSkipListIterator *pIter, TSKEY maxKey ...@@ -365,12 +364,12 @@ int tsdbLoadDataFromCache(STable *pTable, SSkipListIterator *pIter, TSKEY maxKey
if (pCols) tdResetDataCols(pCols); if (pCols) tdResetDataCols(pCols);
row = tsdbNextIterRow(pIter); row = tsdbNextIterRow(pIter);
if (row == NULL || dataRowKey(row) > maxKey) { if (row == NULL || memRowKey(row) > maxKey) {
rowKey = INT64_MAX; rowKey = INT64_MAX;
isRowDel = false; isRowDel = false;
} else { } else {
rowKey = dataRowKey(row); rowKey = memRowKey(row);
isRowDel = dataRowDeleted(row); isRowDel = memRowDeleted(row);
} }
if (filterIter >= nFilterKeys) { if (filterIter >= nFilterKeys) {
...@@ -407,12 +406,12 @@ int tsdbLoadDataFromCache(STable *pTable, SSkipListIterator *pIter, TSKEY maxKey ...@@ -407,12 +406,12 @@ int tsdbLoadDataFromCache(STable *pTable, SSkipListIterator *pIter, TSKEY maxKey
tSkipListIterNext(pIter); tSkipListIterNext(pIter);
row = tsdbNextIterRow(pIter); row = tsdbNextIterRow(pIter);
if (row == NULL || dataRowKey(row) > maxKey) { if (row == NULL || memRowKey(row) > maxKey) {
rowKey = INT64_MAX; rowKey = INT64_MAX;
isRowDel = false; isRowDel = false;
} else { } else {
rowKey = dataRowKey(row); rowKey = memRowKey(row);
isRowDel = dataRowDeleted(row); isRowDel = memRowDeleted(row);
} }
} else { } else {
if (isRowDel) { if (isRowDel) {
...@@ -437,12 +436,12 @@ int tsdbLoadDataFromCache(STable *pTable, SSkipListIterator *pIter, TSKEY maxKey ...@@ -437,12 +436,12 @@ int tsdbLoadDataFromCache(STable *pTable, SSkipListIterator *pIter, TSKEY maxKey
tSkipListIterNext(pIter); tSkipListIterNext(pIter);
row = tsdbNextIterRow(pIter); row = tsdbNextIterRow(pIter);
if (row == NULL || dataRowKey(row) > maxKey) { if (row == NULL || memRowKey(row) > maxKey) {
rowKey = INT64_MAX; rowKey = INT64_MAX;
isRowDel = false; isRowDel = false;
} else { } else {
rowKey = dataRowKey(row); rowKey = memRowKey(row);
isRowDel = dataRowDeleted(row); isRowDel = memRowDeleted(row);
} }
filterIter++; filterIter++;
...@@ -548,7 +547,7 @@ static void tsdbFreeTableData(STableData *pTableData) { ...@@ -548,7 +547,7 @@ static void tsdbFreeTableData(STableData *pTableData) {
} }
} }
static char *tsdbGetTsTupleKey(const void *data) { return dataRowTuple((SDataRow)data); } static char *tsdbGetTsTupleKey(const void *data) { return memRowTuple((SMemRow)data); }
static int tsdbAdjustMemMaxTables(SMemTable *pMemTable, int maxTables) { static int tsdbAdjustMemMaxTables(SMemTable *pMemTable, int maxTables) {
ASSERT(pMemTable->maxTables < maxTables); ASSERT(pMemTable->maxTables < maxTables);
...@@ -572,17 +571,17 @@ static int tsdbAdjustMemMaxTables(SMemTable *pMemTable, int maxTables) { ...@@ -572,17 +571,17 @@ static int tsdbAdjustMemMaxTables(SMemTable *pMemTable, int maxTables) {
return 0; return 0;
} }
static int tsdbAppendTableRowToCols(STable *pTable, SDataCols *pCols, STSchema **ppSchema, SDataRow row) { static int tsdbAppendTableRowToCols(STable *pTable, SDataCols *pCols, STSchema **ppSchema, SMemRow row) {
if (pCols) { if (pCols) {
if (*ppSchema == NULL || schemaVersion(*ppSchema) != dataRowVersion(row)) { if (*ppSchema == NULL || schemaVersion(*ppSchema) != memRowVersion(row)) {
*ppSchema = tsdbGetTableSchemaImpl(pTable, false, false, dataRowVersion(row)); *ppSchema = tsdbGetTableSchemaImpl(pTable, false, false, memRowVersion(row));
if (*ppSchema == NULL) { if (*ppSchema == NULL) {
ASSERT(false); ASSERT(false);
return -1; return -1;
} }
} }
tdAppendDataRowToDataCol(row, *ppSchema, pCols); tdAppendMemRowToDataCol(row, *ppSchema, pCols);
} }
return 0; return 0;
...@@ -592,31 +591,32 @@ static int tsdbInitSubmitBlkIter(SSubmitBlk *pBlock, SSubmitBlkIter *pIter) { ...@@ -592,31 +591,32 @@ static int tsdbInitSubmitBlkIter(SSubmitBlk *pBlock, SSubmitBlkIter *pIter) {
if (pBlock->dataLen <= 0) return -1; if (pBlock->dataLen <= 0) return -1;
pIter->totalLen = pBlock->dataLen; pIter->totalLen = pBlock->dataLen;
pIter->len = 0; pIter->len = 0;
pIter->row = (SDataRow)(pBlock->data+pBlock->schemaLen); pIter->row = (SMemRow)(pBlock->data + pBlock->schemaLen);
return 0; return 0;
} }
static SDataRow tsdbGetSubmitBlkNext(SSubmitBlkIter *pIter) { static SMemRow tsdbGetSubmitBlkNext(SSubmitBlkIter *pIter) {
SDataRow row = pIter->row; SMemRow row = pIter->row; // firstly, get current row
if (row == NULL) return NULL; if (row == NULL) return NULL;
pIter->len += dataRowLen(row); pIter->len += memRowTLen(row);
if (pIter->len >= pIter->totalLen) { if (pIter->len >= pIter->totalLen) { // reach the end
pIter->row = NULL; pIter->row = NULL;
} else { } else {
pIter->row = (char *)row + dataRowLen(row); pIter->row = (char *)row + memRowTLen(row); // secondly, move to next row
} }
return row; return row;
} }
static FORCE_INLINE int tsdbCheckRowRange(STsdbRepo *pRepo, STable *pTable, SDataRow row, TSKEY minKey, TSKEY maxKey, static FORCE_INLINE int tsdbCheckRowRange(STsdbRepo *pRepo, STable *pTable, SMemRow row, TSKEY minKey, TSKEY maxKey,
TSKEY now) { TSKEY now) {
if (dataRowKey(row) < minKey || dataRowKey(row) > maxKey) { TSKEY rowKey = memRowKey(row);
if (rowKey < minKey || rowKey > maxKey) {
tsdbError("vgId:%d table %s tid %d uid %" PRIu64 " timestamp is out of range! now %" PRId64 " minKey %" PRId64 tsdbError("vgId:%d table %s tid %d uid %" PRIu64 " timestamp is out of range! now %" PRId64 " minKey %" PRId64
" maxKey %" PRId64 " row key %" PRId64, " maxKey %" PRId64 " row key %" PRId64,
REPO_ID(pRepo), TABLE_CHAR_NAME(pTable), TABLE_TID(pTable), TABLE_UID(pTable), now, minKey, maxKey, REPO_ID(pRepo), TABLE_CHAR_NAME(pTable), TABLE_TID(pTable), TABLE_UID(pTable), now, minKey, maxKey,
dataRowKey(row)); rowKey);
terrno = TSDB_CODE_TDB_TIMESTAMP_OUT_OF_RANGE; terrno = TSDB_CODE_TDB_TIMESTAMP_OUT_OF_RANGE;
return -1; return -1;
} }
...@@ -630,7 +630,7 @@ static int tsdbScanAndConvertSubmitMsg(STsdbRepo *pRepo, SSubmitMsg *pMsg) { ...@@ -630,7 +630,7 @@ static int tsdbScanAndConvertSubmitMsg(STsdbRepo *pRepo, SSubmitMsg *pMsg) {
SSubmitMsgIter msgIter = {0}; SSubmitMsgIter msgIter = {0};
SSubmitBlk * pBlock = NULL; SSubmitBlk * pBlock = NULL;
SSubmitBlkIter blkIter = {0}; SSubmitBlkIter blkIter = {0};
SDataRow row = NULL; SMemRow row = NULL;
TSKEY now = taosGetTimestamp(pRepo->config.precision); TSKEY now = taosGetTimestamp(pRepo->config.precision);
TSKEY minKey = now - tsMsPerDay[pRepo->config.precision] * pRepo->config.keep; TSKEY minKey = now - tsMsPerDay[pRepo->config.precision] * pRepo->config.keep;
TSKEY maxKey = now + tsMsPerDay[pRepo->config.precision] * pRepo->config.daysPerFile; TSKEY maxKey = now + tsMsPerDay[pRepo->config.precision] * pRepo->config.daysPerFile;
...@@ -698,7 +698,7 @@ static int tsdbInsertDataToTable(STsdbRepo *pRepo, SSubmitBlk *pBlock, int32_t * ...@@ -698,7 +698,7 @@ static int tsdbInsertDataToTable(STsdbRepo *pRepo, SSubmitBlk *pBlock, int32_t *
int64_t points = 0; int64_t points = 0;
STable * pTable = NULL; STable * pTable = NULL;
SSubmitBlkIter blkIter = {0}; SSubmitBlkIter blkIter = {0};
SDataRow row = NULL; SMemRow row = NULL;
void * rows[TSDB_MAX_INSERT_BATCH] = {0}; void * rows[TSDB_MAX_INSERT_BATCH] = {0};
int rowCounter = 0; int rowCounter = 0;
...@@ -744,10 +744,10 @@ _err: ...@@ -744,10 +744,10 @@ _err:
return -1; return -1;
} }
static int tsdbCopyRowToMem(STsdbRepo *pRepo, SDataRow row, STable *pTable, void **ppRow) { static int tsdbCopyRowToMem(STsdbRepo *pRepo, SMemRow row, STable *pTable, void **ppRow) {
STsdbCfg * pCfg = &pRepo->config; STsdbCfg * pCfg = &pRepo->config;
TKEY tkey = dataRowTKey(row); TKEY tkey = memRowTKey(row);
TSKEY key = dataRowKey(row); TSKEY key = memRowKey(row);
bool isRowDelete = TKEY_IS_DELETED(tkey); bool isRowDelete = TKEY_IS_DELETED(tkey);
if (isRowDelete) { if (isRowDelete) {
...@@ -765,15 +765,15 @@ static int tsdbCopyRowToMem(STsdbRepo *pRepo, SDataRow row, STable *pTable, void ...@@ -765,15 +765,15 @@ static int tsdbCopyRowToMem(STsdbRepo *pRepo, SDataRow row, STable *pTable, void
} }
} }
void *pRow = tsdbAllocBytes(pRepo, dataRowLen(row)); void *pRow = tsdbAllocBytes(pRepo, memRowTLen(row));
if (pRow == NULL) { if (pRow == NULL) {
tsdbError("vgId:%d failed to insert row with key %" PRId64 " to table %s while allocate %d bytes since %s", tsdbError("vgId:%d failed to insert row with key %" PRId64 " to table %s while allocate %d bytes since %s",
REPO_ID(pRepo), key, TABLE_CHAR_NAME(pTable), dataRowLen(row), tstrerror(terrno)); REPO_ID(pRepo), key, TABLE_CHAR_NAME(pTable), memRowTLen(row), tstrerror(terrno));
return -1; return -1;
} }
dataRowCpy(pRow, row); memRowCpy(pRow, row);
ppRow[0] = pRow; ppRow[0] = pRow; // save the memory address of data rows
tsdbTrace("vgId:%d a row is %s table %s tid %d uid %" PRIu64 " key %" PRIu64, REPO_ID(pRepo), tsdbTrace("vgId:%d a row is %s table %s tid %d uid %" PRIu64 " key %" PRIu64, REPO_ID(pRepo),
isRowDelete ? "deleted from" : "updated in", TABLE_CHAR_NAME(pTable), TABLE_TID(pTable), TABLE_UID(pTable), isRowDelete ? "deleted from" : "updated in", TABLE_CHAR_NAME(pTable), TABLE_TID(pTable), TABLE_UID(pTable),
...@@ -954,8 +954,8 @@ static void tsdbFreeRows(STsdbRepo *pRepo, void **rows, int rowCounter) { ...@@ -954,8 +954,8 @@ static void tsdbFreeRows(STsdbRepo *pRepo, void **rows, int rowCounter) {
STsdbBufPool *pBufPool = pRepo->pPool; STsdbBufPool *pBufPool = pRepo->pPool;
for (int i = rowCounter - 1; i >= 0; --i) { for (int i = rowCounter - 1; i >= 0; --i) {
SDataRow row = (SDataRow)rows[i]; SMemRow row = (SMemRow)rows[i];
int bytes = (int)dataRowLen(row); int bytes = (int)memRowTLen(row);
if (pRepo->mem->extraBuffList == NULL) { if (pRepo->mem->extraBuffList == NULL) {
STsdbBufBlock *pBufBlock = tsdbGetCurrBufBlock(pRepo); STsdbBufBlock *pBufBlock = tsdbGetCurrBufBlock(pRepo);
...@@ -988,15 +988,16 @@ static void tsdbFreeRows(STsdbRepo *pRepo, void **rows, int rowCounter) { ...@@ -988,15 +988,16 @@ static void tsdbFreeRows(STsdbRepo *pRepo, void **rows, int rowCounter) {
} }
} }
static void updateTableLatestColumn(STsdbRepo *pRepo, STable *pTable, SDataRow row) { static void updateTableLatestColumn(STsdbRepo *pRepo, STable *pTable, SMemRow row) {
tsdbDebug("vgId:%d updateTableLatestColumn, %s row version:%d", REPO_ID(pRepo), pTable->name->data, dataRowVersion(row)); tsdbDebug("vgId:%d updateTableLatestColumn, %s row version:%d", REPO_ID(pRepo), pTable->name->data,
memRowVersion(row));
STSchema* pSchema = tsdbGetTableLatestSchema(pTable); STSchema* pSchema = tsdbGetTableLatestSchema(pTable);
if (tsdbUpdateLastColSchema(pTable, pSchema) < 0) { if (tsdbUpdateLastColSchema(pTable, pSchema) < 0) {
return; return;
} }
pSchema = tsdbGetTableSchemaByVersion(pTable, dataRowVersion(row)); pSchema = tsdbGetTableSchemaByVersion(pTable, memRowVersion(row));
if (pSchema == NULL) { if (pSchema == NULL) {
return; return;
} }
...@@ -1010,8 +1011,8 @@ static void updateTableLatestColumn(STsdbRepo *pRepo, STable *pTable, SDataRow r ...@@ -1010,8 +1011,8 @@ static void updateTableLatestColumn(STsdbRepo *pRepo, STable *pTable, SDataRow r
if (idx == -1) { if (idx == -1) {
continue; continue;
} }
void* value = tdGetRowDataOfCol(row, (int8_t)pTCol->type, TD_DATA_ROW_HEAD_SIZE + pSchema->columns[j].offset); void *value = tdGetMemRowDataOfCol(row, (int8_t)pTCol->type, TD_MEM_ROW_HEAD_SIZE + pSchema->columns[j].offset);
if (isNull(value, pTCol->type)) { if (isNull(value, pTCol->type)) {
continue; continue;
} }
...@@ -1027,11 +1028,11 @@ static void updateTableLatestColumn(STsdbRepo *pRepo, STable *pTable, SDataRow r ...@@ -1027,11 +1028,11 @@ static void updateTableLatestColumn(STsdbRepo *pRepo, STable *pTable, SDataRow r
memcpy(pDataCol->pData, value, pDataCol->bytes); memcpy(pDataCol->pData, value, pDataCol->bytes);
//tsdbInfo("updateTableLatestColumn vgId:%d cache column %d for %d,%s", REPO_ID(pRepo), j, pDataCol->bytes, (char*)pDataCol->pData); //tsdbInfo("updateTableLatestColumn vgId:%d cache column %d for %d,%s", REPO_ID(pRepo), j, pDataCol->bytes, (char*)pDataCol->pData);
pDataCol->ts = dataRowKey(row); pDataCol->ts = memRowKey(row);
} }
} }
static int tsdbUpdateTableLatestInfo(STsdbRepo *pRepo, STable *pTable, SDataRow row) { static int tsdbUpdateTableLatestInfo(STsdbRepo *pRepo, STable *pTable, SMemRow row) {
STsdbCfg *pCfg = &pRepo->config; STsdbCfg *pCfg = &pRepo->config;
// if cacheLastRow config has been reset, free the lastRow // if cacheLastRow config has been reset, free the lastRow
...@@ -1042,31 +1043,31 @@ static int tsdbUpdateTableLatestInfo(STsdbRepo *pRepo, STable *pTable, SDataRow ...@@ -1042,31 +1043,31 @@ static int tsdbUpdateTableLatestInfo(STsdbRepo *pRepo, STable *pTable, SDataRow
TSDB_WUNLOCK_TABLE(pTable); TSDB_WUNLOCK_TABLE(pTable);
} }
if (tsdbGetTableLastKeyImpl(pTable) < dataRowKey(row)) { if (tsdbGetTableLastKeyImpl(pTable) < memRowKey(row)) {
if (CACHE_LAST_ROW(pCfg) || pTable->lastRow != NULL) { if (CACHE_LAST_ROW(pCfg) || pTable->lastRow != NULL) {
SDataRow nrow = pTable->lastRow; SMemRow nrow = pTable->lastRow;
if (taosTSizeof(nrow) < dataRowLen(row)) { if (taosTSizeof(nrow) < memRowTLen(row)) {
SDataRow orow = nrow; SMemRow orow = nrow;
nrow = taosTMalloc(dataRowLen(row)); nrow = taosTMalloc(memRowTLen(row));
if (nrow == NULL) { if (nrow == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
return -1; return -1;
} }
dataRowCpy(nrow, row); memRowCpy(nrow, row);
TSDB_WLOCK_TABLE(pTable); TSDB_WLOCK_TABLE(pTable);
pTable->lastKey = dataRowKey(row); pTable->lastKey = memRowKey(row);
pTable->lastRow = nrow; pTable->lastRow = nrow;
TSDB_WUNLOCK_TABLE(pTable); TSDB_WUNLOCK_TABLE(pTable);
taosTZfree(orow); taosTZfree(orow);
} else { } else {
TSDB_WLOCK_TABLE(pTable); TSDB_WLOCK_TABLE(pTable);
pTable->lastKey = dataRowKey(row); pTable->lastKey = memRowKey(row);
dataRowCpy(nrow, row); memRowCpy(nrow, row);
TSDB_WUNLOCK_TABLE(pTable); TSDB_WUNLOCK_TABLE(pTable);
} }
} else { } else {
pTable->lastKey = dataRowKey(row); pTable->lastKey = memRowKey(row);
} }
if (CACHE_LAST_NULL_COLUMN(pCfg)) { if (CACHE_LAST_NULL_COLUMN(pCfg)) {
......
...@@ -139,7 +139,7 @@ typedef struct STableGroupSupporter { ...@@ -139,7 +139,7 @@ typedef struct STableGroupSupporter {
static STimeWindow updateLastrowForEachGroup(STableGroupInfo *groupList); static STimeWindow updateLastrowForEachGroup(STableGroupInfo *groupList);
static int32_t checkForCachedLastRow(STsdbQueryHandle* pQueryHandle, STableGroupInfo *groupList); static int32_t checkForCachedLastRow(STsdbQueryHandle* pQueryHandle, STableGroupInfo *groupList);
static int32_t checkForCachedLast(STsdbQueryHandle* pQueryHandle); static int32_t checkForCachedLast(STsdbQueryHandle* pQueryHandle);
static int32_t tsdbGetCachedLastRow(STable* pTable, SDataRow* pRes, TSKEY* lastKey); static int32_t tsdbGetCachedLastRow(STable* pTable, SMemRow* pRes, TSKEY* lastKey);
static void changeQueryHandleForInterpQuery(TsdbQueryHandleT pHandle); static void changeQueryHandleForInterpQuery(TsdbQueryHandleT pHandle);
static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo* pCheckInfo, SBlock* pBlock); static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo* pCheckInfo, SBlock* pBlock);
...@@ -669,8 +669,8 @@ static bool initTableMemIterator(STsdbQueryHandle* pHandle, STableCheckInfo* pCh ...@@ -669,8 +669,8 @@ static bool initTableMemIterator(STsdbQueryHandle* pHandle, STableCheckInfo* pCh
SSkipListNode* node = tSkipListIterGet(pCheckInfo->iter); SSkipListNode* node = tSkipListIterGet(pCheckInfo->iter);
assert(node != NULL); assert(node != NULL);
SDataRow row = (SDataRow)SL_GET_NODE_DATA(node); SMemRow row = (SMemRow)SL_GET_NODE_DATA(node);
TSKEY key = dataRowKey(row); // first timestamp in buffer TSKEY key = memRowKey(row); // first timestamp in buffer
tsdbDebug("%p uid:%" PRId64 ", tid:%d check data in mem from skey:%" PRId64 ", order:%d, ts range in buf:%" PRId64 tsdbDebug("%p uid:%" PRId64 ", tid:%d check data in mem from skey:%" PRId64 ", order:%d, ts range in buf:%" PRId64
"-%" PRId64 ", lastKey:%" PRId64 ", numOfRows:%"PRId64", 0x%"PRIx64, "-%" PRId64 ", lastKey:%" PRId64 ", numOfRows:%"PRId64", 0x%"PRIx64,
pHandle, pCheckInfo->tableId.uid, pCheckInfo->tableId.tid, key, order, pMem->keyFirst, pMem->keyLast, pHandle, pCheckInfo->tableId.uid, pCheckInfo->tableId.tid, key, order, pMem->keyFirst, pMem->keyLast,
...@@ -691,8 +691,8 @@ static bool initTableMemIterator(STsdbQueryHandle* pHandle, STableCheckInfo* pCh ...@@ -691,8 +691,8 @@ static bool initTableMemIterator(STsdbQueryHandle* pHandle, STableCheckInfo* pCh
SSkipListNode* node = tSkipListIterGet(pCheckInfo->iiter); SSkipListNode* node = tSkipListIterGet(pCheckInfo->iiter);
assert(node != NULL); assert(node != NULL);
SDataRow row = (SDataRow)SL_GET_NODE_DATA(node); SMemRow row = (SMemRow)SL_GET_NODE_DATA(node);
TSKEY key = dataRowKey(row); // first timestamp in buffer TSKEY key = memRowKey(row); // first timestamp in buffer
tsdbDebug("%p uid:%" PRId64 ", tid:%d check data in imem from skey:%" PRId64 ", order:%d, ts range in buf:%" PRId64 tsdbDebug("%p uid:%" PRId64 ", tid:%d check data in imem from skey:%" PRId64 ", order:%d, ts range in buf:%" PRId64
"-%" PRId64 ", lastKey:%" PRId64 ", numOfRows:%"PRId64", 0x%"PRIx64, "-%" PRId64 ", lastKey:%" PRId64 ", numOfRows:%"PRId64", 0x%"PRIx64,
pHandle, pCheckInfo->tableId.uid, pCheckInfo->tableId.tid, key, order, pIMem->keyFirst, pIMem->keyLast, pHandle, pCheckInfo->tableId.uid, pCheckInfo->tableId.tid, key, order, pIMem->keyFirst, pIMem->keyLast,
...@@ -716,19 +716,19 @@ static void destroyTableMemIterator(STableCheckInfo* pCheckInfo) { ...@@ -716,19 +716,19 @@ static void destroyTableMemIterator(STableCheckInfo* pCheckInfo) {
tSkipListDestroyIter(pCheckInfo->iiter); tSkipListDestroyIter(pCheckInfo->iiter);
} }
static SDataRow getSDataRowInTableMem(STableCheckInfo* pCheckInfo, int32_t order, int32_t update) { static SMemRow getSDataRowInTableMem(STableCheckInfo* pCheckInfo, int32_t order, int32_t update) {
SDataRow rmem = NULL, rimem = NULL; SMemRow rmem = NULL, rimem = NULL;
if (pCheckInfo->iter) { if (pCheckInfo->iter) {
SSkipListNode* node = tSkipListIterGet(pCheckInfo->iter); SSkipListNode* node = tSkipListIterGet(pCheckInfo->iter);
if (node != NULL) { if (node != NULL) {
rmem = (SDataRow)SL_GET_NODE_DATA(node); rmem = (SMemRow)SL_GET_NODE_DATA(node);
} }
} }
if (pCheckInfo->iiter) { if (pCheckInfo->iiter) {
SSkipListNode* node = tSkipListIterGet(pCheckInfo->iiter); SSkipListNode* node = tSkipListIterGet(pCheckInfo->iiter);
if (node != NULL) { if (node != NULL) {
rimem = (SDataRow)SL_GET_NODE_DATA(node); rimem = (SMemRow)SL_GET_NODE_DATA(node);
} }
} }
...@@ -746,8 +746,8 @@ static SDataRow getSDataRowInTableMem(STableCheckInfo* pCheckInfo, int32_t order ...@@ -746,8 +746,8 @@ static SDataRow getSDataRowInTableMem(STableCheckInfo* pCheckInfo, int32_t order
return rimem; return rimem;
} }
TSKEY r1 = dataRowKey(rmem); TSKEY r1 = memRowKey(rmem);
TSKEY r2 = dataRowKey(rimem); TSKEY r2 = memRowKey(rimem);
if (r1 == r2) { // data ts are duplicated, ignore the data in mem if (r1 == r2) { // data ts are duplicated, ignore the data in mem
if (!update) { if (!update) {
...@@ -826,12 +826,12 @@ static bool hasMoreDataInCache(STsdbQueryHandle* pHandle) { ...@@ -826,12 +826,12 @@ static bool hasMoreDataInCache(STsdbQueryHandle* pHandle) {
initTableMemIterator(pHandle, pCheckInfo); initTableMemIterator(pHandle, pCheckInfo);
} }
SDataRow row = getSDataRowInTableMem(pCheckInfo, pHandle->order, pCfg->update); SMemRow row = getSDataRowInTableMem(pCheckInfo, pHandle->order, pCfg->update);
if (row == NULL) { if (row == NULL) {
return false; return false;
} }
pCheckInfo->lastKey = dataRowKey(row); // first timestamp in buffer pCheckInfo->lastKey = memRowKey(row); // first timestamp in buffer
tsdbDebug("%p uid:%" PRId64", tid:%d check data in buffer from skey:%" PRId64 ", order:%d, 0x%"PRIx64, pHandle, tsdbDebug("%p uid:%" PRId64", tid:%d check data in buffer from skey:%" PRId64 ", order:%d, 0x%"PRIx64, pHandle,
pCheckInfo->tableId.uid, pCheckInfo->tableId.tid, pCheckInfo->lastKey, pHandle->order, pHandle->qId); pCheckInfo->tableId.uid, pCheckInfo->tableId.tid, pCheckInfo->lastKey, pHandle->order, pHandle->qId);
...@@ -1082,11 +1082,11 @@ static int32_t handleDataMergeIfNeeded(STsdbQueryHandle* pQueryHandle, SBlock* p ...@@ -1082,11 +1082,11 @@ static int32_t handleDataMergeIfNeeded(STsdbQueryHandle* pQueryHandle, SBlock* p
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
/*bool hasData = */ initTableMemIterator(pQueryHandle, pCheckInfo); /*bool hasData = */ initTableMemIterator(pQueryHandle, pCheckInfo);
SDataRow row = getSDataRowInTableMem(pCheckInfo, pQueryHandle->order, pCfg->update); SMemRow row = getSDataRowInTableMem(pCheckInfo, pQueryHandle->order, pCfg->update);
assert(cur->pos >= 0 && cur->pos <= binfo.rows); assert(cur->pos >= 0 && cur->pos <= binfo.rows);
TSKEY key = (row != NULL)? dataRowKey(row):TSKEY_INITIAL_VAL; TSKEY key = (row != NULL) ? memRowKey(row) : TSKEY_INITIAL_VAL;
if (key != TSKEY_INITIAL_VAL) { if (key != TSKEY_INITIAL_VAL) {
tsdbDebug("%p key in mem:%"PRId64", 0x%"PRIx64, pQueryHandle, key, pQueryHandle->qId); tsdbDebug("%p key in mem:%"PRId64", 0x%"PRIx64, pQueryHandle, key, pQueryHandle->qId);
} else { } else {
...@@ -1327,7 +1327,7 @@ int32_t doCopyRowsFromFileBlock(STsdbQueryHandle* pQueryHandle, int32_t capacity ...@@ -1327,7 +1327,7 @@ int32_t doCopyRowsFromFileBlock(STsdbQueryHandle* pQueryHandle, int32_t capacity
// todo refactor, only copy one-by-one // todo refactor, only copy one-by-one
for (int32_t k = start; k < num + start; ++k) { for (int32_t k = start; k < num + start; ++k) {
char* p = tdGetColDataOfRow(src, k); const char* p = tdGetColDataOfRow(src, k);
memcpy(dst, p, varDataTLen(p)); memcpy(dst, p, varDataTLen(p));
dst += bytes; dst += bytes;
} }
...@@ -1378,14 +1378,14 @@ int32_t doCopyRowsFromFileBlock(STsdbQueryHandle* pQueryHandle, int32_t capacity ...@@ -1378,14 +1378,14 @@ int32_t doCopyRowsFromFileBlock(STsdbQueryHandle* pQueryHandle, int32_t capacity
return numOfRows + num; return numOfRows + num;
} }
static void copyOneRowFromMem(STsdbQueryHandle* pQueryHandle, int32_t capacity, int32_t numOfRows, SDataRow row, static void copyOneRowFromMem(STsdbQueryHandle* pQueryHandle, int32_t capacity, int32_t numOfRows, SMemRow row,
int32_t numOfCols, STable* pTable, STSchema* pSchema) { int32_t numOfCols, STable* pTable, STSchema* pSchema) {
char* pData = NULL; char* pData = NULL;
// the schema version info is embeded in SDataRow // the schema version info is embeded in SDataRow
int32_t numOfRowCols = 0; int32_t numOfRowCols = 0;
if (pSchema == NULL) { if (pSchema == NULL) {
pSchema = tsdbGetTableSchemaByVersion(pTable, dataRowVersion(row)); pSchema = tsdbGetTableSchemaByVersion(pTable, memRowVersion(row));
numOfRowCols = schemaNCols(pSchema); numOfRowCols = schemaNCols(pSchema);
} else { } else {
numOfRowCols = schemaNCols(pSchema); numOfRowCols = schemaNCols(pSchema);
...@@ -1406,7 +1406,8 @@ static void copyOneRowFromMem(STsdbQueryHandle* pQueryHandle, int32_t capacity, ...@@ -1406,7 +1406,8 @@ static void copyOneRowFromMem(STsdbQueryHandle* pQueryHandle, int32_t capacity,
} }
if (pSchema->columns[j].colId == pColInfo->info.colId) { if (pSchema->columns[j].colId == pColInfo->info.colId) {
void* value = tdGetRowDataOfCol(row, (int8_t)pColInfo->info.type, TD_DATA_ROW_HEAD_SIZE + pSchema->columns[j].offset); void* value =
tdGetMemRowDataOfCol(row, (int8_t)pColInfo->info.type, TD_MEM_ROW_HEAD_SIZE + pSchema->columns[j].offset);
switch (pColInfo->info.type) { switch (pColInfo->info.type) {
case TSDB_DATA_TYPE_BINARY: case TSDB_DATA_TYPE_BINARY:
case TSDB_DATA_TYPE_NCHAR: case TSDB_DATA_TYPE_NCHAR:
...@@ -1656,12 +1657,12 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo* ...@@ -1656,12 +1657,12 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo*
} else if (pCheckInfo->iter != NULL || pCheckInfo->iiter != NULL) { } else if (pCheckInfo->iter != NULL || pCheckInfo->iiter != NULL) {
SSkipListNode* node = NULL; SSkipListNode* node = NULL;
do { do {
SDataRow row = getSDataRowInTableMem(pCheckInfo, pQueryHandle->order, pCfg->update); SMemRow row = getSDataRowInTableMem(pCheckInfo, pQueryHandle->order, pCfg->update);
if (row == NULL) { if (row == NULL) {
break; break;
} }
TSKEY key = dataRowKey(row); TSKEY key = memRowKey(row);
if ((key > pQueryHandle->window.ekey && ASCENDING_TRAVERSE(pQueryHandle->order)) || if ((key > pQueryHandle->window.ekey && ASCENDING_TRAVERSE(pQueryHandle->order)) ||
(key < pQueryHandle->window.ekey && !ASCENDING_TRAVERSE(pQueryHandle->order))) { (key < pQueryHandle->window.ekey && !ASCENDING_TRAVERSE(pQueryHandle->order))) {
break; break;
...@@ -1674,11 +1675,11 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo* ...@@ -1674,11 +1675,11 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo*
if ((key < tsArray[pos] && ASCENDING_TRAVERSE(pQueryHandle->order)) || if ((key < tsArray[pos] && ASCENDING_TRAVERSE(pQueryHandle->order)) ||
(key > tsArray[pos] && !ASCENDING_TRAVERSE(pQueryHandle->order))) { (key > tsArray[pos] && !ASCENDING_TRAVERSE(pQueryHandle->order))) {
if (rv != dataRowVersion(row)) { if (rv != memRowVersion(row)) {
pSchema = tsdbGetTableSchemaByVersion(pTable, dataRowVersion(row)); pSchema = tsdbGetTableSchemaByVersion(pTable, memRowVersion(row));
rv = dataRowVersion(row); rv = memRowVersion(row);
} }
copyOneRowFromMem(pQueryHandle, pQueryHandle->outputCapacity, numOfRows, row, numOfCols, pTable, pSchema); copyOneRowFromMem(pQueryHandle, pQueryHandle->outputCapacity, numOfRows, row, numOfCols, pTable, pSchema);
numOfRows += 1; numOfRows += 1;
if (cur->win.skey == TSKEY_INITIAL_VAL) { if (cur->win.skey == TSKEY_INITIAL_VAL) {
...@@ -1692,11 +1693,11 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo* ...@@ -1692,11 +1693,11 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo*
moveToNextRowInMem(pCheckInfo); moveToNextRowInMem(pCheckInfo);
} else if (key == tsArray[pos]) { // data in buffer has the same timestamp of data in file block, ignore it } else if (key == tsArray[pos]) { // data in buffer has the same timestamp of data in file block, ignore it
if (pCfg->update) { if (pCfg->update) {
if (rv != dataRowVersion(row)) { if (rv != memRowVersion(row)) {
pSchema = tsdbGetTableSchemaByVersion(pTable, dataRowVersion(row)); pSchema = tsdbGetTableSchemaByVersion(pTable, memRowVersion(row));
rv = dataRowVersion(row); rv = memRowVersion(row);
} }
copyOneRowFromMem(pQueryHandle, pQueryHandle->outputCapacity, numOfRows, row, numOfCols, pTable, pSchema); copyOneRowFromMem(pQueryHandle, pQueryHandle->outputCapacity, numOfRows, row, numOfCols, pTable, pSchema);
numOfRows += 1; numOfRows += 1;
if (cur->win.skey == TSKEY_INITIAL_VAL) { if (cur->win.skey == TSKEY_INITIAL_VAL) {
...@@ -1746,8 +1747,10 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo* ...@@ -1746,8 +1747,10 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo*
* copy them all to result buffer, since it may be overlapped with file data block. * copy them all to result buffer, since it may be overlapped with file data block.
*/ */
if (node == NULL || if (node == NULL ||
((dataRowKey((SDataRow)SL_GET_NODE_DATA(node)) > pQueryHandle->window.ekey) && ASCENDING_TRAVERSE(pQueryHandle->order)) || ((memRowKey((SMemRow)SL_GET_NODE_DATA(node)) > pQueryHandle->window.ekey) &&
((dataRowKey((SDataRow)SL_GET_NODE_DATA(node)) < pQueryHandle->window.ekey) && !ASCENDING_TRAVERSE(pQueryHandle->order))) { ASCENDING_TRAVERSE(pQueryHandle->order)) ||
((memRowKey((SMemRow)SL_GET_NODE_DATA(node)) < pQueryHandle->window.ekey) &&
!ASCENDING_TRAVERSE(pQueryHandle->order))) {
// no data in cache or data in cache is greater than the ekey of time window, load data from file block // no data in cache or data in cache is greater than the ekey of time window, load data from file block
if (cur->win.skey == TSKEY_INITIAL_VAL) { if (cur->win.skey == TSKEY_INITIAL_VAL) {
cur->win.skey = tsArray[pos]; cur->win.skey = tsArray[pos];
...@@ -2333,12 +2336,12 @@ static int tsdbReadRowsFromCache(STableCheckInfo* pCheckInfo, TSKEY maxKey, int ...@@ -2333,12 +2336,12 @@ static int tsdbReadRowsFromCache(STableCheckInfo* pCheckInfo, TSKEY maxKey, int
STSchema* pSchema = NULL; STSchema* pSchema = NULL;
do { do {
SDataRow row = getSDataRowInTableMem(pCheckInfo, pQueryHandle->order, pCfg->update); SMemRow row = getSDataRowInTableMem(pCheckInfo, pQueryHandle->order, pCfg->update);
if (row == NULL) { if (row == NULL) {
break; break;
} }
TSKEY key = dataRowKey(row); TSKEY key = memRowKey(row);
if ((key > maxKey && ASCENDING_TRAVERSE(pQueryHandle->order)) || (key < maxKey && !ASCENDING_TRAVERSE(pQueryHandle->order))) { if ((key > maxKey && ASCENDING_TRAVERSE(pQueryHandle->order)) || (key < maxKey && !ASCENDING_TRAVERSE(pQueryHandle->order))) {
tsdbDebug("%p key:%"PRIu64" beyond qrange:%"PRId64" - %"PRId64", no more data in buffer", pQueryHandle, key, pQueryHandle->window.skey, tsdbDebug("%p key:%"PRIu64" beyond qrange:%"PRId64" - %"PRId64", no more data in buffer", pQueryHandle, key, pQueryHandle->window.skey,
pQueryHandle->window.ekey); pQueryHandle->window.ekey);
...@@ -2351,9 +2354,9 @@ static int tsdbReadRowsFromCache(STableCheckInfo* pCheckInfo, TSKEY maxKey, int ...@@ -2351,9 +2354,9 @@ static int tsdbReadRowsFromCache(STableCheckInfo* pCheckInfo, TSKEY maxKey, int
} }
win->ekey = key; win->ekey = key;
if (rv != dataRowVersion(row)) { if (rv != memRowVersion(row)) {
pSchema = tsdbGetTableSchemaByVersion(pTable, dataRowVersion(row)); pSchema = tsdbGetTableSchemaByVersion(pTable, memRowVersion(row));
rv = dataRowVersion(row); rv = memRowVersion(row);
} }
copyOneRowFromMem(pQueryHandle, maxRowsToRead, numOfRows, row, numOfCols, pTable, pSchema); copyOneRowFromMem(pQueryHandle, maxRowsToRead, numOfRows, row, numOfCols, pTable, pSchema);
...@@ -2470,7 +2473,7 @@ static bool loadCachedLastRow(STsdbQueryHandle* pQueryHandle) { ...@@ -2470,7 +2473,7 @@ static bool loadCachedLastRow(STsdbQueryHandle* pQueryHandle) {
SQueryFilePos* cur = &pQueryHandle->cur; SQueryFilePos* cur = &pQueryHandle->cur;
SDataRow pRow = NULL; SMemRow pRow = NULL;
TSKEY key = TSKEY_INITIAL_VAL; TSKEY key = TSKEY_INITIAL_VAL;
int32_t step = ASCENDING_TRAVERSE(pQueryHandle->order)? 1:-1; int32_t step = ASCENDING_TRAVERSE(pQueryHandle->order)? 1:-1;
...@@ -2873,7 +2876,7 @@ bool tsdbGetExternalRow(TsdbQueryHandleT pHandle) { ...@@ -2873,7 +2876,7 @@ bool tsdbGetExternalRow(TsdbQueryHandleT pHandle) {
* if lastRow == NULL, return TSDB_CODE_TDB_NO_CACHE_LAST_ROW * if lastRow == NULL, return TSDB_CODE_TDB_NO_CACHE_LAST_ROW
* else set pRes and return TSDB_CODE_SUCCESS and save lastKey * else set pRes and return TSDB_CODE_SUCCESS and save lastKey
*/ */
int32_t tsdbGetCachedLastRow(STable* pTable, SDataRow* pRes, TSKEY* lastKey) { int32_t tsdbGetCachedLastRow(STable* pTable, SMemRow* pRes, TSKEY* lastKey) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
TSDB_RLOCK_TABLE(pTable); TSDB_RLOCK_TABLE(pTable);
...@@ -2884,7 +2887,7 @@ int32_t tsdbGetCachedLastRow(STable* pTable, SDataRow* pRes, TSKEY* lastKey) { ...@@ -2884,7 +2887,7 @@ int32_t tsdbGetCachedLastRow(STable* pTable, SDataRow* pRes, TSKEY* lastKey) {
} }
if (pRes) { if (pRes) {
*pRes = tdDataRowDup(pTable->lastRow); *pRes = tdMemRowDup(pTable->lastRow);
if (*pRes == NULL) { if (*pRes == NULL) {
code = TSDB_CODE_TDB_OUT_OF_MEMORY; code = TSDB_CODE_TDB_OUT_OF_MEMORY;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册