未验证 提交 352bf51f 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #11799 from taosdata/fix/tsim

fix(cluster): repair dmnodeTest
...@@ -53,8 +53,8 @@ int32_t taosProcRun(SProcObj *pProc); ...@@ -53,8 +53,8 @@ int32_t taosProcRun(SProcObj *pProc);
void taosProcStop(SProcObj *pProc); void taosProcStop(SProcObj *pProc);
int32_t taosProcPutToChildQ(SProcObj *pProc, const void *pHead, int16_t headLen, const void *pBody, int32_t bodyLen, int32_t taosProcPutToChildQ(SProcObj *pProc, const void *pHead, int16_t headLen, const void *pBody, int32_t bodyLen,
void *handle, EProcFuncType ftype); void *handle, int64_t handleRef, EProcFuncType ftype);
void taosProcRemoveHandle(SProcObj *pProc, void *handle); int64_t taosProcRemoveHandle(SProcObj *pProc, void *handle);
void taosProcCloseHandles(SProcObj *pProc, void (*HandleFp)(void *handle)); void taosProcCloseHandles(SProcObj *pProc, void (*HandleFp)(void *handle));
void taosProcPutToParentQ(SProcObj *pProc, const void *pHead, int16_t headLen, const void *pBody, int32_t bodyLen, void taosProcPutToParentQ(SProcObj *pProc, const void *pHead, int16_t headLen, const void *pBody, int32_t bodyLen,
EProcFuncType ftype); EProcFuncType ftype);
......
...@@ -237,18 +237,20 @@ static int32_t taosLoadCfg(SConfig *pCfg, const char *inputCfgDir, const char *e ...@@ -237,18 +237,20 @@ static int32_t taosLoadCfg(SConfig *pCfg, const char *inputCfgDir, const char *e
char cfgFile[PATH_MAX + 100] = {0}; char cfgFile[PATH_MAX + 100] = {0};
taosExpandDir(inputCfgDir, cfgDir, PATH_MAX); taosExpandDir(inputCfgDir, cfgDir, PATH_MAX);
snprintf(cfgFile, sizeof(cfgFile), "%s" TD_DIRSEP "taos.cfg", cfgDir); if (taosIsDir(cfgDir)) {
snprintf(cfgFile, sizeof(cfgFile), "%s" TD_DIRSEP "taos.cfg", cfgDir);
} else {
tstrncpy(cfgFile, cfgDir, sizeof(cfgDir));
}
if (cfgLoad(pCfg, CFG_STYPE_APOLLO_URL, apolloUrl) != 0) { if (cfgLoad(pCfg, CFG_STYPE_APOLLO_URL, apolloUrl) != 0) {
uError("failed to load from apollo url:%s since %s", apolloUrl, terrstr()); uError("failed to load from apollo url:%s since %s", apolloUrl, terrstr());
return -1; return -1;
} }
if (cfgLoad(pCfg, CFG_STYPE_CFG_FILE, cfgDir) != 0) { if (cfgLoad(pCfg, CFG_STYPE_CFG_FILE, cfgFile) != 0) {
if (cfgLoad(pCfg, CFG_STYPE_CFG_FILE, cfgFile) != 0) { uError("failed to load from cfg file:%s since %s", cfgFile, terrstr());
uInfo("cfg file:%s not read since %s", cfgFile, terrstr()); return -1;
return 0;
}
} }
if (cfgLoad(pCfg, CFG_STYPE_ENV_FILE, envFile) != 0) { if (cfgLoad(pCfg, CFG_STYPE_ENV_FILE, envFile) != 0) {
......
...@@ -190,7 +190,7 @@ int main(int argc, char const *argv[]) { ...@@ -190,7 +190,7 @@ int main(int argc, char const *argv[]) {
} }
if (dmInitLog() != 0) { if (dmInitLog() != 0) {
printf("failed to start since init log error\n"); dError("failed to start since init log error");
return -1; return -1;
} }
......
...@@ -88,7 +88,7 @@ static void dmProcessRpcMsg(SMgmtWrapper *pWrapper, SRpcMsg *pRpc, SEpSet *pEpSe ...@@ -88,7 +88,7 @@ static void dmProcessRpcMsg(SMgmtWrapper *pWrapper, SRpcMsg *pRpc, SEpSet *pEpSe
dTrace("msg:%p, is created and put into child queue, type:%s handle:%p user:%s", pMsg, TMSG_INFO(msgType), dTrace("msg:%p, is created and put into child queue, type:%s handle:%p user:%s", pMsg, TMSG_INFO(msgType),
pRpc->handle, pMsg->user); pRpc->handle, pMsg->user);
code = taosProcPutToChildQ(pWrapper->procObj, pMsg, sizeof(SNodeMsg), pRpc->pCont, pRpc->contLen, pRpc->handle, code = taosProcPutToChildQ(pWrapper->procObj, pMsg, sizeof(SNodeMsg), pRpc->pCont, pRpc->contLen, pRpc->handle,
PROC_FUNC_REQ); pRpc->refId, PROC_FUNC_REQ);
} else { } else {
dTrace("msg:%p, should not processed in child process, handle:%p user:%s", pMsg, pRpc->handle, pMsg->user); dTrace("msg:%p, should not processed in child process, handle:%p user:%s", pMsg, pRpc->handle, pMsg->user);
ASSERT(1); ASSERT(1);
...@@ -356,7 +356,7 @@ static void dmConsumeParentQueue(SMgmtWrapper *pWrapper, SRpcMsg *pMsg, int16_t ...@@ -356,7 +356,7 @@ static void dmConsumeParentQueue(SMgmtWrapper *pWrapper, SRpcMsg *pMsg, int16_t
dmSendRpcReq(pWrapper->pDnode, (SEpSet *)((char *)pMsg + sizeof(SRpcMsg)), pMsg); dmSendRpcReq(pWrapper->pDnode, (SEpSet *)((char *)pMsg + sizeof(SRpcMsg)), pMsg);
break; break;
case PROC_FUNC_RSP: case PROC_FUNC_RSP:
taosProcRemoveHandle(pWrapper->procObj, pMsg->handle); pMsg->refId = taosProcRemoveHandle(pWrapper->procObj, pMsg->handle);
dmSendRpcRsp(pWrapper->pDnode, pMsg); dmSendRpcRsp(pWrapper->pDnode, pMsg);
break; break;
default: default:
......
...@@ -53,7 +53,7 @@ int32_t bmProcessCreateReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { ...@@ -53,7 +53,7 @@ int32_t bmProcessCreateReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
return -1; return -1;
} }
if (createReq.dnodeId != pDnode->data.dnodeId) { if (pDnode->data.dnodeId != 0 && createReq.dnodeId != pDnode->data.dnodeId) {
terrno = TSDB_CODE_INVALID_OPTION; terrno = TSDB_CODE_INVALID_OPTION;
dError("failed to create bnode since %s, input:%d cur:%d", terrstr(), createReq.dnodeId, pDnode->data.dnodeId); dError("failed to create bnode since %s, input:%d cur:%d", terrstr(), createReq.dnodeId, pDnode->data.dnodeId);
return -1; return -1;
......
...@@ -134,9 +134,9 @@ int32_t mmProcessAlterReq(SMnodeMgmt *pMgmt, SNodeMsg *pMsg) { ...@@ -134,9 +134,9 @@ int32_t mmProcessAlterReq(SMnodeMgmt *pMgmt, SNodeMsg *pMsg) {
return -1; return -1;
} }
if (alterReq.dnodeId != pDnode->data.dnodeId) { if (pDnode->data.dnodeId != 0 && alterReq.dnodeId != pDnode->data.dnodeId) {
terrno = TSDB_CODE_INVALID_OPTION; terrno = TSDB_CODE_INVALID_OPTION;
dError("failed to alter mnode since %s, dnodeId:%d input:%d", terrstr(), pDnode->data.dnodeId, alterReq.dnodeId); dError("failed to alter mnode since %s, input:%d cur:%d", terrstr(), alterReq.dnodeId, pDnode->data.dnodeId);
return -1; return -1;
} else { } else {
return mmAlter(pMgmt, &alterReq); return mmAlter(pMgmt, &alterReq);
......
...@@ -139,12 +139,6 @@ static int32_t cfgCheckAndSetDir(SConfigItem *pItem, const char *inputDir) { ...@@ -139,12 +139,6 @@ static int32_t cfgCheckAndSetDir(SConfigItem *pItem, const char *inputDir) {
return -1; return -1;
} }
if (taosRealPath(fullDir, NULL, PATH_MAX) != 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
uError("failed to get realpath of dir:%s since %s", inputDir, terrstr());
return -1;
}
taosMemoryFreeClear(pItem->str); taosMemoryFreeClear(pItem->str);
pItem->str = strdup(fullDir); pItem->str = strdup(fullDir);
if (pItem->str == NULL) { if (pItem->str == NULL) {
...@@ -172,9 +166,8 @@ static int32_t cfgSetBool(SConfigItem *pItem, const char *value, ECfgSrcType sty ...@@ -172,9 +166,8 @@ static int32_t cfgSetBool(SConfigItem *pItem, const char *value, ECfgSrcType sty
static int32_t cfgSetInt32(SConfigItem *pItem, const char *value, ECfgSrcType stype) { static int32_t cfgSetInt32(SConfigItem *pItem, const char *value, ECfgSrcType stype) {
int32_t ival = (int32_t)atoi(value); int32_t ival = (int32_t)atoi(value);
if (ival < pItem->imin || ival > pItem->imax) { if (ival < pItem->imin || ival > pItem->imax) {
uError("cfg:%s, type:%s src:%s value:%d out of range[%" PRId64 ", %" PRId64 "], use last src:%s value:%d", uError("cfg:%s, type:%s src:%s value:%d out of range[%" PRId64 ", %" PRId64 "]", pItem->name,
pItem->name, cfgDtypeStr(pItem->dtype), cfgStypeStr(stype), ival, pItem->imin, pItem->imax, cfgDtypeStr(pItem->dtype), cfgStypeStr(stype), ival, pItem->imin, pItem->imax);
cfgStypeStr(pItem->stype), pItem->i32);
terrno = TSDB_CODE_OUT_OF_RANGE; terrno = TSDB_CODE_OUT_OF_RANGE;
return -1; return -1;
} }
...@@ -187,10 +180,8 @@ static int32_t cfgSetInt32(SConfigItem *pItem, const char *value, ECfgSrcType st ...@@ -187,10 +180,8 @@ static int32_t cfgSetInt32(SConfigItem *pItem, const char *value, ECfgSrcType st
static int32_t cfgSetInt64(SConfigItem *pItem, const char *value, ECfgSrcType stype) { static int32_t cfgSetInt64(SConfigItem *pItem, const char *value, ECfgSrcType stype) {
int64_t ival = (int64_t)atoi(value); int64_t ival = (int64_t)atoi(value);
if (ival < pItem->imin || ival > pItem->imax) { if (ival < pItem->imin || ival > pItem->imax) {
uError("cfg:%s, type:%s src:%s value:%" PRId64 " out of range[%" PRId64 ", %" PRId64 uError("cfg:%s, type:%s src:%s value:%" PRId64 " out of range[%" PRId64 ", %" PRId64 "]", pItem->name,
"], use last src:%s value:%" PRId64, cfgDtypeStr(pItem->dtype), cfgStypeStr(stype), ival, pItem->imin, pItem->imax);
pItem->name, cfgDtypeStr(pItem->dtype), cfgStypeStr(stype), ival, pItem->imin, pItem->imax,
cfgStypeStr(pItem->stype), pItem->i64);
terrno = TSDB_CODE_OUT_OF_RANGE; terrno = TSDB_CODE_OUT_OF_RANGE;
return -1; return -1;
} }
...@@ -203,9 +194,8 @@ static int32_t cfgSetInt64(SConfigItem *pItem, const char *value, ECfgSrcType st ...@@ -203,9 +194,8 @@ static int32_t cfgSetInt64(SConfigItem *pItem, const char *value, ECfgSrcType st
static int32_t cfgSetFloat(SConfigItem *pItem, const char *value, ECfgSrcType stype) { static int32_t cfgSetFloat(SConfigItem *pItem, const char *value, ECfgSrcType stype) {
float fval = (float)atof(value); float fval = (float)atof(value);
if (fval < pItem->fmin || fval > pItem->fmax) { if (fval < pItem->fmin || fval > pItem->fmax) {
uError("cfg:%s, type:%s src:%s value:%f out of range[%f, %f], use last src:%s value:%f", pItem->name, uError("cfg:%s, type:%s src:%s value:%f out of range[%f, %f]", pItem->name, cfgDtypeStr(pItem->dtype),
cfgDtypeStr(pItem->dtype), cfgStypeStr(stype), fval, pItem->fmin, pItem->fmax, cfgStypeStr(pItem->stype), cfgStypeStr(stype), fval, pItem->fmin, pItem->fmax);
pItem->fval);
terrno = TSDB_CODE_OUT_OF_RANGE; terrno = TSDB_CODE_OUT_OF_RANGE;
return -1; return -1;
} }
...@@ -219,8 +209,8 @@ static int32_t cfgSetString(SConfigItem *pItem, const char *value, ECfgSrcType s ...@@ -219,8 +209,8 @@ static int32_t cfgSetString(SConfigItem *pItem, const char *value, ECfgSrcType s
char *tmp = strdup(value); char *tmp = strdup(value);
if (tmp == NULL) { if (tmp == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
uError("cfg:%s, type:%s src:%s value:%s failed to dup since %s, use last src:%s value:%s", pItem->name, uError("cfg:%s, type:%s src:%s value:%s failed to dup since %s", pItem->name, cfgDtypeStr(pItem->dtype),
cfgDtypeStr(pItem->dtype), cfgStypeStr(stype), value, terrstr(), cfgStypeStr(pItem->stype), pItem->str); cfgStypeStr(stype), value, terrstr());
return -1; return -1;
} }
...@@ -232,9 +222,8 @@ static int32_t cfgSetString(SConfigItem *pItem, const char *value, ECfgSrcType s ...@@ -232,9 +222,8 @@ static int32_t cfgSetString(SConfigItem *pItem, const char *value, ECfgSrcType s
static int32_t cfgSetDir(SConfigItem *pItem, const char *value, ECfgSrcType stype) { static int32_t cfgSetDir(SConfigItem *pItem, const char *value, ECfgSrcType stype) {
if (cfgCheckAndSetDir(pItem, value) != 0) { if (cfgCheckAndSetDir(pItem, value) != 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY; uError("cfg:%s, type:%s src:%s value:%s failed to dup since %s", pItem->name, cfgDtypeStr(pItem->dtype),
uError("cfg:%s, type:%s src:%s value:%s failed to dup since %s, use last src:%s value:%s", pItem->name, cfgStypeStr(stype), value, terrstr());
cfgDtypeStr(pItem->dtype), cfgStypeStr(stype), value, terrstr(), cfgStypeStr(pItem->stype), pItem->str);
return -1; return -1;
} }
...@@ -245,8 +234,8 @@ static int32_t cfgSetDir(SConfigItem *pItem, const char *value, ECfgSrcType styp ...@@ -245,8 +234,8 @@ static int32_t cfgSetDir(SConfigItem *pItem, const char *value, ECfgSrcType styp
static int32_t cfgSetLocale(SConfigItem *pItem, const char *value, ECfgSrcType stype) { static int32_t cfgSetLocale(SConfigItem *pItem, const char *value, ECfgSrcType stype) {
if (cfgCheckAndSetLocale(pItem, value) != 0) { if (cfgCheckAndSetLocale(pItem, value) != 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
uError("cfg:%s, type:%s src:%s value:%s failed to dup since %s, use last src:%s value:%s", pItem->name, uError("cfg:%s, type:%s src:%s value:%s failed to dup since %s", pItem->name, cfgDtypeStr(pItem->dtype),
cfgDtypeStr(pItem->dtype), cfgStypeStr(stype), value, terrstr(), cfgStypeStr(pItem->stype), pItem->str); cfgStypeStr(stype), value, terrstr());
return -1; return -1;
} }
...@@ -257,8 +246,8 @@ static int32_t cfgSetLocale(SConfigItem *pItem, const char *value, ECfgSrcType s ...@@ -257,8 +246,8 @@ static int32_t cfgSetLocale(SConfigItem *pItem, const char *value, ECfgSrcType s
static int32_t cfgSetCharset(SConfigItem *pItem, const char *value, ECfgSrcType stype) { static int32_t cfgSetCharset(SConfigItem *pItem, const char *value, ECfgSrcType stype) {
if (cfgCheckAndSetCharset(pItem, value) != 0) { if (cfgCheckAndSetCharset(pItem, value) != 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
uError("cfg:%s, type:%s src:%s value:%s failed to dup since %s, use last src:%s value:%s", pItem->name, uError("cfg:%s, type:%s src:%s value:%s failed to dup since %s", pItem->name, cfgDtypeStr(pItem->dtype),
cfgDtypeStr(pItem->dtype), cfgStypeStr(stype), value, terrstr(), cfgStypeStr(pItem->stype), pItem->str); cfgStypeStr(stype), value, terrstr());
return -1; return -1;
} }
...@@ -269,8 +258,8 @@ static int32_t cfgSetCharset(SConfigItem *pItem, const char *value, ECfgSrcType ...@@ -269,8 +258,8 @@ static int32_t cfgSetCharset(SConfigItem *pItem, const char *value, ECfgSrcType
static int32_t cfgSetTimezone(SConfigItem *pItem, const char *value, ECfgSrcType stype) { static int32_t cfgSetTimezone(SConfigItem *pItem, const char *value, ECfgSrcType stype) {
if (cfgCheckAndSetTimezone(pItem, value) != 0) { if (cfgCheckAndSetTimezone(pItem, value) != 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
uError("cfg:%s, type:%s src:%s value:%s failed to dup since %s, use last src:%s value:%s", pItem->name, uError("cfg:%s, type:%s src:%s value:%s failed to dup since %s", pItem->name, cfgDtypeStr(pItem->dtype),
cfgDtypeStr(pItem->dtype), cfgStypeStr(stype), value, terrstr(), cfgStypeStr(pItem->stype), pItem->str); cfgStypeStr(stype), value, terrstr());
return -1; return -1;
} }
...@@ -606,15 +595,19 @@ int32_t cfgLoadFromCfgFile(SConfig *pConfig, const char *filepath) { ...@@ -606,15 +595,19 @@ int32_t cfgLoadFromCfgFile(SConfig *pConfig, const char *filepath) {
char *line = NULL, *name, *value, *value2, *value3; char *line = NULL, *name, *value, *value2, *value3;
int32_t olen, vlen, vlen2, vlen3; int32_t olen, vlen, vlen2, vlen3;
ssize_t _bytes = 0; ssize_t _bytes = 0;
int32_t code = 0;
if (taosIsDir(filepath)) {
return -1;
}
TdFilePtr pFile = taosOpenFile(filepath, TD_FILE_READ | TD_FILE_STREAM); TdFilePtr pFile = taosOpenFile(filepath, TD_FILE_READ | TD_FILE_STREAM);
if (pFile == NULL) { if (pFile == NULL) {
terrno = TAOS_SYSTEM_ERROR(errno); // success when the file does not exist
return -1; if (errno == ENOENT) {
terrno = TAOS_SYSTEM_ERROR(errno);
uInfo("failed to load from cfg file %s since %s, use default parameters", filepath, terrstr());
return 0;
} else {
uError("failed to load from cfg file %s since %s", filepath, terrstr());
return -1;
}
} }
while (!taosEOFFile(pFile)) { while (!taosEOFFile(pFile)) {
...@@ -643,17 +636,24 @@ int32_t cfgLoadFromCfgFile(SConfig *pConfig, const char *filepath) { ...@@ -643,17 +636,24 @@ int32_t cfgLoadFromCfgFile(SConfig *pConfig, const char *filepath) {
if (vlen3 != 0) value3[vlen3] = 0; if (vlen3 != 0) value3[vlen3] = 0;
} }
cfgSetItem(pConfig, name, value, CFG_STYPE_CFG_FILE); code = cfgSetItem(pConfig, name, value, CFG_STYPE_CFG_FILE);
if (code != 0 && terrno != TSDB_CODE_CFG_NOT_FOUND) break;
if (value2 != NULL && value3 != NULL && value2[0] != 0 && value3[0] != 0 && strcasecmp(name, "dataDir") == 0) { if (value2 != NULL && value3 != NULL && value2[0] != 0 && value3[0] != 0 && strcasecmp(name, "dataDir") == 0) {
cfgSetTfsItem(pConfig, name, value, value2, value3, CFG_STYPE_CFG_FILE); code = cfgSetTfsItem(pConfig, name, value, value2, value3, CFG_STYPE_CFG_FILE);
if (code != 0 && terrno != TSDB_CODE_CFG_NOT_FOUND) break;
} }
} }
taosCloseFile(&pFile); taosCloseFile(&pFile);
if (line != NULL) taosMemoryFreeClear(line); if (line != NULL) taosMemoryFreeClear(line);
uInfo("load from cfg file %s success", filepath); if (code == 0 || (code != 0 && terrno == TSDB_CODE_CFG_NOT_FOUND)) {
return 0; uInfo("load from cfg file %s success", filepath);
return 0;
} else {
uError("failed to load from cfg file %s since %s", filepath, terrstr());
return -1;
}
} }
int32_t cfgLoadFromApollUrl(SConfig *pConfig, const char *url) { int32_t cfgLoadFromApollUrl(SConfig *pConfig, const char *url) {
......
...@@ -137,24 +137,23 @@ static void taosStopLog() { ...@@ -137,24 +137,23 @@ static void taosStopLog() {
} }
} }
static void taosLogBuffDestroy() {
taosThreadMutexDestroy(&tsLogObj.logHandle->buffMutex);
taosCloseFile(&tsLogObj.logHandle->pFile);
taosMemoryFreeClear(tsLogObj.logHandle->buffer);
memset(&tsLogObj.logHandle->buffer, 0, sizeof(tsLogObj.logHandle->buffer));
taosThreadMutexDestroy(&tsLogObj.logMutex);
taosMemoryFreeClear(tsLogObj.logHandle);
memset(&tsLogObj.logHandle, 0, sizeof(tsLogObj.logHandle));
tsLogObj.logHandle = NULL;
}
void taosCloseLog() { void taosCloseLog() {
taosStopLog(); if (tsLogObj.logHandle != NULL) {
if (taosCheckPthreadValid(tsLogObj.logHandle->asyncThread)) { taosStopLog();
taosThreadJoin(tsLogObj.logHandle->asyncThread, NULL); if (tsLogObj.logHandle != NULL && taosCheckPthreadValid(tsLogObj.logHandle->asyncThread)) {
taosThreadJoin(tsLogObj.logHandle->asyncThread, NULL);
}
tsLogInited = 0;
taosThreadMutexDestroy(&tsLogObj.logHandle->buffMutex);
taosCloseFile(&tsLogObj.logHandle->pFile);
taosMemoryFreeClear(tsLogObj.logHandle->buffer);
memset(&tsLogObj.logHandle->buffer, 0, sizeof(tsLogObj.logHandle->buffer));
taosThreadMutexDestroy(&tsLogObj.logMutex);
taosMemoryFreeClear(tsLogObj.logHandle);
memset(&tsLogObj.logHandle, 0, sizeof(tsLogObj.logHandle));
tsLogObj.logHandle = NULL;
} }
tsLogInited = 0;
taosLogBuffDestroy(tsLogObj.logHandle);
} }
static bool taosLockLogFile(TdFilePtr pFile) { static bool taosLockLogFile(TdFilePtr pFile) {
......
...@@ -154,7 +154,8 @@ static void taosProcCleanupQueue(SProcQueue *pQueue) { ...@@ -154,7 +154,8 @@ static void taosProcCleanupQueue(SProcQueue *pQueue) {
} }
static int32_t taosProcQueuePush(SProcObj *pProc, SProcQueue *pQueue, const char *pHead, int16_t rawHeadLen, static int32_t taosProcQueuePush(SProcObj *pProc, SProcQueue *pQueue, const char *pHead, int16_t rawHeadLen,
const char *pBody, int32_t rawBodyLen, int64_t handle, EProcFuncType ftype) { const char *pBody, int32_t rawBodyLen, int64_t handle, int64_t handleRef,
EProcFuncType ftype) {
if (rawHeadLen == 0 || pHead == NULL) { if (rawHeadLen == 0 || pHead == NULL) {
terrno = TSDB_CODE_INVALID_PARA; terrno = TSDB_CODE_INVALID_PARA;
return -1; return -1;
...@@ -172,7 +173,7 @@ static int32_t taosProcQueuePush(SProcObj *pProc, SProcQueue *pQueue, const char ...@@ -172,7 +173,7 @@ static int32_t taosProcQueuePush(SProcObj *pProc, SProcQueue *pQueue, const char
} }
if (handle != 0 && ftype == PROC_FUNC_REQ) { if (handle != 0 && ftype == PROC_FUNC_REQ) {
if (taosHashPut(pProc->hash, &handle, sizeof(int64_t), &handle, sizeof(int64_t)) != 0) { if (taosHashPut(pProc->hash, &handle, sizeof(int64_t), &handleRef, sizeof(int64_t)) != 0) {
taosThreadMutexUnlock(&pQueue->mutex); taosThreadMutexUnlock(&pQueue->mutex);
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1; return -1;
...@@ -286,13 +287,13 @@ static int32_t taosProcQueuePop(SProcQueue *pQueue, void **ppHead, int16_t *pHea ...@@ -286,13 +287,13 @@ static int32_t taosProcQueuePop(SProcQueue *pQueue, void **ppHead, int16_t *pHea
pQueue->head = headLen + bodyLen; pQueue->head = headLen + bodyLen;
} else if (remain < 8 + headLen) { } else if (remain < 8 + headLen) {
memcpy(pHead, pQueue->pBuffer + pQueue->head + 8, remain - 8); memcpy(pHead, pQueue->pBuffer + pQueue->head + 8, remain - 8);
memcpy((char*)pHead + remain - 8, pQueue->pBuffer, headLen - (remain - 8)); memcpy((char *)pHead + remain - 8, pQueue->pBuffer, headLen - (remain - 8));
memcpy(pBody, pQueue->pBuffer + headLen - (remain - 8), bodyLen); memcpy(pBody, pQueue->pBuffer + headLen - (remain - 8), bodyLen);
pQueue->head = headLen - (remain - 8) + bodyLen; pQueue->head = headLen - (remain - 8) + bodyLen;
} else if (remain < 8 + headLen + bodyLen) { } else if (remain < 8 + headLen + bodyLen) {
memcpy(pHead, pQueue->pBuffer + pQueue->head + 8, headLen); memcpy(pHead, pQueue->pBuffer + pQueue->head + 8, headLen);
memcpy(pBody, pQueue->pBuffer + pQueue->head + 8 + headLen, remain - 8 - headLen); memcpy(pBody, pQueue->pBuffer + pQueue->head + 8 + headLen, remain - 8 - headLen);
memcpy((char*)pBody + remain - 8 - headLen, pQueue->pBuffer, bodyLen - (remain - 8 - headLen)); memcpy((char *)pBody + remain - 8 - headLen, pQueue->pBuffer, bodyLen - (remain - 8 - headLen));
pQueue->head = bodyLen - (remain - 8 - headLen); pQueue->head = bodyLen - (remain - 8 - headLen);
} else { } else {
memcpy(pHead, pQueue->pBuffer + pQueue->head + 8, headLen); memcpy(pHead, pQueue->pBuffer + pQueue->head + 8, headLen);
...@@ -454,19 +455,25 @@ void taosProcCleanup(SProcObj *pProc) { ...@@ -454,19 +455,25 @@ void taosProcCleanup(SProcObj *pProc) {
} }
int32_t taosProcPutToChildQ(SProcObj *pProc, const void *pHead, int16_t headLen, const void *pBody, int32_t bodyLen, int32_t taosProcPutToChildQ(SProcObj *pProc, const void *pHead, int16_t headLen, const void *pBody, int32_t bodyLen,
void *handle, EProcFuncType ftype) { void *handle, int64_t handleRef, EProcFuncType ftype) {
if (ftype != PROC_FUNC_REQ) { if (ftype != PROC_FUNC_REQ) {
terrno = TSDB_CODE_INVALID_PARA; terrno = TSDB_CODE_INVALID_PARA;
return -1; return -1;
} }
return taosProcQueuePush(pProc, pProc->pChildQueue, pHead, headLen, pBody, bodyLen, (int64_t)handle, ftype); return taosProcQueuePush(pProc, pProc->pChildQueue, pHead, headLen, pBody, bodyLen, (int64_t)handle, handleRef,
ftype);
} }
void taosProcRemoveHandle(SProcObj *pProc, void *handle) { int64_t taosProcRemoveHandle(SProcObj *pProc, void *handle) {
int64_t h = (int64_t)handle; int64_t h = (int64_t)handle;
taosThreadMutexLock(&pProc->pChildQueue->mutex); taosThreadMutexLock(&pProc->pChildQueue->mutex);
int64_t *handleRef = taosHashGet(pProc->hash, &h, sizeof(int64_t));
taosHashRemove(pProc->hash, &h, sizeof(int64_t)); taosHashRemove(pProc->hash, &h, sizeof(int64_t));
taosThreadMutexUnlock(&pProc->pChildQueue->mutex); taosThreadMutexUnlock(&pProc->pChildQueue->mutex);
if (handleRef == NULL) return 0;
return *handleRef;
} }
void taosProcCloseHandles(SProcObj *pProc, void (*HandleFp)(void *handle)) { void taosProcCloseHandles(SProcObj *pProc, void (*HandleFp)(void *handle)) {
...@@ -484,7 +491,7 @@ void taosProcCloseHandles(SProcObj *pProc, void (*HandleFp)(void *handle)) { ...@@ -484,7 +491,7 @@ void taosProcCloseHandles(SProcObj *pProc, void (*HandleFp)(void *handle)) {
void taosProcPutToParentQ(SProcObj *pProc, const void *pHead, int16_t headLen, const void *pBody, int32_t bodyLen, void taosProcPutToParentQ(SProcObj *pProc, const void *pHead, int16_t headLen, const void *pBody, int32_t bodyLen,
EProcFuncType ftype) { EProcFuncType ftype) {
int32_t retry = 0; int32_t retry = 0;
while (taosProcQueuePush(pProc, pProc->pParentQueue, pHead, headLen, pBody, bodyLen, 0, ftype) != 0) { while (taosProcQueuePush(pProc, pProc->pParentQueue, pHead, headLen, pBody, bodyLen, 0, 0, ftype) != 0) {
uWarn("proc:%s, failed to put to queue:%p since %s, retry:%d", pProc->name, pProc->pParentQueue, terrstr(), retry); uWarn("proc:%s, failed to put to queue:%p since %s, retry:%d", pProc->name, pProc->pParentQueue, terrstr(), retry);
retry++; retry++;
taosMsleep(retry); taosMsleep(retry);
......
...@@ -120,20 +120,20 @@ TEST_F(UtilTesProc, 01_Push_Pop_Child) { ...@@ -120,20 +120,20 @@ TEST_F(UtilTesProc, 01_Push_Pop_Child) {
SProcObj *cproc = taosProcInit(&cfg); SProcObj *cproc = taosProcInit(&cfg);
ASSERT_NE(cproc, nullptr); ASSERT_NE(cproc, nullptr);
ASSERT_NE(taosProcPutToChildQ(cproc, &head, 0, body, 0, 0, PROC_FUNC_RSP), 0); ASSERT_NE(taosProcPutToChildQ(cproc, &head, 0, body, 0, 0, 0, PROC_FUNC_RSP), 0);
ASSERT_NE(taosProcPutToChildQ(cproc, &head, 0, body, 0, 0, PROC_FUNC_REGIST), 0); ASSERT_NE(taosProcPutToChildQ(cproc, &head, 0, body, 0, 0, 0, PROC_FUNC_REGIST), 0);
ASSERT_NE(taosProcPutToChildQ(cproc, &head, 0, body, 0, 0, PROC_FUNC_RELEASE), 0); ASSERT_NE(taosProcPutToChildQ(cproc, &head, 0, body, 0, 0, 0, PROC_FUNC_RELEASE), 0);
ASSERT_NE(taosProcPutToChildQ(cproc, NULL, 12, body, 0, 0, PROC_FUNC_REQ), 0); ASSERT_NE(taosProcPutToChildQ(cproc, NULL, 12, body, 0, 0, 0, PROC_FUNC_REQ), 0);
ASSERT_NE(taosProcPutToChildQ(cproc, &head, 0, body, 0, 0, PROC_FUNC_REQ), 0); ASSERT_NE(taosProcPutToChildQ(cproc, &head, 0, body, 0, 0, 0, PROC_FUNC_REQ), 0);
ASSERT_NE(taosProcPutToChildQ(cproc, &head, shm.size, body, 0, 0, PROC_FUNC_REQ), 0); ASSERT_NE(taosProcPutToChildQ(cproc, &head, shm.size, body, 0, 0, 0, PROC_FUNC_REQ), 0);
ASSERT_NE(taosProcPutToChildQ(cproc, &head, sizeof(STestMsg), body, shm.size, 0, PROC_FUNC_REQ), 0); ASSERT_NE(taosProcPutToChildQ(cproc, &head, sizeof(STestMsg), body, shm.size, 0, 0, PROC_FUNC_REQ), 0);
for (int32_t j = 0; j < 1000; j++) { for (int32_t j = 0; j < 1000; j++) {
int32_t i = 0; int32_t i = 0;
for (i = 0; i < 20; ++i) { for (i = 0; i < 20; ++i) {
ASSERT_EQ(taosProcPutToChildQ(cproc, &head, sizeof(STestMsg), body, i, 0, PROC_FUNC_REQ), 0); ASSERT_EQ(taosProcPutToChildQ(cproc, &head, sizeof(STestMsg), body, i, 0, 0, PROC_FUNC_REQ), 0);
} }
ASSERT_NE(taosProcPutToChildQ(cproc, &head, sizeof(STestMsg), body, i, 0, PROC_FUNC_REQ), 0); ASSERT_NE(taosProcPutToChildQ(cproc, &head, sizeof(STestMsg), body, i, 0, 0, PROC_FUNC_REQ), 0);
cfg.isChild = true; cfg.isChild = true;
cfg.name = "1235_p"; cfg.name = "1235_p";
...@@ -236,7 +236,7 @@ TEST_F(UtilTesProc, 03_Handle) { ...@@ -236,7 +236,7 @@ TEST_F(UtilTesProc, 03_Handle) {
int32_t i = 0; int32_t i = 0;
for (i = 0; i < 20; ++i) { for (i = 0; i < 20; ++i) {
head.handle = (void *)((int64_t)i); head.handle = (void *)((int64_t)i);
ASSERT_EQ(taosProcPutToChildQ(cproc, &head, sizeof(STestMsg), body, i, (void *)((int64_t)i), PROC_FUNC_REQ), 0); ASSERT_EQ(taosProcPutToChildQ(cproc, &head, sizeof(STestMsg), body, i, (void *)((int64_t)i), i, PROC_FUNC_REQ), 0);
} }
cfg.isChild = true; cfg.isChild = true;
...@@ -246,9 +246,14 @@ TEST_F(UtilTesProc, 03_Handle) { ...@@ -246,9 +246,14 @@ TEST_F(UtilTesProc, 03_Handle) {
taosProcRun(pproc); taosProcRun(pproc);
taosProcCleanup(pproc); taosProcCleanup(pproc);
taosProcRemoveHandle(cproc, (void *)((int64_t)3)); int64_t ref = 0;
taosProcRemoveHandle(cproc, (void *)((int64_t)5));
taosProcRemoveHandle(cproc, (void *)((int64_t)6)); ref = taosProcRemoveHandle(cproc, (void *)((int64_t)3));
EXPECT_EQ(ref, 3);
ref = taosProcRemoveHandle(cproc, (void *)((int64_t)5));
EXPECT_EQ(ref, 5);
ref = taosProcRemoveHandle(cproc, (void *)((int64_t)6));
EXPECT_EQ(ref, 6);
taosProcCloseHandles(cproc, processHandle); taosProcCloseHandles(cproc, processHandle);
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册