提交 df6166ea 编写于 作者: C Cary Xu

iterator/SDataCol adaption

上级 33133d26
......@@ -56,7 +56,7 @@ extern "C" {
// ----------------- TSDB COLUMN DEFINITION
typedef struct {
int8_t type; // Column type
col_id_t colId; // column ID
col_id_t colId; // column ID(start from PRIMARYKEY_TIMESTAMP_COL_ID(1))
int16_t bytes; // column bytes (restore to int16_t in case of misuse)
uint16_t offset; // point offset in SDataRow after the header part.
} STColumn;
......@@ -125,6 +125,8 @@ typedef struct {
#define TD_VTYPE_PARTS 4 // 8 bits / TD_VTYPE_BITS = 4
#define TD_VTYPE_OPTR 3 // TD_VTYPE_PARTS - 1, utilize to get remainder
#define TD_BITMAP_BYTES(cnt) (ceil((double)cnt / TD_VTYPE_PARTS))
int tdInitTSchemaBuilder(STSchemaBuilder *pBuilder, int32_t version);
void tdDestroyTSchemaBuilder(STSchemaBuilder *pBuilder);
void tdResetTSchemaBuilder(STSchemaBuilder *pBuilder, int32_t version);
......
......@@ -86,10 +86,12 @@ typedef struct {
// TODO
} STpRow; // tuple
#pragma pack(push, 1)
typedef struct {
col_id_t cid;
uint32_t offset;
} SKvRowIdx;
#pragma pack(pop)
typedef struct {
uint16_t ncols;
......@@ -154,12 +156,12 @@ typedef struct {
#define TD_ROW_OFFSET(p) ((p)->toffset);
static FORCE_INLINE int32_t tdRowSetValType(void *pBitmap, int16_t tIdx, TDRowValT valType);
static FORCE_INLINE int32_t tdRowGetValType(void *pBitmap, int16_t tIdx, TDRowValT *pValType);
static FORCE_INLINE int32_t tdRowSetValType(void *pBitmap, int16_t colIdx, TDRowValT valType);
static FORCE_INLINE int32_t tdRowGetValType(void *pBitmap, int16_t colIdx, TDRowValT *pValType);
static FORCE_INLINE int32_t tdAppendColValToTpRow(STSRow *row, void *pBitmap, const void *val, bool isCopyVarData,
int8_t colType, TDRowValT valType, int16_t idx, int32_t offset);
int8_t colType, TDRowValT valType, int16_t colIdx, int32_t offset);
static FORCE_INLINE int32_t tdAppendColValToKvRow(STSRow *row, void *pBitmap, const void *val, bool isCopyValData,
int8_t colType, TDRowValT valType, int16_t idx, int32_t offset,
int8_t colType, TDRowValT valType, int16_t colIdx, int32_t offset,
int16_t colId);
static FORCE_INLINE void *tdGetBitmapAddr(STSRow *pRow, uint8_t rowType, uint32_t flen, int16_t nBoundCols) {
......@@ -265,8 +267,8 @@ static FORCE_INLINE int32_t tdSRowSetExtendedInfo(SRowBuilder *pBuilder, int32_t
return TSDB_CODE_INVALID_PARA;
}
#ifdef TD_SUPPORT_BITMAP
pBuilder->nBitmaps = (int16_t)ceil((double)pBuilder->nCols / TD_VTYPE_PARTS);
pBuilder->nBoundBitmaps = (int16_t)ceil((double)pBuilder->nBoundCols / TD_VTYPE_PARTS);
pBuilder->nBitmaps = (int16_t)TD_BITMAP_BYTES(pBuilder->nCols);
pBuilder->nBoundBitmaps = (int16_t)TD_BITMAP_BYTES(pBuilder->nBoundCols);
#else
pBuilder->nBitmaps = 0;
pBuilder->nBoundBitmaps = 0;
......@@ -354,11 +356,11 @@ static FORCE_INLINE void tdSRowReset(SRowBuilder *pBuilder) {
* @param val
* @param valType
* @param offset
* @param idx sorted column index, start from 0
* @param colIdx sorted column index, start from 0
* @return FORCE_INLINE
*/
static FORCE_INLINE int32_t tdAppendColValToRow(SRowBuilder *pBuilder, int16_t colId, int8_t colType, const void *val,
TDRowValT valType, int32_t offset, int16_t idx) {
TDRowValT valType, int32_t offset, int16_t colIdx) {
STSRow *pRow = pBuilder->pBuf;
void * pBitmap = NULL;
if (!val) {
......@@ -377,7 +379,7 @@ static FORCE_INLINE int32_t tdAppendColValToRow(SRowBuilder *pBuilder, int16_t c
TD_ROW_TSKEY(pRow) = *(TSKEY *)val;
#ifdef TD_SUPPORT_BITMAP
pBitmap = tdGetBitmapAddr(pRow, pRow->type, pBuilder->flen, pRow->ncols);
if (tdRowSetValType(pBitmap, idx, valType) != TSDB_CODE_SUCCESS) {
if (tdRowSetValType(pBitmap, colIdx, valType) != TSDB_CODE_SUCCESS) {
return terrno;
}
#endif
......@@ -385,40 +387,41 @@ static FORCE_INLINE int32_t tdAppendColValToRow(SRowBuilder *pBuilder, int16_t c
}
// TODO: We can avoid the type judegement by FP, but would prevent the inline scheme.
// typedef int (*tdAppendColValToSRowFp)(STSRow *pRow, void *pBitmap, int16_t colId, int8_t colType,
// const void *val, int8_t valType, int32_t tOffset, int16_t tIdx);
// const void *val, int8_t valType, int32_t tOffset, int16_t
// colIdx);
if (TD_IS_TP_ROW(pRow)) {
tdAppendColValToTpRow(pRow, pBitmap, val, true, colType, valType, idx, offset);
tdAppendColValToTpRow(pRow, pBitmap, val, true, colType, valType, colIdx, offset);
} else {
tdAppendColValToKvRow(pRow, pBitmap, val, true, colType, valType, idx, offset, colId);
tdAppendColValToKvRow(pRow, pBitmap, val, true, colType, valType, colIdx, offset, colId);
}
return TSDB_CODE_SUCCESS;
}
static FORCE_INLINE int32_t tdAppendColValToTpRow(STSRow *row, void *pBitmap, const void *val, bool isCopyVarData,
int8_t colType, TDRowValT valType, int16_t idx, int32_t offset) {
ASSERT(offset >= sizeof(TSDB_DATA_TYPE_TIMESTAMP));
int8_t colType, TDRowValT valType, int16_t colIdx, int32_t offset) {
ASSERT(offset >= sizeof(TSKEY));
if (!tdValIsNone(valType)) {
if (IS_VAR_DATA_TYPE(colType)) {
// ts key stored in STSRow.ts
*(VarDataOffsetT *)POINTER_SHIFT(row->data, offset - sizeof(TSDB_DATA_TYPE_TIMESTAMP)) = TD_ROW_LEN(row);
*(VarDataOffsetT *)POINTER_SHIFT(row->data, offset - sizeof(TSKEY)) = TD_ROW_LEN(row);
if (isCopyVarData) {
memcpy(POINTER_SHIFT(row, TD_ROW_LEN(row)), val, varDataTLen(val));
}
TD_ROW_LEN(row) += varDataTLen(val);
} else {
memcpy(POINTER_SHIFT(row->data, offset - sizeof(TSDB_DATA_TYPE_TIMESTAMP)), val, TYPE_BYTES[colType]);
memcpy(POINTER_SHIFT(row->data, offset - sizeof(TSKEY)), val, TYPE_BYTES[colType]);
}
}
#ifdef TD_SUPPORT_BITMAP
tdRowSetValType(pBitmap, idx, valType);
tdRowSetValType(pBitmap, colIdx, valType);
#endif
return 0;
}
static FORCE_INLINE int32_t tdAppendColValToKvRow(STSRow *row, void *pBitmap, const void *val, bool isCopyValData,
int8_t colType, TDRowValT valType, int16_t idx, int32_t offset,
int8_t colType, TDRowValT valType, int16_t colIdx, int32_t offset,
int16_t colId) {
ASSERT(offset >= sizeof(SKvRowIdx));
......@@ -441,19 +444,19 @@ static FORCE_INLINE int32_t tdAppendColValToKvRow(STSRow *row, void *pBitmap, co
}
#ifdef TD_SUPPORT_BITMAP
tdRowSetValType(pBitmap, idx, valType);
tdRowSetValType(pBitmap, colIdx, valType);
#endif
return 0;
}
static FORCE_INLINE int32_t tdRowSetValType(void *pBitmap, int16_t tIdx, TDRowValT valType) {
static FORCE_INLINE int32_t tdRowSetValType(void *pBitmap, int16_t colIdx, TDRowValT valType) {
if (!pBitmap) {
terrno = TSDB_CODE_INVALID_PTR;
return terrno;
}
int16_t nBytes = tIdx / TD_VTYPE_PARTS;
int16_t nOffset = tIdx & TD_VTYPE_OPTR;
int16_t nBytes = colIdx / TD_VTYPE_PARTS;
int16_t nOffset = colIdx & TD_VTYPE_OPTR;
char * pDestByte = (char *)POINTER_SHIFT(pBitmap, nBytes);
switch (nOffset) {
case 0:
......@@ -475,13 +478,13 @@ static FORCE_INLINE int32_t tdRowSetValType(void *pBitmap, int16_t tIdx, TDRowVa
return TSDB_CODE_SUCCESS;
}
static FORCE_INLINE int32_t tdRowGetValType(void *pBitmap, int16_t tIdx, TDRowValT *pValType) {
if (!pBitmap) {
static FORCE_INLINE int32_t tdRowGetValType(void *pBitmap, int16_t colIdx, TDRowValT *pValType) {
if (!pBitmap || colIdx < 0) {
terrno = TSDB_CODE_INVALID_PTR;
return terrno;
}
int16_t nBytes = tIdx / TD_VTYPE_PARTS;
int16_t nOffset = tIdx & TD_VTYPE_OPTR;
int16_t nBytes = colIdx / TD_VTYPE_PARTS;
int16_t nOffset = colIdx & TD_VTYPE_OPTR;
char * pDestByte = (char *)POINTER_SHIFT(pBitmap, nBytes);
switch (nOffset) {
case 0:
......@@ -503,83 +506,78 @@ static FORCE_INLINE int32_t tdRowGetValType(void *pBitmap, int16_t tIdx, TDRowVa
return TSDB_CODE_SUCCESS;
}
static FORCE_INLINE int32_t tdGetTpRowDataOfCol(SCellVal *output, STSRow *row, void *pBitmap, int8_t colType,
int32_t offset, int16_t idx) {
static FORCE_INLINE int32_t tdGetTpRowValOfCol(SCellVal *output, STSRow *row, void *pBitmap, int8_t colType,
int32_t offset, int16_t colIdx) {
#ifdef TD_SUPPORT_BITMAP
TDRowValT valType;
if (tdRowGetValType(pBitmap, idx, &valType) != TSDB_CODE_SUCCESS) {
if (tdRowGetValType(pBitmap, colIdx, &output->valType) != TSDB_CODE_SUCCESS) {
output->valType = TD_VTYPE_NONE;
return terrno;
}
output->valType = valType;
if (tdValIsNorm(valType, NULL, -1)) {
if (tdValTypeIsNorm(output->valType)) {
if (IS_VAR_DATA_TYPE(colType)) {
output->val =
POINTER_SHIFT(row, *(VarDataOffsetT *)POINTER_SHIFT(row->data, offset - sizeof(TSDB_DATA_TYPE_TIMESTAMP)));
POINTER_SHIFT(row, *(VarDataOffsetT *)POINTER_SHIFT(row->data, offset - sizeof(TSKEY)));
} else {
output->val = POINTER_SHIFT(row->data, offset - sizeof(TSDB_DATA_TYPE_TIMESTAMP));
output->val = POINTER_SHIFT(row->data, offset - sizeof(TSKEY));
}
}
#else
if (IS_VAR_DATA_TYPE(colType)) {
output->val =
POINTER_SHIFT(row, *(VarDataOffsetT *)POINTER_SHIFT(row->data, offset - sizeof(TSDB_DATA_TYPE_TIMESTAMP)));
POINTER_SHIFT(row, *(VarDataOffsetT *)POINTER_SHIFT(row->data, offset - sizeof(TSKEY)));
} else {
output->val = POINTER_SHIFT(row->data, offset - sizeof(TSDB_DATA_TYPE_TIMESTAMP));
output->val = POINTER_SHIFT(row->data, offset - sizeof(TSKEY));
}
output->valType = isNull(output->val, colType) ? TD_VTYPE_NULL : TD_VTYPE_NORM;
#endif
return TSDB_CODE_SUCCESS;
}
static FORCE_INLINE int compareKvRowColId(const void *key1, const void *key2) {
if (*(int16_t *)key1 > ((SColIdx *)key2)->colId) {
return 1;
} else if (*(int16_t *)key1 < ((SColIdx *)key2)->colId) {
return -1;
} else {
return 0;
}
}
static FORCE_INLINE int32_t tdGetKvRowValOfCol(SKVRow row, int16_t colId) {
// void *ret = taosbsearch(&colId, kvRowColIdx(row), kvRowNCols(row), sizeof(SColIdx), comparTagId, TD_EQ);
// if (ret == NULL) return NULL;
// return kvRowColVal(row, (SColIdx *)ret);
static FORCE_INLINE int32_t tdGetKvRowValOfCol(SCellVal *output, STSRow *row, void *pBitmap, col_type_t colType,
int32_t offset, int16_t colIdx) {
#ifdef TD_SUPPORT_BITMAP
if (tdRowGetValType(pBitmap, colIdx, &output->valType) != TSDB_CODE_SUCCESS) {
output->valType = TD_VTYPE_NONE;
return terrno;
}
if (tdValTypeIsNorm(output->valType)) {
if (offset < 0) {
terrno = TSDB_CODE_INVALID_PARA;
output->valType = TD_VTYPE_NONE;
return terrno;
}
output->val = POINTER_SHIFT(row, offset);
}
#else
if (offset < 0) {
terrno = TSDB_CODE_INVALID_PARA;
output->valType = TD_VTYPE_NONE;
return terrno;
}
output->val = POINTER_SHIFT(row, offset);
output->valType = isNull(output->val, colType) ? TD_VTYPE_NULL : TD_VTYPE_NORM;
#endif
return TSDB_CODE_SUCCESS;
}
static FORCE_INLINE int32_t tdGetSRowDataOfCol(SCellVal *output, STSRow *row, int16_t colId, int8_t colType,
int32_t offset, uint16_t flen, int16_t idx) {
void *pBitmap = tdGetBitmapAddr(row, row->type, flen, row->ncols);
static FORCE_INLINE int32_t tdGetSRowValOfCol(SCellVal *output, STSRow *row, void *pBitmap, int8_t colType,
int32_t offset, uint16_t flen, int16_t colIdx) {
if (TD_IS_TP_ROW(row)) {
return tdGetTpRowDataOfCol(output, row, pBitmap, colType, offset, idx);
return tdGetTpRowValOfCol(output, row, pBitmap, colType, offset, colIdx);
} else {
return tdGetKvRowValOfCol(memRowKvBody(row), colId);
return tdGetKvRowValOfCol(output, row, pBitmap, colType, offset, colIdx);
}
}
// NOTE: offset here including the header size
// static FORCE_INLINE void *tdGetKvRowDataOfCol(void *row, int32_t offset) { return POINTER_SHIFT(row, offset); }
// static FORCE_INLINE void *tdGetKVRowValOfColEx(SKVRow row, int16_t colId, int32_t *nIdx) {
// while (*nIdx < kvRowNCols(row)) {
// SColIdx *pColIdx = kvRowColIdxAt(row, *nIdx);
// if (pColIdx->colId == colId) {
// ++(*nIdx);
// return tdGetKvRowDataOfCol(row, pColIdx->offset);
// } else if (pColIdx->colId > colId) {
// return NULL;
// } else {
// ++(*nIdx);
// }
// }
// return NULL;
// }
/**
* NOTE:
* 1. Applicable to scan columns one by one
* 2. offset here including the header size
*/
// static FORCE_INLINE SCellVal *tdGetRowDataOfColEx(void *row, int16_t colId, int8_t colType, int32_t offset,
// int32_t *kvNIdx) {
// if (isDataRow(row)) {
// return tdGetRowDataOfCol(memRowDataBody(row), colType, offset);
// } else {
// return tdGetKVRowValOfColEx(memRowKvBody(row), colId, kvNIdx);
// }
// }
typedef struct {
STSchema *pSchema;
......@@ -587,12 +585,16 @@ typedef struct {
void * pBitmap;
uint32_t offset;
col_id_t maxColId;
col_id_t colIdx; // [PRIMARYKEY_TIMESTAMP_COL_ID, nCols], PRIMARYKEY_TIMESTAMP_COL_ID equals 1
col_id_t kvIdx;
} STSRowIter;
static FORCE_INLINE void tdSTSRowIterReset(STSRowIter *pIter, STSRow *pRow) {
pIter->pRow = pRow;
pIter->pBitmap = tdGetBitmapAddr(pRow, pRow->type, pIter->pSchema->flen, pRow->ncols);
pIter->offset = 0;
pIter->colIdx = PRIMARYKEY_TIMESTAMP_COL_ID;
pIter->kvIdx = 0;
}
static FORCE_INLINE void tdSTSRowIterInit(STSRowIter *pIter, STSchema *pSchema) {
......@@ -612,16 +614,18 @@ static int tdCompareColId(const void *arg1, const void *arg2) {
return 1;
}
}
/**
* @brief
* @brief STSRow method to get value of specified colId/colType by bsearch
*
* @param pIter
* @param colId
* @param colId Start from PRIMARYKEY_TIMESTAMP_COL_ID(1)
* @param colType
* @param pVal
* @return true Not reach end and pVal is set(None/Null/Norm).
* @return false Reach end of row and pVal not set.
* @return false Reach end and pVal not set.
*/
static FORCE_INLINE bool tdSTSRowIterNext(STSRowIter *pIter, col_id_t colId, SCellVal *pVal) {
static FORCE_INLINE bool tdSTSRowGetVal(STSRowIter *pIter, col_id_t colId, col_type_t colType, SCellVal *pVal) {
if (colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
pVal->val = &pIter->pRow->ts;
#ifdef TD_SUPPORT_BITMAP
......@@ -635,26 +639,118 @@ static FORCE_INLINE bool tdSTSRowIterNext(STSRowIter *pIter, col_id_t colId, SCe
}
STSRow *pRow = pIter->pRow;
int16_t colIdx = -1;
if (TD_IS_TP_ROW(pRow)) {
STSchema *pSchema = pIter->pSchema;
STColumn *pCol =
(STColumn *)taosbsearch(&colId, pSchema->columns, pSchema->numOfCols, sizeof(STColumn), tdCompareColId, TD_EQ);
if (!pCol) {
if (colId >= pIter->maxColId) return false;
pVal->valType = TD_VTYPE_NONE;
if (COL_REACH_END(colId, pIter->maxColId)) return false;
return true;
}
int16_t idx = POINTER_DISTANCE(pCol, pSchema->columns) / sizeof(STColumn);
#ifdef TD_SUPPORT_BITMAP
colIdx = POINTER_DISTANCE(pCol, pSchema->columns) / sizeof(STColumn);
#endif
tdGetTpRowValOfCol(pVal, pRow, pIter->pBitmap, pCol->type, pCol->offset, colIdx);
} else if (TD_IS_KV_ROW(pRow)) {
SKvRowIdx *pIdx =
(SKvRowIdx *)taosbsearch(&colId, pRow->data, pRow->ncols, sizeof(SKvRowIdx), compareKvRowColId, TD_EQ);
#ifdef TD_SUPPORT_BITMAP
if (pIdx) {
colIdx = POINTER_DISTANCE(pRow->data, pIdx) / sizeof(SKvRowIdx);
}
#endif
tdGetKvRowValOfCol(pVal, pRow, pIter->pBitmap, colType, pIdx ? pIdx->offset : -1, colIdx);
} else {
if (COL_REACH_END(colId, pIter->maxColId)) return false;
pVal->valType = TD_VTYPE_NONE;
return false;
}
return true;
}
static FORCE_INLINE bool tdGetTpRowDataOfCol(STSRow *pRow, col_type_t type, int32_t offset, SCellVal *pVal) {
if (IS_VAR_DATA_TYPE(type)) {
pVal->val = POINTER_SHIFT(pRow, *(VarDataOffsetT *)POINTER_SHIFT(pRow->data, offset));
} else {
pVal->val = POINTER_SHIFT(pRow->data, offset);
}
return true;
}
static FORCE_INLINE bool tdGetKvRowValOfColEx(STSRow *pRow, col_id_t colId, col_type_t colType, col_id_t *nIdx, SCellVal *pVal) {
while (*nIdx < pRow->ncols) {
SKvRowIdx *pKvIdx = POINTER_SHIFT(pRow->data , *nIdx * sizeof(SKvRowIdx));
if (pKvIdx->cid == colId) {
++(*nIdx);
pVal->val = POINTER_SHIFT(pRow, pKvIdx->offset);
break;
} else if (pKvIdx->cid > colId) {
return false;
} else {
++(*nIdx);
}
}
return true;
}
/**
* @brief STSRow Iter to get value from colId 1 to maxColId ascendingly
*
* @param pIter
* @param pVal
* @return true Not reach end and pVal is set(None/Null/Norm).
* @return false Reach end of row and pVal not set.
*/
static FORCE_INLINE bool tdSTSRowIterNext(STSRowIter *pIter, col_id_t colId, col_type_t colType, SCellVal *pVal) {
if (colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
pVal->val = &pIter->pRow->ts;
#ifdef TD_SUPPORT_BITMAP
if (tdRowGetValType(pIter->pBitmap, 0, &pVal->valType) != TSDB_CODE_SUCCESS) {
pVal->valType = TD_VTYPE_NONE;
}
#else
pVal->valType = isNull(pVal->val, TSDB_DATA_TYPE_TIMESTAMP) ? TD_VTYPE_NULL : TD_VTYPE_NORM;
#endif
++(pIter->colIdx);
return true;
}
if (TD_IS_TP_ROW(pIter->pRow)) {
STColumn *pCol = NULL;
STSchema *pSchema = pIter->pSchema;
while (pIter->colIdx <= pSchema->numOfCols) {
pCol = &pSchema->columns[pIter->colIdx];
if (colId == pCol->colId) {
++pIter->colIdx;
break;
} else if (colId < pCol->colId) {
++pIter->colIdx;
continue;
} else {
return false;
}
}
return tdGetTpRowDataOfCol(pIter->pRow, pCol->type, pCol->offset - sizeof(TSKEY), pVal);
} else if(TD_IS_KV_ROW(pIter->pRow)){
return tdGetKvRowValOfColEx(pIter->pRow, colId, colType, &pIter->kvIdx, pVal);
} else {
pVal->valType = TD_VTYPE_NONE;
terrno = TSDB_CODE_INVALID_PARA;
if (COL_REACH_END(colId, pIter->maxColId)) return false;
}
return true;
}
static FORCE_INLINE void tdSTSRowIterReset(STSRowIter *pIter, STSRow *pRow) {
pIter->pRow = pRow;
pIter->pBitmap = tdGetBitmapAddr(pRow, pRow->type, pIter->pSchema->flen, pRow->ncols);
pIter->offset = 0;
pIter->colIdx = PRIMARYKEY_TIMESTAMP_COL_ID;
pIter->kvIdx = 0;
}
#ifdef TROW_ORIGIN_HZ
typedef struct {
uint32_t nRows;
......
......@@ -13,6 +13,7 @@ typedef int32_t VarDataOffsetT;
typedef uint32_t TDRowLenT;
typedef uint8_t TDRowValT;
typedef uint16_t col_id_t;
typedef int8_t col_type_t;
typedef struct tstr {
VarDataLenT len;
......
......@@ -321,6 +321,7 @@ do { \
#define TSDB_MAX_BINARY_LEN (TSDB_MAX_FIELD_LEN-TSDB_KEYSIZE) // keep 16384
#define TSDB_MAX_NCHAR_LEN (TSDB_MAX_FIELD_LEN-TSDB_KEYSIZE) // keep 16384
#define PRIMARYKEY_TIMESTAMP_COL_ID 1
#define COL_REACH_END(colId, maxColId) ((colId) > (maxColId))
#define TSDB_MAX_RPC_THREADS 5
......
......@@ -28,6 +28,9 @@ int tdAllocMemForCol(SDataCol *pCol, int maxPoints) {
if(IS_VAR_DATA_TYPE(pCol->type)) {
spaceNeeded += sizeof(VarDataOffsetT) * maxPoints;
}
#ifdef TD_SUPPORT_BITMAP
spaceNeeded += (int)TD_BITMAP_BYTES(maxPoints);
#endif
if(pCol->spaceSize < spaceNeeded) {
void* ptr = realloc(pCol->pData, spaceNeeded);
if(ptr == NULL) {
......@@ -39,9 +42,17 @@ int tdAllocMemForCol(SDataCol *pCol, int maxPoints) {
pCol->spaceSize = spaceNeeded;
}
}
if(IS_VAR_DATA_TYPE(pCol->type)) {
if (IS_VAR_DATA_TYPE(pCol->type)) {
pCol->dataOff = POINTER_SHIFT(pCol->pData, pCol->bytes * maxPoints);
#ifdef TD_SUPPORT_BITMAP
pCol->pBitmap = POINTER_SHIFT(pCol->dataOff, sizeof(VarDataOffsetT) * maxPoints);
#endif
}
#ifdef TD_SUPPORT_BITMAP
else {
pCol->pBitmap = POINTER_SHIFT(pCol->pData, pCol->bytes * maxPoints);
}
#endif
return 0;
}
......@@ -186,7 +197,7 @@ STSchema *tdGetSchemaFromBuilder(STSchemaBuilder *pBuilder) {
schemaVLen(pSchema) = pBuilder->vlen;
#ifdef TD_SUPPORT_BITMAP
schemaTLen(pSchema) += (int)ceil((double)schemaNCols(pSchema) / TD_VTYPE_PARTS);
schemaTLen(pSchema) += (int)TD_BITMAP_BYTES(schemaNCols(pSchema));
#endif
memcpy(schemaColAt(pSchema, 0), pBuilder->columns, sizeof(STColumn) * pBuilder->nCols);
......@@ -479,7 +490,7 @@ static void tdAppendDataRowToDataCol(SDataRow row, STSchema *pSchema, SDataCols
pCols->numOfRows++;
}
static void tdAppendKvRowToDataCol(SKVRow row, STSchema *pSchema, SDataCols *pCols, bool forceSetNull) {
static void tdAppendKVRowToDataCol(SKVRow row, STSchema *pSchema, SDataCols *pCols, bool forceSetNull) {
ASSERT(pCols->numOfRows == 0 || dataColsKeyLast(pCols) < kvRowKey(row));
int rcol = 0;
......@@ -520,7 +531,7 @@ void tdAppendMemRowToDataCol(SMemRow row, STSchema *pSchema, SDataCols *pCols, b
if (isDataRow(row)) {
tdAppendDataRowToDataCol(memRowDataBody(row), pSchema, pCols, forceSetNull);
} else if (isKvRow(row)) {
tdAppendKvRowToDataCol(memRowKvBody(row), pSchema, pCols, forceSetNull);
tdAppendKVRowToDataCol(memRowKvBody(row), pSchema, pCols, forceSetNull);
} else {
ASSERT(0);
}
......
......@@ -475,9 +475,10 @@ void tdResetDataCols(SDataCols *pCols) {
}
}
}
#endif
static void tdAppendDataRowToDataCol(SDataRow row, STSchema *pSchema, SDataCols *pCols, bool forceSetNull) {
ASSERT(pCols->numOfRows == 0 || dataColsKeyLast(pCols) < dataRowKey(row));
static void tdAppendTpRowToDataCol(STSRow *pRow, STSchema *pSchema, SDataCols *pCols, bool forceSetNull) {
ASSERT(pCols->numOfRows == 0 || dataColsKeyLast(pCols) < TD_ROW_TSKEY(pRow));
int rcol = 0;
int dcol = 0;
......@@ -493,7 +494,7 @@ static void tdAppendDataRowToDataCol(SDataRow row, STSchema *pSchema, SDataCols
STColumn *pRowCol = schemaColAt(pSchema, rcol);
if (pRowCol->colId == pDataCol->colId) {
void *value = tdGetRowDataOfCol(row, pRowCol->type, pRowCol->offset + TD_DATA_ROW_HEAD_SIZE);
void *value = tdGetRowDataOfCol(pRow, pRowCol->type, pRowCol->offset + TD_DATA_ROW_HEAD_SIZE);
if(!isNull(value, pDataCol->type)) setCol = 1;
dataColAppendVal(pDataCol, value, pCols->numOfRows, pCols->maxPoints);
dcol++;
......@@ -510,13 +511,13 @@ static void tdAppendDataRowToDataCol(SDataRow row, STSchema *pSchema, SDataCols
pCols->numOfRows++;
}
static void tdAppendKvRowToDataCol(SKVRow row, STSchema *pSchema, SDataCols *pCols, bool forceSetNull) {
ASSERT(pCols->numOfRows == 0 || dataColsKeyLast(pCols) < kvRowKey(row));
static void tdAppendKvRowToDataCol(STSRow *pRow, STSchema *pSchema, SDataCols *pCols, bool forceSetNull) {
ASSERT(pCols->numOfRows == 0 || dataColsKeyLast(pCols) < TD_ROW_TSKEY(pRow));
int rcol = 0;
int dcol = 0;
int nRowCols = kvRowNCols(row);
int nRowCols = TD_ROW_NCOLS(pRow);
while (dcol < pCols->numOfCols) {
bool setCol = 0;
......@@ -527,10 +528,10 @@ static void tdAppendKvRowToDataCol(SKVRow row, STSchema *pSchema, SDataCols *pCo
continue;
}
SColIdx *colIdx = kvRowColIdxAt(row, rcol);
SColIdx *colIdx = NULL;//kvRowColIdxAt(row, rcol);
if (colIdx->colId == pDataCol->colId) {
void *value = tdGetKvRowDataOfCol(row, colIdx->offset);
void *value = NULL; //tdGetKvRowDataOfCol(row, colIdx->offset);
if(!isNull(value, pDataCol->type)) setCol = 1;
dataColAppendVal(pDataCol, value, pCols->numOfRows, pCols->maxPoints);
++dcol;
......@@ -547,16 +548,19 @@ static void tdAppendKvRowToDataCol(SKVRow row, STSchema *pSchema, SDataCols *pCo
pCols->numOfRows++;
}
void tdAppendMemRowToDataCol(SMemRow row, STSchema *pSchema, SDataCols *pCols, bool forceSetNull) {
if (isDataRow(row)) {
tdAppendDataRowToDataCol(memRowDataBody(row), pSchema, pCols, forceSetNull);
} else if (isKvRow(row)) {
tdAppendKvRowToDataCol(memRowKvBody(row), pSchema, pCols, forceSetNull);
void tdAppendSTSRowToDataCol(STSRow *pRow, STSchema *pSchema, SDataCols *pCols, bool forceSetNull) {
if (TD_IS_TP_ROW(pRow)) {
tdAppendTpRowToDataCol(pRow, pSchema, pCols, forceSetNull);
} else if (TD_IS_KV_ROW(pRow)) {
tdAppendKvRowToDataCol(pRow, pSchema, pCols, forceSetNull);
} else {
ASSERT(0);
}
}
#if 0
int tdMergeDataCols(SDataCols *target, SDataCols *source, int rowsToMerge, int *pOffset, bool forceSetNull) {
ASSERT(rowsToMerge > 0 && rowsToMerge <= source->numOfRows);
ASSERT(target->numOfCols == source->numOfCols);
......
......@@ -1185,6 +1185,9 @@ int tsdbWriteBlockImpl(STsdb *pRepo, STable *pTable, SDFile *pDFile, SDataCols *
int32_t flen; // final length
int32_t tlen = dataColGetNEleLen(pDataCol, rowsToWrite);
#ifdef TD_SUPPORT_BITMAP
tlen += (int32_t)TD_BITMAP_BYTES(rowsToWrite);
#endif
void * tptr;
// Make room
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册