diff --git a/src/system/detail/inc/vnodeUtil.h b/src/system/detail/inc/vnodeUtil.h index 870579466013cb192f403f7e15c67c8f81fa668b..b0f573ba2d466b084ce750e2e5d368cca61d353e 100644 --- a/src/system/detail/inc/vnodeUtil.h +++ b/src/system/detail/inc/vnodeUtil.h @@ -81,6 +81,8 @@ int32_t vnodeSetMeterState(SMeterObj* pMeterObj, int32_t state); void vnodeClearMeterState(SMeterObj* pMeterObj, int32_t state); bool vnodeIsMeterState(SMeterObj* pMeterObj, int32_t state); void vnodeSetMeterDeleting(SMeterObj* pMeterObj); +int32_t vnodeSetMeterInsertImportStateEx(SMeterObj* pObj, int32_t st); + bool vnodeIsSafeToDeleteMeter(SVnodeObj* pVnode, int32_t sid); void vnodeFreeColumnInfo(SColumnInfo* pColumnInfo); bool isGroupbyNormalCol(SSqlGroupbyExpr* pExpr); diff --git a/src/system/detail/src/vnodeImport.c b/src/system/detail/src/vnodeImport.c index b2b937b0664654664906b78d082d48434e8757ff..4cc8e84e7ee40ade0842b7be8e757a6c810538fb 100644 --- a/src/system/detail/src/vnodeImport.c +++ b/src/system/detail/src/vnodeImport.c @@ -286,12 +286,9 @@ void vnodeProcessImportTimer(void *param, void *tmrId) { SShellObj *pShell = pImport->pShell; pImport->retry++; - - //slow query will block the import operation - int32_t state = vnodeSetMeterState(pObj, TSDB_METER_STATE_IMPORTING); - if (state >= TSDB_METER_STATE_DELETING) { - dError("vid:%d sid:%d id:%s, meter is deleted, failed to import, state:%d", - pObj->vnode, pObj->sid, pObj->meterId, state); + + int32_t code = vnodeSetMeterInsertImportStateEx(pObj, TSDB_METER_STATE_IMPORTING); + if (code == TSDB_CODE_NOT_ACTIVE_TABLE) { return; } @@ -303,14 +300,14 @@ void vnodeProcessImportTimer(void *param, void *tmrId) { //if the num == 0, it will never be increased before state is set to TSDB_METER_STATE_READY int32_t commitInProcess = 0; pthread_mutex_lock(&pPool->vmutex); - if (((commitInProcess = pPool->commitInProcess) == 1) || num > 0 || state != TSDB_METER_STATE_READY) { + if (((commitInProcess = pPool->commitInProcess) == 1) || num > 0 || code == TSDB_CODE_ACTION_IN_PROGRESS) { pthread_mutex_unlock(&pPool->vmutex); vnodeClearMeterState(pObj, TSDB_METER_STATE_IMPORTING); if (pImport->retry < 1000) { dTrace("vid:%d sid:%d id:%s, import failed, retry later. commit in process or queries on it, or not ready." "commitInProcess:%d, numOfQueries:%d, state:%d", pObj->vnode, pObj->sid, pObj->meterId, - commitInProcess, num, state); + commitInProcess, num, pObj->state); taosTmrStart(vnodeProcessImportTimer, 10, pImport, vnodeTmrCtrl); return; @@ -320,15 +317,14 @@ void vnodeProcessImportTimer(void *param, void *tmrId) { } else { pPool->commitInProcess = 1; pthread_mutex_unlock(&pPool->vmutex); - int code = vnodeImportData(pObj, pImport); + int32_t ret = vnodeImportData(pObj, pImport); if (pShell) { - pShell->code = code; + pShell->code = ret; pShell->numOfTotalPoints += pImport->importedRows; } } vnodeClearMeterState(pObj, TSDB_METER_STATE_IMPORTING); - pVnode->version++; // send response back to shell @@ -912,16 +908,12 @@ int vnodeImportPoints(SMeterObj *pObj, char *cont, int contLen, char source, voi } if (*((TSKEY *)(pSubmit->payLoad + (rows - 1) * pObj->bytesPerPoint)) > pObj->lastKey) { - vnodeClearMeterState(pObj, TSDB_METER_STATE_IMPORTING); - vnodeSetMeterState(pObj, TSDB_METER_STATE_INSERT); code = vnodeInsertPoints(pObj, cont, contLen, TSDB_DATA_SOURCE_LOG, NULL, pObj->sversion, &pointsImported, now); if (pShell) { pShell->code = code; pShell->numOfTotalPoints += pointsImported; } - - vnodeClearMeterState(pObj, TSDB_METER_STATE_INSERT); } else { SImportInfo *pNew, import; @@ -933,7 +925,11 @@ int vnodeImportPoints(SMeterObj *pObj, char *cont, int contLen, char source, voi import.pShell = pShell; import.payload = payload; import.rows = rows; - + + if ((code = vnodeSetMeterInsertImportStateEx(pObj, TSDB_METER_STATE_IMPORTING)) != TSDB_CODE_SUCCESS) { + return code; + } + int32_t num = 0; pthread_mutex_lock(&pVnode->vmutex); num = pObj->numOfQueries; @@ -944,7 +940,8 @@ int vnodeImportPoints(SMeterObj *pObj, char *cont, int contLen, char source, voi pthread_mutex_lock(&pPool->vmutex); if (((commitInProcess = pPool->commitInProcess) == 1) || num > 0) { pthread_mutex_unlock(&pPool->vmutex); - + vnodeClearMeterState(pObj, TSDB_METER_STATE_IMPORTING); + pNew = (SImportInfo *)malloc(sizeof(SImportInfo)); memcpy(pNew, &import, sizeof(SImportInfo)); pNew->signature = pNew; @@ -956,19 +953,25 @@ int vnodeImportPoints(SMeterObj *pObj, char *cont, int contLen, char source, voi dTrace("vid:%d sid:%d id:%s, import later, commit in process:%d, numOfQueries:%d", pObj->vnode, pObj->sid, pObj->meterId, commitInProcess, pObj->numOfQueries); + /* + * vnodeProcessImportTimer will set the import status for this table, so need to + * set the import flag here + */ taosTmrStart(vnodeProcessImportTimer, 10, pNew, vnodeTmrCtrl); return 0; } else { pPool->commitInProcess = 1; pthread_mutex_unlock(&pPool->vmutex); - int code = vnodeImportData(pObj, &import); + + int ret = vnodeImportData(pObj, &import); if (pShell) { - pShell->code = code; + pShell->code = ret; pShell->numOfTotalPoints += import.importedRows; } } } + vnodeClearMeterState(pObj, TSDB_METER_STATE_IMPORTING); pVnode->version++; if (pShell) { diff --git a/src/system/detail/src/vnodeMeter.c b/src/system/detail/src/vnodeMeter.c index 14cdf9eb78ef5bc5ce85c08b196c856b2b571929..42353c36a4feb99d33163dbbae015bd15475756a 100644 --- a/src/system/detail/src/vnodeMeter.c +++ b/src/system/detail/src/vnodeMeter.c @@ -572,7 +572,7 @@ int vnodeInsertPoints(SMeterObj *pObj, char *cont, int contLen, char source, voi dTrace("vid:%d sid:%d id:%s, cache is full, freePoints:%d, notFreeSlots:%d", pObj->vnode, pObj->sid, pObj->meterId, pObj->freePoints, pPool->notFreeSlots); vnodeProcessCommitTimer(pVnode, NULL); - return TSDB_CODE_ACTION_IN_PROGRESS; + return code; } // FIXME: Here should be after the comparison of sversions. @@ -608,7 +608,11 @@ int vnodeInsertPoints(SMeterObj *pObj, char *cont, int contLen, char source, voi pObj->vnode, pObj->sid, pObj->meterId, pVnode->lastKeyOnFile, numOfPoints,firstKey, lastKey, minAllowedKey, maxAllowedKey); return TSDB_CODE_TIMESTAMP_OUT_OF_RANGE; } - + + if ((code = vnodeSetMeterInsertImportStateEx(pObj, TSDB_METER_STATE_INSERT)) != TSDB_CODE_SUCCESS) { + goto _over; + } + for (i = 0; i < numOfPoints; ++i) { // meter will be dropped, abort current insertion if (pObj->state >= TSDB_METER_STATE_DELETING) { @@ -654,6 +658,8 @@ int vnodeInsertPoints(SMeterObj *pObj, char *cont, int contLen, char source, voi pthread_mutex_unlock(&(pVnode->vmutex)); _over: + vnodeClearMeterState(pObj, TSDB_METER_STATE_INSERT); + dTrace("vid:%d sid:%d id:%s, %d out of %d points are inserted, lastKey:%ld source:%d, vnode total storage: %ld", pObj->vnode, pObj->sid, pObj->meterId, points, numOfPoints, pObj->lastKey, source, pVnode->vnodeStatistic.totalStorage); diff --git a/src/system/detail/src/vnodeShell.c b/src/system/detail/src/vnodeShell.c index a5f5259887efbf66dcdd16416e61cec952b992d0..b8516e127beeeab4d7af7ff2af06f22164d4b600 100644 --- a/src/system/detail/src/vnodeShell.c +++ b/src/system/detail/src/vnodeShell.c @@ -565,40 +565,15 @@ int vnodeProcessShellSubmitRequest(char *pMsg, int msgLen, SShellObj *pObj) { int subMsgLen = sizeof(pBlocks->numOfRows) + htons(pBlocks->numOfRows) * pMeterObj->bytesPerPoint; int sversion = htonl(pBlocks->sversion); - int32_t state = TSDB_METER_STATE_READY; if (pSubmit->import) { - state = vnodeSetMeterState(pMeterObj, TSDB_METER_STATE_IMPORTING); + code = vnodeImportPoints(pMeterObj, (char *) &(pBlocks->numOfRows), subMsgLen, TSDB_DATA_SOURCE_SHELL, pObj, + sversion, &numOfPoints, now); } else { - state = vnodeSetMeterState(pMeterObj, TSDB_METER_STATE_INSERT); + code = vnodeInsertPoints(pMeterObj, (char *) &(pBlocks->numOfRows), subMsgLen, TSDB_DATA_SOURCE_SHELL, NULL, + sversion, &numOfPoints, now); } - if (state == TSDB_METER_STATE_READY) { - // meter status is ready for insert/import - if (pSubmit->import) { - code = vnodeImportPoints(pMeterObj, (char *) &(pBlocks->numOfRows), subMsgLen, TSDB_DATA_SOURCE_SHELL, pObj, - sversion, &numOfPoints, now); - vnodeClearMeterState(pMeterObj, TSDB_METER_STATE_IMPORTING); - } else { - code = vnodeInsertPoints(pMeterObj, (char *) &(pBlocks->numOfRows), subMsgLen, TSDB_DATA_SOURCE_SHELL, NULL, - sversion, &numOfPoints, now); - vnodeClearMeterState(pMeterObj, TSDB_METER_STATE_INSERT); - } - - if (code != TSDB_CODE_SUCCESS) {break;} - } else { - if (vnodeIsMeterState(pMeterObj, TSDB_METER_STATE_DELETING)) { - dTrace("vid:%d sid:%d id:%s, it is removed, state:%d", pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId, - pMeterObj->state); - code = TSDB_CODE_NOT_ACTIVE_TABLE; - break; - } else {// waiting for 300ms by default and try again - dTrace("vid:%d sid:%d id:%s, try submit again since in state:%d", pMeterObj->vnode, pMeterObj->sid, - pMeterObj->meterId, pMeterObj->state); - - code = TSDB_CODE_ACTION_IN_PROGRESS; - break; - } - } + if (code != TSDB_CODE_SUCCESS) {break;} numOfTotalPoints += numOfPoints; pBlocks = (SShellSubmitBlock *)((char *)pBlocks + sizeof(SShellSubmitBlock) + diff --git a/src/system/detail/src/vnodeStream.c b/src/system/detail/src/vnodeStream.c index 562b7eb73c80d67664c4f3148757eab03f517634..874e59448de28953bfc98b4c85b8075d8b2fd926 100644 --- a/src/system/detail/src/vnodeStream.c +++ b/src/system/detail/src/vnodeStream.c @@ -55,14 +55,11 @@ void vnodeProcessStreamRes(void *param, TAOS_RES *tres, TAOS_ROW row) { contLen += sizeof(SSubmitMsg); int32_t numOfPoints = 0; + int32_t code = vnodeInsertPoints(pObj, (char *)pMsg, contLen, TSDB_DATA_SOURCE_SHELL, NULL, pObj->sversion, + &numOfPoints, taosGetTimestamp(vnodeList[pObj->vnode].cfg.precision)); - int32_t state = vnodeSetMeterState(pObj, TSDB_METER_STATE_INSERT); - if (state == TSDB_METER_STATE_READY) { - vnodeInsertPoints(pObj, (char *)pMsg, contLen, TSDB_DATA_SOURCE_SHELL, NULL, pObj->sversion, &numOfPoints, taosGetTimestamp(vnodeList[pObj->vnode].cfg.precision)); - vnodeClearMeterState(pObj, TSDB_METER_STATE_INSERT); - } else { - dError("vid:%d sid:%d id:%s, failed to insert continuous query results, state:%d", pObj->vnode, pObj->sid, - pObj->meterId, state); + if (code != TSDB_CODE_SUCCESS) { + dError("vid:%d sid:%d id:%s, failed to insert continuous query results", pObj->vnode, pObj->sid, pObj->meterId); } assert(numOfPoints >= 0 && numOfPoints <= 1); diff --git a/src/system/detail/src/vnodeUtil.c b/src/system/detail/src/vnodeUtil.c index b8c463f50b1d97c9ba23b17eb572f2c237a7220a..52b27f5d6d612a7626939a948983ecfe27f9d1ad 100644 --- a/src/system/detail/src/vnodeUtil.c +++ b/src/system/detail/src/vnodeUtil.c @@ -668,6 +668,26 @@ void vnodeSetMeterDeleting(SMeterObj* pMeterObj) { pMeterObj->state |= TSDB_METER_STATE_DELETING; } +int32_t vnodeSetMeterInsertImportStateEx(SMeterObj* pObj, int32_t st) { + int32_t code = TSDB_CODE_SUCCESS; + + int32_t state = vnodeSetMeterState(pObj, st); + if (state != TSDB_METER_STATE_READY) {//return to denote import is not performed + if (vnodeIsMeterState(pObj, TSDB_METER_STATE_DELETING)) { + dTrace("vid:%d sid:%d id:%s, meter is deleted, state:%d", pObj->vnode, pObj->sid, pObj->meterId, + pObj->state); + code = TSDB_CODE_NOT_ACTIVE_TABLE; + } else {// waiting for 300ms by default and try again + dTrace("vid:%d sid:%d id:%s, try submit again since in state:%d", pObj->vnode, pObj->sid, + pObj->meterId, pObj->state); + + code = TSDB_CODE_ACTION_IN_PROGRESS; + } + } + + return code; +} + bool vnodeIsSafeToDeleteMeter(SVnodeObj* pVnode, int32_t sid) { SMeterObj* pObj = pVnode->meterList[sid];