提交 16132ff9 编写于 作者: S Shengliang Guan

TD-1918

上级 a0880e0d
...@@ -63,11 +63,9 @@ int32_t vnodeClose(int32_t vgId); ...@@ -63,11 +63,9 @@ int32_t vnodeClose(int32_t vgId);
void* vnodeAcquire(int32_t vgId); // add refcount void* vnodeAcquire(int32_t vgId); // add refcount
void* vnodeAcquireRqueue(int32_t vgId); // add refCount, get read queue void* vnodeAcquireRqueue(int32_t vgId); // add refCount, get read queue
void* vnodeAcquireWqueue(int32_t vgId); // add recCount, get write queue
void vnodeRelease(void *pVnode); // dec refCount void vnodeRelease(void *pVnode); // dec refCount
void* vnodeGetWal(void *pVnode); void* vnodeGetWal(void *pVnode);
int32_t vnodeWriteToQueue(void *vparam, void *wparam, int32_t qtype, void *pMsg); int32_t vnodeWriteToQueue(void *vparam, void *wparam, int32_t qtype, void *pMsg);
int32_t vnodeProcessWrite(void *param, int32_t qtype, SVWriteMsg *pWrite); int32_t vnodeProcessWrite(void *param, int32_t qtype, SVWriteMsg *pWrite);
int32_t vnodeCheckWrite(void *pVnode); int32_t vnodeCheckWrite(void *pVnode);
......
...@@ -98,8 +98,8 @@ static taos_qall tsSdbWriteQall; ...@@ -98,8 +98,8 @@ static taos_qall tsSdbWriteQall;
static taos_queue tsSdbWriteQueue; static taos_queue tsSdbWriteQueue;
static SSdbWriteWorkerPool tsSdbPool; static SSdbWriteWorkerPool tsSdbPool;
static int sdbWrite(void *param, void *data, int type); static int32_t sdbWrite(void *param, void *data, int32_t type, void *pMsg);
static int sdbWriteToQueue(void *param, void *data, int type); static int32_t sdbWriteToQueue(void *param, void *data, int32_t type, void *pMsg);
static void * sdbWorkerFp(void *param); static void * sdbWorkerFp(void *param);
static int32_t sdbInitWriteWorker(); static int32_t sdbInitWriteWorker();
static void sdbCleanupWriteWorker(); static void sdbCleanupWriteWorker();
...@@ -575,7 +575,7 @@ static int32_t sdbUpdateHash(SSdbTable *pTable, SSdbOper *pOper) { ...@@ -575,7 +575,7 @@ static int32_t sdbUpdateHash(SSdbTable *pTable, SSdbOper *pOper) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int sdbWrite(void *param, void *data, int type) { static int sdbWrite(void *param, void *data, int32_t type, void *pMsg) {
SSdbOper *pOper = param; SSdbOper *pOper = param;
SWalHead *pHead = data; SWalHead *pHead = data;
int32_t tableId = pHead->msgType / 10; int32_t tableId = pHead->msgType / 10;
...@@ -1040,13 +1040,13 @@ void sdbFreeWritequeue() { ...@@ -1040,13 +1040,13 @@ void sdbFreeWritequeue() {
tsSdbWriteQueue = NULL; tsSdbWriteQueue = NULL;
} }
int sdbWriteToQueue(void *param, void *data, int type) { int32_t sdbWriteToQueue(void *param, void *data, int32_t qtype, void *pMsg) {
SWalHead *pHead = data; SWalHead *pHead = data;
int size = sizeof(SWalHead) + pHead->len; int32_t size = sizeof(SWalHead) + pHead->len;
SWalHead *pWal = (SWalHead *)taosAllocateQitem(size); SWalHead *pWal = (SWalHead *)taosAllocateQitem(size);
memcpy(pWal, pHead, size); memcpy(pWal, pHead, size);
taosWriteQitem(tsSdbWriteQueue, type, pWal); taosWriteQitem(tsSdbWriteQueue, qtype, pWal);
return 0; return 0;
} }
...@@ -1081,7 +1081,7 @@ static void *sdbWorkerFp(void *param) { ...@@ -1081,7 +1081,7 @@ static void *sdbWorkerFp(void *param) {
pOper = NULL; pOper = NULL;
} }
int32_t code = sdbWrite(pOper, pHead, type); int32_t code = sdbWrite(pOper, pHead, type, NULL);
if (code > 0) code = 0; if (code > 0) code = 0;
if (pOper) { if (pOper) {
pOper->retCode = code; pOper->retCode = code;
......
...@@ -483,21 +483,6 @@ void *vnodeAcquireRqueue(int32_t vgId) { ...@@ -483,21 +483,6 @@ void *vnodeAcquireRqueue(int32_t vgId) {
return pVnode->rqueue; return pVnode->rqueue;
} }
void *vnodeAcquireWqueue(int32_t vgId) {
SVnodeObj *pVnode = vnodeAcquire(vgId);
if (pVnode == NULL) return NULL;
int32_t code = vnodeCheckWrite(pVnode);
if (code != TSDB_CODE_SUCCESS) {
terrno = code;
vInfo("vgId:%d, can not provide write service, status is %s", vgId, vnodeStatus[pVnode->status]);
vnodeRelease(pVnode);
return NULL;
}
return pVnode->wqueue;
}
void *vnodeGetWal(void *pVnode) { void *vnodeGetWal(void *pVnode) {
return ((SVnodeObj *)pVnode)->wal; return ((SVnodeObj *)pVnode)->wal;
} }
......
...@@ -208,8 +208,9 @@ int32_t vnodeWriteToQueue(void *vparam, void *wparam, int32_t qtype, void *pMsg) ...@@ -208,8 +208,9 @@ int32_t vnodeWriteToQueue(void *vparam, void *wparam, int32_t qtype, void *pMsg)
SVnodeObj *pVnode = vparam; SVnodeObj *pVnode = vparam;
SWalHead * pHead = wparam; SWalHead * pHead = wparam;
if (qtype == TAOS_QTYPE_RPC && vnodeCheckWrite(pVnode) != TSDB_CODE_SUCCESS) { if (qtype == TAOS_QTYPE_RPC) {
return TSDB_CODE_VND_INVALID_VGROUP_ID; int32_t code = vnodeCheckWrite(pVnode);
if (code != TSDB_CODE_SUCCESS) return code;
} }
int32_t size = sizeof(SVWriteMsg) + sizeof(SWalHead) + pHead->len; int32_t size = sizeof(SVWriteMsg) + sizeof(SWalHead) + pHead->len;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册