提交 cf859069 编写于 作者: C Cary Xu

update

上级 79bfedd0
......@@ -126,6 +126,7 @@ typedef struct {
#define TD_VTYPE_OPTR 3 // TD_VTYPE_PARTS - 1, utilize to get remainder
#define TD_BITMAP_BYTES(cnt) (ceil((double)cnt / TD_VTYPE_PARTS))
#define TD_BIT_TO_BYTES(cnt) (ceil((double)cnt / 8))
int tdInitTSchemaBuilder(STSchemaBuilder *pBuilder, int32_t version);
void tdDestroyTSchemaBuilder(STSchemaBuilder *pBuilder);
......@@ -362,7 +363,7 @@ typedef struct SDataCol {
int len; // column data length
VarDataOffsetT *dataOff; // For binary and nchar data, the offset in the data column
void * pData; // Actual data pointer
void * pBitmap; // Bitmap pointer to support None value
void * pBitmap; // Bitmap pointer to mark Null/Norm(1 bit for each row)
TSKEY ts; // only used in last NULL column
} SDataCol;
......@@ -400,11 +401,11 @@ static FORCE_INLINE int32_t dataColGetNEleLen(SDataCol *pDataCol, int rows) {
}
typedef struct {
int maxCols; // max number of columns
col_id_t maxCols; // max number of columns
col_id_t numOfCols; // Total number of cols
int maxPoints; // max number of points
int numOfRows;
int numOfCols; // Total number of cols
int sversion; // TODO: set sversion
int sversion; // TODO: set sversion
SDataCol *cols;
} SDataCols;
......
......@@ -17,9 +17,8 @@
#define _TD_COMMON_ROW_H_
#include "os.h"
#include "tdef.h"
#include "taoserror.h"
#include "talgo.h"
#include "taoserror.h"
#include "tbuffer.h"
#include "tdataformat.h"
#include "tdef.h"
......@@ -38,7 +37,11 @@ extern "C" {
#define TD_ROW_TP 0x0 // default
#define TD_ROW_KV 0x01
// val type
/**
* @brief val type
* - for data from client input and STSRow in memory, 3 types of val none/null/norm available
* - for data in
*/
#define TD_VTYPE_NORM 0x0U // normal val: not none, not null
#define TD_VTYPE_NONE 0x01U // none or unknown/undefined
#define TD_VTYPE_NULL 0x02U // null val
......@@ -138,8 +141,8 @@ typedef struct {
#define TD_ROW_LEN(r) ((r)->len)
#define TD_ROW_TSKEY(r) ((r)->ts)
// N.B. If without STSchema, getExtendedRowSize() is used to get the rowMaxBytes and (int)ceil((double)nCols/TD_VTYPE_PARTS)
// should be added if TD_SUPPORT_BITMAP defined.
// N.B. If without STSchema, getExtendedRowSize() is used to get the rowMaxBytes and
// (int)ceil((double)nCols/TD_VTYPE_PARTS) should be added if TD_SUPPORT_BITMAP defined.
#define TD_ROW_MAX_BYTES_FROM_SCHEMA(s) (schemaTLen(s) + TD_ROW_HEAD_LEN)
#define TD_ROW_SET_TYPE(r, t) (TD_ROW_TYPE(r) = (t))
......@@ -154,7 +157,7 @@ typedef struct {
#define TD_BOOL_STR(b) ((b) ? "true" : "false")
#define isUtilizeKVRow(k, d) ((k) < ((d)*KVRatioConvert))
#define TD_ROW_OFFSET(p) ((p)->toffset);
#define TD_ROW_OFFSET(p) ((p)->toffset); // During ParseInsert when without STSchema, how to get the offset for STpRow?
static FORCE_INLINE int32_t tdRowSetValType(void *pBitmap, int16_t colIdx, TDRowValT valType);
static FORCE_INLINE int32_t tdRowGetValType(void *pBitmap, int16_t colIdx, TDRowValT *pValType);
......@@ -164,13 +167,17 @@ static FORCE_INLINE int32_t tdAppendColValToKvRow(STSRow *row, void *pBitmap, co
int8_t colType, TDRowValT valType, int16_t colIdx, int32_t offset,
int16_t colId);
static FORCE_INLINE void *tdGetBitmapAddr(STSRow *pRow, uint8_t rowType, uint32_t flen, int16_t nBoundCols) {
static FORCE_INLINE void *tdGetBitmapAddrTp(STSRow *pRow, uint32_t flen) { return POINTER_SHIFT(pRow, flen); }
static FORCE_INLINE void *tdGetBitmapAddrKv(STSRow *pRow, col_id_t nBoundCols) {
return POINTER_SHIFT(pRow, nBoundCols * sizeof(SKvRowIdx));
}
static FORCE_INLINE void *tdGetBitmapAddr(STSRow *pRow, uint8_t rowType, uint32_t flen, col_id_t nBoundCols) {
#ifdef TD_SUPPORT_BITMAP
switch (rowType) {
case TD_ROW_TP:
return POINTER_SHIFT(pRow, flen);
return tdGetBitmapAddrTp(pRow, flen);
case TD_ROW_KV:
return POINTER_SHIFT(pRow, nBoundCols * sizeof(SKvRowIdx));
return tdGetBitmapAddrKv(pRow, nBoundCols);
default:
break;
}
......@@ -178,9 +185,65 @@ static FORCE_INLINE void *tdGetBitmapAddr(STSRow *pRow, uint8_t rowType, uint32_
return NULL;
}
void tdFreeTpRow(STpRow row);
void tdInitTpRow(STpRow row, STSchema *pSchema);
STpRow tdTpRowDup(STpRow row);
// void tdFreeTpRow(STpRow row);
// void tdInitTpRow(STpRow row, STSchema *pSchema);
// STpRow tdTpRowDup(STpRow row);
static FORCE_INLINE int32_t tdRowSetValType(void *pBitmap, int16_t colIdx, TDRowValT valType) {
if (!pBitmap || colIdx < 0) {
terrno = TSDB_CODE_INVALID_PTR;
return terrno;
}
int16_t nBytes = colIdx / TD_VTYPE_PARTS;
int16_t nOffset = colIdx & TD_VTYPE_OPTR;
char * pDestByte = (char *)POINTER_SHIFT(pBitmap, nBytes);
switch (nOffset) {
case 0:
*pDestByte = ((*pDestByte) & 0x3F) | (valType << 6);
break;
case 1:
*pDestByte = ((*pDestByte) & 0xCF) | (valType << 4);
break;
case 2:
*pDestByte = ((*pDestByte) & 0xF3) | (valType << 2);
break;
case 3:
*pDestByte = ((*pDestByte) & 0xFC) | valType;
break;
default:
terrno = TSDB_CODE_INVALID_PARA;
return terrno;
}
return TSDB_CODE_SUCCESS;
}
static FORCE_INLINE int32_t tdRowGetValType(void *pBitmap, int16_t colIdx, TDRowValT *pValType) {
if (!pBitmap || colIdx < 0) {
terrno = TSDB_CODE_INVALID_PTR;
return terrno;
}
int16_t nBytes = colIdx / TD_VTYPE_PARTS;
int16_t nOffset = colIdx & TD_VTYPE_OPTR;
char * pDestByte = (char *)POINTER_SHIFT(pBitmap, nBytes);
switch (nOffset) {
case 0:
*pValType = (((*pDestByte) & 0xC0) >> 6);
break;
case 1:
*pValType = (((*pDestByte) & 0x30) >> 4);
break;
case 2:
*pValType = (((*pDestByte) & 0x0C) >> 2);
break;
case 3:
*pValType = ((*pDestByte) & 0x03);
break;
default:
terrno = TSDB_CODE_INVALID_PARA;
return terrno;
}
return TSDB_CODE_SUCCESS;
}
// ----------------- Tuple row structure(STpRow)
/*
......@@ -283,10 +346,10 @@ static FORCE_INLINE int32_t tdSRowSetExtendedInfo(SRowBuilder *pBuilder, int32_t
* @param pBuf
*/
static FORCE_INLINE int32_t tdSRowResetBuf(SRowBuilder *pBuilder, void *pBuf) {
pBuilder->pBuf = (STSRow*)pBuf;
pBuilder->pBuf = (STSRow *)pBuf;
if (!pBuilder->pBuf) {
terrno = TSDB_CODE_INVALID_PARA;
return TSDB_CODE_INVALID_PARA;
return terrno;
}
uint32_t len = 0;
switch (pBuilder->rowType) {
......@@ -309,7 +372,7 @@ static FORCE_INLINE int32_t tdSRowResetBuf(SRowBuilder *pBuilder, void *pBuf) {
break;
default:
terrno = TSDB_CODE_INVALID_PARA;
return TSDB_CODE_INVALID_PARA;
return terrno;
}
return TSDB_CODE_SUCCESS;
}
......@@ -328,7 +391,6 @@ static FORCE_INLINE int32_t tdSRowResetBuf(SRowBuilder *pBuilder, void *pBuf) {
*/
static FORCE_INLINE int32_t tdSRowInitEx(SRowBuilder *pBuilder, void *pBuf, uint32_t allNullLen, uint32_t boundNullLen,
int32_t nCols, int32_t nBoundCols, int32_t flen) {
tdSRowSetExtendedInfo(pBuilder, allNullLen, boundNullLen, nCols, nBoundCols, flen);
return tdSRowResetBuf(pBuilder, pBuf);
}
......@@ -347,56 +409,6 @@ static FORCE_INLINE void tdSRowReset(SRowBuilder *pBuilder) {
pBuilder->pBitmap = NULL;
}
/**
* @brief
*
* @param pBuilder
* @param colId start from PRIMARYKEY_TIMESTAMP_COL_ID
* @param colType
* @param val
* @param valType
* @param offset
* @param colIdx sorted column index, start from 0
* @return FORCE_INLINE
*/
static FORCE_INLINE int32_t tdAppendColValToRow(SRowBuilder *pBuilder, int16_t colId, int8_t colType, const void *val,
TDRowValT valType, int32_t offset, int16_t colIdx) {
STSRow *pRow = pBuilder->pBuf;
void * pBitmap = NULL;
if (!val) {
#ifdef TD_SUPPORT_BITMAP
if (tdValIsNorm(valType, val, colType)) {
terrno = TSDB_CODE_INVALID_PTR;
return terrno;
}
#else
terrno = TSDB_CODE_INVALID_PTR;
return terrno;
#endif
}
// TS KEY is stored in STSRow.ts and not included in STSRow.data field.
if (colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
TD_ROW_TSKEY(pRow) = *(TSKEY *)val;
#ifdef TD_SUPPORT_BITMAP
pBitmap = tdGetBitmapAddr(pRow, pRow->type, pBuilder->flen, pRow->ncols);
if (tdRowSetValType(pBitmap, colIdx, valType) != TSDB_CODE_SUCCESS) {
return terrno;
}
#endif
return TSDB_CODE_SUCCESS;
}
// TODO: We can avoid the type judegement by FP, but would prevent the inline scheme.
// typedef int (*tdAppendColValToSRowFp)(STSRow *pRow, void *pBitmap, int16_t colId, int8_t colType,
// const void *val, int8_t valType, int32_t tOffset, int16_t
// colIdx);
if (TD_IS_TP_ROW(pRow)) {
tdAppendColValToTpRow(pRow, pBitmap, val, true, colType, valType, colIdx, offset);
} else {
tdAppendColValToKvRow(pRow, pBitmap, val, true, colType, valType, colIdx, offset, colId);
}
return TSDB_CODE_SUCCESS;
}
static FORCE_INLINE int32_t tdAppendColValToTpRow(STSRow *row, void *pBitmap, const void *val, bool isCopyVarData,
int8_t colType, TDRowValT valType, int16_t colIdx, int32_t offset) {
ASSERT(offset >= sizeof(TSKEY));
......@@ -430,7 +442,7 @@ static FORCE_INLINE int32_t tdAppendColValToKvRow(STSRow *row, void *pBitmap, co
SColIdx *pColIdx = (SColIdx *)POINTER_SHIFT(row->data, offset - sizeof(SKvRowIdx));
char * ptr = (char *)POINTER_SHIFT(row, TD_ROW_LEN(row));
pColIdx->colId = colId;
pColIdx->offset = TD_ROW_LEN(row); // the offset include the TD_ROW_HEAD_LEN
pColIdx->offset = TD_ROW_LEN(row); // the offset include the TD_ROW_HEAD_LEN
if (IS_VAR_DATA_TYPE(colType)) {
if (isCopyValData) {
......@@ -442,7 +454,7 @@ static FORCE_INLINE int32_t tdAppendColValToKvRow(STSRow *row, void *pBitmap, co
TD_ROW_LEN(row) += TYPE_BYTES[colType];
}
}
#ifdef TD_SUPPORT_BITMAP
tdRowSetValType(pBitmap, colIdx, valType);
#endif
......@@ -450,58 +462,52 @@ static FORCE_INLINE int32_t tdAppendColValToKvRow(STSRow *row, void *pBitmap, co
return 0;
}
static FORCE_INLINE int32_t tdRowSetValType(void *pBitmap, int16_t colIdx, TDRowValT valType) {
if (!pBitmap) {
terrno = TSDB_CODE_INVALID_PTR;
return terrno;
}
int16_t nBytes = colIdx / TD_VTYPE_PARTS;
int16_t nOffset = colIdx & TD_VTYPE_OPTR;
char * pDestByte = (char *)POINTER_SHIFT(pBitmap, nBytes);
switch (nOffset) {
case 0:
*pDestByte = ((*pDestByte) & 0x3F) | (valType << 6);
break;
case 1:
*pDestByte = ((*pDestByte) & 0xCF) | (valType << 4);
break;
case 2:
*pDestByte = ((*pDestByte) & 0xF3) | (valType << 2);
break;
case 3:
*pDestByte = ((*pDestByte) & 0xFC) | valType;
break;
default:
terrno = TSDB_CODE_INVALID_PARA;
/**
* @brief
*
* @param pBuilder
* @param colId start from PRIMARYKEY_TIMESTAMP_COL_ID
* @param colType
* @param val
* @param valType
* @param offset
* @param colIdx sorted column index, start from 0
* @return FORCE_INLINE
*/
static FORCE_INLINE int32_t tdAppendColValToRow(SRowBuilder *pBuilder, int16_t colId, int8_t colType, const void *val,
TDRowValT valType, int32_t offset, int16_t colIdx) {
STSRow *pRow = pBuilder->pBuf;
void * pBitmap = NULL;
if (!val) {
#ifdef TD_SUPPORT_BITMAP
if (tdValIsNorm(valType, val, colType)) {
terrno = TSDB_CODE_INVALID_PTR;
return terrno;
}
return TSDB_CODE_SUCCESS;
}
static FORCE_INLINE int32_t tdRowGetValType(void *pBitmap, int16_t colIdx, TDRowValT *pValType) {
if (!pBitmap || colIdx < 0) {
}
#else
terrno = TSDB_CODE_INVALID_PTR;
return terrno;
#endif
}
int16_t nBytes = colIdx / TD_VTYPE_PARTS;
int16_t nOffset = colIdx & TD_VTYPE_OPTR;
char * pDestByte = (char *)POINTER_SHIFT(pBitmap, nBytes);
switch (nOffset) {
case 0:
*pValType = (((*pDestByte) & 0xC0) >> 6);
break;
case 1:
*pValType = (((*pDestByte) & 0x30) >> 4);
break;
case 2:
*pValType = (((*pDestByte) & 0x0C) >> 2);
break;
case 3:
*pValType = ((*pDestByte) & 0x03);
break;
default:
terrno = TSDB_CODE_INVALID_PARA;
// TS KEY is stored in STSRow.ts and not included in STSRow.data field.
if (colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
TD_ROW_TSKEY(pRow) = *(TSKEY *)val;
#ifdef TD_SUPPORT_BITMAP
pBitmap = tdGetBitmapAddr(pRow, pRow->type, pBuilder->flen, pRow->ncols);
if (tdRowSetValType(pBitmap, colIdx, valType) != TSDB_CODE_SUCCESS) {
return terrno;
}
#endif
return TSDB_CODE_SUCCESS;
}
// TODO: We can avoid the type judegement by FP, but would prevent the inline scheme.
// typedef int (*tdAppendColValToSRowFp)(STSRow *pRow, void *pBitmap, int16_t colId, int8_t colType,
// const void *val, int8_t valType, int32_t tOffset, int16_t
// colIdx);
if (TD_IS_TP_ROW(pRow)) {
tdAppendColValToTpRow(pRow, pBitmap, val, true, colType, valType, colIdx, offset);
} else {
tdAppendColValToKvRow(pRow, pBitmap, val, true, colType, valType, colIdx, offset, colId);
}
return TSDB_CODE_SUCCESS;
}
......@@ -515,16 +521,14 @@ static FORCE_INLINE int32_t tdGetTpRowValOfCol(SCellVal *output, STSRow *row, vo
}
if (tdValTypeIsNorm(output->valType)) {
if (IS_VAR_DATA_TYPE(colType)) {
output->val =
POINTER_SHIFT(row, *(VarDataOffsetT *)POINTER_SHIFT(row->data, offset - sizeof(TSKEY)));
output->val = POINTER_SHIFT(row, *(VarDataOffsetT *)POINTER_SHIFT(row->data, offset - sizeof(TSKEY)));
} else {
output->val = POINTER_SHIFT(row->data, offset - sizeof(TSKEY));
}
}
#else
if (IS_VAR_DATA_TYPE(colType)) {
output->val =
POINTER_SHIFT(row, *(VarDataOffsetT *)POINTER_SHIFT(row->data, offset - sizeof(TSKEY)));
output->val = POINTER_SHIFT(row, *(VarDataOffsetT *)POINTER_SHIFT(row->data, offset - sizeof(TSKEY)));
} else {
output->val = POINTER_SHIFT(row->data, offset - sizeof(TSKEY));
}
......@@ -569,15 +573,14 @@ static FORCE_INLINE int32_t tdGetKvRowValOfCol(SCellVal *output, STSRow *row, vo
return TSDB_CODE_SUCCESS;
}
static FORCE_INLINE int32_t tdGetSRowValOfCol(SCellVal *output, STSRow *row, void *pBitmap, int8_t colType,
int32_t offset, uint16_t flen, int16_t colIdx) {
if (TD_IS_TP_ROW(row)) {
return tdGetTpRowValOfCol(output, row, pBitmap, colType, offset, colIdx);
} else {
return tdGetKvRowValOfCol(output, row, pBitmap, colType, offset, colIdx);
}
}
// static FORCE_INLINE int32_t tdGetSRowValOfCol(SCellVal *output, STSRow *row, void *pBitmap, int8_t colType,
// int32_t offset, uint16_t flen, int16_t colIdx) {
// if (TD_IS_TP_ROW(row)) {
// return tdGetTpRowValOfCol(output, row, pBitmap, colType, offset, colIdx);
// } else {
// return tdGetKvRowValOfCol(output, row, pBitmap, colType, offset, colIdx);
// }
// }
typedef struct {
STSchema *pSchema;
......@@ -713,9 +716,7 @@ static FORCE_INLINE bool tdGetKvRowValOfColEx(STSRowIter *pIter, col_id_t colId,
#ifdef TD_SUPPORT_BITMAP
int16_t colIdx = -1;
if (pKvIdx) {
colIdx = POINTER_DISTANCE(pRow->data, pKvIdx) / sizeof(SKvRowIdx);
}
if (pKvIdx) colIdx = POINTER_DISTANCE(pRow->data, pKvIdx) / sizeof(SKvRowIdx);
if (tdRowGetValType(pIter->pBitmap, colIdx, &pVal->valType) != TSDB_CODE_SUCCESS) {
pVal->valType = TD_VTYPE_NONE;
}
......@@ -731,6 +732,9 @@ static FORCE_INLINE bool tdGetKvRowValOfColEx(STSRowIter *pIter, col_id_t colId,
*
* @param pIter
* @param pVal
* @param colId
* @param colType
* @param pVal output
* @return true Not reach end and pVal is set(None/Null/Norm).
* @return false Reach end of row and pVal not set.
*/
......@@ -764,7 +768,7 @@ static FORCE_INLINE bool tdSTSRowIterNext(STSRowIter *pIter, col_id_t colId, col
}
}
return tdGetTpRowDataOfCol(pIter, pCol->type, pCol->offset - sizeof(TSKEY), pVal);
} else if(TD_IS_KV_ROW(pIter->pRow)){
} else if (TD_IS_KV_ROW(pIter->pRow)) {
return tdGetKvRowValOfColEx(pIter, colId, colType, &pIter->kvIdx, pVal);
} else {
pVal->valType = TD_VTYPE_NONE;
......@@ -774,14 +778,6 @@ static FORCE_INLINE bool tdSTSRowIterNext(STSRowIter *pIter, col_id_t colId, col
return true;
}
static FORCE_INLINE void tdSTSRowIterReset(STSRowIter *pIter, STSRow *pRow) {
pIter->pRow = pRow;
pIter->pBitmap = tdGetBitmapAddr(pRow, pRow->type, pIter->pSchema->flen, pRow->ncols);
pIter->offset = 0;
pIter->colIdx = PRIMARYKEY_TIMESTAMP_COL_ID;
pIter->kvIdx = 0;
}
#ifdef TROW_ORIGIN_HZ
typedef struct {
uint32_t nRows;
......@@ -814,11 +810,11 @@ typedef struct {
typedef struct {
STSchema *pSchema;
STSRow * pRow;
STSRow * pRow;
} STSRowReader;
typedef struct {
uint32_t it;
uint32_t it;
STSRowBatch *pRowBatch;
} STSRowBatchIter;
......
......@@ -29,7 +29,7 @@ int tdAllocMemForCol(SDataCol *pCol, int maxPoints) {
spaceNeeded += sizeof(VarDataOffsetT) * maxPoints;
}
#ifdef TD_SUPPORT_BITMAP
spaceNeeded += (int)TD_BITMAP_BYTES(maxPoints);
spaceNeeded += (int)TD_BIT_TO_BYTES(maxPoints);
#endif
if(pCol->spaceSize < spaceNeeded) {
void* ptr = realloc(pCol->pData, spaceNeeded);
......
......@@ -15,6 +15,34 @@
#include "trow.h"
static void dataColSetNEleNull(SDataCol *pCol, int nEle);
static void tdMergeTwoDataCols(SDataCols *target, SDataCols *src1, int *iter1, int limit1, SDataCols *src2, int *iter2,
int limit2, int tRows, bool forceSetNull);
static FORCE_INLINE void dataColSetNullAt(SDataCol *pCol, int index) {
if (IS_VAR_DATA_TYPE(pCol->type)) {
pCol->dataOff[index] = pCol->len;
char *ptr = POINTER_SHIFT(pCol->pData, pCol->len);
setVardataNull(ptr, pCol->type);
pCol->len += varDataTLen(ptr);
} else {
setNull(POINTER_SHIFT(pCol->pData, TYPE_BYTES[pCol->type] * index), pCol->type, pCol->bytes);
pCol->len += TYPE_BYTES[pCol->type];
}
}
static void dataColSetNEleNull(SDataCol *pCol, int nEle) {
if (IS_VAR_DATA_TYPE(pCol->type)) {
pCol->len = 0;
for (int i = 0; i < nEle; i++) {
dataColSetNullAt(pCol, i);
}
} else {
setNullN(pCol->pData, pCol->type, pCol->bytes, nEle);
pCol->len = TYPE_BYTES[pCol->type] * nEle;
}
}
#if 0
void trbSetRowInfo(SRowBuilder *pRB, bool del, uint16_t sver) {
// TODO
......@@ -54,31 +82,7 @@ int trbWriteCol(SRowBuilder *pRB, void *pData, col_id_t cid) {
#include "wchar.h"
#include "tarray.h"
static void dataColSetNEleNull(SDataCol *pCol, int nEle);
static void tdMergeTwoDataCols(SDataCols *target, SDataCols *src1, int *iter1, int limit1, SDataCols *src2, int *iter2,
int limit2, int tRows, bool forceSetNull);
int tdAllocMemForCol(SDataCol *pCol, int maxPoints) {
int spaceNeeded = pCol->bytes * maxPoints;
if(IS_VAR_DATA_TYPE(pCol->type)) {
spaceNeeded += sizeof(VarDataOffsetT) * maxPoints;
}
if(pCol->spaceSize < spaceNeeded) {
void* ptr = realloc(pCol->pData, spaceNeeded);
if(ptr == NULL) {
uDebug("malloc failure, size:%" PRId64 " failed, reason:%s", (int64_t)spaceNeeded,
strerror(errno));
return -1;
} else {
pCol->pData = ptr;
pCol->spaceSize = spaceNeeded;
}
}
if(IS_VAR_DATA_TYPE(pCol->type)) {
pCol->dataOff = POINTER_SHIFT(pCol->pData, pCol->bytes * maxPoints);
}
return 0;
}
/**
* Duplicate the schema and return a new object
......@@ -274,38 +278,12 @@ void dataColInit(SDataCol *pDataCol, STColumn *pCol, int maxPoints) {
pDataCol->len = 0;
}
// value from timestamp should be TKEY here instead of TSKEY
int dataColAppendVal(SDataCol *pCol, const void *value, int numOfRows, int maxPoints) {
ASSERT(pCol != NULL && value != NULL);
if (isAllRowsNull(pCol)) {
if (isNull(value, pCol->type)) {
// all null value yet, just return
return 0;
}
#endif
if(tdAllocMemForCol(pCol, maxPoints) < 0) return -1;
if (numOfRows > 0) {
// Find the first not null value, fill all previouse values as NULL
dataColSetNEleNull(pCol, numOfRows);
}
}
if (IS_VAR_DATA_TYPE(pCol->type)) {
// set offset
pCol->dataOff[numOfRows] = pCol->len;
// Copy data
memcpy(POINTER_SHIFT(pCol->pData, pCol->len), value, varDataTLen(value));
// Update the length
pCol->len += varDataTLen(value);
} else {
ASSERT(pCol->len == TYPE_BYTES[pCol->type] * numOfRows);
memcpy(POINTER_SHIFT(pCol->pData, pCol->len), value, pCol->bytes);
pCol->len += pCol->bytes;
}
return 0;
}
#if 0
static FORCE_INLINE const void *tdGetColDataOfRowUnsafe(SDataCol *pCol, int row) {
if (IS_VAR_DATA_TYPE(pCol->type)) {
return POINTER_SHIFT(pCol->pData, pCol->dataOff[row]);
......@@ -322,29 +300,7 @@ bool isNEleNull(SDataCol *pCol, int nEle) {
return true;
}
static FORCE_INLINE void dataColSetNullAt(SDataCol *pCol, int index) {
if (IS_VAR_DATA_TYPE(pCol->type)) {
pCol->dataOff[index] = pCol->len;
char *ptr = POINTER_SHIFT(pCol->pData, pCol->len);
setVardataNull(ptr, pCol->type);
pCol->len += varDataTLen(ptr);
} else {
setNull(POINTER_SHIFT(pCol->pData, TYPE_BYTES[pCol->type] * index), pCol->type, pCol->bytes);
pCol->len += TYPE_BYTES[pCol->type];
}
}
static void dataColSetNEleNull(SDataCol *pCol, int nEle) {
if (IS_VAR_DATA_TYPE(pCol->type)) {
pCol->len = 0;
for (int i = 0; i < nEle; i++) {
dataColSetNullAt(pCol, i);
}
} else {
setNullN(pCol->pData, pCol->type, pCol->bytes, nEle);
pCol->len = TYPE_BYTES[pCol->type] * nEle;
}
}
void dataColSetOffset(SDataCol *pCol, int nEle) {
ASSERT(((pCol->type == TSDB_DATA_TYPE_BINARY) || (pCol->type == TSDB_DATA_TYPE_NCHAR)));
......@@ -477,33 +433,68 @@ void tdResetDataCols(SDataCols *pCols) {
}
#endif
int tdAppendValToDataCol(SDataCol *pCol, TDRowValT valType, const void *val, int numOfRows, int maxPoints) {
ASSERT(pCol != NULL);
if (isAllRowsNull(pCol)) {
if (tdValIsNone(valType) || tdValIsNull(valType, val, pCol->type)) {
// all Null value yet, just return
return 0;
}
if(tdAllocMemForCol(pCol, maxPoints) < 0) return -1;
if (numOfRows > 0) {
// Find the first not null value, fill all previous values as Null
dataColSetNEleNull(pCol, numOfRows);
}
}
if (IS_VAR_DATA_TYPE(pCol->type)) {
// set offset
pCol->dataOff[numOfRows] = pCol->len;
// Copy data
memcpy(POINTER_SHIFT(pCol->pData, pCol->len), val, varDataTLen(val));
// Update the length
pCol->len += varDataTLen(val);
} else {
ASSERT(pCol->len == TYPE_BYTES[pCol->type] * numOfRows);
memcpy(POINTER_SHIFT(pCol->pData, pCol->len), val, pCol->bytes);
pCol->len += pCol->bytes;
}
return 0;
}
static void tdAppendTpRowToDataCol(STSRow *pRow, STSchema *pSchema, SDataCols *pCols, bool forceSetNull) {
ASSERT(pCols->numOfRows == 0 || dataColsKeyLast(pCols) < TD_ROW_TSKEY(pRow));
int rcol = 0;
int dcol = 0;
void* pBitmap = tdGetBitmapAddrTp(pRow, pSchema->flen);
while (dcol < pCols->numOfCols) {
bool setCol = 0;
SDataCol *pDataCol = &(pCols->cols[dcol]);
if (rcol >= schemaNCols(pSchema)) {
dataColAppendVal(pDataCol, getNullValue(pDataCol->type), pCols->numOfRows, pCols->maxPoints);
tdAppendValToDataCol(pDataCol, TD_VTYPE_NULL, NULL, pCols->numOfRows, pCols->maxPoints);
dcol++;
continue;
}
STColumn *pRowCol = schemaColAt(pSchema, rcol);
SCellVal sVal = {0};
if (pRowCol->colId == pDataCol->colId) {
void *value = tdGetRowDataOfCol(pRow, pRowCol->type, pRowCol->offset + TD_DATA_ROW_HEAD_SIZE);
if(tdGetTpRowValOfCol(&sVal, pRow, pBitmap, pRowCol->type, pRowCol->offset + TD_DATA_ROW_HEAD_SIZE, rcol) < 0){
}
if(!isNull(value, pDataCol->type)) setCol = 1;
dataColAppendVal(pDataCol, value, pCols->numOfRows, pCols->maxPoints);
tdAppendValToDataCol(pDataCol, value, pCols->numOfRows, pCols->maxPoints);
dcol++;
rcol++;
} else if (pRowCol->colId < pDataCol->colId) {
rcol++;
} else {
if(forceSetNull || setCol) {
dataColAppendVal(pDataCol, getNullValue(pDataCol->type), pCols->numOfRows, pCols->maxPoints);
tdAppendValToDataCol(pDataCol, getNullValue(pDataCol->type), pCols->numOfRows, pCols->maxPoints);
}
dcol++;
}
......@@ -514,10 +505,10 @@ static void tdAppendTpRowToDataCol(STSRow *pRow, STSchema *pSchema, SDataCols *p
static void tdAppendKvRowToDataCol(STSRow *pRow, STSchema *pSchema, SDataCols *pCols, bool forceSetNull) {
ASSERT(pCols->numOfRows == 0 || dataColsKeyLast(pCols) < TD_ROW_TSKEY(pRow));
int rcol = 0;
int dcol = 0;
int nRowCols = TD_ROW_NCOLS(pRow);
int rcol = 0;
int dcol = 0;
int nRowCols = TD_ROW_NCOLS(pRow);
void *pBitmap = tdGetBitmapAddrKv(pRow, nRowCols);
while (dcol < pCols->numOfCols) {
bool setCol = 0;
......@@ -548,6 +539,14 @@ static void tdAppendKvRowToDataCol(STSRow *pRow, STSchema *pSchema, SDataCols *p
pCols->numOfRows++;
}
/**
* @brief
*
* @param pRow
* @param pSchema
* @param pCols
* @param forceSetNull
*/
void tdAppendSTSRowToDataCol(STSRow *pRow, STSchema *pSchema, SDataCols *pCols, bool forceSetNull) {
if (TD_IS_TP_ROW(pRow)) {
tdAppendTpRowToDataCol(pRow, pSchema, pCols, forceSetNull);
......
......@@ -49,6 +49,23 @@ typedef struct {
TSKEY keyLast;
} SBlock;
typedef struct {
int64_t last : 1;
int64_t offset : 63;
int32_t algorithm : 8;
int32_t numOfRows : 24;
uint8_t reserve0;
uint8_t numOfSubBlocks;
int16_t numOfCols; // not including timestamp column
uint32_t len : 32; // data block length
uint32_t keyLen : 24; // key column length, keyOffset = offset+sizeof(SBlockData)+sizeof(SBlockCol)*numOfCols
uint32_t reserve1 : 8;
uint64_t blkVer : 8;
uint64_t aggrOffset : 56;
TSKEY keyFirst;
TSKEY keyLast;
} SBlock_3;
typedef struct {
int32_t delimiter; // For recovery usage
int32_t tid;
......@@ -58,6 +75,8 @@ typedef struct {
typedef struct {
int16_t colId;
int16_t bitmap : 1; // 0: no bitmap if all rows not null, 1: has bitmap if has null rows
int16_t reserve : 15;
int32_t len;
uint32_t type : 8;
uint32_t offset : 24;
......
......@@ -89,7 +89,7 @@ int tsdbWriteBlockIdx(SDFile *pHeadf, SArray *pIdxA, void **ppBuf);
int tsdbApplyRtnOnFSet(STsdb *pRepo, SDFileSet *pSet, SRtn *pRtn) {
SDiskID did;
SDFileSet nSet;
SDFileSet nSet = {0};
STsdbFS * pfs = REPO_FS(pRepo);
int level;
......
......@@ -98,13 +98,13 @@ static int tsdbEncodeDFileSetArray(void **buf, SArray *pArray) {
}
static void *tsdbDecodeDFileSetArray(STsdb*pRepo, void *buf, SArray *pArray) {
uint64_t nset;
SDFileSet dset;
uint64_t nset = 0;
taosArrayClear(pArray);
buf = taosDecodeFixedU64(buf, &nset);
for (size_t i = 0; i < nset; i++) {
SDFileSet dset = {0};
buf = tsdbDecodeDFileSet(pRepo, buf, &dset);
taosArrayPush(pArray, (void *)(&dset));
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册