提交 335d42cf 编写于 作者: S slguan

fix the issue #443

上级 b806c458
......@@ -66,7 +66,7 @@ int main(int argc, char *argv[]) {
exit(EXIT_FAILURE);
}
} else if (strcmp(argv[i], "-V") == 0) {
printf("%s %s\n", version, compatible_version);
printf("version: %s compatible_version: %s\n", version, compatible_version);
printf("gitinfo: %s\n", gitinfo);
printf("buildinfo: %s\n", buildinfo);
return 0;
......
......@@ -29,6 +29,7 @@ char *mgmtBuildCreateMeterIe(STabObj *pMeter, char *pMsg, int vnode);
void vnodeProcessMsgFromMgmt(SSchedMsg *smsg);
void *rpcQhandle;
extern void *dmQhandle;
int mgmtSendMsgToDnode(char *msg) {
mTrace("msg:%s is sent to dnode", taosMsg[*msg]);
......@@ -38,7 +39,7 @@ int mgmtSendMsgToDnode(char *msg) {
schedMsg.msg = msg;
schedMsg.ahandle = NULL;
schedMsg.thandle = NULL;
taosScheduleTask(rpcQhandle, &schedMsg);
taosScheduleTask(dmQhandle, &schedMsg);
return 0;
}
......
......@@ -144,6 +144,7 @@ size_t vnodeRestoreDataFromLog(int vnode, char *fileName, uint64_t *firstV) {
goto _error;
}
TSKEY now = taosGetTimestamp(pVnode->cfg.precision);
SCommitHead head;
int simpleCheck = 0;
while (1) {
......@@ -180,7 +181,7 @@ size_t vnodeRestoreDataFromLog(int vnode, char *fileName, uint64_t *firstV) {
int32_t numOfPoints = 0;
(*vnodeProcessAction[head.action])(pObj, cont, head.contLen, TSDB_DATA_SOURCE_LOG, NULL, head.sversion,
&numOfPoints);
&numOfPoints, now);
actions++;
} else {
break;
......
......@@ -35,7 +35,7 @@ int tsMeterSizeOnFile;
void vnodeUpdateMeter(void *param, void *tmdId);
void vnodeRecoverMeterObjectFile(int vnode);
int (*vnodeProcessAction[])(SMeterObj *, char *, int, char, void *, int, int *) = {vnodeInsertPoints,
int (*vnodeProcessAction[])(SMeterObj *, char *, int, char, void *, int, int *, TSKEY) = {vnodeInsertPoints,
vnodeImportPoints};
void vnodeFreeMeterObj(SMeterObj *pObj) {
......@@ -506,7 +506,7 @@ int vnodeRemoveMeterObj(int vnode, int sid) {
}
int vnodeInsertPoints(SMeterObj *pObj, char *cont, int contLen, char source, void *param, int sversion,
int *numOfInsertPoints) {
int *numOfInsertPoints, TSKEY now) {
int expectedLen, i;
short numOfPoints;
SSubmitMsg *pSubmit = (SSubmitMsg *)cont;
......@@ -528,7 +528,7 @@ int vnodeInsertPoints(SMeterObj *pObj, char *cont, int contLen, char source, voi
// to guarantee time stamp is the same for all vnodes
pData = pSubmit->payLoad;
tsKey = taosGetTimestamp(pVnode->cfg.precision);
tsKey = now;
cfile = tsKey/pVnode->cfg.daysPerFile/tsMsPerDay[pVnode->cfg.precision];
if (*((TSKEY *)pData) == 0) {
for (i = 0; i < numOfPoints; ++i) {
......
......@@ -484,6 +484,7 @@ int vnodeProcessShellSubmitRequest(char *pMsg, int msgLen, SShellObj *pObj) {
int32_t numOfPoints = 0;
int32_t numOfTotalPoints = 0;
TSKEY now = taosGetTimestamp(pVnode->cfg.precision);
for (int32_t i = 0; i < pSubmit->numOfSid; ++i) {
numOfPoints = 0;
......@@ -523,11 +524,11 @@ int vnodeProcessShellSubmitRequest(char *pMsg, int msgLen, SShellObj *pObj) {
// meter status is ready for insert/import
if (pSubmit->import) {
code = vnodeImportPoints(pMeterObj, (char *) &(pBlocks->numOfRows), subMsgLen, TSDB_DATA_SOURCE_SHELL, pObj,
sversion, &numOfPoints);
sversion, &numOfPoints, now);
vnodeClearMeterState(pMeterObj, TSDB_METER_STATE_IMPORTING);
} else {
code = vnodeInsertPoints(pMeterObj, (char *) &(pBlocks->numOfRows), subMsgLen, TSDB_DATA_SOURCE_SHELL, NULL,
sversion, &numOfPoints);
sversion, &numOfPoints, now);
vnodeClearMeterState(pMeterObj, TSDB_METER_STATE_INSERT);
}
......
......@@ -57,7 +57,7 @@ void vnodeProcessStreamRes(void *param, TAOS_RES *tres, TAOS_ROW row) {
int32_t state = vnodeSetMeterState(pObj, TSDB_METER_STATE_INSERT);
if (state == TSDB_METER_STATE_READY) {
vnodeInsertPoints(pObj, (char *)pMsg, contLen, TSDB_DATA_SOURCE_SHELL, NULL, pObj->sversion, &numOfPoints);
vnodeInsertPoints(pObj, (char *)pMsg, contLen, TSDB_DATA_SOURCE_SHELL, NULL, pObj->sversion, &numOfPoints, taosGetTimestamp(vnodeList[pObj->vnode].cfg.precision));
vnodeClearMeterState(pObj, TSDB_METER_STATE_INSERT);
} else {
dError("vid:%d sid:%d id:%s, failed to insert continuous query results, state:%d", pObj->vnode, pObj->sid,
......
......@@ -41,9 +41,11 @@ int vnodeInitSystem() {
if (numOfThreads < 1) numOfThreads = 1;
queryQhandle = taosInitScheduler(tsNumOfVnodesPerCore * tsNumOfCores * tsSessionsPerVnode, numOfThreads, "query");
// numOfThreads = (1.0 - tsRatioOfQueryThreads) * tsNumOfCores * tsNumOfThreadsPerCore / 2.0;
// if (numOfThreads < 1) numOfThreads = 1;
rpcQhandle = taosInitScheduler(tsNumOfVnodesPerCore * tsNumOfCores * tsSessionsPerVnode, 1, "dnode");
numOfThreads = (1.0 - tsRatioOfQueryThreads) * tsNumOfCores * tsNumOfThreadsPerCore / 2.0;
if (numOfThreads < 1) numOfThreads = 1;
rpcQhandle = taosInitScheduler(tsNumOfVnodesPerCore * tsNumOfCores * tsSessionsPerVnode, numOfThreads, "dnode");
dmQhandle = taosInitScheduler(tsSessionsPerVnode, 1, "mgmt");
vnodeTmrCtrl = taosTmrInit(tsSessionsPerVnode + 1000, 200, 60000, "DND-vnode");
if (vnodeTmrCtrl == NULL) {
......@@ -70,11 +72,3 @@ int vnodeInitSystem() {
return 0;
}
void vnodeInitQHandle() {
// int numOfThreads = (1.0 - tsRatioOfQueryThreads) * tsNumOfCores * tsNumOfThreadsPerCore / 2.0;
// if (numOfThreads < 1) numOfThreads = 1;
rpcQhandle = taosInitScheduler(tsNumOfVnodesPerCore * tsNumOfCores * tsSessionsPerVnode, 1, "dnode");
dmQhandle = taosInitScheduler(tsSessionsPerVnode, 1, "mgmt");
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册