提交 b15eb994 编写于 作者: H Haojun Liao

[TD-225]refactor codes.

上级 5757003f
...@@ -13,8 +13,8 @@ ...@@ -13,8 +13,8 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#ifndef TDENGINE_TSC_LOG_H #ifndef TDENGINE_TSCLOG_H
#define TDENGINE_TSC_LOG_H #define TDENGINE_TSCLOG_H
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
...@@ -22,7 +22,7 @@ extern "C" { ...@@ -22,7 +22,7 @@ extern "C" {
#include "tlog.h" #include "tlog.h"
extern int32_t cDebugFlag; extern uint32_t cDebugFlag;
extern int8_t tscEmbedded; extern int8_t tscEmbedded;
#define tscFatal(...) do { if (cDebugFlag & DEBUG_FATAL) { taosPrintLog("TSC FATAL ", tscEmbedded ? 255 : cDebugFlag, __VA_ARGS__); }} while(0) #define tscFatal(...) do { if (cDebugFlag & DEBUG_FATAL) { taosPrintLog("TSC FATAL ", tscEmbedded ? 255 : cDebugFlag, __VA_ARGS__); }} while(0)
......
...@@ -13,8 +13,8 @@ ...@@ -13,8 +13,8 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#ifndef TDENGINE_TSCJOINPROCESS_H #ifndef TDENGINE_TSCSUBQUERY_H
#define TDENGINE_TSCJOINPROCESS_H #define TDENGINE_TSCSUBQUERY_H
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
...@@ -52,4 +52,4 @@ void tscUnlockByThread(int64_t *lockedBy); ...@@ -52,4 +52,4 @@ void tscUnlockByThread(int64_t *lockedBy);
} }
#endif #endif
#endif // TDENGINE_TSCJOINPROCESS_H #endif // TDENGINE_TSCSUBQUERY_H
...@@ -327,8 +327,8 @@ typedef struct SSqlObj { ...@@ -327,8 +327,8 @@ typedef struct SSqlObj {
pthread_t owner; // owner of sql object, by which it is executed pthread_t owner; // owner of sql object, by which it is executed
STscObj *pTscObj; STscObj *pTscObj;
int64_t rpcRid; int64_t rpcRid;
void (*fp)(); __async_cb_func_t fp;
void (*fetchFp)(); __async_cb_func_t fetchFp;
void *param; void *param;
int64_t stime; int64_t stime;
uint32_t queryId; uint32_t queryId;
......
...@@ -32,7 +32,6 @@ taos_errstr ...@@ -32,7 +32,6 @@ taos_errstr
taos_errno taos_errno
taos_query_a taos_query_a
taos_fetch_rows_a taos_fetch_rows_a
taos_fetch_row_a
taos_subscribe taos_subscribe
taos_consume taos_consume
taos_unsubscribe taos_unsubscribe
......
...@@ -20,13 +20,11 @@ ...@@ -20,13 +20,11 @@
#include "trpc.h" #include "trpc.h"
#include "tscLog.h" #include "tscLog.h"
#include "tscSubquery.h" #include "tscSubquery.h"
#include "tscLocalMerge.h"
#include "tscUtil.h" #include "tscUtil.h"
#include "tsched.h" #include "tsched.h"
#include "tschemautil.h" #include "tschemautil.h"
#include "tsclient.h" #include "tsclient.h"
static void tscProcessFetchRow(SSchedMsg *pMsg);
static void tscAsyncQueryRowsForNextVnode(void *param, TAOS_RES *tres, int numOfRows); static void tscAsyncQueryRowsForNextVnode(void *param, TAOS_RES *tres, int numOfRows);
static void tscProcessAsyncRetrieveImpl(void *param, TAOS_RES *tres, int numOfRows, void (*fp)()); static void tscProcessAsyncRetrieveImpl(void *param, TAOS_RES *tres, int numOfRows, void (*fp)());
...@@ -37,7 +35,6 @@ static void tscProcessAsyncRetrieveImpl(void *param, TAOS_RES *tres, int numOfRo ...@@ -37,7 +35,6 @@ static void tscProcessAsyncRetrieveImpl(void *param, TAOS_RES *tres, int numOfRo
* query), it will sequentially query&retrieve data for all vnodes * query), it will sequentially query&retrieve data for all vnodes
*/ */
static void tscAsyncFetchRowsProxy(void *param, TAOS_RES *tres, int numOfRows); static void tscAsyncFetchRowsProxy(void *param, TAOS_RES *tres, int numOfRows);
static void tscAsyncFetchSingleRowProxy(void *param, TAOS_RES *tres, int numOfRows);
void doAsyncQuery(STscObj* pObj, SSqlObj* pSql, __async_cb_func_t fp, void* param, const char* sqlstr, size_t sqlLen) { void doAsyncQuery(STscObj* pObj, SSqlObj* pSql, __async_cb_func_t fp, void* param, const char* sqlstr, size_t sqlLen) {
SSqlCmd* pCmd = &pSql->cmd; SSqlCmd* pCmd = &pSql->cmd;
...@@ -191,11 +188,6 @@ static void tscAsyncQueryRowsForNextVnode(void *param, TAOS_RES *tres, int numOf ...@@ -191,11 +188,6 @@ static void tscAsyncQueryRowsForNextVnode(void *param, TAOS_RES *tres, int numOf
tscProcessAsyncRetrieveImpl(param, tres, numOfRows, tscAsyncFetchRowsProxy); tscProcessAsyncRetrieveImpl(param, tres, numOfRows, tscAsyncFetchRowsProxy);
} }
void tscAsyncQuerySingleRowForNextVnode(void *param, TAOS_RES *tres, int numOfRows) {
// query completed, continue to retrieve
tscProcessAsyncRetrieveImpl(param, tres, numOfRows, tscAsyncFetchSingleRowProxy);
}
void taos_fetch_rows_a(TAOS_RES *taosa, __async_cb_func_t fp, void *param) { void taos_fetch_rows_a(TAOS_RES *taosa, __async_cb_func_t fp, void *param) {
SSqlObj *pSql = (SSqlObj *)taosa; SSqlObj *pSql = (SSqlObj *)taosa;
if (pSql == NULL || pSql->signature != pSql) { if (pSql == NULL || pSql->signature != pSql) {
...@@ -263,103 +255,6 @@ void taos_fetch_rows_a(TAOS_RES *taosa, __async_cb_func_t fp, void *param) { ...@@ -263,103 +255,6 @@ void taos_fetch_rows_a(TAOS_RES *taosa, __async_cb_func_t fp, void *param) {
} }
} }
void taos_fetch_row_a(TAOS_RES *taosa, void (*fp)(void *, TAOS_RES *, TAOS_ROW), void *param) {
SSqlObj *pSql = (SSqlObj *)taosa;
if (pSql == NULL || pSql->signature != pSql) {
tscError("sql object is NULL");
tscQueueAsyncError(fp, param, TSDB_CODE_TSC_DISCONNECTED);
return;
}
SSqlRes *pRes = &pSql->res;
SSqlCmd *pCmd = &pSql->cmd;
if (pRes->qhandle == 0) {
tscError("qhandle is NULL");
pSql->param = param;
pRes->code = TSDB_CODE_TSC_INVALID_QHANDLE;
tscAsyncResultOnError(pSql);
return;
}
pSql->fetchFp = fp;
pSql->param = param;
if (pRes->row >= pRes->numOfRows) {
tscResetForNextRetrieve(pRes);
pSql->fp = tscAsyncFetchSingleRowProxy;
if (pCmd->command != TSDB_SQL_RETRIEVE_LOCALMERGE && pCmd->command < TSDB_SQL_LOCAL) {
pCmd->command = (pCmd->command > TSDB_SQL_MGMT) ? TSDB_SQL_RETRIEVE : TSDB_SQL_FETCH;
}
tscProcessSql(pSql);
} else {
SSchedMsg schedMsg = { 0 };
schedMsg.fp = tscProcessFetchRow;
schedMsg.ahandle = pSql;
schedMsg.thandle = pRes->tsrow;
schedMsg.msg = NULL;
taosScheduleTask(tscQhandle, &schedMsg);
}
}
void tscAsyncFetchSingleRowProxy(void *param, TAOS_RES *tres, int numOfRows) {
SSqlObj *pSql = (SSqlObj *)tres;
SSqlRes *pRes = &pSql->res;
SSqlCmd *pCmd = &pSql->cmd;
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
if (numOfRows == 0) {
if (hasMoreVnodesToTry(pSql)) { // sequentially retrieve data from remain vnodes.
tscTryQueryNextVnode(pSql, tscAsyncQuerySingleRowForNextVnode);
} else {
/*
* 1. has reach the limitation
* 2. no remain virtual nodes to be retrieved anymore
*/
(*pSql->fetchFp)(pSql->param, pSql, NULL);
}
return;
}
for (int i = 0; i < pCmd->numOfCols; ++i){
SInternalField* pSup = taosArrayGet(pQueryInfo->fieldsInfo.internalField, i);
if (pSup->pSqlExpr != NULL) {
// pRes->tsrow[i] = TSC_GET_RESPTR_BASE(pRes, pQueryInfo, i) + pSup->pSqlExpr->resBytes * pRes->row;
} else {
//todo add
}
}
pRes->row++;
(*pSql->fetchFp)(pSql->param, pSql, pSql->res.tsrow);
}
void tscProcessFetchRow(SSchedMsg *pMsg) {
SSqlObj *pSql = (SSqlObj *)pMsg->ahandle;
SSqlRes *pRes = &pSql->res;
SSqlCmd *pCmd = &pSql->cmd;
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
for (int i = 0; i < pCmd->numOfCols; ++i) {
SInternalField* pSup = taosArrayGet(pQueryInfo->fieldsInfo.internalField, i);
if (pSup->pSqlExpr != NULL) {
tscGetResultColumnChr(pRes, &pQueryInfo->fieldsInfo, i, 0);
} else {
// todo add
}
}
pRes->row++;
(*pSql->fetchFp)(pSql->param, pSql, pRes->tsrow);
}
// this function will be executed by queue task threads, so the terrno is not valid // this function will be executed by queue task threads, so the terrno is not valid
static void tscProcessAsyncError(SSchedMsg *pMsg) { static void tscProcessAsyncError(SSchedMsg *pMsg) {
void (*fp)() = pMsg->ahandle; void (*fp)() = pMsg->ahandle;
...@@ -372,7 +267,7 @@ void tscQueueAsyncError(void(*fp), void *param, int32_t code) { ...@@ -372,7 +267,7 @@ void tscQueueAsyncError(void(*fp), void *param, int32_t code) {
int32_t* c = malloc(sizeof(int32_t)); int32_t* c = malloc(sizeof(int32_t));
*c = code; *c = code;
SSchedMsg schedMsg = { 0 }; SSchedMsg schedMsg = {0};
schedMsg.fp = tscProcessAsyncError; schedMsg.fp = tscProcessAsyncError;
schedMsg.ahandle = fp; schedMsg.ahandle = fp;
schedMsg.thandle = param; schedMsg.thandle = param;
...@@ -380,7 +275,6 @@ void tscQueueAsyncError(void(*fp), void *param, int32_t code) { ...@@ -380,7 +275,6 @@ void tscQueueAsyncError(void(*fp), void *param, int32_t code) {
taosScheduleTask(tscQhandle, &schedMsg); taosScheduleTask(tscQhandle, &schedMsg);
} }
void tscAsyncResultOnError(SSqlObj *pSql) { void tscAsyncResultOnError(SSqlObj *pSql) {
if (pSql == NULL || pSql->signature != pSql) { if (pSql == NULL || pSql->signature != pSql) {
tscDebug("%p SqlObj is freed, not add into queue async res", pSql); tscDebug("%p SqlObj is freed, not add into queue async res", pSql);
......
...@@ -339,7 +339,8 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) { ...@@ -339,7 +339,8 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) {
if (pSql->retry > pSql->maxRetry) { if (pSql->retry > pSql->maxRetry) {
tscError("%p max retry %d reached, give up", pSql, pSql->maxRetry); tscError("%p max retry %d reached, give up", pSql, pSql->maxRetry);
} else { } else {
// wait for a little bit moment and then retry, todo do not sleep in rpc callback thread // wait for a little bit moment and then retry
// todo do not sleep in rpc callback thread, add this process into queueu to process
if (rpcMsg->code == TSDB_CODE_APP_NOT_READY || rpcMsg->code == TSDB_CODE_VND_INVALID_VGROUP_ID) { if (rpcMsg->code == TSDB_CODE_APP_NOT_READY || rpcMsg->code == TSDB_CODE_VND_INVALID_VGROUP_ID) {
int32_t duration = getWaitingTimeInterval(pSql->retry); int32_t duration = getWaitingTimeInterval(pSql->retry);
taosMsleep(duration); taosMsleep(duration);
...@@ -1178,7 +1179,7 @@ int32_t tscBuildDropDnodeMsg(SSqlObj *pSql, SSqlInfo *pInfo) { ...@@ -1178,7 +1179,7 @@ int32_t tscBuildDropDnodeMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t tscBuildDropUserMsg(SSqlObj *pSql, SSqlInfo *pInfo) { int32_t tscBuildDropUserMsg(SSqlObj *pSql, SSqlInfo * UNUSED_PARAM(pInfo)) {
SSqlCmd *pCmd = &pSql->cmd; SSqlCmd *pCmd = &pSql->cmd;
pCmd->payloadLen = sizeof(SDropUserMsg); pCmd->payloadLen = sizeof(SDropUserMsg);
pCmd->msgType = TSDB_MSG_TYPE_CM_DROP_USER; pCmd->msgType = TSDB_MSG_TYPE_CM_DROP_USER;
...@@ -2099,7 +2100,7 @@ int tscProcessShowRsp(SSqlObj *pSql) { ...@@ -2099,7 +2100,7 @@ int tscProcessShowRsp(SSqlObj *pSql) {
return 0; return 0;
} }
static void createHBObj(STscObj* pObj) { static void createHbObj(STscObj* pObj) {
if (pObj->hbrid != 0) { if (pObj->hbrid != 0) {
return; return;
} }
...@@ -2162,7 +2163,7 @@ int tscProcessConnectRsp(SSqlObj *pSql) { ...@@ -2162,7 +2163,7 @@ int tscProcessConnectRsp(SSqlObj *pSql) {
pObj->superAuth = pConnect->superAuth; pObj->superAuth = pConnect->superAuth;
pObj->connId = htonl(pConnect->connId); pObj->connId = htonl(pConnect->connId);
createHBObj(pObj); createHbObj(pObj);
//launch a timer to send heartbeat to maintain the connection and send status to mnode //launch a timer to send heartbeat to maintain the connection and send status to mnode
taosTmrReset(tscProcessActivityTimer, tsShellActivityTimer * 500, (void *)pObj->rid, tscTmr, &pObj->pTimer); taosTmrReset(tscProcessActivityTimer, tsShellActivityTimer * 500, (void *)pObj->rid, tscTmr, &pObj->pTimer);
......
...@@ -180,7 +180,7 @@ extern int32_t tsLogKeepDays; ...@@ -180,7 +180,7 @@ extern int32_t tsLogKeepDays;
extern int32_t dDebugFlag; extern int32_t dDebugFlag;
extern int32_t vDebugFlag; extern int32_t vDebugFlag;
extern int32_t mDebugFlag; extern int32_t mDebugFlag;
extern int32_t cDebugFlag; extern uint32_t cDebugFlag;
extern int32_t jniDebugFlag; extern int32_t jniDebugFlag;
extern int32_t tmrDebugFlag; extern int32_t tmrDebugFlag;
extern int32_t sdbDebugFlag; extern int32_t sdbDebugFlag;
......
...@@ -212,7 +212,7 @@ int32_t mDebugFlag = 131; ...@@ -212,7 +212,7 @@ int32_t mDebugFlag = 131;
int32_t sdbDebugFlag = 131; int32_t sdbDebugFlag = 131;
int32_t dDebugFlag = 135; int32_t dDebugFlag = 135;
int32_t vDebugFlag = 135; int32_t vDebugFlag = 135;
int32_t cDebugFlag = 131; uint32_t cDebugFlag = 131;
int32_t jniDebugFlag = 131; int32_t jniDebugFlag = 131;
int32_t odbcDebugFlag = 131; int32_t odbcDebugFlag = 131;
int32_t httpDebugFlag = 131; int32_t httpDebugFlag = 131;
......
...@@ -140,7 +140,7 @@ DLL_EXPORT int taos_errno(TAOS_RES *tres); ...@@ -140,7 +140,7 @@ DLL_EXPORT int taos_errno(TAOS_RES *tres);
DLL_EXPORT void taos_query_a(TAOS *taos, const char *sql, void (*fp)(void *param, TAOS_RES *, int code), void *param); DLL_EXPORT void taos_query_a(TAOS *taos, const char *sql, void (*fp)(void *param, TAOS_RES *, int code), void *param);
DLL_EXPORT void taos_fetch_rows_a(TAOS_RES *res, void (*fp)(void *param, TAOS_RES *, int numOfRows), void *param); DLL_EXPORT void taos_fetch_rows_a(TAOS_RES *res, void (*fp)(void *param, TAOS_RES *, int numOfRows), void *param);
DLL_EXPORT void taos_fetch_row_a(TAOS_RES *res, void (*fp)(void *param, TAOS_RES *, TAOS_ROW row), void *param); //DLL_EXPORT void taos_fetch_row_a(TAOS_RES *res, void (*fp)(void *param, TAOS_RES *, TAOS_ROW row), void *param);
typedef void (*TAOS_SUBSCRIBE_CALLBACK)(TAOS_SUB* tsub, TAOS_RES *res, void* param, int code); typedef void (*TAOS_SUBSCRIBE_CALLBACK)(TAOS_SUB* tsub, TAOS_RES *res, void* param, int code);
DLL_EXPORT TAOS_SUB *taos_subscribe(TAOS* taos, int restart, const char* topic, const char *sql, TAOS_SUBSCRIBE_CALLBACK fp, void *param, int interval); DLL_EXPORT TAOS_SUB *taos_subscribe(TAOS* taos, int restart, const char* topic, const char *sql, TAOS_SUBSCRIBE_CALLBACK fp, void *param, int interval);
......
...@@ -261,9 +261,6 @@ void taos_select_call_back(void *param, TAOS_RES *tres, int code) ...@@ -261,9 +261,6 @@ void taos_select_call_back(void *param, TAOS_RES *tres, int code)
if (code == 0 && tres) { if (code == 0 && tres) {
// asynchronous API to fetch a batch of records // asynchronous API to fetch a batch of records
taos_fetch_rows_a(tres, taos_retrieve_call_back, pTable); taos_fetch_rows_a(tres, taos_retrieve_call_back, pTable);
// taos_fetch_row_a is a less efficient way to retrieve records since it call back app for every row
// taos_fetch_row_a(tres, taos_fetch_row_call_back, pTable);
} }
else { else {
printf("%s select failed, code:%d\n", pTable->name, code); printf("%s select failed, code:%d\n", pTable->name, code);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册