提交 56bb3121 编写于 作者: H Hongze Cheng

refactor part of code

上级 6fabe650
......@@ -39,7 +39,7 @@ SShellObj **shellList = NULL;
int vnodeProcessRetrieveRequest(char *pMsg, int msgLen, SShellObj *pObj);
int vnodeProcessQueryRequest(char *pMsg, int msgLen, SShellObj *pObj);
int vnodeProcessShellSubmitRequest(char *pMsg, int msgLen, SShellObj *pObj);
static void vnodeProcessBatchImportTimer(void *param, void *tmrId);
static void vnodeProcessBatchSubmitTimer(void *param, void *tmrId);
int vnodeSelectReqNum = 0;
int vnodeInsertReqNum = 0;
......@@ -523,6 +523,52 @@ static int vnodeCheckSubmitBlockContext(SShellSubmitBlock *pBlocks, SVnodeObj *p
return TSDB_CODE_SUCCESS;
}
static int vnodeDoSubmitJob(SVnodeObj *pVnode, int import, int32_t *ssid, int32_t esid, SShellSubmitBlock **ppBlocks,
TSKEY now, SShellObj *pObj) {
SShellSubmitBlock *pBlocks = *ppBlocks;
int code = TSDB_CODE_SUCCESS;
int32_t numOfPoints = 0;
int32_t i = 0;
for (i = *ssid; i < esid; i++) {
numOfPoints = 0;
code = vnodeCheckSubmitBlockContext(pBlocks, pVnode);
if (code != TSDB_CODE_SUCCESS) break;
SMeterObj *pMeterObj = (SMeterObj *)(pVnode->meterList[htonl(pBlocks->sid)]);
// dont include sid, vid
int32_t subMsgLen = sizeof(pBlocks->numOfRows) + htons(pBlocks->numOfRows) * pMeterObj->bytesPerPoint;
int32_t sversion = htonl(pBlocks->sversion);
if (import) {
code = vnodeImportPoints(pMeterObj, (char *)&(pBlocks->numOfRows), subMsgLen, TSDB_DATA_SOURCE_SHELL, pObj,
sversion, &numOfPoints, now);
pObj->numOfTotalPoints += numOfPoints;
// records for one table should be consecutive located in the payload buffer, which is guaranteed by client
if (code == TSDB_CODE_SUCCESS) {
pObj->count--;
}
} else {
code = vnodeInsertPoints(pMeterObj, (char *)&(pBlocks->numOfRows), subMsgLen, TSDB_DATA_SOURCE_SHELL, NULL,
sversion, &numOfPoints, now);
pObj->numOfTotalPoints += numOfPoints;
}
if (code != TSDB_CODE_SUCCESS) break;
pBlocks = (SShellSubmitBlock *)((char *)pBlocks + sizeof(SShellSubmitBlock) +
htons(pBlocks->numOfRows) * pMeterObj->bytesPerPoint);
}
*ssid = i;
*ppBlocks = pBlocks;
return code;
}
int vnodeProcessShellSubmitRequest(char *pMsg, int msgLen, SShellObj *pObj) {
int code = 0, ret = 0;
int32_t i = 0;
......@@ -566,47 +612,13 @@ int vnodeProcessShellSubmitRequest(char *pMsg, int msgLen, SShellObj *pObj) {
pObj->count = pSubmit->numOfSid; // for import
pObj->code = 0; // for import
pObj->numOfTotalPoints = 0; // for import
pObj->numOfTotalPoints = 0;
int32_t numOfPoints = 0;
int32_t numOfTotalPoints = 0;
// We take current time here to avoid it in the for loop.
TSKEY now = taosGetTimestamp(pVnode->cfg.precision);
pBlocks = (SShellSubmitBlock *)(pMsg + sizeof(SShellSubmitMsg));
for (i = 0; i < pSubmit->numOfSid; ++i) {
numOfPoints = 0;
code = vnodeCheckSubmitBlockContext(pBlocks, pVnode);
if (code != TSDB_CODE_SUCCESS) break;
SMeterObj *pMeterObj = (SMeterObj *)(pVnode->meterList[htonl(pBlocks->sid)]);
// dont include sid, vid
int32_t subMsgLen = sizeof(pBlocks->numOfRows) + htons(pBlocks->numOfRows) * pMeterObj->bytesPerPoint;
int32_t sversion = htonl(pBlocks->sversion);
if (pSubmit->import) {
dTrace("start to import data");
code = vnodeImportPoints(pMeterObj, (char *) &(pBlocks->numOfRows), subMsgLen, TSDB_DATA_SOURCE_SHELL, pObj,
sversion, &numOfPoints, now);
pObj->numOfTotalPoints += numOfPoints;
//records for one table should be consecutive located in the payload buffer, which is guaranteed by client
if (code == TSDB_CODE_SUCCESS) {
pObj->count--;
}
} else {
code = vnodeInsertPoints(pMeterObj, (char *) &(pBlocks->numOfRows), subMsgLen, TSDB_DATA_SOURCE_SHELL, NULL,
sversion, &numOfPoints, now);
numOfTotalPoints += numOfPoints;
}
if (code != TSDB_CODE_SUCCESS) break;
pBlocks = (SShellSubmitBlock *)((char *)pBlocks + sizeof(SShellSubmitBlock) +
htons(pBlocks->numOfRows) * pMeterObj->bytesPerPoint);
}
i = 0;
code = vnodeDoSubmitJob(pVnode, pSubmit->import, &i, pSubmit->numOfSid, &pBlocks, now, pObj);
_submit_over:
ret = 0;
......@@ -627,61 +639,42 @@ _submit_over:
pSubmitInfo->offset = ((char *)pBlocks) - (pMsg + sizeof(SShellSubmitMsg));
assert(pSubmitInfo->offset >= 0);
memcpy((void *)(pSubmitInfo->blks), (void *)(pMsg + sizeof(SShellSubmitMsg)), msgLen - sizeof(SShellSubmitMsg));
taosTmrStart(vnodeProcessBatchImportTimer, 10, (void *)pSubmitInfo, vnodeTmrCtrl);
taosTmrStart(vnodeProcessBatchSubmitTimer, 10, (void *)pSubmitInfo, vnodeTmrCtrl);
}
} else {
if (code == TSDB_CODE_SUCCESS) assert(pObj->count == 0);
ret = vnodeSendShellSubmitRspMsg(pObj, code, pObj->numOfTotalPoints);
}
} else { // Insert case
ret = vnodeSendShellSubmitRspMsg(pObj, code, numOfTotalPoints);
ret = vnodeSendShellSubmitRspMsg(pObj, code, pObj->numOfTotalPoints);
}
atomic_fetch_add_32(&vnodeInsertReqNum, 1);
return ret;
}
static void vnodeProcessBatchImportTimer(void *param, void *tmrId) {
static void vnodeProcessBatchSubmitTimer(void *param, void *tmrId) {
SBatchSubmitInfo *pSubmitInfo = (SBatchSubmitInfo *)param;
assert(pSubmitInfo != NULL && pSubmitInfo->import);
int32_t i = 0, numOfPoints = 0;
int32_t i = 0;
int32_t code = TSDB_CODE_SUCCESS;
SShellObj * pShell = pSubmitInfo->pObj;
SVnodeObj * pVnode = &vnodeList[pSubmitInfo->vnode];
SShellSubmitBlock *pBlocks = (SShellSubmitBlock *)(pSubmitInfo->blks + pSubmitInfo->offset);
TSKEY now = taosGetTimestamp(pVnode->cfg.precision);
i = pSubmitInfo->ssid;
for (i = pSubmitInfo->ssid; i < pSubmitInfo->numOfSid; i++) {
numOfPoints = 0;
code = vnodeCheckSubmitBlockContext(pBlocks, pVnode);
if (code != TSDB_CODE_SUCCESS) break;
SMeterObj *pMeterObj = (SMeterObj *)(pVnode->meterList[htonl(pBlocks->sid)]);
// dont include sid, vid
int32_t subMsgLen = sizeof(pBlocks->numOfRows) + htons(pBlocks->numOfRows) * pMeterObj->bytesPerPoint;
int32_t sversion = htonl(pBlocks->sversion);
code = vnodeImportPoints(pMeterObj, (char *)&(pBlocks->numOfRows), subMsgLen, TSDB_DATA_SOURCE_SHELL, pShell,
sversion, &numOfPoints, now);
pShell->numOfTotalPoints += numOfPoints;
if (code != TSDB_CODE_SUCCESS) break;
pShell->count--;
pBlocks = (SShellSubmitBlock *)((char *)pBlocks + sizeof(SShellSubmitBlock) +
htons(pBlocks->numOfRows) * pMeterObj->bytesPerPoint);
}
code = vnodeDoSubmitJob(pVnode, pSubmitInfo->import, &i, pSubmitInfo->numOfSid, &pBlocks, now, pShell);
int ret = 0;
if (code == TSDB_CODE_ACTION_IN_PROGRESS) {
pSubmitInfo->ssid = i;
pSubmitInfo->offset = ((char *)pBlocks) - pSubmitInfo->blks;
taosTmrStart(vnodeProcessBatchImportTimer, 10, (void *)pSubmitInfo, vnodeTmrCtrl);
taosTmrStart(vnodeProcessBatchSubmitTimer, 10, (void *)pSubmitInfo, vnodeTmrCtrl);
} else {
if (code == TSDB_CODE_SUCCESS) assert(pShell->count == 0);
tfree(param);
ret = vnodeSendShellSubmitRspMsg(pShell, code, pShell->numOfTotalPoints);
vnodeSendShellSubmitRspMsg(pShell, code, pShell->numOfTotalPoints);
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册