未验证 提交 2536a00b 编写于 作者: L Liu Jicong 提交者: GitHub

Merge pull request #13389 from taosdata/feature/tq

fix(query): double free
...@@ -13,7 +13,7 @@ IF (TD_LINUX) ...@@ -13,7 +13,7 @@ IF (TD_LINUX)
#TARGET_LINK_LIBRARIES(epoll taos_static trpc tutil pthread lua) #TARGET_LINK_LIBRARIES(epoll taos_static trpc tutil pthread lua)
add_executable(tmq "") add_executable(tmq "")
add_executable(tstream "") add_executable(stream_demo "")
add_executable(demoapi "") add_executable(demoapi "")
target_sources(tmq target_sources(tmq
...@@ -21,9 +21,9 @@ IF (TD_LINUX) ...@@ -21,9 +21,9 @@ IF (TD_LINUX)
"tmq.c" "tmq.c"
) )
target_sources(tstream target_sources(stream_demo
PRIVATE PRIVATE
"tstream.c" "stream_demo.c"
) )
target_sources(demoapi target_sources(demoapi
...@@ -35,7 +35,7 @@ IF (TD_LINUX) ...@@ -35,7 +35,7 @@ IF (TD_LINUX)
taos_static taos_static
) )
target_link_libraries(tstream target_link_libraries(stream_demo
taos_static taos_static
) )
...@@ -48,7 +48,7 @@ IF (TD_LINUX) ...@@ -48,7 +48,7 @@ IF (TD_LINUX)
PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc" PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc"
) )
target_include_directories(tstream target_include_directories(stream_demo
PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc" PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc"
) )
...@@ -59,7 +59,7 @@ IF (TD_LINUX) ...@@ -59,7 +59,7 @@ IF (TD_LINUX)
) )
SET_TARGET_PROPERTIES(tmq PROPERTIES OUTPUT_NAME tmq) SET_TARGET_PROPERTIES(tmq PROPERTIES OUTPUT_NAME tmq)
SET_TARGET_PROPERTIES(tstream PROPERTIES OUTPUT_NAME tstream) SET_TARGET_PROPERTIES(stream_demo PROPERTIES OUTPUT_NAME stream_demo)
SET_TARGET_PROPERTIES(demoapi PROPERTIES OUTPUT_NAME demoapi) SET_TARGET_PROPERTIES(demoapi PROPERTIES OUTPUT_NAME demoapi)
ENDIF () ENDIF ()
IF (TD_DARWIN) IF (TD_DARWIN)
......
// sample code for TDengine subscribe/consume API
// to compile: gcc -o subscribe subscribe.c -ltaos
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include "../../../include/client/taos.h" // include TDengine header file
int nTotalRows;
void print_result(TAOS_RES* res, int blockFetch) {
TAOS_ROW row = NULL;
int num_fields = taos_num_fields(res);
TAOS_FIELD* fields = taos_fetch_fields(res);
int nRows = 0;
if (blockFetch) {
nRows = taos_fetch_block(res, &row);
//for (int i = 0; i < nRows; i++) {
// taos_print_row(buf, row + i, fields, num_fields);
// puts(buf);
//}
} else {
while ((row = taos_fetch_row(res))) {
char buf[4096] = {0};
taos_print_row(buf, row, fields, num_fields);
puts(buf);
nRows++;
}
}
nTotalRows += nRows;
printf("%d rows consumed.\n", nRows);
}
void subscribe_callback(TAOS_SUB* tsub, TAOS_RES *res, void* param, int code) {
print_result(res, *(int*)param);
}
void check_row_count(int line, TAOS_RES* res, int expected) {
int actual = 0;
TAOS_ROW row;
while ((row = taos_fetch_row(res))) {
actual++;
}
if (actual != expected) {
printf("line %d: row count mismatch, expected: %d, actual: %d\n", line, expected, actual);
} else {
printf("line %d: %d rows consumed as expected\n", line, actual);
}
}
void do_query(TAOS* taos, const char* sql) {
TAOS_RES* res = taos_query(taos, sql);
taos_free_result(res);
}
void run_test(TAOS* taos) {
do_query(taos, "drop database if exists test;");
usleep(100000);
do_query(taos, "create database test;");
usleep(100000);
do_query(taos, "use test;");
usleep(100000);
do_query(taos, "create table meters(ts timestamp, a int) tags(area int);");
do_query(taos, "create table t0 using meters tags(0);");
do_query(taos, "create table t1 using meters tags(1);");
do_query(taos, "create table t2 using meters tags(2);");
do_query(taos, "create table t3 using meters tags(3);");
do_query(taos, "create table t4 using meters tags(4);");
do_query(taos, "create table t5 using meters tags(5);");
do_query(taos, "create table t6 using meters tags(6);");
do_query(taos, "create table t7 using meters tags(7);");
do_query(taos, "create table t8 using meters tags(8);");
do_query(taos, "create table t9 using meters tags(9);");
do_query(taos, "insert into t0 values('2020-01-01 00:00:00.000', 0);");
do_query(taos, "insert into t0 values('2020-01-01 00:01:00.000', 0);");
do_query(taos, "insert into t0 values('2020-01-01 00:02:00.000', 0);");
do_query(taos, "insert into t1 values('2020-01-01 00:00:00.000', 0);");
do_query(taos, "insert into t1 values('2020-01-01 00:01:00.000', 0);");
do_query(taos, "insert into t1 values('2020-01-01 00:02:00.000', 0);");
do_query(taos, "insert into t1 values('2020-01-01 00:03:00.000', 0);");
do_query(taos, "insert into t2 values('2020-01-01 00:00:00.000', 0);");
do_query(taos, "insert into t2 values('2020-01-01 00:01:00.000', 0);");
do_query(taos, "insert into t2 values('2020-01-01 00:01:01.000', 0);");
do_query(taos, "insert into t2 values('2020-01-01 00:01:02.000', 0);");
do_query(taos, "insert into t3 values('2020-01-01 00:01:02.000', 0);");
do_query(taos, "insert into t4 values('2020-01-01 00:01:02.000', 0);");
do_query(taos, "insert into t5 values('2020-01-01 00:01:02.000', 0);");
do_query(taos, "insert into t6 values('2020-01-01 00:01:02.000', 0);");
do_query(taos, "insert into t7 values('2020-01-01 00:01:02.000', 0);");
do_query(taos, "insert into t8 values('2020-01-01 00:01:02.000', 0);");
do_query(taos, "insert into t9 values('2020-01-01 00:01:02.000', 0);");
// super tables subscription
usleep(1000000);
TAOS_SUB* tsub = taos_subscribe(taos, 0, "test", "select * from meters;", NULL, NULL, 0);
TAOS_RES* res = taos_consume(tsub);
check_row_count(__LINE__, res, 18);
res = taos_consume(tsub);
check_row_count(__LINE__, res, 0);
do_query(taos, "insert into t0 values('2020-01-01 00:02:00.001', 0);");
do_query(taos, "insert into t8 values('2020-01-01 00:01:03.000', 0);");
res = taos_consume(tsub);
check_row_count(__LINE__, res, 2);
do_query(taos, "insert into t2 values('2020-01-01 00:01:02.001', 0);");
do_query(taos, "insert into t1 values('2020-01-01 00:03:00.001', 0);");
res = taos_consume(tsub);
check_row_count(__LINE__, res, 2);
do_query(taos, "insert into t1 values('2020-01-01 00:03:00.002', 0);");
res = taos_consume(tsub);
check_row_count(__LINE__, res, 1);
// keep progress information and restart subscription
taos_unsubscribe(tsub, 1);
do_query(taos, "insert into t0 values('2020-01-01 00:04:00.000', 0);");
tsub = taos_subscribe(taos, 1, "test", "select * from meters;", NULL, NULL, 0);
res = taos_consume(tsub);
check_row_count(__LINE__, res, 24);
// keep progress information and continue previous subscription
taos_unsubscribe(tsub, 1);
tsub = taos_subscribe(taos, 0, "test", "select * from meters;", NULL, NULL, 0);
res = taos_consume(tsub);
check_row_count(__LINE__, res, 0);
// don't keep progress information and continue previous subscription
taos_unsubscribe(tsub, 0);
tsub = taos_subscribe(taos, 0, "test", "select * from meters;", NULL, NULL, 0);
res = taos_consume(tsub);
check_row_count(__LINE__, res, 24);
// single meter subscription
taos_unsubscribe(tsub, 0);
tsub = taos_subscribe(taos, 0, "test", "select * from t0;", NULL, NULL, 0);
res = taos_consume(tsub);
check_row_count(__LINE__, res, 5);
res = taos_consume(tsub);
check_row_count(__LINE__, res, 0);
do_query(taos, "insert into t0 values('2020-01-01 00:04:00.001', 0);");
res = taos_consume(tsub);
check_row_count(__LINE__, res, 1);
taos_unsubscribe(tsub, 0);
}
int main(int argc, char *argv[]) {
const char* host = "127.0.0.1";
const char* user = "root";
const char* passwd = "taosdata";
const char* sql = "select * from meters;";
const char* topic = "test-multiple";
int async = 1, restart = 0, keep = 1, test = 0, blockFetch = 0;
for (int i = 1; i < argc; i++) {
if (strncmp(argv[i], "-h=", 3) == 0) {
host = argv[i] + 3;
continue;
}
if (strncmp(argv[i], "-u=", 3) == 0) {
user = argv[i] + 3;
continue;
}
if (strncmp(argv[i], "-p=", 3) == 0) {
passwd = argv[i] + 3;
continue;
}
if (strcmp(argv[i], "-sync") == 0) {
async = 0;
continue;
}
if (strcmp(argv[i], "-restart") == 0) {
restart = 1;
continue;
}
if (strcmp(argv[i], "-single") == 0) {
sql = "select * from t0;";
topic = "test-single";
continue;
}
if (strcmp(argv[i], "-nokeep") == 0) {
keep = 0;
continue;
}
if (strncmp(argv[i], "-sql=", 5) == 0) {
sql = argv[i] + 5;
topic = "test-custom";
continue;
}
if (strcmp(argv[i], "-test") == 0) {
test = 1;
continue;
}
if (strcmp(argv[i], "-block-fetch") == 0) {
blockFetch = 1;
continue;
}
}
TAOS* taos = taos_connect(host, user, passwd, "", 0);
if (taos == NULL) {
printf("failed to connect to db, reason:%s\n", taos_errstr(taos));
exit(1);
}
if (test) {
run_test(taos);
taos_close(taos);
exit(0);
}
taos_select_db(taos, "test");
TAOS_SUB* tsub = NULL;
if (async) {
// create an asynchronized subscription, the callback function will be called every 1s
tsub = taos_subscribe(taos, restart, topic, sql, subscribe_callback, &blockFetch, 1000);
} else {
// create an synchronized subscription, need to call 'taos_consume' manually
tsub = taos_subscribe(taos, restart, topic, sql, NULL, NULL, 0);
}
if (tsub == NULL) {
printf("failed to create subscription.\n");
exit(0);
}
if (async) {
getchar();
} else while(1) {
TAOS_RES* res = taos_consume(tsub);
if (res == NULL) {
printf("failed to consume data.");
break;
} else {
print_result(res, blockFetch);
getchar();
}
}
printf("total rows consumed: %d\n", nTotalRows);
taos_unsubscribe(tsub, keep);
taos_close(taos);
return 0;
}
...@@ -176,8 +176,8 @@ tmq_t* build_consumer() { ...@@ -176,8 +176,8 @@ tmq_t* build_consumer() {
tmq_list_t* build_topic_list() { tmq_list_t* build_topic_list() {
tmq_list_t* topic_list = tmq_list_new(); tmq_list_t* topic_list = tmq_list_new();
tmq_list_append(topic_list, "topic_ctb_column"); /*tmq_list_append(topic_list, "topic_ctb_column");*/
/*tmq_list_append(topic_list, "tmq_test_db_multi_insert_topic");*/ tmq_list_append(topic_list, "tmq_test_db_multi_insert_topic");
return topic_list; return topic_list;
} }
...@@ -195,7 +195,7 @@ void basic_consume_loop(tmq_t* tmq, tmq_list_t* topics) { ...@@ -195,7 +195,7 @@ void basic_consume_loop(tmq_t* tmq, tmq_list_t* topics) {
if (tmqmessage) { if (tmqmessage) {
cnt++; cnt++;
msg_process(tmqmessage); msg_process(tmqmessage);
if (cnt >= 2) break; /*if (cnt >= 2) break;*/
/*printf("get data\n");*/ /*printf("get data\n");*/
taos_free_result(tmqmessage); taos_free_result(tmqmessage);
/*} else {*/ /*} else {*/
......
...@@ -219,7 +219,8 @@ typedef struct SRequestObj { ...@@ -219,7 +219,8 @@ typedef struct SRequestObj {
void* doFetchRows(SRequestObj* pRequest, bool setupOneRowPtr, bool convertUcs4); void* doFetchRows(SRequestObj* pRequest, bool setupOneRowPtr, bool convertUcs4);
void doSetOneRowPtr(SReqResultInfo* pResultInfo); void doSetOneRowPtr(SReqResultInfo* pResultInfo);
void setResPrecision(SReqResultInfo* pResInfo, int32_t precision); void setResPrecision(SReqResultInfo* pResInfo, int32_t precision);
int32_t setQueryResultFromRsp(SReqResultInfo* pResultInfo, const SRetrieveTableRsp* pRsp, bool convertUcs4); int32_t setQueryResultFromRsp(SReqResultInfo* pResultInfo, const SRetrieveTableRsp* pRsp, bool convertUcs4,
bool freeAfterUse);
void setResSchemaInfo(SReqResultInfo* pResInfo, const SSchema* pSchema, int32_t numOfCols); void setResSchemaInfo(SReqResultInfo* pResInfo, const SSchema* pSchema, int32_t numOfCols);
void doFreeReqResultInfo(SReqResultInfo* pResInfo); void doFreeReqResultInfo(SReqResultInfo* pResInfo);
...@@ -241,7 +242,7 @@ static FORCE_INLINE SReqResultInfo* tmqGetNextResInfo(TAOS_RES* res, bool conver ...@@ -241,7 +242,7 @@ static FORCE_INLINE SReqResultInfo* tmqGetNextResInfo(TAOS_RES* res, bool conver
taosMemoryFreeClear(msg->resInfo.length); taosMemoryFreeClear(msg->resInfo.length);
taosMemoryFreeClear(msg->resInfo.convertBuf); taosMemoryFreeClear(msg->resInfo.convertBuf);
} }
setQueryResultFromRsp(&msg->resInfo, pRetrieve, convertUcs4); setQueryResultFromRsp(&msg->resInfo, pRetrieve, convertUcs4, false);
return &msg->resInfo; return &msg->resInfo;
} }
return NULL; return NULL;
...@@ -319,7 +320,7 @@ SRequestObj* launchQueryImpl(SRequestObj* pRequest, SQuery* pQuery, int32_t code ...@@ -319,7 +320,7 @@ SRequestObj* launchQueryImpl(SRequestObj* pRequest, SQuery* pQuery, int32_t code
int32_t getQueryPlan(SRequestObj* pRequest, SQuery* pQuery, SArray** pNodeList); int32_t getQueryPlan(SRequestObj* pRequest, SQuery* pQuery, SArray** pNodeList);
int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList, void** res); int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList, void** res);
int32_t refreshMeta(STscObj* pTscObj, SRequestObj* pRequest); int32_t refreshMeta(STscObj* pTscObj, SRequestObj* pRequest);
int32_t updateQnodeList(SAppInstInfo*pInfo, SArray* pNodeList); int32_t updateQnodeList(SAppInstInfo* pInfo, SArray* pNodeList);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -117,7 +117,7 @@ TAOS* taos_connect_internal(const char* ip, const char* user, const char* pass, ...@@ -117,7 +117,7 @@ TAOS* taos_connect_internal(const char* ip, const char* user, const char* pass,
SAppInstInfo* p = NULL; SAppInstInfo* p = NULL;
if (pInst == NULL) { if (pInst == NULL) {
p = taosMemoryCalloc(1, sizeof(struct SAppInstInfo)); p = taosMemoryCalloc(1, sizeof(struct SAppInstInfo));
p->mgmtEp = epSet; p->mgmtEp = epSet;
taosThreadMutexInit(&p->qnodeMutex, NULL); taosThreadMutexInit(&p->qnodeMutex, NULL);
p->pTransporter = openTransporter(user, secretEncrypt, tsNumOfCores); p->pTransporter = openTransporter(user, secretEncrypt, tsNumOfCores);
p->pAppHbMgr = appHbMgrInit(p, key); p->pAppHbMgr = appHbMgrInit(p, key);
...@@ -203,7 +203,7 @@ int32_t execLocalCmd(SRequestObj* pRequest, SQuery* pQuery) { ...@@ -203,7 +203,7 @@ int32_t execLocalCmd(SRequestObj* pRequest, SQuery* pQuery) {
SRetrieveTableRsp* pRsp = NULL; SRetrieveTableRsp* pRsp = NULL;
int32_t code = qExecCommand(pQuery->pRoot, &pRsp); int32_t code = qExecCommand(pQuery->pRoot, &pRsp);
if (TSDB_CODE_SUCCESS == code && NULL != pRsp) { if (TSDB_CODE_SUCCESS == code && NULL != pRsp) {
code = setQueryResultFromRsp(&pRequest->body.resInfo, pRsp, false); code = setQueryResultFromRsp(&pRequest->body.resInfo, pRsp, false, false);
} }
return code; return code;
} }
...@@ -230,23 +230,23 @@ int32_t execDdlQuery(SRequestObj* pRequest, SQuery* pQuery) { ...@@ -230,23 +230,23 @@ int32_t execDdlQuery(SRequestObj* pRequest, SQuery* pQuery) {
} }
int compareQueryNodeLoad(const void* elem1, const void* elem2) { int compareQueryNodeLoad(const void* elem1, const void* elem2) {
SQueryNodeLoad *node1 = (SQueryNodeLoad *)elem1; SQueryNodeLoad* node1 = (SQueryNodeLoad*)elem1;
SQueryNodeLoad *node2 = (SQueryNodeLoad *)elem2; SQueryNodeLoad* node2 = (SQueryNodeLoad*)elem2;
if (node1->load < node2->load) { if (node1->load < node2->load) {
return -1; return -1;
} }
return node1->load > node2->load; return node1->load > node2->load;
} }
int32_t updateQnodeList(SAppInstInfo*pInfo, SArray* pNodeList) { int32_t updateQnodeList(SAppInstInfo* pInfo, SArray* pNodeList) {
taosThreadMutexLock(&pInfo->qnodeMutex); taosThreadMutexLock(&pInfo->qnodeMutex);
if (pInfo->pQnodeList) { if (pInfo->pQnodeList) {
taosArrayDestroy(pInfo->pQnodeList); taosArrayDestroy(pInfo->pQnodeList);
pInfo->pQnodeList = NULL; pInfo->pQnodeList = NULL;
} }
if (pNodeList) { if (pNodeList) {
pInfo->pQnodeList = taosArrayDup(pNodeList); pInfo->pQnodeList = taosArrayDup(pNodeList);
taosArraySort(pInfo->pQnodeList, compareQueryNodeLoad); taosArraySort(pInfo->pQnodeList, compareQueryNodeLoad);
...@@ -257,9 +257,9 @@ int32_t updateQnodeList(SAppInstInfo*pInfo, SArray* pNodeList) { ...@@ -257,9 +257,9 @@ int32_t updateQnodeList(SAppInstInfo*pInfo, SArray* pNodeList) {
} }
int32_t getQnodeList(SRequestObj* pRequest, SArray** pNodeList) { int32_t getQnodeList(SRequestObj* pRequest, SArray** pNodeList) {
SAppInstInfo*pInfo = pRequest->pTscObj->pAppInfo; SAppInstInfo* pInfo = pRequest->pTscObj->pAppInfo;
int32_t code = 0; int32_t code = 0;
taosThreadMutexLock(&pInfo->qnodeMutex); taosThreadMutexLock(&pInfo->qnodeMutex);
if (pInfo->pQnodeList) { if (pInfo->pQnodeList) {
*pNodeList = taosArrayDup(pInfo->pQnodeList); *pNodeList = taosArrayDup(pInfo->pQnodeList);
...@@ -267,14 +267,14 @@ int32_t getQnodeList(SRequestObj* pRequest, SArray** pNodeList) { ...@@ -267,14 +267,14 @@ int32_t getQnodeList(SRequestObj* pRequest, SArray** pNodeList) {
taosThreadMutexUnlock(&pInfo->qnodeMutex); taosThreadMutexUnlock(&pInfo->qnodeMutex);
if (NULL == *pNodeList) { if (NULL == *pNodeList) {
SEpSet mgmtEpSet = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp); SEpSet mgmtEpSet = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp);
SCatalog* pCatalog = NULL; SCatalog* pCatalog = NULL;
code = catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog); code = catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog);
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
*pNodeList = taosArrayInit(5, sizeof(SQueryNodeLoad)); *pNodeList = taosArrayInit(5, sizeof(SQueryNodeLoad));
code = catalogGetQnodeList(pCatalog, pRequest->pTscObj->pAppInfo->pTransporter, &mgmtEpSet, *pNodeList); code = catalogGetQnodeList(pCatalog, pRequest->pTscObj->pAppInfo->pTransporter, &mgmtEpSet, *pNodeList);
} }
if (TSDB_CODE_SUCCESS == code && *pNodeList) { if (TSDB_CODE_SUCCESS == code && *pNodeList) {
code = updateQnodeList(pInfo, *pNodeList); code = updateQnodeList(pInfo, *pNodeList);
} }
...@@ -342,13 +342,13 @@ void setResPrecision(SReqResultInfo* pResInfo, int32_t precision) { ...@@ -342,13 +342,13 @@ void setResPrecision(SReqResultInfo* pResInfo, int32_t precision) {
int32_t scheduleAsyncQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList, void** pRes) { int32_t scheduleAsyncQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList, void** pRes) {
void* pTransporter = pRequest->pTscObj->pAppInfo->pTransporter; void* pTransporter = pRequest->pTscObj->pAppInfo->pTransporter;
tsem_init(&schdRspSem, 0, 0); tsem_init(&schdRspSem, 0, 0);
SQueryResult res = {.code = 0, .numOfRows = 0}; SQueryResult res = {.code = 0, .numOfRows = 0};
int32_t code = schedulerAsyncExecJob(pTransporter, pNodeList, pDag, &pRequest->body.queryJob, pRequest->sqlstr, int32_t code = schedulerAsyncExecJob(pTransporter, pNodeList, pDag, &pRequest->body.queryJob, pRequest->sqlstr,
pRequest->metric.start, schdExecCallback, &res); pRequest->metric.start, schdExecCallback, &res);
while (true) { while (true) {
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
if (pRequest->body.queryJob != 0) { if (pRequest->body.queryJob != 0) {
schedulerFreeJob(pRequest->body.queryJob); schedulerFreeJob(pRequest->body.queryJob);
...@@ -361,7 +361,7 @@ int32_t scheduleAsyncQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNod ...@@ -361,7 +361,7 @@ int32_t scheduleAsyncQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNod
return pRequest->code; return pRequest->code;
} else { } else {
tsem_wait(&schdRspSem); tsem_wait(&schdRspSem);
if (res.code) { if (res.code) {
code = res.code; code = res.code;
} else { } else {
...@@ -385,7 +385,6 @@ int32_t scheduleAsyncQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNod ...@@ -385,7 +385,6 @@ int32_t scheduleAsyncQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNod
return pRequest->code; return pRequest->code;
} }
int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList, void** pRes) { int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList, void** pRes) {
void* pTransporter = pRequest->pTscObj->pAppInfo->pTransporter; void* pTransporter = pRequest->pTscObj->pAppInfo->pTransporter;
...@@ -783,7 +782,7 @@ void updateTargetEpSet(SMsgSendInfo* pSendInfo, STscObj* pTscObj, SRpcMsg* pMsg, ...@@ -783,7 +782,7 @@ void updateTargetEpSet(SMsgSendInfo* pSendInfo, STscObj* pTscObj, SRpcMsg* pMsg,
if (NULL == pEpSet) { if (NULL == pEpSet) {
return; return;
} }
switch (pSendInfo->target.type) { switch (pSendInfo->target.type) {
case TARGET_TYPE_MNODE: case TARGET_TYPE_MNODE:
if (NULL == pTscObj) { if (NULL == pTscObj) {
...@@ -791,7 +790,7 @@ void updateTargetEpSet(SMsgSendInfo* pSendInfo, STscObj* pTscObj, SRpcMsg* pMsg, ...@@ -791,7 +790,7 @@ void updateTargetEpSet(SMsgSendInfo* pSendInfo, STscObj* pTscObj, SRpcMsg* pMsg,
return; return;
} }
updateEpSet_s(&pTscObj->pAppInfo->mgmtEp, pEpSet); updateEpSet_s(&pTscObj->pAppInfo->mgmtEp, pEpSet);
break; break;
case TARGET_TYPE_VNODE: { case TARGET_TYPE_VNODE: {
if (NULL == pTscObj) { if (NULL == pTscObj) {
...@@ -800,12 +799,13 @@ void updateTargetEpSet(SMsgSendInfo* pSendInfo, STscObj* pTscObj, SRpcMsg* pMsg, ...@@ -800,12 +799,13 @@ void updateTargetEpSet(SMsgSendInfo* pSendInfo, STscObj* pTscObj, SRpcMsg* pMsg,
} }
SCatalog* pCatalog = NULL; SCatalog* pCatalog = NULL;
int32_t code = catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog); int32_t code = catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
tscError("fail to get catalog handle, clusterId:%" PRIx64 ", error %s", pTscObj->pAppInfo->clusterId, tstrerror(code)); tscError("fail to get catalog handle, clusterId:%" PRIx64 ", error %s", pTscObj->pAppInfo->clusterId,
tstrerror(code));
return; return;
} }
catalogUpdateVgEpSet(pCatalog, pSendInfo->target.dbFName, pSendInfo->target.vgId, pEpSet); catalogUpdateVgEpSet(pCatalog, pSendInfo->target.dbFName, pSendInfo->target.vgId, pEpSet);
break; break;
} }
...@@ -815,12 +815,11 @@ void updateTargetEpSet(SMsgSendInfo* pSendInfo, STscObj* pTscObj, SRpcMsg* pMsg, ...@@ -815,12 +815,11 @@ void updateTargetEpSet(SMsgSendInfo* pSendInfo, STscObj* pTscObj, SRpcMsg* pMsg,
} }
} }
void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) { void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) {
SMsgSendInfo* pSendInfo = (SMsgSendInfo*)pMsg->info.ahandle; SMsgSendInfo* pSendInfo = (SMsgSendInfo*)pMsg->info.ahandle;
assert(pMsg->info.ahandle != NULL); assert(pMsg->info.ahandle != NULL);
SRequestObj* pRequest = NULL; SRequestObj* pRequest = NULL;
STscObj* pTscObj = NULL; STscObj* pTscObj = NULL;
if (pSendInfo->requestObjRefId != 0) { if (pSendInfo->requestObjRefId != 0) {
SRequestObj* pRequest = (SRequestObj*)taosAcquireRef(clientReqRefPool, pSendInfo->requestObjRefId); SRequestObj* pRequest = (SRequestObj*)taosAcquireRef(clientReqRefPool, pSendInfo->requestObjRefId);
...@@ -947,7 +946,8 @@ void* doAsyncFetchRows(SRequestObj* pRequest, bool setupOneRowPtr, bool convertU ...@@ -947,7 +946,8 @@ void* doAsyncFetchRows(SRequestObj* pRequest, bool setupOneRowPtr, bool convertU
return NULL; return NULL;
} }
pRequest->code = setQueryResultFromRsp(&pRequest->body.resInfo, (SRetrieveTableRsp*)pResInfo->pData, convertUcs4); pRequest->code =
setQueryResultFromRsp(&pRequest->body.resInfo, (SRetrieveTableRsp*)pResInfo->pData, convertUcs4, true);
if (pRequest->code != TSDB_CODE_SUCCESS) { if (pRequest->code != TSDB_CODE_SUCCESS) {
pResultInfo->numOfRows = 0; pResultInfo->numOfRows = 0;
return NULL; return NULL;
...@@ -969,9 +969,8 @@ void* doAsyncFetchRows(SRequestObj* pRequest, bool setupOneRowPtr, bool convertU ...@@ -969,9 +969,8 @@ void* doAsyncFetchRows(SRequestObj* pRequest, bool setupOneRowPtr, bool convertU
return pResultInfo->row; return pResultInfo->row;
} }
void* doFetchRows(SRequestObj* pRequest, bool setupOneRowPtr, bool convertUcs4) { void* doFetchRows(SRequestObj* pRequest, bool setupOneRowPtr, bool convertUcs4) {
//return doAsyncFetchRows(pRequest, setupOneRowPtr, convertUcs4); // return doAsyncFetchRows(pRequest, setupOneRowPtr, convertUcs4);
assert(pRequest != NULL); assert(pRequest != NULL);
SReqResultInfo* pResultInfo = &pRequest->body.resInfo; SReqResultInfo* pResultInfo = &pRequest->body.resInfo;
...@@ -989,7 +988,8 @@ void* doFetchRows(SRequestObj* pRequest, bool setupOneRowPtr, bool convertUcs4) ...@@ -989,7 +988,8 @@ void* doFetchRows(SRequestObj* pRequest, bool setupOneRowPtr, bool convertUcs4)
return NULL; return NULL;
} }
pRequest->code = setQueryResultFromRsp(&pRequest->body.resInfo, (SRetrieveTableRsp*)pResInfo->pData, convertUcs4); pRequest->code =
setQueryResultFromRsp(&pRequest->body.resInfo, (SRetrieveTableRsp*)pResInfo->pData, convertUcs4, true);
if (pRequest->code != TSDB_CODE_SUCCESS) { if (pRequest->code != TSDB_CODE_SUCCESS) {
pResultInfo->numOfRows = 0; pResultInfo->numOfRows = 0;
return NULL; return NULL;
...@@ -1046,7 +1046,7 @@ static char* parseTagDatatoJson(void* p) { ...@@ -1046,7 +1046,7 @@ static char* parseTagDatatoJson(void* p) {
memset(tagJsonKey, 0, sizeof(tagJsonKey)); memset(tagJsonKey, 0, sizeof(tagJsonKey));
memcpy(tagJsonKey, pTagVal->pKey, strlen(pTagVal->pKey)); memcpy(tagJsonKey, pTagVal->pKey, strlen(pTagVal->pKey));
// json value // json value
char type = pTagVal->type; char type = pTagVal->type;
if (type == TSDB_DATA_TYPE_NULL) { if (type == TSDB_DATA_TYPE_NULL) {
cJSON* value = cJSON_CreateNull(); cJSON* value = cJSON_CreateNull();
if (value == NULL) { if (value == NULL) {
...@@ -1059,7 +1059,8 @@ static char* parseTagDatatoJson(void* p) { ...@@ -1059,7 +1059,8 @@ static char* parseTagDatatoJson(void* p) {
char* tagJsonValue = taosMemoryCalloc(pTagVal->nData, 1); char* tagJsonValue = taosMemoryCalloc(pTagVal->nData, 1);
int32_t length = taosUcs4ToMbs((TdUcs4*)pTagVal->pData, pTagVal->nData, tagJsonValue); int32_t length = taosUcs4ToMbs((TdUcs4*)pTagVal->pData, pTagVal->nData, tagJsonValue);
if (length < 0) { if (length < 0) {
tscError("charset:%s to %s. val:%s convert json value failed.", DEFAULT_UNICODE_ENCODEC, tsCharset, pTagVal->pData); tscError("charset:%s to %s. val:%s convert json value failed.", DEFAULT_UNICODE_ENCODEC, tsCharset,
pTagVal->pData);
taosMemoryFree(tagJsonValue); taosMemoryFree(tagJsonValue);
goto end; goto end;
} }
...@@ -1277,11 +1278,12 @@ void resetConnectDB(STscObj* pTscObj) { ...@@ -1277,11 +1278,12 @@ void resetConnectDB(STscObj* pTscObj) {
taosThreadMutexUnlock(&pTscObj->mutex); taosThreadMutexUnlock(&pTscObj->mutex);
} }
int32_t setQueryResultFromRsp(SReqResultInfo* pResultInfo, const SRetrieveTableRsp* pRsp, bool convertUcs4) { int32_t setQueryResultFromRsp(SReqResultInfo* pResultInfo, const SRetrieveTableRsp* pRsp, bool convertUcs4,
bool freeAfterUse) {
assert(pResultInfo != NULL && pRsp != NULL); assert(pResultInfo != NULL && pRsp != NULL);
taosMemoryFreeClear(pResultInfo->pRspMsg); if (freeAfterUse) taosMemoryFreeClear(pResultInfo->pRspMsg);
pResultInfo->pRspMsg = (const char*)pRsp; pResultInfo->pRspMsg = (const char*)pRsp;
pResultInfo->pData = (void*)pRsp->data; pResultInfo->pData = (void*)pRsp->data;
pResultInfo->numOfRows = htonl(pRsp->numOfRows); pResultInfo->numOfRows = htonl(pRsp->numOfRows);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册