提交 73ee288a 编写于 作者: H Hongze Cheng

Merge branch '3.0' into feature/vnode

......@@ -658,7 +658,7 @@ typedef struct {
typedef struct SStatusMsg {
int32_t sver;
int32_t dnodeId;
int64_t clusterId;
int32_t clusterId;
uint32_t rebootTime; // time stamp for last reboot
int16_t numOfCores;
int16_t numOfSupportMnodes;
......@@ -671,9 +671,9 @@ typedef struct SStatusMsg {
typedef struct {
int32_t dnodeId;
int32_t clusterId;
int8_t dropped;
char reserved[3];
int64_t clusterId;
char reserved[7];
} SDnodeCfg;
typedef struct {
......
......@@ -45,7 +45,7 @@ typedef struct SMnodeLoad {
typedef struct {
int32_t dnodeId;
int64_t clusterId;
int32_t clusterId;
int8_t replica;
int8_t selfIndex;
SReplica replicas[TSDB_MAX_REPLICA];
......
......@@ -52,7 +52,7 @@ bool taosGetSysMemory(float *memoryUsedMB);
void taosPrintOsInfo();
int taosSystem(const char *cmd);
void taosKillSystem();
bool taosGetSystemUid(char *uid, int32_t uidlen);
int32_t taosGetSystemUid(char *uid, int32_t uidlen);
char * taosGetCmdlineByPID(int pid);
void taosSetCoreDump(bool enable);
......
......@@ -115,25 +115,25 @@ int32_t* taosGetErrno();
#define TSDB_CODE_TSC_VALUE_OUT_OF_RANGE TAOS_DEF_ERROR_CODE(0, 0x0223) //"Value out of range")
// mnode
#define TSDB_CODE_MND_MSG_NOT_PROCESSED TAOS_DEF_ERROR_CODE(0, 0x0300) //"Message not processed")
#define TSDB_CODE_MND_ACTION_IN_PROGRESS TAOS_DEF_ERROR_CODE(0, 0x0301) //"Message is progressing")
#define TSDB_CODE_MND_ACTION_NEED_REPROCESSED TAOS_DEF_ERROR_CODE(0, 0x0302) //"Messag need to be reprocessed")
#define TSDB_CODE_MND_NO_RIGHTS TAOS_DEF_ERROR_CODE(0, 0x0303) //"Insufficient privilege for operation")
#define TSDB_CODE_MND_APP_ERROR TAOS_DEF_ERROR_CODE(0, 0x0304) //"Unexpected generic error in mnode")
#define TSDB_CODE_MND_INVALID_CONNECTION TAOS_DEF_ERROR_CODE(0, 0x0305) //"Invalid message connection")
#define TSDB_CODE_MND_INVALID_MSG_VERSION TAOS_DEF_ERROR_CODE(0, 0x0306) //"Incompatible protocol version")
#define TSDB_CODE_MND_INVALID_MSG_LEN TAOS_DEF_ERROR_CODE(0, 0x0307) //"Invalid message length")
#define TSDB_CODE_MND_INVALID_MSG_TYPE TAOS_DEF_ERROR_CODE(0, 0x0308) //"Invalid message type")
#define TSDB_CODE_MND_TOO_MANY_SHELL_CONNS TAOS_DEF_ERROR_CODE(0, 0x0309) //"Too many connections")
#define TSDB_CODE_MND_INVALID_SHOWOBJ TAOS_DEF_ERROR_CODE(0, 0x030B) //"Data expired")
#define TSDB_CODE_MND_INVALID_QUERY_ID TAOS_DEF_ERROR_CODE(0, 0x030C) //"Invalid query id")
#define TSDB_CODE_MND_INVALID_STREAM_ID TAOS_DEF_ERROR_CODE(0, 0x030D) //"Invalid stream id")
#define TSDB_CODE_MND_INVALID_CONN_ID TAOS_DEF_ERROR_CODE(0, 0x030E) //"Invalid connection id")
#define TSDB_CODE_MND_MNODE_IS_RUNNING TAOS_DEF_ERROR_CODE(0, 0x0310) //"mnode is alreay running")
#define TSDB_CODE_MND_FAILED_TO_CONFIG_SYNC TAOS_DEF_ERROR_CODE(0, 0x0311) //"failed to config sync")
#define TSDB_CODE_MND_FAILED_TO_START_SYNC TAOS_DEF_ERROR_CODE(0, 0x0312) //"failed to start sync")
#define TSDB_CODE_MND_FAILED_TO_CREATE_DIR TAOS_DEF_ERROR_CODE(0, 0x0313) //"failed to create mnode dir")
#define TSDB_CODE_MND_FAILED_TO_INIT_STEP TAOS_DEF_ERROR_CODE(0, 0x0314) //"failed to init components")
#define TSDB_CODE_MND_MSG_NOT_PROCESSED TAOS_DEF_ERROR_CODE(0, 0x0300)
#define TSDB_CODE_MND_ACTION_IN_PROGRESS TAOS_DEF_ERROR_CODE(0, 0x0301)
#define TSDB_CODE_MND_ACTION_NEED_REPROCESSED TAOS_DEF_ERROR_CODE(0, 0x0302)
#define TSDB_CODE_MND_NO_RIGHTS TAOS_DEF_ERROR_CODE(0, 0x0303)
#define TSDB_CODE_MND_INVALID_OPTIONS TAOS_DEF_ERROR_CODE(0, 0x0304)
#define TSDB_CODE_MND_INVALID_CONNECTION TAOS_DEF_ERROR_CODE(0, 0x0305)
#define TSDB_CODE_MND_INVALID_MSG_VERSION TAOS_DEF_ERROR_CODE(0, 0x0306)
#define TSDB_CODE_MND_INVALID_MSG_LEN TAOS_DEF_ERROR_CODE(0, 0x0307)
#define TSDB_CODE_MND_INVALID_MSG_TYPE TAOS_DEF_ERROR_CODE(0, 0x0308)
#define TSDB_CODE_MND_TOO_MANY_SHELL_CONNS TAOS_DEF_ERROR_CODE(0, 0x0309)
#define TSDB_CODE_MND_INVALID_SHOWOBJ TAOS_DEF_ERROR_CODE(0, 0x030B)
#define TSDB_CODE_MND_INVALID_QUERY_ID TAOS_DEF_ERROR_CODE(0, 0x030C)
#define TSDB_CODE_MND_INVALID_STREAM_ID TAOS_DEF_ERROR_CODE(0, 0x030D)
#define TSDB_CODE_MND_INVALID_CONN_ID TAOS_DEF_ERROR_CODE(0, 0x030E)
#define TSDB_CODE_MND_MNODE_IS_RUNNING TAOS_DEF_ERROR_CODE(0, 0x0310)
#define TSDB_CODE_MND_FAILED_TO_CONFIG_SYNC TAOS_DEF_ERROR_CODE(0, 0x0311)
#define TSDB_CODE_MND_FAILED_TO_START_SYNC TAOS_DEF_ERROR_CODE(0, 0x0312)
#define TSDB_CODE_MND_FAILED_TO_CREATE_DIR TAOS_DEF_ERROR_CODE(0, 0x0313)
#define TSDB_CODE_MND_FAILED_TO_INIT_STEP TAOS_DEF_ERROR_CODE(0, 0x0314)
#define TSDB_CODE_SDB_APP_ERROR TAOS_DEF_ERROR_CODE(0, 0x0320)
#define TSDB_CODE_SDB_OBJ_ALREADY_THERE TAOS_DEF_ERROR_CODE(0, 0x0321)
......
......@@ -27,7 +27,7 @@ void dndProcessDnodeReq(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet);
void dndProcessDnodeRsp(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet);
int32_t dndGetDnodeId(SDnode *pDnode);
int64_t dndGetClusterId(SDnode *pDnode);
int32_t dndGetClusterId(SDnode *pDnode);
void dndGetDnodeEp(SDnode *pDnode, int32_t dnodeId, char *pEp, char *pFqdn, uint16_t *pPort);
void dndGetMnodeEpSet(SDnode *pDnode, SEpSet *pEpSet);
void dndSendRedirectMsg(SDnode *pDnode, SRpcMsg *pMsg);
......
......@@ -57,8 +57,8 @@ typedef struct {
typedef struct {
int32_t dnodeId;
int32_t dropped;
int32_t clusterId;
uint32_t rebootTime;
int64_t clusterId;
SEpSet mnodeEpSet;
char *file;
SHashObj *dnodeHash;
......
......@@ -26,10 +26,10 @@ int32_t dndGetDnodeId(SDnode *pDnode) {
return dnodeId;
}
int64_t dndGetClusterId(SDnode *pDnode) {
int32_t dndGetClusterId(SDnode *pDnode) {
SDnodeMgmt *pMgmt = &pDnode->dmgmt;
taosRLockLatch(&pMgmt->latch);
int64_t clusterId = pMgmt->clusterId;
int32_t clusterId = pMgmt->clusterId;
taosRUnLockLatch(&pMgmt->latch);
return clusterId;
}
......@@ -190,78 +190,78 @@ static int32_t dndReadDnodes(SDnode *pDnode) {
}
cJSON *dnodeId = cJSON_GetObjectItem(root, "dnodeId");
if (!dnodeId || dnodeId->type != cJSON_String) {
if (!dnodeId || dnodeId->type != cJSON_Number) {
dError("failed to read %s since dnodeId not found", pMgmt->file);
goto PRASE_DNODE_OVER;
}
pMgmt->dnodeId = atoi(dnodeId->valuestring);
pMgmt->dnodeId = dnodeId->valueint;
cJSON *clusterId = cJSON_GetObjectItem(root, "clusterId");
if (!clusterId || clusterId->type != cJSON_String) {
if (!clusterId || clusterId->type != cJSON_Number) {
dError("failed to read %s since clusterId not found", pMgmt->file);
goto PRASE_DNODE_OVER;
}
pMgmt->clusterId = atoll(clusterId->valuestring);
pMgmt->clusterId = clusterId->valueint;
cJSON *dropped = cJSON_GetObjectItem(root, "dropped");
if (!dropped || dropped->type != cJSON_String) {
if (!dropped || dropped->type != cJSON_Number) {
dError("failed to read %s since dropped not found", pMgmt->file);
goto PRASE_DNODE_OVER;
}
pMgmt->dropped = atoi(dropped->valuestring);
pMgmt->dropped = dropped->valueint;
cJSON *dnodeInfos = cJSON_GetObjectItem(root, "dnodeInfos");
if (!dnodeInfos || dnodeInfos->type != cJSON_Array) {
dError("failed to read %s since dnodeInfos not found", pMgmt->file);
cJSON *dnodes = cJSON_GetObjectItem(root, "dnodes");
if (!dnodes || dnodes->type != cJSON_Array) {
dError("failed to read %s since dnodes not found", pMgmt->file);
goto PRASE_DNODE_OVER;
}
int32_t dnodeInfosSize = cJSON_GetArraySize(dnodeInfos);
if (dnodeInfosSize <= 0) {
dError("failed to read %s since dnodeInfos size:%d invalid", pMgmt->file, dnodeInfosSize);
int32_t numOfNodes = cJSON_GetArraySize(dnodes);
if (numOfNodes <= 0) {
dError("failed to read %s since numOfNodes:%d invalid", pMgmt->file, numOfNodes);
goto PRASE_DNODE_OVER;
}
pMgmt->dnodeEps = calloc(1, dnodeInfosSize * sizeof(SDnodeEp) + sizeof(SDnodeEps));
pMgmt->dnodeEps = calloc(1, numOfNodes * sizeof(SDnodeEp) + sizeof(SDnodeEps));
if (pMgmt->dnodeEps == NULL) {
dError("failed to calloc dnodeEpList since %s", strerror(errno));
goto PRASE_DNODE_OVER;
}
pMgmt->dnodeEps->num = dnodeInfosSize;
pMgmt->dnodeEps->num = numOfNodes;
for (int32_t i = 0; i < dnodeInfosSize; ++i) {
cJSON *dnodeInfo = cJSON_GetArrayItem(dnodeInfos, i);
if (dnodeInfo == NULL) break;
for (int32_t i = 0; i < numOfNodes; ++i) {
cJSON *node = cJSON_GetArrayItem(dnodes, i);
if (node == NULL) break;
SDnodeEp *pDnodeEp = &pMgmt->dnodeEps->eps[i];
cJSON *dnodeId = cJSON_GetObjectItem(dnodeInfo, "dnodeId");
if (!dnodeId || dnodeId->type != cJSON_String) {
cJSON *dnodeId = cJSON_GetObjectItem(node, "id");
if (!dnodeId || dnodeId->type != cJSON_Number) {
dError("failed to read %s, dnodeId not found", pMgmt->file);
goto PRASE_DNODE_OVER;
}
pDnodeEp->id = atoi(dnodeId->valuestring);
pDnodeEp->id = dnodeId->valueint;
cJSON *isMnode = cJSON_GetObjectItem(dnodeInfo, "isMnode");
if (!isMnode || isMnode->type != cJSON_String) {
dError("failed to read %s, isMnode not found", pMgmt->file);
goto PRASE_DNODE_OVER;
}
pDnodeEp->isMnode = atoi(isMnode->valuestring);
cJSON *dnodeFqdn = cJSON_GetObjectItem(dnodeInfo, "dnodeFqdn");
cJSON *dnodeFqdn = cJSON_GetObjectItem(node, "fqdn");
if (!dnodeFqdn || dnodeFqdn->type != cJSON_String || dnodeFqdn->valuestring == NULL) {
dError("failed to read %s, dnodeFqdn not found", pMgmt->file);
goto PRASE_DNODE_OVER;
}
tstrncpy(pDnodeEp->fqdn, dnodeFqdn->valuestring, TSDB_FQDN_LEN);
cJSON *dnodePort = cJSON_GetObjectItem(dnodeInfo, "dnodePort");
if (!dnodePort || dnodePort->type != cJSON_String) {
cJSON *dnodePort = cJSON_GetObjectItem(node, "port");
if (!dnodePort || dnodePort->type != cJSON_Number) {
dError("failed to read %s, dnodePort not found", pMgmt->file);
goto PRASE_DNODE_OVER;
}
pDnodeEp->port = atoi(dnodePort->valuestring);
pDnodeEp->port = dnodePort->valueint;
cJSON *isMnode = cJSON_GetObjectItem(node, "isMnode");
if (!isMnode || isMnode->type != cJSON_Number) {
dError("failed to read %s, isMnode not found", pMgmt->file);
goto PRASE_DNODE_OVER;
}
pDnodeEp->isMnode = isMnode->valueint;
}
code = 0;
......@@ -307,16 +307,16 @@ static int32_t dndWriteDnodes(SDnode *pDnode) {
char *content = calloc(1, maxLen + 1);
len += snprintf(content + len, maxLen - len, "{\n");
len += snprintf(content + len, maxLen - len, " \"dnodeId\": \"%d\",\n", pMgmt->dnodeId);
len += snprintf(content + len, maxLen - len, " \"clusterId\": \"%" PRId64 "\",\n", pMgmt->clusterId);
len += snprintf(content + len, maxLen - len, " \"dropped\": \"%d\",\n", pMgmt->dropped);
len += snprintf(content + len, maxLen - len, " \"dnodeInfos\": [{\n");
len += snprintf(content + len, maxLen - len, " \"dnodeId\": %d,\n", pMgmt->dnodeId);
len += snprintf(content + len, maxLen - len, " \"clusterId\": %d,\n", pMgmt->clusterId);
len += snprintf(content + len, maxLen - len, " \"dropped\": %d,\n", pMgmt->dropped);
len += snprintf(content + len, maxLen - len, " \"dnodes\": [{\n");
for (int32_t i = 0; i < pMgmt->dnodeEps->num; ++i) {
SDnodeEp *pDnodeEp = &pMgmt->dnodeEps->eps[i];
len += snprintf(content + len, maxLen - len, " \"dnodeId\": \"%d\",\n", pDnodeEp->id);
len += snprintf(content + len, maxLen - len, " \"isMnode\": \"%d\",\n", pDnodeEp->isMnode);
len += snprintf(content + len, maxLen - len, " \"dnodeFqdn\": \"%s\",\n", pDnodeEp->fqdn);
len += snprintf(content + len, maxLen - len, " \"dnodePort\": \"%u\"\n", pDnodeEp->port);
len += snprintf(content + len, maxLen - len, " \"id\": %d,\n", pDnodeEp->id);
len += snprintf(content + len, maxLen - len, " \"fqdn\": \"%s\",\n", pDnodeEp->fqdn);
len += snprintf(content + len, maxLen - len, " \"port\": %u,\n", pDnodeEp->port);
len += snprintf(content + len, maxLen - len, " \"isMnode\": %d\n", pDnodeEp->isMnode);
if (i < pMgmt->dnodeEps->num - 1) {
len += snprintf(content + len, maxLen - len, " },{\n");
} else {
......@@ -344,13 +344,11 @@ static void dndSendStatusMsg(SDnode *pDnode) {
return;
}
bool changed = false;
SDnodeMgmt *pMgmt = &pDnode->dmgmt;
taosRLockLatch(&pMgmt->latch);
pStatus->sver = htonl(pDnode->opt.sver);
pStatus->dnodeId = htonl(pMgmt->dnodeId);
pStatus->clusterId = htobe64(pMgmt->clusterId);
pStatus->clusterId = htonl(pMgmt->clusterId);
pStatus->rebootTime = htonl(pMgmt->rebootTime);
pStatus->numOfCores = htons(pDnode->opt.numOfCores);
pStatus->numOfSupportMnodes = htons(pDnode->opt.numOfCores);
......@@ -379,7 +377,7 @@ static void dndSendStatusMsg(SDnode *pDnode) {
static void dndUpdateDnodeCfg(SDnode *pDnode, SDnodeCfg *pCfg) {
SDnodeMgmt *pMgmt = &pDnode->dmgmt;
if (pMgmt->dnodeId == 0 || pMgmt->dropped != pCfg->dropped) {
dInfo("set dnodeId:%d clusterId:%" PRId64 " dropped:%d", pCfg->dnodeId, pCfg->clusterId, pCfg->dropped);
dInfo("set dnodeId:%d clusterId:%d dropped:%d", pCfg->dnodeId, pCfg->clusterId, pCfg->dropped);
taosWLockLatch(&pMgmt->latch);
pMgmt->dnodeId = pCfg->dnodeId;
......@@ -420,7 +418,7 @@ static void dndProcessStatusRsp(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) {
SStatusRsp *pRsp = pMsg->pCont;
SDnodeCfg *pCfg = &pRsp->dnodeCfg;
pCfg->dnodeId = htonl(pCfg->dnodeId);
pCfg->clusterId = htobe64(pCfg->clusterId);
pCfg->clusterId = htonl(pCfg->clusterId);
dndUpdateDnodeCfg(pDnode, pCfg);
if (pCfg->dropped) return;
......@@ -440,6 +438,7 @@ static void dndProcessAuthRsp(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) { a
static void dndProcessGrantRsp(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) { assert(1); }
static void dndProcessConfigDnodeReq(SDnode *pDnode, SRpcMsg *pMsg) {
dDebug("config msg is received");
SCfgDnodeMsg *pCfg = pMsg->pCont;
int32_t code = TSDB_CODE_OPS_NOT_SUPPORT;
......@@ -449,12 +448,12 @@ static void dndProcessConfigDnodeReq(SDnode *pDnode, SRpcMsg *pMsg) {
}
static void dndProcessStartupReq(SDnode *pDnode, SRpcMsg *pMsg) {
dInfo("startup msg is received");
dDebug("startup msg is received");
SStartupMsg *pStartup = rpcMallocCont(sizeof(SStartupMsg));
dndGetStartup(pDnode, pStartup);
dInfo("startup msg is sent, step:%s desc:%s finished:%d", pStartup->name, pStartup->desc, pStartup->finished);
dDebug("startup msg is sent, step:%s desc:%s finished:%d", pStartup->name, pStartup->desc, pStartup->finished);
SRpcMsg rpcRsp = {.handle = pMsg->handle, .pCont = pStartup, .contLen = sizeof(SStartupMsg)};
rpcSendResponse(&rpcRsp);
......@@ -470,7 +469,7 @@ static void *dnodeThreadRoutine(void *param) {
pthread_testcancel();
if (dndGetStat(pDnode) == DND_STAT_RUNNING) {
// dndSendStatusMsg(pDnode);
dndSendStatusMsg(pDnode);
}
}
}
......
......@@ -130,43 +130,43 @@ static int32_t dndReadMnodeFile(SDnode *pDnode) {
}
cJSON *deployed = cJSON_GetObjectItem(root, "deployed");
if (!deployed || deployed->type != cJSON_String) {
if (!deployed || deployed->type != cJSON_Number) {
dError("failed to read %s since deployed not found", pMgmt->file);
goto PRASE_MNODE_OVER;
}
pMgmt->deployed = atoi(deployed->valuestring);
pMgmt->deployed = deployed->valueint;
cJSON *dropped = cJSON_GetObjectItem(root, "dropped");
if (!dropped || dropped->type != cJSON_String) {
if (!dropped || dropped->type != cJSON_Number) {
dError("failed to read %s since dropped not found", pMgmt->file);
goto PRASE_MNODE_OVER;
}
pMgmt->dropped = atoi(dropped->valuestring);
pMgmt->dropped = dropped->valueint;
cJSON *nodes = cJSON_GetObjectItem(root, "nodes");
if (!nodes || nodes->type != cJSON_Array) {
cJSON *mnodes = cJSON_GetObjectItem(root, "mnodes");
if (!mnodes || mnodes->type != cJSON_Array) {
dError("failed to read %s since nodes not found", pMgmt->file);
goto PRASE_MNODE_OVER;
}
pMgmt->replica = cJSON_GetArraySize(nodes);
pMgmt->replica = cJSON_GetArraySize(mnodes);
if (pMgmt->replica <= 0 || pMgmt->replica > TSDB_MAX_REPLICA) {
dError("failed to read %s since nodes size %d invalid", pMgmt->file, pMgmt->replica);
dError("failed to read %s since mnodes size %d invalid", pMgmt->file, pMgmt->replica);
goto PRASE_MNODE_OVER;
}
for (int32_t i = 0; i < pMgmt->replica; ++i) {
cJSON *node = cJSON_GetArrayItem(nodes, i);
cJSON *node = cJSON_GetArrayItem(mnodes, i);
if (node == NULL) break;
SReplica *pReplica = &pMgmt->replicas[i];
cJSON *id = cJSON_GetObjectItem(node, "id");
if (!id || id->type != cJSON_String || id->valuestring == NULL) {
if (!id || id->type != cJSON_Number) {
dError("failed to read %s since id not found", pMgmt->file);
goto PRASE_MNODE_OVER;
}
pReplica->id = atoi(id->valuestring);
pReplica->id = id->valueint;
cJSON *fqdn = cJSON_GetObjectItem(node, "fqdn");
if (!fqdn || fqdn->type != cJSON_String || fqdn->valuestring == NULL) {
......@@ -176,15 +176,15 @@ static int32_t dndReadMnodeFile(SDnode *pDnode) {
tstrncpy(pReplica->fqdn, fqdn->valuestring, TSDB_FQDN_LEN);
cJSON *port = cJSON_GetObjectItem(node, "port");
if (!port || port->type != cJSON_String || port->valuestring == NULL) {
if (!port || port->type != cJSON_Number) {
dError("failed to read %s since port not found", pMgmt->file);
goto PRASE_MNODE_OVER;
}
pReplica->port = atoi(port->valuestring);
pReplica->port = port->valueint;
}
code = 0;
dInfo("succcessed to read file %s", pMgmt->file);
dDebug("succcessed to read file %s, deployed:%d dropped:%d", pMgmt->file, pMgmt->deployed, pMgmt->dropped);
PRASE_MNODE_OVER:
if (content != NULL) free(content);
......@@ -213,15 +213,15 @@ static int32_t dndWriteMnodeFile(SDnode *pDnode) {
char *content = calloc(1, maxLen + 1);
len += snprintf(content + len, maxLen - len, "{\n");
len += snprintf(content + len, maxLen - len, " \"deployed\": \"%d\",\n", pMgmt->deployed);
len += snprintf(content + len, maxLen - len, " \"deployed\": %d,\n", pMgmt->deployed);
len += snprintf(content + len, maxLen - len, " \"dropped\": \"%d\",\n", pMgmt->dropped);
len += snprintf(content + len, maxLen - len, " \"nodes\": [{\n");
len += snprintf(content + len, maxLen - len, " \"dropped\": %d,\n", pMgmt->dropped);
len += snprintf(content + len, maxLen - len, " \"mnodes\": [{\n");
for (int32_t i = 0; i < pMgmt->replica; ++i) {
SReplica *pReplica = &pMgmt->replicas[i];
len += snprintf(content + len, maxLen - len, " \"id\": \"%d\",\n", pReplica->id);
len += snprintf(content + len, maxLen - len, " \"id\": %d,\n", pReplica->id);
len += snprintf(content + len, maxLen - len, " \"fqdn\": \"%s\",\n", pReplica->fqdn);
len += snprintf(content + len, maxLen - len, " \"port\": \"%u\"\n", pReplica->port);
len += snprintf(content + len, maxLen - len, " \"port\": %u\n", pReplica->port);
if (i < pMgmt->replica - 1) {
len += snprintf(content + len, maxLen - len, " },{\n");
} else {
......@@ -241,7 +241,7 @@ static int32_t dndWriteMnodeFile(SDnode *pDnode) {
return -1;
}
dInfo("successed to write %s", pMgmt->file);
dInfo("successed to write %s, deployed:%d dropped:%d", pMgmt->file, pMgmt->deployed, pMgmt->dropped);
return 0;
}
......@@ -289,7 +289,6 @@ static void dndStopMnodeWorker(SDnode *pDnode) {
taosWLockLatch(&pMgmt->latch);
pMgmt->deployed = 0;
pMgmt->pMnode = NULL;
taosWUnLockLatch(&pMgmt->latch);
while (pMgmt->refCount > 1) taosMsleep(10);
......@@ -396,29 +395,34 @@ static int32_t dndBuildMnodeOptionFromMsg(SDnode *pDnode, SMnodeOpt *pOption, SC
static int32_t dndOpenMnode(SDnode *pDnode, SMnodeOpt *pOption) {
SMnodeMgmt *pMgmt = &pDnode->mmgmt;
int32_t code = dndStartMnodeWorker(pDnode);
if (code != 0) {
dError("failed to start mnode worker since %s", terrstr());
return code;
}
SMnode *pMnode = mndOpen(pDnode->dir.mnode, pOption);
if (pMnode == NULL) {
dError("failed to open mnode since %s", terrstr());
return -1;
}
pMgmt->deployed = 1;
int32_t code = dndWriteMnodeFile(pDnode);
if (code != 0) {
dError("failed to write mnode file since %s", terrstr());
code = terrno;
dndStopMnodeWorker(pDnode);
pMgmt->deployed = 0;
mndClose(pMnode);
mndDestroy(pDnode->dir.mnode);
terrno = code;
return code;
return -1;
}
if (dndWriteMnodeFile(pDnode) != 0) {
dError("failed to write mnode file since %s", terrstr());
code = dndStartMnodeWorker(pDnode);
if (code != 0) {
dError("failed to start mnode worker since %s", terrstr());
code = terrno;
pMgmt->deployed = 0;
dndStopMnodeWorker(pDnode);
mndClose(pMnode);
mndDestroy(pDnode->dir.mnode);
terrno = code;
return code;
return -1;
}
taosWLockLatch(&pMgmt->latch);
......@@ -426,6 +430,7 @@ static int32_t dndOpenMnode(SDnode *pDnode, SMnodeOpt *pOption) {
pMgmt->deployed = 1;
taosWUnLockLatch(&pMgmt->latch);
dInfo("mnode open successfully");
return 0;
}
......@@ -914,6 +919,7 @@ void dndCleanupMnode(SDnode *pDnode) {
dndStopMnodeWorker(pDnode);
dndCleanupMnodeMgmtWorker(pDnode);
tfree(pMgmt->file);
mndClose(pMgmt->pMnode);
dInfo("dnode-mnode is cleaned up");
}
......
......@@ -58,6 +58,8 @@ static void dndInitMsgFp(STransMgmt *pMgmt) {
pMgmt->msgFp[TSDB_MSG_TYPE_CREATE_DNODE] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_CONFIG_DNODE] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_DROP_DNODE] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_CREATE_MNODE] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_DROP_MNODE] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_CREATE_DB] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_DROP_DB] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_USE_DB] = dndProcessMnodeWriteMsg;
......@@ -115,12 +117,12 @@ static void dndInitMsgFp(STransMgmt *pMgmt) {
pMgmt->msgFp[TSDB_MSG_TYPE_CONFIG_DNODE_IN_RSP] = dndProcessMnodeWriteMsg;
// message from dnode to mnode
pMgmt->msgFp[TSDB_MSG_TYPE_AUTH] = dndProcessMnodeReadMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_AUTH_RSP] = dndProcessDnodeRsp;
pMgmt->msgFp[TSDB_MSG_TYPE_GRANT] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_GRANT_RSP] = dndProcessDnodeRsp;
pMgmt->msgFp[TSDB_MSG_TYPE_STATUS] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_STATUS_RSP] = dndProcessDnodeRsp;
pMgmt->msgFp[TSDB_MSG_TYPE_AUTH] = dndProcessMnodeReadMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_AUTH_RSP] = dndProcessDnodeRsp;
}
static void dndProcessResponse(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) {
......@@ -131,19 +133,20 @@ static void dndProcessResponse(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) {
if (dndGetStat(pDnode) == DND_STAT_STOPPED) {
if (pMsg == NULL || pMsg->pCont == NULL) return;
dTrace("RPC %p, rsp:%s is ignored since dnode is stopping", pMsg->handle, taosMsg[msgType]);
dTrace("RPC %p, rsp:%s app:%p is ignored since dnode is stopping", pMsg->handle, taosMsg[msgType], pMsg->ahandle);
rpcFreeCont(pMsg->pCont);
return;
}
DndMsgFp fp = pMgmt->msgFp[msgType];
if (fp != NULL) {
dTrace("RPC %p, rsp:%s will be processed, code:%s", pMsg->handle, taosMsg[msgType], tstrerror(pMsg->code));
(*fp)(pDnode, pMsg, pEpSet);
dTrace("RPC %p, rsp:%s app:%p is processed, code:0x%0X", pMsg->handle, taosMsg[msgType], pMsg->ahandle,
pMsg->code & 0XFFFF);
} else {
dError("RPC %p, rsp:%s not processed", pMsg->handle, taosMsg[msgType]);
rpcFreeCont(pMsg->pCont);
dError("RPC %p, rsp:%s app:%p not processed", pMsg->handle, taosMsg[msgType], pMsg->ahandle);
}
rpcFreeCont(pMsg->pCont);
}
static int32_t dndInitClient(SDnode *pDnode) {
......@@ -187,19 +190,19 @@ static void dndProcessRequest(void *param, SRpcMsg *pMsg, SEpSet *pEpSet) {
int32_t msgType = pMsg->msgType;
if (msgType == TSDB_MSG_TYPE_NETWORK_TEST) {
dTrace("RPC %p, network test req will be processed", pMsg->handle);
dTrace("RPC %p, network test req, app:%p will be processed", pMsg->handle, pMsg->ahandle);
dndProcessDnodeReq(pDnode, pMsg, pEpSet);
return;
}
if (dndGetStat(pDnode) == DND_STAT_STOPPED) {
dError("RPC %p, req:%s is ignored since dnode exiting", pMsg->handle, taosMsg[msgType]);
dError("RPC %p, req:%s app:%p is ignored since dnode exiting", pMsg->handle, taosMsg[msgType], pMsg->ahandle);
SRpcMsg rspMsg = {.handle = pMsg->handle, .code = TSDB_CODE_DND_EXITING};
rpcSendResponse(&rspMsg);
rpcFreeCont(pMsg->pCont);
return;
} else if (dndGetStat(pDnode) != DND_STAT_RUNNING) {
dError("RPC %p, req:%s is ignored since dnode not running", pMsg->handle, taosMsg[msgType]);
dError("RPC %p, req:%s app:%p is ignored since dnode not running", pMsg->handle, taosMsg[msgType], pMsg->ahandle);
SRpcMsg rspMsg = {.handle = pMsg->handle, .code = TSDB_CODE_APP_NOT_READY};
rpcSendResponse(&rspMsg);
rpcFreeCont(pMsg->pCont);
......@@ -207,7 +210,7 @@ static void dndProcessRequest(void *param, SRpcMsg *pMsg, SEpSet *pEpSet) {
}
if (pMsg->pCont == NULL) {
dTrace("RPC %p, req:%s not processed since content is null", pMsg->handle, taosMsg[msgType]);
dTrace("RPC %p, req:%s app:%p not processed since content is null", pMsg->handle, taosMsg[msgType], pMsg->ahandle);
SRpcMsg rspMsg = {.handle = pMsg->handle, .code = TSDB_CODE_DND_INVALID_MSG_LEN};
rpcSendResponse(&rspMsg);
return;
......@@ -215,10 +218,10 @@ static void dndProcessRequest(void *param, SRpcMsg *pMsg, SEpSet *pEpSet) {
DndMsgFp fp = pMgmt->msgFp[msgType];
if (fp != NULL) {
dTrace("RPC %p, req:%s will be processed", pMsg->handle, taosMsg[msgType]);
dTrace("RPC %p, req:%s app:%p will be processed", pMsg->handle, taosMsg[msgType], pMsg->ahandle);
(*fp)(pDnode, pMsg, pEpSet);
} else {
dError("RPC %p, req:%s is not processed", pMsg->handle, taosMsg[msgType]);
dError("RPC %p, req:%s app:%p is not processed since no handle", pMsg->handle, taosMsg[msgType], pMsg->ahandle);
SRpcMsg rspMsg = {.handle = pMsg->handle, .code = TSDB_CODE_MSG_NOT_PROCESSED};
rpcSendResponse(&rspMsg);
rpcFreeCont(pMsg->pCont);
......
......@@ -897,6 +897,7 @@ static int32_t dndInitVnodeMgmtWorker(SDnode *pDnode) {
return -1;
}
dDebug("vnode mgmt worker is initialized");
return 0;
}
......@@ -905,6 +906,7 @@ static void dndCleanupVnodeMgmtWorker(SDnode *pDnode) {
tWorkerFreeQueue(&pMgmt->mgmtPool, pMgmt->pMgmtQ);
tWorkerCleanup(&pMgmt->mgmtPool);
pMgmt->pMgmtQ = NULL;
dDebug("vnode mgmt worker is closed");
}
static int32_t dndAllocVnodeQueryQueue(SDnode *pDnode, SVnodeObj *pVnode) {
......@@ -965,6 +967,7 @@ static int32_t dndInitVnodeReadWorker(SDnode *pDnode) {
return -1;
}
dDebug("vnode read worker is initialized");
return 0;
}
......@@ -972,6 +975,7 @@ static void dndCleanupVnodeReadWorker(SDnode *pDnode) {
SVnodesMgmt *pMgmt = &pDnode->vmgmt;
tWorkerCleanup(&pMgmt->fetchPool);
tWorkerCleanup(&pMgmt->queryPool);
dDebug("vnode close worker is initialized");
}
static int32_t dndAllocVnodeWriteQueue(SDnode *pDnode, SVnodeObj *pVnode) {
......@@ -1018,12 +1022,14 @@ static int32_t dndInitVnodeWriteWorker(SDnode *pDnode) {
return -1;
}
dDebug("vnode write worker is initialized");
return 0;
}
static void dndCleanupVnodeWriteWorker(SDnode *pDnode) {
SVnodesMgmt *pMgmt = &pDnode->vmgmt;
tMWorkerCleanup(&pMgmt->writePool);
dDebug("vnode write worker is closed");
}
static int32_t dndAllocVnodeSyncQueue(SDnode *pDnode, SVnodeObj *pVnode) {
......@@ -1047,8 +1053,8 @@ static int32_t dndInitVnodeSyncWorker(SDnode *pDnode) {
int32_t maxThreads = tsNumOfCores / 2;
if (maxThreads < 1) maxThreads = 1;
SVnodesMgmt * pMgmt = &pDnode->vmgmt;
SMWorkerPool *pPool = &pMgmt->writePool;
SVnodesMgmt *pMgmt = &pDnode->vmgmt;
SMWorkerPool *pPool = &pMgmt->syncPool;
pPool->name = "vnode-sync";
pPool->max = maxThreads;
if (tMWorkerInit(pPool) != 0) {
......@@ -1056,12 +1062,14 @@ static int32_t dndInitVnodeSyncWorker(SDnode *pDnode) {
return -1;
}
dDebug("vnode sync worker is initialized");
return 0;
}
static void dndCleanupVnodeSyncWorker(SDnode *pDnode) {
SVnodesMgmt *pMgmt = &pDnode->vmgmt;
tMWorkerCleanup(&pMgmt->syncPool);
dDebug("vnode sync worker is closed");
}
int32_t dndInitVnodes(SDnode *pDnode) {
......
......@@ -115,7 +115,7 @@ typedef struct STrans {
} STrans;
typedef struct SClusterObj {
int64_t id;
int32_t id;
char uid[TSDB_CLUSTER_ID_LEN];
int64_t createdTime;
int64_t updateTime;
......@@ -299,25 +299,18 @@ typedef struct {
char payload[];
} SShowObj;
typedef struct {
int32_t len;
void *rsp;
} SMnodeRsp;
typedef struct SMnodeMsg {
char user[TSDB_USER_LEN];
SMnode *pMnode;
void (*fp)(SMnodeMsg *pMsg, int32_t code);
SRpcConnInfo conn;
SUserObj *pUser;
int16_t received;
int16_t successed;
int16_t expected;
int16_t retry;
int32_t code;
int64_t createdTime;
SMnodeRsp rpcRsp;
SRpcMsg rpcMsg;
char pCont[];
int16_t received;
int16_t successed;
int16_t expected;
int16_t retry;
int32_t code;
int64_t createdTime;
SRpcMsg rpcMsg;
int32_t contLen;
void *pCont;
} SMnodeMsg;
#ifdef __cplusplus
......
......@@ -36,7 +36,7 @@ typedef struct {
typedef struct SMnode {
int32_t dnodeId;
int64_t clusterId;
int32_t clusterId;
int8_t replica;
int8_t selfIndex;
SReplica replicas[TSDB_MAX_REPLICA];
......@@ -58,10 +58,6 @@ typedef struct SMnode {
char *charset;
} SMnode;
tmr_h mndGetTimer(SMnode *pMnode);
int32_t mndGetDnodeId(SMnode *pMnode);
int64_t mndGetClusterId(SMnode *pMnode);
void mndSendMsgToDnode(SMnode *pMnode, SEpSet *pEpSet, SRpcMsg *rpcMsg);
void mndSendMsgToMnode(SMnode *pMnode, SRpcMsg *pMsg);
void mndSendRedirectMsg(SMnode *pMnode, SRpcMsg *pMsg);
......
......@@ -22,8 +22,10 @@
extern "C" {
#endif
int32_t mndInitUser(SMnode *pMnode);
void mndCleanupUser(SMnode *pMnode);
int32_t mndInitUser(SMnode *pMnode);
void mndCleanupUser(SMnode *pMnode);
SUserObj *mndAcquireUser(SMnode *pMnode, const char *userName);
void mndReleaseUser(SMnode *pMnode, SUserObj *pUser);
#ifdef __cplusplus
}
......
......@@ -44,6 +44,7 @@ static SSdbRow *mnodeAcctActionDecode(SSdbRaw *pRaw) {
if (sdbGetRawSoftVer(pRaw, &sver) != 0) return NULL;
if (sver != SDB_ACCT_VER) {
mError("failed to decode acct since %s", terrstr());
terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
return NULL;
}
......@@ -68,14 +69,26 @@ static SSdbRow *mnodeAcctActionDecode(SSdbRaw *pRaw) {
return pRow;
}
static int32_t mnodeAcctActionInsert(SSdb *pSdb, SAcctObj *pAcct) { return 0; }
static int32_t mnodeAcctActionInsert(SSdb *pSdb, SAcctObj *pAcct) {
mTrace("acct:%s, perform insert action", pAcct->acct);
memset(&pAcct->info, 0, sizeof(SAcctInfo));
return 0;
}
static int32_t mnodeAcctActionDelete(SSdb *pSdb, SAcctObj *pAcct) { return 0; }
static int32_t mnodeAcctActionDelete(SSdb *pSdb, SAcctObj *pAcct) {
mTrace("acct:%s, perform delete action", pAcct->acct);
return 0;
}
static int32_t mnodeAcctActionUpdate(SSdb *pSdb, SAcctObj *pSrcAcct, SAcctObj *pDstAcct) {
SAcctObj tObj;
int32_t len = (int32_t)((int8_t *)&tObj.info - (int8_t *)&tObj);
memcpy(pDstAcct, pSrcAcct, len);
mTrace("acct:%s, perform update action", pSrcAcct->acct);
memcpy(pSrcAcct->acct, pDstAcct->acct, TSDB_USER_LEN);
pSrcAcct->createdTime = pDstAcct->createdTime;
pSrcAcct->updateTime = pDstAcct->updateTime;
pSrcAcct->acctId = pDstAcct->acctId;
pSrcAcct->status = pDstAcct->status;
pSrcAcct->cfg = pDstAcct->cfg;
return 0;
}
......@@ -98,6 +111,7 @@ static int32_t mnodeCreateDefaultAcct(SMnode *pMnode) {
if (pRaw == NULL) return -1;
sdbSetRawStatus(pRaw, SDB_STATUS_READY);
mTrace("acct:%s, will be created while deploy sdb", acctObj.acct);
return sdbWrite(pMnode->pSdb, pRaw);
}
......
......@@ -14,8 +14,97 @@
*/
#define _DEFAULT_SOURCE
#include "os.h"
#include "mndInt.h"
#include "mndCluster.h"
#include "mndTrans.h"
int32_t mndInitCluster(SMnode *pMnode) { return 0; }
void mndCleanupCluster(SMnode *pMnode) {}
\ No newline at end of file
#define SDB_CLUSTER_VER 1
static SSdbRaw *mndClusterActionEncode(SClusterObj *pCluster) {
SSdbRaw *pRaw = sdbAllocRaw(SDB_CLUSTER, SDB_CLUSTER_VER, sizeof(SClusterObj));
if (pRaw == NULL) return NULL;
int32_t dataPos = 0;
SDB_SET_INT32(pRaw, dataPos, pCluster->id);
SDB_SET_INT64(pRaw, dataPos, pCluster->createdTime)
SDB_SET_INT64(pRaw, dataPos, pCluster->updateTime)
SDB_SET_BINARY(pRaw, dataPos, pCluster->uid, TSDB_CLUSTER_ID_LEN)
return pRaw;
}
static SSdbRow *mndClusterActionDecode(SSdbRaw *pRaw) {
int8_t sver = 0;
if (sdbGetRawSoftVer(pRaw, &sver) != 0) return NULL;
if (sver != SDB_CLUSTER_VER) {
terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
mError("failed to decode cluster since %s", terrstr());
return NULL;
}
SSdbRow *pRow = sdbAllocRow(sizeof(SClusterObj));
SClusterObj *pCluster = sdbGetRowObj(pRow);
if (pCluster == NULL) return NULL;
int32_t dataPos = 0;
SDB_GET_INT32(pRaw, pRow, dataPos, &pCluster->id)
SDB_GET_INT64(pRaw, pRow, dataPos, &pCluster->createdTime)
SDB_GET_INT64(pRaw, pRow, dataPos, &pCluster->updateTime)
SDB_GET_BINARY(pRaw, pRow, dataPos, pCluster->uid, TSDB_CLUSTER_ID_LEN)
return pRow;
}
static int32_t mndClusterActionInsert(SSdb *pSdb, SClusterObj *pCluster) {
mTrace("cluster:%d, perform insert action", pCluster->id);
return 0;
}
static int32_t mndClusterActionDelete(SSdb *pSdb, SClusterObj *pCluster) {
mTrace("cluster:%d, perform delete action", pCluster->id);
return 0;
}
static int32_t mndClusterActionUpdate(SSdb *pSdb, SClusterObj *pSrcCluster, SClusterObj *pDstCluster) {
mTrace("cluster:%d, perform update action", pSrcCluster->id);
return 0;
}
static int32_t mndCreateDefaultCluster(SMnode *pMnode) {
SClusterObj clusterObj = {0};
clusterObj.createdTime = taosGetTimestampMs();
clusterObj.updateTime = clusterObj.createdTime;
int32_t code = taosGetSystemUid(clusterObj.uid, TSDB_CLUSTER_ID_LEN);
if (code != 0) {
strcpy(clusterObj.uid, "tdengine2.0");
mError("failed to get uid from system, set to default val %s", clusterObj.uid);
} else {
mDebug("cluster:%d, uid is %s", clusterObj.id, clusterObj.uid);
}
clusterObj.id = MurmurHash3_32(clusterObj.uid, TSDB_CLUSTER_ID_LEN);
clusterObj.id = abs(clusterObj.id);
pMnode->clusterId = clusterObj.id;
SSdbRaw *pRaw = mndClusterActionEncode(&clusterObj);
if (pRaw == NULL) return -1;
sdbSetRawStatus(pRaw, SDB_STATUS_READY);
mTrace("cluster:%d, will be created while deploy sdb", clusterObj.id);
return sdbWrite(pMnode->pSdb, pRaw);
}
int32_t mndInitCluster(SMnode *pMnode) {
SSdbTable table = {.sdbType = SDB_CLUSTER,
.keyType = SDB_KEY_INT32,
.deployFp = (SdbDeployFp)mndCreateDefaultCluster,
.encodeFp = (SdbEncodeFp)mndClusterActionEncode,
.decodeFp = (SdbDecodeFp)mndClusterActionDecode,
.insertFp = (SdbInsertFp)mndClusterActionInsert,
.updateFp = (SdbUpdateFp)mndClusterActionUpdate,
.deleteFp = (SdbDeleteFp)mndClusterActionDelete};
return sdbSetTable(pMnode->pSdb, table);
}
void mndCleanupCluster(SMnode *pMnode) {}
......@@ -91,19 +91,24 @@ static void mnodeResetDnode(SDnodeObj *pDnode) {
}
static int32_t mndDnodeActionInsert(SSdb *pSdb, SDnodeObj *pDnode) {
mTrace("dnode:%d, perform insert action", pDnode->id);
mnodeResetDnode(pDnode);
return 0;
}
static int32_t mndDnodeActionDelete(SSdb *pSdb, SDnodeObj *pDnode) { return 0; }
static int32_t mndDnodeActionDelete(SSdb *pSdb, SDnodeObj *pDnode) {
mTrace("dnode:%d, perform delete action", pDnode->id);
return 0;
}
static int32_t mndDnodeActionUpdate(SSdb *pSdb, SDnodeObj *pSrcDnode, SDnodeObj *pDstDnode) {
mTrace("dnode:%d, perform update action", pSrcDnode->id);
pSrcDnode->id = pDstDnode->id;
pSrcDnode->createdTime = pDstDnode->createdTime;
pSrcDnode->updateTime = pDstDnode->updateTime;
pSrcDnode->port = pDstDnode->port;
memcpy(pSrcDnode->fqdn, pDstDnode->fqdn, TSDB_FQDN_LEN);
mnodeResetDnode(pSrcDnode);
memcpy(pSrcDnode->fqdn, pDstDnode->fqdn, TSDB_FQDN_LEN);
return 0;
}
static int32_t mndCreateDefaultDnode(SMnode *pMnode) {
......@@ -118,6 +123,7 @@ static int32_t mndCreateDefaultDnode(SMnode *pMnode) {
if (pRaw == NULL) return -1;
sdbSetRawStatus(pRaw, SDB_STATUS_READY);
mTrace("dnode:%d, will be created while deploy sdb", dnodeObj.id);
return sdbWrite(pMnode->pSdb, pRaw);
}
......@@ -169,7 +175,7 @@ static void mndGetDnodeData(SMnode *pMnode, SDnodeEps *pEps, int32_t numOfEps) {
i++;
}
pEps->num = i;
pEps->num = htonl(i);
}
static int32_t mndCheckClusterCfgPara(SMnode *pMnode, const SClusterCfg *pCfg) {
......@@ -205,11 +211,10 @@ static int32_t mndCheckClusterCfgPara(SMnode *pMnode, const SClusterCfg *pCfg) {
return 0;
}
static int32_t mndProcessStatusMsg(SMnode *pMnode, SMnodeMsg *pMsg) {
SStatusMsg *pStatus = pMsg->rpcMsg.pCont;
static void mndParseStatusMsg(SStatusMsg *pStatus) {
pStatus->sver = htonl(pStatus->sver);
pStatus->dnodeId = htonl(pStatus->dnodeId);
pStatus->clusterId = htobe64(pStatus->clusterId);
pStatus->clusterId = htonl(pStatus->clusterId);
pStatus->rebootTime = htonl(pStatus->rebootTime);
pStatus->numOfCores = htons(pStatus->numOfCores);
pStatus->numOfSupportMnodes = htons(pStatus->numOfSupportMnodes);
......@@ -219,6 +224,11 @@ static int32_t mndProcessStatusMsg(SMnode *pMnode, SMnodeMsg *pMsg) {
pStatus->clusterCfg.statusInterval = htonl(pStatus->clusterCfg.statusInterval);
pStatus->clusterCfg.mnodeEqualVnodeNum = htonl(pStatus->clusterCfg.mnodeEqualVnodeNum);
pStatus->clusterCfg.checkTime = htobe64(pStatus->clusterCfg.checkTime);
}
static int32_t mndProcessStatusMsg(SMnode *pMnode, SMnodeMsg *pMsg) {
SStatusMsg *pStatus = pMsg->rpcMsg.pCont;
mndParseStatusMsg(pStatus);
SDnodeObj *pDnode = NULL;
if (pStatus->dnodeId == 0) {
......@@ -249,15 +259,14 @@ static int32_t mndProcessStatusMsg(SMnode *pMnode, SMnodeMsg *pMsg) {
return TSDB_CODE_MND_INVALID_MSG_VERSION;
}
int64_t clusterId = mndGetClusterId(pMnode);
if (pStatus->dnodeId == 0) {
mDebug("dnode:%d %s, first access, set clusterId %" PRId64, pDnode->id, pDnode->ep, clusterId);
mDebug("dnode:%d %s, first access, set clusterId %d", pDnode->id, pDnode->ep, pMnode->clusterId);
} else {
if (pStatus->clusterId != clusterId) {
if (pStatus->clusterId != pMnode->clusterId) {
if (pDnode != NULL && pDnode->status != DND_STATUS_READY) {
pDnode->offlineReason = DND_REASON_CLUSTER_ID_NOT_MATCH;
}
mError("dnode:%d, clusterId %" PRId64 " not match exist %" PRId64, pDnode->id, pStatus->clusterId, clusterId);
mError("dnode:%d, clusterId %d not match exist %d", pDnode->id, pStatus->clusterId, pMnode->clusterId);
mndReleaseDnode(pMnode, pDnode);
return TSDB_CODE_MND_INVALID_CLUSTER_ID;
} else {
......@@ -296,11 +305,11 @@ static int32_t mndProcessStatusMsg(SMnode *pMnode, SMnodeMsg *pMsg) {
pRsp->dnodeCfg.dnodeId = htonl(pDnode->id);
pRsp->dnodeCfg.dropped = 0;
pRsp->dnodeCfg.clusterId = htobe64(clusterId);
pRsp->dnodeCfg.clusterId = htonl(pMnode->clusterId);
mndGetDnodeData(pMnode, &pRsp->dnodeEps, numOfEps);
pMsg->rpcRsp.len = contLen;
pMsg->rpcRsp.rsp = pRsp;
pMsg->contLen = contLen;
pMsg->pCont = pRsp;
mndReleaseDnode(pMnode, pDnode);
return 0;
......@@ -325,7 +334,7 @@ int32_t mndInitDnode(SMnode *pMnode) {
mndSetMsgHandle(pMnode, TSDB_MSG_TYPE_CREATE_DNODE, mndProcessCreateDnodeMsg);
mndSetMsgHandle(pMnode, TSDB_MSG_TYPE_DROP_DNODE, mndProcessDropDnodeMsg);
mndSetMsgHandle(pMnode, TSDB_MSG_TYPE_CONFIG_DNODE, mndProcessConfigDnodeMsg);
mndSetMsgHandle(pMnode, TSDB_MSG_TYPE_STATUS_RSP, mndProcessStatusMsg);
mndSetMsgHandle(pMnode, TSDB_MSG_TYPE_STATUS, mndProcessStatusMsg);
return sdbSetTable(pMnode->pSdb, table);
}
......
......@@ -59,9 +59,11 @@ static void mnodeResetMnode(SMnodeObj *pMnodeObj) {
}
static int32_t mndMnodeActionInsert(SSdb *pSdb, SMnodeObj *pMnodeObj) {
mTrace("mnode:%d, perform insert action", pMnodeObj->id);
pMnodeObj->pDnode = sdbAcquire(pSdb, SDB_DNODE, &pMnodeObj->id);
if (pMnodeObj->pDnode == NULL) {
terrno = TSDB_CODE_MND_DNODE_NOT_EXIST;
mError("mnode:%d, failed to perform insert action since %s", pMnodeObj->id, terrstr());
return -1;
}
......@@ -70,6 +72,7 @@ static int32_t mndMnodeActionInsert(SSdb *pSdb, SMnodeObj *pMnodeObj) {
}
static int32_t mndMnodeActionDelete(SSdb *pSdb, SMnodeObj *pMnodeObj) {
mTrace("mnode:%d, perform delete action", pMnodeObj->id);
if (pMnodeObj->pDnode != NULL) {
sdbRelease(pSdb, pMnodeObj->pDnode);
pMnodeObj->pDnode = NULL;
......@@ -79,15 +82,16 @@ static int32_t mndMnodeActionDelete(SSdb *pSdb, SMnodeObj *pMnodeObj) {
}
static int32_t mndMnodeActionUpdate(SSdb *pSdb, SMnodeObj *pSrcMnode, SMnodeObj *pDstMnode) {
mTrace("mnode:%d, perform update action", pSrcMnode->id);
pSrcMnode->id = pDstMnode->id;
pSrcMnode->createdTime = pDstMnode->createdTime;
pSrcMnode->updateTime = pDstMnode->updateTime;
mnodeResetMnode(pSrcMnode);
return 0;
}
static int32_t mndCreateDefaultMnode(SMnode *pMnode) {
SMnodeObj mnodeObj = {0};
mnodeObj.id = 0;
mnodeObj.id = 1;
mnodeObj.createdTime = taosGetTimestampMs();
mnodeObj.updateTime = mnodeObj.createdTime;
......@@ -95,6 +99,7 @@ static int32_t mndCreateDefaultMnode(SMnode *pMnode) {
if (pRaw == NULL) return -1;
sdbSetRawStatus(pRaw, SDB_STATUS_READY);
mTrace("mnode:%d, will be created while deploy sdb", mnodeObj.id);
return sdbWrite(pMnode->pSdb, pRaw);
}
......
......@@ -203,9 +203,9 @@ static void mndSendTelemetryReport() {
return;
}
int64_t clusterId = mndGetClusterId(NULL);
int32_t clusterId = 0;
char clusterIdStr[20] = {0};
snprintf(clusterIdStr, sizeof(clusterIdStr), "%" PRId64, clusterId);
snprintf(clusterIdStr, sizeof(clusterIdStr), "%d", clusterId);
SBufferWriter bw = tbufInitWriter(NULL, false);
mndBeginObject(&bw);
......
......@@ -15,8 +15,8 @@
#define _DEFAULT_SOURCE
#include "mndSync.h"
#include "tkey.h"
#include "mndTrans.h"
#include "tkey.h"
#define SDB_USER_VER 1
......@@ -41,6 +41,7 @@ static SSdbRow *mndUserActionDecode(SSdbRaw *pRaw) {
if (sdbGetRawSoftVer(pRaw, &sver) != 0) return NULL;
if (sver != SDB_USER_VER) {
mError("failed to decode user since %s", terrstr());
terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
return NULL;
}
......@@ -61,15 +62,18 @@ static SSdbRow *mndUserActionDecode(SSdbRaw *pRaw) {
}
static int32_t mndUserActionInsert(SSdb *pSdb, SUserObj *pUser) {
mTrace("user:%s, perform insert action", pUser->user);
pUser->prohibitDbHash = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
if (pUser->prohibitDbHash == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
mError("user:%s, failed to perform insert action since %s", pUser->user, terrstr());
return -1;
}
pUser->pAcct = sdbAcquire(pSdb, SDB_ACCT, pUser->acct);
if (pUser->pAcct == NULL) {
terrno = TSDB_CODE_MND_ACCT_NOT_EXIST;
mError("user:%s, failed to perform insert action since %s", pUser->user, terrstr());
return -1;
}
......@@ -77,12 +81,13 @@ static int32_t mndUserActionInsert(SSdb *pSdb, SUserObj *pUser) {
}
static int32_t mndUserActionDelete(SSdb *pSdb, SUserObj *pUser) {
mTrace("user:%s, perform delete action", pUser->user);
if (pUser->prohibitDbHash) {
taosHashCleanup(pUser->prohibitDbHash);
pUser->prohibitDbHash = NULL;
}
if (pUser->acct != NULL) {
if (pUser->pAcct != NULL) {
sdbRelease(pSdb, pUser->pAcct);
pUser->pAcct = NULL;
}
......@@ -91,9 +96,13 @@ static int32_t mndUserActionDelete(SSdb *pSdb, SUserObj *pUser) {
}
static int32_t mndUserActionUpdate(SSdb *pSdb, SUserObj *pSrcUser, SUserObj *pDstUser) {
SUserObj tObj;
int32_t len = (int32_t)((int8_t *)tObj.prohibitDbHash - (int8_t *)&tObj);
memcpy(pDstUser, pSrcUser, len);
mTrace("user:%s, perform update action", pSrcUser->user);
memcpy(pSrcUser->user, pDstUser->user, TSDB_USER_LEN);
memcpy(pSrcUser->pass, pDstUser->pass, TSDB_KEY_LEN);
memcpy(pSrcUser->acct, pDstUser->acct, TSDB_USER_LEN);
pSrcUser->createdTime = pDstUser->createdTime;
pSrcUser->updateTime = pDstUser->updateTime;
pSrcUser->rootAuth = pDstUser->rootAuth;
return 0;
}
......@@ -113,6 +122,7 @@ static int32_t mndCreateDefaultUser(SMnode *pMnode, char *acct, char *user, char
if (pRaw == NULL) return -1;
sdbSetRawStatus(pRaw, SDB_STATUS_READY);
mTrace("user:%s, will be created while deploy sdb", userObj.user);
return sdbWrite(pMnode->pSdb, pRaw);
}
......@@ -196,7 +206,7 @@ static int32_t mndProcessCreateUserMsg(SMnode *pMnode, SMnodeMsg *pMsg) {
return -1;
}
SUserObj *pOperUser = sdbAcquire(pMnode->pSdb, SDB_USER, pMsg->conn.user);
SUserObj *pOperUser = sdbAcquire(pMnode->pSdb, SDB_USER, pMsg->user);
if (pOperUser == NULL) {
terrno = TSDB_CODE_MND_NO_USER_FROM_CONN;
mError("user:%s, failed to create since %s", pCreate->user, terrstr());
......@@ -229,4 +239,15 @@ int32_t mndInitUser(SMnode *pMnode) {
return sdbSetTable(pMnode->pSdb, table);
}
void mndCleanupUser(SMnode *pMnode) {}
\ No newline at end of file
void mndCleanupUser(SMnode *pMnode) {}
SUserObj *mndAcquireUser(SMnode *pMnode, const char *userName) {
SSdb *pSdb = pMnode->pSdb;
return sdbAcquire(pSdb, SDB_USER, &userName);
}
void mndReleaseUser(SMnode *pMnode, SUserObj *pUser) {
SSdb *pSdb = pMnode->pSdb;
sdbRelease(pSdb, pUser);
}
......@@ -32,30 +32,6 @@
#include "mndUser.h"
#include "mndVgroup.h"
int32_t mndGetDnodeId(SMnode *pMnode) {
if (pMnode != NULL) {
return pMnode->dnodeId;
}
return -1;
}
int64_t mndGetClusterId(SMnode *pMnode) {
if (pMnode != NULL) {
return pMnode->clusterId;
}
return -1;
}
tmr_h mndGetTimer(SMnode *pMnode) {
if (pMnode != NULL) {
return pMnode->timer;
}
return NULL;
}
void mndSendMsgToDnode(SMnode *pMnode, SEpSet *pEpSet, SRpcMsg *pMsg) {
if (pMnode != NULL && pMnode->sendMsgToDnodeFp != NULL) {
(*pMnode->sendMsgToDnodeFp)(pMnode->pDnode, pEpSet, pMsg);
......@@ -112,6 +88,7 @@ static int32_t mnodeCreateDir(SMnode *pMnode, const char *path) {
static int32_t mndInitSdb(SMnode *pMnode) {
SSdbOpt opt = {0};
opt.path = pMnode->path;
opt.pMnode = pMnode;
pMnode->pSdb = sdbInit(&opt);
if (pMnode->pSdb == NULL) {
......@@ -177,11 +154,11 @@ static void mndCleanupSteps(SMnode *pMnode, int32_t pos) {
if (pMnode->pSteps == NULL) return;
if (pos == -1) {
pos = taosArrayGetSize(pMnode->pSteps);
pos = taosArrayGetSize(pMnode->pSteps) - 1;
}
for (int32_t s = pos; s >= 0; s--) {
SMnodeStep *pStep = taosArrayGet(pMnode->pSteps, pos);
SMnodeStep *pStep = taosArrayGet(pMnode->pSteps, s);
mDebug("step:%s will cleanup", pStep->name);
if (pStep->cleanupFp != NULL) {
(*pStep->cleanupFp)(pMnode);
......@@ -189,6 +166,7 @@ static void mndCleanupSteps(SMnode *pMnode, int32_t pos) {
}
taosArrayClear(pMnode->pSteps);
taosArrayDestroy(pMnode->pSteps);
pMnode->pSteps = NULL;
}
......@@ -235,7 +213,7 @@ static int32_t mndSetOptions(SMnode *pMnode, const SMnodeOpt *pOption) {
if (pMnode->sendMsgToDnodeFp == NULL || pMnode->sendMsgToMnodeFp == NULL || pMnode->sendRedirectMsgFp == NULL ||
pMnode->putMsgToApplyMsgFp == NULL || pMnode->dnodeId < 0 || pMnode->clusterId < 0 ||
pMnode->statusInterval < 1 || pOption->mnodeEqualVnodeNum < 0) {
terrno = TSDB_CODE_MND_APP_ERROR;
terrno = TSDB_CODE_MND_INVALID_OPTIONS;
return -1;
}
......@@ -266,8 +244,9 @@ SMnode *mndOpen(const char *path, const SMnodeOpt *pOption) {
}
int32_t code = mnodeCreateDir(pMnode, path);
if (mnodeCreateDir(pMnode, path) != 0) {
mError("failed to open mnode since %s", tstrerror(code));
if (code != 0) {
code = terrno;
mError("failed to open mnode since %s", terrstr());
mndClose(pMnode);
terrno = code;
return NULL;
......@@ -275,7 +254,8 @@ SMnode *mndOpen(const char *path, const SMnodeOpt *pOption) {
code = mndSetOptions(pMnode, pOption);
if (code != 0) {
mError("failed to open mnode since %s", tstrerror(code));
code = terrno;
mError("failed to open mnode since %s", terrstr());
mndClose(pMnode);
terrno = code;
return NULL;
......@@ -283,7 +263,8 @@ SMnode *mndOpen(const char *path, const SMnodeOpt *pOption) {
code = mndInitSteps(pMnode);
if (code != 0) {
mError("failed to open mnode since %s", tstrerror(code));
code = terrno;
mError("failed to open mnode since %s", terrstr());
mndClose(pMnode);
terrno = code;
return NULL;
......@@ -291,7 +272,8 @@ SMnode *mndOpen(const char *path, const SMnodeOpt *pOption) {
code = mndExecSteps(pMnode);
if (code != 0) {
mError("failed to open mnode since %s", tstrerror(code));
code = terrno;
mError("failed to open mnode since %s", terrstr());
mndClose(pMnode);
terrno = code;
return NULL;
......@@ -345,31 +327,36 @@ SMnodeMsg *mndInitMsg(SMnode *pMnode, SRpcMsg *pRpcMsg) {
SMnodeMsg *pMsg = taosAllocateQitem(sizeof(SMnodeMsg));
if (pMsg == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
mError("failed to create msg since %s", terrstr());
return NULL;
}
if (rpcGetConnInfo(pRpcMsg->handle, &pMsg->conn) != 0) {
SRpcConnInfo connInfo = {0};
if (rpcGetConnInfo(pRpcMsg->handle, &connInfo) != 0) {
mndCleanupMsg(pMsg);
mError("can not get user from conn:%p", pMsg->rpcMsg.handle);
terrno = TSDB_CODE_MND_NO_USER_FROM_CONN;
mError("failed to create msg since %s", terrstr());
return NULL;
}
memcpy(pMsg->user, connInfo.user, TSDB_USER_LEN);
pMsg->pMnode = pMnode;
pMsg->rpcMsg = *pRpcMsg;
pMsg->createdTime = taosGetTimestampSec();
mTrace("msg:%p, is created", pMsg);
return pMsg;
}
void mndCleanupMsg(SMnodeMsg *pMsg) {
if (pMsg->pUser != NULL) {
sdbRelease(pMsg->pMnode->pSdb, pMsg->pUser);
}
taosFreeQitem(pMsg);
mTrace("msg:%p, is destroyed", pMsg);
}
void mndSendRsp(SMnodeMsg *pMsg, int32_t code) {}
void mndSendRsp(SMnodeMsg *pMsg, int32_t code) {
SRpcMsg rpcRsp = {.handle = pMsg->rpcMsg.handle, .code = code};
rpcSendResponse(&rpcRsp);
}
static void mndProcessRpcMsg(SMnodeMsg *pMsg) {
SMnode *pMnode = pMsg->pMnode;
......@@ -378,29 +365,34 @@ static void mndProcessRpcMsg(SMnodeMsg *pMsg) {
void *ahandle = pMsg->rpcMsg.ahandle;
bool isReq = (msgType % 2 == 1);
mTrace("msg:%p, app:%p will be processed", pMsg, ahandle);
if (isReq && !mndIsMaster(pMnode)) {
code = TSDB_CODE_APP_NOT_READY;
mDebug("msg:%p, app:%p failed to process since %s", pMsg, ahandle, terrstr());
goto PROCESS_RPC_END;
}
if (isReq && pMsg->rpcMsg.pCont == NULL) {
mError("msg:%p, app:%p type:%s content is null", pMsg, ahandle, taosMsg[msgType]);
code = TSDB_CODE_MND_INVALID_MSG_LEN;
mError("msg:%p, app:%p failed to process since %s", pMsg, ahandle, terrstr());
goto PROCESS_RPC_END;
}
MndMsgFp fp = pMnode->msgFp[msgType];
if (fp == NULL) {
mError("msg:%p, app:%p type:%s not processed", pMsg, ahandle, taosMsg[msgType]);
code = TSDB_CODE_MSG_NOT_PROCESSED;
mError("msg:%p, app:%p failed to process since not handle", pMsg, ahandle);
goto PROCESS_RPC_END;
}
code = (*fp)(pMnode, pMsg);
if (code != 0) {
code = terrno;
mError("msg:%p, app:%p type:%s failed to process since %s", pMsg, ahandle, taosMsg[msgType], terrstr());
mError("msg:%p, app:%p failed to process since %s", pMsg, ahandle, terrstr());
goto PROCESS_RPC_END;
} else {
mTrace("msg:%p, app:%p is processed", pMsg, ahandle);
}
PROCESS_RPC_END:
......@@ -408,13 +400,13 @@ PROCESS_RPC_END:
if (code == TSDB_CODE_APP_NOT_READY) {
mndSendRedirectMsg(pMnode, &pMsg->rpcMsg);
} else if (code != 0) {
SRpcMsg rspMsg = {.handle = pMsg->rpcMsg.handle, .code = code};
rpcSendResponse(&rspMsg);
SRpcMsg rpcRsp = {.handle = pMsg->rpcMsg.handle, .code = code};
rpcSendResponse(&rpcRsp);
} else {
SRpcMsg rpcRsp = {.handle = pMsg->rpcMsg.handle, .contLen = pMsg->contLen, .pCont = pMsg->pCont};
rpcSendResponse(&rpcRsp);
}
}
mndCleanupMsg(pMsg);
}
void mndSetMsgHandle(SMnode *pMnode, int32_t msgType, MndMsgFp fp) {
......
......@@ -40,7 +40,7 @@ SSdb *sdbInit(SSdbOpt *pOption) {
return NULL;
}
for (int32_t i = 0; i < SDB_MAX; ++i) {
for (ESdbType i = 0; i < SDB_MAX; ++i) {
taosInitRWLatch(&pSdb->locks[i]);
}
......@@ -69,47 +69,67 @@ void sdbCleanup(SSdb *pSdb) {
tfree(pSdb->tmpDir);
}
for (int32_t i = 0; i < SDB_MAX; ++i) {
for (ESdbType i = 0; i < SDB_MAX; ++i) {
SHashObj *hash = pSdb->hashObjs[i];
if (hash != NULL) {
taosHashClear(hash);
taosHashCleanup(hash);
if (hash == NULL) continue;
SdbDeleteFp deleteFp = pSdb->deleteFps[i];
SSdbRow **ppRow = taosHashIterate(hash, NULL);
while (ppRow != NULL) {
SSdbRow *pRow = *ppRow;
if (pRow == NULL) continue;
if (deleteFp != NULL) {
(*deleteFp)(pSdb, pRow->pObj);
}
sdbFreeRow(pRow);
ppRow = taosHashIterate(hash, ppRow);
}
}
for (ESdbType i = 0; i < SDB_MAX; ++i) {
SHashObj *hash = pSdb->hashObjs[i];
if (hash == NULL) continue;
taosHashClear(hash);
taosHashCleanup(hash);
pSdb->hashObjs[i] = NULL;
mTrace("sdb table:%d is cleaned up", i);
}
free(pSdb);
mDebug("sdb is cleaned up");
}
int32_t sdbSetTable(SSdb *pSdb, SSdbTable table) {
ESdbType sdb = table.sdbType;
pSdb->keyTypes[sdb] = table.keyType;
pSdb->insertFps[sdb] = table.insertFp;
pSdb->updateFps[sdb] = table.updateFp;
pSdb->deleteFps[sdb] = table.deleteFp;
pSdb->deployFps[sdb] = table.deployFp;
pSdb->encodeFps[sdb] = table.encodeFp;
pSdb->decodeFps[sdb] = table.decodeFp;
for (int32_t i = 0; i < SDB_MAX; ++i) {
int32_t type;
if (pSdb->keyTypes[i] == SDB_KEY_INT32) {
type = TSDB_DATA_TYPE_INT;
} else if (pSdb->keyTypes[i] == SDB_KEY_INT64) {
type = TSDB_DATA_TYPE_BIGINT;
} else {
type = TSDB_DATA_TYPE_BINARY;
}
SHashObj *hash = taosHashInit(64, taosGetDefaultHashFunction(type), true, HASH_NO_LOCK);
if (hash == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
ESdbType sdbType = table.sdbType;
EKeyType keyType = table.keyType;
pSdb->keyTypes[sdbType] = table.keyType;
pSdb->insertFps[sdbType] = table.insertFp;
pSdb->updateFps[sdbType] = table.updateFp;
pSdb->deleteFps[sdbType] = table.deleteFp;
pSdb->deployFps[sdbType] = table.deployFp;
pSdb->encodeFps[sdbType] = table.encodeFp;
pSdb->decodeFps[sdbType] = table.decodeFp;
int32_t hashType = 0;
if (keyType == SDB_KEY_INT32) {
hashType = TSDB_DATA_TYPE_INT;
} else if (keyType == SDB_KEY_INT64) {
hashType = TSDB_DATA_TYPE_BIGINT;
} else {
hashType = TSDB_DATA_TYPE_BINARY;
}
pSdb->hashObjs[i] = hash;
taosInitRWLatch(&pSdb->locks[i]);
SHashObj *hash = taosHashInit(64, taosGetDefaultHashFunction(hashType), true, HASH_NO_LOCK);
if (hash == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
pSdb->hashObjs[sdbType] = hash;
taosInitRWLatch(&pSdb->locks[sdbType]);
mTrace("sdb table:%d is initialized", sdbType);
return 0;
}
\ No newline at end of file
......@@ -42,7 +42,7 @@ static int32_t sdbCreateDir(SSdb *pSdb) {
static int32_t sdbRunDeployFp(SSdb *pSdb) {
mDebug("start to deploy sdb");
for (int32_t i = SDB_MAX - 1; i > SDB_START; --i) {
for (ESdbType i = SDB_MAX - 1; i > SDB_START; --i) {
SdbDeployFp fp = pSdb->deployFps[i];
if (fp == NULL) continue;
......@@ -150,7 +150,7 @@ int32_t sdbWriteFile(SSdb *pSdb) {
return -1;
}
for (int32_t i = SDB_MAX - 1; i > SDB_START; --i) {
for (ESdbType i = SDB_MAX - 1; i > SDB_START; --i) {
SdbEncodeFp encodeFp = pSdb->encodeFps[i];
if (encodeFp == NULL) continue;
......@@ -173,6 +173,7 @@ int32_t sdbWriteFile(SSdb *pSdb) {
if (taosWriteFile(fd, pRaw, writeLen) != writeLen) {
code = TAOS_SYSTEM_ERROR(terrno);
taosHashCancelIterate(hash, ppRow);
free(pRaw);
break;
}
......@@ -180,6 +181,7 @@ int32_t sdbWriteFile(SSdb *pSdb) {
if (taosWriteFile(fd, &cksum, sizeof(int32_t)) != sizeof(int32_t)) {
code = TAOS_SYSTEM_ERROR(terrno);
taosHashCancelIterate(hash, ppRow);
free(pRaw);
break;
}
} else {
......@@ -188,6 +190,7 @@ int32_t sdbWriteFile(SSdb *pSdb) {
break;
}
free(pRaw);
ppRow = taosHashIterate(hash, ppRow);
}
taosWUnLockLatch(pLock);
......
......@@ -237,7 +237,15 @@ void *sdbFetch(SSdb *pSdb, ESdbType type, void *pIter, void **ppObj) {
SRWLatch *pLock = &pSdb->locks[type];
taosRLockLatch(pLock);
SSdbRow **ppRow = taosHashIterate(hash, ppRow);
if (pIter != NULL) {
SSdbRow *pLastRow = *(SSdbRow **)pIter;
int32_t ref = atomic_sub_fetch_32(&pLastRow->refCount, 1);
if (ref <= 0 && pLastRow->status == SDB_STATUS_DROPPED) {
sdbFreeRow(pLastRow);
}
}
SSdbRow **ppRow = taosHashIterate(hash, pIter);
while (ppRow != NULL) {
SSdbRow *pRow = *ppRow;
if (pRow == NULL || pRow->status != SDB_STATUS_READY) {
......
......@@ -19,5 +19,5 @@ target_link_libraries(
# test
if(${BUILD_TEST})
add_subdirectory(test)
# add_subdirectory(test)
endif(${BUILD_TEST})
\ No newline at end of file
......@@ -252,14 +252,14 @@ LONG WINAPI FlCrashDump(PEXCEPTION_POINTERS ep) {
void taosSetCoreDump() { SetUnhandledExceptionFilter(&FlCrashDump); }
bool taosGetSystemUid(char *uid) {
int32_t taosGetSystemUid(char *uid, int32_t uidlen) {
GUID guid;
CoCreateGuid(&guid);
sprintf(uid, "%08X-%04X-%04X-%02X%02X-%02X%02X%02X%02X%02X%02X", guid.Data1, guid.Data2, guid.Data3, guid.Data4[0],
guid.Data4[1], guid.Data4[2], guid.Data4[3], guid.Data4[4], guid.Data4[5], guid.Data4[6], guid.Data4[7]);
return true;
return 0;
}
char *taosGetCmdlineByPID(int pid) { return ""; }
......@@ -452,12 +452,12 @@ int32_t taosGetDiskSize(char *dataDir, SysDiskSize *diskSize) {
}
}
bool taosGetSystemUid(char *uid) {
int32_t taosGetSystemUid(char *uid, int32_t uidlen) {
uuid_t uuid = {0};
uuid_generate(uuid);
// it's caller's responsibility to make enough space for `uid`, that's 36-char + 1-null
uuid_unparse_lower(uuid, uid);
return true;
return 0;
}
char *taosGetCmdlineByPID(int pid) {
......@@ -1070,13 +1070,13 @@ void taosSetCoreDump(bool enable) {
#endif
}
bool taosGetSystemUid(char *uid, int32_t uidlen) {
int32_t taosGetSystemUid(char *uid, int32_t uidlen) {
int fd;
int len = 0;
fd = open("/proc/sys/kernel/random/uuid", 0);
if (fd < 0) {
return false;
return -1;
} else {
len = read(fd, uid, uidlen);
close(fd);
......@@ -1084,9 +1084,10 @@ bool taosGetSystemUid(char *uid, int32_t uidlen) {
if (len >= 36) {
uid[36] = 0;
return true;
return 0;
}
return false;
return -1;
}
char *taosGetCmdlineByPID(int pid) {
......
......@@ -130,7 +130,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MND_MSG_NOT_PROCESSED, "Message not processed
TAOS_DEFINE_ERROR(TSDB_CODE_MND_ACTION_IN_PROGRESS, "Message is progressing")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_ACTION_NEED_REPROCESSED, "Message need to be reprocessed")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_NO_RIGHTS, "Insufficient privilege for operation")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_APP_ERROR, "Unexpected generic error in mnode")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_OPTIONS, "Invalid mnode options")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_CONNECTION, "Invalid message connection")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_MSG_VERSION, "Incompatible protocol version")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_MSG_LEN, "Invalid message length")
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册