提交 178f5771 编写于 作者: H hjxilinx

[jira none]

上级 0ba6a7cd
......@@ -166,7 +166,6 @@ int vnodeImportPoints(SMeterObj *pObj, char *cont, int contLen, char source, voi
int code = TSDB_CODE_SUCCESS;
SCachePool *pPool = (SCachePool *)(pVnode->pCachePool);
SShellObj * pShell = (SShellObj *)param;
int pointsImported = 0;
TSKEY firstKey, lastKey;
payload = pSubmit->payLoad;
......@@ -200,25 +199,29 @@ int vnodeImportPoints(SMeterObj *pObj, char *cont, int contLen, char source, voi
if (code != 0) return code;
}
/*
* The timestamp of all records in a submit payload are always in ascending order, guaranteed by client, so here only
* the first key.
*/
if (firstKey > pObj->lastKey) { // Just call insert
vnodeClearMeterState(pObj, TSDB_METER_STATE_IMPORTING);
// TODO: Here may fail to set the state, add error handling.
vnodeSetMeterState(pObj, TSDB_METER_STATE_INSERT);
code = vnodeInsertPoints(pObj, cont, contLen, TSDB_DATA_SOURCE_LOG, NULL, sversion, pNumOfPoints, now);
// TODO: outside clear state function is invalid for this structure
vnodeClearMeterState(pObj, TSDB_METER_STATE_INSERT);
} else { // trigger import
if (sversion != pObj->sversion) {
dError("vid:%d sid:%d id:%s, invalid sversion, expected:%d received:%d", pObj->vnode, pObj->sid, pObj->meterId,
pObj->sversion, sversion);
return TSDB_CODE_OTHERS;
}
SImportInfo import;
// check the table status for perform import historical data
if ((code = vnodeSetMeterInsertImportStateEx(pObj, TSDB_METER_STATE_IMPORTING)) != TSDB_CODE_SUCCESS) {
return code;
}
SImportInfo import = {0};
dTrace("vid:%d sid:%d id:%s, try to import %d rows data, firstKey:%ld, lastKey:%ld, object lastKey:%ld",
pObj->vnode, pObj->sid, pObj->meterId, rows, firstKey, lastKey, pObj->lastKey);
memset(&import, 0, sizeof(import));
import.firstKey = firstKey;
import.lastKey = lastKey;
import.pObj = pObj;
......@@ -226,8 +229,7 @@ int vnodeImportPoints(SMeterObj *pObj, char *cont, int contLen, char source, voi
import.payload = payload;
import.rows = rows;
// FIXME: mutex here seems meaningless and num here still can
// be changed
// FIXME: mutex here seems meaningless and num here still can be changed
int32_t num = 0;
pthread_mutex_lock(&pVnode->vmutex);
num = pObj->numOfQueries;
......@@ -236,10 +238,12 @@ int vnodeImportPoints(SMeterObj *pObj, char *cont, int contLen, char source, voi
int32_t commitInProcess = 0;
pthread_mutex_lock(&pPool->vmutex);
if (((commitInProcess = pPool->commitInProcess) == 1) ||
num > 0) { // mutual exclusion with read (need to change here)
if (((commitInProcess = pPool->commitInProcess) == 1) || num > 0) {
// mutual exclusion with read (need to change here)
pthread_mutex_unlock(&pPool->vmutex);
vnodeClearMeterState(pObj, TSDB_METER_STATE_IMPORTING);
return TSDB_CODE_ACTION_IN_PROGRESS;
} else {
pPool->commitInProcess = 1;
pthread_mutex_unlock(&pPool->vmutex);
......@@ -248,7 +252,8 @@ int vnodeImportPoints(SMeterObj *pObj, char *cont, int contLen, char source, voi
}
pVnode->version++;
}
vnodeClearMeterState(pObj, TSDB_METER_STATE_IMPORTING);
return code;
}
......
......@@ -584,12 +584,12 @@ int vnodeInsertPoints(SMeterObj *pObj, char *cont, int contLen, char source, voi
if (pVnode->cfg.commitLog && source != TSDB_DATA_SOURCE_LOG) {
if (pVnode->logFd < 0) return TSDB_CODE_INVALID_COMMIT_LOG;
code = vnodeWriteToCommitLog(pObj, TSDB_ACTION_INSERT, cont, contLen, sversion);
if (code != 0) return code;
if (code != TSDB_CODE_SUCCESS) return code;
}
if (source == TSDB_DATA_SOURCE_SHELL && pVnode->cfg.replications > 1) {
code = vnodeForwardToPeer(pObj, cont, contLen, TSDB_ACTION_INSERT, sversion);
if (code != 0) return code;
if (code != TSDB_CODE_SUCCESS) return code;
}
if (pObj->sversion < sversion) {
......@@ -601,11 +601,11 @@ int vnodeInsertPoints(SMeterObj *pObj, char *cont, int contLen, char source, voi
}
pData = pSubmit->payLoad;
code = TSDB_CODE_SUCCESS;
TSKEY firstKey = *((TSKEY *)pData);
TSKEY lastKey = *((TSKEY *)(pData + pObj->bytesPerPoint * (numOfPoints - 1)));
int cfid = now/pVnode->cfg.daysPerFile/tsMsPerDay[pVnode->cfg.precision];
TSKEY minAllowedKey = (cfid - pVnode->maxFiles + 1)*pVnode->cfg.daysPerFile*tsMsPerDay[pVnode->cfg.precision];
TSKEY maxAllowedKey = (cfid + 2)*pVnode->cfg.daysPerFile*tsMsPerDay[pVnode->cfg.precision] - 2;
if (firstKey < minAllowedKey || firstKey > maxAllowedKey || lastKey < minAllowedKey || lastKey > maxAllowedKey) {
......@@ -619,7 +619,7 @@ int vnodeInsertPoints(SMeterObj *pObj, char *cont, int contLen, char source, voi
}
for (i = 0; i < numOfPoints; ++i) { // meter will be dropped, abort current insertion
if (pObj->state >= TSDB_METER_STATE_DELETING) {
if (vnodeIsMeterState(pObj, TSDB_METER_STATE_DELETING)) {
dWarn("vid:%d sid:%d id:%s, meter is dropped, abort insert, state:%d", pObj->vnode, pObj->sid, pObj->meterId,
pObj->state);
......@@ -648,6 +648,7 @@ int vnodeInsertPoints(SMeterObj *pObj, char *cont, int contLen, char source, voi
pData += pObj->bytesPerPoint;
points++;
}
atomic_fetch_add_64(&(pVnode->vnodeStatistic.pointsWritten), points * (pObj->numOfColumns - 1));
atomic_fetch_add_64(&(pVnode->vnodeStatistic.totalStorage), points * pObj->bytesPerPoint);
......@@ -660,6 +661,7 @@ int vnodeInsertPoints(SMeterObj *pObj, char *cont, int contLen, char source, voi
pVnode->version++;
pthread_mutex_unlock(&(pVnode->vmutex));
vnodeClearMeterState(pObj, TSDB_METER_STATE_INSERT);
_over:
......
......@@ -509,7 +509,7 @@ static int vnodeCheckSubmitBlockContext(SShellSubmitBlock *pBlocks, SVnodeObj *p
SMeterObj *pMeterObj = pVnode->meterList[sid];
if (pMeterObj == NULL) {
dError("vid:%d sid:%d, no active table", pVnode->vnode, sid);
dError("vid:%d sid:%d, not active table", pVnode->vnode, sid);
vnodeSendMeterCfgMsg(pVnode->vnode, sid);
return TSDB_CODE_NOT_ACTIVE_TABLE;
}
......@@ -581,41 +581,27 @@ int vnodeProcessShellSubmitRequest(char *pMsg, int msgLen, SShellObj *pObj) {
if (code != TSDB_CODE_SUCCESS) break;
SMeterObj *pMeterObj = (SMeterObj *)(pVnode->meterList[htonl(pBlocks->sid)]);
// dont include sid, vid
int32_t subMsgLen = sizeof(pBlocks->numOfRows) + htons(pBlocks->numOfRows) * pMeterObj->bytesPerPoint;
int32_t sversion = htonl(pBlocks->sversion);
int32_t state = TSDB_METER_STATE_READY;
state = vnodeSetMeterState(pMeterObj, (pSubmit->import ? TSDB_METER_STATE_IMPORTING : TSDB_METER_STATE_INSERT));
if (state == TSDB_METER_STATE_READY) { // meter status is ready for insert/import
if (pSubmit->import) {
code = vnodeImportPoints(pMeterObj, (char *) &(pBlocks->numOfRows), subMsgLen, TSDB_DATA_SOURCE_SHELL, pObj,
sversion, &numOfPoints, now);
vnodeClearMeterState(pMeterObj, TSDB_METER_STATE_IMPORTING);
pObj->numOfTotalPoints += numOfPoints;
if (code == TSDB_CODE_SUCCESS) pObj->count--;
} else {
code = vnodeInsertPoints(pMeterObj, (char *) &(pBlocks->numOfRows), subMsgLen, TSDB_DATA_SOURCE_SHELL, NULL,
sversion, &numOfPoints, now);
vnodeClearMeterState(pMeterObj, TSDB_METER_STATE_INSERT);
numOfTotalPoints += numOfPoints;
if (pSubmit->import) {
code = vnodeImportPoints(pMeterObj, (char *) &(pBlocks->numOfRows), subMsgLen, TSDB_DATA_SOURCE_SHELL, pObj,
sversion, &numOfPoints, now);
pObj->numOfTotalPoints += numOfPoints;
//records for one table should be consecutive located in the payload buffer, which is guaranteed by client
if (code == TSDB_CODE_SUCCESS) {
pObj->count--;
}
if (code != TSDB_CODE_SUCCESS) break;
} else {
if (vnodeIsMeterState(pMeterObj, TSDB_METER_STATE_DELETING)) {
dTrace("vid:%d sid:%d id:%s, it is removed, state:%d", pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId,
pMeterObj->state);
code = TSDB_CODE_NOT_ACTIVE_TABLE;
break;
} else {// waiting for 300ms by default and try again
dTrace("vid:%d sid:%d id:%s, try submit again since in state:%d", pMeterObj->vnode, pMeterObj->sid,
pMeterObj->meterId, pMeterObj->state);
code = TSDB_CODE_ACTION_IN_PROGRESS;
break;
}
code = vnodeInsertPoints(pMeterObj, (char *) &(pBlocks->numOfRows), subMsgLen, TSDB_DATA_SOURCE_SHELL, NULL,
sversion, &numOfPoints, now);
numOfTotalPoints += numOfPoints;
}
if (code != TSDB_CODE_SUCCESS) break;
pBlocks = (SShellSubmitBlock *)((char *)pBlocks + sizeof(SShellSubmitBlock) +
htons(pBlocks->numOfRows) * pMeterObj->bytesPerPoint);
......@@ -635,7 +621,7 @@ _submit_over:
pImportInfo->import = 1;
pImportInfo->vnode = pSubmit->vnode;
pImportInfo->numOfSid = pSubmit->numOfSid;
pImportInfo->ssid = i;
pImportInfo->ssid = i; // start from this position, not the initial position
pImportInfo->pObj = pObj;
pImportInfo->offset = ((char *)pBlocks) - (pMsg + sizeof(SShellSubmitMsg));
assert(pImportInfo->offset >= 0);
......@@ -658,7 +644,7 @@ static void vnodeProcessBatchImportTimer(void *param, void *tmrId) {
SBatchImportInfo *pImportInfo = (SBatchImportInfo *)param;
assert(pImportInfo != NULL && pImportInfo->import);
int32_t i = 0, numOfPoints = 0, numOfTotalPoints = 0;
int32_t i = 0, numOfPoints = 0;
int32_t code = TSDB_CODE_SUCCESS;
SShellObj * pShell = pImportInfo->pObj;
......@@ -677,30 +663,11 @@ static void vnodeProcessBatchImportTimer(void *param, void *tmrId) {
int32_t subMsgLen = sizeof(pBlocks->numOfRows) + htons(pBlocks->numOfRows) * pMeterObj->bytesPerPoint;
int32_t sversion = htonl(pBlocks->sversion);
int32_t state = TSDB_METER_STATE_READY;
state = vnodeSetMeterState(pMeterObj, TSDB_METER_STATE_IMPORTING);
if (state == TSDB_METER_STATE_READY) { // meter status is ready for insert/import
code = vnodeImportPoints(pMeterObj, (char *)&(pBlocks->numOfRows), subMsgLen, TSDB_DATA_SOURCE_SHELL, pShell,
sversion, &numOfPoints, now);
vnodeClearMeterState(pMeterObj, TSDB_METER_STATE_IMPORTING);
pShell->numOfTotalPoints += numOfPoints;
if (code != TSDB_CODE_SUCCESS) break;
pShell->count--;
} else {
if (vnodeIsMeterState(pMeterObj, TSDB_METER_STATE_DELETING)) {
dTrace("vid:%d sid:%d id:%s, it is removed, state:%d", pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId,
pMeterObj->state);
code = TSDB_CODE_NOT_ACTIVE_TABLE;
break;
} else { // waiting for 300ms by default and try again
dTrace("vid:%d sid:%d id:%s, try submit again since in state:%d", pMeterObj->vnode, pMeterObj->sid,
pMeterObj->meterId, pMeterObj->state);
code = TSDB_CODE_ACTION_IN_PROGRESS;
break;
}
}
code = vnodeImportPoints(pMeterObj, (char *)&(pBlocks->numOfRows), subMsgLen, TSDB_DATA_SOURCE_SHELL, pShell,
sversion, &numOfPoints, now);
pShell->numOfTotalPoints += numOfPoints;
if (code != TSDB_CODE_SUCCESS) break;
pShell->count--;
pBlocks = (SShellSubmitBlock *)((char *)pBlocks + sizeof(SShellSubmitBlock) +
htons(pBlocks->numOfRows) * pMeterObj->bytesPerPoint);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册