diff --git a/src/system/detail/src/vnodeImport.c b/src/system/detail/src/vnodeImport.c index 68c3266e5582a33ca10d087987eb0550cb198aa8..48f203f3181296add0eeb112a7eb65dffd9f2152 100644 --- a/src/system/detail/src/vnodeImport.c +++ b/src/system/detail/src/vnodeImport.c @@ -232,26 +232,35 @@ int vnodeImportPoints(SMeterObj *pObj, char *cont, int contLen, char source, voi import.rows = rows; // FIXME: mutex here seems meaningless and num here still can be changed - int32_t num = 0; - pthread_mutex_lock(&pVnode->vmutex); - num = pObj->numOfQueries; - pthread_mutex_unlock(&pVnode->vmutex); + // int32_t num = 0; + // pthread_mutex_lock(&pVnode->vmutex); + // num = pObj->numOfQueries; + // pthread_mutex_unlock(&pVnode->vmutex); int32_t commitInProcess = 0; pthread_mutex_lock(&pPool->vmutex); - if (((commitInProcess = pPool->commitInProcess) == 1) || num > 0) { + if ((commitInProcess = pPool->commitInProcess) == 1) { // mutual exclusion with read (need to change here) pthread_mutex_unlock(&pPool->vmutex); vnodeClearMeterState(pObj, TSDB_METER_STATE_IMPORTING); return TSDB_CODE_ACTION_IN_PROGRESS; - - } else { - pPool->commitInProcess = 1; - pthread_mutex_unlock(&pPool->vmutex); - code = vnodeImportData(pObj, &import); - *pNumOfPoints = import.importedRows; + } + + int loop = 0; + while (pObj->numOfQueries > 0) { + loop++; + if (loop > 1000) { + sched_yield(); + loop = 0; + } } + + pPool->commitInProcess = 1; + pthread_mutex_unlock(&pPool->vmutex); + code = vnodeImportData(pObj, &import); + *pNumOfPoints = import.importedRows; + pVnode->version++; vnodeClearMeterState(pObj, TSDB_METER_STATE_IMPORTING); }