提交 24863d6d 编写于 作者: H hjxilinx

[tbase-1212]

上级 a423258f
......@@ -81,6 +81,8 @@ int32_t vnodeSetMeterState(SMeterObj* pMeterObj, int32_t state);
void vnodeClearMeterState(SMeterObj* pMeterObj, int32_t state);
bool vnodeIsMeterState(SMeterObj* pMeterObj, int32_t state);
void vnodeSetMeterDeleting(SMeterObj* pMeterObj);
int32_t vnodeSetMeterInsertImportStateEx(SMeterObj* pObj, int32_t st);
bool vnodeIsSafeToDeleteMeter(SVnodeObj* pVnode, int32_t sid);
void vnodeFreeColumnInfo(SColumnInfo* pColumnInfo);
bool isGroupbyNormalCol(SSqlGroupbyExpr* pExpr);
......
......@@ -286,12 +286,9 @@ void vnodeProcessImportTimer(void *param, void *tmrId) {
SShellObj *pShell = pImport->pShell;
pImport->retry++;
//slow query will block the import operation
int32_t state = vnodeSetMeterState(pObj, TSDB_METER_STATE_IMPORTING);
if (state >= TSDB_METER_STATE_DELETING) {
dError("vid:%d sid:%d id:%s, meter is deleted, failed to import, state:%d",
pObj->vnode, pObj->sid, pObj->meterId, state);
int32_t code = vnodeSetMeterInsertImportStateEx(pObj, TSDB_METER_STATE_IMPORTING);
if (code == TSDB_CODE_NOT_ACTIVE_TABLE) {
return;
}
......@@ -303,14 +300,14 @@ void vnodeProcessImportTimer(void *param, void *tmrId) {
//if the num == 0, it will never be increased before state is set to TSDB_METER_STATE_READY
int32_t commitInProcess = 0;
pthread_mutex_lock(&pPool->vmutex);
if (((commitInProcess = pPool->commitInProcess) == 1) || num > 0 || state != TSDB_METER_STATE_READY) {
if (((commitInProcess = pPool->commitInProcess) == 1) || num > 0 || code == TSDB_CODE_ACTION_IN_PROGRESS) {
pthread_mutex_unlock(&pPool->vmutex);
vnodeClearMeterState(pObj, TSDB_METER_STATE_IMPORTING);
if (pImport->retry < 1000) {
dTrace("vid:%d sid:%d id:%s, import failed, retry later. commit in process or queries on it, or not ready."
"commitInProcess:%d, numOfQueries:%d, state:%d", pObj->vnode, pObj->sid, pObj->meterId,
commitInProcess, num, state);
commitInProcess, num, pObj->state);
taosTmrStart(vnodeProcessImportTimer, 10, pImport, vnodeTmrCtrl);
return;
......@@ -320,15 +317,14 @@ void vnodeProcessImportTimer(void *param, void *tmrId) {
} else {
pPool->commitInProcess = 1;
pthread_mutex_unlock(&pPool->vmutex);
int code = vnodeImportData(pObj, pImport);
int32_t ret = vnodeImportData(pObj, pImport);
if (pShell) {
pShell->code = code;
pShell->code = ret;
pShell->numOfTotalPoints += pImport->importedRows;
}
}
vnodeClearMeterState(pObj, TSDB_METER_STATE_IMPORTING);
pVnode->version++;
// send response back to shell
......@@ -912,16 +908,12 @@ int vnodeImportPoints(SMeterObj *pObj, char *cont, int contLen, char source, voi
}
if (*((TSKEY *)(pSubmit->payLoad + (rows - 1) * pObj->bytesPerPoint)) > pObj->lastKey) {
vnodeClearMeterState(pObj, TSDB_METER_STATE_IMPORTING);
vnodeSetMeterState(pObj, TSDB_METER_STATE_INSERT);
code = vnodeInsertPoints(pObj, cont, contLen, TSDB_DATA_SOURCE_LOG, NULL, pObj->sversion, &pointsImported, now);
if (pShell) {
pShell->code = code;
pShell->numOfTotalPoints += pointsImported;
}
vnodeClearMeterState(pObj, TSDB_METER_STATE_INSERT);
} else {
SImportInfo *pNew, import;
......@@ -933,7 +925,11 @@ int vnodeImportPoints(SMeterObj *pObj, char *cont, int contLen, char source, voi
import.pShell = pShell;
import.payload = payload;
import.rows = rows;
if ((code = vnodeSetMeterInsertImportStateEx(pObj, TSDB_METER_STATE_IMPORTING)) != TSDB_CODE_SUCCESS) {
return code;
}
int32_t num = 0;
pthread_mutex_lock(&pVnode->vmutex);
num = pObj->numOfQueries;
......@@ -944,7 +940,8 @@ int vnodeImportPoints(SMeterObj *pObj, char *cont, int contLen, char source, voi
pthread_mutex_lock(&pPool->vmutex);
if (((commitInProcess = pPool->commitInProcess) == 1) || num > 0) {
pthread_mutex_unlock(&pPool->vmutex);
vnodeClearMeterState(pObj, TSDB_METER_STATE_IMPORTING);
pNew = (SImportInfo *)malloc(sizeof(SImportInfo));
memcpy(pNew, &import, sizeof(SImportInfo));
pNew->signature = pNew;
......@@ -956,19 +953,25 @@ int vnodeImportPoints(SMeterObj *pObj, char *cont, int contLen, char source, voi
dTrace("vid:%d sid:%d id:%s, import later, commit in process:%d, numOfQueries:%d", pObj->vnode, pObj->sid,
pObj->meterId, commitInProcess, pObj->numOfQueries);
/*
* vnodeProcessImportTimer will set the import status for this table, so need to
* set the import flag here
*/
taosTmrStart(vnodeProcessImportTimer, 10, pNew, vnodeTmrCtrl);
return 0;
} else {
pPool->commitInProcess = 1;
pthread_mutex_unlock(&pPool->vmutex);
int code = vnodeImportData(pObj, &import);
int ret = vnodeImportData(pObj, &import);
if (pShell) {
pShell->code = code;
pShell->code = ret;
pShell->numOfTotalPoints += import.importedRows;
}
}
}
vnodeClearMeterState(pObj, TSDB_METER_STATE_IMPORTING);
pVnode->version++;
if (pShell) {
......
......@@ -572,7 +572,7 @@ int vnodeInsertPoints(SMeterObj *pObj, char *cont, int contLen, char source, voi
dTrace("vid:%d sid:%d id:%s, cache is full, freePoints:%d, notFreeSlots:%d", pObj->vnode, pObj->sid, pObj->meterId,
pObj->freePoints, pPool->notFreeSlots);
vnodeProcessCommitTimer(pVnode, NULL);
return TSDB_CODE_ACTION_IN_PROGRESS;
return code;
}
// FIXME: Here should be after the comparison of sversions.
......@@ -608,7 +608,11 @@ int vnodeInsertPoints(SMeterObj *pObj, char *cont, int contLen, char source, voi
pObj->vnode, pObj->sid, pObj->meterId, pVnode->lastKeyOnFile, numOfPoints,firstKey, lastKey, minAllowedKey, maxAllowedKey);
return TSDB_CODE_TIMESTAMP_OUT_OF_RANGE;
}
if ((code = vnodeSetMeterInsertImportStateEx(pObj, TSDB_METER_STATE_INSERT)) != TSDB_CODE_SUCCESS) {
goto _over;
}
for (i = 0; i < numOfPoints; ++i) {
// meter will be dropped, abort current insertion
if (pObj->state >= TSDB_METER_STATE_DELETING) {
......@@ -654,6 +658,8 @@ int vnodeInsertPoints(SMeterObj *pObj, char *cont, int contLen, char source, voi
pthread_mutex_unlock(&(pVnode->vmutex));
_over:
vnodeClearMeterState(pObj, TSDB_METER_STATE_INSERT);
dTrace("vid:%d sid:%d id:%s, %d out of %d points are inserted, lastKey:%ld source:%d, vnode total storage: %ld",
pObj->vnode, pObj->sid, pObj->meterId, points, numOfPoints, pObj->lastKey, source,
pVnode->vnodeStatistic.totalStorage);
......
......@@ -565,40 +565,15 @@ int vnodeProcessShellSubmitRequest(char *pMsg, int msgLen, SShellObj *pObj) {
int subMsgLen = sizeof(pBlocks->numOfRows) + htons(pBlocks->numOfRows) * pMeterObj->bytesPerPoint;
int sversion = htonl(pBlocks->sversion);
int32_t state = TSDB_METER_STATE_READY;
if (pSubmit->import) {
state = vnodeSetMeterState(pMeterObj, TSDB_METER_STATE_IMPORTING);
code = vnodeImportPoints(pMeterObj, (char *) &(pBlocks->numOfRows), subMsgLen, TSDB_DATA_SOURCE_SHELL, pObj,
sversion, &numOfPoints, now);
} else {
state = vnodeSetMeterState(pMeterObj, TSDB_METER_STATE_INSERT);
code = vnodeInsertPoints(pMeterObj, (char *) &(pBlocks->numOfRows), subMsgLen, TSDB_DATA_SOURCE_SHELL, NULL,
sversion, &numOfPoints, now);
}
if (state == TSDB_METER_STATE_READY) {
// meter status is ready for insert/import
if (pSubmit->import) {
code = vnodeImportPoints(pMeterObj, (char *) &(pBlocks->numOfRows), subMsgLen, TSDB_DATA_SOURCE_SHELL, pObj,
sversion, &numOfPoints, now);
vnodeClearMeterState(pMeterObj, TSDB_METER_STATE_IMPORTING);
} else {
code = vnodeInsertPoints(pMeterObj, (char *) &(pBlocks->numOfRows), subMsgLen, TSDB_DATA_SOURCE_SHELL, NULL,
sversion, &numOfPoints, now);
vnodeClearMeterState(pMeterObj, TSDB_METER_STATE_INSERT);
}
if (code != TSDB_CODE_SUCCESS) {break;}
} else {
if (vnodeIsMeterState(pMeterObj, TSDB_METER_STATE_DELETING)) {
dTrace("vid:%d sid:%d id:%s, it is removed, state:%d", pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId,
pMeterObj->state);
code = TSDB_CODE_NOT_ACTIVE_TABLE;
break;
} else {// waiting for 300ms by default and try again
dTrace("vid:%d sid:%d id:%s, try submit again since in state:%d", pMeterObj->vnode, pMeterObj->sid,
pMeterObj->meterId, pMeterObj->state);
code = TSDB_CODE_ACTION_IN_PROGRESS;
break;
}
}
if (code != TSDB_CODE_SUCCESS) {break;}
numOfTotalPoints += numOfPoints;
pBlocks = (SShellSubmitBlock *)((char *)pBlocks + sizeof(SShellSubmitBlock) +
......
......@@ -55,14 +55,11 @@ void vnodeProcessStreamRes(void *param, TAOS_RES *tres, TAOS_ROW row) {
contLen += sizeof(SSubmitMsg);
int32_t numOfPoints = 0;
int32_t code = vnodeInsertPoints(pObj, (char *)pMsg, contLen, TSDB_DATA_SOURCE_SHELL, NULL, pObj->sversion,
&numOfPoints, taosGetTimestamp(vnodeList[pObj->vnode].cfg.precision));
int32_t state = vnodeSetMeterState(pObj, TSDB_METER_STATE_INSERT);
if (state == TSDB_METER_STATE_READY) {
vnodeInsertPoints(pObj, (char *)pMsg, contLen, TSDB_DATA_SOURCE_SHELL, NULL, pObj->sversion, &numOfPoints, taosGetTimestamp(vnodeList[pObj->vnode].cfg.precision));
vnodeClearMeterState(pObj, TSDB_METER_STATE_INSERT);
} else {
dError("vid:%d sid:%d id:%s, failed to insert continuous query results, state:%d", pObj->vnode, pObj->sid,
pObj->meterId, state);
if (code != TSDB_CODE_SUCCESS) {
dError("vid:%d sid:%d id:%s, failed to insert continuous query results", pObj->vnode, pObj->sid, pObj->meterId);
}
assert(numOfPoints >= 0 && numOfPoints <= 1);
......
......@@ -668,6 +668,26 @@ void vnodeSetMeterDeleting(SMeterObj* pMeterObj) {
pMeterObj->state |= TSDB_METER_STATE_DELETING;
}
int32_t vnodeSetMeterInsertImportStateEx(SMeterObj* pObj, int32_t st) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t state = vnodeSetMeterState(pObj, st);
if (state != TSDB_METER_STATE_READY) {//return to denote import is not performed
if (vnodeIsMeterState(pObj, TSDB_METER_STATE_DELETING)) {
dTrace("vid:%d sid:%d id:%s, meter is deleted, state:%d", pObj->vnode, pObj->sid, pObj->meterId,
pObj->state);
code = TSDB_CODE_NOT_ACTIVE_TABLE;
} else {// waiting for 300ms by default and try again
dTrace("vid:%d sid:%d id:%s, try submit again since in state:%d", pObj->vnode, pObj->sid,
pObj->meterId, pObj->state);
code = TSDB_CODE_ACTION_IN_PROGRESS;
}
}
return code;
}
bool vnodeIsSafeToDeleteMeter(SVnodeObj* pVnode, int32_t sid) {
SMeterObj* pObj = pVnode->meterList[sid];
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册