From c0786acc6419ac4bd240f6211cb57560f5e8c2d5 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Tue, 16 Jun 2020 15:36:15 +0000 Subject: [PATCH] [TD-570] fix bug while sdb sync --- src/mnode/src/mnodeSdb.c | 21 +++++++++++---------- 1 file changed, 11 insertions(+), 10 deletions(-) diff --git a/src/mnode/src/mnodeSdb.c b/src/mnode/src/mnodeSdb.c index 1d428ab9a2..d8fc9aee5a 100644 --- a/src/mnode/src/mnodeSdb.c +++ b/src/mnode/src/mnodeSdb.c @@ -32,6 +32,7 @@ #include "mnodeSdb.h" #define SDB_TABLE_LEN 12 +#define SDB_SYNC_HACK 16 typedef enum { SDB_ACTION_INSERT, @@ -566,7 +567,7 @@ static int sdbWrite(void *param, void *data, int type) { // from app, oper is created if (pOper != NULL) { - sdbTrace("record from app is disposed, version:%" PRIu64 " code:%s", pHead->version, tstrerror(code)); + sdbTrace("record from app is disposed, version:%" PRIu64 " result:%s", pHead->version, tstrerror(code)); return code; } @@ -628,10 +629,10 @@ int32_t sdbInsertRow(SSdbOper *pOper) { return TSDB_CODE_SUCCESS; } - int32_t size = sizeof(SSdbOper) + sizeof(SWalHead) + pTable->maxRowSize; + int32_t size = sizeof(SSdbOper) + sizeof(SWalHead) + pTable->maxRowSize + SDB_SYNC_HACK; SSdbOper *pNewOper = taosAllocateQitem(size); - SWalHead *pHead = (void *)pNewOper + sizeof(SSdbOper); + SWalHead *pHead = (void *)pNewOper + sizeof(SSdbOper) + SDB_SYNC_HACK; pHead->version = 0; pHead->len = pOper->rowSize; pHead->msgType = pTable->tableId * 10 + SDB_ACTION_INSERT; @@ -692,10 +693,10 @@ int32_t sdbDeleteRow(SSdbOper *pOper) { return TSDB_CODE_MND_SDB_INVAID_KEY_TYPE; } - int32_t size = sizeof(SSdbOper) + sizeof(SWalHead) + keySize; + int32_t size = sizeof(SSdbOper) + sizeof(SWalHead) + keySize + SDB_SYNC_HACK; SSdbOper *pNewOper = taosAllocateQitem(size); - SWalHead *pHead = (void *)pNewOper + sizeof(SSdbOper); + SWalHead *pHead = (void *)pNewOper + sizeof(SSdbOper) + SDB_SYNC_HACK; pHead->version = 0; pHead->len = keySize; pHead->msgType = pTable->tableId * 10 + SDB_ACTION_DELETE; @@ -738,10 +739,10 @@ int32_t sdbUpdateRow(SSdbOper *pOper) { return TSDB_CODE_SUCCESS; } - int32_t size = sizeof(SSdbOper) + sizeof(SWalHead) + pTable->maxRowSize; + int32_t size = sizeof(SSdbOper) + sizeof(SWalHead) + pTable->maxRowSize + SDB_SYNC_HACK; SSdbOper *pNewOper = taosAllocateQitem(size); - SWalHead *pHead = (void *)pNewOper + sizeof(SSdbOper); + SWalHead *pHead = (void *)pNewOper + sizeof(SSdbOper) + SDB_SYNC_HACK; pHead->version = 0; pHead->msgType = pTable->tableId * 10 + SDB_ACTION_UPDATE; @@ -969,7 +970,7 @@ static void *sdbWorkerFp(void *param) { taosGetQitem(tsSdbWriteQall, &type, &item); if (type == TAOS_QTYPE_RPC) { pOper = (SSdbOper *)item; - pHead = (void *)pOper + sizeof(SSdbOper); + pHead = (void *)pOper + sizeof(SSdbOper) + SDB_SYNC_HACK; } else { pHead = (SWalHead *)item; pOper = NULL; @@ -991,10 +992,10 @@ static void *sdbWorkerFp(void *param) { taosGetQitem(tsSdbWriteQall, &type, &item); if (type == TAOS_QTYPE_RPC) { pOper = (SSdbOper *)item; - if (pOper->cb) { + if (pOper != NULL && pOper->cb != NULL) { pOper->retCode = (*pOper->cb)(pOper->pMsg, pOper->retCode); } - + if (pOper != NULL && pOper->pMsg != NULL) { sdbTrace("app:%p:%p, msg is processed, result:%s", pOper->pMsg->rpcMsg.ahandle, pOper->pMsg, tstrerror(pOper->retCode)); -- GitLab