提交 528b6012 编写于 作者: H Hongze Cheng

Fix TBASE-1114

上级 23c7e045
......@@ -156,83 +156,13 @@ void vnodeGetValidDataRange(int vnode, TSKEY now, TSKEY *minKey, TSKEY *maxKey)
return;
}
void vnodeProcessImportTimer(void *param, void *tmrId) {
SImportInfo *pImport = (SImportInfo *)param;
if (pImport == NULL || pImport->signature != param) {
dError("import timer is messed up, signature:%p", pImport);
return;
}
SMeterObj * pObj = pImport->pObj;
SVnodeObj * pVnode = &vnodeList[pObj->vnode];
SCachePool *pPool = (SCachePool *)pVnode->pCachePool;
SShellObj * pShell = pImport->pShell;
pImport->retry++;
// slow query will block the import operation
int32_t state = vnodeSetMeterState(pObj, TSDB_METER_STATE_IMPORTING);
if (state >= TSDB_METER_STATE_DELETING) {
dError("vid:%d sid:%d id:%s, meter is deleted, failed to import, state:%d", pObj->vnode, pObj->sid, pObj->meterId,
state);
return;
}
int32_t num = 0;
pthread_mutex_lock(&pVnode->vmutex);
num = pObj->numOfQueries;
pthread_mutex_unlock(&pVnode->vmutex);
// if the num == 0, it will never be increased before state is set to TSDB_METER_STATE_READY
int32_t commitInProcess = 0;
pthread_mutex_lock(&pPool->vmutex);
if (((commitInProcess = pPool->commitInProcess) == 1) || num > 0 || state != TSDB_METER_STATE_READY) {
pthread_mutex_unlock(&pPool->vmutex);
vnodeClearMeterState(pObj, TSDB_METER_STATE_IMPORTING);
if (pImport->retry < 1000) {
dTrace(
"vid:%d sid:%d id:%s, import failed, retry later. commit in process or queries on it, or not ready."
"commitInProcess:%d, numOfQueries:%d, state:%d",
pObj->vnode, pObj->sid, pObj->meterId, commitInProcess, num, state);
taosTmrStart(vnodeProcessImportTimer, 10, pImport, vnodeTmrCtrl);
return;
} else {
pShell->code = TSDB_CODE_TOO_SLOW;
}
} else {
pPool->commitInProcess = 1;
pthread_mutex_unlock(&pPool->vmutex);
int code = vnodeImportData(pObj, pImport);
if (pShell) {
pShell->code = code;
pShell->numOfTotalPoints += pImport->importedRows;
}
}
vnodeClearMeterState(pObj, TSDB_METER_STATE_IMPORTING);
pVnode->version++;
// send response back to shell
if (pShell) {
pShell->count--;
if (pShell->count <= 0) vnodeSendShellSubmitRspMsg(pImport->pShell, pShell->code, pShell->numOfTotalPoints);
}
pImport->signature = NULL;
free(pImport->opayload);
free(pImport);
}
int vnodeImportPoints(SMeterObj *pObj, char *cont, int contLen, char source, void *param, int sversion,
int *pNumOfPoints, TSKEY now) {
SSubmitMsg *pSubmit = (SSubmitMsg *)cont;
SVnodeObj * pVnode = vnodeList + pObj->vnode;
int rows = 0;
char * payload = NULL;
int code = TSDB_CODE_ACTION_IN_PROGRESS;
int code = TSDB_CODE_SUCCESS;
SCachePool *pPool = (SCachePool *)(pVnode->pCachePool);
SShellObj * pShell = (SShellObj *)param;
int pointsImported = 0;
......@@ -243,14 +173,10 @@ int vnodeImportPoints(SMeterObj *pObj, char *cont, int contLen, char source, voi
if (firstKey > pObj->lastKey) { // Just call insert
vnodeClearMeterState(pObj, TSDB_METER_STATE_IMPORTING);
// TODO: Here may fail to set the state, add error handling.
vnodeSetMeterState(pObj, TSDB_METER_STATE_INSERT);
code = vnodeInsertPoints(pObj, cont, contLen, TSDB_DATA_SOURCE_LOG, NULL, sversion, &pointsImported, now);
if (pShell) {
pShell->code = code;
pShell->numOfTotalPoints += pointsImported;
}
code = vnodeInsertPoints(pObj, cont, contLen, TSDB_DATA_SOURCE_LOG, NULL, sversion, pNumOfPoints, now);
// TODO: outside clear state function is invalid for this structure
vnodeClearMeterState(pObj, TSDB_METER_STATE_INSERT);
} else { // trigger import
{
......@@ -290,7 +216,7 @@ int vnodeImportPoints(SMeterObj *pObj, char *cont, int contLen, char source, voi
}
}
SImportInfo *pNew, import;
SImportInfo import;
dTrace("vid:%d sid:%d id:%s, try to import %d rows data, firstKey:%ld, lastKey:%ld, object lastKey:%ld",
pObj->vnode, pObj->sid, pObj->meterId, rows, firstKey, lastKey, pObj->lastKey);
......@@ -315,40 +241,17 @@ int vnodeImportPoints(SMeterObj *pObj, char *cont, int contLen, char source, voi
if (((commitInProcess = pPool->commitInProcess) == 1) ||
num > 0) { // mutual exclusion with read (need to change here)
pthread_mutex_unlock(&pPool->vmutex);
pNew = (SImportInfo *)malloc(sizeof(SImportInfo));
memcpy(pNew, &import, sizeof(SImportInfo));
pNew->signature = pNew;
int payloadLen = contLen - sizeof(SSubmitMsg);
pNew->payload = malloc(payloadLen);
pNew->opayload = pNew->payload;
memcpy(pNew->payload, payload, payloadLen);
dTrace("vid:%d sid:%d id:%s, import later, commit in process:%d, numOfQueries:%d", pObj->vnode, pObj->sid,
pObj->meterId, commitInProcess, pObj->numOfQueries);
taosTmrStart(vnodeProcessImportTimer, 10, pNew, vnodeTmrCtrl);
return 0;
return TSDB_CODE_ACTION_IN_PROGRESS;
} else {
pPool->commitInProcess = 1;
pthread_mutex_unlock(&pPool->vmutex);
int code = vnodeImportData(pObj, &import);
if (pShell) {
pShell->code = code;
pShell->numOfTotalPoints += import.importedRows;
*pNumOfPoints = import.importedRows;
}
}
}
// How about the retry? Will this also cause vnode version++?
pVnode->version++;
if (pShell) {
pShell->count--;
if (pShell->count <= 0) vnodeSendShellSubmitRspMsg(pShell, pShell->code, pShell->numOfTotalPoints);
}
return 0;
return code;
}
/* Function to search keys in a range
......
......@@ -38,10 +38,21 @@ SShellObj **shellList = NULL;
int vnodeProcessRetrieveRequest(char *pMsg, int msgLen, SShellObj *pObj);
int vnodeProcessQueryRequest(char *pMsg, int msgLen, SShellObj *pObj);
int vnodeProcessShellSubmitRequest(char *pMsg, int msgLen, SShellObj *pObj);
static void vnodeProcessBatchImportTimer(void *param, void *tmrId);
int vnodeSelectReqNum = 0;
int vnodeInsertReqNum = 0;
typedef struct {
int32_t import;
int32_t vnode;
int32_t numOfSid;
int32_t ssid; // Start sid
SShellObj *pObj;
int64_t offset; // offset relative the blks
char blks[];
} SBatchImportInfo;
void *vnodeProcessMsgFromShell(char *msg, void *ahandle, void *thandle) {
int sid, vnode;
SShellObj *pObj = (SShellObj *)ahandle;
......@@ -242,6 +253,7 @@ int vnodeSendShellSubmitRspMsg(SShellObj *pObj, int code, int numOfPoints) {
char *pMsg, *pStart;
int msgLen;
dTrace("code:%d numOfTotalPoints:%d", code, numOfPoints);
pStart = taosBuildRspMsgWithSize(pObj->thandle, TSDB_MSG_TYPE_SUBMIT_RSP, 128);
if (pStart == NULL) return -1;
pMsg = pStart;
......@@ -273,6 +285,7 @@ int vnodeProcessQueryRequest(char *pMsg, int msgLen, SShellObj *pObj) {
}
if (pQueryMsg->numOfSids <= 0) {
dError("Invalid number of meters to query, numOfSids:%d", pQueryMsg->numOfSids);
code = TSDB_CODE_INVALID_QUERY_MSG;
goto _query_over;
}
......@@ -482,10 +495,37 @@ int vnodeProcessRetrieveRequest(char *pMsg, int msgLen, SShellObj *pObj) {
return msgLen;
}
static int vnodeCheckSubmitBlockContext(SShellSubmitBlock *pBlocks, SVnodeObj *pVnode) {
int32_t sid = htonl(pBlocks->sid);
uint64_t uid = htobe64(pBlocks->uid);
if (sid >= pVnode->cfg.maxSessions || sid <= 0) {
dError("sid:%d is out of range", sid);
return TSDB_CODE_INVALID_TABLE_ID;
}
SMeterObj *pMeterObj = pVnode->meterList[sid];
if (pMeterObj == NULL) {
dError("vid:%d sid:%d, no active table", pVnode->vnode, sid);
vnodeSendMeterCfgMsg(pVnode->vnode, sid);
return TSDB_CODE_NOT_ACTIVE_TABLE;
}
if (pMeterObj->uid != uid) {
dError("vid:%d sid:%d, meterId:%s, uid:%lld, uid in msg:%lld, uid mismatch", pVnode->vnode, sid, pMeterObj->meterId,
pMeterObj->uid, uid);
return TSDB_CODE_INVALID_SUBMIT_MSG;
}
return TSDB_CODE_SUCCESS;
}
int vnodeProcessShellSubmitRequest(char *pMsg, int msgLen, SShellObj *pObj) {
int code = 0, ret = 0;
int32_t i = 0;
SShellSubmitMsg shellSubmit = *(SShellSubmitMsg *)pMsg;
SShellSubmitMsg *pSubmit = &shellSubmit;
SShellSubmitBlock *pBlocks = NULL;
pSubmit->vnode = htons(pSubmit->vnode);
pSubmit->numOfSid = htonl(pSubmit->numOfSid);
......@@ -524,67 +564,41 @@ int vnodeProcessShellSubmitRequest(char *pMsg, int msgLen, SShellObj *pObj) {
pObj->count = pSubmit->numOfSid; // for import
pObj->code = 0; // for import
pObj->numOfTotalPoints = 0; // for import
SShellSubmitBlock *pBlocks = (SShellSubmitBlock *)(pMsg + sizeof(SShellSubmitMsg));
int32_t numOfPoints = 0;
int32_t numOfTotalPoints = 0;
// We take current time here to avoid it in the for loop.
TSKEY now = taosGetTimestamp(pVnode->cfg.precision);
for (int32_t i = 0; i < pSubmit->numOfSid; ++i) {
pBlocks = (SShellSubmitBlock *)(pMsg + sizeof(SShellSubmitMsg));
for (i = 0; i < pSubmit->numOfSid; ++i) {
numOfPoints = 0;
pBlocks->sid = htonl(pBlocks->sid);
pBlocks->uid = htobe64(pBlocks->uid);
if (pBlocks->sid >= pVnode->cfg.maxSessions || pBlocks->sid <= 0) {
dTrace("sid:%d is out of range", pBlocks->sid);
code = TSDB_CODE_INVALID_TABLE_ID;
goto _submit_over;
}
int vnode = pSubmit->vnode;
int sid = pBlocks->sid;
SMeterObj *pMeterObj = vnodeList[vnode].meterList[sid];
if (pMeterObj == NULL) {
dError("vid:%d sid:%d, no active table", vnode, sid);
vnodeSendMeterCfgMsg(vnode, sid);
code = TSDB_CODE_NOT_ACTIVE_TABLE;
goto _submit_over;
}
if (pMeterObj->uid != pBlocks->uid) {
dError("vid:%d sid:%d, meterId:%s, uid:%lld, uid in msg:%lld, uid mismatch", vnode, sid, pMeterObj->meterId,
pMeterObj->uid, pBlocks->uid);
code = TSDB_CODE_INVALID_SUBMIT_MSG;
goto _submit_over;
}
code = vnodeCheckSubmitBlockContext(pBlocks, pVnode);
if (code != TSDB_CODE_SUCCESS) break;
SMeterObj *pMeterObj = (SMeterObj *)(pVnode->meterList[htonl(pBlocks->sid)]);
// dont include sid, vid
int subMsgLen = sizeof(pBlocks->numOfRows) + htons(pBlocks->numOfRows) * pMeterObj->bytesPerPoint;
int sversion = htonl(pBlocks->sversion);
int32_t subMsgLen = sizeof(pBlocks->numOfRows) + htons(pBlocks->numOfRows) * pMeterObj->bytesPerPoint;
int32_t sversion = htonl(pBlocks->sversion);
int32_t state = TSDB_METER_STATE_READY;
if (pSubmit->import) {
state = vnodeSetMeterState(pMeterObj, TSDB_METER_STATE_IMPORTING);
} else {
state = vnodeSetMeterState(pMeterObj, TSDB_METER_STATE_INSERT);
}
state = vnodeSetMeterState(pMeterObj, (pSubmit->import ? TSDB_METER_STATE_IMPORTING : TSDB_METER_STATE_INSERT));
if (state == TSDB_METER_STATE_READY) {
// meter status is ready for insert/import
if (state == TSDB_METER_STATE_READY) { // meter status is ready for insert/import
if (pSubmit->import) {
code = vnodeImportPoints(pMeterObj, (char *) &(pBlocks->numOfRows), subMsgLen, TSDB_DATA_SOURCE_SHELL, pObj,
sversion, &numOfPoints, now);
vnodeClearMeterState(pMeterObj, TSDB_METER_STATE_IMPORTING);
pObj->numOfTotalPoints += numOfPoints;
if (code == TSDB_CODE_SUCCESS) pObj->count--;
} else {
code = vnodeInsertPoints(pMeterObj, (char *) &(pBlocks->numOfRows), subMsgLen, TSDB_DATA_SOURCE_SHELL, NULL,
sversion, &numOfPoints, now);
vnodeClearMeterState(pMeterObj, TSDB_METER_STATE_INSERT);
numOfTotalPoints += numOfPoints;
}
if (code != TSDB_CODE_SUCCESS) {break;}
if (code != TSDB_CODE_SUCCESS) break;
} else {
if (vnodeIsMeterState(pMeterObj, TSDB_METER_STATE_DELETING)) {
dTrace("vid:%d sid:%d id:%s, it is removed, state:%d", pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId,
......@@ -600,15 +614,103 @@ int vnodeProcessShellSubmitRequest(char *pMsg, int msgLen, SShellObj *pObj) {
}
}
numOfTotalPoints += numOfPoints;
pBlocks = (SShellSubmitBlock *)((char *)pBlocks + sizeof(SShellSubmitBlock) +
htons(pBlocks->numOfRows) * pMeterObj->bytesPerPoint);
}
_submit_over:
// for import, send the submit response only when return code is not zero
if (pSubmit->import == 0 || code != 0) ret = vnodeSendShellSubmitRspMsg(pObj, code, numOfTotalPoints);
ret = 0;
if (pSubmit->import) { // Import case
if (code == TSDB_CODE_ACTION_IN_PROGRESS) {
SBatchImportInfo *pImportInfo =
(SBatchImportInfo *)calloc(1, sizeof(SBatchImportInfo) + msgLen - sizeof(SShellSubmitMsg));
if (pImportInfo == NULL) {
code = TSDB_CODE_SERV_OUT_OF_MEMORY;
ret = vnodeSendShellSubmitRspMsg(pObj, code, pObj->numOfTotalPoints);
} else { // Start a timer to process the next part of request
pImportInfo->import = 1;
pImportInfo->vnode = pSubmit->vnode;
pImportInfo->numOfSid = pSubmit->numOfSid;
pImportInfo->ssid = i;
pImportInfo->pObj = pObj;
pImportInfo->offset = ((char *)pBlocks) - (pMsg + sizeof(SShellSubmitMsg));
assert(pImportInfo->offset >= 0);
memcpy((void *)(pImportInfo->blks), (void *)(pMsg + sizeof(SShellSubmitMsg)), msgLen - sizeof(SShellSubmitMsg));
taosTmrStart(vnodeProcessBatchImportTimer, 10, (void *)pImportInfo, vnodeTmrCtrl);
}
} else {
if (code == TSDB_CODE_SUCCESS) assert(pObj->count == 0);
ret = vnodeSendShellSubmitRspMsg(pObj, code, pObj->numOfTotalPoints);
}
} else { // Insert case
ret = vnodeSendShellSubmitRspMsg(pObj, code, numOfTotalPoints);
}
atomic_fetch_add_32(&vnodeInsertReqNum, 1);
return ret;
}
static void vnodeProcessBatchImportTimer(void *param, void *tmrId) {
SBatchImportInfo *pImportInfo = (SBatchImportInfo *)param;
assert(pImportInfo != NULL && pImportInfo->import);
int32_t i = 0, numOfPoints = 0, numOfTotalPoints = 0;
int32_t code = TSDB_CODE_SUCCESS;
SShellObj * pShell = pImportInfo->pObj;
SVnodeObj * pVnode = &vnodeList[pImportInfo->vnode];
SShellSubmitBlock *pBlocks = (SShellSubmitBlock *)(pImportInfo->blks + pImportInfo->offset);
TSKEY now = taosGetTimestamp(pVnode->cfg.precision);
for (i = pImportInfo->ssid; i < pImportInfo->numOfSid; i++) {
numOfPoints = 0;
code = vnodeCheckSubmitBlockContext(pBlocks, pVnode);
if (code != TSDB_CODE_SUCCESS) break;
SMeterObj *pMeterObj = (SMeterObj *)(pVnode->meterList[htonl(pBlocks->sid)]);
// dont include sid, vid
int32_t subMsgLen = sizeof(pBlocks->numOfRows) + htons(pBlocks->numOfRows) * pMeterObj->bytesPerPoint;
int32_t sversion = htonl(pBlocks->sversion);
int32_t state = TSDB_METER_STATE_READY;
state = vnodeSetMeterState(pMeterObj, TSDB_METER_STATE_IMPORTING);
if (state == TSDB_METER_STATE_READY) { // meter status is ready for insert/import
code = vnodeImportPoints(pMeterObj, (char *)&(pBlocks->numOfRows), subMsgLen, TSDB_DATA_SOURCE_SHELL, pShell,
sversion, &numOfPoints, now);
vnodeClearMeterState(pMeterObj, TSDB_METER_STATE_IMPORTING);
pShell->numOfTotalPoints += numOfPoints;
if (code != TSDB_CODE_SUCCESS) break;
pShell->count--;
} else {
if (vnodeIsMeterState(pMeterObj, TSDB_METER_STATE_DELETING)) {
dTrace("vid:%d sid:%d id:%s, it is removed, state:%d", pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId,
pMeterObj->state);
code = TSDB_CODE_NOT_ACTIVE_TABLE;
break;
} else { // waiting for 300ms by default and try again
dTrace("vid:%d sid:%d id:%s, try submit again since in state:%d", pMeterObj->vnode, pMeterObj->sid,
pMeterObj->meterId, pMeterObj->state);
code = TSDB_CODE_ACTION_IN_PROGRESS;
break;
}
}
pBlocks = (SShellSubmitBlock *)((char *)pBlocks + sizeof(SShellSubmitBlock) +
htons(pBlocks->numOfRows) * pMeterObj->bytesPerPoint);
}
int ret = 0;
if (code == TSDB_CODE_ACTION_IN_PROGRESS) {
pImportInfo->ssid = i;
pImportInfo->offset = ((char *)pBlocks) - pImportInfo->blks;
taosTmrStart(vnodeProcessBatchImportTimer, 10, (void *)pImportInfo, vnodeTmrCtrl);
} else {
if (code == TSDB_CODE_SUCCESS) assert(pShell->count == 0);
tfree(param);
ret = vnodeSendShellSubmitRspMsg(pShell, code, pShell->numOfTotalPoints);
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册