提交 f5c13c13 编写于 作者: dengyihao's avatar dengyihao

support json

上级 b02b53eb
...@@ -29,6 +29,12 @@ typedef struct SIndexOpts SIndexOpts; ...@@ -29,6 +29,12 @@ typedef struct SIndexOpts SIndexOpts;
typedef struct SIndexMultiTermQuery SIndexMultiTermQuery; typedef struct SIndexMultiTermQuery SIndexMultiTermQuery;
typedef struct SArray SIndexMultiTerm; typedef struct SArray SIndexMultiTerm;
typedef struct SIndex SIndexJson;
typedef struct SIndexTerm SIndexJsonTerm;
typedef struct SIndexOpts SIndexJsonOpts;
typedef struct SIndexMultiTermQuery SIndexJsonMultiTermQuery;
typedef struct SArray SIndexJsonMultiTerm;
typedef enum { typedef enum {
ADD_VALUE, // add index colume value ADD_VALUE, // add index colume value
DEL_VALUE, // delete index column value DEL_VALUE, // delete index column value
...@@ -39,24 +45,108 @@ typedef enum { ...@@ -39,24 +45,108 @@ typedef enum {
} SIndexOperOnColumn; } SIndexOperOnColumn;
typedef enum { MUST = 0, SHOULD = 1, NOT = 2 } EIndexOperatorType; typedef enum { MUST = 0, SHOULD = 1, NOT = 2 } EIndexOperatorType;
typedef enum { QUERY_TERM = 0, QUERY_PREFIX = 1, QUERY_SUFFIX = 2, QUERY_REGEX = 3 } EIndexQueryType; typedef enum { QUERY_TERM = 0, QUERY_PREFIX = 1, QUERY_SUFFIX = 2, QUERY_REGEX = 3, QUERY_RANGE = 4 } EIndexQueryType;
/* /*
* @param: oper * create multi query
* * @param oper (input, relation between querys)
*/ */
SIndexMultiTermQuery* indexMultiTermQueryCreate(EIndexOperatorType oper); SIndexMultiTermQuery* indexMultiTermQueryCreate(EIndexOperatorType oper);
void indexMultiTermQueryDestroy(SIndexMultiTermQuery* pQuery);
int indexMultiTermQueryAdd(SIndexMultiTermQuery* pQuery, SIndexTerm* term, EIndexQueryType type);
/* /*
* @param: * destroy multi query
* @param: * @param pQuery (input, multi-query-object to be destory)
*/
void indexMultiTermQueryDestroy(SIndexMultiTermQuery* pQuery);
/*
* add query to multi query
* @param pQuery (input, multi-query-object)
* @param term (input, single query term)
* @param type (input, single query type)
* @return error code
*/
int indexMultiTermQueryAdd(SIndexMultiTermQuery* pQuery, SIndexTerm* term, EIndexQueryType type);
/*
* open index
* @param opt (input, index opt)
* @param path (input, index path)
* @param index (output, index object)
* @return error code
*/
int indexOpen(SIndexOpts* opt, const char* path, SIndex** index);
/*
* close index
* @param index (input, index to be closed)
* @return error code
*/ */
int indexOpen(SIndexOpts* opt, const char* path, SIndex** index);
void indexClose(SIndex* index); void indexClose(SIndex* index);
int indexPut(SIndex* index, SIndexMultiTerm* terms, uint64_t uid);
int indexDelete(SIndex* index, SIndexMultiTermQuery* query); /*
int indexSearch(SIndex* index, SIndexMultiTermQuery* query, SArray* result); * insert terms into index
int indexRebuild(SIndex* index, SIndexOpts* opt); * @param index (input, index object)
* @param term (input, terms inserted into index)
* @param uid (input, uid of terms)
* @return error code
*/
int indexPut(SIndex* index, SIndexMultiTerm* terms, uint64_t uid);
/*
* delete terms that meet query condition
* @param index (input, index object)
* @param query (input, condition query to deleted)
* @return error code
*/
int indexDelete(SIndex* index, SIndexMultiTermQuery* query);
/*
* search index
* @param index (input, index object)
* @param query (input, multi query condition)
* @param result(output, query result)
* @return error code
*/
int indexSearch(SIndex* index, SIndexMultiTermQuery* query, SArray* result);
/*
* rebuild index
* @param index (input, index object)
* @parma opt (input, rebuild index opts)
* @return error code
*/
int indexRebuild(SIndex* index, SIndexOpts* opt);
/*
* open index
* @param opt (input,index json opt)
* @param path (input, index json path)
* @param index (output, index json object)
* @return error code
*/
int tIndexJsonOpen(SIndexJsonOpts* opts, const char* path, SIndexJson** index);
/*
* close index
* @param index (input, index to be closed)
* @return error code
*/
int tIndexJsonClose(SIndexJson* index);
/*
* insert terms into index
* @param index (input, index object)
* @param term (input, terms inserted into index)
* @param uid (input, uid of terms)
* @return error code
*/
int tIndexJsonPut(SIndexJson* index, SIndexJsonMultiTerm* terms, uint64_t uid);
/*
* search index
* @param index (input, index object)
* @param query (input, multi query condition)
* @param result(output, query result)
* @return error code
*/
int tIndexJsonSearch(SIndexJson* index, SIndexJsonMultiTermQuery* query, SArray* result);
/* /*
* @param * @param
* @param * @param
......
...@@ -120,29 +120,50 @@ int indexFlushCacheToTFile(SIndex* sIdx, void*); ...@@ -120,29 +120,50 @@ int indexFlushCacheToTFile(SIndex* sIdx, void*);
int32_t indexSerialCacheKey(ICacheKey* key, char* buf); int32_t indexSerialCacheKey(ICacheKey* key, char* buf);
#define indexFatal(...) \ #define indexFatal(...) \
do { \ do { \
if (sDebugFlag & DEBUG_FATAL) { taosPrintLog("index FATAL ", 255, __VA_ARGS__); } \ if (sDebugFlag & DEBUG_FATAL) { \
taosPrintLog("index FATAL ", 255, __VA_ARGS__); \
} \
} while (0) } while (0)
#define indexError(...) \ #define indexError(...) \
do { \ do { \
if (sDebugFlag & DEBUG_ERROR) { taosPrintLog("index ERROR ", 255, __VA_ARGS__); } \ if (sDebugFlag & DEBUG_ERROR) { \
taosPrintLog("index ERROR ", 255, __VA_ARGS__); \
} \
} while (0) } while (0)
#define indexWarn(...) \ #define indexWarn(...) \
do { \ do { \
if (sDebugFlag & DEBUG_WARN) { taosPrintLog("index WARN ", 255, __VA_ARGS__); } \ if (sDebugFlag & DEBUG_WARN) { \
taosPrintLog("index WARN ", 255, __VA_ARGS__); \
} \
} while (0) } while (0)
#define indexInfo(...) \ #define indexInfo(...) \
do { \ do { \
if (sDebugFlag & DEBUG_INFO) { taosPrintLog("index ", 255, __VA_ARGS__); } \ if (sDebugFlag & DEBUG_INFO) { \
taosPrintLog("index ", 255, __VA_ARGS__); \
} \
} while (0) } while (0)
#define indexDebug(...) \ #define indexDebug(...) \
do { \ do { \
if (sDebugFlag & DEBUG_DEBUG) { taosPrintLog("index ", sDebugFlag, __VA_ARGS__); } \ if (sDebugFlag & DEBUG_DEBUG) { \
taosPrintLog("index ", sDebugFlag, __VA_ARGS__); \
} \
} while (0) } while (0)
#define indexTrace(...) \ #define indexTrace(...) \
do { \ do { \
if (sDebugFlag & DEBUG_TRACE) { taosPrintLog("index ", sDebugFlag, __VA_ARGS__); } \ if (sDebugFlag & DEBUG_TRACE) { \
taosPrintLog("index ", sDebugFlag, __VA_ARGS__); \
} \
} while (0)
#define INDEX_TYPE_CONTAIN_EXTERN_TYPE(ty, exTy) (((ty >> 4) & (exTy)) != 0)
#define INDEX_TYPE_GET_TYPE(ty) (ty & 0x0F)
#define INDEX_TYPE_ADD_EXTERN_TYPE(ty, exTy) \
do { \
uint8_t oldTy = ty; \
ty = (ty >> 4) | exTy; \
ty = (ty << 4) | oldTy; \
} while (0) } while (0)
#ifdef __cplusplus #ifdef __cplusplus
......
...@@ -2,8 +2,8 @@ ...@@ -2,8 +2,8 @@
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com> * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
* *
* This program is free software: you can use, redistribute, and/or modify * This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3 * it under the terms of the GNU Affero General Public License, version 3 * or later ("AGPL"), as published by the Free
* or later ("AGPL"), as published by the Free Software Foundation. * Software Foundation.
* *
* This program is distributed in the hope that it will be useful, but WITHOUT * This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
...@@ -30,6 +30,8 @@ ...@@ -30,6 +30,8 @@
void* indexQhandle = NULL; void* indexQhandle = NULL;
static char JSON_COLUMN[] = "JSON";
void indexInit() { void indexInit() {
// refactor later // refactor later
indexQhandle = taosInitScheduler(INDEX_QUEUE_SIZE, INDEX_NUM_OF_THREADS, "index"); indexQhandle = taosInitScheduler(INDEX_QUEUE_SIZE, INDEX_NUM_OF_THREADS, "index");
...@@ -63,6 +65,9 @@ static int indexGenTFile(SIndex* index, IndexCache* cache, SArray* batch); ...@@ -63,6 +65,9 @@ static int indexGenTFile(SIndex* index, IndexCache* cache, SArray* batch);
static void indexMergeCacheAndTFile(SArray* result, IterateValue* icache, IterateValue* iTfv); static void indexMergeCacheAndTFile(SArray* result, IterateValue* icache, IterateValue* iTfv);
static void indexMergeSameKey(SArray* result, TFileValue* tv); static void indexMergeSameKey(SArray* result, TFileValue* tv);
static int32_t indexSerialTermKey(SIndexTerm* itm, char* buf);
int32_t indexSerialKey(ICacheKey* key, char* buf);
int indexOpen(SIndexOpts* opts, const char* path, SIndex** index) { int indexOpen(SIndexOpts* opts, const char* path, SIndex** index) {
pthread_once(&isInit, indexInit); pthread_once(&isInit, indexInit);
SIndex* sIdx = calloc(1, sizeof(SIndex)); SIndex* sIdx = calloc(1, sizeof(SIndex));
...@@ -147,9 +152,8 @@ int indexPut(SIndex* index, SIndexMultiTerm* fVals, uint64_t uid) { ...@@ -147,9 +152,8 @@ int indexPut(SIndex* index, SIndexMultiTerm* fVals, uint64_t uid) {
for (int i = 0; i < taosArrayGetSize(fVals); i++) { for (int i = 0; i < taosArrayGetSize(fVals); i++) {
SIndexTerm* p = taosArrayGetP(fVals, i); SIndexTerm* p = taosArrayGetP(fVals, i);
char buf[128] = {0}; char buf[128] = {0};
ICacheKey key = {.suid = p->suid, .colName = p->colName, .nColName = strlen(p->colName)}; int32_t sz = indexSerialTermKey(p, buf);
int32_t sz = indexSerialCacheKey(&key, buf);
IndexCache** cache = taosHashGet(index->colObj, buf, sz); IndexCache** cache = taosHashGet(index->colObj, buf, sz);
if (cache == NULL) { if (cache == NULL) {
...@@ -162,9 +166,9 @@ int indexPut(SIndex* index, SIndexMultiTerm* fVals, uint64_t uid) { ...@@ -162,9 +166,9 @@ int indexPut(SIndex* index, SIndexMultiTerm* fVals, uint64_t uid) {
for (int i = 0; i < taosArrayGetSize(fVals); i++) { for (int i = 0; i < taosArrayGetSize(fVals); i++) {
SIndexTerm* p = taosArrayGetP(fVals, i); SIndexTerm* p = taosArrayGetP(fVals, i);
char buf[128] = {0}; char buf[128] = {0};
ICacheKey key = {.suid = p->suid, .colName = p->colName, .nColName = strlen(p->colName)}; // ICacheKey key = {.suid = p->suid, .colName = p->colName, .nColName = strlen(p->colName)};
int32_t sz = indexSerialCacheKey(&key, buf); int32_t sz = indexSerialTermKey(p, buf);
IndexCache** cache = taosHashGet(index->colObj, buf, sz); IndexCache** cache = taosHashGet(index->colObj, buf, sz);
assert(*cache != NULL); assert(*cache != NULL);
...@@ -554,7 +558,24 @@ END: ...@@ -554,7 +558,24 @@ END:
return -1; return -1;
} }
int32_t indexSerialCacheKey(ICacheKey* key, char* buf) { int32_t indexSerialTermKeyOfTag(SIndexTerm* p, char* buf) {
ICacheKey key = {.suid = p->suid, .colName = p->colName, .nColName = strlen(p->colName)};
return indexSerialKey(&key, buf);
}
int32_t indexSerilaTermKeyOfJson(SIndexTerm* p, char* buf) {
ICacheKey key = {.suid = p->suid, .colName = JSON_COLUMN, .nColName = strlen(JSON_COLUMN)};
return indexSerialKey(&key, buf);
}
int32_t indexSerialTermKey(SIndexTerm* itm, char* buf) {
bool hasJson = INDEX_TYPE_CONTAIN_EXTERN_TYPE(itm->colType, TSDB_DATA_TYPE_JSON);
if (hasJson) {
return indexSerilaTermKeyOfJson(itm, buf);
} else {
return indexSerialTermKeyOfTag(itm, buf);
}
}
int32_t indexSerialKey(ICacheKey* key, char* buf) {
char* p = buf; char* p = buf;
SERIALIZE_MEM_TO_BUF(buf, key, suid); SERIALIZE_MEM_TO_BUF(buf, key, suid);
SERIALIZE_VAR_TO_BUF(buf, '_', char); SERIALIZE_VAR_TO_BUF(buf, '_', char);
...@@ -563,3 +584,13 @@ int32_t indexSerialCacheKey(ICacheKey* key, char* buf) { ...@@ -563,3 +584,13 @@ int32_t indexSerialCacheKey(ICacheKey* key, char* buf) {
SERIALIZE_STR_MEM_TO_BUF(buf, key, colName, key->nColName); SERIALIZE_STR_MEM_TO_BUF(buf, key, colName, key->nColName);
return buf - p; return buf - p;
} }
// int32_t indexSerialCacheKey(ICacheKey* key, char* buf) {
// char* p = buf;
// SERIALIZE_MEM_TO_BUF(buf, key, suid);
// SERIALIZE_VAR_TO_BUF(buf, '_', char);
// // SERIALIZE_MEM_TO_BUF(buf, key, colType);
// // SERIALIZE_VAR_TO_BUF(buf, '_', char);
// SERIALIZE_STR_MEM_TO_BUF(buf, key, colName, key->nColName);
// return buf - p;
//}
...@@ -24,6 +24,8 @@ ...@@ -24,6 +24,8 @@
#define MEM_THRESHOLD 1024 * 1024 #define MEM_THRESHOLD 1024 * 1024
#define MEM_ESTIMATE_RADIO 1.5 #define MEM_ESTIMATE_RADIO 1.5
static char JSON_COLUMN[] = "JSON";
static void indexMemRef(MemTable* tbl); static void indexMemRef(MemTable* tbl);
static void indexMemUnRef(MemTable* tbl); static void indexMemUnRef(MemTable* tbl);
...@@ -45,7 +47,7 @@ IndexCache* indexCacheCreate(SIndex* idx, uint64_t suid, const char* colName, in ...@@ -45,7 +47,7 @@ IndexCache* indexCacheCreate(SIndex* idx, uint64_t suid, const char* colName, in
return NULL; return NULL;
}; };
cache->mem = indexInternalCacheCreate(type); cache->mem = indexInternalCacheCreate(type);
cache->colName = tstrdup(colName); cache->colName = INDEX_TYPE_CONTAIN_EXTERN_TYPE(type, TSDB_DATA_TYPE_JSON) ? tstrdup(JSON_COLUMN) : tstrdup(colName);
cache->type = type; cache->type = type;
cache->index = idx; cache->index = idx;
cache->version = 0; cache->version = 0;
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3 * or later ("AGPL"), as published by the Free
* Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "indeInt.h"
#include "index.h"
int tIndexJsonOpen(SIndexJsonOpts *opts, const char *path, SIndexJson **index) {
// handle
return tIndexOpen(opts, path, index);
}
// k
int tIndexJsonPut(SIndexJson *index, SIndexJsonMultiTerm *terms, uint64_t uid) {
for (int i = 0; i < taosArrayGetSize(terms); i++) {
SIndexJsonTerm *p = taosArrayGetP(terms, i);
INDEX_TYPE_ADD_EXTERN_TYPE(p->colType, TSDB_DATA_TYPE_JSON);
}
return indexPut(index, terms, uid);
// handle put
}
int tIndexJsonSearch(SIndexJson *index, SIndexJsonMultiTermQuery *query, SArray *result) {
for (int i = 0; i < taosArrayGetSize(terms); i++) {
SIndexJsonTerm *p = taosArrayGetP(terms, i);
INDEX_TYPE_ADD_EXTERN_TYPE(p->colType, TSDB_DATA_TYPE_JSON);
}
return indexSearch(index, query, result);
// handle search
}
int tIndexJsonClose(SIndexJson *index) {
return tIndexClose(index);
// handle close
}
...@@ -205,7 +205,11 @@ int tfileReaderSearch(TFileReader* reader, SIndexTermQuery* query, SArray* resul ...@@ -205,7 +205,11 @@ int tfileReaderSearch(TFileReader* reader, SIndexTermQuery* query, SArray* resul
} else if (qtype == QUERY_PREFIX) { } else if (qtype == QUERY_PREFIX) {
// handle later // handle later
// //
} else { } else if (qtype == QUERY_SUFFIX) {
// handle later
} else if (qtype == QUERY_REGEX) {
// handle later
} else if (qtype == QUERY_RANGE) {
// handle later // handle later
} }
tfileReaderUnRef(reader); tfileReaderUnRef(reader);
...@@ -586,11 +590,10 @@ static int tfileReaderLoadHeader(TFileReader* reader) { ...@@ -586,11 +590,10 @@ static int tfileReaderLoadHeader(TFileReader* reader) {
int64_t nread = reader->ctx->readFrom(reader->ctx, buf, sizeof(buf), 0); int64_t nread = reader->ctx->readFrom(reader->ctx, buf, sizeof(buf), 0);
if (nread == -1) { if (nread == -1) {
indexError("actual Read: %d, to read: %d, errno: %d, filename: %s", (int)(nread), (int)sizeof(buf), indexError("actual Read: %d, to read: %d, errno: %d, filename: %s", (int)(nread), (int)sizeof(buf), errno,
errno, reader->ctx->file.buf); reader->ctx->file.buf);
} else { } else {
indexInfo("actual Read: %d, to read: %d, filename: %s", (int)(nread), (int)sizeof(buf), indexInfo("actual Read: %d, to read: %d, filename: %s", (int)(nread), (int)sizeof(buf), reader->ctx->file.buf);
reader->ctx->file.buf);
} }
// assert(nread == sizeof(buf)); // assert(nread == sizeof(buf));
memcpy(&reader->header, buf, sizeof(buf)); memcpy(&reader->header, buf, sizeof(buf));
......
...@@ -216,21 +216,21 @@ class FstEnv : public ::testing::Test { ...@@ -216,21 +216,21 @@ class FstEnv : public ::testing::Test {
TEST_F(FstEnv, writeNormal) { TEST_F(FstEnv, writeNormal) {
fst->CreateWriter(); fst->CreateWriter();
std::string str("aa"); std::string str("11");
for (int i = 0; i < 10; i++) { for (int i = 0; i < 10; i++) {
str[0] = 'a' + i; str[0] = '1' + i;
str.resize(2); str.resize(2);
assert(fst->Put(str, i) == true); assert(fst->Put(str, i) == true);
} }
// order failed // order failed
assert(fst->Put("aa", 1) == false); assert(fst->Put("11", 1) == false);
fst->DestroyWriter(); fst->DestroyWriter();
fst->CreateReader(); fst->CreateReader();
uint64_t val; uint64_t val;
assert(fst->Get("a", &val) == false); assert(fst->Get("1", &val) == false);
assert(fst->Get("aa", &val) == true); assert(fst->Get("11", &val) == true);
assert(val == 0); assert(val == 0);
std::vector<uint64_t> rlt; std::vector<uint64_t> rlt;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册