提交 79adde0b 编写于 作者: H Hongze Cheng

Merge branch '3.0' of github.com:taosdata/TDengine into feature/vnode

......@@ -38,6 +38,7 @@ typedef struct WriterCtx {
int fd;
bool readOnly;
char buf[256];
int size;
} file;
struct {
int32_t capa;
......
......@@ -73,6 +73,7 @@ int indexOpen(SIndexOpts* opts, const char* path, SIndex** index) {
#ifdef USE_INVERTED_INDEX
// sIdx->cache = (void*)indexCacheCreate(sIdx);
sIdx->tindex = indexTFileCreate(path);
if (sIdx->tindex == NULL) { goto END; }
sIdx->colObj = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
sIdx->cVersion = 1;
sIdx->path = calloc(1, strlen(path) + 1);
......@@ -83,6 +84,8 @@ int indexOpen(SIndexOpts* opts, const char* path, SIndex** index) {
return 0;
#endif
END:
if (sIdx != NULL) { indexClose(sIdx); }
*index = NULL;
return -1;
......@@ -135,7 +138,7 @@ int indexPut(SIndex* index, SIndexMultiTerm* fVals, uint64_t uid) {
SIndexTerm* p = taosArrayGetP(fVals, i);
char buf[128] = {0};
ICacheKey key = {.suid = p->suid, .colName = p->colName};
ICacheKey key = {.suid = p->suid, .colName = p->colName, .nColName = strlen(p->colName)};
int32_t sz = indexSerialCacheKey(&key, buf);
IndexCache** cache = taosHashGet(index->colObj, buf, sz);
......@@ -150,7 +153,7 @@ int indexPut(SIndex* index, SIndexMultiTerm* fVals, uint64_t uid) {
SIndexTerm* p = taosArrayGetP(fVals, i);
char buf[128] = {0};
ICacheKey key = {.suid = p->suid, .colName = p->colName};
ICacheKey key = {.suid = p->suid, .colName = p->colName, .nColName = strlen(p->colName)};
int32_t sz = indexSerialCacheKey(&key, buf);
IndexCache** cache = taosHashGet(index->colObj, buf, sz);
......@@ -212,7 +215,7 @@ int indexSearch(SIndex* index, SIndexMultiTermQuery* multiQuerys, SArray* result
indexInterResultsDestroy(interResults);
#endif
return 1;
return 0;
}
int indexDelete(SIndex* index, SIndexMultiTermQuery* query) {
......@@ -310,7 +313,7 @@ static int indexTermSearch(SIndex* sIdx, SIndexTermQuery* query, SArray** result
pthread_mutex_lock(&sIdx->mtx);
char buf[128] = {0};
ICacheKey key = {.suid = term->suid, .colName = term->colName};
ICacheKey key = {.suid = term->suid, .colName = term->colName, .nColName = strlen(term->colName)};
int32_t sz = indexSerialCacheKey(&key, buf);
IndexCache** pCache = taosHashGet(sIdx->colObj, buf, sz);
......
......@@ -20,7 +20,7 @@
#define MAX_INDEX_KEY_LEN 256 // test only, change later
#define MEM_TERM_LIMIT 10 * 10000
#define MEM_TERM_LIMIT 5 * 10000
// ref index_cache.h:22
//#define CACHE_KEY_LEN(p) \
// (sizeof(int32_t) + sizeof(uint16_t) + sizeof(p->colType) + sizeof(p->nColVal) + p->nColVal + sizeof(uint64_t) +
......
......@@ -72,9 +72,17 @@ WriterCtx* writerCtxCreate(WriterType type, const char* path, bool readOnly, int
if (readOnly == false) {
// ctx->file.fd = open(path, O_WRONLY | O_CREAT | O_APPEND, S_IRWXU | S_IRWXG | S_IRWXO);
ctx->file.fd = tfOpenCreateWriteAppend(path);
struct stat fstat;
stat(path, &fstat);
ctx->file.size = fstat.st_size;
} else {
// ctx->file.fd = open(path, O_RDONLY, S_IRWXU | S_IRWXG | S_IRWXO);
ctx->file.fd = tfOpenRead(path);
struct stat fstat;
stat(path, &fstat);
ctx->file.size = fstat.st_size;
}
memcpy(ctx->file.buf, path, strlen(path));
if (ctx->file.fd < 0) {
......@@ -104,6 +112,7 @@ void writerCtxDestroy(WriterCtx* ctx, bool remove) {
free(ctx->mem.buf);
} else {
tfClose(ctx->file.fd);
ctx->flush(ctx);
if (remove) { unlink(ctx->file.buf); }
}
free(ctx);
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
p *
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
......@@ -45,12 +45,13 @@ static int tfileReaderLoadHeader(TFileReader* reader);
static int tfileReaderLoadFst(TFileReader* reader);
static int tfileReaderLoadTableIds(TFileReader* reader, int32_t offset, SArray* result);
static int tfileGetFileList(const char* path, SArray* result);
static int tfileRmExpireFile(SArray* result);
static void tfileDestroyFileName(void* elem);
static int tfileCompare(const void* a, const void* b);
static int tfileParseFileName(const char* filename, uint64_t* suid, int* colId, int* version);
static void tfileGenFileName(char* filename, uint64_t suid, int colId, int version);
static SArray* tfileGetFileList(const char* path);
static int tfileRmExpireFile(SArray* result);
static void tfileDestroyFileName(void* elem);
static int tfileCompare(const void* a, const void* b);
static int tfileParseFileName(const char* filename, uint64_t* suid, char* col, int* version);
static void tfileGenFileName(char* filename, uint64_t suid, const char* col, int version);
static void tfileGenFileFullName(char* fullname, const char* path, uint64_t suid, const char* col, int32_t version);
TFileCache* tfileCacheCreate(const char* path) {
TFileCache* tcache = calloc(1, sizeof(TFileCache));
......@@ -59,21 +60,24 @@ TFileCache* tfileCacheCreate(const char* path) {
tcache->tableCache = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
tcache->capacity = 64;
SArray* files = taosArrayInit(4, sizeof(void*));
tfileGetFileList(path, files);
taosArraySort(files, tfileCompare);
tfileRmExpireFile(files);
SArray* files = tfileGetFileList(path);
uint64_t suid;
int32_t colId, version;
for (size_t i = 0; i < taosArrayGetSize(files); i++) {
char* file = taosArrayGetP(files, i);
if (0 != tfileParseFileName(file, &suid, (int*)&colId, (int*)&version)) {
// refactor later, use colname and version info
char colName[256] = {0};
if (0 != tfileParseFileName(file, &suid, colName, (int*)&version)) {
indexInfo("try parse invalid file: %s, skip it", file);
continue;
}
WriterCtx* wc = writerCtxCreate(TFile, file, true, 1024 * 1024 * 64);
char fullName[256] = {0};
sprintf(fullName, "%s/%s", path, file);
WriterCtx* wc = writerCtxCreate(TFile, fullName, true, 1024 * 1024 * 64);
if (wc == NULL) {
indexError("failed to open index:%s", file);
goto End;
......@@ -200,12 +204,9 @@ int tfileReaderSearch(TFileReader* reader, SIndexTermQuery* query, SArray* resul
}
TFileWriter* tfileWriterOpen(char* path, uint64_t suid, int32_t version, const char* colName, uint8_t colType) {
char filename[128] = {0};
int32_t coldId = 1;
tfileGenFileName(filename, suid, coldId, version);
char fullname[256] = {0};
snprintf(fullname, sizeof(fullname), "%s/%s", path, filename);
tfileGenFileFullName(fullname, path, suid, colName, version);
// indexInfo("open write file name %s", fullname);
WriterCtx* wcx = writerCtxCreate(TFile, fullname, false, 1024 * 1024 * 64);
if (wcx == NULL) { return NULL; }
......@@ -218,13 +219,11 @@ TFileWriter* tfileWriterOpen(char* path, uint64_t suid, int32_t version, const c
return tfileWriterCreate(wcx, &tfh);
}
TFileReader* tfileReaderOpen(char* path, uint64_t suid, int32_t version, const char* colName) {
char filename[128] = {0};
int32_t coldId = 1;
tfileGenFileName(filename, suid, coldId, version);
char fullname[256] = {0};
snprintf(fullname, sizeof(fullname), "%s/%s", path, filename);
tfileGenFileFullName(fullname, path, suid, colName, version);
WriterCtx* wc = writerCtxCreate(TFile, fullname, true, 1024 * 1024 * 1024);
// indexInfo("open read file name:%s, size: %d", wc->file.buf, wc->file.size);
if (wc == NULL) { return NULL; }
TFileReader* reader = tfileReaderCreate(wc);
......@@ -324,7 +323,6 @@ int tfileWriterPut(TFileWriter* tw, void* data, bool order) {
}
// write data
indexError("--------Begin----------------");
for (size_t i = 0; i < sz; i++) {
// TODO, fst batch write later
TFileValue* v = taosArrayGetP((SArray*)data, i);
......@@ -332,11 +330,10 @@ int tfileWriterPut(TFileWriter* tw, void* data, bool order) {
indexError("failed to write data: %s, offset: %d len: %d", v->colVal, v->offset,
(int)taosArrayGetSize(v->tableId));
} else {
indexInfo("success to write data: %s, offset: %d len: %d", v->colVal, v->offset,
(int)taosArrayGetSize(v->tableId));
// indexInfo("success to write data: %s, offset: %d len: %d", v->colVal, v->offset,
// (int)taosArrayGetSize(v->tableId));
}
}
indexError("--------End----------------");
fstBuilderFinish(tw->fb);
fstBuilderDestroy(tw->fb);
tw->fb = NULL;
......@@ -361,6 +358,7 @@ IndexTFile* indexTFileCreate(const char* path) {
return tfile;
}
void indexTFileDestroy(IndexTFile* tfile) {
if (tfile == NULL) { return; }
tfileCacheDestroy(tfile->cache);
free(tfile);
}
......@@ -550,6 +548,9 @@ static int tfileReaderLoadHeader(TFileReader* reader) {
//
indexError("actual Read: %d, to read: %d, errno: %d, filefd: %d, filename: %s", (int)(nread), (int)sizeof(buf),
errno, reader->ctx->file.fd, reader->ctx->file.buf);
} else {
indexError("actual Read: %d, to read: %d, errno: %d, filefd: %d, filename: %s", (int)(nread), (int)sizeof(buf),
errno, reader->ctx->file.fd, reader->ctx->file.buf);
}
// assert(nread == sizeof(buf));
memcpy(&reader->header, buf, sizeof(buf));
......@@ -558,13 +559,14 @@ static int tfileReaderLoadHeader(TFileReader* reader) {
}
static int tfileReaderLoadFst(TFileReader* reader) {
// current load fst into memory, refactor it later
static int FST_MAX_SIZE = 64 * 1024;
static int FST_MAX_SIZE = 64 * 1024 * 1024;
char* buf = calloc(1, sizeof(char) * FST_MAX_SIZE);
if (buf == NULL) { return -1; }
WriterCtx* ctx = reader->ctx;
int32_t nread = ctx->readFrom(ctx, buf, FST_MAX_SIZE, reader->header.fstOffset);
indexError("nread = %d, and fst offset=%d, filename: %s ", nread, reader->header.fstOffset, ctx->file.buf);
// we assuse fst size less than FST_MAX_SIZE
assert(nread > 0 && nread < FST_MAX_SIZE);
......@@ -608,19 +610,26 @@ void tfileReaderUnRef(TFileReader* reader) {
}
}
static int tfileGetFileList(const char* path, SArray* result) {
static SArray* tfileGetFileList(const char* path) {
SArray* files = taosArrayInit(4, sizeof(void*));
DIR* dir = opendir(path);
if (NULL == dir) { return -1; }
if (NULL == dir) { return NULL; }
struct dirent* entry;
while ((entry = readdir(dir)) != NULL) {
if (entry->d_type && DT_DIR) { continue; }
size_t len = strlen(entry->d_name);
char* buf = calloc(1, len + 1);
memcpy(buf, entry->d_name, len);
taosArrayPush(result, &buf);
taosArrayPush(files, &buf);
}
closedir(dir);
return 0;
taosArraySort(files, tfileCompare);
tfileRmExpireFile(files);
return files;
}
static int tfileRmExpireFile(SArray* result) {
// TODO(yihao): remove expire tindex after restart
......@@ -641,15 +650,21 @@ static int tfileCompare(const void* a, const void* b) {
if (ret == 0) { return ret; }
return ret < 0 ? -1 : 1;
}
// tfile name suid-colId-version.tindex
static void tfileGenFileName(char* filename, uint64_t suid, int colId, int version) {
sprintf(filename, "%" PRIu64 "-%d-%d.tindex", suid, colId, version);
return;
}
static int tfileParseFileName(const char* filename, uint64_t* suid, int* colId, int* version) {
if (3 == sscanf(filename, "%" PRIu64 "-%d-%d.tindex", suid, colId, version)) {
static int tfileParseFileName(const char* filename, uint64_t* suid, char* col, int* version) {
if (3 == sscanf(filename, "%" PRIu64 "-%[^-]-%d.tindex", suid, col, version)) {
// read suid & colid & version success
return 0;
}
return -1;
}
// tfile name suid-colId-version.tindex
static void tfileGenFileName(char* filename, uint64_t suid, const char* col, int version) {
sprintf(filename, "%" PRIu64 "-%s-%d.tindex", suid, col, version);
return;
}
static void tfileGenFileFullName(char* fullname, const char* path, uint64_t suid, const char* col, int32_t version) {
char filename[128] = {0};
tfileGenFileName(filename, suid, col, version);
sprintf(fullname, "%s/%s", path, filename);
}
......@@ -13,6 +13,7 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <gtest/gtest.h>
#include <algorithm>
#include <iostream>
#include <string>
#include <thread>
......@@ -638,7 +639,7 @@ class IndexObj {
indexInit();
}
int Init(const std::string& dir) {
taosRemoveDir(dir.c_str());
// taosRemoveDir(dir.c_str());
taosMkDir(dir.c_str());
int ret = indexOpen(&opts, dir.c_str(), &idx);
if (ret != 0) {
......@@ -663,10 +664,11 @@ class IndexObj {
int WriteMultiMillonData(const std::string& colName, const std::string& colVal = "Hello world",
size_t numOfTable = 100 * 10000) {
std::string tColVal = colVal;
size_t colValSize = tColVal.size();
for (int i = 0; i < numOfTable; i++) {
tColVal[tColVal.size() - 1] = 'a' + i % 26;
tColVal[i % colValSize] = 'a' + i % 26;
SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
colVal.c_str(), colVal.size());
tColVal.c_str(), tColVal.size());
SIndexMultiTerm* terms = indexMultiTermCreate();
indexMultiTermAdd(terms, term);
for (size_t i = 0; i < 10; i++) {
......@@ -695,7 +697,13 @@ class IndexObj {
indexMultiTermQueryAdd(mq, term, QUERY_TERM);
SArray* result = (SArray*)taosArrayInit(1, sizeof(uint64_t));
if (Search(mq, result) == 0) { std::cout << "search one successfully" << std::endl; }
int64_t s = taosGetTimestampUs();
if (Search(mq, result) == 0) {
int64_t e = taosGetTimestampUs();
std::cout << "search one successfully and time cost:" << e - s << std::endl;
} else {
}
int sz = taosArrayGetSize(result);
indexMultiTermQueryDestroy(mq);
taosArrayDestroy(result);
......@@ -810,7 +818,7 @@ TEST_F(IndexEnv2, testIndexOpen) {
}
TEST_F(IndexEnv2, testIndex_TrigeFlush) {
std::string path = "/tmp/test1";
std::string path = "/tmp/testxxx";
if (index->Init(path) != 0) {
// r
std::cout << "failed to init" << std::endl;
......@@ -826,6 +834,10 @@ static void write_and_search(IndexObj* idx) {
std::string colName("tag1"), colVal("Hello");
int target = idx->SearchOne("tag1", "Hello");
std::cout << "search: " << target << std::endl;
target = idx->SearchOne("tag2", "Test");
std::cout << "search: " << target << std::endl;
idx->PutOne(colName, colVal);
}
TEST_F(IndexEnv2, testIndex_serarch_cache_and_tfile) {
......@@ -833,7 +845,10 @@ TEST_F(IndexEnv2, testIndex_serarch_cache_and_tfile) {
if (index->Init(path) != 0) {
// opt
}
index->WriteMultiMillonData("tag1", "Hello", 200000);
index->PutOne("tag1", "Hello");
index->PutOne("tag2", "Test");
index->WriteMultiMillonData("tag1", "Hello", 50 * 10000);
index->WriteMultiMillonData("tag2", "Test", 50 * 10000);
std::thread threads[NUM_OF_THREAD];
for (int i = 0; i < NUM_OF_THREAD; i++) {
......@@ -847,15 +862,15 @@ TEST_F(IndexEnv2, testIndex_serarch_cache_and_tfile) {
}
TEST_F(IndexEnv2, testIndex_restart) {
std::string path = "/tmp";
std::string path = "/tmp/test1";
if (index->Init(path) != 0) {}
}
TEST_F(IndexEnv2, testIndex_performance) {
std::string path = "/tmp";
std::string path = "/tmp/test2";
if (index->Init(path) != 0) {}
}
TEST_F(IndexEnv2, testIndexMultiTag) {
std::string path = "/tmp";
std::string path = "/tmp/test3";
if (index->Init(path) != 0) {}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册