提交 8f16b9b8 编写于 作者: D dapan1121

catalog force update

上级 7f61dbc8
......@@ -68,6 +68,7 @@ typedef struct SCatalogReq {
SArray *pIndex; // element is index name
SArray *pUser; // element is SUserAuthInfo
bool qNodeRequired; // valid qnode
bool forceUpdate;
} SCatalogReq;
typedef struct SMetaData {
......@@ -280,7 +281,7 @@ int32_t catalogUpdateUserAuthInfo(SCatalog* pCtg, SGetUserAuthRsp* pAuth);
int32_t catalogUpdateVgEpSet(SCatalog* pCtg, const char* dbFName, int32_t vgId, SEpSet *epSet);
int32_t ctgdLaunchAsyncCall(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, uint64_t reqId);
int32_t ctgdLaunchAsyncCall(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, uint64_t reqId, bool forceUpdate);
/**
......
......@@ -222,7 +222,7 @@ extern int32_t (*queryProcessMsgRsp[TDMT_MAX])(void* output, char* msg, int32_t
|| (_type) == TDMT_VND_DROP_TABLE || (_type) == TDMT_VND_DROP_STB)
#define NEED_SCHEDULER_RETRY_ERROR(_code) \
((_code) == TSDB_CODE_RPC_REDIRECT || (_code) == TSDB_CODE_RPC_NETWORK_UNAVAIL)
((_code) == TSDB_CODE_RPC_REDIRECT || (_code) == TSDB_CODE_RPC_NETWORK_UNAVAIL || (_code) == TSDB_CODE_SCH_TIMEOUT_ERROR)
#define REQUEST_MAX_TRY_TIMES 1
......
......@@ -223,6 +223,10 @@ static void doDestroyRequest(void *p) {
taosHashRemove(pRequest->pTscObj->pRequests, &pRequest->self, sizeof(pRequest->self));
if (pRequest->body.queryJob != 0) {
schedulerFreeJob(pRequest->body.queryJob);
}
taosMemoryFreeClear(pRequest->msgBuf);
taosMemoryFreeClear(pRequest->sqlstr);
taosMemoryFreeClear(pRequest->pDb);
......@@ -230,10 +234,6 @@ static void doDestroyRequest(void *p) {
doFreeReqResultInfo(&pRequest->body.resInfo);
qDestroyQueryPlan(pRequest->body.pDag);
if (pRequest->body.queryJob != 0) {
schedulerFreeJob(pRequest->body.queryJob);
}
taosArrayDestroy(pRequest->tableList);
taosArrayDestroy(pRequest->dbList);
......
......@@ -741,6 +741,20 @@ SRequestObj* execQuery(STscObj* pTscObj, const char* sql, int sqlLen) {
do {
destroyRequest(pRequest);
pRequest = launchQuery(pTscObj, sql, sqlLen);
if (*sql == 'y') {
SCatalog *pCatalog = NULL;
code = catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog);
SEpSet epset = getEpSet_s(&pTscObj->pAppInfo->mgmtEp);
ctgdLaunchAsyncCall(pCatalog, pTscObj->pAppInfo->pTransporter, &epset, pRequest->requestId, false);
break;
} else if (*sql == 'z') {
SCatalog *pCatalog = NULL;
code = catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog);
SEpSet epset = getEpSet_s(&pTscObj->pAppInfo->mgmtEp);
ctgdLaunchAsyncCall(pCatalog, pTscObj->pAppInfo->pTransporter, &epset, pRequest->requestId, false);
break;
}
if (pRequest == NULL || TSDB_CODE_SUCCESS == pRequest->code || !NEED_CLIENT_HANDLE_ERROR(pRequest->code)) {
break;
}
......
......@@ -180,7 +180,7 @@ int32_t processUseDbRsp(void* param, const SDataBuf* pMsg, int32_t code) {
taosMemoryFreeClear(output.dbVgroup);
tscError("0x%" PRIx64 " failed to build use db output since %s", pRequest->requestId, terrstr());
} else if (output.dbVgroup) {
} else if (output.dbVgroup && output.dbVgroup->vgHash) {
struct SCatalog* pCatalog = NULL;
int32_t code1 = catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog);
......
......@@ -52,6 +52,7 @@ enum {
CTG_OP_UPDATE_VGROUP = 0,
CTG_OP_UPDATE_TB_META,
CTG_OP_DROP_DB_CACHE,
CTG_OP_DROP_DB_VGROUP,
CTG_OP_DROP_STB_META,
CTG_OP_DROP_TB_META,
CTG_OP_UPDATE_USER,
......@@ -266,26 +267,32 @@ typedef struct SCtgUpdateTblMsg {
STableMetaOutput* output;
} SCtgUpdateTblMsg;
typedef struct SCtgRemoveDBMsg {
typedef struct SCtgDropDBMsg {
SCatalog* pCtg;
char dbFName[TSDB_DB_FNAME_LEN];
uint64_t dbId;
} SCtgRemoveDBMsg;
} SCtgDropDBMsg;
typedef struct SCtgRemoveStbMsg {
typedef struct SCtgDropDbVgroupMsg {
SCatalog* pCtg;
char dbFName[TSDB_DB_FNAME_LEN];
} SCtgDropDbVgroupMsg;
typedef struct SCtgDropStbMetaMsg {
SCatalog* pCtg;
char dbFName[TSDB_DB_FNAME_LEN];
char stbName[TSDB_TABLE_NAME_LEN];
uint64_t dbId;
uint64_t suid;
} SCtgRemoveStbMsg;
} SCtgDropStbMetaMsg;
typedef struct SCtgRemoveTblMsg {
typedef struct SCtgDropTblMetaMsg {
SCatalog* pCtg;
char dbFName[TSDB_DB_FNAME_LEN];
char tbName[TSDB_TABLE_NAME_LEN];
uint64_t dbId;
} SCtgRemoveTblMsg;
} SCtgDropTblMetaMsg;
typedef struct SCtgUpdateUserMsg {
SCatalog* pCtg;
......@@ -451,6 +458,7 @@ int32_t ctgGetTbMetaFromCache(CTG_PARAMS, SCtgTbMetaCtx* ctx, STableMeta** pTabl
int32_t ctgOpUpdateVgroup(SCtgCacheOperation *action);
int32_t ctgOpUpdateTbMeta(SCtgCacheOperation *action);
int32_t ctgOpDropDbCache(SCtgCacheOperation *action);
int32_t ctgOpDropDbVgroup(SCtgCacheOperation *action);
int32_t ctgOpDropStbMeta(SCtgCacheOperation *action);
int32_t ctgOpDropTbMeta(SCtgCacheOperation *action);
int32_t ctgOpUpdateUser(SCtgCacheOperation *action);
......@@ -464,6 +472,7 @@ int32_t ctgReadTbMetaFromCache(SCatalog* pCtg, SCtgTbMetaCtx* ctx, STableMeta**
int32_t ctgReadTbVerFromCache(SCatalog *pCtg, const SName *pTableName, int32_t *sver, int32_t *tver, int32_t *tbType, uint64_t *suid, char *stbName);
int32_t ctgChkAuthFromCache(SCatalog* pCtg, const char* user, const char* dbFName, AUTH_TYPE type, bool *inCache, bool *pass);
int32_t ctgDropDbCacheEnqueue(SCatalog* pCtg, const char *dbFName, int64_t dbId);
int32_t ctgDropDbVgroupEnqueue(SCatalog* pCtg, const char *dbFName, bool syncReq);
int32_t ctgDropStbMetaEnqueue(SCatalog* pCtg, const char *dbFName, int64_t dbId, const char *stbName, uint64_t suid, bool syncReq);
int32_t ctgDropTbMetaEnqueue(SCatalog* pCtg, const char *dbFName, int64_t dbId, const char *tbName, bool syncReq);
int32_t ctgUpdateVgroupEnqueue(SCatalog* pCtg, const char *dbFName, int64_t dbId, SDBVgInfo* dbInfo, bool syncReq);
......
......@@ -286,6 +286,9 @@ int32_t ctgInitJob(CTG_PARAMS, SCtgJob** job, uint64_t reqId, const SCatalogReq*
int32_t taskIdx = 0;
for (int32_t i = 0; i < dbVgNum; ++i) {
char* dbFName = taosArrayGet(pReq->pDbVgroup, i);
if (pReq->forceUpdate) {
ctgDropDbVgroupEnqueue(pCtg, dbFName, true);
}
CTG_ERR_JRET(ctgInitGetDbVgTask(pJob, taskIdx++, dbFName));
}
......@@ -301,6 +304,9 @@ int32_t ctgInitJob(CTG_PARAMS, SCtgJob** job, uint64_t reqId, const SCatalogReq*
for (int32_t i = 0; i < tbMetaNum; ++i) {
SName* name = taosArrayGet(pReq->pTableMeta, i);
if (pReq->forceUpdate) {
catalogRemoveTableMeta(pCtg, name);
}
CTG_ERR_JRET(ctgInitGetTbMetaTask(pJob, taskIdx++, name));
}
......
......@@ -35,6 +35,11 @@ SCtgOperation gCtgCacheOperation[CTG_OP_MAX] = {
"drop DB",
ctgOpDropDbCache
},
{
CTG_OP_DROP_DB_VGROUP,
"drop DBVgroup",
ctgOpDropDbVgroup
},
{
CTG_OP_DROP_STB_META,
"drop stbMeta",
......@@ -563,9 +568,9 @@ int32_t ctgEnqueue(SCatalog* pCtg, SCtgCacheOperation *operation) {
int32_t ctgDropDbCacheEnqueue(SCatalog* pCtg, const char *dbFName, int64_t dbId) {
int32_t code = 0;
SCtgCacheOperation action= {.opId = CTG_OP_DROP_DB_CACHE};
SCtgRemoveDBMsg *msg = taosMemoryMalloc(sizeof(SCtgRemoveDBMsg));
SCtgDropDBMsg *msg = taosMemoryMalloc(sizeof(SCtgDropDBMsg));
if (NULL == msg) {
ctgError("malloc %d failed", (int32_t)sizeof(SCtgRemoveDBMsg));
ctgError("malloc %d failed", (int32_t)sizeof(SCtgDropDBMsg));
CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
}
......@@ -590,13 +595,43 @@ _return:
CTG_RET(code);
}
int32_t ctgDropDbVgroupEnqueue(SCatalog* pCtg, const char *dbFName, bool syncOp) {
int32_t code = 0;
SCtgCacheOperation action= {.opId = CTG_OP_DROP_DB_VGROUP, .syncOp = syncOp};
SCtgDropDbVgroupMsg *msg = taosMemoryMalloc(sizeof(SCtgDropDbVgroupMsg));
if (NULL == msg) {
ctgError("malloc %d failed", (int32_t)sizeof(SCtgDropDbVgroupMsg));
CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
}
char *p = strchr(dbFName, '.');
if (p && CTG_IS_SYS_DBNAME(p + 1)) {
dbFName = p + 1;
}
msg->pCtg = pCtg;
strncpy(msg->dbFName, dbFName, sizeof(msg->dbFName));
action.data = msg;
CTG_ERR_JRET(ctgEnqueue(pCtg, &action));
return TSDB_CODE_SUCCESS;
_return:
taosMemoryFreeClear(action.data);
CTG_RET(code);
}
int32_t ctgDropStbMetaEnqueue(SCatalog* pCtg, const char *dbFName, int64_t dbId, const char *stbName, uint64_t suid, bool syncOp) {
int32_t code = 0;
SCtgCacheOperation action= {.opId = CTG_OP_DROP_STB_META, .syncOp = syncOp};
SCtgRemoveStbMsg *msg = taosMemoryMalloc(sizeof(SCtgRemoveStbMsg));
SCtgDropStbMetaMsg *msg = taosMemoryMalloc(sizeof(SCtgDropStbMetaMsg));
if (NULL == msg) {
ctgError("malloc %d failed", (int32_t)sizeof(SCtgRemoveStbMsg));
ctgError("malloc %d failed", (int32_t)sizeof(SCtgDropStbMetaMsg));
CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
}
......@@ -623,9 +658,9 @@ _return:
int32_t ctgDropTbMetaEnqueue(SCatalog* pCtg, const char *dbFName, int64_t dbId, const char *tbName, bool syncOp) {
int32_t code = 0;
SCtgCacheOperation action= {.opId = CTG_OP_DROP_TB_META, .syncOp = syncOp};
SCtgRemoveTblMsg *msg = taosMemoryMalloc(sizeof(SCtgRemoveTblMsg));
SCtgDropTblMetaMsg *msg = taosMemoryMalloc(sizeof(SCtgDropTblMetaMsg));
if (NULL == msg) {
ctgError("malloc %d failed", (int32_t)sizeof(SCtgRemoveTblMsg));
ctgError("malloc %d failed", (int32_t)sizeof(SCtgDropTblMetaMsg));
CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
}
......@@ -1281,7 +1316,7 @@ _return:
int32_t ctgOpDropDbCache(SCtgCacheOperation *operation) {
int32_t code = 0;
SCtgRemoveDBMsg *msg = operation->data;
SCtgDropDBMsg *msg = operation->data;
SCatalog* pCtg = msg->pCtg;
SCtgDBCache *dbCache = NULL;
......@@ -1304,6 +1339,33 @@ _return:
CTG_RET(code);
}
int32_t ctgOpDropDbVgroup(SCtgCacheOperation *operation) {
int32_t code = 0;
SCtgDropDbVgroupMsg *msg = operation->data;
SCatalog* pCtg = msg->pCtg;
SCtgDBCache *dbCache = NULL;
ctgGetDBCache(msg->pCtg, msg->dbFName, &dbCache);
if (NULL == dbCache) {
goto _return;
}
CTG_ERR_RET(ctgWAcquireVgInfo(pCtg, dbCache));
ctgFreeVgInfo(dbCache->vgInfo);
dbCache->vgInfo = NULL;
ctgDebug("db vgInfo removed, dbFName:%s", msg->dbFName);
ctgWReleaseVgInfo(dbCache);
_return:
taosMemoryFreeClear(msg);
CTG_RET(code);
}
int32_t ctgOpUpdateTbMeta(SCtgCacheOperation *operation) {
int32_t code = 0;
......@@ -1353,7 +1415,7 @@ _return:
int32_t ctgOpDropStbMeta(SCtgCacheOperation *operation) {
int32_t code = 0;
SCtgRemoveStbMsg *msg = operation->data;
SCtgDropStbMetaMsg *msg = operation->data;
SCatalog* pCtg = msg->pCtg;
SCtgDBCache *dbCache = NULL;
......@@ -1399,7 +1461,7 @@ _return:
int32_t ctgOpDropTbMeta(SCtgCacheOperation *operation) {
int32_t code = 0;
SCtgRemoveTblMsg *msg = operation->data;
SCtgDropTblMetaMsg *msg = operation->data;
SCatalog* pCtg = msg->pCtg;
SCtgDBCache *dbCache = NULL;
......
......@@ -132,7 +132,7 @@ void ctgdUserCallback(SMetaData* pResult, void* param, int32_t code) {
}
}
int32_t ctgdLaunchAsyncCall(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, uint64_t reqId) {
int32_t ctgdLaunchAsyncCall(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, uint64_t reqId, bool forceUpdate) {
int32_t code = 0;
SCatalogReq req = {0};
req.pTableMeta = taosArrayInit(2, sizeof(SName));
......@@ -144,6 +144,7 @@ int32_t ctgdLaunchAsyncCall(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps
req.pIndex = NULL;//taosArrayInit(2, TSDB_INDEX_FNAME_LEN);
req.pUser = taosArrayInit(2, sizeof(SUserAuthInfo));
req.qNodeRequired = true;
req.forceUpdate = forceUpdate;
SName name = {0};
char dbFName[TSDB_DB_FNAME_LEN] = {0};
......
......@@ -377,15 +377,21 @@ int32_t schDropTaskExecNode(SSchJob *pJob, SSchTask *pTask, void *handle, int32_
return TSDB_CODE_SUCCESS;
}
taosHashRemove(pTask->execNodes, &execIdx, sizeof(execIdx));
if (taosHashRemove(pTask->execNodes, &execIdx, sizeof(execIdx))) {
SCH_TASK_ELOG("fail to remove execIdx %d from execNodeList", execIdx);
} else {
SCH_TASK_DLOG("execIdx %d removed from execNodeList", execIdx);
}
if (execIdx != pTask->execIdx) { // ignore it
SCH_TASK_DLOG("execIdx %d is not current execIdx %d", execIdx, pTask->execIdx);
SCH_RET(TSDB_CODE_SCH_IGNORE_ERROR);
}
return TSDB_CODE_SUCCESS;
}
int32_t schUpdateTaskExecNode(SSchTask *pTask, void *handle, int32_t execIdx) {
int32_t schUpdateTaskExecNode(SSchJob *pJob, SSchTask *pTask, void *handle, int32_t execIdx) {
if (taosHashGetSize(pTask->execNodes) <= 0) {
return TSDB_CODE_SUCCESS;
}
......@@ -393,6 +399,8 @@ int32_t schUpdateTaskExecNode(SSchTask *pTask, void *handle, int32_t execIdx) {
SSchNodeInfo *nodeInfo = taosHashGet(pTask->execNodes, &execIdx, sizeof(execIdx));
nodeInfo->handle = handle;
SCH_TASK_DLOG("handle updated to %p for execIdx %d", handle, execIdx);
return TSDB_CODE_SUCCESS;
}
......@@ -403,7 +411,7 @@ int32_t schUpdateTaskHandle(SSchJob *pJob, SSchTask *pTask, bool dropExecNode, v
SCH_SET_TASK_HANDLE(pTask, handle);
schUpdateTaskExecNode(pTask, handle, execIdx);
schUpdateTaskExecNode(pJob, pTask, handle, execIdx);
return TSDB_CODE_SUCCESS;
}
......@@ -551,6 +559,8 @@ int32_t schSetAddrsFromNodeList(SSchJob *pJob, SSchTask *pTask) {
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
SCH_TASK_DLOG("set %dth condidate addr, id %d, fqdn:%s, port:%d", i, naddr->nodeId, SCH_GET_CUR_EP(naddr)->fqdn, SCH_GET_CUR_EP(naddr)->port);
++addNum;
}
}
......@@ -1110,6 +1120,7 @@ int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) {
SCH_UNLOCK(SCH_WRITE, &parent->lock);
if (SCH_TASK_READY_FOR_LAUNCH(readyNum, parent)) {
SCH_TASK_DLOG("all %d children task done, start to launch parent task %" PRIx64, readyNum, parent->taskId);
SCH_ERR_RET(schLaunchTask(pJob, parent));
}
}
......@@ -1186,7 +1197,7 @@ void schDropTaskOnExecNode(SSchJob *pJob, SSchTask *pTask) {
nodeInfo = taosHashIterate(pTask->execNodes, nodeInfo);
}
SCH_TASK_DLOG("task has %d exec address", size);
SCH_TASK_DLOG("task has been dropped on %d exec nodes", size);
}
......@@ -1196,7 +1207,8 @@ int32_t schRescheduleTask(SSchJob *pJob, SSchTask *pTask) {
}
SCH_LOCK_TASK(pTask);
if (JOB_TASK_STATUS_EXECUTING == pTask->status && pJob->fetchTask != pTask) {
if (JOB_TASK_STATUS_EXECUTING == pTask->status && pJob->fetchTask != pTask && taosArrayGetSize(pTask->candidateAddrs) > 1) {
SCH_TASK_DLOG("task execIdx %d will be rescheduled now", pTask->execIdx);
schDropTaskOnExecNode(pJob, pTask);
taosHashClear(pTask->execNodes);
schProcessOnTaskFailure(pJob, pTask, TSDB_CODE_SCH_TIMEOUT_ERROR);
......@@ -1306,9 +1318,10 @@ int32_t schLaunchTaskImpl(SSchJob *pJob, SSchTask *pTask) {
int32_t code = 0;
atomic_add_fetch_32(&pTask->level->taskLaunchedNum, 1);
pTask->execIdx++;
SCH_TASK_DLOG("start to launch task's %dth exec", pTask->execIdx);
SCH_LOG_TASK_START_TS(pTask);
if (schJobNeedToStop(pJob, &status)) {
......
......@@ -1037,6 +1037,7 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr,
if (NULL == addr) {
addr = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx);
isCandidateAddr = true;
SCH_TASK_DLOG("target candidateIdx %d", pTask->candidateIdx);
}
SEpSet epSet = addr->epSet;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册