提交 c055d1e3 编写于 作者: P plum-lihui

test:modify tmq consumer process

上级 778ade6f
...@@ -131,10 +131,10 @@ class TDTestCase: ...@@ -131,10 +131,10 @@ class TDTestCase:
tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot']) tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot'])
# time.sleep(3) # time.sleep(3)
tmqCom.getStartCommitNotifyFromTmqsim() tmqCom.getStartConsumeNotifyFromTmqsim()
tdLog.info("================= restart dnode ===========================") tdLog.info("================= restart dnode ===========================")
tdDnodes.stop(1) tdDnodes.stoptaosd(1)
tdDnodes.start(1) tdDnodes.starttaosd(1)
# time.sleep(3) # time.sleep(3)
tdLog.info(" restart taosd end and wait to check consume result") tdLog.info(" restart taosd end and wait to check consume result")
...@@ -250,10 +250,10 @@ class TDTestCase: ...@@ -250,10 +250,10 @@ class TDTestCase:
tdLog.info("start consume processor") tdLog.info("start consume processor")
tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot']) tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot'])
tmqCom.getStartCommitNotifyFromTmqsim() tmqCom.getStartConsumeNotifyFromTmqsim()
tdLog.info("================= restart dnode ===========================") tdLog.info("================= restart dnode ===========================")
tdDnodes.stop(1) tdDnodes.stoptaosd(1)
tdDnodes.start(1) tdDnodes.starttaosd(1)
# time.sleep(3) # time.sleep(3)
tdLog.info("create some new child table and insert data ") tdLog.info("create some new child table and insert data ")
......
...@@ -221,7 +221,7 @@ python3 ./test.py -f 7-tmq/tmqDropStb.py ...@@ -221,7 +221,7 @@ python3 ./test.py -f 7-tmq/tmqDropStb.py
python3 ./test.py -f 7-tmq/tmqDropStbCtb.py python3 ./test.py -f 7-tmq/tmqDropStbCtb.py
python3 ./test.py -f 7-tmq/tmqDropNtb.py python3 ./test.py -f 7-tmq/tmqDropNtb.py
python3 ./test.py -f 7-tmq/tmqUdf.py python3 ./test.py -f 7-tmq/tmqUdf.py
# python3 ./test.py -f 7-tmq/tmqUdf-multCtb-snapshot0.py python3 ./test.py -f 7-tmq/tmqUdf-multCtb-snapshot0.py
python3 ./test.py -f 7-tmq/tmqUdf-multCtb-snapshot1.py python3 ./test.py -f 7-tmq/tmqUdf-multCtb-snapshot1.py
python3 ./test.py -f 7-tmq/stbTagFilter-1ctb.py python3 ./test.py -f 7-tmq/stbTagFilter-1ctb.py
......
...@@ -352,6 +352,29 @@ void ltrim(char* str) { ...@@ -352,6 +352,29 @@ void ltrim(char* str) {
// return str; // return str;
} }
int queryDB(TAOS* taos, char* command) {
int retryCnt = 10;
int code;
TAOS_RES* pRes;
while (retryCnt--) {
pRes = taos_query(taos, command);
code = taos_errno(pRes);
if (code != 0) {
taosSsleep(1);
taos_free_result(pRes);
continue;
}
taos_free_result(pRes);
return 0;
}
pError("failed to reason:%s, sql: %s", tstrerror(code), command);
taos_free_result(pRes);
return -1;
}
void addRowsToVgroupId(SThreadInfo* pInfo, int32_t vgroupId, int32_t rows) { void addRowsToVgroupId(SThreadInfo* pInfo, int32_t vgroupId, int32_t rows) {
int32_t i; int32_t i;
for (i = 0; i < pInfo->numOfVgroups; i++) { for (i = 0; i < pInfo->numOfVgroups; i++) {
...@@ -374,30 +397,49 @@ void addRowsToVgroupId(SThreadInfo* pInfo, int32_t vgroupId, int32_t rows) { ...@@ -374,30 +397,49 @@ void addRowsToVgroupId(SThreadInfo* pInfo, int32_t vgroupId, int32_t rows) {
} }
} }
TAOS* createNewTaosConnect() {
TAOS* taos = NULL;
int32_t retryCnt = 10;
while (retryCnt--) {
TAOS* taos = taos_connect(NULL, "root", "taosdata", NULL, 0);
if (NULL != taos) {
return taos;
}
taosSsleep(1);
}
taosFprintfFile(g_fp, "taos_connect() fail\n");
return NULL;
}
int32_t saveConsumeContentToTbl(SThreadInfo* pInfo, char* buf) { int32_t saveConsumeContentToTbl(SThreadInfo* pInfo, char* buf) {
char sqlStr[1100] = {0}; char sqlStr[1100] = {0};
if (strlen(buf) > 1024) { if (strlen(buf) > 1024) {
taosFprintfFile(g_fp, "The length of one row[%d] is overflow 1024\n", strlen(buf)); taosFprintfFile(g_fp, "The length of one row[%d] is overflow 1024\n", strlen(buf));
taosCloseFile(&g_fp); taosCloseFile(&g_fp);
exit(-1); return -1;
} }
TAOS* pConn = taos_connect(NULL, "root", "taosdata", NULL, 0); TAOS* pConn = taos_connect(NULL, "root", "taosdata", NULL, 0);
assert(pConn != NULL); if (pConn == NULL) {
taosFprintfFile(g_fp, "taos_connect() fail, can not save consume result to main script\n");
return -1;
}
sprintf(sqlStr, "insert into %s.content_%d values (%" PRId64 ", \'%s\')", g_stConfInfo.cdbName, pInfo->consumerId, sprintf(sqlStr, "insert into %s.content_%d values (%" PRId64 ", \'%s\')", g_stConfInfo.cdbName, pInfo->consumerId,
pInfo->ts++, buf); pInfo->ts++, buf);
TAOS_RES* pRes = taos_query(pConn, sqlStr); int retCode = queryDB(pConn, sqlStr);
if (taos_errno(pRes) != 0) { if (retCode != 0) {
pError("error in insert consume result, reason:%s\n", taos_errstr(pRes)); taosFprintfFile(g_fp, "error in save consume content\n");
taosFprintfFile(g_fp, "error in insert consume result, reason:%s\n", taos_errstr(pRes));
taosCloseFile(&g_fp); taosCloseFile(&g_fp);
taos_free_result(pRes); taos_close(pConn);
exit(-1); exit(-1);
} }
taos_free_result(pRes); taos_close(pConn);
return 0; return 0;
} }
...@@ -591,15 +633,12 @@ static int32_t meta_msg_process(TAOS_RES* msg, SThreadInfo* pInfo, int32_t msgIn ...@@ -591,15 +633,12 @@ static int32_t meta_msg_process(TAOS_RES* msg, SThreadInfo* pInfo, int32_t msgIn
int32_t code = tmq_get_raw_meta(msg, &raw); int32_t code = tmq_get_raw_meta(msg, &raw);
if(code == TSDB_CODE_SUCCESS){ if(code == TSDB_CODE_SUCCESS){
TAOS_RES* pRes = taos_query(pInfo->taos, "use metadb"); int retCode = queryDB(pInfo->taos, "use metadb");
if (taos_errno(pRes) != 0) { if (retCode != 0) {
pError("error when use metadb, reason:%s\n", taos_errstr(pRes)); taosFprintfFile(g_fp, "error when use metadb\n");
taosFprintfFile(g_fp, "error when use metadb, reason:%s\n", taos_errstr(pRes));
taosCloseFile(&g_fp); taosCloseFile(&g_fp);
taos_free_result(pRes);
exit(-1); exit(-1);
} }
taos_free_result(pRes);
taosFprintfFile(g_fp, "raw:%p\n", &raw); taosFprintfFile(g_fp, "raw:%p\n", &raw);
taos_write_raw_meta(pInfo->taos, raw); taos_write_raw_meta(pInfo->taos, raw);
...@@ -618,19 +657,6 @@ static int32_t meta_msg_process(TAOS_RES* msg, SThreadInfo* pInfo, int32_t msgIn ...@@ -618,19 +657,6 @@ static int32_t meta_msg_process(TAOS_RES* msg, SThreadInfo* pInfo, int32_t msgIn
return totalRows; return totalRows;
} }
int queryDB(TAOS* taos, char* command) {
TAOS_RES* pRes = taos_query(taos, command);
int code = taos_errno(pRes);
if (code != 0) {
pError("failed to reason:%s, sql: %s", tstrerror(code), command);
taos_free_result(pRes);
return -1;
}
taos_free_result(pRes);
return 0;
}
static void appNothing(void* param, TAOS_RES* res, int32_t numOfRows) {} static void appNothing(void* param, TAOS_RES* res, int32_t numOfRows) {}
int32_t notifyMainScript(SThreadInfo* pInfo, int32_t cmdId) { int32_t notifyMainScript(SThreadInfo* pInfo, int32_t cmdId) {
...@@ -720,15 +746,12 @@ int32_t saveConsumeResult(SThreadInfo* pInfo) { ...@@ -720,15 +746,12 @@ int32_t saveConsumeResult(SThreadInfo* pInfo) {
char tmpString[128]; char tmpString[128];
taosFprintfFile(g_fp, "%s, consume id %d result: %s\n", getCurrentTimeString(tmpString), pInfo->consumerId, sqlStr); taosFprintfFile(g_fp, "%s, consume id %d result: %s\n", getCurrentTimeString(tmpString), pInfo->consumerId, sqlStr);
TAOS_RES* pRes = taos_query(pInfo->taos, sqlStr); int retCode = queryDB(pInfo->taos, sqlStr);
if (taos_errno(pRes) != 0) { if (retCode != 0) {
pError("error in save consumeinfo, reason:%s\n", taos_errstr(pRes)); taosFprintfFile(g_fp, "consume id %d error in save consume result\n", pInfo->consumerId);
taos_free_result(pRes); return -1;
exit(-1);
} }
taos_free_result(pRes);
return 0; return 0;
} }
...@@ -823,18 +846,18 @@ void loop_consume(SThreadInfo* pInfo) { ...@@ -823,18 +846,18 @@ void loop_consume(SThreadInfo* pInfo) {
void* consumeThreadFunc(void* param) { void* consumeThreadFunc(void* param) {
SThreadInfo* pInfo = (SThreadInfo*)param; SThreadInfo* pInfo = (SThreadInfo*)param;
pInfo->taos = taos_connect(NULL, "root", "taosdata", NULL, 0); pInfo->taos = createNewTaosConnect();
if (pInfo->taos == NULL) { if (pInfo->taos == NULL) {
taosFprintfFile(g_fp, "taos_connect() fail, can not notify and save consume result to main scripte\n"); taosFprintfFile(g_fp, "taos_connect() fail, can not notify and save consume result to main scripte\n");
ASSERT(0);
return NULL; return NULL;
} }
build_consumer(pInfo); build_consumer(pInfo);
build_topic_list(pInfo); build_topic_list(pInfo);
if ((NULL == pInfo->tmq) || (NULL == pInfo->topicList)) { if ((NULL == pInfo->tmq) || (NULL == pInfo->topicList)) {
taosFprintfFile(g_fp, "create consumer fail! tmq is null or topicList is null\n"); taosFprintfFile(g_fp, "create consumer fail! tmq is null or topicList is null\n");
assert(0); taos_close(pInfo->taos);
pInfo->taos = NULL;
return NULL; return NULL;
} }
...@@ -842,7 +865,8 @@ void* consumeThreadFunc(void* param) { ...@@ -842,7 +865,8 @@ void* consumeThreadFunc(void* param) {
if (err != 0) { if (err != 0) {
pError("tmq_subscribe() fail, reason: %s\n", tmq_err2str(err)); pError("tmq_subscribe() fail, reason: %s\n", tmq_err2str(err));
taosFprintfFile(g_fp, "tmq_subscribe() fail! reason: %s\n", tmq_err2str(err)); taosFprintfFile(g_fp, "tmq_subscribe() fail! reason: %s\n", tmq_err2str(err));
assert(0); taos_close(pInfo->taos);
pInfo->taos = NULL;
return NULL; return NULL;
} }
...@@ -926,17 +950,20 @@ void parseConsumeInfo() { ...@@ -926,17 +950,20 @@ void parseConsumeInfo() {
int32_t getConsumeInfo() { int32_t getConsumeInfo() {
char sqlStr[1024] = {0}; char sqlStr[1024] = {0};
TAOS* pConn = taos_connect(NULL, "root", "taosdata", NULL, 0); TAOS* pConn = createNewTaosConnect();
assert(pConn != NULL); if (pConn == NULL) {
taosFprintfFile(g_fp, "taos_connect() fail, can not get consume info for start consumer\n");
return -1;
}
sprintf(sqlStr, "select * from %s.consumeinfo", g_stConfInfo.cdbName); sprintf(sqlStr, "select * from %s.consumeinfo", g_stConfInfo.cdbName);
TAOS_RES* pRes = taos_query(pConn, sqlStr); TAOS_RES *pRes = taos_query(pConn, sqlStr);
if (taos_errno(pRes) != 0) { if (taos_errno(pRes) != 0) {
pError("error in get consumeinfo, reason:%s\n", taos_errstr(pRes)); taosFprintfFile(g_fp, "error in get consumeinfo for %s\n", taos_errstr(pRes));
taosFprintfFile(g_fp, "error in get consumeinfo, reason:%s\n", taos_errstr(pRes));
taosCloseFile(&g_fp); taosCloseFile(&g_fp);
taos_free_result(pRes); taos_free_result(pRes);
exit(-1); taos_close(pConn);
return -1;
} }
TAOS_ROW row = NULL; TAOS_ROW row = NULL;
...@@ -981,6 +1008,7 @@ int32_t getConsumeInfo() { ...@@ -981,6 +1008,7 @@ int32_t getConsumeInfo() {
taos_free_result(pRes); taos_free_result(pRes);
parseConsumeInfo(); parseConsumeInfo();
taos_close(pConn);
return 0; return 0;
} }
...@@ -1123,7 +1151,6 @@ void* ombConsumeThreadFunc(void* param) { ...@@ -1123,7 +1151,6 @@ void* ombConsumeThreadFunc(void* param) {
if ((NULL == pInfo->tmq) || (NULL == pInfo->topicList)) { if ((NULL == pInfo->tmq) || (NULL == pInfo->topicList)) {
taosFprintfFile(g_fp, "create consumer fail! tmq is null or topicList is null\n"); taosFprintfFile(g_fp, "create consumer fail! tmq is null or topicList is null\n");
assert(0);
return NULL; return NULL;
} }
...@@ -1131,7 +1158,6 @@ void* ombConsumeThreadFunc(void* param) { ...@@ -1131,7 +1158,6 @@ void* ombConsumeThreadFunc(void* param) {
if (err != 0) { if (err != 0) {
pError("tmq_subscribe() fail, reason: %s\n", tmq_err2str(err)); pError("tmq_subscribe() fail, reason: %s\n", tmq_err2str(err));
taosFprintfFile(g_fp, "tmq_subscribe() fail! reason: %s\n", tmq_err2str(err)); taosFprintfFile(g_fp, "tmq_subscribe() fail! reason: %s\n", tmq_err2str(err));
assert(0);
return NULL; return NULL;
} }
...@@ -1181,9 +1207,9 @@ static int queryDbExec(TAOS *taos, char *command, QUERY_TYPE type) { ...@@ -1181,9 +1207,9 @@ static int queryDbExec(TAOS *taos, char *command, QUERY_TYPE type) {
void* ombProduceThreadFunc(void* param) { void* ombProduceThreadFunc(void* param) {
SThreadInfo* pInfo = (SThreadInfo*)param; SThreadInfo* pInfo = (SThreadInfo*)param;
pInfo->taos = taos_connect(NULL, "root", "taosdata", NULL, 0); pInfo->taos = createNewTaosConnect();
if (pInfo->taos == NULL) { if (pInfo->taos == NULL) {
printf("taos_connect() fail\n"); taosFprintfFile(g_fp, "taos_connect() fail, can not start producers!\n");
return NULL; return NULL;
} }
...@@ -1200,6 +1226,8 @@ void* ombProduceThreadFunc(void* param) { ...@@ -1200,6 +1226,8 @@ void* ombProduceThreadFunc(void* param) {
char* sqlBuf = taosMemoryMalloc(MAX_SQL_LEN); char* sqlBuf = taosMemoryMalloc(MAX_SQL_LEN);
if (NULL == sqlBuf) { if (NULL == sqlBuf) {
printf("malloc fail for sqlBuf\n"); printf("malloc fail for sqlBuf\n");
taos_close(pInfo->taos);
pInfo->taos = NULL;
return NULL; return NULL;
} }
...@@ -1232,6 +1260,8 @@ void* ombProduceThreadFunc(void* param) { ...@@ -1232,6 +1260,8 @@ void* ombProduceThreadFunc(void* param) {
int64_t affectedRows = queryDbExec(pInfo->taos, sqlBuf, INSERT_TYPE); int64_t affectedRows = queryDbExec(pInfo->taos, sqlBuf, INSERT_TYPE);
if (affectedRows < 0) { if (affectedRows < 0) {
taos_close(pInfo->taos);
pInfo->taos = NULL;
return NULL; return NULL;
} }
...@@ -1266,6 +1296,8 @@ void* ombProduceThreadFunc(void* param) { ...@@ -1266,6 +1296,8 @@ void* ombProduceThreadFunc(void* param) {
} }
printf("affectedRowsTotal: %"PRId64"\n", affectedRowsTotal); printf("affectedRowsTotal: %"PRId64"\n", affectedRowsTotal);
taos_close(pInfo->taos);
pInfo->taos = NULL;
return NULL; return NULL;
} }
...@@ -1301,10 +1333,9 @@ void startOmbConsume() { ...@@ -1301,10 +1333,9 @@ void startOmbConsume() {
taosThreadAttrSetDetachState(&thattr, PTHREAD_CREATE_JOINABLE); taosThreadAttrSetDetachState(&thattr, PTHREAD_CREATE_JOINABLE);
if (0 != g_stConfInfo.producers) { if (0 != g_stConfInfo.producers) {
TAOS* taos = taos_connect(NULL, "root", "taosdata", NULL, 0); TAOS* taos = createNewTaosConnect();
if (taos == NULL) { if (taos == NULL) {
taosFprintfFile(g_fp, "taos_connect() fail, can not notify and save consume result to main scripte\n"); taosFprintfFile(g_fp, "taos_connect() fail, can not create db, stbl, ctbl, topic!\n");
ASSERT(0);
return ; return ;
} }
...@@ -1357,9 +1388,11 @@ void startOmbConsume() { ...@@ -1357,9 +1388,11 @@ void startOmbConsume() {
taosFprintfFile(g_fp, "==== close tmqlog ====\n"); taosFprintfFile(g_fp, "==== close tmqlog ====\n");
taosCloseFile(&g_fp); taosCloseFile(&g_fp);
taos_close(taos);
return; return;
} }
taos_close(taos);
} }
// pthread_create one thread to consume // pthread_create one thread to consume
...@@ -1418,7 +1451,11 @@ int main(int32_t argc, char* argv[]) { ...@@ -1418,7 +1451,11 @@ int main(int32_t argc, char* argv[]) {
return 0; return 0;
} }
getConsumeInfo(); int32_t retCode = getConsumeInfo();
if (0 != retCode) {
return -1;
}
saveConfigToLogFile(); saveConfigToLogFile();
tmqSetSignalHandle(); tmqSetSignalHandle();
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册