提交 14952d7f 编写于 作者: H Haojun Liao

[td-10564] fix bug in taos_close

上级 8ff4a07d
......@@ -58,7 +58,7 @@ typedef struct SAppInstInfo {
SCorEpSet mgmtEp;
SInstanceActivity summary;
SList *pConnList; // STscObj linked list
char clusterId[TSDB_CLUSTER_ID_LEN];
uint32_t clusterId;
void *pTransporter;
} SAppInstInfo;
......@@ -127,7 +127,7 @@ void* createTscObj(const char* user, const char* auth, const char *ip, uint32_t
void destroyTscObj(void*pObj);
void* createRequest(STscObj* pObj, __taos_async_fn_t fp, void* param, int32_t type);
void destroyRequest(void* p);
void destroyRequest(SRequestObj* pRequest);
TAOS *taos_connect_internal(const char *ip, const char *user, const char *pass, const char *auth, const char *db, uint16_t port);
......
......@@ -172,7 +172,7 @@ STscObj* taosConnectImpl(const char *ip, const char *user, const char *auth, con
taos_close(pTscObj);
pTscObj = NULL;
} else {
tscDebug("%p connection is opening, dnodeConn:%p", pTscObj, pTscObj->pTransporter);
tscDebug("0x%"PRIx64" connection is opening, connId:%d, dnodeConn:%p", pTscObj->id, pTscObj->connId, pTscObj->pTransporter);
destroyRequest(pRequest);
}
......@@ -267,6 +267,8 @@ void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) {
tstrerror(pMsg->code), pMsg->contLen);
}
taosReleaseRef(requestRefId, requestRefId);
taosReleaseRef(tscReqRef, requestRefId);
rpcFreeCont(pMsg->pCont);
sem_post(&pRequest->body.rspSem);
}
......@@ -70,7 +70,10 @@ void taos_close(TAOS* taos) {
return;
}
STscObj *pTscObj = (STscObj *)taos;
tscDebug("0x%"PRIx64" try to close connection, numOfReq:%d", pTscObj->id, pTscObj->numOfReqs);
taosRemoveRef(tscConnRef, pTscObj->id);
}
const char *taos_errstr(TAOS_RES *res) {
......
......@@ -36,8 +36,6 @@ int32_t tscConnRef = -1;
void *tscQhandle = NULL;
int32_t tsNumOfThreads = 1;
pthread_mutex_t rpcObjMutex; // mutex to protect open the rpc obj concurrently
volatile int32_t tscInitRes = 0;
static void registerRequest(SRequestObj* pRequest) {
......@@ -65,12 +63,10 @@ static void deregisterRequest(SRequestObj* pRequest) {
STscObj* pTscObj = pRequest->pTscObj;
SInstanceActivity* pActivity = &pTscObj->pAppInfo->summary;
taosReleaseRef(tscReqRef, pRequest->self);
int32_t currentInst = atomic_sub_fetch_32(&pActivity->currentRequests, 1);
int32_t num = atomic_sub_fetch_32(&pTscObj->numOfReqs, 1);
tscDebug("0x%"PRIx64" free Request from 0x%"PRIx64", current:%d, app current:%d", pRequest->self, pTscObj->id, num, currentInst);
tscDebug("0x%"PRIx64" free Request from connObj: 0x%"PRIx64", current:%d, app current:%d", pRequest->self, pTscObj->id, num, currentInst);
taosReleaseRef(tscConnRef, pTscObj->id);
}
......@@ -90,15 +86,6 @@ static void tscInitLogFile() {
}
}
void tscFreeRpcObj(void *param) {
#if 0
assert(param);
SRpcObj *pRpcObj = (SRpcObj *)(param);
tscDebug("free rpcObj:%p and free pDnodeConn: %p", pRpcObj, pRpcObj->pDnodeConn);
rpcClose(pRpcObj->pDnodeConn);
#endif
}
void closeTransporter(STscObj* pTscObj) {
if (pTscObj == NULL || pTscObj->pTransporter == NULL) {
return;
......@@ -136,7 +123,9 @@ void* openTransporter(const char *user, const char *auth) {
void destroyTscObj(void *pObj) {
STscObj *pTscObj = pObj;
tscDebug("connect obj destroyed, 0x%"PRIx64, pTscObj->id);
atomic_sub_fetch_64(&pTscObj->pAppInfo->numOfConns, 1);
tscDebug("connObj 0x%"PRIx64" destroyed, totalConn:%"PRId64, pTscObj->id, pTscObj->pAppInfo->numOfConns);
closeTransporter(pTscObj);
pthread_mutex_destroy(&pTscObj->mutex);
......@@ -161,7 +150,7 @@ void* createTscObj(const char* user, const char* auth, const char *ip, uint32_t
pthread_mutex_init(&pObj->mutex, NULL);
pObj->id = taosAddRef(tscConnRef, pObj);
tscDebug("connect obj created, 0x%"PRIx64, pObj->id);
tscDebug("connObj created, 0x%"PRIx64, pObj->id);
return pObj;
}
......@@ -189,9 +178,9 @@ void* createRequest(STscObj* pObj, __taos_async_fn_t fp, void* param, int32_t ty
return pRequest;
}
void destroyRequest(void* p) {
static void doDestroyRequest(void* p) {
assert(p != NULL);
SRequestObj* pRequest = *(SRequestObj**)p;
SRequestObj* pRequest = (SRequestObj*)p;
assert(RID_VALID(pRequest->self));
......@@ -202,6 +191,14 @@ void destroyRequest(void* p) {
deregisterRequest(pRequest);
}
void destroyRequest(SRequestObj* pRequest) {
if (pRequest == NULL) {
return;
}
taosReleaseRef(tscReqRef, pRequest->self);
}
void taos_init_imp(void) {
// In the APIs of other program language, taos_cleanup is not available yet.
// So, to make sure taos_cleanup will be invoked to clean up the allocated resource to suppress the valgrind warning.
......@@ -242,7 +239,7 @@ void taos_init_imp(void) {
tscDebug("client task queue is initialized, numOfThreads: %d", numOfThreads);
tscConnRef = taosOpenRef(200, destroyTscObj);
tscReqRef = taosOpenRef(40960, destroyRequest);
tscReqRef = taosOpenRef(40960, doDestroyRequest);
taosGetAppName(appInfo.appName, NULL);
appInfo.pid = taosGetPId();
......
......@@ -33,5 +33,8 @@ int main(int argc, char** argv) {
}
TEST(testCase, driverInit_Test) {
TAOS* pTaos = taos_connect("ubuntu", "root", "taosdata", NULL, 0);
TAOS* pConn = taos_connect("ubuntu", "root", "taosdata", NULL, 0);
assert(pConn != NULL);
taos_close(pConn);
}
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册