From d84f44c1499642c0bc975fbd50d40cd8cb01aaec Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Thu, 28 Oct 2021 11:36:51 +0800 Subject: [PATCH] minor changes --- include/util/taoserror.h | 8 +-- source/server/vnode/inc/vnodeInt.h | 2 - source/server/vnode/src/vnodeFile.c | 83 ++++++++++++---------------- source/server/vnode/src/vnodeInt.c | 35 ++++++++++++ source/server/vnode/src/vnodeMain.c | 36 ------------ source/server/vnode/src/vnodeWrite.c | 2 - source/util/src/terror.c | 8 +-- 7 files changed, 78 insertions(+), 96 deletions(-) diff --git a/include/util/taoserror.h b/include/util/taoserror.h index cf8cd510c5..76c5f575a5 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -233,11 +233,11 @@ int32_t* taosGetErrno(); #define TSDB_CODE_VND_NO_SUCH_FILE_OR_DIR TAOS_DEF_ERROR_CODE(0, 0x0507) //"Missing data file") #define TSDB_CODE_VND_OUT_OF_MEMORY TAOS_DEF_ERROR_CODE(0, 0x0508) //"Out of memory") #define TSDB_CODE_VND_APP_ERROR TAOS_DEF_ERROR_CODE(0, 0x0509) //"Unexpected generic error in vnode") -#define TSDB_CODE_VND_INVALID_VRESION_FILE TAOS_DEF_ERROR_CODE(0, 0x050A) //"Invalid version file") -#define TSDB_CODE_VND_IS_FULL TAOS_DEF_ERROR_CODE(0, 0x050B) //"Database memory is full for commit failed") -#define TSDB_CODE_VND_IS_FLOWCTRL TAOS_DEF_ERROR_CODE(0, 0x050C) //"Database memory is full for waiting commit") +#define TSDB_CODE_VND_INVALID_CFG_FILE TAOS_DEF_ERROR_CODE(0, 0x050A) //"Invalid config file) +#define TSDB_CODE_VND_INVALID_TERM_FILE TAOS_DEF_ERROR_CODE(0, 0x050B) //"Invalid term file") +#define TSDB_CODE_VND_IS_FLOWCTRL TAOS_DEF_ERROR_CODE(0, 0x050C) //"Database memory is full") #define TSDB_CODE_VND_IS_DROPPING TAOS_DEF_ERROR_CODE(0, 0x050D) //"Database is dropping") -#define TSDB_CODE_VND_IS_BALANCING TAOS_DEF_ERROR_CODE(0, 0x050E) //"Database is balancing") +#define TSDB_CODE_VND_IS_UPDATING TAOS_DEF_ERROR_CODE(0, 0x050E) //"Database is updating") #define TSDB_CODE_VND_IS_CLOSING TAOS_DEF_ERROR_CODE(0, 0x0510) //"Database is closing") #define TSDB_CODE_VND_NOT_SYNCED TAOS_DEF_ERROR_CODE(0, 0x0511) //"Database suspended") #define TSDB_CODE_VND_NO_WRITE_AUTH TAOS_DEF_ERROR_CODE(0, 0x0512) //"Database write operation denied") diff --git a/source/server/vnode/inc/vnodeInt.h b/source/server/vnode/inc/vnodeInt.h index 245455ef18..d94c5ba2b5 100644 --- a/source/server/vnode/inc/vnodeInt.h +++ b/source/server/vnode/inc/vnodeInt.h @@ -77,8 +77,6 @@ typedef struct SVnodeCfg { SSyncCfg sync; } SVnodeCfg; - - typedef struct { int32_t vgId; // global vnode group ID int32_t refCount; // reference count diff --git a/source/server/vnode/src/vnodeFile.c b/source/server/vnode/src/vnodeFile.c index 8453c985c3..9835e3e0fb 100644 --- a/source/server/vnode/src/vnodeFile.c +++ b/source/server/vnode/src/vnodeFile.c @@ -280,90 +280,77 @@ int32_t vnodeWriteCfg(int32_t vgId, SVnodeCfg *pCfg) { return TSDB_CODE_SUCCESS; } -int32_t vnodeReadTerm(int32_t vgId, SSyncServerState *pState){ -#if 0 +int32_t vnodeReadTerm(int32_t vgId, SSyncServerState *pState) { + int32_t ret = TSDB_CODE_VND_APP_ERROR; int32_t len = 0; int32_t maxLen = 100; - char * content = calloc(1, maxLen + 1); - cJSON * root = NULL; - FILE * fp = NULL; - - terrno = TSDB_CODE_VND_INVALID_VRESION_FILE; - char file[TSDB_FILENAME_LEN + 30] = {0}; - sprintf(file, "%s/vnode%d/version.json", tsVnodeDir, pVnode->vgId); + char *content = calloc(1, maxLen + 1); + cJSON *root = NULL; + FILE *fp = NULL; - fp = fopen(file, "r"); - if (!fp) { - if (errno != ENOENT) { - vError("vgId:%d, failed to read %s, error:%s", pVnode->vgId, file, strerror(errno)); - terrno = TAOS_SYSTEM_ERROR(errno); - } else { - terrno = TSDB_CODE_SUCCESS; - } - goto PARSE_VER_ERROR; - } + char file[PATH_MAX + 30] = {0}; + sprintf(file, "%s/vnode%d/term.json", tsVnodeDir, vgId); len = (int32_t)fread(content, 1, maxLen, fp); if (len <= 0) { - vError("vgId:%d, failed to read %s, content is null", pVnode->vgId, file); - goto PARSE_VER_ERROR; + vError("vgId:%d, failed to read %s since content is null", vgId, file); + goto PARSE_TERM_ERROR; } root = cJSON_Parse(content); if (root == NULL) { - vError("vgId:%d, failed to read %s, invalid json format", pVnode->vgId, file); - goto PARSE_VER_ERROR; + vError("vgId:%d, failed to read %s since invalid json format", vgId, file); + goto PARSE_TERM_ERROR; } - cJSON *ver = cJSON_GetObjectItem(root, "version"); - if (!ver || ver->type != cJSON_Number) { - vError("vgId:%d, failed to read %s, version not found", pVnode->vgId, file); - goto PARSE_VER_ERROR; + cJSON *term = cJSON_GetObjectItem(root, "term"); + if (!term || term->type != cJSON_Number) { + vError("vgId:%d, failed to read %s since term not found", vgId, file); + goto PARSE_TERM_ERROR; } -#if 0 - pVnode->version = (uint64_t)ver->valueint; + pState->term = (uint64_t)term->valueint; - terrno = TSDB_CODE_SUCCESS; - vInfo("vgId:%d, read %s successfully, fver:%" PRIu64, pVnode->vgId, file, pVnode->version); -#endif + cJSON *voteFor = cJSON_GetObjectItem(root, "voteFor"); + if (!voteFor || voteFor->type != cJSON_Number) { + vError("vgId:%d, failed to read %s since voteFor not found", vgId, file); + goto PARSE_TERM_ERROR; + } + pState->voteFor = (int64_t)voteFor->valueint; -PARSE_VER_ERROR: + vInfo("vgId:%d, read %s success, voteFor:%" PRIu64 ", term:%" PRIu64, vgId, file, pState->voteFor, pState->term); + +PARSE_TERM_ERROR: if (content != NULL) free(content); if (root != NULL) cJSON_Delete(root); if (fp != NULL) fclose(fp); - return terrno; -#endif - return 0; + return ret; } -int32_t vnodeWriteTerm(int32_t vgid, SSyncServerState *pState) { -#if 0 - char file[TSDB_FILENAME_LEN + 30] = {0}; - sprintf(file, "%s/vnode%d/version.json", tsVnodeDir, pVnode->vgId); +int32_t vnodeWriteTerm(int32_t vgId, SSyncServerState *pState) { + char file[PATH_MAX + 30] = {0}; + sprintf(file, "%s/vnode%d/term.json", tsVnodeDir, vgId); FILE *fp = fopen(file, "w"); if (!fp) { - vError("vgId:%d, failed to write %s, reason:%s", pVnode->vgId, file, strerror(errno)); + vError("vgId:%d, failed to write %s since %s", vgId, file, strerror(errno)); return -1; } int32_t len = 0; int32_t maxLen = 100; - char * content = calloc(1, maxLen + 1); + char *content = calloc(1, maxLen + 1); -#if 0 len += snprintf(content + len, maxLen - len, "{\n"); - len += snprintf(content + len, maxLen - len, " \"version\": %" PRIu64 "\n", pVnode->fversion); + len += snprintf(content + len, maxLen - len, " \"term\": %" PRIu64 "\n", pState->term); + len += snprintf(content + len, maxLen - len, " \"voteFor\": %" PRIu64 "\n", pState->voteFor); len += snprintf(content + len, maxLen - len, "}\n"); -#endif + fwrite(content, 1, len, fp); taosFsyncFile(fileno(fp)); fclose(fp); free(content); - terrno = 0; - // vInfo("vgId:%d, successed to write %s, fver:%" PRIu64, pVnode->vgId, file, pVnode->fversion); -#endif + vInfo("vgId:%d, write %s success, voteFor:%" PRIu64 ", term:%" PRIu64, vgId, file, pState->voteFor, pState->term); return TSDB_CODE_SUCCESS; } \ No newline at end of file diff --git a/source/server/vnode/src/vnodeInt.c b/source/server/vnode/src/vnodeInt.c index 5a5ba4df01..ed295160ba 100644 --- a/source/server/vnode/src/vnodeInt.c +++ b/source/server/vnode/src/vnodeInt.c @@ -24,6 +24,7 @@ static struct { SSteps *steps; SVnodeFp fp; + void (*msgFp[TSDB_MSG_TYPE_MAX])(SRpcMsg *); } tsVint; void vnodeGetDnodeEp(int32_t dnodeId, char *ep, char *fqdn, uint16_t *port) { @@ -36,7 +37,41 @@ void vnodeSendMsgToDnode(struct SRpcEpSet *epSet, struct SRpcMsg *rpcMsg) { void vnodeSendMsgToMnode(struct SRpcMsg *rpcMsg) { return (*tsVint.fp.SendMsgToMnode)(rpcMsg); } +void vnodeProcessMsg(SRpcMsg *pMsg) { + if (tsVint.msgFp[pMsg->msgType]) { + (*tsVint.msgFp[pMsg->msgType])(pMsg); + } else { + assert(0); + } +} + +static void vnodeInitMsgFp() { + tsVint.msgFp[TSDB_MSG_TYPE_MD_CREATE_VNODE] = vnodeProcessMgmtMsg; + tsVint.msgFp[TSDB_MSG_TYPE_MD_ALTER_VNODE] = vnodeProcessMgmtMsg; + tsVint.msgFp[TSDB_MSG_TYPE_MD_SYNC_VNODE] = vnodeProcessMgmtMsg; + tsVint.msgFp[TSDB_MSG_TYPE_MD_COMPACT_VNODE] = vnodeProcessMgmtMsg; + tsVint.msgFp[TSDB_MSG_TYPE_MD_DROP_VNODE] = vnodeProcessMgmtMsg; + tsVint.msgFp[TSDB_MSG_TYPE_MD_ALTER_STREAM] = vnodeProcessMgmtMsg; + tsVint.msgFp[TSDB_MSG_TYPE_MD_CREATE_TABLE] = vnodeProcessWriteMsg; + tsVint.msgFp[TSDB_MSG_TYPE_MD_DROP_TABLE] = vnodeProcessWriteMsg; + tsVint.msgFp[TSDB_MSG_TYPE_MD_ALTER_TABLE] = vnodeProcessWriteMsg; + tsVint.msgFp[TSDB_MSG_TYPE_MD_DROP_STABLE] = vnodeProcessWriteMsg; + tsVint.msgFp[TSDB_MSG_TYPE_SUBMIT] = vnodeProcessWriteMsg; + tsVint.msgFp[TSDB_MSG_TYPE_UPDATE_TAG_VAL] = vnodeProcessWriteMsg; + // mq related + tsVint.msgFp[TSDB_MSG_TYPE_MQ_CONNECT] = vnodeProcessWriteMsg; + tsVint.msgFp[TSDB_MSG_TYPE_MQ_DISCONNECT] = vnodeProcessWriteMsg; + tsVint.msgFp[TSDB_MSG_TYPE_MQ_ACK] = vnodeProcessWriteMsg; + tsVint.msgFp[TSDB_MSG_TYPE_MQ_RESET] = vnodeProcessWriteMsg; + tsVint.msgFp[TSDB_MSG_TYPE_MQ_QUERY] = vnodeProcessReadMsg; + tsVint.msgFp[TSDB_MSG_TYPE_MQ_CONSUME] = vnodeProcessReadMsg; + // mq related end + tsVint.msgFp[TSDB_MSG_TYPE_QUERY] = vnodeProcessReadMsg; + tsVint.msgFp[TSDB_MSG_TYPE_FETCH] = vnodeProcessReadMsg; +} + int32_t vnodeInit(SVnodePara para) { + vnodeInitMsgFp(); tsVint.fp = para.fp; struct SSteps *steps = taosStepInit(8, NULL); diff --git a/source/server/vnode/src/vnodeMain.c b/source/server/vnode/src/vnodeMain.c index bfe2df9e43..5143f04c5b 100644 --- a/source/server/vnode/src/vnodeMain.c +++ b/source/server/vnode/src/vnodeMain.c @@ -44,7 +44,6 @@ static struct { SHashObj *hash; int32_t openVnodes; int32_t totalVnodes; - void (*msgFp[TSDB_MSG_TYPE_MAX])(SRpcMsg *); } tsVnode; static bool vnodeSetInitStatus(SVnode *pVnode) { @@ -566,34 +565,7 @@ void vnodeRelease(SVnode *pVnode) { } } -static void vnodeInitMsgFp() { - tsVnode.msgFp[TSDB_MSG_TYPE_MD_CREATE_VNODE] = vnodeProcessMgmtMsg; - tsVnode.msgFp[TSDB_MSG_TYPE_MD_ALTER_VNODE] = vnodeProcessMgmtMsg; - tsVnode.msgFp[TSDB_MSG_TYPE_MD_SYNC_VNODE] = vnodeProcessMgmtMsg; - tsVnode.msgFp[TSDB_MSG_TYPE_MD_COMPACT_VNODE] = vnodeProcessMgmtMsg; - tsVnode.msgFp[TSDB_MSG_TYPE_MD_DROP_VNODE] = vnodeProcessMgmtMsg; - tsVnode.msgFp[TSDB_MSG_TYPE_MD_ALTER_STREAM] = vnodeProcessMgmtMsg; - tsVnode.msgFp[TSDB_MSG_TYPE_MD_CREATE_TABLE] = vnodeProcessWriteMsg; - tsVnode.msgFp[TSDB_MSG_TYPE_MD_DROP_TABLE] = vnodeProcessWriteMsg; - tsVnode.msgFp[TSDB_MSG_TYPE_MD_ALTER_TABLE] = vnodeProcessWriteMsg; - tsVnode.msgFp[TSDB_MSG_TYPE_MD_DROP_STABLE] = vnodeProcessWriteMsg; - tsVnode.msgFp[TSDB_MSG_TYPE_SUBMIT] = vnodeProcessWriteMsg; - tsVnode.msgFp[TSDB_MSG_TYPE_UPDATE_TAG_VAL] = vnodeProcessWriteMsg; - // mq related - tsVnode.msgFp[TSDB_MSG_TYPE_MQ_CONNECT] = vnodeProcessWriteMsg; - tsVnode.msgFp[TSDB_MSG_TYPE_MQ_DISCONNECT] = vnodeProcessWriteMsg; - tsVnode.msgFp[TSDB_MSG_TYPE_MQ_ACK] = vnodeProcessWriteMsg; - tsVnode.msgFp[TSDB_MSG_TYPE_MQ_RESET] = vnodeProcessWriteMsg; - tsVnode.msgFp[TSDB_MSG_TYPE_MQ_QUERY] = vnodeProcessReadMsg; - tsVnode.msgFp[TSDB_MSG_TYPE_MQ_CONSUME] = vnodeProcessReadMsg; - // mq related end - tsVnode.msgFp[TSDB_MSG_TYPE_QUERY] = vnodeProcessReadMsg; - tsVnode.msgFp[TSDB_MSG_TYPE_FETCH] = vnodeProcessReadMsg; -} - int32_t vnodeInitMain() { - vnodeInitMsgFp(); - tsVnode.hash = taosHashInit(TSDB_MIN_VNODES, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK); if (tsVnode.hash == NULL) { vError("failed to init vnode mgmt"); @@ -654,11 +626,3 @@ void vnodeSetAccess(SVgroupAccess *pAccess, int32_t numOfVnodes) { } } } - -void vnodeProcessMsg(SRpcMsg *pMsg) { - if (tsVnode.msgFp[pMsg->msgType]) { - (*tsVnode.msgFp[pMsg->msgType])(pMsg); - } else { - assert(0); - } -} diff --git a/source/server/vnode/src/vnodeWrite.c b/source/server/vnode/src/vnodeWrite.c index 70fa9bff80..f3258af0bf 100644 --- a/source/server/vnode/src/vnodeWrite.c +++ b/source/server/vnode/src/vnodeWrite.c @@ -15,8 +15,6 @@ #define _DEFAULT_SOURCE #include "os.h" -#include "tqueue.h" -#include "tworker.h" #include "vnodeMain.h" #include "vnodeWrite.h" #include "vnodeWriteMsg.h" diff --git a/source/util/src/terror.c b/source/util/src/terror.c index 68cd067fb9..8e5d7a47fd 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -245,11 +245,11 @@ TAOS_DEFINE_ERROR(TSDB_CODE_VND_NO_DISK_PERMISSIONS, "No write permission f TAOS_DEFINE_ERROR(TSDB_CODE_VND_NO_SUCH_FILE_OR_DIR, "Missing data file") TAOS_DEFINE_ERROR(TSDB_CODE_VND_OUT_OF_MEMORY, "Out of memory") TAOS_DEFINE_ERROR(TSDB_CODE_VND_APP_ERROR, "Unexpected generic error in vnode") -TAOS_DEFINE_ERROR(TSDB_CODE_VND_INVALID_VRESION_FILE, "Invalid version file") -TAOS_DEFINE_ERROR(TSDB_CODE_VND_IS_FULL, "Database memory is full for commit failed") -TAOS_DEFINE_ERROR(TSDB_CODE_VND_IS_FLOWCTRL, "Database memory is full for waiting commit") +TAOS_DEFINE_ERROR(TSDB_CODE_VND_INVALID_CFG_FILE, "Invalid config file") +TAOS_DEFINE_ERROR(TSDB_CODE_VND_INVALID_TERM_FILE, "Invalid term file") +TAOS_DEFINE_ERROR(TSDB_CODE_VND_IS_FLOWCTRL, "Database memory is full") TAOS_DEFINE_ERROR(TSDB_CODE_VND_IS_DROPPING, "Database is dropping") -TAOS_DEFINE_ERROR(TSDB_CODE_VND_IS_BALANCING, "Database is balancing") +TAOS_DEFINE_ERROR(TSDB_CODE_VND_IS_UPDATING, "Database is updating") TAOS_DEFINE_ERROR(TSDB_CODE_VND_IS_CLOSING, "Database is closing") TAOS_DEFINE_ERROR(TSDB_CODE_VND_NOT_SYNCED, "Database suspended") TAOS_DEFINE_ERROR(TSDB_CODE_VND_NO_WRITE_AUTH, "Database write operation denied") -- GitLab