提交 124b52cd 编写于 作者: P plum-lihui

Merge branch '3.0' of github.com:taosdata/TDengine into 3.0

...@@ -574,11 +574,11 @@ typedef struct SMultiwayMergeInfo { ...@@ -574,11 +574,11 @@ typedef struct SMultiwayMergeInfo {
} SMultiwayMergeInfo; } SMultiwayMergeInfo;
// todo support the disk-based sort // todo support the disk-based sort
typedef struct SOrderOperatorInfo { typedef struct SSortOperatorInfo {
int32_t colIndex; int32_t colIndex;
int32_t order; int32_t order;
SSDataBlock *pDataBlock; SSDataBlock *pDataBlock;
} SOrderOperatorInfo; } SSortOperatorInfo;
void appendUpstream(SOperatorInfo* p, SOperatorInfo* pUpstream); void appendUpstream(SOperatorInfo* p, SOperatorInfo* pUpstream);
...@@ -609,7 +609,7 @@ SOperatorInfo* createFilterOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperator ...@@ -609,7 +609,7 @@ SOperatorInfo* createFilterOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperator
int32_t numOfOutput, SColumnInfo* pCols, int32_t numOfFilter); int32_t numOfOutput, SColumnInfo* pCols, int32_t numOfFilter);
SOperatorInfo* createJoinOperatorInfo(SOperatorInfo** pUpstream, int32_t numOfUpstream, SSchema* pSchema, int32_t numOfOutput); SOperatorInfo* createJoinOperatorInfo(SOperatorInfo** pUpstream, int32_t numOfUpstream, SSchema* pSchema, int32_t numOfOutput);
SOperatorInfo* createOrderOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput, SOrderVal* pOrderVal); SOperatorInfo* createSortOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput, SOrderVal* pOrderVal);
SSDataBlock* doGlobalAggregate(void* param, bool* newgroup); SSDataBlock* doGlobalAggregate(void* param, bool* newgroup);
SSDataBlock* doMultiwayMergeSort(void* param, bool* newgroup); SSDataBlock* doMultiwayMergeSort(void* param, bool* newgroup);
......
...@@ -2301,7 +2301,7 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf ...@@ -2301,7 +2301,7 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf
} }
case OP_Order: { case OP_Order: {
pRuntimeEnv->proot = createOrderOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput, &pQueryAttr->order); pRuntimeEnv->proot = createSortOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput, &pQueryAttr->order);
break; break;
} }
...@@ -5516,7 +5516,7 @@ static SSDataBlock* doSort(void* param, bool* newgroup) { ...@@ -5516,7 +5516,7 @@ static SSDataBlock* doSort(void* param, bool* newgroup) {
return NULL; return NULL;
} }
SOrderOperatorInfo* pInfo = pOperator->info; SSortOperatorInfo* pInfo = pOperator->info;
SSDataBlock* pBlock = NULL; SSDataBlock* pBlock = NULL;
while(1) { while(1) {
...@@ -5556,8 +5556,8 @@ static SSDataBlock* doSort(void* param, bool* newgroup) { ...@@ -5556,8 +5556,8 @@ static SSDataBlock* doSort(void* param, bool* newgroup) {
return (pInfo->pDataBlock->info.rows > 0)? pInfo->pDataBlock:NULL; return (pInfo->pDataBlock->info.rows > 0)? pInfo->pDataBlock:NULL;
} }
SOperatorInfo *createOrderOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput, SOrderVal* pOrderVal) { SOperatorInfo *createSortOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput, SOrderVal* pOrderVal) {
SOrderOperatorInfo* pInfo = calloc(1, sizeof(SOrderOperatorInfo)); SSortOperatorInfo* pInfo = calloc(1, sizeof(SSortOperatorInfo));
{ {
SSDataBlock* pDataBlock = calloc(1, sizeof(SSDataBlock)); SSDataBlock* pDataBlock = calloc(1, sizeof(SSDataBlock));
...@@ -6611,7 +6611,7 @@ static void destroyTagScanOperatorInfo(void* param, int32_t numOfOutput) { ...@@ -6611,7 +6611,7 @@ static void destroyTagScanOperatorInfo(void* param, int32_t numOfOutput) {
} }
static void destroyOrderOperatorInfo(void* param, int32_t numOfOutput) { static void destroyOrderOperatorInfo(void* param, int32_t numOfOutput) {
SOrderOperatorInfo* pInfo = (SOrderOperatorInfo*) param; SSortOperatorInfo* pInfo = (SSortOperatorInfo*) param;
pInfo->pDataBlock = blockDataDestroy(pInfo->pDataBlock); pInfo->pDataBlock = blockDataDestroy(pInfo->pDataBlock);
} }
......
...@@ -29,8 +29,9 @@ typedef struct SCorEpSet { ...@@ -29,8 +29,9 @@ typedef struct SCorEpSet {
} SCorEpSet; } SCorEpSet;
typedef struct SBlockOrderInfo { typedef struct SBlockOrderInfo {
bool nullFirst;
int32_t order; int32_t order;
int32_t colIndex; int32_t slotId;
SColumnInfoData* pColData; SColumnInfoData* pColData;
} SBlockOrderInfo; } SBlockOrderInfo;
...@@ -176,7 +177,7 @@ size_t blockDataGetSerialMetaSize(const SSDataBlock* pBlock); ...@@ -176,7 +177,7 @@ size_t blockDataGetSerialMetaSize(const SSDataBlock* pBlock);
SSchema* blockDataExtractSchema(const SSDataBlock* pBlock, int32_t* numOfCols); SSchema* blockDataExtractSchema(const SSDataBlock* pBlock, int32_t* numOfCols);
int32_t blockDataSort(SSDataBlock* pDataBlock, SArray* pOrderInfo, bool nullFirst); int32_t blockDataSort(SSDataBlock* pDataBlock, SArray* pOrderInfo);
int32_t blockDataSort_rv(SSDataBlock* pDataBlock, SArray* pOrderInfo, bool nullFirst); int32_t blockDataSort_rv(SSDataBlock* pDataBlock, SArray* pOrderInfo, bool nullFirst);
int32_t blockDataEnsureColumnCapacity(SColumnInfoData* pColumn, uint32_t numOfRows); int32_t blockDataEnsureColumnCapacity(SColumnInfoData* pColumn, uint32_t numOfRows);
......
...@@ -54,6 +54,16 @@ qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, void* streamReadHandle); ...@@ -54,6 +54,16 @@ qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, void* streamReadHandle);
*/ */
int32_t qSetStreamInput(qTaskInfo_t tinfo, const void* input, int32_t type); int32_t qSetStreamInput(qTaskInfo_t tinfo, const void* input, int32_t type);
/**
* Set multiple input data blocks for the stream scan.
* @param tinfo
* @param pBlocks
* @param numOfInputBlock
* @param type
* @return
*/
int32_t qSetMultiStreamInput(qTaskInfo_t tinfo, void** pBlocks, size_t numOfBlocks, int32_t type);
/** /**
* Update the table id list, add or remove. * Update the table id list, add or remove.
* *
...@@ -86,16 +96,6 @@ int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId, ...@@ -86,16 +96,6 @@ int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId,
*/ */
int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t* useconds); int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t* useconds);
/**
* Retrieve the produced results information, if current query is not paused or completed,
* this function will be blocked to wait for the query execution completed or paused,
* in which case enough results have been produced already.
*
* @param tinfo
* @return
*/
int32_t qRetrieveQueryResultInfo(qTaskInfo_t tinfo, bool* buildRes, void* pRspContext);
/** /**
* kill the ongoing query and free the query handle and corresponding resources automatically * kill the ongoing query and free the query handle and corresponding resources automatically
* @param tinfo qhandle * @param tinfo qhandle
...@@ -158,50 +158,6 @@ int32_t qGetQualifiedTableIdList(void* pTableList, const char* tagCond, int32_t ...@@ -158,50 +158,6 @@ int32_t qGetQualifiedTableIdList(void* pTableList, const char* tagCond, int32_t
*/ */
int32_t qUpdateQueriedTableIdList(qTaskInfo_t tinfo, int64_t uid, int32_t type); int32_t qUpdateQueriedTableIdList(qTaskInfo_t tinfo, int64_t uid, int32_t type);
//================================================================================================
// query handle management
/**
* Query handle mgmt object
* @param vgId
* @return
*/
void* qOpenTaskMgmt(int32_t vgId);
/**
* broadcast the close information and wait for all query stop.
* @param pExecutor
*/
void qTaskMgmtNotifyClosing(void* pExecutor);
/**
* Re-open the query handle management module when opening the vnode again.
* @param pExecutor
*/
void qQueryMgmtReOpen(void* pExecutor);
/**
* Close query mgmt and clean up resources.
* @param pExecutor
*/
void qCleanupTaskMgmt(void* pExecutor);
/**
* Add the query into the query mgmt object
* @param pMgmt
* @param qId
* @param qInfo
* @return
*/
void** qRegisterTask(void* pMgmt, uint64_t qId, void* qInfo);
/**
* acquire the query handle according to the key from query mgmt object.
* @param pMgmt
* @param key
* @return
*/
void** qAcquireTask(void* pMgmt, uint64_t key);
/** /**
* release the query handle and decrease the reference count in cache * release the query handle and decrease the reference count in cache
* @param pMgmt * @param pMgmt
...@@ -211,13 +167,6 @@ void** qAcquireTask(void* pMgmt, uint64_t key); ...@@ -211,13 +167,6 @@ void** qAcquireTask(void* pMgmt, uint64_t key);
*/ */
void** qReleaseTask(void* pMgmt, void* pQInfo, bool freeHandle); void** qReleaseTask(void* pMgmt, void* pQInfo, bool freeHandle);
/**
* De-register the query handle from the management module and free it immediately.
* @param pMgmt
* @param pQInfo
* @return
*/
void** qDeregisterQInfo(void* pMgmt, void* pQInfo);
void qProcessFetchRsp(void* parent, struct SRpcMsg* pMsg, struct SEpSet* pEpSet); void qProcessFetchRsp(void* parent, struct SRpcMsg* pMsg, struct SEpSet* pEpSet);
......
...@@ -647,7 +647,6 @@ double blockDataGetSerialRowSize(const SSDataBlock* pBlock) { ...@@ -647,7 +647,6 @@ double blockDataGetSerialRowSize(const SSDataBlock* pBlock) {
typedef struct SSDataBlockSortHelper { typedef struct SSDataBlockSortHelper {
SArray* orderInfo; // SArray<SBlockOrderInfo> SArray* orderInfo; // SArray<SBlockOrderInfo>
SSDataBlock* pDataBlock; SSDataBlock* pDataBlock;
bool nullFirst;
} SSDataBlockSortHelper; } SSDataBlockSortHelper;
int32_t dataBlockCompar(const void* p1, const void* p2, const void* param) { int32_t dataBlockCompar(const void* p1, const void* p2, const void* param) {
...@@ -672,11 +671,11 @@ int32_t dataBlockCompar(const void* p1, const void* p2, const void* param) { ...@@ -672,11 +671,11 @@ int32_t dataBlockCompar(const void* p1, const void* p2, const void* param) {
} }
if (rightNull) { if (rightNull) {
return pHelper->nullFirst ? 1 : -1; return pOrder->nullFirst ? 1 : -1;
} }
if (leftNull) { if (leftNull) {
return pHelper->nullFirst ? -1 : 1; return pOrder->nullFirst ? -1 : 1;
} }
} }
...@@ -907,7 +906,7 @@ static __compar_fn_t getComparFn(int32_t type, int32_t order) { ...@@ -907,7 +906,7 @@ static __compar_fn_t getComparFn(int32_t type, int32_t order) {
} }
} }
int32_t blockDataSort(SSDataBlock* pDataBlock, SArray* pOrderInfo, bool nullFirst) { int32_t blockDataSort(SSDataBlock* pDataBlock, SArray* pOrderInfo) {
ASSERT(pDataBlock != NULL && pOrderInfo != NULL); ASSERT(pDataBlock != NULL && pOrderInfo != NULL);
if (pDataBlock->info.rows <= 1) { if (pDataBlock->info.rows <= 1) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
...@@ -922,7 +921,7 @@ int32_t blockDataSort(SSDataBlock* pDataBlock, SArray* pOrderInfo, bool nullFirs ...@@ -922,7 +921,7 @@ int32_t blockDataSort(SSDataBlock* pDataBlock, SArray* pOrderInfo, bool nullFirs
for (int32_t i = 0; i < taosArrayGetSize(pOrderInfo); ++i) { for (int32_t i = 0; i < taosArrayGetSize(pOrderInfo); ++i) {
SBlockOrderInfo* pInfo = taosArrayGet(pOrderInfo, i); SBlockOrderInfo* pInfo = taosArrayGet(pOrderInfo, i);
SColumnInfoData* pColInfoData = taosArrayGet(pDataBlock->pDataBlock, pInfo->colIndex); SColumnInfoData* pColInfoData = taosArrayGet(pDataBlock->pDataBlock, pInfo->slotId);
if (pColInfoData->hasNull) { if (pColInfoData->hasNull) {
sortColumnHasNull = true; sortColumnHasNull = true;
} }
...@@ -961,10 +960,10 @@ int32_t blockDataSort(SSDataBlock* pDataBlock, SArray* pOrderInfo, bool nullFirs ...@@ -961,10 +960,10 @@ int32_t blockDataSort(SSDataBlock* pDataBlock, SArray* pOrderInfo, bool nullFirs
int64_t p0 = taosGetTimestampUs(); int64_t p0 = taosGetTimestampUs();
SSDataBlockSortHelper helper = {.nullFirst = nullFirst, .pDataBlock = pDataBlock, .orderInfo = pOrderInfo}; SSDataBlockSortHelper helper = {.pDataBlock = pDataBlock, .orderInfo = pOrderInfo};
for (int32_t i = 0; i < taosArrayGetSize(helper.orderInfo); ++i) { for (int32_t i = 0; i < taosArrayGetSize(helper.orderInfo); ++i) {
struct SBlockOrderInfo* pInfo = taosArrayGet(helper.orderInfo, i); struct SBlockOrderInfo* pInfo = taosArrayGet(helper.orderInfo, i);
pInfo->pColData = taosArrayGet(pDataBlock->pDataBlock, pInfo->colIndex); pInfo->pColData = taosArrayGet(pDataBlock->pDataBlock, pInfo->slotId);
} }
taosqsort(index, rows, sizeof(int32_t), &helper, dataBlockCompar); taosqsort(index, rows, sizeof(int32_t), &helper, dataBlockCompar);
...@@ -1012,7 +1011,7 @@ SHelper* createTupleIndex_rv(int32_t numOfRows, SArray* pOrderInfo, SSDataBlock* ...@@ -1012,7 +1011,7 @@ SHelper* createTupleIndex_rv(int32_t numOfRows, SArray* pOrderInfo, SSDataBlock*
for (int32_t i = 0; i < numOfCols; ++i) { for (int32_t i = 0; i < numOfCols; ++i) {
SBlockOrderInfo* pInfo = taosArrayGet(pOrderInfo, i); SBlockOrderInfo* pInfo = taosArrayGet(pOrderInfo, i);
SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, pInfo->colIndex); SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, pInfo->slotId);
pInfo->pColData = pColInfo; pInfo->pColData = pColInfo;
sortValLengthPerRow += pColInfo->info.bytes; sortValLengthPerRow += pColInfo->info.bytes;
} }
...@@ -1106,7 +1105,7 @@ int32_t blockDataSort_rv(SSDataBlock* pDataBlock, SArray* pOrderInfo, bool nullF ...@@ -1106,7 +1105,7 @@ int32_t blockDataSort_rv(SSDataBlock* pDataBlock, SArray* pOrderInfo, bool nullF
// Allocate the additional buffer. // Allocate the additional buffer.
int64_t p0 = taosGetTimestampUs(); int64_t p0 = taosGetTimestampUs();
SSDataBlockSortHelper helper = {.nullFirst = nullFirst, .pDataBlock = pDataBlock, .orderInfo = pOrderInfo}; SSDataBlockSortHelper helper = {.pDataBlock = pDataBlock, .orderInfo = pOrderInfo};
uint32_t rows = pDataBlock->info.rows; uint32_t rows = pDataBlock->info.rows;
SHelper* index = createTupleIndex_rv(rows, helper.orderInfo, pDataBlock); SHelper* index = createTupleIndex_rv(rows, helper.orderInfo, pDataBlock);
......
...@@ -167,10 +167,10 @@ TEST(testCase, Datablock_test) { ...@@ -167,10 +167,10 @@ TEST(testCase, Datablock_test) {
printf("the second row of binary:%s, length:%d\n", (char*)varDataVal(pData), varDataLen(pData)); printf("the second row of binary:%s, length:%d\n", (char*)varDataVal(pData), varDataLen(pData));
SArray* pOrderInfo = taosArrayInit(3, sizeof(SBlockOrderInfo)); SArray* pOrderInfo = taosArrayInit(3, sizeof(SBlockOrderInfo));
SBlockOrderInfo order = {.order = TSDB_ORDER_ASC, .colIndex = 0}; SBlockOrderInfo order = {.nullFirst = true, .order = TSDB_ORDER_ASC, .slotId = 0};
taosArrayPush(pOrderInfo, &order); taosArrayPush(pOrderInfo, &order);
blockDataSort(b, pOrderInfo, true); blockDataSort(b, pOrderInfo);
blockDataDestroy(b); blockDataDestroy(b);
taosArrayDestroy(pOrderInfo); taosArrayDestroy(pOrderInfo);
......
...@@ -430,9 +430,10 @@ typedef struct STagScanInfo { ...@@ -430,9 +430,10 @@ typedef struct STagScanInfo {
} STagScanInfo; } STagScanInfo;
typedef struct SStreamBlockScanInfo { typedef struct SStreamBlockScanInfo {
SArray* pBlockLists; // multiple SSDatablock.
SSDataBlock* pRes; // result SSDataBlock SSDataBlock* pRes; // result SSDataBlock
int32_t blockType; // current block type int32_t blockType; // current block type
bool blockValid; // Is current data has returned? int32_t validBlockIndex; // Is current data has returned?
SColumnInfo* pCols; // the output column info SColumnInfo* pCols; // the output column info
uint64_t numOfRows; // total scanned rows uint64_t numOfRows; // total scanned rows
uint64_t numOfExec; // execution times uint64_t numOfExec; // execution times
...@@ -572,11 +573,13 @@ typedef struct SGroupbyOperatorInfo { ...@@ -572,11 +573,13 @@ typedef struct SGroupbyOperatorInfo {
typedef struct SSessionAggOperatorInfo { typedef struct SSessionAggOperatorInfo {
SOptrBasicInfo binfo; SOptrBasicInfo binfo;
SAggSupporter aggSup; SAggSupporter aggSup;
SGroupResInfo groupResInfo;
STimeWindow curWindow; // current time window STimeWindow curWindow; // current time window
TSKEY prevTs; // previous timestamp TSKEY prevTs; // previous timestamp
int32_t numOfRows; // number of rows int32_t numOfRows; // number of rows
int32_t start; // start row index int32_t start; // start row index
bool reptScan; // next round scan bool reptScan; // next round scan
int64_t gap; // session window gap
} SSessionAggOperatorInfo; } SSessionAggOperatorInfo;
typedef struct SStateWindowOperatorInfo { typedef struct SStateWindowOperatorInfo {
...@@ -593,8 +596,7 @@ typedef struct SSortedMergeOperatorInfo { ...@@ -593,8 +596,7 @@ typedef struct SSortedMergeOperatorInfo {
SOptrBasicInfo binfo; SOptrBasicInfo binfo;
bool hasVarCol; bool hasVarCol;
SArray *orderInfo; // SArray<SBlockOrderInfo> SArray* pSortInfo;
bool nullFirst;
int32_t numOfSources; int32_t numOfSources;
SSortHandle *pSortHandle; SSortHandle *pSortHandle;
...@@ -613,12 +615,10 @@ typedef struct SSortedMergeOperatorInfo { ...@@ -613,12 +615,10 @@ typedef struct SSortedMergeOperatorInfo {
SAggSupporter aggSup; SAggSupporter aggSup;
} SSortedMergeOperatorInfo; } SSortedMergeOperatorInfo;
typedef struct SOrderOperatorInfo { typedef struct SSortOperatorInfo {
uint32_t sortBufSize; // max buffer size for in-memory sort uint32_t sortBufSize; // max buffer size for in-memory sort
SSDataBlock *pDataBlock; SSDataBlock *pDataBlock;
bool hasVarCol; // has variable length column, such as binary/varchar/nchar SArray* pSortInfo;
SArray *orderInfo;
bool nullFirst;
SSortHandle *pSortHandle; SSortHandle *pSortHandle;
int32_t bufPageSize; int32_t bufPageSize;
int32_t numOfRowsInRes; int32_t numOfRowsInRes;
...@@ -629,7 +629,7 @@ typedef struct SOrderOperatorInfo { ...@@ -629,7 +629,7 @@ typedef struct SOrderOperatorInfo {
uint64_t totalSize; // total load bytes from remote uint64_t totalSize; // total load bytes from remote
uint64_t totalRows; // total number of rows uint64_t totalRows; // total number of rows
uint64_t totalElapsed; // total elapsed time uint64_t totalElapsed; // total elapsed time
} SOrderOperatorInfo; } SSortOperatorInfo;
typedef struct SDistinctDataInfo { typedef struct SDistinctDataInfo {
int32_t index; int32_t index;
...@@ -655,15 +655,15 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo* ...@@ -655,15 +655,15 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo*
SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo); SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo);
SOperatorInfo* createMultiTableAggOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo); SOperatorInfo* createMultiTableAggOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo);
SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t num, SSDataBlock* pResBlock, SExecTaskInfo* pTaskInfo); SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t num, SSDataBlock* pResBlock, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createOrderOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SArray* pOrderVal, SExecTaskInfo* pTaskInfo); SOperatorInfo *createSortOperatorInfo(SOperatorInfo* downstream, SSDataBlock* pResBlock, SArray* pSortInfo, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createSortedMergeOperatorInfo(SOperatorInfo** downstream, int32_t numOfDownstream, SExprInfo* pExprInfo, int32_t num, SArray* pOrderVal, SArray* pGroupInfo, SExecTaskInfo* pTaskInfo); SOperatorInfo* createSortedMergeOperatorInfo(SOperatorInfo** downstream, int32_t numOfDownstream, SExprInfo* pExprInfo, int32_t num, SArray* pSortInfo, SArray* pGroupInfo, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createSysTableScanOperatorInfo(void* pSysTableReadHandle, SSDataBlock* pResBlock, const SName* pName, SOperatorInfo* createSysTableScanOperatorInfo(void* pSysTableReadHandle, SSDataBlock* pResBlock, const SName* pName,
SNode* pCondition, SEpSet epset, SArray* colList, SExecTaskInfo* pTaskInfo, bool showRewrite, int32_t accountId); SNode* pCondition, SEpSet epset, SArray* colList, SExecTaskInfo* pTaskInfo, bool showRewrite, int32_t accountId);
SOperatorInfo* createLimitOperatorInfo(SOperatorInfo* downstream, SLimit* pLimit, SExecTaskInfo* pTaskInfo); SOperatorInfo* createLimitOperatorInfo(SOperatorInfo* downstream, SLimit* pLimit, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, SInterval* pInterval, SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, SInterval* pInterval,
const STableGroupInfo* pTableGroupInfo, SExecTaskInfo* pTaskInfo); const STableGroupInfo* pTableGroupInfo, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, SExecTaskInfo* pTaskInfo); SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, int64_t gap, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock, SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock,
SArray* pGroupColList, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo); SArray* pGroupColList, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo);
SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfCols, SInterval* pInterval, SSDataBlock* pResBlock, SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfCols, SInterval* pInterval, SSDataBlock* pResBlock,
......
...@@ -63,7 +63,7 @@ typedef int32_t (*_sort_merge_compar_fn_t)(const void* p1, const void* p2, void* ...@@ -63,7 +63,7 @@ typedef int32_t (*_sort_merge_compar_fn_t)(const void* p1, const void* p2, void*
* @param type * @param type
* @return * @return
*/ */
SSortHandle* tsortCreateSortHandle(SArray* pOrderInfo, bool nullFirst, int32_t type, int32_t pageSize, int32_t numOfPages, SSchema* pSchema, int32_t numOfCols, const char* idstr); SSortHandle* tsortCreateSortHandle(SArray* pOrderInfo, int32_t type, int32_t pageSize, int32_t numOfPages, SSDataBlock* pBlock, const char* idstr);
/** /**
* *
......
...@@ -19,8 +19,6 @@ ...@@ -19,8 +19,6 @@
#include "executil.h" #include "executil.h"
#include "executorimpl.h" #include "executorimpl.h"
//#include "queryLog.h"
#include "tbuffer.h"
#include "tcompression.h" #include "tcompression.h"
#include "tlosertree.h" #include "tlosertree.h"
......
...@@ -14,11 +14,12 @@ ...@@ -14,11 +14,12 @@
*/ */
#include "executor.h" #include "executor.h"
#include "tdatablock.h"
#include "executorimpl.h" #include "executorimpl.h"
#include "planner.h" #include "planner.h"
#include "vnode.h" #include "vnode.h"
static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, int32_t type, char* id) { static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void** input, size_t numOfBlocks, int32_t type, char* id) {
ASSERT(pOperator != NULL); ASSERT(pOperator != NULL);
if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) { if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
if (pOperator->numOfDownstream == 0) { if (pOperator->numOfDownstream == 0) {
...@@ -31,7 +32,7 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, int32_t t ...@@ -31,7 +32,7 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, int32_t t
return TSDB_CODE_QRY_APP_ERROR; return TSDB_CODE_QRY_APP_ERROR;
} }
pOperator->status = OP_NOT_OPENED; pOperator->status = OP_NOT_OPENED;
return doSetStreamBlock(pOperator->pDownstream[0], input, type, id); return doSetStreamBlock(pOperator->pDownstream[0], input, numOfBlocks, type, id);
} else { } else {
SStreamBlockScanInfo* pInfo = pOperator->info; SStreamBlockScanInfo* pInfo = pOperator->info;
...@@ -43,20 +44,20 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, int32_t t ...@@ -43,20 +44,20 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, int32_t t
} }
if (type == STREAM_DATA_TYPE_SUBMIT_BLOCK) { if (type == STREAM_DATA_TYPE_SUBMIT_BLOCK) {
if (tqReadHandleSetMsg(pInfo->readerHandle, input, 0) < 0) { if (tqReadHandleSetMsg(pInfo->readerHandle, input[0], 0) < 0) {
qError("submit msg messed up when initing stream block, %s" PRIx64, id); qError("submit msg messed up when initing stream block, %s" PRIx64, id);
return TSDB_CODE_QRY_APP_ERROR; return TSDB_CODE_QRY_APP_ERROR;
} }
} else { } else {
ASSERT(!pInfo->blockValid); for (int32_t i = 0; i < numOfBlocks; ++i) {
SSDataBlock* pDataBlock = input[i];
SSDataBlock* pDataBlock = input; SSDataBlock* p = createOneDataBlock(pDataBlock);
pInfo->pRes->info = pDataBlock->info; p->info = pDataBlock->info;
taosArrayClear(pInfo->pRes->pDataBlock);
taosArrayAddAll(pInfo->pRes->pDataBlock, pDataBlock->pDataBlock);
// set current block valid. taosArrayAddAll(p->pDataBlock, pDataBlock->pDataBlock);
pInfo->blockValid = true; taosArrayPush(pInfo->pBlockLists, &p);
}
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
...@@ -64,17 +65,21 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, int32_t t ...@@ -64,17 +65,21 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, int32_t t
} }
int32_t qSetStreamInput(qTaskInfo_t tinfo, const void* input, int32_t type) { int32_t qSetStreamInput(qTaskInfo_t tinfo, const void* input, int32_t type) {
qSetMultiStreamInput(tinfo, (void**) &input, 1, type);
}
int32_t qSetMultiStreamInput(qTaskInfo_t tinfo, void** pBlocks, size_t numOfBlocks, int32_t type) {
if (tinfo == NULL) { if (tinfo == NULL) {
return TSDB_CODE_QRY_APP_ERROR; return TSDB_CODE_QRY_APP_ERROR;
} }
if (input == NULL) { if (pBlocks == NULL || numOfBlocks == 0) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
int32_t code = doSetStreamBlock(pTaskInfo->pRoot, (void*)input, type, GET_TASKID(pTaskInfo)); int32_t code = doSetStreamBlock(pTaskInfo->pRoot, (void**)pBlocks, numOfBlocks, type, GET_TASKID(pTaskInfo));
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
qError("%s failed to set the stream block data", GET_TASKID(pTaskInfo)); qError("%s failed to set the stream block data", GET_TASKID(pTaskInfo));
} else { } else {
......
...@@ -23,20 +23,19 @@ ...@@ -23,20 +23,19 @@
#include "tsort.h" #include "tsort.h"
#include "tutil.h" #include "tutil.h"
typedef struct STupleHandle { struct STupleHandle {
SSDataBlock* pBlock; SSDataBlock* pBlock;
int32_t rowIndex; int32_t rowIndex;
} STupleHandle; };
typedef struct SSortHandle { struct SSortHandle {
int32_t type; int32_t type;
int32_t pageSize; int32_t pageSize;
int32_t numOfPages; int32_t numOfPages;
SDiskbasedBuf *pBuf; SDiskbasedBuf *pBuf;
SArray *pOrderInfo; SArray *pSortInfo;
bool nullFirst;
SArray *pOrderedSource; SArray *pOrderedSource;
_sort_fetch_block_fn_t fetchfp; _sort_fetch_block_fn_t fetchfp;
...@@ -60,7 +59,7 @@ typedef struct SSortHandle { ...@@ -60,7 +59,7 @@ typedef struct SSortHandle {
bool inMemSort; bool inMemSort;
bool needAdjust; bool needAdjust;
STupleHandle tupleHandle; STupleHandle tupleHandle;
} SSortHandle; };
static int32_t msortComparFn(const void *pLeft, const void *pRight, void *param); static int32_t msortComparFn(const void *pLeft, const void *pRight, void *param);
...@@ -90,18 +89,18 @@ static SSDataBlock* createDataBlock_rv(SSchema* pSchema, int32_t numOfCols) { ...@@ -90,18 +89,18 @@ static SSDataBlock* createDataBlock_rv(SSchema* pSchema, int32_t numOfCols) {
* @param type * @param type
* @return * @return
*/ */
SSortHandle* tsortCreateSortHandle(SArray* pOrderInfo, bool nullFirst, int32_t type, int32_t pageSize, int32_t numOfPages, SSchema* pSchema, int32_t numOfCols, const char* idstr) { SSortHandle* tsortCreateSortHandle(SArray* pSortInfo, int32_t type, int32_t pageSize, int32_t numOfPages, SSDataBlock* pBlock, const char* idstr) {
SSortHandle* pSortHandle = taosMemoryCalloc(1, sizeof(SSortHandle)); SSortHandle* pSortHandle = taosMemoryCalloc(1, sizeof(SSortHandle));
pSortHandle->type = type; pSortHandle->type = type;
pSortHandle->pageSize = pageSize; pSortHandle->pageSize = pageSize;
pSortHandle->numOfPages = numOfPages; pSortHandle->numOfPages = numOfPages;
pSortHandle->pOrderedSource = taosArrayInit(4, POINTER_BYTES); pSortHandle->pSortInfo = pSortInfo;
pSortHandle->pOrderInfo = pOrderInfo; pSortHandle->pDataBlock = createOneDataBlock(pBlock);
pSortHandle->nullFirst = nullFirst;
pSortHandle->cmpParam.orderInfo = pOrderInfo; pSortHandle->pOrderedSource = taosArrayInit(4, POINTER_BYTES);
pSortHandle->cmpParam.orderInfo = pSortInfo;
pSortHandle->pDataBlock = createDataBlock_rv(pSchema, numOfCols);
tsortSetComparFp(pSortHandle, msortComparFn); tsortSetComparFp(pSortHandle, msortComparFn);
if (idstr != NULL) { if (idstr != NULL) {
...@@ -364,14 +363,14 @@ int32_t msortComparFn(const void *pLeft, const void *pRight, void *param) { ...@@ -364,14 +363,14 @@ int32_t msortComparFn(const void *pLeft, const void *pRight, void *param) {
for(int32_t i = 0; i < pInfo->size; ++i) { for(int32_t i = 0; i < pInfo->size; ++i) {
SBlockOrderInfo* pOrder = TARRAY_GET_ELEM(pInfo, i); SBlockOrderInfo* pOrder = TARRAY_GET_ELEM(pInfo, i);
SColumnInfoData* pLeftColInfoData = TARRAY_GET_ELEM(pLeftBlock->pDataBlock, pOrder->colIndex); SColumnInfoData* pLeftColInfoData = TARRAY_GET_ELEM(pLeftBlock->pDataBlock, pOrder->slotId);
bool leftNull = false; bool leftNull = false;
if (pLeftColInfoData->hasNull) { if (pLeftColInfoData->hasNull) {
leftNull = colDataIsNull(pLeftColInfoData, pLeftBlock->info.rows, pLeftSource->src.rowIndex, pLeftBlock->pBlockAgg); leftNull = colDataIsNull(pLeftColInfoData, pLeftBlock->info.rows, pLeftSource->src.rowIndex, pLeftBlock->pBlockAgg);
} }
SColumnInfoData* pRightColInfoData = TARRAY_GET_ELEM(pRightBlock->pDataBlock, pOrder->colIndex); SColumnInfoData* pRightColInfoData = TARRAY_GET_ELEM(pRightBlock->pDataBlock, pOrder->slotId);
bool rightNull = false; bool rightNull = false;
if (pRightColInfoData->hasNull) { if (pRightColInfoData->hasNull) {
rightNull = colDataIsNull(pRightColInfoData, pRightBlock->info.rows, pRightSource->src.rowIndex, pRightBlock->pBlockAgg); rightNull = colDataIsNull(pRightColInfoData, pRightBlock->info.rows, pRightSource->src.rowIndex, pRightBlock->pBlockAgg);
...@@ -415,6 +414,9 @@ int32_t msortComparFn(const void *pLeft, const void *pRight, void *param) { ...@@ -415,6 +414,9 @@ int32_t msortComparFn(const void *pLeft, const void *pRight, void *param) {
static int32_t doInternalMergeSort(SSortHandle* pHandle) { static int32_t doInternalMergeSort(SSortHandle* pHandle) {
size_t numOfSources = taosArrayGetSize(pHandle->pOrderedSource); size_t numOfSources = taosArrayGetSize(pHandle->pOrderedSource);
if (numOfSources == 0) {
return 0;
}
// Calculate the I/O counts to complete the data sort. // Calculate the I/O counts to complete the data sort.
double sortPass = floorl(log2(numOfSources) / log2(pHandle->numOfPages)); double sortPass = floorl(log2(numOfSources) / log2(pHandle->numOfPages));
...@@ -542,7 +544,7 @@ static int32_t createInitialSortedMultiSources(SSortHandle* pHandle) { ...@@ -542,7 +544,7 @@ static int32_t createInitialSortedMultiSources(SSortHandle* pHandle) {
if (size > sortBufSize) { if (size > sortBufSize) {
// Perform the in-memory sort and then flush data in the buffer into disk. // Perform the in-memory sort and then flush data in the buffer into disk.
int64_t p = taosGetTimestampUs(); int64_t p = taosGetTimestampUs();
blockDataSort(pHandle->pDataBlock, pHandle->pOrderInfo, pHandle->nullFirst); blockDataSort(pHandle->pDataBlock, pHandle->pSortInfo);
int64_t el = taosGetTimestampUs() - p; int64_t el = taosGetTimestampUs() - p;
pHandle->sortElapsed += el; pHandle->sortElapsed += el;
...@@ -555,7 +557,7 @@ static int32_t createInitialSortedMultiSources(SSortHandle* pHandle) { ...@@ -555,7 +557,7 @@ static int32_t createInitialSortedMultiSources(SSortHandle* pHandle) {
size_t size = blockDataGetSize(pHandle->pDataBlock); size_t size = blockDataGetSize(pHandle->pDataBlock);
// Perform the in-memory sort and then flush data in the buffer into disk. // Perform the in-memory sort and then flush data in the buffer into disk.
blockDataSort(pHandle->pDataBlock, pHandle->pOrderInfo, pHandle->nullFirst); blockDataSort(pHandle->pDataBlock, pHandle->pSortInfo);
// All sorted data can fit in memory, external memory sort is not needed. Return to directly // All sorted data can fit in memory, external memory sort is not needed. Return to directly
if (size <= sortBufSize) { if (size <= sortBufSize) {
...@@ -603,6 +605,10 @@ int32_t tsortOpen(SSortHandle* pHandle) { ...@@ -603,6 +605,10 @@ int32_t tsortOpen(SSortHandle* pHandle) {
ASSERT(numOfSources <= getNumOfInMemBufPages(pHandle->pBuf)); ASSERT(numOfSources <= getNumOfInMemBufPages(pHandle->pBuf));
} }
if (numOfSources == 0) {
return 0;
}
code = sortComparInit(&pHandle->cmpParam, pHandle->pOrderedSource, 0, numOfSources - 1, pHandle); code = sortComparInit(&pHandle->cmpParam, pHandle->pOrderedSource, 0, numOfSources - 1, pHandle);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
......
...@@ -966,7 +966,7 @@ TEST(testCase, inMem_sort_Test) { ...@@ -966,7 +966,7 @@ TEST(testCase, inMem_sort_Test) {
exp1->base.resSchema = createSchema(TSDB_DATA_TYPE_BINARY, 40, 2, "res1"); exp1->base.resSchema = createSchema(TSDB_DATA_TYPE_BINARY, 40, 2, "res1");
taosArrayPush(pExprInfo, &exp1); taosArrayPush(pExprInfo, &exp1);
SOperatorInfo* pOperator = createOrderOperatorInfo(createDummyOperator(10000, 5, 1000, data_asc, 1), pExprInfo, pOrderVal, NULL); SOperatorInfo* pOperator = createSortOperatorInfo(createDummyOperator(10000, 5, 1000, data_asc, 1), pExprInfo, pOrderVal, NULL);
bool newgroup = false; bool newgroup = false;
SSDataBlock* pRes = pOperator->getNextFn(pOperator, &newgroup); SSDataBlock* pRes = pOperator->getNextFn(pOperator, &newgroup);
...@@ -1035,7 +1035,7 @@ TEST(testCase, external_sort_Test) { ...@@ -1035,7 +1035,7 @@ TEST(testCase, external_sort_Test) {
exp1->base.resSchema = createSchema(TSDB_DATA_TYPE_BINARY, 40, 2, "res1"); exp1->base.resSchema = createSchema(TSDB_DATA_TYPE_BINARY, 40, 2, "res1");
// taosArrayPush(pExprInfo, &exp1); // taosArrayPush(pExprInfo, &exp1);
SOperatorInfo* pOperator = createOrderOperatorInfo(createDummyOperator(10000, 1500, 1000, data_desc, 1), pExprInfo, pOrderVal, NULL); SOperatorInfo* pOperator = createSortOperatorInfo(createDummyOperator(10000, 1500, 1000, data_desc, 1), pExprInfo, pOrderVal, NULL);
bool newgroup = false; bool newgroup = false;
SSDataBlock* pRes = NULL; SSDataBlock* pRes = NULL;
......
...@@ -98,14 +98,14 @@ int32_t docomp(const void* p1, const void* p2, void* param) { ...@@ -98,14 +98,14 @@ int32_t docomp(const void* p1, const void* p2, void* param) {
for(int32_t i = 0; i < pInfo->size; ++i) { for(int32_t i = 0; i < pInfo->size; ++i) {
SBlockOrderInfo* pOrder = (SBlockOrderInfo*)TARRAY_GET_ELEM(pInfo, i); SBlockOrderInfo* pOrder = (SBlockOrderInfo*)TARRAY_GET_ELEM(pInfo, i);
SColumnInfoData* pLeftColInfoData = (SColumnInfoData*)TARRAY_GET_ELEM(pLeftBlock->pDataBlock, pOrder->colIndex); SColumnInfoData* pLeftColInfoData = (SColumnInfoData*)TARRAY_GET_ELEM(pLeftBlock->pDataBlock, pOrder->slotId);
bool leftNull = false; bool leftNull = false;
if (pLeftColInfoData->hasNull) { if (pLeftColInfoData->hasNull) {
leftNull = colDataIsNull(pLeftColInfoData, pLeftBlock->info.rows, pLeftSource->src.rowIndex, pLeftBlock->pBlockAgg); leftNull = colDataIsNull(pLeftColInfoData, pLeftBlock->info.rows, pLeftSource->src.rowIndex, pLeftBlock->pBlockAgg);
} }
SColumnInfoData* pRightColInfoData = (SColumnInfoData*) TARRAY_GET_ELEM(pRightBlock->pDataBlock, pOrder->colIndex); SColumnInfoData* pRightColInfoData = (SColumnInfoData*) TARRAY_GET_ELEM(pRightBlock->pDataBlock, pOrder->slotId);
bool rightNull = false; bool rightNull = false;
if (pRightColInfoData->hasNull) { if (pRightColInfoData->hasNull) {
rightNull = colDataIsNull(pRightColInfoData, pRightBlock->info.rows, pRightSource->src.rowIndex, pRightBlock->pBlockAgg); rightNull = colDataIsNull(pRightColInfoData, pRightBlock->info.rows, pRightSource->src.rowIndex, pRightBlock->pBlockAgg);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册