提交 2313ebc6 编写于 作者: H Hongze Cheng

Merge branch '3.0' of https://github.com/taosdata/TDengine into enh/row_optimize

......@@ -303,7 +303,7 @@ def pre_test_build_win() {
set CL=/MP8
echo ">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> cmake"
time /t
cmake .. -G "NMake Makefiles JOM" -DBUILD_TEST=true || exit 7
cmake .. -G "NMake Makefiles JOM" -DBUILD_TEST=true -DBUILD_TOOLS=true || exit 7
echo ">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> jom -j 6"
time /t
jom -j 6 || exit 8
......
......@@ -2,7 +2,7 @@
# taos-tools
ExternalProject_Add(taos-tools
GIT_REPOSITORY https://github.com/taosdata/taos-tools.git
GIT_TAG e00ebd9
GIT_TAG efa2a5f
SOURCE_DIR "${TD_SOURCE_DIR}/tools/taos-tools"
BINARY_DIR ""
#BUILD_IN_SOURCE TRUE
......
......@@ -20,7 +20,18 @@ import {useCurrentSidebarCategory} from '@docusaurus/theme-common';
<DocCardList items={useCurrentSidebarCategory().items}/>
```
### Join TDengine Community
## Study TDengine Knowledge Map
The TDengine Knowledge Map covers the various knowledge points of TDengine, revealing the invocation relationships and data flow between various conceptual entities. Learning and understanding the TDengine Knowledge Map will help you quickly master the TDengine knowledge system.
<figure>
<center>
<a href="pathname:///img/tdengine-map.svg" target="_blank"><img src="/img/tdengine-map.svg" width="80%" /></a>
<figcaption>Diagram 1. TDengine Knowledge Map</figcaption>
</center>
</figure>
## Join TDengine Community
<table width="100%">
<tr align="center" style={{border:0}}>
......
......@@ -301,7 +301,7 @@ SELECT TIMEZONE();
### Syntax
```txt
WHERE (column|tbname) **match/MATCH/nmatch/NMATCH** _regex_
WHERE (column|tbname) match/MATCH/nmatch/NMATCH _regex_
```
### Specification
......
......@@ -4,6 +4,7 @@ from taos.tmq import *
conn = taos.connect()
print("init")
conn.execute("drop topic if exists topic_ctb_column")
conn.execute("drop database if exists py_tmq")
conn.execute("create database if not exists py_tmq vgroups 2")
conn.select_db("py_tmq")
......@@ -15,7 +16,6 @@ conn.execute("create table if not exists tb2 using stb1 tags(2)")
conn.execute("create table if not exists tb3 using stb1 tags(3)")
print("create topic")
conn.execute("drop topic if exists topic_ctb_column")
conn.execute(
"create topic if not exists topic_ctb_column as select ts, c1, c2, c3 from stb1"
)
......
......@@ -302,7 +302,7 @@ SELECT TIMEZONE();
### 语法
```txt
WHERE (column|tbname) **match/MATCH/nmatch/NMATCH** _regex_
WHERE (column|tbname) match/MATCH/nmatch/NMATCH _regex_
```
### 正则表达式规范
......
......@@ -221,12 +221,12 @@ if [[ "$cpuType" == "x64" ]] || [[ "$cpuType" == "aarch64" ]] || [[ "$cpuType" =
# community-version compile
cmake ../ -DCPUTYPE=${cpuType} -DWEBSOCKET=true -DOSTYPE=${osType} -DSOMODE=${soMode} -DDBNAME=${dbName} -DVERTYPE=${verType} -DVERDATE="${build_time}" -DGITINFO=${gitinfo} -DGITINFOI=${gitinfoOfInternal} -DVERNUMBER=${verNumber} -DVERCOMPATIBLE=${verNumberComp} -DPAGMODE=${pagMode} -DBUILD_HTTP=${BUILD_HTTP} -DBUILD_TOOLS=${BUILD_TOOLS} ${allocator_macro}
elif [ "$verMode" == "cloud" ]; then
cmake ../../ -DCPUTYPE=${cpuType} -DWEBSOCKET=true -DBUILD_CLOUD=true -DOSTYPE=${osType} -DSOMODE=${soMode} -DDBNAME=${dbName} -DVERTYPE=${verType} -DVERDATE="${build_time}" -DGITINFO=${gitinfo} -DGITINFOI=${gitinfoOfInternal} -DVERNUMBER=${verNumber} -DVERCOMPATIBLE=${verNumberComp} -DBUILD_HTTP=${BUILD_HTTP} -DBUILD_TOOLS=${BUILD_TOOLS} ${allocator_macro}
cmake ../../ -DCPUTYPE=${cpuType} -DWEBSOCKET=true -DBUILD_TAOSX=true -DBUILD_CLOUD=true -DOSTYPE=${osType} -DSOMODE=${soMode} -DDBNAME=${dbName} -DVERTYPE=${verType} -DVERDATE="${build_time}" -DGITINFO=${gitinfo} -DGITINFOI=${gitinfoOfInternal} -DVERNUMBER=${verNumber} -DVERCOMPATIBLE=${verNumberComp} -DBUILD_HTTP=${BUILD_HTTP} -DBUILD_TOOLS=${BUILD_TOOLS} ${allocator_macro}
elif [ "$verMode" == "cluster" ]; then
if [[ "$dbName" != "taos" ]]; then
replace_enterprise_$dbName
fi
cmake ../../ -DCPUTYPE=${cpuType} -DWEBSOCKET=true -DOSTYPE=${osType} -DSOMODE=${soMode} -DDBNAME=${dbName} -DVERTYPE=${verType} -DVERDATE="${build_time}" -DGITINFO=${gitinfo} -DGITINFOI=${gitinfoOfInternal} -DVERNUMBER=${verNumber} -DVERCOMPATIBLE=${verNumberComp} -DBUILD_HTTP=${BUILD_HTTP} -DBUILD_TOOLS=${BUILD_TOOLS} ${allocator_macro}
cmake ../../ -DCPUTYPE=${cpuType} -DWEBSOCKET=true -DBUILD_TAOSX=true -DOSTYPE=${osType} -DSOMODE=${soMode} -DDBNAME=${dbName} -DVERTYPE=${verType} -DVERDATE="${build_time}" -DGITINFO=${gitinfo} -DGITINFOI=${gitinfoOfInternal} -DVERNUMBER=${verNumber} -DVERCOMPATIBLE=${verNumberComp} -DBUILD_HTTP=${BUILD_HTTP} -DBUILD_TOOLS=${BUILD_TOOLS} ${allocator_macro}
fi
else
echo "input cpuType=${cpuType} error!!!"
......
......@@ -1106,6 +1106,8 @@ int taos_get_table_vgId(TAOS *taos, const char *db, const char *table, int *vgId
return terrno;
}
pRequest->syncQuery = true;
STscObj *pTscObj = pRequest->pTscObj;
SCatalog *pCtg = NULL;
code = catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCtg);
......
......@@ -36,7 +36,7 @@ int32_t colDataGetFullLength(const SColumnInfoData* pColumnInfoData, int32_t num
if (IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) {
return pColumnInfoData->varmeta.length + sizeof(int32_t) * numOfRows;
} else {
return pColumnInfoData->info.bytes * numOfRows + BitmapLen(numOfRows);
return ((pColumnInfoData->info.type == TSDB_DATA_TYPE_NULL) ? 0 : pColumnInfoData->info.bytes * numOfRows) + BitmapLen(numOfRows);
}
}
......@@ -1084,8 +1084,6 @@ int32_t dataBlockCompar_rv(const void* p1, const void* p2, const void* param) {
return 0;
}
int32_t varColSort(SColumnInfoData* pColumnInfoData, SBlockOrderInfo* pOrder) { return 0; }
int32_t blockDataSort_rv(SSDataBlock* pDataBlock, SArray* pOrderInfo, bool nullFirst) {
// Allocate the additional buffer.
int64_t p0 = taosGetTimestampUs();
......@@ -1962,6 +1960,7 @@ char* dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** pDataBuf)
memset(pBuf, 0, sizeof(pBuf));
char* pData = colDataGetVarData(pColInfoData, j);
int32_t dataSize = TMIN(sizeof(pBuf), varDataLen(pData));
dataSize = TMIN(dataSize, 50);
memcpy(pBuf, varDataVal(pData), dataSize);
len += snprintf(dumpBuf + len, size - len, " %15s |", pBuf);
if (len >= size - 1) return dumpBuf;
......
......@@ -148,8 +148,8 @@ static int32_t vmPutMsgToQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg, EQueueType qtyp
SVnodeObj *pVnode = vmAcquireVnode(pMgmt, pHead->vgId);
if (pVnode == NULL) {
dGError("vgId:%d, msg:%p failed to put into vnode queue since %s, type:%s qtype:%d", pHead->vgId, pMsg, terrstr(),
TMSG_INFO(pMsg->msgType), qtype);
dGError("vgId:%d, msg:%p failed to put into vnode queue since %s, type:%s qtype:%d contLen:%d", pHead->vgId, pMsg, terrstr(),
TMSG_INFO(pMsg->msgType), qtype, pHead->contLen);
return terrno != 0 ? terrno : -1;
}
......
......@@ -203,10 +203,13 @@ static int32_t tdProcessTSmaInsertImpl(SSma *pSma, int64_t indexUid, const char
goto _err;
}
SBatchDeleteReq deleteReq;
SBatchDeleteReq deleteReq = {0};
SSubmitReq *pSubmitReq =
tqBlockToSubmit(pSma->pVnode, (const SArray *)msg, pTsmaStat->pTSchema, &pTsmaStat->pTSma->schemaTag, true,
pTsmaStat->pTSma->dstTbUid, pTsmaStat->pTSma->dstTbName, &deleteReq);
// TODO deleteReq
taosArrayDestroy(deleteReq.deleteReqs);
if (!pSubmitReq) {
smaError("vgId:%d, failed to gen submit blk while tsma insert for smaIndex %" PRIi64 " since %s", SMA_VID(pSma),
......
......@@ -308,9 +308,8 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver)
}
if (vnodeIsRoleLeader(pTq->pVnode)) {
if (taosHashGetSize(pTq->pStreamMeta->pTasks) == 0) return 0;
if (msgType == TDMT_VND_SUBMIT) {
if (taosHashGetSize(pTq->pStreamMeta->pTasks) == 0) return 0;
void* data = taosMemoryMalloc(msgLen);
if (data == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
......
......@@ -236,6 +236,7 @@ SSubmitReq* tqBlockToSubmit(SVnode* pVnode, const SArray* pBlocks, const STSchem
SSDataBlock* pDataBlock = taosArrayGet(pBlocks, i);
if (pDataBlock->info.type == STREAM_DELETE_RESULT) {
pDeleteReq->suid = suid;
pDeleteReq->deleteReqs = taosArrayInit(0, sizeof(SSingleDeleteReq));
tqBuildDeleteReq(pVnode, stbFullName, pDataBlock, pDeleteReq);
continue;
}
......
......@@ -287,12 +287,17 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32
hasRes = true;
p->ts = pColVal->ts;
uint8_t* px = p->colVal.value.pData;
p->colVal = pColVal->colVal;
if (!IS_VAR_DATA_TYPE(pColVal->colVal.type)) {
p->colVal = pColVal->colVal;
} else {
if (COL_VAL_IS_VALUE(&pColVal->colVal)) {
memcpy(p->colVal.value.pData, pColVal->colVal.value.pData, pColVal->colVal.value.nData);
}
if (COL_VAL_IS_VALUE(&pColVal->colVal) && IS_VAR_DATA_TYPE(pColVal->colVal.type)) {
p->colVal.value.pData = px;
memcpy(px, pColVal->colVal.value.pData, pColVal->colVal.value.nData);
p->colVal.value.nData = pColVal->colVal.value.nData;
p->colVal.type = pColVal->colVal.type;
p->colVal.flag = pColVal->colVal.flag;
p->colVal.cid = pColVal->colVal.cid;
}
}
}
......
......@@ -305,7 +305,7 @@ static void* getPosInBlockInfoBuf(SBlockInfoBuf* pBuf, int32_t index) {
}
// NOTE: speedup the whole processing by preparing the buffer for STableBlockScanInfo in batch model
static SHashObj* createDataBlockScanInfo(STsdbReader* pTsdbReader, const STableKeyInfo* idList, int32_t numOfTables) {
static SHashObj* createDataBlockScanInfo(STsdbReader* pTsdbReader, SBlockInfoBuf* pBuf, const STableKeyInfo* idList, int32_t numOfTables) {
// allocate buffer in order to load data blocks from file
// todo use simple hash instead, optimize the memory consumption
SHashObj* pTableMap =
......@@ -315,10 +315,10 @@ static SHashObj* createDataBlockScanInfo(STsdbReader* pTsdbReader, const STableK
}
int64_t st = taosGetTimestampUs();
initBlockScanInfoBuf(&pTsdbReader->blockInfoBuf, numOfTables);
initBlockScanInfoBuf(pBuf, numOfTables);
for (int32_t j = 0; j < numOfTables; ++j) {
STableBlockScanInfo* pScanInfo = getPosInBlockInfoBuf(&pTsdbReader->blockInfoBuf, j);
STableBlockScanInfo* pScanInfo = getPosInBlockInfoBuf(pBuf, j);
pScanInfo->uid = idList[j].uid;
if (ASCENDING_TRAVERSE(pTsdbReader->order)) {
int64_t skey = pTsdbReader->window.skey;
......@@ -3785,9 +3785,9 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, void* pTableL
}
STsdbReader* p = (pReader->innerReader[0] != NULL)? pReader->innerReader[0]:pReader;
pReader->status.pTableMap = createDataBlockScanInfo(p, pTableList, numOfTables);
pReader->status.pTableMap = createDataBlockScanInfo(p, &pReader->blockInfoBuf, pTableList, numOfTables);
if (pReader->status.pTableMap == NULL) {
tsdbReaderClose(pReader);
tsdbReaderClose(p);
*ppReader = NULL;
code = TSDB_CODE_TDB_OUT_OF_MEMORY;
......
......@@ -2,10 +2,6 @@ aux_source_directory(src EXECUTOR_SRC)
#add_library(executor ${EXECUTOR_SRC})
add_library(executor STATIC ${EXECUTOR_SRC})
#set_target_properties(executor PROPERTIES
# IMPORTED_LOCATION "${CMAKE_CURRENT_SOURCE_DIR}/libexecutor.a"
# INTERFACE_INCLUDE_DIRECTORIES "${TD_SOURCE_DIR}/include/libs/executor"
# )
target_link_libraries(executor
PRIVATE os util common function parser planner qcom vnode scalar nodes index stream
......
......@@ -235,16 +235,6 @@ typedef enum {
#define COL_MATCH_FROM_COL_ID 0x1
#define COL_MATCH_FROM_SLOT_ID 0x2
typedef struct SSourceDataInfo {
int32_t index;
SRetrieveTableRsp* pRsp;
uint64_t totalRows;
int64_t startTime;
int32_t code;
EX_SOURCE_STATUS status;
const char* taskId;
} SSourceDataInfo;
typedef struct SLoadRemoteDataInfo {
uint64_t totalSize; // total load bytes from remote
uint64_t totalRows; // total number of rows
......@@ -371,23 +361,8 @@ typedef struct STagScanInfo {
SColMatchInfo matchInfo;
int32_t curPos;
SReadHandle readHandle;
STableListInfo* pTableList;
} STagScanInfo;
typedef struct SLastrowScanInfo {
SSDataBlock* pRes;
SReadHandle readHandle;
void* pLastrowReader;
SColMatchInfo matchInfo;
int32_t* pSlotIds;
SExprSupp pseudoExprSup;
int32_t retrieveType;
int32_t currentGroupIndex;
SSDataBlock* pBufferredRes;
SArray* pUidList;
int32_t indexOfBufferedRes;
} SLastrowScanInfo;
typedef enum EStreamScanMode {
STREAM_SCAN_FROM_READERHANDLE = 1,
STREAM_SCAN_FROM_RES,
......@@ -504,40 +479,6 @@ typedef struct {
SSnapContext* sContext;
} SStreamRawScanInfo;
typedef struct SSysTableIndex {
int8_t init;
SArray* uids;
int32_t lastIdx;
} SSysTableIndex;
typedef struct SSysTableScanInfo {
SRetrieveMetaTableRsp* pRsp;
SRetrieveTableReq req;
SEpSet epSet;
tsem_t ready;
SReadHandle readHandle;
int32_t accountId;
const char* pUser;
bool sysInfo;
bool showRewrite;
SNode* pCondition; // db_name filter condition, to discard data that are not in current database
SMTbCursor* pCur; // cursor for iterate the local table meta store.
SSysTableIndex* pIdx; // idx for local table meta
SColMatchInfo matchInfo;
SName name;
SSDataBlock* pRes;
int64_t numOfBlocks; // extract basic running information.
SLoadRemoteDataInfo loadInfo;
} SSysTableScanInfo;
typedef struct SBlockDistInfo {
SSDataBlock* pResBlock;
STsdbReader* pHandle;
SReadHandle readHandle;
uint64_t uid; // table uid
} SBlockDistInfo;
// todo remove this
typedef struct SOptrBasicInfo {
SResultRowInfo resultRowInfo;
SSDataBlock* pRes;
......@@ -603,24 +544,6 @@ typedef struct SAggOperatorInfo {
SExprSupp scalarExprSup;
} SAggOperatorInfo;
typedef struct SProjectOperatorInfo {
SOptrBasicInfo binfo;
SAggSupporter aggSup;
SArray* pPseudoColInfo;
SLimitInfo limitInfo;
bool mergeDataBlocks;
SSDataBlock* pFinalRes;
} SProjectOperatorInfo;
typedef struct SIndefOperatorInfo {
SOptrBasicInfo binfo;
SAggSupporter aggSup;
SArray* pPseudoColInfo;
SExprSupp scalarSup;
uint64_t groupId;
SSDataBlock* pNextGroupRes;
} SIndefOperatorInfo;
typedef struct SFillOperatorInfo {
struct SFillInfo* pFillInfo;
SSDataBlock* pRes;
......@@ -638,42 +561,12 @@ typedef struct SFillOperatorInfo {
SExprSupp noFillExprSupp;
} SFillOperatorInfo;
typedef struct SGroupbyOperatorInfo {
SOptrBasicInfo binfo;
SAggSupporter aggSup;
SArray* pGroupCols; // group by columns, SArray<SColumn>
SArray* pGroupColVals; // current group column values, SArray<SGroupKeys>
bool isInit; // denote if current val is initialized or not
char* keyBuf; // group by keys for hash
int32_t groupKeyLen; // total group by column width
SGroupResInfo groupResInfo;
SExprSupp scalarSup;
} SGroupbyOperatorInfo;
typedef struct SDataGroupInfo {
uint64_t groupId;
int64_t numOfRows;
SArray* pPageList;
} SDataGroupInfo;
// The sort in partition may be needed later.
typedef struct SPartitionOperatorInfo {
SOptrBasicInfo binfo;
SArray* pGroupCols;
SArray* pGroupColVals; // current group column values, SArray<SGroupKeys>
char* keyBuf; // group by keys for hash
int32_t groupKeyLen; // total group by column width
SHashObj* pGroupSet; // quick locate the window object for each result
SDiskbasedBuf* pBuf; // query result buffer based on blocked-wised disk file
int32_t rowCapacity; // maximum number of rows for each buffer page
int32_t* columnOffset; // start position for each column data
SArray* sortedGroupArray; // SDataGroupInfo sorted by group id
int32_t groupIndex; // group index
int32_t pageIndex; // page index of current group
SExprSupp scalarSup;
} SPartitionOperatorInfo;
typedef struct SWindowRowsSup {
STimeWindow win;
TSKEY prevTs;
......@@ -754,6 +647,23 @@ typedef struct SStreamPartitionOperatorInfo {
SSDataBlock* pDelRes;
} SStreamPartitionOperatorInfo;
typedef struct SStreamFillSupporter {
int32_t type; // fill type
SInterval interval;
SResultRowData prev;
SResultRowData cur;
SResultRowData next;
SResultRowData nextNext;
SFillColInfo* pAllColInfo; // fill exprs and not fill exprs
SExprSupp notFillExprSup;
int32_t numOfAllCols; // number of all exprs, including the tags columns
int32_t numOfFillCols;
int32_t numOfNotFillCols;
int32_t rowSize;
SSHashObj* pResMap;
bool hasDelete;
} SStreamFillSupporter;
typedef struct SStreamFillOperatorInfo {
SStreamFillSupporter* pFillSup;
SSDataBlock* pRes;
......@@ -800,33 +710,6 @@ typedef struct SStateWindowOperatorInfo {
STimeWindowAggSupp twAggSup;
} SStateWindowOperatorInfo;
typedef struct SSortOperatorInfo {
SOptrBasicInfo binfo;
uint32_t sortBufSize; // max buffer size for in-memory sort
SArray* pSortInfo;
SSortHandle* pSortHandle;
SColMatchInfo matchInfo;
int32_t bufPageSize;
int64_t startTs; // sort start time
uint64_t sortElapsed; // sort elapsed time, time to flush to disk not included.
SLimitInfo limitInfo;
} SSortOperatorInfo;
typedef struct SJoinOperatorInfo {
SSDataBlock* pRes;
int32_t joinType;
int32_t inputOrder;
SSDataBlock* pLeft;
int32_t leftPos;
SColumnInfo leftCol;
SSDataBlock* pRight;
int32_t rightPos;
SColumnInfo rightCol;
SNode* pCondAfterMerge;
} SJoinOperatorInfo;
#define OPTR_IS_OPENED(_optr) (((_optr)->status & OP_OPENED) == OP_OPENED)
#define OPTR_SET_OPENED(_optr) ((_optr)->status |= OP_OPENED)
......@@ -850,7 +733,6 @@ void doBuildStreamResBlock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SGr
void doBuildResultDatablock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SGroupResInfo* pGroupResInfo,
SDiskbasedBuf* pBuf);
int32_t handleLimitOffset(SOperatorInfo* pOperator, SLimitInfo* pLimitInfo, SSDataBlock* pBlock, bool holdDataInBuf);
bool hasLimitOffsetInfo(SLimitInfo* pLimitInfo);
void initLimitInfo(const SNode* pLimit, const SNode* pSLimit, SLimitInfo* pLimitInfo);
void applyLimitOffset(SLimitInfo* pLimitInfo, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo, SOperatorInfo* pOperator);
......@@ -880,9 +762,6 @@ void cleanupAggSup(SAggSupporter* pAggSup);
void appendOneRowToDataBlock(SSDataBlock* pBlock, STupleHandle* pTupleHandle);
void setTbNameColData(const SSDataBlock* pBlock, SColumnInfoData* pColInfoData, int32_t functionId, const char* name);
int32_t doPrepareScan(SOperatorInfo* pOperator, uint64_t uid, int64_t ts);
int32_t doGetScanStatus(SOperatorInfo* pOperator, uint64_t* uid, int64_t* ts);
SSDataBlock* loadNextDataBlock(void* param);
void setResultRowInitCtx(SResultRow* pResult, SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t* rowEntryInfoOffset);
......@@ -965,9 +844,8 @@ void setInputDataBlock(SExprSupp* pExprSupp, SSDataBlock* pBlock, int32_t order,
bool isTaskKilled(SExecTaskInfo* pTaskInfo);
int32_t checkForQueryBuf(size_t numOfTables);
void setTaskKilled(SExecTaskInfo* pTaskInfo);
void queryCostStatis(SExecTaskInfo* pTaskInfo);
void setTaskKilled(SExecTaskInfo* pTaskInfo);
void queryCostStatis(SExecTaskInfo* pTaskInfo);
void doDestroyTask(SExecTaskInfo* pTaskInfo);
void destroyOperatorInfo(SOperatorInfo* pOperator);
int32_t getMaximumIdleDurationSec();
......@@ -995,9 +873,6 @@ int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SRead
int32_t createDataSinkParam(SDataSinkNode* pNode, void** pParam, qTaskInfo_t* pTaskInfo, SReadHandle* readHandle);
int32_t getOperatorExplainExecInfo(SOperatorInfo* operatorInfo, SArray* pExecInfoList);
int32_t aggDecodeResultRow(SOperatorInfo* pOperator, char* result);
int32_t aggEncodeResultRow(SOperatorInfo* pOperator, char** result, int32_t* length);
STimeWindow getActiveTimeWindow(SDiskbasedBuf* pBuf, SResultRowInfo* pResultRowInfo, int64_t ts, SInterval* pInterval,
int32_t order);
int32_t getNumOfRowsInTimeWindow(SDataBlockInfo* pDataBlockInfo, TSKEY* pPrimaryColumn, int32_t startPos, TSKEY ekey,
......
......@@ -111,22 +111,6 @@ typedef struct SStreamFillInfo {
int32_t delIndex;
} SStreamFillInfo;
typedef struct SStreamFillSupporter {
int32_t type; // fill type
SInterval interval;
SResultRowData prev;
SResultRowData cur;
SResultRowData next;
SResultRowData nextNext;
SFillColInfo* pAllColInfo; // fill exprs and not fill exprs
int32_t numOfAllCols; // number of all exprs, including the tags columns
int32_t numOfFillCols;
int32_t numOfNotFillCols;
int32_t rowSize;
SSHashObj* pResMap;
bool hasDelete;
} SStreamFillSupporter;
int64_t getNumOfResultsAfterFillGap(SFillInfo* pFillInfo, int64_t ekey, int32_t maxNumOfRows);
void taosFillSetStartInfo(struct SFillInfo* pFillInfo, int32_t numOfRows, TSKEY endKey);
......
......@@ -36,12 +36,13 @@ typedef struct SMultiMergeSource {
typedef struct SSortSource {
SMultiMergeSource src;
union {
struct {
SArray* pageIdList;
int32_t pageIndex;
};
struct {
SArray* pageIdList;
int32_t pageIndex;
};
struct {
void* param;
bool onlyRef;
};
} SSortSource;
......
......@@ -25,15 +25,29 @@
#include "thash.h"
#include "ttypes.h"
typedef struct SCacheRowsScanInfo {
SSDataBlock* pRes;
SReadHandle readHandle;
void* pLastrowReader;
SColMatchInfo matchInfo;
int32_t* pSlotIds;
SExprSupp pseudoExprSup;
int32_t retrieveType;
int32_t currentGroupIndex;
SSDataBlock* pBufferredRes;
SArray* pUidList;
int32_t indexOfBufferedRes;
} SCacheRowsScanInfo;
static SSDataBlock* doScanCache(SOperatorInfo* pOperator);
static void destroyLastrowScanOperator(void* param);
static void destroyCacheScanOperator(void* param);
static int32_t extractCacheScanSlotId(const SArray* pColMatchInfo, SExecTaskInfo* pTaskInfo, int32_t** pSlotIds);
static int32_t removeRedundantTsCol(SLastRowScanPhysiNode* pScanNode, SColMatchInfo* pColMatchInfo);
SOperatorInfo* createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SReadHandle* readHandle,
SExecTaskInfo* pTaskInfo) {
int32_t code = TSDB_CODE_SUCCESS;
SLastrowScanInfo* pInfo = taosMemoryCalloc(1, sizeof(SLastrowScanInfo));
SCacheRowsScanInfo* pInfo = taosMemoryCalloc(1, sizeof(SCacheRowsScanInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
......@@ -97,14 +111,14 @@ SOperatorInfo* createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SRe
pOperator->exprSupp.numOfExprs = taosArrayGetSize(pInfo->pRes->pDataBlock);
pOperator->fpSet =
createOperatorFpSet(operatorDummyOpenFn, doScanCache, NULL, destroyLastrowScanOperator, NULL);
createOperatorFpSet(operatorDummyOpenFn, doScanCache, NULL, destroyCacheScanOperator, NULL);
pOperator->cost.openCost = 0;
return pOperator;
_error:
pTaskInfo->code = code;
destroyLastrowScanOperator(pInfo);
destroyCacheScanOperator(pInfo);
taosMemoryFree(pOperator);
return NULL;
}
......@@ -114,7 +128,7 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) {
return NULL;
}
SLastrowScanInfo* pInfo = pOperator->info;
SCacheRowsScanInfo* pInfo = pOperator->info;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
STableListInfo* pTableList = pTaskInfo->pTableInfoList;
......@@ -157,9 +171,12 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) {
SColumnInfoData* pSrc = taosArrayGet(pInfo->pBufferredRes->pDataBlock, slotId);
SColumnInfoData* pDst = taosArrayGet(pRes->pDataBlock, slotId);
char* p = colDataGetData(pSrc, pInfo->indexOfBufferedRes);
bool isNull = colDataIsNull_s(pSrc, pInfo->indexOfBufferedRes);
colDataAppend(pDst, 0, p, isNull);
if (colDataIsNull_s(pSrc, pInfo->indexOfBufferedRes)) {
colDataAppendNULL(pDst, 0);
} else {
char* p = colDataGetData(pSrc, pInfo->indexOfBufferedRes);
colDataAppend(pDst, 0, p, false);
}
}
pRes->info.uid = *(tb_uid_t*)taosArrayGet(pInfo->pUidList, pInfo->indexOfBufferedRes);
......@@ -226,6 +243,8 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) {
pInfo->pLastrowReader = tsdbCacherowsReaderClose(pInfo->pLastrowReader);
return pInfo->pRes;
} else {
pInfo->pLastrowReader = tsdbCacherowsReaderClose(pInfo->pLastrowReader);
}
}
......@@ -234,8 +253,8 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) {
}
}
void destroyLastrowScanOperator(void* param) {
SLastrowScanInfo* pInfo = (SLastrowScanInfo*)param;
void destroyCacheScanOperator(void* param) {
SCacheRowsScanInfo* pInfo = (SCacheRowsScanInfo*)param;
blockDataDestroy(pInfo->pRes);
blockDataDestroy(pInfo->pBufferredRes);
taosMemoryFree(pInfo->pSlotIds);
......
......@@ -250,6 +250,8 @@ static int32_t putDataBlock(SDataSinkHandle* pHandle, const SInputData* pInput,
return code;
}
taosArrayClear(pInserter->pDataBlocks);
code = sendSubmitRequest(pInserter, pMsg, pInserter->pParam->readHandle->pMsgCb->clientRpc, &pInserter->pNode->epSet);
if (code) {
return code;
......
......@@ -41,6 +41,16 @@ typedef struct SFetchRspHandleWrapper {
int32_t sourceIndex;
} SFetchRspHandleWrapper;
typedef struct SSourceDataInfo {
int32_t index;
SRetrieveTableRsp* pRsp;
uint64_t totalRows;
int64_t startTime;
int32_t code;
EX_SOURCE_STATUS status;
const char* taskId;
} SSourceDataInfo;
static void destroyExchangeOperatorInfo(void* param);
static void freeBlock(void* pParam);
static void freeSourceDataInfo(void* param);
......@@ -52,6 +62,7 @@ static int32_t getCompletedSources(const SArray* pArray);
static int32_t prepareConcurrentlyLoad(SOperatorInfo* pOperator);
static int32_t seqLoadRemoteData(SOperatorInfo* pOperator);
static int32_t prepareLoadRemoteData(SOperatorInfo* pOperator);
static int32_t handleLimitOffset(SOperatorInfo* pOperator, SLimitInfo* pLimitInfo, SSDataBlock* pBlock, bool holdDataInBuf);
static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeInfo* pExchangeInfo,
SExecTaskInfo* pTaskInfo) {
......@@ -647,3 +658,80 @@ int32_t prepareLoadRemoteData(SOperatorInfo* pOperator) {
pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0;
return TSDB_CODE_SUCCESS;
}
int32_t handleLimitOffset(SOperatorInfo* pOperator, SLimitInfo* pLimitInfo, SSDataBlock* pBlock, bool holdDataInBuf) {
if (pLimitInfo->remainGroupOffset > 0) {
if (pLimitInfo->currentGroupId == 0) { // it is the first group
pLimitInfo->currentGroupId = pBlock->info.groupId;
blockDataCleanup(pBlock);
return PROJECT_RETRIEVE_CONTINUE;
} else if (pLimitInfo->currentGroupId != pBlock->info.groupId) {
// now it is the data from a new group
pLimitInfo->remainGroupOffset -= 1;
// ignore data block in current group
if (pLimitInfo->remainGroupOffset > 0) {
blockDataCleanup(pBlock);
return PROJECT_RETRIEVE_CONTINUE;
}
}
// set current group id of the project operator
pLimitInfo->currentGroupId = pBlock->info.groupId;
}
// here check for a new group data, we need to handle the data of the previous group.
if (pLimitInfo->currentGroupId != 0 && pLimitInfo->currentGroupId != pBlock->info.groupId) {
pLimitInfo->numOfOutputGroups += 1;
if ((pLimitInfo->slimit.limit > 0) && (pLimitInfo->slimit.limit <= pLimitInfo->numOfOutputGroups)) {
pOperator->status = OP_EXEC_DONE;
blockDataCleanup(pBlock);
return PROJECT_RETRIEVE_DONE;
}
// reset the value for a new group data
pLimitInfo->numOfOutputRows = 0;
pLimitInfo->remainOffset = pLimitInfo->limit.offset;
// existing rows that belongs to previous group.
if (pBlock->info.rows > 0) {
return PROJECT_RETRIEVE_DONE;
}
}
// here we reach the start position, according to the limit/offset requirements.
// set current group id
pLimitInfo->currentGroupId = pBlock->info.groupId;
if (pLimitInfo->remainOffset >= pBlock->info.rows) {
pLimitInfo->remainOffset -= pBlock->info.rows;
blockDataCleanup(pBlock);
return PROJECT_RETRIEVE_CONTINUE;
} else if (pLimitInfo->remainOffset < pBlock->info.rows && pLimitInfo->remainOffset > 0) {
blockDataTrimFirstNRows(pBlock, pLimitInfo->remainOffset);
pLimitInfo->remainOffset = 0;
}
// check for the limitation in each group
if (pLimitInfo->limit.limit >= 0 && pLimitInfo->numOfOutputRows + pBlock->info.rows >= pLimitInfo->limit.limit) {
int32_t keepRows = (int32_t)(pLimitInfo->limit.limit - pLimitInfo->numOfOutputRows);
blockDataKeepFirstNRows(pBlock, keepRows);
if (pLimitInfo->slimit.limit > 0 && pLimitInfo->slimit.limit <= pLimitInfo->numOfOutputGroups) {
pOperator->status = OP_EXEC_DONE;
}
return PROJECT_RETRIEVE_DONE;
}
// todo optimize performance
// If there are slimit/soffset value exists, multi-round result can not be packed into one group, since the
// they may not belong to the same group the limit/offset value is not valid in this case.
if ((!holdDataInBuf) || (pBlock->info.rows >= pOperator->resultInfo.threshold) || pLimitInfo->slimit.offset != -1 ||
pLimitInfo->slimit.limit != -1) {
return PROJECT_RETRIEVE_DONE;
} else { // not full enough, continue to accumulate the output data in the buffer.
return PROJECT_RETRIEVE_CONTINUE;
}
}
......@@ -24,7 +24,6 @@
#include "tdatablock.h"
#include "tglobal.h"
#include "tmsg.h"
#include "tsort.h"
#include "ttime.h"
#include "executorimpl.h"
......@@ -297,8 +296,6 @@ void initExecTimeWindowInfo(SColumnInfoData* pColData, STimeWindow* pQueryWindow
colDataAppendInt64(pColData, 4, &pQueryWindow->ekey);
}
void cleanupExecTimeWindowInfo(SColumnInfoData* pColData) { colDataDestroy(pColData); }
typedef struct {
bool hasAgg;
int32_t numOfRows;
......@@ -1347,42 +1344,6 @@ void queryCostStatis(SExecTaskInfo* pTaskInfo) {
}
}
// static void updateOffsetVal(STaskRuntimeEnv *pRuntimeEnv, SDataBlockInfo *pBlockInfo) {
// STaskAttr *pQueryAttr = pRuntimeEnv->pQueryAttr;
// STableQueryInfo* pTableQueryInfo = pRuntimeEnv->current;
//
// int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQueryAttr->order.order);
//
// if (pQueryAttr->limit.offset == pBlockInfo->rows) { // current block will ignore completed
// pTableQueryInfo->lastKey = QUERY_IS_ASC_QUERY(pQueryAttr) ? pBlockInfo->window.ekey + step :
// pBlockInfo->window.skey + step; pQueryAttr->limit.offset = 0; return;
// }
//
// if (QUERY_IS_ASC_QUERY(pQueryAttr)) {
// pQueryAttr->pos = (int32_t)pQueryAttr->limit.offset;
// } else {
// pQueryAttr->pos = pBlockInfo->rows - (int32_t)pQueryAttr->limit.offset - 1;
// }
//
// assert(pQueryAttr->pos >= 0 && pQueryAttr->pos <= pBlockInfo->rows - 1);
//
// SArray * pDataBlock = tsdbRetrieveDataBlock(pRuntimeEnv->pTsdbReadHandle, NULL);
// SColumnInfoData *pColInfoData = taosArrayGet(pDataBlock, 0);
//
// // update the pQueryAttr->limit.offset value, and pQueryAttr->pos value
// TSKEY *keys = (TSKEY *) pColInfoData->pData;
//
// // update the offset value
// pTableQueryInfo->lastKey = keys[pQueryAttr->pos];
// pQueryAttr->limit.offset = 0;
//
// int32_t numOfRes = tableApplyFunctionsOnBlock(pRuntimeEnv, pBlockInfo, NULL, binarySearchForKey, pDataBlock);
//
// //qDebug("QInfo:0x%"PRIx64" check data block, brange:%" PRId64 "-%" PRId64 ", numBlocksOfStep:%d, numOfRes:%d,
// lastKey:%"PRId64, GET_TASKID(pRuntimeEnv),
// pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows, numOfRes, pQuery->current->lastKey);
// }
// void skipBlocks(STaskRuntimeEnv *pRuntimeEnv) {
// STaskAttr *pQueryAttr = pRuntimeEnv->pQueryAttr;
//
......@@ -1723,159 +1684,6 @@ static SSDataBlock* getAggregateResult(SOperatorInfo* pOperator) {
return (rows == 0) ? NULL : pInfo->pRes;
}
int32_t aggEncodeResultRow(SOperatorInfo* pOperator, char** result, int32_t* length) {
if (result == NULL || length == NULL) {
return TSDB_CODE_TSC_INVALID_INPUT;
}
SOptrBasicInfo* pInfo = (SOptrBasicInfo*)(pOperator->info);
SAggSupporter* pSup = (SAggSupporter*)POINTER_SHIFT(pOperator->info, sizeof(SOptrBasicInfo));
int32_t size = tSimpleHashGetSize(pSup->pResultRowHashTable);
size_t keyLen = sizeof(uint64_t) * 2; // estimate the key length
int32_t totalSize =
sizeof(int32_t) + sizeof(int32_t) + size * (sizeof(int32_t) + keyLen + sizeof(int32_t) + pSup->resultRowSize);
// no result
if (getTotalBufSize(pSup->pResultBuf) == 0) {
*result = NULL;
*length = 0;
return TSDB_CODE_SUCCESS;
}
*result = (char*)taosMemoryCalloc(1, totalSize);
if (*result == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
int32_t offset = sizeof(int32_t);
*(int32_t*)(*result + offset) = size;
offset += sizeof(int32_t);
// prepare memory
SResultRowPosition* pos = &pInfo->resultRowInfo.cur;
void* pPage = getBufPage(pSup->pResultBuf, pos->pageId);
SResultRow* pRow = (SResultRow*)((char*)pPage + pos->offset);
setBufPageDirty(pPage, true);
releaseBufPage(pSup->pResultBuf, pPage);
int32_t iter = 0;
void* pIter = NULL;
while ((pIter = tSimpleHashIterate(pSup->pResultRowHashTable, pIter, &iter))) {
void* key = tSimpleHashGetKey(pIter, &keyLen);
SResultRowPosition* p1 = (SResultRowPosition*)pIter;
pPage = (SFilePage*)getBufPage(pSup->pResultBuf, p1->pageId);
pRow = (SResultRow*)((char*)pPage + p1->offset);
setBufPageDirty(pPage, true);
releaseBufPage(pSup->pResultBuf, pPage);
// recalculate the result size
int32_t realTotalSize = offset + sizeof(int32_t) + keyLen + sizeof(int32_t) + pSup->resultRowSize;
if (realTotalSize > totalSize) {
char* tmp = (char*)taosMemoryRealloc(*result, realTotalSize);
if (tmp == NULL) {
taosMemoryFree(*result);
*result = NULL;
return TSDB_CODE_OUT_OF_MEMORY;
} else {
*result = tmp;
}
}
// save key
*(int32_t*)(*result + offset) = keyLen;
offset += sizeof(int32_t);
memcpy(*result + offset, key, keyLen);
offset += keyLen;
// save value
*(int32_t*)(*result + offset) = pSup->resultRowSize;
offset += sizeof(int32_t);
memcpy(*result + offset, pRow, pSup->resultRowSize);
offset += pSup->resultRowSize;
}
*(int32_t*)(*result) = offset;
*length = offset;
return TDB_CODE_SUCCESS;
}
int32_t handleLimitOffset(SOperatorInfo* pOperator, SLimitInfo* pLimitInfo, SSDataBlock* pBlock, bool holdDataInBuf) {
if (pLimitInfo->remainGroupOffset > 0) {
if (pLimitInfo->currentGroupId == 0) { // it is the first group
pLimitInfo->currentGroupId = pBlock->info.groupId;
blockDataCleanup(pBlock);
return PROJECT_RETRIEVE_CONTINUE;
} else if (pLimitInfo->currentGroupId != pBlock->info.groupId) {
// now it is the data from a new group
pLimitInfo->remainGroupOffset -= 1;
// ignore data block in current group
if (pLimitInfo->remainGroupOffset > 0) {
blockDataCleanup(pBlock);
return PROJECT_RETRIEVE_CONTINUE;
}
}
// set current group id of the project operator
pLimitInfo->currentGroupId = pBlock->info.groupId;
}
// here check for a new group data, we need to handle the data of the previous group.
if (pLimitInfo->currentGroupId != 0 && pLimitInfo->currentGroupId != pBlock->info.groupId) {
pLimitInfo->numOfOutputGroups += 1;
if ((pLimitInfo->slimit.limit > 0) && (pLimitInfo->slimit.limit <= pLimitInfo->numOfOutputGroups)) {
pOperator->status = OP_EXEC_DONE;
blockDataCleanup(pBlock);
return PROJECT_RETRIEVE_DONE;
}
// reset the value for a new group data
pLimitInfo->numOfOutputRows = 0;
pLimitInfo->remainOffset = pLimitInfo->limit.offset;
// existing rows that belongs to previous group.
if (pBlock->info.rows > 0) {
return PROJECT_RETRIEVE_DONE;
}
}
// here we reach the start position, according to the limit/offset requirements.
// set current group id
pLimitInfo->currentGroupId = pBlock->info.groupId;
if (pLimitInfo->remainOffset >= pBlock->info.rows) {
pLimitInfo->remainOffset -= pBlock->info.rows;
blockDataCleanup(pBlock);
return PROJECT_RETRIEVE_CONTINUE;
} else if (pLimitInfo->remainOffset < pBlock->info.rows && pLimitInfo->remainOffset > 0) {
blockDataTrimFirstNRows(pBlock, pLimitInfo->remainOffset);
pLimitInfo->remainOffset = 0;
}
// check for the limitation in each group
if (pLimitInfo->limit.limit >= 0 && pLimitInfo->numOfOutputRows + pBlock->info.rows >= pLimitInfo->limit.limit) {
int32_t keepRows = (int32_t)(pLimitInfo->limit.limit - pLimitInfo->numOfOutputRows);
blockDataKeepFirstNRows(pBlock, keepRows);
if (pLimitInfo->slimit.limit > 0 && pLimitInfo->slimit.limit <= pLimitInfo->numOfOutputGroups) {
pOperator->status = OP_EXEC_DONE;
}
return PROJECT_RETRIEVE_DONE;
}
// todo optimize performance
// If there are slimit/soffset value exists, multi-round result can not be packed into one group, since the
// they may not belong to the same group the limit/offset value is not valid in this case.
if ((!holdDataInBuf) || (pBlock->info.rows >= pOperator->resultInfo.threshold) || pLimitInfo->slimit.offset != -1 ||
pLimitInfo->slimit.limit != -1) {
return PROJECT_RETRIEVE_DONE;
} else { // not full enough, continue to accumulate the output data in the buffer.
return PROJECT_RETRIEVE_CONTINUE;
}
}
static void doApplyScalarCalculation(SOperatorInfo* pOperator, SSDataBlock* pBlock, int32_t order, int32_t scanFlag);
static void doHandleRemainBlockForNewGroupImpl(SOperatorInfo* pOperator, SFillOperatorInfo* pInfo,
SResultInfo* pResultInfo, SExecTaskInfo* pTaskInfo) {
......@@ -2056,6 +1864,8 @@ void destroyExprInfo(SExprInfo* pExpr, int32_t numOfExprs) {
for (int32_t j = 0; j < pExprInfo->base.numOfParams; ++j) {
if (pExprInfo->base.pParam[j].type == FUNC_PARAM_TYPE_COLUMN) {
taosMemoryFreeClear(pExprInfo->base.pParam[j].pCol);
} else if (pExprInfo->base.pParam[j].type == FUNC_PARAM_TYPE_VALUE) {
taosVariantDestroy(&pExprInfo->base.pParam[j].param);
}
}
......
......@@ -27,6 +27,36 @@
#include "thash.h"
#include "ttypes.h"
typedef struct SGroupbyOperatorInfo {
SOptrBasicInfo binfo;
SAggSupporter aggSup;
SArray* pGroupCols; // group by columns, SArray<SColumn>
SArray* pGroupColVals; // current group column values, SArray<SGroupKeys>
bool isInit; // denote if current val is initialized or not
char* keyBuf; // group by keys for hash
int32_t groupKeyLen; // total group by column width
SGroupResInfo groupResInfo;
SExprSupp scalarSup;
} SGroupbyOperatorInfo;
// The sort in partition may be needed later.
typedef struct SPartitionOperatorInfo {
SOptrBasicInfo binfo;
SArray* pGroupCols;
SArray* pGroupColVals; // current group column values, SArray<SGroupKeys>
char* keyBuf; // group by keys for hash
int32_t groupKeyLen; // total group by column width
SHashObj* pGroupSet; // quick locate the window object for each result
SDiskbasedBuf* pBuf; // query result buffer based on blocked-wised disk file
int32_t rowCapacity; // maximum number of rows for each buffer page
int32_t* columnOffset; // start position for each column data
SArray* sortedGroupArray; // SDataGroupInfo sorted by group id
int32_t groupIndex; // group index
int32_t pageIndex; // page index of current group
SExprSupp scalarSup;
} SPartitionOperatorInfo;
static void* getCurrentDataGroupInfo(const SPartitionOperatorInfo* pInfo, SDataGroupInfo** pGroupInfo, int32_t len);
static int32_t* setupColumnOffset(const SSDataBlock* pBlock, int32_t rowCapacity);
static int32_t setGroupResultOutputBuf(SOperatorInfo* pOperator, SOptrBasicInfo* binfo, int32_t numOfCols, char* pData,
......
......@@ -24,6 +24,21 @@
#include "tmsg.h"
#include "ttypes.h"
typedef struct SJoinOperatorInfo {
SSDataBlock* pRes;
int32_t joinType;
int32_t inputOrder;
SSDataBlock* pLeft;
int32_t leftPos;
SColumnInfo leftCol;
SSDataBlock* pRight;
int32_t rightPos;
SColumnInfo rightCol;
SNode* pCondAfterMerge;
} SJoinOperatorInfo;
static void setJoinColumnInfo(SColumnInfo* pColumn, const SColumnNode* pColumnNode);
static SSDataBlock* doMergeJoin(struct SOperatorInfo* pOperator);
static void destroyMergeJoinOperator(void* param);
......
......@@ -17,6 +17,24 @@
#include "executorimpl.h"
#include "functionMgt.h"
typedef struct SProjectOperatorInfo {
SOptrBasicInfo binfo;
SAggSupporter aggSup;
SArray* pPseudoColInfo;
SLimitInfo limitInfo;
bool mergeDataBlocks;
SSDataBlock* pFinalRes;
} SProjectOperatorInfo;
typedef struct SIndefOperatorInfo {
SOptrBasicInfo binfo;
SAggSupporter aggSup;
SArray* pPseudoColInfo;
SExprSupp scalarSup;
uint64_t groupId;
SSDataBlock* pNextGroupRes;
} SIndefOperatorInfo;
static SSDataBlock* doGenerateSourceData(SOperatorInfo* pOperator);
static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator);
static SSDataBlock* doApplyIndefinitFunction(SOperatorInfo* pOperator);
......
......@@ -17,6 +17,18 @@
#include "executorimpl.h"
#include "tdatablock.h"
typedef struct SSortOperatorInfo {
SOptrBasicInfo binfo;
uint32_t sortBufSize; // max buffer size for in-memory sort
SArray* pSortInfo;
SSortHandle* pSortHandle;
SColMatchInfo matchInfo;
int32_t bufPageSize;
int64_t startTs; // sort start time
uint64_t sortElapsed; // sort elapsed time, time to flush to disk not included.
SLimitInfo limitInfo;
} SSortOperatorInfo;
static SSDataBlock* doSort(SOperatorInfo* pOperator);
static int32_t doOpenSortOperator(SOperatorInfo* pOperator);
static int32_t getExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExplain, uint32_t* len);
......@@ -176,10 +188,10 @@ int32_t doOpenSortOperator(SOperatorInfo* pOperator) {
SSortSource* ps = taosMemoryCalloc(1, sizeof(SSortSource));
ps->param = pOperator->pDownstream[0];
ps->onlyRef = true;
tsortAddSource(pInfo->pSortHandle, ps);
int32_t code = tsortOpen(pInfo->pSortHandle);
taosMemoryFreeClear(ps);
if (code != TSDB_CODE_SUCCESS) {
T_LONG_JMP(pTaskInfo->env, terrno);
......@@ -377,10 +389,10 @@ int32_t beginSortGroup(SOperatorInfo* pOperator) {
param->childOpInfo = pOperator->pDownstream[0];
param->grpSortOpInfo = pInfo;
ps->param = param;
ps->onlyRef = false;
tsortAddSource(pInfo->pCurrSortHandle, ps);
int32_t code = tsortOpen(pInfo->pCurrSortHandle);
taosMemoryFreeClear(ps);
if (code != TSDB_CODE_SUCCESS) {
T_LONG_JMP(pTaskInfo->env, terrno);
......@@ -471,6 +483,9 @@ void destroyGroupSortOperatorInfo(void* param) {
taosArrayDestroy(pInfo->pSortInfo);
taosArrayDestroy(pInfo->matchInfo.pList);
tsortDestroySortHandle(pInfo->pCurrSortHandle);
pInfo->pCurrSortHandle = NULL;
taosMemoryFreeClear(param);
}
......@@ -563,6 +578,7 @@ int32_t doOpenMultiwayMergeOperator(SOperatorInfo* pOperator) {
for (int32_t i = 0; i < pOperator->numOfDownstream; ++i) {
SSortSource* ps = taosMemoryCalloc(1, sizeof(SSortSource));
ps->param = pOperator->pDownstream[i];
ps->onlyRef = true;
tsortAddSource(pInfo->pSortHandle, ps);
}
......
此差异已折叠。
......@@ -709,6 +709,7 @@ void* destroyStreamFillSupporter(SStreamFillSupporter* pFillSup) {
pFillSup->pResMap = NULL;
releaseOutputBuf(NULL, NULL, (SResultRow*)pFillSup->cur.pRowVal);
pFillSup->cur.pRowVal = NULL;
cleanupExprSupp(&pFillSup->notFillExprSup);
taosMemoryFree(pFillSup);
return NULL;
......@@ -1417,25 +1418,13 @@ static void doApplyStreamScalarCalculation(SOperatorInfo* pOperator, SSDataBlock
blockDataEnsureCapacity(pDstBlock, pSrcBlock->info.rows);
setInputDataBlock(pSup, pSrcBlock, TSDB_ORDER_ASC, MAIN_SCAN, false);
projectApplyFunctions(pSup->pExprInfo, pDstBlock, pSrcBlock, pSup->pCtx, pSup->numOfExprs, NULL);
pDstBlock->info.groupId = pSrcBlock->info.groupId;
SColumnInfoData* pDst = taosArrayGet(pDstBlock->pDataBlock, pInfo->primaryTsCol);
SColumnInfoData* pSrc = taosArrayGet(pSrcBlock->pDataBlock, pInfo->primarySrcSlotId);
colDataAssign(pDst, pSrc, pDstBlock->info.rows, &pDstBlock->info);
int32_t numOfNotFill = pInfo->pFillSup->numOfAllCols - pInfo->pFillSup->numOfFillCols;
for (int32_t i = 0; i < numOfNotFill; ++i) {
SFillColInfo* pCol = &pInfo->pFillSup->pAllColInfo[i + pInfo->pFillSup->numOfFillCols];
ASSERT(pCol->notFillCol);
SExprInfo* pExpr = pCol->pExpr;
int32_t srcSlotId = pExpr->base.pParam[0].pCol->slotId;
int32_t dstSlotId = pExpr->base.resSchema.slotId;
pDstBlock->info.rows = 0;
pSup = &pInfo->pFillSup->notFillExprSup;
setInputDataBlock(pSup, pSrcBlock, TSDB_ORDER_ASC, MAIN_SCAN, false);
projectApplyFunctions(pSup->pExprInfo, pDstBlock, pSrcBlock, pSup->pCtx, pSup->numOfExprs, NULL);
pDstBlock->info.groupId = pSrcBlock->info.groupId;
SColumnInfoData* pDst1 = taosArrayGet(pDstBlock->pDataBlock, dstSlotId);
SColumnInfoData* pSrc1 = taosArrayGet(pSrcBlock->pDataBlock, srcSlotId);
colDataAssign(pDst1, pSrc1, pDstBlock->info.rows, &pDstBlock->info);
}
blockDataUpdateTsWindow(pDstBlock, pInfo->primaryTsCol);
}
......@@ -1577,6 +1566,14 @@ static SStreamFillSupporter* initStreamFillSup(SStreamFillPhysiNode* pPhyFillNod
destroyStreamFillSupporter(pFillSup);
return NULL;
}
SExprInfo* noFillExpr = createExprInfo(pPhyFillNode->pNotFillExprs, NULL, &numOfNotFillCols);
code = initExprSupp(&pFillSup->notFillExprSup, noFillExpr, numOfNotFillCols);
if (code != TSDB_CODE_SUCCESS) {
destroyStreamFillSupporter(pFillSup);
return NULL;
}
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
pFillSup->pResMap = tSimpleHashInit(16, hashFn);
pFillSup->hasDelete = false;
......
......@@ -1618,6 +1618,7 @@ void destroyStreamFinalIntervalOperatorInfo(void* param) {
nodesDestroyNode((SNode*)pInfo->pPhyNode);
colDataDestroy(&pInfo->twAggSup.timeWindowData);
cleanupGroupResInfo(&pInfo->groupResInfo);
cleanupExprSupp(&pInfo->scalarSupp);
taosMemoryFreeClear(param);
}
......@@ -2703,6 +2704,7 @@ static void rebuildIntervalWindow(SOperatorInfo* pOperator, SArray* pWinArray, S
pChildSup->rowEntryInfoOffset, &pChInfo->aggSup);
updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &parentWin, true);
compactFunctions(pSup->pCtx, pChildSup->pCtx, numOfOutput, pTaskInfo, &pInfo->twAggSup.timeWindowData);
releaseOutputBuf(pChInfo->pState, pWinRes, pChResult);
}
if (num > 0 && pUpdatedMap) {
saveWinResultInfo(pCurResult->win.skey, pWinRes->groupId, pUpdatedMap);
......@@ -5362,15 +5364,6 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys
pInfo->primaryTsIndex = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->slotId;
initResultSizeInfo(&pOperator->resultInfo, 4096);
if (pIntervalPhyNode->window.pExprs != NULL) {
int32_t numOfScalar = 0;
SExprInfo* pScalarExprInfo = createExprInfo(pIntervalPhyNode->window.pExprs, NULL, &numOfScalar);
code = initExprSupp(&pInfo->scalarSupp, pScalarExprInfo, numOfScalar);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
}
size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
code = initAggInfo(pSup, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str);
if (code != TSDB_CODE_SUCCESS) {
......
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册