diff --git a/include/common/taosmsg.h b/include/common/taosmsg.h index b655016f3c47ab0677abf4533dce9c35fe078b28..9bae38e3a72dddb276679efb38eb7f38e7e1c455 100644 --- a/include/common/taosmsg.h +++ b/include/common/taosmsg.h @@ -658,7 +658,7 @@ typedef struct { typedef struct SStatusMsg { int32_t sver; int32_t dnodeId; - int64_t clusterId; + int32_t clusterId; uint32_t rebootTime; // time stamp for last reboot int16_t numOfCores; int16_t numOfSupportMnodes; @@ -671,9 +671,9 @@ typedef struct SStatusMsg { typedef struct { int32_t dnodeId; + int32_t clusterId; int8_t dropped; - char reserved[3]; - int64_t clusterId; + char reserved[7]; } SDnodeCfg; typedef struct { diff --git a/include/dnode/mnode/mnode.h b/include/dnode/mnode/mnode.h index a8a8117886eaa6b109b9747628c78a4443dbddc6..c7415af0d50357b02cdf032425aaa70faca43ce8 100644 --- a/include/dnode/mnode/mnode.h +++ b/include/dnode/mnode/mnode.h @@ -45,7 +45,7 @@ typedef struct SMnodeLoad { typedef struct { int32_t dnodeId; - int64_t clusterId; + int32_t clusterId; int8_t replica; int8_t selfIndex; SReplica replicas[TSDB_MAX_REPLICA]; diff --git a/include/os/osSysinfo.h b/include/os/osSysinfo.h index a3919890bdc241be47ad7793c245ba286a29f54e..56f6b3e0da5419c7a9bed800b44e7325cb0ce602 100644 --- a/include/os/osSysinfo.h +++ b/include/os/osSysinfo.h @@ -52,7 +52,7 @@ bool taosGetSysMemory(float *memoryUsedMB); void taosPrintOsInfo(); int taosSystem(const char *cmd); void taosKillSystem(); -bool taosGetSystemUid(char *uid, int32_t uidlen); +int32_t taosGetSystemUid(char *uid, int32_t uidlen); char * taosGetCmdlineByPID(int pid); void taosSetCoreDump(bool enable); diff --git a/include/util/taoserror.h b/include/util/taoserror.h index 304fb56a6ae3e8372a6bb391722ea7c48a76247d..413c84fe315d04ea66c06fffe00025d4c3833342 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -115,25 +115,25 @@ int32_t* taosGetErrno(); #define TSDB_CODE_TSC_VALUE_OUT_OF_RANGE TAOS_DEF_ERROR_CODE(0, 0x0223) //"Value out of range") // mnode -#define TSDB_CODE_MND_MSG_NOT_PROCESSED TAOS_DEF_ERROR_CODE(0, 0x0300) //"Message not processed") -#define TSDB_CODE_MND_ACTION_IN_PROGRESS TAOS_DEF_ERROR_CODE(0, 0x0301) //"Message is progressing") -#define TSDB_CODE_MND_ACTION_NEED_REPROCESSED TAOS_DEF_ERROR_CODE(0, 0x0302) //"Messag need to be reprocessed") -#define TSDB_CODE_MND_NO_RIGHTS TAOS_DEF_ERROR_CODE(0, 0x0303) //"Insufficient privilege for operation") -#define TSDB_CODE_MND_APP_ERROR TAOS_DEF_ERROR_CODE(0, 0x0304) //"Unexpected generic error in mnode") -#define TSDB_CODE_MND_INVALID_CONNECTION TAOS_DEF_ERROR_CODE(0, 0x0305) //"Invalid message connection") -#define TSDB_CODE_MND_INVALID_MSG_VERSION TAOS_DEF_ERROR_CODE(0, 0x0306) //"Incompatible protocol version") -#define TSDB_CODE_MND_INVALID_MSG_LEN TAOS_DEF_ERROR_CODE(0, 0x0307) //"Invalid message length") -#define TSDB_CODE_MND_INVALID_MSG_TYPE TAOS_DEF_ERROR_CODE(0, 0x0308) //"Invalid message type") -#define TSDB_CODE_MND_TOO_MANY_SHELL_CONNS TAOS_DEF_ERROR_CODE(0, 0x0309) //"Too many connections") -#define TSDB_CODE_MND_INVALID_SHOWOBJ TAOS_DEF_ERROR_CODE(0, 0x030B) //"Data expired") -#define TSDB_CODE_MND_INVALID_QUERY_ID TAOS_DEF_ERROR_CODE(0, 0x030C) //"Invalid query id") -#define TSDB_CODE_MND_INVALID_STREAM_ID TAOS_DEF_ERROR_CODE(0, 0x030D) //"Invalid stream id") -#define TSDB_CODE_MND_INVALID_CONN_ID TAOS_DEF_ERROR_CODE(0, 0x030E) //"Invalid connection id") -#define TSDB_CODE_MND_MNODE_IS_RUNNING TAOS_DEF_ERROR_CODE(0, 0x0310) //"mnode is alreay running") -#define TSDB_CODE_MND_FAILED_TO_CONFIG_SYNC TAOS_DEF_ERROR_CODE(0, 0x0311) //"failed to config sync") -#define TSDB_CODE_MND_FAILED_TO_START_SYNC TAOS_DEF_ERROR_CODE(0, 0x0312) //"failed to start sync") -#define TSDB_CODE_MND_FAILED_TO_CREATE_DIR TAOS_DEF_ERROR_CODE(0, 0x0313) //"failed to create mnode dir") -#define TSDB_CODE_MND_FAILED_TO_INIT_STEP TAOS_DEF_ERROR_CODE(0, 0x0314) //"failed to init components") +#define TSDB_CODE_MND_MSG_NOT_PROCESSED TAOS_DEF_ERROR_CODE(0, 0x0300) +#define TSDB_CODE_MND_ACTION_IN_PROGRESS TAOS_DEF_ERROR_CODE(0, 0x0301) +#define TSDB_CODE_MND_ACTION_NEED_REPROCESSED TAOS_DEF_ERROR_CODE(0, 0x0302) +#define TSDB_CODE_MND_NO_RIGHTS TAOS_DEF_ERROR_CODE(0, 0x0303) +#define TSDB_CODE_MND_INVALID_OPTIONS TAOS_DEF_ERROR_CODE(0, 0x0304) +#define TSDB_CODE_MND_INVALID_CONNECTION TAOS_DEF_ERROR_CODE(0, 0x0305) +#define TSDB_CODE_MND_INVALID_MSG_VERSION TAOS_DEF_ERROR_CODE(0, 0x0306) +#define TSDB_CODE_MND_INVALID_MSG_LEN TAOS_DEF_ERROR_CODE(0, 0x0307) +#define TSDB_CODE_MND_INVALID_MSG_TYPE TAOS_DEF_ERROR_CODE(0, 0x0308) +#define TSDB_CODE_MND_TOO_MANY_SHELL_CONNS TAOS_DEF_ERROR_CODE(0, 0x0309) +#define TSDB_CODE_MND_INVALID_SHOWOBJ TAOS_DEF_ERROR_CODE(0, 0x030B) +#define TSDB_CODE_MND_INVALID_QUERY_ID TAOS_DEF_ERROR_CODE(0, 0x030C) +#define TSDB_CODE_MND_INVALID_STREAM_ID TAOS_DEF_ERROR_CODE(0, 0x030D) +#define TSDB_CODE_MND_INVALID_CONN_ID TAOS_DEF_ERROR_CODE(0, 0x030E) +#define TSDB_CODE_MND_MNODE_IS_RUNNING TAOS_DEF_ERROR_CODE(0, 0x0310) +#define TSDB_CODE_MND_FAILED_TO_CONFIG_SYNC TAOS_DEF_ERROR_CODE(0, 0x0311) +#define TSDB_CODE_MND_FAILED_TO_START_SYNC TAOS_DEF_ERROR_CODE(0, 0x0312) +#define TSDB_CODE_MND_FAILED_TO_CREATE_DIR TAOS_DEF_ERROR_CODE(0, 0x0313) +#define TSDB_CODE_MND_FAILED_TO_INIT_STEP TAOS_DEF_ERROR_CODE(0, 0x0314) #define TSDB_CODE_SDB_APP_ERROR TAOS_DEF_ERROR_CODE(0, 0x0320) #define TSDB_CODE_SDB_OBJ_ALREADY_THERE TAOS_DEF_ERROR_CODE(0, 0x0321) diff --git a/source/dnode/mgmt/impl/inc/dndDnode.h b/source/dnode/mgmt/impl/inc/dndDnode.h index 4bb4cad8cc8a00a2707ba1e436d44040a6022964..c21c6a0b860514ce815f4628d1cfc946d68b3123 100644 --- a/source/dnode/mgmt/impl/inc/dndDnode.h +++ b/source/dnode/mgmt/impl/inc/dndDnode.h @@ -27,7 +27,7 @@ void dndProcessDnodeReq(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet); void dndProcessDnodeRsp(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet); int32_t dndGetDnodeId(SDnode *pDnode); -int64_t dndGetClusterId(SDnode *pDnode); +int32_t dndGetClusterId(SDnode *pDnode); void dndGetDnodeEp(SDnode *pDnode, int32_t dnodeId, char *pEp, char *pFqdn, uint16_t *pPort); void dndGetMnodeEpSet(SDnode *pDnode, SEpSet *pEpSet); void dndSendRedirectMsg(SDnode *pDnode, SRpcMsg *pMsg); diff --git a/source/dnode/mgmt/impl/inc/dndInt.h b/source/dnode/mgmt/impl/inc/dndInt.h index 39243a1795a176efb86eff71e76e9558f4e76ea7..136f6eee0c823c461b768ccf75e7f7e7f78b832c 100644 --- a/source/dnode/mgmt/impl/inc/dndInt.h +++ b/source/dnode/mgmt/impl/inc/dndInt.h @@ -57,8 +57,8 @@ typedef struct { typedef struct { int32_t dnodeId; int32_t dropped; + int32_t clusterId; uint32_t rebootTime; - int64_t clusterId; SEpSet mnodeEpSet; char *file; SHashObj *dnodeHash; diff --git a/source/dnode/mgmt/impl/src/dndDnode.c b/source/dnode/mgmt/impl/src/dndDnode.c index 7b8afa96bbb8815b8fe26e35e3b81cb9366bfc3d..c89741d03ba44e47bdeb9b93e1a85935514bd597 100644 --- a/source/dnode/mgmt/impl/src/dndDnode.c +++ b/source/dnode/mgmt/impl/src/dndDnode.c @@ -26,10 +26,10 @@ int32_t dndGetDnodeId(SDnode *pDnode) { return dnodeId; } -int64_t dndGetClusterId(SDnode *pDnode) { +int32_t dndGetClusterId(SDnode *pDnode) { SDnodeMgmt *pMgmt = &pDnode->dmgmt; taosRLockLatch(&pMgmt->latch); - int64_t clusterId = pMgmt->clusterId; + int32_t clusterId = pMgmt->clusterId; taosRUnLockLatch(&pMgmt->latch); return clusterId; } @@ -190,78 +190,78 @@ static int32_t dndReadDnodes(SDnode *pDnode) { } cJSON *dnodeId = cJSON_GetObjectItem(root, "dnodeId"); - if (!dnodeId || dnodeId->type != cJSON_String) { + if (!dnodeId || dnodeId->type != cJSON_Number) { dError("failed to read %s since dnodeId not found", pMgmt->file); goto PRASE_DNODE_OVER; } - pMgmt->dnodeId = atoi(dnodeId->valuestring); + pMgmt->dnodeId = dnodeId->valueint; cJSON *clusterId = cJSON_GetObjectItem(root, "clusterId"); - if (!clusterId || clusterId->type != cJSON_String) { + if (!clusterId || clusterId->type != cJSON_Number) { dError("failed to read %s since clusterId not found", pMgmt->file); goto PRASE_DNODE_OVER; } - pMgmt->clusterId = atoll(clusterId->valuestring); + pMgmt->clusterId = clusterId->valueint; cJSON *dropped = cJSON_GetObjectItem(root, "dropped"); - if (!dropped || dropped->type != cJSON_String) { + if (!dropped || dropped->type != cJSON_Number) { dError("failed to read %s since dropped not found", pMgmt->file); goto PRASE_DNODE_OVER; } - pMgmt->dropped = atoi(dropped->valuestring); + pMgmt->dropped = dropped->valueint; - cJSON *dnodeInfos = cJSON_GetObjectItem(root, "dnodeInfos"); - if (!dnodeInfos || dnodeInfos->type != cJSON_Array) { - dError("failed to read %s since dnodeInfos not found", pMgmt->file); + cJSON *dnodes = cJSON_GetObjectItem(root, "dnodes"); + if (!dnodes || dnodes->type != cJSON_Array) { + dError("failed to read %s since dnodes not found", pMgmt->file); goto PRASE_DNODE_OVER; } - int32_t dnodeInfosSize = cJSON_GetArraySize(dnodeInfos); - if (dnodeInfosSize <= 0) { - dError("failed to read %s since dnodeInfos size:%d invalid", pMgmt->file, dnodeInfosSize); + int32_t numOfNodes = cJSON_GetArraySize(dnodes); + if (numOfNodes <= 0) { + dError("failed to read %s since numOfNodes:%d invalid", pMgmt->file, numOfNodes); goto PRASE_DNODE_OVER; } - pMgmt->dnodeEps = calloc(1, dnodeInfosSize * sizeof(SDnodeEp) + sizeof(SDnodeEps)); + pMgmt->dnodeEps = calloc(1, numOfNodes * sizeof(SDnodeEp) + sizeof(SDnodeEps)); if (pMgmt->dnodeEps == NULL) { dError("failed to calloc dnodeEpList since %s", strerror(errno)); goto PRASE_DNODE_OVER; } - pMgmt->dnodeEps->num = dnodeInfosSize; + pMgmt->dnodeEps->num = numOfNodes; - for (int32_t i = 0; i < dnodeInfosSize; ++i) { - cJSON *dnodeInfo = cJSON_GetArrayItem(dnodeInfos, i); - if (dnodeInfo == NULL) break; + for (int32_t i = 0; i < numOfNodes; ++i) { + cJSON *node = cJSON_GetArrayItem(dnodes, i); + if (node == NULL) break; SDnodeEp *pDnodeEp = &pMgmt->dnodeEps->eps[i]; - cJSON *dnodeId = cJSON_GetObjectItem(dnodeInfo, "dnodeId"); - if (!dnodeId || dnodeId->type != cJSON_String) { + cJSON *dnodeId = cJSON_GetObjectItem(node, "id"); + if (!dnodeId || dnodeId->type != cJSON_Number) { dError("failed to read %s, dnodeId not found", pMgmt->file); goto PRASE_DNODE_OVER; } - pDnodeEp->id = atoi(dnodeId->valuestring); + pDnodeEp->id = dnodeId->valueint; - cJSON *isMnode = cJSON_GetObjectItem(dnodeInfo, "isMnode"); - if (!isMnode || isMnode->type != cJSON_String) { - dError("failed to read %s, isMnode not found", pMgmt->file); - goto PRASE_DNODE_OVER; - } - pDnodeEp->isMnode = atoi(isMnode->valuestring); - - cJSON *dnodeFqdn = cJSON_GetObjectItem(dnodeInfo, "dnodeFqdn"); + cJSON *dnodeFqdn = cJSON_GetObjectItem(node, "fqdn"); if (!dnodeFqdn || dnodeFqdn->type != cJSON_String || dnodeFqdn->valuestring == NULL) { dError("failed to read %s, dnodeFqdn not found", pMgmt->file); goto PRASE_DNODE_OVER; } tstrncpy(pDnodeEp->fqdn, dnodeFqdn->valuestring, TSDB_FQDN_LEN); - cJSON *dnodePort = cJSON_GetObjectItem(dnodeInfo, "dnodePort"); - if (!dnodePort || dnodePort->type != cJSON_String) { + cJSON *dnodePort = cJSON_GetObjectItem(node, "port"); + if (!dnodePort || dnodePort->type != cJSON_Number) { dError("failed to read %s, dnodePort not found", pMgmt->file); goto PRASE_DNODE_OVER; } - pDnodeEp->port = atoi(dnodePort->valuestring); + pDnodeEp->port = dnodePort->valueint; + + cJSON *isMnode = cJSON_GetObjectItem(node, "isMnode"); + if (!isMnode || isMnode->type != cJSON_Number) { + dError("failed to read %s, isMnode not found", pMgmt->file); + goto PRASE_DNODE_OVER; + } + pDnodeEp->isMnode = isMnode->valueint; } code = 0; @@ -307,16 +307,16 @@ static int32_t dndWriteDnodes(SDnode *pDnode) { char *content = calloc(1, maxLen + 1); len += snprintf(content + len, maxLen - len, "{\n"); - len += snprintf(content + len, maxLen - len, " \"dnodeId\": \"%d\",\n", pMgmt->dnodeId); - len += snprintf(content + len, maxLen - len, " \"clusterId\": \"%" PRId64 "\",\n", pMgmt->clusterId); - len += snprintf(content + len, maxLen - len, " \"dropped\": \"%d\",\n", pMgmt->dropped); - len += snprintf(content + len, maxLen - len, " \"dnodeInfos\": [{\n"); + len += snprintf(content + len, maxLen - len, " \"dnodeId\": %d,\n", pMgmt->dnodeId); + len += snprintf(content + len, maxLen - len, " \"clusterId\": %d,\n", pMgmt->clusterId); + len += snprintf(content + len, maxLen - len, " \"dropped\": %d,\n", pMgmt->dropped); + len += snprintf(content + len, maxLen - len, " \"dnodes\": [{\n"); for (int32_t i = 0; i < pMgmt->dnodeEps->num; ++i) { SDnodeEp *pDnodeEp = &pMgmt->dnodeEps->eps[i]; - len += snprintf(content + len, maxLen - len, " \"dnodeId\": \"%d\",\n", pDnodeEp->id); - len += snprintf(content + len, maxLen - len, " \"isMnode\": \"%d\",\n", pDnodeEp->isMnode); - len += snprintf(content + len, maxLen - len, " \"dnodeFqdn\": \"%s\",\n", pDnodeEp->fqdn); - len += snprintf(content + len, maxLen - len, " \"dnodePort\": \"%u\"\n", pDnodeEp->port); + len += snprintf(content + len, maxLen - len, " \"id\": %d,\n", pDnodeEp->id); + len += snprintf(content + len, maxLen - len, " \"fqdn\": \"%s\",\n", pDnodeEp->fqdn); + len += snprintf(content + len, maxLen - len, " \"port\": %u,\n", pDnodeEp->port); + len += snprintf(content + len, maxLen - len, " \"isMnode\": %d\n", pDnodeEp->isMnode); if (i < pMgmt->dnodeEps->num - 1) { len += snprintf(content + len, maxLen - len, " },{\n"); } else { @@ -344,13 +344,11 @@ static void dndSendStatusMsg(SDnode *pDnode) { return; } - bool changed = false; - SDnodeMgmt *pMgmt = &pDnode->dmgmt; taosRLockLatch(&pMgmt->latch); pStatus->sver = htonl(pDnode->opt.sver); pStatus->dnodeId = htonl(pMgmt->dnodeId); - pStatus->clusterId = htobe64(pMgmt->clusterId); + pStatus->clusterId = htonl(pMgmt->clusterId); pStatus->rebootTime = htonl(pMgmt->rebootTime); pStatus->numOfCores = htons(pDnode->opt.numOfCores); pStatus->numOfSupportMnodes = htons(pDnode->opt.numOfCores); @@ -379,7 +377,7 @@ static void dndSendStatusMsg(SDnode *pDnode) { static void dndUpdateDnodeCfg(SDnode *pDnode, SDnodeCfg *pCfg) { SDnodeMgmt *pMgmt = &pDnode->dmgmt; if (pMgmt->dnodeId == 0 || pMgmt->dropped != pCfg->dropped) { - dInfo("set dnodeId:%d clusterId:%" PRId64 " dropped:%d", pCfg->dnodeId, pCfg->clusterId, pCfg->dropped); + dInfo("set dnodeId:%d clusterId:%d dropped:%d", pCfg->dnodeId, pCfg->clusterId, pCfg->dropped); taosWLockLatch(&pMgmt->latch); pMgmt->dnodeId = pCfg->dnodeId; @@ -420,7 +418,7 @@ static void dndProcessStatusRsp(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) { SStatusRsp *pRsp = pMsg->pCont; SDnodeCfg *pCfg = &pRsp->dnodeCfg; pCfg->dnodeId = htonl(pCfg->dnodeId); - pCfg->clusterId = htobe64(pCfg->clusterId); + pCfg->clusterId = htonl(pCfg->clusterId); dndUpdateDnodeCfg(pDnode, pCfg); if (pCfg->dropped) return; @@ -440,6 +438,7 @@ static void dndProcessAuthRsp(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) { a static void dndProcessGrantRsp(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) { assert(1); } static void dndProcessConfigDnodeReq(SDnode *pDnode, SRpcMsg *pMsg) { + dDebug("config msg is received"); SCfgDnodeMsg *pCfg = pMsg->pCont; int32_t code = TSDB_CODE_OPS_NOT_SUPPORT; @@ -449,12 +448,12 @@ static void dndProcessConfigDnodeReq(SDnode *pDnode, SRpcMsg *pMsg) { } static void dndProcessStartupReq(SDnode *pDnode, SRpcMsg *pMsg) { - dInfo("startup msg is received"); + dDebug("startup msg is received"); SStartupMsg *pStartup = rpcMallocCont(sizeof(SStartupMsg)); dndGetStartup(pDnode, pStartup); - dInfo("startup msg is sent, step:%s desc:%s finished:%d", pStartup->name, pStartup->desc, pStartup->finished); + dDebug("startup msg is sent, step:%s desc:%s finished:%d", pStartup->name, pStartup->desc, pStartup->finished); SRpcMsg rpcRsp = {.handle = pMsg->handle, .pCont = pStartup, .contLen = sizeof(SStartupMsg)}; rpcSendResponse(&rpcRsp); @@ -470,7 +469,7 @@ static void *dnodeThreadRoutine(void *param) { pthread_testcancel(); if (dndGetStat(pDnode) == DND_STAT_RUNNING) { - // dndSendStatusMsg(pDnode); + dndSendStatusMsg(pDnode); } } } diff --git a/source/dnode/mgmt/impl/src/dndMnode.c b/source/dnode/mgmt/impl/src/dndMnode.c index 342d14463ab51de85017c129fd3b4d440c2fdcc9..4ec08d5fb3773f28e85c09aaccfd7f88799b0b4e 100644 --- a/source/dnode/mgmt/impl/src/dndMnode.c +++ b/source/dnode/mgmt/impl/src/dndMnode.c @@ -130,43 +130,43 @@ static int32_t dndReadMnodeFile(SDnode *pDnode) { } cJSON *deployed = cJSON_GetObjectItem(root, "deployed"); - if (!deployed || deployed->type != cJSON_String) { + if (!deployed || deployed->type != cJSON_Number) { dError("failed to read %s since deployed not found", pMgmt->file); goto PRASE_MNODE_OVER; } - pMgmt->deployed = atoi(deployed->valuestring); + pMgmt->deployed = deployed->valueint; cJSON *dropped = cJSON_GetObjectItem(root, "dropped"); - if (!dropped || dropped->type != cJSON_String) { + if (!dropped || dropped->type != cJSON_Number) { dError("failed to read %s since dropped not found", pMgmt->file); goto PRASE_MNODE_OVER; } - pMgmt->dropped = atoi(dropped->valuestring); + pMgmt->dropped = dropped->valueint; - cJSON *nodes = cJSON_GetObjectItem(root, "nodes"); - if (!nodes || nodes->type != cJSON_Array) { + cJSON *mnodes = cJSON_GetObjectItem(root, "mnodes"); + if (!mnodes || mnodes->type != cJSON_Array) { dError("failed to read %s since nodes not found", pMgmt->file); goto PRASE_MNODE_OVER; } - pMgmt->replica = cJSON_GetArraySize(nodes); + pMgmt->replica = cJSON_GetArraySize(mnodes); if (pMgmt->replica <= 0 || pMgmt->replica > TSDB_MAX_REPLICA) { - dError("failed to read %s since nodes size %d invalid", pMgmt->file, pMgmt->replica); + dError("failed to read %s since mnodes size %d invalid", pMgmt->file, pMgmt->replica); goto PRASE_MNODE_OVER; } for (int32_t i = 0; i < pMgmt->replica; ++i) { - cJSON *node = cJSON_GetArrayItem(nodes, i); + cJSON *node = cJSON_GetArrayItem(mnodes, i); if (node == NULL) break; SReplica *pReplica = &pMgmt->replicas[i]; cJSON *id = cJSON_GetObjectItem(node, "id"); - if (!id || id->type != cJSON_String || id->valuestring == NULL) { + if (!id || id->type != cJSON_Number) { dError("failed to read %s since id not found", pMgmt->file); goto PRASE_MNODE_OVER; } - pReplica->id = atoi(id->valuestring); + pReplica->id = id->valueint; cJSON *fqdn = cJSON_GetObjectItem(node, "fqdn"); if (!fqdn || fqdn->type != cJSON_String || fqdn->valuestring == NULL) { @@ -176,15 +176,15 @@ static int32_t dndReadMnodeFile(SDnode *pDnode) { tstrncpy(pReplica->fqdn, fqdn->valuestring, TSDB_FQDN_LEN); cJSON *port = cJSON_GetObjectItem(node, "port"); - if (!port || port->type != cJSON_String || port->valuestring == NULL) { + if (!port || port->type != cJSON_Number) { dError("failed to read %s since port not found", pMgmt->file); goto PRASE_MNODE_OVER; } - pReplica->port = atoi(port->valuestring); + pReplica->port = port->valueint; } code = 0; - dInfo("succcessed to read file %s", pMgmt->file); + dDebug("succcessed to read file %s, deployed:%d dropped:%d", pMgmt->file, pMgmt->deployed, pMgmt->dropped); PRASE_MNODE_OVER: if (content != NULL) free(content); @@ -213,15 +213,15 @@ static int32_t dndWriteMnodeFile(SDnode *pDnode) { char *content = calloc(1, maxLen + 1); len += snprintf(content + len, maxLen - len, "{\n"); - len += snprintf(content + len, maxLen - len, " \"deployed\": \"%d\",\n", pMgmt->deployed); + len += snprintf(content + len, maxLen - len, " \"deployed\": %d,\n", pMgmt->deployed); - len += snprintf(content + len, maxLen - len, " \"dropped\": \"%d\",\n", pMgmt->dropped); - len += snprintf(content + len, maxLen - len, " \"nodes\": [{\n"); + len += snprintf(content + len, maxLen - len, " \"dropped\": %d,\n", pMgmt->dropped); + len += snprintf(content + len, maxLen - len, " \"mnodes\": [{\n"); for (int32_t i = 0; i < pMgmt->replica; ++i) { SReplica *pReplica = &pMgmt->replicas[i]; - len += snprintf(content + len, maxLen - len, " \"id\": \"%d\",\n", pReplica->id); + len += snprintf(content + len, maxLen - len, " \"id\": %d,\n", pReplica->id); len += snprintf(content + len, maxLen - len, " \"fqdn\": \"%s\",\n", pReplica->fqdn); - len += snprintf(content + len, maxLen - len, " \"port\": \"%u\"\n", pReplica->port); + len += snprintf(content + len, maxLen - len, " \"port\": %u\n", pReplica->port); if (i < pMgmt->replica - 1) { len += snprintf(content + len, maxLen - len, " },{\n"); } else { @@ -241,7 +241,7 @@ static int32_t dndWriteMnodeFile(SDnode *pDnode) { return -1; } - dInfo("successed to write %s", pMgmt->file); + dInfo("successed to write %s, deployed:%d dropped:%d", pMgmt->file, pMgmt->deployed, pMgmt->dropped); return 0; } @@ -289,7 +289,6 @@ static void dndStopMnodeWorker(SDnode *pDnode) { taosWLockLatch(&pMgmt->latch); pMgmt->deployed = 0; - pMgmt->pMnode = NULL; taosWUnLockLatch(&pMgmt->latch); while (pMgmt->refCount > 1) taosMsleep(10); @@ -396,29 +395,34 @@ static int32_t dndBuildMnodeOptionFromMsg(SDnode *pDnode, SMnodeOpt *pOption, SC static int32_t dndOpenMnode(SDnode *pDnode, SMnodeOpt *pOption) { SMnodeMgmt *pMgmt = &pDnode->mmgmt; - int32_t code = dndStartMnodeWorker(pDnode); - if (code != 0) { - dError("failed to start mnode worker since %s", terrstr()); - return code; - } - SMnode *pMnode = mndOpen(pDnode->dir.mnode, pOption); if (pMnode == NULL) { dError("failed to open mnode since %s", terrstr()); + return -1; + } + pMgmt->deployed = 1; + + int32_t code = dndWriteMnodeFile(pDnode); + if (code != 0) { + dError("failed to write mnode file since %s", terrstr()); code = terrno; - dndStopMnodeWorker(pDnode); + pMgmt->deployed = 0; + mndClose(pMnode); + mndDestroy(pDnode->dir.mnode); terrno = code; - return code; + return -1; } - if (dndWriteMnodeFile(pDnode) != 0) { - dError("failed to write mnode file since %s", terrstr()); + code = dndStartMnodeWorker(pDnode); + if (code != 0) { + dError("failed to start mnode worker since %s", terrstr()); code = terrno; + pMgmt->deployed = 0; dndStopMnodeWorker(pDnode); mndClose(pMnode); mndDestroy(pDnode->dir.mnode); terrno = code; - return code; + return -1; } taosWLockLatch(&pMgmt->latch); @@ -426,6 +430,7 @@ static int32_t dndOpenMnode(SDnode *pDnode, SMnodeOpt *pOption) { pMgmt->deployed = 1; taosWUnLockLatch(&pMgmt->latch); + dInfo("mnode open successfully"); return 0; } @@ -914,6 +919,7 @@ void dndCleanupMnode(SDnode *pDnode) { dndStopMnodeWorker(pDnode); dndCleanupMnodeMgmtWorker(pDnode); tfree(pMgmt->file); + mndClose(pMgmt->pMnode); dInfo("dnode-mnode is cleaned up"); } diff --git a/source/dnode/mgmt/impl/src/dndTransport.c b/source/dnode/mgmt/impl/src/dndTransport.c index eee2e36c0f762ec85ac860deed7f64d9700f6956..44ca1d1407f5f5533f07b7d65374043c70d5b389 100644 --- a/source/dnode/mgmt/impl/src/dndTransport.c +++ b/source/dnode/mgmt/impl/src/dndTransport.c @@ -58,6 +58,8 @@ static void dndInitMsgFp(STransMgmt *pMgmt) { pMgmt->msgFp[TSDB_MSG_TYPE_CREATE_DNODE] = dndProcessMnodeWriteMsg; pMgmt->msgFp[TSDB_MSG_TYPE_CONFIG_DNODE] = dndProcessMnodeWriteMsg; pMgmt->msgFp[TSDB_MSG_TYPE_DROP_DNODE] = dndProcessMnodeWriteMsg; + pMgmt->msgFp[TSDB_MSG_TYPE_CREATE_MNODE] = dndProcessMnodeWriteMsg; + pMgmt->msgFp[TSDB_MSG_TYPE_DROP_MNODE] = dndProcessMnodeWriteMsg; pMgmt->msgFp[TSDB_MSG_TYPE_CREATE_DB] = dndProcessMnodeWriteMsg; pMgmt->msgFp[TSDB_MSG_TYPE_DROP_DB] = dndProcessMnodeWriteMsg; pMgmt->msgFp[TSDB_MSG_TYPE_USE_DB] = dndProcessMnodeWriteMsg; @@ -115,12 +117,12 @@ static void dndInitMsgFp(STransMgmt *pMgmt) { pMgmt->msgFp[TSDB_MSG_TYPE_CONFIG_DNODE_IN_RSP] = dndProcessMnodeWriteMsg; // message from dnode to mnode - pMgmt->msgFp[TSDB_MSG_TYPE_AUTH] = dndProcessMnodeReadMsg; - pMgmt->msgFp[TSDB_MSG_TYPE_AUTH_RSP] = dndProcessDnodeRsp; pMgmt->msgFp[TSDB_MSG_TYPE_GRANT] = dndProcessMnodeWriteMsg; pMgmt->msgFp[TSDB_MSG_TYPE_GRANT_RSP] = dndProcessDnodeRsp; pMgmt->msgFp[TSDB_MSG_TYPE_STATUS] = dndProcessMnodeWriteMsg; pMgmt->msgFp[TSDB_MSG_TYPE_STATUS_RSP] = dndProcessDnodeRsp; + pMgmt->msgFp[TSDB_MSG_TYPE_AUTH] = dndProcessMnodeReadMsg; + pMgmt->msgFp[TSDB_MSG_TYPE_AUTH_RSP] = dndProcessDnodeRsp; } static void dndProcessResponse(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) { @@ -131,19 +133,20 @@ static void dndProcessResponse(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) { if (dndGetStat(pDnode) == DND_STAT_STOPPED) { if (pMsg == NULL || pMsg->pCont == NULL) return; - dTrace("RPC %p, rsp:%s is ignored since dnode is stopping", pMsg->handle, taosMsg[msgType]); + dTrace("RPC %p, rsp:%s app:%p is ignored since dnode is stopping", pMsg->handle, taosMsg[msgType], pMsg->ahandle); rpcFreeCont(pMsg->pCont); return; } DndMsgFp fp = pMgmt->msgFp[msgType]; if (fp != NULL) { - dTrace("RPC %p, rsp:%s will be processed, code:%s", pMsg->handle, taosMsg[msgType], tstrerror(pMsg->code)); (*fp)(pDnode, pMsg, pEpSet); + dTrace("RPC %p, rsp:%s app:%p is processed, code:0x%0X", pMsg->handle, taosMsg[msgType], pMsg->ahandle, + pMsg->code & 0XFFFF); } else { - dError("RPC %p, rsp:%s not processed", pMsg->handle, taosMsg[msgType]); - rpcFreeCont(pMsg->pCont); + dError("RPC %p, rsp:%s app:%p not processed", pMsg->handle, taosMsg[msgType], pMsg->ahandle); } + rpcFreeCont(pMsg->pCont); } static int32_t dndInitClient(SDnode *pDnode) { @@ -187,19 +190,19 @@ static void dndProcessRequest(void *param, SRpcMsg *pMsg, SEpSet *pEpSet) { int32_t msgType = pMsg->msgType; if (msgType == TSDB_MSG_TYPE_NETWORK_TEST) { - dTrace("RPC %p, network test req will be processed", pMsg->handle); + dTrace("RPC %p, network test req, app:%p will be processed", pMsg->handle, pMsg->ahandle); dndProcessDnodeReq(pDnode, pMsg, pEpSet); return; } if (dndGetStat(pDnode) == DND_STAT_STOPPED) { - dError("RPC %p, req:%s is ignored since dnode exiting", pMsg->handle, taosMsg[msgType]); + dError("RPC %p, req:%s app:%p is ignored since dnode exiting", pMsg->handle, taosMsg[msgType], pMsg->ahandle); SRpcMsg rspMsg = {.handle = pMsg->handle, .code = TSDB_CODE_DND_EXITING}; rpcSendResponse(&rspMsg); rpcFreeCont(pMsg->pCont); return; } else if (dndGetStat(pDnode) != DND_STAT_RUNNING) { - dError("RPC %p, req:%s is ignored since dnode not running", pMsg->handle, taosMsg[msgType]); + dError("RPC %p, req:%s app:%p is ignored since dnode not running", pMsg->handle, taosMsg[msgType], pMsg->ahandle); SRpcMsg rspMsg = {.handle = pMsg->handle, .code = TSDB_CODE_APP_NOT_READY}; rpcSendResponse(&rspMsg); rpcFreeCont(pMsg->pCont); @@ -207,7 +210,7 @@ static void dndProcessRequest(void *param, SRpcMsg *pMsg, SEpSet *pEpSet) { } if (pMsg->pCont == NULL) { - dTrace("RPC %p, req:%s not processed since content is null", pMsg->handle, taosMsg[msgType]); + dTrace("RPC %p, req:%s app:%p not processed since content is null", pMsg->handle, taosMsg[msgType], pMsg->ahandle); SRpcMsg rspMsg = {.handle = pMsg->handle, .code = TSDB_CODE_DND_INVALID_MSG_LEN}; rpcSendResponse(&rspMsg); return; @@ -215,10 +218,10 @@ static void dndProcessRequest(void *param, SRpcMsg *pMsg, SEpSet *pEpSet) { DndMsgFp fp = pMgmt->msgFp[msgType]; if (fp != NULL) { - dTrace("RPC %p, req:%s will be processed", pMsg->handle, taosMsg[msgType]); + dTrace("RPC %p, req:%s app:%p will be processed", pMsg->handle, taosMsg[msgType], pMsg->ahandle); (*fp)(pDnode, pMsg, pEpSet); } else { - dError("RPC %p, req:%s is not processed", pMsg->handle, taosMsg[msgType]); + dError("RPC %p, req:%s app:%p is not processed since no handle", pMsg->handle, taosMsg[msgType], pMsg->ahandle); SRpcMsg rspMsg = {.handle = pMsg->handle, .code = TSDB_CODE_MSG_NOT_PROCESSED}; rpcSendResponse(&rspMsg); rpcFreeCont(pMsg->pCont); diff --git a/source/dnode/mgmt/impl/src/dndVnodes.c b/source/dnode/mgmt/impl/src/dndVnodes.c index 6e328297e6ffeb3645f11a8acac40f9969508e4c..a6eb916aeffd88f139d90e9a0512288acc733ee1 100644 --- a/source/dnode/mgmt/impl/src/dndVnodes.c +++ b/source/dnode/mgmt/impl/src/dndVnodes.c @@ -897,6 +897,7 @@ static int32_t dndInitVnodeMgmtWorker(SDnode *pDnode) { return -1; } + dDebug("vnode mgmt worker is initialized"); return 0; } @@ -905,6 +906,7 @@ static void dndCleanupVnodeMgmtWorker(SDnode *pDnode) { tWorkerFreeQueue(&pMgmt->mgmtPool, pMgmt->pMgmtQ); tWorkerCleanup(&pMgmt->mgmtPool); pMgmt->pMgmtQ = NULL; + dDebug("vnode mgmt worker is closed"); } static int32_t dndAllocVnodeQueryQueue(SDnode *pDnode, SVnodeObj *pVnode) { @@ -965,6 +967,7 @@ static int32_t dndInitVnodeReadWorker(SDnode *pDnode) { return -1; } + dDebug("vnode read worker is initialized"); return 0; } @@ -972,6 +975,7 @@ static void dndCleanupVnodeReadWorker(SDnode *pDnode) { SVnodesMgmt *pMgmt = &pDnode->vmgmt; tWorkerCleanup(&pMgmt->fetchPool); tWorkerCleanup(&pMgmt->queryPool); + dDebug("vnode close worker is initialized"); } static int32_t dndAllocVnodeWriteQueue(SDnode *pDnode, SVnodeObj *pVnode) { @@ -1018,12 +1022,14 @@ static int32_t dndInitVnodeWriteWorker(SDnode *pDnode) { return -1; } + dDebug("vnode write worker is initialized"); return 0; } static void dndCleanupVnodeWriteWorker(SDnode *pDnode) { SVnodesMgmt *pMgmt = &pDnode->vmgmt; tMWorkerCleanup(&pMgmt->writePool); + dDebug("vnode write worker is closed"); } static int32_t dndAllocVnodeSyncQueue(SDnode *pDnode, SVnodeObj *pVnode) { @@ -1047,8 +1053,8 @@ static int32_t dndInitVnodeSyncWorker(SDnode *pDnode) { int32_t maxThreads = tsNumOfCores / 2; if (maxThreads < 1) maxThreads = 1; - SVnodesMgmt * pMgmt = &pDnode->vmgmt; - SMWorkerPool *pPool = &pMgmt->writePool; + SVnodesMgmt *pMgmt = &pDnode->vmgmt; + SMWorkerPool *pPool = &pMgmt->syncPool; pPool->name = "vnode-sync"; pPool->max = maxThreads; if (tMWorkerInit(pPool) != 0) { @@ -1056,12 +1062,14 @@ static int32_t dndInitVnodeSyncWorker(SDnode *pDnode) { return -1; } + dDebug("vnode sync worker is initialized"); return 0; } static void dndCleanupVnodeSyncWorker(SDnode *pDnode) { SVnodesMgmt *pMgmt = &pDnode->vmgmt; tMWorkerCleanup(&pMgmt->syncPool); + dDebug("vnode sync worker is closed"); } int32_t dndInitVnodes(SDnode *pDnode) { diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index ded66edaa7e3ada05f5bc7b1d3f61b7f9d05bf67..9facb8829cebfc98a2a836ae9294b483939d96f4 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -115,7 +115,7 @@ typedef struct STrans { } STrans; typedef struct SClusterObj { - int64_t id; + int32_t id; char uid[TSDB_CLUSTER_ID_LEN]; int64_t createdTime; int64_t updateTime; @@ -299,25 +299,18 @@ typedef struct { char payload[]; } SShowObj; -typedef struct { - int32_t len; - void *rsp; -} SMnodeRsp; - typedef struct SMnodeMsg { + char user[TSDB_USER_LEN]; SMnode *pMnode; - void (*fp)(SMnodeMsg *pMsg, int32_t code); - SRpcConnInfo conn; - SUserObj *pUser; - int16_t received; - int16_t successed; - int16_t expected; - int16_t retry; - int32_t code; - int64_t createdTime; - SMnodeRsp rpcRsp; - SRpcMsg rpcMsg; - char pCont[]; + int16_t received; + int16_t successed; + int16_t expected; + int16_t retry; + int32_t code; + int64_t createdTime; + SRpcMsg rpcMsg; + int32_t contLen; + void *pCont; } SMnodeMsg; #ifdef __cplusplus diff --git a/source/dnode/mnode/impl/inc/mndInt.h b/source/dnode/mnode/impl/inc/mndInt.h index bb3f0ca2632828d89dcca71d382274ff8d0a9605..8910ed4e631ef0b46a1d30c61b84ab41f47b61bb 100644 --- a/source/dnode/mnode/impl/inc/mndInt.h +++ b/source/dnode/mnode/impl/inc/mndInt.h @@ -36,7 +36,7 @@ typedef struct { typedef struct SMnode { int32_t dnodeId; - int64_t clusterId; + int32_t clusterId; int8_t replica; int8_t selfIndex; SReplica replicas[TSDB_MAX_REPLICA]; @@ -58,10 +58,6 @@ typedef struct SMnode { char *charset; } SMnode; -tmr_h mndGetTimer(SMnode *pMnode); -int32_t mndGetDnodeId(SMnode *pMnode); -int64_t mndGetClusterId(SMnode *pMnode); - void mndSendMsgToDnode(SMnode *pMnode, SEpSet *pEpSet, SRpcMsg *rpcMsg); void mndSendMsgToMnode(SMnode *pMnode, SRpcMsg *pMsg); void mndSendRedirectMsg(SMnode *pMnode, SRpcMsg *pMsg); diff --git a/source/dnode/mnode/impl/inc/mndUser.h b/source/dnode/mnode/impl/inc/mndUser.h index ce570773bd82d0705ec875a0d9bfe377db12d21c..4d31a87b191761b7a027837d8b3ec9617faff1cf 100644 --- a/source/dnode/mnode/impl/inc/mndUser.h +++ b/source/dnode/mnode/impl/inc/mndUser.h @@ -22,8 +22,10 @@ extern "C" { #endif -int32_t mndInitUser(SMnode *pMnode); -void mndCleanupUser(SMnode *pMnode); +int32_t mndInitUser(SMnode *pMnode); +void mndCleanupUser(SMnode *pMnode); +SUserObj *mndAcquireUser(SMnode *pMnode, const char *userName); +void mndReleaseUser(SMnode *pMnode, SUserObj *pUser); #ifdef __cplusplus } diff --git a/source/dnode/mnode/impl/src/mndAcct.c b/source/dnode/mnode/impl/src/mndAcct.c index 73b016742245d2d4259fc7974745e9f48f191da1..23328506e636e58b53a76ba2a7df277112952250 100644 --- a/source/dnode/mnode/impl/src/mndAcct.c +++ b/source/dnode/mnode/impl/src/mndAcct.c @@ -44,6 +44,7 @@ static SSdbRow *mnodeAcctActionDecode(SSdbRaw *pRaw) { if (sdbGetRawSoftVer(pRaw, &sver) != 0) return NULL; if (sver != SDB_ACCT_VER) { + mError("failed to decode acct since %s", terrstr()); terrno = TSDB_CODE_SDB_INVALID_DATA_VER; return NULL; } @@ -68,14 +69,26 @@ static SSdbRow *mnodeAcctActionDecode(SSdbRaw *pRaw) { return pRow; } -static int32_t mnodeAcctActionInsert(SSdb *pSdb, SAcctObj *pAcct) { return 0; } +static int32_t mnodeAcctActionInsert(SSdb *pSdb, SAcctObj *pAcct) { + mTrace("acct:%s, perform insert action", pAcct->acct); + memset(&pAcct->info, 0, sizeof(SAcctInfo)); + return 0; +} -static int32_t mnodeAcctActionDelete(SSdb *pSdb, SAcctObj *pAcct) { return 0; } +static int32_t mnodeAcctActionDelete(SSdb *pSdb, SAcctObj *pAcct) { + mTrace("acct:%s, perform delete action", pAcct->acct); + return 0; +} static int32_t mnodeAcctActionUpdate(SSdb *pSdb, SAcctObj *pSrcAcct, SAcctObj *pDstAcct) { - SAcctObj tObj; - int32_t len = (int32_t)((int8_t *)&tObj.info - (int8_t *)&tObj); - memcpy(pDstAcct, pSrcAcct, len); + mTrace("acct:%s, perform update action", pSrcAcct->acct); + + memcpy(pSrcAcct->acct, pDstAcct->acct, TSDB_USER_LEN); + pSrcAcct->createdTime = pDstAcct->createdTime; + pSrcAcct->updateTime = pDstAcct->updateTime; + pSrcAcct->acctId = pDstAcct->acctId; + pSrcAcct->status = pDstAcct->status; + pSrcAcct->cfg = pDstAcct->cfg; return 0; } @@ -98,6 +111,7 @@ static int32_t mnodeCreateDefaultAcct(SMnode *pMnode) { if (pRaw == NULL) return -1; sdbSetRawStatus(pRaw, SDB_STATUS_READY); + mTrace("acct:%s, will be created while deploy sdb", acctObj.acct); return sdbWrite(pMnode->pSdb, pRaw); } diff --git a/source/dnode/mnode/impl/src/mndCluster.c b/source/dnode/mnode/impl/src/mndCluster.c index 98e9e702297e88e7b30d59f4265f5e799925e34d..1780c88d6b2ff1e35419212e7f5fc596382bc42d 100644 --- a/source/dnode/mnode/impl/src/mndCluster.c +++ b/source/dnode/mnode/impl/src/mndCluster.c @@ -14,8 +14,97 @@ */ #define _DEFAULT_SOURCE -#include "os.h" -#include "mndInt.h" +#include "mndCluster.h" +#include "mndTrans.h" -int32_t mndInitCluster(SMnode *pMnode) { return 0; } -void mndCleanupCluster(SMnode *pMnode) {} \ No newline at end of file +#define SDB_CLUSTER_VER 1 + +static SSdbRaw *mndClusterActionEncode(SClusterObj *pCluster) { + SSdbRaw *pRaw = sdbAllocRaw(SDB_CLUSTER, SDB_CLUSTER_VER, sizeof(SClusterObj)); + if (pRaw == NULL) return NULL; + + int32_t dataPos = 0; + SDB_SET_INT32(pRaw, dataPos, pCluster->id); + SDB_SET_INT64(pRaw, dataPos, pCluster->createdTime) + SDB_SET_INT64(pRaw, dataPos, pCluster->updateTime) + SDB_SET_BINARY(pRaw, dataPos, pCluster->uid, TSDB_CLUSTER_ID_LEN) + + return pRaw; +} + +static SSdbRow *mndClusterActionDecode(SSdbRaw *pRaw) { + int8_t sver = 0; + if (sdbGetRawSoftVer(pRaw, &sver) != 0) return NULL; + + if (sver != SDB_CLUSTER_VER) { + terrno = TSDB_CODE_SDB_INVALID_DATA_VER; + mError("failed to decode cluster since %s", terrstr()); + return NULL; + } + + SSdbRow *pRow = sdbAllocRow(sizeof(SClusterObj)); + SClusterObj *pCluster = sdbGetRowObj(pRow); + if (pCluster == NULL) return NULL; + + int32_t dataPos = 0; + SDB_GET_INT32(pRaw, pRow, dataPos, &pCluster->id) + SDB_GET_INT64(pRaw, pRow, dataPos, &pCluster->createdTime) + SDB_GET_INT64(pRaw, pRow, dataPos, &pCluster->updateTime) + SDB_GET_BINARY(pRaw, pRow, dataPos, pCluster->uid, TSDB_CLUSTER_ID_LEN) + + return pRow; +} + +static int32_t mndClusterActionInsert(SSdb *pSdb, SClusterObj *pCluster) { + mTrace("cluster:%d, perform insert action", pCluster->id); + return 0; +} + +static int32_t mndClusterActionDelete(SSdb *pSdb, SClusterObj *pCluster) { + mTrace("cluster:%d, perform delete action", pCluster->id); + return 0; +} + +static int32_t mndClusterActionUpdate(SSdb *pSdb, SClusterObj *pSrcCluster, SClusterObj *pDstCluster) { + mTrace("cluster:%d, perform update action", pSrcCluster->id); + return 0; +} + +static int32_t mndCreateDefaultCluster(SMnode *pMnode) { + SClusterObj clusterObj = {0}; + clusterObj.createdTime = taosGetTimestampMs(); + clusterObj.updateTime = clusterObj.createdTime; + + int32_t code = taosGetSystemUid(clusterObj.uid, TSDB_CLUSTER_ID_LEN); + if (code != 0) { + strcpy(clusterObj.uid, "tdengine2.0"); + mError("failed to get uid from system, set to default val %s", clusterObj.uid); + } else { + mDebug("cluster:%d, uid is %s", clusterObj.id, clusterObj.uid); + } + clusterObj.id = MurmurHash3_32(clusterObj.uid, TSDB_CLUSTER_ID_LEN); + clusterObj.id = abs(clusterObj.id); + pMnode->clusterId = clusterObj.id; + + SSdbRaw *pRaw = mndClusterActionEncode(&clusterObj); + if (pRaw == NULL) return -1; + sdbSetRawStatus(pRaw, SDB_STATUS_READY); + + mTrace("cluster:%d, will be created while deploy sdb", clusterObj.id); + return sdbWrite(pMnode->pSdb, pRaw); +} + +int32_t mndInitCluster(SMnode *pMnode) { + SSdbTable table = {.sdbType = SDB_CLUSTER, + .keyType = SDB_KEY_INT32, + .deployFp = (SdbDeployFp)mndCreateDefaultCluster, + .encodeFp = (SdbEncodeFp)mndClusterActionEncode, + .decodeFp = (SdbDecodeFp)mndClusterActionDecode, + .insertFp = (SdbInsertFp)mndClusterActionInsert, + .updateFp = (SdbUpdateFp)mndClusterActionUpdate, + .deleteFp = (SdbDeleteFp)mndClusterActionDelete}; + + return sdbSetTable(pMnode->pSdb, table); +} + +void mndCleanupCluster(SMnode *pMnode) {} diff --git a/source/dnode/mnode/impl/src/mndDnode.c b/source/dnode/mnode/impl/src/mndDnode.c index 067faba558b0b2fb31dc1423d4230cbcb98e25f6..a6cee3efc5d6549f0a18b63766ddacf4396f519a 100644 --- a/source/dnode/mnode/impl/src/mndDnode.c +++ b/source/dnode/mnode/impl/src/mndDnode.c @@ -91,19 +91,24 @@ static void mnodeResetDnode(SDnodeObj *pDnode) { } static int32_t mndDnodeActionInsert(SSdb *pSdb, SDnodeObj *pDnode) { + mTrace("dnode:%d, perform insert action", pDnode->id); mnodeResetDnode(pDnode); return 0; } -static int32_t mndDnodeActionDelete(SSdb *pSdb, SDnodeObj *pDnode) { return 0; } +static int32_t mndDnodeActionDelete(SSdb *pSdb, SDnodeObj *pDnode) { + mTrace("dnode:%d, perform delete action", pDnode->id); + return 0; +} static int32_t mndDnodeActionUpdate(SSdb *pSdb, SDnodeObj *pSrcDnode, SDnodeObj *pDstDnode) { + mTrace("dnode:%d, perform update action", pSrcDnode->id); pSrcDnode->id = pDstDnode->id; pSrcDnode->createdTime = pDstDnode->createdTime; pSrcDnode->updateTime = pDstDnode->updateTime; pSrcDnode->port = pDstDnode->port; - memcpy(pSrcDnode->fqdn, pDstDnode->fqdn, TSDB_FQDN_LEN); - mnodeResetDnode(pSrcDnode); + memcpy(pSrcDnode->fqdn, pDstDnode->fqdn, TSDB_FQDN_LEN); + return 0; } static int32_t mndCreateDefaultDnode(SMnode *pMnode) { @@ -118,6 +123,7 @@ static int32_t mndCreateDefaultDnode(SMnode *pMnode) { if (pRaw == NULL) return -1; sdbSetRawStatus(pRaw, SDB_STATUS_READY); + mTrace("dnode:%d, will be created while deploy sdb", dnodeObj.id); return sdbWrite(pMnode->pSdb, pRaw); } @@ -169,7 +175,7 @@ static void mndGetDnodeData(SMnode *pMnode, SDnodeEps *pEps, int32_t numOfEps) { i++; } - pEps->num = i; + pEps->num = htonl(i); } static int32_t mndCheckClusterCfgPara(SMnode *pMnode, const SClusterCfg *pCfg) { @@ -205,11 +211,10 @@ static int32_t mndCheckClusterCfgPara(SMnode *pMnode, const SClusterCfg *pCfg) { return 0; } -static int32_t mndProcessStatusMsg(SMnode *pMnode, SMnodeMsg *pMsg) { - SStatusMsg *pStatus = pMsg->rpcMsg.pCont; +static void mndParseStatusMsg(SStatusMsg *pStatus) { pStatus->sver = htonl(pStatus->sver); pStatus->dnodeId = htonl(pStatus->dnodeId); - pStatus->clusterId = htobe64(pStatus->clusterId); + pStatus->clusterId = htonl(pStatus->clusterId); pStatus->rebootTime = htonl(pStatus->rebootTime); pStatus->numOfCores = htons(pStatus->numOfCores); pStatus->numOfSupportMnodes = htons(pStatus->numOfSupportMnodes); @@ -219,6 +224,11 @@ static int32_t mndProcessStatusMsg(SMnode *pMnode, SMnodeMsg *pMsg) { pStatus->clusterCfg.statusInterval = htonl(pStatus->clusterCfg.statusInterval); pStatus->clusterCfg.mnodeEqualVnodeNum = htonl(pStatus->clusterCfg.mnodeEqualVnodeNum); pStatus->clusterCfg.checkTime = htobe64(pStatus->clusterCfg.checkTime); +} + +static int32_t mndProcessStatusMsg(SMnode *pMnode, SMnodeMsg *pMsg) { + SStatusMsg *pStatus = pMsg->rpcMsg.pCont; + mndParseStatusMsg(pStatus); SDnodeObj *pDnode = NULL; if (pStatus->dnodeId == 0) { @@ -249,15 +259,14 @@ static int32_t mndProcessStatusMsg(SMnode *pMnode, SMnodeMsg *pMsg) { return TSDB_CODE_MND_INVALID_MSG_VERSION; } - int64_t clusterId = mndGetClusterId(pMnode); if (pStatus->dnodeId == 0) { - mDebug("dnode:%d %s, first access, set clusterId %" PRId64, pDnode->id, pDnode->ep, clusterId); + mDebug("dnode:%d %s, first access, set clusterId %d", pDnode->id, pDnode->ep, pMnode->clusterId); } else { - if (pStatus->clusterId != clusterId) { + if (pStatus->clusterId != pMnode->clusterId) { if (pDnode != NULL && pDnode->status != DND_STATUS_READY) { pDnode->offlineReason = DND_REASON_CLUSTER_ID_NOT_MATCH; } - mError("dnode:%d, clusterId %" PRId64 " not match exist %" PRId64, pDnode->id, pStatus->clusterId, clusterId); + mError("dnode:%d, clusterId %d not match exist %d", pDnode->id, pStatus->clusterId, pMnode->clusterId); mndReleaseDnode(pMnode, pDnode); return TSDB_CODE_MND_INVALID_CLUSTER_ID; } else { @@ -296,11 +305,11 @@ static int32_t mndProcessStatusMsg(SMnode *pMnode, SMnodeMsg *pMsg) { pRsp->dnodeCfg.dnodeId = htonl(pDnode->id); pRsp->dnodeCfg.dropped = 0; - pRsp->dnodeCfg.clusterId = htobe64(clusterId); + pRsp->dnodeCfg.clusterId = htonl(pMnode->clusterId); mndGetDnodeData(pMnode, &pRsp->dnodeEps, numOfEps); - pMsg->rpcRsp.len = contLen; - pMsg->rpcRsp.rsp = pRsp; + pMsg->contLen = contLen; + pMsg->pCont = pRsp; mndReleaseDnode(pMnode, pDnode); return 0; @@ -325,7 +334,7 @@ int32_t mndInitDnode(SMnode *pMnode) { mndSetMsgHandle(pMnode, TSDB_MSG_TYPE_CREATE_DNODE, mndProcessCreateDnodeMsg); mndSetMsgHandle(pMnode, TSDB_MSG_TYPE_DROP_DNODE, mndProcessDropDnodeMsg); mndSetMsgHandle(pMnode, TSDB_MSG_TYPE_CONFIG_DNODE, mndProcessConfigDnodeMsg); - mndSetMsgHandle(pMnode, TSDB_MSG_TYPE_STATUS_RSP, mndProcessStatusMsg); + mndSetMsgHandle(pMnode, TSDB_MSG_TYPE_STATUS, mndProcessStatusMsg); return sdbSetTable(pMnode->pSdb, table); } diff --git a/source/dnode/mnode/impl/src/mndMnode.c b/source/dnode/mnode/impl/src/mndMnode.c index bbf39dcff97a91e615c12fa55d7418e695d72d6a..d2ace31a3661c9294fd57b1b9649d6d2cd05f0f5 100644 --- a/source/dnode/mnode/impl/src/mndMnode.c +++ b/source/dnode/mnode/impl/src/mndMnode.c @@ -59,9 +59,11 @@ static void mnodeResetMnode(SMnodeObj *pMnodeObj) { } static int32_t mndMnodeActionInsert(SSdb *pSdb, SMnodeObj *pMnodeObj) { + mTrace("mnode:%d, perform insert action", pMnodeObj->id); pMnodeObj->pDnode = sdbAcquire(pSdb, SDB_DNODE, &pMnodeObj->id); if (pMnodeObj->pDnode == NULL) { terrno = TSDB_CODE_MND_DNODE_NOT_EXIST; + mError("mnode:%d, failed to perform insert action since %s", pMnodeObj->id, terrstr()); return -1; } @@ -70,6 +72,7 @@ static int32_t mndMnodeActionInsert(SSdb *pSdb, SMnodeObj *pMnodeObj) { } static int32_t mndMnodeActionDelete(SSdb *pSdb, SMnodeObj *pMnodeObj) { + mTrace("mnode:%d, perform delete action", pMnodeObj->id); if (pMnodeObj->pDnode != NULL) { sdbRelease(pSdb, pMnodeObj->pDnode); pMnodeObj->pDnode = NULL; @@ -79,15 +82,16 @@ static int32_t mndMnodeActionDelete(SSdb *pSdb, SMnodeObj *pMnodeObj) { } static int32_t mndMnodeActionUpdate(SSdb *pSdb, SMnodeObj *pSrcMnode, SMnodeObj *pDstMnode) { + mTrace("mnode:%d, perform update action", pSrcMnode->id); pSrcMnode->id = pDstMnode->id; pSrcMnode->createdTime = pDstMnode->createdTime; pSrcMnode->updateTime = pDstMnode->updateTime; - mnodeResetMnode(pSrcMnode); + return 0; } static int32_t mndCreateDefaultMnode(SMnode *pMnode) { SMnodeObj mnodeObj = {0}; - mnodeObj.id = 0; + mnodeObj.id = 1; mnodeObj.createdTime = taosGetTimestampMs(); mnodeObj.updateTime = mnodeObj.createdTime; @@ -95,6 +99,7 @@ static int32_t mndCreateDefaultMnode(SMnode *pMnode) { if (pRaw == NULL) return -1; sdbSetRawStatus(pRaw, SDB_STATUS_READY); + mTrace("mnode:%d, will be created while deploy sdb", mnodeObj.id); return sdbWrite(pMnode->pSdb, pRaw); } diff --git a/source/dnode/mnode/impl/src/mndTelem.c b/source/dnode/mnode/impl/src/mndTelem.c index f7fafc7095050835eb8fb78545bc1a35fb0630a1..f9f349aad8610933ab414d152d73d29b750ce4cc 100644 --- a/source/dnode/mnode/impl/src/mndTelem.c +++ b/source/dnode/mnode/impl/src/mndTelem.c @@ -203,9 +203,9 @@ static void mndSendTelemetryReport() { return; } - int64_t clusterId = mndGetClusterId(NULL); + int32_t clusterId = 0; char clusterIdStr[20] = {0}; - snprintf(clusterIdStr, sizeof(clusterIdStr), "%" PRId64, clusterId); + snprintf(clusterIdStr, sizeof(clusterIdStr), "%d", clusterId); SBufferWriter bw = tbufInitWriter(NULL, false); mndBeginObject(&bw); diff --git a/source/dnode/mnode/impl/src/mndUser.c b/source/dnode/mnode/impl/src/mndUser.c index 4156d2ab37676346232ae5fb547ad09fc0dd417f..abbe41a60d43c88d2a93a3eddaa52bfcc8230493 100644 --- a/source/dnode/mnode/impl/src/mndUser.c +++ b/source/dnode/mnode/impl/src/mndUser.c @@ -15,8 +15,8 @@ #define _DEFAULT_SOURCE #include "mndSync.h" -#include "tkey.h" #include "mndTrans.h" +#include "tkey.h" #define SDB_USER_VER 1 @@ -41,6 +41,7 @@ static SSdbRow *mndUserActionDecode(SSdbRaw *pRaw) { if (sdbGetRawSoftVer(pRaw, &sver) != 0) return NULL; if (sver != SDB_USER_VER) { + mError("failed to decode user since %s", terrstr()); terrno = TSDB_CODE_SDB_INVALID_DATA_VER; return NULL; } @@ -61,15 +62,18 @@ static SSdbRow *mndUserActionDecode(SSdbRaw *pRaw) { } static int32_t mndUserActionInsert(SSdb *pSdb, SUserObj *pUser) { + mTrace("user:%s, perform insert action", pUser->user); pUser->prohibitDbHash = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK); if (pUser->prohibitDbHash == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; + mError("user:%s, failed to perform insert action since %s", pUser->user, terrstr()); return -1; } pUser->pAcct = sdbAcquire(pSdb, SDB_ACCT, pUser->acct); if (pUser->pAcct == NULL) { terrno = TSDB_CODE_MND_ACCT_NOT_EXIST; + mError("user:%s, failed to perform insert action since %s", pUser->user, terrstr()); return -1; } @@ -77,12 +81,13 @@ static int32_t mndUserActionInsert(SSdb *pSdb, SUserObj *pUser) { } static int32_t mndUserActionDelete(SSdb *pSdb, SUserObj *pUser) { + mTrace("user:%s, perform delete action", pUser->user); if (pUser->prohibitDbHash) { taosHashCleanup(pUser->prohibitDbHash); pUser->prohibitDbHash = NULL; } - if (pUser->acct != NULL) { + if (pUser->pAcct != NULL) { sdbRelease(pSdb, pUser->pAcct); pUser->pAcct = NULL; } @@ -91,9 +96,13 @@ static int32_t mndUserActionDelete(SSdb *pSdb, SUserObj *pUser) { } static int32_t mndUserActionUpdate(SSdb *pSdb, SUserObj *pSrcUser, SUserObj *pDstUser) { - SUserObj tObj; - int32_t len = (int32_t)((int8_t *)tObj.prohibitDbHash - (int8_t *)&tObj); - memcpy(pDstUser, pSrcUser, len); + mTrace("user:%s, perform update action", pSrcUser->user); + memcpy(pSrcUser->user, pDstUser->user, TSDB_USER_LEN); + memcpy(pSrcUser->pass, pDstUser->pass, TSDB_KEY_LEN); + memcpy(pSrcUser->acct, pDstUser->acct, TSDB_USER_LEN); + pSrcUser->createdTime = pDstUser->createdTime; + pSrcUser->updateTime = pDstUser->updateTime; + pSrcUser->rootAuth = pDstUser->rootAuth; return 0; } @@ -113,6 +122,7 @@ static int32_t mndCreateDefaultUser(SMnode *pMnode, char *acct, char *user, char if (pRaw == NULL) return -1; sdbSetRawStatus(pRaw, SDB_STATUS_READY); + mTrace("user:%s, will be created while deploy sdb", userObj.user); return sdbWrite(pMnode->pSdb, pRaw); } @@ -196,7 +206,7 @@ static int32_t mndProcessCreateUserMsg(SMnode *pMnode, SMnodeMsg *pMsg) { return -1; } - SUserObj *pOperUser = sdbAcquire(pMnode->pSdb, SDB_USER, pMsg->conn.user); + SUserObj *pOperUser = sdbAcquire(pMnode->pSdb, SDB_USER, pMsg->user); if (pOperUser == NULL) { terrno = TSDB_CODE_MND_NO_USER_FROM_CONN; mError("user:%s, failed to create since %s", pCreate->user, terrstr()); @@ -229,4 +239,15 @@ int32_t mndInitUser(SMnode *pMnode) { return sdbSetTable(pMnode->pSdb, table); } -void mndCleanupUser(SMnode *pMnode) {} \ No newline at end of file +void mndCleanupUser(SMnode *pMnode) {} + +SUserObj *mndAcquireUser(SMnode *pMnode, const char *userName) { + SSdb *pSdb = pMnode->pSdb; + return sdbAcquire(pSdb, SDB_USER, &userName); +} + +void mndReleaseUser(SMnode *pMnode, SUserObj *pUser) { + SSdb *pSdb = pMnode->pSdb; + sdbRelease(pSdb, pUser); +} + diff --git a/source/dnode/mnode/impl/src/mnode.c b/source/dnode/mnode/impl/src/mnode.c index e1c7b66c36a228edc34f77f4b7ef9dd780111d4b..e390deda6d6dec465c87c5d35f0e0674ae648816 100644 --- a/source/dnode/mnode/impl/src/mnode.c +++ b/source/dnode/mnode/impl/src/mnode.c @@ -32,30 +32,6 @@ #include "mndUser.h" #include "mndVgroup.h" -int32_t mndGetDnodeId(SMnode *pMnode) { - if (pMnode != NULL) { - return pMnode->dnodeId; - } - - return -1; -} - -int64_t mndGetClusterId(SMnode *pMnode) { - if (pMnode != NULL) { - return pMnode->clusterId; - } - - return -1; -} - -tmr_h mndGetTimer(SMnode *pMnode) { - if (pMnode != NULL) { - return pMnode->timer; - } - - return NULL; -} - void mndSendMsgToDnode(SMnode *pMnode, SEpSet *pEpSet, SRpcMsg *pMsg) { if (pMnode != NULL && pMnode->sendMsgToDnodeFp != NULL) { (*pMnode->sendMsgToDnodeFp)(pMnode->pDnode, pEpSet, pMsg); @@ -112,6 +88,7 @@ static int32_t mnodeCreateDir(SMnode *pMnode, const char *path) { static int32_t mndInitSdb(SMnode *pMnode) { SSdbOpt opt = {0}; opt.path = pMnode->path; + opt.pMnode = pMnode; pMnode->pSdb = sdbInit(&opt); if (pMnode->pSdb == NULL) { @@ -177,11 +154,11 @@ static void mndCleanupSteps(SMnode *pMnode, int32_t pos) { if (pMnode->pSteps == NULL) return; if (pos == -1) { - pos = taosArrayGetSize(pMnode->pSteps); + pos = taosArrayGetSize(pMnode->pSteps) - 1; } for (int32_t s = pos; s >= 0; s--) { - SMnodeStep *pStep = taosArrayGet(pMnode->pSteps, pos); + SMnodeStep *pStep = taosArrayGet(pMnode->pSteps, s); mDebug("step:%s will cleanup", pStep->name); if (pStep->cleanupFp != NULL) { (*pStep->cleanupFp)(pMnode); @@ -189,6 +166,7 @@ static void mndCleanupSteps(SMnode *pMnode, int32_t pos) { } taosArrayClear(pMnode->pSteps); + taosArrayDestroy(pMnode->pSteps); pMnode->pSteps = NULL; } @@ -235,7 +213,7 @@ static int32_t mndSetOptions(SMnode *pMnode, const SMnodeOpt *pOption) { if (pMnode->sendMsgToDnodeFp == NULL || pMnode->sendMsgToMnodeFp == NULL || pMnode->sendRedirectMsgFp == NULL || pMnode->putMsgToApplyMsgFp == NULL || pMnode->dnodeId < 0 || pMnode->clusterId < 0 || pMnode->statusInterval < 1 || pOption->mnodeEqualVnodeNum < 0) { - terrno = TSDB_CODE_MND_APP_ERROR; + terrno = TSDB_CODE_MND_INVALID_OPTIONS; return -1; } @@ -266,8 +244,9 @@ SMnode *mndOpen(const char *path, const SMnodeOpt *pOption) { } int32_t code = mnodeCreateDir(pMnode, path); - if (mnodeCreateDir(pMnode, path) != 0) { - mError("failed to open mnode since %s", tstrerror(code)); + if (code != 0) { + code = terrno; + mError("failed to open mnode since %s", terrstr()); mndClose(pMnode); terrno = code; return NULL; @@ -275,7 +254,8 @@ SMnode *mndOpen(const char *path, const SMnodeOpt *pOption) { code = mndSetOptions(pMnode, pOption); if (code != 0) { - mError("failed to open mnode since %s", tstrerror(code)); + code = terrno; + mError("failed to open mnode since %s", terrstr()); mndClose(pMnode); terrno = code; return NULL; @@ -283,7 +263,8 @@ SMnode *mndOpen(const char *path, const SMnodeOpt *pOption) { code = mndInitSteps(pMnode); if (code != 0) { - mError("failed to open mnode since %s", tstrerror(code)); + code = terrno; + mError("failed to open mnode since %s", terrstr()); mndClose(pMnode); terrno = code; return NULL; @@ -291,7 +272,8 @@ SMnode *mndOpen(const char *path, const SMnodeOpt *pOption) { code = mndExecSteps(pMnode); if (code != 0) { - mError("failed to open mnode since %s", tstrerror(code)); + code = terrno; + mError("failed to open mnode since %s", terrstr()); mndClose(pMnode); terrno = code; return NULL; @@ -345,31 +327,36 @@ SMnodeMsg *mndInitMsg(SMnode *pMnode, SRpcMsg *pRpcMsg) { SMnodeMsg *pMsg = taosAllocateQitem(sizeof(SMnodeMsg)); if (pMsg == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; + mError("failed to create msg since %s", terrstr()); return NULL; } - if (rpcGetConnInfo(pRpcMsg->handle, &pMsg->conn) != 0) { + SRpcConnInfo connInfo = {0}; + if (rpcGetConnInfo(pRpcMsg->handle, &connInfo) != 0) { mndCleanupMsg(pMsg); - mError("can not get user from conn:%p", pMsg->rpcMsg.handle); terrno = TSDB_CODE_MND_NO_USER_FROM_CONN; + mError("failed to create msg since %s", terrstr()); return NULL; } + memcpy(pMsg->user, connInfo.user, TSDB_USER_LEN); + pMsg->pMnode = pMnode; pMsg->rpcMsg = *pRpcMsg; pMsg->createdTime = taosGetTimestampSec(); + mTrace("msg:%p, is created", pMsg); return pMsg; } void mndCleanupMsg(SMnodeMsg *pMsg) { - if (pMsg->pUser != NULL) { - sdbRelease(pMsg->pMnode->pSdb, pMsg->pUser); - } - taosFreeQitem(pMsg); + mTrace("msg:%p, is destroyed", pMsg); } -void mndSendRsp(SMnodeMsg *pMsg, int32_t code) {} +void mndSendRsp(SMnodeMsg *pMsg, int32_t code) { + SRpcMsg rpcRsp = {.handle = pMsg->rpcMsg.handle, .code = code}; + rpcSendResponse(&rpcRsp); +} static void mndProcessRpcMsg(SMnodeMsg *pMsg) { SMnode *pMnode = pMsg->pMnode; @@ -378,29 +365,34 @@ static void mndProcessRpcMsg(SMnodeMsg *pMsg) { void *ahandle = pMsg->rpcMsg.ahandle; bool isReq = (msgType % 2 == 1); + mTrace("msg:%p, app:%p will be processed", pMsg, ahandle); + if (isReq && !mndIsMaster(pMnode)) { code = TSDB_CODE_APP_NOT_READY; + mDebug("msg:%p, app:%p failed to process since %s", pMsg, ahandle, terrstr()); goto PROCESS_RPC_END; } if (isReq && pMsg->rpcMsg.pCont == NULL) { - mError("msg:%p, app:%p type:%s content is null", pMsg, ahandle, taosMsg[msgType]); code = TSDB_CODE_MND_INVALID_MSG_LEN; + mError("msg:%p, app:%p failed to process since %s", pMsg, ahandle, terrstr()); goto PROCESS_RPC_END; } MndMsgFp fp = pMnode->msgFp[msgType]; if (fp == NULL) { - mError("msg:%p, app:%p type:%s not processed", pMsg, ahandle, taosMsg[msgType]); code = TSDB_CODE_MSG_NOT_PROCESSED; + mError("msg:%p, app:%p failed to process since not handle", pMsg, ahandle); goto PROCESS_RPC_END; } code = (*fp)(pMnode, pMsg); if (code != 0) { code = terrno; - mError("msg:%p, app:%p type:%s failed to process since %s", pMsg, ahandle, taosMsg[msgType], terrstr()); + mError("msg:%p, app:%p failed to process since %s", pMsg, ahandle, terrstr()); goto PROCESS_RPC_END; + } else { + mTrace("msg:%p, app:%p is processed", pMsg, ahandle); } PROCESS_RPC_END: @@ -408,13 +400,13 @@ PROCESS_RPC_END: if (code == TSDB_CODE_APP_NOT_READY) { mndSendRedirectMsg(pMnode, &pMsg->rpcMsg); } else if (code != 0) { - SRpcMsg rspMsg = {.handle = pMsg->rpcMsg.handle, .code = code}; - rpcSendResponse(&rspMsg); + SRpcMsg rpcRsp = {.handle = pMsg->rpcMsg.handle, .code = code}; + rpcSendResponse(&rpcRsp); } else { + SRpcMsg rpcRsp = {.handle = pMsg->rpcMsg.handle, .contLen = pMsg->contLen, .pCont = pMsg->pCont}; + rpcSendResponse(&rpcRsp); } } - - mndCleanupMsg(pMsg); } void mndSetMsgHandle(SMnode *pMnode, int32_t msgType, MndMsgFp fp) { diff --git a/source/dnode/mnode/sdb/src/sdb.c b/source/dnode/mnode/sdb/src/sdb.c index 1ceb3862eeb8ef0cab7d7965260415f1ec22fce6..68cb7be68b249d34a26a854b46635f8d6cb5d0fb 100644 --- a/source/dnode/mnode/sdb/src/sdb.c +++ b/source/dnode/mnode/sdb/src/sdb.c @@ -40,7 +40,7 @@ SSdb *sdbInit(SSdbOpt *pOption) { return NULL; } - for (int32_t i = 0; i < SDB_MAX; ++i) { + for (ESdbType i = 0; i < SDB_MAX; ++i) { taosInitRWLatch(&pSdb->locks[i]); } @@ -69,47 +69,67 @@ void sdbCleanup(SSdb *pSdb) { tfree(pSdb->tmpDir); } - for (int32_t i = 0; i < SDB_MAX; ++i) { + for (ESdbType i = 0; i < SDB_MAX; ++i) { SHashObj *hash = pSdb->hashObjs[i]; - if (hash != NULL) { - taosHashClear(hash); - taosHashCleanup(hash); + if (hash == NULL) continue; + + SdbDeleteFp deleteFp = pSdb->deleteFps[i]; + SSdbRow **ppRow = taosHashIterate(hash, NULL); + while (ppRow != NULL) { + SSdbRow *pRow = *ppRow; + if (pRow == NULL) continue; + + if (deleteFp != NULL) { + (*deleteFp)(pSdb, pRow->pObj); + } + sdbFreeRow(pRow); + ppRow = taosHashIterate(hash, ppRow); } + } + + for (ESdbType i = 0; i < SDB_MAX; ++i) { + SHashObj *hash = pSdb->hashObjs[i]; + if (hash == NULL) continue; + + taosHashClear(hash); + taosHashCleanup(hash); pSdb->hashObjs[i] = NULL; + mTrace("sdb table:%d is cleaned up", i); } + free(pSdb); mDebug("sdb is cleaned up"); } int32_t sdbSetTable(SSdb *pSdb, SSdbTable table) { - ESdbType sdb = table.sdbType; - pSdb->keyTypes[sdb] = table.keyType; - pSdb->insertFps[sdb] = table.insertFp; - pSdb->updateFps[sdb] = table.updateFp; - pSdb->deleteFps[sdb] = table.deleteFp; - pSdb->deployFps[sdb] = table.deployFp; - pSdb->encodeFps[sdb] = table.encodeFp; - pSdb->decodeFps[sdb] = table.decodeFp; - - for (int32_t i = 0; i < SDB_MAX; ++i) { - int32_t type; - if (pSdb->keyTypes[i] == SDB_KEY_INT32) { - type = TSDB_DATA_TYPE_INT; - } else if (pSdb->keyTypes[i] == SDB_KEY_INT64) { - type = TSDB_DATA_TYPE_BIGINT; - } else { - type = TSDB_DATA_TYPE_BINARY; - } - - SHashObj *hash = taosHashInit(64, taosGetDefaultHashFunction(type), true, HASH_NO_LOCK); - if (hash == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; - } + ESdbType sdbType = table.sdbType; + EKeyType keyType = table.keyType; + pSdb->keyTypes[sdbType] = table.keyType; + pSdb->insertFps[sdbType] = table.insertFp; + pSdb->updateFps[sdbType] = table.updateFp; + pSdb->deleteFps[sdbType] = table.deleteFp; + pSdb->deployFps[sdbType] = table.deployFp; + pSdb->encodeFps[sdbType] = table.encodeFp; + pSdb->decodeFps[sdbType] = table.decodeFp; + + int32_t hashType = 0; + if (keyType == SDB_KEY_INT32) { + hashType = TSDB_DATA_TYPE_INT; + } else if (keyType == SDB_KEY_INT64) { + hashType = TSDB_DATA_TYPE_BIGINT; + } else { + hashType = TSDB_DATA_TYPE_BINARY; + } - pSdb->hashObjs[i] = hash; - taosInitRWLatch(&pSdb->locks[i]); + SHashObj *hash = taosHashInit(64, taosGetDefaultHashFunction(hashType), true, HASH_NO_LOCK); + if (hash == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; } + pSdb->hashObjs[sdbType] = hash; + taosInitRWLatch(&pSdb->locks[sdbType]); + mTrace("sdb table:%d is initialized", sdbType); + return 0; } \ No newline at end of file diff --git a/source/dnode/mnode/sdb/src/sdbFile.c b/source/dnode/mnode/sdb/src/sdbFile.c index b285675b855eb2e55bec78f0ba1b96fefd4f7c85..6f88f08b2c3b3741d289a5d47c94b650bd5bf9fb 100644 --- a/source/dnode/mnode/sdb/src/sdbFile.c +++ b/source/dnode/mnode/sdb/src/sdbFile.c @@ -42,7 +42,7 @@ static int32_t sdbCreateDir(SSdb *pSdb) { static int32_t sdbRunDeployFp(SSdb *pSdb) { mDebug("start to deploy sdb"); - for (int32_t i = SDB_MAX - 1; i > SDB_START; --i) { + for (ESdbType i = SDB_MAX - 1; i > SDB_START; --i) { SdbDeployFp fp = pSdb->deployFps[i]; if (fp == NULL) continue; @@ -150,7 +150,7 @@ int32_t sdbWriteFile(SSdb *pSdb) { return -1; } - for (int32_t i = SDB_MAX - 1; i > SDB_START; --i) { + for (ESdbType i = SDB_MAX - 1; i > SDB_START; --i) { SdbEncodeFp encodeFp = pSdb->encodeFps[i]; if (encodeFp == NULL) continue; @@ -173,6 +173,7 @@ int32_t sdbWriteFile(SSdb *pSdb) { if (taosWriteFile(fd, pRaw, writeLen) != writeLen) { code = TAOS_SYSTEM_ERROR(terrno); taosHashCancelIterate(hash, ppRow); + free(pRaw); break; } @@ -180,6 +181,7 @@ int32_t sdbWriteFile(SSdb *pSdb) { if (taosWriteFile(fd, &cksum, sizeof(int32_t)) != sizeof(int32_t)) { code = TAOS_SYSTEM_ERROR(terrno); taosHashCancelIterate(hash, ppRow); + free(pRaw); break; } } else { @@ -188,6 +190,7 @@ int32_t sdbWriteFile(SSdb *pSdb) { break; } + free(pRaw); ppRow = taosHashIterate(hash, ppRow); } taosWUnLockLatch(pLock); diff --git a/source/dnode/mnode/sdb/src/sdbHash.c b/source/dnode/mnode/sdb/src/sdbHash.c index 0db5c0beb607093caefb452e2b475eb3a58d1039..bdca5eaa987a6498b4d36229e5929984980b33f2 100644 --- a/source/dnode/mnode/sdb/src/sdbHash.c +++ b/source/dnode/mnode/sdb/src/sdbHash.c @@ -237,7 +237,15 @@ void *sdbFetch(SSdb *pSdb, ESdbType type, void *pIter, void **ppObj) { SRWLatch *pLock = &pSdb->locks[type]; taosRLockLatch(pLock); - SSdbRow **ppRow = taosHashIterate(hash, ppRow); + if (pIter != NULL) { + SSdbRow *pLastRow = *(SSdbRow **)pIter; + int32_t ref = atomic_sub_fetch_32(&pLastRow->refCount, 1); + if (ref <= 0 && pLastRow->status == SDB_STATUS_DROPPED) { + sdbFreeRow(pLastRow); + } + } + + SSdbRow **ppRow = taosHashIterate(hash, pIter); while (ppRow != NULL) { SSdbRow *pRow = *ppRow; if (pRow == NULL || pRow->status != SDB_STATUS_READY) { diff --git a/source/dnode/vnode/impl/CMakeLists.txt b/source/dnode/vnode/impl/CMakeLists.txt index 040b02d2b6633faaf711f7ceff2aa02adb499215..3886308ec941934dec0e0d4a09600c4edfc6cfe8 100644 --- a/source/dnode/vnode/impl/CMakeLists.txt +++ b/source/dnode/vnode/impl/CMakeLists.txt @@ -19,5 +19,5 @@ target_link_libraries( # test if(${BUILD_TEST}) - add_subdirectory(test) + # add_subdirectory(test) endif(${BUILD_TEST}) \ No newline at end of file diff --git a/source/os/src/osSysinfo.c b/source/os/src/osSysinfo.c index e37e059b7d29da14baea9f9cb5d9757a4a4b5634..ca817c4c1e809bd3a74f2385167403de584f9da8 100644 --- a/source/os/src/osSysinfo.c +++ b/source/os/src/osSysinfo.c @@ -252,14 +252,14 @@ LONG WINAPI FlCrashDump(PEXCEPTION_POINTERS ep) { void taosSetCoreDump() { SetUnhandledExceptionFilter(&FlCrashDump); } -bool taosGetSystemUid(char *uid) { +int32_t taosGetSystemUid(char *uid, int32_t uidlen) { GUID guid; CoCreateGuid(&guid); sprintf(uid, "%08X-%04X-%04X-%02X%02X-%02X%02X%02X%02X%02X%02X", guid.Data1, guid.Data2, guid.Data3, guid.Data4[0], guid.Data4[1], guid.Data4[2], guid.Data4[3], guid.Data4[4], guid.Data4[5], guid.Data4[6], guid.Data4[7]); - return true; + return 0; } char *taosGetCmdlineByPID(int pid) { return ""; } @@ -452,12 +452,12 @@ int32_t taosGetDiskSize(char *dataDir, SysDiskSize *diskSize) { } } -bool taosGetSystemUid(char *uid) { +int32_t taosGetSystemUid(char *uid, int32_t uidlen) { uuid_t uuid = {0}; uuid_generate(uuid); // it's caller's responsibility to make enough space for `uid`, that's 36-char + 1-null uuid_unparse_lower(uuid, uid); - return true; + return 0; } char *taosGetCmdlineByPID(int pid) { @@ -1070,13 +1070,13 @@ void taosSetCoreDump(bool enable) { #endif } -bool taosGetSystemUid(char *uid, int32_t uidlen) { +int32_t taosGetSystemUid(char *uid, int32_t uidlen) { int fd; int len = 0; fd = open("/proc/sys/kernel/random/uuid", 0); if (fd < 0) { - return false; + return -1; } else { len = read(fd, uid, uidlen); close(fd); @@ -1084,9 +1084,10 @@ bool taosGetSystemUid(char *uid, int32_t uidlen) { if (len >= 36) { uid[36] = 0; - return true; + return 0; } - return false; + + return -1; } char *taosGetCmdlineByPID(int pid) { diff --git a/source/util/src/terror.c b/source/util/src/terror.c index 6d4f4eb6a702c079b7dc66302fd3b21dd1b877c4..11b40f61f65d6dc6ed89f32ca286b4eae8e81ca1 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -130,7 +130,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MND_MSG_NOT_PROCESSED, "Message not processed TAOS_DEFINE_ERROR(TSDB_CODE_MND_ACTION_IN_PROGRESS, "Message is progressing") TAOS_DEFINE_ERROR(TSDB_CODE_MND_ACTION_NEED_REPROCESSED, "Message need to be reprocessed") TAOS_DEFINE_ERROR(TSDB_CODE_MND_NO_RIGHTS, "Insufficient privilege for operation") -TAOS_DEFINE_ERROR(TSDB_CODE_MND_APP_ERROR, "Unexpected generic error in mnode") +TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_OPTIONS, "Invalid mnode options") TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_CONNECTION, "Invalid message connection") TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_MSG_VERSION, "Incompatible protocol version") TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_MSG_LEN, "Invalid message length")