提交 ce9b8483 编写于 作者: Y yihaoDeng

TD-2334

上级 29408010
...@@ -307,7 +307,6 @@ typedef struct STscObj { ...@@ -307,7 +307,6 @@ typedef struct STscObj {
SRpcCorEpSet *tscCorMgmtEpSet; SRpcCorEpSet *tscCorMgmtEpSet;
void* pDnodeConn; void* pDnodeConn;
pthread_mutex_t mutex; pthread_mutex_t mutex;
T_REF_DECLARE()
} STscObj; } STscObj;
typedef struct SSubqueryState { typedef struct SSubqueryState {
...@@ -483,7 +482,6 @@ extern int tscObjRef; ...@@ -483,7 +482,6 @@ extern int tscObjRef;
extern void * tscTmr; extern void * tscTmr;
extern void * tscQhandle; extern void * tscQhandle;
extern int tscKeepConn[]; extern int tscKeepConn[];
extern int tsInsertHeadSize;
extern int tscNumOfThreads; extern int tscNumOfThreads;
extern int tscRefId; extern int tscRefId;
......
...@@ -115,9 +115,7 @@ static SSqlObj *taosConnectImpl(const char *ip, const char *user, const char *pa ...@@ -115,9 +115,7 @@ static SSqlObj *taosConnectImpl(const char *ip, const char *user, const char *pa
pObj->signature = pObj; pObj->signature = pObj;
pObj->pDnodeConn = pDnodeConn; pObj->pDnodeConn = pDnodeConn;
T_REF_INIT_VAL(pObj, 1);
tstrncpy(pObj->user, user, sizeof(pObj->user)); tstrncpy(pObj->user, user, sizeof(pObj->user));
secretEncryptLen = MIN(secretEncryptLen, sizeof(pObj->pass)); secretEncryptLen = MIN(secretEncryptLen, sizeof(pObj->pass));
memcpy(pObj->pass, secretEncrypt, secretEncryptLen); memcpy(pObj->pass, secretEncrypt, secretEncryptLen);
...@@ -172,11 +170,9 @@ static SSqlObj *taosConnectImpl(const char *ip, const char *user, const char *pa ...@@ -172,11 +170,9 @@ static SSqlObj *taosConnectImpl(const char *ip, const char *user, const char *pa
if (taos != NULL) { if (taos != NULL) {
*taos = pObj; *taos = pObj;
} }
pObj->rid = taosAddRef(tscRefId, pObj);
registerSqlObj(pSql); registerSqlObj(pSql);
tsInsertHeadSize = sizeof(SMsgDesc) + sizeof(SSubmitMsg);
pObj->rid = taosAddRef(tscRefId, pObj);
return pSql; return pSql;
} }
...@@ -288,34 +284,19 @@ void taos_close(TAOS *taos) { ...@@ -288,34 +284,19 @@ void taos_close(TAOS *taos) {
return; return;
} }
// make sure that the close connection can only be executed once. SSqlObj* pHb = (SSqlObj*)taosAcquireRef(tscObjRef, pObj->hbrid);
pObj->signature = NULL; if (pHb != NULL) {
taosTmrStopA(&(pObj->pTimer)); if (pHb->rpcRid > 0) { // wait for rsp from dnode
rpcCancelRequest(pHb->rpcRid);
if (pObj->hbrid > 0) { pHb->rpcRid = -1;
SSqlObj* pHb = (SSqlObj*)taosAcquireRef(tscObjRef, pObj->hbrid);
if (pHb != NULL) {
if (pHb->rpcRid > 0) { // wait for rsp from dnode
rpcCancelRequest(pHb->rpcRid);
pHb->rpcRid = -1;
}
tscDebug("%p HB is freed", pHb);
taos_free_result(pHb);
taosReleaseRef(tscObjRef, pHb->self);
} }
}
int32_t ref = T_REF_DEC(pObj);
assert(ref >= 0);
if (ref > 0) { tscDebug("%p HB is freed", pHb);
tscDebug("%p %d remain sqlObjs, not free tscObj and dnodeConn:%p", pObj, ref, pObj->pDnodeConn); taosReleaseRef(tscObjRef, pHb->self);
return; taos_free_result(pHb);
} }
tscDebug("%p all sqlObj are freed, free tscObj and close dnodeConn:%p", pObj, pObj->pDnodeConn); tscDebug("%p all sqlObj are freed, free tscObj and close dnodeConn:%p", pObj, pObj->pDnodeConn);
taosRemoveRef(tscRefId, pObj->rid); taosRemoveRef(tscRefId, pObj->rid);
} }
......
...@@ -36,7 +36,6 @@ int tscObjRef = -1; ...@@ -36,7 +36,6 @@ int tscObjRef = -1;
void * tscTmr; void * tscTmr;
void * tscQhandle; void * tscQhandle;
void * tscCheckDiskUsageTmr; void * tscCheckDiskUsageTmr;
int tsInsertHeadSize;
int tscRefId = -1; int tscRefId = -1;
int tscNumOfThreads; int tscNumOfThreads;
......
...@@ -460,15 +460,7 @@ void tscFreeRegisteredSqlObj(void *pSql) { ...@@ -460,15 +460,7 @@ void tscFreeRegisteredSqlObj(void *pSql) {
assert(p->self != 0); assert(p->self != 0);
tscFreeSqlObj(p); tscFreeSqlObj(p);
taosReleaseRef(tscRefId, pTscObj->rid);
int32_t ref = T_REF_DEC(pTscObj);
assert(ref >= 0);
tscDebug("%p free sqlObj completed, tscObj:%p ref:%d", p, pTscObj, ref);
if (ref == 0) {
tscDebug("%p all sqlObj freed, free tscObj:%p", p, pTscObj);
taosRemoveRef(tscRefId, pTscObj->rid);
}
} }
void tscFreeTableMetaHelper(void *pTableMeta) { void tscFreeTableMetaHelper(void *pTableMeta) {
...@@ -810,6 +802,7 @@ static void extractTableMeta(SSqlCmd* pCmd) { ...@@ -810,6 +802,7 @@ static void extractTableMeta(SSqlCmd* pCmd) {
} }
int32_t tscMergeTableDataBlocks(SSqlObj* pSql) { int32_t tscMergeTableDataBlocks(SSqlObj* pSql) {
const int INSERT_HEAD_SIZE = sizeof(SMsgDesc) + sizeof(SSubmitMsg);
SSqlCmd* pCmd = &pSql->cmd; SSqlCmd* pCmd = &pSql->cmd;
void* pVnodeDataBlockHashList = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, false); void* pVnodeDataBlockHashList = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, false);
...@@ -824,7 +817,7 @@ int32_t tscMergeTableDataBlocks(SSqlObj* pSql) { ...@@ -824,7 +817,7 @@ int32_t tscMergeTableDataBlocks(SSqlObj* pSql) {
STableDataBlocks* dataBuf = NULL; STableDataBlocks* dataBuf = NULL;
int32_t ret = tscGetDataBlockFromList(pVnodeDataBlockHashList, pOneTableBlock->vgId, TSDB_PAYLOAD_SIZE, int32_t ret = tscGetDataBlockFromList(pVnodeDataBlockHashList, pOneTableBlock->vgId, TSDB_PAYLOAD_SIZE,
tsInsertHeadSize, 0, pOneTableBlock->tableId, pOneTableBlock->pTableMeta, &dataBuf, pVnodeDataBlockList); INSERT_HEAD_SIZE, 0, pOneTableBlock->tableId, pOneTableBlock->pTableMeta, &dataBuf, pVnodeDataBlockList);
if (ret != TSDB_CODE_SUCCESS) { if (ret != TSDB_CODE_SUCCESS) {
tscError("%p failed to prepare the data block buffer for merging table data, code:%d", pSql, ret); tscError("%p failed to prepare the data block buffer for merging table data, code:%d", pSql, ret);
taosHashCleanup(pVnodeDataBlockHashList); taosHashCleanup(pVnodeDataBlockHashList);
...@@ -1917,9 +1910,7 @@ void tscResetForNextRetrieve(SSqlRes* pRes) { ...@@ -1917,9 +1910,7 @@ void tscResetForNextRetrieve(SSqlRes* pRes) {
} }
void registerSqlObj(SSqlObj* pSql) { void registerSqlObj(SSqlObj* pSql) {
int32_t ref = T_REF_INC(pSql->pTscObj); taosAcquireRef(tscRefId, pSql->pTscObj->rid);
tscDebug("%p add to tscObj:%p, ref:%d", pSql, pSql->pTscObj, ref);
pSql->self = taosAddRef(tscObjRef, pSql); pSql->self = taosAddRef(tscObjRef, pSql);
} }
...@@ -2626,4 +2617,4 @@ int32_t copyTagData(STagData* dst, const STagData* src) { ...@@ -2626,4 +2617,4 @@ int32_t copyTagData(STagData* dst, const STagData* src) {
} }
return 0; return 0;
} }
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册