提交 c2d352b9 编写于 作者: H Hongze Cheng

TD-353

上级 34b17dc0
...@@ -25,7 +25,7 @@ typedef int (*iterFunc)(void *, void *cont, int contLen); ...@@ -25,7 +25,7 @@ typedef int (*iterFunc)(void *, void *cont, int contLen);
typedef void (*afterFunc)(void *); typedef void (*afterFunc)(void *);
typedef struct { typedef struct {
int64_t size; int64_t size; // including 512 bytes of header size
int64_t tombSize; int64_t tombSize;
int64_t nRecords; int64_t nRecords;
int64_t nDels; int64_t nDels;
......
...@@ -58,7 +58,7 @@ int tdCreateKVStore(char *fname) { ...@@ -58,7 +58,7 @@ int tdCreateKVStore(char *fname) {
if (fd < 0) { if (fd < 0) {
uError("failed to open file %s since %s", fname, strerror(errno)); uError("failed to open file %s since %s", fname, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
return -1; goto _err;
} }
if (tdInitKVStoreHeader(fd, fname) < 0) { if (tdInitKVStoreHeader(fd, fname) < 0) {
...@@ -80,6 +80,11 @@ int tdCreateKVStore(char *fname) { ...@@ -80,6 +80,11 @@ int tdCreateKVStore(char *fname) {
} }
return 0; return 0;
_err:
if (fd > 0) close(fd);
remove(fname);
return -1;
} }
int tdDestroyKVStore(char *fname) { int tdDestroyKVStore(char *fname) {
...@@ -105,7 +110,7 @@ SKVStore *tdOpenKVStore(char *fname, iterFunc iFunc, afterFunc aFunc, void *appH ...@@ -105,7 +110,7 @@ SKVStore *tdOpenKVStore(char *fname, iterFunc iFunc, afterFunc aFunc, void *appH
goto _err; goto _err;
} }
if (access(pStore->fsnap, F_OK) == 0) { if (access(pStore->fsnap, F_OK) == 0) { // .snap file exists
uTrace("file %s exists, try to recover the KV store", pStore->fsnap); uTrace("file %s exists, try to recover the KV store", pStore->fsnap);
pStore->sfd = open(pStore->fsnap, O_RDONLY); pStore->sfd = open(pStore->fsnap, O_RDONLY);
if (pStore->sfd < 0) { if (pStore->sfd < 0) {
...@@ -114,8 +119,9 @@ SKVStore *tdOpenKVStore(char *fname, iterFunc iFunc, afterFunc aFunc, void *appH ...@@ -114,8 +119,9 @@ SKVStore *tdOpenKVStore(char *fname, iterFunc iFunc, afterFunc aFunc, void *appH
goto _err; goto _err;
} }
if (tdLoadKVStoreHeader(pStore->sfd, pStore->fsnap, &info) < 0) goto _err; if (tdLoadKVStoreHeader(pStore->sfd, pStore->fsnap, &info) < 0) {
if (terrno != TSDB_CODE_COM_FILE_CORRUPTED) goto _err;
} else {
if (ftruncate(pStore->fd, info.size) < 0) { if (ftruncate(pStore->fd, info.size) < 0) {
uError("failed to truncate %s to " PRId64 " size since %s", pStore->fname, info.size, strerror(errno)); uError("failed to truncate %s to " PRId64 " size since %s", pStore->fname, info.size, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
...@@ -123,6 +129,11 @@ SKVStore *tdOpenKVStore(char *fname, iterFunc iFunc, afterFunc aFunc, void *appH ...@@ -123,6 +129,11 @@ SKVStore *tdOpenKVStore(char *fname, iterFunc iFunc, afterFunc aFunc, void *appH
} }
if (tdUpdateKVStoreHeader(pStore->fd, pStore->fname, &info) < 0) goto _err; if (tdUpdateKVStoreHeader(pStore->fd, pStore->fname, &info) < 0) goto _err;
if (fsync(pStore->fd) < 0) {
uError("failed to fsync file %s since %s", pStore->fname, strerror(errno));
goto _err;
}
}
close(pStore->sfd); close(pStore->sfd);
pStore->sfd = -1; pStore->sfd = -1;
...@@ -131,22 +142,7 @@ SKVStore *tdOpenKVStore(char *fname, iterFunc iFunc, afterFunc aFunc, void *appH ...@@ -131,22 +142,7 @@ SKVStore *tdOpenKVStore(char *fname, iterFunc iFunc, afterFunc aFunc, void *appH
if (tdLoadKVStoreHeader(pStore->fd, pStore->fname, &info) < 0) goto _err; if (tdLoadKVStoreHeader(pStore->fd, pStore->fname, &info) < 0) goto _err;
struct stat tfstat; pStore->info.size = TD_KVSTORE_HEADER_SIZE;
if (fstat(pStore->fd, &tfstat) < 0) {
uError("failed to fstat file %s since %s", pStore->fname, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
ASSERT(info.size == tfstat.st_size);
if (lseek(pStore->fd, TD_KVSTORE_HEADER_SIZE, SEEK_SET) < 0) {
uError("failed to lseek file %s since %s", pStore->fname, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
pStore->info.size += TD_KVSTORE_HEADER_SIZE;
if (tdRestoreKVStore(pStore) < 0) goto _err; if (tdRestoreKVStore(pStore) < 0) goto _err;
...@@ -326,7 +322,9 @@ static int tdUpdateKVStoreHeader(int fd, char *fname, SStoreInfo *pInfo) { ...@@ -326,7 +322,9 @@ static int tdUpdateKVStoreHeader(int fd, char *fname, SStoreInfo *pInfo) {
return -1; return -1;
} }
tdEncodeStoreInfo(buf, pInfo); void *pBuf = tdEncodeStoreInfo(buf, pInfo);
ASSERT(POINTER_DISTANCE(pBuf, buf) + sizeof(TSCKSUM) <= TD_KVSTORE_HEADER_SIZE);
taosCalcChecksumAppend(0, (uint8_t *)buf, TD_KVSTORE_HEADER_SIZE); taosCalcChecksumAppend(0, (uint8_t *)buf, TD_KVSTORE_HEADER_SIZE);
if (twrite(fd, buf, TD_KVSTORE_HEADER_SIZE) < TD_KVSTORE_HEADER_SIZE) { if (twrite(fd, buf, TD_KVSTORE_HEADER_SIZE) < TD_KVSTORE_HEADER_SIZE) {
uError("failed to write %d bytes to file %s since %s", TD_KVSTORE_HEADER_SIZE, fname, strerror(errno)); uError("failed to write %d bytes to file %s since %s", TD_KVSTORE_HEADER_SIZE, fname, strerror(errno));
...@@ -449,30 +447,32 @@ static int tdRestoreKVStore(SKVStore *pStore) { ...@@ -449,30 +447,32 @@ static int tdRestoreKVStore(SKVStore *pStore) {
void * buf = NULL; void * buf = NULL;
int maxBufSize = 0; int maxBufSize = 0;
SKVRecord rInfo = {0}; SKVRecord rInfo = {0};
SHashMutableIterator *pIter = NULL;
ASSERT(TD_KVSTORE_HEADER_SIZE == lseek(pStore->fd, 0, SEEK_CUR)); ASSERT(TD_KVSTORE_HEADER_SIZE == lseek(pStore->fd, 0, SEEK_CUR));
ASSERT(pStore->info.size == TD_KVSTORE_HEADER_SIZE);
while (true) { while (true) {
ssize_t tsize = tread(pStore->fd, tbuf, sizeof(SKVRecord)); ssize_t tsize = tread(pStore->fd, tbuf, sizeof(SKVRecord));
if (tsize == 0) break; if (tsize == 0) break;
if (tsize < sizeof(SKVRecord)) { if (tsize < sizeof(SKVRecord)) {
uError("failed to read %d bytes from file %s since %s", sizeof(SKVRecord), pStore->fname, strerror(errno)); uError("failed to read %d bytes from file %s at offset %" PRId64 "since %s", sizeof(SKVRecord), pStore->fname,
pStore->info.size, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
goto _err; goto _err;
} }
char *pBuf = tdDecodeKVRecord(tbuf, &rInfo); char *pBuf = tdDecodeKVRecord(tbuf, &rInfo);
ASSERT(POINTER_DISTANCE(pBuf, tbuf) == sizeof(SKVRecord)); ASSERT(POINTER_DISTANCE(pBuf, tbuf) == sizeof(SKVRecord));
ASSERT(rInfo.offset > 0 ? pStore->info.size == rInfo.offset : true); ASSERT(pStore->info.size == rInfo.offset);
if (rInfo.offset < 0) { if (rInfo.offset < 0) {
taosHashRemove(pStore->map, (void *)(&rInfo.uid), sizeof(rInfo.uid)); taosHashRemove(pStore->map, (void *)(&rInfo.uid), sizeof(rInfo.uid));
pStore->info.size += sizeof(SKVRecord); pStore->info.size += sizeof(SKVRecord);
pStore->info.nRecords--; pStore->info.nRecords--;
pStore->info.nDels++; pStore->info.nDels++;
pStore->info.tombSize += (rInfo.size + sizeof(SKVRecord) + sizeof(SKVRecord)); pStore->info.tombSize += (rInfo.size + sizeof(SKVRecord) * 2);
} else { } else {
// TODO: add statistics
ASSERT(rInfo.offset > 0 && rInfo.size > 0); ASSERT(rInfo.offset > 0 && rInfo.size > 0);
if (taosHashPut(pStore->map, (void *)(&rInfo.uid), sizeof(rInfo.uid), &rInfo, sizeof(rInfo)) < 0) { if (taosHashPut(pStore->map, (void *)(&rInfo.uid), sizeof(rInfo.uid), &rInfo, sizeof(rInfo)) < 0) {
uError("failed to put record in KV store %s", pStore->fname); uError("failed to put record in KV store %s", pStore->fname);
...@@ -487,6 +487,9 @@ static int tdRestoreKVStore(SKVStore *pStore) { ...@@ -487,6 +487,9 @@ static int tdRestoreKVStore(SKVStore *pStore) {
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
goto _err; goto _err;
} }
pStore->info.size += (sizeof(SKVRecord) + rInfo.size);
pStore->info.nRecords++;
} }
} }
...@@ -497,7 +500,7 @@ static int tdRestoreKVStore(SKVStore *pStore) { ...@@ -497,7 +500,7 @@ static int tdRestoreKVStore(SKVStore *pStore) {
goto _err; goto _err;
} }
SHashMutableIterator *pIter = taosHashCreateIter(pStore->map); pIter = taosHashCreateIter(pStore->map);
if (pIter == NULL) { if (pIter == NULL) {
uError("failed to create hash iter while opening KV store %s", pStore->fname); uError("failed to create hash iter while opening KV store %s", pStore->fname);
terrno = TSDB_CODE_COM_OUT_OF_MEMORY; terrno = TSDB_CODE_COM_OUT_OF_MEMORY;
...@@ -508,13 +511,14 @@ static int tdRestoreKVStore(SKVStore *pStore) { ...@@ -508,13 +511,14 @@ static int tdRestoreKVStore(SKVStore *pStore) {
SKVRecord *pRecord = taosHashIterGet(pIter); SKVRecord *pRecord = taosHashIterGet(pIter);
if (lseek(pStore->fd, pRecord->offset, SEEK_SET) < 0) { if (lseek(pStore->fd, pRecord->offset, SEEK_SET) < 0) {
uError("failed to lseek file %s since %s", pStore->fname, strerror(errno)); uError("failed to lseek file %s since %s, offset %" PRId64, pStore->fname, strerror(errno), pRecord->offset);
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
goto _err; goto _err;
} }
if (tread(pStore->fd, buf, pRecord->size) < pRecord->size) { if (tread(pStore->fd, buf, pRecord->size) < pRecord->size) {
uError("failed to read %d bytes from file %s since %s", pRecord->size, pStore->fname, strerror(errno)); uError("failed to read %" PRId64 " bytes from file %s since %s, offset %" PRId64, pRecord->size, pStore->fname,
strerror(errno), pRecord->offset);
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
goto _err; goto _err;
} }
...@@ -528,14 +532,14 @@ static int tdRestoreKVStore(SKVStore *pStore) { ...@@ -528,14 +532,14 @@ static int tdRestoreKVStore(SKVStore *pStore) {
if (pStore->iFunc) (*pStore->iFunc)(pStore->appH, buf, pRecord->size); if (pStore->iFunc) (*pStore->iFunc)(pStore->appH, buf, pRecord->size);
} }
taosHashDestroyIter(pIter);
if (pStore->aFunc) (*pStore->aFunc)(pStore->appH); if (pStore->aFunc) (*pStore->aFunc)(pStore->appH);
taosHashDestroyIter(pIter);
tfree(buf); tfree(buf);
return 0; return 0;
_err: _err:
taosHashDestroyIter(pIter);
tfree(buf); tfree(buf);
return -1; return -1;
} }
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册