diff --git a/src/inc/tsdb.h b/src/inc/tsdb.h index 32e35416927d144cdf3dabaa51e29ada6d390ac9..ee851ca0643587e876dd118a9dbaf991a3995a06 100644 --- a/src/inc/tsdb.h +++ b/src/inc/tsdb.h @@ -140,7 +140,7 @@ STableInfo *tsdbGetTableInfo(TsdbRepoT *pRepo, STableId tid); * * @return the number of points inserted, -1 for failure and the error number is set */ -int32_t tsdbInsertData(TsdbRepoT *pRepo, SSubmitMsg *pMsg); +int32_t tsdbInsertData(TsdbRepoT *repo, SSubmitMsg *pMsg, SShellSubmitRspMsg * pRsp) ; // -- FOR QUERY TIME SERIES DATA diff --git a/src/tsdb/src/tsdbMain.c b/src/tsdb/src/tsdbMain.c index c85f5cc8387628a3e5bad709a347f194a4bd272c..f1bddadabe614bcc1a8edd9d9a85313ec9604942 100644 --- a/src/tsdb/src/tsdbMain.c +++ b/src/tsdb/src/tsdbMain.c @@ -29,7 +29,7 @@ static int32_t tsdbCheckAndSetDefaultCfg(STsdbCfg *pCfg); static int32_t tsdbSetRepoEnv(STsdbRepo *pRepo); static int32_t tsdbDestroyRepoEnv(STsdbRepo *pRepo); // static int tsdbOpenMetaFile(char *tsdbDir); -static int32_t tsdbInsertDataToTable(TsdbRepoT *repo, SSubmitBlk *pBlock, TSKEY now); +static int32_t tsdbInsertDataToTable(TsdbRepoT *repo, SSubmitBlk *pBlock, TSKEY now, int * affectedrows); static int32_t tsdbRestoreCfg(STsdbRepo *pRepo, STsdbCfg *pCfg); static int32_t tsdbGetDataDirName(STsdbRepo *pRepo, char *fname); static void * tsdbCommitData(void *arg); @@ -406,22 +406,23 @@ STableInfo *tsdbGetTableInfo(TsdbRepoT *pRepo, STableId tableId) { } // TODO: need to return the number of data inserted -int32_t tsdbInsertData(TsdbRepoT *repo, SSubmitMsg *pMsg) { +int32_t tsdbInsertData(TsdbRepoT *repo, SSubmitMsg *pMsg, SShellSubmitRspMsg * pRsp) { SSubmitMsgIter msgIter; STsdbRepo *pRepo = (STsdbRepo *)repo; tsdbInitSubmitMsgIter(pMsg, &msgIter); SSubmitBlk *pBlock = NULL; int32_t code = TSDB_CODE_SUCCESS; + int32_t affectedrows = 0; TSKEY now = taosGetTimestamp(pRepo->config.precision); while ((pBlock = tsdbGetSubmitMsgNext(&msgIter)) != NULL) { - if ((code = tsdbInsertDataToTable(repo, pBlock, now)) != TSDB_CODE_SUCCESS) { + if ((code = tsdbInsertDataToTable(repo, pBlock, now, &affectedrows)) != TSDB_CODE_SUCCESS) { return code; } } - + pRsp->affectedRows = htonl(affectedrows); return code; } @@ -846,7 +847,7 @@ static int32_t tdInsertRowToTable(STsdbRepo *pRepo, SDataRow row, STable *pTable return 0; } -static int32_t tsdbInsertDataToTable(TsdbRepoT *repo, SSubmitBlk *pBlock, TSKEY now) { +static int32_t tsdbInsertDataToTable(TsdbRepoT *repo, SSubmitBlk *pBlock, TSKEY now, int32_t *affectedrows) { STsdbRepo *pRepo = (STsdbRepo *)repo; STableId tableId = {.uid = pBlock->uid, .tid = pBlock->tid}; @@ -875,6 +876,7 @@ static int32_t tsdbInsertDataToTable(TsdbRepoT *repo, SSubmitBlk *pBlock, TSKEY if (tdInsertRowToTable(pRepo, row, pTable) < 0) { return -1; } + (*affectedrows)++; } return TSDB_CODE_SUCCESS; diff --git a/src/vnode/src/vnodeWrite.c b/src/vnode/src/vnodeWrite.c index ec0a3b2f0b55e544aefd26ffa6588a1015023c68..635c4669782114fa34c420bd9e7d752fc5f24828 100644 --- a/src/vnode/src/vnodeWrite.c +++ b/src/vnode/src/vnodeWrite.c @@ -91,17 +91,16 @@ static int32_t vnodeProcessSubmitMsg(SVnodeObj *pVnode, void *pCont, SRspRet *pR // save insert result into item vTrace("vgId:%d, submit msg is processed", pVnode->vgId); - code = tsdbInsertData(pVnode->tsdb, pCont); - + pRet->len = sizeof(SShellSubmitRspMsg); pRet->rsp = rpcMallocCont(pRet->len); SShellSubmitRspMsg *pRsp = pRet->rsp; - + code = tsdbInsertData(pVnode->tsdb, pCont, pRsp); + pRsp->numOfFailedBlocks = 0; //TODO + //pRet->len += pRsp->numOfFailedBlocks * sizeof(SShellSubmitRspBlock); //TODO pRsp->code = 0; pRsp->numOfRows = htonl(1); - pRsp->affectedRows = htonl(1); - pRsp->numOfFailedBlocks = 0; - + return code; }