提交 69eaa097 编写于 作者: S shenglian zhou

Merge branch '3.0' of github.com:taosdata/TDengine into 3.0_udfd

......@@ -88,6 +88,12 @@ def pre_test(){
cmake .. > /dev/null
make -j4> /dev/null
'''
sh'''
cd ${WKPY}
git reset --hard
git pull
pip3 install .
'''
return 1
}
......@@ -97,6 +103,7 @@ pipeline {
environment{
WK = '/var/lib/jenkins/workspace/TDinternal'
WKC= '/var/lib/jenkins/workspace/TDengine'
WKPY= '/var/lib/jenkins/workspace/taos-connector-python'
}
stages {
stage('pre_build'){
......@@ -113,13 +120,18 @@ pipeline {
timeout(time: 45, unit: 'MINUTES'){
pre_test()
sh'''
cd ${WKC}/tests
./test-all.sh b1fq
'''
sh'''
cd ${WKC}/debug
ctest
'''
sh'''
export LD_LIBRARY_PATH=${WKC}/debug/build/lib
cd ${WKC}/tests/system-test
./fulltest.sh
'''
sh'''
cd ${WKC}/tests
./test-all.sh b1fq
'''
}
}
}
......
cmake_minimum_required(VERSION 3.16)
set(CMAKE_VERBOSE_MAKEFILE ON)
set(CMAKE_VERBOSE_MAKEFILE OFF)
#set output directory
SET(LIBRARY_OUTPUT_PATH ${PROJECT_BINARY_DIR}/build/lib)
......
......@@ -46,7 +46,7 @@ typedef struct SScanLogicNode {
struct STableMeta* pMeta;
SVgroupsInfo* pVgroupList;
EScanType scanType;
uint8_t scanFlag; // denotes reversed scan of data or not
uint8_t scanSeq[2]; // first is scan count, and second is reverse scan count
STimeWindow scanRange;
SName tableName;
bool showRewrite;
......@@ -189,9 +189,6 @@ typedef struct SScanPhysiNode {
SNodeList* pScanCols;
uint64_t uid; // unique id of the table
int8_t tableType;
int32_t order; // scan order: TSDB_ORDER_ASC|TSDB_ORDER_DESC
int32_t count; // repeat count
int32_t reverse; // reverse scan count
SName tableName;
} SScanPhysiNode;
......@@ -207,7 +204,7 @@ typedef struct SSystemTableScanPhysiNode {
typedef struct STableScanPhysiNode {
SScanPhysiNode scan;
uint8_t scanFlag; // denotes reversed scan of data or not
uint8_t scanSeq[2]; // first is scan count, and second is reverse scan count
STimeWindow scanRange;
double ratio;
int32_t dataRequired;
......
......@@ -37,6 +37,8 @@ typedef struct SPlanContext {
bool isStmtQuery;
void* pTransporter;
struct SCatalog* pCatalog;
char* pMsg;
int32_t msgLen;
} SPlanContext;
// Create the physical plan for the query, according to the AST.
......
......@@ -38,11 +38,12 @@ typedef struct SRpcConnInfo {
typedef struct SRpcMsg {
tmsg_t msgType;
void *pCont;
void * pCont;
int contLen;
int32_t code;
void *handle; // rpc handle returned to app
void *ahandle; // app handle set by client
void * handle; // rpc handle returned to app
void * ahandle; // app handle set by client
int64_t refId; // refid, used by server
int noResp; // has response or not(default 0, 0: resp, 1: no resp);
int persistHandle; // persist handle or not
......@@ -54,8 +55,8 @@ typedef struct {
uint16_t clientPort;
SRpcMsg rpcMsg;
int32_t rspLen;
void *pRsp;
void *pNode;
void * pRsp;
void * pNode;
} SNodeMsg;
typedef void (*RpcCfp)(void *parent, SRpcMsg *, SEpSet *);
......@@ -64,7 +65,7 @@ typedef int (*RpcRfp)(void *parent, SRpcMsg *, SEpSet *);
typedef struct SRpcInit {
uint16_t localPort; // local port
char *label; // for debug purpose
char * label; // for debug purpose
int numOfThreads; // number of threads to handle connections
int sessions; // number of sessions allowed
int8_t connType; // TAOS_CONN_UDP, TAOS_CONN_TCPC, TAOS_CONN_TCPS
......@@ -97,23 +98,23 @@ typedef struct {
typedef struct {
int32_t msgType;
void *val;
void * val;
int32_t (*clone)(void *src, void **dst);
void (*freeFunc)(const void *arg);
} SRpcBrokenlinkVal;
typedef struct {
SHashObj *args;
SHashObj * args;
SRpcBrokenlinkVal brokenVal;
} SRpcCtx;
int32_t rpcInit();
void rpcCleanup();
void *rpcOpen(const SRpcInit *pRpc);
void * rpcOpen(const SRpcInit *pRpc);
void rpcClose(void *);
void *rpcMallocCont(int contLen);
void * rpcMallocCont(int contLen);
void rpcFreeCont(void *pCont);
void *rpcReallocCont(void *ptr, int contLen);
void * rpcReallocCont(void *ptr, int contLen);
// Because taosd supports multi-process mode
// These functions should not be used on the server side
......
......@@ -623,6 +623,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_PAR_INVALID_DAYS_VALUE TAOS_DEF_ERROR_CODE(0, 0x2636)
#define TSDB_CODE_PAR_OFFSET_LESS_ZERO TAOS_DEF_ERROR_CODE(0, 0x2637)
#define TSDB_CODE_PAR_SLIMIT_LEAK_PARTITION_BY TAOS_DEF_ERROR_CODE(0, 0x2638)
#define TSDB_CODE_PAR_INVALID_TOPIC_QUERY TAOS_DEF_ERROR_CODE(0, 0x2639)
//planner
#define TSDB_CODE_PLAN_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0x2700)
......
......@@ -53,8 +53,8 @@ int32_t taosProcRun(SProcObj *pProc);
void taosProcStop(SProcObj *pProc);
int32_t taosProcPutToChildQ(SProcObj *pProc, const void *pHead, int16_t headLen, const void *pBody, int32_t bodyLen,
void *handle, EProcFuncType ftype);
void taosProcRemoveHandle(SProcObj *pProc, void *handle);
void *handle, int64_t handleRef, EProcFuncType ftype);
int64_t taosProcRemoveHandle(SProcObj *pProc, void *handle);
void taosProcCloseHandles(SProcObj *pProc, void (*HandleFp)(void *handle));
void taosProcPutToParentQ(SProcObj *pProc, const void *pHead, int16_t headLen, const void *pBody, int32_t bodyLen,
EProcFuncType ftype);
......
......@@ -232,7 +232,9 @@ int32_t getPlan(SRequestObj* pRequest, SQuery* pQuery, SQueryPlan** pPlan, SArra
.mgmtEpSet = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp),
.pAstRoot = pQuery->pRoot,
.showRewrite = pQuery->showRewrite,
.pTransporter = pRequest->pTscObj->pAppInfo->pTransporter
.pTransporter = pRequest->pTscObj->pAppInfo->pTransporter,
.pMsg = pRequest->msgBuf,
.msgLen = ERROR_MSG_BUF_DEFAULT_SIZE
};
int32_t code = catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &cxt.pCatalog);
if (TSDB_CODE_SUCCESS == code) {
......
......@@ -237,18 +237,20 @@ static int32_t taosLoadCfg(SConfig *pCfg, const char *inputCfgDir, const char *e
char cfgFile[PATH_MAX + 100] = {0};
taosExpandDir(inputCfgDir, cfgDir, PATH_MAX);
snprintf(cfgFile, sizeof(cfgFile), "%s" TD_DIRSEP "taos.cfg", cfgDir);
if (taosIsDir(cfgDir)) {
snprintf(cfgFile, sizeof(cfgFile), "%s" TD_DIRSEP "taos.cfg", cfgDir);
} else {
tstrncpy(cfgFile, cfgDir, sizeof(cfgDir));
}
if (cfgLoad(pCfg, CFG_STYPE_APOLLO_URL, apolloUrl) != 0) {
uError("failed to load from apollo url:%s since %s", apolloUrl, terrstr());
return -1;
}
if (cfgLoad(pCfg, CFG_STYPE_CFG_FILE, cfgDir) != 0) {
if (cfgLoad(pCfg, CFG_STYPE_CFG_FILE, cfgFile) != 0) {
uInfo("cfg file:%s not read since %s", cfgFile, terrstr());
return 0;
}
if (cfgLoad(pCfg, CFG_STYPE_CFG_FILE, cfgFile) != 0) {
uError("failed to load from cfg file:%s since %s", cfgFile, terrstr());
return -1;
}
if (cfgLoad(pCfg, CFG_STYPE_ENV_FILE, envFile) != 0) {
......
......@@ -190,7 +190,7 @@ int main(int argc, char const *argv[]) {
}
if (dmInitLog() != 0) {
printf("failed to start since init log error\n");
dError("failed to start since init log error");
return -1;
}
......
......@@ -17,7 +17,7 @@
#include "dmImp.h"
static void dmUpdateDnodeCfg(SDnode *pDnode, SDnodeCfg *pCfg) {
if (pDnode->data.dnodeId == 0) {
if (pDnode->data.dnodeId == 0 || pDnode->data.clusterId == 0) {
dInfo("set dnodeId:%d clusterId:%" PRId64, pCfg->dnodeId, pCfg->clusterId);
taosWLockLatch(&pDnode->data.latch);
pDnode->data.dnodeId = pCfg->dnodeId;
......@@ -57,6 +57,7 @@ void dmSendStatusReq(SDnode *pDnode) {
req.dnodeVer = pDnode->data.dnodeVer;
req.dnodeId = pDnode->data.dnodeId;
req.clusterId = pDnode->data.clusterId;
if (req.clusterId == 0) req.dnodeId = 0;
req.rebootTime = pDnode->data.rebootTime;
req.updateTime = pDnode->data.updateTime;
req.numOfCores = tsNumOfCores;
......@@ -145,10 +146,10 @@ int32_t dmProcessCreateNodeReq(SDnode *pDnode, EDndNodeType ntype, SNodeMsg *pMs
dError("node:%s, failed to create since %s", pWrapper->name, terrstr());
} else {
dDebug("node:%s, has been created", pWrapper->name);
(void)dmOpenNode(pWrapper);
pWrapper->required = true;
pWrapper->deployed = true;
pWrapper->procType = pDnode->ptype;
(void)dmOpenNode(pWrapper);
}
taosThreadMutexUnlock(&pDnode->mutex);
......@@ -170,13 +171,13 @@ int32_t dmProcessDropNodeReq(SDnode *pDnode, EDndNodeType ntype, SNodeMsg *pMsg)
dError("node:%s, failed to drop since %s", pWrapper->name, terrstr());
} else {
dDebug("node:%s, has been dropped", pWrapper->name);
pWrapper->required = false;
pWrapper->deployed = false;
}
dmReleaseWrapper(pWrapper);
if (code == 0) {
pWrapper->required = false;
pWrapper->deployed = false;
dmCloseNode(pWrapper);
taosRemoveDir(pWrapper->path);
}
......
......@@ -59,6 +59,10 @@ static inline int32_t dmBuildMsg(SNodeMsg *pMsg, SRpcMsg *pRpc) {
pMsg->clientIp = connInfo.clientIp;
pMsg->clientPort = connInfo.clientPort;
memcpy(&pMsg->rpcMsg, pRpc, sizeof(SRpcMsg));
if ((pRpc->msgType & 1u)) {
assert(pRpc->refId != 0);
}
// assert(pRpc->handle != NULL && pRpc->refId != 0 && pMsg->rpcMsg.refId != 0);
return 0;
}
......@@ -67,12 +71,15 @@ static void dmProcessRpcMsg(SMgmtWrapper *pWrapper, SRpcMsg *pRpc, SEpSet *pEpSe
SNodeMsg *pMsg = NULL;
NodeMsgFp msgFp = NULL;
uint16_t msgType = pRpc->msgType;
bool needRelease = false;
if (pEpSet && pEpSet->numOfEps > 0 && msgType == TDMT_MND_STATUS_RSP) {
dmSetMnodeEpSet(pWrapper->pDnode, pEpSet);
}
if (dmMarkWrapper(pWrapper) != 0) goto _OVER;
needRelease = true;
if ((msgFp = dmGetMsgFp(pWrapper, pRpc)) == NULL) goto _OVER;
if ((pMsg = taosAllocateQitem(sizeof(SNodeMsg))) == NULL) goto _OVER;
if (dmBuildMsg(pMsg, pRpc) != 0) goto _OVER;
......@@ -84,7 +91,7 @@ static void dmProcessRpcMsg(SMgmtWrapper *pWrapper, SRpcMsg *pRpc, SEpSet *pEpSe
dTrace("msg:%p, is created and put into child queue, type:%s handle:%p user:%s", pMsg, TMSG_INFO(msgType),
pRpc->handle, pMsg->user);
code = taosProcPutToChildQ(pWrapper->procObj, pMsg, sizeof(SNodeMsg), pRpc->pCont, pRpc->contLen, pRpc->handle,
PROC_FUNC_REQ);
pRpc->refId, PROC_FUNC_REQ);
} else {
dTrace("msg:%p, should not processed in child process, handle:%p user:%s", pMsg, pRpc->handle, pMsg->user);
ASSERT(1);
......@@ -107,7 +114,7 @@ _OVER:
}
}
SRpcMsg rsp = {.handle = pRpc->handle, .ahandle = pRpc->ahandle, .code = code};
SRpcMsg rsp = {.handle = pRpc->handle, .ahandle = pRpc->ahandle, .code = code, .refId = pRpc->refId};
tmsgSendRsp(&rsp);
}
dTrace("msg:%p, is freed", pMsg);
......@@ -115,7 +122,9 @@ _OVER:
rpcFreeCont(pRpc->pCont);
}
dmReleaseWrapper(pWrapper);
if (needRelease) {
dmReleaseWrapper(pWrapper);
}
}
static void dmProcessMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) {
......@@ -134,7 +143,8 @@ static void dmProcessMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) {
if (pDnode->status != DND_STAT_RUNNING) {
dError("msg:%s ignored since dnode not running, handle:%p app:%p", TMSG_INFO(msgType), pMsg->handle, pMsg->ahandle);
if (isReq) {
SRpcMsg rspMsg = {.handle = pMsg->handle, .code = TSDB_CODE_APP_NOT_READY, .ahandle = pMsg->ahandle};
SRpcMsg rspMsg = {
.handle = pMsg->handle, .code = TSDB_CODE_APP_NOT_READY, .ahandle = pMsg->ahandle, .refId = pMsg->refId};
rpcSendResponse(&rspMsg);
}
rpcFreeCont(pMsg->pCont);
......@@ -143,7 +153,8 @@ static void dmProcessMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) {
if (isReq && pMsg->pCont == NULL) {
dError("req:%s not processed since its empty, handle:%p app:%p", TMSG_INFO(msgType), pMsg->handle, pMsg->ahandle);
SRpcMsg rspMsg = {.handle = pMsg->handle, .code = TSDB_CODE_INVALID_MSG_LEN, .ahandle = pMsg->ahandle};
SRpcMsg rspMsg = {
.handle = pMsg->handle, .code = TSDB_CODE_INVALID_MSG_LEN, .ahandle = pMsg->ahandle, .refId = pMsg->refId};
rpcSendResponse(&rspMsg);
return;
}
......@@ -151,7 +162,8 @@ static void dmProcessMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) {
if (pWrapper == NULL) {
dError("msg:%s not processed since no handle, handle:%p app:%p", TMSG_INFO(msgType), pMsg->handle, pMsg->ahandle);
if (isReq) {
SRpcMsg rspMsg = {.handle = pMsg->handle, .code = TSDB_CODE_MSG_NOT_PROCESSED, .ahandle = pMsg->ahandle};
SRpcMsg rspMsg = {
.handle = pMsg->handle, .code = TSDB_CODE_MSG_NOT_PROCESSED, .ahandle = pMsg->ahandle, .refId = pMsg->refId};
rpcSendResponse(&rspMsg);
}
rpcFreeCont(pMsg->pCont);
......@@ -170,6 +182,9 @@ static void dmProcessMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) {
}
dTrace("msg:%s will be processed by %s, app:%p", TMSG_INFO(msgType), pWrapper->name, pMsg->ahandle);
if (isReq) {
assert(pMsg->refId != 0);
}
dmProcessRpcMsg(pWrapper, pMsg, pEpSet);
}
......@@ -317,7 +332,7 @@ static void dmConsumeChildQueue(SMgmtWrapper *pWrapper, SNodeMsg *pMsg, int16_t
if (code != 0) {
dError("msg:%p, failed to process since code:0x%04x:%s", pMsg, code & 0XFFFF, tstrerror(code));
if (pRpc->msgType & 1U) {
SRpcMsg rsp = {.handle = pRpc->handle, .ahandle = pRpc->ahandle, .code = terrno};
SRpcMsg rsp = {.handle = pRpc->handle, .ahandle = pRpc->ahandle, .code = terrno, .refId = pRpc->refId};
dmSendRsp(pWrapper, &rsp);
}
......@@ -346,7 +361,7 @@ static void dmConsumeParentQueue(SMgmtWrapper *pWrapper, SRpcMsg *pMsg, int16_t
dmSendRpcReq(pWrapper->pDnode, (SEpSet *)((char *)pMsg + sizeof(SRpcMsg)), pMsg);
break;
case PROC_FUNC_RSP:
taosProcRemoveHandle(pWrapper->procObj, pMsg->handle);
pMsg->refId = taosProcRemoveHandle(pWrapper->procObj, pMsg->handle);
dmSendRpcRsp(pWrapper->pDnode, pMsg);
break;
default:
......
......@@ -105,7 +105,7 @@ void dmStopMonitorThread(SDnode *pDnode) {
}
static void dmProcessMgmtQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) {
SDnode *pDnode = pInfo->ahandle;
SDnode * pDnode = pInfo->ahandle;
SRpcMsg *pRpc = &pMsg->rpcMsg;
int32_t code = -1;
dTrace("msg:%p, will be processed in dnode-mgmt queue", pMsg);
......@@ -150,7 +150,7 @@ static void dmProcessMgmtQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) {
if (pRpc->msgType & 1u) {
if (code != 0) code = terrno;
SRpcMsg rsp = {.handle = pRpc->handle, .ahandle = pRpc->ahandle, .code = code};
SRpcMsg rsp = {.handle = pRpc->handle, .ahandle = pRpc->ahandle, .code = code, .refId = pRpc->refId};
rpcSendResponse(&rsp);
}
......
......@@ -177,7 +177,7 @@ void dmProcessServerStatusReq(SDnode *pDnode, SRpcMsg *pReq) {
SServerStatusRsp statusRsp = {0};
dmGetServerStatus(pDnode, &statusRsp);
SRpcMsg rspMsg = {.handle = pReq->handle, .ahandle = pReq->ahandle};
SRpcMsg rspMsg = {.handle = pReq->handle, .ahandle = pReq->ahandle, .refId = pReq->refId};
int32_t rspLen = tSerializeSServerStatusRsp(NULL, 0, &statusRsp);
if (rspLen < 0) {
rspMsg.code = TSDB_CODE_OUT_OF_MEMORY;
......
......@@ -53,7 +53,7 @@ int32_t bmProcessCreateReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
return -1;
}
if (createReq.dnodeId != pDnode->data.dnodeId) {
if (pDnode->data.dnodeId != 0 && createReq.dnodeId != pDnode->data.dnodeId) {
terrno = TSDB_CODE_INVALID_OPTION;
dError("failed to create bnode since %s, input:%d cur:%d", terrstr(), createReq.dnodeId, pDnode->data.dnodeId);
return -1;
......
......@@ -17,7 +17,8 @@
#include "bmInt.h"
static void bmSendErrorRsp(SNodeMsg *pMsg, int32_t code) {
SRpcMsg rpcRsp = {.handle = pMsg->rpcMsg.handle, .ahandle = pMsg->rpcMsg.ahandle, .code = code};
SRpcMsg rpcRsp = {
.handle = pMsg->rpcMsg.handle, .ahandle = pMsg->rpcMsg.ahandle, .code = code, .refId = pMsg->rpcMsg.refId};
tmsgSendRsp(&rpcRsp);
dTrace("msg:%p, is freed", pMsg);
......@@ -38,6 +39,7 @@ static void bmSendErrorRsps(STaosQall *qall, int32_t numOfMsgs, int32_t code) {
static inline void bmSendRsp(SNodeMsg *pMsg, int32_t code) {
SRpcMsg rsp = {.handle = pMsg->rpcMsg.handle,
.ahandle = pMsg->rpcMsg.ahandle,
.refId = pMsg->rpcMsg.refId,
.code = code,
.pCont = pMsg->pRsp,
.contLen = pMsg->rspLen};
......@@ -101,7 +103,7 @@ static void bmProcessWriteQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO
}
int32_t bmProcessWriteMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
SBnodeMgmt *pMgmt = pWrapper->pMgmt;
SBnodeMgmt * pMgmt = pWrapper->pMgmt;
SMultiWorker *pWorker = &pMgmt->writeWorker;
dTrace("msg:%p, put into worker:%s", pMsg, pWorker->name);
......@@ -110,7 +112,7 @@ int32_t bmProcessWriteMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
}
int32_t bmProcessMonitorMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
SBnodeMgmt *pMgmt = pWrapper->pMgmt;
SBnodeMgmt * pMgmt = pWrapper->pMgmt;
SSingleWorker *pWorker = &pMgmt->monitorWorker;
dTrace("msg:%p, put into worker:%s", pMsg, pWorker->name);
......
......@@ -134,9 +134,9 @@ int32_t mmProcessAlterReq(SMnodeMgmt *pMgmt, SNodeMsg *pMsg) {
return -1;
}
if (alterReq.dnodeId != pDnode->data.dnodeId) {
if (pDnode->data.dnodeId != 0 && alterReq.dnodeId != pDnode->data.dnodeId) {
terrno = TSDB_CODE_INVALID_OPTION;
dError("failed to alter mnode since %s, dnodeId:%d input:%d", terrstr(), pDnode->data.dnodeId, alterReq.dnodeId);
dError("failed to alter mnode since %s, input:%d cur:%d", terrstr(), alterReq.dnodeId, pDnode->data.dnodeId);
return -1;
} else {
return mmAlter(pMgmt, &alterReq);
......
......@@ -161,9 +161,9 @@ static int32_t mmOpen(SMgmtWrapper *pWrapper) {
SMnodeOpt option = {0};
if (!deployed) {
dInfo("mnode start to deploy");
if (pWrapper->procType == DND_PROC_CHILD) {
// if (pWrapper->procType == DND_PROC_CHILD) {
pWrapper->pDnode->data.dnodeId = 1;
}
// }
mmBuildOptionForDeploy(pMgmt, &option);
} else {
dInfo("mnode start to open");
......
......@@ -19,6 +19,7 @@
static inline void mmSendRsp(SNodeMsg *pMsg, int32_t code) {
SRpcMsg rsp = {.handle = pMsg->rpcMsg.handle,
.ahandle = pMsg->rpcMsg.ahandle,
.refId = pMsg->rpcMsg.refId,
.code = code,
.pCont = pMsg->pRsp,
.contLen = pMsg->rspLen};
......
......@@ -19,6 +19,7 @@
static inline void qmSendRsp(SNodeMsg *pMsg, int32_t code) {
SRpcMsg rsp = {.handle = pMsg->rpcMsg.handle,
.ahandle = pMsg->rpcMsg.ahandle,
.refId = pMsg->rpcMsg.refId,
.code = code,
.pCont = pMsg->pRsp,
.contLen = pMsg->rspLen};
......
......@@ -19,6 +19,7 @@
static inline void smSendRsp(SNodeMsg *pMsg, int32_t code) {
SRpcMsg rsp = {.handle = pMsg->rpcMsg.handle,
.ahandle = pMsg->rpcMsg.ahandle,
.refId = pMsg->rpcMsg.refId,
.code = code,
.pCont = pMsg->pRsp,
.contLen = pMsg->rspLen};
......@@ -149,7 +150,7 @@ static FORCE_INLINE int32_t smGetSWTypeFromMsg(SRpcMsg *pMsg) {
}
int32_t smProcessMgmtMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
SSnodeMgmt *pMgmt = pWrapper->pMgmt;
SSnodeMgmt * pMgmt = pWrapper->pMgmt;
SMultiWorker *pWorker = taosArrayGetP(pMgmt->uniqueWorkers, 0);
if (pWorker == NULL) {
terrno = TSDB_CODE_INVALID_MSG;
......@@ -162,7 +163,7 @@ int32_t smProcessMgmtMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
}
int32_t smProcessMonitorMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
SSnodeMgmt *pMgmt = pWrapper->pMgmt;
SSnodeMgmt * pMgmt = pWrapper->pMgmt;
SSingleWorker *pWorker = &pMgmt->monitorWorker;
dTrace("msg:%p, put into worker:%s", pMsg, pWorker->name);
......@@ -171,7 +172,7 @@ int32_t smProcessMonitorMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
}
int32_t smProcessUniqueMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
SSnodeMgmt *pMgmt = pWrapper->pMgmt;
SSnodeMgmt * pMgmt = pWrapper->pMgmt;
int32_t index = smGetSWIdFromMsg(&pMsg->rpcMsg);
SMultiWorker *pWorker = taosArrayGetP(pMgmt->uniqueWorkers, index);
if (pWorker == NULL) {
......@@ -185,7 +186,7 @@ int32_t smProcessUniqueMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
}
int32_t smProcessSharedMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
SSnodeMgmt *pMgmt = pWrapper->pMgmt;
SSnodeMgmt * pMgmt = pWrapper->pMgmt;
SSingleWorker *pWorker = &pMgmt->sharedWorker;
dTrace("msg:%p, put into worker:%s", pMsg, pWorker->name);
......
......@@ -23,6 +23,7 @@
static inline void vmSendRsp(SMgmtWrapper *pWrapper, SNodeMsg *pMsg, int32_t code) {
SRpcMsg rsp = {.handle = pMsg->rpcMsg.handle,
.ahandle = pMsg->rpcMsg.ahandle,
.refId = pMsg->rpcMsg.refId,
.code = code,
.pCont = pMsg->pRsp,
.contLen = pMsg->rspLen};
......@@ -126,6 +127,7 @@ static void vmProcessWriteQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO
rsp.code = 0;
rsp.handle = pRpc->handle;
rsp.ahandle = pRpc->ahandle;
rsp.refId = pRpc->refId;
int32_t code = vnodeProcessWriteReq(pVnode->pImpl, pRpc, version++, &rsp);
tmsgSendRsp(&rsp);
......@@ -134,13 +136,14 @@ static void vmProcessWriteQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO
// sync integration response
for (int i = 0; i < taosArrayGetSize(pArray); i++) {
SNodeMsg *pMsg;
SRpcMsg *pRpc;
SRpcMsg * pRpc;
pMsg = *(SNodeMsg **)taosArrayGet(pArray, i);
pRpc = &pMsg->rpcMsg;
rsp.ahandle = pRpc->ahandle;
rsp.handle = pRpc->handle;
rsp.refId = pRpc->refId;
rsp.pCont = NULL;
rsp.contLen = 0;
......@@ -172,11 +175,9 @@ static void vmProcessWriteQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO
static void vmProcessApplyQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
SVnodeObj *pVnode = pInfo->ahandle;
SNodeMsg *pMsg = NULL;
SNodeMsg * pMsg = NULL;
SRpcMsg rsp;
// static int64_t version = 0;
for (int32_t i = 0; i < numOfMsgs; ++i) {
#if 1
// sync integration
......@@ -208,6 +209,7 @@ static void vmProcessApplyQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO
if (pMsg->rpcMsg.handle != NULL && pMsg->rpcMsg.ahandle != NULL) {
rsp.ahandle = pMsg->rpcMsg.ahandle;
rsp.handle = pMsg->rpcMsg.handle;
rsp.refId = pMsg->rpcMsg.refId;
tmsgSendRsp(&rsp);
}
#endif
......@@ -216,7 +218,7 @@ static void vmProcessApplyQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO
static void vmProcessSyncQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
SVnodeObj *pVnode = pInfo->ahandle;
SNodeMsg *pMsg = NULL;
SNodeMsg * pMsg = NULL;
for (int32_t i = 0; i < numOfMsgs; ++i) {
taosGetQitem(qall, (void **)&pMsg);
......@@ -229,7 +231,7 @@ static void vmProcessSyncQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOf
static void vmProcessMergeQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
SVnodeObj *pVnode = pInfo->ahandle;
SNodeMsg *pMsg = NULL;
SNodeMsg * pMsg = NULL;
for (int32_t i = 0; i < numOfMsgs; ++i) {
taosGetQitem(qall, (void **)&pMsg);
......@@ -246,7 +248,7 @@ static void vmProcessMergeQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO
}
static int32_t vmPutNodeMsgToQueue(SVnodesMgmt *pMgmt, SNodeMsg *pMsg, EQueueType qtype) {
SRpcMsg *pRpc = &pMsg->rpcMsg;
SRpcMsg * pRpc = &pMsg->rpcMsg;
SMsgHead *pHead = pRpc->pCont;
pHead->contLen = ntohl(pHead->contLen);
pHead->vgId = ntohl(pHead->vgId);
......@@ -315,7 +317,7 @@ int32_t vmProcessMergeMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
}
int32_t vmProcessMgmtMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
SVnodesMgmt *pMgmt = pWrapper->pMgmt;
SVnodesMgmt * pMgmt = pWrapper->pMgmt;
SSingleWorker *pWorker = &pMgmt->mgmtWorker;
dTrace("msg:%p, will be written to vnode-mgmt queue, worker:%s", pMsg, pWorker->name);
taosWriteQitem(pWorker->queue, pMsg);
......@@ -323,7 +325,7 @@ int32_t vmProcessMgmtMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
}
int32_t vmProcessMonitorMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
SVnodesMgmt *pMgmt = pWrapper->pMgmt;
SVnodesMgmt * pMgmt = pWrapper->pMgmt;
SSingleWorker *pWorker = &pMgmt->monitorWorker;
dTrace("msg:%p, put into worker:%s", pMsg, pWorker->name);
......@@ -333,7 +335,7 @@ int32_t vmProcessMonitorMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
static int32_t vmPutRpcMsgToQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc, EQueueType qtype) {
SVnodesMgmt *pMgmt = pWrapper->pMgmt;
SMsgHead *pHead = pRpc->pCont;
SMsgHead * pHead = pRpc->pCont;
SVnodeObj *pVnode = vmAcquireVnode(pMgmt, pHead->vgId);
if (pVnode == NULL) return -1;
......@@ -346,6 +348,7 @@ static int32_t vmPutRpcMsgToQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc, EQueueT
} else {
dTrace("msg:%p, is created, type:%s", pMsg, TMSG_INFO(pRpc->msgType));
pMsg->rpcMsg = *pRpc;
// if (pMsg->rpcMsg.handle != NULL) assert(pMsg->rpcMsg.refId != 0);
switch (qtype) {
case QUERY_QUEUE:
dTrace("msg:%p, will be put into vnode-query queue", pMsg);
......
......@@ -135,6 +135,7 @@ typedef struct {
int32_t failedTimes;
void* rpcHandle;
void* rpcAHandle;
int64_t rpcRefId;
void* rpcRsp;
int32_t rpcRspLen;
SArray* redoLogs;
......
......@@ -193,9 +193,9 @@ TRANS_ENCODE_OVER:
static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
SSdbRow *pRow = NULL;
STrans *pTrans = NULL;
char *pData = NULL;
SSdbRow * pRow = NULL;
STrans * pTrans = NULL;
char * pData = NULL;
int32_t dataLen = 0;
int8_t sver = 0;
int32_t redoLogNum = 0;
......@@ -456,7 +456,7 @@ static int32_t mndTransActionUpdate(SSdb *pSdb, STrans *pOld, STrans *pNew) {
}
static STrans *mndAcquireTrans(SMnode *pMnode, int32_t transId) {
SSdb *pSdb = pMnode->pSdb;
SSdb * pSdb = pMnode->pSdb;
STrans *pTrans = sdbAcquire(pSdb, SDB_TRANS, &transId);
if (pTrans == NULL) {
terrno = TSDB_CODE_MND_TRANS_NOT_EXIST;
......@@ -484,6 +484,7 @@ STrans *mndTransCreate(SMnode *pMnode, ETrnPolicy policy, ETrnType type, const S
pTrans->createdTime = taosGetTimestampMs();
pTrans->rpcHandle = pReq->handle;
pTrans->rpcAHandle = pReq->ahandle;
pTrans->rpcRefId = pReq->refId;
pTrans->redoLogs = taosArrayInit(MND_TRANS_ARRAY_SIZE, sizeof(void *));
pTrans->undoLogs = taosArrayInit(MND_TRANS_ARRAY_SIZE, sizeof(void *));
pTrans->commitLogs = taosArrayInit(MND_TRANS_ARRAY_SIZE, sizeof(void *));
......@@ -625,7 +626,7 @@ static int32_t mndCheckTransCanBeStartedInParallel(SMnode *pMnode, STrans *pNewT
if (mndIsBasicTrans(pNewTrans)) return 0;
STrans *pTrans = NULL;
void *pIter = NULL;
void * pIter = NULL;
int32_t code = 0;
while (1) {
......@@ -703,6 +704,7 @@ int32_t mndTransPrepare(SMnode *pMnode, STrans *pTrans) {
pNew->rpcHandle = pTrans->rpcHandle;
pNew->rpcAHandle = pTrans->rpcAHandle;
pNew->rpcRefId = pTrans->rpcRefId;
pNew->rpcRsp = pTrans->rpcRsp;
pNew->rpcRspLen = pTrans->rpcRspLen;
pTrans->rpcRsp = NULL;
......@@ -767,6 +769,7 @@ static void mndTransSendRpcRsp(SMnode *pMnode, STrans *pTrans) {
SRpcMsg rspMsg = {.handle = pTrans->rpcHandle,
.code = pTrans->code,
.ahandle = pTrans->rpcAHandle,
.refId = pTrans->rpcRefId,
.pCont = rpcCont,
.contLen = pTrans->rpcRspLen};
tmsgSendRsp(&rspMsg);
......@@ -827,7 +830,7 @@ HANDLE_ACTION_RSP_OVER:
}
static int32_t mndTransExecuteLogs(SMnode *pMnode, SArray *pArray) {
SSdb *pSdb = pMnode->pSdb;
SSdb * pSdb = pMnode->pSdb;
int32_t arraySize = taosArrayGetSize(pArray);
if (arraySize == 0) return 0;
......@@ -1202,11 +1205,11 @@ static int32_t mndKillTrans(SMnode *pMnode, STrans *pTrans) {
}
static int32_t mndProcessKillTransReq(SNodeMsg *pReq) {
SMnode *pMnode = pReq->pNode;
SMnode * pMnode = pReq->pNode;
SKillTransReq killReq = {0};
int32_t code = -1;
SUserObj *pUser = NULL;
STrans *pTrans = NULL;
SUserObj * pUser = NULL;
STrans * pTrans = NULL;
if (tDeserializeSKillTransReq(pReq->rpcMsg.pCont, pReq->rpcMsg.contLen, &killReq) != 0) {
terrno = TSDB_CODE_INVALID_MSG;
......@@ -1246,7 +1249,7 @@ KILL_OVER:
void mndTransPullup(SMnode *pMnode) {
STrans *pTrans = NULL;
void *pIter = NULL;
void * pIter = NULL;
while (1) {
pIter = sdbFetch(pMnode->pSdb, SDB_TRANS, pIter, (void **)&pTrans);
......@@ -1261,11 +1264,11 @@ void mndTransPullup(SMnode *pMnode) {
static int32_t mndRetrieveTrans(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
SMnode *pMnode = pReq->pNode;
SSdb *pSdb = pMnode->pSdb;
SSdb * pSdb = pMnode->pSdb;
int32_t numOfRows = 0;
STrans *pTrans = NULL;
int32_t cols = 0;
char *pWrite;
char * pWrite;
while (numOfRows < rows) {
pShow->pIter = sdbFetch(pSdb, SDB_TRANS, pShow->pIter, (void **)&pTrans);
......
......@@ -22,21 +22,21 @@ int vnodeQueryOpen(SVnode *pVnode) {
void vnodeQueryClose(SVnode *pVnode) { qWorkerDestroy((void **)&pVnode->pQuery); }
int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg) {
STbCfg *pTbCfg = NULL;
STbCfg *pStbCfg = NULL;
STbCfg * pTbCfg = NULL;
STbCfg * pStbCfg = NULL;
tb_uid_t uid;
int32_t nCols;
int32_t nTagCols;
SSchemaWrapper *pSW = NULL;
STableMetaRsp *pTbMetaMsg = NULL;
STableMetaRsp * pTbMetaMsg = NULL;
STableMetaRsp metaRsp = {0};
SSchema *pTagSchema;
SSchema * pTagSchema;
SRpcMsg rpcMsg;
int msgLen = 0;
int32_t code = 0;
char tableFName[TSDB_TABLE_FNAME_LEN];
int32_t rspLen = 0;
void *pRsp = NULL;
void * pRsp = NULL;
STableInfoReq infoReq = {0};
if (tDeserializeSTableInfoReq(pMsg->pCont, pMsg->contLen, &infoReq) != 0) {
......@@ -142,6 +142,7 @@ _exit:
rpcMsg.handle = pMsg->handle;
rpcMsg.ahandle = pMsg->ahandle;
rpcMsg.refId = pMsg->refId;
rpcMsg.pCont = pRsp;
rpcMsg.contLen = rspLen;
rpcMsg.code = code;
......
......@@ -113,7 +113,7 @@ void vnodeSyncCommitCb(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cb
pFsm, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state), beginIndex);
syncRpcMsgLog2(logBuf, (SRpcMsg *)pMsg);
SVnode *pVnode = (SVnode *)(pFsm->data);
SVnode * pVnode = (SVnode *)(pFsm->data);
SyncApplyMsg *pSyncApplyMsg = syncApplyMsgBuild2(pMsg, pVnode->config.vgId, &cbMeta);
SRpcMsg applyMsg;
syncApplyMsg2RpcMsg(pSyncApplyMsg, &applyMsg);
......@@ -133,6 +133,7 @@ void vnodeSyncCommitCb(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cb
if (ret == 1 && cbMeta.state == TAOS_SYNC_STATE_LEADER) {
applyMsg.handle = saveRpcMsg.handle;
applyMsg.ahandle = saveRpcMsg.ahandle;
applyMsg.refId = saveRpcMsg.refId;
} else {
applyMsg.handle = NULL;
applyMsg.ahandle = NULL;
......
......@@ -344,11 +344,6 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
EXPLAIN_ROW_APPEND(EXPLAIN_WIDTH_FORMAT, pTagScanNode->node.pOutputDataBlockDesc->totalRowSize);
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
EXPLAIN_ROW_APPEND(EXPLAIN_LOOPS_FORMAT, pTagScanNode->count);
if (pTagScanNode->reverse) {
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
EXPLAIN_ROW_APPEND(EXPLAIN_REVERSE_FORMAT, pTagScanNode->reverse);
}
EXPLAIN_ROW_APPEND(EXPLAIN_RIGHT_PARENTHESIS_FORMAT);
EXPLAIN_ROW_END();
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level));
......@@ -361,10 +356,6 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i
EXPLAIN_ROW_END();
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1));
EXPLAIN_ROW_NEW(level + 1, EXPLAIN_ORDER_FORMAT, EXPLAIN_ORDER_STRING(pTagScanNode->order));
EXPLAIN_ROW_END();
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1));
if (pResNode->pExecInfo) {
QRY_ERR_RET(qExplainBufAppendVerboseExecInfo(pResNode->pExecInfo, tbuf, &tlen));
if (tlen) {
......@@ -388,11 +379,6 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
EXPLAIN_ROW_APPEND(EXPLAIN_WIDTH_FORMAT, pTblScanNode->scan.node.pOutputDataBlockDesc->totalRowSize);
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
EXPLAIN_ROW_APPEND(EXPLAIN_LOOPS_FORMAT, pTblScanNode->scan.count);
if (pTblScanNode->scan.reverse) {
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
EXPLAIN_ROW_APPEND(EXPLAIN_REVERSE_FORMAT, pTblScanNode->scan.reverse);
}
EXPLAIN_ROW_APPEND(EXPLAIN_RIGHT_PARENTHESIS_FORMAT);
EXPLAIN_ROW_END();
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level));
......@@ -405,10 +391,6 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i
EXPLAIN_ROW_END();
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1));
EXPLAIN_ROW_NEW(level + 1, EXPLAIN_ORDER_FORMAT, EXPLAIN_ORDER_STRING(pTblScanNode->scan.order));
EXPLAIN_ROW_END();
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1));
EXPLAIN_ROW_NEW(level + 1, EXPLAIN_TIMERANGE_FORMAT, pTblScanNode->scanRange.skey, pTblScanNode->scanRange.ekey);
EXPLAIN_ROW_END();
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1));
......@@ -434,11 +416,6 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
EXPLAIN_ROW_APPEND(EXPLAIN_WIDTH_FORMAT, pSTblScanNode->scan.node.pOutputDataBlockDesc->totalRowSize);
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
EXPLAIN_ROW_APPEND(EXPLAIN_LOOPS_FORMAT, pSTblScanNode->scan.count);
if (pSTblScanNode->scan.reverse) {
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
EXPLAIN_ROW_APPEND(EXPLAIN_REVERSE_FORMAT, pSTblScanNode->scan.reverse);
}
EXPLAIN_ROW_APPEND(EXPLAIN_RIGHT_PARENTHESIS_FORMAT);
EXPLAIN_ROW_END();
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level));
......@@ -451,10 +428,6 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i
EXPLAIN_ROW_END();
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1));
EXPLAIN_ROW_NEW(level + 1, EXPLAIN_ORDER_FORMAT, EXPLAIN_ORDER_STRING(pSTblScanNode->scan.order));
EXPLAIN_ROW_END();
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1));
if (pSTblScanNode->scan.node.pConditions) {
EXPLAIN_ROW_NEW(level + 1, EXPLAIN_FILTER_FORMAT);
QRY_ERR_RET(nodesNodeToSQL(pSTblScanNode->scan.node.pConditions, tbuf + VARSTR_HEADER_SIZE, TSDB_EXPLAIN_RESULT_ROW_SIZE, &tlen));
......
......@@ -40,6 +40,11 @@ bool getMinmaxFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
int32_t minFunction(SqlFunctionCtx* pCtx);
int32_t maxFunction(SqlFunctionCtx *pCtx);
bool getAvgFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
bool avgFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo);
int32_t avgFunction(SqlFunctionCtx* pCtx);
int32_t avgFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t slotId);
bool getStddevFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
bool stddevFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo);
int32_t stddevFunction(SqlFunctionCtx* pCtx);
......
......@@ -104,7 +104,7 @@ static int32_t translateCount(SFunctionNode* pFunc, char* pErrBuf, int32_t len)
if (1 != LIST_LENGTH(pFunc->pParameterList)) {
return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName);
}
pFunc->node.resType = (SDataType){.bytes = sizeof(int64_t), .type = TSDB_DATA_TYPE_BIGINT};
pFunc->node.resType = (SDataType){.bytes = tDataTypes[TSDB_DATA_TYPE_BIGINT].bytes, .type = TSDB_DATA_TYPE_BIGINT};
return TSDB_CODE_SUCCESS;
}
......@@ -479,6 +479,16 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.processFunc = stddevFunction,
.finalizeFunc = stddevFinalize
},
{
.name = "avg",
.type = FUNCTION_TYPE_AVG,
.classification = FUNC_MGT_AGG_FUNC,
.translateFunc = translateInNumOutDou,
.getEnvFunc = getAvgFuncEnv,
.initFunc = avgFunctionSetup,
.processFunc = avgFunction,
.finalizeFunc = avgFinalize
},
{
.name = "percentile",
.type = FUNCTION_TYPE_PERCENTILE,
......
......@@ -20,6 +20,59 @@
#include "tdatablock.h"
#include "tpercentile.h"
typedef struct SSumRes {
union {
int64_t isum;
uint64_t usum;
double dsum;
};
} SSumRes;
typedef struct SAvgRes {
double result;
SSumRes sum;
int64_t count;
} SAvgRes;
typedef struct STopBotResItem {
SVariant v;
uint64_t uid; // it is a table uid, used to extract tag data during building of the final result for the tag data
struct {
int32_t pageId;
int32_t offset;
} tuplePos; // tuple data of this chosen row
} STopBotResItem;
typedef struct STopBotRes {
int32_t pageId;
// int32_t num;
STopBotResItem *pItems;
} STopBotRes;
typedef struct SStddevRes {
double result;
int64_t count;
union {double quadraticDSum; int64_t quadraticISum;};
union {double dsum; int64_t isum;};
} SStddevRes;
typedef struct SPercentileInfo {
double result;
tMemBucket *pMemBucket;
int32_t stage;
double minval;
double maxval;
int64_t numOfElems;
} SPercentileInfo;
typedef struct SDiffInfo {
bool hasPrev;
bool includeNull;
bool ignoreNegative;
bool firstOutput;
union { int64_t i64; double d64;} prev;
} SDiffInfo;
#define SET_VAL(_info, numOfElem, res) \
do { \
if ((numOfElem) <= 0) { \
......@@ -28,13 +81,50 @@
(_info)->numOfRes = (res); \
} while (0)
typedef struct SSumRes {
union {
int64_t isum;
uint64_t usum;
double dsum;
};
} SSumRes;
#define GET_TS_LIST(x) ((TSKEY*)((x)->ptsList))
#define GET_TS_DATA(x, y) (GET_TS_LIST(x)[(y)])
#define DO_UPDATE_TAG_COLUMNS_WITHOUT_TS(ctx) \
do { \
for (int32_t _i = 0; _i < (ctx)->tagInfo.numOfTagCols; ++_i) { \
SqlFunctionCtx *__ctx = (ctx)->tagInfo.pTagCtxList[_i]; \
__ctx->fpSet.process(__ctx); \
} \
} while (0);
#define DO_UPDATE_SUBSID_RES(ctx, ts) \
do { \
for (int32_t _i = 0; _i < (ctx)->subsidiaryRes.numOfCols; ++_i) { \
SqlFunctionCtx *__ctx = (ctx)->subsidiaryRes.pCtx[_i]; \
if (__ctx->functionId == FUNCTION_TS_DUMMY) { \
__ctx->tag.i = (ts); \
__ctx->tag.nType = TSDB_DATA_TYPE_BIGINT; \
} \
__ctx->fpSet.process(__ctx); \
} \
} while (0)
#define UPDATE_DATA(ctx, left, right, num, sign, _ts) \
do { \
if (((left) < (right)) ^ (sign)) { \
(left) = (right); \
DO_UPDATE_SUBSID_RES(ctx, _ts); \
(num) += 1; \
} \
} while (0)
#define LOOPCHECK_N(val, _col, ctx, _t, _nrow, _start, sign, num) \
do { \
_t *d = (_t *)((_col)->pData); \
for (int32_t i = (_start); i < (_nrow) + (_start); ++i) { \
if (((_col)->hasNull) && colDataIsNull_f((_col)->nullbitmap, i)) { \
continue; \
} \
TSKEY ts = (ctx)->ptsList != NULL ? GET_TS_DATA(ctx, i) : 0; \
UPDATE_DATA(ctx, val, d[i], num, sign, ts); \
} \
} while (0)
bool functionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo) {
if (pResultInfo->initialized) {
......@@ -135,7 +225,7 @@ int32_t sumFunction(SqlFunctionCtx *pCtx) {
int32_t type = pInput->pData[0]->info.type;
SSumRes* pSumRes = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
if (pInput->colDataAggIsSet) {
numOfElem = pInput->numOfRows - pAgg->numOfNull;
ASSERT(numOfElem >= 0);
......@@ -190,6 +280,145 @@ bool getSumFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) {
return true;
}
bool getAvgFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) {
pEnv->calcMemSize = sizeof(double);
return true;
}
bool avgFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo) {
if (!functionSetup(pCtx, pResultInfo)) {
return false;
}
SAvgRes* pRes = GET_ROWCELL_INTERBUF(pResultInfo);
memset(pRes, 0, sizeof(SAvgRes));
return true;
}
int32_t avgFunction(SqlFunctionCtx* pCtx) {
int32_t numOfElem = 0;
// Only the pre-computing information loaded and actual data does not loaded
SInputColumnInfoData* pInput = &pCtx->input;
int32_t type = pInput->pData[0]->info.type;
SAvgRes* pAvgRes = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
// computing based on the true data block
SColumnInfoData* pCol = pInput->pData[0];
int32_t start = pInput->startRowIndex;
int32_t numOfRows = pInput->numOfRows;
switch (type) {
case TSDB_DATA_TYPE_TINYINT: {
int8_t* plist = (int8_t*)pCol->pData;
for (int32_t i = start; i < numOfRows + pInput->startRowIndex; ++i) {
if (pCol->hasNull && colDataIsNull_f(pCol->nullbitmap, i)) {
continue;
}
numOfElem += 1;
pAvgRes->count += 1;
pAvgRes->sum.isum += plist[i];
}
break;
}
case TSDB_DATA_TYPE_SMALLINT: {
int16_t* plist = (int16_t*)pCol->pData;
for (int32_t i = start; i < numOfRows + pInput->startRowIndex; ++i) {
if (pCol->hasNull && colDataIsNull_f(pCol->nullbitmap, i)) {
continue;
}
numOfElem += 1;
pAvgRes->count += 1;
pAvgRes->sum.isum += plist[i];
}
break;
}
case TSDB_DATA_TYPE_INT: {
int32_t* plist = (int32_t*)pCol->pData;
for (int32_t i = start; i < numOfRows + pInput->startRowIndex; ++i) {
if (pCol->hasNull && colDataIsNull_f(pCol->nullbitmap, i)) {
continue;
}
numOfElem += 1;
pAvgRes->count += 1;
pAvgRes->sum.isum += plist[i];
}
break;
}
case TSDB_DATA_TYPE_BIGINT: {
int64_t* plist = (int64_t*)pCol->pData;
for (int32_t i = start; i < numOfRows + pInput->startRowIndex; ++i) {
if (pCol->hasNull && colDataIsNull_f(pCol->nullbitmap, i)) {
continue;
}
numOfElem += 1;
pAvgRes->count += 1;
pAvgRes->sum.isum += plist[i];
}
break;
}
case TSDB_DATA_TYPE_FLOAT: {
float* plist = (float*)pCol->pData;
for (int32_t i = start; i < numOfRows + pInput->startRowIndex; ++i) {
if (pCol->hasNull && colDataIsNull_f(pCol->nullbitmap, i)) {
continue;
}
numOfElem += 1;
pAvgRes->count += 1;
pAvgRes->sum.dsum += plist[i];
}
break;
}
case TSDB_DATA_TYPE_DOUBLE: {
double* plist = (double*)pCol->pData;
for (int32_t i = start; i < numOfRows + pInput->startRowIndex; ++i) {
if (pCol->hasNull && colDataIsNull_f(pCol->nullbitmap, i)) {
continue;
}
numOfElem += 1;
pAvgRes->count += 1;
pAvgRes->sum.dsum += plist[i];
}
break;
}
default:
break;
}
// data in the check operation are all null, not output
SET_VAL(GET_RES_INFO(pCtx), numOfElem, 1);
return TSDB_CODE_SUCCESS;
}
int32_t avgFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t slotId) {
SInputColumnInfoData* pInput = &pCtx->input;
int32_t type = pInput->pData[0]->info.type;
SAvgRes* pAvgRes = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
if (IS_INTEGER_TYPE(type)) {
pAvgRes->result = pAvgRes->sum.isum / ((double) pAvgRes->count);
} else {
pAvgRes->result = pAvgRes->sum.dsum / ((double) pAvgRes->count);
}
return functionFinalize(pCtx, pBlock, slotId);
}
EFuncDataRequired statisDataRequired(SFunctionNode* pFunc, STimeWindow* pTimeWindow){
return FUNC_DATA_REQUIRED_STATIS_LOAD;
}
......@@ -292,49 +521,6 @@ bool getMinmaxFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) {
return true;
}
#define GET_TS_LIST(x) ((TSKEY*)((x)->ptsList))
#define GET_TS_DATA(x, y) (GET_TS_LIST(x)[(y)])
#define DO_UPDATE_TAG_COLUMNS_WITHOUT_TS(ctx) \
do { \
for (int32_t _i = 0; _i < (ctx)->tagInfo.numOfTagCols; ++_i) { \
SqlFunctionCtx *__ctx = (ctx)->tagInfo.pTagCtxList[_i]; \
__ctx->fpSet.process(__ctx); \
} \
} while (0);
#define DO_UPDATE_SUBSID_RES(ctx, ts) \
do { \
for (int32_t _i = 0; _i < (ctx)->subsidiaryRes.numOfCols; ++_i) { \
SqlFunctionCtx *__ctx = (ctx)->subsidiaryRes.pCtx[_i]; \
if (__ctx->functionId == FUNCTION_TS_DUMMY) { \
__ctx->tag.i = (ts); \
__ctx->tag.nType = TSDB_DATA_TYPE_BIGINT; \
} \
__ctx->fpSet.process(__ctx); \
} \
} while (0)
#define UPDATE_DATA(ctx, left, right, num, sign, _ts) \
do { \
if (((left) < (right)) ^ (sign)) { \
(left) = (right); \
DO_UPDATE_SUBSID_RES(ctx, _ts); \
(num) += 1; \
} \
} while (0)
#define LOOPCHECK_N(val, _col, ctx, _t, _nrow, _start, sign, num) \
do { \
_t *d = (_t *)((_col)->pData); \
for (int32_t i = (_start); i < (_nrow) + (_start); ++i) { \
if (((_col)->hasNull) && colDataIsNull_f((_col)->nullbitmap, i)) { \
continue; \
} \
TSKEY ts = (ctx)->ptsList != NULL ? GET_TS_DATA(ctx, i) : 0; \
UPDATE_DATA(ctx, val, d[i], num, sign, ts); \
} \
} while (0)
int32_t doMinMaxHelper(SqlFunctionCtx *pCtx, int32_t isMinFunc) {
int32_t numOfElems = 0;
......@@ -479,13 +665,6 @@ int32_t maxFunction(SqlFunctionCtx *pCtx) {
return TSDB_CODE_SUCCESS;
}
typedef struct SStddevRes {
double result;
int64_t count;
union {double quadraticDSum; int64_t quadraticISum;};
union {double dsum; int64_t isum;};
} SStddevRes;
bool getStddevFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) {
pEnv->calcMemSize = sizeof(SStddevRes);
return true;
......@@ -588,8 +767,8 @@ int32_t stddevFunction(SqlFunctionCtx* pCtx) {
numOfElem += 1;
pStddevRes->count += 1;
pStddevRes->isum += plist[i];
pStddevRes->quadraticISum += plist[i] * plist[i];
pStddevRes->dsum += plist[i];
pStddevRes->quadraticDSum += plist[i] * plist[i];
}
break;
}
......@@ -603,8 +782,8 @@ int32_t stddevFunction(SqlFunctionCtx* pCtx) {
numOfElem += 1;
pStddevRes->count += 1;
pStddevRes->isum += plist[i];
pStddevRes->quadraticISum += plist[i] * plist[i];
pStddevRes->dsum += plist[i];
pStddevRes->quadraticDSum += plist[i] * plist[i];
}
break;
}
......@@ -619,21 +798,21 @@ int32_t stddevFunction(SqlFunctionCtx* pCtx) {
}
int32_t stddevFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t slotId) {
SInputColumnInfoData* pInput = &pCtx->input;
int32_t type = pInput->pData[0]->info.type;
SStddevRes* pStddevRes = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
double avg = pStddevRes->isum / ((double) pStddevRes->count);
pStddevRes->result = sqrt(pStddevRes->quadraticISum/((double)pStddevRes->count) - avg*avg);
double avg;
if (IS_INTEGER_TYPE(type)) {
avg = pStddevRes->isum / ((double) pStddevRes->count);
pStddevRes->result = sqrt(pStddevRes->quadraticISum/((double)pStddevRes->count) - avg*avg);
} else {
avg = pStddevRes->dsum / ((double) pStddevRes->count);
pStddevRes->result = sqrt(pStddevRes->quadraticDSum/((double)pStddevRes->count) - avg*avg);
}
return functionFinalize(pCtx, pBlock, slotId);
}
typedef struct SPercentileInfo {
double result;
tMemBucket *pMemBucket;
int32_t stage;
double minval;
double maxval;
int64_t numOfElems;
} SPercentileInfo;
bool getPercentileFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) {
pEnv->calcMemSize = sizeof(SPercentileInfo);
return true;
......@@ -928,14 +1107,6 @@ int32_t lastFunction(SqlFunctionCtx *pCtx) {
return TSDB_CODE_SUCCESS;
}
typedef struct SDiffInfo {
bool hasPrev;
bool includeNull;
bool ignoreNegative;
bool firstOutput;
union { int64_t i64; double d64;} prev;
} SDiffInfo;
bool getDiffFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) {
pEnv->calcMemSize = sizeof(SDiffInfo);
return true;
......@@ -1168,21 +1339,6 @@ int32_t diffFunction(SqlFunctionCtx *pCtx) {
}
}
typedef struct STopBotResItem {
SVariant v;
uint64_t uid; // it is a table uid, used to extract tag data during building of the final result for the tag data
struct {
int32_t pageId;
int32_t offset;
} tuplePos; // tuple data of this chosen row
} STopBotResItem;
typedef struct STopBotRes {
int32_t pageId;
// int32_t num;
STopBotResItem *pItems;
} STopBotRes;
bool getTopBotFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) {
SValueNode* pkNode = (SValueNode*) nodesListGetNode(pFunc->pParameterList, 1);
pEnv->calcMemSize = sizeof(STopBotRes) + pkNode->datum.i * sizeof(STopBotResItem);
......@@ -1335,4 +1491,4 @@ int32_t topBotFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t slotId
return pEntryInfo->numOfRes;
// return functionFinalize(pCtx, pBlock, slotId);
}
\ No newline at end of file
}
......@@ -671,9 +671,6 @@ static int32_t jsonToName(const SJson* pJson, void* pObj) {
static const char* jkScanPhysiPlanScanCols = "ScanCols";
static const char* jkScanPhysiPlanTableId = "TableId";
static const char* jkScanPhysiPlanTableType = "TableType";
static const char* jkScanPhysiPlanScanOrder = "ScanOrder";
static const char* jkScanPhysiPlanScanCount = "ScanCount";
static const char* jkScanPhysiPlanReverseScanCount = "ReverseScanCount";
static const char* jkScanPhysiPlanTableName = "TableName";
static int32_t physiScanNodeToJson(const void* pObj, SJson* pJson) {
......@@ -689,15 +686,6 @@ static int32_t physiScanNodeToJson(const void* pObj, SJson* pJson) {
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkScanPhysiPlanTableType, pNode->tableType);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkScanPhysiPlanScanOrder, pNode->order);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkScanPhysiPlanScanCount, pNode->count);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkScanPhysiPlanReverseScanCount, pNode->reverse);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddObject(pJson, jkScanPhysiPlanTableName, nameToJson, &pNode->tableName);
}
......@@ -718,15 +706,6 @@ static int32_t jsonToPhysiScanNode(const SJson* pJson, void* pObj) {
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetTinyIntValue(pJson, jkScanPhysiPlanTableType, &pNode->tableType);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetIntValue(pJson, jkScanPhysiPlanScanOrder, &pNode->order);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetIntValue(pJson, jkScanPhysiPlanScanCount, &pNode->count);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetIntValue(pJson, jkScanPhysiPlanReverseScanCount, &pNode->reverse);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonToObject(pJson, jkScanPhysiPlanTableName, jsonToName, &pNode->tableName);
}
......@@ -742,7 +721,8 @@ static int32_t jsonToPhysiTagScanNode(const SJson* pJson, void* pObj) {
return jsonToPhysiScanNode(pJson, pObj);
}
static const char* jkTableScanPhysiPlanScanFlag = "ScanFlag";
static const char* jkTableScanPhysiPlanScanCount = "ScanCount";
static const char* jkTableScanPhysiPlanReverseScanCount = "ReverseScanCount";
static const char* jkTableScanPhysiPlanStartKey = "StartKey";
static const char* jkTableScanPhysiPlanEndKey = "EndKey";
static const char* jkTableScanPhysiPlanRatio = "Ratio";
......@@ -759,7 +739,10 @@ static int32_t physiTableScanNodeToJson(const void* pObj, SJson* pJson) {
int32_t code = physiScanNodeToJson(pObj, pJson);
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkTableScanPhysiPlanScanFlag, pNode->scanFlag);
code = tjsonAddIntegerToObject(pJson, jkTableScanPhysiPlanScanCount, pNode->scanSeq[0]);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkTableScanPhysiPlanReverseScanCount, pNode->scanSeq[1]);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkTableScanPhysiPlanStartKey, pNode->scanRange.skey);
......@@ -800,7 +783,10 @@ static int32_t jsonToPhysiTableScanNode(const SJson* pJson, void* pObj) {
int32_t code = jsonToPhysiScanNode(pJson, pObj);
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetUTinyIntValue(pJson, jkTableScanPhysiPlanScanFlag, &pNode->scanFlag);
code = tjsonGetUTinyIntValue(pJson, jkTableScanPhysiPlanScanCount, &pNode->scanSeq[0]);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetUTinyIntValue(pJson, jkTableScanPhysiPlanReverseScanCount, &pNode->scanSeq[1]);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetBigIntValue(pJson, jkTableScanPhysiPlanStartKey, &pNode->scanRange.skey);
......
......@@ -611,6 +611,7 @@ function_expression(A) ::= function_name(B) NK_LP expression_list(C) NK_RP(D).
function_expression(A) ::= star_func(B) NK_LP star_func_para_list(C) NK_RP(D). { A = createRawExprNodeExt(pCxt, &B, &D, createFunctionNode(pCxt, &B, C)); }
function_expression(A) ::= CAST(B) NK_LP expression(C) AS type_name(D) NK_RP(E). { A = createRawExprNodeExt(pCxt, &B, &E, createCastFunctionNode(pCxt, releaseRawExprNode(pCxt, C), D)); }
function_expression(A) ::= noarg_func(B) NK_LP NK_RP(C). { A = createRawExprNodeExt(pCxt, &B, &C, createFunctionNodeNoArg(pCxt, &B)); }
//function_expression(A) ::= NOW(B). { A = createRawExprNode(pCxt, &B, createFunctionNode(pCxt, &B, NULL)); }
%type noarg_func { SToken }
%destructor noarg_func { }
......
......@@ -83,7 +83,7 @@ static EDealRes calcConstOperator(SOperatorNode** pNode, void* pContext) {
static EDealRes calcConstFunction(SFunctionNode** pNode, void* pContext) {
SFunctionNode* pFunc = *pNode;
if (!fmIsScalarFunc(pFunc->funcId)) {
if (!fmIsScalarFunc(pFunc->funcId) || fmIsUserDefinedFunc(pFunc->funcId)) {
return DEAL_RES_CONTINUE;
}
SNode* pParam = NULL;
......
......@@ -543,7 +543,9 @@ static EDealRes translateOperator(STranslateContext* pCxt, SOperatorNode* pOp) {
TSDB_DATA_TYPE_BLOB == rdt.type) {
return generateDealNodeErrMsg(pCxt, TSDB_CODE_PAR_WRONG_VALUE_TYPE, ((SExprNode*)(pOp->pRight))->aliasName);
}
if (TSDB_DATA_TYPE_TIMESTAMP == ldt.type && TSDB_DATA_TYPE_TIMESTAMP == rdt.type) {
if ((TSDB_DATA_TYPE_TIMESTAMP == ldt.type && TSDB_DATA_TYPE_TIMESTAMP == rdt.type) ||
(TSDB_DATA_TYPE_TIMESTAMP == ldt.type && IS_VAR_DATA_TYPE(rdt.type)) ||
(TSDB_DATA_TYPE_TIMESTAMP == rdt.type && IS_VAR_DATA_TYPE(ldt.type))) {
return generateDealNodeErrMsg(pCxt, TSDB_CODE_PAR_WRONG_VALUE_TYPE, ((SExprNode*)(pOp->pRight))->aliasName);
}
......@@ -2614,37 +2616,62 @@ static int32_t translateDropComponentNode(STranslateContext* pCxt, SDropComponen
(FSerializeFunc)tSerializeSCreateDropMQSBNodeReq, &dropReq);
}
static int32_t translateCreateTopic(STranslateContext* pCxt, SCreateTopicStmt* pStmt) {
SCMCreateTopicReq createReq = {0};
static int32_t buildCreateTopicReq(STranslateContext* pCxt, SCreateTopicStmt* pStmt, SCMCreateTopicReq* pReq) {
SName name;
// tNameSetDbName(&name, pCxt->pParseCxt->acctId, pStmt->topicName, strlen(pStmt->topicName));
// tNameGetFullDbName(&name, pReq->name);
tNameExtractFullName(toName(pCxt->pParseCxt->acctId, pCxt->pParseCxt->db, pStmt->topicName, &name), pReq->name);
pReq->igExists = pStmt->ignoreExists;
pReq->withTbName = pStmt->pOptions->withTable;
pReq->withSchema = pStmt->pOptions->withSchema;
pReq->withTag = pStmt->pOptions->withTag;
pReq->sql = strdup(pCxt->pParseCxt->pSql);
if (NULL == pReq->sql) {
return TSDB_CODE_OUT_OF_MEMORY;
}
int32_t code = TSDB_CODE_SUCCESS;
if (NULL != pStmt->pQuery) {
strcpy(pReq->subscribeDbName, ((SRealTableNode*)(((SSelectStmt*)pStmt->pQuery)->pFromTable))->table.dbName);
pCxt->pParseCxt->topicQuery = true;
int32_t code = translateQuery(pCxt, pStmt->pQuery);
code = translateQuery(pCxt, pStmt->pQuery);
if (TSDB_CODE_SUCCESS == code) {
code = nodesNodeToString(pStmt->pQuery, false, &createReq.ast, NULL);
}
if (TSDB_CODE_SUCCESS != code) {
return code;
code = nodesNodeToString(pStmt->pQuery, false, &pReq->ast, NULL);
}
} else {
strcpy(createReq.subscribeDbName, pStmt->subscribeDbName);
strcpy(pReq->subscribeDbName, pStmt->subscribeDbName);
}
createReq.sql = strdup(pCxt->pParseCxt->pSql);
if (NULL == createReq.sql) {
return TSDB_CODE_OUT_OF_MEMORY;
return code;
}
static int32_t checkCreateTopic(STranslateContext* pCxt, SCreateTopicStmt* pStmt) {
if (NULL == pStmt->pQuery) {
return TSDB_CODE_SUCCESS;
}
SName name;
// tNameSetDbName(&name, pCxt->pParseCxt->acctId, pCxt->pParseCxt->db, strlen(pCxt->pParseCxt->db));
// tNameGetFullDbName(&name, createReq.name);
tNameExtractFullName(toName(pCxt->pParseCxt->acctId, pCxt->pParseCxt->db, pStmt->topicName, &name), createReq.name);
createReq.igExists = pStmt->ignoreExists;
createReq.withTbName = pStmt->pOptions->withTable;
createReq.withSchema = pStmt->pOptions->withSchema;
createReq.withTag = pStmt->pOptions->withTag;
if (QUERY_NODE_SELECT_STMT == nodeType(pStmt->pQuery)) {
SSelectStmt* pSelect = (SSelectStmt*)pStmt->pQuery;
if (!pSelect->isDistinct && QUERY_NODE_REAL_TABLE == nodeType(pSelect->pFromTable) && NULL == pSelect->pGroupByList &&
NULL == pSelect->pLimit && NULL == pSelect->pSlimit && NULL == pSelect->pOrderByList && NULL == pSelect->pPartitionByList) {
return TSDB_CODE_SUCCESS;
}
}
int32_t code = buildCmdMsg(pCxt, TDMT_MND_CREATE_TOPIC, (FSerializeFunc)tSerializeSCMCreateTopicReq, &createReq);
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_TOPIC_QUERY);
}
static int32_t translateCreateTopic(STranslateContext* pCxt, SCreateTopicStmt* pStmt) {
SCMCreateTopicReq createReq = {0};
int32_t code = checkCreateTopic(pCxt, pStmt);
if (TSDB_CODE_SUCCESS == code) {
code = buildCreateTopicReq(pCxt, pStmt, &createReq);
}
if (TSDB_CODE_SUCCESS == code) {
code = buildCmdMsg(pCxt, TDMT_MND_CREATE_TOPIC, (FSerializeFunc)tSerializeSCMCreateTopicReq, &createReq);
}
tFreeSCMCreateTopicReq(&createReq);
return code;
}
......
......@@ -128,6 +128,8 @@ static char* getSyntaxErrFormat(int32_t errCode) {
return "soffset/offset can not be less than 0";
case TSDB_CODE_PAR_SLIMIT_LEAK_PARTITION_BY:
return "slimit/soffset only available for PARTITION BY query";
case TSDB_CODE_PAR_INVALID_TOPIC_QUERY:
return "Invalid topic query";
case TSDB_CODE_OUT_OF_MEMORY:
return "Out of memory";
default:
......
......@@ -199,7 +199,8 @@ static int32_t createScanLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect
TSWAP(pScan->pMeta, pRealTable->pMeta, STableMeta*);
TSWAP(pScan->pVgroupList, pRealTable->pVgroupList, SVgroupsInfo*);
pScan->scanFlag = MAIN_SCAN;
pScan->scanSeq[0] = 1;
pScan->scanSeq[1] = 0;
pScan->scanRange = TSWINDOW_INITIALIZER;
pScan->tableName.type = TSDB_TABLE_NAME_T;
pScan->tableName.acctId = pCxt->pPlanCxt->acctId;
......
......@@ -20,11 +20,14 @@
#define OPTIMIZE_FLAG_MASK(n) (1 << n)
#define OPTIMIZE_FLAG_OSD OPTIMIZE_FLAG_MASK(0)
#define OPTIMIZE_FLAG_CPD OPTIMIZE_FLAG_MASK(1)
#define OPTIMIZE_FLAG_OPK OPTIMIZE_FLAG_MASK(2)
#define OPTIMIZE_FLAG_SET_MASK(val, mask) (val) |= (mask)
#define OPTIMIZE_FLAG_TEST_MASK(val, mask) (((val) & (mask)) != 0)
typedef struct SOptimizeContext {
SPlanContext* pPlanCxt;
bool optimized;
} SOptimizeContext;
......@@ -57,7 +60,23 @@ typedef enum ECondAction {
// after supporting outer join, there are other possibilities
} ECondAction;
EDealRes haveNormalColImpl(SNode* pNode, void* pContext) {
typedef bool (*FMayBeOptimized)(SLogicNode* pNode);
static SLogicNode* optFindPossibleNode(SLogicNode* pNode, FMayBeOptimized func) {
if (func(pNode)) {
return pNode;
}
SNode* pChild;
FOREACH(pChild, pNode->pChildren) {
SLogicNode* pScanNode = optFindPossibleNode((SLogicNode*)pChild, func);
if (NULL != pScanNode) {
return pScanNode;
}
}
return NULL;
}
EDealRes osdHaveNormalColImpl(SNode* pNode, void* pContext) {
if (QUERY_NODE_COLUMN == nodeType(pNode)) {
*((bool*)pContext) = (COLUMN_TYPE_TAG != ((SColumnNode*)pNode)->colType);
return *((bool*)pContext) ? DEAL_RES_END : DEAL_RES_IGNORE_CHILD;
......@@ -65,9 +84,9 @@ EDealRes haveNormalColImpl(SNode* pNode, void* pContext) {
return DEAL_RES_CONTINUE;
}
static bool haveNormalCol(SNodeList* pList) {
static bool osdHaveNormalCol(SNodeList* pList) {
bool res = false;
nodesWalkExprsPostOrder(pList, haveNormalColImpl, &res);
nodesWalkExprsPostOrder(pList, osdHaveNormalColImpl, &res);
return res;
}
......@@ -89,21 +108,7 @@ static bool osdMayBeOptimized(SLogicNode* pNode) {
if (QUERY_NODE_LOGIC_PLAN_WINDOW == nodeType(pNode->pParent)) {
return (WINDOW_TYPE_INTERVAL == ((SWindowLogicNode*)pNode->pParent)->winType);
}
return !haveNormalCol(((SAggLogicNode*)pNode->pParent)->pGroupKeys);
}
static SLogicNode* osdFindPossibleScanNode(SLogicNode* pNode) {
if (osdMayBeOptimized(pNode)) {
return pNode;
}
SNode* pChild;
FOREACH(pChild, pNode->pChildren) {
SLogicNode* pScanNode = osdFindPossibleScanNode((SLogicNode*)pChild);
if (NULL != pScanNode) {
return pScanNode;
}
}
return NULL;
return !osdHaveNormalCol(((SAggLogicNode*)pNode->pParent)->pGroupKeys);
}
static SNodeList* osdGetAllFuncs(SLogicNode* pNode) {
......@@ -138,7 +143,7 @@ static int32_t osdGetRelatedFuncs(SScanLogicNode* pScan, SNodeList** pSdrFuncs,
}
static int32_t osdMatch(SOptimizeContext* pCxt, SLogicNode* pLogicNode, SOsdInfo* pInfo) {
pInfo->pScan = (SScanLogicNode*)osdFindPossibleScanNode(pLogicNode);
pInfo->pScan = (SScanLogicNode*)optFindPossibleNode(pLogicNode, osdMayBeOptimized);
if (NULL == pInfo->pScan) {
return TSDB_CODE_SUCCESS;
}
......@@ -345,7 +350,7 @@ static int32_t cpdCalcTimeRange(SScanLogicNode* pScan, SNode** pPrimaryKeyCond,
}
static int32_t cpdOptimizeScanCondition(SOptimizeContext* pCxt, SScanLogicNode* pScan) {
if (NULL == pScan->node.pConditions) {
if (NULL == pScan->node.pConditions || OPTIMIZE_FLAG_TEST_MASK(pScan->node.optimizedFlag, OPTIMIZE_FLAG_CPD)) {
return TSDB_CODE_SUCCESS;
}
......@@ -359,7 +364,10 @@ static int32_t cpdOptimizeScanCondition(SOptimizeContext* pCxt, SScanLogicNode*
pScan->node.pConditions = pOtherCond;
}
if (TSDB_CODE_SUCCESS != code) {
if (TSDB_CODE_SUCCESS == code) {
OPTIMIZE_FLAG_SET_MASK(pScan->node.optimizedFlag, OPTIMIZE_FLAG_CPD);
pCxt->optimized = true;
} else {
nodesDestroyNode(pPrimaryKeyCond);
nodesDestroyNode(pOtherCond);
}
......@@ -367,7 +375,7 @@ static int32_t cpdOptimizeScanCondition(SOptimizeContext* pCxt, SScanLogicNode*
return code;
}
static bool belongThisTable(SNode* pCondCol, SNodeList* pTableCols) {
static bool cpdBelongThisTable(SNode* pCondCol, SNodeList* pTableCols) {
SNode* pTableCol = NULL;
FOREACH(pTableCol, pTableCols) {
if (nodesEqualNode(pCondCol, pTableCol)) {
......@@ -380,9 +388,9 @@ static bool belongThisTable(SNode* pCondCol, SNodeList* pTableCols) {
static EDealRes cpdIsMultiTableCondImpl(SNode* pNode, void* pContext) {
SCpdIsMultiTableCondCxt* pCxt = pContext;
if (QUERY_NODE_COLUMN == nodeType(pNode)) {
if (belongThisTable(pNode, pCxt->pLeftCols)) {
if (cpdBelongThisTable(pNode, pCxt->pLeftCols)) {
pCxt->havaLeftCol = true;
} else if (belongThisTable(pNode, pCxt->pRightCols)) {
} else if (cpdBelongThisTable(pNode, pCxt->pRightCols)) {
pCxt->haveRightCol = true;
}
return pCxt->havaLeftCol && pCxt->haveRightCol ? DEAL_RES_END : DEAL_RES_CONTINUE;
......@@ -508,11 +516,76 @@ static int32_t cpdPushCondToChild(SOptimizeContext* pCxt, SLogicNode* pChild, SN
return TSDB_CODE_PLAN_INTERNAL_ERROR;
}
static bool cpdIsPrimaryKey(SNode* pNode, SNodeList* pTableCols) {
if (QUERY_NODE_COLUMN != nodeType(pNode)) {
return false;
}
SColumnNode* pCol = (SColumnNode*)pNode;
if (PRIMARYKEY_TIMESTAMP_COL_ID != pCol->colId) {
return false;
}
return cpdBelongThisTable(pNode, pTableCols);
}
static bool cpdIsPrimaryKeyEqualCond(SJoinLogicNode* pJoin, SNode* pCond) {
if (QUERY_NODE_OPERATOR != nodeType(pCond)) {
return false;
}
SNodeList* pLeftCols = ((SLogicNode*)nodesListGetNode(pJoin->node.pChildren, 0))->pTargets;
SNodeList* pRightCols = ((SLogicNode*)nodesListGetNode(pJoin->node.pChildren, 1))->pTargets;
SOperatorNode* pOper = (SOperatorNode*)pJoin->pOnConditions;
if (cpdIsPrimaryKey(pOper->pLeft, pLeftCols)) {
return cpdIsPrimaryKey(pOper->pRight, pRightCols);
} else if (cpdIsPrimaryKey(pOper->pLeft, pRightCols)) {
return cpdIsPrimaryKey(pOper->pRight, pLeftCols);
}
return false;
}
static int32_t cpdCheckOpCond(SOptimizeContext* pCxt, SJoinLogicNode* pJoin, SNode* pOnCond) {
if (!cpdIsPrimaryKeyEqualCond(pJoin, pOnCond)) {
snprintf(pCxt->pPlanCxt->pMsg, pCxt->pPlanCxt->msgLen, "l.ts = r.ts is expected in join expression");
return TSDB_CODE_FAILED;
}
return TSDB_CODE_SUCCESS;
}
static int32_t cpdCheckLogicCond(SOptimizeContext* pCxt, SJoinLogicNode* pJoin, SLogicConditionNode* pOnCond) {
if (LOGIC_COND_TYPE_AND != pOnCond->condType) {
snprintf(pCxt->pPlanCxt->pMsg, pCxt->pPlanCxt->msgLen, "l.ts = r.ts is expected in join expression");
return TSDB_CODE_FAILED;
}
SNode* pCond = NULL;
FOREACH(pCond, pOnCond->pParameterList) {
if (!cpdIsPrimaryKeyEqualCond(pJoin, pCond)) {
snprintf(pCxt->pPlanCxt->pMsg, pCxt->pPlanCxt->msgLen, "l.ts = r.ts is expected in join expression");
return TSDB_CODE_FAILED;
}
}
return TSDB_CODE_SUCCESS;
}
static int32_t cpdCheckJoinOnCond(SOptimizeContext* pCxt, SJoinLogicNode* pJoin) {
if (NULL == pJoin->pOnConditions) {
snprintf(pCxt->pPlanCxt->pMsg, pCxt->pPlanCxt->msgLen, "not support cross join");
return TSDB_CODE_FAILED;
}
if (QUERY_NODE_LOGIC_CONDITION == nodeType(pJoin->pOnConditions)) {
return cpdCheckLogicCond(pCxt, pJoin, (SLogicConditionNode*)pJoin->pOnConditions);
} else {
return cpdCheckOpCond(pCxt, pJoin, pJoin->pOnConditions);
}
}
static int32_t cpdPushJoinCondition(SOptimizeContext* pCxt, SJoinLogicNode* pJoin) {
if (NULL == pJoin->node.pConditions) {
if (OPTIMIZE_FLAG_TEST_MASK(pJoin->node.optimizedFlag, OPTIMIZE_FLAG_CPD)) {
return TSDB_CODE_SUCCESS;
}
if (NULL == pJoin->node.pConditions) {
return cpdCheckJoinOnCond(pCxt, pJoin);
}
SNode* pOnCond = NULL;
SNode* pLeftChildCond = NULL;
SNode* pRightChildCond = NULL;
......@@ -527,7 +600,11 @@ static int32_t cpdPushJoinCondition(SOptimizeContext* pCxt, SJoinLogicNode* pJoi
code = cpdPushCondToChild(pCxt, (SLogicNode*)nodesListGetNode(pJoin->node.pChildren, 1), &pRightChildCond);
}
if (TSDB_CODE_SUCCESS != code) {
if (TSDB_CODE_SUCCESS == code) {
OPTIMIZE_FLAG_SET_MASK(pJoin->node.optimizedFlag, OPTIMIZE_FLAG_CPD);
pCxt->optimized = true;
code = cpdCheckJoinOnCond(pCxt, pJoin);
} else {
nodesDestroyNode(pOnCond);
nodesDestroyNode(pLeftChildCond);
nodesDestroyNode(pRightChildCond);
......@@ -572,15 +649,129 @@ static int32_t cpdOptimize(SOptimizeContext* pCxt, SLogicNode* pLogicNode) {
return cpdPushCondition(pCxt, pLogicNode);
}
static bool opkIsPrimaryKeyOrderBy(SNodeList* pSortKeys) {
if (1 != LIST_LENGTH(pSortKeys)) {
return false;
}
SNode* pNode = ((SOrderByExprNode*)nodesListGetNode(pSortKeys, 0))->pExpr;
return (QUERY_NODE_COLUMN == nodeType(pNode) ? (PRIMARYKEY_TIMESTAMP_COL_ID == ((SColumnNode*)pNode)->colId) : false);
}
static bool opkSortMayBeOptimized(SLogicNode* pNode) {
if (QUERY_NODE_LOGIC_PLAN_SORT != nodeType(pNode)) {
return false;
}
if (OPTIMIZE_FLAG_TEST_MASK(pNode->optimizedFlag, OPTIMIZE_FLAG_OPK)) {
return false;
}
return true;
}
static int32_t opkGetScanNodesImpl(SLogicNode* pNode, bool* pNotOptimize, SNodeList** pScanNodes) {
int32_t code = TSDB_CODE_SUCCESS;
switch (nodeType(pNode)) {
case QUERY_NODE_LOGIC_PLAN_SCAN:
return nodesListMakeAppend(pScanNodes, pNode);
case QUERY_NODE_LOGIC_PLAN_JOIN:
code = opkGetScanNodesImpl(nodesListGetNode(pNode->pChildren, 0), pNotOptimize, pScanNodes);
if (TSDB_CODE_SUCCESS == code) {
code = opkGetScanNodesImpl(nodesListGetNode(pNode->pChildren, 1), pNotOptimize, pScanNodes);
}
return code;
case QUERY_NODE_LOGIC_PLAN_AGG:
*pNotOptimize = true;
return code;
default:
break;
}
if (1 != LIST_LENGTH(pNode->pChildren)) {
*pNotOptimize = true;
}
return opkGetScanNodesImpl(nodesListGetNode(pNode->pChildren, 0), pNotOptimize, pScanNodes);
}
static int32_t opkGetScanNodes(SLogicNode* pNode, SNodeList** pScanNodes) {
bool notOptimize = false;
int32_t code = opkGetScanNodesImpl(pNode, &notOptimize, pScanNodes);
if (TSDB_CODE_SUCCESS != code || notOptimize) {
nodesClearList(*pScanNodes);
}
return code;
}
static EOrder opkGetPrimaryKeyOrder(SSortLogicNode* pSort) {
return ((SOrderByExprNode*)nodesListGetNode(pSort->pSortKeys, 0))->order;
}
static SNode* opkRewriteDownNode(SSortLogicNode* pSort) {
SNode* pDownNode = nodesListGetNode(pSort->node.pChildren, 0);
// todo
pSort->node.pChildren = NULL;
return pDownNode;
}
static int32_t opkDoOptimized(SOptimizeContext* pCxt, SSortLogicNode* pSort, SNodeList* pScanNodes) {
EOrder order = opkGetPrimaryKeyOrder(pSort);
if (ORDER_DESC == order) {
SNode* pScan = NULL;
FOREACH(pScan, pScanNodes) {
((SScanLogicNode*)pScan)->scanSeq[0] = 0;
((SScanLogicNode*)pScan)->scanSeq[1] = 1;
}
}
if (NULL == pSort->node.pParent) {
// todo
return TSDB_CODE_SUCCESS;
}
SNode* pDownNode = opkRewriteDownNode(pSort);
SNode* pNode;
FOREACH(pNode, pSort->node.pParent->pChildren) {
if (nodesEqualNode(pNode, pSort)) {
REPLACE_NODE(pDownNode);
break;
}
}
nodesDestroyNode(pSort);
return TSDB_CODE_SUCCESS;
}
static int32_t opkOptimizeImpl(SOptimizeContext* pCxt, SSortLogicNode* pSort) {
OPTIMIZE_FLAG_SET_MASK(pSort->node.optimizedFlag, OPTIMIZE_FLAG_OPK);
if (!opkIsPrimaryKeyOrderBy(pSort->pSortKeys) || 1 != LIST_LENGTH(pSort->node.pChildren)) {
return TSDB_CODE_SUCCESS;
}
SNodeList* pScanNodes = NULL;
int32_t code = opkGetScanNodes(nodesListGetNode(pSort->node.pChildren, 0), &pScanNodes);
if (TSDB_CODE_SUCCESS == code && NULL != pScanNodes) {
code = opkDoOptimized(pCxt, pSort, pScanNodes);
}
nodesClearList(pScanNodes);
return code;
}
static int32_t opkOptimize(SOptimizeContext* pCxt, SLogicNode* pLogicNode) {
SSortLogicNode* pSort = (SSortLogicNode*)optFindPossibleNode(pLogicNode, opkSortMayBeOptimized);
if (NULL == pSort) {
return TSDB_CODE_SUCCESS;
}
return opkOptimizeImpl(pCxt, pSort);
}
static const SOptimizeRule optimizeRuleSet[] = {
{ .pName = "OptimizeScanData", .optimizeFunc = osdOptimize },
{ .pName = "ConditionPushDown", .optimizeFunc = cpdOptimize }
{ .pName = "ConditionPushDown", .optimizeFunc = cpdOptimize },
{ .pName = "OrderByPrimaryKey", .optimizeFunc = opkOptimize }
};
static const int32_t optimizeRuleNum = (sizeof(optimizeRuleSet) / sizeof(SOptimizeRule));
static int32_t applyOptimizeRule(SLogicNode* pLogicNode) {
SOptimizeContext cxt = { .optimized = false };
static int32_t applyOptimizeRule(SPlanContext* pCxt, SLogicNode* pLogicNode) {
SOptimizeContext cxt = { .pPlanCxt = pCxt, .optimized = false };
do {
cxt.optimized = false;
for (int32_t i = 0; i < optimizeRuleNum; ++i) {
......@@ -594,5 +785,5 @@ static int32_t applyOptimizeRule(SLogicNode* pLogicNode) {
}
int32_t optimizeLogicPlan(SPlanContext* pCxt, SLogicNode* pLogicNode) {
return applyOptimizeRule(pLogicNode);
return applyOptimizeRule(pCxt, pLogicNode);
}
......@@ -398,9 +398,6 @@ static int32_t createScanPhysiNodeFinalize(SPhysiPlanContext* pCxt, SScanLogicNo
if (TSDB_CODE_SUCCESS == code) {
pScanPhysiNode->uid = pScanLogicNode->pMeta->uid;
pScanPhysiNode->tableType = pScanLogicNode->pMeta->tableType;
pScanPhysiNode->order = TSDB_ORDER_ASC;
pScanPhysiNode->count = 1;
pScanPhysiNode->reverse = 0;
memcpy(&pScanPhysiNode->tableName, &pScanLogicNode->tableName, sizeof(SName));
}
......@@ -432,7 +429,7 @@ static int32_t createTableScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubp
return TSDB_CODE_OUT_OF_MEMORY;
}
pTableScan->scanFlag = pScanLogicNode->scanFlag;
memcpy(pTableScan->scanSeq, pScanLogicNode->scanSeq, sizeof(pScanLogicNode->scanSeq));
pTableScan->scanRange = pScanLogicNode->scanRange;
pTableScan->ratio = pScanLogicNode->ratio;
vgroupInfoToNodeAddr(pScanLogicNode->pVgroupList->vgroups, &pSubplan->execNode);
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "planTestUtil.h"
#include "planner.h"
using namespace std;
class PlanOptimizeTest : public PlannerTestBase {
};
TEST_F(PlanOptimizeTest, orderByPrimaryKey) {
useDb("root", "test");
run("select * from t1 order by ts");
run("select * from t1 order by ts desc");
run("select c1 from t1 order by ts");
run("select c1 from t1 order by ts desc");
}
......@@ -25,10 +25,10 @@ extern "C" {
#include "ttimer.h"
#define QW_DEFAULT_SCHEDULER_NUMBER 10000
#define QW_DEFAULT_TASK_NUMBER 10000
#define QW_DEFAULT_SCH_TASK_NUMBER 10000
#define QW_DEFAULT_SHORT_RUN_TIMES 2
#define QW_DEFAULT_HEARTBEAT_MSEC 3000
#define QW_DEFAULT_TASK_NUMBER 10000
#define QW_DEFAULT_SCH_TASK_NUMBER 10000
#define QW_DEFAULT_SHORT_RUN_TIMES 2
#define QW_DEFAULT_HEARTBEAT_MSEC 3000
enum {
QW_PHASE_PRE_QUERY = 1,
......@@ -60,7 +60,6 @@ enum {
QW_WRITE,
};
enum {
QW_NOT_EXIST_RET_ERR = 1,
QW_NOT_EXIST_ADD,
......@@ -73,25 +72,26 @@ typedef struct SQWDebug {
} SQWDebug;
typedef struct SQWConnInfo {
void *handle;
void *ahandle;
void * handle;
void * ahandle;
int64_t refId;
} SQWConnInfo;
typedef struct SQWMsg {
void *node;
int32_t code;
char *msg;
int32_t msgLen;
SQWConnInfo connInfo;
void * node;
int32_t code;
char * msg;
int32_t msgLen;
SQWConnInfo connInfo;
} SQWMsg;
typedef struct SQWHbInfo {
SSchedulerHbRsp rsp;
SQWConnInfo connInfo;
SSchedulerHbRsp rsp;
SQWConnInfo connInfo;
} SQWHbInfo;
typedef struct SQWPhaseInput {
int32_t code;
int32_t code;
} SQWPhaseInput;
typedef struct SQWPhaseOutput {
......@@ -100,41 +100,40 @@ typedef struct SQWPhaseOutput {
#endif
} SQWPhaseOutput;
typedef struct SQWTaskStatus {
int64_t refId; // job's refId
int32_t code;
int8_t status;
typedef struct SQWTaskStatus {
int64_t refId; // job's refId
int32_t code;
int8_t status;
} SQWTaskStatus;
typedef struct SQWTaskCtx {
SRWLatch lock;
int8_t phase;
int8_t taskType;
int8_t explain;
bool queryFetched;
bool queryEnd;
bool queryContinue;
bool queryInQueue;
int32_t rspCode;
SQWConnInfo ctrlConnInfo;
SQWConnInfo dataConnInfo;
int8_t events[QW_EVENT_MAX];
qTaskInfo_t taskHandle;
DataSinkHandle sinkHandle;
SRWLatch lock;
int8_t phase;
int8_t taskType;
int8_t explain;
bool queryFetched;
bool queryEnd;
bool queryContinue;
bool queryInQueue;
int32_t rspCode;
SQWConnInfo ctrlConnInfo;
SQWConnInfo dataConnInfo;
int8_t events[QW_EVENT_MAX];
void *taskHandle;
void *sinkHandle;
} SQWTaskCtx;
typedef struct SQWSchStatus {
int32_t lastAccessTs; // timestamp in second
int32_t lastAccessTs; // timestamp in second
SRWLatch hbConnLock;
SQWConnInfo hbConnInfo;
SQueryNodeEpId hbEpId;
SQueryNodeEpId hbEpId;
SRWLatch tasksLock;
SHashObj *tasksHash; // key:queryId+taskId, value: SQWTaskStatus
SHashObj * tasksHash; // key:queryId+taskId, value: SQWTaskStatus
} SQWSchStatus;
// Qnode/Vnode level task management
......@@ -142,100 +141,146 @@ typedef struct SQWorkerMgmt {
SQWorkerCfg cfg;
int8_t nodeType;
int32_t nodeId;
void *timer;
void * timer;
tmr_h hbTimer;
SRWLatch schLock;
// SRWLatch ctxLock;
SHashObj *schHash; // key: schedulerId, value: SQWSchStatus
SHashObj *ctxHash; // key: queryId+taskId, value: SQWTaskCtx
SMsgCb msgCb;
SHashObj *schHash; // key: schedulerId, value: SQWSchStatus
SHashObj *ctxHash; // key: queryId+taskId, value: SQWTaskCtx
SMsgCb msgCb;
} SQWorkerMgmt;
#define QW_FPARAMS_DEF SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, int64_t rId
#define QW_IDS() sId, qId, tId, rId
#define QW_FPARAMS() mgmt, QW_IDS()
#define QW_IDS() sId, qId, tId, rId
#define QW_FPARAMS() mgmt, QW_IDS()
#define QW_GET_EVENT_VALUE(ctx, event) atomic_load_8(&(ctx)->events[event])
#define QW_IS_EVENT_RECEIVED(ctx, event) (atomic_load_8(&(ctx)->events[event]) == QW_EVENT_RECEIVED)
#define QW_IS_EVENT_PROCESSED(ctx, event) (atomic_load_8(&(ctx)->events[event]) == QW_EVENT_PROCESSED)
#define QW_SET_EVENT_RECEIVED(ctx, event) atomic_store_8(&(ctx)->events[event], QW_EVENT_RECEIVED)
#define QW_IS_EVENT_RECEIVED(ctx, event) (atomic_load_8(&(ctx)->events[event]) == QW_EVENT_RECEIVED)
#define QW_IS_EVENT_PROCESSED(ctx, event) (atomic_load_8(&(ctx)->events[event]) == QW_EVENT_PROCESSED)
#define QW_SET_EVENT_RECEIVED(ctx, event) atomic_store_8(&(ctx)->events[event], QW_EVENT_RECEIVED)
#define QW_SET_EVENT_PROCESSED(ctx, event) atomic_store_8(&(ctx)->events[event], QW_EVENT_PROCESSED)
#define QW_GET_PHASE(ctx) atomic_load_8(&(ctx)->phase)
#define QW_SET_RSP_CODE(ctx, code) atomic_store_32(&(ctx)->rspCode, code)
#define QW_SET_RSP_CODE(ctx, code) atomic_store_32(&(ctx)->rspCode, code)
#define QW_UPDATE_RSP_CODE(ctx, code) atomic_val_compare_exchange_32(&(ctx)->rspCode, 0, code)
#define QW_IS_QUERY_RUNNING(ctx) (QW_GET_PHASE(ctx) == QW_PHASE_PRE_QUERY || QW_GET_PHASE(ctx) == QW_PHASE_PRE_CQUERY)
#define QW_TASK_NOT_EXIST(code) (TSDB_CODE_QRY_SCH_NOT_EXIST == (code) || TSDB_CODE_QRY_TASK_NOT_EXIST == (code))
#define QW_TASK_NOT_EXIST(code) (TSDB_CODE_QRY_SCH_NOT_EXIST == (code) || TSDB_CODE_QRY_TASK_NOT_EXIST == (code))
#define QW_TASK_ALREADY_EXIST(code) (TSDB_CODE_QRY_TASK_ALREADY_EXIST == (code))
#define QW_TASK_READY(status) (status == JOB_TASK_STATUS_SUCCEED || status == JOB_TASK_STATUS_FAILED || status == JOB_TASK_STATUS_CANCELLED || status == JOB_TASK_STATUS_PARTIAL_SUCCEED)
#define QW_SET_QTID(id, qId, tId) do { *(uint64_t *)(id) = (qId); *(uint64_t *)((char *)(id) + sizeof(qId)) = (tId); } while (0)
#define QW_GET_QTID(id, qId, tId) do { (qId) = *(uint64_t *)(id); (tId) = *(uint64_t *)((char *)(id) + sizeof(qId)); } while (0)
#define QW_ERR_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; return _code; } } while (0)
#define QW_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; } return _code; } while (0)
#define QW_ERR_JRET(c) do { code = c; if (code != TSDB_CODE_SUCCESS) { terrno = code; goto _return; } } while (0)
#define QW_TASK_READY(status) \
(status == JOB_TASK_STATUS_SUCCEED || status == JOB_TASK_STATUS_FAILED || status == JOB_TASK_STATUS_CANCELLED || \
status == JOB_TASK_STATUS_PARTIAL_SUCCEED)
#define QW_SET_QTID(id, qId, tId) \
do { \
*(uint64_t *)(id) = (qId); \
*(uint64_t *)((char *)(id) + sizeof(qId)) = (tId); \
} while (0)
#define QW_GET_QTID(id, qId, tId) \
do { \
(qId) = *(uint64_t *)(id); \
(tId) = *(uint64_t *)((char *)(id) + sizeof(qId)); \
} while (0)
#define QW_ERR_RET(c) \
do { \
int32_t _code = c; \
if (_code != TSDB_CODE_SUCCESS) { \
terrno = _code; \
return _code; \
} \
} while (0)
#define QW_RET(c) \
do { \
int32_t _code = c; \
if (_code != TSDB_CODE_SUCCESS) { \
terrno = _code; \
} \
return _code; \
} while (0)
#define QW_ERR_JRET(c) \
do { \
code = c; \
if (code != TSDB_CODE_SUCCESS) { \
terrno = code; \
goto _return; \
} \
} while (0)
#define QW_ELOG(param, ...) qError("QW:%p " param, mgmt, __VA_ARGS__)
#define QW_DLOG(param, ...) qDebug("QW:%p " param, mgmt, __VA_ARGS__)
#define QW_DUMP(param, ...) do { if (gQWDebug.dumpEnable) { qDebug("QW:%p " param, mgmt, __VA_ARGS__); } } while (0)
#define QW_SCH_ELOG(param, ...) qError("QW:%p SID:%"PRIx64" " param, mgmt, sId, __VA_ARGS__)
#define QW_SCH_DLOG(param, ...) qDebug("QW:%p SID:%"PRIx64" " param, mgmt, sId, __VA_ARGS__)
#define QW_TASK_ELOG(param, ...) qError("QW:%p QID:0x%"PRIx64",TID:0x%"PRIx64" " param, mgmt, qId, tId, __VA_ARGS__)
#define QW_TASK_WLOG(param, ...) qWarn("QW:%p QID:0x%"PRIx64",TID:0x%"PRIx64" " param, mgmt, qId, tId, __VA_ARGS__)
#define QW_TASK_DLOG(param, ...) qDebug("QW:%p QID:0x%"PRIx64",TID:0x%"PRIx64" " param, mgmt, qId, tId, __VA_ARGS__)
#define QW_TASK_DLOGL(param, ...) qDebugL("QW:%p QID:0x%"PRIx64",TID:0x%"PRIx64" " param, mgmt, qId, tId, __VA_ARGS__)
#define QW_TASK_ELOG_E(param) qError("QW:%p QID:0x%"PRIx64",TID:0x%"PRIx64" " param, mgmt, qId, tId)
#define QW_TASK_WLOG_E(param) qWarn("QW:%p QID:0x%"PRIx64",TID:0x%"PRIx64" " param, mgmt, qId, tId)
#define QW_TASK_DLOG_E(param) qDebug("QW:%p QID:0x%"PRIx64",TID:0x%"PRIx64" " param, mgmt, qId, tId)
#define QW_SCH_TASK_ELOG(param, ...) qError("QW:%p SID:0x%"PRIx64",QID:0x%"PRIx64",TID:0x%"PRIx64" " param, mgmt, sId, qId, tId, __VA_ARGS__)
#define QW_SCH_TASK_WLOG(param, ...) qWarn("QW:%p SID:0x%"PRIx64",QID:0x%"PRIx64",TID:0x%"PRIx64" " param, mgmt, sId, qId, tId, __VA_ARGS__)
#define QW_SCH_TASK_DLOG(param, ...) qDebug("QW:%p SID:0x%"PRIx64",QID:0x%"PRIx64",TID:0x%"PRIx64" " param, mgmt, sId, qId, tId, __VA_ARGS__)
#define QW_LOCK_DEBUG(...) do { if (gQWDebug.lockEnable) { qDebug(__VA_ARGS__); } } while (0)
#define QW_DUMP(param, ...) \
do { \
if (gQWDebug.dumpEnable) { \
qDebug("QW:%p " param, mgmt, __VA_ARGS__); \
} \
} while (0)
#define QW_SCH_ELOG(param, ...) qError("QW:%p SID:%" PRIx64 " " param, mgmt, sId, __VA_ARGS__)
#define QW_SCH_DLOG(param, ...) qDebug("QW:%p SID:%" PRIx64 " " param, mgmt, sId, __VA_ARGS__)
#define QW_TASK_ELOG(param, ...) qError("QW:%p QID:0x%" PRIx64 ",TID:0x%" PRIx64 " " param, mgmt, qId, tId, __VA_ARGS__)
#define QW_TASK_WLOG(param, ...) qWarn("QW:%p QID:0x%" PRIx64 ",TID:0x%" PRIx64 " " param, mgmt, qId, tId, __VA_ARGS__)
#define QW_TASK_DLOG(param, ...) qDebug("QW:%p QID:0x%" PRIx64 ",TID:0x%" PRIx64 " " param, mgmt, qId, tId, __VA_ARGS__)
#define QW_TASK_DLOGL(param, ...) \
qDebugL("QW:%p QID:0x%" PRIx64 ",TID:0x%" PRIx64 " " param, mgmt, qId, tId, __VA_ARGS__)
#define QW_TASK_ELOG_E(param) qError("QW:%p QID:0x%" PRIx64 ",TID:0x%" PRIx64 " " param, mgmt, qId, tId)
#define QW_TASK_WLOG_E(param) qWarn("QW:%p QID:0x%" PRIx64 ",TID:0x%" PRIx64 " " param, mgmt, qId, tId)
#define QW_TASK_DLOG_E(param) qDebug("QW:%p QID:0x%" PRIx64 ",TID:0x%" PRIx64 " " param, mgmt, qId, tId)
#define QW_SCH_TASK_ELOG(param, ...) \
qError("QW:%p SID:0x%" PRIx64 ",QID:0x%" PRIx64 ",TID:0x%" PRIx64 " " param, mgmt, sId, qId, tId, __VA_ARGS__)
#define QW_SCH_TASK_WLOG(param, ...) \
qWarn("QW:%p SID:0x%" PRIx64 ",QID:0x%" PRIx64 ",TID:0x%" PRIx64 " " param, mgmt, sId, qId, tId, __VA_ARGS__)
#define QW_SCH_TASK_DLOG(param, ...) \
qDebug("QW:%p SID:0x%" PRIx64 ",QID:0x%" PRIx64 ",TID:0x%" PRIx64 " " param, mgmt, sId, qId, tId, __VA_ARGS__)
#define QW_LOCK_DEBUG(...) \
do { \
if (gQWDebug.lockEnable) { \
qDebug(__VA_ARGS__); \
} \
} while (0)
#define TD_RWLATCH_WRITE_FLAG_COPY 0x40000000
#define QW_LOCK(type, _lock) do { \
if (QW_READ == (type)) { \
assert(atomic_load_32((_lock)) >= 0); \
QW_LOCK_DEBUG("QW RLOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
taosRLockLatch(_lock); \
QW_LOCK_DEBUG("QW RLOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
assert(atomic_load_32((_lock)) > 0); \
} else { \
assert(atomic_load_32((_lock)) >= 0); \
QW_LOCK_DEBUG("QW WLOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
taosWLockLatch(_lock); \
QW_LOCK_DEBUG("QW WLOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
assert(atomic_load_32((_lock)) == TD_RWLATCH_WRITE_FLAG_COPY); \
} \
} while (0)
#define QW_UNLOCK(type, _lock) do { \
if (QW_READ == (type)) { \
assert(atomic_load_32((_lock)) > 0); \
QW_LOCK_DEBUG("QW RULOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
taosRUnLockLatch(_lock); \
QW_LOCK_DEBUG("QW RULOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
assert(atomic_load_32((_lock)) >= 0); \
} else { \
assert(atomic_load_32((_lock)) == TD_RWLATCH_WRITE_FLAG_COPY); \
QW_LOCK_DEBUG("QW WULOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
taosWUnLockLatch(_lock); \
QW_LOCK_DEBUG("QW WULOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
assert(atomic_load_32((_lock)) >= 0); \
} \
} while (0)
#define QW_LOCK(type, _lock) \
do { \
if (QW_READ == (type)) { \
assert(atomic_load_32((_lock)) >= 0); \
QW_LOCK_DEBUG("QW RLOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
taosRLockLatch(_lock); \
QW_LOCK_DEBUG("QW RLOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
assert(atomic_load_32((_lock)) > 0); \
} else { \
assert(atomic_load_32((_lock)) >= 0); \
QW_LOCK_DEBUG("QW WLOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
taosWLockLatch(_lock); \
QW_LOCK_DEBUG("QW WLOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
assert(atomic_load_32((_lock)) == TD_RWLATCH_WRITE_FLAG_COPY); \
} \
} while (0)
#define QW_UNLOCK(type, _lock) \
do { \
if (QW_READ == (type)) { \
assert(atomic_load_32((_lock)) > 0); \
QW_LOCK_DEBUG("QW RULOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
taosRUnLockLatch(_lock); \
QW_LOCK_DEBUG("QW RULOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
assert(atomic_load_32((_lock)) >= 0); \
} else { \
assert(atomic_load_32((_lock)) == TD_RWLATCH_WRITE_FLAG_COPY); \
QW_LOCK_DEBUG("QW WULOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
taosWUnLockLatch(_lock); \
QW_LOCK_DEBUG("QW WULOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
assert(atomic_load_32((_lock)) >= 0); \
} \
} while (0)
#ifdef __cplusplus
}
......
此差异已折叠。
......@@ -1247,26 +1247,28 @@ int32_t timeDiffFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *p
}
int32_t nowFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
if (inputNum != 1) {
return TSDB_CODE_FAILED;
int64_t ts = taosGetTimestamp(TSDB_TIME_PRECISION_MILLI);
for (int32_t i = 0; i < pInput->numOfRows; ++i) {
colDataAppendInt64(pOutput->columnData, i, &ts);
}
colDataAppendInt64(pOutput->columnData, pOutput->numOfRows, (int64_t *)colDataGetData(pInput->columnData, 0));
pOutput->numOfRows = pInput->numOfRows;
return TSDB_CODE_SUCCESS;
}
int32_t todayFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
if (inputNum != 1) {
return TSDB_CODE_FAILED;
int64_t ts = taosGetTimestampToday(TSDB_TIME_PRECISION_MILLI);
for (int32_t i = 0; i < pInput->numOfRows; ++i) {
colDataAppendInt64(pOutput->columnData, i, &ts);
}
colDataAppendInt64(pOutput->columnData, pOutput->numOfRows, (int64_t *)colDataGetData(pInput->columnData, 0));
pOutput->numOfRows = pInput->numOfRows;
return TSDB_CODE_SUCCESS;
}
int32_t timezoneFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
if (inputNum != 1) {
return TSDB_CODE_FAILED;
for (int32_t i = 0; i < pInput->numOfRows; ++i) {
colDataAppend(pOutput->columnData, i, tsTimezoneStr, false);
}
colDataAppend(pOutput->columnData, pOutput->numOfRows, (char *)colDataGetData(pInput->columnData, 0), false);
pOutput->numOfRows = pInput->numOfRows;
return TSDB_CODE_SUCCESS;
}
......
......@@ -369,7 +369,7 @@ int32_t schBuildTaskRalation(SSchJob *pJob, SHashObj *planToTask) {
}
for (int32_t n = 0; n < childNum; ++n) {
SSubplan *child = (SSubplan *)nodesListGetNode(pPlan->pChildren, n);
SSubplan * child = (SSubplan *)nodesListGetNode(pPlan->pChildren, n);
SSchTask **childTask = taosHashGet(planToTask, &child, POINTER_BYTES);
if (NULL == childTask || NULL == *childTask) {
SCH_TASK_ELOG("subplan children relationship error, level:%d, taskIdx:%d, childIdx:%d", i, m, n);
......@@ -401,7 +401,7 @@ int32_t schBuildTaskRalation(SSchJob *pJob, SHashObj *planToTask) {
}
for (int32_t n = 0; n < parentNum; ++n) {
SSubplan *parent = (SSubplan *)nodesListGetNode(pPlan->pParents, n);
SSubplan * parent = (SSubplan *)nodesListGetNode(pPlan->pParents, n);
SSchTask **parentTask = taosHashGet(planToTask, &parent, POINTER_BYTES);
if (NULL == parentTask || NULL == *parentTask) {
SCH_TASK_ELOG("subplan parent relationship error, level:%d, taskIdx:%d, childIdx:%d", i, m, n);
......@@ -491,7 +491,7 @@ int32_t schValidateAndBuildJob(SQueryPlan *pDag, SSchJob *pJob) {
SSchLevel level = {0};
SNodeListNode *plans = NULL;
int32_t taskNum = 0;
SSchLevel *pLevel = NULL;
SSchLevel * pLevel = NULL;
level.status = JOB_TASK_STATUS_NOT_START;
......@@ -1267,7 +1267,7 @@ int32_t schUpdateTaskExecNodeHandle(SSchTask *pTask, void *handle, int32_t rspCo
int32_t schHandleCallback(void *param, const SDataBuf *pMsg, int32_t msgType, int32_t rspCode) {
int32_t code = 0;
SSchTaskCallbackParam *pParam = (SSchTaskCallbackParam *)param;
SSchTask *pTask = NULL;
SSchTask * pTask = NULL;
SSchJob *pJob = schAcquireJob(pParam->refId);
if (NULL == pJob) {
......@@ -1617,8 +1617,8 @@ _return:
int32_t schMakeHbRpcCtx(SSchJob *pJob, SSchTask *pTask, SRpcCtx *pCtx) {
int32_t code = 0;
SSchHbCallbackParam *param = NULL;
SMsgSendInfo *pMsgSendInfo = NULL;
SQueryNodeAddr *addr = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx);
SMsgSendInfo * pMsgSendInfo = NULL;
SQueryNodeAddr * addr = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx);
SQueryNodeEpId epId = {0};
epId.nodeId = addr->nodeId;
......@@ -1759,10 +1759,10 @@ int32_t schCloneHbRpcCtx(SRpcCtx *pSrc, SRpcCtx *pDst) {
}
SRpcCtxVal dst = {0};
void *pIter = taosHashIterate(pSrc->args, NULL);
void * pIter = taosHashIterate(pSrc->args, NULL);
while (pIter) {
SRpcCtxVal *pVal = (SRpcCtxVal *)pIter;
int32_t *msgType = taosHashGetKey(pIter, NULL);
int32_t * msgType = taosHashGetKey(pIter, NULL);
dst = *pVal;
dst.val = NULL;
......@@ -1916,7 +1916,7 @@ _return:
int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr, int32_t msgType) {
uint32_t msgSize = 0;
void *msg = NULL;
void * msg = NULL;
int32_t code = 0;
bool isCandidateAddr = false;
bool persistHandle = false;
......@@ -2673,7 +2673,7 @@ int32_t schedulerGetTasksStatus(int64_t job, SArray *pSub) {
SSchLevel *pLevel = taosArrayGet(pJob->levels, i);
for (int32_t m = 0; m < pLevel->taskNum; ++m) {
SSchTask *pTask = taosArrayGet(pLevel->subTasks, m);
SSchTask * pTask = taosArrayGet(pLevel->subTasks, m);
SQuerySubDesc subDesc = {.tid = pTask->taskId, .status = pTask->status};
taosArrayPush(pSub, &subDesc);
......@@ -2734,7 +2734,7 @@ void schedulerFreeTaskList(SArray *taskList) {
void schedulerDestroy(void) {
if (schMgmt.jobRef) {
SSchJob *pJob = taosIterateRef(schMgmt.jobRef, 0);
int64_t refId = 0;
int64_t refId = 0;
while (pJob) {
refId = pJob->refId;
......@@ -2751,12 +2751,12 @@ void schedulerDestroy(void) {
}
if (schMgmt.hbConnections) {
void *pIter = taosHashIterate(schMgmt.hbConnections, NULL);
void *pIter = taosHashIterate(schMgmt.hbConnections, NULL);
while (pIter != NULL) {
SSchHbTrans *hb = pIter;
schFreeRpcCtx(&hb->rpcCtx);
pIter = taosHashIterate(schMgmt.hbConnections, pIter);
}
}
taosHashCleanup(schMgmt.hbConnections);
schMgmt.hbConnections = NULL;
}
......
......@@ -448,4 +448,4 @@ static void syncIOTickPing(void *param, void *tmrId) {
syncPingDestroy(pMsg);
taosTmrReset(syncIOTickPing, io->pingTimerMS, io, io->timerMgr, &io->pingTimer);
}
\ No newline at end of file
}
......@@ -327,6 +327,10 @@ void transQueueClear(STransQueue* queue);
*/
void transQueueDestroy(STransQueue* queue);
/*
* init global func
*/
void transThreadOnce();
#ifdef __cplusplus
}
#endif
......
......@@ -16,6 +16,8 @@
#include "transComm.h"
// static TdThreadOnce transModuleInit = PTHREAD_ONCE_INIT;
int transAuthenticateMsg(void* pMsg, int msgLen, void* pAuth, void* pKey) {
T_MD5_CTX context;
int ret = -1;
......@@ -361,5 +363,10 @@ void transQueueDestroy(STransQueue* queue) {
transQueueClear(queue);
taosArrayDestroy(queue->q);
}
// int32_t transGetExHandle() {
// static
//}
// void transThreadOnce() {
// taosThreadOnce(&transModuleInit, );
//}
#endif
......@@ -139,12 +139,6 @@ static int32_t cfgCheckAndSetDir(SConfigItem *pItem, const char *inputDir) {
return -1;
}
if (taosRealPath(fullDir, NULL, PATH_MAX) != 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
uError("failed to get realpath of dir:%s since %s", inputDir, terrstr());
return -1;
}
taosMemoryFreeClear(pItem->str);
pItem->str = strdup(fullDir);
if (pItem->str == NULL) {
......@@ -172,9 +166,8 @@ static int32_t cfgSetBool(SConfigItem *pItem, const char *value, ECfgSrcType sty
static int32_t cfgSetInt32(SConfigItem *pItem, const char *value, ECfgSrcType stype) {
int32_t ival = (int32_t)atoi(value);
if (ival < pItem->imin || ival > pItem->imax) {
uError("cfg:%s, type:%s src:%s value:%d out of range[%" PRId64 ", %" PRId64 "], use last src:%s value:%d",
pItem->name, cfgDtypeStr(pItem->dtype), cfgStypeStr(stype), ival, pItem->imin, pItem->imax,
cfgStypeStr(pItem->stype), pItem->i32);
uError("cfg:%s, type:%s src:%s value:%d out of range[%" PRId64 ", %" PRId64 "]", pItem->name,
cfgDtypeStr(pItem->dtype), cfgStypeStr(stype), ival, pItem->imin, pItem->imax);
terrno = TSDB_CODE_OUT_OF_RANGE;
return -1;
}
......@@ -187,10 +180,8 @@ static int32_t cfgSetInt32(SConfigItem *pItem, const char *value, ECfgSrcType st
static int32_t cfgSetInt64(SConfigItem *pItem, const char *value, ECfgSrcType stype) {
int64_t ival = (int64_t)atoi(value);
if (ival < pItem->imin || ival > pItem->imax) {
uError("cfg:%s, type:%s src:%s value:%" PRId64 " out of range[%" PRId64 ", %" PRId64
"], use last src:%s value:%" PRId64,
pItem->name, cfgDtypeStr(pItem->dtype), cfgStypeStr(stype), ival, pItem->imin, pItem->imax,
cfgStypeStr(pItem->stype), pItem->i64);
uError("cfg:%s, type:%s src:%s value:%" PRId64 " out of range[%" PRId64 ", %" PRId64 "]", pItem->name,
cfgDtypeStr(pItem->dtype), cfgStypeStr(stype), ival, pItem->imin, pItem->imax);
terrno = TSDB_CODE_OUT_OF_RANGE;
return -1;
}
......@@ -203,9 +194,8 @@ static int32_t cfgSetInt64(SConfigItem *pItem, const char *value, ECfgSrcType st
static int32_t cfgSetFloat(SConfigItem *pItem, const char *value, ECfgSrcType stype) {
float fval = (float)atof(value);
if (fval < pItem->fmin || fval > pItem->fmax) {
uError("cfg:%s, type:%s src:%s value:%f out of range[%f, %f], use last src:%s value:%f", pItem->name,
cfgDtypeStr(pItem->dtype), cfgStypeStr(stype), fval, pItem->fmin, pItem->fmax, cfgStypeStr(pItem->stype),
pItem->fval);
uError("cfg:%s, type:%s src:%s value:%f out of range[%f, %f]", pItem->name, cfgDtypeStr(pItem->dtype),
cfgStypeStr(stype), fval, pItem->fmin, pItem->fmax);
terrno = TSDB_CODE_OUT_OF_RANGE;
return -1;
}
......@@ -219,8 +209,8 @@ static int32_t cfgSetString(SConfigItem *pItem, const char *value, ECfgSrcType s
char *tmp = strdup(value);
if (tmp == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
uError("cfg:%s, type:%s src:%s value:%s failed to dup since %s, use last src:%s value:%s", pItem->name,
cfgDtypeStr(pItem->dtype), cfgStypeStr(stype), value, terrstr(), cfgStypeStr(pItem->stype), pItem->str);
uError("cfg:%s, type:%s src:%s value:%s failed to dup since %s", pItem->name, cfgDtypeStr(pItem->dtype),
cfgStypeStr(stype), value, terrstr());
return -1;
}
......@@ -232,9 +222,8 @@ static int32_t cfgSetString(SConfigItem *pItem, const char *value, ECfgSrcType s
static int32_t cfgSetDir(SConfigItem *pItem, const char *value, ECfgSrcType stype) {
if (cfgCheckAndSetDir(pItem, value) != 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
uError("cfg:%s, type:%s src:%s value:%s failed to dup since %s, use last src:%s value:%s", pItem->name,
cfgDtypeStr(pItem->dtype), cfgStypeStr(stype), value, terrstr(), cfgStypeStr(pItem->stype), pItem->str);
uError("cfg:%s, type:%s src:%s value:%s failed to dup since %s", pItem->name, cfgDtypeStr(pItem->dtype),
cfgStypeStr(stype), value, terrstr());
return -1;
}
......@@ -245,8 +234,8 @@ static int32_t cfgSetDir(SConfigItem *pItem, const char *value, ECfgSrcType styp
static int32_t cfgSetLocale(SConfigItem *pItem, const char *value, ECfgSrcType stype) {
if (cfgCheckAndSetLocale(pItem, value) != 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
uError("cfg:%s, type:%s src:%s value:%s failed to dup since %s, use last src:%s value:%s", pItem->name,
cfgDtypeStr(pItem->dtype), cfgStypeStr(stype), value, terrstr(), cfgStypeStr(pItem->stype), pItem->str);
uError("cfg:%s, type:%s src:%s value:%s failed to dup since %s", pItem->name, cfgDtypeStr(pItem->dtype),
cfgStypeStr(stype), value, terrstr());
return -1;
}
......@@ -257,8 +246,8 @@ static int32_t cfgSetLocale(SConfigItem *pItem, const char *value, ECfgSrcType s
static int32_t cfgSetCharset(SConfigItem *pItem, const char *value, ECfgSrcType stype) {
if (cfgCheckAndSetCharset(pItem, value) != 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
uError("cfg:%s, type:%s src:%s value:%s failed to dup since %s, use last src:%s value:%s", pItem->name,
cfgDtypeStr(pItem->dtype), cfgStypeStr(stype), value, terrstr(), cfgStypeStr(pItem->stype), pItem->str);
uError("cfg:%s, type:%s src:%s value:%s failed to dup since %s", pItem->name, cfgDtypeStr(pItem->dtype),
cfgStypeStr(stype), value, terrstr());
return -1;
}
......@@ -269,8 +258,8 @@ static int32_t cfgSetCharset(SConfigItem *pItem, const char *value, ECfgSrcType
static int32_t cfgSetTimezone(SConfigItem *pItem, const char *value, ECfgSrcType stype) {
if (cfgCheckAndSetTimezone(pItem, value) != 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
uError("cfg:%s, type:%s src:%s value:%s failed to dup since %s, use last src:%s value:%s", pItem->name,
cfgDtypeStr(pItem->dtype), cfgStypeStr(stype), value, terrstr(), cfgStypeStr(pItem->stype), pItem->str);
uError("cfg:%s, type:%s src:%s value:%s failed to dup since %s", pItem->name, cfgDtypeStr(pItem->dtype),
cfgStypeStr(stype), value, terrstr());
return -1;
}
......@@ -606,15 +595,19 @@ int32_t cfgLoadFromCfgFile(SConfig *pConfig, const char *filepath) {
char *line = NULL, *name, *value, *value2, *value3;
int32_t olen, vlen, vlen2, vlen3;
ssize_t _bytes = 0;
if (taosIsDir(filepath)) {
return -1;
}
int32_t code = 0;
TdFilePtr pFile = taosOpenFile(filepath, TD_FILE_READ | TD_FILE_STREAM);
if (pFile == NULL) {
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
// success when the file does not exist
if (errno == ENOENT) {
terrno = TAOS_SYSTEM_ERROR(errno);
uInfo("failed to load from cfg file %s since %s, use default parameters", filepath, terrstr());
return 0;
} else {
uError("failed to load from cfg file %s since %s", filepath, terrstr());
return -1;
}
}
while (!taosEOFFile(pFile)) {
......@@ -643,17 +636,24 @@ int32_t cfgLoadFromCfgFile(SConfig *pConfig, const char *filepath) {
if (vlen3 != 0) value3[vlen3] = 0;
}
cfgSetItem(pConfig, name, value, CFG_STYPE_CFG_FILE);
code = cfgSetItem(pConfig, name, value, CFG_STYPE_CFG_FILE);
if (code != 0 && terrno != TSDB_CODE_CFG_NOT_FOUND) break;
if (value2 != NULL && value3 != NULL && value2[0] != 0 && value3[0] != 0 && strcasecmp(name, "dataDir") == 0) {
cfgSetTfsItem(pConfig, name, value, value2, value3, CFG_STYPE_CFG_FILE);
code = cfgSetTfsItem(pConfig, name, value, value2, value3, CFG_STYPE_CFG_FILE);
if (code != 0 && terrno != TSDB_CODE_CFG_NOT_FOUND) break;
}
}
taosCloseFile(&pFile);
if (line != NULL) taosMemoryFreeClear(line);
uInfo("load from cfg file %s success", filepath);
return 0;
if (code == 0 || (code != 0 && terrno == TSDB_CODE_CFG_NOT_FOUND)) {
uInfo("load from cfg file %s success", filepath);
return 0;
} else {
uError("failed to load from cfg file %s since %s", filepath, terrstr());
return -1;
}
}
int32_t cfgLoadFromApollUrl(SConfig *pConfig, const char *url) {
......
此差异已折叠。
此差异已折叠。
此差异已折叠。
......@@ -76,7 +76,7 @@
./test.sh -f tsim/insert/backquote.sim -m
./test.sh -f tsim/parser/fourArithmetic-basic.sim -m
./test.sh -f tsim/query/interval-offset.sim -m
./test.sh -f tsim/tmq/basic1.sim -m
#./test.sh -f tsim/tmq/basic1.sim -m
./test.sh -f tsim/stable/vnode3.sim -m
./test.sh -f tsim/qnode/basic1.sim -m
./test.sh -f tsim/mnode/basic1.sim -m
......@@ -85,6 +85,6 @@
./test.sh -f tsim/sma/tsmaCreateInsertData.sim
# --- valgrind
./test.sh -f tsim/valgrind/checkError.sim -v
#./test.sh -f tsim/valgrind/checkError.sim -v
#======================b1-end===============
此差异已折叠。
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册