提交 b2083ea6 编写于 作者: L Liu Jicong

fix memory leak

上级 d68e6e79
......@@ -52,6 +52,7 @@ typedef struct {
int32_t fsyncPeriod; // millisecond
int32_t retentionPeriod; // secs
int32_t rollPeriod; // secs
int32_t retentionSize; // secs
int64_t segSize;
EWalType walLevel; // wal level
} SWalCfg;
......
......@@ -43,6 +43,13 @@ typedef struct SArray {
*/
void* taosArrayInit(size_t size, size_t elemSize);
/**
*
* @param tsize
* @return
*/
int32_t taosArrayEnsureCap(SArray* pArray, size_t tsize);
/**
*
* @param pArray
......
......@@ -46,6 +46,7 @@ int walRollFileInfo(SWal* pWal) {
pInfo->closeTs = ts;
}
//TODO: change to emplace back
WalFileInfo *pNewInfo = malloc(sizeof(WalFileInfo));
if(pNewInfo == NULL) {
return -1;
......@@ -56,12 +57,13 @@ int walRollFileInfo(SWal* pWal) {
pNewInfo->closeTs = -1;
pNewInfo->fileSize = 0;
taosArrayPush(pWal->fileInfoSet, pNewInfo);
free(pNewInfo);
return 0;
}
char* walMetaSerialize(SWal* pWal) {
char buf[30];
if(pWal == NULL || pWal->fileInfoSet == NULL) return 0;
ASSERT(pWal->fileInfoSet);
int sz = pWal->fileInfoSet->size;
cJSON* pRoot = cJSON_CreateObject();
cJSON* pMeta = cJSON_CreateObject();
......@@ -103,7 +105,9 @@ char* walMetaSerialize(SWal* pWal) {
sprintf(buf, "%" PRId64, pInfo->fileSize);
cJSON_AddStringToObject(pField, "fileSize", buf);
}
return cJSON_Print(pRoot);
char* serialized = cJSON_Print(pRoot);
cJSON_Delete(pRoot);
return serialized;
}
int walMetaDeserialize(SWal* pWal, const char* bytes) {
......@@ -123,7 +127,8 @@ int walMetaDeserialize(SWal* pWal, const char* bytes) {
pFiles = cJSON_GetObjectItem(pRoot, "files");
int sz = cJSON_GetArraySize(pFiles);
//deserialize
SArray* pArray = taosArrayInit(sz, sizeof(WalFileInfo));
SArray* pArray = pWal->fileInfoSet;
taosArrayEnsureCap(pArray, sz);
WalFileInfo *pData = pArray->pData;
for(int i = 0; i < sz; i++) {
cJSON* pInfoJson = cJSON_GetArrayItem(pFiles, i);
......@@ -141,6 +146,7 @@ int walMetaDeserialize(SWal* pWal, const char* bytes) {
}
taosArraySetSize(pArray, sz);
pWal->fileInfoSet = pArray;
cJSON_Delete(pRoot);
return 0;
}
......@@ -171,6 +177,8 @@ static int walFindCurMetaVer(SWal* pWal) {
break;
}
}
closedir(dir);
regfree(&walMetaRegexPattern);
return metaVer;
}
......@@ -195,6 +203,7 @@ int walWriteMeta(SWal* pWal) {
walBuildMetaName(pWal, metaVer, fnameStr);
remove(fnameStr);
}
free(serialized);
return 0;
}
......@@ -215,6 +224,7 @@ int walReadMeta(SWal* pWal) {
if(buf == NULL) {
return -1;
}
memset(buf, 0, size+5);
int tfd = tfOpenRead(fnameStr);
if(tfRead(tfd, buf, size) != size) {
free(buf);
......
......@@ -80,6 +80,7 @@ SWal *walOpen(const char *path, SWalCfg *pCfg) {
terrno = TAOS_SYSTEM_ERROR(errno);
return NULL;
}
memset(pWal, 0, sizeof(SWal));
pWal->writeLogTfd = -1;
pWal->writeIdxTfd = -1;
pWal->writeCur = -1;
......@@ -89,6 +90,8 @@ SWal *walOpen(const char *path, SWalCfg *pCfg) {
pWal->fsyncPeriod = pCfg->fsyncPeriod;
pWal->rollPeriod = pCfg->rollPeriod;
pWal->segSize = pCfg->segSize;
pWal->retentionSize = pCfg->retentionSize;
pWal->retentionPeriod = pCfg->retentionPeriod;
pWal->level = pCfg->walLevel;
//init version info
......@@ -99,6 +102,8 @@ SWal *walOpen(const char *path, SWalCfg *pCfg) {
pWal->snapshottingVer = -1;
pWal->totSize = 0;
//init status
pWal->lastRollSeq = -1;
......@@ -122,6 +127,7 @@ SWal *walOpen(const char *path, SWalCfg *pCfg) {
walFreeObj(pWal);
return NULL;
}
walReadMeta(pWal);
wDebug("vgId:%d, wal:%p is opened, level:%d fsyncPeriod:%d", pWal->vgId, pWal, pWal->level, pWal->fsyncPeriod);
......@@ -153,9 +159,12 @@ void walClose(SWal *pWal) {
pthread_mutex_lock(&pWal->mutex);
tfClose(pWal->writeLogTfd);
pWal->writeLogTfd = -1;
tfClose(pWal->writeIdxTfd);
/*taosArrayDestroy(pWal->fileInfoSet);*/
/*pWal->fileInfoSet = NULL;*/
pWal->writeIdxTfd = -1;
walWriteMeta(pWal);
taosArrayDestroy(pWal->fileInfoSet);
pWal->fileInfoSet = NULL;
pthread_mutex_unlock(&pWal->mutex);
taosRemoveRef(tsWal.refSetId, pWal->refId);
}
......@@ -165,7 +174,7 @@ static int32_t walInitObj(SWal *pWal) {
wError("vgId:%d, path:%s, failed to create directory since %s", pWal->vgId, pWal->path, strerror(errno));
return TAOS_SYSTEM_ERROR(errno);
}
pWal->fileInfoSet = taosArrayInit(0, sizeof(WalFileInfo));
pWal->fileInfoSet = taosArrayInit(8, sizeof(WalFileInfo));
if(pWal->fileInfoSet == NULL) {
wError("vgId:%d, path:%s, failed to init taosArray %s", pWal->vgId, pWal->path, strerror(errno));
return TAOS_SYSTEM_ERROR(errno);
......
......@@ -228,6 +228,7 @@ int32_t walRollback(SWal *pWal, int64_t ver) {
pthread_mutex_unlock(&pWal->mutex);
return 0;
}
int32_t walBeginTakeSnapshot(SWal* pWal, int64_t ver) {
pWal->snapshottingVer = ver;
//check file rolling
......@@ -276,10 +277,12 @@ int32_t walEndTakeSnapshot(SWal *pWal) {
//make new array, remove files
taosArrayPopFrontBatch(pWal->fileInfoSet, deleteCnt);
if(taosArrayGetSize(pWal->fileInfoSet) == 0) {
pWal->writeCur = -1;
pWal->firstVersion = -1;
} else {
pWal->firstVersion = ((WalFileInfo*)taosArrayGet(pWal->fileInfoSet, 0))->firstVer;
}
pWal->writeCur = taosArrayGetSize(pWal->fileInfoSet) - 1;;
pWal->totSize = newTotSize;
pWal->snapshottingVer = -1;
......@@ -340,19 +343,10 @@ int walRoll(SWal *pWal) {
}
static int walWriteIndex(SWal *pWal, int64_t ver, int64_t offset) {
int code = 0;
//get index file
if(!tfValid(pWal->writeIdxTfd)) {
code = TAOS_SYSTEM_ERROR(errno);
wError("vgId:%d, file:%"PRId64".idx, failed to open since %s", pWal->vgId, walGetLastFileFirstVer(pWal), strerror(errno));
return code;
}
char fnameStr[WAL_FILE_LEN];
walBuildIdxName(pWal, walGetCurFileFirstVer(pWal), fnameStr);
WalIdxEntry entry = { .ver = ver, .offset = offset };
int size = tfWrite(pWal->writeIdxTfd, &entry, sizeof(WalIdxEntry));
if(size != sizeof(WalIdxEntry)) {
//TODO truncate
return -1;
}
return 0;
......
......@@ -18,12 +18,15 @@ class WalCleanEnv : public ::testing::Test {
void SetUp() override {
taosRemoveDir(pathName);
SWalCfg* pCfg = (SWalCfg*)malloc(sizeof(SWal));
SWalCfg* pCfg = (SWalCfg*)malloc(sizeof(SWalCfg));
memset(pCfg, 0, sizeof(SWalCfg));
pCfg->rollPeriod = -1;
pCfg->segSize = -1;
pCfg->retentionPeriod = 0;
pCfg->retentionSize = 0;
pCfg->walLevel = TAOS_WAL_FSYNC;
pWal = walOpen(pathName, pCfg);
free(pCfg);
ASSERT(pWal != NULL);
}
......@@ -49,11 +52,13 @@ class WalCleanDeleteEnv : public ::testing::Test {
void SetUp() override {
taosRemoveDir(pathName);
SWalCfg* pCfg = (SWalCfg*)malloc(sizeof(SWal));
SWalCfg* pCfg = (SWalCfg*)malloc(sizeof(SWalCfg));
memset(pCfg, 0, sizeof(SWalCfg));
pCfg->retentionPeriod = 0;
pCfg->retentionSize = 0;
pCfg->walLevel = TAOS_WAL_FSYNC;
pWal = walOpen(pathName, pCfg);
free(pCfg);
ASSERT(pWal != NULL);
}
......@@ -77,13 +82,22 @@ class WalKeepEnv : public ::testing::Test {
walCleanUp();
}
void walResetEnv() {
TearDown();
taosRemoveDir(pathName);
SetUp();
}
void SetUp() override {
SWalCfg* pCfg = (SWalCfg*)malloc(sizeof(SWal));
SWalCfg* pCfg = (SWalCfg*)malloc(sizeof(SWalCfg));
memset(pCfg, 0, sizeof(SWalCfg));
pCfg->rollPeriod = -1;
pCfg->segSize = -1;
pCfg->retentionPeriod = 0;
pCfg->retentionSize = 0;
pCfg->walLevel = TAOS_WAL_FSYNC;
pWal = walOpen(pathName, pCfg);
free(pCfg);
ASSERT(pWal != NULL);
}
......@@ -124,6 +138,7 @@ TEST_F(WalCleanEnv, serialize) {
ASSERT(code == 0);
char*ss = walMetaSerialize(pWal);
printf("%s\n", ss);
free(ss);
code = walWriteMeta(pWal);
ASSERT(code == 0);
}
......@@ -140,29 +155,38 @@ TEST_F(WalCleanEnv, removeOldMeta) {
ASSERT(code == 0);
}
//TEST_F(WalKeepEnv, readOldMeta) {
//int code = walRollFileInfo(pWal);
//ASSERT(code == 0);
//code = walWriteMeta(pWal);
//ASSERT(code == 0);
//code = walRollFileInfo(pWal);
//ASSERT(code == 0);
//code = walWriteMeta(pWal);
//ASSERT(code == 0);
//char*oldss = walMetaSerialize(pWal);
//TearDown();
//SetUp();
//code = walReadMeta(pWal);
//ASSERT(code == 0);
//char* newss = walMetaSerialize(pWal);
//int len = strlen(oldss);
//ASSERT_EQ(len, strlen(newss));
//for(int i = 0; i < len; i++) {
//EXPECT_EQ(oldss[i], newss[i]);
//}
//}
TEST_F(WalKeepEnv, readOldMeta) {
walResetEnv();
const char* ranStr = "tvapq02tcp";
int len = strlen(ranStr);
int code;
for(int i = 0; i < 10; i++) {
code = walWrite(pWal, i, i+1, (void*)ranStr, len);
ASSERT_EQ(code, 0);
ASSERT_EQ(pWal->lastVersion, i);
code = walWrite(pWal, i+2, i, (void*)ranStr, len);
ASSERT_EQ(code, -1);
ASSERT_EQ(pWal->lastVersion, i);
}
char* oldss = walMetaSerialize(pWal);
TearDown();
SetUp();
ASSERT_EQ(pWal->firstVersion, 0);
ASSERT_EQ(pWal->lastVersion, 9);
char* newss = walMetaSerialize(pWal);
len = strlen(oldss);
ASSERT_EQ(len, strlen(newss));
for(int i = 0; i < len; i++) {
EXPECT_EQ(oldss[i], newss[i]);
}
free(oldss);
free(newss);
}
TEST_F(WalCleanEnv, write) {
const char* ranStr = "tvapq02tcp";
......@@ -228,6 +252,9 @@ TEST_F(WalCleanDeleteEnv, roll) {
ASSERT_EQ(pWal->commitVersion, i);
}
code = walWriteMeta(pWal);
//code = walWriteMeta(pWal);
code = walBeginTakeSnapshot(pWal, i - 1);
ASSERT_EQ(code, 0);
code = walEndTakeSnapshot(pWal);
ASSERT_EQ(code, 0);
}
......@@ -58,24 +58,31 @@ static int32_t taosArrayResize(SArray* pArray) {
return 0;
}
void* taosArrayAddBatch(SArray* pArray, const void* pData, int nEles) {
if (pArray == NULL || pData == NULL) {
return NULL;
}
if (pArray->size + nEles > pArray->capacity) {
int32_t taosArrayEnsureCap(SArray* pArray, size_t newCap) {
if (newCap > pArray->capacity) {
size_t tsize = (pArray->capacity << 1u);
while (pArray->size + nEles > tsize) {
while (newCap > tsize) {
tsize = (tsize << 1u);
}
pArray->pData = realloc(pArray->pData, tsize * pArray->elemSize);
if (pArray->pData == NULL) {
return NULL;
return -1;
}
pArray->capacity = tsize;
}
return 0;
}
void* taosArrayAddBatch(SArray* pArray, const void* pData, int nEles) {
if (pArray == NULL || pData == NULL) {
return NULL;
}
if(taosArrayEnsureCap(pArray, pArray->size + nEles) != 0){
return NULL;
}
void* dst = TARRAY_GET_ELEM(pArray, pArray->size);
memcpy(dst, pData, pArray->elemSize * nEles);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册