diff --git a/include/libs/executor/executor.h b/include/libs/executor/executor.h
index cff4b0234c2c37c8dac82b174a916a926bdfd564..0aa48ef2aa167318c37d92c7c92917ebc28a7f84 100644
--- a/include/libs/executor/executor.h
+++ b/include/libs/executor/executor.h
@@ -109,7 +109,7 @@ int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId,
* @param tversion
* @return
*/
-int32_t qGetQueriedTableSchemaVersion(qTaskInfo_t tinfo, char* dbName, char* tableName, int32_t* sversion,
+int32_t qGetQueryTableSchemaVersion(qTaskInfo_t tinfo, char* dbName, char* tableName, int32_t* sversion,
int32_t* tversion);
/**
diff --git a/source/client/inc/clientInt.h b/source/client/inc/clientInt.h
index 9380b73d2d26ecd4ca5b5e11145748f652e36ade..91f21f6e6a7957c0dc86880aef869c0992392659 100644
--- a/source/client/inc/clientInt.h
+++ b/source/client/inc/clientInt.h
@@ -222,8 +222,8 @@ typedef struct SRequestObj {
SArray* tableList;
SQueryExecMetric metric;
SRequestSendRecvBody body;
- bool stableQuery;
- bool validateOnly;
+ bool stableQuery; // todo refactor
+ bool validateOnly; // todo refactor
bool killed;
uint32_t prevCode; // previous error code: todo refactor, add update flag for catalog
@@ -247,9 +247,9 @@ void doFreeReqResultInfo(SReqResultInfo* pResInfo);
int32_t transferTableNameList(const char* tbList, int32_t acctId, char* dbName, SArray** pReq);
void syncCatalogFn(SMetaData* pResult, void* param, int32_t code);
-SRequestObj* execQuery(STscObj* pTscObj, const char* sql, int sqlLen, bool validateOnly);
+SRequestObj* execQuery(uint64_t connId, const char* sql, int sqlLen, bool validateOnly);
TAOS_RES* taosQueryImpl(TAOS* taos, const char* sql, bool validateOnly);
-void taosAsyncQueryImpl(TAOS* taos, const char* sql, __taos_async_fn_t fp, void* param, bool validateOnly);
+void taosAsyncQueryImpl(uint64_t connId, const char* sql, __taos_async_fn_t fp, void* param, bool validateOnly);
static FORCE_INLINE SReqResultInfo* tmqGetCurResInfo(TAOS_RES* res) {
SMqRspObj* msg = (SMqRspObj*)res;
@@ -297,7 +297,7 @@ int32_t releaseTscObj(int64_t rid);
uint64_t generateRequestId();
-void* createRequest(STscObj* pObj, int32_t type);
+void* createRequest(uint64_t connId, int32_t type);
void destroyRequest(SRequestObj* pRequest);
SRequestObj* acquireRequest(int64_t rid);
int32_t releaseRequest(int64_t rid);
@@ -318,13 +318,13 @@ void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet);
STscObj* taos_connect_internal(const char* ip, const char* user, const char* pass, const char* auth, const char* db,
uint16_t port, int connType);
-SRequestObj* launchQuery(STscObj* pTscObj, const char* sql, int sqlLen, bool validateOnly);
+SRequestObj* launchQuery(uint64_t connId, const char* sql, int sqlLen, bool validateOnly);
int32_t parseSql(SRequestObj* pRequest, bool topicQuery, SQuery** pQuery, SStmtCallback* pStmtCb);
int32_t getPlan(SRequestObj* pRequest, SQuery* pQuery, SQueryPlan** pPlan, SArray* pNodeList);
-int32_t buildRequest(STscObj* pTscObj, const char* sql, int sqlLen, SRequestObj** pRequest);
+int32_t buildRequest(uint64_t connId, const char* sql, int sqlLen, void* param, bool validateSql, SRequestObj** pRequest);
void taos_close_internal(void* taos);
diff --git a/source/client/src/clientEnv.c b/source/client/src/clientEnv.c
index 89ecf16b4085c9a53f30a60af3870bb8d5230a51..ba92ed238b02489bb09d45fb65a0b0071f6fac9a 100644
--- a/source/client/src/clientEnv.c
+++ b/source/client/src/clientEnv.c
@@ -49,13 +49,8 @@ void cleanupTscQhandle() {
// destroy handle
taosCleanUpScheduler(tscQhandle);
}
-static int32_t registerRequest(SRequestObj *pRequest) {
- STscObj *pTscObj = acquireTscObj(pRequest->pTscObj->id);
- if (NULL == pTscObj) {
- terrno = TSDB_CODE_TSC_DISCONNECTED;
- return terrno;
- }
+static int32_t registerRequest(SRequestObj *pRequest, STscObj* pTscObj) {
// connection has been released already, abort creating request.
pRequest->self = taosAddRef(clientReqRefPool, pRequest);
@@ -246,29 +241,34 @@ STscObj *acquireTscObj(int64_t rid) { return (STscObj *)taosAcquireRef(clientCon
int32_t releaseTscObj(int64_t rid) { return taosReleaseRef(clientConnRefPool, rid); }
-void *createRequest(STscObj *pObj, int32_t type) {
- assert(pObj != NULL);
-
+void *createRequest(uint64_t connId, int32_t type) {
SRequestObj *pRequest = (SRequestObj *)taosMemoryCalloc(1, sizeof(SRequestObj));
if (NULL == pRequest) {
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
return NULL;
}
+ STscObj* pTscObj = acquireTscObj(connId);
+ if (pTscObj == NULL) {
+ terrno = TSDB_CODE_TSC_DISCONNECTED;
+ return NULL;
+ }
+
pRequest->resType = RES_TYPE__QUERY;
- pRequest->pDb = getDbOfConnection(pObj);
pRequest->requestId = generateRequestId();
pRequest->metric.start = taosGetTimestampUs();
pRequest->body.resInfo.convertUcs4 = true; // convert ucs4 by default
-
pRequest->type = type;
- pRequest->pTscObj = pObj;
+
+ pRequest->pDb = getDbOfConnection(pTscObj);
+ pRequest->pTscObj = pTscObj;
+
pRequest->msgBuf = taosMemoryCalloc(1, ERROR_MSG_BUF_DEFAULT_SIZE);
pRequest->msgBufLen = ERROR_MSG_BUF_DEFAULT_SIZE;
tsem_init(&pRequest->body.rspSem, 0, 0);
- if (registerRequest(pRequest)) {
+ if (registerRequest(pRequest, pTscObj)) {
doDestroyRequest(pRequest);
return NULL;
}
@@ -327,8 +327,8 @@ void doDestroyRequest(void *p) {
if (pRequest->self) {
deregisterRequest(pRequest);
}
- taosMemoryFree(pRequest);
+ taosMemoryFree(pRequest);
tscTrace("end to destroy request %" PRIx64 " p:%p", reqId, pRequest);
}
@@ -338,7 +338,6 @@ void destroyRequest(SRequestObj *pRequest) {
}
taos_stop_query(pRequest);
-
removeRequest(pRequest->self);
}
diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c
index 6fb3d980274b835af8636a635a8612fb36c3ad4f..a4a5ec7499e13369fd3113f3b0c624deb95861bb 100644
--- a/source/client/src/clientImpl.c
+++ b/source/client/src/clientImpl.c
@@ -148,29 +148,49 @@ STscObj* taos_connect_internal(const char* ip, const char* user, const char* pas
return taosConnectImpl(user, &secretEncrypt[0], localDb, NULL, NULL, *pInst, connType);
}
-int32_t buildRequest(STscObj* pTscObj, const char* sql, int sqlLen, SRequestObj** pRequest) {
- *pRequest = createRequest(pTscObj, TSDB_SQL_SELECT);
+int32_t buildRequest(uint64_t connId, const char* sql, int sqlLen, void* param, bool validateSql, SRequestObj** pRequest) {
+ *pRequest = createRequest(connId, TSDB_SQL_SELECT);
if (*pRequest == NULL) {
- tscError("failed to malloc sqlObj");
+ tscError("failed to malloc sqlObj, %s", sql);
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
(*pRequest)->sqlstr = taosMemoryMalloc(sqlLen + 1);
if ((*pRequest)->sqlstr == NULL) {
- tscError("0x%" PRIx64 " failed to prepare sql string buffer", (*pRequest)->self);
- (*pRequest)->msgBuf = strdup("failed to prepare sql string buffer");
+ tscError("0x%" PRIx64 " failed to prepare sql string buffer, %s", (*pRequest)->self, sql);
+ destroyRequest(*pRequest);
+ *pRequest = NULL;
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
strntolower((*pRequest)->sqlstr, sql, (int32_t)sqlLen);
(*pRequest)->sqlstr[sqlLen] = 0;
- (*pRequest)->sqlLen = sqlLen;
+ (*pRequest)->sqlLen = sqlLen;
+ (*pRequest)->validateOnly = validateSql;
+
+ if (param == NULL) {
+ SSyncQueryParam* pParam = taosMemoryCalloc(1, sizeof(SSyncQueryParam));
+ if (pParam == NULL) {
+ destroyRequest(*pRequest);
+ *pRequest = NULL;
+ return TSDB_CODE_TSC_OUT_OF_MEMORY;
+ }
+
+ tsem_init(&pParam->sem, 0, 0);
+ pParam->pRequest = (*pRequest);
+ param = pParam;
+ }
+
+ (*pRequest)->body.param = param;
+ STscObj* pTscObj = (*pRequest)->pTscObj;
if (taosHashPut(pTscObj->pRequests, &(*pRequest)->self, sizeof((*pRequest)->self), &(*pRequest)->self,
sizeof((*pRequest)->self))) {
+ tscError("%d failed to add to request container, reqId:0x%" PRIx64 ", conn:%d, %s", (*pRequest)->self,
+ (*pRequest)->requestId, pTscObj->id, sql);
+
destroyRequest(*pRequest);
*pRequest = NULL;
- tscError("put request to request hash failed");
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
@@ -882,18 +902,16 @@ SRequestObj* launchQueryImpl(SRequestObj* pRequest, SQuery* pQuery, bool keepQue
return pRequest;
}
-SRequestObj* launchQuery(STscObj* pTscObj, const char* sql, int sqlLen, bool validateOnly) {
+SRequestObj* launchQuery(uint64_t connId, const char* sql, int sqlLen, bool validateOnly) {
SRequestObj* pRequest = NULL;
SQuery* pQuery = NULL;
- int32_t code = buildRequest(pTscObj, sql, sqlLen, &pRequest);
+ int32_t code = buildRequest(connId, sql, sqlLen, NULL, validateOnly, &pRequest);
if (code != TSDB_CODE_SUCCESS) {
terrno = code;
return NULL;
}
- pRequest->validateOnly = validateOnly;
-
code = parseSql(pRequest, false, &pQuery, NULL);
if (code != TSDB_CODE_SUCCESS) {
pRequest->code = code;
@@ -1044,19 +1062,20 @@ int32_t removeMeta(STscObj* pTscObj, SArray* tbList) {
return TSDB_CODE_SUCCESS;
}
-SRequestObj* execQuery(STscObj* pTscObj, const char* sql, int sqlLen, bool validateOnly) {
+// todo remove it soon
+SRequestObj* execQuery(uint64_t connId, const char* sql, int sqlLen, bool validateOnly) {
SRequestObj* pRequest = NULL;
int32_t retryNum = 0;
int32_t code = 0;
do {
destroyRequest(pRequest);
- pRequest = launchQuery(pTscObj, sql, sqlLen, validateOnly);
+ pRequest = launchQuery(connId, sql, sqlLen, validateOnly);
if (pRequest == NULL || TSDB_CODE_SUCCESS == pRequest->code || !NEED_CLIENT_HANDLE_ERROR(pRequest->code)) {
break;
}
- code = refreshMeta(pTscObj, pRequest);
+ code = refreshMeta(pRequest->pTscObj, pRequest);
if (code) {
pRequest->code = code;
break;
@@ -1064,7 +1083,7 @@ SRequestObj* execQuery(STscObj* pTscObj, const char* sql, int sqlLen, bool valid
} while (retryNum++ < REQUEST_TOTAL_EXEC_TIMES);
if (NEED_CLIENT_RM_TBLMETA_REQ(pRequest->type)) {
- removeMeta(pTscObj, pRequest->tableList);
+ removeMeta(pRequest->pTscObj, pRequest->tableList);
}
return pRequest;
@@ -1119,7 +1138,7 @@ STscObj* taosConnectImpl(const char* user, const char* auth, const char* db, __t
return pTscObj;
}
- SRequestObj* pRequest = createRequest(pTscObj, TDMT_MND_CONNECT);
+ SRequestObj* pRequest = createRequest(pTscObj->id, TDMT_MND_CONNECT);
if (pRequest == NULL) {
destroyTscObj(pTscObj);
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
@@ -1446,15 +1465,10 @@ void* doAsyncFetchRows(SRequestObj* pRequest, bool setupOneRowPtr, bool convertU
return NULL;
}
- SSyncQueryParam* pParam = pRequest->body.param;
- if (NULL == pParam) {
- pParam = taosMemoryCalloc(1, sizeof(SSyncQueryParam));
- tsem_init(&pParam->sem, 0, 0);
- }
-
// convert ucs4 to native multi-bytes string
pResultInfo->convertUcs4 = convertUcs4;
+ SSyncQueryParam* pParam = pRequest->body.param;
taos_fetch_rows_a(pRequest, syncFetchFn, pParam);
tsem_wait(&pParam->sem);
}
@@ -2026,22 +2040,9 @@ void syncQueryFn(void* param, void* res, int32_t code) {
tsem_post(&pParam->sem);
}
-void taosAsyncQueryImpl(TAOS* taos, const char* sql, __taos_async_fn_t fp, void* param, bool validateOnly) {
- if (NULL == taos) {
- terrno = TSDB_CODE_TSC_DISCONNECTED;
- fp(param, NULL, terrno);
- return;
- }
-
- int64_t rid = *(int64_t*)taos;
- STscObj* pTscObj = acquireTscObj(rid);
- if (pTscObj == NULL || sql == NULL || NULL == fp) {
+void taosAsyncQueryImpl(uint64_t connId, const char* sql, __taos_async_fn_t fp, void* param, bool validateOnly) {
+ if (sql == NULL || NULL == fp) {
terrno = TSDB_CODE_INVALID_PARA;
- if (pTscObj) {
- releaseTscObj(rid);
- } else {
- terrno = TSDB_CODE_TSC_DISCONNECTED;
- }
fp(param, NULL, terrno);
return;
}
@@ -2050,26 +2051,20 @@ void taosAsyncQueryImpl(TAOS* taos, const char* sql, __taos_async_fn_t fp, void*
if (sqlLen > (size_t)TSDB_MAX_ALLOWED_SQL_LEN) {
tscError("sql string exceeds max length:%d", TSDB_MAX_ALLOWED_SQL_LEN);
terrno = TSDB_CODE_TSC_EXCEED_SQL_LIMIT;
- releaseTscObj(rid);
-
fp(param, NULL, terrno);
return;
}
SRequestObj* pRequest = NULL;
- int32_t code = buildRequest(pTscObj, sql, sqlLen, &pRequest);
+ int32_t code = buildRequest(connId, sql, sqlLen, param, validateOnly, &pRequest);
if (code != TSDB_CODE_SUCCESS) {
terrno = code;
- releaseTscObj(rid);
fp(param, NULL, terrno);
return;
}
- pRequest->validateOnly = validateOnly;
pRequest->body.queryFp = fp;
- pRequest->body.param = param;
doAsyncQuery(pRequest, false);
- releaseTscObj(rid);
}
TAOS_RES* taosQueryImpl(TAOS* taos, const char* sql, bool validateOnly) {
@@ -2078,36 +2073,22 @@ TAOS_RES* taosQueryImpl(TAOS* taos, const char* sql, bool validateOnly) {
return NULL;
}
- int64_t rid = *(int64_t*)taos;
- STscObj* pTscObj = acquireTscObj(rid);
- if (pTscObj == NULL || sql == NULL) {
- terrno = TSDB_CODE_TSC_DISCONNECTED;
- return NULL;
- }
-
#if SYNC_ON_TOP_OF_ASYNC
SSyncQueryParam* param = taosMemoryCalloc(1, sizeof(SSyncQueryParam));
tsem_init(¶m->sem, 0, 0);
- taosAsyncQueryImpl((TAOS*)&rid, sql, syncQueryFn, param, validateOnly);
+ taosAsyncQueryImpl(*(int64_t*)taos, sql, syncQueryFn, param, validateOnly);
tsem_wait(¶m->sem);
-
- releaseTscObj(rid);
-
return param->pRequest;
#else
size_t sqlLen = strlen(sql);
if (sqlLen > (size_t)TSDB_MAX_ALLOWED_SQL_LEN) {
- releaseTscObj(rid);
tscError("sql string exceeds max length:%d", TSDB_MAX_ALLOWED_SQL_LEN);
terrno = TSDB_CODE_TSC_EXCEED_SQL_LIMIT;
return NULL;
}
- TAOS_RES* pRes = execQuery(pTscObj, sql, sqlLen, validateOnly);
-
- releaseTscObj(rid);
-
+ TAOS_RES* pRes = execQuery(connId, sql, sqlLen, validateOnly);
return pRes;
#endif
}
diff --git a/source/client/src/clientMain.c b/source/client/src/clientMain.c
index e908046b1e3adfef72ceed24055d988e2fda9cb7..12de522cbc506fb4f2aa89fd571cc9399e7846f8 100644
--- a/source/client/src/clientMain.c
+++ b/source/client/src/clientMain.c
@@ -479,7 +479,6 @@ void taos_stop_query(TAOS_RES *res) {
}
schedulerFreeJob(&pRequest->body.queryJob, TSDB_CODE_TSC_QUERY_KILLED);
-
tscDebug("request %" PRIx64 " killed", pRequest->requestId);
}
@@ -706,7 +705,8 @@ void retrieveMetaCallback(SMetaData *pResultMeta, void *param, int32_t code) {
}
void taos_query_a(TAOS *taos, const char *sql, __taos_async_fn_t fp, void *param) {
- taosAsyncQueryImpl(taos, sql, fp, param, false);
+ int64_t connId = *(int64_t*)taos;
+ taosAsyncQueryImpl(connId, sql, fp, param, false);
}
int32_t createParseContext(const SRequestObj *pRequest, SParseContext **pCxt) {
@@ -915,7 +915,7 @@ int taos_load_table_info(TAOS *taos, const char *tableNameList) {
return terrno;
}
- int64_t rid = *(int64_t *)taos;
+ int64_t connId = *(int64_t *)taos;
const int32_t MAX_TABLE_NAME_LENGTH = 12 * 1024 * 1024; // 12MB list
int32_t code = 0;
SRequestObj * pRequest = NULL;
@@ -933,12 +933,14 @@ int taos_load_table_info(TAOS *taos, const char *tableNameList) {
return TSDB_CODE_TSC_INVALID_OPERATION;
}
- STscObj *pTscObj = acquireTscObj(rid);
- if (pTscObj == NULL) {
- terrno = TSDB_CODE_TSC_DISCONNECTED;
- return terrno;
+ char *sql = "taos_load_table_info";
+ code = buildRequest(connId, sql, strlen(sql), NULL, false, &pRequest);
+ if (code != TSDB_CODE_SUCCESS) {
+ terrno = code;
+ goto _return;
}
+ STscObj* pTscObj = pRequest->pTscObj;
code = transferTableNameList(tableNameList, pTscObj->acctId, pTscObj->db, &catalogReq.pTableMeta);
if (code) {
goto _return;
@@ -950,36 +952,22 @@ int taos_load_table_info(TAOS *taos, const char *tableNameList) {
goto _return;
}
- char *sql = "taos_load_table_info";
- code = buildRequest(pTscObj, sql, strlen(sql), &pRequest);
- if (code != TSDB_CODE_SUCCESS) {
- terrno = code;
- goto _return;
- }
-
- SSyncQueryParam param = {0};
- tsem_init(¶m.sem, 0, 0);
- param.pRequest = pRequest;
-
SRequestConnInfo conn = {
.pTrans = pTscObj->pAppInfo->pTransporter, .requestId = pRequest->requestId, .requestObjRefId = pRequest->self};
conn.mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp);
- code = catalogAsyncGetAllMeta(pCtg, &conn, &catalogReq, syncCatalogFn, ¶m, NULL);
+ code = catalogAsyncGetAllMeta(pCtg, &conn, &catalogReq, syncCatalogFn, NULL, NULL);
if (code) {
goto _return;
}
- tsem_wait(¶m.sem);
+ SSyncQueryParam* pParam = pRequest->body.param;
+ tsem_wait(&pParam->sem);
_return:
-
taosArrayDestroy(catalogReq.pTableMeta);
destroyRequest(pRequest);
-
- releaseTscObj(rid);
-
return code;
}
diff --git a/source/client/src/clientSml.c b/source/client/src/clientSml.c
index b6972e4670b972dc5d5a2d0235202bd8b318beb0..6e689adf957ae9be4e4f3b06f17d25278507ee69 100644
--- a/source/client/src/clientSml.c
+++ b/source/client/src/clientSml.c
@@ -2442,22 +2442,15 @@ TAOS_RES* taos_schemaless_insert(TAOS* taos, char* lines[], int numLines, int pr
return NULL;
}
- int64_t rid = *(int64_t*)taos;
- STscObj* pTscObj = acquireTscObj(rid);
- if (NULL == pTscObj) {
- terrno = TSDB_CODE_TSC_DISCONNECTED;
- uError("SML:taos_schemaless_insert invalid taos");
- return NULL;
- }
-
- SRequestObj* request = (SRequestObj*)createRequest(pTscObj, TSDB_SQL_INSERT);
+ SRequestObj* request = (SRequestObj*)createRequest(*(int64_t*)taos, TSDB_SQL_INSERT);
if(!request){
- releaseTscObj(rid);
uError("SML:taos_schemaless_insert error request is null");
return NULL;
}
int batchs = 0;
+ STscObj* pTscObj = request->pTscObj;
+
pTscObj->schemalessType = 1;
SSmlMsgBuf msg = {ERROR_MSG_BUF_DEFAULT_SIZE, request->msgBuf};
@@ -2507,7 +2500,7 @@ TAOS_RES* taos_schemaless_insert(TAOS* taos, char* lines[], int numLines, int pr
batchs = ceil(((double)numLines) / LINE_BATCH);
for (int i = 0; i < batchs; ++i) {
- SRequestObj* req = (SRequestObj*)createRequest(pTscObj, TSDB_SQL_INSERT);
+ SRequestObj* req = (SRequestObj*)createRequest(pTscObj->id, TSDB_SQL_INSERT);
if(!req){
request->code = TSDB_CODE_OUT_OF_MEMORY;
uError("SML:taos_schemaless_insert error request is null");
@@ -2549,6 +2542,5 @@ end:
// ((STscObj *)taos)->schemalessType = 0;
pTscObj->schemalessType = 1;
uDebug("resultend:%s", request->msgBuf);
- releaseTscObj(rid);
return (TAOS_RES*)request;
}
diff --git a/source/client/src/clientStmt.c b/source/client/src/clientStmt.c
index 1e0f30695dca6a851cf0cafd85d1057635a9fc77..d653b8720f63b14ab15e3f2a77448adbd4d1b0c6 100644
--- a/source/client/src/clientStmt.c
+++ b/source/client/src/clientStmt.c
@@ -5,6 +5,14 @@
#include "clientStmt.h"
+static int32_t stmtCreateRequest(STscStmt* pStmt) {
+ if (pStmt->exec.pRequest == NULL) {
+ return buildRequest(pStmt->taos->id, pStmt->sql.sqlStr, pStmt->sql.sqlLen, NULL, false, &pStmt->exec.pRequest);
+ } else {
+ return TSDB_CODE_SUCCESS;
+ }
+}
+
int32_t stmtSwitchStatus(STscStmt* pStmt, STMT_STATUS newStatus) {
int32_t code = 0;
@@ -217,9 +225,7 @@ int32_t stmtParseSql(STscStmt* pStmt) {
.getExecInfoFn = stmtGetExecInfo,
};
- if (NULL == pStmt->exec.pRequest) {
- STMT_ERR_RET(buildRequest(pStmt->taos, pStmt->sql.sqlStr, pStmt->sql.sqlLen, &pStmt->exec.pRequest));
- }
+ STMT_ERR_RET(stmtCreateRequest(pStmt));
STMT_ERR_RET(parseSql(pStmt->exec.pRequest, false, &pStmt->sql.pQuery, &stmtCb));
@@ -532,9 +538,7 @@ int stmtSetTbName(TAOS_STMT* stmt, const char* tbName) {
STMT_ERR_RET(TSDB_CODE_TSC_STMT_API_ERROR);
}
- if (NULL == pStmt->exec.pRequest) {
- STMT_ERR_RET(buildRequest(pStmt->taos, pStmt->sql.sqlStr, pStmt->sql.sqlLen, &pStmt->exec.pRequest));
- }
+ STMT_ERR_RET(stmtCreateRequest(pStmt));
STMT_ERR_RET(qCreateSName(&pStmt->bInfo.sname, tbName, pStmt->taos->acctId, pStmt->exec.pRequest->pDb,
pStmt->exec.pRequest->msgBuf, pStmt->exec.pRequest->msgBufLen));
@@ -625,9 +629,7 @@ int stmtBindBatch(TAOS_STMT* stmt, TAOS_MULTI_BIND* bind, int32_t colIdx) {
pStmt->exec.pRequest = NULL;
}
- if (NULL == pStmt->exec.pRequest) {
- STMT_ERR_RET(buildRequest(pStmt->taos, pStmt->sql.sqlStr, pStmt->sql.sqlLen, &pStmt->exec.pRequest));
- }
+ STMT_ERR_RET(stmtCreateRequest(pStmt));
if (pStmt->bInfo.needParse) {
STMT_ERR_RET(stmtParseSql(pStmt));
@@ -873,9 +875,7 @@ int stmtGetTagFields(TAOS_STMT* stmt, int* nums, TAOS_FIELD_E** fields) {
pStmt->exec.pRequest = NULL;
}
- if (NULL == pStmt->exec.pRequest) {
- STMT_ERR_RET(buildRequest(pStmt->taos, pStmt->sql.sqlStr, pStmt->sql.sqlLen, &pStmt->exec.pRequest));
- }
+ STMT_ERR_RET(stmtCreateRequest(pStmt));
if (pStmt->bInfo.needParse) {
STMT_ERR_RET(stmtParseSql(pStmt));
@@ -905,9 +905,7 @@ int stmtGetColFields(TAOS_STMT* stmt, int* nums, TAOS_FIELD_E** fields) {
pStmt->exec.pRequest = NULL;
}
- if (NULL == pStmt->exec.pRequest) {
- STMT_ERR_RET(buildRequest(pStmt->taos, pStmt->sql.sqlStr, pStmt->sql.sqlLen, &pStmt->exec.pRequest));
- }
+ STMT_ERR_RET(stmtCreateRequest(pStmt));
if (pStmt->bInfo.needParse) {
STMT_ERR_RET(stmtParseSql(pStmt));
@@ -933,10 +931,7 @@ int stmtGetParamNum(TAOS_STMT* stmt, int* nums) {
pStmt->exec.pRequest = NULL;
}
- if (NULL == pStmt->exec.pRequest) {
- STMT_ERR_RET(buildRequest(pStmt->taos, pStmt->sql.sqlStr, pStmt->sql.sqlLen, &pStmt->exec.pRequest));
- }
-
+ STMT_ERR_RET(stmtCreateRequest(pStmt));
if (pStmt->bInfo.needParse) {
STMT_ERR_RET(stmtParseSql(pStmt));
}
@@ -969,9 +964,7 @@ int stmtGetParam(TAOS_STMT* stmt, int idx, int* type, int* bytes) {
pStmt->exec.pRequest = NULL;
}
- if (NULL == pStmt->exec.pRequest) {
- STMT_ERR_RET(buildRequest(pStmt->taos, pStmt->sql.sqlStr, pStmt->sql.sqlLen, &pStmt->exec.pRequest));
- }
+ STMT_ERR_RET(stmtCreateRequest(pStmt));
if (pStmt->bInfo.needParse) {
STMT_ERR_RET(stmtParseSql(pStmt));
diff --git a/source/client/src/tmq.c b/source/client/src/tmq.c
index cea6905c6944d0359274c3466582c2ec67db0f93..614174d97ae54acf64bb80d67c5dc3882409de04 100644
--- a/source/client/src/tmq.c
+++ b/source/client/src/tmq.c
@@ -2267,13 +2267,7 @@ static int32_t taosCreateStb(TAOS *taos, void *meta, int32_t metaLen){
int32_t code = TSDB_CODE_SUCCESS;
SRequestObj* pRequest = NULL;
- STscObj *pTscObj = acquireTscObj(*(int64_t *)taos);
- if (NULL == pTscObj) {
- code = TSDB_CODE_TSC_DISCONNECTED;
- goto end;
- }
-
- code = buildRequest(pTscObj, "", 0, &pRequest);
+ code = buildRequest(*(int64_t*)taos, "", 0, NULL, false, &pRequest);
if (code != TSDB_CODE_SUCCESS) {
goto end;
}
@@ -2313,6 +2307,8 @@ static int32_t taosCreateStb(TAOS *taos, void *meta, int32_t metaLen){
pReq.suid = req.suid;
pReq.source = 1;
+ STscObj* pTscObj = pRequest->pTscObj;
+
SName tableName;
tNameExtractFullName(toName(pTscObj->acctId, pRequest->pDb, req.name, &tableName), pReq.name);
@@ -2351,13 +2347,7 @@ static int32_t taosDropStb(TAOS *taos, void *meta, int32_t metaLen){
int32_t code = TSDB_CODE_SUCCESS;
SRequestObj* pRequest = NULL;
- STscObj *pTscObj = acquireTscObj(*(int64_t *)taos);
- if (NULL == pTscObj) {
- code = TSDB_CODE_TSC_DISCONNECTED;
- goto end;
- }
-
- code = buildRequest(pTscObj, "", 0, &pRequest);
+ code = buildRequest(*(int64_t*)taos, "", 0, NULL, false, &pRequest);
if (code != TSDB_CODE_SUCCESS) {
goto end;
}
@@ -2379,6 +2369,9 @@ static int32_t taosDropStb(TAOS *taos, void *meta, int32_t metaLen){
pReq.igNotExists = true;
pReq.source = 1;
pReq.suid = req.suid;
+
+ STscObj* pTscObj = pRequest->pTscObj;
+
SName tableName;
tNameExtractFullName(toName(pTscObj->acctId, pRequest->pDb, req.name, &tableName), pReq.name);
@@ -2427,14 +2420,8 @@ static int32_t taosCreateTable(TAOS *taos, void *meta, int32_t metaLen){
SRequestObj *pRequest = NULL;
SQuery *pQuery = NULL;
SHashObj *pVgroupHashmap = NULL;
- STscObj *pTscObj = acquireTscObj(*(int64_t *)taos);
- if (NULL == pTscObj) {
- code = TSDB_CODE_TSC_DISCONNECTED;
- goto end;
- }
-
- code = buildRequest(pTscObj, "", 0, &pRequest);
+ code = buildRequest(*(int64_t*) taos, "", 0, NULL, false, &pRequest);
if (code != TSDB_CODE_SUCCESS) {
goto end;
}
@@ -2452,7 +2439,9 @@ static int32_t taosCreateTable(TAOS *taos, void *meta, int32_t metaLen){
goto end;
}
- SVCreateTbReq *pCreateReq = NULL;
+ STscObj* pTscObj = pRequest->pTscObj;
+
+ SVCreateTbReq *pCreateReq = NULL;
SCatalog* pCatalog = NULL;
code = catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog);
if (code != TSDB_CODE_SUCCESS) {
@@ -2544,14 +2533,8 @@ static int32_t taosDropTable(TAOS *taos, void *meta, int32_t metaLen){
SRequestObj *pRequest = NULL;
SQuery *pQuery = NULL;
SHashObj *pVgroupHashmap = NULL;
- STscObj *pTscObj = acquireTscObj(*(int64_t *)taos);
-
- if (NULL == pTscObj) {
- code = TSDB_CODE_TSC_DISCONNECTED;
- goto end;
- }
- code = buildRequest(pTscObj, "", 0, &pRequest);
+ code = buildRequest(*(int64_t*)taos, "", 0, NULL, false, &pRequest);
if (code != TSDB_CODE_SUCCESS) {
goto end;
}
@@ -2569,6 +2552,8 @@ static int32_t taosDropTable(TAOS *taos, void *meta, int32_t metaLen){
goto end;
}
+ STscObj* pTscObj = pRequest->pTscObj;
+
SVDropTbReq *pDropReq = NULL;
SCatalog *pCatalog = NULL;
code = catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog);
@@ -2649,14 +2634,9 @@ static int32_t taosAlterTable(TAOS *taos, void *meta, int32_t metaLen){
SQuery *pQuery = NULL;
SArray *pArray = NULL;
SVgDataBlocks *pVgData = NULL;
- STscObj *pTscObj = acquireTscObj(*(int64_t *)taos);
- if (NULL == pTscObj) {
- code = TSDB_CODE_TSC_DISCONNECTED;
- goto end;
- }
- code = buildRequest(pTscObj, "", 0, &pRequest);
+ code = buildRequest(*(int64_t*) taos, "", 0, NULL, false, &pRequest);
if (code != TSDB_CODE_SUCCESS) {
goto end;
}
@@ -2679,7 +2659,8 @@ static int32_t taosAlterTable(TAOS *taos, void *meta, int32_t metaLen){
goto end;
}
- SCatalog *pCatalog = NULL;
+ STscObj* pTscObj = pRequest->pTscObj;
+ SCatalog* pCatalog = NULL;
code = catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog);
if (code != TSDB_CODE_SUCCESS) {
goto end;
diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c
index f650480ff2b58debb9a92d0f53587ecb3a13af8a..51f7f237762b8900c61c9c5d1e7bf42b4a52a9c5 100644
--- a/source/dnode/vnode/src/tsdb/tsdbRead.c
+++ b/source/dnode/vnode/src/tsdb/tsdbRead.c
@@ -59,7 +59,6 @@ typedef struct SBlockLoadSuppInfo {
SColumnDataAgg tsColAgg;
SColumnDataAgg** plist;
int16_t* colIds; // column ids for loading file block data
- int32_t* slotIds; // colId to slotId
char** buildBuf; // build string tmp buffer, todo remove it later after all string format being updated.
} SBlockLoadSuppInfo;
@@ -183,7 +182,6 @@ static SHashObj* createDataBlockScanInfo(STsdbReader* pTsdbReader, const STableK
return NULL;
}
- // todo apply the lastkey of table check to avoid to load header file
for (int32_t j = 0; j < numOfTables; ++j) {
STableBlockScanInfo info = {.lastKey = 0, .uid = idList[j].uid};
if (ASCENDING_TRAVERSE(pTsdbReader->order)) {
@@ -218,6 +216,30 @@ static void resetDataBlockScanInfo(SHashObj* pTableMap) {
}
}
+static void destroyBlockScanInfo(SHashObj* pTableMap) {
+ STableBlockScanInfo* p = NULL;
+
+ while ((p = taosHashIterate(pTableMap, p)) != NULL) {
+ p->iterInit = false;
+ p->iiter.hasVal = false;
+
+ if (p->iter.iter != NULL) {
+ tsdbTbDataIterDestroy(p->iter.iter);
+ p->iter.iter = NULL;
+ }
+
+ if (p->iiter.iter != NULL) {
+ tsdbTbDataIterDestroy(p->iiter.iter);
+ p->iiter.iter = NULL;
+ }
+
+ taosArrayDestroy(p->delSkyline);
+ p->delSkyline = NULL;
+ }
+
+ taosHashCleanup(pTableMap);
+}
+
static bool isEmptyQueryTimeWindow(STimeWindow* pWindow) {
ASSERT(pWindow != NULL);
return pWindow->skey > pWindow->ekey;
@@ -265,6 +287,10 @@ static int32_t initFilesetIterator(SFilesetIter* pIter, const STsdbFSState* pFSt
return TSDB_CODE_SUCCESS;
}
+static void cleanupFilesetIterator(SFilesetIter* pIter) {
+ taosArrayDestroy(pIter->pFileList);
+}
+
static bool filesetIteratorNext(SFilesetIter* pIter, STsdbReader* pReader) {
bool asc = ASCENDING_TRAVERSE(pIter->order);
int32_t step = asc ? 1 : -1;
@@ -313,7 +339,15 @@ static void resetDataBlockIterator(SDataBlockIter* pIter, int32_t order) {
pIter->order = order;
pIter->index = -1;
pIter->numOfBlocks = -1;
- pIter->blockList = taosArrayInit(4, sizeof(SFileDataBlockInfo));
+ if (pIter->blockList == NULL) {
+ pIter->blockList = taosArrayInit(4, sizeof(SFileDataBlockInfo));
+ } else {
+ taosArrayClear(pIter->blockList);
+ }
+}
+
+static void cleanupDataBlockIterator(SDataBlockIter* pIter) {
+ taosArrayDestroy(pIter->blockList);
}
static void initReaderStatus(SReaderStatus* pStatus) {
@@ -2515,6 +2549,7 @@ void doMergeMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter, SArray* pDe
tRowMergerInit(&merge, pRow, pReader->pSchema);
doMergeRowsInBuf(pIter, k.ts, pDelList, &merge, pReader);
tRowMergerGetRow(&merge, pTSRow);
+ tRowMergerClear(&merge);
}
void doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader,
@@ -2651,6 +2686,7 @@ int32_t buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, int64_t e
}
doAppendOneRow(pBlock, pReader, pTSRow);
+ taosMemoryFree(pTSRow);
// no data in buffer, return immediately
if (!(pBlockScanInfo->iter.hasVal || pBlockScanInfo->iiter.hasVal)) {
@@ -2778,11 +2814,24 @@ void tsdbReaderClose(STsdbReader* pReader) {
return;
}
+ SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo;
+
+ taosMemoryFreeClear(pSupInfo->plist);
+ taosMemoryFree(pSupInfo->colIds);
+
+ taosArrayDestroy(pSupInfo->pColAgg);
+ for(int32_t i = 0; i < blockDataGetNumOfCols(pReader->pResBlock); ++i) {
+ if (pSupInfo->buildBuf[i] != NULL) {
+ taosMemoryFreeClear(pSupInfo->buildBuf[i]);
+ }
+ }
+ taosMemoryFree(pSupInfo->buildBuf);
+
+ cleanupFilesetIterator(&pReader->status.fileIter);
+ cleanupDataBlockIterator(&pReader->status.blockIter);
+ destroyBlockScanInfo(pReader->status.pTableMap);
blockDataDestroy(pReader->pResBlock);
- taosMemoryFreeClear(pReader->suppInfo.plist);
- taosArrayDestroy(pReader->suppInfo.pColAgg);
- taosMemoryFree(pReader->suppInfo.slotIds);
#if 0
// if (pReader->status.pTableScanInfo != NULL) {
diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c
index fc35fba93515482455c7e20ca95b6fd062d7b5c0..5a1a46a9b8312364ab937ba90ca1155d72a5beeb 100644
--- a/source/libs/executor/src/executor.c
+++ b/source/libs/executor/src/executor.c
@@ -184,7 +184,7 @@ int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, const SArray* tableIdList, bo
return code;
}
-int32_t qGetQueriedTableSchemaVersion(qTaskInfo_t tinfo, char* dbName, char* tableName, int32_t* sversion,
+int32_t qGetQueryTableSchemaVersion(qTaskInfo_t tinfo, char* dbName, char* tableName, int32_t* sversion,
int32_t* tversion) {
ASSERT(tinfo != NULL && dbName != NULL && tableName != NULL);
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c
index b209684388d83c4a128eca2fb6891277d8aa0e8a..8171c7e42ee1522da6e64fb2f21382f8ee0f20ec 100644
--- a/source/libs/executor/src/executorimpl.c
+++ b/source/libs/executor/src/executorimpl.c
@@ -13,6 +13,7 @@
* along with this program. If not, see .
*/
+#include
#include "filter.h"
#include "function.h"
#include "functionMgt.h"
@@ -4052,7 +4053,7 @@ static STsdbReader* doCreateDataReader(STableScanPhysiNode* pTableScanNode, SRea
static SArray* extractColumnInfo(SNodeList* pNodeList);
-int32_t extractTableSchemaVersion(SReadHandle* pHandle, uint64_t uid, SExecTaskInfo* pTaskInfo) {
+int32_t extractTableSchemaInfo(SReadHandle* pHandle, uint64_t uid, SExecTaskInfo* pTaskInfo) {
SMetaReader mr = {0};
metaReaderInit(&mr, pHandle->meta, 0);
int32_t code = metaGetTableEntryByUid(&mr, uid);
@@ -4076,10 +4077,20 @@ int32_t extractTableSchemaVersion(SReadHandle* pHandle, uint64_t uid, SExecTaskI
}
metaReaderClear(&mr);
-
return TSDB_CODE_SUCCESS;
}
+static void cleanupTableSchemaInfo(SExecTaskInfo* pTaskInfo) {
+ taosMemoryFreeClear(pTaskInfo->schemaVer.dbname);
+ if (pTaskInfo->schemaVer.sw == NULL) {
+ return;
+ }
+
+ taosMemoryFree(pTaskInfo->schemaVer.sw->pSchema);
+ taosMemoryFree(pTaskInfo->schemaVer.sw);
+ taosMemoryFree(pTaskInfo->schemaVer.tablename);
+}
+
static int32_t sortTableGroup(STableListInfo* pTableListInfo, int32_t groupNum) {
taosArrayClear(pTableListInfo->pGroupList);
SArray* sortSupport = taosArrayInit(groupNum, sizeof(uint64_t));
@@ -4253,7 +4264,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
return NULL;
}
- code = extractTableSchemaVersion(pHandle, pTableScanNode->scan.uid, pTaskInfo);
+ code = extractTableSchemaInfo(pHandle, pTableScanNode->scan.uid, pTaskInfo);
if (code) {
pTaskInfo->code = terrno;
return NULL;
@@ -4271,7 +4282,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
pTaskInfo->code = code;
return NULL;
}
- code = extractTableSchemaVersion(pHandle, pTableScanNode->scan.uid, pTaskInfo);
+ code = extractTableSchemaInfo(pHandle, pTableScanNode->scan.uid, pTaskInfo);
if (code) {
pTaskInfo->code = terrno;
return NULL;
@@ -4368,7 +4379,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
// return NULL;
// }
- int32_t code = extractTableSchemaVersion(pHandle, pScanNode->uid, pTaskInfo);
+ int32_t code = extractTableSchemaInfo(pHandle, pScanNode->uid, pTaskInfo);
if (code != TSDB_CODE_SUCCESS) {
pTaskInfo->code = code;
return NULL;
@@ -4867,11 +4878,8 @@ void doDestroyTask(SExecTaskInfo* pTaskInfo) {
doDestroyTableList(&pTaskInfo->tableqinfoList);
destroyOperatorInfo(pTaskInfo->pRoot);
- // taosArrayDestroy(pTaskInfo->summary.queryProfEvents);
- // taosHashCleanup(pTaskInfo->summary.operatorProfResults);
+ cleanupTableSchemaInfo(pTaskInfo);
- taosMemoryFree(pTaskInfo->schemaVer.dbname);
- taosMemoryFree(pTaskInfo->schemaVer.tablename);
taosMemoryFreeClear(pTaskInfo->sql);
taosMemoryFreeClear(pTaskInfo->id.str);
taosMemoryFreeClear(pTaskInfo);
diff --git a/source/libs/qworker/src/qwUtil.c b/source/libs/qworker/src/qwUtil.c
index 5aaf2b80381986ff1120bf527e81d5c537eab834..9c49cbcb1fd2f9cd735a74e6782d3ace298661ee 100644
--- a/source/libs/qworker/src/qwUtil.c
+++ b/source/libs/qworker/src/qwUtil.c
@@ -436,7 +436,7 @@ void qwSaveTbVersionInfo(qTaskInfo_t pTaskInfo, SQWTaskCtx *ctx) {
char dbFName[TSDB_DB_FNAME_LEN] = {0};
char tbName[TSDB_TABLE_NAME_LEN] = {0};
- qGetQueriedTableSchemaVersion(pTaskInfo, dbFName, tbName, &ctx->tbInfo.sversion, &ctx->tbInfo.tversion);
+ qGetQueryTableSchemaVersion(pTaskInfo, dbFName, tbName, &ctx->tbInfo.sversion, &ctx->tbInfo.tversion);
if (dbFName[0] && tbName[0]) {
sprintf(ctx->tbInfo.tbFName, "%s.%s", dbFName, tbName);