提交 286b9e40 编写于 作者: P plum-lihui

test: add notify between main script and comsume processor

上级 57e6846d
...@@ -54,9 +54,11 @@ class TDTestCase: ...@@ -54,9 +54,11 @@ class TDTestCase:
tdSql.query("create database if not exists %s vgroups 1"%(cdbName)) tdSql.query("create database if not exists %s vgroups 1"%(cdbName))
tdSql.query("drop table if exists %s.consumeinfo "%(cdbName)) tdSql.query("drop table if exists %s.consumeinfo "%(cdbName))
tdSql.query("drop table if exists %s.consumeresult "%(cdbName)) tdSql.query("drop table if exists %s.consumeresult "%(cdbName))
tdSql.query("drop table if exists %s.notifyinfo "%(cdbName))
tdSql.query("create table %s.consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int, ifmanualcommit int)"%cdbName) tdSql.query("create table %s.consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int, ifmanualcommit int)"%cdbName)
tdSql.query("create table %s.consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)"%cdbName) tdSql.query("create table %s.consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)"%cdbName)
tdSql.query("create table %s.notifyinfo (ts timestamp, cmdid int, consumerid int)"%cdbName)
def insertConsumerInfo(self,consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifmanualcommit,cdbName='cdb'): def insertConsumerInfo(self,consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifmanualcommit,cdbName='cdb'):
sql = "insert into %s.consumeinfo values "%cdbName sql = "insert into %s.consumeinfo values "%cdbName
...@@ -64,6 +66,27 @@ class TDTestCase: ...@@ -64,6 +66,27 @@ class TDTestCase:
tdLog.info("consume info sql: %s"%sql) tdLog.info("consume info sql: %s"%sql)
tdSql.query(sql) tdSql.query(sql)
def getStartConsumeNotifyFromTmqsim(self,cdbName='cdb'):
while 1:
tdSql.query("select * from %s.notifyinfo"%cdbName)
#tdLog.info("row: %d, %l64d, %l64d"%(tdSql.getData(0, 1),tdSql.getData(0, 2),tdSql.getData(0, 3))
if (tdSql.getRows() == 1) and (tdSql.getData(0, 1) == 0):
break
else:
time.sleep(0.1)
return
def getStartCommitNotifyFromTmqsim(self,cdbName='cdb'):
while 1:
tdSql.query("select * from %s.notifyinfo"%cdbName)
#tdLog.info("row: %d, %l64d, %l64d"%(tdSql.getData(0, 1),tdSql.getData(0, 2),tdSql.getData(0, 3))
if tdSql.getRows() == 2 :
print(tdSql.getData(0, 1), tdSql.getData(1, 1))
if tdSql.getData(1, 1) == 1:
break
time.sleep(0.1)
return
def selectConsumeResult(self,expectRows,cdbName='cdb'): def selectConsumeResult(self,expectRows,cdbName='cdb'):
resultList=[] resultList=[]
while 1: while 1:
...@@ -72,7 +95,7 @@ class TDTestCase: ...@@ -72,7 +95,7 @@ class TDTestCase:
if tdSql.getRows() == expectRows: if tdSql.getRows() == expectRows:
break break
else: else:
time.sleep(5) time.sleep(1)
for i in range(expectRows): for i in range(expectRows):
tdLog.info ("ts: %s, consume id: %d, consume msgs: %d, consume rows: %d"%(tdSql.getData(i , 0), tdSql.getData(i , 1), tdSql.getData(i , 2), tdSql.getData(i , 3))) tdLog.info ("ts: %s, consume id: %d, consume msgs: %d, consume rows: %d"%(tdSql.getData(i , 0), tdSql.getData(i , 1), tdSql.getData(i , 2), tdSql.getData(i , 3)))
...@@ -207,7 +230,9 @@ class TDTestCase: ...@@ -207,7 +230,9 @@ class TDTestCase:
showRow = 1 showRow = 1
self.startTmqSimProcess(buildPath,cfgPath,pollDelay,parameterDict["dbName"],showMsg, showRow) self.startTmqSimProcess(buildPath,cfgPath,pollDelay,parameterDict["dbName"],showMsg, showRow)
time.sleep(2) tdLog.info("wait the notify info of start consume")
self.getStartConsumeNotifyFromTmqsim()
tdLog.info("pkill consume processor") tdLog.info("pkill consume processor")
if (platform.system().lower() == 'windows'): if (platform.system().lower() == 'windows'):
os.system("TASKKILL /F /IM tmq_sim.exe") os.system("TASKKILL /F /IM tmq_sim.exe")
...@@ -282,14 +307,17 @@ class TDTestCase: ...@@ -282,14 +307,17 @@ class TDTestCase:
showRow = 1 showRow = 1
self.startTmqSimProcess(buildPath,cfgPath,pollDelay,parameterDict["dbName"],showMsg, showRow) self.startTmqSimProcess(buildPath,cfgPath,pollDelay,parameterDict["dbName"],showMsg, showRow)
time.sleep(6) # time.sleep(6)
tdLog.info("start to wait commit notify")
self.getStartCommitNotifyFromTmqsim()
tdLog.info("pkill consume processor") tdLog.info("pkill consume processor")
if (platform.system().lower() == 'windows'): if (platform.system().lower() == 'windows'):
os.system("TASKKILL /F /IM tmq_sim.exe") os.system("TASKKILL /F /IM tmq_sim.exe")
else: else:
os.system('pkill tmq_sim') os.system('pkill tmq_sim')
expectRows = 0 # expectRows = 0
resultList = self.selectConsumeResult(expectRows) # resultList = self.selectConsumeResult(expectRows)
# wait for data ready # wait for data ready
prepareEnvThread.join() prepareEnvThread.join()
......
...@@ -34,6 +34,12 @@ ...@@ -34,6 +34,12 @@
#define MAX_CONSUMER_THREAD_CNT (16) #define MAX_CONSUMER_THREAD_CNT (16)
#define MAX_VGROUP_CNT (32) #define MAX_VGROUP_CNT (32)
typedef enum {
NOTIFY_CMD_START_CONSUM,
NOTIFY_CMD_START_COMMIT,
NOTIFY_CMD_ID_BUTT
}NOTIFY_CMD_ID;
typedef struct { typedef struct {
TdThread thread; TdThread thread;
int32_t consumerId; int32_t consumerId;
...@@ -67,6 +73,8 @@ typedef struct { ...@@ -67,6 +73,8 @@ typedef struct {
int32_t rowsOfPerVgroups[MAX_VGROUP_CNT][2]; // [i][0]: vgroup id, [i][1]: rows of consume int32_t rowsOfPerVgroups[MAX_VGROUP_CNT][2]; // [i][0]: vgroup id, [i][1]: rows of consume
int64_t ts; int64_t ts;
TAOS* taos;
} SThreadInfo; } SThreadInfo;
typedef struct { typedef struct {
...@@ -339,8 +347,37 @@ int queryDB(TAOS* taos, char* command) { ...@@ -339,8 +347,37 @@ int queryDB(TAOS* taos, char* command) {
return 0; return 0;
} }
static void appNothing(void* param, TAOS_RES* res, int32_t numOfRows) {
}
int32_t notifyMainScript(SThreadInfo* pInfo, int32_t cmdId) {
char sqlStr[1024] = {0};
int64_t now = taosGetTimestampMs();
// schema: ts timestamp, consumerid int, consummsgcnt bigint, checkresult int
sprintf(sqlStr, "insert into %s.notifyinfo values (%"PRId64", %d, %d)",
g_stConfInfo.cdbName,
now,
cmdId,
pInfo->consumerId);
taos_query_a(pInfo->taos, sqlStr, appNothing, NULL);
taosFprintfFile(g_fp, "notifyMainScript success, sql: %s\n", sqlStr);
return 0;
}
static int32_t g_once_commit_flag = 0;
static void tmq_commit_cb_print(tmq_t* tmq, int32_t code, void* param) { static void tmq_commit_cb_print(tmq_t* tmq, int32_t code, void* param) {
pError("tmq_commit_cb_print() commit %d\n", code); pError("tmq_commit_cb_print() commit %d\n", code);
if (0 == g_once_commit_flag) {
g_once_commit_flag = 1;
notifyMainScript((SThreadInfo*)param, (int32_t)NOTIFY_CMD_START_COMMIT);
}
taosFprintfFile(g_fp, "tmq_commit_cb_print() be called\n");
} }
void build_consumer(SThreadInfo* pInfo) { void build_consumer(SThreadInfo* pInfo) {
...@@ -353,7 +390,7 @@ void build_consumer(SThreadInfo* pInfo) { ...@@ -353,7 +390,7 @@ void build_consumer(SThreadInfo* pInfo) {
// tmq_conf_set(conf, "td.connect.db", g_stConfInfo.dbName); // tmq_conf_set(conf, "td.connect.db", g_stConfInfo.dbName);
tmq_conf_set_auto_commit_cb(conf, tmq_commit_cb_print, NULL); tmq_conf_set_auto_commit_cb(conf, tmq_commit_cb_print, pInfo);
// tmq_conf_set(conf, "group.id", "cgrp1"); // tmq_conf_set(conf, "group.id", "cgrp1");
for (int32_t i = 0; i < pInfo->numOfKey; i++) { for (int32_t i = 0; i < pInfo->numOfKey; i++) {
...@@ -392,9 +429,6 @@ void build_topic_list(SThreadInfo* pInfo) { ...@@ -392,9 +429,6 @@ void build_topic_list(SThreadInfo* pInfo) {
int32_t saveConsumeResult(SThreadInfo* pInfo) { int32_t saveConsumeResult(SThreadInfo* pInfo) {
char sqlStr[1024] = {0}; char sqlStr[1024] = {0};
TAOS* pConn = taos_connect(NULL, "root", "taosdata", NULL, 0);
assert(pConn != NULL);
int64_t now = taosGetTimestampMs(); int64_t now = taosGetTimestampMs();
// schema: ts timestamp, consumerid int, consummsgcnt bigint, checkresult int // schema: ts timestamp, consumerid int, consummsgcnt bigint, checkresult int
...@@ -404,7 +438,7 @@ int32_t saveConsumeResult(SThreadInfo* pInfo) { ...@@ -404,7 +438,7 @@ int32_t saveConsumeResult(SThreadInfo* pInfo) {
char tmpString[128]; char tmpString[128];
taosFprintfFile(g_fp, "%s, consume id %d result: %s\n", getCurrentTimeString(tmpString), pInfo->consumerId, sqlStr); taosFprintfFile(g_fp, "%s, consume id %d result: %s\n", getCurrentTimeString(tmpString), pInfo->consumerId, sqlStr);
TAOS_RES* pRes = taos_query(pConn, sqlStr); TAOS_RES* pRes = taos_query(pInfo->taos, sqlStr);
if (taos_errno(pRes) != 0) { if (taos_errno(pRes) != 0) {
pError("error in save consumeinfo, reason:%s\n", taos_errstr(pRes)); pError("error in save consumeinfo, reason:%s\n", taos_errstr(pRes));
taos_free_result(pRes); taos_free_result(pRes);
...@@ -413,38 +447,14 @@ int32_t saveConsumeResult(SThreadInfo* pInfo) { ...@@ -413,38 +447,14 @@ int32_t saveConsumeResult(SThreadInfo* pInfo) {
taos_free_result(pRes); taos_free_result(pRes);
#if 0
// vgroups
for (i = 0; i < pInfo->numOfVgroups; i++) {
// schema: ts timestamp, consumerid int, consummsgcnt bigint, checkresult int
sprintf(sqlStr, "insert into %s.vgroup_%d values (%"PRId64", %d, %" PRId64 ", %" PRId64 ", %d)",
g_stConfInfo.cdbName,
now,
pInfo->consumerId,
pInfo->consumeMsgCnt,
pInfo->consumeRowCnt,
pInfo->checkresult);
char tmpString[128];
taosFprintfFile(g_fp, "%s, consume id %d result: %s\n", getCurrentTimeString(tmpString), pInfo->consumerId ,sqlStr);
TAOS_RES* pRes = taos_query(pConn, sqlStr);
if (taos_errno(pRes) != 0) {
pError("error in save consumeinfo, reason:%s\n", taos_errstr(pRes));
taos_free_result(pRes);
exit(-1);
}
taos_free_result(pRes);
}
#endif
return 0; return 0;
} }
void loop_consume(SThreadInfo* pInfo) { void loop_consume(SThreadInfo* pInfo) {
int32_t code; int32_t code;
int32_t once_flag = 0;
int64_t totalMsgs = 0; int64_t totalMsgs = 0;
int64_t totalRows = 0; int64_t totalRows = 0;
...@@ -465,6 +475,11 @@ void loop_consume(SThreadInfo* pInfo) { ...@@ -465,6 +475,11 @@ void loop_consume(SThreadInfo* pInfo) {
totalMsgs++; totalMsgs++;
if (0 == once_flag) {
once_flag = 1;
notifyMainScript(pInfo, NOTIFY_CMD_START_CONSUM);
}
if (totalRows >= pInfo->expectMsgCnt) { if (totalRows >= pInfo->expectMsgCnt) {
char tmpString[128]; char tmpString[128];
taosFprintfFile(g_fp, "%s over than expect rows, so break consume\n", getCurrentTimeString(tmpString)); taosFprintfFile(g_fp, "%s over than expect rows, so break consume\n", getCurrentTimeString(tmpString));
...@@ -489,6 +504,12 @@ void* consumeThreadFunc(void* param) { ...@@ -489,6 +504,12 @@ void* consumeThreadFunc(void* param) {
SThreadInfo* pInfo = (SThreadInfo*)param; SThreadInfo* pInfo = (SThreadInfo*)param;
pInfo->taos = taos_connect(NULL, "root", "taosdata", NULL, 0);
if (pInfo->taos == NULL) {
taosFprintfFile(g_fp, "taos_connect() fail, can not notify and save consume result to main scripte\n");
exit(-1);
}
build_consumer(pInfo); build_consumer(pInfo);
build_topic_list(pInfo); build_topic_list(pInfo);
if ((NULL == pInfo->tmq) || (NULL == pInfo->topicList)) { if ((NULL == pInfo->tmq) || (NULL == pInfo->topicList)) {
...@@ -508,7 +529,6 @@ void* consumeThreadFunc(void* param) { ...@@ -508,7 +529,6 @@ void* consumeThreadFunc(void* param) {
loop_consume(pInfo); loop_consume(pInfo);
if (pInfo->ifManualCommit) { if (pInfo->ifManualCommit) {
taosFprintfFile(g_fp, "tmq_commit() manual commit when consume end.\n");
pPrint("tmq_commit() manual commit when consume end.\n"); pPrint("tmq_commit() manual commit when consume end.\n");
/*tmq_commit(pInfo->tmq, NULL, 0);*/ /*tmq_commit(pInfo->tmq, NULL, 0);*/
tmq_commit_sync(pInfo->tmq, NULL); tmq_commit_sync(pInfo->tmq, NULL);
...@@ -539,6 +559,9 @@ void* consumeThreadFunc(void* param) { ...@@ -539,6 +559,9 @@ void* consumeThreadFunc(void* param) {
taosFprintfFile(g_fp, "vgroups: %04d, rows: %d\n", pInfo->rowsOfPerVgroups[i][0], pInfo->rowsOfPerVgroups[i][1]); taosFprintfFile(g_fp, "vgroups: %04d, rows: %d\n", pInfo->rowsOfPerVgroups[i][0], pInfo->rowsOfPerVgroups[i][1]);
} }
taos_close(pInfo->taos);
pInfo->taos = NULL;
return NULL; return NULL;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册