Commit b327f797 authored by J jiajingbin

Merge branch 'main' of https://github.com/taosdata/TDengine into main

 cmake_minimum_required(VERSION 3.0)
 set(CMAKE_VERBOSE_MAKEFILE ON)
 set(TD_BUILD_TAOSA_INTERNAL FALSE)
......
@@ -2,7 +2,7 @@
 IF (DEFINED VERNUMBER)
   SET(TD_VER_NUMBER ${VERNUMBER})
 ELSE ()
-  SET(TD_VER_NUMBER "3.1.0.1.alpha")
+  SET(TD_VER_NUMBER "3.1.0.2.alpha")
 ENDIF ()
 IF (DEFINED VERCOMPATIBLE)
......
@@ -52,7 +52,7 @@ CREATE TABLE d1004 USING meters TAGS ("California.LosAngeles", 3);
 ### Create a Stream
 ```sql
-create stream current_stream into current_stream_output_stb as select _wstart as start, _wend as end, max(current) as max_current from meters where voltage <= 220 interval (5s);
+create stream current_stream trigger at_once into current_stream_output_stb as select _wstart as wstart, _wend as wend, max(current) as max_current from meters where voltage <= 220 interval (5s);
 ```
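The stream writes its aggregates into the output super table named in the statement. As a quick, illustrative check (assuming data has already been written to the `meters` subtables), the results can be read back with an ordinary query; `wstart`, `wend`, and `max_current` are the column aliases defined above:

```sql
-- Illustrative sketch: inspect what the stream has produced so far.
-- Rows appear only after matching data arrives in the meters subtables.
SELECT wstart, wend, max_current FROM current_stream_output_stb;
```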
 ### Write Data
......
@@ -81,6 +81,14 @@ For example:
 taos -h h1.taos.com -s "use db; show tables;"
 ```
+## Export query results to a file
+- You can use ">>" to export query results to a file; the syntax is `select * from table >> file`. If only a file name is given without a path, the file is generated under the current working directory of the TDengine CLI.
+## Import data from a CSV file
+- You can use `insert into table_name file 'fileName'` to import the data from the specified file into the specified table. For example, `insert into d0 file '/root/d0.csv';` imports the data in file "/root/d0.csv" into table "d0". If only a file name is given without a path, the file is expected in the current working directory of the TDengine CLI.
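A minimal end-to-end sketch combining both operations, reusing the table `d0` and the path `/root/d0.csv` from the examples above (both are illustrative names):

```sql
-- Export all rows of d0 to a CSV file; with a bare file name the file would
-- instead be created in the CLI's current working directory.
SELECT * FROM d0 >> '/root/d0.csv';

-- Import the rows back into a table with a matching schema.
INSERT INTO d0 FILE '/root/d0.csv';
```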
 ## TDengine CLI tips
 - You can use the up and down keys to iterate the history of commands entered
@@ -89,3 +97,5 @@ taos -h h1.taos.com -s "use db; show tables;"
 - Execute `RESET QUERY CACHE` to clear the local cache of the table schema
 - Execute SQL statements in batches. You can store a series of shell commands (ending with ;, one line for each SQL command) in a script file and execute the command `source <file-name>` in the TDengine CLI to execute all SQL commands in that file automatically
 - Enter `q` to exit TDengine CLI
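For the batch-execution tip above, a sketch of what such a script file might contain (the file name `commands.sql` is illustrative); running `source commands.sql` in the TDengine CLI then executes each statement in order:

```sql
-- commands.sql: one SQL statement per line, each terminated with ;
use db;
select count(*) from meters;
select last_row(*) from meters;
```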
@@ -89,3 +89,11 @@ taos -h h1.taos.com -s "use db; show tables;"
 - Execute `RESET QUERY CACHE` to clear the local cache of table schemas
 - Execute SQL statements in batches. You can store a series of TDengine CLI commands (each SQL statement on its own line, ending with a semicolon) in a file, then run `source <file-name>` in the TDengine CLI to execute all SQL statements in that file automatically
 - Enter `q`, `quit`, or `exit` and press Enter to exit the TDengine CLI
+## Export query results to a file
+- You can use the symbol ">>" to export query results to a file. The syntax is: `<SQL query> >> '<output file name>';`. If no path is given, the file is written to the current directory. For example, `select * from d0 >> '/root/d0.csv';` writes the query results to /root/d0.csv.
+## Import data from a file into a table
+- You can use `insert into table_name file '<input file name>'` to import the data file exported in the previous step back into the specified table. For example, `insert into d0 file '/root/d0.csv';` imports all of the data exported above into table d0.
@@ -152,6 +152,8 @@ enum {
   STREAM_INPUT__DATA_RETRIEVE,
   STREAM_INPUT__GET_RES,
   STREAM_INPUT__CHECKPOINT,
+  STREAM_INPUT__CHECKPOINT_TRIGGER,
+  STREAM_INPUT__TRANS_STATE,
   STREAM_INPUT__REF_DATA_BLOCK,
   STREAM_INPUT__DESTROY,
 };
@@ -168,7 +170,9 @@ typedef enum EStreamType {
   STREAM_PULL_DATA,
   STREAM_PULL_OVER,
   STREAM_FILL_OVER,
+  STREAM_CHECKPOINT,
   STREAM_CREATE_CHILD_TABLE,
+  STREAM_TRANS_STATE,
 } EStreamType;
 
 #pragma pack(push, 1)
......
@@ -254,7 +254,6 @@ enum {
   TD_DEF_MSG_TYPE(TDMT_STREAM_RETRIEVE, "stream-retrieve", NULL, NULL)
   TD_DEF_MSG_TYPE(TDMT_STREAM_SCAN_HISTORY, "stream-scan-history", NULL, NULL)
   TD_DEF_MSG_TYPE(TDMT_STREAM_SCAN_HISTORY_FINISH, "stream-scan-history-finish", NULL, NULL)
-  TD_DEF_MSG_TYPE(TDMT_STREAM_TRANSFER_STATE, "stream-transfer-state", NULL, NULL)
   TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_CHECK, "stream-task-check", NULL, NULL)
   TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_CHECKPOINT, "stream-checkpoint", NULL, NULL)
   TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_REPORT_CHECKPOINT, "stream-report-checkpoint", NULL, NULL)
......
@@ -122,6 +122,7 @@ typedef struct {
   int8_t  type;
   int32_t srcVgId;
+  int32_t srcTaskId;
   int32_t childId;
   int64_t sourceVer;
   int64_t reqId;
@@ -251,6 +252,7 @@ typedef struct SStreamChildEpInfo {
   int32_t nodeId;
   int32_t childId;
   int32_t taskId;
+  int8_t  dataAllowed;
   SEpSet  epSet;
 } SStreamChildEpInfo;
@@ -272,6 +274,7 @@ typedef struct SStreamStatus {
   int8_t schedStatus;
   int8_t keepTaskStatus;
   bool   transferState;
+  bool   appendTranstateBlock;  // has appended the transfer-state data block already, todo: remove it
   int8_t timerActive;           // timer is active
   int8_t pauseAllowed;          // allowed task status to be set to be paused
 } SStreamStatus;
@@ -399,8 +402,9 @@ typedef struct {
 typedef struct {
   int64_t streamId;
+  int32_t type;
   int32_t taskId;
-  int32_t dataSrcVgId;
+  int32_t srcVgId;
   int32_t upstreamTaskId;
   int32_t upstreamChildId;
   int32_t upstreamNodeId;
@@ -570,8 +574,6 @@ int32_t tDecodeStreamDispatchReq(SDecoder* pDecoder, SStreamDispatchReq* pReq);
 int32_t tDecodeStreamRetrieveReq(SDecoder* pDecoder, SStreamRetrieveReq* pReq);
 void    tDeleteStreamRetrieveReq(SStreamRetrieveReq* pReq);
-int32_t tInitStreamDispatchReq(SStreamDispatchReq* pReq, const SStreamTask* pTask, int32_t vgId, int32_t numOfBlocks,
-                               int64_t dstTaskId);
 void    tDeleteStreamDispatchReq(SStreamDispatchReq* pReq);
 int32_t streamSetupScheduleTrigger(SStreamTask* pTask);
@@ -579,6 +581,8 @@ int32_t streamSetupScheduleTrigger(SStreamTask* pTask);
 int32_t streamProcessRunReq(SStreamTask* pTask);
 int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg* pMsg, bool exec);
 int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, int32_t code);
+void    streamTaskCloseUpstreamInput(SStreamTask* pTask, int32_t taskId);
+void    streamTaskOpenAllUpstreamInput(SStreamTask* pTask);
 int32_t streamProcessRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* pReq, SRpcMsg* pMsg);
@@ -626,7 +630,7 @@ int32_t streamSetParamForStreamScannerStep2(SStreamTask* pTask, SVersionRange* p
 int32_t streamSourceScanHistoryData(SStreamTask* pTask);
 int32_t streamDispatchScanHistoryFinishMsg(SStreamTask* pTask);
-int32_t streamDispatchTransferStateMsg(SStreamTask* pTask);
+int32_t appendTranstateIntoInputQ(SStreamTask* pTask);
 
 // agg level
 int32_t streamTaskScanHistoryPrepare(SStreamTask* pTask);
......
@@ -1863,10 +1863,10 @@ static int32_t tmqHandleNoPollRsp(tmq_t* tmq, SMqRspWrapper* rspWrapper, bool* p
   return 0;
 }
 
-static void updateVgInfo(SMqClientVg* pVg, STqOffsetVal* reqOffset, STqOffsetVal* rspOffset, int64_t sver, int64_t ever, int64_t consumerId){
+static void updateVgInfo(SMqClientVg* pVg, STqOffsetVal* reqOffset, STqOffsetVal* rspOffset, int64_t sver, int64_t ever, int64_t consumerId, bool hasData){
   if (!pVg->seekUpdated) {
     tscDebug("consumer:0x%" PRIx64" local offset is update, since seekupdate not set", consumerId);
-    pVg->offsetInfo.beginOffset = *reqOffset;
+    if(hasData) pVg->offsetInfo.beginOffset = *reqOffset;
     pVg->offsetInfo.endOffset = *rspOffset;
   } else {
     tscDebug("consumer:0x%" PRIx64" local offset is NOT update, since seekupdate is set", consumerId);
@@ -1929,7 +1929,7 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
         pVg->epSet = *pollRspWrapper->pEpset;
       }
 
-      updateVgInfo(pVg, &pDataRsp->reqOffset, &pDataRsp->rspOffset, pDataRsp->head.walsver, pDataRsp->head.walever, tmq->consumerId);
+      updateVgInfo(pVg, &pDataRsp->reqOffset, &pDataRsp->rspOffset, pDataRsp->head.walsver, pDataRsp->head.walever, tmq->consumerId, pDataRsp->blockNum != 0);
 
       char buf[TSDB_OFFSET_LEN] = {0};
       tFormatOffset(buf, TSDB_OFFSET_LEN, &pDataRsp->rspOffset);
@@ -1979,7 +1979,7 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
         return NULL;
       }
 
-      updateVgInfo(pVg, &pollRspWrapper->metaRsp.rspOffset, &pollRspWrapper->metaRsp.rspOffset, pollRspWrapper->metaRsp.head.walsver, pollRspWrapper->metaRsp.head.walever, tmq->consumerId);
+      updateVgInfo(pVg, &pollRspWrapper->metaRsp.rspOffset, &pollRspWrapper->metaRsp.rspOffset, pollRspWrapper->metaRsp.head.walsver, pollRspWrapper->metaRsp.head.walever, tmq->consumerId, true);
       // build rsp
       SMqMetaRspObj* pRsp = tmqBuildMetaRspFromWrapper(pollRspWrapper);
       taosFreeQitem(pollRspWrapper);
@@ -2007,7 +2007,7 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
         return NULL;
       }
 
-      updateVgInfo(pVg, &pollRspWrapper->taosxRsp.reqOffset, &pollRspWrapper->taosxRsp.rspOffset, pollRspWrapper->taosxRsp.head.walsver, pollRspWrapper->taosxRsp.head.walever, tmq->consumerId);
+      updateVgInfo(pVg, &pollRspWrapper->taosxRsp.reqOffset, &pollRspWrapper->taosxRsp.rspOffset, pollRspWrapper->taosxRsp.head.walsver, pollRspWrapper->taosxRsp.head.walever, tmq->consumerId, pollRspWrapper->taosxRsp.blockNum != 0);
 
       if (pollRspWrapper->taosxRsp.blockNum == 0) {
         tscDebug("consumer:0x%" PRIx64 " taosx empty block received, vgId:%d, vg total:%" PRId64 ", reqId:0x%" PRIx64,
......
@@ -742,7 +742,6 @@ SArray *vmGetMsgHandles() {
   if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
   if (dmSetMgmtHandle(pArray, TDMT_STREAM_SCAN_HISTORY_FINISH, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
   if (dmSetMgmtHandle(pArray, TDMT_STREAM_SCAN_HISTORY_FINISH_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
-  if (dmSetMgmtHandle(pArray, TDMT_STREAM_TRANSFER_STATE, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
   if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECK, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
   if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECK_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
   if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TRIGGER, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
......
@@ -244,7 +244,7 @@ static void doRemoveLostConsumers(SMqRebOutputObj *pOutput, SHashObj *pHash, con
       SMqRebOutputVg outputVg = {.oldConsumerId = consumerId, .newConsumerId = -1, .pVgEp = pVgEp};
       taosHashPut(pHash, &pVgEp->vgId, sizeof(int32_t), &outputVg, sizeof(SMqRebOutputVg));
-      mInfo("sub:%s mq re-balance remove vgId:%d from consumer:%" PRIx64, pSubKey, pVgEp->vgId, consumerId);
+      mInfo("sub:%s mq re-balance remove vgId:%d from consumer:0x%" PRIx64, pSubKey, pVgEp->vgId, consumerId);
     }
     taosArrayDestroy(pConsumerEp->vgs);
......
@@ -77,6 +77,8 @@ int32_t sndExpandTask(SSnode *pSnode, SStreamTask *pTask, int64_t ver) {
   pTask->chkInfo.version = ver;
   pTask->pMeta = pSnode->pMeta;
 
+  streamTaskOpenAllUpstreamInput(pTask);
+
   pTask->pState = streamStateOpen(pSnode->path, pTask, false, -1, -1);
   if (pTask->pState == NULL) {
     return -1;
......
@@ -175,7 +175,7 @@ int32_t extractDelDataBlock(const void* pData, int32_t len, int64_t ver, SStream
 int32_t tqExtractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest, SRpcMsg* pMsg);
 int32_t tqDoSendDataRsp(const SRpcHandleInfo* pRpcHandleInfo, const SMqDataRsp* pRsp, int32_t epoch, int64_t consumerId,
                         int32_t type, int64_t sver, int64_t ever);
-int32_t tqInitDataRsp(SMqDataRsp* pRsp, const SMqPollReq* pReq);
+int32_t tqInitDataRsp(SMqDataRsp* pRsp, STqOffsetVal pOffset);
 
 #ifdef __cplusplus
 }
 #endif
......
@@ -250,7 +250,6 @@ int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg);
 int32_t tqProcessTaskRetrieveReq(STQ* pTq, SRpcMsg* pMsg);
 int32_t tqProcessTaskRetrieveRsp(STQ* pTq, SRpcMsg* pMsg);
 int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg);
-int32_t tqProcessTaskTransferStateReq(STQ* pTq, SRpcMsg* pMsg);
 int32_t tqProcessTaskScanHistoryFinishReq(STQ* pTq, SRpcMsg* pMsg);
 int32_t tqProcessTaskScanHistoryFinishRsp(STQ* pTq, SRpcMsg* pMsg);
 int32_t tqCheckLogInWal(STQ* pTq, int64_t version);
......
@@ -289,9 +289,8 @@ int32_t tqPushEmptyDataRsp(STqHandle* pHandle, int32_t vgId) {
   }
 
   SMqDataRsp dataRsp = {0};
-  tqInitDataRsp(&dataRsp, &req);
+  tqInitDataRsp(&dataRsp, req.reqOffset);
   dataRsp.blockNum = 0;
-  dataRsp.rspOffset = dataRsp.reqOffset;
   char buf[TSDB_OFFSET_LEN] = {0};
   tFormatOffset(buf, TSDB_OFFSET_LEN, &dataRsp.reqOffset);
   tqInfo("tqPushEmptyDataRsp to consumer:0x%"PRIx64 " vgId:%d, offset:%s, reqId:0x%" PRIx64, req.consumerId, vgId, buf, req.reqId);
@@ -391,7 +390,6 @@ int32_t tqProcessSeekReq(STQ* pTq, SRpcMsg* pMsg) {
   }
 
   tqDebug("tmq seek: consumer:0x%" PRIx64 " vgId:%d, subkey %s", req.consumerId, vgId, req.subKey);
   STqHandle* pHandle = taosHashGet(pTq->pHandle, req.subKey, strlen(req.subKey));
-
   if (pHandle == NULL) {
     tqWarn("tmq seek: consumer:0x%" PRIx64 " vgId:%d subkey %s not found", req.consumerId, vgId, req.subKey);
@@ -715,7 +713,7 @@ int32_t tqProcessVgWalInfoReq(STQ* pTq, SRpcMsg* pMsg) {
   walReaderValidVersionRange(pHandle->execHandle.pTqReader->pWalReader, &sver, &ever);
 
   SMqDataRsp dataRsp = {0};
-  tqInitDataRsp(&dataRsp, &req);
+  tqInitDataRsp(&dataRsp, req.reqOffset);
 
   if (req.useSnapshot == true) {
     tqError("consumer:0x%" PRIx64 " vgId:%d subkey:%s snapshot not support wal info", consumerId, vgId, req.subKey);
@@ -928,6 +926,8 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
   pTask->pMsgCb = &pTq->pVnode->msgCb;
   pTask->pMeta = pTq->pStreamMeta;
 
+  streamTaskOpenAllUpstreamInput(pTask);
+
   // backup the initial status, and set it to be TASK_STATUS__INIT
   pTask->chkInfo.version = ver;
   pTask->chkInfo.currentVer = ver;
@@ -1272,7 +1272,8 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
     if (done) {
       pTask->tsInfo.step2Start = taosGetTimestampMs();
-      streamTaskEndScanWAL(pTask);
+      qDebug("s-task:%s scan-history from WAL stage(step 2) ended, elapsed time:%.2fs", id, 0.0);
+      appendTranstateIntoInputQ(pTask);
     } else {
       STimeWindow* pWindow = &pTask->dataRange.window;
       tqDebug("s-task:%s level:%d verRange:%" PRId64 " - %" PRId64 " window:%" PRId64 "-%" PRId64,
@@ -1337,44 +1338,6 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
   return 0;
 }
 
-// notify the downstream tasks to transfer executor state after handle all history blocks.
-int32_t tqProcessTaskTransferStateReq(STQ* pTq, SRpcMsg* pMsg) {
-  char*   pReq = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
-  int32_t len = pMsg->contLen - sizeof(SMsgHead);
-
-  SStreamTransferReq req = {0};
-
-  SDecoder decoder;
-  tDecoderInit(&decoder, (uint8_t*)pReq, len);
-  int32_t code = tDecodeStreamScanHistoryFinishReq(&decoder, &req);
-  tDecoderClear(&decoder);
-
-  tqDebug("vgId:%d start to process transfer state msg, from s-task:0x%x", pTq->pStreamMeta->vgId, req.downstreamTaskId);
-
-  SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, req.streamId, req.downstreamTaskId);
-  if (pTask == NULL) {
-    tqError("failed to find task:0x%x, it may have been dropped already. process transfer state failed", req.downstreamTaskId);
-    return -1;
-  }
-
-  int32_t remain = streamAlignTransferState(pTask);
-  if (remain > 0) {
-    tqDebug("s-task:%s receive upstream transfer state msg, remain:%d", pTask->id.idStr, remain);
-    streamMetaReleaseTask(pTq->pStreamMeta, pTask);
-    return 0;
-  }
-
-  // transfer the ownership of executor state
-  tqDebug("s-task:%s all upstream tasks send transfer msg, open transfer state flag", pTask->id.idStr);
-  ASSERT(pTask->streamTaskId.taskId != 0 && pTask->info.fillHistory == 1);
-
-  pTask->status.transferState = true;
-
-  streamSchedExec(pTask);
-  streamMetaReleaseTask(pTq->pStreamMeta, pTask);
-  return 0;
-}
-
 int32_t tqProcessTaskScanHistoryFinishReq(STQ* pTq, SRpcMsg* pMsg) {
   char*   msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
   int32_t msgLen = pMsg->contLen - sizeof(SMsgHead);
@@ -1706,6 +1669,8 @@ int32_t tqProcessTaskRetrieveRsp(STQ* pTq, SRpcMsg* pMsg) {
 int32_t vnodeEnqueueStreamMsg(SVnode* pVnode, SRpcMsg* pMsg) {
   STQ*      pTq = pVnode->pTq;
+  int32_t   vgId = pVnode->config.vgId;
+
   SMsgHead* msgStr = pMsg->pCont;
   char*     msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead));
   int32_t   msgLen = pMsg->contLen - sizeof(SMsgHead);
@@ -1722,7 +1687,9 @@ int32_t vnodeEnqueueStreamMsg(SVnode* pVnode, SRpcMsg* pMsg) {
   tDecoderClear(&decoder);
 
   int32_t taskId = req.taskId;
-  SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, req.streamId, req.taskId);
+  tqDebug("vgId:%d receive dispatch msg to s-task:0x%"PRIx64"-0x%x", vgId, req.streamId, taskId);
+
+  SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, req.streamId, taskId);
   if (pTask != NULL) {
     SRpcMsg rsp = {.info = pMsg->info, .code = 0};
     streamProcessDispatchMsg(pTask, &req, &rsp, false);
@@ -1739,7 +1706,7 @@ int32_t vnodeEnqueueStreamMsg(SVnode* pVnode, SRpcMsg* pMsg) {
 FAIL:
   if (pMsg->info.handle == NULL) {
-    tqError("s-task:0x%x vgId:%d msg handle is null, abort enqueue dispatch msg", pTq->pStreamMeta->vgId, taskId);
+    tqError("s-task:0x%x vgId:%d msg handle is null, abort enqueue dispatch msg", vgId, taskId);
     return -1;
   }
......
@@ -356,7 +356,7 @@ static int restoreHandle(STQ* pTq, void* pVal, int vLen, STqHandle* handle){
   if(buildHandle(pTq, handle) < 0){
     return -1;
   }
-  tqInfo("tq restore %s consumer %" PRId64 " vgId:%d", handle->subKey, handle->consumerId, vgId);
+  tqInfo("restoreHandle %s consumer 0x%" PRIx64 " vgId:%d", handle->subKey, handle->consumerId, vgId);
   return taosHashPut(pTq->pHandle, handle->subKey, strlen(handle->subKey), handle, sizeof(STqHandle));
 }
@@ -384,7 +384,7 @@ int32_t tqCreateHandle(STQ* pTq, SMqRebVgReq* req, STqHandle* handle){
   if(buildHandle(pTq, handle) < 0){
     return -1;
   }
-  tqInfo("tq restore %s consumer %" PRId64 " vgId:%d", handle->subKey, handle->consumerId, vgId);
+  tqInfo("tqCreateHandle %s consumer 0x%" PRIx64 " vgId:%d", handle->subKey, handle->consumerId, vgId);
   return taosHashPut(pTq->pHandle, handle->subKey, strlen(handle->subKey), handle, sizeof(STqHandle));
 }
......
@@ -39,7 +39,7 @@ int32_t tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t v
   int32_t numOfTasks = streamMetaGetNumOfTasks(pTq->pStreamMeta);
   taosRUnLockLatch(&pTq->pStreamMeta->lock);
 
-  tqDebug("handle submit, restore:%d, size:%d", pTq->pVnode->restored, numOfTasks);
+  tqTrace("handle submit, restore:%d, size:%d", pTq->pVnode->restored, numOfTasks);
 
   // push data for stream processing:
   // 1. the vnode has already been restored.
......
@@ -210,13 +210,22 @@ int32_t doSetOffsetForWalReader(SStreamTask *pTask, int32_t vgId) {
 }
 
 static void checkForFillHistoryVerRange(SStreamTask* pTask, int64_t ver) {
-  if ((pTask->info.fillHistory == 1) && ver > pTask->dataRange.range.maxVer) {
-    qWarn("s-task:%s fill-history scan WAL, currentVer:%" PRId64 " reach the maximum ver:%" PRId64
-          ", not scan wal anymore, set the transfer state flag",
-          pTask->id.idStr, ver, pTask->dataRange.range.maxVer);
-    pTask->status.transferState = true;
+  const char* id = pTask->id.idStr;
 
-    /*int32_t code = */streamSchedExec(pTask);
+  if ((pTask->info.fillHistory == 1) && ver > pTask->dataRange.range.maxVer) {
+    if (!pTask->status.appendTranstateBlock) {
+      qWarn("s-task:%s fill-history scan WAL, currentVer:%" PRId64 " reach the maximum ver:%" PRId64
+            ", not scan wal anymore, add transfer-state block into inputQ",
+            id, ver, pTask->dataRange.range.maxVer);
+
+      double el = (taosGetTimestampMs() - pTask->tsInfo.step2Start) / 1000.0;
+      qDebug("s-task:%s scan-history from WAL stage(step 2) ended, elapsed time:%.2fs", id, el);
+      appendTranstateIntoInputQ(pTask);
+
+      /*int32_t code = */streamSchedExec(pTask);
+    } else {
+      qWarn("s-task:%s fill-history scan WAL, currentVer:%" PRId64 " reach the maximum ver:%" PRId64 ", not scan wal",
+            id, ver, pTask->dataRange.range.maxVer);
+    }
   }
 }
@@ -262,7 +271,7 @@ int32_t createStreamTaskRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) {
       continue;
     }
 
-    if ((pTask->info.fillHistory == 1) && pTask->status.transferState) {
+    if ((pTask->info.fillHistory == 1) && pTask->status.appendTranstateBlock) {
       ASSERT(status == TASK_STATUS__NORMAL);
       // the maximum version of data in the WAL has reached already, the step2 is done
       tqDebug("s-task:%s fill-history reach the maximum ver:%" PRId64 ", not scan wal anymore", pTask->id.idStr,
......
@@ -20,8 +20,9 @@
 static int32_t tqSendMetaPollRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPollReq* pReq,
                                  const SMqMetaRsp* pRsp, int32_t vgId);
 
-int32_t tqInitDataRsp(SMqDataRsp* pRsp, const SMqPollReq* pReq) {
-  pRsp->reqOffset = pReq->reqOffset;
+int32_t tqInitDataRsp(SMqDataRsp* pRsp, STqOffsetVal pOffset) {
+  pRsp->reqOffset = pOffset;
+  pRsp->rspOffset = pOffset;
 
   pRsp->blockData = taosArrayInit(0, sizeof(void*));
   pRsp->blockDataLen = taosArrayInit(0, sizeof(int32_t));
@@ -35,8 +36,9 @@ int32_t tqInitDataRsp(SMqDataRsp* pRsp, const SMqPollReq* pReq) {
   return 0;
 }
 
-static int32_t tqInitTaosxRsp(STaosxRsp* pRsp, const SMqPollReq* pReq) {
-  pRsp->reqOffset = pReq->reqOffset;
+static int32_t tqInitTaosxRsp(STaosxRsp* pRsp, STqOffsetVal pOffset) {
+  pRsp->reqOffset = pOffset;
+  pRsp->rspOffset = pOffset;
 
   pRsp->withTbName = 1;
   pRsp->withSchema = 1;
@@ -69,7 +71,6 @@ static int32_t tqInitTaosxRsp(STaosxRsp* pRsp, const SMqPollReq* pReq) {
 static int32_t extractResetOffsetVal(STqOffsetVal* pOffsetVal, STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest, SRpcMsg* pMsg, bool* pBlockReturned) {
   uint64_t   consumerId = pRequest->consumerId;
-  STqOffsetVal reqOffset = pRequest->reqOffset;
   STqOffset* pOffset = tqOffsetRead(pTq->pOffsetStore, pRequest->subKey);
   int32_t    vgId = TD_VID(pTq->pVnode);
@@ -86,7 +87,7 @@ static int32_t extractResetOffsetVal(STqOffsetVal* pOffsetVal, STQ* pTq, STqHand
     return 0;
   } else {
     // no poll occurs in this vnode for this topic, let's seek to the right offset value.
-    if (reqOffset.type == TMQ_OFFSET__RESET_EARLIEST) {
+    if (pRequest->reqOffset.type == TMQ_OFFSET__RESET_EARLIEST) {
       if (pRequest->useSnapshot) {
         tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey:%s, vgId:%d, (earliest) set offset to be snapshot",
                 consumerId, pHandle->subKey, vgId);
@@ -100,12 +101,12 @@ static int32_t extractResetOffsetVal(STqOffsetVal* pOffsetVal, STQ* pTq, STqHand
         walRefFirstVer(pTq->pVnode->pWal, pHandle->pRef);
         tqOffsetResetToLog(pOffsetVal, pHandle->pRef->refVer);
       }
-    } else if (reqOffset.type == TMQ_OFFSET__RESET_LATEST) {
+    } else if (pRequest->reqOffset.type == TMQ_OFFSET__RESET_LATEST) {
       walRefLastVer(pTq->pVnode->pWal, pHandle->pRef);
       SMqDataRsp dataRsp = {0};
-      tqInitDataRsp(&dataRsp, pRequest);
-      tqOffsetResetToLog(&dataRsp.rspOffset, pHandle->pRef->refVer + 1);
+      tqOffsetResetToLog(pOffsetVal, pHandle->pRef->refVer + 1);
+      tqInitDataRsp(&dataRsp, *pOffsetVal);
       tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, (latest) offset reset to %" PRId64, consumerId,
               pHandle->subKey, vgId, dataRsp.rspOffset.version);
       int32_t code = tqSendDataRsp(pHandle, pMsg, pRequest, &dataRsp, TMQ_MSG_TYPE__POLL_DATA_RSP, vgId);
@@ -113,7 +114,7 @@ static int32_t extractResetOffsetVal(STqOffsetVal* pOffsetVal, STQ* pTq, STqHand
       *pBlockReturned = true;
       return code;
-    } else if (reqOffset.type == TMQ_OFFSET__RESET_NONE) {
+    } else if (pRequest->reqOffset.type == TMQ_OFFSET__RESET_NONE) {
       tqError("tmq poll: subkey:%s, no offset committed for consumer:0x%" PRIx64
               " in vg %d, subkey %s, reset none failed",
               pHandle->subKey, consumerId, vgId, pRequest->subKey);
@@ -125,11 +126,11 @@ static int32_t extractResetOffsetVal(STqOffsetVal* pOffsetVal, STQ* pTq, STqHand
   return 0;
 }
 
-static void setRequestVersion(STqOffsetVal* offset, int64_t ver){
-  if(offset->type == TMQ_OFFSET__LOG){
-    offset->version = ver + 1;
-  }
-}
+//static void setRequestVersion(STqOffsetVal* offset, int64_t ver){
+//  if(offset->type == TMQ_OFFSET__LOG){
+//    offset->version = ver;
+//  }
+//}
 
 static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest,
                                                    SRpcMsg* pMsg, STqOffsetVal* pOffset) {
@@ -138,8 +139,8 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle,
   terrno = 0;
 
   SMqDataRsp dataRsp = {0};
-  tqInitDataRsp(&dataRsp, pRequest);
-  dataRsp.reqOffset.type = pOffset->type;  // stroe origin type for getting offset in tmq_get_vgroup_offset
+  tqInitDataRsp(&dataRsp, *pOffset);
+  // dataRsp.reqOffset.type = pOffset->type;  // store origin type for getting offset in tmq_get_vgroup_offset
 
   qSetTaskId(pHandle->execHandle.task, consumerId, pRequest->reqId);
   int code = tqScanData(pTq, pHandle, &dataRsp, pOffset);
@@ -152,8 +153,7 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle,
   //  lock
   taosWLockLatch(&pTq->lock);
   int64_t ver = walGetCommittedVer(pTq->pVnode->pWal);
-  if (pOffset->version >= ver ||
-      dataRsp.rspOffset.version >= ver) {  // check if there are data again to avoid lost data
+  if (dataRsp.rspOffset.version > ver) {  // check if there are data again to avoid lost data
     code = tqRegisterPushHandle(pTq, pHandle, pMsg);
     taosWUnLockLatch(&pTq->lock);
     goto end;
@@ -161,7 +161,7 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle,
     taosWUnLockLatch(&pTq->lock);
   }
 
-  setRequestVersion(&dataRsp.reqOffset, pOffset->version);
+  // setRequestVersion(&dataRsp.reqOffset, pOffset->version);
   code = tqSendDataRsp(pHandle, pMsg, pRequest, (SMqDataRsp*)&dataRsp, TMQ_MSG_TYPE__POLL_DATA_RSP, vgId);
 
 end : {
@@ -182,8 +182,8 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
   SWalCkHead* pCkHead = NULL;
   SMqMetaRsp metaRsp = {0};
   STaosxRsp taosxRsp = {0};
-  tqInitTaosxRsp(&taosxRsp, pRequest);
-  taosxRsp.reqOffset.type = offset->type;  // store origin type for getting offset in tmq_get_vgroup_offset
+  tqInitTaosxRsp(&taosxRsp, *offset);
+  // taosxRsp.reqOffset.type = offset->type;  // store origin type for getting offset in tmq_get_vgroup_offset
 
   if (offset->type != TMQ_OFFSET__LOG) {
     if (tqScanTaosx(pTq, pHandle, &taosxRsp, &metaRsp, offset) < 0) {
@@ -236,7 +236,7 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
       if (tqFetchLog(pTq, pHandle, &fetchVer, &pCkHead, pRequest->reqId) < 0) {
         tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer);
-        setRequestVersion(&taosxRsp.reqOffset, offset->version);
+        // setRequestVersion(&taosxRsp.reqOffset, offset->version);
         code = tqSendDataRsp(pHandle, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__POLL_DATA_RSP, vgId);
         goto end;
       }
@@ -249,7 +249,7 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
       if (pHead->msgType != TDMT_VND_SUBMIT) {
         if (totalRows > 0) {
           tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer);
-          setRequestVersion(&taosxRsp.reqOffset, offset->version);
+          // setRequestVersion(&taosxRsp.reqOffset, offset->version);
           code = tqSendDataRsp(pHandle, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__POLL_DATA_RSP, vgId);
           goto end;
         }
@@ -279,7 +279,7 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
       if (totalRows >= 4096 || taosxRsp.createTableNum > 0) {
         tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer + 1);
-        setRequestVersion(&taosxRsp.reqOffset, offset->version);
+        // setRequestVersion(&taosxRsp.reqOffset, offset->version);
         code = tqSendDataRsp(pHandle, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, taosxRsp.createTableNum > 0 ? TMQ_MSG_TYPE__POLL_DATA_META_RSP : TMQ_MSG_TYPE__POLL_DATA_RSP, vgId);
         goto end;
       } else {
@@ -296,15 +296,13 @@ end:
 }
 
 int32_t tqExtractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest, SRpcMsg* pMsg) {
-  int32_t      code = -1;
-  STqOffsetVal offset = {0};
   STqOffsetVal reqOffset = pRequest->reqOffset;
 
   // 1. reset the offset if needed
-  if (IS_OFFSET_RESET_TYPE(reqOffset.type)) {
+  if (IS_OFFSET_RESET_TYPE(pRequest->reqOffset.type)) {
     // handle the reset offset cases, according to the consumer's choice.
     bool blockReturned = false;
-    code = extractResetOffsetVal(&offset, pTq, pHandle, pRequest, pMsg, &blockReturned);
+    int32_t code = extractResetOffsetVal(&reqOffset, pTq, pHandle, pRequest, pMsg, &blockReturned);
     if (code != 0) {
       return code;
     }
@@ -313,20 +311,17 @@ int32_t tqExtractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequ
     if (blockReturned) {
       return 0;
     }
-  } else if(reqOffset.type != 0){ // use the consumer specified offset
-    // the offset value can not be monotonious increase??
-    offset = reqOffset;
-  } else {
+  } else if(reqOffset.type == 0){ // use the consumer specified offset
     uError("req offset type is 0");
     return TSDB_CODE_TMQ_INVALID_MSG;
   }
 
   // this is a normal subscribe requirement
   if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
-    return extractDataAndRspForNormalSubscribe(pTq, pHandle, pRequest, pMsg, &offset);
+    return extractDataAndRspForNormalSubscribe(pTq, pHandle, pRequest, pMsg, &reqOffset);
   } else {  // todo handle the case where re-balance occurs.
     // for taosx
-    return extractDataAndRspForDbStbSubscribe(pTq, pHandle, pRequest, pMsg, &offset);
+    return extractDataAndRspForDbStbSubscribe(pTq, pHandle, pRequest, pMsg, &reqOffset);
   }
 }
......
@@ -392,6 +392,9 @@ static int32_t tsdbSnapReadTombData(STsdbSnapReader* reader, uint8_t** data) {
     code = tTombBlockPut(reader->tombBlock, record);
     TSDB_CHECK_CODE(code, lino, _exit);
 
+    code = tsdbIterMergerNext(reader->tombIterMerger);
+    TSDB_CHECK_CODE(code, lino, _exit);
+
     if (TOMB_BLOCK_SIZE(reader->tombBlock) >= 81920) {
       break;
     }
......
@@ -628,6 +628,7 @@ int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) {
       return tqProcessVgCommittedInfoReq(pVnode->pTq, pMsg);
     case TDMT_VND_TMQ_SEEK:
       return tqProcessSeekReq(pVnode->pTq, pMsg);
+
     default:
       vError("unknown msg type:%d in fetch queue", pMsg->msgType);
       return TSDB_CODE_APP_ERROR;
@@ -660,8 +661,6 @@ int32_t vnodeProcessStreamMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo)
       return tqProcessTaskRetrieveRsp(pVnode->pTq, pMsg);
     case TDMT_VND_STREAM_SCAN_HISTORY:
       return tqProcessTaskScanHistory(pVnode->pTq, pMsg);
-    case TDMT_STREAM_TRANSFER_STATE:
-      return tqProcessTaskTransferStateReq(pVnode->pTq, pMsg);
     case TDMT_STREAM_SCAN_HISTORY_FINISH:
       return tqProcessTaskScanHistoryFinishReq(pVnode->pTq, pMsg);
     case TDMT_STREAM_SCAN_HISTORY_FINISH_RSP:
......
@@ -624,7 +624,7 @@ void appendTableOptions(char* buf, int32_t* len, SDbCfgInfo* pDbCfg, STableCfg*
     }
   }
 
-  if (nSma < pCfg->numOfColumns) {
+  if (nSma < pCfg->numOfColumns && nSma > 0) {
     bool smaOn = false;
     *len += sprintf(buf + VARSTR_HEADER_SIZE + *len, " SMA(");
     for (int32_t i = 0; i < pCfg->numOfColumns; ++i) {
......
@@ -848,6 +848,10 @@ static void doHandleTimeslice(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
   bool    ignoreNull = getIgoreNullRes(pSup);
   int32_t order = TSDB_ORDER_ASC;
 
+  if (checkWindowBoundReached(pSliceInfo)) {
+    return;
+  }
+
   int32_t code = initKeeperInfo(pSliceInfo, pBlock, &pOperator->exprSupp);
   if (code != TSDB_CODE_SUCCESS) {
     T_LONG_JMP(pTaskInfo->env, code);
......
@@ -1520,6 +1520,10 @@ void destroyStreamFinalIntervalOperatorInfo(void* param) {
   colDataDestroy(&pInfo->twAggSup.timeWindowData);
   pInfo->groupResInfo.pRows = taosArrayDestroy(pInfo->groupResInfo.pRows);
   cleanupExprSupp(&pInfo->scalarSupp);
+  tSimpleHashCleanup(pInfo->pUpdatedMap);
+  pInfo->pUpdatedMap = NULL;
+  pInfo->pUpdated = taosArrayDestroy(pInfo->pUpdated);
+
   taosMemoryFreeClear(param);
 }
......
@@ -52,7 +52,6 @@ int32_t streamBroadcastToChildren(SStreamTask* pTask, const SSDataBlock* pBlock)
 int32_t tEncodeStreamRetrieveReq(SEncoder* pEncoder, const SStreamRetrieveReq* pReq);
 
-int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pData);
 int32_t streamDispatchCheckMsg(SStreamTask* pTask, const SStreamTaskCheckReq* pReq, int32_t nodeId, SEpSet* pEpSet);
 int32_t streamDoDispatchScanHistoryFinishMsg(SStreamTask* pTask, const SStreamScanHistoryFinishReq* pReq, int32_t vgId,
@@ -63,6 +62,7 @@ SStreamQueueItem* streamMergeQueueItem(SStreamQueueItem* dst, SStreamQueueItem*
 int32_t streamAddEndScanHistoryMsg(SStreamTask* pTask, SRpcHandleInfo* pRpcInfo, SStreamScanHistoryFinishReq* pReq);
 int32_t streamNotifyUpstreamContinue(SStreamTask* pTask);
 int32_t streamTaskFillHistoryFinished(SStreamTask* pTask);
+int32_t streamTransferStateToStreamTask(SStreamTask* pTask);
 
 extern int32_t streamBackendId;
 extern int32_t streamBackendCfWrapperId;
......
@@ -142,40 +142,6 @@ int32_t streamSchedExec(SStreamTask* pTask) {
   return 0;
 }
 
-int32_t streamTaskEnqueueBlocks(SStreamTask* pTask, const SStreamDispatchReq* pReq, SRpcMsg* pRsp) {
-  int8_t status = 0;
-
-  SStreamDataBlock* pBlock = createStreamDataFromDispatchMsg(pReq, STREAM_INPUT__DATA_BLOCK, pReq->dataSrcVgId);
-  if (pBlock == NULL) {
-    streamTaskInputFail(pTask);
-    status = TASK_INPUT_STATUS__FAILED;
-    qError("vgId:%d, s-task:%s failed to receive dispatch msg, reason: out of memory", pTask->pMeta->vgId,
-           pTask->id.idStr);
-  } else {
-    int32_t code = tAppendDataToInputQueue(pTask, (SStreamQueueItem*)pBlock);
-    // input queue is full, upstream is blocked now
-    status = (code == TSDB_CODE_SUCCESS)? TASK_INPUT_STATUS__NORMAL:TASK_INPUT_STATUS__BLOCKED;
-  }
-
-  // rsp by input status
-  void* buf = rpcMallocCont(sizeof(SMsgHead) + sizeof(SStreamDispatchRsp));
-  ((SMsgHead*)buf)->vgId = htonl(pReq->upstreamNodeId);
-  SStreamDispatchRsp* pDispatchRsp = POINTER_SHIFT(buf, sizeof(SMsgHead));
-  pDispatchRsp->inputStatus = status;
-  pDispatchRsp->streamId = htobe64(pReq->streamId);
-  pDispatchRsp->upstreamNodeId = htonl(pReq->upstreamNodeId);
-  pDispatchRsp->upstreamTaskId = htonl(pReq->upstreamTaskId);
-  pDispatchRsp->downstreamNodeId = htonl(pTask->info.nodeId);
-  pDispatchRsp->downstreamTaskId = htonl(pTask->id.taskId);
-
-  pRsp->pCont = buf;
-  pRsp->contLen = sizeof(SMsgHead) + sizeof(SStreamDispatchRsp);
-  tmsgSendRsp(pRsp);
-
-  return status == TASK_INPUT_STATUS__NORMAL ? 0 : -1;
-}
-
 int32_t streamTaskEnqueueRetrieve(SStreamTask* pTask, SStreamRetrieveReq* pReq, SRpcMsg* pRsp) {
   SStreamDataBlock* pData = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, 0);
   int8_t status = TASK_INPUT_STATUS__NORMAL;
...@@ -235,90 +201,115 @@ int32_t streamTaskOutputResultBlock(SStreamTask* pTask, SStreamDataBlock* pBlock ...@@ -235,90 +201,115 @@ int32_t streamTaskOutputResultBlock(SStreamTask* pTask, SStreamDataBlock* pBlock
return 0; return 0;
} }
int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg* pRsp, bool exec) {
qDebug("s-task:%s receive dispatch msg from taskId:0x%x(vgId:%d), msgLen:%" PRId64, pTask->id.idStr,
pReq->upstreamTaskId, pReq->upstreamNodeId, pReq->totalLen);
// todo add the input queue buffer limitation
streamTaskEnqueueBlocks(pTask, pReq, pRsp);
tDeleteStreamDispatchReq(pReq);
if (exec) { static int32_t streamTaskAppendInputBlocks(SStreamTask* pTask, const SStreamDispatchReq* pReq) {
if (streamTryExec(pTask) < 0) { int8_t status = 0;
return -1;
} SStreamDataBlock* pBlock = createStreamDataFromDispatchMsg(pReq, pReq->type, pReq->srcVgId);
if (pBlock == NULL) {
streamTaskInputFail(pTask);
status = TASK_INPUT_STATUS__FAILED;
qError("vgId:%d, s-task:%s failed to receive dispatch msg, reason: out of memory", pTask->pMeta->vgId,
pTask->id.idStr);
} else { } else {
streamSchedExec(pTask); if (pBlock->type == STREAM_INPUT__TRANS_STATE) {
pTask->status.appendTranstateBlock = true;
}
int32_t code = tAppendDataToInputQueue(pTask, (SStreamQueueItem*)pBlock);
// input queue is full, upstream is blocked now
status = (code == TSDB_CODE_SUCCESS) ? TASK_INPUT_STATUS__NORMAL : TASK_INPUT_STATUS__BLOCKED;
} }
return 0; return status;
} }
static int32_t buildDispatchRsp(const SStreamTask* pTask, const SStreamDispatchReq* pReq, int32_t status, void** pBuf) {
  *pBuf = rpcMallocCont(sizeof(SMsgHead) + sizeof(SStreamDispatchRsp));
  if (*pBuf == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  ((SMsgHead*)(*pBuf))->vgId = htonl(pReq->upstreamNodeId);
  SStreamDispatchRsp* pDispatchRsp = POINTER_SHIFT((*pBuf), sizeof(SMsgHead));

  pDispatchRsp->inputStatus = status;
  pDispatchRsp->streamId = htobe64(pReq->streamId);
  pDispatchRsp->upstreamNodeId = htonl(pReq->upstreamNodeId);
  pDispatchRsp->upstreamTaskId = htonl(pReq->upstreamTaskId);
  pDispatchRsp->downstreamNodeId = htonl(pTask->info.nodeId);
  pDispatchRsp->downstreamTaskId = htonl(pTask->id.taskId);

  return TSDB_CODE_SUCCESS;
}

void streamTaskCloseUpstreamInput(SStreamTask* pTask, int32_t taskId) {
  SStreamChildEpInfo* pInfo = streamTaskGetUpstreamTaskEpInfo(pTask, taskId);
  if (pInfo != NULL) {
    pInfo->dataAllowed = false;
  }
}

int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg* pRsp, bool exec) {
  qDebug("s-task:%s receive dispatch msg from taskId:0x%x(vgId:%d), msgLen:%" PRId64, pTask->id.idStr,
         pReq->upstreamTaskId, pReq->upstreamNodeId, pReq->totalLen);

  int32_t status = 0;

  SStreamChildEpInfo* pInfo = streamTaskGetUpstreamTaskEpInfo(pTask, pReq->upstreamTaskId);
  ASSERT(pInfo != NULL);

  if (!pInfo->dataAllowed) {
    qWarn("s-task:%s data from task:0x%x is denied, since inputQ is closed for it", pTask->id.idStr, pReq->upstreamTaskId);
    status = TASK_INPUT_STATUS__BLOCKED;
  } else {
    // Current task has received the checkpoint req from the upstream task, from which the message should all be blocked
    if (pReq->type == STREAM_INPUT__CHECKPOINT_TRIGGER) {
      streamTaskCloseUpstreamInput(pTask, pReq->upstreamTaskId);
      qDebug("s-task:%s close inputQ for upstream:0x%x", pTask->id.idStr, pReq->upstreamTaskId);
    }

    status = streamTaskAppendInputBlocks(pTask, pReq);
  }

  {
    // do send response with the input status
    int32_t code = buildDispatchRsp(pTask, pReq, status, &pRsp->pCont);
    if (code != TSDB_CODE_SUCCESS) {
      // todo handle failure
      return code;
    }

    pRsp->contLen = sizeof(SMsgHead) + sizeof(SStreamDispatchRsp);
    tmsgSendRsp(pRsp);
  }

  tDeleteStreamDispatchReq(pReq);
  streamSchedExec(pTask);

  return 0;
}
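Editor's note: buildDispatchRsp() above converts every multi-byte field of the response with htonl()/htobe64() before the buffer goes on the wire, so sender and receiver agree on byte order regardless of host endianness. A minimal standalone sketch of the same pattern; the DemoRsp struct, its field names, and the toBe64() helper are illustrative, not TDengine types:

```c
#include <arpa/inet.h>  // htonl(): host-to-network (big-endian) for 32-bit values
#include <stdint.h>
#include <stdio.h>

// illustrative response struct, not a TDengine type
typedef struct {
  int64_t streamId;
  int32_t downstreamTaskId;
} DemoRsp;

// host-to-big-endian for 64-bit values; portable fallback for htobe64()
static uint64_t toBe64(uint64_t v) {
  if (htonl(1) == 1) {
    return v;  // big-endian host: already in network order
  }
  uint32_t hi = htonl((uint32_t)(v >> 32));
  uint32_t lo = htonl((uint32_t)(v & 0xFFFFFFFFu));
  return ((uint64_t)lo << 32) | hi;
}

int main(void) {
  DemoRsp rsp = {.streamId = 42, .downstreamTaskId = 7};
  // convert every multi-byte field once, right before the buffer is sent
  rsp.streamId = (int64_t)toBe64((uint64_t)rsp.streamId);
  rsp.downstreamTaskId = (int32_t)htonl((uint32_t)rsp.downstreamTaskId);
  printf("encoded streamId: 0x%016llx\n", (unsigned long long)rsp.streamId);
  return 0;
}
```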
//int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg* pRsp, bool exec) {
// qDebug("s-task:%s receive dispatch msg from taskId:0x%x(vgId:%d), msgLen:%" PRId64, pTask->id.idStr,
// pReq->upstreamTaskId, pReq->upstreamNodeId, pReq->totalLen);
//
// // todo add the input queue buffer limitation
// streamTaskEnqueueBlocks(pTask, pReq, pRsp);
// tDeleteStreamDispatchReq(pReq);
//
// if (exec) {
// if (streamTryExec(pTask) < 0) {
// return -1;
// }
// } else {
// streamSchedExec(pTask);
// }
//
// return 0;
//}
int32_t streamProcessRunReq(SStreamTask* pTask) {
  if (streamTryExec(pTask) < 0) {
    return -1;
@@ -385,12 +376,15 @@ int32_t tAppendDataToInputQueue(SStreamTask* pTask, SStreamQueueItem* pItem) {
      destroyStreamDataBlock((SStreamDataBlock*) pItem);
      return code;
    }
  } else if (type == STREAM_INPUT__CHECKPOINT || type == STREAM_INPUT__TRANS_STATE) {
    taosWriteQitem(pTask->inputQueue->queue, pItem);
    qDebug("s-task:%s trans-state blockdata enqueue, total in queue:%d, size:%.2fMiB", pTask->id.idStr, total, size);
  } else if (type == STREAM_INPUT__GET_RES) {
    // use the default memory limit, refactor later.
    taosWriteQitem(pTask->inputQueue->queue, pItem);
    qDebug("s-task:%s data res enqueue, current(blocks:%d, size:%.2fMiB)", pTask->id.idStr, total, size);
  } else {
    ASSERT(0);
  }

  if (type != STREAM_INPUT__GET_RES && type != STREAM_INPUT__CHECKPOINT && pTask->triggerParam != 0) {
@@ -433,4 +427,16 @@ SStreamChildEpInfo * streamTaskGetUpstreamTaskEpInfo(SStreamTask* pTask, int32_t
  }
  return NULL;
}
\ No newline at end of file
void streamTaskOpenAllUpstreamInput(SStreamTask* pTask) {
int32_t num = taosArrayGetSize(pTask->pUpstreamEpInfoList);
if (num == 0) {
return;
}
for(int32_t i = 0; i < num; ++i) {
SStreamChildEpInfo* pInfo = taosArrayGetP(pTask->pUpstreamEpInfoList, i);
pInfo->dataAllowed = true;
}
}
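Editor's note: streamTaskCloseUpstreamInput() and streamTaskOpenAllUpstreamInput() together form a per-upstream admission gate: a checkpoint trigger closes the gate for the upstream that sent it, and every gate is reopened once the round completes. A standalone sketch of the gate, assuming a fixed upstream table (UpstreamInfo and the task ids are made up for illustration):

```c
#include <stdbool.h>
#include <stdio.h>

#define MAX_UPSTREAM 4

typedef struct {
  int  taskId;
  bool dataAllowed;  // the gate: false means data from this upstream is denied
} UpstreamInfo;

static UpstreamInfo upstreams[MAX_UPSTREAM] = {
    {101, true}, {102, true}, {103, true}, {104, true}};

static void closeUpstream(int taskId) {
  for (int i = 0; i < MAX_UPSTREAM; ++i) {
    if (upstreams[i].taskId == taskId) upstreams[i].dataAllowed = false;
  }
}

static void openAllUpstreams(void) {
  for (int i = 0; i < MAX_UPSTREAM; ++i) upstreams[i].dataAllowed = true;
}

static bool acceptData(int taskId) {
  for (int i = 0; i < MAX_UPSTREAM; ++i) {
    if (upstreams[i].taskId == taskId) return upstreams[i].dataAllowed;
  }
  return false;
}

int main(void) {
  closeUpstream(102);                             // checkpoint trigger seen from task 102
  printf("102 accepted: %d\n", acceptData(102));  // 0: blocked
  openAllUpstreams();                             // checkpoint round finished
  printf("102 accepted: %d\n", acceptData(102));  // 1: allowed again
  return 0;
}
```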
@@ -204,7 +204,7 @@ void streamFreeQitem(SStreamQueueItem* data) {
  if (type == STREAM_INPUT__GET_RES) {
    blockDataDestroy(((SStreamTrigger*)data)->pBlock);
    taosFreeQitem(data);
  } else if (type == STREAM_INPUT__DATA_BLOCK || type == STREAM_INPUT__DATA_RETRIEVE || type == STREAM_INPUT__TRANS_STATE) {
    taosArrayDestroyEx(((SStreamDataBlock*)data)->blocks, (FDelete)blockDataFreeRes);
    taosFreeQitem(data);
  } else if (type == STREAM_INPUT__DATA_SUBMIT) {
......
@@ -25,6 +25,9 @@ typedef struct SBlockName {
  char parTbName[TSDB_TABLE_NAME_LEN];
} SBlockName;
static int32_t tInitStreamDispatchReq(SStreamDispatchReq* pReq, const SStreamTask* pTask, int32_t vgId,
int32_t numOfBlocks, int64_t dstTaskId, int32_t type);
static void initRpcMsg(SRpcMsg* pMsg, int32_t msgType, void* pCont, int32_t contLen) {
  pMsg->msgType = msgType;
  pMsg->pCont = pCont;
@@ -35,8 +38,9 @@ static int32_t tEncodeStreamDispatchReq(SEncoder* pEncoder, const SStreamDispatc
  if (tStartEncode(pEncoder) < 0) return -1;
  if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1;
  if (tEncodeI32(pEncoder, pReq->taskId) < 0) return -1;
  if (tEncodeI32(pEncoder, pReq->type) < 0) return -1;
  if (tEncodeI32(pEncoder, pReq->upstreamTaskId) < 0) return -1;
  if (tEncodeI32(pEncoder, pReq->srcVgId) < 0) return -1;
  if (tEncodeI32(pEncoder, pReq->upstreamChildId) < 0) return -1;
  if (tEncodeI32(pEncoder, pReq->upstreamNodeId) < 0) return -1;
  if (tEncodeI32(pEncoder, pReq->blockNum) < 0) return -1;
@@ -88,8 +92,9 @@ int32_t tDecodeStreamDispatchReq(SDecoder* pDecoder, SStreamDispatchReq* pReq) {
  if (tStartDecode(pDecoder) < 0) return -1;
  if (tDecodeI64(pDecoder, &pReq->streamId) < 0) return -1;
  if (tDecodeI32(pDecoder, &pReq->taskId) < 0) return -1;
  if (tDecodeI32(pDecoder, &pReq->type) < 0) return -1;
  if (tDecodeI32(pDecoder, &pReq->upstreamTaskId) < 0) return -1;
  if (tDecodeI32(pDecoder, &pReq->srcVgId) < 0) return -1;
  if (tDecodeI32(pDecoder, &pReq->upstreamChildId) < 0) return -1;
  if (tDecodeI32(pDecoder, &pReq->upstreamNodeId) < 0) return -1;
  if (tDecodeI32(pDecoder, &pReq->blockNum) < 0) return -1;
@@ -113,14 +118,15 @@ int32_t tDecodeStreamDispatchReq(SDecoder* pDecoder, SStreamDispatchReq* pReq) {
}

int32_t tInitStreamDispatchReq(SStreamDispatchReq* pReq, const SStreamTask* pTask, int32_t vgId, int32_t numOfBlocks,
                               int64_t dstTaskId, int32_t type) {
  pReq->streamId = pTask->id.streamId;
  pReq->srcVgId = vgId;
  pReq->upstreamTaskId = pTask->id.taskId;
  pReq->upstreamChildId = pTask->info.selfChildId;
  pReq->upstreamNodeId = pTask->info.nodeId;
  pReq->blockNum = numOfBlocks;
  pReq->taskId = dstTaskId;
  pReq->type = type;

  pReq->data = taosArrayInit(numOfBlocks, POINTER_BYTES);
  pReq->dataLen = taosArrayInit(numOfBlocks, sizeof(int32_t));
@@ -436,9 +442,8 @@ int32_t streamSearchAndAddBlock(SStreamTask* pTask, SStreamDispatchReq* pReqs, S
  return 0;
}

static int32_t doDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pData) {
  int32_t code = 0;

  int32_t numOfBlocks = taosArrayGetSize(pData->blocks);
  ASSERT(numOfBlocks != 0);
@@ -446,15 +451,15 @@ int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pDat
    SStreamDispatchReq req = {0};

    int32_t downstreamTaskId = pTask->fixedEpDispatcher.taskId;
    code = tInitStreamDispatchReq(&req, pTask, pData->srcVgId, numOfBlocks, downstreamTaskId, pData->type);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    for (int32_t i = 0; i < numOfBlocks; i++) {
      SSDataBlock* pDataBlock = taosArrayGet(pData->blocks, i);
      code = streamAddBlockIntoDispatchMsg(pDataBlock, &req);
      if (code != TSDB_CODE_SUCCESS) {
        taosArrayDestroyP(req.data, taosMemoryFree);
        taosArrayDestroy(req.dataLen);
@@ -487,7 +492,7 @@ int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pDat
    for (int32_t i = 0; i < vgSz; i++) {
      SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i);

      code = tInitStreamDispatchReq(&pReqs[i], pTask, pData->srcVgId, 0, pVgInfo->taskId, pData->type);
      if (code != TSDB_CODE_SUCCESS) {
        goto FAIL_SHUFFLE_DISPATCH;
      }
@@ -497,8 +502,7 @@ int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pDat
      SSDataBlock* pDataBlock = taosArrayGet(pData->blocks, i);

      // TODO: do not use broadcast
      if (pDataBlock->info.type == STREAM_DELETE_RESULT || pDataBlock->info.type == STREAM_CHECKPOINT || pDataBlock->info.type == STREAM_TRANS_STATE) {
        for (int32_t j = 0; j < vgSz; j++) {
          if (streamAddBlockIntoDispatchMsg(pDataBlock, &pReqs[j]) < 0) {
            goto FAIL_SHUFFLE_DISPATCH;
@@ -518,14 +522,14 @@ int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pDat
      }
    }

    qDebug("s-task:%s (child taskId:%d) shuffle-dispatch blocks:%d to %d vgroups", pTask->id.idStr,
           pTask->info.selfChildId, numOfBlocks, vgSz);

    for (int32_t i = 0; i < vgSz; i++) {
      if (pReqs[i].blockNum > 0) {
        SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i);
        qDebug("s-task:%s (child taskId:%d) shuffle-dispatch blocks:%d to vgId:%d", pTask->id.idStr,
               pTask->info.selfChildId, pReqs[i].blockNum, pVgInfo->vgId);

        code = doSendDispatchMsg(pTask, &pReqs[i], pVgInfo->vgId, &pVgInfo->epSet);
        if (code < 0) {
@@ -536,7 +540,7 @@ int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pDat
    code = 0;

FAIL_SHUFFLE_DISPATCH:
    for (int32_t i = 0; i < vgSz; i++) {
      taosArrayDestroyP(pReqs[i].data, taosMemoryFree);
      taosArrayDestroy(pReqs[i].dataLen);
@@ -552,7 +556,7 @@ static void doRetryDispatchData(void* param, void* tmrId) {
  SStreamTask* pTask = param;
  ASSERT(pTask->outputInfo.status == TASK_OUTPUT_STATUS__WAIT);

  int32_t code = doDispatchAllBlocks(pTask, pTask->msgInfo.pData);
  if (code != TSDB_CODE_SUCCESS) {
    qDebug("s-task:%s reset the waitRspCnt to be 0 before launch retry dispatch", pTask->id.idStr);
    atomic_store_32(&pTask->shuffleDispatcher.waitingRspCnt, 0);
@@ -593,12 +597,13 @@ int32_t streamDispatchStreamBlock(SStreamTask* pTask) {
  }

  pTask->msgInfo.pData = pBlock;
  ASSERT(pBlock->type == STREAM_INPUT__DATA_BLOCK || pBlock->type == STREAM_INPUT__CHECKPOINT_TRIGGER ||
         pBlock->type == STREAM_INPUT__TRANS_STATE);

  int32_t retryCount = 0;

  while (1) {
    int32_t code = doDispatchAllBlocks(pTask, pBlock);
    if (code == TSDB_CODE_SUCCESS) {
      break;
    }
@@ -715,3 +720,84 @@ int32_t streamNotifyUpstreamContinue(SStreamTask* pTask) {
         num);
  return 0;
}
int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, int32_t code) {
const char* id = pTask->id.idStr;
if (code != TSDB_CODE_SUCCESS) {
// dispatch message failed: network error, or node not available.
    // In case the input queue is full, the code will be TSDB_CODE_SUCCESS and the pRsp->inputStatus flag will be
    // set. Here we need to retry dispatching this message to the downstream task immediately, and handle the case
    // where the failure happened too fast.
// todo handle the shuffle dispatch failure
qError("s-task:%s failed to dispatch msg to task:0x%x, code:%s, retry cnt:%d", id, pRsp->downstreamTaskId,
tstrerror(code), ++pTask->msgInfo.retryCount);
int32_t ret = doDispatchAllBlocks(pTask, pTask->msgInfo.pData);
if (ret != TSDB_CODE_SUCCESS) {
}
return TSDB_CODE_SUCCESS;
}
qDebug("s-task:%s recv dispatch rsp from 0x%x, downstream task input status:%d code:%d", id, pRsp->downstreamTaskId,
pRsp->inputStatus, code);
// there are other dispatch message not response yet
if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
int32_t leftRsp = atomic_sub_fetch_32(&pTask->shuffleDispatcher.waitingRspCnt, 1);
qDebug("s-task:%s is shuffle, left waiting rsp %d", id, leftRsp);
if (leftRsp > 0) {
return 0;
}
}
// transtate msg has been sent to downstream successfully. let's transfer the fill-history task state
SStreamDataBlock* p = pTask->msgInfo.pData;
if (p->type == STREAM_INPUT__TRANS_STATE) {
qDebug("s-task:%s dispatch transtate msg to downstream successfully, start to transfer state", id);
ASSERT(pTask->info.fillHistory == 1);
code = streamTransferStateToStreamTask(pTask);
if (code != TSDB_CODE_SUCCESS) {
atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);
return code;
}
}
pTask->msgInfo.retryCount = 0;
ASSERT(pTask->outputInfo.status == TASK_OUTPUT_STATUS__WAIT);
qDebug("s-task:%s output status is set to:%d", id, pTask->outputInfo.status);
  // the input queue of the (downstream) task that receives the output data is full,
  // so TASK_INPUT_STATUS__BLOCKED is returned in the rsp
if (pRsp->inputStatus == TASK_INPUT_STATUS__BLOCKED) {
pTask->inputStatus = TASK_INPUT_STATUS__BLOCKED; // block the input of current task, to push pressure to upstream
pTask->msgInfo.blockingTs = taosGetTimestampMs(); // record the blocking start time
qError("s-task:%s inputQ of downstream task:0x%x is full, time:%" PRId64 " wait for %dms and retry dispatch data",
id, pRsp->downstreamTaskId, pTask->msgInfo.blockingTs, DISPATCH_RETRY_INTERVAL_MS);
streamRetryDispatchStreamBlock(pTask, DISPATCH_RETRY_INTERVAL_MS);
} else { // pipeline send data in output queue
// this message has been sent successfully, let's try next one.
destroyStreamDataBlock(pTask->msgInfo.pData);
pTask->msgInfo.pData = NULL;
if (pTask->msgInfo.blockingTs != 0) {
int64_t el = taosGetTimestampMs() - pTask->msgInfo.blockingTs;
qDebug("s-task:%s downstream task:0x%x resume to normal from inputQ blocking, blocking time:%" PRId64 "ms", id,
pRsp->downstreamTaskId, el);
pTask->msgInfo.blockingTs = 0;
      // putting data into the inputQ of the current task is allowed again
pTask->inputStatus = TASK_INPUT_STATUS__NORMAL;
}
// now ready for next data output
atomic_store_8(&pTask->outputInfo.status, TASK_OUTPUT_STATUS__NORMAL);
// otherwise, continue dispatch the first block to down stream task in pipeline
streamDispatchStreamBlock(pTask);
}
return 0;
}
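Editor's note: when the downstream reports TASK_INPUT_STATUS__BLOCKED, the handler above keeps the in-flight block, blocks its own input, and re-arms a retry timer instead of dropping data, which is how backpressure propagates upstream. A standalone sketch of that retry-until-unblocked loop; trySend() is a hypothetical stand-in for the real dispatch call:

```c
#include <stdbool.h>
#include <stdio.h>
#include <unistd.h>  // usleep()

#define DISPATCH_RETRY_INTERVAL_MS 300

static int attempts = 0;

// hypothetical downstream: blocked for the first two attempts, then accepts
static bool trySend(void) { return ++attempts > 2; }

int main(void) {
  while (!trySend()) {
    // the in-flight message stays owned by the sender until it is accepted
    fprintf(stderr, "downstream blocked, retry in %d ms (attempt %d)\n",
            DISPATCH_RETRY_INTERVAL_MS, attempts);
    usleep(DISPATCH_RETRY_INTERVAL_MS * 1000);
  }
  printf("dispatched after %d attempts\n", attempts);
  return 0;
}
```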
@@ -287,7 +287,7 @@ static void waitForTaskIdle(SStreamTask* pTask, SStreamTask* pStreamTask) {
  }
}

int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) {
  SStreamMeta* pMeta = pTask->pMeta;

  SStreamTask* pStreamTask = streamMetaAcquireTask(pMeta, pTask->streamTaskId.streamId, pTask->streamTaskId.taskId);
@@ -301,7 +301,7 @@ static int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) {
            pStreamTask->id.idStr);
  }

  ASSERT(pStreamTask->historyTaskId.taskId == pTask->id.taskId && pTask->status.appendTranstateBlock == true);

  STimeWindow* pTimeWindow = &pStreamTask->dataRange.window;
@@ -334,6 +334,9 @@ static int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) {
    qDebug("s-task:%s no need to update time window for non-source task", pStreamTask->id.idStr);
  }

  // todo check the output queue for fill-history task, and wait for it complete

  // 1. expand the query time window for stream task of WAL scanner
  pTimeWindow->skey = INT64_MIN;
  qStreamInfoResetTimewindowFilter(pStreamTask->exec.pExecutor);
@@ -380,17 +383,18 @@ static int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) {
  return TSDB_CODE_SUCCESS;
}

int32_t streamTransferStateToStreamTask(SStreamTask* pTask) {
  int32_t code = TSDB_CODE_SUCCESS;
  ASSERT(pTask->status.appendTranstateBlock == 1);

  int32_t level = pTask->info.taskLevel;
  if (level == TASK_LEVEL__SOURCE) {
    streamTaskFillHistoryFinished(pTask);
    code = streamDoTransferStateToStreamTask(pTask);
    if (code != TSDB_CODE_SUCCESS) {  // todo handle this
      return code;
    }
  } else if (level == TASK_LEVEL__AGG) {  // do transfer task operator states.
    code = streamDoTransferStateToStreamTask(pTask);
    if (code != TSDB_CODE_SUCCESS) {  // todo handle this
      return code;
@@ -400,14 +404,41 @@ static int32_t streamTransferStateToStreamTask(SStreamTask* pTask) {
  return code;
}
static int32_t extractBlocksFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInput, int32_t* numOfBlocks) {
  int32_t retryTimes = 0;
  int32_t MAX_RETRY_TIMES = 5;
  const char* id = pTask->id.idStr;

  if (pTask->info.taskLevel == TASK_LEVEL__SINK) {  // extract block from inputQ, one-by-one
    while (1) {
      if (streamTaskShouldPause(&pTask->status) || streamTaskShouldStop(&pTask->status)) {
        qDebug("s-task:%s task should pause, extract input blocks:%d", pTask->id.idStr, *numOfBlocks);
        return TSDB_CODE_SUCCESS;
      }

      SStreamQueueItem* qItem = streamQueueNextItem(pTask->inputQueue);
      if (qItem == NULL) {
        if (pTask->info.taskLevel == TASK_LEVEL__SOURCE && (++retryTimes) < MAX_RETRY_TIMES) {
          taosMsleep(10);
          qDebug("===stream===try again batchSize:%d, retry:%d, %s", *numOfBlocks, retryTimes, id);
          continue;
        }

        qDebug("===stream===break batchSize:%d, %s", *numOfBlocks, id);
        return TSDB_CODE_SUCCESS;
      }

      qDebug("s-task:%s sink task handle result block one-by-one", id);
      *numOfBlocks = 1;
      *pInput = qItem;
      return TSDB_CODE_SUCCESS;
    }
  }

  // non sink task
  while (1) {
    if (streamTaskShouldPause(&pTask->status) || streamTaskShouldStop(&pTask->status)) {
      qDebug("s-task:%s task should pause, extract input blocks:%d", pTask->id.idStr, *numOfBlocks);
      return TSDB_CODE_SUCCESS;
    }
@@ -415,49 +446,104 @@ static int32_t extractMsgFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInpu
    if (qItem == NULL) {
      if (pTask->info.taskLevel == TASK_LEVEL__SOURCE && (++retryTimes) < MAX_RETRY_TIMES) {
        taosMsleep(10);
        qDebug("===stream===try again batchSize:%d, retry:%d, %s", *numOfBlocks, retryTimes, id);
        continue;
      }

      qDebug("===stream===break batchSize:%d, %s", *numOfBlocks, id);
      return TSDB_CODE_SUCCESS;
    }

    // do not merge blocks for sink node and check point data block
    if (qItem->type == STREAM_INPUT__CHECKPOINT || qItem->type == STREAM_INPUT__CHECKPOINT_TRIGGER ||
        qItem->type == STREAM_INPUT__TRANS_STATE) {
      if (*pInput == NULL) {
        qDebug("s-task:%s checkpoint/transtate msg extracted, start to process immediately", id);
        *numOfBlocks = 1;
        *pInput = qItem;
        return TSDB_CODE_SUCCESS;
      } else {
        // previous existed blocks needs to be handle, before handle the checkpoint msg block
        qDebug("s-task:%s checkpoint/transtate msg extracted, handle previous block first, numOfBlocks:%d", id,
               *numOfBlocks);
        streamQueueProcessFail(pTask->inputQueue);
        return TSDB_CODE_SUCCESS;
      }
    } else {
      if (*pInput == NULL) {
        ASSERT((*numOfBlocks) == 0);
        *pInput = qItem;
      } else {
        // todo we need to sort the data block, instead of just appending into the array list.
        void* newRet = streamMergeQueueItem(*pInput, qItem);
        if (newRet == NULL) {
          qError("s-task:%s failed to merge blocks from inputQ, numOfBlocks:%d", id, *numOfBlocks);
          streamQueueProcessFail(pTask->inputQueue);
          return TSDB_CODE_SUCCESS;
        }

        *pInput = newRet;
      }

      *numOfBlocks += 1;
      streamQueueProcessSuccess(pTask->inputQueue);

      if (*numOfBlocks >= MAX_STREAM_EXEC_BATCH_NUM) {
        qDebug("s-task:%s batch size limit:%d reached, start to process blocks", id, MAX_STREAM_EXEC_BATCH_NUM);
        return TSDB_CODE_SUCCESS;
      }
    }
  }
}

int32_t streamProcessTranstateBlock(SStreamTask* pTask, SStreamDataBlock* pBlock) {
  const char* id = pTask->id.idStr;
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t level = pTask->info.taskLevel;

  if (level == TASK_LEVEL__AGG || level == TASK_LEVEL__SINK) {
    int32_t remain = streamAlignTransferState(pTask);
    if (remain > 0) {
      streamFreeQitem((SStreamQueueItem*)pBlock);
      qDebug("s-task:%s receive upstream transfer state msg, remain:%d", id, remain);
      return 0;
    }
  }

  // dispatch the tran-state block to downstream task immediately
  int32_t type = pTask->outputInfo.type;

  // transfer the ownership of executor state
  if (type == TASK_OUTPUT__FIXED_DISPATCH || type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
    if (level == TASK_LEVEL__SOURCE) {
      qDebug("s-task:%s add transfer-state block into outputQ", id);
    } else {
      qDebug("s-task:%s all upstream tasks send transfer-state block, add transfer-state block into outputQ", id);
      ASSERT(pTask->streamTaskId.taskId != 0 && pTask->info.fillHistory == 1);
    }

    if (level == TASK_LEVEL__AGG || level == TASK_LEVEL__SOURCE) {
      pBlock->srcVgId = pTask->pMeta->vgId;
      code = taosWriteQitem(pTask->outputInfo.queue->queue, pBlock);
      if (code == 0) {
        streamDispatchStreamBlock(pTask);
      } else {
        streamFreeQitem((SStreamQueueItem*)pBlock);
      }
    }
  } else {  // non-dispatch task, do task state transfer directly
    qDebug("s-task:%s non-dispatch task, start to transfer state directly", id);
    streamFreeQitem((SStreamQueueItem*)pBlock);
    ASSERT(pTask->info.fillHistory == 1);

    code = streamTransferStateToStreamTask(pTask);
    if (code != TSDB_CODE_SUCCESS) {
      atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);
    }
  }

  return code;
}
/**
@@ -478,12 +564,17 @@ int32_t streamExecForAll(SStreamTask* pTask) {
    // merge multiple input data if possible in the input queue.
    qDebug("s-task:%s start to extract data block from inputQ", id);

    /*int32_t code = */extractBlocksFromInputQ(pTask, &pInput, &batchSize);
    if (pInput == NULL) {
      ASSERT(batchSize == 0);
      break;
    }

    if (pInput->type == STREAM_INPUT__TRANS_STATE) {
      streamProcessTranstateBlock(pTask, (SStreamDataBlock*)pInput);
      return 0;
    }

    if (pTask->info.taskLevel == TASK_LEVEL__SINK) {
      ASSERT(pInput->type == STREAM_INPUT__DATA_BLOCK);
      qDebug("s-task:%s sink task start to sink %d blocks", id, batchSize);
@@ -557,18 +648,7 @@ int32_t streamTaskEndScanWAL(SStreamTask* pTask) {
  qDebug("s-task:%s scan-history from WAL stage(step 2) ended, elapsed time:%.2fs", id, el);

  // 1. notify all downstream tasks to transfer executor state after handle all history blocks.
  appendTranstateIntoInputQ(pTask);

  return TSDB_CODE_SUCCESS;
}
@@ -587,28 +667,36 @@ int32_t streamTryExec(SStreamTask* pTask) {
    }

    //    if (taosQueueEmpty(pTask->inputQueue->queue)) {
    //      // fill-history WAL scan has completed
    //      if (pTask->status.transferState) {
    //        code = streamTransferStateToStreamTask(pTask);
    //        if (code != TSDB_CODE_SUCCESS) {
    //          atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);
    //          return code;
    //        }
    //
    //        // the schedStatus == TASK_SCHED_STATUS__ACTIVE, streamSchedExec cannot be executed, so execute once again by
    //        // call this function (streamExecForAll) directly.
    //        code = streamExecForAll(pTask);
    //        if (code < 0) {
    //          // do nothing
    //        }
    //      }
    //      atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);
    //      qDebug("s-task:%s exec completed, status:%s, sched-status:%d", id,
    //             streamGetTaskStatusStr(pTask->status.taskStatus), pTask->status.schedStatus);
    //    } else {
    atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);
    qDebug("s-task:%s exec completed, status:%s, sched-status:%d", id, streamGetTaskStatusStr(pTask->status.taskStatus),
           pTask->status.schedStatus);

    if (!(taosQueueEmpty(pTask->inputQueue->queue) || streamTaskShouldStop(&pTask->status) ||
          streamTaskShouldPause(&pTask->status))) {
      streamSchedExec(pTask);
    }
    //    }
  } else {
    qDebug("s-task:%s already started to exec by other thread, status:%s, sched-status:%d", id,
           streamGetTaskStatusStr(pTask->status.taskStatus), pTask->status.schedStatus);
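Editor's note: streamTryExec() now re-schedules itself only while the input queue is non-empty and the task is neither stopping nor pausing. A compact standalone model of that exit condition (TaskState and the draining loop are illustrative only):

```c
#include <stdbool.h>
#include <stdio.h>

typedef struct {
  int  queued;  // items left in the input queue
  bool stop;
  bool pause;
} TaskState;

// mirror of the condition: reschedule unless idle, stopping, or pausing
static bool shouldReschedule(const TaskState* t) {
  return !(t->queued == 0 || t->stop || t->pause);
}

int main(void) {
  TaskState t = {.queued = 3, .stop = false, .pause = false};
  while (shouldReschedule(&t)) {
    t.queued--;  // one execution round drains some input
    printf("rescheduled, %d left\n", t.queued);
  }
  printf("idle: queue empty or task stopping/pausing\n");
  return 0;
}
```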
......
@@ -372,68 +372,40 @@ int32_t streamDispatchScanHistoryFinishMsg(SStreamTask* pTask) {
  return 0;
}

int32_t appendTranstateIntoInputQ(SStreamTask* pTask) {
  SStreamDataBlock* pTranstate = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, sizeof(SSDataBlock));
  if (pTranstate == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  SSDataBlock* pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock));
  if (pBlock == NULL) {
    taosFreeQitem(pTranstate);
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  pTranstate->type = STREAM_INPUT__TRANS_STATE;

  pBlock->info.type = STREAM_TRANS_STATE;
  pBlock->info.rows = 1;
  pBlock->info.childId = pTask->info.selfChildId;

  pTranstate->blocks = taosArrayInit(4, sizeof(SSDataBlock));  // pBlock;
  taosArrayPush(pTranstate->blocks, pBlock);

  taosMemoryFree(pBlock);

  if (tAppendDataToInputQueue(pTask, (SStreamQueueItem*)pTranstate) < 0) {
    taosFreeQitem(pTranstate);
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  pTask->status.appendTranstateBlock = true;

  qDebug("s-task:%s set sched-status:%d, prev:%d", pTask->id.idStr, TASK_SCHED_STATUS__INACTIVE, pTask->status.schedStatus);
  pTask->status.schedStatus = TASK_SCHED_STATUS__INACTIVE;
  streamSchedExec(pTask);

  return TSDB_CODE_SUCCESS;
}
// agg
......
@@ -20,6 +20,7 @@
#include "tutil.h"
#include "walInt.h"

bool FORCE_INLINE walLogExist(SWal* pWal, int64_t ver) {
  return !walIsEmpty(pWal) && walGetFirstVer(pWal) <= ver && walGetLastVer(pWal) >= ver;
}
......
@@ -70,25 +70,16 @@ int32_t walNextValidMsg(SWalReader *pReader) {
  int64_t fetchVer = pReader->curVersion;
  int64_t lastVer = walGetLastVer(pReader->pWal);
  int64_t committedVer = walGetCommittedVer(pReader->pWal);
  int64_t appliedVer = walGetAppliedVer(pReader->pWal);
  // wait apply ver equal to commit ver, otherwise may lost data when consume data [TD-24010]

  wDebug("vgId:%d, wal start to fetch, index:%" PRId64 ", last index:%" PRId64 " commit index:%" PRId64
         ", applied index:%" PRId64,
         pReader->pWal->cfg.vgId, fetchVer, lastVer, committedVer, appliedVer);

  if (fetchVer > appliedVer) {
    terrno = TSDB_CODE_WAL_LOG_NOT_EXIST;
    return -1;
  }

  while (fetchVer <= appliedVer) {
    if (walFetchHeadNew(pReader, fetchVer) < 0) {
      return -1;
    }
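Editor's note: the reader now stops at the applied index instead of the committed index, so a consumer can never fetch a WAL entry that the state machine has not yet applied. A standalone sketch of the gating logic with made-up index values:

```c
#include <stdint.h>
#include <stdio.h>

int main(void) {
  int64_t committedVer = 100;  // raft-committed
  int64_t appliedVer   = 97;   // applied to the state machine, may lag commit
  int64_t fetchVer     = 95;   // reader cursor

  while (fetchVer <= appliedVer) {  // gate on applied, not committed
    printf("fetch log index %lld\n", (long long)fetchVer);
    fetchVer++;
  }
  printf("stop at %lld; %lld..%lld committed but not yet readable\n",
         (long long)fetchVer, (long long)fetchVer, (long long)committedVer);
  return 0;
}
```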
......
@@ -126,6 +126,7 @@
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/dataFromTsdbNWal.py
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/dataFromTsdbNWal-multiCtb.py
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmq_taosx.py
,,n,system-test,python3 ./test.py -f 7-tmq/tmq_offset.py
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/raw_block_interface_test.py
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/stbTagFilter-multiCtb.py
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqSubscribeStb-r3.py -N 5
......
@@ -20,6 +20,7 @@ class TDTestCase:
        tbname = "tb"
        tbname1 = "tb1"
        tbname2 = "tb2"
        tbname3 = "tb3"
        stbname = "stb"
        ctbname1 = "ctb1"
        ctbname2 = "ctb2"
@@ -5607,6 +5608,44 @@ class TDTestCase:
        tdSql.query(f"select _irowts, _isfilled, interp(c0) from {dbname}.{stbname_single} partition by tbname range('2020-02-01 00:00:06') fill(linear)")
        tdSql.checkRows(0)
#### TS-3799 ####
tdSql.execute(
f'''create table if not exists {dbname}.{tbname3} (ts timestamp, c0 double)'''
)
tdSql.execute(f"insert into {dbname}.{tbname3} values ('2023-08-06 23:59:51.000000000', 4.233947800000000)")
tdSql.execute(f"insert into {dbname}.{tbname3} values ('2023-08-06 23:59:52.000000000', 3.606781000000000)")
tdSql.execute(f"insert into {dbname}.{tbname3} values ('2023-08-06 23:59:52.500000000', 3.162353500000000)")
tdSql.execute(f"insert into {dbname}.{tbname3} values ('2023-08-06 23:59:53.000000000', 3.162292500000000)")
tdSql.execute(f"insert into {dbname}.{tbname3} values ('2023-08-06 23:59:53.500000000', 4.998230000000000)")
tdSql.execute(f"insert into {dbname}.{tbname3} values ('2023-08-06 23:59:54.400000000', 8.800414999999999)")
tdSql.execute(f"insert into {dbname}.{tbname3} values ('2023-08-06 23:59:54.900000000', 8.853271500000000)")
tdSql.execute(f"insert into {dbname}.{tbname3} values ('2023-08-06 23:59:55.900000000', 7.507751500000000)")
tdSql.execute(f"insert into {dbname}.{tbname3} values ('2023-08-06 23:59:56.400000000', 7.510681000000000)")
tdSql.execute(f"insert into {dbname}.{tbname3} values ('2023-08-06 23:59:56.900000000', 7.841614000000000)")
tdSql.execute(f"insert into {dbname}.{tbname3} values ('2023-08-06 23:59:57.900000000', 8.153809000000001)")
tdSql.execute(f"insert into {dbname}.{tbname3} values ('2023-08-06 23:59:58.500000000', 6.866455000000000)")
tdSql.execute(f"insert into {dbname}.{tbname3} values ('2023-08-06 23:59:59.000000000', 6.869140600000000)")
tdSql.execute(f"insert into {dbname}.{tbname3} values ('2023-08-07 00:00:00.000000000', 0.261475000000001)")
tdSql.query(f"select _irowts, interp(c0) from {dbname}.{tbname3} range('2023-08-06 23:59:00','2023-08-06 23:59:59') every(1m) fill(next)")
tdSql.checkRows(1);
tdSql.checkData(0, 0, '2023-08-06 23:59:00')
tdSql.checkData(0, 1, 4.233947800000000)
tdSql.query(f"select _irowts, interp(c0) from {dbname}.{tbname3} range('2023-08-06 23:59:00','2023-08-06 23:59:59') every(1m) fill(value, 1)")
tdSql.checkRows(1);
tdSql.checkData(0, 0, '2023-08-06 23:59:00')
tdSql.checkData(0, 1, 1)
tdSql.query(f"select _irowts, interp(c0) from {dbname}.{tbname3} range('2023-08-06 23:59:00','2023-08-06 23:59:59') every(1m) fill(null)")
tdSql.checkRows(1);
tdSql.checkData(0, 0, '2023-08-06 23:59:00')
tdSql.checkData(0, 1, None)
    def stop(self):
        tdSql.close()
        tdLog.success(f"{__file__} successfully executed")
......
@@ -36,7 +36,7 @@ class TDTestCase:
        # tdDnodes[1].cfgDir
        cfgFile = f"%s/taos.cfg"%(cfgDir)
        shellCmd = 'echo tmqMaxTopicNum %d >> %s'%(tmqMaxTopicNum, cfgFile)
        tdLog.info(" shell cmd: %s"%(shellCmd))
        os.system(shellCmd)
        tdDnodes.stoptaosd(1)
......
@@ -131,7 +131,7 @@ class TDTestCase:
                if snapshot_value == "true":
                    if offset_value != "earliest" and offset_value != "":
                        if offset_value == "latest":
                            offset_value_list = list(map(lambda x: int(x[-2].replace("wal:", "").replace("earliest", "0").replace("latest", "0").replace(offset_value, "0")), subscription_info))
                            tdSql.checkEqual(sum(offset_value_list) >= 0, True)
                            rows_value_list = list(map(lambda x: int(x[-1]), subscription_info))
                            tdSql.checkEqual(sum(rows_value_list), expected_res)
@@ -154,7 +154,7 @@ class TDTestCase:
                        tdSql.checkEqual(rows_value_list, [None]*len(subscription_info))
                else:
                    if offset_value != "none":
                        offset_value_list = list(map(lambda x: int(x[-2].replace("wal:", "").replace("earliest", "0").replace("latest", "0").replace(offset_value, "0")), subscription_info))
                        tdSql.checkEqual(sum(offset_value_list) >= 0, True)
                        rows_value_list = list(map(lambda x: int(x[-1]), subscription_info))
                        tdSql.checkEqual(sum(rows_value_list), expected_res)
......
import taos
import sys
import time
import socket
import os
import threading
from util.log import *
from util.sql import *
from util.cases import *
from util.dnodes import *
from util.common import *
sys.path.append("./7-tmq")
from tmqCommon import *
class TDTestCase:
def init(self, conn, logSql, replicaVar=1):
self.replicaVar = int(replicaVar)
tdLog.debug(f"start to excute {__file__}")
tdSql.init(conn.cursor())
def run(self):
tdSql.prepare()
buildPath = tdCom.getBuildPath()
cmdStr1 = '%s/build/bin/taosBenchmark -i 50 -B 1 -t 1000 -n 100000 -y &'%(buildPath)
tdLog.info(cmdStr1)
os.system(cmdStr1)
time.sleep(15)
cmdStr2 = '%s/build/bin/tmq_offset_test &'%(buildPath)
tdLog.info(cmdStr2)
os.system(cmdStr2)
time.sleep(20)
os.system("kill -9 `pgrep taosBenchmark`")
result = os.system("kill -9 `pgrep tmq_offset_test`")
if result != 0:
tdLog.exit("tmq_offset_test error!")
def stop(self):
tdSql.close()
tdLog.success(f"{__file__} successfully executed")
tdCases.addLinux(__file__, TDTestCase())
tdCases.addWindows(__file__, TDTestCase())
@@ -7,6 +7,7 @@ add_executable(write_raw_block_test write_raw_block_test.c)
add_executable(sml_test sml_test.c)
add_executable(get_db_name_test get_db_name_test.c)
add_executable(tmq_offset tmqOffset.c)
add_executable(tmq_offset_test tmq_offset_test.c)
target_link_libraries(
    tmq_offset
    PUBLIC taos
@@ -42,6 +43,13 @@
    PUBLIC common
    PUBLIC os
)
target_link_libraries(
tmq_offset_test
PUBLIC taos
PUBLIC util
PUBLIC common
PUBLIC os
)
target_link_libraries(
    write_raw_block_test
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <assert.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include "taos.h"
#include "types.h"
int buildData(TAOS* pConn){
TAOS_RES* pRes = taos_query(pConn, "drop topic if exists tp");
if (taos_errno(pRes) != 0) {
printf("error in drop tp, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "drop database if exists db_ts3756");
if (taos_errno(pRes) != 0) {
printf("error in drop db_taosx, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "create database if not exists db_ts3756 vgroups 2 wal_retention_period 3600");
if (taos_errno(pRes) != 0) {
printf("error in create db_taosx, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "use db_ts3756");
if (taos_errno(pRes) != 0) {
printf("error in use db, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn,"CREATE TABLE `t1` (`ts` TIMESTAMP, `voltage` INT)");
if (taos_errno(pRes) != 0) {
printf("failed to create table meters, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "insert into t1 values(now, 1)");
if (taos_errno(pRes) != 0) {
printf("failed to insert, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "insert into t1 values(now + 1s, 2)");
if (taos_errno(pRes) != 0) {
printf("failed to insert, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "create topic tp as select * from t1");
if (taos_errno(pRes) != 0) {
printf("failed to create topic tp, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
return 0;
}
void test_offset(TAOS* pConn){
if(buildData(pConn) != 0){
ASSERT(0);
}
tmq_conf_t* conf = tmq_conf_new();
tmq_conf_set(conf, "enable.auto.commit", "false");
tmq_conf_set(conf, "auto.commit.interval.ms", "2000");
tmq_conf_set(conf, "group.id", "group_id_2");
tmq_conf_set(conf, "td.connect.user", "root");
tmq_conf_set(conf, "td.connect.pass", "taosdata");
tmq_conf_set(conf, "auto.offset.reset", "earliest");
tmq_conf_set(conf, "msg.with.table.name", "false");
tmq_t* tmq = tmq_consumer_new(conf, NULL, 0);
tmq_conf_destroy(conf);
  // build the list of topics to subscribe to
tmq_list_t* topicList = tmq_list_new();
tmq_list_append(topicList, "tp");
  // start the subscription
tmq_subscribe(tmq, topicList);
tmq_list_destroy(topicList);
int32_t timeout = 200;
tmq_topic_assignment* pAssign1 = NULL;
int32_t numOfAssign1 = 0;
tmq_topic_assignment* pAssign2 = NULL;
int32_t numOfAssign2 = 0;
tmq_topic_assignment* pAssign3 = NULL;
int32_t numOfAssign3 = 0;
int32_t code = tmq_get_topic_assignment(tmq, "tp", &pAssign1, &numOfAssign1);
if (code != 0) {
printf("error occurs:%s\n", tmq_err2str(code));
tmq_free_assignment(pAssign1);
tmq_consumer_close(tmq);
ASSERT(0);
}
code = tmq_get_topic_assignment(tmq, "tp", &pAssign2, &numOfAssign2);
if (code != 0) {
printf("error occurs:%s\n", tmq_err2str(code));
tmq_free_assignment(pAssign2);
tmq_consumer_close(tmq);
ASSERT(0);
}
code = tmq_get_topic_assignment(tmq, "tp", &pAssign3, &numOfAssign3);
if (code != 0) {
printf("error occurs:%s\n", tmq_err2str(code));
tmq_free_assignment(pAssign3);
tmq_consumer_close(tmq);
ASSERT(0);
return;
}
ASSERT(numOfAssign1 == 2);
ASSERT(numOfAssign1 == numOfAssign2);
ASSERT(numOfAssign1 == numOfAssign3);
for(int i = 0; i < numOfAssign1; i++){
int j = 0;
int k = 0;
for(; j < numOfAssign2; j++){
if(pAssign1[i].vgId == pAssign2[j].vgId){
break;
}
}
for(; k < numOfAssign3; k++){
if(pAssign1[i].vgId == pAssign3[k].vgId){
break;
}
}
ASSERT(pAssign1[i].currentOffset == pAssign2[j].currentOffset);
ASSERT(pAssign1[i].currentOffset == pAssign3[k].currentOffset);
ASSERT(pAssign1[i].begin == pAssign2[j].begin);
ASSERT(pAssign1[i].begin == pAssign3[k].begin);
ASSERT(pAssign1[i].end == pAssign2[j].end);
ASSERT(pAssign1[i].end == pAssign3[k].end);
}
tmq_free_assignment(pAssign1);
tmq_free_assignment(pAssign2);
tmq_free_assignment(pAssign3);
int cnt = 0;
int offset1 = -1;
int offset2 = -1;
while (cnt++ < 10) {
printf("start to poll:%d\n", cnt);
TAOS_RES* pRes = tmq_consumer_poll(tmq, timeout);
if (pRes) {
tmq_topic_assignment* pAssign = NULL;
int32_t numOfAssign = 0;
code = tmq_get_topic_assignment(tmq, "tp", &pAssign, &numOfAssign);
if (code != 0) {
printf("error occurs:%s\n", tmq_err2str(code));
tmq_free_assignment(pAssign);
tmq_consumer_close(tmq);
ASSERT(0);
}
for(int i = 0; i < numOfAssign; i++){
int64_t position = tmq_position(tmq, "tp", pAssign[i].vgId);
if(position == 0) continue;
printf("position = %d\n", (int)position);
tmq_commit_offset_sync(tmq, "tp", pAssign[i].vgId, position);
int64_t committed = tmq_committed(tmq, "tp", pAssign[i].vgId);
ASSERT(position == committed);
}
tmq_offset_seek(tmq, "tp", pAssign[0].vgId, pAssign[0].currentOffset);
tmq_offset_seek(tmq, "tp", pAssign[1].vgId, pAssign[1].currentOffset);
if(offset1 != -1){
ASSERT(offset1 == pAssign[0].currentOffset);
}
if(offset2 != -1){
ASSERT(offset2 == pAssign[1].currentOffset);
}
offset1 = pAssign[0].currentOffset;
offset2 = pAssign[1].currentOffset;
tmq_free_assignment(pAssign);
taos_free_result(pRes);
}
}
tmq_consumer_close(tmq);
}
// run taosBenchmark first
void test_ts3756(TAOS* pConn){
TAOS_RES*pRes = taos_query(pConn, "use test");
if (taos_errno(pRes) != 0) {
ASSERT(0);
}
taos_free_result(pRes);
pRes = taos_query(pConn, "drop topic if exists t1");
if (taos_errno(pRes) != 0) {
ASSERT(0);
}
taos_free_result(pRes);
pRes = taos_query(pConn, "create topic t1 as select * from meters");
if (taos_errno(pRes) != 0) {
ASSERT(0);
}
taos_free_result(pRes);
tmq_conf_t* conf = tmq_conf_new();
tmq_conf_set(conf, "enable.auto.commit", "false");
tmq_conf_set(conf, "auto.commit.interval.ms", "2000");
tmq_conf_set(conf, "group.id", "group_id_2");
tmq_conf_set(conf, "td.connect.user", "root");
tmq_conf_set(conf, "td.connect.pass", "taosdata");
tmq_conf_set(conf, "auto.offset.reset", "latest");
tmq_conf_set(conf, "msg.with.table.name", "false");
tmq_t* tmq = tmq_consumer_new(conf, NULL, 0);
tmq_conf_destroy(conf);
  // build the list of topics to subscribe to
tmq_list_t* topicList = tmq_list_new();
tmq_list_append(topicList, "t1");
  // start the subscription
tmq_subscribe(tmq, topicList);
tmq_list_destroy(topicList);
int32_t timeout = 200;
tmq_topic_assignment* pAssign = NULL;
int32_t numOfAssign = 0;
while (1) {
// printf("start to poll\n");
pRes = tmq_consumer_poll(tmq, timeout);
if (pRes) {
tmq_topic_assignment* pAssignTmp = NULL;
int32_t numOfAssignTmp = 0;
int32_t code = tmq_get_topic_assignment(tmq, "t1", &pAssignTmp, &numOfAssignTmp);
if (code != 0) {
printf("error occurs:%s\n", tmq_err2str(code));
tmq_free_assignment(pAssign);
tmq_consumer_close(tmq);
ASSERT(0);
}
if(numOfAssign != 0){
int i = 0;
for(; i < numOfAssign; i++){
if(pAssign[i].currentOffset != pAssignTmp[i].currentOffset){
break;
}
}
if(i == numOfAssign){
ASSERT(0);
}
tmq_free_assignment(pAssign);
}
numOfAssign = numOfAssignTmp;
pAssign = pAssignTmp;
taos_free_result(pRes);
}
}
tmq_free_assignment(pAssign);
}
int main(int argc, char* argv[]) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
test_offset(pConn);
test_ts3756(pConn);
taos_close(pConn);
return 0;
}