未验证 提交 1d8fcda1 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #8524 from taosdata/feature/dnode3

refact files
...@@ -21,8 +21,8 @@ extern "C" { ...@@ -21,8 +21,8 @@ extern "C" {
#endif #endif
#include "dnodeInt.h" #include "dnodeInt.h"
int32_t dnodeInitEps(); int32_t dnodeInitConfig();
void dnodeCleanupEps(); void dnodeCleanupConfig();
void dnodeUpdateCfg(SDnodeCfg *data); void dnodeUpdateCfg(SDnodeCfg *data);
void dnodeUpdateDnodeEps(SDnodeEps *data); void dnodeUpdateDnodeEps(SDnodeEps *data);
...@@ -32,7 +32,6 @@ int64_t dnodeGetClusterId(); ...@@ -32,7 +32,6 @@ int64_t dnodeGetClusterId();
void dnodeGetEp(int32_t dnodeId, char *epstr, char *fqdn, uint16_t *port); void dnodeGetEp(int32_t dnodeId, char *epstr, char *fqdn, uint16_t *port);
void dnodeGetEpSetForPeer(SRpcEpSet *epSet); void dnodeGetEpSetForPeer(SRpcEpSet *epSet);
void dnodeGetEpSetForShell(SRpcEpSet *epSet);
void dnodeSendRedirectMsg(SRpcMsg *rpcMsg, bool forShell); void dnodeSendRedirectMsg(SRpcMsg *rpcMsg, bool forShell);
#ifdef __cplusplus #ifdef __cplusplus
......
...@@ -19,27 +19,27 @@ ...@@ -19,27 +19,27 @@
#include "thash.h" #include "thash.h"
static struct { static struct {
int32_t dnodeId; int32_t dnodeId;
int32_t dropped; int32_t dropped;
int64_t clusterId; int64_t clusterId;
SDnodeEps *dnodeEps; SDnodeEps *dnodeEps;
SHashObj *dnodeHash; SHashObj *dnodeHash;
SRpcEpSet mnodeEpSetForShell; SRpcEpSet mnodeEpSetForShell;
SRpcEpSet mnodeEpSetForPeer; SRpcEpSet mnodeEpSetForPeer;
char file[PATH_MAX + 20]; char file[PATH_MAX + 20];
pthread_mutex_t mutex; pthread_mutex_t mutex;
} tsEps; } tsConfig;
void dnodeGetEpSetForPeer(SRpcEpSet *epSet) { vstaticoid dnodeGetEpSetForPeer(SRpcEpSet *epSet) {
pthread_mutex_lock(&tsEps.mutex); pthread_mutex_lock(&tsConfig.mutex);
*epSet = tsEps.mnodeEpSetForPeer; *epSet = tsConfig.mnodeEpSetForPeer;
pthread_mutex_unlock(&tsEps.mutex); pthread_mutex_unlock(&tsConfig.mutex);
} }
void dnodeGetEpSetForShell(SRpcEpSet *epSet) { static void dnodeGetEpSetForShell(SRpcEpSet *epSet) {
pthread_mutex_lock(&tsEps.mutex); pthread_mutex_lock(&tsConfig.mutex);
*epSet = tsEps.mnodeEpSetForShell; *epSet = tsConfig.mnodeEpSetForShell;
pthread_mutex_unlock(&tsEps.mutex); pthread_mutex_unlock(&tsConfig.mutex);
} }
void dnodeUpdateMnodeEps(SRpcEpSet *ep) { void dnodeUpdateMnodeEps(SRpcEpSet *ep) {
...@@ -48,18 +48,18 @@ void dnodeUpdateMnodeEps(SRpcEpSet *ep) { ...@@ -48,18 +48,18 @@ void dnodeUpdateMnodeEps(SRpcEpSet *ep) {
return; return;
} }
pthread_mutex_lock(&tsEps.mutex); pthread_mutex_lock(&tsConfig.mutex);
dInfo("mnode is changed, num:%d use:%d", ep->numOfEps, ep->inUse); dInfo("mnode is changed, num:%d use:%d", ep->numOfEps, ep->inUse);
tsEps.mnodeEpSetForPeer = *ep; tsConfig.mnodeEpSetForPeer = *ep;
for (int32_t i = 0; i < ep->numOfEps; ++i) { for (int32_t i = 0; i < ep->numOfEps; ++i) {
ep->port[i] -= TSDB_PORT_DNODEDNODE; ep->port[i] -= TSDB_PORT_DNODEDNODE;
dInfo("mnode index:%d %s:%u", i, ep->fqdn[i], ep->port[i]); dInfo("mnode index:%d %s:%u", i, ep->fqdn[i], ep->port[i]);
} }
tsEps.mnodeEpSetForShell = *ep; tsConfig.mnodeEpSetForShell = *ep;
pthread_mutex_unlock(&tsEps.mutex); pthread_mutex_unlock(&tsConfig.mutex);
} }
void dnodeSendRedirectMsg(SRpcMsg *rpcMsg, bool forShell) { void dnodeSendRedirectMsg(SRpcMsg *rpcMsg, bool forShell) {
...@@ -92,9 +92,9 @@ void dnodeSendRedirectMsg(SRpcMsg *rpcMsg, bool forShell) { ...@@ -92,9 +92,9 @@ void dnodeSendRedirectMsg(SRpcMsg *rpcMsg, bool forShell) {
} }
static void dnodePrintEps() { static void dnodePrintEps() {
dDebug("print dnode list, num:%d", tsEps.dnodeEps->dnodeNum); dDebug("print dnode list, num:%d", tsConfig.dnodeEps->dnodeNum);
for (int32_t i = 0; i < tsEps.dnodeEps->dnodeNum; i++) { for (int32_t i = 0; i < tsConfig.dnodeEps->dnodeNum; i++) {
SDnodeEp *ep = &tsEps.dnodeEps->dnodeEps[i]; SDnodeEp *ep = &tsConfig.dnodeEps->dnodeEps[i];
dDebug("dnode:%d, fqdn:%s port:%u isMnode:%d", ep->dnodeId, ep->dnodeFqdn, ep->dnodePort, ep->isMnode); dDebug("dnode:%d, fqdn:%s port:%u isMnode:%d", ep->dnodeId, ep->dnodeFqdn, ep->dnodePort, ep->isMnode);
} }
} }
...@@ -104,35 +104,35 @@ static void dnodeResetEps(SDnodeEps *data) { ...@@ -104,35 +104,35 @@ static void dnodeResetEps(SDnodeEps *data) {
int32_t size = sizeof(SDnodeEps) + data->dnodeNum * sizeof(SDnodeEp); int32_t size = sizeof(SDnodeEps) + data->dnodeNum * sizeof(SDnodeEp);
if (data->dnodeNum > tsEps.dnodeEps->dnodeNum) { if (data->dnodeNum > tsConfig.dnodeEps->dnodeNum) {
SDnodeEps *tmp = calloc(1, size); SDnodeEps *tmp = calloc(1, size);
if (tmp == NULL) return; if (tmp == NULL) return;
tfree(tsEps.dnodeEps); tfree(tsConfig.dnodeEps);
tsEps.dnodeEps = tmp; tsConfig.dnodeEps = tmp;
} }
if (tsEps.dnodeEps != data) { if (tsConfig.dnodeEps != data) {
memcpy(tsEps.dnodeEps, data, size); memcpy(tsConfig.dnodeEps, data, size);
} }
tsEps.mnodeEpSetForPeer.inUse = 0; tsConfig.mnodeEpSetForPeer.inUse = 0;
tsEps.mnodeEpSetForShell.inUse = 0; tsConfig.mnodeEpSetForShell.inUse = 0;
int32_t index = 0; int32_t index = 0;
for (int32_t i = 0; i < tsEps.dnodeEps->dnodeNum; i++) { for (int32_t i = 0; i < tsConfig.dnodeEps->dnodeNum; i++) {
SDnodeEp *ep = &tsEps.dnodeEps->dnodeEps[i]; SDnodeEp *ep = &tsConfig.dnodeEps->dnodeEps[i];
if (!ep->isMnode) continue; if (!ep->isMnode) continue;
if (index >= TSDB_MAX_REPLICA) continue; if (index >= TSDB_MAX_REPLICA) continue;
strcpy(tsEps.mnodeEpSetForShell.fqdn[index], ep->dnodeFqdn); strcpy(tsConfig.mnodeEpSetForShell.fqdn[index], ep->dnodeFqdn);
strcpy(tsEps.mnodeEpSetForPeer.fqdn[index], ep->dnodeFqdn); strcpy(tsConfig.mnodeEpSetForPeer.fqdn[index], ep->dnodeFqdn);
tsEps.mnodeEpSetForShell.port[index] = ep->dnodePort; tsConfig.mnodeEpSetForShell.port[index] = ep->dnodePort;
tsEps.mnodeEpSetForShell.port[index] = ep->dnodePort + tsDnodeDnodePort; tsConfig.mnodeEpSetForShell.port[index] = ep->dnodePort + tsDnodeDnodePort;
index++; index++;
} }
for (int32_t i = 0; i < tsEps.dnodeEps->dnodeNum; ++i) { for (int32_t i = 0; i < tsConfig.dnodeEps->dnodeNum; ++i) {
SDnodeEp *ep = &tsEps.dnodeEps->dnodeEps[i]; SDnodeEp *ep = &tsConfig.dnodeEps->dnodeEps[i];
taosHashPut(tsEps.dnodeHash, &ep->dnodeId, sizeof(int32_t), ep, sizeof(SDnodeEp)); taosHashPut(tsConfig.dnodeHash, &ep->dnodeId, sizeof(int32_t), ep, sizeof(SDnodeEp));
} }
dnodePrintEps(); dnodePrintEps();
...@@ -141,9 +141,9 @@ static void dnodeResetEps(SDnodeEps *data) { ...@@ -141,9 +141,9 @@ static void dnodeResetEps(SDnodeEps *data) {
static bool dnodeIsDnodeEpChanged(int32_t dnodeId, char *epstr) { static bool dnodeIsDnodeEpChanged(int32_t dnodeId, char *epstr) {
bool changed = false; bool changed = false;
pthread_mutex_lock(&tsEps.mutex); pthread_mutex_lock(&tsConfig.mutex);
SDnodeEp *ep = taosHashGet(tsEps.dnodeHash, &dnodeId, sizeof(int32_t)); SDnodeEp *ep = taosHashGet(tsConfig.dnodeHash, &dnodeId, sizeof(int32_t));
if (ep != NULL) { if (ep != NULL) {
char epSaved[TSDB_EP_LEN + 1]; char epSaved[TSDB_EP_LEN + 1];
snprintf(epSaved, TSDB_EP_LEN, "%s:%u", ep->dnodeFqdn, ep->dnodePort); snprintf(epSaved, TSDB_EP_LEN, "%s:%u", ep->dnodeFqdn, ep->dnodePort);
...@@ -151,7 +151,7 @@ static bool dnodeIsDnodeEpChanged(int32_t dnodeId, char *epstr) { ...@@ -151,7 +151,7 @@ static bool dnodeIsDnodeEpChanged(int32_t dnodeId, char *epstr) {
tstrncpy(epstr, epSaved, TSDB_EP_LEN); tstrncpy(epstr, epSaved, TSDB_EP_LEN);
} }
pthread_mutex_unlock(&tsEps.mutex); pthread_mutex_unlock(&tsConfig.mutex);
return changed; return changed;
} }
...@@ -163,101 +163,101 @@ static int32_t dnodeReadEps() { ...@@ -163,101 +163,101 @@ static int32_t dnodeReadEps() {
cJSON *root = NULL; cJSON *root = NULL;
FILE *fp = NULL; FILE *fp = NULL;
fp = fopen(tsEps.file, "r"); fp = fopen(tsConfig.file, "r");
if (!fp) { if (!fp) {
dDebug("file %s not exist", tsEps.file); dDebug("file %s not exist", tsConfig.file);
goto PRASE_EPS_OVER; goto PRASE_EPS_OVER;
} }
len = (int32_t)fread(content, 1, maxLen, fp); len = (int32_t)fread(content, 1, maxLen, fp);
if (len <= 0) { if (len <= 0) {
dError("failed to read %s since content is null", tsEps.file); dError("failed to read %s since content is null", tsConfig.file);
goto PRASE_EPS_OVER; goto PRASE_EPS_OVER;
} }
content[len] = 0; content[len] = 0;
root = cJSON_Parse(content); root = cJSON_Parse(content);
if (root == NULL) { if (root == NULL) {
dError("failed to read %s since invalid json format", tsEps.file); dError("failed to read %s since invalid json format", tsConfig.file);
goto PRASE_EPS_OVER; goto PRASE_EPS_OVER;
} }
cJSON *dnodeId = cJSON_GetObjectItem(root, "dnodeId"); cJSON *dnodeId = cJSON_GetObjectItem(root, "dnodeId");
if (!dnodeId || dnodeId->type != cJSON_String) { if (!dnodeId || dnodeId->type != cJSON_String) {
dError("failed to read %s since dnodeId not found", tsEps.file); dError("failed to read %s since dnodeId not found", tsConfig.file);
goto PRASE_EPS_OVER; goto PRASE_EPS_OVER;
} }
tsEps.dnodeId = atoi(dnodeId->valuestring); tsConfig.dnodeId = atoi(dnodeId->valuestring);
cJSON *dropped = cJSON_GetObjectItem(root, "dropped"); cJSON *dropped = cJSON_GetObjectItem(root, "dropped");
if (!dropped || dropped->type != cJSON_String) { if (!dropped || dropped->type != cJSON_String) {
dError("failed to read %s since dropped not found", tsEps.file); dError("failed to read %s since dropped not found", tsConfig.file);
goto PRASE_EPS_OVER; goto PRASE_EPS_OVER;
} }
tsEps.dropped = atoi(dropped->valuestring); tsConfig.dropped = atoi(dropped->valuestring);
cJSON *clusterId = cJSON_GetObjectItem(root, "clusterId"); cJSON *clusterId = cJSON_GetObjectItem(root, "clusterId");
if (!clusterId || clusterId->type != cJSON_String) { if (!clusterId || clusterId->type != cJSON_String) {
dError("failed to read %s since clusterId not found", tsEps.file); dError("failed to read %s since clusterId not found", tsConfig.file);
goto PRASE_EPS_OVER; goto PRASE_EPS_OVER;
} }
tsEps.clusterId = atoll(clusterId->valuestring); tsConfig.clusterId = atoll(clusterId->valuestring);
cJSON *dnodeInfos = cJSON_GetObjectItem(root, "dnodeInfos"); cJSON *dnodeInfos = cJSON_GetObjectItem(root, "dnodeInfos");
if (!dnodeInfos || dnodeInfos->type != cJSON_Array) { if (!dnodeInfos || dnodeInfos->type != cJSON_Array) {
dError("failed to read %s since dnodeInfos not found", tsEps.file); dError("failed to read %s since dnodeInfos not found", tsConfig.file);
goto PRASE_EPS_OVER; goto PRASE_EPS_OVER;
} }
int32_t dnodeInfosSize = cJSON_GetArraySize(dnodeInfos); int32_t dnodeInfosSize = cJSON_GetArraySize(dnodeInfos);
if (dnodeInfosSize <= 0) { if (dnodeInfosSize <= 0) {
dError("failed to read %s since dnodeInfos size:%d invalid", tsEps.file, dnodeInfosSize); dError("failed to read %s since dnodeInfos size:%d invalid", tsConfig.file, dnodeInfosSize);
goto PRASE_EPS_OVER; goto PRASE_EPS_OVER;
} }
tsEps.dnodeEps = calloc(1, dnodeInfosSize * sizeof(SDnodeEp) + sizeof(SDnodeEps)); tsConfig.dnodeEps = calloc(1, dnodeInfosSize * sizeof(SDnodeEp) + sizeof(SDnodeEps));
if (tsEps.dnodeEps == NULL) { if (tsConfig.dnodeEps == NULL) {
dError("failed to calloc dnodeEpList since %s", strerror(errno)); dError("failed to calloc dnodeEpList since %s", strerror(errno));
goto PRASE_EPS_OVER; goto PRASE_EPS_OVER;
} }
tsEps.dnodeEps->dnodeNum = dnodeInfosSize; tsConfig.dnodeEps->dnodeNum = dnodeInfosSize;
for (int32_t i = 0; i < dnodeInfosSize; ++i) { for (int32_t i = 0; i < dnodeInfosSize; ++i) {
cJSON *dnodeInfo = cJSON_GetArrayItem(dnodeInfos, i); cJSON *dnodeInfo = cJSON_GetArrayItem(dnodeInfos, i);
if (dnodeInfo == NULL) break; if (dnodeInfo == NULL) break;
SDnodeEp *ep = &tsEps.dnodeEps->dnodeEps[i]; SDnodeEp *ep = &tsConfig.dnodeEps->dnodeEps[i];
cJSON *dnodeId = cJSON_GetObjectItem(dnodeInfo, "dnodeId"); cJSON *dnodeId = cJSON_GetObjectItem(dnodeInfo, "dnodeId");
if (!dnodeId || dnodeId->type != cJSON_String) { if (!dnodeId || dnodeId->type != cJSON_String) {
dError("failed to read %s, dnodeId not found", tsEps.file); dError("failed to read %s, dnodeId not found", tsConfig.file);
goto PRASE_EPS_OVER; goto PRASE_EPS_OVER;
} }
ep->dnodeId = atoi(dnodeId->valuestring); ep->dnodeId = atoi(dnodeId->valuestring);
cJSON *isMnode = cJSON_GetObjectItem(dnodeInfo, "isMnode"); cJSON *isMnode = cJSON_GetObjectItem(dnodeInfo, "isMnode");
if (!isMnode || isMnode->type != cJSON_String) { if (!isMnode || isMnode->type != cJSON_String) {
dError("failed to read %s, isMnode not found", tsEps.file); dError("failed to read %s, isMnode not found", tsConfig.file);
goto PRASE_EPS_OVER; goto PRASE_EPS_OVER;
} }
ep->isMnode = atoi(isMnode->valuestring); ep->isMnode = atoi(isMnode->valuestring);
cJSON *dnodeFqdn = cJSON_GetObjectItem(dnodeInfo, "dnodeFqdn"); cJSON *dnodeFqdn = cJSON_GetObjectItem(dnodeInfo, "dnodeFqdn");
if (!dnodeFqdn || dnodeFqdn->type != cJSON_String || dnodeFqdn->valuestring == NULL) { if (!dnodeFqdn || dnodeFqdn->type != cJSON_String || dnodeFqdn->valuestring == NULL) {
dError("failed to read %s, dnodeFqdn not found", tsEps.file); dError("failed to read %s, dnodeFqdn not found", tsConfig.file);
goto PRASE_EPS_OVER; goto PRASE_EPS_OVER;
} }
tstrncpy(ep->dnodeFqdn, dnodeFqdn->valuestring, TSDB_FQDN_LEN); tstrncpy(ep->dnodeFqdn, dnodeFqdn->valuestring, TSDB_FQDN_LEN);
cJSON *dnodePort = cJSON_GetObjectItem(dnodeInfo, "dnodePort"); cJSON *dnodePort = cJSON_GetObjectItem(dnodeInfo, "dnodePort");
if (!dnodePort || dnodePort->type != cJSON_String) { if (!dnodePort || dnodePort->type != cJSON_String) {
dError("failed to read %s, dnodePort not found", tsEps.file); dError("failed to read %s, dnodePort not found", tsConfig.file);
goto PRASE_EPS_OVER; goto PRASE_EPS_OVER;
} }
ep->dnodePort = atoi(dnodePort->valuestring); ep->dnodePort = atoi(dnodePort->valuestring);
} }
dInfo("succcessed to read file %s", tsEps.file); dInfo("succcessed to read file %s", tsConfig.file);
dnodePrintEps(); dnodePrintEps();
PRASE_EPS_OVER: PRASE_EPS_OVER:
...@@ -265,21 +265,21 @@ PRASE_EPS_OVER: ...@@ -265,21 +265,21 @@ PRASE_EPS_OVER:
if (root != NULL) cJSON_Delete(root); if (root != NULL) cJSON_Delete(root);
if (fp != NULL) fclose(fp); if (fp != NULL) fclose(fp);
if (dnodeIsDnodeEpChanged(tsEps.dnodeId, tsLocalEp)) { if (dnodeIsDnodeEpChanged(tsConfig.dnodeId, tsLocalEp)) {
dError("dnode:%d, localEp %s different with dnodeEps.json and need reconfigured", tsEps.dnodeId, tsLocalEp); dError("dnode:%d, localEp %s different with dnodeEps.json and need reconfigured", tsConfig.dnodeId, tsLocalEp);
return -1; return -1;
} }
dnodeResetEps(tsEps.dnodeEps); dnodeResetEps(tsConfig.dnodeEps);
terrno = 0; terrno = 0;
return 0; return 0;
} }
static int32_t dnodeWriteEps() { static int32_t dnodeWriteEps() {
FILE *fp = fopen(tsEps.file, "w"); FILE *fp = fopen(tsConfig.file, "w");
if (!fp) { if (!fp) {
dError("failed to write %s since %s", tsEps.file, strerror(errno)); dError("failed to write %s since %s", tsConfig.file, strerror(errno));
return -1; return -1;
} }
...@@ -288,17 +288,17 @@ static int32_t dnodeWriteEps() { ...@@ -288,17 +288,17 @@ static int32_t dnodeWriteEps() {
char *content = calloc(1, maxLen + 1); char *content = calloc(1, maxLen + 1);
len += snprintf(content + len, maxLen - len, "{\n"); len += snprintf(content + len, maxLen - len, "{\n");
len += snprintf(content + len, maxLen - len, " \"dnodeId\": \"%d\",\n", tsEps.dnodeId); len += snprintf(content + len, maxLen - len, " \"dnodeId\": \"%d\",\n", tsConfig.dnodeId);
len += snprintf(content + len, maxLen - len, " \"dropped\": \"%d\",\n", tsEps.dropped); len += snprintf(content + len, maxLen - len, " \"dropped\": \"%d\",\n", tsConfig.dropped);
len += snprintf(content + len, maxLen - len, " \"clusterId\": \"%" PRId64 "\",\n", tsEps.clusterId); len += snprintf(content + len, maxLen - len, " \"clusterId\": \"%" PRId64 "\",\n", tsConfig.clusterId);
len += snprintf(content + len, maxLen - len, " \"dnodeInfos\": [{\n"); len += snprintf(content + len, maxLen - len, " \"dnodeInfos\": [{\n");
for (int32_t i = 0; i < tsEps.dnodeEps->dnodeNum; ++i) { for (int32_t i = 0; i < tsConfig.dnodeEps->dnodeNum; ++i) {
SDnodeEp *ep = &tsEps.dnodeEps->dnodeEps[i]; SDnodeEp *ep = &tsConfig.dnodeEps->dnodeEps[i];
len += snprintf(content + len, maxLen - len, " \"dnodeId\": \"%d\",\n", ep->dnodeId); len += snprintf(content + len, maxLen - len, " \"dnodeId\": \"%d\",\n", ep->dnodeId);
len += snprintf(content + len, maxLen - len, " \"isMnode\": \"%d\",\n", ep->isMnode); len += snprintf(content + len, maxLen - len, " \"isMnode\": \"%d\",\n", ep->isMnode);
len += snprintf(content + len, maxLen - len, " \"dnodeFqdn\": \"%s\",\n", ep->dnodeFqdn); len += snprintf(content + len, maxLen - len, " \"dnodeFqdn\": \"%s\",\n", ep->dnodeFqdn);
len += snprintf(content + len, maxLen - len, " \"dnodePort\": \"%u\"\n", ep->dnodePort); len += snprintf(content + len, maxLen - len, " \"dnodePort\": \"%u\"\n", ep->dnodePort);
if (i < tsEps.dnodeEps->dnodeNum - 1) { if (i < tsConfig.dnodeEps->dnodeNum - 1) {
len += snprintf(content + len, maxLen - len, " },{\n"); len += snprintf(content + len, maxLen - len, " },{\n");
} else { } else {
len += snprintf(content + len, maxLen - len, " }]\n"); len += snprintf(content + len, maxLen - len, " }]\n");
...@@ -312,20 +312,20 @@ static int32_t dnodeWriteEps() { ...@@ -312,20 +312,20 @@ static int32_t dnodeWriteEps() {
free(content); free(content);
terrno = 0; terrno = 0;
dInfo("successed to write %s", tsEps.file); dInfo("successed to write %s", tsConfig.file);
return 0; return 0;
} }
int32_t dnodeInitEps() { int32_t dnodeInitConfig() {
tsEps.dnodeId = 0; tsConfig.dnodeId = 0;
tsEps.dropped = 0; tsConfig.dropped = 0;
tsEps.clusterId = 0; tsConfig.clusterId = 0;
tsEps.dnodeEps = NULL; tsConfig.dnodeEps = NULL;
snprintf(tsEps.file, sizeof(tsEps.file), "%s/dnodeEps.json", tsDnodeDir); snprintf(tsConfig.file, sizeof(tsConfig.file), "%s/dnodeEps.json", tsDnodeDir);
pthread_mutex_init(&tsEps.mutex, NULL); pthread_mutex_init(&tsConfig.mutex, NULL);
tsEps.dnodeHash = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK); tsConfig.dnodeHash = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
if (tsEps.dnodeHash == NULL) return -1; if (tsConfig.dnodeHash == NULL) return -1;
int32_t ret = dnodeReadEps(); int32_t ret = dnodeReadEps();
if (ret == 0) { if (ret == 0) {
...@@ -335,81 +335,81 @@ int32_t dnodeInitEps() { ...@@ -335,81 +335,81 @@ int32_t dnodeInitEps() {
return ret; return ret;
} }
void dnodeCleanupEps() { void dnodeCleanupConfig() {
pthread_mutex_lock(&tsEps.mutex); pthread_mutex_lock(&tsConfig.mutex);
if (tsEps.dnodeEps != NULL) { if (tsConfig.dnodeEps != NULL) {
free(tsEps.dnodeEps); free(tsConfig.dnodeEps);
tsEps.dnodeEps = NULL; tsConfig.dnodeEps = NULL;
} }
if (tsEps.dnodeHash) { if (tsConfig.dnodeHash) {
taosHashCleanup(tsEps.dnodeHash); taosHashCleanup(tsConfig.dnodeHash);
tsEps.dnodeHash = NULL; tsConfig.dnodeHash = NULL;
} }
pthread_mutex_unlock(&tsEps.mutex); pthread_mutex_unlock(&tsConfig.mutex);
pthread_mutex_destroy(&tsEps.mutex); pthread_mutex_destroy(&tsConfig.mutex);
} }
void dnodeUpdateDnodeEps(SDnodeEps *data) { void dnodeUpdateDnodeEps(SDnodeEps *data) {
if (data == NULL || data->dnodeNum <= 0) return; if (data == NULL || data->dnodeNum <= 0) return;
pthread_mutex_lock(&tsEps.mutex); pthread_mutex_lock(&tsConfig.mutex);
if (data->dnodeNum != tsEps.dnodeEps->dnodeNum) { if (data->dnodeNum != tsConfig.dnodeEps->dnodeNum) {
dnodeResetEps(data); dnodeResetEps(data);
dnodeWriteEps(); dnodeWriteEps();
} else { } else {
int32_t size = data->dnodeNum * sizeof(SDnodeEp) + sizeof(SDnodeEps); int32_t size = data->dnodeNum * sizeof(SDnodeEp) + sizeof(SDnodeEps);
if (memcmp(tsEps.dnodeEps, data, size) != 0) { if (memcmp(tsConfig.dnodeEps, data, size) != 0) {
dnodeResetEps(data); dnodeResetEps(data);
dnodeWriteEps(); dnodeWriteEps();
} }
} }
pthread_mutex_unlock(&tsEps.mutex); pthread_mutex_unlock(&tsConfig.mutex);
} }
void dnodeGetEp(int32_t dnodeId, char *epstr, char *fqdn, uint16_t *port) { void dnodeGetEp(int32_t dnodeId, char *epstr, char *fqdn, uint16_t *port) {
pthread_mutex_lock(&tsEps.mutex); pthread_mutex_lock(&tsConfig.mutex);
SDnodeEp *ep = taosHashGet(tsEps.dnodeHash, &dnodeId, sizeof(int32_t)); SDnodeEp *ep = taosHashGet(tsConfig.dnodeHash, &dnodeId, sizeof(int32_t));
if (ep != NULL) { if (ep != NULL) {
if (port) *port = ep->dnodePort; if (port) *port = ep->dnodePort;
if (fqdn) tstrncpy(fqdn, ep->dnodeFqdn, TSDB_FQDN_LEN); if (fqdn) tstrncpy(fqdn, ep->dnodeFqdn, TSDB_FQDN_LEN);
if (epstr) snprintf(epstr, TSDB_EP_LEN, "%s:%u", ep->dnodeFqdn, ep->dnodePort); if (epstr) snprintf(epstr, TSDB_EP_LEN, "%s:%u", ep->dnodeFqdn, ep->dnodePort);
} }
pthread_mutex_unlock(&tsEps.mutex); pthread_mutex_unlock(&tsConfig.mutex);
} }
void dnodeUpdateCfg(SDnodeCfg *data) { void dnodeUpdateCfg(SDnodeCfg *data) {
if (tsEps.dnodeId != 0 && !data->dropped) return; if (tsConfig.dnodeId != 0 && !data->dropped) return;
pthread_mutex_lock(&tsEps.mutex); pthread_mutex_lock(&tsConfig.mutex);
tsEps.dnodeId = data->dnodeId; tsConfig.dnodeId = data->dnodeId;
tsEps.clusterId = data->clusterId; tsConfig.clusterId = data->clusterId;
tsEps.dropped = data->dropped; tsConfig.dropped = data->dropped;
dInfo("dnodeId is set to %d, clusterId is set to %" PRId64, data->dnodeId, data->clusterId); dInfo("dnodeId is set to %d, clusterId is set to %" PRId64, data->dnodeId, data->clusterId);
dnodeWriteEps(); dnodeWriteEps();
pthread_mutex_unlock(&tsEps.mutex); pthread_mutex_unlock(&tsConfig.mutex);
} }
int32_t dnodeGetDnodeId() { int32_t dnodeGetDnodeId() {
int32_t dnodeId = 0; int32_t dnodeId = 0;
pthread_mutex_lock(&tsEps.mutex); pthread_mutex_lock(&tsConfig.mutex);
dnodeId = tsEps.dnodeId; dnodeId = tsConfig.dnodeId;
pthread_mutex_unlock(&tsEps.mutex); pthread_mutex_unlock(&tsConfig.mutex);
return dnodeId; return dnodeId;
} }
int64_t dnodeGetClusterId() { int64_t dnodeGetClusterId() {
int64_t clusterId = 0; int64_t clusterId = 0;
pthread_mutex_lock(&tsEps.mutex); pthread_mutex_lock(&tsConfig.mutex);
clusterId = tsEps.clusterId; clusterId = tsConfig.clusterId;
pthread_mutex_unlock(&tsEps.mutex); pthread_mutex_unlock(&tsConfig.mutex);
return clusterId; return clusterId;
} }
...@@ -171,7 +171,7 @@ int32_t dnodeInit() { ...@@ -171,7 +171,7 @@ int32_t dnodeInit() {
taosStepAdd(steps, "dnode-tfs", dnodeInitTfs, NULL); taosStepAdd(steps, "dnode-tfs", dnodeInitTfs, NULL);
taosStepAdd(steps, "dnode-wal", walInit, walCleanUp); taosStepAdd(steps, "dnode-wal", walInit, walCleanUp);
taosStepAdd(steps, "dnode-sync", syncInit, syncCleanUp); taosStepAdd(steps, "dnode-sync", syncInit, syncCleanUp);
taosStepAdd(steps, "dnode-eps", dnodeInitEps, dnodeCleanupEps); taosStepAdd(steps, "dnode-eps", dnodeInitConfig, dnodeCleanupConfig);
taosStepAdd(steps, "dnode-vnode", dnodeInitVnode, vnodeCleanup); taosStepAdd(steps, "dnode-vnode", dnodeInitVnode, vnodeCleanup);
taosStepAdd(steps, "dnode-mnode", dnodeInitMnode, mnodeCleanup); taosStepAdd(steps, "dnode-mnode", dnodeInitMnode, mnodeCleanup);
taosStepAdd(steps, "dnode-trans", dnodeInitTrans, dnodeCleanupTrans); taosStepAdd(steps, "dnode-trans", dnodeInitTrans, dnodeCleanupTrans);
......
...@@ -31,17 +31,17 @@ int main(int argc, char const *argv[]) { ...@@ -31,17 +31,17 @@ int main(int argc, char const *argv[]) {
int32_t code = dnodeInit(); int32_t code = dnodeInit();
if (code != 0) { if (code != 0) {
uInfo("Failed to start TDengine, please check the log at:%s", tsLogDir); dInfo("Failed to start TDengine, please check the log at:%s", tsLogDir);
exit(EXIT_FAILURE); exit(EXIT_FAILURE);
} }
uInfo("Started TDengine service successfully."); dInfo("Started TDengine service successfully.");
while (!stop) { while (!stop) {
taosMsleep(100); taosMsleep(100);
} }
uInfo("TDengine is shut down!"); dInfo("TDengine is shut down!");
dnodeCleanup(); dnodeCleanup();
return 0; return 0;
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
#include "dnodeCheck.h"
#include "dnodeConfig.h"
#include "dnodeDnode.h"
#include "dnodeTransport.h"
#include "mnode.h"
#include "sync.h"
#include "tcache.h"
#include "tconfig.h"
#include "tnote.h"
#include "tstep.h"
#include "vnode.h"
#include "wal.h"
static struct {
EDnStat runStatus;
SStartupStep startup;
SSteps *steps;
} tsDnode;
EDnStat dnodeGetRunStat() { return tsDnode.runStatus; }
void dnodeSetRunStat(EDnStat stat) { tsDnode.runStatus = stat; }
void dnodeReportStartup(char *name, char *desc) {
SStartupStep *startup = &tsDnode.startup;
tstrncpy(startup->name, name, strlen(startup->name));
tstrncpy(startup->desc, desc, strlen(startup->desc));
startup->finished = 0;
}
static void dnodeReportStartupFinished(char *name, char *desc) {
SStartupStep *startup = &tsDnode.startup;
tstrncpy(startup->name, name, strlen(startup->name));
tstrncpy(startup->desc, desc, strlen(startup->desc));
startup->finished = 1;
}
void dnodeGetStartup(SStartupStep *pStep) { memcpy(pStep, &tsDnode.startup, sizeof(SStartupStep)); }
static int32_t dnodeInitVnode() {
return vnodeInit();
}
static int32_t dnodeInitMnode() {
SMnodePara para;
para.fp.GetDnodeEp = dnodeGetEp;
para.fp.SendMsgToDnode = dnodeSendMsgToDnode;
para.fp.SendMsgToMnode = dnodeSendMsgToMnode;
para.fp.SendRedirectMsg = dnodeSendRedirectMsg;
para.dnodeId = dnodeGetDnodeId();
para.clusterId = dnodeGetClusterId();
return mnodeInit(para);
}
static int32_t dnodeInitTfs() {}
static int32_t dnodeInitMain() {
tsDnode.runStatus = DN_RUN_STAT_STOPPED;
tscEmbedded = 1;
taosIgnSIGPIPE();
taosBlockSIGPIPE();
taosResolveCRC();
taosInitGlobalCfg();
taosReadGlobalLogCfg();
taosSetCoreDump(tsEnableCoreFile);
if (!taosMkDir(tsLogDir)) {
printf("failed to create dir: %s, reason: %s\n", tsLogDir, strerror(errno));
return -1;
}
char temp[TSDB_FILENAME_LEN];
sprintf(temp, "%s/taosdlog", tsLogDir);
if (taosInitLog(temp, tsNumOfLogLines, 1) < 0) {
printf("failed to init log file\n");
}
if (!taosReadGlobalCfg()) {
taosPrintGlobalCfg();
dError("TDengine read global config failed");
return -1;
}
dInfo("start to initialize TDengine");
taosInitNotes();
return taosCheckGlobalCfg();
}
static void dnodeCleanupMain() {
taos_cleanup();
taosCloseLog();
taosStopCacheRefreshWorker();
}
static int32_t dnodeCheckRunning(char *dir) {
char filepath[256] = {0};
snprintf(filepath, sizeof(filepath), "%s/.running", dir);
FileFd fd = taosOpenFileCreateWriteTrunc(filepath);
if (fd < 0) {
dError("failed to open lock file:%s since %s, quit", filepath, strerror(errno));
return -1;
}
int32_t ret = taosLockFile(fd);
if (ret != 0) {
dError("failed to lock file:%s since %s, quit", filepath, strerror(errno));
taosCloseFile(fd);
return -1;
}
return 0;
}
static int32_t dnodeInitDir() {
sprintf(tsMnodeDir, "%s/mnode", tsDataDir);
sprintf(tsVnodeDir, "%s/vnode", tsDataDir);
sprintf(tsDnodeDir, "%s/dnode", tsDataDir);
if (!taosMkDir(tsDnodeDir)) {
dError("failed to create dir:%s since %s", tsDnodeDir, strerror(errno));
return -1;
}
if (!taosMkDir(tsMnodeDir)) {
dError("failed to create dir:%s since %s", tsMnodeDir, strerror(errno));
return -1;
}
if (!taosMkDir(tsVnodeDir)) {
dError("failed to create dir:%s since %s", tsVnodeDir, strerror(errno));
return -1;
}
if (dnodeCheckRunning(tsDnodeDir) != 0) {
return -1;
}
return 0;
}
static void dnodeCleanupDir() {}
int32_t dnodeInit() {
SSteps *steps = taosStepInit(24, dnodeReportStartup);
if (steps == NULL) return -1;
taosStepAdd(steps, "dnode-main", dnodeInitMain, dnodeCleanupMain);
taosStepAdd(steps, "dnode-dir", dnodeInitDir, dnodeCleanupDir);
taosStepAdd(steps, "dnode-check", dnodeInitCheck, dnodeCleanupCheck);
taosStepAdd(steps, "dnode-rpc", rpcInit, rpcCleanup);
taosStepAdd(steps, "dnode-tfs", dnodeInitTfs, NULL);
taosStepAdd(steps, "dnode-wal", walInit, walCleanUp);
taosStepAdd(steps, "dnode-sync", syncInit, syncCleanUp);
taosStepAdd(steps, "dnode-eps", dnodeInitEps, dnodeCleanupEps);
taosStepAdd(steps, "dnode-vnode", dnodeInitVnode, vnodeCleanup);
taosStepAdd(steps, "dnode-mnode", dnodeInitMnode, mnodeCleanup);
taosStepAdd(steps, "dnode-trans", dnodeInitTrans, dnodeCleanupTrans);
taosStepAdd(steps, "dnode-msg", dnodeInitMsg, dnodeCleanupMsg);
tsDnode.steps = steps;
taosStepExec(tsDnode.steps);
dnodeSetRunStat(DN_RUN_STAT_RUNNING);
dnodeReportStartupFinished("TDengine", "initialized successfully");
dInfo("TDengine is initialized successfully");
return 0;
}
void dnodeCleanup() {
if (dnodeGetRunStat() != DN_RUN_STAT_STOPPED) {
dnodeSetRunStat(DN_RUN_STAT_STOPPED);
taosStepCleanup(tsDnode.steps);
tsDnode.steps = NULL;
}
}
// tsVnode.msgFp[TSDB_MSG_TYPE_MD_CREATE_VNODE] = vnodeProcessMgmtMsg;
// tsVnode.msgFp[TSDB_MSG_TYPE_MD_ALTER_VNODE] = vnodeProcessMgmtMsg;
// tsVnode.msgFp[TSDB_MSG_TYPE_MD_SYNC_VNODE] = vnodeProcessMgmtMsg;
// tsVnode.msgFp[TSDB_MSG_TYPE_MD_COMPACT_VNODE] = vnodeProcessMgmtMsg;
// tsVnode.msgFp[TSDB_MSG_TYPE_MD_DROP_VNODE] = vnodeProcessMgmtMsg;
// tsVnode.msgFp[TSDB_MSG_TYPE_MD_ALTER_STREAM] = vnodeProcessMgmtMsg;
\ No newline at end of file
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
#include "dnodeDnode.h"
#include "dnodeConfig.h"
#include "mnode.h"
#include "tthread.h"
#include "ttime.h"
#include "vnode.h"
static struct {
pthread_t *threadId;
bool stop;
uint32_t rebootTime;
} tsMsg;
static void dnodeSendStatusMsg() {
int32_t contLen = sizeof(SStatusMsg) + TSDB_MAX_VNODES * sizeof(SVnodeLoad);
SStatusMsg *pStatus = rpcMallocCont(contLen);
if (pStatus == NULL) {
dError("failed to malloc status message");
return;
}
pStatus->version = htonl(tsVersion);
pStatus->dnodeId = htonl(dnodeGetDnodeId());
tstrncpy(pStatus->dnodeEp, tsLocalEp, TSDB_EP_LEN);
pStatus->clusterId = htobe64(dnodeGetClusterId());
pStatus->lastReboot = htonl(tsMsg.rebootTime);
pStatus->numOfCores = htonl(tsNumOfCores);
pStatus->diskAvailable = tsAvailDataDirGB;
// fill cluster cfg parameters
pStatus->clusterCfg.statusInterval = htonl(tsStatusInterval);
pStatus->clusterCfg.checkTime = 0;
tstrncpy(pStatus->clusterCfg.timezone, tsTimezone, 64);
char timestr[32] = "1970-01-01 00:00:00.00";
(void)taosParseTime(timestr, &pStatus->clusterCfg.checkTime, (int32_t)strlen(timestr), TSDB_TIME_PRECISION_MILLI, 0);
tstrncpy(pStatus->clusterCfg.locale, tsLocale, TSDB_LOCALE_LEN);
tstrncpy(pStatus->clusterCfg.charset, tsCharset, TSDB_LOCALE_LEN);
// vnodeGetStatus(NULL, pStatus);
// contLen = sizeof(SStatusMsg) + pStatus->openVnodes * sizeof(SVnodeLoad);
// pStatus->openVnodes = htons(pStatus->openVnodes);
SRpcMsg rpcMsg = {.ahandle = NULL, .pCont = pStatus, .contLen = contLen, .msgType = TSDB_MSG_TYPE_DM_STATUS};
dnodeSendMsgToMnode(&rpcMsg);
}
void dnodeProcessStatusRsp(SRpcMsg *pMsg) {
dTrace("status rsp is received, code:%s", tstrerror(pMsg->code));
if (pMsg->code != TSDB_CODE_SUCCESS) return;
SStatusRsp *pStatusRsp = pMsg->pCont;
SDnodeCfg *pCfg = &pStatusRsp->dnodeCfg;
pCfg->dnodeId = htonl(pCfg->dnodeId);
pCfg->clusterId = htobe64(pCfg->clusterId);
pCfg->numOfVnodes = htonl(pCfg->numOfVnodes);
pCfg->numOfDnodes = htonl(pCfg->numOfDnodes);
dnodeUpdateCfg(pCfg);
if (pCfg->dropped) {
dError("status rsp is received, and set dnode to drop status");
return;
}
// vnodeSetAccess(pStatusRsp->vgAccess, pCfg->numOfVnodes);
SDnodeEps *eps = (SDnodeEps *)((char *)pStatusRsp->vgAccess + pCfg->numOfVnodes * sizeof(SVgroupAccess));
eps->dnodeNum = htonl(eps->dnodeNum);
for (int32_t i = 0; i < eps->dnodeNum; ++i) {
eps->dnodeEps[i].dnodeId = htonl(eps->dnodeEps[i].dnodeId);
eps->dnodeEps[i].dnodePort = htons(eps->dnodeEps[i].dnodePort);
}
dnodeUpdateDnodeEps(eps);
}
static void *dnodeThreadRoutine(void *param) {
int32_t ms = tsStatusInterval * 1000;
while (!tsMsg.stop) {
taosMsleep(ms);
dnodeSendStatusMsg();
}
}
int32_t dnodeInitMsg() {
tsMsg.stop = false;
tsMsg.rebootTime = taosGetTimestampSec();
tsMsg.threadId = taosCreateThread(dnodeThreadRoutine, NULL);
if (tsMsg.threadId == NULL) {
return -1;
}
dInfo("dnode msg is initialized");
return 0;
}
void dnodeCleanupMsg() {
if (tsMsg.threadId != NULL) {
tsMsg.stop = true;
taosDestoryThread(tsMsg.threadId);
tsMsg.threadId = NULL;
}
dInfo("dnode msg is cleanuped");
}
static int32_t dnodeStartMnode(SRpcMsg *pMsg) {
SCreateMnodeMsg *pCfg = pMsg->pCont;
pCfg->dnodeId = htonl(pCfg->dnodeId);
pCfg->mnodeNum = htonl(pCfg->mnodeNum);
for (int32_t i = 0; i < pCfg->mnodeNum; ++i) {
pCfg->mnodeEps[i].dnodeId = htonl(pCfg->mnodeEps[i].dnodeId);
pCfg->mnodeEps[i].dnodePort = htons(pCfg->mnodeEps[i].dnodePort);
}
if (pCfg->dnodeId != dnodeGetDnodeId()) {
dDebug("dnode:%d, in create meps msg is not equal with saved dnodeId:%d", pCfg->dnodeId, dnodeGetDnodeId());
return TSDB_CODE_MND_DNODE_ID_NOT_CONFIGURED;
}
if (mnodeGetStatus() == MN_STATUS_READY) return 0;
return mnodeDeploy();
}
void dnodeProcessCreateMnodeReq(SRpcMsg *pMsg) {
int32_t code = dnodeStartMnode(pMsg);
SRpcMsg rspMsg = {.handle = pMsg->handle, .pCont = NULL, .contLen = 0, .code = code};
rpcSendResponse(&rspMsg);
rpcFreeCont(pMsg->pCont);
}
void dnodeProcessConfigDnodeReq(SRpcMsg *pMsg) {
SCfgDnodeMsg *pCfg = pMsg->pCont;
int32_t code = taosCfgDynamicOptions(pCfg->config);
SRpcMsg rspMsg = {.handle = pMsg->handle, .pCont = NULL, .contLen = 0, .code = code};
rpcSendResponse(&rspMsg);
rpcFreeCont(pMsg->pCont);
}
void dnodeProcessStartupReq(SRpcMsg *pMsg) {
dInfo("startup msg is received, cont:%s", (char *)pMsg->pCont);
SStartupStep *pStep = rpcMallocCont(sizeof(SStartupStep));
dnodeGetStartup(pStep);
dDebug("startup msg is sent, step:%s desc:%s finished:%d", pStep->name, pStep->desc, pStep->finished);
SRpcMsg rpcRsp = {.handle = pMsg->handle, .pCont = pStep, .contLen = sizeof(SStartupStep)};
rpcSendResponse(&rpcRsp);
rpcFreeCont(pMsg->pCont);
}
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
/* this file is mainly responsible for the communication between DNODEs. Each
* dnode works as both server and client. Dnode may send status, grant, config
* messages to mnode, mnode may send create/alter/drop table/vnode messages
* to dnode. All theses messages are handled from here
*/
#define _DEFAULT_SOURCE
#include "dnodeTransport.h"
#include "dnodeConfig.h"
#include "dnodeDnode.h"
#include "mnode.h"
#include "vnode.h"
typedef void (*MsgFp)(SRpcMsg *pMsg);
static struct {
void *serverRpc;
void *clientRpc;
void *shellRpc;
MsgFp msgFp[TSDB_MSG_TYPE_MAX];
} tsTrans;
static void dnodeInitMsgFp() {
// msg from client to dnode
tsTrans.msgFp[TSDB_MSG_TYPE_SUBMIT] = vnodeProcessMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_QUERY] = vnodeProcessMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_FETCH] = vnodeProcessMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_TABLE] = vnodeProcessMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_DROP_TABLE] = vnodeProcessMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_ALTER_TABLE] = vnodeProcessMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_UPDATE_TAG_VAL] = vnodeProcessMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_TABLE_META] = vnodeProcessMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_TABLES_META] = vnodeProcessMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_STABLE_VGROUP] = mnodeProcessMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_MQ_QUERY] = vnodeProcessMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_MQ_CONSUME] = vnodeProcessMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_MQ_CONNECT] = vnodeProcessMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_MQ_DISCONNECT] = vnodeProcessMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_MQ_ACK] = vnodeProcessMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_MQ_RESET] = vnodeProcessMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_NETWORK_TEST] = dnodeProcessStartupReq;
// msg from client to mnode
tsTrans.msgFp[TSDB_MSG_TYPE_CONNECT] = mnodeProcessMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_ACCT] = mnodeProcessMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_ALTER_ACCT] = mnodeProcessMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_DROP_ACCT] = mnodeProcessMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_USER] = mnodeProcessMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_ALTER_USER] = mnodeProcessMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_DROP_USER] = mnodeProcessMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_DNODE] = mnodeProcessMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_CONFIG_DNODE] = mnodeProcessMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_DROP_DNODE] = mnodeProcessMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_DB] = mnodeProcessMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_DROP_DB] = mnodeProcessMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_USE_DB] = mnodeProcessMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_ALTER_DB] = mnodeProcessMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_SYNC_DB] = mnodeProcessMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_TOPIC] = mnodeProcessMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_DROP_TOPIC] = mnodeProcessMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_ALTER_TOPIC] = mnodeProcessMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_FUNCTION] = mnodeProcessMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_ALTER_FUNCTION] = mnodeProcessMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_DROP_FUNCTION] = mnodeProcessMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_STABLE] = mnodeProcessMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_DROP_STABLE] = mnodeProcessMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_ALTER_STABLE] = mnodeProcessMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_KILL_QUERY] = mnodeProcessMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_KILL_CONN] = mnodeProcessMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_HEARTBEAT] = mnodeProcessMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_SHOW] = mnodeProcessMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_SHOW_RETRIEVE] = mnodeProcessMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_SHOW_RETRIEVE_FUNC] = mnodeProcessMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_COMPACT_VNODE] = mnodeProcessMsg;
// message from mnode to dnode
tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_STABLE_IN] = vnodeProcessMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_STABLE_IN_RSP] = mnodeProcessMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_STABLE_IN] = vnodeProcessMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_ALTER_STABLE_IN_RSP] = mnodeProcessMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_STABLE_IN] = vnodeProcessMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_DROP_STABLE_IN_RSP] = mnodeProcessMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_VNODE_IN] = vnodeProcessMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_VNODE_IN_RSP] = mnodeProcessMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_ALTER_VNODE_IN] = vnodeProcessMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_ALTER_VNODE_IN_RSP] = mnodeProcessMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_DROP_VNODE_IN] = vnodeProcessMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_DROP_VNODE_IN_RSP] = mnodeProcessMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_SYNC_VNODE_IN] = vnodeProcessMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_SYNC_VNODE_IN_RSP] = mnodeProcessMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_COMPACT_VNODE_IN] = vnodeProcessMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_COMPACT_VNODE_IN_RSP] = mnodeProcessMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_MNODE_IN] = dnodeProcessCreateMnodeReq;
tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_MNODE_IN_RSP] = mnodeProcessMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_DROP_MNODE_IN] = NULL;
tsTrans.msgFp[TSDB_MSG_TYPE_DROP_MNODE_IN_RSP] = mnodeProcessMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_CONFIG_DNODE_IN] = NULL;
tsTrans.msgFp[TSDB_MSG_TYPE_CONFIG_DNODE_IN_RSP] = mnodeProcessMsg;
// message from dnode to mnode
tsTrans.msgFp[TSDB_MSG_TYPE_DM_AUTH] = mnodeProcessMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_DM_AUTH_RSP] = NULL;
tsTrans.msgFp[TSDB_MSG_TYPE_DM_GRANT] = mnodeProcessMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_DM_GRANT_RSP] = NULL;
tsTrans.msgFp[TSDB_MSG_TYPE_DM_STATUS] = mnodeProcessMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_DM_STATUS_RSP] = dnodeProcessStatusRsp;
}
static void dnodeProcessPeerReq(SRpcMsg *pMsg, SRpcEpSet *pEpSet) {
SRpcMsg rspMsg = {.handle = pMsg->handle, .pCont = NULL, .contLen = 0};
int32_t msgType = pMsg->msgType;
if (msgType == TSDB_MSG_TYPE_NETWORK_TEST) {
dnodeProcessStartupReq(pMsg);
return;
}
if (dnodeGetRunStat() != DN_RUN_STAT_RUNNING) {
rspMsg.code = TSDB_CODE_APP_NOT_READY;
rpcSendResponse(&rspMsg);
rpcFreeCont(pMsg->pCont);
dTrace("RPC %p, peer req:%s is ignored since dnode not running", pMsg->handle, taosMsg[msgType]);
return;
}
if (pMsg->pCont == NULL) {
rspMsg.code = TSDB_CODE_DND_INVALID_MSG_LEN;
rpcSendResponse(&rspMsg);
return;
}
MsgFp fp = tsTrans.msgFp[msgType];
if (fp != NULL) {
dTrace("RPC %p, peer req:%s will be processed", pMsg->handle, taosMsg[msgType]);
(*fp)(pMsg);
} else {
dError("RPC %p, peer req:%s not processed", pMsg->handle, taosMsg[msgType]);
rspMsg.code = TSDB_CODE_DND_MSG_NOT_PROCESSED;
rpcSendResponse(&rspMsg);
rpcFreeCont(pMsg->pCont);
}
}
static int32_t dnodeInitServer() {
SRpcInit rpcInit;
memset(&rpcInit, 0, sizeof(rpcInit));
rpcInit.localPort = tsDnodeDnodePort;
rpcInit.label = "DND-S";
rpcInit.numOfThreads = 1;
rpcInit.cfp = dnodeProcessPeerReq;
rpcInit.sessions = TSDB_MAX_VNODES << 4;
rpcInit.connType = TAOS_CONN_SERVER;
rpcInit.idleTime = tsShellActivityTimer * 1000;
tsTrans.serverRpc = rpcOpen(&rpcInit);
if (tsTrans.serverRpc == NULL) {
dError("failed to init peer rpc server");
return -1;
}
dInfo("dnode peer rpc server is initialized");
return 0;
}
static void dnodeCleanupServer() {
if (tsTrans.serverRpc) {
rpcClose(tsTrans.serverRpc);
tsTrans.serverRpc = NULL;
dInfo("dnode peer server is closed");
}
}
static void dnodeProcessPeerRsp(SRpcMsg *pMsg, SRpcEpSet *pEpSet) {
int32_t msgType = pMsg->msgType;
if (dnodeGetRunStat() == DN_RUN_STAT_STOPPED) {
if (pMsg == NULL || pMsg->pCont == NULL) return;
dTrace("RPC %p, peer rsp:%s is ignored since dnode is stopping", pMsg->handle, taosMsg[msgType]);
rpcFreeCont(pMsg->pCont);
return;
}
if (msgType == TSDB_MSG_TYPE_DM_STATUS_RSP && pEpSet) {
dnodeUpdateMnodeEps(pEpSet);
}
MsgFp fp = tsTrans.msgFp[msgType];
if (fp != NULL) {
dTrace("RPC %p, peer rsp:%s will be processed", pMsg->handle, taosMsg[msgType]);
(*fp)(pMsg);
} else {
dDebug("RPC %p, peer rsp:%s not processed", pMsg->handle, taosMsg[msgType]);
}
rpcFreeCont(pMsg->pCont);
}
static int32_t dnodeInitClient() {
char secret[TSDB_KEY_LEN] = "secret";
SRpcInit rpcInit;
memset(&rpcInit, 0, sizeof(rpcInit));
rpcInit.label = "DND-C";
rpcInit.numOfThreads = 1;
rpcInit.cfp = dnodeProcessPeerRsp;
rpcInit.sessions = TSDB_MAX_VNODES << 4;
rpcInit.connType = TAOS_CONN_CLIENT;
rpcInit.idleTime = tsShellActivityTimer * 1000;
rpcInit.user = "t";
rpcInit.ckey = "key";
rpcInit.secret = secret;
tsTrans.clientRpc = rpcOpen(&rpcInit);
if (tsTrans.clientRpc == NULL) {
dError("failed to init peer rpc client");
return -1;
}
dInfo("dnode peer rpc client is initialized");
return 0;
}
static void dnodeCleanupClient() {
if (tsTrans.clientRpc) {
rpcClose(tsTrans.clientRpc);
tsTrans.clientRpc = NULL;
dInfo("dnode peer rpc client is closed");
}
}
static void dnodeProcessShellReq(SRpcMsg *pMsg, SRpcEpSet *pEpSet) {
SRpcMsg rspMsg = {.handle = pMsg->handle, .pCont = NULL, .contLen = 0};
int32_t msgType = pMsg->msgType;
if (dnodeGetRunStat() == DN_RUN_STAT_STOPPED) {
dError("RPC %p, shell req:%s is ignored since dnode exiting", pMsg->handle, taosMsg[msgType]);
rspMsg.code = TSDB_CODE_DND_EXITING;
rpcSendResponse(&rspMsg);
rpcFreeCont(pMsg->pCont);
return;
} else if (dnodeGetRunStat() != DN_RUN_STAT_RUNNING) {
dError("RPC %p, shell req:%s is ignored since dnode not running", pMsg->handle, taosMsg[msgType]);
rspMsg.code = TSDB_CODE_APP_NOT_READY;
rpcSendResponse(&rspMsg);
rpcFreeCont(pMsg->pCont);
return;
}
if (pMsg->pCont == NULL) {
rspMsg.code = TSDB_CODE_DND_INVALID_MSG_LEN;
rpcSendResponse(&rspMsg);
return;
}
MsgFp fp = tsTrans.msgFp[msgType];
if (fp != NULL) {
dTrace("RPC %p, shell req:%s will be processed", pMsg->handle, taosMsg[msgType]);
(*fp)(pMsg);
} else {
dError("RPC %p, shell req:%s is not processed", pMsg->handle, taosMsg[msgType]);
rspMsg.code = TSDB_CODE_DND_MSG_NOT_PROCESSED;
rpcSendResponse(&rspMsg);
rpcFreeCont(pMsg->pCont);
}
}
void dnodeSendMsgToDnode(SRpcEpSet *epSet, SRpcMsg *rpcMsg) { rpcSendRequest(tsTrans.clientRpc, epSet, rpcMsg, NULL); }
void dnodeSendMsgToMnode(SRpcMsg *rpcMsg) {
SRpcEpSet epSet = {0};
dnodeGetEpSetForPeer(&epSet);
dnodeSendMsgToDnode(&epSet, rpcMsg);
}
static void dnodeSendMsgToMnodeRecv(SRpcMsg *rpcMsg, SRpcMsg *rpcRsp) {
SRpcEpSet epSet = {0};
dnodeGetEpSetForPeer(&epSet);
rpcSendRecv(tsTrans.clientRpc, &epSet, rpcMsg, rpcRsp);
}
static int32_t dnodeRetrieveUserAuthInfo(char *user, char *spi, char *encrypt, char *secret, char *ckey) {
int32_t code = mnodeRetriveAuth(user, spi, encrypt, secret, ckey);
if (code != TSDB_CODE_APP_NOT_READY) return code;
SAuthMsg *pMsg = rpcMallocCont(sizeof(SAuthMsg));
tstrncpy(pMsg->user, user, sizeof(pMsg->user));
SRpcMsg rpcMsg = {0};
rpcMsg.pCont = pMsg;
rpcMsg.contLen = sizeof(SAuthMsg);
rpcMsg.msgType = TSDB_MSG_TYPE_DM_AUTH;
dDebug("user:%s, send auth msg to mnodes", user);
SRpcMsg rpcRsp = {0};
dnodeSendMsgToMnodeRecv(&rpcMsg, &rpcRsp);
if (rpcRsp.code != 0) {
dError("user:%s, auth msg received from mnodes, error:%s", user, tstrerror(rpcRsp.code));
} else {
SAuthRsp *pRsp = rpcRsp.pCont;
dDebug("user:%s, auth msg received from mnodes", user);
memcpy(secret, pRsp->secret, TSDB_KEY_LEN);
memcpy(ckey, pRsp->ckey, TSDB_KEY_LEN);
*spi = pRsp->spi;
*encrypt = pRsp->encrypt;
}
rpcFreeCont(rpcRsp.pCont);
return rpcRsp.code;
}
static int32_t dnodeInitShell() {
int32_t numOfThreads = (int32_t)((tsNumOfCores * tsNumOfThreadsPerCore) / 2.0);
if (numOfThreads < 1) {
numOfThreads = 1;
}
SRpcInit rpcInit;
memset(&rpcInit, 0, sizeof(rpcInit));
rpcInit.localPort = tsDnodeShellPort;
rpcInit.label = "SHELL";
rpcInit.numOfThreads = numOfThreads;
rpcInit.cfp = dnodeProcessShellReq;
rpcInit.sessions = tsMaxShellConns;
rpcInit.connType = TAOS_CONN_SERVER;
rpcInit.idleTime = tsShellActivityTimer * 1000;
rpcInit.afp = dnodeRetrieveUserAuthInfo;
tsTrans.shellRpc = rpcOpen(&rpcInit);
if (tsTrans.shellRpc == NULL) {
dError("failed to init shell rpc server");
return -1;
}
dInfo("dnode shell rpc server is initialized");
return 0;
}
static void dnodeCleanupShell() {
if (tsTrans.shellRpc) {
rpcClose(tsTrans.shellRpc);
tsTrans.shellRpc = NULL;
}
}
int32_t dnodeInitTrans() {
if (dnodeInitClient() != 0) {
return -1;
}
if (dnodeInitServer() != 0) {
return -1;
}
if (dnodeInitShell() != 0) {
return -1;
}
return 0;
}
void dnodeCleanupTrans() {
dnodeCleanupShell();
dnodeCleanupServer();
dnodeCleanupClient();
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册