未验证 提交 ffe4453b 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #14736 from taosdata/feature/3_liaohj

refactor: do some internal refactor and fix some memory leak.
...@@ -110,7 +110,7 @@ int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId, ...@@ -110,7 +110,7 @@ int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId,
* @param tversion * @param tversion
* @return * @return
*/ */
int32_t qGetQueriedTableSchemaVersion(qTaskInfo_t tinfo, char* dbName, char* tableName, int32_t* sversion, int32_t qGetQueryTableSchemaVersion(qTaskInfo_t tinfo, char* dbName, char* tableName, int32_t* sversion,
int32_t* tversion); int32_t* tversion);
/** /**
......
...@@ -222,8 +222,8 @@ typedef struct SRequestObj { ...@@ -222,8 +222,8 @@ typedef struct SRequestObj {
SArray* tableList; SArray* tableList;
SQueryExecMetric metric; SQueryExecMetric metric;
SRequestSendRecvBody body; SRequestSendRecvBody body;
bool stableQuery; bool stableQuery; // todo refactor
bool validateOnly; bool validateOnly; // todo refactor
bool killed; bool killed;
uint32_t prevCode; // previous error code: todo refactor, add update flag for catalog uint32_t prevCode; // previous error code: todo refactor, add update flag for catalog
...@@ -247,9 +247,9 @@ void doFreeReqResultInfo(SReqResultInfo* pResInfo); ...@@ -247,9 +247,9 @@ void doFreeReqResultInfo(SReqResultInfo* pResInfo);
int32_t transferTableNameList(const char* tbList, int32_t acctId, char* dbName, SArray** pReq); int32_t transferTableNameList(const char* tbList, int32_t acctId, char* dbName, SArray** pReq);
void syncCatalogFn(SMetaData* pResult, void* param, int32_t code); void syncCatalogFn(SMetaData* pResult, void* param, int32_t code);
SRequestObj* execQuery(STscObj* pTscObj, const char* sql, int sqlLen, bool validateOnly); SRequestObj* execQuery(uint64_t connId, const char* sql, int sqlLen, bool validateOnly);
TAOS_RES* taosQueryImpl(TAOS* taos, const char* sql, bool validateOnly); TAOS_RES* taosQueryImpl(TAOS* taos, const char* sql, bool validateOnly);
void taosAsyncQueryImpl(TAOS* taos, const char* sql, __taos_async_fn_t fp, void* param, bool validateOnly); void taosAsyncQueryImpl(uint64_t connId, const char* sql, __taos_async_fn_t fp, void* param, bool validateOnly);
static FORCE_INLINE SReqResultInfo* tmqGetCurResInfo(TAOS_RES* res) { static FORCE_INLINE SReqResultInfo* tmqGetCurResInfo(TAOS_RES* res) {
SMqRspObj* msg = (SMqRspObj*)res; SMqRspObj* msg = (SMqRspObj*)res;
...@@ -297,7 +297,7 @@ int32_t releaseTscObj(int64_t rid); ...@@ -297,7 +297,7 @@ int32_t releaseTscObj(int64_t rid);
uint64_t generateRequestId(); uint64_t generateRequestId();
void* createRequest(STscObj* pObj, int32_t type); void* createRequest(uint64_t connId, int32_t type);
void destroyRequest(SRequestObj* pRequest); void destroyRequest(SRequestObj* pRequest);
SRequestObj* acquireRequest(int64_t rid); SRequestObj* acquireRequest(int64_t rid);
int32_t releaseRequest(int64_t rid); int32_t releaseRequest(int64_t rid);
...@@ -318,13 +318,13 @@ void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet); ...@@ -318,13 +318,13 @@ void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet);
STscObj* taos_connect_internal(const char* ip, const char* user, const char* pass, const char* auth, const char* db, STscObj* taos_connect_internal(const char* ip, const char* user, const char* pass, const char* auth, const char* db,
uint16_t port, int connType); uint16_t port, int connType);
SRequestObj* launchQuery(STscObj* pTscObj, const char* sql, int sqlLen, bool validateOnly); SRequestObj* launchQuery(uint64_t connId, const char* sql, int sqlLen, bool validateOnly);
int32_t parseSql(SRequestObj* pRequest, bool topicQuery, SQuery** pQuery, SStmtCallback* pStmtCb); int32_t parseSql(SRequestObj* pRequest, bool topicQuery, SQuery** pQuery, SStmtCallback* pStmtCb);
int32_t getPlan(SRequestObj* pRequest, SQuery* pQuery, SQueryPlan** pPlan, SArray* pNodeList); int32_t getPlan(SRequestObj* pRequest, SQuery* pQuery, SQueryPlan** pPlan, SArray* pNodeList);
int32_t buildRequest(STscObj* pTscObj, const char* sql, int sqlLen, SRequestObj** pRequest); int32_t buildRequest(uint64_t connId, const char* sql, int sqlLen, void* param, bool validateSql, SRequestObj** pRequest);
void taos_close_internal(void* taos); void taos_close_internal(void* taos);
......
...@@ -49,13 +49,8 @@ void cleanupTscQhandle() { ...@@ -49,13 +49,8 @@ void cleanupTscQhandle() {
// destroy handle // destroy handle
taosCleanUpScheduler(tscQhandle); taosCleanUpScheduler(tscQhandle);
} }
static int32_t registerRequest(SRequestObj *pRequest) {
STscObj *pTscObj = acquireTscObj(pRequest->pTscObj->id);
if (NULL == pTscObj) {
terrno = TSDB_CODE_TSC_DISCONNECTED;
return terrno;
}
static int32_t registerRequest(SRequestObj *pRequest, STscObj* pTscObj) {
// connection has been released already, abort creating request. // connection has been released already, abort creating request.
pRequest->self = taosAddRef(clientReqRefPool, pRequest); pRequest->self = taosAddRef(clientReqRefPool, pRequest);
...@@ -246,29 +241,34 @@ STscObj *acquireTscObj(int64_t rid) { return (STscObj *)taosAcquireRef(clientCon ...@@ -246,29 +241,34 @@ STscObj *acquireTscObj(int64_t rid) { return (STscObj *)taosAcquireRef(clientCon
int32_t releaseTscObj(int64_t rid) { return taosReleaseRef(clientConnRefPool, rid); } int32_t releaseTscObj(int64_t rid) { return taosReleaseRef(clientConnRefPool, rid); }
void *createRequest(STscObj *pObj, int32_t type) { void *createRequest(uint64_t connId, int32_t type) {
assert(pObj != NULL);
SRequestObj *pRequest = (SRequestObj *)taosMemoryCalloc(1, sizeof(SRequestObj)); SRequestObj *pRequest = (SRequestObj *)taosMemoryCalloc(1, sizeof(SRequestObj));
if (NULL == pRequest) { if (NULL == pRequest) {
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY; terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
return NULL; return NULL;
} }
STscObj* pTscObj = acquireTscObj(connId);
if (pTscObj == NULL) {
terrno = TSDB_CODE_TSC_DISCONNECTED;
return NULL;
}
pRequest->resType = RES_TYPE__QUERY; pRequest->resType = RES_TYPE__QUERY;
pRequest->pDb = getDbOfConnection(pObj);
pRequest->requestId = generateRequestId(); pRequest->requestId = generateRequestId();
pRequest->metric.start = taosGetTimestampUs(); pRequest->metric.start = taosGetTimestampUs();
pRequest->body.resInfo.convertUcs4 = true; // convert ucs4 by default pRequest->body.resInfo.convertUcs4 = true; // convert ucs4 by default
pRequest->type = type; pRequest->type = type;
pRequest->pTscObj = pObj;
pRequest->pDb = getDbOfConnection(pTscObj);
pRequest->pTscObj = pTscObj;
pRequest->msgBuf = taosMemoryCalloc(1, ERROR_MSG_BUF_DEFAULT_SIZE); pRequest->msgBuf = taosMemoryCalloc(1, ERROR_MSG_BUF_DEFAULT_SIZE);
pRequest->msgBufLen = ERROR_MSG_BUF_DEFAULT_SIZE; pRequest->msgBufLen = ERROR_MSG_BUF_DEFAULT_SIZE;
tsem_init(&pRequest->body.rspSem, 0, 0); tsem_init(&pRequest->body.rspSem, 0, 0);
if (registerRequest(pRequest)) { if (registerRequest(pRequest, pTscObj)) {
doDestroyRequest(pRequest); doDestroyRequest(pRequest);
return NULL; return NULL;
} }
...@@ -327,8 +327,8 @@ void doDestroyRequest(void *p) { ...@@ -327,8 +327,8 @@ void doDestroyRequest(void *p) {
if (pRequest->self) { if (pRequest->self) {
deregisterRequest(pRequest); deregisterRequest(pRequest);
} }
taosMemoryFree(pRequest);
taosMemoryFree(pRequest);
tscTrace("end to destroy request %" PRIx64 " p:%p", reqId, pRequest); tscTrace("end to destroy request %" PRIx64 " p:%p", reqId, pRequest);
} }
...@@ -338,7 +338,6 @@ void destroyRequest(SRequestObj *pRequest) { ...@@ -338,7 +338,6 @@ void destroyRequest(SRequestObj *pRequest) {
} }
taos_stop_query(pRequest); taos_stop_query(pRequest);
removeRequest(pRequest->self); removeRequest(pRequest->self);
} }
......
...@@ -148,29 +148,49 @@ STscObj* taos_connect_internal(const char* ip, const char* user, const char* pas ...@@ -148,29 +148,49 @@ STscObj* taos_connect_internal(const char* ip, const char* user, const char* pas
return taosConnectImpl(user, &secretEncrypt[0], localDb, NULL, NULL, *pInst, connType); return taosConnectImpl(user, &secretEncrypt[0], localDb, NULL, NULL, *pInst, connType);
} }
int32_t buildRequest(STscObj* pTscObj, const char* sql, int sqlLen, SRequestObj** pRequest) { int32_t buildRequest(uint64_t connId, const char* sql, int sqlLen, void* param, bool validateSql, SRequestObj** pRequest) {
*pRequest = createRequest(pTscObj, TSDB_SQL_SELECT); *pRequest = createRequest(connId, TSDB_SQL_SELECT);
if (*pRequest == NULL) { if (*pRequest == NULL) {
tscError("failed to malloc sqlObj"); tscError("failed to malloc sqlObj, %s", sql);
return TSDB_CODE_TSC_OUT_OF_MEMORY; return TSDB_CODE_TSC_OUT_OF_MEMORY;
} }
(*pRequest)->sqlstr = taosMemoryMalloc(sqlLen + 1); (*pRequest)->sqlstr = taosMemoryMalloc(sqlLen + 1);
if ((*pRequest)->sqlstr == NULL) { if ((*pRequest)->sqlstr == NULL) {
tscError("0x%" PRIx64 " failed to prepare sql string buffer", (*pRequest)->self); tscError("0x%" PRIx64 " failed to prepare sql string buffer, %s", (*pRequest)->self, sql);
(*pRequest)->msgBuf = strdup("failed to prepare sql string buffer"); destroyRequest(*pRequest);
*pRequest = NULL;
return TSDB_CODE_TSC_OUT_OF_MEMORY; return TSDB_CODE_TSC_OUT_OF_MEMORY;
} }
strntolower((*pRequest)->sqlstr, sql, (int32_t)sqlLen); strntolower((*pRequest)->sqlstr, sql, (int32_t)sqlLen);
(*pRequest)->sqlstr[sqlLen] = 0; (*pRequest)->sqlstr[sqlLen] = 0;
(*pRequest)->sqlLen = sqlLen; (*pRequest)->sqlLen = sqlLen;
(*pRequest)->validateOnly = validateSql;
if (param == NULL) {
SSyncQueryParam* pParam = taosMemoryCalloc(1, sizeof(SSyncQueryParam));
if (pParam == NULL) {
destroyRequest(*pRequest);
*pRequest = NULL;
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
tsem_init(&pParam->sem, 0, 0);
pParam->pRequest = (*pRequest);
param = pParam;
}
(*pRequest)->body.param = param;
STscObj* pTscObj = (*pRequest)->pTscObj;
if (taosHashPut(pTscObj->pRequests, &(*pRequest)->self, sizeof((*pRequest)->self), &(*pRequest)->self, if (taosHashPut(pTscObj->pRequests, &(*pRequest)->self, sizeof((*pRequest)->self), &(*pRequest)->self,
sizeof((*pRequest)->self))) { sizeof((*pRequest)->self))) {
tscError("%d failed to add to request container, reqId:0x%" PRIx64 ", conn:%d, %s", (*pRequest)->self,
(*pRequest)->requestId, pTscObj->id, sql);
destroyRequest(*pRequest); destroyRequest(*pRequest);
*pRequest = NULL; *pRequest = NULL;
tscError("put request to request hash failed");
return TSDB_CODE_TSC_OUT_OF_MEMORY; return TSDB_CODE_TSC_OUT_OF_MEMORY;
} }
...@@ -882,18 +902,16 @@ SRequestObj* launchQueryImpl(SRequestObj* pRequest, SQuery* pQuery, bool keepQue ...@@ -882,18 +902,16 @@ SRequestObj* launchQueryImpl(SRequestObj* pRequest, SQuery* pQuery, bool keepQue
return pRequest; return pRequest;
} }
SRequestObj* launchQuery(STscObj* pTscObj, const char* sql, int sqlLen, bool validateOnly) { SRequestObj* launchQuery(uint64_t connId, const char* sql, int sqlLen, bool validateOnly) {
SRequestObj* pRequest = NULL; SRequestObj* pRequest = NULL;
SQuery* pQuery = NULL; SQuery* pQuery = NULL;
int32_t code = buildRequest(pTscObj, sql, sqlLen, &pRequest); int32_t code = buildRequest(connId, sql, sqlLen, NULL, validateOnly, &pRequest);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
terrno = code; terrno = code;
return NULL; return NULL;
} }
pRequest->validateOnly = validateOnly;
code = parseSql(pRequest, false, &pQuery, NULL); code = parseSql(pRequest, false, &pQuery, NULL);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
pRequest->code = code; pRequest->code = code;
...@@ -1044,19 +1062,20 @@ int32_t removeMeta(STscObj* pTscObj, SArray* tbList) { ...@@ -1044,19 +1062,20 @@ int32_t removeMeta(STscObj* pTscObj, SArray* tbList) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
SRequestObj* execQuery(STscObj* pTscObj, const char* sql, int sqlLen, bool validateOnly) { // todo remove it soon
SRequestObj* execQuery(uint64_t connId, const char* sql, int sqlLen, bool validateOnly) {
SRequestObj* pRequest = NULL; SRequestObj* pRequest = NULL;
int32_t retryNum = 0; int32_t retryNum = 0;
int32_t code = 0; int32_t code = 0;
do { do {
destroyRequest(pRequest); destroyRequest(pRequest);
pRequest = launchQuery(pTscObj, sql, sqlLen, validateOnly); pRequest = launchQuery(connId, sql, sqlLen, validateOnly);
if (pRequest == NULL || TSDB_CODE_SUCCESS == pRequest->code || !NEED_CLIENT_HANDLE_ERROR(pRequest->code)) { if (pRequest == NULL || TSDB_CODE_SUCCESS == pRequest->code || !NEED_CLIENT_HANDLE_ERROR(pRequest->code)) {
break; break;
} }
code = refreshMeta(pTscObj, pRequest); code = refreshMeta(pRequest->pTscObj, pRequest);
if (code) { if (code) {
pRequest->code = code; pRequest->code = code;
break; break;
...@@ -1064,7 +1083,7 @@ SRequestObj* execQuery(STscObj* pTscObj, const char* sql, int sqlLen, bool valid ...@@ -1064,7 +1083,7 @@ SRequestObj* execQuery(STscObj* pTscObj, const char* sql, int sqlLen, bool valid
} while (retryNum++ < REQUEST_TOTAL_EXEC_TIMES); } while (retryNum++ < REQUEST_TOTAL_EXEC_TIMES);
if (NEED_CLIENT_RM_TBLMETA_REQ(pRequest->type)) { if (NEED_CLIENT_RM_TBLMETA_REQ(pRequest->type)) {
removeMeta(pTscObj, pRequest->tableList); removeMeta(pRequest->pTscObj, pRequest->tableList);
} }
return pRequest; return pRequest;
...@@ -1119,7 +1138,7 @@ STscObj* taosConnectImpl(const char* user, const char* auth, const char* db, __t ...@@ -1119,7 +1138,7 @@ STscObj* taosConnectImpl(const char* user, const char* auth, const char* db, __t
return pTscObj; return pTscObj;
} }
SRequestObj* pRequest = createRequest(pTscObj, TDMT_MND_CONNECT); SRequestObj* pRequest = createRequest(pTscObj->id, TDMT_MND_CONNECT);
if (pRequest == NULL) { if (pRequest == NULL) {
destroyTscObj(pTscObj); destroyTscObj(pTscObj);
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY; terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
...@@ -1446,15 +1465,10 @@ void* doAsyncFetchRows(SRequestObj* pRequest, bool setupOneRowPtr, bool convertU ...@@ -1446,15 +1465,10 @@ void* doAsyncFetchRows(SRequestObj* pRequest, bool setupOneRowPtr, bool convertU
return NULL; return NULL;
} }
SSyncQueryParam* pParam = pRequest->body.param;
if (NULL == pParam) {
pParam = taosMemoryCalloc(1, sizeof(SSyncQueryParam));
tsem_init(&pParam->sem, 0, 0);
}
// convert ucs4 to native multi-bytes string // convert ucs4 to native multi-bytes string
pResultInfo->convertUcs4 = convertUcs4; pResultInfo->convertUcs4 = convertUcs4;
SSyncQueryParam* pParam = pRequest->body.param;
taos_fetch_rows_a(pRequest, syncFetchFn, pParam); taos_fetch_rows_a(pRequest, syncFetchFn, pParam);
tsem_wait(&pParam->sem); tsem_wait(&pParam->sem);
} }
...@@ -2026,22 +2040,9 @@ void syncQueryFn(void* param, void* res, int32_t code) { ...@@ -2026,22 +2040,9 @@ void syncQueryFn(void* param, void* res, int32_t code) {
tsem_post(&pParam->sem); tsem_post(&pParam->sem);
} }
void taosAsyncQueryImpl(TAOS* taos, const char* sql, __taos_async_fn_t fp, void* param, bool validateOnly) { void taosAsyncQueryImpl(uint64_t connId, const char* sql, __taos_async_fn_t fp, void* param, bool validateOnly) {
if (NULL == taos) { if (sql == NULL || NULL == fp) {
terrno = TSDB_CODE_TSC_DISCONNECTED;
fp(param, NULL, terrno);
return;
}
int64_t rid = *(int64_t*)taos;
STscObj* pTscObj = acquireTscObj(rid);
if (pTscObj == NULL || sql == NULL || NULL == fp) {
terrno = TSDB_CODE_INVALID_PARA; terrno = TSDB_CODE_INVALID_PARA;
if (pTscObj) {
releaseTscObj(rid);
} else {
terrno = TSDB_CODE_TSC_DISCONNECTED;
}
fp(param, NULL, terrno); fp(param, NULL, terrno);
return; return;
} }
...@@ -2050,26 +2051,20 @@ void taosAsyncQueryImpl(TAOS* taos, const char* sql, __taos_async_fn_t fp, void* ...@@ -2050,26 +2051,20 @@ void taosAsyncQueryImpl(TAOS* taos, const char* sql, __taos_async_fn_t fp, void*
if (sqlLen > (size_t)TSDB_MAX_ALLOWED_SQL_LEN) { if (sqlLen > (size_t)TSDB_MAX_ALLOWED_SQL_LEN) {
tscError("sql string exceeds max length:%d", TSDB_MAX_ALLOWED_SQL_LEN); tscError("sql string exceeds max length:%d", TSDB_MAX_ALLOWED_SQL_LEN);
terrno = TSDB_CODE_TSC_EXCEED_SQL_LIMIT; terrno = TSDB_CODE_TSC_EXCEED_SQL_LIMIT;
releaseTscObj(rid);
fp(param, NULL, terrno); fp(param, NULL, terrno);
return; return;
} }
SRequestObj* pRequest = NULL; SRequestObj* pRequest = NULL;
int32_t code = buildRequest(pTscObj, sql, sqlLen, &pRequest); int32_t code = buildRequest(connId, sql, sqlLen, param, validateOnly, &pRequest);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
terrno = code; terrno = code;
releaseTscObj(rid);
fp(param, NULL, terrno); fp(param, NULL, terrno);
return; return;
} }
pRequest->validateOnly = validateOnly;
pRequest->body.queryFp = fp; pRequest->body.queryFp = fp;
pRequest->body.param = param;
doAsyncQuery(pRequest, false); doAsyncQuery(pRequest, false);
releaseTscObj(rid);
} }
TAOS_RES* taosQueryImpl(TAOS* taos, const char* sql, bool validateOnly) { TAOS_RES* taosQueryImpl(TAOS* taos, const char* sql, bool validateOnly) {
...@@ -2078,36 +2073,22 @@ TAOS_RES* taosQueryImpl(TAOS* taos, const char* sql, bool validateOnly) { ...@@ -2078,36 +2073,22 @@ TAOS_RES* taosQueryImpl(TAOS* taos, const char* sql, bool validateOnly) {
return NULL; return NULL;
} }
int64_t rid = *(int64_t*)taos;
STscObj* pTscObj = acquireTscObj(rid);
if (pTscObj == NULL || sql == NULL) {
terrno = TSDB_CODE_TSC_DISCONNECTED;
return NULL;
}
#if SYNC_ON_TOP_OF_ASYNC #if SYNC_ON_TOP_OF_ASYNC
SSyncQueryParam* param = taosMemoryCalloc(1, sizeof(SSyncQueryParam)); SSyncQueryParam* param = taosMemoryCalloc(1, sizeof(SSyncQueryParam));
tsem_init(&param->sem, 0, 0); tsem_init(&param->sem, 0, 0);
taosAsyncQueryImpl((TAOS*)&rid, sql, syncQueryFn, param, validateOnly); taosAsyncQueryImpl(*(int64_t*)taos, sql, syncQueryFn, param, validateOnly);
tsem_wait(&param->sem); tsem_wait(&param->sem);
releaseTscObj(rid);
return param->pRequest; return param->pRequest;
#else #else
size_t sqlLen = strlen(sql); size_t sqlLen = strlen(sql);
if (sqlLen > (size_t)TSDB_MAX_ALLOWED_SQL_LEN) { if (sqlLen > (size_t)TSDB_MAX_ALLOWED_SQL_LEN) {
releaseTscObj(rid);
tscError("sql string exceeds max length:%d", TSDB_MAX_ALLOWED_SQL_LEN); tscError("sql string exceeds max length:%d", TSDB_MAX_ALLOWED_SQL_LEN);
terrno = TSDB_CODE_TSC_EXCEED_SQL_LIMIT; terrno = TSDB_CODE_TSC_EXCEED_SQL_LIMIT;
return NULL; return NULL;
} }
TAOS_RES* pRes = execQuery(pTscObj, sql, sqlLen, validateOnly); TAOS_RES* pRes = execQuery(connId, sql, sqlLen, validateOnly);
releaseTscObj(rid);
return pRes; return pRes;
#endif #endif
} }
...@@ -479,7 +479,6 @@ void taos_stop_query(TAOS_RES *res) { ...@@ -479,7 +479,6 @@ void taos_stop_query(TAOS_RES *res) {
} }
schedulerFreeJob(&pRequest->body.queryJob, TSDB_CODE_TSC_QUERY_KILLED); schedulerFreeJob(&pRequest->body.queryJob, TSDB_CODE_TSC_QUERY_KILLED);
tscDebug("request %" PRIx64 " killed", pRequest->requestId); tscDebug("request %" PRIx64 " killed", pRequest->requestId);
} }
...@@ -706,7 +705,8 @@ void retrieveMetaCallback(SMetaData *pResultMeta, void *param, int32_t code) { ...@@ -706,7 +705,8 @@ void retrieveMetaCallback(SMetaData *pResultMeta, void *param, int32_t code) {
} }
void taos_query_a(TAOS *taos, const char *sql, __taos_async_fn_t fp, void *param) { void taos_query_a(TAOS *taos, const char *sql, __taos_async_fn_t fp, void *param) {
taosAsyncQueryImpl(taos, sql, fp, param, false); int64_t connId = *(int64_t*)taos;
taosAsyncQueryImpl(connId, sql, fp, param, false);
} }
int32_t createParseContext(const SRequestObj *pRequest, SParseContext **pCxt) { int32_t createParseContext(const SRequestObj *pRequest, SParseContext **pCxt) {
...@@ -915,7 +915,7 @@ int taos_load_table_info(TAOS *taos, const char *tableNameList) { ...@@ -915,7 +915,7 @@ int taos_load_table_info(TAOS *taos, const char *tableNameList) {
return terrno; return terrno;
} }
int64_t rid = *(int64_t *)taos; int64_t connId = *(int64_t *)taos;
const int32_t MAX_TABLE_NAME_LENGTH = 12 * 1024 * 1024; // 12MB list const int32_t MAX_TABLE_NAME_LENGTH = 12 * 1024 * 1024; // 12MB list
int32_t code = 0; int32_t code = 0;
SRequestObj * pRequest = NULL; SRequestObj * pRequest = NULL;
...@@ -933,12 +933,14 @@ int taos_load_table_info(TAOS *taos, const char *tableNameList) { ...@@ -933,12 +933,14 @@ int taos_load_table_info(TAOS *taos, const char *tableNameList) {
return TSDB_CODE_TSC_INVALID_OPERATION; return TSDB_CODE_TSC_INVALID_OPERATION;
} }
STscObj *pTscObj = acquireTscObj(rid); char *sql = "taos_load_table_info";
if (pTscObj == NULL) { code = buildRequest(connId, sql, strlen(sql), NULL, false, &pRequest);
terrno = TSDB_CODE_TSC_DISCONNECTED; if (code != TSDB_CODE_SUCCESS) {
return terrno; terrno = code;
goto _return;
} }
STscObj* pTscObj = pRequest->pTscObj;
code = transferTableNameList(tableNameList, pTscObj->acctId, pTscObj->db, &catalogReq.pTableMeta); code = transferTableNameList(tableNameList, pTscObj->acctId, pTscObj->db, &catalogReq.pTableMeta);
if (code) { if (code) {
goto _return; goto _return;
...@@ -950,36 +952,22 @@ int taos_load_table_info(TAOS *taos, const char *tableNameList) { ...@@ -950,36 +952,22 @@ int taos_load_table_info(TAOS *taos, const char *tableNameList) {
goto _return; goto _return;
} }
char *sql = "taos_load_table_info";
code = buildRequest(pTscObj, sql, strlen(sql), &pRequest);
if (code != TSDB_CODE_SUCCESS) {
terrno = code;
goto _return;
}
SSyncQueryParam param = {0};
tsem_init(&param.sem, 0, 0);
param.pRequest = pRequest;
SRequestConnInfo conn = { SRequestConnInfo conn = {
.pTrans = pTscObj->pAppInfo->pTransporter, .requestId = pRequest->requestId, .requestObjRefId = pRequest->self}; .pTrans = pTscObj->pAppInfo->pTransporter, .requestId = pRequest->requestId, .requestObjRefId = pRequest->self};
conn.mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp); conn.mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp);
code = catalogAsyncGetAllMeta(pCtg, &conn, &catalogReq, syncCatalogFn, &param, NULL); code = catalogAsyncGetAllMeta(pCtg, &conn, &catalogReq, syncCatalogFn, NULL, NULL);
if (code) { if (code) {
goto _return; goto _return;
} }
tsem_wait(&param.sem); SSyncQueryParam* pParam = pRequest->body.param;
tsem_wait(&pParam->sem);
_return: _return:
taosArrayDestroy(catalogReq.pTableMeta); taosArrayDestroy(catalogReq.pTableMeta);
destroyRequest(pRequest); destroyRequest(pRequest);
releaseTscObj(rid);
return code; return code;
} }
......
...@@ -2442,22 +2442,15 @@ TAOS_RES* taos_schemaless_insert(TAOS* taos, char* lines[], int numLines, int pr ...@@ -2442,22 +2442,15 @@ TAOS_RES* taos_schemaless_insert(TAOS* taos, char* lines[], int numLines, int pr
return NULL; return NULL;
} }
int64_t rid = *(int64_t*)taos; SRequestObj* request = (SRequestObj*)createRequest(*(int64_t*)taos, TSDB_SQL_INSERT);
STscObj* pTscObj = acquireTscObj(rid);
if (NULL == pTscObj) {
terrno = TSDB_CODE_TSC_DISCONNECTED;
uError("SML:taos_schemaless_insert invalid taos");
return NULL;
}
SRequestObj* request = (SRequestObj*)createRequest(pTscObj, TSDB_SQL_INSERT);
if(!request){ if(!request){
releaseTscObj(rid);
uError("SML:taos_schemaless_insert error request is null"); uError("SML:taos_schemaless_insert error request is null");
return NULL; return NULL;
} }
int batchs = 0; int batchs = 0;
STscObj* pTscObj = request->pTscObj;
pTscObj->schemalessType = 1; pTscObj->schemalessType = 1;
SSmlMsgBuf msg = {ERROR_MSG_BUF_DEFAULT_SIZE, request->msgBuf}; SSmlMsgBuf msg = {ERROR_MSG_BUF_DEFAULT_SIZE, request->msgBuf};
...@@ -2507,7 +2500,7 @@ TAOS_RES* taos_schemaless_insert(TAOS* taos, char* lines[], int numLines, int pr ...@@ -2507,7 +2500,7 @@ TAOS_RES* taos_schemaless_insert(TAOS* taos, char* lines[], int numLines, int pr
batchs = ceil(((double)numLines) / LINE_BATCH); batchs = ceil(((double)numLines) / LINE_BATCH);
for (int i = 0; i < batchs; ++i) { for (int i = 0; i < batchs; ++i) {
SRequestObj* req = (SRequestObj*)createRequest(pTscObj, TSDB_SQL_INSERT); SRequestObj* req = (SRequestObj*)createRequest(pTscObj->id, TSDB_SQL_INSERT);
if(!req){ if(!req){
request->code = TSDB_CODE_OUT_OF_MEMORY; request->code = TSDB_CODE_OUT_OF_MEMORY;
uError("SML:taos_schemaless_insert error request is null"); uError("SML:taos_schemaless_insert error request is null");
...@@ -2549,6 +2542,5 @@ end: ...@@ -2549,6 +2542,5 @@ end:
// ((STscObj *)taos)->schemalessType = 0; // ((STscObj *)taos)->schemalessType = 0;
pTscObj->schemalessType = 1; pTscObj->schemalessType = 1;
uDebug("resultend:%s", request->msgBuf); uDebug("resultend:%s", request->msgBuf);
releaseTscObj(rid);
return (TAOS_RES*)request; return (TAOS_RES*)request;
} }
...@@ -5,6 +5,14 @@ ...@@ -5,6 +5,14 @@
#include "clientStmt.h" #include "clientStmt.h"
static int32_t stmtCreateRequest(STscStmt* pStmt) {
if (pStmt->exec.pRequest == NULL) {
return buildRequest(pStmt->taos->id, pStmt->sql.sqlStr, pStmt->sql.sqlLen, NULL, false, &pStmt->exec.pRequest);
} else {
return TSDB_CODE_SUCCESS;
}
}
int32_t stmtSwitchStatus(STscStmt* pStmt, STMT_STATUS newStatus) { int32_t stmtSwitchStatus(STscStmt* pStmt, STMT_STATUS newStatus) {
int32_t code = 0; int32_t code = 0;
...@@ -217,9 +225,7 @@ int32_t stmtParseSql(STscStmt* pStmt) { ...@@ -217,9 +225,7 @@ int32_t stmtParseSql(STscStmt* pStmt) {
.getExecInfoFn = stmtGetExecInfo, .getExecInfoFn = stmtGetExecInfo,
}; };
if (NULL == pStmt->exec.pRequest) { STMT_ERR_RET(stmtCreateRequest(pStmt));
STMT_ERR_RET(buildRequest(pStmt->taos, pStmt->sql.sqlStr, pStmt->sql.sqlLen, &pStmt->exec.pRequest));
}
STMT_ERR_RET(parseSql(pStmt->exec.pRequest, false, &pStmt->sql.pQuery, &stmtCb)); STMT_ERR_RET(parseSql(pStmt->exec.pRequest, false, &pStmt->sql.pQuery, &stmtCb));
...@@ -532,9 +538,7 @@ int stmtSetTbName(TAOS_STMT* stmt, const char* tbName) { ...@@ -532,9 +538,7 @@ int stmtSetTbName(TAOS_STMT* stmt, const char* tbName) {
STMT_ERR_RET(TSDB_CODE_TSC_STMT_API_ERROR); STMT_ERR_RET(TSDB_CODE_TSC_STMT_API_ERROR);
} }
if (NULL == pStmt->exec.pRequest) { STMT_ERR_RET(stmtCreateRequest(pStmt));
STMT_ERR_RET(buildRequest(pStmt->taos, pStmt->sql.sqlStr, pStmt->sql.sqlLen, &pStmt->exec.pRequest));
}
STMT_ERR_RET(qCreateSName(&pStmt->bInfo.sname, tbName, pStmt->taos->acctId, pStmt->exec.pRequest->pDb, STMT_ERR_RET(qCreateSName(&pStmt->bInfo.sname, tbName, pStmt->taos->acctId, pStmt->exec.pRequest->pDb,
pStmt->exec.pRequest->msgBuf, pStmt->exec.pRequest->msgBufLen)); pStmt->exec.pRequest->msgBuf, pStmt->exec.pRequest->msgBufLen));
...@@ -625,9 +629,7 @@ int stmtBindBatch(TAOS_STMT* stmt, TAOS_MULTI_BIND* bind, int32_t colIdx) { ...@@ -625,9 +629,7 @@ int stmtBindBatch(TAOS_STMT* stmt, TAOS_MULTI_BIND* bind, int32_t colIdx) {
pStmt->exec.pRequest = NULL; pStmt->exec.pRequest = NULL;
} }
if (NULL == pStmt->exec.pRequest) { STMT_ERR_RET(stmtCreateRequest(pStmt));
STMT_ERR_RET(buildRequest(pStmt->taos, pStmt->sql.sqlStr, pStmt->sql.sqlLen, &pStmt->exec.pRequest));
}
if (pStmt->bInfo.needParse) { if (pStmt->bInfo.needParse) {
STMT_ERR_RET(stmtParseSql(pStmt)); STMT_ERR_RET(stmtParseSql(pStmt));
...@@ -873,9 +875,7 @@ int stmtGetTagFields(TAOS_STMT* stmt, int* nums, TAOS_FIELD_E** fields) { ...@@ -873,9 +875,7 @@ int stmtGetTagFields(TAOS_STMT* stmt, int* nums, TAOS_FIELD_E** fields) {
pStmt->exec.pRequest = NULL; pStmt->exec.pRequest = NULL;
} }
if (NULL == pStmt->exec.pRequest) { STMT_ERR_RET(stmtCreateRequest(pStmt));
STMT_ERR_RET(buildRequest(pStmt->taos, pStmt->sql.sqlStr, pStmt->sql.sqlLen, &pStmt->exec.pRequest));
}
if (pStmt->bInfo.needParse) { if (pStmt->bInfo.needParse) {
STMT_ERR_RET(stmtParseSql(pStmt)); STMT_ERR_RET(stmtParseSql(pStmt));
...@@ -905,9 +905,7 @@ int stmtGetColFields(TAOS_STMT* stmt, int* nums, TAOS_FIELD_E** fields) { ...@@ -905,9 +905,7 @@ int stmtGetColFields(TAOS_STMT* stmt, int* nums, TAOS_FIELD_E** fields) {
pStmt->exec.pRequest = NULL; pStmt->exec.pRequest = NULL;
} }
if (NULL == pStmt->exec.pRequest) { STMT_ERR_RET(stmtCreateRequest(pStmt));
STMT_ERR_RET(buildRequest(pStmt->taos, pStmt->sql.sqlStr, pStmt->sql.sqlLen, &pStmt->exec.pRequest));
}
if (pStmt->bInfo.needParse) { if (pStmt->bInfo.needParse) {
STMT_ERR_RET(stmtParseSql(pStmt)); STMT_ERR_RET(stmtParseSql(pStmt));
...@@ -933,10 +931,7 @@ int stmtGetParamNum(TAOS_STMT* stmt, int* nums) { ...@@ -933,10 +931,7 @@ int stmtGetParamNum(TAOS_STMT* stmt, int* nums) {
pStmt->exec.pRequest = NULL; pStmt->exec.pRequest = NULL;
} }
if (NULL == pStmt->exec.pRequest) { STMT_ERR_RET(stmtCreateRequest(pStmt));
STMT_ERR_RET(buildRequest(pStmt->taos, pStmt->sql.sqlStr, pStmt->sql.sqlLen, &pStmt->exec.pRequest));
}
if (pStmt->bInfo.needParse) { if (pStmt->bInfo.needParse) {
STMT_ERR_RET(stmtParseSql(pStmt)); STMT_ERR_RET(stmtParseSql(pStmt));
} }
...@@ -969,9 +964,7 @@ int stmtGetParam(TAOS_STMT* stmt, int idx, int* type, int* bytes) { ...@@ -969,9 +964,7 @@ int stmtGetParam(TAOS_STMT* stmt, int idx, int* type, int* bytes) {
pStmt->exec.pRequest = NULL; pStmt->exec.pRequest = NULL;
} }
if (NULL == pStmt->exec.pRequest) { STMT_ERR_RET(stmtCreateRequest(pStmt));
STMT_ERR_RET(buildRequest(pStmt->taos, pStmt->sql.sqlStr, pStmt->sql.sqlLen, &pStmt->exec.pRequest));
}
if (pStmt->bInfo.needParse) { if (pStmt->bInfo.needParse) {
STMT_ERR_RET(stmtParseSql(pStmt)); STMT_ERR_RET(stmtParseSql(pStmt));
......
...@@ -2275,13 +2275,7 @@ static int32_t taosCreateStb(TAOS* taos, void* meta, int32_t metaLen) { ...@@ -2275,13 +2275,7 @@ static int32_t taosCreateStb(TAOS* taos, void* meta, int32_t metaLen) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
SRequestObj* pRequest = NULL; SRequestObj* pRequest = NULL;
STscObj* pTscObj = acquireTscObj(*(int64_t*)taos); code = buildRequest(*(int64_t*)taos, "", 0, NULL, false, &pRequest);
if (NULL == pTscObj) {
code = TSDB_CODE_TSC_DISCONNECTED;
goto end;
}
code = buildRequest(pTscObj, "", 0, &pRequest);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto end; goto end;
} }
...@@ -2321,6 +2315,8 @@ static int32_t taosCreateStb(TAOS* taos, void* meta, int32_t metaLen) { ...@@ -2321,6 +2315,8 @@ static int32_t taosCreateStb(TAOS* taos, void* meta, int32_t metaLen) {
pReq.suid = req.suid; pReq.suid = req.suid;
pReq.source = 1; pReq.source = 1;
STscObj* pTscObj = pRequest->pTscObj;
SName tableName; SName tableName;
tNameExtractFullName(toName(pTscObj->acctId, pRequest->pDb, req.name, &tableName), pReq.name); tNameExtractFullName(toName(pTscObj->acctId, pRequest->pDb, req.name, &tableName), pReq.name);
...@@ -2359,13 +2355,7 @@ static int32_t taosDropStb(TAOS* taos, void* meta, int32_t metaLen) { ...@@ -2359,13 +2355,7 @@ static int32_t taosDropStb(TAOS* taos, void* meta, int32_t metaLen) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
SRequestObj* pRequest = NULL; SRequestObj* pRequest = NULL;
STscObj* pTscObj = acquireTscObj(*(int64_t*)taos); code = buildRequest(*(int64_t*)taos, "", 0, NULL, false, &pRequest);
if (NULL == pTscObj) {
code = TSDB_CODE_TSC_DISCONNECTED;
goto end;
}
code = buildRequest(pTscObj, "", 0, &pRequest);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto end; goto end;
} }
...@@ -2387,6 +2377,9 @@ static int32_t taosDropStb(TAOS* taos, void* meta, int32_t metaLen) { ...@@ -2387,6 +2377,9 @@ static int32_t taosDropStb(TAOS* taos, void* meta, int32_t metaLen) {
pReq.igNotExists = true; pReq.igNotExists = true;
pReq.source = 1; pReq.source = 1;
pReq.suid = req.suid; pReq.suid = req.suid;
STscObj* pTscObj = pRequest->pTscObj;
SName tableName; SName tableName;
tNameExtractFullName(toName(pTscObj->acctId, pRequest->pDb, req.name, &tableName), pReq.name); tNameExtractFullName(toName(pTscObj->acctId, pRequest->pDb, req.name, &tableName), pReq.name);
...@@ -2428,21 +2421,15 @@ static void destroyCreateTbReqBatch(void* data) { ...@@ -2428,21 +2421,15 @@ static void destroyCreateTbReqBatch(void* data) {
taosArrayDestroy(pTbBatch->req.pArray); taosArrayDestroy(pTbBatch->req.pArray);
} }
static int32_t taosCreateTable(TAOS* taos, void* meta, int32_t metaLen) { static int32_t taosCreateTable(TAOS *taos, void *meta, int32_t metaLen){
SVCreateTbBatchReq req = {0}; SVCreateTbBatchReq req = {0};
SDecoder coder = {0}; SDecoder coder = {0};
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
SRequestObj* pRequest = NULL; SRequestObj *pRequest = NULL;
SQuery* pQuery = NULL; SQuery *pQuery = NULL;
SHashObj* pVgroupHashmap = NULL; SHashObj *pVgroupHashmap = NULL;
STscObj* pTscObj = acquireTscObj(*(int64_t*)taos);
if (NULL == pTscObj) {
code = TSDB_CODE_TSC_DISCONNECTED;
goto end;
}
code = buildRequest(pTscObj, "", 0, &pRequest); code = buildRequest(*(int64_t*) taos, "", 0, NULL, false, &pRequest);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto end; goto end;
} }
...@@ -2460,8 +2447,10 @@ static int32_t taosCreateTable(TAOS* taos, void* meta, int32_t metaLen) { ...@@ -2460,8 +2447,10 @@ static int32_t taosCreateTable(TAOS* taos, void* meta, int32_t metaLen) {
goto end; goto end;
} }
SVCreateTbReq* pCreateReq = NULL; STscObj* pTscObj = pRequest->pTscObj;
SCatalog* pCatalog = NULL;
SVCreateTbReq *pCreateReq = NULL;
SCatalog* pCatalog = NULL;
code = catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog); code = catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto end; goto end;
...@@ -2545,21 +2534,15 @@ static void destroyDropTbReqBatch(void* data) { ...@@ -2545,21 +2534,15 @@ static void destroyDropTbReqBatch(void* data) {
taosArrayDestroy(pTbBatch->req.pArray); taosArrayDestroy(pTbBatch->req.pArray);
} }
static int32_t taosDropTable(TAOS* taos, void* meta, int32_t metaLen) { static int32_t taosDropTable(TAOS *taos, void *meta, int32_t metaLen){
SVDropTbBatchReq req = {0}; SVDropTbBatchReq req = {0};
SDecoder coder = {0}; SDecoder coder = {0};
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
SRequestObj* pRequest = NULL; SRequestObj *pRequest = NULL;
SQuery* pQuery = NULL; SQuery *pQuery = NULL;
SHashObj* pVgroupHashmap = NULL; SHashObj *pVgroupHashmap = NULL;
STscObj* pTscObj = acquireTscObj(*(int64_t*)taos);
if (NULL == pTscObj) {
code = TSDB_CODE_TSC_DISCONNECTED;
goto end;
}
code = buildRequest(pTscObj, "", 0, &pRequest); code = buildRequest(*(int64_t*)taos, "", 0, NULL, false, &pRequest);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto end; goto end;
} }
...@@ -2577,8 +2560,10 @@ static int32_t taosDropTable(TAOS* taos, void* meta, int32_t metaLen) { ...@@ -2577,8 +2560,10 @@ static int32_t taosDropTable(TAOS* taos, void* meta, int32_t metaLen) {
goto end; goto end;
} }
SVDropTbReq* pDropReq = NULL; STscObj* pTscObj = pRequest->pTscObj;
SCatalog* pCatalog = NULL;
SVDropTbReq *pDropReq = NULL;
SCatalog *pCatalog = NULL;
code = catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog); code = catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto end; goto end;
...@@ -2649,22 +2634,17 @@ end: ...@@ -2649,22 +2634,17 @@ end:
return code; return code;
} }
static int32_t taosAlterTable(TAOS* taos, void* meta, int32_t metaLen) { static int32_t taosAlterTable(TAOS *taos, void *meta, int32_t metaLen){
SVAlterTbReq req = {0}; SVAlterTbReq req = {0};
SDecoder coder = {0}; SDecoder coder = {0};
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
SRequestObj* pRequest = NULL; SRequestObj *pRequest = NULL;
SQuery* pQuery = NULL; SQuery *pQuery = NULL;
SArray* pArray = NULL; SArray *pArray = NULL;
SVgDataBlocks* pVgData = NULL; SVgDataBlocks *pVgData = NULL;
STscObj* pTscObj = acquireTscObj(*(int64_t*)taos);
if (NULL == pTscObj) {
code = TSDB_CODE_TSC_DISCONNECTED;
goto end;
}
code = buildRequest(pTscObj, "", 0, &pRequest); code = buildRequest(*(int64_t*) taos, "", 0, NULL, false, &pRequest);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto end; goto end;
} }
...@@ -2687,6 +2667,7 @@ static int32_t taosAlterTable(TAOS* taos, void* meta, int32_t metaLen) { ...@@ -2687,6 +2667,7 @@ static int32_t taosAlterTable(TAOS* taos, void* meta, int32_t metaLen) {
goto end; goto end;
} }
STscObj* pTscObj = pRequest->pTscObj;
SCatalog* pCatalog = NULL; SCatalog* pCatalog = NULL;
code = catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog); code = catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
......
...@@ -59,7 +59,6 @@ typedef struct SBlockLoadSuppInfo { ...@@ -59,7 +59,6 @@ typedef struct SBlockLoadSuppInfo {
SColumnDataAgg tsColAgg; SColumnDataAgg tsColAgg;
SColumnDataAgg** plist; SColumnDataAgg** plist;
int16_t* colIds; // column ids for loading file block data int16_t* colIds; // column ids for loading file block data
int32_t* slotIds; // colId to slotId
char** buildBuf; // build string tmp buffer, todo remove it later after all string format being updated. char** buildBuf; // build string tmp buffer, todo remove it later after all string format being updated.
} SBlockLoadSuppInfo; } SBlockLoadSuppInfo;
...@@ -183,7 +182,6 @@ static SHashObj* createDataBlockScanInfo(STsdbReader* pTsdbReader, const STableK ...@@ -183,7 +182,6 @@ static SHashObj* createDataBlockScanInfo(STsdbReader* pTsdbReader, const STableK
return NULL; return NULL;
} }
// todo apply the lastkey of table check to avoid to load header file
for (int32_t j = 0; j < numOfTables; ++j) { for (int32_t j = 0; j < numOfTables; ++j) {
STableBlockScanInfo info = {.lastKey = 0, .uid = idList[j].uid}; STableBlockScanInfo info = {.lastKey = 0, .uid = idList[j].uid};
if (ASCENDING_TRAVERSE(pTsdbReader->order)) { if (ASCENDING_TRAVERSE(pTsdbReader->order)) {
...@@ -218,6 +216,30 @@ static void resetDataBlockScanInfo(SHashObj* pTableMap) { ...@@ -218,6 +216,30 @@ static void resetDataBlockScanInfo(SHashObj* pTableMap) {
} }
} }
static void destroyBlockScanInfo(SHashObj* pTableMap) {
STableBlockScanInfo* p = NULL;
while ((p = taosHashIterate(pTableMap, p)) != NULL) {
p->iterInit = false;
p->iiter.hasVal = false;
if (p->iter.iter != NULL) {
tsdbTbDataIterDestroy(p->iter.iter);
p->iter.iter = NULL;
}
if (p->iiter.iter != NULL) {
tsdbTbDataIterDestroy(p->iiter.iter);
p->iiter.iter = NULL;
}
taosArrayDestroy(p->delSkyline);
p->delSkyline = NULL;
}
taosHashCleanup(pTableMap);
}
static bool isEmptyQueryTimeWindow(STimeWindow* pWindow) { static bool isEmptyQueryTimeWindow(STimeWindow* pWindow) {
ASSERT(pWindow != NULL); ASSERT(pWindow != NULL);
return pWindow->skey > pWindow->ekey; return pWindow->skey > pWindow->ekey;
...@@ -265,6 +287,10 @@ static int32_t initFilesetIterator(SFilesetIter* pIter, const STsdbFSState* pFSt ...@@ -265,6 +287,10 @@ static int32_t initFilesetIterator(SFilesetIter* pIter, const STsdbFSState* pFSt
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static void cleanupFilesetIterator(SFilesetIter* pIter) {
taosArrayDestroy(pIter->pFileList);
}
static bool filesetIteratorNext(SFilesetIter* pIter, STsdbReader* pReader) { static bool filesetIteratorNext(SFilesetIter* pIter, STsdbReader* pReader) {
bool asc = ASCENDING_TRAVERSE(pIter->order); bool asc = ASCENDING_TRAVERSE(pIter->order);
int32_t step = asc ? 1 : -1; int32_t step = asc ? 1 : -1;
...@@ -313,7 +339,15 @@ static void resetDataBlockIterator(SDataBlockIter* pIter, int32_t order) { ...@@ -313,7 +339,15 @@ static void resetDataBlockIterator(SDataBlockIter* pIter, int32_t order) {
pIter->order = order; pIter->order = order;
pIter->index = -1; pIter->index = -1;
pIter->numOfBlocks = -1; pIter->numOfBlocks = -1;
pIter->blockList = taosArrayInit(4, sizeof(SFileDataBlockInfo)); if (pIter->blockList == NULL) {
pIter->blockList = taosArrayInit(4, sizeof(SFileDataBlockInfo));
} else {
taosArrayClear(pIter->blockList);
}
}
static void cleanupDataBlockIterator(SDataBlockIter* pIter) {
taosArrayDestroy(pIter->blockList);
} }
static void initReaderStatus(SReaderStatus* pStatus) { static void initReaderStatus(SReaderStatus* pStatus) {
...@@ -2515,6 +2549,7 @@ void doMergeMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter, SArray* pDe ...@@ -2515,6 +2549,7 @@ void doMergeMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter, SArray* pDe
tRowMergerInit(&merge, pRow, pReader->pSchema); tRowMergerInit(&merge, pRow, pReader->pSchema);
doMergeRowsInBuf(pIter, k.ts, pDelList, &merge, pReader); doMergeRowsInBuf(pIter, k.ts, pDelList, &merge, pReader);
tRowMergerGetRow(&merge, pTSRow); tRowMergerGetRow(&merge, pTSRow);
tRowMergerClear(&merge);
} }
void doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader, void doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader,
...@@ -2651,6 +2686,7 @@ int32_t buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, int64_t e ...@@ -2651,6 +2686,7 @@ int32_t buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, int64_t e
} }
doAppendOneRow(pBlock, pReader, pTSRow); doAppendOneRow(pBlock, pReader, pTSRow);
taosMemoryFree(pTSRow);
// no data in buffer, return immediately // no data in buffer, return immediately
if (!(pBlockScanInfo->iter.hasVal || pBlockScanInfo->iiter.hasVal)) { if (!(pBlockScanInfo->iter.hasVal || pBlockScanInfo->iiter.hasVal)) {
...@@ -2778,11 +2814,24 @@ void tsdbReaderClose(STsdbReader* pReader) { ...@@ -2778,11 +2814,24 @@ void tsdbReaderClose(STsdbReader* pReader) {
return; return;
} }
SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo;
taosMemoryFreeClear(pSupInfo->plist);
taosMemoryFree(pSupInfo->colIds);
taosArrayDestroy(pSupInfo->pColAgg);
for(int32_t i = 0; i < blockDataGetNumOfCols(pReader->pResBlock); ++i) {
if (pSupInfo->buildBuf[i] != NULL) {
taosMemoryFreeClear(pSupInfo->buildBuf[i]);
}
}
taosMemoryFree(pSupInfo->buildBuf);
cleanupFilesetIterator(&pReader->status.fileIter);
cleanupDataBlockIterator(&pReader->status.blockIter);
destroyBlockScanInfo(pReader->status.pTableMap);
blockDataDestroy(pReader->pResBlock); blockDataDestroy(pReader->pResBlock);
taosMemoryFreeClear(pReader->suppInfo.plist);
taosArrayDestroy(pReader->suppInfo.pColAgg);
taosMemoryFree(pReader->suppInfo.slotIds);
#if 0 #if 0
// if (pReader->status.pTableScanInfo != NULL) { // if (pReader->status.pTableScanInfo != NULL) {
......
...@@ -184,7 +184,7 @@ int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, const SArray* tableIdList, bo ...@@ -184,7 +184,7 @@ int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, const SArray* tableIdList, bo
return code; return code;
} }
int32_t qGetQueriedTableSchemaVersion(qTaskInfo_t tinfo, char* dbName, char* tableName, int32_t* sversion, int32_t qGetQueryTableSchemaVersion(qTaskInfo_t tinfo, char* dbName, char* tableName, int32_t* sversion,
int32_t* tversion) { int32_t* tversion) {
ASSERT(tinfo != NULL && dbName != NULL && tableName != NULL); ASSERT(tinfo != NULL && dbName != NULL && tableName != NULL);
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
......
...@@ -13,6 +13,7 @@ ...@@ -13,6 +13,7 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include <executorimpl.h>
#include "filter.h" #include "filter.h"
#include "function.h" #include "function.h"
#include "functionMgt.h" #include "functionMgt.h"
...@@ -4053,7 +4054,7 @@ static STsdbReader* doCreateDataReader(STableScanPhysiNode* pTableScanNode, SRea ...@@ -4053,7 +4054,7 @@ static STsdbReader* doCreateDataReader(STableScanPhysiNode* pTableScanNode, SRea
static SArray* extractColumnInfo(SNodeList* pNodeList); static SArray* extractColumnInfo(SNodeList* pNodeList);
int32_t extractTableSchemaVersion(SReadHandle* pHandle, uint64_t uid, SExecTaskInfo* pTaskInfo) { int32_t extractTableSchemaInfo(SReadHandle* pHandle, uint64_t uid, SExecTaskInfo* pTaskInfo) {
SMetaReader mr = {0}; SMetaReader mr = {0};
metaReaderInit(&mr, pHandle->meta, 0); metaReaderInit(&mr, pHandle->meta, 0);
int32_t code = metaGetTableEntryByUid(&mr, uid); int32_t code = metaGetTableEntryByUid(&mr, uid);
...@@ -4077,10 +4078,20 @@ int32_t extractTableSchemaVersion(SReadHandle* pHandle, uint64_t uid, SExecTaskI ...@@ -4077,10 +4078,20 @@ int32_t extractTableSchemaVersion(SReadHandle* pHandle, uint64_t uid, SExecTaskI
} }
metaReaderClear(&mr); metaReaderClear(&mr);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static void cleanupTableSchemaInfo(SExecTaskInfo* pTaskInfo) {
taosMemoryFreeClear(pTaskInfo->schemaVer.dbname);
if (pTaskInfo->schemaVer.sw == NULL) {
return;
}
taosMemoryFree(pTaskInfo->schemaVer.sw->pSchema);
taosMemoryFree(pTaskInfo->schemaVer.sw);
taosMemoryFree(pTaskInfo->schemaVer.tablename);
}
static int32_t sortTableGroup(STableListInfo* pTableListInfo, int32_t groupNum) { static int32_t sortTableGroup(STableListInfo* pTableListInfo, int32_t groupNum) {
taosArrayClear(pTableListInfo->pGroupList); taosArrayClear(pTableListInfo->pGroupList);
SArray* sortSupport = taosArrayInit(groupNum, sizeof(uint64_t)); SArray* sortSupport = taosArrayInit(groupNum, sizeof(uint64_t));
...@@ -4254,7 +4265,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo ...@@ -4254,7 +4265,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
return NULL; return NULL;
} }
code = extractTableSchemaVersion(pHandle, pTableScanNode->scan.uid, pTaskInfo); code = extractTableSchemaInfo(pHandle, pTableScanNode->scan.uid, pTaskInfo);
if (code) { if (code) {
pTaskInfo->code = terrno; pTaskInfo->code = terrno;
return NULL; return NULL;
...@@ -4272,7 +4283,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo ...@@ -4272,7 +4283,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
pTaskInfo->code = code; pTaskInfo->code = code;
return NULL; return NULL;
} }
code = extractTableSchemaVersion(pHandle, pTableScanNode->scan.uid, pTaskInfo); code = extractTableSchemaInfo(pHandle, pTableScanNode->scan.uid, pTaskInfo);
if (code) { if (code) {
pTaskInfo->code = terrno; pTaskInfo->code = terrno;
return NULL; return NULL;
...@@ -4369,7 +4380,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo ...@@ -4369,7 +4380,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
// return NULL; // return NULL;
// } // }
int32_t code = extractTableSchemaVersion(pHandle, pScanNode->uid, pTaskInfo); int32_t code = extractTableSchemaInfo(pHandle, pScanNode->uid, pTaskInfo);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
pTaskInfo->code = code; pTaskInfo->code = code;
return NULL; return NULL;
...@@ -4870,11 +4881,8 @@ void doDestroyTask(SExecTaskInfo* pTaskInfo) { ...@@ -4870,11 +4881,8 @@ void doDestroyTask(SExecTaskInfo* pTaskInfo) {
doDestroyTableList(&pTaskInfo->tableqinfoList); doDestroyTableList(&pTaskInfo->tableqinfoList);
destroyOperatorInfo(pTaskInfo->pRoot); destroyOperatorInfo(pTaskInfo->pRoot);
// taosArrayDestroy(pTaskInfo->summary.queryProfEvents); cleanupTableSchemaInfo(pTaskInfo);
// taosHashCleanup(pTaskInfo->summary.operatorProfResults);
taosMemoryFree(pTaskInfo->schemaVer.dbname);
taosMemoryFree(pTaskInfo->schemaVer.tablename);
taosMemoryFreeClear(pTaskInfo->sql); taosMemoryFreeClear(pTaskInfo->sql);
taosMemoryFreeClear(pTaskInfo->id.str); taosMemoryFreeClear(pTaskInfo->id.str);
taosMemoryFreeClear(pTaskInfo); taosMemoryFreeClear(pTaskInfo);
......
...@@ -436,7 +436,7 @@ void qwSaveTbVersionInfo(qTaskInfo_t pTaskInfo, SQWTaskCtx *ctx) { ...@@ -436,7 +436,7 @@ void qwSaveTbVersionInfo(qTaskInfo_t pTaskInfo, SQWTaskCtx *ctx) {
char dbFName[TSDB_DB_FNAME_LEN] = {0}; char dbFName[TSDB_DB_FNAME_LEN] = {0};
char tbName[TSDB_TABLE_NAME_LEN] = {0}; char tbName[TSDB_TABLE_NAME_LEN] = {0};
qGetQueriedTableSchemaVersion(pTaskInfo, dbFName, tbName, &ctx->tbInfo.sversion, &ctx->tbInfo.tversion); qGetQueryTableSchemaVersion(pTaskInfo, dbFName, tbName, &ctx->tbInfo.sversion, &ctx->tbInfo.tversion);
if (dbFName[0] && tbName[0]) { if (dbFName[0] && tbName[0]) {
sprintf(ctx->tbInfo.tbFName, "%s.%s", dbFName, tbName); sprintf(ctx->tbInfo.tbFName, "%s.%s", dbFName, tbName);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册