提交 8b7ca942 编写于 作者: L Liu Jicong

remove tmqTest out of clientTest

上级 70644256
......@@ -135,30 +135,30 @@ void basic_consume_loop(tmq_t *tmq,
fprintf(stderr, "%% Consumer closed\n");
}
void sync_consume_loop(tmq_t *rk,
void sync_consume_loop(tmq_t *tmq,
tmq_list_t *topics) {
static const int MIN_COMMIT_COUNT = 1000;
int msg_count = 0;
tmq_resp_err_t err;
if ((err = tmq_subscribe(rk, topics))) {
if ((err = tmq_subscribe(tmq, topics))) {
fprintf(stderr, "%% Failed to start consuming topics: %s\n", tmq_err2str(err));
return;
}
while (running) {
tmq_message_t *tmqmessage = tmq_consumer_poll(rk, 500);
tmq_message_t *tmqmessage = tmq_consumer_poll(tmq, 500);
if (tmqmessage) {
msg_process(tmqmessage);
tmq_message_destroy(tmqmessage);
if ((++msg_count % MIN_COMMIT_COUNT) == 0)
tmq_commit(rk, NULL, 0);
tmq_commit(tmq, NULL, 0);
}
}
err = tmq_consumer_close(rk);
err = tmq_consumer_close(tmq);
if (err)
fprintf(stderr, "%% Failed to close consumer: %s\n", tmq_err2str(err));
else
......
......@@ -105,8 +105,9 @@ TAOS *taos_connect_internal(const char *ip, const char *user, const char *pass,
pthread_mutex_lock(&appInfo.mutex);
pInst = taosHashGet(appInfo.pInstMap, key, strlen(key));
SAppInstInfo* p = NULL;
if (pInst == NULL) {
SAppInstInfo* p = calloc(1, sizeof(struct SAppInstInfo));
p = calloc(1, sizeof(struct SAppInstInfo));
p->mgmtEp = epSet;
p->pTransporter = openTransporter(user, secretEncrypt, tsNumOfCores);
/*p->pAppHbMgr = appHbMgrInit(p, key);*/
......
......@@ -5,14 +5,26 @@ MESSAGE(STATUS "build parser unit test")
SET(CMAKE_CXX_STANDARD 11)
AUX_SOURCE_DIRECTORY(${CMAKE_CURRENT_SOURCE_DIR} SOURCE_LIST)
ADD_EXECUTABLE(clientTest ${SOURCE_LIST})
ADD_EXECUTABLE(clientTest clientTests.cpp)
TARGET_LINK_LIBRARIES(
clientTest
PUBLIC os util common transport parser catalog scheduler function gtest taos qcom
)
ADD_EXECUTABLE(tmqTest tmqTest.cpp)
TARGET_LINK_LIBRARIES(
tmqTest
PUBLIC os util common transport parser catalog scheduler function gtest taos qcom
)
TARGET_INCLUDE_DIRECTORIES(
clientTest
PUBLIC "${CMAKE_SOURCE_DIR}/include/libs/client/"
PRIVATE "${CMAKE_SOURCE_DIR}/source/libs/client/inc"
)
TARGET_INCLUDE_DIRECTORIES(
tmqTest
PUBLIC "${CMAKE_SOURCE_DIR}/include/libs/client/"
PRIVATE "${CMAKE_SOURCE_DIR}/source/libs/client/inc"
)
......@@ -564,119 +564,6 @@ TEST(testCase, insert_test) {
}
#endif
TEST(testCase, create_topic_ctb_Test) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
assert(pConn != NULL);
TAOS_RES* pRes = taos_query(pConn, "use abc1");
if (taos_errno(pRes) != 0) {
printf("error in use db, reason:%s\n", taos_errstr(pRes));
}
//taos_free_result(pRes);
TAOS_FIELD* pFields = taos_fetch_fields(pRes);
ASSERT_TRUE(pFields == nullptr);
int32_t numOfFields = taos_num_fields(pRes);
ASSERT_EQ(numOfFields, 0);
taos_free_result(pRes);
char* sql = "select * from tu";
pRes = tmq_create_topic(pConn, "test_ctb_topic_1", sql, strlen(sql));
taos_free_result(pRes);
taos_close(pConn);
}
TEST(testCase, create_topic_stb_Test) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
assert(pConn != NULL);
TAOS_RES* pRes = taos_query(pConn, "use abc1");
if (taos_errno(pRes) != 0) {
printf("error in use db, reason:%s\n", taos_errstr(pRes));
}
//taos_free_result(pRes);
TAOS_FIELD* pFields = taos_fetch_fields(pRes);
ASSERT_TRUE(pFields == nullptr);
int32_t numOfFields = taos_num_fields(pRes);
ASSERT_EQ(numOfFields, 0);
taos_free_result(pRes);
char* sql = "select * from st1";
pRes = tmq_create_topic(pConn, "test_stb_topic_1", sql, strlen(sql));
taos_free_result(pRes);
taos_close(pConn);
}
#if 0
TEST(testCase, tmq_subscribe_ctb_Test) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
assert(pConn != NULL);
TAOS_RES* pRes = taos_query(pConn, "use abc1");
if (taos_errno(pRes) != 0) {
printf("error in use db, reason:%s\n", taos_errstr(pRes));
}
taos_free_result(pRes);
tmq_conf_t* conf = tmq_conf_new();
tmq_conf_set(conf, "group.id", "tg1");
tmq_t* tmq = tmq_consumer_new(pConn, conf, NULL, 0);
tmq_list_t* topic_list = tmq_list_new();
tmq_list_append(topic_list, "test_ctb_topic_1");
tmq_subscribe(tmq, topic_list);
while (1) {
tmq_message_t* msg = tmq_consumer_poll(tmq, 1000);
tmq_message_destroy(msg);
//printf("get msg\n");
//if (msg == NULL) break;
}
}
#endif
TEST(testCase, tmq_subscribe_stb_Test) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
assert(pConn != NULL);
TAOS_RES* pRes = taos_query(pConn, "use abc1");
if (taos_errno(pRes) != 0) {
printf("error in use db, reason:%s\n", taos_errstr(pRes));
}
taos_free_result(pRes);
tmq_conf_t* conf = tmq_conf_new();
tmq_conf_set(conf, "group.id", "tg2");
tmq_t* tmq = tmq_consumer_new(pConn, conf, NULL, 0);
tmq_list_t* topic_list = tmq_list_new();
tmq_list_append(topic_list, "test_stb_topic_1");
tmq_subscribe(tmq, topic_list);
int cnt = 1;
while (1) {
tmq_message_t* msg = tmq_consumer_poll(tmq, 1000);
if (msg == NULL) continue;
tmqShowMsg(msg);
if (cnt++ % 10 == 0){
tmq_commit(tmq, NULL, 0);
}
//tmq_commit(tmq, NULL, 0);
tmq_message_destroy(msg);
//printf("get msg\n");
}
}
TEST(testCase, tmq_consume_Test) {
}
TEST(testCase, tmq_commit_TEST) {
}
#if 0
TEST(testCase, projection_query_tables) {
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <gtest/gtest.h>
#include <taoserror.h>
#include <tglobal.h>
#include <iostream>
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wwrite-strings"
#pragma GCC diagnostic ignored "-Wunused-function"
#pragma GCC diagnostic ignored "-Wunused-variable"
#pragma GCC diagnostic ignored "-Wsign-compare"
#include "../inc/clientInt.h"
#include "taos.h"
int main(int argc, char** argv) {
testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}
TEST(testCase, driverInit_Test) {
taosInitGlobalCfg();
// taos_init();
}
TEST(testCase, create_topic_ctb_Test) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
assert(pConn != NULL);
TAOS_RES* pRes = taos_query(pConn, "use abc1");
if (taos_errno(pRes) != 0) {
printf("error in use db, reason:%s\n", taos_errstr(pRes));
}
//taos_free_result(pRes);
TAOS_FIELD* pFields = taos_fetch_fields(pRes);
ASSERT_TRUE(pFields == nullptr);
int32_t numOfFields = taos_num_fields(pRes);
ASSERT_EQ(numOfFields, 0);
taos_free_result(pRes);
char* sql = "select * from tu";
pRes = tmq_create_topic(pConn, "test_ctb_topic_1", sql, strlen(sql));
taos_free_result(pRes);
taos_close(pConn);
}
TEST(testCase, create_topic_stb_Test) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
assert(pConn != NULL);
TAOS_RES* pRes = taos_query(pConn, "use abc1");
if (taos_errno(pRes) != 0) {
printf("error in use db, reason:%s\n", taos_errstr(pRes));
}
//taos_free_result(pRes);
TAOS_FIELD* pFields = taos_fetch_fields(pRes);
ASSERT_TRUE(pFields == nullptr);
int32_t numOfFields = taos_num_fields(pRes);
ASSERT_EQ(numOfFields, 0);
taos_free_result(pRes);
char* sql = "select * from st1";
pRes = tmq_create_topic(pConn, "test_stb_topic_1", sql, strlen(sql));
taos_free_result(pRes);
taos_close(pConn);
}
#if 0
TEST(testCase, tmq_subscribe_ctb_Test) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
assert(pConn != NULL);
TAOS_RES* pRes = taos_query(pConn, "use abc1");
if (taos_errno(pRes) != 0) {
printf("error in use db, reason:%s\n", taos_errstr(pRes));
}
taos_free_result(pRes);
tmq_conf_t* conf = tmq_conf_new();
tmq_conf_set(conf, "group.id", "tg1");
tmq_t* tmq = tmq_consumer_new(pConn, conf, NULL, 0);
tmq_list_t* topic_list = tmq_list_new();
tmq_list_append(topic_list, "test_ctb_topic_1");
tmq_subscribe(tmq, topic_list);
while (1) {
tmq_message_t* msg = tmq_consumer_poll(tmq, 1000);
tmq_message_destroy(msg);
//printf("get msg\n");
//if (msg == NULL) break;
}
}
TEST(testCase, tmq_subscribe_stb_Test) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
assert(pConn != NULL);
TAOS_RES* pRes = taos_query(pConn, "use abc1");
if (taos_errno(pRes) != 0) {
printf("error in use db, reason:%s\n", taos_errstr(pRes));
}
taos_free_result(pRes);
tmq_conf_t* conf = tmq_conf_new();
tmq_conf_set(conf, "group.id", "tg2");
tmq_t* tmq = tmq_consumer_new(pConn, conf, NULL, 0);
tmq_list_t* topic_list = tmq_list_new();
tmq_list_append(topic_list, "test_stb_topic_1");
tmq_subscribe(tmq, topic_list);
int cnt = 1;
while (1) {
tmq_message_t* msg = tmq_consumer_poll(tmq, 1000);
if (msg == NULL) continue;
tmqShowMsg(msg);
if (cnt++ % 10 == 0){
tmq_commit(tmq, NULL, 0);
}
//tmq_commit(tmq, NULL, 0);
tmq_message_destroy(msg);
//printf("get msg\n");
}
}
TEST(testCase, tmq_consume_Test) {
}
TEST(testCase, tmq_commit_Test) {
}
#endif
......@@ -617,7 +617,7 @@ typedef struct SMqTopicObj {
int64_t createTime;
int64_t updateTime;
uint64_t uid;
uint64_t dbUid;
int64_t dbUid;
int32_t version;
SRWLatch lock;
int32_t sqlLen;
......
......@@ -561,7 +561,7 @@ int32_t qwGetResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen, void
QW_ERR_RET(qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_SUCCEED));
QW_ERR_RET(qwMallocFetchRsp(len, &rsp));
QW_ERR_RET(qwMallocFetchRsp(len, &rsp));
*rspMsg = rsp;
*dataLen = 0;
......@@ -573,7 +573,7 @@ int32_t qwGetResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen, void
QW_TASK_DLOG("no res data in sink, need response later, queryEnd:%d", queryEnd);
return TSDB_CODE_SUCCESS;
}
}
// Got data from sink
......
......@@ -1409,7 +1409,7 @@ int32_t schedulerAsyncExecJob(void *transport, SArray *pNodeList, SQueryDag* pDa
}
int32_t schedulerConvertDagToTaskList(SQueryDag* pDag, SArray **pTasks) {
if (NULL == pDag || pDag->numOfSubplans <= 0 || taosArrayGetSize(pDag->pSubplans) <= 0) {
if (NULL == pDag || pDag->numOfSubplans <= 0 || taosArrayGetSize(pDag->pSubplans) == 0) {
SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
}
......@@ -1454,7 +1454,6 @@ int32_t schedulerConvertDagToTaskList(SQueryDag* pDag, SArray **pTasks) {
}
SSubQueryMsg* pMsg = calloc(1, msgSize);
memcpy(pMsg->msg, msg, msgLen);
pMsg->header.vgId = tInfo.addr.nodeId;
......@@ -1464,6 +1463,7 @@ int32_t schedulerConvertDagToTaskList(SQueryDag* pDag, SArray **pTasks) {
pMsg->taskType = TASK_TYPE_PERSISTENT;
pMsg->phyLen = msgLen;
pMsg->sqlLen = 0;
memcpy(pMsg->msg, msg, msgLen);
/*memcpy(pMsg->msg, ((SSubQueryMsg*)msg)->msg, msgLen);*/
tInfo.msg = pMsg;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册