diff --git a/src/system/src/dnodeService.c b/src/system/src/dnodeService.c index 8249c51faa658e16347d51e3948f827670ea5b23..17c4c736d126998793e07fcc14b74adb638434fe 100644 --- a/src/system/src/dnodeService.c +++ b/src/system/src/dnodeService.c @@ -66,7 +66,7 @@ int main(int argc, char *argv[]) { exit(EXIT_FAILURE); } } else if (strcmp(argv[i], "-V") == 0) { - printf("%s %s\n", version, compatible_version); + printf("version: %s compatible_version: %s\n", version, compatible_version); printf("gitinfo: %s\n", gitinfo); printf("buildinfo: %s\n", buildinfo); return 0; diff --git a/src/system/src/mgmtDnodeInt.c b/src/system/src/mgmtDnodeInt.c index 304d4914029f9ea1a6767124a6c60f8539b6b395..8be91fd519501ae8da84413ad9b602e227d40f70 100644 --- a/src/system/src/mgmtDnodeInt.c +++ b/src/system/src/mgmtDnodeInt.c @@ -29,6 +29,7 @@ char *mgmtBuildCreateMeterIe(STabObj *pMeter, char *pMsg, int vnode); void vnodeProcessMsgFromMgmt(SSchedMsg *smsg); void *rpcQhandle; +extern void *dmQhandle; int mgmtSendMsgToDnode(char *msg) { mTrace("msg:%s is sent to dnode", taosMsg[*msg]); @@ -38,7 +39,7 @@ int mgmtSendMsgToDnode(char *msg) { schedMsg.msg = msg; schedMsg.ahandle = NULL; schedMsg.thandle = NULL; - taosScheduleTask(rpcQhandle, &schedMsg); + taosScheduleTask(dmQhandle, &schedMsg); return 0; } diff --git a/src/system/src/vnodeCommit.c b/src/system/src/vnodeCommit.c index 9982ce45d8a0a693e4c0c7f479687a938fa8154c..a14e8118c49bfec6c19224f125d84349cb0b138e 100644 --- a/src/system/src/vnodeCommit.c +++ b/src/system/src/vnodeCommit.c @@ -144,6 +144,7 @@ size_t vnodeRestoreDataFromLog(int vnode, char *fileName, uint64_t *firstV) { goto _error; } + TSKEY now = taosGetTimestamp(pVnode->cfg.precision); SCommitHead head; int simpleCheck = 0; while (1) { @@ -180,7 +181,7 @@ size_t vnodeRestoreDataFromLog(int vnode, char *fileName, uint64_t *firstV) { int32_t numOfPoints = 0; (*vnodeProcessAction[head.action])(pObj, cont, head.contLen, TSDB_DATA_SOURCE_LOG, NULL, head.sversion, - &numOfPoints); + &numOfPoints, now); actions++; } else { break; diff --git a/src/system/src/vnodeMeter.c b/src/system/src/vnodeMeter.c index 27e6e74952851364036e304b65b2f90e59b44e18..f94276290f4eae3b004a8ef555854d2ecc18fb92 100644 --- a/src/system/src/vnodeMeter.c +++ b/src/system/src/vnodeMeter.c @@ -35,7 +35,7 @@ int tsMeterSizeOnFile; void vnodeUpdateMeter(void *param, void *tmdId); void vnodeRecoverMeterObjectFile(int vnode); -int (*vnodeProcessAction[])(SMeterObj *, char *, int, char, void *, int, int *) = {vnodeInsertPoints, +int (*vnodeProcessAction[])(SMeterObj *, char *, int, char, void *, int, int *, TSKEY) = {vnodeInsertPoints, vnodeImportPoints}; void vnodeFreeMeterObj(SMeterObj *pObj) { @@ -506,7 +506,7 @@ int vnodeRemoveMeterObj(int vnode, int sid) { } int vnodeInsertPoints(SMeterObj *pObj, char *cont, int contLen, char source, void *param, int sversion, - int *numOfInsertPoints) { + int *numOfInsertPoints, TSKEY now) { int expectedLen, i; short numOfPoints; SSubmitMsg *pSubmit = (SSubmitMsg *)cont; @@ -528,7 +528,7 @@ int vnodeInsertPoints(SMeterObj *pObj, char *cont, int contLen, char source, voi // to guarantee time stamp is the same for all vnodes pData = pSubmit->payLoad; - tsKey = taosGetTimestamp(pVnode->cfg.precision); + tsKey = now; cfile = tsKey/pVnode->cfg.daysPerFile/tsMsPerDay[pVnode->cfg.precision]; if (*((TSKEY *)pData) == 0) { for (i = 0; i < numOfPoints; ++i) { diff --git a/src/system/src/vnodeShell.c b/src/system/src/vnodeShell.c index c4aece259ebf83bddd6d31f2a7736b1b9d060efc..d193c51ac1bb1e4990a156c200b065f077fcd697 100644 --- a/src/system/src/vnodeShell.c +++ b/src/system/src/vnodeShell.c @@ -484,6 +484,7 @@ int vnodeProcessShellSubmitRequest(char *pMsg, int msgLen, SShellObj *pObj) { int32_t numOfPoints = 0; int32_t numOfTotalPoints = 0; + TSKEY now = taosGetTimestamp(pVnode->cfg.precision); for (int32_t i = 0; i < pSubmit->numOfSid; ++i) { numOfPoints = 0; @@ -523,11 +524,11 @@ int vnodeProcessShellSubmitRequest(char *pMsg, int msgLen, SShellObj *pObj) { // meter status is ready for insert/import if (pSubmit->import) { code = vnodeImportPoints(pMeterObj, (char *) &(pBlocks->numOfRows), subMsgLen, TSDB_DATA_SOURCE_SHELL, pObj, - sversion, &numOfPoints); + sversion, &numOfPoints, now); vnodeClearMeterState(pMeterObj, TSDB_METER_STATE_IMPORTING); } else { code = vnodeInsertPoints(pMeterObj, (char *) &(pBlocks->numOfRows), subMsgLen, TSDB_DATA_SOURCE_SHELL, NULL, - sversion, &numOfPoints); + sversion, &numOfPoints, now); vnodeClearMeterState(pMeterObj, TSDB_METER_STATE_INSERT); } diff --git a/src/system/src/vnodeStream.c b/src/system/src/vnodeStream.c index a8dcff231d6d12e8bceed83e37b0fd51592f67ca..2f8cfec1f191f98faaaea95e5ba3b41251bcae71 100644 --- a/src/system/src/vnodeStream.c +++ b/src/system/src/vnodeStream.c @@ -57,7 +57,7 @@ void vnodeProcessStreamRes(void *param, TAOS_RES *tres, TAOS_ROW row) { int32_t state = vnodeSetMeterState(pObj, TSDB_METER_STATE_INSERT); if (state == TSDB_METER_STATE_READY) { - vnodeInsertPoints(pObj, (char *)pMsg, contLen, TSDB_DATA_SOURCE_SHELL, NULL, pObj->sversion, &numOfPoints); + vnodeInsertPoints(pObj, (char *)pMsg, contLen, TSDB_DATA_SOURCE_SHELL, NULL, pObj->sversion, &numOfPoints, taosGetTimestamp(vnodeList[pObj->vnode].cfg.precision)); vnodeClearMeterState(pObj, TSDB_METER_STATE_INSERT); } else { dError("vid:%d sid:%d id:%s, failed to insert continuous query results, state:%d", pObj->vnode, pObj->sid, diff --git a/src/system/src/vnodeSystem.c b/src/system/src/vnodeSystem.c index 8d3b66e52008a1e8690285ff0c2d3ed2a29dae1f..e74e172af0c4720caf455f15f1b038d2bd9bc576 100644 --- a/src/system/src/vnodeSystem.c +++ b/src/system/src/vnodeSystem.c @@ -41,9 +41,11 @@ int vnodeInitSystem() { if (numOfThreads < 1) numOfThreads = 1; queryQhandle = taosInitScheduler(tsNumOfVnodesPerCore * tsNumOfCores * tsSessionsPerVnode, numOfThreads, "query"); - // numOfThreads = (1.0 - tsRatioOfQueryThreads) * tsNumOfCores * tsNumOfThreadsPerCore / 2.0; - // if (numOfThreads < 1) numOfThreads = 1; - rpcQhandle = taosInitScheduler(tsNumOfVnodesPerCore * tsNumOfCores * tsSessionsPerVnode, 1, "dnode"); + numOfThreads = (1.0 - tsRatioOfQueryThreads) * tsNumOfCores * tsNumOfThreadsPerCore / 2.0; + if (numOfThreads < 1) numOfThreads = 1; + rpcQhandle = taosInitScheduler(tsNumOfVnodesPerCore * tsNumOfCores * tsSessionsPerVnode, numOfThreads, "dnode"); + + dmQhandle = taosInitScheduler(tsSessionsPerVnode, 1, "mgmt"); vnodeTmrCtrl = taosTmrInit(tsSessionsPerVnode + 1000, 200, 60000, "DND-vnode"); if (vnodeTmrCtrl == NULL) { @@ -70,11 +72,3 @@ int vnodeInitSystem() { return 0; } - -void vnodeInitQHandle() { - // int numOfThreads = (1.0 - tsRatioOfQueryThreads) * tsNumOfCores * tsNumOfThreadsPerCore / 2.0; - // if (numOfThreads < 1) numOfThreads = 1; - rpcQhandle = taosInitScheduler(tsNumOfVnodesPerCore * tsNumOfCores * tsSessionsPerVnode, 1, "dnode"); - - dmQhandle = taosInitScheduler(tsSessionsPerVnode, 1, "mgmt"); -}