提交 3d972f81 编写于 作者: L Liu Jicong

refacor(wal): rename len to bodyLen

上级 d16a2e27
...@@ -491,7 +491,7 @@ typedef struct { ...@@ -491,7 +491,7 @@ typedef struct {
char intervalUnit; char intervalUnit;
char slidingUnit; char slidingUnit;
char char
offsetUnit; // TODO Remove it, the offset is the number of precision tickle, and it must be a immutable duration. offsetUnit; // TODO Remove it, the offset is the number of precision tickle, and it must be a immutable duration.
int8_t precision; int8_t precision;
int64_t interval; int64_t interval;
int64_t sliding; int64_t sliding;
...@@ -651,7 +651,6 @@ typedef struct SQueryNodeAddr { ...@@ -651,7 +651,6 @@ typedef struct SQueryNodeAddr {
SEpSet epSet; SEpSet epSet;
} SQueryNodeAddr; } SQueryNodeAddr;
typedef struct { typedef struct {
SArray* pArray; // Array of SUseDbRsp SArray* pArray; // Array of SUseDbRsp
} SUseDbBatchRsp; } SUseDbBatchRsp;
...@@ -724,7 +723,7 @@ typedef struct { ...@@ -724,7 +723,7 @@ typedef struct {
int32_t tSerializeSRetrieveFuncRsp(void* buf, int32_t bufLen, SRetrieveFuncRsp* pRsp); int32_t tSerializeSRetrieveFuncRsp(void* buf, int32_t bufLen, SRetrieveFuncRsp* pRsp);
int32_t tDeserializeSRetrieveFuncRsp(void* buf, int32_t bufLen, SRetrieveFuncRsp* pRsp); int32_t tDeserializeSRetrieveFuncRsp(void* buf, int32_t bufLen, SRetrieveFuncRsp* pRsp);
void tFreeSFuncInfo(SFuncInfo *pInfo); void tFreeSFuncInfo(SFuncInfo* pInfo);
void tFreeSRetrieveFuncRsp(SRetrieveFuncRsp* pRsp); void tFreeSRetrieveFuncRsp(SRetrieveFuncRsp* pRsp);
typedef struct { typedef struct {
...@@ -1289,14 +1288,14 @@ typedef struct { ...@@ -1289,14 +1288,14 @@ typedef struct {
} SMVCreateStreamRsp, SMSCreateStreamRsp; } SMVCreateStreamRsp, SMSCreateStreamRsp;
typedef struct { typedef struct {
char name[TSDB_TOPIC_FNAME_LEN]; // accout.topic char name[TSDB_TOPIC_FNAME_LEN]; // accout.topic
int8_t igExists; int8_t igExists;
int8_t withTbName; int8_t withTbName;
int8_t withSchema; int8_t withSchema;
int8_t withTag; int8_t withTag;
char* sql; char* sql;
char* ast; char* ast;
char subscribeDbName[TSDB_DB_NAME_LEN]; char subscribeDbName[TSDB_DB_NAME_LEN];
} SCMCreateTopicReq; } SCMCreateTopicReq;
int32_t tSerializeSCMCreateTopicReq(void* buf, int32_t bufLen, const SCMCreateTopicReq* pReq); int32_t tSerializeSCMCreateTopicReq(void* buf, int32_t bufLen, const SCMCreateTopicReq* pReq);
......
...@@ -92,7 +92,7 @@ typedef struct SWalReadHead { ...@@ -92,7 +92,7 @@ typedef struct SWalReadHead {
int8_t headVer; int8_t headVer;
int8_t reserved; int8_t reserved;
int16_t msgType; int16_t msgType;
int32_t len; int32_t bodyLen;
int64_t ingestTs; // not implemented int64_t ingestTs; // not implemented
int64_t version; int64_t version;
......
...@@ -172,7 +172,7 @@ static int32_t mndProcessCommitOffsetReq(SNodeMsg *pMsg) { ...@@ -172,7 +172,7 @@ static int32_t mndProcessCommitOffsetReq(SNodeMsg *pMsg) {
bool create = false; bool create = false;
SMqOffsetObj *pOffsetObj = mndAcquireOffset(pMnode, key); SMqOffsetObj *pOffsetObj = mndAcquireOffset(pMnode, key);
if (pOffsetObj == NULL) { if (pOffsetObj == NULL) {
pOffsetObj = taosMemoryMalloc(sizeof(SMqOffset)); pOffsetObj = taosMemoryMalloc(sizeof(SMqOffsetObj));
memcpy(pOffsetObj->key, key, TSDB_PARTITION_KEY_LEN); memcpy(pOffsetObj->key, key, TSDB_PARTITION_KEY_LEN);
create = true; create = true;
} }
......
...@@ -71,7 +71,7 @@ SSyncRaftEntry* logStoreGetEntry(SSyncLogStore* pLogStore, SyncIndex index) { ...@@ -71,7 +71,7 @@ SSyncRaftEntry* logStoreGetEntry(SSyncLogStore* pLogStore, SyncIndex index) {
SWalReadHandle* pWalHandle = walOpenReadHandle(pWal); SWalReadHandle* pWalHandle = walOpenReadHandle(pWal);
assert(walReadWithHandle(pWalHandle, index) == 0); assert(walReadWithHandle(pWalHandle, index) == 0);
SSyncRaftEntry* pEntry = syncEntryBuild(pWalHandle->pHead->head.len); SSyncRaftEntry* pEntry = syncEntryBuild(pWalHandle->pHead->head.bodyLen);
assert(pEntry != NULL); assert(pEntry != NULL);
pEntry->msgType = TDMT_VND_SYNC_CLIENT_REQUEST; pEntry->msgType = TDMT_VND_SYNC_CLIENT_REQUEST;
...@@ -80,8 +80,8 @@ SSyncRaftEntry* logStoreGetEntry(SSyncLogStore* pLogStore, SyncIndex index) { ...@@ -80,8 +80,8 @@ SSyncRaftEntry* logStoreGetEntry(SSyncLogStore* pLogStore, SyncIndex index) {
pEntry->isWeak = pWalHandle->pHead->head.syncMeta.isWeek; pEntry->isWeak = pWalHandle->pHead->head.syncMeta.isWeek;
pEntry->term = pWalHandle->pHead->head.syncMeta.term; pEntry->term = pWalHandle->pHead->head.syncMeta.term;
pEntry->index = index; pEntry->index = index;
assert(pEntry->dataLen == pWalHandle->pHead->head.len); assert(pEntry->dataLen == pWalHandle->pHead->head.bodyLen);
memcpy(pEntry->data, pWalHandle->pHead->head.body, pWalHandle->pHead->head.len); memcpy(pEntry->data, pWalHandle->pHead->head.body, pWalHandle->pHead->head.bodyLen);
// need to hold, do not new every time!! // need to hold, do not new every time!!
walCloseReadHandle(pWalHandle); walCloseReadHandle(pWalHandle);
...@@ -257,4 +257,4 @@ void logStoreSimpleLog2(char* s, SSyncLogStore* pLogStore) { ...@@ -257,4 +257,4 @@ void logStoreSimpleLog2(char* s, SSyncLogStore* pLogStore) {
char* serialized = logStoreSimple2Str(pLogStore); char* serialized = logStoreSimple2Str(pLogStore);
sTrace("logStoreSimpleLog2 | len:%lu | %s | %s", strlen(serialized), s, serialized); sTrace("logStoreSimpleLog2 | len:%lu | %s | %s", strlen(serialized), s, serialized);
taosMemoryFree(serialized); taosMemoryFree(serialized);
} }
\ No newline at end of file
...@@ -16,10 +16,10 @@ ...@@ -16,10 +16,10 @@
#ifndef _TD_WAL_INT_H_ #ifndef _TD_WAL_INT_H_
#define _TD_WAL_INT_H_ #define _TD_WAL_INT_H_
#include "tcompare.h"
#include "taoserror.h" #include "taoserror.h"
#include "tchecksum.h" #include "tchecksum.h"
#include "tcoding.h" #include "tcoding.h"
#include "tcompare.h"
#include "wal.h" #include "wal.h"
#ifdef __cplusplus #ifdef __cplusplus
...@@ -41,10 +41,10 @@ typedef struct WalIdxEntry { ...@@ -41,10 +41,10 @@ typedef struct WalIdxEntry {
} SWalIdxEntry; } SWalIdxEntry;
static inline int tSerializeWalIdxEntry(void** buf, SWalIdxEntry* pIdxEntry) { static inline int tSerializeWalIdxEntry(void** buf, SWalIdxEntry* pIdxEntry) {
int tlen; int tlen = 0;
tlen += taosEncodeFixedI64(buf, pIdxEntry->ver); tlen += taosEncodeFixedI64(buf, pIdxEntry->ver);
tlen += taosEncodeFixedI64(buf, pIdxEntry->offset); tlen += taosEncodeFixedI64(buf, pIdxEntry->offset);
return 0; return tlen;
} }
static inline void* tDeserializeWalIdxEntry(void* buf, SWalIdxEntry* pIdxEntry) { static inline void* tDeserializeWalIdxEntry(void* buf, SWalIdxEntry* pIdxEntry) {
...@@ -103,7 +103,7 @@ static inline int walValidHeadCksum(SWalHead* pHead) { ...@@ -103,7 +103,7 @@ static inline int walValidHeadCksum(SWalHead* pHead) {
} }
static inline int walValidBodyCksum(SWalHead* pHead) { static inline int walValidBodyCksum(SWalHead* pHead) {
return taosCheckChecksum((uint8_t*)pHead->head.body, pHead->head.len, pHead->cksumBody); return taosCheckChecksum((uint8_t*)pHead->head.body, pHead->head.bodyLen, pHead->cksumBody);
} }
static inline int walValidCksum(SWalHead* pHead, void* body, int64_t bodyLen) { static inline int walValidCksum(SWalHead* pHead, void* body, int64_t bodyLen) {
......
...@@ -144,12 +144,12 @@ int32_t walReadWithHandle_s(SWalReadHandle *pRead, int64_t ver, SWalReadHead **p ...@@ -144,12 +144,12 @@ int32_t walReadWithHandle_s(SWalReadHandle *pRead, int64_t ver, SWalReadHead **p
taosThreadMutexUnlock(&pRead->mutex); taosThreadMutexUnlock(&pRead->mutex);
return -1; return -1;
} }
*ppHead = taosMemoryMalloc(sizeof(SWalReadHead) + pRead->pHead->head.len); *ppHead = taosMemoryMalloc(sizeof(SWalReadHead) + pRead->pHead->head.bodyLen);
if (*ppHead == NULL) { if (*ppHead == NULL) {
taosThreadMutexUnlock(&pRead->mutex); taosThreadMutexUnlock(&pRead->mutex);
return -1; return -1;
} }
memcpy(*ppHead, &pRead->pHead->head, sizeof(SWalReadHead) + pRead->pHead->head.len); memcpy(*ppHead, &pRead->pHead->head, sizeof(SWalReadHead) + pRead->pHead->head.bodyLen);
taosThreadMutexUnlock(&pRead->mutex); taosThreadMutexUnlock(&pRead->mutex);
return 0; return 0;
} }
...@@ -178,16 +178,18 @@ int32_t walReadWithHandle(SWalReadHandle *pRead, int64_t ver) { ...@@ -178,16 +178,18 @@ int32_t walReadWithHandle(SWalReadHandle *pRead, int64_t ver) {
terrno = TSDB_CODE_WAL_FILE_CORRUPTED; terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
return -1; return -1;
} }
if (pRead->capacity < pRead->pHead->head.len) { if (pRead->capacity < pRead->pHead->head.bodyLen) {
void *ptr = taosMemoryRealloc(pRead->pHead, sizeof(SWalHead) + pRead->pHead->head.len); void *ptr = taosMemoryRealloc(pRead->pHead, sizeof(SWalHead) + pRead->pHead->head.bodyLen);
if (ptr == NULL) { if (ptr == NULL) {
terrno = TSDB_CODE_WAL_OUT_OF_MEMORY; terrno = TSDB_CODE_WAL_OUT_OF_MEMORY;
return -1; return -1;
} }
pRead->pHead = ptr; pRead->pHead = ptr;
pRead->capacity = pRead->pHead->head.len; pRead->capacity = pRead->pHead->head.bodyLen;
} }
if (pRead->pHead->head.len != taosReadFile(pRead->pReadLogTFile, pRead->pHead->head.body, pRead->pHead->head.len)) {
if (pRead->pHead->head.bodyLen !=
taosReadFile(pRead->pReadLogTFile, pRead->pHead->head.body, pRead->pHead->head.bodyLen)) {
return -1; return -1;
} }
......
...@@ -295,7 +295,7 @@ int64_t walWriteWithSyncInfo(SWal *pWal, int64_t index, tmsg_t msgType, SSyncLog ...@@ -295,7 +295,7 @@ int64_t walWriteWithSyncInfo(SWal *pWal, int64_t index, tmsg_t msgType, SSyncLog
pWal->writeHead.head.version = index; pWal->writeHead.head.version = index;
int64_t offset = walGetCurFileOffset(pWal); int64_t offset = walGetCurFileOffset(pWal);
pWal->writeHead.head.len = bodyLen; pWal->writeHead.head.bodyLen = bodyLen;
pWal->writeHead.head.msgType = msgType; pWal->writeHead.head.msgType = msgType;
// sync info // sync info
......
...@@ -320,7 +320,7 @@ TEST_F(WalKeepEnv, readHandleRead) { ...@@ -320,7 +320,7 @@ TEST_F(WalKeepEnv, readHandleRead) {
char newStr[100]; char newStr[100];
sprintf(newStr, "%s-%d", ranStr, ver); sprintf(newStr, "%s-%d", ranStr, ver);
int len = strlen(newStr); int len = strlen(newStr);
ASSERT_EQ(pRead->pHead->head.len, len); ASSERT_EQ(pRead->pHead->head.bodyLen, len);
for (int j = 0; j < len; j++) { for (int j = 0; j < len; j++) {
EXPECT_EQ(newStr[j], pRead->pHead->head.body[j]); EXPECT_EQ(newStr[j], pRead->pHead->head.body[j]);
} }
...@@ -372,7 +372,7 @@ TEST_F(WalRetentionEnv, repairMeta1) { ...@@ -372,7 +372,7 @@ TEST_F(WalRetentionEnv, repairMeta1) {
char newStr[100]; char newStr[100];
sprintf(newStr, "%s-%d", ranStr, ver); sprintf(newStr, "%s-%d", ranStr, ver);
int len = strlen(newStr); int len = strlen(newStr);
ASSERT_EQ(pRead->pHead->head.len, len); ASSERT_EQ(pRead->pHead->head.bodyLen, len);
for (int j = 0; j < len; j++) { for (int j = 0; j < len; j++) {
EXPECT_EQ(newStr[j], pRead->pHead->head.body[j]); EXPECT_EQ(newStr[j], pRead->pHead->head.body[j]);
} }
...@@ -402,7 +402,7 @@ TEST_F(WalRetentionEnv, repairMeta1) { ...@@ -402,7 +402,7 @@ TEST_F(WalRetentionEnv, repairMeta1) {
char newStr[100]; char newStr[100];
sprintf(newStr, "%s-%d", ranStr, ver); sprintf(newStr, "%s-%d", ranStr, ver);
int len = strlen(newStr); int len = strlen(newStr);
ASSERT_EQ(pRead->pHead->head.len, len); ASSERT_EQ(pRead->pHead->head.bodyLen, len);
for (int j = 0; j < len; j++) { for (int j = 0; j < len; j++) {
EXPECT_EQ(newStr[j], pRead->pHead->head.body[j]); EXPECT_EQ(newStr[j], pRead->pHead->head.body[j]);
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册