From 7ae87c10103c801d06092ea3ddea00b9e9b671bd Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Wed, 2 Mar 2022 22:06:02 +0800 Subject: [PATCH] add UT --- source/libs/index/inc/index_comm.h | 32 +++++++++++++++++++ source/libs/index/src/index_cache.c | 35 ++------------------ source/libs/index/src/index_comm.c | 48 ++++++++++++++++++++++++++++ source/libs/index/src/index_tfile.c | 13 +++++++- source/libs/index/test/fstUT.cc | 16 ++++++++++ source/libs/index/test/jsonDemo.cc | 0 source/libs/index/test/jsonUT.cc | 31 +++++++++++++++++- source/libs/transport/src/transSrv.c | 22 +++++++------ 8 files changed, 153 insertions(+), 44 deletions(-) create mode 100644 source/libs/index/inc/index_comm.h create mode 100644 source/libs/index/src/index_comm.c delete mode 100644 source/libs/index/test/jsonDemo.cc diff --git a/source/libs/index/inc/index_comm.h b/source/libs/index/inc/index_comm.h new file mode 100644 index 0000000000..0d8418ba65 --- /dev/null +++ b/source/libs/index/inc/index_comm.h @@ -0,0 +1,32 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef _TD_INDEX_COMM_H_ +#define _TD_INDEX_COMM_H_ + +#ifdef __cplusplus +extern "C" { +#endif + +extern char JSON_COLUMN[]; +extern char JSON_VALUE_DELIM; + +char* indexPackJsonData(SIndexTerm* itm); + +#ifdef __cplusplus +} +#endif + +#endif diff --git a/source/libs/index/src/index_cache.c b/source/libs/index/src/index_cache.c index 2742830d3b..599bac3fe6 100644 --- a/source/libs/index/src/index_cache.c +++ b/source/libs/index/src/index_cache.c @@ -14,6 +14,7 @@ */ #include "index_cache.h" +#include "index_comm.h" #include "index_util.h" #include "tcompare.h" #include "tsched.h" @@ -24,9 +25,6 @@ #define MEM_THRESHOLD 1024 * 1024 #define MEM_ESTIMATE_RADIO 1.5 -static char JSON_COLUMN[] = "JSON"; -static char JSON_VALUE_DELIM = '&'; - static void indexMemRef(MemTable* tbl); static void indexMemUnRef(MemTable* tbl); @@ -211,33 +209,6 @@ static void indexCacheMakeRoomForWrite(IndexCache* cache) { } } } -static char* indexCachePackJsonData(SIndexTerm* itm) { - /* - * |<-----colname---->|<-----dataType---->|<--------colVal---------->| - * |<-----string----->|<-----uint8_t----->|<----depend on dataType-->| - */ - uint8_t ty = INDEX_TYPE_GET_TYPE(itm->colType); - - int32_t sz = itm->nColName + itm->nColVal + sizeof(uint8_t) + sizeof(JSON_VALUE_DELIM) * 2 + 1; - char* buf = (char*)calloc(1, sz); - char* p = buf; - - memcpy(p, itm->colName, itm->nColName); - p += itm->nColName; - - memcpy(p, &JSON_VALUE_DELIM, sizeof(JSON_VALUE_DELIM)); - p += sizeof(JSON_VALUE_DELIM); - - memcpy(p, &ty, sizeof(ty)); - p += sizeof(ty); - - memcpy(p, &JSON_VALUE_DELIM, sizeof(JSON_VALUE_DELIM)); - p += sizeof(JSON_VALUE_DELIM); - - memcpy(p, itm->colVal, itm->nColVal); - - return buf; -} int indexCachePut(void* cache, SIndexTerm* term, uint64_t uid) { if (cache == NULL) { return -1; @@ -254,7 +225,7 @@ int indexCachePut(void* cache, SIndexTerm* term, uint64_t uid) { // set up key ct->colType = term->colType; if (hasJson) { - ct->colVal = indexCachePackJsonData(term); + ct->colVal = indexPackJsonData(term); } else { ct->colVal = (char*)calloc(1, sizeof(char) * (term->nColVal + 1)); memcpy(ct->colVal, term->colVal, term->nColVal); @@ -333,7 +304,7 @@ int indexCacheSearch(void* cache, SIndexTermQuery* query, SArray* result, STermV bool hasJson = INDEX_TYPE_CONTAIN_EXTERN_TYPE(term->colType, TSDB_DATA_TYPE_JSON); char* p = term->colVal; if (hasJson) { - p = indexCachePackJsonData(term); + p = indexPackJsonData(term); } CacheTerm ct = {.colVal = p, .version = atomic_load_32(&pCache->version)}; diff --git a/source/libs/index/src/index_comm.c b/source/libs/index/src/index_comm.c new file mode 100644 index 0000000000..4f3cbaa4da --- /dev/null +++ b/source/libs/index/src/index_comm.c @@ -0,0 +1,48 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 * or later ("AGPL"), as published by the Free + * Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include "index.h" +#include "indexInt.h" + +char JSON_COLUMN[] = "JSON"; +char JSON_VALUE_DELIM = '&'; + +char* indexPackJsonData(SIndexTerm* itm) { + /* + * |<-----colname---->|<-----dataType---->|<--------colVal---------->| + * |<-----string----->|<-----uint8_t----->|<----depend on dataType-->| + */ + uint8_t ty = INDEX_TYPE_GET_TYPE(itm->colType); + + int32_t sz = itm->nColName + itm->nColVal + sizeof(uint8_t) + sizeof(JSON_VALUE_DELIM) * 2 + 1; + char* buf = (char*)calloc(1, sz); + char* p = buf; + + memcpy(p, itm->colName, itm->nColName); + p += itm->nColName; + + memcpy(p, &JSON_VALUE_DELIM, sizeof(JSON_VALUE_DELIM)); + p += sizeof(JSON_VALUE_DELIM); + + memcpy(p, &ty, sizeof(ty)); + p += sizeof(ty); + + memcpy(p, &JSON_VALUE_DELIM, sizeof(JSON_VALUE_DELIM)); + p += sizeof(JSON_VALUE_DELIM); + + memcpy(p, itm->colVal, itm->nColVal); + + return buf; +} diff --git a/source/libs/index/src/index_tfile.c b/source/libs/index/src/index_tfile.c index a05884b790..a2089e8eee 100644 --- a/source/libs/index/src/index_tfile.c +++ b/source/libs/index/src/index_tfile.c @@ -15,6 +15,7 @@ p * #include "index_tfile.h" #include "index.h" +#include "index_comm.h" #include "index_fst.h" #include "index_fst_counting_writer.h" #include "index_util.h" @@ -186,13 +187,20 @@ void tfileReaderDestroy(TFileReader* reader) { int tfileReaderSearch(TFileReader* reader, SIndexTermQuery* query, SArray* result) { SIndexTerm* term = query->term; + bool hasJson = INDEX_TYPE_CONTAIN_EXTERN_TYPE(term->colType, TSDB_DATA_TYPE_JSON); EIndexQueryType qtype = query->qType; int ret = -1; // refactor to callback later if (qtype == QUERY_TERM) { uint64_t offset; - FstSlice key = fstSliceCreate(term->colVal, term->nColVal); + char* p = term->colVal; + uint64_t sz = term->nColVal; + if (hasJson) { + p = indexPackJsonData(term); + sz = strlen(p); + } + FstSlice key = fstSliceCreate(p, sz); if (fstGet(reader->fst, &key, &offset)) { indexInfo("index: %" PRIu64 ", col: %s, colVal: %s, found table info in tindex", term->suid, term->colName, term->colVal); @@ -202,6 +210,9 @@ int tfileReaderSearch(TFileReader* reader, SIndexTermQuery* query, SArray* resul term->colVal); } fstSliceDestroy(&key); + if (hasJson) { + free(p); + } } else if (qtype == QUERY_PREFIX) { // handle later // diff --git a/source/libs/index/test/fstUT.cc b/source/libs/index/test/fstUT.cc index b34fd71e9c..3bc3b6da6a 100644 --- a/source/libs/index/test/fstUT.cc +++ b/source/libs/index/test/fstUT.cc @@ -238,3 +238,19 @@ TEST_F(FstEnv, writeNormal) { assert(fst->Search(ctx, rlt) == true); } TEST_F(FstEnv, WriteMillonrRecord) {} +TEST_F(FstEnv, writeAbNormal) { + fst->CreateWriter(); + std::string str1("voltage&\b&ab"); + std::string str2("voltbge&\b&ab"); + + fst->Put(str1, 1); + fst->Put(str2, 2); + + fst->DestroyWriter(); + + fst->CreateReader(); + uint64_t val; + assert(fst->Get("1", &val) == false); + assert(fst->Get("voltage&\b&ab", &val) == true); + assert(val == 1); +} diff --git a/source/libs/index/test/jsonDemo.cc b/source/libs/index/test/jsonDemo.cc deleted file mode 100644 index e69de29bb2..0000000000 diff --git a/source/libs/index/test/jsonUT.cc b/source/libs/index/test/jsonUT.cc index 20b59add9f..e184bc49ae 100644 --- a/source/libs/index/test/jsonUT.cc +++ b/source/libs/index/test/jsonUT.cc @@ -21,6 +21,8 @@ class JsonEnv : public ::testing::Test { protected: virtual void SetUp() { taosRemoveDir(dir.c_str()); + taosMkDir(dir.c_str()); + opts = indexOptsCreate(); int ret = tIndexJsonOpen(opts, dir.c_str(), &index); assert(ret == 0); @@ -87,6 +89,33 @@ TEST_F(JsonEnv, testWrite) { assert(100 == taosArrayGetSize(result)); indexMultiTermQueryDestroy(mq); } +} +TEST_F(JsonEnv, testWriteMillonData) { + { + std::string colName("voltagefdadfa"); + std::string colVal("abxxxxxxxxxxxx"); + SIndexTerm* term = indexTermCreate(1, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), + colVal.c_str(), colVal.size()); - // SIndexTermQuery query = {.term = term, .qType = QUERY_TERM}; + SIndexMultiTerm* terms = indexMultiTermCreate(); + indexMultiTermAdd(terms, term); + for (size_t i = 0; i < 1000000; i++) { + tIndexJsonPut(index, terms, i); + } + indexMultiTermDestroy(terms); + } + { + std::string colName("voltage"); + std::string colVal("ab"); + + SIndexMultiTermQuery* mq = indexMultiTermQueryCreate(MUST); + SIndexTerm* q = indexTermCreate(1, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), + colVal.c_str(), colVal.size()); + + SArray* result = taosArrayInit(1, sizeof(uint64_t)); + indexMultiTermQueryAdd(mq, q, QUERY_TERM); + tIndexJsonSearch(index, mq, result); + assert(100 == taosArrayGetSize(result)); + indexMultiTermQueryDestroy(mq); + } } diff --git a/source/libs/transport/src/transSrv.c b/source/libs/transport/src/transSrv.c index f0db054797..c7b6ca2a2c 100644 --- a/source/libs/transport/src/transSrv.c +++ b/source/libs/transport/src/transSrv.c @@ -286,15 +286,17 @@ void uvOnWriteCb(uv_write_t* req, int status) { transClearBuffer(&conn->readBuf); if (status == 0) { tTrace("server conn %p data already was written on stream", conn); - assert(taosArrayGetSize(conn->srvMsgs) >= 1); - SSrvMsg* msg = taosArrayGetP(conn->srvMsgs, 0); - taosArrayRemove(conn->srvMsgs, 0); - destroySmsg(msg); - - // send second data, just use for push - if (taosArrayGetSize(conn->srvMsgs) > 0) { - msg = (SSrvMsg*)taosArrayGetP(conn->srvMsgs, 0); - uvStartSendRespInternal(msg); + if (conn->srvMsgs != NULL) { + assert(taosArrayGetSize(conn->srvMsgs) >= 1); + SSrvMsg* msg = taosArrayGetP(conn->srvMsgs, 0); + taosArrayRemove(conn->srvMsgs, 0); + destroySmsg(msg); + + // send second data, just use for push + if (taosArrayGetSize(conn->srvMsgs) > 0) { + msg = (SSrvMsg*)taosArrayGetP(conn->srvMsgs, 0); + uvStartSendRespInternal(msg); + } } } else { tError("server conn %p failed to write data, %s", conn, uv_err_name(status)); @@ -615,7 +617,7 @@ static void destroyConn(SSrvConn* conn, bool clear) { SSrvMsg* msg = taosArrayGetP(conn->srvMsgs, i); destroySmsg(msg); } - taosArrayDestroy(conn->srvMsgs); + conn->srvMsgs = taosArrayDestroy(conn->srvMsgs); QUEUE_REMOVE(&conn->queue); if (clear) { -- GitLab