提交 500553d5 编写于 作者: L Liu Jicong

refactor: add debug log

上级 01a345a7
...@@ -15,10 +15,10 @@ ...@@ -15,10 +15,10 @@
#include <assert.h> #include <assert.h>
#include <stdio.h> #include <stdio.h>
#include <stdlib.h>
#include <string.h> #include <string.h>
#include <time.h> #include <time.h>
#include "taos.h" #include "taos.h"
#include <stdlib.h>
static int running = 1; static int running = 1;
static void msg_process(TAOS_RES* msg) { static void msg_process(TAOS_RES* msg) {
...@@ -28,8 +28,8 @@ static void msg_process(TAOS_RES* msg) { ...@@ -28,8 +28,8 @@ static void msg_process(TAOS_RES* msg) {
printf("db: %s\n", tmq_get_db_name(msg)); printf("db: %s\n", tmq_get_db_name(msg));
printf("vg: %d\n", tmq_get_vgroup_id(msg)); printf("vg: %d\n", tmq_get_vgroup_id(msg));
if (tmq_get_res_type(msg) == TMQ_RES_TABLE_META) { if (tmq_get_res_type(msg) == TMQ_RES_TABLE_META) {
tmq_raw_data *raw = tmq_get_raw_meta(msg); tmq_raw_data* raw = tmq_get_raw_meta(msg);
if(raw){ if (raw) {
TAOS* pConn = taos_connect("192.168.1.86", "root", "taosdata", NULL, 0); TAOS* pConn = taos_connect("192.168.1.86", "root", "taosdata", NULL, 0);
if (pConn == NULL) { if (pConn == NULL) {
return; return;
...@@ -55,7 +55,7 @@ static void msg_process(TAOS_RES* msg) { ...@@ -55,7 +55,7 @@ static void msg_process(TAOS_RES* msg) {
} }
tmq_free_raw_meta(raw); tmq_free_raw_meta(raw);
char* result = tmq_get_json_meta(msg); char* result = tmq_get_json_meta(msg);
if(result){ if (result) {
printf("meta result: %s\n", result); printf("meta result: %s\n", result);
} }
tmq_free_json_meta(result); tmq_free_json_meta(result);
...@@ -96,7 +96,9 @@ int32_t init_env() { ...@@ -96,7 +96,9 @@ int32_t init_env() {
} }
taos_free_result(pRes); taos_free_result(pRes);
pRes = taos_query(pConn, "create stable if not exists st1 (ts timestamp, c1 int, c2 float, c3 binary(16)) tags(t1 int, t3 nchar(8), t4 bool)"); pRes = taos_query(pConn,
"create stable if not exists st1 (ts timestamp, c1 int, c2 float, c3 binary(16)) tags(t1 int, t3 "
"nchar(8), t4 bool)");
if (taos_errno(pRes) != 0) { if (taos_errno(pRes) != 0) {
printf("failed to create super table st1, reason:%s\n", taos_errstr(pRes)); printf("failed to create super table st1, reason:%s\n", taos_errstr(pRes));
return -1; return -1;
...@@ -152,6 +154,7 @@ int32_t init_env() { ...@@ -152,6 +154,7 @@ int32_t init_env() {
} }
taos_free_result(pRes); taos_free_result(pRes);
#if 0
pRes = taos_query(pConn, "alter table st1 add column c4 bigint"); pRes = taos_query(pConn, "alter table st1 add column c4 bigint");
if (taos_errno(pRes) != 0) { if (taos_errno(pRes) != 0) {
printf("failed to alter super table st1, reason:%s\n", taos_errstr(pRes)); printf("failed to alter super table st1, reason:%s\n", taos_errstr(pRes));
...@@ -264,7 +267,9 @@ int32_t init_env() { ...@@ -264,7 +267,9 @@ int32_t init_env() {
} }
taos_free_result(pRes); taos_free_result(pRes);
pRes = taos_query(pConn, "create stable if not exists st1 (ts timestamp, c1 int, c2 float, c3 binary(16)) tags(t1 int, t3 nchar(8), t4 bool)"); pRes = taos_query(pConn,
"create stable if not exists st1 (ts timestamp, c1 int, c2 float, c3 binary(16)) tags(t1 int, t3 "
"nchar(8), t4 bool)");
if (taos_errno(pRes) != 0) { if (taos_errno(pRes) != 0) {
printf("failed to create super table st1, reason:%s\n", taos_errstr(pRes)); printf("failed to create super table st1, reason:%s\n", taos_errstr(pRes));
return -1; return -1;
...@@ -277,6 +282,7 @@ int32_t init_env() { ...@@ -277,6 +282,7 @@ int32_t init_env() {
return -1; return -1;
} }
taos_free_result(pRes); taos_free_result(pRes);
#endif
return 0; return 0;
} }
...@@ -296,8 +302,15 @@ int32_t create_topic() { ...@@ -296,8 +302,15 @@ int32_t create_topic() {
} }
taos_free_result(pRes); taos_free_result(pRes);
pRes = taos_query(pConn, "create topic topic_ctb_column with meta as database abc1"); /*pRes = taos_query(pConn, "create topic topic_ctb_column with meta as database abc1");*/
/*pRes = taos_query(pConn, "create topic topic_ctb_column as select ts, c1, c2, c3 from st1");*/ pRes = taos_query(pConn, "create topic topic_ctb_column as select ts, c1, c2, c3 from st1");
if (taos_errno(pRes) != 0) {
printf("failed to create topic topic_ctb_column, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "create topic topic2 as select ts, c1, c2, c3 from st1");
if (taos_errno(pRes) != 0) { if (taos_errno(pRes) != 0) {
printf("failed to create topic topic_ctb_column, reason:%s\n", taos_errstr(pRes)); printf("failed to create topic topic_ctb_column, reason:%s\n", taos_errstr(pRes));
return -1; return -1;
...@@ -353,6 +366,7 @@ tmq_t* build_consumer() { ...@@ -353,6 +366,7 @@ tmq_t* build_consumer() {
tmq_conf_t* conf = tmq_conf_new(); tmq_conf_t* conf = tmq_conf_new();
tmq_conf_set(conf, "group.id", "tg2"); tmq_conf_set(conf, "group.id", "tg2");
tmq_conf_set(conf, "client.id", "my app 1");
tmq_conf_set(conf, "td.connect.user", "root"); tmq_conf_set(conf, "td.connect.user", "root");
tmq_conf_set(conf, "td.connect.pass", "taosdata"); tmq_conf_set(conf, "td.connect.pass", "taosdata");
tmq_conf_set(conf, "msg.with.table.name", "true"); tmq_conf_set(conf, "msg.with.table.name", "true");
......
...@@ -2923,7 +2923,6 @@ typedef struct { ...@@ -2923,7 +2923,6 @@ typedef struct {
SMqRspHead head; SMqRspHead head;
STqOffsetVal reqOffset; STqOffsetVal reqOffset;
STqOffsetVal rspOffset; STqOffsetVal rspOffset;
int32_t skipLogNum;
int32_t blockNum; int32_t blockNum;
int8_t withTbName; int8_t withTbName;
int8_t withSchema; int8_t withSchema;
......
...@@ -83,6 +83,7 @@ typedef struct { ...@@ -83,6 +83,7 @@ typedef struct {
int32_t srcVgId; int32_t srcVgId;
int32_t childId; int32_t childId;
int64_t sourceVer; int64_t sourceVer;
int64_t reqId;
SArray* blocks; // SArray<SSDataBlock*> SArray* blocks; // SArray<SSDataBlock*>
} SStreamDataBlock; } SStreamDataBlock;
...@@ -324,6 +325,8 @@ static FORCE_INLINE int32_t streamTaskInput(SStreamTask* pTask, SStreamQueueItem ...@@ -324,6 +325,8 @@ static FORCE_INLINE int32_t streamTaskInput(SStreamTask* pTask, SStreamQueueItem
if (pItem->type == STREAM_INPUT__DATA_SUBMIT) { if (pItem->type == STREAM_INPUT__DATA_SUBMIT) {
SStreamDataSubmit* pSubmitClone = streamSubmitRefClone((SStreamDataSubmit*)pItem); SStreamDataSubmit* pSubmitClone = streamSubmitRefClone((SStreamDataSubmit*)pItem);
if (pSubmitClone == NULL) { if (pSubmitClone == NULL) {
qDebug("task %d %p submit enqueue failed since out of memory", pTask->taskId, pTask);
terrno = TSDB_CODE_OUT_OF_MEMORY;
atomic_store_8(&pTask->inputStatus, TASK_INPUT_STATUS__FAILED); atomic_store_8(&pTask->inputStatus, TASK_INPUT_STATUS__FAILED);
return -1; return -1;
} }
...@@ -412,6 +415,7 @@ typedef struct { ...@@ -412,6 +415,7 @@ typedef struct {
typedef struct { typedef struct {
int64_t streamId; int64_t streamId;
int64_t reqId;
int32_t srcTaskId; int32_t srcTaskId;
int32_t srcNodeId; int32_t srcNodeId;
int32_t dstTaskId; int32_t dstTaskId;
......
...@@ -1052,6 +1052,7 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) { ...@@ -1052,6 +1052,7 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) {
int32_t code = -1; int32_t code = -1;
req.consumerId = tmq->consumerId; req.consumerId = tmq->consumerId;
tstrncpy(req.clientId, tmq->clientId, 256);
tstrncpy(req.cgroup, tmq->groupId, TSDB_CGROUP_LEN); tstrncpy(req.cgroup, tmq->groupId, TSDB_CGROUP_LEN);
req.topicNames = taosArrayInit(sz, sizeof(void*)); req.topicNames = taosArrayInit(sz, sizeof(void*));
if (req.topicNames == NULL) goto FAIL; if (req.topicNames == NULL) goto FAIL;
...@@ -1146,14 +1147,6 @@ void tmq_conf_set_auto_commit_cb(tmq_conf_t* conf, tmq_commit_cb* cb, void* para ...@@ -1146,14 +1147,6 @@ void tmq_conf_set_auto_commit_cb(tmq_conf_t* conf, tmq_commit_cb* cb, void* para
conf->commitCbUserParam = param; conf->commitCbUserParam = param;
} }
#if 0
int32_t tmqGetSkipLogNum(tmq_message_t* tmq_message) {
if (tmq_message == NULL) return 0;
SMqPollRsp* pRsp = &tmq_message->msg;
return pRsp->skipLogNum;
}
#endif
int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) { int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
SMqPollCbParam* pParam = (SMqPollCbParam*)param; SMqPollCbParam* pParam = (SMqPollCbParam*)param;
SMqClientVg* pVg = pParam->pVg; SMqClientVg* pVg = pParam->pVg;
...@@ -1296,9 +1289,6 @@ bool tmqUpdateEp2(tmq_t* tmq, int32_t epoch, SMqAskEpRsp* pRsp) { ...@@ -1296,9 +1289,6 @@ bool tmqUpdateEp2(tmq_t* tmq, int32_t epoch, SMqAskEpRsp* pRsp) {
offsetNew = *pOffset; offsetNew = *pOffset;
} }
/*tscDebug("consumer:%" PRId64 ", (epoch %d) offset of vgId:%d updated to %" PRId64 ", vgKey is %s",
* tmq->consumerId, epoch,*/
/*pVgEp->vgId, offset, vgKey);*/
SMqClientVg clientVg = { SMqClientVg clientVg = {
.pollCnt = 0, .pollCnt = 0,
.currentOffsetNew = offsetNew, .currentOffsetNew = offsetNew,
......
...@@ -5648,7 +5648,6 @@ int32_t tDecodeDeleteRes(SDecoder *pCoder, SDeleteRes *pRes) { ...@@ -5648,7 +5648,6 @@ int32_t tDecodeDeleteRes(SDecoder *pCoder, SDeleteRes *pRes) {
int32_t tEncodeSMqDataRsp(SEncoder *pEncoder, const SMqDataRsp *pRsp) { int32_t tEncodeSMqDataRsp(SEncoder *pEncoder, const SMqDataRsp *pRsp) {
if (tEncodeSTqOffsetVal(pEncoder, &pRsp->reqOffset) < 0) return -1; if (tEncodeSTqOffsetVal(pEncoder, &pRsp->reqOffset) < 0) return -1;
if (tEncodeSTqOffsetVal(pEncoder, &pRsp->rspOffset) < 0) return -1; if (tEncodeSTqOffsetVal(pEncoder, &pRsp->rspOffset) < 0) return -1;
if (tEncodeI32(pEncoder, pRsp->skipLogNum) < 0) return -1;
if (tEncodeI32(pEncoder, pRsp->blockNum) < 0) return -1; if (tEncodeI32(pEncoder, pRsp->blockNum) < 0) return -1;
if (pRsp->blockNum != 0) { if (pRsp->blockNum != 0) {
if (tEncodeI8(pEncoder, pRsp->withTbName) < 0) return -1; if (tEncodeI8(pEncoder, pRsp->withTbName) < 0) return -1;
...@@ -5674,7 +5673,6 @@ int32_t tEncodeSMqDataRsp(SEncoder *pEncoder, const SMqDataRsp *pRsp) { ...@@ -5674,7 +5673,6 @@ int32_t tEncodeSMqDataRsp(SEncoder *pEncoder, const SMqDataRsp *pRsp) {
int32_t tDecodeSMqDataRsp(SDecoder *pDecoder, SMqDataRsp *pRsp) { int32_t tDecodeSMqDataRsp(SDecoder *pDecoder, SMqDataRsp *pRsp) {
if (tDecodeSTqOffsetVal(pDecoder, &pRsp->reqOffset) < 0) return -1; if (tDecodeSTqOffsetVal(pDecoder, &pRsp->reqOffset) < 0) return -1;
if (tDecodeSTqOffsetVal(pDecoder, &pRsp->rspOffset) < 0) return -1; if (tDecodeSTqOffsetVal(pDecoder, &pRsp->rspOffset) < 0) return -1;
if (tDecodeI32(pDecoder, &pRsp->skipLogNum) < 0) return -1;
if (tDecodeI32(pDecoder, &pRsp->blockNum) < 0) return -1; if (tDecodeI32(pDecoder, &pRsp->blockNum) < 0) return -1;
if (pRsp->blockNum != 0) { if (pRsp->blockNum != 0) {
pRsp->blockData = taosArrayInit(pRsp->blockNum, sizeof(void *)); pRsp->blockData = taosArrayInit(pRsp->blockNum, sizeof(void *));
......
...@@ -92,7 +92,9 @@ static int32_t mndProcessConsumerLostMsg(SRpcMsg *pMsg) { ...@@ -92,7 +92,9 @@ static int32_t mndProcessConsumerLostMsg(SRpcMsg *pMsg) {
SMnode *pMnode = pMsg->info.node; SMnode *pMnode = pMsg->info.node;
SMqConsumerLostMsg *pLostMsg = pMsg->pCont; SMqConsumerLostMsg *pLostMsg = pMsg->pCont;
SMqConsumerObj *pConsumer = mndAcquireConsumer(pMnode, pLostMsg->consumerId); SMqConsumerObj *pConsumer = mndAcquireConsumer(pMnode, pLostMsg->consumerId);
ASSERT(pConsumer); if (pConsumer == NULL) {
return 0;
}
mInfo("receive consumer lost msg, consumer id %" PRId64 ", status %s", pLostMsg->consumerId, mInfo("receive consumer lost msg, consumer id %" PRId64 ", status %s", pLostMsg->consumerId,
mndConsumerStatusName(pConsumer->status)); mndConsumerStatusName(pConsumer->status));
...@@ -450,6 +452,7 @@ static int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) { ...@@ -450,6 +452,7 @@ static int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) {
int32_t code = -1; int32_t code = -1;
SArray *newSub = subscribe.topicNames; SArray *newSub = subscribe.topicNames;
taosArraySortString(newSub, taosArrayCompareString); taosArraySortString(newSub, taosArrayCompareString);
taosArrayRemoveDuplicate(newSub, taosArrayCompareString, taosMemoryFree);
int32_t newTopicNum = taosArrayGetSize(newSub); int32_t newTopicNum = taosArrayGetSize(newSub);
// check topic existance // check topic existance
...@@ -907,8 +910,8 @@ static int32_t mndRetrieveConsumer(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock * ...@@ -907,8 +910,8 @@ static int32_t mndRetrieveConsumer(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *
colDataAppend(pColInfo, numOfRows, (const char *)cgroup, false); colDataAppend(pColInfo, numOfRows, (const char *)cgroup, false);
// client id // client id
char clientId[TSDB_CGROUP_LEN + VARSTR_HEADER_SIZE] = {0}; char clientId[256 + VARSTR_HEADER_SIZE] = {0};
tstrncpy(varDataVal(clientId), pConsumer->clientId, TSDB_CGROUP_LEN); tstrncpy(varDataVal(clientId), pConsumer->clientId, 256);
varDataSetLen(clientId, strlen(varDataVal(clientId))); varDataSetLen(clientId, strlen(varDataVal(clientId)));
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)clientId, false); colDataAppend(pColInfo, numOfRows, (const char *)clientId, false);
......
...@@ -199,6 +199,7 @@ int32_t tEncodeSMqConsumerObj(void **buf, const SMqConsumerObj *pConsumer) { ...@@ -199,6 +199,7 @@ int32_t tEncodeSMqConsumerObj(void **buf, const SMqConsumerObj *pConsumer) {
int32_t tlen = 0; int32_t tlen = 0;
int32_t sz; int32_t sz;
tlen += taosEncodeFixedI64(buf, pConsumer->consumerId); tlen += taosEncodeFixedI64(buf, pConsumer->consumerId);
tlen += taosEncodeString(buf, pConsumer->clientId);
tlen += taosEncodeString(buf, pConsumer->cgroup); tlen += taosEncodeString(buf, pConsumer->cgroup);
tlen += taosEncodeFixedI8(buf, pConsumer->updateType); tlen += taosEncodeFixedI8(buf, pConsumer->updateType);
tlen += taosEncodeFixedI32(buf, pConsumer->epoch); tlen += taosEncodeFixedI32(buf, pConsumer->epoch);
...@@ -264,6 +265,7 @@ int32_t tEncodeSMqConsumerObj(void **buf, const SMqConsumerObj *pConsumer) { ...@@ -264,6 +265,7 @@ int32_t tEncodeSMqConsumerObj(void **buf, const SMqConsumerObj *pConsumer) {
void *tDecodeSMqConsumerObj(const void *buf, SMqConsumerObj *pConsumer) { void *tDecodeSMqConsumerObj(const void *buf, SMqConsumerObj *pConsumer) {
int32_t sz; int32_t sz;
buf = taosDecodeFixedI64(buf, &pConsumer->consumerId); buf = taosDecodeFixedI64(buf, &pConsumer->consumerId);
buf = taosDecodeStringTo(buf, pConsumer->clientId);
buf = taosDecodeStringTo(buf, pConsumer->cgroup); buf = taosDecodeStringTo(buf, pConsumer->cgroup);
buf = taosDecodeFixedI8(buf, &pConsumer->updateType); buf = taosDecodeFixedI8(buf, &pConsumer->updateType);
buf = taosDecodeFixedI32(buf, &pConsumer->epoch); buf = taosDecodeFixedI32(buf, &pConsumer->epoch);
......
...@@ -52,7 +52,6 @@ typedef struct { ...@@ -52,7 +52,6 @@ typedef struct {
int64_t reqOffset; int64_t reqOffset;
int64_t processedVer; int64_t processedVer;
int32_t epoch; int32_t epoch;
int32_t skipLogNum;
// rpc info // rpc info
int64_t reqId; int64_t reqId;
SRpcHandleInfo rpcInfo; SRpcHandleInfo rpcInfo;
......
...@@ -635,6 +635,8 @@ int32_t tqProcessStreamTrigger(STQ* pTq, SSubmitReq* pReq) { ...@@ -635,6 +635,8 @@ int32_t tqProcessStreamTrigger(STQ* pTq, SSubmitReq* pReq) {
pSubmit = streamDataSubmitNew(pReq); pSubmit = streamDataSubmitNew(pReq);
if (pSubmit == NULL) { if (pSubmit == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
qError("failed to create data submit for stream since out of memory");
failed = true; failed = true;
} }
...@@ -644,12 +646,16 @@ int32_t tqProcessStreamTrigger(STQ* pTq, SSubmitReq* pReq) { ...@@ -644,12 +646,16 @@ int32_t tqProcessStreamTrigger(STQ* pTq, SSubmitReq* pReq) {
SStreamTask* pTask = *(SStreamTask**)pIter; SStreamTask* pTask = *(SStreamTask**)pIter;
if (!pTask->isDataScan) continue; if (!pTask->isDataScan) continue;
qDebug("data submit enqueue stream task: %d", pTask->taskId);
if (!failed) { if (!failed) {
if (streamTaskInput(pTask, (SStreamQueueItem*)pSubmit) < 0) { if (streamTaskInput(pTask, (SStreamQueueItem*)pSubmit) < 0) {
qError("stream task input failed, task id %d", pTask->taskId);
continue; continue;
} }
if (streamLaunchByWrite(pTask, TD_VID(pTq->pVnode)) < 0) { if (streamLaunchByWrite(pTask, TD_VID(pTq->pVnode)) < 0) {
qError("stream task launch failed, task id %d", pTask->taskId);
continue; continue;
} }
} else { } else {
......
...@@ -217,7 +217,6 @@ int32_t tqLogScanExec(STQ* pTq, STqExecHandle* pExec, SSubmitReq* pReq, SMqDataR ...@@ -217,7 +217,6 @@ int32_t tqLogScanExec(STQ* pTq, STqExecHandle* pExec, SSubmitReq* pReq, SMqDataR
} }
if (pRsp->blockNum == 0) { if (pRsp->blockNum == 0) {
pRsp->skipLogNum++;
return -1; return -1;
} }
......
...@@ -242,6 +242,8 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver) ...@@ -242,6 +242,8 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver)
void* data = taosMemoryMalloc(msgLen); void* data = taosMemoryMalloc(msgLen);
if (data == NULL) { if (data == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
qError("failed to copy data for stream since out of memory");
return -1; return -1;
} }
memcpy(data, msg, msgLen); memcpy(data, msg, msgLen);
......
...@@ -143,6 +143,9 @@ int32_t streamTaskEnqueueRetrieve(SStreamTask* pTask, SStreamRetrieveReq* pReq, ...@@ -143,6 +143,9 @@ int32_t streamTaskEnqueueRetrieve(SStreamTask* pTask, SStreamRetrieveReq* pReq,
// enqueue // enqueue
if (pData != NULL) { if (pData != NULL) {
qDebug("task %d(child %d) recv retrieve req from task %d, reqId %ld", pTask->taskId, pTask->selfChildId,
pReq->srcTaskId, pReq->reqId);
pData->type = STREAM_INPUT__DATA_RETRIEVE; pData->type = STREAM_INPUT__DATA_RETRIEVE;
pData->srcVgId = 0; pData->srcVgId = 0;
// decode // decode
......
...@@ -57,7 +57,9 @@ int32_t streamRetrieveReqToData(const SStreamRetrieveReq* pReq, SStreamDataBlock ...@@ -57,7 +57,9 @@ int32_t streamRetrieveReqToData(const SStreamRetrieveReq* pReq, SStreamDataBlock
pDataBlock->info.type = pRetrieve->streamBlockType; pDataBlock->info.type = pRetrieve->streamBlockType;
pData->reqId = pReq->reqId;
pData->blocks = pArray; pData->blocks = pArray;
return 0; return 0;
} }
......
...@@ -65,6 +65,7 @@ int32_t tDecodeStreamDispatchReq(SDecoder* pDecoder, SStreamDispatchReq* pReq) { ...@@ -65,6 +65,7 @@ int32_t tDecodeStreamDispatchReq(SDecoder* pDecoder, SStreamDispatchReq* pReq) {
int32_t tEncodeStreamRetrieveReq(SEncoder* pEncoder, const SStreamRetrieveReq* pReq) { int32_t tEncodeStreamRetrieveReq(SEncoder* pEncoder, const SStreamRetrieveReq* pReq) {
if (tStartEncode(pEncoder) < 0) return -1; if (tStartEncode(pEncoder) < 0) return -1;
if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1; if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1;
if (tEncodeI64(pEncoder, pReq->reqId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->dstNodeId) < 0) return -1; if (tEncodeI32(pEncoder, pReq->dstNodeId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->dstTaskId) < 0) return -1; if (tEncodeI32(pEncoder, pReq->dstTaskId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->srcNodeId) < 0) return -1; if (tEncodeI32(pEncoder, pReq->srcNodeId) < 0) return -1;
...@@ -77,6 +78,7 @@ int32_t tEncodeStreamRetrieveReq(SEncoder* pEncoder, const SStreamRetrieveReq* p ...@@ -77,6 +78,7 @@ int32_t tEncodeStreamRetrieveReq(SEncoder* pEncoder, const SStreamRetrieveReq* p
int32_t tDecodeStreamRetrieveReq(SDecoder* pDecoder, SStreamRetrieveReq* pReq) { int32_t tDecodeStreamRetrieveReq(SDecoder* pDecoder, SStreamRetrieveReq* pReq) {
if (tStartDecode(pDecoder) < 0) return -1; if (tStartDecode(pDecoder) < 0) return -1;
if (tDecodeI64(pDecoder, &pReq->streamId) < 0) return -1; if (tDecodeI64(pDecoder, &pReq->streamId) < 0) return -1;
if (tDecodeI64(pDecoder, &pReq->reqId) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->dstNodeId) < 0) return -1; if (tDecodeI32(pDecoder, &pReq->dstNodeId) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->dstTaskId) < 0) return -1; if (tDecodeI32(pDecoder, &pReq->dstTaskId) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->srcNodeId) < 0) return -1; if (tDecodeI32(pDecoder, &pReq->srcNodeId) < 0) return -1;
...@@ -121,6 +123,7 @@ int32_t streamBroadcastToChildren(SStreamTask* pTask, const SSDataBlock* pBlock) ...@@ -121,6 +123,7 @@ int32_t streamBroadcastToChildren(SStreamTask* pTask, const SSDataBlock* pBlock)
int32_t sz = taosArrayGetSize(pTask->childEpInfo); int32_t sz = taosArrayGetSize(pTask->childEpInfo);
ASSERT(sz > 0); ASSERT(sz > 0);
for (int32_t i = 0; i < sz; i++) { for (int32_t i = 0; i < sz; i++) {
req.reqId = tGenIdPI64();
SStreamChildEpInfo* pEpInfo = taosArrayGetP(pTask->childEpInfo, i); SStreamChildEpInfo* pEpInfo = taosArrayGetP(pTask->childEpInfo, i);
req.dstNodeId = pEpInfo->nodeId; req.dstNodeId = pEpInfo->nodeId;
req.dstTaskId = pEpInfo->taskId; req.dstTaskId = pEpInfo->taskId;
...@@ -154,6 +157,9 @@ int32_t streamBroadcastToChildren(SStreamTask* pTask, const SSDataBlock* pBlock) ...@@ -154,6 +157,9 @@ int32_t streamBroadcastToChildren(SStreamTask* pTask, const SSDataBlock* pBlock)
ASSERT(0); ASSERT(0);
return -1; return -1;
} }
qDebug("task %d(child %d) send retrieve req to task %d at node %d, reqId %ld", pTask->taskId, pTask->selfChildId,
pEpInfo->taskId, pEpInfo->nodeId, req.reqId);
} }
return 0; return 0;
FAIL: FAIL:
......
...@@ -54,6 +54,9 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, void* data, SArray* pRes) ...@@ -54,6 +54,9 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, void* data, SArray* pRes)
block.info.type = STREAM_PULL_OVER; block.info.type = STREAM_PULL_OVER;
block.info.childId = pTask->selfChildId; block.info.childId = pTask->selfChildId;
taosArrayPush(pRes, &block); taosArrayPush(pRes, &block);
qDebug("task %d(child %d) processed retrieve, reqId %ld", pTask->taskId, pTask->selfChildId,
pRetrieveBlock->reqId);
} }
break; break;
} }
......
...@@ -388,13 +388,11 @@ void perf_loop(tmq_t* tmq, tmq_list_t* topics, int32_t totalMsgs, int64_t walLog ...@@ -388,13 +388,11 @@ void perf_loop(tmq_t* tmq, tmq_list_t* topics, int32_t totalMsgs, int64_t walLog
} }
/*taosSsleep(3);*/ /*taosSsleep(3);*/
int32_t batchCnt = 0; int32_t batchCnt = 0;
int32_t skipLogNum = 0;
int64_t startTime = taosGetTimestampUs(); int64_t startTime = taosGetTimestampUs();
while (running) { while (running) {
TAOS_RES* tmqmessage = tmq_consumer_poll(tmq, 3000); TAOS_RES* tmqmessage = tmq_consumer_poll(tmq, 3000);
if (tmqmessage) { if (tmqmessage) {
batchCnt++; batchCnt++;
/*skipLogNum += tmqGetSkipLogNum(tmqmessage);*/
if (0 != g_stConfInfo.showMsgFlag) { if (0 != g_stConfInfo.showMsgFlag) {
/*msg_process(tmqmessage);*/ /*msg_process(tmqmessage);*/
} }
...@@ -412,7 +410,7 @@ void perf_loop(tmq_t* tmq, tmq_list_t* topics, int32_t totalMsgs, int64_t walLog ...@@ -412,7 +410,7 @@ void perf_loop(tmq_t* tmq, tmq_list_t* topics, int32_t totalMsgs, int64_t walLog
} }
if (0 == g_stConfInfo.simCase) { if (0 == g_stConfInfo.simCase) {
printf("consume result: msgs: %d, skip log cnt: %d, time used:%.3f second\n", batchCnt, skipLogNum, consumeTime); printf("consume result: msgs: %d, time used:%.3f second\n", batchCnt, consumeTime);
} else { } else {
printf("{consume success: %d}", totalMsgs); printf("{consume success: %d}", totalMsgs);
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册