提交 baa48489 编写于 作者: L Liu Jicong

merge from 3.0

上级 db23b07a
......@@ -209,6 +209,7 @@ enum {
TD_DEF_MSG_TYPE(TDMT_SCH_QUERY_CONTINUE, "query-continue", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_SCH_QUERY_HEARTBEAT, "query-heartbeat", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_SCH_FETCH, "fetch", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_SCH_MERGE_FETCH, "merge-fetch", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_SCH_CANCEL_TASK, "cancel-task", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_SCH_DROP_TASK, "drop-task", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_SCH_EXPLAIN, "explain", NULL, NULL)
......
......@@ -353,6 +353,7 @@ typedef struct SDownstreamSourceNode {
uint64_t taskId;
uint64_t schedId;
int32_t execId;
int32_t fetchMsgType;
} SDownstreamSourceNode;
typedef struct SExchangePhysiNode {
......
......@@ -102,7 +102,7 @@ void closeTransporter(SAppInstInfo *pAppInfo) {
static bool clientRpcRfp(int32_t code, tmsg_t msgType) {
if (NEED_REDIRECT_ERROR(code)) {
if (msgType == TDMT_SCH_QUERY || msgType == TDMT_SCH_MERGE_QUERY || msgType == TDMT_SCH_FETCH) {
if (msgType == TDMT_SCH_QUERY || msgType == TDMT_SCH_MERGE_QUERY || msgType == TDMT_SCH_FETCH || msgType == TDMT_SCH_MERGE_FETCH) {
return false;
}
return true;
......
......@@ -214,6 +214,7 @@ SArray *mmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_SCH_QUERY_CONTINUE, mmPutMsgToQueryQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SCH_QUERY_HEARTBEAT, mmPutMsgToFetchQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SCH_FETCH, mmPutMsgToFetchQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SCH_MERGE_FETCH, mmPutMsgToFetchQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_CREATE_STB_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_STB_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_STB_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
......
......@@ -111,6 +111,7 @@ SArray *qmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_SCH_MERGE_QUERY, qmPutNodeMsgToQueryQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SCH_QUERY_CONTINUE, qmPutNodeMsgToQueryQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SCH_FETCH, qmPutNodeMsgToFetchQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SCH_MERGE_FETCH, qmPutNodeMsgToFetchQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SCH_FETCH_RSP, qmPutNodeMsgToFetchQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SCH_QUERY_HEARTBEAT, qmPutNodeMsgToFetchQueue, 1) == NULL) goto _OVER;
......
......@@ -328,6 +328,7 @@ SArray *vmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_SCH_MERGE_QUERY, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SCH_QUERY_CONTINUE, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SCH_FETCH, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SCH_MERGE_FETCH, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_TABLE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_UPDATE_TAG_VAL, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_TABLE_META, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
......
......@@ -238,9 +238,9 @@ int32_t vmGetQueueSize(SVnodeMgmt *pMgmt, int32_t vgId, EQueueType qtype) {
}
int32_t vmAllocQueue(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) {
pVnode->pWriteQ = tWWorkerAllocQueue(&pMgmt->writePool, pVnode->pImpl, (FItems)vnodeProposeMsg);
pVnode->pWriteQ = tWWorkerAllocQueue(&pMgmt->writePool, pVnode->pImpl, (FItems)vnodeProposeWriteMsg);
pVnode->pSyncQ = tWWorkerAllocQueue(&pMgmt->syncPool, pVnode, (FItems)vmProcessSyncQueue);
pVnode->pApplyQ = tWWorkerAllocQueue(&pMgmt->applyPool, pVnode->pImpl, (FItems)vnodeApplyMsg);
pVnode->pApplyQ = tWWorkerAllocQueue(&pMgmt->applyPool, pVnode->pImpl, (FItems)vnodeApplyWriteMsg);
pVnode->pQueryQ = tQWorkerAllocQueue(&pMgmt->queryPool, pVnode, (FItem)vmProcessQueryQueue);
pVnode->pFetchQ = tQWorkerAllocQueue(&pMgmt->fetchPool, pVnode, (FItem)vmProcessFetchQueue);
......
......@@ -42,7 +42,7 @@ static inline void dmBuildMnodeRedirectRsp(SDnode *pDnode, SRpcMsg *pMsg) {
static inline void dmSendRedirectRsp(SRpcMsg *pMsg, const SEpSet *pNewEpSet) {
pMsg->info.hasEpSet = 1;
SRpcMsg rsp = {.code = TSDB_CODE_RPC_REDIRECT, .info = pMsg->info};
SRpcMsg rsp = {.code = TSDB_CODE_RPC_REDIRECT, .info = pMsg->info, .msgType = pMsg->msgType};
int32_t contLen = tSerializeSEpSet(NULL, 0, pNewEpSet);
rsp.pCont = rpcMallocCont(contLen);
......@@ -88,6 +88,7 @@ static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) {
case TDMT_MND_SYSTABLE_RETRIEVE_RSP:
case TDMT_DND_SYSTABLE_RETRIEVE_RSP:
case TDMT_SCH_FETCH_RSP:
case TDMT_SCH_MERGE_FETCH_RSP:
qWorkerProcessFetchRsp(NULL, NULL, pRpc, 0);
return;
case TDMT_MND_STATUS_RSP:
......@@ -253,7 +254,7 @@ static inline void dmReleaseHandle(SRpcHandleInfo *pHandle, int8_t type) {
static bool rpcRfp(int32_t code, tmsg_t msgType) {
if (code == TSDB_CODE_RPC_REDIRECT || code == TSDB_CODE_RPC_NETWORK_UNAVAIL || code == TSDB_CODE_NODE_NOT_DEPLOYED ||
code == TSDB_CODE_SYN_NOT_LEADER || code == TSDB_CODE_APP_NOT_READY || code == TSDB_CODE_RPC_BROKEN_LINK) {
if (msgType == TDMT_SCH_QUERY || msgType == TDMT_SCH_MERGE_QUERY || msgType == TDMT_SCH_FETCH) {
if (msgType == TDMT_SCH_QUERY || msgType == TDMT_SCH_MERGE_QUERY || msgType == TDMT_SCH_FETCH || msgType == TDMT_SCH_MERGE_FETCH) {
return false;
}
return true;
......
......@@ -531,7 +531,8 @@ static int32_t mndCheckMnodeState(SRpcMsg *pMsg) {
if (!IsReq(pMsg)) return 0;
if (pMsg->msgType == TDMT_SCH_QUERY || pMsg->msgType == TDMT_SCH_MERGE_QUERY ||
pMsg->msgType == TDMT_SCH_QUERY_CONTINUE || pMsg->msgType == TDMT_SCH_QUERY_HEARTBEAT ||
pMsg->msgType == TDMT_SCH_FETCH || pMsg->msgType == TDMT_SCH_DROP_TASK) {
pMsg->msgType == TDMT_SCH_FETCH || pMsg->msgType == TDMT_SCH_MERGE_FETCH ||
pMsg->msgType == TDMT_SCH_DROP_TASK) {
return 0;
}
if (mndAcquireRpcRef(pMsg->info.node) == 0) return 0;
......
......@@ -45,6 +45,7 @@ int32_t mndProcessQueryMsg(SRpcMsg *pMsg) {
code = qWorkerProcessCQueryMsg(&handle, pMnode->pQuery, pMsg, 0);
break;
case TDMT_SCH_FETCH:
case TDMT_SCH_MERGE_FETCH:
code = qWorkerProcessFetchMsg(pMnode, pMnode->pQuery, pMsg, 0);
break;
case TDMT_SCH_DROP_TASK:
......@@ -72,6 +73,7 @@ int32_t mndInitQuery(SMnode *pMnode) {
mndSetMsgHandle(pMnode, TDMT_SCH_MERGE_QUERY, mndProcessQueryMsg);
mndSetMsgHandle(pMnode, TDMT_SCH_QUERY_CONTINUE, mndProcessQueryMsg);
mndSetMsgHandle(pMnode, TDMT_SCH_FETCH, mndProcessQueryMsg);
mndSetMsgHandle(pMnode, TDMT_SCH_MERGE_FETCH, mndProcessQueryMsg);
mndSetMsgHandle(pMnode, TDMT_SCH_DROP_TASK, mndProcessQueryMsg);
mndSetMsgHandle(pMnode, TDMT_SCH_QUERY_HEARTBEAT, mndProcessQueryMsg);
......
......@@ -86,6 +86,7 @@ int32_t qndProcessQueryMsg(SQnode *pQnode, int64_t ts, SRpcMsg *pMsg) {
code = qWorkerProcessCQueryMsg(&handle, pQnode->pQuery, pMsg, ts);
break;
case TDMT_SCH_FETCH:
case TDMT_SCH_MERGE_FETCH:
code = qWorkerProcessFetchMsg(pQnode, pQnode->pQuery, pMsg, ts);
break;
case TDMT_SCH_FETCH_RSP:
......
......@@ -51,15 +51,7 @@ int32_t vnodeCreate(const char *path, SVnodeCfg *pCfg, STfs *pTfs);
void vnodeDestroy(const char *path, STfs *pTfs);
SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb);
void vnodeClose(SVnode *pVnode);
int32_t vnodePreProcessReq(SVnode *pVnode, SRpcMsg *pMsg);
int32_t vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRpcMsg *pRsp);
int32_t vnodeProcessCMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp);
int32_t vnodeProcessSyncMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp);
int32_t vnodePreprocessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg);
int32_t vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg);
int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo);
int32_t vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad);
int32_t vnodeValidateTableHash(SVnode *pVnode, char *tableFName);
int32_t vnodeStart(SVnode *pVnode);
void vnodeStop(SVnode *pVnode);
int64_t vnodeGetSyncHandle(SVnode *pVnode);
......@@ -68,14 +60,25 @@ void vnodeGetInfo(SVnode *pVnode, const char **dbname, int32_t *vgId);
int32_t vnodeSnapshotReaderOpen(SVnode *pVnode, SVSnapshotReader **ppReader, int64_t sver, int64_t ever);
int32_t vnodeSnapshotReaderClose(SVSnapshotReader *pReader);
int32_t vnodeSnapshotRead(SVSnapshotReader *pReader, const void **ppData, uint32_t *nData);
int32_t vnodeProcessCreateTSma(SVnode *pVnode, void *pCont, uint32_t contLen);
int32_t vnodeGetAllTableList(SVnode *pVnode, uint64_t uid, SArray *list);
int32_t vnodeGetCtbIdList(SVnode *pVnode, int64_t suid, SArray *list);
void *vnodeGetIdx(SVnode *pVnode);
void *vnodeGetIvtIdx(SVnode *pVnode);
void vnodeProposeMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs);
void vnodeApplyMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs);
int32_t vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad);
int32_t vnodeValidateTableHash(SVnode *pVnode, char *tableFName);
int32_t vnodePreProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg);
int32_t vnodePreprocessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg);
int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRpcMsg *pRsp);
int32_t vnodeProcessSyncMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp);
int32_t vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg);
int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo);
void vnodeProposeWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs);
void vnodeApplyWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs);
// meta
typedef struct SMeta SMeta; // todo: remove
......
......@@ -94,6 +94,8 @@ int32_t vnodeAsyncCommit(SVnode* pVnode);
int32_t vnodeSyncOpen(SVnode* pVnode, char* path);
void vnodeSyncStart(SVnode* pVnode);
void vnodeSyncClose(SVnode* pVnode);
void vnodeRedirectRpcMsg(SVnode* pVnode, SRpcMsg* pMsg);
bool vnodeIsLeader(SVnode* pVnode);
#ifdef __cplusplus
}
......
......@@ -28,7 +28,7 @@ static int32_t vnodeProcessAlterHasnRangeReq(SVnode *pVnode, int64_t version, vo
static int32_t vnodeProcessDropTtlTbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodeProcessDeleteReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
int32_t vnodePreProcessReq(SVnode *pVnode, SRpcMsg *pMsg) {
int32_t vnodePreProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg) {
int32_t code = 0;
SDecoder dc = {0};
......@@ -133,7 +133,7 @@ _err:
return code;
}
int32_t vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRpcMsg *pRsp) {
int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRpcMsg *pRsp) {
void *ptr = NULL;
void *pReq;
int32_t len;
......@@ -261,6 +261,11 @@ int32_t vnodePreprocessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) {
int32_t vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) {
vTrace("message in vnode query queue is processing");
if ((pMsg->msgType == TDMT_SCH_QUERY) && !vnodeIsLeader(pVnode)) {
vnodeRedirectRpcMsg(pVnode, pMsg);
return 0;
}
SReadHandle handle = {.meta = pVnode->pMeta, .config = &pVnode->config, .vnode = pVnode, .pMsgCb = &pVnode->msgCb};
switch (pMsg->msgType) {
case TDMT_SCH_QUERY:
......@@ -276,11 +281,18 @@ int32_t vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) {
int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) {
vTrace("message in fetch queue is processing");
if ((pMsg->msgType == TDMT_SCH_FETCH || pMsg->msgType == TDMT_VND_TABLE_META || pMsg->msgType == TDMT_VND_TABLE_CFG)
&& !vnodeIsLeader(pVnode)) {
vnodeRedirectRpcMsg(pVnode, pMsg);
return 0;
}
char *msgstr = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
int32_t msgLen = pMsg->contLen - sizeof(SMsgHead);
switch (pMsg->msgType) {
case TDMT_SCH_FETCH:
case TDMT_SCH_MERGE_FETCH:
return qWorkerProcessFetchMsg(pVnode, pVnode->pQuery, pMsg, 0);
case TDMT_SCH_FETCH_RSP:
return qWorkerProcessFetchRsp(pVnode, pVnode->pQuery, pMsg, 0);
......
......@@ -120,7 +120,24 @@ static int32_t vnodeProcessAlterReplicaReq(SVnode *pVnode, SRpcMsg *pMsg) {
return code;
}
void vnodeProposeMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
void vnodeRedirectRpcMsg(SVnode *pVnode, SRpcMsg *pMsg) {
SEpSet newEpSet = {0};
syncGetRetryEpSet(pVnode->sync, &newEpSet);
const STraceId *trace = &pMsg->info.traceId;
vGTrace("vgId:%d, msg:%p is redirect since not leader, numOfEps:%d inUse:%d", pVnode->config.vgId, pMsg,
newEpSet.numOfEps, newEpSet.inUse);
for (int32_t i = 0; i < newEpSet.numOfEps; ++i) {
vGTrace("vgId:%d, msg:%p redirect:%d ep:%s:%u", pVnode->config.vgId, pMsg, i, newEpSet.eps[i].fqdn,
newEpSet.eps[i].port);
}
pMsg->info.hasEpSet = 1;
SRpcMsg rsp = {.code = TSDB_CODE_RPC_REDIRECT, .info = pMsg->info, .msgType = pMsg->msgType + 1};
tmsgSendRedirectRsp(&rsp, &newEpSet);
}
void vnodeProposeWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
SVnode *pVnode = pInfo->ahandle;
int32_t vgId = pVnode->config.vgId;
int32_t code = 0;
......@@ -131,7 +148,7 @@ void vnodeProposeMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
const STraceId *trace = &pMsg->info.traceId;
vGTrace("vgId:%d, msg:%p get from vnode-write queue handle:%p", vgId, pMsg, pMsg->info.handle);
code = vnodePreProcessReq(pVnode, pMsg);
code = vnodePreProcessWriteMsg(pVnode, pMsg);
if (code != 0) {
vError("vgId:%d, msg:%p failed to pre-process since %s", vgId, pMsg, terrstr());
} else {
......@@ -141,7 +158,7 @@ void vnodeProposeMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
code = syncPropose(pVnode->sync, pMsg, vnodeIsMsgWeak(pMsg->msgType));
if (code > 0) {
SRpcMsg rsp = {.code = pMsg->code, .info = pMsg->info};
if (vnodeProcessWriteReq(pVnode, pMsg, pMsg->info.conn.applyIndex, &rsp) < 0) {
if (vnodeProcessWriteMsg(pVnode, pMsg, pMsg->info.conn.applyIndex, &rsp) < 0) {
rsp.code = terrno;
vError("vgId:%d, msg:%p failed to apply right now since %s", vgId, pMsg, terrstr());
}
......@@ -156,16 +173,7 @@ void vnodeProposeMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
vnodeAccumBlockMsg(pVnode, pMsg->msgType);
} else if (code < 0) {
if (terrno == TSDB_CODE_SYN_NOT_LEADER) {
SEpSet newEpSet = {0};
syncGetRetryEpSet(pVnode->sync, &newEpSet);
vGTrace("vgId:%d, msg:%p is redirect since not leader, numOfEps:%d inUse:%d", vgId, pMsg, newEpSet.numOfEps,
newEpSet.inUse);
for (int32_t i = 0; i < newEpSet.numOfEps; ++i) {
vGTrace("vgId:%d, msg:%p redirect:%d ep:%s:%u", vgId, pMsg, i, newEpSet.eps[i].fqdn, newEpSet.eps[i].port);
}
pMsg->info.hasEpSet = 1;
SRpcMsg rsp = {.code = TSDB_CODE_RPC_REDIRECT, .info = pMsg->info};
tmsgSendRedirectRsp(&rsp, &newEpSet);
vnodeRedirectRpcMsg(pVnode, pMsg);
} else {
if (terrno != 0) code = terrno;
vError("vgId:%d, msg:%p failed to propose since %s, code:0x%x", vgId, pMsg, tstrerror(code), code);
......@@ -185,7 +193,7 @@ void vnodeProposeMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
vnodeWaitBlockMsg(pVnode);
}
void vnodeApplyMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
void vnodeApplyWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
SVnode *pVnode = pInfo->ahandle;
int32_t vgId = pVnode->config.vgId;
int32_t code = 0;
......@@ -199,7 +207,7 @@ void vnodeApplyMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
SRpcMsg rsp = {.code = pMsg->code, .info = pMsg->info};
if (rsp.code == 0) {
if (vnodeProcessWriteReq(pVnode, pMsg, pMsg->info.conn.applyIndex, &rsp) < 0) {
if (vnodeProcessWriteMsg(pVnode, pMsg, pMsg->info.conn.applyIndex, &rsp) < 0) {
rsp.code = terrno;
vError("vgId:%d, msg:%p failed to apply since %s", vgId, pMsg, terrstr());
}
......@@ -513,3 +521,17 @@ void vnodeSyncStart(SVnode *pVnode) {
}
void vnodeSyncClose(SVnode *pVnode) { syncStop(pVnode->sync); }
bool vnodeIsLeader(SVnode *pVnode) {
if (!syncIsReady(pVnode->sync)) {
return false;
}
// todo
// if (!pVnode->restored) {
// terrno = TSDB_CODE_APP_NOT_READY;
// return false;
// }
return true;
}
\ No newline at end of file
......@@ -319,8 +319,9 @@ typedef enum EStreamScanMode {
STREAM_SCAN_FROM_READERHANDLE = 1,
STREAM_SCAN_FROM_RES,
STREAM_SCAN_FROM_UPDATERES,
STREAM_SCAN_FROM_DATAREADER,
STREAM_SCAN_FROM_DATAREADER, // todo(liuyao) delete it
STREAM_SCAN_FROM_DATAREADER_RETRIEVE,
STREAM_SCAN_FROM_DATAREADER_RANGE,
} EStreamScanMode;
typedef struct SCatchSupporter {
......@@ -612,6 +613,7 @@ typedef struct SStreamSessionAggOperatorInfo {
SSDataBlock* pWinBlock; // window result
SqlFunctionCtx* pDummyCtx; // for combine
SSDataBlock* pDelRes; // delete result
bool returnDelete;
SSDataBlock* pUpdateRes; // update window
SHashObj* pStDeleted;
void* pDelIterator;
......@@ -889,6 +891,9 @@ int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, const char* pKey, SqlF
SResultRow* getNewResultRow(SDiskbasedBuf* pResultBuf, int64_t tableGroupId, int32_t interBufSize);
SResultWindowInfo* getSessionTimeWindow(SStreamAggSupporter* pAggSup, TSKEY startTs,
TSKEY endTs, uint64_t groupId, int64_t gap, int32_t* pIndex);
SResultWindowInfo* getCurSessionWindow(SStreamAggSupporter* pAggSup, TSKEY startTs,
TSKEY endTs, uint64_t groupId, int64_t gap, int32_t* pIndex);
bool isInTimeWindow(STimeWindow* pWin, TSKEY ts, int64_t gap);
int32_t updateSessionWindowInfo(SResultWindowInfo* pWinInfo, TSKEY* pStartTs,
TSKEY* pEndTs, int32_t rows, int32_t start, int64_t gap, SHashObj* pStDeleted);
bool functionNeedToExecute(SqlFunctionCtx* pCtx);
......
......@@ -2053,7 +2053,7 @@ static int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInf
pMsgSendInfo->param = pWrapper;
pMsgSendInfo->msgInfo.pData = pMsg;
pMsgSendInfo->msgInfo.len = sizeof(SResFetchReq);
pMsgSendInfo->msgType = TDMT_SCH_FETCH;
pMsgSendInfo->msgType = pSource->fetchMsgType;
pMsgSendInfo->fp = loadRemoteDataCallback;
int64_t transporterId = 0;
......
......@@ -792,13 +792,19 @@ static void doClearBufferedBlocks(SStreamScanInfo* pInfo) {
}
static bool isSessionWindow(SStreamScanInfo* pInfo) {
return pInfo->sessionSup.parentType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION;
return pInfo->sessionSup.parentType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION ||
pInfo->sessionSup.parentType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION;
}
static bool isStateWindow(SStreamScanInfo* pInfo) {
return pInfo->sessionSup.parentType == QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE;
}
static bool isIntervalWindow(SStreamScanInfo* pInfo) {
return pInfo->sessionSup.parentType == QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL ||
pInfo->sessionSup.parentType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL;
}
static uint64_t getGroupId(SOperatorInfo* pOperator, uint64_t uid) {
uint64_t* groupId = taosHashGet(pOperator->pTaskInfo->tableqinfoList.map, &uid, sizeof(int64_t));
if (groupId) {
......@@ -834,6 +840,49 @@ static void setGroupId(SStreamScanInfo* pInfo, SSDataBlock* pBlock, int32_t grou
}
}
void resetTableScanInfo(STableScanInfo* pTableScanInfo, STimeWindow* pWin) {
pTableScanInfo->cond.twindows[0] = *pWin;
pTableScanInfo->curTWinIdx = 0;
// tsdbResetReadHandle(pTableScanInfo->dataReader, &pTableScanInfo->cond, 0);
// if (!pTableScanInfo->dataReader) {
// return false;
// }
pTableScanInfo->scanTimes = 0;
pTableScanInfo->currentGroupId = -1;
}
static bool prepareRangeScan(SStreamScanInfo* pInfo, SSDataBlock* pBlock, int32_t* pRowIndex) {
if ((*pRowIndex) == pBlock->info.rows) {
return false;
}
ASSERT(taosArrayGetSize(pBlock->pDataBlock) >= 3);
SColumnInfoData* pStartTsCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX);
TSKEY* startData = (TSKEY*)pStartTsCol->pData;
SColumnInfoData* pEndTsCol = taosArrayGet(pBlock->pDataBlock, END_TS_COLUMN_INDEX);
TSKEY* endData = (TSKEY*)pEndTsCol->pData;
STimeWindow win = {.skey = startData[*pRowIndex], .ekey = endData[*pRowIndex]};
setGroupId(pInfo, pBlock, GROUPID_COLUMN_INDEX, *pRowIndex);
(*pRowIndex)++;
for (; *pRowIndex < pBlock->info.rows; (*pRowIndex)++) {
if (win.skey == startData[*pRowIndex]) {
win.ekey = TMAX(win.ekey, endData[*pRowIndex]);
continue;
}
if (win.skey == endData[*pRowIndex]) {
win.skey = TMIN(win.skey, startData[*pRowIndex]);
continue;
}
ASSERT((win.skey > startData[*pRowIndex] && win.ekey < endData[*pRowIndex]) ||
(isInTimeWindow(&win, startData[*pRowIndex], 0) || isInTimeWindow(&win, endData[*pRowIndex], 0)));
break;
}
resetTableScanInfo(pInfo->pTableScanOp->info, &win);
return true;
}
static bool prepareDataScan(SStreamScanInfo* pInfo, SSDataBlock* pSDB, int32_t tsColIndex, int32_t* pRowIndex) {
STimeWindow win = {
.skey = INT64_MIN,
......@@ -852,6 +901,7 @@ static bool prepareDataScan(SStreamScanInfo* pInfo, SSDataBlock* pSDB, int32_t t
SResultWindowInfo* pCurWin =
getSessionTimeWindow(pAggSup, tsCols[*pRowIndex], INT64_MIN, pSDB->info.groupId, gap, &winIndex);
win = pCurWin->win;
setGroupId(pInfo, pSDB, GROUPID_COLUMN_INDEX, *pRowIndex);
(*pRowIndex) += updateSessionWindowInfo(pCurWin, tsCols, NULL, pSDB->info.rows, *pRowIndex, gap, NULL);
} else {
win = getActiveTimeWindow(NULL, &dumyInfo, tsCols[*pRowIndex], &pInfo->interval, pInfo->interval.precision, NULL);
......@@ -875,15 +925,7 @@ static bool prepareDataScan(SStreamScanInfo* pInfo, SSDataBlock* pSDB, int32_t t
if (!needRead) {
return false;
}
STableScanInfo* pTableScanInfo = pInfo->pTableScanOp->info;
pTableScanInfo->cond.twindows[0] = win;
pTableScanInfo->curTWinIdx = 0;
// tsdbReaderReset(pTableScanInfo->dataReader, &pTableScanInfo->cond, 0);
// if (!pTableScanInfo->dataReader) {
// return false;
// }
pTableScanInfo->scanTimes = 0;
pTableScanInfo->currentGroupId = -1;
resetTableScanInfo(pInfo->pTableScanOp->info, &win);
return true;
}
......@@ -900,6 +942,26 @@ static void copyOneRow(SSDataBlock* dest, SSDataBlock* source, int32_t sourceRow
dest->info.rows++;
}
static SSDataBlock* doRangeScan(SStreamScanInfo* pInfo, SSDataBlock* pSDB, int32_t tsColIndex, int32_t* pRowIndex) {
while (1) {
SSDataBlock* pResult = NULL;
pResult = doTableScan(pInfo->pTableScanOp);
if (!pResult && prepareRangeScan(pInfo, pSDB, pRowIndex)) {
// scan next window data
pResult = doTableScan(pInfo->pTableScanOp);
}
if (!pResult) {
blockDataCleanup(pSDB);
*pRowIndex = 0;
return NULL;
}
if (pResult->info.groupId == pInfo->groupId) {
return pResult;
}
}
}
static SSDataBlock* doDataScan(SStreamScanInfo* pInfo, SSDataBlock* pSDB, int32_t tsColIndex, int32_t* pRowIndex) {
while (1) {
SSDataBlock* pResult = NULL;
......@@ -931,9 +993,8 @@ static SSDataBlock* doDataScan(SStreamScanInfo* pInfo, SSDataBlock* pSDB, int32_
return pResult;
*/
}
static void copyDeleteDataBlock(SStreamScanInfo* pInfo, SSDataBlock* pDelBlock, SOperatorInfo* pOperator,
SSDataBlock* pUpdateRes) {
static void generateIntervalTs(SStreamScanInfo* pInfo, SSDataBlock* pDelBlock, SOperatorInfo* pOperator,
SSDataBlock* pUpdateRes) {
if (pDelBlock->info.rows == 0) {
return;
}
......@@ -948,7 +1009,7 @@ static void copyDeleteDataBlock(SStreamScanInfo* pInfo, SSDataBlock* pDelBlock,
uint64_t* uidCol = (uint64_t*)pGpCol->pData;
SColumnInfoData* pDestTsCol = taosArrayGet(pUpdateRes->pDataBlock, START_TS_COLUMN_INDEX);
SColumnInfoData* pDestGpCol = taosArrayGet(pUpdateRes->pDataBlock, DELETE_GROUPID_COLUMN_INDEX);
SColumnInfoData* pDestGpCol = taosArrayGet(pUpdateRes->pDataBlock, GROUPID_COLUMN_INDEX);
for (int32_t i = pInfo->deleteDataIndex;
i < pDelBlock->info.rows &&
i < pDelBlock->info.capacity - (endData[i] - startData[i]) / pInfo->interval.interval - 1;
......@@ -969,6 +1030,43 @@ static void copyDeleteDataBlock(SStreamScanInfo* pInfo, SSDataBlock* pDelBlock,
}
}
static void generateScanRange(SStreamScanInfo* pInfo, SSDataBlock* pBlock, SOperatorInfo* pOperator,
SSDataBlock* pUpdateRes) {
if (pBlock->info.rows == 0) {
return;
}
blockDataCleanup(pUpdateRes);
blockDataEnsureCapacity(pUpdateRes, pBlock->info.rows);
ASSERT(taosArrayGetSize(pBlock->pDataBlock) >= 3);
SColumnInfoData* pStartTsCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX);
TSKEY* startData = (TSKEY*)pStartTsCol->pData;
SColumnInfoData* pEndTsCol = taosArrayGet(pBlock->pDataBlock, END_TS_COLUMN_INDEX);
TSKEY* endData = (TSKEY*)pEndTsCol->pData;
SColumnInfoData* pGpCol = taosArrayGet(pBlock->pDataBlock, UID_COLUMN_INDEX);
uint64_t* uidCol = (uint64_t*)pGpCol->pData;
SColumnInfoData* pDestStartCol = taosArrayGet(pUpdateRes->pDataBlock, START_TS_COLUMN_INDEX);
SColumnInfoData* pDestEndCol = taosArrayGet(pUpdateRes->pDataBlock, END_TS_COLUMN_INDEX);
SColumnInfoData* pDestGpCol = taosArrayGet(pUpdateRes->pDataBlock, GROUPID_COLUMN_INDEX);
int32_t dummy = 0;
for (int32_t i = 0; i < pBlock->info.rows; i++) {
uint64_t groupId = getGroupId(pOperator, uidCol[i]);
// gap must be 0.
SResultWindowInfo* pStartWin =
getCurSessionWindow(pInfo->sessionSup.pStreamAggSup, startData[i], endData[i], groupId, 0, &dummy);
if (!pStartWin) {
// window has been closed.
continue;
}
SResultWindowInfo* pEndWin =
getCurSessionWindow(pInfo->sessionSup.pStreamAggSup, endData[i], endData[i], groupId, 0, &dummy);
ASSERT(pEndWin);
colDataAppend(pDestStartCol, i, (const char*)&pStartWin->win.skey, false);
colDataAppend(pDestEndCol, i, (const char*)&pEndWin->win.ekey, false);
colDataAppend(pDestGpCol, i, (const char*)&groupId, false);
pUpdateRes->info.rows++;
}
}
static void setUpdateData(SStreamScanInfo* pInfo, SSDataBlock* pBlock, SSDataBlock* pUpdateBlock) {
blockDataCleanup(pUpdateBlock);
int32_t size = taosArrayGetSize(pInfo->tsArray);
......@@ -1001,7 +1099,7 @@ static void setUpdateData(SStreamScanInfo* pInfo, SSDataBlock* pBlock, SSDataBlo
}
if (size == 0) {
copyDeleteDataBlock(pInfo, pInfo->pDeleteDataRes, pInfo->pTableScanOp, pUpdateBlock);
generateIntervalTs(pInfo, pInfo->pDeleteDataRes, pInfo->pTableScanOp, pUpdateBlock);
}
}
......@@ -1060,11 +1158,17 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
} break;
case STREAM_DELETE_DATA: {
pInfo->blockType = STREAM_INPUT__DATA_SUBMIT;
pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER;
copyDataBlock(pInfo->pDeleteDataRes, pBlock);
copyDeleteDataBlock(pInfo, pInfo->pDeleteDataRes, pInfo->pTableScanOp, pInfo->pUpdateRes);
pInfo->updateResIndex = 0;
prepareDataScan(pInfo, pInfo->pUpdateRes, START_TS_COLUMN_INDEX, &pInfo->updateResIndex);
if (isIntervalWindow(pInfo)) {
copyDataBlock(pInfo->pDeleteDataRes, pBlock);
generateIntervalTs(pInfo, pInfo->pDeleteDataRes, pInfo->pTableScanOp, pInfo->pUpdateRes);
prepareDataScan(pInfo, pInfo->pUpdateRes, START_TS_COLUMN_INDEX, &pInfo->updateResIndex);
pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER;
} else {
generateScanRange(pInfo, pBlock, pInfo->pTableScanOp, pInfo->pUpdateRes);
prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex);
pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE;
}
pInfo->pUpdateRes->info.type = STREAM_DELETE_DATA;
return pInfo->pUpdateRes;
} break;
......@@ -1078,8 +1182,10 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
return pInfo->pRes;
} else if (pInfo->scanMode == STREAM_SCAN_FROM_UPDATERES) {
pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER;
if (!isStateWindow(pInfo)) {
if (isStateWindow(pInfo)) {
pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
} else {
pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER;
prepareDataScan(pInfo, pInfo->pUpdateRes, pInfo->primaryTsIndex, &pInfo->updateResIndex);
}
return pInfo->pUpdateRes;
......@@ -1104,11 +1210,19 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
return pInfo->pUpdateRes;
}
pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
} else if (pInfo->scanMode == STREAM_SCAN_FROM_DATAREADER_RANGE) {
SSDataBlock* pSDB = doRangeScan(pInfo, pInfo->pUpdateRes, pInfo->primaryTsIndex, &pInfo->updateResIndex);
if (pSDB) {
pSDB->info.type = STREAM_NORMAL;
checkUpdateData(pInfo, true, pSDB, false);
return pSDB;
}
pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
} else if (isStateWindow(pInfo)) {
pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER;
pInfo->updateResIndex = pInfo->pUpdateRes->info.rows;
if (prepareDataScan(pInfo, pInfo->pUpdateRes, pInfo->primaryTsIndex, &pInfo->updateResIndex)) {
ASSERT(pInfo->pUpdateRes->info.rows == 0);
blockDataCleanup(pInfo->pUpdateRes);
// return empty data blcok
return pInfo->pUpdateRes;
}
......
......@@ -549,7 +549,7 @@ int32_t udfdLoadUdf(char *udfName, SUdf *udf) {
static bool udfdRpcRfp(int32_t code, tmsg_t msgType) {
if (code == TSDB_CODE_RPC_REDIRECT || code == TSDB_CODE_RPC_NETWORK_UNAVAIL || code == TSDB_CODE_NODE_NOT_DEPLOYED ||
code == TSDB_CODE_SYN_NOT_LEADER || code == TSDB_CODE_APP_NOT_READY || code == TSDB_CODE_RPC_BROKEN_LINK) {
if (msgType == TDMT_SCH_QUERY || msgType == TDMT_SCH_MERGE_QUERY || msgType == TDMT_SCH_FETCH) {
if (msgType == TDMT_SCH_QUERY || msgType == TDMT_SCH_MERGE_QUERY || msgType == TDMT_SCH_FETCH || msgType == TDMT_SCH_MERGE_FETCH) {
return false;
}
return true;
......
......@@ -595,6 +595,7 @@ static int32_t downstreamSourceCopy(const SDownstreamSourceNode* pSrc, SDownstre
COPY_SCALAR_FIELD(taskId);
COPY_SCALAR_FIELD(schedId);
COPY_SCALAR_FIELD(execId);
COPY_SCALAR_FIELD(fetchMsgType);
return TSDB_CODE_SUCCESS;
}
......
......@@ -3538,6 +3538,7 @@ static const char* jkDownstreamSourceAddr = "Addr";
static const char* jkDownstreamSourceTaskId = "TaskId";
static const char* jkDownstreamSourceSchedId = "SchedId";
static const char* jkDownstreamSourceExecId = "ExecId";
static const char* jkDownstreamSourceFetchMsgType = "FetchMsgType";
static int32_t downstreamSourceNodeToJson(const void* pObj, SJson* pJson) {
const SDownstreamSourceNode* pNode = (const SDownstreamSourceNode*)pObj;
......@@ -3552,6 +3553,9 @@ static int32_t downstreamSourceNodeToJson(const void* pObj, SJson* pJson) {
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkDownstreamSourceExecId, pNode->execId);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkDownstreamSourceFetchMsgType, pNode->fetchMsgType);
}
return code;
}
......@@ -3569,6 +3573,9 @@ static int32_t jsonToDownstreamSourceNode(const SJson* pJson, void* pObj) {
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetIntValue(pJson, jkDownstreamSourceExecId, &pNode->execId);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetIntValue(pJson, jkDownstreamSourceFetchMsgType, &pNode->fetchMsgType);
}
return code;
}
......
......@@ -123,6 +123,7 @@ typedef struct SQWTaskCtx {
int8_t taskType;
int8_t explain;
int32_t queryType;
int32_t fetchType;
int32_t execId;
bool queryFetched;
......
......@@ -35,8 +35,7 @@ int32_t qwProcessDelete(QW_FPARAMS_DEF, SQWMsg *qwMsg, SDeleteRes *pRes);
int32_t qwBuildAndSendDropRsp(SRpcHandleInfo *pConn, int32_t code);
int32_t qwBuildAndSendCancelRsp(SRpcHandleInfo *pConn, int32_t code);
int32_t qwBuildAndSendFetchRsp(SRpcHandleInfo *pConn, SRetrieveTableRsp *pRsp, int32_t dataLength,
int32_t code);
int32_t qwBuildAndSendFetchRsp(int32_t rspType, SRpcHandleInfo *pConn, SRetrieveTableRsp *pRsp, int32_t dataLength, int32_t code);
void qwBuildFetchRsp(void *msg, SOutputData *input, int32_t len, bool qComplete);
int32_t qwBuildAndSendCQueryMsg(QW_FPARAMS_DEF, SRpcHandleInfo *pConn);
int32_t qwBuildAndSendQueryRsp(int32_t rspType, SRpcHandleInfo *pConn, int32_t code, STbVerInfo* tbInfo);
......
......@@ -104,7 +104,7 @@ int32_t qwBuildAndSendHbRsp(SRpcHandleInfo *pConn, SSchedulerHbRsp *pStatus, int
return TSDB_CODE_SUCCESS;
}
int32_t qwBuildAndSendFetchRsp(SRpcHandleInfo *pConn, SRetrieveTableRsp *pRsp, int32_t dataLength, int32_t code) {
int32_t qwBuildAndSendFetchRsp(int32_t rspType, SRpcHandleInfo *pConn, SRetrieveTableRsp *pRsp, int32_t dataLength, int32_t code) {
if (NULL == pRsp) {
pRsp = (SRetrieveTableRsp *)rpcMallocCont(sizeof(SRetrieveTableRsp));
memset(pRsp, 0, sizeof(SRetrieveTableRsp));
......@@ -112,7 +112,7 @@ int32_t qwBuildAndSendFetchRsp(SRpcHandleInfo *pConn, SRetrieveTableRsp *pRsp, i
}
SRpcMsg rpcRsp = {
.msgType = TDMT_SCH_FETCH_RSP,
.msgType = rspType,
.pCont = pRsp,
.contLen = sizeof(*pRsp) + dataLength,
.code = code,
......@@ -436,7 +436,7 @@ int32_t qWorkerProcessFetchMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int
int64_t rId = 0;
int32_t eId = msg->execId;
SQWMsg qwMsg = {.node = node, .msg = NULL, .msgLen = 0, .connInfo = pMsg->info};
SQWMsg qwMsg = {.node = node, .msg = NULL, .msgLen = 0, .connInfo = pMsg->info, .msgType = pMsg->msgType};
QW_SCH_TASK_DLOG("processFetch start, node:%p, handle:%p", node, pMsg->info.handle);
......
......@@ -606,7 +606,7 @@ int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
qwMsg->connInfo = ctx->dataConnInfo;
QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_FETCH);
qwBuildAndSendFetchRsp(&qwMsg->connInfo, rsp, dataLen, code);
qwBuildAndSendFetchRsp(ctx->fetchType, &qwMsg->connInfo, rsp, dataLen, code);
rsp = NULL;
QW_TASK_DLOG("fetch rsp send, handle:%p, code:%x - %s, dataLen:%d", qwMsg->connInfo.handle, code,
......@@ -628,7 +628,7 @@ int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
rsp = NULL;
qwMsg->connInfo = ctx->dataConnInfo;
qwBuildAndSendFetchRsp(&qwMsg->connInfo, NULL, 0, code);
qwBuildAndSendFetchRsp(ctx->fetchType, &qwMsg->connInfo, NULL, 0, code);
QW_TASK_DLOG("fetch rsp send, handle:%p, code:%x - %s, dataLen:%d", qwMsg->connInfo.handle, code, tstrerror(code),
0);
}
......@@ -661,6 +661,8 @@ int32_t qwProcessFetch(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
QW_ERR_JRET(qwGetTaskCtx(QW_FPARAMS(), &ctx));
ctx->queryType = qwMsg->msgType;
SOutputData sOutput = {0};
QW_ERR_JRET(qwGetQueryResFromSink(QW_FPARAMS(), ctx, &dataLen, &rsp, &sOutput));
......@@ -711,7 +713,7 @@ _return:
}
if (code || rsp) {
qwBuildAndSendFetchRsp(&qwMsg->connInfo, rsp, dataLen, code);
qwBuildAndSendFetchRsp(qwMsg->msgType + 1, &qwMsg->connInfo, rsp, dataLen, code);
QW_TASK_DLOG("fetch rsp send, handle:%p, code:%x - %s, dataLen:%d", qwMsg->connInfo.handle, code, tstrerror(code),
dataLen);
}
......
......@@ -214,7 +214,8 @@ void qwtRpcSendResponse(const SRpcMsg *pRsp) {
rpcFreeCont(rsp);
break;
}
case TDMT_SCH_FETCH_RSP: {
case TDMT_SCH_FETCH_RSP:
case TDMT_SCH_MERGE_FETCH_RSP: {
SRetrieveTableRsp *rsp = (SRetrieveTableRsp *)pRsp->pCont;
if (0 == pRsp->code && 0 == rsp->completed) {
......@@ -815,6 +816,7 @@ void *fetchQueueThread(void *param) {
switch (fetchRpc->msgType) {
case TDMT_SCH_FETCH:
case TDMT_SCH_MERGE_FETCH:
qWorkerProcessFetchMsg(mockPointer, mgmt, fetchRpc, 0);
break;
case TDMT_SCH_CANCEL_TASK:
......
......@@ -35,7 +35,7 @@ extern "C" {
#define SCH_DEFAULT_TASK_TIMEOUT_USEC 10000000
#define SCH_MAX_TASK_TIMEOUT_USEC 60000000
#define SCH_TASK_MAX_EXEC_TIMES 5
#define SCH_TASK_MAX_EXEC_TIMES 8
#define SCH_MAX_CANDIDATE_EP_NUM TSDB_MAX_REPLICA
enum {
......@@ -318,6 +318,7 @@ extern SSchedulerMgmt schMgmt;
#define SCH_SET_JOB_NEED_FLOW_CTRL(_job) (_job)->attr.needFlowCtrl = true
#define SCH_JOB_NEED_FLOW_CTRL(_job) ((_job)->attr.needFlowCtrl)
#define SCH_TASK_NEED_FLOW_CTRL(_job, _task) (SCH_IS_DATA_SRC_QRY_TASK(_task) && SCH_JOB_NEED_FLOW_CTRL(_job) && SCH_IS_LEVEL_UNFINISHED((_task)->level))
#define SCH_FETCH_TYPE(_pSrcTask) (SCH_IS_DATA_SRC_QRY_TASK(_pSrcTask) ? TDMT_SCH_FETCH : TDMT_SCH_MERGE_FETCH)
#define SCH_SET_JOB_TYPE(_job, type) do { if ((type) != SUBPLAN_TYPE_MODIFY) { (_job)->attr.queryJob = true; } } while (0)
#define SCH_IS_QUERY_JOB(_job) ((_job)->attr.queryJob)
......@@ -327,7 +328,7 @@ extern SSchedulerMgmt schMgmt;
#define SCH_IS_EXPLAIN_JOB(_job) (EXPLAIN_MODE_ANALYZE == (_job)->attr.explainMode)
#define SCH_NETWORK_ERR(_code) ((_code) == TSDB_CODE_RPC_BROKEN_LINK || (_code) == TSDB_CODE_RPC_NETWORK_UNAVAIL)
#define SCH_SUB_TASK_NETWORK_ERR(_code, _len) (SCH_NETWORK_ERR(_code) && ((_len) > 0))
#define SCH_NEED_REDIRECT_MSGTYPE(_msgType) ((_msgType) == TDMT_SCH_QUERY || (_msgType) == TDMT_SCH_MERGE_QUERY || (_msgType) == TDMT_SCH_FETCH)
#define SCH_NEED_REDIRECT_MSGTYPE(_msgType) ((_msgType) == TDMT_SCH_QUERY || (_msgType) == TDMT_SCH_MERGE_QUERY || (_msgType) == TDMT_SCH_FETCH || (_msgType) == TDMT_SCH_MERGE_FETCH)
#define SCH_NEED_REDIRECT(_msgType, _code, _rspLen) (SCH_NEED_REDIRECT_MSGTYPE(_msgType) && (NEED_SCHEDULER_REDIRECT_ERROR(_code) || SCH_SUB_TASK_NETWORK_ERR(_code, _rspLen)))
#define SCH_NEED_RETRY(_msgType, _code) ((SCH_NETWORK_ERR(_code) && SCH_NEED_REDIRECT_MSGTYPE(_msgType)) || (_code) == TSDB_CODE_SCH_TIMEOUT_ERROR)
......
......@@ -44,6 +44,7 @@ int32_t schValidateReceivedMsgType(SSchJob *pJob, SSchTask *pTask, int32_t msgTy
// SCH_SET_TASK_LASTMSG_TYPE(pTask, -1);
return TSDB_CODE_SUCCESS;
case TDMT_SCH_FETCH_RSP:
case TDMT_SCH_MERGE_FETCH_RSP:
if (lastMsgType != reqMsgType && -1 != lastMsgType) {
SCH_TASK_ELOG("rsp msg type mis-match, last sent msgType:%s, rspType:%s", TMSG_INFO(lastMsgType),
TMSG_INFO(msgType));
......@@ -304,7 +305,8 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t execId, SDa
}
break;
}
case TDMT_SCH_FETCH_RSP: {
case TDMT_SCH_FETCH_RSP:
case TDMT_SCH_MERGE_FETCH_RSP: {
SRetrieveTableRsp *rsp = (SRetrieveTableRsp *)msg;
SCH_ERR_JRET(rspCode);
......@@ -558,6 +560,7 @@ int32_t schGetCallbackFp(int32_t msgType, __async_send_cb_fn_t *fp) {
case TDMT_VND_DELETE:
case TDMT_SCH_EXPLAIN:
case TDMT_SCH_FETCH:
case TDMT_SCH_MERGE_FETCH:
*fp = schHandleCallback;
break;
case TDMT_SCH_DROP_TASK:
......@@ -1016,7 +1019,8 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr,
persistHandle = true;
break;
}
case TDMT_SCH_FETCH: {
case TDMT_SCH_FETCH:
case TDMT_SCH_MERGE_FETCH: {
msgSize = sizeof(SResFetchReq);
msg = taosMemoryCalloc(1, msgSize);
if (NULL == msg) {
......
......@@ -258,7 +258,9 @@ int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) {
.taskId = pTask->taskId,
.schedId = schMgmt.sId,
.execId = pTask->execId,
.addr = pTask->succeedAddr};
.addr = pTask->succeedAddr,
.fetchMsgType = SCH_FETCH_TYPE(pTask),
};
qSetSubplanExecutionNode(parent->plan, pTask->plan->id.groupId, &source);
SCH_UNLOCK(SCH_WRITE, &parent->lock);
......@@ -818,7 +820,7 @@ int32_t schLaunchFetchTask(SSchJob *pJob) {
return TSDB_CODE_SUCCESS;
}
SCH_ERR_JRET(schBuildAndSendMsg(pJob, pJob->fetchTask, &pJob->resNode, TDMT_SCH_FETCH));
SCH_ERR_JRET(schBuildAndSendMsg(pJob, pJob->fetchTask, &pJob->resNode, SCH_FETCH_TYPE(pJob->fetchTask)));
return TSDB_CODE_SUCCESS;
......
......@@ -648,8 +648,6 @@ void setHeartbeatTimerMS(int64_t rid, int32_t hbTimerMS) {
}
int32_t syncPropose(int64_t rid, SRpcMsg* pMsg, bool isWeak) {
int32_t ret = 0;
SSyncNode* pSyncNode = taosAcquireRef(tsNodeRefId, rid);
if (pSyncNode == NULL) {
taosReleaseRef(tsNodeRefId, rid);
......@@ -657,8 +655,8 @@ int32_t syncPropose(int64_t rid, SRpcMsg* pMsg, bool isWeak) {
return -1;
}
ASSERT(rid == pSyncNode->rid);
ret = syncNodePropose(pSyncNode, pMsg, isWeak);
int32_t ret = syncNodePropose(pSyncNode, pMsg, isWeak);
taosReleaseRef(tsNodeRefId, pSyncNode->rid);
return ret;
}
......@@ -669,15 +667,14 @@ int32_t syncProposeBatch(int64_t rid, SRpcMsg* pMsgArr, bool* pIsWeakArr, int32_
return -1;
}
int32_t ret = 0;
SSyncNode* pSyncNode = taosAcquireRef(tsNodeRefId, rid);
if (pSyncNode == NULL) {
terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
return -1;
}
ASSERT(rid == pSyncNode->rid);
ret = syncNodeProposeBatch(pSyncNode, pMsgArr, pIsWeakArr, arrSize);
int32_t ret = syncNodeProposeBatch(pSyncNode, pMsgArr, pIsWeakArr, arrSize);
taosReleaseRef(tsNodeRefId, pSyncNode->rid);
return ret;
}
......
......@@ -449,4 +449,53 @@ if $data26 != 14 then
return -1
endi
sql create database test1 vgroups 1
sql show databases
print $data00 $data01 $data02
sql use test1
sql create table t1(ts timestamp, a int, b int , c int, d double, id int);
sql create stream streams2 trigger at_once into streamt1 as select _wstartts, count(*) c1, count(d) c2 , sum(a) c3 , max(a) c4, min(c) c5, max(id) c from t1 state_window(a);
sql insert into t1 values(1648791212000,2,2,3,1.0,1);
sql insert into t1 values(1648791213000,1,2,3,1.0,1);
sql insert into t1 values(1648791213000,1,2,4,1.0,2);
$loop_count = 0
loop5:
sleep 300
sql select * from streamt1 order by c desc;
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
if $rows != 2 then
print =====rows=$rows
goto loop5
endi
if $data01 != 1 then
print =====data01=$data01
goto loop5
endi
if $data05 != 4 then
print =====data05=$data05
goto loop5
endi
if $data11 != 1 then
print =====data11=$data11
goto loop5
endi
if $data15 != 3 then
print =====data15=$data15
goto loop5
endi
system sh/exec.sh -n dnode1 -s stop -x SIGINT
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册