提交 7e712a14 编写于 作者: H Haojun Liao

fix(tmq): adjust log.

上级 fad23f8c
...@@ -80,6 +80,14 @@ qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, SReadHandle* readers, int32_t v ...@@ -80,6 +80,14 @@ qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, SReadHandle* readers, int32_t v
*/ */
qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* pReaderHandle, int32_t vgId, int32_t* numOfCols, SSchemaWrapper** pSchema); qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* pReaderHandle, int32_t vgId, int32_t* numOfCols, SSchemaWrapper** pSchema);
/**
* set the task Id, usually used by message queue process
* @param tinfo
* @param taskId
* @param queryId
*/
void qSetTaskId(qTaskInfo_t tinfo, uint64_t taskId, uint64_t queryId);
int32_t qSetStreamOpOpen(qTaskInfo_t tinfo); int32_t qSetStreamOpOpen(qTaskInfo_t tinfo);
/** /**
* Set multiple input data blocks for the stream scan. * Set multiple input data blocks for the stream scan.
......
...@@ -1855,14 +1855,15 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { ...@@ -1855,14 +1855,15 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
tscDebug("consumer:0x%" PRIx64 " process poll rsp, vgId:%d, offset:%s, blocks:%d, rows:%" PRId64 tscDebug("consumer:0x%" PRIx64 " process poll rsp, vgId:%d, offset:%s, blocks:%d, rows:%" PRId64
" vg total:%" PRId64 " total:%" PRId64 ", reqId:0x%" PRIx64, " vg total:%" PRId64 " total:%" PRId64 ", reqId:0x%" PRIx64,
tmq->consumerId, pVg->vgId, buf, pDataRsp->blockNum, numOfRows, tmq->totalRows, pVg->numOfRows, tmq->consumerId, pVg->vgId, buf, pDataRsp->blockNum, numOfRows, pVg->numOfRows, tmq->totalRows,
pollRspWrapper->reqId); pollRspWrapper->reqId);
taosFreeQitem(pollRspWrapper); taosFreeQitem(pollRspWrapper);
return pRsp; return pRsp;
} }
} else { } else {
tscDebug("consumer:0x%" PRIx64 " msg discard since epoch mismatch: msg epoch %d, consumer epoch %d", SMqClientVg* pVg = pollRspWrapper->vgHandle;
tmq->consumerId, pDataRsp->head.epoch, consumerEpoch); tscDebug("consumer:0x%" PRIx64 " vgId:%d msg discard since epoch mismatch: msg epoch %d, consumer epoch %d",
tmq->consumerId, pVg->vgId, pDataRsp->head.epoch, consumerEpoch);
pRspWrapper = tmqFreeRspWrapper(pRspWrapper); pRspWrapper = tmqFreeRspWrapper(pRspWrapper);
taosFreeQitem(pollRspWrapper); taosFreeQitem(pollRspWrapper);
} }
...@@ -1881,8 +1882,8 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { ...@@ -1881,8 +1882,8 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
taosFreeQitem(pollRspWrapper); taosFreeQitem(pollRspWrapper);
return pRsp; return pRsp;
} else { } else {
tscDebug("consumer:0x%" PRIx64 " msg discard since epoch mismatch: msg epoch %d, consumer epoch %d", tscDebug("consumer:0x%" PRIx64 " vgId:%d msg discard since epoch mismatch: msg epoch %d, consumer epoch %d",
tmq->consumerId, pollRspWrapper->metaRsp.head.epoch, consumerEpoch); tmq->consumerId, pollRspWrapper->vgHandle->vgId, pollRspWrapper->metaRsp.head.epoch, consumerEpoch);
pRspWrapper = tmqFreeRspWrapper(pRspWrapper); pRspWrapper = tmqFreeRspWrapper(pRspWrapper);
taosFreeQitem(pollRspWrapper); taosFreeQitem(pollRspWrapper);
} }
...@@ -1928,8 +1929,8 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { ...@@ -1928,8 +1929,8 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
return pRsp; return pRsp;
} else { } else {
tscDebug("consumer:0x%" PRIx64 " msg discard since epoch mismatch: msg epoch %d, consumer epoch %d", tscDebug("consumer:0x%" PRIx64 " vgId:%d msg discard since epoch mismatch: msg epoch %d, consumer epoch %d",
tmq->consumerId, pollRspWrapper->taosxRsp.head.epoch, consumerEpoch); tmq->consumerId, pollRspWrapper->vgHandle->vgId, pollRspWrapper->taosxRsp.head.epoch, consumerEpoch);
pRspWrapper = tmqFreeRspWrapper(pRspWrapper); pRspWrapper = tmqFreeRspWrapper(pRspWrapper);
taosFreeQitem(pollRspWrapper); taosFreeQitem(pollRspWrapper);
} }
......
...@@ -112,7 +112,7 @@ void createNewTable(TAOS* pConn, int32_t index) { ...@@ -112,7 +112,7 @@ void createNewTable(TAOS* pConn, int32_t index) {
} }
taos_free_result(pRes); taos_free_result(pRes);
for(int32_t i = 0; i < 2000; i += 20) { for(int32_t i = 0; i < 100; i += 20) {
char sql[1024] = {0}; char sql[1024] = {0};
sprintf(sql, sprintf(sql,
"insert into tu%d values(now+%da, %d)(now+%da, %d)(now+%da, %d)(now+%da, %d)" "insert into tu%d values(now+%da, %d)(now+%da, %d)(now+%da, %d)(now+%da, %d)"
...@@ -167,6 +167,80 @@ void tmq_commit_cb_print(tmq_t *pTmq, int32_t code, void *param) { ...@@ -167,6 +167,80 @@ void tmq_commit_cb_print(tmq_t *pTmq, int32_t code, void *param) {
printf("success, code:%d\n", code); printf("success, code:%d\n", code);
} }
void* doConsumeData(void* param) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
tmq_conf_t* conf = tmq_conf_new();
tmq_conf_set(conf, "enable.auto.commit", "true");
tmq_conf_set(conf, "auto.commit.interval.ms", "1000");
tmq_conf_set(conf, "group.id", "cgrpName12");
tmq_conf_set(conf, "td.connect.user", "root");
tmq_conf_set(conf, "td.connect.pass", "taosdata");
tmq_conf_set(conf, "auto.offset.reset", "earliest");
tmq_conf_set(conf, "experimental.snapshot.enable", "true");
tmq_conf_set(conf, "msg.with.table.name", "true");
tmq_conf_set_auto_commit_cb(conf, tmq_commit_cb_print, NULL);
tmq_t* tmq = tmq_consumer_new(conf, NULL, 0);
tmq_conf_destroy(conf);
// 创建订阅 topics 列表
tmq_list_t* topicList = tmq_list_new();
tmq_list_append(topicList, "topic_t2");
// 启动订阅
tmq_subscribe(tmq, topicList);
tmq_list_destroy(topicList);
TAOS_FIELD* fields = NULL;
int32_t numOfFields = 0;
int32_t precision = 0;
int32_t totalRows = 0;
int32_t msgCnt = 0;
int32_t timeout = 25000;
int32_t count = 0;
while (1) {
TAOS_RES* pRes = tmq_consumer_poll(tmq, timeout);
if (pRes) {
char buf[1024];
const char* topicName = tmq_get_topic_name(pRes);
const char* dbName = tmq_get_db_name(pRes);
int32_t vgroupId = tmq_get_vgroup_id(pRes);
printf("topic: %s\n", topicName);
printf("db: %s\n", dbName);
printf("vgroup id: %d\n", vgroupId);
while (1) {
TAOS_ROW row = taos_fetch_row(pRes);
if (row == NULL) {
break;
}
fields = taos_fetch_fields(pRes);
numOfFields = taos_field_count(pRes);
precision = taos_result_precision(pRes);
taos_print_row(buf, row, fields, numOfFields);
totalRows += 1;
// printf("precision: %d, row content: %s\n", precision, buf);
}
taos_free_result(pRes);
} else {
break;
}
}
tmq_consumer_close(tmq);
taos_close(pConn);
fprintf(stderr, "%d msg consumed, include %d rows\n", msgCnt, totalRows);
return NULL;
}
} // namespace } // namespace
int main(int argc, char** argv) { int main(int argc, char** argv) {
...@@ -188,7 +262,6 @@ TEST(clientCase, driverInit_Test) { ...@@ -188,7 +262,6 @@ TEST(clientCase, driverInit_Test) {
TEST(clientCase, connect_Test) { TEST(clientCase, connect_Test) {
taos_options(TSDB_OPTION_CONFIGDIR, "~/first/cfg"); taos_options(TSDB_OPTION_CONFIGDIR, "~/first/cfg");
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
if (pConn == NULL) { if (pConn == NULL) {
printf("failed to connect to server, reason:%s\n", taos_errstr(NULL)); printf("failed to connect to server, reason:%s\n", taos_errstr(NULL));
...@@ -708,7 +781,7 @@ TEST(clientCase, projection_query_tables) { ...@@ -708,7 +781,7 @@ TEST(clientCase, projection_query_tables) {
// } // }
// taos_free_result(pRes); // taos_free_result(pRes);
TAOS_RES* pRes = taos_query(pConn, "use abc2"); TAOS_RES* pRes = taos_query(pConn, "use abc1");
taos_free_result(pRes); taos_free_result(pRes);
pRes = taos_query(pConn, "create stable st1 (ts timestamp, k int) tags(a int)"); pRes = taos_query(pConn, "create stable st1 (ts timestamp, k int) tags(a int)");
...@@ -730,7 +803,7 @@ TEST(clientCase, projection_query_tables) { ...@@ -730,7 +803,7 @@ TEST(clientCase, projection_query_tables) {
} }
taos_free_result(pRes); taos_free_result(pRes);
for (int32_t i = 0; i < 2; ++i) { for (int32_t i = 0; i < 10000; ++i) {
printf("create table :%d\n", i); printf("create table :%d\n", i);
createNewTable(pConn, i); createNewTable(pConn, i);
} }
...@@ -970,28 +1043,23 @@ TEST(clientCase, sub_db_test) { ...@@ -970,28 +1043,23 @@ TEST(clientCase, sub_db_test) {
taos_print_row(buf, row, fields, numOfFields); taos_print_row(buf, row, fields, numOfFields);
printf("precision: %d, row content: %s\n", precision, buf); printf("precision: %d, row content: %s\n", precision, buf);
} }
taos_free_result(pRes);
} }
// return rows;
} }
fprintf(stderr, "%d msg consumed, include %d rows\n", msgCnt, totalRows); fprintf(stderr, "%d msg consumed, include %d rows\n", msgCnt, totalRows);
} }
TEST(clientCase, sub_tb_test) { TEST(clientCase, sub_tb_test) {
taos_options(TSDB_OPTION_CONFIGDIR, "~/first/cfg");
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
ASSERT_NE(pConn, nullptr); ASSERT_NE(pConn, nullptr);
// TAOS_RES* pRes = taos_query(pConn, "create topic topic_t1 as select * from t1");
// if (taos_errno(pRes) != TSDB_CODE_SUCCESS) {
// printf("failed to create topic, code:%s", taos_errstr(pRes));
// taos_free_result(pRes);
// return;
// }
tmq_conf_t* conf = tmq_conf_new(); tmq_conf_t* conf = tmq_conf_new();
tmq_conf_set(conf, "enable.auto.commit", "true"); tmq_conf_set(conf, "enable.auto.commit", "true");
tmq_conf_set(conf, "auto.commit.interval.ms", "1000"); tmq_conf_set(conf, "auto.commit.interval.ms", "1000");
tmq_conf_set(conf, "group.id", "cgrpName"); tmq_conf_set(conf, "group.id", "cgrpName27");
tmq_conf_set(conf, "td.connect.user", "root"); tmq_conf_set(conf, "td.connect.user", "root");
tmq_conf_set(conf, "td.connect.pass", "taosdata"); tmq_conf_set(conf, "td.connect.pass", "taosdata");
tmq_conf_set(conf, "auto.offset.reset", "earliest"); tmq_conf_set(conf, "auto.offset.reset", "earliest");
...@@ -1004,10 +1072,11 @@ TEST(clientCase, sub_tb_test) { ...@@ -1004,10 +1072,11 @@ TEST(clientCase, sub_tb_test) {
// 创建订阅 topics 列表 // 创建订阅 topics 列表
tmq_list_t* topicList = tmq_list_new(); tmq_list_t* topicList = tmq_list_new();
tmq_list_append(topicList, "topic_t1"); tmq_list_append(topicList, "topic_t2");
// 启动订阅 // 启动订阅
tmq_subscribe(tmq, topicList); tmq_subscribe(tmq, topicList);
tmq_list_destroy(topicList); tmq_list_destroy(topicList);
TAOS_FIELD* fields = NULL; TAOS_FIELD* fields = NULL;
...@@ -1015,7 +1084,7 @@ TEST(clientCase, sub_tb_test) { ...@@ -1015,7 +1084,7 @@ TEST(clientCase, sub_tb_test) {
int32_t precision = 0; int32_t precision = 0;
int32_t totalRows = 0; int32_t totalRows = 0;
int32_t msgCnt = 0; int32_t msgCnt = 0;
int32_t timeout = 5000; int32_t timeout = 25000;
int32_t count = 0; int32_t count = 0;
...@@ -1023,7 +1092,6 @@ TEST(clientCase, sub_tb_test) { ...@@ -1023,7 +1092,6 @@ TEST(clientCase, sub_tb_test) {
TAOS_RES* pRes = tmq_consumer_poll(tmq, timeout); TAOS_RES* pRes = tmq_consumer_poll(tmq, timeout);
if (pRes) { if (pRes) {
char buf[1024]; char buf[1024];
int32_t rows = 0;
const char* topicName = tmq_get_topic_name(pRes); const char* topicName = tmq_get_topic_name(pRes);
const char* dbName = tmq_get_db_name(pRes); const char* dbName = tmq_get_db_name(pRes);
...@@ -1033,27 +1101,45 @@ TEST(clientCase, sub_tb_test) { ...@@ -1033,27 +1101,45 @@ TEST(clientCase, sub_tb_test) {
printf("db: %s\n", dbName); printf("db: %s\n", dbName);
printf("vgroup id: %d\n", vgroupId); printf("vgroup id: %d\n", vgroupId);
if (count ++ > 200) {
tmq_unsubscribe(tmq);
break;
}
while (1) { while (1) {
TAOS_ROW row = taos_fetch_row(pRes); TAOS_ROW row = taos_fetch_row(pRes);
if (row == NULL) break; if (row == NULL) {
break;
}
fields = taos_fetch_fields(pRes); fields = taos_fetch_fields(pRes);
numOfFields = taos_field_count(pRes); numOfFields = taos_field_count(pRes);
precision = taos_result_precision(pRes); precision = taos_result_precision(pRes);
rows++;
taos_print_row(buf, row, fields, numOfFields); taos_print_row(buf, row, fields, numOfFields);
totalRows += 1;
printf("precision: %d, row content: %s\n", precision, buf); printf("precision: %d, row content: %s\n", precision, buf);
} }
taos_free_result(pRes);
// if ((++count) > 1) {
// break;
// }
} else {
break;
} }
// return rows;
} }
tmq_consumer_close(tmq);
taos_close(pConn);
fprintf(stderr, "%d msg consumed, include %d rows\n", msgCnt, totalRows); fprintf(stderr, "%d msg consumed, include %d rows\n", msgCnt, totalRows);
} }
TEST(clientCase, sub_tb_mt_test) {
taos_options(TSDB_OPTION_CONFIGDIR, "~/first/cfg");
TdThread qid[20] = {0};
for(int32_t i = 0; i < 1; ++i) {
taosThreadCreate(&qid[i], NULL, doConsumeData, NULL);
}
for(int32_t i = 0; i < 4; ++i) {
taosThreadJoin(qid[i], NULL);
}
}
#pragma GCC diagnostic pop #pragma GCC diagnostic pop
...@@ -639,6 +639,8 @@ static int32_t extractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq* ...@@ -639,6 +639,8 @@ static int32_t extractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq*
// lock // lock
taosWLockLatch(&pTq->pushLock); taosWLockLatch(&pTq->pushLock);
qSetTaskId(pHandle->execHandle.task, consumerId, pRequest->reqId);
code = tqScanData(pTq, pHandle, &dataRsp, &offset); code = tqScanData(pTq, pHandle, &dataRsp, &offset);
// till now, all data has been transferred to consumer, new data needs to push client once arrived. // till now, all data has been transferred to consumer, new data needs to push client once arrived.
......
...@@ -835,6 +835,8 @@ void setTaskKilled(SExecTaskInfo* pTaskInfo, int32_t rspCode); ...@@ -835,6 +835,8 @@ void setTaskKilled(SExecTaskInfo* pTaskInfo, int32_t rspCode);
void doDestroyTask(SExecTaskInfo* pTaskInfo); void doDestroyTask(SExecTaskInfo* pTaskInfo);
void setTaskStatus(SExecTaskInfo* pTaskInfo, int8_t status); void setTaskStatus(SExecTaskInfo* pTaskInfo, int8_t status);
char* buildTaskId(uint64_t taskId, uint64_t queryId);
int32_t createExecTaskInfo(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SReadHandle* pHandle, uint64_t taskId, int32_t createExecTaskInfo(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SReadHandle* pHandle, uint64_t taskId,
int32_t vgId, char* sql, EOPTR_EXEC_MODEL model); int32_t vgId, char* sql, EOPTR_EXEC_MODEL model);
int32_t createDataSinkParam(SDataSinkNode* pNode, void** pParam, qTaskInfo_t* pTaskInfo, SReadHandle* readHandle); int32_t createDataSinkParam(SDataSinkNode* pNode, void** pParam, qTaskInfo_t* pTaskInfo, SReadHandle* readHandle);
......
...@@ -159,6 +159,14 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu ...@@ -159,6 +159,14 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu
} }
} }
void qSetTaskId(qTaskInfo_t tinfo, uint64_t taskId, uint64_t queryId) {
SExecTaskInfo* pTaskInfo = tinfo;
pTaskInfo->id.queryId = queryId;
taosMemoryFreeClear(pTaskInfo->id.str);
pTaskInfo->id.str = buildTaskId(taskId, queryId);
}
int32_t qSetStreamOpOpen(qTaskInfo_t tinfo) { int32_t qSetStreamOpOpen(qTaskInfo_t tinfo) {
if (tinfo == NULL) { if (tinfo == NULL) {
return TSDB_CODE_APP_ERROR; return TSDB_CODE_APP_ERROR;
...@@ -1099,7 +1107,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT ...@@ -1099,7 +1107,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
STableScanInfo* pTableScanInfo = pInfo->pTableScanOp->info; STableScanInfo* pTableScanInfo = pInfo->pTableScanOp->info;
int32_t numOfTables = tableListGetSize(pTaskInfo->pTableInfoList); int32_t numOfTables = tableListGetSize(pTaskInfo->pTableInfoList);
qDebug("switch to next table %" PRId64 " ts %" PRId64 "% "PRId64 " rows returned", uid, ts, pInfo->pTableScanOp->resultInfo.totalRows); qDebug("switch to table uid:%" PRId64 " ts:%" PRId64 "% "PRId64 " rows returned", uid, ts, pInfo->pTableScanOp->resultInfo.totalRows);
pInfo->pTableScanOp->resultInfo.totalRows = 0; pInfo->pTableScanOp->resultInfo.totalRows = 0;
bool found = false; bool found = false;
...@@ -1138,7 +1146,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT ...@@ -1138,7 +1146,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
pTableScanInfo->base.cond.twindows.skey = oldSkey; pTableScanInfo->base.cond.twindows.skey = oldSkey;
pTableScanInfo->scanTimes = 0; pTableScanInfo->scanTimes = 0;
qDebug("tsdb reader offset seek to uid %" PRId64 " ts %" PRId64 ", table cur set to %d , all table num %d", uid, qDebug("tsdb reader offset seek snapshot to uid:%" PRId64 " ts %" PRId64 ", table cur set to %d , all table num %d", uid,
ts, pTableScanInfo->currentTable, numOfTables); ts, pTableScanInfo->currentTable, numOfTables);
} else { } else {
qError("invalid pOffset->type:%d", pOffset->type); qError("invalid pOffset->type:%d", pOffset->type);
......
...@@ -1959,7 +1959,7 @@ void destroyAggOperatorInfo(void* param) { ...@@ -1959,7 +1959,7 @@ void destroyAggOperatorInfo(void* param) {
taosMemoryFreeClear(param); taosMemoryFreeClear(param);
} }
static char* buildTaskId(uint64_t taskId, uint64_t queryId) { char* buildTaskId(uint64_t taskId, uint64_t queryId) {
char* p = taosMemoryMalloc(64); char* p = taosMemoryMalloc(64);
int32_t offset = 6; int32_t offset = 6;
...@@ -1971,7 +1971,6 @@ static char* buildTaskId(uint64_t taskId, uint64_t queryId) { ...@@ -1971,7 +1971,6 @@ static char* buildTaskId(uint64_t taskId, uint64_t queryId) {
offset += tintToHex(queryId, &p[offset]); offset += tintToHex(queryId, &p[offset]);
p[offset] = 0; p[offset] = 0;
return p; return p;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册