提交 e3090485 编写于 作者: dengyihao's avatar dengyihao

handle read/write crash concurrently

上级 236b8bc3
......@@ -181,9 +181,9 @@ static void indexCacheMakeRoomForWrite(IndexCache* cache) {
break;
} else if (cache->imm != NULL) {
// TODO: wake up by condition variable
// pthread_mutex_unlock(&cache->mtx);
pthread_mutex_unlock(&cache->mtx);
taosMsleep(50);
// pthread_mutex_lock(&cache->mtx);
pthread_mutex_lock(&cache->mtx);
} else {
indexCacheRef(cache);
cache->imm = cache->mem;
......
......@@ -15,6 +15,7 @@
#include <gtest/gtest.h>
#include <iostream>
#include <string>
#include <thread>
#include "index.h"
#include "indexInt.h"
#include "index_cache.h"
......@@ -25,6 +26,9 @@
#include "tskiplist.h"
#include "tutil.h"
using namespace std;
#define NUM_OF_THREAD 10
class DebugInfo {
public:
DebugInfo(const char* str) : info(str) {
......@@ -41,6 +45,7 @@ class DebugInfo {
private:
std::string info;
};
class FstWriter {
public:
FstWriter() {
......@@ -637,6 +642,23 @@ class IndexObj {
indexMultiTermDestroy(terms);
return numOfTable;
}
int WriteMultiMillonData(const std::string& colName, const std::string& colVal = "Hello world",
size_t numOfTable = 100 * 10000) {
std::string tColVal = colVal;
for (int i = 0; i < numOfTable; i++) {
tColVal[tColVal.size() - 1] = 'a' + i % 26;
SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
colVal.c_str(), colVal.size());
SIndexMultiTerm* terms = indexMultiTermCreate();
indexMultiTermAdd(terms, term);
for (size_t i = 0; i < 10; i++) {
int ret = Put(terms, i);
assert(ret == 0);
}
indexMultiTermDestroy(terms);
}
return numOfTable;
}
int Put(SIndexMultiTerm* fvs, uint64_t uid) {
numOfWrite += taosArrayGetSize(fvs);
......@@ -659,6 +681,14 @@ class IndexObj {
return taosArrayGetSize(result);
// assert(taosArrayGetSize(result) == targetSize);
}
void PutOne(const std::string& colName, const std::string& colVal) {
SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
colVal.c_str(), colVal.size());
SIndexMultiTerm* terms = indexMultiTermCreate();
indexMultiTermAdd(terms, term);
Put(terms, 10);
indexMultiTermDestroy(terms);
}
void Debug() {
std::cout << "numOfWrite:" << numOfWrite << std::endl;
std::cout << "numOfRead:" << numOfRead << std::endl;
......@@ -758,15 +788,38 @@ TEST_F(IndexEnv2, testIndexOpen) {
TEST_F(IndexEnv2, testIndex_TrigeFlush) {
std::string path = "/tmp/test";
if (index->Init(path) != 0) {}
if (index->Init(path) != 0) {
// r
std::cout << "failed to init" << std::endl;
}
int numOfTable = 100 * 10000;
index->WriteMillonData("tag1", "Hello", numOfTable);
int target = index->SearchOne("tag1", "Hello");
assert(numOfTable == target);
}
static void write_and_search(IndexObj* idx) {
std::string colName("tag1"), colVal("Hello");
int target = idx->SearchOne("tag1", "Hello");
idx->PutOne(colName, colVal);
}
TEST_F(IndexEnv2, testIndex_serarch_cache_and_tfile) {
std::string path = "/tmp";
if (index->Init(path) != 0) {}
std::string path = "/tmp/cache_and_tfile";
if (index->Init(path) != 0) {
// opt
}
index->WriteMultiMillonData("tag1", "Hello", 200000);
std::thread threads[NUM_OF_THREAD];
for (int i = 0; i < NUM_OF_THREAD; i++) {
//
threads[i] = std::thread(write_and_search, index);
}
for (int i = 0; i < NUM_OF_THREAD; i++) {
// TOD
threads[i].join();
}
}
TEST_F(IndexEnv2, testIndex_multi_thread_write) {
std::string path = "/tmp";
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册