提交 4fd2fb06 编写于 作者: H Haojun Liao

refactor: do some internal refactor.

上级 d5a5dcf9
...@@ -46,6 +46,8 @@ extern "C" { ...@@ -46,6 +46,8 @@ extern "C" {
#define ERROR_MSG_BUF_DEFAULT_SIZE 512 #define ERROR_MSG_BUF_DEFAULT_SIZE 512
#define HEARTBEAT_INTERVAL 1500 // ms #define HEARTBEAT_INTERVAL 1500 // ms
#define SYNC_ON_TOP_OF_ASYNC 0
enum { enum {
RES_TYPE__QUERY = 1, RES_TYPE__QUERY = 1,
RES_TYPE__TMQ, RES_TYPE__TMQ,
......
...@@ -1083,6 +1083,7 @@ void* doAsyncFetchRow(SRequestObj* pRequest, bool setupOneRowPtr, bool convertUc ...@@ -1083,6 +1083,7 @@ void* doAsyncFetchRow(SRequestObj* pRequest, bool setupOneRowPtr, bool convertUc
SSyncQueryParam* pParam = pRequest->body.param; SSyncQueryParam* pParam = pRequest->body.param;
taos_fetch_rows_a(pRequest, syncFetchFn, pParam); taos_fetch_rows_a(pRequest, syncFetchFn, pParam);
tsem_wait(&pParam->sem);
} }
if (setupOneRowPtr) { if (setupOneRowPtr) {
......
...@@ -189,7 +189,16 @@ TAOS_RES *taos_query(TAOS *taos, const char *sql) { ...@@ -189,7 +189,16 @@ TAOS_RES *taos_query(TAOS *taos, const char *sql) {
} }
STscObj* pTscObj = (STscObj*)taos; STscObj* pTscObj = (STscObj*)taos;
#if 0
#if SYNC_ON_TOP_OF_ASYNC
SSyncQueryParam* param = taosMemoryCalloc(1, sizeof(struct SSyncQueryParam));
tsem_init(&param->sem, 0, 0);
taos_query_a(pTscObj, sql, syncQueryFn, param);
tsem_wait(&param->sem);
return param->pRequest;
#else
size_t sqlLen = strlen(sql); size_t sqlLen = strlen(sql);
if (sqlLen > (size_t)TSDB_MAX_ALLOWED_SQL_LEN) { if (sqlLen > (size_t)TSDB_MAX_ALLOWED_SQL_LEN) {
tscError("sql string exceeds max length:%d", TSDB_MAX_ALLOWED_SQL_LEN); tscError("sql string exceeds max length:%d", TSDB_MAX_ALLOWED_SQL_LEN);
...@@ -199,14 +208,6 @@ TAOS_RES *taos_query(TAOS *taos, const char *sql) { ...@@ -199,14 +208,6 @@ TAOS_RES *taos_query(TAOS *taos, const char *sql) {
return execQuery(pTscObj, sql, sqlLen); return execQuery(pTscObj, sql, sqlLen);
#endif #endif
SSyncQueryParam* param = taosMemoryCalloc(1, sizeof(struct SSyncQueryParam));
tsem_init(&param->sem, 0, 0);
taos_query_a(pTscObj, sql, syncQueryFn, param);
tsem_wait(&param->sem);
return param->pRequest;
} }
TAOS_ROW taos_fetch_row(TAOS_RES *res) { TAOS_ROW taos_fetch_row(TAOS_RES *res) {
...@@ -221,8 +222,12 @@ TAOS_ROW taos_fetch_row(TAOS_RES *res) { ...@@ -221,8 +222,12 @@ TAOS_ROW taos_fetch_row(TAOS_RES *res) {
return NULL; return NULL;
} }
#if SYNC_ON_TOP_OF_ASYNC
return doAsyncFetchRow(pRequest, true, true); return doAsyncFetchRow(pRequest, true, true);
// return doFetchRows(pRequest, true, true); #else
return doFetchRows(pRequest, true, true);
#endif
} else if (TD_RES_TMQ(res)) { } else if (TD_RES_TMQ(res)) {
SMqRspObj *msg = ((SMqRspObj *)res); SMqRspObj *msg = ((SMqRspObj *)res);
SReqResultInfo *pResultInfo; SReqResultInfo *pResultInfo;
......
...@@ -94,7 +94,7 @@ TEST(testCase, driverInit_Test) { ...@@ -94,7 +94,7 @@ TEST(testCase, driverInit_Test) {
// taosInitGlobalCfg(); // taosInitGlobalCfg();
// taos_init(); // taos_init();
} }
#if 0
TEST(testCase, connect_Test) { TEST(testCase, connect_Test) {
// taos_options(TSDB_OPTION_CONFIGDIR, "/home/ubuntu/first/cfg"); // taos_options(TSDB_OPTION_CONFIGDIR, "/home/ubuntu/first/cfg");
...@@ -645,7 +645,7 @@ TEST(testCase, projection_query_tables) { ...@@ -645,7 +645,7 @@ TEST(testCase, projection_query_tables) {
} }
taos_free_result(pRes); taos_free_result(pRes);
for(int32_t i = 0; i < 1000000; i += 20) { for(int32_t i = 0; i < 10000; i += 20) {
char sql[1024] = {0}; char sql[1024] = {0};
sprintf(sql, sprintf(sql,
"insert into tu values(now+%da, %d)(now+%da, %d)(now+%da, %d)(now+%da, %d)" "insert into tu values(now+%da, %d)(now+%da, %d)(now+%da, %d)(now+%da, %d)"
...@@ -772,10 +772,9 @@ TEST(testCase, agg_query_tables) { ...@@ -772,10 +772,9 @@ TEST(testCase, agg_query_tables) {
taos_free_result(pRes); taos_free_result(pRes);
taos_close(pConn); taos_close(pConn);
} }
#endif
/* /*
--- copy the following script in the shell to setup the environment. --- copy the following script in the shell to setup the environment ---
create database test; create database test;
use test; use test;
......
...@@ -21,9 +21,9 @@ ...@@ -21,9 +21,9 @@
#include "tref.h" #include "tref.h"
#include "trpc.h" #include "trpc.h"
FORCE_INLINE SSchJob *schAcquireJob(int64_t refId) { return (SSchJob *)taosAcquireRef(schMgmt.jobRef, refId); } FORCE_INLINE SSchJob *schAcquireJob(int64_t refId) { qDebug("acquire job:0x%"PRIx64, refId); return (SSchJob *)taosAcquireRef(schMgmt.jobRef, refId); }
FORCE_INLINE int32_t schReleaseJob(int64_t refId) { return taosReleaseRef(schMgmt.jobRef, refId); } FORCE_INLINE int32_t schReleaseJob(int64_t refId) { qDebug("release job:0x%"PRIx64, refId); return taosReleaseRef(schMgmt.jobRef, refId); }
int32_t schInitTask(SSchJob *pJob, SSchTask *pTask, SSubplan *pPlan, SSchLevel *pLevel) { int32_t schInitTask(SSchJob *pJob, SSchTask *pTask, SSubplan *pPlan, SSchLevel *pLevel) {
pTask->plan = pPlan; pTask->plan = pPlan;
......
...@@ -124,7 +124,8 @@ int32_t schedulerAsyncFetchRows(int64_t job, schedulerFetchCallback fp, void* pa ...@@ -124,7 +124,8 @@ int32_t schedulerAsyncFetchRows(int64_t job, schedulerFetchCallback fp, void* pa
SSchJob *pJob = schAcquireJob(job); SSchJob *pJob = schAcquireJob(job);
if (NULL == pJob) { if (NULL == pJob) {
qError("acquire job from jobRef list failed, may be dropped, refId:%" PRIx64, job); qError("acquire job from jobRef list failed, may be dropped, refId:%" PRIx64, job);
SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR); fp(NULL, param, TSDB_CODE_SCH_STATUS_ERROR);
return TSDB_CODE_SCH_STATUS_ERROR;
} }
pJob->attr.syncSchedule = false; pJob->attr.syncSchedule = false;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册