未验证 提交 3c152bfd 编写于 作者: dengyihao's avatar dengyihao 提交者: GitHub

Merge pull request #10302 from taosdata/feature/trans_impl

fix query crash on super table
...@@ -113,6 +113,7 @@ typedef struct STableMetaOutput { ...@@ -113,6 +113,7 @@ typedef struct STableMetaOutput {
typedef struct SDataBuf { typedef struct SDataBuf {
void *pData; void *pData;
uint32_t len; uint32_t len;
void *handle;
} SDataBuf; } SDataBuf;
typedef int32_t (*__async_send_cb_fn_t)(void* param, const SDataBuf* pMsg, int32_t code); typedef int32_t (*__async_send_cb_fn_t)(void* param, const SDataBuf* pMsg, int32_t code);
......
...@@ -395,8 +395,10 @@ static void* hbThreadFunc(void* param) { ...@@ -395,8 +395,10 @@ static void* hbThreadFunc(void* param) {
hbClearReqInfo(pAppHbMgr); hbClearReqInfo(pAppHbMgr);
break; break;
} }
tSerializeSClientHbBatchReq(buf, tlen, pReq); tSerializeSClientHbBatchReq(buf, tlen, pReq);
SMsgSendInfo *pInfo = malloc(sizeof(SMsgSendInfo)); SMsgSendInfo *pInfo = calloc(1, sizeof(SMsgSendInfo));
if (pInfo == NULL) { if (pInfo == NULL) {
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY; terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
tFreeClientHbBatchReq(pReq, false); tFreeClientHbBatchReq(pReq, false);
......
...@@ -12,10 +12,10 @@ ...@@ -12,10 +12,10 @@
#include "tpagedbuf.h" #include "tpagedbuf.h"
#include "tref.h" #include "tref.h"
static int32_t initEpSetFromCfg(const char *firstEp, const char *secondEp, SCorEpSet *pEpSet); static int32_t initEpSetFromCfg(const char* firstEp, const char* secondEp, SCorEpSet* pEpSet);
static SMsgSendInfo* buildConnectMsg(SRequestObj *pRequest); static SMsgSendInfo* buildConnectMsg(SRequestObj* pRequest);
static void destroySendMsgInfo(SMsgSendInfo* pMsgBody); static void destroySendMsgInfo(SMsgSendInfo* pMsgBody);
static void setQueryResultFromRsp(SReqResultInfo* pResultInfo, const SRetrieveTableRsp* pRsp); static void setQueryResultFromRsp(SReqResultInfo* pResultInfo, const SRetrieveTableRsp* pRsp);
static bool stringLengthCheck(const char* str, size_t maxsize) { static bool stringLengthCheck(const char* str, size_t maxsize) {
if (str == NULL) { if (str == NULL) {
...@@ -30,17 +30,11 @@ static bool stringLengthCheck(const char* str, size_t maxsize) { ...@@ -30,17 +30,11 @@ static bool stringLengthCheck(const char* str, size_t maxsize) {
return true; return true;
} }
static bool validateUserName(const char* user) { static bool validateUserName(const char* user) { return stringLengthCheck(user, TSDB_USER_LEN - 1); }
return stringLengthCheck(user, TSDB_USER_LEN - 1);
}
static bool validatePassword(const char* passwd) { static bool validatePassword(const char* passwd) { return stringLengthCheck(passwd, TSDB_PASSWORD_LEN - 1); }
return stringLengthCheck(passwd, TSDB_PASSWORD_LEN - 1);
}
static bool validateDbName(const char* db) { static bool validateDbName(const char* db) { return stringLengthCheck(db, TSDB_DB_NAME_LEN - 1); }
return stringLengthCheck(db, TSDB_DB_NAME_LEN - 1);
}
static char* getClusterKey(const char* user, const char* auth, const char* ip, int32_t port) { static char* getClusterKey(const char* user, const char* auth, const char* ip, int32_t port) {
char key[512] = {0}; char key[512] = {0};
...@@ -48,10 +42,12 @@ static char* getClusterKey(const char* user, const char* auth, const char* ip, i ...@@ -48,10 +42,12 @@ static char* getClusterKey(const char* user, const char* auth, const char* ip, i
return strdup(key); return strdup(key);
} }
static STscObj* taosConnectImpl(const char *user, const char *auth, const char *db, __taos_async_fn_t fp, void *param, SAppInstInfo* pAppInfo); static STscObj* taosConnectImpl(const char* user, const char* auth, const char* db, __taos_async_fn_t fp, void* param,
static void setResSchemaInfo(SReqResultInfo* pResInfo, const SSchema* pSchema, int32_t numOfCols); SAppInstInfo* pAppInfo);
static void setResSchemaInfo(SReqResultInfo* pResInfo, const SSchema* pSchema, int32_t numOfCols);
TAOS *taos_connect_internal(const char *ip, const char *user, const char *pass, const char *auth, const char *db, uint16_t port) { TAOS* taos_connect_internal(const char* ip, const char* user, const char* pass, const char* auth, const char* db,
uint16_t port) {
if (taos_init() != TSDB_CODE_SUCCESS) { if (taos_init() != TSDB_CODE_SUCCESS) {
return NULL; return NULL;
} }
...@@ -63,7 +59,7 @@ TAOS *taos_connect_internal(const char *ip, const char *user, const char *pass, ...@@ -63,7 +59,7 @@ TAOS *taos_connect_internal(const char *ip, const char *user, const char *pass,
char localDb[TSDB_DB_NAME_LEN] = {0}; char localDb[TSDB_DB_NAME_LEN] = {0};
if (db != NULL) { if (db != NULL) {
if(!validateDbName(db)) { if (!validateDbName(db)) {
terrno = TSDB_CODE_TSC_INVALID_DB_LENGTH; terrno = TSDB_CODE_TSC_INVALID_DB_LENGTH;
return NULL; return NULL;
} }
...@@ -79,7 +75,7 @@ TAOS *taos_connect_internal(const char *ip, const char *user, const char *pass, ...@@ -79,7 +75,7 @@ TAOS *taos_connect_internal(const char *ip, const char *user, const char *pass,
return NULL; return NULL;
} }
taosEncryptPass_c((uint8_t *)pass, strlen(pass), secretEncrypt); taosEncryptPass_c((uint8_t*)pass, strlen(pass), secretEncrypt);
} else { } else {
tstrncpy(secretEncrypt, auth, tListLen(secretEncrypt)); tstrncpy(secretEncrypt, auth, tListLen(secretEncrypt));
} }
...@@ -99,7 +95,7 @@ TAOS *taos_connect_internal(const char *ip, const char *user, const char *pass, ...@@ -99,7 +95,7 @@ TAOS *taos_connect_internal(const char *ip, const char *user, const char *pass,
} }
} }
char* key = getClusterKey(user, secretEncrypt, ip, port); char* key = getClusterKey(user, secretEncrypt, ip, port);
SAppInstInfo** pInst = NULL; SAppInstInfo** pInst = NULL;
pthread_mutex_lock(&appInfo.mutex); pthread_mutex_lock(&appInfo.mutex);
...@@ -108,7 +104,7 @@ TAOS *taos_connect_internal(const char *ip, const char *user, const char *pass, ...@@ -108,7 +104,7 @@ TAOS *taos_connect_internal(const char *ip, const char *user, const char *pass,
SAppInstInfo* p = NULL; SAppInstInfo* p = NULL;
if (pInst == NULL) { if (pInst == NULL) {
p = calloc(1, sizeof(struct SAppInstInfo)); p = calloc(1, sizeof(struct SAppInstInfo));
p->mgmtEp = epSet; p->mgmtEp = epSet;
p->pTransporter = openTransporter(user, secretEncrypt, tsNumOfCores); p->pTransporter = openTransporter(user, secretEncrypt, tsNumOfCores);
p->pAppHbMgr = appHbMgrInit(p, key); p->pAppHbMgr = appHbMgrInit(p, key);
taosHashPut(appInfo.pInstMap, key, strlen(key), &p, POINTER_BYTES); taosHashPut(appInfo.pInstMap, key, strlen(key), &p, POINTER_BYTES);
...@@ -122,7 +118,7 @@ TAOS *taos_connect_internal(const char *ip, const char *user, const char *pass, ...@@ -122,7 +118,7 @@ TAOS *taos_connect_internal(const char *ip, const char *user, const char *pass,
return taosConnectImpl(user, &secretEncrypt[0], localDb, NULL, NULL, *pInst); return taosConnectImpl(user, &secretEncrypt[0], localDb, NULL, NULL, *pInst);
} }
int32_t buildRequest(STscObj *pTscObj, const char *sql, int sqlLen, SRequestObj** pRequest) { int32_t buildRequest(STscObj* pTscObj, const char* sql, int sqlLen, SRequestObj** pRequest) {
*pRequest = createRequest(pTscObj, NULL, NULL, TSDB_SQL_SELECT); *pRequest = createRequest(pTscObj, NULL, NULL, TSDB_SQL_SELECT);
if (*pRequest == NULL) { if (*pRequest == NULL) {
tscError("failed to malloc sqlObj"); tscError("failed to malloc sqlObj");
...@@ -131,7 +127,7 @@ int32_t buildRequest(STscObj *pTscObj, const char *sql, int sqlLen, SRequestObj* ...@@ -131,7 +127,7 @@ int32_t buildRequest(STscObj *pTscObj, const char *sql, int sqlLen, SRequestObj*
(*pRequest)->sqlstr = malloc(sqlLen + 1); (*pRequest)->sqlstr = malloc(sqlLen + 1);
if ((*pRequest)->sqlstr == NULL) { if ((*pRequest)->sqlstr == NULL) {
tscError("0x%"PRIx64" failed to prepare sql string buffer", (*pRequest)->self); tscError("0x%" PRIx64 " failed to prepare sql string buffer", (*pRequest)->self);
(*pRequest)->msgBuf = strdup("failed to prepare sql string buffer"); (*pRequest)->msgBuf = strdup("failed to prepare sql string buffer");
return TSDB_CODE_TSC_OUT_OF_MEMORY; return TSDB_CODE_TSC_OUT_OF_MEMORY;
} }
...@@ -140,7 +136,7 @@ int32_t buildRequest(STscObj *pTscObj, const char *sql, int sqlLen, SRequestObj* ...@@ -140,7 +136,7 @@ int32_t buildRequest(STscObj *pTscObj, const char *sql, int sqlLen, SRequestObj*
(*pRequest)->sqlstr[sqlLen] = 0; (*pRequest)->sqlstr[sqlLen] = 0;
(*pRequest)->sqlLen = sqlLen; (*pRequest)->sqlLen = sqlLen;
tscDebugL("0x%"PRIx64" SQL: %s, reqId:0x%"PRIx64, (*pRequest)->self, (*pRequest)->sqlstr, (*pRequest)->requestId); tscDebugL("0x%" PRIx64 " SQL: %s, reqId:0x%" PRIx64, (*pRequest)->self, (*pRequest)->sqlstr, (*pRequest)->requestId);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -148,14 +144,14 @@ int32_t parseSql(SRequestObj* pRequest, SQueryNode** pQuery) { ...@@ -148,14 +144,14 @@ int32_t parseSql(SRequestObj* pRequest, SQueryNode** pQuery) {
STscObj* pTscObj = pRequest->pTscObj; STscObj* pTscObj = pRequest->pTscObj;
SParseContext cxt = { SParseContext cxt = {
.requestId = pRequest->requestId, .requestId = pRequest->requestId,
.acctId = pTscObj->acctId, .acctId = pTscObj->acctId,
.db = getDbOfConnection(pTscObj), .db = getDbOfConnection(pTscObj),
.pSql = pRequest->sqlstr, .pSql = pRequest->sqlstr,
.sqlLen = pRequest->sqlLen, .sqlLen = pRequest->sqlLen,
.pMsg = pRequest->msgBuf, .pMsg = pRequest->msgBuf,
.msgLen = ERROR_MSG_BUF_DEFAULT_SIZE, .msgLen = ERROR_MSG_BUF_DEFAULT_SIZE,
.pTransporter = pTscObj->pAppInfo->pTransporter, .pTransporter = pTscObj->pAppInfo->pTransporter,
}; };
cxt.mgmtEpSet = getEpSet_s(&pTscObj->pAppInfo->mgmtEp); cxt.mgmtEpSet = getEpSet_s(&pTscObj->pAppInfo->mgmtEp);
...@@ -174,9 +170,9 @@ int32_t parseSql(SRequestObj* pRequest, SQueryNode** pQuery) { ...@@ -174,9 +170,9 @@ int32_t parseSql(SRequestObj* pRequest, SQueryNode** pQuery) {
int32_t execDdlQuery(SRequestObj* pRequest, SQueryNode* pQuery) { int32_t execDdlQuery(SRequestObj* pRequest, SQueryNode* pQuery) {
SDclStmtInfo* pDcl = (SDclStmtInfo*)pQuery; SDclStmtInfo* pDcl = (SDclStmtInfo*)pQuery;
pRequest->type = pDcl->msgType; pRequest->type = pDcl->msgType;
pRequest->body.requestMsg = (SDataBuf){.pData = pDcl->pMsg, .len = pDcl->msgLen}; pRequest->body.requestMsg = (SDataBuf){.pData = pDcl->pMsg, .len = pDcl->msgLen, .handle = NULL};
STscObj* pTscObj = pRequest->pTscObj; STscObj* pTscObj = pRequest->pTscObj;
SMsgSendInfo* pSendMsg = buildMsgInfoImpl(pRequest); SMsgSendInfo* pSendMsg = buildMsgInfoImpl(pRequest);
int64_t transporterId = 0; int64_t transporterId = 0;
...@@ -202,7 +198,7 @@ int32_t getPlan(SRequestObj* pRequest, SQueryNode* pQueryNode, SQueryDag** pDag, ...@@ -202,7 +198,7 @@ int32_t getPlan(SRequestObj* pRequest, SQueryNode* pQueryNode, SQueryDag** pDag,
SSchema* pSchema = NULL; SSchema* pSchema = NULL;
int32_t numOfCols = 0; int32_t numOfCols = 0;
int32_t code = qCreateQueryDag(pQueryNode, pDag, &pSchema, &numOfCols, pNodeList, pRequest->requestId); int32_t code = qCreateQueryDag(pQueryNode, pDag, &pSchema, &numOfCols, pNodeList, pRequest->requestId);
if (code != 0) { if (code != 0) {
return code; return code;
} }
...@@ -224,7 +220,7 @@ void setResSchemaInfo(SReqResultInfo* pResInfo, const SSchema* pSchema, int32_t ...@@ -224,7 +220,7 @@ void setResSchemaInfo(SReqResultInfo* pResInfo, const SSchema* pSchema, int32_t
for (int32_t i = 0; i < pResInfo->numOfCols; ++i) { for (int32_t i = 0; i < pResInfo->numOfCols; ++i) {
pResInfo->fields[i].bytes = pSchema[i].bytes; pResInfo->fields[i].bytes = pSchema[i].bytes;
pResInfo->fields[i].type = pSchema[i].type; pResInfo->fields[i].type = pSchema[i].type;
tstrncpy(pResInfo->fields[i].name, pSchema[i].name, tListLen(pResInfo->fields[i].name)); tstrncpy(pResInfo->fields[i].name, pSchema[i].name, tListLen(pResInfo->fields[i].name));
} }
} }
...@@ -233,7 +229,7 @@ int32_t scheduleQuery(SRequestObj* pRequest, SQueryDag* pDag, SArray* pNodeList) ...@@ -233,7 +229,7 @@ int32_t scheduleQuery(SRequestObj* pRequest, SQueryDag* pDag, SArray* pNodeList)
void* pTransporter = pRequest->pTscObj->pAppInfo->pTransporter; void* pTransporter = pRequest->pTscObj->pAppInfo->pTransporter;
if (TSDB_SQL_INSERT == pRequest->type || TSDB_SQL_CREATE_TABLE == pRequest->type) { if (TSDB_SQL_INSERT == pRequest->type || TSDB_SQL_CREATE_TABLE == pRequest->type) {
SQueryResult res = {.code = 0, .numOfRows = 0, .msgSize = ERROR_MSG_BUF_DEFAULT_SIZE, .msg = pRequest->msgBuf}; SQueryResult res = {.code = 0, .numOfRows = 0, .msgSize = ERROR_MSG_BUF_DEFAULT_SIZE, .msg = pRequest->msgBuf};
int32_t code = schedulerExecJob(pTransporter, NULL, pDag, &pRequest->body.pQueryJob, pRequest->sqlstr, &res); int32_t code = schedulerExecJob(pTransporter, NULL, pDag, &pRequest->body.pQueryJob, pRequest->sqlstr, &res);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
// handle error and retry // handle error and retry
} else { } else {
...@@ -250,9 +246,9 @@ int32_t scheduleQuery(SRequestObj* pRequest, SQueryDag* pDag, SArray* pNodeList) ...@@ -250,9 +246,9 @@ int32_t scheduleQuery(SRequestObj* pRequest, SQueryDag* pDag, SArray* pNodeList)
return schedulerAsyncExecJob(pTransporter, pNodeList, pDag, pRequest->sqlstr, &pRequest->body.pQueryJob); return schedulerAsyncExecJob(pTransporter, pNodeList, pDag, pRequest->sqlstr, &pRequest->body.pQueryJob);
} }
TAOS_RES *taos_query_l(TAOS *taos, const char *sql, int sqlLen) { TAOS_RES* taos_query_l(TAOS* taos, const char* sql, int sqlLen) {
STscObj *pTscObj = (STscObj *)taos; STscObj* pTscObj = (STscObj*)taos;
if (sqlLen > (size_t) TSDB_MAX_ALLOWED_SQL_LEN) { if (sqlLen > (size_t)TSDB_MAX_ALLOWED_SQL_LEN) {
tscError("sql string exceeds max length:%d", TSDB_MAX_ALLOWED_SQL_LEN); tscError("sql string exceeds max length:%d", TSDB_MAX_ALLOWED_SQL_LEN);
terrno = TSDB_CODE_TSC_EXCEED_SQL_LIMIT; terrno = TSDB_CODE_TSC_EXCEED_SQL_LIMIT;
return NULL; return NULL;
...@@ -260,9 +256,9 @@ TAOS_RES *taos_query_l(TAOS *taos, const char *sql, int sqlLen) { ...@@ -260,9 +256,9 @@ TAOS_RES *taos_query_l(TAOS *taos, const char *sql, int sqlLen) {
nPrintTsc("%s", sql) nPrintTsc("%s", sql)
SRequestObj *pRequest = NULL; SRequestObj* pRequest = NULL;
SQueryNode *pQueryNode = NULL; SQueryNode* pQueryNode = NULL;
SArray *pNodeList = taosArrayInit(4, sizeof(struct SQueryNodeAddr)); SArray* pNodeList = taosArrayInit(4, sizeof(struct SQueryNodeAddr));
terrno = TSDB_CODE_SUCCESS; terrno = TSDB_CODE_SUCCESS;
CHECK_CODE_GOTO(buildRequest(pTscObj, sql, sqlLen, &pRequest), _return); CHECK_CODE_GOTO(buildRequest(pTscObj, sql, sqlLen, &pRequest), _return);
...@@ -286,13 +282,13 @@ _return: ...@@ -286,13 +282,13 @@ _return:
return pRequest; return pRequest;
} }
int initEpSetFromCfg(const char *firstEp, const char *secondEp, SCorEpSet *pEpSet) { int initEpSetFromCfg(const char* firstEp, const char* secondEp, SCorEpSet* pEpSet) {
pEpSet->version = 0; pEpSet->version = 0;
// init mnode ip set // init mnode ip set
SEpSet *mgmtEpSet = &(pEpSet->epSet); SEpSet* mgmtEpSet = &(pEpSet->epSet);
mgmtEpSet->numOfEps = 0; mgmtEpSet->numOfEps = 0;
mgmtEpSet->inUse = 0; mgmtEpSet->inUse = 0;
if (firstEp && firstEp[0] != 0) { if (firstEp && firstEp[0] != 0) {
if (strlen(firstEp) >= TSDB_EP_LEN) { if (strlen(firstEp) >= TSDB_EP_LEN) {
...@@ -322,14 +318,15 @@ int initEpSetFromCfg(const char *firstEp, const char *secondEp, SCorEpSet *pEpSe ...@@ -322,14 +318,15 @@ int initEpSetFromCfg(const char *firstEp, const char *secondEp, SCorEpSet *pEpSe
return 0; return 0;
} }
STscObj* taosConnectImpl(const char *user, const char *auth, const char *db, __taos_async_fn_t fp, void *param, SAppInstInfo* pAppInfo) { STscObj* taosConnectImpl(const char* user, const char* auth, const char* db, __taos_async_fn_t fp, void* param,
STscObj *pTscObj = createTscObj(user, auth, db, pAppInfo); SAppInstInfo* pAppInfo) {
STscObj* pTscObj = createTscObj(user, auth, db, pAppInfo);
if (NULL == pTscObj) { if (NULL == pTscObj) {
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY; terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
return pTscObj; return pTscObj;
} }
SRequestObj *pRequest = createRequest(pTscObj, fp, param, TDMT_MND_CONNECT); SRequestObj* pRequest = createRequest(pTscObj, fp, param, TDMT_MND_CONNECT);
if (pRequest == NULL) { if (pRequest == NULL) {
destroyTscObj(pTscObj); destroyTscObj(pTscObj);
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY; terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
...@@ -343,14 +340,16 @@ STscObj* taosConnectImpl(const char *user, const char *auth, const char *db, __t ...@@ -343,14 +340,16 @@ STscObj* taosConnectImpl(const char *user, const char *auth, const char *db, __t
tsem_wait(&pRequest->body.rspSem); tsem_wait(&pRequest->body.rspSem);
if (pRequest->code != TSDB_CODE_SUCCESS) { if (pRequest->code != TSDB_CODE_SUCCESS) {
const char *errorMsg = (pRequest->code == TSDB_CODE_RPC_FQDN_ERROR) ? taos_errstr(pRequest) : tstrerror(pRequest->code); const char* errorMsg =
(pRequest->code == TSDB_CODE_RPC_FQDN_ERROR) ? taos_errstr(pRequest) : tstrerror(pRequest->code);
printf("failed to connect to server, reason: %s\n\n", errorMsg); printf("failed to connect to server, reason: %s\n\n", errorMsg);
destroyRequest(pRequest); destroyRequest(pRequest);
taos_close(pTscObj); taos_close(pTscObj);
pTscObj = NULL; pTscObj = NULL;
} else { } else {
tscDebug("0x%"PRIx64" connection is opening, connId:%d, dnodeConn:%p, reqId:0x%"PRIx64, pTscObj->id, pTscObj->connId, pTscObj->pAppInfo->pTransporter, pRequest->requestId); tscDebug("0x%" PRIx64 " connection is opening, connId:%d, dnodeConn:%p, reqId:0x%" PRIx64, pTscObj->id,
pTscObj->connId, pTscObj->pAppInfo->pTransporter, pRequest->requestId);
destroyRequest(pRequest); destroyRequest(pRequest);
} }
...@@ -365,11 +364,13 @@ static SMsgSendInfo* buildConnectMsg(SRequestObj* pRequest) { ...@@ -365,11 +364,13 @@ static SMsgSendInfo* buildConnectMsg(SRequestObj* pRequest) {
} }
pMsgSendInfo->msgType = TDMT_MND_CONNECT; pMsgSendInfo->msgType = TDMT_MND_CONNECT;
pMsgSendInfo->requestObjRefId = pRequest->self; pMsgSendInfo->requestObjRefId = pRequest->self;
pMsgSendInfo->requestId = pRequest->requestId; pMsgSendInfo->requestId = pRequest->requestId;
pMsgSendInfo->fp = handleRequestRspFp[TMSG_INDEX(pMsgSendInfo->msgType)]; pMsgSendInfo->fp = handleRequestRspFp[TMSG_INDEX(pMsgSendInfo->msgType)];
pMsgSendInfo->param = pRequest; pMsgSendInfo->param = pRequest;
SConnectReq connectReq = {0}; SConnectReq connectReq = {0};
STscObj* pObj = pRequest->pTscObj; STscObj* pObj = pRequest->pTscObj;
...@@ -399,17 +400,17 @@ static void destroySendMsgInfo(SMsgSendInfo* pMsgBody) { ...@@ -399,17 +400,17 @@ static void destroySendMsgInfo(SMsgSendInfo* pMsgBody) {
} }
void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) { void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) {
SMsgSendInfo *pSendInfo = (SMsgSendInfo *) pMsg->ahandle; SMsgSendInfo* pSendInfo = (SMsgSendInfo*)pMsg->ahandle;
assert(pMsg->ahandle != NULL); assert(pMsg->ahandle != NULL);
if (pSendInfo->requestObjRefId != 0) { if (pSendInfo->requestObjRefId != 0) {
SRequestObj *pRequest = (SRequestObj *)taosAcquireRef(clientReqRefPool, pSendInfo->requestObjRefId); SRequestObj* pRequest = (SRequestObj*)taosAcquireRef(clientReqRefPool, pSendInfo->requestObjRefId);
assert(pRequest->self == pSendInfo->requestObjRefId); assert(pRequest->self == pSendInfo->requestObjRefId);
pRequest->metric.rsp = taosGetTimestampMs(); pRequest->metric.rsp = taosGetTimestampMs();
pRequest->code = pMsg->code; pRequest->code = pMsg->code;
STscObj *pTscObj = pRequest->pTscObj; STscObj* pTscObj = pRequest->pTscObj;
if (pEpSet) { if (pEpSet) {
if (!isEpsetEqual(&pTscObj->pAppInfo->mgmtEp.epSet, pEpSet)) { if (!isEpsetEqual(&pTscObj->pAppInfo->mgmtEp.epSet, pEpSet)) {
updateEpSet_s(&pTscObj->pAppInfo->mgmtEp, pEpSet); updateEpSet_s(&pTscObj->pAppInfo->mgmtEp, pEpSet);
...@@ -417,22 +418,22 @@ void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) { ...@@ -417,22 +418,22 @@ void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) {
} }
/* /*
* There is not response callback function for submit response. * There is not response callback function for submit response.
* The actual inserted number of points is the first number. * The actual inserted number of points is the first number.
*/ */
int32_t elapsed = pRequest->metric.rsp - pRequest->metric.start; int32_t elapsed = pRequest->metric.rsp - pRequest->metric.start;
if (pMsg->code == TSDB_CODE_SUCCESS) { if (pMsg->code == TSDB_CODE_SUCCESS) {
tscDebug("0x%" PRIx64 " message:%s, code:%s rspLen:%d, elapsed:%d ms, reqId:0x%"PRIx64, pRequest->self, tscDebug("0x%" PRIx64 " message:%s, code:%s rspLen:%d, elapsed:%d ms, reqId:0x%" PRIx64, pRequest->self,
TMSG_INFO(pMsg->msgType), tstrerror(pMsg->code), pMsg->contLen, elapsed, pRequest->requestId); TMSG_INFO(pMsg->msgType), tstrerror(pMsg->code), pMsg->contLen, elapsed, pRequest->requestId);
} else { } else {
tscError("0x%" PRIx64 " SQL cmd:%s, code:%s rspLen:%d, elapsed time:%d ms, reqId:0x%"PRIx64, pRequest->self, tscError("0x%" PRIx64 " SQL cmd:%s, code:%s rspLen:%d, elapsed time:%d ms, reqId:0x%" PRIx64, pRequest->self,
TMSG_INFO(pMsg->msgType), tstrerror(pMsg->code), pMsg->contLen, elapsed, pRequest->requestId); TMSG_INFO(pMsg->msgType), tstrerror(pMsg->code), pMsg->contLen, elapsed, pRequest->requestId);
} }
taosReleaseRef(clientReqRefPool, pSendInfo->requestObjRefId); taosReleaseRef(clientReqRefPool, pSendInfo->requestObjRefId);
} }
SDataBuf buf = {.len = pMsg->contLen, .pData = NULL}; SDataBuf buf = {.len = pMsg->contLen, .pData = NULL, .handle = pMsg->handle};
if (pMsg->contLen > 0) { if (pMsg->contLen > 0) {
buf.pData = calloc(1, pMsg->contLen); buf.pData = calloc(1, pMsg->contLen);
...@@ -449,7 +450,7 @@ void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) { ...@@ -449,7 +450,7 @@ void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) {
destroySendMsgInfo(pSendInfo); destroySendMsgInfo(pSendInfo);
} }
TAOS *taos_connect_auth(const char *ip, const char *user, const char *auth, const char *db, uint16_t port) { TAOS* taos_connect_auth(const char* ip, const char* user, const char* auth, const char* db, uint16_t port) {
tscDebug("try to connect to %s:%u by auth, user:%s db:%s", ip, port, user, db); tscDebug("try to connect to %s:%u by auth, user:%s db:%s", ip, port, user, db);
if (user == NULL) { if (user == NULL) {
user = TSDB_DEFAULT_USER; user = TSDB_DEFAULT_USER;
...@@ -463,16 +464,17 @@ TAOS *taos_connect_auth(const char *ip, const char *user, const char *auth, cons ...@@ -463,16 +464,17 @@ TAOS *taos_connect_auth(const char *ip, const char *user, const char *auth, cons
return taos_connect_internal(ip, user, NULL, auth, db, port); return taos_connect_internal(ip, user, NULL, auth, db, port);
} }
TAOS *taos_connect_l(const char *ip, int ipLen, const char *user, int userLen, const char *pass, int passLen, const char *db, int dbLen, uint16_t port) { TAOS* taos_connect_l(const char* ip, int ipLen, const char* user, int userLen, const char* pass, int passLen,
char ipStr[TSDB_EP_LEN] = {0}; const char* db, int dbLen, uint16_t port) {
char ipStr[TSDB_EP_LEN] = {0};
char dbStr[TSDB_DB_NAME_LEN] = {0}; char dbStr[TSDB_DB_NAME_LEN] = {0};
char userStr[TSDB_USER_LEN] = {0}; char userStr[TSDB_USER_LEN] = {0};
char passStr[TSDB_PASSWORD_LEN] = {0}; char passStr[TSDB_PASSWORD_LEN] = {0};
strncpy(ipStr, ip, TMIN(TSDB_EP_LEN - 1, ipLen)); strncpy(ipStr, ip, TMIN(TSDB_EP_LEN - 1, ipLen));
strncpy(userStr, user, TMIN(TSDB_USER_LEN - 1, userLen)); strncpy(userStr, user, TMIN(TSDB_USER_LEN - 1, userLen));
strncpy(passStr, pass, TMIN(TSDB_PASSWORD_LEN - 1, passLen)); strncpy(passStr, pass, TMIN(TSDB_PASSWORD_LEN - 1, passLen));
strncpy(dbStr, db, TMIN(TSDB_DB_NAME_LEN - 1, dbLen)); strncpy(dbStr, db, TMIN(TSDB_DB_NAME_LEN - 1, dbLen));
return taos_connect(ipStr, userStr, passStr, dbStr, port); return taos_connect(ipStr, userStr, passStr, dbStr, port);
} }
...@@ -490,15 +492,15 @@ void* doFetchRow(SRequestObj* pRequest) { ...@@ -490,15 +492,15 @@ void* doFetchRow(SRequestObj* pRequest) {
} }
SReqResultInfo* pResInfo = &pRequest->body.resInfo; SReqResultInfo* pResInfo = &pRequest->body.resInfo;
int32_t code = schedulerFetchRows(pRequest->body.pQueryJob, (void **)&pResInfo->pData); int32_t code = schedulerFetchRows(pRequest->body.pQueryJob, (void**)&pResInfo->pData);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
pRequest->code = code; pRequest->code = code;
return NULL; return NULL;
} }
setQueryResultFromRsp(&pRequest->body.resInfo, (SRetrieveTableRsp*)pResInfo->pData); setQueryResultFromRsp(&pRequest->body.resInfo, (SRetrieveTableRsp*)pResInfo->pData);
tscDebug("0x%"PRIx64 " fetch results, numOfRows:%d total Rows:%"PRId64", complete:%d, reqId:0x%"PRIx64, pRequest->self, pResInfo->numOfRows, tscDebug("0x%" PRIx64 " fetch results, numOfRows:%d total Rows:%" PRId64 ", complete:%d, reqId:0x%" PRIx64,
pResInfo->totalRows, pResInfo->completed, pRequest->requestId); pRequest->self, pResInfo->numOfRows, pResInfo->totalRows, pResInfo->completed, pRequest->requestId);
if (pResultInfo->numOfRows == 0) { if (pResultInfo->numOfRows == 0) {
return NULL; return NULL;
...@@ -511,7 +513,7 @@ void* doFetchRow(SRequestObj* pRequest) { ...@@ -511,7 +513,7 @@ void* doFetchRow(SRequestObj* pRequest) {
} else if (pRequest->type == TDMT_VND_SHOW_TABLES) { } else if (pRequest->type == TDMT_VND_SHOW_TABLES) {
pRequest->type = TDMT_VND_SHOW_TABLES_FETCH; pRequest->type = TDMT_VND_SHOW_TABLES_FETCH;
SShowReqInfo* pShowReqInfo = &pRequest->body.showInfo; SShowReqInfo* pShowReqInfo = &pRequest->body.showInfo;
SVgroupInfo* pVgroupInfo = taosArrayGet(pShowReqInfo->pArray, pShowReqInfo->currentIndex); SVgroupInfo* pVgroupInfo = taosArrayGet(pShowReqInfo->pArray, pShowReqInfo->currentIndex);
epSet = pVgroupInfo->epset; epSet = pVgroupInfo->epset;
} else if (pRequest->type == TDMT_VND_SHOW_TABLES_FETCH) { } else if (pRequest->type == TDMT_VND_SHOW_TABLES_FETCH) {
...@@ -522,7 +524,7 @@ void* doFetchRow(SRequestObj* pRequest) { ...@@ -522,7 +524,7 @@ void* doFetchRow(SRequestObj* pRequest) {
return NULL; return NULL;
} }
SVgroupInfo* pVgroupInfo = taosArrayGet(pShowReqInfo->pArray, pShowReqInfo->currentIndex); SVgroupInfo* pVgroupInfo = taosArrayGet(pShowReqInfo->pArray, pShowReqInfo->currentIndex);
SVShowTablesReq* pShowReq = calloc(1, sizeof(SVShowTablesReq)); SVShowTablesReq* pShowReq = calloc(1, sizeof(SVShowTablesReq));
pShowReq->head.vgId = htonl(pVgroupInfo->vgId); pShowReq->head.vgId = htonl(pVgroupInfo->vgId);
...@@ -533,14 +535,14 @@ void* doFetchRow(SRequestObj* pRequest) { ...@@ -533,14 +535,14 @@ void* doFetchRow(SRequestObj* pRequest) {
epSet = pVgroupInfo->epset; epSet = pVgroupInfo->epset;
int64_t transporterId = 0; int64_t transporterId = 0;
STscObj *pTscObj = pRequest->pTscObj; STscObj* pTscObj = pRequest->pTscObj;
asyncSendMsgToServer(pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, body); asyncSendMsgToServer(pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, body);
tsem_wait(&pRequest->body.rspSem); tsem_wait(&pRequest->body.rspSem);
pRequest->type = TDMT_VND_SHOW_TABLES_FETCH; pRequest->type = TDMT_VND_SHOW_TABLES_FETCH;
} else if (pRequest->type == TDMT_MND_SHOW_RETRIEVE) { } else if (pRequest->type == TDMT_MND_SHOW_RETRIEVE) {
epSet = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp); epSet = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp);
if (pResultInfo->completed) { if (pResultInfo->completed) {
return NULL; return NULL;
} }
...@@ -549,7 +551,7 @@ void* doFetchRow(SRequestObj* pRequest) { ...@@ -549,7 +551,7 @@ void* doFetchRow(SRequestObj* pRequest) {
SMsgSendInfo* body = buildMsgInfoImpl(pRequest); SMsgSendInfo* body = buildMsgInfoImpl(pRequest);
int64_t transporterId = 0; int64_t transporterId = 0;
STscObj *pTscObj = pRequest->pTscObj; STscObj* pTscObj = pRequest->pTscObj;
asyncSendMsgToServer(pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, body); asyncSendMsgToServer(pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, body);
tsem_wait(&pRequest->body.rspSem); tsem_wait(&pRequest->body.rspSem);
...@@ -562,7 +564,7 @@ void* doFetchRow(SRequestObj* pRequest) { ...@@ -562,7 +564,7 @@ void* doFetchRow(SRequestObj* pRequest) {
_return: _return:
for(int32_t i = 0; i < pResultInfo->numOfCols; ++i) { for (int32_t i = 0; i < pResultInfo->numOfCols; ++i) {
pResultInfo->row[i] = pResultInfo->pCol[i] + pResultInfo->fields[i].bytes * pResultInfo->current; pResultInfo->row[i] = pResultInfo->pCol[i] + pResultInfo->fields[i].bytes * pResultInfo->current;
if (IS_VAR_DATA_TYPE(pResultInfo->fields[i].type)) { if (IS_VAR_DATA_TYPE(pResultInfo->fields[i].type)) {
pResultInfo->length[i] = varDataLen(pResultInfo->row[i]); pResultInfo->length[i] = varDataLen(pResultInfo->row[i]);
...@@ -576,8 +578,8 @@ _return: ...@@ -576,8 +578,8 @@ _return:
static void doPrepareResPtr(SReqResultInfo* pResInfo) { static void doPrepareResPtr(SReqResultInfo* pResInfo) {
if (pResInfo->row == NULL) { if (pResInfo->row == NULL) {
pResInfo->row = calloc(pResInfo->numOfCols, POINTER_BYTES); pResInfo->row = calloc(pResInfo->numOfCols, POINTER_BYTES);
pResInfo->pCol = calloc(pResInfo->numOfCols, POINTER_BYTES); pResInfo->pCol = calloc(pResInfo->numOfCols, POINTER_BYTES);
pResInfo->length = calloc(pResInfo->numOfCols, sizeof(int32_t)); pResInfo->length = calloc(pResInfo->numOfCols, sizeof(int32_t));
} }
} }
...@@ -594,14 +596,14 @@ void setResultDataPtr(SReqResultInfo* pResultInfo, TAOS_FIELD* pFields, int32_t ...@@ -594,14 +596,14 @@ void setResultDataPtr(SReqResultInfo* pResultInfo, TAOS_FIELD* pFields, int32_t
int32_t offset = 0; int32_t offset = 0;
for (int32_t i = 0; i < numOfCols; ++i) { for (int32_t i = 0; i < numOfCols; ++i) {
pResultInfo->length[i] = pResultInfo->fields[i].bytes; pResultInfo->length[i] = pResultInfo->fields[i].bytes;
pResultInfo->row[i] = (char*) (pResultInfo->pData + offset * pResultInfo->numOfRows); pResultInfo->row[i] = (char*)(pResultInfo->pData + offset * pResultInfo->numOfRows);
pResultInfo->pCol[i] = pResultInfo->row[i]; pResultInfo->pCol[i] = pResultInfo->row[i];
offset += pResultInfo->fields[i].bytes; offset += pResultInfo->fields[i].bytes;
} }
} }
char* getDbOfConnection(STscObj* pObj) { char* getDbOfConnection(STscObj* pObj) {
char *p = NULL; char* p = NULL;
pthread_mutex_lock(&pObj->mutex); pthread_mutex_lock(&pObj->mutex);
size_t len = strlen(pObj->db); size_t len = strlen(pObj->db);
if (len > 0) { if (len > 0) {
...@@ -622,10 +624,10 @@ void setConnectionDB(STscObj* pTscObj, const char* db) { ...@@ -622,10 +624,10 @@ void setConnectionDB(STscObj* pTscObj, const char* db) {
void setQueryResultFromRsp(SReqResultInfo* pResultInfo, const SRetrieveTableRsp* pRsp) { void setQueryResultFromRsp(SReqResultInfo* pResultInfo, const SRetrieveTableRsp* pRsp) {
assert(pResultInfo != NULL && pRsp != NULL); assert(pResultInfo != NULL && pRsp != NULL);
pResultInfo->pRspMsg = (const char*) pRsp; pResultInfo->pRspMsg = (const char*)pRsp;
pResultInfo->pData = (void*) pRsp->data; pResultInfo->pData = (void*)pRsp->data;
pResultInfo->numOfRows = htonl(pRsp->numOfRows); pResultInfo->numOfRows = htonl(pRsp->numOfRows);
pResultInfo->current = 0; pResultInfo->current = 0;
pResultInfo->completed = (pRsp->completed == 1); pResultInfo->completed = (pRsp->completed == 1);
pResultInfo->totalRows += pResultInfo->numOfRows; pResultInfo->totalRows += pResultInfo->numOfRows;
......
...@@ -97,9 +97,9 @@ SMsgSendInfo* buildMsgInfoImpl(SRequestObj *pRequest) { ...@@ -97,9 +97,9 @@ SMsgSendInfo* buildMsgInfoImpl(SRequestObj *pRequest) {
int32_t contLen = tSerializeSRetrieveTableReq(NULL, 0, &retrieveReq); int32_t contLen = tSerializeSRetrieveTableReq(NULL, 0, &retrieveReq);
void* pReq = malloc(contLen); void* pReq = malloc(contLen);
tSerializeSRetrieveTableReq(pReq, contLen, &retrieveReq); tSerializeSRetrieveTableReq(pReq, contLen, &retrieveReq);
pMsgSendInfo->msgInfo.pData = pReq; pMsgSendInfo->msgInfo.pData = pReq;
pMsgSendInfo->msgInfo.len = contLen; pMsgSendInfo->msgInfo.len = contLen;
pMsgSendInfo->msgInfo.handle = NULL;
} else { } else {
SVShowTablesFetchReq* pFetchMsg = calloc(1, sizeof(SVShowTablesFetchReq)); SVShowTablesFetchReq* pFetchMsg = calloc(1, sizeof(SVShowTablesFetchReq));
if (pFetchMsg == NULL) { if (pFetchMsg == NULL) {
...@@ -111,6 +111,7 @@ SMsgSendInfo* buildMsgInfoImpl(SRequestObj *pRequest) { ...@@ -111,6 +111,7 @@ SMsgSendInfo* buildMsgInfoImpl(SRequestObj *pRequest) {
pMsgSendInfo->msgInfo.pData = pFetchMsg; pMsgSendInfo->msgInfo.pData = pFetchMsg;
pMsgSendInfo->msgInfo.len = sizeof(SVShowTablesFetchReq); pMsgSendInfo->msgInfo.len = sizeof(SVShowTablesFetchReq);
pMsgSendInfo->msgInfo.handle = NULL;
} }
} else { } else {
assert(pRequest != NULL); assert(pRequest != NULL);
......
...@@ -328,7 +328,7 @@ tmq_resp_err_t tmq_subscribe(tmq_t* tmq, tmq_list_t* topic_list) { ...@@ -328,7 +328,7 @@ tmq_resp_err_t tmq_subscribe(tmq_t* tmq, tmq_list_t* topic_list) {
SMqSubscribeCbParam param = {.rspErr = TMQ_RESP_ERR__SUCCESS, .tmq = tmq}; SMqSubscribeCbParam param = {.rspErr = TMQ_RESP_ERR__SUCCESS, .tmq = tmq};
tsem_init(&param.rspSem, 0, 0); tsem_init(&param.rspSem, 0, 0);
pRequest->body.requestMsg = (SDataBuf){.pData = buf, .len = tlen}; pRequest->body.requestMsg = (SDataBuf){.pData = buf, .len = tlen, .handle = NULL};
SMsgSendInfo* sendInfo = buildMsgInfoImpl(pRequest); SMsgSendInfo* sendInfo = buildMsgInfoImpl(pRequest);
sendInfo->param = &param; sendInfo->param = &param;
...@@ -453,7 +453,7 @@ TAOS_RES* tmq_create_topic(TAOS* taos, const char* topicName, const char* sql, i ...@@ -453,7 +453,7 @@ TAOS_RES* tmq_create_topic(TAOS* taos, const char* topicName, const char* sql, i
tSerializeMCreateTopicReq(buf, tlen, &req); tSerializeMCreateTopicReq(buf, tlen, &req);
/*printf("formatted: %s\n", dagStr);*/ /*printf("formatted: %s\n", dagStr);*/
pRequest->body.requestMsg = (SDataBuf){.pData = buf, .len = tlen}; pRequest->body.requestMsg = (SDataBuf){.pData = buf, .len = tlen, .handle = NULL};
pRequest->type = TDMT_MND_CREATE_TOPIC; pRequest->type = TDMT_MND_CREATE_TOPIC;
SMsgSendInfo* sendInfo = buildMsgInfoImpl(pRequest); SMsgSendInfo* sendInfo = buildMsgInfoImpl(pRequest);
...@@ -779,8 +779,10 @@ tmq_message_t* tmq_consumer_poll(tmq_t* tmq, int64_t blocking_time) { ...@@ -779,8 +779,10 @@ tmq_message_t* tmq_consumer_poll(tmq_t* tmq, int64_t blocking_time) {
param->pVg = pVg; param->pVg = pVg;
tsem_init(&param->rspSem, 0, 0); tsem_init(&param->rspSem, 0, 0);
SRequestObj* pRequest = createRequest(tmq->pTscObj, NULL, NULL, TDMT_VND_CONSUME); SRequestObj* pRequest = createRequest(tmq->pTscObj, NULL, NULL, TDMT_VND_CONSUME);
pRequest->body.requestMsg = (SDataBuf){.pData = pReq, .len = sizeof(SMqConsumeReq)}; pRequest->body.requestMsg = (SDataBuf){.pData = pReq, .len = sizeof(SMqConsumeReq), .handle = NULL};
SMsgSendInfo* sendInfo = buildMsgInfoImpl(pRequest); SMsgSendInfo* sendInfo = buildMsgInfoImpl(pRequest);
sendInfo->requestObjRefId = 0; sendInfo->requestObjRefId = 0;
......
...@@ -137,7 +137,7 @@ int32_t asyncSendMsgToServer(void *pTransporter, SEpSet* epSet, int64_t* pTransp ...@@ -137,7 +137,7 @@ int32_t asyncSendMsgToServer(void *pTransporter, SEpSet* epSet, int64_t* pTransp
.pCont = pMsg, .pCont = pMsg,
.contLen = pInfo->msgInfo.len, .contLen = pInfo->msgInfo.len,
.ahandle = (void*) pInfo, .ahandle = (void*) pInfo,
.handle = NULL, .handle = pInfo->msgInfo.handle,
.code = 0 .code = 0
}; };
......
...@@ -95,6 +95,7 @@ typedef struct SSchTask { ...@@ -95,6 +95,7 @@ typedef struct SSchTask {
int32_t childReady; // child task ready number int32_t childReady; // child task ready number
SArray *children; // the datasource tasks,from which to fetch the result, element is SQueryTask* SArray *children; // the datasource tasks,from which to fetch the result, element is SQueryTask*
SArray *parents; // the data destination tasks, get data from current task, element is SQueryTask* SArray *parents; // the data destination tasks, get data from current task, element is SQueryTask*
void* handle; // task send handle
} SSchTask; } SSchTask;
typedef struct SSchJobAttr { typedef struct SSchJobAttr {
......
...@@ -18,6 +18,10 @@ ...@@ -18,6 +18,10 @@
#include "query.h" #include "query.h"
#include "catalog.h" #include "catalog.h"
typedef struct SSchTrans {
void *transInst;
void *transHandle;
}SSchTrans;
static SSchedulerMgmt schMgmt = {0}; static SSchedulerMgmt schMgmt = {0};
uint64_t schGenTaskId(void) { uint64_t schGenTaskId(void) {
...@@ -932,6 +936,7 @@ int32_t schHandleCallback(void* param, const SDataBuf* pMsg, int32_t msgType, in ...@@ -932,6 +936,7 @@ int32_t schHandleCallback(void* param, const SDataBuf* pMsg, int32_t msgType, in
pTask = *task; pTask = *task;
SCH_TASK_DLOG("rsp msg received, type:%s, code:%s", TMSG_INFO(msgType), tstrerror(rspCode)); SCH_TASK_DLOG("rsp msg received, type:%s, code:%s", TMSG_INFO(msgType), tstrerror(rspCode));
pTask->handle = pMsg->handle;
SCH_ERR_JRET(schHandleResponseMsg(pJob, pTask, msgType, pMsg->pData, pMsg->len, rspCode)); SCH_ERR_JRET(schHandleResponseMsg(pJob, pTask, msgType, pMsg->pData, pMsg->len, rspCode));
_return: _return:
...@@ -1000,6 +1005,9 @@ int32_t schGetCallbackFp(int32_t msgType, __async_send_cb_fn_t *fp) { ...@@ -1000,6 +1005,9 @@ int32_t schGetCallbackFp(int32_t msgType, __async_send_cb_fn_t *fp) {
int32_t schAsyncSendMsg(void *transport, SEpSet* epSet, uint64_t qId, uint64_t tId, int32_t msgType, void *msg, uint32_t msgSize) { int32_t schAsyncSendMsg(void *transport, SEpSet* epSet, uint64_t qId, uint64_t tId, int32_t msgType, void *msg, uint32_t msgSize) {
int32_t code = 0; int32_t code = 0;
SSchTrans *trans = (SSchTrans *)transport;
SMsgSendInfo* pMsgSendInfo = calloc(1, sizeof(SMsgSendInfo)); SMsgSendInfo* pMsgSendInfo = calloc(1, sizeof(SMsgSendInfo));
if (NULL == pMsgSendInfo) { if (NULL == pMsgSendInfo) {
qError("QID:%"PRIx64 ",TID:%"PRIx64 " calloc %d failed", qId, tId, (int32_t)sizeof(SMsgSendInfo)); qError("QID:%"PRIx64 ",TID:%"PRIx64 " calloc %d failed", qId, tId, (int32_t)sizeof(SMsgSendInfo));
...@@ -1018,14 +1026,16 @@ int32_t schAsyncSendMsg(void *transport, SEpSet* epSet, uint64_t qId, uint64_t t ...@@ -1018,14 +1026,16 @@ int32_t schAsyncSendMsg(void *transport, SEpSet* epSet, uint64_t qId, uint64_t t
param->queryId = qId; param->queryId = qId;
param->taskId = tId; param->taskId = tId;
pMsgSendInfo->param = param; pMsgSendInfo->param = param;
pMsgSendInfo->msgInfo.pData = msg; pMsgSendInfo->msgInfo.pData = msg;
pMsgSendInfo->msgInfo.len = msgSize; pMsgSendInfo->msgInfo.len = msgSize;
pMsgSendInfo->msgInfo.handle = trans->transHandle;
pMsgSendInfo->msgType = msgType; pMsgSendInfo->msgType = msgType;
pMsgSendInfo->fp = fp; pMsgSendInfo->fp = fp;
int64_t transporterId = 0; int64_t transporterId = 0;
code = asyncSendMsgToServer(transport, epSet, &transporterId, pMsgSendInfo); code = asyncSendMsgToServer(trans->transInst, epSet, &transporterId, pMsgSendInfo);
if (code) { if (code) {
SCH_ERR_JRET(code); SCH_ERR_JRET(code);
} }
...@@ -1149,7 +1159,8 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr, ...@@ -1149,7 +1159,8 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr,
atomic_store_32(&pTask->lastMsgType, msgType); atomic_store_32(&pTask->lastMsgType, msgType);
SCH_ERR_JRET(schAsyncSendMsg(pJob->transport, &epSet, pJob->queryId, pTask->taskId, msgType, msg, msgSize)); SSchTrans trans = {.transInst = pJob->transport, .transHandle = pTask->handle};
SCH_ERR_JRET(schAsyncSendMsg(&trans, &epSet, pJob->queryId, pTask->taskId, msgType, msg, msgSize));
if (isCandidateAddr) { if (isCandidateAddr) {
SCH_ERR_RET(schRecordTaskExecNode(pJob, pTask, addr)); SCH_ERR_RET(schRecordTaskExecNode(pJob, pTask, addr));
......
...@@ -133,15 +133,18 @@ static void clientHandleResp(SCliConn* conn) { ...@@ -133,15 +133,18 @@ static void clientHandleResp(SCliConn* conn) {
rpcMsg.msgType = pHead->msgType; rpcMsg.msgType = pHead->msgType;
rpcMsg.ahandle = pCtx->ahandle; rpcMsg.ahandle = pCtx->ahandle;
if (rpcMsg.msgType == TDMT_VND_QUERY_RSP || rpcMsg.msgType == TDMT_VND_FETCH_RSP) { if (rpcMsg.msgType == TDMT_VND_QUERY_RSP || rpcMsg.msgType == TDMT_VND_FETCH_RSP ||
rpcMsg.msgType == TDMT_VND_RES_READY) {
rpcMsg.handle = conn; rpcMsg.handle = conn;
conn->persist = 1; conn->persist = 1;
tDebug("client conn %p persist by app", conn);
} }
tDebug("client conn %p %s received from %s:%d, local info: %s:%d", conn, TMSG_INFO(pHead->msgType), tDebug("client conn %p %s received from %s:%d, local info: %s:%d", conn, TMSG_INFO(pHead->msgType),
inet_ntoa(conn->addr.sin_addr), ntohs(conn->addr.sin_port), inet_ntoa(conn->locaddr.sin_addr), inet_ntoa(conn->addr.sin_addr), ntohs(conn->addr.sin_port), inet_ntoa(conn->locaddr.sin_addr),
ntohs(conn->locaddr.sin_port)); ntohs(conn->locaddr.sin_port));
conn->secured = pHead->secured;
if (conn->push != NULL && conn->ctnRdCnt != 0) { if (conn->push != NULL && conn->ctnRdCnt != 0) {
(*conn->push->callback)(conn->push->arg, &rpcMsg); (*conn->push->callback)(conn->push->arg, &rpcMsg);
conn->push = NULL; conn->push = NULL;
...@@ -156,7 +159,6 @@ static void clientHandleResp(SCliConn* conn) { ...@@ -156,7 +159,6 @@ static void clientHandleResp(SCliConn* conn) {
} }
} }
conn->ctnRdCnt += 1; conn->ctnRdCnt += 1;
conn->secured = pHead->secured;
// buf's mem alread translated to rpcMsg.pCont // buf's mem alread translated to rpcMsg.pCont
transClearBuffer(&conn->readBuf); transClearBuffer(&conn->readBuf);
...@@ -166,16 +168,14 @@ static void clientHandleResp(SCliConn* conn) { ...@@ -166,16 +168,14 @@ static void clientHandleResp(SCliConn* conn) {
SCliThrdObj* pThrd = conn->hostThrd; SCliThrdObj* pThrd = conn->hostThrd;
// user owns conn->persist = 1 // user owns conn->persist = 1
if (conn->push == NULL || conn->persist == 0) { if (conn->push == NULL && conn->persist == 0) {
addConnToPool(pThrd->pool, pCtx->ip, pCtx->port, conn); addConnToPool(pThrd->pool, pCtx->ip, pCtx->port, conn);
destroyCmsg(conn->data);
conn->data = NULL;
} }
destroyCmsg(conn->data);
conn->data = NULL;
// start thread's timer of conn pool if not active // start thread's timer of conn pool if not active
if (!uv_is_active((uv_handle_t*)pThrd->timer) && pRpc->idleTime > 0) { if (!uv_is_active((uv_handle_t*)pThrd->timer) && pRpc->idleTime > 0) {
uv_timer_start((uv_timer_t*)pThrd->timer, clientTimeoutCb, CONN_PERSIST_TIME(pRpc->idleTime) / 2, 0); // uv_timer_start((uv_timer_t*)pThrd->timer, clientTimeoutCb, CONN_PERSIST_TIME(pRpc->idleTime) / 2, 0);
} }
} }
static void clientHandleExcept(SCliConn* pConn) { static void clientHandleExcept(SCliConn* pConn) {
...@@ -330,6 +330,9 @@ static void clientAllocBufferCb(uv_handle_t* handle, size_t suggested_size, uv_b ...@@ -330,6 +330,9 @@ static void clientAllocBufferCb(uv_handle_t* handle, size_t suggested_size, uv_b
} }
static void clientReadCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf) { static void clientReadCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf) {
// impl later // impl later
if (handle->data == NULL) {
return;
}
SCliConn* conn = handle->data; SCliConn* conn = handle->data;
SConnBuffer* pBuf = &conn->readBuf; SConnBuffer* pBuf = &conn->readBuf;
if (nread > 0) { if (nread > 0) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册