提交 c0786acc 编写于 作者: S Shengliang Guan

[TD-570] fix bug while sdb sync

上级 a561016f
...@@ -32,6 +32,7 @@ ...@@ -32,6 +32,7 @@
#include "mnodeSdb.h" #include "mnodeSdb.h"
#define SDB_TABLE_LEN 12 #define SDB_TABLE_LEN 12
#define SDB_SYNC_HACK 16
typedef enum { typedef enum {
SDB_ACTION_INSERT, SDB_ACTION_INSERT,
...@@ -566,7 +567,7 @@ static int sdbWrite(void *param, void *data, int type) { ...@@ -566,7 +567,7 @@ static int sdbWrite(void *param, void *data, int type) {
// from app, oper is created // from app, oper is created
if (pOper != NULL) { if (pOper != NULL) {
sdbTrace("record from app is disposed, version:%" PRIu64 " code:%s", pHead->version, tstrerror(code)); sdbTrace("record from app is disposed, version:%" PRIu64 " result:%s", pHead->version, tstrerror(code));
return code; return code;
} }
...@@ -628,10 +629,10 @@ int32_t sdbInsertRow(SSdbOper *pOper) { ...@@ -628,10 +629,10 @@ int32_t sdbInsertRow(SSdbOper *pOper) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t size = sizeof(SSdbOper) + sizeof(SWalHead) + pTable->maxRowSize; int32_t size = sizeof(SSdbOper) + sizeof(SWalHead) + pTable->maxRowSize + SDB_SYNC_HACK;
SSdbOper *pNewOper = taosAllocateQitem(size); SSdbOper *pNewOper = taosAllocateQitem(size);
SWalHead *pHead = (void *)pNewOper + sizeof(SSdbOper); SWalHead *pHead = (void *)pNewOper + sizeof(SSdbOper) + SDB_SYNC_HACK;
pHead->version = 0; pHead->version = 0;
pHead->len = pOper->rowSize; pHead->len = pOper->rowSize;
pHead->msgType = pTable->tableId * 10 + SDB_ACTION_INSERT; pHead->msgType = pTable->tableId * 10 + SDB_ACTION_INSERT;
...@@ -692,10 +693,10 @@ int32_t sdbDeleteRow(SSdbOper *pOper) { ...@@ -692,10 +693,10 @@ int32_t sdbDeleteRow(SSdbOper *pOper) {
return TSDB_CODE_MND_SDB_INVAID_KEY_TYPE; return TSDB_CODE_MND_SDB_INVAID_KEY_TYPE;
} }
int32_t size = sizeof(SSdbOper) + sizeof(SWalHead) + keySize; int32_t size = sizeof(SSdbOper) + sizeof(SWalHead) + keySize + SDB_SYNC_HACK;
SSdbOper *pNewOper = taosAllocateQitem(size); SSdbOper *pNewOper = taosAllocateQitem(size);
SWalHead *pHead = (void *)pNewOper + sizeof(SSdbOper); SWalHead *pHead = (void *)pNewOper + sizeof(SSdbOper) + SDB_SYNC_HACK;
pHead->version = 0; pHead->version = 0;
pHead->len = keySize; pHead->len = keySize;
pHead->msgType = pTable->tableId * 10 + SDB_ACTION_DELETE; pHead->msgType = pTable->tableId * 10 + SDB_ACTION_DELETE;
...@@ -738,10 +739,10 @@ int32_t sdbUpdateRow(SSdbOper *pOper) { ...@@ -738,10 +739,10 @@ int32_t sdbUpdateRow(SSdbOper *pOper) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t size = sizeof(SSdbOper) + sizeof(SWalHead) + pTable->maxRowSize; int32_t size = sizeof(SSdbOper) + sizeof(SWalHead) + pTable->maxRowSize + SDB_SYNC_HACK;
SSdbOper *pNewOper = taosAllocateQitem(size); SSdbOper *pNewOper = taosAllocateQitem(size);
SWalHead *pHead = (void *)pNewOper + sizeof(SSdbOper); SWalHead *pHead = (void *)pNewOper + sizeof(SSdbOper) + SDB_SYNC_HACK;
pHead->version = 0; pHead->version = 0;
pHead->msgType = pTable->tableId * 10 + SDB_ACTION_UPDATE; pHead->msgType = pTable->tableId * 10 + SDB_ACTION_UPDATE;
...@@ -969,7 +970,7 @@ static void *sdbWorkerFp(void *param) { ...@@ -969,7 +970,7 @@ static void *sdbWorkerFp(void *param) {
taosGetQitem(tsSdbWriteQall, &type, &item); taosGetQitem(tsSdbWriteQall, &type, &item);
if (type == TAOS_QTYPE_RPC) { if (type == TAOS_QTYPE_RPC) {
pOper = (SSdbOper *)item; pOper = (SSdbOper *)item;
pHead = (void *)pOper + sizeof(SSdbOper); pHead = (void *)pOper + sizeof(SSdbOper) + SDB_SYNC_HACK;
} else { } else {
pHead = (SWalHead *)item; pHead = (SWalHead *)item;
pOper = NULL; pOper = NULL;
...@@ -991,10 +992,10 @@ static void *sdbWorkerFp(void *param) { ...@@ -991,10 +992,10 @@ static void *sdbWorkerFp(void *param) {
taosGetQitem(tsSdbWriteQall, &type, &item); taosGetQitem(tsSdbWriteQall, &type, &item);
if (type == TAOS_QTYPE_RPC) { if (type == TAOS_QTYPE_RPC) {
pOper = (SSdbOper *)item; pOper = (SSdbOper *)item;
if (pOper->cb) { if (pOper != NULL && pOper->cb != NULL) {
pOper->retCode = (*pOper->cb)(pOper->pMsg, pOper->retCode); pOper->retCode = (*pOper->cb)(pOper->pMsg, pOper->retCode);
} }
if (pOper != NULL && pOper->pMsg != NULL) { if (pOper != NULL && pOper->pMsg != NULL) {
sdbTrace("app:%p:%p, msg is processed, result:%s", pOper->pMsg->rpcMsg.ahandle, pOper->pMsg, sdbTrace("app:%p:%p, msg is processed, result:%s", pOper->pMsg->rpcMsg.ahandle, pOper->pMsg,
tstrerror(pOper->retCode)); tstrerror(pOper->retCode));
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册