提交 cc2dfbf5 编写于 作者: J Jeff Tao

add code for write response

上级 ff41901d
...@@ -20,6 +20,11 @@ ...@@ -20,6 +20,11 @@
extern "C" { extern "C" {
#endif #endif
typedef struct {
int len;
void *rsp;
} SRspRet;
int32_t vnodeInitWrite(); int32_t vnodeInitWrite();
int32_t vnodeCreate(SMDCreateVnodeMsg *pVnodeCfg); int32_t vnodeCreate(SMDCreateVnodeMsg *pVnodeCfg);
int32_t vnodeDrop(int32_t vgId); int32_t vnodeDrop(int32_t vgId);
......
...@@ -289,7 +289,7 @@ static void dnodeProcessRetrieveMsg(void *pVnode, SReadMsg *pMsg) { ...@@ -289,7 +289,7 @@ static void dnodeProcessRetrieveMsg(void *pVnode, SReadMsg *pMsg) {
dnodeContinueExecuteQuery(pVnode, pQInfo, pMsg); dnodeContinueExecuteQuery(pVnode, pQInfo, pMsg);
} else { // no further execution invoked, release the ref to vnode } else { // no further execution invoked, release the ref to vnode
dnodeProcessReadResult(pVnode, pMsg); dnodeProcessReadResult(pVnode, pMsg);
dnodeReleaseVnode(pVnode); vnodeRelease(pVnode);
} }
} }
......
...@@ -34,6 +34,7 @@ typedef struct { ...@@ -34,6 +34,7 @@ typedef struct {
} SWriteWorker; } SWriteWorker;
typedef struct { typedef struct {
SRspRet rspRet;
void *pCont; void *pCont;
int32_t contLen; int32_t contLen;
SRpcMsg rpcMsg; SRpcMsg rpcMsg;
...@@ -148,8 +149,8 @@ void dnodeSendRpcWriteRsp(void *pVnode, void *param, int32_t code) { ...@@ -148,8 +149,8 @@ void dnodeSendRpcWriteRsp(void *pVnode, void *param, int32_t code) {
SRpcMsg rpcRsp = { SRpcMsg rpcRsp = {
.handle = pWrite->rpcMsg.handle, .handle = pWrite->rpcMsg.handle,
.pCont = NULL, .pCont = pWrite->rspRet.rsp,
.contLen = 0, .contLen = pWrite->rspRet.len,
.code = code, .code = code,
}; };
......
...@@ -29,11 +29,11 @@ ...@@ -29,11 +29,11 @@
#include "vnodeInt.h" #include "vnodeInt.h"
static int32_t (*vnodeProcessWriteMsgFp[TSDB_MSG_TYPE_MAX])(SVnodeObj *, void *, void*); static int32_t (*vnodeProcessWriteMsgFp[TSDB_MSG_TYPE_MAX])(SVnodeObj *, void *, void*);
static int32_t vnodeProcessSubmitMsg(SVnodeObj *pVnode, void *pMsg, void *); static int32_t vnodeProcessSubmitMsg(SVnodeObj *pVnode, void *pMsg, SRspRet *);
static int32_t vnodeProcessCreateTableMsg(SVnodeObj *pVnode, void *pMsg, void *); static int32_t vnodeProcessCreateTableMsg(SVnodeObj *pVnode, void *pMsg, SRspRet *);
static int32_t vnodeProcessDropTableMsg(SVnodeObj *pVnode, void *pMsg, void *); static int32_t vnodeProcessDropTableMsg(SVnodeObj *pVnode, void *pMsg, SRspRet *);
static int32_t vnodeProcessAlterTableMsg(SVnodeObj *pVnode, void *pMsg, void *); static int32_t vnodeProcessAlterTableMsg(SVnodeObj *pVnode, void *pMsg, SRspRet *);
static int32_t vnodeProcessDropStableMsg(SVnodeObj *pVnode, void *pMsg, void *); static int32_t vnodeProcessDropStableMsg(SVnodeObj *pVnode, void *pMsg, SRspRet *);
int32_t vnodeInitWrite() { int32_t vnodeInitWrite() {
vnodeProcessWriteMsgFp[TSDB_MSG_TYPE_SUBMIT] = vnodeProcessSubmitMsg; vnodeProcessWriteMsgFp[TSDB_MSG_TYPE_SUBMIT] = vnodeProcessSubmitMsg;
...@@ -49,6 +49,9 @@ int32_t vnodeProcessWrite(void *param, int qtype, SWalHead *pHead, void *item) { ...@@ -49,6 +49,9 @@ int32_t vnodeProcessWrite(void *param, int qtype, SWalHead *pHead, void *item) {
int32_t code = 0; int32_t code = 0;
SVnodeObj *pVnode = (SVnodeObj *)param; SVnodeObj *pVnode = (SVnodeObj *)param;
if (vnodeProcessWriteMsgFp[pHead->msgType] == NULL)
return TSDB_CODE_MSG_NOT_PROCESSED;
if (pVnode->status == VN_STATUS_DELETING || pVnode->status == VN_STATUS_CLOSING) if (pVnode->status == VN_STATUS_DELETING || pVnode->status == VN_STATUS_CLOSING)
return TSDB_CODE_NOT_ACTIVE_VNODE; return TSDB_CODE_NOT_ACTIVE_VNODE;
...@@ -73,7 +76,7 @@ int32_t vnodeProcessWrite(void *param, int qtype, SWalHead *pHead, void *item) { ...@@ -73,7 +76,7 @@ int32_t vnodeProcessWrite(void *param, int qtype, SWalHead *pHead, void *item) {
// write into WAL // write into WAL
code = walWrite(pVnode->wal, pHead); code = walWrite(pVnode->wal, pHead);
if ( code < 0) return code; if ( code < 0) return code;
code = (*vnodeProcessWriteMsgFp[pHead->msgType])(pVnode, pHead->cont, item); code = (*vnodeProcessWriteMsgFp[pHead->msgType])(pVnode, pHead->cont, item);
if (code < 0) return code; if (code < 0) return code;
...@@ -86,7 +89,7 @@ int32_t vnodeProcessWrite(void *param, int qtype, SWalHead *pHead, void *item) { ...@@ -86,7 +89,7 @@ int32_t vnodeProcessWrite(void *param, int qtype, SWalHead *pHead, void *item) {
return code; return code;
} }
static int32_t vnodeProcessSubmitMsg(SVnodeObj *pVnode, void *pCont, void *item) { static int32_t vnodeProcessSubmitMsg(SVnodeObj *pVnode, void *pCont, SRspRet *pRet) {
int32_t code = 0; int32_t code = 0;
// save insert result into item // save insert result into item
...@@ -94,10 +97,19 @@ static int32_t vnodeProcessSubmitMsg(SVnodeObj *pVnode, void *pCont, void *item) ...@@ -94,10 +97,19 @@ static int32_t vnodeProcessSubmitMsg(SVnodeObj *pVnode, void *pCont, void *item)
dTrace("pVnode:%p vgId:%d, submit msg is processed", pVnode, pVnode->vgId); dTrace("pVnode:%p vgId:%d, submit msg is processed", pVnode, pVnode->vgId);
code = tsdbInsertData(pVnode->tsdb, pCont); code = tsdbInsertData(pVnode->tsdb, pCont);
pRet->len = sizeof(SShellSubmitRspMsg);
pRet->rsp = rpcMallocCont(pRet->len);
SShellSubmitRspMsg *pRsp = pRet->rsp;
pRsp->code = 0;
pRsp->numOfRows = htonl(1);
pRsp->affectedRows = htonl(1);
pRsp->numOfFailedBlocks = 0;
return code; return code;
} }
static int32_t vnodeProcessCreateTableMsg(SVnodeObj *pVnode, void *pCont, void *item) { static int32_t vnodeProcessCreateTableMsg(SVnodeObj *pVnode, void *pCont, SRspRet *pRet) {
SMDCreateTableMsg *pTable = pCont; SMDCreateTableMsg *pTable = pCont;
int32_t code = 0; int32_t code = 0;
...@@ -153,7 +165,7 @@ static int32_t vnodeProcessCreateTableMsg(SVnodeObj *pVnode, void *pCont, void * ...@@ -153,7 +165,7 @@ static int32_t vnodeProcessCreateTableMsg(SVnodeObj *pVnode, void *pCont, void *
return code; return code;
} }
static int32_t vnodeProcessDropTableMsg(SVnodeObj *pVnode, void *pCont, void *item) { static int32_t vnodeProcessDropTableMsg(SVnodeObj *pVnode, void *pCont, SRspRet *pRet) {
SMDDropTableMsg *pTable = pCont; SMDDropTableMsg *pTable = pCont;
int32_t code = 0; int32_t code = 0;
...@@ -168,7 +180,7 @@ static int32_t vnodeProcessDropTableMsg(SVnodeObj *pVnode, void *pCont, void *it ...@@ -168,7 +180,7 @@ static int32_t vnodeProcessDropTableMsg(SVnodeObj *pVnode, void *pCont, void *it
return code; return code;
} }
static int32_t vnodeProcessAlterTableMsg(SVnodeObj *pVnode, void *pCont, void *item) { static int32_t vnodeProcessAlterTableMsg(SVnodeObj *pVnode, void *pCont, SRspRet *pRet) {
SMDCreateTableMsg *pTable = pCont; SMDCreateTableMsg *pTable = pCont;
int32_t code = 0; int32_t code = 0;
...@@ -223,7 +235,7 @@ static int32_t vnodeProcessAlterTableMsg(SVnodeObj *pVnode, void *pCont, void *i ...@@ -223,7 +235,7 @@ static int32_t vnodeProcessAlterTableMsg(SVnodeObj *pVnode, void *pCont, void *i
return code; return code;
} }
static int32_t vnodeProcessDropStableMsg(SVnodeObj *pVnode, void *pCont, void *item) { static int32_t vnodeProcessDropStableMsg(SVnodeObj *pVnode, void *pCont, SRspRet *pRet) {
SMDDropSTableMsg *pTable = pCont; SMDDropSTableMsg *pTable = pCont;
int32_t code = 0; int32_t code = 0;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册