未验证 提交 ff8c9dd9 编写于 作者: dengyihao's avatar dengyihao 提交者: GitHub

Merge pull request #10491 from taosdata/feature/support_json

support json
...@@ -29,6 +29,12 @@ typedef struct SIndexOpts SIndexOpts; ...@@ -29,6 +29,12 @@ typedef struct SIndexOpts SIndexOpts;
typedef struct SIndexMultiTermQuery SIndexMultiTermQuery; typedef struct SIndexMultiTermQuery SIndexMultiTermQuery;
typedef struct SArray SIndexMultiTerm; typedef struct SArray SIndexMultiTerm;
typedef struct SIndex SIndexJson;
typedef struct SIndexTerm SIndexJsonTerm;
typedef struct SIndexOpts SIndexJsonOpts;
typedef struct SIndexMultiTermQuery SIndexJsonMultiTermQuery;
typedef struct SArray SIndexJsonMultiTerm;
typedef enum { typedef enum {
ADD_VALUE, // add index colume value ADD_VALUE, // add index colume value
DEL_VALUE, // delete index column value DEL_VALUE, // delete index column value
...@@ -39,24 +45,108 @@ typedef enum { ...@@ -39,24 +45,108 @@ typedef enum {
} SIndexOperOnColumn; } SIndexOperOnColumn;
typedef enum { MUST = 0, SHOULD = 1, NOT = 2 } EIndexOperatorType; typedef enum { MUST = 0, SHOULD = 1, NOT = 2 } EIndexOperatorType;
typedef enum { QUERY_TERM = 0, QUERY_PREFIX = 1, QUERY_SUFFIX = 2, QUERY_REGEX = 3 } EIndexQueryType; typedef enum { QUERY_TERM = 0, QUERY_PREFIX = 1, QUERY_SUFFIX = 2, QUERY_REGEX = 3, QUERY_RANGE = 4 } EIndexQueryType;
/* /*
* @param: oper * create multi query
* * @param oper (input, relation between querys)
*/ */
SIndexMultiTermQuery* indexMultiTermQueryCreate(EIndexOperatorType oper); SIndexMultiTermQuery* indexMultiTermQueryCreate(EIndexOperatorType oper);
void indexMultiTermQueryDestroy(SIndexMultiTermQuery* pQuery);
int indexMultiTermQueryAdd(SIndexMultiTermQuery* pQuery, SIndexTerm* term, EIndexQueryType type);
/* /*
* @param: * destroy multi query
* @param: * @param pQuery (input, multi-query-object to be destory)
*/
void indexMultiTermQueryDestroy(SIndexMultiTermQuery* pQuery);
/*
* add query to multi query
* @param pQuery (input, multi-query-object)
* @param term (input, single query term)
* @param type (input, single query type)
* @return error code
*/
int indexMultiTermQueryAdd(SIndexMultiTermQuery* pQuery, SIndexTerm* term, EIndexQueryType type);
/*
* open index
* @param opt (input, index opt)
* @param path (input, index path)
* @param index (output, index object)
* @return error code
*/
int indexOpen(SIndexOpts* opt, const char* path, SIndex** index);
/*
* close index
* @param index (input, index to be closed)
* @return error code
*/ */
int indexOpen(SIndexOpts* opt, const char* path, SIndex** index);
void indexClose(SIndex* index); void indexClose(SIndex* index);
int indexPut(SIndex* index, SIndexMultiTerm* terms, uint64_t uid);
int indexDelete(SIndex* index, SIndexMultiTermQuery* query); /*
int indexSearch(SIndex* index, SIndexMultiTermQuery* query, SArray* result); * insert terms into index
int indexRebuild(SIndex* index, SIndexOpts* opt); * @param index (input, index object)
* @param term (input, terms inserted into index)
* @param uid (input, uid of terms)
* @return error code
*/
int indexPut(SIndex* index, SIndexMultiTerm* terms, uint64_t uid);
/*
* delete terms that meet query condition
* @param index (input, index object)
* @param query (input, condition query to deleted)
* @return error code
*/
int indexDelete(SIndex* index, SIndexMultiTermQuery* query);
/*
* search index
* @param index (input, index object)
* @param query (input, multi query condition)
* @param result(output, query result)
* @return error code
*/
int indexSearch(SIndex* index, SIndexMultiTermQuery* query, SArray* result);
/*
* rebuild index
* @param index (input, index object)
* @parma opt (input, rebuild index opts)
* @return error code
*/
int indexRebuild(SIndex* index, SIndexOpts* opt);
/*
* open index
* @param opt (input,index json opt)
* @param path (input, index json path)
* @param index (output, index json object)
* @return error code
*/
int tIndexJsonOpen(SIndexJsonOpts* opts, const char* path, SIndexJson** index);
/*
* close index
* @param index (input, index to be closed)
* @return void
*/
void tIndexJsonClose(SIndexJson* index);
/*
* insert terms into index
* @param index (input, index object)
* @param term (input, terms inserted into index)
* @param uid (input, uid of terms)
* @return error code
*/
int tIndexJsonPut(SIndexJson* index, SIndexJsonMultiTerm* terms, uint64_t uid);
/*
* search index
* @param index (input, index object)
* @param query (input, multi query condition)
* @param result(output, query result)
* @return error code
*/
int tIndexJsonSearch(SIndexJson* index, SIndexJsonMultiTermQuery* query, SArray* result);
/* /*
* @param * @param
* @param * @param
......
...@@ -122,30 +122,53 @@ typedef struct TFileCacheKey { ...@@ -122,30 +122,53 @@ typedef struct TFileCacheKey {
int indexFlushCacheToTFile(SIndex* sIdx, void*); int indexFlushCacheToTFile(SIndex* sIdx, void*);
int32_t indexSerialCacheKey(ICacheKey* key, char* buf); int32_t indexSerialCacheKey(ICacheKey* key, char* buf);
// int32_t indexSerialKey(ICacheKey* key, char* buf);
#define indexFatal(...) \ // int32_t indexSerialTermKey(SIndexTerm* itm, char* buf);
do { \
if (sDebugFlag & DEBUG_FATAL) { taosPrintLog("index FATAL ", 255, __VA_ARGS__); } \ #define indexFatal(...) \
do { \
if (sDebugFlag & DEBUG_FATAL) { \
taosPrintLog("index FATAL ", 255, __VA_ARGS__); \
} \
} while (0)
#define indexError(...) \
do { \
if (sDebugFlag & DEBUG_ERROR) { \
taosPrintLog("index ERROR ", 255, __VA_ARGS__); \
} \
} while (0) } while (0)
#define indexError(...) \ #define indexWarn(...) \
do { \ do { \
if (sDebugFlag & DEBUG_ERROR) { taosPrintLog("index ERROR ", 255, __VA_ARGS__); } \ if (sDebugFlag & DEBUG_WARN) { \
taosPrintLog("index WARN ", 255, __VA_ARGS__); \
} \
} while (0) } while (0)
#define indexWarn(...) \ #define indexInfo(...) \
do { \ do { \
if (sDebugFlag & DEBUG_WARN) { taosPrintLog("index WARN ", 255, __VA_ARGS__); } \ if (sDebugFlag & DEBUG_INFO) { \
taosPrintLog("index ", 255, __VA_ARGS__); \
} \
} while (0) } while (0)
#define indexInfo(...) \ #define indexDebug(...) \
do { \ do { \
if (sDebugFlag & DEBUG_INFO) { taosPrintLog("index ", 255, __VA_ARGS__); } \ if (sDebugFlag & DEBUG_DEBUG) { \
taosPrintLog("index ", sDebugFlag, __VA_ARGS__); \
} \
} while (0) } while (0)
#define indexDebug(...) \ #define indexTrace(...) \
do { \ do { \
if (sDebugFlag & DEBUG_DEBUG) { taosPrintLog("index ", sDebugFlag, __VA_ARGS__); } \ if (sDebugFlag & DEBUG_TRACE) { \
taosPrintLog("index ", sDebugFlag, __VA_ARGS__); \
} \
} while (0) } while (0)
#define indexTrace(...) \
do { \ #define INDEX_TYPE_CONTAIN_EXTERN_TYPE(ty, exTy) (((ty >> 4) & (exTy)) != 0)
if (sDebugFlag & DEBUG_TRACE) { taosPrintLog("index ", sDebugFlag, __VA_ARGS__); } \ #define INDEX_TYPE_GET_TYPE(ty) (ty & 0x0F)
#define INDEX_TYPE_ADD_EXTERN_TYPE(ty, exTy) \
do { \
uint8_t oldTy = ty; \
ty = (ty >> 4) | exTy; \
ty = (ty << 4) | oldTy; \
} while (0) } while (0)
#ifdef __cplusplus #ifdef __cplusplus
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_INDEX_COMM_H_
#define _TD_INDEX_COMM_H_
#ifdef __cplusplus
extern "C" {
#endif
extern char JSON_COLUMN[];
extern char JSON_VALUE_DELIM;
char* indexPackJsonData(SIndexTerm* itm);
#ifdef __cplusplus
}
#endif
#endif
...@@ -2,8 +2,8 @@ ...@@ -2,8 +2,8 @@
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com> * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
* *
* This program is free software: you can use, redistribute, and/or modify * This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3 * it under the terms of the GNU Affero General Public License, version 3 * or later ("AGPL"), as published by the Free
* or later ("AGPL"), as published by the Free Software Foundation. * Software Foundation.
* *
* This program is distributed in the hope that it will be useful, but WITHOUT * This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
...@@ -30,6 +30,8 @@ ...@@ -30,6 +30,8 @@
void* indexQhandle = NULL; void* indexQhandle = NULL;
static char JSON_COLUMN[] = "JSON";
void indexInit() { void indexInit() {
// refactor later // refactor later
indexQhandle = taosInitScheduler(INDEX_QUEUE_SIZE, INDEX_NUM_OF_THREADS, "index"); indexQhandle = taosInitScheduler(INDEX_QUEUE_SIZE, INDEX_NUM_OF_THREADS, "index");
...@@ -63,6 +65,9 @@ static int indexGenTFile(SIndex* index, IndexCache* cache, SArray* batch); ...@@ -63,6 +65,9 @@ static int indexGenTFile(SIndex* index, IndexCache* cache, SArray* batch);
static void indexMergeCacheAndTFile(SArray* result, IterateValue* icache, IterateValue* iTfv); static void indexMergeCacheAndTFile(SArray* result, IterateValue* icache, IterateValue* iTfv);
static void indexMergeSameKey(SArray* result, TFileValue* tv); static void indexMergeSameKey(SArray* result, TFileValue* tv);
// static int32_t indexSerialTermKey(SIndexTerm* itm, char* buf);
// int32_t indexSerialKey(ICacheKey* key, char* buf);
int indexOpen(SIndexOpts* opts, const char* path, SIndex** index) { int indexOpen(SIndexOpts* opts, const char* path, SIndex** index) {
pthread_once(&isInit, indexInit); pthread_once(&isInit, indexInit);
SIndex* sIdx = calloc(1, sizeof(SIndex)); SIndex* sIdx = calloc(1, sizeof(SIndex));
...@@ -148,7 +153,7 @@ int indexPut(SIndex* index, SIndexMultiTerm* fVals, uint64_t uid) { ...@@ -148,7 +153,7 @@ int indexPut(SIndex* index, SIndexMultiTerm* fVals, uint64_t uid) {
SIndexTerm* p = taosArrayGetP(fVals, i); SIndexTerm* p = taosArrayGetP(fVals, i);
char buf[128] = {0}; char buf[128] = {0};
ICacheKey key = {.suid = p->suid, .colName = p->colName, .nColName = strlen(p->colName)}; ICacheKey key = {.suid = p->suid, .colName = p->colName, .nColName = strlen(p->colName), .colType = p->colType};
int32_t sz = indexSerialCacheKey(&key, buf); int32_t sz = indexSerialCacheKey(&key, buf);
IndexCache** cache = taosHashGet(index->colObj, buf, sz); IndexCache** cache = taosHashGet(index->colObj, buf, sz);
...@@ -163,7 +168,7 @@ int indexPut(SIndex* index, SIndexMultiTerm* fVals, uint64_t uid) { ...@@ -163,7 +168,7 @@ int indexPut(SIndex* index, SIndexMultiTerm* fVals, uint64_t uid) {
SIndexTerm* p = taosArrayGetP(fVals, i); SIndexTerm* p = taosArrayGetP(fVals, i);
char buf[128] = {0}; char buf[128] = {0};
ICacheKey key = {.suid = p->suid, .colName = p->colName, .nColName = strlen(p->colName)}; ICacheKey key = {.suid = p->suid, .colName = p->colName, .nColName = strlen(p->colName), .colType = p->colType};
int32_t sz = indexSerialCacheKey(&key, buf); int32_t sz = indexSerialCacheKey(&key, buf);
IndexCache** cache = taosHashGet(index->colObj, buf, sz); IndexCache** cache = taosHashGet(index->colObj, buf, sz);
...@@ -330,8 +335,9 @@ static int indexTermSearch(SIndex* sIdx, SIndexTermQuery* query, SArray** result ...@@ -330,8 +335,9 @@ static int indexTermSearch(SIndex* sIdx, SIndexTermQuery* query, SArray** result
IndexCache* cache = NULL; IndexCache* cache = NULL;
char buf[128] = {0}; char buf[128] = {0};
ICacheKey key = {.suid = term->suid, .colName = term->colName, .nColName = strlen(term->colName)}; ICacheKey key = {
int32_t sz = indexSerialCacheKey(&key, buf); .suid = term->suid, .colName = term->colName, .nColName = strlen(term->colName), .colType = term->colType};
int32_t sz = indexSerialCacheKey(&key, buf);
pthread_mutex_lock(&sIdx->mtx); pthread_mutex_lock(&sIdx->mtx);
IndexCache** pCache = taosHashGet(sIdx->colObj, buf, sz); IndexCache** pCache = taosHashGet(sIdx->colObj, buf, sz);
...@@ -555,11 +561,17 @@ END: ...@@ -555,11 +561,17 @@ END:
} }
int32_t indexSerialCacheKey(ICacheKey* key, char* buf) { int32_t indexSerialCacheKey(ICacheKey* key, char* buf) {
bool hasJson = INDEX_TYPE_CONTAIN_EXTERN_TYPE(key->colType, TSDB_DATA_TYPE_JSON);
char* p = buf; char* p = buf;
SERIALIZE_MEM_TO_BUF(buf, key, suid); SERIALIZE_MEM_TO_BUF(buf, key, suid);
SERIALIZE_VAR_TO_BUF(buf, '_', char); SERIALIZE_VAR_TO_BUF(buf, '_', char);
// SERIALIZE_MEM_TO_BUF(buf, key, colType); // SERIALIZE_MEM_TO_BUF(buf, key, colType);
// SERIALIZE_VAR_TO_BUF(buf, '_', char); // SERIALIZE_VAR_TO_BUF(buf, '_', char);
SERIALIZE_STR_MEM_TO_BUF(buf, key, colName, key->nColName); if (hasJson) {
SERIALIZE_STR_VAR_TO_BUF(buf, JSON_COLUMN, strlen(JSON_COLUMN));
} else {
SERIALIZE_STR_MEM_TO_BUF(buf, key, colName, key->nColName);
}
return buf - p; return buf - p;
} }
...@@ -14,6 +14,7 @@ ...@@ -14,6 +14,7 @@
*/ */
#include "index_cache.h" #include "index_cache.h"
#include "index_comm.h"
#include "index_util.h" #include "index_util.h"
#include "tcompare.h" #include "tcompare.h"
#include "tsched.h" #include "tsched.h"
...@@ -44,8 +45,9 @@ IndexCache* indexCacheCreate(SIndex* idx, uint64_t suid, const char* colName, in ...@@ -44,8 +45,9 @@ IndexCache* indexCacheCreate(SIndex* idx, uint64_t suid, const char* colName, in
indexError("failed to create index cache"); indexError("failed to create index cache");
return NULL; return NULL;
}; };
cache->mem = indexInternalCacheCreate(type); cache->mem = indexInternalCacheCreate(type);
cache->colName = tstrdup(colName); cache->colName = INDEX_TYPE_CONTAIN_EXTERN_TYPE(type, TSDB_DATA_TYPE_JSON) ? tstrdup(JSON_COLUMN) : tstrdup(colName);
cache->type = type; cache->type = type;
cache->index = idx; cache->index = idx;
cache->version = 0; cache->version = 0;
...@@ -207,11 +209,11 @@ static void indexCacheMakeRoomForWrite(IndexCache* cache) { ...@@ -207,11 +209,11 @@ static void indexCacheMakeRoomForWrite(IndexCache* cache) {
} }
} }
} }
int indexCachePut(void* cache, SIndexTerm* term, uint64_t uid) { int indexCachePut(void* cache, SIndexTerm* term, uint64_t uid) {
if (cache == NULL) { if (cache == NULL) {
return -1; return -1;
} }
bool hasJson = INDEX_TYPE_CONTAIN_EXTERN_TYPE(term->colType, TSDB_DATA_TYPE_JSON);
IndexCache* pCache = cache; IndexCache* pCache = cache;
indexCacheRef(pCache); indexCacheRef(pCache);
...@@ -222,8 +224,12 @@ int indexCachePut(void* cache, SIndexTerm* term, uint64_t uid) { ...@@ -222,8 +224,12 @@ int indexCachePut(void* cache, SIndexTerm* term, uint64_t uid) {
} }
// set up key // set up key
ct->colType = term->colType; ct->colType = term->colType;
ct->colVal = (char*)calloc(1, sizeof(char) * (term->nColVal + 1)); if (hasJson) {
memcpy(ct->colVal, term->colVal, term->nColVal); ct->colVal = indexPackJsonData(term);
} else {
ct->colVal = (char*)calloc(1, sizeof(char) * (term->nColVal + 1));
memcpy(ct->colVal, term->colVal, term->nColVal);
}
ct->version = atomic_add_fetch_32(&pCache->version, 1); ct->version = atomic_add_fetch_32(&pCache->version, 1);
// set value // set value
ct->uid = uid; ct->uid = uid;
...@@ -294,13 +300,22 @@ int indexCacheSearch(void* cache, SIndexTermQuery* query, SArray* result, STermV ...@@ -294,13 +300,22 @@ int indexCacheSearch(void* cache, SIndexTermQuery* query, SArray* result, STermV
SIndexTerm* term = query->term; SIndexTerm* term = query->term;
EIndexQueryType qtype = query->qType; EIndexQueryType qtype = query->qType;
CacheTerm ct = {.colVal = term->colVal, .version = atomic_load_32(&pCache->version)};
bool hasJson = INDEX_TYPE_CONTAIN_EXTERN_TYPE(term->colType, TSDB_DATA_TYPE_JSON);
char* p = term->colVal;
if (hasJson) {
p = indexPackJsonData(term);
}
CacheTerm ct = {.colVal = p, .version = atomic_load_32(&pCache->version)};
int ret = indexQueryMem(mem, &ct, qtype, result, s); int ret = indexQueryMem(mem, &ct, qtype, result, s);
if (ret == 0 && *s != kTypeDeletion) { if (ret == 0 && *s != kTypeDeletion) {
// continue search in imm // continue search in imm
ret = indexQueryMem(imm, &ct, qtype, result, s); ret = indexQueryMem(imm, &ct, qtype, result, s);
} }
if (hasJson) {
tfree(p);
}
indexMemUnRef(mem); indexMemUnRef(mem);
indexMemUnRef(imm); indexMemUnRef(imm);
...@@ -367,6 +382,8 @@ static int32_t indexCacheTermCompare(const void* l, const void* r) { ...@@ -367,6 +382,8 @@ static int32_t indexCacheTermCompare(const void* l, const void* r) {
} }
static MemTable* indexInternalCacheCreate(int8_t type) { static MemTable* indexInternalCacheCreate(int8_t type) {
type = INDEX_TYPE_CONTAIN_EXTERN_TYPE(type, TSDB_DATA_TYPE_JSON) ? TSDB_DATA_TYPE_BINARY : type;
MemTable* tbl = calloc(1, sizeof(MemTable)); MemTable* tbl = calloc(1, sizeof(MemTable));
indexMemRef(tbl); indexMemRef(tbl);
if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_NCHAR) { if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_NCHAR) {
...@@ -389,9 +406,6 @@ static bool indexCacheIteratorNext(Iterate* itera) { ...@@ -389,9 +406,6 @@ static bool indexCacheIteratorNext(Iterate* itera) {
IterateValue* iv = &itera->val; IterateValue* iv = &itera->val;
iterateValueDestroy(iv, false); iterateValueDestroy(iv, false);
// IterateValue* iv = &itera->val;
// IterateValue tIterVal = {.colVal = NULL, .val = taosArrayInit(1, sizeof(uint64_t))};
bool next = tSkipListIterNext(iter); bool next = tSkipListIterNext(iter);
if (next) { if (next) {
SSkipListNode* node = tSkipListIterGet(iter); SSkipListNode* node = tSkipListIterGet(iter);
...@@ -411,10 +425,6 @@ static bool indexCacheIteratorNext(Iterate* itera) { ...@@ -411,10 +425,6 @@ static bool indexCacheIteratorNext(Iterate* itera) {
taosArrayPush(iv->val, &ct->uid); taosArrayPush(iv->val, &ct->uid);
} }
// IterateValue* iv = &itera->val;
// iterateValueDestroy(iv, true);
//*iv = tIterVal;
return next; return next;
} }
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3 * or later ("AGPL"), as published by the Free
* Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "index.h"
#include "indexInt.h"
char JSON_COLUMN[] = "JSON";
char JSON_VALUE_DELIM = '&';
char* indexPackJsonData(SIndexTerm* itm) {
/*
* |<-----colname---->|<-----dataType---->|<--------colVal---------->|
* |<-----string----->|<-----uint8_t----->|<----depend on dataType-->|
*/
uint8_t ty = INDEX_TYPE_GET_TYPE(itm->colType);
int32_t sz = itm->nColName + itm->nColVal + sizeof(uint8_t) + sizeof(JSON_VALUE_DELIM) * 2 + 1;
char* buf = (char*)calloc(1, sz);
char* p = buf;
memcpy(p, itm->colName, itm->nColName);
p += itm->nColName;
memcpy(p, &JSON_VALUE_DELIM, sizeof(JSON_VALUE_DELIM));
p += sizeof(JSON_VALUE_DELIM);
memcpy(p, &ty, sizeof(ty));
p += sizeof(ty);
memcpy(p, &JSON_VALUE_DELIM, sizeof(JSON_VALUE_DELIM));
p += sizeof(JSON_VALUE_DELIM);
memcpy(p, itm->colVal, itm->nColVal);
return buf;
}
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3 * or later ("AGPL"), as published by the Free
* Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "index.h"
#include "indexInt.h"
int tIndexJsonOpen(SIndexJsonOpts *opts, const char *path, SIndexJson **index) {
// handle
return indexOpen(opts, path, index);
}
int tIndexJsonPut(SIndexJson *index, SIndexJsonMultiTerm *terms, uint64_t uid) {
for (int i = 0; i < taosArrayGetSize(terms); i++) {
SIndexJsonTerm *p = taosArrayGetP(terms, i);
INDEX_TYPE_ADD_EXTERN_TYPE(p->colType, TSDB_DATA_TYPE_JSON);
}
return indexPut(index, terms, uid);
// handle put
}
int tIndexJsonSearch(SIndexJson *index, SIndexJsonMultiTermQuery *tq, SArray *result) {
SArray *terms = tq->query;
for (int i = 0; i < taosArrayGetSize(terms); i++) {
SIndexJsonTerm *p = taosArrayGetP(terms, i);
INDEX_TYPE_ADD_EXTERN_TYPE(p->colType, TSDB_DATA_TYPE_JSON);
}
return indexSearch(index, tq, result);
// handle search
}
void tIndexJsonClose(SIndexJson *index) {
return indexClose(index);
// handle close
}
...@@ -15,6 +15,7 @@ p * ...@@ -15,6 +15,7 @@ p *
#include "index_tfile.h" #include "index_tfile.h"
#include "index.h" #include "index.h"
#include "index_comm.h"
#include "index_fst.h" #include "index_fst.h"
#include "index_fst_counting_writer.h" #include "index_fst_counting_writer.h"
#include "index_util.h" #include "index_util.h"
...@@ -186,13 +187,20 @@ void tfileReaderDestroy(TFileReader* reader) { ...@@ -186,13 +187,20 @@ void tfileReaderDestroy(TFileReader* reader) {
int tfileReaderSearch(TFileReader* reader, SIndexTermQuery* query, SArray* result) { int tfileReaderSearch(TFileReader* reader, SIndexTermQuery* query, SArray* result) {
SIndexTerm* term = query->term; SIndexTerm* term = query->term;
bool hasJson = INDEX_TYPE_CONTAIN_EXTERN_TYPE(term->colType, TSDB_DATA_TYPE_JSON);
EIndexQueryType qtype = query->qType; EIndexQueryType qtype = query->qType;
int ret = -1; int ret = -1;
// refactor to callback later // refactor to callback later
if (qtype == QUERY_TERM) { if (qtype == QUERY_TERM) {
uint64_t offset; uint64_t offset;
FstSlice key = fstSliceCreate(term->colVal, term->nColVal); char* p = term->colVal;
uint64_t sz = term->nColVal;
if (hasJson) {
p = indexPackJsonData(term);
sz = strlen(p);
}
FstSlice key = fstSliceCreate(p, sz);
if (fstGet(reader->fst, &key, &offset)) { if (fstGet(reader->fst, &key, &offset)) {
indexInfo("index: %" PRIu64 ", col: %s, colVal: %s, found table info in tindex", term->suid, term->colName, indexInfo("index: %" PRIu64 ", col: %s, colVal: %s, found table info in tindex", term->suid, term->colName,
term->colVal); term->colVal);
...@@ -202,10 +210,17 @@ int tfileReaderSearch(TFileReader* reader, SIndexTermQuery* query, SArray* resul ...@@ -202,10 +210,17 @@ int tfileReaderSearch(TFileReader* reader, SIndexTermQuery* query, SArray* resul
term->colVal); term->colVal);
} }
fstSliceDestroy(&key); fstSliceDestroy(&key);
if (hasJson) {
free(p);
}
} else if (qtype == QUERY_PREFIX) { } else if (qtype == QUERY_PREFIX) {
// handle later // handle later
// //
} else { } else if (qtype == QUERY_SUFFIX) {
// handle later
} else if (qtype == QUERY_REGEX) {
// handle later
} else if (qtype == QUERY_RANGE) {
// handle later // handle later
} }
tfileReaderUnRef(reader); tfileReaderUnRef(reader);
...@@ -260,6 +275,7 @@ int tfileWriterPut(TFileWriter* tw, void* data, bool order) { ...@@ -260,6 +275,7 @@ int tfileWriterPut(TFileWriter* tw, void* data, bool order) {
__compar_fn_t fn; __compar_fn_t fn;
int8_t colType = tw->header.colType; int8_t colType = tw->header.colType;
colType = INDEX_TYPE_GET_TYPE(colType);
if (colType == TSDB_DATA_TYPE_BINARY || colType == TSDB_DATA_TYPE_NCHAR) { if (colType == TSDB_DATA_TYPE_BINARY || colType == TSDB_DATA_TYPE_NCHAR) {
fn = tfileStrCompare; fn = tfileStrCompare;
} else { } else {
...@@ -557,6 +573,8 @@ static int tfileWriteHeader(TFileWriter* writer) { ...@@ -557,6 +573,8 @@ static int tfileWriteHeader(TFileWriter* writer) {
static int tfileWriteData(TFileWriter* write, TFileValue* tval) { static int tfileWriteData(TFileWriter* write, TFileValue* tval) {
TFileHeader* header = &write->header; TFileHeader* header = &write->header;
uint8_t colType = header->colType; uint8_t colType = header->colType;
colType = INDEX_TYPE_GET_TYPE(colType);
if (colType == TSDB_DATA_TYPE_BINARY || colType == TSDB_DATA_TYPE_NCHAR) { if (colType == TSDB_DATA_TYPE_BINARY || colType == TSDB_DATA_TYPE_NCHAR) {
FstSlice key = fstSliceCreate((uint8_t*)(tval->colVal), (size_t)strlen(tval->colVal)); FstSlice key = fstSliceCreate((uint8_t*)(tval->colVal), (size_t)strlen(tval->colVal));
if (fstBuilderInsert(write->fb, key, tval->offset)) { if (fstBuilderInsert(write->fb, key, tval->offset)) {
...@@ -586,11 +604,10 @@ static int tfileReaderLoadHeader(TFileReader* reader) { ...@@ -586,11 +604,10 @@ static int tfileReaderLoadHeader(TFileReader* reader) {
int64_t nread = reader->ctx->readFrom(reader->ctx, buf, sizeof(buf), 0); int64_t nread = reader->ctx->readFrom(reader->ctx, buf, sizeof(buf), 0);
if (nread == -1) { if (nread == -1) {
indexError("actual Read: %d, to read: %d, errno: %d, filename: %s", (int)(nread), (int)sizeof(buf), indexError("actual Read: %d, to read: %d, errno: %d, filename: %s", (int)(nread), (int)sizeof(buf), errno,
errno, reader->ctx->file.buf); reader->ctx->file.buf);
} else { } else {
indexInfo("actual Read: %d, to read: %d, filename: %s", (int)(nread), (int)sizeof(buf), indexInfo("actual Read: %d, to read: %d, filename: %s", (int)(nread), (int)sizeof(buf), reader->ctx->file.buf);
reader->ctx->file.buf);
} }
// assert(nread == sizeof(buf)); // assert(nread == sizeof(buf));
memcpy(&reader->header, buf, sizeof(buf)); memcpy(&reader->header, buf, sizeof(buf));
......
...@@ -2,6 +2,7 @@ add_executable(indexTest "") ...@@ -2,6 +2,7 @@ add_executable(indexTest "")
add_executable(fstTest "") add_executable(fstTest "")
add_executable(fstUT "") add_executable(fstUT "")
add_executable(UtilUT "") add_executable(UtilUT "")
add_executable(jsonUT "")
target_sources(indexTest target_sources(indexTest
PRIVATE PRIVATE
...@@ -21,6 +22,10 @@ target_sources(UtilUT ...@@ -21,6 +22,10 @@ target_sources(UtilUT
"utilUT.cc" "utilUT.cc"
) )
target_sources(jsonUT
PRIVATE
"jsonUT.cc"
)
target_include_directories ( indexTest target_include_directories ( indexTest
PUBLIC PUBLIC
"${CMAKE_SOURCE_DIR}/include/libs/index" "${CMAKE_SOURCE_DIR}/include/libs/index"
...@@ -43,6 +48,12 @@ target_include_directories ( UtilUT ...@@ -43,6 +48,12 @@ target_include_directories ( UtilUT
"${CMAKE_SOURCE_DIR}/include/libs/index" "${CMAKE_SOURCE_DIR}/include/libs/index"
"${CMAKE_CURRENT_SOURCE_DIR}/../inc" "${CMAKE_CURRENT_SOURCE_DIR}/../inc"
) )
target_include_directories (jsonUT
PUBLIC
"${CMAKE_SOURCE_DIR}/include/libs/index"
"${CMAKE_CURRENT_SOURCE_DIR}/../inc"
)
target_link_libraries (indexTest target_link_libraries (indexTest
os os
util util
...@@ -73,6 +84,13 @@ target_link_libraries (UtilUT ...@@ -73,6 +84,13 @@ target_link_libraries (UtilUT
index index
) )
target_link_libraries (jsonUT
os
util
common
gtest_main
index
)
#add_test( #add_test(
# NAME index_test # NAME index_test
......
...@@ -301,13 +301,18 @@ void validateTFile(char* arg) { ...@@ -301,13 +301,18 @@ void validateTFile(char* arg) {
} }
} }
void iterTFileReader(char* path, char* ver) { void iterTFileReader(char* path, char* uid, char* colName, char* ver) {
int version = atoi(ver); // tfInit();
TFileReader* reader = tfileReaderOpen(path, 0, version, "tag1");
Iterate* iter = tfileIteratorCreate(reader); uint64_t suid = atoi(uid);
bool tn = iter ? iter->next(iter) : false; int version = atoi(ver);
int count = 0;
int termCount = 0; TFileReader* reader = tfileReaderOpen(path, suid, version, colName);
Iterate* iter = tfileIteratorCreate(reader);
bool tn = iter ? iter->next(iter) : false;
int count = 0;
int termCount = 0;
while (tn == true) { while (tn == true) {
count++; count++;
IterateValue* cv = iter->getValue(iter); IterateValue* cv = iter->getValue(iter);
...@@ -323,9 +328,9 @@ void iterTFileReader(char* path, char* ver) { ...@@ -323,9 +328,9 @@ void iterTFileReader(char* path, char* ver) {
int main(int argc, char* argv[]) { int main(int argc, char* argv[]) {
// tool to check all kind of fst test // tool to check all kind of fst test
// if (argc > 1) { validateTFile(argv[1]); } // if (argc > 1) { validateTFile(argv[1]); }
if (argc > 2) { if (argc > 4) {
// opt // path suid colName ver
iterTFileReader(argv[1], argv[2]); iterTFileReader(argv[1], argv[2], argv[3], argv[4]);
} }
// checkFstCheckIterator(); // checkFstCheckIterator();
// checkFstLongTerm(); // checkFstLongTerm();
......
...@@ -213,21 +213,21 @@ class FstEnv : public ::testing::Test { ...@@ -213,21 +213,21 @@ class FstEnv : public ::testing::Test {
TEST_F(FstEnv, writeNormal) { TEST_F(FstEnv, writeNormal) {
fst->CreateWriter(); fst->CreateWriter();
std::string str("aa"); std::string str("11");
for (int i = 0; i < 10; i++) { for (int i = 0; i < 10; i++) {
str[0] = 'a' + i; str[0] = '1' + i;
str.resize(2); str.resize(2);
assert(fst->Put(str, i) == true); assert(fst->Put(str, i) == true);
} }
// order failed // order failed
assert(fst->Put("aa", 1) == false); assert(fst->Put("11", 1) == false);
fst->DestroyWriter(); fst->DestroyWriter();
fst->CreateReader(); fst->CreateReader();
uint64_t val; uint64_t val;
assert(fst->Get("a", &val) == false); assert(fst->Get("1", &val) == false);
assert(fst->Get("aa", &val) == true); assert(fst->Get("11", &val) == true);
assert(val == 0); assert(val == 0);
std::vector<uint64_t> rlt; std::vector<uint64_t> rlt;
...@@ -235,3 +235,19 @@ TEST_F(FstEnv, writeNormal) { ...@@ -235,3 +235,19 @@ TEST_F(FstEnv, writeNormal) {
assert(fst->Search(ctx, rlt) == true); assert(fst->Search(ctx, rlt) == true);
} }
TEST_F(FstEnv, WriteMillonrRecord) {} TEST_F(FstEnv, WriteMillonrRecord) {}
TEST_F(FstEnv, writeAbNormal) {
fst->CreateWriter();
std::string str1("voltage&\b&ab");
std::string str2("voltbge&\b&ab");
fst->Put(str1, 1);
fst->Put(str2, 2);
fst->DestroyWriter();
fst->CreateReader();
uint64_t val;
assert(fst->Get("1", &val) == false);
assert(fst->Get("voltage&\b&ab", &val) == true);
assert(val == 1);
}
#include <gtest/gtest.h>
#include <algorithm>
#include <iostream>
#include <string>
#include <thread>
#include <vector>
#include "index.h"
#include "indexInt.h"
#include "index_cache.h"
#include "index_fst.h"
#include "index_fst_counting_writer.h"
#include "index_fst_util.h"
#include "index_tfile.h"
#include "index_util.h"
#include "tglobal.h"
#include "tskiplist.h"
#include "tutil.h"
static std::string dir = "/tmp/json";
class JsonEnv : public ::testing::Test {
protected:
virtual void SetUp() {
taosRemoveDir(dir.c_str());
taosMkDir(dir.c_str());
printf("set up\n");
opts = indexOptsCreate();
int ret = tIndexJsonOpen(opts, dir.c_str(), &index);
assert(ret == 0);
}
virtual void TearDown() {
tIndexJsonClose(index);
indexOptsDestroy(opts);
printf("destory\n");
}
SIndexJsonOpts* opts;
SIndexJson* index;
};
TEST_F(JsonEnv, testWrite) {
{
std::string colName("test");
std::string colVal("ab");
SIndexTerm* term = indexTermCreate(1, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
colVal.c_str(), colVal.size());
SIndexMultiTerm* terms = indexMultiTermCreate();
indexMultiTermAdd(terms, term);
for (size_t i = 0; i < 100; i++) {
tIndexJsonPut(index, terms, i);
}
indexMultiTermDestroy(terms);
}
{
std::string colName("voltage");
std::string colVal("ab1");
SIndexTerm* term = indexTermCreate(1, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
colVal.c_str(), colVal.size());
SIndexMultiTerm* terms = indexMultiTermCreate();
indexMultiTermAdd(terms, term);
for (size_t i = 0; i < 100; i++) {
tIndexJsonPut(index, terms, i);
}
indexMultiTermDestroy(terms);
}
{
std::string colName("voltage");
std::string colVal("123");
SIndexTerm* term = indexTermCreate(1, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
colVal.c_str(), colVal.size());
SIndexMultiTerm* terms = indexMultiTermCreate();
indexMultiTermAdd(terms, term);
for (size_t i = 0; i < 100; i++) {
tIndexJsonPut(index, terms, i);
}
indexMultiTermDestroy(terms);
}
{
std::string colName("test");
std::string colVal("ab");
SIndexMultiTermQuery* mq = indexMultiTermQueryCreate(MUST);
SIndexTerm* q = indexTermCreate(1, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
colVal.c_str(), colVal.size());
SArray* result = taosArrayInit(1, sizeof(uint64_t));
indexMultiTermQueryAdd(mq, q, QUERY_TERM);
tIndexJsonSearch(index, mq, result);
assert(100 == taosArrayGetSize(result));
indexMultiTermQueryDestroy(mq);
}
}
TEST_F(JsonEnv, testWriteMillonData) {
{
std::string colName("test");
std::string colVal("ab");
SIndexTerm* term = indexTermCreate(1, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
colVal.c_str(), colVal.size());
SIndexMultiTerm* terms = indexMultiTermCreate();
indexMultiTermAdd(terms, term);
for (size_t i = 0; i < 100; i++) {
tIndexJsonPut(index, terms, i);
}
indexMultiTermDestroy(terms);
}
{
std::string colName("voltagefdadfa");
std::string colVal("abxxxxxxxxxxxx");
SIndexTerm* term = indexTermCreate(1, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
colVal.c_str(), colVal.size());
SIndexMultiTerm* terms = indexMultiTermCreate();
indexMultiTermAdd(terms, term);
for (size_t i = 0; i < 1000000; i++) {
tIndexJsonPut(index, terms, i);
}
indexMultiTermDestroy(terms);
}
{
std::string colName("test");
std::string colVal("ab");
SIndexMultiTermQuery* mq = indexMultiTermQueryCreate(MUST);
SIndexTerm* q = indexTermCreate(1, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
colVal.c_str(), colVal.size());
SArray* result = taosArrayInit(1, sizeof(uint64_t));
indexMultiTermQueryAdd(mq, q, QUERY_TERM);
tIndexJsonSearch(index, mq, result);
assert(100 == taosArrayGetSize(result));
indexMultiTermQueryDestroy(mq);
}
}
...@@ -286,15 +286,17 @@ void uvOnWriteCb(uv_write_t* req, int status) { ...@@ -286,15 +286,17 @@ void uvOnWriteCb(uv_write_t* req, int status) {
transClearBuffer(&conn->readBuf); transClearBuffer(&conn->readBuf);
if (status == 0) { if (status == 0) {
tTrace("server conn %p data already was written on stream", conn); tTrace("server conn %p data already was written on stream", conn);
assert(taosArrayGetSize(conn->srvMsgs) >= 1); if (conn->srvMsgs != NULL) {
SSrvMsg* msg = taosArrayGetP(conn->srvMsgs, 0); assert(taosArrayGetSize(conn->srvMsgs) >= 1);
taosArrayRemove(conn->srvMsgs, 0); SSrvMsg* msg = taosArrayGetP(conn->srvMsgs, 0);
destroySmsg(msg); taosArrayRemove(conn->srvMsgs, 0);
destroySmsg(msg);
// send second data, just use for push
if (taosArrayGetSize(conn->srvMsgs) > 0) { // send second data, just use for push
msg = (SSrvMsg*)taosArrayGetP(conn->srvMsgs, 0); if (taosArrayGetSize(conn->srvMsgs) > 0) {
uvStartSendRespInternal(msg); msg = (SSrvMsg*)taosArrayGetP(conn->srvMsgs, 0);
uvStartSendRespInternal(msg);
}
} }
} else { } else {
tError("server conn %p failed to write data, %s", conn, uv_err_name(status)); tError("server conn %p failed to write data, %s", conn, uv_err_name(status));
...@@ -615,7 +617,7 @@ static void destroyConn(SSrvConn* conn, bool clear) { ...@@ -615,7 +617,7 @@ static void destroyConn(SSrvConn* conn, bool clear) {
SSrvMsg* msg = taosArrayGetP(conn->srvMsgs, i); SSrvMsg* msg = taosArrayGetP(conn->srvMsgs, i);
destroySmsg(msg); destroySmsg(msg);
} }
taosArrayDestroy(conn->srvMsgs); conn->srvMsgs = taosArrayDestroy(conn->srvMsgs);
QUEUE_REMOVE(&conn->queue); QUEUE_REMOVE(&conn->queue);
if (clear) { if (clear) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册