提交 64dfc1da 编写于 作者: M Minghao Li

merge 3.0 code

CMAKE_MINIMUM_REQUIRED(VERSION 2.8...3.20)
PROJECT(TDengine)
INCLUDE_DIRECTORIES(inc)
INCLUDE_DIRECTORIES(jni)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/query/inc)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/deps/zlib-1.2.11/inc)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/plugins/http/inc)
AUX_SOURCE_DIRECTORY(src SRC)
IF (TD_LINUX)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/deps/jni/linux)
# set the static lib name
ADD_LIBRARY(taos_static STATIC ${SRC})
TARGET_LINK_LIBRARIES(taos_static common query trpc tutil pthread m rt cJson ${VAR_TSZ})
SET_TARGET_PROPERTIES(taos_static PROPERTIES OUTPUT_NAME "taos_static")
SET_TARGET_PROPERTIES(taos_static PROPERTIES CLEAN_DIRECT_OUTPUT 1)
# generate dynamic library (*.so)
ADD_LIBRARY(taos SHARED ${SRC})
TARGET_LINK_LIBRARIES(taos common query trpc tutil pthread m rt cJson)
IF (TD_LINUX_64)
TARGET_LINK_LIBRARIES(taos lua)
ENDIF ()
SET_TARGET_PROPERTIES(taos PROPERTIES CLEAN_DIRECT_OUTPUT 1)
#set version of .so
#VERSION so version
#SOVERSION api version
#MESSAGE(STATUS "build version ${TD_VER_NUMBER}")
SET_TARGET_PROPERTIES(taos PROPERTIES VERSION ${TD_VER_NUMBER} SOVERSION 1)
ADD_SUBDIRECTORY(tests)
ELSEIF (TD_DARWIN)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/deps/jni/linux)
# set the static lib name
ADD_LIBRARY(taos_static STATIC ${SRC})
TARGET_LINK_LIBRARIES(taos_static common query trpc tutil pthread m lua cJson)
SET_TARGET_PROPERTIES(taos_static PROPERTIES OUTPUT_NAME "taos_static")
SET_TARGET_PROPERTIES(taos_static PROPERTIES CLEAN_DIRECT_OUTPUT 1)
# generate dynamic library (*.dylib)
ADD_LIBRARY(taos SHARED ${SRC})
TARGET_LINK_LIBRARIES(taos common query trpc tutil pthread m lua cJson)
SET_TARGET_PROPERTIES(taos PROPERTIES CLEAN_DIRECT_OUTPUT 1)
#set version of .dylib
#VERSION dylib version
#SOVERSION dylib version
#MESSAGE(STATUS "build version ${TD_VER_NUMBER}")
SET_TARGET_PROPERTIES(taos PROPERTIES VERSION ${TD_VER_NUMBER} SOVERSION 1)
ADD_SUBDIRECTORY(tests)
ELSEIF (TD_WINDOWS)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/deps/jni/windows)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/deps/jni/windows/win32)
CONFIGURE_FILE("${TD_COMMUNITY_DIR}/src/client/src/taos.rc.in" "${TD_COMMUNITY_DIR}/src/client/src/taos.rc")
ADD_LIBRARY(taos_static STATIC ${SRC})
TARGET_LINK_LIBRARIES(taos_static trpc tutil query cJson)
# generate dynamic library (*.dll)
ADD_LIBRARY(taos SHARED ${SRC} ${TD_COMMUNITY_DIR}/src/client/src/taos.rc)
IF (NOT TD_GODLL)
SET_TARGET_PROPERTIES(taos PROPERTIES LINK_FLAGS /DEF:${TD_COMMUNITY_DIR}/src/client/src/taos.def)
ENDIF ()
TARGET_LINK_LIBRARIES(taos trpc tutil query lua cJson)
ELSEIF (TD_DARWIN)
SET(CMAKE_MACOSX_RPATH 1)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/deps/jni/linux)
ADD_LIBRARY(taos_static STATIC ${SRC})
TARGET_LINK_LIBRARIES(taos_static query trpc tutil pthread m lua cJson)
SET_TARGET_PROPERTIES(taos_static PROPERTIES OUTPUT_NAME "taos_static")
# generate dynamic library (*.dylib)
ADD_LIBRARY(taos SHARED ${SRC})
TARGET_LINK_LIBRARIES(taos query trpc tutil pthread m lua cJson)
SET_TARGET_PROPERTIES(taos PROPERTIES CLEAN_DIRECT_OUTPUT 1)
#set version of .so
#VERSION so version
#SOVERSION api version
#MESSAGE(STATUS "build version ${TD_VER_NUMBER}")
SET_TARGET_PROPERTIES(taos PROPERTIES VERSION ${TD_VER_NUMBER} SOVERSION 1)
ENDIF ()
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_TSCGLOBALMERGE_H
#define TDENGINE_TSCGLOBALMERGE_H
#ifdef __cplusplus
extern "C" {
#endif
#include "qExtbuffer.h"
#include "qFill.h"
#include "tmsg.h"
#include "tlosertree.h"
#include "qExecutor.h"
#define MAX_NUM_OF_SUBQUERY_RETRY 3
struct SQLFunctionCtx;
typedef struct SLocalDataSource {
tExtMemBuffer *pMemBuffer;
int32_t flushoutIdx;
int32_t pageId;
int32_t rowIdx;
tFilePage filePage;
} SLocalDataSource;
typedef struct SGlobalMerger {
SLocalDataSource **pLocalDataSrc;
int32_t numOfBuffer;
int32_t numOfCompleted;
int32_t numOfVnode;
SLoserTreeInfo *pLoserTree;
int32_t rowSize; // size of each intermediate result.
tOrderDescriptor *pDesc;
tExtMemBuffer **pExtMemBuffer; // disk-based buffer
char *buf; // temp buffer
} SGlobalMerger;
struct SSqlObj;
typedef struct SRetrieveSupport {
tExtMemBuffer ** pExtMemBuffer; // for build loser tree
tOrderDescriptor *pOrderDescriptor;
int32_t subqueryIndex; // index of current vnode in vnode list
struct SSqlObj *pParentSql;
tFilePage * localBuffer; // temp buffer, there is a buffer for each vnode to
uint32_t numOfRetry; // record the number of retry times
} SRetrieveSupport;
int32_t tscCreateGlobalMergerEnv(SQueryInfo* pQueryInfo, tExtMemBuffer ***pMemBuffer, int32_t numOfSub, tOrderDescriptor **pDesc, uint32_t nBufferSize, int64_t id);
void tscDestroyGlobalMergerEnv(tExtMemBuffer **pMemBuffer, tOrderDescriptor *pDesc, int32_t numOfVnodes);
int32_t saveToBuffer(tExtMemBuffer *pMemoryBuf, tOrderDescriptor *pDesc, tFilePage *pPage, void *data,
int32_t numOfRows, int32_t orderType);
int32_t tscFlushTmpBuffer(tExtMemBuffer *pMemoryBuf, tOrderDescriptor *pDesc, tFilePage *pPage, int32_t orderType);
/*
* create local reducer to launch the second-stage reduce process at client site
*/
int32_t tscCreateGlobalMerger(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrderDescriptor *pDesc,
SQueryInfo *pQueryInfo, SGlobalMerger **pMerger, int64_t id);
void tscDestroyGlobalMerger(SGlobalMerger* pMerger);
#ifdef __cplusplus
}
#endif
#endif // TDENGINE_TSCGLOBALMERGE_H
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_TSCLOG_H
#define TDENGINE_TSCLOG_H
#ifdef __cplusplus
extern "C" {
#endif
#include "tlog.h"
extern uint32_t cDebugFlag;
extern int8_t tscEmbedded;
#define tscFatal(...) do { if (cDebugFlag & DEBUG_FATAL) { taosPrintLog("TSC FATAL ", tscEmbedded ? 255 : cDebugFlag, __VA_ARGS__); }} while(0)
#define tscError(...) do { if (cDebugFlag & DEBUG_ERROR) { taosPrintLog("TSC ERROR ", tscEmbedded ? 255 : cDebugFlag, __VA_ARGS__); }} while(0)
#define tscWarn(...) do { if (cDebugFlag & DEBUG_WARN) { taosPrintLog("TSC WARN ", tscEmbedded ? 255 : cDebugFlag, __VA_ARGS__); }} while(0)
#define tscInfo(...) do { if (cDebugFlag & DEBUG_INFO) { taosPrintLog("TSC ", tscEmbedded ? 255 : cDebugFlag, __VA_ARGS__); }} while(0)
#define tscDebug(...) do { if (cDebugFlag & DEBUG_DEBUG) { taosPrintLog("TSC ", cDebugFlag, __VA_ARGS__); }} while(0)
#define tscTrace(...) do { if (cDebugFlag & DEBUG_TRACE) { taosPrintLog("TSC ", cDebugFlag, __VA_ARGS__); }} while(0)
#define tscDebugL(...) do { if (cDebugFlag & DEBUG_DEBUG) { taosPrintLongString("TSC ", cDebugFlag, __VA_ARGS__); }} while(0)
#ifdef __cplusplus
}
#endif
#endif
/*
* Copyright (c) 2021 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_TSCPARSELINE_H
#define TDENGINE_TSCPARSELINE_H
#ifdef __cplusplus
extern "C" {
#endif
typedef struct {
char* key;
uint8_t type;
int16_t length;
char* value;
} TAOS_SML_KV;
typedef struct {
char* stbName;
char* childTableName;
TAOS_SML_KV* tags;
int32_t tagNum;
// first kv must be timestamp
TAOS_SML_KV* fields;
int32_t fieldNum;
} TAOS_SML_DATA_POINT;
typedef enum {
SML_TIME_STAMP_NOW,
SML_TIME_STAMP_SECONDS,
SML_TIME_STAMP_MILLI_SECONDS,
SML_TIME_STAMP_MICRO_SECONDS,
SML_TIME_STAMP_NANO_SECONDS
} SMLTimeStampType;
typedef struct {
uint64_t id;
SHashObj* smlDataToSchema;
} SSmlLinesInfo;
int tscSmlInsert(TAOS* taos, TAOS_SML_DATA_POINT* points, int numPoint, SSmlLinesInfo* info);
bool checkDuplicateKey(char *key, SHashObj *pHash, SSmlLinesInfo* info);
int32_t isValidChildTableName(const char *pTbName, int16_t len);
bool convertSmlValueType(TAOS_SML_KV *pVal, char *value,
uint16_t len, SSmlLinesInfo* info);
int32_t convertSmlTimeStamp(TAOS_SML_KV *pVal, char *value,
uint16_t len, SSmlLinesInfo* info);
void destroySmlDataPoint(TAOS_SML_DATA_POINT* point);
#ifdef __cplusplus
}
#endif
#endif // TDENGINE_TSCPARSELINE_H
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_TSCSUBQUERY_H
#define TDENGINE_TSCSUBQUERY_H
#ifdef __cplusplus
extern "C" {
#endif
#include "tscUtil.h"
#include "tsclient.h"
void tscFetchDatablockForSubquery(SSqlObj* pSql);
void tscSetupOutputColumnIndex(SSqlObj* pSql);
void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code);
SJoinSupporter* tscCreateJoinSupporter(SSqlObj* pSql, int32_t index);
void tscHandleMasterJoinQuery(SSqlObj* pSql);
int32_t tscHandleMasterSTableQuery(SSqlObj *pSql);
int32_t tscHandleFirstRoundStableQuery(SSqlObj *pSql);
int32_t tscHandleMultivnodeInsert(SSqlObj *pSql);
int32_t tscHandleInsertRetry(SSqlObj* parent, SSqlObj* child);
void tscBuildResFromSubqueries(SSqlObj *pSql);
TAOS_ROW doSetResultRowData(SSqlObj *pSql);
char *getArithmeticInputSrc(void *param, const char *name, int32_t colId);
void tscLockByThread(int64_t *lockedBy);
void tscUnlockByThread(int64_t *lockedBy);
int tsInsertInitialCheck(SSqlObj *pSql);
void doCleanupSubqueries(SSqlObj *pSql, int32_t numOfSubs);
void tscFreeRetrieveSup(SSqlObj *pSql);
#ifdef __cplusplus
}
#endif
#endif // TDENGINE_TSCSUBQUERY_H
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_TSCUTIL_H
#define TDENGINE_TSCUTIL_H
#ifdef __cplusplus
extern "C" {
#endif
#include "exception.h"
#include "os.h"
#include "qExtbuffer.h"
#include "taosdef.h"
#include "tbuffer.h"
#include "tscGlobalmerge.h"
#include "tsched.h"
#include "tsclient.h"
#define UTIL_TABLE_IS_SUPER_TABLE(metaInfo) \
(((metaInfo)->pTableMeta != NULL) && ((metaInfo)->pTableMeta->tableType == TSDB_SUPER_TABLE))
#define UTIL_TABLE_IS_CHILD_TABLE(metaInfo) \
(((metaInfo)->pTableMeta != NULL) && ((metaInfo)->pTableMeta->tableType == TSDB_CHILD_TABLE))
#define UTIL_TABLE_IS_NORMAL_TABLE(metaInfo) \
(!(UTIL_TABLE_IS_SUPER_TABLE(metaInfo) || UTIL_TABLE_IS_CHILD_TABLE(metaInfo)))
#define UTIL_TABLE_IS_TMP_TABLE(metaInfo) \
(((metaInfo)->pTableMeta != NULL) && ((metaInfo)->pTableMeta->tableType == TSDB_TEMP_TABLE))
#pragma pack(push,1)
// this struct is transfered as binary, padding two bytes to avoid
// an 'uid' whose low bytes is 0xff being recoginized as NULL,
// and set 'pack' to 1 to avoid break existing code.
typedef struct STidTags {
int16_t padding;
int64_t uid;
int32_t tid;
int32_t vgId;
char tag[];
} STidTags;
#pragma pack(pop)
typedef struct SJoinSupporter {
SSqlObj* pObj; // parent SqlObj
int32_t subqueryIndex; // index of sub query
SInterval interval;
SLimitVal limit; // limit info
uint64_t uid; // query table uid
SArray* colList; // previous query information, no need to use this attribute, and the corresponding attribution
SArray* exprList;
SArray* colCond;
SFieldInfo fieldsInfo;
STagCond tagCond;
SGroupbyExpr groupInfo; // group by info
struct STSBuf* pTSBuf; // the TSBuf struct that holds the compressed timestamp array
FILE* f; // temporary file in order to create TSBuf
char path[PATH_MAX]; // temporary file path, todo dynamic allocate memory
int32_t tagSize; // the length of each in the first filter stage
char* pIdTagList; // result of first stage tags
int32_t totalLen;
int32_t num;
SArray* pVgroupTables;
} SJoinSupporter;
typedef struct SMergeCtx {
SJoinSupporter* p;
int32_t idx;
SArray* res;
int8_t compared;
}SMergeCtx;
typedef struct SMergeTsCtx {
SJoinSupporter* p;
STSBuf* res;
int64_t numOfInput;
int8_t compared;
}SMergeTsCtx;
typedef struct SVgroupTableInfo {
SVgroupMsg vgInfo;
SArray *itemList; // SArray<STableIdInfo>
} SVgroupTableInfo;
typedef struct SBlockKeyTuple {
TSKEY skey;
void* payloadAddr;
} SBlockKeyTuple;
typedef struct SBlockKeyInfo {
int32_t maxBytesAlloc;
SBlockKeyTuple* pKeyTuple;
} SBlockKeyInfo;
int32_t converToStr(char *str, int type, void *buf, int32_t bufSize, int32_t *len);
int32_t tscCreateDataBlock(size_t initialSize, int32_t rowSize, int32_t startOffset, SName* name, STableMeta* pTableMeta, STableDataBlocks** dataBlocks);
void tscDestroyDataBlock(STableDataBlocks* pDataBlock, bool removeMeta);
void tscSortRemoveDataBlockDupRowsRaw(STableDataBlocks* dataBuf);
int tscSortRemoveDataBlockDupRows(STableDataBlocks* dataBuf, SBlockKeyInfo* pBlkKeyInfo);
int32_t tsSetBlockInfo(SSubmitBlk *pBlocks, const STableMeta *pTableMeta, int32_t numOfRows);
void tscDestroyBoundColumnInfo(SParsedDataColInfo* pColInfo);
void doRetrieveSubqueryData(SSchedMsg *pMsg);
SParamInfo* tscAddParamToDataBlock(STableDataBlocks* pDataBlock, char type, uint8_t timePrec, int16_t bytes,
uint32_t offset);
void* tscDestroyBlockArrayList(SArray* pDataBlockList);
void* tscDestroyUdfArrayList(SArray* pUdfList);
void* tscDestroyBlockHashTable(SHashObj* pBlockHashTable, bool removeMeta);
int32_t tscCopyDataBlockToPayload(SSqlObj* pSql, STableDataBlocks* pDataBlock);
int32_t tscMergeTableDataBlocks(SInsertStatementParam *pInsertParam, bool freeBlockMap);
int32_t tscGetDataBlockFromList(SHashObj* pHashList, int64_t id, int32_t size, int32_t startOffset, int32_t rowSize, SName* pName, STableMeta* pTableMeta,
STableDataBlocks** dataBlocks, SArray* pBlockList);
/**
* for the projection query on metric or point interpolation query on metric,
* we iterate all the meters, instead of invoke query on all qualified meters simultaneously.
*
* @param pSql sql object
* @return
*/
bool tscIsPointInterpQuery(SQueryInfo* pQueryInfo);
bool tscIsTWAQuery(SQueryInfo* pQueryInfo);
bool tscIsIrateQuery(SQueryInfo* pQueryInfo);
bool tscIsSessionWindowQuery(SQueryInfo* pQueryInfo);
bool tscIsSecondStageQuery(SQueryInfo* pQueryInfo);
bool tsIsArithmeticQueryOnAggResult(SQueryInfo* pQueryInfo);
bool tscGroupbyColumn(SQueryInfo* pQueryInfo);
int32_t tscGetTopBotQueryExprIndex(SQueryInfo* pQueryInfo);
bool tscIsTopBotQuery(SQueryInfo* pQueryInfo);
bool hasTagValOutput(SQueryInfo* pQueryInfo);
bool timeWindowInterpoRequired(SQueryInfo *pQueryInfo);
bool isStabledev(SQueryInfo* pQueryInfo);
bool isTsCompQuery(SQueryInfo* pQueryInfo);
bool isBlockDistQuery(SQueryInfo* pQueryInfo);
bool isSimpleAggregateRv(SQueryInfo* pQueryInfo);
bool tscNonOrderedProjectionQueryOnSTable(SQueryInfo *pQueryInfo, int32_t tableIndex);
bool tscOrderedProjectionQueryOnSTable(SQueryInfo* pQueryInfo, int32_t tableIndex);
bool tscIsDiffDerivQuery(SQueryInfo* pQueryInfo);
bool tscIsProjectionQueryOnSTable(SQueryInfo* pQueryInfo, int32_t tableIndex);
bool tscIsProjectionQuery(SQueryInfo* pQueryInfo);
bool tscHasColumnFilter(SQueryInfo* pQueryInfo);
bool tscIsTwoStageSTableQuery(SQueryInfo* pQueryInfo, int32_t tableIndex);
bool tscQueryTags(SQueryInfo* pQueryInfo);
bool tscMultiRoundQuery(SQueryInfo* pQueryInfo, int32_t tableIndex);
bool tscQueryBlockInfo(SQueryInfo* pQueryInfo);
SExprInfo* tscAddFuncInSelectClause(SQueryInfo* pQueryInfo, int32_t outputColIndex, int16_t functionId,
SColumnIndex* pIndex, SSchema* pColSchema, int16_t colType, int16_t colId);
int32_t tscSetTableFullName(SName* pName, SStrToken* pzTableName, SSqlObj* pSql);
void tscClearInterpInfo(SQueryInfo* pQueryInfo);
bool tscIsInsertData(char* sqlstr);
// the memory is not reset in case of fast allocate payload function
int32_t tscAllocPayloadFast(SSqlCmd *pCmd, size_t size);
int32_t tscAllocPayload(SSqlCmd* pCmd, int size);
TAOS_FIELD tscCreateField(int8_t type, const char* name, int16_t bytes);
SInternalField* tscFieldInfoAppend(SFieldInfo* pFieldInfo, TAOS_FIELD* pField);
SInternalField* tscFieldInfoInsert(SFieldInfo* pFieldInfo, int32_t index, TAOS_FIELD* field);
SInternalField* tscFieldInfoGetInternalField(SFieldInfo* pFieldInfo, int32_t index);
TAOS_FIELD* tscFieldInfoGetField(SFieldInfo* pFieldInfo, int32_t index);
void tscFieldInfoUpdateOffset(SQueryInfo* pQueryInfo);
int16_t tscFieldInfoGetOffset(SQueryInfo* pQueryInfo, int32_t index);
void tscFieldInfoClear(SFieldInfo* pFieldInfo);
void tscFieldInfoCopy(SFieldInfo* pFieldInfo, const SFieldInfo* pSrc, const SArray* pExprList);
static FORCE_INLINE int32_t tscNumOfFields(SQueryInfo* pQueryInfo) { return pQueryInfo->fieldsInfo.numOfOutput; }
int32_t tscGetFirstInvisibleFieldPos(SQueryInfo* pQueryInfo);
int32_t tscFieldInfoCompare(const SFieldInfo* pFieldInfo1, const SFieldInfo* pFieldInfo2, int32_t *diffSize);
void tscInsertPrimaryTsSourceColumn(SQueryInfo* pQueryInfo, uint64_t uid);
int32_t tscFieldInfoSetSize(const SFieldInfo* pFieldInfo1, const SFieldInfo* pFieldInfo2);
void addExprParams(SSqlExpr* pExpr, char* argument, int32_t type, int32_t bytes);
int32_t tscGetResRowLength(SArray* pExprList);
SExprInfo* tscExprInsert(SQueryInfo* pQueryInfo, int32_t index, int16_t functionId, SColumnIndex* pColIndex, int16_t type,
int16_t size, int16_t resColId, int16_t interSize, bool isTagCol);
SExprInfo* tscExprCreate(STableMetaInfo* pTableMetaInfo, int16_t functionId, SColumnIndex* pColIndex, int16_t type,
int16_t size, int16_t resColId, int16_t interSize, int32_t colType);
void tscExprAddParams(SSqlExpr* pExpr, char* argument, int32_t type, int32_t bytes);
SExprInfo* tscExprAppend(SQueryInfo* pQueryInfo, int16_t functionId, SColumnIndex* pColIndex, int16_t type,
int16_t size, int16_t resColId, int16_t interSize, bool isTagCol);
SExprInfo* tscExprUpdate(SQueryInfo* pQueryInfo, int32_t index, int16_t functionId, int16_t srcColumnIndex, int16_t type,
int16_t size);
size_t tscNumOfExprs(SQueryInfo* pQueryInfo);
int32_t tscExprTopBottomIndex(SQueryInfo* pQueryInfo);
SExprInfo *tscExprGet(SQueryInfo* pQueryInfo, int32_t index);
int32_t tscExprCopy(SArray* dst, const SArray* src, uint64_t uid, bool deepcopy);
int32_t tscExprCopyAll(SArray* dst, const SArray* src, bool deepcopy);
void tscExprAssign(SExprInfo* dst, const SExprInfo* src);
void tscExprDestroy(SArray* pExprInfo);
int32_t createProjectionExpr(SQueryInfo* pQueryInfo, STableMetaInfo* pTableMetaInfo, SExprInfo*** pExpr, int32_t* num);
void clearAllTableMetaInfo(SQueryInfo* pQueryInfo, bool removeMeta, uint64_t id);
SColumn* tscColumnClone(const SColumn* src);
void tscColumnCopy(SColumn* pDest, const SColumn* pSrc);
int32_t tscColumnExists(SArray* pColumnList, int32_t columnId, uint64_t uid);
SColumn* tscColumnListInsert(SArray* pColumnList, int32_t columnIndex, uint64_t uid, SSchema* pSchema);
void tscColumnListDestroy(SArray* pColList);
void tscColumnListCopy(SArray* dst, const SArray* src, uint64_t tableUid);
void tscColumnListCopyAll(SArray* dst, const SArray* src);
void convertQueryResult(SSqlRes* pRes, SQueryInfo* pQueryInfo, uint64_t objId, bool convertNchar);
void tscDequoteAndTrimToken(SStrToken* pToken);
int32_t tscValidateName(SStrToken* pToken);
void tscIncStreamExecutionCount(void* pStream);
bool tscValidateColumnId(STableMetaInfo* pTableMetaInfo, int32_t colId, int32_t numOfParams);
// get starter position of metric query condition (query on tags) in SSqlCmd.payload
SCond* tsGetSTableQueryCond(STagCond* pCond, uint64_t uid);
void tsSetSTableQueryCond(STagCond* pTagCond, uint64_t uid, SBufferWriter* bw);
int32_t tscTagCondCopy(STagCond* dest, const STagCond* src);
int32_t tscColCondCopy(SArray** dest, const SArray* src, uint64_t uid, int16_t tidx);
void tscTagCondRelease(STagCond* pCond);
void tscColCondRelease(SArray** pCond);
void tscGetSrcColumnInfo(SSrcColumnInfo* pColInfo, SQueryInfo* pQueryInfo);
bool tscShouldBeFreed(SSqlObj* pSql);
STableMetaInfo* tscGetTableMetaInfoFromCmd(SSqlCmd *pCmd, int32_t tableIndex);
STableMetaInfo* tscGetMetaInfo(SQueryInfo *pQueryInfo, int32_t tableIndex);
void tscInitQueryInfo(SQueryInfo* pQueryInfo);
void tscClearSubqueryInfo(SSqlCmd* pCmd);
int32_t tscAddQueryInfo(SSqlCmd *pCmd);
SQueryInfo *tscGetQueryInfo(SSqlCmd* pCmd);
SQueryInfo *tscGetQueryInfoS(SSqlCmd *pCmd);
void tscClearTableMetaInfo(STableMetaInfo* pTableMetaInfo);
STableMetaInfo* tscAddTableMetaInfo(SQueryInfo* pQueryInfo, SName* name, STableMeta* pTableMeta,
SVgroupsInfo* vgroupList, SArray* pTagCols, SArray* pVgroupTables);
STableMetaInfo* tscAddEmptyMetaInfo(SQueryInfo *pQueryInfo);
void tscFreeVgroupTableInfo(SArray* pVgroupTables);
SArray* tscVgroupTableInfoDup(SArray* pVgroupTables);
void tscRemoveVgroupTableGroup(SArray* pVgroupTable, int32_t index);
void tscVgroupTableCopy(SVgroupTableInfo* info, SVgroupTableInfo* pInfo);
int tscGetSTableVgroupInfo(SSqlObj* pSql, SQueryInfo* pQueryInfo);
int tscGetTableMeta(SSqlObj* pSql, STableMetaInfo* pTableMetaInfo);
int tscGetTableMetaEx(SSqlObj *pSql, STableMetaInfo *pTableMetaInfo, bool createIfNotExists, bool onlyLocal);
int32_t tscGetUdfFromNode(SSqlObj *pSql, SQueryInfo* pQueryInfo);
void tscResetForNextRetrieve(SSqlRes* pRes);
void executeQuery(SSqlObj* pSql, SQueryInfo* pQueryInfo);
void doExecuteQuery(SSqlObj* pSql, SQueryInfo* pQueryInfo);
SVgroupsInfo* tscVgroupInfoClone(SVgroupsInfo *pInfo);
void* tscVgroupInfoClear(SVgroupsInfo *pInfo);
#if 0
void tscSVgroupInfoCopy(SVgroupInfo* dst, const SVgroupInfo* src);
#endif
/**
* The create object function must be successful expect for the out of memory issue.
*
* Therefore, the metermeta/metricmeta object is directly passed to the newly created subquery object from the
* previous sql object, instead of retrieving the metermeta/metricmeta from cache.
*
* Because the metermeta/metricmeta may have been released by other threads, resulting in the retrieving failed as
* well as the create function.
*
* @param pSql
* @param vnodeIndex
* @param tableIndex
* @param fp
* @param param
* @param pPrevSql
* @return
*/
SSqlObj* createSimpleSubObj(SSqlObj* pSql, __async_cb_func_t fp, void* param, int32_t cmd);
void registerSqlObj(SSqlObj* pSql);
void tscInitResForMerge(SSqlRes* pRes);
SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, __async_cb_func_t fp, void* param, int32_t cmd, SSqlObj* pPrevSql);
void addGroupInfoForSubquery(SSqlObj* pParentObj, SSqlObj* pSql, int32_t subClauseIndex, int32_t tableIndex);
void doAddGroupColumnForSubquery(SQueryInfo* pQueryInfo, int32_t tagIndex, SSqlCmd* pCmd);
int16_t tscGetJoinTagColIdByUid(STagCond* pTagCond, uint64_t uid);
int16_t tscGetTagColIndexById(STableMeta* pTableMeta, int16_t colId);
int32_t doInitSubState(SSqlObj* pSql, int32_t numOfSubqueries);
void tscPrintSelNodeList(SSqlObj* pSql, int32_t subClauseIndex);
bool hasMoreVnodesToTry(SSqlObj *pSql);
bool hasMoreClauseToTry(SSqlObj* pSql);
void tscFreeQueryInfo(SSqlCmd* pCmd, bool removeCachedMeta, uint64_t id);
void tscTryQueryNextVnode(SSqlObj *pSql, __async_cb_func_t fp);
void tscTryQueryNextClause(SSqlObj* pSql, __async_cb_func_t fp);
int tscSetMgmtEpSetFromCfg(const char *first, const char *second, SRpcCorEpSet *corEpSet);
int32_t getMultiTableMetaFromMnode(SSqlObj *pSql, SArray* pNameList, SArray* pVgroupNameList, SArray* pUdfList, __async_cb_func_t fp, bool metaClone);
int tscTransferTableNameList(SSqlObj *pSql, const char *pNameList, int32_t length, SArray* pNameArray);
bool subAndCheckDone(SSqlObj *pSql, SSqlObj *pParentSql, int idx);
bool tscSetSqlOwner(SSqlObj* pSql);
void tscClearSqlOwner(SSqlObj* pSql);
int32_t doArithmeticCalculate(SQueryInfo* pQueryInfo, tFilePage* pOutput, int32_t rowSize, int32_t finalRowSize);
char* serializeTagData(STagData* pTagData, char* pMsg);
int32_t copyTagData(STagData* dst, const STagData* src);
STableMeta* createSuperTableMeta(STableMetaMsg* pChild);
uint32_t tscGetTableMetaSize(STableMeta* pTableMeta);
CChildTableMeta* tscCreateChildMeta(STableMeta* pTableMeta);
uint32_t tscGetTableMetaMaxSize();
int32_t tscCreateTableMetaFromSTableMeta(STableMeta** ppChild, const char* name, size_t *tableMetaCapacity, STableMeta **ppStable);
STableMeta* tscTableMetaDup(STableMeta* pTableMeta);
SVgroupsInfo* tscVgroupsInfoDup(SVgroupsInfo* pVgroupsInfo);
int32_t tscGetTagFilterSerializeLen(SQueryInfo* pQueryInfo);
int32_t tscGetColFilterSerializeLen(SQueryInfo* pQueryInfo);
int32_t tscCreateQueryFromQueryInfo(SQueryInfo* pQueryInfo, SQueryAttr* pQueryAttr, void* addr);
void* createQInfoFromQueryNode(SQueryInfo* pQueryInfo, STableGroupInfo* pTableGroupInfo, SOperatorInfo* pOperator, char* sql, void* addr, int32_t stage, uint64_t qId);
void* malloc_throw(size_t size);
void* calloc_throw(size_t nmemb, size_t size);
char* strdup_throw(const char* str);
bool vgroupInfoIdentical(SNewVgroupInfo *pExisted, SVgroupMsg* src);
SNewVgroupInfo createNewVgroupInfo(SVgroupMsg *pVgroupMsg);
STblCond* tsGetTableFilter(SArray* filters, uint64_t uid, int16_t idx);
void tscRemoveCachedTableMeta(STableMetaInfo* pTableMetaInfo, uint64_t id);
char* cloneCurrentDBName(SSqlObj* pSql);
#ifdef __cplusplus
}
#endif
#endif // TDENGINE_TSCUTIL_H
此差异已折叠。
/* DO NOT EDIT THIS FILE - it is machine generated */
#include <jni.h>
/* Header for class com_taosdata_jdbc_TSDBJNIConnector */
#ifndef _Included_com_taosdata_jdbc_TSDBJNIConnector
#define _Included_com_taosdata_jdbc_TSDBJNIConnector
#ifdef __cplusplus
extern "C" {
#endif
#undef com_taosdata_jdbc_TSDBJNIConnector_INVALID_CONNECTION_POINTER_VALUE
#define com_taosdata_jdbc_TSDBJNIConnector_INVALID_CONNECTION_POINTER_VALUE 0LL
/*
* Class: com_taosdata_jdbc_TSDBJNIConnector
* Method:
* Signature: (Ljava/lang/String;)V
*/
JNIEXPORT void JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_setAllocModeImp
(JNIEnv *, jclass, jint, jstring, jboolean);
/*
* Class: com_taosdata_jdbc_TSDBJNIConnector
* Method:
* Signature: ()Ljava/lang/String;
*/
JNIEXPORT void JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_dumpMemoryLeakImp
(JNIEnv *, jclass);
/*
* Class: com_taosdata_jdbc_TSDBJNIConnector
* Method: initImp
* Signature: (Ljava/lang/String;)V
*/
JNIEXPORT void JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_initImp
(JNIEnv *, jclass, jstring);
/*
* Class: com_taosdata_jdbc_TSDBJNIConnector
* Method: setOptions
* Signature: (ILjava/lang/String;)I
*/
JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_setOptions
(JNIEnv *, jclass, jint, jstring);
/*
* Class: com_taosdata_jdbc_TSDBJNIConnector
* Method: getTsCharset
* Signature: ()Ljava/lang/String;
*/
JNIEXPORT jstring JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_getTsCharset
(JNIEnv *, jclass);
/*
* Class: com_taosdata_jdbc_TSDBJNIConnector
* Method: getResultTimePrecisionImp
* Signature: (JJ)I
*/
JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TDDBJNIConnector_getResultTimePrecisionImp
(JNIEnv *, jobject, jlong, jlong);
/*
* Class: com_taosdata_jdbc_TSDBJNIConnector
* Method: connectImp
* Signature: (Ljava/lang/String;ILjava/lang/String;Ljava/lang/String;Ljava/lang/String;)J
*/
JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_connectImp
(JNIEnv *, jobject, jstring, jint, jstring, jstring, jstring);
/*
* Class: com_taosdata_jdbc_TSDBJNIConnector
* Method: executeQueryImp
* Signature: ([BJ)I
*/
JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_executeQueryImp
(JNIEnv *, jobject, jbyteArray, jlong);
/*
* Class: com_taosdata_jdbc_TSDBJNIConnector
* Method: getErrCodeImp
* Signature: (J)I
*/
JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_getErrCodeImp
(JNIEnv *, jobject, jlong, jlong);
/*
* Class: com_taosdata_jdbc_TSDBJNIConnector
* Method: getErrMsgImp
* Signature: (J)Ljava/lang/String;
*/
JNIEXPORT jstring JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_getErrMsgImp
(JNIEnv *, jobject, jlong);
/*
* Class: com_taosdata_jdbc_TSDBJNIConnector
* Method: getResultSetImp
* Signature: (J)J
*/
JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_getResultSetImp
(JNIEnv *env, jobject jobj, jlong con, jlong tres);
/*
* Class: com_taosdata_jdbc_TSDBJNIConnector
* Method: isUpdateQueryImp
* Signature: (JJ)I
*/
JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_isUpdateQueryImp
(JNIEnv *env, jobject jobj, jlong con, jlong tres);
/*
* Class: com_taosdata_jdbc_TSDBJNIConnector
* Method: freeResultSetImp
* Signature: (JJ)I
*/
JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_freeResultSetImp
(JNIEnv *, jobject, jlong, jlong);
/*
* Class: com_taosdata_jdbc_TSDBJNIConnector
* Method: getAffectedRowsImp
* Signature: (J)I
*/
JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_getAffectedRowsImp
(JNIEnv *env, jobject jobj, jlong con, jlong res);
/*
* Class: com_taosdata_jdbc_TSDBJNIConnector
* Method: getSchemaMetaDataImp
* Signature: (JJLjava/util/List;)I
*/
JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_getSchemaMetaDataImp
(JNIEnv *, jobject, jlong, jlong, jobject);
/*
* Class: com_taosdata_jdbc_TSDBJNIConnector
* Method: fetchRowImp
* Signature: (JJLcom/taosdata/jdbc/TSDBResultSetRowData;)I
*/
JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_fetchRowImp
(JNIEnv *, jobject, jlong, jlong, jobject);
/*
* Class: com_taosdata_jdbc_TSDBJNIConnector
* Method: fetchBlockImp
* Signature: (JJLcom/taosdata/jdbc/TSDBResultSetBlockData;)I
*/
JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_fetchBlockImp
(JNIEnv *, jobject, jlong, jlong, jobject);
/*
* Class: com_taosdata_jdbc_TSDBJNIConnector
* Method: closeConnectionImp
* Signature: (J)I
*/
JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_closeConnectionImp
(JNIEnv *, jobject, jlong);
/*
* Class: com_taosdata_jdbc_TSDBJNIConnector
* Method: subscribeImp
* Signature: (Ljava/lang/String;Ljava/lang/String;Ljava/lang/String;Ljava/lang/String;Ljava/lang/String;JI)J
*/
JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_subscribeImp
(JNIEnv *, jobject, jlong, jboolean, jstring, jstring, jint);
/*
* Class: com_taosdata_jdbc_TSDBJNIConnector
* Method: consumeImp
* Signature: (J)Lcom/taosdata/jdbc/TSDBResultSetRowData;
*/
JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_consumeImp
(JNIEnv *, jobject, jlong);
/*
* Class: com_taosdata_jdbc_TSDBJNIConnector
* Method: unsubscribeImp
* Signature: (J)V
*/
JNIEXPORT void JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_unsubscribeImp
(JNIEnv *, jobject, jlong, jboolean);
/*
* Class: com_taosdata_jdbc_TSDBJNIConnector
* Method: validateCreateTableSqlImp
* Signature: (J[B)I
*/
JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_validateCreateTableSqlImp
(JNIEnv *, jobject, jlong, jbyteArray);
/*
* Class: com_taosdata_jdbc_TSDBJNIConnector
* Method: prepareStmtImp
* Signature: ([BJ)I
*/
JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_prepareStmtImp
(JNIEnv *, jobject, jbyteArray, jlong);
/*
* Class: com_taosdata_jdbc_TSDBJNIConnector
* Method: setBindTableNameImp
* Signature: (JLjava/lang/String;J)I
*/
JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_setBindTableNameImp
(JNIEnv *, jobject, jlong, jstring, jlong);
/*
* Class: com_taosdata_jdbc_TSDBJNIConnector
* Method: bindColDataImp
* Signature: (J[B[B[BIIIIJ)J
*/
JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_bindColDataImp
(JNIEnv *, jobject, jlong, jbyteArray, jbyteArray, jbyteArray, jint, jint, jint, jint, jlong);
/*
* Class: com_taosdata_jdbc_TSDBJNIConnector
* Method: executeBatchImp
* Signature: (JJ)I
*/
JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_executeBatchImp(JNIEnv *env, jobject jobj, jlong stmt, jlong con);
/*
* Class: com_taosdata_jdbc_TSDBJNIConnector
* Method: closeStmt
* Signature: (JJ)I
*/
JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_closeStmt(JNIEnv *env, jobject jobj, jlong stmt, jlong con);
/**
* Class: com_taosdata_jdbc_TSDBJNIConnector
* Method: setTableNameTagsImp
* Signature: (JLjava/lang/String;I[B[B[B[BJ)I
*/
JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_setTableNameTagsImp
(JNIEnv *, jobject, jlong, jstring, jint, jbyteArray, jbyteArray, jbyteArray, jbyteArray, jlong);
#ifdef __cplusplus
}
#endif
#endif
此差异已折叠。
EXPORTS
taos_init
taos_cleanup
taos_options
taos_connect
taos_connect_auth
taos_close
taos_stmt_init
taos_stmt_prepare
taos_stmt_set_tbname_tags
taos_stmt_set_tbname
taos_stmt_is_insert
taos_stmt_num_params
taos_stmt_bind_param
taos_stmt_add_batch
taos_stmt_execute
taos_stmt_use_result
taos_stmt_close
taos_stmt_errstr
taos_query
taos_fetch_row
taos_result_precision
taos_free_result
taos_field_count
taos_num_fields
taos_affected_rows
taos_fetch_fields
taos_select_db
taos_print_row
taos_stop_query
taos_fetch_block
taos_validate_sql
taos_fetch_lengths
taos_get_server_info
taos_get_client_info
taos_errstr
taos_errno
taos_query_a
taos_fetch_rows_a
taos_subscribe
taos_consume
taos_unsubscribe
taos_open_stream
taos_close_stream
taos_load_table_info
1 VERSIONINFO
FILEVERSION ${TD_VER_NUMBER}
PRODUCTVERSION ${TD_VER_NUMBER}
FILEFLAGSMASK 0x17L
#ifdef _DEBUG
FILEFLAGS 0x1L
#else
FILEFLAGS 0x0L
#endif
FILEOS 0x4L
FILETYPE 0x0L
FILESUBTYPE 0x0L
BEGIN
BLOCK "StringFileInfo"
BEGIN
BLOCK "040904b0"
BEGIN
VALUE "FileDescription", "Native C Driver for TDengine"
VALUE "FileVersion", "${TD_VER_NUMBER}"
VALUE "InternalName", "taos.dll(${TD_VER_CPUTYPE})"
VALUE "LegalCopyright", "Copyright (C) 2020 TAOS Data"
VALUE "OriginalFilename", ""
VALUE "ProductName", "taos.dll(${TD_VER_CPUTYPE})"
VALUE "ProductVersion", "${TD_VER_NUMBER}"
END
END
BLOCK "VarFileInfo"
BEGIN
VALUE "Translation", 0x409, 1200
END
END
\ No newline at end of file
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "os.h"
#include "tutil.h"
#include "tnote.h"
#include "trpc.h"
#include "tscLog.h"
#include "tscSubquery.h"
#include "tscUtil.h"
#include "tsched.h"
#include "qTableMeta.h"
#include "tsclient.h"
static void tscAsyncQueryRowsForNextVnode(void *param, TAOS_RES *tres, int numOfRows);
/*
* Proxy function to perform sequentially query&retrieve operation.
* If sql queries upon a super table and two-stage merge procedure is not involved (when employ the projection
* query), it will sequentially query&retrieve data for all vnodes
*/
static void tscAsyncFetchRowsProxy(void *param, TAOS_RES *tres, int numOfRows);
void doAsyncQuery(STscObj* pObj, SSqlObj* pSql, __async_cb_func_t fp, void* param, const char* sqlstr, size_t sqlLen) {
SSqlCmd* pCmd = &pSql->cmd;
pSql->signature = pSql;
pSql->param = param;
pSql->pTscObj = pObj;
pSql->parseRetry= 0;
pSql->maxRetry = TSDB_MAX_REPLICA;
pSql->fp = fp;
pSql->fetchFp = fp;
registerSqlObj(pSql);
pSql->sqlstr = calloc(1, sqlLen + 1);
if (pSql->sqlstr == NULL) {
tscError("0x%"PRIx64" failed to malloc sql string buffer", pSql->self);
pSql->res.code = TSDB_CODE_TSC_OUT_OF_MEMORY;
tscAsyncResultOnError(pSql);
return;
}
strntolower(pSql->sqlstr, sqlstr, (int32_t)sqlLen);
tscDebugL("0x%"PRIx64" SQL: %s", pSql->self, pSql->sqlstr);
pCmd->resColumnId = TSDB_RES_COL_ID;
taosAcquireRef(tscObjRef, pSql->self);
int32_t code = tsParseSql(pSql, true);
if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
taosReleaseRef(tscObjRef, pSql->self);
return;
}
if (code != TSDB_CODE_SUCCESS) {
pSql->res.code = code;
tscAsyncResultOnError(pSql);
taosReleaseRef(tscObjRef, pSql->self);
return;
}
SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd);
executeQuery(pSql, pQueryInfo);
taosReleaseRef(tscObjRef, pSql->self);
}
// TODO return the correct error code to client in tscQueueAsyncError
void taos_query_a(TAOS *taos, const char *sqlstr, __async_cb_func_t fp, void *param) {
taos_query_ra(taos, sqlstr, fp, param);
}
TAOS_RES * taos_query_ra(TAOS *taos, const char *sqlstr, __async_cb_func_t fp, void *param) {
STscObj *pObj = (STscObj *)taos;
if (pObj == NULL || pObj->signature != pObj) {
tscError("pObj:%p is NULL or freed", pObj);
terrno = TSDB_CODE_TSC_DISCONNECTED;
tscQueueAsyncError(fp, param, TSDB_CODE_TSC_DISCONNECTED);
return NULL;
}
int32_t sqlLen = (int32_t)strlen(sqlstr);
if (sqlLen > tsMaxSQLStringLen) {
tscError("sql string exceeds max length:%d", tsMaxSQLStringLen);
terrno = TSDB_CODE_TSC_EXCEED_SQL_LIMIT;
tscQueueAsyncError(fp, param, terrno);
return NULL;
}
nPrintTsc("%s", sqlstr);
SSqlObj *pSql = (SSqlObj *)calloc(1, sizeof(SSqlObj));
if (pSql == NULL) {
tscError("failed to malloc sqlObj");
tscQueueAsyncError(fp, param, TSDB_CODE_TSC_OUT_OF_MEMORY);
return NULL;
}
doAsyncQuery(pObj, pSql, fp, param, sqlstr, sqlLen);
return pSql;
}
static void tscAsyncFetchRowsProxy(void *param, TAOS_RES *tres, int numOfRows) {
if (tres == NULL) {
return;
}
SSqlObj *pSql = (SSqlObj *)tres;
SSqlRes *pRes = &pSql->res;
SSqlCmd *pCmd = &pSql->cmd;
if (numOfRows == 0) {
if (hasMoreVnodesToTry(pSql)) { // sequentially retrieve data from remain vnodes.
tscTryQueryNextVnode(pSql, tscAsyncQueryRowsForNextVnode);
} else {
/*
* all available virtual node has been checked already, now we need to check
* for the next subclause queries
*/
if (pCmd->active->sibling != NULL) {
pCmd->active = pCmd->active->sibling;
tscTryQueryNextClause(pSql, tscAsyncQueryRowsForNextVnode);
return;
}
/*
* 1. has reach the limitation
* 2. no remain virtual nodes to be retrieved anymore
*/
(*pSql->fetchFp)(param, pSql, 0);
}
return;
}
// local merge has handle this situation during super table non-projection query.
if (pCmd->command != TSDB_SQL_RETRIEVE_GLOBALMERGE) {
pRes->numOfClauseTotal += pRes->numOfRows;
}
(*pSql->fetchFp)(param, tres, numOfRows);
}
// actual continue retrieve function with user-specified callback function
static void tscProcessAsyncRetrieveImpl(void *param, TAOS_RES *tres, int numOfRows, __async_cb_func_t fp) {
SSqlObj *pSql = (SSqlObj *)tres;
if (pSql == NULL) { // error
tscError("sql object is NULL");
return;
}
SSqlCmd *pCmd = &pSql->cmd;
SSqlRes *pRes = &pSql->res;
if ((pRes->qId == 0 || numOfRows != 0) && pCmd->command < TSDB_SQL_LOCAL) {
if (pRes->qId == 0 && numOfRows != 0) {
tscError("qhandle is NULL");
} else {
pRes->code = numOfRows;
}
tscAsyncResultOnError(pSql);
return;
}
pSql->fp = fp;
if (pCmd->command != TSDB_SQL_RETRIEVE_GLOBALMERGE && pCmd->command < TSDB_SQL_LOCAL) {
pCmd->command = (pCmd->command > TSDB_SQL_MGMT) ? TSDB_SQL_RETRIEVE_MNODE : TSDB_SQL_FETCH;
}
if (pCmd->command == TSDB_SQL_TABLE_JOIN_RETRIEVE) {
tscFetchDatablockForSubquery(pSql);
} else {
tscBuildAndSendRequest(pSql, NULL);
}
}
/*
* retrieve callback for fetch rows proxy.
* The below two functions both serve as the callback function of query virtual node.
* query callback first, and then followed by retrieve callback
*/
static void tscAsyncQueryRowsForNextVnode(void *param, TAOS_RES *tres, int numOfRows) {
// query completed, continue to retrieve
tscProcessAsyncRetrieveImpl(param, tres, numOfRows, tscAsyncFetchRowsProxy);
}
void taos_fetch_rows_a(TAOS_RES *tres, __async_cb_func_t fp, void *param) {
SSqlObj *pSql = (SSqlObj *)tres;
if (pSql == NULL || pSql->signature != pSql) {
tscError("sql object is NULL");
tscQueueAsyncError(fp, param, TSDB_CODE_TSC_DISCONNECTED);
return;
}
SSqlRes *pRes = &pSql->res;
SSqlCmd *pCmd = &pSql->cmd;
// user-defined callback function is stored in fetchFp
pSql->fetchFp = fp;
pSql->fp = tscAsyncFetchRowsProxy;
pSql->param = param;
tscResetForNextRetrieve(pRes);
// handle outer query based on the already retrieved nest query results.
SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd);
if (pQueryInfo->pUpstream != NULL && taosArrayGetSize(pQueryInfo->pUpstream) > 0) {
SSchedMsg schedMsg = {0};
schedMsg.fp = doRetrieveSubqueryData;
schedMsg.ahandle = (void *)pSql;
schedMsg.thandle = (void *)1;
schedMsg.msg = 0;
taosScheduleTask(tscQhandle, &schedMsg);
return;
}
if (pRes->qId == 0) {
tscError("qhandle is invalid");
pRes->code = TSDB_CODE_TSC_INVALID_QHANDLE;
tscAsyncResultOnError(pSql);
return;
}
if (pCmd->command == TSDB_SQL_TABLE_JOIN_RETRIEVE) {
tscFetchDatablockForSubquery(pSql);
} else if (pRes->completed) {
if(pCmd->command == TSDB_SQL_FETCH || (pCmd->command >= TSDB_SQL_SERV_STATUS && pCmd->command <= TSDB_SQL_CURRENT_USER)) {
if (hasMoreVnodesToTry(pSql)) { // sequentially retrieve data from remain vnodes.
tscTryQueryNextVnode(pSql, tscAsyncQueryRowsForNextVnode);
} else {
/*
* all available virtual nodes in current clause has been checked already, now try the
* next one in the following union subclause
*/
if (pCmd->active->sibling != NULL) {
pCmd->active = pCmd->active->sibling; // todo refactor
tscTryQueryNextClause(pSql, tscAsyncQueryRowsForNextVnode);
return;
}
/*
* 1. has reach the limitation
* 2. no remain virtual nodes to be retrieved anymore
*/
(*pSql->fetchFp)(param, pSql, 0);
}
return;
} else if (pCmd->command == TSDB_SQL_RETRIEVE_MNODE || pCmd->command == TSDB_SQL_RETRIEVE_GLOBALMERGE) {
// in case of show command, return no data
(*pSql->fetchFp)(param, pSql, 0);
} else {
assert(0);
}
} else { // current query is not completed, continue retrieve from node
if (pCmd->command != TSDB_SQL_RETRIEVE_GLOBALMERGE && pCmd->command < TSDB_SQL_LOCAL) {
pCmd->command = (pCmd->command > TSDB_SQL_MGMT) ? TSDB_SQL_RETRIEVE_MNODE : TSDB_SQL_FETCH;
}
SQueryInfo* pQueryInfo1 = tscGetQueryInfo(&pSql->cmd);
tscBuildAndSendRequest(pSql, pQueryInfo1);
}
}
// this function will be executed by queue task threads, so the terrno is not valid
static void tscProcessAsyncError(SSchedMsg *pMsg) {
void (*fp)() = pMsg->ahandle;
terrno = *(int32_t*) pMsg->msg;
tfree(pMsg->msg);
(*fp)(pMsg->thandle, NULL, terrno);
}
void tscQueueAsyncError(void(*fp), void *param, int32_t code) {
int32_t* c = malloc(sizeof(int32_t));
*c = code;
SSchedMsg schedMsg = {0};
schedMsg.fp = tscProcessAsyncError;
schedMsg.ahandle = fp;
schedMsg.thandle = param;
schedMsg.msg = c;
taosScheduleTask(tscQhandle, &schedMsg);
}
static void tscAsyncResultCallback(SSchedMsg *pMsg) {
SSqlObj* pSql = (SSqlObj*)taosAcquireRef(tscObjRef, (int64_t)pMsg->ahandle);
if (pSql == NULL || pSql->signature != pSql) {
tscDebug("%p SqlObj is freed, not add into queue async res", pMsg->ahandle);
return;
}
assert(pSql->res.code != TSDB_CODE_SUCCESS);
tscError("0x%"PRIx64" async result callback, code:%s", pSql->self, tstrerror(pSql->res.code));
SSqlRes *pRes = &pSql->res;
if (pSql->fp == NULL || pSql->fetchFp == NULL){
taosReleaseRef(tscObjRef, pSql->self);
return;
}
pSql->fp = pSql->fetchFp;
(*pSql->fp)(pSql->param, pSql, pRes->code);
taosReleaseRef(tscObjRef, pSql->self);
}
void tscAsyncResultOnError(SSqlObj* pSql) {
SSchedMsg schedMsg = {0};
schedMsg.fp = tscAsyncResultCallback;
schedMsg.ahandle = (void *)pSql->self;
schedMsg.thandle = (void *)1;
schedMsg.msg = 0;
taosScheduleTask(tscQhandle, &schedMsg);
}
int tscSendMsgToServer(SSqlObj *pSql);
void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) {
SSqlObj* pSql = (SSqlObj*)taosAcquireRef(tscObjRef, (int64_t)param);
if (pSql == NULL) return;
assert(pSql->signature == pSql && (int64_t)param == pSql->self);
SSqlCmd *pCmd = &pSql->cmd;
SSqlRes *pRes = &pSql->res;
pRes->code = code;
SSqlObj *sub = (SSqlObj*) res;
const char* msg = (sub->cmd.command == TSDB_SQL_STABLEVGROUP)? "vgroup-list":"multi-tableMeta";
if (code != TSDB_CODE_SUCCESS) {
tscError("0x%"PRIx64" get %s failed, code:%s", pSql->self, msg, tstrerror(code));
if (code == TSDB_CODE_RPC_FQDN_ERROR) {
size_t sz = strlen(tscGetErrorMsgPayload(&sub->cmd));
tscAllocPayload(&pSql->cmd, (int)sz + 1);
memcpy(tscGetErrorMsgPayload(&pSql->cmd), tscGetErrorMsgPayload(&sub->cmd), sz);
}
goto _error;
}
tscDebug("0x%"PRIx64" get %s successfully", pSql->self, msg);
if (pSql->pStream == NULL) {
SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd);
if (pQueryInfo != NULL && TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_INSERT)) {
tscDebug("0x%" PRIx64 " continue parse sql after get table-meta", pSql->self);
code = tsParseSql(pSql, false);
if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
taosReleaseRef(tscObjRef, pSql->self);
return;
} else if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
if (TSDB_QUERY_HAS_TYPE(pCmd->insertParam.insertType, TSDB_QUERY_TYPE_STMT_INSERT)) { // stmt insert
(*pSql->fp)(pSql->param, pSql, code);
} else if (TSDB_QUERY_HAS_TYPE(pCmd->insertParam.insertType, TSDB_QUERY_TYPE_FILE_INSERT)) { // file insert
tscImportDataFromFile(pSql);
} else { // sql string insert
tscHandleMultivnodeInsert(pSql);
}
} else {
if (pSql->retryReason != TSDB_CODE_SUCCESS) {
tscDebug("0x%" PRIx64 " update cached table-meta, re-validate sql statement and send query again", pSql->self);
pSql->retryReason = TSDB_CODE_SUCCESS;
} else {
tscDebug("0x%" PRIx64 " cached table-meta, continue validate sql statement and send query", pSql->self);
}
code = tsParseSql(pSql, true);
if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
taosReleaseRef(tscObjRef, pSql->self);
return;
} else if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
SQueryInfo *pQueryInfo1 = tscGetQueryInfo(pCmd);
executeQuery(pSql, pQueryInfo1);
}
taosReleaseRef(tscObjRef, pSql->self);
return;
} else { // stream computing
tscDebug("0x%"PRIx64" stream:%p meta is updated, start new query, command:%d", pSql->self, pSql->pStream, pCmd->command);
SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd);
if (tscNumOfExprs(pQueryInfo) == 0) {
tsParseSql(pSql, false);
}
(*pSql->fp)(pSql->param, pSql, code);
taosReleaseRef(tscObjRef, pSql->self);
return;
}
_error:
pRes->code = code;
tscAsyncResultOnError(pSql);
taosReleaseRef(tscObjRef, pSql->self);
}
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "os.h"
#include "tscLog.h"
#include "tsclient.h"
#include "tsocket.h"
#include "ttimer.h"
#include "tmsg.h"
#include "tcq.h"
#include "../../../include/client/taos.h"
#include "tscUtil.h"
void tscSaveSlowQueryFp(void *handle, void *tmrId);
TAOS *tscSlowQueryConn = NULL;
bool tscSlowQueryConnInitialized = false;
void tscInitConnCb(void *param, TAOS_RES *result, int code) {
char *sql = param;
if (code < 0) {
tscError("taos:%p, slow query connect failed, code:%d", tscSlowQueryConn, code);
taos_close(tscSlowQueryConn);
tscSlowQueryConn = NULL;
tscSlowQueryConnInitialized = false;
free(sql);
} else {
tscDebug("taos:%p, slow query connect success, code:%d", tscSlowQueryConn, code);
tscSlowQueryConnInitialized = true;
tscSaveSlowQueryFp(sql, NULL);
}
taos_free_result(result);
}
void tscAddIntoSqlList(SSqlObj *pSql) {
static uint32_t queryId = 1;
STscObj *pObj = pSql->pTscObj;
if (pSql->listed) return;
pthread_mutex_lock(&pObj->mutex);
assert(pSql != pObj->sqlList);
pSql->next = pObj->sqlList;
if (pObj->sqlList) pObj->sqlList->prev = pSql;
pObj->sqlList = pSql;
pSql->queryId = atomic_fetch_add_32(&queryId, 1);
pthread_mutex_unlock(&pObj->mutex);
pSql->stime = taosGetTimestampMs();
pSql->listed = 1;
tscDebug("0x%"PRIx64" added into sqlList, queryId:%u", pSql->self, pSql->queryId);
}
void tscSaveSlowQueryFpCb(void *param, TAOS_RES *result, int code) {
if (code < 0) {
tscError("failed to save slow query, code:%d", code);
} else {
tscDebug("success to save slow query, code:%d", code);
}
taos_free_result(result);
}
void tscSaveSlowQueryFp(void *handle, void *tmrId) {
char *sql = handle;
if (!tscSlowQueryConnInitialized) {
if (tscSlowQueryConn == NULL) {
tscDebug("start to init slow query connect");
taos_connect_a(NULL, "monitor", tsInternalPass, "", 0, tscInitConnCb, sql, &tscSlowQueryConn);
} else {
tscError("taos:%p, slow query connect is already initialized", tscSlowQueryConn);
free(sql);
}
} else {
tscDebug("taos:%p, save slow query:%s", tscSlowQueryConn, sql);
taos_query_a(tscSlowQueryConn, sql, tscSaveSlowQueryFpCb, NULL);
free(sql);
}
}
void tscSaveSlowQuery(SSqlObj *pSql) {
const static int64_t SLOW_QUERY_INTERVAL = 3000000L; // todo configurable
size_t size = 200; // other part of sql string, expect the main sql str
if (pSql->res.useconds < SLOW_QUERY_INTERVAL) {
return;
}
tscDebug("0x%"PRIx64" query time:%" PRId64 " sql:%s", pSql->self, pSql->res.useconds, pSql->sqlstr);
int32_t sqlSize = (int32_t)(TSDB_SLOW_QUERY_SQL_LEN + size);
char *sql = malloc(sqlSize);
if (sql == NULL) {
tscError("0x%"PRIx64" failed to allocate memory to sent slow query to dnode", pSql->self);
return;
}
int len = snprintf(sql, size, "insert into %s.slowquery values(now, '%s', %" PRId64 ", %" PRId64 ", '", tsMonitorDbName,
pSql->pTscObj->user, pSql->stime, pSql->res.useconds);
int sqlLen = snprintf(sql + len, TSDB_SLOW_QUERY_SQL_LEN, "%s", pSql->sqlstr);
if (sqlLen > TSDB_SLOW_QUERY_SQL_LEN - 1) {
sqlLen = len + TSDB_SLOW_QUERY_SQL_LEN - 1;
} else {
sqlLen += len;
}
strcpy(sql + sqlLen, "')");
taosTmrStart(tscSaveSlowQueryFp, 200, sql, tscTmr);
}
void tscRemoveFromSqlList(SSqlObj *pSql) {
STscObj *pObj = pSql->pTscObj;
if (pSql->listed == 0) return;
pthread_mutex_lock(&pObj->mutex);
if (pSql->prev)
pSql->prev->next = pSql->next;
else
pObj->sqlList = pSql->next;
if (pSql->next) pSql->next->prev = pSql->prev;
pthread_mutex_unlock(&pObj->mutex);
pSql->next = NULL;
pSql->prev = NULL;
pSql->listed = 0;
tscSaveSlowQuery(pSql);
tscDebug("0x%"PRIx64" removed from sqlList", pSql->self);
}
void tscKillQuery(STscObj *pObj, uint32_t killId) {
pthread_mutex_lock(&pObj->mutex);
SSqlObj *pSql = pObj->sqlList;
while (pSql) {
if (pSql->queryId == killId) break;
pSql = pSql->next;
}
pthread_mutex_unlock(&pObj->mutex);
if (pSql == NULL) {
tscError("failed to kill query, id:%d, it may have completed/terminated", killId);
} else {
tscDebug("0x%"PRIx64" query is killed, queryId:%d", pSql->self, killId);
taos_stop_query(pSql);
}
}
void tscAddIntoStreamList(SSqlStream *pStream) {
static uint32_t streamId = 1;
STscObj * pObj = pStream->pSql->pTscObj;
pthread_mutex_lock(&pObj->mutex);
pStream->next = pObj->streamList;
if (pObj->streamList) pObj->streamList->prev = pStream;
pObj->streamList = pStream;
pStream->streamId = streamId++;
pthread_mutex_unlock(&pObj->mutex);
pStream->listed = 1;
}
void tscRemoveFromStreamList(SSqlStream *pStream, SSqlObj *pSqlObj) {
if (pStream->listed == 0) return;
STscObj *pObj = pSqlObj->pTscObj;
pthread_mutex_lock(&pObj->mutex);
if (pStream->prev)
pStream->prev->next = pStream->next;
else
pObj->streamList = pStream->next;
if (pStream->next) pStream->next->prev = pStream->prev;
pthread_mutex_unlock(&pObj->mutex);
pStream->next = NULL;
pStream->prev = NULL;
pStream->listed = 0;
}
void tscKillStream(STscObj *pObj, uint32_t killId) {
pthread_mutex_lock(&pObj->mutex);
SSqlStream *pStream = pObj->streamList;
while (pStream) {
if (pStream->streamId == killId) break;
pStream = pStream->next;
}
pthread_mutex_unlock(&pObj->mutex);
if (pStream) {
tscDebug("0x%"PRIx64" stream:%p is killed, streamId:%d", pStream->pSql->self, pStream, killId);
if (pStream->callback) {
pStream->callback(pStream->param);
}
taos_close_stream(pStream);
} else {
tscError("failed to kill stream, streamId:%d not exist", killId);
}
}
int tscBuildQueryStreamDesc(void *pMsg, STscObj *pObj) {
SHeartBeatReq *pHeartbeat = pMsg;
int allocedQueriesNum = pHeartbeat->numOfQueries;
int allocedStreamsNum = pHeartbeat->numOfStreams;
pHeartbeat->numOfQueries = 0;
SQueryDesc *pQdesc = (SQueryDesc *)pHeartbeat->pData;
int64_t now = taosGetTimestampMs();
SSqlObj *pSql = pObj->sqlList;
while (pSql) {
/*
* avoid sqlobj may not be correctly removed from sql list
* e.g., forgetting to free the sql result may cause the sql object still in sql list
*/
if (pSql->sqlstr == NULL) {
pSql = pSql->next;
continue;
}
tstrncpy(pQdesc->sql, pSql->sqlstr, sizeof(pQdesc->sql));
pQdesc->stime = htobe64(pSql->stime);
pQdesc->queryId = htonl(pSql->queryId);
pQdesc->useconds = htobe64(now - pSql->stime);
pQdesc->qId = htobe64(pSql->res.qId);
pQdesc->sqlObjId = htobe64(pSql->self);
pQdesc->pid = pHeartbeat->pid;
pQdesc->numOfSub = pSql->subState.numOfSub;
// todo race condition
pQdesc->stableQuery = 0;
char *p = pQdesc->subSqlInfo;
int32_t remainLen = sizeof(pQdesc->subSqlInfo);
if (pQdesc->numOfSub == 0) {
snprintf(p, remainLen, "N/A");
} else {
// SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd);
// if (pQueryInfo != NULL) {
// pQdesc->stableQuery = (pQueryInfo->stableQuery)?1:0;
// } else {
// pQdesc->stableQuery = 0;
// }
if (pSql->pSubs != NULL && pSql->subState.states != NULL) {
for (int32_t i = 0; i < pQdesc->numOfSub; ++i) {
SSqlObj *psub = pSql->pSubs[i];
int64_t self = (psub != NULL)? psub->self : 0;
int32_t len = snprintf(p, remainLen, "[%d]0x%" PRIx64 "(%c) ", i, self, pSql->subState.states[i] ? 'C' : 'I');
if (len > remainLen) {
break;
}
remainLen -= len;
p += len;
}
}
}
pQdesc->numOfSub = htonl(pQdesc->numOfSub);
taosGetFqdn(pQdesc->fqdn);
pHeartbeat->numOfQueries++;
pQdesc++;
pSql = pSql->next;
if (pHeartbeat->numOfQueries >= allocedQueriesNum) {
break;
}
}
pHeartbeat->numOfStreams = 0;
SStreamDesc *pSdesc = (SStreamDesc *)pQdesc;
SSqlStream *pStream = pObj->streamList;
while (pStream) {
tstrncpy(pSdesc->sql, pStream->pSql->sqlstr, sizeof(pSdesc->sql));
if (pStream->dstTable == NULL) {
pSdesc->dstTable[0] = 0;
} else {
tstrncpy(pSdesc->dstTable, pStream->dstTable, sizeof(pSdesc->dstTable));
}
pSdesc->streamId = htonl(pStream->streamId);
pSdesc->num = htobe64(pStream->num);
pSdesc->useconds = htobe64(pStream->useconds);
pSdesc->stime = (pStream->stime == INT64_MIN) ? htobe64(pStream->stime) : htobe64(pStream->stime - pStream->interval.interval);
pSdesc->ctime = htobe64(pStream->ctime);
pSdesc->slidingTime = htobe64(pStream->interval.sliding);
pSdesc->interval = htobe64(pStream->interval.interval);
pHeartbeat->numOfStreams++;
pSdesc++;
pStream = pStream->next;
if (pHeartbeat->numOfStreams >= allocedStreamsNum) break;
}
int32_t msgLen = pHeartbeat->numOfQueries * sizeof(SQueryDesc) + pHeartbeat->numOfStreams * sizeof(SStreamDesc) +
sizeof(SHeartBeatReq);
pHeartbeat->connId = htonl(pObj->connId);
pHeartbeat->numOfQueries = htonl(pHeartbeat->numOfQueries);
pHeartbeat->numOfStreams = htonl(pHeartbeat->numOfStreams);
return msgLen;
}
// cqContext->dbconn is killed then call this callback
void cqConnKilledNotify(void* handle, void* conn) {
if (handle == NULL || conn == NULL){
return ;
}
SCqContext* pContext = (SCqContext*) handle;
if (pContext->dbConn == conn){
atomic_store_ptr(&(pContext->dbConn), NULL);
}
}
void tscKillConnection(STscObj *pObj) {
// get stream header by locked
pthread_mutex_lock(&pObj->mutex);
SSqlStream *pStream = pObj->streamList;
pthread_mutex_unlock(&pObj->mutex);
while (pStream) {
SSqlStream *tmp = pStream->next;
// set associate variant to NULL
cqConnKilledNotify(pStream->cqhandle, pObj);
// taos_close_stream function call pObj->mutet lock , careful death-lock
taos_close_stream(pStream);
pStream = tmp;
}
tscDebug("connection:%p is killed", pObj);
taos_close(pObj);
}
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
CMAKE_MINIMUM_REQUIRED(VERSION 2.8...3.20)
PROJECT(TDengine)
FIND_PATH(HEADER_GTEST_INCLUDE_DIR gtest.h /usr/include/gtest /usr/local/include/gtest)
FIND_LIBRARY(LIB_GTEST_STATIC_DIR libgtest.a /usr/lib/ /usr/local/lib /usr/lib64)
FIND_LIBRARY(LIB_GTEST_SHARED_DIR libgtest.so /usr/lib/ /usr/local/lib /usr/lib64)
IF (HEADER_GTEST_INCLUDE_DIR AND (LIB_GTEST_STATIC_DIR OR LIB_GTEST_SHARED_DIR))
MESSAGE(STATUS "gTest library found, build unit test")
# GoogleTest requires at least C++11
SET(CMAKE_CXX_STANDARD 11)
INCLUDE_DIRECTORIES(/usr/include /usr/local/include)
LINK_DIRECTORIES(/usr/lib /usr/local/lib)
AUX_SOURCE_DIRECTORY(${CMAKE_CURRENT_SOURCE_DIR} SOURCE_LIST)
ADD_EXECUTABLE(cliTest ${SOURCE_LIST})
TARGET_LINK_LIBRARIES(cliTest taos tutil common gtest pthread)
ENDIF()
此差异已折叠。
此差异已折叠。
CMAKE_MINIMUM_REQUIRED(VERSION 2.8...3.20)
PROJECT(TDengine)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/tsdb/inc)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/client/inc)
INCLUDE_DIRECTORIES(inc)
AUX_SOURCE_DIRECTORY(src SRC)
ADD_LIBRARY(query ${SRC})
SET_SOURCE_FILES_PROPERTIES(src/sql.c PROPERTIES COMPILE_FLAGS -w)
TARGET_LINK_LIBRARIES(query tsdb tutil lua)
IF (TD_LINUX)
TARGET_LINK_LIBRARIES(query m rt lua)
ADD_SUBDIRECTORY(tests)
ENDIF ()
IF (TD_DARWIN)
TARGET_LINK_LIBRARIES(query m lua)
ADD_SUBDIRECTORY(tests)
ENDIF ()
此差异已折叠。
此差异已折叠。
此差异已折叠。
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_QFILL_H
#define TDENGINE_QFILL_H
#ifdef __cplusplus
extern "C" {
#endif
#include "os.h"
#include "qExtbuffer.h"
#include "taosdef.h"
struct SSDataBlock;
typedef struct {
STColumn col; // column info
int16_t functionId; // sql function id
int16_t flag; // column flag: TAG COLUMN|NORMAL COLUMN
int16_t tagIndex; // index of current tag in SFillTagColInfo array list
union {int64_t i; double d;} fillVal;
} SFillColInfo;
typedef struct {
SSchema col;
char* tagVal;
} SFillTagColInfo;
typedef struct SFillInfo {
TSKEY start; // start timestamp
TSKEY end; // endKey for fill
TSKEY currentKey; // current active timestamp, the value may be changed during the fill procedure.
int32_t order; // order [TSDB_ORDER_ASC|TSDB_ORDER_DESC]
int32_t type; // fill type
int32_t numOfRows; // number of rows in the input data block
int32_t index; // active row index
int32_t numOfTotal; // number of filled rows in one round
int32_t numOfCurrent; // number of filled rows in current results
int32_t numOfTags; // number of tags
int32_t numOfCols; // number of columns, including the tags columns
int32_t rowSize; // size of each row
SInterval interval;
char * prevValues; // previous row of data, to generate the interpolation results
char * nextValues; // next row of data
char** pData; // original result data block involved in filling data
int32_t alloc; // data buffer size in rows
int8_t precision; // time resoluation
SFillColInfo* pFillCol; // column info for fill operations
SFillTagColInfo* pTags; // tags value for filling gap
void* handle; // for debug purpose
} SFillInfo;
typedef struct SPoint {
int64_t key;
void * val;
} SPoint;
SFillInfo* taosCreateFillInfo(int32_t order, TSKEY skey, int32_t numOfTags, int32_t capacity, int32_t numOfCols,
int64_t slidingTime, int8_t slidingUnit, int8_t precision, int32_t fillType,
SFillColInfo* pFillCol, void* handle);
void taosResetFillInfo(SFillInfo* pFillInfo, TSKEY startTimestamp);
void* taosDestroyFillInfo(SFillInfo *pFillInfo);
void taosFillSetStartInfo(SFillInfo* pFillInfo, int32_t numOfRows, TSKEY endKey);
void taosFillSetInputDataBlock(SFillInfo* pFillInfo, const struct SSDataBlock* pInput);
bool taosFillHasMoreResults(SFillInfo* pFillInfo);
int64_t getNumOfResultsAfterFillGap(SFillInfo* pFillInfo, int64_t ekey, int32_t maxNumOfRows);
int32_t taosGetLinearInterpolationVal(SPoint* point, int32_t outputType, SPoint* point1, SPoint* point2, int32_t inputType);
int64_t taosFillResultDataBlock(SFillInfo* pFillInfo, void** output, int32_t capacity);
#ifdef __cplusplus
}
#endif
#endif // TDENGINE_QFILL_H
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
# msvcregex
ExternalProject_Add(msvcregex
GIT_REPOSITORY https://gitee.com/l0km/libgnurx-msvc.git
GIT_TAG master
SOURCE_DIR "${TD_CONTRIB_DIR}/msvcregex"
BINARY_DIR ""
#BUILD_IN_SOURCE TRUE
CONFIGURE_COMMAND ""
BUILD_COMMAND ""
INSTALL_COMMAND ""
TEST_COMMAND ""
)
\ No newline at end of file
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册