提交 9569172a 编写于 作者: L Liu Jicong

Merge branch 'feature/tq' into 3.0

...@@ -52,6 +52,7 @@ typedef struct { ...@@ -52,6 +52,7 @@ typedef struct {
int32_t fsyncPeriod; // millisecond int32_t fsyncPeriod; // millisecond
int32_t retentionPeriod; // secs int32_t retentionPeriod; // secs
int32_t rollPeriod; // secs int32_t rollPeriod; // secs
int32_t retentionSize; // secs
int64_t segSize; int64_t segSize;
EWalType walLevel; // wal level EWalType walLevel; // wal level
} SWalCfg; } SWalCfg;
......
...@@ -43,6 +43,13 @@ typedef struct SArray { ...@@ -43,6 +43,13 @@ typedef struct SArray {
*/ */
void* taosArrayInit(size_t size, size_t elemSize); void* taosArrayInit(size_t size, size_t elemSize);
/**
*
* @param tsize
* @return
*/
int32_t taosArrayEnsureCap(SArray* pArray, size_t tsize);
/** /**
* *
* @param pArray * @param pArray
......
...@@ -46,6 +46,7 @@ int walRollFileInfo(SWal* pWal) { ...@@ -46,6 +46,7 @@ int walRollFileInfo(SWal* pWal) {
pInfo->closeTs = ts; pInfo->closeTs = ts;
} }
//TODO: change to emplace back
WalFileInfo *pNewInfo = malloc(sizeof(WalFileInfo)); WalFileInfo *pNewInfo = malloc(sizeof(WalFileInfo));
if(pNewInfo == NULL) { if(pNewInfo == NULL) {
return -1; return -1;
...@@ -56,12 +57,13 @@ int walRollFileInfo(SWal* pWal) { ...@@ -56,12 +57,13 @@ int walRollFileInfo(SWal* pWal) {
pNewInfo->closeTs = -1; pNewInfo->closeTs = -1;
pNewInfo->fileSize = 0; pNewInfo->fileSize = 0;
taosArrayPush(pWal->fileInfoSet, pNewInfo); taosArrayPush(pWal->fileInfoSet, pNewInfo);
free(pNewInfo);
return 0; return 0;
} }
char* walMetaSerialize(SWal* pWal) { char* walMetaSerialize(SWal* pWal) {
char buf[30]; char buf[30];
if(pWal == NULL || pWal->fileInfoSet == NULL) return 0; ASSERT(pWal->fileInfoSet);
int sz = pWal->fileInfoSet->size; int sz = pWal->fileInfoSet->size;
cJSON* pRoot = cJSON_CreateObject(); cJSON* pRoot = cJSON_CreateObject();
cJSON* pMeta = cJSON_CreateObject(); cJSON* pMeta = cJSON_CreateObject();
...@@ -103,7 +105,9 @@ char* walMetaSerialize(SWal* pWal) { ...@@ -103,7 +105,9 @@ char* walMetaSerialize(SWal* pWal) {
sprintf(buf, "%" PRId64, pInfo->fileSize); sprintf(buf, "%" PRId64, pInfo->fileSize);
cJSON_AddStringToObject(pField, "fileSize", buf); cJSON_AddStringToObject(pField, "fileSize", buf);
} }
return cJSON_Print(pRoot); char* serialized = cJSON_Print(pRoot);
cJSON_Delete(pRoot);
return serialized;
} }
int walMetaDeserialize(SWal* pWal, const char* bytes) { int walMetaDeserialize(SWal* pWal, const char* bytes) {
...@@ -123,7 +127,8 @@ int walMetaDeserialize(SWal* pWal, const char* bytes) { ...@@ -123,7 +127,8 @@ int walMetaDeserialize(SWal* pWal, const char* bytes) {
pFiles = cJSON_GetObjectItem(pRoot, "files"); pFiles = cJSON_GetObjectItem(pRoot, "files");
int sz = cJSON_GetArraySize(pFiles); int sz = cJSON_GetArraySize(pFiles);
//deserialize //deserialize
SArray* pArray = taosArrayInit(sz, sizeof(WalFileInfo)); SArray* pArray = pWal->fileInfoSet;
taosArrayEnsureCap(pArray, sz);
WalFileInfo *pData = pArray->pData; WalFileInfo *pData = pArray->pData;
for(int i = 0; i < sz; i++) { for(int i = 0; i < sz; i++) {
cJSON* pInfoJson = cJSON_GetArrayItem(pFiles, i); cJSON* pInfoJson = cJSON_GetArrayItem(pFiles, i);
...@@ -141,6 +146,7 @@ int walMetaDeserialize(SWal* pWal, const char* bytes) { ...@@ -141,6 +146,7 @@ int walMetaDeserialize(SWal* pWal, const char* bytes) {
} }
taosArraySetSize(pArray, sz); taosArraySetSize(pArray, sz);
pWal->fileInfoSet = pArray; pWal->fileInfoSet = pArray;
cJSON_Delete(pRoot);
return 0; return 0;
} }
...@@ -171,6 +177,8 @@ static int walFindCurMetaVer(SWal* pWal) { ...@@ -171,6 +177,8 @@ static int walFindCurMetaVer(SWal* pWal) {
break; break;
} }
} }
closedir(dir);
regfree(&walMetaRegexPattern);
return metaVer; return metaVer;
} }
...@@ -195,6 +203,7 @@ int walWriteMeta(SWal* pWal) { ...@@ -195,6 +203,7 @@ int walWriteMeta(SWal* pWal) {
walBuildMetaName(pWal, metaVer, fnameStr); walBuildMetaName(pWal, metaVer, fnameStr);
remove(fnameStr); remove(fnameStr);
} }
free(serialized);
return 0; return 0;
} }
...@@ -215,6 +224,7 @@ int walReadMeta(SWal* pWal) { ...@@ -215,6 +224,7 @@ int walReadMeta(SWal* pWal) {
if(buf == NULL) { if(buf == NULL) {
return -1; return -1;
} }
memset(buf, 0, size+5);
int tfd = tfOpenRead(fnameStr); int tfd = tfOpenRead(fnameStr);
if(tfRead(tfd, buf, size) != size) { if(tfRead(tfd, buf, size) != size) {
free(buf); free(buf);
......
...@@ -80,6 +80,7 @@ SWal *walOpen(const char *path, SWalCfg *pCfg) { ...@@ -80,6 +80,7 @@ SWal *walOpen(const char *path, SWalCfg *pCfg) {
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
return NULL; return NULL;
} }
memset(pWal, 0, sizeof(SWal));
pWal->writeLogTfd = -1; pWal->writeLogTfd = -1;
pWal->writeIdxTfd = -1; pWal->writeIdxTfd = -1;
pWal->writeCur = -1; pWal->writeCur = -1;
...@@ -89,6 +90,8 @@ SWal *walOpen(const char *path, SWalCfg *pCfg) { ...@@ -89,6 +90,8 @@ SWal *walOpen(const char *path, SWalCfg *pCfg) {
pWal->fsyncPeriod = pCfg->fsyncPeriod; pWal->fsyncPeriod = pCfg->fsyncPeriod;
pWal->rollPeriod = pCfg->rollPeriod; pWal->rollPeriod = pCfg->rollPeriod;
pWal->segSize = pCfg->segSize; pWal->segSize = pCfg->segSize;
pWal->retentionSize = pCfg->retentionSize;
pWal->retentionPeriod = pCfg->retentionPeriod;
pWal->level = pCfg->walLevel; pWal->level = pCfg->walLevel;
//init version info //init version info
...@@ -99,6 +102,8 @@ SWal *walOpen(const char *path, SWalCfg *pCfg) { ...@@ -99,6 +102,8 @@ SWal *walOpen(const char *path, SWalCfg *pCfg) {
pWal->snapshottingVer = -1; pWal->snapshottingVer = -1;
pWal->totSize = 0;
//init status //init status
pWal->lastRollSeq = -1; pWal->lastRollSeq = -1;
...@@ -122,6 +127,7 @@ SWal *walOpen(const char *path, SWalCfg *pCfg) { ...@@ -122,6 +127,7 @@ SWal *walOpen(const char *path, SWalCfg *pCfg) {
walFreeObj(pWal); walFreeObj(pWal);
return NULL; return NULL;
} }
walReadMeta(pWal);
wDebug("vgId:%d, wal:%p is opened, level:%d fsyncPeriod:%d", pWal->vgId, pWal, pWal->level, pWal->fsyncPeriod); wDebug("vgId:%d, wal:%p is opened, level:%d fsyncPeriod:%d", pWal->vgId, pWal, pWal->level, pWal->fsyncPeriod);
...@@ -153,9 +159,12 @@ void walClose(SWal *pWal) { ...@@ -153,9 +159,12 @@ void walClose(SWal *pWal) {
pthread_mutex_lock(&pWal->mutex); pthread_mutex_lock(&pWal->mutex);
tfClose(pWal->writeLogTfd); tfClose(pWal->writeLogTfd);
pWal->writeLogTfd = -1;
tfClose(pWal->writeIdxTfd); tfClose(pWal->writeIdxTfd);
/*taosArrayDestroy(pWal->fileInfoSet);*/ pWal->writeIdxTfd = -1;
/*pWal->fileInfoSet = NULL;*/ walWriteMeta(pWal);
taosArrayDestroy(pWal->fileInfoSet);
pWal->fileInfoSet = NULL;
pthread_mutex_unlock(&pWal->mutex); pthread_mutex_unlock(&pWal->mutex);
taosRemoveRef(tsWal.refSetId, pWal->refId); taosRemoveRef(tsWal.refSetId, pWal->refId);
} }
...@@ -165,7 +174,7 @@ static int32_t walInitObj(SWal *pWal) { ...@@ -165,7 +174,7 @@ static int32_t walInitObj(SWal *pWal) {
wError("vgId:%d, path:%s, failed to create directory since %s", pWal->vgId, pWal->path, strerror(errno)); wError("vgId:%d, path:%s, failed to create directory since %s", pWal->vgId, pWal->path, strerror(errno));
return TAOS_SYSTEM_ERROR(errno); return TAOS_SYSTEM_ERROR(errno);
} }
pWal->fileInfoSet = taosArrayInit(0, sizeof(WalFileInfo)); pWal->fileInfoSet = taosArrayInit(8, sizeof(WalFileInfo));
if(pWal->fileInfoSet == NULL) { if(pWal->fileInfoSet == NULL) {
wError("vgId:%d, path:%s, failed to init taosArray %s", pWal->vgId, pWal->path, strerror(errno)); wError("vgId:%d, path:%s, failed to init taosArray %s", pWal->vgId, pWal->path, strerror(errno));
return TAOS_SYSTEM_ERROR(errno); return TAOS_SYSTEM_ERROR(errno);
......
...@@ -228,6 +228,7 @@ int32_t walRollback(SWal *pWal, int64_t ver) { ...@@ -228,6 +228,7 @@ int32_t walRollback(SWal *pWal, int64_t ver) {
pthread_mutex_unlock(&pWal->mutex); pthread_mutex_unlock(&pWal->mutex);
return 0; return 0;
} }
int32_t walBeginTakeSnapshot(SWal* pWal, int64_t ver) { int32_t walBeginTakeSnapshot(SWal* pWal, int64_t ver) {
pWal->snapshottingVer = ver; pWal->snapshottingVer = ver;
//check file rolling //check file rolling
...@@ -276,10 +277,12 @@ int32_t walEndTakeSnapshot(SWal *pWal) { ...@@ -276,10 +277,12 @@ int32_t walEndTakeSnapshot(SWal *pWal) {
//make new array, remove files //make new array, remove files
taosArrayPopFrontBatch(pWal->fileInfoSet, deleteCnt); taosArrayPopFrontBatch(pWal->fileInfoSet, deleteCnt);
if(taosArrayGetSize(pWal->fileInfoSet) == 0) { if(taosArrayGetSize(pWal->fileInfoSet) == 0) {
pWal->writeCur = -1;
pWal->firstVersion = -1; pWal->firstVersion = -1;
} else { } else {
pWal->firstVersion = ((WalFileInfo*)taosArrayGet(pWal->fileInfoSet, 0))->firstVer; pWal->firstVersion = ((WalFileInfo*)taosArrayGet(pWal->fileInfoSet, 0))->firstVer;
} }
pWal->writeCur = taosArrayGetSize(pWal->fileInfoSet) - 1;;
pWal->totSize = newTotSize; pWal->totSize = newTotSize;
pWal->snapshottingVer = -1; pWal->snapshottingVer = -1;
...@@ -340,19 +343,10 @@ int walRoll(SWal *pWal) { ...@@ -340,19 +343,10 @@ int walRoll(SWal *pWal) {
} }
static int walWriteIndex(SWal *pWal, int64_t ver, int64_t offset) { static int walWriteIndex(SWal *pWal, int64_t ver, int64_t offset) {
int code = 0;
//get index file
if(!tfValid(pWal->writeIdxTfd)) {
code = TAOS_SYSTEM_ERROR(errno);
wError("vgId:%d, file:%"PRId64".idx, failed to open since %s", pWal->vgId, walGetLastFileFirstVer(pWal), strerror(errno));
return code;
}
char fnameStr[WAL_FILE_LEN];
walBuildIdxName(pWal, walGetCurFileFirstVer(pWal), fnameStr);
WalIdxEntry entry = { .ver = ver, .offset = offset }; WalIdxEntry entry = { .ver = ver, .offset = offset };
int size = tfWrite(pWal->writeIdxTfd, &entry, sizeof(WalIdxEntry)); int size = tfWrite(pWal->writeIdxTfd, &entry, sizeof(WalIdxEntry));
if(size != sizeof(WalIdxEntry)) { if(size != sizeof(WalIdxEntry)) {
//TODO truncate
return -1; return -1;
} }
return 0; return 0;
......
...@@ -18,12 +18,15 @@ class WalCleanEnv : public ::testing::Test { ...@@ -18,12 +18,15 @@ class WalCleanEnv : public ::testing::Test {
void SetUp() override { void SetUp() override {
taosRemoveDir(pathName); taosRemoveDir(pathName);
SWalCfg* pCfg = (SWalCfg*)malloc(sizeof(SWal)); SWalCfg* pCfg = (SWalCfg*)malloc(sizeof(SWalCfg));
memset(pCfg, 0, sizeof(SWalCfg)); memset(pCfg, 0, sizeof(SWalCfg));
pCfg->rollPeriod = -1; pCfg->rollPeriod = -1;
pCfg->segSize = -1; pCfg->segSize = -1;
pCfg->retentionPeriod = 0;
pCfg->retentionSize = 0;
pCfg->walLevel = TAOS_WAL_FSYNC; pCfg->walLevel = TAOS_WAL_FSYNC;
pWal = walOpen(pathName, pCfg); pWal = walOpen(pathName, pCfg);
free(pCfg);
ASSERT(pWal != NULL); ASSERT(pWal != NULL);
} }
...@@ -49,11 +52,13 @@ class WalCleanDeleteEnv : public ::testing::Test { ...@@ -49,11 +52,13 @@ class WalCleanDeleteEnv : public ::testing::Test {
void SetUp() override { void SetUp() override {
taosRemoveDir(pathName); taosRemoveDir(pathName);
SWalCfg* pCfg = (SWalCfg*)malloc(sizeof(SWal)); SWalCfg* pCfg = (SWalCfg*)malloc(sizeof(SWalCfg));
memset(pCfg, 0, sizeof(SWalCfg)); memset(pCfg, 0, sizeof(SWalCfg));
pCfg->retentionPeriod = 0; pCfg->retentionPeriod = 0;
pCfg->retentionSize = 0;
pCfg->walLevel = TAOS_WAL_FSYNC; pCfg->walLevel = TAOS_WAL_FSYNC;
pWal = walOpen(pathName, pCfg); pWal = walOpen(pathName, pCfg);
free(pCfg);
ASSERT(pWal != NULL); ASSERT(pWal != NULL);
} }
...@@ -77,13 +82,22 @@ class WalKeepEnv : public ::testing::Test { ...@@ -77,13 +82,22 @@ class WalKeepEnv : public ::testing::Test {
walCleanUp(); walCleanUp();
} }
void walResetEnv() {
TearDown();
taosRemoveDir(pathName);
SetUp();
}
void SetUp() override { void SetUp() override {
SWalCfg* pCfg = (SWalCfg*)malloc(sizeof(SWal)); SWalCfg* pCfg = (SWalCfg*)malloc(sizeof(SWalCfg));
memset(pCfg, 0, sizeof(SWalCfg)); memset(pCfg, 0, sizeof(SWalCfg));
pCfg->rollPeriod = -1; pCfg->rollPeriod = -1;
pCfg->segSize = -1; pCfg->segSize = -1;
pCfg->retentionPeriod = 0;
pCfg->retentionSize = 0;
pCfg->walLevel = TAOS_WAL_FSYNC; pCfg->walLevel = TAOS_WAL_FSYNC;
pWal = walOpen(pathName, pCfg); pWal = walOpen(pathName, pCfg);
free(pCfg);
ASSERT(pWal != NULL); ASSERT(pWal != NULL);
} }
...@@ -124,6 +138,7 @@ TEST_F(WalCleanEnv, serialize) { ...@@ -124,6 +138,7 @@ TEST_F(WalCleanEnv, serialize) {
ASSERT(code == 0); ASSERT(code == 0);
char*ss = walMetaSerialize(pWal); char*ss = walMetaSerialize(pWal);
printf("%s\n", ss); printf("%s\n", ss);
free(ss);
code = walWriteMeta(pWal); code = walWriteMeta(pWal);
ASSERT(code == 0); ASSERT(code == 0);
} }
...@@ -140,29 +155,38 @@ TEST_F(WalCleanEnv, removeOldMeta) { ...@@ -140,29 +155,38 @@ TEST_F(WalCleanEnv, removeOldMeta) {
ASSERT(code == 0); ASSERT(code == 0);
} }
//TEST_F(WalKeepEnv, readOldMeta) { TEST_F(WalKeepEnv, readOldMeta) {
//int code = walRollFileInfo(pWal); walResetEnv();
//ASSERT(code == 0); const char* ranStr = "tvapq02tcp";
//code = walWriteMeta(pWal); int len = strlen(ranStr);
//ASSERT(code == 0); int code;
//code = walRollFileInfo(pWal);
//ASSERT(code == 0); for(int i = 0; i < 10; i++) {
//code = walWriteMeta(pWal); code = walWrite(pWal, i, i+1, (void*)ranStr, len);
//ASSERT(code == 0); ASSERT_EQ(code, 0);
//char*oldss = walMetaSerialize(pWal); ASSERT_EQ(pWal->lastVersion, i);
code = walWrite(pWal, i+2, i, (void*)ranStr, len);
//TearDown(); ASSERT_EQ(code, -1);
//SetUp(); ASSERT_EQ(pWal->lastVersion, i);
//code = walReadMeta(pWal); }
//ASSERT(code == 0); char* oldss = walMetaSerialize(pWal);
//char* newss = walMetaSerialize(pWal);
TearDown();
//int len = strlen(oldss); SetUp();
//ASSERT_EQ(len, strlen(newss));
//for(int i = 0; i < len; i++) { ASSERT_EQ(pWal->firstVersion, 0);
//EXPECT_EQ(oldss[i], newss[i]); ASSERT_EQ(pWal->lastVersion, 9);
//}
//} char* newss = walMetaSerialize(pWal);
len = strlen(oldss);
ASSERT_EQ(len, strlen(newss));
for(int i = 0; i < len; i++) {
EXPECT_EQ(oldss[i], newss[i]);
}
free(oldss);
free(newss);
}
TEST_F(WalCleanEnv, write) { TEST_F(WalCleanEnv, write) {
const char* ranStr = "tvapq02tcp"; const char* ranStr = "tvapq02tcp";
...@@ -228,6 +252,9 @@ TEST_F(WalCleanDeleteEnv, roll) { ...@@ -228,6 +252,9 @@ TEST_F(WalCleanDeleteEnv, roll) {
ASSERT_EQ(pWal->commitVersion, i); ASSERT_EQ(pWal->commitVersion, i);
} }
code = walWriteMeta(pWal); //code = walWriteMeta(pWal);
code = walBeginTakeSnapshot(pWal, i - 1);
ASSERT_EQ(code, 0);
code = walEndTakeSnapshot(pWal);
ASSERT_EQ(code, 0); ASSERT_EQ(code, 0);
} }
...@@ -58,24 +58,31 @@ static int32_t taosArrayResize(SArray* pArray) { ...@@ -58,24 +58,31 @@ static int32_t taosArrayResize(SArray* pArray) {
return 0; return 0;
} }
void* taosArrayAddBatch(SArray* pArray, const void* pData, int nEles) { int32_t taosArrayEnsureCap(SArray* pArray, size_t newCap) {
if (pArray == NULL || pData == NULL) { if (newCap > pArray->capacity) {
return NULL;
}
if (pArray->size + nEles > pArray->capacity) {
size_t tsize = (pArray->capacity << 1u); size_t tsize = (pArray->capacity << 1u);
while (pArray->size + nEles > tsize) { while (newCap > tsize) {
tsize = (tsize << 1u); tsize = (tsize << 1u);
} }
pArray->pData = realloc(pArray->pData, tsize * pArray->elemSize); pArray->pData = realloc(pArray->pData, tsize * pArray->elemSize);
if (pArray->pData == NULL) { if (pArray->pData == NULL) {
return NULL; return -1;
} }
pArray->capacity = tsize; pArray->capacity = tsize;
} }
return 0;
}
void* taosArrayAddBatch(SArray* pArray, const void* pData, int nEles) {
if (pArray == NULL || pData == NULL) {
return NULL;
}
if(taosArrayEnsureCap(pArray, pArray->size + nEles) != 0){
return NULL;
}
void* dst = TARRAY_GET_ELEM(pArray, pArray->size); void* dst = TARRAY_GET_ELEM(pArray, pArray->size);
memcpy(dst, pData, pArray->elemSize * nEles); memcpy(dst, pData, pArray->elemSize * nEles);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册