提交 e8e11190 编写于 作者: D dapan1121

Merge branch 'feature/3_liaohj' of github.com:taosdata/TDengine into feature/3_liaohj

...@@ -45,7 +45,7 @@ extern "C" { ...@@ -45,7 +45,7 @@ extern "C" {
#define ERROR_MSG_BUF_DEFAULT_SIZE 512 #define ERROR_MSG_BUF_DEFAULT_SIZE 512
#define HEARTBEAT_INTERVAL 1500 // ms #define HEARTBEAT_INTERVAL 1500 // ms
#define SYNC_ON_TOP_OF_ASYNC 1 #define SYNC_ON_TOP_OF_ASYNC 0
enum { enum {
RES_TYPE__QUERY = 1, RES_TYPE__QUERY = 1,
...@@ -266,8 +266,8 @@ extern SAppInfo appInfo; ...@@ -266,8 +266,8 @@ extern SAppInfo appInfo;
extern int32_t clientReqRefPool; extern int32_t clientReqRefPool;
extern int32_t clientConnRefPool; extern int32_t clientConnRefPool;
extern int (*handleRequestRspFp[TDMT_MAX])(void*, const SDataBuf* pMsg, int32_t code); __async_send_cb_fn_t getMsgRspHandle(int32_t msgType);
int genericRspCallback(void* param, const SDataBuf* pMsg, int32_t code);
SMsgSendInfo* buildMsgInfoImpl(SRequestObj* pReqObj); SMsgSendInfo* buildMsgInfoImpl(SRequestObj* pReqObj);
void* createTscObj(const char* user, const char* auth, const char* db, int32_t connType, SAppInstInfo* pAppInfo); void* createTscObj(const char* user, const char* auth, const char* db, int32_t connType, SAppInstInfo* pAppInfo);
...@@ -293,8 +293,6 @@ void* openTransporter(const char* user, const char* auth, int32_t numOfThreads); ...@@ -293,8 +293,6 @@ void* openTransporter(const char* user, const char* auth, int32_t numOfThreads);
bool persistConnForSpecificMsg(void* parenct, tmsg_t msgType); bool persistConnForSpecificMsg(void* parenct, tmsg_t msgType);
void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet); void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet);
void initMsgHandleFp();
TAOS* taos_connect_internal(const char* ip, const char* user, const char* pass, const char* auth, const char* db, TAOS* taos_connect_internal(const char* ip, const char* user, const char* pass, const char* auth, const char* db,
uint16_t port, int connType); uint16_t port, int connType);
......
...@@ -278,7 +278,6 @@ void taos_init_imp(void) { ...@@ -278,7 +278,6 @@ void taos_init_imp(void) {
return; return;
} }
initMsgHandleFp();
initQueryModuleMsgHandle(); initQueryModuleMsgHandle();
rpcInit(); rpcInit();
......
...@@ -535,19 +535,20 @@ int32_t handleAlterTbExecRes(void* res, SCatalog* pCatalog) { ...@@ -535,19 +535,20 @@ int32_t handleAlterTbExecRes(void* res, SCatalog* pCatalog) {
return catalogUpdateTableMeta(pCatalog, (STableMetaRsp*)res); return catalogUpdateTableMeta(pCatalog, (STableMetaRsp*)res);
} }
int32_t handleExecRes(SRequestObj* pRequest) { int32_t handleQueryExecRsp(SRequestObj* pRequest) {
if (NULL == pRequest->body.resInfo.execRes.res) { if (NULL == pRequest->body.resInfo.execRes.res) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t code = 0;
SCatalog* pCatalog = NULL; SCatalog* pCatalog = NULL;
code = catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog); SAppInstInfo* pAppInfo = getAppInfo(pRequest);
int32_t code = catalogGetHandle(pAppInfo->clusterId, &pCatalog);
if (code) { if (code) {
return code; return code;
} }
SEpSet epset = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp); SEpSet epset = getEpSet_s(&pAppInfo->mgmtEp);
SQueryExecRes* pRes = &pRequest->body.resInfo.execRes; SQueryExecRes* pRes = &pRequest->body.resInfo.execRes;
switch (pRes->msgType) { switch (pRes->msgType) {
...@@ -565,8 +566,9 @@ int32_t handleExecRes(SRequestObj* pRequest) { ...@@ -565,8 +566,9 @@ int32_t handleExecRes(SRequestObj* pRequest) {
break; break;
} }
default: default:
tscError("invalid exec result for request type %d", pRequest->type); tscError("0x%"PRIx64", invalid exec result for request type %d, reqId:0x%"PRIx64, pRequest->self,
return TSDB_CODE_APP_ERROR; pRequest->type, pRequest->requestId);
code = TSDB_CODE_APP_ERROR;
} }
return code; return code;
...@@ -578,13 +580,18 @@ void schedulerExecCb(SQueryResult* pResult, void* param, int32_t code) { ...@@ -578,13 +580,18 @@ void schedulerExecCb(SQueryResult* pResult, void* param, int32_t code) {
STscObj* pTscObj = pRequest->pTscObj; STscObj* pTscObj = pRequest->pTscObj;
if (code != TSDB_CODE_SUCCESS && NEED_CLIENT_HANDLE_ERROR(code)) { if (code != TSDB_CODE_SUCCESS && NEED_CLIENT_HANDLE_ERROR(code)) {
// todo do nothing in clear value in request
tscDebug("0x%"PRIx64" client retry to handle the error, code:%s, reqId:0x%"PRIx64, pRequest->self, tstrerror(code), pRequest->requestId); tscDebug("0x%"PRIx64" client retry to handle the error, code:%s, reqId:0x%"PRIx64, pRequest->self, tstrerror(code), pRequest->requestId);
pRequest->prevCode = code; pRequest->prevCode = code;
doAsyncQuery(pRequest, true); doAsyncQuery(pRequest, true);
return; return;
} }
if (code == TSDB_CODE_SUCCESS) {
code = handleQueryExecRsp(pRequest);
ASSERT(pRequest->code == TSDB_CODE_SUCCESS);
pRequest->code = code;
}
if (NEED_CLIENT_RM_TBLMETA_REQ(pRequest->type)) { if (NEED_CLIENT_RM_TBLMETA_REQ(pRequest->type)) {
removeMeta(pTscObj, pRequest->tableList); removeMeta(pTscObj, pRequest->tableList);
} }
...@@ -623,7 +630,7 @@ SRequestObj* launchQueryImpl(SRequestObj* pRequest, SQuery* pQuery, bool keepQue ...@@ -623,7 +630,7 @@ SRequestObj* launchQueryImpl(SRequestObj* pRequest, SQuery* pQuery, bool keepQue
qDestroyQuery(pQuery); qDestroyQuery(pQuery);
} }
handleExecRes(pRequest); handleQueryExecRsp(pRequest);
if (NULL != pRequest && TSDB_CODE_SUCCESS != code) { if (NULL != pRequest && TSDB_CODE_SUCCESS != code) {
pRequest->code = terrno; pRequest->code = terrno;
...@@ -688,9 +695,6 @@ void launchAsyncQuery(SRequestObj* pRequest, SQuery* pQuery) { ...@@ -688,9 +695,6 @@ void launchAsyncQuery(SRequestObj* pRequest, SQuery* pQuery) {
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
schedulerAsyncExecJob(pAppInfo->pTransporter, pNodeList, pRequest->body.pDag, &pRequest->body.queryJob, schedulerAsyncExecJob(pAppInfo->pTransporter, pNodeList, pRequest->body.pDag, &pRequest->body.queryJob,
pRequest->sqlstr, pRequest->metric.start, schedulerExecCb, pRequest); pRequest->sqlstr, pRequest->metric.start, schedulerExecCb, pRequest);
// if (NULL != pRes) {
// code = validateSversion(pRequest, pRes);
// }
} }
//todo not to be released here //todo not to be released here
...@@ -884,7 +888,7 @@ static SMsgSendInfo* buildConnectMsg(SRequestObj* pRequest) { ...@@ -884,7 +888,7 @@ static SMsgSendInfo* buildConnectMsg(SRequestObj* pRequest) {
pMsgSendInfo->requestObjRefId = pRequest->self; pMsgSendInfo->requestObjRefId = pRequest->self;
pMsgSendInfo->requestId = pRequest->requestId; pMsgSendInfo->requestId = pRequest->requestId;
pMsgSendInfo->fp = handleRequestRspFp[TMSG_INDEX(pMsgSendInfo->msgType)]; pMsgSendInfo->fp = getMsgRspHandle(pMsgSendInfo->msgType);
pMsgSendInfo->param = pRequest; pMsgSendInfo->param = pRequest;
SConnectReq connectReq = {0}; SConnectReq connectReq = {0};
......
...@@ -634,7 +634,7 @@ void retrieveMetaCallback(SMetaData* pResultMeta, void* param, int32_t code) { ...@@ -634,7 +634,7 @@ void retrieveMetaCallback(SMetaData* pResultMeta, void* param, int32_t code) {
// return to app directly // return to app directly
taosMemoryFree(pWrapper); taosMemoryFree(pWrapper);
tscError("0x%" PRIx64 " error occurs, code:%s, return to user app, reqId:%" PRIx64, pRequest->self, tstrerror(code), tscError("0x%" PRIx64 " error occurs, code:%s, return to user app, reqId:0x%" PRIx64, pRequest->self, tstrerror(code),
pRequest->requestId); pRequest->requestId);
pRequest->code = code; pRequest->code = code;
pRequest->body.queryFp(pRequest->body.param, pRequest, code); pRequest->body.queryFp(pRequest->body.param, pRequest, code);
...@@ -748,7 +748,6 @@ void doAsyncQuery(SRequestObj* pRequest, bool updateMetaForce) { ...@@ -748,7 +748,6 @@ void doAsyncQuery(SRequestObj* pRequest, bool updateMetaForce) {
pRequest->body.queryFp(pRequest->body.param, pRequest, code); pRequest->body.queryFp(pRequest->body.param, pRequest, code);
} }
static void fetchCallback(void* pResult, void* param, int32_t code) { static void fetchCallback(void* pResult, void* param, int32_t code) {
SRequestObj* pRequest = (SRequestObj*) param; SRequestObj* pRequest = (SRequestObj*) param;
......
...@@ -21,8 +21,6 @@ ...@@ -21,8 +21,6 @@
#include "tdef.h" #include "tdef.h"
#include "tname.h" #include "tname.h"
int32_t (*handleRequestRspFp[TDMT_MAX])(void*, const SDataBuf* pMsg, int32_t code);
static void setErrno(SRequestObj* pRequest, int32_t code) { static void setErrno(SRequestObj* pRequest, int32_t code) {
pRequest->code = code; pRequest->code = code;
terrno = code; terrno = code;
...@@ -107,10 +105,7 @@ SMsgSendInfo* buildMsgInfoImpl(SRequestObj* pRequest) { ...@@ -107,10 +105,7 @@ SMsgSendInfo* buildMsgInfoImpl(SRequestObj* pRequest) {
assert(pRequest != NULL); assert(pRequest != NULL);
pMsgSendInfo->msgInfo = pRequest->body.requestMsg; pMsgSendInfo->msgInfo = pRequest->body.requestMsg;
pMsgSendInfo->fp = getMsgRspHandle(pRequest->type);
pMsgSendInfo->fp = (handleRequestRspFp[TMSG_INDEX(pRequest->type)] == NULL)
? genericRspCallback
: handleRequestRspFp[TMSG_INDEX(pRequest->type)];
return pMsgSendInfo; return pMsgSendInfo;
} }
...@@ -209,7 +204,7 @@ int32_t processUseDbRsp(void* param, const SDataBuf* pMsg, int32_t code) { ...@@ -209,7 +204,7 @@ int32_t processUseDbRsp(void* param, const SDataBuf* pMsg, int32_t code) {
return 0; return 0;
} }
int32_t processCreateTableRsp(void* param, const SDataBuf* pMsg, int32_t code) { int32_t processCreateSTableRsp(void* param, const SDataBuf* pMsg, int32_t code) {
assert(pMsg != NULL && param != NULL); assert(pMsg != NULL && param != NULL);
SRequestObj* pRequest = param; SRequestObj* pRequest = param;
...@@ -285,13 +280,21 @@ int32_t processAlterStbRsp(void* param, const SDataBuf* pMsg, int32_t code) { ...@@ -285,13 +280,21 @@ int32_t processAlterStbRsp(void* param, const SDataBuf* pMsg, int32_t code) {
return code; return code;
} }
__async_send_cb_fn_t getMsgRspHandle(int32_t msgType) {
// todo refactor: this arraylist is too large switch (msgType) {
void initMsgHandleFp() { case TDMT_MND_CONNECT:
handleRequestRspFp[TMSG_INDEX(TDMT_MND_CONNECT)] = processConnectRsp; return processConnectRsp;
handleRequestRspFp[TMSG_INDEX(TDMT_MND_CREATE_DB)] = processCreateDbRsp; case TDMT_MND_CREATE_DB:
handleRequestRspFp[TMSG_INDEX(TDMT_MND_USE_DB)] = processUseDbRsp; return processCreateDbRsp;
handleRequestRspFp[TMSG_INDEX(TDMT_MND_CREATE_STB)] = processCreateTableRsp; case TDMT_MND_USE_DB:
handleRequestRspFp[TMSG_INDEX(TDMT_MND_DROP_DB)] = processDropDbRsp; return processUseDbRsp;
handleRequestRspFp[TMSG_INDEX(TDMT_MND_ALTER_STB)] = processAlterStbRsp; case TDMT_MND_CREATE_STB:
return processCreateSTableRsp;
case TDMT_MND_DROP_DB:
return processDropDbRsp;
case TDMT_MND_ALTER_STB:
return processAlterStbRsp;
default:
return genericRspCallback;
}
} }
...@@ -1497,19 +1497,25 @@ char *buildRetension(SArray *pRetension) { ...@@ -1497,19 +1497,25 @@ char *buildRetension(SArray *pRetension) {
int64_t v1 = getValOfDiffPrecision(p->freqUnit, p->freq); int64_t v1 = getValOfDiffPrecision(p->freqUnit, p->freq);
int64_t v2 = getValOfDiffPrecision(p->keepUnit, p->keep); int64_t v2 = getValOfDiffPrecision(p->keepUnit, p->keep);
len += sprintf(p1 + len, "%" PRId64 "%c:%" PRId64 "%c,", v1, p->freqUnit, v2, p->keepUnit); len += sprintf(p1 + len, "%" PRId64 "%c:%" PRId64 "%c", v1, p->freqUnit, v2, p->keepUnit);
p = taosArrayGet(pRetension, 1); if (size > 1) {
len += sprintf(p1 + len, ",");
p = taosArrayGet(pRetension, 1);
v1 = getValOfDiffPrecision(p->freqUnit, p->freq); v1 = getValOfDiffPrecision(p->freqUnit, p->freq);
v2 = getValOfDiffPrecision(p->keepUnit, p->keep); v2 = getValOfDiffPrecision(p->keepUnit, p->keep);
len += sprintf(p1 + len, "%" PRId64 "%c:%" PRId64 "%c,", v1, p->freqUnit, v2, p->keepUnit); len += sprintf(p1 + len, "%" PRId64 "%c:%" PRId64 "%c", v1, p->freqUnit, v2, p->keepUnit);
}
p = taosArrayGet(pRetension, 2); if (size > 2) {
len += sprintf(p1 + len, ",");
p = taosArrayGet(pRetension, 2);
v1 = getValOfDiffPrecision(p->freqUnit, p->freq); v1 = getValOfDiffPrecision(p->freqUnit, p->freq);
v2 = getValOfDiffPrecision(p->keepUnit, p->keep); v2 = getValOfDiffPrecision(p->keepUnit, p->keep);
len += sprintf(p1 + len, "%" PRId64 "%c:%" PRId64 "%c", v1, p->freqUnit, v2, p->keepUnit); len += sprintf(p1 + len, "%" PRId64 "%c:%" PRId64 "%c", v1, p->freqUnit, v2, p->keepUnit);
}
varDataSetLen(p1, len); varDataSetLen(p1, len);
return p1; return p1;
......
...@@ -2903,7 +2903,6 @@ static int32_t buildRollupAst(STranslateContext* pCxt, SCreateTableStmt* pStmt, ...@@ -2903,7 +2903,6 @@ static int32_t buildRollupAst(STranslateContext* pCxt, SCreateTableStmt* pStmt,
} }
} }
taosArrayDestroy(dbCfg.pRetensions);
return code; return code;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册