提交 9b7f04b0 编写于 作者: S Shengliang Guan

TD-1762

上级 cfdc8c15
...@@ -32,7 +32,11 @@ static void dnodePrintCfg(SDnodeCfg *cfg); ...@@ -32,7 +32,11 @@ static void dnodePrintCfg(SDnodeCfg *cfg);
int32_t dnodeInitCfg() { int32_t dnodeInitCfg() {
pthread_mutex_init(&tsCfgMutex, NULL); pthread_mutex_init(&tsCfgMutex, NULL);
dnodeResetCfg(NULL); dnodeResetCfg(NULL);
return dnodeReadCfg(); int32_t ret = dnodeReadCfg();
if (ret == 0) {
dInfo("dnode cfg is initialized");
}
return ret;
} }
void dnodeCleanupCfg() { pthread_mutex_destroy(&tsCfgMutex); } void dnodeCleanupCfg() { pthread_mutex_destroy(&tsCfgMutex); }
......
...@@ -262,8 +262,6 @@ int32_t dnodeInitCheck() { ...@@ -262,8 +262,6 @@ int32_t dnodeInitCheck() {
} }
} }
dInfo("dnode check is initialized");
return 0; return 0;
} }
...@@ -35,7 +35,11 @@ int32_t dnodeInitEps() { ...@@ -35,7 +35,11 @@ int32_t dnodeInitEps() {
pthread_mutex_init(&tsEpsMutex, NULL); pthread_mutex_init(&tsEpsMutex, NULL);
tsEpsHash = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, true); tsEpsHash = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, true);
dnodeResetEps(NULL); dnodeResetEps(NULL);
return dnodeReadEps(); int32_t ret = dnodeReadEps();
if (ret == 0) {
dInfo("dnode eps is initialized");
}
return ret;
} }
void dnodeCleanupEps() { void dnodeCleanupEps() {
......
...@@ -34,7 +34,12 @@ static int32_t dnodeWriteMInfos(); ...@@ -34,7 +34,12 @@ static int32_t dnodeWriteMInfos();
int32_t dnodeInitMInfos() { int32_t dnodeInitMInfos() {
pthread_mutex_init(&tsMInfosMutex, NULL); pthread_mutex_init(&tsMInfosMutex, NULL);
dnodeResetMInfos(NULL); dnodeResetMInfos(NULL);
return dnodeReadMInfos(); int32_t ret = dnodeReadMInfos();
if (ret == 0) {
dInfo("dnode minfos is initialized");
}
return ret;
} }
void dnodeCleanupMInfos() { pthread_mutex_destroy(&tsMInfosMutex); } void dnodeCleanupMInfos() { pthread_mutex_destroy(&tsMInfosMutex); }
......
...@@ -58,7 +58,7 @@ int32_t dnodeInitMnodePeer() { ...@@ -58,7 +58,7 @@ int32_t dnodeInitMnodePeer() {
dDebug("dnode mpeer worker:%d is created", i); dDebug("dnode mpeer worker:%d is created", i);
} }
dDebug("dnode mpeer is opened, workers:%d qset:%p", tsMPeerPool.maxNum, tsMPeerQset); dDebug("dnode mpeer is initialized, workers:%d qset:%p", tsMPeerPool.maxNum, tsMPeerQset);
return 0; return 0;
} }
......
...@@ -60,7 +60,7 @@ int32_t dnodeInitMnodeRead() { ...@@ -60,7 +60,7 @@ int32_t dnodeInitMnodeRead() {
dDebug("dnode mread worker:%d is created", i); dDebug("dnode mread worker:%d is created", i);
} }
dDebug("dnode mread is opened, workers:%d qset:%p", tsMReadPool.maxNum, tsMReadQset); dDebug("dnode mread is initialized, workers:%d qset:%p", tsMReadPool.maxNum, tsMReadQset);
return 0; return 0;
} }
......
...@@ -60,7 +60,7 @@ int32_t dnodeInitMnodeWrite() { ...@@ -60,7 +60,7 @@ int32_t dnodeInitMnodeWrite() {
dDebug("dnode mwrite worker:%d is created", i); dDebug("dnode mwrite worker:%d is created", i);
} }
dDebug("dnode mwrite is opened, workers:%d qset:%p", tsMWritePool.maxNum, tsMWriteQset); dDebug("dnode mwrite is initialized, workers:%d qset:%p", tsMWritePool.maxNum, tsMWriteQset);
return 0; return 0;
} }
......
...@@ -205,7 +205,7 @@ static int32_t dnodeInitStorage() { ...@@ -205,7 +205,7 @@ static int32_t dnodeInitStorage() {
dnodeCheckDataDirOpenned(tsDnodeDir); dnodeCheckDataDirOpenned(tsDnodeDir);
dInfo("storage directory is initialized"); dInfo("dnode storage is initialized at %s", tsDnodeDir);
return 0; return 0;
} }
......
...@@ -79,7 +79,6 @@ int32_t dnodeInitMgmt() { ...@@ -79,7 +79,6 @@ int32_t dnodeInitMgmt() {
dnodeAddClientRspHandle(TSDB_MSG_TYPE_DM_STATUS_RSP, dnodeProcessStatusRsp); dnodeAddClientRspHandle(TSDB_MSG_TYPE_DM_STATUS_RSP, dnodeProcessStatusRsp);
tsRebootTime = taosGetTimestampSec(); tsRebootTime = taosGetTimestampSec();
int32_t code = vnodeInitResources(); int32_t code = vnodeInitResources();
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
dnodeCleanupMgmt(); dnodeCleanupMgmt();
......
...@@ -114,6 +114,7 @@ int32_t dnodeInitModules() { ...@@ -114,6 +114,7 @@ int32_t dnodeInitModules() {
} }
} }
dInfo("dnode modules is initialized");
return 0; return 0;
} }
......
...@@ -72,7 +72,7 @@ int32_t dnodeInitServer() { ...@@ -72,7 +72,7 @@ int32_t dnodeInitServer() {
return -1; return -1;
} }
dInfo("inter-dnodes RPC server is opened"); dInfo("dnode inter-dnodes RPC server is initialized");
return 0; return 0;
} }
...@@ -137,7 +137,7 @@ int32_t dnodeInitClient() { ...@@ -137,7 +137,7 @@ int32_t dnodeInitClient() {
return -1; return -1;
} }
dInfo("inter-dnodes rpc client is opened"); dInfo("initialized inter-dnodes rpc client is initialized");
return 0; return 0;
} }
......
...@@ -97,7 +97,7 @@ int32_t dnodeInitShell() { ...@@ -97,7 +97,7 @@ int32_t dnodeInitShell() {
return -1; return -1;
} }
dInfo("shell rpc server is opened"); dInfo("dnode shell rpc server is initialized");
return 0; return 0;
} }
......
...@@ -299,6 +299,7 @@ int32_t dnodeInitTelemetry() { ...@@ -299,6 +299,7 @@ int32_t dnodeInitTelemetry() {
dTrace("failed to create telemetry thread, reason:%s", strerror(errno)); dTrace("failed to create telemetry thread, reason:%s", strerror(errno));
} }
dInfo("dnode telemetry is initialized");
return 0; return 0;
} }
......
...@@ -61,7 +61,7 @@ int32_t dnodeInitVnodeRead() { ...@@ -61,7 +61,7 @@ int32_t dnodeInitVnodeRead() {
pWorker->workerId = i; pWorker->workerId = i;
} }
dInfo("dnode read is opened, min worker:%d max worker:%d", readPool.min, readPool.max); dInfo("dnode read is initialized, min worker:%d max worker:%d", readPool.min, readPool.max);
return 0; return 0;
} }
......
...@@ -69,7 +69,7 @@ int32_t dnodeInitVnodeWrite() { ...@@ -69,7 +69,7 @@ int32_t dnodeInitVnodeWrite() {
wWorkerPool.writeWorker[i].workerId = i; wWorkerPool.writeWorker[i].workerId = i;
} }
dInfo("dnode write is opened, max worker %d", wWorkerPool.max); dInfo("dnode write is initialized, max worker %d", wWorkerPool.max);
return 0; return 0;
} }
......
...@@ -440,9 +440,11 @@ static int32_t mnodeGetDnodeEpsSize() { ...@@ -440,9 +440,11 @@ static int32_t mnodeGetDnodeEpsSize() {
return size; return size;
} }
static void mnodeGetDnodeEpsData(SDnodeEps *pEps) { static void mnodeGetDnodeEpsData(SDnodeEps *pEps, int32_t epsSize) {
pthread_mutex_lock(&tsDnodeEpsMutex); pthread_mutex_lock(&tsDnodeEpsMutex);
memcpy(pEps, tsDnodeEps, tsDnodeEpsSize); if (epsSize == tsDnodeEpsSize) {
memcpy(pEps, tsDnodeEps, tsDnodeEpsSize);
}
pthread_mutex_unlock(&tsDnodeEpsMutex); pthread_mutex_unlock(&tsDnodeEpsMutex);
} }
...@@ -451,6 +453,7 @@ static void mnodeUpdateDnodeEps() { ...@@ -451,6 +453,7 @@ static void mnodeUpdateDnodeEps() {
int32_t totalDnodes = mnodeGetDnodesNum(); int32_t totalDnodes = mnodeGetDnodesNum();
tsDnodeEpsSize = sizeof(SDnodeEps) + totalDnodes * sizeof(SDnodeEp); tsDnodeEpsSize = sizeof(SDnodeEps) + totalDnodes * sizeof(SDnodeEp);
free(tsDnodeEps);
tsDnodeEps = calloc(1, tsDnodeEpsSize); tsDnodeEps = calloc(1, tsDnodeEpsSize);
tsDnodeEps->dnodeNum = htonl(totalDnodes); tsDnodeEps->dnodeNum = htonl(totalDnodes);
...@@ -534,7 +537,10 @@ static int32_t mnodeProcessDnodeStatusMsg(SMnodeMsg *pMsg) { ...@@ -534,7 +537,10 @@ static int32_t mnodeProcessDnodeStatusMsg(SMnodeMsg *pMsg) {
} }
int32_t openVnodes = htons(pStatus->openVnodes); int32_t openVnodes = htons(pStatus->openVnodes);
int32_t contLen = sizeof(SDMStatusRsp) + openVnodes * sizeof(SDMVgroupAccess) + mnodeGetDnodeEpsSize(); int32_t epsSize = mnodeGetDnodeEpsSize();
int32_t vgAccessSize = openVnodes * sizeof(SDMVgroupAccess);
int32_t contLen = sizeof(SDMStatusRsp) + vgAccessSize + epsSize;
SDMStatusRsp *pRsp = rpcMallocCont(contLen); SDMStatusRsp *pRsp = rpcMallocCont(contLen);
if (pRsp == NULL) { if (pRsp == NULL) {
mnodeDecDnodeRef(pDnode); mnodeDecDnodeRef(pDnode);
...@@ -596,8 +602,8 @@ static int32_t mnodeProcessDnodeStatusMsg(SMnodeMsg *pMsg) { ...@@ -596,8 +602,8 @@ static int32_t mnodeProcessDnodeStatusMsg(SMnodeMsg *pMsg) {
mnodeDecDnodeRef(pDnode); mnodeDecDnodeRef(pDnode);
SDnodeEps *pEps = (SDnodeEps *)((char *)pAccess + openVnodes * sizeof(SDMVgroupAccess)); SDnodeEps *pEps = (SDnodeEps *)((char *)pRsp + sizeof(SDMStatusRsp) + vgAccessSize);
mnodeGetDnodeEpsData(pEps); mnodeGetDnodeEpsData(pEps, epsSize);
pMsg->rpcRsp.len = contLen; pMsg->rpcRsp.len = contLen;
pMsg->rpcRsp.rsp = pRsp; pMsg->rpcRsp.rsp = pRsp;
......
...@@ -323,7 +323,7 @@ void *rpcMallocCont(int contLen) { ...@@ -323,7 +323,7 @@ void *rpcMallocCont(int contLen) {
tError("failed to malloc msg, size:%d", size); tError("failed to malloc msg, size:%d", size);
return NULL; return NULL;
} else { } else {
tTrace("malloc mem: %p", start); tTrace("malloc mem:%p size:%d", start, size);
} }
return start + sizeof(SRpcReqContext) + sizeof(SRpcHead); return start + sizeof(SRpcReqContext) + sizeof(SRpcHead);
...@@ -1453,7 +1453,7 @@ static SRpcHead *rpcDecompressRpcMsg(SRpcHead *pHead) { ...@@ -1453,7 +1453,7 @@ static SRpcHead *rpcDecompressRpcMsg(SRpcHead *pHead) {
pNewHead->msgLen = rpcMsgLenFromCont(origLen); pNewHead->msgLen = rpcMsgLenFromCont(origLen);
rpcFreeMsg(pHead); // free the compressed message buffer rpcFreeMsg(pHead); // free the compressed message buffer
pHead = pNewHead; pHead = pNewHead;
tTrace("decomp malloc mem: %p", temp); tTrace("decomp malloc mem:%p", temp);
} else { } else {
tError("failed to allocate memory to decompress msg, contLen:%d", contLen); tError("failed to allocate memory to decompress msg, contLen:%d", contLen);
} }
......
...@@ -437,12 +437,13 @@ static int taosReadTcpData(SFdObj *pFdObj, SRecvInfo *pInfo) { ...@@ -437,12 +437,13 @@ static int taosReadTcpData(SFdObj *pFdObj, SRecvInfo *pInfo) {
} }
msgLen = (int32_t)htonl((uint32_t)rpcHead.msgLen); msgLen = (int32_t)htonl((uint32_t)rpcHead.msgLen);
buffer = malloc(msgLen + tsRpcOverhead); int32_t size = msgLen + tsRpcOverhead;
buffer = malloc(size);
if (NULL == buffer) { if (NULL == buffer) {
tError("%s %p TCP malloc(size:%d) fail", pThreadObj->label, pFdObj->thandle, msgLen); tError("%s %p TCP malloc(size:%d) fail", pThreadObj->label, pFdObj->thandle, msgLen);
return -1; return -1;
} else { } else {
tTrace("TCP malloc mem: %p", buffer); tTrace("TCP malloc mem:%p size:%d", buffer, size);
} }
msg = buffer + tsRpcOverhead; msg = buffer + tsRpcOverhead;
......
...@@ -209,12 +209,13 @@ static void *taosRecvUdpData(void *param) { ...@@ -209,12 +209,13 @@ static void *taosRecvUdpData(void *param) {
continue; continue;
} }
char *tmsg = malloc(dataLen + tsRpcOverhead); int32_t size = dataLen + tsRpcOverhead;
char *tmsg = malloc(size);
if (NULL == tmsg) { if (NULL == tmsg) {
tError("%s failed to allocate memory, size:%" PRId64, pConn->label, (int64_t)dataLen); tError("%s failed to allocate memory, size:%" PRId64, pConn->label, (int64_t)dataLen);
continue; continue;
} else { } else {
tTrace("UDP malloc mem: %p", tmsg); tTrace("UDP malloc mem:%p size:%d", tmsg, size);
} }
tmsg += tsRpcOverhead; // overhead for SRpcReqContext tmsg += tsRpcOverhead; // overhead for SRpcReqContext
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册