提交 fdad406e 编写于 作者: S Shengliang Guan 提交者: Shuduo Sang

Merge pull request #2024 from taosdata/feature/query

Feature/query
...@@ -30,7 +30,7 @@ extern int32_t cDebugFlag; ...@@ -30,7 +30,7 @@ extern int32_t cDebugFlag;
} }
#define tscWarn(...) \ #define tscWarn(...) \
if (cDebugFlag & DEBUG_WARN) { \ if (cDebugFlag & DEBUG_WARN) { \
taosPrintLog("WARN TSC ", cDebugFlag, __VA_ARGS__); \ taosPrintLog("WARN TSC ", cDebugFlag, __VA_ARGS__); \
} }
#define tscTrace(...) \ #define tscTrace(...) \
if (cDebugFlag & DEBUG_TRACE) { \ if (cDebugFlag & DEBUG_TRACE) { \
......
...@@ -53,11 +53,7 @@ typedef struct STableComInfo { ...@@ -53,11 +53,7 @@ typedef struct STableComInfo {
} STableComInfo; } STableComInfo;
typedef struct STableMeta { typedef struct STableMeta {
// super table if it is created according to super table, otherwise, tableInfo is used STableComInfo tableInfo;
union {
struct STableMeta *pSTable;
STableComInfo tableInfo;
};
uint8_t tableType; uint8_t tableType;
int16_t sversion; int16_t sversion;
SCMVgroupInfo vgroupInfo; SCMVgroupInfo vgroupInfo;
......
...@@ -154,8 +154,8 @@ typedef struct SDataCol { ...@@ -154,8 +154,8 @@ typedef struct SDataCol {
static FORCE_INLINE void dataColReset(SDataCol *pDataCol) { pDataCol->len = 0; } static FORCE_INLINE void dataColReset(SDataCol *pDataCol) { pDataCol->len = 0; }
void dataColInit(SDataCol *pDataCol, STColumn *pCol, void **pBuf, int maxPoints); void dataColInit(SDataCol *pDataCol, STColumn *pCol, void **pBuf, int maxPoints);
void dataColAppendVal(SDataCol *pCol, void *value, int numOfPoints, int maxPoints); void dataColAppendVal(SDataCol *pCol, void *value, int numOfRows, int maxPoints);
void dataColPopPoints(SDataCol *pCol, int pointsToPop, int numOfPoints); void dataColPopPoints(SDataCol *pCol, int pointsToPop, int numOfRows);
void dataColSetOffset(SDataCol *pCol, int nEle); void dataColSetOffset(SDataCol *pCol, int nEle);
bool isNEleNull(SDataCol *pCol, int nEle); bool isNEleNull(SDataCol *pCol, int nEle);
...@@ -195,7 +195,7 @@ typedef struct { ...@@ -195,7 +195,7 @@ typedef struct {
int maxPoints; // max number of points int maxPoints; // max number of points
int bufSize; int bufSize;
int numOfPoints; int numOfRows;
int numOfCols; // Total number of cols int numOfCols; // Total number of cols
int sversion; // TODO: set sversion int sversion; // TODO: set sversion
void * buf; void * buf;
...@@ -205,7 +205,7 @@ typedef struct { ...@@ -205,7 +205,7 @@ typedef struct {
#define keyCol(pCols) (&((pCols)->cols[0])) // Key column #define keyCol(pCols) (&((pCols)->cols[0])) // Key column
#define dataColsKeyAt(pCols, idx) ((TSKEY *)(keyCol(pCols)->pData))[(idx)] #define dataColsKeyAt(pCols, idx) ((TSKEY *)(keyCol(pCols)->pData))[(idx)]
#define dataColsKeyFirst(pCols) dataColsKeyAt(pCols, 0) #define dataColsKeyFirst(pCols) dataColsKeyAt(pCols, 0)
#define dataColsKeyLast(pCols) ((pCols->numOfPoints == 0) ? 0 : dataColsKeyAt(pCols, (pCols)->numOfPoints - 1)) #define dataColsKeyLast(pCols) ((pCols->numOfRows == 0) ? 0 : dataColsKeyAt(pCols, (pCols)->numOfRows - 1))
SDataCols *tdNewDataCols(int maxRowSize, int maxCols, int maxRows); SDataCols *tdNewDataCols(int maxRowSize, int maxCols, int maxRows);
void tdResetDataCols(SDataCols *pCols); void tdResetDataCols(SDataCols *pCols);
......
...@@ -187,29 +187,29 @@ void dataColInit(SDataCol *pDataCol, STColumn *pCol, void **pBuf, int maxPoints) ...@@ -187,29 +187,29 @@ void dataColInit(SDataCol *pDataCol, STColumn *pCol, void **pBuf, int maxPoints)
} }
void dataColAppendVal(SDataCol *pCol, void *value, int numOfPoints, int maxPoints) { void dataColAppendVal(SDataCol *pCol, void *value, int numOfRows, int maxPoints) {
ASSERT(pCol != NULL && value != NULL); ASSERT(pCol != NULL && value != NULL);
switch (pCol->type) { switch (pCol->type) {
case TSDB_DATA_TYPE_BINARY: case TSDB_DATA_TYPE_BINARY:
case TSDB_DATA_TYPE_NCHAR: case TSDB_DATA_TYPE_NCHAR:
// set offset // set offset
pCol->dataOff[numOfPoints] = pCol->len; pCol->dataOff[numOfRows] = pCol->len;
// Copy data // Copy data
memcpy(POINTER_SHIFT(pCol->pData, pCol->len), value, varDataTLen(value)); memcpy(POINTER_SHIFT(pCol->pData, pCol->len), value, varDataTLen(value));
// Update the length // Update the length
pCol->len += varDataTLen(value); pCol->len += varDataTLen(value);
break; break;
default: default:
ASSERT(pCol->len == TYPE_BYTES[pCol->type] * numOfPoints); ASSERT(pCol->len == TYPE_BYTES[pCol->type] * numOfRows);
memcpy(POINTER_SHIFT(pCol->pData, pCol->len), value, pCol->bytes); memcpy(POINTER_SHIFT(pCol->pData, pCol->len), value, pCol->bytes);
pCol->len += pCol->bytes; pCol->len += pCol->bytes;
break; break;
} }
} }
void dataColPopPoints(SDataCol *pCol, int pointsToPop, int numOfPoints) { void dataColPopPoints(SDataCol *pCol, int pointsToPop, int numOfRows) {
int pointsLeft = numOfPoints - pointsToPop; int pointsLeft = numOfRows - pointsToPop;
ASSERT(pointsLeft > 0); ASSERT(pointsLeft > 0);
...@@ -221,7 +221,7 @@ void dataColPopPoints(SDataCol *pCol, int pointsToPop, int numOfPoints) { ...@@ -221,7 +221,7 @@ void dataColPopPoints(SDataCol *pCol, int pointsToPop, int numOfPoints) {
memmove(pCol->pData, POINTER_SHIFT(pCol->pData, toffset), pCol->len); memmove(pCol->pData, POINTER_SHIFT(pCol->pData, toffset), pCol->len);
dataColSetOffset(pCol, pointsLeft); dataColSetOffset(pCol, pointsLeft);
} else { } else {
ASSERT(pCol->len == TYPE_BYTES[pCol->type] * numOfPoints); ASSERT(pCol->len == TYPE_BYTES[pCol->type] * numOfRows);
pCol->len = TYPE_BYTES[pCol->type] * pointsLeft; pCol->len = TYPE_BYTES[pCol->type] * pointsLeft;
memmove(pCol->pData, POINTER_SHIFT(pCol->pData, TYPE_BYTES[pCol->type] * pointsToPop), pCol->len); memmove(pCol->pData, POINTER_SHIFT(pCol->pData, TYPE_BYTES[pCol->type] * pointsToPop), pCol->len);
} }
...@@ -322,7 +322,7 @@ SDataCols *tdDupDataCols(SDataCols *pDataCols, bool keepData) { ...@@ -322,7 +322,7 @@ SDataCols *tdDupDataCols(SDataCols *pDataCols, bool keepData) {
pRet->numOfCols = pDataCols->numOfCols; pRet->numOfCols = pDataCols->numOfCols;
pRet->sversion = pDataCols->sversion; pRet->sversion = pDataCols->sversion;
if (keepData) pRet->numOfPoints = pDataCols->numOfPoints; if (keepData) pRet->numOfRows = pDataCols->numOfRows;
for (int i = 0; i < pDataCols->numOfCols; i++) { for (int i = 0; i < pDataCols->numOfCols; i++) {
pRet->cols[i].type = pDataCols->cols[i].type; pRet->cols[i].type = pDataCols->cols[i].type;
...@@ -352,7 +352,7 @@ SDataCols *tdDupDataCols(SDataCols *pDataCols, bool keepData) { ...@@ -352,7 +352,7 @@ SDataCols *tdDupDataCols(SDataCols *pDataCols, bool keepData) {
} }
void tdResetDataCols(SDataCols *pCols) { void tdResetDataCols(SDataCols *pCols) {
pCols->numOfPoints = 0; pCols->numOfRows = 0;
for (int i = 0; i < pCols->maxCols; i++) { for (int i = 0; i < pCols->maxCols; i++) {
dataColReset(pCols->cols + i); dataColReset(pCols->cols + i);
} }
...@@ -365,14 +365,14 @@ void tdAppendDataRowToDataCol(SDataRow row, SDataCols *pCols) { ...@@ -365,14 +365,14 @@ void tdAppendDataRowToDataCol(SDataRow row, SDataCols *pCols) {
SDataCol *pCol = pCols->cols + i; SDataCol *pCol = pCols->cols + i;
void * value = tdGetRowDataOfCol(row, pCol->type, pCol->offset); void * value = tdGetRowDataOfCol(row, pCol->type, pCol->offset);
dataColAppendVal(pCol, value, pCols->numOfPoints, pCols->maxPoints); dataColAppendVal(pCol, value, pCols->numOfRows, pCols->maxPoints);
} }
pCols->numOfPoints++; pCols->numOfRows++;
} }
// Pop pointsToPop points from the SDataCols // Pop pointsToPop points from the SDataCols
void tdPopDataColsPoints(SDataCols *pCols, int pointsToPop) { void tdPopDataColsPoints(SDataCols *pCols, int pointsToPop) {
int pointsLeft = pCols->numOfPoints - pointsToPop; int pointsLeft = pCols->numOfRows - pointsToPop;
if (pointsLeft <= 0) { if (pointsLeft <= 0) {
tdResetDataCols(pCols); tdResetDataCols(pCols);
return; return;
...@@ -380,14 +380,14 @@ void tdPopDataColsPoints(SDataCols *pCols, int pointsToPop) { ...@@ -380,14 +380,14 @@ void tdPopDataColsPoints(SDataCols *pCols, int pointsToPop) {
for (int iCol = 0; iCol < pCols->numOfCols; iCol++) { for (int iCol = 0; iCol < pCols->numOfCols; iCol++) {
SDataCol *pCol = pCols->cols + iCol; SDataCol *pCol = pCols->cols + iCol;
dataColPopPoints(pCol, pointsToPop, pCols->numOfPoints); dataColPopPoints(pCol, pointsToPop, pCols->numOfRows);
} }
pCols->numOfPoints = pointsLeft; pCols->numOfRows = pointsLeft;
} }
int tdMergeDataCols(SDataCols *target, SDataCols *source, int rowsToMerge) { int tdMergeDataCols(SDataCols *target, SDataCols *source, int rowsToMerge) {
ASSERT(rowsToMerge > 0 && rowsToMerge <= source->numOfPoints); ASSERT(rowsToMerge > 0 && rowsToMerge <= source->numOfRows);
ASSERT(target->numOfPoints + rowsToMerge <= target->maxPoints); ASSERT(target->numOfRows + rowsToMerge <= target->maxPoints);
ASSERT(target->numOfCols == source->numOfCols); ASSERT(target->numOfCols == source->numOfCols);
SDataCols *pTarget = NULL; SDataCols *pTarget = NULL;
...@@ -395,10 +395,10 @@ int tdMergeDataCols(SDataCols *target, SDataCols *source, int rowsToMerge) { ...@@ -395,10 +395,10 @@ int tdMergeDataCols(SDataCols *target, SDataCols *source, int rowsToMerge) {
if (dataColsKeyLast(target) < dataColsKeyFirst(source)) { // No overlap if (dataColsKeyLast(target) < dataColsKeyFirst(source)) { // No overlap
for (int i = 0; i < rowsToMerge; i++) { for (int i = 0; i < rowsToMerge; i++) {
for (int j = 0; j < source->numOfCols; j++) { for (int j = 0; j < source->numOfCols; j++) {
dataColAppendVal(target->cols + j, tdGetColDataOfRow(source->cols + j, i), target->numOfPoints, dataColAppendVal(target->cols + j, tdGetColDataOfRow(source->cols + j, i), target->numOfRows,
target->maxPoints); target->maxPoints);
} }
target->numOfPoints++; target->numOfRows++;
} }
} else { } else {
pTarget = tdDupDataCols(target, true); pTarget = tdDupDataCols(target, true);
...@@ -406,7 +406,7 @@ int tdMergeDataCols(SDataCols *target, SDataCols *source, int rowsToMerge) { ...@@ -406,7 +406,7 @@ int tdMergeDataCols(SDataCols *target, SDataCols *source, int rowsToMerge) {
int iter1 = 0; int iter1 = 0;
int iter2 = 0; int iter2 = 0;
tdMergeTwoDataCols(target, pTarget, &iter1, source, &iter2, pTarget->numOfPoints + rowsToMerge); tdMergeTwoDataCols(target, pTarget, &iter1, source, &iter2, pTarget->numOfRows + rowsToMerge);
} }
tdFreeDataCols(pTarget); tdFreeDataCols(pTarget);
...@@ -421,30 +421,30 @@ void tdMergeTwoDataCols(SDataCols *target, SDataCols *src1, int *iter1, SDataCol ...@@ -421,30 +421,30 @@ void tdMergeTwoDataCols(SDataCols *target, SDataCols *src1, int *iter1, SDataCol
// TODO: add resolve duplicate key here // TODO: add resolve duplicate key here
tdResetDataCols(target); tdResetDataCols(target);
while (target->numOfPoints < tRows) { while (target->numOfRows < tRows) {
if (*iter1 >= src1->numOfPoints && *iter2 >= src2->numOfPoints) break; if (*iter1 >= src1->numOfRows && *iter2 >= src2->numOfRows) break;
TSKEY key1 = (*iter1 >= src1->numOfPoints) ? INT64_MAX : ((TSKEY *)(src1->cols[0].pData))[*iter1]; TSKEY key1 = (*iter1 >= src1->numOfRows) ? INT64_MAX : ((TSKEY *)(src1->cols[0].pData))[*iter1];
TSKEY key2 = (*iter2 >= src2->numOfPoints) ? INT64_MAX : ((TSKEY *)(src2->cols[0].pData))[*iter2]; TSKEY key2 = (*iter2 >= src2->numOfRows) ? INT64_MAX : ((TSKEY *)(src2->cols[0].pData))[*iter2];
if (key1 <= key2) { if (key1 <= key2) {
for (int i = 0; i < src1->numOfCols; i++) { for (int i = 0; i < src1->numOfCols; i++) {
ASSERT(target->cols[i].type == src1->cols[i].type); ASSERT(target->cols[i].type == src1->cols[i].type);
dataColAppendVal(&(target->cols[i]), tdGetColDataOfRow(src1->cols + i, *iter1), target->numOfPoints, dataColAppendVal(&(target->cols[i]), tdGetColDataOfRow(src1->cols + i, *iter1), target->numOfRows,
target->maxPoints); target->maxPoints);
} }
target->numOfPoints++; target->numOfRows++;
(*iter1)++; (*iter1)++;
if (key1 == key2) (*iter2)++; if (key1 == key2) (*iter2)++;
} else { } else {
for (int i = 0; i < src2->numOfCols; i++) { for (int i = 0; i < src2->numOfCols; i++) {
ASSERT(target->cols[i].type == src2->cols[i].type); ASSERT(target->cols[i].type == src2->cols[i].type);
dataColAppendVal(&(target->cols[i]), tdGetColDataOfRow(src2->cols + i, *iter2), target->numOfPoints, dataColAppendVal(&(target->cols[i]), tdGetColDataOfRow(src2->cols + i, *iter2), target->numOfRows,
target->maxPoints); target->maxPoints);
} }
target->numOfPoints++; target->numOfRows++;
(*iter2)++; (*iter2)++;
} }
} }
......
...@@ -273,6 +273,46 @@ static void getStatics_d(const TSKEY *primaryKey, const void *pData, int32_t num ...@@ -273,6 +273,46 @@ static void getStatics_d(const TSKEY *primaryKey, const void *pData, int32_t num
#endif #endif
} }
static void getStatics_bin(const TSKEY *primaryKey, const void *pData, int32_t numOfRow, int64_t *min, int64_t *max,
int64_t *sum, int16_t *minIndex, int16_t *maxIndex, int16_t *numOfNull) {
const char* data = pData;
ASSERT(numOfRow <= INT16_MAX);
for (int32_t i = 0; i < numOfRow; ++i) {
if (isNull((const char*) varDataVal(data), TSDB_DATA_TYPE_BINARY)) {
(*numOfNull) += 1;
}
data += varDataLen(data);
}
*sum = 0;
*max = 0;
*min = 0;
*minIndex = 0;
*maxIndex = 0;
}
static void getStatics_nchr(const TSKEY *primaryKey, const void *pData, int32_t numOfRow, int64_t *min, int64_t *max,
int64_t *sum, int16_t *minIndex, int16_t *maxIndex, int16_t *numOfNull) {
const char* data = pData;
ASSERT(numOfRow <= INT16_MAX);
for (int32_t i = 0; i < numOfRow; ++i) {
if (isNull((const char*) varDataVal(data), TSDB_DATA_TYPE_NCHAR)) {
(*numOfNull) += 1;
}
data += varDataLen(data);
}
*sum = 0;
*max = 0;
*min = 0;
*minIndex = 0;
*maxIndex = 0;
}
tDataTypeDescriptor tDataTypeDesc[11] = { tDataTypeDescriptor tDataTypeDesc[11] = {
{TSDB_DATA_TYPE_NULL, 6, 1, "NOTYPE", NULL, NULL, NULL}, {TSDB_DATA_TYPE_NULL, 6, 1, "NOTYPE", NULL, NULL, NULL},
{TSDB_DATA_TYPE_BOOL, 4, CHAR_BYTES, "BOOL", tsCompressBool, tsDecompressBool, getStatics_i8}, {TSDB_DATA_TYPE_BOOL, 4, CHAR_BYTES, "BOOL", tsCompressBool, tsDecompressBool, getStatics_i8},
...@@ -282,9 +322,9 @@ tDataTypeDescriptor tDataTypeDesc[11] = { ...@@ -282,9 +322,9 @@ tDataTypeDescriptor tDataTypeDesc[11] = {
{TSDB_DATA_TYPE_BIGINT, 6, LONG_BYTES, "BIGINT", tsCompressBigint, tsDecompressBigint, getStatics_i64}, {TSDB_DATA_TYPE_BIGINT, 6, LONG_BYTES, "BIGINT", tsCompressBigint, tsDecompressBigint, getStatics_i64},
{TSDB_DATA_TYPE_FLOAT, 5, FLOAT_BYTES, "FLOAT", tsCompressFloat, tsDecompressFloat, getStatics_f}, {TSDB_DATA_TYPE_FLOAT, 5, FLOAT_BYTES, "FLOAT", tsCompressFloat, tsDecompressFloat, getStatics_f},
{TSDB_DATA_TYPE_DOUBLE, 6, DOUBLE_BYTES, "DOUBLE", tsCompressDouble, tsDecompressDouble, getStatics_d}, {TSDB_DATA_TYPE_DOUBLE, 6, DOUBLE_BYTES, "DOUBLE", tsCompressDouble, tsDecompressDouble, getStatics_d},
{TSDB_DATA_TYPE_BINARY, 6, 0, "BINARY", tsCompressString, tsDecompressString, NULL}, {TSDB_DATA_TYPE_BINARY, 6, 0, "BINARY", tsCompressString, tsDecompressString, getStatics_bin},
{TSDB_DATA_TYPE_TIMESTAMP, 9, LONG_BYTES, "TIMESTAMP", tsCompressTimestamp, tsDecompressTimestamp, getStatics_i64}, {TSDB_DATA_TYPE_TIMESTAMP, 9, LONG_BYTES, "TIMESTAMP", tsCompressTimestamp, tsDecompressTimestamp, getStatics_i64},
{TSDB_DATA_TYPE_NCHAR, 5, 8, "NCHAR", tsCompressString, tsDecompressString, NULL}, {TSDB_DATA_TYPE_NCHAR, 5, 8, "NCHAR", tsCompressString, tsDecompressString, getStatics_nchr},
}; };
char tTokenTypeSwitcher[13] = { char tTokenTypeSwitcher[13] = {
......
...@@ -200,7 +200,7 @@ bool getNeighborPoints(SQInfo *pQInfo, void *pMeterObj, SPointInterpoSupporter * ...@@ -200,7 +200,7 @@ bool getNeighborPoints(SQInfo *pQInfo, void *pMeterObj, SPointInterpoSupporter *
return false; return false;
} else { // prev has been located } else { // prev has been located
if (pQuery->fileId >= 0) { if (pQuery->fileId >= 0) {
pQuery->pos = pQuery->pBlock[pQuery->slot].numOfPoints - 1; pQuery->pos = pQuery->pBlock[pQuery->slot].numOfRows - 1;
getOneRowFromDataBlock(pRuntimeEnv, pPointInterpSupporter->pPrevPoint, pQuery->pos); getOneRowFromDataBlock(pRuntimeEnv, pPointInterpSupporter->pPrevPoint, pQuery->pos);
qTrace("QInfo:%p get prev data point, fileId:%d, slot:%d, pos:%d, pQuery->pos:%d", GET_QINFO_ADDR(pQuery), qTrace("QInfo:%p get prev data point, fileId:%d, slot:%d, pos:%d, pQuery->pos:%d", GET_QINFO_ADDR(pQuery),
...@@ -210,11 +210,11 @@ bool getNeighborPoints(SQInfo *pQInfo, void *pMeterObj, SPointInterpoSupporter * ...@@ -210,11 +210,11 @@ bool getNeighborPoints(SQInfo *pQInfo, void *pMeterObj, SPointInterpoSupporter *
assert(vnodeIsDatablockLoaded(pRuntimeEnv, pMeterObj, -1, true) == DISK_BLOCK_NO_NEED_TO_LOAD); assert(vnodeIsDatablockLoaded(pRuntimeEnv, pMeterObj, -1, true) == DISK_BLOCK_NO_NEED_TO_LOAD);
pBlock = &pRuntimeEnv->cacheBlock; pBlock = &pRuntimeEnv->cacheBlock;
pQuery->pos = pBlock->numOfPoints - 1; pQuery->pos = pBlock->numOfRows - 1;
getOneRowFromDataBlock(pRuntimeEnv, pPointInterpSupporter->pPrevPoint, pQuery->pos); getOneRowFromDataBlock(pRuntimeEnv, pPointInterpSupporter->pPrevPoint, pQuery->pos);
qTrace("QInfo:%p get prev data point, fileId:%d, slot:%d, pos:%d, pQuery->pos:%d", GET_QINFO_ADDR(pQuery), qTrace("QInfo:%p get prev data point, fileId:%d, slot:%d, pos:%d, pQuery->pos:%d", GET_QINFO_ADDR(pQuery),
pQuery->fileId, pQuery->slot, pBlock->numOfPoints - 1, pQuery->pos); pQuery->fileId, pQuery->slot, pBlock->numOfRows - 1, pQuery->pos);
} }
} }
} }
...@@ -603,9 +603,9 @@ static SWindowStatus *getTimeWindowResStatus(SWindowResInfo *pWindowResInfo, int ...@@ -603,9 +603,9 @@ static SWindowStatus *getTimeWindowResStatus(SWindowResInfo *pWindowResInfo, int
return &pWindowResInfo->pResult[slot].status; return &pWindowResInfo->pResult[slot].status;
} }
static int32_t getForwardStepsInBlock(int32_t numOfPoints, __block_search_fn_t searchFn, TSKEY ekey, int16_t pos, static int32_t getForwardStepsInBlock(int32_t numOfRows, __block_search_fn_t searchFn, TSKEY ekey, int16_t pos,
int16_t order, int64_t *pData) { int16_t order, int64_t *pData) {
int32_t endPos = searchFn((char *)pData, numOfPoints, ekey, order); int32_t endPos = searchFn((char *)pData, numOfRows, ekey, order);
int32_t forwardStep = 0; int32_t forwardStep = 0;
if (endPos >= 0) { if (endPos >= 0) {
...@@ -2329,7 +2329,7 @@ SArray *loadDataBlockOnDemand(SQueryRuntimeEnv *pRuntimeEnv, void* pQueryHandle, ...@@ -2329,7 +2329,7 @@ SArray *loadDataBlockOnDemand(SQueryRuntimeEnv *pRuntimeEnv, void* pQueryHandle,
int32_t binarySearchForKey(char *pValue, int num, TSKEY key, int order) { int32_t binarySearchForKey(char *pValue, int num, TSKEY key, int order) {
int32_t midPos = -1; int32_t midPos = -1;
int32_t numOfPoints; int32_t numOfRows;
if (num <= 0) { if (num <= 0) {
return -1; return -1;
...@@ -2348,8 +2348,8 @@ int32_t binarySearchForKey(char *pValue, int num, TSKEY key, int order) { ...@@ -2348,8 +2348,8 @@ int32_t binarySearchForKey(char *pValue, int num, TSKEY key, int order) {
if (key == keyList[firstPos]) return firstPos; if (key == keyList[firstPos]) return firstPos;
if (key < keyList[firstPos]) return firstPos - 1; if (key < keyList[firstPos]) return firstPos - 1;
numOfPoints = lastPos - firstPos + 1; numOfRows = lastPos - firstPos + 1;
midPos = (numOfPoints >> 1) + firstPos; midPos = (numOfRows >> 1) + firstPos;
if (key < keyList[midPos]) { if (key < keyList[midPos]) {
lastPos = midPos - 1; lastPos = midPos - 1;
...@@ -2374,8 +2374,8 @@ int32_t binarySearchForKey(char *pValue, int num, TSKEY key, int order) { ...@@ -2374,8 +2374,8 @@ int32_t binarySearchForKey(char *pValue, int num, TSKEY key, int order) {
return lastPos; return lastPos;
} }
numOfPoints = lastPos - firstPos + 1; numOfRows = lastPos - firstPos + 1;
midPos = (numOfPoints >> 1) + firstPos; midPos = (numOfRows >> 1) + firstPos;
if (key < keyList[midPos]) { if (key < keyList[midPos]) {
lastPos = midPos - 1; lastPos = midPos - 1;
......
...@@ -74,7 +74,7 @@ void tsdbCloseMetaFile(SMetaFile *mfh); ...@@ -74,7 +74,7 @@ void tsdbCloseMetaFile(SMetaFile *mfh);
typedef struct { typedef struct {
TSKEY keyFirst; TSKEY keyFirst;
TSKEY keyLast; TSKEY keyLast;
int32_t numOfPoints; int32_t numOfRows;
void * pData; void * pData;
} SMemTable; } SMemTable;
...@@ -173,7 +173,7 @@ typedef struct { ...@@ -173,7 +173,7 @@ typedef struct {
typedef struct { typedef struct {
TSKEY keyFirst; TSKEY keyFirst;
TSKEY keyLast; TSKEY keyLast;
int64_t numOfPoints; int64_t numOfRows;
SList * list; SList * list;
} SCacheMem; } SCacheMem;
...@@ -294,7 +294,7 @@ typedef struct { ...@@ -294,7 +294,7 @@ typedef struct {
int64_t last : 1; // If the block in data file or last file int64_t last : 1; // If the block in data file or last file
int64_t offset : 63; // Offset of data block or sub-block index depending on numOfSubBlocks int64_t offset : 63; // Offset of data block or sub-block index depending on numOfSubBlocks
int32_t algorithm : 8; // Compression algorithm int32_t algorithm : 8; // Compression algorithm
int32_t numOfPoints : 24; // Number of total points int32_t numOfRows : 24; // Number of total points
int32_t sversion; // Schema version int32_t sversion; // Schema version
int32_t len; // Data block length or nothing int32_t len; // Data block length or nothing
int16_t numOfSubBlocks; // Number of sub-blocks; int16_t numOfSubBlocks; // Number of sub-blocks;
......
...@@ -82,7 +82,7 @@ void *tsdbAllocFromCache(STsdbCache *pCache, int bytes, TSKEY key) { ...@@ -82,7 +82,7 @@ void *tsdbAllocFromCache(STsdbCache *pCache, int bytes, TSKEY key) {
memset(ptr, 0, bytes); memset(ptr, 0, bytes);
if (key < pCache->mem->keyFirst) pCache->mem->keyFirst = key; if (key < pCache->mem->keyFirst) pCache->mem->keyFirst = key;
if (key > pCache->mem->keyLast) pCache->mem->keyLast = key; if (key > pCache->mem->keyLast) pCache->mem->keyLast = key;
pCache->mem->numOfPoints++; pCache->mem->numOfRows++;
return ptr; return ptr;
} }
...@@ -127,7 +127,7 @@ static int tsdbAllocBlockFromPool(STsdbCache *pCache) { ...@@ -127,7 +127,7 @@ static int tsdbAllocBlockFromPool(STsdbCache *pCache) {
if (pCache->mem == NULL) return -1; if (pCache->mem == NULL) return -1;
pCache->mem->keyFirst = INT64_MAX; pCache->mem->keyFirst = INT64_MAX;
pCache->mem->keyLast = 0; pCache->mem->keyLast = 0;
pCache->mem->numOfPoints = 0; pCache->mem->numOfRows = 0;
pCache->mem->list = tdListNew(sizeof(STsdbCacheBlock *)); pCache->mem->list = tdListNew(sizeof(STsdbCacheBlock *));
} }
......
...@@ -233,10 +233,10 @@ SFileGroup *tsdbGetFileGroupNext(SFileGroupIter *pIter) { ...@@ -233,10 +233,10 @@ SFileGroup *tsdbGetFileGroupNext(SFileGroupIter *pIter) {
// SCompBlock *pBlock = pStartBlock; // SCompBlock *pBlock = pStartBlock;
// for (int i = 0; i < numOfBlocks; i++) { // for (int i = 0; i < numOfBlocks; i++) {
// if (tsdbLoadCompCols(pFile, pBlock, (void *)pCompData) < 0) return -1; // if (tsdbLoadCompCols(pFile, pBlock, (void *)pCompData) < 0) return -1;
// pCols->numOfPoints += (pCompData->cols[0].len / 8); // pCols->numOfRows += (pCompData->cols[0].len / 8);
// for (int iCol = 0; iCol < pBlock->numOfCols; iCol++) { // for (int iCol = 0; iCol < pBlock->numOfCols; iCol++) {
// SCompCol *pCompCol = &(pCompData->cols[iCol]); // SCompCol *pCompCol = &(pCompData->cols[iCol]);
// // pCols->numOfPoints += pBlock->numOfPoints; // // pCols->numOfRows += pBlock->numOfRows;
// int k = 0; // int k = 0;
// for (; k < pCols->numOfCols; k++) { // for (; k < pCols->numOfCols; k++) {
// if (pCompCol->colId == pCols->cols[k].colId) break; // if (pCompCol->colId == pCols->cols[k].colId) break;
......
...@@ -830,7 +830,7 @@ static int32_t tdInsertRowToTable(STsdbRepo *pRepo, SDataRow row, STable *pTable ...@@ -830,7 +830,7 @@ static int32_t tdInsertRowToTable(STsdbRepo *pRepo, SDataRow row, STable *pTable
tSkipListNewNodeInfo(pTable->mem->pData, &level, &headSize); tSkipListNewNodeInfo(pTable->mem->pData, &level, &headSize);
TSKEY key = dataRowKey(row); TSKEY key = dataRowKey(row);
// printf("insert:%lld, size:%d\n", key, pTable->mem->numOfPoints); // printf("insert:%lld, size:%d\n", key, pTable->mem->numOfRows);
// Copy row into the memory // Copy row into the memory
SSkipListNode *pNode = tsdbAllocFromCache(pRepo->tsdbCache, headSize + dataRowLen(row), key); SSkipListNode *pNode = tsdbAllocFromCache(pRepo->tsdbCache, headSize + dataRowLen(row), key);
...@@ -854,7 +854,7 @@ static int32_t tdInsertRowToTable(STsdbRepo *pRepo, SDataRow row, STable *pTable ...@@ -854,7 +854,7 @@ static int32_t tdInsertRowToTable(STsdbRepo *pRepo, SDataRow row, STable *pTable
if (key < pTable->mem->keyFirst) pTable->mem->keyFirst = key; if (key < pTable->mem->keyFirst) pTable->mem->keyFirst = key;
if (key > pTable->lastKey) pTable->lastKey = key; if (key > pTable->lastKey) pTable->lastKey = key;
pTable->mem->numOfPoints = tSkipListGetSize(pTable->mem->pData); pTable->mem->numOfRows = tSkipListGetSize(pTable->mem->pData);
tsdbTrace("vgId:%d, tid:%d, uid:%" PRId64 ", table:%s a row is inserted to table! key:%" PRId64, pRepo->config.tsdbId, tsdbTrace("vgId:%d, tid:%d, uid:%" PRId64 ", table:%s a row is inserted to table! key:%" PRId64, pRepo->config.tsdbId,
pTable->tableId.tid, pTable->tableId.uid, varDataVal(pTable->name), dataRowKey(row)); pTable->tableId.tid, pTable->tableId.uid, varDataVal(pTable->name), dataRowKey(row));
...@@ -1063,7 +1063,7 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SSkipListIterator **iters ...@@ -1063,7 +1063,7 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SSkipListIterator **iters
while (true) { while (true) {
int rowsRead = tsdbReadRowsFromCache(pIter, maxKey, maxRowsToRead, pDataCols); int rowsRead = tsdbReadRowsFromCache(pIter, maxKey, maxRowsToRead, pDataCols);
assert(rowsRead >= 0); assert(rowsRead >= 0);
if (pDataCols->numOfPoints == 0) break; if (pDataCols->numOfRows == 0) break;
nLoop++; nLoop++;
ASSERT(dataColsKeyFirst(pDataCols) >= minKey && dataColsKeyFirst(pDataCols) <= maxKey); ASSERT(dataColsKeyFirst(pDataCols) >= minKey && dataColsKeyFirst(pDataCols) <= maxKey);
...@@ -1072,13 +1072,13 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SSkipListIterator **iters ...@@ -1072,13 +1072,13 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SSkipListIterator **iters
int rowsWritten = tsdbWriteDataBlock(pHelper, pDataCols); int rowsWritten = tsdbWriteDataBlock(pHelper, pDataCols);
ASSERT(rowsWritten != 0); ASSERT(rowsWritten != 0);
if (rowsWritten < 0) goto _err; if (rowsWritten < 0) goto _err;
ASSERT(rowsWritten <= pDataCols->numOfPoints); ASSERT(rowsWritten <= pDataCols->numOfRows);
tdPopDataColsPoints(pDataCols, rowsWritten); tdPopDataColsPoints(pDataCols, rowsWritten);
maxRowsToRead = pCfg->maxRowsPerFileBlock * 4 / 5 - pDataCols->numOfPoints; maxRowsToRead = pCfg->maxRowsPerFileBlock * 4 / 5 - pDataCols->numOfRows;
} }
ASSERT(pDataCols->numOfPoints == 0); ASSERT(pDataCols->numOfRows == 0);
// Move the last block to the new .l file if neccessary // Move the last block to the new .l file if neccessary
if (tsdbMoveLastBlockIfNeccessary(pHelper) < 0) { if (tsdbMoveLastBlockIfNeccessary(pHelper) < 0) {
......
...@@ -307,7 +307,7 @@ void tsdbSetHelperTable(SRWHelper *pHelper, STable *pTable, STsdbRepo *pRepo) { ...@@ -307,7 +307,7 @@ void tsdbSetHelperTable(SRWHelper *pHelper, STable *pTable, STsdbRepo *pRepo) {
*/ */
int tsdbWriteDataBlock(SRWHelper *pHelper, SDataCols *pDataCols) { int tsdbWriteDataBlock(SRWHelper *pHelper, SDataCols *pDataCols) {
ASSERT(TSDB_HELPER_TYPE(pHelper) == TSDB_WRITE_HELPER); ASSERT(TSDB_HELPER_TYPE(pHelper) == TSDB_WRITE_HELPER);
ASSERT(pDataCols->numOfPoints > 0); ASSERT(pDataCols->numOfRows > 0);
SCompBlock compBlock; SCompBlock compBlock;
int rowsToWrite = 0; int rowsToWrite = 0;
...@@ -322,7 +322,7 @@ int tsdbWriteDataBlock(SRWHelper *pHelper, SDataCols *pDataCols) { ...@@ -322,7 +322,7 @@ int tsdbWriteDataBlock(SRWHelper *pHelper, SDataCols *pDataCols) {
if (pIdx->offset == 0 || (!pIdx->hasLast && keyFirst > pIdx->maxKey)) { // Just append as a super block if (pIdx->offset == 0 || (!pIdx->hasLast && keyFirst > pIdx->maxKey)) { // Just append as a super block
ASSERT(pHelper->hasOldLastBlock == false); ASSERT(pHelper->hasOldLastBlock == false);
rowsToWrite = pDataCols->numOfPoints; rowsToWrite = pDataCols->numOfRows;
SFile *pWFile = NULL; SFile *pWFile = NULL;
bool isLast = false; bool isLast = false;
...@@ -380,10 +380,10 @@ int tsdbMoveLastBlockIfNeccessary(SRWHelper *pHelper) { ...@@ -380,10 +380,10 @@ int tsdbMoveLastBlockIfNeccessary(SRWHelper *pHelper) {
if (pCompBlock->numOfSubBlocks > 1) { if (pCompBlock->numOfSubBlocks > 1) {
if (tsdbLoadBlockData(pHelper, blockAtIdx(pHelper, pIdx->numOfBlocks - 1), NULL) < 0) return -1; if (tsdbLoadBlockData(pHelper, blockAtIdx(pHelper, pIdx->numOfBlocks - 1), NULL) < 0) return -1;
ASSERT(pHelper->pDataCols[0]->numOfPoints > 0 && ASSERT(pHelper->pDataCols[0]->numOfRows > 0 &&
pHelper->pDataCols[0]->numOfPoints < pHelper->config.minRowsPerFileBlock); pHelper->pDataCols[0]->numOfRows < pHelper->config.minRowsPerFileBlock);
if (tsdbWriteBlockToFile(pHelper, &(pHelper->files.nLastF), pHelper->pDataCols[0], if (tsdbWriteBlockToFile(pHelper, &(pHelper->files.nLastF), pHelper->pDataCols[0],
pHelper->pDataCols[0]->numOfPoints, &compBlock, true, true) < 0) pHelper->pDataCols[0]->numOfRows, &compBlock, true, true) < 0)
return -1; return -1;
if (tsdbUpdateSuperBlock(pHelper, &compBlock, pIdx->numOfBlocks - 1) < 0) return -1; if (tsdbUpdateSuperBlock(pHelper, &compBlock, pIdx->numOfBlocks - 1) < 0) return -1;
...@@ -625,13 +625,13 @@ int tsdbLoadBlockDataCols(SRWHelper *pHelper, SDataCols *pDataCols, int blkIdx, ...@@ -625,13 +625,13 @@ int tsdbLoadBlockDataCols(SRWHelper *pHelper, SDataCols *pDataCols, int blkIdx,
for (int i = 1; i < numOfSubBlocks; i++) { for (int i = 1; i < numOfSubBlocks; i++) {
pStartBlock++; pStartBlock++;
if (tsdbLoadSingleBlockDataCols(pHelper, pStartBlock, colIds, numOfColIds, pHelper->pDataCols[1]) < 0) return -1; if (tsdbLoadSingleBlockDataCols(pHelper, pStartBlock, colIds, numOfColIds, pHelper->pDataCols[1]) < 0) return -1;
tdMergeDataCols(pDataCols, pHelper->pDataCols[1], pHelper->pDataCols[1]->numOfPoints); tdMergeDataCols(pDataCols, pHelper->pDataCols[1], pHelper->pDataCols[1]->numOfRows);
} }
return 0; return 0;
} }
static int tsdbCheckAndDecodeColumnData(SDataCol *pDataCol, char *content, int32_t len, int8_t comp, int numOfPoints, static int tsdbCheckAndDecodeColumnData(SDataCol *pDataCol, char *content, int32_t len, int8_t comp, int numOfRows,
int maxPoints, char *buffer, int bufferSize) { int maxPoints, char *buffer, int bufferSize) {
// Verify by checksum // Verify by checksum
if (!taosCheckChecksumWhole((uint8_t *)content, len)) return -1; if (!taosCheckChecksumWhole((uint8_t *)content, len)) return -1;
...@@ -640,16 +640,16 @@ static int tsdbCheckAndDecodeColumnData(SDataCol *pDataCol, char *content, int32 ...@@ -640,16 +640,16 @@ static int tsdbCheckAndDecodeColumnData(SDataCol *pDataCol, char *content, int32
if (comp) { if (comp) {
// // Need to decompress // // Need to decompress
pDataCol->len = (*(tDataTypeDesc[pDataCol->type].decompFunc))( pDataCol->len = (*(tDataTypeDesc[pDataCol->type].decompFunc))(
content, len - sizeof(TSCKSUM), numOfPoints, pDataCol->pData, pDataCol->spaceSize, comp, buffer, bufferSize); content, len - sizeof(TSCKSUM), numOfRows, pDataCol->pData, pDataCol->spaceSize, comp, buffer, bufferSize);
if (pDataCol->type == TSDB_DATA_TYPE_BINARY || pDataCol->type == TSDB_DATA_TYPE_NCHAR) { if (pDataCol->type == TSDB_DATA_TYPE_BINARY || pDataCol->type == TSDB_DATA_TYPE_NCHAR) {
dataColSetOffset(pDataCol, numOfPoints); dataColSetOffset(pDataCol, numOfRows);
} }
} else { } else {
// No need to decompress, just memcpy it // No need to decompress, just memcpy it
pDataCol->len = len - sizeof(TSCKSUM); pDataCol->len = len - sizeof(TSCKSUM);
memcpy(pDataCol->pData, content, pDataCol->len); memcpy(pDataCol->pData, content, pDataCol->len);
if (pDataCol->type == TSDB_DATA_TYPE_BINARY || pDataCol->type == TSDB_DATA_TYPE_NCHAR) { if (pDataCol->type == TSDB_DATA_TYPE_BINARY || pDataCol->type == TSDB_DATA_TYPE_NCHAR) {
dataColSetOffset(pDataCol, numOfPoints); dataColSetOffset(pDataCol, numOfRows);
} }
} }
return 0; return 0;
...@@ -673,7 +673,7 @@ static int tsdbLoadBlockDataImpl(SRWHelper *pHelper, SCompBlock *pCompBlock, SDa ...@@ -673,7 +673,7 @@ static int tsdbLoadBlockDataImpl(SRWHelper *pHelper, SCompBlock *pCompBlock, SDa
int32_t tsize = sizeof(SCompData) + sizeof(SCompCol) * pCompBlock->numOfCols + sizeof(TSCKSUM); int32_t tsize = sizeof(SCompData) + sizeof(SCompCol) * pCompBlock->numOfCols + sizeof(TSCKSUM);
if (!taosCheckChecksumWhole((uint8_t *)pCompData, tsize)) goto _err; if (!taosCheckChecksumWhole((uint8_t *)pCompData, tsize)) goto _err;
pDataCols->numOfPoints = pCompBlock->numOfPoints; pDataCols->numOfRows = pCompBlock->numOfRows;
// Recover the data // Recover the data
int ccol = 0; int ccol = 0;
...@@ -682,7 +682,7 @@ static int tsdbLoadBlockDataImpl(SRWHelper *pHelper, SCompBlock *pCompBlock, SDa ...@@ -682,7 +682,7 @@ static int tsdbLoadBlockDataImpl(SRWHelper *pHelper, SCompBlock *pCompBlock, SDa
SDataCol *pDataCol = &(pDataCols->cols[dcol]); SDataCol *pDataCol = &(pDataCols->cols[dcol]);
if (ccol >= pCompData->numOfCols) { if (ccol >= pCompData->numOfCols) {
// Set current column as NULL and forward // Set current column as NULL and forward
dataColSetNEleNull(pDataCol, pCompBlock->numOfPoints, pDataCols->maxPoints); dataColSetNEleNull(pDataCol, pCompBlock->numOfRows, pDataCols->maxPoints);
dcol++; dcol++;
continue; continue;
} }
...@@ -691,15 +691,15 @@ static int tsdbLoadBlockDataImpl(SRWHelper *pHelper, SCompBlock *pCompBlock, SDa ...@@ -691,15 +691,15 @@ static int tsdbLoadBlockDataImpl(SRWHelper *pHelper, SCompBlock *pCompBlock, SDa
if (pCompCol->colId == pDataCol->colId) { if (pCompCol->colId == pDataCol->colId) {
if (pCompBlock->algorithm == TWO_STAGE_COMP) { if (pCompBlock->algorithm == TWO_STAGE_COMP) {
int zsize = pDataCol->bytes * pCompBlock->numOfPoints + COMP_OVERFLOW_BYTES; int zsize = pDataCol->bytes * pCompBlock->numOfRows + COMP_OVERFLOW_BYTES;
if (pCompCol->type == TSDB_DATA_TYPE_BINARY || pCompCol->type == TSDB_DATA_TYPE_NCHAR) { if (pCompCol->type == TSDB_DATA_TYPE_BINARY || pCompCol->type == TSDB_DATA_TYPE_NCHAR) {
zsize += (sizeof(VarDataLenT) * pCompBlock->numOfPoints); zsize += (sizeof(VarDataLenT) * pCompBlock->numOfRows);
} }
pHelper->compBuffer = trealloc(pHelper->compBuffer, zsize); pHelper->compBuffer = trealloc(pHelper->compBuffer, zsize);
if (pHelper->compBuffer == NULL) goto _err; if (pHelper->compBuffer == NULL) goto _err;
} }
if (tsdbCheckAndDecodeColumnData(pDataCol, (char *)pCompData + tsize + pCompCol->offset, pCompCol->len, if (tsdbCheckAndDecodeColumnData(pDataCol, (char *)pCompData + tsize + pCompCol->offset, pCompCol->len,
pCompBlock->algorithm, pCompBlock->numOfPoints, pDataCols->maxPoints, pCompBlock->algorithm, pCompBlock->numOfRows, pDataCols->maxPoints,
pHelper->compBuffer, tsizeof(pHelper->compBuffer)) < 0) pHelper->compBuffer, tsizeof(pHelper->compBuffer)) < 0)
goto _err; goto _err;
dcol++; dcol++;
...@@ -708,7 +708,7 @@ static int tsdbLoadBlockDataImpl(SRWHelper *pHelper, SCompBlock *pCompBlock, SDa ...@@ -708,7 +708,7 @@ static int tsdbLoadBlockDataImpl(SRWHelper *pHelper, SCompBlock *pCompBlock, SDa
ccol++; ccol++;
} else { } else {
// Set current column as NULL and forward // Set current column as NULL and forward
dataColSetNEleNull(pDataCol, pCompBlock->numOfPoints, pDataCols->maxPoints); dataColSetNEleNull(pDataCol, pCompBlock->numOfRows, pDataCols->maxPoints);
dcol++; dcol++;
} }
} }
...@@ -732,7 +732,7 @@ int tsdbLoadBlockData(SRWHelper *pHelper, SCompBlock *pCompBlock, SDataCols *tar ...@@ -732,7 +732,7 @@ int tsdbLoadBlockData(SRWHelper *pHelper, SCompBlock *pCompBlock, SDataCols *tar
tdResetDataCols(pHelper->pDataCols[1]); tdResetDataCols(pHelper->pDataCols[1]);
pCompBlock++; pCompBlock++;
if (tsdbLoadBlockDataImpl(pHelper, pCompBlock, pHelper->pDataCols[1]) < 0) goto _err; if (tsdbLoadBlockDataImpl(pHelper, pCompBlock, pHelper->pDataCols[1]) < 0) goto _err;
if (tdMergeDataCols(pHelper->pDataCols[0], pHelper->pDataCols[1], pHelper->pDataCols[1]->numOfPoints) < 0) goto _err; if (tdMergeDataCols(pHelper->pDataCols[0], pHelper->pDataCols[1], pHelper->pDataCols[1]->numOfRows) < 0) goto _err;
} }
// if (target) TODO // if (target) TODO
...@@ -753,7 +753,7 @@ static bool tsdbShouldCreateNewLast(SRWHelper *pHelper) { ...@@ -753,7 +753,7 @@ static bool tsdbShouldCreateNewLast(SRWHelper *pHelper) {
static int tsdbWriteBlockToFile(SRWHelper *pHelper, SFile *pFile, SDataCols *pDataCols, int rowsToWrite, SCompBlock *pCompBlock, static int tsdbWriteBlockToFile(SRWHelper *pHelper, SFile *pFile, SDataCols *pDataCols, int rowsToWrite, SCompBlock *pCompBlock,
bool isLast, bool isSuperBlock) { bool isLast, bool isSuperBlock) {
ASSERT(rowsToWrite > 0 && rowsToWrite <= pDataCols->numOfPoints && ASSERT(rowsToWrite > 0 && rowsToWrite <= pDataCols->numOfRows &&
rowsToWrite <= pHelper->config.maxRowsPerFileBlock); rowsToWrite <= pHelper->config.maxRowsPerFileBlock);
SCompData *pCompData = (SCompData *)(pHelper->pBuffer); SCompData *pCompData = (SCompData *)(pHelper->pBuffer);
...@@ -840,7 +840,7 @@ static int tsdbWriteBlockToFile(SRWHelper *pHelper, SFile *pFile, SDataCols *pDa ...@@ -840,7 +840,7 @@ static int tsdbWriteBlockToFile(SRWHelper *pHelper, SFile *pFile, SDataCols *pDa
pCompBlock->last = isLast; pCompBlock->last = isLast;
pCompBlock->offset = offset; pCompBlock->offset = offset;
pCompBlock->algorithm = pHelper->config.compress; pCompBlock->algorithm = pHelper->config.compress;
pCompBlock->numOfPoints = rowsToWrite; pCompBlock->numOfRows = rowsToWrite;
pCompBlock->sversion = pHelper->tableInfo.sversion; pCompBlock->sversion = pHelper->tableInfo.sversion;
pCompBlock->len = (int32_t)lsize; pCompBlock->len = (int32_t)lsize;
pCompBlock->numOfSubBlocks = isSuperBlock ? 1 : 0; pCompBlock->numOfSubBlocks = isSuperBlock ? 1 : 0;
...@@ -877,7 +877,7 @@ static int tsdbMergeDataWithBlock(SRWHelper *pHelper, int blkIdx, SDataCols *pDa ...@@ -877,7 +877,7 @@ static int tsdbMergeDataWithBlock(SRWHelper *pHelper, int blkIdx, SDataCols *pDa
int rowsWritten = 0; int rowsWritten = 0;
SCompBlock compBlock = {0}; SCompBlock compBlock = {0};
ASSERT(pDataCols->numOfPoints > 0); ASSERT(pDataCols->numOfRows > 0);
TSKEY keyFirst = dataColsKeyFirst(pDataCols); TSKEY keyFirst = dataColsKeyFirst(pDataCols);
SCompIdx *pIdx = pHelper->pCompIdx + pHelper->tableInfo.tid; SCompIdx *pIdx = pHelper->pCompIdx + pHelper->tableInfo.tid;
...@@ -889,32 +889,32 @@ static int tsdbMergeDataWithBlock(SRWHelper *pHelper, int blkIdx, SDataCols *pDa ...@@ -889,32 +889,32 @@ static int tsdbMergeDataWithBlock(SRWHelper *pHelper, int blkIdx, SDataCols *pDa
// ASSERT(compareKeyBlock((void *)&keyFirst, (void *)pCompBlock) == 0); // ASSERT(compareKeyBlock((void *)&keyFirst, (void *)pCompBlock) == 0);
if (keyFirst > blockAtIdx(pHelper, blkIdx)->keyLast) { // Merge with the last block by append if (keyFirst > blockAtIdx(pHelper, blkIdx)->keyLast) { // Merge with the last block by append
ASSERT(blockAtIdx(pHelper, blkIdx)->numOfPoints < pHelper->config.minRowsPerFileBlock && blkIdx == pIdx->numOfBlocks-1); ASSERT(blockAtIdx(pHelper, blkIdx)->numOfRows < pHelper->config.minRowsPerFileBlock && blkIdx == pIdx->numOfBlocks-1);
int defaultRowsToWrite = pHelper->config.maxRowsPerFileBlock * 4 / 5; // TODO: make a interface int defaultRowsToWrite = pHelper->config.maxRowsPerFileBlock * 4 / 5; // TODO: make a interface
rowsWritten = MIN((defaultRowsToWrite - blockAtIdx(pHelper, blkIdx)->numOfPoints), pDataCols->numOfPoints); rowsWritten = MIN((defaultRowsToWrite - blockAtIdx(pHelper, blkIdx)->numOfRows), pDataCols->numOfRows);
if ((blockAtIdx(pHelper, blkIdx)->numOfSubBlocks < TSDB_MAX_SUBBLOCKS) && if ((blockAtIdx(pHelper, blkIdx)->numOfSubBlocks < TSDB_MAX_SUBBLOCKS) &&
(blockAtIdx(pHelper, blkIdx)->numOfPoints + rowsWritten < pHelper->config.minRowsPerFileBlock) && (pHelper->files.nLastF.fd) > 0) { (blockAtIdx(pHelper, blkIdx)->numOfRows + rowsWritten < pHelper->config.minRowsPerFileBlock) && (pHelper->files.nLastF.fd) > 0) {
if (tsdbWriteBlockToFile(pHelper, &(pHelper->files.lastF), pDataCols, rowsWritten, &compBlock, true, false) < 0) if (tsdbWriteBlockToFile(pHelper, &(pHelper->files.lastF), pDataCols, rowsWritten, &compBlock, true, false) < 0)
goto _err; goto _err;
if (tsdbAddSubBlock(pHelper, &compBlock, blkIdx, rowsWritten) < 0) goto _err; if (tsdbAddSubBlock(pHelper, &compBlock, blkIdx, rowsWritten) < 0) goto _err;
} else { } else {
// Load // Load
if (tsdbLoadBlockData(pHelper, blockAtIdx(pHelper, blkIdx), NULL) < 0) goto _err; if (tsdbLoadBlockData(pHelper, blockAtIdx(pHelper, blkIdx), NULL) < 0) goto _err;
ASSERT(pHelper->pDataCols[0]->numOfPoints == blockAtIdx(pHelper, blkIdx)->numOfPoints); ASSERT(pHelper->pDataCols[0]->numOfRows == blockAtIdx(pHelper, blkIdx)->numOfRows);
// Merge // Merge
if (tdMergeDataCols(pHelper->pDataCols[0], pDataCols, rowsWritten) < 0) goto _err; if (tdMergeDataCols(pHelper->pDataCols[0], pDataCols, rowsWritten) < 0) goto _err;
// Write // Write
SFile *pWFile = NULL; SFile *pWFile = NULL;
bool isLast = false; bool isLast = false;
if (pHelper->pDataCols[0]->numOfPoints >= pHelper->config.minRowsPerFileBlock) { if (pHelper->pDataCols[0]->numOfRows >= pHelper->config.minRowsPerFileBlock) {
pWFile = &(pHelper->files.dataF); pWFile = &(pHelper->files.dataF);
} else { } else {
isLast = true; isLast = true;
pWFile = (pHelper->files.nLastF.fd > 0) ? &(pHelper->files.nLastF) : &(pHelper->files.lastF); pWFile = (pHelper->files.nLastF.fd > 0) ? &(pHelper->files.nLastF) : &(pHelper->files.lastF);
} }
if (tsdbWriteBlockToFile(pHelper, pWFile, pHelper->pDataCols[0], if (tsdbWriteBlockToFile(pHelper, pWFile, pHelper->pDataCols[0],
pHelper->pDataCols[0]->numOfPoints, &compBlock, isLast, true) < 0) pHelper->pDataCols[0]->numOfRows, &compBlock, isLast, true) < 0)
goto _err; goto _err;
if (tsdbUpdateSuperBlock(pHelper, &compBlock, blkIdx) < 0) goto _err; if (tsdbUpdateSuperBlock(pHelper, &compBlock, blkIdx) < 0) goto _err;
} }
...@@ -931,7 +931,7 @@ static int tsdbMergeDataWithBlock(SRWHelper *pHelper, int blkIdx, SDataCols *pDa ...@@ -931,7 +931,7 @@ static int tsdbMergeDataWithBlock(SRWHelper *pHelper, int blkIdx, SDataCols *pDa
// rows1: number of rows must merge in this block // rows1: number of rows must merge in this block
int rows1 = tsdbGetRowsInRange(pDataCols, blockAtIdx(pHelper, blkIdx)->keyFirst, blockAtIdx(pHelper, blkIdx)->keyLast); int rows1 = tsdbGetRowsInRange(pDataCols, blockAtIdx(pHelper, blkIdx)->keyFirst, blockAtIdx(pHelper, blkIdx)->keyLast);
// rows2: max nuber of rows the block can have more // rows2: max nuber of rows the block can have more
int rows2 = pHelper->config.maxRowsPerFileBlock - blockAtIdx(pHelper, blkIdx)->numOfPoints; int rows2 = pHelper->config.maxRowsPerFileBlock - blockAtIdx(pHelper, blkIdx)->numOfRows;
// rows3: number of rows between this block and the next block // rows3: number of rows between this block and the next block
int rows3 = tsdbGetRowsInRange(pDataCols, blockAtIdx(pHelper, blkIdx)->keyFirst, keyLimit); int rows3 = tsdbGetRowsInRange(pDataCols, blockAtIdx(pHelper, blkIdx)->keyFirst, keyLimit);
...@@ -939,7 +939,7 @@ static int tsdbMergeDataWithBlock(SRWHelper *pHelper, int blkIdx, SDataCols *pDa ...@@ -939,7 +939,7 @@ static int tsdbMergeDataWithBlock(SRWHelper *pHelper, int blkIdx, SDataCols *pDa
if ((rows2 >= rows1) && if ((rows2 >= rows1) &&
(( blockAtIdx(pHelper, blkIdx)->last) || (( blockAtIdx(pHelper, blkIdx)->last) ||
((rows1 + blockAtIdx(pHelper, blkIdx)->numOfPoints < pHelper->config.minRowsPerFileBlock) && (pHelper->files.nLastF.fd < 0)))) { ((rows1 + blockAtIdx(pHelper, blkIdx)->numOfRows < pHelper->config.minRowsPerFileBlock) && (pHelper->files.nLastF.fd < 0)))) {
rowsWritten = rows1; rowsWritten = rows1;
bool isLast = false; bool isLast = false;
SFile *pFile = NULL; SFile *pFile = NULL;
...@@ -965,11 +965,11 @@ static int tsdbMergeDataWithBlock(SRWHelper *pHelper, int blkIdx, SDataCols *pDa ...@@ -965,11 +965,11 @@ static int tsdbMergeDataWithBlock(SRWHelper *pHelper, int blkIdx, SDataCols *pDa
int round = 0; int round = 0;
// tdResetDataCols(pHelper->pDataCols[1]); // tdResetDataCols(pHelper->pDataCols[1]);
while (true) { while (true) {
if (iter1 >= pHelper->pDataCols[0]->numOfPoints && iter2 >= rows3) break; if (iter1 >= pHelper->pDataCols[0]->numOfRows && iter2 >= rows3) break;
tdMergeTwoDataCols(pHelper->pDataCols[1], pHelper->pDataCols[0], &iter1, pDataCols, &iter2, pHelper->config.maxRowsPerFileBlock * 4 / 5); tdMergeTwoDataCols(pHelper->pDataCols[1], pHelper->pDataCols[0], &iter1, pDataCols, &iter2, pHelper->config.maxRowsPerFileBlock * 4 / 5);
ASSERT(pHelper->pDataCols[1]->numOfPoints > 0); ASSERT(pHelper->pDataCols[1]->numOfRows > 0);
if (tsdbWriteBlockToFile(pHelper, &(pHelper->files.dataF), pHelper->pDataCols[1], if (tsdbWriteBlockToFile(pHelper, &(pHelper->files.dataF), pHelper->pDataCols[1],
pHelper->pDataCols[1]->numOfPoints, &compBlock, false, true) < 0) pHelper->pDataCols[1]->numOfRows, &compBlock, false, true) < 0)
goto _err; goto _err;
if (round == 0) { if (round == 0) {
tsdbUpdateSuperBlock(pHelper, &compBlock, blkIdx); tsdbUpdateSuperBlock(pHelper, &compBlock, blkIdx);
...@@ -980,17 +980,17 @@ static int tsdbMergeDataWithBlock(SRWHelper *pHelper, int blkIdx, SDataCols *pDa ...@@ -980,17 +980,17 @@ static int tsdbMergeDataWithBlock(SRWHelper *pHelper, int blkIdx, SDataCols *pDa
blkIdx++; blkIdx++;
// TODO: the blkIdx here is not correct // TODO: the blkIdx here is not correct
// if (iter1 >= pHelper->pDataCols[0]->numOfPoints && iter2 >= rows3) { // if (iter1 >= pHelper->pDataCols[0]->numOfRows && iter2 >= rows3) {
// if (pHelper->pDataCols[1]->numOfPoints > 0) { // if (pHelper->pDataCols[1]->numOfRows > 0) {
// if (tsdbWriteBlockToFile(pHelper, &pHelper->files.dataF, pHelper->pDataCols[1], // if (tsdbWriteBlockToFile(pHelper, &pHelper->files.dataF, pHelper->pDataCols[1],
// pHelper->pDataCols[1]->numOfPoints, &compBlock, false, true) < 0) // pHelper->pDataCols[1]->numOfRows, &compBlock, false, true) < 0)
// goto _err; // goto _err;
// // TODO: the blkIdx here is not correct // // TODO: the blkIdx here is not correct
// tsdbAddSubBlock(pHelper, &compBlock, blkIdx, pHelper->pDataCols[1]->numOfPoints); // tsdbAddSubBlock(pHelper, &compBlock, blkIdx, pHelper->pDataCols[1]->numOfRows);
// } // }
// } // }
// TSKEY key1 = iter1 >= pHelper->pDataCols[0]->numOfPoints // TSKEY key1 = iter1 >= pHelper->pDataCols[0]->numOfRows
// ? INT64_MAX // ? INT64_MAX
// : ((int64_t *)(pHelper->pDataCols[0]->cols[0].pData))[iter1]; // : ((int64_t *)(pHelper->pDataCols[0]->cols[0].pData))[iter1];
// TSKEY key2 = iter2 >= rowsWritten ? INT64_MAX : ((int64_t *)(pDataCols->cols[0].pData))[iter2]; // TSKEY key2 = iter2 >= rowsWritten ? INT64_MAX : ((int64_t *)(pDataCols->cols[0].pData))[iter2];
...@@ -998,11 +998,11 @@ static int tsdbMergeDataWithBlock(SRWHelper *pHelper, int blkIdx, SDataCols *pDa ...@@ -998,11 +998,11 @@ static int tsdbMergeDataWithBlock(SRWHelper *pHelper, int blkIdx, SDataCols *pDa
// if (key1 < key2) { // if (key1 < key2) {
// for (int i = 0; i < pDataCols->numOfCols; i++) { // for (int i = 0; i < pDataCols->numOfCols; i++) {
// SDataCol *pDataCol = pHelper->pDataCols[1]->cols + i; // SDataCol *pDataCol = pHelper->pDataCols[1]->cols + i;
// memcpy(((char *)pDataCol->pData + TYPE_BYTES[pDataCol->type] * pHelper->pDataCols[1]->numOfPoints), // memcpy(((char *)pDataCol->pData + TYPE_BYTES[pDataCol->type] * pHelper->pDataCols[1]->numOfRows),
// ((char *)pHelper->pDataCols[0]->cols[i].pData + TYPE_BYTES[pDataCol->type] * iter1), // ((char *)pHelper->pDataCols[0]->cols[i].pData + TYPE_BYTES[pDataCol->type] * iter1),
// TYPE_BYTES[pDataCol->type]); // TYPE_BYTES[pDataCol->type]);
// } // }
// pHelper->pDataCols[1]->numOfPoints++; // pHelper->pDataCols[1]->numOfRows++;
// iter1++; // iter1++;
// } else if (key1 == key2) { // } else if (key1 == key2) {
// // TODO: think about duplicate key cases // // TODO: think about duplicate key cases
...@@ -1010,17 +1010,17 @@ static int tsdbMergeDataWithBlock(SRWHelper *pHelper, int blkIdx, SDataCols *pDa ...@@ -1010,17 +1010,17 @@ static int tsdbMergeDataWithBlock(SRWHelper *pHelper, int blkIdx, SDataCols *pDa
// } else { // } else {
// for (int i = 0; i < pDataCols->numOfCols; i++) { // for (int i = 0; i < pDataCols->numOfCols; i++) {
// SDataCol *pDataCol = pHelper->pDataCols[1]->cols + i; // SDataCol *pDataCol = pHelper->pDataCols[1]->cols + i;
// memcpy(((char *)pDataCol->pData + TYPE_BYTES[pDataCol->type] * pHelper->pDataCols[1]->numOfPoints), // memcpy(((char *)pDataCol->pData + TYPE_BYTES[pDataCol->type] * pHelper->pDataCols[1]->numOfRows),
// ((char *)pDataCols->cols[i].pData + // ((char *)pDataCols->cols[i].pData +
// TYPE_BYTES[pDataCol->type] * iter2), // TYPE_BYTES[pDataCol->type] * iter2),
// TYPE_BYTES[pDataCol->type]); // TYPE_BYTES[pDataCol->type]);
// } // }
// pHelper->pDataCols[1]->numOfPoints++; // pHelper->pDataCols[1]->numOfRows++;
// iter2++; // iter2++;
// } // }
// if (pHelper->pDataCols[0]->numOfPoints >= pHelper->config.maxRowsPerFileBlock * 4 / 5) { // if (pHelper->pDataCols[0]->numOfRows >= pHelper->config.maxRowsPerFileBlock * 4 / 5) {
// if (tsdbWriteBlockToFile(pHelper, &pHelper->files.dataF, pHelper->pDataCols[1], pHelper->pDataCols[1]->numOfPoints, &compBlock, false, true) < 0) goto _err; // if (tsdbWriteBlockToFile(pHelper, &pHelper->files.dataF, pHelper->pDataCols[1], pHelper->pDataCols[1]->numOfRows, &compBlock, false, true) < 0) goto _err;
// // TODO: blkIdx here is not correct, fix it // // TODO: blkIdx here is not correct, fix it
// tsdbInsertSuperBlock(pHelper, &compBlock, blkIdx); // tsdbInsertSuperBlock(pHelper, &compBlock, blkIdx);
...@@ -1133,7 +1133,7 @@ static int tsdbAddSubBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int blkId ...@@ -1133,7 +1133,7 @@ static int tsdbAddSubBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int blkId
pSCompBlock->numOfSubBlocks++; pSCompBlock->numOfSubBlocks++;
ASSERT(pSCompBlock->numOfSubBlocks <= TSDB_MAX_SUBBLOCKS); ASSERT(pSCompBlock->numOfSubBlocks <= TSDB_MAX_SUBBLOCKS);
pSCompBlock->len += sizeof(SCompBlock); pSCompBlock->len += sizeof(SCompBlock);
pSCompBlock->numOfPoints += rowsAdded; pSCompBlock->numOfRows += rowsAdded;
pSCompBlock->keyFirst = MIN(pSCompBlock->keyFirst, pCompBlock->keyFirst); pSCompBlock->keyFirst = MIN(pSCompBlock->keyFirst, pCompBlock->keyFirst);
pSCompBlock->keyLast = MAX(pSCompBlock->keyLast, pCompBlock->keyLast); pSCompBlock->keyLast = MAX(pSCompBlock->keyLast, pCompBlock->keyLast);
pIdx->len += sizeof(SCompBlock); pIdx->len += sizeof(SCompBlock);
...@@ -1164,7 +1164,7 @@ static int tsdbAddSubBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int blkId ...@@ -1164,7 +1164,7 @@ static int tsdbAddSubBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int blkId
((SCompBlock *)ptr)[1] = *pCompBlock; ((SCompBlock *)ptr)[1] = *pCompBlock;
pSCompBlock->numOfSubBlocks = 2; pSCompBlock->numOfSubBlocks = 2;
pSCompBlock->numOfPoints += rowsAdded; pSCompBlock->numOfRows += rowsAdded;
pSCompBlock->offset = ((char *)ptr) - ((char *)pHelper->pCompInfo); pSCompBlock->offset = ((char *)ptr) - ((char *)pHelper->pCompInfo);
pSCompBlock->len = sizeof(SCompBlock) * 2; pSCompBlock->len = sizeof(SCompBlock) * 2;
pSCompBlock->keyFirst = MIN(((SCompBlock *)ptr)[0].keyFirst, ((SCompBlock *)ptr)[1].keyFirst); pSCompBlock->keyFirst = MIN(((SCompBlock *)ptr)[0].keyFirst, ((SCompBlock *)ptr)[1].keyFirst);
...@@ -1219,7 +1219,7 @@ static int tsdbUpdateSuperBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int ...@@ -1219,7 +1219,7 @@ static int tsdbUpdateSuperBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int
// Get the number of rows in range [minKey, maxKey] // Get the number of rows in range [minKey, maxKey]
static int tsdbGetRowsInRange(SDataCols *pDataCols, TSKEY minKey, TSKEY maxKey) { static int tsdbGetRowsInRange(SDataCols *pDataCols, TSKEY minKey, TSKEY maxKey) {
if (pDataCols->numOfPoints == 0) return 0; if (pDataCols->numOfRows == 0) return 0;
ASSERT(minKey <= maxKey); ASSERT(minKey <= maxKey);
TSKEY keyFirst = dataColsKeyFirst(pDataCols); TSKEY keyFirst = dataColsKeyFirst(pDataCols);
...@@ -1228,11 +1228,11 @@ static int tsdbGetRowsInRange(SDataCols *pDataCols, TSKEY minKey, TSKEY maxKey) ...@@ -1228,11 +1228,11 @@ static int tsdbGetRowsInRange(SDataCols *pDataCols, TSKEY minKey, TSKEY maxKey)
if (minKey > keyLast || maxKey < keyFirst) return 0; if (minKey > keyLast || maxKey < keyFirst) return 0;
void *ptr1 = taosbsearch((void *)&minKey, (void *)pDataCols->cols[0].pData, pDataCols->numOfPoints, sizeof(TSKEY), void *ptr1 = taosbsearch((void *)&minKey, (void *)pDataCols->cols[0].pData, pDataCols->numOfRows, sizeof(TSKEY),
compTSKEY, TD_GE); compTSKEY, TD_GE);
ASSERT(ptr1 != NULL); ASSERT(ptr1 != NULL);
void *ptr2 = taosbsearch((void *)&maxKey, (void *)pDataCols->cols[0].pData, pDataCols->numOfPoints, sizeof(TSKEY), void *ptr2 = taosbsearch((void *)&maxKey, (void *)pDataCols->cols[0].pData, pDataCols->numOfRows, sizeof(TSKEY),
compTSKEY, TD_LE); compTSKEY, TD_LE);
ASSERT(ptr2 != NULL); ASSERT(ptr2 != NULL);
......
...@@ -436,7 +436,7 @@ static SDataBlockInfo getTrueDataBlockInfo(STableCheckInfo* pCheckInfo, SCompBlo ...@@ -436,7 +436,7 @@ static SDataBlockInfo getTrueDataBlockInfo(STableCheckInfo* pCheckInfo, SCompBlo
SDataBlockInfo info = { SDataBlockInfo info = {
.window = {.skey = pBlock->keyFirst, .ekey = pBlock->keyLast}, .window = {.skey = pBlock->keyFirst, .ekey = pBlock->keyLast},
.numOfCols = pBlock->numOfCols, .numOfCols = pBlock->numOfCols,
.rows = pBlock->numOfPoints, .rows = pBlock->numOfRows,
.tid = pCheckInfo->tableId.tid, .tid = pCheckInfo->tableId.tid,
.uid = pCheckInfo->tableId.uid, .uid = pCheckInfo->tableId.uid,
}; };
...@@ -608,11 +608,11 @@ static bool loadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlock ...@@ -608,11 +608,11 @@ static bool loadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlock
} }
SDataCols* pTSCol = pQueryHandle->rhelper.pDataCols[0]; SDataCols* pTSCol = pQueryHandle->rhelper.pDataCols[0];
assert(pTSCol->cols->type == TSDB_DATA_TYPE_TIMESTAMP && pTSCol->numOfPoints == pBlock->numOfPoints); assert(pTSCol->cols->type == TSDB_DATA_TYPE_TIMESTAMP && pTSCol->numOfRows == pBlock->numOfRows);
if (pCheckInfo->lastKey > pBlock->keyFirst) { if (pCheckInfo->lastKey > pBlock->keyFirst) {
cur->pos = cur->pos =
binarySearchForKey(pTSCol->cols[0].pData, pBlock->numOfPoints, pCheckInfo->lastKey, pQueryHandle->order); binarySearchForKey(pTSCol->cols[0].pData, pBlock->numOfRows, pCheckInfo->lastKey, pQueryHandle->order);
} else { } else {
cur->pos = 0; cur->pos = 0;
} }
...@@ -630,9 +630,9 @@ static bool loadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlock ...@@ -630,9 +630,9 @@ static bool loadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlock
SDataCols* pDataCols = pCheckInfo->pDataCols; SDataCols* pDataCols = pCheckInfo->pDataCols;
if (pCheckInfo->lastKey < pBlock->keyLast) { if (pCheckInfo->lastKey < pBlock->keyLast) {
cur->pos = cur->pos =
binarySearchForKey(pDataCols->cols[0].pData, pBlock->numOfPoints, pCheckInfo->lastKey, pQueryHandle->order); binarySearchForKey(pDataCols->cols[0].pData, pBlock->numOfRows, pCheckInfo->lastKey, pQueryHandle->order);
} else { } else {
cur->pos = pBlock->numOfPoints - 1; cur->pos = pBlock->numOfRows - 1;
} }
doMergeTwoLevelData(pQueryHandle, pCheckInfo, pBlock, sa); doMergeTwoLevelData(pQueryHandle, pCheckInfo, pBlock, sa);
...@@ -647,7 +647,7 @@ static bool loadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlock ...@@ -647,7 +647,7 @@ static bool loadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlock
static int vnodeBinarySearchKey(char* pValue, int num, TSKEY key, int order) { static int vnodeBinarySearchKey(char* pValue, int num, TSKEY key, int order) {
int firstPos, lastPos, midPos = -1; int firstPos, lastPos, midPos = -1;
int numOfPoints; int numOfRows;
TSKEY* keyList; TSKEY* keyList;
assert(order == TSDB_ORDER_ASC || order == TSDB_ORDER_DESC); assert(order == TSDB_ORDER_ASC || order == TSDB_ORDER_DESC);
...@@ -665,8 +665,8 @@ static int vnodeBinarySearchKey(char* pValue, int num, TSKEY key, int order) { ...@@ -665,8 +665,8 @@ static int vnodeBinarySearchKey(char* pValue, int num, TSKEY key, int order) {
if (key == keyList[firstPos]) return firstPos; if (key == keyList[firstPos]) return firstPos;
if (key < keyList[firstPos]) return firstPos - 1; if (key < keyList[firstPos]) return firstPos - 1;
numOfPoints = lastPos - firstPos + 1; numOfRows = lastPos - firstPos + 1;
midPos = (numOfPoints >> 1) + firstPos; midPos = (numOfRows >> 1) + firstPos;
if (key < keyList[midPos]) { if (key < keyList[midPos]) {
lastPos = midPos - 1; lastPos = midPos - 1;
...@@ -691,8 +691,8 @@ static int vnodeBinarySearchKey(char* pValue, int num, TSKEY key, int order) { ...@@ -691,8 +691,8 @@ static int vnodeBinarySearchKey(char* pValue, int num, TSKEY key, int order) {
return lastPos; return lastPos;
} }
numOfPoints = lastPos - firstPos + 1; numOfRows = lastPos - firstPos + 1;
midPos = (numOfPoints >> 1) + firstPos; midPos = (numOfRows >> 1) + firstPos;
if (key < keyList[midPos]) { if (key < keyList[midPos]) {
lastPos = midPos - 1; lastPos = midPos - 1;
...@@ -810,7 +810,7 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo* ...@@ -810,7 +810,7 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo*
cur->mixBlock = (cur->pos != blockInfo.rows - 1); cur->mixBlock = (cur->pos != blockInfo.rows - 1);
} else { } else {
int32_t order = (pQueryHandle->order == TSDB_ORDER_ASC)? TSDB_ORDER_DESC:TSDB_ORDER_ASC; int32_t order = (pQueryHandle->order == TSDB_ORDER_ASC)? TSDB_ORDER_DESC:TSDB_ORDER_ASC;
endPos = vnodeBinarySearchKey(pCols->cols[0].pData, pCols->numOfPoints, pQueryHandle->window.ekey, order); endPos = vnodeBinarySearchKey(pCols->cols[0].pData, pCols->numOfRows, pQueryHandle->window.ekey, order);
cur->mixBlock = true; cur->mixBlock = true;
} }
...@@ -904,7 +904,7 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo* ...@@ -904,7 +904,7 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo*
} }
int32_t order = ASCENDING_TRAVERSE(pQueryHandle->order) ? TSDB_ORDER_DESC : TSDB_ORDER_ASC; int32_t order = ASCENDING_TRAVERSE(pQueryHandle->order) ? TSDB_ORDER_DESC : TSDB_ORDER_ASC;
int32_t end = vnodeBinarySearchKey(pCols->cols[0].pData, pCols->numOfPoints, key, order); int32_t end = vnodeBinarySearchKey(pCols->cols[0].pData, pCols->numOfRows, key, order);
if (tsArray[end] == key) { // the value of key in cache equals to the end timestamp value, ignore it if (tsArray[end] == key) { // the value of key in cache equals to the end timestamp value, ignore it
tSkipListIterNext(pCheckInfo->iter); tSkipListIterNext(pCheckInfo->iter);
} }
...@@ -1002,7 +1002,7 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo* ...@@ -1002,7 +1002,7 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo*
int32_t binarySearchForKey(char* pValue, int num, TSKEY key, int order) { int32_t binarySearchForKey(char* pValue, int num, TSKEY key, int order) {
int firstPos, lastPos, midPos = -1; int firstPos, lastPos, midPos = -1;
int numOfPoints; int numOfRows;
TSKEY* keyList; TSKEY* keyList;
if (num <= 0) return -1; if (num <= 0) return -1;
...@@ -1018,8 +1018,8 @@ int32_t binarySearchForKey(char* pValue, int num, TSKEY key, int order) { ...@@ -1018,8 +1018,8 @@ int32_t binarySearchForKey(char* pValue, int num, TSKEY key, int order) {
if (key == keyList[firstPos]) return firstPos; if (key == keyList[firstPos]) return firstPos;
if (key < keyList[firstPos]) return firstPos - 1; if (key < keyList[firstPos]) return firstPos - 1;
numOfPoints = lastPos - firstPos + 1; numOfRows = lastPos - firstPos + 1;
midPos = (numOfPoints >> 1) + firstPos; midPos = (numOfRows >> 1) + firstPos;
if (key < keyList[midPos]) { if (key < keyList[midPos]) {
lastPos = midPos - 1; lastPos = midPos - 1;
...@@ -1044,8 +1044,8 @@ int32_t binarySearchForKey(char* pValue, int num, TSKEY key, int order) { ...@@ -1044,8 +1044,8 @@ int32_t binarySearchForKey(char* pValue, int num, TSKEY key, int order) {
return lastPos; return lastPos;
} }
numOfPoints = lastPos - firstPos + 1; numOfRows = lastPos - firstPos + 1;
midPos = (numOfPoints >> 1) + firstPos; midPos = (numOfRows >> 1) + firstPos;
if (key < keyList[midPos]) { if (key < keyList[midPos]) {
lastPos = midPos - 1; lastPos = midPos - 1;
...@@ -1066,7 +1066,6 @@ static void cleanBlockOrderSupporter(SBlockOrderSupporter* pSupporter, int32_t n ...@@ -1066,7 +1066,6 @@ static void cleanBlockOrderSupporter(SBlockOrderSupporter* pSupporter, int32_t n
for (int32_t i = 0; i < numOfTables; ++i) { for (int32_t i = 0; i < numOfTables; ++i) {
STableBlockInfo* pBlockInfo = pSupporter->pDataBlockInfo[i]; STableBlockInfo* pBlockInfo = pSupporter->pDataBlockInfo[i];
// tfree(pBlockInfo->statis);
tfree(pBlockInfo); tfree(pBlockInfo);
} }
...@@ -1539,9 +1538,19 @@ int32_t tsdbRetrieveDataBlockStatisInfo(TsdbQueryHandleT* pQueryHandle, SDataSta ...@@ -1539,9 +1538,19 @@ int32_t tsdbRetrieveDataBlockStatisInfo(TsdbQueryHandleT* pQueryHandle, SDataSta
STableBlockInfo* pBlockInfo = &pHandle->pDataBlockInfo[cur->slot]; STableBlockInfo* pBlockInfo = &pHandle->pDataBlockInfo[cur->slot];
tsdbLoadCompData(&pHandle->rhelper, pBlockInfo->compBlock, NULL); tsdbLoadCompData(&pHandle->rhelper, pBlockInfo->compBlock, NULL);
tsdbGetDataStatis(&pHandle->rhelper, pHandle->statis, QH_GET_NUM_OF_COLS(pHandle)); size_t numOfCols = QH_GET_NUM_OF_COLS(pHandle);
memset(pHandle->statis, 0, sizeof(SDataStatis) * numOfCols);
tsdbGetDataStatis(&pHandle->rhelper, pHandle->statis, numOfCols);
*pBlockStatis = pHandle->statis; *pBlockStatis = pHandle->statis;
//update the number of NULL data rows
for(int32_t i = 0; i < numOfCols; ++i) {
if (pHandle->statis[i].numOfNull == -1) {
pHandle->statis[i].numOfNull = pBlockInfo->compBlock->numOfRows;
}
}
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -1575,7 +1584,7 @@ SArray* tsdbRetrieveDataBlock(TsdbQueryHandleT* pQueryHandle, SArray* pIdList) { ...@@ -1575,7 +1584,7 @@ SArray* tsdbRetrieveDataBlock(TsdbQueryHandleT* pQueryHandle, SArray* pIdList) {
doLoadFileDataBlock(pHandle, pBlock, pCheckInfo); doLoadFileDataBlock(pHandle, pBlock, pCheckInfo);
// todo refactor // todo refactor
int32_t numOfRows = copyDataFromFileBlock(pHandle, pHandle->outputCapacity, 0, 0, pBlock->numOfPoints - 1); int32_t numOfRows = copyDataFromFileBlock(pHandle, pHandle->outputCapacity, 0, 0, pBlock->numOfRows - 1);
// if the buffer is not full in case of descending order query, move the data in the front of the buffer // if the buffer is not full in case of descending order query, move the data in the front of the buffer
if (!ASCENDING_TRAVERSE(pHandle->order) && numOfRows < pHandle->outputCapacity) { if (!ASCENDING_TRAVERSE(pHandle->order) && numOfRows < pHandle->outputCapacity) {
......
...@@ -46,7 +46,6 @@ class TDTestCase: ...@@ -46,7 +46,6 @@ class TDTestCase:
tdLog.info('insert data until the first commit') tdLog.info('insert data until the first commit')
dnodesDir = tdDnodes.getDnodesRootDir() dnodesDir = tdDnodes.getDnodesRootDir()
dataDir = dnodesDir + '/dnode1/data/vnode' dataDir = dnodesDir + '/dnode1/data/vnode'
tdLog.info('CBD: dataDir=%s' % dataDir)
startTime = self.startTime startTime = self.startTime
rid0 = 1 rid0 = 1
while (True): while (True):
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册