提交 f3eaca04 编写于 作者: H Hongze Cheng

Merge branch '3.0' of https://github.com/taosdata/TDengine into feat--tag-index

...@@ -200,11 +200,37 @@ TEST(testCase, index_filter) { ...@@ -200,11 +200,37 @@ TEST(testCase, index_filter) {
doFilterTag(opNode, result); doFilterTag(opNode, result);
EXPECT_EQ(1, taosArrayGetSize(result)); EXPECT_EQ(1, taosArrayGetSize(result));
taosArrayDestroy(result);
nodesDestroyNode(res);
}
{
SNode *pLeft = NULL, *pRight = NULL, *opNode = NULL, *res = NULL;
sifMakeColumnNode(&pLeft, "test", "col", COLUMN_TYPE_TAG, TSDB_DATA_TYPE_INT);
sifMakeValueNode(&pRight, TSDB_DATA_TYPE_INT, &sifRightV);
sifMakeOpNode(&opNode, OP_TYPE_GREATER_THAN, TSDB_DATA_TYPE_INT, pLeft, pRight);
SArray *result = taosArrayInit(4, sizeof(uint64_t));
doFilterTag(opNode, result);
EXPECT_EQ(0, taosArrayGetSize(result));
taosArrayDestroy(result);
nodesDestroyNode(res);
}
{
SNode *pLeft = NULL, *pRight = NULL, *opNode = NULL, *res = NULL;
sifMakeColumnNode(&pLeft, "test", "col", COLUMN_TYPE_TAG, TSDB_DATA_TYPE_INT);
sifMakeValueNode(&pRight, TSDB_DATA_TYPE_INT, &sifRightV);
sifMakeOpNode(&opNode, OP_TYPE_GREATER_EQUAL, TSDB_DATA_TYPE_DOUBLE, pLeft, pRight);
SArray *result = taosArrayInit(4, sizeof(uint64_t));
doFilterTag(opNode, result);
EXPECT_EQ(0, taosArrayGetSize(result));
taosArrayDestroy(result); taosArrayDestroy(result);
nodesDestroyNode(res); nodesDestroyNode(res);
} }
} }
// add other greater/lower/equal/in compare func test
TEST(testCase, index_filter_varify) { TEST(testCase, index_filter_varify) {
{ {
SNode *pLeft = NULL, *pRight = NULL, *opNode = NULL, *res = NULL; SNode *pLeft = NULL, *pRight = NULL, *opNode = NULL, *res = NULL;
......
...@@ -310,28 +310,28 @@ enum { ...@@ -310,28 +310,28 @@ enum {
}; };
int64_t gUdfTaskSeqNum = 0; int64_t gUdfTaskSeqNum = 0;
typedef struct SUdfdProxy { typedef struct SUdfcProxy {
char udfdPipeName[PATH_MAX + UDF_LISTEN_PIPE_NAME_LEN + 2]; char udfdPipeName[PATH_MAX + UDF_LISTEN_PIPE_NAME_LEN + 2];
uv_barrier_t gUdfInitBarrier; uv_barrier_t initBarrier;
uv_loop_t gUdfdLoop; uv_loop_t uvLoop;
uv_thread_t gUdfLoopThread; uv_thread_t loopThread;
uv_async_t gUdfLoopTaskAync; uv_async_t loopTaskAync;
uv_async_t gUdfLoopStopAsync; uv_async_t loopStopAsync;
uv_mutex_t gUdfTaskQueueMutex; uv_mutex_t taskQueueMutex;
int8_t gUdfcState; int8_t udfcState;
QUEUE gUdfTaskQueue; QUEUE taskQueue;
QUEUE gUvProcTaskQueue; QUEUE uvProcTaskQueue;
int8_t initialized; int8_t initialized;
} SUdfdProxy; } SUdfcProxy;
SUdfdProxy gUdfdProxy = {0}; SUdfcProxy gUdfdProxy = {0};
typedef struct SClientUdfUvSession { typedef struct SClientUdfUvSession {
SUdfdProxy *udfc; SUdfcProxy *udfc;
int64_t severHandle; int64_t severHandle;
uv_pipe_t *udfUvPipe; uv_pipe_t *udfUvPipe;
...@@ -341,7 +341,7 @@ typedef struct SClientUdfUvSession { ...@@ -341,7 +341,7 @@ typedef struct SClientUdfUvSession {
} SClientUdfUvSession; } SClientUdfUvSession;
typedef struct SClientUvTaskNode { typedef struct SClientUvTaskNode {
SUdfdProxy *udfc; SUdfcProxy *udfc;
int8_t type; int8_t type;
int errCode; int errCode;
...@@ -1055,11 +1055,11 @@ int32_t udfcCreateUvTask(SClientUdfTask *task, int8_t uvTaskType, SClientUvTaskN ...@@ -1055,11 +1055,11 @@ int32_t udfcCreateUvTask(SClientUdfTask *task, int8_t uvTaskType, SClientUvTaskN
int32_t udfcQueueUvTask(SClientUvTaskNode *uvTask) { int32_t udfcQueueUvTask(SClientUvTaskNode *uvTask) {
fnTrace("queue uv task to event loop, task: %d, %p", uvTask->type, uvTask); fnTrace("queue uv task to event loop, task: %d, %p", uvTask->type, uvTask);
SUdfdProxy *udfc = uvTask->udfc; SUdfcProxy *udfc = uvTask->udfc;
uv_mutex_lock(&udfc->gUdfTaskQueueMutex); uv_mutex_lock(&udfc->taskQueueMutex);
QUEUE_INSERT_TAIL(&udfc->gUdfTaskQueue, &uvTask->recvTaskQueue); QUEUE_INSERT_TAIL(&udfc->taskQueue, &uvTask->recvTaskQueue);
uv_mutex_unlock(&udfc->gUdfTaskQueueMutex); uv_mutex_unlock(&udfc->taskQueueMutex);
uv_async_send(&udfc->gUdfLoopTaskAync); uv_async_send(&udfc->loopTaskAync);
uv_sem_wait(&uvTask->taskSem); uv_sem_wait(&uvTask->taskSem);
fnInfo("udfc uv task finished. task: %d, %p", uvTask->type, uvTask); fnInfo("udfc uv task finished. task: %d, %p", uvTask->type, uvTask);
...@@ -1073,7 +1073,7 @@ int32_t udfcStartUvTask(SClientUvTaskNode *uvTask) { ...@@ -1073,7 +1073,7 @@ int32_t udfcStartUvTask(SClientUvTaskNode *uvTask) {
switch (uvTask->type) { switch (uvTask->type) {
case UV_TASK_CONNECT: { case UV_TASK_CONNECT: {
uv_pipe_t *pipe = taosMemoryMalloc(sizeof(uv_pipe_t)); uv_pipe_t *pipe = taosMemoryMalloc(sizeof(uv_pipe_t));
uv_pipe_init(&uvTask->udfc->gUdfdLoop, pipe, 0); uv_pipe_init(&uvTask->udfc->uvLoop, pipe, 0);
uvTask->pipe = pipe; uvTask->pipe = pipe;
SClientUvConn *conn = taosMemoryCalloc(1, sizeof(SClientUvConn)); SClientUvConn *conn = taosMemoryCalloc(1, sizeof(SClientUvConn));
...@@ -1113,46 +1113,46 @@ int32_t udfcStartUvTask(SClientUvTaskNode *uvTask) { ...@@ -1113,46 +1113,46 @@ int32_t udfcStartUvTask(SClientUvTaskNode *uvTask) {
} }
void udfClientAsyncCb(uv_async_t *async) { void udfClientAsyncCb(uv_async_t *async) {
SUdfdProxy *udfc = async->data; SUdfcProxy *udfc = async->data;
QUEUE wq; QUEUE wq;
uv_mutex_lock(&udfc->gUdfTaskQueueMutex); uv_mutex_lock(&udfc->taskQueueMutex);
QUEUE_MOVE(&udfc->gUdfTaskQueue, &wq); QUEUE_MOVE(&udfc->taskQueue, &wq);
uv_mutex_unlock(&udfc->gUdfTaskQueueMutex); uv_mutex_unlock(&udfc->taskQueueMutex);
while (!QUEUE_EMPTY(&wq)) { while (!QUEUE_EMPTY(&wq)) {
QUEUE* h = QUEUE_HEAD(&wq); QUEUE* h = QUEUE_HEAD(&wq);
QUEUE_REMOVE(h); QUEUE_REMOVE(h);
SClientUvTaskNode *task = QUEUE_DATA(h, SClientUvTaskNode, recvTaskQueue); SClientUvTaskNode *task = QUEUE_DATA(h, SClientUvTaskNode, recvTaskQueue);
udfcStartUvTask(task); udfcStartUvTask(task);
QUEUE_INSERT_TAIL(&udfc->gUvProcTaskQueue, &task->procTaskQueue); QUEUE_INSERT_TAIL(&udfc->uvProcTaskQueue, &task->procTaskQueue);
} }
} }
void cleanUpUvTasks(SUdfdProxy *udfc) { void cleanUpUvTasks(SUdfcProxy *udfc) {
fnDebug("clean up uv tasks") fnDebug("clean up uv tasks")
QUEUE wq; QUEUE wq;
uv_mutex_lock(&udfc->gUdfTaskQueueMutex); uv_mutex_lock(&udfc->taskQueueMutex);
QUEUE_MOVE(&udfc->gUdfTaskQueue, &wq); QUEUE_MOVE(&udfc->taskQueue, &wq);
uv_mutex_unlock(&udfc->gUdfTaskQueueMutex); uv_mutex_unlock(&udfc->taskQueueMutex);
while (!QUEUE_EMPTY(&wq)) { while (!QUEUE_EMPTY(&wq)) {
QUEUE* h = QUEUE_HEAD(&wq); QUEUE* h = QUEUE_HEAD(&wq);
QUEUE_REMOVE(h); QUEUE_REMOVE(h);
SClientUvTaskNode *task = QUEUE_DATA(h, SClientUvTaskNode, recvTaskQueue); SClientUvTaskNode *task = QUEUE_DATA(h, SClientUvTaskNode, recvTaskQueue);
if (udfc->gUdfcState == UDFC_STATE_STOPPING) { if (udfc->udfcState == UDFC_STATE_STOPPING) {
task->errCode = TSDB_CODE_UDF_STOPPING; task->errCode = TSDB_CODE_UDF_STOPPING;
} }
uv_sem_post(&task->taskSem); uv_sem_post(&task->taskSem);
} }
while (!QUEUE_EMPTY(&udfc->gUvProcTaskQueue)) { while (!QUEUE_EMPTY(&udfc->uvProcTaskQueue)) {
QUEUE* h = QUEUE_HEAD(&udfc->gUvProcTaskQueue); QUEUE* h = QUEUE_HEAD(&udfc->uvProcTaskQueue);
QUEUE_REMOVE(h); QUEUE_REMOVE(h);
SClientUvTaskNode *task = QUEUE_DATA(h, SClientUvTaskNode, procTaskQueue); SClientUvTaskNode *task = QUEUE_DATA(h, SClientUvTaskNode, procTaskQueue);
if (udfc->gUdfcState == UDFC_STATE_STOPPING) { if (udfc->udfcState == UDFC_STATE_STOPPING) {
task->errCode = TSDB_CODE_UDF_STOPPING; task->errCode = TSDB_CODE_UDF_STOPPING;
} }
uv_sem_post(&task->taskSem); uv_sem_post(&task->taskSem);
...@@ -1160,28 +1160,28 @@ void cleanUpUvTasks(SUdfdProxy *udfc) { ...@@ -1160,28 +1160,28 @@ void cleanUpUvTasks(SUdfdProxy *udfc) {
} }
void udfStopAsyncCb(uv_async_t *async) { void udfStopAsyncCb(uv_async_t *async) {
SUdfdProxy *udfc = async->data; SUdfcProxy *udfc = async->data;
cleanUpUvTasks(udfc); cleanUpUvTasks(udfc);
if (udfc->gUdfcState == UDFC_STATE_STOPPING) { if (udfc->udfcState == UDFC_STATE_STOPPING) {
uv_stop(&udfc->gUdfdLoop); uv_stop(&udfc->uvLoop);
} }
} }
void constructUdfService(void *argsThread) { void constructUdfService(void *argsThread) {
SUdfdProxy *udfc = (SUdfdProxy*)argsThread; SUdfcProxy *udfc = (SUdfcProxy *)argsThread;
uv_loop_init(&udfc->gUdfdLoop); uv_loop_init(&udfc->uvLoop);
uv_async_init(&udfc->gUdfdLoop, &udfc->gUdfLoopTaskAync, udfClientAsyncCb); uv_async_init(&udfc->uvLoop, &udfc->loopTaskAync, udfClientAsyncCb);
udfc->gUdfLoopTaskAync.data = udfc; udfc->loopTaskAync.data = udfc;
uv_async_init(&udfc->gUdfdLoop, &udfc->gUdfLoopStopAsync, udfStopAsyncCb); uv_async_init(&udfc->uvLoop, &udfc->loopStopAsync, udfStopAsyncCb);
udfc->gUdfLoopStopAsync.data = udfc; udfc->loopStopAsync.data = udfc;
uv_mutex_init(&udfc->gUdfTaskQueueMutex); uv_mutex_init(&udfc->taskQueueMutex);
QUEUE_INIT(&udfc->gUdfTaskQueue); QUEUE_INIT(&udfc->taskQueue);
QUEUE_INIT(&udfc->gUvProcTaskQueue); QUEUE_INIT(&udfc->uvProcTaskQueue);
uv_barrier_wait(&udfc->gUdfInitBarrier); uv_barrier_wait(&udfc->initBarrier);
//TODO return value of uv_run //TODO return value of uv_run
uv_run(&udfc->gUdfdLoop, UV_RUN_DEFAULT); uv_run(&udfc->uvLoop, UV_RUN_DEFAULT);
uv_loop_close(&udfc->gUdfdLoop); uv_loop_close(&udfc->uvLoop);
} }
int32_t udfcOpen() { int32_t udfcOpen() {
...@@ -1189,14 +1189,14 @@ int32_t udfcOpen() { ...@@ -1189,14 +1189,14 @@ int32_t udfcOpen() {
if (old == 1) { if (old == 1) {
return 0; return 0;
} }
SUdfdProxy *proxy = &gUdfdProxy; SUdfcProxy *proxy = &gUdfdProxy;
getUdfdPipeName(proxy->udfdPipeName, sizeof(proxy->udfdPipeName)); getUdfdPipeName(proxy->udfdPipeName, sizeof(proxy->udfdPipeName));
proxy->gUdfcState = UDFC_STATE_STARTNG; proxy->udfcState = UDFC_STATE_STARTNG;
uv_barrier_init(&proxy->gUdfInitBarrier, 2); uv_barrier_init(&proxy->initBarrier, 2);
uv_thread_create(&proxy->gUdfLoopThread, constructUdfService, proxy); uv_thread_create(&proxy->loopThread, constructUdfService, proxy);
atomic_store_8(&proxy->gUdfcState, UDFC_STATE_READY); atomic_store_8(&proxy->udfcState, UDFC_STATE_READY);
proxy->gUdfcState = UDFC_STATE_READY; proxy->udfcState = UDFC_STATE_READY;
uv_barrier_wait(&proxy->gUdfInitBarrier); uv_barrier_wait(&proxy->initBarrier);
fnInfo("udfc initialized") fnInfo("udfc initialized")
return 0; return 0;
} }
...@@ -1207,13 +1207,13 @@ int32_t udfcClose() { ...@@ -1207,13 +1207,13 @@ int32_t udfcClose() {
return 0; return 0;
} }
SUdfdProxy *udfc = &gUdfdProxy; SUdfcProxy *udfc = &gUdfdProxy;
udfc->gUdfcState = UDFC_STATE_STOPPING; udfc->udfcState = UDFC_STATE_STOPPING;
uv_async_send(&udfc->gUdfLoopStopAsync); uv_async_send(&udfc->loopStopAsync);
uv_thread_join(&udfc->gUdfLoopThread); uv_thread_join(&udfc->loopThread);
uv_mutex_destroy(&udfc->gUdfTaskQueueMutex); uv_mutex_destroy(&udfc->taskQueueMutex);
uv_barrier_destroy(&udfc->gUdfInitBarrier); uv_barrier_destroy(&udfc->initBarrier);
udfc->gUdfcState = UDFC_STATE_INITAL; udfc->udfcState = UDFC_STATE_INITAL;
fnInfo("udfc cleaned up"); fnInfo("udfc cleaned up");
return 0; return 0;
} }
...@@ -1236,7 +1236,7 @@ int32_t udfcRunUdfUvTask(SClientUdfTask *task, int8_t uvTaskType) { ...@@ -1236,7 +1236,7 @@ int32_t udfcRunUdfUvTask(SClientUdfTask *task, int8_t uvTaskType) {
int32_t setupUdf(char udfName[], UdfcFuncHandle *funcHandle) { int32_t setupUdf(char udfName[], UdfcFuncHandle *funcHandle) {
fnInfo("udfc setup udf. udfName: %s", udfName); fnInfo("udfc setup udf. udfName: %s", udfName);
if (gUdfdProxy.gUdfcState != UDFC_STATE_READY) { if (gUdfdProxy.udfcState != UDFC_STATE_READY) {
return TSDB_CODE_UDF_INVALID_STATE; return TSDB_CODE_UDF_INVALID_STATE;
} }
SClientUdfTask *task = taosMemoryCalloc(1,sizeof(SClientUdfTask)); SClientUdfTask *task = taosMemoryCalloc(1,sizeof(SClientUdfTask));
...@@ -1484,7 +1484,7 @@ int32_t udfAggProcess(struct SqlFunctionCtx *pCtx) { ...@@ -1484,7 +1484,7 @@ int32_t udfAggProcess(struct SqlFunctionCtx *pCtx) {
SSDataBlock tempBlock = {0}; SSDataBlock tempBlock = {0};
tempBlock.info.numOfCols = numOfCols; tempBlock.info.numOfCols = numOfCols;
tempBlock.info.rows = numOfRows; tempBlock.info.rows = pInput->totalRows;
tempBlock.info.uid = pInput->uid; tempBlock.info.uid = pInput->uid;
bool hasVarCol = false; bool hasVarCol = false;
tempBlock.pDataBlock = taosArrayInit(numOfCols, sizeof(SColumnInfoData)); tempBlock.pDataBlock = taosArrayInit(numOfCols, sizeof(SColumnInfoData));
......
...@@ -27,10 +27,33 @@ ...@@ -27,10 +27,33 @@
#endif #endif
#define INDEX_NUM_OF_THREADS 4 #define INDEX_NUM_OF_THREADS 4
#define INDEX_QUEUE_SIZE 200 #define INDEX_QUEUE_SIZE 200
void* indexQhandle = NULL; void* indexQhandle = NULL;
#define INDEX_DATA_BOOL_NULL 0x02
#define INDEX_DATA_TINYINT_NULL 0x80
#define INDEX_DATA_SMALLINT_NULL 0x8000
#define INDEX_DATA_INT_NULL 0x80000000L
#define INDEX_DATA_BIGINT_NULL 0x8000000000000000L
#define INDEX_DATA_TIMESTAMP_NULL TSDB_DATA_BIGINT_NULL
#define INDEX_DATA_FLOAT_NULL 0x7FF00000 // it is an NAN
#define INDEX_DATA_DOUBLE_NULL 0x7FFFFF0000000000L // an NAN
#define INDEX_DATA_NCHAR_NULL 0xFFFFFFFF
#define INDEX_DATA_BINARY_NULL 0xFF
#define INDEX_DATA_JSON_NULL 0xFFFFFFFF
#define INDEX_DATA_JSON_null 0xFFFFFFFE
#define INDEX_DATA_JSON_NOT_NULL 0x01
#define INDEX_DATA_UTINYINT_NULL 0xFF
#define INDEX_DATA_USMALLINT_NULL 0xFFFF
#define INDEX_DATA_UINT_NULL 0xFFFFFFFF
#define INDEX_DATA_UBIGINT_NULL 0xFFFFFFFFFFFFFFFFL
#define INDEX_DATA_NULL_STR "NULL"
#define INDEX_DATA_NULL_STR_L "null"
void indexInit() { void indexInit() {
// refactor later // refactor later
indexQhandle = taosInitScheduler(INDEX_QUEUE_SIZE, INDEX_NUM_OF_THREADS, "index"); indexQhandle = taosInitScheduler(INDEX_QUEUE_SIZE, INDEX_NUM_OF_THREADS, "index");
...@@ -67,12 +90,6 @@ int indexOpen(SIndexOpts* opts, const char* path, SIndex** index) { ...@@ -67,12 +90,6 @@ int indexOpen(SIndexOpts* opts, const char* path, SIndex** index) {
return -1; return -1;
} }
#ifdef USE_LUCENE
index_t* index = index_open(path);
sIdx->index = index;
#endif
#ifdef USE_INVERTED_INDEX
// sIdx->cache = (void*)indexCacheCreate(sIdx); // sIdx->cache = (void*)indexCacheCreate(sIdx);
sIdx->tindex = indexTFileCreate(path); sIdx->tindex = indexTFileCreate(path);
if (sIdx->tindex == NULL) { if (sIdx->tindex == NULL) {
...@@ -85,7 +102,6 @@ int indexOpen(SIndexOpts* opts, const char* path, SIndex** index) { ...@@ -85,7 +102,6 @@ int indexOpen(SIndexOpts* opts, const char* path, SIndex** index) {
taosThreadMutexInit(&sIdx->mtx, NULL); taosThreadMutexInit(&sIdx->mtx, NULL);
*index = sIdx; *index = sIdx;
return 0; return 0;
#endif
END: END:
if (sIdx != NULL) { if (sIdx != NULL) {
...@@ -97,12 +113,6 @@ END: ...@@ -97,12 +113,6 @@ END:
} }
void indexClose(SIndex* sIdx) { void indexClose(SIndex* sIdx) {
#ifdef USE_LUCENE
index_close(sIdex->index);
sIdx->index = NULL;
#endif
#ifdef USE_INVERTED_INDEX
void* iter = taosHashIterate(sIdx->colObj, NULL); void* iter = taosHashIterate(sIdx->colObj, NULL);
while (iter) { while (iter) {
IndexCache** pCache = iter; IndexCache** pCache = iter;
...@@ -114,31 +124,12 @@ void indexClose(SIndex* sIdx) { ...@@ -114,31 +124,12 @@ void indexClose(SIndex* sIdx) {
taosHashCleanup(sIdx->colObj); taosHashCleanup(sIdx->colObj);
taosThreadMutexDestroy(&sIdx->mtx); taosThreadMutexDestroy(&sIdx->mtx);
indexTFileDestroy(sIdx->tindex); indexTFileDestroy(sIdx->tindex);
#endif
taosMemoryFree(sIdx->path); taosMemoryFree(sIdx->path);
taosMemoryFree(sIdx); taosMemoryFree(sIdx);
return; return;
} }
int indexPut(SIndex* index, SIndexMultiTerm* fVals, uint64_t uid) { int indexPut(SIndex* index, SIndexMultiTerm* fVals, uint64_t uid) {
#ifdef USE_LUCENE
index_document_t* doc = index_document_create();
char buf[16] = {0};
sprintf(buf, "%d", uid);
for (int i = 0; i < taosArrayGetSize(fVals); i++) {
SIndexTerm* p = taosArrayGetP(fVals, i);
index_document_add(doc, (const char*)(p->key), p->nKey, (const char*)(p->val), p->nVal, 1);
}
index_document_add(doc, NULL, 0, buf, strlen(buf), 0);
index_put(index->index, doc);
index_document_destroy(doc);
#endif
#ifdef USE_INVERTED_INDEX
// TODO(yihao): reduce the lock range // TODO(yihao): reduce the lock range
taosThreadMutexLock(&index->mtx); taosThreadMutexLock(&index->mtx);
for (int i = 0; i < taosArrayGetSize(fVals); i++) { for (int i = 0; i < taosArrayGetSize(fVals); i++) {
...@@ -170,12 +161,9 @@ int indexPut(SIndex* index, SIndexMultiTerm* fVals, uint64_t uid) { ...@@ -170,12 +161,9 @@ int indexPut(SIndex* index, SIndexMultiTerm* fVals, uint64_t uid) {
return ret; return ret;
} }
} }
#endif
return 0; return 0;
} }
int indexSearch(SIndex* index, SIndexMultiTermQuery* multiQuerys, SArray* result) { int indexSearch(SIndex* index, SIndexMultiTermQuery* multiQuerys, SArray* result) {
#ifdef USE_INVERTED_INDEX
EIndexOperatorType opera = multiQuerys->opera; // relation of querys EIndexOperatorType opera = multiQuerys->opera; // relation of querys
SArray* iRslts = taosArrayInit(4, POINTER_BYTES); SArray* iRslts = taosArrayInit(4, POINTER_BYTES);
...@@ -188,35 +176,14 @@ int indexSearch(SIndex* index, SIndexMultiTermQuery* multiQuerys, SArray* result ...@@ -188,35 +176,14 @@ int indexSearch(SIndex* index, SIndexMultiTermQuery* multiQuerys, SArray* result
} }
indexMergeFinalResults(iRslts, opera, result); indexMergeFinalResults(iRslts, opera, result);
indexInterResultsDestroy(iRslts); indexInterResultsDestroy(iRslts);
#endif
return 0; return 0;
} }
int indexDelete(SIndex* index, SIndexMultiTermQuery* query) { int indexDelete(SIndex* index, SIndexMultiTermQuery* query) { return 1; }
#ifdef USE_INVERTED_INDEX int indexRebuild(SIndex* index, SIndexOpts* opts) { return 0; }
#endif
return 1; SIndexOpts* indexOptsCreate() { return NULL; }
} void indexOptsDestroy(SIndexOpts* opts) { return; }
int indexRebuild(SIndex* index, SIndexOpts* opts) {
#ifdef USE_INVERTED_INDEX
#endif
return 0;
}
SIndexOpts* indexOptsCreate() {
#ifdef USE_LUCENE
#endif
return NULL;
}
void indexOptsDestroy(SIndexOpts* opts) {
#ifdef USE_LUCENE
#endif
return;
}
/* /*
* @param: oper * @param: oper
* *
......
...@@ -403,6 +403,19 @@ TEST_F(JsonEnv, testWriteJsonTfileAndCache) { ...@@ -403,6 +403,19 @@ TEST_F(JsonEnv, testWriteJsonTfileAndCache) {
EXPECT_EQ(1000, taosArrayGetSize(result)); EXPECT_EQ(1000, taosArrayGetSize(result));
indexMultiTermQueryDestroy(mq); indexMultiTermQueryDestroy(mq);
} }
{
std::string colName("other_column");
std::string colVal("100");
SIndexTerm* term = indexTermCreate(1, ADD_VALUE, TSDB_DATA_TYPE_INT, colName.c_str(), colName.size(),
colVal.c_str(), colVal.size());
SIndexMultiTerm* terms = indexMultiTermCreate();
indexMultiTermAdd(terms, term);
for (size_t i = 0; i < 1000; i++) {
tIndexJsonPut(index, terms, i);
}
indexMultiTermDestroy(terms);
}
{ {
std::string colName("test1"); std::string colName("test1");
std::string colVal("10"); std::string colVal("10");
......
...@@ -145,9 +145,9 @@ static void cliWalkCb(uv_handle_t* handle, void* arg); ...@@ -145,9 +145,9 @@ static void cliWalkCb(uv_handle_t* handle, void* arg);
} while (0) } while (0)
#define CONN_HOST_THREAD_INDEX(conn) (conn ? ((SCliConn*)conn)->hThrdIdx : -1) #define CONN_HOST_THREAD_INDEX(conn) (conn ? ((SCliConn*)conn)->hThrdIdx : -1)
#define CONN_PERSIST_TIME(para) (para * 1000 * 10) #define CONN_PERSIST_TIME(para) (para * 1000 * 10)
#define CONN_GET_HOST_THREAD(conn) (conn ? ((SCliConn*)conn)->hostThrd : NULL) #define CONN_GET_HOST_THREAD(conn) (conn ? ((SCliConn*)conn)->hostThrd : NULL)
#define CONN_GET_INST_LABEL(conn) (((STrans*)(((SCliThrdObj*)(conn)->hostThrd)->pTransInst))->label) #define CONN_GET_INST_LABEL(conn) (((STrans*)(((SCliThrdObj*)(conn)->hostThrd)->pTransInst))->label)
#define CONN_SHOULD_RELEASE(conn, head) \ #define CONN_SHOULD_RELEASE(conn, head) \
do { \ do { \
if ((head)->release == 1 && (head->msgLen) == sizeof(*head)) { \ if ((head)->release == 1 && (head->msgLen) == sizeof(*head)) { \
...@@ -223,11 +223,11 @@ static void cliWalkCb(uv_handle_t* handle, void* arg); ...@@ -223,11 +223,11 @@ static void cliWalkCb(uv_handle_t* handle, void* arg);
#define CONN_RELEASE_BY_SERVER(conn) \ #define CONN_RELEASE_BY_SERVER(conn) \
(((conn)->status == ConnRelease || (conn)->status == ConnInPool) && T_REF_VAL_GET(conn) == 1) (((conn)->status == ConnRelease || (conn)->status == ConnInPool) && T_REF_VAL_GET(conn) == 1)
#define REQUEST_NO_RESP(msg) ((msg)->noResp == 1) #define REQUEST_NO_RESP(msg) ((msg)->noResp == 1)
#define REQUEST_PERSIS_HANDLE(msg) ((msg)->persistHandle == 1) #define REQUEST_PERSIS_HANDLE(msg) ((msg)->persistHandle == 1)
#define REQUEST_RELEASE_HANDLE(cmsg) ((cmsg)->type == Release) #define REQUEST_RELEASE_HANDLE(cmsg) ((cmsg)->type == Release)
#define EPSET_GET_INUSE_IP(epSet) ((epSet)->eps[(epSet)->inUse].fqdn) #define EPSET_GET_INUSE_IP(epSet) ((epSet)->eps[(epSet)->inUse].fqdn)
#define EPSET_GET_INUSE_PORT(epSet) ((epSet)->eps[(epSet)->inUse].port) #define EPSET_GET_INUSE_PORT(epSet) ((epSet)->eps[(epSet)->inUse].port)
static void* cliWorkThread(void* arg); static void* cliWorkThread(void* arg);
......
...@@ -109,6 +109,7 @@ if $data01 != 18.547236991 then ...@@ -109,6 +109,7 @@ if $data01 != 18.547236991 then
endi endi
sql select udf2(udf1(f2-f1)), udf2(udf1(f2/f1)) from t2; sql select udf2(udf1(f2-f1)), udf2(udf1(f2/f1)) from t2;
print $rows , $data00 , $data01
if $rows != 1 then if $rows != 1 then
return -1 return -1
endi endi
...@@ -118,7 +119,19 @@ endi ...@@ -118,7 +119,19 @@ endi
if $data01 != 152.420471066 then if $data01 != 152.420471066 then
return -1 return -1
endi endi
print $rows , $data00 , $data01
sql select udf2(f2) from udf.t2 group by 1-udf1(f1);
print $rows , $data00 , $data10
if $rows != 2 then
return -1
endi
if $data00 != 2.000000000 then
return -1
endi
if $data10 != 12.083045974 then
return -1
endi
sql drop function udf1; sql drop function udf1;
sql show functions; sql show functions;
if $rows != 1 then if $rows != 1 then
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册