提交 96eee800 编写于 作者: D dapan1121

feature/qnode

上级 9484d25a
......@@ -48,6 +48,7 @@ typedef struct SScanLogicNode {
uint8_t scanFlag; // denotes reversed scan of data or not
STimeWindow scanRange;
SName tableName;
bool showRewrite;
} SScanLogicNode;
typedef struct SJoinLogicNode {
......@@ -171,6 +172,8 @@ typedef SScanPhysiNode SStreamScanPhysiNode;
typedef struct SSystemTableScanPhysiNode {
SScanPhysiNode scan;
SEpSet mgmtEpSet;
bool showRewrite;
int32_t accountId;
} SSystemTableScanPhysiNode;
typedef struct STableScanPhysiNode {
......
......@@ -54,6 +54,7 @@ typedef struct SQuery {
int32_t msgType;
SArray* pDbList;
SArray* pTableList;
bool showRewrite;
} SQuery;
int32_t qParseQuerySql(SParseContext* pCxt, SQuery** pQuery);
......
......@@ -29,6 +29,7 @@ typedef struct SPlanContext {
SNode* pAstRoot;
bool topicQuery;
bool streamQuery;
bool showRewrite;
} SPlanContext;
// Create the physical plan for the query, according to the AST.
......
......@@ -90,7 +90,7 @@ typedef struct {
int32_t msgType;
void *val;
int32_t (*clone)(void *src, void **dst);
void (*free)(void *arg);
void (*freeFunc)(const void *arg);
} SRpcBrokenlinkVal;
typedef struct {
......
......@@ -199,7 +199,8 @@ int32_t getPlan(SRequestObj* pRequest, SQuery* pQuery, SQueryPlan** pPlan, SArra
.queryId = pRequest->requestId,
.acctId = pRequest->pTscObj->acctId,
.mgmtEpSet = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp),
.pAstRoot = pQuery->pRoot
.pAstRoot = pQuery->pRoot,
.showRewrite = pQuery->showRewrite
};
int32_t code = qCreateQueryPlan(&cxt, pPlan, pNodeList);
if (code != 0) {
......
......@@ -357,6 +357,13 @@ static int32_t mndProcessRetrieveSysTableReq(SNodeMsg *pReq) {
// if free flag is set, client wants to clean the resources
if ((retrieveReq.free & TSDB_QUERY_TYPE_FREE_RESOURCE) != TSDB_QUERY_TYPE_FREE_RESOURCE) {
rowsRead = (*retrieveFp)(pReq, (SShowObj*) pShow, pRsp->data, rowsToRead);
if (rowsRead < 0) {
terrno = rowsRead;
rpcFreeCont(pRsp);
mDebug("show:0x%" PRIx64 ", retrieve completed", pShow->id);
mndReleaseShowObj((SShowObj*) pShow, true);
return -1;
}
}
mDebug("show:0x%" PRIx64 ", stop retrieve data, rowsRead:%d rowsToRead:%d", pShow->id, rowsRead, rowsToRead);
......
......@@ -1613,7 +1613,7 @@ static int32_t mndRetrieveStb(SNodeMsg *pReq, SShowObj *pShow, char *data, int32
SDbObj* pDb = NULL;
if (strlen(pShow->db) > 0) {
pDb = mndAcquireDb(pMnode, pShow->db);
if (pDb == NULL) return 0;
if (pDb == NULL) return terrno;
}
while (numOfRows < rows) {
......
......@@ -449,6 +449,8 @@ typedef struct SSysTableScanInfo {
SEpSet epSet;
tsem_t ready;
int32_t accountId;
bool showRewrite;
SNode* pCondition; // db_name filter condition, to discard data that are not in current database
void *pCur; // cursor for iterate the local table meta store.
SArray *scanCols; // SArray<int16_t> scan column id list
......@@ -655,7 +657,7 @@ SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SExprInfo* p
SOperatorInfo* createOrderOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SArray* pOrderVal, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createSortedMergeOperatorInfo(SOperatorInfo** downstream, int32_t numOfDownstream, SExprInfo* pExprInfo, int32_t num, SArray* pOrderVal, SArray* pGroupInfo, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createSysTableScanOperatorInfo(void* pSysTableReadHandle, SSDataBlock* pResBlock, const SName* pName,
SNode* pCondition, SEpSet epset, SArray* colList, SExecTaskInfo* pTaskInfo);
SNode* pCondition, SEpSet epset, SArray* colList, SExecTaskInfo* pTaskInfo, bool showRewrite, int32_t accountId);
SOperatorInfo* createLimitOperatorInfo(SOperatorInfo* downstream, SLimit* pLimit, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, SInterval* pInterval,
......
......@@ -5489,7 +5489,8 @@ SOperatorInfo* createStreamScanOperatorInfo(void *streamReadHandle, SSDataBlock*
}
static int32_t loadSysTableContentCb(void* param, const SDataBuf* pMsg, int32_t code) {
SSysTableScanInfo* pScanResInfo = (SSysTableScanInfo*) param;
SOperatorInfo* operator = (SOperatorInfo *)param;
SSysTableScanInfo* pScanResInfo = (SSysTableScanInfo *)operator->info;
if (TSDB_CODE_SUCCESS == code) {
pScanResInfo->pRsp = pMsg->pData;
......@@ -5498,6 +5499,8 @@ static int32_t loadSysTableContentCb(void* param, const SDataBuf* pMsg, int32_t
pRsp->useconds = htobe64(pRsp->useconds);
pRsp->handle = htobe64(pRsp->handle);
pRsp->compLen = htonl(pRsp->compLen);
} else {
operator->pTaskInfo->code = code;
}
tsem_post(&pScanResInfo->ready);
......@@ -5544,6 +5547,64 @@ static SSDataBlock* doFilterResult(SSysTableScanInfo* pInfo) {
return pInfo->pRes->info.rows == 0? NULL:pInfo->pRes;
}
EDealRes getDBNameFromConditionWalker(SNode* pNode, void* pContext) {
int32_t code = TSDB_CODE_SUCCESS;
ENodeType nType = nodeType(pNode);
switch (nType) {
case QUERY_NODE_OPERATOR: {
SOperatorNode *node = (SOperatorNode *)pNode;
if (OP_TYPE_EQUAL == node->opType) {
*(int32_t *)pContext = 1;
return DEAL_RES_CONTINUE;
}
*(int32_t *)pContext = 0;
return DEAL_RES_IGNORE_CHILD;
}
case QUERY_NODE_COLUMN: {
if (1 != *(int32_t *)pContext) {
return DEAL_RES_CONTINUE;
}
SColumnNode *node = (SColumnNode *)pNode;
if (TSDB_INS_USER_STABLES_DBNAME_COLID == node->colId) {
*(int32_t *)pContext = 2;
return DEAL_RES_CONTINUE;
}
*(int32_t *)pContext = 0;
return DEAL_RES_CONTINUE;
}
case QUERY_NODE_VALUE: {
if (2 != *(int32_t *)pContext) {
return DEAL_RES_CONTINUE;
}
SValueNode *node = (SValueNode *)pNode;
char *dbName = nodesGetValueFromNode(node);
strncpy(pContext, varDataVal(dbName), varDataLen(dbName));
*((char *)pContext + varDataLen(dbName)) = 0;
return DEAL_RES_ERROR; // stop walk
}
default:
break;
}
return DEAL_RES_CONTINUE;
}
void getDBNameFromCondition(SNode *pCondition, char *dbName) {
if (NULL == pCondition) {
return;
}
nodesWalkNode(pCondition, getDBNameFromConditionWalker, dbName);
}
static SSDataBlock* doSysTableScan(SOperatorInfo *pOperator, bool* newgroup) {
// build message and send to mnode to fetch the content of system tables.
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
......@@ -5600,7 +5661,11 @@ static SSDataBlock* doSysTableScan(SOperatorInfo *pOperator, bool* newgroup) {
pInfo->req.type = pInfo->type;
strncpy(pInfo->req.tb, tNameGetTableName(&pInfo->name), tListLen(pInfo->req.tb));
getFullDBNameFromCondition(pInfo->pCondition, pInfo->req.db));
if (pInfo->showRewrite) {
char dbName[TSDB_DB_NAME_LEN] = {0};
getDBNameFromCondition(pInfo->pCondition, dbName);
sprintf(pInfo->req.db, "%d.%s", pInfo->accountId, dbName);
}
int32_t contLen = tSerializeSRetrieveTableReq(NULL, 0, &pInfo->req);
char* buf1 = taosMemoryCalloc(1, contLen);
......@@ -5614,7 +5679,7 @@ static SSDataBlock* doSysTableScan(SOperatorInfo *pOperator, bool* newgroup) {
return NULL;
}
pMsgSendInfo->param = pInfo;
pMsgSendInfo->param = pOperator;
pMsgSendInfo->msgInfo.pData = buf1;
pMsgSendInfo->msgInfo.len = contLen;
pMsgSendInfo->msgType = TDMT_MND_SYSTABLE_RETRIEVE;
......@@ -5624,6 +5689,10 @@ static SSDataBlock* doSysTableScan(SOperatorInfo *pOperator, bool* newgroup) {
int32_t code = asyncSendMsgToServer(pInfo->pTransporter, &pInfo->epSet, &transporterId, pMsgSendInfo);
tsem_wait(&pInfo->ready);
if (pTaskInfo->code) {
return NULL;
}
SRetrieveMetaTableRsp* pRsp = pInfo->pRsp;
pInfo->req.showId = pRsp->handle;
......@@ -5645,7 +5714,7 @@ static SSDataBlock* doSysTableScan(SOperatorInfo *pOperator, bool* newgroup) {
}
SOperatorInfo* createSysTableScanOperatorInfo(void* pSysTableReadHandle, SSDataBlock* pResBlock, const SName* pName,
SNode* pCondition, SEpSet epset, SArray* colList, SExecTaskInfo* pTaskInfo) {
SNode* pCondition, SEpSet epset, SArray* colList, SExecTaskInfo* pTaskInfo, bool showRewrite, int32_t accountId) {
SSysTableScanInfo* pInfo = taosMemoryCalloc(1, sizeof(SSysTableScanInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) {
......@@ -5655,10 +5724,12 @@ SOperatorInfo* createSysTableScanOperatorInfo(void* pSysTableReadHandle, SSDataB
return NULL;
}
pInfo->pRes = pResBlock;
pInfo->capacity = 4096;
pInfo->pCondition = pCondition;
pInfo->scanCols = colList;
pInfo->accountId = accountId;
pInfo->showRewrite = showRewrite;
pInfo->pRes = pResBlock;
pInfo->capacity = 4096;
pInfo->pCondition = pCondition;
pInfo->scanCols = colList;
// TODO remove it
int32_t tableType = 0;
......@@ -8531,7 +8602,8 @@ SOperatorInfo* doCreateOperatorTreeNode(SPhysiNode* pPhyNode, SExecTaskInfo* pTa
SArray* colList = extractScanColumnId(pScanNode->pScanCols);
SOperatorInfo* pOperator = createSysTableScanOperatorInfo(pHandle->meta, pResBlock, &pScanNode->tableName,
pScanNode->node.pConditions, pSysScanPhyNode->mgmtEpSet, colList, pTaskInfo);
pScanNode->node.pConditions, pSysScanPhyNode->mgmtEpSet,
colList, pTaskInfo, pSysScanPhyNode->showRewrite, pSysScanPhyNode->accountId);
return pOperator;
} else {
ASSERT(0);
......
......@@ -237,6 +237,7 @@ static SNode* logicScanCopy(const SScanLogicNode* pSrc, SScanLogicNode* pDst) {
COPY_SCALAR_FIELD(scanFlag);
COPY_SCALAR_FIELD(scanRange);
COPY_SCALAR_FIELD(tableName);
COPY_SCALAR_FIELD(showRewrite);
return (SNode*)pDst;
}
......
......@@ -764,6 +764,8 @@ static int32_t jsonToEpSet(const SJson* pJson, void* pObj) {
}
static const char* jkSysTableScanPhysiPlanMnodeEpSet = "MnodeEpSet";
static const char* jkSysTableScanPhysiPlanShowRewrite = "ShowRewrite";
static const char* jkSysTableScanPhysiPlanAccountId = "AccountId";
static int32_t physiSysTableScanNodeToJson(const void* pObj, SJson* pJson) {
const SSystemTableScanPhysiNode* pNode = (const SSystemTableScanPhysiNode*)pObj;
......@@ -772,6 +774,12 @@ static int32_t physiSysTableScanNodeToJson(const void* pObj, SJson* pJson) {
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddObject(pJson, jkSysTableScanPhysiPlanMnodeEpSet, epSetToJson, &pNode->mgmtEpSet);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddBoolToObject(pJson, jkSysTableScanPhysiPlanShowRewrite, pNode->showRewrite);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkSysTableScanPhysiPlanAccountId, pNode->accountId);
}
return code;
}
......@@ -783,6 +791,12 @@ static int32_t jsonToPhysiSysTableScanNode(const SJson* pJson, void* pObj) {
if (TSDB_CODE_SUCCESS == code) {
code = tjsonToObject(pJson, jkSysTableScanPhysiPlanMnodeEpSet, jsonToEpSet, &pNode->mgmtEpSet);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetBoolValue(pJson, jkSysTableScanPhysiPlanShowRewrite, &pNode->showRewrite);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetNumberValue(pJson, jkSysTableScanPhysiPlanAccountId, pNode->accountId);
}
return code;
}
......
......@@ -1962,6 +1962,7 @@ static int32_t rewriteShow(STranslateContext* pCxt, SQuery* pQuery) {
code = createShowCondition((SShowStmt*)pQuery->pRoot, pStmt);
}
if (TSDB_CODE_SUCCESS == code) {
pQuery->showRewrite = true;
nodesDestroyNode(pQuery->pRoot);
pQuery->pRoot = (SNode*)pStmt;
}
......
......@@ -161,6 +161,7 @@ static int32_t createScanLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect
pScan->tableName.acctId = pCxt->pPlanCxt->acctId;
strcpy(pScan->tableName.dbname, pRealTable->table.dbName);
strcpy(pScan->tableName.tname, pRealTable->table.tableName);
pScan->showRewrite = pCxt->pPlanCxt->showRewrite;
// set columns to scan
SNodeList* pCols = NULL;
......
......@@ -331,18 +331,14 @@ static int32_t createSystemTableScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan*
return TSDB_CODE_OUT_OF_MEMORY;
}
pScan->showRewrite = pScanLogicNode->showRewrite;
pScan->accountId = pCxt->pPlanCxt->acctId;
if (0 == strcmp(pScanLogicNode->tableName.tname, TSDB_INS_TABLE_USER_TABLES)) {
vgroupInfoToNodeAddr(pScanLogicNode->pVgroupList->vgroups, &pSubplan->execNode);
taosArrayPush(pCxt->pExecNodeList, &pSubplan->execNode);
} else {
SQueryNodeAddr addr = { .nodeId = MND_VGID, .epSet = pCxt->pPlanCxt->mgmtEpSet };
taosArrayPush(pCxt->pExecNodeList, &addr);
//for (int32_t i = 0; i < pScanLogicNode->pVgroupList->numOfVgroups; ++i) {
// SQueryNodeAddr addr;
// vgroupInfoToNodeAddr(pScanLogicNode->pVgroupList->vgroups + i, &addr);
// taosArrayPush(pCxt->pExecNodeList, &addr);
//}
}
pScan->mgmtEpSet = pCxt->pPlanCxt->mgmtEpSet;
tNameGetFullDbName(&pScanLogicNode->tableName, pSubplan->dbFName);
......
......@@ -917,7 +917,7 @@ _return:
qwReleaseTaskCtx(mgmt, ctx);
}
if (TSDB_CODE_SUCCESS == code && readyConnection) {
if (readyConnection) {
qwBuildAndSendReadyRsp(readyConnection, code);
QW_TASK_DLOG("ready msg rsped, handle:%p, code:%x - %s", readyConnection->handle, code, tstrerror(code));
}
......
......@@ -74,15 +74,15 @@ void schFreeRpcCtx(SRpcCtx *pCtx) {
while (pIter) {
SRpcCtxVal *ctxVal = (SRpcCtxVal *)pIter;
(*ctxVal->free)(ctxVal->val);
(*ctxVal->freeFunc)(ctxVal->val);
pIter = taosHashIterate(pCtx->args, pIter);
}
taosHashCleanup(pCtx->args);
if (pCtx->brokenVal.free) {
(*pCtx->brokenVal.free)(pCtx->brokenVal.val);
if (pCtx->brokenVal.freeFunc) {
(*pCtx->brokenVal.freeFunc)(pCtx->brokenVal.val);
}
}
......@@ -1254,18 +1254,18 @@ int32_t schGetCallbackFp(int32_t msgType, __async_send_cb_fn_t *fp) {
return TSDB_CODE_SUCCESS;
}
void schFreeRpcCtxVal(void *arg) {
void schFreeRpcCtxVal(const void *arg) {
if (NULL == arg) {
return;
}
SMsgSendInfo* pMsgSendInfo = arg;
tfree(pMsgSendInfo->param);
tfree(pMsgSendInfo);
SMsgSendInfo* pMsgSendInfo = (SMsgSendInfo *)arg;
taosMemoryFreeClear(pMsgSendInfo->param);
taosMemoryFreeClear(pMsgSendInfo);
}
int32_t schMakeTaskCallbackParam(SSchJob *pJob, SSchTask *pTask, void **pParam) {
SSchTaskCallbackParam *param = calloc(1, sizeof(SSchTaskCallbackParam));
SSchTaskCallbackParam *param = taosMemoryCalloc(1, sizeof(SSchTaskCallbackParam));
if (NULL == param) {
SCH_TASK_ELOG("calloc %d failed", (int32_t)sizeof(SSchTaskCallbackParam));
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
......@@ -1282,7 +1282,7 @@ int32_t schMakeTaskCallbackParam(SSchJob *pJob, SSchTask *pTask, void **pParam)
}
int32_t schMakeHbCallbackParam(SSchJob *pJob, SSchTask *pTask, void **pParam) {
SSchHbCallbackParam *param = calloc(1, sizeof(SSchHbCallbackParam));
SSchHbCallbackParam *param = taosMemoryCalloc(1, sizeof(SSchHbCallbackParam));
if (NULL == param) {
SCH_TASK_ELOG("calloc %d failed", (int32_t)sizeof(SSchHbCallbackParam));
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
......@@ -1306,7 +1306,7 @@ int32_t schMakeBrokenLinkVal(SSchJob *pJob, SSchTask *pTask, SRpcBrokenlinkVal *
int32_t code = 0;
SMsgSendInfo* pMsgSendInfo = NULL;
pMsgSendInfo = calloc(1, sizeof(SMsgSendInfo));
pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
if (NULL == pMsgSendInfo) {
SCH_TASK_ELOG("calloc %d failed", (int32_t)sizeof(SMsgSendInfo));
SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
......@@ -1327,14 +1327,14 @@ int32_t schMakeBrokenLinkVal(SSchJob *pJob, SSchTask *pTask, SRpcBrokenlinkVal *
brokenVal->msgType = msgType;
brokenVal->val = pMsgSendInfo;
brokenVal->clone = schCloneSMsgSendInfo;
brokenVal->free = schFreeRpcCtxVal;
brokenVal->freeFunc = schFreeRpcCtxVal;
return TSDB_CODE_SUCCESS;
_return:
tfree(pMsgSendInfo->param);
tfree(pMsgSendInfo);
taosMemoryFreeClear(pMsgSendInfo->param);
taosMemoryFreeClear(pMsgSendInfo);
SCH_RET(code);
}
......@@ -1350,13 +1350,13 @@ int32_t schMakeQueryRpcCtx(SSchJob *pJob, SSchTask *pTask, SRpcCtx *pCtx) {
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
pMsgSendInfo = calloc(1, sizeof(SMsgSendInfo));
pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
if (NULL == pMsgSendInfo) {
SCH_TASK_ELOG("calloc %d failed", (int32_t)sizeof(SMsgSendInfo));
SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
param = calloc(1, sizeof(SSchTaskCallbackParam));
param = taosMemoryCalloc(1, sizeof(SSchTaskCallbackParam));
if (NULL == param) {
SCH_TASK_ELOG("calloc %d failed", (int32_t)sizeof(SSchTaskCallbackParam));
SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
......@@ -1374,7 +1374,7 @@ int32_t schMakeQueryRpcCtx(SSchJob *pJob, SSchTask *pTask, SRpcCtx *pCtx) {
pMsgSendInfo->param = param;
pMsgSendInfo->fp = fp;
SRpcCtxVal ctxVal = {.val = pMsgSendInfo, .clone = schCloneSMsgSendInfo, .free = schFreeRpcCtxVal};
SRpcCtxVal ctxVal = {.val = pMsgSendInfo, .clone = schCloneSMsgSendInfo, .freeFunc = schFreeRpcCtxVal};
if (taosHashPut(pCtx->args, &msgType, sizeof(msgType), &ctxVal, sizeof(ctxVal))) {
SCH_TASK_ELOG("taosHashPut msg %d to rpcCtx failed", msgType);
SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
......@@ -1387,8 +1387,8 @@ int32_t schMakeQueryRpcCtx(SSchJob *pJob, SSchTask *pTask, SRpcCtx *pCtx) {
_return:
taosHashCleanup(pCtx->args);
tfree(param);
tfree(pMsgSendInfo);
taosMemoryFreeClear(param);
taosMemoryFreeClear(pMsgSendInfo);
SCH_RET(code);
}
......@@ -1409,13 +1409,13 @@ int32_t schMakeHbRpcCtx(SSchJob *pJob, SSchTask *pTask, SRpcCtx *pCtx) {
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
pMsgSendInfo = calloc(1, sizeof(SMsgSendInfo));
pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
if (NULL == pMsgSendInfo) {
SCH_TASK_ELOG("calloc %d failed", (int32_t)sizeof(SMsgSendInfo));
SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
param = calloc(1, sizeof(SSchHbCallbackParam));
param = taosMemoryCalloc(1, sizeof(SSchHbCallbackParam));
if (NULL == param) {
SCH_TASK_ELOG("calloc %d failed", (int32_t)sizeof(SSchHbCallbackParam));
SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
......@@ -1431,7 +1431,7 @@ int32_t schMakeHbRpcCtx(SSchJob *pJob, SSchTask *pTask, SRpcCtx *pCtx) {
pMsgSendInfo->param = param;
pMsgSendInfo->fp = fp;
SRpcCtxVal ctxVal = {.val = pMsgSendInfo, .clone = schCloneSMsgSendInfo, .free = schFreeRpcCtxVal};
SRpcCtxVal ctxVal = {.val = pMsgSendInfo, .clone = schCloneSMsgSendInfo, .freeFunc = schFreeRpcCtxVal};
if (taosHashPut(pCtx->args, &msgType, sizeof(msgType), &ctxVal, sizeof(ctxVal))) {
SCH_TASK_ELOG("taosHashPut msg %d to rpcCtx failed", msgType);
SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
......@@ -1444,8 +1444,8 @@ int32_t schMakeHbRpcCtx(SSchJob *pJob, SSchTask *pTask, SRpcCtx *pCtx) {
_return:
taosHashCleanup(pCtx->args);
tfree(param);
tfree(pMsgSendInfo);
taosMemoryFreeClear(param);
taosMemoryFreeClear(pMsgSendInfo);
SCH_RET(code);
}
......@@ -1479,7 +1479,7 @@ int32_t schRegisterHbConnection(SSchJob *pJob, SSchTask *pTask, SQueryNodeEpId *
int32_t schCloneCallbackParam(SSchCallbackParamHeader *pSrc, SSchCallbackParamHeader **pDst) {
if (pSrc->isHbParam) {
SSchHbCallbackParam *dst = malloc(sizeof(SSchHbCallbackParam));
SSchHbCallbackParam *dst = taosMemoryMalloc(sizeof(SSchHbCallbackParam));
if (NULL == dst) {
qError("malloc SSchHbCallbackParam failed");
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
......@@ -1491,7 +1491,7 @@ int32_t schCloneCallbackParam(SSchCallbackParamHeader *pSrc, SSchCallbackParamHe
return TSDB_CODE_SUCCESS;
}
SSchTaskCallbackParam *dst = malloc(sizeof(SSchTaskCallbackParam));
SSchTaskCallbackParam *dst = taosMemoryMalloc(sizeof(SSchTaskCallbackParam));
if (NULL == dst) {
qError("malloc SSchTaskCallbackParam failed");
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
......@@ -1506,7 +1506,7 @@ int32_t schCloneCallbackParam(SSchCallbackParamHeader *pSrc, SSchCallbackParamHe
int32_t schCloneSMsgSendInfo(void *src, void **dst) {
SMsgSendInfo *pSrc = src;
int32_t code = 0;
SMsgSendInfo *pDst = malloc(sizeof(*pSrc));
SMsgSendInfo *pDst = taosMemoryMalloc(sizeof(*pSrc));
if (NULL == pDst) {
qError("malloc SMsgSendInfo for rpcCtx failed, len:%d", (int32_t)sizeof(*pSrc));
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
......@@ -1523,7 +1523,7 @@ int32_t schCloneSMsgSendInfo(void *src, void **dst) {
_return:
tfree(pDst);
taosMemoryFreeClear(pDst);
SCH_RET(code);
}
......@@ -1553,7 +1553,7 @@ int32_t schCloneHbRpcCtx(SRpcCtx *pSrc, SRpcCtx *pDst) {
if (taosHashPut(pDst->args, msgType, sizeof(*msgType), &dst, sizeof(dst))) {
qError("taosHashPut msg %d to rpcCtx failed", *msgType);
(*dst.free)(dst.val);
(*dst.freeFunc)(dst.val);
SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
......@@ -1601,8 +1601,9 @@ int32_t schAsyncSendMsg(SSchJob *pJob, SSchTask *pTask, void *transport, SEpSet*
pMsgSendInfo->msgType = msgType;
pMsgSendInfo->fp = fp;
qDebug("start to send %s msg, refId:%" PRIx64 "instance:%p, handle:%p",
TMSG_INFO(msgType), pJob->refId, trans->transInst, trans->transHandle);
qDebug("start to send %s msg to node[%d,%s,%d], refId:%" PRIx64 "instance:%p, handle:%p",
TMSG_INFO(msgType), ntohl(((SMsgHead *)msg)->vgId), epSet->eps[epSet->inUse].fqdn, epSet->eps[epSet->inUse].port,
pJob->refId, trans->transInst, trans->transHandle);
int64_t transporterId = 0;
code = asyncSendMsgToServerExt(trans->transInst, epSet, &transporterId, pMsgSendInfo, persistHandle, ctx);
......@@ -1649,7 +1650,7 @@ int32_t schBuildAndSendHbMsg(SQueryNodeEpId *nodeEpId) {
qError("tSerializeSSchedulerHbReq hbReq failed, size:%d", msgSize);
SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
void *msg = calloc(1, msgSize);
void *msg = taosMemoryCalloc(1, msgSize);
if (NULL == msg) {
qError("calloc hb req %d failed", msgSize);
SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
......@@ -1660,13 +1661,13 @@ int32_t schBuildAndSendHbMsg(SQueryNodeEpId *nodeEpId) {
SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
SMsgSendInfo *pMsgSendInfo = calloc(1, sizeof(SMsgSendInfo));
SMsgSendInfo *pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
if (NULL == pMsgSendInfo) {
qError("calloc SMsgSendInfo failed");
SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
SSchTaskCallbackParam *param = calloc(1, sizeof(SSchTaskCallbackParam));
SSchTaskCallbackParam *param = taosMemoryCalloc(1, sizeof(SSchTaskCallbackParam));
if (NULL == param) {
qError("calloc SSchTaskCallbackParam failed");
SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
......@@ -1702,9 +1703,9 @@ int32_t schBuildAndSendHbMsg(SQueryNodeEpId *nodeEpId) {
_return:
tfree(msg);
tfree(param);
tfree(pMsgSendInfo);
taosMemoryFreeClear(msg);
taosMemoryFreeClear(param);
taosMemoryFreeClear(pMsgSendInfo);
schFreeRpcCtx(&rpcCtx);
SCH_RET(code);
}
......
......@@ -805,7 +805,7 @@ TEST(queryTest, readyFirstCase) {
SRetrieveTableRsp *pRsp = (SRetrieveTableRsp *)data;
ASSERT_EQ(pRsp->completed, 1);
ASSERT_EQ(pRsp->numOfRows, 10);
tfree(data);
taosMemoryFreeClear(data);
data = NULL;
code = schedulerFetchRows(job, &data);
......
......@@ -39,11 +39,10 @@ endi
print =============== drop database
sql drop database d1
# todo release
#sql show databases
#if $rows != 1 then
# return -1
#endi
sql show databases
if $rows != 1 then
return -1
endi
print =============== more databases
sql create database d2 vgroups 2
......
......@@ -58,11 +58,10 @@ endi
print =============== step3
sql drop database $db
# todo release
#sql show databases
#if $rows != 1 then
# return -1
#endi
sql show databases
if $rows != 1 then
return -1
endi
print =============== step4
sql_error drop database $db
......@@ -319,4 +318,4 @@ if $rows != 0 then
return -1
endi
system sh/exec.sh -n dnode1 -s stop -x SIGINT
\ No newline at end of file
system sh/exec.sh -n dnode1 -s stop -x SIGINT
......@@ -61,6 +61,7 @@ endi
print ========== stop dnode2
system sh/exec.sh -n dnode2 -s stop -x SIGKILL
sleep 1000
print =============== create database
sql_error drop database d1
......
......@@ -5,9 +5,6 @@ system sh/exec.sh -n dnode1 -s start
system sh/exec.sh -n dnode2 -s start
sql connect
# todo remove
sql create database useless_db
print =============== show dnodes
sql show dnodes;
if $rows != 1 then
......@@ -83,9 +80,6 @@ if $data02 != master then
return -1
endi
# todo remove
sql drop database useless_db
print =============== create database
sql create database d1 vgroups 4;
sql create database d2;
......@@ -202,4 +196,4 @@ if $data00 != 1 then
endi
system sh/exec.sh -n dnode1 -s stop -x SIGINT
system sh/exec.sh -n dnode2 -s stop -x SIGINT
\ No newline at end of file
system sh/exec.sh -n dnode2 -s stop -x SIGINT
......@@ -6,9 +6,6 @@ system sh/exec.sh -n dnode1 -s start
sleep 500
sql connect
# todo remove
sql create database useless_db
$loop_cnt = 0
check_dnode_ready:
$loop_cnt = $loop_cnt + 1
......@@ -26,9 +23,6 @@ if $data04 != ready then
goto check_dnode_ready
endi
# todo remove
sql drop database useless_db
#root@trd02 /data2/dnode $ tmq_demo --help
#Used to tmq_demo
# -c Configuration directory, default is
......
......@@ -3,9 +3,6 @@ system sh/deploy.sh -n dnode1 -i 1
system sh/exec.sh -n dnode1 -s start
sql connect
# todo remove
sql create database useless_db
print =============== show users
sql show users
if $rows != 1 then
......@@ -74,7 +71,4 @@ print $data10 $data11 $data22
print $data20 $data11 $data22
print $data30 $data31 $data32
# todo remove
sql drop database useless_db
system sh/exec.sh -n dnode1 -s stop -x SIGINT
\ No newline at end of file
system sh/exec.sh -n dnode1 -s stop -x SIGINT
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册