diff --git a/src/client/inc/tscLog.h b/src/client/inc/tscLog.h index 5273a87ea0d0549420acf2b6679f50ce22159ebc..f25ec02bd8204d124477e6eba1070520b4d00c4e 100644 --- a/src/client/inc/tscLog.h +++ b/src/client/inc/tscLog.h @@ -13,8 +13,8 @@ * along with this program. If not, see . */ -#ifndef TDENGINE_TSC_LOG_H -#define TDENGINE_TSC_LOG_H +#ifndef TDENGINE_TSCLOG_H +#define TDENGINE_TSCLOG_H #ifdef __cplusplus extern "C" { @@ -22,7 +22,7 @@ extern "C" { #include "tlog.h" -extern int32_t cDebugFlag; +extern uint32_t cDebugFlag; extern int8_t tscEmbedded; #define tscFatal(...) do { if (cDebugFlag & DEBUG_FATAL) { taosPrintLog("TSC FATAL ", tscEmbedded ? 255 : cDebugFlag, __VA_ARGS__); }} while(0) diff --git a/src/client/inc/tscSubquery.h b/src/client/inc/tscSubquery.h index 43c9b009bfdc537b033b7cffbe00bb673c627847..7529891635f47fb4f2b68ede5b48ed5640c8f00a 100644 --- a/src/client/inc/tscSubquery.h +++ b/src/client/inc/tscSubquery.h @@ -13,8 +13,8 @@ * along with this program. If not, see . */ -#ifndef TDENGINE_TSCJOINPROCESS_H -#define TDENGINE_TSCJOINPROCESS_H +#ifndef TDENGINE_TSCSUBQUERY_H +#define TDENGINE_TSCSUBQUERY_H #ifdef __cplusplus extern "C" { @@ -52,4 +52,4 @@ void tscUnlockByThread(int64_t *lockedBy); } #endif -#endif // TDENGINE_TSCJOINPROCESS_H +#endif // TDENGINE_TSCSUBQUERY_H diff --git a/src/client/inc/tsclient.h b/src/client/inc/tsclient.h index 142f8063e35e07b9d8714de108ccccecd43e168c..901ae0359e66a5aba438599110027efd263c87ae 100644 --- a/src/client/inc/tsclient.h +++ b/src/client/inc/tsclient.h @@ -327,8 +327,8 @@ typedef struct SSqlObj { pthread_t owner; // owner of sql object, by which it is executed STscObj *pTscObj; int64_t rpcRid; - void (*fp)(); - void (*fetchFp)(); + __async_cb_func_t fp; + __async_cb_func_t fetchFp; void *param; int64_t stime; uint32_t queryId; diff --git a/src/client/src/taos.def b/src/client/src/taos.def index 49d7290ce7bbb9cf64bd71c8d263ad3913576a71..43cd8190614facc207bbad0885360f23c8216c8a 100644 --- a/src/client/src/taos.def +++ b/src/client/src/taos.def @@ -32,7 +32,6 @@ taos_errstr taos_errno taos_query_a taos_fetch_rows_a -taos_fetch_row_a taos_subscribe taos_consume taos_unsubscribe diff --git a/src/client/src/tscAsync.c b/src/client/src/tscAsync.c index 37207858a80786d60c97397bb7d579be0e25d685..a740b9e2ba9a5ece1d389f1a0432faf147dc3a26 100644 --- a/src/client/src/tscAsync.c +++ b/src/client/src/tscAsync.c @@ -20,13 +20,11 @@ #include "trpc.h" #include "tscLog.h" #include "tscSubquery.h" -#include "tscLocalMerge.h" #include "tscUtil.h" #include "tsched.h" #include "tschemautil.h" #include "tsclient.h" -static void tscProcessFetchRow(SSchedMsg *pMsg); static void tscAsyncQueryRowsForNextVnode(void *param, TAOS_RES *tres, int numOfRows); static void tscProcessAsyncRetrieveImpl(void *param, TAOS_RES *tres, int numOfRows, void (*fp)()); @@ -37,7 +35,6 @@ static void tscProcessAsyncRetrieveImpl(void *param, TAOS_RES *tres, int numOfRo * query), it will sequentially query&retrieve data for all vnodes */ static void tscAsyncFetchRowsProxy(void *param, TAOS_RES *tres, int numOfRows); -static void tscAsyncFetchSingleRowProxy(void *param, TAOS_RES *tres, int numOfRows); void doAsyncQuery(STscObj* pObj, SSqlObj* pSql, __async_cb_func_t fp, void* param, const char* sqlstr, size_t sqlLen) { SSqlCmd* pCmd = &pSql->cmd; @@ -191,11 +188,6 @@ static void tscAsyncQueryRowsForNextVnode(void *param, TAOS_RES *tres, int numOf tscProcessAsyncRetrieveImpl(param, tres, numOfRows, tscAsyncFetchRowsProxy); } -void tscAsyncQuerySingleRowForNextVnode(void *param, TAOS_RES *tres, int numOfRows) { - // query completed, continue to retrieve - tscProcessAsyncRetrieveImpl(param, tres, numOfRows, tscAsyncFetchSingleRowProxy); -} - void taos_fetch_rows_a(TAOS_RES *taosa, __async_cb_func_t fp, void *param) { SSqlObj *pSql = (SSqlObj *)taosa; if (pSql == NULL || pSql->signature != pSql) { @@ -263,103 +255,6 @@ void taos_fetch_rows_a(TAOS_RES *taosa, __async_cb_func_t fp, void *param) { } } -void taos_fetch_row_a(TAOS_RES *taosa, void (*fp)(void *, TAOS_RES *, TAOS_ROW), void *param) { - SSqlObj *pSql = (SSqlObj *)taosa; - if (pSql == NULL || pSql->signature != pSql) { - tscError("sql object is NULL"); - tscQueueAsyncError(fp, param, TSDB_CODE_TSC_DISCONNECTED); - return; - } - - SSqlRes *pRes = &pSql->res; - SSqlCmd *pCmd = &pSql->cmd; - - if (pRes->qhandle == 0) { - tscError("qhandle is NULL"); - pSql->param = param; - pRes->code = TSDB_CODE_TSC_INVALID_QHANDLE; - - tscAsyncResultOnError(pSql); - return; - } - - pSql->fetchFp = fp; - pSql->param = param; - - if (pRes->row >= pRes->numOfRows) { - tscResetForNextRetrieve(pRes); - pSql->fp = tscAsyncFetchSingleRowProxy; - - if (pCmd->command != TSDB_SQL_RETRIEVE_LOCALMERGE && pCmd->command < TSDB_SQL_LOCAL) { - pCmd->command = (pCmd->command > TSDB_SQL_MGMT) ? TSDB_SQL_RETRIEVE : TSDB_SQL_FETCH; - } - - tscProcessSql(pSql); - } else { - SSchedMsg schedMsg = { 0 }; - schedMsg.fp = tscProcessFetchRow; - schedMsg.ahandle = pSql; - schedMsg.thandle = pRes->tsrow; - schedMsg.msg = NULL; - taosScheduleTask(tscQhandle, &schedMsg); - } -} - -void tscAsyncFetchSingleRowProxy(void *param, TAOS_RES *tres, int numOfRows) { - SSqlObj *pSql = (SSqlObj *)tres; - SSqlRes *pRes = &pSql->res; - SSqlCmd *pCmd = &pSql->cmd; - - SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); - - if (numOfRows == 0) { - if (hasMoreVnodesToTry(pSql)) { // sequentially retrieve data from remain vnodes. - tscTryQueryNextVnode(pSql, tscAsyncQuerySingleRowForNextVnode); - } else { - /* - * 1. has reach the limitation - * 2. no remain virtual nodes to be retrieved anymore - */ - (*pSql->fetchFp)(pSql->param, pSql, NULL); - } - return; - } - - for (int i = 0; i < pCmd->numOfCols; ++i){ - SInternalField* pSup = taosArrayGet(pQueryInfo->fieldsInfo.internalField, i); - if (pSup->pSqlExpr != NULL) { -// pRes->tsrow[i] = TSC_GET_RESPTR_BASE(pRes, pQueryInfo, i) + pSup->pSqlExpr->resBytes * pRes->row; - } else { - //todo add - } - } - - pRes->row++; - - (*pSql->fetchFp)(pSql->param, pSql, pSql->res.tsrow); -} - -void tscProcessFetchRow(SSchedMsg *pMsg) { - SSqlObj *pSql = (SSqlObj *)pMsg->ahandle; - SSqlRes *pRes = &pSql->res; - SSqlCmd *pCmd = &pSql->cmd; - - SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); - - for (int i = 0; i < pCmd->numOfCols; ++i) { - SInternalField* pSup = taosArrayGet(pQueryInfo->fieldsInfo.internalField, i); - - if (pSup->pSqlExpr != NULL) { - tscGetResultColumnChr(pRes, &pQueryInfo->fieldsInfo, i, 0); - } else { -// todo add - } - } - - pRes->row++; - (*pSql->fetchFp)(pSql->param, pSql, pRes->tsrow); -} - // this function will be executed by queue task threads, so the terrno is not valid static void tscProcessAsyncError(SSchedMsg *pMsg) { void (*fp)() = pMsg->ahandle; @@ -372,7 +267,7 @@ void tscQueueAsyncError(void(*fp), void *param, int32_t code) { int32_t* c = malloc(sizeof(int32_t)); *c = code; - SSchedMsg schedMsg = { 0 }; + SSchedMsg schedMsg = {0}; schedMsg.fp = tscProcessAsyncError; schedMsg.ahandle = fp; schedMsg.thandle = param; @@ -380,7 +275,6 @@ void tscQueueAsyncError(void(*fp), void *param, int32_t code) { taosScheduleTask(tscQhandle, &schedMsg); } - void tscAsyncResultOnError(SSqlObj *pSql) { if (pSql == NULL || pSql->signature != pSql) { tscDebug("%p SqlObj is freed, not add into queue async res", pSql); diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index 402472ebb4fabfcc8ece7b9b36e8f22c382578ea..62791e750a35a98df9b132809165f0aea6b18f7c 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -339,7 +339,8 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) { if (pSql->retry > pSql->maxRetry) { tscError("%p max retry %d reached, give up", pSql, pSql->maxRetry); } else { - // wait for a little bit moment and then retry, todo do not sleep in rpc callback thread + // wait for a little bit moment and then retry + // todo do not sleep in rpc callback thread, add this process into queueu to process if (rpcMsg->code == TSDB_CODE_APP_NOT_READY || rpcMsg->code == TSDB_CODE_VND_INVALID_VGROUP_ID) { int32_t duration = getWaitingTimeInterval(pSql->retry); taosMsleep(duration); @@ -1178,7 +1179,7 @@ int32_t tscBuildDropDnodeMsg(SSqlObj *pSql, SSqlInfo *pInfo) { return TSDB_CODE_SUCCESS; } -int32_t tscBuildDropUserMsg(SSqlObj *pSql, SSqlInfo *pInfo) { +int32_t tscBuildDropUserMsg(SSqlObj *pSql, SSqlInfo * UNUSED_PARAM(pInfo)) { SSqlCmd *pCmd = &pSql->cmd; pCmd->payloadLen = sizeof(SDropUserMsg); pCmd->msgType = TSDB_MSG_TYPE_CM_DROP_USER; @@ -2099,7 +2100,7 @@ int tscProcessShowRsp(SSqlObj *pSql) { return 0; } -static void createHBObj(STscObj* pObj) { +static void createHbObj(STscObj* pObj) { if (pObj->hbrid != 0) { return; } @@ -2162,7 +2163,7 @@ int tscProcessConnectRsp(SSqlObj *pSql) { pObj->superAuth = pConnect->superAuth; pObj->connId = htonl(pConnect->connId); - createHBObj(pObj); + createHbObj(pObj); //launch a timer to send heartbeat to maintain the connection and send status to mnode taosTmrReset(tscProcessActivityTimer, tsShellActivityTimer * 500, (void *)pObj->rid, tscTmr, &pObj->pTimer); diff --git a/src/common/inc/tglobal.h b/src/common/inc/tglobal.h index c54d519637cad547fac2d1960e809ec2c615274d..54334d49f3439729af798fdc9eb6be07df413ae5 100644 --- a/src/common/inc/tglobal.h +++ b/src/common/inc/tglobal.h @@ -180,7 +180,7 @@ extern int32_t tsLogKeepDays; extern int32_t dDebugFlag; extern int32_t vDebugFlag; extern int32_t mDebugFlag; -extern int32_t cDebugFlag; +extern uint32_t cDebugFlag; extern int32_t jniDebugFlag; extern int32_t tmrDebugFlag; extern int32_t sdbDebugFlag; diff --git a/src/common/src/tglobal.c b/src/common/src/tglobal.c index 125e80564295401cb17337f09e66cf4955ecd944..cfe91d351911dd723adb6948d492593ed782c73c 100644 --- a/src/common/src/tglobal.c +++ b/src/common/src/tglobal.c @@ -212,7 +212,7 @@ int32_t mDebugFlag = 131; int32_t sdbDebugFlag = 131; int32_t dDebugFlag = 135; int32_t vDebugFlag = 135; -int32_t cDebugFlag = 131; +uint32_t cDebugFlag = 131; int32_t jniDebugFlag = 131; int32_t odbcDebugFlag = 131; int32_t httpDebugFlag = 131; diff --git a/src/inc/taos.h b/src/inc/taos.h index 5e4f50e31d6016ebd1e01aecb59d8646180ce76a..05d390ffd0cdf4bf0acab82714c867777d9593d4 100644 --- a/src/inc/taos.h +++ b/src/inc/taos.h @@ -140,7 +140,7 @@ DLL_EXPORT int taos_errno(TAOS_RES *tres); DLL_EXPORT void taos_query_a(TAOS *taos, const char *sql, void (*fp)(void *param, TAOS_RES *, int code), void *param); DLL_EXPORT void taos_fetch_rows_a(TAOS_RES *res, void (*fp)(void *param, TAOS_RES *, int numOfRows), void *param); -DLL_EXPORT void taos_fetch_row_a(TAOS_RES *res, void (*fp)(void *param, TAOS_RES *, TAOS_ROW row), void *param); +//DLL_EXPORT void taos_fetch_row_a(TAOS_RES *res, void (*fp)(void *param, TAOS_RES *, TAOS_ROW row), void *param); typedef void (*TAOS_SUBSCRIBE_CALLBACK)(TAOS_SUB* tsub, TAOS_RES *res, void* param, int code); DLL_EXPORT TAOS_SUB *taos_subscribe(TAOS* taos, int restart, const char* topic, const char *sql, TAOS_SUBSCRIBE_CALLBACK fp, void *param, int interval); diff --git a/tests/examples/c/asyncdemo.c b/tests/examples/c/asyncdemo.c index c6cc89b31d6280c45ea30b33509eed5ebdf0dc08..be3a908f11748a5ac414157bef6b9caed389b303 100644 --- a/tests/examples/c/asyncdemo.c +++ b/tests/examples/c/asyncdemo.c @@ -261,9 +261,6 @@ void taos_select_call_back(void *param, TAOS_RES *tres, int code) if (code == 0 && tres) { // asynchronous API to fetch a batch of records taos_fetch_rows_a(tres, taos_retrieve_call_back, pTable); - - // taos_fetch_row_a is a less efficient way to retrieve records since it call back app for every row - // taos_fetch_row_a(tres, taos_fetch_row_call_back, pTable); } else { printf("%s select failed, code:%d\n", pTable->name, code);