From 80217813fd4a6aa03795a2ca6db6d53606aa5530 Mon Sep 17 00:00:00 2001 From: hjxilinx Date: Sat, 14 Mar 2020 18:05:42 +0800 Subject: [PATCH] file rename, move query functions to different module. --- src/client/inc/tscSQLParser.h | 19 +- src/client/inc/tscSecondaryMerge.h | 4 +- src/client/inc/tscUtil.h | 5 +- src/client/inc/tsclient.h | 17 +- src/client/src/tscCache.c | 264 ------------------ src/client/src/tscFunctionImpl.c | 12 +- src/client/src/tscLocal.c | 2 +- src/client/src/tscParseInsert.c | 3 +- src/client/src/tscSQLParser.c | 3 +- src/client/src/tscServer.c | 20 +- src/client/src/tscSql.c | 2 +- src/client/src/tscSystem.c | 3 - src/client/src/tscUtil.c | 93 +++++- src/inc/{tpercentile.h => qpercentile.h} | 2 +- src/mnode/src/mgmtTable.c | 28 +- src/query/inc/{textbuffer.h => qextbuffer.h} | 0 src/query/inc/{thistogram.h => qhistogram.h} | 0 .../{tinterpolation.h => qinterpolation.h} | 4 + src/query/inc/{tresultBuf.h => qresultBuf.h} | 2 +- src/query/inc/qsqlparser.h | 12 + ...axtreefunction.h => qsyntaxtreefunction.h} | 0 src/query/src/{tscAst.c => qast.c} | 9 +- src/query/src/{textbuffer.c => qextbuffer.c} | 4 +- src/query/src/{thistogram.c => qhistogram.c} | 4 +- .../{tinterpolation.c => qinterpolation.c} | 6 +- .../src/{tscSQLParserImpl.c => qparserImpl.c} | 0 .../src/{tpercentile.c => qpercentile.c} | 5 +- src/query/src/{tresultBuf.c => qresultBuf.c} | 4 +- ...axtreefunction.c => qsyntaxtreefunction.c} | 2 +- src/query/src/{ttokenizer.c => qtokenizer.c} | 0 src/{query => util}/inc/tcache.h | 0 src/{client => util}/src/tcache.c | 0 src/vnode/detail/inc/vnodeRead.h | 4 +- src/vnode/detail/src/vnodeQueryImpl.c | 4 +- src/vnode/detail/src/vnodeQueryProcess.c | 2 +- src/vnode/detail/src/vnodeRead.c | 2 +- src/vnode/detail/src/vnodeSupertableQuery.c | 6 +- src/vnode/detail/src/vnodeTagMgmt.c | 6 +- 38 files changed, 182 insertions(+), 371 deletions(-) delete mode 100644 src/client/src/tscCache.c rename src/inc/{tpercentile.h => qpercentile.h} (98%) rename src/query/inc/{textbuffer.h => qextbuffer.h} (100%) rename src/query/inc/{thistogram.h => qhistogram.h} (100%) rename src/query/inc/{tinterpolation.h => qinterpolation.h} (98%) rename src/query/inc/{tresultBuf.h => qresultBuf.h} (99%) rename src/query/inc/{tscSyntaxtreefunction.h => qsyntaxtreefunction.h} (100%) rename src/query/src/{tscAst.c => qast.c} (99%) rename src/query/src/{textbuffer.c => qextbuffer.c} (99%) rename src/query/src/{thistogram.c => qhistogram.c} (99%) rename src/query/src/{tinterpolation.c => qinterpolation.c} (99%) rename src/query/src/{tscSQLParserImpl.c => qparserImpl.c} (100%) rename src/query/src/{tpercentile.c => qpercentile.c} (99%) rename src/query/src/{tresultBuf.c => qresultBuf.c} (99%) rename src/query/src/{tscSyntaxtreefunction.c => qsyntaxtreefunction.c} (99%) rename src/query/src/{ttokenizer.c => qtokenizer.c} (100%) rename src/{query => util}/inc/tcache.h (100%) rename src/{client => util}/src/tcache.c (100%) diff --git a/src/client/inc/tscSQLParser.h b/src/client/inc/tscSQLParser.h index c7f8ba06e8..0341b7ed03 100644 --- a/src/client/inc/tscSQLParser.h +++ b/src/client/inc/tscSQLParser.h @@ -20,24 +20,9 @@ extern "C" { #endif -#include "taos.h" -#include "taosmsg.h" -#include "ttokendef.h" -#include "taosdef.h" -#include "tvariant.h" -#include "qsqlparser.h" +#include "tsclient.h" -enum { - TSQL_NODE_TYPE_EXPR = 0x1, - TSQL_NODE_TYPE_ID = 0x2, - TSQL_NODE_TYPE_VALUE = 0x4, -}; - -#define NON_ARITHMEIC_EXPR 0 -#define NORMAL_ARITHMETIC 1 -#define AGG_ARIGHTMEIC 2 - -int32_t tSQLParse(SSqlInfo *pSQLInfo, const char *pSql); +int32_t tscToSQLCmd(struct SSqlObj* pSql, struct SSqlInfo* pInfo); #ifdef __cplusplus } diff --git a/src/client/inc/tscSecondaryMerge.h b/src/client/inc/tscSecondaryMerge.h index 08d995c9f3..5370d0ec52 100644 --- a/src/client/inc/tscSecondaryMerge.h +++ b/src/client/inc/tscSecondaryMerge.h @@ -20,9 +20,9 @@ extern "C" { #endif +#include "qextbuffer.h" +#include "qinterpolation.h" #include "taosmsg.h" -#include "textbuffer.h" -#include "tinterpolation.h" #include "tlosertree.h" #include "tsclient.h" diff --git a/src/client/inc/tscUtil.h b/src/client/inc/tscUtil.h index a869e45198..ef82ac53a2 100644 --- a/src/client/inc/tscUtil.h +++ b/src/client/inc/tscUtil.h @@ -24,10 +24,10 @@ extern "C" { * @date 2018/09/30 */ #include "os.h" -#include "textbuffer.h" +#include "qextbuffer.h" +#include "taosdef.h" #include "tscSecondaryMerge.h" #include "tsclient.h" -#include "taosdef.h" #define UTIL_METER_IS_SUPERTABLE(metaInfo) \ (((metaInfo)->pMeterMeta != NULL) && ((metaInfo)->pMeterMeta->tableType == TSDB_TABLE_TYPE_SUPER_TABLE)) @@ -252,6 +252,7 @@ void tscTryQueryNextVnode(SSqlObj *pSql, __async_cb_func_t fp); void tscAsyncQuerySingleRowForNextVnode(void *param, TAOS_RES *tres, int numOfRows); void tscTryQueryNextClause(SSqlObj* pSql, void (*queryFp)()); +int32_t launchMultivnodeInsert(SSqlObj *pSql); #ifdef __cplusplus } diff --git a/src/client/inc/tsclient.h b/src/client/inc/tsclient.h index caec0fdbb8..0823e6175f 100644 --- a/src/client/inc/tsclient.h +++ b/src/client/inc/tsclient.h @@ -32,6 +32,7 @@ extern "C" { #include "tutil.h" #include "trpc.h" #include "qsqltype.h" +#include "qsqlparser.h" #define TSC_GET_RESPTR_BASE(res, _queryinfo, col) (res->data + ((_queryinfo)->fieldsInfo.pSqlExpr[col]->offset) * res->numOfRows) @@ -308,14 +309,14 @@ typedef struct _tsc_obj { char sversion[TSDB_VERSION_LEN]; char writeAuth : 1; char superAuth : 1; - struct _sql_obj *pSql; - struct _sql_obj *pHb; - struct _sql_obj *sqlList; + struct SSqlObj *pSql; + struct SSqlObj *pHb; + struct SSqlObj *sqlList; struct _sstream *streamList; pthread_mutex_t mutex; } STscObj; -typedef struct _sql_obj { +typedef struct SSqlObj { void * signature; STscObj *pTscObj; void (*fp)(); @@ -340,8 +341,8 @@ typedef struct _sql_obj { uint8_t numOfSubs; char * asyncTblPos; void * pTableHashList; - struct _sql_obj **pSubs; - struct _sql_obj * prev, *next; + struct SSqlObj **pSubs; + struct SSqlObj * prev, *next; } SSqlObj; typedef struct _sstream { @@ -442,9 +443,6 @@ char *tscGetErrorMsgPayload(SSqlCmd *pCmd); int32_t tscInvalidSQLErrMsg(char *msg, const char *additionalInfo, const char *sql); -// transfer SSqlInfo to SqlCmd struct -int32_t tscToSQLCmd(SSqlObj *pSql, struct SSqlInfo *pInfo); - void tscQueueAsyncFreeResult(SSqlObj *pSql); extern void * pVnodeConn; @@ -453,7 +451,6 @@ extern void * tscCacheHandle; extern int32_t globalCode; extern int slaveIndex; extern void * tscTmr; -extern void * tscConnCache; extern void * tscQhandle; extern int tscKeepConn[]; extern int tsInsertHeadSize; diff --git a/src/client/src/tscCache.c b/src/client/src/tscCache.c deleted file mode 100644 index 666d069a58..0000000000 --- a/src/client/src/tscCache.c +++ /dev/null @@ -1,264 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -#include "os.h" - -#include "tglobalcfg.h" -#include "tlog.h" -#include "tmempool.h" -#include "tsclient.h" -#include "ttime.h" -#include "ttimer.h" -#include "tutil.h" - -typedef struct _c_hash_t { - uint32_t ip; - uint16_t port; - struct _c_hash_t *prev; - struct _c_hash_t *next; - void * data; - uint64_t time; -} SConnHash; - -typedef struct { - SConnHash ** connHashList; - mpool_h connHashMemPool; - int maxSessions; - int total; - int * count; - int64_t keepTimer; - pthread_mutex_t mutex; - void (*cleanFp)(void *); - void *tmrCtrl; - void *pTimer; -} SConnCache; - -int taosHashConn(void *handle, uint32_t ip, uint16_t port, char *user) { - SConnCache *pObj = (SConnCache *)handle; - int hash = 0; - // size_t user_len = strlen(user); - - hash = ip >> 16; - hash += (unsigned short)(ip & 0xFFFF); - hash += port; - while (*user != '\0') { - hash += *user; - user++; - } - - hash = hash % pObj->maxSessions; - - return hash; -} - -void taosRemoveExpiredNodes(SConnCache *pObj, SConnHash *pNode, int hash, uint64_t time) { - if (pNode == NULL) return; - if (time < pObj->keepTimer + pNode->time) return; - - SConnHash *pPrev = pNode->prev, *pNext; - - while (pNode) { - (*pObj->cleanFp)(pNode->data); - pNext = pNode->next; - pObj->total--; - pObj->count[hash]--; - tscTrace("%p ip:0x%x:%hu:%d:%p removed, connections in cache:%d", pNode->data, pNode->ip, pNode->port, hash, pNode, - pObj->count[hash]); - taosMemPoolFree(pObj->connHashMemPool, (char *)pNode); - pNode = pNext; - } - - if (pPrev) - pPrev->next = NULL; - else - pObj->connHashList[hash] = NULL; -} - -void *taosAddConnIntoCache(void *handle, void *data, uint32_t ip, uint16_t port, char *user) { - int hash; - SConnHash * pNode; - SConnCache *pObj; - - uint64_t time = taosGetTimestampMs(); - - pObj = (SConnCache *)handle; - if (pObj == NULL || pObj->maxSessions == 0) return NULL; - - if (data == NULL) { - tscTrace("data:%p ip:%p:%d not valid, not added in cache", data, ip, port); - return NULL; - } - - hash = taosHashConn(pObj, ip, port, user); - pNode = (SConnHash *)taosMemPoolMalloc(pObj->connHashMemPool); - pNode->ip = ip; - pNode->port = port; - pNode->data = data; - pNode->prev = NULL; - pNode->time = time; - - pthread_mutex_lock(&pObj->mutex); - - pNode->next = pObj->connHashList[hash]; - if (pObj->connHashList[hash] != NULL) (pObj->connHashList[hash])->prev = pNode; - pObj->connHashList[hash] = pNode; - - pObj->total++; - pObj->count[hash]++; - taosRemoveExpiredNodes(pObj, pNode->next, hash, time); - - pthread_mutex_unlock(&pObj->mutex); - - tscTrace("%p ip:0x%x:%hu:%d:%p added, connections in cache:%d", data, ip, port, hash, pNode, pObj->count[hash]); - - return pObj; -} - -void taosCleanConnCache(void *handle, void *tmrId) { - int hash; - SConnHash * pNode; - SConnCache *pObj; - - pObj = (SConnCache *)handle; - if (pObj == NULL || pObj->maxSessions == 0) return; - if (pObj->pTimer != tmrId) return; - - uint64_t time = taosGetTimestampMs(); - - for (hash = 0; hash < pObj->maxSessions; ++hash) { - pthread_mutex_lock(&pObj->mutex); - pNode = pObj->connHashList[hash]; - taosRemoveExpiredNodes(pObj, pNode, hash, time); - pthread_mutex_unlock(&pObj->mutex); - } - - // tscTrace("timer, total connections in cache:%d", pObj->total); - taosTmrReset(taosCleanConnCache, pObj->keepTimer * 2, pObj, pObj->tmrCtrl, &pObj->pTimer); -} - -void *taosGetConnFromCache(void *handle, uint32_t ip, uint16_t port, char *user) { - int hash; - SConnHash * pNode; - SConnCache *pObj; - void * pData = NULL; - - pObj = (SConnCache *)handle; - if (pObj == NULL || pObj->maxSessions == 0) return NULL; - - uint64_t time = taosGetTimestampMs(); - - hash = taosHashConn(pObj, ip, port, user); - pthread_mutex_lock(&pObj->mutex); - - pNode = pObj->connHashList[hash]; - while (pNode) { - if (time >= pObj->keepTimer + pNode->time) { - taosRemoveExpiredNodes(pObj, pNode, hash, time); - pNode = NULL; - break; - } - - if (pNode->ip == ip && pNode->port == port) break; - - pNode = pNode->next; - } - - if (pNode) { - taosRemoveExpiredNodes(pObj, pNode->next, hash, time); - - if (pNode->prev) { - pNode->prev->next = pNode->next; - } else { - pObj->connHashList[hash] = pNode->next; - } - - if (pNode->next) { - pNode->next->prev = pNode->prev; - } - - pData = pNode->data; - taosMemPoolFree(pObj->connHashMemPool, (char *)pNode); - pObj->total--; - pObj->count[hash]--; - } - - pthread_mutex_unlock(&pObj->mutex); - - if (pData) { - tscTrace("%p ip:0x%x:%hu:%d:%p retrieved, connections in cache:%d", pData, ip, port, hash, pNode, pObj->count[hash]); - } - - return pData; -} - -void *taosOpenConnCache(int maxSessions, void (*cleanFp)(void *), void *tmrCtrl, int64_t keepTimer) { - SConnHash **connHashList; - mpool_h connHashMemPool; - SConnCache *pObj; - - connHashMemPool = taosMemPoolInit(maxSessions, sizeof(SConnHash)); - if (connHashMemPool == 0) return NULL; - - connHashList = calloc(sizeof(SConnHash *), maxSessions); - if (connHashList == 0) { - taosMemPoolCleanUp(connHashMemPool); - return NULL; - } - - pObj = malloc(sizeof(SConnCache)); - if (pObj == NULL) { - taosMemPoolCleanUp(connHashMemPool); - free(connHashList); - return NULL; - } - memset(pObj, 0, sizeof(SConnCache)); - - pObj->count = calloc(sizeof(int), maxSessions); - pObj->total = 0; - pObj->keepTimer = keepTimer; - pObj->maxSessions = maxSessions; - pObj->connHashMemPool = connHashMemPool; - pObj->connHashList = connHashList; - pObj->cleanFp = cleanFp; - pObj->tmrCtrl = tmrCtrl; - taosTmrReset(taosCleanConnCache, pObj->keepTimer * 2, pObj, pObj->tmrCtrl, &pObj->pTimer); - - pthread_mutex_init(&pObj->mutex, NULL); - - return pObj; -} - -void taosCloseConnCache(void *handle) { - SConnCache *pObj; - - pObj = (SConnCache *)handle; - if (pObj == NULL || pObj->maxSessions == 0) return; - - pthread_mutex_lock(&pObj->mutex); - - taosTmrStopA(&(pObj->pTimer)); - - if (pObj->connHashMemPool) taosMemPoolCleanUp(pObj->connHashMemPool); - - tfree(pObj->connHashList); - tfree(pObj->count) - - pthread_mutex_unlock(&pObj->mutex); - - pthread_mutex_destroy(&pObj->mutex); - - memset(pObj, 0, sizeof(SConnCache)); - free(pObj); -} diff --git a/src/client/src/tscFunctionImpl.c b/src/client/src/tscFunctionImpl.c index 9c4f7e9c55..dc6f3fb5ee 100644 --- a/src/client/src/tscFunctionImpl.c +++ b/src/client/src/tscFunctionImpl.c @@ -14,20 +14,20 @@ */ #include "os.h" +#include "qextbuffer.h" +#include "qhistogram.h" +#include "qinterpolation.h" +#include "qpercentile.h" +#include "qsyntaxtreefunction.h" +#include "taosdef.h" #include "taosmsg.h" #include "tast.h" -#include "textbuffer.h" -#include "thistogram.h" -#include "tinterpolation.h" #include "tlog.h" #include "tscJoinProcess.h" -#include "tscSyntaxtreefunction.h" #include "tscompression.h" #include "tsqlfunction.h" #include "ttime.h" -#include "taosdef.h" #include "tutil.h" -#include "tpercentile.h" #define GET_INPUT_CHAR(x) (((char *)((x)->aInputElemBuf)) + ((x)->startOffset) * ((x)->inputBytes)) #define GET_INPUT_CHAR_INDEX(x, y) (GET_INPUT_CHAR(x) + (y) * (x)->inputBytes) diff --git a/src/client/src/tscLocal.c b/src/client/src/tscLocal.c index 0a4d7e74d6..08ef80b9ab 100644 --- a/src/client/src/tscLocal.c +++ b/src/client/src/tscLocal.c @@ -21,7 +21,7 @@ #include "tsclient.h" #include "taosdef.h" -#include "textbuffer.h" +#include "qextbuffer.h" #include "tscSecondaryMerge.h" #include "tschemautil.h" #include "tsocket.h" diff --git a/src/client/src/tscParseInsert.c b/src/client/src/tscParseInsert.c index 1b7ed4002b..db7d40f98d 100644 --- a/src/client/src/tscParseInsert.c +++ b/src/client/src/tscParseInsert.c @@ -1314,11 +1314,10 @@ int tsParseSql(SSqlObj *pSql, bool multiVnodeInsertion) { * the error handle callback function can rightfully restore the user defined function (fp) */ if (pSql->fp != NULL && multiVnodeInsertion) { - assert(pSql->fetchFp == NULL); pSql->fetchFp = pSql->fp; // replace user defined callback function with multi-insert proxy function - pSql->fp = tscAsyncInsertMultiVnodesProxy; + pSql->fp = launchMultivnodeInsert; } ret = tsParseInsertSql(pSql); diff --git a/src/client/src/tscSQLParser.c b/src/client/src/tscSQLParser.c index 4193903795..4f4b557051 100644 --- a/src/client/src/tscSQLParser.c +++ b/src/client/src/tscSQLParser.c @@ -28,6 +28,7 @@ #include "tscUtil.h" #include "tschemautil.h" #include "tsclient.h" +#include "ttokendef.h" #define DEFAULT_PRIMARY_TIMESTAMP_COL_NAME "_c0" @@ -59,7 +60,7 @@ static int32_t setObjFullName(char* fullName, const char* account, SSQLToken* pD static void getColumnName(tSQLExprItem* pItem, char* resultFieldName, int32_t nameLength); static void getRevisedName(char* resultFieldName, int32_t functionId, int32_t maxLen, char* columnName); -static int32_t addExprAndResultField(SQueryInfo* pQueryInfo, int32_t colIdx, tSQLExprItem* pItem, bool isResultColumn); +static int32_t addExprAndResultField(SQueryInfo* pQueryInfo, int32_t colIdx, tSQLExprItem* pItem, bool finalResult); static int32_t insertResultField(SQueryInfo* pQueryInfo, int32_t outputIndex, SColumnList* pIdList, int16_t bytes, int8_t type, char* fieldName, SSqlExpr* pSqlExpr); static int32_t changeFunctionID(int32_t optr, int16_t* functionId); diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index a7c5b0f6b9..8453fdb305 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -605,7 +605,7 @@ int tscProcessSql(SSqlObj *pSql) { } } } - + if (tscIsTwoStageMergeMetricQuery(pQueryInfo, 0)) { /* * (ref. line: 964) @@ -615,24 +615,16 @@ int tscProcessSql(SSqlObj *pSql) { * when pSql being released, pSql->fp == NULL, it may pass the check of pSql->fp == NULL, * which causes deadlock. So we keep it as local variable. */ - void *fp = pSql->fp; - if (tscLaunchSTableSubqueries(pSql) != TSDB_CODE_SUCCESS) { return pRes->code; } - - if (fp == NULL) { - tsem_post(&pSql->emptyRspSem); - tsem_wait(&pSql->rspSem); - tsem_post(&pSql->emptyRspSem); - - // set the command flag must be after the semaphore been correctly set. - pSql->cmd.command = TSDB_SQL_RETRIEVE_METRIC; - } - + + return pSql->res.code; + } else if (pSql->fp == launchMultivnodeInsert) { // multi-vnodes insertion + launchMultivnodeInsert(pSql); return pSql->res.code; } - + return doProcessSql(pSql); } diff --git a/src/client/src/tscSql.c b/src/client/src/tscSql.c index bbe2fa8d3a..eb09231578 100644 --- a/src/client/src/tscSql.c +++ b/src/client/src/tscSql.c @@ -13,7 +13,6 @@ * along with this program. If not, see . */ -#include #include "hash.h" #include "os.h" #include "tcache.h" @@ -30,6 +29,7 @@ #include "tsocket.h" #include "ttimer.h" #include "tutil.h" +#include "ttokendef.h" TAOS *taos_connect_imp(const char *ip, const char *user, const char *pass, const char *db, uint16_t port, void (*fp)(void *, TAOS_RES *, int), void *param, void **taos) { diff --git a/src/client/src/tscSystem.c b/src/client/src/tscSystem.c index 866398b7f5..4265701343 100644 --- a/src/client/src/tscSystem.c +++ b/src/client/src/tscSystem.c @@ -38,7 +38,6 @@ int initialized = 0; int slaveIndex; void * tscTmr; void * tscQhandle; -void * tscConnCache; void * tscCheckDiskUsageTmr; int tsInsertHeadSize; @@ -188,8 +187,6 @@ void taos_init_imp() { if (tscCacheHandle == NULL) tscCacheHandle = taosCacheInit(tscTmr, refreshTime); - tscConnCache = taosOpenConnCache(tsMaxMeterConnections * 2, NULL/*taosCloseRpcConn*/, tscTmr, tsShellActivityTimer * 1000); - initialized = 1; tscTrace("client is initialized successfully"); tsInsertHeadSize = tsRpcHeadSize + sizeof(SShellSubmitMsg); diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index b655832f11..5314f35757 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -2105,7 +2105,6 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void void tscDoQuery(SSqlObj* pSql) { SSqlCmd* pCmd = &pSql->cmd; - void* fp = pSql->fp; pSql->res.code = TSDB_CODE_SUCCESS; @@ -2121,7 +2120,6 @@ void tscDoQuery(SSqlObj* pSql) { } else { // pSql may be released in this function if it is a async insertion. tscProcessSql(pSql); - if (NULL == fp) tscProcessMultiVnodesInsert(pSql); } } } @@ -2321,3 +2319,94 @@ void tscTryQueryNextClause(SSqlObj* pSql, void (*queryFp)()) { tscProcessSql(pSql); } } + +typedef struct SinsertSupporter { + SSubqueryState* pState; + SSqlObj* pSql; +} SinsertSupporter; + +void multiVnodeInsertMerge(void* param, TAOS_RES* tres, int numOfRows) { + SinsertSupporter *pSupporter = (SinsertSupporter *)param; + SSqlObj* pParentObj = pSupporter->pSql; + SSqlCmd* pParentCmd = &pParentObj->cmd; + + SSubqueryState* pState = pSupporter->pState; + int32_t total = pState->numOfTotal; + + // increase the total inserted rows + if (numOfRows > 0) { + pParentObj->res.numOfRows += numOfRows; + } + + int32_t completed = atomic_add_fetch_32(&pState->numOfCompleted, 1); + if (completed < total) { + return; + } + + tscTrace("%p Async insertion completed, total inserted:%d", pParentObj, pParentObj->res.numOfRows); + + // release data block data + pParentCmd->pDataBlocks = tscDestroyBlockArrayList(pParentCmd->pDataBlocks); + + // restore user defined fp + pParentObj->fp = pParentObj->fetchFp; + + // all data has been sent to vnode, call user function + (*pParentObj->fp)(pParentObj->param, tres, numOfRows); +} + +int32_t launchMultivnodeInsert(SSqlObj *pSql) { + SSqlRes *pRes = &pSql->res; + SSqlCmd *pCmd = &pSql->cmd; + + pRes->qhandle = 1; // hack the qhandle check + SDataBlockList *pDataBlocks = pCmd->pDataBlocks; + + pSql->pSubs = calloc(pDataBlocks->nSize, POINTER_BYTES); + pSql->numOfSubs = pDataBlocks->nSize; + assert(pDataBlocks->nSize > 0); + + tscTrace("%p submit data to %d vnode(s)", pSql, pDataBlocks->nSize); + SSubqueryState *pState = calloc(1, sizeof(SSubqueryState)); + pState->numOfTotal = pSql->numOfSubs; + + pRes->code = TSDB_CODE_SUCCESS; + + int32_t i = 0; + for (; i < pSql->numOfSubs; ++i) { + SinsertSupporter* pSupporter = calloc(1, sizeof(SinsertSupporter)); + pSupporter->pSql = pSql; + pSupporter->pState = pState; + + SSqlObj *pNew = createSubqueryObj(pSql, 0, multiVnodeInsertMerge, pSupporter, NULL); + if (pNew == NULL) { + tscError("%p failed to malloc buffer for subObj, orderOfSub:%d, reason:%s", pSql, i, strerror(errno)); + break; + } + + pSql->pSubs[i] = pNew; + tscTrace("%p sub:%p create subObj success. orderOfSub:%d", pSql, pNew, i); + } + + if (i < pSql->numOfSubs) { + tscError("%p failed to prepare subObj structure and launch sub-insertion", pSql); + pRes->code = TSDB_CODE_CLI_OUT_OF_MEMORY; + return pRes->code; // free all allocated resource + } + + for (int32_t j = 0; j < pSql->numOfSubs; ++j) { + SSqlObj *pSub = pSql->pSubs[j]; + pSub->cmd.command = TSDB_SQL_INSERT; + int32_t code = tscCopyDataBlockToPayload(pSub, pDataBlocks->pData[j]); + + if (code != TSDB_CODE_SUCCESS) { + tscTrace("%p prepare submit data block failed in async insertion, vnodeIdx:%d, total:%d, code:%d", pSql, j, + pDataBlocks->nSize, code); + } + + tscTrace("%p sub:%p launch sub insert, orderOfSub:%d", pSql, pSub, j); + tscProcessSql(pSub); + } + + return TSDB_CODE_SUCCESS; +} diff --git a/src/inc/tpercentile.h b/src/inc/qpercentile.h similarity index 98% rename from src/inc/tpercentile.h rename to src/inc/qpercentile.h index b9cf50e0bb..73430bd05c 100644 --- a/src/inc/tpercentile.h +++ b/src/inc/qpercentile.h @@ -16,7 +16,7 @@ #ifndef TDENGINE_TPERCENTILE_H #define TDENGINE_TPERCENTILE_H -#include "textbuffer.h" +#include "qextbuffer.h" typedef struct MinMaxEntry { union { diff --git a/src/mnode/src/mgmtTable.c b/src/mnode/src/mgmtTable.c index ee0f19b89b..9c4e0b57a1 100644 --- a/src/mnode/src/mgmtTable.c +++ b/src/mnode/src/mgmtTable.c @@ -14,22 +14,11 @@ */ #define _DEFAULT_SOURCE -#include "os.h" -#include "taoserror.h" -#include "taosmsg.h" -#include "tast.h" -#include "textbuffer.h" -#include "tschemautil.h" -#include "tscompression.h" -#include "tskiplist.h" -#include "tsqlfunction.h" -#include "tstatus.h" -#include "ttime.h" -#include "mnode.h" +#include "mgmtTable.h" #include "mgmtAcct.h" #include "mgmtChildTable.h" -#include "mgmtDb.h" #include "mgmtDClient.h" +#include "mgmtDb.h" #include "mgmtDnode.h" #include "mgmtGrant.h" #include "mgmtMnode.h" @@ -37,9 +26,20 @@ #include "mgmtProfile.h" #include "mgmtShell.h" #include "mgmtSuperTable.h" -#include "mgmtTable.h" #include "mgmtUser.h" #include "mgmtVgroup.h" +#include "mnode.h" +#include "os.h" +#include "qextbuffer.h" +#include "taoserror.h" +#include "taosmsg.h" +#include "tast.h" +#include "tschemautil.h" +#include "tscompression.h" +#include "tskiplist.h" +#include "tsqlfunction.h" +#include "tstatus.h" +#include "ttime.h" extern void *tsNormalTableSdb; extern void *tsChildTableSdb; diff --git a/src/query/inc/textbuffer.h b/src/query/inc/qextbuffer.h similarity index 100% rename from src/query/inc/textbuffer.h rename to src/query/inc/qextbuffer.h diff --git a/src/query/inc/thistogram.h b/src/query/inc/qhistogram.h similarity index 100% rename from src/query/inc/thistogram.h rename to src/query/inc/qhistogram.h diff --git a/src/query/inc/tinterpolation.h b/src/query/inc/qinterpolation.h similarity index 98% rename from src/query/inc/tinterpolation.h rename to src/query/inc/qinterpolation.h index f4b327bcbe..c8ebd850b6 100644 --- a/src/query/inc/tinterpolation.h +++ b/src/query/inc/qinterpolation.h @@ -20,6 +20,10 @@ extern "C" { #endif +#include "os.h" +#include "taosdef.h" +#include "qextbuffer.h" + typedef struct SInterpolationInfo { int64_t startTimestamp; int32_t order; // order [asc/desc] diff --git a/src/query/inc/tresultBuf.h b/src/query/inc/qresultBuf.h similarity index 99% rename from src/query/inc/tresultBuf.h rename to src/query/inc/qresultBuf.h index 8f30ff7c61..346dc2d00c 100644 --- a/src/query/inc/tresultBuf.h +++ b/src/query/inc/qresultBuf.h @@ -21,7 +21,7 @@ extern "C" { #endif #include "os.h" -#include "textbuffer.h" +#include "qextbuffer.h" typedef struct SIDList { uint32_t alloc; diff --git a/src/query/inc/qsqlparser.h b/src/query/inc/qsqlparser.h index 7a1322be10..064ded2fad 100644 --- a/src/query/inc/qsqlparser.h +++ b/src/query/inc/qsqlparser.h @@ -329,6 +329,18 @@ void tSQLSetColumnType(TAOS_FIELD *pField, SSQLToken *pToken); void *ParseAlloc(void *(*mallocProc)(size_t)); +enum { + TSQL_NODE_TYPE_EXPR = 0x1, + TSQL_NODE_TYPE_ID = 0x2, + TSQL_NODE_TYPE_VALUE = 0x4, +}; + +#define NON_ARITHMEIC_EXPR 0 +#define NORMAL_ARITHMETIC 1 +#define AGG_ARIGHTMEIC 2 + +int32_t tSQLParse(SSqlInfo *pSQLInfo, const char *pSql); + #ifdef __cplusplus } #endif diff --git a/src/query/inc/tscSyntaxtreefunction.h b/src/query/inc/qsyntaxtreefunction.h similarity index 100% rename from src/query/inc/tscSyntaxtreefunction.h rename to src/query/inc/qsyntaxtreefunction.h diff --git a/src/query/src/tscAst.c b/src/query/src/qast.c similarity index 99% rename from src/query/src/tscAst.c rename to src/query/src/qast.c index ef83083a2c..807fd5e5aa 100644 --- a/src/query/src/tscAst.c +++ b/src/query/src/qast.c @@ -14,18 +14,17 @@ */ #include "os.h" +#include "qsqlparser.h" +#include "qsyntaxtreefunction.h" #include "taosdef.h" #include "taosmsg.h" #include "tast.h" #include "tlog.h" -#include "tscSyntaxtreefunction.h" #include "tschemautil.h" #include "tsqlfunction.h" #include "tstoken.h" #include "ttokendef.h" -#include "taosdef.h" #include "tutil.h" -#include "qsqlparser.h" /* * @@ -648,7 +647,7 @@ int32_t intersect(tQueryResultset *pLeft, tQueryResultset *pRight, tQueryResults /* * traverse the result and apply the function to each item to check if the item is qualified or not */ -static void tSQLListTraverseOnResult(struct tSQLBinaryExpr *pExpr, __result_filter_fn_t fp, tQueryResultset *pResult) { +static UNUSED_FUNC void tSQLListTraverseOnResult(struct tSQLBinaryExpr *pExpr, __result_filter_fn_t fp, tQueryResultset *pResult) { assert(pExpr->pLeft->nodeType == TSQL_NODE_COL && pExpr->pRight->nodeType == TSQL_NODE_VALUE); // brutal force scan the result list and check for each item in the list @@ -705,7 +704,7 @@ static bool filterItem(tSQLBinaryExpr *pExpr, const void *pItem, SBinaryFilterSu * @param pSchema tag schemas * @param fp filter callback function */ -static void tSQLBinaryTraverseOnResult(tSQLBinaryExpr *pExpr, tQueryResultset *pResult, SBinaryFilterSupp *param) { +static UNUSED_FUNC void tSQLBinaryTraverseOnResult(tSQLBinaryExpr *pExpr, tQueryResultset *pResult, SBinaryFilterSupp *param) { int32_t n = 0; for (int32_t i = 0; i < pResult->num; ++i) { void *pItem = pResult->pRes[i]; diff --git a/src/query/src/textbuffer.c b/src/query/src/qextbuffer.c similarity index 99% rename from src/query/src/textbuffer.c rename to src/query/src/qextbuffer.c index 056fe80858..92f5175b98 100644 --- a/src/query/src/textbuffer.c +++ b/src/query/src/qextbuffer.c @@ -12,14 +12,14 @@ * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ +#include "qextbuffer.h" #include "os.h" #include "taos.h" +#include "taosdef.h" #include "taosmsg.h" -#include "textbuffer.h" #include "tlog.h" #include "tsqlfunction.h" #include "ttime.h" -#include "taosdef.h" #include "tutil.h" #define COLMODEL_GET_VAL(data, schema, allrow, rowId, colId) \ diff --git a/src/query/src/thistogram.c b/src/query/src/qhistogram.c similarity index 99% rename from src/query/src/thistogram.c rename to src/query/src/qhistogram.c index 31045a4957..26482e9f14 100644 --- a/src/query/src/thistogram.c +++ b/src/query/src/qhistogram.c @@ -14,10 +14,10 @@ */ #include "os.h" +#include "qhistogram.h" +#include "taosdef.h" #include "taosmsg.h" -#include "thistogram.h" #include "tlosertree.h" -#include "taosdef.h" /** * diff --git a/src/query/src/tinterpolation.c b/src/query/src/qinterpolation.c similarity index 99% rename from src/query/src/tinterpolation.c rename to src/query/src/qinterpolation.c index 1a9da44788..1731e16ed8 100644 --- a/src/query/src/tinterpolation.c +++ b/src/query/src/qinterpolation.c @@ -13,12 +13,12 @@ * along with this program. If not, see . */ +#include "qinterpolation.h" #include "os.h" +#include "qextbuffer.h" +#include "taosdef.h" #include "taosmsg.h" -#include "textbuffer.h" -#include "tinterpolation.h" #include "tsqlfunction.h" -#include "taosdef.h" #define INTERPOL_IS_ASC_INTERPOL(interp) ((interp)->order == TSQL_SO_ASC) diff --git a/src/query/src/tscSQLParserImpl.c b/src/query/src/qparserImpl.c similarity index 100% rename from src/query/src/tscSQLParserImpl.c rename to src/query/src/qparserImpl.c diff --git a/src/query/src/tpercentile.c b/src/query/src/qpercentile.c similarity index 99% rename from src/query/src/tpercentile.c rename to src/query/src/qpercentile.c index 6e1c28c516..3b12dee053 100644 --- a/src/query/src/tpercentile.c +++ b/src/query/src/qpercentile.c @@ -15,11 +15,10 @@ #include "os.h" -#include "taosmsg.h" +#include "qpercentile.h" #include "taosdef.h" +#include "taosmsg.h" #include "tlog.h" -#include "taosdef.h" -#include "tpercentile.h" tExtMemBuffer *releaseBucketsExceptFor(tMemBucket *pMemBucket, int16_t segIdx, int16_t slotIdx) { tExtMemBuffer *pBuffer = NULL; diff --git a/src/query/src/tresultBuf.c b/src/query/src/qresultBuf.c similarity index 99% rename from src/query/src/tresultBuf.c rename to src/query/src/qresultBuf.c index 11e17cc5a3..fa7c59be4e 100644 --- a/src/query/src/tresultBuf.c +++ b/src/query/src/qresultBuf.c @@ -1,9 +1,9 @@ +#include "qresultBuf.h" #include "hash.h" +#include "qextbuffer.h" #include "taoserror.h" -#include "textbuffer.h" #include "tlog.h" #include "tsqlfunction.h" -#include "tresultBuf.h" #define DEFAULT_INTERN_BUF_SIZE 16384L diff --git a/src/query/src/tscSyntaxtreefunction.c b/src/query/src/qsyntaxtreefunction.c similarity index 99% rename from src/query/src/tscSyntaxtreefunction.c rename to src/query/src/qsyntaxtreefunction.c index e3c00ed59d..d21f7dab73 100644 --- a/src/query/src/tscSyntaxtreefunction.c +++ b/src/query/src/qsyntaxtreefunction.c @@ -15,7 +15,7 @@ #include "os.h" -#include "tscSyntaxtreefunction.h" +#include "qsyntaxtreefunction.h" #include "taosdef.h" #include "tutil.h" diff --git a/src/query/src/ttokenizer.c b/src/query/src/qtokenizer.c similarity index 100% rename from src/query/src/ttokenizer.c rename to src/query/src/qtokenizer.c diff --git a/src/query/inc/tcache.h b/src/util/inc/tcache.h similarity index 100% rename from src/query/inc/tcache.h rename to src/util/inc/tcache.h diff --git a/src/client/src/tcache.c b/src/util/src/tcache.c similarity index 100% rename from src/client/src/tcache.c rename to src/util/src/tcache.c diff --git a/src/vnode/detail/inc/vnodeRead.h b/src/vnode/detail/inc/vnodeRead.h index 2758cfe1d9..232a92608c 100644 --- a/src/vnode/detail/inc/vnodeRead.h +++ b/src/vnode/detail/inc/vnodeRead.h @@ -21,9 +21,9 @@ extern "C" { #endif #include "os.h" -#include "tresultBuf.h" +#include "qresultBuf.h" -#include "tinterpolation.h" +#include "qinterpolation.h" #include "vnodeTagMgmt.h" /* diff --git a/src/vnode/detail/src/vnodeQueryImpl.c b/src/vnode/detail/src/vnodeQueryImpl.c index 2cdcc91300..80e0735672 100644 --- a/src/vnode/detail/src/vnodeQueryImpl.c +++ b/src/vnode/detail/src/vnodeQueryImpl.c @@ -16,11 +16,11 @@ #include "hash.h" #include "hashfunc.h" #include "os.h" +#include "qextbuffer.h" #include "taosmsg.h" -#include "textbuffer.h" #include "ttime.h" -#include "tinterpolation.h" +#include "qinterpolation.h" #include "tscJoinProcess.h" #include "tscSecondaryMerge.h" #include "tscompression.h" diff --git a/src/vnode/detail/src/vnodeQueryProcess.c b/src/vnode/detail/src/vnodeQueryProcess.c index ae51365918..cedb76b4ac 100644 --- a/src/vnode/detail/src/vnodeQueryProcess.c +++ b/src/vnode/detail/src/vnodeQueryProcess.c @@ -16,8 +16,8 @@ #define _DEFAULT_SOURCE #include "os.h" +#include "qextbuffer.h" #include "taosmsg.h" -#include "textbuffer.h" #include "tscJoinProcess.h" #include "ttime.h" #include "vnode.h" diff --git a/src/vnode/detail/src/vnodeRead.c b/src/vnode/detail/src/vnodeRead.c index f21294a68e..87047f8039 100644 --- a/src/vnode/detail/src/vnodeRead.c +++ b/src/vnode/detail/src/vnodeRead.c @@ -19,9 +19,9 @@ #include "hash.h" #include "hashfunc.h" #include "ihash.h" +#include "qextbuffer.h" #include "taosmsg.h" #include "tast.h" -#include "textbuffer.h" #include "tscJoinProcess.h" #include "tscompression.h" #include "vnode.h" diff --git a/src/vnode/detail/src/vnodeSupertableQuery.c b/src/vnode/detail/src/vnodeSupertableQuery.c index 038577bd8d..349e67e3a2 100644 --- a/src/vnode/detail/src/vnodeSupertableQuery.c +++ b/src/vnode/detail/src/vnodeSupertableQuery.c @@ -14,12 +14,12 @@ */ #define _DEFAULT_SOURCE -#include "os.h" #include "mnode.h" -#include "textbuffer.h" +#include "os.h" +#include "qextbuffer.h" +#include "tast.h" #include "tschemautil.h" #include "tsqlfunction.h" -#include "tast.h" //#include "vnodeTagMgmt.h" typedef struct SSyntaxTreeFilterSupporter { diff --git a/src/vnode/detail/src/vnodeTagMgmt.c b/src/vnode/detail/src/vnodeTagMgmt.c index d3e22ec00a..9bb0191afe 100644 --- a/src/vnode/detail/src/vnodeTagMgmt.c +++ b/src/vnode/detail/src/vnodeTagMgmt.c @@ -16,12 +16,12 @@ #define _DEFAULT_SOURCE #include "os.h" +#include "qextbuffer.h" #include "taosdef.h" -#include "tlog.h" -#include "tutil.h" #include "taosmsg.h" -#include "textbuffer.h" #include "tast.h" +#include "tlog.h" +#include "tutil.h" #include "vnodeTagMgmt.h" #define GET_TAG_VAL_POINTER(s, col, sc, t) ((t *)(&((s)->tags[getColumnModelOffset(sc, col)]))) -- GitLab