提交 d862f2b4 编写于 作者: D dapan1121

fix TAOS ref issue

上级 38a481af
......@@ -128,7 +128,7 @@ typedef struct STscObj {
int8_t connType;
int32_t acctId;
uint32_t connId;
uint64_t id; // ref ID returned by taosAddRef
TAOS *id; // ref ID returned by taosAddRef
TdThreadMutex mutex; // used to protect the operation on db
int32_t numOfReqs; // number of sqlObj bound to this connection
SAppInstInfo* pAppInfo;
......
......@@ -38,7 +38,7 @@ static TdThreadOnce tscinit = PTHREAD_ONCE_INIT;
volatile int32_t tscInitRes = 0;
static void registerRequest(SRequestObj *pRequest) {
STscObj *pTscObj = acquireTscObj(pRequest->pTscObj->id);
STscObj *pTscObj = acquireTscObj(*(int64_t*)pRequest->pTscObj->id);
assert(pTscObj != NULL);
......@@ -54,7 +54,7 @@ static void registerRequest(SRequestObj *pRequest) {
int32_t currentInst = atomic_add_fetch_64((int64_t *)&pSummary->currentRequests, 1);
tscDebug("0x%" PRIx64 " new Request from connObj:0x%" PRIx64
", current:%d, app current:%d, total:%d, reqId:0x%" PRIx64,
pRequest->self, pRequest->pTscObj->id, num, currentInst, total, pRequest->requestId);
pRequest->self, *(int64_t*)pRequest->pTscObj->id, num, currentInst, total, pRequest->requestId);
}
}
......@@ -70,8 +70,8 @@ static void deregisterRequest(SRequestObj *pRequest) {
int64_t duration = taosGetTimestampUs() - pRequest->metric.start;
tscDebug("0x%" PRIx64 " free Request from connObj: 0x%" PRIx64 ", reqId:0x%" PRIx64 " elapsed:%" PRIu64
" ms, current:%d, app current:%d",
pRequest->self, pTscObj->id, pRequest->requestId, duration / 1000, num, currentInst);
releaseTscObj(pTscObj->id);
pRequest->self, *(int64_t*)pTscObj->id, pRequest->requestId, duration / 1000, num, currentInst);
releaseTscObj(*(int64_t*)pTscObj->id);
}
// todo close the transporter properly
......@@ -80,7 +80,7 @@ void closeTransporter(STscObj *pTscObj) {
return;
}
tscDebug("free transporter:%p in connObj: 0x%" PRIx64, pTscObj->pAppInfo->pTransporter, pTscObj->id);
tscDebug("free transporter:%p in connObj: 0x%" PRIx64, pTscObj->pAppInfo->pTransporter, *(int64_t*)pTscObj->id);
rpcClose(pTscObj->pAppInfo->pTransporter);
}
......@@ -128,7 +128,7 @@ void closeAllRequests(SHashObj *pRequests) {
void destroyTscObj(void *pObj) {
STscObj *pTscObj = pObj;
SClientHbKey connKey = {.tscRid = pTscObj->id, .connType = pTscObj->connType};
SClientHbKey connKey = {.tscRid = *(int64_t*)pTscObj->id, .connType = pTscObj->connType};
hbDeregisterConn(pTscObj->pAppInfo->pAppHbMgr, connKey);
int64_t connNum = atomic_sub_fetch_64(&pTscObj->pAppInfo->numOfConns, 1);
closeAllRequests(pTscObj->pRequests);
......@@ -137,7 +137,7 @@ void destroyTscObj(void *pObj) {
// TODO
//closeTransporter(pTscObj);
}
tscDebug("connObj 0x%" PRIx64 " destroyed, totalConn:%" PRId64, pTscObj->id, pTscObj->pAppInfo->numOfConns);
tscDebug("connObj 0x%" PRIx64 " destroyed, totalConn:%" PRId64, *(int64_t*)pTscObj->id, pTscObj->pAppInfo->numOfConns);
taosThreadMutexDestroy(&pTscObj->mutex);
taosMemoryFreeClear(pTscObj);
}
......@@ -166,10 +166,11 @@ void *createTscObj(const char *user, const char *auth, const char *db, int32_t c
}
taosThreadMutexInit(&pObj->mutex, NULL);
pObj->id = taosAddRef(clientConnRefPool, pObj);
pObj->id = taosMemoryMalloc(sizeof(int64_t));
*(int64_t*)pObj->id = taosAddRef(clientConnRefPool, pObj);
pObj->schemalessType = 1;
tscDebug("connObj created, 0x%" PRIx64, pObj->id);
tscDebug("connObj created, 0x%" PRIx64, *(int64_t*)pObj->id);
return pObj;
}
......
......@@ -263,6 +263,7 @@ void asyncExecLocalCmd(SRequestObj* pRequest, SQuery* pQuery) {
int32_t asyncExecDdlQuery(SRequestObj* pRequest, SQuery* pQuery) {
// drop table if exists not_exists_table
if (NULL == pQuery->pCmdMsg) {
pRequest->body.queryFp(pRequest->body.param, pRequest, 0);
return TSDB_CODE_SUCCESS;
}
......@@ -722,7 +723,7 @@ void launchAsyncQuery(SRequestObj* pRequest, SQuery* pQuery) {
code = asyncExecDdlQuery(pRequest, pQuery);
break;
case QUERY_EXEC_MODE_SCHEDULE: {
SArray* pNodeList = taosArrayInit(4, sizeof(struct SQueryNodeAddr));
SArray* pNodeList = taosArrayInit(4, sizeof(SQueryNodeLoad));
pRequest->type = pQuery->msgType;
......@@ -735,12 +736,10 @@ void launchAsyncQuery(SRequestObj* pRequest, SQuery* pQuery) {
.msgLen = ERROR_MSG_BUF_DEFAULT_SIZE};
SAppInstInfo* pAppInfo = getAppInfo(pRequest);
if (TSDB_CODE_SUCCESS == code) {
code = qCreateQueryPlan(&cxt, &pRequest->body.pDag, pNodeList);
if (code) {
tscError("0x%" PRIx64 " failed to create query plan, code:%s 0x%" PRIx64, pRequest->self, tstrerror(code),
pRequest->requestId);
}
code = qCreateQueryPlan(&cxt, &pRequest->body.pDag, pNodeList);
if (code) {
tscError("0x%" PRIx64 " failed to create query plan, code:%s 0x%" PRIx64, pRequest->self, tstrerror(code),
pRequest->requestId);
}
if (TSDB_CODE_SUCCESS == code) {
......@@ -937,7 +936,7 @@ STscObj* taosConnectImpl(const char* user, const char* auth, const char* db, __t
taos_close_internal(pTscObj);
pTscObj = NULL;
} else {
tscDebug("0x%" PRIx64 " connection is opening, connId:%u, dnodeConn:%p, reqId:0x%" PRIx64, pTscObj->id,
tscDebug("0x%" PRIx64 " connection is opening, connId:%u, dnodeConn:%p, reqId:0x%" PRIx64, *(int64_t*)pTscObj->id,
pTscObj->connId, pTscObj->pAppInfo->pTransporter, pRequest->requestId);
destroyRequest(pRequest);
}
......@@ -1100,9 +1099,7 @@ TAOS* taos_connect_auth(const char* ip, const char* user, const char* auth, cons
STscObj* pObj = taos_connect_internal(ip, user, NULL, auth, db, port, CONN_TYPE__QUERY);
if (pObj) {
uint64_t *id = taosMemoryMalloc(sizeof(uint64_t));
*id = pObj->id;
return (TAOS*)id;
return pObj->id;
}
return NULL;
......
......@@ -99,9 +99,7 @@ TAOS *taos_connect(const char *ip, const char *user, const char *pass, const cha
STscObj* pObj = taos_connect_internal(ip, user, pass, NULL, db, port, CONN_TYPE__QUERY);
if (pObj) {
uint64_t *id = taosMemoryMalloc(sizeof(uint64_t));
*id = pObj->id;
return (TAOS*)id;
return pObj->id;
}
return NULL;
......@@ -113,9 +111,9 @@ void taos_close_internal(void *taos) {
}
STscObj *pTscObj = (STscObj *)taos;
tscDebug("0x%" PRIx64 " try to close connection, numOfReq:%d", pTscObj->id, pTscObj->numOfReqs);
tscDebug("0x%" PRIx64 " try to close connection, numOfReq:%d", *(int64_t*)pTscObj->id, pTscObj->numOfReqs);
taosRemoveRef(clientConnRefPool, pTscObj->id);
taosRemoveRef(clientConnRefPool, *(int64_t*)pTscObj->id);
}
void taos_close(TAOS *taos) {
......@@ -683,6 +681,8 @@ static void destorySqlParseWrapper(SqlParseWrapper *pWrapper) {
}
void retrieveMetaCallback(SMetaData *pResultMeta, void *param, int32_t code) {
tscDebug("enter meta callback, code %s", tstrerror(code));
SqlParseWrapper *pWrapper = (SqlParseWrapper *)param;
SQuery *pQuery = pWrapper->pQuery;
SRequestObj *pRequest = pWrapper->pRequest;
......
......@@ -77,7 +77,7 @@ int32_t processConnectRsp(void* param, const SDataBuf* pMsg, int32_t code) {
for (int32_t i = 0; i < connectRsp.epSet.numOfEps; ++i) {
tscDebug("0x%" PRIx64 " epSet.fqdn[%d]:%s port:%d, connObj:0x%" PRIx64, pRequest->requestId, i,
connectRsp.epSet.eps[i].fqdn, connectRsp.epSet.eps[i].port, pTscObj->id);
connectRsp.epSet.eps[i].fqdn, connectRsp.epSet.eps[i].port, *(int64_t*)pTscObj->id);
}
pTscObj->connId = connectRsp.connId;
......@@ -90,7 +90,7 @@ int32_t processConnectRsp(void* param, const SDataBuf* pMsg, int32_t code) {
pTscObj->connType = connectRsp.connType;
hbRegisterConn(pTscObj->pAppInfo->pAppHbMgr, pTscObj->id, connectRsp.clusterId, connectRsp.connType);
hbRegisterConn(pTscObj->pAppInfo->pAppHbMgr, *(int64_t*)pTscObj->id, connectRsp.clusterId, connectRsp.connType);
// pRequest->body.resInfo.pRspMsg = pMsg->pData;
tscDebug("0x%" PRIx64 " clusterId:%" PRId64 ", totalConn:%" PRId64, pRequest->requestId, connectRsp.clusterId,
......
......@@ -309,7 +309,7 @@ static int32_t smlApplySchemaAction(SSmlHandle *info, SSchemaAction *action) {
case SCHEMA_ACTION_ADD_COLUMN: {
int n = sprintf(result, "alter stable `%s` add column ", action->alterSTable.sTableName);
smlBuildColumnDescription(action->alterSTable.field, result + n, capacity - n, &outBytes);
TAOS_RES *res = taos_query((TAOS*)info->taos->id, result); // TODO async doAsyncQuery
TAOS_RES *res = taos_query(info->taos->id, result); // TODO async doAsyncQuery
code = taos_errno(res);
const char *errStr = taos_errstr(res);
if (code != TSDB_CODE_SUCCESS) {
......@@ -323,7 +323,7 @@ static int32_t smlApplySchemaAction(SSmlHandle *info, SSchemaAction *action) {
case SCHEMA_ACTION_ADD_TAG: {
int n = sprintf(result, "alter stable `%s` add tag ", action->alterSTable.sTableName);
smlBuildColumnDescription(action->alterSTable.field, result + n, capacity - n, &outBytes);
TAOS_RES *res = taos_query((TAOS*)info->taos->id, result); // TODO async doAsyncQuery
TAOS_RES *res = taos_query(info->taos->id, result); // TODO async doAsyncQuery
code = taos_errno(res);
const char *errStr = taos_errstr(res);
if (code != TSDB_CODE_SUCCESS) {
......@@ -337,7 +337,7 @@ static int32_t smlApplySchemaAction(SSmlHandle *info, SSchemaAction *action) {
case SCHEMA_ACTION_CHANGE_COLUMN_SIZE: {
int n = sprintf(result, "alter stable `%s` modify column ", action->alterSTable.sTableName);
smlBuildColumnDescription(action->alterSTable.field, result + n, capacity - n, &outBytes);
TAOS_RES *res = taos_query((TAOS*)info->taos->id, result); // TODO async doAsyncQuery
TAOS_RES *res = taos_query(info->taos->id, result); // TODO async doAsyncQuery
code = taos_errno(res);
if (code != TSDB_CODE_SUCCESS) {
uError("SML:0x%" PRIx64 " apply schema action. error : %s", info->id, taos_errstr(res));
......@@ -350,7 +350,7 @@ static int32_t smlApplySchemaAction(SSmlHandle *info, SSchemaAction *action) {
case SCHEMA_ACTION_CHANGE_TAG_SIZE: {
int n = sprintf(result, "alter stable `%s` modify tag ", action->alterSTable.sTableName);
smlBuildColumnDescription(action->alterSTable.field, result + n, capacity - n, &outBytes);
TAOS_RES *res = taos_query((TAOS*)info->taos->id, result); // TODO async doAsyncQuery
TAOS_RES *res = taos_query(info->taos->id, result); // TODO async doAsyncQuery
code = taos_errno(res);
if (code != TSDB_CODE_SUCCESS) {
uError("SML:0x%" PRIx64 " apply schema action. error : %s", info->id, taos_errstr(res));
......@@ -405,7 +405,7 @@ static int32_t smlApplySchemaAction(SSmlHandle *info, SSchemaAction *action) {
pos--;
++freeBytes;
outBytes = snprintf(pos, freeBytes, ")");
TAOS_RES *res = taos_query((TAOS*)info->taos->id, result);
TAOS_RES *res = taos_query(info->taos->id, result);
code = taos_errno(res);
if (code != TSDB_CODE_SUCCESS) {
uError("SML:0x%" PRIx64 " apply schema action. error : %s", info->id, taos_errstr(res));
......
......@@ -204,8 +204,8 @@ SArray *mmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_VND_QUERY, mmPutMsgToQueryQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_QUERY_CONTINUE, mmPutMsgToQueryQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_QUERY_HEARTBEAT, mmPutMsgToQueryQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_FETCH, mmPutMsgToQueryQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_QUERY_HEARTBEAT, mmPutMsgToFetchQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_FETCH, mmPutMsgToFetchQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_CREATE_STB_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_STB_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_STB_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
......@@ -213,7 +213,7 @@ SArray *mmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_SMA_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_VG_CHANGE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_VG_DELETE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_TASK, mmPutMsgToQueryQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_TASK, mmPutMsgToFetchQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DEPLOY_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_DROP_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIG_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
......
......@@ -35,7 +35,7 @@ typedef struct SPhysiPlanContext {
int32_t errCode;
int16_t nextDataBlockId;
SArray* pLocationHelper;
SArray* pExecNodeList;
SArray* pExecNodeList; // SArray<SQueryNodeLoad>
} SPhysiPlanContext;
static int32_t getSlotKey(SNode* pNode, const char* pStmtName, char* pKey) {
......@@ -459,7 +459,7 @@ static int32_t createTagScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubpla
}
vgroupInfoToNodeAddr(pScanLogicNode->pVgroupList->vgroups, &pSubplan->execNode);
SQueryNodeLoad node = {.addr = pSubplan->execNode, .load = 0};
taosArrayPush(pCxt->pExecNodeList, &pSubplan->execNode);
taosArrayPush(pCxt->pExecNodeList, &node);
return createScanPhysiNodeFinalize(pCxt, pSubplan, pScanLogicNode, (SScanPhysiNode*)pTagScan, pPhyNode);
}
......@@ -532,7 +532,7 @@ static int32_t createSystemTableScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan*
if (0 == strcmp(pScanLogicNode->tableName.tname, TSDB_INS_TABLE_USER_TABLES)) {
vgroupInfoToNodeAddr(pScanLogicNode->pVgroupList->vgroups, &pSubplan->execNode);
SQueryNodeLoad node = {.addr = pSubplan->execNode, .load = 0};
taosArrayPush(pCxt->pExecNodeList, &pSubplan->execNode);
taosArrayPush(pCxt->pExecNodeList, &node);
} else {
SQueryNodeLoad node = {.addr = {.nodeId = MNODE_HANDLE, .epSet = pCxt->pPlanCxt->mgmtEpSet}, .load = 0};
taosArrayPush(pCxt->pExecNodeList, &node);
......
......@@ -210,7 +210,7 @@ typedef struct SSchJob {
int32_t levelNum;
int32_t taskNum;
SRequestConnInfo conn;
SArray *nodeList; // qnode/vnode list, SArray<SQueryNodeAddr>
SArray *nodeList; // qnode/vnode list, SArray<SQueryNodeLoad>
SArray *levels; // starting from 0. SArray<SSchLevel>
SNodeList *subPlans; // subplan pointer copied from DAG, no need to free it in scheduler
......
......@@ -636,8 +636,9 @@ int32_t schSetAddrsFromNodeList(SSchJob *pJob, SSchTask *pTask) {
nodeNum = taosArrayGetSize(pJob->nodeList);
for (int32_t i = 0; i < nodeNum && addNum < SCH_MAX_CANDIDATE_EP_NUM; ++i) {
SQueryNodeAddr *naddr = taosArrayGet(pJob->nodeList, i);
SQueryNodeLoad *nload = taosArrayGet(pJob->nodeList, i);
SQueryNodeAddr *naddr = &nload->addr;
if (NULL == taosArrayPush(pTask->candidateAddrs, naddr)) {
SCH_TASK_ELOG("taosArrayPush execNode to candidate addrs failed, addNum:%d, errno:%d", addNum, errno);
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册