提交 d4e6aef5 编写于 作者: H hzcheng

Merge branch '2.0' into feature/2.0tsdb

...@@ -191,9 +191,10 @@ TDengine缺省的时间戳是毫秒精度,但通过修改配置参数enableMic ...@@ -191,9 +191,10 @@ TDengine缺省的时间戳是毫秒精度,但通过修改配置参数enableMic
``` ```
同时向表tb1_name和tb2_name中按列分别插入多条记录 同时向表tb1_name和tb2_name中按列分别插入多条记录
注意:对同一张表,插入的新记录的时间戳必须递增,否则会跳过插入该条记录。如果时间戳为0,系统将自动使用服务器当前时间作为该记录的时间戳。 注意:1、对同一张表,插入的新记录的时间戳必须递增,否则会跳过插入该条记录。如果时间戳为0,系统将自动使用服务器当前时间作为该记录的时间戳。
2、允许插入的最老记录的时间戳,是相对于当前服务器时间,减去配置的keep值(数据保留的天数),允许插入的最新记录的时间戳,是相对于当前服务器时间,加上配置的days值(数据文件存储数据的时间跨度,单位为天)。keep和days都是可以在创建数据库时指定的,缺省值分别是3650天和10天。
**IMPORT**:如果需要将时间戳小于最后一条记录时间的记录写入到数据库中,可使用IMPORT替代INSERT命令,IMPORT的语法与INSERT完全一样。如果同时IMPORT多条记录,需要保证一批记录是按时间戳排序好的。 **IMPORT**:如果需要将时间戳小于最后一条记录时间的记录写入到数据库中,可使用IMPORT替代INSERT命令,IMPORT的语法与INSERT完全一样。
## 数据查询 ## 数据查询
......
...@@ -181,9 +181,10 @@ All the keywords in a SQL statement are case-insensitive, but strings values are ...@@ -181,9 +181,10 @@ All the keywords in a SQL statement are case-insensitive, but strings values are
tb2_name (tb2_field1_name, ...) VALUES(field1_value1, ...) (field1_value2, ...) tb2_name (tb2_field1_name, ...) VALUES(field1_value1, ...) (field1_value2, ...)
``` ```
Note: For a table, the new record must have a timestamp bigger than the last data record, otherwise, it will be discarded and not inserted. If the timestamp is 0, the time stamp will be set to the system time on the server. Note: 1. For a table, the new record must have a timestamp bigger than the last data record, otherwise, it will be discarded and not inserted. If the timestamp is 0, the time stamp will be set to the system time on the server.
2.The timestamp of the oldest record allowed to be inserted is relative to the current server time, minus the configured keep value (the number of days the data is retained), and the timestamp of the latest record allowed to be inserted is relative to the current server time, plus the configured days value (the time span in which the data file stores data, in days). Both keep and days can be specified when creating the database. The default values are 3650 days and 10 days, respectively.
**IMPORT**: If you do want to insert a historical data record into a table, use IMPORT command instead of INSERT. IMPORT has the same syntax as INSERT. If you want to import a batch of historical records, the records must be ordered by the timestamp, otherwise, TDengine won't handle it in the right way.
**IMPORT**: If you do want to insert a historical data record into a table, use IMPORT command instead of INSERT. IMPORT has the same syntax as INSERT.
## Data Query ## Data Query
......
...@@ -86,15 +86,16 @@ enum _sql_cmd { ...@@ -86,15 +86,16 @@ enum _sql_cmd {
TSDB_SQL_MAX //48 TSDB_SQL_MAX //48
}; };
#define MAX_TOKEN_LEN 30
// token type
enum { enum {
TSQL_NODE_TYPE_EXPR = 0x1, TSQL_NODE_TYPE_EXPR = 0x1,
TSQL_NODE_TYPE_ID = 0x2, TSQL_NODE_TYPE_ID = 0x2,
TSQL_NODE_TYPE_VALUE = 0x4, TSQL_NODE_TYPE_VALUE = 0x4,
}; };
#define NON_ARITHMEIC_EXPR 0
#define NORMAL_ARITHMETIC 1
#define AGG_ARIGHTMEIC 2
extern char tTokenTypeSwitcher[13]; extern char tTokenTypeSwitcher[13];
#define toTSDBType(x) \ #define toTSDBType(x) \
...@@ -112,7 +113,7 @@ typedef struct SLimitVal { ...@@ -112,7 +113,7 @@ typedef struct SLimitVal {
} SLimitVal; } SLimitVal;
typedef struct SOrderVal { typedef struct SOrderVal {
int32_t order; uint32_t order;
int32_t orderColId; int32_t orderColId;
} SOrderVal; } SOrderVal;
......
...@@ -67,7 +67,7 @@ typedef struct SJoinSubquerySupporter { ...@@ -67,7 +67,7 @@ typedef struct SJoinSubquerySupporter {
} SJoinSubquerySupporter; } SJoinSubquerySupporter;
int32_t tscCreateDataBlock(size_t initialSize, int32_t rowSize, int32_t startOffset, const char* name, int32_t tscCreateDataBlock(size_t initialSize, int32_t rowSize, int32_t startOffset, const char* name,
SMeterMeta* pMeterMeta, STableDataBlocks** dataBlocks); STableMeta* pMeterMeta, STableDataBlocks** dataBlocks);
void tscAppendDataBlock(SDataBlockList* pList, STableDataBlocks* pBlocks); void tscAppendDataBlock(SDataBlockList* pList, STableDataBlocks* pBlocks);
void tscDestroyDataBlock(STableDataBlocks* pDataBlock); void tscDestroyDataBlock(STableDataBlocks* pDataBlock);
...@@ -81,11 +81,11 @@ int32_t tscCopyDataBlockToPayload(SSqlObj* pSql, STableDataBlocks* pDataBlock); ...@@ -81,11 +81,11 @@ int32_t tscCopyDataBlockToPayload(SSqlObj* pSql, STableDataBlocks* pDataBlock);
void tscFreeUnusedDataBlocks(SDataBlockList* pList); void tscFreeUnusedDataBlocks(SDataBlockList* pList);
int32_t tscMergeTableDataBlocks(SSqlObj* pSql, SDataBlockList* pDataList); int32_t tscMergeTableDataBlocks(SSqlObj* pSql, SDataBlockList* pDataList);
int32_t tscGetDataBlockFromList(void* pHashList, SDataBlockList* pDataBlockList, int64_t id, int32_t size, int32_t tscGetDataBlockFromList(void* pHashList, SDataBlockList* pDataBlockList, int64_t id, int32_t size,
int32_t startOffset, int32_t rowSize, const char* tableId, SMeterMeta* pMeterMeta, int32_t startOffset, int32_t rowSize, const char* tableId, STableMeta* pMeterMeta,
STableDataBlocks** dataBlocks); STableDataBlocks** dataBlocks);
SVnodeSidList* tscGetVnodeSidList(SMetricMeta* pMetricmeta, int32_t vnodeIdx); SVnodeSidList* tscGetVnodeSidList(SSuperTableMeta* pMetricmeta, int32_t vnodeIdx);
SMeterSidExtInfo* tscGetMeterSidInfo(SVnodeSidList* pSidList, int32_t idx); STableSidExtInfo* tscGetMeterSidInfo(SVnodeSidList* pSidList, int32_t idx);
/** /**
* *
...@@ -120,7 +120,7 @@ void tscClearInterpInfo(SQueryInfo* pQueryInfo); ...@@ -120,7 +120,7 @@ void tscClearInterpInfo(SQueryInfo* pQueryInfo);
bool tscIsInsertOrImportData(char* sqlstr); bool tscIsInsertOrImportData(char* sqlstr);
/* use for keep current db info temporarily, for handle table with db prefix */ /* use for keep current db info temporarily, for handle table with db prefix */
void tscGetDBInfoFromMeterId(char* meterId, char* db); void tscGetDBInfoFromMeterId(char* tableId, char* db);
int tscAllocPayload(SSqlCmd* pCmd, int size); int tscAllocPayload(SSqlCmd* pCmd, int size);
...@@ -128,6 +128,8 @@ void tscFieldInfoSetValFromSchema(SFieldInfo* pFieldInfo, int32_t index, SSchema ...@@ -128,6 +128,8 @@ void tscFieldInfoSetValFromSchema(SFieldInfo* pFieldInfo, int32_t index, SSchema
void tscFieldInfoSetValFromField(SFieldInfo* pFieldInfo, int32_t index, TAOS_FIELD* pField); void tscFieldInfoSetValFromField(SFieldInfo* pFieldInfo, int32_t index, TAOS_FIELD* pField);
void tscFieldInfoSetValue(SFieldInfo* pFieldInfo, int32_t index, int8_t type, const char* name, int16_t bytes); void tscFieldInfoSetValue(SFieldInfo* pFieldInfo, int32_t index, int8_t type, const char* name, int16_t bytes);
void tscFieldInfoUpdateVisible(SFieldInfo* pFieldInfo, int32_t index, bool visible); void tscFieldInfoUpdateVisible(SFieldInfo* pFieldInfo, int32_t index, bool visible);
void tscFieldInfoSetExpr(SFieldInfo* pFieldInfo, int32_t index, SSqlExpr* pExpr);
void tscFieldInfoSetBinExpr(SFieldInfo* pFieldInfo, int32_t index, SSqlFunctionExpr* pExpr);
void tscFieldInfoCalOffset(SQueryInfo* pQueryInfo); void tscFieldInfoCalOffset(SQueryInfo* pQueryInfo);
void tscFieldInfoUpdateOffsetForInterResult(SQueryInfo* pQueryInfo); void tscFieldInfoUpdateOffsetForInterResult(SQueryInfo* pQueryInfo);
...@@ -149,9 +151,10 @@ SSqlExpr* tscSqlExprInsertEmpty(SQueryInfo* pQueryInfo, int32_t index, int16_t f ...@@ -149,9 +151,10 @@ SSqlExpr* tscSqlExprInsertEmpty(SQueryInfo* pQueryInfo, int32_t index, int16_t f
SSqlExpr* tscSqlExprUpdate(SQueryInfo* pQueryInfo, int32_t index, int16_t functionId, int16_t srcColumnIndex, int16_t type, SSqlExpr* tscSqlExprUpdate(SQueryInfo* pQueryInfo, int32_t index, int16_t functionId, int16_t srcColumnIndex, int16_t type,
int16_t size); int16_t size);
int32_t tscSqlExprNumOfExprs(SQueryInfo* pQueryInfo);
SSqlExpr* tscSqlExprGet(SQueryInfo* pQueryInfo, int32_t index); SSqlExpr* tscSqlExprGet(SQueryInfo* pQueryInfo, int32_t index);
void tscSqlExprCopy(SSqlExprInfo* dst, const SSqlExprInfo* src, uint64_t uid); void tscSqlExprCopy(SSqlExprInfo* dst, const SSqlExprInfo* src, uint64_t uid, bool deepcopy);
void* tscSqlExprDestroy(SSqlExpr* pExpr); void* tscSqlExprDestroy(SSqlExpr* pExpr);
void tscSqlExprInfoDestroy(SSqlExprInfo* pExprInfo); void tscSqlExprInfoDestroy(SSqlExprInfo* pExprInfo);
...@@ -196,7 +199,7 @@ int32_t tscGetQueryInfoDetailSafely(SSqlCmd *pCmd, int32_t subClauseIndex, SQuer ...@@ -196,7 +199,7 @@ int32_t tscGetQueryInfoDetailSafely(SSqlCmd *pCmd, int32_t subClauseIndex, SQuer
SMeterMetaInfo* tscGetMeterMetaInfoByUid(SQueryInfo* pQueryInfo, uint64_t uid, int32_t* index); SMeterMetaInfo* tscGetMeterMetaInfoByUid(SQueryInfo* pQueryInfo, uint64_t uid, int32_t* index);
void tscClearMeterMetaInfo(SMeterMetaInfo* pMeterMetaInfo, bool removeFromCache); void tscClearMeterMetaInfo(SMeterMetaInfo* pMeterMetaInfo, bool removeFromCache);
SMeterMetaInfo* tscAddMeterMetaInfo(SQueryInfo* pQueryInfo, const char* name, SMeterMeta* pMeterMeta, SMetricMeta* pMetricMeta, SMeterMetaInfo* tscAddMeterMetaInfo(SQueryInfo* pQueryInfo, const char* name, STableMeta* pMeterMeta, SSuperTableMeta* pMetricMeta,
int16_t numOfTags, int16_t* tags); int16_t numOfTags, int16_t* tags);
SMeterMetaInfo* tscAddEmptyMeterMetaInfo(SQueryInfo *pQueryInfo); SMeterMetaInfo* tscAddEmptyMeterMetaInfo(SQueryInfo *pQueryInfo);
int32_t tscAddSubqueryInfo(SSqlCmd *pCmd); int32_t tscAddSubqueryInfo(SSqlCmd *pCmd);
......
...@@ -32,9 +32,8 @@ extern "C" { ...@@ -32,9 +32,8 @@ extern "C" {
#include "tutil.h" #include "tutil.h"
#include "trpc.h" #include "trpc.h"
#define TSC_GET_RESPTR_BASE(res, _queryinfo, col, ord) \ #define TSC_GET_RESPTR_BASE(res, _queryinfo, col) (res->data + ((_queryinfo)->fieldsInfo.pSqlExpr[col]->offset) * res->numOfRows)
(res->data + tscFieldInfoGetOffset(_queryinfo, col) * res->numOfRows)
// forward declaration // forward declaration
struct SSqlInfo; struct SSqlInfo;
...@@ -47,8 +46,8 @@ typedef struct SSqlGroupbyExpr { ...@@ -47,8 +46,8 @@ typedef struct SSqlGroupbyExpr {
} SSqlGroupbyExpr; } SSqlGroupbyExpr;
typedef struct SMeterMetaInfo { typedef struct SMeterMetaInfo {
SMeterMeta * pMeterMeta; // metermeta STableMeta * pMeterMeta; // metermeta
SMetricMeta *pMetricMeta; // metricmeta SSuperTableMeta *pMetricMeta; // metricmeta
/* /*
* 1. keep the vnode index during the multi-vnode super table projection query * 1. keep the vnode index during the multi-vnode super table projection query
...@@ -71,13 +70,19 @@ typedef struct SSqlExpr { ...@@ -71,13 +70,19 @@ typedef struct SSqlExpr {
int16_t interResBytes; // inter result buffer size int16_t interResBytes; // inter result buffer size
int16_t numOfParams; // argument value of each function int16_t numOfParams; // argument value of each function
tVariant param[3]; // parameters are not more than 3 tVariant param[3]; // parameters are not more than 3
int32_t offset; // sub result column value of arithmetic expression.
} SSqlExpr; } SSqlExpr;
typedef struct SColumnIndex {
int16_t tableIndex;
int16_t columnIndex;
} SColumnIndex;
typedef struct SFieldInfo { typedef struct SFieldInfo {
int16_t numOfOutputCols; // number of column in result int16_t numOfOutputCols; // number of column in result
int16_t numOfAlloc; // allocated size int16_t numOfAlloc; // allocated size
TAOS_FIELD *pFields; TAOS_FIELD *pFields;
short * pOffset; // short * pOffset;
/* /*
* define if this column is belong to the queried result, it may be add by parser to faciliate * define if this column is belong to the queried result, it may be add by parser to faciliate
...@@ -86,20 +91,17 @@ typedef struct SFieldInfo { ...@@ -86,20 +91,17 @@ typedef struct SFieldInfo {
* NOTE: these hidden columns always locate at the end of the output columns * NOTE: these hidden columns always locate at the end of the output columns
*/ */
bool * pVisibleCols; bool * pVisibleCols;
int32_t numOfHiddenCols; // the number of column not belongs to the queried result columns int32_t numOfHiddenCols; // the number of column not belongs to the queried result columns
SSqlFunctionExpr** pExpr; // used for aggregation arithmetic express,such as count(*)+count(*)
SSqlExpr** pSqlExpr;
} SFieldInfo; } SFieldInfo;
typedef struct SSqlExprInfo { typedef struct SSqlExprInfo {
int16_t numOfAlloc; int16_t numOfAlloc;
int16_t numOfExprs; int16_t numOfExprs;
SSqlExpr *pExprs; SSqlExpr** pExprs;
} SSqlExprInfo; } SSqlExprInfo;
typedef struct SColumnIndex {
int16_t tableIndex;
int16_t columnIndex;
} SColumnIndex;
typedef struct SColumnBase { typedef struct SColumnBase {
SColumnIndex colIndex; SColumnIndex colIndex;
int32_t numOfFilters; int32_t numOfFilters;
...@@ -120,7 +122,7 @@ typedef struct SCond { ...@@ -120,7 +122,7 @@ typedef struct SCond {
} SCond; } SCond;
typedef struct SJoinNode { typedef struct SJoinNode {
char meterId[TSDB_TABLE_ID_LEN]; char tableId[TSDB_TABLE_ID_LEN];
uint64_t uid; uint64_t uid;
int16_t tagCol; int16_t tagCol;
} SJoinNode; } SJoinNode;
...@@ -155,23 +157,23 @@ typedef struct SParamInfo { ...@@ -155,23 +157,23 @@ typedef struct SParamInfo {
} SParamInfo; } SParamInfo;
typedef struct STableDataBlocks { typedef struct STableDataBlocks {
char meterId[TSDB_TABLE_ID_LEN]; char tableId[TSDB_TABLE_ID_LEN];
int8_t tsSource; // where does the UNIX timestamp come from, server or client int8_t tsSource; // where does the UNIX timestamp come from, server or client
bool ordered; // if current rows are ordered or not bool ordered; // if current rows are ordered or not
int64_t vgid; // virtual group id int64_t vgid; // virtual group id
int64_t prevTS; // previous timestamp, recorded to decide if the records array is ts ascending int64_t prevTS; // previous timestamp, recorded to decide if the records array is ts ascending
int32_t numOfMeters; // number of tables in current submit block int32_t numOfTables; // number of tables in current submit block
int32_t rowSize; // row size for current table int32_t rowSize; // row size for current table
uint32_t nAllocSize; uint32_t nAllocSize;
uint32_t headerSize; // header for metadata (submit metadata) uint32_t headerSize; // header for metadata (submit metadata)
uint32_t size; uint32_t size;
/* /*
* the metermeta for current table, the metermeta will be used during submit stage, keep a ref * the metermeta for current table, the metermeta will be used during submit stage, keep a ref
* to avoid it to be removed from cache * to avoid it to be removed from cache
*/ */
SMeterMeta *pMeterMeta; STableMeta *pMeterMeta;
union { union {
char *filename; char *filename;
...@@ -199,9 +201,9 @@ typedef struct SQueryInfo { ...@@ -199,9 +201,9 @@ typedef struct SQueryInfo {
char intervalTimeUnit; char intervalTimeUnit;
int64_t etime, stime; int64_t etime, stime;
int64_t nAggTimeInterval; // aggregation time interval int64_t intervalTime; // aggregation time interval
int64_t nSlidingTime; // sliding window in mseconds int64_t slidingTime; // sliding window in mseconds
SSqlGroupbyExpr groupbyExpr; // group by tags info SSqlGroupbyExpr groupbyExpr; // group by tags info
SColumnBaseInfo colList; SColumnBaseInfo colList;
SFieldInfo fieldsInfo; SFieldInfo fieldsInfo;
...@@ -217,9 +219,9 @@ typedef struct SQueryInfo { ...@@ -217,9 +219,9 @@ typedef struct SQueryInfo {
int64_t * defaultVal; // default value for interpolation int64_t * defaultVal; // default value for interpolation
char * msg; // pointer to the pCmd->payload to keep error message temporarily char * msg; // pointer to the pCmd->payload to keep error message temporarily
int64_t clauseLimit; // limit for current sub clause int64_t clauseLimit; // limit for current sub clause
// offset value in the original sql expression, NOT sent to virtual node, only applied at client side // offset value in the original sql expression, NOT sent to virtual node, only applied at client side
int64_t prjOffset; int64_t prjOffset;
} SQueryInfo; } SQueryInfo;
// data source from sql string or from file // data source from sql string or from file
...@@ -270,29 +272,27 @@ typedef struct SResRec { ...@@ -270,29 +272,27 @@ typedef struct SResRec {
struct STSBuf; struct STSBuf;
typedef struct { typedef struct {
uint8_t code; int32_t code;
int64_t numOfRows; // num of results in current retrieved int64_t numOfRows; // num of results in current retrieved
int64_t numOfTotal; // num of total results int64_t numOfTotal; // num of total results
int64_t numOfTotalInCurrentClause; // num of total result in current subclause int64_t numOfTotalInCurrentClause; // num of total result in current subclause
char * pRsp;
char * pRsp; int rspType;
int rspType; int rspLen;
int rspLen; uint64_t qhandle;
uint64_t qhandle; int64_t uid;
int64_t uid; int64_t useconds;
int64_t useconds; int64_t offset; // offset value from vnode during projection query of stable
int64_t offset; // offset value from vnode during projection query of stable int row;
int row; int16_t numOfCols;
int16_t numOfnchar; int16_t precision;
int16_t precision; int32_t numOfGroups;
int32_t numOfGroups; SResRec * pGroupRec;
SResRec * pGroupRec; char * data;
char * data; void ** tsrow;
short * bytes; char ** buffer; // Buffer used to put multibytes encoded using unicode (wchar_t)
void ** tsrow; SColumnIndex *pColumnIndex;
char ** buffer; // Buffer used to put multibytes encoded using unicode (wchar_t)
struct SLocalReducer *pLocalReducer; struct SLocalReducer *pLocalReducer;
SColumnIndex * pColumnIndex;
} SSqlRes; } SSqlRes;
typedef struct _tsc_obj { typedef struct _tsc_obj {
...@@ -324,14 +324,12 @@ typedef struct _sql_obj { ...@@ -324,14 +324,12 @@ typedef struct _sql_obj {
short vnode; short vnode;
int64_t stime; int64_t stime;
uint32_t queryId; uint32_t queryId;
void * thandle;
SRpcIpSet ipSet;
void * pStream; void * pStream;
void * pSubscription; void * pSubscription;
char * sqlstr; char * sqlstr;
char retry; char retry;
char maxRetry; char maxRetry;
uint8_t index; SRpcIpSet *ipList;
char freed : 4; char freed : 4;
char listed : 4; char listed : 4;
tsem_t rspSem; tsem_t rspSem;
...@@ -373,18 +371,20 @@ typedef struct _sstream { ...@@ -373,18 +371,20 @@ typedef struct _sstream {
struct _sstream *prev, *next; struct _sstream *prev, *next;
} SSqlStream; } SSqlStream;
int32_t tscInitRpc(const char *user, const char *secret);
// tscSql API // tscSql API
int tsParseSql(SSqlObj *pSql, bool multiVnodeInsertion); int tsParseSql(SSqlObj *pSql, bool multiVnodeInsertion);
void tscInitMsgs(); void tscInitMsgs();
extern int (*tscBuildMsg[TSDB_SQL_MAX])(SSqlObj *pSql, SSqlInfo *pInfo); extern int (*tscBuildMsg[TSDB_SQL_MAX])(SSqlObj *pSql, SSqlInfo *pInfo);
void *tscProcessMsgFromServer(char *msg, void *ahandle, void *thandle); void tscProcessMsgFromServer(char type, void *pCont, int contLen, void *ahandle, int32_t code);
int tscProcessSql(SSqlObj *pSql); int tscProcessSql(SSqlObj *pSql);
void tscAsyncInsertMultiVnodesProxy(void *param, TAOS_RES *tres, int numOfRows); void tscAsyncInsertMultiVnodesProxy(void *param, TAOS_RES *tres, int numOfRows);
int tscRenewMeterMeta(SSqlObj *pSql, char *meterId); int tscRenewMeterMeta(SSqlObj *pSql, char *tableId);
void tscQueueAsyncRes(SSqlObj *pSql); void tscQueueAsyncRes(SSqlObj *pSql);
void tscQueueAsyncError(void(*fp), void *param); void tscQueueAsyncError(void(*fp), void *param);
...@@ -400,19 +400,17 @@ int taos_retrieve(TAOS_RES *res); ...@@ -400,19 +400,17 @@ int taos_retrieve(TAOS_RES *res);
int32_t tscTansformSQLFunctionForSTableQuery(SQueryInfo *pQueryInfo); int32_t tscTansformSQLFunctionForSTableQuery(SQueryInfo *pQueryInfo);
void tscRestoreSQLFunctionForMetricQuery(SQueryInfo *pQueryInfo); void tscRestoreSQLFunctionForMetricQuery(SQueryInfo *pQueryInfo);
void tscClearSqlMetaInfoForce(SSqlCmd *pCmd);
int32_t tscCreateResPointerInfo(SSqlRes *pRes, SQueryInfo *pQueryInfo); int32_t tscCreateResPointerInfo(SSqlRes *pRes, SQueryInfo *pQueryInfo);
void tscDestroyResPointerInfo(SSqlRes *pRes); void tscDestroyResPointerInfo(SSqlRes *pRes);
void tscFreeSqlCmdData(SSqlCmd *pCmd); void tscFreeSqlCmdData(SSqlCmd *pCmd);
void tscFreeResData(SSqlObj* pSql); void tscFreeResData(SSqlObj *pSql);
/** /**
* free query result of the sql object * free query result of the sql object
* @param pObj * @param pObj
*/ */
void tscFreeSqlResult(SSqlObj* pSql); void tscFreeSqlResult(SSqlObj *pSql);
/** /**
* only free part of resources allocated during query. * only free part of resources allocated during query.
......
...@@ -139,7 +139,7 @@ static tSQLSyntaxNode *tSQLSyntaxNodeCreate(SSchema *pSchema, int32_t numOfCols, ...@@ -139,7 +139,7 @@ static tSQLSyntaxNode *tSQLSyntaxNodeCreate(SSchema *pSchema, int32_t numOfCols,
} else { } else {
pNode->colId = -1; pNode->colId = -1;
pNode->pSchema->type = TSDB_DATA_TYPE_BINARY; pNode->pSchema->type = TSDB_DATA_TYPE_BINARY;
pNode->pSchema->bytes = TSDB_METER_NAME_LEN; pNode->pSchema->bytes = TSDB_TABLE_NAME_LEN;
strcpy(pNode->pSchema->name, TSQL_TBNAME_L); strcpy(pNode->pSchema->name, TSQL_TBNAME_L);
pNode->pSchema->colId = -1; pNode->pSchema->colId = -1;
} }
...@@ -158,7 +158,7 @@ static tSQLSyntaxNode *tSQLSyntaxNodeCreate(SSchema *pSchema, int32_t numOfCols, ...@@ -158,7 +158,7 @@ static tSQLSyntaxNode *tSQLSyntaxNodeCreate(SSchema *pSchema, int32_t numOfCols,
return pNode; return pNode;
} }
static uint8_t getBinaryExprOptr(SSQLToken *pToken) { uint8_t getBinaryExprOptr(SSQLToken *pToken) {
switch (pToken->type) { switch (pToken->type) {
case TK_LT: case TK_LT:
return TSDB_RELATION_LESS; return TSDB_RELATION_LESS;
...@@ -183,6 +183,7 @@ static uint8_t getBinaryExprOptr(SSQLToken *pToken) { ...@@ -183,6 +183,7 @@ static uint8_t getBinaryExprOptr(SSQLToken *pToken) {
case TK_STAR: case TK_STAR:
return TSDB_BINARY_OP_MULTIPLY; return TSDB_BINARY_OP_MULTIPLY;
case TK_SLASH: case TK_SLASH:
case TK_DIVIDE:
return TSDB_BINARY_OP_DIVIDE; return TSDB_BINARY_OP_DIVIDE;
case TK_REM: case TK_REM:
return TSDB_BINARY_OP_REMAINDER; return TSDB_BINARY_OP_REMAINDER;
......
...@@ -284,8 +284,15 @@ void tscAsyncFetchSingleRowProxy(void *param, TAOS_RES *tres, int numOfRows) { ...@@ -284,8 +284,15 @@ void tscAsyncFetchSingleRowProxy(void *param, TAOS_RES *tres, int numOfRows) {
return; return;
} }
for (int i = 0; i < pCmd->numOfCols; ++i) for (int i = 0; i < pCmd->numOfCols; ++i){
pRes->tsrow[i] = TSC_GET_RESPTR_BASE(pRes, pQueryInfo, i, pQueryInfo->order) + pRes->bytes[i] * pRes->row; SSqlExpr* pExpr = pQueryInfo->fieldsInfo.pSqlExpr[i];
if (pExpr != NULL) {
pRes->tsrow[i] = TSC_GET_RESPTR_BASE(pRes, pQueryInfo, i) + pExpr->resBytes * pRes->row;
} else {
//todo add
}
}
pRes->row++; pRes->row++;
(*pSql->fetchFp)(pSql->param, pSql, pSql->res.tsrow); (*pSql->fetchFp)(pSql->param, pSql, pSql->res.tsrow);
...@@ -299,7 +306,12 @@ void tscProcessFetchRow(SSchedMsg *pMsg) { ...@@ -299,7 +306,12 @@ void tscProcessFetchRow(SSchedMsg *pMsg) {
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
for (int i = 0; i < pCmd->numOfCols; ++i) { for (int i = 0; i < pCmd->numOfCols; ++i) {
pRes->tsrow[i] = TSC_GET_RESPTR_BASE(pRes, pQueryInfo, i, pQueryInfo->order) + pRes->bytes[i] * pRes->row; SSqlExpr* pExpr = pQueryInfo->fieldsInfo.pSqlExpr[i];
if (pExpr != NULL) {
pRes->tsrow[i] = TSC_GET_RESPTR_BASE(pRes, pQueryInfo, i) + pExpr->resBytes * pRes->row;
} else {
//todo add
}
} }
pRes->row++; pRes->row++;
...@@ -318,13 +330,6 @@ void tscProcessAsyncRes(SSchedMsg *pMsg) { ...@@ -318,13 +330,6 @@ void tscProcessAsyncRes(SSchedMsg *pMsg) {
int cmd = pCmd->command; int cmd = pCmd->command;
int code = pRes->code ? -pRes->code : pRes->numOfRows; int code = pRes->code ? -pRes->code : pRes->numOfRows;
if ((tscKeepConn[cmd] == 0 || (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS)) &&
pSql->pStream == NULL) {
if (pSql->thandle) taosAddConnIntoCache(tscConnCache, pSql->thandle, pSql->ip, pSql->vnode, pTscObj->user);
pSql->thandle = NULL;
}
// in case of async insert, restore the user specified callback function // in case of async insert, restore the user specified callback function
bool shouldFree = tscShouldFreeAsyncSqlObj(pSql); bool shouldFree = tscShouldFreeAsyncSqlObj(pSql);
...@@ -451,11 +456,11 @@ void tscMeterMetaCallBack(void *param, TAOS_RES *res, int code) { ...@@ -451,11 +456,11 @@ void tscMeterMetaCallBack(void *param, TAOS_RES *res, int code) {
if (code != 0) { if (code != 0) {
code = abs(code); code = abs(code);
pRes->code = code; pRes->code = code;
tscTrace("%p failed to renew meterMeta", pSql); tscTrace("%p failed to renew tableMeta", pSql);
tsem_post(&pSql->rspSem); tsem_post(&pSql->rspSem);
} else { } else {
tscTrace("%p renew meterMeta successfully, command:%d, code:%d, thandle:%p, retry:%d", tscTrace("%p renew tableMeta successfully, command:%d, code:%d, retry:%d",
pSql, pSql->cmd.command, pSql->res.code, pSql->thandle, pSql->retry); pSql, pSql->cmd.command, pSql->res.code, pSql->retry);
SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfo(&pSql->cmd, 0, 0); SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfo(&pSql->cmd, 0, 0);
assert(pMeterMetaInfo->pMeterMeta == NULL); assert(pMeterMetaInfo->pMeterMeta == NULL);
...@@ -553,7 +558,7 @@ void tscMeterMetaCallBack(void *param, TAOS_RES *res, int code) { ...@@ -553,7 +558,7 @@ void tscMeterMetaCallBack(void *param, TAOS_RES *res, int code) {
tscTansformSQLFunctionForSTableQuery(pQueryInfo); tscTansformSQLFunctionForSTableQuery(pQueryInfo);
tscIncStreamExecutionCount(pSql->pStream); tscIncStreamExecutionCount(pSql->pStream);
} else { } else {
tscTrace("%p get meterMeta/metricMeta successfully", pSql); tscTrace("%p get tableMeta/metricMeta successfully", pSql);
} }
tscDoQuery(pSql); tscDoQuery(pSql);
......
...@@ -322,6 +322,10 @@ int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionI ...@@ -322,6 +322,10 @@ int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionI
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
bool stableQueryFunctChanged(int32_t funcId) {
return (aAggs[funcId].stableFuncId != funcId);
}
/** /**
* the numOfRes should be kept, since it may be used later * the numOfRes should be kept, since it may be used later
* and allow the ResultInfo to be re initialized * and allow the ResultInfo to be re initialized
...@@ -719,12 +723,15 @@ static int32_t first_dist_data_req_info(SQLFunctionCtx *pCtx, TSKEY start, TSKEY ...@@ -719,12 +723,15 @@ static int32_t first_dist_data_req_info(SQLFunctionCtx *pCtx, TSKEY start, TSKEY
return BLK_DATA_NO_NEEDED; return BLK_DATA_NO_NEEDED;
} }
SFirstLastInfo *pInfo = (SFirstLastInfo*) (pCtx->aOutputBuf + pCtx->inputBytes); // result buffer has not been set yet.
if (pInfo->hasResult != DATA_SET_FLAG) { return BLK_DATA_ALL_NEEDED;
return BLK_DATA_ALL_NEEDED; //todo optimize the filter info
} else { // data in current block is not earlier than current result // SFirstLastInfo *pInfo = (SFirstLastInfo*) (pCtx->aOutputBuf + pCtx->inputBytes);
return (pInfo->ts <= start) ? BLK_DATA_NO_NEEDED : BLK_DATA_ALL_NEEDED; // if (pInfo->hasResult != DATA_SET_FLAG) {
} // return BLK_DATA_ALL_NEEDED;
// } else { // data in current block is not earlier than current result
// return (pInfo->ts <= start) ? BLK_DATA_NO_NEEDED : BLK_DATA_ALL_NEEDED;
// }
} }
static int32_t last_dist_data_req_info(SQLFunctionCtx *pCtx, TSKEY start, TSKEY end, int32_t colId, static int32_t last_dist_data_req_info(SQLFunctionCtx *pCtx, TSKEY start, TSKEY end, int32_t colId,
...@@ -733,12 +740,13 @@ static int32_t last_dist_data_req_info(SQLFunctionCtx *pCtx, TSKEY start, TSKEY ...@@ -733,12 +740,13 @@ static int32_t last_dist_data_req_info(SQLFunctionCtx *pCtx, TSKEY start, TSKEY
return BLK_DATA_NO_NEEDED; return BLK_DATA_NO_NEEDED;
} }
SFirstLastInfo *pInfo = (SFirstLastInfo*) (pCtx->aOutputBuf + pCtx->inputBytes); return BLK_DATA_ALL_NEEDED;
if (pInfo->hasResult != DATA_SET_FLAG) { // SFirstLastInfo *pInfo = (SFirstLastInfo*) (pCtx->aOutputBuf + pCtx->inputBytes);
return BLK_DATA_ALL_NEEDED; // if (pInfo->hasResult != DATA_SET_FLAG) {
} else { // return BLK_DATA_ALL_NEEDED;
return (pInfo->ts > end) ? BLK_DATA_NO_NEEDED : BLK_DATA_ALL_NEEDED; // } else {
} // return (pInfo->ts > end) ? BLK_DATA_NO_NEEDED : BLK_DATA_ALL_NEEDED;
// }
} }
////////////////////////////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////////////////////////////
...@@ -1437,7 +1445,9 @@ static void stddev_next_step(SQLFunctionCtx *pCtx) { ...@@ -1437,7 +1445,9 @@ static void stddev_next_step(SQLFunctionCtx *pCtx) {
*/ */
pStd->stage++; pStd->stage++;
avg_finalizer(pCtx); avg_finalizer(pCtx);
pResInfo->initialized = true; // set it initialized to avoid re-initialization
// save average value into tmpBuf, for second stage scan // save average value into tmpBuf, for second stage scan
SAvgInfo *pAvg = pResInfo->interResultBuf; SAvgInfo *pAvg = pResInfo->interResultBuf;
...@@ -2184,7 +2194,7 @@ static STopBotInfo *getTopBotOutputInfo(SQLFunctionCtx *pCtx) { ...@@ -2184,7 +2194,7 @@ static STopBotInfo *getTopBotOutputInfo(SQLFunctionCtx *pCtx) {
// only the first_stage_merge is directly written data into final output buffer // only the first_stage_merge is directly written data into final output buffer
if (pResInfo->superTableQ && pCtx->currentStage != SECONDARY_STAGE_MERGE) { if (pResInfo->superTableQ && pCtx->currentStage != SECONDARY_STAGE_MERGE) {
return (STopBotInfo*) pCtx->aOutputBuf; return (STopBotInfo*) pCtx->aOutputBuf;
} else { // for normal table query and super table at the secondary_stage, result is written to intermediate buffer } else { // during normal table query and super table at the secondary_stage, result is written to intermediate buffer
return pResInfo->interResultBuf; return pResInfo->interResultBuf;
} }
} }
...@@ -3312,7 +3322,7 @@ static void arithmetic_function(SQLFunctionCtx *pCtx) { ...@@ -3312,7 +3322,7 @@ static void arithmetic_function(SQLFunctionCtx *pCtx) {
tSQLBinaryExprCalcTraverse(sas->pExpr->pBinExprInfo.pBinExpr, pCtx->size, pCtx->aOutputBuf, sas, pCtx->order, tSQLBinaryExprCalcTraverse(sas->pExpr->pBinExprInfo.pBinExpr, pCtx->size, pCtx->aOutputBuf, sas, pCtx->order,
arithmetic_callback_function); arithmetic_callback_function);
pCtx->aOutputBuf += pCtx->outputBytes * pCtx->size/* * GET_FORWARD_DIRECTION_FACTOR(pCtx->order)*/; pCtx->aOutputBuf += pCtx->outputBytes * pCtx->size;
pCtx->param[1].pz = NULL; pCtx->param[1].pz = NULL;
} }
...@@ -3573,6 +3583,7 @@ void spread_function_finalizer(SQLFunctionCtx *pCtx) { ...@@ -3573,6 +3583,7 @@ void spread_function_finalizer(SQLFunctionCtx *pCtx) {
} }
GET_RES_INFO(pCtx)->numOfRes = 1; // todo add test case GET_RES_INFO(pCtx)->numOfRes = 1; // todo add test case
doFinalizer(pCtx);
} }
/* /*
......
...@@ -100,7 +100,7 @@ static int64_t doTSBlockIntersect(SSqlObj* pSql, SJoinSubquerySupporter* pSuppor ...@@ -100,7 +100,7 @@ static int64_t doTSBlockIntersect(SSqlObj* pSql, SJoinSubquerySupporter* pSuppor
* in case of stable query, limit/offset is not applied here. the limit/offset is applied to the * in case of stable query, limit/offset is not applied here. the limit/offset is applied to the
* final results which is acquired after the secondry merge of in the client. * final results which is acquired after the secondry merge of in the client.
*/ */
if (pLimit->offset == 0 || pQueryInfo->nAggTimeInterval > 0 || QUERY_IS_STABLE_QUERY(pQueryInfo->type)) { if (pLimit->offset == 0 || pQueryInfo->intervalTime > 0 || QUERY_IS_STABLE_QUERY(pQueryInfo->type)) {
if (*st > elem1.ts) { if (*st > elem1.ts) {
*st = elem1.ts; *st = elem1.ts;
} }
...@@ -165,7 +165,7 @@ SJoinSubquerySupporter* tscCreateJoinSupporter(SSqlObj* pSql, SSubqueryState* pS ...@@ -165,7 +165,7 @@ SJoinSubquerySupporter* tscCreateJoinSupporter(SSqlObj* pSql, SSubqueryState* pS
pSupporter->subqueryIndex = index; pSupporter->subqueryIndex = index;
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, pSql->cmd.clauseIndex); SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, pSql->cmd.clauseIndex);
pSupporter->interval = pQueryInfo->nAggTimeInterval; pSupporter->interval = pQueryInfo->intervalTime;
pSupporter->limit = pQueryInfo->limit; pSupporter->limit = pQueryInfo->limit;
SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfo(&pSql->cmd, pSql->cmd.clauseIndex, index); SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfo(&pSql->cmd, pSql->cmd.clauseIndex, index);
...@@ -275,6 +275,7 @@ int32_t tscLaunchSecondPhaseSubqueries(SSqlObj* pSql) { ...@@ -275,6 +275,7 @@ int32_t tscLaunchSecondPhaseSubqueries(SSqlObj* pSql) {
pSubQueryInfo->tsBuf = NULL; pSubQueryInfo->tsBuf = NULL;
// free result for async object will also free sqlObj // free result for async object will also free sqlObj
assert(pSubQueryInfo->exprsInfo.numOfExprs == 1); // ts_comp query only requires one resutl columns
taos_free_result(pPrevSub); taos_free_result(pPrevSub);
SSqlObj *pNew = createSubqueryObj(pSql, (int16_t) i, tscJoinQueryCallback, pSupporter, NULL); SSqlObj *pNew = createSubqueryObj(pSql, (int16_t) i, tscJoinQueryCallback, pSupporter, NULL);
...@@ -293,24 +294,26 @@ int32_t tscLaunchSecondPhaseSubqueries(SSqlObj* pSql) { ...@@ -293,24 +294,26 @@ int32_t tscLaunchSecondPhaseSubqueries(SSqlObj* pSql) {
// set the second stage sub query for join process // set the second stage sub query for join process
pQueryInfo->type |= TSDB_QUERY_TYPE_JOIN_SEC_STAGE; pQueryInfo->type |= TSDB_QUERY_TYPE_JOIN_SEC_STAGE;
pQueryInfo->nAggTimeInterval = pSupporter->interval; pQueryInfo->intervalTime = pSupporter->interval;
pQueryInfo->groupbyExpr = pSupporter->groupbyExpr; pQueryInfo->groupbyExpr = pSupporter->groupbyExpr;
tscColumnBaseInfoCopy(&pQueryInfo->colList, &pSupporter->colList, 0); tscColumnBaseInfoCopy(&pQueryInfo->colList, &pSupporter->colList, 0);
tscTagCondCopy(&pQueryInfo->tagCond, &pSupporter->tagCond); tscTagCondCopy(&pQueryInfo->tagCond, &pSupporter->tagCond);
tscSqlExprCopy(&pQueryInfo->exprsInfo, &pSupporter->exprsInfo, pSupporter->uid); tscSqlExprCopy(&pQueryInfo->exprsInfo, &pSupporter->exprsInfo, pSupporter->uid, false);
tscFieldInfoCopyAll(&pQueryInfo->fieldsInfo, &pSupporter->fieldsInfo); tscFieldInfoCopyAll(&pQueryInfo->fieldsInfo, &pSupporter->fieldsInfo);
pSupporter->exprsInfo.numOfExprs = 0;
pSupporter->fieldsInfo.numOfOutputCols = 0;
/* /*
* if the first column of the secondary query is not ts function, add this function. * if the first column of the secondary query is not ts function, add this function.
* Because this column is required to filter with timestamp after intersecting. * Because this column is required to filter with timestamp after intersecting.
*/ */
if (pSupporter->exprsInfo.pExprs[0].functionId != TSDB_FUNC_TS) { if (pSupporter->exprsInfo.pExprs[0]->functionId != TSDB_FUNC_TS) {
tscAddTimestampColumn(pQueryInfo, TSDB_FUNC_TS, 0); tscAddTimestampColumn(pQueryInfo, TSDB_FUNC_TS, 0);
} }
// todo refactor function name
SQueryInfo *pNewQueryInfo = tscGetQueryInfoDetail(&pNew->cmd, 0); SQueryInfo *pNewQueryInfo = tscGetQueryInfoDetail(&pNew->cmd, 0);
assert(pNew->numOfSubs == 0 && pNew->cmd.numOfClause == 1 && pNewQueryInfo->numOfTables == 1); assert(pNew->numOfSubs == 0 && pNew->cmd.numOfClause == 1 && pNewQueryInfo->numOfTables == 1);
......
...@@ -77,7 +77,7 @@ static int32_t getToStringLength(const char *pData, int32_t length, int32_t type ...@@ -77,7 +77,7 @@ static int32_t getToStringLength(const char *pData, int32_t length, int32_t type
* length((uint64_t) 123456789011) > 12, greater than sizsof(uint64_t) * length((uint64_t) 123456789011) > 12, greater than sizsof(uint64_t)
*/ */
static int32_t tscMaxLengthOfTagsFields(SSqlObj *pSql) { static int32_t tscMaxLengthOfTagsFields(SSqlObj *pSql) {
SMeterMeta *pMeta = tscGetMeterMetaInfo(&pSql->cmd, 0, 0)->pMeterMeta; STableMeta *pMeta = tscGetMeterMetaInfo(&pSql->cmd, 0, 0)->pMeterMeta;
if (pMeta->tableType == TSDB_TABLE_TYPE_SUPER_TABLE || pMeta->tableType == TSDB_TABLE_TYPE_NORMAL_TABLE || if (pMeta->tableType == TSDB_TABLE_TYPE_SUPER_TABLE || pMeta->tableType == TSDB_TABLE_TYPE_NORMAL_TABLE ||
pMeta->tableType == TSDB_TABLE_TYPE_STREAM_TABLE) { pMeta->tableType == TSDB_TABLE_TYPE_STREAM_TABLE) {
...@@ -109,7 +109,7 @@ static int32_t tscSetValueToResObj(SSqlObj *pSql, int32_t rowLen) { ...@@ -109,7 +109,7 @@ static int32_t tscSetValueToResObj(SSqlObj *pSql, int32_t rowLen) {
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0); SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0); SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0);
SMeterMeta * pMeta = pMeterMetaInfo->pMeterMeta; STableMeta * pMeta = pMeterMetaInfo->pMeterMeta;
/* /*
* tagValueCnt is to denote the number of tags columns for meter, not metric. and is to show the column data. * tagValueCnt is to denote the number of tags columns for meter, not metric. and is to show the column data.
...@@ -251,6 +251,13 @@ static int32_t tscBuildMeterSchemaResultFields(SSqlObj *pSql, int32_t numOfCols, ...@@ -251,6 +251,13 @@ static int32_t tscBuildMeterSchemaResultFields(SSqlObj *pSql, int32_t numOfCols,
tscFieldInfoSetValue(&pQueryInfo->fieldsInfo, 3, TSDB_DATA_TYPE_BINARY, "Note", noteColLength); tscFieldInfoSetValue(&pQueryInfo->fieldsInfo, 3, TSDB_DATA_TYPE_BINARY, "Note", noteColLength);
rowLen += noteColLength; rowLen += noteColLength;
//set the sqlexpr part
SColumnIndex index = {0};
pQueryInfo->fieldsInfo.pSqlExpr[0] = tscSqlExprInsert(pQueryInfo, 0, TSDB_FUNC_TS_DUMMY, &index, TSDB_DATA_TYPE_BINARY, TSDB_COL_NAME_LEN, TSDB_COL_NAME_LEN);
pQueryInfo->fieldsInfo.pSqlExpr[1] = tscSqlExprInsert(pQueryInfo, 1, TSDB_FUNC_TS_DUMMY, &index, TSDB_DATA_TYPE_BINARY, typeColLength, typeColLength);
pQueryInfo->fieldsInfo.pSqlExpr[2] = tscSqlExprInsert(pQueryInfo, 2, TSDB_FUNC_TS_DUMMY, &index, TSDB_DATA_TYPE_INT, sizeof(int32_t), sizeof(int32_t));
pQueryInfo->fieldsInfo.pSqlExpr[3] = tscSqlExprInsert(pQueryInfo, 3, TSDB_FUNC_TS_DUMMY, &index, TSDB_DATA_TYPE_BINARY, noteColLength, noteColLength);
return rowLen; return rowLen;
} }
...@@ -285,7 +292,7 @@ static int tscBuildMetricTagProjectionResult(SSqlObj *pSql) { ...@@ -285,7 +292,7 @@ static int tscBuildMetricTagProjectionResult(SSqlObj *pSql) {
SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0); SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0);
SMetricMeta *pMetricMeta = pMeterMetaInfo->pMetricMeta; SSuperTableMeta *pMetricMeta = pMeterMetaInfo->pMetricMeta;
SSchema * pSchema = tsGetTagSchema(pMeterMetaInfo->pMeterMeta); SSchema * pSchema = tsGetTagSchema(pMeterMetaInfo->pMeterMeta);
int32_t vOffset[TSDB_MAX_COLUMNS] = {0}; int32_t vOffset[TSDB_MAX_COLUMNS] = {0};
...@@ -293,13 +300,13 @@ static int tscBuildMetricTagProjectionResult(SSqlObj *pSql) { ...@@ -293,13 +300,13 @@ static int tscBuildMetricTagProjectionResult(SSqlObj *pSql) {
for (int32_t f = 1; f < pMeterMetaInfo->numOfTags; ++f) { for (int32_t f = 1; f < pMeterMetaInfo->numOfTags; ++f) {
int16_t tagColumnIndex = pMeterMetaInfo->tagColumnIndex[f - 1]; int16_t tagColumnIndex = pMeterMetaInfo->tagColumnIndex[f - 1];
if (tagColumnIndex == -1) { if (tagColumnIndex == -1) {
vOffset[f] = vOffset[f - 1] + TSDB_METER_NAME_LEN; vOffset[f] = vOffset[f - 1] + TSDB_TABLE_NAME_LEN;
} else { } else {
vOffset[f] = vOffset[f - 1] + pSchema[tagColumnIndex].bytes; vOffset[f] = vOffset[f - 1] + pSchema[tagColumnIndex].bytes;
} }
} }
int32_t totalNumOfResults = pMetricMeta->numOfMeters; int32_t totalNumOfResults = pMetricMeta->numOfTables;
int32_t rowLen = tscGetResRowLength(pQueryInfo); int32_t rowLen = tscGetResRowLength(pQueryInfo);
tscInitResObjForLocalQuery(pSql, totalNumOfResults, rowLen); tscInitResObjForLocalQuery(pSql, totalNumOfResults, rowLen);
...@@ -309,7 +316,7 @@ static int tscBuildMetricTagProjectionResult(SSqlObj *pSql) { ...@@ -309,7 +316,7 @@ static int tscBuildMetricTagProjectionResult(SSqlObj *pSql) {
SVnodeSidList *pSidList = (SVnodeSidList *)((char *)pMetricMeta + pMetricMeta->list[i]); SVnodeSidList *pSidList = (SVnodeSidList *)((char *)pMetricMeta + pMetricMeta->list[i]);
for (int32_t j = 0; j < pSidList->numOfSids; ++j) { for (int32_t j = 0; j < pSidList->numOfSids; ++j) {
SMeterSidExtInfo *pSidExt = tscGetMeterSidInfo(pSidList, j); STableSidExtInfo *pSidExt = tscGetMeterSidInfo(pSidList, j);
for (int32_t k = 0; k < pQueryInfo->fieldsInfo.numOfOutputCols; ++k) { for (int32_t k = 0; k < pQueryInfo->fieldsInfo.numOfOutputCols; ++k) {
SColIndexEx *pColIndex = &tscSqlExprGet(pQueryInfo, k)->colInfo; SColIndexEx *pColIndex = &tscSqlExprGet(pQueryInfo, k)->colInfo;
...@@ -336,7 +343,7 @@ static int tscBuildMetricTagSqlFunctionResult(SSqlObj *pSql) { ...@@ -336,7 +343,7 @@ static int tscBuildMetricTagSqlFunctionResult(SSqlObj *pSql) {
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0); SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
SMetricMeta *pMetricMeta = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0)->pMetricMeta; SSuperTableMeta *pMetricMeta = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0)->pMetricMeta;
int32_t totalNumOfResults = 1; // count function only produce one result int32_t totalNumOfResults = 1; // count function only produce one result
int32_t rowLen = tscGetResRowLength(pQueryInfo); int32_t rowLen = tscGetResRowLength(pQueryInfo);
...@@ -351,7 +358,7 @@ static int tscBuildMetricTagSqlFunctionResult(SSqlObj *pSql) { ...@@ -351,7 +358,7 @@ static int tscBuildMetricTagSqlFunctionResult(SSqlObj *pSql) {
TAOS_FIELD *pField = tscFieldInfoGetField(pQueryInfo, k); TAOS_FIELD *pField = tscFieldInfoGetField(pQueryInfo, k);
memcpy(pRes->data + tscFieldInfoGetOffset(pQueryInfo, i) * totalNumOfResults + pField->bytes * rowIdx, memcpy(pRes->data + tscFieldInfoGetOffset(pQueryInfo, i) * totalNumOfResults + pField->bytes * rowIdx,
&pMetricMeta->numOfMeters, sizeof(pMetricMeta->numOfMeters)); &pMetricMeta->numOfTables, sizeof(pMetricMeta->numOfTables));
} else { } else {
tscError("not support operations"); tscError("not support operations");
continue; continue;
...@@ -368,7 +375,7 @@ static int tscProcessQueryTags(SSqlObj *pSql) { ...@@ -368,7 +375,7 @@ static int tscProcessQueryTags(SSqlObj *pSql) {
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0); SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
SMeterMeta *pMeterMeta = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0)->pMeterMeta; STableMeta *pMeterMeta = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0)->pMeterMeta;
if (pMeterMeta == NULL || pMeterMeta->numOfTags == 0 || pMeterMeta->numOfColumns == 0) { if (pMeterMeta == NULL || pMeterMeta->numOfTags == 0 || pMeterMeta->numOfColumns == 0) {
strcpy(pCmd->payload, "invalid table"); strcpy(pCmd->payload, "invalid table");
pSql->res.code = TSDB_CODE_INVALID_TABLE; pSql->res.code = TSDB_CODE_INVALID_TABLE;
...@@ -455,6 +462,8 @@ void tscSetLocalQueryResult(SSqlObj *pSql, const char *val, const char *columnNa ...@@ -455,6 +462,8 @@ void tscSetLocalQueryResult(SSqlObj *pSql, const char *val, const char *columnNa
tscInitResObjForLocalQuery(pSql, 1, valueLength); tscInitResObjForLocalQuery(pSql, 1, valueLength);
TAOS_FIELD *pField = tscFieldInfoGetField(pQueryInfo, 0); TAOS_FIELD *pField = tscFieldInfoGetField(pQueryInfo, 0);
pQueryInfo->fieldsInfo.pSqlExpr[0] = pQueryInfo->exprsInfo.pExprs[0];
strncpy(pRes->data, val, pField->bytes); strncpy(pRes->data, val, pField->bytes);
} }
......
...@@ -496,7 +496,7 @@ static int32_t rowDataCompar(const void *lhs, const void *rhs) { ...@@ -496,7 +496,7 @@ static int32_t rowDataCompar(const void *lhs, const void *rhs) {
} }
} }
int tsParseValues(char **str, STableDataBlocks *pDataBlock, SMeterMeta *pMeterMeta, int maxRows, int tsParseValues(char **str, STableDataBlocks *pDataBlock, STableMeta *pMeterMeta, int maxRows,
SParsedDataColInfo *spd, char *error, int32_t *code, char *tmpTokenBuf) { SParsedDataColInfo *spd, char *error, int32_t *code, char *tmpTokenBuf) {
int32_t index = 0; int32_t index = 0;
SSQLToken sToken; SSQLToken sToken;
...@@ -601,7 +601,7 @@ int32_t tscAllocateMemIfNeed(STableDataBlocks *pDataBlock, int32_t rowSize, int3 ...@@ -601,7 +601,7 @@ int32_t tscAllocateMemIfNeed(STableDataBlocks *pDataBlock, int32_t rowSize, int3
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static void tsSetBlockInfo(SShellSubmitBlock *pBlocks, const SMeterMeta *pMeterMeta, int32_t numOfRows) { static void tsSetBlockInfo(SShellSubmitBlock *pBlocks, const STableMeta *pMeterMeta, int32_t numOfRows) {
pBlocks->sid = pMeterMeta->sid; pBlocks->sid = pMeterMeta->sid;
pBlocks->uid = pMeterMeta->uid; pBlocks->uid = pMeterMeta->uid;
pBlocks->sversion = pMeterMeta->sversion; pBlocks->sversion = pMeterMeta->sversion;
...@@ -655,7 +655,7 @@ static int32_t doParseInsertStatement(SSqlObj *pSql, void *pTableHashList, char ...@@ -655,7 +655,7 @@ static int32_t doParseInsertStatement(SSqlObj *pSql, void *pTableHashList, char
int32_t *totalNum) { int32_t *totalNum) {
SSqlCmd * pCmd = &pSql->cmd; SSqlCmd * pCmd = &pSql->cmd;
SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, pCmd->clauseIndex, 0); SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, pCmd->clauseIndex, 0);
SMeterMeta * pMeterMeta = pMeterMetaInfo->pMeterMeta; STableMeta * pMeterMeta = pMeterMetaInfo->pMeterMeta;
STableDataBlocks *dataBuf = NULL; STableDataBlocks *dataBuf = NULL;
int32_t ret = tscGetDataBlockFromList(pTableHashList, pCmd->pDataBlocks, pMeterMeta->uid, TSDB_DEFAULT_PAYLOAD_SIZE, int32_t ret = tscGetDataBlockFromList(pTableHashList, pCmd->pDataBlocks, pMeterMeta->uid, TSDB_DEFAULT_PAYLOAD_SIZE,
...@@ -695,7 +695,7 @@ static int32_t doParseInsertStatement(SSqlObj *pSql, void *pTableHashList, char ...@@ -695,7 +695,7 @@ static int32_t doParseInsertStatement(SSqlObj *pSql, void *pTableHashList, char
tsSetBlockInfo(pBlocks, pMeterMeta, numOfRows); tsSetBlockInfo(pBlocks, pMeterMeta, numOfRows);
dataBuf->vgid = pMeterMeta->vgid; dataBuf->vgid = pMeterMeta->vgid;
dataBuf->numOfMeters = 1; dataBuf->numOfTables = 1;
/* /*
* the value of pRes->numOfRows does not affect the true result of AFFECTED ROWS, * the value of pRes->numOfRows does not affect the true result of AFFECTED ROWS,
...@@ -1136,7 +1136,7 @@ int doParseInsertSql(SSqlObj *pSql, char *str) { ...@@ -1136,7 +1136,7 @@ int doParseInsertSql(SSqlObj *pSql, char *str) {
wordfree(&full_path); wordfree(&full_path);
STableDataBlocks *pDataBlock = NULL; STableDataBlocks *pDataBlock = NULL;
SMeterMeta* pMeterMeta = pMeterMetaInfo->pMeterMeta; STableMeta* pMeterMeta = pMeterMetaInfo->pMeterMeta;
int32_t ret = tscCreateDataBlock(PATH_MAX, pMeterMeta->rowSize, sizeof(SShellSubmitBlock), pMeterMetaInfo->name, int32_t ret = tscCreateDataBlock(PATH_MAX, pMeterMeta->rowSize, sizeof(SShellSubmitBlock), pMeterMetaInfo->name,
pMeterMeta, &pDataBlock); pMeterMeta, &pDataBlock);
...@@ -1148,7 +1148,7 @@ int doParseInsertSql(SSqlObj *pSql, char *str) { ...@@ -1148,7 +1148,7 @@ int doParseInsertSql(SSqlObj *pSql, char *str) {
strcpy(pDataBlock->filename, fname); strcpy(pDataBlock->filename, fname);
} else if (sToken.type == TK_LP) { } else if (sToken.type == TK_LP) {
/* insert into tablename(col1, col2,..., coln) values(v1, v2,... vn); */ /* insert into tablename(col1, col2,..., coln) values(v1, v2,... vn); */
SMeterMeta *pMeterMeta = tscGetMeterMetaInfo(pCmd, pCmd->clauseIndex, 0)->pMeterMeta; STableMeta *pMeterMeta = tscGetMeterMetaInfo(pCmd, pCmd->clauseIndex, 0)->pMeterMeta;
SSchema * pSchema = tsGetSchema(pMeterMeta); SSchema * pSchema = tsGetSchema(pMeterMeta);
if (validateDataSource(pCmd, DATA_FROM_SQL_STRING, sToken.z) != TSDB_CODE_SUCCESS) { if (validateDataSource(pCmd, DATA_FROM_SQL_STRING, sToken.z) != TSDB_CODE_SUCCESS) {
...@@ -1349,7 +1349,7 @@ static int doPackSendDataBlock(SSqlObj *pSql, int32_t numOfRows, STableDataBlock ...@@ -1349,7 +1349,7 @@ static int doPackSendDataBlock(SSqlObj *pSql, int32_t numOfRows, STableDataBlock
SSqlCmd *pCmd = &pSql->cmd; SSqlCmd *pCmd = &pSql->cmd;
assert(pCmd->numOfClause == 1); assert(pCmd->numOfClause == 1);
SMeterMeta *pMeterMeta = tscGetMeterMetaInfo(pCmd, pCmd->clauseIndex, 0)->pMeterMeta; STableMeta *pMeterMeta = tscGetMeterMetaInfo(pCmd, pCmd->clauseIndex, 0)->pMeterMeta;
SShellSubmitBlock *pBlocks = (SShellSubmitBlock *)(pTableDataBlocks->pData); SShellSubmitBlock *pBlocks = (SShellSubmitBlock *)(pTableDataBlocks->pData);
tsSetBlockInfo(pBlocks, pMeterMeta, numOfRows); tsSetBlockInfo(pBlocks, pMeterMeta, numOfRows);
...@@ -1383,7 +1383,7 @@ static int tscInsertDataFromFile(SSqlObj *pSql, FILE *fp, char *tmpTokenBuf) { ...@@ -1383,7 +1383,7 @@ static int tscInsertDataFromFile(SSqlObj *pSql, FILE *fp, char *tmpTokenBuf) {
int nrows = 0; int nrows = 0;
SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, pCmd->clauseIndex, 0); SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, pCmd->clauseIndex, 0);
SMeterMeta * pMeterMeta = pMeterMetaInfo->pMeterMeta; STableMeta * pMeterMeta = pMeterMetaInfo->pMeterMeta;
assert(pCmd->numOfClause == 1); assert(pCmd->numOfClause == 1);
int32_t rowSize = pMeterMeta->rowSize; int32_t rowSize = pMeterMeta->rowSize;
...@@ -1544,7 +1544,7 @@ void tscProcessMultiVnodesInsertFromFile(SSqlObj *pSql) { ...@@ -1544,7 +1544,7 @@ void tscProcessMultiVnodesInsertFromFile(SSqlObj *pSql) {
continue; continue;
} }
strncpy(pMeterMetaInfo->name, pDataBlock->meterId, TSDB_TABLE_ID_LEN); strncpy(pMeterMetaInfo->name, pDataBlock->tableId, TSDB_TABLE_ID_LEN);
memset(pDataBlock->pData, 0, pDataBlock->nAllocSize); memset(pDataBlock->pData, 0, pDataBlock->nAllocSize);
int32_t ret = tscGetMeterMeta(pSql, pMeterMetaInfo); int32_t ret = tscGetMeterMeta(pSql, pMeterMetaInfo);
......
...@@ -451,7 +451,6 @@ static int insertStmtExecute(STscStmt* stmt) { ...@@ -451,7 +451,6 @@ static int insertStmtExecute(STscStmt* stmt) {
pRes->numOfTotalInCurrentClause = 0; pRes->numOfTotalInCurrentClause = 0;
pRes->qhandle = 0; pRes->qhandle = 0;
pSql->thandle = NULL;
tscDoQuery(pSql); tscDoQuery(pSql);
......
...@@ -145,7 +145,7 @@ void tscKillQuery(STscObj *pObj, uint32_t killId) { ...@@ -145,7 +145,7 @@ void tscKillQuery(STscObj *pObj, uint32_t killId) {
if (pSql == NULL) return; if (pSql == NULL) return;
tscTrace("%p query is killed, queryId:%d thandle:%p", pSql, killId, pSql->thandle); tscTrace("%p query is killed, queryId:%d", pSql, killId);
taos_stop_query(pSql); taos_stop_query(pSql);
} }
...@@ -209,16 +209,16 @@ void tscKillStream(STscObj *pObj, uint32_t killId) { ...@@ -209,16 +209,16 @@ void tscKillStream(STscObj *pObj, uint32_t killId) {
} }
char *tscBuildQueryStreamDesc(char *pMsg, STscObj *pObj) { char *tscBuildQueryStreamDesc(char *pMsg, STscObj *pObj) {
SQList *pQList = (SQList *)pMsg; SQqueryList *pQList = (SQqueryList *)pMsg;
char * pMax = pMsg + TSDB_PAYLOAD_SIZE - 256; char * pMax = pMsg + TSDB_PAYLOAD_SIZE - 256;
SQDesc *pQdesc = pQList->qdesc; SQueryDesc *pQdesc = pQList->qdesc;
pQList->numOfQueries = 0; pQList->numOfQueries = 0;
// We extract the lock to tscBuildHeartBeatMsg function. // We extract the lock to tscBuildHeartBeatMsg function.
/* pthread_mutex_lock (&pObj->mutex); */ /* pthread_mutex_lock (&pObj->mutex); */
pMsg += sizeof(SQList); pMsg += sizeof(SQqueryList);
SSqlObj *pSql = pObj->sqlList; SSqlObj *pSql = pObj->sqlList;
while (pSql) { while (pSql) {
/* /*
...@@ -239,15 +239,15 @@ char *tscBuildQueryStreamDesc(char *pMsg, STscObj *pObj) { ...@@ -239,15 +239,15 @@ char *tscBuildQueryStreamDesc(char *pMsg, STscObj *pObj) {
pQList->numOfQueries++; pQList->numOfQueries++;
pQdesc++; pQdesc++;
pSql = pSql->next; pSql = pSql->next;
pMsg += sizeof(SQDesc); pMsg += sizeof(SQueryDesc);
if (pMsg > pMax) break; if (pMsg > pMax) break;
} }
SSList *pSList = (SSList *)pMsg; SStreamList *pSList = (SStreamList *)pMsg;
SSDesc *pSdesc = pSList->sdesc; SStreamDesc *pSdesc = pSList->sdesc;
pSList->numOfStreams = 0; pSList->numOfStreams = 0;
pMsg += sizeof(SSList); pMsg += sizeof(SStreamList);
SSqlStream *pStream = pObj->streamList; SSqlStream *pStream = pObj->streamList;
while (pStream) { while (pStream) {
strncpy(pSdesc->sql, pStream->pSql->sqlstr, TSDB_SHOW_SQL_LEN - 1); strncpy(pSdesc->sql, pStream->pSql->sqlstr, TSDB_SHOW_SQL_LEN - 1);
...@@ -265,7 +265,7 @@ char *tscBuildQueryStreamDesc(char *pMsg, STscObj *pObj) { ...@@ -265,7 +265,7 @@ char *tscBuildQueryStreamDesc(char *pMsg, STscObj *pObj) {
pSList->numOfStreams++; pSList->numOfStreams++;
pSdesc++; pSdesc++;
pStream = pStream->next; pStream = pStream->next;
pMsg += sizeof(SSDesc); pMsg += sizeof(SStreamDesc);
if (pMsg > pMax) break; if (pMsg > pMax) break;
} }
......
此差异已折叠。
...@@ -64,14 +64,14 @@ bool isValidSchema(struct SSchema* pSchema, int32_t numOfCols) { ...@@ -64,14 +64,14 @@ bool isValidSchema(struct SSchema* pSchema, int32_t numOfCols) {
return (rowLen <= TSDB_MAX_BYTES_PER_ROW); return (rowLen <= TSDB_MAX_BYTES_PER_ROW);
} }
struct SSchema* tsGetSchema(SMeterMeta* pMeta) { struct SSchema* tsGetSchema(STableMeta* pMeta) {
if (pMeta == NULL) { if (pMeta == NULL) {
return NULL; return NULL;
} }
return tsGetColumnSchema(pMeta, 0); return tsGetColumnSchema(pMeta, 0);
} }
struct SSchema* tsGetTagSchema(SMeterMeta* pMeta) { struct SSchema* tsGetTagSchema(STableMeta* pMeta) {
if (pMeta == NULL || pMeta->numOfTags == 0) { if (pMeta == NULL || pMeta->numOfTags == 0) {
return NULL; return NULL;
} }
...@@ -79,12 +79,12 @@ struct SSchema* tsGetTagSchema(SMeterMeta* pMeta) { ...@@ -79,12 +79,12 @@ struct SSchema* tsGetTagSchema(SMeterMeta* pMeta) {
return tsGetColumnSchema(pMeta, pMeta->numOfColumns); return tsGetColumnSchema(pMeta, pMeta->numOfColumns);
} }
struct SSchema* tsGetColumnSchema(SMeterMeta* pMeta, int32_t startCol) { struct SSchema* tsGetColumnSchema(STableMeta* pMeta, int32_t startCol) {
return (SSchema*)(((char*)pMeta + sizeof(SMeterMeta)) + startCol * sizeof(SSchema)); return (SSchema*)(((char*)pMeta + sizeof(STableMeta)) + startCol * sizeof(SSchema));
} }
struct SSchema tsGetTbnameColumnSchema() { struct SSchema tsGetTbnameColumnSchema() {
struct SSchema s = {.colId = TSDB_TBNAME_COLUMN_INDEX, .type = TSDB_DATA_TYPE_BINARY, .bytes = TSDB_METER_NAME_LEN}; struct SSchema s = {.colId = TSDB_TBNAME_COLUMN_INDEX, .type = TSDB_DATA_TYPE_BINARY, .bytes = TSDB_TABLE_NAME_LEN};
strcpy(s.name, TSQL_TBNAME_L); strcpy(s.name, TSQL_TBNAME_L);
return s; return s;
...@@ -94,7 +94,7 @@ struct SSchema tsGetTbnameColumnSchema() { ...@@ -94,7 +94,7 @@ struct SSchema tsGetTbnameColumnSchema() {
* the MeterMeta data format in memory is as follows: * the MeterMeta data format in memory is as follows:
* *
* +--------------------+ * +--------------------+
* |SMeterMeta Body data| sizeof(SMeterMeta) * |STableMeta Body data| sizeof(STableMeta)
* +--------------------+ * +--------------------+
* |Schema data | numOfTotalColumns * sizeof(SSchema) * |Schema data | numOfTotalColumns * sizeof(SSchema)
* +--------------------+ * +--------------------+
...@@ -104,14 +104,14 @@ struct SSchema tsGetTbnameColumnSchema() { ...@@ -104,14 +104,14 @@ struct SSchema tsGetTbnameColumnSchema() {
* @param pMeta * @param pMeta
* @return * @return
*/ */
char* tsGetTagsValue(SMeterMeta* pMeta) { char* tsGetTagsValue(STableMeta* pMeta) {
int32_t numOfTotalCols = pMeta->numOfColumns + pMeta->numOfTags; int32_t numOfTotalCols = pMeta->numOfColumns + pMeta->numOfTags;
uint32_t offset = sizeof(SMeterMeta) + numOfTotalCols * sizeof(SSchema); uint32_t offset = sizeof(STableMeta) + numOfTotalCols * sizeof(SSchema);
return ((char*)pMeta + offset); return ((char*)pMeta + offset);
} }
bool tsMeterMetaIdentical(SMeterMeta* p1, SMeterMeta* p2) { bool tsMeterMetaIdentical(STableMeta* p1, STableMeta* p2) {
if (p1 == NULL || p2 == NULL || p1->uid != p2->uid || p1->sversion != p2->sversion) { if (p1 == NULL || p2 == NULL || p1->uid != p2->uid || p1->sversion != p2->sversion) {
return false; return false;
} }
...@@ -120,7 +120,7 @@ bool tsMeterMetaIdentical(SMeterMeta* p1, SMeterMeta* p2) { ...@@ -120,7 +120,7 @@ bool tsMeterMetaIdentical(SMeterMeta* p1, SMeterMeta* p2) {
return true; return true;
} }
size_t size = sizeof(SMeterMeta) + p1->numOfColumns * sizeof(SSchema); size_t size = sizeof(STableMeta) + p1->numOfColumns * sizeof(SSchema);
for (int32_t i = 0; i < p1->numOfTags; ++i) { for (int32_t i = 0; i < p1->numOfTags; ++i) {
SSchema* pColSchema = tsGetColumnSchema(p1, i + p1->numOfColumns); SSchema* pColSchema = tsGetColumnSchema(p1, i + p1->numOfColumns);
...@@ -151,16 +151,16 @@ static FORCE_INLINE size_t copy(char* dst, const char* src, char delimiter) { ...@@ -151,16 +151,16 @@ static FORCE_INLINE size_t copy(char* dst, const char* src, char delimiter) {
/** /**
* extract table name from meterid, which the format of userid.dbname.metername * extract table name from meterid, which the format of userid.dbname.metername
* @param meterId * @param tableId
* @return * @return
*/ */
void extractTableName(char* meterId, char* name) { void extractTableName(char* tableId, char* name) {
char* r = skipSegments(meterId, TS_PATH_DELIMITER[0], 2); char* r = skipSegments(tableId, TS_PATH_DELIMITER[0], 2);
copy(name, r, TS_PATH_DELIMITER[0]); copy(name, r, TS_PATH_DELIMITER[0]);
} }
SSQLToken extractDBName(char* meterId, char* name) { SSQLToken extractDBName(char* tableId, char* name) {
char* r = skipSegments(meterId, TS_PATH_DELIMITER[0], 1); char* r = skipSegments(tableId, TS_PATH_DELIMITER[0], 1);
size_t len = copy(name, r, TS_PATH_DELIMITER[0]); size_t len = copy(name, r, TS_PATH_DELIMITER[0]);
SSQLToken token = {.z = name, .n = len, .type = TK_STRING}; SSQLToken token = {.z = name, .n = len, .type = TK_STRING};
......
此差异已折叠。
此差异已折叠。
...@@ -13,8 +13,9 @@ ...@@ -13,8 +13,9 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include "os.h" #include <tast.h>
#include "hash.h" #include "hash.h"
#include "os.h"
#include "tcache.h" #include "tcache.h"
#include "tlog.h" #include "tlog.h"
#include "tnote.h" #include "tnote.h"
...@@ -34,12 +35,8 @@ TAOS *taos_connect_imp(const char *ip, const char *user, const char *pass, const ...@@ -34,12 +35,8 @@ TAOS *taos_connect_imp(const char *ip, const char *user, const char *pass, const
void (*fp)(void *, TAOS_RES *, int), void *param, void **taos) { void (*fp)(void *, TAOS_RES *, int), void *param, void **taos) {
STscObj *pObj; STscObj *pObj;
taos_init();
if (pTscMgmtConn == NULL || pVnodeConn == NULL) { taos_init();
globalCode = TSDB_CODE_APP_ERROR;
return NULL;
}
if (user == NULL) { if (user == NULL) {
globalCode = TSDB_CODE_INVALID_ACCT; globalCode = TSDB_CODE_INVALID_ACCT;
...@@ -63,15 +60,30 @@ TAOS *taos_connect_imp(const char *ip, const char *user, const char *pass, const ...@@ -63,15 +60,30 @@ TAOS *taos_connect_imp(const char *ip, const char *user, const char *pass, const
} }
} }
if (tscInitRpc(user, pass) != 0) {
globalCode = TSDB_CODE_NETWORK_UNAVAIL;
return NULL;
}
if (ip && ip[0]) { if (ip && ip[0]) {
tscMgmtIpList.numOfIps = 3; tscMgmtIpList.inUse = 0;
tscMgmtIpList.ip[0] = inet_addr(ip);
tscMgmtIpList.ip[1] = inet_addr(tsMasterIp);
tscMgmtIpList.ip[2] = inet_addr(tsSecondIp);
tscMgmtIpList.index = 0;
tscMgmtIpList.port = tsMgmtShellPort; tscMgmtIpList.port = tsMgmtShellPort;
tscMgmtIpList.numOfIps = 1;
tscMgmtIpList.ip[0] = inet_addr(ip);
if (tsMasterIp[0] && strcmp(ip, tsMasterIp) != 0) {
tscMgmtIpList.numOfIps = 2;
tscMgmtIpList.ip[1] = inet_addr(tsMasterIp);
}
if (tsSecondIp[0] && strcmp(tsSecondIp, tsMasterIp) != 0) {
tscMgmtIpList.numOfIps = 3;
tscMgmtIpList.ip[2] = inet_addr(tsSecondIp);
}
} }
tscMgmtIpList.port = port ? port : tsMgmtShellPort;
pObj = (STscObj *)malloc(sizeof(STscObj)); pObj = (STscObj *)malloc(sizeof(STscObj));
if (NULL == pObj) { if (NULL == pObj) {
globalCode = TSDB_CODE_CLI_OUT_OF_MEMORY; globalCode = TSDB_CODE_CLI_OUT_OF_MEMORY;
...@@ -197,7 +209,7 @@ int taos_query_imp(STscObj *pObj, SSqlObj *pSql) { ...@@ -197,7 +209,7 @@ int taos_query_imp(STscObj *pObj, SSqlObj *pSql) {
taosCleanUpHashTable(pSql->pTableHashList); taosCleanUpHashTable(pSql->pTableHashList);
pSql->pTableHashList = NULL; pSql->pTableHashList = NULL;
} }
tscDump("%p pObj:%p, SQL: %s", pSql, pObj, pSql->sqlstr); tscDump("%p pObj:%p, SQL: %s", pSql, pObj, pSql->sqlstr);
pRes->code = (uint8_t)tsParseSql(pSql, false); pRes->code = (uint8_t)tsParseSql(pSql, false);
...@@ -208,7 +220,6 @@ int taos_query_imp(STscObj *pObj, SSqlObj *pSql) { ...@@ -208,7 +220,6 @@ int taos_query_imp(STscObj *pObj, SSqlObj *pSql) {
* to free connection, which may cause segment fault, when the parse phrase is not even successfully executed. * to free connection, which may cause segment fault, when the parse phrase is not even successfully executed.
*/ */
pRes->qhandle = 0; pRes->qhandle = 0;
pSql->thandle = NULL;
if (pRes->code == TSDB_CODE_SUCCESS) { if (pRes->code == TSDB_CODE_SUCCESS) {
tscDoQuery(pSql); tscDoQuery(pSql);
...@@ -364,9 +375,7 @@ int taos_fetch_block_impl(TAOS_RES *res, TAOS_ROW *rows) { ...@@ -364,9 +375,7 @@ int taos_fetch_block_impl(TAOS_RES *res, TAOS_ROW *rows) {
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0); SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
for (int i = 0; i < pQueryInfo->fieldsInfo.numOfOutputCols; ++i) { for (int i = 0; i < pQueryInfo->fieldsInfo.numOfOutputCols; ++i) {
// pRes->tsrow[i] = TSC_GET_RESPTR_BASE(pRes, pQueryInfo, i, pQueryInfo->order) + pRes->tsrow[i] = TSC_GET_RESPTR_BASE(pRes, pQueryInfo, i);
// pRes->bytes[i] * (1 - pQueryInfo->order.order) * (pRes->numOfRows - 1);
pRes->tsrow[i] = TSC_GET_RESPTR_BASE(pRes, pQueryInfo, i, pQueryInfo->order);
} }
*rows = pRes->tsrow; *rows = pRes->tsrow;
...@@ -374,54 +383,115 @@ int taos_fetch_block_impl(TAOS_RES *res, TAOS_ROW *rows) { ...@@ -374,54 +383,115 @@ int taos_fetch_block_impl(TAOS_RES *res, TAOS_ROW *rows) {
return (pQueryInfo->order.order == TSQL_SO_DESC) ? pRes->numOfRows : -pRes->numOfRows; return (pQueryInfo->order.order == TSQL_SO_DESC) ? pRes->numOfRows : -pRes->numOfRows;
} }
static void transferNcharData(SSqlObj *pSql, int32_t columnIndex, TAOS_FIELD *pField) {
SSqlRes *pRes = &pSql->res;
if (isNull(pRes->tsrow[columnIndex], pField->type)) {
pRes->tsrow[columnIndex] = NULL;
} else if (pField->type == TSDB_DATA_TYPE_NCHAR) {
// convert unicode to native code in a temporary buffer extra one byte for terminated symbol
if (pRes->buffer[columnIndex] == NULL) {
pRes->buffer[columnIndex] = malloc(pField->bytes + TSDB_NCHAR_SIZE);
}
/* string terminated char for binary data*/
memset(pRes->buffer[columnIndex], 0, pField->bytes + TSDB_NCHAR_SIZE);
if (taosUcs4ToMbs(pRes->tsrow[columnIndex], pField->bytes, pRes->buffer[columnIndex])) {
pRes->tsrow[columnIndex] = pRes->buffer[columnIndex];
} else {
tscError("%p charset:%s to %s. val:%ls convert failed.", pSql, DEFAULT_UNICODE_ENCODEC, tsCharset, pRes->tsrow);
pRes->tsrow[columnIndex] = NULL;
}
}
}
static char *getArithemicInputSrc(void *param, char *name, int32_t colId) {
SArithmeticSupport *pSupport = (SArithmeticSupport *)param;
SSqlFunctionExpr * pExpr = pSupport->pExpr;
int32_t index = -1;
for (int32_t i = 0; i < pExpr->pBinExprInfo.numOfCols; ++i) {
if (strcmp(name, pExpr->pBinExprInfo.pReqColumns[i].name) == 0) {
index = i;
break;
}
}
assert(index >= 0 && index < pExpr->pBinExprInfo.numOfCols);
return pSupport->data[index] + pSupport->offset * pSupport->elemSize[index];
}
static void **doSetResultRowData(SSqlObj *pSql) { static void **doSetResultRowData(SSqlObj *pSql) {
SSqlCmd *pCmd = &pSql->cmd; SSqlCmd *pCmd = &pSql->cmd;
SSqlRes *pRes = &pSql->res; SSqlRes *pRes = &pSql->res;
assert(pRes->row >= 0 && pRes->row <= pRes->numOfRows); assert(pRes->row >= 0 && pRes->row <= pRes->numOfRows);
if (pRes->row >= pRes->numOfRows) { // all the results has returned to invoker if (pRes->row >= pRes->numOfRows) { // all the results has returned to invoker
tfree(pRes->tsrow); tfree(pRes->tsrow);
return pRes->tsrow; return pRes->tsrow;
} }
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
//todo refactor move away
for(int32_t k = 0; k < pQueryInfo->exprsInfo.numOfExprs; ++k) {
SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, k);
if (k > 0) {
SSqlExpr* pPrev = tscSqlExprGet(pQueryInfo, k - 1);
pExpr->offset = pPrev->offset + pPrev->resBytes;
}
}
int32_t num = 0; int32_t num = 0;
for (int i = 0; i < pQueryInfo->fieldsInfo.numOfOutputCols; ++i) { for (int i = 0; i < tscNumOfFields(pQueryInfo); ++i) {
pRes->tsrow[i] = TSC_GET_RESPTR_BASE(pRes, pQueryInfo, i, pQueryInfo->order) + pRes->bytes[i] * pRes->row; if (pQueryInfo->fieldsInfo.pSqlExpr[i] != NULL) {
SSqlExpr* pExpr = pQueryInfo->fieldsInfo.pSqlExpr[i];
pRes->tsrow[i] = TSC_GET_RESPTR_BASE(pRes, pQueryInfo, i) + pExpr->resBytes * pRes->row;
} else {
assert(0);
}
// primary key column cannot be null in interval query, no need to check // primary key column cannot be null in interval query, no need to check
if (i == 0 && pQueryInfo->nAggTimeInterval > 0) { if (i == 0 && pQueryInfo->intervalTime > 0) {
continue; continue;
} }
TAOS_FIELD *pField = tscFieldInfoGetField(pQueryInfo, i); TAOS_FIELD *pField = tscFieldInfoGetField(pQueryInfo, i);
if (isNull(pRes->tsrow[i], pField->type)) { transferNcharData(pSql, i, pField);
pRes->tsrow[i] = NULL;
} else if (pField->type == TSDB_DATA_TYPE_NCHAR) {
// convert unicode to native code in a temporary buffer extra one byte for terminated symbol
if (pRes->buffer[num] == NULL) {
pRes->buffer[num] = malloc(pField->bytes + TSDB_NCHAR_SIZE);
}
/* string terminated char for binary data*/ // calculate the result from serveral other columns
memset(pRes->buffer[num], 0, pField->bytes + TSDB_NCHAR_SIZE); if (pQueryInfo->fieldsInfo.pExpr != NULL && pQueryInfo->fieldsInfo.pExpr[i] != NULL) {
SArithmeticSupport *sas = (SArithmeticSupport *)calloc(1, sizeof(SArithmeticSupport));
if (taosUcs4ToMbs(pRes->tsrow[i], pField->bytes, pRes->buffer[num])) { sas->offset = 0;
pRes->tsrow[i] = pRes->buffer[num]; sas->pExpr = pQueryInfo->fieldsInfo.pExpr[i];
} else {
tscError("%p charset:%s to %s. val:%ls convert failed.", pSql, DEFAULT_UNICODE_ENCODEC, tsCharset, pRes->tsrow); sas->numOfCols = sas->pExpr->pBinExprInfo.numOfCols;
pRes->tsrow[i] = NULL;
if (pRes->buffer[i] == NULL) {
pRes->buffer[i] = malloc(tscFieldInfoGetField(pQueryInfo, i)->bytes);
}
for(int32_t k = 0; k < sas->numOfCols; ++k) {
int32_t columnIndex = sas->pExpr->pBinExprInfo.pReqColumns[k].colIdxInBuf;
SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, columnIndex);
sas->elemSize[k] = pExpr->resBytes;
sas->data[k] = (pRes->data + pRes->numOfRows* pExpr->offset) + pRes->row*pExpr->resBytes;
} }
tSQLBinaryExprCalcTraverse(sas->pExpr->pBinExprInfo.pBinExpr, 1, pRes->buffer[i], sas, TSQL_SO_ASC, getArithemicInputSrc);
pRes->tsrow[i] = pRes->buffer[i];
num++; free(sas); //todo optimization
} }
} }
assert(num <= pQueryInfo->fieldsInfo.numOfOutputCols); assert(num <= pQueryInfo->fieldsInfo.numOfOutputCols);
pRes->row++; // index increase one-step pRes->row++; // index increase one-step
return pRes->tsrow; return pRes->tsrow;
} }
...@@ -463,7 +533,7 @@ static bool tscHashRemainDataInSubqueryResultSet(SSqlObj *pSql) { ...@@ -463,7 +533,7 @@ static bool tscHashRemainDataInSubqueryResultSet(SSqlObj *pSql) {
if (pSql->pSubs[i] == 0) { if (pSql->pSubs[i] == 0) {
continue; continue;
} }
SSqlRes * pRes1 = &pSql->pSubs[i]->res; SSqlRes * pRes1 = &pSql->pSubs[i]->res;
SQueryInfo *pQueryInfo1 = tscGetQueryInfoDetail(&pSql->pSubs[i]->cmd, 0); SQueryInfo *pQueryInfo1 = tscGetQueryInfoDetail(&pSql->pSubs[i]->cmd, 0);
...@@ -499,11 +569,10 @@ static void **tscBuildResFromSubqueries(SSqlObj *pSql) { ...@@ -499,11 +569,10 @@ static void **tscBuildResFromSubqueries(SSqlObj *pSql) {
if (numOfTableHasRes >= 2) { // do merge result if (numOfTableHasRes >= 2) { // do merge result
success = (doSetResultRowData(pSql->pSubs[0]) != NULL) && success = (doSetResultRowData(pSql->pSubs[0]) != NULL) && (doSetResultRowData(pSql->pSubs[1]) != NULL);
(doSetResultRowData(pSql->pSubs[1]) != NULL); // TSKEY key1 = *(TSKEY *)pRes1->tsrow[0];
// TSKEY key1 = *(TSKEY *)pRes1->tsrow[0]; // TSKEY key2 = *(TSKEY *)pRes2->tsrow[0];
// TSKEY key2 = *(TSKEY *)pRes2->tsrow[0]; // printf("first:%" PRId64 ", second:%" PRId64 "\n", key1, key2);
// printf("first:%" PRId64 ", second:%" PRId64 "\n", key1, key2);
} else { // only one subquery } else { // only one subquery
SSqlObj *pSub = pSql->pSubs[0]; SSqlObj *pSub = pSql->pSubs[0];
if (pSub == NULL) { if (pSub == NULL) {
...@@ -593,7 +662,7 @@ TAOS_ROW taos_fetch_row_impl(TAOS_RES *res) { ...@@ -593,7 +662,7 @@ TAOS_ROW taos_fetch_row_impl(TAOS_RES *res) {
tscProcessSql(pSql); // retrieve data from virtual node tscProcessSql(pSql); // retrieve data from virtual node
//if failed to retrieve data from current virtual node, try next one if exists // if failed to retrieve data from current virtual node, try next one if exists
if (hasMoreVnodesToTry(pSql)) { if (hasMoreVnodesToTry(pSql)) {
tscTryQueryNextVnode(pSql, NULL); tscTryQueryNextVnode(pSql, NULL);
} }
...@@ -635,7 +704,7 @@ TAOS_ROW taos_fetch_row(TAOS_RES *res) { ...@@ -635,7 +704,7 @@ TAOS_ROW taos_fetch_row(TAOS_RES *res) {
// current subclause is completed, try the next subclause // current subclause is completed, try the next subclause
while (rows == NULL && pCmd->clauseIndex < pCmd->numOfClause - 1) { while (rows == NULL && pCmd->clauseIndex < pCmd->numOfClause - 1) {
tscTryQueryNextClause(pSql, NULL); tscTryQueryNextClause(pSql, NULL);
// if the rows is not NULL, return immediately // if the rows is not NULL, return immediately
rows = taos_fetch_row_impl(res); rows = taos_fetch_row_impl(res);
} }
...@@ -698,7 +767,7 @@ int taos_select_db(TAOS *taos, const char *db) { ...@@ -698,7 +767,7 @@ int taos_select_db(TAOS *taos, const char *db) {
return taos_query(taos, sql); return taos_query(taos, sql);
} }
void taos_free_result_imp(TAOS_RES* res, int keepCmd) { void taos_free_result_imp(TAOS_RES *res, int keepCmd) {
if (res == NULL) return; if (res == NULL) return;
SSqlObj *pSql = (SSqlObj *)res; SSqlObj *pSql = (SSqlObj *)res;
...@@ -713,7 +782,6 @@ void taos_free_result_imp(TAOS_RES* res, int keepCmd) { ...@@ -713,7 +782,6 @@ void taos_free_result_imp(TAOS_RES* res, int keepCmd) {
/* Query rsp is not received from vnode, so the qhandle is NULL */ /* Query rsp is not received from vnode, so the qhandle is NULL */
tscTrace("%p qhandle is null, abort free, fp:%p", pSql, pSql->fp); tscTrace("%p qhandle is null, abort free, fp:%p", pSql, pSql->fp);
if (pSql->fp != NULL) { if (pSql->fp != NULL) {
pSql->thandle = NULL;
tscFreeSqlObj(pSql); tscFreeSqlObj(pSql);
tscTrace("%p Async SqlObj is freed by app", pSql); tscTrace("%p Async SqlObj is freed by app", pSql);
} else if (keepCmd) { } else if (keepCmd) {
...@@ -752,7 +820,7 @@ void taos_free_result_imp(TAOS_RES* res, int keepCmd) { ...@@ -752,7 +820,7 @@ void taos_free_result_imp(TAOS_RES* res, int keepCmd) {
pCmd->command = (pCmd->command > TSDB_SQL_MGMT) ? TSDB_SQL_RETRIEVE : TSDB_SQL_FETCH; pCmd->command = (pCmd->command > TSDB_SQL_MGMT) ? TSDB_SQL_RETRIEVE : TSDB_SQL_FETCH;
tscTrace("%p code:%d, numOfRows:%d, command:%d", pSql, pRes->code, pRes->numOfRows, pCmd->command); tscTrace("%p code:%d, numOfRows:%d, command:%d", pSql, pRes->code, pRes->numOfRows, pCmd->command);
void *fp = pSql->fp; void *fp = pSql->fp;
if (fp != NULL) { if (fp != NULL) {
pSql->freed = 1; pSql->freed = 1;
...@@ -774,7 +842,6 @@ void taos_free_result_imp(TAOS_RES* res, int keepCmd) { ...@@ -774,7 +842,6 @@ void taos_free_result_imp(TAOS_RES* res, int keepCmd) {
* *
* Then this object will be reused and no free operation is required. * Then this object will be reused and no free operation is required.
*/ */
pSql->thandle = NULL;
if (keepCmd) { if (keepCmd) {
tscFreeSqlResult(pSql); tscFreeSqlResult(pSql);
tscTrace("%p sql result is freed by app while sql command is kept", pSql); tscTrace("%p sql result is freed by app while sql command is kept", pSql);
...@@ -785,7 +852,6 @@ void taos_free_result_imp(TAOS_RES* res, int keepCmd) { ...@@ -785,7 +852,6 @@ void taos_free_result_imp(TAOS_RES* res, int keepCmd) {
} }
} else { } else {
// if no free resource msg is sent to vnode, we free this object immediately. // if no free resource msg is sent to vnode, we free this object immediately.
pSql->thandle = NULL;
if (pSql->fp) { if (pSql->fp) {
assert(pRes->numOfRows == 0 || (pCmd->command > TSDB_SQL_LOCAL)); assert(pRes->numOfRows == 0 || (pCmd->command > TSDB_SQL_LOCAL));
...@@ -801,9 +867,7 @@ void taos_free_result_imp(TAOS_RES* res, int keepCmd) { ...@@ -801,9 +867,7 @@ void taos_free_result_imp(TAOS_RES* res, int keepCmd) {
} }
} }
void taos_free_result(TAOS_RES *res) { void taos_free_result(TAOS_RES *res) { taos_free_result_imp(res, 0); }
taos_free_result_imp(res, 0);
}
int taos_errno(TAOS *taos) { int taos_errno(TAOS *taos) {
STscObj *pObj = (STscObj *)taos; STscObj *pObj = (STscObj *)taos;
...@@ -819,26 +883,24 @@ int taos_errno(TAOS *taos) { ...@@ -819,26 +883,24 @@ int taos_errno(TAOS *taos) {
return code; return code;
} }
static bool validErrorCode(int32_t code) { static bool validErrorCode(int32_t code) { return code >= TSDB_CODE_SUCCESS && code < TSDB_CODE_MAX_ERROR_CODE; }
return code >= TSDB_CODE_SUCCESS && code < TSDB_CODE_MAX_ERROR_CODE;
}
/* /*
* In case of invalid sql error, additional information is attached to explain * In case of invalid sql error, additional information is attached to explain
* why the sql is invalid * why the sql is invalid
*/ */
static bool hasAdditionalErrorInfo(int32_t code, SSqlCmd* pCmd) { static bool hasAdditionalErrorInfo(int32_t code, SSqlCmd *pCmd) {
if (code != TSDB_CODE_INVALID_SQL) { if (code != TSDB_CODE_INVALID_SQL) {
return false; return false;
} }
size_t len = strlen(pCmd->payload); size_t len = strlen(pCmd->payload);
char* z = NULL; char *z = NULL;
if (len > 0) { if (len > 0) {
z = strstr (pCmd->payload, "invalid SQL"); z = strstr(pCmd->payload, "invalid SQL");
} }
return z != NULL; return z != NULL;
} }
...@@ -846,20 +908,21 @@ char *taos_errstr(TAOS *taos) { ...@@ -846,20 +908,21 @@ char *taos_errstr(TAOS *taos) {
STscObj *pObj = (STscObj *)taos; STscObj *pObj = (STscObj *)taos;
uint8_t code; uint8_t code;
if (pObj == NULL || pObj->signature != pObj) return tstrerror(globalCode); if (pObj == NULL || pObj->signature != pObj)
return (char*)tstrerror(globalCode);
SSqlObj *pSql = pObj->pSql;
SSqlObj* pSql = pObj->pSql;
if (validErrorCode(pSql->res.code)) { if (validErrorCode(pSql->res.code)) {
code = pSql->res.code; code = pSql->res.code;
} else { } else {
code = TSDB_CODE_OTHERS; //unknown error code = TSDB_CODE_OTHERS; // unknown error
} }
if (hasAdditionalErrorInfo(code, &pSql->cmd)) { if (hasAdditionalErrorInfo(code, &pSql->cmd)) {
return pSql->cmd.payload; return pSql->cmd.payload;
} else { } else {
return tstrerror(code); return (char*)tstrerror(code);
} }
} }
...@@ -899,11 +962,6 @@ void taos_stop_query(TAOS_RES *res) { ...@@ -899,11 +962,6 @@ void taos_stop_query(TAOS_RES *res) {
return; return;
} }
if (pSql->thandle == NULL) {
tscTrace("%p no connection, abort cancel", res);
return;
}
//taosStopRpcConn(pSql->thandle); //taosStopRpcConn(pSql->thandle);
tscTrace("%p query is cancelled", res); tscTrace("%p query is cancelled", res);
} }
...@@ -951,14 +1009,14 @@ int taos_print_row(char *str, TAOS_ROW row, TAOS_FIELD *fields, int num_fields) ...@@ -951,14 +1009,14 @@ int taos_print_row(char *str, TAOS_ROW row, TAOS_FIELD *fields, int num_fields)
case TSDB_DATA_TYPE_BINARY: case TSDB_DATA_TYPE_BINARY:
case TSDB_DATA_TYPE_NCHAR: { case TSDB_DATA_TYPE_NCHAR: {
size_t xlen = 0; size_t xlen = 0;
for (xlen = 0; xlen <= fields[i].bytes; xlen++) { for (xlen = 0; xlen <= fields[i].bytes; xlen++) {
char c = ((char*)row[i])[xlen]; char c = ((char *)row[i])[xlen];
if (c == 0) break; if (c == 0) break;
str[len++] = c; str[len++] = c;
} }
str[len] = 0; str[len] = 0;
} break; } break;
case TSDB_DATA_TYPE_TIMESTAMP: case TSDB_DATA_TYPE_TIMESTAMP:
len += sprintf(str + len, "%" PRId64, *((int64_t *)row[i])); len += sprintf(str + len, "%" PRId64, *((int64_t *)row[i]));
...@@ -1147,7 +1205,6 @@ int taos_load_table_info(TAOS *taos, const char *tableNameList) { ...@@ -1147,7 +1205,6 @@ int taos_load_table_info(TAOS *taos, const char *tableNameList) {
* to free connection, which may cause segment fault, when the parse phrase is not even successfully executed. * to free connection, which may cause segment fault, when the parse phrase is not even successfully executed.
*/ */
pRes->qhandle = 0; pRes->qhandle = 0;
pSql->thandle = NULL;
free(str); free(str);
if (pRes->code != TSDB_CODE_SUCCESS) { if (pRes->code != TSDB_CODE_SUCCESS) {
......
...@@ -246,8 +246,6 @@ static void tscProcessStreamRetrieveResult(void *param, TAOS_RES *res, int numOf ...@@ -246,8 +246,6 @@ static void tscProcessStreamRetrieveResult(void *param, TAOS_RES *res, int numOf
int32_t retry = tsProjectExecInterval; int32_t retry = tsProjectExecInterval;
tscError("%p stream:%p, retrieve no data, code:%d, retry in %" PRId64 "ms", pSql, pStream, numOfRows, retry); tscError("%p stream:%p, retrieve no data, code:%d, retry in %" PRId64 "ms", pSql, pStream, numOfRows, retry);
tscClearSqlMetaInfoForce(&(pStream->pSql->cmd));
tscSetRetryTimer(pStream, pStream->pSql, retry); tscSetRetryTimer(pStream, pStream->pSql, retry);
return; return;
} }
...@@ -381,41 +379,41 @@ static void tscSetSlidingWindowInfo(SSqlObj *pSql, SSqlStream *pStream) { ...@@ -381,41 +379,41 @@ static void tscSetSlidingWindowInfo(SSqlObj *pSql, SSqlStream *pStream) {
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0); SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
if (pQueryInfo->nAggTimeInterval < minIntervalTime) { if (pQueryInfo->intervalTime < minIntervalTime) {
tscWarn("%p stream:%p, original sample interval:%ld too small, reset to:%" PRId64 "", pSql, pStream, tscWarn("%p stream:%p, original sample interval:%ld too small, reset to:%" PRId64 "", pSql, pStream,
pQueryInfo->nAggTimeInterval, minIntervalTime); pQueryInfo->intervalTime, minIntervalTime);
pQueryInfo->nAggTimeInterval = minIntervalTime; pQueryInfo->intervalTime = minIntervalTime;
} }
pStream->interval = pQueryInfo->nAggTimeInterval; // it shall be derived from sql string pStream->interval = pQueryInfo->intervalTime; // it shall be derived from sql string
if (pQueryInfo->nSlidingTime == 0) { if (pQueryInfo->slidingTime == 0) {
pQueryInfo->nSlidingTime = pQueryInfo->nAggTimeInterval; pQueryInfo->slidingTime = pQueryInfo->intervalTime;
} }
int64_t minSlidingTime = int64_t minSlidingTime =
(pStream->precision == TSDB_TIME_PRECISION_MICRO) ? tsMinSlidingTime * 1000L : tsMinSlidingTime; (pStream->precision == TSDB_TIME_PRECISION_MICRO) ? tsMinSlidingTime * 1000L : tsMinSlidingTime;
if (pQueryInfo->nSlidingTime == -1) { if (pQueryInfo->slidingTime == -1) {
pQueryInfo->nSlidingTime = pQueryInfo->nAggTimeInterval; pQueryInfo->slidingTime = pQueryInfo->intervalTime;
} else if (pQueryInfo->nSlidingTime < minSlidingTime) { } else if (pQueryInfo->slidingTime < minSlidingTime) {
tscWarn("%p stream:%p, original sliding value:%" PRId64 " too small, reset to:%" PRId64 "", pSql, pStream, tscWarn("%p stream:%p, original sliding value:%" PRId64 " too small, reset to:%" PRId64 "", pSql, pStream,
pQueryInfo->nSlidingTime, minSlidingTime); pQueryInfo->slidingTime, minSlidingTime);
pQueryInfo->nSlidingTime = minSlidingTime; pQueryInfo->slidingTime = minSlidingTime;
} }
if (pQueryInfo->nSlidingTime > pQueryInfo->nAggTimeInterval) { if (pQueryInfo->slidingTime > pQueryInfo->intervalTime) {
tscWarn("%p stream:%p, sliding value:%" PRId64 " can not be larger than interval range, reset to:%" PRId64 "", pSql, pStream, tscWarn("%p stream:%p, sliding value:%" PRId64 " can not be larger than interval range, reset to:%" PRId64 "", pSql, pStream,
pQueryInfo->nSlidingTime, pQueryInfo->nAggTimeInterval); pQueryInfo->slidingTime, pQueryInfo->intervalTime);
pQueryInfo->nSlidingTime = pQueryInfo->nAggTimeInterval; pQueryInfo->slidingTime = pQueryInfo->intervalTime;
} }
pStream->slidingTime = pQueryInfo->nSlidingTime; pStream->slidingTime = pQueryInfo->slidingTime;
pQueryInfo->nAggTimeInterval = 0; // clear the interval value to avoid the force time window split by query processor pQueryInfo->intervalTime = 0; // clear the interval value to avoid the force time window split by query processor
pQueryInfo->nSlidingTime = 0; pQueryInfo->slidingTime = 0;
} }
static int64_t tscGetStreamStartTimestamp(SSqlObj *pSql, SSqlStream *pStream, int64_t stime) { static int64_t tscGetStreamStartTimestamp(SSqlObj *pSql, SSqlStream *pStream, int64_t stime) {
......
...@@ -44,7 +44,7 @@ typedef struct SSub { ...@@ -44,7 +44,7 @@ typedef struct SSub {
int interval; int interval;
TAOS_SUBSCRIBE_CALLBACK fp; TAOS_SUBSCRIBE_CALLBACK fp;
void * param; void * param;
int numOfMeters; int numOfTables;
SSubscriptionProgress * progress; SSubscriptionProgress * progress;
} SSub; } SSub;
...@@ -62,7 +62,7 @@ TSKEY tscGetSubscriptionProgress(void* sub, int64_t uid) { ...@@ -62,7 +62,7 @@ TSKEY tscGetSubscriptionProgress(void* sub, int64_t uid) {
return 0; return 0;
SSub* pSub = (SSub*)sub; SSub* pSub = (SSub*)sub;
for (int s = 0, e = pSub->numOfMeters; s < e;) { for (int s = 0, e = pSub->numOfTables; s < e;) {
int m = (s + e) / 2; int m = (s + e) / 2;
SSubscriptionProgress* p = pSub->progress + m; SSubscriptionProgress* p = pSub->progress + m;
if (p->uid > uid) if (p->uid > uid)
...@@ -81,7 +81,7 @@ void tscUpdateSubscriptionProgress(void* sub, int64_t uid, TSKEY ts) { ...@@ -81,7 +81,7 @@ void tscUpdateSubscriptionProgress(void* sub, int64_t uid, TSKEY ts) {
return; return;
SSub* pSub = (SSub*)sub; SSub* pSub = (SSub*)sub;
for (int s = 0, e = pSub->numOfMeters; s < e;) { for (int s = 0, e = pSub->numOfTables; s < e;) {
int m = (s + e) / 2; int m = (s + e) / 2;
SSubscriptionProgress* p = pSub->progress + m; SSubscriptionProgress* p = pSub->progress + m;
if (p->uid > uid) if (p->uid > uid)
...@@ -176,43 +176,43 @@ int tscUpdateSubscription(STscObj* pObj, SSub* pSub) { ...@@ -176,43 +176,43 @@ int tscUpdateSubscription(STscObj* pObj, SSub* pSub) {
} }
SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, 0, 0); SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, 0, 0);
int numOfMeters = 0; int numOfTables = 0;
if (!UTIL_METER_IS_NOMRAL_METER(pMeterMetaInfo)) { if (!UTIL_METER_IS_NOMRAL_METER(pMeterMetaInfo)) {
SMetricMeta* pMetricMeta = pMeterMetaInfo->pMetricMeta; SSuperTableMeta* pMetricMeta = pMeterMetaInfo->pMetricMeta;
for (int32_t i = 0; i < pMetricMeta->numOfVnodes; i++) { for (int32_t i = 0; i < pMetricMeta->numOfVnodes; i++) {
SVnodeSidList *pVnodeSidList = tscGetVnodeSidList(pMetricMeta, i); SVnodeSidList *pVnodeSidList = tscGetVnodeSidList(pMetricMeta, i);
numOfMeters += pVnodeSidList->numOfSids; numOfTables += pVnodeSidList->numOfSids;
} }
} }
SSubscriptionProgress* progress = (SSubscriptionProgress*)calloc(numOfMeters, sizeof(SSubscriptionProgress)); SSubscriptionProgress* progress = (SSubscriptionProgress*)calloc(numOfTables, sizeof(SSubscriptionProgress));
if (progress == NULL) { if (progress == NULL) {
tscError("failed to allocate memory for progress: %s", pSub->topic); tscError("failed to allocate memory for progress: %s", pSub->topic);
return 0; return 0;
} }
if (UTIL_METER_IS_NOMRAL_METER(pMeterMetaInfo)) { if (UTIL_METER_IS_NOMRAL_METER(pMeterMetaInfo)) {
numOfMeters = 1; numOfTables = 1;
int64_t uid = pMeterMetaInfo->pMeterMeta->uid; int64_t uid = pMeterMetaInfo->pMeterMeta->uid;
progress[0].uid = uid; progress[0].uid = uid;
progress[0].key = tscGetSubscriptionProgress(pSub, uid); progress[0].key = tscGetSubscriptionProgress(pSub, uid);
} else { } else {
SMetricMeta* pMetricMeta = pMeterMetaInfo->pMetricMeta; SSuperTableMeta* pMetricMeta = pMeterMetaInfo->pMetricMeta;
numOfMeters = 0; numOfTables = 0;
for (int32_t i = 0; i < pMetricMeta->numOfVnodes; i++) { for (int32_t i = 0; i < pMetricMeta->numOfVnodes; i++) {
SVnodeSidList *pVnodeSidList = tscGetVnodeSidList(pMetricMeta, i); SVnodeSidList *pVnodeSidList = tscGetVnodeSidList(pMetricMeta, i);
for (int32_t j = 0; j < pVnodeSidList->numOfSids; j++) { for (int32_t j = 0; j < pVnodeSidList->numOfSids; j++) {
SMeterSidExtInfo *pMeterInfo = tscGetMeterSidInfo(pVnodeSidList, j); STableSidExtInfo *pMeterInfo = tscGetMeterSidInfo(pVnodeSidList, j);
int64_t uid = pMeterInfo->uid; int64_t uid = pMeterInfo->uid;
progress[numOfMeters].uid = uid; progress[numOfTables].uid = uid;
progress[numOfMeters++].key = tscGetSubscriptionProgress(pSub, uid); progress[numOfTables++].key = tscGetSubscriptionProgress(pSub, uid);
} }
} }
qsort(progress, numOfMeters, sizeof(SSubscriptionProgress), tscCompareSubscriptionProgress); qsort(progress, numOfTables, sizeof(SSubscriptionProgress), tscCompareSubscriptionProgress);
} }
free(pSub->progress); free(pSub->progress);
pSub->numOfMeters = numOfMeters; pSub->numOfTables = numOfTables;
pSub->progress = progress; pSub->progress = progress;
pSub->lastSyncTime = taosGetTimestampMs(); pSub->lastSyncTime = taosGetTimestampMs();
...@@ -257,9 +257,9 @@ static int tscLoadSubscriptionProgress(SSub* pSub) { ...@@ -257,9 +257,9 @@ static int tscLoadSubscriptionProgress(SSub* pSub) {
return 0; return 0;
} }
int numOfMeters = atoi(buf); int numOfTables = atoi(buf);
SSubscriptionProgress* progress = calloc(numOfMeters, sizeof(SSubscriptionProgress)); SSubscriptionProgress* progress = calloc(numOfTables, sizeof(SSubscriptionProgress));
for (int i = 0; i < numOfMeters; i++) { for (int i = 0; i < numOfTables; i++) {
if (fgets(buf, sizeof(buf), fp) == NULL) { if (fgets(buf, sizeof(buf), fp) == NULL) {
fclose(fp); fclose(fp);
free(progress); free(progress);
...@@ -273,10 +273,10 @@ static int tscLoadSubscriptionProgress(SSub* pSub) { ...@@ -273,10 +273,10 @@ static int tscLoadSubscriptionProgress(SSub* pSub) {
fclose(fp); fclose(fp);
qsort(progress, numOfMeters, sizeof(SSubscriptionProgress), tscCompareSubscriptionProgress); qsort(progress, numOfTables, sizeof(SSubscriptionProgress), tscCompareSubscriptionProgress);
pSub->numOfMeters = numOfMeters; pSub->numOfTables = numOfTables;
pSub->progress = progress; pSub->progress = progress;
tscTrace("subscription progress loaded, %d tables: %s", numOfMeters, pSub->topic); tscTrace("subscription progress loaded, %d tables: %s", numOfTables, pSub->topic);
return 1; return 1;
} }
...@@ -297,8 +297,8 @@ void tscSaveSubscriptionProgress(void* sub) { ...@@ -297,8 +297,8 @@ void tscSaveSubscriptionProgress(void* sub) {
} }
fputs(pSub->pSql->sqlstr, fp); fputs(pSub->pSql->sqlstr, fp);
fprintf(fp, "\n%d\n", pSub->numOfMeters); fprintf(fp, "\n%d\n", pSub->numOfTables);
for (int i = 0; i < pSub->numOfMeters; i++) { for (int i = 0; i < pSub->numOfTables; i++) {
int64_t uid = pSub->progress[i].uid; int64_t uid = pSub->progress[i].uid;
TSKEY key = pSub->progress[i].key; TSKEY key = pSub->progress[i].key;
fprintf(fp, "%" PRId64 ":%" PRId64 "\n", uid, key); fprintf(fp, "%" PRId64 ":%" PRId64 "\n", uid, key);
...@@ -382,7 +382,6 @@ TAOS_RES *taos_consume(TAOS_SUB *tsub) { ...@@ -382,7 +382,6 @@ TAOS_RES *taos_consume(TAOS_SUB *tsub) {
pRes->numOfRows = 1; pRes->numOfRows = 1;
pRes->numOfTotal = 0; pRes->numOfTotal = 0;
pRes->qhandle = 0; pRes->qhandle = 0;
pSql->thandle = NULL;
pSql->cmd.command = TSDB_SQL_SELECT; pSql->cmd.command = TSDB_SQL_SELECT;
pQueryInfo->type = type; pQueryInfo->type = type;
......
...@@ -45,6 +45,7 @@ int tsInsertHeadSize; ...@@ -45,6 +45,7 @@ int tsInsertHeadSize;
extern int tscEmbedded; extern int tscEmbedded;
int tscNumOfThreads; int tscNumOfThreads;
static pthread_once_t tscinit = PTHREAD_ONCE_INIT; static pthread_once_t tscinit = PTHREAD_ONCE_INIT;
static pthread_mutex_t tscMutex;
extern int tsTscEnableRecordSql; extern int tsTscEnableRecordSql;
extern int tsNumOfLogLines; extern int tsNumOfLogLines;
...@@ -56,11 +57,64 @@ void tscCheckDiskUsage(void *para, void *unused) { ...@@ -56,11 +57,64 @@ void tscCheckDiskUsage(void *para, void *unused) {
taosTmrReset(tscCheckDiskUsage, 1000, NULL, tscTmr, &tscCheckDiskUsageTmr); taosTmrReset(tscCheckDiskUsage, 1000, NULL, tscTmr, &tscCheckDiskUsageTmr);
} }
int32_t tscInitRpc(const char *user, const char *secret) {
SRpcInit rpcInit;
char secretEncrypt[32] = {0};
taosEncryptPass((uint8_t *)secret, strlen(secret), secretEncrypt);
pthread_mutex_lock(&tscMutex);
if (pVnodeConn == NULL) {
memset(&rpcInit, 0, sizeof(rpcInit));
rpcInit.localIp = tsLocalIp;
rpcInit.localPort = 0;
rpcInit.label = "TSC-vnode";
rpcInit.numOfThreads = tscNumOfThreads;
rpcInit.cfp = tscProcessMsgFromServer;
rpcInit.sessions = tsMaxVnodeConnections;
rpcInit.connType = TAOS_CONN_CLIENT;
rpcInit.user = user;
rpcInit.ckey = "key";
rpcInit.secret = secretEncrypt;
pVnodeConn = rpcOpen(&rpcInit);
if (pVnodeConn == NULL) {
tscError("failed to init connection to vnode");
pthread_mutex_unlock(&tscMutex);
return -1;
}
}
if (pTscMgmtConn == NULL) {
memset(&rpcInit, 0, sizeof(rpcInit));
rpcInit.localIp = tsLocalIp;
rpcInit.localPort = 0;
rpcInit.label = "TSC-mgmt";
rpcInit.numOfThreads = 1;
rpcInit.cfp = tscProcessMsgFromServer;
rpcInit.sessions = tsMaxMgmtConnections;
rpcInit.connType = TAOS_CONN_CLIENT;
rpcInit.idleTime = 2000;
rpcInit.user = "root";
rpcInit.ckey = "key";
rpcInit.secret = secretEncrypt;
pTscMgmtConn = rpcOpen(&rpcInit);
if (pTscMgmtConn == NULL) {
tscError("failed to init connection to mgmt");
pthread_mutex_unlock(&tscMutex);
return -1;
}
}
pthread_mutex_unlock(&tscMutex);
return 0;
}
void taos_init_imp() { void taos_init_imp() {
char temp[128]; char temp[128];
struct stat dirstat; struct stat dirstat;
SRpcInit rpcInit;
pthread_mutex_init(&tscMutex, NULL);
srand(taosGetTimestampSec()); srand(taosGetTimestampSec());
deltaToUtcInitOnce(); deltaToUtcInitOnce();
...@@ -96,12 +150,12 @@ void taos_init_imp() { ...@@ -96,12 +150,12 @@ void taos_init_imp() {
taosInitNote(tsNumOfLogLines / 10, 1, (char*)"tsc_note"); taosInitNote(tsNumOfLogLines / 10, 1, (char*)"tsc_note");
} }
tscMgmtIpList.index = 0; tscMgmtIpList.inUse = 0;
tscMgmtIpList.port = tsMgmtShellPort; tscMgmtIpList.port = tsMgmtShellPort;
tscMgmtIpList.numOfIps = 1; tscMgmtIpList.numOfIps = 1;
tscMgmtIpList.ip[0] = inet_addr(tsMasterIp); tscMgmtIpList.ip[0] = inet_addr(tsMasterIp);
if (tsSecondIp[0]) { if (tsSecondIp[0] && strcmp(tsSecondIp, tsMasterIp) != 0) {
tscMgmtIpList.numOfIps = 2; tscMgmtIpList.numOfIps = 2;
tscMgmtIpList.ip[1] = inet_addr(tsSecondIp); tscMgmtIpList.ip[1] = inet_addr(tsSecondIp);
} }
...@@ -124,34 +178,6 @@ void taos_init_imp() { ...@@ -124,34 +178,6 @@ void taos_init_imp() {
return; return;
} }
memset(&rpcInit, 0, sizeof(rpcInit));
rpcInit.localIp = tsLocalIp;
rpcInit.localPort = 0;
rpcInit.label = "TSC-vnode";
rpcInit.numOfThreads = tscNumOfThreads;
rpcInit.afp = tscProcessMsgFromServer;
rpcInit.sessions = tsMaxVnodeConnections;
rpcInit.connType = TAOS_CONN_SOCKET_TYPE_C();
pVnodeConn = rpcOpen(&rpcInit);
if (pVnodeConn == NULL) {
tscError("failed to init connection to vnode");
return;
}
memset(&rpcInit, 0, sizeof(rpcInit));
rpcInit.localIp = tsLocalIp;
rpcInit.localPort = 0;
rpcInit.label = "TSC-mgmt";
rpcInit.numOfThreads = 1;
rpcInit.afp = tscProcessMsgFromServer;
rpcInit.sessions = tsMaxMgmtConnections;
rpcInit.connType = TAOS_CONN_SOCKET_TYPE_C();
pTscMgmtConn = rpcOpen(&rpcInit);
if (pTscMgmtConn == NULL) {
tscError("failed to init connection to mgmt");
return;
}
tscTmr = taosTmrInit(tsMaxMgmtConnections * 2, 200, 60000, "TSC"); tscTmr = taosTmrInit(tsMaxMgmtConnections * 2, 200, 60000, "TSC");
if(0 == tscEmbedded){ if(0 == tscEmbedded){
taosTmrReset(tscCheckDiskUsage, 10, NULL, tscTmr, &tscCheckDiskUsageTmr); taosTmrReset(tscCheckDiskUsage, 10, NULL, tscTmr, &tscCheckDiskUsageTmr);
...@@ -319,10 +345,10 @@ static int taos_options_imp(TSDB_OPTION option, const char *pStr) { ...@@ -319,10 +345,10 @@ static int taos_options_imp(TSDB_OPTION option, const char *pStr) {
assert(cfg != NULL); assert(cfg != NULL);
if (cfg->cfgStatus <= TSDB_CFG_CSTATUS_OPTION) { if (cfg->cfgStatus <= TSDB_CFG_CSTATUS_OPTION) {
if (strcasecmp(pStr, TAOS_SOCKET_TYPE_NAME_UDP) != 0 && strcasecmp(pStr, TAOS_SOCKET_TYPE_NAME_TCP) != 0) { // if (strcasecmp(pStr, TAOS_SOCKET_TYPE_NAME_UDP) != 0 && strcasecmp(pStr, TAOS_SOCKET_TYPE_NAME_TCP) != 0) {
tscError("only 'tcp' or 'udp' allowed for configuring the socket type"); // tscError("only 'tcp' or 'udp' allowed for configuring the socket type");
return -1; // return -1;
} // }
strncpy(tsSocketType, pStr, tListLen(tsSocketType)); strncpy(tsSocketType, pStr, tListLen(tsSocketType));
cfg->cfgStatus = TSDB_CFG_CSTATUS_OPTION; cfg->cfgStatus = TSDB_CFG_CSTATUS_OPTION;
......
此差异已折叠。
...@@ -8,7 +8,7 @@ IF (TD_MVN_INSTALLED) ...@@ -8,7 +8,7 @@ IF (TD_MVN_INSTALLED)
ADD_CUSTOM_COMMAND(OUTPUT ${JDBC_CMD_NAME} ADD_CUSTOM_COMMAND(OUTPUT ${JDBC_CMD_NAME}
POST_BUILD POST_BUILD
COMMAND mvn -Dmaven.test.skip=true install -f ${CMAKE_CURRENT_SOURCE_DIR}/pom.xml COMMAND mvn -Dmaven.test.skip=true install -f ${CMAKE_CURRENT_SOURCE_DIR}/pom.xml
COMMAND ${CMAKE_COMMAND} -E copy ${CMAKE_CURRENT_SOURCE_DIR}/target/taos-jdbcdriver-1.0.2-dist.jar ${LIBRARY_OUTPUT_PATH} COMMAND ${CMAKE_COMMAND} -E copy ${CMAKE_CURRENT_SOURCE_DIR}/target/taos-jdbcdriver-1.0.3-dist.jar ${LIBRARY_OUTPUT_PATH}
COMMAND mvn -Dmaven.test.skip=true clean -f ${CMAKE_CURRENT_SOURCE_DIR}/pom.xml COMMAND mvn -Dmaven.test.skip=true clean -f ${CMAKE_CURRENT_SOURCE_DIR}/pom.xml
COMMENT "build jdbc driver") COMMENT "build jdbc driver")
ADD_CUSTOM_TARGET(${JDBC_TARGET_NAME} ALL WORKING_DIRECTORY ${EXECUTABLE_OUTPUT_PATH} DEPENDS ${JDBC_CMD_NAME}) ADD_CUSTOM_TARGET(${JDBC_TARGET_NAME} ALL WORKING_DIRECTORY ${EXECUTABLE_OUTPUT_PATH} DEPENDS ${JDBC_CMD_NAME})
......
...@@ -4,7 +4,7 @@ ...@@ -4,7 +4,7 @@
<groupId>com.taosdata.jdbc</groupId> <groupId>com.taosdata.jdbc</groupId>
<artifactId>taos-jdbcdriver</artifactId> <artifactId>taos-jdbcdriver</artifactId>
<version>1.0.2</version> <version>1.0.3</version>
<packaging>jar</packaging> <packaging>jar</packaging>
<name>JDBCDriver</name> <name>JDBCDriver</name>
......
...@@ -23,12 +23,17 @@ extern "C" { ...@@ -23,12 +23,17 @@ extern "C" {
#include <stdint.h> #include <stdint.h>
#include <stdbool.h> #include <stdbool.h>
void dnodeProcessMsgFromMgmt(int8_t *pCont, int32_t contLen, int32_t msgType, void *pConn); int32_t dnodeInitMgmt();
void dnodeSendVpeerCfgMsg(int32_t vnode); void dnodeInitMgmtIp();
void dnodeSendMeterCfgMsg(int32_t vnode, int32_t sid);
void dnodeProcessMsgFromMgmt(int8_t msgType, void *pCont, int32_t contLen, void *pConn, int32_t code);
void dnodeSendMsgToMnode(int8_t msgType, void *pCont, int32_t contLen);
void dnodeSendRspToMnode(void *pConn, int8_t msgType, int32_t code, void *pCont, int32_t contLen);
void dnodeSendVnodeCfgMsg(int32_t vnode);
void dnodeSendTableCfgMsg(int32_t vnode, int32_t sid);
extern int32_t (*dnodeSendMsgToMnode)(int8_t *pCont, int32_t contLen, int8_t msgType);
extern int32_t (*dnodeSendSimpleRspToMnode)(void *pConn, int32_t msgType, int32_t code);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -25,31 +25,21 @@ extern "C" { ...@@ -25,31 +25,21 @@ extern "C" {
#include "taosdef.h" #include "taosdef.h"
#include "taosmsg.h" #include "taosmsg.h"
/*
* Clear query information associated with this connection
*/
void dnodeFreeQInfo(void *pConn);
/*
* Clear all query informations
*/
void dnodeFreeQInfos();
/* /*
* handle query message, and the result is returned by callback function * handle query message, and the result is returned by callback function
*/ */
void dnodeQueryData(SQueryMeterMsg *pQuery, void *pConn, void (*callback)(int32_t code, void *pQInfo, void *pConn)); void dnodeQueryData(SQueryTableMsg *pQuery, void *pConn, void (*callback)(int32_t code, void *pQInfo, void *pConn));
/* /*
* Dispose retrieve msg, and the result will passed through callback function * Dispose retrieve msg, and the result will passed through callback function
*/ */
typedef void (*SDnodeRetrieveCallbackFp)(int32_t code, void *pQInfo, void *pConn); typedef void (*SDnodeRetrieveCallbackFp)(int32_t code, void *pQInfo, void *pConn);
void dnodeRetrieveData(SRetrieveMeterMsg *pRetrieve, void *pConn, SDnodeRetrieveCallbackFp callbackFp); void dnodeRetrieveData(SRetrieveTableMsg *pRetrieve, void *pConn, SDnodeRetrieveCallbackFp callbackFp);
/* /*
* Fill retrieve result according to query info * Fill retrieve result according to query info
*/ */
int32_t dnodeGetRetrieveData(void *pQInfo, SRetrieveMeterRsp *retrievalRsp); int32_t dnodeGetRetrieveData(void *pQInfo, SRetrieveTableRsp *pRetrieve);
/* /*
* Get the size of retrieve result according to query info * Get the size of retrieve result according to query info
......
...@@ -45,7 +45,7 @@ bool dnodeCheckVnodeExist(int32_t vid); ...@@ -45,7 +45,7 @@ bool dnodeCheckVnodeExist(int32_t vid);
* Create vnode with specified configuration and open it * Create vnode with specified configuration and open it
* if exist, config it * if exist, config it
*/ */
int32_t dnodeCreateVnode(int32_t vnode, SVPeersMsg *cfg); int32_t dnodeCreateVnode(SCreateVnodeMsg *pVnode);
/* /*
* Remove vnode from local repository * Remove vnode from local repository
...@@ -56,7 +56,7 @@ int32_t dnodeDropVnode(int32_t vnode); ...@@ -56,7 +56,7 @@ int32_t dnodeDropVnode(int32_t vnode);
* Get the vnode object that has been opened * Get the vnode object that has been opened
*/ */
//tsdb_repo_t* dnodeGetVnode(int vid); //tsdb_repo_t* dnodeGetVnode(int vid);
void* dnodeGetVnode(int vid); void* dnodeGetVnode(int32_t vnode);
/* /*
* get the status of vnode * get the status of vnode
......
...@@ -38,23 +38,23 @@ void dnodeWriteData(SShellSubmitMsg *pSubmit, void *pConn, void (*callback)(SShe ...@@ -38,23 +38,23 @@ void dnodeWriteData(SShellSubmitMsg *pSubmit, void *pConn, void (*callback)(SShe
* Create table with specified configuration and open it * Create table with specified configuration and open it
* if table already exist, update its schema and tag * if table already exist, update its schema and tag
*/ */
int32_t dnodeCreateTable(SDCreateTableMsg *table); int32_t dnodeCreateTable(SDCreateTableMsg *pTable);
/* /*
* Remove table from local repository * Remove table from local repository
*/ */
int32_t dnodeDropTable(int32_t vnode, int32_t sid, uint64_t uid); int32_t dnodeDropTable(SDRemoveTableMsg *pTable);
/* /*
* Create stream * Create stream
* if stream already exist, update it * if stream already exist, update it
*/ */
int32_t dnodeCreateStream(SAlterStreamMsg *stream); int32_t dnodeCreateStream(SDAlterStreamMsg *pStream);
/* /*
* Remove all child tables of supertable from local repository * Remove all child tables of supertable from local repository
*/ */
int32_t dnodeDropSuperTable(uint64_t stableUid); int32_t dnodeDropSuperTable(SDRemoveSuperTableMsg *pStable);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
此差异已折叠。
...@@ -121,10 +121,6 @@ void dnodeStartModulesImp() { ...@@ -121,10 +121,6 @@ void dnodeStartModulesImp() {
} }
} }
} }
if (tsModule[TSDB_MOD_MGMT].num != 0 && tsModule[TSDB_MOD_MGMT].cleanUpFp) {
(*tsModule[TSDB_MOD_MGMT].cleanUpFp)();
}
} }
void (*dnodeStartModules)() = dnodeStartModulesImp; void (*dnodeStartModules)() = dnodeStartModulesImp;
...@@ -22,34 +22,33 @@ ...@@ -22,34 +22,33 @@
#include "dnodeRead.h" #include "dnodeRead.h"
#include "dnodeSystem.h" #include "dnodeSystem.h"
void dnodeFreeQInfo(void *pConn) {} void dnodeQueryData(SQueryTableMsg *pQuery, void *pConn, void (*callback)(int32_t code, void *pQInfo, void *pConn)) {
dTrace("conn:%p, query msg is disposed", pConn);
void dnodeFreeQInfos() {} void *pQInfo = 100;
callback(TSDB_CODE_SUCCESS, pQInfo, pConn);
void dnodeQueryData(SQueryMeterMsg *pQuery, void *pConn, void (*callback)(int32_t code, void *pQInfo, void *pConn)) {
void *pQInfo = NULL;
int code = TSDB_CODE_SUCCESS;
callback(code, pConn, pQInfo);
} }
static void dnodeExecuteRetrieveData(SSchedMsg *pSched) { static void dnodeExecuteRetrieveData(SSchedMsg *pSched) {
//SRetrieveMeterMsg *pRetrieve = (SRetrieveMeterMsg *)pSched->msg;
SDnodeRetrieveCallbackFp callback = (SDnodeRetrieveCallbackFp)pSched->thandle; SDnodeRetrieveCallbackFp callback = (SDnodeRetrieveCallbackFp)pSched->thandle;
SRetrieveTableMsg *pRetrieve = pSched->msg;
void *pConn = pSched->ahandle; void *pConn = pSched->ahandle;
dTrace("conn:%p, retrieve msg is disposed, qhandle:%" PRId64, pConn, pRetrieve->qhandle);
//examples //examples
int32_t code = TSDB_CODE_INVALID_QHANDLE; int32_t code = TSDB_CODE_SUCCESS;
void *pQInfo = NULL; //get from pConn void *pQInfo = (void*)pRetrieve->qhandle;
(*callback)(code, pQInfo, pConn);
//TODO build response here (*callback)(code, pQInfo, pConn);
free(pSched->msg); free(pSched->msg);
} }
void dnodeRetrieveData(SRetrieveMeterMsg *pRetrieve, void *pConn, SDnodeRetrieveCallbackFp callbackFp) { void dnodeRetrieveData(SRetrieveTableMsg *pRetrieve, void *pConn, SDnodeRetrieveCallbackFp callbackFp) {
int8_t *msg = malloc(sizeof(SRetrieveMeterMsg)); dTrace("conn:%p, retrieve msg is received", pConn);
memcpy(msg, pRetrieve, sizeof(SRetrieveMeterMsg));
void *msg = malloc(sizeof(SRetrieveTableMsg));
memcpy(msg, pRetrieve, sizeof(SRetrieveTableMsg));
SSchedMsg schedMsg; SSchedMsg schedMsg;
schedMsg.msg = msg; schedMsg.msg = msg;
...@@ -59,12 +58,15 @@ void dnodeRetrieveData(SRetrieveMeterMsg *pRetrieve, void *pConn, SDnodeRetrieve ...@@ -59,12 +58,15 @@ void dnodeRetrieveData(SRetrieveMeterMsg *pRetrieve, void *pConn, SDnodeRetrieve
taosScheduleTask(tsQueryQhandle, &schedMsg); taosScheduleTask(tsQueryQhandle, &schedMsg);
} }
int32_t dnodeGetRetrieveData(void *pQInfo, SRetrieveMeterRsp *retrievalRsp) { int32_t dnodeGetRetrieveData(void *pQInfo, SRetrieveTableRsp *pRetrieve) {
dTrace("qInfo:%p, data is retrieved");
pRetrieve->numOfRows = 0;
return 0; return 0;
} }
int32_t dnodeGetRetrieveDataSize(void *pQInfo) { int32_t dnodeGetRetrieveDataSize(void *pQInfo) {
return 0; dTrace("qInfo:%p, contLen is 100");
return 100;
} }
此差异已折叠。
...@@ -33,16 +33,12 @@ ...@@ -33,16 +33,12 @@
#include "dnodeVnodeMgmt.h" #include "dnodeVnodeMgmt.h"
#ifdef CLUSTER #ifdef CLUSTER
#include "dnodeCluster.h" #include "acct.h"
#include "httpAdmin.h" #include "admin.h"
#include "mnodeAccount.h" #include "cluster.h"
#include "mnodeBalance.h" #include "grant.h"
#include "mnodeCluster.h" #include "replica.h"
#include "sdbReplica.h" #include "storage.h"
#include "multilevelStorage.h"
#include "vnodeCluster.h"
#include "vnodeReplica.h"
#include "dnodeGrant.h"
#endif #endif
static pthread_mutex_t tsDnodeMutex; static pthread_mutex_t tsDnodeMutex;
...@@ -120,16 +116,7 @@ void dnodeCheckDataDirOpenned(const char *dir) { ...@@ -120,16 +116,7 @@ void dnodeCheckDataDirOpenned(const char *dir) {
void dnodeInitPlugins() { void dnodeInitPlugins() {
#ifdef CLUSTER #ifdef CLUSTER
dnodeClusterInit(); acctInit();
httpAdminInit();
mnodeAccountInit();
mnodeBalanceInit();
mnodeClusterInit();
sdbReplicaInit();
multilevelStorageInit();
vnodeClusterInit();
vnodeReplicaInit();
dnodeGrantInit();
#endif #endif
} }
......
...@@ -20,34 +20,42 @@ ...@@ -20,34 +20,42 @@
#include "dnodeVnodeMgmt.h" #include "dnodeVnodeMgmt.h"
int32_t dnodeOpenVnodes() { int32_t dnodeOpenVnodes() {
return 0; dPrint("open all vnodes");
return TSDB_CODE_SUCCESS;
} }
int32_t dnodeCleanupVnodes() { int32_t dnodeCleanupVnodes() {
return 0; dPrint("clean all vnodes");
return TSDB_CODE_SUCCESS;
} }
bool dnodeCheckVnodeExist(int32_t vnode) { bool dnodeCheckVnodeExist(int32_t vnode) {
dPrint("vnode:%d, check vnode exist", vnode);
return true; return true;
} }
int32_t dnodeCreateVnode(int32_t vnode, SVPeersMsg *cfg) { int32_t dnodeCreateVnode(SCreateVnodeMsg *pVnode) {
return 0; dPrint("vnode:%d, is created", htonl(pVnode->vnode));
return TSDB_CODE_SUCCESS;
} }
int32_t dnodeDropVnode(int32_t vnode) { int32_t dnodeDropVnode(int32_t vnode) {
return 0; dPrint("vnode:%d, is dropped", vnode);
return TSDB_CODE_SUCCESS;
} }
void* dnodeGetVnode(int vid) { void* dnodeGetVnode(int32_t vnode) {
dPrint("vnode:%d, get vnode");
return NULL; return NULL;
} }
EVnodeStatus dnodeGetVnodeStatus(int32_t vnode) { EVnodeStatus dnodeGetVnodeStatus(int32_t vnode) {
dPrint("vnode:%d, get vnode status");
return TSDB_VN_STATUS_MASTER; return TSDB_VN_STATUS_MASTER;
} }
bool dnodeCheckTableExist(int32_t vnode, int32_t sid, int64_t uid) { bool dnodeCheckTableExist(int32_t vnode, int32_t sid, int64_t uid) {
dPrint("vnode:%d, sid:%d, check table exist");
return true; return true;
} }
此差异已折叠。
此差异已折叠。
...@@ -44,13 +44,12 @@ extern uint32_t tsRebootTime; ...@@ -44,13 +44,12 @@ extern uint32_t tsRebootTime;
extern void (*dnodeStartModules)(); extern void (*dnodeStartModules)();
extern void (*dnodeParseParameterK)(); extern void (*dnodeParseParameterK)();
extern int32_t (*dnodeCheckSystem)(); extern int32_t (*dnodeCheckSystem)();
extern void (*dnodeInitMgmtIp)();
extern int (*dnodeInitMgmt)();
// dnodeMgmt
void dnodeProcessMsgFromMgmt(int8_t *pCont, int32_t contLen, int32_t msgType, void *pConn); // dnodeSystem
extern int32_t (*dnodeSendMsgToMnode)(int8_t *pCont, int32_t contLen, int8_t msgType); extern void *tsDnodeMgmtQhandle;
extern int32_t (*dnodeSendSimpleRspToMnode)(void *pConn, int32_t msgType, int32_t code);
void dnodeProcessMsgFromMgmt(int8_t msgType, void *pCont, int32_t contLen, void *pConn, int32_t code);
// dnodeModule // dnodeModule
extern void (*dnodeStartModules)(); extern void (*dnodeStartModules)();
......
此差异已折叠。
此差异已折叠。
此差异已折叠。
...@@ -162,6 +162,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_FAILED_TO_LOCK_RESOURCES, 0, 117, "failed to lock ...@@ -162,6 +162,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_FAILED_TO_LOCK_RESOURCES, 0, 117, "failed to lock
TAOS_DEFINE_ERROR(TSDB_CODE_TABLE_ID_MISMATCH, 0, 118, "table id mismatch") TAOS_DEFINE_ERROR(TSDB_CODE_TABLE_ID_MISMATCH, 0, 118, "table id mismatch")
TAOS_DEFINE_ERROR(TSDB_CODE_QUERY_CACHE_ERASED, 0, 119, "query cache erased") TAOS_DEFINE_ERROR(TSDB_CODE_QUERY_CACHE_ERASED, 0, 119, "query cache erased")
TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_MSG, 0, 120, "invalid message") TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_MSG, 0, 120, "invalid message")
TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_TABLE_TYPE, 0, 121, "invalid table typee")
#ifdef TAOS_ERROR_C #ifdef TAOS_ERROR_C
}; };
......
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册