未验证 提交 6d70f488 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #18592 from taosdata/feature/3_liaohj

enh(query): optimize query perf.
......@@ -249,10 +249,11 @@ typedef struct SColumnInfoData {
typedef struct SQueryTableDataCond {
uint64_t suid;
int32_t order; // desc|asc order to iterate the data block
int32_t order; // desc|asc order to iterate the data block
int32_t numOfCols;
SColumnInfo* colList;
int32_t type; // data block load type:
int32_t* pSlotList; // the column output destation slot, and it may be null
int32_t type; // data block load type:
STimeWindow twindows;
int64_t startVersion;
int64_t endVersion;
......
......@@ -41,9 +41,9 @@ typedef struct SBlockOrderInfo {
BMCharPos(bm_, r_) |= (1u << (7u - BitPos(r_))); \
} while (0)
#define colDataSetNotNull_f(bm_, r_) \
do { \
BMCharPos(bm_, r_) &= ~(1u << (7u - BitPos(r_))); \
#define colDataClearNull_f(bm_, r_) \
do { \
BMCharPos(bm_, r_) &= ((char)(~(1u << (7u - BitPos(r_))))); \
} while (0)
#define colDataIsNull_var(pColumnInfoData, row) (pColumnInfoData->varmeta.offset[row] == -1)
......@@ -151,9 +151,6 @@ static FORCE_INLINE void colDataAppendNNULL(SColumnInfoData* pColumnInfoData, ui
for (int32_t i = start; i < start + nRows; ++i) {
colDataSetNull_f(pColumnInfoData->nullbitmap, i);
}
int32_t bytes = pColumnInfoData->info.bytes;
memset(pColumnInfoData->pData + start * bytes, 0, nRows * bytes);
}
pColumnInfoData->hasNull = true;
......@@ -234,9 +231,11 @@ int32_t blockDataSort_rv(SSDataBlock* pDataBlock, SArray* pOrderInfo, bool nullF
int32_t colInfoDataEnsureCapacity(SColumnInfoData* pColumn, uint32_t numOfRows, bool clearPayload);
int32_t blockDataEnsureCapacity(SSDataBlock* pDataBlock, uint32_t numOfRows);
int32_t blockDataEnsureCapacityNoClear(SSDataBlock* pDataBlock, uint32_t numOfRows);
void colInfoDataCleanup(SColumnInfoData* pColumn, uint32_t numOfRows);
void blockDataCleanup(SSDataBlock* pDataBlock);
void blockDataEmpty(SSDataBlock* pDataBlock);
size_t blockDataGetCapacityInRow(const SSDataBlock* pBlock, size_t pageSize);
......
......@@ -278,11 +278,9 @@ typedef struct {
#define IS_VALID_TINYINT(_t) ((_t) >= INT8_MIN && (_t) <= INT8_MAX)
#define IS_VALID_SMALLINT(_t) ((_t) >= INT16_MIN && (_t) <= INT16_MAX)
#define IS_VALID_INT(_t) ((_t) >= INT32_MIN && (_t) <= INT32_MAX)
#define IS_VALID_BIGINT(_t) ((_t) >= INT64_MIN && (_t) <= INT64_MAX)
#define IS_VALID_UTINYINT(_t) ((_t) >= 0 && (_t) <= UINT8_MAX)
#define IS_VALID_USMALLINT(_t) ((_t) >= 0 && (_t) <= UINT16_MAX)
#define IS_VALID_UINT(_t) ((_t) >= 0 && (_t) <= UINT32_MAX)
#define IS_VALID_UBIGINT(_t) ((_t) >= 0 && (_t) <= UINT64_MAX)
#define IS_VALID_FLOAT(_t) ((_t) >= -FLT_MAX && (_t) <= FLT_MAX)
#define IS_VALID_DOUBLE(_t) ((_t) >= -DBL_MAX && (_t) <= DBL_MAX)
......
......@@ -137,22 +137,22 @@ typedef struct SqlFunctionCtx {
int16_t functionId; // function id
char *pOutput; // final result output buffer, point to sdata->data
int32_t numOfParams;
SFunctParam *param; // input parameter, e.g., top(k, 20), the number of results for top query is kept in param
SColumnInfoData *pTsOutput; // corresponding output buffer for timestamp of each result, e.g., top/bottom*/
int32_t offset;
struct SResultRowEntryInfo *resultInfo;
SSubsidiaryResInfo subsidiaries;
SPoint1 start;
SPoint1 end;
SFuncExecFuncs fpSet;
SScalarFuncExecFuncs sfp;
struct SExprInfo *pExpr;
struct SSDataBlock *pSrcBlock;
struct SSDataBlock *pDstBlock; // used by indefinite rows function to set selectivity
SSerializeDataHandle saveHandle;
bool isStream;
char udfName[TSDB_FUNC_NAME_LEN];
// input parameter, e.g., top(k, 20), the number of results of top query is kept in param
SFunctParam *param;
// corresponding output buffer for timestamp of each result, e.g., diff/csum
SColumnInfoData *pTsOutput;
int32_t offset;
SResultRowEntryInfo *resultInfo;
SSubsidiaryResInfo subsidiaries;
SPoint1 start;
SPoint1 end;
SFuncExecFuncs fpSet;
SScalarFuncExecFuncs sfp;
struct SExprInfo *pExpr;
struct SSDataBlock *pSrcBlock;
struct SSDataBlock *pDstBlock; // used by indefinite rows function to set selectivity
SSerializeDataHandle saveHandle;
char udfName[TSDB_FUNC_NAME_LEN];
} SqlFunctionCtx;
typedef struct tExprNode {
......@@ -183,7 +183,6 @@ struct SScalarParam {
};
void cleanupResultRowEntry(struct SResultRowEntryInfo *pCell);
//int32_t getNumOfResult(SqlFunctionCtx *pCtx, int32_t num, SSDataBlock *pResBlock);
bool isRowEntryCompleted(struct SResultRowEntryInfo *pEntry);
bool isRowEntryInitialized(struct SResultRowEntryInfo *pEntry);
......@@ -195,32 +194,6 @@ typedef struct SPoint {
int32_t taosGetLinearInterpolationVal(SPoint *point, int32_t outputType, SPoint *point1, SPoint *point2,
int32_t inputType);
///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// udf api
/**
* create udfd proxy, called once in process that call doSetupUdf/callUdfxxx/doTeardownUdf
* @return error code
*/
int32_t udfcOpen();
/**
* destroy udfd proxy
* @return error code
*/
int32_t udfcClose();
/**
* start udfd that serves udf function invocation under dnode startDnodeId
* @param startDnodeId
* @return
*/
int32_t udfStartUdfd(int32_t startDnodeId);
/**
* stop udfd
* @return
*/
int32_t udfStopUdfd();
#ifdef __cplusplus
}
#endif
......
......@@ -85,6 +85,32 @@ int32_t callUdfScalarFunc(char *udfName, SScalarParam *input, int32_t numOfCols,
int32_t cleanUpUdfs();
///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// udf api
/**
* create udfd proxy, called once in process that call doSetupUdf/callUdfxxx/doTeardownUdf
* @return error code
*/
int32_t udfcOpen();
/**
* destroy udfd proxy
* @return error code
*/
int32_t udfcClose();
/**
* start udfd that serves udf function invocation under dnode startDnodeId
* @param startDnodeId
* @return
*/
int32_t udfStartUdfd(int32_t startDnodeId);
/**
* stop udfd
* @return
*/
int32_t udfStopUdfd();
#ifdef __cplusplus
}
#endif
......
......@@ -1137,14 +1137,15 @@ int32_t blockDataSort_rv(SSDataBlock* pDataBlock, SArray* pOrderInfo, bool nullF
}
void blockDataCleanup(SSDataBlock* pDataBlock) {
blockDataEmpty(pDataBlock);
SDataBlockInfo* pInfo = &pDataBlock->info;
pInfo->rows = 0;
pInfo->id.uid = 0;
pInfo->id.groupId = 0;
pInfo->window.ekey = 0;
pInfo->window.skey = 0;
}
void blockDataEmpty(SSDataBlock* pDataBlock) {
SDataBlockInfo* pInfo = &pDataBlock->info;
ASSERT(pInfo->rows <= pDataBlock->info.capacity);
if (pInfo->capacity == 0) {
return;
}
......@@ -1154,6 +1155,10 @@ void blockDataCleanup(SSDataBlock* pDataBlock) {
SColumnInfoData* p = taosArrayGet(pDataBlock->pDataBlock, i);
colInfoDataCleanup(p, pInfo->capacity);
}
pInfo->rows = 0;
pInfo->window.ekey = 0;
pInfo->window.skey = 0;
}
// todo temporarily disable it
......@@ -1249,6 +1254,25 @@ int32_t blockDataEnsureCapacity(SSDataBlock* pDataBlock, uint32_t numOfRows) {
return TSDB_CODE_SUCCESS;
}
int32_t blockDataEnsureCapacityNoClear(SSDataBlock* pDataBlock, uint32_t numOfRows) {
int32_t code = 0;
if (numOfRows == 0 || numOfRows <= pDataBlock->info.capacity) {
return TSDB_CODE_SUCCESS;
}
size_t numOfCols = taosArrayGetSize(pDataBlock->pDataBlock);
for (int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoData* p = taosArrayGet(pDataBlock->pDataBlock, i);
code = doEnsureCapacity(p, &pDataBlock->info, numOfRows, false);
if (code) {
return code;
}
}
pDataBlock->info.capacity = numOfRows;
return TSDB_CODE_SUCCESS;
}
void blockDataFreeRes(SSDataBlock* pBlock) {
int32_t numOfOutput = taosArrayGetSize(pBlock->pDataBlock);
for (int32_t i = 0; i < numOfOutput; ++i) {
......@@ -1621,6 +1645,8 @@ static int32_t colDataMoveVarData(SColumnInfoData* pColInfoData, size_t start, s
static void colDataTrimFirstNRows(SColumnInfoData* pColInfoData, size_t n, size_t total) {
if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) {
pColInfoData->varmeta.length = colDataMoveVarData(pColInfoData, n, total);
// clear the offset value of the unused entries.
memset(&pColInfoData->varmeta.offset[total - n], 0, n);
} else {
int32_t bytes = pColInfoData->info.bytes;
......@@ -1635,7 +1661,7 @@ int32_t blockDataTrimFirstNRows(SSDataBlock* pBlock, size_t n) {
}
if (pBlock->info.rows <= n) {
blockDataCleanup(pBlock);
blockDataEmpty(pBlock);
} else {
size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
for (int32_t i = 0; i < numOfCols; ++i) {
......@@ -1652,12 +1678,22 @@ static void colDataKeepFirstNRows(SColumnInfoData* pColInfoData, size_t n, size_
if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) {
pColInfoData->varmeta.length = colDataMoveVarData(pColInfoData, 0, n);
memset(&pColInfoData->varmeta.offset[n], 0, total - n);
} else { // reset the bitmap value
/*int32_t stopIndex = BitmapLen(n) * 8;
for(int32_t i = n; i < stopIndex; ++i) {
colDataClearNull_f(pColInfoData->nullbitmap, i);
}
int32_t remain = BitmapLen(total) - BitmapLen(n);
if (remain > 0) {
memset(pColInfoData->nullbitmap+BitmapLen(n), 0, remain);
}*/
}
}
int32_t blockDataKeepFirstNRows(SSDataBlock* pBlock, size_t n) {
if (n == 0) {
blockDataCleanup(pBlock);
blockDataEmpty(pBlock);
return TSDB_CODE_SUCCESS;
}
......
......@@ -39,7 +39,7 @@
#include "sync.h"
#include "wal.h"
#include "libs/function/function.h"
#include "libs/function/tudf.h"
#ifdef __cplusplus
extern "C" {
#endif
......
......@@ -169,20 +169,19 @@ typedef struct STsdbReader STsdbReader;
int32_t tsdbSetTableList(STsdbReader *pReader, const void *pTableList, int32_t num);
int32_t tsdbReaderOpen(SVnode *pVnode, SQueryTableDataCond *pCond, void *pTableList, int32_t numOfTables,
STsdbReader **ppReader, const char *idstr);
void tsdbReaderClose(STsdbReader *pReader);
bool tsdbNextDataBlock(STsdbReader *pReader);
bool tsdbTableNextDataBlock(STsdbReader *pReader, uint64_t uid);
void tsdbRetrieveDataBlockInfo(const STsdbReader *pReader, int32_t *rows, uint64_t *uid, STimeWindow *pWindow);
int32_t tsdbRetrieveDatablockSMA(STsdbReader *pReader, SColumnDataAgg ***pBlockStatis, bool *allHave);
SArray *tsdbRetrieveDataBlock(STsdbReader *pTsdbReadHandle, SArray *pColumnIdList);
int32_t tsdbReaderReset(STsdbReader *pReader, SQueryTableDataCond *pCond);
int32_t tsdbGetFileBlocksDistInfo(STsdbReader *pReader, STableBlockDistInfo *pTableBlockInfo);
int64_t tsdbGetNumOfRowsInMemTable(STsdbReader *pHandle);
void *tsdbGetIdx(SMeta *pMeta);
void *tsdbGetIvtIdx(SMeta *pMeta);
uint64_t getReaderMaxVersion(STsdbReader *pReader);
SSDataBlock *pResBlock, STsdbReader **ppReader, const char *idstr);
void tsdbReaderClose(STsdbReader *pReader);
bool tsdbNextDataBlock(STsdbReader *pReader);
void tsdbRetrieveDataBlockInfo(const STsdbReader *pReader, int32_t *rows, uint64_t *uid, STimeWindow *pWindow);
int32_t tsdbRetrieveDatablockSMA(STsdbReader *pReader, SColumnDataAgg ***pBlockSMA, bool *allHave);
SSDataBlock *tsdbRetrieveDataBlock(STsdbReader *pTsdbReadHandle, SArray *pColumnIdList);
int32_t tsdbReaderReset(STsdbReader *pReader, SQueryTableDataCond *pCond);
int32_t tsdbGetFileBlocksDistInfo(STsdbReader *pReader, STableBlockDistInfo *pTableBlockInfo);
int64_t tsdbGetNumOfRowsInMemTable(STsdbReader *pHandle);
void *tsdbGetIdx(SMeta *pMeta);
void *tsdbGetIvtIdx(SMeta *pMeta);
uint64_t getReaderMaxVersion(STsdbReader *pReader);
int32_t tsdbCacherowsReaderOpen(void *pVnode, int32_t type, void *pTableIdList, int32_t numOfTables, int32_t numOfCols,
uint64_t suid, void **pReader);
......
......@@ -198,7 +198,7 @@ static SSDataBlock* doLoadRemoteDataImpl(SOperatorInfo* pOperator) {
}
}
static SSDataBlock* doLoadRemoteData(SOperatorInfo* pOperator) {
static SSDataBlock* loadRemoteData(SOperatorInfo* pOperator) {
SExchangeInfo* pExchangeInfo = pOperator->info;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
......@@ -307,7 +307,7 @@ SOperatorInfo* createExchangeOperatorInfo(void* pTransporter, SExchangePhysiNode
pOperator->exprSupp.numOfExprs = taosArrayGetSize(pInfo->pDummyBlock->pDataBlock);
pOperator->fpSet =
createOperatorFpSet(prepareLoadRemoteData, doLoadRemoteData, NULL, destroyExchangeOperatorInfo, NULL);
createOperatorFpSet(prepareLoadRemoteData, loadRemoteData, NULL, destroyExchangeOperatorInfo, NULL);
return pOperator;
_error:
......@@ -570,13 +570,10 @@ int32_t prepareConcurrentlyLoad(SOperatorInfo* pOperator) {
pOperator->status = OP_RES_TO_RETURN;
pOperator->cost.openCost = taosGetTimestampUs() - startTs;
tsem_wait(&pExchangeInfo->ready);
if (isTaskKilled(pTaskInfo)) {
longjmp(pTaskInfo->env, pTaskInfo->code);
}
tsem_post(&pExchangeInfo->ready);
return TSDB_CODE_SUCCESS;
}
......
......@@ -1241,6 +1241,7 @@ int32_t extractColMatchInfo(SNodeList* pNodeList, SDataBlockDescNode* pOutputNod
}
}
// set the output flag for each column in SColMatchInfo, according to the
*numOfOutputCols = 0;
int32_t num = LIST_LENGTH(pOutputNodeList->pSlots);
for (int32_t i = 0; i < num; ++i) {
......@@ -1537,8 +1538,6 @@ SqlFunctionCtx* createSqlFunctionCtx(SExprInfo* pExprInfo, int32_t numOfOutput,
pCtx->start.key = INT64_MIN;
pCtx->end.key = INT64_MIN;
pCtx->numOfParams = pExpr->base.numOfParams;
pCtx->isStream = false;
pCtx->param = pFunct->pParam;
pCtx->saveHandle.currentPage = -1;
}
......@@ -1602,20 +1601,22 @@ SColumn extractColumnFromColumnNode(SColumnNode* pColNode) {
int32_t initQueryTableDataCond(SQueryTableDataCond* pCond, const STableScanPhysiNode* pTableScanNode) {
pCond->order = pTableScanNode->scanSeq[0] > 0 ? TSDB_ORDER_ASC : TSDB_ORDER_DESC;
pCond->numOfCols = LIST_LENGTH(pTableScanNode->scan.pScanCols);
pCond->colList = taosMemoryCalloc(pCond->numOfCols, sizeof(SColumnInfo));
if (pCond->colList == NULL) {
pCond->pSlotList = taosMemoryMalloc(sizeof(int32_t)*pCond->numOfCols);
if (pCond->colList == NULL || pCond->pSlotList == NULL) {
terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
taosMemoryFreeClear(pCond->colList);
taosMemoryFreeClear(pCond->pSlotList);
return terrno;
}
// pCond->twindow = pTableScanNode->scanRange;
// TODO: get it from stable scan node
pCond->twindows = pTableScanNode->scanRange;
pCond->suid = pTableScanNode->scan.suid;
pCond->type = TIMEWINDOW_RANGE_CONTAINED;
pCond->startVersion = -1;
pCond->endVersion = -1;
// pCond->type = pTableScanNode->scanFlag;
int32_t j = 0;
for (int32_t i = 0; i < pCond->numOfCols; ++i) {
......@@ -1628,6 +1629,8 @@ int32_t initQueryTableDataCond(SQueryTableDataCond* pCond, const STableScanPhysi
pCond->colList[j].type = pColNode->node.resType.type;
pCond->colList[j].bytes = pColNode->node.resType.bytes;
pCond->colList[j].colId = pColNode->colId;
pCond->pSlotList[j] = pNode->slotId;
j += 1;
}
......@@ -1635,7 +1638,10 @@ int32_t initQueryTableDataCond(SQueryTableDataCond* pCond, const STableScanPhysi
return TSDB_CODE_SUCCESS;
}
void cleanupQueryTableDataCond(SQueryTableDataCond* pCond) { taosMemoryFreeClear(pCond->colList); }
void cleanupQueryTableDataCond(SQueryTableDataCond* pCond) {
taosMemoryFreeClear(pCond->colList);
taosMemoryFreeClear(pCond->pSlotList);
}
int32_t convertFillType(int32_t mode) {
int32_t type = TSDB_FILL_NONE;
......@@ -1965,7 +1971,7 @@ int32_t buildGroupIdMapForAllTables(STableListInfo* pTableListInfo, SReadHandle*
int32_t createScanTableListInfo(SScanPhysiNode* pScanNode, SNodeList* pGroupTags, bool groupSort, SReadHandle* pHandle,
STableListInfo* pTableListInfo, SNode* pTagCond, SNode* pTagIndexCond,
struct SExecTaskInfo* pTaskInfo) {
SExecTaskInfo* pTaskInfo) {
int64_t st = taosGetTimestampUs();
const char* idStr = GET_TASKID(pTaskInfo);
......
......@@ -971,21 +971,27 @@ int32_t initQueryTableDataCondForTmq(SQueryTableDataCond* pCond, SSnapContext* s
pCond->order = TSDB_ORDER_ASC;
pCond->numOfCols = pMtInfo->schema->nCols;
pCond->colList = taosMemoryCalloc(pCond->numOfCols, sizeof(SColumnInfo));
if (pCond->colList == NULL) {
pCond->pSlotList = taosMemoryMalloc(sizeof(int32_t) * pCond->numOfCols);
if (pCond->colList == NULL || pCond->pSlotList == NULL) {
taosMemoryFreeClear(pCond->colList);
taosMemoryFreeClear(pCond->pSlotList);
terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
return terrno;
}
pCond->twindows = (STimeWindow){.skey = INT64_MIN, .ekey = INT64_MAX};
pCond->twindows = TSWINDOW_INITIALIZER;
pCond->suid = pMtInfo->suid;
pCond->type = TIMEWINDOW_RANGE_CONTAINED;
pCond->startVersion = -1;
pCond->endVersion = sContext->snapVersion;
for (int32_t i = 0; i < pCond->numOfCols; ++i) {
pCond->colList[i].type = pMtInfo->schema->pSchema[i].type;
pCond->colList[i].bytes = pMtInfo->schema->pSchema[i].bytes;
pCond->colList[i].colId = pMtInfo->schema->pSchema[i].colId;
SColumnInfo* pColInfo = &pCond->colList[i];
pColInfo->type = pMtInfo->schema->pSchema[i].type;
pColInfo->bytes = pMtInfo->schema->pSchema[i].bytes;
pColInfo->colId = pMtInfo->schema->pSchema[i].colId;
pCond->pSlotList[i] = i;
}
return TSDB_CODE_SUCCESS;
......@@ -1078,7 +1084,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
int32_t num = tableListGetSize(pTaskInfo->pTableInfoList);
if (tsdbReaderOpen(pTableScanInfo->base.readHandle.vnode, &pTableScanInfo->base.cond, pList, num,
&pTableScanInfo->base.dataReader, NULL) < 0 ||
pTableScanInfo->pResBlock, &pTableScanInfo->base.dataReader, NULL) < 0 ||
pTableScanInfo->base.dataReader == NULL) {
ASSERT(0);
}
......@@ -1130,7 +1136,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
int32_t size = tableListGetSize(pTaskInfo->pTableInfoList);
ASSERT(size == 1);
tsdbReaderOpen(pInfo->vnode, &pTaskInfo->streamInfo.tableCond, pList, size, &pInfo->dataReader, NULL);
tsdbReaderOpen(pInfo->vnode, &pTaskInfo->streamInfo.tableCond, pList, size, NULL, &pInfo->dataReader, NULL);
cleanupQueryTableDataCond(&pTaskInfo->streamInfo.tableCond);
strcpy(pTaskInfo->streamInfo.tbName, mtInfo.tbName);
......
......@@ -1164,8 +1164,6 @@ void doBuildResultDatablock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SG
// T_LONG_JMP(pRuntimeEnv->env, TSDB_CODE_TSC_QUERY_CANCELLED);
// }
//
// tsdbRetrieveDataBlockInfo(pTsdbReadHandle, &blockInfo);
//
// if (pQueryAttr->limit.offset > blockInfo.rows) {
// pQueryAttr->limit.offset -= blockInfo.rows;
// pTableQueryInfo->lastKey = (QUERY_IS_ASC_QUERY(pQueryAttr)) ? blockInfo.window.ekey : blockInfo.window.skey;
......@@ -1641,7 +1639,7 @@ int32_t doInitAggInfoSup(SAggSupporter* pAggSup, SqlFunctionCtx* pCtx, int32_t n
pAggSup->currentPageId = -1;
pAggSup->resultRowSize = getResultRowSize(pCtx, numOfOutput);
pAggSup->keyBuf = taosMemoryCalloc(1, keyBufSize + POINTER_BYTES + sizeof(int64_t));
pAggSup->pResultRowHashTable = tSimpleHashInit(10, hashFn);
pAggSup->pResultRowHashTable = tSimpleHashInit(100, hashFn);
if (pAggSup->keyBuf == NULL || pAggSup->pResultRowHashTable == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
......
......@@ -223,10 +223,8 @@ static bool doFilterByBlockSMA(SFilterInfo* pFilterInfo, SColumnDataAgg** pColsA
}
static bool doLoadBlockSMA(STableScanBase* pTableScanInfo, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo) {
bool allColumnsHaveAgg = true;
SColumnDataAgg** pColAgg = NULL;
int32_t code = tsdbRetrieveDatablockSMA(pTableScanInfo->dataReader, &pColAgg, &allColumnsHaveAgg);
bool allColumnsHaveAgg = true;
int32_t code = tsdbRetrieveDatablockSMA(pTableScanInfo->dataReader, &pBlock->pBlockAgg, &allColumnsHaveAgg);
if (code != TSDB_CODE_SUCCESS) {
T_LONG_JMP(pTaskInfo->env, code);
}
......@@ -235,6 +233,7 @@ static bool doLoadBlockSMA(STableScanBase* pTableScanInfo, SSDataBlock* pBlock,
return false;
}
#if 0
// if (allColumnsHaveAgg == true) {
int32_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
......@@ -255,6 +254,7 @@ static bool doLoadBlockSMA(STableScanBase* pTableScanInfo, SSDataBlock* pBlock,
pBlock->pBlockAgg[pColMatchInfo->dstSlotId] = pColAgg[i];
}
#endif
return true;
}
......@@ -284,7 +284,7 @@ void applyLimitOffset(SLimitInfo* pLimitInfo, SSDataBlock* pBlock, SExecTaskInfo
if (pLimit->offset > 0 && pLimitInfo->remainOffset > 0) {
if (pLimitInfo->remainOffset >= pBlock->info.rows) {
pLimitInfo->remainOffset -= pBlock->info.rows;
pBlock->info.rows = 0;
blockDataEmpty(pBlock);
qDebug("current block ignore due to offset, current:%" PRId64 ", %s", pLimitInfo->remainOffset, id);
} else {
blockDataTrimFirstNRows(pBlock, pLimitInfo->remainOffset);
......@@ -384,12 +384,12 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanBase* pTableSca
pCost->totalCheckedRows += pBlock->info.rows;
pCost->loadBlocks += 1;
SArray* pCols = tsdbRetrieveDataBlock(pTableScanInfo->dataReader, NULL);
if (pCols == NULL) {
SSDataBlock* p = tsdbRetrieveDataBlock(pTableScanInfo->dataReader, NULL);
if (p == NULL) {
return terrno;
}
relocateColumnData(pBlock, pTableScanInfo->matchInfo.pList, pCols, true);
ASSERT(p == pBlock);
doSetTagColumnData(pTableScanInfo, pBlock, pTaskInfo, pBlock->info.rows);
// restore the previous value
......@@ -637,16 +637,7 @@ static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) {
continue;
}
blockDataCleanup(pBlock);
SDataBlockInfo* pBInfo = &pBlock->info;
int32_t rows = 0;
tsdbRetrieveDataBlockInfo(pTableScanInfo->base.dataReader, &rows, &pBInfo->id.uid, &pBInfo->window);
blockDataEnsureCapacity(pBlock, rows); // todo remove it latter
pBInfo->rows = rows;
ASSERT(pBInfo->id.uid != 0);
ASSERT(pBlock->info.id.uid != 0);
pBlock->info.id.groupId = getTableGroupId(pTaskInfo->pTableInfoList, pBlock->info.id.uid);
uint32_t status = 0;
......@@ -777,7 +768,7 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
ASSERT(pInfo->base.dataReader == NULL);
int32_t code = tsdbReaderOpen(pInfo->base.readHandle.vnode, &pInfo->base.cond, pList, num,
(STsdbReader**)&pInfo->base.dataReader, GET_TASKID(pTaskInfo));
pInfo->pResBlock, (STsdbReader**)&pInfo->base.dataReader, GET_TASKID(pTaskInfo));
if (code != TSDB_CODE_SUCCESS) {
T_LONG_JMP(pTaskInfo->env, code);
}
......@@ -878,11 +869,11 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode,
pInfo->base.scanFlag = MAIN_SCAN;
pInfo->base.pdInfo.interval = extractIntervalInfo(pTableScanNode);
pInfo->base.readHandle = *readHandle;
pInfo->base.dataBlockLoadFlag = pTableScanNode->dataRequired;
pInfo->sample.sampleRatio = pTableScanNode->ratio;
pInfo->sample.seed = taosGetTimestampSec();
pInfo->base.dataBlockLoadFlag = pTableScanNode->dataRequired;
initResultSizeInfo(&pOperator->resultInfo, 4096);
pInfo->pResBlock = createDataBlockFromDescNode(pDescNode);
blockDataEnsureCapacity(pInfo->pResBlock, pOperator->resultInfo.capacity);
......@@ -993,10 +984,8 @@ static SSDataBlock* readPreVersionData(SOperatorInfo* pTableScanOp, uint64_t tbU
SExecTaskInfo* pTaskInfo = pTableScanOp->pTaskInfo;
SSDataBlock* pBlock = pTableScanInfo->pResBlock;
blockDataCleanup(pBlock);
STsdbReader* pReader = NULL;
int32_t code = tsdbReaderOpen(pTableScanInfo->base.readHandle.vnode, &cond, &tblInfo, 1, (STsdbReader**)&pReader,
int32_t code = tsdbReaderOpen(pTableScanInfo->base.readHandle.vnode, &cond, &tblInfo, 1, pBlock, (STsdbReader**)&pReader,
GET_TASKID(pTaskInfo));
if (code != TSDB_CODE_SUCCESS) {
terrno = code;
......@@ -1004,21 +993,10 @@ static SSDataBlock* readPreVersionData(SOperatorInfo* pTableScanOp, uint64_t tbU
return NULL;
}
bool hasBlock = tsdbNextDataBlock(pReader);
if (hasBlock) {
SDataBlockInfo* pBInfo = &pBlock->info;
int32_t rows = 0;
tsdbRetrieveDataBlockInfo(pReader, &rows, &pBInfo->id.uid, &pBInfo->window);
SArray* pCols = tsdbRetrieveDataBlock(pReader, NULL);
blockDataEnsureCapacity(pBlock, rows);
pBlock->info.rows = rows;
relocateColumnData(pBlock, pTableScanInfo->base.matchInfo.pList, pCols, true);
doSetTagColumnData(&pTableScanInfo->base, pBlock, pTaskInfo, rows);
pBlock->info.id.groupId = getTableGroupId(pTaskInfo->pTableInfoList, pBInfo->id.uid);
if (tsdbNextDataBlock(pReader)) {
/*SSDataBlock* p = */tsdbRetrieveDataBlock(pReader, NULL);
doSetTagColumnData(&pTableScanInfo->base, pBlock, pTaskInfo, pBlock->info.rows);
pBlock->info.id.groupId = getTableGroupId(pTaskInfo->pTableInfoList, pBlock->info.id.uid);
}
tsdbReaderClose(pReader);
......@@ -2027,20 +2005,13 @@ static SSDataBlock* doRawScan(SOperatorInfo* pOperator) {
qDebug("tmqsnap doRawScan called");
if (pTaskInfo->streamInfo.prepareStatus.type == TMQ_OFFSET__SNAPSHOT_DATA) {
SSDataBlock* pBlock = &pInfo->pRes;
if (pInfo->dataReader && tsdbNextDataBlock(pInfo->dataReader)) {
if (isTaskKilled(pTaskInfo)) {
longjmp(pTaskInfo->env, pTaskInfo->code);
}
int32_t rows = 0;
tsdbRetrieveDataBlockInfo(pInfo->dataReader, &rows, &pBlock->info.id.uid, &pBlock->info.window);
pBlock->info.rows = rows;
SArray* pCols = tsdbRetrieveDataBlock(pInfo->dataReader, NULL);
pBlock->pDataBlock = pCols;
if (pCols == NULL) {
SSDataBlock* pBlock = tsdbRetrieveDataBlock(pInfo->dataReader, NULL);
if (pBlock == NULL) {
longjmp(pTaskInfo->env, terrno);
}
......@@ -2286,7 +2257,7 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys
if (pHandle->initTableReader) {
pTSInfo->scanMode = TABLE_SCAN__TABLE_ORDER;
pTSInfo->base.dataReader = NULL;
code = tsdbReaderOpen(pHandle->vnode, &pTSInfo->base.cond, pList, num, &pTSInfo->base.dataReader, NULL);
code = tsdbReaderOpen(pHandle->vnode, &pTSInfo->base.cond, pList, num, pTSInfo->pResBlock, &pTSInfo->base.dataReader, NULL);
if (code != 0) {
terrno = code;
destroyTableScanOperatorInfo(pTableScanOp);
......@@ -2510,17 +2481,14 @@ static SSDataBlock* getTableDataBlockImpl(void* param) {
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
int32_t readIdx = source->readerIdx;
SSDataBlock* pBlock = source->inputBlock;
STableMergeScanInfo* pTableScanInfo = pOperator->info;
SQueryTableDataCond* pQueryCond = taosArrayGet(pTableScanInfo->queryConds, readIdx);
blockDataCleanup(pBlock);
SQueryTableDataCond* pQueryCond = taosArrayGet(pInfo->queryConds, readIdx);
int64_t st = taosGetTimestampUs();
void* p = tableListGetInfo(pTaskInfo->pTableInfoList, readIdx + pInfo->tableStartIndex);
void* p = tableListGetInfo(pTaskInfo->pTableInfoList, readIdx + pInfo->tableStartIndex);
SReadHandle* pHandle = &pInfo->base.readHandle;
int32_t code = tsdbReaderOpen(pHandle->vnode, pQueryCond, p, 1, &pInfo->base.dataReader, GET_TASKID(pTaskInfo));
int32_t code = tsdbReaderOpen(pHandle->vnode, pQueryCond, p, 1, pBlock, &pInfo->base.dataReader, GET_TASKID(pTaskInfo));
if (code != 0) {
T_LONG_JMP(pTaskInfo->env, code);
}
......@@ -2532,18 +2500,11 @@ static SSDataBlock* getTableDataBlockImpl(void* param) {
}
// process this data block based on the probabilities
bool processThisBlock = processBlockWithProbability(&pTableScanInfo->sample);
bool processThisBlock = processBlockWithProbability(&pInfo->sample);
if (!processThisBlock) {
continue;
}
blockDataCleanup(pBlock);
int32_t rows = 0;
tsdbRetrieveDataBlockInfo(reader, &rows, &pBlock->info.id.uid, &pBlock->info.window);
blockDataEnsureCapacity(pBlock, rows);
pBlock->info.rows = rows;
if (pQueryCond->order == TSDB_ORDER_ASC) {
pQueryCond->twindows.skey = pBlock->info.window.ekey + 1;
} else {
......@@ -2551,7 +2512,7 @@ static SSDataBlock* getTableDataBlockImpl(void* param) {
}
uint32_t status = 0;
loadDataBlock(pOperator, &pTableScanInfo->base, pBlock, &status);
loadDataBlock(pOperator, &pInfo->base, pBlock, &status);
// code = loadDataBlockFromOneTable(pOperator, pTableScanInfo, pBlock, &status);
if (code != TSDB_CODE_SUCCESS) {
T_LONG_JMP(pTaskInfo->env, code);
......@@ -2565,7 +2526,7 @@ static SSDataBlock* getTableDataBlockImpl(void* param) {
pBlock->info.id.groupId = getTableGroupId(pTaskInfo->pTableInfoList, pBlock->info.id.uid);
pOperator->resultInfo.totalRows += pBlock->info.rows;
pTableScanInfo->base.readRecorder.elapsedTime += (taosGetTimestampUs() - st) / 1000.0;
pInfo->base.readRecorder.elapsedTime += (taosGetTimestampUs() - st) / 1000.0;
tsdbReaderClose(pInfo->base.dataReader);
pInfo->base.dataReader = NULL;
......@@ -2645,6 +2606,8 @@ int32_t startGroupTableMergeScan(SOperatorInfo* pOperator) {
param.readerIdx = i;
param.pOperator = pOperator;
param.inputBlock = createOneDataBlock(pInfo->pResBlock, false);
blockDataEnsureCapacity(param.inputBlock, pOperator->resultInfo.capacity);
taosArrayPush(pInfo->sortSourceParams, &param);
SQueryTableDataCond cond;
......
......@@ -559,7 +559,7 @@ typedef struct SMultiwayMergeOperatorInfo {
STupleHandle* prefetchedTuple;
} SMultiwayMergeOperatorInfo;
int32_t doOpenMultiwayMergeOperator(SOperatorInfo* pOperator) {
int32_t openMultiwayMergeOperator(SOperatorInfo* pOperator) {
SMultiwayMergeOperatorInfo* pInfo = pOperator->info;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
......@@ -577,9 +577,15 @@ int32_t doOpenMultiwayMergeOperator(SOperatorInfo* pOperator) {
tsortSetCompareGroupId(pInfo->pSortHandle, pInfo->groupSort);
for (int32_t i = 0; i < pOperator->numOfDownstream; ++i) {
SOperatorInfo* pDownstream = pOperator->pDownstream[i];
if (pDownstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_EXCHANGE) {
pDownstream->fpSet._openFn(pDownstream);
}
SSortSource* ps = taosMemoryCalloc(1, sizeof(SSortSource));
ps->param = pOperator->pDownstream[i];
ps->param = pDownstream;
ps->onlyRef = true;
tsortAddSource(pInfo->pSortHandle, ps);
}
......@@ -714,7 +720,6 @@ SSDataBlock* doMultiwayMerge(SOperatorInfo* pOperator) {
}
qDebug("start to merge final sorted rows, %s", GET_TASKID(pTaskInfo));
SSDataBlock* pBlock = getMultiwaySortedBlockData(pInfo->pSortHandle, pInfo->binfo.pRes, pInfo->matchInfo.pList, pOperator);
if (pBlock != NULL) {
pOperator->resultInfo.totalRows += pBlock->info.rows;
......@@ -781,7 +786,7 @@ SOperatorInfo* createMultiwayMergeOperatorInfo(SOperatorInfo** downStreams, size
SPhysiNode* pChildNode = (SPhysiNode*)nodesListGetNode(pPhyNode->pChildren, 0);
SSDataBlock* pInputBlock = createDataBlockFromDescNode(pChildNode->pOutputDataBlockDesc);
initResultSizeInfo(&pOperator->resultInfo, 4096);
initResultSizeInfo(&pOperator->resultInfo, 1024);
blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity);
pInfo->groupSort = pMergePhyNode->groupSort;
......@@ -792,7 +797,7 @@ SOperatorInfo* createMultiwayMergeOperatorInfo(SOperatorInfo** downStreams, size
pInfo->sortBufSize = pInfo->bufPageSize * (numStreams + 1); // one additional is reserved for merged result.
setOperatorInfo(pOperator, "MultiwayMergeOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE, false, OP_NOT_OPENED, pInfo, pTaskInfo);
pOperator->fpSet = createOperatorFpSet(doOpenMultiwayMergeOperator, doMultiwayMerge, NULL,
pOperator->fpSet = createOperatorFpSet(openMultiwayMergeOperator, doMultiwayMerge, NULL,
destroyMultiwayMergeOperatorInfo, getMultiwayMergeExplainExecInfo);
code = appendDownstream(pOperator, downStreams, numStreams);
......
......@@ -1874,78 +1874,80 @@ static void destroyBlockDistScanOperatorInfo(void* param) {
}
static int32_t initTableblockDistQueryCond(uint64_t uid, SQueryTableDataCond* pCond) {
memset(pCond, 0, sizeof(SQueryTableDataCond));
pCond->order = TSDB_ORDER_ASC;
pCond->numOfCols = 1;
pCond->colList = taosMemoryCalloc(1, sizeof(SColumnInfo));
if (pCond->colList == NULL) {
terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
return terrno;
}
memset(pCond, 0, sizeof(SQueryTableDataCond));
pCond->order = TSDB_ORDER_ASC;
pCond->numOfCols = 1;
pCond->colList = taosMemoryCalloc(1, sizeof(SColumnInfo));
pCond->pSlotList = taosMemoryMalloc(sizeof(int32_t));
if (pCond->colList == NULL || pCond->pSlotList == NULL) {
terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
return terrno;
}
pCond->colList->colId = 1;
pCond->colList->type = TSDB_DATA_TYPE_TIMESTAMP;
pCond->colList->bytes = sizeof(TSKEY);
pCond->colList->colId = 1;
pCond->colList->type = TSDB_DATA_TYPE_TIMESTAMP;
pCond->colList->bytes = sizeof(TSKEY);
pCond->twindows = (STimeWindow){.skey = INT64_MIN, .ekey = INT64_MAX};
pCond->suid = uid;
pCond->type = TIMEWINDOW_RANGE_CONTAINED;
pCond->startVersion = -1;
pCond->endVersion = -1;
pCond->pSlotList[0] = 0;
return TSDB_CODE_SUCCESS;
pCond->twindows = (STimeWindow){.skey = INT64_MIN, .ekey = INT64_MAX};
pCond->suid = uid;
pCond->type = TIMEWINDOW_RANGE_CONTAINED;
pCond->startVersion = -1;
pCond->endVersion = -1;
return TSDB_CODE_SUCCESS;
}
SOperatorInfo* createDataBlockInfoScanOperator(SReadHandle* readHandle, SBlockDistScanPhysiNode* pBlockScanNode,
SExecTaskInfo* pTaskInfo) {
SBlockDistInfo* pInfo = taosMemoryCalloc(1, sizeof(SBlockDistInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) {
pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY;
goto _error;
}
SBlockDistInfo* pInfo = taosMemoryCalloc(1, sizeof(SBlockDistInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) {
pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY;
goto _error;
}
{
SQueryTableDataCond cond = {0};
pInfo->pResBlock = createDataBlockFromDescNode(pBlockScanNode->node.pOutputDataBlockDesc);
blockDataEnsureCapacity(pInfo->pResBlock, 1);
int32_t code = initTableblockDistQueryCond(pBlockScanNode->suid, &cond);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
{
SQueryTableDataCond cond = {0};
int32_t code = initTableblockDistQueryCond(pBlockScanNode->suid, &cond);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
STableListInfo* pTableListInfo = pTaskInfo->pTableInfoList;
size_t num = tableListGetSize(pTableListInfo);
void* pList = tableListGetInfo(pTableListInfo, 0);
STableListInfo* pTableListInfo = pTaskInfo->pTableInfoList;
size_t num = tableListGetSize(pTableListInfo);
void* pList = tableListGetInfo(pTableListInfo, 0);
code = tsdbReaderOpen(readHandle->vnode, &cond, pList, num, &pInfo->pHandle, pTaskInfo->id.str);
cleanupQueryTableDataCond(&cond);
if (code != 0) {
goto _error;
}
code = tsdbReaderOpen(readHandle->vnode, &cond, pList, num, pInfo->pResBlock, &pInfo->pHandle, pTaskInfo->id.str);
cleanupQueryTableDataCond(&cond);
if (code != 0) {
goto _error;
}
}
pInfo->readHandle = *readHandle;
pInfo->uid = pBlockScanNode->suid;
pInfo->pResBlock = createDataBlockFromDescNode(pBlockScanNode->node.pOutputDataBlockDesc);
blockDataEnsureCapacity(pInfo->pResBlock, 1);
pInfo->readHandle = *readHandle;
pInfo->uid = pBlockScanNode->suid;
int32_t numOfCols = 0;
SExprInfo* pExprInfo = createExprInfo(pBlockScanNode->pScanPseudoCols, NULL, &numOfCols);
int32_t code = initExprSupp(&pOperator->exprSupp, pExprInfo, numOfCols);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
int32_t numOfCols = 0;
SExprInfo* pExprInfo = createExprInfo(pBlockScanNode->pScanPseudoCols, NULL, &numOfCols);
int32_t code = initExprSupp(&pOperator->exprSupp, pExprInfo, numOfCols);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
setOperatorInfo(pOperator, "DataBlockDistScanOperator", QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN, false,
OP_NOT_OPENED, pInfo, pTaskInfo);
pOperator->fpSet =
createOperatorFpSet(operatorDummyOpenFn, doBlockInfoScan, NULL, destroyBlockDistScanOperatorInfo, NULL);
return pOperator;
setOperatorInfo(pOperator, "DataBlockDistScanOperator", QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN, false,
OP_NOT_OPENED, pInfo, pTaskInfo);
pOperator->fpSet =
createOperatorFpSet(operatorDummyOpenFn, doBlockInfoScan, NULL, destroyBlockDistScanOperatorInfo, NULL);
return pOperator;
_error:
taosMemoryFreeClear(pInfo);
taosMemoryFreeClear(pOperator);
return NULL;
_error:
taosMemoryFreeClear(pInfo);
taosMemoryFreeClear(pOperator);
return NULL;
}
\ No newline at end of file
......@@ -45,8 +45,18 @@ static void setNullRow(SSDataBlock* pBlock, SFillInfo* pFillInfo, int32_t rowInd
if (pCol->notFillCol) {
bool filled = fillIfWindowPseudoColumn(pFillInfo, pCol, pDstColInfo, rowIndex);
if (!filled) {
SArray* p = FILL_IS_ASC_FILL(pFillInfo) ? pFillInfo->prev.pRowVal : pFillInfo->next.pRowVal;
SGroupKeys* pKey = taosArrayGet(p, i);
SRowVal* p = NULL;
if (FILL_IS_ASC_FILL(pFillInfo)) {
if (pFillInfo->prev.key != 0) {
p = &pFillInfo->prev; // prev has been set value
} else { // otherwise, use the value in the next row
p = &pFillInfo->next;
}
} else {
p = &pFillInfo->next;
}
SGroupKeys* pKey = taosArrayGet(p->pRowVal, i);
doSetVal(pDstColInfo, rowIndex, pKey);
}
} else {
......@@ -246,7 +256,10 @@ static void initBeforeAfterDataBuf(SFillInfo* pFillInfo) {
static void saveColData(SArray* rowBuf, int32_t columnIndex, const char* src, bool isNull);
static void copyCurrentRowIntoBuf(SFillInfo* pFillInfo, int32_t rowIndex, SArray* pRow) {
static void copyCurrentRowIntoBuf(SFillInfo* pFillInfo, int32_t rowIndex, SRowVal* pRowVal) {
SColumnInfoData* pTsCol = taosArrayGet(pFillInfo->pSrcBlock->pDataBlock, pFillInfo->srcTsSlotId);
pRowVal->key = ((int64_t*)pTsCol->pData)[rowIndex];
for (int32_t i = 0; i < pFillInfo->numOfCols; ++i) {
int32_t type = pFillInfo->pFillCol[i].pExpr->pExpr->nodeType;
if (type == QUERY_NODE_COLUMN || type == QUERY_NODE_OPERATOR || type == QUERY_NODE_FUNCTION) {
......@@ -257,7 +270,7 @@ static void copyCurrentRowIntoBuf(SFillInfo* pFillInfo, int32_t rowIndex, SArray
bool isNull = colDataIsNull_s(pSrcCol, rowIndex);
char* p = colDataGetData(pSrcCol, rowIndex);
saveColData(pRow, i, p, isNull);
saveColData(pRowVal->pRowVal, i, p, isNull);
} else {
ASSERT(0);
}
......@@ -281,7 +294,7 @@ static int32_t fillResultImpl(SFillInfo* pFillInfo, SSDataBlock* pBlock, int32_t
// set the next value for interpolation
if ((pFillInfo->currentKey < ts && ascFill) || (pFillInfo->currentKey > ts && !ascFill)) {
copyCurrentRowIntoBuf(pFillInfo, pFillInfo->index, pFillInfo->next.pRowVal);
copyCurrentRowIntoBuf(pFillInfo, pFillInfo->index, &pFillInfo->next);
}
if (((pFillInfo->currentKey < ts && ascFill) || (pFillInfo->currentKey > ts && !ascFill)) &&
......@@ -303,7 +316,7 @@ static int32_t fillResultImpl(SFillInfo* pFillInfo, SSDataBlock* pBlock, int32_t
if (pFillInfo->type == TSDB_FILL_NEXT && (pFillInfo->index + 1) < pFillInfo->numOfRows) {
int32_t nextRowIndex = pFillInfo->index + 1;
copyCurrentRowIntoBuf(pFillInfo, nextRowIndex, pFillInfo->next.pRowVal);
copyCurrentRowIntoBuf(pFillInfo, nextRowIndex, &pFillInfo->next);
}
// copy rows to dst buffer
......@@ -319,6 +332,9 @@ static int32_t fillResultImpl(SFillInfo* pFillInfo, SSDataBlock* pBlock, int32_t
if (!colDataIsNull_s(pSrc, pFillInfo->index)) {
colDataAppend(pDst, index, src, false);
saveColData(pFillInfo->prev.pRowVal, i, src, false);
if (pFillInfo->srcTsSlotId == dstSlotId) {
pFillInfo->prev.key = *(int64_t*)src;
}
} else { // the value is null
if (pDst->info.type == TSDB_DATA_TYPE_TIMESTAMP) {
colDataAppend(pDst, index, (const char*)&pFillInfo->currentKey, false);
......
......@@ -1276,7 +1276,6 @@ static SSDataBlock* doBuildIntervalResult(SOperatorInfo* pOperator) {
return NULL;
}
blockDataEnsureCapacity(pBlock, pOperator->resultInfo.capacity);
while (1) {
doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
doFilter(pBlock, pOperator->exprSupp.pFilterInfo, NULL);
......@@ -1649,23 +1648,34 @@ static bool allInvertible(SqlFunctionCtx* pFCtx, int32_t numOfCols) {
static bool timeWindowinterpNeeded(SqlFunctionCtx* pCtx, int32_t numOfCols, SIntervalAggOperatorInfo* pInfo) {
// the primary timestamp column
bool needed = false;
pInfo->pInterpCols = taosArrayInit(4, sizeof(SColumn));
pInfo->pPrevValues = taosArrayInit(4, sizeof(SGroupKeys));
{ // ts column
SColumn c = {0};
c.colId = 1;
c.slotId = pInfo->primaryTsIndex;
c.type = TSDB_DATA_TYPE_TIMESTAMP;
c.bytes = sizeof(int64_t);
taosArrayPush(pInfo->pInterpCols, &c);
for(int32_t i = 0; i < numOfCols; ++i) {
SExprInfo* pExpr = pCtx[i].pExpr;
if (fmIsIntervalInterpoFunc(pCtx[i].functionId)) {
needed = true;
break;
}
}
if (needed) {
pInfo->pInterpCols = taosArrayInit(4, sizeof(SColumn));
pInfo->pPrevValues = taosArrayInit(4, sizeof(SGroupKeys));
SGroupKeys key = {0};
key.bytes = c.bytes;
key.type = c.type;
key.isNull = true; // to denote no value is assigned yet
key.pData = taosMemoryCalloc(1, c.bytes);
taosArrayPush(pInfo->pPrevValues, &key);
{ // ts column
SColumn c = {0};
c.colId = 1;
c.slotId = pInfo->primaryTsIndex;
c.type = TSDB_DATA_TYPE_TIMESTAMP;
c.bytes = sizeof(int64_t);
taosArrayPush(pInfo->pInterpCols, &c);
SGroupKeys key;
key.bytes = c.bytes;
key.type = c.type;
key.isNull = true; // to denote no value is assigned yet
key.pData = taosMemoryCalloc(1, c.bytes);
taosArrayPush(pInfo->pPrevValues, &key);
}
}
for (int32_t i = 0; i < numOfCols; ++i) {
......@@ -1676,7 +1686,6 @@ static bool timeWindowinterpNeeded(SqlFunctionCtx* pCtx, int32_t numOfCols, SInt
SColumn c = *pParam->pCol;
taosArrayPush(pInfo->pInterpCols, &c);
needed = true;
SGroupKeys key = {0};
key.bytes = c.bytes;
......@@ -1708,7 +1717,7 @@ void initIntervalDownStream(SOperatorInfo* downstream, uint16_t type, SAggSuppor
void initStreamFunciton(SqlFunctionCtx* pCtx, int32_t numOfExpr) {
for (int32_t i = 0; i < numOfExpr; i++) {
pCtx[i].isStream = true;
// pCtx[i].isStream = true;
}
}
......@@ -1727,7 +1736,8 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SIntervalPh
pInfo->primaryTsIndex = ((SColumnNode*)pPhyNode->window.pTspk)->slotId;
size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
initResultSizeInfo(&pOperator->resultInfo, 4096);
initResultSizeInfo(&pOperator->resultInfo, 512);
blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity);
int32_t num = 0;
SExprInfo* pExprInfo = createExprInfo(pPhyNode->window.pFuncs, NULL, &num);
......@@ -1773,11 +1783,6 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SIntervalPh
goto _error;
}
if (isStream) {
ASSERT(num > 0);
initStreamFunciton(pSup->pCtx, pSup->numOfExprs);
}
initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pInfo->win);
pInfo->timeWindowInterpo = timeWindowinterpNeeded(pSup->pCtx, num, pInfo);
if (pInfo->timeWindowInterpo) {
......@@ -4314,7 +4319,7 @@ SOperatorInfo* createMergeAlignedIntervalOperatorInfo(SOperatorInfo* downstream,
iaInfo->binfo.mergeResultBlock = pNode->window.mergeDataBlock;
size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
initResultSizeInfo(&pOperator->resultInfo, 4096);
initResultSizeInfo(&pOperator->resultInfo, 512);
int32_t num = 0;
SExprInfo* pExprInfo = createExprInfo(pNode->window.pFuncs, NULL, &num);
......
......@@ -34,14 +34,12 @@ struct SSortHandle {
int32_t pageSize;
int32_t numOfPages;
SDiskbasedBuf* pBuf;
SArray* pSortInfo;
SArray* pOrderedSource;
int32_t loops;
uint64_t sortElapsed;
int64_t startTs;
uint64_t totalElapsed;
SArray* pSortInfo;
SArray* pOrderedSource;
int32_t loops;
uint64_t sortElapsed;
int64_t startTs;
uint64_t totalElapsed;
int32_t sourceId;
SSDataBlock* pDataBlock;
......@@ -99,9 +97,9 @@ SSortHandle* tsortCreateSortHandle(SArray* pSortInfo, int32_t type, int32_t page
}
static int32_t sortComparCleanup(SMsortComparParam* cmpParam) {
// NOTICE: pSource may be, if it is SORT_MULTISOURCE_MERGE
for (int32_t i = 0; i < cmpParam->numOfSources; ++i) {
SSortSource* pSource =
cmpParam->pSources[i]; // NOTICE: pSource may be SGenericSource *, if it is SORT_MULTISOURCE_MERGE
SSortSource* pSource = cmpParam->pSources[i];
blockDataDestroy(pSource->src.pBlock);
taosMemoryFreeClear(pSource);
}
......@@ -231,15 +229,15 @@ static int32_t doAddToBuf(SSDataBlock* pDataBlock, SSortHandle* pHandle) {
return doAddNewExternalMemSource(pHandle->pBuf, pHandle->pOrderedSource, pBlock, &pHandle->sourceId, pPageIdList);
}
static void setCurrentSourceIsDone(SSortSource* pSource, SSortHandle* pHandle) {
static void setCurrentSourceDone(SSortSource* pSource, SSortHandle* pHandle) {
pSource->src.rowIndex = -1;
++pHandle->numOfCompletedSources;
}
static int32_t sortComparInit(SMsortComparParam* cmpParam, SArray* pSources, int32_t startIndex, int32_t endIndex,
static int32_t sortComparInit(SMsortComparParam* pParam, SArray* pSources, int32_t startIndex, int32_t endIndex,
SSortHandle* pHandle) {
cmpParam->pSources = taosArrayGet(pSources, startIndex);
cmpParam->numOfSources = (endIndex - startIndex + 1);
pParam->pSources = taosArrayGet(pSources, startIndex);
pParam->numOfSources = (endIndex - startIndex + 1);
int32_t code = 0;
......@@ -247,7 +245,7 @@ static int32_t sortComparInit(SMsortComparParam* cmpParam, SArray* pSources, int
if (pHandle->pBuf == NULL) {
if (!osTempSpaceAvailable()) {
code = TSDB_CODE_NO_AVAIL_DISK;
qError("Sort compare init failed since %s", terrstr(code));
qError("Sort compare init failed since %s, %s", terrstr(code), pHandle->idStr);
return code;
}
......@@ -260,12 +258,12 @@ static int32_t sortComparInit(SMsortComparParam* cmpParam, SArray* pSources, int
}
if (pHandle->type == SORT_SINGLESOURCE_SORT) {
for (int32_t i = 0; i < cmpParam->numOfSources; ++i) {
SSortSource* pSource = cmpParam->pSources[i];
for (int32_t i = 0; i < pParam->numOfSources; ++i) {
SSortSource* pSource = pParam->pSources[i];
// set current source is done
if (taosArrayGetSize(pSource->pageIdList) == 0) {
setCurrentSourceIsDone(pSource, pHandle);
setCurrentSourceDone(pSource, pHandle);
continue;
}
......@@ -280,15 +278,21 @@ static int32_t sortComparInit(SMsortComparParam* cmpParam, SArray* pSources, int
releaseBufPage(pHandle->pBuf, pPage);
}
} else {
for (int32_t i = 0; i < cmpParam->numOfSources; ++i) {
SSortSource* pSource = cmpParam->pSources[i];
qDebug("start init for the multiway merge sort, %s", pHandle->idStr);
int64_t st = taosGetTimestampUs();
for (int32_t i = 0; i < pParam->numOfSources; ++i) {
SSortSource* pSource = pParam->pSources[i];
pSource->src.pBlock = pHandle->fetchfp(pSource->param);
// set current source is done
if (pSource->src.pBlock == NULL) {
setCurrentSourceIsDone(pSource, pHandle);
setCurrentSourceDone(pSource, pHandle);
}
}
int64_t et = taosGetTimestampUs();
qDebug("init for merge sort completed, elapsed time:%.2f ms, %s", (et - st) / 1000.0, pHandle->idStr);
}
return code;
......
......@@ -1543,9 +1543,44 @@ void vectorBitOr(SScalarParam *pLeft, SScalarParam *pRight, SScalarParam *pOut,
int32_t doVectorCompareImpl(SScalarParam *pLeft, SScalarParam *pRight, SScalarParam *pOut, int32_t startIndex,
int32_t numOfRows, int32_t step, __compar_fn_t fp, int32_t optr) {
int32_t num = 0;
bool * pRes = (bool *)pOut->columnData->pData;
bool *pRes = (bool *)pOut->columnData->pData;
if (GET_PARAM_TYPE(pLeft) == TSDB_DATA_TYPE_JSON || GET_PARAM_TYPE(pRight) == TSDB_DATA_TYPE_JSON) {
if (IS_MATHABLE_TYPE(GET_PARAM_TYPE(pLeft)) && IS_MATHABLE_TYPE(GET_PARAM_TYPE(pRight))) {
if (!(pLeft->columnData->hasNull || pRight->columnData->hasNull)) {
for (int32_t i = startIndex; i < numOfRows && i >= 0; i += step) {
int32_t leftIndex = (i >= pLeft->numOfRows) ? 0 : i;
int32_t rightIndex = (i >= pRight->numOfRows) ? 0 : i;
char *pLeftData = colDataGetData(pLeft->columnData, leftIndex);
char *pRightData = colDataGetData(pRight->columnData, rightIndex);
pRes[i] = filterDoCompare(fp, optr, pLeftData, pRightData);
if (pRes[i]) {
++num;
}
}
} else {
for (int32_t i = startIndex; i < numOfRows && i >= 0; i += step) {
int32_t leftIndex = (i >= pLeft->numOfRows) ? 0 : i;
int32_t rightIndex = (i >= pRight->numOfRows) ? 0 : i;
if (colDataIsNull_f(pLeft->columnData->nullbitmap, leftIndex) ||
colDataIsNull_f(pRight->columnData->nullbitmap, rightIndex)) {
pRes[i] = false;
continue;
}
char *pLeftData = colDataGetData(pLeft->columnData, leftIndex);
char *pRightData = colDataGetData(pRight->columnData, rightIndex);
pRes[i] = filterDoCompare(fp, optr, pLeftData, pRightData);
if (pRes[i]) {
++num;
}
}
}
} else {
// if (GET_PARAM_TYPE(pLeft) == TSDB_DATA_TYPE_JSON || GET_PARAM_TYPE(pRight) == TSDB_DATA_TYPE_JSON) {
for (int32_t i = startIndex; i < numOfRows && i >= startIndex; i += step) {
int32_t leftIndex = (i >= pLeft->numOfRows) ? 0 : i;
int32_t rightIndex = (i >= pRight->numOfRows) ? 0 : i;
......@@ -1556,8 +1591,8 @@ int32_t doVectorCompareImpl(SScalarParam *pLeft, SScalarParam *pRight, SScalarPa
continue;
}
char * pLeftData = colDataGetData(pLeft->columnData, leftIndex);
char * pRightData = colDataGetData(pRight->columnData, rightIndex);
char *pLeftData = colDataGetData(pLeft->columnData, leftIndex);
char *pRightData = colDataGetData(pRight->columnData, rightIndex);
int64_t leftOut = 0;
int64_t rightOut = 0;
bool freeLeft = false;
......@@ -1592,25 +1627,6 @@ int32_t doVectorCompareImpl(SScalarParam *pLeft, SScalarParam *pRight, SScalarPa
taosMemoryFreeClear(pRightData);
}
}
} else {
for (int32_t i = startIndex; i < numOfRows && i >= 0; i += step) {
int32_t leftIndex = (i >= pLeft->numOfRows) ? 0 : i;
int32_t rightIndex = (i >= pRight->numOfRows) ? 0 : i;
if (colDataIsNull_s(pLeft->columnData, leftIndex) ||
colDataIsNull_s(pRight->columnData, rightIndex)) {
pRes[i] = false;
continue;
}
char *pLeftData = colDataGetData(pLeft->columnData, leftIndex);
char *pRightData = colDataGetData(pRight->columnData, rightIndex);
pRes[i] = filterDoCompare(fp, optr, pLeftData, pRightData);
if (pRes[i]) {
++num;
}
}
}
return num;
......@@ -1766,7 +1782,7 @@ void vectorIsTrue(SScalarParam *pLeft, SScalarParam *pRight, SScalarParam *pOut,
if (colDataIsNull_s(pOut->columnData, i)) {
int8_t v = 0;
colDataAppendInt8(pOut->columnData, i, &v);
colDataSetNotNull_f(pOut->columnData->nullbitmap, i);
colDataClearNull_f(pOut->columnData->nullbitmap, i);
}
}
pOut->columnData->hasNull = false;
......
......@@ -244,7 +244,7 @@ SHashObj *taosHashInit(size_t capacity, _hash_fn_t fn, bool update, SHashLockTyp
capacity = 4;
}
SHashObj *pHashObj = (SHashObj *)taosMemoryCalloc(1, sizeof(SHashObj));
SHashObj *pHashObj = (SHashObj *)taosMemoryMalloc(sizeof(SHashObj));
if (pHashObj == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
......@@ -264,7 +264,7 @@ SHashObj *taosHashInit(size_t capacity, _hash_fn_t fn, bool update, SHashLockTyp
ASSERT((pHashObj->capacity & (pHashObj->capacity - 1)) == 0);
pHashObj->hashList = (SHashEntry **)taosMemoryCalloc(pHashObj->capacity, sizeof(void *));
pHashObj->hashList = (SHashEntry **)taosMemoryMalloc(pHashObj->capacity * sizeof(void *));
if (pHashObj->hashList == NULL) {
taosMemoryFree(pHashObj);
terrno = TSDB_CODE_OUT_OF_MEMORY;
......@@ -279,7 +279,7 @@ SHashObj *taosHashInit(size_t capacity, _hash_fn_t fn, bool update, SHashLockTyp
return NULL;
}
void *p = taosMemoryCalloc(pHashObj->capacity, sizeof(SHashEntry));
void *p = taosMemoryMalloc(pHashObj->capacity * sizeof(SHashEntry));
if (p == NULL) {
taosArrayDestroy(pHashObj->pMemBlock);
taosMemoryFree(pHashObj->hashList);
......@@ -290,6 +290,9 @@ SHashObj *taosHashInit(size_t capacity, _hash_fn_t fn, bool update, SHashLockTyp
for (int32_t i = 0; i < pHashObj->capacity; ++i) {
pHashObj->hashList[i] = (void *)((char *)p + i * sizeof(SHashEntry));
pHashObj->hashList[i]->num = 0;
pHashObj->hashList[i]->latch = 0;
pHashObj->hashList[i]->next = NULL;
}
taosArrayPush(pHashObj->pMemBlock, &p);
......
......@@ -507,7 +507,9 @@ void destroyDiskbasedBuf(SDiskbasedBuf* pBuf) {
dBufPrintStatis(pBuf);
bool needRemoveFile = false;
if (pBuf->pFile != NULL) {
needRemoveFile = true;
uDebug(
"Paged buffer closed, total:%.2f Kb (%d Pages), inmem size:%.2f Kb (%d Pages), file size:%.2f Kb, page "
"size:%.2f Kb, %s\n",
......@@ -534,9 +536,13 @@ void destroyDiskbasedBuf(SDiskbasedBuf* pBuf) {
}
}
if (taosRemoveFile(pBuf->path) < 0) {
uDebug("WARNING tPage remove file failed. path=%s", pBuf->path);
if (needRemoveFile) {
int32_t ret = taosRemoveFile(pBuf->path);
if (ret != 0) { // print the error and discard this error info
uDebug("WARNING tPage remove file failed. path=%s, code:%s", pBuf->path, strerror(errno));
}
}
taosMemoryFreeClear(pBuf->path);
size_t n = taosArrayGetSize(pBuf->pIdList);
......
......@@ -78,8 +78,8 @@ $ts2 = $tb2 . .ts
print ===============================groupby_operation
print
print ==== select count(*), c1 from group_tb0 group by c1
sql select count(*), c1 from group_tb0 group by c1
print ==== select count(*), c1 from group_tb0 group by c1 order by c1
sql select count(*), c1 from group_tb0 group by c1 order by c1
print rows: $rows
print $data00 $data01 $data02 $data03
print $data10 $data11 $data12 $data13
......@@ -98,18 +98,18 @@ endi
if $data90 != 10 then
return -1
endi
if $data01 != 7 then
if $data01 != 0 then
return -1
endi
if $data11 != 6 then
if $data11 != 1 then
return -1
endi
if $data91 != 3 then
if $data91 != 9 then
return -1
endi
print ==== select first(ts),c1 from group_tb0 group by c1;
sql select first(ts),c1 from group_tb0 group by c1;
print ==== select first(ts),c1 from group_tb0 group by c1 order by c1;
sql select first(ts),c1 from group_tb0 group by c1 order by c1;
print rows: $rows
print $data00 $data01 $data02 $data03
print $data10 $data11 $data12 $data13
......@@ -120,16 +120,16 @@ if $row != 10 then
return -1
endi
if $data00 != @22-01-01 00:00:00.007@ then
if $data00 != @22-01-01 00:00:00.000@ then
return -1
endi
if $data01 != 7 then
if $data01 != 0 then
return -1
endi
if $data90 != @22-01-01 00:00:00.003@ then
if $data90 != @22-01-01 00:00:00.009@ then
return -1
endi
if $data91 != 3 then
if $data91 != 9 then
return -1
endi
......
......@@ -70,10 +70,10 @@ sql select _wstart, t1,t1,count(*),t1,t1 from lr_stb0 where ts>'2018-09-24 00:00
if $row != 2 then
return -1
endi
if $data01 != NULL then
if $data01 != 8 then
return -1
endi
if $data02 != NULL then
if $data02 != 8 then
return -1
endi
if $data03 != NULL then
......
......@@ -121,12 +121,13 @@ if $data01 != 152.420471066 then
return -1
endi
sql select udf2(f2) from udf.t2 group by 1-udf1(f1);
sql select udf2(f2) from udf.t2 group by 1-udf1(f1) order by 1-udf1(f1)
print $rows , $data00 , $data10
if $rows != 2 then
return -1
endi
if $data00 != 2.000000000 then
print expect 2.000000000 , actual: $data00
return -1
endi
if $data10 != 12.083045974 then
......
......@@ -429,10 +429,10 @@ class TDTestCase:
tdSql.checkRows(2)
# nest query
tdSql.query(f"select unique(c1) from (select _rowts , t1 ,c1 , tbname from {dbname}.stb1 ) ")
tdSql.query(f"select unique(c1) v from (select _rowts , t1 ,c1 , tbname from {dbname}.stb1 ) order by v")
tdSql.checkRows(11)
tdSql.checkData(0,0,6)
tdSql.checkData(10,0,3)
tdSql.checkData(1,0,0)
tdSql.checkData(10,0,9)
tdSql.query(f"select unique(t1) from (select _rowts , t1 , tbname from {dbname}.stb1 )")
tdSql.checkRows(2)
tdSql.checkData(0,0,4)
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册