提交 7dc7f478 编写于 作者: H Haojun Liao

refactor: do some internal refactor and fix some memory leak.

上级 5819b616
......@@ -109,7 +109,7 @@ int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId,
* @param tversion
* @return
*/
int32_t qGetQueriedTableSchemaVersion(qTaskInfo_t tinfo, char* dbName, char* tableName, int32_t* sversion,
int32_t qGetQueryTableSchemaVersion(qTaskInfo_t tinfo, char* dbName, char* tableName, int32_t* sversion,
int32_t* tversion);
/**
......
......@@ -222,8 +222,8 @@ typedef struct SRequestObj {
SArray* tableList;
SQueryExecMetric metric;
SRequestSendRecvBody body;
bool stableQuery;
bool validateOnly;
bool stableQuery; // todo refactor
bool validateOnly; // todo refactor
bool killed;
uint32_t prevCode; // previous error code: todo refactor, add update flag for catalog
......@@ -247,9 +247,9 @@ void doFreeReqResultInfo(SReqResultInfo* pResInfo);
int32_t transferTableNameList(const char* tbList, int32_t acctId, char* dbName, SArray** pReq);
void syncCatalogFn(SMetaData* pResult, void* param, int32_t code);
SRequestObj* execQuery(STscObj* pTscObj, const char* sql, int sqlLen, bool validateOnly);
SRequestObj* execQuery(uint64_t connId, const char* sql, int sqlLen, bool validateOnly);
TAOS_RES* taosQueryImpl(TAOS* taos, const char* sql, bool validateOnly);
void taosAsyncQueryImpl(TAOS* taos, const char* sql, __taos_async_fn_t fp, void* param, bool validateOnly);
void taosAsyncQueryImpl(uint64_t connId, const char* sql, __taos_async_fn_t fp, void* param, bool validateOnly);
static FORCE_INLINE SReqResultInfo* tmqGetCurResInfo(TAOS_RES* res) {
SMqRspObj* msg = (SMqRspObj*)res;
......@@ -297,7 +297,7 @@ int32_t releaseTscObj(int64_t rid);
uint64_t generateRequestId();
void* createRequest(STscObj* pObj, int32_t type);
void* createRequest(uint64_t connId, int32_t type);
void destroyRequest(SRequestObj* pRequest);
SRequestObj* acquireRequest(int64_t rid);
int32_t releaseRequest(int64_t rid);
......@@ -318,13 +318,13 @@ void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet);
STscObj* taos_connect_internal(const char* ip, const char* user, const char* pass, const char* auth, const char* db,
uint16_t port, int connType);
SRequestObj* launchQuery(STscObj* pTscObj, const char* sql, int sqlLen, bool validateOnly);
SRequestObj* launchQuery(uint64_t connId, const char* sql, int sqlLen, bool validateOnly);
int32_t parseSql(SRequestObj* pRequest, bool topicQuery, SQuery** pQuery, SStmtCallback* pStmtCb);
int32_t getPlan(SRequestObj* pRequest, SQuery* pQuery, SQueryPlan** pPlan, SArray* pNodeList);
int32_t buildRequest(STscObj* pTscObj, const char* sql, int sqlLen, SRequestObj** pRequest);
int32_t buildRequest(uint64_t connId, const char* sql, int sqlLen, void* param, bool validateSql, SRequestObj** pRequest);
void taos_close_internal(void* taos);
......
......@@ -49,13 +49,8 @@ void cleanupTscQhandle() {
// destroy handle
taosCleanUpScheduler(tscQhandle);
}
static int32_t registerRequest(SRequestObj *pRequest) {
STscObj *pTscObj = acquireTscObj(pRequest->pTscObj->id);
if (NULL == pTscObj) {
terrno = TSDB_CODE_TSC_DISCONNECTED;
return terrno;
}
static int32_t registerRequest(SRequestObj *pRequest, STscObj* pTscObj) {
// connection has been released already, abort creating request.
pRequest->self = taosAddRef(clientReqRefPool, pRequest);
......@@ -246,29 +241,34 @@ STscObj *acquireTscObj(int64_t rid) { return (STscObj *)taosAcquireRef(clientCon
int32_t releaseTscObj(int64_t rid) { return taosReleaseRef(clientConnRefPool, rid); }
void *createRequest(STscObj *pObj, int32_t type) {
assert(pObj != NULL);
void *createRequest(uint64_t connId, int32_t type) {
SRequestObj *pRequest = (SRequestObj *)taosMemoryCalloc(1, sizeof(SRequestObj));
if (NULL == pRequest) {
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
return NULL;
}
STscObj* pTscObj = acquireTscObj(connId);
if (pTscObj == NULL) {
terrno = TSDB_CODE_TSC_DISCONNECTED;
return NULL;
}
pRequest->resType = RES_TYPE__QUERY;
pRequest->pDb = getDbOfConnection(pObj);
pRequest->requestId = generateRequestId();
pRequest->metric.start = taosGetTimestampUs();
pRequest->body.resInfo.convertUcs4 = true; // convert ucs4 by default
pRequest->type = type;
pRequest->pTscObj = pObj;
pRequest->pDb = getDbOfConnection(pTscObj);
pRequest->pTscObj = pTscObj;
pRequest->msgBuf = taosMemoryCalloc(1, ERROR_MSG_BUF_DEFAULT_SIZE);
pRequest->msgBufLen = ERROR_MSG_BUF_DEFAULT_SIZE;
tsem_init(&pRequest->body.rspSem, 0, 0);
if (registerRequest(pRequest)) {
if (registerRequest(pRequest, pTscObj)) {
doDestroyRequest(pRequest);
return NULL;
}
......@@ -327,8 +327,8 @@ void doDestroyRequest(void *p) {
if (pRequest->self) {
deregisterRequest(pRequest);
}
taosMemoryFree(pRequest);
taosMemoryFree(pRequest);
tscTrace("end to destroy request %" PRIx64 " p:%p", reqId, pRequest);
}
......@@ -338,7 +338,6 @@ void destroyRequest(SRequestObj *pRequest) {
}
taos_stop_query(pRequest);
removeRequest(pRequest->self);
}
......
......@@ -148,29 +148,49 @@ STscObj* taos_connect_internal(const char* ip, const char* user, const char* pas
return taosConnectImpl(user, &secretEncrypt[0], localDb, NULL, NULL, *pInst, connType);
}
int32_t buildRequest(STscObj* pTscObj, const char* sql, int sqlLen, SRequestObj** pRequest) {
*pRequest = createRequest(pTscObj, TSDB_SQL_SELECT);
int32_t buildRequest(uint64_t connId, const char* sql, int sqlLen, void* param, bool validateSql, SRequestObj** pRequest) {
*pRequest = createRequest(connId, TSDB_SQL_SELECT);
if (*pRequest == NULL) {
tscError("failed to malloc sqlObj");
tscError("failed to malloc sqlObj, %s", sql);
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
(*pRequest)->sqlstr = taosMemoryMalloc(sqlLen + 1);
if ((*pRequest)->sqlstr == NULL) {
tscError("0x%" PRIx64 " failed to prepare sql string buffer", (*pRequest)->self);
(*pRequest)->msgBuf = strdup("failed to prepare sql string buffer");
tscError("0x%" PRIx64 " failed to prepare sql string buffer, %s", (*pRequest)->self, sql);
destroyRequest(*pRequest);
*pRequest = NULL;
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
strntolower((*pRequest)->sqlstr, sql, (int32_t)sqlLen);
(*pRequest)->sqlstr[sqlLen] = 0;
(*pRequest)->sqlLen = sqlLen;
(*pRequest)->sqlLen = sqlLen;
(*pRequest)->validateOnly = validateSql;
if (param == NULL) {
SSyncQueryParam* pParam = taosMemoryCalloc(1, sizeof(SSyncQueryParam));
if (pParam == NULL) {
destroyRequest(*pRequest);
*pRequest = NULL;
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
tsem_init(&pParam->sem, 0, 0);
pParam->pRequest = (*pRequest);
param = pParam;
}
(*pRequest)->body.param = param;
STscObj* pTscObj = (*pRequest)->pTscObj;
if (taosHashPut(pTscObj->pRequests, &(*pRequest)->self, sizeof((*pRequest)->self), &(*pRequest)->self,
sizeof((*pRequest)->self))) {
tscError("%d failed to add to request container, reqId:0x%" PRIx64 ", conn:%d, %s", (*pRequest)->self,
(*pRequest)->requestId, pTscObj->id, sql);
destroyRequest(*pRequest);
*pRequest = NULL;
tscError("put request to request hash failed");
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
......@@ -882,18 +902,16 @@ SRequestObj* launchQueryImpl(SRequestObj* pRequest, SQuery* pQuery, bool keepQue
return pRequest;
}
SRequestObj* launchQuery(STscObj* pTscObj, const char* sql, int sqlLen, bool validateOnly) {
SRequestObj* launchQuery(uint64_t connId, const char* sql, int sqlLen, bool validateOnly) {
SRequestObj* pRequest = NULL;
SQuery* pQuery = NULL;
int32_t code = buildRequest(pTscObj, sql, sqlLen, &pRequest);
int32_t code = buildRequest(connId, sql, sqlLen, NULL, validateOnly, &pRequest);
if (code != TSDB_CODE_SUCCESS) {
terrno = code;
return NULL;
}
pRequest->validateOnly = validateOnly;
code = parseSql(pRequest, false, &pQuery, NULL);
if (code != TSDB_CODE_SUCCESS) {
pRequest->code = code;
......@@ -1044,19 +1062,20 @@ int32_t removeMeta(STscObj* pTscObj, SArray* tbList) {
return TSDB_CODE_SUCCESS;
}
SRequestObj* execQuery(STscObj* pTscObj, const char* sql, int sqlLen, bool validateOnly) {
// todo remove it soon
SRequestObj* execQuery(uint64_t connId, const char* sql, int sqlLen, bool validateOnly) {
SRequestObj* pRequest = NULL;
int32_t retryNum = 0;
int32_t code = 0;
do {
destroyRequest(pRequest);
pRequest = launchQuery(pTscObj, sql, sqlLen, validateOnly);
pRequest = launchQuery(connId, sql, sqlLen, validateOnly);
if (pRequest == NULL || TSDB_CODE_SUCCESS == pRequest->code || !NEED_CLIENT_HANDLE_ERROR(pRequest->code)) {
break;
}
code = refreshMeta(pTscObj, pRequest);
code = refreshMeta(pRequest->pTscObj, pRequest);
if (code) {
pRequest->code = code;
break;
......@@ -1064,7 +1083,7 @@ SRequestObj* execQuery(STscObj* pTscObj, const char* sql, int sqlLen, bool valid
} while (retryNum++ < REQUEST_TOTAL_EXEC_TIMES);
if (NEED_CLIENT_RM_TBLMETA_REQ(pRequest->type)) {
removeMeta(pTscObj, pRequest->tableList);
removeMeta(pRequest->pTscObj, pRequest->tableList);
}
return pRequest;
......@@ -1119,7 +1138,7 @@ STscObj* taosConnectImpl(const char* user, const char* auth, const char* db, __t
return pTscObj;
}
SRequestObj* pRequest = createRequest(pTscObj, TDMT_MND_CONNECT);
SRequestObj* pRequest = createRequest(pTscObj->id, TDMT_MND_CONNECT);
if (pRequest == NULL) {
destroyTscObj(pTscObj);
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
......@@ -1446,15 +1465,10 @@ void* doAsyncFetchRows(SRequestObj* pRequest, bool setupOneRowPtr, bool convertU
return NULL;
}
SSyncQueryParam* pParam = pRequest->body.param;
if (NULL == pParam) {
pParam = taosMemoryCalloc(1, sizeof(SSyncQueryParam));
tsem_init(&pParam->sem, 0, 0);
}
// convert ucs4 to native multi-bytes string
pResultInfo->convertUcs4 = convertUcs4;
SSyncQueryParam* pParam = pRequest->body.param;
taos_fetch_rows_a(pRequest, syncFetchFn, pParam);
tsem_wait(&pParam->sem);
}
......@@ -2026,22 +2040,9 @@ void syncQueryFn(void* param, void* res, int32_t code) {
tsem_post(&pParam->sem);
}
void taosAsyncQueryImpl(TAOS* taos, const char* sql, __taos_async_fn_t fp, void* param, bool validateOnly) {
if (NULL == taos) {
terrno = TSDB_CODE_TSC_DISCONNECTED;
fp(param, NULL, terrno);
return;
}
int64_t rid = *(int64_t*)taos;
STscObj* pTscObj = acquireTscObj(rid);
if (pTscObj == NULL || sql == NULL || NULL == fp) {
void taosAsyncQueryImpl(uint64_t connId, const char* sql, __taos_async_fn_t fp, void* param, bool validateOnly) {
if (sql == NULL || NULL == fp) {
terrno = TSDB_CODE_INVALID_PARA;
if (pTscObj) {
releaseTscObj(rid);
} else {
terrno = TSDB_CODE_TSC_DISCONNECTED;
}
fp(param, NULL, terrno);
return;
}
......@@ -2050,26 +2051,20 @@ void taosAsyncQueryImpl(TAOS* taos, const char* sql, __taos_async_fn_t fp, void*
if (sqlLen > (size_t)TSDB_MAX_ALLOWED_SQL_LEN) {
tscError("sql string exceeds max length:%d", TSDB_MAX_ALLOWED_SQL_LEN);
terrno = TSDB_CODE_TSC_EXCEED_SQL_LIMIT;
releaseTscObj(rid);
fp(param, NULL, terrno);
return;
}
SRequestObj* pRequest = NULL;
int32_t code = buildRequest(pTscObj, sql, sqlLen, &pRequest);
int32_t code = buildRequest(connId, sql, sqlLen, param, validateOnly, &pRequest);
if (code != TSDB_CODE_SUCCESS) {
terrno = code;
releaseTscObj(rid);
fp(param, NULL, terrno);
return;
}
pRequest->validateOnly = validateOnly;
pRequest->body.queryFp = fp;
pRequest->body.param = param;
doAsyncQuery(pRequest, false);
releaseTscObj(rid);
}
TAOS_RES* taosQueryImpl(TAOS* taos, const char* sql, bool validateOnly) {
......@@ -2078,36 +2073,22 @@ TAOS_RES* taosQueryImpl(TAOS* taos, const char* sql, bool validateOnly) {
return NULL;
}
int64_t rid = *(int64_t*)taos;
STscObj* pTscObj = acquireTscObj(rid);
if (pTscObj == NULL || sql == NULL) {
terrno = TSDB_CODE_TSC_DISCONNECTED;
return NULL;
}
#if SYNC_ON_TOP_OF_ASYNC
SSyncQueryParam* param = taosMemoryCalloc(1, sizeof(SSyncQueryParam));
tsem_init(&param->sem, 0, 0);
taosAsyncQueryImpl((TAOS*)&rid, sql, syncQueryFn, param, validateOnly);
taosAsyncQueryImpl(*(int64_t*)taos, sql, syncQueryFn, param, validateOnly);
tsem_wait(&param->sem);
releaseTscObj(rid);
return param->pRequest;
#else
size_t sqlLen = strlen(sql);
if (sqlLen > (size_t)TSDB_MAX_ALLOWED_SQL_LEN) {
releaseTscObj(rid);
tscError("sql string exceeds max length:%d", TSDB_MAX_ALLOWED_SQL_LEN);
terrno = TSDB_CODE_TSC_EXCEED_SQL_LIMIT;
return NULL;
}
TAOS_RES* pRes = execQuery(pTscObj, sql, sqlLen, validateOnly);
releaseTscObj(rid);
TAOS_RES* pRes = execQuery(connId, sql, sqlLen, validateOnly);
return pRes;
#endif
}
......@@ -479,7 +479,6 @@ void taos_stop_query(TAOS_RES *res) {
}
schedulerFreeJob(&pRequest->body.queryJob, TSDB_CODE_TSC_QUERY_KILLED);
tscDebug("request %" PRIx64 " killed", pRequest->requestId);
}
......@@ -706,7 +705,8 @@ void retrieveMetaCallback(SMetaData *pResultMeta, void *param, int32_t code) {
}
void taos_query_a(TAOS *taos, const char *sql, __taos_async_fn_t fp, void *param) {
taosAsyncQueryImpl(taos, sql, fp, param, false);
int64_t connId = *(int64_t*)taos;
taosAsyncQueryImpl(connId, sql, fp, param, false);
}
int32_t createParseContext(const SRequestObj *pRequest, SParseContext **pCxt) {
......@@ -915,7 +915,7 @@ int taos_load_table_info(TAOS *taos, const char *tableNameList) {
return terrno;
}
int64_t rid = *(int64_t *)taos;
int64_t connId = *(int64_t *)taos;
const int32_t MAX_TABLE_NAME_LENGTH = 12 * 1024 * 1024; // 12MB list
int32_t code = 0;
SRequestObj * pRequest = NULL;
......@@ -933,12 +933,14 @@ int taos_load_table_info(TAOS *taos, const char *tableNameList) {
return TSDB_CODE_TSC_INVALID_OPERATION;
}
STscObj *pTscObj = acquireTscObj(rid);
if (pTscObj == NULL) {
terrno = TSDB_CODE_TSC_DISCONNECTED;
return terrno;
char *sql = "taos_load_table_info";
code = buildRequest(connId, sql, strlen(sql), NULL, false, &pRequest);
if (code != TSDB_CODE_SUCCESS) {
terrno = code;
goto _return;
}
STscObj* pTscObj = pRequest->pTscObj;
code = transferTableNameList(tableNameList, pTscObj->acctId, pTscObj->db, &catalogReq.pTableMeta);
if (code) {
goto _return;
......@@ -950,36 +952,22 @@ int taos_load_table_info(TAOS *taos, const char *tableNameList) {
goto _return;
}
char *sql = "taos_load_table_info";
code = buildRequest(pTscObj, sql, strlen(sql), &pRequest);
if (code != TSDB_CODE_SUCCESS) {
terrno = code;
goto _return;
}
SSyncQueryParam param = {0};
tsem_init(&param.sem, 0, 0);
param.pRequest = pRequest;
SRequestConnInfo conn = {
.pTrans = pTscObj->pAppInfo->pTransporter, .requestId = pRequest->requestId, .requestObjRefId = pRequest->self};
conn.mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp);
code = catalogAsyncGetAllMeta(pCtg, &conn, &catalogReq, syncCatalogFn, &param, NULL);
code = catalogAsyncGetAllMeta(pCtg, &conn, &catalogReq, syncCatalogFn, NULL, NULL);
if (code) {
goto _return;
}
tsem_wait(&param.sem);
SSyncQueryParam* pParam = pRequest->body.param;
tsem_wait(&pParam->sem);
_return:
taosArrayDestroy(catalogReq.pTableMeta);
destroyRequest(pRequest);
releaseTscObj(rid);
return code;
}
......
......@@ -2442,22 +2442,15 @@ TAOS_RES* taos_schemaless_insert(TAOS* taos, char* lines[], int numLines, int pr
return NULL;
}
int64_t rid = *(int64_t*)taos;
STscObj* pTscObj = acquireTscObj(rid);
if (NULL == pTscObj) {
terrno = TSDB_CODE_TSC_DISCONNECTED;
uError("SML:taos_schemaless_insert invalid taos");
return NULL;
}
SRequestObj* request = (SRequestObj*)createRequest(pTscObj, TSDB_SQL_INSERT);
SRequestObj* request = (SRequestObj*)createRequest(*(int64_t*)taos, TSDB_SQL_INSERT);
if(!request){
releaseTscObj(rid);
uError("SML:taos_schemaless_insert error request is null");
return NULL;
}
int batchs = 0;
STscObj* pTscObj = request->pTscObj;
pTscObj->schemalessType = 1;
SSmlMsgBuf msg = {ERROR_MSG_BUF_DEFAULT_SIZE, request->msgBuf};
......@@ -2507,7 +2500,7 @@ TAOS_RES* taos_schemaless_insert(TAOS* taos, char* lines[], int numLines, int pr
batchs = ceil(((double)numLines) / LINE_BATCH);
for (int i = 0; i < batchs; ++i) {
SRequestObj* req = (SRequestObj*)createRequest(pTscObj, TSDB_SQL_INSERT);
SRequestObj* req = (SRequestObj*)createRequest(pTscObj->id, TSDB_SQL_INSERT);
if(!req){
request->code = TSDB_CODE_OUT_OF_MEMORY;
uError("SML:taos_schemaless_insert error request is null");
......@@ -2549,6 +2542,5 @@ end:
// ((STscObj *)taos)->schemalessType = 0;
pTscObj->schemalessType = 1;
uDebug("resultend:%s", request->msgBuf);
releaseTscObj(rid);
return (TAOS_RES*)request;
}
......@@ -5,6 +5,14 @@
#include "clientStmt.h"
static int32_t stmtCreateRequest(STscStmt* pStmt) {
if (pStmt->exec.pRequest == NULL) {
return buildRequest(pStmt->taos->id, pStmt->sql.sqlStr, pStmt->sql.sqlLen, NULL, false, &pStmt->exec.pRequest);
} else {
return TSDB_CODE_SUCCESS;
}
}
int32_t stmtSwitchStatus(STscStmt* pStmt, STMT_STATUS newStatus) {
int32_t code = 0;
......@@ -217,9 +225,7 @@ int32_t stmtParseSql(STscStmt* pStmt) {
.getExecInfoFn = stmtGetExecInfo,
};
if (NULL == pStmt->exec.pRequest) {
STMT_ERR_RET(buildRequest(pStmt->taos, pStmt->sql.sqlStr, pStmt->sql.sqlLen, &pStmt->exec.pRequest));
}
STMT_ERR_RET(stmtCreateRequest(pStmt));
STMT_ERR_RET(parseSql(pStmt->exec.pRequest, false, &pStmt->sql.pQuery, &stmtCb));
......@@ -532,9 +538,7 @@ int stmtSetTbName(TAOS_STMT* stmt, const char* tbName) {
STMT_ERR_RET(TSDB_CODE_TSC_STMT_API_ERROR);
}
if (NULL == pStmt->exec.pRequest) {
STMT_ERR_RET(buildRequest(pStmt->taos, pStmt->sql.sqlStr, pStmt->sql.sqlLen, &pStmt->exec.pRequest));
}
STMT_ERR_RET(stmtCreateRequest(pStmt));
STMT_ERR_RET(qCreateSName(&pStmt->bInfo.sname, tbName, pStmt->taos->acctId, pStmt->exec.pRequest->pDb,
pStmt->exec.pRequest->msgBuf, pStmt->exec.pRequest->msgBufLen));
......@@ -625,9 +629,7 @@ int stmtBindBatch(TAOS_STMT* stmt, TAOS_MULTI_BIND* bind, int32_t colIdx) {
pStmt->exec.pRequest = NULL;
}
if (NULL == pStmt->exec.pRequest) {
STMT_ERR_RET(buildRequest(pStmt->taos, pStmt->sql.sqlStr, pStmt->sql.sqlLen, &pStmt->exec.pRequest));
}
STMT_ERR_RET(stmtCreateRequest(pStmt));
if (pStmt->bInfo.needParse) {
STMT_ERR_RET(stmtParseSql(pStmt));
......@@ -873,9 +875,7 @@ int stmtGetTagFields(TAOS_STMT* stmt, int* nums, TAOS_FIELD_E** fields) {
pStmt->exec.pRequest = NULL;
}
if (NULL == pStmt->exec.pRequest) {
STMT_ERR_RET(buildRequest(pStmt->taos, pStmt->sql.sqlStr, pStmt->sql.sqlLen, &pStmt->exec.pRequest));
}
STMT_ERR_RET(stmtCreateRequest(pStmt));
if (pStmt->bInfo.needParse) {
STMT_ERR_RET(stmtParseSql(pStmt));
......@@ -905,9 +905,7 @@ int stmtGetColFields(TAOS_STMT* stmt, int* nums, TAOS_FIELD_E** fields) {
pStmt->exec.pRequest = NULL;
}
if (NULL == pStmt->exec.pRequest) {
STMT_ERR_RET(buildRequest(pStmt->taos, pStmt->sql.sqlStr, pStmt->sql.sqlLen, &pStmt->exec.pRequest));
}
STMT_ERR_RET(stmtCreateRequest(pStmt));
if (pStmt->bInfo.needParse) {
STMT_ERR_RET(stmtParseSql(pStmt));
......@@ -933,10 +931,7 @@ int stmtGetParamNum(TAOS_STMT* stmt, int* nums) {
pStmt->exec.pRequest = NULL;
}
if (NULL == pStmt->exec.pRequest) {
STMT_ERR_RET(buildRequest(pStmt->taos, pStmt->sql.sqlStr, pStmt->sql.sqlLen, &pStmt->exec.pRequest));
}
STMT_ERR_RET(stmtCreateRequest(pStmt));
if (pStmt->bInfo.needParse) {
STMT_ERR_RET(stmtParseSql(pStmt));
}
......@@ -969,9 +964,7 @@ int stmtGetParam(TAOS_STMT* stmt, int idx, int* type, int* bytes) {
pStmt->exec.pRequest = NULL;
}
if (NULL == pStmt->exec.pRequest) {
STMT_ERR_RET(buildRequest(pStmt->taos, pStmt->sql.sqlStr, pStmt->sql.sqlLen, &pStmt->exec.pRequest));
}
STMT_ERR_RET(stmtCreateRequest(pStmt));
if (pStmt->bInfo.needParse) {
STMT_ERR_RET(stmtParseSql(pStmt));
......
......@@ -2267,13 +2267,7 @@ static int32_t taosCreateStb(TAOS *taos, void *meta, int32_t metaLen){
int32_t code = TSDB_CODE_SUCCESS;
SRequestObj* pRequest = NULL;
STscObj *pTscObj = acquireTscObj(*(int64_t *)taos);
if (NULL == pTscObj) {
code = TSDB_CODE_TSC_DISCONNECTED;
goto end;
}
code = buildRequest(pTscObj, "", 0, &pRequest);
code = buildRequest(*(int64_t*)taos, "", 0, NULL, false, &pRequest);
if (code != TSDB_CODE_SUCCESS) {
goto end;
}
......@@ -2313,6 +2307,8 @@ static int32_t taosCreateStb(TAOS *taos, void *meta, int32_t metaLen){
pReq.suid = req.suid;
pReq.source = 1;
STscObj* pTscObj = pRequest->pTscObj;
SName tableName;
tNameExtractFullName(toName(pTscObj->acctId, pRequest->pDb, req.name, &tableName), pReq.name);
......@@ -2351,13 +2347,7 @@ static int32_t taosDropStb(TAOS *taos, void *meta, int32_t metaLen){
int32_t code = TSDB_CODE_SUCCESS;
SRequestObj* pRequest = NULL;
STscObj *pTscObj = acquireTscObj(*(int64_t *)taos);
if (NULL == pTscObj) {
code = TSDB_CODE_TSC_DISCONNECTED;
goto end;
}
code = buildRequest(pTscObj, "", 0, &pRequest);
code = buildRequest(*(int64_t*)taos, "", 0, NULL, false, &pRequest);
if (code != TSDB_CODE_SUCCESS) {
goto end;
}
......@@ -2379,6 +2369,9 @@ static int32_t taosDropStb(TAOS *taos, void *meta, int32_t metaLen){
pReq.igNotExists = true;
pReq.source = 1;
pReq.suid = req.suid;
STscObj* pTscObj = pRequest->pTscObj;
SName tableName;
tNameExtractFullName(toName(pTscObj->acctId, pRequest->pDb, req.name, &tableName), pReq.name);
......@@ -2427,14 +2420,8 @@ static int32_t taosCreateTable(TAOS *taos, void *meta, int32_t metaLen){
SRequestObj *pRequest = NULL;
SQuery *pQuery = NULL;
SHashObj *pVgroupHashmap = NULL;
STscObj *pTscObj = acquireTscObj(*(int64_t *)taos);
if (NULL == pTscObj) {
code = TSDB_CODE_TSC_DISCONNECTED;
goto end;
}
code = buildRequest(pTscObj, "", 0, &pRequest);
code = buildRequest(*(int64_t*) taos, "", 0, NULL, false, &pRequest);
if (code != TSDB_CODE_SUCCESS) {
goto end;
}
......@@ -2452,7 +2439,9 @@ static int32_t taosCreateTable(TAOS *taos, void *meta, int32_t metaLen){
goto end;
}
SVCreateTbReq *pCreateReq = NULL;
STscObj* pTscObj = pRequest->pTscObj;
SVCreateTbReq *pCreateReq = NULL;
SCatalog* pCatalog = NULL;
code = catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog);
if (code != TSDB_CODE_SUCCESS) {
......@@ -2544,14 +2533,8 @@ static int32_t taosDropTable(TAOS *taos, void *meta, int32_t metaLen){
SRequestObj *pRequest = NULL;
SQuery *pQuery = NULL;
SHashObj *pVgroupHashmap = NULL;
STscObj *pTscObj = acquireTscObj(*(int64_t *)taos);
if (NULL == pTscObj) {
code = TSDB_CODE_TSC_DISCONNECTED;
goto end;
}
code = buildRequest(pTscObj, "", 0, &pRequest);
code = buildRequest(*(int64_t*)taos, "", 0, NULL, false, &pRequest);
if (code != TSDB_CODE_SUCCESS) {
goto end;
}
......@@ -2569,6 +2552,8 @@ static int32_t taosDropTable(TAOS *taos, void *meta, int32_t metaLen){
goto end;
}
STscObj* pTscObj = pRequest->pTscObj;
SVDropTbReq *pDropReq = NULL;
SCatalog *pCatalog = NULL;
code = catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog);
......@@ -2649,14 +2634,9 @@ static int32_t taosAlterTable(TAOS *taos, void *meta, int32_t metaLen){
SQuery *pQuery = NULL;
SArray *pArray = NULL;
SVgDataBlocks *pVgData = NULL;
STscObj *pTscObj = acquireTscObj(*(int64_t *)taos);
if (NULL == pTscObj) {
code = TSDB_CODE_TSC_DISCONNECTED;
goto end;
}
code = buildRequest(pTscObj, "", 0, &pRequest);
code = buildRequest(*(int64_t*) taos, "", 0, NULL, false, &pRequest);
if (code != TSDB_CODE_SUCCESS) {
goto end;
}
......@@ -2679,7 +2659,8 @@ static int32_t taosAlterTable(TAOS *taos, void *meta, int32_t metaLen){
goto end;
}
SCatalog *pCatalog = NULL;
STscObj* pTscObj = pRequest->pTscObj;
SCatalog* pCatalog = NULL;
code = catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog);
if (code != TSDB_CODE_SUCCESS) {
goto end;
......
......@@ -59,7 +59,6 @@ typedef struct SBlockLoadSuppInfo {
SColumnDataAgg tsColAgg;
SColumnDataAgg** plist;
int16_t* colIds; // column ids for loading file block data
int32_t* slotIds; // colId to slotId
char** buildBuf; // build string tmp buffer, todo remove it later after all string format being updated.
} SBlockLoadSuppInfo;
......@@ -183,7 +182,6 @@ static SHashObj* createDataBlockScanInfo(STsdbReader* pTsdbReader, const STableK
return NULL;
}
// todo apply the lastkey of table check to avoid to load header file
for (int32_t j = 0; j < numOfTables; ++j) {
STableBlockScanInfo info = {.lastKey = 0, .uid = idList[j].uid};
if (ASCENDING_TRAVERSE(pTsdbReader->order)) {
......@@ -218,6 +216,30 @@ static void resetDataBlockScanInfo(SHashObj* pTableMap) {
}
}
static void destroyBlockScanInfo(SHashObj* pTableMap) {
STableBlockScanInfo* p = NULL;
while ((p = taosHashIterate(pTableMap, p)) != NULL) {
p->iterInit = false;
p->iiter.hasVal = false;
if (p->iter.iter != NULL) {
tsdbTbDataIterDestroy(p->iter.iter);
p->iter.iter = NULL;
}
if (p->iiter.iter != NULL) {
tsdbTbDataIterDestroy(p->iiter.iter);
p->iiter.iter = NULL;
}
taosArrayDestroy(p->delSkyline);
p->delSkyline = NULL;
}
taosHashCleanup(pTableMap);
}
static bool isEmptyQueryTimeWindow(STimeWindow* pWindow) {
ASSERT(pWindow != NULL);
return pWindow->skey > pWindow->ekey;
......@@ -265,6 +287,10 @@ static int32_t initFilesetIterator(SFilesetIter* pIter, const STsdbFSState* pFSt
return TSDB_CODE_SUCCESS;
}
static void cleanupFilesetIterator(SFilesetIter* pIter) {
taosArrayDestroy(pIter->pFileList);
}
static bool filesetIteratorNext(SFilesetIter* pIter, STsdbReader* pReader) {
bool asc = ASCENDING_TRAVERSE(pIter->order);
int32_t step = asc ? 1 : -1;
......@@ -313,7 +339,15 @@ static void resetDataBlockIterator(SDataBlockIter* pIter, int32_t order) {
pIter->order = order;
pIter->index = -1;
pIter->numOfBlocks = -1;
pIter->blockList = taosArrayInit(4, sizeof(SFileDataBlockInfo));
if (pIter->blockList == NULL) {
pIter->blockList = taosArrayInit(4, sizeof(SFileDataBlockInfo));
} else {
taosArrayClear(pIter->blockList);
}
}
static void cleanupDataBlockIterator(SDataBlockIter* pIter) {
taosArrayDestroy(pIter->blockList);
}
static void initReaderStatus(SReaderStatus* pStatus) {
......@@ -2515,6 +2549,7 @@ void doMergeMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter, SArray* pDe
tRowMergerInit(&merge, pRow, pReader->pSchema);
doMergeRowsInBuf(pIter, k.ts, pDelList, &merge, pReader);
tRowMergerGetRow(&merge, pTSRow);
tRowMergerClear(&merge);
}
void doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader,
......@@ -2651,6 +2686,7 @@ int32_t buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, int64_t e
}
doAppendOneRow(pBlock, pReader, pTSRow);
taosMemoryFree(pTSRow);
// no data in buffer, return immediately
if (!(pBlockScanInfo->iter.hasVal || pBlockScanInfo->iiter.hasVal)) {
......@@ -2778,11 +2814,24 @@ void tsdbReaderClose(STsdbReader* pReader) {
return;
}
SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo;
taosMemoryFreeClear(pSupInfo->plist);
taosMemoryFree(pSupInfo->colIds);
taosArrayDestroy(pSupInfo->pColAgg);
for(int32_t i = 0; i < blockDataGetNumOfCols(pReader->pResBlock); ++i) {
if (pSupInfo->buildBuf[i] != NULL) {
taosMemoryFreeClear(pSupInfo->buildBuf[i]);
}
}
taosMemoryFree(pSupInfo->buildBuf);
cleanupFilesetIterator(&pReader->status.fileIter);
cleanupDataBlockIterator(&pReader->status.blockIter);
destroyBlockScanInfo(pReader->status.pTableMap);
blockDataDestroy(pReader->pResBlock);
taosMemoryFreeClear(pReader->suppInfo.plist);
taosArrayDestroy(pReader->suppInfo.pColAgg);
taosMemoryFree(pReader->suppInfo.slotIds);
#if 0
// if (pReader->status.pTableScanInfo != NULL) {
......
......@@ -184,7 +184,7 @@ int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, const SArray* tableIdList, bo
return code;
}
int32_t qGetQueriedTableSchemaVersion(qTaskInfo_t tinfo, char* dbName, char* tableName, int32_t* sversion,
int32_t qGetQueryTableSchemaVersion(qTaskInfo_t tinfo, char* dbName, char* tableName, int32_t* sversion,
int32_t* tversion) {
ASSERT(tinfo != NULL && dbName != NULL && tableName != NULL);
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
......
......@@ -13,6 +13,7 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <executorimpl.h>
#include "filter.h"
#include "function.h"
#include "functionMgt.h"
......@@ -4052,7 +4053,7 @@ static STsdbReader* doCreateDataReader(STableScanPhysiNode* pTableScanNode, SRea
static SArray* extractColumnInfo(SNodeList* pNodeList);
int32_t extractTableSchemaVersion(SReadHandle* pHandle, uint64_t uid, SExecTaskInfo* pTaskInfo) {
int32_t extractTableSchemaInfo(SReadHandle* pHandle, uint64_t uid, SExecTaskInfo* pTaskInfo) {
SMetaReader mr = {0};
metaReaderInit(&mr, pHandle->meta, 0);
int32_t code = metaGetTableEntryByUid(&mr, uid);
......@@ -4076,10 +4077,20 @@ int32_t extractTableSchemaVersion(SReadHandle* pHandle, uint64_t uid, SExecTaskI
}
metaReaderClear(&mr);
return TSDB_CODE_SUCCESS;
}
static void cleanupTableSchemaInfo(SExecTaskInfo* pTaskInfo) {
taosMemoryFreeClear(pTaskInfo->schemaVer.dbname);
if (pTaskInfo->schemaVer.sw == NULL) {
return;
}
taosMemoryFree(pTaskInfo->schemaVer.sw->pSchema);
taosMemoryFree(pTaskInfo->schemaVer.sw);
taosMemoryFree(pTaskInfo->schemaVer.tablename);
}
static int32_t sortTableGroup(STableListInfo* pTableListInfo, int32_t groupNum) {
taosArrayClear(pTableListInfo->pGroupList);
SArray* sortSupport = taosArrayInit(groupNum, sizeof(uint64_t));
......@@ -4253,7 +4264,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
return NULL;
}
code = extractTableSchemaVersion(pHandle, pTableScanNode->scan.uid, pTaskInfo);
code = extractTableSchemaInfo(pHandle, pTableScanNode->scan.uid, pTaskInfo);
if (code) {
pTaskInfo->code = terrno;
return NULL;
......@@ -4271,7 +4282,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
pTaskInfo->code = code;
return NULL;
}
code = extractTableSchemaVersion(pHandle, pTableScanNode->scan.uid, pTaskInfo);
code = extractTableSchemaInfo(pHandle, pTableScanNode->scan.uid, pTaskInfo);
if (code) {
pTaskInfo->code = terrno;
return NULL;
......@@ -4368,7 +4379,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
// return NULL;
// }
int32_t code = extractTableSchemaVersion(pHandle, pScanNode->uid, pTaskInfo);
int32_t code = extractTableSchemaInfo(pHandle, pScanNode->uid, pTaskInfo);
if (code != TSDB_CODE_SUCCESS) {
pTaskInfo->code = code;
return NULL;
......@@ -4867,11 +4878,8 @@ void doDestroyTask(SExecTaskInfo* pTaskInfo) {
doDestroyTableList(&pTaskInfo->tableqinfoList);
destroyOperatorInfo(pTaskInfo->pRoot);
// taosArrayDestroy(pTaskInfo->summary.queryProfEvents);
// taosHashCleanup(pTaskInfo->summary.operatorProfResults);
cleanupTableSchemaInfo(pTaskInfo);
taosMemoryFree(pTaskInfo->schemaVer.dbname);
taosMemoryFree(pTaskInfo->schemaVer.tablename);
taosMemoryFreeClear(pTaskInfo->sql);
taosMemoryFreeClear(pTaskInfo->id.str);
taosMemoryFreeClear(pTaskInfo);
......
......@@ -436,7 +436,7 @@ void qwSaveTbVersionInfo(qTaskInfo_t pTaskInfo, SQWTaskCtx *ctx) {
char dbFName[TSDB_DB_FNAME_LEN] = {0};
char tbName[TSDB_TABLE_NAME_LEN] = {0};
qGetQueriedTableSchemaVersion(pTaskInfo, dbFName, tbName, &ctx->tbInfo.sversion, &ctx->tbInfo.tversion);
qGetQueryTableSchemaVersion(pTaskInfo, dbFName, tbName, &ctx->tbInfo.sversion, &ctx->tbInfo.tversion);
if (dbFName[0] && tbName[0]) {
sprintf(ctx->tbInfo.tbFName, "%s.%s", dbFName, tbName);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册