diff --git a/cmake/cmake.define b/cmake/cmake.define index 3fd96fd9b69daa7bbf24ffb4f47f4f54400728b6..094eb4a2dab07a484504d4a4fe8175b4a8eb269e 100644 --- a/cmake/cmake.define +++ b/cmake/cmake.define @@ -66,7 +66,7 @@ ENDIF () IF (TD_WINDOWS) MESSAGE("${Yellow} set compiler flag for Windows! ${ColourReset}") SET(COMMON_FLAGS "/W3 /D_WIN32") - + SET(CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} /MANIFEST:NO") # IF (MSVC AND (MSVC_VERSION GREATER_EQUAL 1900)) # SET(COMMON_FLAGS "${COMMON_FLAGS} /Wv:18") # ENDIF () diff --git a/source/dnode/mgmt/implement/src/dmHandle.c b/source/dnode/mgmt/implement/src/dmHandle.c index ca1b943fb22b478533036236352cb9907ae1ac8d..32205b337c978dfa20f3afa0a8455c22977d3ce2 100644 --- a/source/dnode/mgmt/implement/src/dmHandle.c +++ b/source/dnode/mgmt/implement/src/dmHandle.c @@ -241,7 +241,11 @@ static int32_t dmSpawnUdfd(SDnode *pDnode) { strncpy(path, tsProcPath, strlen(tsProcPath)); taosDirName(path); } +#ifdef WINDOWS + strcat(path, "udfd.exe"); +#else strcat(path, "/udfd"); +#endif char* argsUdfd[] = {path, "-c", configDir, NULL}; options.args = argsUdfd; options.file = path; diff --git a/source/dnode/mgmt/interface/inc/dmDef.h b/source/dnode/mgmt/interface/inc/dmDef.h index 087892e7413f291312fa2dd162a73d1e1d044f12..2e8ad982d87cf1d94aca8ad59b8fe26fdf5862ae 100644 --- a/source/dnode/mgmt/interface/inc/dmDef.h +++ b/source/dnode/mgmt/interface/inc/dmDef.h @@ -41,6 +41,8 @@ #include "monitor.h" #include "sync.h" +#include "libs/function/function.h" + #ifdef __cplusplus extern "C" { #endif diff --git a/source/dnode/mgmt/interface/src/dmEnv.c b/source/dnode/mgmt/interface/src/dmEnv.c index 92a6e018cb336195f6fa7b3b130b7a8307a97101..2c836714cea3fd22c2c3fb647b74b00998a0ebbf 100644 --- a/source/dnode/mgmt/interface/src/dmEnv.c +++ b/source/dnode/mgmt/interface/src/dmEnv.c @@ -55,6 +55,7 @@ void dmCleanup() { monCleanup(); syncCleanUp(); walCleanUp(); + udfcClose(); taosStopCacheRefreshWorker(); dInfo("dnode env is cleaned up"); } diff --git a/source/dnode/mgmt/mgmt_vnode/inc/vmInt.h b/source/dnode/mgmt/mgmt_vnode/inc/vmInt.h index fa5e1d78a0b8227a3886a13633359f755171703b..51b38604613d60fc817bbb1a91926a5eb9aca2cc 100644 --- a/source/dnode/mgmt/mgmt_vnode/inc/vmInt.h +++ b/source/dnode/mgmt/mgmt_vnode/inc/vmInt.h @@ -29,15 +29,15 @@ typedef struct SVnodesMgmt { SHashObj *hash; SRWLatch latch; SVnodesStat state; + const char *path; + SDnode *pDnode; + SMgmtWrapper *pWrapper; STfs *pTfs; SQWorkerPool queryPool; SQWorkerPool fetchPool; SWWorkerPool syncPool; SWWorkerPool writePool; SWWorkerPool mergePool; - const char *path; - SDnode *pDnode; - SMgmtWrapper *pWrapper; SSingleWorker mgmtWorker; SSingleWorker monitorWorker; } SVnodesMgmt; @@ -95,9 +95,9 @@ int32_t vmProcessGetVnodeLoadsReq(SMgmtWrapper *pWrapper, SNodeMsg *pReq); void vmGetVnodeLoads(SMgmtWrapper *pWrapper, SMonVloadInfo *pInfo); // vmFile.c -int32_t vmGetVnodesFromFile(SVnodesMgmt *pMgmt, SWrapperCfg **ppCfgs, int32_t *numOfVnodes); -int32_t vmWriteVnodesToFile(SVnodesMgmt *pMgmt); -SVnodeObj **vmGetVnodesFromHash(SVnodesMgmt *pMgmt, int32_t *numOfVnodes); +int32_t vmGetVnodeListFromFile(SVnodesMgmt *pMgmt, SWrapperCfg **ppCfgs, int32_t *numOfVnodes); +int32_t vmWriteVnodeListToFile(SVnodesMgmt *pMgmt); +SVnodeObj **vmGetVnodeListFromHash(SVnodesMgmt *pMgmt, int32_t *numOfVnodes); // vmWorker.c int32_t vmStartWorker(SVnodesMgmt *pMgmt); @@ -105,11 +105,12 @@ void vmStopWorker(SVnodesMgmt *pMgmt); int32_t vmAllocQueue(SVnodesMgmt *pMgmt, SVnodeObj *pVnode); void vmFreeQueue(SVnodesMgmt *pMgmt, SVnodeObj *pVnode); -int32_t vmPutMsgToSyncQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc); // sync integration -int32_t vmPutMsgToWriteQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc); +int32_t vmPutMsgToWriteQueue(SMgmtWrapper *pWrapper, SRpcMsg *pMsg); +int32_t vmPutMsgToSyncQueue(SMgmtWrapper *pWrapper, SRpcMsg *pMsg); +int32_t vmPutMsgToApplyQueue(SMgmtWrapper *pWrapper, SRpcMsg *pMsg); int32_t vmPutMsgToQueryQueue(SMgmtWrapper *pWrapper, SRpcMsg *pMsg); int32_t vmPutMsgToFetchQueue(SMgmtWrapper *pWrapper, SRpcMsg *pMsg); -int32_t vmPutMsgToApplyQueue(SMgmtWrapper *pWrapper, SRpcMsg *pMsg); +int32_t vmPutMsgToMergeQueue(SMgmtWrapper *pWrapper, SRpcMsg *pMsg); int32_t vmGetQueueSize(SMgmtWrapper *pWrapper, int32_t vgId, EQueueType qtype); int32_t vmProcessWriteMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmFile.c b/source/dnode/mgmt/mgmt_vnode/src/vmFile.c index 7e00a022b2dbf755bd95d089f31cd7d7211f137c..f251dd120e8e690c64045cbdd1b88ebb32f927e0 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmFile.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmFile.c @@ -16,7 +16,7 @@ #define _DEFAULT_SOURCE #include "vmInt.h" -SVnodeObj **vmGetVnodesFromHash(SVnodesMgmt *pMgmt, int32_t *numOfVnodes) { +SVnodeObj **vmGetVnodeListFromHash(SVnodesMgmt *pMgmt, int32_t *numOfVnodes) { taosRLockLatch(&pMgmt->latch); int32_t num = 0; @@ -44,14 +44,14 @@ SVnodeObj **vmGetVnodesFromHash(SVnodesMgmt *pMgmt, int32_t *numOfVnodes) { return pVnodes; } -int32_t vmGetVnodesFromFile(SVnodesMgmt *pMgmt, SWrapperCfg **ppCfgs, int32_t *numOfVnodes) { +int32_t vmGetVnodeListFromFile(SVnodesMgmt *pMgmt, SWrapperCfg **ppCfgs, int32_t *numOfVnodes) { int32_t code = TSDB_CODE_INVALID_JSON_FORMAT; int32_t len = 0; int32_t maxLen = 30000; char *content = taosMemoryCalloc(1, maxLen + 1); cJSON *root = NULL; FILE *fp = NULL; - char file[PATH_MAX]; + char file[PATH_MAX] = {0}; SWrapperCfg *pCfgs = NULL; TdFilePtr pFile = NULL; @@ -61,26 +61,26 @@ int32_t vmGetVnodesFromFile(SVnodesMgmt *pMgmt, SWrapperCfg **ppCfgs, int32_t *n if (pFile == NULL) { dDebug("file %s not exist", file); code = 0; - goto PRASE_VNODE_OVER; + goto _OVER; } len = (int32_t)taosReadFile(pFile, content, maxLen); if (len <= 0) { dError("failed to read %s since content is null", file); - goto PRASE_VNODE_OVER; + goto _OVER; } content[len] = 0; root = cJSON_Parse(content); if (root == NULL) { dError("failed to read %s since invalid json format", file); - goto PRASE_VNODE_OVER; + goto _OVER; } cJSON *vnodes = cJSON_GetObjectItem(root, "vnodes"); if (!vnodes || vnodes->type != cJSON_Array) { dError("failed to read %s since vnodes not found", file); - goto PRASE_VNODE_OVER; + goto _OVER; } int32_t vnodesNum = cJSON_GetArraySize(vnodes); @@ -88,7 +88,7 @@ int32_t vmGetVnodesFromFile(SVnodesMgmt *pMgmt, SWrapperCfg **ppCfgs, int32_t *n pCfgs = taosMemoryCalloc(vnodesNum, sizeof(SWrapperCfg)); if (pCfgs == NULL) { dError("failed to read %s since out of memory", file); - goto PRASE_VNODE_OVER; + goto _OVER; } for (int32_t i = 0; i < vnodesNum; ++i) { @@ -98,7 +98,7 @@ int32_t vmGetVnodesFromFile(SVnodesMgmt *pMgmt, SWrapperCfg **ppCfgs, int32_t *n cJSON *vgId = cJSON_GetObjectItem(vnode, "vgId"); if (!vgId || vgId->type != cJSON_Number) { dError("failed to read %s since vgId not found", file); - goto PRASE_VNODE_OVER; + goto _OVER; } pCfg->vgId = vgId->valueint; snprintf(pCfg->path, sizeof(pCfg->path), "%s%svnode%d", pMgmt->path, TD_DIRSEP, pCfg->vgId); @@ -106,28 +106,28 @@ int32_t vmGetVnodesFromFile(SVnodesMgmt *pMgmt, SWrapperCfg **ppCfgs, int32_t *n cJSON *dropped = cJSON_GetObjectItem(vnode, "dropped"); if (!dropped || dropped->type != cJSON_Number) { dError("failed to read %s since dropped not found", file); - goto PRASE_VNODE_OVER; + goto _OVER; } pCfg->dropped = dropped->valueint; cJSON *vgVersion = cJSON_GetObjectItem(vnode, "vgVersion"); if (!vgVersion || vgVersion->type != cJSON_Number) { dError("failed to read %s since vgVersion not found", file); - goto PRASE_VNODE_OVER; + goto _OVER; } pCfg->vgVersion = vgVersion->valueint; cJSON *dbUid = cJSON_GetObjectItem(vnode, "dbUid"); if (!dbUid || dbUid->type != cJSON_String) { dError("failed to read %s since dbUid not found", file); - goto PRASE_VNODE_OVER; + goto _OVER; } pCfg->dbUid = atoll(dbUid->valuestring); cJSON *db = cJSON_GetObjectItem(vnode, "db"); if (!db || db->type != cJSON_String) { dError("failed to read %s since db not found", file); - goto PRASE_VNODE_OVER; + goto _OVER; } tstrncpy(pCfg->db, db->valuestring, TSDB_DB_FNAME_LEN); } @@ -139,7 +139,7 @@ int32_t vmGetVnodesFromFile(SVnodesMgmt *pMgmt, SWrapperCfg **ppCfgs, int32_t *n code = 0; dInfo("succcessed to read file %s", file); -PRASE_VNODE_OVER: +_OVER: if (content != NULL) taosMemoryFree(content); if (root != NULL) cJSON_Delete(root); if (pFile != NULL) taosCloseFile(&pFile); @@ -148,7 +148,7 @@ PRASE_VNODE_OVER: return code; } -int32_t vmWriteVnodesToFile(SVnodesMgmt *pMgmt) { +int32_t vmWriteVnodeListToFile(SVnodesMgmt *pMgmt) { char file[PATH_MAX]; char realfile[PATH_MAX]; snprintf(file, sizeof(file), "%s%svnodes.json.bak", pMgmt->path, TD_DIRSEP); @@ -162,7 +162,7 @@ int32_t vmWriteVnodesToFile(SVnodesMgmt *pMgmt) { } int32_t numOfVnodes = 0; - SVnodeObj **pVnodes = vmGetVnodesFromHash(pMgmt, &numOfVnodes); + SVnodeObj **pVnodes = vmGetVnodeListFromHash(pMgmt, &numOfVnodes); int32_t len = 0; int32_t maxLen = 65536; diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c index 7fcfd5133dc3ce07405d6ebfe9cd04a9f8241321..b19602f3ffb5edd449c20b6bf28c8c48607d059e 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c @@ -16,12 +16,37 @@ #define _DEFAULT_SOURCE #include "vmInt.h" +void vmGetVnodeLoads(SMgmtWrapper *pWrapper, SMonVloadInfo *pInfo) { + SVnodesMgmt *pMgmt = pWrapper->pMgmt; + + pInfo->pVloads = taosArrayInit(pMgmt->state.totalVnodes, sizeof(SVnodeLoad)); + if (pInfo->pVloads == NULL) return; + + taosRLockLatch(&pMgmt->latch); + + void *pIter = taosHashIterate(pMgmt->hash, NULL); + while (pIter) { + SVnodeObj **ppVnode = pIter; + if (ppVnode == NULL || *ppVnode == NULL) continue; + + SVnodeObj *pVnode = *ppVnode; + SVnodeLoad vload = {0}; + vnodeGetLoad(pVnode->pImpl, &vload); + taosArrayPush(pInfo->pVloads, &vload); + pIter = taosHashIterate(pMgmt->hash, pIter); + } + + taosRUnLockLatch(&pMgmt->latch); +} + void vmGetMonitorInfo(SMgmtWrapper *pWrapper, SMonVmInfo *pInfo) { SVnodesMgmt *pMgmt = pWrapper->pMgmt; SMonVloadInfo vloads = {0}; vmGetVnodeLoads(pWrapper, &vloads); - if (vloads.pVloads == NULL) return; + + SArray *pVloads = vloads.pVloads; + if (pVloads == NULL) return; int32_t totalVnodes = 0; int32_t masterNum = 0; @@ -31,8 +56,8 @@ void vmGetMonitorInfo(SMgmtWrapper *pWrapper, SMonVmInfo *pInfo) { int64_t numOfBatchInsertReqs = 0; int64_t numOfBatchInsertSuccessReqs = 0; - for (int32_t i = 0; i < taosArrayGetSize(vloads.pVloads); ++i) { - SVnodeLoad *pLoad = taosArrayGet(vloads.pVloads, i); + for (int32_t i = 0; i < taosArrayGetSize(pVloads); ++i) { + SVnodeLoad *pLoad = taosArrayGet(pVloads, i); numOfSelectReqs += pLoad->numOfSelectReqs; numOfInsertReqs += pLoad->numOfInsertReqs; numOfInsertSuccessReqs += pLoad->numOfInsertSuccessReqs; @@ -49,10 +74,16 @@ void vmGetMonitorInfo(SMgmtWrapper *pWrapper, SMonVmInfo *pInfo) { pInfo->vstat.numOfInsertSuccessReqs = numOfInsertSuccessReqs - pMgmt->state.numOfInsertSuccessReqs; pInfo->vstat.numOfBatchInsertReqs = numOfBatchInsertReqs - pMgmt->state.numOfBatchInsertReqs; pInfo->vstat.numOfBatchInsertSuccessReqs = numOfBatchInsertSuccessReqs - pMgmt->state.numOfBatchInsertSuccessReqs; - pMgmt->state = pInfo->vstat; + pMgmt->state.totalVnodes = totalVnodes; + pMgmt->state.masterNum = masterNum; + pMgmt->state.numOfSelectReqs = numOfSelectReqs; + pMgmt->state.numOfInsertReqs = numOfInsertReqs; + pMgmt->state.numOfInsertSuccessReqs = numOfInsertSuccessReqs; + pMgmt->state.numOfBatchInsertReqs = numOfBatchInsertReqs; + pMgmt->state.numOfBatchInsertSuccessReqs = numOfBatchInsertSuccessReqs; tfsGetMonitorInfo(pMgmt->pTfs, &pInfo->tfs); - taosArrayDestroy(vloads.pVloads); + taosArrayDestroy(pVloads); } int32_t vmProcessGetMonVmInfoReq(SMgmtWrapper *pWrapper, SNodeMsg *pReq) { @@ -107,12 +138,13 @@ static void vmGenerateVnodeCfg(SCreateVnodeReq *pCreate, SVnodeCfg *pCfg) { memcpy(pCfg, &vnodeCfgDefault, sizeof(SVnodeCfg)); pCfg->vgId = pCreate->vgId; - strcpy(pCfg->dbname, pCreate->db); + tstrncpy(pCfg->dbname, pCreate->db, sizeof(pCfg->dbname)); + pCfg->dbId = pCreate->dbUid; pCfg->isWeak = true; pCfg->tsdbCfg.days = 10; - pCfg->tsdbCfg.keep2 = 3650; pCfg->tsdbCfg.keep0 = 3650; pCfg->tsdbCfg.keep1 = 3650; + pCfg->tsdbCfg.keep2 = 3650; for (size_t i = 0; i < taosArrayGetSize(pCreate->pRetensions); ++i) { memcpy(&pCfg->tsdbCfg.retentions[i], taosArrayGet(pCreate->pRetensions, i), sizeof(SRetention)); } @@ -121,30 +153,30 @@ static void vmGenerateVnodeCfg(SCreateVnodeReq *pCreate, SVnodeCfg *pCfg) { pCfg->hashEnd = pCreate->hashEnd; pCfg->hashMethod = pCreate->hashMethod; - // sync integration pCfg->syncCfg.myIndex = pCreate->selfIndex; pCfg->syncCfg.replicaNum = pCreate->replica; - memset(&(pCfg->syncCfg.nodeInfo), 0, sizeof(pCfg->syncCfg.nodeInfo)); + memset(&pCfg->syncCfg.nodeInfo, 0, sizeof(pCfg->syncCfg.nodeInfo)); for (int i = 0; i < pCreate->replica; ++i) { - (pCfg->syncCfg.nodeInfo)[i].nodePort = (pCreate->replicas)[i].port; - snprintf((pCfg->syncCfg.nodeInfo)[i].nodeFqdn, sizeof((pCfg->syncCfg.nodeInfo)[i].nodeFqdn), "%s", - (pCreate->replicas)[i].fqdn); + pCfg->syncCfg.nodeInfo[i].nodePort = pCreate->replicas[i].port; + snprintf(pCfg->syncCfg.nodeInfo[i].nodeFqdn, sizeof(pCfg->syncCfg.nodeInfo[i].nodeFqdn), "%s", + pCreate->replicas[i].fqdn); } } static void vmGenerateWrapperCfg(SVnodesMgmt *pMgmt, SCreateVnodeReq *pCreate, SWrapperCfg *pCfg) { - memcpy(pCfg->db, pCreate->db, TSDB_DB_FNAME_LEN); - pCfg->dbUid = pCreate->dbUid; - pCfg->dropped = 0; - snprintf(pCfg->path, sizeof(pCfg->path), "%s%svnode%d", pMgmt->path, TD_DIRSEP, pCreate->vgId); pCfg->vgId = pCreate->vgId; pCfg->vgVersion = pCreate->vgVersion; + pCfg->dropped = 0; + pCfg->dbUid = pCreate->dbUid; + tstrncpy(pCfg->db, pCreate->db, TSDB_DB_FNAME_LEN); + snprintf(pCfg->path, sizeof(pCfg->path), "%s%svnode%d", pMgmt->path, TD_DIRSEP, pCreate->vgId); } int32_t vmProcessCreateVnodeReq(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) { SRpcMsg *pReq = &pMsg->rpcMsg; SCreateVnodeReq createReq = {0}; - char path[TSDB_FILENAME_LEN]; + int32_t code = -1; + char path[TSDB_FILENAME_LEN] = {0}; if (tDeserializeSCreateVnodeReq(pReq->pCont, pReq->contLen, &createReq) != 0) { terrno = TSDB_CODE_INVALID_MSG; @@ -161,14 +193,13 @@ int32_t vmProcessCreateVnodeReq(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) { SVnodeObj *pVnode = vmAcquireVnode(pMgmt, createReq.vgId); if (pVnode != NULL) { - tFreeSCreateVnodeReq(&createReq); dDebug("vgId:%d, already exist", createReq.vgId); + tFreeSCreateVnodeReq(&createReq); vmReleaseVnode(pMgmt, pVnode); terrno = TSDB_CODE_NODE_ALREADY_DEPLOYED; return -1; } - // create vnode snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, vnodeCfg.vgId); if (vnodeCreate(path, &vnodeCfg, pMgmt->pTfs) < 0) { tFreeSCreateVnodeReq(&createReq); @@ -179,49 +210,43 @@ int32_t vmProcessCreateVnodeReq(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) { SMsgCb msgCb = pMgmt->pDnode->data.msgCb; msgCb.pWrapper = pMgmt->pWrapper; msgCb.queueFps[WRITE_QUEUE] = vmPutMsgToWriteQueue; + msgCb.queueFps[SYNC_QUEUE] = vmPutMsgToSyncQueue; + msgCb.queueFps[APPLY_QUEUE] = vmPutMsgToApplyQueue; msgCb.queueFps[QUERY_QUEUE] = vmPutMsgToQueryQueue; msgCb.queueFps[FETCH_QUEUE] = vmPutMsgToFetchQueue; - msgCb.queueFps[APPLY_QUEUE] = vmPutMsgToApplyQueue; - msgCb.queueFps[SYNC_QUEUE] = vmPutMsgToSyncQueue; // sync integration + msgCb.queueFps[MERGE_QUEUE] = vmPutMsgToMergeQueue; msgCb.qsizeFp = vmGetQueueSize; SVnode *pImpl = vnodeOpen(path, pMgmt->pTfs, msgCb); if (pImpl == NULL) { dError("vgId:%d, failed to create vnode since %s", createReq.vgId, terrstr()); - tFreeSCreateVnodeReq(&createReq); - return -1; + goto _OVER; } - int32_t code = vmOpenVnode(pMgmt, &wrapperCfg, pImpl); + code = vmOpenVnode(pMgmt, &wrapperCfg, pImpl); if (code != 0) { - tFreeSCreateVnodeReq(&createReq); dError("vgId:%d, failed to open vnode since %s", createReq.vgId, terrstr()); - vnodeClose(pImpl); - vnodeDestroy(path, pMgmt->pTfs); - terrno = code; - return code; + goto _OVER; } code = vnodeStart(pImpl); if (code != 0) { - tFreeSCreateVnodeReq(&createReq); dError("vgId:%d, failed to start sync since %s", createReq.vgId, terrstr()); - vnodeClose(pImpl); - vnodeDestroy(path, pMgmt->pTfs); - terrno = code; - return code; + goto _OVER; } - code = vmWriteVnodesToFile(pMgmt); + code = vmWriteVnodeListToFile(pMgmt); + if (code != 0) goto _OVER; + +_OVER: if (code != 0) { - tFreeSCreateVnodeReq(&createReq); vnodeClose(pImpl); vnodeDestroy(path, pMgmt->pTfs); - terrno = code; - return code; } - return 0; + tFreeSCreateVnodeReq(&createReq); + terrno = code; + return code; } int32_t vmProcessDropVnodeReq(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) { @@ -243,14 +268,14 @@ int32_t vmProcessDropVnodeReq(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) { } pVnode->dropped = 1; - if (vmWriteVnodesToFile(pMgmt) != 0) { + if (vmWriteVnodeListToFile(pMgmt) != 0) { pVnode->dropped = 0; vmReleaseVnode(pMgmt, pVnode); return -1; } vmCloseVnode(pMgmt, pVnode); - vmWriteVnodesToFile(pMgmt); + vmWriteVnodeListToFile(pMgmt); return 0; } @@ -287,7 +312,7 @@ void vmInitMsgHandle(SMgmtWrapper *pWrapper) { dmSetMsgHandle(pWrapper, TDMT_VND_CANCEL_SMA, vmProcessWriteMsg, DEFAULT_HANDLE); dmSetMsgHandle(pWrapper, TDMT_VND_DROP_SMA, vmProcessWriteMsg, DEFAULT_HANDLE); dmSetMsgHandle(pWrapper, TDMT_VND_SUBMIT_RSMA, vmProcessWriteMsg, DEFAULT_HANDLE); - dmSetMsgHandle(pWrapper, TDMT_VND_MQ_VG_CHANGE, (NodeMsgFp)vmProcessWriteMsg, DEFAULT_HANDLE); + dmSetMsgHandle(pWrapper, TDMT_VND_MQ_VG_CHANGE, vmProcessWriteMsg, DEFAULT_HANDLE); dmSetMsgHandle(pWrapper, TDMT_VND_CONSUME, vmProcessFetchMsg, DEFAULT_HANDLE); dmSetMsgHandle(pWrapper, TDMT_VND_TASK_DEPLOY, vmProcessWriteMsg, DEFAULT_HANDLE); dmSetMsgHandle(pWrapper, TDMT_VND_QUERY_HEARTBEAT, vmProcessFetchMsg, DEFAULT_HANDLE); @@ -300,14 +325,13 @@ void vmInitMsgHandle(SMgmtWrapper *pWrapper) { dmSetMsgHandle(pWrapper, TDMT_DND_CREATE_VNODE, vmProcessMgmtMsg, DEFAULT_HANDLE); dmSetMsgHandle(pWrapper, TDMT_DND_DROP_VNODE, vmProcessMgmtMsg, DEFAULT_HANDLE); - // sync integration - dmSetMsgHandle(pWrapper, TDMT_VND_SYNC_TIMEOUT, (NodeMsgFp)vmProcessSyncMsg, DEFAULT_HANDLE); - dmSetMsgHandle(pWrapper, TDMT_VND_SYNC_PING, (NodeMsgFp)vmProcessSyncMsg, DEFAULT_HANDLE); - dmSetMsgHandle(pWrapper, TDMT_VND_SYNC_PING_REPLY, (NodeMsgFp)vmProcessSyncMsg, DEFAULT_HANDLE); - dmSetMsgHandle(pWrapper, TDMT_VND_SYNC_CLIENT_REQUEST, (NodeMsgFp)vmProcessSyncMsg, DEFAULT_HANDLE); - dmSetMsgHandle(pWrapper, TDMT_VND_SYNC_CLIENT_REQUEST_REPLY, (NodeMsgFp)vmProcessSyncMsg, DEFAULT_HANDLE); - dmSetMsgHandle(pWrapper, TDMT_VND_SYNC_REQUEST_VOTE, (NodeMsgFp)vmProcessSyncMsg, DEFAULT_HANDLE); - dmSetMsgHandle(pWrapper, TDMT_VND_SYNC_REQUEST_VOTE_REPLY, (NodeMsgFp)vmProcessSyncMsg, DEFAULT_HANDLE); - dmSetMsgHandle(pWrapper, TDMT_VND_SYNC_APPEND_ENTRIES, (NodeMsgFp)vmProcessSyncMsg, DEFAULT_HANDLE); - dmSetMsgHandle(pWrapper, TDMT_VND_SYNC_APPEND_ENTRIES_REPLY, (NodeMsgFp)vmProcessSyncMsg, DEFAULT_HANDLE); + dmSetMsgHandle(pWrapper, TDMT_VND_SYNC_TIMEOUT, vmProcessSyncMsg, DEFAULT_HANDLE); + dmSetMsgHandle(pWrapper, TDMT_VND_SYNC_PING, vmProcessSyncMsg, DEFAULT_HANDLE); + dmSetMsgHandle(pWrapper, TDMT_VND_SYNC_PING_REPLY, vmProcessSyncMsg, DEFAULT_HANDLE); + dmSetMsgHandle(pWrapper, TDMT_VND_SYNC_CLIENT_REQUEST, vmProcessSyncMsg, DEFAULT_HANDLE); + dmSetMsgHandle(pWrapper, TDMT_VND_SYNC_CLIENT_REQUEST_REPLY, vmProcessSyncMsg, DEFAULT_HANDLE); + dmSetMsgHandle(pWrapper, TDMT_VND_SYNC_REQUEST_VOTE, vmProcessSyncMsg, DEFAULT_HANDLE); + dmSetMsgHandle(pWrapper, TDMT_VND_SYNC_REQUEST_VOTE_REPLY, vmProcessSyncMsg, DEFAULT_HANDLE); + dmSetMsgHandle(pWrapper, TDMT_VND_SYNC_APPEND_ENTRIES, vmProcessSyncMsg, DEFAULT_HANDLE); + dmSetMsgHandle(pWrapper, TDMT_VND_SYNC_APPEND_ENTRIES_REPLY, vmProcessSyncMsg, DEFAULT_HANDLE); } diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmInt.c b/source/dnode/mgmt/mgmt_vnode/src/vmInt.c index 17c7563885becbdba4a589664fe9467bea3c7b53..ab4174857a13f9aa4de629ba1d186e79bb6486fc 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmInt.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmInt.c @@ -15,7 +15,6 @@ #define _DEFAULT_SOURCE #include "vmInt.h" -#include "libs/function/function.h" SVnodeObj *vmAcquireVnode(SVnodesMgmt *pMgmt, int32_t vgId) { SVnodeObj *pVnode = NULL; @@ -55,14 +54,14 @@ int32_t vmOpenVnode(SVnodesMgmt *pMgmt, SWrapperCfg *pCfg, SVnode *pImpl) { pVnode->vgId = pCfg->vgId; pVnode->refCount = 0; + pVnode->vgVersion = pCfg->vgVersion; pVnode->dropped = 0; pVnode->accessState = TSDB_VN_ALL_ACCCESS; - pVnode->pWrapper = pMgmt->pWrapper; - pVnode->pImpl = pImpl; - pVnode->vgVersion = pCfg->vgVersion; pVnode->dbUid = pCfg->dbUid; pVnode->db = tstrdup(pCfg->db); pVnode->path = tstrdup(pCfg->path); + pVnode->pImpl = pImpl; + pVnode->pWrapper = pMgmt->pWrapper; if (pVnode->path == NULL || pVnode->db == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; @@ -78,14 +77,11 @@ int32_t vmOpenVnode(SVnodesMgmt *pMgmt, SWrapperCfg *pCfg, SVnode *pImpl) { int32_t code = taosHashPut(pMgmt->hash, &pVnode->vgId, sizeof(int32_t), &pVnode, sizeof(SVnodeObj *)); taosWUnLockLatch(&pMgmt->latch); - if (code != 0) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - } return code; } void vmCloseVnode(SVnodesMgmt *pMgmt, SVnodeObj *pVnode) { - char path[TSDB_FILENAME_LEN]; + char path[TSDB_FILENAME_LEN] = {0}; taosWLockLatch(&pMgmt->latch); taosHashRemove(pMgmt->hash, &pVnode->vgId, sizeof(int32_t)); @@ -98,6 +94,7 @@ void vmCloseVnode(SVnodesMgmt *pMgmt, SVnodeObj *pVnode) { while (!taosQueueEmpty(pVnode->pApplyQ)) taosMsleep(10); while (!taosQueueEmpty(pVnode->pQueryQ)) taosMsleep(10); while (!taosQueueEmpty(pVnode->pFetchQ)) taosMsleep(10); + while (!taosQueueEmpty(pVnode->pMergeQ)) taosMsleep(10); vmFreeQueue(pMgmt, pVnode); vnodeClose(pVnode->pImpl); @@ -116,7 +113,7 @@ void vmCloseVnode(SVnodesMgmt *pMgmt, SVnodeObj *pVnode) { taosMemoryFree(pVnode); } -static void *vmOpenVnodeFunc(void *param) { +static void *vmOpenVnodeInThread(void *param) { SVnodeThread *pThread = param; SVnodesMgmt *pMgmt = pThread->pMgmt; SDnode *pDnode = pMgmt->pDnode; @@ -136,10 +133,11 @@ static void *vmOpenVnodeFunc(void *param) { SMsgCb msgCb = pMgmt->pDnode->data.msgCb; msgCb.pWrapper = pMgmt->pWrapper; msgCb.queueFps[WRITE_QUEUE] = vmPutMsgToWriteQueue; + msgCb.queueFps[SYNC_QUEUE] = vmPutMsgToSyncQueue; + msgCb.queueFps[APPLY_QUEUE] = vmPutMsgToApplyQueue; msgCb.queueFps[QUERY_QUEUE] = vmPutMsgToQueryQueue; msgCb.queueFps[FETCH_QUEUE] = vmPutMsgToFetchQueue; - msgCb.queueFps[APPLY_QUEUE] = vmPutMsgToApplyQueue; - msgCb.queueFps[SYNC_QUEUE] = vmPutMsgToSyncQueue; // sync integration + msgCb.queueFps[MERGE_QUEUE] = vmPutMsgToMergeQueue; msgCb.qsizeFp = vmGetQueueSize; snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, pCfg->vgId); SVnode *pImpl = vnodeOpen(path, pMgmt->pTfs, msgCb); @@ -148,12 +146,10 @@ static void *vmOpenVnodeFunc(void *param) { pThread->failed++; } else { vmOpenVnode(pMgmt, pCfg, pImpl); - // vnodeStart(pImpl); dDebug("vgId:%d, is opened by thread:%d", pCfg->vgId, pThread->threadIndex); pThread->opened++; + atomic_add_fetch_32(&pMgmt->state.openVnodes, 1); } - - atomic_add_fetch_32(&pMgmt->state.openVnodes, 1); } dDebug("thread:%d, total vnodes:%d, opened:%d failed:%d", pThread->threadIndex, pThread->vnodeNum, pThread->opened, @@ -163,29 +159,24 @@ static void *vmOpenVnodeFunc(void *param) { static int32_t vmOpenVnodes(SVnodesMgmt *pMgmt) { SDnode *pDnode = pMgmt->pDnode; - taosInitRWLatch(&pMgmt->latch); pMgmt->hash = taosHashInit(TSDB_MIN_VNODES, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK); if (pMgmt->hash == NULL) { - dError("failed to init vnode hash"); terrno = TSDB_CODE_OUT_OF_MEMORY; + dError("failed to init vnode hash since %s", terrstr()); return -1; } SWrapperCfg *pCfgs = NULL; int32_t numOfVnodes = 0; - if (vmGetVnodesFromFile(pMgmt, &pCfgs, &numOfVnodes) != 0) { + if (vmGetVnodeListFromFile(pMgmt, &pCfgs, &numOfVnodes) != 0) { dInfo("failed to get vnode list from disk since %s", terrstr()); return -1; } pMgmt->state.totalVnodes = numOfVnodes; -#if 0 - int32_t threadNum = tsNumOfCores; -#else - int32_t threadNum = 1; -#endif + int32_t threadNum = 1; // tsNumOfCores; int32_t vnodesPerThread = numOfVnodes / threadNum + 1; SVnodeThread *threads = taosMemoryCalloc(threadNum, sizeof(SVnodeThread)); @@ -210,7 +201,7 @@ static int32_t vmOpenVnodes(SVnodesMgmt *pMgmt) { TdThreadAttr thAttr; taosThreadAttrInit(&thAttr); taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE); - if (taosThreadCreate(&pThread->thread, &thAttr, vmOpenVnodeFunc, pThread) != 0) { + if (taosThreadCreate(&pThread->thread, &thAttr, vmOpenVnodeInThread, pThread) != 0) { dError("thread:%d, failed to create thread to open vnode, reason:%s", pThread->threadIndex, strerror(errno)); } @@ -240,7 +231,7 @@ static void vmCloseVnodes(SVnodesMgmt *pMgmt) { dInfo("start to close all vnodes"); int32_t numOfVnodes = 0; - SVnodeObj **pVnodes = vmGetVnodesFromHash(pMgmt, &numOfVnodes); + SVnodeObj **pVnodes = vmGetVnodeListFromHash(pMgmt, &numOfVnodes); for (int32_t i = 0; i < numOfVnodes; ++i) { vmCloseVnode(pMgmt, pVnodes[i]); @@ -267,12 +258,9 @@ static void vmCleanup(SMgmtWrapper *pWrapper) { vmStopWorker(pMgmt); vnodeCleanup(); tfsClose(pMgmt->pTfs); - // walCleanUp(); taosMemoryFree(pMgmt); pWrapper->pMgmt = NULL; - // syncCleanUp(); - udfcClose(); dInfo("vnode-mgmt is cleaned up"); } @@ -313,7 +301,6 @@ static int32_t vmInit(SMgmtWrapper *pWrapper) { } dmReportStartup(pDnode, "vnode-wal", "initialized"); - // sync integration if (syncInit() != 0) { dError("failed to open sync since %s", terrstr()); return -1; @@ -381,23 +368,7 @@ static int32_t vmStart(SMgmtWrapper *pWrapper) { } static void vmStop(SMgmtWrapper *pWrapper) { -#if 0 - dDebug("vnode-mgmt start to stop"); - SVnodesMgmt *pMgmt = pWrapper->pMgmt; - taosRLockLatch(&pMgmt->latch); - - void *pIter = taosHashIterate(pMgmt->hash, NULL); - while (pIter) { - SVnodeObj **ppVnode = pIter; - if (ppVnode == NULL || *ppVnode == NULL) continue; - - SVnodeObj *pVnode = *ppVnode; - vnodeStop(pVnode->pImpl); - pIter = taosHashIterate(pMgmt->hash, pIter); - } - - taosRUnLockLatch(&pMgmt->latch); -#endif + // process inside the vnode } void vmSetMgmtFp(SMgmtWrapper *pWrapper) { @@ -413,25 +384,3 @@ void vmSetMgmtFp(SMgmtWrapper *pWrapper) { pWrapper->fp = mgmtFp; } -void vmGetVnodeLoads(SMgmtWrapper *pWrapper, SMonVloadInfo *pInfo) { - SVnodesMgmt *pMgmt = pWrapper->pMgmt; - - pInfo->pVloads = taosArrayInit(pMgmt->state.totalVnodes, sizeof(SVnodeLoad)); - if (pInfo->pVloads == NULL) return; - - taosRLockLatch(&pMgmt->latch); - - void *pIter = taosHashIterate(pMgmt->hash, NULL); - while (pIter) { - SVnodeObj **ppVnode = pIter; - if (ppVnode == NULL || *ppVnode == NULL) continue; - - SVnodeObj *pVnode = *ppVnode; - SVnodeLoad vload = {0}; - vnodeGetLoad(pVnode->pImpl, &vload); - taosArrayPush(pInfo->pVloads, &vload); - pIter = taosHashIterate(pMgmt->hash, pIter); - } - - taosRUnLockLatch(&pMgmt->latch); -} diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c index 06e24cb48aef945d5c893ad12cd9e575b744b6b5..11afe3333533165416003e6fed46cba9cd6cc399 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c @@ -14,28 +14,29 @@ */ #define _DEFAULT_SOURCE - #include "vmInt.h" #include "sync.h" #include "syncTools.h" -static inline void vmSendRsp(SMgmtWrapper *pWrapper, SNodeMsg *pMsg, int32_t code) { - SRpcMsg rsp = {.handle = pMsg->rpcMsg.handle, - .ahandle = pMsg->rpcMsg.ahandle, - .refId = pMsg->rpcMsg.refId, - .code = code, - .pCont = pMsg->pRsp, - .contLen = pMsg->rspLen}; +static inline void vmSendRsp(SNodeMsg *pMsg, int32_t code) { + SRpcMsg rsp = { + .handle = pMsg->rpcMsg.handle, + .ahandle = pMsg->rpcMsg.ahandle, + .refId = pMsg->rpcMsg.refId, + .code = code, + .pCont = pMsg->pRsp, + .contLen = pMsg->rspLen, + }; tmsgSendRsp(&rsp); } -static void vmProcessMgmtQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) { +static void vmProcessMgmtMonitorQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) { SVnodesMgmt *pMgmt = pInfo->ahandle; int32_t code = -1; tmsg_t msgType = pMsg->rpcMsg.msgType; - dTrace("msg:%p, will be processed in vnode-m queue", pMsg); + dTrace("msg:%p, will be processed in vnode-mgmt/monitor queue", pMsg); switch (msgType) { case TDMT_MON_VM_INFO: @@ -52,12 +53,12 @@ static void vmProcessMgmtQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) { break; default: terrno = TSDB_CODE_MSG_NOT_PROCESSED; - dError("msg:%p, not processed in vnode-mgmt queue", pMsg); + dError("msg:%p, not processed in vnode-mgmt/monitor queue", pMsg); } if (msgType & 1u) { if (code != 0 && terrno != 0) code = terrno; - vmSendRsp(pMgmt->pWrapper, pMsg, code); + vmSendRsp(pMsg, code); } dTrace("msg:%p, is freed, result:0x%04x:%s", pMsg, code & 0XFFFF, tstrerror(code)); @@ -71,7 +72,9 @@ static void vmProcessQueryQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) { dTrace("msg:%p, will be processed in vnode-query queue", pMsg); int32_t code = vnodeProcessQueryMsg(pVnode->pImpl, &pMsg->rpcMsg); if (code != 0) { - vmSendRsp(pVnode->pWrapper, pMsg, code); + if (terrno != 0) code = terrno; + vmSendRsp(pMsg, code); + dTrace("msg:%p, is freed, result:0x%04x:%s", pMsg, code & 0XFFFF, tstrerror(code)); rpcFreeCont(pMsg->rpcMsg.pCont); taosFreeQitem(pMsg); @@ -84,7 +87,9 @@ static void vmProcessFetchQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) { dTrace("msg:%p, will be processed in vnode-fetch queue", pMsg); int32_t code = vnodeProcessFetchMsg(pVnode->pImpl, &pMsg->rpcMsg, pInfo); if (code != 0) { - vmSendRsp(pVnode->pWrapper, pMsg, code); + if (terrno != 0) code = terrno; + vmSendRsp(pMsg, code); + dTrace("msg:%p, is freed, result:0x%04x:%s", pMsg, code & 0XFFFF, tstrerror(code)); rpcFreeCont(pMsg->rpcMsg.pCont); taosFreeQitem(pMsg); @@ -108,32 +113,10 @@ static void vmProcessWriteQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO dTrace("msg:%p, will be processed in vnode-write queue", pMsg); if (taosArrayPush(pArray, &pMsg) == NULL) { dTrace("msg:%p, failed to process since %s", pMsg, terrstr()); - vmSendRsp(pVnode->pWrapper, pMsg, TSDB_CODE_OUT_OF_MEMORY); + vmSendRsp(pMsg, TSDB_CODE_OUT_OF_MEMORY); } } -#if 0 - int64_t version; - - vnodePreprocessWriteReqs(pVnode->pImpl, pArray, &version); - - numOfMsgs = taosArrayGetSize(pArray); - for (int32_t i = 0; i < numOfMsgs; i++) { - SNodeMsg *pMsg = *(SNodeMsg **)taosArrayGet(pArray, i); - SRpcMsg *pRpc = &pMsg->rpcMsg; - - rsp.pCont = NULL; - rsp.contLen = 0; - rsp.code = 0; - rsp.handle = pRpc->handle; - rsp.ahandle = pRpc->ahandle; - rsp.refId = pRpc->refId; - - int32_t code = vnodeProcessWriteReq(pVnode->pImpl, pRpc, version++, &rsp); - tmsgSendRsp(&rsp); - } -#else - // sync integration response for (int i = 0; i < taosArrayGetSize(pArray); i++) { SNodeMsg *pMsg; SRpcMsg *pRpc; @@ -168,7 +151,6 @@ static void vmProcessWriteQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO assert(0); } } -#endif for (int32_t i = 0; i < numOfMsgs; i++) { SNodeMsg *pMsg = *(SNodeMsg **)taosArrayGet(pArray, i); @@ -186,9 +168,6 @@ static void vmProcessApplyQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO SRpcMsg rsp; for (int32_t i = 0; i < numOfMsgs; ++i) { -#if 1 - // sync integration - taosGetQitem(qall, (void **)&pMsg); // init response rpc msg @@ -219,7 +198,6 @@ static void vmProcessApplyQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO rsp.refId = pMsg->rpcMsg.refId; tmsgSendRsp(&rsp); } -#endif } } @@ -246,7 +224,9 @@ static void vmProcessMergeQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO dTrace("msg:%p, will be processed in vnode-merge queue", pMsg); int32_t code = vnodeProcessFetchMsg(pVnode->pImpl, &pMsg->rpcMsg, pInfo); if (code != 0) { - vmSendRsp(pVnode->pWrapper, pMsg, code); + if (terrno != 0) code = terrno; + vmSendRsp(pMsg, code); + dTrace("msg:%p, is freed, result:0x%04x:%s", pMsg, code & 0XFFFF, tstrerror(code)); rpcFreeCont(pMsg->rpcMsg.pCont); taosFreeQitem(pMsg); @@ -257,16 +237,17 @@ static void vmProcessMergeQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO static int32_t vmPutNodeMsgToQueue(SVnodesMgmt *pMgmt, SNodeMsg *pMsg, EQueueType qtype) { SRpcMsg *pRpc = &pMsg->rpcMsg; SMsgHead *pHead = pRpc->pCont; + int32_t code = 0; + pHead->contLen = ntohl(pHead->contLen); pHead->vgId = ntohl(pHead->vgId); SVnodeObj *pVnode = vmAcquireVnode(pMgmt, pHead->vgId); if (pVnode == NULL) { dError("vgId:%d, failed to write msg:%p to vnode-queue since %s", pHead->vgId, pMsg, terrstr()); - return terrno; + return terrno != 0 ? terrno : -1; } - int32_t code = 0; switch (qtype) { case QUERY_QUEUE: dTrace("msg:%p, type:%s will be written into vnode-query queue", pMsg, TMSG_INFO(pRpc->msgType)); @@ -326,7 +307,7 @@ int32_t vmProcessMergeMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { int32_t vmProcessMgmtMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { SVnodesMgmt *pMgmt = pWrapper->pMgmt; SSingleWorker *pWorker = &pMgmt->mgmtWorker; - dTrace("msg:%p, will be written to vnode-mgmt queue, worker:%s", pMsg, pWorker->name); + dTrace("msg:%p, will be put into vnode-mgmt queue, worker:%s", pMsg, pWorker->name); taosWriteQitem(pWorker->queue, pMsg); return 0; } @@ -335,7 +316,7 @@ int32_t vmProcessMonitorMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { SVnodesMgmt *pMgmt = pWrapper->pMgmt; SSingleWorker *pWorker = &pMgmt->monitorWorker; - dTrace("msg:%p, put into worker:%s", pMsg, pWorker->name); + dTrace("msg:%p, will be put into vnode-monitor queue, worker:%s", pMsg, pWorker->name); taosWriteQitem(pWorker->queue, pMsg); return 0; } @@ -350,9 +331,7 @@ static int32_t vmPutRpcMsgToQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc, EQueueT SNodeMsg *pMsg = taosAllocateQitem(sizeof(SNodeMsg)); int32_t code = 0; - if (pMsg == NULL) { - code = -1; - } else { + if (pMsg != NULL) { dTrace("msg:%p, is created, type:%s", pMsg, TMSG_INFO(pRpc->msgType)); pMsg->rpcMsg = *pRpc; // if (pMsg->rpcMsg.handle != NULL) assert(pMsg->rpcMsg.refId != 0); @@ -377,7 +356,7 @@ static int32_t vmPutRpcMsgToQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc, EQueueT dTrace("msg:%p, will be put into vnode-merge queue", pMsg); taosWriteQitem(pVnode->pMergeQ, pMsg); break; - case SYNC_QUEUE: // sync integration + case SYNC_QUEUE: dTrace("msg:%p, will be put into vnode-sync queue", pMsg); taosWriteQitem(pVnode->pSyncQ, pMsg); break; @@ -387,6 +366,7 @@ static int32_t vmPutRpcMsgToQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc, EQueueT break; } } + vmReleaseVnode(pMgmt, pVnode); return code; } @@ -395,6 +375,14 @@ int32_t vmPutMsgToWriteQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc) { return vmPutRpcMsgToQueue(pWrapper, pRpc, WRITE_QUEUE); } +int32_t vmPutMsgToSyncQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc) { + return vmPutRpcMsgToQueue(pWrapper, pRpc, SYNC_QUEUE); +} + +int32_t vmPutMsgToApplyQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc) { + return vmPutRpcMsgToQueue(pWrapper, pRpc, APPLY_QUEUE); +} + int32_t vmPutMsgToQueryQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc) { return vmPutRpcMsgToQueue(pWrapper, pRpc, QUERY_QUEUE); } @@ -403,30 +391,15 @@ int32_t vmPutMsgToFetchQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc) { return vmPutRpcMsgToQueue(pWrapper, pRpc, FETCH_QUEUE); } -int32_t vmPutMsgToApplyQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc) { - return vmPutRpcMsgToQueue(pWrapper, pRpc, APPLY_QUEUE); -} - int32_t vmPutMsgToMergeQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc) { return vmPutRpcMsgToQueue(pWrapper, pRpc, MERGE_QUEUE); } -// sync integration -int32_t vmPutMsgToSyncQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc) { - return vmPutRpcMsgToQueue(pWrapper, pRpc, SYNC_QUEUE); -} - int32_t vmGetQueueSize(SMgmtWrapper *pWrapper, int32_t vgId, EQueueType qtype) { int32_t size = -1; SVnodeObj *pVnode = vmAcquireVnode(pWrapper->pMgmt, vgId); if (pVnode != NULL) { switch (qtype) { - case QUERY_QUEUE: - size = taosQueueSize(pVnode->pQueryQ); - break; - case FETCH_QUEUE: - size = taosQueueSize(pVnode->pFetchQ); - break; case WRITE_QUEUE: size = taosQueueSize(pVnode->pWriteQ); break; @@ -436,6 +409,12 @@ int32_t vmGetQueueSize(SMgmtWrapper *pWrapper, int32_t vgId, EQueueType qtype) { case APPLY_QUEUE: size = taosQueueSize(pVnode->pApplyQ); break; + case QUERY_QUEUE: + size = taosQueueSize(pVnode->pQueryQ); + break; + case FETCH_QUEUE: + size = taosQueueSize(pVnode->pFetchQ); + break; case MERGE_QUEUE: size = taosQueueSize(pVnode->pMergeQ); break; @@ -449,14 +428,14 @@ int32_t vmGetQueueSize(SMgmtWrapper *pWrapper, int32_t vgId, EQueueType qtype) { int32_t vmAllocQueue(SVnodesMgmt *pMgmt, SVnodeObj *pVnode) { pVnode->pWriteQ = tWWorkerAllocQueue(&pMgmt->writePool, pVnode, (FItems)vmProcessWriteQueue); - pVnode->pApplyQ = tWWorkerAllocQueue(&pMgmt->writePool, pVnode, (FItems)vmProcessApplyQueue); - pVnode->pMergeQ = tWWorkerAllocQueue(&pMgmt->mergePool, pVnode, (FItems)vmProcessMergeQueue); pVnode->pSyncQ = tWWorkerAllocQueue(&pMgmt->syncPool, pVnode, (FItems)vmProcessSyncQueue); - pVnode->pFetchQ = tQWorkerAllocQueue(&pMgmt->fetchPool, pVnode, (FItem)vmProcessFetchQueue); + pVnode->pApplyQ = tWWorkerAllocQueue(&pMgmt->writePool, pVnode, (FItems)vmProcessApplyQueue); pVnode->pQueryQ = tQWorkerAllocQueue(&pMgmt->queryPool, pVnode, (FItem)vmProcessQueryQueue); + pVnode->pFetchQ = tQWorkerAllocQueue(&pMgmt->fetchPool, pVnode, (FItem)vmProcessFetchQueue); + pVnode->pMergeQ = tWWorkerAllocQueue(&pMgmt->mergePool, pVnode, (FItems)vmProcessMergeQueue); - if (pVnode->pApplyQ == NULL || pVnode->pWriteQ == NULL || pVnode->pSyncQ == NULL || pVnode->pFetchQ == NULL || - pVnode->pQueryQ == NULL || pVnode->pMergeQ == NULL) { + if (pVnode->pWriteQ == NULL || pVnode->pSyncQ == NULL || pVnode->pApplyQ == NULL || pVnode->pQueryQ == NULL || + pVnode->pFetchQ == NULL || pVnode->pMergeQ == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; } @@ -466,17 +445,17 @@ int32_t vmAllocQueue(SVnodesMgmt *pMgmt, SVnodeObj *pVnode) { } void vmFreeQueue(SVnodesMgmt *pMgmt, SVnodeObj *pVnode) { - tQWorkerFreeQueue(&pMgmt->queryPool, pVnode->pQueryQ); - tQWorkerFreeQueue(&pMgmt->fetchPool, pVnode->pFetchQ); tWWorkerFreeQueue(&pMgmt->writePool, pVnode->pWriteQ); + tWWorkerFreeQueue(&pMgmt->syncPool, pVnode->pSyncQ); tWWorkerFreeQueue(&pMgmt->writePool, pVnode->pApplyQ); + tQWorkerFreeQueue(&pMgmt->queryPool, pVnode->pQueryQ); + tQWorkerFreeQueue(&pMgmt->fetchPool, pVnode->pFetchQ); tWWorkerFreeQueue(&pMgmt->mergePool, pVnode->pMergeQ); - tWWorkerFreeQueue(&pMgmt->syncPool, pVnode->pSyncQ); pVnode->pWriteQ = NULL; - pVnode->pApplyQ = NULL; pVnode->pSyncQ = NULL; - pVnode->pFetchQ = NULL; + pVnode->pApplyQ = NULL; pVnode->pQueryQ = NULL; + pVnode->pFetchQ = NULL; pVnode->pMergeQ = NULL; dDebug("vgId:%d, vnode queue is freed", pVnode->vgId); } @@ -499,17 +478,23 @@ int32_t vmStartWorker(SVnodesMgmt *pMgmt) { pWPool->max = tsNumOfVnodeWriteThreads; if (tWWorkerInit(pWPool) != 0) return -1; - pWPool = &pMgmt->syncPool; - pWPool->name = "vnode-sync"; - pWPool->max = tsNumOfVnodeSyncThreads; - if (tWWorkerInit(pWPool) != 0) return -1; - - pWPool = &pMgmt->mergePool; - pWPool->name = "vnode-merge"; - pWPool->max = tsNumOfVnodeMergeThreads; - if (tWWorkerInit(pWPool) != 0) return -1; - - SSingleWorkerCfg cfg = {.min = 1, .max = 1, .name = "vnode-mgmt", .fp = (FItem)vmProcessMgmtQueue, .param = pMgmt}; + SWWorkerPool *pSPool = &pMgmt->syncPool; + pSPool->name = "vnode-sync"; + pSPool->max = tsNumOfVnodeSyncThreads; + if (tWWorkerInit(pSPool) != 0) return -1; + + SWWorkerPool *pMPool = &pMgmt->mergePool; + pMPool->name = "vnode-merge"; + pMPool->max = tsNumOfVnodeMergeThreads; + if (tWWorkerInit(pMPool) != 0) return -1; + + SSingleWorkerCfg cfg = { + .min = 1, + .max = 1, + .name = "vnode-mgmt", + .fp = (FItem)vmProcessMgmtMonitorQueue, + .param = pMgmt, + }; if (tSingleWorkerInit(&pMgmt->mgmtWorker, &cfg) != 0) { dError("failed to start vnode-mgmt worker since %s", terrstr()); return -1; @@ -517,7 +502,12 @@ int32_t vmStartWorker(SVnodesMgmt *pMgmt) { if (tsMultiProcess) { SSingleWorkerCfg mCfg = { - .min = 1, .max = 1, .name = "vnode-monitor", .fp = (FItem)vmProcessMgmtQueue, .param = pMgmt}; + .min = 1, + .max = 1, + .name = "vnode-monitor", + .fp = (FItem)vmProcessMgmtMonitorQueue, + .param = pMgmt, + }; if (tSingleWorkerInit(&pMgmt->monitorWorker, &mCfg) != 0) { dError("failed to start mnode vnode-monitor worker since %s", terrstr()); return -1; @@ -531,10 +521,10 @@ int32_t vmStartWorker(SVnodesMgmt *pMgmt) { void vmStopWorker(SVnodesMgmt *pMgmt) { tSingleWorkerCleanup(&pMgmt->monitorWorker); tSingleWorkerCleanup(&pMgmt->mgmtWorker); - tQWorkerCleanup(&pMgmt->fetchPool); - tQWorkerCleanup(&pMgmt->queryPool); tWWorkerCleanup(&pMgmt->writePool); tWWorkerCleanup(&pMgmt->syncPool); + tQWorkerCleanup(&pMgmt->queryPool); + tQWorkerCleanup(&pMgmt->fetchPool); tWWorkerCleanup(&pMgmt->mergePool); dDebug("vnode workers are closed"); } diff --git a/source/dnode/mnode/sdb/src/sdbFile.c b/source/dnode/mnode/sdb/src/sdbFile.c index f8bd14813c7a4e5707e7c7beb20dabb23d23d6e2..e9037a7b115c93af31ca5d2d15dff148585d42db 100644 --- a/source/dnode/mnode/sdb/src/sdbFile.c +++ b/source/dnode/mnode/sdb/src/sdbFile.c @@ -310,7 +310,7 @@ static int32_t sdbWriteFileImp(SSdb *pSdb) { code = taosFsyncFile(pFile); if (code != 0) { code = TAOS_SYSTEM_ERROR(errno); - mError("failed to write file:%s since %s", tmpfile, tstrerror(code)); + mError("failed to sync file:%s since %s", tmpfile, tstrerror(code)); } } diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index 28f3b3edb9394e5e0f46705f512781f5590b964d..8850b353970aef29b8509d040877c95530b27230 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -148,7 +148,7 @@ struct STsdbCfg { struct SVnodeCfg { int32_t vgId; - char dbname[TSDB_DB_NAME_LEN]; + char dbname[TSDB_DB_FNAME_LEN]; uint64_t dbId; int32_t szPage; int32_t szCache; diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index e0f8943a48ccb707b86956459b9f4c6c4f24ab5a..ff0ebefb42baa7b93a92f2f38e2788ef8397db06 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -1985,7 +1985,7 @@ static void doMergeTwoLevelData(STsdbReadHandle* pTsdbReadHandle, STableCheckInf return; } else if (pCheckInfo->iter != NULL || pCheckInfo->iiter != NULL) { SSkipListNode* node = NULL; - TSKEY lastRowKey = TSKEY_INITIAL_VAL; + TSKEY lastKeyAppend = TSKEY_INITIAL_VAL; do { STSRow* row2 = NULL; @@ -2019,7 +2019,7 @@ static void doMergeTwoLevelData(STsdbReadHandle* pTsdbReadHandle, STableCheckInf } numOfRows += mergeTwoRowFromMem(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, &curRow, row1, row2, numOfCols, - pCheckInfo->tableId, pSchema1, pSchema2, pCfg->update, &lastRowKey); + pCheckInfo->tableId, pSchema1, pSchema2, pCfg->update, &lastKeyAppend); // numOfRows += 1; if (cur->win.skey == TSKEY_INITIAL_VAL) { cur->win.skey = key; @@ -2076,7 +2076,7 @@ static void doMergeTwoLevelData(STsdbReadHandle* pTsdbReadHandle, STableCheckInf } numOfRows += mergeTwoRowFromMem(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, &curRow, row1, row2, numOfCols, - pCheckInfo->tableId, pSchema1, pSchema2, pCfg->update, &lastRowKey); + pCheckInfo->tableId, pSchema1, pSchema2, pCfg->update, &lastKeyAppend); // ++numOfRows; if (cur->win.skey == TSKEY_INITIAL_VAL) { cur->win.skey = key; @@ -2117,10 +2117,13 @@ static void doMergeTwoLevelData(STsdbReadHandle* pTsdbReadHandle, STableCheckInf int32_t qstart = 0, qend = 0; getQualifiedRowsPos(pTsdbReadHandle, pos, end, numOfRows, &qstart, &qend); + lastKeyAppend = tsArray[qend]; numOfRows = doCopyRowsFromFileBlock(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, numOfRows, qstart, qend); pos += (qend - qstart + 1) * step; - + if(numOfRows > 0) { + curRow = numOfRows - 1; + } cur->win.ekey = ASCENDING_TRAVERSE(pTsdbReadHandle->order) ? tsArray[qend] : tsArray[qstart]; cur->lastKey = cur->win.ekey + step; } diff --git a/source/libs/function/CMakeLists.txt b/source/libs/function/CMakeLists.txt index c31cabda1960780331650c1db90912db3d314632..7a4cd8092205786065015252432dcb4de0a1db41 100644 --- a/source/libs/function/CMakeLists.txt +++ b/source/libs/function/CMakeLists.txt @@ -36,7 +36,7 @@ target_link_libraries( PRIVATE os util common nodes function ) -add_library(udf1 MODULE test/udf1.c) +add_library(udf1 STATIC MODULE test/udf1.c) target_include_directories( udf1 PUBLIC @@ -50,7 +50,7 @@ target_include_directories( target_link_libraries( udf1 PUBLIC os) -add_library(udf2 MODULE test/udf2.c) +add_library(udf2 STATIC MODULE test/udf2.c) target_include_directories( udf2 PUBLIC diff --git a/source/libs/transport/src/transSrv.c b/source/libs/transport/src/transSrv.c index 27efbcda53908d08273f143a7be422d7f7b996ab..7f8ad150f0dddedd13be9e5a927796944e3b62e9 100644 --- a/source/libs/transport/src/transSrv.c +++ b/source/libs/transport/src/transSrv.c @@ -67,6 +67,7 @@ typedef struct SSrvMsg { typedef struct SWorkThrdObj { TdThread thread; + uv_connect_t connect_req; uv_pipe_t* pipe; uv_os_fd_t fd; uv_loop_t* loop; @@ -87,8 +88,10 @@ typedef struct SServerObj { // work thread info int workerIdx; int numOfThreads; + int numOfWorkerReady; SWorkThrdObj** pThreadObj; + uv_pipe_t pipeListen; uv_pipe_t** pipe; uint32_t ip; uint32_t port; @@ -161,7 +164,7 @@ static void* transWorkerThread(void* arg); static void* transAcceptThread(void* arg); // add handle loop -static bool addHandleToWorkloop(void* arg); +static bool addHandleToWorkloop(SWorkThrdObj* pThrd,char *pipeName); static bool addHandleToAcceptloop(void* arg); #define CONN_SHOULD_RELEASE(conn, head) \ @@ -577,6 +580,12 @@ void uvOnAcceptCb(uv_stream_t* stream, int status) { uv_tcp_init(pObj->loop, cli); if (uv_accept(stream, (uv_stream_t*)cli) == 0) { + if (pObj->numOfWorkerReady < pObj->numOfThreads) { + tError("worker-threads are not ready for all, need %d instead of %d.", pObj->numOfThreads, pObj->numOfWorkerReady); + uv_close((uv_handle_t*)cli, NULL); + return; + } + uv_write_t* wr = (uv_write_t*)taosMemoryMalloc(sizeof(uv_write_t)); wr->data = cli; uv_buf_t buf = uv_buf_init((char*)notify, strlen(notify)); @@ -672,15 +681,21 @@ void* transAcceptThread(void* arg) { return NULL; } -static bool addHandleToWorkloop(void* arg) { - SWorkThrdObj* pThrd = arg; +void uvOnPipeConnectionCb(uv_connect_t *connect, int status) { + if (status != 0) { + return; + } + SWorkThrdObj* pThrd = container_of(connect, SWorkThrdObj, connect_req); + uv_read_start((uv_stream_t*)pThrd->pipe, uvAllocConnBufferCb, uvOnConnectionCb); +} +static bool addHandleToWorkloop(SWorkThrdObj* pThrd,char *pipeName) { pThrd->loop = (uv_loop_t*)taosMemoryMalloc(sizeof(uv_loop_t)); if (0 != uv_loop_init(pThrd->loop)) { return false; } uv_pipe_init(pThrd->loop, pThrd->pipe, 1); - uv_pipe_open(pThrd->pipe, pThrd->fd); + // int r = uv_pipe_open(pThrd->pipe, pThrd->fd); pThrd->pipe->data = pThrd; @@ -691,7 +706,8 @@ static bool addHandleToWorkloop(void* arg) { QUEUE_INIT(&pThrd->conn); pThrd->asyncPool = transCreateAsyncPool(pThrd->loop, 5, pThrd, uvWorkerAsyncCb); - uv_read_start((uv_stream_t*)pThrd->pipe, uvAllocConnBufferCb, uvOnConnectionCb); + uv_pipe_connect(&pThrd->connect_req, pThrd->pipe, pipeName, uvOnPipeConnectionCb); + // uv_read_start((uv_stream_t*)pThrd->pipe, uvAllocConnBufferCb, uvOnConnectionCb); return true; } @@ -802,12 +818,32 @@ static void uvDestroyConn(uv_handle_t* handle) { uv_walk(thrd->loop, uvWalkCb, NULL); } } +static void uvPipeListenCb(uv_stream_t* handle, int status) { + ASSERT(status == 0); + + SServerObj* srv = container_of(handle, SServerObj, pipeListen); + uv_pipe_t* pipe = &(srv->pipe[srv->numOfWorkerReady][0]); + ASSERT(0 == uv_pipe_init(srv->loop, pipe, 1)); + ASSERT(0 == uv_accept((uv_stream_t*)&srv->pipeListen, (uv_stream_t*)pipe)); + + ASSERT(1 == uv_is_readable((uv_stream_t*)pipe)); + ASSERT(1 == uv_is_writable((uv_stream_t*)pipe)); + ASSERT(0 == uv_is_closing((uv_handle_t*)pipe)); + + srv->numOfWorkerReady++; + + // ASSERT(0 == uv_listen((uv_stream_t*)&ctx.send.tcp, 512, uvOnAcceptCb)); + + // r = uv_read_start((uv_stream_t*)&ctx.channel, alloc_cb, read_cb); + // ASSERT(r == 0); +} void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle) { SServerObj* srv = taosMemoryCalloc(1, sizeof(SServerObj)); srv->loop = (uv_loop_t*)taosMemoryMalloc(sizeof(uv_loop_t)); srv->numOfThreads = numOfThreads; srv->workerIdx = 0; + srv->numOfWorkerReady = 0; srv->pThreadObj = (SWorkThrdObj**)taosMemoryCalloc(srv->numOfThreads, sizeof(SWorkThrdObj*)); srv->pipe = (uv_pipe_t**)taosMemoryCalloc(srv->numOfThreads, sizeof(uv_pipe_t*)); srv->ip = ip; @@ -817,6 +853,16 @@ void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, taosThreadOnce(&transModuleInit, uvInitEnv); transSrvInst++; + char pipeName[64]; + assert(0 == uv_pipe_init(srv->loop, &srv->pipeListen, 0)); +#ifdef WINDOWS + snprintf(pipeName, sizeof(pipeName), "\\\\?\\pipe\\trans.rpc\\%p-%lu", taosSafeRand(), GetCurrentProcessId()); +#else + snprintf(pipeName, sizeof(pipeName), ".trans.rpc\\%08X-%lu", taosSafeRand(), taosGetSelfPthreadId()); +#endif + assert(0 == uv_pipe_bind(&srv->pipeListen, pipeName)); + assert(0 == uv_listen((uv_stream_t*)&srv->pipeListen, SOMAXCONN, uvPipeListenCb)); + for (int i = 0; i < srv->numOfThreads; i++) { SWorkThrdObj* thrd = (SWorkThrdObj*)taosMemoryCalloc(1, sizeof(SWorkThrdObj)); thrd->pTransInst = shandle; @@ -826,17 +872,22 @@ void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, srv->pipe[i] = (uv_pipe_t*)taosMemoryCalloc(2, sizeof(uv_pipe_t)); - uv_os_sock_t fds[2]; - if (uv_socketpair(SOCK_STREAM, 0, fds, UV_NONBLOCK_PIPE, UV_NONBLOCK_PIPE) != 0) { - goto End; - } - uv_pipe_init(srv->loop, &(srv->pipe[i][0]), 1); - uv_pipe_open(&(srv->pipe[i][0]), fds[1]); // init write - - thrd->fd = fds[0]; + // #ifdef WINDOWS + // uv_file fds[2]; + // if (uv_pipe(fds, UV_READABLE_PIPE|UV_WRITABLE_PIPE|UV_NONBLOCK_PIPE, UV_READABLE_PIPE|UV_WRITABLE_PIPE|UV_NONBLOCK_PIPE) != 0) { + // #else + // uv_os_sock_t fds[2]; + // if (uv_socketpair(SOCK_STREAM, 0, fds, UV_NONBLOCK_PIPE, UV_NONBLOCK_PIPE) != 0) { + // #endif + // goto End; + // } + // uv_pipe_init(srv->loop, &(srv->pipe[i][0]), 1); + // uv_pipe_open(&(srv->pipe[i][0]), fds[1]); // init write + + // thrd->fd = fds[0]; thrd->pipe = &(srv->pipe[i][1]); // init read - if (false == addHandleToWorkloop(thrd)) { + if (false == addHandleToWorkloop(thrd,pipeName)) { goto End; } int err = taosThreadCreate(&(thrd->thread), NULL, transWorkerThread, (void*)(thrd)); diff --git a/source/os/src/osDir.c b/source/os/src/osDir.c index 19e4defafc47dfb0b6b84a856ddac8d8484b42c0..a29ab8b454e4388b2477e6b6c9b690c007a73123 100644 --- a/source/os/src/osDir.c +++ b/source/os/src/osDir.c @@ -204,7 +204,7 @@ int32_t taosExpandDir(const char *dirname, char *outname, int32_t maxlen) { int32_t taosRealPath(char *dirname, char *realPath, int32_t maxlen) { char tmp[PATH_MAX] = {0}; #ifdef WINDOWS - if (_fullpath(dirname, tmp, maxlen) != NULL) { + if (_fullpath(tmp, dirname, maxlen) != NULL) { #else if (realpath(dirname, tmp) != NULL) { #endif diff --git a/source/os/src/osFile.c b/source/os/src/osFile.c index d378b5234aedb6d8e520a7ca6781a17826def3bc..ab68c69b8db2a6be1b7467093a09c1174a8dac95 100644 --- a/source/os/src/osFile.c +++ b/source/os/src/osFile.c @@ -543,7 +543,7 @@ int32_t taosFsyncFile(TdFilePtr pFile) { HANDLE h = (HANDLE)_get_osfhandle(pFile->fd); - return FlushFileBuffers(h); + return !FlushFileBuffers(h); #else if (pFile == NULL) { return 0; diff --git a/source/os/src/osSysinfo.c b/source/os/src/osSysinfo.c index 3c3612854c8a6bf8f943e55261fd61aeff340394..348424b37246222cfdb74dcff0513c0f2a5711e9 100644 --- a/source/os/src/osSysinfo.c +++ b/source/os/src/osSysinfo.c @@ -869,11 +869,15 @@ SysNameInfo taosGetSysNameInfo() { SysNameInfo info = {0}; DWORD dwVersion = GetVersion(); - tstrncpy(info.sysname, getenv("OS"), sizeof(info.sysname)); - tstrncpy(info.nodename, getenv("COMPUTERNAME"), sizeof(info.nodename)); + char *tmp = NULL; + tmp = getenv("OS"); + if (tmp != NULL) tstrncpy(info.sysname, tmp, sizeof(info.sysname)); + tmp = getenv("COMPUTERNAME"); + if (tmp != NULL) tstrncpy(info.nodename, tmp, sizeof(info.nodename)); sprintf_s(info.release, sizeof(info.release), "%d", dwVersion & 0x0F); sprintf_s(info.version, sizeof(info.release), "%d", (dwVersion >> 8) & 0x0F); - tstrncpy(info.machine, getenv("PROCESSOR_ARCHITECTURE"), sizeof(info.machine)); + tmp = getenv("PROCESSOR_ARCHITECTURE"); + if (tmp != NULL) tstrncpy(info.machine, tmp, sizeof(info.machine)); return info; #elif defined(_TD_DARWIN_64)