提交 68fd418c 编写于 作者: S Shengliang Guan

Merge remote-tracking branch 'origin/feature/3.0_liaohj' into feature/dnode3

...@@ -93,7 +93,22 @@ typedef struct STableMetaOutput { ...@@ -93,7 +93,22 @@ typedef struct STableMetaOutput {
STableMeta *tbMeta; STableMeta *tbMeta;
} STableMetaOutput; } STableMetaOutput;
typedef int32_t __async_exec_fn_t(void* param); typedef struct SDataBuf {
void *pData;
uint32_t len;
} SDataBuf;
typedef int32_t (*__async_send_cb_fn_t)(void* param, const SDataBuf* pMsg, int32_t code);
typedef int32_t (*__async_exec_fn_t)(void* param);
typedef struct SMsgSendInfo {
__async_send_cb_fn_t fp; //async callback function
void *param;
uint64_t requestId;
uint64_t requestObjRefId;
int32_t msgType;
SDataBuf msgInfo;
} SMsgSendInfo;
bool tIsValidSchema(struct SSchema* pSchema, int32_t numOfCols, int32_t numOfTags); bool tIsValidSchema(struct SSchema* pSchema, int32_t numOfCols, int32_t numOfTags);
...@@ -109,7 +124,9 @@ int32_t cleanupTaskQueue(); ...@@ -109,7 +124,9 @@ int32_t cleanupTaskQueue();
*/ */
int32_t taosAsyncExec(__async_exec_fn_t execFn, void* execParam, int32_t* code); int32_t taosAsyncExec(__async_exec_fn_t execFn, void* execParam, int32_t* code);
SSchema* tGetTbnameColumnSchema(); int32_t asyncSendMsgToServer(void *pTransporter, SEpSet* epSet, int64_t* pTransporterId, const SMsgSendInfo* pInfo);
const SSchema* tGetTbnameColumnSchema();
void initQueryModuleMsgHandle(); void initQueryModuleMsgHandle();
extern int32_t (*queryBuildMsg[TDMT_MAX])(void* input, char **msg, int32_t msgSize, int32_t *msgLen); extern int32_t (*queryBuildMsg[TDMT_MAX])(void* input, char **msg, int32_t msgSize, int32_t *msgLen);
......
...@@ -29,6 +29,7 @@ extern "C" { ...@@ -29,6 +29,7 @@ extern "C" {
#include "tlist.h" #include "tlist.h"
#include "tmsgtype.h" #include "tmsgtype.h"
#include "trpc.h" #include "trpc.h"
#include "query.h"
typedef struct SQueryExecMetric { typedef struct SQueryExecMetric {
int64_t start; // start timestamp int64_t start; // start timestamp
...@@ -37,7 +38,7 @@ typedef struct SQueryExecMetric { ...@@ -37,7 +38,7 @@ typedef struct SQueryExecMetric {
int64_t rsp; // receive response from server int64_t rsp; // receive response from server
} SQueryExecMetric; } SQueryExecMetric;
typedef struct SInstanceActivity { typedef struct SInstanceSummary {
uint64_t numOfInsertsReq; uint64_t numOfInsertsReq;
uint64_t numOfInsertRows; uint64_t numOfInsertRows;
uint64_t insertElapsedTime; uint64_t insertElapsedTime;
...@@ -48,7 +49,7 @@ typedef struct SInstanceActivity { ...@@ -48,7 +49,7 @@ typedef struct SInstanceActivity {
uint64_t numOfSlowQueries; uint64_t numOfSlowQueries;
uint64_t totalRequests; uint64_t totalRequests;
uint64_t currentRequests; // the number of SRequestObj uint64_t currentRequests; // the number of SRequestObj
} SInstanceActivity; } SInstanceSummary;
typedef struct SHeartBeatInfo { typedef struct SHeartBeatInfo {
void *pTimer; // timer, used to send request msg to mnode void *pTimer; // timer, used to send request msg to mnode
...@@ -57,7 +58,7 @@ typedef struct SHeartBeatInfo { ...@@ -57,7 +58,7 @@ typedef struct SHeartBeatInfo {
typedef struct SAppInstInfo { typedef struct SAppInstInfo {
int64_t numOfConns; int64_t numOfConns;
SCorEpSet mgmtEp; SCorEpSet mgmtEp;
SInstanceActivity summary; SInstanceSummary summary;
SList *pConnList; // STscObj linked list SList *pConnList; // STscObj linked list
int64_t clusterId; int64_t clusterId;
void *pTransporter; void *pTransporter;
...@@ -100,16 +101,11 @@ typedef struct SReqResultInfo { ...@@ -100,16 +101,11 @@ typedef struct SReqResultInfo {
uint32_t current; uint32_t current;
} SReqResultInfo; } SReqResultInfo;
typedef struct SReqMsg {
void *pMsg;
uint32_t len;
} SReqMsgInfo;
typedef struct SRequestSendRecvBody { typedef struct SRequestSendRecvBody {
tsem_t rspSem; // not used now tsem_t rspSem; // not used now
void* fp; void* fp;
int64_t execId; // showId/queryId int64_t execId; // showId/queryId
SReqMsgInfo requestMsg; SDataBuf requestMsg;
SReqResultInfo resInfo; SReqResultInfo resInfo;
} SRequestSendRecvBody; } SRequestSendRecvBody;
...@@ -119,29 +115,23 @@ typedef struct SRequestObj { ...@@ -119,29 +115,23 @@ typedef struct SRequestObj {
uint64_t requestId; uint64_t requestId;
int32_t type; // request type int32_t type; // request type
STscObj *pTscObj; STscObj *pTscObj;
SQueryExecMetric metric;
char *sqlstr; // sql string char *sqlstr; // sql string
int32_t sqlLen; int32_t sqlLen;
SRequestSendRecvBody body;
int64_t self; int64_t self;
char *msgBuf; char *msgBuf;
int32_t code;
void *pInfo; // sql parse info, generated by parser module void *pInfo; // sql parse info, generated by parser module
int32_t code;
SQueryExecMetric metric;
SRequestSendRecvBody body;
} SRequestObj; } SRequestObj;
typedef struct SRequestMsgBody {
int32_t msgType;
SReqMsgInfo msgInfo;
uint64_t requestId;
uint64_t requestObjRefId;
} SRequestMsgBody;
extern SAppInfo appInfo; extern SAppInfo appInfo;
extern int32_t tscReqRef; extern int32_t clientReqRefPool;
extern int32_t tscConnRef; extern int32_t clientConnRefPool;
SRequestMsgBody buildRequestMsgImpl(SRequestObj *pRequest); extern int (*handleRequestRspFp[TDMT_MAX])(void*, const SDataBuf* pMsg, int32_t code);
extern int (*handleRequestRspFp[TDMT_MAX])(SRequestObj *pRequest, const char* pMsg, int32_t msgLen); int genericRspCallback(void* param, const SDataBuf* pMsg, int32_t code);
SMsgSendInfo* buildSendMsgInfoImpl(SRequestObj*);
int taos_init(); int taos_init();
...@@ -165,7 +155,7 @@ void initMsgHandleFp(); ...@@ -165,7 +155,7 @@ void initMsgHandleFp();
TAOS *taos_connect_internal(const char *ip, const char *user, const char *pass, const char *auth, const char *db, uint16_t port); TAOS *taos_connect_internal(const char *ip, const char *user, const char *pass, const char *auth, const char *db, uint16_t port);
TAOS_RES *taos_query_l(TAOS *taos, const char *sql, int sqlLen); TAOS_RES *taos_query_l(TAOS *taos, const char *sql, int sqlLen);
void* doFetchRow(SRequestObj* pRequest); void *doFetchRow(SRequestObj* pRequest);
void setResultDataPtr(SReqResultInfo* pResultInfo, TAOS_FIELD* pFields, int32_t numOfCols, int32_t numOfRows); void setResultDataPtr(SReqResultInfo* pResultInfo, TAOS_FIELD* pFields, int32_t numOfCols, int32_t numOfRows);
#ifdef __cplusplus #ifdef __cplusplus
......
...@@ -13,10 +13,10 @@ ...@@ -13,10 +13,10 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include <catalog.h> #include "os.h"
#include "catalog.h"
#include "clientInt.h" #include "clientInt.h"
#include "clientLog.h" #include "clientLog.h"
#include "os.h"
#include "query.h" #include "query.h"
#include "tmsg.h" #include "tmsg.h"
#include "tcache.h" #include "tcache.h"
...@@ -32,26 +32,26 @@ ...@@ -32,26 +32,26 @@
#define TSC_VAR_RELEASED 0 #define TSC_VAR_RELEASED 0
SAppInfo appInfo; SAppInfo appInfo;
int32_t tscReqRef = -1; int32_t clientReqRefPool = -1;
int32_t tscConnRef = -1; int32_t clientConnRefPool = -1;
static pthread_once_t tscinit = PTHREAD_ONCE_INIT; static pthread_once_t tscinit = PTHREAD_ONCE_INIT;
volatile int32_t tscInitRes = 0; volatile int32_t tscInitRes = 0;
static void registerRequest(SRequestObj* pRequest) { static void registerRequest(SRequestObj* pRequest) {
STscObj *pTscObj = (STscObj *)taosAcquireRef(tscConnRef, pRequest->pTscObj->id); STscObj *pTscObj = (STscObj *)taosAcquireRef(clientConnRefPool, pRequest->pTscObj->id);
assert(pTscObj != NULL); assert(pTscObj != NULL);
// connection has been released already, abort creating request. // connection has been released already, abort creating request.
pRequest->self = taosAddRef(tscReqRef, pRequest); pRequest->self = taosAddRef(clientReqRefPool, pRequest);
int32_t num = atomic_add_fetch_32(&pTscObj->numOfReqs, 1); int32_t num = atomic_add_fetch_32(&pTscObj->numOfReqs, 1);
if (pTscObj->pAppInfo) { if (pTscObj->pAppInfo) {
SInstanceActivity *pActivity = &pTscObj->pAppInfo->summary; SInstanceSummary *pSummary = &pTscObj->pAppInfo->summary;
int32_t total = atomic_add_fetch_32(&pActivity->totalRequests, 1); int32_t total = atomic_add_fetch_32(&pSummary->totalRequests, 1);
int32_t currentInst = atomic_add_fetch_32(&pActivity->currentRequests, 1); int32_t currentInst = atomic_add_fetch_32(&pSummary->currentRequests, 1);
tscDebug("0x%" PRIx64 " new Request from connObj:0x%" PRIx64 ", current:%d, app current:%d, total:%d", pRequest->self, tscDebug("0x%" PRIx64 " new Request from connObj:0x%" PRIx64 ", current:%d, app current:%d, total:%d", pRequest->self,
pRequest->pTscObj->id, num, currentInst, total); pRequest->pTscObj->id, num, currentInst, total);
} }
...@@ -61,13 +61,13 @@ static void deregisterRequest(SRequestObj* pRequest) { ...@@ -61,13 +61,13 @@ static void deregisterRequest(SRequestObj* pRequest) {
assert(pRequest != NULL); assert(pRequest != NULL);
STscObj* pTscObj = pRequest->pTscObj; STscObj* pTscObj = pRequest->pTscObj;
SInstanceActivity* pActivity = &pTscObj->pAppInfo->summary; SInstanceSummary* pActivity = &pTscObj->pAppInfo->summary;
int32_t currentInst = atomic_sub_fetch_32(&pActivity->currentRequests, 1); int32_t currentInst = atomic_sub_fetch_32(&pActivity->currentRequests, 1);
int32_t num = atomic_sub_fetch_32(&pTscObj->numOfReqs, 1); int32_t num = atomic_sub_fetch_32(&pTscObj->numOfReqs, 1);
tscDebug("0x%"PRIx64" free Request from connObj: 0x%"PRIx64", current:%d, app current:%d", pRequest->self, pTscObj->id, num, currentInst); tscDebug("0x%"PRIx64" free Request from connObj: 0x%"PRIx64", current:%d, app current:%d", pRequest->self, pTscObj->id, num, currentInst);
taosReleaseRef(tscConnRef, pTscObj->id); taosReleaseRef(clientConnRefPool, pTscObj->id);
} }
static void tscInitLogFile() { static void tscInitLogFile() {
...@@ -150,7 +150,7 @@ void* createTscObj(const char* user, const char* auth, const char *db, SAppInstI ...@@ -150,7 +150,7 @@ void* createTscObj(const char* user, const char* auth, const char *db, SAppInstI
} }
pthread_mutex_init(&pObj->mutex, NULL); pthread_mutex_init(&pObj->mutex, NULL);
pObj->id = taosAddRef(tscConnRef, pObj); pObj->id = taosAddRef(clientConnRefPool, pObj);
tscDebug("connObj created, 0x%"PRIx64, pObj->id); tscDebug("connObj created, 0x%"PRIx64, pObj->id);
return pObj; return pObj;
...@@ -167,13 +167,11 @@ void* createRequest(STscObj* pObj, __taos_async_fn_t fp, void* param, int32_t ty ...@@ -167,13 +167,11 @@ void* createRequest(STscObj* pObj, __taos_async_fn_t fp, void* param, int32_t ty
// TODO generated request uuid // TODO generated request uuid
pRequest->requestId = 0; pRequest->requestId = 0;
pRequest->metric.start = taosGetTimestampMs(); pRequest->metric.start = taosGetTimestampMs();
pRequest->type = type; pRequest->type = type;
pRequest->pTscObj = pObj; pRequest->pTscObj = pObj;
pRequest->body.fp = fp; pRequest->body.fp = fp; // not used it yet
// pRequest->body.requestMsg. = param;
pRequest->msgBuf = calloc(1, ERROR_MSG_BUF_DEFAULT_SIZE); pRequest->msgBuf = calloc(1, ERROR_MSG_BUF_DEFAULT_SIZE);
tsem_init(&pRequest->body.rspSem, 0, 0); tsem_init(&pRequest->body.rspSem, 0, 0);
...@@ -202,7 +200,7 @@ void destroyRequest(SRequestObj* pRequest) { ...@@ -202,7 +200,7 @@ void destroyRequest(SRequestObj* pRequest) {
return; return;
} }
taosReleaseRef(tscReqRef, pRequest->self); taosReleaseRef(clientReqRefPool, pRequest->self);
} }
void taos_init_imp(void) { void taos_init_imp(void) {
...@@ -238,8 +236,8 @@ void taos_init_imp(void) { ...@@ -238,8 +236,8 @@ void taos_init_imp(void) {
initTaskQueue(); initTaskQueue();
tscConnRef = taosOpenRef(200, destroyTscObj); clientConnRefPool = taosOpenRef(200, destroyTscObj);
tscReqRef = taosOpenRef(40960, doDestroyRequest); clientReqRefPool = taosOpenRef(40960, doDestroyRequest);
taosGetAppName(appInfo.appName, NULL); taosGetAppName(appInfo.appName, NULL);
appInfo.pid = taosGetPId(); appInfo.pid = taosGetPId();
......
...@@ -22,10 +22,8 @@ ...@@ -22,10 +22,8 @@
} while (0) } while (0)
static int32_t initEpSetFromCfg(const char *firstEp, const char *secondEp, SCorEpSet *pEpSet); static int32_t initEpSetFromCfg(const char *firstEp, const char *secondEp, SCorEpSet *pEpSet);
static int32_t buildConnectMsg(SRequestObj *pRequest, SRequestMsgBody* pMsgBody); static SMsgSendInfo* buildConnectMsg(SRequestObj *pRequest);
static void destroyRequestMsgBody(SRequestMsgBody* pMsgBody); static void destroySendMsgInfo(SMsgSendInfo* pMsgBody);
static int32_t sendMsgToServer(void *pTransporter, SEpSet* epSet, const SRequestMsgBody *pBody, int64_t* pTransporterId);
static bool stringLengthCheck(const char* str, size_t maxsize) { static bool stringLengthCheck(const char* str, size_t maxsize) {
if (str == NULL) { if (str == NULL) {
...@@ -162,10 +160,12 @@ int32_t parseSql(SRequestObj* pRequest, SQueryNode** pQuery) { ...@@ -162,10 +160,12 @@ int32_t parseSql(SRequestObj* pRequest, SQueryNode** pQuery) {
int32_t execDdlQuery(SRequestObj* pRequest, SQueryNode* pQuery) { int32_t execDdlQuery(SRequestObj* pRequest, SQueryNode* pQuery) {
SDclStmtInfo* pDcl = (SDclStmtInfo*)pQuery; SDclStmtInfo* pDcl = (SDclStmtInfo*)pQuery;
pRequest->type = pDcl->msgType; pRequest->type = pDcl->msgType;
pRequest->body.requestMsg = (SReqMsgInfo){.pMsg = pDcl->pMsg, .len = pDcl->msgLen}; pRequest->body.requestMsg = (SDataBuf){.pData = pDcl->pMsg, .len = pDcl->msgLen};
STscObj* pTscObj = pRequest->pTscObj;
SRequestMsgBody body = buildRequestMsgImpl(pRequest); SMsgSendInfo* body = buildSendMsgInfoImpl(pRequest);
SEpSet* pEpSet = &pRequest->pTscObj->pAppInfo->mgmtEp.epSet; SEpSet* pEpSet = &pTscObj->pAppInfo->mgmtEp.epSet;
if (pDcl->msgType == TDMT_MND_CREATE_TABLE) { if (pDcl->msgType == TDMT_MND_CREATE_TABLE) {
struct SCatalog* pCatalog = NULL; struct SCatalog* pCatalog = NULL;
...@@ -177,7 +177,7 @@ int32_t execDdlQuery(SRequestObj* pRequest, SQueryNode* pQuery) { ...@@ -177,7 +177,7 @@ int32_t execDdlQuery(SRequestObj* pRequest, SQueryNode* pQuery) {
return code; return code;
} }
SCreateTableMsg* pMsg = body.msgInfo.pMsg; SCreateTableMsg* pMsg = body->msgInfo.pData;
SName t = {0}; SName t = {0};
tNameFromString(&t, pMsg->name, T_NAME_ACCT|T_NAME_DB|T_NAME_TABLE); tNameFromString(&t, pMsg->name, T_NAME_ACCT|T_NAME_DB|T_NAME_TABLE);
...@@ -197,14 +197,14 @@ int32_t execDdlQuery(SRequestObj* pRequest, SQueryNode* pQuery) { ...@@ -197,14 +197,14 @@ int32_t execDdlQuery(SRequestObj* pRequest, SQueryNode* pQuery) {
tstrncpy(ep.fqdn[i], info.epAddr[i].fqdn, tListLen(ep.fqdn[i])); tstrncpy(ep.fqdn[i], info.epAddr[i].fqdn, tListLen(ep.fqdn[i]));
} }
sendMsgToServer(pRequest->pTscObj->pTransporter, &ep, &body, &transporterId); asyncSendMsgToServer(pTscObj->pTransporter, &ep, &transporterId, body);
} else { } else {
int64_t transporterId = 0; int64_t transporterId = 0;
sendMsgToServer(pRequest->pTscObj->pTransporter, pEpSet, &body, &transporterId); asyncSendMsgToServer(pTscObj->pTransporter, pEpSet, &transporterId, body);
} }
tsem_wait(&pRequest->body.rspSem); tsem_wait(&pRequest->body.rspSem);
destroyRequestMsgBody(&body); destroySendMsgInfo(body);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -296,14 +296,13 @@ STscObj* taosConnectImpl(const char *ip, const char *user, const char *auth, con ...@@ -296,14 +296,13 @@ STscObj* taosConnectImpl(const char *ip, const char *user, const char *auth, con
return NULL; return NULL;
} }
SRequestMsgBody body = {0}; SMsgSendInfo* body = buildConnectMsg(pRequest);
buildConnectMsg(pRequest, &body);
int64_t transporterId = 0; int64_t transporterId = 0;
sendMsgToServer(pTscObj->pTransporter, &pTscObj->pAppInfo->mgmtEp.epSet, &body, &transporterId); asyncSendMsgToServer(pTscObj->pTransporter, &pTscObj->pAppInfo->mgmtEp.epSet, &transporterId, body);
tsem_wait(&pRequest->body.rspSem); tsem_wait(&pRequest->body.rspSem);
destroyRequestMsgBody(&body); destroySendMsgInfo(body);
if (pRequest->code != TSDB_CODE_SUCCESS) { if (pRequest->code != TSDB_CODE_SUCCESS) {
const char *errorMsg = (pRequest->code == TSDB_CODE_RPC_FQDN_ERROR) ? taos_errstr(pRequest) : tstrerror(terrno); const char *errorMsg = (pRequest->code == TSDB_CODE_RPC_FQDN_ERROR) ? taos_errstr(pRequest) : tstrerror(terrno);
...@@ -320,15 +319,25 @@ STscObj* taosConnectImpl(const char *ip, const char *user, const char *auth, con ...@@ -320,15 +319,25 @@ STscObj* taosConnectImpl(const char *ip, const char *user, const char *auth, con
return pTscObj; return pTscObj;
} }
static int32_t buildConnectMsg(SRequestObj *pRequest, SRequestMsgBody* pMsgBody) { static SMsgSendInfo* buildConnectMsg(SRequestObj *pRequest) {
pMsgBody->msgType = TDMT_MND_CONNECT; SMsgSendInfo *pMsgSendInfo = calloc(1, sizeof(SMsgSendInfo));
pMsgBody->msgInfo.len = sizeof(SConnectMsg); if (pMsgSendInfo == NULL) {
pMsgBody->requestObjRefId = pRequest->self; terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
return NULL;
}
pMsgSendInfo->msgType = TDMT_MND_CONNECT;
pMsgSendInfo->msgInfo.len = sizeof(SConnectMsg);
pMsgSendInfo->requestObjRefId = pRequest->self;
pMsgSendInfo->requestId = pRequest->requestId;
pMsgSendInfo->fp = handleRequestRspFp[pMsgSendInfo->msgType];
pMsgSendInfo->param = pRequest;
SConnectMsg *pConnect = calloc(1, sizeof(SConnectMsg)); SConnectMsg *pConnect = calloc(1, sizeof(SConnectMsg));
if (pConnect == NULL) { if (pConnect == NULL) {
tfree(pMsgSendInfo);
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY; terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
return -1; return NULL;
} }
STscObj *pObj = pRequest->pTscObj; STscObj *pObj = pRequest->pTscObj;
...@@ -341,49 +350,49 @@ static int32_t buildConnectMsg(SRequestObj *pRequest, SRequestMsgBody* pMsgBody) ...@@ -341,49 +350,49 @@ static int32_t buildConnectMsg(SRequestObj *pRequest, SRequestMsgBody* pMsgBody)
pConnect->startTime = htobe64(appInfo.startTime); pConnect->startTime = htobe64(appInfo.startTime);
tstrncpy(pConnect->app, appInfo.appName, tListLen(pConnect->app)); tstrncpy(pConnect->app, appInfo.appName, tListLen(pConnect->app));
pMsgBody->msgInfo.pMsg = pConnect; pMsgSendInfo->msgInfo.pData = pConnect;
return 0; return pMsgSendInfo;
} }
static void destroyRequestMsgBody(SRequestMsgBody* pMsgBody) { static void destroySendMsgInfo(SMsgSendInfo* pMsgBody) {
assert(pMsgBody != NULL); assert(pMsgBody != NULL);
tfree(pMsgBody->msgInfo.pMsg); tfree(pMsgBody->msgInfo.pData);
tfree(pMsgBody);
} }
int32_t sendMsgToServer(void *pTransporter, SEpSet* epSet, const SRequestMsgBody *pBody, int64_t* pTransporterId) { int32_t asyncSendMsgToServer(void *pTransporter, SEpSet* epSet, int64_t* pTransporterId, const SMsgSendInfo* pInfo) {
char *pMsg = rpcMallocCont(pBody->msgInfo.len); char *pMsg = rpcMallocCont(pInfo->msgInfo.len);
if (NULL == pMsg) { if (NULL == pMsg) {
tscError("0x%"PRIx64" msg:%s malloc failed", pBody->requestId, TMSG_INFO(pBody->msgType)); tscError("0x%"PRIx64" msg:%s malloc failed", pInfo->requestId, TMSG_INFO(pInfo->msgType));
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY; terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
return -1; return -1;
} }
memcpy(pMsg, pBody->msgInfo.pMsg, pBody->msgInfo.len); memcpy(pMsg, pInfo->msgInfo.pData, pInfo->msgInfo.len);
SRpcMsg rpcMsg = { SRpcMsg rpcMsg = {
.msgType = pBody->msgType, .msgType = pInfo->msgType,
.pCont = pMsg, .pCont = pMsg,
.contLen = pBody->msgInfo.len, .contLen = pInfo->msgInfo.len,
.ahandle = (void*) pBody->requestObjRefId, .ahandle = (void*) pInfo,
.handle = NULL, .handle = NULL,
.code = 0 .code = 0
}; };
assert(pInfo->fp != NULL);
rpcSendRequest(pTransporter, epSet, &rpcMsg, pTransporterId); rpcSendRequest(pTransporter, epSet, &rpcMsg, pTransporterId);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) { void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) {
int64_t requestRefId = (int64_t)pMsg->ahandle; SMsgSendInfo *pSendInfo = (SMsgSendInfo *) pMsg->ahandle;
assert(pMsg->ahandle != NULL);
SRequestObj *pRequest = (SRequestObj *)taosAcquireRef(tscReqRef, requestRefId); if (pSendInfo->requestObjRefId != 0) {
if (pRequest == NULL) { SRequestObj *pRequest = (SRequestObj *)taosAcquireRef(clientReqRefPool, pSendInfo->requestObjRefId);
rpcFreeCont(pMsg->pCont); assert(pRequest->self == pSendInfo->requestObjRefId);
return;
}
assert(pRequest->self == requestRefId);
pRequest->metric.rsp = taosGetTimestampMs(); pRequest->metric.rsp = taosGetTimestampMs();
pRequest->code = pMsg->code; pRequest->code = pMsg->code;
STscObj *pTscObj = pRequest->pTscObj; STscObj *pTscObj = pRequest->pTscObj;
...@@ -398,27 +407,21 @@ void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) { ...@@ -398,27 +407,21 @@ void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) {
* The actual inserted number of points is the first number. * The actual inserted number of points is the first number.
*/ */
if (pMsg->code == TSDB_CODE_SUCCESS) { if (pMsg->code == TSDB_CODE_SUCCESS) {
tscDebug("0x%" PRIx64 " message:%s, code:%s rspLen:%d, elapsed:%"PRId64 " ms", pRequest->requestId, TMSG_INFO(pMsg->msgType), tscDebug("0x%" PRIx64 " message:%s, code:%s rspLen:%d, elapsed:%" PRId64 " ms", pRequest->requestId,
tstrerror(pMsg->code), pMsg->contLen, pRequest->metric.rsp - pRequest->metric.start); TMSG_INFO(pMsg->msgType), tstrerror(pMsg->code), pMsg->contLen,
if (handleRequestRspFp[pRequest->type]) { pRequest->metric.rsp - pRequest->metric.start);
char *p = malloc(pMsg->contLen);
if (p == NULL) {
pRequest->code = TSDB_CODE_TSC_OUT_OF_MEMORY;
terrno = pRequest->code;
} else { } else {
memcpy(p, pMsg->pCont, pMsg->contLen); tscError("0x%" PRIx64 " SQL cmd:%s, code:%s rspLen:%d, elapsed time:%" PRId64 " ms", pRequest->requestId,
pMsg->code = (*handleRequestRspFp[pRequest->type])(pRequest, p, pMsg->contLen); TMSG_INFO(pMsg->msgType), tstrerror(pMsg->code), pMsg->contLen,
} pRequest->metric.rsp - pRequest->metric.start);
} }
} else {
tscError("0x%" PRIx64 " SQL cmd:%s, code:%s rspLen:%d, elapsed time:%"PRId64" ms", pRequest->requestId, TMSG_INFO(pMsg->msgType), taosReleaseRef(clientReqRefPool, pSendInfo->requestObjRefId);
tstrerror(pMsg->code), pMsg->contLen, pRequest->metric.rsp - pRequest->metric.start);
} }
taosReleaseRef(tscReqRef, requestRefId); SDataBuf buf = {.pData = pMsg->pCont, .len = pMsg->contLen};
pSendInfo->fp(pSendInfo->param, &buf, pMsg->code);
rpcFreeCont(pMsg->pCont); rpcFreeCont(pMsg->pCont);
sem_post(&pRequest->body.rspSem);
} }
TAOS *taos_connect_auth(const char *ip, const char *user, const char *auth, const char *db, uint16_t port) { TAOS *taos_connect_auth(const char *ip, const char *user, const char *auth, const char *db, uint16_t port) {
...@@ -455,14 +458,14 @@ void* doFetchRow(SRequestObj* pRequest) { ...@@ -455,14 +458,14 @@ void* doFetchRow(SRequestObj* pRequest) {
if (pResultInfo->pData == NULL || pResultInfo->current >= pResultInfo->numOfRows) { if (pResultInfo->pData == NULL || pResultInfo->current >= pResultInfo->numOfRows) {
pRequest->type = TDMT_MND_SHOW_RETRIEVE; pRequest->type = TDMT_MND_SHOW_RETRIEVE;
SRequestMsgBody body = buildRequestMsgImpl(pRequest); SMsgSendInfo* body = buildSendMsgInfoImpl(pRequest);
int64_t transporterId = 0; int64_t transporterId = 0;
STscObj* pTscObj = pRequest->pTscObj; STscObj *pTscObj = pRequest->pTscObj;
sendMsgToServer(pTscObj->pTransporter, &pTscObj->pAppInfo->mgmtEp.epSet, &body, &transporterId); asyncSendMsgToServer(pTscObj->pTransporter, &pTscObj->pAppInfo->mgmtEp.epSet, &transporterId, body);
tsem_wait(&pRequest->body.rspSem); tsem_wait(&pRequest->body.rspSem);
destroyRequestMsgBody(&body); destroySendMsgInfo(body);
pResultInfo->current = 0; pResultInfo->current = 0;
if (pResultInfo->numOfRows <= pResultInfo->current) { if (pResultInfo->numOfRows <= pResultInfo->current) {
......
...@@ -35,14 +35,14 @@ void taos_cleanup(void) { ...@@ -35,14 +35,14 @@ void taos_cleanup(void) {
return; return;
} }
int32_t id = tscReqRef; int32_t id = clientReqRefPool;
tscReqRef = -1; clientReqRefPool = -1;
taosCloseRef(id); taosCloseRef(id);
cleanupTaskQueue(); cleanupTaskQueue();
id = tscConnRef; id = clientConnRefPool;
tscConnRef = -1; clientConnRefPool = -1;
taosCloseRef(id); taosCloseRef(id);
rpcCleanup(); rpcCleanup();
...@@ -72,7 +72,7 @@ void taos_close(TAOS* taos) { ...@@ -72,7 +72,7 @@ void taos_close(TAOS* taos) {
STscObj *pTscObj = (STscObj *)taos; STscObj *pTscObj = (STscObj *)taos;
tscDebug("0x%"PRIx64" try to close connection, numOfReq:%d", pTscObj->id, pTscObj->numOfReqs); tscDebug("0x%"PRIx64" try to close connection, numOfReq:%d", pTscObj->id, pTscObj->numOfReqs);
taosRemoveRef(tscConnRef, pTscObj->id); taosRemoveRef(clientConnRefPool, pTscObj->id);
} }
int taos_errno(TAOS_RES *tres) { int taos_errno(TAOS_RES *tres) {
...@@ -130,7 +130,7 @@ TAOS_RES *taos_query(TAOS *taos, const char *sql) { ...@@ -130,7 +130,7 @@ TAOS_RES *taos_query(TAOS *taos, const char *sql) {
return NULL; return NULL;
} }
return taos_query_l(taos, sql, strlen(sql)); return taos_query_l(taos, sql, (int32_t) strlen(sql));
} }
TAOS_ROW taos_fetch_row(TAOS_RES *pRes) { TAOS_ROW taos_fetch_row(TAOS_RES *pRes) {
...@@ -140,7 +140,7 @@ TAOS_ROW taos_fetch_row(TAOS_RES *pRes) { ...@@ -140,7 +140,7 @@ TAOS_ROW taos_fetch_row(TAOS_RES *pRes) {
SRequestObj *pRequest = (SRequestObj *) pRes; SRequestObj *pRequest = (SRequestObj *) pRes;
if (pRequest->type == TSDB_SQL_RETRIEVE_EMPTY_RESULT || if (pRequest->type == TSDB_SQL_RETRIEVE_EMPTY_RESULT ||
pRequest->type == TSDB_SQL_INSERT) { pRequest->type == TSDB_SQL_INSERT || pRequest->code != TSDB_CODE_SUCCESS) {
return NULL; return NULL;
} }
......
...@@ -18,45 +18,23 @@ ...@@ -18,45 +18,23 @@
#include "tname.h" #include "tname.h"
#include "clientInt.h" #include "clientInt.h"
#include "clientLog.h" #include "clientLog.h"
#include "tmsgtype.h"
#include "trpc.h" #include "trpc.h"
int (*handleRequestRspFp[TDMT_MAX])(SRequestObj *pRequest, const char* pMsg, int32_t msgLen); int (*handleRequestRspFp[TDMT_MAX])(void*, const SDataBuf* pMsg, int32_t code);
int32_t buildConnectMsg(SRequestObj *pRequest, SRequestMsgBody* pMsgBody) { int genericRspCallback(void* param, const SDataBuf* pMsg, int32_t code) {
pMsgBody->msgType = TDMT_MND_CONNECT; SRequestObj* pRequest = param;
pMsgBody->msgInfo.len = sizeof(SConnectMsg); pRequest->code = code;
pMsgBody->requestObjRefId = pRequest->self; sem_post(&pRequest->body.rspSem);
SConnectMsg *pConnect = calloc(1, sizeof(SConnectMsg));
if (pConnect == NULL) {
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
return -1;
}
// TODO refactor full_name
char *db; // ugly code to move the space
STscObj *pObj = pRequest->pTscObj;
pthread_mutex_lock(&pObj->mutex);
db = strstr(pObj->db, TS_PATH_DELIMITER);
db = (db == NULL) ? pObj->db : db + 1;
tstrncpy(pConnect->db, db, sizeof(pConnect->db));
pthread_mutex_unlock(&pObj->mutex);
pConnect->pid = htonl(appInfo.pid);
pConnect->startTime = htobe64(appInfo.startTime);
tstrncpy(pConnect->app, appInfo.appName, tListLen(pConnect->app));
pMsgBody->msgInfo.pMsg = pConnect;
return 0; return 0;
} }
int processConnectRsp(SRequestObj *pRequest, const char* pMsg, int32_t msgLen) { int processConnectRsp(void* param, const SDataBuf* pMsg, int32_t code) {
SRequestObj* pRequest = param;
STscObj *pTscObj = pRequest->pTscObj; STscObj *pTscObj = pRequest->pTscObj;
SConnectRsp *pConnect = (SConnectRsp *)pMsg; SConnectRsp *pConnect = (SConnectRsp *)pMsg->pData;
pConnect->acctId = htonl(pConnect->acctId); pConnect->acctId = htonl(pConnect->acctId);
pConnect->connId = htonl(pConnect->connId); pConnect->connId = htonl(pConnect->connId);
pConnect->clusterId = htobe64(pConnect->clusterId); pConnect->clusterId = htobe64(pConnect->clusterId);
...@@ -81,16 +59,20 @@ int processConnectRsp(SRequestObj *pRequest, const char* pMsg, int32_t msgLen) { ...@@ -81,16 +59,20 @@ int processConnectRsp(SRequestObj *pRequest, const char* pMsg, int32_t msgLen) {
pTscObj->pAppInfo->clusterId = pConnect->clusterId; pTscObj->pAppInfo->clusterId = pConnect->clusterId;
atomic_add_fetch_64(&pTscObj->pAppInfo->numOfConns, 1); atomic_add_fetch_64(&pTscObj->pAppInfo->numOfConns, 1);
pRequest->body.resInfo.pRspMsg = pMsg; // pRequest->body.resInfo.pRspMsg = pMsg->pData;
tscDebug("0x%" PRIx64 " clusterId:%" PRId64 ", totalConn:%" PRId64, pRequest->requestId, pConnect->clusterId, tscDebug("0x%" PRIx64 " clusterId:%" PRId64 ", totalConn:%" PRId64, pRequest->requestId, pConnect->clusterId,
pTscObj->pAppInfo->numOfConns); pTscObj->pAppInfo->numOfConns);
sem_post(&pRequest->body.rspSem);
return 0; return 0;
} }
static int32_t buildRetrieveMnodeMsg(SRequestObj *pRequest, SRequestMsgBody* pMsgBody) { static int32_t buildRetrieveMnodeMsg(SRequestObj *pRequest, SMsgSendInfo* pMsgSendInfo) {
pMsgBody->msgType = TDMT_MND_SHOW_RETRIEVE; pMsgSendInfo->msgType = TDMT_MND_SHOW_RETRIEVE;
pMsgBody->msgInfo.len = sizeof(SRetrieveTableMsg); pMsgSendInfo->msgInfo.len = sizeof(SRetrieveTableMsg);
pMsgBody->requestObjRefId = pRequest->self; pMsgSendInfo->requestObjRefId = pRequest->self;
pMsgSendInfo->param = pRequest;
pMsgSendInfo->fp = handleRequestRspFp[pMsgSendInfo->msgType];
SRetrieveTableMsg *pRetrieveMsg = calloc(1, sizeof(SRetrieveTableMsg)); SRetrieveTableMsg *pRetrieveMsg = calloc(1, sizeof(SRetrieveTableMsg));
if (pRetrieveMsg == NULL) { if (pRetrieveMsg == NULL) {
...@@ -98,29 +80,38 @@ static int32_t buildRetrieveMnodeMsg(SRequestObj *pRequest, SRequestMsgBody* pMs ...@@ -98,29 +80,38 @@ static int32_t buildRetrieveMnodeMsg(SRequestObj *pRequest, SRequestMsgBody* pMs
} }
pRetrieveMsg->showId = htonl(pRequest->body.execId); pRetrieveMsg->showId = htonl(pRequest->body.execId);
pMsgBody->msgInfo.pMsg = pRetrieveMsg; pMsgSendInfo->msgInfo.pData = pRetrieveMsg;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
SRequestMsgBody buildRequestMsgImpl(SRequestObj *pRequest) { SMsgSendInfo* buildSendMsgInfoImpl(SRequestObj *pRequest) {
SMsgSendInfo* pMsgSendInfo = calloc(1, sizeof(SMsgSendInfo));
if (pRequest->type == TDMT_MND_SHOW_RETRIEVE) { if (pRequest->type == TDMT_MND_SHOW_RETRIEVE) {
SRequestMsgBody body = {0}; buildRetrieveMnodeMsg(pRequest, pMsgSendInfo);
buildRetrieveMnodeMsg(pRequest, &body);
return body;
} else { } else {
assert(pRequest != NULL); assert(pRequest != NULL);
SRequestMsgBody body = { pMsgSendInfo->requestObjRefId = pRequest->self;
.requestObjRefId = pRequest->self, pMsgSendInfo->msgInfo = pRequest->body.requestMsg;
.msgInfo = pRequest->body.requestMsg, pMsgSendInfo->msgType = pRequest->type;
.msgType = pRequest->type, pMsgSendInfo->requestId = pRequest->requestId;
.requestId = pRequest->requestId, pMsgSendInfo->param = pRequest;
};
return body; pMsgSendInfo->fp = (handleRequestRspFp[pRequest->type] == NULL)? genericRspCallback:handleRequestRspFp[pRequest->type];
} }
return pMsgSendInfo;
} }
int32_t processShowRsp(SRequestObj *pRequest, const char* pMsg, int32_t msgLen) { int32_t processShowRsp(void* param, const SDataBuf* pMsg, int32_t code) {
SShowRsp* pShow = (SShowRsp *)pMsg; SRequestObj* pRequest = param;
if (code != TSDB_CODE_SUCCESS) {
pRequest->code = code;
tsem_post(&pRequest->body.rspSem);
return code;
}
SShowRsp* pShow = (SShowRsp *)pMsg->pData;
pShow->showId = htonl(pShow->showId); pShow->showId = htonl(pShow->showId);
STableMetaMsg *pMetaMsg = &(pShow->tableMeta); STableMetaMsg *pMetaMsg = &(pShow->tableMeta);
...@@ -141,7 +132,7 @@ int32_t processShowRsp(SRequestObj *pRequest, const char* pMsg, int32_t msgLen) ...@@ -141,7 +132,7 @@ int32_t processShowRsp(SRequestObj *pRequest, const char* pMsg, int32_t msgLen)
pFields[i].bytes = pSchema[i].bytes; pFields[i].bytes = pSchema[i].bytes;
} }
pRequest->body.resInfo.pRspMsg = pMsg; // pRequest->body.resInfo.pRspMsg = pMsg->pData;
SReqResultInfo* pResInfo = &pRequest->body.resInfo; SReqResultInfo* pResInfo = &pRequest->body.resInfo;
pResInfo->fields = pFields; pResInfo->fields = pFields;
...@@ -151,16 +142,18 @@ int32_t processShowRsp(SRequestObj *pRequest, const char* pMsg, int32_t msgLen) ...@@ -151,16 +142,18 @@ int32_t processShowRsp(SRequestObj *pRequest, const char* pMsg, int32_t msgLen)
pResInfo->length = calloc(pResInfo->numOfCols, sizeof(int32_t)); pResInfo->length = calloc(pResInfo->numOfCols, sizeof(int32_t));
pRequest->body.execId = pShow->showId; pRequest->body.execId = pShow->showId;
tsem_post(&pRequest->body.rspSem);
return 0; return 0;
} }
int32_t processRetrieveMnodeRsp(SRequestObj *pRequest, const char* pMsg, int32_t msgLen) { int32_t processRetrieveMnodeRsp(void* param, const SDataBuf* pMsg, int32_t code) {
assert(msgLen >= sizeof(SRetrieveTableRsp)); assert(pMsg->len >= sizeof(SRetrieveTableRsp));
tfree(pRequest->body.resInfo.pRspMsg); SRequestObj* pRequest = param;
pRequest->body.resInfo.pRspMsg = pMsg; // tfree(pRequest->body.resInfo.pRspMsg);
// pRequest->body.resInfo.pRspMsg = pMsg->pData;
SRetrieveTableRsp *pRetrieve = (SRetrieveTableRsp *) pMsg; SRetrieveTableRsp *pRetrieve = (SRetrieveTableRsp *) pMsg->pData;
pRetrieve->numOfRows = htonl(pRetrieve->numOfRows); pRetrieve->numOfRows = htonl(pRetrieve->numOfRows);
pRetrieve->precision = htons(pRetrieve->precision); pRetrieve->precision = htons(pRetrieve->precision);
...@@ -173,29 +166,42 @@ int32_t processRetrieveMnodeRsp(SRequestObj *pRequest, const char* pMsg, int32_t ...@@ -173,29 +166,42 @@ int32_t processRetrieveMnodeRsp(SRequestObj *pRequest, const char* pMsg, int32_t
tscDebug("0x%"PRIx64" numOfRows:%d, complete:%d, qId:0x%"PRIx64, pRequest->self, pRetrieve->numOfRows, tscDebug("0x%"PRIx64" numOfRows:%d, complete:%d, qId:0x%"PRIx64, pRequest->self, pRetrieve->numOfRows,
pRetrieve->completed, pRequest->body.execId); pRetrieve->completed, pRequest->body.execId);
tsem_post(&pRequest->body.rspSem);
return 0; return 0;
} }
int32_t processCreateDbRsp(SRequestObj *pRequest, const char* pMsg, int32_t msgLen) { int32_t processCreateDbRsp(void* param, const SDataBuf* pMsg, int32_t code) {
// todo rsp with the vnode id list // todo rsp with the vnode id list
SRequestObj* pRequest = param;
tsem_post(&pRequest->body.rspSem);
} }
int32_t processUseDbRsp(SRequestObj *pRequest, const char* pMsg, int32_t msgLen) { int32_t processUseDbRsp(void* param, const SDataBuf* pMsg, int32_t code) {
SUseDbRsp* pUseDbRsp = (SUseDbRsp*) pMsg; SUseDbRsp* pUseDbRsp = (SUseDbRsp*) pMsg->pData;
SName name = {0}; SName name = {0};
tNameFromString(&name, pUseDbRsp->db, T_NAME_ACCT|T_NAME_DB); tNameFromString(&name, pUseDbRsp->db, T_NAME_ACCT|T_NAME_DB);
char db[TSDB_DB_NAME_LEN] = {0}; char db[TSDB_DB_NAME_LEN] = {0};
tNameGetDbName(&name, db); tNameGetDbName(&name, db);
SRequestObj* pRequest = param;
setConnectionDB(pRequest->pTscObj, db); setConnectionDB(pRequest->pTscObj, db);
tsem_post(&pRequest->body.rspSem);
return 0;
} }
int32_t processCreateTableRsp(SRequestObj *pRequest, const char* pMsg, int32_t msgLen) { int32_t processCreateTableRsp(void* param, const SDataBuf* pMsg, int32_t code) {
assert(pMsg != NULL); assert(pMsg != NULL);
SRequestObj* pRequest = param;
tsem_post(&pRequest->body.rspSem);
} }
int32_t processDropDbRsp(SRequestObj *pRequest, const char* pMsg, int32_t msgLen) { int32_t processDropDbRsp(void* param, const SDataBuf* pMsg, int32_t code) {
// todo: Remove cache in catalog cache. // todo: Remove cache in catalog cache.
SRequestObj* pRequest = param;
tsem_post(&pRequest->body.rspSem);
} }
void initMsgHandleFp() { void initMsgHandleFp() {
......
...@@ -8,7 +8,7 @@ AUX_SOURCE_DIRECTORY(${CMAKE_CURRENT_SOURCE_DIR} SOURCE_LIST) ...@@ -8,7 +8,7 @@ AUX_SOURCE_DIRECTORY(${CMAKE_CURRENT_SOURCE_DIR} SOURCE_LIST)
ADD_EXECUTABLE(clientTest ${SOURCE_LIST}) ADD_EXECUTABLE(clientTest ${SOURCE_LIST})
TARGET_LINK_LIBRARIES( TARGET_LINK_LIBRARIES(
clientTest clientTest
PUBLIC os util common transport gtest taos PUBLIC os util common transport gtest taos qcom
) )
TARGET_INCLUDE_DIRECTORIES( TARGET_INCLUDE_DIRECTORIES(
......
...@@ -49,13 +49,13 @@ int main(int argc, char** argv) { ...@@ -49,13 +49,13 @@ int main(int argc, char** argv) {
TEST(testCase, driverInit_Test) { taos_init(); } TEST(testCase, driverInit_Test) { taos_init(); }
TEST(testCase, connect_Test) { TEST(testCase, connect_Test) {
TAOS* pConn = taos_connect("ubuntu", "root", "taosdata", NULL, 0); TAOS* pConn = taos_connect("ubuntu", "root", "taosdata", NULL, 0);
// assert(pConn != NULL); assert(pConn != NULL);
taos_close(pConn); taos_close(pConn);
} }
TEST(testCase, create_user_Test) { TEST(testCase, create_user_Test) {
TAOS* pConn = taos_connect("ubuntu", "root", "taosdata", NULL, 0); TAOS* pConn = taos_connect("ubuntu", "root", "taosdata", NULL, 0);
assert(pConn != NULL); assert(pConn != NULL);
...@@ -68,7 +68,7 @@ TEST(testCase, driverInit_Test) { taos_init(); } ...@@ -68,7 +68,7 @@ TEST(testCase, driverInit_Test) { taos_init(); }
taos_close(pConn); taos_close(pConn);
} }
TEST(testCase, create_account_Test) { TEST(testCase, create_account_Test) {
TAOS* pConn = taos_connect("ubuntu", "root", "taosdata", NULL, 0); TAOS* pConn = taos_connect("ubuntu", "root", "taosdata", NULL, 0);
assert(pConn != NULL); assert(pConn != NULL);
...@@ -81,7 +81,7 @@ TEST(testCase, driverInit_Test) { taos_init(); } ...@@ -81,7 +81,7 @@ TEST(testCase, driverInit_Test) { taos_init(); }
taos_close(pConn); taos_close(pConn);
} }
TEST(testCase, drop_account_Test) { TEST(testCase, drop_account_Test) {
TAOS* pConn = taos_connect("ubuntu", "root", "taosdata", NULL, 0); TAOS* pConn = taos_connect("ubuntu", "root", "taosdata", NULL, 0);
assert(pConn != NULL); assert(pConn != NULL);
...@@ -94,9 +94,9 @@ TEST(testCase, driverInit_Test) { taos_init(); } ...@@ -94,9 +94,9 @@ TEST(testCase, driverInit_Test) { taos_init(); }
taos_close(pConn); taos_close(pConn);
} }
TEST(testCase, show_user_Test) { TEST(testCase, show_user_Test) {
TAOS* pConn = taos_connect("ubuntu", "root", "taosdata", NULL, 0); TAOS* pConn = taos_connect("ubuntu", "root", "taosdata", NULL, 0);
// assert(pConn != NULL); assert(pConn != NULL);
TAOS_RES* pRes = taos_query(pConn, "show users"); TAOS_RES* pRes = taos_query(pConn, "show users");
TAOS_ROW pRow = NULL; TAOS_ROW pRow = NULL;
...@@ -113,7 +113,7 @@ TEST(testCase, driverInit_Test) { taos_init(); } ...@@ -113,7 +113,7 @@ TEST(testCase, driverInit_Test) { taos_init(); }
taos_close(pConn); taos_close(pConn);
} }
TEST(testCase, drop_user_Test) { TEST(testCase, drop_user_Test) {
TAOS* pConn = taos_connect("ubuntu", "root", "taosdata", NULL, 0); TAOS* pConn = taos_connect("ubuntu", "root", "taosdata", NULL, 0);
assert(pConn != NULL); assert(pConn != NULL);
...@@ -126,7 +126,7 @@ TEST(testCase, driverInit_Test) { taos_init(); } ...@@ -126,7 +126,7 @@ TEST(testCase, driverInit_Test) { taos_init(); }
taos_close(pConn); taos_close(pConn);
} }
TEST(testCase, show_db_Test) { TEST(testCase, show_db_Test) {
TAOS* pConn = taos_connect("ubuntu", "root", "taosdata", NULL, 0); TAOS* pConn = taos_connect("ubuntu", "root", "taosdata", NULL, 0);
// assert(pConn != NULL); // assert(pConn != NULL);
...@@ -191,10 +191,15 @@ TEST(testCase, drop_db_test) { ...@@ -191,10 +191,15 @@ TEST(testCase, drop_db_test) {
if (taos_errno(pRes) != 0) { if (taos_errno(pRes) != 0) {
printf("failed to drop db, reason:%s\n", taos_errstr(pRes)); printf("failed to drop db, reason:%s\n", taos_errstr(pRes));
} }
taos_free_result(pRes); taos_free_result(pRes);
showDB(pConn); showDB(pConn);
pRes = taos_query(pConn, "create database abc1");
if (taos_errno(pRes) != 0) {
printf("create to drop db, reason:%s\n", taos_errstr(pRes));
}
taos_free_result(pRes);
taos_close(pConn); taos_close(pConn);
} }
...@@ -248,7 +253,52 @@ TEST(testCase, show_stable_Test) { ...@@ -248,7 +253,52 @@ TEST(testCase, show_stable_Test) {
TAOS* pConn = taos_connect("ubuntu", "root", "taosdata", NULL, 0); TAOS* pConn = taos_connect("ubuntu", "root", "taosdata", NULL, 0);
assert(pConn != NULL); assert(pConn != NULL);
TAOS_RES* pRes = taos_query(pConn, "show stables"); TAOS_RES* pRes = taos_query(pConn, "use abc1");
if (taos_errno(pRes) != 0) {
printf("failed to use db, reason:%s\n", taos_errstr(pRes));
}
taos_free_result(pRes);
pRes = taos_query(pConn, "show stables");
if (taos_errno(pRes) != 0) {
printf("failed to show stables, reason:%s\n", taos_errstr(pRes));
taos_free_result(pRes);
ASSERT_TRUE(false);
}
TAOS_ROW pRow = NULL;
TAOS_FIELD* pFields = taos_fetch_fields(pRes);
int32_t numOfFields = taos_num_fields(pRes);
char str[512] = {0};
while((pRow = taos_fetch_row(pRes)) != NULL) {
int32_t code = taos_print_row(str, pRow, pFields, numOfFields);
printf("%s\n", str);
}
taos_free_result(pRes);
taos_close(pConn);
}
TEST(testCase, show_vgroup_Test) {
TAOS* pConn = taos_connect("ubuntu", "root", "taosdata", NULL, 0);
assert(pConn != NULL);
TAOS_RES* pRes = taos_query(pConn, "use abc1");
if (taos_errno(pRes) != 0) {
printf("failed to use db, reason:%s\n", taos_errstr(pRes));
}
taos_free_result(pRes);
pRes = taos_query(pConn, "show vgroups");
if (taos_errno(pRes) != 0) {
printf("failed to show vgroups, reason:%s\n", taos_errstr(pRes));
taos_free_result(pRes);
ASSERT_TRUE(false);
}
TAOS_ROW pRow = NULL; TAOS_ROW pRow = NULL;
TAOS_FIELD* pFields = taos_fetch_fields(pRes); TAOS_FIELD* pFields = taos_fetch_fields(pRes);
......
...@@ -7,7 +7,7 @@ ...@@ -7,7 +7,7 @@
SCreateUserMsg* buildUserManipulationMsg(SSqlInfo* pInfo, int32_t* outputLen, int64_t id, char* msgBuf, int32_t msgLen); SCreateUserMsg* buildUserManipulationMsg(SSqlInfo* pInfo, int32_t* outputLen, int64_t id, char* msgBuf, int32_t msgLen);
SCreateAcctMsg* buildAcctManipulationMsg(SSqlInfo* pInfo, int32_t* outputLen, int64_t id, char* msgBuf, int32_t msgLen); SCreateAcctMsg* buildAcctManipulationMsg(SSqlInfo* pInfo, int32_t* outputLen, int64_t id, char* msgBuf, int32_t msgLen);
SDropUserMsg* buildDropUserMsg(SSqlInfo* pInfo, int32_t* outputLen, int64_t id, char* msgBuf, int32_t msgLen); SDropUserMsg* buildDropUserMsg(SSqlInfo* pInfo, int32_t* outputLen, int64_t id, char* msgBuf, int32_t msgLen);
SShowMsg* buildShowMsg(SShowInfo* pShowInfo, int64_t id, char* msgBuf, int32_t msgLen); SShowMsg* buildShowMsg(SShowInfo* pShowInfo, SParseBasicCtx* pParseCtx, char* msgBuf, int32_t msgLen);
SCreateDbMsg* buildCreateDbMsg(SCreateDbInfo* pCreateDbInfo, SParseBasicCtx *pCtx, SMsgBuf* pMsgBuf); SCreateDbMsg* buildCreateDbMsg(SCreateDbInfo* pCreateDbInfo, SParseBasicCtx *pCtx, SMsgBuf* pMsgBuf);
SCreateStbMsg* buildCreateTableMsg(SCreateTableSql* pCreateTableSql, int32_t* len, SParseBasicCtx* pParseCtx, SMsgBuf* pMsgBuf); SCreateStbMsg* buildCreateTableMsg(SCreateTableSql* pCreateTableSql, int32_t* len, SParseBasicCtx* pParseCtx, SMsgBuf* pMsgBuf);
SDropTableMsg* buildDropTableMsg(SSqlInfo* pInfo, int32_t* len, SParseBasicCtx* pParseCtx, SMsgBuf* pMsgBuf); SDropTableMsg* buildDropTableMsg(SSqlInfo* pInfo, int32_t* len, SParseBasicCtx* pParseCtx, SMsgBuf* pMsgBuf);
......
...@@ -86,7 +86,7 @@ SDropUserMsg* buildDropUserMsg(SSqlInfo* pInfo, int32_t *msgLen, int64_t id, cha ...@@ -86,7 +86,7 @@ SDropUserMsg* buildDropUserMsg(SSqlInfo* pInfo, int32_t *msgLen, int64_t id, cha
return pMsg; return pMsg;
} }
SShowMsg* buildShowMsg(SShowInfo* pShowInfo, int64_t id, char* msgBuf, int32_t msgLen) { SShowMsg* buildShowMsg(SShowInfo* pShowInfo, SParseBasicCtx *pCtx, char* msgBuf, int32_t msgLen) {
SShowMsg* pShowMsg = calloc(1, sizeof(SShowMsg)); SShowMsg* pShowMsg = calloc(1, sizeof(SShowMsg));
pShowMsg->type = pShowInfo->showType; pShowMsg->type = pShowInfo->showType;
...@@ -105,6 +105,12 @@ SShowMsg* buildShowMsg(SShowInfo* pShowInfo, int64_t id, char* msgBuf, int32_t m ...@@ -105,6 +105,12 @@ SShowMsg* buildShowMsg(SShowInfo* pShowInfo, int64_t id, char* msgBuf, int32_t m
pShowMsg->payloadLen = htons(pEpAddr->n); pShowMsg->payloadLen = htons(pEpAddr->n);
} }
if (pShowInfo->showType == TSDB_MGMT_TABLE_STB || pShowInfo->showType == TSDB_MGMT_TABLE_VGROUP) {
SName n = {0};
tNameSetDbName(&n, pCtx->acctId, pCtx->db, strlen(pCtx->db));
tNameGetFullDbName(&n, pShowMsg->db);
}
return pShowMsg; return pShowMsg;
} }
......
...@@ -4090,7 +4090,7 @@ static int32_t setShowInfo(SShowInfo* pShowInfo, SParseBasicCtx *pCtx, void** ou ...@@ -4090,7 +4090,7 @@ static int32_t setShowInfo(SShowInfo* pShowInfo, SParseBasicCtx *pCtx, void** ou
} }
} }
*output = buildShowMsg(pShowInfo, pCtx->requestId, pMsgBuf->buf, pMsgBuf->len); *output = buildShowMsg(pShowInfo, pCtx, pMsgBuf->buf, pMsgBuf->len);
*outputLen = sizeof(SShowMsg)/* + htons(pShowMsg->payloadLen)*/; *outputLen = sizeof(SShowMsg)/* + htons(pShowMsg->payloadLen)*/;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
......
...@@ -14,7 +14,7 @@ static struct SSchema _s = { ...@@ -14,7 +14,7 @@ static struct SSchema _s = {
.name = "tbname", .name = "tbname",
}; };
SSchema* tGetTbnameColumnSchema() { const SSchema* tGetTbnameColumnSchema() {
return &_s; return &_s;
} }
...@@ -103,7 +103,7 @@ int32_t cleanupTaskQueue() { ...@@ -103,7 +103,7 @@ int32_t cleanupTaskQueue() {
static void execHelper(struct SSchedMsg* pSchedMsg) { static void execHelper(struct SSchedMsg* pSchedMsg) {
assert(pSchedMsg != NULL && pSchedMsg->ahandle != NULL); assert(pSchedMsg != NULL && pSchedMsg->ahandle != NULL);
__async_exec_fn_t* execFn = (__async_exec_fn_t*) pSchedMsg->ahandle; __async_exec_fn_t execFn = (__async_exec_fn_t) pSchedMsg->ahandle;
int32_t code = execFn(pSchedMsg->thandle); int32_t code = execFn(pSchedMsg->thandle);
if (code != 0 && pSchedMsg->msg != NULL) { if (code != 0 && pSchedMsg->msg != NULL) {
*(int32_t*) pSchedMsg->msg = code; *(int32_t*) pSchedMsg->msg = code;
......
...@@ -32,7 +32,7 @@ int32_t schBuildAndSendRequest(void *pRpc, const SEpSet* pMgmtEps, __taos_async_ ...@@ -32,7 +32,7 @@ int32_t schBuildAndSendRequest(void *pRpc, const SEpSet* pMgmtEps, __taos_async_
buildConnectMsg(pRequest, &body); buildConnectMsg(pRequest, &body);
int64_t transporterId = 0; int64_t transporterId = 0;
sendMsgToServer(pTscObj->pTransporter, &pTscObj->pAppInfo->mgmtEp.epSet, &body, &transporterId); asyncSendMsgToServer(pTscObj->pTransporter, &pTscObj->pAppInfo->mgmtEp.epSet, &body, &transporterId);
tsem_wait(&pRequest->body.rspSem); tsem_wait(&pRequest->body.rspSem);
destroyConnectMsg(&body); destroyConnectMsg(&body);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册