提交 8ff091de 编写于 作者: T Tao Liu

[TD-324] add affected rows in the insert cmd result

上级 36973a50
...@@ -140,7 +140,7 @@ STableInfo *tsdbGetTableInfo(TsdbRepoT *pRepo, STableId tid); ...@@ -140,7 +140,7 @@ STableInfo *tsdbGetTableInfo(TsdbRepoT *pRepo, STableId tid);
* *
* @return the number of points inserted, -1 for failure and the error number is set * @return the number of points inserted, -1 for failure and the error number is set
*/ */
int32_t tsdbInsertData(TsdbRepoT *pRepo, SSubmitMsg *pMsg); int32_t tsdbInsertData(TsdbRepoT *repo, SSubmitMsg *pMsg, SShellSubmitRspMsg * pRsp) ;
// -- FOR QUERY TIME SERIES DATA // -- FOR QUERY TIME SERIES DATA
......
...@@ -29,7 +29,7 @@ static int32_t tsdbCheckAndSetDefaultCfg(STsdbCfg *pCfg); ...@@ -29,7 +29,7 @@ static int32_t tsdbCheckAndSetDefaultCfg(STsdbCfg *pCfg);
static int32_t tsdbSetRepoEnv(STsdbRepo *pRepo); static int32_t tsdbSetRepoEnv(STsdbRepo *pRepo);
static int32_t tsdbDestroyRepoEnv(STsdbRepo *pRepo); static int32_t tsdbDestroyRepoEnv(STsdbRepo *pRepo);
// static int tsdbOpenMetaFile(char *tsdbDir); // static int tsdbOpenMetaFile(char *tsdbDir);
static int32_t tsdbInsertDataToTable(TsdbRepoT *repo, SSubmitBlk *pBlock, TSKEY now); static int32_t tsdbInsertDataToTable(TsdbRepoT *repo, SSubmitBlk *pBlock, TSKEY now, int * affectedrows);
static int32_t tsdbRestoreCfg(STsdbRepo *pRepo, STsdbCfg *pCfg); static int32_t tsdbRestoreCfg(STsdbRepo *pRepo, STsdbCfg *pCfg);
static int32_t tsdbGetDataDirName(STsdbRepo *pRepo, char *fname); static int32_t tsdbGetDataDirName(STsdbRepo *pRepo, char *fname);
static void * tsdbCommitData(void *arg); static void * tsdbCommitData(void *arg);
...@@ -406,22 +406,23 @@ STableInfo *tsdbGetTableInfo(TsdbRepoT *pRepo, STableId tableId) { ...@@ -406,22 +406,23 @@ STableInfo *tsdbGetTableInfo(TsdbRepoT *pRepo, STableId tableId) {
} }
// TODO: need to return the number of data inserted // TODO: need to return the number of data inserted
int32_t tsdbInsertData(TsdbRepoT *repo, SSubmitMsg *pMsg) { int32_t tsdbInsertData(TsdbRepoT *repo, SSubmitMsg *pMsg, SShellSubmitRspMsg * pRsp) {
SSubmitMsgIter msgIter; SSubmitMsgIter msgIter;
STsdbRepo *pRepo = (STsdbRepo *)repo; STsdbRepo *pRepo = (STsdbRepo *)repo;
tsdbInitSubmitMsgIter(pMsg, &msgIter); tsdbInitSubmitMsgIter(pMsg, &msgIter);
SSubmitBlk *pBlock = NULL; SSubmitBlk *pBlock = NULL;
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
int32_t affectedrows = 0;
TSKEY now = taosGetTimestamp(pRepo->config.precision); TSKEY now = taosGetTimestamp(pRepo->config.precision);
while ((pBlock = tsdbGetSubmitMsgNext(&msgIter)) != NULL) { while ((pBlock = tsdbGetSubmitMsgNext(&msgIter)) != NULL) {
if ((code = tsdbInsertDataToTable(repo, pBlock, now)) != TSDB_CODE_SUCCESS) { if ((code = tsdbInsertDataToTable(repo, pBlock, now, &affectedrows)) != TSDB_CODE_SUCCESS) {
return code; return code;
} }
} }
pRsp->affectedRows = htonl(affectedrows);
return code; return code;
} }
...@@ -846,7 +847,7 @@ static int32_t tdInsertRowToTable(STsdbRepo *pRepo, SDataRow row, STable *pTable ...@@ -846,7 +847,7 @@ static int32_t tdInsertRowToTable(STsdbRepo *pRepo, SDataRow row, STable *pTable
return 0; return 0;
} }
static int32_t tsdbInsertDataToTable(TsdbRepoT *repo, SSubmitBlk *pBlock, TSKEY now) { static int32_t tsdbInsertDataToTable(TsdbRepoT *repo, SSubmitBlk *pBlock, TSKEY now, int32_t *affectedrows) {
STsdbRepo *pRepo = (STsdbRepo *)repo; STsdbRepo *pRepo = (STsdbRepo *)repo;
STableId tableId = {.uid = pBlock->uid, .tid = pBlock->tid}; STableId tableId = {.uid = pBlock->uid, .tid = pBlock->tid};
...@@ -875,6 +876,7 @@ static int32_t tsdbInsertDataToTable(TsdbRepoT *repo, SSubmitBlk *pBlock, TSKEY ...@@ -875,6 +876,7 @@ static int32_t tsdbInsertDataToTable(TsdbRepoT *repo, SSubmitBlk *pBlock, TSKEY
if (tdInsertRowToTable(pRepo, row, pTable) < 0) { if (tdInsertRowToTable(pRepo, row, pTable) < 0) {
return -1; return -1;
} }
(*affectedrows)++;
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
......
...@@ -91,16 +91,15 @@ static int32_t vnodeProcessSubmitMsg(SVnodeObj *pVnode, void *pCont, SRspRet *pR ...@@ -91,16 +91,15 @@ static int32_t vnodeProcessSubmitMsg(SVnodeObj *pVnode, void *pCont, SRspRet *pR
// save insert result into item // save insert result into item
vTrace("vgId:%d, submit msg is processed", pVnode->vgId); vTrace("vgId:%d, submit msg is processed", pVnode->vgId);
code = tsdbInsertData(pVnode->tsdb, pCont);
pRet->len = sizeof(SShellSubmitRspMsg); pRet->len = sizeof(SShellSubmitRspMsg);
pRet->rsp = rpcMallocCont(pRet->len); pRet->rsp = rpcMallocCont(pRet->len);
SShellSubmitRspMsg *pRsp = pRet->rsp; SShellSubmitRspMsg *pRsp = pRet->rsp;
code = tsdbInsertData(pVnode->tsdb, pCont, pRsp);
pRsp->numOfFailedBlocks = 0; //TODO
//pRet->len += pRsp->numOfFailedBlocks * sizeof(SShellSubmitRspBlock); //TODO
pRsp->code = 0; pRsp->code = 0;
pRsp->numOfRows = htonl(1); pRsp->numOfRows = htonl(1);
pRsp->affectedRows = htonl(1);
pRsp->numOfFailedBlocks = 0;
return code; return code;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册