From 2eb0053e5ad173ced8ca6817ecc67ccfe52401c4 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Thu, 30 Dec 2021 22:19:56 +0800 Subject: [PATCH] handle read/write crash concurrently --- include/util/tfile.h | 3 ++- source/libs/index/src/index.c | 2 +- source/libs/index/src/index_fst.c | 18 +++++++------ .../index/src/index_fst_counting_writer.c | 19 ++++++------- source/libs/index/src/index_tfile.c | 11 +++++--- source/libs/index/test/fstTest.cc | 2 +- source/util/src/tfile.c | 27 ++++++++++++------- 7 files changed, 50 insertions(+), 32 deletions(-) diff --git a/include/util/tfile.h b/include/util/tfile.h index af4c19e7d1..b3d141c443 100644 --- a/include/util/tfile.h +++ b/include/util/tfile.h @@ -38,6 +38,7 @@ int64_t tfOpenCreateWriteAppend(const char *pathname); int64_t tfClose(int64_t tfd); int64_t tfWrite(int64_t tfd, void *buf, int64_t count); int64_t tfRead(int64_t tfd, void *buf, int64_t count); +int64_t tfPread(int64_t tfd, void *buf, int64_t count, int64_t offset); int32_t tfFsync(int64_t tfd); bool tfValid(int64_t tfd); int64_t tfLseek(int64_t tfd, int64_t offset, int32_t whence); @@ -47,4 +48,4 @@ int32_t tfFtruncate(int64_t tfd, int64_t length); } #endif -#endif /*_TD_UTIL_FILE_H*/ +#endif /*_TD_UTIL_FILE_H*/ diff --git a/source/libs/index/src/index.c b/source/libs/index/src/index.c index 167d33f51f..5167196031 100644 --- a/source/libs/index/src/index.c +++ b/source/libs/index/src/index.c @@ -459,7 +459,7 @@ void iterateValueDestroy(IterateValue* value, bool destroy) { } else { if (value->val != NULL) { taosArrayClear(value->val); } } - free(value->colVal); + // free(value->colVal); value->colVal = NULL; } static int indexGenTFile(SIndex* sIdx, IndexCache* cache, SArray* batch) { diff --git a/source/libs/index/src/index_fst.c b/source/libs/index/src/index_fst.c index 246f50a12b..04a08dafd2 100644 --- a/source/libs/index/src/index_fst.c +++ b/source/libs/index/src/index_fst.c @@ -1033,15 +1033,17 @@ FstStreamWithStateBuilder* fstSearchWithState(Fst* fst, AutomationCtx* ctx) { } FstNode* fstGetRoot(Fst* fst) { - // pthread_mutex_lock(&fst->mtx); - if (fst->root != NULL) { - // pthread_mutex_unlock(&fst->mtx); - return fst->root; - } CompiledAddr rAddr = fstGetRootAddr(fst); - fst->root = fstGetNode(fst, rAddr); - // pthread_mutex_unlock(&fst->mtx); - return fst->root; + return fstGetNode(fst, rAddr); + // pthread_mutex_lock(&fst->mtx); + // if (fst->root != NULL) { + // // pthread_mutex_unlock(&fst->mtx); + // return fst->root; + //} + // CompiledAddr rAddr = fstGetRootAddr(fst); + // fst->root = fstGetNode(fst, rAddr); + //// pthread_mutex_unlock(&fst->mtx); + // return fst->root; } FstNode* fstGetNode(Fst* fst, CompiledAddr addr) { diff --git a/source/libs/index/src/index_fst_counting_writer.c b/source/libs/index/src/index_fst_counting_writer.c index 710db563d9..7906dfea11 100644 --- a/source/libs/index/src/index_fst_counting_writer.c +++ b/source/libs/index/src/index_fst_counting_writer.c @@ -42,8 +42,8 @@ static int writeCtxDoRead(WriterCtx* ctx, uint8_t* buf, int len) { static int writeCtxDoReadFrom(WriterCtx* ctx, uint8_t* buf, int len, int32_t offset) { int nRead = 0; if (ctx->type == TFile) { - tfLseek(ctx->file.fd, offset, 0); - nRead = tfRead(ctx->file.fd, buf, len); + // tfLseek(ctx->file.fd, offset, 0); + nRead = tfPread(ctx->file.fd, buf, len, offset); } else { // refactor later assert(0); @@ -52,6 +52,7 @@ static int writeCtxDoReadFrom(WriterCtx* ctx, uint8_t* buf, int len, int32_t off } static int writeCtxDoFlush(WriterCtx* ctx) { if (ctx->type == TFile) { + // taosFsyncFile(ctx->file.fd); tfFsync(ctx->file.fd); // tfFlush(ctx->file.fd); } else { @@ -69,13 +70,15 @@ WriterCtx* writerCtxCreate(WriterType type, const char* path, bool readOnly, int // ugly code, refactor later ctx->file.readOnly = readOnly; if (readOnly == false) { + // ctx->file.fd = open(path, O_WRONLY | O_CREAT | O_APPEND, S_IRWXU | S_IRWXG | S_IRWXO); ctx->file.fd = tfOpenCreateWriteAppend(path); } else { - ctx->file.fd = tfOpenReadWrite(path); + // ctx->file.fd = open(path, O_RDONLY, S_IRWXU | S_IRWXG | S_IRWXO); + ctx->file.fd = tfOpenRead(path); } memcpy(ctx->file.buf, path, strlen(path)); if (ctx->file.fd < 0) { - indexError("open file error %d", errno); + indexError("failed to open file, error %d", errno); goto END; } } else if (ctx->type == TMemory) { @@ -101,10 +104,7 @@ void writerCtxDestroy(WriterCtx* ctx, bool remove) { free(ctx->mem.buf); } else { tfClose(ctx->file.fd); - if (remove) { - indexError("rm file %s", ctx->file.buf); - unlink(ctx->file.buf); - } + if (remove) { unlink(ctx->file.buf); } } free(ctx); } @@ -144,7 +144,8 @@ int fstCountingWriterRead(FstCountingWriter* write, uint8_t* buf, uint32_t len) } uint32_t fstCountingWriterMaskedCheckSum(FstCountingWriter* write) { return 0; } -int fstCountingWriterFlush(FstCountingWriter* write) { + +int fstCountingWriterFlush(FstCountingWriter* write) { WriterCtx* ctx = write->wrt; ctx->flush(ctx); // write->wtr->flush diff --git a/source/libs/index/src/index_tfile.c b/source/libs/index/src/index_tfile.c index 177132c67d..fc4f8593a1 100644 --- a/source/libs/index/src/index_tfile.c +++ b/source/libs/index/src/index_tfile.c @@ -149,7 +149,6 @@ TFileReader* tfileReaderCreate(WriterCtx* ctx) { // T_REF_INC(reader); reader->ctx = ctx; - if (0 != tfileReaderLoadHeader(reader)) { tfileReaderDestroy(reader); indexError("failed to load index header, suid: %" PRIu64 ", colName: %s", reader->header.suid, @@ -542,8 +541,14 @@ static int tfileReaderLoadHeader(TFileReader* reader) { char buf[TFILE_HEADER_SIZE] = {0}; int64_t nread = reader->ctx->readFrom(reader->ctx, buf, sizeof(buf), 0); - assert(nread == sizeof(buf)); + if (nread == -1) { + // + indexError("actual Read: %d, to read: %d, errno: %d, filefd: %d, filename: %s", (int)(nread), (int)sizeof(buf), + errno, reader->ctx->file.fd, reader->ctx->file.buf); + } + // assert(nread == sizeof(buf)); memcpy(&reader->header, buf, sizeof(buf)); + return 0; } static int tfileReaderLoadFst(TFileReader* reader) { @@ -576,7 +581,7 @@ static int tfileReaderLoadTableIds(TFileReader* reader, int32_t offset, SArray* char* buf = calloc(1, total); if (buf == NULL) { return -1; } - nread = ctx->read(ctx, buf, total); + nread = ctx->readFrom(ctx, buf, total, offset + sizeof(nid)); assert(total == nread); for (int32_t i = 0; i < nid; i++) { taosArrayPush(result, (uint64_t*)buf + i); } diff --git a/source/libs/index/test/fstTest.cc b/source/libs/index/test/fstTest.cc index 3b5410547d..da974ce6c4 100644 --- a/source/libs/index/test/fstTest.cc +++ b/source/libs/index/test/fstTest.cc @@ -189,7 +189,7 @@ void validateTFile(char* arg) { std::thread threads[NUM_OF_THREAD]; // std::vector threads; - TFileReader* reader = tfileReaderOpen(arg, 0, 8417, "tag1"); + TFileReader* reader = tfileReaderOpen(arg, 0, 295868, "tag1"); for (int i = 0; i < NUM_OF_THREAD; i++) { threads[i] = std::thread(fst_get, reader->fst); diff --git a/source/util/src/tfile.c b/source/util/src/tfile.c index 313f1d97af..4cb20802c7 100644 --- a/source/util/src/tfile.c +++ b/source/util/src/tfile.c @@ -16,21 +16,19 @@ #define _DEFAULT_SOURCE #include "os.h" #include "taoserror.h" -#include "ulog.h" -#include "tutil.h" #include "tref.h" +#include "tutil.h" +#include "ulog.h" static int32_t tsFileRsetId = -1; static int8_t tfInited = 0; -static void tfCloseFile(void *p) { - taosCloseFile((int32_t)(uintptr_t)p); -} +static void tfCloseFile(void *p) { taosCloseFile((int32_t)(uintptr_t)p); } int32_t tfInit() { int8_t old = atomic_val_compare_exchange_8(&tfInited, 0, 1); - if(old == 1) return 0; + if (old == 1) return 0; tsFileRsetId = taosOpenRef(2000, tfCloseFile); if (tsFileRsetId > 0) { return 0; @@ -79,9 +77,7 @@ int64_t tfOpenCreateWriteAppend(const char *pathname, int32_t flags, mode_t mode return tfOpenImp(fd); } -int64_t tfClose(int64_t tfd) { - return taosRemoveRef(tsFileRsetId, tfd); -} +int64_t tfClose(int64_t tfd) { return taosRemoveRef(tsFileRsetId, tfd); } int64_t tfWrite(int64_t tfd, void *buf, int64_t count) { void *p = taosAcquireRef(tsFileRsetId, tfd); @@ -109,6 +105,19 @@ int64_t tfRead(int64_t tfd, void *buf, int64_t count) { return ret; } +int64_t tfPread(int64_t tfd, void *buf, int64_t count, int32_t offset) { + void *p = taosAcquireRef(tsFileRsetId, tfd); + if (p == NULL) return -1; + + int32_t fd = (int32_t)(uintptr_t)p; + + int64_t ret = pread(fd, buf, count, offset); + if (ret < 0) terrno = TAOS_SYSTEM_ERROR(errno); + + taosReleaseRef(tsFileRsetId, tfd); + return ret; +} + int32_t tfFsync(int64_t tfd) { void *p = taosAcquireRef(tsFileRsetId, tfd); if (p == NULL) return -1; -- GitLab