未验证 提交 e75f14b9 编写于 作者: D dapan1121 提交者: GitHub

Merge pull request #13471 from taosdata/feature/qnode

feat: add force update option for catalog updating
......@@ -1982,6 +1982,7 @@ typedef struct {
typedef struct {
SClientHbKey connKey;
int64_t clusterId;
SQueryHbReqBasic* query;
SHashObj* info; // hash<Skv.key, Skv>
} SClientHbReq;
......
......@@ -68,6 +68,7 @@ typedef struct SCatalogReq {
SArray *pIndex; // element is index name
SArray *pUser; // element is SUserAuthInfo
bool qNodeRequired; // valid qnode
bool forceUpdate;
} SCatalogReq;
typedef struct SMetaData {
......@@ -280,7 +281,7 @@ int32_t catalogUpdateUserAuthInfo(SCatalog* pCtg, SGetUserAuthRsp* pAuth);
int32_t catalogUpdateVgEpSet(SCatalog* pCtg, const char* dbFName, int32_t vgId, SEpSet *epSet);
int32_t ctgdLaunchAsyncCall(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, uint64_t reqId);
int32_t ctgdLaunchAsyncCall(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, uint64_t reqId, bool forceUpdate);
/**
......
......@@ -222,7 +222,7 @@ extern int32_t (*queryProcessMsgRsp[TDMT_MAX])(void* output, char* msg, int32_t
|| (_type) == TDMT_VND_DROP_TABLE || (_type) == TDMT_VND_DROP_STB)
#define NEED_SCHEDULER_RETRY_ERROR(_code) \
((_code) == TSDB_CODE_RPC_REDIRECT || (_code) == TSDB_CODE_RPC_NETWORK_UNAVAIL)
((_code) == TSDB_CODE_RPC_REDIRECT || (_code) == TSDB_CODE_RPC_NETWORK_UNAVAIL || (_code) == TSDB_CODE_SCH_TIMEOUT_ERROR)
#define REQUEST_MAX_TRY_TIMES 1
......
......@@ -104,6 +104,8 @@ void schedulerAsyncFetchRows(int64_t job, schedulerFetchCallback fp, void* param
int32_t schedulerGetTasksStatus(int64_t job, SArray *pSub);
void schedulerStopQueryHb(void *pTrans);
/**
* Cancel query job
......
......@@ -57,11 +57,6 @@ enum {
typedef struct SAppInstInfo SAppInstInfo;
typedef struct {
void* param;
SClientHbReq* req;
} SHbConnInfo;
typedef struct {
char* key;
// statistics
......@@ -71,11 +66,8 @@ typedef struct {
int64_t startTime;
// ctl
SRWLatch lock; // lock is used in serialization
// connection
SAppInstInfo* pAppInstInfo;
// info
SHashObj* activeInfo; // hash<SClientHbKey, SClientHbReq>
SHashObj* connInfo; // hash<SClientHbKey, SHbConnInfo>
} SAppHbMgr;
typedef int32_t (*FHbRspHandle)(SAppHbMgr* pAppHbMgr, SClientHbRsp* pRsp);
......@@ -325,8 +317,6 @@ void appHbMgrCleanup(void);
int hbRegisterConn(SAppHbMgr* pAppHbMgr, int64_t tscRefId, int64_t clusterId, int8_t connType);
void hbDeregisterConn(SAppHbMgr* pAppHbMgr, SClientHbKey connKey);
int hbAddConnInfo(SAppHbMgr* pAppHbMgr, SClientHbKey connKey, void* key, void* value, int32_t keyLen, int32_t valueLen);
// --- mq
void hbMgrInitMqHbRspHandle();
......
......@@ -130,8 +130,13 @@ void destroyTscObj(void *pObj) {
SClientHbKey connKey = {.tscRid = pTscObj->id, .connType = pTscObj->connType};
hbDeregisterConn(pTscObj->pAppInfo->pAppHbMgr, connKey);
atomic_sub_fetch_64(&pTscObj->pAppInfo->numOfConns, 1);
int64_t connNum = atomic_sub_fetch_64(&pTscObj->pAppInfo->numOfConns, 1);
closeAllRequests(pTscObj->pRequests);
schedulerStopQueryHb(pTscObj->pAppInfo->pTransporter);
if (0 == connNum) {
// TODO
//closeTransporter(pTscObj);
}
tscDebug("connObj 0x%" PRIx64 " destroyed, totalConn:%" PRId64, pTscObj->id, pTscObj->pAppInfo->numOfConns);
taosThreadMutexDestroy(&pTscObj->mutex);
taosMemoryFreeClear(pTscObj);
......@@ -223,6 +228,10 @@ static void doDestroyRequest(void *p) {
taosHashRemove(pRequest->pTscObj->pRequests, &pRequest->self, sizeof(pRequest->self));
if (pRequest->body.queryJob != 0) {
schedulerFreeJob(pRequest->body.queryJob);
}
taosMemoryFreeClear(pRequest->msgBuf);
taosMemoryFreeClear(pRequest->sqlstr);
taosMemoryFreeClear(pRequest->pDb);
......@@ -230,10 +239,6 @@ static void doDestroyRequest(void *p) {
doFreeReqResultInfo(&pRequest->body.resInfo);
qDestroyQueryPlan(pRequest->body.pDag);
if (pRequest->body.queryJob != 0) {
schedulerFreeJob(pRequest->body.queryJob);
}
taosArrayDestroy(pRequest->tableList);
taosArrayDestroy(pRequest->dbList);
......
......@@ -129,9 +129,9 @@ static int32_t hbProcessStbInfoRsp(void *value, int32_t valueLen, struct SCatalo
}
static int32_t hbQueryHbRspHandle(SAppHbMgr *pAppHbMgr, SClientHbRsp *pRsp) {
SHbConnInfo *info = taosHashGet(pAppHbMgr->connInfo, &pRsp->connKey, sizeof(SClientHbKey));
if (NULL == info) {
tscWarn("fail to get connInfo, may be dropped, refId:%" PRIx64 ", type:%d", pRsp->connKey.tscRid,
SClientHbReq *pReq = taosHashGet(pAppHbMgr->activeInfo, &pRsp->connKey, sizeof(SClientHbKey));
if (NULL == pReq) {
tscWarn("pReq to get activeInfo, may be dropped, refId:%" PRIx64 ", type:%d", pRsp->connKey.tscRid,
pRsp->connKey.connType);
return TSDB_CODE_SUCCESS;
}
......@@ -181,12 +181,11 @@ static int32_t hbQueryHbRspHandle(SAppHbMgr *pAppHbMgr, SClientHbRsp *pRsp) {
break;
}
int64_t *clusterId = (int64_t *)info->param;
struct SCatalog *pCatalog = NULL;
int32_t code = catalogGetHandle(*clusterId, &pCatalog);
int32_t code = catalogGetHandle(pReq->clusterId, &pCatalog);
if (code != TSDB_CODE_SUCCESS) {
tscWarn("catalogGetHandle failed, clusterId:%" PRIx64 ", error:%s", *clusterId, tstrerror(code));
tscWarn("catalogGetHandle failed, clusterId:%" PRIx64 ", error:%s", pReq->clusterId, tstrerror(code));
break;
}
......@@ -199,12 +198,11 @@ static int32_t hbQueryHbRspHandle(SAppHbMgr *pAppHbMgr, SClientHbRsp *pRsp) {
break;
}
int64_t *clusterId = (int64_t *)info->param;
struct SCatalog *pCatalog = NULL;
int32_t code = catalogGetHandle(*clusterId, &pCatalog);
int32_t code = catalogGetHandle(pReq->clusterId, &pCatalog);
if (code != TSDB_CODE_SUCCESS) {
tscWarn("catalogGetHandle failed, clusterId:%" PRIx64 ", error:%s", *clusterId, tstrerror(code));
tscWarn("catalogGetHandle failed, clusterId:%" PRIx64 ", error:%s", pReq->clusterId, tstrerror(code));
break;
}
......@@ -217,12 +215,11 @@ static int32_t hbQueryHbRspHandle(SAppHbMgr *pAppHbMgr, SClientHbRsp *pRsp) {
break;
}
int64_t *clusterId = (int64_t *)info->param;
struct SCatalog *pCatalog = NULL;
int32_t code = catalogGetHandle(*clusterId, &pCatalog);
int32_t code = catalogGetHandle(pReq->clusterId, &pCatalog);
if (code != TSDB_CODE_SUCCESS) {
tscWarn("catalogGetHandle failed, clusterId:%" PRIx64 ", error:%s", *clusterId, tstrerror(code));
tscWarn("catalogGetHandle failed, clusterId:%" PRIx64 ", error:%s", pReq->clusterId, tstrerror(code));
break;
}
......@@ -547,13 +544,10 @@ SClientHbBatchReq *hbGatherAllInfo(SAppHbMgr *pAppHbMgr) {
pOneReq = taosArrayPush(pBatchReq->reqs, pOneReq);
SHbConnInfo *info = taosHashGet(pAppHbMgr->connInfo, &pOneReq->connKey, sizeof(SClientHbKey));
if (info) {
code = (*clientHbMgr.reqHandle[pOneReq->connKey.connType])(&pOneReq->connKey, info->param, pOneReq);
if (code) {
pIter = taosHashIterate(pAppHbMgr->activeInfo, pIter);
continue;
}
code = (*clientHbMgr.reqHandle[pOneReq->connKey.connType])(&pOneReq->connKey, &pOneReq->clusterId, pOneReq);
if (code) {
pIter = taosHashIterate(pAppHbMgr->activeInfo, pIter);
continue;
}
//hbClearClientHbReq(pOneReq);
......@@ -569,23 +563,6 @@ SClientHbBatchReq *hbGatherAllInfo(SAppHbMgr *pAppHbMgr) {
return pBatchReq;
}
void hbClearReqInfo(SAppHbMgr *pAppHbMgr) {
void *pIter = taosHashIterate(pAppHbMgr->activeInfo, NULL);
while (pIter != NULL) {
SClientHbReq *pOneReq = pIter;
tFreeReqKvHash(pOneReq->info);
taosHashClear(pOneReq->info);
if (pOneReq->query) {
taosArrayDestroy(pOneReq->query->queryDesc);
taosMemoryFreeClear(pOneReq->query);
}
pIter = taosHashIterate(pAppHbMgr->activeInfo, pIter);
}
}
void hbThreadFuncUnexpectedStopped(void) {
atomic_store_8(&clientHbMgr.threadStop, 2);
}
......@@ -715,14 +692,6 @@ SAppHbMgr *appHbMgrInit(SAppInstInfo *pAppInstInfo, char *key) {
}
taosHashSetFreeFp(pAppHbMgr->activeInfo, tFreeClientHbReq);
// init getInfoFunc
pAppHbMgr->connInfo = taosHashInit(64, hbKeyHashFunc, 1, HASH_ENTRY_LOCK);
if (pAppHbMgr->connInfo == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
taosMemoryFree(pAppHbMgr);
return NULL;
}
taosThreadMutexLock(&clientHbMgr.lock);
taosArrayPush(clientHbMgr.appHbMgrs, &pAppHbMgr);
......@@ -745,15 +714,6 @@ void appHbMgrCleanup(void) {
taosHashCleanup(pTarget->activeInfo);
pTarget->activeInfo = NULL;
pIter = taosHashIterate(pTarget->connInfo, NULL);
while (pIter != NULL) {
SHbConnInfo *info = pIter;
taosMemoryFree(info->param);
pIter = taosHashIterate(pTarget->connInfo, pIter);
}
taosHashCleanup(pTarget->connInfo);
pTarget->connInfo = NULL;
taosMemoryFree(pTarget->key);
taosMemoryFree(pTarget);
}
......@@ -791,7 +751,7 @@ void hbMgrCleanUp() {
clientHbMgr.appHbMgrs = NULL;
}
int hbRegisterConnImpl(SAppHbMgr *pAppHbMgr, SClientHbKey connKey, SHbConnInfo *info) {
int hbRegisterConnImpl(SAppHbMgr *pAppHbMgr, SClientHbKey connKey, int64_t clusterId) {
// init hash in activeinfo
void *data = taosHashGet(pAppHbMgr->activeInfo, &connKey, sizeof(SClientHbKey));
if (data != NULL) {
......@@ -799,17 +759,11 @@ int hbRegisterConnImpl(SAppHbMgr *pAppHbMgr, SClientHbKey connKey, SHbConnInfo *
}
SClientHbReq hbReq = {0};
hbReq.connKey = connKey;
hbReq.clusterId = clusterId;
//hbReq.info = taosHashInit(64, hbKeyHashFunc, 1, HASH_ENTRY_LOCK);
taosHashPut(pAppHbMgr->activeInfo, &connKey, sizeof(SClientHbKey), &hbReq, sizeof(SClientHbReq));
// init hash
if (info != NULL) {
SClientHbReq *pReq = taosHashGet(pAppHbMgr->activeInfo, &connKey, sizeof(SClientHbKey));
info->req = pReq;
taosHashPut(pAppHbMgr->connInfo, &connKey, sizeof(SClientHbKey), info, sizeof(SHbConnInfo));
}
atomic_add_fetch_32(&pAppHbMgr->connKeyCnt, 1);
return 0;
}
......@@ -819,15 +773,10 @@ int hbRegisterConn(SAppHbMgr *pAppHbMgr, int64_t tscRefId, int64_t clusterId, in
.tscRid = tscRefId,
.connType = connType,
};
SHbConnInfo info = {0};
switch (connType) {
case CONN_TYPE__QUERY: {
int64_t *pClusterId = taosMemoryMalloc(sizeof(int64_t));
*pClusterId = clusterId;
info.param = pClusterId;
return hbRegisterConnImpl(pAppHbMgr, connKey, &info);
return hbRegisterConnImpl(pAppHbMgr, connKey, clusterId);
}
case CONN_TYPE__TMQ: {
return 0;
......@@ -844,26 +793,10 @@ void hbDeregisterConn(SAppHbMgr *pAppHbMgr, SClientHbKey connKey) {
taosHashRemove(pAppHbMgr->activeInfo, &connKey, sizeof(SClientHbKey));
}
SHbConnInfo *info = taosHashGet(pAppHbMgr->connInfo, &connKey, sizeof(SClientHbKey));
if (info) {
taosMemoryFree(info->param);
taosHashRemove(pAppHbMgr->connInfo, &connKey, sizeof(SClientHbKey));
}
if (NULL == pReq || NULL == info) {
if (NULL == pReq) {
return;
}
atomic_sub_fetch_32(&pAppHbMgr->connKeyCnt, 1);
}
int hbAddConnInfo(SAppHbMgr *pAppHbMgr, SClientHbKey connKey, void *key, void *value, int32_t keyLen,
int32_t valueLen) {
// find req by connection id
SClientHbReq *pReq = taosHashGet(pAppHbMgr->activeInfo, &connKey, sizeof(SClientHbKey));
ASSERT(pReq != NULL);
taosHashPut(pReq->info, key, keyLen, value, valueLen);
return 0;
}
......@@ -180,7 +180,7 @@ int32_t processUseDbRsp(void* param, const SDataBuf* pMsg, int32_t code) {
taosMemoryFreeClear(output.dbVgroup);
tscError("0x%" PRIx64 " failed to build use db output since %s", pRequest->requestId, terrstr());
} else if (output.dbVgroup) {
} else if (output.dbVgroup && output.dbVgroup->vgHash) {
struct SCatalog* pCatalog = NULL;
int32_t code1 = catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog);
......
......@@ -2332,6 +2332,8 @@ static int32_t isSchemalessDb(SSmlHandle* info){
smlBuildInvalidDataMsg(&info->msgBuf, "catalogGetDBCfg error, code:", tstrerror(code));
return code;
}
taosArrayDestroy(pInfo.pRetensions);
if (!pInfo.schemaless){
info->pRequest->code = TSDB_CODE_SML_INVALID_DB_CONF;
smlBuildInvalidDataMsg(&info->msgBuf, "can not insert into schemaless db:", dbFname);
......
......@@ -52,6 +52,7 @@ enum {
CTG_OP_UPDATE_VGROUP = 0,
CTG_OP_UPDATE_TB_META,
CTG_OP_DROP_DB_CACHE,
CTG_OP_DROP_DB_VGROUP,
CTG_OP_DROP_STB_META,
CTG_OP_DROP_TB_META,
CTG_OP_UPDATE_USER,
......@@ -266,26 +267,32 @@ typedef struct SCtgUpdateTblMsg {
STableMetaOutput* output;
} SCtgUpdateTblMsg;
typedef struct SCtgRemoveDBMsg {
typedef struct SCtgDropDBMsg {
SCatalog* pCtg;
char dbFName[TSDB_DB_FNAME_LEN];
uint64_t dbId;
} SCtgRemoveDBMsg;
} SCtgDropDBMsg;
typedef struct SCtgRemoveStbMsg {
typedef struct SCtgDropDbVgroupMsg {
SCatalog* pCtg;
char dbFName[TSDB_DB_FNAME_LEN];
} SCtgDropDbVgroupMsg;
typedef struct SCtgDropStbMetaMsg {
SCatalog* pCtg;
char dbFName[TSDB_DB_FNAME_LEN];
char stbName[TSDB_TABLE_NAME_LEN];
uint64_t dbId;
uint64_t suid;
} SCtgRemoveStbMsg;
} SCtgDropStbMetaMsg;
typedef struct SCtgRemoveTblMsg {
typedef struct SCtgDropTblMetaMsg {
SCatalog* pCtg;
char dbFName[TSDB_DB_FNAME_LEN];
char tbName[TSDB_TABLE_NAME_LEN];
uint64_t dbId;
} SCtgRemoveTblMsg;
} SCtgDropTblMetaMsg;
typedef struct SCtgUpdateUserMsg {
SCatalog* pCtg;
......@@ -451,6 +458,7 @@ int32_t ctgGetTbMetaFromCache(CTG_PARAMS, SCtgTbMetaCtx* ctx, STableMeta** pTabl
int32_t ctgOpUpdateVgroup(SCtgCacheOperation *action);
int32_t ctgOpUpdateTbMeta(SCtgCacheOperation *action);
int32_t ctgOpDropDbCache(SCtgCacheOperation *action);
int32_t ctgOpDropDbVgroup(SCtgCacheOperation *action);
int32_t ctgOpDropStbMeta(SCtgCacheOperation *action);
int32_t ctgOpDropTbMeta(SCtgCacheOperation *action);
int32_t ctgOpUpdateUser(SCtgCacheOperation *action);
......@@ -464,6 +472,7 @@ int32_t ctgReadTbMetaFromCache(SCatalog* pCtg, SCtgTbMetaCtx* ctx, STableMeta**
int32_t ctgReadTbVerFromCache(SCatalog *pCtg, const SName *pTableName, int32_t *sver, int32_t *tver, int32_t *tbType, uint64_t *suid, char *stbName);
int32_t ctgChkAuthFromCache(SCatalog* pCtg, const char* user, const char* dbFName, AUTH_TYPE type, bool *inCache, bool *pass);
int32_t ctgDropDbCacheEnqueue(SCatalog* pCtg, const char *dbFName, int64_t dbId);
int32_t ctgDropDbVgroupEnqueue(SCatalog* pCtg, const char *dbFName, bool syncReq);
int32_t ctgDropStbMetaEnqueue(SCatalog* pCtg, const char *dbFName, int64_t dbId, const char *stbName, uint64_t suid, bool syncReq);
int32_t ctgDropTbMetaEnqueue(SCatalog* pCtg, const char *dbFName, int64_t dbId, const char *tbName, bool syncReq);
int32_t ctgUpdateVgroupEnqueue(SCatalog* pCtg, const char *dbFName, int64_t dbId, SDBVgInfo* dbInfo, bool syncReq);
......
......@@ -286,6 +286,9 @@ int32_t ctgInitJob(CTG_PARAMS, SCtgJob** job, uint64_t reqId, const SCatalogReq*
int32_t taskIdx = 0;
for (int32_t i = 0; i < dbVgNum; ++i) {
char* dbFName = taosArrayGet(pReq->pDbVgroup, i);
if (pReq->forceUpdate) {
ctgDropDbVgroupEnqueue(pCtg, dbFName, true);
}
CTG_ERR_JRET(ctgInitGetDbVgTask(pJob, taskIdx++, dbFName));
}
......@@ -301,6 +304,9 @@ int32_t ctgInitJob(CTG_PARAMS, SCtgJob** job, uint64_t reqId, const SCatalogReq*
for (int32_t i = 0; i < tbMetaNum; ++i) {
SName* name = taosArrayGet(pReq->pTableMeta, i);
if (pReq->forceUpdate) {
catalogRemoveTableMeta(pCtg, name);
}
CTG_ERR_JRET(ctgInitGetTbMetaTask(pJob, taskIdx++, name));
}
......
......@@ -35,6 +35,11 @@ SCtgOperation gCtgCacheOperation[CTG_OP_MAX] = {
"drop DB",
ctgOpDropDbCache
},
{
CTG_OP_DROP_DB_VGROUP,
"drop DBVgroup",
ctgOpDropDbVgroup
},
{
CTG_OP_DROP_STB_META,
"drop stbMeta",
......@@ -563,9 +568,9 @@ int32_t ctgEnqueue(SCatalog* pCtg, SCtgCacheOperation *operation) {
int32_t ctgDropDbCacheEnqueue(SCatalog* pCtg, const char *dbFName, int64_t dbId) {
int32_t code = 0;
SCtgCacheOperation action= {.opId = CTG_OP_DROP_DB_CACHE};
SCtgRemoveDBMsg *msg = taosMemoryMalloc(sizeof(SCtgRemoveDBMsg));
SCtgDropDBMsg *msg = taosMemoryMalloc(sizeof(SCtgDropDBMsg));
if (NULL == msg) {
ctgError("malloc %d failed", (int32_t)sizeof(SCtgRemoveDBMsg));
ctgError("malloc %d failed", (int32_t)sizeof(SCtgDropDBMsg));
CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
}
......@@ -590,13 +595,43 @@ _return:
CTG_RET(code);
}
int32_t ctgDropDbVgroupEnqueue(SCatalog* pCtg, const char *dbFName, bool syncOp) {
int32_t code = 0;
SCtgCacheOperation action= {.opId = CTG_OP_DROP_DB_VGROUP, .syncOp = syncOp};
SCtgDropDbVgroupMsg *msg = taosMemoryMalloc(sizeof(SCtgDropDbVgroupMsg));
if (NULL == msg) {
ctgError("malloc %d failed", (int32_t)sizeof(SCtgDropDbVgroupMsg));
CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
}
char *p = strchr(dbFName, '.');
if (p && CTG_IS_SYS_DBNAME(p + 1)) {
dbFName = p + 1;
}
msg->pCtg = pCtg;
strncpy(msg->dbFName, dbFName, sizeof(msg->dbFName));
action.data = msg;
CTG_ERR_JRET(ctgEnqueue(pCtg, &action));
return TSDB_CODE_SUCCESS;
_return:
taosMemoryFreeClear(action.data);
CTG_RET(code);
}
int32_t ctgDropStbMetaEnqueue(SCatalog* pCtg, const char *dbFName, int64_t dbId, const char *stbName, uint64_t suid, bool syncOp) {
int32_t code = 0;
SCtgCacheOperation action= {.opId = CTG_OP_DROP_STB_META, .syncOp = syncOp};
SCtgRemoveStbMsg *msg = taosMemoryMalloc(sizeof(SCtgRemoveStbMsg));
SCtgDropStbMetaMsg *msg = taosMemoryMalloc(sizeof(SCtgDropStbMetaMsg));
if (NULL == msg) {
ctgError("malloc %d failed", (int32_t)sizeof(SCtgRemoveStbMsg));
ctgError("malloc %d failed", (int32_t)sizeof(SCtgDropStbMetaMsg));
CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
}
......@@ -623,9 +658,9 @@ _return:
int32_t ctgDropTbMetaEnqueue(SCatalog* pCtg, const char *dbFName, int64_t dbId, const char *tbName, bool syncOp) {
int32_t code = 0;
SCtgCacheOperation action= {.opId = CTG_OP_DROP_TB_META, .syncOp = syncOp};
SCtgRemoveTblMsg *msg = taosMemoryMalloc(sizeof(SCtgRemoveTblMsg));
SCtgDropTblMetaMsg *msg = taosMemoryMalloc(sizeof(SCtgDropTblMetaMsg));
if (NULL == msg) {
ctgError("malloc %d failed", (int32_t)sizeof(SCtgRemoveTblMsg));
ctgError("malloc %d failed", (int32_t)sizeof(SCtgDropTblMetaMsg));
CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
}
......@@ -1281,7 +1316,7 @@ _return:
int32_t ctgOpDropDbCache(SCtgCacheOperation *operation) {
int32_t code = 0;
SCtgRemoveDBMsg *msg = operation->data;
SCtgDropDBMsg *msg = operation->data;
SCatalog* pCtg = msg->pCtg;
SCtgDBCache *dbCache = NULL;
......@@ -1304,6 +1339,33 @@ _return:
CTG_RET(code);
}
int32_t ctgOpDropDbVgroup(SCtgCacheOperation *operation) {
int32_t code = 0;
SCtgDropDbVgroupMsg *msg = operation->data;
SCatalog* pCtg = msg->pCtg;
SCtgDBCache *dbCache = NULL;
ctgGetDBCache(msg->pCtg, msg->dbFName, &dbCache);
if (NULL == dbCache) {
goto _return;
}
CTG_ERR_RET(ctgWAcquireVgInfo(pCtg, dbCache));
ctgFreeVgInfo(dbCache->vgInfo);
dbCache->vgInfo = NULL;
ctgDebug("db vgInfo removed, dbFName:%s", msg->dbFName);
ctgWReleaseVgInfo(dbCache);
_return:
taosMemoryFreeClear(msg);
CTG_RET(code);
}
int32_t ctgOpUpdateTbMeta(SCtgCacheOperation *operation) {
int32_t code = 0;
......@@ -1353,7 +1415,7 @@ _return:
int32_t ctgOpDropStbMeta(SCtgCacheOperation *operation) {
int32_t code = 0;
SCtgRemoveStbMsg *msg = operation->data;
SCtgDropStbMetaMsg *msg = operation->data;
SCatalog* pCtg = msg->pCtg;
SCtgDBCache *dbCache = NULL;
......@@ -1399,7 +1461,7 @@ _return:
int32_t ctgOpDropTbMeta(SCtgCacheOperation *operation) {
int32_t code = 0;
SCtgRemoveTblMsg *msg = operation->data;
SCtgDropTblMetaMsg *msg = operation->data;
SCatalog* pCtg = msg->pCtg;
SCtgDBCache *dbCache = NULL;
......
......@@ -132,7 +132,22 @@ void ctgdUserCallback(SMetaData* pResult, void* param, int32_t code) {
}
}
int32_t ctgdLaunchAsyncCall(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, uint64_t reqId) {
/*
prepare SQL:
create database db1;
use db1;
create stable st1 (ts timestamp, f1 int) tags(t1 int);
create table tb1 using st1 tags(1);
insert into tb1 values (now, 1);
create qnode on dnode 1;
create user user1 pass "abc";
create database db2;
grant write on db2.* to user1;
create function udf1 as '/tmp/libudf1.so' outputtype int;
create aggregate function udf2 as '/tmp/libudf2.so' outputtype int;
*/
int32_t ctgdLaunchAsyncCall(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, uint64_t reqId, bool forceUpdate) {
int32_t code = 0;
SCatalogReq req = {0};
req.pTableMeta = taosArrayInit(2, sizeof(SName));
......@@ -144,6 +159,7 @@ int32_t ctgdLaunchAsyncCall(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps
req.pIndex = NULL;//taosArrayInit(2, TSDB_INDEX_FNAME_LEN);
req.pUser = taosArrayInit(2, sizeof(SUserAuthInfo));
req.qNodeRequired = true;
req.forceUpdate = forceUpdate;
SName name = {0};
char dbFName[TSDB_DB_FNAME_LEN] = {0};
......
......@@ -34,6 +34,7 @@ extern "C" {
#define QW_DEFAULT_SCH_TASK_NUMBER 10000
#define QW_DEFAULT_SHORT_RUN_TIMES 2
#define QW_DEFAULT_HEARTBEAT_MSEC 5000
#define QW_SCH_TIMEOUT_MSEC 180000
enum {
QW_PHASE_PRE_QUERY = 1,
......@@ -137,7 +138,7 @@ typedef struct SQWTaskCtx {
} SQWTaskCtx;
typedef struct SQWSchStatus {
int32_t lastAccessTs; // timestamp in second
int64_t hbBrokenTs; // timestamp in msecond
SRWLatch hbConnLock;
SRpcHandleInfo hbConnInfo;
SQueryNodeEpId hbEpId;
......@@ -354,6 +355,8 @@ int32_t qwOpenRef(void);
void qwSetHbParam(int64_t refId, SQWHbParam **pParam);
int32_t qwUpdateTimeInQueue(SQWorker *mgmt, int64_t ts, EQueueType type);
int64_t qwGetTimeInQueue(SQWorker *mgmt, EQueueType type);
void qwClearExpiredSch(SArray* pExpiredSch);
int32_t qwAcquireScheduler(SQWorker *mgmt, uint64_t sId, int32_t rwType, SQWSchStatus **sch);
void qwDbgDumpMgmtInfo(SQWorker *mgmt);
int32_t qwDbgValidateStatus(QW_FPARAMS_DEF, int8_t oriStatus, int8_t newStatus, bool *ignore);
......
......@@ -535,3 +535,9 @@ int64_t qwGetTimeInQueue(SQWorker *mgmt, EQueueType type) {
return -1;
}
void qwClearExpiredSch(SArray* pExpiredSch) {
}
......@@ -21,10 +21,12 @@ int32_t qwProcessHbLinkBroken(SQWorker *mgmt, SQWMsg *qwMsg, SSchedulerHbReq *re
SSchedulerHbRsp rsp = {0};
SQWSchStatus *sch = NULL;
QW_ERR_RET(qwAcquireAddScheduler(mgmt, req->sId, QW_READ, &sch));
QW_ERR_RET(qwAcquireScheduler(mgmt, req->sId, QW_READ, &sch));
QW_LOCK(QW_WRITE, &sch->hbConnLock);
sch->hbBrokenTs = taosGetTimestampMs();
if (qwMsg->connInfo.handle == sch->hbConnInfo.handle) {
tmsgReleaseHandle(&sch->hbConnInfo, TAOS_CONN_SERVER);
sch->hbConnInfo.handle = NULL;
......@@ -794,6 +796,7 @@ void qwProcessHbTimerEvent(void *param, void *tmrId) {
SQWSchStatus *sch = NULL;
int32_t taskNum = 0;
SQWHbInfo *rspList = NULL;
SArray *pExpiredSch = NULL;
int32_t code = 0;
qwDbgDumpMgmtInfo(mgmt);
......@@ -809,8 +812,11 @@ void qwProcessHbTimerEvent(void *param, void *tmrId) {
}
rspList = taosMemoryCalloc(schNum, sizeof(SQWHbInfo));
if (NULL == rspList) {
pExpiredSch = taosArrayInit(schNum, sizeof(uint64_t));
if (NULL == rspList || NULL == pExpiredSch) {
QW_UNLOCK(QW_READ, &mgmt->schLock);
taosMemoryFree(rspList);
taosArrayDestroy(pExpiredSch);
QW_ELOG("calloc %d SQWHbInfo failed", schNum);
taosTmrReset(qwProcessHbTimerEvent, QW_DEFAULT_HEARTBEAT_MSEC, param, mgmt->timer, &mgmt->hbTimer);
qwRelease(refId);
......@@ -820,6 +826,7 @@ void qwProcessHbTimerEvent(void *param, void *tmrId) {
void *key = NULL;
size_t keyLen = 0;
int32_t i = 0;
int64_t currentMs = taosGetTimestampMs();
void *pIter = taosHashIterate(mgmt->schHash, NULL);
while (pIter) {
......@@ -827,6 +834,11 @@ void qwProcessHbTimerEvent(void *param, void *tmrId) {
if (NULL == sch->hbConnInfo.handle) {
uint64_t *sId = taosHashGetKey(pIter, NULL);
QW_TLOG("cancel send hb to sch %" PRIx64 " cause of no connection handle", *sId);
if (sch->hbBrokenTs > 0 && ((currentMs - sch->hbBrokenTs) > QW_SCH_TIMEOUT_MSEC) && taosHashGetSize(sch->tasksHash) <= 0) {
taosArrayPush(pExpiredSch, sId);
}
pIter = taosHashIterate(mgmt->schHash, pIter);
continue;
}
......@@ -852,7 +864,12 @@ _return:
tFreeSSchedulerHbRsp(&rspList[j].rsp);
}
if (taosArrayGetSize(pExpiredSch) > 0) {
qwClearExpiredSch(pExpiredSch);
}
taosMemoryFreeClear(rspList);
taosArrayDestroy(pExpiredSch);
taosTmrReset(qwProcessHbTimerEvent, QW_DEFAULT_HEARTBEAT_MSEC, param, mgmt->timer, &mgmt->hbTimer);
qwRelease(refId);
......
......@@ -102,11 +102,11 @@ typedef struct SSchedulerMgmt {
uint64_t taskId; // sequential taksId
uint64_t sId; // schedulerId
SSchedulerCfg cfg;
SRWLatch lock;
bool exit;
int32_t jobRef;
int32_t jobNum;
SSchStat stat;
SRWLatch hbLock;
SHashObj *hbConnections;
} SSchedulerMgmt;
......@@ -320,6 +320,8 @@ extern SSchedulerMgmt schMgmt;
#define SCH_UNLOCK(type, _lock) (SCH_READ == (type) ? taosRUnLockLatch(_lock) : taosWUnLockLatch(_lock))
void schDeregisterTaskHb(SSchJob *pJob, SSchTask *pTask);
void schCleanClusterHb(void* pTrans);
int32_t schLaunchTask(SSchJob *job, SSchTask *task);
int32_t schBuildAndSendMsg(SSchJob *job, SSchTask *task, SQueryNodeAddr *addr, int32_t msgType);
SSchJob *schAcquireJob(int64_t refId);
......
......@@ -126,30 +126,6 @@ _return:
SCH_RET(code);
}
void schDeregisterTaskHb(SSchJob *pJob, SSchTask *pTask) {
if (!pTask->registerdHb) {
return;
}
SQueryNodeAddr *addr = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx);
SQueryNodeEpId epId = {0};
epId.nodeId = addr->nodeId;
SEp* pEp = SCH_GET_CUR_EP(addr);
strcpy(epId.ep.fqdn, pEp->fqdn);
epId.ep.port = pEp->port;
SSchHbTrans *hb = taosHashGet(schMgmt.hbConnections, &epId, sizeof(SQueryNodeEpId));
if (NULL == hb) {
SCH_TASK_ELOG("nodeId %d fqdn %s port %d not in hb connections", epId.nodeId, epId.ep.fqdn, epId.ep.port);
return;
}
atomic_sub_fetch_64(&hb->taskNum, 1);
pTask->registerdHb = false;
}
void schFreeTask(SSchJob *pJob, SSchTask *pTask) {
schDeregisterTaskHb(pJob, pTask);
......@@ -377,15 +353,21 @@ int32_t schDropTaskExecNode(SSchJob *pJob, SSchTask *pTask, void *handle, int32_
return TSDB_CODE_SUCCESS;
}
taosHashRemove(pTask->execNodes, &execIdx, sizeof(execIdx));
if (taosHashRemove(pTask->execNodes, &execIdx, sizeof(execIdx))) {
SCH_TASK_ELOG("fail to remove execIdx %d from execNodeList", execIdx);
} else {
SCH_TASK_DLOG("execIdx %d removed from execNodeList", execIdx);
}
if (execIdx != pTask->execIdx) { // ignore it
SCH_TASK_DLOG("execIdx %d is not current execIdx %d", execIdx, pTask->execIdx);
SCH_RET(TSDB_CODE_SCH_IGNORE_ERROR);
}
return TSDB_CODE_SUCCESS;
}
int32_t schUpdateTaskExecNode(SSchTask *pTask, void *handle, int32_t execIdx) {
int32_t schUpdateTaskExecNode(SSchJob *pJob, SSchTask *pTask, void *handle, int32_t execIdx) {
if (taosHashGetSize(pTask->execNodes) <= 0) {
return TSDB_CODE_SUCCESS;
}
......@@ -393,6 +375,8 @@ int32_t schUpdateTaskExecNode(SSchTask *pTask, void *handle, int32_t execIdx) {
SSchNodeInfo *nodeInfo = taosHashGet(pTask->execNodes, &execIdx, sizeof(execIdx));
nodeInfo->handle = handle;
SCH_TASK_DLOG("handle updated to %p for execIdx %d", handle, execIdx);
return TSDB_CODE_SUCCESS;
}
......@@ -403,7 +387,7 @@ int32_t schUpdateTaskHandle(SSchJob *pJob, SSchTask *pTask, bool dropExecNode, v
SCH_SET_TASK_HANDLE(pTask, handle);
schUpdateTaskExecNode(pTask, handle, execIdx);
schUpdateTaskExecNode(pJob, pTask, handle, execIdx);
return TSDB_CODE_SUCCESS;
}
......@@ -551,6 +535,8 @@ int32_t schSetAddrsFromNodeList(SSchJob *pJob, SSchTask *pTask) {
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
SCH_TASK_DLOG("set %dth condidate addr, id %d, fqdn:%s, port:%d", i, naddr->nodeId, SCH_GET_CUR_EP(naddr)->fqdn, SCH_GET_CUR_EP(naddr)->port);
++addNum;
}
}
......@@ -1110,6 +1096,7 @@ int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) {
SCH_UNLOCK(SCH_WRITE, &parent->lock);
if (SCH_TASK_READY_FOR_LAUNCH(readyNum, parent)) {
SCH_TASK_DLOG("all %d children task done, start to launch parent task %" PRIx64, readyNum, parent->taskId);
SCH_ERR_RET(schLaunchTask(pJob, parent));
}
}
......@@ -1186,7 +1173,7 @@ void schDropTaskOnExecNode(SSchJob *pJob, SSchTask *pTask) {
nodeInfo = taosHashIterate(pTask->execNodes, nodeInfo);
}
SCH_TASK_DLOG("task has %d exec address", size);
SCH_TASK_DLOG("task has been dropped on %d exec nodes", size);
}
......@@ -1196,7 +1183,8 @@ int32_t schRescheduleTask(SSchJob *pJob, SSchTask *pTask) {
}
SCH_LOCK_TASK(pTask);
if (JOB_TASK_STATUS_EXECUTING == pTask->status && pJob->fetchTask != pTask) {
if (JOB_TASK_STATUS_EXECUTING == pTask->status && pJob->fetchTask != pTask && taosArrayGetSize(pTask->candidateAddrs) > 1) {
SCH_TASK_DLOG("task execIdx %d will be rescheduled now", pTask->execIdx);
schDropTaskOnExecNode(pJob, pTask);
taosHashClear(pTask->execNodes);
schProcessOnTaskFailure(pJob, pTask, TSDB_CODE_SCH_TIMEOUT_ERROR);
......@@ -1306,9 +1294,10 @@ int32_t schLaunchTaskImpl(SSchJob *pJob, SSchTask *pTask) {
int32_t code = 0;
atomic_add_fetch_32(&pTask->level->taskLaunchedNum, 1);
pTask->execIdx++;
SCH_TASK_DLOG("start to launch task's %dth exec", pTask->execIdx);
SCH_LOG_TASK_START_TS(pTask);
if (schJobNeedToStop(pJob, &status)) {
......@@ -1471,9 +1460,10 @@ void schFreeJobImpl(void *job) {
qDebug("QID:0x%" PRIx64 " job freed, refId:%" PRIx64 ", pointer:%p", queryId, refId, pJob);
atomic_sub_fetch_32(&schMgmt.jobNum, 1);
schCloseJobRef();
int32_t jobNum = atomic_sub_fetch_32(&schMgmt.jobNum, 1);
if (jobNum == 0) {
schCloseJobRef();
}
}
int32_t schExecJobImpl(void *pTrans, SArray *pNodeList, SQueryPlan *pDag, int64_t *job, const char *sql,
......
......@@ -648,31 +648,6 @@ _return:
SCH_RET(code);
}
int32_t schRegisterHbConnection(SSchJob *pJob, SSchTask *pTask, SQueryNodeEpId *epId, bool *exist) {
int32_t code = 0;
SSchHbTrans hb = {0};
hb.trans.pTrans = pJob->pTrans;
SCH_ERR_RET(schMakeHbRpcCtx(pJob, pTask, &hb.rpcCtx));
code = taosHashPut(schMgmt.hbConnections, epId, sizeof(SQueryNodeEpId), &hb, sizeof(SSchHbTrans));
if (code) {
schFreeRpcCtx(&hb.rpcCtx);
if (HASH_NODE_EXIST(code)) {
*exist = true;
return TSDB_CODE_SUCCESS;
}
qError("taosHashPut hb trans failed, nodeId:%d, fqdn:%s, port:%d", epId->nodeId, epId->ep.fqdn, epId->ep.port);
SCH_ERR_RET(code);
}
return TSDB_CODE_SUCCESS;
}
int32_t schBuildAndSendHbMsg(SQueryNodeEpId *nodeEpId, SArray* taskAction) {
SSchedulerHbReq req = {0};
int32_t code = 0;
......@@ -684,17 +659,20 @@ int32_t schBuildAndSendHbMsg(SQueryNodeEpId *nodeEpId, SArray* taskAction) {
req.sId = schMgmt.sId;
memcpy(&req.epId, nodeEpId, sizeof(SQueryNodeEpId));
SCH_LOCK(SCH_READ, &schMgmt.hbLock);
SSchHbTrans *hb = taosHashGet(schMgmt.hbConnections, nodeEpId, sizeof(SQueryNodeEpId));
if (NULL == hb) {
qError("taosHashGet hb connection failed, nodeId:%d, fqdn:%s, port:%d", nodeEpId->nodeId, nodeEpId->ep.fqdn,
SCH_UNLOCK(SCH_READ, &schMgmt.hbLock);
qError("hb connection no longer exist, nodeId:%d, fqdn:%s, port:%d", nodeEpId->nodeId, nodeEpId->ep.fqdn,
nodeEpId->ep.port);
SCH_ERR_RET(code);
return TSDB_CODE_SUCCESS;
}
SCH_LOCK(SCH_WRITE, &hb->lock);
code = schCloneHbRpcCtx(&hb->rpcCtx, &rpcCtx);
memcpy(&trans, &hb->trans, sizeof(trans));
SCH_UNLOCK(SCH_WRITE, &hb->lock);
SCH_UNLOCK(SCH_READ, &schMgmt.hbLock);
SCH_ERR_RET(code);
......@@ -764,60 +742,6 @@ _return:
SCH_RET(code);
}
int32_t schEnsureHbConnection(SSchJob *pJob, SSchTask *pTask) {
SQueryNodeAddr *addr = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx);
SQueryNodeEpId epId = {0};
epId.nodeId = addr->nodeId;
SEp* pEp = SCH_GET_CUR_EP(addr);
strcpy(epId.ep.fqdn, pEp->fqdn);
epId.ep.port = pEp->port;
SSchHbTrans *hb = NULL;
while (true) {
hb = taosHashGet(schMgmt.hbConnections, &epId, sizeof(SQueryNodeEpId));
if (NULL == hb) {
bool exist = false;
SCH_ERR_RET(schRegisterHbConnection(pJob, pTask, &epId, &exist));
if (!exist) {
SCH_ERR_RET(schBuildAndSendHbMsg(&epId, NULL));
}
continue;
}
break;
}
atomic_add_fetch_64(&hb->taskNum, 1);
pTask->registerdHb = true;
return TSDB_CODE_SUCCESS;
}
int32_t schUpdateHbConnection(SQueryNodeEpId *epId, SSchTrans *trans) {
int32_t code = 0;
SSchHbTrans *hb = NULL;
hb = taosHashGet(schMgmt.hbConnections, epId, sizeof(SQueryNodeEpId));
if (NULL == hb) {
qError("taosHashGet hb connection failed, nodeId:%d, fqdn:%s, port:%d", epId->nodeId, epId->ep.fqdn, epId->ep.port);
SCH_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
}
SCH_LOCK(SCH_WRITE, &hb->lock);
memcpy(&hb->trans, trans, sizeof(*trans));
SCH_UNLOCK(SCH_WRITE, &hb->lock);
qDebug("hb connection updated, sId:%" PRIx64 ", nodeId:%d, fqdn:%s, port:%d, pTrans:%p, pHandle:%p", schMgmt.sId,
epId->nodeId, epId->ep.fqdn, epId->ep.port, trans->pTrans, trans->pHandle);
return TSDB_CODE_SUCCESS;
}
int32_t schHandleHbCallback(void *param, const SDataBuf *pMsg, int32_t code) {
SSchedulerHbRsp rsp = {0};
SSchTaskCallbackParam *pParam = (SSchTaskCallbackParam *)param;
......@@ -1037,6 +961,7 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr,
if (NULL == addr) {
addr = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx);
isCandidateAddr = true;
SCH_TASK_DLOG("target candidateIdx %d", pTask->candidateIdx);
}
SEpSet epSet = addr->epSet;
......
......@@ -21,17 +21,189 @@
#include "tref.h"
#include "trpc.h"
void schCleanClusterHb(void* pTrans) {
SCH_LOCK(SCH_WRITE, &schMgmt.hbLock);
SSchHbTrans *hb = taosHashIterate(schMgmt.hbConnections, NULL);
while (hb) {
if (hb->trans.pTrans == pTrans) {
SQueryNodeEpId* pEpId = taosHashGetKey(hb, NULL);
rpcReleaseHandle(hb->trans.pHandle, TAOS_CONN_CLIENT);
taosHashRemove(schMgmt.hbConnections, pEpId, sizeof(SQueryNodeEpId));
}
hb = taosHashIterate(schMgmt.hbConnections, hb);
}
SCH_UNLOCK(SCH_WRITE, &schMgmt.hbLock);
}
int32_t schRemoveHbConnection(SSchJob *pJob, SSchTask *pTask, SQueryNodeEpId *epId) {
return TSDB_CODE_SUCCESS; // TODO ENABLE IT WHEN RPC IS READY
int32_t code = 0;
SCH_LOCK(SCH_WRITE, &schMgmt.hbLock);
SSchHbTrans *hb = taosHashGet(schMgmt.hbConnections, epId, sizeof(SQueryNodeEpId));
if (NULL == hb) {
SCH_UNLOCK(SCH_WRITE, &schMgmt.hbLock);
SCH_TASK_ELOG("nodeId %d fqdn %s port %d not in hb connections", epId->nodeId, epId->ep.fqdn, epId->ep.port);
return TSDB_CODE_SUCCESS;
}
int64_t taskNum = atomic_load_64(&hb->taskNum);
if (taskNum <= 0) {
rpcReleaseHandle(hb->trans.pHandle, TAOS_CONN_CLIENT);
taosHashRemove(schMgmt.hbConnections, epId, sizeof(SQueryNodeEpId));
}
SCH_UNLOCK(SCH_WRITE, &schMgmt.hbLock);
return TSDB_CODE_SUCCESS;
}
int32_t schAddHbConnection(SSchJob *pJob, SSchTask *pTask, SQueryNodeEpId *epId, bool *exist) {
int32_t code = 0;
SSchHbTrans hb = {0};
hb.trans.pTrans = pJob->pTrans;
hb.taskNum = 1;
SCH_ERR_RET(schMakeHbRpcCtx(pJob, pTask, &hb.rpcCtx));
SCH_LOCK(SCH_WRITE, &schMgmt.hbLock);
code = taosHashPut(schMgmt.hbConnections, epId, sizeof(SQueryNodeEpId), &hb, sizeof(SSchHbTrans));
if (code) {
SCH_UNLOCK(SCH_WRITE, &schMgmt.hbLock);
schFreeRpcCtx(&hb.rpcCtx);
if (HASH_NODE_EXIST(code)) {
*exist = true;
return TSDB_CODE_SUCCESS;
}
qError("taosHashPut hb trans failed, nodeId:%d, fqdn:%s, port:%d", epId->nodeId, epId->ep.fqdn, epId->ep.port);
SCH_ERR_RET(code);
}
SCH_UNLOCK(SCH_WRITE, &schMgmt.hbLock);
return TSDB_CODE_SUCCESS;
}
int32_t schRegisterHbConnection(SSchJob *pJob, SSchTask *pTask, SQueryNodeEpId *pEpId) {
SSchHbTrans *hb = NULL;
while (true) {
SCH_LOCK(SCH_READ, &schMgmt.hbLock);
hb = taosHashGet(schMgmt.hbConnections, pEpId, sizeof(SQueryNodeEpId));
if (NULL == hb) {
bool exist = false;
SCH_UNLOCK(SCH_READ, &schMgmt.hbLock);
SCH_ERR_RET(schAddHbConnection(pJob, pTask, pEpId, &exist));
if (!exist) {
SCH_RET(schBuildAndSendHbMsg(pEpId, NULL));
}
continue;
}
break;
}
atomic_add_fetch_64(&hb->taskNum, 1);
SCH_UNLOCK(SCH_READ, &schMgmt.hbLock);
return TSDB_CODE_SUCCESS;
}
void schDeregisterTaskHb(SSchJob *pJob, SSchTask *pTask) {
if (!pTask->registerdHb) {
return;
}
SQueryNodeAddr *addr = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx);
SQueryNodeEpId epId = {0};
epId.nodeId = addr->nodeId;
SEp* pEp = SCH_GET_CUR_EP(addr);
strcpy(epId.ep.fqdn, pEp->fqdn);
epId.ep.port = pEp->port;
SCH_LOCK(SCH_READ, &schMgmt.hbLock);
SSchHbTrans *hb = taosHashGet(schMgmt.hbConnections, &epId, sizeof(SQueryNodeEpId));
if (NULL == hb) {
SCH_UNLOCK(SCH_READ, &schMgmt.hbLock);
SCH_TASK_WLOG("nodeId %d fqdn %s port %d not in hb connections", epId.nodeId, epId.ep.fqdn, epId.ep.port);
return;
}
int64_t taskNum = atomic_sub_fetch_64(&hb->taskNum, 1);
if (0 == taskNum) {
SCH_UNLOCK(SCH_READ, &schMgmt.hbLock);
schRemoveHbConnection(pJob, pTask, &epId);
} else {
SCH_UNLOCK(SCH_READ, &schMgmt.hbLock);
}
pTask->registerdHb = false;
}
int32_t schEnsureHbConnection(SSchJob *pJob, SSchTask *pTask) {
SQueryNodeAddr *addr = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx);
SQueryNodeEpId epId = {0};
epId.nodeId = addr->nodeId;
SEp* pEp = SCH_GET_CUR_EP(addr);
strcpy(epId.ep.fqdn, pEp->fqdn);
epId.ep.port = pEp->port;
SCH_ERR_RET(schRegisterHbConnection(pJob, pTask, &epId));
pTask->registerdHb = true;
return TSDB_CODE_SUCCESS;
}
int32_t schUpdateHbConnection(SQueryNodeEpId *epId, SSchTrans *trans) {
int32_t code = 0;
SSchHbTrans *hb = NULL;
SCH_LOCK(SCH_READ, &schMgmt.hbLock);
hb = taosHashGet(schMgmt.hbConnections, epId, sizeof(SQueryNodeEpId));
if (NULL == hb) {
SCH_UNLOCK(SCH_READ, &schMgmt.hbLock);
qError("taosHashGet hb connection failed, nodeId:%d, fqdn:%s, port:%d", epId->nodeId, epId->ep.fqdn, epId->ep.port);
SCH_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
}
SCH_LOCK(SCH_WRITE, &hb->lock);
memcpy(&hb->trans, trans, sizeof(*trans));
SCH_UNLOCK(SCH_WRITE, &hb->lock);
SCH_UNLOCK(SCH_READ, &schMgmt.hbLock);
qDebug("hb connection updated, sId:%" PRIx64 ", nodeId:%d, fqdn:%s, port:%d, pTrans:%p, pHandle:%p", schMgmt.sId,
epId->nodeId, epId->ep.fqdn, epId->ep.port, trans->pTrans, trans->pHandle);
return TSDB_CODE_SUCCESS;
}
void schCloseJobRef(void) {
if (!atomic_load_8((int8_t *)&schMgmt.exit)) {
return;
}
SCH_LOCK(SCH_WRITE, &schMgmt.lock);
if (atomic_load_32(&schMgmt.jobNum) <= 0 && schMgmt.jobRef >= 0) {
if (schMgmt.jobRef >= 0) {
taosCloseRef(schMgmt.jobRef);
schMgmt.jobRef = -1;
}
SCH_UNLOCK(SCH_WRITE, &schMgmt.lock);
}
uint64_t schGenTaskId(void) { return atomic_add_fetch_64(&schMgmt.taskId, 1); }
......@@ -88,4 +260,3 @@ void schFreeRpcCtx(SRpcCtx *pCtx) {
(*pCtx->freeFunc)(pCtx->brokenVal.val);
}
......@@ -182,6 +182,14 @@ int32_t scheduleCancelJob(int64_t job) {
SCH_RET(code);
}
void schedulerStopQueryHb(void *pTrans) {
if (NULL == pTrans) {
return;
}
schCleanClusterHb(pTrans);
}
void schedulerFreeJob(int64_t job) {
SSchJob *pJob = schAcquireJob(job);
if (NULL == pJob) {
......@@ -220,6 +228,7 @@ void schedulerDestroy(void) {
}
}
SCH_LOCK(SCH_WRITE, &schMgmt.hbLock);
if (schMgmt.hbConnections) {
void *pIter = taosHashIterate(schMgmt.hbConnections, NULL);
while (pIter != NULL) {
......@@ -230,4 +239,5 @@ void schedulerDestroy(void) {
taosHashCleanup(schMgmt.hbConnections);
schMgmt.hbConnections = NULL;
}
SCH_UNLOCK(SCH_WRITE, &schMgmt.hbLock);
}
......@@ -180,6 +180,12 @@ static bool addHandleToAcceptloop(void* arg);
if (!transQueuePush(&conn->srvMsgs, srvMsg)) { \
return; \
} \
if (conn->regArg.init) { \
tTrace("server conn %p release, notify server app", conn); \
STrans* pTransInst = conn->pTransInst; \
(*pTransInst->cfp)(pTransInst->parent, &(conn->regArg.msg), NULL); \
memset(&conn->regArg, 0, sizeof(conn->regArg)); \
} \
uvStartSendRespInternal(srvMsg); \
return; \
} \
......@@ -1154,6 +1160,10 @@ int transGetConnInfo(void* thandle, STransHandleInfo* pInfo) {
}
SExHandle* ex = thandle;
SSvrConn* pConn = ex->handle;
if (pConn == NULL) {
tTrace("invalid handle %p, failed to Get Conn info", thandle);
return -1;
}
struct sockaddr_in addr = pConn->addr;
pInfo->clientIp = (uint32_t)(addr.sin_addr.s_addr);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册