提交 42d6ab3b 编写于 作者: dengyihao's avatar dengyihao

refactor client queue

上级 365a28f3
......@@ -222,8 +222,8 @@ typedef struct SRequestObj {
SArray* tableList;
SQueryExecMetric metric;
SRequestSendRecvBody body;
bool stableQuery; // todo refactor
bool validateOnly; // todo refactor
bool stableQuery; // todo refactor
bool validateOnly; // todo refactor
bool killed;
uint32_t prevCode; // previous error code: todo refactor, add update flag for catalog
......@@ -324,7 +324,8 @@ int32_t parseSql(SRequestObj* pRequest, bool topicQuery, SQuery** pQuery, SStmtC
int32_t getPlan(SRequestObj* pRequest, SQuery* pQuery, SQueryPlan** pPlan, SArray* pNodeList);
int32_t buildRequest(uint64_t connId, const char* sql, int sqlLen, void* param, bool validateSql, SRequestObj** pRequest);
int32_t buildRequest(uint64_t connId, const char* sql, int sqlLen, void* param, bool validateSql,
SRequestObj** pRequest);
void taos_close_internal(void* taos);
......@@ -358,9 +359,6 @@ int32_t removeMeta(STscObj* pTscObj, SArray* tbList); // todo move to clie
int32_t handleAlterTbExecRes(void* res, struct SCatalog* pCatalog); // todo move to xxx
bool qnodeRequired(SRequestObj* pRequest);
void initTscQhandle();
void cleanupTscQhandle();
#ifdef __cplusplus
}
#endif
......
......@@ -35,22 +35,10 @@ SAppInfo appInfo;
int32_t clientReqRefPool = -1;
int32_t clientConnRefPool = -1;
void *tscQhandle = NULL;
static TdThreadOnce tscinit = PTHREAD_ONCE_INIT;
volatile int32_t tscInitRes = 0;
void initTscQhandle() {
// init handle
tscQhandle = taosInitScheduler(4096, 5, "tsc");
}
void cleanupTscQhandle() {
// destroy handle
taosCleanUpScheduler(tscQhandle);
}
static int32_t registerRequest(SRequestObj *pRequest, STscObj* pTscObj) {
static int32_t registerRequest(SRequestObj *pRequest, STscObj *pTscObj) {
// connection has been released already, abort creating request.
pRequest->self = taosAddRef(clientReqRefPool, pRequest);
......@@ -72,7 +60,7 @@ static int32_t registerRequest(SRequestObj *pRequest, STscObj* pTscObj) {
static void deregisterRequest(SRequestObj *pRequest) {
assert(pRequest != NULL);
STscObj * pTscObj = pRequest->pTscObj;
STscObj *pTscObj = pRequest->pTscObj;
SAppClusterSummary *pActivity = &pTscObj->pAppInfo->summary;
int32_t currentInst = atomic_sub_fetch_64((int64_t *)&pActivity->currentRequests, 1);
......@@ -97,7 +85,8 @@ void closeTransporter(SAppInstInfo *pAppInfo) {
static bool clientRpcRfp(int32_t code, tmsg_t msgType) {
if (NEED_REDIRECT_ERROR(code)) {
if (msgType == TDMT_SCH_QUERY || msgType == TDMT_SCH_MERGE_QUERY || msgType == TDMT_SCH_FETCH || msgType == TDMT_SCH_MERGE_FETCH) {
if (msgType == TDMT_SCH_QUERY || msgType == TDMT_SCH_MERGE_QUERY || msgType == TDMT_SCH_FETCH ||
msgType == TDMT_SCH_MERGE_FETCH) {
return false;
}
return true;
......@@ -248,7 +237,7 @@ void *createRequest(uint64_t connId, int32_t type) {
return NULL;
}
STscObj* pTscObj = acquireTscObj(connId);
STscObj *pTscObj = acquireTscObj(connId);
if (pTscObj == NULL) {
terrno = TSDB_CODE_TSC_DISCONNECTED;
return NULL;
......@@ -345,7 +334,6 @@ void taos_init_imp(void) {
// In the APIs of other program language, taos_cleanup is not available yet.
// So, to make sure taos_cleanup will be invoked to clean up the allocated resource to suppress the valgrind warning.
atexit(taos_cleanup);
initTscQhandle();
errno = TSDB_CODE_SUCCESS;
taosSeedRand(taosGetTimestampSec());
......@@ -404,7 +392,7 @@ int taos_options_imp(TSDB_OPTION option, const char *str) {
return 0;
}
SConfig * pCfg = taosGetCfg();
SConfig *pCfg = taosGetCfg();
SConfigItem *pItem = NULL;
switch (option) {
......
......@@ -1274,8 +1274,8 @@ typedef struct SchedArg {
SEpSet* pEpset;
} SchedArg;
void doProcessMsgFromServer(SSchedMsg* schedMsg) {
SchedArg* arg = (SchedArg*)schedMsg->ahandle;
int32_t doProcessMsgFromServer(void* param) {
SchedArg* arg = (SchedArg*)param;
SRpcMsg* pMsg = &arg->msg;
SEpSet* pEpSet = arg->pEpset;
......@@ -1328,11 +1328,10 @@ void doProcessMsgFromServer(SSchedMsg* schedMsg) {
rpcFreeCont(pMsg->pCont);
destroySendMsgInfo(pSendInfo);
taosMemoryFree(arg);
return TSDB_CODE_SUCCESS;
}
void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) {
SSchedMsg schedMsg = {0};
SEpSet* tEpSet = NULL;
if (pEpSet != NULL) {
tEpSet = taosMemoryCalloc(1, sizeof(SEpSet));
......@@ -1343,9 +1342,7 @@ void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) {
arg->msg = *pMsg;
arg->pEpset = tEpSet;
schedMsg.fp = doProcessMsgFromServer;
schedMsg.ahandle = arg;
taosScheduleTask(tscQhandle, &schedMsg);
taosAsyncExec(doProcessMsgFromServer, arg, NULL);
}
TAOS* taos_connect_auth(const char* ip, const char* user, const char* auth, const char* db, uint16_t port) {
......
......@@ -72,7 +72,6 @@ void taos_cleanup(void) {
catalogDestroy();
schedulerDestroy();
cleanupTscQhandle();
rpcCleanup();
tscInfo("all local resources released");
taosCleanupCfg();
......@@ -242,7 +241,7 @@ TAOS_ROW taos_fetch_row(TAOS_RES *res) {
#endif
} else if (TD_RES_TMQ(res)) {
SMqRspObj * msg = ((SMqRspObj *)res);
SMqRspObj *msg = ((SMqRspObj *)res);
SReqResultInfo *pResultInfo;
if (msg->resIter == -1) {
pResultInfo = tmqGetNextResInfo(res, true);
......@@ -418,7 +417,7 @@ int taos_affected_rows(TAOS_RES *res) {
return 0;
}
SRequestObj * pRequest = (SRequestObj *)res;
SRequestObj *pRequest = (SRequestObj *)res;
SReqResultInfo *pResInfo = &pRequest->body.resInfo;
return pResInfo->numOfRows;
}
......@@ -601,7 +600,7 @@ int *taos_get_column_data_offset(TAOS_RES *res, int columnIndex) {
}
SReqResultInfo *pResInfo = tscGetCurResInfo(res);
TAOS_FIELD * pField = &pResInfo->userFields[columnIndex];
TAOS_FIELD *pField = &pResInfo->userFields[columnIndex];
if (!IS_VAR_DATA_TYPE(pField->type)) {
return 0;
}
......@@ -645,8 +644,8 @@ const char *taos_get_server_info(TAOS *taos) {
typedef struct SqlParseWrapper {
SParseContext *pCtx;
SCatalogReq catalogReq;
SRequestObj * pRequest;
SQuery * pQuery;
SRequestObj *pRequest;
SQuery *pQuery;
} SqlParseWrapper;
static void destorySqlParseWrapper(SqlParseWrapper *pWrapper) {
......@@ -665,8 +664,8 @@ static void destorySqlParseWrapper(SqlParseWrapper *pWrapper) {
void retrieveMetaCallback(SMetaData *pResultMeta, void *param, int32_t code) {
SqlParseWrapper *pWrapper = (SqlParseWrapper *)param;
SQuery * pQuery = pWrapper->pQuery;
SRequestObj * pRequest = pWrapper->pRequest;
SQuery *pQuery = pWrapper->pQuery;
SRequestObj *pRequest = pWrapper->pRequest;
if (code == TSDB_CODE_SUCCESS) {
code = qAnalyseSqlSemantic(pWrapper->pCtx, &pWrapper->catalogReq, pResultMeta, pQuery);
......@@ -684,7 +683,8 @@ void retrieveMetaCallback(SMetaData *pResultMeta, void *param, int32_t code) {
destorySqlParseWrapper(pWrapper);
tscDebug("0x%"PRIx64" analysis semantics completed, start async query, reqId:0x%"PRIx64, pRequest->self, pRequest->requestId);
tscDebug("0x%" PRIx64 " analysis semantics completed, start async query, reqId:0x%" PRIx64, pRequest->self,
pRequest->requestId);
launchAsyncQuery(pRequest, pQuery, pResultMeta);
} else {
destorySqlParseWrapper(pWrapper);
......@@ -705,7 +705,7 @@ void retrieveMetaCallback(SMetaData *pResultMeta, void *param, int32_t code) {
}
void taos_query_a(TAOS *taos, const char *sql, __taos_async_fn_t fp, void *param) {
int64_t connId = *(int64_t*)taos;
int64_t connId = *(int64_t *)taos;
taosAsyncQueryImpl(connId, sql, fp, param, false);
}
......@@ -739,7 +739,7 @@ int32_t createParseContext(const SRequestObj *pRequest, SParseContext **pCxt) {
void doAsyncQuery(SRequestObj *pRequest, bool updateMetaForce) {
SParseContext *pCxt = NULL;
STscObj * pTscObj = pRequest->pTscObj;
STscObj *pTscObj = pRequest->pTscObj;
int32_t code = 0;
if (pRequest->retry++ > REQUEST_TOTAL_EXEC_TIMES) {
......@@ -865,9 +865,9 @@ void taos_fetch_rows_a(TAOS_RES *res, __taos_async_fn_t fp, void *param) {
}
SSchedulerReq req = {
.syncReq = false,
.fetchFp = fetchCallback,
.cbParam = pRequest,
.syncReq = false,
.fetchFp = fetchCallback,
.cbParam = pRequest,
};
schedulerFetchRows(pRequest->body.queryJob, &req);
}
......@@ -876,7 +876,7 @@ void taos_fetch_raw_block_a(TAOS_RES *res, __taos_async_fn_t fp, void *param) {
ASSERT(res != NULL && fp != NULL);
ASSERT(TD_RES_QUERY(res));
SRequestObj *pRequest = res;
SRequestObj *pRequest = res;
SReqResultInfo *pResultInfo = &pRequest->body.resInfo;
// set the current block is all consumed
......@@ -918,7 +918,7 @@ int taos_load_table_info(TAOS *taos, const char *tableNameList) {
int64_t connId = *(int64_t *)taos;
const int32_t MAX_TABLE_NAME_LENGTH = 12 * 1024 * 1024; // 12MB list
int32_t code = 0;
SRequestObj * pRequest = NULL;
SRequestObj *pRequest = NULL;
SCatalogReq catalogReq = {0};
if (NULL == tableNameList) {
......@@ -940,7 +940,7 @@ int taos_load_table_info(TAOS *taos, const char *tableNameList) {
goto _return;
}
STscObj* pTscObj = pRequest->pTscObj;
STscObj *pTscObj = pRequest->pTscObj;
code = transferTableNameList(tableNameList, pTscObj->acctId, pTscObj->db, &catalogReq.pTableMeta);
if (code) {
goto _return;
......@@ -962,7 +962,7 @@ int taos_load_table_info(TAOS *taos, const char *tableNameList) {
goto _return;
}
SSyncQueryParam* pParam = pRequest->body.param;
SSyncQueryParam *pParam = pRequest->body.param;
tsem_wait(&pParam->sem);
_return:
......
......@@ -282,7 +282,7 @@ int32_t dataConverToStr(char* str, int type, void* buf, int32_t bufSize, int32_t
}
*str = '"';
int32_t length = taosUcs4ToMbs((TdUcs4 *)buf, bufSize, str + 1);
int32_t length = taosUcs4ToMbs((TdUcs4*)buf, bufSize, str + 1);
if (length <= 0) {
return TSDB_CODE_TSC_INVALID_VALUE;
}
......@@ -310,15 +310,15 @@ int32_t dataConverToStr(char* str, int type, void* buf, int32_t bufSize, int32_t
return TSDB_CODE_TSC_INVALID_VALUE;
}
if(len) *len = n;
if (len) *len = n;
return TSDB_CODE_SUCCESS;
}
char* parseTagDatatoJson(void* p) {
char* string = NULL;
char* string = NULL;
SArray* pTagVals = NULL;
cJSON* json = NULL;
cJSON* json = NULL;
if (tTagToValArray((const STag*)p, &pTagVals) != 0) {
goto end;
}
......@@ -327,7 +327,7 @@ char* parseTagDatatoJson(void* p) {
if (nCols == 0) {
goto end;
}
char tagJsonKey[256] = {0};
char tagJsonKey[256] = {0};
json = cJSON_CreateObject();
if (json == NULL) {
goto end;
......@@ -390,7 +390,7 @@ char* parseTagDatatoJson(void* p) {
end:
cJSON_Delete(json);
taosArrayDestroy(pTagVals);
if(string == NULL){
if (string == NULL) {
string = strdup(TSDB_DATA_NULL_STR_L);
}
return string;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册