提交 77106595 编写于 作者: H Haojun Liao

other: merge 3.0

...@@ -16,3 +16,6 @@ ...@@ -16,3 +16,6 @@
[submodule "tools/taos-tools"] [submodule "tools/taos-tools"]
path = tools/taos-tools path = tools/taos-tools
url = https://github.com/taosdata/taos-tools url = https://github.com/taosdata/taos-tools
[submodule "tools/taosadapter"]
path = tools/taosadapter
url = https://github.com/taosdata/taosadapter.git
...@@ -38,6 +38,7 @@ def pre_test(){ ...@@ -38,6 +38,7 @@ def pre_test(){
sh ''' sh '''
hostname hostname
date date
env
''' '''
sh ''' sh '''
cd ${WK} cd ${WK}
...@@ -82,23 +83,33 @@ def pre_test(){ ...@@ -82,23 +83,33 @@ def pre_test(){
sh ''' sh '''
cd ${WKC} cd ${WKC}
git pull >/dev/null git pull >/dev/null
git log -5
echo "`date "+%Y%m%d-%H%M%S"` ${JOB_NAME}:${BRANCH_NAME}:${BUILD_ID}:${CHANGE_TARGET}" >>${WKDIR}/jenkins.log
echo "community log: `git log -5`" >>${WKDIR}/jenkins.log
git fetch origin +refs/pull/${CHANGE_ID}/merge git fetch origin +refs/pull/${CHANGE_ID}/merge
git checkout -qf FETCH_HEAD git checkout -qf FETCH_HEAD
git log -5 git log -5
echo "community log merged: `git log -5`" >>${WKDIR}/jenkins.log
cd ${WK} cd ${WK}
git pull >/dev/null git pull >/dev/null
git log -5 git log -5
echo "tdinternal log: `git log -5`" >>${WKDIR}/jenkins.log
''' '''
} else if (env.CHANGE_URL =~ /\/TDinternal\//) { } else if (env.CHANGE_URL =~ /\/TDinternal\//) {
sh ''' sh '''
cd ${WK} cd ${WK}
git pull >/dev/null git pull >/dev/null
git log -5
echo "`date "+%Y%m%d-%H%M%S"` ${JOB_NAME}:${BRANCH_NAME}:${BUILD_ID}:${CHANGE_TARGET}" >>${WKDIR}/jenkins.log
echo "tdinternal log: `git log -5`" >>${WKDIR}/jenkins.log
git fetch origin +refs/pull/${CHANGE_ID}/merge git fetch origin +refs/pull/${CHANGE_ID}/merge
git checkout -qf FETCH_HEAD git checkout -qf FETCH_HEAD
git log -5 git log -5
echo "tdinternal log merged: `git log -5`" >>${WKDIR}/jenkins.log
cd ${WKC} cd ${WKC}
git pull >/dev/null git pull >/dev/null
git log -5 git log -5
echo "community log: `git log -5`" >>${WKDIR}/jenkins.log
''' '''
} else { } else {
sh ''' sh '''
...@@ -113,6 +124,9 @@ def pre_test(){ ...@@ -113,6 +124,9 @@ def pre_test(){
cd ${WKPY} cd ${WKPY}
git reset --hard git reset --hard
git pull git pull
git log -5
echo "python connector log: `git log -5`" >>${WKDIR}/jenkins.log
echo >>${WKDIR}/jenkins.log
''' '''
return 1 return 1
} }
......
...@@ -18,6 +18,33 @@ if (NOT DEFINED TD_GRANT) ...@@ -18,6 +18,33 @@ if (NOT DEFINED TD_GRANT)
SET(TD_GRANT FALSE) SET(TD_GRANT FALSE)
endif() endif()
IF ("${BUILD_HTTP}" STREQUAL "")
IF (TD_LINUX)
IF (TD_ARM_32)
SET(TD_BUILD_HTTP TRUE)
ELSE ()
SET(TD_BUILD_HTTP TRUE)
ENDIF ()
ELSEIF (TD_DARWIN)
SET(TD_BUILD_HTTP TRUE)
ELSE ()
SET(TD_BUILD_HTTP TRUE)
ENDIF ()
ELSEIF (${BUILD_HTTP} MATCHES "false")
SET(TD_BUILD_HTTP FALSE)
ELSEIF (${BUILD_HTTP} MATCHES "true")
SET(TD_BUILD_HTTP TRUE)
ELSEIF (${BUILD_HTTP} MATCHES "internal")
SET(TD_BUILD_HTTP FALSE)
SET(TD_BUILD_TAOSA_INTERNAL TRUE)
ELSE ()
SET(TD_BUILD_HTTP TRUE)
ENDIF ()
IF (TD_BUILD_HTTP)
ADD_DEFINITIONS(-DHTTP_EMBEDDED)
ENDIF ()
IF ("${BUILD_TOOLS}" STREQUAL "") IF ("${BUILD_TOOLS}" STREQUAL "")
IF (TD_LINUX) IF (TD_LINUX)
IF (TD_ARM_32) IF (TD_ARM_32)
......
...@@ -4,50 +4,275 @@ title: Keywords ...@@ -4,50 +4,275 @@ title: Keywords
There are about 200 keywords reserved by TDengine, they can't be used as the name of database, STable or table with either upper case, lower case or mixed case. There are about 200 keywords reserved by TDengine, they can't be used as the name of database, STable or table with either upper case, lower case or mixed case.
**Keywords List** ## Keywords List
| | | | | | ### A
| ----------- | ---------- | --------- | ---------- | ------------ |
| ABORT | CREATE | IGNORE | NULL | STAR | - ABORT
| ACCOUNT | CTIME | IMMEDIATE | OF | STATE | - ACCOUNT
| ACCOUNTS | DATABASE | IMPORT | OFFSET | STATEMENT | - ACCOUNTS
| ADD | DATABASES | IN | OR | STATE_WINDOW | - ADD
| AFTER | DAYS | INITIALLY | ORDER | STORAGE | - AFTER
| ALL | DBS | INSERT | PARTITIONS | STREAM | - ALL
| ALTER | DEFERRED | INSTEAD | PASS | STREAMS | - ALTER
| AND | DELIMITERS | INT | PLUS | STRING | - AND
| AS | DESC | INTEGER | PPS | SYNCDB | - AS
| ASC | DESCRIBE | INTERVAL | PRECISION | TABLE | - ASC
| ATTACH | DETACH | INTO | PREV | TABLES | - ATTACH
| BEFORE | DISTINCT | IS | PRIVILEGE | TAG |
| BEGIN | DIVIDE | ISNULL | QTIME | TAGS | ### B
| BETWEEN | DNODE | JOIN | QUERIES | TBNAME |
| BIGINT | DNODES | KEEP | QUERY | TIMES | - BEFORE
| BINARY | DOT | KEY | QUORUM | TIMESTAMP | - BEGIN
| BITAND | DOUBLE | KILL | RAISE | TINYINT | - BETWEEN
| BITNOT | DROP | LE | REM | TOPIC | - BIGINT
| BITOR | EACH | LIKE | REPLACE | TOPICS | - BINARY
| BLOCKS | END | LIMIT | REPLICA | TRIGGER | - BITAND
| BOOL | EQ | LINEAR | RESET | TSERIES | - BITNOT
| BY | EXISTS | LOCAL | RESTRICT | UMINUS | - BITOR
| CACHE | EXPLAIN | LP | ROW | UNION | - BLOCKS
| CACHELAST | FAIL | LSHIFT | RP | UNSIGNED | - BOOL
| CASCADE | FILE | LT | RSHIFT | UPDATE | - BY
| CHANGE | FILL | MATCH | SCORES | UPLUS |
| CLUSTER | FLOAT | MAXROWS | SELECT | USE | ### C
| COLON | FOR | MINROWS | SEMI | USER |
| COLUMN | FROM | MINUS | SESSION | USERS | - CACHE
| COMMA | FSYNC | MNODES | SET | USING | - CACHELAST
| COMP | GE | MODIFY | SHOW | VALUES | - CASCADE
| COMPACT | GLOB | MODULES | SLASH | VARIABLE | - CHANGE
| CONCAT | GRANTS | NCHAR | SLIDING | VARIABLES | - CLUSTER
| CONFLICT | GROUP | NE | SLIMIT | VGROUPS | - COLON
| CONNECTION | GT | NONE | SMALLINT | VIEW | - COLUMN
| CONNECTIONS | HAVING | NOT | SOFFSET | VNODES | - COMMA
| CONNS | ID | NOTNULL | STable | WAL | - COMP
| COPY | IF | NOW | STableS | WHERE | - COMPACT
| _C0 | _QSTART | _QSTOP | _QDURATION | _WSTART | - CONCAT
| _WSTOP | _WDURATION | _ROWTS | - CONFLICT
- CONNECTION
- CONNECTIONS
- CONNS
- COPY
- CREATE
- CTIME
### D
- DATABASE
- DATABASES
- DAYS
- DBS
- DEFERRED
- DELETE
- DELIMITERS
- DESC
- DESCRIBE
- DETACH
- DISTINCT
- DIVIDE
- DNODE
- DNODES
- DOT
- DOUBLE
- DROP
### E
- END
- EQ
- EXISTS
- EXPLAIN
### F
- FAIL
- FILE
- FILL
- FLOAT
- FOR
- FROM
- FSYNC
### G
- GE
- GLOB
- GRANTS
- GROUP
- GT
### H
- HAVING
### I
- ID
- IF
- IGNORE
- IMMEDIA
- IMPORT
- IN
- INITIAL
- INSERT
- INSTEAD
- INT
- INTEGER
- INTERVA
- INTO
- IS
- ISNULL
### J
- JOIN
### K
- KEEP
- KEY
- KILL
### L
- LE
- LIKE
- LIMIT
- LINEAR
- LOCAL
- LP
- LSHIFT
- LT
### M
- MATCH
- MAXROWS
- MINROWS
- MINUS
- MNODES
- MODIFY
- MODULES
### N
- NE
- NONE
- NOT
- NOTNULL
- NOW
- NULL
### O
- OF
- OFFSET
- OR
- ORDER
### P
- PARTITION
- PASS
- PLUS
- PPS
- PRECISION
- PREV
- PRIVILEGE
### Q
- QTIME
- QUERIE
- QUERY
- QUORUM
### R
- RAISE
- REM
- REPLACE
- REPLICA
- RESET
- RESTRIC
- ROW
- RP
- RSHIFT
### S
- SCORES
- SELECT
- SEMI
- SESSION
- SET
- SHOW
- SLASH
- SLIDING
- SLIMIT
- SMALLIN
- SOFFSET
- STable
- STableS
- STAR
- STATE
- STATEMEN
- STATE_WI
- STORAGE
- STREAM
- STREAMS
- STRING
- SYNCDB
### T
- TABLE
- TABLES
- TAG
- TAGS
- TBNAME
- TIMES
- TIMESTAMP
- TINYINT
- TOPIC
- TOPICS
- TRIGGER
- TSERIES
### U
- UMINUS
- UNION
- UNSIGNED
- UPDATE
- UPLUS
- USE
- USER
- USERS
- USING
### V
- VALUES
- VARIABLE
- VARIABLES
- VGROUPS
- VIEW
- VNODES
### W
- WAL
- WHERE
### _
- _C0
- _QSTART
- _QSTOP
- _QDURATION
- _WSTART
- _WSTOP
- _WDURATION
## Explanations ## Explanations
### TBNAME ### TBNAME
......
package main
import (
"database/sql"
"fmt"
_ "github.com/taosdata/driver-go/v2/taosRestful"
)
func createStable(taos *sql.DB) {
_, err := taos.Exec("CREATE DATABASE power")
if err != nil {
fmt.Println("failed to create database, err:", err)
}
_, err = taos.Exec("CREATE STABLE power.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (location BINARY(64), groupId INT)")
if err != nil {
fmt.Println("failed to create stable, err:", err)
}
}
func insertData(taos *sql.DB) {
sql := `INSERT INTO power.d1001 USING power.meters TAGS(California.SanFrancisco, 2) VALUES ('2018-10-03 14:38:05.000', 10.30000, 219, 0.31000) ('2018-10-03 14:38:15.000', 12.60000, 218, 0.33000) ('2018-10-03 14:38:16.800', 12.30000, 221, 0.31000)
power.d1002 USING power.meters TAGS(California.SanFrancisco, 3) VALUES ('2018-10-03 14:38:16.650', 10.30000, 218, 0.25000)
power.d1003 USING power.meters TAGS(California.LosAngeles, 2) VALUES ('2018-10-03 14:38:05.500', 11.80000, 221, 0.28000) ('2018-10-03 14:38:16.600', 13.40000, 223, 0.29000)
power.d1004 USING power.meters TAGS(California.LosAngeles, 3) VALUES ('2018-10-03 14:38:05.000', 10.80000, 223, 0.29000) ('2018-10-03 14:38:06.500', 11.50000, 221, 0.35000)`
result, err := taos.Exec(sql)
if err != nil {
fmt.Println("failed to insert, err:", err)
return
}
rowsAffected, err := result.RowsAffected()
if err != nil {
fmt.Println("failed to get affected rows, err:", err)
return
}
fmt.Println("RowsAffected", rowsAffected)
}
func main() {
var taosDSN = "root:taosdata@http(localhost:6041)/"
taos, err := sql.Open("taosRestful", taosDSN)
if err != nil {
fmt.Println("failed to connect TDengine, err:", err)
return
}
defer taos.Close()
createStable(taos)
insertData(taos)
}
...@@ -45,48 +45,274 @@ title: TDengine 参数限制与保留关键字 ...@@ -45,48 +45,274 @@ title: TDengine 参数限制与保留关键字
目前 TDengine 有将近 200 个内部保留关键字,这些关键字无论大小写均不可以用作库名、表名、STable 名、数据列名及标签列名等。这些关键字列表如下: 目前 TDengine 有将近 200 个内部保留关键字,这些关键字无论大小写均不可以用作库名、表名、STable 名、数据列名及标签列名等。这些关键字列表如下:
| 关键字列表 | | | | | ### A
| ----------- | ---------- | --------- | ---------- | ------------ |
| ABORT | CREATE | IGNORE | NULL | STAR | - ABORT
| ACCOUNT | CTIME | IMMEDIATE | OF | STATE | - ACCOUNT
| ACCOUNTS | DATABASE | IMPORT | OFFSET | STATEMENT | - ACCOUNTS
| ADD | DATABASES | IN | OR | STATE_WINDOW | - ADD
| AFTER | DAYS | INITIALLY | ORDER | STORAGE | - AFTER
| ALL | DBS | INSERT | PARTITIONS | STREAM | - ALL
| ALTER | DEFERRED | INSTEAD | PASS | STREAMS | - ALTER
| AND | DELIMITERS | INT | PLUS | STRING | - AND
| AS | DESC | INTEGER | PPS | SYNCDB | - AS
| ASC | DESCRIBE | INTERVAL | PRECISION | TABLE | - ASC
| ATTACH | DETACH | INTO | PREV | TABLES | - ATTACH
| BEFORE | DISTINCT | IS | PRIVILEGE | TAG |
| BEGIN | DIVIDE | ISNULL | QTIME | TAGS | ### B
| BETWEEN | DNODE | JOIN | QUERIES | TBNAME |
| BIGINT | DNODES | KEEP | QUERY | TIMES | - BEFORE
| BINARY | DOT | KEY | QUORUM | TIMESTAMP | - BEGIN
| BITAND | DOUBLE | KILL | RAISE | TINYINT | - BETWEEN
| BITNOT | DROP | LE | REM | TOPIC | - BIGINT
| BITOR | EACH | LIKE | REPLACE | TOPICS | - BINARY
| BLOCKS | END | LIMIT | REPLICA | TRIGGER | - BITAND
| BOOL | EQ | LINEAR | RESET | TSERIES | - BITNOT
| BY | EXISTS | LOCAL | RESTRICT | UMINUS | - BITOR
| CACHE | EXPLAIN | LP | ROW | UNION | - BLOCKS
| CACHELAST | FAIL | LSHIFT | RP | UNSIGNED | - BOOL
| CASCADE | FILE | LT | RSHIFT | UPDATE | - BY
| CHANGE | FILL | MATCH | SCORES | UPLUS |
| CLUSTER | FLOAT | MAXROWS | SELECT | USE | ### C
| COLON | FOR | MINROWS | SEMI | USER |
| COLUMN | FROM | MINUS | SESSION | USERS | - CACHE
| COMMA | FSYNC | MNODES | SET | USING | - CACHELAST
| COMP | GE | MODIFY | SHOW | VALUES | - CASCADE
| COMPACT | GLOB | MODULES | SLASH | VARIABLE | - CHANGE
| CONCAT | GRANTS | NCHAR | SLIDING | VARIABLES | - CLUSTER
| CONFLICT | GROUP | NE | SLIMIT | VGROUPS | - COLON
| CONNECTION | GT | NONE | SMALLINT | VIEW | - COLUMN
| CONNECTIONS | HAVING | NOT | SOFFSET | VNODES | - COMMA
| CONNS | ID | NOTNULL | STABLE | WAL | - COMP
| COPY | IF | NOW | STABLES | WHERE | - COMPACT
| _C0 | _QSTART | _QSTOP | _QDURATION | _WSTART | - CONCAT
| _WSTOP | _WDURATION | _ROWTS | - CONFLICT
- CONNECTION
- CONNECTIONS
- CONNS
- COPY
- CREATE
- CTIME
### D
- DATABASE
- DATABASES
- DAYS
- DBS
- DEFERRED
- DELETE
- DELIMITERS
- DESC
- DESCRIBE
- DETACH
- DISTINCT
- DIVIDE
- DNODE
- DNODES
- DOT
- DOUBLE
- DROP
### E
- END
- EQ
- EXISTS
- EXPLAIN
### F
- FAIL
- FILE
- FILL
- FLOAT
- FOR
- FROM
- FSYNC
### G
- GE
- GLOB
- GRANTS
- GROUP
- GT
### H
- HAVING
### I
- ID
- IF
- IGNORE
- IMMEDIA
- IMPORT
- IN
- INITIAL
- INSERT
- INSTEAD
- INT
- INTEGER
- INTERVA
- INTO
- IS
- ISNULL
### J
- JOIN
### K
- KEEP
- KEY
- KILL
### L
- LE
- LIKE
- LIMIT
- LINEAR
- LOCAL
- LP
- LSHIFT
- LT
### M
- MATCH
- MAXROWS
- MINROWS
- MINUS
- MNODES
- MODIFY
- MODULES
### N
- NE
- NONE
- NOT
- NOTNULL
- NOW
- NULL
### O
- OF
- OFFSET
- OR
- ORDER
### P
- PARTITION
- PASS
- PLUS
- PPS
- PRECISION
- PREV
- PRIVILEGE
### Q
- QTIME
- QUERIE
- QUERY
- QUORUM
### R
- RAISE
- REM
- REPLACE
- REPLICA
- RESET
- RESTRIC
- ROW
- RP
- RSHIFT
### S
- SCORES
- SELECT
- SEMI
- SESSION
- SET
- SHOW
- SLASH
- SLIDING
- SLIMIT
- SMALLIN
- SOFFSET
- STable
- STableS
- STAR
- STATE
- STATEMEN
- STATE_WI
- STORAGE
- STREAM
- STREAMS
- STRING
- SYNCDB
### T
- TABLE
- TABLES
- TAG
- TAGS
- TBNAME
- TIMES
- TIMESTAMP
- TINYINT
- TOPIC
- TOPICS
- TRIGGER
- TSERIES
### U
- UMINUS
- UNION
- UNSIGNED
- UPDATE
- UPLUS
- USE
- USER
- USERS
- USING
### V
- VALUES
- VARIABLE
- VARIABLES
- VGROUPS
- VIEW
- VNODES
### W
- WAL
- WHERE
### _
- _C0
- _QSTART
- _QSTOP
- _QDURATION
- _WSTART
- _WSTOP
- _WDURATION
## 特殊说明 ## 特殊说明
### TBNAME ### TBNAME
......
...@@ -52,6 +52,7 @@ extern "C" { ...@@ -52,6 +52,7 @@ extern "C" {
#define TSDB_PERFS_TABLE_OFFSETS "offsets" #define TSDB_PERFS_TABLE_OFFSETS "offsets"
#define TSDB_PERFS_TABLE_TRANS "trans" #define TSDB_PERFS_TABLE_TRANS "trans"
#define TSDB_PERFS_TABLE_STREAMS "streams" #define TSDB_PERFS_TABLE_STREAMS "streams"
#define TSDB_PERFS_TABLE_APPS "apps"
typedef struct SSysDbTableSchema { typedef struct SSysDbTableSchema {
const char* name; const char* name;
......
...@@ -234,9 +234,6 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SArray* pDataBlocks ...@@ -234,9 +234,6 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SArray* pDataBlocks
char* buildCtbNameByGroupId(const char* stbName, uint64_t groupId); char* buildCtbNameByGroupId(const char* stbName, uint64_t groupId);
SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pSchema, bool createTb, int64_t suid,
const char* stbFullName, int32_t vgId);
static FORCE_INLINE int32_t blockGetEncodeSize(const SSDataBlock* pBlock) { static FORCE_INLINE int32_t blockGetEncodeSize(const SSDataBlock* pBlock) {
return blockDataGetSerialMetaSize(pBlock->info.numOfCols) + blockDataGetSize(pBlock); return blockDataGetSerialMetaSize(pBlock->info.numOfCols) + blockDataGetSize(pBlock);
} }
......
...@@ -1134,14 +1134,16 @@ void tFreeSMAlterStbRsp(SMAlterStbRsp* pRsp); ...@@ -1134,14 +1134,16 @@ void tFreeSMAlterStbRsp(SMAlterStbRsp* pRsp);
int32_t tSerializeSTableMetaRsp(void* buf, int32_t bufLen, STableMetaRsp* pRsp); int32_t tSerializeSTableMetaRsp(void* buf, int32_t bufLen, STableMetaRsp* pRsp);
int32_t tDeserializeSTableMetaRsp(void* buf, int32_t bufLen, STableMetaRsp* pRsp); int32_t tDeserializeSTableMetaRsp(void* buf, int32_t bufLen, STableMetaRsp* pRsp);
void tFreeSTableMetaRsp(STableMetaRsp* pRsp); void tFreeSTableMetaRsp(STableMetaRsp* pRsp);
void tFreeSTableIndexRsp(void* info);
typedef struct { typedef struct {
SArray* pArray; // Array of STableMetaRsp SArray* pMetaRsp; // Array of STableMetaRsp
} STableMetaBatchRsp; SArray* pIndexRsp; // Array of STableIndexRsp;
} SSTbHbRsp;
int32_t tSerializeSTableMetaBatchRsp(void* buf, int32_t bufLen, STableMetaBatchRsp* pRsp); int32_t tSerializeSSTbHbRsp(void* buf, int32_t bufLen, SSTbHbRsp* pRsp);
int32_t tDeserializeSTableMetaBatchRsp(void* buf, int32_t bufLen, STableMetaBatchRsp* pRsp); int32_t tDeserializeSSTbHbRsp(void* buf, int32_t bufLen, SSTbHbRsp* pRsp);
void tFreeSTableMetaBatchRsp(STableMetaBatchRsp* pRsp); void tFreeSSTbHbRsp(SSTbHbRsp* pRsp);
typedef struct { typedef struct {
int32_t numOfTables; int32_t numOfTables;
...@@ -1295,8 +1297,17 @@ int32_t tSerializeSDCreateMnodeReq(void* buf, int32_t bufLen, SDCreateMnodeReq* ...@@ -1295,8 +1297,17 @@ int32_t tSerializeSDCreateMnodeReq(void* buf, int32_t bufLen, SDCreateMnodeReq*
int32_t tDeserializeSDCreateMnodeReq(void* buf, int32_t bufLen, SDCreateMnodeReq* pReq); int32_t tDeserializeSDCreateMnodeReq(void* buf, int32_t bufLen, SDCreateMnodeReq* pReq);
typedef struct { typedef struct {
int32_t connId; int32_t dnodeId;
int32_t queryId; int8_t standby;
} SSetStandbyReq;
int32_t tSerializeSSetStandbyReq(void* buf, int32_t bufLen, SSetStandbyReq* pReq);
int32_t tDeserializeSSetStandbyReq(void* buf, int32_t bufLen, SSetStandbyReq* pReq);
typedef struct {
int32_t connId; // todo remove
int32_t queryId; // todo remove
char queryStrId[TSDB_QUERY_ID_LEN];
} SKillQueryReq; } SKillQueryReq;
int32_t tSerializeSKillQueryReq(void* buf, int32_t bufLen, SKillQueryReq* pReq); int32_t tSerializeSKillQueryReq(void* buf, int32_t bufLen, SKillQueryReq* pReq);
...@@ -1506,6 +1517,7 @@ typedef struct { ...@@ -1506,6 +1517,7 @@ typedef struct {
char* sql; char* sql;
char* ast; char* ast;
int8_t triggerType; int8_t triggerType;
int64_t maxDelay;
int64_t watermark; int64_t watermark;
} SCMCreateStreamReq; } SCMCreateStreamReq;
...@@ -2297,6 +2309,29 @@ int32_t tDecodeSMqOffset(SDecoder* decoder, SMqOffset* pOffset); ...@@ -2297,6 +2309,29 @@ int32_t tDecodeSMqOffset(SDecoder* decoder, SMqOffset* pOffset);
int32_t tEncodeSMqCMCommitOffsetReq(SEncoder* encoder, const SMqCMCommitOffsetReq* pReq); int32_t tEncodeSMqCMCommitOffsetReq(SEncoder* encoder, const SMqCMCommitOffsetReq* pReq);
int32_t tDecodeSMqCMCommitOffsetReq(SDecoder* decoder, SMqCMCommitOffsetReq* pReq); int32_t tDecodeSMqCMCommitOffsetReq(SDecoder* decoder, SMqCMCommitOffsetReq* pReq);
// tqOffset
enum {
TMQ_OFFSET__SNAPSHOT = 1,
TMQ_OFFSET__LOG,
};
typedef struct {
int8_t type;
union {
struct {
int64_t uid;
int64_t ts;
};
struct {
int64_t version;
};
};
char subKey[TSDB_SUBSCRIBE_KEY_LEN];
} STqOffset;
int32_t tEncodeSTqOffset(SEncoder* pEncoder, const STqOffset* pOffset);
int32_t tDecodeSTqOffset(SDecoder* pDecoder, STqOffset* pOffset);
typedef struct { typedef struct {
char name[TSDB_TABLE_FNAME_LEN]; char name[TSDB_TABLE_FNAME_LEN];
char stb[TSDB_TABLE_FNAME_LEN]; char stb[TSDB_TABLE_FNAME_LEN];
...@@ -2393,6 +2428,7 @@ typedef struct { ...@@ -2393,6 +2428,7 @@ typedef struct {
static FORCE_INLINE void tDestroyTSma(STSma* pSma) { static FORCE_INLINE void tDestroyTSma(STSma* pSma) {
if (pSma) { if (pSma) {
taosMemoryFreeClear(pSma->dstTbName);
taosMemoryFreeClear(pSma->expr); taosMemoryFreeClear(pSma->expr);
taosMemoryFreeClear(pSma->tagsFilter); taosMemoryFreeClear(pSma->tagsFilter);
} }
...@@ -2421,7 +2457,7 @@ int32_t tEncodeSVCreateTSmaReq(SEncoder* pCoder, const SVCreateTSmaReq* pReq); ...@@ -2421,7 +2457,7 @@ int32_t tEncodeSVCreateTSmaReq(SEncoder* pCoder, const SVCreateTSmaReq* pReq);
int32_t tDecodeSVCreateTSmaReq(SDecoder* pCoder, SVCreateTSmaReq* pReq); int32_t tDecodeSVCreateTSmaReq(SDecoder* pCoder, SVCreateTSmaReq* pReq);
int32_t tEncodeTSma(SEncoder* pCoder, const STSma* pSma); int32_t tEncodeTSma(SEncoder* pCoder, const STSma* pSma);
int32_t tDecodeTSma(SDecoder* pCoder, STSma* pSma); int32_t tDecodeTSma(SDecoder* pCoder, STSma* pSma, bool deepCopy);
static int32_t tEncodeTSmaWrapper(SEncoder* pEncoder, const STSmaWrapper* pReq) { static int32_t tEncodeTSmaWrapper(SEncoder* pEncoder, const STSmaWrapper* pReq) {
if (tEncodeI32(pEncoder, pReq->number) < 0) return -1; if (tEncodeI32(pEncoder, pReq->number) < 0) return -1;
...@@ -2431,10 +2467,10 @@ static int32_t tEncodeTSmaWrapper(SEncoder* pEncoder, const STSmaWrapper* pReq) ...@@ -2431,10 +2467,10 @@ static int32_t tEncodeTSmaWrapper(SEncoder* pEncoder, const STSmaWrapper* pReq)
return 0; return 0;
} }
static int32_t tDecodeTSmaWrapper(SDecoder* pDecoder, STSmaWrapper* pReq) { static int32_t tDecodeTSmaWrapper(SDecoder* pDecoder, STSmaWrapper* pReq, bool deepCopy) {
if (tDecodeI32(pDecoder, &pReq->number) < 0) return -1; if (tDecodeI32(pDecoder, &pReq->number) < 0) return -1;
for (int32_t i = 0; i < pReq->number; ++i) { for (int32_t i = 0; i < pReq->number; ++i) {
tDecodeTSma(pDecoder, pReq->tSma + i); tDecodeTSma(pDecoder, pReq->tSma + i, deepCopy);
} }
return 0; return 0;
} }
...@@ -2493,6 +2529,10 @@ typedef struct { ...@@ -2493,6 +2529,10 @@ typedef struct {
} STableIndexInfo; } STableIndexInfo;
typedef struct { typedef struct {
char tbName[TSDB_TABLE_NAME_LEN];
char dbFName[TSDB_DB_FNAME_LEN];
uint64_t suid;
int32_t version;
SArray* pIndex; SArray* pIndex;
} STableIndexRsp; } STableIndexRsp;
......
...@@ -140,7 +140,7 @@ enum { ...@@ -140,7 +140,7 @@ enum {
TD_DEF_MSG_TYPE(TDMT_MND_MQ_CONSUMER_RECOVER, "consumer-recover", SMqConsumerRecoverMsg, NULL) TD_DEF_MSG_TYPE(TDMT_MND_MQ_CONSUMER_RECOVER, "consumer-recover", SMqConsumerRecoverMsg, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_MQ_DO_REBALANCE, "do-rebalance", SMqDoRebalanceMsg, NULL) TD_DEF_MSG_TYPE(TDMT_MND_MQ_DO_REBALANCE, "do-rebalance", SMqDoRebalanceMsg, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_MQ_DROP_CGROUP, "drop-cgroup", SMqDropCGroupReq, SMqDropCGroupRsp) TD_DEF_MSG_TYPE(TDMT_MND_MQ_DROP_CGROUP, "drop-cgroup", SMqDropCGroupReq, SMqDropCGroupRsp)
TD_DEF_MSG_TYPE(TDMT_MND_MQ_COMMIT_OFFSET, "commit-offset", SMqCMCommitOffsetReq, SMqCMCommitOffsetRsp) TD_DEF_MSG_TYPE(TDMT_MND_MQ_COMMIT_OFFSET, "mnode-commit-offset", SMqCMCommitOffsetReq, SMqCMCommitOffsetRsp)
TD_DEF_MSG_TYPE(TDMT_MND_MQ_TIMER, "mq-tmr", SMTimerReq, NULL) TD_DEF_MSG_TYPE(TDMT_MND_MQ_TIMER, "mq-tmr", SMTimerReq, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_TELEM_TIMER, "telem-tmr", SMTimerReq, SMTimerReq) TD_DEF_MSG_TYPE(TDMT_MND_TELEM_TIMER, "telem-tmr", SMTimerReq, SMTimerReq)
TD_DEF_MSG_TYPE(TDMT_MND_TRANS_TIMER, "trans-tmr", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_TRANS_TIMER, "trans-tmr", NULL, NULL)
...@@ -176,6 +176,7 @@ enum { ...@@ -176,6 +176,7 @@ enum {
TD_DEF_MSG_TYPE(TDMT_VND_DROP_STB, "vnode-drop-stb", SVDropStbReq, NULL) TD_DEF_MSG_TYPE(TDMT_VND_DROP_STB, "vnode-drop-stb", SVDropStbReq, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_MQ_VG_CHANGE, "vnode-mq-vg-change", SMqRebVgReq, SMqRebVgRsp) TD_DEF_MSG_TYPE(TDMT_VND_MQ_VG_CHANGE, "vnode-mq-vg-change", SMqRebVgReq, SMqRebVgRsp)
TD_DEF_MSG_TYPE(TDMT_VND_MQ_VG_DELETE, "vnode-mq-vg-delete", SMqVDeleteReq, SMqVDeleteRsp) TD_DEF_MSG_TYPE(TDMT_VND_MQ_VG_DELETE, "vnode-mq-vg-delete", SMqVDeleteReq, SMqVDeleteRsp)
TD_DEF_MSG_TYPE(TDMT_VND_MQ_COMMIT_OFFSET, "vnode-commit-offset", STqOffset, STqOffset)
TD_DEF_MSG_TYPE(TDMT_VND_CANCEL_TASK, "vnode-cancel-task", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_CANCEL_TASK, "vnode-cancel-task", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_DROP_TASK, "vnode-drop-task", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_DROP_TASK, "vnode-drop-task", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_CREATE_TOPIC, "vnode-create-topic", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_CREATE_TOPIC, "vnode-create-topic", NULL, NULL)
...@@ -237,6 +238,8 @@ enum { ...@@ -237,6 +238,8 @@ enum {
TD_DEF_MSG_TYPE(TDMT_SYNC_SNAPSHOT_SEND, "sync-snapshot-send", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_SYNC_SNAPSHOT_SEND, "sync-snapshot-send", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_SYNC_SNAPSHOT_RSP, "sync-snapshot-rsp", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_SYNC_SNAPSHOT_RSP, "sync-snapshot-rsp", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_SYNC_LEADER_TRANSFER, "sync-leader-transfer", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_SYNC_LEADER_TRANSFER, "sync-leader-transfer", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_SYNC_SET_MNODE_STANDBY, "set-mnode-standby", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_SYNC_SET_VNODE_STANDBY, "set-vnode-standby", NULL, NULL)
#if defined(TD_MSG_NUMBER_) #if defined(TD_MSG_NUMBER_)
TDMT_MAX TDMT_MAX
......
...@@ -78,7 +78,7 @@ ...@@ -78,7 +78,7 @@
#define TK_BUFFER 60 #define TK_BUFFER 60
#define TK_CACHELAST 61 #define TK_CACHELAST 61
#define TK_COMP 62 #define TK_COMP 62
#define TK_DAYS 63 #define TK_DURATION 63
#define TK_NK_VARIABLE 64 #define TK_NK_VARIABLE 64
#define TK_FSYNC 65 #define TK_FSYNC 65
#define TK_MAXROWS 66 #define TK_MAXROWS 66
...@@ -184,76 +184,77 @@ ...@@ -184,76 +184,77 @@
#define TK_TRIGGER 166 #define TK_TRIGGER 166
#define TK_AT_ONCE 167 #define TK_AT_ONCE 167
#define TK_WINDOW_CLOSE 168 #define TK_WINDOW_CLOSE 168
#define TK_WATERMARK 169 #define TK_MAX_DELAY 169
#define TK_KILL 170 #define TK_WATERMARK 170
#define TK_CONNECTION 171 #define TK_KILL 171
#define TK_TRANSACTION 172 #define TK_CONNECTION 172
#define TK_BALANCE 173 #define TK_TRANSACTION 173
#define TK_VGROUP 174 #define TK_BALANCE 174
#define TK_MERGE 175 #define TK_VGROUP 175
#define TK_REDISTRIBUTE 176 #define TK_MERGE 176
#define TK_SPLIT 177 #define TK_REDISTRIBUTE 177
#define TK_SYNCDB 178 #define TK_SPLIT 178
#define TK_DELETE 179 #define TK_SYNCDB 179
#define TK_NULL 180 #define TK_DELETE 180
#define TK_NK_QUESTION 181 #define TK_NULL 181
#define TK_NK_ARROW 182 #define TK_NK_QUESTION 182
#define TK_ROWTS 183 #define TK_NK_ARROW 183
#define TK_TBNAME 184 #define TK_ROWTS 184
#define TK_QSTARTTS 185 #define TK_TBNAME 185
#define TK_QENDTS 186 #define TK_QSTARTTS 186
#define TK_WSTARTTS 187 #define TK_QENDTS 187
#define TK_WENDTS 188 #define TK_WSTARTTS 188
#define TK_WDURATION 189 #define TK_WENDTS 189
#define TK_CAST 190 #define TK_WDURATION 190
#define TK_NOW 191 #define TK_CAST 191
#define TK_TODAY 192 #define TK_NOW 192
#define TK_TIMEZONE 193 #define TK_TODAY 193
#define TK_COUNT 194 #define TK_TIMEZONE 194
#define TK_FIRST 195 #define TK_COUNT 195
#define TK_LAST 196 #define TK_FIRST 196
#define TK_LAST_ROW 197 #define TK_LAST 197
#define TK_BETWEEN 198 #define TK_LAST_ROW 198
#define TK_IS 199 #define TK_BETWEEN 199
#define TK_NK_LT 200 #define TK_IS 200
#define TK_NK_GT 201 #define TK_NK_LT 201
#define TK_NK_LE 202 #define TK_NK_GT 202
#define TK_NK_GE 203 #define TK_NK_LE 203
#define TK_NK_NE 204 #define TK_NK_GE 204
#define TK_MATCH 205 #define TK_NK_NE 205
#define TK_NMATCH 206 #define TK_MATCH 206
#define TK_CONTAINS 207 #define TK_NMATCH 207
#define TK_JOIN 208 #define TK_CONTAINS 208
#define TK_INNER 209 #define TK_JOIN 209
#define TK_SELECT 210 #define TK_INNER 210
#define TK_DISTINCT 211 #define TK_SELECT 211
#define TK_WHERE 212 #define TK_DISTINCT 212
#define TK_PARTITION 213 #define TK_WHERE 213
#define TK_BY 214 #define TK_PARTITION 214
#define TK_SESSION 215 #define TK_BY 215
#define TK_STATE_WINDOW 216 #define TK_SESSION 216
#define TK_SLIDING 217 #define TK_STATE_WINDOW 217
#define TK_FILL 218 #define TK_SLIDING 218
#define TK_VALUE 219 #define TK_FILL 219
#define TK_NONE 220 #define TK_VALUE 220
#define TK_PREV 221 #define TK_NONE 221
#define TK_LINEAR 222 #define TK_PREV 222
#define TK_NEXT 223 #define TK_LINEAR 223
#define TK_HAVING 224 #define TK_NEXT 224
#define TK_ORDER 225 #define TK_HAVING 225
#define TK_SLIMIT 226 #define TK_ORDER 226
#define TK_SOFFSET 227 #define TK_SLIMIT 227
#define TK_LIMIT 228 #define TK_SOFFSET 228
#define TK_OFFSET 229 #define TK_LIMIT 229
#define TK_ASC 230 #define TK_OFFSET 230
#define TK_NULLS 231 #define TK_ASC 231
#define TK_ID 232 #define TK_NULLS 232
#define TK_NK_BITNOT 233 #define TK_ID 233
#define TK_INSERT 234 #define TK_NK_BITNOT 234
#define TK_VALUES 235 #define TK_INSERT 235
#define TK_IMPORT 236 #define TK_VALUES 236
#define TK_NK_SEMI 237 #define TK_IMPORT 237
#define TK_FILE 238 #define TK_NK_SEMI 238
#define TK_FILE 239
#define TK_NK_SPACE 300 #define TK_NK_SPACE 300
#define TK_NK_COMMENT 301 #define TK_NK_COMMENT 301
......
...@@ -32,9 +32,7 @@ typedef struct { ...@@ -32,9 +32,7 @@ typedef struct {
int32_t dnodeId; int32_t dnodeId;
bool standby; bool standby;
bool deploy; bool deploy;
int8_t replica; SReplica replica;
int8_t selfIndex;
SReplica replicas[TSDB_MAX_REPLICA];
SMsgCb msgCb; SMsgCb msgCb;
} SMnodeOpt; } SMnodeOpt;
...@@ -61,6 +59,12 @@ void mndClose(SMnode *pMnode); ...@@ -61,6 +59,12 @@ void mndClose(SMnode *pMnode);
* @param pMnode The mnode object. * @param pMnode The mnode object.
*/ */
int32_t mndStart(SMnode *pMnode); int32_t mndStart(SMnode *pMnode);
/**
* @brief Stop mnode
*
* @param pMnode The mnode object.
*/
void mndStop(SMnode *pMnode); void mndStop(SMnode *pMnode);
/** /**
...@@ -73,16 +77,25 @@ void mndStop(SMnode *pMnode); ...@@ -73,16 +77,25 @@ void mndStop(SMnode *pMnode);
* @return int32_t 0 for success, -1 for failure. * @return int32_t 0 for success, -1 for failure.
*/ */
int32_t mndGetMonitorInfo(SMnode *pMnode, SMonClusterInfo *pCluster, SMonVgroupInfo *pVgroup, SMonGrantInfo *pGrant); int32_t mndGetMonitorInfo(SMnode *pMnode, SMonClusterInfo *pCluster, SMonVgroupInfo *pVgroup, SMonGrantInfo *pGrant);
/**
* @brief Get mnode loads for status msg.
*
* @param pMnode The mnode object.
* @param pLoad
* @return int32_t 0 for success, -1 for failure.
*/
int32_t mndGetLoad(SMnode *pMnode, SMnodeLoad *pLoad); int32_t mndGetLoad(SMnode *pMnode, SMnodeLoad *pLoad);
/** /**
* @brief Process the read, write, sync request. * @brief Process the rpc, sync request.
* *
* @param pMsg The request msg. * @param pMsg The request msg.
* @return int32_t 0 for success, -1 for failure. * @return int32_t 0 for success, -1 for failure.
*/ */
int32_t mndProcessRpcMsg(SRpcMsg *pMsg); int32_t mndProcessRpcMsg(SRpcMsg *pMsg);
int32_t mndProcessSyncMsg(SRpcMsg *pMsg); int32_t mndProcessSyncMsg(SRpcMsg *pMsg);
int32_t mndPreProcessMsg(SRpcMsg *pMsg);
/** /**
* @brief Generate machine code * @brief Generate machine code
......
...@@ -98,14 +98,15 @@ typedef struct SCatalogCfg { ...@@ -98,14 +98,15 @@ typedef struct SCatalogCfg {
uint32_t stbRentSec; uint32_t stbRentSec;
} SCatalogCfg; } SCatalogCfg;
typedef struct SSTableMetaVersion { typedef struct SSTableVersion {
char dbFName[TSDB_DB_FNAME_LEN]; char dbFName[TSDB_DB_FNAME_LEN];
char stbName[TSDB_TABLE_NAME_LEN]; char stbName[TSDB_TABLE_NAME_LEN];
uint64_t dbId; uint64_t dbId;
uint64_t suid; uint64_t suid;
int16_t sversion; int16_t sversion;
int16_t tversion; int16_t tversion;
} SSTableMetaVersion; int32_t smaVer;
} SSTableVersion;
typedef struct SDbVgVersion { typedef struct SDbVgVersion {
char dbFName[TSDB_DB_FNAME_LEN]; char dbFName[TSDB_DB_FNAME_LEN];
...@@ -267,7 +268,7 @@ int32_t catalogAsyncGetAllMeta(SCatalog* pCtg, SRequestConnInfo* pConn, uint64_t ...@@ -267,7 +268,7 @@ int32_t catalogAsyncGetAllMeta(SCatalog* pCtg, SRequestConnInfo* pConn, uint64_t
int32_t catalogGetQnodeList(SCatalog* pCatalog, SRequestConnInfo* pConn, SArray* pQnodeList); int32_t catalogGetQnodeList(SCatalog* pCatalog, SRequestConnInfo* pConn, SArray* pQnodeList);
int32_t catalogGetExpiredSTables(SCatalog* pCatalog, SSTableMetaVersion** stables, uint32_t* num); int32_t catalogGetExpiredSTables(SCatalog* pCatalog, SSTableVersion **stables, uint32_t *num);
int32_t catalogGetExpiredDBs(SCatalog* pCatalog, SDbVgVersion** dbs, uint32_t* num); int32_t catalogGetExpiredDBs(SCatalog* pCatalog, SDbVgVersion** dbs, uint32_t* num);
...@@ -279,6 +280,8 @@ int32_t catalogGetIndexMeta(SCatalog* pCtg, SRequestConnInfo* pConn, const char* ...@@ -279,6 +280,8 @@ int32_t catalogGetIndexMeta(SCatalog* pCtg, SRequestConnInfo* pConn, const char*
int32_t catalogGetTableIndex(SCatalog* pCtg, SRequestConnInfo* pConn, const SName* pTableName, SArray** pRes); int32_t catalogGetTableIndex(SCatalog* pCtg, SRequestConnInfo* pConn, const SName* pTableName, SArray** pRes);
int32_t catalogUpdateTableIndex(SCatalog* pCtg, STableIndexRsp *pRsp);
int32_t catalogGetUdfInfo(SCatalog* pCtg, SRequestConnInfo* pConn, const char* funcName, SFuncInfo* pInfo); int32_t catalogGetUdfInfo(SCatalog* pCtg, SRequestConnInfo* pConn, const char* funcName, SFuncInfo* pInfo);
int32_t catalogChkAuth(SCatalog* pCtg, SRequestConnInfo* pConn, const char* user, const char* dbFName, AUTH_TYPE type, bool *pass); int32_t catalogChkAuth(SCatalog* pCtg, SRequestConnInfo* pConn, const char* user, const char* dbFName, AUTH_TYPE type, bool *pass);
......
...@@ -139,6 +139,14 @@ typedef enum EFunctionType { ...@@ -139,6 +139,14 @@ typedef enum EFunctionType {
FUNCTION_TYPE_TOP_MERGE, FUNCTION_TYPE_TOP_MERGE,
FUNCTION_TYPE_BOTTOM_PARTIAL, FUNCTION_TYPE_BOTTOM_PARTIAL,
FUNCTION_TYPE_BOTTOM_MERGE, FUNCTION_TYPE_BOTTOM_MERGE,
FUNCTION_TYPE_FIRST_PARTIAL,
FUNCTION_TYPE_FIRST_MERGE,
FUNCTION_TYPE_LAST_PARTIAL,
FUNCTION_TYPE_LAST_MERGE,
FUNCTION_TYPE_AVG_PARTIAL,
FUNCTION_TYPE_AVG_MERGE,
FUNCTION_TYPE_STDDEV_PARTIAL,
FUNCTION_TYPE_STDDEV_MERGE,
// user defined funcion // user defined funcion
FUNCTION_TYPE_UDF = 10000 FUNCTION_TYPE_UDF = 10000
...@@ -176,6 +184,7 @@ bool fmIsRepeatScanFunc(int32_t funcId); ...@@ -176,6 +184,7 @@ bool fmIsRepeatScanFunc(int32_t funcId);
bool fmIsUserDefinedFunc(int32_t funcId); bool fmIsUserDefinedFunc(int32_t funcId);
bool fmIsDistExecFunc(int32_t funcId); bool fmIsDistExecFunc(int32_t funcId);
bool fmIsForbidFillFunc(int32_t funcId); bool fmIsForbidFillFunc(int32_t funcId);
bool fmIsForbidStreamFunc(int32_t funcId);
bool fmIsIntervalInterpoFunc(int32_t funcId); bool fmIsIntervalInterpoFunc(int32_t funcId);
int32_t fmGetDistMethod(const SFunctionNode* pFunc, SFunctionNode** pPartialFunc, SFunctionNode** pMergeFunc); int32_t fmGetDistMethod(const SFunctionNode* pFunc, SFunctionNode** pPartialFunc, SFunctionNode** pMergeFunc);
......
...@@ -89,6 +89,7 @@ typedef struct STableOptions { ...@@ -89,6 +89,7 @@ typedef struct STableOptions {
ENodeType type; ENodeType type;
char comment[TSDB_TB_COMMENT_LEN]; char comment[TSDB_TB_COMMENT_LEN];
double filesFactor; double filesFactor;
int32_t delay;
SNodeList* pRollupFuncs; SNodeList* pRollupFuncs;
int32_t ttl; int32_t ttl;
SNodeList* pSma; SNodeList* pSma;
...@@ -286,9 +287,15 @@ typedef struct SKillStmt { ...@@ -286,9 +287,15 @@ typedef struct SKillStmt {
int32_t targetId; int32_t targetId;
} SKillStmt; } SKillStmt;
typedef struct SKillQueryStmt {
ENodeType type;
char queryId[TSDB_QUERY_ID_LEN];
} SKillQueryStmt;
typedef struct SStreamOptions { typedef struct SStreamOptions {
ENodeType type; ENodeType type;
int8_t triggerType; int8_t triggerType;
SNode* pDelay;
SNode* pWatermark; SNode* pWatermark;
} SStreamOptions; } SStreamOptions;
......
...@@ -204,6 +204,7 @@ typedef enum ENodeType { ...@@ -204,6 +204,7 @@ typedef enum ENodeType {
QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN, QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN,
QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN, QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN,
QUERY_NODE_PHYSICAL_PLAN_TABLE_SEQ_SCAN, QUERY_NODE_PHYSICAL_PLAN_TABLE_SEQ_SCAN,
QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN,
QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN, QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN,
QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN, QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN,
QUERY_NODE_PHYSICAL_PLAN_PROJECT, QUERY_NODE_PHYSICAL_PLAN_PROJECT,
...@@ -252,22 +253,20 @@ typedef struct SNodeList { ...@@ -252,22 +253,20 @@ typedef struct SNodeList {
SListCell* pTail; SListCell* pTail;
} SNodeList; } SNodeList;
#define SNodeptr void* SNode* nodesMakeNode(ENodeType type);
void nodesDestroyNode(SNode* pNode);
SNodeptr nodesMakeNode(ENodeType type);
void nodesDestroyNode(SNodeptr pNode);
SNodeList* nodesMakeList(); SNodeList* nodesMakeList();
int32_t nodesListAppend(SNodeList* pList, SNodeptr pNode); int32_t nodesListAppend(SNodeList* pList, SNode* pNode);
int32_t nodesListStrictAppend(SNodeList* pList, SNodeptr pNode); int32_t nodesListStrictAppend(SNodeList* pList, SNode* pNode);
int32_t nodesListMakeAppend(SNodeList** pList, SNodeptr pNode); int32_t nodesListMakeAppend(SNodeList** pList, SNode* pNode);
int32_t nodesListMakeStrictAppend(SNodeList** pList, SNodeptr pNode); int32_t nodesListMakeStrictAppend(SNodeList** pList, SNode* pNode);
int32_t nodesListAppendList(SNodeList* pTarget, SNodeList* pSrc); int32_t nodesListAppendList(SNodeList* pTarget, SNodeList* pSrc);
int32_t nodesListStrictAppendList(SNodeList* pTarget, SNodeList* pSrc); int32_t nodesListStrictAppendList(SNodeList* pTarget, SNodeList* pSrc);
int32_t nodesListPushFront(SNodeList* pList, SNodeptr pNode); int32_t nodesListPushFront(SNodeList* pList, SNode* pNode);
SListCell* nodesListErase(SNodeList* pList, SListCell* pCell); SListCell* nodesListErase(SNodeList* pList, SListCell* pCell);
void nodesListInsertList(SNodeList* pTarget, SListCell* pPos, SNodeList* pSrc); void nodesListInsertList(SNodeList* pTarget, SListCell* pPos, SNodeList* pSrc);
SNodeptr nodesListGetNode(SNodeList* pList, int32_t index); SNode* nodesListGetNode(SNodeList* pList, int32_t index);
void nodesDestroyList(SNodeList* pList); void nodesDestroyList(SNodeList* pList);
// Only clear the linked list structure, without releasing the elements inside // Only clear the linked list structure, without releasing the elements inside
void nodesClearList(SNodeList* pList); void nodesClearList(SNodeList* pList);
...@@ -275,9 +274,9 @@ void nodesClearList(SNodeList* pList); ...@@ -275,9 +274,9 @@ void nodesClearList(SNodeList* pList);
typedef enum EDealRes { DEAL_RES_CONTINUE = 1, DEAL_RES_IGNORE_CHILD, DEAL_RES_ERROR, DEAL_RES_END } EDealRes; typedef enum EDealRes { DEAL_RES_CONTINUE = 1, DEAL_RES_IGNORE_CHILD, DEAL_RES_ERROR, DEAL_RES_END } EDealRes;
typedef EDealRes (*FNodeWalker)(SNode* pNode, void* pContext); typedef EDealRes (*FNodeWalker)(SNode* pNode, void* pContext);
void nodesWalkExpr(SNodeptr pNode, FNodeWalker walker, void* pContext); void nodesWalkExpr(SNode* pNode, FNodeWalker walker, void* pContext);
void nodesWalkExprs(SNodeList* pList, FNodeWalker walker, void* pContext); void nodesWalkExprs(SNodeList* pList, FNodeWalker walker, void* pContext);
void nodesWalkExprPostOrder(SNodeptr pNode, FNodeWalker walker, void* pContext); void nodesWalkExprPostOrder(SNode* pNode, FNodeWalker walker, void* pContext);
void nodesWalkExprsPostOrder(SNodeList* pList, FNodeWalker walker, void* pContext); void nodesWalkExprsPostOrder(SNodeList* pList, FNodeWalker walker, void* pContext);
typedef EDealRes (*FNodeRewriter)(SNode** pNode, void* pContext); typedef EDealRes (*FNodeRewriter)(SNode** pNode, void* pContext);
...@@ -286,13 +285,13 @@ void nodesRewriteExprs(SNodeList* pList, FNodeRewriter rewriter, void* pContext) ...@@ -286,13 +285,13 @@ void nodesRewriteExprs(SNodeList* pList, FNodeRewriter rewriter, void* pContext)
void nodesRewriteExprPostOrder(SNode** pNode, FNodeRewriter rewriter, void* pContext); void nodesRewriteExprPostOrder(SNode** pNode, FNodeRewriter rewriter, void* pContext);
void nodesRewriteExprsPostOrder(SNodeList* pList, FNodeRewriter rewriter, void* pContext); void nodesRewriteExprsPostOrder(SNodeList* pList, FNodeRewriter rewriter, void* pContext);
bool nodesEqualNode(const SNodeptr a, const SNodeptr b); bool nodesEqualNode(const SNode* a, const SNode* b);
SNodeptr nodesCloneNode(const SNodeptr pNode); SNode* nodesCloneNode(const SNode* pNode);
SNodeList* nodesCloneList(const SNodeList* pList); SNodeList* nodesCloneList(const SNodeList* pList);
const char* nodesNodeName(ENodeType type); const char* nodesNodeName(ENodeType type);
int32_t nodesNodeToString(const SNodeptr pNode, bool format, char** pStr, int32_t* pLen); int32_t nodesNodeToString(const SNode* pNode, bool format, char** pStr, int32_t* pLen);
int32_t nodesStringToNode(const char* pStr, SNode** pNode); int32_t nodesStringToNode(const char* pStr, SNode** pNode);
int32_t nodesListToString(const SNodeList* pList, bool format, char** pStr, int32_t* pLen); int32_t nodesListToString(const SNodeList* pList, bool format, char** pStr, int32_t* pLen);
......
...@@ -34,7 +34,13 @@ typedef struct SLogicNode { ...@@ -34,7 +34,13 @@ typedef struct SLogicNode {
uint8_t precision; uint8_t precision;
} SLogicNode; } SLogicNode;
typedef enum EScanType { SCAN_TYPE_TAG = 1, SCAN_TYPE_TABLE, SCAN_TYPE_SYSTEM_TABLE, SCAN_TYPE_STREAM } EScanType; typedef enum EScanType {
SCAN_TYPE_TAG = 1,
SCAN_TYPE_TABLE,
SCAN_TYPE_SYSTEM_TABLE,
SCAN_TYPE_STREAM,
SCAN_TYPE_TABLE_MERGE
} EScanType;
typedef struct SScanLogicNode { typedef struct SScanLogicNode {
SLogicNode node; SLogicNode node;
...@@ -262,6 +268,7 @@ typedef struct STableScanPhysiNode { ...@@ -262,6 +268,7 @@ typedef struct STableScanPhysiNode {
} STableScanPhysiNode; } STableScanPhysiNode;
typedef STableScanPhysiNode STableSeqScanPhysiNode; typedef STableScanPhysiNode STableSeqScanPhysiNode;
typedef STableScanPhysiNode STableMergeScanPhysiNode;
typedef STableScanPhysiNode SStreamScanPhysiNode; typedef STableScanPhysiNode SStreamScanPhysiNode;
typedef struct SProjectPhysiNode { typedef struct SProjectPhysiNode {
......
...@@ -351,7 +351,6 @@ typedef struct SQuery { ...@@ -351,7 +351,6 @@ typedef struct SQuery {
int32_t placeholderNum; int32_t placeholderNum;
SArray* pPlaceholderValues; SArray* pPlaceholderValues;
SNode* pPrepareRoot; SNode* pPrepareRoot;
struct SParseMetaCache* pMetaCache;
} SQuery; } SQuery;
void nodesWalkSelectStmt(SSelectStmt* pSelect, ESqlClause clause, FNodeWalker walker, void* pContext); void nodesWalkSelectStmt(SSelectStmt* pSelect, ESqlClause clause, FNodeWalker walker, void* pContext);
......
...@@ -65,6 +65,7 @@ void qDestroyQuery(SQuery* pQueryNode); ...@@ -65,6 +65,7 @@ void qDestroyQuery(SQuery* pQueryNode);
int32_t qExtractResultSchema(const SNode* pRoot, int32_t* numOfCols, SSchema** pSchema); int32_t qExtractResultSchema(const SNode* pRoot, int32_t* numOfCols, SSchema** pSchema);
int32_t qSetSTableIdForRsma(SNode* pStmt, int64_t uid); int32_t qSetSTableIdForRsma(SNode* pStmt, int64_t uid);
void qCleanupKeywordsTable();
int32_t qBuildStmtOutput(SQuery* pQuery, SHashObj* pVgHash, SHashObj* pBlockHash); int32_t qBuildStmtOutput(SQuery* pQuery, SHashObj* pVgHash, SHashObj* pBlockHash);
int32_t qResetStmtDataBlock(void* block, bool keepBuf); int32_t qResetStmtDataBlock(void* block, bool keepBuf);
......
...@@ -47,8 +47,9 @@ typedef enum { ...@@ -47,8 +47,9 @@ typedef enum {
typedef enum { typedef enum {
TAOS_SYNC_PROPOSE_SUCCESS = 0, TAOS_SYNC_PROPOSE_SUCCESS = 0,
TAOS_SYNC_PROPOSE_NOT_LEADER = 1, TAOS_SYNC_PROPOSE_NOT_LEADER = 1,
TAOS_SYNC_PROPOSE_OTHER_ERROR = 2, TAOS_SYNC_ONLY_ONE_REPLICA = 2,
TAOS_SYNC_ONLY_ONE_REPLICA = 3, TAOS_SYNC_NOT_IN_NEW_CONFIG = 3,
TAOS_SYNC_OTHER_ERROR = 100,
} ESyncProposeCode; } ESyncProposeCode;
typedef enum { typedef enum {
...@@ -110,6 +111,7 @@ typedef struct SSyncFSM { ...@@ -110,6 +111,7 @@ typedef struct SSyncFSM {
void (*FpRestoreFinishCb)(struct SSyncFSM* pFsm); void (*FpRestoreFinishCb)(struct SSyncFSM* pFsm);
void (*FpReConfigCb)(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SReConfigCbMeta cbMeta); void (*FpReConfigCb)(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SReConfigCbMeta cbMeta);
void (*FpLeaderTransferCb)(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta);
int32_t (*FpGetSnapshot)(struct SSyncFSM* pFsm, SSnapshot* pSnapshot); int32_t (*FpGetSnapshot)(struct SSyncFSM* pFsm, SSnapshot* pSnapshot);
...@@ -199,15 +201,13 @@ bool syncIsRestoreFinish(int64_t rid); ...@@ -199,15 +201,13 @@ bool syncIsRestoreFinish(int64_t rid);
int32_t syncGetSnapshotMeta(int64_t rid, struct SSnapshotMeta* sMeta); int32_t syncGetSnapshotMeta(int64_t rid, struct SSnapshotMeta* sMeta);
int32_t syncReconfig(int64_t rid, const SSyncCfg* pNewCfg); int32_t syncReconfig(int64_t rid, const SSyncCfg* pNewCfg);
int32_t syncReconfigRaw(int64_t rid, const SSyncCfg* pNewCfg, SRpcMsg* pRpcMsg);
// build SRpcMsg, need to call syncPropose with SRpcMsg
int32_t syncReconfigBuild(int64_t rid, const SSyncCfg* pNewCfg, SRpcMsg* pRpcMsg);
int32_t syncLeaderTransfer(int64_t rid); int32_t syncLeaderTransfer(int64_t rid);
int32_t syncLeaderTransferTo(int64_t rid, SNodeInfo newLeader); int32_t syncLeaderTransferTo(int64_t rid, SNodeInfo newLeader);
// to be moved to static
void syncStartNormal(int64_t rid);
void syncStartStandBy(int64_t rid);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -467,6 +467,7 @@ typedef struct SyncLeaderTransfer { ...@@ -467,6 +467,7 @@ typedef struct SyncLeaderTransfer {
SRaftId srcId; SRaftId srcId;
SRaftId destId; SRaftId destId;
*/ */
SNodeInfo newNodeInfo;
SRaftId newLeaderId; SRaftId newLeaderId;
} SyncLeaderTransfer; } SyncLeaderTransfer;
......
...@@ -653,6 +653,7 @@ int32_t* taosGetErrno(); ...@@ -653,6 +653,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_PAR_INVALID_DELETE_WHERE TAOS_DEF_ERROR_CODE(0, 0x2655) #define TSDB_CODE_PAR_INVALID_DELETE_WHERE TAOS_DEF_ERROR_CODE(0, 0x2655)
#define TSDB_CODE_PAR_INVALID_REDISTRIBUTE_VG TAOS_DEF_ERROR_CODE(0, 0x2656) #define TSDB_CODE_PAR_INVALID_REDISTRIBUTE_VG TAOS_DEF_ERROR_CODE(0, 0x2656)
#define TSDB_CODE_PAR_FILL_NOT_ALLOWED_FUNC TAOS_DEF_ERROR_CODE(0, 0x2657) #define TSDB_CODE_PAR_FILL_NOT_ALLOWED_FUNC TAOS_DEF_ERROR_CODE(0, 0x2657)
#define TSDB_CODE_PAR_INVALID_WINDOW_PC TAOS_DEF_ERROR_CODE(0, 0x2658)
//planner //planner
#define TSDB_CODE_PLAN_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0x2700) #define TSDB_CODE_PLAN_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0x2700)
...@@ -684,17 +685,23 @@ int32_t* taosGetErrno(); ...@@ -684,17 +685,23 @@ int32_t* taosGetErrno();
#define TSDB_CODE_SML_INVALID_DB_CONF TAOS_DEF_ERROR_CODE(0, 0x3003) #define TSDB_CODE_SML_INVALID_DB_CONF TAOS_DEF_ERROR_CODE(0, 0x3003)
//tsma //tsma
#define TSDB_CODE_TSMA_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x3100) #define TSDB_CODE_TSMA_INIT_FAILED TAOS_DEF_ERROR_CODE(0, 0x3100)
#define TSDB_CODE_TSMA_NO_INDEX_IN_META TAOS_DEF_ERROR_CODE(0, 0x3101) #define TSDB_CODE_TSMA_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x3101)
#define TSDB_CODE_TSMA_INVALID_ENV TAOS_DEF_ERROR_CODE(0, 0x3102) #define TSDB_CODE_TSMA_NO_INDEX_IN_META TAOS_DEF_ERROR_CODE(0, 0x3102)
#define TSDB_CODE_TSMA_INVALID_STAT TAOS_DEF_ERROR_CODE(0, 0x3103) #define TSDB_CODE_TSMA_INVALID_ENV TAOS_DEF_ERROR_CODE(0, 0x3103)
#define TSDB_CODE_TSMA_NO_INDEX_IN_CACHE TAOS_DEF_ERROR_CODE(0, 0x3104) #define TSDB_CODE_TSMA_INVALID_STAT TAOS_DEF_ERROR_CODE(0, 0x3104)
#define TSDB_CODE_TSMA_RM_SKEY_IN_HASH TAOS_DEF_ERROR_CODE(0, 0x3105) #define TSDB_CODE_TSMA_INVALID_PTR TAOS_DEF_ERROR_CODE(0, 0x3105)
#define TSDB_CODE_TSMA_INVALID_PARA TAOS_DEF_ERROR_CODE(0, 0x3106)
#define TSDB_CODE_TSMA_NO_INDEX_IN_CACHE TAOS_DEF_ERROR_CODE(0, 0x3107)
//rsma //rsma
#define TSDB_CODE_RSMA_INVALID_ENV TAOS_DEF_ERROR_CODE(0, 0x3150) #define TSDB_CODE_RSMA_INVALID_ENV TAOS_DEF_ERROR_CODE(0, 0x3150)
#define TSDB_CODE_RSMA_INVALID_STAT TAOS_DEF_ERROR_CODE(0, 0x3151) #define TSDB_CODE_RSMA_INVALID_STAT TAOS_DEF_ERROR_CODE(0, 0x3151)
//index
#define TSDB_CODE_INDEX_REBUILDING TAOS_DEF_ERROR_CODE(0, 0x3200)
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -222,6 +222,8 @@ typedef enum ELogicConditionType { ...@@ -222,6 +222,8 @@ typedef enum ELogicConditionType {
#define TSDB_APP_NAME_LEN TSDB_UNI_LEN #define TSDB_APP_NAME_LEN TSDB_UNI_LEN
#define TSDB_TB_COMMENT_LEN 1025 #define TSDB_TB_COMMENT_LEN 1025
#define TSDB_QUERY_ID_LEN 26
/** /**
* In some scenarios uint16_t (0~65535) is used to store the row len. * In some scenarios uint16_t (0~65535) is used to store the row len.
* - Firstly, we use 65531(65535 - 4), as the SDataRow/SKVRow contains 4 bits header. * - Firstly, we use 65531(65535 - 4), as the SDataRow/SKVRow contains 4 bits header.
...@@ -341,6 +343,9 @@ typedef enum ELogicConditionType { ...@@ -341,6 +343,9 @@ typedef enum ELogicConditionType {
#define TSDB_DB_SCHEMALESS_OFF 0 #define TSDB_DB_SCHEMALESS_OFF 0
#define TSDB_DEFAULT_DB_SCHEMALESS TSDB_DB_SCHEMALESS_OFF #define TSDB_DEFAULT_DB_SCHEMALESS TSDB_DB_SCHEMALESS_OFF
// #define TSDB_MIN_ROLLUP_DELAY 1
// #define TSDB_MAX_ROLLUP_DELAY 10
// #define TSDB_DEFAULT_ROLLUP_DELAY 1
#define TSDB_MIN_ROLLUP_FILE_FACTOR 0 #define TSDB_MIN_ROLLUP_FILE_FACTOR 0
#define TSDB_MAX_ROLLUP_FILE_FACTOR 10 #define TSDB_MAX_ROLLUP_FILE_FACTOR 10
#define TSDB_DEFAULT_ROLLUP_FILE_FACTOR 0.1 #define TSDB_DEFAULT_ROLLUP_FILE_FACTOR 0.1
......
...@@ -99,15 +99,15 @@ static int32_t hbProcessDBInfoRsp(void *value, int32_t valueLen, struct SCatalog ...@@ -99,15 +99,15 @@ static int32_t hbProcessDBInfoRsp(void *value, int32_t valueLen, struct SCatalog
static int32_t hbProcessStbInfoRsp(void *value, int32_t valueLen, struct SCatalog *pCatalog) { static int32_t hbProcessStbInfoRsp(void *value, int32_t valueLen, struct SCatalog *pCatalog) {
int32_t code = 0; int32_t code = 0;
STableMetaBatchRsp batchMetaRsp = {0}; SSTbHbRsp hbRsp = {0};
if (tDeserializeSTableMetaBatchRsp(value, valueLen, &batchMetaRsp) != 0) { if (tDeserializeSSTbHbRsp(value, valueLen, &hbRsp) != 0) {
terrno = TSDB_CODE_INVALID_MSG; terrno = TSDB_CODE_INVALID_MSG;
return -1; return -1;
} }
int32_t numOfBatchs = taosArrayGetSize(batchMetaRsp.pArray); int32_t numOfMeta = taosArrayGetSize(hbRsp.pMetaRsp);
for (int32_t i = 0; i < numOfBatchs; ++i) { for (int32_t i = 0; i < numOfMeta; ++i) {
STableMetaRsp *rsp = taosArrayGet(batchMetaRsp.pArray, i); STableMetaRsp *rsp = taosArrayGet(hbRsp.pMetaRsp, i);
if (rsp->numOfColumns < 0) { if (rsp->numOfColumns < 0) {
tscDebug("hb remove stb, db:%s, stb:%s", rsp->dbFName, rsp->stbName); tscDebug("hb remove stb, db:%s, stb:%s", rsp->dbFName, rsp->stbName);
...@@ -116,7 +116,7 @@ static int32_t hbProcessStbInfoRsp(void *value, int32_t valueLen, struct SCatalo ...@@ -116,7 +116,7 @@ static int32_t hbProcessStbInfoRsp(void *value, int32_t valueLen, struct SCatalo
tscDebug("hb update stb, db:%s, stb:%s", rsp->dbFName, rsp->stbName); tscDebug("hb update stb, db:%s, stb:%s", rsp->dbFName, rsp->stbName);
if (rsp->pSchemas[0].colId != PRIMARYKEY_TIMESTAMP_COL_ID) { if (rsp->pSchemas[0].colId != PRIMARYKEY_TIMESTAMP_COL_ID) {
tscError("invalid colId[%" PRIi16 "] for the first column in table meta rsp msg", rsp->pSchemas[0].colId); tscError("invalid colId[%" PRIi16 "] for the first column in table meta rsp msg", rsp->pSchemas[0].colId);
tFreeSTableMetaBatchRsp(&batchMetaRsp); tFreeSSTbHbRsp(&hbRsp);
return TSDB_CODE_TSC_INVALID_VALUE; return TSDB_CODE_TSC_INVALID_VALUE;
} }
...@@ -124,7 +124,17 @@ static int32_t hbProcessStbInfoRsp(void *value, int32_t valueLen, struct SCatalo ...@@ -124,7 +124,17 @@ static int32_t hbProcessStbInfoRsp(void *value, int32_t valueLen, struct SCatalo
} }
} }
tFreeSTableMetaBatchRsp(&batchMetaRsp); int32_t numOfIndex = taosArrayGetSize(hbRsp.pIndexRsp);
for (int32_t i = 0; i < numOfIndex; ++i) {
STableIndexRsp *rsp = taosArrayGet(hbRsp.pIndexRsp, i);
catalogUpdateTableIndex(pCatalog, rsp);
}
taosArrayDestroy(hbRsp.pIndexRsp);
hbRsp.pIndexRsp = NULL;
tFreeSSTbHbRsp(&hbRsp);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -455,7 +465,7 @@ int32_t hbGetExpiredDBInfo(SClientHbKey *connKey, struct SCatalog *pCatalog, SCl ...@@ -455,7 +465,7 @@ int32_t hbGetExpiredDBInfo(SClientHbKey *connKey, struct SCatalog *pCatalog, SCl
} }
int32_t hbGetExpiredStbInfo(SClientHbKey *connKey, struct SCatalog *pCatalog, SClientHbReq *req) { int32_t hbGetExpiredStbInfo(SClientHbKey *connKey, struct SCatalog *pCatalog, SClientHbReq *req) {
SSTableMetaVersion *stbs = NULL; SSTableVersion *stbs = NULL;
uint32_t stbNum = 0; uint32_t stbNum = 0;
int32_t code = 0; int32_t code = 0;
...@@ -469,15 +479,16 @@ int32_t hbGetExpiredStbInfo(SClientHbKey *connKey, struct SCatalog *pCatalog, SC ...@@ -469,15 +479,16 @@ int32_t hbGetExpiredStbInfo(SClientHbKey *connKey, struct SCatalog *pCatalog, SC
} }
for (int32_t i = 0; i < stbNum; ++i) { for (int32_t i = 0; i < stbNum; ++i) {
SSTableMetaVersion *stb = &stbs[i]; SSTableVersion *stb = &stbs[i];
stb->suid = htobe64(stb->suid); stb->suid = htobe64(stb->suid);
stb->sversion = htons(stb->sversion); stb->sversion = htons(stb->sversion);
stb->tversion = htons(stb->tversion); stb->tversion = htons(stb->tversion);
stb->smaVer = htonl(stb->smaVer);
} }
SKv kv = { SKv kv = {
.key = HEARTBEAT_KEY_STBINFO, .key = HEARTBEAT_KEY_STBINFO,
.valueLen = sizeof(SSTableMetaVersion) * stbNum, .valueLen = sizeof(SSTableVersion) * stbNum,
.value = stbs, .value = stbs,
}; };
...@@ -698,7 +709,7 @@ SAppHbMgr *appHbMgrInit(SAppInstInfo *pAppInstInfo, char *key) { ...@@ -698,7 +709,7 @@ SAppHbMgr *appHbMgrInit(SAppInstInfo *pAppInstInfo, char *key) {
return NULL; return NULL;
} }
taosHashSetFreeFp(pAppHbMgr->activeInfo, tFreeClientHbReq); // taosHashSetFreeFp(pAppHbMgr->activeInfo, tFreeClientHbReq);
taosThreadMutexLock(&clientHbMgr.lock); taosThreadMutexLock(&clientHbMgr.lock);
taosArrayPush(clientHbMgr.appHbMgrs, &pAppHbMgr); taosArrayPush(clientHbMgr.appHbMgrs, &pAppHbMgr);
......
...@@ -206,7 +206,7 @@ int32_t execLocalCmd(SRequestObj* pRequest, SQuery* pQuery) { ...@@ -206,7 +206,7 @@ int32_t execLocalCmd(SRequestObj* pRequest, SQuery* pQuery) {
SRetrieveTableRsp* pRsp = NULL; SRetrieveTableRsp* pRsp = NULL;
int32_t code = qExecCommand(pQuery->pRoot, &pRsp); int32_t code = qExecCommand(pQuery->pRoot, &pRsp);
if (TSDB_CODE_SUCCESS == code && NULL != pRsp) { if (TSDB_CODE_SUCCESS == code && NULL != pRsp) {
code = setQueryResultFromRsp(&pRequest->body.resInfo, pRsp, false, false); code = setQueryResultFromRsp(&pRequest->body.resInfo, pRsp, false, true);
} }
return code; return code;
...@@ -233,9 +233,7 @@ int32_t execDdlQuery(SRequestObj* pRequest, SQuery* pQuery) { ...@@ -233,9 +233,7 @@ int32_t execDdlQuery(SRequestObj* pRequest, SQuery* pQuery) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static SAppInstInfo* getAppInfo(SRequestObj* pRequest) { static SAppInstInfo* getAppInfo(SRequestObj* pRequest) { return pRequest->pTscObj->pAppInfo; }
return pRequest->pTscObj->pAppInfo;
}
void asyncExecLocalCmd(SRequestObj* pRequest, SQuery* pQuery) { void asyncExecLocalCmd(SRequestObj* pRequest, SQuery* pQuery) {
SRetrieveTableRsp* pRsp = NULL; SRetrieveTableRsp* pRsp = NULL;
...@@ -259,7 +257,7 @@ void asyncExecLocalCmd(SRequestObj* pRequest, SQuery* pQuery) { ...@@ -259,7 +257,7 @@ void asyncExecLocalCmd(SRequestObj* pRequest, SQuery* pQuery) {
} }
pRequest->body.queryFp(pRequest->body.param, pRequest, 0); pRequest->body.queryFp(pRequest->body.param, pRequest, 0);
// pRequest->body.fetchFp(pRequest->body.param, pRequest, pResultInfo->numOfRows); // pRequest->body.fetchFp(pRequest->body.param, pRequest, pResultInfo->numOfRows);
} }
int32_t asyncExecDdlQuery(SRequestObj* pRequest, SQuery* pQuery) { int32_t asyncExecDdlQuery(SRequestObj* pRequest, SQuery* pQuery) {
...@@ -404,16 +402,14 @@ int32_t scheduleAsyncQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNod ...@@ -404,16 +402,14 @@ int32_t scheduleAsyncQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNod
SQueryResult res = {.code = 0, .numOfRows = 0}; SQueryResult res = {.code = 0, .numOfRows = 0};
SRequestConnInfo conn = {.pTrans = pRequest->pTscObj->pAppInfo->pTransporter, SRequestConnInfo conn = {.pTrans = pRequest->pTscObj->pAppInfo->pTransporter,
.requestId = pRequest->requestId, .requestId = pRequest->requestId,
.requestObjRefId = pRequest->self .requestObjRefId = pRequest->self};
};
SSchedulerReq req = {.pConn = &conn, SSchedulerReq req = {.pConn = &conn,
.pNodeList = pNodeList, .pNodeList = pNodeList,
.pDag = pDag, .pDag = pDag,
.sql = pRequest->sqlstr, .sql = pRequest->sqlstr,
.startTs = pRequest->metric.start, .startTs = pRequest->metric.start,
.fp = schdExecCallback, .fp = schdExecCallback,
.cbParam = &res .cbParam = &res};
};
int32_t code = schedulerAsyncExecJob(&req, &pRequest->body.queryJob); int32_t code = schedulerAsyncExecJob(&req, &pRequest->body.queryJob);
...@@ -458,18 +454,16 @@ int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList ...@@ -458,18 +454,16 @@ int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList
SQueryResult res = {0}; SQueryResult res = {0};
SRequestConnInfo conn = {.pTrans = pRequest->pTscObj->pAppInfo->pTransporter, SRequestConnInfo conn = {.pTrans = pRequest->pTscObj->pAppInfo->pTransporter,
.requestId = pRequest->requestId, .requestId = pRequest->requestId,
.requestObjRefId = pRequest->self .requestObjRefId = pRequest->self};
};
SSchedulerReq req = {.pConn = &conn, SSchedulerReq req = {.pConn = &conn,
.pNodeList = pNodeList, .pNodeList = pNodeList,
.pDag = pDag, .pDag = pDag,
.sql = pRequest->sqlstr, .sql = pRequest->sqlstr,
.startTs = pRequest->metric.start, .startTs = pRequest->metric.start,
.fp = NULL, .fp = NULL,
.cbParam = NULL .cbParam = NULL};
};
int32_t code = schedulerExecJob(&req,&pRequest->body.queryJob, &res); int32_t code = schedulerExecJob(&req, &pRequest->body.queryJob, &res);
pRequest->body.resInfo.execRes = res.res; pRequest->body.resInfo.execRes = res.res;
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
...@@ -482,7 +476,8 @@ int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList ...@@ -482,7 +476,8 @@ int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList
return pRequest->code; return pRequest->code;
} }
if (TDMT_VND_SUBMIT == pRequest->type || TDMT_VND_DELETE == pRequest->type || TDMT_VND_CREATE_TABLE == pRequest->type) { if (TDMT_VND_SUBMIT == pRequest->type || TDMT_VND_DELETE == pRequest->type ||
TDMT_VND_CREATE_TABLE == pRequest->type) {
pRequest->body.resInfo.numOfRows = res.numOfRows; pRequest->body.resInfo.numOfRows = res.numOfRows;
if (pRequest->body.queryJob != 0) { if (pRequest->body.queryJob != 0) {
...@@ -495,7 +490,7 @@ int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList ...@@ -495,7 +490,7 @@ int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList
return pRequest->code; return pRequest->code;
} }
int32_t handleSubmitExecRes(SRequestObj* pRequest, void* res, SCatalog* pCatalog, SEpSet *epset) { int32_t handleSubmitExecRes(SRequestObj* pRequest, void* res, SCatalog* pCatalog, SEpSet* epset) {
int32_t code = 0; int32_t code = 0;
SArray* pArray = NULL; SArray* pArray = NULL;
SSubmitRsp* pRsp = (SSubmitRsp*)res; SSubmitRsp* pRsp = (SSubmitRsp*)res;
...@@ -532,7 +527,7 @@ _return: ...@@ -532,7 +527,7 @@ _return:
return code; return code;
} }
int32_t handleQueryExecRes(SRequestObj* pRequest, void* res, SCatalog* pCatalog, SEpSet *epset) { int32_t handleQueryExecRes(SRequestObj* pRequest, void* res, SCatalog* pCatalog, SEpSet* epset) {
int32_t code = 0; int32_t code = 0;
SArray* pArray = NULL; SArray* pArray = NULL;
SArray* pTbArray = (SArray*)res; SArray* pTbArray = (SArray*)res;
...@@ -601,7 +596,7 @@ int32_t handleQueryExecRsp(SRequestObj* pRequest) { ...@@ -601,7 +596,7 @@ int32_t handleQueryExecRsp(SRequestObj* pRequest) {
break; break;
} }
default: default:
tscError("0x%"PRIx64", invalid exec result for request type %d, reqId:0x%"PRIx64, pRequest->self, tscError("0x%" PRIx64 ", invalid exec result for request type %d, reqId:0x%" PRIx64, pRequest->self,
pRequest->type, pRequest->requestId); pRequest->type, pRequest->requestId);
code = TSDB_CODE_APP_ERROR; code = TSDB_CODE_APP_ERROR;
} }
...@@ -610,13 +605,13 @@ int32_t handleQueryExecRsp(SRequestObj* pRequest) { ...@@ -610,13 +605,13 @@ int32_t handleQueryExecRsp(SRequestObj* pRequest) {
} }
void schedulerExecCb(SQueryResult* pResult, void* param, int32_t code) { void schedulerExecCb(SQueryResult* pResult, void* param, int32_t code) {
SRequestObj* pRequest = (SRequestObj*) param; SRequestObj* pRequest = (SRequestObj*)param;
pRequest->code = code; pRequest->code = code;
STscObj* pTscObj = pRequest->pTscObj; STscObj* pTscObj = pRequest->pTscObj;
if (code != TSDB_CODE_SUCCESS && NEED_CLIENT_HANDLE_ERROR(code)) { if (code != TSDB_CODE_SUCCESS && NEED_CLIENT_HANDLE_ERROR(code)) {
tscDebug("0x%"PRIx64" client retry to handle the error, code:%d - %s, tryCount:%d, reqId:0x%"PRIx64, pRequest->self, code, tstrerror(code), tscDebug("0x%" PRIx64 " client retry to handle the error, code:%d - %s, tryCount:%d, reqId:0x%" PRIx64,
pRequest->retry, pRequest->requestId); pRequest->self, code, tstrerror(code), pRequest->retry, pRequest->requestId);
pRequest->prevCode = code; pRequest->prevCode = code;
doAsyncQuery(pRequest, true); doAsyncQuery(pRequest, true);
return; return;
...@@ -727,30 +722,29 @@ void launchAsyncQuery(SRequestObj* pRequest, SQuery* pQuery) { ...@@ -727,30 +722,29 @@ void launchAsyncQuery(SRequestObj* pRequest, SQuery* pQuery) {
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = qCreateQueryPlan(&cxt, &pRequest->body.pDag, pNodeList); code = qCreateQueryPlan(&cxt, &pRequest->body.pDag, pNodeList);
if (code) { if (code) {
tscError("0x%"PRIx64" failed to create query plan, code:%s 0x%"PRIx64, pRequest->self, tstrerror(code), pRequest->requestId); tscError("0x%" PRIx64 " failed to create query plan, code:%s 0x%" PRIx64, pRequest->self, tstrerror(code),
pRequest->requestId);
} }
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
SRequestConnInfo conn = {.pTrans = pAppInfo->pTransporter, SRequestConnInfo conn = {
.requestId = pRequest->requestId, .pTrans = pAppInfo->pTransporter, .requestId = pRequest->requestId, .requestObjRefId = pRequest->self};
.requestObjRefId = pRequest->self
};
SSchedulerReq req = {.pConn = &conn, SSchedulerReq req = {.pConn = &conn,
.pNodeList = pNodeList, .pNodeList = pNodeList,
.pDag = pRequest->body.pDag, .pDag = pRequest->body.pDag,
.sql = pRequest->sqlstr, .sql = pRequest->sqlstr,
.startTs = pRequest->metric.start, .startTs = pRequest->metric.start,
.fp = schedulerExecCb, .fp = schedulerExecCb,
.cbParam = pRequest .cbParam = pRequest};
};
code = schedulerAsyncExecJob(&req, &pRequest->body.queryJob); code = schedulerAsyncExecJob(&req, &pRequest->body.queryJob);
} else { } else {
tscError("0x%"PRIx64" failed to create query plan, code:%s 0x%"PRIx64, pRequest->self, tstrerror(code), pRequest->requestId); tscError("0x%" PRIx64 " failed to create query plan, code:%s 0x%" PRIx64, pRequest->self, tstrerror(code),
pRequest->requestId);
pRequest->body.queryFp(pRequest->body.param, pRequest, code); pRequest->body.queryFp(pRequest->body.param, pRequest, code);
} }
//todo not to be released here // todo not to be released here
taosArrayDestroy(pNodeList); taosArrayDestroy(pNodeList);
break; break;
} }
...@@ -997,9 +991,8 @@ void updateTargetEpSet(SMsgSendInfo* pSendInfo, STscObj* pTscObj, SRpcMsg* pMsg, ...@@ -997,9 +991,8 @@ void updateTargetEpSet(SMsgSendInfo* pSendInfo, STscObj* pTscObj, SRpcMsg* pMsg,
SEpSet* pOrig = &pTscObj->pAppInfo->mgmtEp.epSet; SEpSet* pOrig = &pTscObj->pAppInfo->mgmtEp.epSet;
SEp* pOrigEp = &pOrig->eps[pOrig->inUse]; SEp* pOrigEp = &pOrig->eps[pOrig->inUse];
SEp* pNewEp = &pEpSet->eps[pEpSet->inUse]; SEp* pNewEp = &pEpSet->eps[pEpSet->inUse];
tscDebug("mnode epset updated from %d/%d=>%s:%d to %d/%d=>%s:%d in client", tscDebug("mnode epset updated from %d/%d=>%s:%d to %d/%d=>%s:%d in client", pOrig->inUse, pOrig->numOfEps,
pOrig->inUse, pOrig->numOfEps, pOrigEp->fqdn, pOrigEp->port, pOrigEp->fqdn, pOrigEp->port, pEpSet->inUse, pEpSet->numOfEps, pNewEp->fqdn, pNewEp->port);
pEpSet->inUse, pEpSet->numOfEps, pNewEp->fqdn, pNewEp->port);
updateEpSet_s(&pTscObj->pAppInfo->mgmtEp, pEpSet); updateEpSet_s(&pTscObj->pAppInfo->mgmtEp, pEpSet);
break; break;
case TARGET_TYPE_VNODE: { case TARGET_TYPE_VNODE: {
...@@ -1415,14 +1408,14 @@ int32_t setResultDataPtr(SReqResultInfo* pResultInfo, TAOS_FIELD* pFields, int32 ...@@ -1415,14 +1408,14 @@ int32_t setResultDataPtr(SReqResultInfo* pResultInfo, TAOS_FIELD* pFields, int32
p += sizeof(uint64_t); p += sizeof(uint64_t);
// check fields // check fields
for(int32_t i = 0; i < numOfCols; ++i) { for (int32_t i = 0; i < numOfCols; ++i) {
int16_t type = *(int16_t*) p; int16_t type = *(int16_t*)p;
p += sizeof(int16_t); p += sizeof(int16_t);
int32_t bytes = *(int32_t*) p; int32_t bytes = *(int32_t*)p;
p += sizeof(int32_t); p += sizeof(int32_t);
// ASSERT(type == pFields[i].type && bytes == pFields[i].bytes); /*ASSERT(type == pFields[i].type && bytes == pFields[i].bytes);*/
} }
int32_t* colLength = (int32_t*)p; int32_t* colLength = (int32_t*)p;
......
...@@ -25,12 +25,13 @@ ...@@ -25,12 +25,13 @@
#include "tref.h" #include "tref.h"
#include "trpc.h" #include "trpc.h"
#include "version.h" #include "version.h"
#include "functionMgt.h"
#define TSC_VAR_NOT_RELEASE 1 #define TSC_VAR_NOT_RELEASE 1
#define TSC_VAR_RELEASED 0 #define TSC_VAR_RELEASED 0
static int32_t sentinel = TSC_VAR_NOT_RELEASE; static int32_t sentinel = TSC_VAR_NOT_RELEASE;
static int32_t createParseContext(const SRequestObj *pRequest, SParseContext** pCxt); static int32_t createParseContext(const SRequestObj *pRequest, SParseContext **pCxt);
int taos_options(TSDB_OPTION option, const void *arg, ...) { int taos_options(TSDB_OPTION option, const void *arg, ...) {
static int32_t lock = 0; static int32_t lock = 0;
...@@ -61,6 +62,9 @@ void taos_cleanup(void) { ...@@ -61,6 +62,9 @@ void taos_cleanup(void) {
cleanupTaskQueue(); cleanupTaskQueue();
fmFuncMgtDestroy();
qCleanupKeywordsTable();
id = clientConnRefPool; id = clientConnRefPool;
clientConnRefPool = -1; clientConnRefPool = -1;
taosCloseRef(id); taosCloseRef(id);
...@@ -177,8 +181,8 @@ TAOS_FIELD *taos_fetch_fields(TAOS_RES *res) { ...@@ -177,8 +181,8 @@ TAOS_FIELD *taos_fetch_fields(TAOS_RES *res) {
return pResInfo->userFields; return pResInfo->userFields;
} }
static void syncQueryFn(void* param, void* res, int32_t code) { static void syncQueryFn(void *param, void *res, int32_t code) {
SSyncQueryParam* pParam = param; SSyncQueryParam *pParam = param;
pParam->pRequest = res; pParam->pRequest = res;
pParam->pRequest->code = code; pParam->pRequest->code = code;
...@@ -190,10 +194,10 @@ TAOS_RES *taos_query(TAOS *taos, const char *sql) { ...@@ -190,10 +194,10 @@ TAOS_RES *taos_query(TAOS *taos, const char *sql) {
return NULL; return NULL;
} }
STscObj* pTscObj = (STscObj*)taos; STscObj *pTscObj = (STscObj *)taos;
#if SYNC_ON_TOP_OF_ASYNC #if SYNC_ON_TOP_OF_ASYNC
SSyncQueryParam* param = taosMemoryCalloc(1, sizeof(SSyncQueryParam)); SSyncQueryParam *param = taosMemoryCalloc(1, sizeof(SSyncQueryParam));
tsem_init(&param->sem, 0, 0); tsem_init(&param->sem, 0, 0);
taos_query_a(pTscObj, sql, syncQueryFn, param); taos_query_a(pTscObj, sql, syncQueryFn, param);
...@@ -606,16 +610,30 @@ const char *taos_get_server_info(TAOS *taos) { ...@@ -606,16 +610,30 @@ const char *taos_get_server_info(TAOS *taos) {
} }
typedef struct SqlParseWrapper { typedef struct SqlParseWrapper {
SParseContext* pCtx; SParseContext *pCtx;
SCatalogReq catalogReq; SCatalogReq catalogReq;
SRequestObj* pRequest; SRequestObj *pRequest;
SQuery* pQuery; SQuery *pQuery;
} SqlParseWrapper; } SqlParseWrapper;
void retrieveMetaCallback(SMetaData* pResultMeta, void* param, int32_t code) { static void destorySqlParseWrapper(SqlParseWrapper *pWrapper) {
SqlParseWrapper *pWrapper = (SqlParseWrapper*) param; taosArrayDestroy(pWrapper->catalogReq.pDbVgroup);
SQuery* pQuery = pWrapper->pQuery; taosArrayDestroy(pWrapper->catalogReq.pDbCfg);
SRequestObj* pRequest = pWrapper->pRequest; taosArrayDestroy(pWrapper->catalogReq.pDbInfo);
taosArrayDestroy(pWrapper->catalogReq.pTableMeta);
taosArrayDestroy(pWrapper->catalogReq.pTableHash);
taosArrayDestroy(pWrapper->catalogReq.pUdf);
taosArrayDestroy(pWrapper->catalogReq.pIndex);
taosArrayDestroy(pWrapper->catalogReq.pUser);
taosArrayDestroy(pWrapper->catalogReq.pTableIndex);
taosMemoryFree(pWrapper->pCtx);
taosMemoryFree(pWrapper);
}
void retrieveMetaCallback(SMetaData *pResultMeta, void *param, int32_t code) {
SqlParseWrapper *pWrapper = (SqlParseWrapper *)param;
SQuery *pQuery = pWrapper->pQuery;
SRequestObj *pRequest = pWrapper->pRequest;
if (code == TSDB_CODE_SUCCESS) { if (code == TSDB_CODE_SUCCESS) {
code = qAnalyseSqlSemantic(pWrapper->pCtx, &pWrapper->catalogReq, pResultMeta, pQuery); code = qAnalyseSqlSemantic(pWrapper->pCtx, &pWrapper->catalogReq, pResultMeta, pQuery);
...@@ -630,22 +648,22 @@ void retrieveMetaCallback(SMetaData* pResultMeta, void* param, int32_t code) { ...@@ -630,22 +648,22 @@ void retrieveMetaCallback(SMetaData* pResultMeta, void* param, int32_t code) {
TSWAP(pRequest->dbList, (pQuery)->pDbList); TSWAP(pRequest->dbList, (pQuery)->pDbList);
TSWAP(pRequest->tableList, (pQuery)->pTableList); TSWAP(pRequest->tableList, (pQuery)->pTableList);
taosMemoryFree(pWrapper); destorySqlParseWrapper(pWrapper);
launchAsyncQuery(pRequest, pQuery); launchAsyncQuery(pRequest, pQuery);
} else { } else {
destorySqlParseWrapper(pWrapper);
tscDebug("error happens, code:%d", code); tscDebug("error happens, code:%d", code);
if (NEED_CLIENT_HANDLE_ERROR(code)) { if (NEED_CLIENT_HANDLE_ERROR(code)) {
tscDebug("0x%"PRIx64" client retry to handle the error, code:%d - %s, tryCount:%d, reqId:0x%"PRIx64, pRequest->self, code, tstrerror(code), tscDebug("0x%" PRIx64 " client retry to handle the error, code:%d - %s, tryCount:%d, reqId:0x%" PRIx64,
pRequest->retry, pRequest->requestId); pRequest->self, code, tstrerror(code), pRequest->retry, pRequest->requestId);
pRequest->prevCode = code; pRequest->prevCode = code;
doAsyncQuery(pRequest, true); doAsyncQuery(pRequest, true);
return; return;
} }
// return to app directly // return to app directly
taosMemoryFree(pWrapper); tscError("0x%" PRIx64 " error occurs, code:%s, return to user app, reqId:0x%" PRIx64, pRequest->self,
tscError("0x%" PRIx64 " error occurs, code:%s, return to user app, reqId:0x%" PRIx64, pRequest->self, tstrerror(code), tstrerror(code), pRequest->requestId);
pRequest->requestId);
pRequest->code = code; pRequest->code = code;
pRequest->body.queryFp(pRequest->body.param, pRequest, code); pRequest->body.queryFp(pRequest->body.param, pRequest, code);
} }
...@@ -682,7 +700,7 @@ void taos_query_a(TAOS *taos, const char *sql, __taos_async_fn_t fp, void *param ...@@ -682,7 +700,7 @@ void taos_query_a(TAOS *taos, const char *sql, __taos_async_fn_t fp, void *param
doAsyncQuery(pRequest, false); doAsyncQuery(pRequest, false);
} }
int32_t createParseContext(const SRequestObj *pRequest, SParseContext** pCxt) { int32_t createParseContext(const SRequestObj *pRequest, SParseContext **pCxt) {
const STscObj *pTscObj = pRequest->pTscObj; const STscObj *pTscObj = pRequest->pTscObj;
*pCxt = taosMemoryCalloc(1, sizeof(SParseContext)); *pCxt = taosMemoryCalloc(1, sizeof(SParseContext));
...@@ -690,7 +708,8 @@ int32_t createParseContext(const SRequestObj *pRequest, SParseContext** pCxt) { ...@@ -690,7 +708,8 @@ int32_t createParseContext(const SRequestObj *pRequest, SParseContext** pCxt) {
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
} }
**pCxt = (SParseContext){.requestId = pRequest->requestId, **pCxt = (SParseContext){
.requestId = pRequest->requestId,
.requestRid = pRequest->self, .requestRid = pRequest->self,
.acctId = pTscObj->acctId, .acctId = pTscObj->acctId,
.db = pRequest->pDb, .db = pRequest->pDb,
...@@ -704,12 +723,13 @@ int32_t createParseContext(const SRequestObj *pRequest, SParseContext** pCxt) { ...@@ -704,12 +723,13 @@ int32_t createParseContext(const SRequestObj *pRequest, SParseContext** pCxt) {
.pUser = pTscObj->user, .pUser = pTscObj->user,
.schemalessType = pTscObj->schemalessType, .schemalessType = pTscObj->schemalessType,
.isSuperUser = (0 == strcmp(pTscObj->user, TSDB_DEFAULT_USER)), .isSuperUser = (0 == strcmp(pTscObj->user, TSDB_DEFAULT_USER)),
.async = true,}; .async = true,
};
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
void doAsyncQuery(SRequestObj* pRequest, bool updateMetaForce) { void doAsyncQuery(SRequestObj *pRequest, bool updateMetaForce) {
SParseContext* pCxt = NULL; SParseContext *pCxt = NULL;
STscObj *pTscObj = pRequest->pTscObj; STscObj *pTscObj = pRequest->pTscObj;
int32_t code = 0; int32_t code = 0;
...@@ -753,23 +773,24 @@ void doAsyncQuery(SRequestObj* pRequest, bool updateMetaForce) { ...@@ -753,23 +773,24 @@ void doAsyncQuery(SRequestObj* pRequest, bool updateMetaForce) {
.requestObjRefId = pCxt->requestRid, .requestObjRefId = pCxt->requestRid,
.mgmtEps = pCxt->mgmtEpSet}; .mgmtEps = pCxt->mgmtEpSet};
code = catalogAsyncGetAllMeta(pCxt->pCatalog, &conn, pRequest->requestId, code = catalogAsyncGetAllMeta(pCxt->pCatalog, &conn, pRequest->requestId, &catalogReq, retrieveMetaCallback, pWrapper,
&catalogReq, retrieveMetaCallback, pWrapper, &pRequest->body.queryJob); &pRequest->body.queryJob);
if (code == TSDB_CODE_SUCCESS) { if (code == TSDB_CODE_SUCCESS) {
return; return;
} }
_error: _error:
tscError("0x%"PRIx64" error happens, code:%d - %s, reqId:0x%"PRIx64, pRequest->self, code, tstrerror(code), pRequest->requestId); tscError("0x%" PRIx64 " error happens, code:%d - %s, reqId:0x%" PRIx64, pRequest->self, code, tstrerror(code),
pRequest->requestId);
terrno = code; terrno = code;
pRequest->code = code; pRequest->code = code;
pRequest->body.queryFp(pRequest->body.param, pRequest, code); pRequest->body.queryFp(pRequest->body.param, pRequest, code);
} }
static void fetchCallback(void* pResult, void* param, int32_t code) { static void fetchCallback(void *pResult, void *param, int32_t code) {
SRequestObj* pRequest = (SRequestObj*) param; SRequestObj *pRequest = (SRequestObj *)param;
SReqResultInfo* pResultInfo = &pRequest->body.resInfo; SReqResultInfo *pResultInfo = &pRequest->body.resInfo;
pResultInfo->pData = pResult; pResultInfo->pData = pResult;
pResultInfo->numOfRows = 0; pResultInfo->numOfRows = 0;
...@@ -785,7 +806,8 @@ static void fetchCallback(void* pResult, void* param, int32_t code) { ...@@ -785,7 +806,8 @@ static void fetchCallback(void* pResult, void* param, int32_t code) {
return; return;
} }
pRequest->code = setQueryResultFromRsp(pResultInfo, (SRetrieveTableRsp*)pResultInfo->pData, pResultInfo->convertUcs4, false); pRequest->code =
setQueryResultFromRsp(pResultInfo, (SRetrieveTableRsp *)pResultInfo->pData, pResultInfo->convertUcs4, false);
if (pRequest->code != TSDB_CODE_SUCCESS) { if (pRequest->code != TSDB_CODE_SUCCESS) {
pResultInfo->numOfRows = 0; pResultInfo->numOfRows = 0;
pRequest->code = code; pRequest->code = code;
...@@ -801,7 +823,7 @@ static void fetchCallback(void* pResult, void* param, int32_t code) { ...@@ -801,7 +823,7 @@ static void fetchCallback(void* pResult, void* param, int32_t code) {
} }
void taos_fetch_rows_a(TAOS_RES *res, __taos_async_fn_t fp, void *param) { void taos_fetch_rows_a(TAOS_RES *res, __taos_async_fn_t fp, void *param) {
ASSERT (res != NULL && fp != NULL); ASSERT(res != NULL && fp != NULL);
SRequestObj *pRequest = res; SRequestObj *pRequest = res;
pRequest->body.fetchFp = fp; pRequest->body.fetchFp = fp;
...@@ -825,7 +847,7 @@ void taos_fetch_rows_a(TAOS_RES *res, __taos_async_fn_t fp, void *param) { ...@@ -825,7 +847,7 @@ void taos_fetch_rows_a(TAOS_RES *res, __taos_async_fn_t fp, void *param) {
schedulerAsyncFetchRows(pRequest->body.queryJob, fetchCallback, pRequest); schedulerAsyncFetchRows(pRequest->body.queryJob, fetchCallback, pRequest);
} }
void taos_fetch_raw_block_a(TAOS_RES* res, __taos_async_fn_t fp, void* param) { void taos_fetch_raw_block_a(TAOS_RES *res, __taos_async_fn_t fp, void *param) {
ASSERT(res != NULL && fp != NULL); ASSERT(res != NULL && fp != NULL);
SRequestObj *pRequest = res; SRequestObj *pRequest = res;
...@@ -838,9 +860,9 @@ void taos_fetch_raw_block_a(TAOS_RES* res, __taos_async_fn_t fp, void* param) { ...@@ -838,9 +860,9 @@ void taos_fetch_raw_block_a(TAOS_RES* res, __taos_async_fn_t fp, void* param) {
taos_fetch_rows_a(res, fp, param); taos_fetch_rows_a(res, fp, param);
} }
const void* taos_get_raw_block(TAOS_RES* res) { const void *taos_get_raw_block(TAOS_RES *res) {
ASSERT(res != NULL); ASSERT(res != NULL);
SRequestObj* pRequest = res; SRequestObj *pRequest = res;
return pRequest->body.resInfo.pData; return pRequest->body.resInfo.pData;
} }
...@@ -924,10 +946,9 @@ int taos_stmt_set_tags(TAOS_STMT *stmt, TAOS_MULTI_BIND *tags) { ...@@ -924,10 +946,9 @@ int taos_stmt_set_tags(TAOS_STMT *stmt, TAOS_MULTI_BIND *tags) {
return stmtSetTbTags(stmt, tags); return stmtSetTbTags(stmt, tags);
} }
int taos_stmt_set_sub_tbname(TAOS_STMT *stmt, const char *name) { return taos_stmt_set_tbname(stmt, name); } int taos_stmt_set_sub_tbname(TAOS_STMT *stmt, const char *name) { return taos_stmt_set_tbname(stmt, name); }
int taos_stmt_get_tag_fields(TAOS_STMT *stmt, int* fieldNum, TAOS_FIELD_E** fields) { int taos_stmt_get_tag_fields(TAOS_STMT *stmt, int *fieldNum, TAOS_FIELD_E **fields) {
if (stmt == NULL || NULL == fieldNum) { if (stmt == NULL || NULL == fieldNum) {
tscError("NULL parameter for %s", __FUNCTION__); tscError("NULL parameter for %s", __FUNCTION__);
terrno = TSDB_CODE_INVALID_PARA; terrno = TSDB_CODE_INVALID_PARA;
...@@ -937,7 +958,7 @@ int taos_stmt_get_tag_fields(TAOS_STMT *stmt, int* fieldNum, TAOS_FIELD_E** fiel ...@@ -937,7 +958,7 @@ int taos_stmt_get_tag_fields(TAOS_STMT *stmt, int* fieldNum, TAOS_FIELD_E** fiel
return stmtGetTagFields(stmt, fieldNum, fields); return stmtGetTagFields(stmt, fieldNum, fields);
} }
int taos_stmt_get_col_fields(TAOS_STMT *stmt, int* fieldNum, TAOS_FIELD_E** fields) { int taos_stmt_get_col_fields(TAOS_STMT *stmt, int *fieldNum, TAOS_FIELD_E **fields) {
if (stmt == NULL || NULL == fieldNum) { if (stmt == NULL || NULL == fieldNum) {
tscError("NULL parameter for %s", __FUNCTION__); tscError("NULL parameter for %s", __FUNCTION__);
terrno = TSDB_CODE_INVALID_PARA; terrno = TSDB_CODE_INVALID_PARA;
...@@ -1102,4 +1123,3 @@ int taos_stmt_close(TAOS_STMT *stmt) { ...@@ -1102,4 +1123,3 @@ int taos_stmt_close(TAOS_STMT *stmt) {
return stmtClose(stmt); return stmtClose(stmt);
} }
此差异已折叠。
...@@ -132,6 +132,7 @@ typedef struct { ...@@ -132,6 +132,7 @@ typedef struct {
// statistics // statistics
int64_t pollCnt; int64_t pollCnt;
// offset // offset
int64_t committedOffset;
int64_t currentOffset; int64_t currentOffset;
// connection info // connection info
int32_t vgId; int32_t vgId;
...@@ -193,6 +194,26 @@ typedef struct { ...@@ -193,6 +194,26 @@ typedef struct {
void* userParam; void* userParam;
} SMqCommitCbParam; } SMqCommitCbParam;
typedef struct {
tmq_t* tmq;
int8_t automatic;
int8_t async;
int8_t freeOffsets;
int32_t waitingRspNum;
int32_t totalRspNum;
tmq_resp_err_t rspErr;
tmq_commit_cb* userCb;
SArray* successfulOffsets;
SArray* failedOffsets;
void* userParam;
tsem_t rspSem;
} SMqCommitCbParamSet;
typedef struct {
SMqCommitCbParamSet* params;
STqOffset* pOffset;
} SMqCommitCbParam2;
tmq_conf_t* tmq_conf_new() { tmq_conf_t* tmq_conf_new() {
tmq_conf_t* conf = taosMemoryCalloc(1, sizeof(tmq_conf_t)); tmq_conf_t* conf = taosMemoryCalloc(1, sizeof(tmq_conf_t));
conf->withTbName = false; conf->withTbName = false;
...@@ -343,6 +364,139 @@ int32_t tmqCommitCb(void* param, const SDataBuf* pMsg, int32_t code) { ...@@ -343,6 +364,139 @@ int32_t tmqCommitCb(void* param, const SDataBuf* pMsg, int32_t code) {
return 0; return 0;
} }
int32_t tmqCommitCb2(void* param, const SDataBuf* pBuf, int32_t code) {
SMqCommitCbParam2* pParam = (SMqCommitCbParam2*)param;
SMqCommitCbParamSet* pParamSet = (SMqCommitCbParamSet*)pParam->params;
// push into array
if (code == 0) {
taosArrayPush(pParamSet->failedOffsets, &pParam->pOffset);
} else {
taosArrayPush(pParamSet->successfulOffsets, &pParam->pOffset);
}
// count down waiting rsp
int32_t waitingRspNum = atomic_sub_fetch_32(&pParamSet->waitingRspNum, 1);
ASSERT(waitingRspNum >= 0);
if (waitingRspNum == 0) {
// if no more waiting rsp
if (pParamSet->async) {
// call async cb func
if (pParamSet->automatic && pParamSet->tmq->commitCb) {
pParamSet->tmq->commitCb(pParamSet->tmq, pParamSet->rspErr, NULL, pParamSet->tmq->commitCbUserParam);
} else if (!pParamSet->automatic && pParamSet->userCb) {
// sem post
pParamSet->userCb(pParamSet->tmq, pParamSet->rspErr, NULL, pParamSet->userParam);
}
}
taosArrayDestroyP(pParamSet->successfulOffsets, taosMemoryFree);
taosArrayDestroyP(pParamSet->failedOffsets, taosMemoryFree);
}
return 0;
}
int32_t tmqCommitInner2(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets, int8_t automatic, int8_t async,
tmq_commit_cb* userCb, void* userParam) {
int32_t code = -1;
SMqCommitCbParamSet* pParamSet = taosMemoryCalloc(1, sizeof(SMqCommitCbParamSet));
if (pParamSet == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
pParamSet->tmq = tmq;
pParamSet->automatic = automatic;
pParamSet->async = async;
pParamSet->freeOffsets = 1;
pParamSet->userCb = userCb;
pParamSet->userParam = userParam;
tsem_init(&pParamSet->rspSem, 0, 0);
for (int32_t i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) {
SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
for (int32_t j = 0; j < taosArrayGetSize(pTopic->vgs); j++) {
SMqClientVg* pVg = taosArrayGet(pTopic->vgs, i);
STqOffset* pOffset = taosMemoryCalloc(1, sizeof(STqOffset));
if (pOffset == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
int32_t tlen = strlen(tmq->groupId);
memcpy(pOffset->subKey, tmq->groupId, tlen);
pOffset->subKey[tlen] = TMQ_SEPARATOR;
strcpy(pOffset->subKey + tlen + 1, pTopic->topicName);
int32_t len;
int32_t code;
tEncodeSize(tEncodeSTqOffset, pOffset, len, code);
if (code < 0) {
ASSERT(0);
}
void* buf = taosMemoryCalloc(1, sizeof(SMsgHead) + len);
((SMsgHead*)buf)->vgId = htonl(pVg->vgId);
void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
SEncoder encoder;
tEncoderInit(&encoder, abuf, len);
tEncodeSTqOffset(&encoder, pOffset);
// build param
SMqCommitCbParam2* pParam = taosMemoryCalloc(1, sizeof(SMqCommitCbParam2));
pParam->params = pParamSet;
pParam->pOffset = pOffset;
// build send info
SMsgSendInfo* pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
if (pMsgSendInfo == NULL) {
// TODO
continue;
}
pMsgSendInfo->msgInfo = (SDataBuf){
.pData = buf,
.len = len,
.handle = NULL,
};
pMsgSendInfo->requestId = generateRequestId();
pMsgSendInfo->requestObjRefId = 0;
pMsgSendInfo->param = pParam;
pMsgSendInfo->fp = tmqCommitCb2;
pMsgSendInfo->msgType = TDMT_MND_MQ_COMMIT_OFFSET;
// send msg
SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp);
int64_t transporterId = 0;
asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, pMsgSendInfo);
pParamSet->waitingRspNum++;
pParamSet->totalRspNum++;
}
}
if (!async) {
tsem_wait(&pParamSet->rspSem);
code = pParamSet->rspErr;
tsem_destroy(&pParamSet->rspSem);
} else {
code = 0;
}
if (code != 0 && async) {
if (automatic) {
tmq->commitCb(tmq, code, NULL, tmq->commitCbUserParam);
} else {
userCb(tmq, code, NULL, userParam);
}
}
if (!async) {
taosArrayDestroyP(pParamSet->successfulOffsets, taosMemoryFree);
taosArrayDestroyP(pParamSet->failedOffsets, taosMemoryFree);
}
return 0;
}
int32_t tmqCommitInner(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets, int8_t automatic, int8_t async, int32_t tmqCommitInner(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets, int8_t automatic, int8_t async,
tmq_commit_cb* userCb, void* userParam) { tmq_commit_cb* userCb, void* userParam) {
SMqCMCommitOffsetReq req; SMqCMCommitOffsetReq req;
...@@ -890,12 +1044,13 @@ bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, SMqAskEpRsp* pRsp) { ...@@ -890,12 +1044,13 @@ bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, SMqAskEpRsp* pRsp) {
sprintf(vgKey, "%s:%d", topic.topicName, pVgEp->vgId); sprintf(vgKey, "%s:%d", topic.topicName, pVgEp->vgId);
int64_t* pOffset = taosHashGet(pHash, vgKey, strlen(vgKey)); int64_t* pOffset = taosHashGet(pHash, vgKey, strlen(vgKey));
int64_t offset = pVgEp->offset; int64_t offset = pVgEp->offset;
tscDebug("consumer %ld epoch %d vg %d offset og to %ld", tmq->consumerId, epoch, pVgEp->vgId, offset); tscDebug("consumer %ld(epoch %d) original offset of vg %d is %ld", tmq->consumerId, epoch, pVgEp->vgId, offset);
if (pOffset != NULL) { if (pOffset != NULL) {
offset = *pOffset; offset = *pOffset;
tscDebug("consumer %ld epoch %d vg %d found %s", tmq->consumerId, epoch, pVgEp->vgId, vgKey); tscDebug("consumer %ld(epoch %d) receive offset of vg %d, full key is %s", tmq->consumerId, epoch, pVgEp->vgId,
vgKey);
} }
tscDebug("consumer %ld epoch %d vg %d offset set to %ld", tmq->consumerId, epoch, pVgEp->vgId, offset); tscDebug("consumer %ld(epoch %d) offset of vg %d updated to %ld", tmq->consumerId, epoch, pVgEp->vgId, offset);
SMqClientVg clientVg = { SMqClientVg clientVg = {
.pollCnt = 0, .pollCnt = 0,
.currentOffset = offset, .currentOffset = offset,
...@@ -1226,9 +1381,8 @@ SMqRspObj* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { ...@@ -1226,9 +1381,8 @@ SMqRspObj* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_RSP) { if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_RSP) {
SMqPollRspWrapper* pollRspWrapper = (SMqPollRspWrapper*)rspWrapper; SMqPollRspWrapper* pollRspWrapper = (SMqPollRspWrapper*)rspWrapper;
/*atomic_sub_fetch_32(&tmq->readyRequest, 1);*/ /*atomic_sub_fetch_32(&tmq->readyRequest, 1);*/
/*printf("handle poll rsp %d\n", rspMsg->head.mqMsgType);*/ int32_t consumerEpoch = atomic_load_32(&tmq->epoch);
if (pollRspWrapper->msg.head.epoch == atomic_load_32(&tmq->epoch)) { if (pollRspWrapper->msg.head.epoch == consumerEpoch) {
/*printf("epoch match\n");*/
SMqClientVg* pVg = pollRspWrapper->vgHandle; SMqClientVg* pVg = pollRspWrapper->vgHandle;
/*printf("vg %d offset %ld up to %ld\n", pVg->vgId, pVg->currentOffset, rspMsg->msg.rspOffset);*/ /*printf("vg %d offset %ld up to %ld\n", pVg->vgId, pVg->currentOffset, rspMsg->msg.rspOffset);*/
pVg->currentOffset = pollRspWrapper->msg.rspOffset; pVg->currentOffset = pollRspWrapper->msg.rspOffset;
...@@ -1243,7 +1397,8 @@ SMqRspObj* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { ...@@ -1243,7 +1397,8 @@ SMqRspObj* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
taosFreeQitem(pollRspWrapper); taosFreeQitem(pollRspWrapper);
return pRsp; return pRsp;
} else { } else {
/*printf("epoch mismatch\n");*/ tscDebug("msg discard since epoch mismatch: msg epoch %d, consumer epoch %d\n", pollRspWrapper->msg.head.epoch,
consumerEpoch);
taosFreeQitem(pollRspWrapper); taosFreeQitem(pollRspWrapper);
} }
} else { } else {
...@@ -1263,10 +1418,14 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) { ...@@ -1263,10 +1418,14 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) {
SMqRspObj* rspObj; SMqRspObj* rspObj;
int64_t startTime = taosGetTimestampMs(); int64_t startTime = taosGetTimestampMs();
#if 0
tmqHandleAllDelayedTask(tmq);
tmqPollImpl(tmq, timeout);
rspObj = tmqHandleAllRsp(tmq, timeout, false); rspObj = tmqHandleAllRsp(tmq, timeout, false);
if (rspObj) { if (rspObj) {
return (TAOS_RES*)rspObj; return (TAOS_RES*)rspObj;
} }
#endif
// in no topic status also need process delayed task // in no topic status also need process delayed task
if (atomic_load_8(&tmq->status) == TMQ_CONSUMER_STATUS__INIT) { if (atomic_load_8(&tmq->status) == TMQ_CONSUMER_STATUS__INIT) {
...@@ -1359,8 +1518,7 @@ const char* tmq_get_table_name(TAOS_RES* res) { ...@@ -1359,8 +1518,7 @@ const char* tmq_get_table_name(TAOS_RES* res) {
pRspObj->resIter >= pRspObj->rsp.blockNum) { pRspObj->resIter >= pRspObj->rsp.blockNum) {
return NULL; return NULL;
} }
const char* name = taosArrayGetP(pRspObj->rsp.blockTbName, pRspObj->resIter); return (const char*)taosArrayGetP(pRspObj->rsp.blockTbName, pRspObj->resIter);
return name;
} }
return NULL; return NULL;
} }
......
...@@ -1260,4 +1260,28 @@ TEST(testCase, sml_16368_Test) { ...@@ -1260,4 +1260,28 @@ TEST(testCase, sml_16368_Test) {
pRes = taos_schemaless_insert(taos, (char**)sql, sizeof(sql)/sizeof(sql[0]), TSDB_SML_JSON_PROTOCOL, TSDB_SML_TIMESTAMP_MICRO_SECONDS); pRes = taos_schemaless_insert(taos, (char**)sql, sizeof(sql)/sizeof(sql[0]), TSDB_SML_JSON_PROTOCOL, TSDB_SML_TIMESTAMP_MICRO_SECONDS);
ASSERT_EQ(taos_errno(pRes), 0); ASSERT_EQ(taos_errno(pRes), 0);
taos_free_result(pRes); taos_free_result(pRes);
}*/ }
TEST(testCase, sml_dup_time_Test) {
TAOS *taos = taos_connect("localhost", "root", "taosdata", NULL, 0);
ASSERT_NE(taos, nullptr);
TAOS_RES* pRes = taos_query(taos, "create database if not exists dup_time schemaless 1");
taos_free_result(pRes);
const char *sql[] = {
//"test_ms,t0=t c0=f 1626006833641",
"ubzlsr,id=qmtcvgd,t0=t,t1=127i8,t2=32767i16,t3=2147483647i32,t4=9223372036854775807i64,t5=11.12345f32,t6=22.123456789f64,t7=\"binaryTagValue\",t8=L\"ncharTagValue\" c0=false,c1=1i8,c2=32767i16,c3=2147483647i32,c4=9223372036854775807i64,c5=11.12345f32,c6=22.123456789f64,c7=\"xcxvwjvf\",c8=L\"ncharColValue\",c9=7u64 1626006833639000000",
"ubzlsr,id=qmtcvgd,t0=t,t1=127i8,t2=32767i16,t3=2147483647i32,t4=9223372036854775807i64,t5=11.12345f32,t6=22.123456789f64,t7=\"binaryTagValue\",t8=L\"ncharTagValue\" c0=T,c1=2i8,c2=32767i16,c3=2147483647i32,c4=9223372036854775807i64,c5=11.12345f32,c6=22.123456789f64,c7=\"fixrzcuq\",c8=L\"ncharColValue\",c9=7u64 1626006833639000000",
"ubzlsr,id=qmtcvgd,t0=t,t1=127i8,t2=32767i16,t3=2147483647i32,t4=9223372036854775807i64,t5=11.12345f32,t6=22.123456789f64,t7=\"binaryTagValue\",t8=L\"ncharTagValue\" c0=t,c1=3i8,c2=32767i16,c3=2147483647i32,c4=9223372036854775807i64,c5=11.12345f32,c6=22.123456789f64,c7=\"iupzdqub\",c8=L\"ncharColValue\",c9=7u64 1626006833639000000",
"ubzlsr,id=qmtcvgd,t0=t,t1=127i8,t2=32767i16,t3=2147483647i32,t4=9223372036854775807i64,t5=11.12345f32,t6=22.123456789f64,t7=\"binaryTagValue\",t8=L\"ncharTagValue\" c0=t,c1=4i8,c2=32767i16,c3=2147483647i32,c4=9223372036854775807i64,c5=11.12345f32,c6=22.123456789f64,c7=\"yvvtzzof\",c8=L\"ncharColValue\",c9=7u64 1626006833639000000",
"ubzlsr,id=qmtcvgd,t0=t,t1=127i8,t2=32767i16,t3=2147483647i32,t4=9223372036854775807i64,t5=11.12345f32,t6=22.123456789f64,t7=\"binaryTagValue\",t8=L\"ncharTagValue\" c0=t,c1=5i8,c2=32767i16,c3=2147483647i32,c4=9223372036854775807i64,c5=11.12345f32,c6=22.123456789f64,c7=\"vbxpilkj\",c8=L\"ncharColValue\",c9=7u64 1626006833639000000"
};
pRes = taos_query(taos, "use dup_time");
taos_free_result(pRes);
pRes = taos_schemaless_insert(taos, (char**)sql, sizeof(sql)/sizeof(sql[0]), TSDB_SML_LINE_PROTOCOL, 0);
ASSERT_EQ(taos_errno(pRes), 0);
taos_free_result(pRes);
}
*/
...@@ -91,8 +91,8 @@ static const SSysDbTableSchema userDBSchema[] = { ...@@ -91,8 +91,8 @@ static const SSysDbTableSchema userDBSchema[] = {
{.name = "precision", .bytes = 2 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, {.name = "precision", .bytes = 2 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR},
{.name = "single_stable_model", .bytes = 1, .type = TSDB_DATA_TYPE_BOOL}, {.name = "single_stable_model", .bytes = 1, .type = TSDB_DATA_TYPE_BOOL},
{.name = "status", .bytes = 10 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, {.name = "status", .bytes = 10 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR},
// {.name = "schemaless", .bytes = 1, .type = TSDB_DATA_TYPE_BOOL}, // {.name = "schemaless", .bytes = 1, .type = TSDB_DATA_TYPE_BOOL},
{.name = "retension", .bytes = 60 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, {.name = "retention", .bytes = 60 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR},
// {.name = "update", .bytes = 1, .type = TSDB_DATA_TYPE_TINYINT}, // disable update // {.name = "update", .bytes = 1, .type = TSDB_DATA_TYPE_TINYINT}, // disable update
}; };
...@@ -137,7 +137,7 @@ static const SSysDbTableSchema streamSchema[] = { ...@@ -137,7 +137,7 @@ static const SSysDbTableSchema streamSchema[] = {
{.name = "target_table", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR}, {.name = "target_table", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR},
{.name = "watermark", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT}, {.name = "watermark", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT},
{.name = "trigger", .bytes = 4, .type = TSDB_DATA_TYPE_INT}, {.name = "trigger", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
}; };
static const SSysDbTableSchema userTblsSchema[] = { static const SSysDbTableSchema userTblsSchema[] = {
{.name = "table_name", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR}, {.name = "table_name", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR},
...@@ -221,7 +221,9 @@ static const SSysDbTableSchema transSchema[] = { ...@@ -221,7 +221,9 @@ static const SSysDbTableSchema transSchema[] = {
{.name = "db", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR}, {.name = "db", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR},
{.name = "failed_times", .bytes = 4, .type = TSDB_DATA_TYPE_INT}, {.name = "failed_times", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
{.name = "last_exec_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP}, {.name = "last_exec_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP},
{.name = "last_action_info", .bytes = (TSDB_TRANS_ERROR_LEN - 1) + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, {.name = "last_action_info",
.bytes = (TSDB_TRANS_ERROR_LEN - 1) + VARSTR_HEADER_SIZE,
.type = TSDB_DATA_TYPE_VARCHAR},
}; };
static const SSysDbTableSchema configSchema[] = { static const SSysDbTableSchema configSchema[] = {
...@@ -314,8 +316,6 @@ static const SSysDbTableSchema querySchema[] = { ...@@ -314,8 +316,6 @@ static const SSysDbTableSchema querySchema[] = {
{.name = "sql", .bytes = TSDB_SHOW_SQL_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, {.name = "sql", .bytes = TSDB_SHOW_SQL_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR},
}; };
static const SSysTableMeta perfsMeta[] = { static const SSysTableMeta perfsMeta[] = {
{TSDB_PERFS_TABLE_CONNECTIONS, connectionsSchema, tListLen(connectionsSchema)}, {TSDB_PERFS_TABLE_CONNECTIONS, connectionsSchema, tListLen(connectionsSchema)},
{TSDB_PERFS_TABLE_QUERIES, querySchema, tListLen(querySchema)}, {TSDB_PERFS_TABLE_QUERIES, querySchema, tListLen(querySchema)},
......
...@@ -1708,6 +1708,7 @@ char* buildCtbNameByGroupId(const char* stbName, uint64_t groupId) { ...@@ -1708,6 +1708,7 @@ char* buildCtbNameByGroupId(const char* stbName, uint64_t groupId) {
pTag->keyLen = strlen(pTag->key); pTag->keyLen = strlen(pTag->key);
pTag->type = TSDB_DATA_TYPE_UBIGINT; pTag->type = TSDB_DATA_TYPE_UBIGINT;
pTag->u = groupId; pTag->u = groupId;
pTag->length = sizeof(uint64_t);
taosArrayPush(tags, &pTag); taosArrayPush(tags, &pTag);
void* cname = taosMemoryCalloc(1, TSDB_TABLE_NAME_LEN + 1); void* cname = taosMemoryCalloc(1, TSDB_TABLE_NAME_LEN + 1);
...@@ -1728,173 +1729,6 @@ char* buildCtbNameByGroupId(const char* stbName, uint64_t groupId) { ...@@ -1728,173 +1729,6 @@ char* buildCtbNameByGroupId(const char* stbName, uint64_t groupId) {
return rname.childTableName; return rname.childTableName;
} }
SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pTSchema, bool createTb, int64_t suid,
const char* stbFullName, int32_t vgId) {
SSubmitReq* ret = NULL;
SArray* tagArray = taosArrayInit(1, sizeof(STagVal));
if (!tagArray) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
// cal size
int32_t cap = sizeof(SSubmitReq);
int32_t sz = taosArrayGetSize(pBlocks);
for (int32_t i = 0; i < sz; i++) {
SSDataBlock* pDataBlock = taosArrayGet(pBlocks, i);
int32_t rows = pDataBlock->info.rows;
// TODO min
int32_t rowSize = pDataBlock->info.rowSize;
int32_t maxLen = TD_ROW_MAX_BYTES_FROM_SCHEMA(pTSchema);
int32_t schemaLen = 0;
if (createTb) {
SVCreateTbReq createTbReq = {0};
char* cname = buildCtbNameByGroupId(stbFullName, pDataBlock->info.groupId);
createTbReq.name = cname;
createTbReq.flags = 0;
createTbReq.type = TSDB_CHILD_TABLE;
createTbReq.ctb.suid = suid;
STagVal tagVal = {.cid = 1,
.type = TSDB_DATA_TYPE_UBIGINT,
.pData = (uint8_t*)&pDataBlock->info.groupId,
.nData = sizeof(uint64_t)};
STag* pTag = NULL;
taosArrayClear(tagArray);
taosArrayPush(tagArray, &tagVal);
tTagNew(tagArray, 1, false, &pTag);
if (pTag == NULL) {
tdDestroySVCreateTbReq(&createTbReq);
taosArrayDestroy(tagArray);
return NULL;
}
createTbReq.ctb.pTag = (uint8_t*)pTag;
int32_t code;
tEncodeSize(tEncodeSVCreateTbReq, &createTbReq, schemaLen, code);
tdDestroySVCreateTbReq(&createTbReq);
if (code < 0) {
taosArrayDestroy(tagArray);
return NULL;
}
}
cap += sizeof(SSubmitBlk) + schemaLen + rows * maxLen;
}
// assign data
// TODO
ret = taosMemoryCalloc(1, cap + 46);
ret = POINTER_SHIFT(ret, 46);
ret->header.vgId = vgId;
ret->version = htonl(1);
ret->length = sizeof(SSubmitReq);
ret->numOfBlocks = htonl(sz);
void* submitBlk = POINTER_SHIFT(ret, sizeof(SSubmitReq));
for (int32_t i = 0; i < sz; i++) {
SSDataBlock* pDataBlock = taosArrayGet(pBlocks, i);
SSubmitBlk* blkHead = submitBlk;
blkHead->numOfRows = htons(pDataBlock->info.rows);
blkHead->sversion = htonl(pTSchema->version);
// TODO
blkHead->suid = htobe64(suid);
// uid is assigned by vnode
blkHead->uid = 0;
int32_t rows = pDataBlock->info.rows;
/*int32_t maxLen = TD_ROW_MAX_BYTES_FROM_SCHEMA(pTSchema);*/
/*blkHead->dataLen = htonl(rows * maxLen);*/
blkHead->dataLen = 0;
void* blockData = POINTER_SHIFT(submitBlk, sizeof(SSubmitBlk));
int32_t schemaLen = 0;
if (createTb) {
SVCreateTbReq createTbReq = {0};
char* cname = buildCtbNameByGroupId(stbFullName, pDataBlock->info.groupId);
createTbReq.name = cname;
createTbReq.flags = 0;
createTbReq.type = TSDB_CHILD_TABLE;
createTbReq.ctb.suid = suid;
STagVal tagVal = {.cid = 1,
.type = TSDB_DATA_TYPE_UBIGINT,
.pData = (uint8_t*)&pDataBlock->info.groupId,
.nData = sizeof(uint64_t)};
taosArrayClear(tagArray);
taosArrayPush(tagArray, &tagVal);
STag* pTag = NULL;
tTagNew(tagArray, 1, false, &pTag);
if (pTag == NULL) {
tdDestroySVCreateTbReq(&createTbReq);
taosArrayDestroy(tagArray);
taosMemoryFreeClear(ret);
return NULL;
}
createTbReq.ctb.pTag = (uint8_t*)pTag;
int32_t code;
tEncodeSize(tEncodeSVCreateTbReq, &createTbReq, schemaLen, code);
if (code < 0) {
tdDestroySVCreateTbReq(&createTbReq);
taosArrayDestroy(tagArray);
taosMemoryFreeClear(ret);
return NULL;
}
SEncoder encoder = {0};
tEncoderInit(&encoder, blockData, schemaLen);
code = tEncodeSVCreateTbReq(&encoder, &createTbReq);
tEncoderClear(&encoder);
tdDestroySVCreateTbReq(&createTbReq);
if (code < 0) {
taosArrayDestroy(tagArray);
taosMemoryFreeClear(ret);
return NULL;
}
}
blkHead->schemaLen = htonl(schemaLen);
STSRow* rowData = POINTER_SHIFT(blockData, schemaLen);
for (int32_t j = 0; j < rows; j++) {
SRowBuilder rb = {0};
tdSRowInit(&rb, pTSchema->version);
tdSRowSetTpInfo(&rb, pTSchema->numOfCols, pTSchema->flen);
tdSRowResetBuf(&rb, rowData);
for (int32_t k = 0; k < pTSchema->numOfCols; k++) {
const STColumn* pColumn = &pTSchema->columns[k];
SColumnInfoData* pColData = taosArrayGet(pDataBlock->pDataBlock, k);
if (colDataIsNull_s(pColData, j)) {
tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NONE, NULL, false, pColumn->offset, k);
} else {
void* data = colDataGetData(pColData, j);
tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NORM, data, true, pColumn->offset, k);
}
}
int32_t rowLen = TD_ROW_LEN(rowData);
rowData = POINTER_SHIFT(rowData, rowLen);
blkHead->dataLen += rowLen;
}
int32_t dataLen = blkHead->dataLen;
blkHead->dataLen = htonl(dataLen);
ret->length += sizeof(SSubmitBlk) + schemaLen + dataLen;
blkHead = POINTER_SHIFT(blkHead, schemaLen + dataLen);
/*submitBlk = blkHead;*/
}
ret->length = htonl(ret->length);
taosArrayDestroy(tagArray);
return ret;
}
void blockCompressEncode(const SSDataBlock* pBlock, char* data, int32_t* dataLen, int32_t numOfCols, void blockCompressEncode(const SSDataBlock* pBlock, char* data, int32_t* dataLen, int32_t numOfCols,
int8_t needCompress) { int8_t needCompress) {
// todo extract method // todo extract method
......
此差异已折叠。
...@@ -34,39 +34,31 @@ typedef struct SMnodeMgmt { ...@@ -34,39 +34,31 @@ typedef struct SMnodeMgmt {
SSingleWorker writeWorker; SSingleWorker writeWorker;
SSingleWorker syncWorker; SSingleWorker syncWorker;
SSingleWorker monitorWorker; SSingleWorker monitorWorker;
SReplica replicas[TSDB_MAX_REPLICA];
int8_t replica;
bool stopped; bool stopped;
int32_t refCount; int32_t refCount;
TdThreadRwlock lock; TdThreadRwlock lock;
} SMnodeMgmt; } SMnodeMgmt;
// mmFile.c // mmFile.c
int32_t mmReadFile(SMnodeMgmt *pMgmt, bool *pDeployed); int32_t mmReadFile(SMnodeMgmt *pMgmt, SReplica *pReplica, bool *pDeployed);
int32_t mmWriteFile(SMnodeMgmt *pMgmt, SDCreateMnodeReq *pMsg, bool deployed); int32_t mmWriteFile(SMnodeMgmt *pMgmt, const SReplica *pReplica, bool deployed);
// mmInt.c
int32_t mmAcquire(SMnodeMgmt *pMgmt);
void mmRelease(SMnodeMgmt *pMgmt);
// mmHandle.c // mmHandle.c
SArray *mmGetMsgHandles(); SArray *mmGetMsgHandles();
int32_t mmProcessCreateReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg); int32_t mmProcessCreateReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg);
int32_t mmProcessDropReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg); int32_t mmProcessDropReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg);
int32_t mmProcessAlterReq(SMnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t mmProcessGetMonitorInfoReq(SMnodeMgmt *pMgmt, SRpcMsg *pMsg); int32_t mmProcessGetMonitorInfoReq(SMnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t mmProcessGetLoadsReq(SMnodeMgmt *pMgmt, SRpcMsg *pMsg); int32_t mmProcessGetLoadsReq(SMnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t mndPreprocessQueryMsg(SMnode * pMnode, SRpcMsg * pMsg);
// mmWorker.c // mmWorker.c
int32_t mmStartWorker(SMnodeMgmt *pMgmt); int32_t mmStartWorker(SMnodeMgmt *pMgmt);
void mmStopWorker(SMnodeMgmt *pMgmt); void mmStopWorker(SMnodeMgmt *pMgmt);
int32_t mmPutNodeMsgToWriteQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg); int32_t mmPutMsgToWriteQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t mmPutNodeMsgToSyncQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg); int32_t mmPutMsgToSyncQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t mmPutNodeMsgToReadQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg); int32_t mmPutMsgToReadQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t mmPutNodeMsgToQueryQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg); int32_t mmPutMsgToQueryQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t mmPutNodeMsgToMonitorQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg); int32_t mmPutMsgToMonitorQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t mmPutRpcMsgToQueue(SMnodeMgmt *pMgmt, EQueueType qtype, SRpcMsg *pRpc); int32_t mmPutMsgToQueue(SMnodeMgmt *pMgmt, EQueueType qtype, SRpcMsg *pRpc);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -16,7 +16,7 @@ ...@@ -16,7 +16,7 @@
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "mmInt.h" #include "mmInt.h"
int32_t mmReadFile(SMnodeMgmt *pMgmt, bool *pDeployed) { int32_t mmReadFile(SMnodeMgmt *pMgmt, SReplica *pReplica, bool *pDeployed) {
int32_t code = TSDB_CODE_INVALID_JSON_FORMAT; int32_t code = TSDB_CODE_INVALID_JSON_FORMAT;
int32_t len = 0; int32_t len = 0;
int32_t maxLen = 4096; int32_t maxLen = 4096;
...@@ -52,61 +52,54 @@ int32_t mmReadFile(SMnodeMgmt *pMgmt, bool *pDeployed) { ...@@ -52,61 +52,54 @@ int32_t mmReadFile(SMnodeMgmt *pMgmt, bool *pDeployed) {
} }
*pDeployed = deployed->valueint; *pDeployed = deployed->valueint;
cJSON *mnodes = cJSON_GetObjectItem(root, "mnodes"); cJSON *id = cJSON_GetObjectItem(root, "id");
if (mnodes != NULL) { if (id) {
if (!mnodes || mnodes->type != cJSON_Array) { if (id->type != cJSON_Number) {
dError("failed to read %s since nodes not found", file);
goto _OVER;
}
pMgmt->replica = cJSON_GetArraySize(mnodes);
if (pMgmt->replica <= 0 || pMgmt->replica > TSDB_MAX_REPLICA) {
dError("failed to read %s since mnodes size %d invalid", file, pMgmt->replica);
goto _OVER;
}
for (int32_t i = 0; i < pMgmt->replica; ++i) {
cJSON *node = cJSON_GetArrayItem(mnodes, i);
if (node == NULL) break;
SReplica *pReplica = &pMgmt->replicas[i];
cJSON *id = cJSON_GetObjectItem(node, "id");
if (!id || id->type != cJSON_Number) {
dError("failed to read %s since id not found", file); dError("failed to read %s since id not found", file);
goto _OVER; goto _OVER;
} }
if (pReplica) {
pReplica->id = id->valueint; pReplica->id = id->valueint;
}
}
cJSON *fqdn = cJSON_GetObjectItem(node, "fqdn"); cJSON *fqdn = cJSON_GetObjectItem(root, "fqdn");
if (!fqdn || fqdn->type != cJSON_String || fqdn->valuestring == NULL) { if (fqdn) {
if (fqdn->type != cJSON_String || fqdn->valuestring == NULL) {
dError("failed to read %s since fqdn not found", file); dError("failed to read %s since fqdn not found", file);
goto _OVER; goto _OVER;
} }
if (pReplica) {
tstrncpy(pReplica->fqdn, fqdn->valuestring, TSDB_FQDN_LEN); tstrncpy(pReplica->fqdn, fqdn->valuestring, TSDB_FQDN_LEN);
}
}
cJSON *port = cJSON_GetObjectItem(node, "port"); cJSON *port = cJSON_GetObjectItem(root, "port");
if (!port || port->type != cJSON_Number) { if (port) {
if (port->type != cJSON_Number) {
dError("failed to read %s since port not found", file); dError("failed to read %s since port not found", file);
goto _OVER; goto _OVER;
} }
pReplica->port = port->valueint; if (pReplica) {
pReplica->port = (uint16_t)port->valueint;
} }
} }
code = 0; code = 0;
dDebug("succcessed to read file %s, deployed:%d", file, *pDeployed);
_OVER: _OVER:
if (content != NULL) taosMemoryFree(content); if (content != NULL) taosMemoryFree(content);
if (root != NULL) cJSON_Delete(root); if (root != NULL) cJSON_Delete(root);
if (pFile != NULL) taosCloseFile(&pFile); if (pFile != NULL) taosCloseFile(&pFile);
if (code == 0) {
dDebug("succcessed to read file %s, deployed:%d", file, *pDeployed);
}
terrno = code; terrno = code;
return code; return code;
} }
int32_t mmWriteFile(SMnodeMgmt *pMgmt, SDCreateMnodeReq *pMsg, bool deployed) { int32_t mmWriteFile(SMnodeMgmt *pMgmt, const SReplica *pReplica, bool deployed) {
char file[PATH_MAX] = {0}; char file[PATH_MAX] = {0};
char realfile[PATH_MAX] = {0}; char realfile[PATH_MAX] = {0};
snprintf(file, sizeof(file), "%s%smnode.json.bak", pMgmt->path, TD_DIRSEP); snprintf(file, sizeof(file), "%s%smnode.json.bak", pMgmt->path, TD_DIRSEP);
...@@ -124,26 +117,11 @@ int32_t mmWriteFile(SMnodeMgmt *pMgmt, SDCreateMnodeReq *pMsg, bool deployed) { ...@@ -124,26 +117,11 @@ int32_t mmWriteFile(SMnodeMgmt *pMgmt, SDCreateMnodeReq *pMsg, bool deployed) {
char *content = taosMemoryCalloc(1, maxLen + 1); char *content = taosMemoryCalloc(1, maxLen + 1);
len += snprintf(content + len, maxLen - len, "{\n"); len += snprintf(content + len, maxLen - len, "{\n");
if (pReplica != NULL && pReplica->id > 0) {
int8_t replica = (pMsg != NULL ? pMsg->replica : pMgmt->replica);
if (replica > 0) {
len += snprintf(content + len, maxLen - len, " \"mnodes\": [{\n");
for (int32_t i = 0; i < replica; ++i) {
SReplica *pReplica = &pMgmt->replicas[i];
if (pMsg != NULL) {
pReplica = &pMsg->replicas[i];
}
len += snprintf(content + len, maxLen - len, " \"id\": %d,\n", pReplica->id); len += snprintf(content + len, maxLen - len, " \"id\": %d,\n", pReplica->id);
len += snprintf(content + len, maxLen - len, " \"fqdn\": \"%s\",\n", pReplica->fqdn); len += snprintf(content + len, maxLen - len, " \"fqdn\": \"%s\",\n", pReplica->fqdn);
len += snprintf(content + len, maxLen - len, " \"port\": %u\n", pReplica->port); len += snprintf(content + len, maxLen - len, " \"port\": %u\n,", pReplica->port);
if (i < replica - 1) {
len += snprintf(content + len, maxLen - len, " },{\n");
} else {
len += snprintf(content + len, maxLen - len, " }],\n");
}
} }
}
len += snprintf(content + len, maxLen - len, " \"deployed\": %d\n", deployed); len += snprintf(content + len, maxLen - len, " \"deployed\": %d\n", deployed);
len += snprintf(content + len, maxLen - len, "}\n"); len += snprintf(content + len, maxLen - len, "}\n");
......
...@@ -27,7 +27,7 @@ static bool mmDeployRequired(const SMgmtInputOpt *pInput) { ...@@ -27,7 +27,7 @@ static bool mmDeployRequired(const SMgmtInputOpt *pInput) {
static int32_t mmRequire(const SMgmtInputOpt *pInput, bool *required) { static int32_t mmRequire(const SMgmtInputOpt *pInput, bool *required) {
SMnodeMgmt mgmt = {0}; SMnodeMgmt mgmt = {0};
mgmt.path = pInput->path; mgmt.path = pInput->path;
if (mmReadFile(&mgmt, required) != 0) { if (mmReadFile(&mgmt, NULL, required) != 0) {
return -1; return -1;
} }
...@@ -43,33 +43,19 @@ static void mmBuildOptionForDeploy(SMnodeMgmt *pMgmt, const SMgmtInputOpt *pInpu ...@@ -43,33 +43,19 @@ static void mmBuildOptionForDeploy(SMnodeMgmt *pMgmt, const SMgmtInputOpt *pInpu
pOption->deploy = true; pOption->deploy = true;
pOption->msgCb = pMgmt->msgCb; pOption->msgCb = pMgmt->msgCb;
pOption->dnodeId = pMgmt->pData->dnodeId; pOption->dnodeId = pMgmt->pData->dnodeId;
pOption->replica.id = 1;
pOption->replica = 1; pOption->replica.port = tsServerPort;
pOption->selfIndex = 0; tstrncpy(pOption->replica.fqdn, tsLocalFqdn, TSDB_FQDN_LEN);
SReplica *pReplica = &pOption->replicas[0];
pReplica->id = 1;
pReplica->port = tsServerPort;
tstrncpy(pReplica->fqdn, tsLocalFqdn, TSDB_FQDN_LEN);
} }
static void mmBuildOptionForOpen(SMnodeMgmt *pMgmt, SMnodeOpt *pOption) { static void mmBuildOptionForOpen(SMnodeMgmt *pMgmt, const SReplica *pReplica, SMnodeOpt *pOption) {
pOption->deploy = false;
pOption->standby = false; pOption->standby = false;
pOption->deploy = false;
pOption->msgCb = pMgmt->msgCb; pOption->msgCb = pMgmt->msgCb;
pOption->dnodeId = pMgmt->pData->dnodeId; pOption->dnodeId = pMgmt->pData->dnodeId;
if (pReplica->id > 0) {
if (pMgmt->replica > 0) {
pOption->standby = true; pOption->standby = true;
pOption->replica = 1; pOption->replica = *pReplica;
pOption->selfIndex = 0;
SReplica *pReplica = &pOption->replicas[0];
for (int32_t i = 0; i < pMgmt->replica; ++i) {
if (pMgmt->replicas[i].id != pMgmt->pData->dnodeId) continue;
pReplica->id = pMgmt->replicas[i].id;
pReplica->port = pMgmt->replicas[i].port;
memcpy(pReplica->fqdn, pMgmt->replicas[i].fqdn, TSDB_FQDN_LEN);
}
} }
} }
...@@ -105,12 +91,13 @@ static int32_t mmOpen(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) { ...@@ -105,12 +91,13 @@ static int32_t mmOpen(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) {
pMgmt->path = pInput->path; pMgmt->path = pInput->path;
pMgmt->name = pInput->name; pMgmt->name = pInput->name;
pMgmt->msgCb = pInput->msgCb; pMgmt->msgCb = pInput->msgCb;
pMgmt->msgCb.putToQueueFp = (PutToQueueFp)mmPutRpcMsgToQueue; pMgmt->msgCb.putToQueueFp = (PutToQueueFp)mmPutMsgToQueue;
pMgmt->msgCb.mgmt = pMgmt; pMgmt->msgCb.mgmt = pMgmt;
taosThreadRwlockInit(&pMgmt->lock, NULL); taosThreadRwlockInit(&pMgmt->lock, NULL);
bool deployed = false; bool deployed = false;
if (mmReadFile(pMgmt, &deployed) != 0) { SReplica replica = {0};
if (mmReadFile(pMgmt, &replica, &deployed) != 0) {
dError("failed to read file since %s", terrstr()); dError("failed to read file since %s", terrstr());
mmClose(pMgmt); mmClose(pMgmt);
return -1; return -1;
...@@ -123,7 +110,7 @@ static int32_t mmOpen(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) { ...@@ -123,7 +110,7 @@ static int32_t mmOpen(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) {
mmBuildOptionForDeploy(pMgmt, pInput, &option); mmBuildOptionForDeploy(pMgmt, pInput, &option);
} else { } else {
dInfo("mnode start to open"); dInfo("mnode start to open");
mmBuildOptionForOpen(pMgmt, &option); mmBuildOptionForOpen(pMgmt, &replica, &option);
} }
pMgmt->pMnode = mndOpen(pMgmt->path, &option); pMgmt->pMnode = mndOpen(pMgmt->path, &option);
...@@ -141,8 +128,7 @@ static int32_t mmOpen(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) { ...@@ -141,8 +128,7 @@ static int32_t mmOpen(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) {
} }
tmsgReportStartup("mnode-worker", "initialized"); tmsgReportStartup("mnode-worker", "initialized");
if (!deployed || pMgmt->replica > 0) { if (!deployed || replica.id > 0) {
pMgmt->replica = 0;
deployed = true; deployed = true;
if (mmWriteFile(pMgmt, NULL, deployed) != 0) { if (mmWriteFile(pMgmt, NULL, deployed) != 0) {
dError("failed to write mnode file since %s", terrstr()); dError("failed to write mnode file since %s", terrstr());
...@@ -178,22 +164,3 @@ SMgmtFunc mmGetMgmtFunc() { ...@@ -178,22 +164,3 @@ SMgmtFunc mmGetMgmtFunc() {
return mgmtFunc; return mgmtFunc;
} }
int32_t mmAcquire(SMnodeMgmt *pMgmt) {
int32_t code = 0;
taosThreadRwlockRdlock(&pMgmt->lock);
if (pMgmt->stopped) {
code = -1;
} else {
atomic_add_fetch_32(&pMgmt->refCount, 1);
}
taosThreadRwlockUnlock(&pMgmt->lock);
return code;
}
void mmRelease(SMnodeMgmt *pMgmt) {
taosThreadRwlockRdlock(&pMgmt->lock);
atomic_sub_fetch_32(&pMgmt->refCount, 1);
taosThreadRwlockUnlock(&pMgmt->lock);
}
\ No newline at end of file
...@@ -26,6 +26,9 @@ int32_t mndInitSma(SMnode *pMnode); ...@@ -26,6 +26,9 @@ int32_t mndInitSma(SMnode *pMnode);
void mndCleanupSma(SMnode *pMnode); void mndCleanupSma(SMnode *pMnode);
SSmaObj *mndAcquireSma(SMnode *pMnode, char *smaName); SSmaObj *mndAcquireSma(SMnode *pMnode, char *smaName);
void mndReleaseSma(SMnode *pMnode, SSmaObj *pSma); void mndReleaseSma(SMnode *pMnode, SSmaObj *pSma);
int32_t mndDropSmasByStb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *pStb);
int32_t mndDropSmasByDb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb);
int32_t mndGetTableSma(SMnode *pMnode, char *tbFName, STableIndexRsp *rsp, bool *exist);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册