提交 f0da893b 编写于 作者: dengyihao's avatar dengyihao

fix: avoid multi thread read/write crash

上级 bad68656
...@@ -31,7 +31,7 @@ if (${BUILD_WITH_INVERTEDINDEX}) ...@@ -31,7 +31,7 @@ if (${BUILD_WITH_INVERTEDINDEX})
endif(${BUILD_WITH_INVERTEDINDEX}) endif(${BUILD_WITH_INVERTEDINDEX})
#if (${BUILD_TEST}) if (${BUILD_TEST})
# add_subdirectory(test) add_subdirectory(test)
#endif(${BUILD_TEST}) endif(${BUILD_TEST})
...@@ -40,7 +40,7 @@ typedef struct TFileHeader { ...@@ -40,7 +40,7 @@ typedef struct TFileHeader {
} TFileHeader; } TFileHeader;
#pragma pack(pop) #pragma pack(pop)
#define TFILE_HEADER_SIZE (sizeof(TFileHeader)) #define TFILE_HEADER_SIZE (sizeof(TFileHeader))
#define TFILE_HEADER_NO_FST (TFILE_HEADER_SIZE - sizeof(int32_t)) #define TFILE_HEADER_NO_FST (TFILE_HEADER_SIZE - sizeof(int32_t))
typedef struct TFileValue { typedef struct TFileValue {
......
...@@ -29,7 +29,7 @@ ...@@ -29,7 +29,7 @@
#include "lucene++/Lucene_c.h" #include "lucene++/Lucene_c.h"
#endif #endif
#define INDEX_NUM_OF_THREADS 4 #define INDEX_NUM_OF_THREADS 1
#define INDEX_QUEUE_SIZE 200 #define INDEX_QUEUE_SIZE 200
#define INDEX_DATA_BOOL_NULL 0x02 #define INDEX_DATA_BOOL_NULL 0x02
...@@ -117,7 +117,6 @@ int indexOpen(SIndexOpts* opts, const char* path, SIndex** index) { ...@@ -117,7 +117,6 @@ int indexOpen(SIndexOpts* opts, const char* path, SIndex** index) {
sIdx->path = tstrdup(path); sIdx->path = tstrdup(path);
taosThreadMutexInit(&sIdx->mtx, NULL); taosThreadMutexInit(&sIdx->mtx, NULL);
tsem_init(&sIdx->sem, 0, 0); tsem_init(&sIdx->sem, 0, 0);
// taosThreadCondInit(&sIdx->finished, NULL);
sIdx->refId = indexAddRef(sIdx); sIdx->refId = indexAddRef(sIdx);
indexAcquireRef(sIdx->refId); indexAcquireRef(sIdx->refId);
...@@ -143,13 +142,13 @@ void indexDestroy(void* handle) { ...@@ -143,13 +142,13 @@ void indexDestroy(void* handle) {
return; return;
} }
void indexClose(SIndex* sIdx) { void indexClose(SIndex* sIdx) {
indexReleaseRef(sIdx->refId);
bool ref = 0; bool ref = 0;
if (sIdx->colObj != NULL) { if (sIdx->colObj != NULL) {
void* iter = taosHashIterate(sIdx->colObj, NULL); void* iter = taosHashIterate(sIdx->colObj, NULL);
while (iter) { while (iter) {
IndexCache** pCache = iter; IndexCache** pCache = iter;
indexCacheForceToMerge((void*)(*pCache)); indexCacheForceToMerge((void*)(*pCache));
indexInfo("%s wait to merge", (*pCache)->colName);
indexWait((void*)(sIdx)); indexWait((void*)(sIdx));
iter = taosHashIterate(sIdx->colObj, iter); iter = taosHashIterate(sIdx->colObj, iter);
indexCacheUnRef(*pCache); indexCacheUnRef(*pCache);
...@@ -157,7 +156,7 @@ void indexClose(SIndex* sIdx) { ...@@ -157,7 +156,7 @@ void indexClose(SIndex* sIdx) {
taosHashCleanup(sIdx->colObj); taosHashCleanup(sIdx->colObj);
sIdx->colObj = NULL; sIdx->colObj = NULL;
} }
// taosMsleep(1000 * 5); indexReleaseRef(sIdx->refId);
indexRemoveRef(sIdx->refId); indexRemoveRef(sIdx->refId);
} }
int64_t indexAddRef(void* p) { int64_t indexAddRef(void* p) {
...@@ -554,8 +553,29 @@ void iterateValueDestroy(IterateValue* value, bool destroy) { ...@@ -554,8 +553,29 @@ void iterateValueDestroy(IterateValue* value, bool destroy) {
taosMemoryFree(value->colVal); taosMemoryFree(value->colVal);
value->colVal = NULL; value->colVal = NULL;
} }
static int64_t indexGetAvaialbleVer(SIndex* sIdx, IndexCache* cache) {
ICacheKey key = {.suid = cache->suid, .colName = cache->colName, .nColName = strlen(cache->colName)};
int64_t ver = CACHE_VERSION(cache);
taosThreadMutexLock(&sIdx->mtx);
TFileReader* trd = tfileCacheGet(((IndexTFile*)sIdx->tindex)->cache, &key);
if (trd != NULL) {
if (ver < trd->header.version) {
ver = trd->header.version + 1;
} else {
ver += 1;
}
indexInfo("header: %d, ver: %" PRId64 "", trd->header.version, ver);
tfileReaderUnRef(trd);
} else {
indexInfo("not found reader base %p", trd);
}
taosThreadMutexUnlock(&sIdx->mtx);
return ver;
}
static int indexGenTFile(SIndex* sIdx, IndexCache* cache, SArray* batch) { static int indexGenTFile(SIndex* sIdx, IndexCache* cache, SArray* batch) {
int32_t version = CACHE_VERSION(cache); int64_t version = indexGetAvaialbleVer(sIdx, cache);
indexInfo("file name version: %" PRId64 "", version);
uint8_t colType = cache->type; uint8_t colType = cache->type;
TFileWriter* tw = tfileWriterOpen(sIdx->path, cache->suid, version, cache->colName, colType); TFileWriter* tw = tfileWriterOpen(sIdx->path, cache->suid, version, cache->colName, colType);
...@@ -575,6 +595,7 @@ static int indexGenTFile(SIndex* sIdx, IndexCache* cache, SArray* batch) { ...@@ -575,6 +595,7 @@ static int indexGenTFile(SIndex* sIdx, IndexCache* cache, SArray* batch) {
if (reader == NULL) { if (reader == NULL) {
return -1; return -1;
} }
indexInfo("success to create tfile, reopen it, %s", reader->ctx->file.buf);
TFileHeader* header = &reader->header; TFileHeader* header = &reader->header;
ICacheKey key = {.suid = cache->suid, .colName = header->colName, .nColName = strlen(header->colName)}; ICacheKey key = {.suid = cache->suid, .colName = header->colName, .nColName = strlen(header->colName)};
......
...@@ -335,6 +335,9 @@ IndexCache* indexCacheCreate(SIndex* idx, uint64_t suid, const char* colName, in ...@@ -335,6 +335,9 @@ IndexCache* indexCacheCreate(SIndex* idx, uint64_t suid, const char* colName, in
taosThreadCondInit(&cache->finished, NULL); taosThreadCondInit(&cache->finished, NULL);
indexCacheRef(cache); indexCacheRef(cache);
if (idx != NULL) {
indexAcquireRef(idx->refId);
}
return cache; return cache;
} }
void indexCacheDebug(IndexCache* cache) { void indexCacheDebug(IndexCache* cache) {
...@@ -426,13 +429,16 @@ void indexCacheDestroy(void* cache) { ...@@ -426,13 +429,16 @@ void indexCacheDestroy(void* cache) {
if (pCache == NULL) { if (pCache == NULL) {
return; return;
} }
indexMemUnRef(pCache->mem); indexMemUnRef(pCache->mem);
indexMemUnRef(pCache->imm); indexMemUnRef(pCache->imm);
taosMemoryFree(pCache->colName); taosMemoryFree(pCache->colName);
taosThreadMutexDestroy(&pCache->mtx); taosThreadMutexDestroy(&pCache->mtx);
taosThreadCondDestroy(&pCache->finished); taosThreadCondDestroy(&pCache->finished);
if (pCache->index != NULL) {
indexReleaseRef(((SIndex*)pCache->index)->refId);
}
taosMemoryFree(pCache); taosMemoryFree(pCache);
} }
......
...@@ -97,6 +97,7 @@ WriterCtx* writerCtxCreate(WriterType type, const char* path, bool readOnly, int ...@@ -97,6 +97,7 @@ WriterCtx* writerCtxCreate(WriterType type, const char* path, bool readOnly, int
int64_t file_size; int64_t file_size;
taosStatFile(path, &file_size, NULL); taosStatFile(path, &file_size, NULL);
ctx->file.size = (int)file_size; ctx->file.size = (int)file_size;
} else { } else {
// ctx->file.pFile = open(path, O_RDONLY, S_IRWXU | S_IRWXG | S_IRWXO); // ctx->file.pFile = open(path, O_RDONLY, S_IRWXU | S_IRWXG | S_IRWXO);
ctx->file.pFile = taosOpenFile(path, TD_FILE_READ); ctx->file.pFile = taosOpenFile(path, TD_FILE_READ);
......
/* /*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com> * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
p *
* This program is free software: you can use, redistribute, and/or modify * This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3 * it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation. * or later ("AGPL"), as published by the Free Software Foundation.
...@@ -152,10 +151,13 @@ TFileReader* tfileCacheGet(TFileCache* tcache, ICacheKey* key) { ...@@ -152,10 +151,13 @@ TFileReader* tfileCacheGet(TFileCache* tcache, ICacheKey* key) {
char buf[128] = {0}; char buf[128] = {0};
int32_t sz = indexSerialCacheKey(key, buf); int32_t sz = indexSerialCacheKey(key, buf);
assert(sz < sizeof(buf)); assert(sz < sizeof(buf));
indexInfo("Try to get key: %s", buf);
TFileReader** reader = taosHashGet(tcache->tableCache, buf, sz); TFileReader** reader = taosHashGet(tcache->tableCache, buf, sz);
if (reader == NULL) { if (reader == NULL || *reader == NULL) {
indexInfo("failed to get key: %s", buf);
return NULL; return NULL;
} }
indexInfo("Get key: %s file: %s", buf, (*reader)->ctx->file.buf);
tfileReaderRef(*reader); tfileReaderRef(*reader);
return *reader; return *reader;
...@@ -165,9 +167,10 @@ void tfileCachePut(TFileCache* tcache, ICacheKey* key, TFileReader* reader) { ...@@ -165,9 +167,10 @@ void tfileCachePut(TFileCache* tcache, ICacheKey* key, TFileReader* reader) {
int32_t sz = indexSerialCacheKey(key, buf); int32_t sz = indexSerialCacheKey(key, buf);
// remove last version index reader // remove last version index reader
TFileReader** p = taosHashGet(tcache->tableCache, buf, sz); TFileReader** p = taosHashGet(tcache->tableCache, buf, sz);
if (p != NULL) { if (p != NULL && *p != NULL) {
TFileReader* oldReader = *p; TFileReader* oldReader = *p;
taosHashRemove(tcache->tableCache, buf, sz); taosHashRemove(tcache->tableCache, buf, sz);
indexInfo("found %s, remove file %s", buf, oldReader->ctx->file.buf);
oldReader->remove = true; oldReader->remove = true;
tfileReaderUnRef(oldReader); tfileReaderUnRef(oldReader);
} }
...@@ -180,7 +183,6 @@ TFileReader* tfileReaderCreate(WriterCtx* ctx) { ...@@ -180,7 +183,6 @@ TFileReader* tfileReaderCreate(WriterCtx* ctx) {
if (reader == NULL) { if (reader == NULL) {
return NULL; return NULL;
} }
reader->ctx = ctx; reader->ctx = ctx;
if (0 != tfileReaderVerify(reader)) { if (0 != tfileReaderVerify(reader)) {
...@@ -202,6 +204,7 @@ TFileReader* tfileReaderCreate(WriterCtx* ctx) { ...@@ -202,6 +204,7 @@ TFileReader* tfileReaderCreate(WriterCtx* ctx) {
tfileReaderDestroy(reader); tfileReaderDestroy(reader);
return NULL; return NULL;
} }
reader->remove = false;
return reader; return reader;
} }
...@@ -536,7 +539,7 @@ TFileReader* tfileReaderOpen(char* path, uint64_t suid, int32_t version, const c ...@@ -536,7 +539,7 @@ TFileReader* tfileReaderOpen(char* path, uint64_t suid, int32_t version, const c
indexError("failed to open readonly file: %s, reason: %s", fullname, terrstr()); indexError("failed to open readonly file: %s, reason: %s", fullname, terrstr());
return NULL; return NULL;
} }
indexInfo("open read file name:%s, file size: %d", wc->file.buf, wc->file.size); indexTrace("open read file name:%s, file size: %d", wc->file.buf, wc->file.size);
TFileReader* reader = tfileReaderCreate(wc); TFileReader* reader = tfileReaderCreate(wc);
return reader; return reader;
......
...@@ -674,10 +674,13 @@ class IndexObj { ...@@ -674,10 +674,13 @@ class IndexObj {
// opt // opt
numOfWrite = 0; numOfWrite = 0;
numOfRead = 0; numOfRead = 0;
indexInit(); // indexInit();
} }
int Init(const std::string& dir) { int Init(const std::string& dir, bool remove = true) {
taosRemoveDir(dir.c_str()); if (remove) {
taosRemoveDir(dir.c_str());
taosMkDir(dir.c_str());
}
taosMkDir(dir.c_str()); taosMkDir(dir.c_str());
int ret = indexOpen(&opts, dir.c_str(), &idx); int ret = indexOpen(&opts, dir.c_str(), &idx);
if (ret != 0) { if (ret != 0) {
...@@ -838,8 +841,11 @@ class IndexEnv2 : public ::testing::Test { ...@@ -838,8 +841,11 @@ class IndexEnv2 : public ::testing::Test {
initLog(); initLog();
index = new IndexObj(); index = new IndexObj();
} }
virtual void TearDown() { delete index; } virtual void TearDown() {
IndexObj* index; // taosMsleep(500);
delete index;
}
IndexObj* index;
}; };
TEST_F(IndexEnv2, testIndexOpen) { TEST_F(IndexEnv2, testIndexOpen) {
std::string path = TD_TMP_DIR_PATH "test"; std::string path = TD_TMP_DIR_PATH "test";
...@@ -951,6 +957,8 @@ static void single_write_and_search(IndexObj* idx) { ...@@ -951,6 +957,8 @@ static void single_write_and_search(IndexObj* idx) {
target = idx->SearchOne("tag2", "Test"); target = idx->SearchOne("tag2", "Test");
} }
static void multi_write_and_search(IndexObj* idx) { static void multi_write_and_search(IndexObj* idx) {
idx->PutOne("tag1", "Hello");
idx->PutOne("tag2", "Test");
int target = idx->SearchOne("tag1", "Hello"); int target = idx->SearchOne("tag1", "Hello");
target = idx->SearchOne("tag2", "Test"); target = idx->SearchOne("tag2", "Test");
idx->WriteMultiMillonData("tag1", "hello world test", 100 * 100); idx->WriteMultiMillonData("tag1", "hello world test", 100 * 100);
...@@ -992,16 +1000,16 @@ TEST_F(IndexEnv2, testIndex_MultiWrite_and_MultiRead) { ...@@ -992,16 +1000,16 @@ TEST_F(IndexEnv2, testIndex_MultiWrite_and_MultiRead) {
} }
} }
// TEST_F(IndexEnv2, testIndex_restart) { TEST_F(IndexEnv2, testIndex_restart) {
// std::string path = TD_TMP_DIR_PATH "cache_and_tfile"; std::string path = TD_TMP_DIR_PATH "cache_and_tfile";
// if (index->Init(path) != 0) { if (index->Init(path, false) != 0) {
// } }
// index->SearchOneTarget("tag1", "Hello", 10); index->SearchOneTarget("tag1", "Hello", 10);
// index->SearchOneTarget("tag2", "Test", 10); index->SearchOneTarget("tag2", "Test", 10);
//} }
// TEST_F(IndexEnv2, testIndex_restart1) { // TEST_F(IndexEnv2, testIndex_restart1) {
// std::string path = TD_TMP_DIR_PATH "cache_and_tfile"; // std::string path = TD_TMP_DIR_PATH "cache_and_tfile";
// if (index->Init(path) != 0) { // if (index->Init(path, false) != 0) {
// } // }
// index->ReadMultiMillonData("tag1", "coding"); // index->ReadMultiMillonData("tag1", "coding");
// index->SearchOneTarget("tag1", "Hello", 10); // index->SearchOneTarget("tag1", "Hello", 10);
...@@ -1018,16 +1026,16 @@ TEST_F(IndexEnv2, testIndex_MultiWrite_and_MultiRead) { ...@@ -1018,16 +1026,16 @@ TEST_F(IndexEnv2, testIndex_MultiWrite_and_MultiRead) {
// std::cout << "reader sz: " << index->SearchOne("tag1", "Hello") << std::endl; // std::cout << "reader sz: " << index->SearchOne("tag1", "Hello") << std::endl;
// assert(3 == index->SearchOne("tag1", "Hello")); // assert(3 == index->SearchOne("tag1", "Hello"));
//} //}
// TEST_F(IndexEnv2, testIndexMultiTag) { TEST_F(IndexEnv2, testIndexMultiTag) {
// std::string path = TD_TMP_DIR_PATH "multi_tag"; std::string path = TD_TMP_DIR_PATH "multi_tag";
// if (index->Init(path) != 0) { if (index->Init(path) != 0) {
// } }
// int64_t st = taosGetTimestampUs(); int64_t st = taosGetTimestampUs();
// int32_t num = 1000 * 10000; int32_t num = 100 * 100;
// index->WriteMultiMillonData("tag1", "xxxxxxxxxxxxxxx", num); index->WriteMultiMillonData("tag1", "xxxxxxxxxxxxxxx", num);
// std::cout << "numOfRow: " << num << "\ttime cost:" << taosGetTimestampUs() - st << std::endl; std::cout << "numOfRow: " << num << "\ttime cost:" << taosGetTimestampUs() - st << std::endl;
// // index->WriteMultiMillonData("tag2", "xxxxxxxxxxxxxxxxxxxxxxxxx", 100 * 10000); // index->WriteMultiMillonData("tag2", "xxxxxxxxxxxxxxxxxxxxxxxxx", 100 * 10000);
//} }
TEST_F(IndexEnv2, testLongComVal1) { TEST_F(IndexEnv2, testLongComVal1) {
std::string path = TD_TMP_DIR_PATH "long_colVal"; std::string path = TD_TMP_DIR_PATH "long_colVal";
if (index->Init(path) != 0) { if (index->Init(path) != 0) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册