提交 c3a1b200 编写于 作者: M Minghao Li

Merge branch '3.0' of https://github.com/taosdata/TDengine into feature/3.0_mhli

......@@ -79,7 +79,8 @@ static int32_t translateIn2NumOutDou(SFunctionNode* pFunc, char* pErrBuf, int32_
uint8_t para1Type = ((SExprNode*)nodesListGetNode(pFunc->pParameterList, 0))->resType.type;
uint8_t para2Type = ((SExprNode*)nodesListGetNode(pFunc->pParameterList, 1))->resType.type;
if (!IS_NUMERIC_TYPE(para1Type) || !IS_NUMERIC_TYPE(para2Type)) {
if ((!IS_NUMERIC_TYPE(para1Type) && !IS_NULL_TYPE(para1Type)) ||
(!IS_NUMERIC_TYPE(para2Type) && !IS_NULL_TYPE(para2Type))) {
return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName);
}
......@@ -109,13 +110,13 @@ static int32_t translateLogarithm(SFunctionNode* pFunc, char* pErrBuf, int32_t l
}
uint8_t para1Type = ((SExprNode*)nodesListGetNode(pFunc->pParameterList, 0))->resType.type;
if (!IS_NUMERIC_TYPE(para1Type)) {
if (!IS_NUMERIC_TYPE(para1Type) && !IS_NULL_TYPE(para1Type)) {
return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName);
}
if (2 == numOfParams) {
uint8_t para2Type = ((SExprNode*)nodesListGetNode(pFunc->pParameterList, 1))->resType.type;
if (!IS_NUMERIC_TYPE(para2Type)) {
if (!IS_NUMERIC_TYPE(para2Type) && !IS_NULL_TYPE(para2Type)) {
return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName);
}
}
......
......@@ -36,9 +36,6 @@ int32_t absFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutpu
SColumnInfoData *pOutputData = pOutput->columnData;
int32_t type = GET_PARAM_TYPE(pInput);
if (!IS_NUMERIC_TYPE(type)) {
return TSDB_CODE_FAILED;
}
switch (type) {
case TSDB_DATA_TYPE_FLOAT: {
......@@ -119,6 +116,13 @@ int32_t absFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutpu
break;
}
case TSDB_DATA_TYPE_NULL: {
for (int32_t i = 0; i < pInput->numOfRows; ++i) {
colDataAppendNULL(pOutputData, i);
}
break;
}
default: {
colDataAssign(pOutputData, pInputData, pInput->numOfRows, NULL);
}
......@@ -130,9 +134,6 @@ int32_t absFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutpu
static int32_t doScalarFunctionUnique(SScalarParam *pInput, int32_t inputNum, SScalarParam* pOutput, _double_fn valFn) {
int32_t type = GET_PARAM_TYPE(pInput);
if (inputNum != 1 || !IS_NUMERIC_TYPE(type)) {
return TSDB_CODE_FAILED;
}
SColumnInfoData *pInputData = pInput->columnData;
SColumnInfoData *pOutputData = pOutput->columnData;
......@@ -142,7 +143,7 @@ static int32_t doScalarFunctionUnique(SScalarParam *pInput, int32_t inputNum, SS
double *out = (double *)pOutputData->pData;
for (int32_t i = 0; i < pInput->numOfRows; ++i) {
if (colDataIsNull_s(pInputData, i)) {
if (colDataIsNull_s(pInputData, i) || IS_NULL_TYPE(type)) {
colDataAppendNULL(pOutputData, i);
continue;
}
......@@ -159,10 +160,6 @@ static int32_t doScalarFunctionUnique(SScalarParam *pInput, int32_t inputNum, SS
}
static int32_t doScalarFunctionUnique2(SScalarParam *pInput, int32_t inputNum, SScalarParam* pOutput, _double_fn_2 valFn) {
if (inputNum != 2 || !IS_NUMERIC_TYPE(GET_PARAM_TYPE(&pInput[0])) || !IS_NUMERIC_TYPE(GET_PARAM_TYPE(&pInput[1]))) {
return TSDB_CODE_FAILED;
}
SColumnInfoData *pInputData[2];
SColumnInfoData *pOutputData = pOutput->columnData;
_getDoubleValue_fn_t getValueFn[2];
......@@ -175,11 +172,15 @@ static int32_t doScalarFunctionUnique2(SScalarParam *pInput, int32_t inputNum, S
double *out = (double *)pOutputData->pData;
double result;
bool hasNullType = (IS_NULL_TYPE(GET_PARAM_TYPE(&pInput[0])) ||
IS_NULL_TYPE(GET_PARAM_TYPE(&pInput[1])));
int32_t numOfRows = TMAX(pInput[0].numOfRows, pInput[1].numOfRows);
if (pInput[0].numOfRows == pInput[1].numOfRows) {
for (int32_t i = 0; i < numOfRows; ++i) {
if (colDataIsNull_s(pInputData[0], i) ||
colDataIsNull_s(pInputData[1], i)) {
colDataIsNull_s(pInputData[1], i) ||
hasNullType) {
colDataAppendNULL(pOutputData, i);
continue;
}
......@@ -191,7 +192,7 @@ static int32_t doScalarFunctionUnique2(SScalarParam *pInput, int32_t inputNum, S
}
}
} else if (pInput[0].numOfRows == 1) { //left operand is constant
if (colDataIsNull_s(pInputData[0], 0)) {
if (colDataIsNull_s(pInputData[0], 0) || hasNullType) {
colDataAppendNNULL(pOutputData, 0, pInput[1].numOfRows);
} else {
for (int32_t i = 0; i < numOfRows; ++i) {
......@@ -210,7 +211,7 @@ static int32_t doScalarFunctionUnique2(SScalarParam *pInput, int32_t inputNum, S
}
}
} else if (pInput[1].numOfRows == 1) {
if (colDataIsNull_s(pInputData[1], 0)) {
if (colDataIsNull_s(pInputData[1], 0) || hasNullType) {
colDataAppendNNULL(pOutputData, 0, pInput[0].numOfRows);
} else {
for (int32_t i = 0; i < numOfRows; ++i) {
......@@ -236,9 +237,6 @@ static int32_t doScalarFunctionUnique2(SScalarParam *pInput, int32_t inputNum, S
static int32_t doScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam* pOutput, _float_fn f1, _double_fn d1) {
int32_t type = GET_PARAM_TYPE(pInput);
if (inputNum != 1 || !IS_NUMERIC_TYPE(type)) {
return TSDB_CODE_FAILED;
}
SColumnInfoData *pInputData = pInput->columnData;
SColumnInfoData *pOutputData = pOutput->columnData;
......@@ -272,6 +270,13 @@ static int32_t doScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarP
break;
}
case TSDB_DATA_TYPE_NULL: {
for (int32_t i = 0; i < pInput->numOfRows; ++i) {
colDataAppendNULL(pOutputData, i);
}
break;
}
default: {
colDataAssign(pOutputData, pInputData, pInput->numOfRows, NULL);
}
......
......@@ -179,6 +179,10 @@ int32_t walRollback(SWal *pWal, int64_t ver) {
}
((SWalFileInfo *)taosArrayGetLast(pWal->fileInfoSet))->lastVer = ver - 1;
((SWalFileInfo *)taosArrayGetLast(pWal->fileInfoSet))->fileSize = entry.offset;
if (((SWalFileInfo *)taosArrayGetLast(pWal->fileInfoSet))->lastVer < ver - 1) {
ASSERT(((SWalFileInfo *)taosArrayGetLast(pWal->fileInfoSet))->fileSize == 0);
((SWalFileInfo *)taosArrayGetLast(pWal->fileInfoSet))->firstVer = -1;
}
taosCloseFile(&pIdxTFile);
taosCloseFile(&pLogTFile);
......@@ -396,8 +400,12 @@ int64_t walWriteWithSyncInfo(SWal *pWal, int64_t index, tmsg_t msgType, SSyncLog
}
// set status
if (pWal->vers.firstVer == -1) pWal->vers.firstVer = index;
pWal->vers.lastVer = index;
pWal->totSize += sizeof(SWalHead) + bodyLen;
if (walGetCurFileInfo(pWal)->firstVer == -1) {
walGetCurFileInfo(pWal)->firstVer = index;
}
walGetCurFileInfo(pWal)->lastVer = index;
walGetCurFileInfo(pWal)->fileSize += sizeof(SWalHead) + bodyLen;
......
from ntpath import join
import taos
import sys
import time
import socket
import os
import threading
from util.log import *
from util.sql import *
from util.cases import *
from util.dnodes import *
from util.common import *
from util.cluster import *
sys.path.append("./7-tmq")
from tmqCommon import *
class TDTestCase:
def __init__(self):
self.dnodes = 5
self.mnodes = 3
self.idIndex = 0
self.roleIndex = 2
self.mnodeStatusIndex = 3
self.mnodeEpIndex = 1
self.dnodeStatusIndex = 4
self.mnodeCheckCnt = 10
self.host = socket.gethostname()
self.startPort = 6030
self.portStep = 100
self.dnodeOfLeader = 0
def init(self, conn, logSql):
tdLog.debug(f"start to excute {__file__}")
tdSql.init(conn.cursor())
#tdSql.init(conn.cursor(), logSql) # output sql.txt file
def checkDnodesStatusAndCreateMnode(self,dnodeNumbers):
count=0
while count < dnodeNumbers:
tdSql.query("show dnodes")
# tdLog.debug(tdSql.queryResult)
dCnt = 0
for i in range(dnodeNumbers):
if tdSql.queryResult[i][self.dnodeStatusIndex] != "ready":
break
else:
dCnt += 1
if dCnt == dnodeNumbers:
break
time.sleep(1)
tdLog.debug("............... waiting for all dnodes ready!")
tdLog.info("==============create two new mnodes ========")
tdSql.execute("create mnode on dnode 2")
tdSql.execute("create mnode on dnode 3")
self.check3mnode()
return
def check3mnode(self):
count=0
while count < self.mnodeCheckCnt:
time.sleep(1)
tdSql.query("show mnodes;")
if tdSql.checkRows(self.mnodes) :
tdLog.debug("mnode is three nodes")
else:
tdLog.exit("mnode number is correct")
roleOfMnode0 = tdSql.queryResult[0][self.roleIndex]
roleOfMnode1 = tdSql.queryResult[1][self.roleIndex]
roleOfMnode2 = tdSql.queryResult[2][self.roleIndex]
if roleOfMnode0=='leader' and roleOfMnode1=='follower' and roleOfMnode2 == 'follower' :
self.dnodeOfLeader = tdSql.queryResult[0][self.idIndex]
break
elif roleOfMnode0=='follower' and roleOfMnode1=='leader' and roleOfMnode2 == 'follower' :
self.dnodeOfLeader = tdSql.queryResult[1][self.idIndex]
break
elif roleOfMnode0=='follower' and roleOfMnode1=='follower' and roleOfMnode2 == 'leader' :
self.dnodeOfLeader = tdSql.queryResult[2][self.idIndex]
break
else:
count+=1
else:
tdLog.exit("three mnodes is not ready in 10s ")
tdSql.query("show mnodes;")
tdSql.checkRows(self.mnodes)
tdSql.checkData(0,self.mnodeEpIndex,'%s:%d'%(self.host,self.startPort))
tdSql.checkData(0,self.mnodeStatusIndex,'ready')
tdSql.checkData(1,self.mnodeEpIndex,'%s:%d'%(self.host,self.startPort+self.portStep))
tdSql.checkData(1,self.mnodeStatusIndex,'ready')
tdSql.checkData(2,self.mnodeEpIndex,'%s:%d'%(self.host,self.startPort+self.portStep*2))
tdSql.checkData(2,self.mnodeStatusIndex,'ready')
def check3mnode1off(self):
count=0
while count < self.mnodeCheckCnt:
time.sleep(1)
tdSql.query("show mnodes")
tdLog.debug(tdSql.queryResult)
# if tdSql.checkRows(self.mnodes) :
# tdLog.debug("mnode is three nodes")
# else:
# tdLog.exit("mnode number is correct")
roleOfMnode0 = tdSql.queryResult[0][self.roleIndex]
roleOfMnode1 = tdSql.queryResult[1][self.roleIndex]
roleOfMnode2 = tdSql.queryResult[2][self.roleIndex]
if roleOfMnode0=='offline' :
if roleOfMnode1=='leader' and roleOfMnode2 == 'follower' :
self.dnodeOfLeader = tdSql.queryResult[1][self.idIndex]
break
elif roleOfMnode1=='follower' and roleOfMnode2 == 'leader' :
self.dnodeOfLeader = tdSql.queryResult[2][self.idIndex]
break
elif roleOfMnode1=='offline' :
if roleOfMnode0=='leader' and roleOfMnode2 == 'follower' :
self.dnodeOfLeader = tdSql.queryResult[0][self.idIndex]
break
elif roleOfMnode0=='follower' and roleOfMnode2 == 'leader' :
self.dnodeOfLeader = tdSql.queryResult[2][self.idIndex]
break
elif roleOfMnode2=='offline' :
if roleOfMnode0=='leader' and roleOfMnode1 == 'follower' :
self.dnodeOfLeader = tdSql.queryResult[0][self.idIndex]
break
elif roleOfMnode0=='follower' and roleOfMnode1 == 'leader' :
self.dnodeOfLeader = tdSql.queryResult[1][self.idIndex]
break
count+=1
else:
tdLog.exit("three mnodes is not ready in 10s ")
def checkFileContent(self, consumerId, queryString):
buildPath = tdCom.getBuildPath()
cfgPath = tdCom.getClientCfgPath()
dstFile = '%s/../log/dstrows_%d.txt'%(cfgPath, consumerId)
cmdStr = '%s/build/bin/taos -c %s -s "%s >> %s"'%(buildPath, cfgPath, queryString, dstFile)
tdLog.info(cmdStr)
os.system(cmdStr)
consumeRowsFile = '%s/../log/consumerid_%d.txt'%(cfgPath, consumerId)
tdLog.info("rows file: %s, %s"%(consumeRowsFile, dstFile))
consumeFile = open(consumeRowsFile, mode='r')
queryFile = open(dstFile, mode='r')
# skip first line for it is schema
queryFile.readline()
while True:
dst = queryFile.readline()
src = consumeFile.readline()
if dst:
if dst != src:
tdLog.exit("consumerId %d consume rows is not match the rows by direct query"%consumerId)
else:
break
return
def tmqCase1(self):
tdLog.printNoPrefix("======== test case 1: ")
paraDict = {'dbName': 'db1',
'dropFlag': 1,
'event': '',
'vgroups': 4,
'stbName': 'stb',
'colPrefix': 'c',
'tagPrefix': 't',
'colSchema': [{'type': 'INT', 'count':2}, {'type': 'binary', 'len':20, 'count':1}, {'type': 'TIMESTAMP', 'count':1}],
'tagSchema': [{'type': 'INT', 'count':1}, {'type': 'binary', 'len':20, 'count':1}],
'ctbPrefix': 'ctb',
'ctbNum': 1,
'rowsPerTbl': 100000,
'batchNum': 10,
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
'pollDelay': 10,
'showMsg': 1,
'showRow': 1}
topicNameList = ['topic1']
expectRowsList = []
tmqCom.initConsumerTable()
tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=4,replica=1)
tdLog.info("create stb")
tdCom.create_stable(tdSql, dbname=paraDict["dbName"],stbname=paraDict["stbName"], column_elm_list=paraDict['colSchema'], tag_elm_list=paraDict['tagSchema'])
tdLog.info("create ctb")
tdCom.create_ctable(tdSql, dbname=paraDict["dbName"],stbname=paraDict["stbName"],tag_elm_list=paraDict['tagSchema'],count=paraDict["ctbNum"], default_ctbname_prefix=paraDict['ctbPrefix'])
tdLog.info("async insert data")
pThread = tmqCom.asyncInsertData(paraDict)
tdLog.info("create topics from stb with filter")
queryString = "select ts, log(c1), ceil(pow(c1,3)) from %s.%s where c1 %% 7 == 0" %(paraDict['dbName'], paraDict['stbName'])
sqlString = "create topic %s as %s" %(topicNameList[0], queryString)
tdLog.info("create topic sql: %s"%sqlString)
tdSql.execute(sqlString)
# init consume info, and start tmq_sim, then check consume result
tdLog.info("insert consume info to consume processor")
consumerId = 0
expectrowcnt = paraDict["rowsPerTbl"] * paraDict["ctbNum"]
topicList = topicNameList[0]
ifcheckdata = 1
ifManualCommit = 1
keyList = 'group.id:cgrp1, enable.auto.commit:false, auto.commit.interval.ms:6000, auto.offset.reset:earliest'
tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
tdLog.info("start consume processor")
tmqCom.startTmqSimProcess(paraDict['pollDelay'],paraDict["dbName"],paraDict['showMsg'], paraDict['showRow'])
tdLog.info("wait the notify info of start consume")
tmqCom.getStartConsumeNotifyFromTmqsim()
tdLog.info("start switch mnode ................")
tdDnodes = cluster.dnodes
tdLog.info("1. stop dnode 0")
tdDnodes[0].stoptaosd()
time.sleep(10)
self.check3mnode1off()
tdLog.info("2. start dnode 0")
tdDnodes[0].starttaosd()
self.check3mnode()
tdLog.info("3. stop dnode 1")
tdDnodes[1].stoptaosd()
time.sleep(10)
self.check3mnode1off()
tdLog.info("switch end and wait insert data end ................")
pThread.join()
tdLog.info("check the consume result")
tdSql.query(queryString)
expectRowsList.append(tdSql.getRows())
expectRows = 1
resultList = tmqCom.selectConsumeResult(expectRows)
if expectRowsList[0] != resultList[0]:
tdLog.info("expect consume rows: %d, act consume rows: %d"%(expectRowsList[0], resultList[0]))
tdLog.exit("0 tmq consume rows error!")
self.checkFileContent(consumerId, queryString)
time.sleep(10)
for i in range(len(topicNameList)):
tdSql.query("drop topic %s"%topicNameList[i])
tdLog.printNoPrefix("======== test case 1 end ...... ")
def run(self):
tdLog.printNoPrefix("======== Notes: must add '-N 5' for run the script ========")
self.checkDnodesStatusAndCreateMnode(self.dnodes)
self.tmqCase1()
def stop(self):
tdSql.close()
tdLog.success(f"{__file__} successfully executed")
event = threading.Event()
tdCases.addLinux(__file__, TDTestCase())
tdCases.addWindows(__file__, TDTestCase())
......@@ -204,6 +204,35 @@ class TMQCom:
tdLog.debug("insert data ............ [OK]")
return
def insert_data_2(self,tsql,dbName,ctbPrefix,ctbNum,rowsPerTbl,batchNum,startTs):
tdLog.debug("start to insert data ............")
tsql.execute("use %s" %dbName)
pre_insert = "insert into "
sql = pre_insert
t = time.time()
startTs = int(round(t * 1000))
#tdLog.debug("doing insert data into stable:%s rows:%d ..."%(stbName, allRows))
for i in range(ctbNum):
sql += " %s%d values "%(ctbPrefix,i)
for j in range(rowsPerTbl):
if (j % 2 == 0):
sql += "(%d, %d, %d, 'tmqrow_%d', now) "%(startTs + j, j, j, j)
else:
sql += "(%d, %d, %d, 'tmqrow_%d', now) "%(startTs + j, j, -j, j)
if (j > 0) and ((j%batchNum == 0) or (j == rowsPerTbl - 1)):
tsql.execute(sql)
if j < rowsPerTbl - 1:
sql = "insert into %s%d values " %(ctbPrefix,i)
else:
sql = "insert into "
#end sql
if sql != pre_insert:
#print("insert sql:%s"%sql)
tsql.execute(sql)
tdLog.debug("insert data ............ [OK]")
return
def insert_data_interlaceByMultiTbl(self,tsql,dbName,ctbPrefix,ctbNum,rowsPerTbl,batchNum,startTs=0):
tdLog.debug("start to insert data ............")
tsql.execute("use %s" %dbName)
......@@ -291,6 +320,17 @@ class TMQCom:
pThread.start()
return pThread
def threadFunctionForInsert(self, **paraDict):
# create new connector for new tdSql instance in my thread
newTdSql = tdCom.newTdSql()
self.insert_data_2(newTdSql,paraDict["dbName"],paraDict["ctbPrefix"],paraDict["ctbNum"],paraDict["rowsPerTbl"],paraDict["batchNum"],paraDict["startTs"])
return
def asyncInsertData(self, paraDict):
pThread = threading.Thread(target=self.threadFunctionForInsert, kwargs=paraDict)
pThread.start()
return pThread
def close(self):
self.cursor.close()
......
......@@ -113,7 +113,7 @@ class TDTestCase:
tmqCom.insert_data_1(tdSql,paraDict["dbName"],paraDict["ctbPrefix"],paraDict["ctbNum"],paraDict["rowsPerTbl"],paraDict["batchNum"],paraDict["startTs"])
tdLog.info("create topics from stb with filter")
queryString = "select ts, c1,udf1(c1),c2,udf1(c2) from %s.%s where c1 %% 7 == 0" %(paraDict['dbName'], paraDict['stbName'])
queryString = "select ts,c1,udf1(c1),c2,udf1(c2) from %s.%s where c1 %% 7 == 0" %(paraDict['dbName'], paraDict['stbName'])
sqlString = "create topic %s as %s" %(topicNameList[0], queryString)
tdLog.info("create topic sql: %s"%sqlString)
tdSql.execute(sqlString)
......
......@@ -134,3 +134,4 @@ python3 ./test.py -f 7-tmq/schema.py
python3 ./test.py -f 7-tmq/stbFilter.py
python3 ./test.py -f 7-tmq/tmqCheckData.py
python3 ./test.py -f 7-tmq/tmqUdf.py
#python3 ./test.py -f 7-tmq/tmq3mnodeSwitch.py -N 5
......@@ -453,7 +453,17 @@ static int32_t msg_process(TAOS_RES* msg, SThreadInfo* pInfo, int32_t msgIndex)
int32_t precision = taos_result_precision(msg);
const char* tbName = tmq_get_table_name(msg);
dumpToFileForCheck(pInfo->pConsumeRowsFile, row, fields, length, numOfFields, precision);
#if 0
// get schema
//============================== stub =================================================//
for (int32_t i = 0; i < numOfFields; i++) {
taosFprintfFile(g_fp, "%02d: name: %s, type: %d, len: %d\n", i, fields[i].name, fields[i].type, fields[i].bytes);
}
//============================== stub =================================================//
#endif
dumpToFileForCheck(pInfo->pConsumeRowsFile, row, fields, length, numOfFields, precision);
taos_print_row(buf, row, fields, numOfFields);
if (0 != g_stConfInfo.showRowFlag) {
......@@ -656,12 +666,13 @@ void* consumeThreadFunc(void* param) {
pInfo->taos = taos_connect(NULL, "root", "taosdata", NULL, 0);
if (pInfo->taos == NULL) {
taosFprintfFile(g_fp, "taos_connect() fail, can not notify and save consume result to main scripte\n");
exit(-1);
return NULL;
}
build_consumer(pInfo);
build_topic_list(pInfo);
if ((NULL == pInfo->tmq) || (NULL == pInfo->topicList)) {
taosFprintfFile(g_fp, "create consumer fail! tmq is null or topicList is null\n");
assert(0);
return NULL;
}
......@@ -669,7 +680,9 @@ void* consumeThreadFunc(void* param) {
int32_t err = tmq_subscribe(pInfo->tmq, pInfo->topicList);
if (err != 0) {
pError("tmq_subscribe() fail, reason: %s\n", tmq_err2str(err));
exit(-1);
taosFprintfFile(g_fp, "tmq_subscribe()! reason: %s\n", tmq_err2str(err));
assert(0);
return NULL;
}
tmq_list_destroy(pInfo->topicList);
......@@ -688,14 +701,13 @@ void* consumeThreadFunc(void* param) {
err = tmq_unsubscribe(pInfo->tmq);
if (err != 0) {
pError("tmq_unsubscribe() fail, reason: %s\n", tmq_err2str(err));
/*pInfo->consumeMsgCnt = -1;*/
/*return NULL;*/
taosFprintfFile(g_fp, "tmq_unsubscribe()! reason: %s\n", tmq_err2str(err));
}
err = tmq_consumer_close(pInfo->tmq);
if (err != 0) {
pError("tmq_consumer_close() fail, reason: %s\n", tmq_err2str(err));
/*exit(-1);*/
taosFprintfFile(g_fp, "tmq_consumer_close()! reason: %s\n", tmq_err2str(err));
}
pInfo->tmq = NULL;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册