提交 236b8bc3 编写于 作者: dengyihao's avatar dengyihao

handle read/write crash concurrently

上级 261ef100
...@@ -142,7 +142,8 @@ uint64_t fstStateInputLen(FstState* state); ...@@ -142,7 +142,8 @@ uint64_t fstStateInputLen(FstState* state);
// end_addr // end_addr
uint64_t fstStateEndAddrForOneTransNext(FstState* state, FstSlice* data); uint64_t fstStateEndAddrForOneTransNext(FstState* state, FstSlice* data);
uint64_t fstStateEndAddrForOneTrans(FstState* state, FstSlice* data, PackSizes sizes); uint64_t fstStateEndAddrForOneTrans(FstState* state, FstSlice* data, PackSizes sizes);
uint64_t fstStateEndAddrForAnyTrans(FstState* state, uint64_t version, FstSlice* date, PackSizes sizes, uint64_t nTrans); uint64_t fstStateEndAddrForAnyTrans(FstState* state, uint64_t version, FstSlice* date, PackSizes sizes,
uint64_t nTrans);
// input // input
uint8_t fstStateInput(FstState* state, FstNode* node); uint8_t fstStateInput(FstState* state, FstNode* node);
uint8_t fstStateInputForAnyTrans(FstState* state, FstNode* node, uint64_t i); uint8_t fstStateInputForAnyTrans(FstState* state, FstNode* node, uint64_t i);
...@@ -255,9 +256,10 @@ typedef struct FstMeta { ...@@ -255,9 +256,10 @@ typedef struct FstMeta {
} FstMeta; } FstMeta;
typedef struct Fst { typedef struct Fst {
FstMeta* meta; FstMeta* meta;
FstSlice* data; // FstSlice* data; //
FstNode* root; // FstNode* root; //
pthread_mutex_t mtx;
} Fst; } Fst;
// refactor simple function // refactor simple function
...@@ -310,7 +312,8 @@ StreamWithStateResult* swsResultCreate(FstSlice* data, FstOutput fOut, void* sta ...@@ -310,7 +312,8 @@ StreamWithStateResult* swsResultCreate(FstSlice* data, FstOutput fOut, void* sta
void swsResultDestroy(StreamWithStateResult* result); void swsResultDestroy(StreamWithStateResult* result);
typedef void* (*StreamCallback)(void*); typedef void* (*StreamCallback)(void*);
StreamWithState* streamWithStateCreate(Fst* fst, AutomationCtx* automation, FstBoundWithData* min, FstBoundWithData* max); StreamWithState* streamWithStateCreate(Fst* fst, AutomationCtx* automation, FstBoundWithData* min,
FstBoundWithData* max);
void streamWithStateDestroy(StreamWithState* sws); void streamWithStateDestroy(StreamWithState* sws);
......
...@@ -77,6 +77,7 @@ typedef struct TFileReader { ...@@ -77,6 +77,7 @@ typedef struct TFileReader {
Fst* fst; Fst* fst;
WriterCtx* ctx; WriterCtx* ctx;
TFileHeader header; TFileHeader header;
bool remove;
} TFileReader; } TFileReader;
typedef struct IndexTFile { typedef struct IndexTFile {
......
...@@ -94,7 +94,6 @@ void indexClose(SIndex* sIdx) { ...@@ -94,7 +94,6 @@ void indexClose(SIndex* sIdx) {
#endif #endif
#ifdef USE_INVERTED_INDEX #ifdef USE_INVERTED_INDEX
indexCacheDestroy(sIdx->cache);
void* iter = taosHashIterate(sIdx->colObj, NULL); void* iter = taosHashIterate(sIdx->colObj, NULL);
while (iter) { while (iter) {
IndexCache** pCache = iter; IndexCache** pCache = iter;
...@@ -104,6 +103,7 @@ void indexClose(SIndex* sIdx) { ...@@ -104,6 +103,7 @@ void indexClose(SIndex* sIdx) {
taosHashCleanup(sIdx->colObj); taosHashCleanup(sIdx->colObj);
pthread_mutex_destroy(&sIdx->mtx); pthread_mutex_destroy(&sIdx->mtx);
#endif #endif
free(sIdx->path);
free(sIdx); free(sIdx);
return; return;
} }
...@@ -459,7 +459,7 @@ void iterateValueDestroy(IterateValue* value, bool destroy) { ...@@ -459,7 +459,7 @@ void iterateValueDestroy(IterateValue* value, bool destroy) {
} else { } else {
if (value->val != NULL) { taosArrayClear(value->val); } if (value->val != NULL) { taosArrayClear(value->val); }
} }
// free(value->colVal); free(value->colVal);
value->colVal = NULL; value->colVal = NULL;
} }
static int indexGenTFile(SIndex* sIdx, IndexCache* cache, SArray* batch) { static int indexGenTFile(SIndex* sIdx, IndexCache* cache, SArray* batch) {
......
...@@ -20,7 +20,7 @@ ...@@ -20,7 +20,7 @@
#define MAX_INDEX_KEY_LEN 256 // test only, change later #define MAX_INDEX_KEY_LEN 256 // test only, change later
#define MEM_TERM_LIMIT 200 #define MEM_TERM_LIMIT 10000 * 10
// ref index_cache.h:22 // ref index_cache.h:22
//#define CACHE_KEY_LEN(p) \ //#define CACHE_KEY_LEN(p) \
// (sizeof(int32_t) + sizeof(uint16_t) + sizeof(p->colType) + sizeof(p->nColVal) + p->nColVal + sizeof(uint64_t) + // (sizeof(int32_t) + sizeof(uint16_t) + sizeof(p->colType) + sizeof(p->nColVal) + p->nColVal + sizeof(uint64_t) +
...@@ -110,7 +110,10 @@ void indexCacheDestroySkiplist(SSkipList* slt) { ...@@ -110,7 +110,10 @@ void indexCacheDestroySkiplist(SSkipList* slt) {
while (tSkipListIterNext(iter)) { while (tSkipListIterNext(iter)) {
SSkipListNode* node = tSkipListIterGet(iter); SSkipListNode* node = tSkipListIterGet(iter);
CacheTerm* ct = (CacheTerm*)SL_GET_NODE_DATA(node); CacheTerm* ct = (CacheTerm*)SL_GET_NODE_DATA(node);
if (ct != NULL) {} if (ct != NULL) {
free(ct->colVal);
free(ct);
}
} }
tSkipListDestroyIter(iter); tSkipListDestroyIter(iter);
tSkipListDestroy(slt); tSkipListDestroy(slt);
...@@ -178,9 +181,9 @@ static void indexCacheMakeRoomForWrite(IndexCache* cache) { ...@@ -178,9 +181,9 @@ static void indexCacheMakeRoomForWrite(IndexCache* cache) {
break; break;
} else if (cache->imm != NULL) { } else if (cache->imm != NULL) {
// TODO: wake up by condition variable // TODO: wake up by condition variable
pthread_mutex_unlock(&cache->mtx); // pthread_mutex_unlock(&cache->mtx);
taosMsleep(50); taosMsleep(50);
pthread_mutex_lock(&cache->mtx); // pthread_mutex_lock(&cache->mtx);
} else { } else {
indexCacheRef(cache); indexCacheRef(cache);
cache->imm = cache->mem; cache->imm = cache->mem;
...@@ -271,7 +274,7 @@ int indexCacheSearch(void* cache, SIndexTermQuery* query, SArray* result, STermV ...@@ -271,7 +274,7 @@ int indexCacheSearch(void* cache, SIndexTermQuery* query, SArray* result, STermV
SIndexTerm* term = query->term; SIndexTerm* term = query->term;
EIndexQueryType qtype = query->qType; EIndexQueryType qtype = query->qType;
CacheTerm ct = {.colVal = term->colVal, .version = atomic_load_32(&pCache->version)}; CacheTerm ct = {.colVal = term->colVal, .version = atomic_load_32(&pCache->version)};
indexCacheDebug(pCache); // indexCacheDebug(pCache);
int ret = indexQueryMem(mem, &ct, qtype, result, s); int ret = indexQueryMem(mem, &ct, qtype, result, s);
if (ret == 0 && *s != kTypeDeletion) { if (ret == 0 && *s != kTypeDeletion) {
......
...@@ -354,7 +354,8 @@ uint64_t fstStateEndAddrForOneTrans(FstState* s, FstSlice* data, PackSizes sizes ...@@ -354,7 +354,8 @@ uint64_t fstStateEndAddrForOneTrans(FstState* s, FstSlice* data, PackSizes sizes
return FST_SLICE_LEN(data) - 1 - fstStateInputLen(s) - 1 // pack size return FST_SLICE_LEN(data) - 1 - fstStateInputLen(s) - 1 // pack size
- FST_GET_TRANSITION_PACK_SIZE(sizes) - FST_GET_OUTPUT_PACK_SIZE(sizes); - FST_GET_TRANSITION_PACK_SIZE(sizes) - FST_GET_OUTPUT_PACK_SIZE(sizes);
} }
uint64_t fstStateEndAddrForAnyTrans(FstState* state, uint64_t version, FstSlice* date, PackSizes sizes, uint64_t nTrans) { uint64_t fstStateEndAddrForAnyTrans(FstState* state, uint64_t version, FstSlice* date, PackSizes sizes,
uint64_t nTrans) {
uint8_t oSizes = FST_GET_OUTPUT_PACK_SIZE(sizes); uint8_t oSizes = FST_GET_OUTPUT_PACK_SIZE(sizes);
uint8_t finalOsize = !fstStateIsFinalState(state) ? 0 : oSizes; uint8_t finalOsize = !fstStateIsFinalState(state) ? 0 : oSizes;
return FST_SLICE_LEN(date) - 1 - fstStateNtransLen(state) - 1 // pack size return FST_SLICE_LEN(date) - 1 - fstStateNtransLen(state) - 1 // pack size
...@@ -403,8 +404,8 @@ CompiledAddr fstStateTransAddrForAnyTrans(FstState* s, FstNode* node, uint64_t i ...@@ -403,8 +404,8 @@ CompiledAddr fstStateTransAddrForAnyTrans(FstState* s, FstNode* node, uint64_t i
FstSlice* slice = &node->data; FstSlice* slice = &node->data;
uint8_t tSizes = FST_GET_TRANSITION_PACK_SIZE(node->sizes); uint8_t tSizes = FST_GET_TRANSITION_PACK_SIZE(node->sizes);
uint64_t at = node->start - fstStateNtransLen(s) - 1 - fstStateTransIndexSize(s, node->version, node->nTrans) - node->nTrans - uint64_t at = node->start - fstStateNtransLen(s) - 1 - fstStateTransIndexSize(s, node->version, node->nTrans) -
(i * tSizes) - tSizes; node->nTrans - (i * tSizes) - tSizes;
uint8_t* data = fstSliceData(slice, NULL); uint8_t* data = fstSliceData(slice, NULL);
return unpackDelta(data + at, tSizes, node->end); return unpackDelta(data + at, tSizes, node->end);
} }
...@@ -595,7 +596,8 @@ FstNode* fstNodeCreate(int64_t version, CompiledAddr addr, FstSlice* slice) { ...@@ -595,7 +596,8 @@ FstNode* fstNodeCreate(int64_t version, CompiledAddr addr, FstSlice* slice) {
n->isFinal = fstStateIsFinalState(&st); // s.is_final_state(); n->isFinal = fstStateIsFinalState(&st); // s.is_final_state();
n->nTrans = nTrans; n->nTrans = nTrans;
n->sizes = sz; n->sizes = sz;
n->finalOutput = fstStateFinalOutput(&st, version, &data, sz, nTrans); // s.final_output(version, data, sz, ntrans); n->finalOutput =
fstStateFinalOutput(&st, version, &data, sz, nTrans); // s.final_output(version, data, sz, ntrans);
} }
return n; return n;
} }
...@@ -875,9 +877,7 @@ void* fstBuilderInsertInner(FstBuilder* b) { ...@@ -875,9 +877,7 @@ void* fstBuilderInsertInner(FstBuilder* b) {
// b->wrt = NULL; // b->wrt = NULL;
return b->wrt; return b->wrt;
} }
void fstBuilderFinish(FstBuilder* b) { void fstBuilderFinish(FstBuilder* b) { fstBuilderInsertInner(b); }
fstBuilderInsertInner(b);
}
FstSlice fstNodeAsSlice(FstNode* node) { FstSlice fstNodeAsSlice(FstNode* node) {
FstSlice* slice = &node->data; FstSlice* slice = &node->data;
...@@ -894,9 +894,7 @@ FstLastTransition* fstLastTransitionCreate(uint8_t inp, Output out) { ...@@ -894,9 +894,7 @@ FstLastTransition* fstLastTransitionCreate(uint8_t inp, Output out) {
return trn; return trn;
} }
void fstLastTransitionDestroy(FstLastTransition* trn) { void fstLastTransitionDestroy(FstLastTransition* trn) { free(trn); }
free(trn);
}
void fstBuilderNodeUnfinishedLastCompiled(FstBuilderNodeUnfinished* unNode, CompiledAddr addr) { void fstBuilderNodeUnfinishedLastCompiled(FstBuilderNodeUnfinished* unNode, CompiledAddr addr) {
FstLastTransition* trn = unNode->last; FstLastTransition* trn = unNode->last;
if (trn == NULL) { return; } if (trn == NULL) { return; }
...@@ -959,9 +957,10 @@ Fst* fstCreate(FstSlice* slice) { ...@@ -959,9 +957,10 @@ Fst* fstCreate(FstSlice* slice) {
fst->meta->checkSum = checkSum; fst->meta->checkSum = checkSum;
FstSlice* s = calloc(1, sizeof(FstSlice)); FstSlice* s = calloc(1, sizeof(FstSlice));
*s = fstSliceCopy(slice, 0, FST_SLICE_LEN(slice)); *s = fstSliceCopy(slice, 0, FST_SLICE_LEN(slice) - 1);
fst->data = s; fst->data = s;
pthread_mutex_init(&fst->mtx, NULL);
return fst; return fst;
FST_CREAT_FAILED: FST_CREAT_FAILED:
...@@ -973,14 +972,18 @@ void fstDestroy(Fst* fst) { ...@@ -973,14 +972,18 @@ void fstDestroy(Fst* fst) {
free(fst->meta); free(fst->meta);
fstSliceDestroy(fst->data); fstSliceDestroy(fst->data);
free(fst->data); free(fst->data);
pthread_mutex_destroy(&fst->mtx);
} }
free(fst); free(fst);
} }
bool fstGet(Fst* fst, FstSlice* b, Output* out) { bool fstGet(Fst* fst, FstSlice* b, Output* out) {
// dec lock range
pthread_mutex_lock(&fst->mtx);
FstNode* root = fstGetRoot(fst); FstNode* root = fstGetRoot(fst);
Output tOut = 0; Output tOut = 0;
int32_t len; int32_t len;
uint8_t* data = fstSliceData(b, &len); uint8_t* data = fstSliceData(b, &len);
SArray* nodes = (SArray*)taosArrayInit(len, sizeof(FstNode*)); SArray* nodes = (SArray*)taosArrayInit(len, sizeof(FstNode*));
...@@ -988,7 +991,10 @@ bool fstGet(Fst* fst, FstSlice* b, Output* out) { ...@@ -988,7 +991,10 @@ bool fstGet(Fst* fst, FstSlice* b, Output* out) {
for (uint32_t i = 0; i < len; i++) { for (uint32_t i = 0; i < len; i++) {
uint8_t inp = data[i]; uint8_t inp = data[i];
Output res = 0; Output res = 0;
if (false == fstNodeFindInput(root, inp, &res)) { return false; } if (false == fstNodeFindInput(root, inp, &res)) {
pthread_mutex_unlock(&fst->mtx);
return false;
}
FstTransition trn; FstTransition trn;
fstNodeGetTransitionAt(root, res, &trn); fstNodeGetTransitionAt(root, res, &trn);
...@@ -997,6 +1003,7 @@ bool fstGet(Fst* fst, FstSlice* b, Output* out) { ...@@ -997,6 +1003,7 @@ bool fstGet(Fst* fst, FstSlice* b, Output* out) {
taosArrayPush(nodes, &root); taosArrayPush(nodes, &root);
} }
if (!FST_NODE_IS_FINAL(root)) { if (!FST_NODE_IS_FINAL(root)) {
pthread_mutex_unlock(&fst->mtx);
return false; return false;
} else { } else {
tOut = tOut + FST_NODE_FINAL_OUTPUT(root); tOut = tOut + FST_NODE_FINAL_OUTPUT(root);
...@@ -1007,13 +1014,13 @@ bool fstGet(Fst* fst, FstSlice* b, Output* out) { ...@@ -1007,13 +1014,13 @@ bool fstGet(Fst* fst, FstSlice* b, Output* out) {
fstNodeDestroy(*node); fstNodeDestroy(*node);
} }
taosArrayDestroy(nodes); taosArrayDestroy(nodes);
fst->root = NULL; fst->root = NULL;
pthread_mutex_unlock(&fst->mtx);
*out = tOut; *out = tOut;
return true; return true;
} }
FstStreamBuilder* fstSearch(Fst* fst, AutomationCtx* ctx) { FstStreamBuilder* fstSearch(Fst* fst, AutomationCtx* ctx) {
// refactor later
return fstStreamBuilderCreate(fst, ctx); return fstStreamBuilderCreate(fst, ctx);
} }
StreamWithState* streamBuilderIntoStream(FstStreamBuilder* sb) { StreamWithState* streamBuilderIntoStream(FstStreamBuilder* sb) {
...@@ -1021,24 +1028,28 @@ StreamWithState* streamBuilderIntoStream(FstStreamBuilder* sb) { ...@@ -1021,24 +1028,28 @@ StreamWithState* streamBuilderIntoStream(FstStreamBuilder* sb) {
return streamWithStateCreate(sb->fst, sb->aut, sb->min, sb->max); return streamWithStateCreate(sb->fst, sb->aut, sb->min, sb->max);
} }
FstStreamWithStateBuilder* fstSearchWithState(Fst* fst, AutomationCtx* ctx) { FstStreamWithStateBuilder* fstSearchWithState(Fst* fst, AutomationCtx* ctx) {
// refactor later
return fstStreamBuilderCreate(fst, ctx); return fstStreamBuilderCreate(fst, ctx);
} }
FstNode* fstGetRoot(Fst* fst) { FstNode* fstGetRoot(Fst* fst) {
if (fst->root != NULL) { return fst->root; } // pthread_mutex_lock(&fst->mtx);
if (fst->root != NULL) {
// pthread_mutex_unlock(&fst->mtx);
return fst->root;
}
CompiledAddr rAddr = fstGetRootAddr(fst); CompiledAddr rAddr = fstGetRootAddr(fst);
fst->root = fstGetNode(fst, rAddr); fst->root = fstGetNode(fst, rAddr);
// pthread_mutex_unlock(&fst->mtx);
return fst->root; return fst->root;
} }
FstNode* fstGetNode(Fst* fst, CompiledAddr addr) { FstNode* fstGetNode(Fst* fst, CompiledAddr addr) {
// refactor later
return fstNodeCreate(fst->meta->version, addr, fst->data); return fstNodeCreate(fst->meta->version, addr, fst->data);
} }
FstType fstGetType(Fst* fst) { FstType fstGetType(Fst* fst) { return fst->meta->ty; }
return fst->meta->ty; CompiledAddr fstGetRootAddr(Fst* fst) { return fst->meta->rootAddr; }
}
CompiledAddr fstGetRootAddr(Fst* fst) {
return fst->meta->rootAddr;
}
Output fstEmptyFinalOutput(Fst* fst, bool* null) { Output fstEmptyFinalOutput(Fst* fst, bool* null) {
Output res = 0; Output res = 0;
...@@ -1053,8 +1064,7 @@ Output fstEmptyFinalOutput(Fst* fst, bool* null) { ...@@ -1053,8 +1064,7 @@ Output fstEmptyFinalOutput(Fst* fst, bool* null) {
} }
bool fstVerify(Fst* fst) { bool fstVerify(Fst* fst) {
uint32_t checkSum = fst->meta->checkSum; uint32_t len, checkSum = fst->meta->checkSum;
int32_t len;
uint8_t* data = fstSliceData(fst->data, &len); uint8_t* data = fstSliceData(fst->data, &len);
TSCKSUM initSum = 0; TSCKSUM initSum = 0;
if (!taosCheckChecksumWhole(data, len)) { return false; } if (!taosCheckChecksumWhole(data, len)) { return false; }
...@@ -1094,15 +1104,12 @@ bool fstBoundWithDataIsEmpty(FstBoundWithData* bound) { ...@@ -1094,15 +1104,12 @@ bool fstBoundWithDataIsEmpty(FstBoundWithData* bound) {
} }
} }
bool fstBoundWithDataIsIncluded(FstBoundWithData* bound) { bool fstBoundWithDataIsIncluded(FstBoundWithData* bound) { return bound->type == Excluded ? false : true; }
return bound->type == Excluded ? false : true;
}
void fstBoundDestroy(FstBoundWithData* bound) { void fstBoundDestroy(FstBoundWithData* bound) { free(bound); }
free(bound);
}
StreamWithState* streamWithStateCreate(Fst* fst, AutomationCtx* automation, FstBoundWithData* min, FstBoundWithData* max) { StreamWithState* streamWithStateCreate(Fst* fst, AutomationCtx* automation, FstBoundWithData* min,
FstBoundWithData* max) {
StreamWithState* sws = calloc(1, sizeof(StreamWithState)); StreamWithState* sws = calloc(1, sizeof(StreamWithState));
if (sws == NULL) { return NULL; } if (sws == NULL) { return NULL; }
...@@ -1131,7 +1138,9 @@ void streamWithStateDestroy(StreamWithState* sws) { ...@@ -1131,7 +1138,9 @@ void streamWithStateDestroy(StreamWithState* sws) {
bool streamWithStateSeekMin(StreamWithState* sws, FstBoundWithData* min) { bool streamWithStateSeekMin(StreamWithState* sws, FstBoundWithData* min) {
AutomationCtx* aut = sws->aut; AutomationCtx* aut = sws->aut;
if (fstBoundWithDataIsEmpty(min)) { if (fstBoundWithDataIsEmpty(min)) {
if (fstBoundWithDataIsIncluded(min)) { sws->emptyOutput.out = fstEmptyFinalOutput(sws->fst, &(sws->emptyOutput.null)); } if (fstBoundWithDataIsIncluded(min)) {
sws->emptyOutput.out = fstEmptyFinalOutput(sws->fst, &(sws->emptyOutput.null));
}
StreamState s = {.node = fstGetRoot(sws->fst), StreamState s = {.node = fstGetRoot(sws->fst),
.trans = 0, .trans = 0,
.out = {.null = false, .out = 0}, .out = {.null = false, .out = 0},
...@@ -1203,7 +1212,8 @@ bool streamWithStateSeekMin(StreamWithState* sws, FstBoundWithData* min) { ...@@ -1203,7 +1212,8 @@ bool streamWithStateSeekMin(StreamWithState* sws, FstBoundWithData* min) {
uint64_t trans = s->trans; uint64_t trans = s->trans;
FstTransition trn; FstTransition trn;
fstNodeGetTransitionAt(n, trans - 1, &trn); fstNodeGetTransitionAt(n, trans - 1, &trn);
StreamState s = {.node = fstGetNode(sws->fst, trn.addr), .trans = 0, .out = {.null = false, .out = out}, .autState = autState}; StreamState s = {
.node = fstGetNode(sws->fst, trn.addr), .trans = 0, .out = {.null = false, .out = out}, .autState = autState};
taosArrayPush(sws->stack, &s); taosArrayPush(sws->stack, &s);
return true; return true;
} }
...@@ -1260,9 +1270,7 @@ StreamWithStateResult* streamWithStateNextWith(StreamWithState* sws, StreamCallb ...@@ -1260,9 +1270,7 @@ StreamWithStateResult* streamWithStateNextWith(StreamWithState* sws, StreamCallb
size_t isz = taosArrayGetSize(sws->inp); size_t isz = taosArrayGetSize(sws->inp);
uint8_t* buf = (uint8_t*)malloc(isz * sizeof(uint8_t)); uint8_t* buf = (uint8_t*)malloc(isz * sizeof(uint8_t));
for (uint32_t i = 0; i < isz; i++) { for (uint32_t i = 0; i < isz; i++) { buf[i] = *(uint8_t*)taosArrayGet(sws->inp, i); }
buf[i] = *(uint8_t*)taosArrayGet(sws->inp, i);
}
FstSlice slice = fstSliceCreate(buf, taosArrayGetSize(sws->inp)); FstSlice slice = fstSliceCreate(buf, taosArrayGetSize(sws->inp));
if (fstBoundWithDataExceededBy(sws->endAt, &slice)) { if (fstBoundWithDataExceededBy(sws->endAt, &slice)) {
taosArrayDestroyEx(sws->stack, streamStateDestroy); taosArrayDestroyEx(sws->stack, streamStateDestroy);
...@@ -1327,8 +1335,8 @@ FstStreamBuilder* fstStreamBuilderCreate(Fst* fst, AutomationCtx* aut) { ...@@ -1327,8 +1335,8 @@ FstStreamBuilder* fstStreamBuilderCreate(Fst* fst, AutomationCtx* aut) {
} }
void fstStreamBuilderDestroy(FstStreamBuilder* b) { void fstStreamBuilderDestroy(FstStreamBuilder* b) {
fstSliceDestroy(&b->min->data); fstSliceDestroy(&b->min->data);
tfree(b->min);
fstSliceDestroy(&b->max->data); fstSliceDestroy(&b->max->data);
tfree(b->min);
tfree(b->max); tfree(b->max);
free(b); free(b);
} }
......
...@@ -17,9 +17,7 @@ ...@@ -17,9 +17,7 @@
StartWithStateValue* startWithStateValueCreate(StartWithStateKind kind, ValueType ty, void* val) { StartWithStateValue* startWithStateValueCreate(StartWithStateKind kind, ValueType ty, void* val) {
StartWithStateValue* nsv = calloc(1, sizeof(StartWithStateValue)); StartWithStateValue* nsv = calloc(1, sizeof(StartWithStateValue));
if (nsv == NULL) { if (nsv == NULL) { return NULL; }
return NULL;
}
nsv->kind = kind; nsv->kind = kind;
nsv->type = ty; nsv->type = ty;
...@@ -37,9 +35,7 @@ StartWithStateValue* startWithStateValueCreate(StartWithStateKind kind, ValueTyp ...@@ -37,9 +35,7 @@ StartWithStateValue* startWithStateValueCreate(StartWithStateKind kind, ValueTyp
} }
void startWithStateValueDestroy(void* val) { void startWithStateValueDestroy(void* val) {
StartWithStateValue* sv = (StartWithStateValue*)val; StartWithStateValue* sv = (StartWithStateValue*)val;
if (sv == NULL) { if (sv == NULL) { return; }
return;
}
if (sv->type == FST_INT) { if (sv->type == FST_INT) {
// //
...@@ -52,9 +48,7 @@ void startWithStateValueDestroy(void* val) { ...@@ -52,9 +48,7 @@ void startWithStateValueDestroy(void* val) {
} }
StartWithStateValue* startWithStateValueDump(StartWithStateValue* sv) { StartWithStateValue* startWithStateValueDump(StartWithStateValue* sv) {
StartWithStateValue* nsv = calloc(1, sizeof(StartWithStateValue)); StartWithStateValue* nsv = calloc(1, sizeof(StartWithStateValue));
if (nsv == NULL) { if (nsv == NULL) { return NULL; }
return NULL;
}
nsv->kind = sv->kind; nsv->kind = sv->kind;
nsv->type = sv->type; nsv->type = sv->type;
...@@ -94,14 +88,10 @@ static bool prefixCanMatch(AutomationCtx* ctx, void* sv) { ...@@ -94,14 +88,10 @@ static bool prefixCanMatch(AutomationCtx* ctx, void* sv) {
static bool prefixWillAlwaysMatch(AutomationCtx* ctx, void* state) { return true; } static bool prefixWillAlwaysMatch(AutomationCtx* ctx, void* state) { return true; }
static void* prefixAccept(AutomationCtx* ctx, void* state, uint8_t byte) { static void* prefixAccept(AutomationCtx* ctx, void* state, uint8_t byte) {
StartWithStateValue* ssv = (StartWithStateValue*)state; StartWithStateValue* ssv = (StartWithStateValue*)state;
if (ssv == NULL || ctx == NULL) { if (ssv == NULL || ctx == NULL) { return NULL; }
return NULL;
}
char* data = ctx->data; char* data = ctx->data;
if (ssv->kind == Done) { if (ssv->kind == Done) { return startWithStateValueCreate(Done, FST_INT, &ssv->val); }
return startWithStateValueCreate(Done, FST_INT, &ssv->val);
}
if ((strlen(data) > ssv->val) && data[ssv->val] == byte) { if ((strlen(data) > ssv->val) && data[ssv->val] == byte) {
int val = ssv->val + 1; int val = ssv->val + 1;
...@@ -138,9 +128,7 @@ AutomationFunc automFuncs[] = { ...@@ -138,9 +128,7 @@ AutomationFunc automFuncs[] = {
AutomationCtx* automCtxCreate(void* data, AutomationType atype) { AutomationCtx* automCtxCreate(void* data, AutomationType atype) {
AutomationCtx* ctx = calloc(1, sizeof(AutomationCtx)); AutomationCtx* ctx = calloc(1, sizeof(AutomationCtx));
if (ctx == NULL) { if (ctx == NULL) { return NULL; }
return NULL;
}
StartWithStateValue* sv = NULL; StartWithStateValue* sv = NULL;
if (atype == AUTOMATION_ALWAYS) { if (atype == AUTOMATION_ALWAYS) {
......
...@@ -53,13 +53,6 @@ static int tfileParseFileName(const char* filename, uint64_t* suid, int* colId, ...@@ -53,13 +53,6 @@ static int tfileParseFileName(const char* filename, uint64_t* suid, int* colId,
static void tfileGenFileName(char* filename, uint64_t suid, int colId, int version); static void tfileGenFileName(char* filename, uint64_t suid, int colId, int version);
static void tfileSerialCacheKey(TFileCacheKey* key, char* buf); static void tfileSerialCacheKey(TFileCacheKey* key, char* buf);
static TFileReader* tfileReaderCreateImpl(WriterCtx* ctx) {
TFileReader* reader = tfileReaderCreate(ctx);
tfileReaderRef(reader);
// tfileSerialCacheKey(&key, buf);
return reader;
}
TFileCache* tfileCacheCreate(const char* path) { TFileCache* tfileCacheCreate(const char* path) {
TFileCache* tcache = calloc(1, sizeof(TFileCache)); TFileCache* tcache = calloc(1, sizeof(TFileCache));
if (tcache == NULL) { return NULL; } if (tcache == NULL) { return NULL; }
...@@ -88,13 +81,16 @@ TFileCache* tfileCacheCreate(const char* path) { ...@@ -88,13 +81,16 @@ TFileCache* tfileCacheCreate(const char* path) {
} }
char buf[128] = {0}; char buf[128] = {0};
TFileReader* reader = tfileReaderCreateImpl(wc); TFileReader* reader = tfileReaderCreate(wc);
TFileHeader* header = &reader->header; TFileHeader* header = &reader->header;
TFileCacheKey key = {.suid = header->suid, TFileCacheKey key = {.suid = header->suid,
.colName = header->colName, .colName = header->colName,
.nColName = strlen(header->colName), .nColName = strlen(header->colName),
.colType = header->colType}; .colType = header->colType};
tfileSerialCacheKey(&key, buf); tfileSerialCacheKey(&key, buf);
tfileReaderRef(reader);
// indexTable
taosHashPut(tcache->tableCache, buf, strlen(buf), &reader, sizeof(void*)); taosHashPut(tcache->tableCache, buf, strlen(buf), &reader, sizeof(void*));
} }
taosArrayDestroyEx(files, tfileDestroyFileName); taosArrayDestroyEx(files, tfileDestroyFileName);
...@@ -139,6 +135,7 @@ void tfileCachePut(TFileCache* tcache, TFileCacheKey* key, TFileReader* reader) ...@@ -139,6 +135,7 @@ void tfileCachePut(TFileCache* tcache, TFileCacheKey* key, TFileReader* reader)
if (p != NULL) { if (p != NULL) {
TFileReader* oldReader = *p; TFileReader* oldReader = *p;
taosHashRemove(tcache->tableCache, buf, strlen(buf)); taosHashRemove(tcache->tableCache, buf, strlen(buf));
oldReader->remove = true;
tfileReaderUnRef(oldReader); tfileReaderUnRef(oldReader);
} }
...@@ -172,7 +169,7 @@ void tfileReaderDestroy(TFileReader* reader) { ...@@ -172,7 +169,7 @@ void tfileReaderDestroy(TFileReader* reader) {
if (reader == NULL) { return; } if (reader == NULL) { return; }
// T_REF_INC(reader); // T_REF_INC(reader);
fstDestroy(reader->fst); fstDestroy(reader->fst);
writerCtxDestroy(reader->ctx, true); writerCtxDestroy(reader->ctx, reader->remove);
free(reader); free(reader);
} }
...@@ -232,7 +229,7 @@ TFileReader* tfileReaderOpen(char* path, uint64_t suid, int32_t version, const c ...@@ -232,7 +229,7 @@ TFileReader* tfileReaderOpen(char* path, uint64_t suid, int32_t version, const c
WriterCtx* wc = writerCtxCreate(TFile, fullname, true, 1024 * 1024 * 1024); WriterCtx* wc = writerCtxCreate(TFile, fullname, true, 1024 * 1024 * 1024);
if (wc == NULL) { return NULL; } if (wc == NULL) { return NULL; }
TFileReader* reader = tfileReaderCreateImpl(wc); TFileReader* reader = tfileReaderCreate(wc);
return reader; return reader;
// tfileSerialCacheKey(&key, buf); // tfileSerialCacheKey(&key, buf);
...@@ -330,13 +327,16 @@ int tfileWriterPut(TFileWriter* tw, void* data, bool order) { ...@@ -330,13 +327,16 @@ int tfileWriterPut(TFileWriter* tw, void* data, bool order) {
return -1; return -1;
} }
// write fst // write fst
indexError("--------Begin----------------");
for (size_t i = 0; i < sz; i++) { for (size_t i = 0; i < sz; i++) {
// TODO, fst batch write later // TODO, fst batch write later
TFileValue* v = taosArrayGetP((SArray*)data, i); TFileValue* v = taosArrayGetP((SArray*)data, i);
if (tfileWriteData(tw, v) == 0) { if (tfileWriteData(tw, v) == 0) {
// //
} }
indexError("data: %s, offset: %d len: %d", v->colVal, v->offset, (int)taosArrayGetSize(v->tableId));
} }
indexError("--------End----------------");
fstBuilderFinish(tw->fb); fstBuilderFinish(tw->fb);
fstBuilderDestroy(tw->fb); fstBuilderDestroy(tw->fb);
tw->fb = NULL; tw->fb = NULL;
...@@ -360,7 +360,10 @@ IndexTFile* indexTFileCreate(const char* path) { ...@@ -360,7 +360,10 @@ IndexTFile* indexTFileCreate(const char* path) {
tfile->cache = tfileCacheCreate(path); tfile->cache = tfileCacheCreate(path);
return tfile; return tfile;
} }
void IndexTFileDestroy(IndexTFile* tfile) { free(tfile); } void IndexTFileDestroy(IndexTFile* tfile) {
tfileCacheDestroy(tfile->cache);
free(tfile);
}
int indexTFileSearch(void* tfile, SIndexTermQuery* query, SArray* result) { int indexTFileSearch(void* tfile, SIndexTermQuery* query, SArray* result) {
int ret = -1; int ret = -1;
......
#include <iostream> #include <iostream>
#include <string> #include <string>
#include <thread>
#include <vector> #include <vector>
#include "index.h" #include "index.h"
#include "indexInt.h" #include "indexInt.h"
...@@ -42,7 +43,8 @@ class FstWriter { ...@@ -42,7 +43,8 @@ class FstWriter {
class FstReadMemory { class FstReadMemory {
public: public:
FstReadMemory(size_t size) { FstReadMemory(size_t size, const std::string& fileName = fileName) {
tfInit();
_wc = writerCtxCreate(TFile, fileName.c_str(), true, 64 * 1024); _wc = writerCtxCreate(TFile, fileName.c_str(), true, 64 * 1024);
_w = fstCountingWriterCreate(_wc); _w = fstCountingWriterCreate(_wc);
_size = size; _size = size;
...@@ -101,6 +103,7 @@ class FstReadMemory { ...@@ -101,6 +103,7 @@ class FstReadMemory {
fstDestroy(_fst); fstDestroy(_fst);
fstSliceDestroy(&_s); fstSliceDestroy(&_s);
writerCtxDestroy(_wc, false); writerCtxDestroy(_wc, false);
tfCleanup();
} }
private: private:
...@@ -165,8 +168,44 @@ void checkFstCheckIterator() { ...@@ -165,8 +168,44 @@ void checkFstCheckIterator() {
delete m; delete m;
tfCleanup(); tfCleanup();
} }
int main() {
checkFstCheckIterator(); void fst_get(Fst* fst) {
for (int i = 0; i < 10000; i++) {
std::string term = "Hello";
FstSlice key = fstSliceCreate((uint8_t*)term.c_str(), term.size());
uint64_t offset = 0;
bool ret = fstGet(fst, &key, &offset);
if (ret == false) {
std::cout << "not found" << std::endl;
} else {
std::cout << "found value:" << offset << std::endl;
}
}
}
#define NUM_OF_THREAD 10
void validateTFile(char* arg) {
tfInit();
std::thread threads[NUM_OF_THREAD];
// std::vector<std::thread> threads;
TFileReader* reader = tfileReaderOpen(arg, 0, 8417, "tag1");
for (int i = 0; i < NUM_OF_THREAD; i++) {
threads[i] = std::thread(fst_get, reader->fst);
// threads.push_back(fst_get, reader->fst);
// std::thread t(fst_get, reader->fst);
}
for (int i = 0; i < NUM_OF_THREAD; i++) {
// wait join
threads[i].join();
}
tfCleanup();
}
int main(int argc, char* argv[]) {
if (argc > 1) { validateTFile(argv[1]); }
// checkFstCheckIterator();
// checkFstPrefixSearch(); // checkFstPrefixSearch();
return 1; return 1;
} }
...@@ -332,6 +332,8 @@ class TFileObj { ...@@ -332,6 +332,8 @@ class TFileObj {
TFileObj(const std::string& path = "/tmp/tindex", const std::string& colName = "voltage") TFileObj(const std::string& path = "/tmp/tindex", const std::string& colName = "voltage")
: path_(path), colName_(colName) { : path_(path), colName_(colName) {
colId_ = 10; colId_ = 10;
reader_ = NULL;
writer_ = NULL;
// Do Nothing // Do Nothing
// //
} }
...@@ -527,6 +529,7 @@ TEST_F(IndexCacheEnv, cache_test) { ...@@ -527,6 +529,7 @@ TEST_F(IndexCacheEnv, cache_test) {
SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
colVal.c_str(), colVal.size()); colVal.c_str(), colVal.size());
coj->Put(term, colId, version++, suid++); coj->Put(term, colId, version++, suid++);
// indexTermDestry(term);
} }
{ {
std::string colVal("v3"); std::string colVal("v3");
...@@ -687,7 +690,7 @@ class IndexEnv2 : public ::testing::Test { ...@@ -687,7 +690,7 @@ class IndexEnv2 : public ::testing::Test {
IndexObj* index; IndexObj* index;
}; };
TEST_F(IndexEnv2, testIndexOpen) { TEST_F(IndexEnv2, testIndexOpen) {
std::string path = "/tmp"; std::string path = "/tmp/test";
if (index->Init(path) != 0) { if (index->Init(path) != 0) {
std::cout << "failed to init index" << std::endl; std::cout << "failed to init index" << std::endl;
exit(1); exit(1);
...@@ -723,10 +726,24 @@ TEST_F(IndexEnv2, testIndexOpen) { ...@@ -723,10 +726,24 @@ TEST_F(IndexEnv2, testIndexOpen) {
} }
indexMultiTermDestroy(terms); indexMultiTermDestroy(terms);
} }
{ {
size_t size = 200;
std::string colName("tag1"), colVal("Hello"); std::string colName("tag1"), colVal("Hello");
SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
colVal.c_str(), colVal.size());
SIndexMultiTerm* terms = indexMultiTermCreate();
indexMultiTermAdd(terms, term);
for (size_t i = size * 3; i < size * 4; i++) {
int tableId = i;
int ret = index->Put(terms, tableId);
assert(ret == 0);
}
indexMultiTermDestroy(terms);
}
{
std::string colName("tag1"), colVal("Hello");
SIndexMultiTermQuery* mq = indexMultiTermQueryCreate(MUST); SIndexMultiTermQuery* mq = indexMultiTermQueryCreate(MUST);
SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
colVal.c_str(), colVal.size()); colVal.c_str(), colVal.size());
...@@ -735,16 +752,16 @@ TEST_F(IndexEnv2, testIndexOpen) { ...@@ -735,16 +752,16 @@ TEST_F(IndexEnv2, testIndexOpen) {
SArray* result = (SArray*)taosArrayInit(1, sizeof(uint64_t)); SArray* result = (SArray*)taosArrayInit(1, sizeof(uint64_t));
index->Search(mq, result); index->Search(mq, result);
std::cout << "target size: " << taosArrayGetSize(result) << std::endl; std::cout << "target size: " << taosArrayGetSize(result) << std::endl;
// assert(taosArrayGetSize(result) == targetSize); assert(taosArrayGetSize(result) == 400);
} }
} }
TEST_F(IndexEnv2, testIndex_TrigeFlush) { TEST_F(IndexEnv2, testIndex_TrigeFlush) {
std::string path = "/tmp"; std::string path = "/tmp/test";
if (index->Init(path) != 0) {} if (index->Init(path) != 0) {}
int numOfTable = 100 * 10000; int numOfTable = 100 * 10000;
index->WriteMillonData("tag1", "Hello world", numOfTable); index->WriteMillonData("tag1", "Hello", numOfTable);
int target = index->SearchOne("tag1", "Hello world"); int target = index->SearchOne("tag1", "Hello");
assert(numOfTable == target); assert(numOfTable == target);
} }
TEST_F(IndexEnv2, testIndex_serarch_cache_and_tfile) { TEST_F(IndexEnv2, testIndex_serarch_cache_and_tfile) {
...@@ -769,4 +786,7 @@ TEST_F(IndexEnv2, testIndex_performance) { ...@@ -769,4 +786,7 @@ TEST_F(IndexEnv2, testIndex_performance) {
std::string path = "/tmp"; std::string path = "/tmp";
if (index->Init(path) != 0) {} if (index->Init(path) != 0) {}
} }
TEST_F(IndexEnv2, testIndexMultiTag) {} TEST_F(IndexEnv2, testIndexMultiTag) {
std::string path = "/tmp";
if (index->Init(path) != 0) {}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册