提交 73dfd117 编写于 作者: D dapan1121

enh: stop query

上级 c364f989
......@@ -139,7 +139,7 @@ typedef struct STscObj {
int8_t connType;
int32_t acctId;
uint32_t connId;
TAOS* id; // ref ID returned by taosAddRef
int64_t id; // ref ID returned by taosAddRef
TdThreadMutex mutex; // used to protect the operation on db
int32_t numOfReqs; // number of sqlObj bound to this connection
SAppInstInfo* pAppInfo;
......
......@@ -38,7 +38,7 @@ static TdThreadOnce tscinit = PTHREAD_ONCE_INIT;
volatile int32_t tscInitRes = 0;
static int32_t registerRequest(SRequestObj *pRequest) {
STscObj *pTscObj = acquireTscObj(*(int64_t *)pRequest->pTscObj->id);
STscObj *pTscObj = acquireTscObj(pRequest->pTscObj->id);
if (NULL == pTscObj) {
terrno = TSDB_CODE_TSC_DISCONNECTED;
return terrno;
......@@ -56,7 +56,7 @@ static int32_t registerRequest(SRequestObj *pRequest) {
int32_t currentInst = atomic_add_fetch_64((int64_t *)&pSummary->currentRequests, 1);
tscDebug("0x%" PRIx64 " new Request from connObj:0x%" PRIx64
", current:%d, app current:%d, total:%d, reqId:0x%" PRIx64,
pRequest->self, *(int64_t *)pRequest->pTscObj->id, num, currentInst, total, pRequest->requestId);
pRequest->self, pRequest->pTscObj->id, num, currentInst, total, pRequest->requestId);
}
return TSDB_CODE_SUCCESS;
......@@ -74,8 +74,8 @@ static void deregisterRequest(SRequestObj *pRequest) {
int64_t duration = taosGetTimestampUs() - pRequest->metric.start;
tscDebug("0x%" PRIx64 " free Request from connObj: 0x%" PRIx64 ", reqId:0x%" PRIx64 " elapsed:%" PRIu64
" ms, current:%d, app current:%d",
pRequest->self, *(int64_t *)pTscObj->id, pRequest->requestId, duration / 1000, num, currentInst);
releaseTscObj(*(int64_t *)pTscObj->id);
pRequest->self, pTscObj->id, pRequest->requestId, duration / 1000, num, currentInst);
releaseTscObj(pTscObj->id);
}
// todo close the transporter properly
......@@ -84,7 +84,7 @@ void closeTransporter(STscObj *pTscObj) {
return;
}
tscDebug("free transporter:%p in connObj: 0x%" PRIx64, pTscObj->pAppInfo->pTransporter, *(int64_t *)pTscObj->id);
tscDebug("free transporter:%p in connObj: 0x%" PRIx64, pTscObj->pAppInfo->pTransporter, pTscObj->id);
rpcClose(pTscObj->pAppInfo->pTransporter);
}
......@@ -133,16 +133,15 @@ void closeAllRequests(SHashObj *pRequests) {
void destroyTscObj(void *pObj) {
STscObj *pTscObj = pObj;
SClientHbKey connKey = {.tscRid = *(int64_t *)pTscObj->id, .connType = pTscObj->connType};
SClientHbKey connKey = {.tscRid = pTscObj->id, .connType = pTscObj->connType};
hbDeregisterConn(pTscObj->pAppInfo->pAppHbMgr, connKey);
int64_t connNum = atomic_sub_fetch_64(&pTscObj->pAppInfo->numOfConns, 1);
closeAllRequests(pTscObj->pRequests);
schedulerStopQueryHb(pTscObj->pAppInfo->pTransporter);
if (0 == connNum) {
// TODO
// closeTransporter(pTscObj);
closeTransporter(pTscObj);
}
tscDebug("connObj 0x%" PRIx64 " destroyed, totalConn:%" PRId64, *(int64_t *)pTscObj->id,
tscDebug("connObj 0x%" PRIx64 " p:%p destroyed, totalConn:%" PRId64, pTscObj->id, pTscObj,
pTscObj->pAppInfo->numOfConns);
taosThreadMutexDestroy(&pTscObj->mutex);
taosMemoryFreeClear(pTscObj);
......@@ -172,11 +171,10 @@ void *createTscObj(const char *user, const char *auth, const char *db, int32_t c
}
taosThreadMutexInit(&pObj->mutex, NULL);
pObj->id = taosMemoryMalloc(sizeof(int64_t));
*(int64_t *)pObj->id = taosAddRef(clientConnRefPool, pObj);
pObj->id = taosAddRef(clientConnRefPool, pObj);
pObj->schemalessType = 1;
tscDebug("connObj created, 0x%" PRIx64, *(int64_t *)pObj->id);
tscDebug("connObj created, 0x%" PRIx64 ",p:%p", pObj->id, pObj);
return pObj;
}
......
......@@ -1170,7 +1170,7 @@ STscObj* taosConnectImpl(const char* user, const char* auth, const char* db, __t
taos_close_internal(pTscObj);
pTscObj = NULL;
} else {
tscDebug("0x%" PRIx64 " connection is opening, connId:%u, dnodeConn:%p, reqId:0x%" PRIx64, *(int64_t*)pTscObj->id,
tscDebug("0x%" PRIx64 " connection is opening, connId:%u, dnodeConn:%p, reqId:0x%" PRIx64, pTscObj->id,
pTscObj->connId, pTscObj->pAppInfo->pTransporter, pRequest->requestId);
destroyRequest(pRequest);
}
......@@ -1333,7 +1333,9 @@ TAOS* taos_connect_auth(const char* ip, const char* user, const char* auth, cons
STscObj* pObj = taos_connect_internal(ip, user, NULL, auth, db, port, CONN_TYPE__QUERY);
if (pObj) {
return pObj->id;
int64_t *rid = taosMemoryCalloc(1, sizeof(int64_t));
*rid = pObj->id;
return (TAOS*)rid;
}
return NULL;
......@@ -2016,11 +2018,18 @@ void syncQueryFn(void* param, void* res, int32_t code) {
}
void taosAsyncQueryImpl(TAOS* taos, const char* sql, __taos_async_fn_t fp, void* param, bool validateOnly) {
STscObj* pTscObj = acquireTscObj(*(int64_t*)taos);
if (NULL == taos) {
terrno = TSDB_CODE_TSC_DISCONNECTED;
fp(param, NULL, terrno);
return;
}
int64_t rid = *(int64_t*)taos;
STscObj* pTscObj = acquireTscObj(rid);
if (pTscObj == NULL || sql == NULL || NULL == fp) {
terrno = TSDB_CODE_INVALID_PARA;
if (pTscObj) {
releaseTscObj(*(int64_t*)taos);
releaseTscObj(rid);
} else {
terrno = TSDB_CODE_TSC_DISCONNECTED;
}
......@@ -2032,7 +2041,7 @@ void taosAsyncQueryImpl(TAOS* taos, const char* sql, __taos_async_fn_t fp, void*
if (sqlLen > (size_t)TSDB_MAX_ALLOWED_SQL_LEN) {
tscError("sql string exceeds max length:%d", TSDB_MAX_ALLOWED_SQL_LEN);
terrno = TSDB_CODE_TSC_EXCEED_SQL_LIMIT;
releaseTscObj(*(int64_t *)taos);
releaseTscObj(rid);
fp(param, NULL, terrno);
return;
......@@ -2042,7 +2051,7 @@ void taosAsyncQueryImpl(TAOS* taos, const char* sql, __taos_async_fn_t fp, void*
int32_t code = buildRequest(pTscObj, sql, sqlLen, &pRequest);
if (code != TSDB_CODE_SUCCESS) {
terrno = code;
releaseTscObj(*(int64_t *)taos);
releaseTscObj(rid);
fp(param, NULL, terrno);
return;
}
......@@ -2051,7 +2060,7 @@ void taosAsyncQueryImpl(TAOS* taos, const char* sql, __taos_async_fn_t fp, void*
pRequest->body.queryFp = fp;
pRequest->body.param = param;
doAsyncQuery(pRequest, false);
releaseTscObj(*(int64_t *)taos);
releaseTscObj(rid);
}
TAOS_RES* taosQueryImpl(TAOS* taos, const char* sql, bool validateOnly) {
......@@ -2060,7 +2069,8 @@ TAOS_RES* taosQueryImpl(TAOS* taos, const char* sql, bool validateOnly) {
return NULL;
}
STscObj* pTscObj = acquireTscObj(*(int64_t*)taos);
int64_t rid = *(int64_t*)taos;
STscObj* pTscObj = acquireTscObj(rid);
if (pTscObj == NULL || sql == NULL) {
terrno = TSDB_CODE_TSC_DISCONNECTED;
return NULL;
......@@ -2070,16 +2080,16 @@ TAOS_RES* taosQueryImpl(TAOS* taos, const char* sql, bool validateOnly) {
SSyncQueryParam* param = taosMemoryCalloc(1, sizeof(SSyncQueryParam));
tsem_init(&param->sem, 0, 0);
taosAsyncQueryImpl(taos, sql, syncQueryFn, param, validateOnly);
taosAsyncQueryImpl((TAOS*)&rid, sql, syncQueryFn, param, validateOnly);
tsem_wait(&param->sem);
releaseTscObj(*(int64_t*)taos);
releaseTscObj(rid);
return param->pRequest;
#else
size_t sqlLen = strlen(sql);
if (sqlLen > (size_t)TSDB_MAX_ALLOWED_SQL_LEN) {
releaseTscObj(*(int64_t*)taos);
releaseTscObj(rid);
tscError("sql string exceeds max length:%d", TSDB_MAX_ALLOWED_SQL_LEN);
terrno = TSDB_CODE_TSC_EXCEED_SQL_LIMIT;
return NULL;
......@@ -2087,7 +2097,7 @@ TAOS_RES* taosQueryImpl(TAOS* taos, const char* sql, bool validateOnly) {
TAOS_RES* pRes = execQuery(pTscObj, sql, sqlLen, validateOnly);
releaseTscObj(*(int64_t*)taos);
releaseTscObj(rid);
return pRes;
#endif
......
......@@ -93,7 +93,9 @@ TAOS *taos_connect(const char *ip, const char *user, const char *pass, const cha
STscObj *pObj = taos_connect_internal(ip, user, pass, NULL, db, port, CONN_TYPE__QUERY);
if (pObj) {
return pObj->id;
int64_t *rid = taosMemoryCalloc(1, sizeof(int64_t));
*rid = pObj->id;
return (TAOS*)rid;
}
return NULL;
......@@ -105,9 +107,9 @@ void taos_close_internal(void *taos) {
}
STscObj *pTscObj = (STscObj *)taos;
tscDebug("0x%" PRIx64 " try to close connection, numOfReq:%d", *(int64_t *)pTscObj->id, pTscObj->numOfReqs);
tscDebug("0x%" PRIx64 " try to close connection, numOfReq:%d", pTscObj->id, pTscObj->numOfReqs);
taosRemoveRef(clientConnRefPool, *(int64_t *)pTscObj->id);
taosRemoveRef(clientConnRefPool, pTscObj->id);
}
void taos_close(TAOS *taos) {
......@@ -880,6 +882,12 @@ void taos_unsubscribe(TAOS_SUB *tsub, int keepProgress) {
}
int taos_load_table_info(TAOS *taos, const char *tableNameList) {
if (NULL == taos) {
terrno = TSDB_CODE_TSC_DISCONNECTED;
return terrno;
}
int64_t rid = *(int64_t*)taos;
const int32_t MAX_TABLE_NAME_LENGTH = 12 * 1024 * 1024; // 12MB list
int32_t code = 0;
SRequestObj *pRequest = NULL;
......@@ -897,7 +905,7 @@ int taos_load_table_info(TAOS *taos, const char *tableNameList) {
return TSDB_CODE_TSC_INVALID_OPERATION;
}
STscObj *pTscObj = acquireTscObj(*(int64_t *)taos);
STscObj *pTscObj = acquireTscObj(rid);
if (pTscObj == NULL) {
terrno = TSDB_CODE_TSC_DISCONNECTED;
return terrno;
......@@ -942,7 +950,7 @@ _return:
taosArrayDestroy(catalogReq.pTableMeta);
destroyRequest(pRequest);
releaseTscObj(*(int64_t *)taos);
releaseTscObj(rid);
return code;
}
......
......@@ -77,7 +77,7 @@ int32_t processConnectRsp(void* param, const SDataBuf* pMsg, int32_t code) {
for (int32_t i = 0; i < connectRsp.epSet.numOfEps; ++i) {
tscDebug("0x%" PRIx64 " epSet.fqdn[%d]:%s port:%d, connObj:0x%" PRIx64, pRequest->requestId, i,
connectRsp.epSet.eps[i].fqdn, connectRsp.epSet.eps[i].port, *(int64_t*)pTscObj->id);
connectRsp.epSet.eps[i].fqdn, connectRsp.epSet.eps[i].port, pTscObj->id);
}
pTscObj->connId = connectRsp.connId;
......@@ -91,7 +91,7 @@ int32_t processConnectRsp(void* param, const SDataBuf* pMsg, int32_t code) {
pTscObj->connType = connectRsp.connType;
hbRegisterConn(pTscObj->pAppInfo->pAppHbMgr, *(int64_t*)pTscObj->id, connectRsp.clusterId, connectRsp.connType);
hbRegisterConn(pTscObj->pAppInfo->pAppHbMgr, pTscObj->id, connectRsp.clusterId, connectRsp.connType);
// pRequest->body.resInfo.pRspMsg = pMsg->pData;
tscDebug("0x%" PRIx64 " clusterId:%" PRId64 ", totalConn:%" PRId64, pRequest->requestId, connectRsp.clusterId,
......
......@@ -309,7 +309,7 @@ static int32_t smlApplySchemaAction(SSmlHandle *info, SSchemaAction *action) {
case SCHEMA_ACTION_ADD_COLUMN: {
int n = sprintf(result, "alter stable `%s` add column ", action->alterSTable.sTableName);
smlBuildColumnDescription(action->alterSTable.field, result + n, capacity - n, &outBytes);
TAOS_RES *res = taos_query(info->taos->id, result); // TODO async doAsyncQuery
TAOS_RES *res = taos_query((TAOS*)&info->taos->id, result); // TODO async doAsyncQuery
code = taos_errno(res);
const char *errStr = taos_errstr(res);
if (code != TSDB_CODE_SUCCESS) {
......@@ -323,7 +323,7 @@ static int32_t smlApplySchemaAction(SSmlHandle *info, SSchemaAction *action) {
case SCHEMA_ACTION_ADD_TAG: {
int n = sprintf(result, "alter stable `%s` add tag ", action->alterSTable.sTableName);
smlBuildColumnDescription(action->alterSTable.field, result + n, capacity - n, &outBytes);
TAOS_RES *res = taos_query(info->taos->id, result); // TODO async doAsyncQuery
TAOS_RES *res = taos_query((TAOS*)&info->taos->id, result); // TODO async doAsyncQuery
code = taos_errno(res);
const char *errStr = taos_errstr(res);
if (code != TSDB_CODE_SUCCESS) {
......@@ -337,7 +337,7 @@ static int32_t smlApplySchemaAction(SSmlHandle *info, SSchemaAction *action) {
case SCHEMA_ACTION_CHANGE_COLUMN_SIZE: {
int n = sprintf(result, "alter stable `%s` modify column ", action->alterSTable.sTableName);
smlBuildColumnDescription(action->alterSTable.field, result + n, capacity - n, &outBytes);
TAOS_RES *res = taos_query(info->taos->id, result); // TODO async doAsyncQuery
TAOS_RES *res = taos_query((TAOS*)&info->taos->id, result); // TODO async doAsyncQuery
code = taos_errno(res);
if (code != TSDB_CODE_SUCCESS) {
uError("SML:0x%" PRIx64 " apply schema action. error : %s", info->id, taos_errstr(res));
......@@ -350,7 +350,7 @@ static int32_t smlApplySchemaAction(SSmlHandle *info, SSchemaAction *action) {
case SCHEMA_ACTION_CHANGE_TAG_SIZE: {
int n = sprintf(result, "alter stable `%s` modify tag ", action->alterSTable.sTableName);
smlBuildColumnDescription(action->alterSTable.field, result + n, capacity - n, &outBytes);
TAOS_RES *res = taos_query(info->taos->id, result); // TODO async doAsyncQuery
TAOS_RES *res = taos_query((TAOS*)&info->taos->id, result); // TODO async doAsyncQuery
code = taos_errno(res);
if (code != TSDB_CODE_SUCCESS) {
uError("SML:0x%" PRIx64 " apply schema action. error : %s", info->id, taos_errstr(res));
......@@ -405,7 +405,7 @@ static int32_t smlApplySchemaAction(SSmlHandle *info, SSchemaAction *action) {
pos--;
++freeBytes;
outBytes = snprintf(pos, freeBytes, ")");
TAOS_RES *res = taos_query(info->taos->id, result);
TAOS_RES *res = taos_query((TAOS*)&info->taos->id, result);
code = taos_errno(res);
if (code != TSDB_CODE_SUCCESS) {
uError("SML:0x%" PRIx64 " apply schema action. error : %s", info->id, taos_errstr(res));
......@@ -2436,7 +2436,13 @@ static void smlInsertCallback(void *param, void *res, int32_t code) {
*/
TAOS_RES* taos_schemaless_insert(TAOS* taos, char* lines[], int numLines, int protocol, int precision) {
STscObj* pTscObj = acquireTscObj(*(int64_t*)taos);
if (NULL == taos) {
terrno = TSDB_CODE_TSC_DISCONNECTED;
return NULL;
}
int64_t rid = *(int64_t*)taos;
STscObj* pTscObj = acquireTscObj(rid);
if (NULL == pTscObj) {
terrno = TSDB_CODE_TSC_DISCONNECTED;
uError("SML:taos_schemaless_insert invalid taos");
......@@ -2445,7 +2451,7 @@ TAOS_RES* taos_schemaless_insert(TAOS* taos, char* lines[], int numLines, int pr
SRequestObj* request = (SRequestObj*)createRequest(pTscObj, TSDB_SQL_INSERT);
if(!request){
releaseTscObj(*(int64_t*)taos);
releaseTscObj(rid);
uError("SML:taos_schemaless_insert error request is null");
return NULL;
}
......@@ -2533,6 +2539,6 @@ end:
// ((STscObj *)taos)->schemalessType = 0;
pTscObj->schemalessType = 1;
uDebug("resultend:%s", request->msgBuf);
releaseTscObj(*(int64_t*)taos);
releaseTscObj(rid);
return (TAOS_RES*)request;
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册