提交 69447969 编写于 作者: D dapan1121

Merge remote-tracking branch 'origin/3.0' into feature/qnode

......@@ -117,27 +117,29 @@ def pre_test(){
def pre_test_win(){
bat '''
date /t
time /t
taskkill /f /t /im python.exe
taskkill /f /t /im bash.exe
rd /s /Q C:\\workspace\\%EXECUTOR_NUMBER%\\TDinternal\\debug
exit 0
bat '''
cd C:\\workspace\\%EXECUTOR_NUMBER%\\TDinternal
git reset --hard
git fetch || git fetch
bat '''
cd C:\\workspace\\%EXECUTOR_NUMBER%\\TDinternal\\community
git reset --hard
git fetch || git fetch
git checkout -f
script {
if (env.CHANGE_TARGET == 'master') {
bat '''
cd C:\\workspace\\%EXECUTOR_NUMBER%\\TDinternal
git checkout master
bat '''
cd C:\\workspace\\%EXECUTOR_NUMBER%\\TDinternal\\community
git checkout master
......@@ -145,6 +147,8 @@ def pre_test_win(){
bat '''
cd C:\\workspace\\%EXECUTOR_NUMBER%\\TDinternal
git checkout 2.0
bat '''
cd C:\\workspace\\%EXECUTOR_NUMBER%\\TDinternal\\community
git checkout 2.0
......@@ -152,6 +156,8 @@ def pre_test_win(){
bat '''
cd C:\\workspace\\%EXECUTOR_NUMBER%\\TDinternal
git checkout 3.0
bat '''
cd C:\\workspace\\%EXECUTOR_NUMBER%\\TDinternal\\community
git checkout 3.0
......@@ -159,6 +165,8 @@ def pre_test_win(){
bat '''
cd C:\\workspace\\%EXECUTOR_NUMBER%\\TDinternal
git checkout develop
bat '''
cd C:\\workspace\\%EXECUTOR_NUMBER%\\TDinternal\\community
git checkout develop
......@@ -169,30 +177,52 @@ def pre_test_win(){
bat '''
cd C:\\workspace\\%EXECUTOR_NUMBER%\\TDinternal
git pull
git log -5
bat '''
cd C:\\workspace\\%EXECUTOR_NUMBER%\\TDinternal\\community
git pull
git fetch origin +refs/pull/${CHANGE_ID}/merge
bat '''
cd C:\\workspace\\%EXECUTOR_NUMBER%\\TDinternal\\community
git fetch origin +refs/pull/%CHANGE_ID%/merge
bat '''
cd C:\\workspace\\%EXECUTOR_NUMBER%\\TDinternal\\community
git checkout -qf FETCH_HEAD
git log -5
} else if (env.CHANGE_URL =~ /\/TDinternal\//) {
bat '''
cd C:\\workspace\\%EXECUTOR_NUMBER%\\TDinternal
git pull
git fetch origin +refs/pull/${CHANGE_ID}/merge
bat '''
cd C:\\workspace\\%EXECUTOR_NUMBER%\\TDinternal
git fetch origin +refs/pull/%CHANGE_ID%/merge
bat '''
cd C:\\workspace\\%EXECUTOR_NUMBER%\\TDinternal
git checkout -qf FETCH_HEAD
git log -5
bat '''
cd C:\\workspace\\%EXECUTOR_NUMBER%\\TDinternal\\community
git pull
git log -5
} else {
sh '''
echo "unmatched reposiotry ${CHANGE_URL}"
bat '''
echo "unmatched reposiotry %CHANGE_URL%"
bat '''
cd C:\\workspace\\%EXECUTOR_NUMBER%\\TDinternal
git branch
git log -5
bat '''
cd C:\\workspace\\%EXECUTOR_NUMBER%\\TDinternal\\community
git branch
git log -5
bat '''
cd C:\\workspace\\%EXECUTOR_NUMBER%\\TDinternal\\community
git submodule update --init --recursive
......@@ -205,10 +235,15 @@ def pre_test_build_win() {
cd C:\\workspace\\%EXECUTOR_NUMBER%\\TDinternal
mkdir debug
cd debug
time /t
call "C:\\Program Files (x86)\\Microsoft Visual Studio\\2017\\Community\\VC\\Auxiliary\\Build\\vcvarsall.bat" x64
set CL=/MP8
cmake .. -G "NMake Makefiles JOM"
jom -j 4 || exit 8
echo ">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> cmake"
time /t
cmake .. -G "NMake Makefiles JOM" || exit 7
echo ">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> jom -j 6"
time /t
jom -j 6 || exit 8
time /t
return 1
......@@ -226,6 +261,13 @@ pipeline {
stages {
stage('run test') {
parallel {
stage('windows test') {
agent{label " windows10_01 || windows10_02 || windows10_03 || windows10_04 "}
steps {
stage('linux test') {
agent{label " slave3_0 || slave15 || slave16 || slave17 "}
options { skipDefaultCheckout() }
......@@ -106,8 +106,8 @@ int32_t create_topic() {
/*pRes = taos_query(pConn, "create topic topic_ctb_column as abc1");*/
pRes = taos_query(pConn, "create topic topic_ctb_column as select ts, c1, c2, c3 from st1");
pRes = taos_query(pConn, "create topic topic_ctb_column as abc1");
/*pRes = taos_query(pConn, "create topic topic_ctb_column as select ts, c1, c2, c3 from st1");*/
if (taos_errno(pRes) != 0) {
printf("failed to create topic topic_ctb_column, reason:%s\n", taos_errstr(pRes));
return -1;
......@@ -239,7 +239,7 @@ void sync_consume_loop(tmq_t* tmq, tmq_list_t* topics) {
tmq_commit(tmq, NULL, 1);
tmq_commit_async(tmq, NULL, tmq_commit_cb_print, NULL);
/*if ((++msg_count % MIN_COMMIT_COUNT) == 0) tmq_commit(tmq, NULL, 0);*/
......@@ -232,11 +232,11 @@ DLL_EXPORT tmq_resp_err_t tmq_unsubscribe(tmq_t *tmq);
DLL_EXPORT tmq_resp_err_t tmq_subscription(tmq_t *tmq, tmq_list_t **topics);
DLL_EXPORT TAOS_RES *tmq_consumer_poll(tmq_t *tmq, int64_t wait_time);
DLL_EXPORT tmq_resp_err_t tmq_consumer_close(tmq_t *tmq);
DLL_EXPORT tmq_resp_err_t tmq_commit(tmq_t *tmq, const tmq_topic_vgroup_list_t *offsets, int32_t async);
DLL_EXPORT void tmq_commit_async(tmq_t *tmq, const tmq_topic_vgroup_list_t *offsets, tmq_commit_cb *cb, void *param);
DLL_EXPORT tmq_resp_err_t tmq_commit_sync(tmq_t *tmq, const tmq_topic_vgroup_list_t *offsets);
DLL_EXPORT void tmq_commit_async(tmq_t *tmq, const tmq_topic_vgroup_list_t *offsets, tmq_commit_cb *cb, void *param);
#if 0
DLL_EXPORT tmq_resp_err_t tmq_commit_message(tmq_t* tmq, const tmq_message_t* tmqmessage, int32_t async);
DLL_EXPORT tmq_resp_err_t tmq_commit(tmq_t *tmq, const tmq_topic_vgroup_list_t *offsets, int32_t async);
DLL_EXPORT tmq_resp_err_t tmq_seek(tmq_t *tmq, const tmq_topic_vgroup_t *offset);
......@@ -1480,6 +1480,7 @@ typedef struct {
typedef struct {
int64_t consumerId;
char cgroup[TSDB_CGROUP_LEN];
char clientId[256];
SArray* topicNames; // SArray<char**>
} SCMSubscribeReq;
......@@ -1487,6 +1488,7 @@ static FORCE_INLINE int32_t tSerializeSCMSubscribeReq(void** buf, const SCMSubsc
int32_t tlen = 0;
tlen += taosEncodeFixedI64(buf, pReq->consumerId);
tlen += taosEncodeString(buf, pReq->cgroup);
tlen += taosEncodeString(buf, pReq->clientId);
int32_t topicNum = taosArrayGetSize(pReq->topicNames);
tlen += taosEncodeFixedI32(buf, topicNum);
......@@ -1500,6 +1502,7 @@ static FORCE_INLINE int32_t tSerializeSCMSubscribeReq(void** buf, const SCMSubsc
static FORCE_INLINE void* tDeserializeSCMSubscribeReq(void* buf, SCMSubscribeReq* pReq) {
buf = taosDecodeFixedI64(buf, &pReq->consumerId);
buf = taosDecodeStringTo(buf, pReq->cgroup);
buf = taosDecodeStringTo(buf, pReq->clientId);
int32_t topicNum;
buf = taosDecodeFixedI32(buf, &topicNum);
......@@ -56,10 +56,10 @@ typedef enum { SSkipListPutSuccess = 0, SSkipListPutEarlyStop = 1, SSkipListPutS
typedef struct SSkipList {
uint32_t seed;
uint16_t len;
__compar_fn_t comparFn;
__sl_key_fn_t keyFn;
TdThreadRwlock *lock;
uint16_t len;
uint8_t maxLevel;
uint8_t flags;
uint8_t type; // static info above
......@@ -263,7 +263,7 @@ static const SSysDbTableSchema topicSchema[] = {
static const SSysDbTableSchema consumerSchema[] = {
{.name = "consumer_id", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT},
{.name = "consumer_group", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_BINARY},
{.name = "app_id", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_BINARY},
{.name = "client_id", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_BINARY},
{.name = "status", .bytes = 20 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY},
{.name = "topics", .bytes = TSDB_TOPIC_FNAME_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY},
{.name = "pid", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
......@@ -463,7 +463,7 @@ typedef struct {
typedef struct {
int64_t consumerId;
char cgroup[TSDB_CGROUP_LEN];
char appId[TSDB_CGROUP_LEN];
char clientId[256];
int8_t updateType; // used only for update
int32_t epoch;
int32_t status;
......@@ -427,6 +427,7 @@ static int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) {
pConsumerOld = mndAcquireConsumer(pMnode, consumerId);
if (pConsumerOld == NULL) {
pConsumerNew = tNewSMqConsumerObj(consumerId, cgroup);
tstrncpy(pConsumerNew->clientId, subscribe.clientId, 256);
pConsumerNew->updateType = CONSUMER_UPDATE__MODIFY;
pConsumerNew->rebNewTopics = newSub;
subscribe.topicNames = NULL;
......@@ -848,11 +849,11 @@ static int32_t mndRetrieveConsumer(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *
colDataAppend(pColInfo, numOfRows, (const char *)cgroup, false);
// app id
tstrncpy(varDataVal(appId), pConsumer->appId, TSDB_CGROUP_LEN);
varDataSetLen(appId, strlen(varDataVal(appId)));
tstrncpy(varDataVal(clientId), pConsumer->clientId, TSDB_CGROUP_LEN);
varDataSetLen(clientId, strlen(varDataVal(clientId)));
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)appId, false);
colDataAppend(pColInfo, numOfRows, (const char *)clientId, false);
// status
char status[20 + VARSTR_HEADER_SIZE] = {0};
......@@ -492,8 +492,8 @@ static int32_t mndDropTopic(SMnode *pMnode, STrans *pTrans, SRpcMsg *pReq, SMqTo
static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) {
SMnode *pMnode = pReq->info.node;
SSdb *pSdb = pMnode->pSdb;
SMnode *pMnode = pReq->info.node;
/*SSdb *pSdb = pMnode->pSdb;*/
SMDropTopicReq dropReq = {0};
if (tDeserializeSMDropTopicReq(pReq->pCont, pReq->contLen, &dropReq) != 0) {
......@@ -502,16 +502,16 @@ static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) {
SMqTopicObj *pTopic = mndAcquireTopic(pMnode, dropReq.name);
// if (pTopic == NULL) {
// if (dropReq.igNotExists) {
// mDebug("topic:%s, not exist, ignore not exist is set", dropReq.name);
// return 0;
// } else {
// mError("topic:%s, failed to drop since %s", dropReq.name, terrstr());
// return -1;
// }
// }
if (pTopic == NULL) {
if (dropReq.igNotExists) {
mDebug("topic:%s, not exist, ignore not exist is set", dropReq.name);
return 0;
} else {
mError("topic:%s, failed to drop since %s", dropReq.name, terrstr());
return -1;
if (pTopic->refConsumerCnt != 0) {
mndReleaseTopic(pMnode, pTopic);
......@@ -528,12 +528,10 @@ static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) {
mDebug("trans:%d, used to drop topic:%s", pTrans->id, pTopic->name);
#if 1
if (mndDropOffsetByTopic(pMnode, pTrans, dropReq.name) < 0) {
return -1;
if (mndDropSubByTopic(pMnode, pTrans, dropReq.name) < 0) {
......@@ -104,6 +104,7 @@ int tsdbOpen(SVnode* pVnode, STsdb** ppTsdb, const char* dir, STsdbKeep
int tsdbClose(STsdb** pTsdb);
int tsdbBegin(STsdb* pTsdb);
int tsdbCommit(STsdb* pTsdb);
int tsdbScanAndConvertSubmitMsg(STsdb *pTsdb, const SSubmitReq *pMsg);
int tsdbInsertData(STsdb* pTsdb, int64_t version, SSubmitReq* pMsg, SSubmitRsp* pRsp);
int tsdbInsertTableData(STsdb* pTsdb, SSubmitMsgIter* pMsgIter, SSubmitBlk* pBlock, SSubmitBlkRsp* pRsp);
tsdbReaderT* tsdbQueryTables(SVnode* pVnode, SQueryTableDataCond* pCond, STableGroupInfo* groupList, uint64_t qId,
......@@ -111,6 +111,7 @@ int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList) {
pIter = taosHashIterate(pTq->execs, pIter);
if (pIter == NULL) break;
pExec = (STqExec*)pIter;
if (pExec->subType == TOPIC_SUB_TYPE__DB) continue;
for (int32_t i = 0; i < 5; i++) {
int32_t code = qUpdateQualifiedTableId(pExec->task[i], tbUidList, true);
ASSERT(code == 0);
......@@ -15,7 +15,7 @@
#include "tsdb.h"
static int tsdbScanAndConvertSubmitMsg(STsdb *pTsdb, SSubmitReq *pMsg);
// static int tsdbScanAndConvertSubmitMsg(STsdb *pTsdb, SSubmitReq *pMsg);
int tsdbInsertData(STsdb *pTsdb, int64_t version, SSubmitReq *pMsg, SSubmitRsp *pRsp) {
SSubmitMsgIter msgIter = {0};
......@@ -54,7 +54,38 @@ int tsdbInsertData(STsdb *pTsdb, int64_t version, SSubmitReq *pMsg, SSubmitRsp *
return 0;
static int tsdbScanAndConvertSubmitMsg(STsdb *pTsdb, SSubmitReq *pMsg) {
#if 0
static FORCE_INLINE int tsdbCheckRowRange(STsdb *pTsdb, STable *pTable, STSRow *row, TSKEY minKey, TSKEY maxKey,
TSKEY now) {
TSKEY rowKey = TD_ROW_KEY(row);
if (rowKey < minKey || rowKey > maxKey) {
tsdbError("vgId:%d table %s tid %d uid %" PRIu64 " timestamp is out of range! now %" PRId64 " minKey %" PRId64
" maxKey %" PRId64 " row key %" PRId64,
REPO_ID(pTsdb), TABLE_CHAR_NAME(pTable), TABLE_TID(pTable), TABLE_UID(pTable), now, minKey, maxKey,
return -1;
return 0;
static FORCE_INLINE int tsdbCheckRowRange(STsdb *pTsdb, tb_uid_t uid, STSRow *row, TSKEY minKey, TSKEY maxKey,
TSKEY now) {
TSKEY rowKey = TD_ROW_KEY(row);
if (rowKey < minKey || rowKey > maxKey) {
tsdbError("vgId:%d table uid %" PRIu64 " timestamp is out of range! now %" PRId64 " minKey %" PRId64
" maxKey %" PRId64 " row key %" PRId64,
REPO_ID(pTsdb), uid, now, minKey, maxKey, rowKey);
return -1;
return 0;
int tsdbScanAndConvertSubmitMsg(STsdb *pTsdb, const SSubmitReq *pMsg) {
// STsdbMeta * pMeta = pTsdb->tsdbMeta;
SSubmitMsgIter msgIter = {0};
......@@ -112,14 +143,14 @@ static int tsdbScanAndConvertSubmitMsg(STsdb *pTsdb, SSubmitReq *pMsg) {
return -1;
tsdbInitSubmitBlkIter(pBlock, &blkIter);
while ((row = tsdbGetSubmitBlkNext(&blkIter)) != NULL) {
if (tsdbCheckRowRange(pTsdb, pTable, row, minKey, maxKey, now) < 0) {
tInitSubmitBlkIter(&msgIter, pBlock, &blkIter);
while ((row = tGetSubmitBlkNext(&blkIter)) != NULL) {
if (tsdbCheckRowRange(pTsdb, msgIter.uid, row, minKey, maxKey, now) < 0) {
return -1;
if (terrno != TSDB_CODE_SUCCESS) return -1;
......@@ -631,6 +631,11 @@ static int vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq, in
vnodeDebugPrintSubmitMsg(pVnode, pReq, __func__);
if (tsdbScanAndConvertSubmitMsg(pVnode->pTsdb, pSubmitReq) < 0) {
pRsp->code = terrno;
goto _exit;
// handle the request
if (tInitSubmitMsgIter(pSubmitReq, &msgIter) < 0) {
......@@ -869,8 +869,12 @@ int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBloc
SColumnInfoData idata = {.info = pResColData->info, .hasNull = true};
SScalarParam dest = {.columnData = &idata};
scalarCalculate(pExpr[k].pExpr->_optrRoot.pRootNode, pBlockList, &dest);
int32_t code = scalarCalculate(pExpr[k].pExpr->_optrRoot.pRootNode, pBlockList, &dest);
if (code != TSDB_CODE_SUCCESS) {
return code;
int32_t startOffset = createNewColModel ? 0 : pResult->info.rows;
colInfoDataEnsureCapacity(pResColData, startOffset, pResult->info.capacity);
colDataMergeCol(pResColData, startOffset, &pResult->info.capacity, &idata, dest.numOfRows);
......@@ -185,10 +185,10 @@ void tSkipListPutBatchByIter(SSkipList *pSkipList, void *iter, iter_next_fn_t it
pKey = SL_GET_NODE_KEY(pSkipList, p);
compare = pSkipList->comparFn(pKey, pDataKey);
if (compare >= 0) {
if (compare == 0 && !hasDup) hasDup = true;
if (compare > 0) {
} else {
if (compare == 0 && !hasDup) hasDup = true;
px = p;
......@@ -57,7 +57,7 @@ class TDSql:
tdLog.notice("'reset query cache' is not supported")
s = 'drop database if exists db'
s = 'create database db'
s = 'create database db days 300'
s = 'use db'
......@@ -5,7 +5,7 @@ sleep 500
sql connect
print =============== create database
sql create database d0
sql create database d0 days 300
sql use d0
print =============== create super table and child table
......@@ -254,4 +254,4 @@ endi
#sql select count(*) from car where ts > '2019-05-14 00:00:00' interval(1y, 5d)
system sh/exec.sh -n dnode1 -s stop -x SIGINT
\ No newline at end of file
system sh/exec.sh -n dnode1 -s stop -x SIGINT
......@@ -409,53 +409,53 @@ sql select `_wstartts`, c1, c2 ,c3 ,c4, c5 from streamt;
# row 1
if $data11 != 4 then
print ======$data11
# return -1
return -1
if $data12 != 4 then
print ======$data12
# return -1
return -1
if $data13 != 10 then
print ======$data13
# return -1
return -1
if $data14 != 3 then
print ======$data14
# return -1
return -1
if $data15 != 1 then
print ======$data15
# return -1
return -1
# row 2
if $data21 != 4 then
print ======$data21
# return -1
return -1
if $data22 != 4 then
print ======$data22
# return -1
return -1
if $data23 != 15 then
print ======$data23
# return -1
return -1
if $data24 != 4 then
print ======$data24
# return -1
return -1
if $data25 != 3 then
print ======$data25
# return -1
return -1
......@@ -368,7 +368,7 @@ void sync_consume_loop(tmq_t* tmq, tmq_list_t* topics) {
if ((++msg_count % MIN_COMMIT_COUNT) == 0) tmq_commit(tmq, NULL, 0);
if ((++msg_count % MIN_COMMIT_COUNT) == 0) tmq_commit_sync(tmq, NULL);
......@@ -37,10 +37,10 @@ typedef struct {
TdThread thread;
int32_t consumerId;
int32_t ifManualCommit;
//int32_t autoCommitIntervalMs; // 1000 ms
//char autoCommit[8]; // true, false
//char autoOffsetRest[16]; // none, earliest, latest
int32_t ifManualCommit;
// int32_t autoCommitIntervalMs; // 1000 ms
// char autoCommit[8]; // true, false
// char autoOffsetRest[16]; // none, earliest, latest
int32_t ifCheckData;
int64_t expectMsgCnt;
......@@ -99,21 +99,15 @@ static void printHelp() {
void initLogFile() {
time_t now;
struct tm curTime;
char filename[256];
time_t now;
struct tm curTime;
char filename[256];
now = taosTime(NULL);
now = taosTime(NULL);
taosLocalTime(&now, &curTime);
sprintf(filename,"%s/../log/tmqlog_%04d-%02d-%02d %02d-%02d-%02d.txt",
//sprintf(filename, "%s/../log/tmqlog.txt", configDir);
sprintf(filename, "%s/../log/tmqlog_%04d-%02d-%02d %02d-%02d-%02d.txt", configDir, curTime.tm_year + 1900,
curTime.tm_mon + 1, curTime.tm_mday, curTime.tm_hour, curTime.tm_min, curTime.tm_sec);
// sprintf(filename, "%s/../log/tmqlog.txt", configDir);
TdFilePtr pFile = taosOpenFile(filename, TD_FILE_TEXT | TD_FILE_WRITE | TD_FILE_TRUNC | TD_FILE_STREAM);
if (NULL == pFile) {
fprintf(stderr, "Failed to open %s for save result\n", filename);
......@@ -137,9 +131,9 @@ void saveConfigToLogFile() {
for (int32_t i = 0; i < g_stConfInfo.numOfThread; i++) {
taosFprintfFile(g_fp, "# consumer %d info:\n", g_stConfInfo.stThreads[i].consumerId);
//taosFprintfFile(g_fp, " auto commit: %s\n", g_stConfInfo.stThreads[i].autoCommit);
//taosFprintfFile(g_fp, " auto commit interval ms: %d\n", g_stConfInfo.stThreads[i].autoCommitIntervalMs);
//taosFprintfFile(g_fp, " auto offset rest: %s\n", g_stConfInfo.stThreads[i].autoOffsetRest);
// taosFprintfFile(g_fp, " auto commit: %s\n", g_stConfInfo.stThreads[i].autoCommit);
// taosFprintfFile(g_fp, " auto commit interval ms: %d\n", g_stConfInfo.stThreads[i].autoCommitIntervalMs);
// taosFprintfFile(g_fp, " auto offset rest: %s\n", g_stConfInfo.stThreads[i].autoOffsetRest);
taosFprintfFile(g_fp, " Topics: ");
for (int j = 0; j < g_stConfInfo.stThreads[i].numOfTopic; j++) {
taosFprintfFile(g_fp, "%s, ", g_stConfInfo.stThreads[i].topics[j]);
......@@ -234,17 +228,17 @@ static int32_t msg_process(TAOS_RES* msg, int64_t msgIndex, int32_t threadLable)
while (1) {
TAOS_ROW row = taos_fetch_row(msg);
if (row == NULL) break;
if (row == NULL) break;
TAOS_FIELD* fields = taos_fetch_fields(msg);
TAOS_FIELD* fields = taos_fetch_fields(msg);
int32_t numOfFields = taos_field_count(msg);
taos_print_row(buf, row, fields, numOfFields);
if (0 != g_stConfInfo.showRowFlag) {
if (0 != g_stConfInfo.showRowFlag) {
taosFprintfFile(g_fp, "rows[%d]: %s\n", totalRows, buf);
......@@ -276,7 +270,7 @@ void build_consumer(SThreadInfo* pInfo) {
tmq_conf_set(conf, "td.connect.user", "root");
tmq_conf_set(conf, "td.connect.pass", "taosdata");
//tmq_conf_set(conf, "td.connect.db", g_stConfInfo.dbName);
// tmq_conf_set(conf, "td.connect.db", g_stConfInfo.dbName);
tmq_conf_set_auto_commit_cb(conf, tmq_commit_cb_print, NULL);
......@@ -299,7 +293,7 @@ void build_consumer(SThreadInfo* pInfo) {
pInfo->tmq = tmq_consumer_new(conf, NULL, 0);
......@@ -322,10 +316,10 @@ int32_t saveConsumeResult(SThreadInfo* pInfo) {
sprintf(sqlStr, "insert into %s.consumeresult values (now, %d, %" PRId64 ", %" PRId64 ", %d)", g_stConfInfo.cdbName,
pInfo->consumerId, pInfo->consumeMsgCnt, pInfo->consumeRowCnt, pInfo->checkresult);
time_t tTime = taosGetTimestampSec();
time_t tTime = taosGetTimestampSec();
struct tm tm = *taosLocalTime(&tTime, NULL);
taosFprintfFile(g_fp, "# save result: %d-%02d-%02d %02d:%02d:%02d, sql: %s\n", tm.tm_year + 1900, tm.tm_mon + 1,
tm.tm_mday, tm.tm_hour, tm.tm_min, tm.tm_sec, sqlStr);
tm.tm_mday, tm.tm_hour, tm.tm_min, tm.tm_sec, sqlStr);
TAOS_RES* pRes = taos_query(pConn, sqlStr);
if (taos_errno(pRes) != 0) {
......@@ -357,11 +351,11 @@ void loop_consume(SThreadInfo* pInfo) {
if (totalRows >= pInfo->expectMsgCnt) {
taosFprintfFile(g_fp, "==== totalRows >= pInfo->expectMsgCnt, so break\n");
taosFprintfFile(g_fp, "==== totalRows >= pInfo->expectMsgCnt, so break\n");
} else {
taosFprintfFile(g_fp, "==== delay over time, so break\n");
} else {
taosFprintfFile(g_fp, "==== delay over time, so break\n");
......@@ -389,7 +383,7 @@ void* consumeThreadFunc(void* param) {
pError("tmq_subscribe() fail, reason: %s\n", tmq_err2str(err));
pInfo->topicList = NULL;
......@@ -397,17 +391,18 @@ void* consumeThreadFunc(void* param) {
if (pInfo->ifManualCommit) {
taosFprintfFile(g_fp, "tmq_commit() manual commit when consume end.\n");
pPrint("tmq_commit() manual commit when consume end.\n");
tmq_commit(pInfo->tmq, NULL, 0);
pPrint("tmq_commit() manual commit when consume end.\n");
/*tmq_commit(pInfo->tmq, NULL, 0);*/
tmq_commit_sync(pInfo->tmq, NULL);
err = tmq_unsubscribe(pInfo->tmq);
if (err) {
pError("tmq_unsubscribe() fail, reason: %s\n", tmq_err2str(err));
pInfo->consumeMsgCnt = -1;
return NULL;
err = tmq_consumer_close(pInfo->tmq);
if (err) {
pError("tmq_consumer_close() fail, reason: %s\n", tmq_err2str(err));
......@@ -485,9 +480,9 @@ int32_t getConsumeInfo() {
int32_t* lengths = taos_fetch_lengths(pRes);
// set default value
//g_stConfInfo.stThreads[numOfThread].autoCommitIntervalMs = 5000;
//memcpy(g_stConfInfo.stThreads[numOfThread].autoCommit, "true", strlen("true"));
//memcpy(g_stConfInfo.stThreads[numOfThread].autoOffsetRest, "earlieast", strlen("earlieast"));
// g_stConfInfo.stThreads[numOfThread].autoCommitIntervalMs = 5000;
// memcpy(g_stConfInfo.stThreads[numOfThread].autoCommit, "true", strlen("true"));
// memcpy(g_stConfInfo.stThreads[numOfThread].autoOffsetRest, "earlieast", strlen("earlieast"));
for (int i = 0; i < num_fields; ++i) {
if (row[i] == NULL || 0 == i) {
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
想要评论请 注册