From 528b601287b785bb09190ec7c87ae42dcc4a24fb Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Sat, 23 Nov 2019 17:07:43 +0800 Subject: [PATCH] Fix TBASE-1114 --- src/system/detail/src/vnodeImport.c | 115 ++--------------- src/system/detail/src/vnodeShell.c | 186 +++++++++++++++++++++------- 2 files changed, 153 insertions(+), 148 deletions(-) diff --git a/src/system/detail/src/vnodeImport.c b/src/system/detail/src/vnodeImport.c index 609422e5d6..94de72f86e 100644 --- a/src/system/detail/src/vnodeImport.c +++ b/src/system/detail/src/vnodeImport.c @@ -156,83 +156,13 @@ void vnodeGetValidDataRange(int vnode, TSKEY now, TSKEY *minKey, TSKEY *maxKey) return; } -void vnodeProcessImportTimer(void *param, void *tmrId) { - SImportInfo *pImport = (SImportInfo *)param; - if (pImport == NULL || pImport->signature != param) { - dError("import timer is messed up, signature:%p", pImport); - return; - } - - SMeterObj * pObj = pImport->pObj; - SVnodeObj * pVnode = &vnodeList[pObj->vnode]; - SCachePool *pPool = (SCachePool *)pVnode->pCachePool; - SShellObj * pShell = pImport->pShell; - - pImport->retry++; - - // slow query will block the import operation - int32_t state = vnodeSetMeterState(pObj, TSDB_METER_STATE_IMPORTING); - if (state >= TSDB_METER_STATE_DELETING) { - dError("vid:%d sid:%d id:%s, meter is deleted, failed to import, state:%d", pObj->vnode, pObj->sid, pObj->meterId, - state); - return; - } - - int32_t num = 0; - pthread_mutex_lock(&pVnode->vmutex); - num = pObj->numOfQueries; - pthread_mutex_unlock(&pVnode->vmutex); - - // if the num == 0, it will never be increased before state is set to TSDB_METER_STATE_READY - int32_t commitInProcess = 0; - pthread_mutex_lock(&pPool->vmutex); - if (((commitInProcess = pPool->commitInProcess) == 1) || num > 0 || state != TSDB_METER_STATE_READY) { - pthread_mutex_unlock(&pPool->vmutex); - vnodeClearMeterState(pObj, TSDB_METER_STATE_IMPORTING); - - if (pImport->retry < 1000) { - dTrace( - "vid:%d sid:%d id:%s, import failed, retry later. commit in process or queries on it, or not ready." - "commitInProcess:%d, numOfQueries:%d, state:%d", - pObj->vnode, pObj->sid, pObj->meterId, commitInProcess, num, state); - - taosTmrStart(vnodeProcessImportTimer, 10, pImport, vnodeTmrCtrl); - return; - } else { - pShell->code = TSDB_CODE_TOO_SLOW; - } - } else { - pPool->commitInProcess = 1; - pthread_mutex_unlock(&pPool->vmutex); - int code = vnodeImportData(pObj, pImport); - if (pShell) { - pShell->code = code; - pShell->numOfTotalPoints += pImport->importedRows; - } - } - - vnodeClearMeterState(pObj, TSDB_METER_STATE_IMPORTING); - - pVnode->version++; - - // send response back to shell - if (pShell) { - pShell->count--; - if (pShell->count <= 0) vnodeSendShellSubmitRspMsg(pImport->pShell, pShell->code, pShell->numOfTotalPoints); - } - - pImport->signature = NULL; - free(pImport->opayload); - free(pImport); -} - int vnodeImportPoints(SMeterObj *pObj, char *cont, int contLen, char source, void *param, int sversion, int *pNumOfPoints, TSKEY now) { SSubmitMsg *pSubmit = (SSubmitMsg *)cont; SVnodeObj * pVnode = vnodeList + pObj->vnode; int rows = 0; char * payload = NULL; - int code = TSDB_CODE_ACTION_IN_PROGRESS; + int code = TSDB_CODE_SUCCESS; SCachePool *pPool = (SCachePool *)(pVnode->pCachePool); SShellObj * pShell = (SShellObj *)param; int pointsImported = 0; @@ -243,14 +173,10 @@ int vnodeImportPoints(SMeterObj *pObj, char *cont, int contLen, char source, voi if (firstKey > pObj->lastKey) { // Just call insert vnodeClearMeterState(pObj, TSDB_METER_STATE_IMPORTING); + // TODO: Here may fail to set the state, add error handling. vnodeSetMeterState(pObj, TSDB_METER_STATE_INSERT); - code = vnodeInsertPoints(pObj, cont, contLen, TSDB_DATA_SOURCE_LOG, NULL, sversion, &pointsImported, now); - - if (pShell) { - pShell->code = code; - pShell->numOfTotalPoints += pointsImported; - } - + code = vnodeInsertPoints(pObj, cont, contLen, TSDB_DATA_SOURCE_LOG, NULL, sversion, pNumOfPoints, now); + // TODO: outside clear state function is invalid for this structure vnodeClearMeterState(pObj, TSDB_METER_STATE_INSERT); } else { // trigger import { @@ -290,7 +216,7 @@ int vnodeImportPoints(SMeterObj *pObj, char *cont, int contLen, char source, voi } } - SImportInfo *pNew, import; + SImportInfo import; dTrace("vid:%d sid:%d id:%s, try to import %d rows data, firstKey:%ld, lastKey:%ld, object lastKey:%ld", pObj->vnode, pObj->sid, pObj->meterId, rows, firstKey, lastKey, pObj->lastKey); @@ -315,40 +241,17 @@ int vnodeImportPoints(SMeterObj *pObj, char *cont, int contLen, char source, voi if (((commitInProcess = pPool->commitInProcess) == 1) || num > 0) { // mutual exclusion with read (need to change here) pthread_mutex_unlock(&pPool->vmutex); - - pNew = (SImportInfo *)malloc(sizeof(SImportInfo)); - memcpy(pNew, &import, sizeof(SImportInfo)); - pNew->signature = pNew; - int payloadLen = contLen - sizeof(SSubmitMsg); - pNew->payload = malloc(payloadLen); - pNew->opayload = pNew->payload; - memcpy(pNew->payload, payload, payloadLen); - - dTrace("vid:%d sid:%d id:%s, import later, commit in process:%d, numOfQueries:%d", pObj->vnode, pObj->sid, - pObj->meterId, commitInProcess, pObj->numOfQueries); - - taosTmrStart(vnodeProcessImportTimer, 10, pNew, vnodeTmrCtrl); - return 0; + return TSDB_CODE_ACTION_IN_PROGRESS; } else { pPool->commitInProcess = 1; pthread_mutex_unlock(&pPool->vmutex); int code = vnodeImportData(pObj, &import); - if (pShell) { - pShell->code = code; - pShell->numOfTotalPoints += import.importedRows; - } + *pNumOfPoints = import.importedRows; } + pVnode->version++; } - // How about the retry? Will this also cause vnode version++? - pVnode->version++; - - if (pShell) { - pShell->count--; - if (pShell->count <= 0) vnodeSendShellSubmitRspMsg(pShell, pShell->code, pShell->numOfTotalPoints); - } - - return 0; + return code; } /* Function to search keys in a range diff --git a/src/system/detail/src/vnodeShell.c b/src/system/detail/src/vnodeShell.c index a5f5259887..fcf4cdd231 100644 --- a/src/system/detail/src/vnodeShell.c +++ b/src/system/detail/src/vnodeShell.c @@ -38,10 +38,21 @@ SShellObj **shellList = NULL; int vnodeProcessRetrieveRequest(char *pMsg, int msgLen, SShellObj *pObj); int vnodeProcessQueryRequest(char *pMsg, int msgLen, SShellObj *pObj); int vnodeProcessShellSubmitRequest(char *pMsg, int msgLen, SShellObj *pObj); +static void vnodeProcessBatchImportTimer(void *param, void *tmrId); int vnodeSelectReqNum = 0; int vnodeInsertReqNum = 0; +typedef struct { + int32_t import; + int32_t vnode; + int32_t numOfSid; + int32_t ssid; // Start sid + SShellObj *pObj; + int64_t offset; // offset relative the blks + char blks[]; +} SBatchImportInfo; + void *vnodeProcessMsgFromShell(char *msg, void *ahandle, void *thandle) { int sid, vnode; SShellObj *pObj = (SShellObj *)ahandle; @@ -242,6 +253,7 @@ int vnodeSendShellSubmitRspMsg(SShellObj *pObj, int code, int numOfPoints) { char *pMsg, *pStart; int msgLen; + dTrace("code:%d numOfTotalPoints:%d", code, numOfPoints); pStart = taosBuildRspMsgWithSize(pObj->thandle, TSDB_MSG_TYPE_SUBMIT_RSP, 128); if (pStart == NULL) return -1; pMsg = pStart; @@ -273,6 +285,7 @@ int vnodeProcessQueryRequest(char *pMsg, int msgLen, SShellObj *pObj) { } if (pQueryMsg->numOfSids <= 0) { + dError("Invalid number of meters to query, numOfSids:%d", pQueryMsg->numOfSids); code = TSDB_CODE_INVALID_QUERY_MSG; goto _query_over; } @@ -482,10 +495,37 @@ int vnodeProcessRetrieveRequest(char *pMsg, int msgLen, SShellObj *pObj) { return msgLen; } +static int vnodeCheckSubmitBlockContext(SShellSubmitBlock *pBlocks, SVnodeObj *pVnode) { + int32_t sid = htonl(pBlocks->sid); + uint64_t uid = htobe64(pBlocks->uid); + + if (sid >= pVnode->cfg.maxSessions || sid <= 0) { + dError("sid:%d is out of range", sid); + return TSDB_CODE_INVALID_TABLE_ID; + } + + SMeterObj *pMeterObj = pVnode->meterList[sid]; + if (pMeterObj == NULL) { + dError("vid:%d sid:%d, no active table", pVnode->vnode, sid); + vnodeSendMeterCfgMsg(pVnode->vnode, sid); + return TSDB_CODE_NOT_ACTIVE_TABLE; + } + + if (pMeterObj->uid != uid) { + dError("vid:%d sid:%d, meterId:%s, uid:%lld, uid in msg:%lld, uid mismatch", pVnode->vnode, sid, pMeterObj->meterId, + pMeterObj->uid, uid); + return TSDB_CODE_INVALID_SUBMIT_MSG; + } + + return TSDB_CODE_SUCCESS; +} + int vnodeProcessShellSubmitRequest(char *pMsg, int msgLen, SShellObj *pObj) { int code = 0, ret = 0; + int32_t i = 0; SShellSubmitMsg shellSubmit = *(SShellSubmitMsg *)pMsg; SShellSubmitMsg *pSubmit = &shellSubmit; + SShellSubmitBlock *pBlocks = NULL; pSubmit->vnode = htons(pSubmit->vnode); pSubmit->numOfSid = htonl(pSubmit->numOfSid); @@ -524,67 +564,41 @@ int vnodeProcessShellSubmitRequest(char *pMsg, int msgLen, SShellObj *pObj) { pObj->count = pSubmit->numOfSid; // for import pObj->code = 0; // for import pObj->numOfTotalPoints = 0; // for import - SShellSubmitBlock *pBlocks = (SShellSubmitBlock *)(pMsg + sizeof(SShellSubmitMsg)); int32_t numOfPoints = 0; int32_t numOfTotalPoints = 0; // We take current time here to avoid it in the for loop. TSKEY now = taosGetTimestamp(pVnode->cfg.precision); - for (int32_t i = 0; i < pSubmit->numOfSid; ++i) { + pBlocks = (SShellSubmitBlock *)(pMsg + sizeof(SShellSubmitMsg)); + for (i = 0; i < pSubmit->numOfSid; ++i) { numOfPoints = 0; - pBlocks->sid = htonl(pBlocks->sid); - pBlocks->uid = htobe64(pBlocks->uid); - - if (pBlocks->sid >= pVnode->cfg.maxSessions || pBlocks->sid <= 0) { - dTrace("sid:%d is out of range", pBlocks->sid); - code = TSDB_CODE_INVALID_TABLE_ID; - goto _submit_over; - } - - int vnode = pSubmit->vnode; - int sid = pBlocks->sid; - - SMeterObj *pMeterObj = vnodeList[vnode].meterList[sid]; - if (pMeterObj == NULL) { - dError("vid:%d sid:%d, no active table", vnode, sid); - vnodeSendMeterCfgMsg(vnode, sid); - code = TSDB_CODE_NOT_ACTIVE_TABLE; - goto _submit_over; - } - - if (pMeterObj->uid != pBlocks->uid) { - dError("vid:%d sid:%d, meterId:%s, uid:%lld, uid in msg:%lld, uid mismatch", vnode, sid, pMeterObj->meterId, - pMeterObj->uid, pBlocks->uid); - code = TSDB_CODE_INVALID_SUBMIT_MSG; - goto _submit_over; - } + code = vnodeCheckSubmitBlockContext(pBlocks, pVnode); + if (code != TSDB_CODE_SUCCESS) break; + SMeterObj *pMeterObj = (SMeterObj *)(pVnode->meterList[htonl(pBlocks->sid)]); // dont include sid, vid - int subMsgLen = sizeof(pBlocks->numOfRows) + htons(pBlocks->numOfRows) * pMeterObj->bytesPerPoint; - int sversion = htonl(pBlocks->sversion); + int32_t subMsgLen = sizeof(pBlocks->numOfRows) + htons(pBlocks->numOfRows) * pMeterObj->bytesPerPoint; + int32_t sversion = htonl(pBlocks->sversion); int32_t state = TSDB_METER_STATE_READY; - if (pSubmit->import) { - state = vnodeSetMeterState(pMeterObj, TSDB_METER_STATE_IMPORTING); - } else { - state = vnodeSetMeterState(pMeterObj, TSDB_METER_STATE_INSERT); - } + state = vnodeSetMeterState(pMeterObj, (pSubmit->import ? TSDB_METER_STATE_IMPORTING : TSDB_METER_STATE_INSERT)); - if (state == TSDB_METER_STATE_READY) { - // meter status is ready for insert/import + if (state == TSDB_METER_STATE_READY) { // meter status is ready for insert/import if (pSubmit->import) { code = vnodeImportPoints(pMeterObj, (char *) &(pBlocks->numOfRows), subMsgLen, TSDB_DATA_SOURCE_SHELL, pObj, sversion, &numOfPoints, now); vnodeClearMeterState(pMeterObj, TSDB_METER_STATE_IMPORTING); + pObj->numOfTotalPoints += numOfPoints; + if (code == TSDB_CODE_SUCCESS) pObj->count--; } else { code = vnodeInsertPoints(pMeterObj, (char *) &(pBlocks->numOfRows), subMsgLen, TSDB_DATA_SOURCE_SHELL, NULL, sversion, &numOfPoints, now); vnodeClearMeterState(pMeterObj, TSDB_METER_STATE_INSERT); + numOfTotalPoints += numOfPoints; } - - if (code != TSDB_CODE_SUCCESS) {break;} + if (code != TSDB_CODE_SUCCESS) break; } else { if (vnodeIsMeterState(pMeterObj, TSDB_METER_STATE_DELETING)) { dTrace("vid:%d sid:%d id:%s, it is removed, state:%d", pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId, @@ -600,15 +614,103 @@ int vnodeProcessShellSubmitRequest(char *pMsg, int msgLen, SShellObj *pObj) { } } - numOfTotalPoints += numOfPoints; pBlocks = (SShellSubmitBlock *)((char *)pBlocks + sizeof(SShellSubmitBlock) + htons(pBlocks->numOfRows) * pMeterObj->bytesPerPoint); } _submit_over: - // for import, send the submit response only when return code is not zero - if (pSubmit->import == 0 || code != 0) ret = vnodeSendShellSubmitRspMsg(pObj, code, numOfTotalPoints); + ret = 0; + if (pSubmit->import) { // Import case + if (code == TSDB_CODE_ACTION_IN_PROGRESS) { + + SBatchImportInfo *pImportInfo = + (SBatchImportInfo *)calloc(1, sizeof(SBatchImportInfo) + msgLen - sizeof(SShellSubmitMsg)); + if (pImportInfo == NULL) { + code = TSDB_CODE_SERV_OUT_OF_MEMORY; + ret = vnodeSendShellSubmitRspMsg(pObj, code, pObj->numOfTotalPoints); + } else { // Start a timer to process the next part of request + pImportInfo->import = 1; + pImportInfo->vnode = pSubmit->vnode; + pImportInfo->numOfSid = pSubmit->numOfSid; + pImportInfo->ssid = i; + pImportInfo->pObj = pObj; + pImportInfo->offset = ((char *)pBlocks) - (pMsg + sizeof(SShellSubmitMsg)); + assert(pImportInfo->offset >= 0); + memcpy((void *)(pImportInfo->blks), (void *)(pMsg + sizeof(SShellSubmitMsg)), msgLen - sizeof(SShellSubmitMsg)); + taosTmrStart(vnodeProcessBatchImportTimer, 10, (void *)pImportInfo, vnodeTmrCtrl); + } + } else { + if (code == TSDB_CODE_SUCCESS) assert(pObj->count == 0); + ret = vnodeSendShellSubmitRspMsg(pObj, code, pObj->numOfTotalPoints); + } + } else { // Insert case + ret = vnodeSendShellSubmitRspMsg(pObj, code, numOfTotalPoints); + } atomic_fetch_add_32(&vnodeInsertReqNum, 1); return ret; } + +static void vnodeProcessBatchImportTimer(void *param, void *tmrId) { + SBatchImportInfo *pImportInfo = (SBatchImportInfo *)param; + assert(pImportInfo != NULL && pImportInfo->import); + + int32_t i = 0, numOfPoints = 0, numOfTotalPoints = 0; + int32_t code = TSDB_CODE_SUCCESS; + + SShellObj * pShell = pImportInfo->pObj; + SVnodeObj * pVnode = &vnodeList[pImportInfo->vnode]; + SShellSubmitBlock *pBlocks = (SShellSubmitBlock *)(pImportInfo->blks + pImportInfo->offset); + TSKEY now = taosGetTimestamp(pVnode->cfg.precision); + + for (i = pImportInfo->ssid; i < pImportInfo->numOfSid; i++) { + numOfPoints = 0; + + code = vnodeCheckSubmitBlockContext(pBlocks, pVnode); + if (code != TSDB_CODE_SUCCESS) break; + + SMeterObj *pMeterObj = (SMeterObj *)(pVnode->meterList[htonl(pBlocks->sid)]); + // dont include sid, vid + int32_t subMsgLen = sizeof(pBlocks->numOfRows) + htons(pBlocks->numOfRows) * pMeterObj->bytesPerPoint; + int32_t sversion = htonl(pBlocks->sversion); + + int32_t state = TSDB_METER_STATE_READY; + state = vnodeSetMeterState(pMeterObj, TSDB_METER_STATE_IMPORTING); + + if (state == TSDB_METER_STATE_READY) { // meter status is ready for insert/import + code = vnodeImportPoints(pMeterObj, (char *)&(pBlocks->numOfRows), subMsgLen, TSDB_DATA_SOURCE_SHELL, pShell, + sversion, &numOfPoints, now); + vnodeClearMeterState(pMeterObj, TSDB_METER_STATE_IMPORTING); + pShell->numOfTotalPoints += numOfPoints; + if (code != TSDB_CODE_SUCCESS) break; + pShell->count--; + } else { + if (vnodeIsMeterState(pMeterObj, TSDB_METER_STATE_DELETING)) { + dTrace("vid:%d sid:%d id:%s, it is removed, state:%d", pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId, + pMeterObj->state); + code = TSDB_CODE_NOT_ACTIVE_TABLE; + break; + } else { // waiting for 300ms by default and try again + dTrace("vid:%d sid:%d id:%s, try submit again since in state:%d", pMeterObj->vnode, pMeterObj->sid, + pMeterObj->meterId, pMeterObj->state); + + code = TSDB_CODE_ACTION_IN_PROGRESS; + break; + } + } + + pBlocks = (SShellSubmitBlock *)((char *)pBlocks + sizeof(SShellSubmitBlock) + + htons(pBlocks->numOfRows) * pMeterObj->bytesPerPoint); + } + + int ret = 0; + if (code == TSDB_CODE_ACTION_IN_PROGRESS) { + pImportInfo->ssid = i; + pImportInfo->offset = ((char *)pBlocks) - pImportInfo->blks; + taosTmrStart(vnodeProcessBatchImportTimer, 10, (void *)pImportInfo, vnodeTmrCtrl); + } else { + if (code == TSDB_CODE_SUCCESS) assert(pShell->count == 0); + tfree(param); + ret = vnodeSendShellSubmitRspMsg(pShell, code, pShell->numOfTotalPoints); + } +} -- GitLab