提交 58149dc4 编写于 作者: X Xiaoyu Wang

merge 3.0

...@@ -574,11 +574,11 @@ typedef struct SMultiwayMergeInfo { ...@@ -574,11 +574,11 @@ typedef struct SMultiwayMergeInfo {
} SMultiwayMergeInfo; } SMultiwayMergeInfo;
// todo support the disk-based sort // todo support the disk-based sort
typedef struct SOrderOperatorInfo { typedef struct SSortOperatorInfo {
int32_t colIndex; int32_t colIndex;
int32_t order; int32_t order;
SSDataBlock *pDataBlock; SSDataBlock *pDataBlock;
} SOrderOperatorInfo; } SSortOperatorInfo;
void appendUpstream(SOperatorInfo* p, SOperatorInfo* pUpstream); void appendUpstream(SOperatorInfo* p, SOperatorInfo* pUpstream);
...@@ -609,7 +609,7 @@ SOperatorInfo* createFilterOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperator ...@@ -609,7 +609,7 @@ SOperatorInfo* createFilterOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperator
int32_t numOfOutput, SColumnInfo* pCols, int32_t numOfFilter); int32_t numOfOutput, SColumnInfo* pCols, int32_t numOfFilter);
SOperatorInfo* createJoinOperatorInfo(SOperatorInfo** pUpstream, int32_t numOfUpstream, SSchema* pSchema, int32_t numOfOutput); SOperatorInfo* createJoinOperatorInfo(SOperatorInfo** pUpstream, int32_t numOfUpstream, SSchema* pSchema, int32_t numOfOutput);
SOperatorInfo* createOrderOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput, SOrderVal* pOrderVal); SOperatorInfo* createSortOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput, SOrderVal* pOrderVal);
SSDataBlock* doGlobalAggregate(void* param, bool* newgroup); SSDataBlock* doGlobalAggregate(void* param, bool* newgroup);
SSDataBlock* doMultiwayMergeSort(void* param, bool* newgroup); SSDataBlock* doMultiwayMergeSort(void* param, bool* newgroup);
......
...@@ -2301,7 +2301,7 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf ...@@ -2301,7 +2301,7 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf
} }
case OP_Order: { case OP_Order: {
pRuntimeEnv->proot = createOrderOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput, &pQueryAttr->order); pRuntimeEnv->proot = createSortOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput, &pQueryAttr->order);
break; break;
} }
...@@ -5516,7 +5516,7 @@ static SSDataBlock* doSort(void* param, bool* newgroup) { ...@@ -5516,7 +5516,7 @@ static SSDataBlock* doSort(void* param, bool* newgroup) {
return NULL; return NULL;
} }
SOrderOperatorInfo* pInfo = pOperator->info; SSortOperatorInfo* pInfo = pOperator->info;
SSDataBlock* pBlock = NULL; SSDataBlock* pBlock = NULL;
while(1) { while(1) {
...@@ -5556,8 +5556,8 @@ static SSDataBlock* doSort(void* param, bool* newgroup) { ...@@ -5556,8 +5556,8 @@ static SSDataBlock* doSort(void* param, bool* newgroup) {
return (pInfo->pDataBlock->info.rows > 0)? pInfo->pDataBlock:NULL; return (pInfo->pDataBlock->info.rows > 0)? pInfo->pDataBlock:NULL;
} }
SOperatorInfo *createOrderOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput, SOrderVal* pOrderVal) { SOperatorInfo *createSortOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput, SOrderVal* pOrderVal) {
SOrderOperatorInfo* pInfo = calloc(1, sizeof(SOrderOperatorInfo)); SSortOperatorInfo* pInfo = calloc(1, sizeof(SSortOperatorInfo));
{ {
SSDataBlock* pDataBlock = calloc(1, sizeof(SSDataBlock)); SSDataBlock* pDataBlock = calloc(1, sizeof(SSDataBlock));
...@@ -6611,7 +6611,7 @@ static void destroyTagScanOperatorInfo(void* param, int32_t numOfOutput) { ...@@ -6611,7 +6611,7 @@ static void destroyTagScanOperatorInfo(void* param, int32_t numOfOutput) {
} }
static void destroyOrderOperatorInfo(void* param, int32_t numOfOutput) { static void destroyOrderOperatorInfo(void* param, int32_t numOfOutput) {
SOrderOperatorInfo* pInfo = (SOrderOperatorInfo*) param; SSortOperatorInfo* pInfo = (SSortOperatorInfo*) param;
pInfo->pDataBlock = blockDataDestroy(pInfo->pDataBlock); pInfo->pDataBlock = blockDataDestroy(pInfo->pDataBlock);
} }
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_CONSUMER_H_
#define _TD_CONSUMER_H_
#include "tlist.h"
#include "tarray.h"
#include "hash.h"
#ifdef __cplusplus
extern "C" {
#endif
//consumer handle
struct tmq_consumer_t;
typedef struct tmq_consumer_t tmq_consumer_t;
//consumer config
struct tmq_consumer_config_t;
typedef struct tmq_consumer_config_t tmq_consumer_config_t;
//response err
struct tmq_resp_err_t;
typedef struct tmq_resp_err_t tmq_resp_err_t;
struct tmq_message_t;
typedef struct tmq_message_t tmq_message_t;
struct tmq_col_batch_t;
typedef struct tmq_col_batch_t tmq_col_batch_t;
//get content of message
tmq_col_batch_t* tmq_get_msg_col_by_idx(tmq_message_t*, int32_t col_id);
tmq_col_batch_t* tmq_get_msg_col_by_name(tmq_message_t*, const char*);
//consumer config
int32_t tmq_conf_set(tmq_consumer_config_t* , const char* config_key, const char* config_value, char* errstr, int32_t errstr_cap);
//consumer initialization
//resouces are supposed to be free by users by calling tmq_consumer_destroy
tmq_consumer_t* tmq_consumer_new(tmq_consumer_config_t* , char* errstr, int32_t errstr_cap);
//subscribe
tmq_resp_err_t tmq_subscribe(tmq_consumer_t*, const SList*);
tmq_resp_err_t tmq_unsubscribe(tmq_consumer_t*);
//consume
//resouces are supposed to be free by users by calling tmq_message_destroy
tmq_message_t* tmq_consume_poll(tmq_consumer_t*, int64_t blocking_time);
//destroy message and free memory
void tmq_message_destroy(tmq_message_t*);
//close consumer
int32_t tmq_consumer_close(tmq_consumer_t*);
//destroy consumer
void tmq_consumer_destroy(tmq_message_t*);
#ifdef __cplusplus
}
#endif
#endif /*_TD_CONSUMER_H_*/
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_STREAM_H_
#define _TD_STREAM_H_
#ifdef __cplusplus
extern "C" {
#endif
#ifdef __cplusplus
}
#endif
#endif /*_TD_STREAM_H_*/
\ No newline at end of file
...@@ -29,8 +29,9 @@ typedef struct SCorEpSet { ...@@ -29,8 +29,9 @@ typedef struct SCorEpSet {
} SCorEpSet; } SCorEpSet;
typedef struct SBlockOrderInfo { typedef struct SBlockOrderInfo {
bool nullFirst;
int32_t order; int32_t order;
int32_t colIndex; int32_t slotId;
SColumnInfoData* pColData; SColumnInfoData* pColData;
} SBlockOrderInfo; } SBlockOrderInfo;
...@@ -176,7 +177,7 @@ size_t blockDataGetSerialMetaSize(const SSDataBlock* pBlock); ...@@ -176,7 +177,7 @@ size_t blockDataGetSerialMetaSize(const SSDataBlock* pBlock);
SSchema* blockDataExtractSchema(const SSDataBlock* pBlock, int32_t* numOfCols); SSchema* blockDataExtractSchema(const SSDataBlock* pBlock, int32_t* numOfCols);
int32_t blockDataSort(SSDataBlock* pDataBlock, SArray* pOrderInfo, bool nullFirst); int32_t blockDataSort(SSDataBlock* pDataBlock, SArray* pOrderInfo);
int32_t blockDataSort_rv(SSDataBlock* pDataBlock, SArray* pOrderInfo, bool nullFirst); int32_t blockDataSort_rv(SSDataBlock* pDataBlock, SArray* pOrderInfo, bool nullFirst);
int32_t blockDataEnsureColumnCapacity(SColumnInfoData* pColumn, uint32_t numOfRows); int32_t blockDataEnsureColumnCapacity(SColumnInfoData* pColumn, uint32_t numOfRows);
......
...@@ -25,6 +25,7 @@ extern "C" { ...@@ -25,6 +25,7 @@ extern "C" {
typedef struct SRpcMsg SRpcMsg; typedef struct SRpcMsg SRpcMsg;
typedef struct SEpSet SEpSet; typedef struct SEpSet SEpSet;
typedef struct SMgmtWrapper SMgmtWrapper; typedef struct SMgmtWrapper SMgmtWrapper;
typedef enum { typedef enum {
QUERY_QUEUE, QUERY_QUEUE,
FETCH_QUEUE, FETCH_QUEUE,
...@@ -38,10 +39,11 @@ typedef enum { ...@@ -38,10 +39,11 @@ typedef enum {
typedef int32_t (*PutToQueueFp)(SMgmtWrapper* pWrapper, SRpcMsg* pReq); typedef int32_t (*PutToQueueFp)(SMgmtWrapper* pWrapper, SRpcMsg* pReq);
typedef int32_t (*GetQueueSizeFp)(SMgmtWrapper* pWrapper, int32_t vgId, EQueueType qtype); typedef int32_t (*GetQueueSizeFp)(SMgmtWrapper* pWrapper, int32_t vgId, EQueueType qtype);
typedef int32_t (*SendReqFp)(SMgmtWrapper* pWrapper, SEpSet* epSet, SRpcMsg* pReq); typedef int32_t (*SendReqFp)(SMgmtWrapper* pWrapper, const SEpSet* epSet, SRpcMsg* pReq);
typedef int32_t (*SendMnodeReqFp)(SMgmtWrapper* pWrapper, SRpcMsg* pReq); typedef int32_t (*SendMnodeReqFp)(SMgmtWrapper* pWrapper, SRpcMsg* pReq);
typedef void (*SendRspFp)(SMgmtWrapper* pWrapper, SRpcMsg* pRsp); typedef void (*SendRspFp)(SMgmtWrapper* pWrapper, const SRpcMsg* pRsp);
typedef void (*RegisterBrokenLinkArgFp)(SMgmtWrapper* pWrapper, SRpcMsg *pMsg); typedef void (*RegisterBrokenLinkArgFp)(SMgmtWrapper* pWrapper, SRpcMsg* pMsg);
typedef void (*ReleaseHandleFp)(SMgmtWrapper* pWrapper, void* handle, int8_t type);
typedef struct { typedef struct {
SMgmtWrapper* pWrapper; SMgmtWrapper* pWrapper;
...@@ -51,14 +53,16 @@ typedef struct { ...@@ -51,14 +53,16 @@ typedef struct {
SendMnodeReqFp sendMnodeReqFp; SendMnodeReqFp sendMnodeReqFp;
SendRspFp sendRspFp; SendRspFp sendRspFp;
RegisterBrokenLinkArgFp registerBrokenLinkArgFp; RegisterBrokenLinkArgFp registerBrokenLinkArgFp;
ReleaseHandleFp releaseHandleFp;
} SMsgCb; } SMsgCb;
int32_t tmsgPutToQueue(const SMsgCb* pMsgCb, EQueueType qtype, SRpcMsg* pReq); int32_t tmsgPutToQueue(const SMsgCb* pMsgCb, EQueueType qtype, SRpcMsg* pReq);
int32_t tmsgGetQueueSize(const SMsgCb* pMsgCb, int32_t vgId, EQueueType qtype); int32_t tmsgGetQueueSize(const SMsgCb* pMsgCb, int32_t vgId, EQueueType qtype);
int32_t tmsgSendReq(const SMsgCb* pMsgCb, SEpSet* epSet, SRpcMsg* pReq); int32_t tmsgSendReq(const SMsgCb* pMsgCb, const SEpSet* epSet, SRpcMsg* pReq);
int32_t tmsgSendMnodeReq(const SMsgCb* pMsgCb, SRpcMsg* pReq); int32_t tmsgSendMnodeReq(const SMsgCb* pMsgCb, SRpcMsg* pReq);
void tmsgSendRsp(const SMsgCb* pMsgCb, SRpcMsg* pRsp); void tmsgSendRsp(const SMsgCb* pMsgCb, const SRpcMsg* pRsp);
void tmsgRegisterBrokenLinkArg(const SMsgCb* pMsgCb, SRpcMsg* pMsg); void tmsgRegisterBrokenLinkArg(const SMsgCb* pMsgCb, SRpcMsg* pMsg);
void tmsgReleaseHandle(const SMsgCb* pMsgCb, void* handle, int8_t type);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -54,6 +54,16 @@ qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, void* streamReadHandle); ...@@ -54,6 +54,16 @@ qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, void* streamReadHandle);
*/ */
int32_t qSetStreamInput(qTaskInfo_t tinfo, const void* input, int32_t type); int32_t qSetStreamInput(qTaskInfo_t tinfo, const void* input, int32_t type);
/**
* Set multiple input data blocks for the stream scan.
* @param tinfo
* @param pBlocks
* @param numOfInputBlock
* @param type
* @return
*/
int32_t qSetMultiStreamInput(qTaskInfo_t tinfo, void** pBlocks, size_t numOfBlocks, int32_t type);
/** /**
* Update the table id list, add or remove. * Update the table id list, add or remove.
* *
...@@ -86,16 +96,6 @@ int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId, ...@@ -86,16 +96,6 @@ int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId,
*/ */
int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t* useconds); int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t* useconds);
/**
* Retrieve the produced results information, if current query is not paused or completed,
* this function will be blocked to wait for the query execution completed or paused,
* in which case enough results have been produced already.
*
* @param tinfo
* @return
*/
int32_t qRetrieveQueryResultInfo(qTaskInfo_t tinfo, bool* buildRes, void* pRspContext);
/** /**
* kill the ongoing query and free the query handle and corresponding resources automatically * kill the ongoing query and free the query handle and corresponding resources automatically
* @param tinfo qhandle * @param tinfo qhandle
...@@ -158,50 +158,6 @@ int32_t qGetQualifiedTableIdList(void* pTableList, const char* tagCond, int32_t ...@@ -158,50 +158,6 @@ int32_t qGetQualifiedTableIdList(void* pTableList, const char* tagCond, int32_t
*/ */
int32_t qUpdateQueriedTableIdList(qTaskInfo_t tinfo, int64_t uid, int32_t type); int32_t qUpdateQueriedTableIdList(qTaskInfo_t tinfo, int64_t uid, int32_t type);
//================================================================================================
// query handle management
/**
* Query handle mgmt object
* @param vgId
* @return
*/
void* qOpenTaskMgmt(int32_t vgId);
/**
* broadcast the close information and wait for all query stop.
* @param pExecutor
*/
void qTaskMgmtNotifyClosing(void* pExecutor);
/**
* Re-open the query handle management module when opening the vnode again.
* @param pExecutor
*/
void qQueryMgmtReOpen(void* pExecutor);
/**
* Close query mgmt and clean up resources.
* @param pExecutor
*/
void qCleanupTaskMgmt(void* pExecutor);
/**
* Add the query into the query mgmt object
* @param pMgmt
* @param qId
* @param qInfo
* @return
*/
void** qRegisterTask(void* pMgmt, uint64_t qId, void* qInfo);
/**
* acquire the query handle according to the key from query mgmt object.
* @param pMgmt
* @param key
* @return
*/
void** qAcquireTask(void* pMgmt, uint64_t key);
/** /**
* release the query handle and decrease the reference count in cache * release the query handle and decrease the reference count in cache
* @param pMgmt * @param pMgmt
...@@ -211,13 +167,6 @@ void** qAcquireTask(void* pMgmt, uint64_t key); ...@@ -211,13 +167,6 @@ void** qAcquireTask(void* pMgmt, uint64_t key);
*/ */
void** qReleaseTask(void* pMgmt, void* pQInfo, bool freeHandle); void** qReleaseTask(void* pMgmt, void* pQInfo, bool freeHandle);
/**
* De-register the query handle from the management module and free it immediately.
* @param pMgmt
* @param pQInfo
* @return
*/
void** qDeregisterQInfo(void* pMgmt, void* pQInfo);
void qProcessFetchRsp(void* parent, struct SRpcMsg* pMsg, struct SEpSet* pEpSet); void qProcessFetchRsp(void* parent, struct SRpcMsg* pMsg, struct SEpSet* pEpSet);
......
...@@ -30,6 +30,11 @@ enum { ...@@ -30,6 +30,11 @@ enum {
STREAM_TASK_STATUS__STOP, STREAM_TASK_STATUS__STOP,
}; };
enum {
STREAM_CREATED_BY__USER = 1,
STREAM_CREATED_BY__SMA,
};
#if 0 #if 0
// pipe -> fetch/pipe queue // pipe -> fetch/pipe queue
// merge -> merge queue // merge -> merge queue
......
...@@ -107,20 +107,20 @@ void rpcClose(void *); ...@@ -107,20 +107,20 @@ void rpcClose(void *);
void * rpcMallocCont(int contLen); void * rpcMallocCont(int contLen);
void rpcFreeCont(void *pCont); void rpcFreeCont(void *pCont);
void * rpcReallocCont(void *ptr, int contLen); void * rpcReallocCont(void *ptr, int contLen);
void rpcSendRequest(void *thandle, const SEpSet *pEpSet, SRpcMsg *pMsg, int64_t *rid);
void rpcSendRequestWithCtx(void *thandle, const SEpSet *pEpSet, SRpcMsg *pMsg, int64_t *rid, SRpcCtx *ctx);
// Because taosd supports multi-process mode
// These functions should not be used on the server side
// Please use tmsg<xx> functions, which are defined in tmsgcb.h
void rpcSendRequest(void *thandle, const SEpSet *pEpSet, SRpcMsg *pMsg, int64_t *rid);
void rpcSendResponse(const SRpcMsg *pMsg); void rpcSendResponse(const SRpcMsg *pMsg);
void rpcRegisterBrokenLinkArg(SRpcMsg *msg);
void rpcReleaseHandle(void *handle, int8_t type); // just release client conn to rpc instance, no close sock
// These functions will not be called in the child process
void rpcSendRedirectRsp(void *pConn, const SEpSet *pEpSet); void rpcSendRedirectRsp(void *pConn, const SEpSet *pEpSet);
void rpcSendRequestWithCtx(void *thandle, const SEpSet *pEpSet, SRpcMsg *pMsg, int64_t *rid, SRpcCtx *ctx);
int rpcGetConnInfo(void *thandle, SRpcConnInfo *pInfo); int rpcGetConnInfo(void *thandle, SRpcConnInfo *pInfo);
void rpcSendRecv(void *shandle, SEpSet *pEpSet, SRpcMsg *pReq, SRpcMsg *pRsp); void rpcSendRecv(void *shandle, SEpSet *pEpSet, SRpcMsg *pReq, SRpcMsg *pRsp);
int rpcReportProgress(void *pConn, char *pCont, int contLen);
void rpcCancelRequest(int64_t rid);
void rpcRegisterBrokenLinkArg(SRpcMsg *msg);
// just release client conn to rpc instance, no close sock
void rpcReleaseHandle(void *handle, int8_t type); //
void rpcRefHandle(void *handle, int8_t type);
void rpcUnrefHandle(void *handle, int8_t type);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -55,9 +55,9 @@ int32_t taosProcRun(SProcObj *pProc); ...@@ -55,9 +55,9 @@ int32_t taosProcRun(SProcObj *pProc);
void taosProcStop(SProcObj *pProc); void taosProcStop(SProcObj *pProc);
bool taosProcIsChild(SProcObj *pProc); bool taosProcIsChild(SProcObj *pProc);
int32_t taosProcChildId(SProcObj *pProc); int32_t taosProcChildId(SProcObj *pProc);
int32_t taosProcPutToChildQ(SProcObj *pProc, void *pHead, int16_t headLen, void *pBody, int32_t bodyLen, int32_t taosProcPutToChildQ(SProcObj *pProc, const void *pHead, int16_t headLen, const void *pBody, int32_t bodyLen,
ProcFuncType ftype); ProcFuncType ftype);
int32_t taosProcPutToParentQ(SProcObj *pProc, void *pHead, int16_t headLen, void *pBody, int32_t bodyLen, int32_t taosProcPutToParentQ(SProcObj *pProc, const void *pHead, int16_t headLen, const void *pBody, int32_t bodyLen,
ProcFuncType ftype); ProcFuncType ftype);
#ifdef __cplusplus #ifdef __cplusplus
......
...@@ -647,7 +647,6 @@ double blockDataGetSerialRowSize(const SSDataBlock* pBlock) { ...@@ -647,7 +647,6 @@ double blockDataGetSerialRowSize(const SSDataBlock* pBlock) {
typedef struct SSDataBlockSortHelper { typedef struct SSDataBlockSortHelper {
SArray* orderInfo; // SArray<SBlockOrderInfo> SArray* orderInfo; // SArray<SBlockOrderInfo>
SSDataBlock* pDataBlock; SSDataBlock* pDataBlock;
bool nullFirst;
} SSDataBlockSortHelper; } SSDataBlockSortHelper;
int32_t dataBlockCompar(const void* p1, const void* p2, const void* param) { int32_t dataBlockCompar(const void* p1, const void* p2, const void* param) {
...@@ -672,11 +671,11 @@ int32_t dataBlockCompar(const void* p1, const void* p2, const void* param) { ...@@ -672,11 +671,11 @@ int32_t dataBlockCompar(const void* p1, const void* p2, const void* param) {
} }
if (rightNull) { if (rightNull) {
return pHelper->nullFirst ? 1 : -1; return pOrder->nullFirst ? 1 : -1;
} }
if (leftNull) { if (leftNull) {
return pHelper->nullFirst ? -1 : 1; return pOrder->nullFirst ? -1 : 1;
} }
} }
...@@ -907,7 +906,7 @@ static __compar_fn_t getComparFn(int32_t type, int32_t order) { ...@@ -907,7 +906,7 @@ static __compar_fn_t getComparFn(int32_t type, int32_t order) {
} }
} }
int32_t blockDataSort(SSDataBlock* pDataBlock, SArray* pOrderInfo, bool nullFirst) { int32_t blockDataSort(SSDataBlock* pDataBlock, SArray* pOrderInfo) {
ASSERT(pDataBlock != NULL && pOrderInfo != NULL); ASSERT(pDataBlock != NULL && pOrderInfo != NULL);
if (pDataBlock->info.rows <= 1) { if (pDataBlock->info.rows <= 1) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
...@@ -922,7 +921,7 @@ int32_t blockDataSort(SSDataBlock* pDataBlock, SArray* pOrderInfo, bool nullFirs ...@@ -922,7 +921,7 @@ int32_t blockDataSort(SSDataBlock* pDataBlock, SArray* pOrderInfo, bool nullFirs
for (int32_t i = 0; i < taosArrayGetSize(pOrderInfo); ++i) { for (int32_t i = 0; i < taosArrayGetSize(pOrderInfo); ++i) {
SBlockOrderInfo* pInfo = taosArrayGet(pOrderInfo, i); SBlockOrderInfo* pInfo = taosArrayGet(pOrderInfo, i);
SColumnInfoData* pColInfoData = taosArrayGet(pDataBlock->pDataBlock, pInfo->colIndex); SColumnInfoData* pColInfoData = taosArrayGet(pDataBlock->pDataBlock, pInfo->slotId);
if (pColInfoData->hasNull) { if (pColInfoData->hasNull) {
sortColumnHasNull = true; sortColumnHasNull = true;
} }
...@@ -961,10 +960,10 @@ int32_t blockDataSort(SSDataBlock* pDataBlock, SArray* pOrderInfo, bool nullFirs ...@@ -961,10 +960,10 @@ int32_t blockDataSort(SSDataBlock* pDataBlock, SArray* pOrderInfo, bool nullFirs
int64_t p0 = taosGetTimestampUs(); int64_t p0 = taosGetTimestampUs();
SSDataBlockSortHelper helper = {.nullFirst = nullFirst, .pDataBlock = pDataBlock, .orderInfo = pOrderInfo}; SSDataBlockSortHelper helper = {.pDataBlock = pDataBlock, .orderInfo = pOrderInfo};
for (int32_t i = 0; i < taosArrayGetSize(helper.orderInfo); ++i) { for (int32_t i = 0; i < taosArrayGetSize(helper.orderInfo); ++i) {
struct SBlockOrderInfo* pInfo = taosArrayGet(helper.orderInfo, i); struct SBlockOrderInfo* pInfo = taosArrayGet(helper.orderInfo, i);
pInfo->pColData = taosArrayGet(pDataBlock->pDataBlock, pInfo->colIndex); pInfo->pColData = taosArrayGet(pDataBlock->pDataBlock, pInfo->slotId);
} }
taosqsort(index, rows, sizeof(int32_t), &helper, dataBlockCompar); taosqsort(index, rows, sizeof(int32_t), &helper, dataBlockCompar);
...@@ -1012,7 +1011,7 @@ SHelper* createTupleIndex_rv(int32_t numOfRows, SArray* pOrderInfo, SSDataBlock* ...@@ -1012,7 +1011,7 @@ SHelper* createTupleIndex_rv(int32_t numOfRows, SArray* pOrderInfo, SSDataBlock*
for (int32_t i = 0; i < numOfCols; ++i) { for (int32_t i = 0; i < numOfCols; ++i) {
SBlockOrderInfo* pInfo = taosArrayGet(pOrderInfo, i); SBlockOrderInfo* pInfo = taosArrayGet(pOrderInfo, i);
SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, pInfo->colIndex); SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, pInfo->slotId);
pInfo->pColData = pColInfo; pInfo->pColData = pColInfo;
sortValLengthPerRow += pColInfo->info.bytes; sortValLengthPerRow += pColInfo->info.bytes;
} }
...@@ -1106,7 +1105,7 @@ int32_t blockDataSort_rv(SSDataBlock* pDataBlock, SArray* pOrderInfo, bool nullF ...@@ -1106,7 +1105,7 @@ int32_t blockDataSort_rv(SSDataBlock* pDataBlock, SArray* pOrderInfo, bool nullF
// Allocate the additional buffer. // Allocate the additional buffer.
int64_t p0 = taosGetTimestampUs(); int64_t p0 = taosGetTimestampUs();
SSDataBlockSortHelper helper = {.nullFirst = nullFirst, .pDataBlock = pDataBlock, .orderInfo = pOrderInfo}; SSDataBlockSortHelper helper = {.pDataBlock = pDataBlock, .orderInfo = pOrderInfo};
uint32_t rows = pDataBlock->info.rows; uint32_t rows = pDataBlock->info.rows;
SHelper* index = createTupleIndex_rv(rows, helper.orderInfo, pDataBlock); SHelper* index = createTupleIndex_rv(rows, helper.orderInfo, pDataBlock);
......
...@@ -24,7 +24,7 @@ int32_t tmsgGetQueueSize(const SMsgCb* pMsgCb, int32_t vgId, EQueueType qtype) { ...@@ -24,7 +24,7 @@ int32_t tmsgGetQueueSize(const SMsgCb* pMsgCb, int32_t vgId, EQueueType qtype) {
return (*pMsgCb->qsizeFp)(pMsgCb->pWrapper, vgId, qtype); return (*pMsgCb->qsizeFp)(pMsgCb->pWrapper, vgId, qtype);
} }
int32_t tmsgSendReq(const SMsgCb* pMsgCb, SEpSet* epSet, SRpcMsg* pReq) { int32_t tmsgSendReq(const SMsgCb* pMsgCb, const SEpSet* epSet, SRpcMsg* pReq) {
return (*pMsgCb->sendReqFp)(pMsgCb->pWrapper, epSet, pReq); return (*pMsgCb->sendReqFp)(pMsgCb->pWrapper, epSet, pReq);
} }
...@@ -32,8 +32,12 @@ int32_t tmsgSendMnodeReq(const SMsgCb* pMsgCb, SRpcMsg* pReq) { ...@@ -32,8 +32,12 @@ int32_t tmsgSendMnodeReq(const SMsgCb* pMsgCb, SRpcMsg* pReq) {
return (*pMsgCb->sendMnodeReqFp)(pMsgCb->pWrapper, pReq); return (*pMsgCb->sendMnodeReqFp)(pMsgCb->pWrapper, pReq);
} }
void tmsgSendRsp(const SMsgCb* pMsgCb, SRpcMsg* pRsp) { return (*pMsgCb->sendRspFp)(pMsgCb->pWrapper, pRsp); } void tmsgSendRsp(const SMsgCb* pMsgCb, const SRpcMsg* pRsp) { return (*pMsgCb->sendRspFp)(pMsgCb->pWrapper, pRsp); }
void tmsgRegisterBrokenLinkArg(const SMsgCb* pMsgCb, SRpcMsg* pMsg) { void tmsgRegisterBrokenLinkArg(const SMsgCb* pMsgCb, SRpcMsg* pMsg) {
(*pMsgCb->registerBrokenLinkArgFp)(pMsgCb->pWrapper, pMsg); (*pMsgCb->registerBrokenLinkArgFp)(pMsgCb->pWrapper, pMsg);
}
void tmsgReleaseHandle(const SMsgCb* pMsgCb, void* handle, int8_t type) {
(*pMsgCb->releaseHandleFp)(pMsgCb->pWrapper, handle, type);
} }
\ No newline at end of file
...@@ -167,10 +167,10 @@ TEST(testCase, Datablock_test) { ...@@ -167,10 +167,10 @@ TEST(testCase, Datablock_test) {
printf("the second row of binary:%s, length:%d\n", (char*)varDataVal(pData), varDataLen(pData)); printf("the second row of binary:%s, length:%d\n", (char*)varDataVal(pData), varDataLen(pData));
SArray* pOrderInfo = taosArrayInit(3, sizeof(SBlockOrderInfo)); SArray* pOrderInfo = taosArrayInit(3, sizeof(SBlockOrderInfo));
SBlockOrderInfo order = {.order = TSDB_ORDER_ASC, .colIndex = 0}; SBlockOrderInfo order = {.nullFirst = true, .order = TSDB_ORDER_ASC, .slotId = 0};
taosArrayPush(pOrderInfo, &order); taosArrayPush(pOrderInfo, &order);
blockDataSort(b, pOrderInfo, true); blockDataSort(b, pOrderInfo);
blockDataDestroy(b); blockDataDestroy(b);
taosArrayDestroy(pOrderInfo); taosArrayDestroy(pOrderInfo);
......
...@@ -137,8 +137,8 @@ void dndReportStartup(SDnode *pDnode, const char *pName, const char *pDesc ...@@ -137,8 +137,8 @@ void dndReportStartup(SDnode *pDnode, const char *pName, const char *pDesc
void dndSendMonitorReport(SDnode *pDnode); void dndSendMonitorReport(SDnode *pDnode);
int32_t dndSendReqToMnode(SMgmtWrapper *pWrapper, SRpcMsg *pMsg); int32_t dndSendReqToMnode(SMgmtWrapper *pWrapper, SRpcMsg *pMsg);
int32_t dndSendReqToDnode(SMgmtWrapper *pWrapper, SEpSet *pEpSet, SRpcMsg *pMsg); int32_t dndSendReqToDnode(SMgmtWrapper *pWrapper, const SEpSet *pEpSet, SRpcMsg *pMsg);
void dndSendRsp(SMgmtWrapper *pWrapper, SRpcMsg *pRsp); void dndSendRsp(SMgmtWrapper *pWrapper, const SRpcMsg *pRsp);
void dndRegisterBrokenLinkArg(SMgmtWrapper *pWrapper, SRpcMsg *pMsg); void dndRegisterBrokenLinkArg(SMgmtWrapper *pWrapper, SRpcMsg *pMsg);
int32_t dndProcessNodeMsg(SDnode *pDnode, SNodeMsg *pMsg); int32_t dndProcessNodeMsg(SDnode *pDnode, SNodeMsg *pMsg);
......
...@@ -56,7 +56,7 @@ void dndCleanupServer(SDnode *pDnode); ...@@ -56,7 +56,7 @@ void dndCleanupServer(SDnode *pDnode);
int32_t dndInitClient(SDnode *pDnode); int32_t dndInitClient(SDnode *pDnode);
void dndCleanupClient(SDnode *pDnode); void dndCleanupClient(SDnode *pDnode);
int32_t dndInitMsgHandle(SDnode *pDnode); int32_t dndInitMsgHandle(SDnode *pDnode);
void dndSendRpcRsp(SMgmtWrapper *pWrapper, SRpcMsg *pRsp); void dndSendRpcRsp(SMgmtWrapper *pWrapper, const SRpcMsg *pRsp);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -309,7 +309,7 @@ int32_t dndInitMsgHandle(SDnode *pDnode) { ...@@ -309,7 +309,7 @@ int32_t dndInitMsgHandle(SDnode *pDnode) {
return 0; return 0;
} }
static int32_t dndSendRpcReq(STransMgmt *pMgmt, SEpSet *pEpSet, SRpcMsg *pReq) { static int32_t dndSendRpcReq(STransMgmt *pMgmt, const SEpSet *pEpSet, SRpcMsg *pReq) {
if (pMgmt->clientRpc == NULL) { if (pMgmt->clientRpc == NULL) {
terrno = TSDB_CODE_DND_OFFLINE; terrno = TSDB_CODE_DND_OFFLINE;
return -1; return -1;
...@@ -319,7 +319,7 @@ static int32_t dndSendRpcReq(STransMgmt *pMgmt, SEpSet *pEpSet, SRpcMsg *pReq) { ...@@ -319,7 +319,7 @@ static int32_t dndSendRpcReq(STransMgmt *pMgmt, SEpSet *pEpSet, SRpcMsg *pReq) {
return 0; return 0;
} }
int32_t dndSendReqToDnode(SMgmtWrapper *pWrapper, SEpSet *pEpSet, SRpcMsg *pReq) { int32_t dndSendReqToDnode(SMgmtWrapper *pWrapper, const SEpSet *pEpSet, SRpcMsg *pReq) {
if (pWrapper->procType == PROC_CHILD) { if (pWrapper->procType == PROC_CHILD) {
} else { } else {
SDnode *pDnode = pWrapper->pDnode; SDnode *pDnode = pWrapper->pDnode;
...@@ -348,7 +348,7 @@ int32_t dndSendReqToMnode(SMgmtWrapper *pWrapper, SRpcMsg *pReq) { ...@@ -348,7 +348,7 @@ int32_t dndSendReqToMnode(SMgmtWrapper *pWrapper, SRpcMsg *pReq) {
} }
} }
void dndSendRpcRsp(SMgmtWrapper *pWrapper, SRpcMsg *pRsp) { void dndSendRpcRsp(SMgmtWrapper *pWrapper, const SRpcMsg *pRsp) {
if (pRsp->code == TSDB_CODE_APP_NOT_READY) { if (pRsp->code == TSDB_CODE_APP_NOT_READY) {
SMgmtWrapper *pDnodeWrapper = dndAcquireWrapper(pWrapper->pDnode, DNODE); SMgmtWrapper *pDnodeWrapper = dndAcquireWrapper(pWrapper->pDnode, DNODE);
if (pDnodeWrapper != NULL) { if (pDnodeWrapper != NULL) {
...@@ -362,7 +362,7 @@ void dndSendRpcRsp(SMgmtWrapper *pWrapper, SRpcMsg *pRsp) { ...@@ -362,7 +362,7 @@ void dndSendRpcRsp(SMgmtWrapper *pWrapper, SRpcMsg *pRsp) {
} }
} }
void dndSendRsp(SMgmtWrapper *pWrapper, SRpcMsg *pRsp) { void dndSendRsp(SMgmtWrapper *pWrapper, const SRpcMsg *pRsp) {
if (pWrapper->procType == PROC_CHILD) { if (pWrapper->procType == PROC_CHILD) {
int32_t code = -1; int32_t code = -1;
do { do {
......
...@@ -29,7 +29,7 @@ void dmInitMsgHandles(SMgmtWrapper *pWrapper); ...@@ -29,7 +29,7 @@ void dmInitMsgHandles(SMgmtWrapper *pWrapper);
void dmGetMnodeEpSet(SDnodeMgmt *pMgmt, SEpSet *pEpSet); void dmGetMnodeEpSet(SDnodeMgmt *pMgmt, SEpSet *pEpSet);
void dmUpdateMnodeEpSet(SDnodeMgmt *pMgmt, SEpSet *pEpSet); void dmUpdateMnodeEpSet(SDnodeMgmt *pMgmt, SEpSet *pEpSet);
void dmSendRedirectRsp(SDnodeMgmt *pMgmt, SRpcMsg *pMsg); void dmSendRedirectRsp(SDnodeMgmt *pMgmt, const SRpcMsg *pMsg);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -54,7 +54,7 @@ void dmGetDnodeEp(SMgmtWrapper *pWrapper, int32_t dnodeId, char *pEp, char *pFqd ...@@ -54,7 +54,7 @@ void dmGetDnodeEp(SMgmtWrapper *pWrapper, int32_t dnodeId, char *pEp, char *pFqd
taosRUnLockLatch(&pMgmt->latch); taosRUnLockLatch(&pMgmt->latch);
} }
void dmSendRedirectRsp(SDnodeMgmt *pMgmt, SRpcMsg *pReq) { void dmSendRedirectRsp(SDnodeMgmt *pMgmt, const SRpcMsg *pReq) {
SDnode *pDnode = pMgmt->pDnode; SDnode *pDnode = pMgmt->pDnode;
SEpSet epSet = {0}; SEpSet epSet = {0};
......
...@@ -731,10 +731,10 @@ typedef struct { ...@@ -731,10 +731,10 @@ typedef struct {
int32_t vgNum; int32_t vgNum;
SRWLatch lock; SRWLatch lock;
int8_t status; int8_t status;
int8_t sourceType;
int8_t sinkType;
// int32_t sqlLen; // int32_t sqlLen;
int32_t sinkVgId; // 0 for automatic int8_t createdBy; // STREAM_CREATED_BY__USER or SMA
int32_t fixedSinkVgId; // 0 for shuffle
int64_t smaId; // 0 for unused
char* sql; char* sql;
char* logicalPlan; char* logicalPlan;
char* physicalPlan; char* physicalPlan;
......
...@@ -27,7 +27,7 @@ void mndCleanupScheduler(SMnode* pMnode); ...@@ -27,7 +27,7 @@ void mndCleanupScheduler(SMnode* pMnode);
int32_t mndSchedInitSubEp(SMnode* pMnode, const SMqTopicObj* pTopic, SMqSubscribeObj* pSub); int32_t mndSchedInitSubEp(SMnode* pMnode, const SMqTopicObj* pTopic, SMqSubscribeObj* pSub);
int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream, int64_t smaId); int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -31,7 +31,7 @@ void mndReleaseStream(SMnode *pMnode, SStreamObj *pStream); ...@@ -31,7 +31,7 @@ void mndReleaseStream(SMnode *pMnode, SStreamObj *pStream);
SSdbRaw *mndStreamActionEncode(SStreamObj *pStream); SSdbRaw *mndStreamActionEncode(SStreamObj *pStream);
SSdbRow *mndStreamActionDecode(SSdbRaw *pRaw); SSdbRow *mndStreamActionDecode(SSdbRaw *pRaw);
int32_t mndAddStreamToTrans(SMnode *pMnode, SStreamObj *pStream, const char *ast, STrans *pTrans, int64_t smaId); int32_t mndAddStreamToTrans(SMnode *pMnode, SStreamObj *pStream, const char *ast, STrans *pTrans);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -26,8 +26,11 @@ int32_t tEncodeSStreamObj(SCoder *pEncoder, const SStreamObj *pObj) { ...@@ -26,8 +26,11 @@ int32_t tEncodeSStreamObj(SCoder *pEncoder, const SStreamObj *pObj) {
if (tEncodeI64(pEncoder, pObj->dbUid) < 0) return -1; if (tEncodeI64(pEncoder, pObj->dbUid) < 0) return -1;
if (tEncodeI32(pEncoder, pObj->version) < 0) return -1; if (tEncodeI32(pEncoder, pObj->version) < 0) return -1;
if (tEncodeI8(pEncoder, pObj->status) < 0) return -1; if (tEncodeI8(pEncoder, pObj->status) < 0) return -1;
if (tEncodeI8(pEncoder, pObj->createdBy) < 0) return -1;
if (tEncodeI32(pEncoder, pObj->fixedSinkVgId) < 0) return -1;
if (tEncodeI64(pEncoder, pObj->smaId) < 0) return -1;
if (tEncodeCStr(pEncoder, pObj->sql) < 0) return -1; if (tEncodeCStr(pEncoder, pObj->sql) < 0) return -1;
if (tEncodeCStr(pEncoder, pObj->logicalPlan) < 0) return -1; /*if (tEncodeCStr(pEncoder, pObj->logicalPlan) < 0) return -1;*/
if (tEncodeCStr(pEncoder, pObj->physicalPlan) < 0) return -1; if (tEncodeCStr(pEncoder, pObj->physicalPlan) < 0) return -1;
// TODO encode tasks // TODO encode tasks
if (pObj->tasks) { if (pObj->tasks) {
...@@ -69,8 +72,11 @@ int32_t tDecodeSStreamObj(SCoder *pDecoder, SStreamObj *pObj) { ...@@ -69,8 +72,11 @@ int32_t tDecodeSStreamObj(SCoder *pDecoder, SStreamObj *pObj) {
if (tDecodeI64(pDecoder, &pObj->dbUid) < 0) return -1; if (tDecodeI64(pDecoder, &pObj->dbUid) < 0) return -1;
if (tDecodeI32(pDecoder, &pObj->version) < 0) return -1; if (tDecodeI32(pDecoder, &pObj->version) < 0) return -1;
if (tDecodeI8(pDecoder, &pObj->status) < 0) return -1; if (tDecodeI8(pDecoder, &pObj->status) < 0) return -1;
if (tDecodeI8(pDecoder, &pObj->createdBy) < 0) return -1;
if (tDecodeI32(pDecoder, &pObj->fixedSinkVgId) < 0) return -1;
if (tDecodeI64(pDecoder, &pObj->smaId) < 0) return -1;
if (tDecodeCStrAlloc(pDecoder, &pObj->sql) < 0) return -1; if (tDecodeCStrAlloc(pDecoder, &pObj->sql) < 0) return -1;
if (tDecodeCStrAlloc(pDecoder, &pObj->logicalPlan) < 0) return -1; /*if (tDecodeCStrAlloc(pDecoder, &pObj->logicalPlan) < 0) return -1;*/
if (tDecodeCStrAlloc(pDecoder, &pObj->physicalPlan) < 0) return -1; if (tDecodeCStrAlloc(pDecoder, &pObj->physicalPlan) < 0) return -1;
pObj->tasks = NULL; pObj->tasks = NULL;
int32_t sz; int32_t sz;
......
...@@ -119,7 +119,7 @@ SVgObj* mndSchedFetchOneVg(SMnode* pMnode, int64_t dbUid) { ...@@ -119,7 +119,7 @@ SVgObj* mndSchedFetchOneVg(SMnode* pMnode, int64_t dbUid) {
return pVgroup; return pVgroup;
} }
int32_t mndAddSinkToStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream, int64_t smaId) { int32_t mndAddShuffledSinkToStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
SSdb* pSdb = pMnode->pSdb; SSdb* pSdb = pMnode->pSdb;
void* pIter = NULL; void* pIter = NULL;
SArray* tasks = taosArrayGetP(pStream->tasks, 0); SArray* tasks = taosArrayGetP(pStream->tasks, 0);
...@@ -151,9 +151,9 @@ int32_t mndAddSinkToStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream, ...@@ -151,9 +151,9 @@ int32_t mndAddSinkToStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream,
pTask->execType = TASK_EXEC__NONE; pTask->execType = TASK_EXEC__NONE;
// sink // sink
if (smaId != -1) { if (pStream->createdBy == STREAM_CREATED_BY__SMA) {
pTask->sinkType = TASK_SINK__SMA; pTask->sinkType = TASK_SINK__SMA;
pTask->smaSink.smaId = smaId; pTask->smaSink.smaId = pStream->smaId;
} else { } else {
pTask->sinkType = TASK_SINK__TABLE; pTask->sinkType = TASK_SINK__TABLE;
} }
...@@ -166,7 +166,45 @@ int32_t mndAddSinkToStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream, ...@@ -166,7 +166,45 @@ int32_t mndAddSinkToStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream,
return 0; return 0;
} }
int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream, int64_t smaId) { int32_t mndAddFixedSinkToStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
ASSERT(pStream->fixedSinkVgId != 0);
SArray* tasks = taosArrayGetP(pStream->tasks, 0);
SStreamTask* pTask = tNewSStreamTask(pStream->uid);
if (pTask == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
taosArrayPush(tasks, &pTask);
pTask->nodeId = pStream->fixedSinkVgId;
SVgObj* pVgroup = mndAcquireVgroup(pMnode, pStream->fixedSinkVgId);
if (pVgroup == NULL) {
return -1;
}
pTask->epSet = mndGetVgroupEpset(pMnode, pVgroup);
// source
pTask->sourceType = TASK_SOURCE__MERGE;
// exec
pTask->execType = TASK_EXEC__NONE;
// sink
if (pStream->createdBy == STREAM_CREATED_BY__SMA) {
pTask->sinkType = TASK_SINK__SMA;
pTask->smaSink.smaId = pStream->smaId;
} else {
pTask->sinkType = TASK_SINK__TABLE;
}
//
// dispatch
pTask->dispatchType = TASK_DISPATCH__NONE;
mndPersistTaskDeployReq(pTrans, pTask, &pTask->epSet, TDMT_VND_TASK_DEPLOY, pVgroup->vgId);
return 0;
}
int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
SSdb* pSdb = pMnode->pSdb; SSdb* pSdb = pMnode->pSdb;
SQueryPlan* pPlan = qStringToQueryPlan(pStream->physicalPlan); SQueryPlan* pPlan = qStringToQueryPlan(pStream->physicalPlan);
if (pPlan == NULL) { if (pPlan == NULL) {
...@@ -185,7 +223,11 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream, i ...@@ -185,7 +223,11 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream, i
taosArrayPush(pStream->tasks, &taskOneLevel); taosArrayPush(pStream->tasks, &taskOneLevel);
// add extra sink // add extra sink
hasExtraSink = true; hasExtraSink = true;
mndAddSinkToStream(pMnode, pTrans, pStream, smaId); if (pStream->fixedSinkVgId == 0) {
mndAddShuffledSinkToStream(pMnode, pTrans, pStream);
} else {
mndAddFixedSinkToStream(pMnode, pTrans, pStream);
}
} }
for (int32_t level = 0; level < totLevel; level++) { for (int32_t level = 0; level < totLevel; level++) {
...@@ -221,12 +263,14 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream, i ...@@ -221,12 +263,14 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream, i
pTask->sinkType = TASK_SINK__SHOW; pTask->sinkType = TASK_SINK__SHOW;
pTask->showSink.reserved = 0; pTask->showSink.reserved = 0;
if (!hasExtraSink) { if (!hasExtraSink) {
if (smaId != -1) { #if 1
if (pStream->createdBy == STREAM_CREATED_BY__SMA) {
pTask->sinkType = TASK_SINK__SMA; pTask->sinkType = TASK_SINK__SMA;
pTask->smaSink.smaId = smaId; pTask->smaSink.smaId = pStream->smaId;
} else { } else {
pTask->sinkType = TASK_SINK__TABLE; pTask->sinkType = TASK_SINK__TABLE;
} }
#endif
} }
} else { } else {
pTask->sinkType = TASK_SINK__NONE; pTask->sinkType = TASK_SINK__NONE;
...@@ -286,35 +330,47 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream, i ...@@ -286,35 +330,47 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream, i
if (hasExtraSink) { if (hasExtraSink) {
// add dispatcher // add dispatcher
pTask->dispatchType = TASK_DISPATCH__SHUFFLE; if (pStream->fixedSinkVgId == 0) {
pTask->dispatchType = TASK_DISPATCH__SHUFFLE;
pTask->dispatchMsgType = TDMT_VND_TASK_WRITE_EXEC;
SDbObj* pDb = mndAcquireDb(pMnode, pStream->db); pTask->dispatchMsgType = TDMT_VND_TASK_WRITE_EXEC;
ASSERT(pDb); SDbObj* pDb = mndAcquireDb(pMnode, pStream->db);
if (mndExtractDbInfo(pMnode, pDb, &pTask->shuffleDispatcher.dbInfo, NULL) < 0) { ASSERT(pDb);
if (mndExtractDbInfo(pMnode, pDb, &pTask->shuffleDispatcher.dbInfo, NULL) < 0) {
sdbRelease(pSdb, pDb);
qDestroyQueryPlan(pPlan);
return -1;
}
sdbRelease(pSdb, pDb); sdbRelease(pSdb, pDb);
qDestroyQueryPlan(pPlan);
return -1; // put taskId to useDbRsp
} // TODO: optimize
sdbRelease(pSdb, pDb); SArray* pVgs = pTask->shuffleDispatcher.dbInfo.pVgroupInfos;
int32_t sz = taosArrayGetSize(pVgs);
// put taskId to useDbRsp SArray* sinkLv = taosArrayGetP(pStream->tasks, 0);
// TODO: optimize int32_t sinkLvSize = taosArrayGetSize(sinkLv);
SArray* pVgs = pTask->shuffleDispatcher.dbInfo.pVgroupInfos; for (int32_t i = 0; i < sz; i++) {
int32_t sz = taosArrayGetSize(pVgs); SVgroupInfo* pVgInfo = taosArrayGet(pVgs, i);
SArray* sinkLv = taosArrayGetP(pStream->tasks, 0); for (int32_t j = 0; j < sinkLvSize; j++) {
int32_t sinkLvSize = taosArrayGetSize(sinkLv); SStreamTask* pLastLevelTask = taosArrayGetP(sinkLv, j);
for (int32_t i = 0; i < sz; i++) { /*printf("vgid %d node id %d\n", pVgInfo->vgId, pTask->nodeId);*/
SVgroupInfo* pVgInfo = taosArrayGet(pVgs, i); if (pLastLevelTask->nodeId == pVgInfo->vgId) {
for (int32_t j = 0; j < sinkLvSize; j++) { pVgInfo->taskId = pLastLevelTask->taskId;
SStreamTask* pLastLevelTask = taosArrayGetP(sinkLv, j); /*printf("taskid %d set to %d\n", pVgInfo->taskId, pTask->taskId);*/
/*printf("vgid %d node id %d\n", pVgInfo->vgId, pTask->nodeId);*/ break;
if (pLastLevelTask->nodeId == pVgInfo->vgId) { }
pVgInfo->taskId = pLastLevelTask->taskId;
/*printf("taskid %d set to %d\n", pVgInfo->taskId, pTask->taskId);*/
break;
} }
} }
} else {
pTask->dispatchType = TASK_DISPATCH__FIXED;
pTask->dispatchMsgType = TDMT_VND_TASK_WRITE_EXEC;
SArray* pArray = taosArrayGetP(pStream->tasks, 0);
// one sink only
ASSERT(taosArrayGetSize(pArray) == 1);
SStreamTask* lastLevelTask = taosArrayGetP(pArray, 0);
pTask->fixedEpDispatcher.taskId = lastLevelTask->taskId;
pTask->fixedEpDispatcher.nodeId = lastLevelTask->nodeId;
pTask->fixedEpDispatcher.epSet = lastLevelTask->epSet;
} }
} }
#endif #endif
......
...@@ -415,6 +415,10 @@ static int32_t mndCreateSma(SMnode *pMnode, SNodeMsg *pReq, SMCreateSmaReq *pCre ...@@ -415,6 +415,10 @@ static int32_t mndCreateSma(SMnode *pMnode, SNodeMsg *pReq, SMCreateSmaReq *pCre
streamObj.dbUid = pDb->uid; streamObj.dbUid = pDb->uid;
streamObj.version = 1; streamObj.version = 1;
streamObj.sql = pCreate->sql; streamObj.sql = pCreate->sql;
streamObj.createdBy = STREAM_CREATED_BY__SMA;
// TODO
streamObj.fixedSinkVgId = 0;
streamObj.smaId = smaObj.uid;
/*streamObj.physicalPlan = "";*/ /*streamObj.physicalPlan = "";*/
streamObj.logicalPlan = "not implemented"; streamObj.logicalPlan = "not implemented";
...@@ -428,7 +432,7 @@ static int32_t mndCreateSma(SMnode *pMnode, SNodeMsg *pReq, SMCreateSmaReq *pCre ...@@ -428,7 +432,7 @@ static int32_t mndCreateSma(SMnode *pMnode, SNodeMsg *pReq, SMCreateSmaReq *pCre
if (mndSetCreateSmaRedoLogs(pMnode, pTrans, &smaObj) != 0) goto _OVER; if (mndSetCreateSmaRedoLogs(pMnode, pTrans, &smaObj) != 0) goto _OVER;
if (mndSetCreateSmaCommitLogs(pMnode, pTrans, &smaObj) != 0) goto _OVER; if (mndSetCreateSmaCommitLogs(pMnode, pTrans, &smaObj) != 0) goto _OVER;
if (mndSetCreateSmaRedoActions(pMnode, pTrans, pDb, &smaObj) != 0) goto _OVER; if (mndSetCreateSmaRedoActions(pMnode, pTrans, pDb, &smaObj) != 0) goto _OVER;
if (mndAddStreamToTrans(pMnode, &streamObj, pCreate->ast, pTrans, smaObj.uid) != 0) goto _OVER; if (mndAddStreamToTrans(pMnode, &streamObj, pCreate->ast, pTrans) != 0) goto _OVER;
if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER; if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER;
code = 0; code = 0;
......
...@@ -246,7 +246,7 @@ static int32_t mndStreamGetPlanString(const char *ast, char **pStr) { ...@@ -246,7 +246,7 @@ static int32_t mndStreamGetPlanString(const char *ast, char **pStr) {
return code; return code;
} }
int32_t mndAddStreamToTrans(SMnode *pMnode, SStreamObj *pStream, const char *ast, STrans *pTrans, int64_t smaId) { int32_t mndAddStreamToTrans(SMnode *pMnode, SStreamObj *pStream, const char *ast, STrans *pTrans) {
SNode *pAst = NULL; SNode *pAst = NULL;
if (nodesStringToNode(ast, &pAst) < 0) { if (nodesStringToNode(ast, &pAst) < 0) {
...@@ -271,7 +271,7 @@ int32_t mndAddStreamToTrans(SMnode *pMnode, SStreamObj *pStream, const char *ast ...@@ -271,7 +271,7 @@ int32_t mndAddStreamToTrans(SMnode *pMnode, SStreamObj *pStream, const char *ast
return -1; return -1;
} }
if (mndScheduleStream(pMnode, pTrans, pStream, smaId) < 0) { if (mndScheduleStream(pMnode, pTrans, pStream) < 0) {
mError("stream:%ld, schedule stream since %s", pStream->uid, terrstr()); mError("stream:%ld, schedule stream since %s", pStream->uid, terrstr());
return -1; return -1;
} }
...@@ -300,6 +300,10 @@ static int32_t mndCreateStream(SMnode *pMnode, SNodeMsg *pReq, SCMCreateStreamRe ...@@ -300,6 +300,10 @@ static int32_t mndCreateStream(SMnode *pMnode, SNodeMsg *pReq, SCMCreateStreamRe
streamObj.dbUid = pDb->uid; streamObj.dbUid = pDb->uid;
streamObj.version = 1; streamObj.version = 1;
streamObj.sql = pCreate->sql; streamObj.sql = pCreate->sql;
streamObj.createdBy = STREAM_CREATED_BY__USER;
// TODO
streamObj.fixedSinkVgId = 0;
streamObj.smaId = 0;
/*streamObj.physicalPlan = "";*/ /*streamObj.physicalPlan = "";*/
streamObj.logicalPlan = "not implemented"; streamObj.logicalPlan = "not implemented";
...@@ -310,7 +314,7 @@ static int32_t mndCreateStream(SMnode *pMnode, SNodeMsg *pReq, SCMCreateStreamRe ...@@ -310,7 +314,7 @@ static int32_t mndCreateStream(SMnode *pMnode, SNodeMsg *pReq, SCMCreateStreamRe
} }
mDebug("trans:%d, used to create stream:%s", pTrans->id, pCreate->name); mDebug("trans:%d, used to create stream:%s", pTrans->id, pCreate->name);
if (mndAddStreamToTrans(pMnode, &streamObj, pCreate->ast, pTrans, -1) != 0) { if (mndAddStreamToTrans(pMnode, &streamObj, pCreate->ast, pTrans) != 0) {
mError("trans:%d, failed to add stream since %s", pTrans->id, terrstr()); mError("trans:%d, failed to add stream since %s", pTrans->id, terrstr());
mndTransDrop(pTrans); mndTransDrop(pTrans);
return -1; return -1;
......
...@@ -157,6 +157,7 @@ SArray* tqRetrieveDataBlock(STqReadHandle* pHandle) { ...@@ -157,6 +157,7 @@ SArray* tqRetrieveDataBlock(STqReadHandle* pHandle) {
} }
} }
#if 0
int j = 0; int j = 0;
for (int32_t i = 0; i < colNumNeed; i++) { for (int32_t i = 0; i < colNumNeed; i++) {
col_id_t colId = *(col_id_t*)taosArrayGet(pHandle->pColIdList, i); col_id_t colId = *(col_id_t*)taosArrayGet(pHandle->pColIdList, i);
...@@ -183,6 +184,7 @@ SArray* tqRetrieveDataBlock(STqReadHandle* pHandle) { ...@@ -183,6 +184,7 @@ SArray* tqRetrieveDataBlock(STqReadHandle* pHandle) {
blockDataEnsureColumnCapacity(&colInfo, numOfRows); blockDataEnsureColumnCapacity(&colInfo, numOfRows);
taosArrayPush(pArray, &colInfo); taosArrayPush(pArray, &colInfo);
} }
#endif
STSRowIter iter = {0}; STSRowIter iter = {0};
tdSTSRowIterInit(&iter, pTschema); tdSTSRowIterInit(&iter, pTschema);
......
...@@ -430,9 +430,10 @@ typedef struct STagScanInfo { ...@@ -430,9 +430,10 @@ typedef struct STagScanInfo {
} STagScanInfo; } STagScanInfo;
typedef struct SStreamBlockScanInfo { typedef struct SStreamBlockScanInfo {
SArray* pBlockLists; // multiple SSDatablock.
SSDataBlock* pRes; // result SSDataBlock SSDataBlock* pRes; // result SSDataBlock
int32_t blockType; // current block type int32_t blockType; // current block type
bool blockValid; // Is current data has returned? int32_t validBlockIndex; // Is current data has returned?
SColumnInfo* pCols; // the output column info SColumnInfo* pCols; // the output column info
uint64_t numOfRows; // total scanned rows uint64_t numOfRows; // total scanned rows
uint64_t numOfExec; // execution times uint64_t numOfExec; // execution times
...@@ -572,11 +573,13 @@ typedef struct SGroupbyOperatorInfo { ...@@ -572,11 +573,13 @@ typedef struct SGroupbyOperatorInfo {
typedef struct SSessionAggOperatorInfo { typedef struct SSessionAggOperatorInfo {
SOptrBasicInfo binfo; SOptrBasicInfo binfo;
SAggSupporter aggSup; SAggSupporter aggSup;
SGroupResInfo groupResInfo;
STimeWindow curWindow; // current time window STimeWindow curWindow; // current time window
TSKEY prevTs; // previous timestamp TSKEY prevTs; // previous timestamp
int32_t numOfRows; // number of rows int32_t numOfRows; // number of rows
int32_t start; // start row index int32_t start; // start row index
bool reptScan; // next round scan bool reptScan; // next round scan
int64_t gap; // session window gap
} SSessionAggOperatorInfo; } SSessionAggOperatorInfo;
typedef struct SStateWindowOperatorInfo { typedef struct SStateWindowOperatorInfo {
...@@ -593,8 +596,7 @@ typedef struct SSortedMergeOperatorInfo { ...@@ -593,8 +596,7 @@ typedef struct SSortedMergeOperatorInfo {
SOptrBasicInfo binfo; SOptrBasicInfo binfo;
bool hasVarCol; bool hasVarCol;
SArray *orderInfo; // SArray<SBlockOrderInfo> SArray* pSortInfo;
bool nullFirst;
int32_t numOfSources; int32_t numOfSources;
SSortHandle *pSortHandle; SSortHandle *pSortHandle;
...@@ -613,12 +615,10 @@ typedef struct SSortedMergeOperatorInfo { ...@@ -613,12 +615,10 @@ typedef struct SSortedMergeOperatorInfo {
SAggSupporter aggSup; SAggSupporter aggSup;
} SSortedMergeOperatorInfo; } SSortedMergeOperatorInfo;
typedef struct SOrderOperatorInfo { typedef struct SSortOperatorInfo {
uint32_t sortBufSize; // max buffer size for in-memory sort uint32_t sortBufSize; // max buffer size for in-memory sort
SSDataBlock *pDataBlock; SSDataBlock *pDataBlock;
bool hasVarCol; // has variable length column, such as binary/varchar/nchar SArray* pSortInfo;
SArray *orderInfo;
bool nullFirst;
SSortHandle *pSortHandle; SSortHandle *pSortHandle;
int32_t bufPageSize; int32_t bufPageSize;
int32_t numOfRowsInRes; int32_t numOfRowsInRes;
...@@ -629,7 +629,7 @@ typedef struct SOrderOperatorInfo { ...@@ -629,7 +629,7 @@ typedef struct SOrderOperatorInfo {
uint64_t totalSize; // total load bytes from remote uint64_t totalSize; // total load bytes from remote
uint64_t totalRows; // total number of rows uint64_t totalRows; // total number of rows
uint64_t totalElapsed; // total elapsed time uint64_t totalElapsed; // total elapsed time
} SOrderOperatorInfo; } SSortOperatorInfo;
typedef struct SDistinctDataInfo { typedef struct SDistinctDataInfo {
int32_t index; int32_t index;
...@@ -655,15 +655,15 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo* ...@@ -655,15 +655,15 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo*
SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo); SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo);
SOperatorInfo* createMultiTableAggOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo); SOperatorInfo* createMultiTableAggOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo);
SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t num, SSDataBlock* pResBlock, SExecTaskInfo* pTaskInfo); SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t num, SSDataBlock* pResBlock, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createOrderOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SArray* pOrderVal, SExecTaskInfo* pTaskInfo); SOperatorInfo *createSortOperatorInfo(SOperatorInfo* downstream, SSDataBlock* pResBlock, SArray* pSortInfo, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createSortedMergeOperatorInfo(SOperatorInfo** downstream, int32_t numOfDownstream, SExprInfo* pExprInfo, int32_t num, SArray* pOrderVal, SArray* pGroupInfo, SExecTaskInfo* pTaskInfo); SOperatorInfo* createSortedMergeOperatorInfo(SOperatorInfo** downstream, int32_t numOfDownstream, SExprInfo* pExprInfo, int32_t num, SArray* pSortInfo, SArray* pGroupInfo, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createSysTableScanOperatorInfo(void* pSysTableReadHandle, SSDataBlock* pResBlock, const SName* pName, SOperatorInfo* createSysTableScanOperatorInfo(void* pSysTableReadHandle, SSDataBlock* pResBlock, const SName* pName,
SNode* pCondition, SEpSet epset, SArray* colList, SExecTaskInfo* pTaskInfo, bool showRewrite, int32_t accountId); SNode* pCondition, SEpSet epset, SArray* colList, SExecTaskInfo* pTaskInfo, bool showRewrite, int32_t accountId);
SOperatorInfo* createLimitOperatorInfo(SOperatorInfo* downstream, SLimit* pLimit, SExecTaskInfo* pTaskInfo); SOperatorInfo* createLimitOperatorInfo(SOperatorInfo* downstream, SLimit* pLimit, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, SInterval* pInterval, SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, SInterval* pInterval,
const STableGroupInfo* pTableGroupInfo, SExecTaskInfo* pTaskInfo); const STableGroupInfo* pTableGroupInfo, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, SExecTaskInfo* pTaskInfo); SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, int64_t gap, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock, SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock,
SArray* pGroupColList, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo); SArray* pGroupColList, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo);
SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfCols, SInterval* pInterval, SSDataBlock* pResBlock, SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfCols, SInterval* pInterval, SSDataBlock* pResBlock,
......
...@@ -63,7 +63,7 @@ typedef int32_t (*_sort_merge_compar_fn_t)(const void* p1, const void* p2, void* ...@@ -63,7 +63,7 @@ typedef int32_t (*_sort_merge_compar_fn_t)(const void* p1, const void* p2, void*
* @param type * @param type
* @return * @return
*/ */
SSortHandle* tsortCreateSortHandle(SArray* pOrderInfo, bool nullFirst, int32_t type, int32_t pageSize, int32_t numOfPages, SSchema* pSchema, int32_t numOfCols, const char* idstr); SSortHandle* tsortCreateSortHandle(SArray* pOrderInfo, int32_t type, int32_t pageSize, int32_t numOfPages, SSDataBlock* pBlock, const char* idstr);
/** /**
* *
......
...@@ -19,8 +19,6 @@ ...@@ -19,8 +19,6 @@
#include "executil.h" #include "executil.h"
#include "executorimpl.h" #include "executorimpl.h"
//#include "queryLog.h"
#include "tbuffer.h"
#include "tcompression.h" #include "tcompression.h"
#include "tlosertree.h" #include "tlosertree.h"
......
...@@ -14,11 +14,12 @@ ...@@ -14,11 +14,12 @@
*/ */
#include "executor.h" #include "executor.h"
#include "tdatablock.h"
#include "executorimpl.h" #include "executorimpl.h"
#include "planner.h" #include "planner.h"
#include "vnode.h" #include "vnode.h"
static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, int32_t type, char* id) { static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void** input, size_t numOfBlocks, int32_t type, char* id) {
ASSERT(pOperator != NULL); ASSERT(pOperator != NULL);
if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) { if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
if (pOperator->numOfDownstream == 0) { if (pOperator->numOfDownstream == 0) {
...@@ -31,7 +32,7 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, int32_t t ...@@ -31,7 +32,7 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, int32_t t
return TSDB_CODE_QRY_APP_ERROR; return TSDB_CODE_QRY_APP_ERROR;
} }
pOperator->status = OP_NOT_OPENED; pOperator->status = OP_NOT_OPENED;
return doSetStreamBlock(pOperator->pDownstream[0], input, type, id); return doSetStreamBlock(pOperator->pDownstream[0], input, numOfBlocks, type, id);
} else { } else {
SStreamBlockScanInfo* pInfo = pOperator->info; SStreamBlockScanInfo* pInfo = pOperator->info;
...@@ -43,20 +44,20 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, int32_t t ...@@ -43,20 +44,20 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, int32_t t
} }
if (type == STREAM_DATA_TYPE_SUBMIT_BLOCK) { if (type == STREAM_DATA_TYPE_SUBMIT_BLOCK) {
if (tqReadHandleSetMsg(pInfo->readerHandle, input, 0) < 0) { if (tqReadHandleSetMsg(pInfo->readerHandle, input[0], 0) < 0) {
qError("submit msg messed up when initing stream block, %s" PRIx64, id); qError("submit msg messed up when initing stream block, %s" PRIx64, id);
return TSDB_CODE_QRY_APP_ERROR; return TSDB_CODE_QRY_APP_ERROR;
} }
} else { } else {
ASSERT(!pInfo->blockValid); for (int32_t i = 0; i < numOfBlocks; ++i) {
SSDataBlock* pDataBlock = input[i];
SSDataBlock* pDataBlock = input; SSDataBlock* p = createOneDataBlock(pDataBlock);
pInfo->pRes->info = pDataBlock->info; p->info = pDataBlock->info;
taosArrayClear(pInfo->pRes->pDataBlock);
taosArrayAddAll(pInfo->pRes->pDataBlock, pDataBlock->pDataBlock);
// set current block valid. taosArrayAddAll(p->pDataBlock, pDataBlock->pDataBlock);
pInfo->blockValid = true; taosArrayPush(pInfo->pBlockLists, &p);
}
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
...@@ -64,17 +65,21 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, int32_t t ...@@ -64,17 +65,21 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, int32_t t
} }
int32_t qSetStreamInput(qTaskInfo_t tinfo, const void* input, int32_t type) { int32_t qSetStreamInput(qTaskInfo_t tinfo, const void* input, int32_t type) {
qSetMultiStreamInput(tinfo, (void**) &input, 1, type);
}
int32_t qSetMultiStreamInput(qTaskInfo_t tinfo, void** pBlocks, size_t numOfBlocks, int32_t type) {
if (tinfo == NULL) { if (tinfo == NULL) {
return TSDB_CODE_QRY_APP_ERROR; return TSDB_CODE_QRY_APP_ERROR;
} }
if (input == NULL) { if (pBlocks == NULL || numOfBlocks == 0) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
int32_t code = doSetStreamBlock(pTaskInfo->pRoot, (void*)input, type, GET_TASKID(pTaskInfo)); int32_t code = doSetStreamBlock(pTaskInfo->pRoot, (void**)pBlocks, numOfBlocks, type, GET_TASKID(pTaskInfo));
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
qError("%s failed to set the stream block data", GET_TASKID(pTaskInfo)); qError("%s failed to set the stream block data", GET_TASKID(pTaskInfo));
} else { } else {
......
...@@ -23,20 +23,19 @@ ...@@ -23,20 +23,19 @@
#include "tsort.h" #include "tsort.h"
#include "tutil.h" #include "tutil.h"
typedef struct STupleHandle { struct STupleHandle {
SSDataBlock* pBlock; SSDataBlock* pBlock;
int32_t rowIndex; int32_t rowIndex;
} STupleHandle; };
typedef struct SSortHandle { struct SSortHandle {
int32_t type; int32_t type;
int32_t pageSize; int32_t pageSize;
int32_t numOfPages; int32_t numOfPages;
SDiskbasedBuf *pBuf; SDiskbasedBuf *pBuf;
SArray *pOrderInfo; SArray *pSortInfo;
bool nullFirst;
SArray *pOrderedSource; SArray *pOrderedSource;
_sort_fetch_block_fn_t fetchfp; _sort_fetch_block_fn_t fetchfp;
...@@ -60,7 +59,7 @@ typedef struct SSortHandle { ...@@ -60,7 +59,7 @@ typedef struct SSortHandle {
bool inMemSort; bool inMemSort;
bool needAdjust; bool needAdjust;
STupleHandle tupleHandle; STupleHandle tupleHandle;
} SSortHandle; };
static int32_t msortComparFn(const void *pLeft, const void *pRight, void *param); static int32_t msortComparFn(const void *pLeft, const void *pRight, void *param);
...@@ -90,18 +89,18 @@ static SSDataBlock* createDataBlock_rv(SSchema* pSchema, int32_t numOfCols) { ...@@ -90,18 +89,18 @@ static SSDataBlock* createDataBlock_rv(SSchema* pSchema, int32_t numOfCols) {
* @param type * @param type
* @return * @return
*/ */
SSortHandle* tsortCreateSortHandle(SArray* pOrderInfo, bool nullFirst, int32_t type, int32_t pageSize, int32_t numOfPages, SSchema* pSchema, int32_t numOfCols, const char* idstr) { SSortHandle* tsortCreateSortHandle(SArray* pSortInfo, int32_t type, int32_t pageSize, int32_t numOfPages, SSDataBlock* pBlock, const char* idstr) {
SSortHandle* pSortHandle = taosMemoryCalloc(1, sizeof(SSortHandle)); SSortHandle* pSortHandle = taosMemoryCalloc(1, sizeof(SSortHandle));
pSortHandle->type = type; pSortHandle->type = type;
pSortHandle->pageSize = pageSize; pSortHandle->pageSize = pageSize;
pSortHandle->numOfPages = numOfPages; pSortHandle->numOfPages = numOfPages;
pSortHandle->pOrderedSource = taosArrayInit(4, POINTER_BYTES); pSortHandle->pSortInfo = pSortInfo;
pSortHandle->pOrderInfo = pOrderInfo; pSortHandle->pDataBlock = createOneDataBlock(pBlock);
pSortHandle->nullFirst = nullFirst;
pSortHandle->cmpParam.orderInfo = pOrderInfo; pSortHandle->pOrderedSource = taosArrayInit(4, POINTER_BYTES);
pSortHandle->cmpParam.orderInfo = pSortInfo;
pSortHandle->pDataBlock = createDataBlock_rv(pSchema, numOfCols);
tsortSetComparFp(pSortHandle, msortComparFn); tsortSetComparFp(pSortHandle, msortComparFn);
if (idstr != NULL) { if (idstr != NULL) {
...@@ -364,14 +363,14 @@ int32_t msortComparFn(const void *pLeft, const void *pRight, void *param) { ...@@ -364,14 +363,14 @@ int32_t msortComparFn(const void *pLeft, const void *pRight, void *param) {
for(int32_t i = 0; i < pInfo->size; ++i) { for(int32_t i = 0; i < pInfo->size; ++i) {
SBlockOrderInfo* pOrder = TARRAY_GET_ELEM(pInfo, i); SBlockOrderInfo* pOrder = TARRAY_GET_ELEM(pInfo, i);
SColumnInfoData* pLeftColInfoData = TARRAY_GET_ELEM(pLeftBlock->pDataBlock, pOrder->colIndex); SColumnInfoData* pLeftColInfoData = TARRAY_GET_ELEM(pLeftBlock->pDataBlock, pOrder->slotId);
bool leftNull = false; bool leftNull = false;
if (pLeftColInfoData->hasNull) { if (pLeftColInfoData->hasNull) {
leftNull = colDataIsNull(pLeftColInfoData, pLeftBlock->info.rows, pLeftSource->src.rowIndex, pLeftBlock->pBlockAgg); leftNull = colDataIsNull(pLeftColInfoData, pLeftBlock->info.rows, pLeftSource->src.rowIndex, pLeftBlock->pBlockAgg);
} }
SColumnInfoData* pRightColInfoData = TARRAY_GET_ELEM(pRightBlock->pDataBlock, pOrder->colIndex); SColumnInfoData* pRightColInfoData = TARRAY_GET_ELEM(pRightBlock->pDataBlock, pOrder->slotId);
bool rightNull = false; bool rightNull = false;
if (pRightColInfoData->hasNull) { if (pRightColInfoData->hasNull) {
rightNull = colDataIsNull(pRightColInfoData, pRightBlock->info.rows, pRightSource->src.rowIndex, pRightBlock->pBlockAgg); rightNull = colDataIsNull(pRightColInfoData, pRightBlock->info.rows, pRightSource->src.rowIndex, pRightBlock->pBlockAgg);
...@@ -415,6 +414,9 @@ int32_t msortComparFn(const void *pLeft, const void *pRight, void *param) { ...@@ -415,6 +414,9 @@ int32_t msortComparFn(const void *pLeft, const void *pRight, void *param) {
static int32_t doInternalMergeSort(SSortHandle* pHandle) { static int32_t doInternalMergeSort(SSortHandle* pHandle) {
size_t numOfSources = taosArrayGetSize(pHandle->pOrderedSource); size_t numOfSources = taosArrayGetSize(pHandle->pOrderedSource);
if (numOfSources == 0) {
return 0;
}
// Calculate the I/O counts to complete the data sort. // Calculate the I/O counts to complete the data sort.
double sortPass = floorl(log2(numOfSources) / log2(pHandle->numOfPages)); double sortPass = floorl(log2(numOfSources) / log2(pHandle->numOfPages));
...@@ -542,7 +544,7 @@ static int32_t createInitialSortedMultiSources(SSortHandle* pHandle) { ...@@ -542,7 +544,7 @@ static int32_t createInitialSortedMultiSources(SSortHandle* pHandle) {
if (size > sortBufSize) { if (size > sortBufSize) {
// Perform the in-memory sort and then flush data in the buffer into disk. // Perform the in-memory sort and then flush data in the buffer into disk.
int64_t p = taosGetTimestampUs(); int64_t p = taosGetTimestampUs();
blockDataSort(pHandle->pDataBlock, pHandle->pOrderInfo, pHandle->nullFirst); blockDataSort(pHandle->pDataBlock, pHandle->pSortInfo);
int64_t el = taosGetTimestampUs() - p; int64_t el = taosGetTimestampUs() - p;
pHandle->sortElapsed += el; pHandle->sortElapsed += el;
...@@ -555,7 +557,7 @@ static int32_t createInitialSortedMultiSources(SSortHandle* pHandle) { ...@@ -555,7 +557,7 @@ static int32_t createInitialSortedMultiSources(SSortHandle* pHandle) {
size_t size = blockDataGetSize(pHandle->pDataBlock); size_t size = blockDataGetSize(pHandle->pDataBlock);
// Perform the in-memory sort and then flush data in the buffer into disk. // Perform the in-memory sort and then flush data in the buffer into disk.
blockDataSort(pHandle->pDataBlock, pHandle->pOrderInfo, pHandle->nullFirst); blockDataSort(pHandle->pDataBlock, pHandle->pSortInfo);
// All sorted data can fit in memory, external memory sort is not needed. Return to directly // All sorted data can fit in memory, external memory sort is not needed. Return to directly
if (size <= sortBufSize) { if (size <= sortBufSize) {
...@@ -603,6 +605,10 @@ int32_t tsortOpen(SSortHandle* pHandle) { ...@@ -603,6 +605,10 @@ int32_t tsortOpen(SSortHandle* pHandle) {
ASSERT(numOfSources <= getNumOfInMemBufPages(pHandle->pBuf)); ASSERT(numOfSources <= getNumOfInMemBufPages(pHandle->pBuf));
} }
if (numOfSources == 0) {
return 0;
}
code = sortComparInit(&pHandle->cmpParam, pHandle->pOrderedSource, 0, numOfSources - 1, pHandle); code = sortComparInit(&pHandle->cmpParam, pHandle->pOrderedSource, 0, numOfSources - 1, pHandle);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
......
...@@ -966,7 +966,7 @@ TEST(testCase, inMem_sort_Test) { ...@@ -966,7 +966,7 @@ TEST(testCase, inMem_sort_Test) {
exp1->base.resSchema = createSchema(TSDB_DATA_TYPE_BINARY, 40, 2, "res1"); exp1->base.resSchema = createSchema(TSDB_DATA_TYPE_BINARY, 40, 2, "res1");
taosArrayPush(pExprInfo, &exp1); taosArrayPush(pExprInfo, &exp1);
SOperatorInfo* pOperator = createOrderOperatorInfo(createDummyOperator(10000, 5, 1000, data_asc, 1), pExprInfo, pOrderVal, NULL); SOperatorInfo* pOperator = createSortOperatorInfo(createDummyOperator(10000, 5, 1000, data_asc, 1), pExprInfo, pOrderVal, NULL);
bool newgroup = false; bool newgroup = false;
SSDataBlock* pRes = pOperator->getNextFn(pOperator, &newgroup); SSDataBlock* pRes = pOperator->getNextFn(pOperator, &newgroup);
...@@ -1035,7 +1035,7 @@ TEST(testCase, external_sort_Test) { ...@@ -1035,7 +1035,7 @@ TEST(testCase, external_sort_Test) {
exp1->base.resSchema = createSchema(TSDB_DATA_TYPE_BINARY, 40, 2, "res1"); exp1->base.resSchema = createSchema(TSDB_DATA_TYPE_BINARY, 40, 2, "res1");
// taosArrayPush(pExprInfo, &exp1); // taosArrayPush(pExprInfo, &exp1);
SOperatorInfo* pOperator = createOrderOperatorInfo(createDummyOperator(10000, 1500, 1000, data_desc, 1), pExprInfo, pOrderVal, NULL); SOperatorInfo* pOperator = createSortOperatorInfo(createDummyOperator(10000, 1500, 1000, data_desc, 1), pExprInfo, pOrderVal, NULL);
bool newgroup = false; bool newgroup = false;
SSDataBlock* pRes = NULL; SSDataBlock* pRes = NULL;
......
...@@ -98,14 +98,14 @@ int32_t docomp(const void* p1, const void* p2, void* param) { ...@@ -98,14 +98,14 @@ int32_t docomp(const void* p1, const void* p2, void* param) {
for(int32_t i = 0; i < pInfo->size; ++i) { for(int32_t i = 0; i < pInfo->size; ++i) {
SBlockOrderInfo* pOrder = (SBlockOrderInfo*)TARRAY_GET_ELEM(pInfo, i); SBlockOrderInfo* pOrder = (SBlockOrderInfo*)TARRAY_GET_ELEM(pInfo, i);
SColumnInfoData* pLeftColInfoData = (SColumnInfoData*)TARRAY_GET_ELEM(pLeftBlock->pDataBlock, pOrder->colIndex); SColumnInfoData* pLeftColInfoData = (SColumnInfoData*)TARRAY_GET_ELEM(pLeftBlock->pDataBlock, pOrder->slotId);
bool leftNull = false; bool leftNull = false;
if (pLeftColInfoData->hasNull) { if (pLeftColInfoData->hasNull) {
leftNull = colDataIsNull(pLeftColInfoData, pLeftBlock->info.rows, pLeftSource->src.rowIndex, pLeftBlock->pBlockAgg); leftNull = colDataIsNull(pLeftColInfoData, pLeftBlock->info.rows, pLeftSource->src.rowIndex, pLeftBlock->pBlockAgg);
} }
SColumnInfoData* pRightColInfoData = (SColumnInfoData*) TARRAY_GET_ELEM(pRightBlock->pDataBlock, pOrder->colIndex); SColumnInfoData* pRightColInfoData = (SColumnInfoData*) TARRAY_GET_ELEM(pRightBlock->pDataBlock, pOrder->slotId);
bool rightNull = false; bool rightNull = false;
if (pRightColInfoData->hasNull) { if (pRightColInfoData->hasNull) {
rightNull = colDataIsNull(pRightColInfoData, pRightBlock->info.rows, pRightSource->src.rowIndex, pRightBlock->pBlockAgg); rightNull = colDataIsNull(pRightColInfoData, pRightBlock->info.rows, pRightSource->src.rowIndex, pRightBlock->pBlockAgg);
......
...@@ -396,6 +396,8 @@ void checkFstCheckIteratorRange1() { ...@@ -396,6 +396,8 @@ void checkFstCheckIteratorRange1() {
fw->Put("c", 3); fw->Put("c", 3);
fw->Put("d", 4); fw->Put("d", 4);
fw->Put("e", 5); fw->Put("e", 5);
fw->Put("f", 5);
fw->Put("G", 5);
delete fw; delete fw;
FstReadMemory* m = new FstReadMemory(1024 * 64); FstReadMemory* m = new FstReadMemory(1024 * 64);
...@@ -413,6 +415,33 @@ void checkFstCheckIteratorRange1() { ...@@ -413,6 +415,33 @@ void checkFstCheckIteratorRange1() {
assert(result.size() == 3); assert(result.size() == 3);
taosMemoryFree(ctx); taosMemoryFree(ctx);
} }
{
// prefix search
std::vector<uint64_t> result;
AutomationCtx* ctx = automCtxCreate((void*)"he", AUTOMATION_ALWAYS);
// [b, e)
m->SearchRange(ctx, "b", GT, "e", LT, result);
assert(result.size() == 2);
taosMemoryFree(ctx);
}
{
// prefix search
std::vector<uint64_t> result;
AutomationCtx* ctx = automCtxCreate((void*)"he", AUTOMATION_ALWAYS);
// [b, e)
m->SearchRange(ctx, "b", GT, "e", LE, result);
assert(result.size() == 3);
taosMemoryFree(ctx);
}
{
// prefix search
std::vector<uint64_t> result;
AutomationCtx* ctx = automCtxCreate((void*)"he", AUTOMATION_ALWAYS);
// [b, e)
m->SearchRange(ctx, "b", GE, "e", LE, result);
assert(result.size() == 4);
taosMemoryFree(ctx);
}
} }
void checkFstCheckIteratorRange2() { void checkFstCheckIteratorRange2() {
FstWriter* fw = new FstWriter; FstWriter* fw = new FstWriter;
...@@ -424,7 +453,7 @@ void checkFstCheckIteratorRange2() { ...@@ -424,7 +453,7 @@ void checkFstCheckIteratorRange2() {
std::cout << "insert data count : " << count << "elapas time: " << e - s << std::endl; std::cout << "insert data count : " << count << "elapas time: " << e - s << std::endl;
fw->Put("ab", 1); fw->Put("ab", 1);
fw->Put("bd", 2); fw->Put("b", 2);
fw->Put("cdd", 3); fw->Put("cdd", 3);
fw->Put("cde", 3); fw->Put("cde", 3);
fw->Put("ddd", 4); fw->Put("ddd", 4);
...@@ -438,16 +467,41 @@ void checkFstCheckIteratorRange2() { ...@@ -438,16 +467,41 @@ void checkFstCheckIteratorRange2() {
return; return;
} }
{ {
// prefix search // range search
std::vector<uint64_t> result; std::vector<uint64_t> result;
AutomationCtx* ctx = automCtxCreate((void*)"he", AUTOMATION_ALWAYS);
AutomationCtx* ctx = automCtxCreate((void*)"he", AUTOMATION_ALWAYS);
// [b, e) // [b, e)
m->SearchRange(ctx, "b", GE, "ed", LT, result); m->SearchRange(ctx, "b", GE, "ed", LT, result);
assert(result.size() == 4); assert(result.size() == 4);
taosMemoryFree(ctx); taosMemoryFree(ctx);
} }
{
// range search
std::vector<uint64_t> result;
AutomationCtx* ctx = automCtxCreate((void*)"he", AUTOMATION_ALWAYS);
// [b, e)
m->SearchRange(ctx, "b", GE, "ed", LE, result);
assert(result.size() == 5);
taosMemoryFree(ctx);
}
{
// range search
std::vector<uint64_t> result;
AutomationCtx* ctx = automCtxCreate((void*)"he", AUTOMATION_ALWAYS);
// [b, e)
m->SearchRange(ctx, "b", GT, "ed", LE, result);
assert(result.size() == 4);
taosMemoryFree(ctx);
}
{
// range search
std::vector<uint64_t> result;
AutomationCtx* ctx = automCtxCreate((void*)"he", AUTOMATION_ALWAYS);
// [b, e)
m->SearchRange(ctx, "b", GT, "ed", LT, result);
assert(result.size() == 3);
taosMemoryFree(ctx);
}
} }
void fst_get(Fst* fst) { void fst_get(Fst* fst) {
......
...@@ -74,7 +74,7 @@ static int32_t streamBuildDispatchMsg(SStreamTask* pTask, SArray* data, SRpcMsg* ...@@ -74,7 +74,7 @@ static int32_t streamBuildDispatchMsg(SStreamTask* pTask, SArray* data, SRpcMsg*
pMsg->contLen = tlen; pMsg->contLen = tlen;
pMsg->code = 0; pMsg->code = 0;
pMsg->msgType = pTask->dispatchMsgType; pMsg->msgType = pTask->dispatchMsgType;
/*pMsg->noResp = 1;*/ pMsg->noResp = 1;
return 0; return 0;
} }
......
...@@ -214,9 +214,11 @@ static void uvHandleReq(SSrvConn* pConn) { ...@@ -214,9 +214,11 @@ static void uvHandleReq(SSrvConn* pConn) {
// no ref here // no ref here
} }
if (pHead->noResp == 0) { // if pHead->noResp = 1,
transMsg.handle = pConn; // 1. server application should not send resp on handle
} // 2. once send out data, cli conn released to conn pool immediately
// 3. not mixed with persist
transMsg.handle = pConn;
STrans* pTransInst = pConn->pTransInst; STrans* pTransInst = pConn->pTransInst;
(*pTransInst->cfp)(pTransInst->parent, &transMsg, NULL); (*pTransInst->cfp)(pTransInst->parent, &transMsg, NULL);
......
...@@ -207,8 +207,8 @@ static void taosProcCleanupQueue(SProcQueue *pQueue) { ...@@ -207,8 +207,8 @@ static void taosProcCleanupQueue(SProcQueue *pQueue) {
} }
} }
static int32_t taosProcQueuePush(SProcQueue *pQueue, char *pHead, int16_t rawHeadLen, char *pBody, int32_t rawBodyLen, static int32_t taosProcQueuePush(SProcQueue *pQueue, const char *pHead, int16_t rawHeadLen, const char *pBody,
ProcFuncType funcType) { int32_t rawBodyLen, ProcFuncType funcType) {
const int32_t headLen = CEIL8(rawHeadLen); const int32_t headLen = CEIL8(rawHeadLen);
const int32_t bodyLen = CEIL8(rawBodyLen); const int32_t bodyLen = CEIL8(rawBodyLen);
const int32_t fullLen = headLen + bodyLen + 8; const int32_t fullLen = headLen + bodyLen + 8;
...@@ -471,12 +471,12 @@ void taosProcCleanup(SProcObj *pProc) { ...@@ -471,12 +471,12 @@ void taosProcCleanup(SProcObj *pProc) {
} }
} }
int32_t taosProcPutToChildQ(SProcObj *pProc, void *pHead, int16_t headLen, void *pBody, int32_t bodyLen, int32_t taosProcPutToChildQ(SProcObj *pProc, const void *pHead, int16_t headLen, const void *pBody, int32_t bodyLen,
ProcFuncType funcType) { ProcFuncType funcType) {
return taosProcQueuePush(pProc->pChildQueue, pHead, headLen, pBody, bodyLen, funcType); return taosProcQueuePush(pProc->pChildQueue, pHead, headLen, pBody, bodyLen, funcType);
} }
int32_t taosProcPutToParentQ(SProcObj *pProc, void *pHead, int16_t headLen, void *pBody, int32_t bodyLen, int32_t taosProcPutToParentQ(SProcObj *pProc, const void *pHead, int16_t headLen, const void *pBody, int32_t bodyLen,
ProcFuncType funcType) { ProcFuncType funcType) {
return taosProcQueuePush(pProc->pParentQueue, pHead, headLen, pBody, bodyLen, funcType); return taosProcQueuePush(pProc->pParentQueue, pHead, headLen, pBody, bodyLen, funcType);
} }
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/cfg.sh -n dnode1
system sh/exec.sh -n dnode1 -s start
$loop_cnt = 0
check_dnode_ready:
$loop_cnt = $loop_cnt + 1
sleep 200
if $loop_cnt == 10 then
print ====> dnode not ready!
return -1
endi
sql show dnodes
print ===> $rows $data00 $data01 $data02 $data03 $data04 $data05
if $data00 != 1 then
return -1
endi
if $data04 != ready then
goto check_dnode_ready
endi
sql connect
$dbNamme = d0
print =============== create database
sql create database $dbNamme vgroups 1
sql show databases
print $data00 $data01 $data02
if $rows != 2 then
return -1
endi
sql use $dbNamme
print =============== create super table
sql create table if not exists stb (ts timestamp, c1 int, c2 bigint, c3 float, c4 double) tags (t1 int)
sql show stables
if $rows != 1 then
return -1
endi
print =============== create child table
sql create table ct0 using stb tags(1000)
#sql create table ct1 using stb tags(2000)
#sql create table ct3 using stb tags(3000)
sql show tables
if $rows != 1 then
return -1
endi
print =============== insert data
$tbPrefix = ct
$tbNum = 1
$rowNum = 10
$tstart = 1640966400000 # 2022-01-01 00:00:00.000
$i = 0
while $i < $tbNum
$tb = $tbPrefix . $i
$x = 0
while $x < $rowNum
$c2 = $x + 10
$c3 = $x * 10
$c4 = $x - 10
sql insert into $tb values ($tstart , $x , $c2 , $c3 , $c4 )
$tstart = $tstart + 1
$x = $x + 1
endw
$i = $i + 1
$tstart = 1640966400000
endw
sql select ts, c2-c1, c3/c1, c4+c1, c1*9, c1%3 from ct0
print ===> rows: $rows
print ===> $data00 $data01 $data02 $data03 $data04 $data05
print ===> $data10 $data11 $data12 $data13 $data14 $data15
print ===> $data20 $data21 $data22 $data23 $data24 $data25
print ===> $data30 $data31 $data32 $data33 $data34 $data35
if $rows != 10 then
return -1
endi
if $data01 != 10.000000000 then
return -1
endi
if $data02 != -nan then
return -1
endi
if $data03 != -10.000000000 then
return -1
endi
if $data91 != 10.000000000 then
return -1
endi
if $data92 != 10.000000000 then
return -1
endi
if $data93 != 8.000000000 then
return -1
endi
#system sh/exec.sh -n dnode1 -s stop -x SIGINT
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/cfg.sh -n dnode1 -c walLevel -v 1
system sh/cfg.sh -n dnode1 -c maxtablespervnode -v 4
system sh/exec.sh -n dnode1 -s start
$loop_cnt = 0
check_dnode_ready:
$loop_cnt = $loop_cnt + 1
sleep 200
if $loop_cnt == 10 then
print ====> dnode not ready!
return -1
endi
sql show dnodes
print ===> $rows $data00 $data01 $data02 $data03 $data04 $data05
if $data00 != 1 then
return -1
endi
if $data04 != ready then
goto check_dnode_ready
endi
sql connect
$dbPrefix = group_db
$tbPrefix = group_tb
$mtPrefix = group_mt
$tbNum = 8
$rowNum = 100
$totalNum = $tbNum * $rowNum
print =============== groupby.sim
$i = 0
$db = $dbPrefix . $i
$mt = $mtPrefix . $i
$i = 0
$db = $dbPrefix . $i
$mt = $mtPrefix . $i
$tstart = 1640966400000 # 2022-01-01 00:00:00.000
print ==== create db, stable, ctables, insert data
sql drop database if exists $db -x step1
step1:
sql create database if not exists $db keep 3650
sql use $db
sql create table $mt (ts timestamp, c1 int, c2 float, c3 bigint, c4 smallint, c5 tinyint, c6 double, c7 bool, c8 binary(10), c9 nchar(9)) TAGS(t1 int, t2 binary(12))
$i = 0
while $i < $tbNum
$tb = $tbPrefix . $i
$tg2 = ' . abc
$tg2 = $tg2 . '
sql create table $tb using $mt tags( $i , $tg2 )
$x = 0
while $x < $rowNum
$c = $x / 10
$c = $c * 10
$c = $x - $c
$binary = ' . binary
$binary = $binary . $c
$binary = $binary . '
$nchar = ' . nchar
$nchar = $nchar . $c
$nchar = $nchar . '
sql insert into $tb values ($tstart , $c , $c , $x , $x , $c , $c , $c , $binary , $nchar )
#print ==== insert into $tb values ($tstart , $c , $c , $x , $x , $c , $c , $c , $binary , $nchar )
$tstart = $tstart + 1
$x = $x + 1
endw
$i = $i + 1
$tstart = 1640966400000
endw
sleep 100
$i1 = 1
$i2 = 0
$db = $dbPrefix . $i
$mt = $mtPrefix . $i
$dbPrefix = group_db
$tbPrefix = group_tb
$mtPrefix = group_mt
$tb1 = $tbPrefix . $i1
$tb2 = $tbPrefix . $i2
$ts1 = $tb1 . .ts
$ts2 = $tb2 . .ts
print ===============================groupby_operation
print
print ==== select count(*), c1 from group_tb0 group by c1
sql select count(*), c1 from group_tb0 group by c1
print rows: $rows
print $data00 $data01 $data02 $data03
print $data10 $data11 $data12 $data13
print $data20 $data21 $data22 $data23
if $row != 20 then
return -1
endi
if $data00 != 100 then
return -1
endi
if $data01 != 0 then
return -1
endi
if $data10 != 100 then
return -1
endi
if $data11 != 1 then
return -1
endi
sql select first(ts),c1 from group_tb0 where c1<20 group by c1;
if $row != 20 then
return -1
endi
if $data00 != @70-01-01 08:01:40.000@ then
return -1
endi
if $data01 != 0 then
return -1
endi
if $data90 != @70-01-01 08:01:40.009@ then
return -1
endi
if $data91 != 9 then
return -1
endi
sql select first(ts), ts, c1 from group_tb0 where c1 < 20 group by c1;
print $row
if $row != 20 then
return -1
endi
if $data00 != $data01 then
return -1
endi
if $data10 != $data11 then
return -1
endi
if $data20 != $data21 then
return -1
endi
if $data90 != $data91 then
return -1
endi
if $data02 != 0 then
return -1
endi
if $data12 != 1 then
return -1
endi
if $data92 != 9 then
return -1
endi
sql select sum(c1), c1, avg(c1), min(c1), max(c2) from group_tb0 where c1 < 20 group by c1;
if $row != 20 then
return -1
endi
if $data00 != 0 then
return -1
endi
if $data01 != 0 then
return -1
endi
print $data02
if $data02 != 0.000000000 then
return -1
endi
if $data03 != 0 then
return -1
endi
print $data04
if $data04 != 0.00000 then
return -1
endi
if $data10 != 100 then
return -1
endi
if $data11 != 1 then
return -1
endi
print $data12
if $data12 != 1.000000000 then
return -1
endi
if $data13 != 1 then
return -1
endi
if $data14 != 1.00000 then
print expect 1.00000, actual:$data14
return -1
endi
sql_error select sum(c1), ts, c1 from group_tb0 where c1<20 group by c1;
sql_error select first(ts), ts, c2 from group_tb0 where c1 < 20 group by c1;
sql_error select sum(c3), ts, c2 from group_tb0 where c1 < 20 group by c1;
sql_error select sum(c3), first(ts), c2 from group_tb0 where c1 < 20 group by c1;
sql_error select first(c3), ts, c1, c2 from group_tb0 where c1 < 20 group by c1;
sql_error select first(c3), last(c3), ts, c1 from group_tb0 where c1 < 20 group by c1;
sql_error select ts from group_tb0 group by c1;
#===========================interval=====not support======================
sql_error select count(*), c1 from group_tb0 where c1<20 interval(1y) group by c1;
#=====tbname must be the first in the group by clause=====================
sql_error select count(*) from group_tb0 where c1 < 20 group by c1, tbname;
#super table group by normal columns
sql select count(*), c1 from group_mt0 where c1< 20 group by c1;
if $row != 20 then
return -1
endi
if $data00 != 800 then
return -1
endi
if $data01 != 0 then
return -1
endi
if $data10 != 800 then
return -1
endi
if $data11 != 1 then
return -1
endi
if $data90 != 800 then
return -1
endi
if $data91 != 9 then
return -1
endi
sql select first(c1), c1, ts from group_mt0 where c1<20 group by c1;
if $row != 20 then
return -1
endi
if $data00 != $data01 then
return -1
endi
if $data02 != @70-01-01 08:01:40.000@ then
return -1
endi
if $data10 != $data11 then
return -1
endi
if $data12 != @70-01-01 08:01:40.001@ then
return -1
endi
if $data20 != $data21 then
return -1
endi
if $data22 != @70-01-01 08:01:40.002@ then
return -1
endi
if $data90 != $data91 then
return -1
endi
if $data92 != @70-01-01 08:01:40.009@ then
return -1
endi
sql select first(c1), last(ts), first(ts), last(c1),c1,sum(c1),avg(c1),count(*) from group_mt0 where c1<20 group by c1;
if $row != 20 then
return -1
endi
if $data00 != $data03 then
return -1
endi
if $data01 != @70-01-01 08:01:49.900@ then
return -1
endi
if $data02 != @70-01-01 08:01:40.000@ then
return -1
endi
if $data07 != 800 then
return -1
endi
if $data10 != $data13 then
return -1
endi
if $data11 != @70-01-01 08:01:49.901@ then
return -1
endi
if $data12 != @70-01-01 08:01:40.001@ then
return -1
endi
if $data17 != 800 then
return -1
endi
if $data90 != $data93 then
return -1
endi
if $data91 != @70-01-01 08:01:49.909@ then
return -1
endi
if $data92 != @70-01-01 08:01:40.009@ then
return -1
endi
if $data97 != 800 then
return -1
endi
if $data95 != 7200 then
return -1
endi
if $data94 != 9 then
return -1
endi
sql select c1,sum(c1),avg(c1),count(*) from group_mt0 where c1<5 group by c1;
if $row != 5 then
return -1
endi
if $data00 != 0 then
return -1
endi
if $data11 != 800 then
return -1
endi
sql select first(c1), last(ts), first(ts), last(c1),sum(c1),avg(c1),count(*) from group_mt0 where c1<20 group by tbname,c1;
if $row != 160 then
return -1
endi
print $data00
if $data00 != 0 then
return -1
endi
if $data01 != @70-01-01 08:01:49.900@ then
return -1
endi
print $data01
if $data02 != @70-01-01 08:01:40.000@ then
return -1
endi
if $data03 != 0 then
return -1
endi
if $data04 != 0 then
return -1
endi
if $data06 != 100 then
return -1
endi
if $data07 != @group_tb0@ then
return -1
endi
if $data90 != 9 then
return -1
endi
if $data91 != @70-01-01 08:01:49.909@ then
return -1
endi
if $data92 != @70-01-01 08:01:40.009@ then
return -1
endi
if $data93 != 9 then
return -1
endi
if $data94 != 900 then
return -1
endi
if $data96 != 100 then
return -1
endi
if $data97 != @group_tb0@ then
return -1
endi
sql select count(*),first(ts),last(ts),min(c3) from group_tb1 group by c4;
if $rows != 10000 then
return -1
endi
if $data00 != 1 then
return -1
endi
if $data01 != @70-01-01 08:01:40.000@ then
return -1
endi
if $data02 != @70-01-01 08:01:40.000@ then
return -1
endi
if $data03 != 0 then
return -1
endi
sql select count(*),first(ts),last(ts),min(c3) from group_tb1 group by c4 limit 1;
if $rows != 1 then
return -1
endi
sql select count(*),first(ts),last(ts),min(c3) from group_tb1 group by c4 limit 20 offset 9990;
if $rows != 10 then
return -1
endi
sql select count(*),first(ts),last(ts),min(c3),max(c3),sum(c3),avg(c3),sum(c4)/count(c4) from group_tb1 group by c4;
if $rows != 10000 then
return -1
endi
print ---------------------------------> group by binary|nchar data add cases
sql select count(*) from group_tb1 group by c8;
if $rows != 100 then
return -1
endi
sql select count(*),sum(c4), count(c4), sum(c4)/count(c4) from group_tb1 group by c8
if $rows != 100 then
return -1
endi
if $data00 != 100 then
return -1
endi
if $data01 != 495000 then
return -1
endi
if $data02 != 100 then
return -1
endi
if $data03 != 4950.000000000 then
print expect 4950.000000000 , acutal $data03
return -1
endi
if $data10 != 100 then
return -1
endi
if $data11 != 495100 then
return -1
endi
if $data13 != 4951.000000000 then
return -1
endi
print ====================> group by normal column + slimit + soffset
sql select count(*), c8 from group_mt0 group by c8 limit 1 offset 0;
if $rows != 100 then
return -1
endi
sql select sum(c2),c8,avg(c2), sum(c2)/count(*) from group_mt0 group by c8 slimit 2 soffset 99
if $rows != 1 then
return -1
endi
if $data00 != 79200.000000000 then
return -1
endi
if $data01 != @binary99@ then
return -1
endi
if $data02 != 99.000000000 then
return -1
endi
if $data03 != 99.000000000 then
return -1
endi
print ============>td-1765
sql select percentile(c4, 49),min(c4),max(c4),avg(c4),stddev(c4) from group_tb0 group by c8;
if $rows != 100 then
return -1
endi
if $data00 != 4851.000000000 then
return -1
endi
if $data01 != 0 then
return -1
endi
if $data02 != 9900 then
return -1
endi
if $data03 != 4950.000000000 then
return -1
endi
if $data04 != 2886.607004772 then
return -1
endi
if $data10 != 4852.000000000 then
return -1
endi
if $data11 != 1 then
return -1
endi
if $data12 != 9901 then
return -1
endi
if $data13 != 4951.000000000 then
return -1
endi
if $data14 != 2886.607004772 then
return -1
endi
print ================>td-2090
sql select leastsquares(c2, 1, 1) from group_tb1 group by c8;
if $rows != 100 then
return -1
endi
if $data00 != @{slop:0.000000, intercept:0.000000}@ then
return -1
endi
if $data10 != @{slop:0.000000, intercept:1.000000}@ then
return -1
endi
if $data90 != @{slop:0.000000, intercept:9.000000}@ then
return -1
endi
#=========================== group by multi tags ======================
sql create table st (ts timestamp, c int) tags (t1 int, t2 int, t3 int, t4 int);
sql create table t1 using st tags(1, 1, 1, 1);
sql create table t2 using st tags(1, 2, 2, 2);
sql insert into t1 values ('2020-03-27 04:11:16.000', 1)('2020-03-27 04:11:17.000', 2) ('2020-03-27 04:11:18.000', 3) ('2020-03-27 04:11:19.000', 4) ;
sql insert into t1 values ('2020-03-27 04:21:16.000', 1)('2020-03-27 04:31:17.000', 2) ('2020-03-27 04:51:18.000', 3) ('2020-03-27 05:10:19.000', 4) ;
sql insert into t2 values ('2020-03-27 04:11:16.000', 1)('2020-03-27 04:11:17.000', 2) ('2020-03-27 04:11:18.000', 3) ('2020-03-27 04:11:19.000', 4) ;
sql insert into t2 values ('2020-03-27 04:21:16.000', 1)('2020-03-27 04:31:17.000', 2) ('2020-03-27 04:51:18.000', 3) ('2020-03-27 05:10:19.000', 4) ;
print =================>TD-2665
sql_error create table txx as select avg(c) as t from st;
sql_error create table txx1 as select avg(c) as t from t1;
sql select stddev(c),stddev(c) from st group by c;
if $rows != 4 then
return -1
endi
print =================>TD-2236
sql select first(ts),last(ts) from t1 group by c;
if $rows != 4 then
return -1
endi
if $data00 != @20-03-27 04:11:16.000@ then
return -1
endi
if $data01 != @20-03-27 04:21:16.000@ then
return -1
endi
if $data10 != @20-03-27 04:11:17.000@ then
return -1
endi
if $data11 != @20-03-27 04:31:17.000@ then
return -1
endi
if $data20 != @20-03-27 04:11:18.000@ then
return -1
endi
if $data21 != @20-03-27 04:51:18.000@ then
return -1
endi
if $data30 != @20-03-27 04:11:19.000@ then
return -1
endi
if $data31 != @20-03-27 05:10:19.000@ then
return -1
endi
print ===============>
sql select stddev(c),c from st where t2=1 or t2=2 group by c;
if $rows != 4 then
return -1
endi
if $data00 != 0.000000000 then
return -1
endi
if $data01 != 1 then
return -1
endi
if $data10 != 0.000000000 then
return -1
endi
if $data11 != 2 then
return -1
endi
if $data20 != 0.000000000 then
return -1
endi
if $data21 != 3 then
return -1
endi
if $data30 != 0.000000000 then
return -1
endi
if $data31 != 4 then
return -1
endi
sql_error select irate(c) from st where t1="1" and ts >= '2020-03-27 04:11:17.732' and ts < '2020-03-27 05:11:17.732' interval(1m) sliding(15s) group by tbname,c;
sql select irate(c) from st where t1="1" and ts >= '2020-03-27 04:11:17.732' and ts < '2020-03-27 05:11:17.732' interval(1m) sliding(15s) group by tbname,t1,t2;
if $rows != 40 then
return -1
endi
if $data01 != 1.000000000 then
return -1
endi
if $data02 != t1 then
return -1
endi
if $data03 != 1 then
return -1
endi
if $data04 != 1 then
return -1
endi
if $data11 != 1.000000000 then
return -1
endi
if $data12 != t1 then
return -1
endi
if $data13 != 1 then
return -1
endi
if $data14 != 1 then
return -1
endi
sql select irate(c) from st where t1="1" and ts >= '2020-03-27 04:11:17.732' and ts < '2020-03-27 05:11:17.732' interval(1m) sliding(15s) group by tbname,t1,t2 limit 1;
if $rows != 2 then
return -1
endi
if $data11 != 1.000000000 then
return -1
endi
if $data12 != t2 then
return -1
endi
if $data13 != 1 then
return -1
endi
if $data14 != 2 then
return -1
endi
sql create table m1 (ts timestamp, k int, f1 int) tags(a int);
sql create table tm0 using m1 tags(0);
sql create table tm1 using m1 tags(1);
sql insert into tm0 values('2020-1-1 1:1:1', 1, 10);
sql insert into tm0 values('2020-1-1 1:1:2', 1, 20);
sql insert into tm1 values('2020-2-1 1:1:1', 2, 10);
sql insert into tm1 values('2020-2-1 1:1:2', 2, 20);
system sh/exec.sh -n dnode1 -s stop -x SIGINT
sleep 100
system sh/exec.sh -n dnode1 -s start
sleep 100
sql connect
sleep 100
sql use group_db0;
print =========================>TD-4894
sql select count(*),k from m1 group by k;
if $rows != 2 then
return -1
endi
if $data00 != 2 then
return -1
endi
if $data01 != 1 then
return -1
endi
if $data10 != 2 then
return -1
endi
if $data11 != 2 then
return -1
endi
sql_error select count(*) from m1 group by tbname,k,f1;
sql_error select count(*) from m1 group by tbname,k,a;
sql_error select count(*) from m1 group by k, tbname;
sql_error select count(*) from m1 group by k,f1;
sql_error select count(*) from tm0 group by tbname;
sql_error select count(*) from tm0 group by a;
sql_error select count(*) from tm0 group by k,f1;
sql_error select count(*),f1 from m1 group by tbname,k;
system sh/exec.sh -n dnode1 -s stop -x SIGINT
...@@ -41,6 +41,38 @@ sql insert into ct1 values ( '2022-01-01 01:01:26.000', 6 ) ...@@ -41,6 +41,38 @@ sql insert into ct1 values ( '2022-01-01 01:01:26.000', 6 )
sql insert into ct1 values ( '2022-01-01 01:01:30.000', 7 ) sql insert into ct1 values ( '2022-01-01 01:01:30.000', 7 )
sql insert into ct1 values ( '2022-01-01 01:01:36.000', 8 ) sql insert into ct1 values ( '2022-01-01 01:01:36.000', 8 )
print =============== insert data into child table ct2 (d)
sql insert into ct2 values ( '2022-01-01 01:00:01.000', 1 )
sql insert into ct2 values ( '2022-01-01 10:00:01.000', 2 )
sql insert into ct2 values ( '2022-01-01 20:00:01.000', 3 )
sql insert into ct2 values ( '2022-01-02 10:00:01.000', 4 )
sql insert into ct2 values ( '2022-01-02 20:00:01.000', 5 )
sql insert into ct2 values ( '2022-01-03 10:00:01.000', 6 )
sql insert into ct2 values ( '2022-01-03 20:00:01.000', 7 )
print =============== insert data into child table ct3 (n)
sql insert into ct3 values ( '2021-12-21 01:01:01.000', NULL )
sql insert into ct3 values ( '2021-12-31 01:01:01.000', 1 )
sql insert into ct3 values ( '2022-01-01 01:01:06.000', 2 )
sql insert into ct3 values ( '2022-01-07 01:01:10.000', 3 )
sql insert into ct3 values ( '2022-01-31 01:01:16.000', 4 )
sql insert into ct3 values ( '2022-02-01 01:01:20.000', 5 )
sql insert into ct3 values ( '2022-02-28 01:01:26.000', 6 )
sql insert into ct3 values ( '2022-03-01 01:01:30.000', 7 )
sql insert into ct3 values ( '2022-03-08 01:01:36.000', 8 )
print =============== insert data into child table ct4 (y)
sql insert into ct4 values ( '2020-10-21 01:01:01.000', 1 )
sql insert into ct4 values ( '2020-12-31 01:01:01.000', 2 )
sql insert into ct4 values ( '2021-01-01 01:01:06.000', 3 )
sql insert into ct4 values ( '2021-05-07 01:01:10.000', 4 )
sql insert into ct4 values ( '2021-09-30 01:01:16.000', 5 )
sql insert into ct4 values ( '2022-02-01 01:01:20.000', 6 )
sql insert into ct4 values ( '2022-10-28 01:01:26.000', 7 )
sql insert into ct4 values ( '2022-12-01 01:01:30.000', 8 )
sql insert into ct4 values ( '2022-12-31 01:01:36.000', 9 )
print ================ start query ======================
sql select count(tbcol), sum(tbcol), max(tbcol), min(tbcol), count(*) from ct1 interval(10s, 2s) sql select count(tbcol), sum(tbcol), max(tbcol), min(tbcol), count(*) from ct1 interval(10s, 2s)
print ===> select count(tbcol), sum(tbcol), max(tbcol), min(tbcol), count(*) from ct1 interval(10s, 2s) print ===> select count(tbcol), sum(tbcol), max(tbcol), min(tbcol), count(*) from ct1 interval(10s, 2s)
print ===> rows: $rows print ===> rows: $rows
...@@ -102,15 +134,6 @@ if $data80 != 1 then ...@@ -102,15 +134,6 @@ if $data80 != 1 then
return -1 return -1
endi endi
print =============== insert data into child table ct2 (d)
sql insert into ct2 values ( '2022-01-01 01:00:01.000', 1 )
sql insert into ct2 values ( '2022-01-01 10:00:01.000', 2 )
sql insert into ct2 values ( '2022-01-01 20:00:01.000', 3 )
sql insert into ct2 values ( '2022-01-02 10:00:01.000', 4 )
sql insert into ct2 values ( '2022-01-02 20:00:01.000', 5 )
sql insert into ct2 values ( '2022-01-03 10:00:01.000', 6 )
sql insert into ct2 values ( '2022-01-03 20:00:01.000', 7 )
sql select count(tbcol), sum(tbcol), max(tbcol), min(tbcol), count(*) from ct2 interval(1d, 2h) sql select count(tbcol), sum(tbcol), max(tbcol), min(tbcol), count(*) from ct2 interval(1d, 2h)
print ===> select count(tbcol), sum(tbcol), max(tbcol), min(tbcol), count(*) from ct2 interval(1d, 2h) print ===> select count(tbcol), sum(tbcol), max(tbcol), min(tbcol), count(*) from ct2 interval(1d, 2h)
print ===> rows: $rows print ===> rows: $rows
...@@ -140,45 +163,19 @@ print ===> rows4: $data40 $data41 $data42 $data43 $data44 $data45 ...@@ -140,45 +163,19 @@ print ===> rows4: $data40 $data41 $data42 $data43 $data44 $data45
print ===> rows5: $data50 $data51 $data52 $data53 $data54 $data55 print ===> rows5: $data50 $data51 $data52 $data53 $data54 $data55
print ===> rows6: $data60 $data61 $data62 $data63 $data64 $data65 print ===> rows6: $data60 $data61 $data62 $data63 $data64 $data65
print ===> rows7: $data70 $data71 $data72 $data73 $data74 $data75 print ===> rows7: $data70 $data71 $data72 $data73 $data74 $data75
#if $rows != 8 then if $rows != 8 then
# return -1 return -1
#endi endi
#if $data00 != 1 then if $data00 != 1 then
# return -1 return -1
#endi endi
#if $data10 != 2 then if $data10 != 2 then
# return -1 return -1
#endi endi
#if $data20 != 2 then if $data70 != 1 then
# return -1 return -1
#endi endi
#if $data30 != 2 then
# return -1
#endi
#if $data40 != 2 then
# return -1
#endi
#if $data50 != 2 then
# return -1
#endi
#if $data60 != 2 then
# return -1
#endi
#if $data70 != 1 then
# return -1
#endi
print =============== insert data into child table ct3 (n)
sql insert into ct3 values ( '2021-12-21 01:01:01.000', NULL );
sql insert into ct3 values ( '2021-12-31 01:01:01.000', 1 );
sql insert into ct3 values ( '2022-01-01 01:01:06.000', 2 );
sql insert into ct3 values ( '2022-01-07 01:01:10.000', 3 );
sql insert into ct3 values ( '2022-01-31 01:01:16.000', 4 );
sql insert into ct3 values ( '2022-02-01 01:01:20.000', 5 );
sql insert into ct3 values ( '2022-02-28 01:01:26.000', 6 );
sql insert into ct3 values ( '2022-03-01 01:01:30.000', 7 );
sql insert into ct3 values ( '2022-03-08 01:01:36.000', 8 );
sql select count(tbcol), sum(tbcol), max(tbcol), min(tbcol), count(*) from ct3 interval(1n, 1w) sql select count(tbcol), sum(tbcol), max(tbcol), min(tbcol), count(*) from ct3 interval(1n, 1w)
print ===> select count(tbcol), sum(tbcol), max(tbcol), min(tbcol), count(*) from ct3 interval(1n, 1w) print ===> select count(tbcol), sum(tbcol), max(tbcol), min(tbcol), count(*) from ct3 interval(1n, 1w)
print ===> rows: $rows print ===> rows: $rows
...@@ -236,18 +233,6 @@ print ===> rows7: $data70 $data71 $data72 $data73 $data74 ...@@ -236,18 +233,6 @@ print ===> rows7: $data70 $data71 $data72 $data73 $data74
# return -1 # return -1
#endi #endi
print =============== insert data into child table ct4 (y)
sql insert into ct4 values ( '2020-10-21 01:01:01.000', 1 )
sql insert into ct4 values ( '2020-12-31 01:01:01.000', 2 )
sql insert into ct4 values ( '2021-01-01 01:01:06.000', 3 )
sql insert into ct4 values ( '2021-05-07 01:01:10.000', 4 )
sql insert into ct4 values ( '2021-09-30 01:01:16.000', 5 )
sql insert into ct4 values ( '2022-02-01 01:01:20.000', 6 )
sql insert into ct4 values ( '2022-10-28 01:01:26.000', 7 )
sql insert into ct4 values ( '2022-12-01 01:01:30.000', 8 )
sql insert into ct4 values ( '2022-12-31 01:01:36.000', 9 )
sql select count(tbcol), sum(tbcol), max(tbcol), min(tbcol), count(*) from ct4 interval(1y, 6n) sql select count(tbcol), sum(tbcol), max(tbcol), min(tbcol), count(*) from ct4 interval(1y, 6n)
print ===> select count(tbcol), sum(tbcol), max(tbcol), min(tbcol), count(*) from ct4 interval(1y, 6n) print ===> select count(tbcol), sum(tbcol), max(tbcol), min(tbcol), count(*) from ct4 interval(1y, 6n)
print ===> rows: $rows print ===> rows: $rows
...@@ -305,6 +290,27 @@ print ===> rows7: $data70 $data71 $data72 $data73 $data74 ...@@ -305,6 +290,27 @@ print ===> rows7: $data70 $data71 $data72 $data73 $data74
# return -1 # return -1
#endi #endi
#=================================================
print =============== stop and restart taosd
system sh/exec.sh -n dnode1 -s stop -x SIGINT
system sh/exec.sh -n dnode1 -s start
$loop_cnt = 0
check_dnode_ready:
$loop_cnt = $loop_cnt + 1
sleep 200
if $loop_cnt == 10 then
print ====> dnode not ready!
return -1
endi
sql show dnodes
print ===> $rows $data00 $data01 $data02 $data03 $data04 $data05
if $data00 != 1 then
return -1
endi
if $data04 != ready then
goto check_dnode_ready
endi
......
...@@ -2,6 +2,8 @@ ...@@ -2,6 +2,8 @@
run tsim/user/basic1.sim run tsim/user/basic1.sim
run tsim/db/basic1.sim run tsim/db/basic1.sim
run tsim/db/basic2.sim
run tsim/db/basic3.sim
run tsim/db/basic6.sim run tsim/db/basic6.sim
run tsim/db/basic7.sim run tsim/db/basic7.sim
run tsim/db/error1.sim run tsim/db/error1.sim
...@@ -13,12 +15,15 @@ run tsim/insert/basic1.sim ...@@ -13,12 +15,15 @@ run tsim/insert/basic1.sim
run tsim/insert/backquote.sim run tsim/insert/backquote.sim
run tsim/insert/null.sim run tsim/insert/null.sim
#run tsim/parser/groupby-basic.sim
#run tsim/parser/fourArithmetic-basic.sim
run tsim/query/interval.sim run tsim/query/interval.sim
#run tsim/query/interval-offset.sim # TD-14266 run tsim/query/interval-offset.sim
run tsim/show/basic.sim run tsim/show/basic.sim
run tsim/table/basic1.sim run tsim/table/basic1.sim
run tsim/tmq/basic.sim run tsim/tmq/basic.sim
#run tsim/tmq/basic1.sim
#### test scenario, please refer to https://jira.taosdata.com:18090/pages/viewpage.action?pageId=135120406
# vgroups=1, one topic for one consumer, include: columns from stb/ctb/ntb, * from stb/ctb/ntb, Scalar function from stb/ctb/ntb
# vgroups=1, multi topics for one consumer, include: columns from stb/ctb/ntb, * from stb/ctb/ntb, Scalar function from stb/ctb/ntb
# vgroups=4, one topic for one consumer, include: columns from stb/ctb/ntb, * from stb/ctb/ntb, Scalar function from stb/ctb/ntb
# vgroups=4, multi topics for one consumer, include: columns from stb/ctb/ntb, * from stb/ctb/ntb, Scalar function from stb/ctb/ntb
# The above use cases are combined with where filter conditions, such as: where ts > "2017-08-12 18:25:58.128Z" and sin(a) > 0.5;
#
# notes: not support aggregate functions(such as sum/count/min/max) and time-windows(interval).
#
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/cfg.sh -n dnode1
system sh/exec.sh -n dnode1 -s start
$loop_cnt = 0
check_dnode_ready:
$loop_cnt = $loop_cnt + 1
sleep 200
if $loop_cnt == 10 then
print ====> dnode not ready!
return -1
endi
sql show dnodes
print ===> $rows $data00 $data01 $data02 $data03 $data04 $data05
if $data00 != 1 then
return -1
endi
if $data04 != ready then
goto check_dnode_ready
endi
sql connect
$dbNamme = d0
print =============== create database , vgroup 1
sql create database $dbNamme vgroups 1
sql show databases
print $data00 $data01 $data02
if $rows != 2 then
return -1
endi
sql use $dbNamme
print =============== create super table
sql create table if not exists stb (ts timestamp, c1 int, c2 float, c3 binary(10)) tags (t1 int)
sql show stables
if $rows != 1 then
return -1
endi
print =============== create child table
sql create table ct0 using stb tags(1000)
sql create table ct1 using stb tags(2000)
#sql create table ct3 using stb tags(3000)
print =============== create normal table
sql create table ntb (ts timestamp, c1 int, c2 float, c3 binary(10))
print =============== create multi topics. notes: now only support:
print =============== 1. columns from stb; 2. * from ctb; 3. columns from ctb
print =============== will support: * from stb; function from stb/ctb
sql create topic topic_stb_column as select ts, c1, c3 from stb
#sql create topic topic_stb_all as select * from stb
#sql create topic topic_stb_function as select ts, abs(c1), sina(c2) from stb
sql create topic topic_ctb_column as select ts, c1, c3 from ct0
sql create topic topic_ctb_all as select * from ct0
#sql create topic topic_ctb_function as select ts, abs(c1), sina(c2) from ct0
sql create topic topic_ntb_column as select ts, c1, c3 from ntb
sql create topic topic_ntb_all as select * from ntb
#sql create topic topic_ntb_function as select ts, abs(c1), sina(c2) from ntb
sql show tables
if $rows != 3 then
return -1
endi
print =============== insert data
$tbPrefix = ct
$tbNum = 2
$rowNum = 10
$tstart = 1640966400000 # 2022-01-01 00:00:00.000
$i = 0
while $i < $tbNum
$tb = $tbPrefix . $i
$x = 0
while $x < $rowNum
$c = $x / 10
$c = $c * 10
$c = $x - $c
$binary = ' . binary
$binary = $binary . $c
$binary = $binary . '
sql insert into $tb values ($tstart , $c , $x , $binary )
sql insert into ntb values ($tstart , $c , $x , $binary )
$tstart = $tstart + 1
$x = $x + 1
endw
$i = $i + 1
$tstart = 1640966400000
endw
#root@trd02 /home $ tmq_sim --help
# -c Configuration directory, default is
# -d The name of the database for cosumer, no default
# -t The topic string for cosumer, no default
# -k The key-value string for cosumer, no default
# -g showMsgFlag, default is 0
#
$totalMsgCnt = $rowNum * $tbNum
print inserted totalMsgCnt: $totalMsgCnt
# supported key:
# group.id:<xxx>
# enable.auto.commit:<true | false>
# auto.offset.reset:<earliest | latest | none>
# td.connect.ip:<fqdn | ipaddress>
# td.connect.user:root
# td.connect.pass:taosdata
# td.connect.port:6030
# td.connect.db:db
print cmd===> system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_stb_column" -k "group.id:tg2"
system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_stb_column" -k "group.id:tg2"
print cmd result----> $system_content
if $system_content != @{consume success: 20}@ then
return -1
endi
#print cmd===> system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_stb_all" -k "group.id:tg2"
#system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_stb_all" -k "group.id:tg2"
#print cmd result----> $system_content
#if $system_content != @{consume success: 20}@ then
# return -1
#endi
print cmd===> system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_ctb_column" -k "group.id:tg2"
system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_ctb_column" -k "group.id:tg2"
print cmd result----> $system_content
if $system_content != @{consume success: 20}@ then
return -1
endi
print cmd===> system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_ctb_all" -k "group.id:tg2"
system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_ctb_all" -k "group.id:tg2"
print cmd result----> $system_content
if $system_content != @{consume success: 20}@ then
return -1
endi
print cmd===> system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_ntb_column" -k "group.id:tg2"
system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_ntb_column" -k "group.id:tg2"
print cmd result----> $system_content
if $system_content != @{consume success: 20}@ then
return -1
endi
print cmd===> system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_ntb_all" -k "group.id:tg2"
system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_ntb_all" -k "group.id:tg2"
print cmd result----> $system_content
if $system_content != @{consume success: 20}@ then
return -1
endi
print =============== create database , vgroup 4
$dbNamme = d1
sql create database $dbNamme vgroups 4
#system sh/exec.sh -n dnode1 -s stop -x SIGINT
...@@ -151,8 +151,6 @@ void parseInputString() { ...@@ -151,8 +151,6 @@ void parseInputString() {
token = strtok(NULL, delim); token = strtok(NULL, delim);
} }
printf("\n\n");
token = strtok(g_stConfInfo.keyString, delim); token = strtok(g_stConfInfo.keyString, delim);
while(token != NULL) { while(token != NULL) {
//printf("%s\n", token ); //printf("%s\n", token );
...@@ -226,7 +224,7 @@ void perf_loop(tmq_t* tmq, tmq_list_t* topics) { ...@@ -226,7 +224,7 @@ void perf_loop(tmq_t* tmq, tmq_list_t* topics) {
if ((err = tmq_subscribe(tmq, topics))) { if ((err = tmq_subscribe(tmq, topics))) {
printf("tmq_subscribe() fail, reason: %s\n", tmq_err2str(err)); printf("tmq_subscribe() fail, reason: %s\n", tmq_err2str(err));
return; exit(-1);
} }
int32_t totalMsgs = 0; int32_t totalMsgs = 0;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册