未验证 提交 00c0ce25 编写于 作者: L Liu Jicong 提交者: GitHub

Merge pull request #14008 from taosdata/feature/stream

feat(stream): support snode
......@@ -88,9 +88,9 @@ int32_t create_stream() {
/*const char* sql = "select min(k), max(k), sum(k) as sum_of_k from st1";*/
/*const char* sql = "select sum(k) from tu1 interval(10m)";*/
/*pRes = tmq_create_stream(pConn, "stream1", "out1", sql);*/
pRes = taos_query(pConn,
"create stream stream1 trigger at_once into abc2.outstb as select _wstartts, sum(k) from st1 "
"partition by tbname interval(10m) ");
pRes = taos_query(
pConn,
"create stream stream1 trigger at_once into abc1.outstb as select _wstartts, sum(k) from st1 interval(10m) ");
if (taos_errno(pRes) != 0) {
printf("failed to create stream stream1, reason:%s\n", taos_errstr(pRes));
return -1;
......@@ -107,11 +107,4 @@ int main(int argc, char* argv[]) {
code = init_env();
}
create_stream();
#if 0
tmq_t* tmq = build_consumer();
tmq_list_t* topic_list = build_topic_list();
/*perf_loop(tmq, topic_list);*/
/*basic_consume_loop(tmq, topic_list);*/
sync_consume_loop(tmq, topic_list);
#endif
}
......@@ -34,7 +34,6 @@ typedef enum {
WRITE_QUEUE,
APPLY_QUEUE,
SYNC_QUEUE,
MERGE_QUEUE,
QUEUE_MAX,
} EQueueType;
......
......@@ -16,8 +16,8 @@
#ifndef _TD_SNODE_H_
#define _TD_SNODE_H_
#include "tmsgcb.h"
#include "tmsg.h"
#include "tmsgcb.h"
#include "trpc.h"
#ifdef __cplusplus
......@@ -68,8 +68,8 @@ int32_t sndGetLoad(SSnode *pSnode, SSnodeLoad *pLoad);
* @param pMsg The request message
* @param pRsp The response message
*/
void sndProcessUMsg(SSnode *pSnode, SRpcMsg *pMsg);
void sndProcessSMsg(SSnode *pSnode, SRpcMsg *pMsg);
int32_t sndProcessUMsg(SSnode *pSnode, SRpcMsg *pMsg);
int32_t sndProcessSMsg(SSnode *pSnode, SRpcMsg *pMsg);
#ifdef __cplusplus
}
......
......@@ -36,7 +36,6 @@ typedef struct SPlanContext {
int64_t watermark;
char* pMsg;
int32_t msgLen;
// double filesFactor;
} SPlanContext;
// Create the physical plan for the query, according to the AST.
......
......@@ -152,7 +152,7 @@ void* streamDataBlockDecode(const void* buf, SStreamDataBlock* pInput);
typedef struct {
char* qmsg;
// followings are not applicable to encoder and decoder
void* inputHandle;
// void* inputHandle;
void* executor;
} STaskExec;
......@@ -240,12 +240,13 @@ struct SStreamTask {
int8_t inputType;
int8_t status;
int8_t sourceType;
int8_t execType;
int8_t sinkType;
int8_t dispatchType;
int16_t dispatchMsgType;
int8_t dataScan;
// node info
int32_t childId;
int32_t nodeId;
......
......@@ -95,9 +95,12 @@ SArray *smGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_MON_SM_INFO, smPutNodeMsgToMonitorQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DEPLOY, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_DROP, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RUN, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DISPATCH, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DISPATCH_RSP, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RECOVER, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RECOVER_RSP, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER;
code = 0;
_OVER:
......
......@@ -55,7 +55,9 @@ static void smProcessUniqueQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t num
taosGetQitem(qall, (void **)&pMsg);
dTrace("msg:%p, get from snode-unique queue", pMsg);
sndProcessUMsg(pMgmt->pSnode, pMsg);
if (sndProcessUMsg(pMgmt->pSnode, pMsg) < 0) {
ASSERT(0);
}
dTrace("msg:%p, is freed", pMsg);
rpcFreeCont(pMsg->pCont);
......@@ -67,7 +69,9 @@ static void smProcessSharedQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
SSnodeMgmt *pMgmt = pInfo->ahandle;
dTrace("msg:%p, get from snode-shared queue", pMsg);
sndProcessSMsg(pMgmt->pSnode, pMsg);
if (sndProcessSMsg(pMgmt->pSnode, pMsg) < 0) {
ASSERT(0);
}
dTrace("msg:%p, is freed", pMsg);
rpcFreeCont(pMsg->pCont);
......
......@@ -170,10 +170,6 @@ static int32_t vmPutMsgToQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg, EQueueType qtyp
dTrace("vgId:%d, msg:%p put into vnode-sync queue", pVnode->vgId, pMsg);
taosWriteQitem(pVnode->pSyncQ, pMsg);
break;
case MERGE_QUEUE:
dTrace("vgId:%d, msg:%p put into vnode-merge queue", pVnode->vgId, pMsg);
taosWriteQitem(pVnode->pMergeQ, pMsg);
break;
case APPLY_QUEUE:
dTrace("vgId:%d, msg:%p put into vnode-apply queue", pVnode->vgId, pMsg);
taosWriteQitem(pVnode->pApplyQ, pMsg);
......@@ -196,8 +192,6 @@ int32_t vmPutMsgToQueryQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { return vmPutMsg
int32_t vmPutMsgToFetchQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { return vmPutMsgToQueue(pMgmt, pMsg, FETCH_QUEUE); }
int32_t vmPutMsgToMergeQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { return vmPutMsgToQueue(pMgmt, pMsg, MERGE_QUEUE); }
int32_t vmPutMsgToMgmtQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
dTrace("msg:%p, put into vnode-mgmt queue", pMsg);
taosWriteQitem(pMgmt->mgmtWorker.queue, pMsg);
......@@ -243,9 +237,6 @@ int32_t vmGetQueueSize(SVnodeMgmt *pMgmt, int32_t vgId, EQueueType qtype) {
case FETCH_QUEUE:
size = taosQueueItemSize(pVnode->pFetchQ);
break;
case MERGE_QUEUE:
size = taosQueueItemSize(pVnode->pMergeQ);
break;
default:
break;
}
......
......@@ -63,9 +63,8 @@ int32_t mndConvertRsmaTask(char** pDst, int32_t* pDstLen, const char* ast, int64
.topicQuery = false,
.streamQuery = true,
.rSmaQuery = true,
.triggerType = STREAM_TRIGGER_AT_ONCE,
.triggerType = triggerType,
.watermark = watermark,
/*.filesFactor = filesFactor,*/
};
if (qCreateQueryPlan(&cxt, &pPlan, NULL) < 0) {
......@@ -270,7 +269,6 @@ int32_t mndAddShuffleSinkTasksToStream(SMnode* pMnode, STrans* pTrans, SStreamOb
pTask->epSet = mndGetVgroupEpset(pMnode, pVgroup);
// source
pTask->sourceType = TASK_SOURCE__MERGE;
pTask->inputType = TASK_INPUT_TYPE__DATA_BLOCK;
// exec
......@@ -316,7 +314,6 @@ int32_t mndAddFixedSinkTaskToStream(SMnode* pMnode, STrans* pTrans, SStreamObj*
#endif
pTask->epSet = mndGetVgroupEpset(pMnode, &pStream->fixedSinkVg);
// source
pTask->sourceType = TASK_SOURCE__MERGE;
pTask->inputType = TASK_INPUT_TYPE__DATA_BLOCK;
// exec
......@@ -427,6 +424,8 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
SStreamTask* pTask = tNewSStreamTask(pStream->uid);
mndAddTaskToTaskSet(taskSourceLevel, pTask);
pTask->dataScan = 1;
// input
pTask->inputType = TASK_INPUT_TYPE__SUMBIT_BLOCK;
......@@ -470,6 +469,8 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
SStreamTask* pTask = tNewSStreamTask(pStream->uid);
mndAddTaskToTaskSet(taskOneLevel, pTask);
pTask->dataScan = 1;
// input
pTask->inputType = TASK_INPUT_TYPE__SUMBIT_BLOCK;
......
......@@ -56,7 +56,6 @@ SStreamTask* sndMetaGetTask(SStreamMeta* pMeta, int32_t taskId);
int32_t sndMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId);
int32_t sndDropTaskOfStream(SStreamMeta* pMeta, int64_t streamId);
int32_t sndStopTaskOfStream(SStreamMeta* pMeta, int64_t streamId);
int32_t sndResumeTaskOfStream(SStreamMeta* pMeta, int64_t streamId);
......
......@@ -76,45 +76,158 @@ int32_t sndMetaRemoveTask(SStreamMeta *pMeta, int32_t taskId) {
return taosHashRemove(pMeta->pHash, &taskId, sizeof(int32_t));
}
static int32_t sndProcessTaskExecReq(SSnode *pSnode, SRpcMsg *pMsg) {
/*SStreamExecMsgHead *pHead = pMsg->pCont;*/
/*int32_t taskId = pHead->streamTaskId;*/
/*SStreamTask *pTask = sndMetaGetTask(pSnode->pMeta, taskId);*/
/*if (pTask == NULL) {*/
/*return -1;*/
/*}*/
static int32_t sndProcessTaskDeployReq(SSnode *pNode, SRpcMsg *pMsg) {
SStreamMeta *pMeta = pNode->pMeta;
char *msg = pMsg->pCont;
int32_t msgLen = pMsg->contLen;
SStreamTask *pTask = taosMemoryCalloc(1, sizeof(SStreamTask));
if (pTask == NULL) {
return -1;
}
SDecoder decoder;
tDecoderInit(&decoder, (uint8_t *)msg, msgLen);
if (tDecodeSStreamTask(&decoder, pTask) < 0) {
ASSERT(0);
}
tDecoderClear(&decoder);
pTask->status = TASK_STATUS__IDLE;
pTask->inputQueue = streamQueueOpen();
pTask->outputQueue = streamQueueOpen();
pTask->inputStatus = TASK_INPUT_STATUS__NORMAL;
pTask->outputStatus = TASK_INPUT_STATUS__NORMAL;
if (pTask->inputQueue == NULL || pTask->outputQueue == NULL) goto FAIL;
pTask->pMsgCb = &pNode->msgCb;
ASSERT(pTask->execType != TASK_EXEC__NONE);
SReadHandle handle = {
.pMsgCb = &pNode->msgCb,
};
/*pTask->exec.inputHandle = NULL;*/
pTask->exec.executor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle);
ASSERT(pTask->exec.executor);
streamSetupTrigger(pTask);
qInfo("deploy stream: stream id %ld task id %d child id %d on snode", pTask->streamId, pTask->taskId, pTask->childId);
return 0;
FAIL:
if (pTask->inputQueue) streamQueueClose(pTask->inputQueue);
if (pTask->outputQueue) streamQueueClose(pTask->outputQueue);
return -1;
}
static int32_t sndProcessTaskRunReq(SSnode *pNode, SRpcMsg *pMsg) {
SStreamMeta *pMeta = pNode->pMeta;
SStreamTaskRunReq *pReq = pMsg->pCont;
int32_t taskId = pReq->taskId;
SStreamTask *pTask = *(SStreamTask **)taosHashGet(pMeta->pHash, &taskId, sizeof(int32_t));
streamTaskProcessRunReq(pTask, &pNode->msgCb);
return 0;
}
void sndProcessUMsg(SSnode *pSnode, SRpcMsg *pMsg) {
static int32_t sndProcessTaskDispatchReq(SSnode *pNode, SRpcMsg *pMsg) {
SStreamMeta *pMeta = pNode->pMeta;
char *msgStr = pMsg->pCont;
char *msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead));
int32_t msgLen = pMsg->contLen - sizeof(SMsgHead);
SStreamDispatchReq req;
SDecoder decoder;
tDecoderInit(&decoder, msgBody, msgLen);
tDecodeStreamDispatchReq(&decoder, &req);
int32_t taskId = req.taskId;
SStreamTask *pTask = *(SStreamTask **)taosHashGet(pMeta->pHash, &taskId, sizeof(int32_t));
SRpcMsg rsp = {
.info = pMsg->info,
.code = 0,
};
streamProcessDispatchReq(pTask, &pNode->msgCb, &req, &rsp);
return 0;
}
static int32_t sndProcessTaskRecoverReq(SSnode *pNode, SRpcMsg *pMsg) {
SStreamMeta *pMeta = pNode->pMeta;
SStreamTaskRecoverReq *pReq = pMsg->pCont;
int32_t taskId = pReq->taskId;
SStreamTask *pTask = *(SStreamTask **)taosHashGet(pMeta->pHash, &taskId, sizeof(int32_t));
streamProcessRecoverReq(pTask, &pNode->msgCb, pReq, pMsg);
return 0;
}
static int32_t sndProcessTaskDispatchRsp(SSnode *pNode, SRpcMsg *pMsg) {
SStreamMeta *pMeta = pNode->pMeta;
SStreamDispatchRsp *pRsp = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
int32_t taskId = pRsp->taskId;
SStreamTask *pTask = *(SStreamTask **)taosHashGet(pMeta->pHash, &taskId, sizeof(int32_t));
streamProcessDispatchRsp(pTask, &pNode->msgCb, pRsp);
return 0;
}
static int32_t sndProcessTaskRecoverRsp(SSnode *pNode, SRpcMsg *pMsg) {
SStreamMeta *pMeta = pNode->pMeta;
SStreamTaskRecoverRsp *pRsp = pMsg->pCont;
int32_t taskId = pRsp->taskId;
SStreamTask *pTask = *(SStreamTask **)taosHashGet(pMeta->pHash, &taskId, sizeof(int32_t));
streamProcessRecoverRsp(pTask, pRsp);
return 0;
}
static int32_t sndProcessTaskDropReq(SSnode *pNode, SRpcMsg *pMsg) {
SStreamMeta *pMeta = pNode->pMeta;
char *msg = pMsg->pCont;
int32_t msgLen = pMsg->contLen;
SVDropStreamTaskReq *pReq = (SVDropStreamTaskReq *)msg;
int32_t code = taosHashRemove(pMeta->pHash, &pReq->taskId, sizeof(int32_t));
ASSERT(code == 0);
if (code == 0) {
// sendrsp
}
return code;
}
int32_t sndProcessUMsg(SSnode *pSnode, SRpcMsg *pMsg) {
// stream deploy
// stream stop/resume
// operator exec
if (pMsg->msgType == TDMT_STREAM_TASK_DEPLOY) {
void *msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
SStreamTask *pTask = taosMemoryMalloc(sizeof(SStreamTask));
if (pTask == NULL) {
switch (pMsg->msgType) {
case TDMT_STREAM_TASK_DEPLOY:
return sndProcessTaskDeployReq(pSnode, pMsg);
case TDMT_VND_STREAM_TASK_DROP:
return sndProcessTaskDropReq(pSnode, pMsg);
default:
ASSERT(0);
return;
}
SDecoder decoder;
tDecoderInit(&decoder, msg, pMsg->contLen - sizeof(SMsgHead));
tDecodeSStreamTask(&decoder, pTask);
tDecoderClear(&decoder);
sndMetaDeployTask(pSnode->pMeta, pTask);
/*} else if (pMsg->msgType == TDMT_SND_TASK_EXEC) {*/
/*sndProcessTaskExecReq(pSnode, pMsg);*/
} else {
ASSERT(0);
}
return 0;
}
void sndProcessSMsg(SSnode *pSnode, SRpcMsg *pMsg) {
// operator exec
/*if (pMsg->msgType == TDMT_SND_TASK_EXEC) {*/
/*sndProcessTaskExecReq(pSnode, pMsg);*/
/*} else {*/
ASSERT(0);
/*}*/
int32_t sndProcessSMsg(SSnode *pSnode, SRpcMsg *pMsg) {
switch (pMsg->msgType) {
case TDMT_STREAM_TASK_RUN:
return sndProcessTaskRunReq(pSnode, pMsg);
case TDMT_STREAM_TASK_DISPATCH:
return sndProcessTaskDispatchReq(pSnode, pMsg);
case TDMT_STREAM_TASK_RECOVER:
return sndProcessTaskRecoverReq(pSnode, pMsg);
case TDMT_STREAM_TASK_DISPATCH_RSP:
return sndProcessTaskDispatchRsp(pSnode, pMsg);
case TDMT_STREAM_TASK_RECOVER_RSP:
return sndProcessTaskRecoverRsp(pSnode, pMsg);
default:
ASSERT(0);
}
return 0;
}
......@@ -125,10 +125,10 @@ int32_t tqProcessOffsetCommitReq(STQ* pTq, char* msg, int32_t msgLen) {
if (offset.type == TMQ_OFFSET__SNAPSHOT) {
tqDebug("receive offset commit msg to %s on vg %d, offset(type:snapshot) uid: %ld, ts: %ld", offset.subKey,
pTq->pVnode->config.vgId, offset.uid, offset.ts);
TD_VID(pTq->pVnode), offset.uid, offset.ts);
} else if (offset.type == TMQ_OFFSET__LOG) {
tqDebug("receive offset commit msg to %s on vg %d, offset(type:log) version: %ld", offset.subKey,
pTq->pVnode->config.vgId, offset.version);
TD_VID(pTq->pVnode), offset.version);
} else {
ASSERT(0);
}
......@@ -159,7 +159,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
if (pOffset != NULL) {
ASSERT(pOffset->type == TMQ_OFFSET__LOG);
tqDebug("consumer %ld, restore offset of %s on vg %d, offset(type:log) version: %ld", consumerId, pReq->subKey,
pTq->pVnode->config.vgId, pOffset->version);
TD_VID(pTq->pVnode), pOffset->version);
fetchOffset = pOffset->version + 1;
} else {
if (pReq->currentOffset == TMQ_CONF__RESET_OFFSET__EARLIEAST) {
......@@ -167,13 +167,13 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
} else if (pReq->currentOffset == TMQ_CONF__RESET_OFFSET__LATEST) {
fetchOffset = walGetCommittedVer(pTq->pWal);
} else if (pReq->currentOffset == TMQ_CONF__RESET_OFFSET__NONE) {
tqError("tmq poll: no offset committed for consumer %ld in vg %d, subkey %s", consumerId,
pTq->pVnode->config.vgId, pReq->subKey);
tqError("tmq poll: no offset committed for consumer %ld in vg %d, subkey %s", consumerId, TD_VID(pTq->pVnode),
pReq->subKey);
terrno = TSDB_CODE_TQ_NO_COMMITTED_OFFSET;
return -1;
}
tqDebug("consumer %ld, restore offset of %s on vg %d failed, config is %ld, set to %ld", consumerId, pReq->subKey,
pTq->pVnode->config.vgId, pReq->currentOffset, fetchOffset);
TD_VID(pTq->pVnode), pReq->currentOffset, fetchOffset);
}
}
......@@ -183,14 +183,14 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
STqHandle* pHandle = taosHashGet(pTq->handles, pReq->subKey, strlen(pReq->subKey));
/*ASSERT(pHandle);*/
if (pHandle == NULL) {
tqError("tmq poll: no consumer handle for consumer %ld in vg %d, subkey %s", consumerId, pTq->pVnode->config.vgId,
tqError("tmq poll: no consumer handle for consumer %ld in vg %d, subkey %s", consumerId, TD_VID(pTq->pVnode),
pReq->subKey);
return -1;
}
if (pHandle->consumerId != consumerId) {
tqError("tmq poll: consumer handle mismatch for consumer %ld in vg %d, subkey %s, handle consumer id %ld",
consumerId, pTq->pVnode->config.vgId, pReq->subKey, pHandle->consumerId);
consumerId, TD_VID(pTq->pVnode), pReq->subKey, pHandle->consumerId);
return -1;
}
......@@ -304,7 +304,6 @@ int32_t tqProcessVgDeleteReq(STQ* pTq, char* msg, int32_t msgLen) {
return 0;
}
// TODO: persist meta into tdb
int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) {
SMqRebVgReq req = {0};
tDecodeSMqRebVgReq(msg, &req);
......@@ -346,10 +345,10 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) {
pHandle->execHandle.execTb.suid = req.suid;
SArray* tbUidList = taosArrayInit(0, sizeof(int64_t));
tsdbGetCtbIdList(pTq->pVnode->pMeta, req.suid, tbUidList);
tqDebug("vg %d, tq try get suid: %ld", pTq->pVnode->config.vgId, req.suid);
tqDebug("vg %d, tq try get suid: %ld", TD_VID(pTq->pVnode), req.suid);
for (int32_t i = 0; i < taosArrayGetSize(tbUidList); i++) {
int64_t tbUid = *(int64_t*)taosArrayGet(tbUidList, i);
tqDebug("vg %d, idx %d, uid: %ld", pTq->pVnode->config.vgId, i, tbUid);
tqDebug("vg %d, idx %d, uid: %ld", TD_VID(pTq->pVnode), i, tbUid);
}
for (int32_t i = 0; i < 5; i++) {
tqReadHandleSetTbUidList(pHandle->execHandle.pExecReader[i], tbUidList);
......@@ -400,16 +399,21 @@ int32_t tqProcessTaskDeploy(STQ* pTq, char* msg, int32_t msgLen) {
// exec
if (pTask->execType != TASK_EXEC__NONE) {
// expand runners
STqReadHandle* pStreamReader = tqInitSubmitMsgScanner(pTq->pVnode->pMeta);
SReadHandle handle = {
.reader = pStreamReader,
.meta = pTq->pVnode->pMeta,
.pMsgCb = &pTq->pVnode->msgCb,
.vnode = pTq->pVnode,
};
pTask->exec.inputHandle = pStreamReader;
pTask->exec.executor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle);
ASSERT(pTask->exec.executor);
if (pTask->dataScan) {
STqReadHandle* pStreamReader = tqInitSubmitMsgScanner(pTq->pVnode->pMeta);
SReadHandle handle = {
.reader = pStreamReader,
.meta = pTq->pVnode->pMeta,
.pMsgCb = &pTq->pVnode->msgCb,
.vnode = pTq->pVnode,
};
/*pTask->exec.inputHandle = pStreamReader;*/
pTask->exec.executor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle);
ASSERT(pTask->exec.executor);
} else {
pTask->exec.executor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, NULL);
ASSERT(pTask->exec.executor);
}
}
// sink
......@@ -431,7 +435,7 @@ int32_t tqProcessTaskDeploy(STQ* pTq, char* msg, int32_t msgLen) {
streamSetupTrigger(pTask);
tqInfo("deploy stream task id %d child id %d on vg %d", pTask->taskId, pTask->childId, pTq->pVnode->config.vgId);
tqInfo("deploy stream task id %d child id %d on vg %d", pTask->taskId, pTask->childId, TD_VID(pTq->pVnode));
taosHashPut(pTq->pStreamTasks, &pTask->taskId, sizeof(int32_t), &pTask, sizeof(void*));
......@@ -464,7 +468,7 @@ int32_t tqProcessStreamTrigger(STQ* pTq, SSubmitReq* pReq) {
continue;
}
if (streamLaunchByWrite(pTask, pTq->pVnode->config.vgId, &pTq->pVnode->msgCb) < 0) {
if (streamLaunchByWrite(pTask, TD_VID(pTq->pVnode), &pTq->pVnode->msgCb) < 0) {
continue;
}
} else {
......@@ -534,9 +538,9 @@ int32_t tqProcessTaskRecoverRsp(STQ* pTq, SRpcMsg* pMsg) {
int32_t tqProcessTaskDropReq(STQ* pTq, char* msg, int32_t msgLen) {
SVDropStreamTaskReq* pReq = (SVDropStreamTaskReq*)msg;
int32_t code = taosHashRemove(pTq->pStreamTasks, &pReq->taskId, sizeof(int32_t));
ASSERT(code == 0);
if (code == 0) {
// sendrsp
}
ASSERT(code == 0);
return code;
}
......@@ -99,7 +99,7 @@ int32_t qSetMultiStreamInput(qTaskInfo_t tinfo, const void* pBlocks, size_t numO
}
qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, void* streamReadHandle) {
if (msg == NULL || streamReadHandle == NULL) {
if (msg == NULL) {
return NULL;
}
......
......@@ -13,10 +13,10 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "os.h"
#include "tref.h"
#include "dataSinkMgt.h"
#include "os.h"
#include "tmsg.h"
#include "tref.h"
#include "tudf.h"
#include "executor.h"
......@@ -24,15 +24,13 @@
#include "query.h"
static TdThreadOnce initPoolOnce = PTHREAD_ONCE_INIT;
int32_t exchangeObjRefPool = -1;
int32_t exchangeObjRefPool = -1;
static void initRefPool() {
exchangeObjRefPool = taosOpenRef(1024, doDestroyExchangeOperatorInfo);
}
static void initRefPool() { exchangeObjRefPool = taosOpenRef(1024, doDestroyExchangeOperatorInfo); }
int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId, SSubplan* pSubplan,
qTaskInfo_t* pTaskInfo, DataSinkHandle* handle, const char* sql, EOPTR_EXEC_MODEL model) {
assert(readHandle != NULL && pSubplan != NULL);
assert(pSubplan != NULL);
SExecTaskInfo** pTask = (SExecTaskInfo**)pTaskInfo;
taosThreadOnce(&initPoolOnce, initRefPool);
......@@ -47,57 +45,57 @@ int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId,
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
if (handle) {
void* pSinkParam = NULL;
code = createDataSinkParam(pSubplan->pDataSink, &pSinkParam, pTaskInfo);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
code = dsCreateDataSinker(pSubplan->pDataSink, handle, pSinkParam);
}
_error:
_error:
// if failed to add ref for all tables in this query, abort current query
return code;
}
#ifdef TEST_IMPL
// wait moment
int waitMoment(SQInfo* pQInfo){
if(pQInfo->sql) {
int ms = 0;
int waitMoment(SQInfo* pQInfo) {
if (pQInfo->sql) {
int ms = 0;
char* pcnt = strstr(pQInfo->sql, " count(*)");
if(pcnt) return 0;
if (pcnt) return 0;
char* pos = strstr(pQInfo->sql, " t_");
if(pos){
if (pos) {
pos += 3;
ms = atoi(pos);
while(*pos >= '0' && *pos <= '9'){
pos ++;
while (*pos >= '0' && *pos <= '9') {
pos++;
}
char unit_char = *pos;
if(unit_char == 'h'){
ms *= 3600*1000;
} else if(unit_char == 'm'){
ms *= 60*1000;
} else if(unit_char == 's'){
if (unit_char == 'h') {
ms *= 3600 * 1000;
} else if (unit_char == 'm') {
ms *= 60 * 1000;
} else if (unit_char == 's') {
ms *= 1000;
}
}
if(ms == 0) return 0;
if (ms == 0) return 0;
printf("test wait sleep %dms. sql=%s ...\n", ms, pQInfo->sql);
if(ms < 1000) {
if (ms < 1000) {
taosMsleep(ms);
} else {
int used_ms = 0;
while(used_ms < ms) {
while (used_ms < ms) {
taosMsleep(1000);
used_ms += 1000;
if(isTaskKilled(pQInfo)){
if (isTaskKilled(pQInfo)) {
printf("test check query is canceled, sleep break.%s\n", pQInfo->sql);
break;
}
......@@ -108,15 +106,14 @@ int waitMoment(SQInfo* pQInfo){
}
#endif
int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t *useconds) {
int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t* useconds) {
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
int64_t threadId = taosGetSelfPthreadId();
*pRes = NULL;
int64_t curOwner = 0;
if ((curOwner = atomic_val_compare_exchange_64(&pTaskInfo->owner, 0, threadId)) != 0) {
qError("%s-%p execTask is now executed by thread:%p", GET_TASKID(pTaskInfo), pTaskInfo,
(void*)curOwner);
qError("%s-%p execTask is now executed by thread:%p", GET_TASKID(pTaskInfo), pTaskInfo, (void*)curOwner);
pTaskInfo->code = TSDB_CODE_QRY_IN_EXEC;
return pTaskInfo->code;
}
......@@ -152,18 +149,18 @@ int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t *useconds) {
cleanUpUdfs();
int32_t current = (*pRes != NULL)? (*pRes)->info.rows:0;
int32_t current = (*pRes != NULL) ? (*pRes)->info.rows : 0;
uint64_t total = pTaskInfo->pRoot->resultInfo.totalRows;
qDebug("%s task suspended, %d rows returned, total:%" PRId64 " rows, in sinkNode:%d, elapsed:%.2f ms",
GET_TASKID(pTaskInfo), current, total, 0, el/1000.0);
GET_TASKID(pTaskInfo), current, total, 0, el / 1000.0);
atomic_store_64(&pTaskInfo->owner, 0);
return pTaskInfo->code;
}
int32_t qKillTask(qTaskInfo_t qinfo) {
SExecTaskInfo *pTaskInfo = (SExecTaskInfo *)qinfo;
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)qinfo;
if (pTaskInfo == NULL) {
return TSDB_CODE_QRY_INVALID_QHANDLE;
......@@ -182,7 +179,7 @@ int32_t qKillTask(qTaskInfo_t qinfo) {
}
int32_t qAsyncKillTask(qTaskInfo_t qinfo) {
SExecTaskInfo *pTaskInfo = (SExecTaskInfo *)qinfo;
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)qinfo;
if (pTaskInfo == NULL) {
return TSDB_CODE_QRY_INVALID_QHANDLE;
......@@ -195,7 +192,7 @@ int32_t qAsyncKillTask(qTaskInfo_t qinfo) {
}
int32_t qIsTaskCompleted(qTaskInfo_t qinfo) {
SExecTaskInfo *pTaskInfo = (SExecTaskInfo *)qinfo;
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)qinfo;
if (pTaskInfo == NULL) {
return TSDB_CODE_QRY_INVALID_QHANDLE;
......@@ -205,18 +202,18 @@ int32_t qIsTaskCompleted(qTaskInfo_t qinfo) {
}
void qDestroyTask(qTaskInfo_t qTaskHandle) {
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*) qTaskHandle;
qDebug("%s execTask completed, numOfRows:%"PRId64, GET_TASKID(pTaskInfo), pTaskInfo->pRoot->resultInfo.totalRows);
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)qTaskHandle;
qDebug("%s execTask completed, numOfRows:%" PRId64, GET_TASKID(pTaskInfo), pTaskInfo->pRoot->resultInfo.totalRows);
queryCostStatis(pTaskInfo); // print the query cost summary
queryCostStatis(pTaskInfo); // print the query cost summary
doDestroyTask(pTaskInfo);
}
int32_t qGetExplainExecInfo(qTaskInfo_t tinfo, int32_t *resNum, SExplainExecInfo **pRes) {
SExecTaskInfo *pTaskInfo = (SExecTaskInfo *)tinfo;
int32_t capacity = 0;
int32_t qGetExplainExecInfo(qTaskInfo_t tinfo, int32_t* resNum, SExplainExecInfo** pRes) {
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
int32_t capacity = 0;
return getOperatorExplainExecInfo(pTaskInfo->pRoot, pRes, &capacity, resNum);
return getOperatorExplainExecInfo(pTaskInfo->pRoot, pRes, &capacity, resNum);
}
int32_t qSerializeTaskStatus(qTaskInfo_t tinfo, char** pOutput, int32_t* len) {
......
......@@ -141,8 +141,8 @@ static int32_t doCopyToSDataBlock(SExecTaskInfo* taskInfo, SSDataBlock* pBlock,
SqlFunctionCtx* pCtx, int32_t numOfExprs);
static void initCtxOutputBuffer(SqlFunctionCtx* pCtx, int32_t size);
static void doSetTableGroupOutputBuf(SOperatorInfo* pOperator, SAggOperatorInfo* pAggInfo, int32_t numOfOutput, uint64_t groupId);
static void doSetTableGroupOutputBuf(SOperatorInfo* pOperator, SAggOperatorInfo* pAggInfo, int32_t numOfOutput,
uint64_t groupId);
// setup the output buffer for each operator
static bool hasNull(SColumn* pColumn, SColumnDataAgg* pStatis) {
......@@ -1230,8 +1230,9 @@ void initResultRow(SResultRow* pResultRow) {
* offset[0] offset[1] offset[2]
*/
// TODO refactor: some function move away
void setFunctionResultOutput(SOperatorInfo *pOperator, SOptrBasicInfo *pInfo, SAggSupporter* pSup, int32_t stage, int32_t numOfExprs) {
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
void setFunctionResultOutput(SOperatorInfo* pOperator, SOptrBasicInfo* pInfo, SAggSupporter* pSup, int32_t stage,
int32_t numOfExprs) {
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SqlFunctionCtx* pCtx = pOperator->exprSupp.pCtx;
int32_t* rowEntryInfoOffset = pOperator->exprSupp.rowEntryInfoOffset;
......@@ -1380,9 +1381,10 @@ void extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const int8_t* rowR
}
}
void doSetTableGroupOutputBuf(SOperatorInfo* pOperator, SAggOperatorInfo* pAggInfo, int32_t numOfOutput, uint64_t groupId) {
void doSetTableGroupOutputBuf(SOperatorInfo* pOperator, SAggOperatorInfo* pAggInfo, int32_t numOfOutput,
uint64_t groupId) {
// for simple group by query without interval, all the tables belong to one group result.
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SResultRowInfo* pResultRowInfo = &pAggInfo->binfo.resultRowInfo;
SqlFunctionCtx* pCtx = pOperator->exprSupp.pCtx;
int32_t* rowEntryInfoOffset = pOperator->exprSupp.rowEntryInfoOffset;
......@@ -2040,8 +2042,8 @@ static int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInf
}
int32_t extractDataBlockFromFetchRsp(SSDataBlock* pRes, SLoadRemoteDataInfo* pLoadInfo, int32_t numOfRows, char* pData,
int32_t compLen, int32_t numOfOutput, int64_t startTs, uint64_t* total,
SArray* pColList) {
int32_t compLen, int32_t numOfOutput, int64_t startTs, uint64_t* total,
SArray* pColList) {
if (pColList == NULL) { // data from other sources
blockCompressDecode(pRes, numOfOutput, numOfRows, pData);
pRes->info.rows = numOfRows;
......@@ -2165,8 +2167,9 @@ static SSDataBlock* concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SEx
}
SRetrieveTableRsp* pTableRsp = pDataInfo->pRsp;
code = extractDataBlockFromFetchRsp(pExchangeInfo->pResult, pLoadInfo, pTableRsp->numOfRows, pTableRsp->data,
pTableRsp->compLen, pTableRsp->numOfCols, startTs, &pDataInfo->totalRows, NULL);
code =
extractDataBlockFromFetchRsp(pExchangeInfo->pResult, pLoadInfo, pTableRsp->numOfRows, pTableRsp->data,
pTableRsp->compLen, pTableRsp->numOfCols, startTs, &pDataInfo->totalRows, NULL);
if (code != 0) {
taosMemoryFreeClear(pDataInfo->pRsp);
goto _error;
......@@ -2281,7 +2284,7 @@ static SSDataBlock* seqLoadRemoteData(SOperatorInfo* pOperator) {
SRetrieveTableRsp* pTableRsp = pDataInfo->pRsp;
int32_t code =
extractDataBlockFromFetchRsp(pExchangeInfo->pResult, pLoadInfo, pTableRsp->numOfRows, pTableRsp->data,
pTableRsp->compLen, pTableRsp->numOfCols, startTs, &pDataInfo->totalRows, NULL);
pTableRsp->compLen, pTableRsp->numOfCols, startTs, &pDataInfo->totalRows, NULL);
if (pRsp->completed == 1) {
qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 " numOfRows:%d, rowsOfSource:%" PRIu64
......@@ -2673,8 +2676,8 @@ static SSDataBlock* doSortedMerge(SOperatorInfo* pOperator) {
}
int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize;
pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, SORT_MULTISOURCE_MERGE, pInfo->bufPageSize,
numOfBufPage, pInfo->binfo.pRes, "GET_TASKID(pTaskInfo)");
pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, SORT_MULTISOURCE_MERGE, pInfo->bufPageSize, numOfBufPage,
pInfo->binfo.pRes, "GET_TASKID(pTaskInfo)");
tsortSetFetchRawDataFp(pInfo->pSortHandle, loadNextDataBlock, NULL, NULL);
......@@ -3130,7 +3133,7 @@ static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) {
SProjectOperatorInfo* pProjectInfo = pOperator->info;
SOptrBasicInfo* pInfo = &pProjectInfo->binfo;
SExprSupp* pSup = &pOperator->exprSupp;
SExprSupp* pSup = &pOperator->exprSupp;
SSDataBlock* pRes = pInfo->pRes;
blockDataCleanup(pRes);
......@@ -3403,8 +3406,8 @@ void cleanupAggSup(SAggSupporter* pAggSup) {
destroyDiskbasedBuf(pAggSup->pResultBuf);
}
int32_t initAggInfo(SExprSupp* pSup, SAggSupporter* pAggSup, SExprInfo* pExprInfo, int32_t numOfCols,
size_t keyBufSize, const char* pkey) {
int32_t initAggInfo(SExprSupp* pSup, SAggSupporter* pAggSup, SExprInfo* pExprInfo, int32_t numOfCols, size_t keyBufSize,
const char* pkey) {
initExprSupp(pSup, pExprInfo, numOfCols);
doInitAggInfoSup(pAggSup, pSup->pCtx, numOfCols, keyBufSize, pkey);
for (int32_t i = 0; i < numOfCols; ++i) {
......@@ -3457,12 +3460,12 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo*
initBasicInfo(&pInfo->binfo, pResultBlock);
initExprSupp(&pInfo->scalarExprSup, pScalarExprInfo, numOfScalarExpr);
pInfo->groupId = INT32_MIN;
pOperator->name = "TableAggregate";
pInfo->groupId = INT32_MIN;
pOperator->name = "TableAggregate";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_HASH_AGG;
pOperator->blocking = true;
pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo;
pOperator->blocking = true;
pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo;
pOperator->pTaskInfo = pTaskInfo;
pOperator->fpSet = createOperatorFpSet(doOpenAggregateOptr, getAggregateResult, NULL, NULL, destroyAggOperatorInfo,
......@@ -3578,25 +3581,26 @@ static SArray* setRowTsColumnOutputInfo(SqlFunctionCtx* pCtx, int32_t numOfCols)
return pList;
}
SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhysiNode* pProjPhyNode, SExecTaskInfo* pTaskInfo) {
SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhysiNode* pProjPhyNode,
SExecTaskInfo* pTaskInfo) {
SProjectOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SProjectOperatorInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) {
goto _error;
}
int32_t numOfCols = 0;
int32_t numOfCols = 0;
SExprInfo* pExprInfo = createExprInfo(pProjPhyNode->pProjections, NULL, &numOfCols);
SSDataBlock* pResBlock = createResDataBlock(pProjPhyNode->node.pOutputDataBlockDesc);
SLimit limit = {.limit = pProjPhyNode->limit, .offset = pProjPhyNode->offset};
SLimit slimit = {.limit = pProjPhyNode->slimit, .offset = pProjPhyNode->soffset};
pInfo->limit = limit;
pInfo->slimit = slimit;
pInfo->curOffset = limit.offset;
pInfo->curSOffset = slimit.offset;
pInfo->binfo.pRes = pResBlock;
pInfo->limit = limit;
pInfo->slimit = slimit;
pInfo->curOffset = limit.offset;
pInfo->curSOffset = slimit.offset;
pInfo->binfo.pRes = pResBlock;
pInfo->pFilterNode = pProjPhyNode->node.pConditions;
int32_t numOfRows = 4096;
......@@ -3614,12 +3618,12 @@ SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhys
setFunctionResultOutput(pOperator, &pInfo->binfo, &pInfo->aggSup, MAIN_SCAN, numOfCols);
pInfo->pPseudoColInfo = setRowTsColumnOutputInfo(pOperator->exprSupp.pCtx, numOfCols);
pOperator->name = "ProjectOperator";
pOperator->name = "ProjectOperator";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_PROJECT;
pOperator->blocking = false;
pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo;
pOperator->pTaskInfo = pTaskInfo;
pOperator->blocking = false;
pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo;
pOperator->pTaskInfo = pTaskInfo;
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doProjectOperation, NULL, NULL,
destroyProjectOperatorInfo, NULL, NULL, NULL);
......@@ -3639,7 +3643,7 @@ _error:
static SSDataBlock* doApplyIndefinitFunction(SOperatorInfo* pOperator) {
SIndefOperatorInfo* pIndefInfo = pOperator->info;
SOptrBasicInfo* pInfo = &pIndefInfo->binfo;
SExprSupp* pSup = &pOperator->exprSupp;
SExprSupp* pSup = &pOperator->exprSupp;
SSDataBlock* pRes = pInfo->pRes;
blockDataCleanup(pRes);
......@@ -3677,7 +3681,7 @@ static SSDataBlock* doApplyIndefinitFunction(SOperatorInfo* pOperator) {
SExprSupp* pScalarSup = &pIndefInfo->scalarSup;
if (pScalarSup->pExprInfo != NULL) {
code = projectApplyFunctions(pScalarSup->pExprInfo, pBlock, pBlock, pScalarSup->pCtx, pScalarSup->numOfExprs,
pIndefInfo->pPseudoColInfo);
pIndefInfo->pPseudoColInfo);
if (code != TSDB_CODE_SUCCESS) {
longjmp(pTaskInfo->env, code);
}
......@@ -3686,8 +3690,8 @@ static SSDataBlock* doApplyIndefinitFunction(SOperatorInfo* pOperator) {
setInputDataBlock(pOperator, pSup->pCtx, pBlock, order, scanFlag, false);
blockDataEnsureCapacity(pInfo->pRes, pInfo->pRes->info.rows + pBlock->info.rows);
code = projectApplyFunctions(pOperator->exprSupp.pExprInfo, pInfo->pRes, pBlock, pSup->pCtx, pOperator->exprSupp.numOfExprs,
pIndefInfo->pPseudoColInfo);
code = projectApplyFunctions(pOperator->exprSupp.pExprInfo, pInfo->pRes, pBlock, pSup->pCtx,
pOperator->exprSupp.numOfExprs, pIndefInfo->pPseudoColInfo);
if (code != TSDB_CODE_SUCCESS) {
longjmp(pTaskInfo->env, code);
}
......@@ -3745,14 +3749,14 @@ SOperatorInfo* createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhy
pInfo->binfo.pRes = pResBlock;
pInfo->pPseudoColInfo = setRowTsColumnOutputInfo(pSup->pCtx, numOfExpr);
pOperator->name = "IndefinitOperator";
pOperator->name = "IndefinitOperator";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_PROJECT;
pOperator->blocking = false;
pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo;
pOperator->blocking = false;
pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo;
pOperator->exprSupp.pExprInfo = pExprInfo;
pOperator->exprSupp.numOfExprs = numOfExpr;
pOperator->pTaskInfo = pTaskInfo;
pOperator->pTaskInfo = pTaskInfo;
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doApplyIndefinitFunction, NULL, NULL,
destroyIndefinitOperatorInfo, NULL, NULL, NULL);
......@@ -3827,11 +3831,11 @@ SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode*
goto _error;
}
int32_t num = 0;
SSDataBlock* pResBlock = createResDataBlock(pPhyFillNode->node.pOutputDataBlockDesc);
SExprInfo* pExprInfo = createExprInfo(pPhyFillNode->pTargets, NULL, &num);
SInterval* pInterval = &((SIntervalAggOperatorInfo*)downstream->info)->interval;
int32_t type = convertFillType(pPhyFillNode->mode);
int32_t num = 0;
SSDataBlock* pResBlock = createResDataBlock(pPhyFillNode->node.pOutputDataBlockDesc);
SExprInfo* pExprInfo = createExprInfo(pPhyFillNode->pTargets, NULL, &num);
SInterval* pInterval = &((SIntervalAggOperatorInfo*)downstream->info)->interval;
int32_t type = convertFillType(pPhyFillNode->mode);
SResultInfo* pResultInfo = &pOperator->resultInfo;
initResultSizeInfo(pOperator, 4096);
......@@ -3842,16 +3846,16 @@ SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode*
goto _error;
}
pInfo->pRes = pResBlock;
pInfo->pRes = pResBlock;
pInfo->multigroupResult = multigroupResult;
pOperator->name = "FillOperator";
pOperator->blocking = false;
pOperator->status = OP_NOT_OPENED;
pOperator->name = "FillOperator";
pOperator->blocking = false;
pOperator->status = OP_NOT_OPENED;
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_FILL;
pOperator->exprSupp.pExprInfo = pExprInfo;
pOperator->exprSupp.numOfExprs = num;
pOperator->info = pInfo;
pOperator->pTaskInfo = pTaskInfo;
pOperator->exprSupp.pExprInfo = pExprInfo;
pOperator->exprSupp.numOfExprs = num;
pOperator->info = pInfo;
pOperator->pTaskInfo = pTaskInfo;
pOperator->fpSet =
createOperatorFpSet(operatorDummyOpenFn, doFill, NULL, NULL, destroySFillOperatorInfo, NULL, NULL, NULL);
......@@ -4050,10 +4054,14 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
STimeWindowAggSupp twSup = {
.waterMark = pTableScanNode->watermark, .calTrigger = pTableScanNode->triggerType, .maxTs = INT64_MIN};
tsdbReaderT pDataReader = NULL;
if (pHandle->vnode) {
pDataReader = doCreateDataReader(pTableScanNode, pHandle, pTableListInfo, (uint64_t)queryId, taskId, pTagCond);
} else {
getTableList(pHandle->meta, pScanPhyNode, pTableListInfo, pTagCond);
if (pHandle) {
if (pHandle->vnode) {
pDataReader =
doCreateDataReader(pTableScanNode, pHandle, pTableListInfo, (uint64_t)queryId, taskId, pTagCond);
} else {
getTableList(pHandle->meta, pScanPhyNode, pTableListInfo, pTagCond);
}
}
if (pDataReader == NULL && terrno != 0) {
......@@ -4310,7 +4318,7 @@ SArray* extractColumnInfo(SNodeList* pNodeList) {
SColumn c = {0};
c.slotId = pNode->slotId;
c.colId = pNode->slotId;
c.type = pValNode->node.type;
c.type = pValNode->node.type;
c.bytes = pValNode->node.resType.bytes;
c.scale = pValNode->node.resType.scale;
c.precision = pValNode->node.resType.precision;
......@@ -4413,7 +4421,7 @@ int32_t decodeOperator(SOperatorInfo* ops, const char* result, int32_t length) {
ASSERT(length == *(int32_t*)result);
const char* data = result + sizeof(int32_t);
code = ops->fpSet.decodeResultRow(ops, (char*) data);
code = ops->fpSet.decodeResultRow(ops, (char*)data);
if (code != TDB_CODE_SUCCESS) {
return code;
}
......
......@@ -527,7 +527,7 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode,
goto _error;
}
//taosSsleep(20);
// taosSsleep(20);
SDataBlockDescNode* pDescNode = pTableScanNode->scan.node.pOutputDataBlockDesc;
......@@ -740,9 +740,8 @@ static bool prepareDataScan(SStreamBlockScanInfo* pInfo) {
SStreamAggSupporter* pAggSup = pInfo->sessionSup.pStreamAggSup;
int64_t gap = pInfo->sessionSup.gap;
int32_t winIndex = 0;
SResultWindowInfo* pCurWin =
getSessionTimeWindow(pAggSup, tsCols[pInfo->updateResIndex], INT64_MIN,
pSDB->info.groupId, gap, &winIndex);
SResultWindowInfo* pCurWin =
getSessionTimeWindow(pAggSup, tsCols[pInfo->updateResIndex], INT64_MIN, pSDB->info.groupId, gap, &winIndex);
win = pCurWin->win;
pInfo->updateResIndex +=
updateSessionWindowInfo(pCurWin, tsCols, NULL, pSDB->info.rows, pInfo->updateResIndex, gap, NULL);
......@@ -823,23 +822,23 @@ static SSDataBlock* doDataScan(SStreamBlockScanInfo* pInfo) {
if (!pResult) {
return NULL;
}
if (pResult->info.groupId == pInfo->groupId) {
return pResult;
}
}
/* Todo(liuyao) for partition by column
SSDataBlock* pBlock = createOneDataBlock(pResult, true);
blockDataCleanup(pResult);
for (int32_t i = 0; i < pBlock->info.rows; i++) {
uint64_t id = getGroupId(pInfo->pOperatorDumy, pBlock, i);
if (id == pInfo->groupId) {
copyOneRow(pResult, pBlock, i);
/* Todo(liuyao) for partition by column
SSDataBlock* pBlock = createOneDataBlock(pResult, true);
blockDataCleanup(pResult);
for (int32_t i = 0; i < pBlock->info.rows; i++) {
uint64_t id = getGroupId(pInfo->pOperatorDumy, pBlock, i);
if (id == pInfo->groupId) {
copyOneRow(pResult, pBlock, i);
}
}
}
return pResult;
*/
return pResult;
*/
}
static void setUpdateData(SStreamBlockScanInfo* pInfo, SSDataBlock* pBlock, SSDataBlock* pUpdateBlock) {
......@@ -854,7 +853,7 @@ static void setUpdateData(SStreamBlockScanInfo* pInfo, SSDataBlock* pBlock, SSDa
int32_t rowId = *(int32_t*)taosArrayGet(pInfo->tsArray, pInfo->tsArrayIndex);
pInfo->groupId = getGroupId(pInfo->pOperatorDumy, pBlock, rowId);
int32_t i = 0;
for ( ; i < size; i++) {
for (; i < size; i++) {
rowId = *(int32_t*)taosArrayGet(pInfo->tsArray, i + pInfo->tsArrayIndex);
uint64_t id = getGroupId(pInfo->pOperatorDumy, pBlock, rowId);
if (pInfo->groupId != id) {
......@@ -1084,9 +1083,6 @@ SOperatorInfo* createStreamScanOperatorInfo(void* pDataReader, SReadHandle* pHan
SScanPhysiNode* pScanPhyNode = &pTableScanNode->scan;
SDataBlockDescNode* pDescNode = pScanPhyNode->node.pOutputDataBlockDesc;
SOperatorInfo* pTableScanDummy = createTableScanOperatorInfo(pTableScanNode, pDataReader, pHandle, pTaskInfo);
STableScanInfo* pSTInfo = (STableScanInfo*)pTableScanDummy->info;
int32_t numOfCols = 0;
pInfo->pColMatchInfo = extractColMatchInfo(pScanPhyNode->pScanCols, pDescNode, &numOfCols, COL_MATCH_FROM_COL_ID);
......@@ -1103,16 +1099,6 @@ SOperatorInfo* createStreamScanOperatorInfo(void* pDataReader, SReadHandle* pHan
}
}
// set the extract column id to streamHandle
tqReadHandleSetColIdList((STqReadHandle*)pHandle->reader, pColIds);
SArray* tableIdList = extractTableIdList(&pTaskInfo->tableqinfoList);
int32_t code = tqReadHandleSetTbUidList(pHandle->reader, tableIdList);
if (code != 0) {
taosArrayDestroy(tableIdList);
goto _error;
}
taosArrayDestroy(tableIdList);
pInfo->pBlockLists = taosArrayInit(4, POINTER_BYTES);
if (pInfo->pBlockLists == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
......@@ -1124,10 +1110,31 @@ SOperatorInfo* createStreamScanOperatorInfo(void* pDataReader, SReadHandle* pHan
goto _error;
}
if (pSTInfo->interval.interval > 0 && pDataReader) {
pInfo->pUpdateInfo = updateInfoInitP(&pSTInfo->interval, pTwSup->waterMark);
} else {
pInfo->pUpdateInfo = NULL;
if (pHandle) {
SOperatorInfo* pTableScanDummy = createTableScanOperatorInfo(pTableScanNode, pDataReader, pHandle, pTaskInfo);
STableScanInfo* pSTInfo = (STableScanInfo*)pTableScanDummy->info;
if (pSTInfo->interval.interval > 0) {
pInfo->pUpdateInfo = updateInfoInitP(&pSTInfo->interval, pTwSup->waterMark);
} else {
pInfo->pUpdateInfo = NULL;
}
pInfo->pOperatorDumy = pTableScanDummy;
pInfo->interval = pSTInfo->interval;
pInfo->readHandle = *pHandle;
ASSERT(pHandle->reader);
pInfo->streamBlockReader = pHandle->reader;
pInfo->tableUid = pScanPhyNode->uid;
// set the extract column id to streamHandle
tqReadHandleSetColIdList((STqReadHandle*)pHandle->reader, pColIds);
SArray* tableIdList = extractTableIdList(&pTaskInfo->tableqinfoList);
int32_t code = tqReadHandleSetTbUidList(pHandle->reader, tableIdList);
if (code != 0) {
taosArrayDestroy(tableIdList);
goto _error;
}
taosArrayDestroy(tableIdList);
}
// create the pseduo columns info
......@@ -1135,19 +1142,14 @@ SOperatorInfo* createStreamScanOperatorInfo(void* pDataReader, SReadHandle* pHan
pInfo->pPseudoExpr = createExprInfo(pTableScanNode->scan.pScanPseudoCols, NULL, &pInfo->numOfPseudoExpr);
}
pInfo->readHandle = *pHandle;
pInfo->tableUid = pScanPhyNode->uid;
pInfo->streamBlockReader = pHandle->reader;
pInfo->pRes = createResDataBlock(pDescNode);
pInfo->pUpdateRes = createResDataBlock(pDescNode);
pInfo->pCondition = pScanPhyNode->node.pConditions;
pInfo->pDataReader = pDataReader;
pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
pInfo->pOperatorDumy = pTableScanDummy;
pInfo->interval = pSTInfo->interval;
pInfo->sessionSup = (SessionWindowSupporter){.pStreamAggSup = NULL, .gap = -1};
pInfo->groupId = 0;
pOperator->name = "StreamBlockScanOperator";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN;
pOperator->blocking = false;
......@@ -1426,15 +1428,15 @@ static SSDataBlock* doSysTableScan(SOperatorInfo* pOperator) {
// table comment
pColInfoData = taosArrayGet(p->pDataBlock, 8);
if(pInfo->pCur->mr.me.ctbEntry.commentLen > 0) {
if (pInfo->pCur->mr.me.ctbEntry.commentLen > 0) {
char comment[TSDB_TB_COMMENT_LEN + VARSTR_HEADER_SIZE] = {0};
STR_TO_VARSTR(comment, pInfo->pCur->mr.me.ctbEntry.comment);
colDataAppend(pColInfoData, numOfRows, comment, false);
}else if(pInfo->pCur->mr.me.ctbEntry.commentLen == 0) {
} else if (pInfo->pCur->mr.me.ctbEntry.commentLen == 0) {
char comment[VARSTR_HEADER_SIZE + VARSTR_HEADER_SIZE] = {0};
STR_TO_VARSTR(comment, "");
colDataAppend(pColInfoData, numOfRows, comment, false);
}else{
} else {
colDataAppendNULL(pColInfoData, numOfRows);
}
......@@ -1462,15 +1464,15 @@ static SSDataBlock* doSysTableScan(SOperatorInfo* pOperator) {
// table comment
pColInfoData = taosArrayGet(p->pDataBlock, 8);
if(pInfo->pCur->mr.me.ntbEntry.commentLen > 0) {
if (pInfo->pCur->mr.me.ntbEntry.commentLen > 0) {
char comment[TSDB_TB_COMMENT_LEN + VARSTR_HEADER_SIZE] = {0};
STR_TO_VARSTR(comment, pInfo->pCur->mr.me.ntbEntry.comment);
colDataAppend(pColInfoData, numOfRows, comment, false);
}else if(pInfo->pCur->mr.me.ntbEntry.commentLen == 0) {
} else if (pInfo->pCur->mr.me.ntbEntry.commentLen == 0) {
char comment[VARSTR_HEADER_SIZE + VARSTR_HEADER_SIZE] = {0};
STR_TO_VARSTR(comment, "");
colDataAppend(pColInfoData, numOfRows, comment, false);
}else{
} else {
colDataAppendNULL(pColInfoData, numOfRows);
}
......@@ -1570,7 +1572,7 @@ static SSDataBlock* doSysTableScan(SOperatorInfo* pOperator) {
}
extractDataBlockFromFetchRsp(pInfo->pRes, &pInfo->loadInfo, pRsp->numOfRows, pRsp->data, pRsp->compLen,
pOperator->exprSupp.numOfExprs, startTs, NULL, pInfo->scanCols);
pOperator->exprSupp.numOfExprs, startTs, NULL, pInfo->scanCols);
// todo log the filter info
doFilterResult(pInfo);
......@@ -1867,7 +1869,8 @@ SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysi
int32_t num = 0;
int32_t numOfExprs = 0;
SExprInfo* pExprInfo = createExprInfo(pPhyNode->pScanPseudoCols, NULL, &numOfExprs);
SArray* colList = extractColMatchInfo(pPhyNode->pScanPseudoCols, pDescNode, &num, COL_MATCH_FROM_COL_ID);
SArray* colList = extractColMatchInfo(pPhyNode->pScanPseudoCols, pDescNode, &num, COL_MATCH_FROM_COL_ID);
initExprSupp(&pOperator->exprSupp, pExprInfo, numOfExprs);
......@@ -1878,8 +1881,9 @@ SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysi
pInfo->curPos = 0;
pInfo->pFilterNode = pPhyNode->node.pConditions;
pOperator->name = "TagScanOperator";
pOperator->name = "TagScanOperator";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN;
pOperator->blocking = false;
pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo;
......@@ -1952,8 +1956,7 @@ typedef struct STableMergeScanInfo {
int32_t createMultipleDataReaders(STableScanPhysiNode* pTableScanNode, SReadHandle* pHandle,
STableListInfo* pTableListInfo, SArray* arrayReader, uint64_t queryId,
uint64_t taskId, SNode* pTagCond) {
int32_t code =
getTableList(pHandle->meta, &pTableScanNode->scan, pTableListInfo, pTagCond);
int32_t code = getTableList(pHandle->meta, &pTableScanNode->scan, pTableListInfo, pTagCond);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
......@@ -1990,7 +1993,7 @@ _error:
static int32_t loadDataBlockFromOneTable(SOperatorInfo* pOperator, STableMergeScanInfo* pTableScanInfo,
int32_t readerIdx, SSDataBlock* pBlock, uint32_t* status) {
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
STableMergeScanInfo* pInfo = pOperator->info;
SFileBlockLoadRecorder* pCost = &pTableScanInfo->readRecorder;
......@@ -2177,9 +2180,8 @@ int32_t doOpenTableMergeScanOperator(SOperatorInfo* pOperator) {
int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize;
pInfo->pSortHandle =
tsortCreateSortHandle(pInfo->pSortInfo, SORT_MULTISOURCE_MERGE, pInfo->bufPageSize,
numOfBufPage, pInfo->pSortInputBlock, pTaskInfo->id.str);
pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, SORT_MULTISOURCE_MERGE, pInfo->bufPageSize, numOfBufPage,
pInfo->pSortInputBlock, pTaskInfo->id.str);
tsortSetFetchRawDataFp(pInfo->pSortHandle, getTableDataBlock, NULL, NULL);
......@@ -2243,8 +2245,7 @@ SSDataBlock* doTableMergeScan(SOperatorInfo* pOperator) {
longjmp(pTaskInfo->env, code);
}
SSDataBlock* pBlock =
getSortedTableMergeScanBlockData(pInfo->pSortHandle, pOperator->resultInfo.capacity, pOperator);
SSDataBlock* pBlock = getSortedTableMergeScanBlockData(pInfo->pSortHandle, pOperator->resultInfo.capacity, pOperator);
if (pBlock != NULL) {
pOperator->resultInfo.totalRows += pBlock->info.rows;
......@@ -2277,20 +2278,20 @@ void destroyTableMergeScanOperatorInfo(void* param, int32_t numOfOutput) {
typedef struct STableMergeScanExecInfo {
SFileBlockLoadRecorder blockRecorder;
SSortExecInfo sortExecInfo;
SSortExecInfo sortExecInfo;
} STableMergeScanExecInfo;
int32_t getTableMergeScanExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExplain, uint32_t* len) {
ASSERT(pOptr != NULL);
// TODO: merge these two info into one struct
STableMergeScanExecInfo* execInfo = taosMemoryCalloc(1, sizeof(STableMergeScanExecInfo));
STableMergeScanInfo* pInfo = pOptr->info;
STableMergeScanInfo* pInfo = pOptr->info;
execInfo->blockRecorder = pInfo->readRecorder;
execInfo->sortExecInfo = tsortGetSortExecInfo(pInfo->pSortHandle);
*pOptrExplain = execInfo;
*len = sizeof(STableMergeScanExecInfo);
return TSDB_CODE_SUCCESS;
}
......@@ -2305,8 +2306,7 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN
SDataBlockDescNode* pDescNode = pTableScanNode->scan.node.pOutputDataBlockDesc;
int32_t numOfCols = 0;
SArray* pColList =
extractColMatchInfo(pTableScanNode->scan.pScanCols, pDescNode, &numOfCols, COL_MATCH_FROM_COL_ID);
SArray* pColList = extractColMatchInfo(pTableScanNode->scan.pScanCols, pDescNode, &numOfCols, COL_MATCH_FROM_COL_ID);
int32_t code = initQueryTableDataCond(&pInfo->cond, pTableScanNode);
if (code != TSDB_CODE_SUCCESS) {
......@@ -2320,16 +2320,16 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN
pInfo->scanInfo = (SScanInfo){.numOfAsc = pTableScanNode->scanSeq[0], .numOfDesc = pTableScanNode->scanSeq[1]};
pInfo->readHandle = *readHandle;
pInfo->interval = extractIntervalInfo(pTableScanNode);
pInfo->readHandle = *readHandle;
pInfo->interval = extractIntervalInfo(pTableScanNode);
pInfo->sample.sampleRatio = pTableScanNode->ratio;
pInfo->sample.seed = taosGetTimestampSec();
pInfo->dataBlockLoadFlag = pTableScanNode->dataRequired;
pInfo->pFilterNode = pTableScanNode->scan.node.pConditions;
pInfo->dataReaders = dataReaders;
pInfo->scanFlag = MAIN_SCAN;
pInfo->pColMatchInfo = pColList;
pInfo->curTWinIdx = 0;
pInfo->sample.seed = taosGetTimestampSec();
pInfo->dataBlockLoadFlag = pTableScanNode->dataRequired;
pInfo->pFilterNode = pTableScanNode->scan.node.pConditions;
pInfo->dataReaders = dataReaders;
pInfo->scanFlag = MAIN_SCAN;
pInfo->pColMatchInfo = pColList;
pInfo->curTWinIdx = 0;
pInfo->pResBlock = createResDataBlock(pDescNode);
......@@ -2347,22 +2347,22 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN
pInfo->pSortInputBlock = createOneDataBlock(pInfo->pResBlock, false);
int32_t rowSize = pInfo->pResBlock->info.rowSize;
pInfo->bufPageSize = getProperSortPageSize(rowSize);
pInfo->bufPageSize = getProperSortPageSize(rowSize);
// todo the total available buffer should be determined by total capacity of buffer of this task.
// the additional one is reserved for merge result
pInfo->sortBufSize = pInfo->bufPageSize * (taosArrayGetSize(dataReaders) + 1);
pInfo->hasGroupId = false;
pInfo->sortBufSize = pInfo->bufPageSize * (taosArrayGetSize(dataReaders) + 1);
pInfo->hasGroupId = false;
pInfo->prefetchedTuple = NULL;
pOperator->name = "TableMergeScanOperator";
pOperator->name = "TableMergeScanOperator";
// TODO : change it
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN;
pOperator->blocking = false;
pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo;
pOperator->exprSupp.numOfExprs = numOfCols;
pOperator->pTaskInfo = pTaskInfo;
pOperator->blocking = false;
pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo;
pOperator->exprSupp.numOfExprs = numOfCols;
pOperator->pTaskInfo = pTaskInfo;
initResultSizeInfo(pOperator, 1024);
pOperator->fpSet =
......
......@@ -36,11 +36,11 @@ int32_t tEncodeSStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) {
if (tEncodeI32(pEncoder, pTask->taskId) < 0) return -1;
if (tEncodeI8(pEncoder, pTask->inputType) < 0) return -1;
if (tEncodeI8(pEncoder, pTask->status) < 0) return -1;
if (tEncodeI8(pEncoder, pTask->sourceType) < 0) return -1;
if (tEncodeI8(pEncoder, pTask->execType) < 0) return -1;
if (tEncodeI8(pEncoder, pTask->sinkType) < 0) return -1;
if (tEncodeI8(pEncoder, pTask->dispatchType) < 0) return -1;
if (tEncodeI16(pEncoder, pTask->dispatchMsgType) < 0) return -1;
if (tEncodeI8(pEncoder, pTask->dataScan) < 0) return -1;
if (tEncodeI32(pEncoder, pTask->childId) < 0) return -1;
if (tEncodeI32(pEncoder, pTask->nodeId) < 0) return -1;
......@@ -84,11 +84,11 @@ int32_t tDecodeSStreamTask(SDecoder* pDecoder, SStreamTask* pTask) {
if (tDecodeI32(pDecoder, &pTask->taskId) < 0) return -1;
if (tDecodeI8(pDecoder, &pTask->inputType) < 0) return -1;
if (tDecodeI8(pDecoder, &pTask->status) < 0) return -1;
if (tDecodeI8(pDecoder, &pTask->sourceType) < 0) return -1;
if (tDecodeI8(pDecoder, &pTask->execType) < 0) return -1;
if (tDecodeI8(pDecoder, &pTask->sinkType) < 0) return -1;
if (tDecodeI8(pDecoder, &pTask->dispatchType) < 0) return -1;
if (tDecodeI16(pDecoder, &pTask->dispatchMsgType) < 0) return -1;
if (tDecodeI8(pDecoder, &pTask->dataScan) < 0) return -1;
if (tDecodeI32(pDecoder, &pTask->childId) < 0) return -1;
if (tDecodeI32(pDecoder, &pTask->nodeId) < 0) return -1;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册