提交 bd408504 编写于 作者: L Liu Jicong

make wal meta serialized and flushed into disk

上级 713c20c4
......@@ -83,8 +83,8 @@ int walReadMeta(SWal* pWal);
int walWriteMeta(SWal* pWal);
int walRollFileInfo(SWal* pWal);
char* walFileInfoSerialize(SWal* pWal);
SArray* walFileInfoDeserialize(const char* bytes);
char* walMetaSerialize(SWal* pWal);
int walMetaDeserialize(SWal* pWal, const char* bytes);
//meta section end
int64_t walGetSeq();
......
......@@ -47,49 +47,74 @@ int walRollFileInfo(SWal* pWal) {
return 0;
}
char* walFileInfoSerialize(SWal* pWal) {
char* walMetaSerialize(SWal* pWal) {
char buf[30];
if(pWal == NULL || pWal->fileInfoSet == NULL) return 0;
int sz = pWal->fileInfoSet->size;
cJSON* root = cJSON_CreateArray();
cJSON* field;
if(root == NULL) {
cJSON* pRoot = cJSON_CreateObject();
cJSON* pMeta = cJSON_CreateObject();
cJSON* pFiles = cJSON_CreateArray();
cJSON* pField;
if(pRoot == NULL || pMeta == NULL || pFiles == NULL) {
//TODO
return NULL;
}
cJSON_AddItemToObject(pRoot, "meta", pMeta);
sprintf(buf, "%" PRId64, pWal->firstVersion);
cJSON_AddStringToObject(pMeta, "firstVer", buf);
sprintf(buf, "%" PRId64, pWal->snapshotVersion);
cJSON_AddStringToObject(pMeta, "snapshotVer", buf);
sprintf(buf, "%" PRId64, pWal->commitVersion);
cJSON_AddStringToObject(pMeta, "commitVer", buf);
sprintf(buf, "%" PRId64, pWal->lastVersion);
cJSON_AddStringToObject(pMeta, "lastVer", buf);
cJSON_AddItemToObject(pRoot, "files", pFiles);
WalFileInfo* pData = pWal->fileInfoSet->pData;
for(int i = 0; i < sz; i++) {
WalFileInfo* pInfo = &pData[i];
cJSON_AddItemToArray(root, field = cJSON_CreateObject());
if(field == NULL) {
cJSON_Delete(root);
cJSON_AddItemToArray(pFiles, pField = cJSON_CreateObject());
if(pField == NULL) {
cJSON_Delete(pRoot);
return NULL;
}
//cjson only support int32_t or double
//string are used to prohibit the loss of precision
sprintf(buf, "%ld", pInfo->firstVer);
cJSON_AddStringToObject(field, "firstVer", buf);
sprintf(buf, "%ld", pInfo->lastVer);
cJSON_AddStringToObject(field, "lastVer", buf);
sprintf(buf, "%ld", pInfo->createTs);
cJSON_AddStringToObject(field, "createTs", buf);
sprintf(buf, "%ld", pInfo->closeTs);
cJSON_AddStringToObject(field, "closeTs", buf);
sprintf(buf, "%ld", pInfo->fileSize);
cJSON_AddStringToObject(field, "fileSize", buf);
sprintf(buf, "%" PRId64, pInfo->firstVer);
cJSON_AddStringToObject(pField, "firstVer", buf);
sprintf(buf, "%" PRId64, pInfo->lastVer);
cJSON_AddStringToObject(pField, "lastVer", buf);
sprintf(buf, "%" PRId64, pInfo->createTs);
cJSON_AddStringToObject(pField, "createTs", buf);
sprintf(buf, "%" PRId64, pInfo->closeTs);
cJSON_AddStringToObject(pField, "closeTs", buf);
sprintf(buf, "%" PRId64, pInfo->fileSize);
cJSON_AddStringToObject(pField, "fileSize", buf);
}
return cJSON_Print(root);
return cJSON_Print(pRoot);
}
SArray* walFileInfoDeserialize(const char* bytes) {
cJSON *root, *pInfoJson, *pField;
root = cJSON_Parse(bytes);
int sz = cJSON_GetArraySize(root);
int walMetaDeserialize(SWal* pWal, const char* bytes) {
ASSERT(taosArrayGetSize(pWal->fileInfoSet) == 0);
cJSON *pRoot, *pMeta, *pFiles, *pInfoJson, *pField;
pRoot = cJSON_Parse(bytes);
pMeta = cJSON_GetObjectItem(pRoot, "meta");
pField = cJSON_GetObjectItem(pMeta, "firstVer");
pWal->firstVersion = atoll(cJSON_GetStringValue(pField));
pField = cJSON_GetObjectItem(pMeta, "snapshotVer");
pWal->snapshotVersion = atoll(cJSON_GetStringValue(pField));
pField = cJSON_GetObjectItem(pMeta, "commitVer");
pWal->commitVersion = atoll(cJSON_GetStringValue(pField));
pField = cJSON_GetObjectItem(pMeta, "lastVer");
pWal->lastVersion = atoll(cJSON_GetStringValue(pField));
pFiles = cJSON_GetObjectItem(pRoot, "files");
int sz = cJSON_GetArraySize(pFiles);
//deserialize
SArray* pArray = taosArrayInit(sz, sizeof(WalFileInfo));
WalFileInfo *pData = pArray->pData;
for(int i = 0; i < sz; i++) {
cJSON* pInfoJson = cJSON_GetArrayItem(root, i);
cJSON* pInfoJson = cJSON_GetArrayItem(pFiles, i);
WalFileInfo* pInfo = &pData[i];
pField = cJSON_GetObjectItem(pInfoJson, "firstVer");
pInfo->firstVer = atoll(cJSON_GetStringValue(pField));
......@@ -103,7 +128,8 @@ SArray* walFileInfoDeserialize(const char* bytes) {
pInfo->fileSize = atoll(cJSON_GetStringValue(pField));
}
taosArraySetSize(pArray, sz);
return pArray;
pWal->fileInfoSet = pArray;
return 0;
}
static inline int walBuildMetaName(SWal* pWal, int metaVer, char* buf) {
......@@ -144,7 +170,7 @@ int walWriteMeta(SWal* pWal) {
if(metaTfd < 0) {
return -1;
}
char* serialized = walFileInfoSerialize(pWal);
char* serialized = walMetaSerialize(pWal);
int len = strlen(serialized);
if(len != tfWrite(metaTfd, serialized, len)) {
//TODO:clean file
......@@ -183,8 +209,8 @@ int walReadMeta(SWal* pWal) {
return -1;
}
//load into fileInfoSet
pWal->fileInfoSet = walFileInfoDeserialize(buf);
if(pWal->fileInfoSet == NULL) {
int code = walMetaDeserialize(pWal, buf);
if(code != 0) {
free(buf);
return -1;
}
......
......@@ -175,8 +175,6 @@ static void walFreeObj(void *wal) {
tfClose(pWal->writeIdxTfd);
taosArrayDestroy(pWal->fileInfoSet);
pWal->fileInfoSet = NULL;
taosArrayDestroy(pWal->fileInfoSet);
pWal->fileInfoSet = NULL;
pthread_mutex_destroy(&pWal->mutex);
tfree(pWal);
}
......
......@@ -92,7 +92,7 @@ TEST_F(WalCleanEnv, serialize) {
ASSERT(code == 0);
code = walRollFileInfo(pWal);
ASSERT(code == 0);
char*ss = walFileInfoSerialize(pWal);
char*ss = walMetaSerialize(pWal);
printf("%s\n", ss);
code = walWriteMeta(pWal);
ASSERT(code == 0);
......@@ -113,20 +113,19 @@ TEST_F(WalCleanEnv, removeOldMeta) {
TEST_F(WalKeepEnv, readOldMeta) {
int code = walRollFileInfo(pWal);
ASSERT(code == 0);
ASSERT(pWal->fileInfoSet != NULL);
code = walWriteMeta(pWal);
ASSERT(code == 0);
code = walRollFileInfo(pWal);
ASSERT(code == 0);
code = walWriteMeta(pWal);
ASSERT(code == 0);
char*oldss = walFileInfoSerialize(pWal);
char*oldss = walMetaSerialize(pWal);
TearDown();
SetUp();
code = walReadMeta(pWal);
ASSERT(code == 0);
char* newss = walFileInfoSerialize(pWal);
char* newss = walMetaSerialize(pWal);
int len = strlen(oldss);
ASSERT_EQ(len, strlen(newss));
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册