提交 12c3a10a 编写于 作者: wmmhello's avatar wmmhello

fix error in order by logic & fix no return error

上级 69a5e55a
......@@ -36,8 +36,8 @@ IF (TD_WINDOWS)
ENDIF ()
ELSE ()
SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -Werror -fPIC -gdwarf-2 -g3")
SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Werror -fPIC -gdwarf-2 -g3")
SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -Werror -Werror=return-type -fPIC -gdwarf-2 -g3")
SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Werror -Werror=return-type -fPIC -gdwarf-2 -g3")
MESSAGE("System processor ID: ${CMAKE_SYSTEM_PROCESSOR}")
IF (${CMAKE_SYSTEM_PROCESSOR} MATCHES "arm64")
......
......@@ -42,7 +42,7 @@ IF (${CMAKE_SYSTEM_NAME} MATCHES "Linux" OR ${CMAKE_SYSTEM_NAME} MATCHES "Darwin
SET(TD_DARWIN TRUE)
SET(OSTYPE "macOS")
ADD_DEFINITIONS("-DDARWIN -Wno-tautological-pointer-compare -Wno-return-type")
ADD_DEFINITIONS("-DDARWIN -Wno-tautological-pointer-compare")
MESSAGE("Current system processor is ${CMAKE_SYSTEM_PROCESSOR}.")
IF (${CMAKE_SYSTEM_PROCESSOR} MATCHES "arm64" OR ${CMAKE_SYSTEM_PROCESSOR} MATCHES "x86_64")
......
......@@ -113,39 +113,39 @@ static FORCE_INLINE void colDataAppendNULL(SColumnInfoData* pColumnInfoData, uin
pColumnInfoData->hasNull = true;
}
static FORCE_INLINE int32_t colDataAppendInt8(SColumnInfoData* pColumnInfoData, uint32_t currentRow, int8_t* v) {
static FORCE_INLINE void colDataAppendInt8(SColumnInfoData* pColumnInfoData, uint32_t currentRow, int8_t* v) {
ASSERT(pColumnInfoData->info.type == TSDB_DATA_TYPE_TINYINT ||
pColumnInfoData->info.type == TSDB_DATA_TYPE_UTINYINT || pColumnInfoData->info.type == TSDB_DATA_TYPE_BOOL);
char* p = pColumnInfoData->pData + pColumnInfoData->info.bytes * currentRow;
*(int8_t*)p = *(int8_t*)v;
}
static FORCE_INLINE int32_t colDataAppendInt16(SColumnInfoData* pColumnInfoData, uint32_t currentRow, int16_t* v) {
static FORCE_INLINE void colDataAppendInt16(SColumnInfoData* pColumnInfoData, uint32_t currentRow, int16_t* v) {
ASSERT(pColumnInfoData->info.type == TSDB_DATA_TYPE_SMALLINT || pColumnInfoData->info.type == TSDB_DATA_TYPE_USMALLINT);
char* p = pColumnInfoData->pData + pColumnInfoData->info.bytes * currentRow;
*(int16_t*)p = *(int16_t*)v;
}
static FORCE_INLINE int32_t colDataAppendInt32(SColumnInfoData* pColumnInfoData, uint32_t currentRow, int32_t* v) {
static FORCE_INLINE void colDataAppendInt32(SColumnInfoData* pColumnInfoData, uint32_t currentRow, int32_t* v) {
ASSERT(pColumnInfoData->info.type == TSDB_DATA_TYPE_INT || pColumnInfoData->info.type == TSDB_DATA_TYPE_UINT);
char* p = pColumnInfoData->pData + pColumnInfoData->info.bytes * currentRow;
*(int32_t*)p = *(int32_t*)v;
}
static FORCE_INLINE int32_t colDataAppendInt64(SColumnInfoData* pColumnInfoData, uint32_t currentRow, int64_t* v) {
static FORCE_INLINE void colDataAppendInt64(SColumnInfoData* pColumnInfoData, uint32_t currentRow, int64_t* v) {
int32_t type = pColumnInfoData->info.type;
ASSERT(type == TSDB_DATA_TYPE_BIGINT || type == TSDB_DATA_TYPE_UBIGINT || type == TSDB_DATA_TYPE_TIMESTAMP);
char* p = pColumnInfoData->pData + pColumnInfoData->info.bytes * currentRow;
*(int64_t*)p = *(int64_t*)v;
}
static FORCE_INLINE int32_t colDataAppendFloat(SColumnInfoData* pColumnInfoData, uint32_t currentRow, float* v) {
static FORCE_INLINE void colDataAppendFloat(SColumnInfoData* pColumnInfoData, uint32_t currentRow, float* v) {
ASSERT(pColumnInfoData->info.type == TSDB_DATA_TYPE_FLOAT);
char* p = pColumnInfoData->pData + pColumnInfoData->info.bytes * currentRow;
*(float*)p = *(float*)v;
}
static FORCE_INLINE int32_t colDataAppendDouble(SColumnInfoData* pColumnInfoData, uint32_t currentRow, double* v) {
static FORCE_INLINE void colDataAppendDouble(SColumnInfoData* pColumnInfoData, uint32_t currentRow, double* v) {
ASSERT(pColumnInfoData->info.type == TSDB_DATA_TYPE_DOUBLE);
char* p = pColumnInfoData->pData + pColumnInfoData->info.bytes * currentRow;
*(double*)p = *(double*)v;
......
......@@ -17,6 +17,7 @@
#include "tdatablock.h"
#include "tcompare.h"
#include "tglobal.h"
#include "tlog.h"
int32_t taosGetFqdnPortFromEp(const char* ep, SEp* pEp) {
pEp->port = 0;
......@@ -424,7 +425,7 @@ int32_t blockDataSplitRows(SSDataBlock* pBlock, bool hasVarCol, int32_t startInd
}
size += sizeof(pColInfoData->varmeta.offset[0]);
} else {
} else { // this block is unreached, because hasVarCol = true
size += pColInfoData->info.bytes;
if (((j - startIndex) & 0x07) == 0) {
......@@ -665,23 +666,13 @@ int32_t dataBlockCompar(const void* p1, const void* p2, const void* param) {
void* left1 = colDataGetData(pColInfoData, left);
void* right1 = colDataGetData(pColInfoData, right);
switch (pColInfoData->info.type) {
case TSDB_DATA_TYPE_INT: {
int32_t leftx = *(int32_t*)left1;
int32_t rightx = *(int32_t*)right1;
__compar_fn_t fn = getKeyComparFunc(pColInfoData->info.type, pOrder->order);
if (leftx == rightx) {
break;
} else {
if (pOrder->order == TSDB_ORDER_ASC) {
return (leftx < rightx) ? -1 : 1;
} else {
return (leftx < rightx) ? 1 : -1;
}
}
}
default:
assert(0);
int ret = fn(left1, right1);
if (ret == 0) {
continue;
} else {
return ret;
}
}
......@@ -834,17 +825,14 @@ static void copyBackToBlock(SSDataBlock* pDataBlock, SColumnInfoData* pCols) {
if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) {
taosMemoryFreeClear(pColInfoData->varmeta.offset);
pColInfoData->varmeta = pCols[i].varmeta;
} else {
taosMemoryFreeClear(pColInfoData->nullbitmap);
pColInfoData->nullbitmap = pCols[i].nullbitmap;
}
taosMemoryFreeClear(pColInfoData->pData);
pColInfoData->pData = pCols[i].pData;
}
taosMemoryFreeClear(pCols);
colDataDestroy(pCols);
}
static int32_t* createTupleIndex(size_t rows) {
......@@ -862,33 +850,6 @@ static int32_t* createTupleIndex(size_t rows) {
static void destroyTupleIndex(int32_t* index) { taosMemoryFreeClear(index); }
static __compar_fn_t getComparFn(int32_t type, int32_t order) {
switch (type) {
case TSDB_DATA_TYPE_TINYINT:
return order == TSDB_ORDER_ASC ? compareInt8Val : compareInt8ValDesc;
case TSDB_DATA_TYPE_SMALLINT:
return order == TSDB_ORDER_ASC ? compareInt16Val : compareInt16ValDesc;
case TSDB_DATA_TYPE_INT:
return order == TSDB_ORDER_ASC ? compareInt32Val : compareInt32ValDesc;
case TSDB_DATA_TYPE_BIGINT:
return order == TSDB_ORDER_ASC ? compareInt64Val : compareInt64ValDesc;
case TSDB_DATA_TYPE_FLOAT:
return order == TSDB_ORDER_ASC ? compareFloatVal : compareFloatValDesc;
case TSDB_DATA_TYPE_DOUBLE:
return order == TSDB_ORDER_ASC ? compareDoubleVal : compareDoubleValDesc;
case TSDB_DATA_TYPE_UTINYINT:
return order == TSDB_ORDER_ASC ? compareUint8Val : compareUint8ValDesc;
case TSDB_DATA_TYPE_USMALLINT:
return order == TSDB_ORDER_ASC ? compareUint16Val : compareUint16ValDesc;
case TSDB_DATA_TYPE_UINT:
return order == TSDB_ORDER_ASC ? compareUint32Val : compareUint32ValDesc;
case TSDB_DATA_TYPE_UBIGINT:
return order == TSDB_ORDER_ASC ? compareUint64Val : compareUint64ValDesc;
default:
return order == TSDB_ORDER_ASC ? compareInt32Val : compareInt32ValDesc;
}
}
int32_t blockDataSort(SSDataBlock* pDataBlock, SArray* pOrderInfo) {
ASSERT(pDataBlock != NULL && pOrderInfo != NULL);
if (pDataBlock->info.rows <= 1) {
......@@ -922,11 +883,11 @@ int32_t blockDataSort(SSDataBlock* pDataBlock, SArray* pOrderInfo) {
int64_t p0 = taosGetTimestampUs();
__compar_fn_t fn = getComparFn(pColInfoData->info.type, pOrder->order);
__compar_fn_t fn = getKeyComparFunc(pColInfoData->info.type, pOrder->order);
qsort(pColInfoData->pData, pDataBlock->info.rows, pColInfoData->info.bytes, fn);
int64_t p1 = taosGetTimestampUs();
printf("sort:%" PRId64 ", rows:%d\n", p1 - p0, pDataBlock->info.rows);
uDebug("blockDataSort easy cost:%" PRId64 ", rows:%d\n", p1 - p0, pDataBlock->info.rows);
return TSDB_CODE_SUCCESS;
} else { // var data type
......@@ -955,6 +916,7 @@ int32_t blockDataSort(SSDataBlock* pDataBlock, SArray* pOrderInfo) {
SColumnInfoData* pCols = createHelpColInfoData(pDataBlock);
if (pCols == NULL) {
destroyTupleIndex(index);
terrno = TSDB_CODE_OUT_OF_MEMORY;
return terrno;
}
......@@ -963,6 +925,7 @@ int32_t blockDataSort(SSDataBlock* pDataBlock, SArray* pOrderInfo) {
int32_t code = blockDataAssign(pCols, pDataBlock, index);
if (code != TSDB_CODE_SUCCESS) {
destroyTupleIndex(index);
terrno = code;
return code;
}
......@@ -972,7 +935,7 @@ int32_t blockDataSort(SSDataBlock* pDataBlock, SArray* pOrderInfo) {
copyBackToBlock(pDataBlock, pCols);
int64_t p4 = taosGetTimestampUs();
printf("sort:%" PRId64 ", create:%" PRId64 ", assign:%" PRId64 ", copyback:%" PRId64 ", rows:%d\n", p1 - p0, p2 - p1,
uDebug("blockDataSort complex sort:%" PRId64 ", create:%" PRId64 ", assign:%" PRId64 ", copyback:%" PRId64 ", rows:%d\n", p1 - p0, p2 - p1,
p3 - p2, p4 - p3, rows);
destroyTupleIndex(index);
......@@ -1234,6 +1197,7 @@ void colDataDestroy(SColumnInfoData* pColData) {
}
taosMemoryFree(pColData->pData);
taosMemoryFree(pColData);
}
int32_t tEncodeDataBlock(void** buf, const SSDataBlock* pBlock) {
......
......@@ -2571,6 +2571,7 @@ int32_t tDeserializeSCreateVnodeReq(void *buf, int32_t bufLen, SCreateVnodeReq *
int32_t tFreeSCreateVnodeReq(SCreateVnodeReq *pReq) {
taosArrayDestroy(pReq->pRetensions);
pReq->pRetensions = NULL;
return 0;
}
int32_t tSerializeSDropVnodeReq(void *buf, int32_t bufLen, SDropVnodeReq *pReq) {
......
......@@ -95,6 +95,7 @@ int32_t dmProcessStatusRsp(SDnodeMgmt *pMgmt, SNodeMsg *pMsg) {
}
pMgmt->statusSent = 0;
return TSDB_CODE_SUCCESS;
}
int32_t dmProcessAuthRsp(SDnodeMgmt *pMgmt, SNodeMsg *pMsg) {
......
......@@ -50,6 +50,7 @@ static void *dmThreadRoutine(void *param) {
lastMonitorTime = curTime;
}
}
return TSDB_CODE_SUCCESS;
}
static void dmProcessQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) {
......
......@@ -333,6 +333,7 @@ int32_t dndSendReqToDnode(SMgmtWrapper *pWrapper, const SEpSet *pEpSet, SRpcMsg
taosMsleep(1);
}
}
return TSDB_CODE_SUCCESS;
}
int32_t dndSendReqToMnode(SMgmtWrapper *pWrapper, SRpcMsg *pReq) {
......
......@@ -630,6 +630,7 @@ static int32_t mndProcessConfigDnodeReq(SNodeMsg *pReq) {
static int32_t mndProcessConfigDnodeRsp(SNodeMsg *pRsp) {
mInfo("app:%p config rsp from dnode", pRsp->rpcMsg.ahandle);
return TSDB_CODE_SUCCESS;
}
static int32_t mndGetConfigMeta(SNodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta) {
......
......@@ -331,6 +331,7 @@ int32_t tsdbDestroySmaState(SSmaStat *pSmaStat) {
}
taosHashCleanup(pSmaStat->smaStatItems);
}
return TSDB_CODE_SUCCESS;
}
static int32_t tsdbCheckAndInitSmaEnv(STsdb *pTsdb, int8_t smaType) {
......@@ -433,6 +434,7 @@ static int32_t tsdbSetExpiredWindow(STsdb *pTsdb, SHashObj *pItemsHash, int64_t
return TSDB_CODE_FAILED;
}
tsdbDebug("vgId:%d smaIndex %" PRIi64 " tsKey %" PRIi64 " is put to hash", REPO_ID(pTsdb), indexUid, winSKey);
return TSDB_CODE_SUCCESS;
}
/**
......@@ -1051,6 +1053,7 @@ static int32_t tsdbDropTSmaDataImpl(STsdb *pTsdb, int64_t indexUid) {
}
// clear sma data files
// TODO:
return TSDB_CODE_SUCCESS;
}
static int32_t tsdbSetRSmaDataFile(STSmaWriteH *pSmaH, int32_t fid) {
......@@ -1173,6 +1176,7 @@ static int32_t tsdbInitTSmaReadH(STSmaReadH *pSmaH, STsdb *pTsdb, int64_t interv
pSmaH->interval = tsdbGetIntervalByPrecision(interval, intervalUnit, REPO_CFG(pTsdb)->precision, true);
pSmaH->storageLevel = tsdbGetSmaStorageLevel(interval, intervalUnit);
pSmaH->days = tsdbGetTSmaDays(pTsdb, pSmaH->interval, pSmaH->storageLevel);
return TSDB_CODE_SUCCESS;
}
/**
......@@ -1192,6 +1196,7 @@ static int32_t tsdbInitTSmaFile(STSmaReadH *pSmaH, int64_t indexUid, TSKEY skey)
pSmaH->dFile.path = strdup(tSmaFile);
pSmaH->smaFsIter.iter = 0;
pSmaH->smaFsIter.fid = fid;
return TSDB_CODE_SUCCESS;
}
/**
......
......@@ -2532,7 +2532,7 @@ _return:
}
int32_t catalogUpdateVgEpSet(SCatalog* pCtg, const char* dbFName, int32_t vgId, SEpSet *epSet) {
return 0;
}
int32_t catalogRemoveTableMeta(SCatalog* pCtg, const SName* pTableName) {
......@@ -2599,7 +2599,7 @@ _return:
}
int32_t catalogGetIndexMeta(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, const SName* pTableName, const char *pIndexName, SIndexMeta** pIndexMeta) {
return 0;
}
int32_t catalogGetTableMeta(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, const SName* pTableName, STableMeta** pTableMeta) {
......
......@@ -128,8 +128,8 @@ static FORCE_INLINE char* getPosInResultPage(struct STaskAttr* pQueryAttr, SFile
int32_t offset) {
assert(rowOffset >= 0 && pQueryAttr != NULL);
// int32_t numOfRows = (int32_t)getRowNumForMultioutput(pQueryAttr, pQueryAttr->topBotQuery, pQueryAttr->stableQuery);
// return ((char *)page->data) + rowOffset + offset * numOfRows;
// int32_t numOfRows = (int32_t)getRowNumForMultioutput(pQueryAttr, pQueryAttr->topBotQuery, pQueryAttr->stableQuery);
return ((char *)page->data);
}
static FORCE_INLINE char* getPosInResultPage_rv(SFilePage* page, int32_t rowOffset, int32_t offset) {
......
......@@ -236,6 +236,7 @@ static int32_t destroyDataSinker(SDataSinkHandle* pHandle) {
}
taosCloseQueue(pDispatcher->pDataBlocks);
taosThreadMutexDestroy(&pDispatcher->mutex);
return TSDB_CODE_SUCCESS;
}
int32_t createDataDispatcher(SDataSinkManager* pManager, const SDataSinkNode* pDataSink, DataSinkHandle* pHandle) {
......
......@@ -359,6 +359,10 @@ SSDataBlock* createOutputBuf_rv1(SDataBlockDescNode* pNode) {
idata.info.precision = pDescNode->dataType.precision;
taosArrayPush(pBlock->pDataBlock, &idata);
if (IS_VAR_DATA_TYPE(idata.info.type)) {
pBlock->info.hasVarCol = true;
}
}
return pBlock;
......@@ -4924,6 +4928,7 @@ static SSDataBlock* doBlockInfoScan(SOperatorInfo* pOperator, bool* newgroup) {
pOperator->status = OP_EXEC_DONE;
return pBlock;
#endif
return TSDB_CODE_SUCCESS;
}
static void doClearBufferedBlocks(SStreamBlockScanInfo* pInfo) {
......@@ -5000,6 +5005,7 @@ int32_t loadRemoteDataCallback(void* param, const SDataBuf* pMsg, int32_t code)
pSourceDataInfo->status = EX_SOURCE_DATA_READY;
tsem_post(&pSourceDataInfo->pEx->ready);
return TSDB_CODE_SUCCESS;
}
static void destroySendMsgInfo(SMsgSendInfo* pMsgBody) {
......@@ -5673,6 +5679,7 @@ static int32_t loadSysTableContentCb(void* param, const SDataBuf* pMsg, int32_t
}
tsem_post(&pScanResInfo->ready);
return TSDB_CODE_SUCCESS;
}
static SSDataBlock* doFilterResult(SSysTableScanInfo* pInfo) {
......@@ -6484,6 +6491,7 @@ static SSDataBlock* doSort(SOperatorInfo* pOperator, bool* newgroup) {
tsortAddSource(pInfo->pSortHandle, ps);
int32_t code = tsortOpen(pInfo->pSortHandle);
taosMemoryFreeClear(ps);
if (code != TSDB_CODE_SUCCESS) {
longjmp(pTaskInfo->env, terrno);
}
......@@ -6496,15 +6504,18 @@ SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSDataBlock* pR
SExecTaskInfo* pTaskInfo) {
SSortOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SSortOperatorInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) {
int32_t rowSize = pResBlock->info.rowSize;
if (pInfo == NULL || pOperator == NULL || rowSize > 100 * 1024 * 1024) {
taosMemoryFreeClear(pInfo);
taosMemoryFreeClear(pOperator);
terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
return NULL;
}
pInfo->sortBufSize = 1024 * 16; // 1MB, TODO dynamic set the available sort buffer
pInfo->bufPageSize = 1024;
pInfo->bufPageSize = rowSize < 1024 ? 1024 : rowSize;
pInfo->sortBufSize = pInfo->bufPageSize * 16; // 1MB, TODO dynamic set the available sort buffer
pInfo->numOfRowsInRes = 1024;
pInfo->pDataBlock = pResBlock;
pInfo->pSortInfo = pSortInfo;
......@@ -7590,6 +7601,7 @@ static int32_t initAggInfo(SOptrBasicInfo* pBasicInfo, SAggSupporter* pAggSup, S
pBasicInfo->capacity = numOfRows;
doInitAggInfoSup(pAggSup, pBasicInfo->pCtx, numOfCols, pkey);
return TSDB_CODE_SUCCESS;
}
static STableQueryInfo* initTableQueryInfo(const STableGroupInfo* pTableGroupInfo) {
......@@ -8389,6 +8401,7 @@ static SSDataBlock* doTagScan(SOperatorInfo* pOperator, bool* newgroup) {
return (pRes->info.rows == 0)? NULL:pInfo->pRes;
#endif
return TSDB_CODE_SUCCESS;
}
SOperatorInfo* createTagScanOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SExprInfo* pExpr, int32_t numOfOutput) {
......@@ -8906,6 +8919,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
return createMultiTableAggOperatorInfo(op, pPhyNode->pTargets, pTaskInfo, pTableGroupInfo);
}
}*/
return NULL;
}
static tsdbReaderT createDataReaderImpl(STableScanPhysiNode* pTableScanNode, STableGroupInfo* pGroupInfo,
......@@ -9217,6 +9231,7 @@ int32_t createQueryFilter(char* data, uint16_t len, SFilterInfo** pFilters) {
// tExprTreeDestroy(expr, NULL);
// return ret;
return TSDB_CODE_SUCCESS;
}
// int32_t doCreateFilterInfo(SColumnInfo* pCols, int32_t numOfCols, int32_t numOfFilterCols, SSingleColumnFilterInfo**
......
......@@ -409,6 +409,7 @@ char* tHashGet(SLHashObj* pHashObj, const void *key, size_t keyLen) {
int32_t tHashRemove(SLHashObj* pHashObj, const void *key, size_t keyLen) {
// todo
return TSDB_CODE_SUCCESS;
}
void tHashPrint(const SLHashObj* pHashObj, int32_t type) {
......
......@@ -258,6 +258,7 @@ void *tSimpleHashGet(SSHashObj *pHashObj, const void *key) {
int32_t tSimpleHashRemove(SSHashObj *pHashObj, const void *key) {
// todo
return TSDB_CODE_SUCCESS;
}
void tSimpleHashClear(SSHashObj *pHashObj) {
......
......@@ -22,6 +22,7 @@
#include "tpagedbuf.h"
#include "tsort.h"
#include "tutil.h"
#include "tcompare.h"
struct STupleHandle {
SSDataBlock* pBlock;
......@@ -123,6 +124,7 @@ void tsortDestroySortHandle(SSortHandle* pSortHandle) {
int32_t tsortAddSource(SSortHandle* pSortHandle, void* pSource) {
taosArrayPush(pSortHandle->pOrderedSource, &pSource);
return TSDB_CODE_SUCCESS;
}
static int32_t doAddNewExternalMemSource(SDiskbasedBuf *pBuf, SArray* pAllSources, SSDataBlock* pBlock, int32_t* sourceId) {
......@@ -164,7 +166,7 @@ static int32_t doAddToBuf(SSDataBlock* pDataBlock, SSortHandle* pHandle) {
}
int32_t pageId = -1;
SFilePage* pPage = getNewBufPage(pHandle->pBuf, pHandle->sourceId, &pageId);
void* pPage = getNewBufPage(pHandle->pBuf, pHandle->sourceId, &pageId);
if (pPage == NULL) {
return terrno;
}
......@@ -172,7 +174,7 @@ static int32_t doAddToBuf(SSDataBlock* pDataBlock, SSortHandle* pHandle) {
int32_t size = blockDataGetSize(p) + sizeof(int32_t) + p->info.numOfCols * sizeof(int32_t);
assert(size <= getBufPageSize(pHandle->pBuf));
blockDataToBuf(pPage->data, p);
blockDataToBuf(pPage, p);
setBufPageDirty(pPage, true);
releaseBufPage(pHandle->pBuf, pPage);
......@@ -184,10 +186,7 @@ static int32_t doAddToBuf(SSDataBlock* pDataBlock, SSortHandle* pHandle) {
blockDataCleanup(pDataBlock);
SSDataBlock* pBlock = createOneDataBlock(pDataBlock);
int32_t code = doAddNewExternalMemSource(pHandle->pBuf, pHandle->pOrderedSource, pBlock, &pHandle->sourceId);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
return doAddNewExternalMemSource(pHandle->pBuf, pHandle->pOrderedSource, pBlock, &pHandle->sourceId);
}
static int32_t sortComparInit(SMsortComparParam* cmpParam, SArray* pSources, int32_t startIndex, int32_t endIndex, SSortHandle* pHandle) {
......@@ -201,8 +200,8 @@ static int32_t sortComparInit(SMsortComparParam* cmpParam, SArray* pSources, int
SExternalMemSource* pSource = cmpParam->pSources[i];
SPageInfo* pPgInfo = *(SPageInfo**)taosArrayGet(pSource->pageIdList, pSource->pageIndex);
SFilePage* pPage = getBufPage(pHandle->pBuf, getPageId(pPgInfo));
code = blockDataFromBuf(pSource->src.pBlock, pPage->data);
void* pPage = getBufPage(pHandle->pBuf, getPageId(pPgInfo));
code = blockDataFromBuf(pSource->src.pBlock, pPage);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
......@@ -236,6 +235,7 @@ static int32_t sortComparClearup(SMsortComparParam* cmpParam) {
}
cmpParam->numOfSources = 0;
return TSDB_CODE_SUCCESS;
}
static void appendOneRowToDataBlock(SSDataBlock *pBlock, const SSDataBlock* pSource, int32_t* rowIndex) {
......@@ -309,6 +309,7 @@ static int32_t adjustMergeTreeForNextTuple(SExternalMemSource *pSource, SMultiwa
printf("\nafter adjust:\t");
tMergeTreePrint(pTree);
#endif
return TSDB_CODE_SUCCESS;
}
static SSDataBlock* getSortedBlockData(SSortHandle* pHandle, SMsortComparParam* cmpParam, int32_t capacity) {
......@@ -392,23 +393,13 @@ int32_t msortComparFn(const void *pLeft, const void *pRight, void *param) {
void* left1 = colDataGetData(pLeftColInfoData, pLeftSource->src.rowIndex);
void* right1 = colDataGetData(pRightColInfoData, pRightSource->src.rowIndex);
switch(pLeftColInfoData->info.type) {
case TSDB_DATA_TYPE_INT: {
int32_t leftv = *(int32_t*)left1;
int32_t rightv = *(int32_t*)right1;
__compar_fn_t fn = getKeyComparFunc(pLeftColInfoData->info.type, pOrder->order);
if (leftv == rightv) {
break;
} else {
if (pOrder->order == TSDB_ORDER_ASC) {
return leftv < rightv? -1 : 1;
} else {
return leftv < rightv? 1 : -1;
}
}
}
default:
assert(0);
int ret = fn(left1, right1);
if (ret == 0) {
continue;
} else {
return ret;
}
}
return 0;
......@@ -424,13 +415,13 @@ static int32_t doInternalMergeSort(SSortHandle* pHandle) {
double sortPass = floorl(log2(numOfSources) / log2(pHandle->numOfPages));
pHandle->totalElapsed = taosGetTimestampUs() - pHandle->startTs;
qDebug("%s %d rounds mergesort required to complete the sort, first-round sorted data size:%"PRIzu", sort:%"PRId64", total elapsed:%"PRId64,
qDebug("%s %d rounds mergesort required to complete the sort, first-round sorted data size:%"PRIzu", sort elapsed:%"PRId64", total elapsed:%"PRId64,
pHandle->idStr, (int32_t) (sortPass + 1), getTotalBufSize(pHandle->pBuf), pHandle->sortElapsed, pHandle->totalElapsed);
size_t pgSize = pHandle->pageSize;
int32_t numOfRows = (pgSize - blockDataGetSerialMetaSize(pHandle->pDataBlock))/ blockDataGetSerialRowSize(pHandle->pDataBlock);
blockDataEnsureCapacity(pHandle->pDataBlock, numOfRows);
// blockDataEnsureCapacity(pHandle->pDataBlock, numOfRows); // useless, it is already enough
size_t numOfSorted = taosArrayGetSize(pHandle->pOrderedSource);
for(int32_t t = 0; t < sortPass; ++t) {
......@@ -469,7 +460,7 @@ static int32_t doInternalMergeSort(SSortHandle* pHandle) {
}
int32_t pageId = -1;
SFilePage* pPage = getNewBufPage(pHandle->pBuf, pHandle->sourceId, &pageId);
void* pPage = getNewBufPage(pHandle->pBuf, pHandle->sourceId, &pageId);
if (pPage == NULL) {
return terrno;
}
......@@ -477,7 +468,7 @@ static int32_t doInternalMergeSort(SSortHandle* pHandle) {
int32_t size = blockDataGetSize(pDataBlock) + sizeof(int32_t) + pDataBlock->info.numOfCols * sizeof(int32_t);
assert(size <= getBufPageSize(pHandle->pBuf));
blockDataToBuf(pPage->data, pDataBlock);
blockDataToBuf(pPage, pDataBlock);
setBufPageDirty(pPage, true);
releaseBufPage(pHandle->pBuf, pPage);
......@@ -526,7 +517,6 @@ static int32_t createInitialSortedMultiSources(SSortHandle* pHandle) {
if (pHandle->type == SORT_SINGLESOURCE_SORT) {
SGenericSource* source = taosArrayGetP(pHandle->pOrderedSource, 0);
taosArrayClear(pHandle->pOrderedSource);
while (1) {
SSDataBlock* pBlock = pHandle->fetchfp(source->param);
if (pBlock == NULL) {
......@@ -559,8 +549,13 @@ static int32_t createInitialSortedMultiSources(SSortHandle* pHandle) {
size_t size = blockDataGetSize(pHandle->pDataBlock);
// Perform the in-memory sort and then flush data in the buffer into disk.
int64_t p = taosGetTimestampUs();
blockDataSort(pHandle->pDataBlock, pHandle->pSortInfo);
int64_t el = taosGetTimestampUs() - p;
pHandle->sortElapsed += el;
// All sorted data can fit in memory, external memory sort is not needed. Return to directly
if (size <= sortBufSize) {
pHandle->cmpParam.numOfSources = 1;
......@@ -573,8 +568,6 @@ static int32_t createInitialSortedMultiSources(SSortHandle* pHandle) {
doAddToBuf(pHandle->pDataBlock, pHandle);
}
}
taosMemoryFreeClear(source);
}
return TSDB_CODE_SUCCESS;
......@@ -616,22 +609,22 @@ int32_t tsortOpen(SSortHandle* pHandle) {
return code;
}
code = tMergeTreeCreate(&pHandle->pMergeTree, pHandle->cmpParam.numOfSources, &pHandle->cmpParam, pHandle->comparFn);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
return tMergeTreeCreate(&pHandle->pMergeTree, pHandle->cmpParam.numOfSources, &pHandle->cmpParam, pHandle->comparFn);
}
int32_t tsortClose(SSortHandle* pHandle) {
// do nothing
return TSDB_CODE_SUCCESS;
}
int32_t tsortSetFetchRawDataFp(SSortHandle* pHandle, _sort_fetch_block_fn_t fp) {
pHandle->fetchfp = fp;
return TSDB_CODE_SUCCESS;
}
int32_t tsortSetComparFp(SSortHandle* pHandle, _sort_merge_compar_fn_t fp) {
pHandle->comparFn = fp;
return TSDB_CODE_SUCCESS;
}
STupleHandle* tsortNextTuple(SSortHandle* pHandle) {
......
......@@ -33,6 +33,7 @@
#include "tdef.h"
#include "trpc.h"
#include "tvariant.h"
#include "tcompare.h"
namespace {
typedef struct {
......@@ -125,24 +126,13 @@ int32_t docomp(const void* p1, const void* p2, void* param) {
void* left1 = colDataGetData(pLeftColInfoData, pLeftSource->src.rowIndex);
void* right1 = colDataGetData(pRightColInfoData, pRightSource->src.rowIndex);
__compar_fn_t fn = getKeyComparFunc(pLeftColInfoData->info.type, pOrder->order);
switch(pLeftColInfoData->info.type) {
case TSDB_DATA_TYPE_INT: {
int32_t leftv = *(int32_t*)left1;
int32_t rightv = *(int32_t*)right1;
if (leftv == rightv) {
break;
} else {
if (pOrder->order == TSDB_ORDER_ASC) {
return leftv < rightv? -1 : 1;
} else {
return leftv < rightv? 1 : -1;
}
}
}
default:
assert(0);
int ret = fn(left1, right1);
if (ret == 0) {
continue;
} else {
return ret;
}
}
......
......@@ -26,6 +26,7 @@ FstRegex *regexCreate(const char *str) {
memcpy(orig, str, sz);
regex->orig = orig;
return regex;
}
void regexSetup(FstRegex *regex, uint32_t size, const char *str) {
......
......@@ -132,7 +132,6 @@ void sclFreeRes(SHashObj *res) {
void sclFreeParam(SScalarParam *param) {
if (param->columnData != NULL) {
colDataDestroy(param->columnData);
taosMemoryFreeClear(param->columnData);
}
if (param->pHashFilter != NULL) {
......
......@@ -385,16 +385,19 @@ bool getTimePseudoFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv
int32_t qStartTsFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
ASSERT(inputNum == 1);
colDataAppendInt64(pOutput->columnData, pOutput->numOfRows, (int64_t *)colDataGetData(pInput->columnData, 0));
return TSDB_CODE_SUCCESS;
}
int32_t qEndTsFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
ASSERT(inputNum == 1);
colDataAppendInt64(pOutput->columnData, pOutput->numOfRows, (int64_t *)colDataGetData(pInput->columnData, 1));
return TSDB_CODE_SUCCESS;
}
int32_t winDurFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
ASSERT(inputNum == 1);
colDataAppendInt64(pOutput->columnData, pOutput->numOfRows, (int64_t *)colDataGetData(pInput->columnData, 2));
return TSDB_CODE_SUCCESS;
}
int32_t winStartTsFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
......
......@@ -476,7 +476,6 @@ static SColumnInfoData* doVectorConvert(SScalarParam* pInput, int32_t* doConvert
static void doReleaseVec(SColumnInfoData* pCol, int32_t type) {
if (type == VECTOR_DO_CONVERT) {
colDataDestroy(pCol);
taosMemoryFree(pCol);
}
}
......
......@@ -258,6 +258,7 @@ _return:
SCH_JOB_ELOG("invalid job status update, from %s to %s", jobTaskStatusStr(oriStatus), jobTaskStatusStr(newStatus));
SCH_ERR_RET(code);
return TSDB_CODE_SUCCESS;
}
int32_t schBuildTaskRalation(SSchJob *pJob, SHashObj *planToTask) {
......@@ -791,7 +792,7 @@ _return:
SCH_RET(schProcessOnJobFailure(pJob, code));
}
int32_t schProcessOnDataFetched(SSchJob *job) {
void schProcessOnDataFetched(SSchJob *job) {
atomic_val_compare_exchange_32(&job->remoteFetch, 1, 0);
tsem_post(&job->rspSem);
}
......@@ -1150,6 +1151,7 @@ int32_t schHandleReadyCallback(void *param, const SDataBuf *pMsg, int32_t code)
int32_t schHandleDropCallback(void *param, const SDataBuf *pMsg, int32_t code) {
SSchTaskCallbackParam *pParam = (SSchTaskCallbackParam *)param;
qDebug("QID:%" PRIx64 ",TID:%" PRIx64 " drop task rsp received, code:%x", pParam->queryId, pParam->taskId, code);
return TSDB_CODE_SUCCESS;
}
int32_t schHandleHbCallback(void *param, const SDataBuf *pMsg, int32_t code) {
......@@ -2029,7 +2031,7 @@ void schDropJobAllTasks(SSchJob *pJob) {
int32_t schCancelJob(SSchJob *pJob) {
// TODO
return TSDB_CODE_SUCCESS;
// TODO MOVE ALL TASKS FROM EXEC LIST TO FAIL LIST
}
......
......@@ -436,7 +436,6 @@ int32_t compareWStrPatternMatch(const void *pLeft, const void *pRight) {
int32_t compareWStrPatternNotMatch(const void *pLeft, const void *pRight) {
return compareWStrPatternMatch(pLeft, pRight) ? 0 : 1;
}
__compar_fn_t getComparFunc(int32_t type, int32_t optr) {
__compar_fn_t comparFn = NULL;
......@@ -568,53 +567,36 @@ __compar_fn_t getComparFunc(int32_t type, int32_t optr) {
}
__compar_fn_t getKeyComparFunc(int32_t keyType, int32_t order) {
__compar_fn_t comparFn = NULL;
switch (keyType) {
case TSDB_DATA_TYPE_TINYINT:
case TSDB_DATA_TYPE_BOOL:
comparFn = (order == TSDB_ORDER_ASC) ? compareInt8Val : compareInt8ValDesc;
break;
return (order == TSDB_ORDER_ASC) ? compareInt8Val : compareInt8ValDesc;
case TSDB_DATA_TYPE_SMALLINT:
comparFn = (order == TSDB_ORDER_ASC) ? compareInt16Val : compareInt16ValDesc;
break;
return (order == TSDB_ORDER_ASC) ? compareInt16Val : compareInt16ValDesc;
case TSDB_DATA_TYPE_INT:
comparFn = (order == TSDB_ORDER_ASC) ? compareInt32Val : compareInt32ValDesc;
break;
return (order == TSDB_ORDER_ASC) ? compareInt32Val : compareInt32ValDesc;
case TSDB_DATA_TYPE_BIGINT:
case TSDB_DATA_TYPE_TIMESTAMP:
comparFn = (order == TSDB_ORDER_ASC) ? compareInt64Val : compareInt64ValDesc;
break;
return (order == TSDB_ORDER_ASC) ? compareInt64Val : compareInt64ValDesc;
case TSDB_DATA_TYPE_FLOAT:
comparFn = (order == TSDB_ORDER_ASC) ? compareFloatVal : compareFloatValDesc;
break;
return (order == TSDB_ORDER_ASC) ? compareFloatVal : compareFloatValDesc;
case TSDB_DATA_TYPE_DOUBLE:
comparFn = (order == TSDB_ORDER_ASC) ? compareDoubleVal : compareDoubleValDesc;
break;
return (order == TSDB_ORDER_ASC) ? compareDoubleVal : compareDoubleValDesc;
case TSDB_DATA_TYPE_UTINYINT:
comparFn = (order == TSDB_ORDER_ASC) ? compareUint8Val : compareUint8ValDesc;
break;
return (order == TSDB_ORDER_ASC) ? compareUint8Val : compareUint8ValDesc;
case TSDB_DATA_TYPE_USMALLINT:
comparFn = (order == TSDB_ORDER_ASC) ? compareUint16Val : compareUint16ValDesc;
break;
return (order == TSDB_ORDER_ASC) ? compareUint16Val : compareUint16ValDesc;
case TSDB_DATA_TYPE_UINT:
comparFn = (order == TSDB_ORDER_ASC) ? compareUint32Val : compareUint32ValDesc;
break;
return (order == TSDB_ORDER_ASC) ? compareUint32Val : compareUint32ValDesc;
case TSDB_DATA_TYPE_UBIGINT:
comparFn = (order == TSDB_ORDER_ASC) ? compareUint64Val : compareUint64ValDesc;
break;
return (order == TSDB_ORDER_ASC) ? compareUint64Val : compareUint64ValDesc;
case TSDB_DATA_TYPE_BINARY:
comparFn = (order == TSDB_ORDER_ASC) ? compareLenPrefixedStr : compareLenPrefixedStrDesc;
break;
return (order == TSDB_ORDER_ASC) ? compareLenPrefixedStr : compareLenPrefixedStrDesc;
case TSDB_DATA_TYPE_NCHAR:
comparFn = (order == TSDB_ORDER_ASC) ? compareLenPrefixedWStr : compareLenPrefixedWStrDesc;
break;
return (order == TSDB_ORDER_ASC) ? compareLenPrefixedWStr : compareLenPrefixedWStrDesc;
default:
comparFn = (order == TSDB_ORDER_ASC) ? compareInt32Val : compareInt32ValDesc;
break;
return (order == TSDB_ORDER_ASC) ? compareInt32Val : compareInt32ValDesc;
}
return comparFn;
}
int32_t doCompare(const char *f1, const char *f2, int32_t type, size_t size) {
......
......@@ -350,8 +350,7 @@ static void lruListMoveToFront(SList* pList, SPageInfo* pi) {
}
static SPageInfo* getPageInfoFromPayload(void* page) {
int32_t offset = offsetof(SPageInfo, pData);
char* p = (char *)page - offset;
char* p = (char *)page - POINTER_BYTES;
SPageInfo* ppi = ((SPageInfo**)p)[0];
return ppi;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册