未验证 提交 313d4e97 编写于 作者: D dapan1121 提交者: GitHub

Merge pull request #15161 from taosdata/fix/TD-15197

fix: fix stmt memory leak
......@@ -50,6 +50,7 @@ bool tNameIsValid(const SName* name);
const char* tNameGetTableName(const SName* name);
int32_t tNameGetDbName(const SName* name, char* dst);
const char* tNameGetDbNameP(const SName* name);
int32_t tNameGetFullDbName(const SName* name, char* dst);
......
......@@ -193,7 +193,7 @@ int32_t taosAsyncExec(__async_exec_fn_t execFn, void* execParam, int32_t* code);
void destroySendMsgInfo(SMsgSendInfo* pMsgBody);
int32_t asyncSendMsgToServerExt(void* pTransporter, SEpSet* epSet, int64_t* pTransporterId, const SMsgSendInfo* pInfo,
int32_t asyncSendMsgToServerExt(void* pTransporter, SEpSet* epSet, int64_t* pTransporterId, SMsgSendInfo* pInfo,
bool persistHandle, void* ctx);
/**
......@@ -205,7 +205,7 @@ int32_t asyncSendMsgToServerExt(void* pTransporter, SEpSet* epSet, int64_t* pTra
* @param pInfo
* @return
*/
int32_t asyncSendMsgToServer(void* pTransporter, SEpSet* epSet, int64_t* pTransporterId, const SMsgSendInfo* pInfo);
int32_t asyncSendMsgToServer(void* pTransporter, SEpSet* epSet, int64_t* pTransporterId, SMsgSendInfo* pInfo);
int32_t queryBuildUseDbOutput(SUseDbOutput* pOut, SUseDbRsp* usedbRsp);
......@@ -260,6 +260,8 @@ extern int32_t (*queryProcessMsgRsp[TDMT_MAX])(void* output, char* msg, int32_t
#define REQUEST_TOTAL_EXEC_TIMES 2
#define IS_SYS_DBNAME(_dbname) (((*(_dbname) == 'i') && (0 == strcmp(_dbname, TSDB_INFORMATION_SCHEMA_DB))) || ((*(_dbname) == 'p') && (0 == strcmp(_dbname, TSDB_PERFORMANCE_SCHEMA_DB))))
#define qFatal(...) \
do { \
if (qDebugFlag & DEBUG_FATAL) { \
......
......@@ -124,18 +124,16 @@ void *rpcReallocCont(void *ptr, int32_t contLen);
// Because taosd supports multi-process mode
// These functions should not be used on the server side
// Please use tmsg<xx> functions, which are defined in tmsgcb.h
void rpcSendRequest(void *thandle, const SEpSet *pEpSet, SRpcMsg *pMsg, int64_t *rid);
void rpcSendResponse(const SRpcMsg *pMsg);
void rpcRegisterBrokenLinkArg(SRpcMsg *msg);
void rpcReleaseHandle(void *handle, int8_t type); // just release conn to rpc instance, no close sock
int rpcSendRequest(void *thandle, const SEpSet *pEpSet, SRpcMsg *pMsg, int64_t *rid);
int rpcSendResponse(const SRpcMsg *pMsg);
int rpcRegisterBrokenLinkArg(SRpcMsg *msg);
int rpcReleaseHandle(void *handle, int8_t type); // just release conn to rpc instance, no close sock
// These functions will not be called in the child process
void rpcSendRedirectRsp(void *pConn, const SEpSet *pEpSet);
void rpcSendRequestWithCtx(void *thandle, const SEpSet *pEpSet, SRpcMsg *pMsg, int64_t *rid, SRpcCtx *ctx);
int32_t rpcGetConnInfo(void *thandle, SRpcConnInfo *pInfo);
void rpcSendRecv(void *shandle, SEpSet *pEpSet, SRpcMsg *pReq, SRpcMsg *pRsp);
void rpcSetDefaultAddr(void *thandle, const char *ip, const char *fqdn);
void* rpcAllocHandle();
int rpcSendRequestWithCtx(void *thandle, const SEpSet *pEpSet, SRpcMsg *pMsg, int64_t *rid, SRpcCtx *ctx);
int rpcSendRecv(void *shandle, SEpSet *pEpSet, SRpcMsg *pReq, SRpcMsg *pRsp);
int rpcSetDefaultAddr(void *thandle, const char *ip, const char *fqdn);
void *rpcAllocHandle();
#ifdef __cplusplus
}
......
......@@ -33,7 +33,7 @@ typedef struct {
SDiskSize size;
} SDiskSpace;
bool taosCheckSystemIsSmallEnd();
bool taosCheckSystemIsLittleEnd();
void taosGetSystemInfo();
int32_t taosGetEmail(char *email, int32_t maxLen);
int32_t taosGetOsReleaseName(char *releaseName, int32_t maxLen);
......
......@@ -72,7 +72,6 @@ typedef struct SStmtBindInfo {
typedef struct SStmtExecInfo {
int32_t affectedRows;
SRequestObj* pRequest;
SHashObj* pVgHash;
SHashObj* pBlockHash;
bool autoCreateTbl;
} SStmtExecInfo;
......@@ -88,6 +87,7 @@ typedef struct SStmtSQLInfo {
SArray* nodeList;
SStmtQueryResInfo queryRes;
bool autoCreateTbl;
SHashObj* pVgHash;
} SStmtSQLInfo;
typedef struct STscStmt {
......
......@@ -88,7 +88,7 @@ void closeTransporter(SAppInstInfo *pAppInfo) {
static bool clientRpcRfp(int32_t code, tmsg_t msgType) {
if (NEED_REDIRECT_ERROR(code)) {
if (msgType == TDMT_SCH_QUERY || msgType == TDMT_SCH_MERGE_QUERY || msgType == TDMT_SCH_FETCH ||
msgType == TDMT_SCH_MERGE_FETCH) {
msgType == TDMT_SCH_MERGE_FETCH || msgType == TDMT_SCH_QUERY_HEARTBEAT || msgType == TDMT_SCH_DROP_TASK) {
return false;
}
return true;
......
......@@ -590,6 +590,11 @@ int32_t buildAsyncExecNodeList(SRequestObj* pRequest, SArray** pNodeList, SArray
return code;
}
void freeVgList(void *list) {
SArray* pList = *(SArray**)list;
taosArrayDestroy(pList);
}
int32_t buildSyncExecNodeList(SRequestObj* pRequest, SArray** pNodeList, SArray* pMnodeList) {
SArray* pDbVgList = NULL;
SArray* pQnodeList = NULL;
......@@ -641,7 +646,7 @@ int32_t buildSyncExecNodeList(SRequestObj* pRequest, SArray** pNodeList, SArray*
_return:
taosArrayDestroy(pDbVgList);
taosArrayDestroyEx(pDbVgList, freeVgList);
taosArrayDestroy(pQnodeList);
return code;
......
......@@ -6,11 +6,16 @@
#include "clientStmt.h"
static int32_t stmtCreateRequest(STscStmt* pStmt) {
int32_t code = 0;
if (pStmt->exec.pRequest == NULL) {
return buildRequest(pStmt->taos->id, pStmt->sql.sqlStr, pStmt->sql.sqlLen, NULL, false, &pStmt->exec.pRequest);
} else {
return TSDB_CODE_SUCCESS;
code = buildRequest(pStmt->taos->id, pStmt->sql.sqlStr, pStmt->sql.sqlLen, NULL, false, &pStmt->exec.pRequest);
if (TSDB_CODE_SUCCESS == code) {
pStmt->exec.pRequest->syncQuery = true;
}
}
return code;
}
int32_t stmtSwitchStatus(STscStmt* pStmt, STMT_STATUS newStatus) {
......@@ -155,7 +160,7 @@ int32_t stmtUpdateBindInfo(TAOS_STMT* stmt, STableMeta* pTableMeta, void* tags,
int32_t stmtUpdateExecInfo(TAOS_STMT* stmt, SHashObj* pVgHash, SHashObj* pBlockHash, bool autoCreateTbl) {
STscStmt* pStmt = (STscStmt*)stmt;
pStmt->exec.pVgHash = pVgHash;
pStmt->sql.pVgHash = pVgHash;
pStmt->exec.pBlockHash = pBlockHash;
pStmt->exec.autoCreateTbl = autoCreateTbl;
......@@ -177,7 +182,7 @@ int32_t stmtUpdateInfo(TAOS_STMT* stmt, STableMeta* pTableMeta, void* tags, char
int32_t stmtGetExecInfo(TAOS_STMT* stmt, SHashObj** pVgHash, SHashObj** pBlockHash) {
STscStmt* pStmt = (STscStmt*)stmt;
*pVgHash = pStmt->exec.pVgHash;
*pVgHash = pStmt->sql.pVgHash;
*pBlockHash = pStmt->exec.pBlockHash;
return TSDB_CODE_SUCCESS;
......@@ -308,6 +313,8 @@ int32_t stmtCleanSQLInfo(STscStmt* pStmt) {
taosMemoryFree(pStmt->sql.sqlStr);
qDestroyQuery(pStmt->sql.pQuery);
taosArrayDestroy(pStmt->sql.nodeList);
taosHashCleanup(pStmt->sql.pVgHash);
pStmt->sql.pVgHash = NULL;
void* pIter = taosHashIterate(pStmt->sql.pTableCache, NULL);
while (pIter) {
......@@ -340,7 +347,7 @@ int32_t stmtRebuildDataBlock(STscStmt* pStmt, STableDataBlocks* pDataBlock, STab
STMT_ERR_RET(catalogGetTableHashVgroup(pStmt->pCatalog, &conn, &pStmt->bInfo.sname, &vgInfo));
STMT_ERR_RET(
taosHashPut(pStmt->exec.pVgHash, (const char*)&vgInfo.vgId, sizeof(vgInfo.vgId), (char*)&vgInfo, sizeof(vgInfo)));
taosHashPut(pStmt->sql.pVgHash, (const char*)&vgInfo.vgId, sizeof(vgInfo.vgId), (char*)&vgInfo, sizeof(vgInfo)));
STMT_ERR_RET(qRebuildStmtDataBlock(newBlock, pDataBlock, uid, vgInfo.vgId));
......@@ -680,6 +687,7 @@ int stmtBindBatch(TAOS_STMT* stmt, TAOS_MULTI_BIND* bind, int32_t colIdx) {
if (pStmt->sql.pQuery->haveResultSet) {
setResSchemaInfo(&pStmt->exec.pRequest->body.resInfo, pStmt->sql.pQuery->pResSchema,
pStmt->sql.pQuery->numOfResCols);
taosMemoryFreeClear(pStmt->sql.pQuery->pResSchema);
setResPrecision(&pStmt->exec.pRequest->body.resInfo, pStmt->sql.pQuery->precision);
}
......@@ -804,7 +812,7 @@ int stmtExec(TAOS_STMT* stmt) {
if (STMT_TYPE_QUERY == pStmt->sql.type) {
launchQueryImpl(pStmt->exec.pRequest, pStmt->sql.pQuery, true, NULL);
} else {
STMT_ERR_RET(qBuildStmtOutput(pStmt->sql.pQuery, pStmt->exec.pVgHash, pStmt->exec.pBlockHash));
STMT_ERR_RET(qBuildStmtOutput(pStmt->sql.pQuery, pStmt->sql.pVgHash, pStmt->exec.pBlockHash));
launchQueryImpl(pStmt->exec.pRequest, pStmt->sql.pQuery, true, (autoCreateTbl ? (void**)&pRsp : NULL));
}
......@@ -847,9 +855,10 @@ _return:
int stmtClose(TAOS_STMT* stmt) {
STscStmt* pStmt = (STscStmt*)stmt;
STMT_RET(stmtCleanSQLInfo(pStmt));
stmtCleanSQLInfo(pStmt);
taosMemoryFree(stmt);
return TSDB_CODE_SUCCESS;
}
const char* stmtErrstr(TAOS_STMT* stmt) {
......
......@@ -51,15 +51,15 @@ int32_t tsNumOfShmThreads = 1;
int32_t tsNumOfRpcThreads = 1;
int32_t tsNumOfCommitThreads = 2;
int32_t tsNumOfTaskQueueThreads = 1;
int32_t tsNumOfMnodeQueryThreads = 2;
int32_t tsNumOfMnodeQueryThreads = 4;
int32_t tsNumOfMnodeFetchThreads = 1;
int32_t tsNumOfMnodeReadThreads = 1;
int32_t tsNumOfVnodeQueryThreads = 2;
int32_t tsNumOfVnodeQueryThreads = 4;
int32_t tsNumOfVnodeStreamThreads = 2;
int32_t tsNumOfVnodeFetchThreads = 4;
int32_t tsNumOfVnodeWriteThreads = 2;
int32_t tsNumOfVnodeSyncThreads = 2;
int32_t tsNumOfQnodeQueryThreads = 2;
int32_t tsNumOfQnodeQueryThreads = 4;
int32_t tsNumOfQnodeFetchThreads = 4;
int32_t tsNumOfSnodeSharedThreads = 2;
int32_t tsNumOfSnodeUniqueThreads = 2;
......@@ -402,16 +402,16 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
tsNumOfCommitThreads = TRANGE(tsNumOfCommitThreads, 2, 4);
if (cfgAddInt32(pCfg, "numOfCommitThreads", tsNumOfCommitThreads, 1, 1024, 0) != 0) return -1;
tsNumOfMnodeQueryThreads = tsNumOfCores / 8;
tsNumOfMnodeQueryThreads = TRANGE(tsNumOfMnodeQueryThreads, 1, 4);
tsNumOfMnodeQueryThreads = tsNumOfCores * 2;
tsNumOfMnodeQueryThreads = TRANGE(tsNumOfMnodeQueryThreads, 4, 8);
if (cfgAddInt32(pCfg, "numOfMnodeQueryThreads", tsNumOfMnodeQueryThreads, 1, 1024, 0) != 0) return -1;
tsNumOfMnodeReadThreads = tsNumOfCores / 8;
tsNumOfMnodeReadThreads = TRANGE(tsNumOfMnodeReadThreads, 1, 4);
if (cfgAddInt32(pCfg, "numOfMnodeReadThreads", tsNumOfMnodeReadThreads, 1, 1024, 0) != 0) return -1;
tsNumOfVnodeQueryThreads = tsNumOfCores / 4;
tsNumOfVnodeQueryThreads = TMAX(tsNumOfVnodeQueryThreads, 2);
tsNumOfVnodeQueryThreads = tsNumOfCores * 2;
tsNumOfVnodeQueryThreads = TMAX(tsNumOfVnodeQueryThreads, 4);
if (cfgAddInt32(pCfg, "numOfVnodeQueryThreads", tsNumOfVnodeQueryThreads, 1, 1024, 0) != 0) return -1;
tsNumOfVnodeStreamThreads = tsNumOfCores / 4;
......@@ -430,8 +430,8 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
tsNumOfVnodeSyncThreads = TMAX(tsNumOfVnodeSyncThreads, 1);
if (cfgAddInt32(pCfg, "numOfVnodeSyncThreads", tsNumOfVnodeSyncThreads, 1, 1024, 0) != 0) return -1;
tsNumOfQnodeQueryThreads = tsNumOfCores / 2;
tsNumOfQnodeQueryThreads = TMAX(tsNumOfQnodeQueryThreads, 1);
tsNumOfQnodeQueryThreads = tsNumOfCores * 2;
tsNumOfQnodeQueryThreads = TMAX(tsNumOfQnodeQueryThreads, 4);
if (cfgAddInt32(pCfg, "numOfQnodeQueryThreads", tsNumOfQnodeQueryThreads, 1, 1024, 0) != 0) return -1;
tsNumOfQnodeFetchThreads = tsNumOfCores / 2;
......
......@@ -190,6 +190,11 @@ int32_t tNameGetDbName(const SName* name, char* dst) {
return 0;
}
const char* tNameGetDbNameP(const SName* name) {
return &name->dbname[0];
}
int32_t tNameGetFullDbName(const SName* name, char* dst) {
assert(name != NULL && dst != NULL);
snprintf(dst, TSDB_DB_FNAME_LEN, "%d.%s", name->acctId, name->dbname);
......
......@@ -158,8 +158,8 @@ static void taosCleanupArgs() {
}
int main(int argc, char const *argv[]) {
if (!taosCheckSystemIsSmallEnd()) {
printf("failed to start since on non-small-end machines\n");
if (!taosCheckSystemIsLittleEnd()) {
printf("failed to start since on non-little-end machines\n");
return -1;
}
......
......@@ -460,8 +460,6 @@ typedef struct SCtgOperation {
#define CTG_FLAG_MAKE_STB(_isStb) (((_isStb) == 1) ? CTG_FLAG_STB : ((_isStb) == 0 ? CTG_FLAG_NOT_STB : CTG_FLAG_UNKNOWN_STB))
#define CTG_FLAG_MATCH_STB(_flag, tbType) (CTG_FLAG_IS_UNKNOWN_STB(_flag) || (CTG_FLAG_IS_STB(_flag) && (tbType) == TSDB_SUPER_TABLE) || (CTG_FLAG_IS_NOT_STB(_flag) && (tbType) != TSDB_SUPER_TABLE))
#define CTG_IS_SYS_DBNAME(_dbname) (((*(_dbname) == 'i') && (0 == strcmp(_dbname, TSDB_INFORMATION_SCHEMA_DB))) || ((*(_dbname) == 'p') && (0 == strcmp(_dbname, TSDB_PERFORMANCE_SCHEMA_DB))))
#define CTG_META_SIZE(pMeta) (sizeof(STableMeta) + ((pMeta)->tableInfo.numOfTags + (pMeta)->tableInfo.numOfColumns) * sizeof(SSchema))
#define CTG_TABLE_NOT_EXIST(code) (code == CTG_ERR_CODE_TABLE_NOT_EXIST)
......
......@@ -865,7 +865,7 @@ int32_t catalogChkTbMetaVersion(SCatalog* pCtg, SRequestConnInfo *pConn, SArray*
tNameFromString(&name, pTb->tbFName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
if (CTG_IS_SYS_DBNAME(name.dbname)) {
if (IS_SYS_DBNAME(name.dbname)) {
continue;
}
......@@ -936,7 +936,7 @@ int32_t catalogGetTableDistVgInfo(SCatalog* pCtg, SRequestConnInfo *pConn, const
CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT);
}
if (CTG_IS_SYS_DBNAME(pTableName->dbname)) {
if (IS_SYS_DBNAME(pTableName->dbname)) {
ctgError("no valid vgInfo for db, dbname:%s", pTableName->dbname);
CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT);
}
......@@ -947,7 +947,7 @@ int32_t catalogGetTableDistVgInfo(SCatalog* pCtg, SRequestConnInfo *pConn, const
int32_t catalogGetTableHashVgroup(SCatalog *pCtg, SRequestConnInfo *pConn, const SName *pTableName, SVgroupInfo *pVgroup) {
CTG_API_ENTER();
if (CTG_IS_SYS_DBNAME(pTableName->dbname)) {
if (IS_SYS_DBNAME(pTableName->dbname)) {
ctgError("no valid vgInfo for db, dbname:%s", pTableName->dbname);
CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT);
}
......
......@@ -132,7 +132,7 @@ void ctgReleaseDBCache(SCatalog *pCtg, SCtgDBCache *dbCache) {
int32_t ctgAcquireDBCacheImpl(SCatalog* pCtg, const char *dbFName, SCtgDBCache **pCache, bool acquire) {
char *p = strchr(dbFName, '.');
if (p && CTG_IS_SYS_DBNAME(p + 1)) {
if (p && IS_SYS_DBNAME(p + 1)) {
dbFName = p + 1;
}
......@@ -694,7 +694,7 @@ int32_t ctgDropDbCacheEnqueue(SCatalog* pCtg, const char *dbFName, int64_t dbId)
}
char *p = strchr(dbFName, '.');
if (p && CTG_IS_SYS_DBNAME(p + 1)) {
if (p && IS_SYS_DBNAME(p + 1)) {
dbFName = p + 1;
}
......@@ -727,7 +727,7 @@ int32_t ctgDropDbVgroupEnqueue(SCatalog* pCtg, const char *dbFName, bool syncOp)
}
char *p = strchr(dbFName, '.');
if (p && CTG_IS_SYS_DBNAME(p + 1)) {
if (p && IS_SYS_DBNAME(p + 1)) {
dbFName = p + 1;
}
......@@ -823,7 +823,7 @@ int32_t ctgUpdateVgroupEnqueue(SCatalog* pCtg, const char *dbFName, int64_t dbId
}
char *p = strchr(dbFName, '.');
if (p && CTG_IS_SYS_DBNAME(p + 1)) {
if (p && IS_SYS_DBNAME(p + 1)) {
dbFName = p + 1;
}
......@@ -859,7 +859,7 @@ int32_t ctgUpdateTbMetaEnqueue(SCatalog* pCtg, STableMetaOutput *output, bool sy
}
char *p = strchr(output->dbFName, '.');
if (p && CTG_IS_SYS_DBNAME(p + 1)) {
if (p && IS_SYS_DBNAME(p + 1)) {
memmove(output->dbFName, p + 1, strlen(p + 1));
}
......@@ -2123,7 +2123,7 @@ int32_t ctgStartUpdateThread() {
int32_t ctgGetTbMetaFromCache(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgTbMetaCtx* ctx, STableMeta** pTableMeta) {
if (CTG_IS_SYS_DBNAME(ctx->pName->dbname)) {
if (IS_SYS_DBNAME(ctx->pName->dbname)) {
CTG_FLAG_SET_SYS_DB(ctx->flag);
}
......@@ -2177,7 +2177,7 @@ _return:
}
int32_t ctgGetTbHashVgroupFromCache(SCatalog *pCtg, const SName *pTableName, SVgroupInfo **pVgroup) {
if (CTG_IS_SYS_DBNAME(pTableName->dbname)) {
if (IS_SYS_DBNAME(pTableName->dbname)) {
ctgError("no valid vgInfo for db, dbname:%s", pTableName->dbname);
CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
}
......
......@@ -375,6 +375,8 @@ int32_t ctgGetQnodeListFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, SArray
CTG_ERR_RET(ctgProcessRspMsg(out, reqType, rpcRsp.pCont, rpcRsp.contLen, rpcRsp.code, NULL));
rpcFreeCont(rpcRsp.pCont);
return TSDB_CODE_SUCCESS;
}
......@@ -408,6 +410,8 @@ int32_t ctgGetDnodeListFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, SArray
CTG_ERR_RET(ctgProcessRspMsg(out, reqType, rpcRsp.pCont, rpcRsp.contLen, rpcRsp.code, NULL));
rpcFreeCont(rpcRsp.pCont);
return TSDB_CODE_SUCCESS;
}
......@@ -447,6 +451,8 @@ int32_t ctgGetDBVgInfoFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, SBuildU
CTG_ERR_RET(ctgProcessRspMsg(out, reqType, rpcRsp.pCont, rpcRsp.contLen, rpcRsp.code, input->db));
rpcFreeCont(rpcRsp.pCont);
return TSDB_CODE_SUCCESS;
}
......@@ -485,6 +491,8 @@ int32_t ctgGetDBCfgFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const char
CTG_ERR_RET(ctgProcessRspMsg(out, reqType, rpcRsp.pCont, rpcRsp.contLen, rpcRsp.code, (char*)dbFName));
rpcFreeCont(rpcRsp.pCont);
return TSDB_CODE_SUCCESS;
}
......@@ -523,6 +531,8 @@ int32_t ctgGetIndexInfoFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const
CTG_ERR_RET(ctgProcessRspMsg(out, reqType, rpcRsp.pCont, rpcRsp.contLen, rpcRsp.code, (char*)indexName));
rpcFreeCont(rpcRsp.pCont);
return TSDB_CODE_SUCCESS;
}
......@@ -564,6 +574,8 @@ int32_t ctgGetTbIndexFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, SName *n
CTG_ERR_RET(ctgProcessRspMsg(out, reqType, rpcRsp.pCont, rpcRsp.contLen, rpcRsp.code, (char*)tbFName));
rpcFreeCont(rpcRsp.pCont);
return TSDB_CODE_SUCCESS;
}
......@@ -602,6 +614,8 @@ int32_t ctgGetUdfInfoFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const ch
CTG_ERR_RET(ctgProcessRspMsg(out, reqType, rpcRsp.pCont, rpcRsp.contLen, rpcRsp.code, (char*)funcName));
rpcFreeCont(rpcRsp.pCont);
return TSDB_CODE_SUCCESS;
}
......@@ -640,6 +654,8 @@ int32_t ctgGetUserDbAuthFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const
CTG_ERR_RET(ctgProcessRspMsg(out, reqType, rpcRsp.pCont, rpcRsp.contLen, rpcRsp.code, (char*)user));
rpcFreeCont(rpcRsp.pCont);
return TSDB_CODE_SUCCESS;
}
......@@ -683,6 +699,8 @@ int32_t ctgGetTbMetaFromMnodeImpl(SCatalog* pCtg, SRequestConnInfo *pConn, char
CTG_ERR_RET(ctgProcessRspMsg(out, reqType, rpcRsp.pCont, rpcRsp.contLen, rpcRsp.code, tbFName));
rpcFreeCont(rpcRsp.pCont);
return TSDB_CODE_SUCCESS;
}
......@@ -740,6 +758,8 @@ int32_t ctgGetTbMetaFromVnode(SCatalog* pCtg, SRequestConnInfo *pConn, const SNa
CTG_ERR_RET(ctgProcessRspMsg(out, reqType, rpcRsp.pCont, rpcRsp.contLen, rpcRsp.code, tbFName));
rpcFreeCont(rpcRsp.pCont);
return TSDB_CODE_SUCCESS;
}
......@@ -785,6 +805,8 @@ int32_t ctgGetTableCfgFromVnode(SCatalog* pCtg, SRequestConnInfo *pConn, const S
CTG_ERR_RET(ctgProcessRspMsg(out, reqType, rpcRsp.pCont, rpcRsp.contLen, rpcRsp.code, (char*)tbFName));
rpcFreeCont(rpcRsp.pCont);
return TSDB_CODE_SUCCESS;
}
......@@ -825,6 +847,8 @@ int32_t ctgGetTableCfgFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const S
CTG_ERR_RET(ctgProcessRspMsg(out, reqType, rpcRsp.pCont, rpcRsp.contLen, rpcRsp.code, (char*)tbFName));
rpcFreeCont(rpcRsp.pCont);
return TSDB_CODE_SUCCESS;
}
......@@ -859,6 +883,8 @@ int32_t ctgGetSvrVerFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, char **ou
CTG_ERR_RET(ctgProcessRspMsg(out, reqType, rpcRsp.pCont, rpcRsp.contLen, rpcRsp.code, NULL));
rpcFreeCont(rpcRsp.pCont);
return TSDB_CODE_SUCCESS;
}
......
......@@ -4291,7 +4291,7 @@ int32_t generateGroupIdMap(STableListInfo* pTableListInfo, SReadHandle* pHandle,
REPLACE_NODE(pNew);
} else {
taosMemoryFree(keyBuf);
nodesClearList(groupNew);
nodesDestroyList(groupNew);
metaReaderClear(&mr);
return code;
}
......@@ -4309,7 +4309,7 @@ int32_t generateGroupIdMap(STableListInfo* pTableListInfo, SReadHandle* pHandle,
if (tTagIsJson(data)) {
terrno = TSDB_CODE_QRY_JSON_IN_GROUP_ERROR;
taosMemoryFree(keyBuf);
nodesClearList(groupNew);
nodesDestroyList(groupNew);
metaReaderClear(&mr);
return terrno;
}
......@@ -4332,7 +4332,7 @@ int32_t generateGroupIdMap(STableListInfo* pTableListInfo, SReadHandle* pHandle,
info->groupId = groupId;
groupNum++;
nodesClearList(groupNew);
nodesDestroyList(groupNew);
metaReaderClear(&mr);
}
taosMemoryFree(keyBuf);
......
......@@ -359,6 +359,7 @@ void setTbNameColData(void* pMeta, const SSDataBlock* pBlock, SColumnInfoData* p
SScalarParam param = {.columnData = pColInfoData};
fpSet.process(&srcParam, 1, &param);
colDataDestroy(&infoData);
}
static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) {
......@@ -2045,8 +2046,8 @@ static SSDataBlock* sysTableScanUserTables(SOperatorInfo* pOperator) {
uint64_t suid = pInfo->pCur->mr.me.ctbEntry.suid;
int32_t code = metaGetTableEntryByUid(&mr, suid);
if (code != TSDB_CODE_SUCCESS) {
qError("failed to get super table meta, uid:0x%" PRIx64 ", code:%s, %s", suid, tstrerror(terrno),
GET_TASKID(pTaskInfo));
qError("failed to get super table meta, cname:%s, suid:0x%" PRIx64 ", code:%s, %s",
pInfo->pCur->mr.me.name, suid, tstrerror(terrno), GET_TASKID(pTaskInfo));
metaReaderClear(&mr);
metaCloseTbCursor(pInfo->pCur);
pInfo->pCur = NULL;
......@@ -2152,16 +2153,39 @@ static SSDataBlock* sysTableScanUserTables(SOperatorInfo* pOperator) {
}
}
static SSDataBlock* sysTableScanUserSTables(SOperatorInfo* pOperator) {
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SSysTableScanInfo* pInfo = pOperator->info;
if (pOperator->status == OP_EXEC_DONE) {
return NULL;
}
pInfo->pRes->info.rows = 0;
pOperator->status = OP_EXEC_DONE;
pInfo->loadInfo.totalRows += pInfo->pRes->info.rows;
return (pInfo->pRes->info.rows == 0) ? NULL : pInfo->pRes;
}
static SSDataBlock* doSysTableScan(SOperatorInfo* pOperator) {
// build message and send to mnode to fetch the content of system tables.
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SSysTableScanInfo* pInfo = pOperator->info;
const char* name = tNameGetTableName(&pInfo->name);
if (pInfo->showRewrite) {
char dbName[TSDB_DB_NAME_LEN] = {0};
getDBNameFromCondition(pInfo->pCondition, dbName);
sprintf(pInfo->req.db, "%d.%s", pInfo->accountId, dbName);
}
if (strncasecmp(name, TSDB_INS_TABLE_USER_TABLES, TSDB_TABLE_FNAME_LEN) == 0) {
return sysTableScanUserTables(pOperator);
} else if (strncasecmp(name, TSDB_INS_TABLE_USER_TAGS, TSDB_TABLE_FNAME_LEN) == 0) {
return sysTableScanUserTags(pOperator);
} else if (strncasecmp(name, TSDB_INS_TABLE_USER_STABLES, TSDB_TABLE_FNAME_LEN) == 0 && IS_SYS_DBNAME(pInfo->req.db)) {
return sysTableScanUserSTables(pOperator);
} else { // load the meta from mnode of the given epset
if (pOperator->status == OP_EXEC_DONE) {
return NULL;
......@@ -2172,12 +2196,6 @@ static SSDataBlock* doSysTableScan(SOperatorInfo* pOperator) {
strncpy(pInfo->req.tb, tNameGetTableName(&pInfo->name), tListLen(pInfo->req.tb));
strcpy(pInfo->req.user, pInfo->pUser);
if (pInfo->showRewrite) {
char dbName[TSDB_DB_NAME_LEN] = {0};
getDBNameFromCondition(pInfo->pCondition, dbName);
sprintf(pInfo->req.db, "%d.%s", pInfo->accountId, dbName);
}
int32_t contLen = tSerializeSRetrieveTableReq(NULL, 0, &pInfo->req);
char* buf1 = taosMemoryCalloc(1, contLen);
tSerializeSRetrieveTableReq(buf1, contLen, &pInfo->req);
......
......@@ -913,8 +913,8 @@ void udfdConnectMnodeThreadFunc(void *args) {
}
int main(int argc, char *argv[]) {
if (!taosCheckSystemIsSmallEnd()) {
printf("failed to start since on non-small-end machines\n");
if (!taosCheckSystemIsLittleEnd()) {
printf("failed to start since on non-little-end machines\n");
return -1;
}
......
......@@ -707,6 +707,8 @@ static int32_t sifCalculate(SNode *pNode, SIFParam *pDst) {
sifFreeParam(res);
taosHashRemove(ctx.pRes, (void *)&pNode, POINTER_BYTES);
}
sifFreeRes(ctx.pRes);
SIF_RET(code);
}
......
......@@ -1497,7 +1497,6 @@ static int32_t parseInsertBody(SInsertParseContext* pCxt) {
memset(&pCxt->tags, 0, sizeof(pCxt->tags));
pCxt->pVgroupsHashObj = NULL;
pCxt->pTableBlockHashObj = NULL;
pCxt->pTableMeta = NULL;
return TSDB_CODE_SUCCESS;
}
......@@ -1554,7 +1553,10 @@ int32_t parseInsertSql(SParseContext* pContext, SQuery** pQuery, SParseMetaCache
if (NULL == *pQuery) {
return TSDB_CODE_OUT_OF_MEMORY;
}
} else {
nodesDestroyNode((*pQuery)->pRoot);
}
(*pQuery)->execMode = QUERY_EXEC_MODE_SCHEDULE;
(*pQuery)->haveResultSet = false;
(*pQuery)->msgType = TDMT_VND_SUBMIT;
......
......@@ -678,6 +678,7 @@ void qFreeStmtDataBlock(void* pDataBlock) {
return;
}
taosMemoryFreeClear(((STableDataBlocks*)pDataBlock)->pTableMeta);
taosMemoryFreeClear(((STableDataBlocks*)pDataBlock)->pData);
taosMemoryFreeClear(pDataBlock);
}
......
......@@ -866,13 +866,15 @@ STableCfg* tableCfgDup(STableCfg* pCfg) {
memcpy(pNew, pCfg, sizeof(*pNew));
if (NULL != pNew->pComment) {
pNew->pComment = strdup(pNew->pComment);
pNew->pComment = taosMemoryCalloc(pNew->commentLen + 1, 1);
memcpy(pNew->pComment, pCfg->pComment, pNew->commentLen);
}
if (NULL != pNew->pFuncs) {
pNew->pFuncs = taosArrayDup(pNew->pFuncs);
}
if (NULL != pNew->pTags) {
pNew->pTags = strdup(pNew->pTags);
pNew->pTags = taosMemoryCalloc(pNew->tagsLen + 1, 1);
memcpy(pNew->pTags, pCfg->pTags, pNew->tagsLen);
}
int32_t schemaSize = (pCfg->numOfColumns + pCfg->numOfTags) * sizeof(SSchema);
......
......@@ -82,11 +82,16 @@ static int32_t parseSqlSyntax(SParseContext* pCxt, SQuery** pQuery, SParseMetaCa
}
static int32_t setValueByBindParam(SValueNode* pVal, TAOS_MULTI_BIND* pParam) {
if (IS_VAR_DATA_TYPE(pVal->node.resType.type)) {
taosMemoryFreeClear(pVal->datum.p);
}
if (pParam->is_null && 1 == *(pParam->is_null)) {
pVal->node.resType.type = TSDB_DATA_TYPE_NULL;
pVal->node.resType.bytes = tDataTypes[TSDB_DATA_TYPE_NULL].bytes;
return TSDB_CODE_SUCCESS;
}
int32_t inputSize = (NULL != pParam->length ? *(pParam->length) : tDataTypes[pParam->buffer_type].bytes);
pVal->node.resType.type = pParam->buffer_type;
pVal->node.resType.bytes = inputSize;
......
......@@ -148,11 +148,12 @@ void destroySendMsgInfo(SMsgSendInfo* pMsgBody) {
taosMemoryFreeClear(pMsgBody);
}
int32_t asyncSendMsgToServerExt(void* pTransporter, SEpSet* epSet, int64_t* pTransporterId, const SMsgSendInfo* pInfo,
int32_t asyncSendMsgToServerExt(void* pTransporter, SEpSet* epSet, int64_t* pTransporterId, SMsgSendInfo* pInfo,
bool persistHandle, void* rpcCtx) {
char* pMsg = rpcMallocCont(pInfo->msgInfo.len);
if (NULL == pMsg) {
qError("0x%" PRIx64 " msg:%s malloc failed", pInfo->requestId, TMSG_INFO(pInfo->msgType));
destroySendMsgInfo(pInfo);
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
return terrno;
}
......@@ -167,13 +168,15 @@ int32_t asyncSendMsgToServerExt(void* pTransporter, SEpSet* epSet, int64_t* pTra
.info.persistHandle = persistHandle,
.code = 0
};
assert(pInfo->fp != NULL);
TRACE_SET_ROOTID(&rpcMsg.info.traceId, pInfo->requestId);
rpcSendRequestWithCtx(pTransporter, epSet, &rpcMsg, pTransporterId, rpcCtx);
return TSDB_CODE_SUCCESS;
int code = rpcSendRequestWithCtx(pTransporter, epSet, &rpcMsg, pTransporterId, rpcCtx);
if (code) {
destroySendMsgInfo(pInfo);
}
return code;
}
int32_t asyncSendMsgToServer(void* pTransporter, SEpSet* epSet, int64_t* pTransporterId, const SMsgSendInfo* pInfo) {
int32_t asyncSendMsgToServer(void* pTransporter, SEpSet* epSet, int64_t* pTransporterId, SMsgSendInfo* pInfo) {
return asyncSendMsgToServerExt(pTransporter, epSet, pTransporterId, pInfo, false, NULL);
}
......
......@@ -509,7 +509,7 @@ int32_t schGenerateCallBackInfo(SSchJob *pJob, SSchTask *pTask, void *msg, uint3
SMsgSendInfo *msgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
if (NULL == msgSendInfo) {
SCH_TASK_ELOG("calloc %d failed", (int32_t)sizeof(SMsgSendInfo));
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
msgSendInfo->paramFreeFp = taosMemoryFree;
......@@ -535,7 +535,11 @@ int32_t schGenerateCallBackInfo(SSchJob *pJob, SSchTask *pTask, void *msg, uint3
_return:
if (msgSendInfo) {
destroySendMsgInfo(msgSendInfo);
}
taosMemoryFree(msg);
SCH_RET(code);
}
......@@ -843,6 +847,7 @@ int32_t schAsyncSendMsg(SSchJob *pJob, SSchTask *pTask, SSchTrans *trans, SQuery
int64_t transporterId = 0;
code = asyncSendMsgToServerExt(trans->pTrans, epSet, &transporterId, pMsgSendInfo, persistHandle, ctx);
pMsgSendInfo = NULL;
if (code) {
SCH_ERR_JRET(code);
}
......@@ -919,7 +924,9 @@ int32_t schBuildAndSendHbMsg(SQueryNodeEpId *nodeEpId, SArray *taskAction) {
addr.epSet.numOfEps = 1;
memcpy(&addr.epSet.eps[0], &nodeEpId->ep, sizeof(nodeEpId->ep));
SCH_ERR_JRET(schAsyncSendMsg(NULL, NULL, &trans, &addr, msgType, msg, msgSize, true, &rpcCtx));
code = schAsyncSendMsg(NULL, NULL, &trans, &addr, msgType, msg, msgSize, true, &rpcCtx);
msg = NULL;
SCH_ERR_JRET(code);
return TSDB_CODE_SUCCESS;
......@@ -1087,8 +1094,9 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr,
}
SSchTrans trans = {.pTrans = pJob->conn.pTrans, .pHandle = SCH_GET_TASK_HANDLE(pTask)};
SCH_ERR_JRET(
schAsyncSendMsg(pJob, pTask, &trans, addr, msgType, msg, msgSize, persistHandle, (rpcCtx.args ? &rpcCtx : NULL)));
schAsyncSendMsg(pJob, pTask, &trans, addr, msgType, msg, msgSize, persistHandle, (rpcCtx.args ? &rpcCtx : NULL));
msg = NULL;
SCH_ERR_JRET(code);
if (msgType == TDMT_SCH_QUERY || msgType == TDMT_SCH_MERGE_QUERY) {
SCH_ERR_RET(schAppendTaskExecNode(pJob, pTask, addr, pTask->execId));
......
......@@ -102,14 +102,14 @@ int32_t schRecordTaskSucceedNode(SSchJob *pJob, SSchTask *pTask) {
}
int32_t schAppendTaskExecNode(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr, int32_t execId) {
SSchNodeInfo nodeInfo = {.addr = *addr, .handle = NULL};
SSchNodeInfo nodeInfo = {.addr = *addr, .handle = SCH_GET_TASK_HANDLE(pTask)};
if (taosHashPut(pTask->execNodes, &execId, sizeof(execId), &nodeInfo, sizeof(nodeInfo))) {
SCH_TASK_ELOG("taosHashPut nodeInfo to execNodes failed, errno:%d", errno);
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
SCH_TASK_DLOG("task execNode added, execId:%d", execId);
SCH_TASK_DLOG("task execNode added, execId:%d, handle:%p", execId, nodeInfo.handle);
return TSDB_CODE_SUCCESS;
}
......@@ -752,12 +752,18 @@ void schDropTaskOnExecNode(SSchJob *pJob, SSchTask *pTask) {
return;
}
int32_t i = 0;
SSchNodeInfo *nodeInfo = taosHashIterate(pTask->execNodes, NULL);
while (nodeInfo) {
if (nodeInfo->handle) {
SCH_SET_TASK_HANDLE(pTask, nodeInfo->handle);
schBuildAndSendMsg(pJob, pTask, &nodeInfo->addr, TDMT_SCH_DROP_TASK);
SCH_TASK_DLOG("start to drop task's %dth execNode", i);
} else {
SCH_TASK_DLOG("no need to drop task %dth execNode", i);
}
++i;
nodeInfo = taosHashIterate(pTask->execNodes, nodeInfo);
}
......
......@@ -226,11 +226,13 @@ typedef struct {
int index;
int nAsync;
uv_async_t* asyncs;
int8_t stop;
} SAsyncPool;
SAsyncPool* transCreateAsyncPool(uv_loop_t* loop, int sz, void* arg, AsyncCB cb);
void transDestroyAsyncPool(SAsyncPool* pool);
int transAsyncSend(SAsyncPool* pool, queue* mq);
bool transAsyncPoolIsEmpty(SAsyncPool* pool);
#define TRANS_DESTROY_ASYNC_POOL_MSG(pool, msgType, freeFunc) \
do { \
......@@ -289,14 +291,14 @@ void transUnrefSrvHandle(void* handle);
void transRefCliHandle(void* handle);
void transUnrefCliHandle(void* handle);
void transReleaseCliHandle(void* handle);
void transReleaseSrvHandle(void* handle);
int transReleaseCliHandle(void* handle);
int transReleaseSrvHandle(void* handle);
void transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pMsg, STransCtx* pCtx);
void transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pMsg, STransMsg* pRsp);
void transSendResponse(const STransMsg* msg);
void transRegisterMsg(const STransMsg* msg);
void transSetDefaultAddr(void* shandle, const char* ip, const char* fqdn);
int transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pMsg, STransCtx* pCtx);
int transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pMsg, STransMsg* pRsp);
int transSendResponse(const STransMsg* msg);
int transRegisterMsg(const STransMsg* msg);
int transSetDefaultAddr(void* shandle, const char* ip, const char* fqdn);
int64_t transAllocHandle();
......
......@@ -25,7 +25,7 @@ void (*taosCloseHandle[])(void* arg) = {transCloseServer, transCloseClient};
void (*taosRefHandle[])(void* handle) = {transRefSrvHandle, transRefCliHandle};
void (*taosUnRefHandle[])(void* handle) = {transUnrefSrvHandle, transUnrefCliHandle};
void (*transReleaseHandle[])(void* handle) = {transReleaseSrvHandle, transReleaseCliHandle};
int (*transReleaseHandle[])(void* handle) = {transReleaseSrvHandle, transReleaseCliHandle};
static int32_t transValidLocalFqdn(const char* localFqdn, uint32_t* ip) {
*ip = taosGetIpv4FromFqdn(localFqdn);
......@@ -112,7 +112,7 @@ void* rpcMallocCont(int32_t contLen) {
void rpcFreeCont(void* cont) {
if (cont == NULL) return;
taosMemoryFree((char*)cont - TRANS_MSG_OVERHEAD);
tTrace("free mem:%p", (char*)cont - TRANS_MSG_OVERHEAD);
tTrace("rpc free cont:%p", (char*)cont - TRANS_MSG_OVERHEAD);
}
void* rpcReallocCont(void* ptr, int32_t contLen) {
......@@ -129,25 +129,20 @@ void* rpcReallocCont(void* ptr, int32_t contLen) {
return st + TRANS_MSG_OVERHEAD;
}
void rpcSendRedirectRsp(void* thandle, const SEpSet* pEpSet) {
// deprecated api
assert(0);
}
int32_t rpcReportProgress(void* pConn, char* pCont, int32_t contLen) { return -1; }
void rpcCancelRequest(int64_t rid) { return; }
void rpcSendRequest(void* shandle, const SEpSet* pEpSet, SRpcMsg* pMsg, int64_t* pRid) {
transSendRequest(shandle, pEpSet, pMsg, NULL);
int rpcSendRequest(void* shandle, const SEpSet* pEpSet, SRpcMsg* pMsg, int64_t* pRid) {
return transSendRequest(shandle, pEpSet, pMsg, NULL);
}
void rpcSendRequestWithCtx(void* shandle, const SEpSet* pEpSet, SRpcMsg* pMsg, int64_t* pRid, SRpcCtx* pCtx) {
transSendRequest(shandle, pEpSet, pMsg, pCtx);
int rpcSendRequestWithCtx(void* shandle, const SEpSet* pEpSet, SRpcMsg* pMsg, int64_t* pRid, SRpcCtx* pCtx) {
return transSendRequest(shandle, pEpSet, pMsg, pCtx);
}
void rpcSendRecv(void* shandle, SEpSet* pEpSet, SRpcMsg* pMsg, SRpcMsg* pRsp) {
transSendRecv(shandle, pEpSet, pMsg, pRsp);
int rpcSendRecv(void* shandle, SEpSet* pEpSet, SRpcMsg* pMsg, SRpcMsg* pRsp) {
return transSendRecv(shandle, pEpSet, pMsg, pRsp);
}
void rpcSendResponse(const SRpcMsg* pMsg) { transSendResponse(pMsg); }
int rpcSendResponse(const SRpcMsg* pMsg) { return transSendResponse(pMsg); }
void rpcRefHandle(void* handle, int8_t type) {
assert(type == TAOS_CONN_SERVER || type == TAOS_CONN_CLIENT);
......@@ -159,15 +154,15 @@ void rpcUnrefHandle(void* handle, int8_t type) {
(*taosUnRefHandle[type])(handle);
}
void rpcRegisterBrokenLinkArg(SRpcMsg* msg) { transRegisterMsg(msg); }
void rpcReleaseHandle(void* handle, int8_t type) {
int rpcRegisterBrokenLinkArg(SRpcMsg* msg) { return transRegisterMsg(msg); }
int rpcReleaseHandle(void* handle, int8_t type) {
assert(type == TAOS_CONN_SERVER || type == TAOS_CONN_CLIENT);
(*transReleaseHandle[type])(handle);
return (*transReleaseHandle[type])(handle);
}
void rpcSetDefaultAddr(void* thandle, const char* ip, const char* fqdn) {
int rpcSetDefaultAddr(void* thandle, const char* ip, const char* fqdn) {
// later
transSetDefaultAddr(thandle, ip, fqdn);
return transSetDefaultAddr(thandle, ip, fqdn);
}
void* rpcAllocHandle() { return (void*)transAllocHandle(); }
......
......@@ -70,6 +70,8 @@ typedef struct SCliThrd {
SCvtAddr cvtAddr;
SCliMsg* stopMsg;
bool quit;
} SCliThrd;
......@@ -761,14 +763,17 @@ void cliConnCb(uv_connect_t* req, int status) {
}
static void cliHandleQuit(SCliMsg* pMsg, SCliThrd* pThrd) {
if (!transAsyncPoolIsEmpty(pThrd->asyncPool)) {
pThrd->stopMsg = pMsg;
return;
}
pThrd->stopMsg = NULL;
pThrd->quit = true;
tDebug("cli work thread %p start to quit", pThrd);
destroyCmsg(pMsg);
destroyConnPool(pThrd->pool);
uv_timer_stop(&pThrd->timer);
uv_walk(pThrd->loop, cliWalkCb, NULL);
// uv_stop(pThrd->loop);
}
static void cliHandleRelease(SCliMsg* pMsg, SCliThrd* pThrd) {
int64_t refId = (int64_t)(pMsg->msg.info.handle);
......@@ -925,6 +930,7 @@ static void cliAsyncCb(uv_async_t* handle) {
if (count >= 2) {
tTrace("cli process batch size:%d", count);
}
if (pThrd->stopMsg != NULL) cliHandleQuit(pThrd->stopMsg, pThrd);
}
static void* cliWorkThread(void* arg) {
......@@ -1020,6 +1026,7 @@ void cliSendQuit(SCliThrd* thrd) {
SCliMsg* msg = taosMemoryCalloc(1, sizeof(SCliMsg));
msg->type = Quit;
transAsyncSend(thrd->asyncPool, &msg->q);
atomic_store_8(&thrd->asyncPool->stop, 1);
}
void cliWalkCb(uv_handle_t* handle, void* arg) {
if (!uv_is_closing(handle)) {
......@@ -1225,33 +1232,38 @@ SCliThrd* transGetWorkThrd(STrans* trans, int64_t handle, bool* validHandle) {
}
return pThrd;
}
void transReleaseCliHandle(void* handle) {
int transReleaseCliHandle(void* handle) {
int idx = -1;
bool valid = false;
SCliThrd* pThrd = transGetWorkThrdFromHandle((int64_t)handle, &valid);
if (pThrd == NULL) {
return;
return -1;
}
STransMsg tmsg = {.info.handle = handle};
SCliMsg* cmsg = taosMemoryCalloc(1, sizeof(SCliMsg));
cmsg->msg = tmsg;
cmsg->type = Release;
transAsyncSend(pThrd->asyncPool, &cmsg->q);
return;
if (0 != transAsyncSend(pThrd->asyncPool, &cmsg->q)) {
return -1;
}
return 0;
}
void transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransCtx* ctx) {
int transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransCtx* ctx) {
STrans* pTransInst = (STrans*)transAcquireExHandle(transGetInstMgt(), (int64_t)shandle);
if (pTransInst == NULL) return;
if (pTransInst == NULL) {
transFreeMsg(pReq->pCont);
return -1;
}
bool valid = false;
SCliThrd* pThrd = transGetWorkThrd(pTransInst, (int64_t)pReq->info.handle, &valid);
if (pThrd == NULL && valid == false) {
transFreeMsg(pReq->pCont);
transReleaseExHandle(transGetInstMgt(), (int64_t)shandle);
return;
return -1;
}
TRACE_SET_MSGID(&pReq->info.traceId, tGenIdPI64());
......@@ -1276,21 +1288,28 @@ void transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STra
STraceId* trace = &pReq->info.traceId;
tGDebug("%s send request at thread:%08" PRId64 ", dst:%s:%d, app:%p", transLabel(pTransInst), pThrd->pid,
EPSET_GET_INUSE_IP(&pCtx->epSet), EPSET_GET_INUSE_PORT(&pCtx->epSet), pReq->info.ahandle);
ASSERT(transAsyncSend(pThrd->asyncPool, &(cliMsg->q)) == 0);
if (0 != transAsyncSend(pThrd->asyncPool, &(cliMsg->q))) {
destroyCmsg(cliMsg);
transReleaseExHandle(transGetInstMgt(), (int64_t)shandle);
return;
return -1;
}
transReleaseExHandle(transGetInstMgt(), (int64_t)shandle);
return 0;
}
void transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransMsg* pRsp) {
int transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransMsg* pRsp) {
STrans* pTransInst = (STrans*)transAcquireExHandle(transGetInstMgt(), (int64_t)shandle);
if (pTransInst == NULL) return;
if (pTransInst == NULL) {
transFreeMsg(pReq->pCont);
return -1;
}
bool valid = false;
SCliThrd* pThrd = transGetWorkThrd(pTransInst, (int64_t)pReq->info.handle, &valid);
if (pThrd == NULL && valid == false) {
transFreeMsg(pReq->pCont);
transReleaseExHandle(transGetInstMgt(), (int64_t)shandle);
return;
return -1;
}
tsem_t* sem = taosMemoryCalloc(1, sizeof(tsem_t));
......@@ -1317,20 +1336,28 @@ void transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransM
tGDebug("%s send request at thread:%08" PRId64 ", dst:%s:%d, app:%p", transLabel(pTransInst), pThrd->pid,
EPSET_GET_INUSE_IP(&pCtx->epSet), EPSET_GET_INUSE_PORT(&pCtx->epSet), pReq->info.ahandle);
transAsyncSend(pThrd->asyncPool, &(cliMsg->q));
if (0 != transAsyncSend(pThrd->asyncPool, &cliMsg->q)) {
tsem_destroy(sem);
taosMemoryFree(sem);
destroyCmsg(cliMsg);
transReleaseExHandle(transGetInstMgt(), (int64_t)shandle);
return -1;
}
tsem_wait(sem);
tsem_destroy(sem);
taosMemoryFree(sem);
transReleaseExHandle(transGetInstMgt(), (int64_t)shandle);
return;
return 0;
}
/*
*
**/
void transSetDefaultAddr(void* shandle, const char* ip, const char* fqdn) {
int transSetDefaultAddr(void* shandle, const char* ip, const char* fqdn) {
STrans* pTransInst = (STrans*)transAcquireExHandle(transGetInstMgt(), (int64_t)shandle);
if (pTransInst == NULL) return;
if (pTransInst == NULL) {
return -1;
}
SCvtAddr cvtAddr = {0};
if (ip != NULL && fqdn != NULL) {
......@@ -1350,9 +1377,14 @@ void transSetDefaultAddr(void* shandle, const char* ip, const char* fqdn) {
SCliThrd* thrd = ((SCliObj*)pTransInst->tcphandle)->pThreadObj[i];
tDebug("%s update epset at thread:%08" PRId64, pTransInst->label, thrd->pid);
transAsyncSend(thrd->asyncPool, &(cliMsg->q));
if (transAsyncSend(thrd->asyncPool, &(cliMsg->q)) != 0) {
destroyCmsg(cliMsg);
transReleaseExHandle(transGetInstMgt(), (int64_t)shandle);
return -1;
}
}
transReleaseExHandle(transGetInstMgt(), (int64_t)shandle);
return 0;
}
int64_t transAllocHandle() {
......
......@@ -124,6 +124,7 @@ int transAllocBuffer(SConnBuffer* connBuf, uv_buf_t* uvBuf) {
SConnBuffer* p = connBuf;
if (p->cap == 0) {
p->buf = (char*)taosMemoryCalloc(CAPACITY, sizeof(char));
tTrace("internal malloc mem:%p, size:%d", p->buf, CAPACITY);
p->len = 0;
p->cap = CAPACITY;
p->total = -1;
......@@ -136,7 +137,7 @@ int transAllocBuffer(SConnBuffer* connBuf, uv_buf_t* uvBuf) {
} else {
p->cap = p->total;
p->buf = taosMemoryRealloc(p->buf, p->cap);
tTrace("internal malloc mem:%p, size:%d", p->buf, p->cap);
tTrace("internal realloc mem:%p, size:%d", p->buf, p->cap);
uvBuf->base = p->buf + p->len;
uvBuf->len = p->cap - p->len;
......@@ -176,7 +177,6 @@ int transSetConnOption(uv_tcp_t* stream) {
SAsyncPool* transCreateAsyncPool(uv_loop_t* loop, int sz, void* arg, AsyncCB cb) {
SAsyncPool* pool = taosMemoryCalloc(1, sizeof(SAsyncPool));
pool->index = 0;
pool->nAsync = sz;
pool->asyncs = taosMemoryCalloc(1, sizeof(uv_async_t) * pool->nAsync);
......@@ -206,6 +206,9 @@ void transDestroyAsyncPool(SAsyncPool* pool) {
taosMemoryFree(pool);
}
int transAsyncSend(SAsyncPool* pool, queue* q) {
if (atomic_load_8(&pool->stop) == 1) {
return -1;
}
int idx = pool->index;
idx = idx % pool->nAsync;
// no need mutex here
......@@ -225,6 +228,14 @@ int transAsyncSend(SAsyncPool* pool, queue* q) {
}
return uv_async_send(async);
}
bool transAsyncPoolIsEmpty(SAsyncPool* pool) {
for (int i = 0; i < pool->nAsync; i++) {
uv_async_t* async = &(pool->asyncs[i]);
SAsyncItem* item = async->data;
if (!QUEUE_IS_EMPTY(&item->qmsg)) return false;
}
return true;
}
void transCtxInit(STransCtx* ctx) {
// init transCtx
......@@ -240,7 +251,7 @@ void transCtxCleanup(STransCtx* ctx) {
ctx->freeFunc(iter->val);
iter = taosHashIterate(ctx->args, iter);
}
ctx->freeFunc(ctx->brokenVal.val);
taosHashCleanup(ctx->args);
ctx->args = NULL;
}
......
......@@ -1034,7 +1034,7 @@ void transUnrefSrvHandle(void* handle) {
}
}
void transReleaseSrvHandle(void* handle) {
int transReleaseSrvHandle(void* handle) {
SRpcHandleInfo* info = handle;
SExHandle* exh = info->handle;
int64_t refId = info->refId;
......@@ -1053,16 +1053,16 @@ void transReleaseSrvHandle(void* handle) {
tTrace("%s conn %p start to release", transLabel(pThrd->pTransInst), exh->handle);
transAsyncSend(pThrd->asyncPool, &m->q);
transReleaseExHandle(transGetRefMgt(), refId);
return;
return 0;
_return1:
tTrace("handle %p failed to send to release handle", exh);
transReleaseExHandle(transGetRefMgt(), refId);
return;
return -1;
_return2:
tTrace("handle %p failed to send to release handle", exh);
return;
return -1;
}
void transSendResponse(const STransMsg* msg) {
int transSendResponse(const STransMsg* msg) {
SExHandle* exh = msg->info.handle;
int64_t refId = msg->info.refId;
ASYNC_CHECK_HANDLE(exh, refId);
......@@ -1082,18 +1082,18 @@ void transSendResponse(const STransMsg* msg) {
tGTrace("conn %p start to send resp (1/2)", exh->handle);
transAsyncSend(pThrd->asyncPool, &m->q);
transReleaseExHandle(transGetRefMgt(), refId);
return;
return 0;
_return1:
tTrace("handle %p failed to send resp", exh);
rpcFreeCont(msg->pCont);
transReleaseExHandle(transGetRefMgt(), refId);
return;
return -1;
_return2:
tTrace("handle %p failed to send resp", exh);
rpcFreeCont(msg->pCont);
return;
return -1;
}
void transRegisterMsg(const STransMsg* msg) {
int transRegisterMsg(const STransMsg* msg) {
SExHandle* exh = msg->info.handle;
int64_t refId = msg->info.refId;
ASYNC_CHECK_HANDLE(exh, refId);
......@@ -1112,16 +1112,17 @@ void transRegisterMsg(const STransMsg* msg) {
tTrace("%s conn %p start to register brokenlink callback", transLabel(pTransInst), exh->handle);
transAsyncSend(pThrd->asyncPool, &m->q);
transReleaseExHandle(transGetRefMgt(), refId);
return;
return 0;
_return1:
tTrace("handle %p failed to register brokenlink", exh);
rpcFreeCont(msg->pCont);
transReleaseExHandle(transGetRefMgt(), refId);
return;
return -1;
_return2:
tTrace("handle %p failed to register brokenlink", exh);
rpcFreeCont(msg->pCont);
return -1;
}
int transGetConnInfo(void* thandle, STransHandleInfo* pConnInfo) { return -1; }
......
......@@ -210,7 +210,7 @@ static int32_t taosGetProcCpuInfo(ProcCpuInfo *cpuInfo) {
}
bool taosCheckSystemIsSmallEnd() {
bool taosCheckSystemIsLittleEnd() {
union check {
int16_t i;
char ch[2];
......
......@@ -218,7 +218,7 @@ typedef struct {
} CaseCtrl;
#if 0
CaseCtrl gCaseCtrl = { // default
CaseCtrl gCaseCtrl = {
.precision = TIME_PRECISION_MICRO,
.bindNullNum = 0,
.printCreateTblSql = false,
......@@ -251,7 +251,7 @@ CaseCtrl gCaseCtrl = { // default
#if 1
CaseCtrl gCaseCtrl = {
CaseCtrl gCaseCtrl = { // default
.precision = TIME_PRECISION_MILLI,
.bindNullNum = 0,
.printCreateTblSql = false,
......@@ -299,7 +299,7 @@ CaseCtrl gCaseCtrl = { // query case with specified col&oper
.printRes = true,
.runTimes = 0,
.caseRunIdx = -1,
.caseIdx = 23,
.caseIdx = 5,
.caseNum = 1,
.caseRunNum = 1,
};
......@@ -328,7 +328,7 @@ CaseCtrl gCaseCtrl = { // query case with specified col&oper
//.optrIdxList = optrIdxList,
//.bindColTypeNum = tListLen(bindColTypeList),
//.bindColTypeList = bindColTypeList,
.caseIdx = 24,
.caseIdx = 8,
.caseNum = 1,
.caseRunNum = 1,
};
......@@ -1384,6 +1384,7 @@ void bpCheckTagFields(TAOS_STMT *stmt, TAOS_MULTI_BIND* pBind) {
}
bpCheckColTagFields(stmt, fieldNum, pFields, gCurCase->bindTagNum, pBind, BP_BIND_TAG);
taosMemoryFree(pFields);
}
void bpCheckColFields(TAOS_STMT *stmt, TAOS_MULTI_BIND* pBind) {
......@@ -1401,12 +1402,13 @@ void bpCheckColFields(TAOS_STMT *stmt, TAOS_MULTI_BIND* pBind) {
}
bpCheckColTagFields(stmt, fieldNum, pFields, gCurCase->bindColNum, pBind, BP_BIND_COL);
taosMemoryFree(pFields);
}
void bpShowBindParam(TAOS_MULTI_BIND *bind, int32_t num) {
for (int32_t i = 0; i < num; ++i) {
TAOS_MULTI_BIND* b = &bind[i];
printf("Bind %d: type[%d],buf[%p],buflen[%d],len[%],null[%d],num[%d]\n",
printf("Bind %d: type[%d],buf[%p],buflen[%d],len[%d],null[%d],num[%d]\n",
i, b->buffer_type, b->buffer, b->buffer_length, b->length ? *b->length : 0, b->is_null ? *b->is_null : 0, b->num);
}
}
......@@ -2596,6 +2598,7 @@ void runAll(TAOS *taos) {
printf("%s Begin\n", gCaseCtrl.caseCatalog);
runCaseList(taos);
#if 0
strcpy(gCaseCtrl.caseCatalog, "Micro DB precision Test");
printf("%s Begin\n", gCaseCtrl.caseCatalog);
gCaseCtrl.precision = TIME_PRECISION_MICRO;
......@@ -2626,7 +2629,6 @@ void runAll(TAOS *taos) {
runCaseList(taos);
gCaseCtrl.bindRowNum = 0;
#if 0
strcpy(gCaseCtrl.caseCatalog, "Row Num Test");
printf("%s Begin\n", gCaseCtrl.caseCatalog);
gCaseCtrl.rowNum = 1000;
......@@ -2640,7 +2642,6 @@ void runAll(TAOS *taos) {
gCaseCtrl.runTimes = 2;
runCaseList(taos);
gCaseCtrl.runTimes = 0;
#endif
strcpy(gCaseCtrl.caseCatalog, "Check Param Test");
printf("%s Begin\n", gCaseCtrl.caseCatalog);
......@@ -2648,19 +2649,20 @@ void runAll(TAOS *taos) {
runCaseList(taos);
gCaseCtrl.checkParamNum = false;
#if 0
strcpy(gCaseCtrl.caseCatalog, "Bind Col Num Test");
printf("%s Begin\n", gCaseCtrl.caseCatalog);
gCaseCtrl.bindColNum = 6;
runCaseList(taos);
gCaseCtrl.bindColNum = 0;
#endif
/*
strcpy(gCaseCtrl.caseCatalog, "Bind Col Type Test");
printf("%s Begin\n", gCaseCtrl.caseCatalog);
gCaseCtrl.bindColTypeNum = tListLen(bindColTypeList);
gCaseCtrl.bindColTypeList = bindColTypeList;
runCaseList(taos);
#endif
*/
printf("All Test End\n");
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册