diff --git a/include/libs/wal/wal.h b/include/libs/wal/wal.h index e635227eaa68f9f077deff60ea32eeae607146c3..10d01f12ec7db4fb9d1b13cc97fabed05129f772 100644 --- a/include/libs/wal/wal.h +++ b/include/libs/wal/wal.h @@ -61,16 +61,16 @@ extern "C" { } \ } -#define WAL_HEAD_VER 0 +#define WAL_HEAD_VER 0 #define WAL_NOSUFFIX_LEN 20 -#define WAL_SUFFIX_AT (WAL_NOSUFFIX_LEN + 1) -#define WAL_LOG_SUFFIX "log" +#define WAL_SUFFIX_AT (WAL_NOSUFFIX_LEN + 1) +#define WAL_LOG_SUFFIX "log" #define WAL_INDEX_SUFFIX "idx" -#define WAL_REFRESH_MS 1000 -#define WAL_MAX_SIZE (TSDB_MAX_WAL_SIZE + sizeof(SWalHead)) -#define WAL_PATH_LEN (TSDB_FILENAME_LEN + 12) -#define WAL_FILE_LEN (WAL_PATH_LEN + 32) -#define WAL_MAGIC 0xFAFBFCFDULL +#define WAL_REFRESH_MS 1000 +#define WAL_MAX_SIZE (TSDB_MAX_WAL_SIZE + sizeof(SWalHead)) +#define WAL_PATH_LEN (TSDB_FILENAME_LEN + 12) +#define WAL_FILE_LEN (WAL_PATH_LEN + 32) +#define WAL_MAGIC 0xFAFBFCFDULL #define WAL_CUR_FAILED 1 diff --git a/source/dnode/vnode/src/tsdb/tsdbMemTable.c b/source/dnode/vnode/src/tsdb/tsdbMemTable.c index 5f401c9b2bd2cb4ff871d04c0dabcc1dbf2d5f4c..bea2b66af8ad94a9129489c073940077d270c37b 100644 --- a/source/dnode/vnode/src/tsdb/tsdbMemTable.c +++ b/source/dnode/vnode/src/tsdb/tsdbMemTable.c @@ -97,8 +97,8 @@ int tsdbMemTableInsert(STsdb *pTsdb, STsdbMemTable *pMemTable, SSubmitReq *pMsg, } if (pRsp != NULL) { - pRsp->affectedRows = htonl(affectedrows); - pRsp->numOfRows = htonl(numOfRows); + pRsp->affectedRows = affectedrows; + pRsp->numOfRows = numOfRows; } return 0; diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 395f715b8fd1d7e3ceabcdc11c62242d2d5edce3..87cef5304231b7f336205904013b9dcc9e56e2a2 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -18,6 +18,7 @@ static int vnodeProcessCreateStbReq(SVnode *pVnode, void *pReq); static int vnodeProcessCreateTbReq(SVnode *pVnode, SRpcMsg *pMsg, void *pReq, SRpcMsg **pRsp); static int vnodeProcessAlterStbReq(SVnode *pVnode, void *pReq); +static int vnodeProcessSubmitReq(SVnode *pVnode, SSubmitReq *pSubmitReq, SRpcMsg *pRsp); void vnodePreprocessWriteReqs(SVnode *pVnode, SArray *pMsgs) { SNodeMsg *pMsg; @@ -79,9 +80,10 @@ int vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { case TDMT_VND_SUBMIT: /*printf("vnode %d write data %ld\n", pVnode->vgId, ver);*/ if (pVnode->config.streamMode == 0) { - if (tsdbInsertData(pVnode->pTsdb, (SSubmitReq *)ptr, NULL) < 0) { - // TODO: handle error - } + *pRsp = taosMemoryCalloc(1, sizeof(SRpcMsg)); + (*pRsp)->handle = pMsg->handle; + (*pRsp)->ahandle = pMsg->ahandle; + return vnodeProcessSubmitReq(pVnode, ptr, *pRsp); } break; case TDMT_VND_MQ_SET_CONN: { @@ -298,5 +300,25 @@ static int vnodeProcessAlterStbReq(SVnode *pVnode, void *pReq) { } taosMemoryFree(vAlterTbReq.dbFName); taosMemoryFree(vAlterTbReq.name); + return 0; +} + +static int vnodeProcessSubmitReq(SVnode *pVnode, SSubmitReq *pSubmitReq, SRpcMsg *pRsp) { + SSubmitRsp rsp = {0}; + + pRsp->code = 0; + + // handle the request + if (tsdbInsertData(pVnode->pTsdb, pSubmitReq, &rsp) < 0) { + pRsp->code = terrno; + return -1; + } + + // encode the response (TODO) + pRsp->msgType = TDMT_VND_SUBMIT_RSP; + pRsp->pCont = rpcMallocCont(sizeof(SSubmitRsp)); + memcpy(pRsp->pCont, &rsp, sizeof(rsp)); + pRsp->contLen = sizeof(SSubmitRsp); + return 0; } \ No newline at end of file