diff --git a/src/system/detail/src/vnodeShell.c b/src/system/detail/src/vnodeShell.c index b1a9dbd3ef839bc0209cf27702f07924a5aa729c..8d33b4350363e4ed9dc0ae8d75ae261dd788f8d5 100644 --- a/src/system/detail/src/vnodeShell.c +++ b/src/system/detail/src/vnodeShell.c @@ -39,7 +39,7 @@ SShellObj **shellList = NULL; int vnodeProcessRetrieveRequest(char *pMsg, int msgLen, SShellObj *pObj); int vnodeProcessQueryRequest(char *pMsg, int msgLen, SShellObj *pObj); int vnodeProcessShellSubmitRequest(char *pMsg, int msgLen, SShellObj *pObj); -static void vnodeProcessBatchImportTimer(void *param, void *tmrId); +static void vnodeProcessBatchSubmitTimer(void *param, void *tmrId); int vnodeSelectReqNum = 0; int vnodeInsertReqNum = 0; @@ -523,6 +523,52 @@ static int vnodeCheckSubmitBlockContext(SShellSubmitBlock *pBlocks, SVnodeObj *p return TSDB_CODE_SUCCESS; } +static int vnodeDoSubmitJob(SVnodeObj *pVnode, int import, int32_t *ssid, int32_t esid, SShellSubmitBlock **ppBlocks, + TSKEY now, SShellObj *pObj) { + SShellSubmitBlock *pBlocks = *ppBlocks; + int code = TSDB_CODE_SUCCESS; + int32_t numOfPoints = 0; + int32_t i = 0; + + for (i = *ssid; i < esid; i++) { + numOfPoints = 0; + + code = vnodeCheckSubmitBlockContext(pBlocks, pVnode); + if (code != TSDB_CODE_SUCCESS) break; + + SMeterObj *pMeterObj = (SMeterObj *)(pVnode->meterList[htonl(pBlocks->sid)]); + + // dont include sid, vid + int32_t subMsgLen = sizeof(pBlocks->numOfRows) + htons(pBlocks->numOfRows) * pMeterObj->bytesPerPoint; + int32_t sversion = htonl(pBlocks->sversion); + + if (import) { + code = vnodeImportPoints(pMeterObj, (char *)&(pBlocks->numOfRows), subMsgLen, TSDB_DATA_SOURCE_SHELL, pObj, + sversion, &numOfPoints, now); + pObj->numOfTotalPoints += numOfPoints; + + // records for one table should be consecutive located in the payload buffer, which is guaranteed by client + if (code == TSDB_CODE_SUCCESS) { + pObj->count--; + } + } else { + code = vnodeInsertPoints(pMeterObj, (char *)&(pBlocks->numOfRows), subMsgLen, TSDB_DATA_SOURCE_SHELL, NULL, + sversion, &numOfPoints, now); + pObj->numOfTotalPoints += numOfPoints; + } + + if (code != TSDB_CODE_SUCCESS) break; + + pBlocks = (SShellSubmitBlock *)((char *)pBlocks + sizeof(SShellSubmitBlock) + + htons(pBlocks->numOfRows) * pMeterObj->bytesPerPoint); + } + + *ssid = i; + *ppBlocks = pBlocks; + + return code; +} + int vnodeProcessShellSubmitRequest(char *pMsg, int msgLen, SShellObj *pObj) { int code = 0, ret = 0; int32_t i = 0; @@ -566,47 +612,13 @@ int vnodeProcessShellSubmitRequest(char *pMsg, int msgLen, SShellObj *pObj) { pObj->count = pSubmit->numOfSid; // for import pObj->code = 0; // for import - pObj->numOfTotalPoints = 0; // for import + pObj->numOfTotalPoints = 0; - int32_t numOfPoints = 0; - int32_t numOfTotalPoints = 0; - // We take current time here to avoid it in the for loop. TSKEY now = taosGetTimestamp(pVnode->cfg.precision); pBlocks = (SShellSubmitBlock *)(pMsg + sizeof(SShellSubmitMsg)); - for (i = 0; i < pSubmit->numOfSid; ++i) { - numOfPoints = 0; - - code = vnodeCheckSubmitBlockContext(pBlocks, pVnode); - if (code != TSDB_CODE_SUCCESS) break; - - SMeterObj *pMeterObj = (SMeterObj *)(pVnode->meterList[htonl(pBlocks->sid)]); - - // dont include sid, vid - int32_t subMsgLen = sizeof(pBlocks->numOfRows) + htons(pBlocks->numOfRows) * pMeterObj->bytesPerPoint; - int32_t sversion = htonl(pBlocks->sversion); - - if (pSubmit->import) { - dTrace("start to import data"); - code = vnodeImportPoints(pMeterObj, (char *) &(pBlocks->numOfRows), subMsgLen, TSDB_DATA_SOURCE_SHELL, pObj, - sversion, &numOfPoints, now); - pObj->numOfTotalPoints += numOfPoints; - - //records for one table should be consecutive located in the payload buffer, which is guaranteed by client - if (code == TSDB_CODE_SUCCESS) { - pObj->count--; - } - } else { - code = vnodeInsertPoints(pMeterObj, (char *) &(pBlocks->numOfRows), subMsgLen, TSDB_DATA_SOURCE_SHELL, NULL, - sversion, &numOfPoints, now); - numOfTotalPoints += numOfPoints; - } - - if (code != TSDB_CODE_SUCCESS) break; - - pBlocks = (SShellSubmitBlock *)((char *)pBlocks + sizeof(SShellSubmitBlock) + - htons(pBlocks->numOfRows) * pMeterObj->bytesPerPoint); - } + i = 0; + code = vnodeDoSubmitJob(pVnode, pSubmit->import, &i, pSubmit->numOfSid, &pBlocks, now, pObj); _submit_over: ret = 0; @@ -627,61 +639,42 @@ _submit_over: pSubmitInfo->offset = ((char *)pBlocks) - (pMsg + sizeof(SShellSubmitMsg)); assert(pSubmitInfo->offset >= 0); memcpy((void *)(pSubmitInfo->blks), (void *)(pMsg + sizeof(SShellSubmitMsg)), msgLen - sizeof(SShellSubmitMsg)); - taosTmrStart(vnodeProcessBatchImportTimer, 10, (void *)pSubmitInfo, vnodeTmrCtrl); + taosTmrStart(vnodeProcessBatchSubmitTimer, 10, (void *)pSubmitInfo, vnodeTmrCtrl); } } else { if (code == TSDB_CODE_SUCCESS) assert(pObj->count == 0); ret = vnodeSendShellSubmitRspMsg(pObj, code, pObj->numOfTotalPoints); } } else { // Insert case - ret = vnodeSendShellSubmitRspMsg(pObj, code, numOfTotalPoints); + ret = vnodeSendShellSubmitRspMsg(pObj, code, pObj->numOfTotalPoints); } atomic_fetch_add_32(&vnodeInsertReqNum, 1); return ret; } -static void vnodeProcessBatchImportTimer(void *param, void *tmrId) { +static void vnodeProcessBatchSubmitTimer(void *param, void *tmrId) { SBatchSubmitInfo *pSubmitInfo = (SBatchSubmitInfo *)param; assert(pSubmitInfo != NULL && pSubmitInfo->import); - int32_t i = 0, numOfPoints = 0; + int32_t i = 0; int32_t code = TSDB_CODE_SUCCESS; SShellObj * pShell = pSubmitInfo->pObj; SVnodeObj * pVnode = &vnodeList[pSubmitInfo->vnode]; SShellSubmitBlock *pBlocks = (SShellSubmitBlock *)(pSubmitInfo->blks + pSubmitInfo->offset); TSKEY now = taosGetTimestamp(pVnode->cfg.precision); + i = pSubmitInfo->ssid; - for (i = pSubmitInfo->ssid; i < pSubmitInfo->numOfSid; i++) { - numOfPoints = 0; - - code = vnodeCheckSubmitBlockContext(pBlocks, pVnode); - if (code != TSDB_CODE_SUCCESS) break; - - SMeterObj *pMeterObj = (SMeterObj *)(pVnode->meterList[htonl(pBlocks->sid)]); - // dont include sid, vid - int32_t subMsgLen = sizeof(pBlocks->numOfRows) + htons(pBlocks->numOfRows) * pMeterObj->bytesPerPoint; - int32_t sversion = htonl(pBlocks->sversion); - - code = vnodeImportPoints(pMeterObj, (char *)&(pBlocks->numOfRows), subMsgLen, TSDB_DATA_SOURCE_SHELL, pShell, - sversion, &numOfPoints, now); - pShell->numOfTotalPoints += numOfPoints; - if (code != TSDB_CODE_SUCCESS) break; - pShell->count--; - - pBlocks = (SShellSubmitBlock *)((char *)pBlocks + sizeof(SShellSubmitBlock) + - htons(pBlocks->numOfRows) * pMeterObj->bytesPerPoint); - } + code = vnodeDoSubmitJob(pVnode, pSubmitInfo->import, &i, pSubmitInfo->numOfSid, &pBlocks, now, pShell); - int ret = 0; if (code == TSDB_CODE_ACTION_IN_PROGRESS) { pSubmitInfo->ssid = i; pSubmitInfo->offset = ((char *)pBlocks) - pSubmitInfo->blks; - taosTmrStart(vnodeProcessBatchImportTimer, 10, (void *)pSubmitInfo, vnodeTmrCtrl); + taosTmrStart(vnodeProcessBatchSubmitTimer, 10, (void *)pSubmitInfo, vnodeTmrCtrl); } else { if (code == TSDB_CODE_SUCCESS) assert(pShell->count == 0); tfree(param); - ret = vnodeSendShellSubmitRspMsg(pShell, code, pShell->numOfTotalPoints); + vnodeSendShellSubmitRspMsg(pShell, code, pShell->numOfTotalPoints); } }