提交 c364f989 编写于 作者: D dapan1121

enh: stop query process

上级 e851977d
......@@ -301,6 +301,7 @@ void destroyRequest(SRequestObj* pRequest);
SRequestObj* acquireRequest(int64_t rid);
int32_t releaseRequest(int64_t rid);
int32_t removeRequest(int64_t rid);
void doDestroyRequest(void *p);
char* getDbOfConnection(STscObj* pObj);
void setConnectionDB(STscObj* pTscObj, const char* db);
......
......@@ -37,10 +37,12 @@ int32_t clientConnRefPool = -1;
static TdThreadOnce tscinit = PTHREAD_ONCE_INIT;
volatile int32_t tscInitRes = 0;
static void registerRequest(SRequestObj *pRequest) {
static int32_t registerRequest(SRequestObj *pRequest) {
STscObj *pTscObj = acquireTscObj(*(int64_t *)pRequest->pTscObj->id);
assert(pTscObj != NULL);
if (NULL == pTscObj) {
terrno = TSDB_CODE_TSC_DISCONNECTED;
return terrno;
}
// connection has been released already, abort creating request.
pRequest->self = taosAddRef(clientReqRefPool, pRequest);
......@@ -56,6 +58,8 @@ static void registerRequest(SRequestObj *pRequest) {
", current:%d, app current:%d, total:%d, reqId:0x%" PRIx64,
pRequest->self, *(int64_t *)pRequest->pTscObj->id, num, currentInst, total, pRequest->requestId);
}
return TSDB_CODE_SUCCESS;
}
static void deregisterRequest(SRequestObj *pRequest) {
......@@ -202,7 +206,10 @@ void *createRequest(STscObj *pObj, int32_t type) {
pRequest->msgBufLen = ERROR_MSG_BUF_DEFAULT_SIZE;
tsem_init(&pRequest->body.rspSem, 0, 0);
registerRequest(pRequest);
if (registerRequest(pRequest)) {
doDestroyRequest(pRequest);
return NULL;
}
return pRequest;
}
......@@ -230,12 +237,10 @@ int32_t releaseRequest(int64_t rid) { return taosReleaseRef(clientReqRefPool, ri
int32_t removeRequest(int64_t rid) { return taosRemoveRef(clientReqRefPool, rid); }
static void doDestroyRequest(void *p) {
void doDestroyRequest(void *p) {
assert(p != NULL);
SRequestObj *pRequest = (SRequestObj *)p;
assert(RID_VALID(pRequest->self));
taosHashRemove(pRequest->pTscObj->pRequests, &pRequest->self, sizeof(pRequest->self));
if (pRequest->body.queryJob != 0) {
......@@ -253,7 +258,9 @@ static void doDestroyRequest(void *p) {
destroyQueryExecRes(&pRequest->body.resInfo.execRes);
deregisterRequest(pRequest);
if (pRequest->self) {
deregisterRequest(pRequest);
}
taosMemoryFreeClear(pRequest);
}
......
......@@ -2008,7 +2008,9 @@ void syncCatalogFn(SMetaData* pResult, void* param, int32_t code) {
void syncQueryFn(void* param, void* res, int32_t code) {
SSyncQueryParam* pParam = param;
pParam->pRequest = res;
pParam->pRequest->code = code;
if (pParam->pRequest) {
pParam->pRequest->code = code;
}
tsem_post(&pParam->sem);
}
......
......@@ -1127,12 +1127,14 @@ int32_t catalogGetExpiredUsers(SCatalog* pCtg, SUserAuthVersion** users, uint32_
}
*num = taosHashGetSize(pCtg->userCache);
if (*num > 0) {
*users = taosMemoryCalloc(*num, sizeof(SUserAuthVersion));
if (NULL == *users) {
ctgError("calloc %d userAuthVersion failed", *num);
CTG_API_LEAVE(TSDB_CODE_OUT_OF_MEMORY);
}
if (*num <= 0) {
CTG_API_LEAVE(TSDB_CODE_SUCCESS);
}
*users = taosMemoryCalloc(*num, sizeof(SUserAuthVersion));
if (NULL == *users) {
ctgError("calloc %d userAuthVersion failed", *num);
CTG_API_LEAVE(TSDB_CODE_OUT_OF_MEMORY);
}
uint32_t i = 0;
......@@ -1144,6 +1146,11 @@ int32_t catalogGetExpiredUsers(SCatalog* pCtg, SUserAuthVersion** users, uint32_
(*users)[i].user[len] = 0;
(*users)[i].version = pAuth->version;
++i;
if (i >= *num) {
taosHashCancelIterate(pCtg->userCache, pAuth);
break;
}
pAuth = taosHashIterate(pCtg->userCache, pAuth);
}
......
......@@ -23,6 +23,7 @@
#include <sys/time.h>
#include <unistd.h>
#include <string.h>
#include <pthread.h>
#include "taos.h"
......@@ -35,7 +36,7 @@ int64_t st, et;
char hostName[128];
char dbName[128];
char tbName[128];
char runTimes = 1;
int32_t runTimes = 10000;
typedef struct {
int id;
......@@ -49,6 +50,7 @@ typedef struct {
} STable;
typedef struct SSP_CB_PARAM {
TAOS *taos;
bool fetch;
int32_t *end;
} SSP_CB_PARAM;
......@@ -73,6 +75,16 @@ static void sqExecSQL(TAOS *taos, char *command) {
taos_free_result(pSql);
}
static void sqExecSQLE(TAOS *taos, char *command) {
int i;
int32_t code = -1;
TAOS_RES *pSql = taos_query(taos, command);
taos_free_result(pSql);
}
void sqExit(char* prefix, const char* errMsg) {
fprintf(stderr, "%s error: %s\n", prefix, errMsg);
exit(1);
......@@ -123,6 +135,27 @@ void sqFreeQueryCb(void *param, TAOS_RES *pRes, int code) {
}
void sqCloseFetchCb(void *param, TAOS_RES *pRes, int numOfRows) {
SSP_CB_PARAM *qParam = (SSP_CB_PARAM *)param;
taos_close(qParam->taos);
*qParam->end = 1;
}
void sqCloseQueryCb(void *param, TAOS_RES *pRes, int code) {
SSP_CB_PARAM *qParam = (SSP_CB_PARAM *)param;
if (code == 0 && pRes) {
if (qParam->fetch) {
taos_fetch_rows_a(pRes, sqFreeFetchCb, param);
} else {
taos_close(qParam->taos);
*qParam->end = 1;
}
} else {
sqExit("select", taos_errstr(pRes));
}
}
int sqSyncStopQuery(bool fetch) {
CASE_ENTER();
for (int32_t i = 0; i < runTimes; ++i) {
......@@ -131,6 +164,9 @@ int sqSyncStopQuery(bool fetch) {
TAOS *taos = taos_connect(hostName, "root", "taosdata", NULL, 0);
if (taos == NULL) sqExit("taos_connect", taos_errstr(NULL));
sprintf(sql, "reset query cache");
sqExecSQL(taos, sql);
sprintf(sql, "use %s", dbName);
sqExecSQL(taos, sql);
......@@ -161,6 +197,9 @@ int sqAsyncStopQuery(bool fetch) {
TAOS *taos = taos_connect(hostName, "root", "taosdata", NULL, 0);
if (taos == NULL) sqExit("taos_connect", taos_errstr(NULL));
sprintf(sql, "reset query cache");
sqExecSQL(taos, sql);
sprintf(sql, "use %s", dbName);
sqExecSQL(taos, sql);
......@@ -188,6 +227,9 @@ int sqSyncFreeQuery(bool fetch) {
TAOS *taos = taos_connect(hostName, "root", "taosdata", NULL, 0);
if (taos == NULL) sqExit("taos_connect", taos_errstr(NULL));
sprintf(sql, "reset query cache");
sqExecSQL(taos, sql);
sprintf(sql, "use %s", dbName);
sqExecSQL(taos, sql);
......@@ -216,6 +258,9 @@ int sqAsyncFreeQuery(bool fetch) {
TAOS *taos = taos_connect(hostName, "root", "taosdata", NULL, 0);
if (taos == NULL) sqExit("taos_connect", taos_errstr(NULL));
sprintf(sql, "reset query cache");
sqExecSQL(taos, sql);
sprintf(sql, "use %s", dbName);
sqExecSQL(taos, sql);
......@@ -235,8 +280,119 @@ int sqAsyncFreeQuery(bool fetch) {
CASE_LEAVE();
}
int sqSyncCloseQuery(bool fetch) {
CASE_ENTER();
for (int32_t i = 0; i < runTimes; ++i) {
char sql[1024] = {0};
int32_t code = 0;
TAOS *taos = taos_connect(hostName, "root", "taosdata", NULL, 0);
if (taos == NULL) sqExit("taos_connect", taos_errstr(NULL));
sprintf(sql, "reset query cache");
sqExecSQL(taos, sql);
sprintf(sql, "use %s", dbName);
sqExecSQL(taos, sql);
sprintf(sql, "select * from %s", tbName);
TAOS_RES* pRes = taos_query(taos, sql);
code = taos_errno(pRes);
if (code) {
sqExit("taos_query", taos_errstr(pRes));
}
if (fetch) {
taos_fetch_row(pRes);
}
taos_close(taos);
}
CASE_LEAVE();
}
int sqAsyncCloseQuery(bool fetch) {
CASE_ENTER();
for (int32_t i = 0; i < runTimes; ++i) {
char sql[1024] = {0};
int32_t code = 0;
TAOS *taos = taos_connect(hostName, "root", "taosdata", NULL, 0);
if (taos == NULL) sqExit("taos_connect", taos_errstr(NULL));
sprintf(sql, "reset query cache");
sqExecSQL(taos, sql);
sprintf(sql, "use %s", dbName);
sqExecSQL(taos, sql);
sprintf(sql, "select * from %s", tbName);
int32_t qEnd = 0;
SSP_CB_PARAM param = {0};
param.fetch = fetch;
param.end = &qEnd;
taos_query_a(taos, sql, sqFreeQueryCb, &param);
while (0 == qEnd) {
usleep(5000);
}
}
CASE_LEAVE();
}
void *syncQueryThreadFp(void *arg) {
SSP_CB_PARAM* qParam = (SSP_CB_PARAM*)arg;
char sql[1024] = {0};
int32_t code = 0;
TAOS *taos = taos_connect(hostName, "root", "taosdata", NULL, 0);
if (taos == NULL) sqExit("taos_connect", taos_errstr(NULL));
qParam->taos = taos;
sprintf(sql, "reset query cache");
sqExecSQLE(taos, sql);
sprintf(sql, "use %s", dbName);
sqExecSQLE(taos, sql);
sprintf(sql, "select * from %s", tbName);
TAOS_RES* pRes = taos_query(taos, sql);
if (qParam->fetch) {
taos_fetch_row(pRes);
}
taos_free_result(pRes);
}
void *closeThreadFp(void *arg) {
SSP_CB_PARAM* qParam = (SSP_CB_PARAM*)arg;
while (true) {
if (qParam->taos) {
usleep(rand() % 10000);
taos_close(qParam->taos);
break;
}
usleep(1);
}
}
int sqConSyncCloseQuery(bool fetch) {
CASE_ENTER();
pthread_t qid, cid;
for (int32_t i = 0; i < runTimes; ++i) {
SSP_CB_PARAM param = {0};
param.fetch = fetch;
pthread_create(&qid, NULL, syncQueryThreadFp, (void*)&param);
pthread_create(&cid, NULL, closeThreadFp, (void*)&param);
pthread_join(qid, NULL);
pthread_join(cid, NULL);
}
CASE_LEAVE();
}
void sqRunAllCase(void) {
/*
sqSyncStopQuery(false);
sqSyncStopQuery(true);
sqAsyncStopQuery(false);
......@@ -247,6 +403,14 @@ void sqRunAllCase(void) {
sqAsyncFreeQuery(false);
sqAsyncFreeQuery(true);
sqSyncCloseQuery(false);
sqSyncCloseQuery(true);
sqAsyncCloseQuery(false);
sqAsyncCloseQuery(true);
*/
sqConSyncCloseQuery(false);
sqConSyncCloseQuery(true);
}
......@@ -256,6 +420,8 @@ int main(int argc, char *argv[]) {
exit(0);
}
srand((unsigned int)time(NULL));
strcpy(hostName, argv[1]);
strcpy(dbName, argv[2]);
strcpy(tbName, argv[3]);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册