未验证 提交 6fbe057a 编写于 作者: wmmhello's avatar wmmhello 提交者: GitHub

Merge pull request #11311 from taosdata/feature/TD-14422

Feature/td 14422
......@@ -36,11 +36,13 @@ IF (TD_WINDOWS)
ENDIF ()
ELSE ()
SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -Werror -fPIC -gdwarf-2 -g3")
SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Werror -fPIC -gdwarf-2 -g3")
SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -Werror -Werror=return-type -fPIC -gdwarf-2 -g3")
SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Werror -Werror=return-type -fPIC -gdwarf-2 -g3")
#SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -Werror -Werror=return-type -fPIC -gdwarf-2 -fsanitize=address -fsanitize=undefined -fno-sanitize-recover=all -fsanitize=float-divide-by-zero -fsanitize=float-cast-overflow -fno-sanitize=null -fno-sanitize=alignment -static-libasan -g3")
#SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Werror -Werror=return-type -fPIC -gdwarf-2 -fsanitize=address -fsanitize=undefined -fno-sanitize-recover=all -fsanitize=float-divide-by-zero -fsanitize=float-cast-overflow -fno-sanitize=null -fno-sanitize=alignment -static-libasan -g3")
MESSAGE("System processor ID: ${CMAKE_SYSTEM_PROCESSOR}")
IF (${CMAKE_SYSTEM_PROCESSOR} MATCHES "arm64")
IF (${CMAKE_SYSTEM_PROCESSOR} MATCHES "arm64" OR ${CMAKE_SYSTEM_PROCESSOR} MATCHES "aarch64")
ADD_DEFINITIONS("-D_TD_ARM_")
ELSE ()
ADD_DEFINITIONS("-msse4.2 -mfma")
......
......@@ -42,7 +42,7 @@ IF (${CMAKE_SYSTEM_NAME} MATCHES "Linux" OR ${CMAKE_SYSTEM_NAME} MATCHES "Darwin
SET(TD_DARWIN TRUE)
SET(OSTYPE "macOS")
ADD_DEFINITIONS("-DDARWIN -Wno-tautological-pointer-compare -Wno-return-type")
ADD_DEFINITIONS("-DDARWIN -Wno-tautological-pointer-compare")
MESSAGE("Current system processor is ${CMAKE_SYSTEM_PROCESSOR}.")
IF (${CMAKE_SYSTEM_PROCESSOR} MATCHES "arm64" OR ${CMAKE_SYSTEM_PROCESSOR} MATCHES "x86_64")
......
......@@ -127,39 +127,39 @@ static FORCE_INLINE void colDataAppendNNULL(SColumnInfoData* pColumnInfoData, ui
pColumnInfoData->hasNull = true;
}
static FORCE_INLINE int32_t colDataAppendInt8(SColumnInfoData* pColumnInfoData, uint32_t currentRow, int8_t* v) {
static FORCE_INLINE void colDataAppendInt8(SColumnInfoData* pColumnInfoData, uint32_t currentRow, int8_t* v) {
ASSERT(pColumnInfoData->info.type == TSDB_DATA_TYPE_TINYINT ||
pColumnInfoData->info.type == TSDB_DATA_TYPE_UTINYINT || pColumnInfoData->info.type == TSDB_DATA_TYPE_BOOL);
char* p = pColumnInfoData->pData + pColumnInfoData->info.bytes * currentRow;
*(int8_t*)p = *(int8_t*)v;
}
static FORCE_INLINE int32_t colDataAppendInt16(SColumnInfoData* pColumnInfoData, uint32_t currentRow, int16_t* v) {
static FORCE_INLINE void colDataAppendInt16(SColumnInfoData* pColumnInfoData, uint32_t currentRow, int16_t* v) {
ASSERT(pColumnInfoData->info.type == TSDB_DATA_TYPE_SMALLINT || pColumnInfoData->info.type == TSDB_DATA_TYPE_USMALLINT);
char* p = pColumnInfoData->pData + pColumnInfoData->info.bytes * currentRow;
*(int16_t*)p = *(int16_t*)v;
}
static FORCE_INLINE int32_t colDataAppendInt32(SColumnInfoData* pColumnInfoData, uint32_t currentRow, int32_t* v) {
static FORCE_INLINE void colDataAppendInt32(SColumnInfoData* pColumnInfoData, uint32_t currentRow, int32_t* v) {
ASSERT(pColumnInfoData->info.type == TSDB_DATA_TYPE_INT || pColumnInfoData->info.type == TSDB_DATA_TYPE_UINT);
char* p = pColumnInfoData->pData + pColumnInfoData->info.bytes * currentRow;
*(int32_t*)p = *(int32_t*)v;
}
static FORCE_INLINE int32_t colDataAppendInt64(SColumnInfoData* pColumnInfoData, uint32_t currentRow, int64_t* v) {
static FORCE_INLINE void colDataAppendInt64(SColumnInfoData* pColumnInfoData, uint32_t currentRow, int64_t* v) {
int32_t type = pColumnInfoData->info.type;
ASSERT(type == TSDB_DATA_TYPE_BIGINT || type == TSDB_DATA_TYPE_UBIGINT || type == TSDB_DATA_TYPE_TIMESTAMP);
char* p = pColumnInfoData->pData + pColumnInfoData->info.bytes * currentRow;
*(int64_t*)p = *(int64_t*)v;
}
static FORCE_INLINE int32_t colDataAppendFloat(SColumnInfoData* pColumnInfoData, uint32_t currentRow, float* v) {
static FORCE_INLINE void colDataAppendFloat(SColumnInfoData* pColumnInfoData, uint32_t currentRow, float* v) {
ASSERT(pColumnInfoData->info.type == TSDB_DATA_TYPE_FLOAT);
char* p = pColumnInfoData->pData + pColumnInfoData->info.bytes * currentRow;
*(float*)p = *(float*)v;
}
static FORCE_INLINE int32_t colDataAppendDouble(SColumnInfoData* pColumnInfoData, uint32_t currentRow, double* v) {
static FORCE_INLINE void colDataAppendDouble(SColumnInfoData* pColumnInfoData, uint32_t currentRow, double* v) {
ASSERT(pColumnInfoData->info.type == TSDB_DATA_TYPE_DOUBLE);
char* p = pColumnInfoData->pData + pColumnInfoData->info.bytes * currentRow;
*(double*)p = *(double*)v;
......@@ -177,7 +177,7 @@ void colDataTrim(SColumnInfoData* pColumnInfoData);
size_t blockDataGetNumOfCols(const SSDataBlock* pBlock);
size_t blockDataGetNumOfRows(const SSDataBlock* pBlock);
int32_t blockDataMerge(SSDataBlock* pDest, const SSDataBlock* pSrc);
int32_t blockDataMerge(SSDataBlock* pDest, const SSDataBlock* pSrc, SArray* pIndexMap);
int32_t blockDataSplitRows(SSDataBlock* pBlock, bool hasVarCol, int32_t startIndex, int32_t* stopIndex,
int32_t pageSize);
int32_t blockDataToBuf(char* buf, const SSDataBlock* pBlock);
......
......@@ -17,6 +17,7 @@
#include "tdatablock.h"
#include "tcompare.h"
#include "tglobal.h"
#include "tlog.h"
int32_t taosGetFqdnPortFromEp(const char* ep, SEp* pEp) {
pEp->port = 0;
......@@ -130,44 +131,7 @@ int32_t colDataAppend(SColumnInfoData* pColumnInfoData, uint32_t currentRow, con
memcpy(pColumnInfoData->pData + len, pData, varDataTLen(pData));
pColumnInfoData->varmeta.length += varDataTLen(pData);
} else {
char* p = pColumnInfoData->pData + pColumnInfoData->info.bytes * currentRow;
switch (type) {
case TSDB_DATA_TYPE_BOOL: {
*(bool*)p = *(bool*)pData;
break;
}
case TSDB_DATA_TYPE_TINYINT:
case TSDB_DATA_TYPE_UTINYINT: {
*(int8_t*)p = *(int8_t*)pData;
break;
}
case TSDB_DATA_TYPE_SMALLINT:
case TSDB_DATA_TYPE_USMALLINT: {
*(int16_t*)p = *(int16_t*)pData;
break;
}
case TSDB_DATA_TYPE_INT:
case TSDB_DATA_TYPE_UINT: {
*(int32_t*)p = *(int32_t*)pData;
break;
}
case TSDB_DATA_TYPE_TIMESTAMP:
case TSDB_DATA_TYPE_BIGINT:
case TSDB_DATA_TYPE_UBIGINT: {
*(int64_t*)p = *(int64_t*)pData;
break;
}
case TSDB_DATA_TYPE_FLOAT: {
*(float*)p = *(float*)pData;
break;
}
case TSDB_DATA_TYPE_DOUBLE: {
*(double*)p = *(double*)pData;
break;
}
default:
assert(0);
}
memcpy(pColumnInfoData->pData + pColumnInfoData->info.bytes * currentRow, pData, pColumnInfoData->info.bytes);
}
return 0;
......@@ -175,6 +139,8 @@ int32_t colDataAppend(SColumnInfoData* pColumnInfoData, uint32_t currentRow, con
static void doBitmapMerge(SColumnInfoData* pColumnInfoData, int32_t numOfRow1, const SColumnInfoData* pSource,
int32_t numOfRow2) {
if (numOfRow2 <= 0) return;
uint32_t total = numOfRow1 + numOfRow2;
if (BitmapLen(numOfRow1) < BitmapLen(total)) {
......@@ -189,22 +155,32 @@ static void doBitmapMerge(SColumnInfoData* pColumnInfoData, int32_t numOfRow1, c
if (remindBits == 0) { // no need to shift bits of bitmap
memcpy(pColumnInfoData->nullbitmap + BitmapLen(numOfRow1), pSource->nullbitmap, BitmapLen(numOfRow2));
} else {
int32_t len = BitmapLen(numOfRow2);
int32_t i = 0;
return;
}
uint8_t* p = (uint8_t*)pSource->nullbitmap;
pColumnInfoData->nullbitmap[BitmapLen(numOfRow1) - 1] |= (p[0] >> remindBits);
uint8_t* p = (uint8_t*)pSource->nullbitmap;
pColumnInfoData->nullbitmap[BitmapLen(numOfRow1) - 1] |= (p[0] >> remindBits); // copy remind bits
uint8_t* start = (uint8_t*)&pColumnInfoData->nullbitmap[BitmapLen(numOfRow1)];
while (i < len) {
start[i] |= (p[i] << shiftBits);
i += 1;
if (BitmapLen(numOfRow1) == BitmapLen(total)) {
return;
}
if (i > 1) {
start[i - 1] |= (p[i] >> remindBits);
}
int32_t len = BitmapLen(numOfRow2);
int32_t i = 0;
uint8_t* start = (uint8_t*)&pColumnInfoData->nullbitmap[BitmapLen(numOfRow1)];
int32_t overCount = BitmapLen(total) - BitmapLen(numOfRow1);
while (i < len) { // size limit of pSource->nullbitmap
if (i >= 1) {
start[i - 1] |= (p[i] >> remindBits); //copy remind bits
}
if (i >= overCount) { // size limit of pColumnInfoData->nullbitmap
return;
}
start[i] |= (p[i] << shiftBits); //copy shift bits
i += 1;
}
}
......@@ -216,6 +192,9 @@ int32_t colDataMergeCol(SColumnInfoData* pColumnInfoData, uint32_t numOfRow1, co
return numOfRow1;
}
if (pSource->hasNull) {
pColumnInfoData->hasNull = pSource->hasNull;
}
if (IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) {
// Handle the bitmap
char* p = taosMemoryRealloc(pColumnInfoData->varmeta.offset, sizeof(int32_t) * (numOfRow1 + numOfRow2));
......@@ -336,13 +315,18 @@ int32_t blockDataUpdateTsWindow(SSDataBlock* pDataBlock) {
return 0;
}
int32_t blockDataMerge(SSDataBlock* pDest, const SSDataBlock* pSrc) {
assert(pSrc != NULL && pDest != NULL && pDest->info.numOfCols == pSrc->info.numOfCols);
// if pIndexMap = NULL, merger one column by on column
int32_t blockDataMerge(SSDataBlock* pDest, const SSDataBlock* pSrc, SArray* pIndexMap) {
assert(pSrc != NULL && pDest != NULL);
int32_t numOfCols = pSrc->info.numOfCols;
int32_t numOfCols = pDest->info.numOfCols;
for (int32_t i = 0; i < numOfCols; ++i) {
int32_t mapIndex = i;
if(pIndexMap) {
mapIndex = *(int32_t*)taosArrayGet(pIndexMap, i);
}
SColumnInfoData* pCol2 = taosArrayGet(pDest->pDataBlock, i);
SColumnInfoData* pCol1 = taosArrayGet(pSrc->pDataBlock, i);
SColumnInfoData* pCol1 = taosArrayGet(pSrc->pDataBlock, mapIndex);
uint32_t oldLen = colDataGetLength(pCol2, pDest->info.rows);
uint32_t newLen = colDataGetLength(pCol1, pSrc->info.rows);
......@@ -399,49 +383,49 @@ int32_t blockDataSplitRows(SSDataBlock* pBlock, bool hasVarCol, int32_t startInd
// TODO speedup by checking if the whole page can fit in firstly.
if (!hasVarCol) {
size_t rowSize = blockDataGetRowSize(pBlock);
int32_t capacity = (payloadSize / (rowSize * 8 + bitmapChar * numOfCols)) * 8;
int32_t capacity = payloadSize / (rowSize + numOfCols * bitmapChar / 8.0);
ASSERT(capacity > 0);
*stopIndex = startIndex + capacity;
*stopIndex = startIndex + capacity - 1;
if (*stopIndex >= numOfRows) {
*stopIndex = numOfRows - 1;
}
return TSDB_CODE_SUCCESS;
} else {
// iterate the rows that can be fit in this buffer page
int32_t size = (headerSize + colHeaderSize);
for (int32_t j = startIndex; j < numOfRows; ++j) {
for (int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoData* pColInfoData = TARRAY_GET_ELEM(pBlock->pDataBlock, i);
if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) {
if (pColInfoData->varmeta.offset[j] != -1) {
char* p = colDataGetData(pColInfoData, j);
size += varDataTLen(p);
}
size += sizeof(pColInfoData->varmeta.offset[0]);
} else {
size += pColInfoData->info.bytes;
if (((j - startIndex) & 0x07) == 0) {
size += 1; // the space for null bitmap
}
}
// iterate the rows that can be fit in this buffer page
int32_t size = (headerSize + colHeaderSize);
for (int32_t j = startIndex; j < numOfRows; ++j) {
for (int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoData* pColInfoData = TARRAY_GET_ELEM(pBlock->pDataBlock, i);
if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) {
if (pColInfoData->varmeta.offset[j] != -1) {
char* p = colDataGetData(pColInfoData, j);
size += varDataTLen(p);
}
}
if (size > pageSize) {
*stopIndex = j - 1;
ASSERT(*stopIndex > startIndex);
size += sizeof(pColInfoData->varmeta.offset[0]);
} else {
size += pColInfoData->info.bytes;
return TSDB_CODE_SUCCESS;
if (((j - startIndex) & 0x07) == 0) {
size += 1; // the space for null bitmap
}
}
}
// all fit in
*stopIndex = numOfRows - 1;
return TSDB_CODE_SUCCESS;
if (size > pageSize) { // pageSize must be able to hold one row
*stopIndex = j - 1;
ASSERT(*stopIndex >= startIndex);
return TSDB_CODE_SUCCESS;
}
}
// all fit in
*stopIndex = numOfRows - 1;
return TSDB_CODE_SUCCESS;
}
SSDataBlock* blockDataExtractBlock(SSDataBlock* pBlock, int32_t startIndex, int32_t rowCount) {
......@@ -546,6 +530,11 @@ int32_t blockDataFromBuf(SSDataBlock* pBlock, const char* buf) {
if (IS_VAR_DATA_TYPE(pCol->info.type)) {
size_t metaSize = pBlock->info.rows * sizeof(int32_t);
char* tmp = taosMemoryRealloc(pCol->varmeta.offset, metaSize); // preview calloc is too small
if (tmp == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
pCol->varmeta.offset = (int32_t*)tmp;
memcpy(pCol->varmeta.offset, pStart, metaSize);
pStart += metaSize;
} else {
......@@ -709,23 +698,13 @@ int32_t dataBlockCompar(const void* p1, const void* p2, const void* param) {
void* left1 = colDataGetData(pColInfoData, left);
void* right1 = colDataGetData(pColInfoData, right);
switch (pColInfoData->info.type) {
case TSDB_DATA_TYPE_INT: {
int32_t leftx = *(int32_t*)left1;
int32_t rightx = *(int32_t*)right1;
if (leftx == rightx) {
break;
} else {
if (pOrder->order == TSDB_ORDER_ASC) {
return (leftx < rightx) ? -1 : 1;
} else {
return (leftx < rightx) ? 1 : -1;
}
}
}
default:
assert(0);
__compar_fn_t fn = getKeyComparFunc(pColInfoData->info.type, pOrder->order);
int ret = fn(left1, right1);
if (ret == 0) {
continue;
} else {
return ret;
}
}
......@@ -779,61 +758,12 @@ static int32_t blockDataAssign(SColumnInfoData* pCols, const SSDataBlock* pDataB
pDst->varmeta.offset[j] = pSrc->varmeta.offset[index[j]];
}
} else {
switch (pSrc->info.type) {
case TSDB_DATA_TYPE_UINT:
case TSDB_DATA_TYPE_INT: {
for (int32_t j = 0; j < pDataBlock->info.rows; ++j) {
int32_t* p = (int32_t*)pDst->pData;
int32_t* srclist = (int32_t*)pSrc->pData;
p[j] = srclist[index[j]];
if (colDataIsNull_f(pSrc->nullbitmap, index[j])) {
colDataSetNull_f(pDst->nullbitmap, j);
}
}
break;
}
case TSDB_DATA_TYPE_UTINYINT:
case TSDB_DATA_TYPE_TINYINT: {
for (int32_t j = 0; j < pDataBlock->info.rows; ++j) {
int32_t* p = (int32_t*)pDst->pData;
int32_t* srclist = (int32_t*)pSrc->pData;
p[j] = srclist[index[j]];
if (colDataIsNull_f(pSrc->nullbitmap, index[j])) {
colDataSetNull_f(pDst->nullbitmap, j);
}
}
break;
}
case TSDB_DATA_TYPE_USMALLINT:
case TSDB_DATA_TYPE_SMALLINT: {
for (int32_t j = 0; j < pDataBlock->info.rows; ++j) {
int32_t* p = (int32_t*)pDst->pData;
int32_t* srclist = (int32_t*)pSrc->pData;
p[j] = srclist[index[j]];
if (colDataIsNull_f(pSrc->nullbitmap, index[j])) {
colDataSetNull_f(pDst->nullbitmap, j);
}
}
break;
}
case TSDB_DATA_TYPE_UBIGINT:
case TSDB_DATA_TYPE_BIGINT: {
for (int32_t j = 0; j < pDataBlock->info.rows; ++j) {
int32_t* p = (int32_t*)pDst->pData;
int32_t* srclist = (int32_t*)pSrc->pData;
p[j] = srclist[index[j]];
if (colDataIsNull_f(pSrc->nullbitmap, index[j])) {
colDataSetNull_f(pDst->nullbitmap, j);
}
}
break;
for (int32_t j = 0; j < pDataBlock->info.rows; ++j) {
if (colDataIsNull_f(pSrc->nullbitmap, index[j])) {
colDataSetNull_f(pDst->nullbitmap, j);
continue;
}
default:
assert(0);
memcpy(pDst->pData + j * pDst->info.bytes, pSrc->pData + index[j] * pDst->info.bytes, pDst->info.bytes);
}
}
}
......@@ -906,33 +836,6 @@ static int32_t* createTupleIndex(size_t rows) {
static void destroyTupleIndex(int32_t* index) { taosMemoryFreeClear(index); }
static __compar_fn_t getComparFn(int32_t type, int32_t order) {
switch (type) {
case TSDB_DATA_TYPE_TINYINT:
return order == TSDB_ORDER_ASC ? compareInt8Val : compareInt8ValDesc;
case TSDB_DATA_TYPE_SMALLINT:
return order == TSDB_ORDER_ASC ? compareInt16Val : compareInt16ValDesc;
case TSDB_DATA_TYPE_INT:
return order == TSDB_ORDER_ASC ? compareInt32Val : compareInt32ValDesc;
case TSDB_DATA_TYPE_BIGINT:
return order == TSDB_ORDER_ASC ? compareInt64Val : compareInt64ValDesc;
case TSDB_DATA_TYPE_FLOAT:
return order == TSDB_ORDER_ASC ? compareFloatVal : compareFloatValDesc;
case TSDB_DATA_TYPE_DOUBLE:
return order == TSDB_ORDER_ASC ? compareDoubleVal : compareDoubleValDesc;
case TSDB_DATA_TYPE_UTINYINT:
return order == TSDB_ORDER_ASC ? compareUint8Val : compareUint8ValDesc;
case TSDB_DATA_TYPE_USMALLINT:
return order == TSDB_ORDER_ASC ? compareUint16Val : compareUint16ValDesc;
case TSDB_DATA_TYPE_UINT:
return order == TSDB_ORDER_ASC ? compareUint32Val : compareUint32ValDesc;
case TSDB_DATA_TYPE_UBIGINT:
return order == TSDB_ORDER_ASC ? compareUint64Val : compareUint64ValDesc;
default:
return order == TSDB_ORDER_ASC ? compareInt32Val : compareInt32ValDesc;
}
}
int32_t blockDataSort(SSDataBlock* pDataBlock, SArray* pOrderInfo) {
ASSERT(pDataBlock != NULL && pOrderInfo != NULL);
if (pDataBlock->info.rows <= 1) {
......@@ -966,11 +869,11 @@ int32_t blockDataSort(SSDataBlock* pDataBlock, SArray* pOrderInfo) {
int64_t p0 = taosGetTimestampUs();
__compar_fn_t fn = getComparFn(pColInfoData->info.type, pOrder->order);
__compar_fn_t fn = getKeyComparFunc(pColInfoData->info.type, pOrder->order);
qsort(pColInfoData->pData, pDataBlock->info.rows, pColInfoData->info.bytes, fn);
int64_t p1 = taosGetTimestampUs();
printf("sort:%" PRId64 ", rows:%d\n", p1 - p0, pDataBlock->info.rows);
uDebug("blockDataSort easy cost:%" PRId64 ", rows:%d\n", p1 - p0, pDataBlock->info.rows);
return TSDB_CODE_SUCCESS;
} else { // var data type
......@@ -999,24 +902,21 @@ int32_t blockDataSort(SSDataBlock* pDataBlock, SArray* pOrderInfo) {
SColumnInfoData* pCols = createHelpColInfoData(pDataBlock);
if (pCols == NULL) {
destroyTupleIndex(index);
terrno = TSDB_CODE_OUT_OF_MEMORY;
return terrno;
}
int64_t p2 = taosGetTimestampUs();
int32_t code = blockDataAssign(pCols, pDataBlock, index);
if (code != TSDB_CODE_SUCCESS) {
terrno = code;
return code;
}
blockDataAssign(pCols, pDataBlock, index);
int64_t p3 = taosGetTimestampUs();
copyBackToBlock(pDataBlock, pCols);
int64_t p4 = taosGetTimestampUs();
printf("sort:%" PRId64 ", create:%" PRId64 ", assign:%" PRId64 ", copyback:%" PRId64 ", rows:%d\n", p1 - p0, p2 - p1,
uDebug("blockDataSort complex sort:%" PRId64 ", create:%" PRId64 ", assign:%" PRId64 ", copyback:%" PRId64 ", rows:%d\n", p1 - p0, p2 - p1,
p3 - p2, p4 - p3, rows);
destroyTupleIndex(index);
......@@ -1248,6 +1148,9 @@ void* blockDataDestroy(SSDataBlock* pBlock) {
}
SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock) {
if(pDataBlock == NULL){
return NULL;
}
int32_t numOfCols = pDataBlock->info.numOfCols;
SSDataBlock* pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock));
......
......@@ -2645,6 +2645,7 @@ int32_t tDeserializeSCreateVnodeReq(void *buf, int32_t bufLen, SCreateVnodeReq *
int32_t tFreeSCreateVnodeReq(SCreateVnodeReq *pReq) {
taosArrayDestroy(pReq->pRetensions);
pReq->pRetensions = NULL;
return 0;
}
int32_t tSerializeSDropVnodeReq(void *buf, int32_t bufLen, SDropVnodeReq *pReq) {
......
......@@ -97,6 +97,7 @@ int32_t dmProcessStatusRsp(SDnodeMgmt *pMgmt, SNodeMsg *pMsg) {
}
pMgmt->statusSent = 0;
return TSDB_CODE_SUCCESS;
}
int32_t dmProcessAuthRsp(SDnodeMgmt *pMgmt, SNodeMsg *pMsg) {
......
......@@ -44,6 +44,7 @@ static void *dmThreadRoutine(void *param) {
lastMonitorTime = curTime;
}
}
return TSDB_CODE_SUCCESS;
}
int32_t dmStartThread(SDnodeMgmt *pMgmt) {
......
......@@ -630,6 +630,7 @@ static int32_t mndProcessConfigDnodeReq(SNodeMsg *pReq) {
static int32_t mndProcessConfigDnodeRsp(SNodeMsg *pRsp) {
mInfo("app:%p config rsp from dnode", pRsp->rpcMsg.ahandle);
return TSDB_CODE_SUCCESS;
}
static int32_t mndGetConfigMeta(SNodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta) {
......
......@@ -346,6 +346,7 @@ int32_t tsdbDestroySmaState(SSmaStat *pSmaStat) {
}
taosHashCleanup(pSmaStat->smaStatItems);
}
return TSDB_CODE_SUCCESS;
}
static int32_t tsdbCheckAndInitSmaEnv(STsdb *pTsdb, int8_t smaType) {
......@@ -450,8 +451,10 @@ static int32_t tsdbSetExpiredWindow(STsdb *pTsdb, SHashObj *pItemsHash, int64_t
winSKey);
return TSDB_CODE_FAILED;
}
tsdbDebug("vgId:%d smaIndex %" PRIi64 ", put skey %" PRIi64 " to expire window succeed", REPO_ID(pTsdb), indexUid,
winSKey);
return TSDB_CODE_SUCCESS;
}
/**
......@@ -1099,6 +1102,7 @@ static int32_t tsdbDropTSmaDataImpl(STsdb *pTsdb, int64_t indexUid) {
}
// clear sma data files
// TODO:
return TSDB_CODE_SUCCESS;
}
static int32_t tsdbSetRSmaDataFile(STSmaWriteH *pSmaH, int32_t fid) {
......@@ -1221,6 +1225,7 @@ static int32_t tsdbInitTSmaReadH(STSmaReadH *pSmaH, STsdb *pTsdb, int64_t interv
pSmaH->interval = tsdbGetIntervalByPrecision(interval, intervalUnit, REPO_CFG(pTsdb)->precision, true);
pSmaH->storageLevel = tsdbGetSmaStorageLevel(interval, intervalUnit);
pSmaH->days = tsdbGetTSmaDays(pTsdb, pSmaH->interval, pSmaH->storageLevel);
return TSDB_CODE_SUCCESS;
}
/**
......@@ -1240,6 +1245,7 @@ static int32_t tsdbInitTSmaFile(STSmaReadH *pSmaH, int64_t indexUid, TSKEY skey)
pSmaH->dFile.path = strdup(tSmaFile);
pSmaH->smaFsIter.iter = 0;
pSmaH->smaFsIter.fid = fid;
return TSDB_CODE_SUCCESS;
}
/**
......@@ -1431,6 +1437,7 @@ int32_t tsdbCreateTSma(STsdb *pTsdb, char *pMsg) {
tdDestroyTSma(&vCreateSmaReq.tSma);
// TODO: return directly or go on follow steps?
return TSDB_CODE_SUCCESS;
}
int32_t tsdbDropTSma(STsdb *pTsdb, char *pMsg) {
......@@ -1460,6 +1467,7 @@ int32_t tsdbDropTSma(STsdb *pTsdb, char *pMsg) {
tsdbTSmaSub(pTsdb, 1);
// TODO: return directly or go on follow steps?
return TSDB_CODE_SUCCESS;
}
#if 0
......
......@@ -2445,7 +2445,7 @@ _return:
}
int32_t catalogUpdateVgEpSet(SCatalog* pCtg, const char* dbFName, int32_t vgId, SEpSet *epSet) {
return 0;
}
int32_t catalogRemoveTableMeta(SCatalog* pCtg, const SName* pTableName) {
......@@ -2512,7 +2512,7 @@ _return:
}
int32_t catalogGetIndexMeta(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, const SName* pTableName, const char *pIndexName, SIndexMeta** pIndexMeta) {
return 0;
}
int32_t catalogGetTableMeta(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, const SName* pTableName, STableMeta** pTableMeta) {
......
......@@ -120,9 +120,10 @@ static FORCE_INLINE SResultRow *getResultRowByPos(SDiskbasedBuf* pBuf, SResultRo
static FORCE_INLINE char* getPosInResultPage(struct STaskAttr* pQueryAttr, SFilePage* page, int32_t rowOffset,
int32_t offset) {
assert(rowOffset >= 0 && pQueryAttr != NULL);
ASSERT(0);
// int32_t numOfRows = (int32_t)getRowNumForMultioutput(pQueryAttr, pQueryAttr->topBotQuery, pQueryAttr->stableQuery);
// return ((char *)page->data) + rowOffset + offset * numOfRows;
// int32_t numOfRows = (int32_t)getRowNumForMultioutput(pQueryAttr, pQueryAttr->topBotQuery, pQueryAttr->stableQuery);
return ((char *)page->data);
}
static FORCE_INLINE char* getPosInResultPage_rv(SFilePage* page, int32_t rowOffset, int32_t offset) {
......
......@@ -643,6 +643,7 @@ typedef struct SSortOperatorInfo {
SSDataBlock *pDataBlock;
SArray* pSortInfo;
SSortHandle *pSortHandle;
SArray* inputSlotMap; // for index map from table scan output
int32_t bufPageSize;
int32_t numOfRowsInRes;
......@@ -679,8 +680,10 @@ SOperatorInfo* createTableScanOperatorInfo(void* pTsdbReadHandle, int32_t order,
SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock,
SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo);
SOperatorInfo* createMultiTableAggOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo);
SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t num, SSDataBlock* pResBlock, SLimit* pLimit, SLimit* pSlimit, SExecTaskInfo* pTaskInfo);
SOperatorInfo *createSortOperatorInfo(SOperatorInfo* downstream, SSDataBlock* pResBlock, SArray* pSortInfo, SExecTaskInfo* pTaskInfo);
SOperatorInfo *createSortOperatorInfo(SOperatorInfo* downstream, SSDataBlock* pResBlock, SArray* pSortInfo, SArray* pIndexMap, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createSortedMergeOperatorInfo(SOperatorInfo** downstream, int32_t numOfDownstream, SExprInfo* pExprInfo, int32_t num, SArray* pSortInfo, SArray* pGroupInfo, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createSysTableScanOperatorInfo(void* pSysTableReadHandle, SSDataBlock* pResBlock, const SName* pName,
SNode* pCondition, SEpSet epset, SArray* colList, SExecTaskInfo* pTaskInfo, bool showRewrite, int32_t accountId);
......
......@@ -34,22 +34,22 @@ typedef struct SMultiMergeSource {
SSDataBlock *pBlock;
} SMultiMergeSource;
typedef struct SExternalMemSource {
typedef struct SSortSource {
SMultiMergeSource src;
SArray* pageIdList;
int32_t pageIndex;
} SExternalMemSource;
union{
struct{
SArray* pageIdList;
int32_t pageIndex;
};
void *param;
};
typedef struct SGenericSource {
SMultiMergeSource src;
void *param;
} SGenericSource;
} SSortSource;
typedef struct SMsortComparParam {
void **pSources;
int32_t numOfSources;
SArray *orderInfo; // SArray<SBlockOrderInfo>
bool nullFirst;
} SMsortComparParam;
typedef struct SSortHandle SSortHandle;
......@@ -63,7 +63,7 @@ typedef int32_t (*_sort_merge_compar_fn_t)(const void* p1, const void* p2, void*
* @param type
* @return
*/
SSortHandle* tsortCreateSortHandle(SArray* pOrderInfo, int32_t type, int32_t pageSize, int32_t numOfPages, SSDataBlock* pBlock, const char* idstr);
SSortHandle* tsortCreateSortHandle(SArray* pOrderInfo, SArray* pIndexMap, int32_t type, int32_t pageSize, int32_t numOfPages, SSDataBlock* pBlock, const char* idstr);
/**
*
......
......@@ -238,6 +238,7 @@ static int32_t destroyDataSinker(SDataSinkHandle* pHandle) {
}
taosCloseQueue(pDispatcher->pDataBlocks);
taosThreadMutexDestroy(&pDispatcher->mutex);
return TSDB_CODE_SUCCESS;
}
int32_t createDataDispatcher(SDataSinkManager* pManager, const SDataSinkNode* pDataSink, DataSinkHandle* pHandle) {
......
......@@ -4154,6 +4154,7 @@ int32_t loadRemoteDataCallback(void* param, const SDataBuf* pMsg, int32_t code)
pSourceDataInfo->status = EX_SOURCE_DATA_READY;
tsem_post(&pSourceDataInfo->pEx->ready);
return TSDB_CODE_SUCCESS;
}
static void destroySendMsgInfo(SMsgSendInfo* pMsgBody) {
......@@ -4753,6 +4754,8 @@ static void appendOneRowToDataBlock(SSDataBlock* pBlock, STupleHandle* pTupleHan
SSDataBlock* getSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, int32_t capacity) {
blockDataCleanup(pDataBlock);
blockDataEnsureCapacity(pDataBlock, capacity);
while (1) {
STupleHandle* pTupleHandle = tsortNextTuple(pHandle);
if (pTupleHandle == NULL) {
......@@ -4954,13 +4957,13 @@ static SSDataBlock* doSortedMerge(SOperatorInfo* pOperator, bool* newgroup) {
}
int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize;
pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, SORT_MULTISOURCE_MERGE, pInfo->bufPageSize, numOfBufPage,
pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, NULL, SORT_MULTISOURCE_MERGE, pInfo->bufPageSize, numOfBufPage,
pInfo->binfo.pRes, "GET_TASKID(pTaskInfo)");
tsortSetFetchRawDataFp(pInfo->pSortHandle, loadNextDataBlock);
for (int32_t i = 0; i < pOperator->numOfDownstream; ++i) {
SGenericSource* ps = taosMemoryCalloc(1, sizeof(SGenericSource));
SSortSource* ps = taosMemoryCalloc(1, sizeof(SSortSource));
ps->param = pOperator->pDownstream[i];
tsortAddSource(pInfo->pSortHandle, ps);
}
......@@ -5101,16 +5104,17 @@ static SSDataBlock* doSort(SOperatorInfo* pOperator, bool* newgroup) {
}
int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize;
pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, SORT_SINGLESOURCE_SORT, pInfo->bufPageSize, numOfBufPage,
pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, pInfo->inputSlotMap, SORT_SINGLESOURCE_SORT, pInfo->bufPageSize, numOfBufPage,
pInfo->pDataBlock, pTaskInfo->id.str);
tsortSetFetchRawDataFp(pInfo->pSortHandle, loadNextDataBlock);
SGenericSource* ps = taosMemoryCalloc(1, sizeof(SGenericSource));
SSortSource* ps = taosMemoryCalloc(1, sizeof(SSortSource));
ps->param = pOperator->pDownstream[0];
tsortAddSource(pInfo->pSortHandle, ps);
int32_t code = tsortOpen(pInfo->pSortHandle);
taosMemoryFreeClear(ps);
if (code != TSDB_CODE_SUCCESS) {
longjmp(pTaskInfo->env, terrno);
}
......@@ -5119,18 +5123,25 @@ static SSDataBlock* doSort(SOperatorInfo* pOperator, bool* newgroup) {
return getSortedBlockData(pInfo->pSortHandle, pInfo->pDataBlock, pInfo->numOfRowsInRes);
}
SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSDataBlock* pResBlock, SArray* pSortInfo, SExecTaskInfo* pTaskInfo) {
SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSDataBlock* pResBlock, SArray* pSortInfo, SArray* pIndexMap, SExecTaskInfo* pTaskInfo) {
SSortOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SSortOperatorInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) {
goto _error;
int32_t rowSize = pResBlock->info.rowSize;
if (pInfo == NULL || pOperator == NULL || rowSize > 100 * 1024 * 1024) {
taosMemoryFreeClear(pInfo);
taosMemoryFreeClear(pOperator);
terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
return NULL;
}
pInfo->sortBufSize = 1024 * 16; // TODO dynamic set the available sort buffer
pInfo->bufPageSize = 1024;
pInfo->numOfRowsInRes = 1024;
pInfo->pDataBlock = pResBlock;
pInfo->pSortInfo = pSortInfo;
pInfo->bufPageSize = rowSize < 1024 ? 1024*2 : rowSize*2; // there are headers, so pageSize = rowSize + header
pInfo->sortBufSize = pInfo->bufPageSize * 16; // TODO dynamic set the available sort buffer
pInfo->numOfRowsInRes = 1024;
pInfo->pDataBlock = pResBlock;
pInfo->pSortInfo = pSortInfo;
pInfo->inputSlotMap = pIndexMap;
pOperator->name = "SortOperator";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_SORT;
......@@ -6156,6 +6167,7 @@ int32_t initAggInfo(SOptrBasicInfo* pBasicInfo, SAggSupporter* pAggSup, SExprInf
pBasicInfo->capacity = numOfRows;
doInitAggInfoSup(pAggSup, pBasicInfo->pCtx, numOfCols, pkey);
return TSDB_CODE_SUCCESS;
}
static STableQueryInfo* initTableQueryInfo(const STableGroupInfo* pTableGroupInfo) {
......@@ -6289,6 +6301,7 @@ static void destroyOrderOperatorInfo(void* param, int32_t numOfOutput) {
pInfo->pDataBlock = blockDataDestroy(pInfo->pDataBlock);
taosArrayDestroy(pInfo->pSortInfo);
taosArrayDestroy(pInfo->inputSlotMap);
}
void destroyExchangeOperatorInfo(void* param, int32_t numOfOutput) {
......@@ -6841,6 +6854,7 @@ static SSDataBlock* doTagScan(SOperatorInfo* pOperator, bool* newgroup) {
return (pRes->info.rows == 0)? NULL:pInfo->pRes;
#endif
return TSDB_CODE_SUCCESS;
}
SOperatorInfo* createTagScanOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SExprInfo* pExpr, int32_t numOfOutput) {
......@@ -7090,7 +7104,9 @@ static SArray* extractTableIdList(const STableGroupInfo* pTableGroupInfo);
static SArray* extractScanColumnId(SNodeList* pNodeList);
static SArray* extractColumnInfo(SNodeList* pNodeList);
static SArray* extractColMatchInfo(SNodeList* pNodeList, SDataBlockDescNode* pOutputNodeList, int32_t* numOfOutputCols);
static SArray* createSortInfo(SNodeList* pNodeList);
static SArray* createSortInfo(SNodeList* pNodeList, SNodeList* pNodeListTarget);
static SArray* createIndexMap(SNodeList* pNodeList);
static SArray* extractPartitionColInfo(SNodeList* pNodeList);
SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHandle* pHandle,
......@@ -7189,9 +7205,9 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
SSortPhysiNode* pSortPhyNode = (SSortPhysiNode*)pPhyNode;
SSDataBlock* pResBlock = createOutputBuf_rv1(pPhyNode->pOutputDataBlockDesc);
SArray* info = createSortInfo(pSortPhyNode->pSortKeys);
return createSortOperatorInfo(op, pResBlock, info, pTaskInfo);
SArray* info = createSortInfo(pSortPhyNode->pSortKeys, pSortPhyNode->pTargets);
SArray* slotMap = createIndexMap(pSortPhyNode->pTargets);
return createSortOperatorInfo(op, pResBlock, info, slotMap, pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_SESSION_WINDOW == type) {
SSessionWinodwPhysiNode* pSessionNode = (SSessionWinodwPhysiNode*)pPhyNode;
......@@ -7220,6 +7236,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
return createMultiTableAggOperatorInfo(op, pPhyNode->pTargets, pTaskInfo, pTableGroupInfo);
}
}*/
return NULL;
}
static tsdbReaderT createDataReaderImpl(STableScanPhysiNode* pTableScanNode, STableGroupInfo* pGroupInfo,
......@@ -7331,7 +7348,7 @@ SArray* extractPartitionColInfo(SNodeList* pNodeList) {
return pList;
}
SArray* createSortInfo(SNodeList* pNodeList) {
SArray* createSortInfo(SNodeList* pNodeList, SNodeList* pNodeListTarget) {
size_t numOfCols = LIST_LENGTH(pNodeList);
SArray* pList = taosArrayInit(numOfCols, sizeof(SBlockOrderInfo));
if (pList == NULL) {
......@@ -7340,19 +7357,53 @@ SArray* createSortInfo(SNodeList* pNodeList) {
}
for (int32_t i = 0; i < numOfCols; ++i) {
SOrderByExprNode* pSortKey = (SOrderByExprNode*)nodesListGetNode(pNodeList, i);
SOrderByExprNode* pSortKey = (SOrderByExprNode*)nodesListGetNode(pNodeList, i);
SBlockOrderInfo bi = {0};
bi.order = (pSortKey->order == ORDER_ASC) ? TSDB_ORDER_ASC : TSDB_ORDER_DESC;
bi.nullFirst = (pSortKey->nullOrder == NULL_ORDER_FIRST);
SColumnNode* pColNode = (SColumnNode*)pSortKey->pExpr;
bi.slotId = pColNode->slotId;
bool found = false;
for (int32_t j = 0; j < LIST_LENGTH(pNodeListTarget); ++j) {
STargetNode* pTarget = (STargetNode*)nodesListGetNode(pNodeListTarget, j);
SColumnNode* pColNodeT = (SColumnNode*)pTarget->pExpr;
if(pColNode->slotId == pColNodeT->slotId){ // to find slotId in PhysiSort OutputDataBlockDesc
bi.slotId = pTarget->slotId;
found = true;
break;
}
}
if(!found){
qError("sort slot id does not found");
}
taosArrayPush(pList, &bi);
}
return pList;
}
SArray* createIndexMap(SNodeList* pNodeList) {
size_t numOfCols = LIST_LENGTH(pNodeList);
SArray* pList = taosArrayInit(numOfCols, sizeof(int32_t));
if (pList == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return pList;
}
for (int32_t i = 0; i < numOfCols; ++i) {
STargetNode* pTarget = (STargetNode*)nodesListGetNode(pNodeList, i);
SColumnNode* pColNode = (SColumnNode*)pTarget->pExpr;
taosArrayPush(pList, &pColNode->slotId);
}
return pList;
}
SArray* extractColMatchInfo(SNodeList* pNodeList, SDataBlockDescNode* pOutputNodeList, int32_t* numOfOutputCols) {
size_t numOfCols = LIST_LENGTH(pNodeList);
SArray* pList = taosArrayInit(numOfCols, sizeof(SColMatchInfo));
......
......@@ -581,6 +581,7 @@ static int32_t loadSysTableContentCb(void* param, const SDataBuf* pMsg, int32_t
}
tsem_post(&pScanResInfo->ready);
return TSDB_CODE_SUCCESS;
}
static SSDataBlock* doFilterResult(SSysTableScanInfo* pInfo) {
......
......@@ -409,6 +409,7 @@ char* tHashGet(SLHashObj* pHashObj, const void *key, size_t keyLen) {
int32_t tHashRemove(SLHashObj* pHashObj, const void *key, size_t keyLen) {
// todo
return TSDB_CODE_SUCCESS;
}
void tHashPrint(const SLHashObj* pHashObj, int32_t type) {
......
......@@ -258,6 +258,7 @@ void *tSimpleHashGet(SSHashObj *pHashObj, const void *key) {
int32_t tSimpleHashRemove(SSHashObj *pHashObj, const void *key) {
// todo
return TSDB_CODE_SUCCESS;
}
void tSimpleHashClear(SSHashObj *pHashObj) {
......
......@@ -22,6 +22,7 @@
#include "tpagedbuf.h"
#include "tsort.h"
#include "tutil.h"
#include "tcompare.h"
struct STupleHandle {
SSDataBlock* pBlock;
......@@ -36,6 +37,7 @@ struct SSortHandle {
SDiskbasedBuf *pBuf;
SArray *pSortInfo;
SArray *pIndexMap;
SArray *pOrderedSource;
_sort_fetch_block_fn_t fetchfp;
......@@ -89,13 +91,14 @@ static SSDataBlock* createDataBlock_rv(SSchema* pSchema, int32_t numOfCols) {
* @param type
* @return
*/
SSortHandle* tsortCreateSortHandle(SArray* pSortInfo, int32_t type, int32_t pageSize, int32_t numOfPages, SSDataBlock* pBlock, const char* idstr) {
SSortHandle* tsortCreateSortHandle(SArray* pSortInfo, SArray* pIndexMap, int32_t type, int32_t pageSize, int32_t numOfPages, SSDataBlock* pBlock, const char* idstr) {
SSortHandle* pSortHandle = taosMemoryCalloc(1, sizeof(SSortHandle));
pSortHandle->type = type;
pSortHandle->pageSize = pageSize;
pSortHandle->numOfPages = numOfPages;
pSortHandle->pSortInfo = pSortInfo;
pSortHandle->pIndexMap = pIndexMap;
pSortHandle->pDataBlock = createOneDataBlock(pBlock);
pSortHandle->pOrderedSource = taosArrayInit(4, POINTER_BYTES);
......@@ -110,6 +113,17 @@ SSortHandle* tsortCreateSortHandle(SArray* pSortInfo, int32_t type, int32_t page
return pSortHandle;
}
static int32_t sortComparClearup(SMsortComparParam* cmpParam) {
for(int32_t i = 0; i < cmpParam->numOfSources; ++i) {
SSortSource* pSource = cmpParam->pSources[i]; // NOTICE: pSource may be SGenericSource *, if it is SORT_MULTISOURCE_MERGE
blockDataDestroy(pSource->src.pBlock);
taosMemoryFreeClear(pSource);
}
cmpParam->numOfSources = 0;
return TSDB_CODE_SUCCESS;
}
void tsortDestroySortHandle(SSortHandle* pSortHandle) {
tsortClose(pSortHandle);
if (pSortHandle->pMergeTree != NULL) {
......@@ -118,15 +132,23 @@ void tsortDestroySortHandle(SSortHandle* pSortHandle) {
destroyDiskbasedBuf(pSortHandle->pBuf);
taosMemoryFreeClear(pSortHandle->idStr);
blockDataDestroy(pSortHandle->pDataBlock);
for (size_t i = 0; i < taosArrayGetSize(pSortHandle->pOrderedSource); i++){
SSortSource** pSource = taosArrayGet(pSortHandle->pOrderedSource, i);
blockDataDestroy((*pSource)->src.pBlock);
taosMemoryFreeClear(*pSource);
}
taosArrayDestroy(pSortHandle->pOrderedSource);
taosMemoryFreeClear(pSortHandle);
}
int32_t tsortAddSource(SSortHandle* pSortHandle, void* pSource) {
taosArrayPush(pSortHandle->pOrderedSource, &pSource);
return TSDB_CODE_SUCCESS;
}
static int32_t doAddNewExternalMemSource(SDiskbasedBuf *pBuf, SArray* pAllSources, SSDataBlock* pBlock, int32_t* sourceId) {
SExternalMemSource* pSource = taosMemoryCalloc(1, sizeof(SExternalMemSource));
SSortSource* pSource = taosMemoryCalloc(1, sizeof(SSortSource));
if (pSource == NULL) {
return TSDB_CODE_QRY_OUT_OF_MEMORY;
}
......@@ -139,8 +161,8 @@ static int32_t doAddNewExternalMemSource(SDiskbasedBuf *pBuf, SArray* pAllSource
(*sourceId) += 1;
int32_t rowSize = blockDataGetSerialRowSize(pSource->src.pBlock);
int32_t numOfRows = (getBufPageSize(pBuf) - blockDataGetSerialMetaSize(pBlock))/rowSize;
int32_t numOfRows = (getBufPageSize(pBuf) - blockDataGetSerialMetaSize(pBlock))/rowSize; // The value of numOfRows must be greater than 0, which is guaranteed by the previous memory allocation
ASSERT(numOfRows > 0);
return blockDataEnsureCapacity(pSource->src.pBlock, numOfRows);
}
......@@ -148,7 +170,7 @@ static int32_t doAddToBuf(SSDataBlock* pDataBlock, SSortHandle* pHandle) {
int32_t start = 0;
if (pHandle->pBuf == NULL) {
int32_t code = createDiskbasedBuf(&pHandle->pBuf, pHandle->pageSize, pHandle->numOfPages * pHandle->pageSize, 0, "/tmp");
int32_t code = createDiskbasedBuf(&pHandle->pBuf, pHandle->pageSize, pHandle->numOfPages * pHandle->pageSize, "doAddToBuf", "/tmp");
dBufSetPrintInfo(pHandle->pBuf);
if (code != TSDB_CODE_SUCCESS) {
return code;
......@@ -164,15 +186,16 @@ static int32_t doAddToBuf(SSDataBlock* pDataBlock, SSortHandle* pHandle) {
}
int32_t pageId = -1;
SFilePage* pPage = getNewBufPage(pHandle->pBuf, pHandle->sourceId, &pageId);
void* pPage = getNewBufPage(pHandle->pBuf, pHandle->sourceId, &pageId);
if (pPage == NULL) {
blockDataDestroy(p);
return terrno;
}
int32_t size = blockDataGetSize(p) + sizeof(int32_t) + p->info.numOfCols * sizeof(int32_t);
assert(size <= getBufPageSize(pHandle->pBuf));
blockDataToBuf(pPage->data, p);
blockDataToBuf(pPage, p);
setBufPageDirty(pPage, true);
releaseBufPage(pHandle->pBuf, pPage);
......@@ -184,10 +207,7 @@ static int32_t doAddToBuf(SSDataBlock* pDataBlock, SSortHandle* pHandle) {
blockDataCleanup(pDataBlock);
SSDataBlock* pBlock = createOneDataBlock(pDataBlock);
int32_t code = doAddNewExternalMemSource(pHandle->pBuf, pHandle->pOrderedSource, pBlock, &pHandle->sourceId);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
return doAddNewExternalMemSource(pHandle->pBuf, pHandle->pOrderedSource, pBlock, &pHandle->sourceId);
}
static int32_t sortComparInit(SMsortComparParam* cmpParam, SArray* pSources, int32_t startIndex, int32_t endIndex, SSortHandle* pHandle) {
......@@ -198,11 +218,11 @@ static int32_t sortComparInit(SMsortComparParam* cmpParam, SArray* pSources, int
if (pHandle->type == SORT_SINGLESOURCE_SORT) {
for (int32_t i = 0; i < cmpParam->numOfSources; ++i) {
SExternalMemSource* pSource = cmpParam->pSources[i];
SSortSource* pSource = cmpParam->pSources[i];
SPageInfo* pPgInfo = *(SPageInfo**)taosArrayGet(pSource->pageIdList, pSource->pageIndex);
SFilePage* pPage = getBufPage(pHandle->pBuf, getPageId(pPgInfo));
code = blockDataFromBuf(pSource->src.pBlock, pPage->data);
void* pPage = getBufPage(pHandle->pBuf, getPageId(pPgInfo));
code = blockDataFromBuf(pSource->src.pBlock, pPage);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
......@@ -212,7 +232,7 @@ static int32_t sortComparInit(SMsortComparParam* cmpParam, SArray* pSources, int
} else {
// multi-pass internal merge sort is required
if (pHandle->pBuf == NULL) {
code = createDiskbasedBuf(&pHandle->pBuf, pHandle->pageSize, pHandle->numOfPages * pHandle->pageSize, 0, "/tmp");
code = createDiskbasedBuf(&pHandle->pBuf, pHandle->pageSize, pHandle->numOfPages * pHandle->pageSize, "sortComparInit", "/tmp");
dBufSetPrintInfo(pHandle->pBuf);
if (code != TSDB_CODE_SUCCESS) {
return code;
......@@ -220,7 +240,7 @@ static int32_t sortComparInit(SMsortComparParam* cmpParam, SArray* pSources, int
}
for (int32_t i = 0; i < cmpParam->numOfSources; ++i) {
SGenericSource* pSource = cmpParam->pSources[i];
SSortSource* pSource = cmpParam->pSources[i];
pSource->src.pBlock = pHandle->fetchfp(pSource->param);
}
}
......@@ -228,16 +248,6 @@ static int32_t sortComparInit(SMsortComparParam* cmpParam, SArray* pSources, int
return code;
}
static int32_t sortComparClearup(SMsortComparParam* cmpParam) {
for(int32_t i = 0; i < cmpParam->numOfSources; ++i) {
SExternalMemSource* pSource = cmpParam->pSources[i];
blockDataDestroy(pSource->src.pBlock);
taosMemoryFreeClear(pSource);
}
cmpParam->numOfSources = 0;
}
static void appendOneRowToDataBlock(SSDataBlock *pBlock, const SSDataBlock* pSource, int32_t* rowIndex) {
for (int32_t i = 0; i < pBlock->info.numOfCols; ++i) {
SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, i);
......@@ -257,7 +267,7 @@ static void appendOneRowToDataBlock(SSDataBlock *pBlock, const SSDataBlock* pSou
*rowIndex += 1;
}
static int32_t adjustMergeTreeForNextTuple(SExternalMemSource *pSource, SMultiwayMergeTreeInfo *pTree, SSortHandle *pHandle, int32_t* numOfCompleted) {
static int32_t adjustMergeTreeForNextTuple(SSortSource *pSource, SMultiwayMergeTreeInfo *pTree, SSortHandle *pHandle, int32_t* numOfCompleted) {
/*
* load a new SDataBlock into memory of a given intermediate data-set source,
* since it's last record in buffer has been chosen to be processed, as the winner of loser-tree
......@@ -266,6 +276,7 @@ static int32_t adjustMergeTreeForNextTuple(SExternalMemSource *pSource, SMultiwa
pSource->src.rowIndex = 0;
if (pHandle->type == SORT_SINGLESOURCE_SORT) {
pSource->pageIndex ++;
if (pSource->pageIndex >= taosArrayGetSize(pSource->pageIdList)) {
(*numOfCompleted) += 1;
pSource->src.rowIndex = -1;
......@@ -274,8 +285,8 @@ static int32_t adjustMergeTreeForNextTuple(SExternalMemSource *pSource, SMultiwa
} else {
SPageInfo* pPgInfo = *(SPageInfo**)taosArrayGet(pSource->pageIdList, pSource->pageIndex);
SFilePage* pPage = getBufPage(pHandle->pBuf, getPageId(pPgInfo));
int32_t code = blockDataFromBuf(pSource->src.pBlock, pPage->data);
void* pPage = getBufPage(pHandle->pBuf, getPageId(pPgInfo));
int32_t code = blockDataFromBuf(pSource->src.pBlock, pPage);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
......@@ -283,7 +294,7 @@ static int32_t adjustMergeTreeForNextTuple(SExternalMemSource *pSource, SMultiwa
releaseBufPage(pHandle->pBuf, pPage);
}
} else {
pSource->src.pBlock = pHandle->fetchfp(((SGenericSource*)pSource)->param);
pSource->src.pBlock = pHandle->fetchfp(((SSortSource*)pSource)->param);
if (pSource->src.pBlock == NULL) {
(*numOfCompleted) += 1;
pSource->src.rowIndex = -1;
......@@ -308,9 +319,10 @@ static int32_t adjustMergeTreeForNextTuple(SExternalMemSource *pSource, SMultiwa
printf("\nafter adjust:\t");
tMergeTreePrint(pTree);
#endif
return TSDB_CODE_SUCCESS;
}
static SSDataBlock* getSortedBlockData(SSortHandle* pHandle, SMsortComparParam* cmpParam, int32_t capacity) {
static SSDataBlock* getSortedBlockDataInner(SSortHandle* pHandle, SMsortComparParam* cmpParam, int32_t capacity) {
blockDataCleanup(pHandle->pDataBlock);
while(1) {
......@@ -320,7 +332,7 @@ static SSDataBlock* getSortedBlockData(SSortHandle* pHandle, SMsortComparParam*
int32_t index = tMergeTreeGetChosenIndex(pHandle->pMergeTree);
SExternalMemSource *pSource = (*cmpParam).pSources[index];
SSortSource *pSource = (*cmpParam).pSources[index];
appendOneRowToDataBlock(pHandle->pDataBlock, pSource->src.pBlock, &pSource->src.rowIndex);
int32_t code = adjustMergeTreeForNextTuple(pSource, pHandle->pMergeTree, pHandle, &pHandle->numOfCompletedSources);
......@@ -345,8 +357,8 @@ int32_t msortComparFn(const void *pLeft, const void *pRight, void *param) {
SArray *pInfo = pParam->orderInfo;
SExternalMemSource* pLeftSource = pParam->pSources[pLeftIdx];
SExternalMemSource* pRightSource = pParam->pSources[pRightIdx];
SSortSource* pLeftSource = pParam->pSources[pLeftIdx];
SSortSource* pRightSource = pParam->pSources[pRightIdx];
// this input is exhausted, set the special value to denote this
if (pLeftSource->src.rowIndex == -1) {
......@@ -381,35 +393,26 @@ int32_t msortComparFn(const void *pLeft, const void *pRight, void *param) {
}
if (rightNull) {
return pParam->nullFirst? 1:-1;
return pOrder->nullFirst? 1:-1;
}
if (leftNull) {
return pParam->nullFirst? -1:1;
return pOrder->nullFirst? -1:1;
}
void* left1 = colDataGetData(pLeftColInfoData, pLeftSource->src.rowIndex);
void* right1 = colDataGetData(pRightColInfoData, pRightSource->src.rowIndex);
switch(pLeftColInfoData->info.type) {
case TSDB_DATA_TYPE_INT: {
int32_t leftv = *(int32_t*)left1;
int32_t rightv = *(int32_t*)right1;
__compar_fn_t fn = getKeyComparFunc(pLeftColInfoData->info.type, pOrder->order);
if (leftv == rightv) {
break;
} else {
if (pOrder->order == TSDB_ORDER_ASC) {
return leftv < rightv? -1 : 1;
} else {
return leftv < rightv? 1 : -1;
}
}
}
default:
assert(0);
int ret = fn(left1, right1);
if (ret == 0) {
continue;
} else {
return ret;
}
}
return 0;
}
static int32_t doInternalMergeSort(SSortHandle* pHandle) {
......@@ -422,7 +425,7 @@ static int32_t doInternalMergeSort(SSortHandle* pHandle) {
double sortPass = floorl(log2(numOfSources) / log2(pHandle->numOfPages));
pHandle->totalElapsed = taosGetTimestampUs() - pHandle->startTs;
qDebug("%s %d rounds mergesort required to complete the sort, first-round sorted data size:%"PRIzu", sort:%"PRId64", total elapsed:%"PRId64,
qDebug("%s %d rounds mergesort required to complete the sort, first-round sorted data size:%"PRIzu", sort elapsed:%"PRId64", total elapsed:%"PRId64,
pHandle->idStr, (int32_t) (sortPass + 1), getTotalBufSize(pHandle->pBuf), pHandle->sortElapsed, pHandle->totalElapsed);
int32_t numOfRows = blockDataGetCapacityInRow(pHandle->pDataBlock, pHandle->pageSize);
......@@ -459,13 +462,13 @@ static int32_t doInternalMergeSort(SSortHandle* pHandle) {
}
while (1) {
SSDataBlock* pDataBlock = getSortedBlockData(pHandle, &pHandle->cmpParam, numOfRows);
SSDataBlock* pDataBlock = getSortedBlockDataInner(pHandle, &pHandle->cmpParam, numOfRows);
if (pDataBlock == NULL) {
break;
}
int32_t pageId = -1;
SFilePage* pPage = getNewBufPage(pHandle->pBuf, pHandle->sourceId, &pageId);
void* pPage = getNewBufPage(pHandle->pBuf, pHandle->sourceId, &pageId);
if (pPage == NULL) {
return terrno;
}
......@@ -473,7 +476,7 @@ static int32_t doInternalMergeSort(SSortHandle* pHandle) {
int32_t size = blockDataGetSize(pDataBlock) + sizeof(int32_t) + pDataBlock->info.numOfCols * sizeof(int32_t);
assert(size <= getBufPageSize(pHandle->pBuf));
blockDataToBuf(pPage->data, pDataBlock);
blockDataToBuf(pPage, pDataBlock);
setBufPageDirty(pPage, true);
releaseBufPage(pHandle->pBuf, pPage);
......@@ -481,6 +484,7 @@ static int32_t doInternalMergeSort(SSortHandle* pHandle) {
blockDataCleanup(pDataBlock);
}
sortComparClearup(&pHandle->cmpParam);
tMergeTreeDestroy(pHandle->pMergeTree);
pHandle->numOfCompletedSources = 0;
......@@ -491,8 +495,6 @@ static int32_t doInternalMergeSort(SSortHandle* pHandle) {
}
}
sortComparClearup(&pHandle->cmpParam);
taosArrayClear(pHandle->pOrderedSource);
taosArrayAddAll(pHandle->pOrderedSource, pResList);
taosArrayDestroy(pResList);
......@@ -520,9 +522,8 @@ static int32_t createInitialSortedMultiSources(SSortHandle* pHandle) {
size_t sortBufSize = pHandle->numOfPages * pHandle->pageSize;
if (pHandle->type == SORT_SINGLESOURCE_SORT) {
SGenericSource* source = taosArrayGetP(pHandle->pOrderedSource, 0);
SSortSource* source = taosArrayGetP(pHandle->pOrderedSource, 0);
taosArrayClear(pHandle->pOrderedSource);
while (1) {
SSDataBlock* pBlock = pHandle->fetchfp(source->param);
if (pBlock == NULL) {
......@@ -533,7 +534,7 @@ static int32_t createInitialSortedMultiSources(SSortHandle* pHandle) {
pHandle->pDataBlock = createOneDataBlock(pBlock);
}
int32_t code = blockDataMerge(pHandle->pDataBlock, pBlock);
int32_t code = blockDataMerge(pHandle->pDataBlock, pBlock, pHandle->pIndexMap);
if (code != 0) {
return code;
}
......@@ -555,8 +556,13 @@ static int32_t createInitialSortedMultiSources(SSortHandle* pHandle) {
size_t size = blockDataGetSize(pHandle->pDataBlock);
// Perform the in-memory sort and then flush data in the buffer into disk.
int64_t p = taosGetTimestampUs();
blockDataSort(pHandle->pDataBlock, pHandle->pSortInfo);
int64_t el = taosGetTimestampUs() - p;
pHandle->sortElapsed += el;
// All sorted data can fit in memory, external memory sort is not needed. Return to directly
if (size <= sortBufSize) {
pHandle->cmpParam.numOfSources = 1;
......@@ -569,8 +575,6 @@ static int32_t createInitialSortedMultiSources(SSortHandle* pHandle) {
doAddToBuf(pHandle->pDataBlock, pHandle);
}
}
taosMemoryFreeClear(source);
}
return TSDB_CODE_SUCCESS;
......@@ -612,22 +616,22 @@ int32_t tsortOpen(SSortHandle* pHandle) {
return code;
}
code = tMergeTreeCreate(&pHandle->pMergeTree, pHandle->cmpParam.numOfSources, &pHandle->cmpParam, pHandle->comparFn);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
return tMergeTreeCreate(&pHandle->pMergeTree, pHandle->cmpParam.numOfSources, &pHandle->cmpParam, pHandle->comparFn);
}
int32_t tsortClose(SSortHandle* pHandle) {
// do nothing
return TSDB_CODE_SUCCESS;
}
int32_t tsortSetFetchRawDataFp(SSortHandle* pHandle, _sort_fetch_block_fn_t fp) {
pHandle->fetchfp = fp;
return TSDB_CODE_SUCCESS;
}
int32_t tsortSetComparFp(SSortHandle* pHandle, _sort_merge_compar_fn_t fp) {
pHandle->comparFn = fp;
return TSDB_CODE_SUCCESS;
}
STupleHandle* tsortNextTuple(SSortHandle* pHandle) {
......@@ -647,7 +651,7 @@ STupleHandle* tsortNextTuple(SSortHandle* pHandle) {
}
int32_t index = tMergeTreeGetChosenIndex(pHandle->pMergeTree);
SExternalMemSource *pSource = pHandle->cmpParam.pSources[index];
SSortSource *pSource = pHandle->cmpParam.pSources[index];
if (pHandle->needAdjust) {
int32_t code = adjustMergeTreeForNextTuple(pSource, pHandle->pMergeTree, pHandle, &pHandle->numOfCompletedSources);
......@@ -677,7 +681,8 @@ STupleHandle* tsortNextTuple(SSortHandle* pHandle) {
}
bool tsortIsNullVal(STupleHandle* pVHandle, int32_t colIndex) {
return false;
SColumnInfoData* pColInfoSrc = taosArrayGet(pVHandle->pBlock->pDataBlock, colIndex);
return colDataIsNull(pColInfoSrc, 0, pVHandle->rowIndex, NULL);
}
void* tsortGetValue(STupleHandle* pVHandle, int32_t colIndex) {
......
......@@ -33,14 +33,29 @@
#include "tdef.h"
#include "trpc.h"
#include "tvariant.h"
#include "tcompare.h"
namespace {
typedef struct {
int32_t startVal;
int32_t count;
int32_t pageRows;
int16_t type;
} _info;
int16_t VARCOUNT = 16;
float rand_f2()
{
unsigned r = taosRand();
r &= 0x007fffff;
r |= 0x40800000;
return *(float*)&r - 6.0;
}
static const int32_t TEST_NUMBER = 1;
#define bigendian() ((*(char *)&TEST_NUMBER) == 0)
SSDataBlock* getSingleColDummyBlock(void* param) {
_info* pInfo = (_info*) param;
if (--pInfo->count < 0) {
......@@ -51,19 +66,65 @@ SSDataBlock* getSingleColDummyBlock(void* param) {
pBlock->pDataBlock = taosArrayInit(4, sizeof(SColumnInfoData));
SColumnInfoData colInfo = {0};
colInfo.info.type = TSDB_DATA_TYPE_INT;
colInfo.info.bytes = sizeof(int32_t);
colInfo.info.type = pInfo->type;
if (pInfo->type == TSDB_DATA_TYPE_NCHAR){
colInfo.info.bytes = TSDB_NCHAR_SIZE * VARCOUNT + VARSTR_HEADER_SIZE;
colInfo.varmeta.offset = static_cast<int32_t *>(taosMemoryCalloc(pInfo->pageRows, sizeof(int32_t)));
} else if(pInfo->type == TSDB_DATA_TYPE_BINARY) {
colInfo.info.bytes = VARCOUNT + VARSTR_HEADER_SIZE;
colInfo.varmeta.offset = static_cast<int32_t *>(taosMemoryCalloc(pInfo->pageRows, sizeof(int32_t)));
} else{
colInfo.info.bytes = tDataTypes[pInfo->type].bytes;
colInfo.pData = static_cast<char*>(taosMemoryCalloc(pInfo->pageRows, colInfo.info.bytes));
colInfo.nullbitmap = static_cast<char*>(taosMemoryCalloc(1, (pInfo->pageRows + 7) / 8));
}
colInfo.info.colId = 1;
colInfo.pData = static_cast<char*>(taosMemoryCalloc(pInfo->pageRows, sizeof(int32_t)));
colInfo.nullbitmap = static_cast<char*>(taosMemoryCalloc(1, (pInfo->pageRows + 7) / 8));
taosArrayPush(pBlock->pDataBlock, &colInfo);
for (int32_t i = 0; i < pInfo->pageRows; ++i) {
SColumnInfoData* pColInfo = static_cast<SColumnInfoData*>(TARRAY_GET_ELEM(pBlock->pDataBlock, 0));
int32_t v = ++pInfo->startVal;
colDataAppend(pColInfo, i, reinterpret_cast<const char*>(&v), false);
if (pInfo->type == TSDB_DATA_TYPE_NCHAR){
int32_t size = taosRand() % VARCOUNT;
char str[128] = {0};
char strOri[128] = {0};
taosRandStr(strOri, size);
int32_t len = 0;
bool ret = taosMbsToUcs4(strOri, size, (TdUcs4*)varDataVal(str), size * TSDB_NCHAR_SIZE, &len);
if (!ret){
printf("error\n");
return NULL;
}
varDataSetLen(str, len);
colDataAppend(pColInfo, i, reinterpret_cast<const char*>(str), false);
pBlock->info.hasVarCol = true;
printf("nchar: %s\n",strOri);
} else if(pInfo->type == TSDB_DATA_TYPE_BINARY){
int32_t size = taosRand() % VARCOUNT;
char str[64] = {0};
taosRandStr(varDataVal(str), size);
varDataSetLen(str, size);
colDataAppend(pColInfo, i, reinterpret_cast<const char*>(str), false);
pBlock->info.hasVarCol = true;
printf("binary: %s\n", varDataVal(str));
} else if(pInfo->type == TSDB_DATA_TYPE_DOUBLE || pInfo->type == TSDB_DATA_TYPE_FLOAT) {
double v = rand_f2();
colDataAppend(pColInfo, i, reinterpret_cast<const char*>(&v), false);
printf("float: %f\n", v);
} else{
int64_t v = ++pInfo->startVal;
char *result = static_cast<char*>(taosMemoryCalloc(tDataTypes[pInfo->type].bytes, 1));
if (!bigendian()){
memcpy(result, &v, tDataTypes[pInfo->type].bytes);
}else{
memcpy(result, (char*)(&v) + sizeof(int64_t) - tDataTypes[pInfo->type].bytes, tDataTypes[pInfo->type].bytes);
}
colDataAppend(pColInfo, i, result, false);
printf("int: %ld\n", v);
taosMemoryFree(result);
}
}
pBlock->info.rows = pInfo->pageRows;
......@@ -71,17 +132,18 @@ SSDataBlock* getSingleColDummyBlock(void* param) {
return pBlock;
}
int32_t docomp(const void* p1, const void* p2, void* param) {
int32_t pLeftIdx = *(int32_t *)p1;
int32_t pRightIdx = *(int32_t *)p2;
SMsortComparParam *pParam = (SMsortComparParam *)param;
SGenericSource** px = reinterpret_cast<SGenericSource**>(pParam->pSources);
SSortSource** px = reinterpret_cast<SSortSource**>(pParam->pSources);
SArray *pInfo = pParam->orderInfo;
SGenericSource* pLeftSource = px[pLeftIdx];
SGenericSource* pRightSource = px[pRightIdx];
SSortSource* pLeftSource = px[pLeftIdx];
SSortSource* pRightSource = px[pRightIdx];
// this input is exhausted, set the special value to denote this
if (pLeftSource->src.rowIndex == -1) {
......@@ -116,33 +178,22 @@ int32_t docomp(const void* p1, const void* p2, void* param) {
}
if (rightNull) {
return pParam->nullFirst? 1:-1;
return pOrder->nullFirst? 1:-1;
}
if (leftNull) {
return pParam->nullFirst? -1:1;
return pOrder->nullFirst? -1:1;
}
void* left1 = colDataGetData(pLeftColInfoData, pLeftSource->src.rowIndex);
void* right1 = colDataGetData(pRightColInfoData, pRightSource->src.rowIndex);
__compar_fn_t fn = getKeyComparFunc(pLeftColInfoData->info.type, pOrder->order);
switch(pLeftColInfoData->info.type) {
case TSDB_DATA_TYPE_INT: {
int32_t leftv = *(int32_t*)left1;
int32_t rightv = *(int32_t*)right1;
if (leftv == rightv) {
break;
} else {
if (pOrder->order == TSDB_ORDER_ASC) {
return leftv < rightv? -1 : 1;
} else {
return leftv < rightv? 1 : -1;
}
}
}
default:
assert(0);
int ret = fn(left1, right1);
if (ret == 0) {
continue;
} else {
return ret;
}
}
......@@ -150,28 +201,30 @@ int32_t docomp(const void* p1, const void* p2, void* param) {
}
} // namespace
#if 0
#if 1
TEST(testCase, inMem_sort_Test) {
SArray* pOrderVal = taosArrayInit(4, sizeof(SOrder));
SOrder o = {.order = TSDB_ORDER_ASC};
o.col.info.colId = 1;
o.col.info.type = TSDB_DATA_TYPE_INT;
taosArrayPush(pOrderVal, &o);
int32_t numOfRows = 1000;
SBlockOrderInfo oi = {0};
oi.order = TSDB_ORDER_ASC;
oi.colIndex = 0;
oi.slotId = 0;
SArray* orderInfo = taosArrayInit(1, sizeof(SBlockOrderInfo));
taosArrayPush(orderInfo, &oi);
SSchema s = {.type = TSDB_DATA_TYPE_INT, .colId = 1, .bytes = 4, };
SSortHandle* phandle = tsortCreateSortHandle(orderInfo, false, SORT_SINGLESOURCE_SORT, 1024, 5, &s, 1, "test_abc");
SSortHandle* phandle = tsortCreateSortHandle(orderInfo, NULL, SORT_SINGLESOURCE_SORT, 1024, 5, NULL, "test_abc");
tsortSetFetchRawDataFp(phandle, getSingleColDummyBlock);
tsortAddSource(phandle, &numOfRows);
_info* pInfo = (_info*) taosMemoryCalloc(1, sizeof(_info));
pInfo->startVal = 0;
pInfo->pageRows = 100;
pInfo->count = 6;
pInfo->type = TSDB_DATA_TYPE_USMALLINT;
SSortSource* ps = static_cast<SSortSource*>(taosMemoryCalloc(1, sizeof(SSortSource)));
ps->param = pInfo;
tsortAddSource(phandle, ps);
int32_t code = tsortOpen(phandle);
int32_t row = 1;
taosMemoryFreeClear(ps);
while(1) {
STupleHandle* pTupleHandle = tsortNextTuple(phandle);
......@@ -180,83 +233,153 @@ TEST(testCase, inMem_sort_Test) {
}
void* v = tsortGetValue(pTupleHandle, 0);
printf("%d: %d\n", row++, *(int32_t*) v);
printf("%d: %d\n", row, *(uint16_t*) v);
ASSERT_EQ(row++, *(uint16_t*) v);
}
taosArrayDestroy(orderInfo);
tsortDestroySortHandle(phandle);
taosMemoryFree(pInfo);
}
TEST(testCase, external_mem_sort_Test) {
SArray* pOrderVal = taosArrayInit(4, sizeof(SOrder));
SOrder o = {.order = TSDB_ORDER_ASC};
o.col.info.colId = 1;
o.col.info.type = TSDB_DATA_TYPE_INT;
taosArrayPush(pOrderVal, &o);
SBlockOrderInfo oi = {0};
oi.order = TSDB_ORDER_ASC;
oi.colIndex = 0;
SArray* orderInfo = taosArrayInit(1, sizeof(SBlockOrderInfo));
taosArrayPush(orderInfo, &oi);
SSchema s = {.type = TSDB_DATA_TYPE_INT, .colId = 1, .bytes = 4, };
SSortHandle* phandle = tsortCreateSortHandle(orderInfo, false, SORT_SINGLESOURCE_SORT, 1024, 5, &s, 1, "test_abc");
tsortSetFetchRawDataFp(phandle, getSingleColDummyBlock);
_info* pInfo = (_info*) taosMemoryCalloc(8, sizeof(_info));
pInfo[0].startVal = 0;
pInfo[0].pageRows = 10;
pInfo[0].count = 6;
pInfo[0].type = TSDB_DATA_TYPE_BOOL;
pInfo[1].startVal = 0;
pInfo[1].pageRows = 10;
pInfo[1].count = 6;
pInfo[1].type = TSDB_DATA_TYPE_TINYINT;
pInfo[2].startVal = 0;
pInfo[2].pageRows = 100;
pInfo[2].count = 6;
pInfo[2].type = TSDB_DATA_TYPE_USMALLINT;
pInfo[3].startVal = 0;
pInfo[3].pageRows = 100;
pInfo[3].count = 6;
pInfo[3].type = TSDB_DATA_TYPE_INT;
pInfo[4].startVal = 0;
pInfo[4].pageRows = 100;
pInfo[4].count = 6;
pInfo[4].type = TSDB_DATA_TYPE_UBIGINT;
pInfo[5].startVal = 0;
pInfo[5].pageRows = 100;
pInfo[5].count = 6;
pInfo[5].type = TSDB_DATA_TYPE_DOUBLE;
pInfo[6].startVal = 0;
pInfo[6].pageRows = 50;
pInfo[6].count = 6;
pInfo[6].type = TSDB_DATA_TYPE_NCHAR;
pInfo[7].startVal = 0;
pInfo[7].pageRows = 100;
pInfo[7].count = 6;
pInfo[7].type = TSDB_DATA_TYPE_BINARY;
for (int i = 0; i < 8; i++){
SBlockOrderInfo oi = {0};
if(pInfo[i].type == TSDB_DATA_TYPE_NCHAR){
oi.order = TSDB_ORDER_DESC;
}else{
oi.order = TSDB_ORDER_ASC;
}
_info* pInfo = (_info*) taosMemoryCalloc(1, sizeof(_info));
pInfo->startVal = 100000;
pInfo->pageRows = 1000;
pInfo->count = 50;
oi.slotId = 0;
SArray* orderInfo = taosArrayInit(1, sizeof(SBlockOrderInfo));
taosArrayPush(orderInfo, &oi);
SGenericSource* ps = static_cast<SGenericSource*>(taosMemoryCalloc(1, sizeof(SGenericSource)));
ps->param = pInfo;
SSortHandle* phandle = tsortCreateSortHandle(orderInfo, NULL, SORT_SINGLESOURCE_SORT, 128, 3, NULL, "test_abc");
tsortSetFetchRawDataFp(phandle, getSingleColDummyBlock);
tsortAddSource(phandle, ps);
SSortSource* ps = static_cast<SSortSource*>(taosMemoryCalloc(1, sizeof(SSortSource)));
ps->param = &pInfo[i];
int32_t code = tsortOpen(phandle);
int32_t row = 1;
tsortAddSource(phandle, ps);
while(1) {
STupleHandle* pTupleHandle = tsortNextTuple(phandle);
if (pTupleHandle == NULL) {
break;
}
int32_t code = tsortOpen(phandle);
int32_t row = 1;
taosMemoryFreeClear(ps);
void* v = tsortGetValue(pTupleHandle, 0);
printf("%d: %d\n", row++, *(int32_t*) v);
printf("--------start with %s-----------\n", tDataTypes[pInfo[i].type].name);
while(1) {
STupleHandle* pTupleHandle = tsortNextTuple(phandle);
if (pTupleHandle == NULL) {
break;
}
void* v = tsortGetValue(pTupleHandle, 0);
if(pInfo[i].type == TSDB_DATA_TYPE_NCHAR){
char buf[128] = {0};
int32_t len = taosUcs4ToMbs((TdUcs4 *)varDataVal(v), varDataLen(v), buf);
printf("%d: %s\n", row++, buf);
}else if(pInfo[i].type == TSDB_DATA_TYPE_BINARY){
char buf[128] = {0};
memcpy(buf, varDataVal(v), varDataLen(v));
printf("%d: %s\n", row++, buf);
}else if(pInfo[i].type == TSDB_DATA_TYPE_DOUBLE) {
printf("double: %lf\n", *(double*)v);
}else if (pInfo[i].type == TSDB_DATA_TYPE_FLOAT) {
printf("float: %f\n", *(float*)v);
}else{
int64_t result = 0;
if (!bigendian()){
memcpy(&result, v, tDataTypes[pInfo[i].type].bytes);
}else{
memcpy((char*)(&result) + sizeof(int64_t) - tDataTypes[pInfo[i].type].bytes, v, tDataTypes[pInfo[i].type].bytes);
}
printf("%d: %ld\n", row++, result);
}
}
taosArrayDestroy(orderInfo);
tsortDestroySortHandle(phandle);
}
tsortDestroySortHandle(phandle);
taosMemoryFree(pInfo);
}
TEST(testCase, ordered_merge_sort_Test) {
SArray* pOrderVal = taosArrayInit(4, sizeof(SOrder));
SOrder o = {.order = TSDB_ORDER_ASC};
o.col.info.colId = 1;
o.col.info.type = TSDB_DATA_TYPE_INT;
taosArrayPush(pOrderVal, &o);
int32_t numOfRows = 1000;
SBlockOrderInfo oi = {0};
oi.order = TSDB_ORDER_ASC;
oi.colIndex = 0;
oi.slotId = 0;
SArray* orderInfo = taosArrayInit(1, sizeof(SBlockOrderInfo));
taosArrayPush(orderInfo, &oi);
SSchema s = {.type = TSDB_DATA_TYPE_INT, .colId = 1, .bytes = 4};
SSortHandle* phandle = tsortCreateSortHandle(orderInfo, false, SORT_MULTISOURCE_MERGE, 1024, 5, &s, 1,"test_abc");
SSDataBlock* pBlock = static_cast<SSDataBlock*>(taosMemoryCalloc(1, sizeof(SSDataBlock)));
pBlock->pDataBlock = taosArrayInit(1, sizeof(SColumnInfoData));
pBlock->info.numOfCols = 1;
for (int32_t i = 0; i < pBlock->info.numOfCols; ++i) {
SColumnInfoData colInfo = {0};
colInfo.info.type = TSDB_DATA_TYPE_INT;
colInfo.info.bytes = sizeof(int32_t);
colInfo.info.colId = 1;
taosArrayPush(pBlock->pDataBlock, &colInfo);
}
SSortHandle* phandle = tsortCreateSortHandle(orderInfo, NULL, SORT_MULTISOURCE_MERGE, 1024, 5, pBlock,"test_abc");
tsortSetFetchRawDataFp(phandle, getSingleColDummyBlock);
tsortSetComparFp(phandle, docomp);
SSortSource* p[10] = {0};
_info c[10] = {0};
for(int32_t i = 0; i < 10; ++i) {
SGenericSource* p = static_cast<SGenericSource*>(taosMemoryCalloc(1, sizeof(SGenericSource)));
_info* c = static_cast<_info*>(taosMemoryCalloc(1, sizeof(_info)));
c->count = 1;
c->pageRows = 1000;
c->startVal = 0;
p->param = c;
tsortAddSource(phandle, p);
p[i] = static_cast<SSortSource*>(taosMemoryCalloc(1, sizeof(SSortSource)));
c[i].count = 1;
c[i].pageRows = 1000;
c[i].startVal = i*1000;
c[i].type = TSDB_DATA_TYPE_INT;
p[i]->param = &c[i];
tsortAddSource(phandle, p[i]);
}
int32_t code = tsortOpen(phandle);
......@@ -269,10 +392,14 @@ TEST(testCase, ordered_merge_sort_Test) {
}
void* v = tsortGetValue(pTupleHandle, 0);
printf("%d: %d\n", row++, *(int32_t*) v);
printf("%d: %d\n", row, *(int32_t*) v);
ASSERT_EQ(row++, *(int32_t*) v);
}
taosArrayDestroy(orderInfo);
tsortDestroySortHandle(phandle);
blockDataDestroy(pBlock);
}
#endif
......
......@@ -96,6 +96,7 @@ int32_t countFunction(SqlFunctionCtx *pCtx) {
*((int64_t *)buf) += numOfElem;
SET_VAL(pResInfo, numOfElem, 1);
return TSDB_CODE_SUCCESS;
}
#define LIST_ADD_N(_res, _col, _start, _rows, _t, numOfElem) \
......@@ -166,6 +167,7 @@ int32_t sumFunction(SqlFunctionCtx *pCtx) {
// data in the check operation are all null, not output
SET_VAL(GET_RES_INFO(pCtx), numOfElem, 1);
return TSDB_CODE_SUCCESS;
}
bool getSumFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) {
......@@ -434,11 +436,13 @@ int32_t doMinMaxHelper(SqlFunctionCtx *pCtx, int32_t isMinFunc) {
int32_t minFunction(SqlFunctionCtx *pCtx) {
int32_t numOfElems = doMinMaxHelper(pCtx, 1);
SET_VAL(GET_RES_INFO(pCtx), numOfElems, 1);
return TSDB_CODE_SUCCESS;
}
int32_t maxFunction(SqlFunctionCtx *pCtx) {
int32_t numOfElems = doMinMaxHelper(pCtx, 0);
SET_VAL(GET_RES_INFO(pCtx), numOfElems, 1);
return TSDB_CODE_SUCCESS;
}
typedef struct STopBotRes {
......@@ -588,6 +592,7 @@ int32_t stddevFunction(SqlFunctionCtx* pCtx) {
// data in the check operation are all null, not output
SET_VAL(GET_RES_INFO(pCtx), numOfElem, 1);
return TSDB_CODE_SUCCESS;
}
void stddevFinalize(SqlFunctionCtx* pCtx) {
......@@ -714,6 +719,7 @@ int32_t percentileFunction(SqlFunctionCtx *pCtx) {
}
SET_VAL(pResInfo, notNullElems, 1);
return TSDB_CODE_SUCCESS;
}
// TODO set the correct parameter.
......@@ -779,6 +785,7 @@ int32_t firstFunction(SqlFunctionCtx *pCtx) {
}
SET_VAL(pResInfo, numOfElems, 1);
return TSDB_CODE_SUCCESS;
}
int32_t lastFunction(SqlFunctionCtx *pCtx) {
......@@ -836,6 +843,7 @@ int32_t lastFunction(SqlFunctionCtx *pCtx) {
}
SET_VAL(pResInfo, numOfElems, 1);
return TSDB_CODE_SUCCESS;
}
typedef struct SDiffInfo {
......@@ -1064,6 +1072,7 @@ int32_t diffFunction(SqlFunctionCtx *pCtx) {
* 2. current block may be null value
*/
assert(pCtx->hasNull);
return 0;
} else {
// for (int t = 0; t < pCtx->tagInfo.numOfTagCols; ++t) {
// SqlFunctionCtx* tagCtx = pCtx->tagInfo.pTagCtxList[t];
......
......@@ -133,7 +133,6 @@ void sclFreeRes(SHashObj *res) {
void sclFreeParam(SScalarParam *param) {
if (param->columnData != NULL) {
colDataDestroy(param->columnData);
taosMemoryFreeClear(param->columnData);
}
if (param->pHashFilter != NULL) {
......
......@@ -795,16 +795,19 @@ bool getTimePseudoFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv
int32_t qStartTsFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
ASSERT(inputNum == 1);
colDataAppendInt64(pOutput->columnData, pOutput->numOfRows, (int64_t *)colDataGetData(pInput->columnData, 0));
return TSDB_CODE_SUCCESS;
}
int32_t qEndTsFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
ASSERT(inputNum == 1);
colDataAppendInt64(pOutput->columnData, pOutput->numOfRows, (int64_t *)colDataGetData(pInput->columnData, 1));
return TSDB_CODE_SUCCESS;
}
int32_t winDurFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
ASSERT(inputNum == 1);
colDataAppendInt64(pOutput->columnData, pOutput->numOfRows, (int64_t *)colDataGetData(pInput->columnData, 2));
return TSDB_CODE_SUCCESS;
}
int32_t winStartTsFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
......
......@@ -476,7 +476,6 @@ static SColumnInfoData* doVectorConvert(SScalarParam* pInput, int32_t* doConvert
static void doReleaseVec(SColumnInfoData* pCol, int32_t type) {
if (type == VECTOR_DO_CONVERT) {
colDataDestroy(pCol);
taosMemoryFree(pCol);
}
}
......
......@@ -343,6 +343,7 @@ _return:
SCH_JOB_ELOG("invalid job status update, from %s to %s", jobTaskStatusStr(oriStatus), jobTaskStatusStr(newStatus));
SCH_ERR_RET(code);
return TSDB_CODE_SUCCESS;
}
int32_t schBuildTaskRalation(SSchJob *pJob, SHashObj *planToTask) {
......@@ -877,7 +878,7 @@ _return:
SCH_RET(schProcessOnJobFailure(pJob, code));
}
int32_t schProcessOnDataFetched(SSchJob *job) {
void schProcessOnDataFetched(SSchJob *job) {
atomic_val_compare_exchange_32(&job->remoteFetch, 1, 0);
tsem_post(&job->rspSem);
}
......@@ -1333,6 +1334,7 @@ int32_t schHandleExplainCallback(void *param, const SDataBuf *pMsg, int32_t code
int32_t schHandleDropCallback(void *param, const SDataBuf *pMsg, int32_t code) {
SSchTaskCallbackParam *pParam = (SSchTaskCallbackParam *)param;
qDebug("QID:%" PRIx64 ",TID:%" PRIx64 " drop task rsp received, code:%x", pParam->queryId, pParam->taskId, code);
return TSDB_CODE_SUCCESS;
}
int32_t schHandleHbCallback(void *param, const SDataBuf *pMsg, int32_t code) {
......@@ -2232,7 +2234,7 @@ void schDropJobAllTasks(SSchJob *pJob) {
int32_t schCancelJob(SSchJob *pJob) {
// TODO
return TSDB_CODE_SUCCESS;
// TODO MOVE ALL TASKS FROM EXEC LIST TO FAIL LIST
}
......
......@@ -298,4 +298,5 @@ static int tdbPCacheCloseImpl(SPCache *pCache) {
}
tdbPCacheDestroyLock(pCache);
return 0;
}
......@@ -437,7 +437,6 @@ int32_t compareWStrPatternMatch(const void *pLeft, const void *pRight) {
int32_t compareWStrPatternNotMatch(const void *pLeft, const void *pRight) {
return compareWStrPatternMatch(pLeft, pRight) ? 0 : 1;
}
__compar_fn_t getComparFunc(int32_t type, int32_t optr) {
__compar_fn_t comparFn = NULL;
......@@ -569,53 +568,36 @@ __compar_fn_t getComparFunc(int32_t type, int32_t optr) {
}
__compar_fn_t getKeyComparFunc(int32_t keyType, int32_t order) {
__compar_fn_t comparFn = NULL;
switch (keyType) {
case TSDB_DATA_TYPE_TINYINT:
case TSDB_DATA_TYPE_BOOL:
comparFn = (order == TSDB_ORDER_ASC) ? compareInt8Val : compareInt8ValDesc;
break;
return (order == TSDB_ORDER_ASC) ? compareInt8Val : compareInt8ValDesc;
case TSDB_DATA_TYPE_SMALLINT:
comparFn = (order == TSDB_ORDER_ASC) ? compareInt16Val : compareInt16ValDesc;
break;
return (order == TSDB_ORDER_ASC) ? compareInt16Val : compareInt16ValDesc;
case TSDB_DATA_TYPE_INT:
comparFn = (order == TSDB_ORDER_ASC) ? compareInt32Val : compareInt32ValDesc;
break;
return (order == TSDB_ORDER_ASC) ? compareInt32Val : compareInt32ValDesc;
case TSDB_DATA_TYPE_BIGINT:
case TSDB_DATA_TYPE_TIMESTAMP:
comparFn = (order == TSDB_ORDER_ASC) ? compareInt64Val : compareInt64ValDesc;
break;
return (order == TSDB_ORDER_ASC) ? compareInt64Val : compareInt64ValDesc;
case TSDB_DATA_TYPE_FLOAT:
comparFn = (order == TSDB_ORDER_ASC) ? compareFloatVal : compareFloatValDesc;
break;
return (order == TSDB_ORDER_ASC) ? compareFloatVal : compareFloatValDesc;
case TSDB_DATA_TYPE_DOUBLE:
comparFn = (order == TSDB_ORDER_ASC) ? compareDoubleVal : compareDoubleValDesc;
break;
return (order == TSDB_ORDER_ASC) ? compareDoubleVal : compareDoubleValDesc;
case TSDB_DATA_TYPE_UTINYINT:
comparFn = (order == TSDB_ORDER_ASC) ? compareUint8Val : compareUint8ValDesc;
break;
return (order == TSDB_ORDER_ASC) ? compareUint8Val : compareUint8ValDesc;
case TSDB_DATA_TYPE_USMALLINT:
comparFn = (order == TSDB_ORDER_ASC) ? compareUint16Val : compareUint16ValDesc;
break;
return (order == TSDB_ORDER_ASC) ? compareUint16Val : compareUint16ValDesc;
case TSDB_DATA_TYPE_UINT:
comparFn = (order == TSDB_ORDER_ASC) ? compareUint32Val : compareUint32ValDesc;
break;
return (order == TSDB_ORDER_ASC) ? compareUint32Val : compareUint32ValDesc;
case TSDB_DATA_TYPE_UBIGINT:
comparFn = (order == TSDB_ORDER_ASC) ? compareUint64Val : compareUint64ValDesc;
break;
return (order == TSDB_ORDER_ASC) ? compareUint64Val : compareUint64ValDesc;
case TSDB_DATA_TYPE_BINARY:
comparFn = (order == TSDB_ORDER_ASC) ? compareLenPrefixedStr : compareLenPrefixedStrDesc;
break;
return (order == TSDB_ORDER_ASC) ? compareLenPrefixedStr : compareLenPrefixedStrDesc;
case TSDB_DATA_TYPE_NCHAR:
comparFn = (order == TSDB_ORDER_ASC) ? compareLenPrefixedWStr : compareLenPrefixedWStrDesc;
break;
return (order == TSDB_ORDER_ASC) ? compareLenPrefixedWStr : compareLenPrefixedWStrDesc;
default:
comparFn = (order == TSDB_ORDER_ASC) ? compareInt32Val : compareInt32ValDesc;
break;
return (order == TSDB_ORDER_ASC) ? compareInt32Val : compareInt32ValDesc;
}
return comparFn;
}
int32_t doCompare(const char *f1, const char *f2, int32_t type, size_t size) {
......
......@@ -136,7 +136,7 @@ static char* doFlushPageToDisk(SDiskbasedBuf* pBuf, SPageInfo* pg) {
pBuf->nextPos += size;
int32_t ret = taosLSeekFile(pBuf->pFile, pg->offset, SEEK_SET);
if (ret != 0) {
if (ret == -1) {
terrno = TAOS_SYSTEM_ERROR(errno);
return NULL;
}
......@@ -167,7 +167,7 @@ static char* doFlushPageToDisk(SDiskbasedBuf* pBuf, SPageInfo* pg) {
// 3. write to disk.
int32_t ret = taosLSeekFile(pBuf->pFile, pg->offset, SEEK_SET);
if (ret != 0) {
if (ret == -1) {
terrno = TAOS_SYSTEM_ERROR(errno);
return NULL;
}
......@@ -222,7 +222,7 @@ static char* flushPageToDisk(SDiskbasedBuf* pBuf, SPageInfo* pg) {
// load file block data in disk
static int32_t loadPageFromDisk(SDiskbasedBuf* pBuf, SPageInfo* pg) {
int32_t ret = taosLSeekFile(pBuf->pFile, pg->offset, SEEK_SET);
if (ret != 0) {
if (ret == -1) {
ret = TAOS_SYSTEM_ERROR(errno);
return ret;
}
......@@ -348,8 +348,7 @@ static void lruListMoveToFront(SList* pList, SPageInfo* pi) {
}
static SPageInfo* getPageInfoFromPayload(void* page) {
int32_t offset = offsetof(SPageInfo, pData);
char* p = (char *)page - offset;
char* p = (char *)page - POINTER_BYTES;
SPageInfo* ppi = ((SPageInfo**)p)[0];
return ppi;
......@@ -369,7 +368,6 @@ int32_t createDiskbasedBuf(SDiskbasedBuf** pBuf, int32_t pagesize, int32_t inMem
pPBuf->totalBufSize = 0;
pPBuf->inMemPages = inMemBufSize / pagesize; // maximum allowed pages, it is a soft limit.
pPBuf->allocateId = -1;
pPBuf->comp = true;
pPBuf->pFile = NULL;
pPBuf->id = strdup(id);
pPBuf->fileSize = 0;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册