提交 fc68ac02 编写于 作者: S slguan

Merge branch '2.0' into refact/sync

# Conflicts:
#	src/dnode/inc/dnodeWrite.h
#	src/dnode/src/dnodeRead.c
#	src/dnode/src/dnodeWrite.c
......@@ -296,5 +296,6 @@ ENDIF ()
ADD_SUBDIRECTORY(deps)
ADD_SUBDIRECTORY(src)
ADD_SUBDIRECTORY(tests)
INCLUDE(CPack)
......@@ -6,6 +6,7 @@ INCLUDE_DIRECTORIES(jni)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/inc)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/util/inc)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/query/inc)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/common/inc)
INCLUDE_DIRECTORIES(${TD_ENTERPRISE_DIR}/src/inc)
INCLUDE_DIRECTORIES(${TD_OS_DIR}/inc)
AUX_SOURCE_DIRECTORY(src SRC)
......
......@@ -20,9 +20,9 @@
extern "C" {
#endif
#include "qextbuffer.h"
#include "qinterpolation.h"
#include "taosmsg.h"
#include "textbuffer.h"
#include "tinterpolation.h"
#include "tlosertree.h"
#include "tsclient.h"
......
......@@ -13,23 +13,33 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_TSCCACHE_H
#define TDENGINE_TSCCACHE_H
#ifndef TDENGINE_TSCJOINPROCESS_H
#define TDENGINE_TSCJOINPROCESS_H
#ifdef __cplusplus
extern "C" {
#endif
void *taosOpenConnCache(int maxSessions, void (*cleanFp)(void *), void *tmrCtrl, int64_t keepTimer);
#include "tscUtil.h"
#include "tsclient.h"
void taosCloseConnCache(void *handle);
void tscFetchDatablockFromSubquery(SSqlObj* pSql);
void *taosAddConnIntoCache(void *handle, void *data, uint32_t ip, uint16_t port, char *user);
void tscSetupOutputColumnIndex(SSqlObj* pSql);
int32_t tscLaunchSecondPhaseSubqueries(SSqlObj* pSql);
void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code);
void *taosGetConnFromCache(void *handle, uint32_t ip, uint16_t port, char *user);
SJoinSubquerySupporter* tscCreateJoinSupporter(SSqlObj* pSql, SSubqueryState* pState, int32_t index);
void tscDestroyJoinSupporter(SJoinSubquerySupporter* pSupporter);
int32_t tscHandleMasterJoinQuery(SSqlObj* pSql);
int32_t tscHandleMasterSTableQuery(SSqlObj *pSql);
int32_t tscHandleMultivnodeInsert(SSqlObj *pSql);
#ifdef __cplusplus
}
#endif
#endif // TDENGINE_TSCACHE_H
#endif // TDENGINE_TSCJOINPROCESS_H
......@@ -24,16 +24,16 @@ extern "C" {
* @date 2018/09/30
*/
#include "os.h"
#include "textbuffer.h"
#include "qextbuffer.h"
#include "taosdef.h"
#include "tscSecondaryMerge.h"
#include "tsclient.h"
#include "taosdef.h"
#define UTIL_METER_IS_SUPERTABLE(metaInfo) \
(((metaInfo)->pMeterMeta != NULL) && ((metaInfo)->pMeterMeta->tableType == TSDB_TABLE_TYPE_SUPER_TABLE))
#define UTIL_METER_IS_NOMRAL_METER(metaInfo) (!(UTIL_METER_IS_SUPERTABLE(metaInfo)))
#define UTIL_METER_IS_CREATE_FROM_METRIC(metaInfo) \
(((metaInfo)->pMeterMeta != NULL) && ((metaInfo)->pMeterMeta->tableType == TSDB_TABLE_TYPE_CHILD_TABLE))
#define UTIL_TABLE_IS_SUPERTABLE(metaInfo) \
(((metaInfo)->pTableMeta != NULL) && ((metaInfo)->pTableMeta->tableType == TSDB_SUPER_TABLE))
#define UTIL_TABLE_IS_NOMRAL_TABLE(metaInfo) (!(UTIL_TABLE_IS_SUPERTABLE(metaInfo)))
#define UTIL_TABLE_CREATE_FROM_STABLE(metaInfo) \
(((metaInfo)->pTableMeta != NULL) && ((metaInfo)->pTableMeta->tableType == TSDB_CHILD_TABLE))
#define TSDB_COL_IS_TAG(f) (((f)&TSDB_COL_TAG) != 0)
......@@ -67,7 +67,7 @@ typedef struct SJoinSubquerySupporter {
} SJoinSubquerySupporter;
int32_t tscCreateDataBlock(size_t initialSize, int32_t rowSize, int32_t startOffset, const char* name,
STableMeta* pMeterMeta, STableDataBlocks** dataBlocks);
STableMeta* pTableMeta, STableDataBlocks** dataBlocks);
void tscAppendDataBlock(SDataBlockList* pList, STableDataBlocks* pBlocks);
void tscDestroyDataBlock(STableDataBlocks* pDataBlock);
......@@ -81,11 +81,11 @@ int32_t tscCopyDataBlockToPayload(SSqlObj* pSql, STableDataBlocks* pDataBlock);
void tscFreeUnusedDataBlocks(SDataBlockList* pList);
int32_t tscMergeTableDataBlocks(SSqlObj* pSql, SDataBlockList* pDataList);
int32_t tscGetDataBlockFromList(void* pHashList, SDataBlockList* pDataBlockList, int64_t id, int32_t size,
int32_t startOffset, int32_t rowSize, const char* tableId, STableMeta* pMeterMeta,
int32_t startOffset, int32_t rowSize, const char* tableId, STableMeta* pTableMeta,
STableDataBlocks** dataBlocks);
SVnodeSidList* tscGetVnodeSidList(SSuperTableMeta* pMetricmeta, int32_t vnodeIdx);
STableSidExtInfo* tscGetMeterSidInfo(SVnodeSidList* pSidList, int32_t idx);
STableIdInfo* tscGetMeterSidInfo(SVnodeSidList* pSidList, int32_t idx);
/**
*
......@@ -104,7 +104,7 @@ bool tscIsProjectionQueryOnSTable(SQueryInfo* pQueryInfo, int32_t tableIndex);
bool tscProjectionQueryOnTable(SQueryInfo* pQueryInfo);
bool tscIsTwoStageMergeMetricQuery(SQueryInfo* pQueryInfo, int32_t tableIndex);
bool tscIsTwoStageSTableQuery(SQueryInfo* pQueryInfo, int32_t tableIndex);
bool tscQueryOnMetric(SSqlCmd* pCmd);
bool tscQueryMetricTags(SQueryInfo* pQueryInfo);
bool tscIsSelectivityWithTagQuery(SSqlCmd* pCmd);
......@@ -114,7 +114,7 @@ void tscAddSpecialColumnForSelect(SQueryInfo* pQueryInfo, int32_t outputColIndex
void addRequiredTagColumn(SQueryInfo* pQueryInfo, int32_t tagColIndex, int32_t tableIndex);
int32_t setMeterID(SMeterMetaInfo* pMeterMetaInfo, SSQLToken* pzTableName, SSqlObj* pSql);
int32_t setMeterID(STableMetaInfo* pTableMetaInfo, SSQLToken* pzTableName, SSqlObj* pSql);
void tscClearInterpInfo(SQueryInfo* pQueryInfo);
bool tscIsInsertOrImportData(char* sqlstr);
......@@ -173,7 +173,7 @@ int32_t tscValidateName(SSQLToken* pToken);
void tscIncStreamExecutionCount(void* pStream);
bool tscValidateColumnId(SMeterMetaInfo* pMeterMetaInfo, int32_t colId);
bool tscValidateColumnId(STableMetaInfo* pTableMetaInfo, int32_t colId);
// get starter position of metric query condition (query on tags) in SSqlCmd.payload
SCond* tsGetMetricQueryCondPos(STagCond* pCond, uint64_t tableIndex);
......@@ -190,26 +190,26 @@ void tscCleanSqlCmd(SSqlCmd* pCmd);
bool tscShouldFreeAsyncSqlObj(SSqlObj* pSql);
void tscRemoveAllMeterMetaInfo(SQueryInfo* pQueryInfo, const char* address, bool removeFromCache);
SMeterMetaInfo* tscGetMeterMetaInfo(SSqlCmd *pCmd, int32_t subClauseIndex, int32_t tableIndex);
SMeterMetaInfo* tscGetMeterMetaInfoFromQueryInfo(SQueryInfo *pQueryInfo, int32_t tableIndex);
STableMetaInfo* tscGetTableMetaInfoFromCmd(SSqlCmd *pCmd, int32_t subClauseIndex, int32_t tableIndex);
STableMetaInfo* tscGetMetaInfo(SQueryInfo *pQueryInfo, int32_t tableIndex);
SQueryInfo *tscGetQueryInfoDetail(SSqlCmd* pCmd, int32_t subClauseIndex);
int32_t tscGetQueryInfoDetailSafely(SSqlCmd *pCmd, int32_t subClauseIndex, SQueryInfo** pQueryInfo);
SMeterMetaInfo* tscGetMeterMetaInfoByUid(SQueryInfo* pQueryInfo, uint64_t uid, int32_t* index);
void tscClearMeterMetaInfo(SMeterMetaInfo* pMeterMetaInfo, bool removeFromCache);
STableMetaInfo* tscGetMeterMetaInfoByUid(SQueryInfo* pQueryInfo, uint64_t uid, int32_t* index);
void tscClearMeterMetaInfo(STableMetaInfo* pTableMetaInfo, bool removeFromCache);
SMeterMetaInfo* tscAddMeterMetaInfo(SQueryInfo* pQueryInfo, const char* name, STableMeta* pMeterMeta, SSuperTableMeta* pMetricMeta,
STableMetaInfo* tscAddMeterMetaInfo(SQueryInfo* pQueryInfo, const char* name, STableMeta* pTableMeta, SSuperTableMeta* pMetricMeta,
int16_t numOfTags, int16_t* tags);
SMeterMetaInfo* tscAddEmptyMeterMetaInfo(SQueryInfo *pQueryInfo);
STableMetaInfo* tscAddEmptyMetaInfo(SQueryInfo *pQueryInfo);
int32_t tscAddSubqueryInfo(SSqlCmd *pCmd);
void tscFreeSubqueryInfo(SSqlCmd* pCmd);
void tscClearSubqueryInfo(SSqlCmd* pCmd);
void tscGetMetricMetaCacheKey(SQueryInfo* pQueryInfo, char* keyStr, uint64_t uid);
int tscGetMetricMeta(SSqlObj* pSql, int32_t clauseIndex);
int tscGetMeterMeta(SSqlObj* pSql, SMeterMetaInfo* pMeterMetaInfo);
int tscGetMeterMetaEx(SSqlObj* pSql, SMeterMetaInfo* pMeterMetaInfo, bool createIfNotExists);
int tscGetTableMeta(SSqlObj* pSql, STableMetaInfo* pTableMetaInfo);
int tscGetMeterMetaEx(SSqlObj* pSql, STableMetaInfo* pTableMetaInfo, bool createIfNotExists);
void tscResetForNextRetrieve(SSqlRes* pRes);
......@@ -252,7 +252,6 @@ void tscTryQueryNextVnode(SSqlObj *pSql, __async_cb_func_t fp);
void tscAsyncQuerySingleRowForNextVnode(void *param, TAOS_RES *tres, int numOfRows);
void tscTryQueryNextClause(SSqlObj* pSql, void (*queryFp)());
#ifdef __cplusplus
}
#endif
......
......@@ -20,13 +20,56 @@
extern "C" {
#endif
#include <stdint.h>
#include "taosmsg.h"
#include "tstoken.h"
#include "tsclient.h"
#define VALIDNUMOFCOLS(x) ((x) >= TSDB_MIN_COLUMNS && (x) <= TSDB_MAX_COLUMNS)
struct SSchema;
//struct SSchema;
/**
* get the number of tags of this table
* @param pTableMeta
* @return
*/
int32_t tscGetNumOfTags(const STableMeta* pTableMeta);
/**
* get the number of columns of this table
* @param pTableMeta
* @return
*/
int32_t tscGetNumOfColumns(const STableMeta* pTableMeta);
/**
* get the basic info of this table
* @param pTableMeta
* @return
*/
STableComInfo tscGetTableInfo(const STableMeta* pTableMeta);
/**
* get the schema
* @param pTableMeta
* @return
*/
SSchema* tscGetTableSchema(const STableMeta* pTableMeta);
/**
* get the tag schema
* @param pMeta
* @return
*/
SSchema *tscGetTableTagSchema(const STableMeta *pMeta);
/**
*
* @param pMeta
* @param startCol
* @return
*/
SSchema *tscGetTableColumnSchema(const STableMeta *pMeta, int32_t startCol);
/**
* check if the schema is valid or not, including following aspects:
......@@ -42,21 +85,23 @@ struct SSchema;
*/
bool isValidSchema(struct SSchema *pSchema, int32_t numOfCols);
struct SSchema *tsGetSchema(STableMeta *pMeta);
struct SSchema *tsGetTagSchema(STableMeta *pMeta);
/**
* get the schema for the "tbname" column. it is a built column
* @return
*/
SSchema tscGetTbnameColumnSchema();
struct SSchema *tsGetColumnSchema(STableMeta *pMeta, int32_t startCol);
struct SSchema tsGetTbnameColumnSchema();
/**
* create the table meta from the msg
* @param pTableMetaMsg
* @param size size of the table meta
* @return
*/
STableMeta* tscCreateTableMetaFromMsg(STableMetaMsg* pTableMetaMsg, size_t* size);
//todo tags value as well as the table id structure needs refactor
char *tsGetTagsValue(STableMeta *pMeta);
bool tsMeterMetaIdentical(STableMeta *p1, STableMeta *p2);
void extractTableName(char *tableId, char *name);
SSQLToken extractDBName(char *tableId, char *name);
void extractTableNameFromToken(SSQLToken *pToken, SSQLToken* pTable);
#ifdef __cplusplus
......
......@@ -21,17 +21,19 @@ extern "C" {
#endif
#include "os.h"
#include "qsqlparser.h"
#include "qsqltype.h"
#include "qtsbuf.h"
#include "taos.h"
#include "taosdef.h"
#include "taosmsg.h"
#include "tglobalcfg.h"
#include "tlog.h"
#include "tscCache.h"
#include "tscSQLParser.h"
#include "taosdef.h"
#include "trpc.h"
#include "tsqlfunction.h"
#include "tutil.h"
#include "trpc.h"
#include "qsqltype.h"
#include "tarray.h"
#define TSC_GET_RESPTR_BASE(res, _queryinfo, col) (res->data + ((_queryinfo)->fieldsInfo.pSqlExpr[col]->offset) * res->numOfRows)
......@@ -46,8 +48,28 @@ typedef struct SSqlGroupbyExpr {
int16_t orderType; // order by type: asc/desc
} SSqlGroupbyExpr;
typedef struct SMeterMetaInfo {
STableMeta * pMeterMeta; // metermeta
typedef struct STableComInfo {
uint8_t numOfTags;
uint8_t precision;
int16_t numOfColumns;
int16_t rowSize;
} STableComInfo;
typedef struct STableMeta {
//super table if it is created according to super table, otherwise, tableInfo is used
union { struct STableMeta* pSTable; STableComInfo tableInfo; };
uint8_t tableType;
int8_t numOfVpeers;
int16_t sversion;
SVnodeDesc vpeerDesc[TSDB_VNODES_SUPPORT];
int32_t vgId; // virtual group id, which current table belongs to
int32_t sid; // the index of one table in a virtual node
uint64_t uid; // unique id of a table
SSchema schema[]; // if the table is TSDB_CHILD_TABLE, schema is acquired by super table meta info
} STableMeta;
typedef struct STableMetaInfo {
STableMeta * pTableMeta; // table meta, cached in client side and acquried by name
SSuperTableMeta *pMetricMeta; // metricmeta
/*
......@@ -55,14 +77,14 @@ typedef struct SMeterMetaInfo {
* 2. keep the vnode index for multi-vnode insertion
*/
int32_t vnodeIndex;
char name[TSDB_TABLE_ID_LEN + 1]; // table(super table) name
char name[TSDB_TABLE_ID_LEN]; // (super) table name
int16_t numOfTags; // total required tags in query, including groupby tags
int16_t tagColumnIndex[TSDB_MAX_TAGS]; // clause + tag projection
} SMeterMetaInfo;
} STableMetaInfo;
/* the structure for sql function in select clause */
typedef struct SSqlExpr {
char aliasName[TSDB_COL_NAME_LEN + 1]; // as aliasName
char aliasName[TSDB_COL_NAME_LEN]; // as aliasName
SColIndexEx colInfo;
int64_t uid; // refactor use the pointer
int16_t functionId; // function id in aAgg array
......@@ -83,7 +105,6 @@ typedef struct SFieldInfo {
int16_t numOfOutputCols; // number of column in result
int16_t numOfAlloc; // allocated size
TAOS_FIELD *pFields;
// short * pOffset;
/*
* define if this column is belong to the queried result, it may be add by parser to faciliate
......@@ -161,7 +182,7 @@ typedef struct STableDataBlocks {
char tableId[TSDB_TABLE_ID_LEN];
int8_t tsSource; // where does the UNIX timestamp come from, server or client
bool ordered; // if current rows are ordered or not
int64_t vgid; // virtual group id
int64_t vgId; // virtual group id
int64_t prevTS; // previous timestamp, recorded to decide if the records array is ts ascending
int32_t numOfTables; // number of tables in current submit block
......@@ -174,7 +195,7 @@ typedef struct STableDataBlocks {
* the metermeta for current table, the metermeta will be used during submit stage, keep a ref
* to avoid it to be removed from cache
*/
STableMeta *pMeterMeta;
STableMeta *pTableMeta;
union {
char *filename;
......@@ -199,7 +220,7 @@ typedef struct SDataBlockList {
typedef struct SQueryInfo {
int16_t command; // the command may be different for each subclause, so keep it seperately.
uint16_t type; // query/insert/import type
char intervalTimeUnit;
char slidingTimeUnit;
int64_t etime, stime;
int64_t intervalTime; // aggregation time interval
......@@ -215,7 +236,7 @@ typedef struct SQueryInfo {
SOrderVal order;
int16_t interpoType; // interpolate type
int16_t numOfTables;
SMeterMetaInfo **pMeterInfo;
STableMetaInfo **pTableMetaInfo;
struct STSBuf * tsBuf;
int64_t * defaultVal; // default value for interpolation
char * msg; // pointer to the pCmd->payload to keep error message temporarily
......@@ -238,7 +259,7 @@ typedef struct {
union {
bool existsCheck; // check if the table exists or not
bool inStream; // denote if current sql is executed in stream or not
bool createOnDemand; // if the table is missing, on-the-fly create it. during getmeterMeta
bool autoCreated; // if the table is missing, on-the-fly create it. during getmeterMeta
int8_t dataSourceType; // load data from file or not
};
......@@ -296,7 +317,7 @@ typedef struct {
struct SLocalReducer *pLocalReducer;
} SSqlRes;
typedef struct _tsc_obj {
typedef struct STscObj {
void * signature;
void * pTimer;
char mgmtIp[TSDB_USER_LEN];
......@@ -308,14 +329,14 @@ typedef struct _tsc_obj {
char sversion[TSDB_VERSION_LEN];
char writeAuth : 1;
char superAuth : 1;
struct _sql_obj *pSql;
struct _sql_obj *pHb;
struct _sql_obj *sqlList;
struct _sstream *streamList;
struct SSqlObj *pSql;
struct SSqlObj *pHb;
struct SSqlObj *sqlList;
struct SSqlStream *streamList;
pthread_mutex_t mutex;
} STscObj;
typedef struct _sql_obj {
typedef struct SSqlObj {
void * signature;
STscObj *pTscObj;
void (*fp)();
......@@ -340,11 +361,11 @@ typedef struct _sql_obj {
uint8_t numOfSubs;
char * asyncTblPos;
void * pTableHashList;
struct _sql_obj **pSubs;
struct _sql_obj * prev, *next;
struct SSqlObj **pSubs;
struct SSqlObj * prev, *next;
} SSqlObj;
typedef struct _sstream {
typedef struct SSqlStream {
SSqlObj *pSql;
uint32_t streamId;
char listed;
......@@ -369,7 +390,7 @@ typedef struct _sstream {
void *param;
void (*callback)(void *); // Callback function when stream is stopped from client level
struct _sstream *prev, *next;
struct SSqlStream *prev, *next;
} SSqlStream;
int32_t tscInitRpc(const char *user, const char *secret);
......@@ -377,14 +398,12 @@ int32_t tscInitRpc(const char *user, const char *secret);
// tscSql API
int tsParseSql(SSqlObj *pSql, bool multiVnodeInsertion);
void tscInitMsgs();
void tscInitMsgsFp();
extern int (*tscBuildMsg[TSDB_SQL_MAX])(SSqlObj *pSql, SSqlInfo *pInfo);
void tscProcessMsgFromServer(SRpcMsg *rpcMsg);
int tscProcessSql(SSqlObj *pSql);
void tscAsyncInsertMultiVnodesProxy(void *param, TAOS_RES *tres, int numOfRows);
int tscRenewMeterMeta(SSqlObj *pSql, char *tableId);
void tscQueueAsyncRes(SSqlObj *pSql);
......@@ -442,10 +461,8 @@ char *tscGetErrorMsgPayload(SSqlCmd *pCmd);
int32_t tscInvalidSQLErrMsg(char *msg, const char *additionalInfo, const char *sql);
// transfer SSqlInfo to SqlCmd struct
int32_t tscToSQLCmd(SSqlObj *pSql, struct SSqlInfo *pInfo);
void tscQueueAsyncFreeResult(SSqlObj *pSql);
int32_t tscToSQLCmd(SSqlObj* pSql, struct SSqlInfo* pInfo);
extern void * pVnodeConn;
extern void * pTscMgmtConn;
......@@ -453,7 +470,6 @@ extern void * tscCacheHandle;
extern int32_t globalCode;
extern int slaveIndex;
extern void * tscTmr;
extern void * tscConnCache;
extern void * tscQhandle;
extern int tscKeepConn[];
extern int tsInsertHeadSize;
......
......@@ -13,13 +13,13 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "os.h"
#include "com_taosdata_jdbc_TSDBJNIConnector.h"
#include "os.h"
#include "taos.h"
#include "tlog.h"
#include "tscJoinProcess.h"
#include "tsclient.h"
#include "tscSubquery.h"
#include "tscUtil.h"
#include "tsclient.h"
#include "ttime.h"
int __init = 0;
......
此差异已折叠。
......@@ -22,10 +22,10 @@
#include "tscUtil.h"
#include "tsclient.h"
#include "tsocket.h"
#include "tscSQLParser.h"
#include "tutil.h"
#include "tnote.h"
#include "tsched.h"
#include "tschemautil.h"
static void tscProcessFetchRow(SSchedMsg *pMsg);
static void tscAsyncQueryRowsForNextVnode(void *param, TAOS_RES *tres, int numOfRows);
......@@ -84,7 +84,7 @@ void doAsyncQuery(STscObj* pObj, SSqlObj* pSql, void (*fp)(), void* param, const
}
// TODO return the correct error code to client in tscQueueAsyncError
void taos_query_a(TAOS *taos, const char *sqlstr, void (*fp)(void *, TAOS_RES *, int), void *param) {
void taos_query_a(TAOS *taos, const char *sqlstr, __async_cb_func_t fp, void *param) {
STscObj *pObj = (STscObj *)taos;
if (pObj == NULL || pObj->signature != pObj) {
tscError("bug!!! pObj:%p", pObj);
......@@ -397,51 +397,9 @@ void tscQueueAsyncFreeResult(SSqlObj *pSql) {
taosScheduleTask(tscQhandle, &schedMsg);
}
void tscAsyncInsertMultiVnodesProxy(void *param, TAOS_RES *tres, int numOfRows) {
SSqlObj *pSql = (SSqlObj *)param;
SSqlCmd *pCmd = &pSql->cmd;
int32_t code = TSDB_CODE_SUCCESS;
assert(pCmd->dataSourceType != 0 && pSql->signature == pSql);
int32_t index = 0;
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, index);
SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0);
assert(pQueryInfo->numOfTables == 1 || pQueryInfo->numOfTables == 2);
SDataBlockList *pDataBlocks = pCmd->pDataBlocks;
if (pDataBlocks == NULL || pMeterMetaInfo->vnodeIndex >= pDataBlocks->nSize) {
// restore user defined fp
pSql->fp = pSql->fetchFp;
tscTrace("%p Async insertion completed, destroy data block list", pSql);
// release data block data
pCmd->pDataBlocks = tscDestroyBlockArrayList(pCmd->pDataBlocks);
// all data has been sent to vnode, call user function
(*pSql->fp)(pSql->param, tres, numOfRows);
} else {
do {
code = tscCopyDataBlockToPayload(pSql, pDataBlocks->pData[pMeterMetaInfo->vnodeIndex++]);
if (code != TSDB_CODE_SUCCESS) {
tscTrace("%p prepare submit data block failed in async insertion, vnodeIdx:%d, total:%d, code:%d",
pSql, pMeterMetaInfo->vnodeIndex - 1, pDataBlocks->nSize, code);
}
} while (code != TSDB_CODE_SUCCESS && pMeterMetaInfo->vnodeIndex < pDataBlocks->nSize);
// build submit msg may fail
if (code == TSDB_CODE_SUCCESS) {
tscTrace("%p async insertion, vnodeIdx:%d, total:%d", pSql, pMeterMetaInfo->vnodeIndex - 1, pDataBlocks->nSize);
tscProcessSql(pSql);
}
}
}
int tscSendMsgToServer(SSqlObj *pSql);
void tscMeterMetaCallBack(void *param, TAOS_RES *res, int code) {
void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) {
SSqlObj *pSql = (SSqlObj *)param;
if (pSql == NULL || pSql->signature != pSql) return;
......@@ -465,10 +423,10 @@ void tscMeterMetaCallBack(void *param, TAOS_RES *res, int code) {
tscTrace("%p renew tableMeta successfully, command:%d, code:%d, retry:%d",
pSql, pSql->cmd.command, pSql->res.code, pSql->retry);
SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfo(&pSql->cmd, 0, 0);
assert(pMeterMetaInfo->pMeterMeta == NULL);
STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0);
assert(pTableMetaInfo->pTableMeta == NULL);
tscGetMeterMeta(pSql, pMeterMetaInfo);
tscGetTableMeta(pSql, pTableMetaInfo);
code = tscSendMsgToServer(pSql);
if (code != 0) {
pRes->code = code;
......@@ -490,18 +448,18 @@ void tscMeterMetaCallBack(void *param, TAOS_RES *res, int code) {
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
if ((pQueryInfo->type & TSDB_QUERY_TYPE_STABLE_SUBQUERY) == TSDB_QUERY_TYPE_STABLE_SUBQUERY) {
SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0);
assert(pMeterMetaInfo->pMeterMeta->numOfTags != 0 && pMeterMetaInfo->vnodeIndex >= 0 && pSql->param != NULL);
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
assert((tscGetNumOfTags(pTableMetaInfo->pTableMeta) != 0) && pTableMetaInfo->vnodeIndex >= 0 && pSql->param != NULL);
SRetrieveSupport *trs = (SRetrieveSupport *)pSql->param;
SSqlObj * pParObj = trs->pParentSqlObj;
assert(pParObj->signature == pParObj && trs->subqueryIndex == pMeterMetaInfo->vnodeIndex &&
pMeterMetaInfo->pMeterMeta->numOfTags != 0);
assert(pParObj->signature == pParObj && trs->subqueryIndex == pTableMetaInfo->vnodeIndex &&
tscGetNumOfTags(pTableMetaInfo->pTableMeta) != 0);
tscTrace("%p get metricMeta during super table query successfully", pSql);
code = tscGetMeterMeta(pSql, pMeterMetaInfo);
code = tscGetTableMeta(pSql, pTableMetaInfo);
pRes->code = code;
if (code == TSDB_CODE_ACTION_IN_PROGRESS) return;
......@@ -514,11 +472,11 @@ void tscMeterMetaCallBack(void *param, TAOS_RES *res, int code) {
if (pCmd->isParseFinish) {
tscTrace("%p resend data to vnode in metermeta callback since sql has been parsed completed", pSql);
SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, pCmd->clauseIndex, 0);
code = tscGetMeterMeta(pSql, pMeterMetaInfo);
STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
code = tscGetTableMeta(pSql, pTableMetaInfo);
assert(code == TSDB_CODE_SUCCESS);
if (pMeterMetaInfo->pMeterMeta) {
if (pTableMetaInfo->pTableMeta) {
code = tscSendMsgToServer(pSql);
if (code == TSDB_CODE_SUCCESS) return;
}
......@@ -529,13 +487,13 @@ void tscMeterMetaCallBack(void *param, TAOS_RES *res, int code) {
}
} else { // stream computing
SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, pCmd->clauseIndex, 0);
code = tscGetMeterMeta(pSql, pMeterMetaInfo);
STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
code = tscGetTableMeta(pSql, pTableMetaInfo);
pRes->code = code;
if (code == TSDB_CODE_ACTION_IN_PROGRESS) return;
if (code == TSDB_CODE_SUCCESS && UTIL_METER_IS_SUPERTABLE(pMeterMetaInfo)) {
if (code == TSDB_CODE_SUCCESS && UTIL_TABLE_IS_SUPERTABLE(pTableMetaInfo)) {
code = tscGetMetricMeta(pSql, pCmd->clauseIndex);
pRes->code = code;
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "os.h"
#include "tglobalcfg.h"
#include "tlog.h"
#include "tmempool.h"
#include "tsclient.h"
#include "ttime.h"
#include "ttimer.h"
#include "tutil.h"
typedef struct _c_hash_t {
uint32_t ip;
uint16_t port;
struct _c_hash_t *prev;
struct _c_hash_t *next;
void * data;
uint64_t time;
} SConnHash;
typedef struct {
SConnHash ** connHashList;
mpool_h connHashMemPool;
int maxSessions;
int total;
int * count;
int64_t keepTimer;
pthread_mutex_t mutex;
void (*cleanFp)(void *);
void *tmrCtrl;
void *pTimer;
} SConnCache;
int taosHashConn(void *handle, uint32_t ip, uint16_t port, char *user) {
SConnCache *pObj = (SConnCache *)handle;
int hash = 0;
// size_t user_len = strlen(user);
hash = ip >> 16;
hash += (unsigned short)(ip & 0xFFFF);
hash += port;
while (*user != '\0') {
hash += *user;
user++;
}
hash = hash % pObj->maxSessions;
return hash;
}
void taosRemoveExpiredNodes(SConnCache *pObj, SConnHash *pNode, int hash, uint64_t time) {
if (pNode == NULL) return;
if (time < pObj->keepTimer + pNode->time) return;
SConnHash *pPrev = pNode->prev, *pNext;
while (pNode) {
(*pObj->cleanFp)(pNode->data);
pNext = pNode->next;
pObj->total--;
pObj->count[hash]--;
tscTrace("%p ip:0x%x:%hu:%d:%p removed, connections in cache:%d", pNode->data, pNode->ip, pNode->port, hash, pNode,
pObj->count[hash]);
taosMemPoolFree(pObj->connHashMemPool, (char *)pNode);
pNode = pNext;
}
if (pPrev)
pPrev->next = NULL;
else
pObj->connHashList[hash] = NULL;
}
void *taosAddConnIntoCache(void *handle, void *data, uint32_t ip, uint16_t port, char *user) {
int hash;
SConnHash * pNode;
SConnCache *pObj;
uint64_t time = taosGetTimestampMs();
pObj = (SConnCache *)handle;
if (pObj == NULL || pObj->maxSessions == 0) return NULL;
if (data == NULL) {
tscTrace("data:%p ip:%p:%d not valid, not added in cache", data, ip, port);
return NULL;
}
hash = taosHashConn(pObj, ip, port, user);
pNode = (SConnHash *)taosMemPoolMalloc(pObj->connHashMemPool);
pNode->ip = ip;
pNode->port = port;
pNode->data = data;
pNode->prev = NULL;
pNode->time = time;
pthread_mutex_lock(&pObj->mutex);
pNode->next = pObj->connHashList[hash];
if (pObj->connHashList[hash] != NULL) (pObj->connHashList[hash])->prev = pNode;
pObj->connHashList[hash] = pNode;
pObj->total++;
pObj->count[hash]++;
taosRemoveExpiredNodes(pObj, pNode->next, hash, time);
pthread_mutex_unlock(&pObj->mutex);
tscTrace("%p ip:0x%x:%hu:%d:%p added, connections in cache:%d", data, ip, port, hash, pNode, pObj->count[hash]);
return pObj;
}
void taosCleanConnCache(void *handle, void *tmrId) {
int hash;
SConnHash * pNode;
SConnCache *pObj;
pObj = (SConnCache *)handle;
if (pObj == NULL || pObj->maxSessions == 0) return;
if (pObj->pTimer != tmrId) return;
uint64_t time = taosGetTimestampMs();
for (hash = 0; hash < pObj->maxSessions; ++hash) {
pthread_mutex_lock(&pObj->mutex);
pNode = pObj->connHashList[hash];
taosRemoveExpiredNodes(pObj, pNode, hash, time);
pthread_mutex_unlock(&pObj->mutex);
}
// tscTrace("timer, total connections in cache:%d", pObj->total);
taosTmrReset(taosCleanConnCache, pObj->keepTimer * 2, pObj, pObj->tmrCtrl, &pObj->pTimer);
}
void *taosGetConnFromCache(void *handle, uint32_t ip, uint16_t port, char *user) {
int hash;
SConnHash * pNode;
SConnCache *pObj;
void * pData = NULL;
pObj = (SConnCache *)handle;
if (pObj == NULL || pObj->maxSessions == 0) return NULL;
uint64_t time = taosGetTimestampMs();
hash = taosHashConn(pObj, ip, port, user);
pthread_mutex_lock(&pObj->mutex);
pNode = pObj->connHashList[hash];
while (pNode) {
if (time >= pObj->keepTimer + pNode->time) {
taosRemoveExpiredNodes(pObj, pNode, hash, time);
pNode = NULL;
break;
}
if (pNode->ip == ip && pNode->port == port) break;
pNode = pNode->next;
}
if (pNode) {
taosRemoveExpiredNodes(pObj, pNode->next, hash, time);
if (pNode->prev) {
pNode->prev->next = pNode->next;
} else {
pObj->connHashList[hash] = pNode->next;
}
if (pNode->next) {
pNode->next->prev = pNode->prev;
}
pData = pNode->data;
taosMemPoolFree(pObj->connHashMemPool, (char *)pNode);
pObj->total--;
pObj->count[hash]--;
}
pthread_mutex_unlock(&pObj->mutex);
if (pData) {
tscTrace("%p ip:0x%x:%hu:%d:%p retrieved, connections in cache:%d", pData, ip, port, hash, pNode, pObj->count[hash]);
}
return pData;
}
void *taosOpenConnCache(int maxSessions, void (*cleanFp)(void *), void *tmrCtrl, int64_t keepTimer) {
SConnHash **connHashList;
mpool_h connHashMemPool;
SConnCache *pObj;
connHashMemPool = taosMemPoolInit(maxSessions, sizeof(SConnHash));
if (connHashMemPool == 0) return NULL;
connHashList = calloc(sizeof(SConnHash *), maxSessions);
if (connHashList == 0) {
taosMemPoolCleanUp(connHashMemPool);
return NULL;
}
pObj = malloc(sizeof(SConnCache));
if (pObj == NULL) {
taosMemPoolCleanUp(connHashMemPool);
free(connHashList);
return NULL;
}
memset(pObj, 0, sizeof(SConnCache));
pObj->count = calloc(sizeof(int), maxSessions);
pObj->total = 0;
pObj->keepTimer = keepTimer;
pObj->maxSessions = maxSessions;
pObj->connHashMemPool = connHashMemPool;
pObj->connHashList = connHashList;
pObj->cleanFp = cleanFp;
pObj->tmrCtrl = tmrCtrl;
taosTmrReset(taosCleanConnCache, pObj->keepTimer * 2, pObj, pObj->tmrCtrl, &pObj->pTimer);
pthread_mutex_init(&pObj->mutex, NULL);
return pObj;
}
void taosCloseConnCache(void *handle) {
SConnCache *pObj;
pObj = (SConnCache *)handle;
if (pObj == NULL || pObj->maxSessions == 0) return;
pthread_mutex_lock(&pObj->mutex);
taosTmrStopA(&(pObj->pTimer));
if (pObj->connHashMemPool) taosMemPoolCleanUp(pObj->connHashMemPool);
tfree(pObj->connHashList);
tfree(pObj->count)
pthread_mutex_unlock(&pObj->mutex);
pthread_mutex_destroy(&pObj->mutex);
memset(pObj, 0, sizeof(SConnCache));
free(pObj);
}
......@@ -14,20 +14,21 @@
*/
#include "os.h"
#include "qast.h"
#include "qextbuffer.h"
#include "qhistogram.h"
#include "qinterpolation.h"
#include "qpercentile.h"
#include "qsyntaxtreefunction.h"
#include "qtsbuf.h"
#include "taosdef.h"
#include "taosmsg.h"
#include "tast.h"
#include "textbuffer.h"
#include "thistogram.h"
#include "tinterpolation.h"
#include "tlog.h"
#include "tscJoinProcess.h"
#include "tscSyntaxtreefunction.h"
#include "tscSubquery.h"
#include "tscompression.h"
#include "tsqlfunction.h"
#include "ttime.h"
#include "taosdef.h"
#include "tutil.h"
#include "tpercentile.h"
#define GET_INPUT_CHAR(x) (((char *)((x)->aInputElemBuf)) + ((x)->startOffset) * ((x)->inputBytes))
#define GET_INPUT_CHAR_INDEX(x, y) (GET_INPUT_CHAR(x) + (y) * (x)->inputBytes)
......@@ -387,6 +388,10 @@ static void function_finalizer(SQLFunctionCtx *pCtx) {
doFinalizer(pCtx);
}
static bool usePreVal(SQLFunctionCtx *pCtx) {
return pCtx->preAggVals.isSet && pCtx->size == pCtx->preAggVals.size;
}
/*
* count function does need the finalize, if data is missing, the default value, which is 0, is used
* count function does not use the pCtx->interResBuf to keep the intermediate buffer
......@@ -394,13 +399,14 @@ static void function_finalizer(SQLFunctionCtx *pCtx) {
static void count_function(SQLFunctionCtx *pCtx) {
int32_t numOfElem = 0;
if (IS_DATA_BLOCK_LOADED(pCtx->blockStatus)) {
/*
* In following cases, the data block is loaded:
* 1. A first/last file block for a query
* 2. Required to handle other queries, such as apercentile/twa/stddev etc.
* 3. A cache block
* 1. column data missing (schema modified) causes pCtx->hasNull == true. pCtx->preAggVals.isSet == true;
* 2. for general non-primary key columns, pCtx->hasNull may be true or false, pCtx->preAggVals.isSet == true;
* 3. for primary key column, pCtx->hasNull always be false, pCtx->preAggVals.isSet == false;
*/
if (usePreVal(pCtx)) {
numOfElem = pCtx->size - pCtx->preAggVals.statis.numOfNull;
} else {
if (pCtx->hasNull) {
for (int32_t i = 0; i < pCtx->size; ++i) {
char *val = GET_INPUT_CHAR_INDEX(pCtx, i);
......@@ -413,18 +419,6 @@ static void count_function(SQLFunctionCtx *pCtx) {
} else {
numOfElem = pCtx->size;
}
} else {
/*
* 1. column data missing (schema modified) causes pCtx->hasNull == true. pCtx->preAggVals.isSet == true;
* 2. for general non-primary key columns, pCtx->hasNull may be true or false, pCtx->preAggVals.isSet == true;
* 3. for primary key column, pCtx->hasNull always be false, pCtx->preAggVals.isSet == false;
*/
if (pCtx->preAggVals.isSet) {
numOfElem = pCtx->size - pCtx->preAggVals.numOfNull;
} else {
assert(pCtx->hasNull == false);
numOfElem = pCtx->size;
}
}
if (numOfElem > 0) {
......@@ -468,7 +462,7 @@ static void count_func_merge(SQLFunctionCtx *pCtx) {
* @param filterCols
* @return
*/
int32_t count_load_data_info(SQLFunctionCtx *pCtx, TSKEY start, TSKEY end, int32_t colId, int32_t blockStatus) {
int32_t count_load_data_info(SQLFunctionCtx *pCtx, TSKEY start, TSKEY end, int32_t colId) {
if (colId == PRIMARYKEY_TIMESTAMP_COL_INDEX) {
return BLK_DATA_NO_NEEDED;
} else {
......@@ -476,7 +470,7 @@ int32_t count_load_data_info(SQLFunctionCtx *pCtx, TSKEY start, TSKEY end, int32
}
}
int32_t no_data_info(SQLFunctionCtx *pCtx, TSKEY start, TSKEY end, int32_t colId, int32_t blockStatus) {
int32_t no_data_info(SQLFunctionCtx *pCtx, TSKEY start, TSKEY end, int32_t colId) {
return BLK_DATA_NO_NEEDED;
}
......@@ -531,16 +525,16 @@ static void do_sum(SQLFunctionCtx *pCtx) {
int32_t notNullElems = 0;
// Only the pre-computing information loaded and actual data does not loaded
if (!IS_DATA_BLOCK_LOADED(pCtx->blockStatus) && pCtx->preAggVals.isSet) {
notNullElems = pCtx->size - pCtx->preAggVals.numOfNull;
assert(pCtx->size >= pCtx->preAggVals.numOfNull);
if (pCtx->preAggVals.isSet && pCtx->preAggVals.size == pCtx->size) {
notNullElems = pCtx->size - pCtx->preAggVals.statis.numOfNull;
assert(pCtx->size >= pCtx->preAggVals.statis.numOfNull);
if (pCtx->inputType >= TSDB_DATA_TYPE_TINYINT && pCtx->inputType <= TSDB_DATA_TYPE_BIGINT) {
int64_t *retVal = (int64_t*) pCtx->aOutputBuf;
*retVal += pCtx->preAggVals.sum;
*retVal += pCtx->preAggVals.statis.sum;
} else if (pCtx->inputType == TSDB_DATA_TYPE_DOUBLE || pCtx->inputType == TSDB_DATA_TYPE_FLOAT) {
double *retVal = (double*) pCtx->aOutputBuf;
*retVal += GET_DOUBLE_VAL(&(pCtx->preAggVals.sum));
*retVal += GET_DOUBLE_VAL(&(pCtx->preAggVals.statis.sum));
}
} else { // computing based on the true data block
void *pData = GET_INPUT_CHAR(pCtx);
......@@ -683,16 +677,16 @@ static void sum_func_second_merge(SQLFunctionCtx *pCtx) {
}
}
static int32_t precal_req_load_info(SQLFunctionCtx *pCtx, TSKEY start, TSKEY end, int32_t colId, int32_t blockStatus) {
static int32_t precal_req_load_info(SQLFunctionCtx *pCtx, TSKEY start, TSKEY end, int32_t colId) {
return BLK_DATA_FILEDS_NEEDED;
}
static int32_t data_req_load_info(SQLFunctionCtx *pCtx, TSKEY start, TSKEY end, int32_t colId, int32_t blockStatus) {
static int32_t data_req_load_info(SQLFunctionCtx *pCtx, TSKEY start, TSKEY end, int32_t colId) {
return BLK_DATA_ALL_NEEDED;
}
// todo: if column in current data block are null, opt for this case
static int32_t first_data_req_info(SQLFunctionCtx *pCtx, TSKEY start, TSKEY end, int32_t colId, int32_t blockStatus) {
static int32_t first_data_req_info(SQLFunctionCtx *pCtx, TSKEY start, TSKEY end, int32_t colId) {
if (pCtx->order == TSQL_SO_DESC) {
return BLK_DATA_NO_NEEDED;
}
......@@ -705,7 +699,7 @@ static int32_t first_data_req_info(SQLFunctionCtx *pCtx, TSKEY start, TSKEY end,
}
}
static int32_t last_data_req_info(SQLFunctionCtx *pCtx, TSKEY start, TSKEY end, int32_t colId, int32_t blockStatus) {
static int32_t last_data_req_info(SQLFunctionCtx *pCtx, TSKEY start, TSKEY end, int32_t colId) {
if (pCtx->order == TSQL_SO_ASC) {
return BLK_DATA_NO_NEEDED;
}
......@@ -717,8 +711,7 @@ static int32_t last_data_req_info(SQLFunctionCtx *pCtx, TSKEY start, TSKEY end,
}
}
static int32_t first_dist_data_req_info(SQLFunctionCtx *pCtx, TSKEY start, TSKEY end, int32_t colId,
int32_t blockStatus) {
static int32_t first_dist_data_req_info(SQLFunctionCtx *pCtx, TSKEY start, TSKEY end, int32_t colId) {
if (pCtx->order == TSQL_SO_DESC) {
return BLK_DATA_NO_NEEDED;
}
......@@ -734,8 +727,7 @@ static int32_t first_dist_data_req_info(SQLFunctionCtx *pCtx, TSKEY start, TSKEY
// }
}
static int32_t last_dist_data_req_info(SQLFunctionCtx *pCtx, TSKEY start, TSKEY end, int32_t colId,
int32_t blockStatus) {
static int32_t last_dist_data_req_info(SQLFunctionCtx *pCtx, TSKEY start, TSKEY end, int32_t colId) {
if (pCtx->order == TSQL_SO_ASC) {
return BLK_DATA_NO_NEEDED;
}
......@@ -764,15 +756,15 @@ static void avg_function(SQLFunctionCtx *pCtx) {
SAvgInfo *pAvgInfo = (SAvgInfo *)pResInfo->interResultBuf;
double * pVal = &pAvgInfo->sum;
if (!IS_DATA_BLOCK_LOADED(pCtx->blockStatus) && pCtx->preAggVals.isSet) {
if (usePreVal(pCtx)) {
// Pre-aggregation
notNullElems = pCtx->size - pCtx->preAggVals.numOfNull;
notNullElems = pCtx->size - pCtx->preAggVals.statis.numOfNull;
assert(notNullElems >= 0);
if (pCtx->inputType >= TSDB_DATA_TYPE_TINYINT && pCtx->inputType <= TSDB_DATA_TYPE_BIGINT) {
*pVal += pCtx->preAggVals.sum;
*pVal += pCtx->preAggVals.statis.sum;
} else if (pCtx->inputType == TSDB_DATA_TYPE_DOUBLE || pCtx->inputType == TSDB_DATA_TYPE_FLOAT) {
*pVal += GET_DOUBLE_VAL(&(pCtx->preAggVals.sum));
*pVal += GET_DOUBLE_VAL(&(pCtx->preAggVals.statis.sum));
}
} else {
void *pData = GET_INPUT_CHAR(pCtx);
......@@ -927,20 +919,20 @@ static void avg_finalizer(SQLFunctionCtx *pCtx) {
/////////////////////////////////////////////////////////////////////////////////////////////
static void minMax_function(SQLFunctionCtx *pCtx, char *pOutput, int32_t isMin, int32_t *notNullElems) {
if (!IS_DATA_BLOCK_LOADED(pCtx->blockStatus) && pCtx->preAggVals.isSet) {
// data in current data block are qualified to the query
*notNullElems = pCtx->size - pCtx->preAggVals.numOfNull;
if (usePreVal(pCtx)) {
*notNullElems = pCtx->size - pCtx->preAggVals.statis.numOfNull;
assert(*notNullElems >= 0);
void * tval = NULL;
int16_t index = 0;
if (isMin) {
tval = &pCtx->preAggVals.min;
index = pCtx->preAggVals.minIndex;
tval = &pCtx->preAggVals.statis.min;
index = pCtx->preAggVals.statis.minIndex;
} else {
tval = &pCtx->preAggVals.max;
index = pCtx->preAggVals.maxIndex;
tval = &pCtx->preAggVals.statis.max;
index = pCtx->preAggVals.statis.maxIndex;
}
/**
......@@ -1487,7 +1479,7 @@ static bool first_last_function_setup(SQLFunctionCtx *pCtx) {
// todo opt for null block
static void first_function(SQLFunctionCtx *pCtx) {
if (!IS_DATA_BLOCK_LOADED(pCtx->blockStatus) || pCtx->order == TSQL_SO_DESC) {
if (pCtx->order == TSQL_SO_DESC) {
return;
}
......@@ -1565,7 +1557,7 @@ static void first_dist_function(SQLFunctionCtx *pCtx) {
* 1. data block that are not loaded
* 2. scan data files in desc order
*/
if (!IS_DATA_BLOCK_LOADED(pCtx->blockStatus) || pCtx->order == TSQL_SO_DESC) {
if (pCtx->order == TSQL_SO_DESC) {
return;
}
......@@ -1658,7 +1650,7 @@ static void first_dist_func_second_merge(SQLFunctionCtx *pCtx) {
* least one data in this block that is not null.(TODO opt for this case)
*/
static void last_function(SQLFunctionCtx *pCtx) {
if (!IS_DATA_BLOCK_LOADED(pCtx->blockStatus) || pCtx->order == TSQL_SO_ASC) {
if (pCtx->order == TSQL_SO_ASC) {
return;
}
......@@ -1734,7 +1726,7 @@ static void last_dist_function(SQLFunctionCtx *pCtx) {
* 1. for scan data in asc order, no need to check data
* 2. for data blocks that are not loaded, no need to check data
*/
if (!IS_DATA_BLOCK_LOADED(pCtx->blockStatus) || pCtx->order == TSQL_SO_ASC) {
if (pCtx->order == TSQL_SO_ASC) {
return;
}
......@@ -3304,9 +3296,9 @@ char *arithmetic_callback_function(void *param, char *name, int32_t colId) {
SSqlFunctionExpr *pExpr = pSupport->pExpr;
int32_t colIndexInBuf = -1;
for (int32_t i = 0; i < pExpr->pBinExprInfo.numOfCols; ++i) {
if (colId == pExpr->pBinExprInfo.pReqColumns[i].colId) {
colIndexInBuf = pExpr->pBinExprInfo.pReqColumns[i].colIdxInBuf;
for (int32_t i = 0; i < pExpr->binExprInfo.numOfCols; ++i) {
if (colId == pExpr->binExprInfo.pReqColumns[i].colId) {
colIndexInBuf = pExpr->binExprInfo.pReqColumns[i].colIdxInBuf;
break;
}
}
......@@ -3319,7 +3311,7 @@ static void arithmetic_function(SQLFunctionCtx *pCtx) {
GET_RES_INFO(pCtx)->numOfRes += pCtx->size;
SArithmeticSupport *sas = (SArithmeticSupport *)pCtx->param[1].pz;
tSQLBinaryExprCalcTraverse(sas->pExpr->pBinExprInfo.pBinExpr, pCtx->size, pCtx->aOutputBuf, sas, pCtx->order,
tSQLBinaryExprCalcTraverse(sas->pExpr->binExprInfo.pBinExpr, pCtx->size, pCtx->aOutputBuf, sas, pCtx->order,
arithmetic_callback_function);
pCtx->aOutputBuf += pCtx->outputBytes * pCtx->size;
......@@ -3331,7 +3323,7 @@ static void arithmetic_function_f(SQLFunctionCtx *pCtx, int32_t index) {
SArithmeticSupport *sas = (SArithmeticSupport *)pCtx->param[1].pz;
sas->offset = index;
tSQLBinaryExprCalcTraverse(sas->pExpr->pBinExprInfo.pBinExpr, 1, pCtx->aOutputBuf, sas, pCtx->order,
tSQLBinaryExprCalcTraverse(sas->pExpr->binExprInfo.pBinExpr, 1, pCtx->aOutputBuf, sas, pCtx->order,
arithmetic_callback_function);
pCtx->aOutputBuf += pCtx->outputBytes/* * GET_FORWARD_DIRECTION_FACTOR(pCtx->order)*/;
......@@ -3381,9 +3373,8 @@ static void spread_function(SQLFunctionCtx *pCtx) {
int32_t numOfElems = pCtx->size;
// column missing cause the hasNull to be true
if (!IS_DATA_BLOCK_LOADED(pCtx->blockStatus)) {
if (pCtx->preAggVals.isSet) {
numOfElems = pCtx->size - pCtx->preAggVals.numOfNull;
if (usePreVal(pCtx)) {
numOfElems = pCtx->size - pCtx->preAggVals.statis.numOfNull;
// all data are null in current data block, ignore current data block
if (numOfElems == 0) {
......@@ -3392,20 +3383,20 @@ static void spread_function(SQLFunctionCtx *pCtx) {
if ((pCtx->inputType >= TSDB_DATA_TYPE_TINYINT && pCtx->inputType <= TSDB_DATA_TYPE_BIGINT) ||
(pCtx->inputType == TSDB_DATA_TYPE_TIMESTAMP)) {
if (pInfo->min > pCtx->preAggVals.min) {
pInfo->min = pCtx->preAggVals.min;
if (pInfo->min > pCtx->preAggVals.statis.min) {
pInfo->min = pCtx->preAggVals.statis.min;
}
if (pInfo->max < pCtx->preAggVals.max) {
pInfo->max = pCtx->preAggVals.max;
if (pInfo->max < pCtx->preAggVals.statis.max) {
pInfo->max = pCtx->preAggVals.statis.max;
}
} else if (pCtx->inputType == TSDB_DATA_TYPE_DOUBLE || pCtx->inputType == TSDB_DATA_TYPE_FLOAT) {
if (pInfo->min > GET_DOUBLE_VAL(&(pCtx->preAggVals.min))) {
pInfo->min = GET_DOUBLE_VAL(&(pCtx->preAggVals.min));
if (pInfo->min > GET_DOUBLE_VAL(&(pCtx->preAggVals.statis.min))) {
pInfo->min = GET_DOUBLE_VAL(&(pCtx->preAggVals.statis.min));
}
if (pInfo->max < GET_DOUBLE_VAL(&(pCtx->preAggVals.max))) {
pInfo->max = GET_DOUBLE_VAL(&(pCtx->preAggVals.max));
if (pInfo->max < GET_DOUBLE_VAL(&(pCtx->preAggVals.statis.max))) {
pInfo->max = GET_DOUBLE_VAL(&(pCtx->preAggVals.statis.max));
}
}
} else {
......@@ -3418,9 +3409,6 @@ static void spread_function(SQLFunctionCtx *pCtx) {
}
}
goto _spread_over;
}
void *pData = GET_INPUT_CHAR(pCtx);
numOfElems = 0;
......@@ -3442,7 +3430,7 @@ static void spread_function(SQLFunctionCtx *pCtx) {
assert(pCtx->size == numOfElems);
}
_spread_over:
_spread_over:
SET_VAL(pCtx, numOfElems, 1);
if (numOfElems > 0) {
......@@ -4043,8 +4031,6 @@ static void twa_function(SQLFunctionCtx *pCtx) {
void * data = GET_INPUT_CHAR(pCtx);
TSKEY *primaryKey = pCtx->ptsList;
assert(IS_DATA_BLOCK_LOADED(pCtx->blockStatus));
int32_t notNullElems = 0;
SResultInfo *pResInfo = GET_RES_INFO(pCtx);
......@@ -4439,8 +4425,6 @@ static bool rate_function_setup(SQLFunctionCtx *pCtx) {
static void rate_function(SQLFunctionCtx *pCtx) {
assert(IS_DATA_BLOCK_LOADED(pCtx->blockStatus));
int32_t notNullElems = 0;
SResultInfo *pResInfo = GET_RES_INFO(pCtx);
SRateInfo *pRateInfo = (SRateInfo *)pResInfo->interResultBuf;
......@@ -4646,8 +4630,6 @@ static void rate_finalizer(SQLFunctionCtx *pCtx) {
static void irate_function(SQLFunctionCtx *pCtx) {
assert(IS_DATA_BLOCK_LOADED(pCtx->blockStatus));
int32_t notNullElems = 0;
SResultInfo *pResInfo = GET_RES_INFO(pCtx);
SRateInfo *pRateInfo = (SRateInfo *)pResInfo->interResultBuf;
......
......@@ -21,10 +21,10 @@
#include "tsclient.h"
#include "taosdef.h"
#include "textbuffer.h"
#include "qextbuffer.h"
#include "tscSecondaryMerge.h"
#include "tschemautil.h"
#include "tsocket.h"
#include "name.h"
static void tscSetLocalQueryResult(SSqlObj *pSql, const char *val, const char *columnName, size_t valueLength);
......@@ -77,20 +77,22 @@ static int32_t getToStringLength(const char *pData, int32_t length, int32_t type
* length((uint64_t) 123456789011) > 12, greater than sizsof(uint64_t)
*/
static int32_t tscMaxLengthOfTagsFields(SSqlObj *pSql) {
STableMeta *pMeta = tscGetMeterMetaInfo(&pSql->cmd, 0, 0)->pMeterMeta;
STableMeta *pMeta = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0)->pTableMeta;
if (pMeta->tableType == TSDB_TABLE_TYPE_SUPER_TABLE || pMeta->tableType == TSDB_TABLE_TYPE_NORMAL_TABLE ||
pMeta->tableType == TSDB_TABLE_TYPE_STREAM_TABLE) {
if (pMeta->tableType == TSDB_SUPER_TABLE || pMeta->tableType == TSDB_NORMAL_TABLE ||
pMeta->tableType == TSDB_STREAM_TABLE) {
return 0;
}
char * pTagValue = tsGetTagsValue(pMeta);
SSchema *pTagsSchema = tsGetTagSchema(pMeta);
SSchema *pTagsSchema = tscGetTableTagSchema(pMeta);
int32_t len = getToStringLength(pTagValue, pTagsSchema[0].bytes, pTagsSchema[0].type);
pTagValue += pTagsSchema[0].bytes;
for (int32_t i = 1; i < pMeta->numOfTags; ++i) {
int32_t numOfTags = tscGetNumOfTags(pMeta);
for (int32_t i = 1; i < numOfTags; ++i) {
int32_t tLen = getToStringLength(pTagValue, pTagsSchema[i].bytes, pTagsSchema[i].type);
if (len < tLen) {
len = tLen;
......@@ -108,8 +110,8 @@ static int32_t tscSetValueToResObj(SSqlObj *pSql, int32_t rowLen) {
// one column for each row
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0);
STableMeta * pMeta = pMeterMetaInfo->pMeterMeta;
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
STableMeta * pMeta = pTableMetaInfo->pTableMeta;
/*
* tagValueCnt is to denote the number of tags columns for meter, not metric. and is to show the column data.
......@@ -117,15 +119,15 @@ static int32_t tscSetValueToResObj(SSqlObj *pSql, int32_t rowLen) {
* for metric, the value of tagValueCnt must be 0, but the numOfTags is not 0
*/
int32_t numOfRows = pMeta->numOfColumns;
int32_t totalNumOfRows = numOfRows + pMeta->numOfTags;
int32_t numOfRows = tscGetNumOfColumns(pMeta);
int32_t totalNumOfRows = numOfRows + tscGetNumOfTags(pMeta);
if (UTIL_METER_IS_SUPERTABLE(pMeterMetaInfo)) {
numOfRows = pMeta->numOfColumns + pMeta->numOfTags;
if (UTIL_TABLE_IS_SUPERTABLE(pTableMetaInfo)) {
numOfRows = numOfRows + tscGetNumOfTags(pMeta);
}
tscInitResObjForLocalQuery(pSql, totalNumOfRows, rowLen);
SSchema *pSchema = tsGetSchema(pMeta);
SSchema *pSchema = tscGetTableSchema(pMeta);
for (int32_t i = 0; i < numOfRows; ++i) {
TAOS_FIELD *pField = tscFieldInfoGetField(pQueryInfo, 0);
......@@ -146,13 +148,13 @@ static int32_t tscSetValueToResObj(SSqlObj *pSql, int32_t rowLen) {
*(int32_t *)(pRes->data + tscFieldInfoGetOffset(pQueryInfo, 2) * totalNumOfRows + pField->bytes * i) = bytes;
pField = tscFieldInfoGetField(pQueryInfo, 3);
if (i >= pMeta->numOfColumns && pMeta->numOfTags != 0) {
if (i >= tscGetNumOfColumns(pMeta) && tscGetNumOfTags(pMeta) != 0) {
strncpy(pRes->data + tscFieldInfoGetOffset(pQueryInfo, 3) * totalNumOfRows + pField->bytes * i, "tag",
strlen("tag") + 1);
}
}
if (UTIL_METER_IS_SUPERTABLE(pMeterMetaInfo)) {
if (UTIL_TABLE_IS_SUPERTABLE(pTableMetaInfo)) {
return 0;
}
......@@ -265,7 +267,7 @@ static int32_t tscBuildMeterSchemaResultFields(SSqlObj *pSql, int32_t numOfCols,
static int32_t tscProcessDescribeTable(SSqlObj *pSql) {
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
assert(tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0)->pMeterMeta != NULL);
assert(tscGetMetaInfo(pQueryInfo, 0)->pTableMeta != NULL);
const int32_t NUM_OF_DESCRIBE_TABLE_COLUMNS = 4;
const int32_t TYPE_COLUMN_LENGTH = 16;
......@@ -290,15 +292,15 @@ static int tscBuildMetricTagProjectionResult(SSqlObj *pSql) {
SSqlRes * pRes = &pSql->res;
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0);
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
SSuperTableMeta *pMetricMeta = pMeterMetaInfo->pMetricMeta;
SSchema * pSchema = tsGetTagSchema(pMeterMetaInfo->pMeterMeta);
SSuperTableMeta *pMetricMeta = pTableMetaInfo->pMetricMeta;
SSchema * pSchema = tscGetTableTagSchema(pTableMetaInfo->pTableMeta);
int32_t vOffset[TSDB_MAX_COLUMNS] = {0};
for (int32_t f = 1; f < pMeterMetaInfo->numOfTags; ++f) {
int16_t tagColumnIndex = pMeterMetaInfo->tagColumnIndex[f - 1];
for (int32_t f = 1; f < pTableMetaInfo->numOfTags; ++f) {
int16_t tagColumnIndex = pTableMetaInfo->tagColumnIndex[f - 1];
if (tagColumnIndex == -1) {
vOffset[f] = vOffset[f - 1] + TSDB_TABLE_NAME_LEN;
} else {
......@@ -316,15 +318,16 @@ static int tscBuildMetricTagProjectionResult(SSqlObj *pSql) {
SVnodeSidList *pSidList = (SVnodeSidList *)((char *)pMetricMeta + pMetricMeta->list[i]);
for (int32_t j = 0; j < pSidList->numOfSids; ++j) {
STableSidExtInfo *pSidExt = tscGetMeterSidInfo(pSidList, j);
STableIdInfo *pSidExt = tscGetMeterSidInfo(pSidList, j);
for (int32_t k = 0; k < pQueryInfo->fieldsInfo.numOfOutputCols; ++k) {
SColIndexEx *pColIndex = &tscSqlExprGet(pQueryInfo, k)->colInfo;
int16_t offsetId = pColIndex->colIdx;
assert((pColIndex->flag & TSDB_COL_TAG) != 0);
assert(0);
char * val = pSidExt->tags + vOffset[offsetId];
char * val = NULL;//pSidExt->tags + vOffset[offsetId];
TAOS_FIELD *pField = tscFieldInfoGetField(pQueryInfo, k);
memcpy(pRes->data + tscFieldInfoGetOffset(pQueryInfo, k) * totalNumOfResults + pField->bytes * rowIdx, val,
......@@ -343,7 +346,7 @@ static int tscBuildMetricTagSqlFunctionResult(SSqlObj *pSql) {
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
SSuperTableMeta *pMetricMeta = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0)->pMetricMeta;
SSuperTableMeta *pMetricMeta = tscGetMetaInfo(pQueryInfo, 0)->pMetricMeta;
int32_t totalNumOfResults = 1; // count function only produce one result
int32_t rowLen = tscGetResRowLength(pQueryInfo);
......@@ -375,8 +378,8 @@ static int tscProcessQueryTags(SSqlObj *pSql) {
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
STableMeta *pMeterMeta = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0)->pMeterMeta;
if (pMeterMeta == NULL || pMeterMeta->numOfTags == 0 || pMeterMeta->numOfColumns == 0) {
STableMeta *pTableMeta = tscGetMetaInfo(pQueryInfo, 0)->pTableMeta;
if (pTableMeta == NULL || tscGetNumOfTags(pTableMeta) == 0 || tscGetNumOfColumns(pTableMeta) == 0) {
strcpy(pCmd->payload, "invalid table");
pSql->res.code = TSDB_CODE_INVALID_TABLE;
return pSql->res.code;
......@@ -484,7 +487,7 @@ int tscProcessLocalCmd(SSqlObj *pSql) {
pSql->res.qhandle = 0x1;
pSql->res.numOfRows = 0;
} else if (pCmd->command == TSDB_SQL_RESET_CACHE) {
taosClearDataCache(tscCacheHandle);
taosCacheEmpty(tscCacheHandle);
} else if (pCmd->command == TSDB_SQL_SERV_VERSION) {
tscProcessServerVer(pSql);
} else if (pCmd->command == TSDB_SQL_CLI_VERSION) {
......
此差异已折叠。
......@@ -15,7 +15,6 @@
#include "taos.h"
#include "tsclient.h"
#include "tscSQLParser.h"
#include "tscUtil.h"
#include "ttimer.h"
#include "taosmsg.h"
......@@ -408,8 +407,8 @@ static int insertStmtReset(STscStmt* pStmt) {
}
pCmd->batchSize = 0;
SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, pCmd->clauseIndex, 0);
pMeterMetaInfo->vnodeIndex = 0;
STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
pTableMetaInfo->vnodeIndex = 0;
return TSDB_CODE_SUCCESS;
}
......@@ -422,7 +421,7 @@ static int insertStmtExecute(STscStmt* stmt) {
++pCmd->batchSize;
}
SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, pCmd->clauseIndex, 0);
STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
assert(pCmd->numOfClause == 1);
if (pCmd->pDataBlocks->nSize > 0) {
......@@ -439,7 +438,7 @@ static int insertStmtExecute(STscStmt* stmt) {
}
// set the next sent data vnode index in data block arraylist
pMeterMetaInfo->vnodeIndex = 1;
pTableMetaInfo->vnodeIndex = 1;
} else {
pCmd->pDataBlocks = tscDestroyBlockArrayList(pCmd->pDataBlocks);
}
......
此差异已折叠。
......@@ -19,6 +19,68 @@
#include "ttokendef.h"
#include "taosdef.h"
#include "tutil.h"
#include "tsclient.h"
int32_t tscGetNumOfTags(const STableMeta* pTableMeta) {
assert(pTableMeta != NULL);
STableComInfo tinfo = tscGetTableInfo(pTableMeta);
if (pTableMeta->tableType == TSDB_NORMAL_TABLE) {
assert(tinfo.numOfTags == 0);
return 0;
}
if (pTableMeta->tableType == TSDB_SUPER_TABLE || pTableMeta->tableType == TSDB_CHILD_TABLE) {
assert(tinfo.numOfTags > 0);
return tinfo.numOfTags;
}
assert(tinfo.numOfTags == 0);
return 0;
}
int32_t tscGetNumOfColumns(const STableMeta* pTableMeta) {
assert(pTableMeta != NULL);
// table created according to super table, use data from super table
STableComInfo tinfo = tscGetTableInfo(pTableMeta);
return tinfo.numOfColumns;
}
SSchema *tscGetTableSchema(const STableMeta *pTableMeta) {
assert(pTableMeta != NULL);
if (pTableMeta->tableType == TSDB_CHILD_TABLE) {
STableMeta* pSTableMeta = pTableMeta->pSTable;
assert (pSTableMeta != NULL);
return pSTableMeta->schema;
}
return pTableMeta->schema;
}
SSchema* tscGetTableTagSchema(const STableMeta* pTableMeta) {
assert(pTableMeta != NULL && (pTableMeta->tableType == TSDB_SUPER_TABLE || pTableMeta->tableType == TSDB_CHILD_TABLE));
STableComInfo tinfo = tscGetTableInfo(pTableMeta);
assert(tinfo.numOfTags > 0);
return tscGetTableColumnSchema(pTableMeta, tinfo.numOfColumns);
}
STableComInfo tscGetTableInfo(const STableMeta* pTableMeta) {
assert(pTableMeta != NULL);
if (pTableMeta->tableType == TSDB_CHILD_TABLE) {
assert (pTableMeta->pSTable != NULL);
return pTableMeta->pSTable->tableInfo;
}
return pTableMeta->tableInfo;
}
bool isValidSchema(struct SSchema* pSchema, int32_t numOfCols) {
if (!VALIDNUMOFCOLS(numOfCols)) {
......@@ -64,34 +126,66 @@ bool isValidSchema(struct SSchema* pSchema, int32_t numOfCols) {
return (rowLen <= TSDB_MAX_BYTES_PER_ROW);
}
struct SSchema* tsGetSchema(STableMeta* pMeta) {
if (pMeta == NULL) {
return NULL;
}
return tsGetColumnSchema(pMeta, 0);
}
SSchema* tscGetTableColumnSchema(const STableMeta* pTableMeta, int32_t startCol) {
assert(pTableMeta != NULL);
SSchema* pSchema = pTableMeta->schema;
struct SSchema* tsGetTagSchema(STableMeta* pMeta) {
if (pMeta == NULL || pMeta->numOfTags == 0) {
return NULL;
if (pTableMeta->tableType == TSDB_CHILD_TABLE) {
assert (pTableMeta->pSTable != NULL);
pSchema = pTableMeta->pSTable->schema;
}
return tsGetColumnSchema(pMeta, pMeta->numOfColumns);
return &pSchema[startCol];
}
struct SSchema* tsGetColumnSchema(STableMeta* pMeta, int32_t startCol) {
return (SSchema*)(((char*)pMeta + sizeof(STableMeta)) + startCol * sizeof(SSchema));
}
struct SSchema tscGetTbnameColumnSchema() {
struct SSchema s = {
.colId = TSDB_TBNAME_COLUMN_INDEX,
.type = TSDB_DATA_TYPE_BINARY,
.bytes = TSDB_TABLE_NAME_LEN
};
struct SSchema tsGetTbnameColumnSchema() {
struct SSchema s = {.colId = TSDB_TBNAME_COLUMN_INDEX, .type = TSDB_DATA_TYPE_BINARY, .bytes = TSDB_TABLE_NAME_LEN};
strcpy(s.name, TSQL_TBNAME_L);
return s;
}
STableMeta* tscCreateTableMetaFromMsg(STableMetaMsg* pTableMetaMsg, size_t* size) {
assert(pTableMetaMsg != NULL);
int32_t schemaSize = (pTableMetaMsg->numOfColumns + pTableMetaMsg->numOfTags) * sizeof(SSchema);
STableMeta* pTableMeta = calloc(1, sizeof(STableMeta) + schemaSize);
pTableMeta->tableType = pTableMetaMsg->tableType;
pTableMeta->tableInfo = (STableComInfo) {
.numOfTags = pTableMetaMsg->numOfTags,
.numOfColumns = pTableMetaMsg->numOfColumns,
.precision = pTableMetaMsg->precision
};
pTableMeta->sid = pTableMetaMsg->sid;
pTableMeta->uid = pTableMetaMsg->uid;
pTableMeta->vgId = pTableMetaMsg->vgId;
pTableMeta->numOfVpeers = pTableMetaMsg->numOfVpeers;
memcpy(pTableMeta->vpeerDesc, pTableMetaMsg->vpeerDesc, sizeof(SVnodeDesc) * pTableMeta->numOfVpeers);
memcpy(pTableMeta->schema, pTableMetaMsg->schema, schemaSize);
int32_t numOfTotalCols = pTableMeta->tableInfo.numOfColumns + pTableMeta->tableInfo.numOfTags;
for(int32_t i = 0; i < numOfTotalCols; ++i) {
pTableMeta->tableInfo.rowSize += pTableMeta->schema[i].bytes;
}
if (size != NULL) {
*size = sizeof(STableMeta) + schemaSize;
}
return pTableMeta;
}
/**
* the MeterMeta data format in memory is as follows:
* the TableMeta data format in memory is as follows:
*
* +--------------------+
* |STableMeta Body data| sizeof(STableMeta)
......@@ -101,33 +195,15 @@ struct SSchema tsGetTbnameColumnSchema() {
* |Tags data | tag_col_1.bytes + tag_col_2.bytes + ....
* +--------------------+
*
* @param pMeta
* @param pTableMeta
* @return
*/
char* tsGetTagsValue(STableMeta* pMeta) {
int32_t numOfTotalCols = pMeta->numOfColumns + pMeta->numOfTags;
uint32_t offset = sizeof(STableMeta) + numOfTotalCols * sizeof(SSchema);
return ((char*)pMeta + offset);
}
char* tsGetTagsValue(STableMeta* pTableMeta) {
int32_t offset = 0;
// int32_t numOfTotalCols = pTableMeta->numOfColumns + pTableMeta->numOfTags;
// uint32_t offset = sizeof(STableMeta) + numOfTotalCols * sizeof(SSchema);
bool tsMeterMetaIdentical(STableMeta* p1, STableMeta* p2) {
if (p1 == NULL || p2 == NULL || p1->uid != p2->uid || p1->sversion != p2->sversion) {
return false;
}
if (p1 == p2) {
return true;
}
size_t size = sizeof(STableMeta) + p1->numOfColumns * sizeof(SSchema);
for (int32_t i = 0; i < p1->numOfTags; ++i) {
SSchema* pColSchema = tsGetColumnSchema(p1, i + p1->numOfColumns);
size += pColSchema->bytes;
}
return memcmp(p1, p2, size) == 0;
return ((char*)pTableMeta + offset);
}
// todo refactor
......@@ -149,24 +225,6 @@ static FORCE_INLINE size_t copy(char* dst, const char* src, char delimiter) {
return len;
}
/**
* extract table name from meterid, which the format of userid.dbname.metername
* @param tableId
* @return
*/
void extractTableName(char* tableId, char* name) {
char* r = skipSegments(tableId, TS_PATH_DELIMITER[0], 2);
copy(name, r, TS_PATH_DELIMITER[0]);
}
SSQLToken extractDBName(char* tableId, char* name) {
char* r = skipSegments(tableId, TS_PATH_DELIMITER[0], 1);
size_t len = copy(name, r, TS_PATH_DELIMITER[0]);
SSQLToken token = {.z = name, .n = len, .type = TK_STRING};
return token;
}
/*
* tablePrefix.columnName
* extract table name and save it in pTable, with only column name in pToken
......
......@@ -319,12 +319,13 @@ void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrd
pRes->pLocalReducer = pReducer;
pRes->numOfGroups = 0;
SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, pCmd->clauseIndex, 0);
int16_t prec = pMeterMetaInfo->pMeterMeta->precision;
STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
STableComInfo tinfo = tscGetTableInfo(pTableMetaInfo->pTableMeta);
int16_t prec = tinfo.precision;
int64_t stime = (pQueryInfo->stime < pQueryInfo->etime) ? pQueryInfo->stime : pQueryInfo->etime;
int64_t revisedSTime =
taosGetIntervalStartTimestamp(stime, pQueryInfo->intervalTime, pQueryInfo->intervalTimeUnit, prec);
taosGetIntervalStartTimestamp(stime, pQueryInfo->intervalTime, pQueryInfo->slidingTimeUnit, prec);
SInterpolationInfo *pInterpoInfo = &pReducer->interpolationInfo;
taosInitInterpoInfo(pInterpoInfo, pQueryInfo->order.order, revisedSTime, pQueryInfo->groupbyExpr.numOfGroupCols,
......@@ -602,9 +603,9 @@ int32_t tscLocalReducerEnvCreate(SSqlObj *pSql, tExtMemBuffer ***pMemBuffer, tOr
*pFinalModel = NULL;
SQueryInfo * pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0);
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
(*pMemBuffer) = (tExtMemBuffer **)malloc(POINTER_BYTES * pMeterMetaInfo->pMetricMeta->numOfVnodes);
(*pMemBuffer) = (tExtMemBuffer **)malloc(POINTER_BYTES * pTableMetaInfo->pMetricMeta->numOfVnodes);
if (*pMemBuffer == NULL) {
tscError("%p failed to allocate memory", pSql);
pRes->code = TSDB_CODE_CLI_OUT_OF_MEMORY;
......@@ -635,7 +636,7 @@ int32_t tscLocalReducerEnvCreate(SSqlObj *pSql, tExtMemBuffer ***pMemBuffer, tOr
pModel = createColumnModel(pSchema, pQueryInfo->exprsInfo.numOfExprs, capacity);
for (int32_t i = 0; i < pMeterMetaInfo->pMetricMeta->numOfVnodes; ++i) {
for (int32_t i = 0; i < pTableMetaInfo->pMetricMeta->numOfVnodes; ++i) {
(*pMemBuffer)[i] = createExtMemBuffer(nBufferSizes, rlen, pModel);
(*pMemBuffer)[i]->flushModel = MULTIPLE_APPEND_MODEL;
}
......@@ -650,7 +651,7 @@ int32_t tscLocalReducerEnvCreate(SSqlObj *pSql, tExtMemBuffer ***pMemBuffer, tOr
for (int32_t i = 0; i < pQueryInfo->exprsInfo.numOfExprs; ++i) {
SSqlExpr *pExpr = tscSqlExprGet(pQueryInfo, i);
SSchema *p1 = tsGetColumnSchema(pMeterMetaInfo->pMeterMeta, pExpr->colInfo.colIdx);
SSchema *p1 = tscGetTableColumnSchema(pTableMetaInfo->pTableMeta, pExpr->colInfo.colIdx);
int16_t inter = 0;
int16_t type = -1;
......@@ -774,12 +775,14 @@ void adjustLoserTreeFromNewData(SLocalReducer *pLocalReducer, SLocalDataSource *
void savePrevRecordAndSetupInterpoInfo(SLocalReducer *pLocalReducer, SQueryInfo *pQueryInfo,
SInterpolationInfo *pInterpoInfo) {
// discard following dataset in the same group and reset the interpolation information
SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0);
int16_t prec = pMeterMetaInfo->pMeterMeta->precision;
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
STableComInfo tinfo = tscGetTableInfo(pTableMetaInfo->pTableMeta);
int16_t prec = tinfo.precision;
int64_t stime = (pQueryInfo->stime < pQueryInfo->etime) ? pQueryInfo->stime : pQueryInfo->etime;
int64_t revisedSTime =
taosGetIntervalStartTimestamp(stime, pQueryInfo->intervalTime, pQueryInfo->intervalTimeUnit, prec);
taosGetIntervalStartTimestamp(stime, pQueryInfo->intervalTime, pQueryInfo->slidingTimeUnit, prec);
taosInitInterpoInfo(pInterpoInfo, pQueryInfo->order.order, revisedSTime, pQueryInfo->groupbyExpr.numOfGroupCols,
pLocalReducer->rowSize);
......@@ -917,13 +920,15 @@ static void doInterpolateResult(SSqlObj *pSql, SLocalReducer *pLocalReducer, boo
functions[i] = tscSqlExprGet(pQueryInfo, i)->functionId;
}
SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, pCmd->clauseIndex, 0);
int8_t precision = pMeterMetaInfo->pMeterMeta->precision;
STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
STableComInfo tinfo = tscGetTableInfo(pTableMetaInfo->pTableMeta);
int8_t precision = tinfo.precision;
while (1) {
int32_t remains = taosNumOfRemainPoints(pInterpoInfo);
TSKEY etime = taosGetRevisedEndKey(actualETime, pQueryInfo->order.order, pQueryInfo->intervalTime,
pQueryInfo->intervalTimeUnit, precision);
pQueryInfo->slidingTimeUnit, precision);
int32_t nrows = taosGetNumOfResultWithInterpo(pInterpoInfo, pPrimaryKeys, remains, pQueryInfo->intervalTime, etime,
pLocalReducer->resColModel->capacity);
......@@ -1268,14 +1273,16 @@ static void resetEnvForNewResultset(SSqlRes *pRes, SSqlCmd *pCmd, SLocalReducer
pQueryInfo->limit.offset = pLocalReducer->offset;
SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, pCmd->clauseIndex, 0);
int16_t precision = pMeterMetaInfo->pMeterMeta->precision;
STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
STableComInfo tinfo = tscGetTableInfo(pTableMetaInfo->pTableMeta);
int8_t precision = tinfo.precision;
// for group result interpolation, do not return if not data is generated
if (pQueryInfo->interpoType != TSDB_INTERPO_NONE) {
int64_t stime = (pQueryInfo->stime < pQueryInfo->etime) ? pQueryInfo->stime : pQueryInfo->etime;
int64_t newTime =
taosGetIntervalStartTimestamp(stime, pQueryInfo->intervalTime, pQueryInfo->intervalTimeUnit, precision);
taosGetIntervalStartTimestamp(stime, pQueryInfo->intervalTime, pQueryInfo->slidingTimeUnit, precision);
taosInitInterpoInfo(&pLocalReducer->interpolationInfo, pQueryInfo->order.order, newTime,
pQueryInfo->groupbyExpr.numOfGroupCols, pLocalReducer->rowSize);
......@@ -1294,8 +1301,10 @@ static bool doInterpolationForCurrentGroup(SSqlObj *pSql) {
SLocalReducer * pLocalReducer = pRes->pLocalReducer;
SInterpolationInfo *pInterpoInfo = &pLocalReducer->interpolationInfo;
SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0);
int8_t p = pMeterMetaInfo->pMeterMeta->precision;
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
STableComInfo tinfo = tscGetTableInfo(pTableMetaInfo->pTableMeta);
int8_t p = tinfo.precision;
if (taosHasRemainsDataForInterpolation(pInterpoInfo)) {
assert(pQueryInfo->interpoType != TSDB_INTERPO_NONE);
......@@ -1305,7 +1314,7 @@ static bool doInterpolationForCurrentGroup(SSqlObj *pSql) {
int32_t remain = taosNumOfRemainPoints(pInterpoInfo);
TSKEY ekey =
taosGetRevisedEndKey(etime, pQueryInfo->order.order, pQueryInfo->intervalTime, pQueryInfo->intervalTimeUnit, p);
taosGetRevisedEndKey(etime, pQueryInfo->order.order, pQueryInfo->intervalTime, pQueryInfo->slidingTimeUnit, p);
int32_t rows = taosGetNumOfResultWithInterpo(pInterpoInfo, (TSKEY *)pLocalReducer->pBufForInterpo, remain,
pQueryInfo->intervalTime, ekey, pLocalReducer->resColModel->capacity);
if (rows > 0) { // do interpo
......@@ -1328,8 +1337,10 @@ static bool doHandleLastRemainData(SSqlObj *pSql) {
bool prevGroupCompleted = (!pLocalReducer->discard) && pLocalReducer->hasUnprocessedRow;
SQueryInfo * pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0);
int8_t precision = pMeterMetaInfo->pMeterMeta->precision;
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
STableComInfo tinfo = tscGetTableInfo(pTableMetaInfo->pTableMeta);
int8_t precision = tinfo.precision;
if ((isAllSourcesCompleted(pLocalReducer) && !pLocalReducer->hasPrevRow) || pLocalReducer->pLocalDataSrc[0] == NULL ||
prevGroupCompleted) {
......@@ -1338,7 +1349,7 @@ static bool doHandleLastRemainData(SSqlObj *pSql) {
int64_t etime = (pQueryInfo->stime < pQueryInfo->etime) ? pQueryInfo->etime : pQueryInfo->stime;
etime = taosGetRevisedEndKey(etime, pQueryInfo->order.order, pQueryInfo->intervalTime,
pQueryInfo->intervalTimeUnit, precision);
pQueryInfo->slidingTimeUnit, precision);
int32_t rows = taosGetNumOfResultWithInterpo(pInterpoInfo, NULL, 0, pQueryInfo->intervalTime, etime,
pLocalReducer->resColModel->capacity);
if (rows > 0) { // do interpo
......
此差异已折叠。
......@@ -13,52 +13,58 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <tast.h>
#include "hash.h"
#include "os.h"
#include "qast.h"
#include "tcache.h"
#include "tlog.h"
#include "tnote.h"
#include "trpc.h"
#include "tscJoinProcess.h"
#include "tscProfile.h"
#include "tscSQLParser.h"
#include "tscSecondaryMerge.h"
#include "tscSubquery.h"
#include "tscUtil.h"
#include "tsclient.h"
#include "tscompression.h"
#include "tsocket.h"
#include "ttimer.h"
#include "ttokendef.h"
#include "tutil.h"
TAOS *taos_connect_imp(const char *ip, const char *user, const char *pass, const char *db, uint16_t port,
void (*fp)(void *, TAOS_RES *, int), void *param, void **taos) {
STscObj *pObj;
static bool validImpl(const char* str, size_t maxsize) {
if (str == NULL) {
return false;
}
size_t len = strlen(str);
if (len <= 0 || len > maxsize) {
return false;
}
return true;
}
static bool validUserName(const char* user) {
return validImpl(user, TSDB_USER_LEN);
}
static bool validPassword(const char* passwd) {
return validImpl(passwd, TSDB_PASSWORD_LEN);
}
STscObj *taosConnectImpl(const char *ip, const char *user, const char *pass, const char *db, uint16_t port,
void (*fp)(void *, TAOS_RES *, int), void *param, void **taos) {
taos_init();
if (user == NULL) {
globalCode = TSDB_CODE_INVALID_ACCT;
return NULL;
} else {
size_t len = strlen(user);
if (len <= 0 || len > TSDB_USER_LEN) {
if (!validUserName(user)) {
globalCode = TSDB_CODE_INVALID_ACCT;
return NULL;
}
}
if (pass == NULL) {
globalCode = TSDB_CODE_INVALID_PASS;
return NULL;
} else {
size_t len = strlen(pass);
if (len <= 0 || len > TSDB_KEY_LEN) {
if (!validPassword(pass)) {
globalCode = TSDB_CODE_INVALID_PASS;
return NULL;
}
}
if (tscInitRpc(user, pass) != 0) {
globalCode = TSDB_CODE_NETWORK_UNAVAIL;
......@@ -67,7 +73,7 @@ TAOS *taos_connect_imp(const char *ip, const char *user, const char *pass, const
if (ip && ip[0]) {
tscMgmtIpList.inUse = 0;
tscMgmtIpList.port = tsMgmtShellPort;
tscMgmtIpList.port = tsMnodeShellPort;
tscMgmtIpList.numOfIps = 1;
tscMgmtIpList.ip[0] = inet_addr(ip);
......@@ -82,20 +88,19 @@ TAOS *taos_connect_imp(const char *ip, const char *user, const char *pass, const
}
}
tscMgmtIpList.port = port ? port : tsMgmtShellPort;
tscMgmtIpList.port = port ? port : tsMnodeShellPort;
pObj = (STscObj *)malloc(sizeof(STscObj));
STscObj *pObj = (STscObj *)calloc(1, sizeof(STscObj));
if (NULL == pObj) {
globalCode = TSDB_CODE_CLI_OUT_OF_MEMORY;
return NULL;
}
memset(pObj, 0, sizeof(STscObj));
pObj->signature = pObj;
strncpy(pObj->user, user, TSDB_USER_LEN);
taosEncryptPass((uint8_t *)pass, strlen(pass), pObj->pass);
pObj->mgmtPort = port ? port : tsMgmtShellPort;
pObj->mgmtPort = port ? port : tsMnodeShellPort;
if (db) {
int32_t len = strlen(db);
......@@ -115,18 +120,17 @@ TAOS *taos_connect_imp(const char *ip, const char *user, const char *pass, const
pthread_mutex_init(&pObj->mutex, NULL);
SSqlObj *pSql = (SSqlObj *)malloc(sizeof(SSqlObj));
SSqlObj *pSql = (SSqlObj *)calloc(1, sizeof(SSqlObj));
if (NULL == pSql) {
globalCode = TSDB_CODE_CLI_OUT_OF_MEMORY;
free(pObj);
return NULL;
}
memset(pSql, 0, sizeof(SSqlObj));
pSql->pTscObj = pObj;
pSql->signature = pSql;
tsem_init(&pSql->rspSem, 0, 0);
tsem_init(&pSql->emptyRspSem, 0, 1);
pObj->pSql = pSql;
pSql->fp = fp;
pSql->param = param;
......@@ -142,46 +146,71 @@ TAOS *taos_connect_imp(const char *ip, const char *user, const char *pass, const
return NULL;
}
pSql->res.code = tscProcessSql(pSql);
if (fp != NULL) {
tscTrace("%p DB async connection is opening", pObj);
// tsRpcHeaderSize will be updated during RPC initialization, so only after it initialization, this value is valid
tsInsertHeadSize = tsRpcHeadSize + sizeof(SShellSubmitMsg);
return pObj;
}
}
if (pSql->res.code) {
taos_close(pObj);
return NULL;
}
static void syncConnCallback(void *param, TAOS_RES *tres, int code) {
STscObj *pObj = (STscObj *)param;
assert(pObj != NULL && pObj->pSql != NULL);
tscTrace("%p DB connection is opened", pObj);
return pObj;
sem_post(&pObj->pSql->rspSem);
}
TAOS *taos_connect(const char *ip, const char *user, const char *pass, const char *db, uint16_t port) {
if (ip == NULL || (ip != NULL && (strcmp("127.0.0.1", ip) == 0 || strcasecmp("localhost", ip) == 0))) {
ip = tsMasterIp;
}
tscTrace("try to create a connection to %s", ip);
void *taos = taos_connect_imp(ip, user, pass, db, port, NULL, NULL, NULL);
if (taos != NULL) {
STscObj *pObj = (STscObj *)taos;
STscObj *pObj = taosConnectImpl(ip, user, pass, db, port, NULL, NULL, NULL);
if (pObj != NULL) {
SSqlObj* pSql = pObj->pSql;
assert(pSql != NULL);
pSql->fp = syncConnCallback;
pSql->param = pObj;
tscProcessSql(pSql);
sem_wait(&pSql->rspSem);
if (pSql->res.code != TSDB_CODE_SUCCESS) {
taos_close(pObj);
return NULL;
}
tscTrace("%p DB connection is opening", pObj);
// version compare only requires the first 3 segments of the version string
int code = taosCheckVersion(version, taos_get_server_info(taos), 3);
int code = taosCheckVersion(version, taos_get_server_info(pObj), 3);
if (code != 0) {
pObj->pSql->res.code = code;
taos_close(taos);
pSql->res.code = code;
taos_close(pObj);
return NULL;
} else {
return pObj;
}
}
return taos;
return NULL;
}
TAOS *taos_connect_a(char *ip, char *user, char *pass, char *db, uint16_t port, void (*fp)(void *, TAOS_RES *, int),
void *param, void **taos) {
return taos_connect_imp(ip, user, pass, db, port, fp, param, taos);
STscObj* pObj = taosConnectImpl(ip, user, pass, db, port, fp, param, taos);
if (pObj == NULL) {
return NULL;
}
SSqlObj* pSql = pObj->pSql;
pSql->res.code = tscProcessSql(pSql);
tscTrace("%p DB async connection is opening", pObj);
return pObj;
}
void taos_close(TAOS *taos) {
......@@ -206,7 +235,7 @@ int taos_query_imp(STscObj *pObj, SSqlObj *pSql) {
pSql->asyncTblPos = NULL;
if (NULL != pSql->pTableHashList) {
taosCleanUpHashTable(pSql->pTableHashList);
taosHashCleanup(pSql->pTableHashList);
pSql->pTableHashList = NULL;
}
......@@ -408,14 +437,14 @@ static char *getArithemicInputSrc(void *param, char *name, int32_t colId) {
SSqlFunctionExpr * pExpr = pSupport->pExpr;
int32_t index = -1;
for (int32_t i = 0; i < pExpr->pBinExprInfo.numOfCols; ++i) {
if (strcmp(name, pExpr->pBinExprInfo.pReqColumns[i].name) == 0) {
for (int32_t i = 0; i < pExpr->binExprInfo.numOfCols; ++i) {
if (strcmp(name, pExpr->binExprInfo.pReqColumns[i].name) == 0) {
index = i;
break;
}
}
assert(index >= 0 && index < pExpr->pBinExprInfo.numOfCols);
assert(index >= 0 && index < pExpr->binExprInfo.numOfCols);
return pSupport->data[index] + pSupport->offset * pSupport->elemSize[index];
}
......@@ -465,21 +494,21 @@ static void **doSetResultRowData(SSqlObj *pSql) {
sas->offset = 0;
sas->pExpr = pQueryInfo->fieldsInfo.pExpr[i];
sas->numOfCols = sas->pExpr->pBinExprInfo.numOfCols;
sas->numOfCols = sas->pExpr->binExprInfo.numOfCols;
if (pRes->buffer[i] == NULL) {
pRes->buffer[i] = malloc(tscFieldInfoGetField(pQueryInfo, i)->bytes);
}
for(int32_t k = 0; k < sas->numOfCols; ++k) {
int32_t columnIndex = sas->pExpr->pBinExprInfo.pReqColumns[k].colIdxInBuf;
int32_t columnIndex = sas->pExpr->binExprInfo.pReqColumns[k].colIdxInBuf;
SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, columnIndex);
sas->elemSize[k] = pExpr->resBytes;
sas->data[k] = (pRes->data + pRes->numOfRows* pExpr->offset) + pRes->row*pExpr->resBytes;
}
tSQLBinaryExprCalcTraverse(sas->pExpr->pBinExprInfo.pBinExpr, 1, pRes->buffer[i], sas, TSQL_SO_ASC, getArithemicInputSrc);
tSQLBinaryExprCalcTraverse(sas->pExpr->binExprInfo.pBinExpr, 1, pRes->buffer[i], sas, TSQL_SO_ASC, getArithemicInputSrc);
pRes->tsrow[i] = pRes->buffer[i];
free(sas); //todo optimization
......@@ -509,7 +538,7 @@ static bool tscHashRemainDataInSubqueryResultSet(SSqlObj *pSql) {
SSqlCmd *pCmd1 = &pSql->pSubs[i]->cmd;
SQueryInfo * pQueryInfo1 = tscGetQueryInfoDetail(pCmd1, pCmd1->clauseIndex);
SMeterMetaInfo *pMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo1, 0);
STableMetaInfo *pMetaInfo = tscGetMetaInfo(pQueryInfo1, 0);
assert(pQueryInfo1->numOfTables == 1);
......@@ -705,8 +734,10 @@ TAOS_ROW taos_fetch_row(TAOS_RES *res) {
}
// current data are exhausted, fetch more data
if (pRes->data == NULL || (pRes->data != NULL && pRes->row >= pRes->numOfRows && pCmd->command == TSDB_SQL_RETRIEVE)) {
if (pRes->data == NULL || (pRes->data != NULL && pRes->row >= pRes->numOfRows &&
(pCmd->command == TSDB_SQL_RETRIEVE || pCmd->command == TSDB_SQL_RETRIEVE_METRIC || pCmd->command == TSDB_SQL_FETCH))) {
taos_fetch_rows_a(res, asyncFetchCallback, pSql->pTscObj);
sem_wait(&pSql->rspSem);
}
......@@ -808,7 +839,7 @@ void taos_free_result_imp(TAOS_RES *res, int keepCmd) {
pQueryInfo->type = TSDB_QUERY_TYPE_FREE_RESOURCE;
SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0);
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
/*
* case 1. Partial data have been retrieved from vnodes, but not all data has been retrieved yet.
......@@ -823,7 +854,7 @@ void taos_free_result_imp(TAOS_RES *res, int keepCmd) {
if (pRes->code != TSDB_CODE_QUERY_CANCELLED &&
((pRes->numOfRows > 0 && pCmd->command < TSDB_SQL_LOCAL) ||
(pRes->code == TSDB_CODE_SUCCESS && pRes->numOfRows == 0 && pCmd->command == TSDB_SQL_SELECT &&
pSql->pStream == NULL && pMeterMetaInfo->pMeterMeta != NULL))) {
pSql->pStream == NULL && pTableMetaInfo->pTableMeta != NULL))) {
pCmd->command = (pCmd->command > TSDB_SQL_MGMT) ? TSDB_SQL_RETRIEVE : TSDB_SQL_FETCH;
tscTrace("%p code:%d, numOfRows:%d, command:%d", pSql, pRes->code, pRes->numOfRows, pCmd->command);
......@@ -965,7 +996,7 @@ void taos_stop_query(TAOS_RES *res) {
pSql->res.code = TSDB_CODE_QUERY_CANCELLED;
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
if (tscIsTwoStageMergeMetricQuery(pQueryInfo, 0)) {
if (tscIsTwoStageSTableQuery(pQueryInfo, 0)) {
tscKillMetricQuery(pSql);
return;
}
......@@ -1079,7 +1110,7 @@ int taos_validate_sql(TAOS *taos, const char *sql) {
pSql->asyncTblPos = NULL;
if (NULL != pSql->pTableHashList) {
taosCleanUpHashTable(pSql->pTableHashList);
taosHashCleanup(pSql->pTableHashList);
pSql->pTableHashList = NULL;
}
......@@ -1107,7 +1138,7 @@ static int tscParseTblNameList(SSqlObj *pSql, const char *tblNameList, int32_t t
SQueryInfo *pQueryInfo = NULL;
tscGetQueryInfoDetailSafely(pCmd, pCmd->clauseIndex, &pQueryInfo);
SMeterMetaInfo *pMeterMetaInfo = tscAddEmptyMeterMetaInfo(pQueryInfo);
STableMetaInfo *pTableMetaInfo = tscAddEmptyMetaInfo(pQueryInfo);
if ((code = tscAllocPayload(pCmd, tblListLen + 16)) != TSDB_CODE_SUCCESS) {
return code;
......@@ -1142,7 +1173,7 @@ static int tscParseTblNameList(SSqlObj *pSql, const char *tblNameList, int32_t t
return code;
}
if ((code = setMeterID(pMeterMetaInfo, &sToken, pSql)) != TSDB_CODE_SUCCESS) {
if ((code = setMeterID(pTableMetaInfo, &sToken, pSql)) != TSDB_CODE_SUCCESS) {
return code;
}
......@@ -1152,7 +1183,7 @@ static int tscParseTblNameList(SSqlObj *pSql, const char *tblNameList, int32_t t
return code;
}
if (payloadLen + strlen(pMeterMetaInfo->name) + 128 >= pCmd->allocSize) {
if (payloadLen + strlen(pTableMetaInfo->name) + 128 >= pCmd->allocSize) {
char *pNewMem = realloc(pCmd->payload, pCmd->allocSize + tblListLen);
if (pNewMem == NULL) {
code = TSDB_CODE_CLI_OUT_OF_MEMORY;
......@@ -1165,7 +1196,7 @@ static int tscParseTblNameList(SSqlObj *pSql, const char *tblNameList, int32_t t
pMsg = pCmd->payload;
}
payloadLen += sprintf(pMsg + payloadLen, "%s,", pMeterMetaInfo->name);
payloadLen += sprintf(pMsg + payloadLen, "%s,", pTableMetaInfo->name);
}
*(pMsg + payloadLen) = '\0';
......
此差异已折叠。
......@@ -175,10 +175,10 @@ int tscUpdateSubscription(STscObj* pObj, SSub* pSub) {
return 0;
}
SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, 0, 0);
STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, 0, 0);
int numOfTables = 0;
if (!UTIL_METER_IS_NOMRAL_METER(pMeterMetaInfo)) {
SSuperTableMeta* pMetricMeta = pMeterMetaInfo->pMetricMeta;
if (!UTIL_TABLE_IS_NOMRAL_TABLE(pTableMetaInfo)) {
SSuperTableMeta* pMetricMeta = pTableMetaInfo->pMetricMeta;
for (int32_t i = 0; i < pMetricMeta->numOfVnodes; i++) {
SVnodeSidList *pVnodeSidList = tscGetVnodeSidList(pMetricMeta, i);
numOfTables += pVnodeSidList->numOfSids;
......@@ -191,19 +191,19 @@ int tscUpdateSubscription(STscObj* pObj, SSub* pSub) {
return 0;
}
if (UTIL_METER_IS_NOMRAL_METER(pMeterMetaInfo)) {
if (UTIL_TABLE_IS_NOMRAL_TABLE(pTableMetaInfo)) {
numOfTables = 1;
int64_t uid = pMeterMetaInfo->pMeterMeta->uid;
int64_t uid = pTableMetaInfo->pTableMeta->uid;
progress[0].uid = uid;
progress[0].key = tscGetSubscriptionProgress(pSub, uid);
} else {
SSuperTableMeta* pMetricMeta = pMeterMetaInfo->pMetricMeta;
SSuperTableMeta* pMetricMeta = pTableMetaInfo->pMetricMeta;
numOfTables = 0;
for (int32_t i = 0; i < pMetricMeta->numOfVnodes; i++) {
SVnodeSidList *pVnodeSidList = tscGetVnodeSidList(pMetricMeta, i);
for (int32_t j = 0; j < pVnodeSidList->numOfSids; j++) {
STableSidExtInfo *pMeterInfo = tscGetMeterSidInfo(pVnodeSidList, j);
int64_t uid = pMeterInfo->uid;
STableIdInfo *pTableMetaInfo = tscGetMeterSidInfo(pVnodeSidList, j);
int64_t uid = pTableMetaInfo->uid;
progress[numOfTables].uid = uid;
progress[numOfTables++].key = tscGetSubscriptionProgress(pSub, uid);
}
......@@ -371,7 +371,7 @@ TAOS_RES *taos_consume(TAOS_SUB *tsub) {
pSql->sqlstr = NULL;
taos_free_result_imp(pSql, 0);
pSql->sqlstr = sqlstr;
taosClearDataCache(tscCacheHandle);
taosCacheEmpty(tscCacheHandle);
if (!tscUpdateSubscription(pSub->taos, pSub)) return NULL;
tscTrace("meter synchronization completed");
} else {
......@@ -385,7 +385,7 @@ TAOS_RES *taos_consume(TAOS_SUB *tsub) {
pSql->cmd.command = TSDB_SQL_SELECT;
pQueryInfo->type = type;
tscGetMeterMetaInfo(&pSql->cmd, 0, 0)->vnodeIndex = 0;
tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0)->vnodeIndex = 0;
}
tscDoQuery(pSql);
......
此差异已折叠。
......@@ -34,11 +34,9 @@ void * pTscMgmtConn;
void * pSlaveConn;
void * tscCacheHandle;
int32_t globalCode = 0;
int initialized = 0;
int slaveIndex;
void * tscTmr;
void * tscQhandle;
void * tscConnCache;
void * tscCheckDiskUsageTmr;
int tsInsertHeadSize;
......@@ -151,7 +149,7 @@ void taos_init_imp() {
}
tscMgmtIpList.inUse = 0;
tscMgmtIpList.port = tsMgmtShellPort;
tscMgmtIpList.port = tsMnodeShellPort;
tscMgmtIpList.numOfIps = 1;
tscMgmtIpList.ip[0] = inet_addr(tsMasterIp);
......@@ -160,7 +158,7 @@ void taos_init_imp() {
tscMgmtIpList.ip[1] = inet_addr(tsSecondIp);
}
tscInitMsgs();
tscInitMsgsFp();
slaveIndex = rand();
int queueSize = tsMaxVnodeConnections + tsMaxMeterConnections + tsMaxMgmtConnections + tsMaxMgmtConnections;
......@@ -186,13 +184,9 @@ void taos_init_imp() {
refreshTime = refreshTime > 2 ? 2 : refreshTime;
refreshTime = refreshTime < 1 ? 1 : refreshTime;
if (tscCacheHandle == NULL) tscCacheHandle = taosInitDataCache(tsMaxMeterConnections / 2, tscTmr, refreshTime);
if (tscCacheHandle == NULL) tscCacheHandle = taosCacheInit(tscTmr, refreshTime);
tscConnCache = taosOpenConnCache(tsMaxMeterConnections * 2, NULL/*taosCloseRpcConn*/, tscTmr, tsShellActivityTimer * 1000);
initialized = 1;
tscTrace("client is initialized successfully");
tsInsertHeadSize = tsRpcHeadSize + sizeof(SShellSubmitMsg);
}
void taos_init() { pthread_once(&tscinit, taos_init_imp); }
......
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
......@@ -22,6 +22,8 @@ extern "C" {
int32_t dnodeInitMClient();
void dnodeCleanupMClient();
void dnodeSendMsgToMnode(SRpcMsg *rpcMsg);
uint32_t dnodeGetMnodeMasteIp();
#ifdef __cplusplus
}
......
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册