提交 67faf7cd 编写于 作者: C Cary Xu

Merge branch '3.0' into feature/TD-11274-3.0

......@@ -2,7 +2,7 @@
# taos-tools
ExternalProject_Add(taos-tools
GIT_REPOSITORY https://github.com/taosdata/taos-tools.git
GIT_TAG 8a5e336
GIT_TAG 3c7dafe
SOURCE_DIR "${TD_SOURCE_DIR}/tools/taos-tools"
BINARY_DIR ""
#BUILD_IN_SOURCE TRUE
......
......@@ -2,6 +2,7 @@ package main
import (
"fmt"
"log"
"github.com/taosdata/driver-go/v3/af"
)
......@@ -10,7 +11,7 @@ func main() {
conn, err := af.Open("localhost", "root", "taosdata", "", 6030)
defer conn.Close()
if err != nil {
fmt.Println("failed to connect, err:", err)
log.Fatalln("failed to connect, err:", err)
} else {
fmt.Println("connected")
}
......
......@@ -3,6 +3,7 @@ package main
import (
"database/sql"
"fmt"
"log"
_ "github.com/taosdata/driver-go/v3/taosSql"
)
......@@ -11,7 +12,7 @@ func main() {
var taosDSN = "root:taosdata@tcp(localhost:6030)/"
taos, err := sql.Open("taosSql", taosDSN)
if err != nil {
fmt.Println("failed to connect TDengine, err:", err)
log.Fatalln("failed to connect TDengine, err:", err)
return
}
fmt.Println("Connected")
......
......@@ -3,6 +3,7 @@ package main
import (
"database/sql"
"fmt"
"log"
_ "github.com/taosdata/driver-go/v3/taosRestful"
)
......@@ -11,7 +12,7 @@ func main() {
var taosDSN = "root:taosdata@http(localhost:6041)/"
taos, err := sql.Open("taosRestful", taosDSN)
if err != nil {
fmt.Println("failed to connect TDengine, err:", err)
log.Fatalln("failed to connect TDengine, err:", err)
return
}
fmt.Println("Connected")
......
package main
import (
"fmt"
"github.com/taosdata/driver-go/v3/wrapper"
)
func main() {
conn, err := wrapper.TaosConnect("localhost", "root", "taosdata", "", 6030)
defer wrapper.TaosClose(conn)
if err != nil {
fmt.Println("fail to connect, err:", err)
} else {
fmt.Println("connected")
}
}
package main
import (
"fmt"
"log"
"github.com/taosdata/driver-go/v3/af"
)
......@@ -20,7 +20,7 @@ func prepareDatabase(conn *af.Connector) {
func main() {
conn, err := af.Open("localhost", "root", "taosdata", "", 6030)
if err != nil {
fmt.Println("fail to connect, err:", err)
log.Fatalln("fail to connect, err:", err)
}
defer conn.Close()
prepareDatabase(conn)
......@@ -32,6 +32,6 @@ func main() {
err = conn.OpenTSDBInsertJsonPayload(payload)
if err != nil {
fmt.Println("insert error:", err)
log.Fatalln("insert error:", err)
}
}
......@@ -2,6 +2,7 @@ package main
import (
"fmt"
"log"
"github.com/taosdata/driver-go/v3/af"
)
......@@ -33,6 +34,6 @@ func main() {
err = conn.InfluxDBInsertLines(lines, "ms")
if err != nil {
fmt.Println("insert error:", err)
log.Fatalln("insert error:", err)
}
}
......@@ -3,6 +3,7 @@ package main
import (
"database/sql"
"fmt"
"log"
_ "github.com/taosdata/driver-go/v3/taosRestful"
)
......@@ -10,28 +11,26 @@ import (
func createStable(taos *sql.DB) {
_, err := taos.Exec("CREATE DATABASE power")
if err != nil {
fmt.Println("failed to create database, err:", err)
log.Fatalln("failed to create database, err:", err)
}
_, err = taos.Exec("CREATE STABLE power.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (location BINARY(64), groupId INT)")
if err != nil {
fmt.Println("failed to create stable, err:", err)
log.Fatalln("failed to create stable, err:", err)
}
}
func insertData(taos *sql.DB) {
sql := `INSERT INTO power.d1001 USING power.meters TAGS(California.SanFrancisco, 2) VALUES ('2018-10-03 14:38:05.000', 10.30000, 219, 0.31000) ('2018-10-03 14:38:15.000', 12.60000, 218, 0.33000) ('2018-10-03 14:38:16.800', 12.30000, 221, 0.31000)
power.d1002 USING power.meters TAGS(California.SanFrancisco, 3) VALUES ('2018-10-03 14:38:16.650', 10.30000, 218, 0.25000)
power.d1003 USING power.meters TAGS(California.LosAngeles, 2) VALUES ('2018-10-03 14:38:05.500', 11.80000, 221, 0.28000) ('2018-10-03 14:38:16.600', 13.40000, 223, 0.29000)
power.d1004 USING power.meters TAGS(California.LosAngeles, 3) VALUES ('2018-10-03 14:38:05.000', 10.80000, 223, 0.29000) ('2018-10-03 14:38:06.500', 11.50000, 221, 0.35000)`
sql := `INSERT INTO power.d1001 USING power.meters TAGS('California.SanFrancisco', 2) VALUES ('2018-10-03 14:38:05.000', 10.30000, 219, 0.31000) ('2018-10-03 14:38:15.000', 12.60000, 218, 0.33000) ('2018-10-03 14:38:16.800', 12.30000, 221, 0.31000)
power.d1002 USING power.meters TAGS('California.SanFrancisco', 3) VALUES ('2018-10-03 14:38:16.650', 10.30000, 218, 0.25000)
power.d1003 USING power.meters TAGS('California.LosAngeles', 2) VALUES ('2018-10-03 14:38:05.500', 11.80000, 221, 0.28000) ('2018-10-03 14:38:16.600', 13.40000, 223, 0.29000)
power.d1004 USING power.meters TAGS('California.LosAngeles', 3) VALUES ('2018-10-03 14:38:05.000', 10.80000, 223, 0.29000) ('2018-10-03 14:38:06.500', 11.50000, 221, 0.35000)`
result, err := taos.Exec(sql)
if err != nil {
fmt.Println("failed to insert, err:", err)
return
log.Fatalln("failed to insert, err:", err)
}
rowsAffected, err := result.RowsAffected()
if err != nil {
fmt.Println("failed to get affected rows, err:", err)
return
log.Fatalln("failed to get affected rows, err:", err)
}
fmt.Println("RowsAffected", rowsAffected)
}
......@@ -40,8 +39,7 @@ func main() {
var taosDSN = "root:taosdata@http(localhost:6041)/"
taos, err := sql.Open("taosRestful", taosDSN)
if err != nil {
fmt.Println("failed to connect TDengine, err:", err)
return
log.Fatalln("failed to connect TDengine, err:", err)
}
defer taos.Close()
createStable(taos)
......
......@@ -5,8 +5,8 @@ import (
"time"
"github.com/taosdata/driver-go/v3/af"
"github.com/taosdata/driver-go/v3/af/param"
"github.com/taosdata/driver-go/v3/common"
"github.com/taosdata/driver-go/v3/common/param"
)
func checkErr(err error, prompt string) {
......
package main
import (
"fmt"
"log"
"github.com/taosdata/driver-go/v3/af"
)
......@@ -20,7 +20,7 @@ func prepareDatabase(conn *af.Connector) {
func main() {
conn, err := af.Open("localhost", "root", "taosdata", "", 6030)
if err != nil {
fmt.Println("fail to connect, err:", err)
log.Fatalln("fail to connect, err:", err)
}
defer conn.Close()
prepareDatabase(conn)
......@@ -37,6 +37,6 @@ func main() {
err = conn.OpenTSDBInsertTelnetLines(lines)
if err != nil {
fmt.Println("insert error:", err)
log.Fatalln("insert error:", err)
}
}
......@@ -2,7 +2,7 @@ package main
import (
"database/sql"
"fmt"
"log"
"time"
_ "github.com/taosdata/driver-go/v3/taosRestful"
......@@ -12,14 +12,12 @@ func main() {
var taosDSN = "root:taosdata@http(localhost:6041)/power"
taos, err := sql.Open("taosRestful", taosDSN)
if err != nil {
fmt.Println("failed to connect TDengine, err:", err)
return
log.Fatalln("failed to connect TDengine, err:", err)
}
defer taos.Close()
rows, err := taos.Query("SELECT ts, current FROM meters LIMIT 2")
if err != nil {
fmt.Println("failed to select from table, err:", err)
return
log.Fatalln("failed to select from table, err:", err)
}
defer rows.Close()
......@@ -30,9 +28,9 @@ func main() {
}
err := rows.Scan(&r.ts, &r.current)
if err != nil {
fmt.Println("scan error:\n", err)
log.Fatalln("scan error:\n", err)
return
}
fmt.Println(r.ts, r.current)
log.Fatalln(r.ts, r.current)
}
}
......@@ -3,6 +3,6 @@
```
:::tip
driver-go 的模块 `github.com/taosdata/driver-go/v2/wrapper` 是 C 接口的底层封装。使用这个模块也可以实现参数绑定写入。
driver-go 的模块 `github.com/taosdata/driver-go/v3/wrapper` 是 C 接口的底层封装。使用这个模块也可以实现参数绑定写入。
:::
......@@ -147,7 +147,7 @@ import (
"fmt"
"time"
_ "github.com/taosdata/driver-go/v2/taosSql"
_ "github.com/taosdata/driver-go/v3/taosSql"
)
type config struct {
......
......@@ -103,7 +103,7 @@ int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, const SArray* tableIdList, bo
* @return
*/
int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId, struct SSubplan* pPlan,
qTaskInfo_t* pTaskInfo, DataSinkHandle* handle, const char* sql, EOPTR_EXEC_MODEL model);
qTaskInfo_t* pTaskInfo, DataSinkHandle* handle, char* sql, EOPTR_EXEC_MODEL model);
/**
*
......
......@@ -202,6 +202,7 @@ bool fmIsForbidStreamFunc(int32_t funcId);
bool fmIsIntervalInterpoFunc(int32_t funcId);
bool fmIsInterpFunc(int32_t funcId);
bool fmIsLastRowFunc(int32_t funcId);
bool fmIsReturnNotNullFunc(int32_t funcId);
bool fmIsSelectValueFunc(int32_t funcId);
bool fmIsSystemInfoFunc(int32_t funcId);
bool fmIsImplicitTsFunc(int32_t funcId);
......
......@@ -43,11 +43,12 @@ typedef struct SRpcHandleInfo {
// rpc info
void *handle; // rpc handle returned to app
int64_t refId; // refid, used by server
int32_t noResp; // has response or not(default 0, 0: resp, 1: no resp);
int32_t persistHandle; // persist handle or not
STraceId traceId;
int8_t noResp; // has response or not(default 0, 0: resp, 1: no resp)
int8_t persistHandle; // persist handle or not
int8_t hasEpSet;
STraceId traceId;
// app info
void *ahandle; // app handle set by client
void *wrapper; // wrapper handle
......@@ -69,8 +70,9 @@ typedef struct SRpcMsg {
SRpcHandleInfo info;
} SRpcMsg;
typedef void (*RpcCfp)(void *parent, SRpcMsg *, SEpSet *rf);
typedef void (*RpcCfp)(void *parent, SRpcMsg *, SEpSet *epset);
typedef bool (*RpcRfp)(int32_t code, tmsg_t msgType);
typedef bool (*RpcTfp)(int32_t code, tmsg_t msgType);
typedef struct SRpcInit {
char localFqdn[TSDB_FQDN_LEN];
......@@ -84,12 +86,15 @@ typedef struct SRpcInit {
// the following is for client app ecurity only
char *user; // user name
// call back to process incoming msg, code shall be ignored by server app
// call back to process incoming msg
RpcCfp cfp;
// user defined retry func
// retry not not for particular msg
RpcRfp rfp;
// set up timeout for particular msg
RpcTfp tfp;
void *parent;
} SRpcInit;
......
......@@ -46,6 +46,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_RPC_FQDN_ERROR TAOS_DEF_ERROR_CODE(0, 0x0015)
#define TSDB_CODE_RPC_PORT_EADDRINUSE TAOS_DEF_ERROR_CODE(0, 0x0017)
#define TSDB_CODE_RPC_BROKEN_LINK TAOS_DEF_ERROR_CODE(0, 0x0018)
#define TSDB_CODE_RPC_TIMEOUT TAOS_DEF_ERROR_CODE(0, 0x0019)
//common & util
#define TSDB_CODE_TIME_UNSYNCED TAOS_DEF_ERROR_CODE(0, 0x0013)
......
......@@ -109,6 +109,14 @@ static bool clientRpcRfp(int32_t code, tmsg_t msgType) {
}
}
// start timer for particular msgType
static bool clientRpcTfp(int32_t code, tmsg_t msgType) {
if (msgType == TDMT_VND_SUBMIT || msgType == TDMT_VND_CREATE_TABLE) {
return true;
}
return false;
}
// TODO refactor
void *openTransporter(const char *user, const char *auth, int32_t numOfThread) {
SRpcInit rpcInit;
......@@ -118,6 +126,7 @@ void *openTransporter(const char *user, const char *auth, int32_t numOfThread) {
rpcInit.numOfThreads = numOfThread;
rpcInit.cfp = processMsgFromServer;
rpcInit.rfp = clientRpcRfp;
rpcInit.tfp = clientRpcTfp;
rpcInit.sessions = 1024;
rpcInit.connType = TAOS_CONN_CLIENT;
rpcInit.user = (char *)user;
......
......@@ -990,7 +990,7 @@ int32_t decodeOperator(SOperatorInfo* ops, const char* data, int32_t length);
void setTaskStatus(SExecTaskInfo* pTaskInfo, int8_t status);
int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SReadHandle* pHandle, uint64_t taskId,
const char* sql, EOPTR_EXEC_MODEL model);
char* sql, EOPTR_EXEC_MODEL model);
int32_t createDataSinkParam(SDataSinkNode *pNode, void **pParam, qTaskInfo_t* pTaskInfo, SReadHandle* readHandle);
int32_t getOperatorExplainExecInfo(SOperatorInfo* operatorInfo, SArray* pExecInfoList);
......
......@@ -341,7 +341,7 @@ int32_t qGetQueryTableSchemaVersion(qTaskInfo_t tinfo, char* dbName, char* table
}
int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId, SSubplan* pSubplan,
qTaskInfo_t* pTaskInfo, DataSinkHandle* handle, const char* sql, EOPTR_EXEC_MODEL model) {
qTaskInfo_t* pTaskInfo, DataSinkHandle* handle, char* sql, EOPTR_EXEC_MODEL model) {
assert(pSubplan != NULL);
SExecTaskInfo** pTask = (SExecTaskInfo**)pTaskInfo;
......
......@@ -1437,7 +1437,8 @@ static void setExecutionContext(SOperatorInfo* pOperator, int32_t numOfOutput, u
pAggInfo->groupId = groupId;
}
static void doUpdateNumOfRows(SResultRow* pRow, int32_t numOfExprs, const int32_t* rowCellOffset) {
static void doUpdateNumOfRows(SqlFunctionCtx* pCtx, SResultRow* pRow, int32_t numOfExprs, const int32_t* rowCellOffset) {
bool returnNotNull = false;
for (int32_t j = 0; j < numOfExprs; ++j) {
struct SResultRowEntryInfo* pResInfo = getResultEntryInfo(pRow, j, rowCellOffset);
if (!isRowEntryInitialized(pResInfo)) {
......@@ -1447,6 +1448,15 @@ static void doUpdateNumOfRows(SResultRow* pRow, int32_t numOfExprs, const int32_
if (pRow->numOfRows < pResInfo->numOfRes) {
pRow->numOfRows = pResInfo->numOfRes;
}
if (fmIsReturnNotNullFunc(pCtx[j].functionId)) {
returnNotNull = true;
}
}
// if all expr skips all blocks, e.g. all null inputs for max function, output one row in final result.
// except for first/last, which require not null output, output no rows
if (pRow->numOfRows == 0 && !returnNotNull) {
pRow->numOfRows = 1;
}
}
......@@ -1458,7 +1468,7 @@ int32_t finalizeResultRowIntoResultDataBlock(SDiskbasedBuf* pBuf, SResultRowPosi
SFilePage* page = getBufPage(pBuf, resultRowPosition->pageId);
SResultRow* pRow = (SResultRow*)((char*)page + resultRowPosition->offset);
doUpdateNumOfRows(pRow, numOfExprs, rowCellOffset);
doUpdateNumOfRows(pCtx, pRow, numOfExprs, rowCellOffset);
if (pRow->numOfRows == 0) {
releaseBufPage(pBuf, page);
return 0;
......@@ -1514,7 +1524,7 @@ int32_t doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprI
SResultRow* pRow = (SResultRow*)((char*)page + pPos->pos.offset);
doUpdateNumOfRows(pRow, numOfExprs, rowCellOffset);
doUpdateNumOfRows(pCtx, pRow, numOfExprs, rowCellOffset);
if (pRow->numOfRows == 0) {
pGroupResInfo->index += 1;
releaseBufPage(pBuf, page);
......@@ -4492,7 +4502,7 @@ int32_t createDataSinkParam(SDataSinkNode* pNode, void** pParam, qTaskInfo_t* pT
}
int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SReadHandle* pHandle, uint64_t taskId,
const char* sql, EOPTR_EXEC_MODEL model) {
char* sql, EOPTR_EXEC_MODEL model) {
uint64_t queryId = pPlan->id.queryId;
int32_t code = TSDB_CODE_SUCCESS;
......@@ -4503,6 +4513,7 @@ int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SRead
}
(*pTaskInfo)->sql = sql;
sql = NULL;
(*pTaskInfo)->pSubplan = pPlan;
(*pTaskInfo)->pRoot = createOperatorTree(pPlan->pNode, *pTaskInfo, pHandle, &(*pTaskInfo)->tableqinfoList,
pPlan->pTagCond, pPlan->pTagIndexCond, pPlan->user);
......@@ -4515,6 +4526,7 @@ int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SRead
return code;
_complete:
taosMemoryFree(sql);
doDestroyTask(*pTaskInfo);
terrno = code;
return code;
......
......@@ -64,6 +64,9 @@ typedef struct SMinmaxResInfo {
bool assign; // assign the first value or not
int64_t v;
STuplePos tuplePos;
STuplePos nullTuplePos;
bool nullTupleSaved;
} SMinmaxResInfo;
typedef struct STopBotResItem {
......@@ -75,6 +78,10 @@ typedef struct STopBotResItem {
typedef struct STopBotRes {
int32_t maxSize;
int16_t type;
STuplePos nullTuplePos;
bool nullTupleSaved;
STopBotResItem* pItems;
} STopBotRes;
......@@ -221,6 +228,10 @@ typedef struct SSampleInfo {
int32_t numSampled;
uint8_t colType;
int16_t colBytes;
STuplePos nullTuplePos;
bool nullTupleSaved;
char* data;
STuplePos* tuplePos;
} SSampleInfo;
......@@ -1134,6 +1145,9 @@ bool minmaxFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo)
SMinmaxResInfo* buf = GET_ROWCELL_INTERBUF(pResultInfo);
buf->assign = false;
buf->tuplePos.pageId = -1;
buf->nullTupleSaved = false;
buf->nullTuplePos.pageId = -1;
return true;
}
......@@ -1575,6 +1589,10 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) {
}
_min_max_over:
if (numOfElems == 0 && pCtx->subsidiaries.num > 0 && !pBuf->nullTupleSaved ) {
doSaveTupleData(pCtx, pInput->startRowIndex, pCtx->pSrcBlock, &pBuf->nullTuplePos);
pBuf->nullTupleSaved = true;
}
return numOfElems;
}
......@@ -1615,7 +1633,7 @@ int32_t minmaxFunctionFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
if (pEntryInfo->numOfRes > 0) {
setSelectivityValue(pCtx, pBlock, &pRes->tuplePos, currentRow);
} else {
setNullSelectivityValue(pCtx, pBlock, currentRow);
setSelectivityValue(pCtx, pBlock, &pRes->nullTuplePos, currentRow);
}
return pEntryInfo->numOfRes;
......@@ -3366,6 +3384,8 @@ bool topBotFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResInfo) {
pRes->maxSize = pCtx->param[1].param.i;
pRes->nullTupleSaved = false;
pRes->nullTuplePos.pageId = -1;
return true;
}
......@@ -3403,6 +3423,10 @@ int32_t topFunction(SqlFunctionCtx* pCtx) {
doAddIntoResult(pCtx, data, i, pCtx->pSrcBlock, pRes->type, pInput->uid, pResInfo, true);
}
if (numOfElems == 0 && pCtx->subsidiaries.num > 0 && !pRes->nullTupleSaved) {
doSaveTupleData(pCtx, pInput->startRowIndex, pCtx->pSrcBlock, &pRes->nullTuplePos);
pRes->nullTupleSaved = true;
}
return TSDB_CODE_SUCCESS;
}
......@@ -3427,6 +3451,11 @@ int32_t bottomFunction(SqlFunctionCtx* pCtx) {
doAddIntoResult(pCtx, data, i, pCtx->pSrcBlock, pRes->type, pInput->uid, pResInfo, false);
}
if (numOfElems == 0 && pCtx->subsidiaries.num > 0 && !pRes->nullTupleSaved) {
doSaveTupleData(pCtx, pInput->startRowIndex, pCtx->pSrcBlock, &pRes->nullTuplePos);
pRes->nullTupleSaved = true;
}
return TSDB_CODE_SUCCESS;
}
......@@ -3625,6 +3654,11 @@ int32_t topBotFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
// todo assign the tag value and the corresponding row data
int32_t currentRow = pBlock->info.rows;
if (pEntryInfo->numOfRes <= 0) {
colDataAppendNULL(pCol, currentRow);
setSelectivityValue(pCtx, pBlock, &pRes->nullTuplePos, currentRow);
return pEntryInfo->numOfRes;
}
for (int32_t i = 0; i < pEntryInfo->numOfRes; ++i) {
STopBotResItem* pItem = &pRes->pItems[i];
if (type == TSDB_DATA_TYPE_FLOAT) {
......@@ -4897,7 +4931,8 @@ bool sampleFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo)
pInfo->numSampled = 0;
pInfo->colType = pCtx->resDataInfo.type;
pInfo->colBytes = pCtx->resDataInfo.bytes;
pInfo->nullTuplePos.pageId = -1;
pInfo->nullTupleSaved = false;
pInfo->data = (char*)pInfo + sizeof(SSampleInfo);
pInfo->tuplePos = (STuplePos*)((char*)pInfo + sizeof(SSampleInfo) + pInfo->samples * pInfo->colBytes);
......@@ -4943,6 +4978,11 @@ int32_t sampleFunction(SqlFunctionCtx* pCtx) {
doReservoirSample(pCtx, pInfo, data, i);
}
if (pInfo->numSampled == 0 && pCtx->subsidiaries.num > 0 && !pInfo->nullTupleSaved) {
doSaveTupleData(pCtx, pInput->startRowIndex, pCtx->pSrcBlock, &pInfo->nullTuplePos);
pInfo->nullTupleSaved = true;
}
SET_VAL(pResInfo, pInfo->numSampled, pInfo->numSampled);
return TSDB_CODE_SUCCESS;
}
......@@ -4957,6 +4997,11 @@ int32_t sampleFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId);
int32_t currentRow = pBlock->info.rows;
if (pInfo->numSampled == 0) {
colDataAppendNULL(pCol, currentRow);
setSelectivityValue(pCtx, pBlock, &pInfo->nullTuplePos, currentRow);
return pInfo->numSampled;
}
for (int32_t i = 0; i < pInfo->numSampled; ++i) {
colDataAppend(pCol, currentRow + i, pInfo->data + i * pInfo->colBytes, false);
setSelectivityValue(pCtx, pBlock, &pInfo->tuplePos[i], currentRow + i);
......
......@@ -221,6 +221,18 @@ bool fmIsLastRowFunc(int32_t funcId) {
return FUNCTION_TYPE_LAST_ROW == funcMgtBuiltins[funcId].type;
}
bool fmIsReturnNotNullFunc(int32_t funcId) {
if (funcId < 0 || funcId >= funcMgtBuiltinsNum) {
return false;
}
return FUNCTION_TYPE_LAST == funcMgtBuiltins[funcId].type ||
FUNCTION_TYPE_LAST_PARTIAL == funcMgtBuiltins[funcId].type ||
FUNCTION_TYPE_LAST_MERGE == funcMgtBuiltins[funcId].type ||
FUNCTION_TYPE_FIRST == funcMgtBuiltins[funcId].type ||
FUNCTION_TYPE_FIRST_PARTIAL == funcMgtBuiltins[funcId].type ||
FUNCTION_TYPE_FIRST_MERGE == funcMgtBuiltins[funcId].type;
}
bool fmIsSelectValueFunc(int32_t funcId) {
if (funcId < 0 || funcId >= funcMgtBuiltinsNum) {
return false;
......
......@@ -25,7 +25,7 @@ extern "C" {
int32_t qwAbortPrerocessQuery(QW_FPARAMS_DEF);
int32_t qwPreprocessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg);
int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, const char* sql);
int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, char* sql);
int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg);
int32_t qwProcessReady(QW_FPARAMS_DEF, SQWMsg *qwMsg);
int32_t qwProcessFetch(QW_FPARAMS_DEF, SQWMsg *qwMsg);
......
......@@ -508,7 +508,7 @@ _return:
}
int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, const char* sql) {
int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, char* sql) {
int32_t code = 0;
bool queryRsped = false;
SSubplan *plan = NULL;
......@@ -536,6 +536,7 @@ int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, const char* sql) {
}
code = qCreateExecTask(qwMsg->node, mgmt->nodeId, tId, plan, &pTaskInfo, &sinkHandle, sql, OPTR_EXEC_MODEL_BATCH);
sql = NULL;
if (code) {
QW_TASK_ELOG("qCreateExecTask failed, code:%x - %s", code, tstrerror(code));
QW_ERR_JRET(code);
......@@ -561,6 +562,8 @@ int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, const char* sql) {
_return:
taosMemoryFree(sql);
input.code = code;
input.msgType = qwMsg->msgType;
code = qwHandlePostPhaseEvents(QW_FPARAMS(), QW_PHASE_POST_QUERY, &input, NULL);
......
......@@ -95,8 +95,9 @@ typedef void* queue[2];
#define QUEUE_DATA(e, type, field) ((type*)((void*)((char*)(e)-offsetof(type, field))))
#define TRANS_RETRY_COUNT_LIMIT 100 // retry count limit
#define TRANS_RETRY_INTERVAL 15 // ms retry interval
#define TRANS_CONN_TIMEOUT 3 // connect timeout
#define TRANS_RETRY_INTERVAL 15 // retry interval (ms)
#define TRANS_CONN_TIMEOUT 3 // connect timeout (s)
#define TRANS_READ_TIMEOUT 3000 // read timeout (ms)
typedef SRpcMsg STransMsg;
typedef SRpcCtx STransCtx;
......
......@@ -53,6 +53,7 @@ typedef struct {
void (*cfp)(void* parent, SRpcMsg*, SEpSet*);
bool (*retry)(int32_t code, tmsg_t msgType);
bool (*startTimer)(int32_t code, tmsg_t msgType);
int index;
void* parent;
......
......@@ -48,6 +48,7 @@ void* rpcOpen(const SRpcInit* pInit) {
// register callback handle
pRpc->cfp = pInit->cfp;
pRpc->retry = pInit->rfp;
pRpc->startTimer = pInit->tfp;
if (pInit->connType == TAOS_CONN_SERVER) {
pRpc->numOfThreads = pInit->numOfThreads > TSDB_MAX_RPC_THREADS ? TSDB_MAX_RPC_THREADS : pInit->numOfThreads;
......
......@@ -24,6 +24,7 @@ typedef struct SCliConn {
uv_connect_t connReq;
uv_stream_t* stream;
queue wreqQueue;
uv_timer_t* timer;
void* hostThrd;
......@@ -65,12 +66,13 @@ typedef struct SCliThrd {
int64_t pid; // pid
uv_loop_t* loop;
SAsyncPool* asyncPool;
uv_idle_t* idle;
uv_prepare_t* prepare;
uv_timer_t timer;
void* pool; // conn pool
SArray* timerList;
// msg queue
queue msg;
TdThreadMutex msgMtx;
SDelayQueue* delayQueue;
......@@ -108,6 +110,8 @@ static int sockDebugInfo(struct sockaddr* sockname, char* dst) {
sprintf(dst, "%s:%d", buf, ntohs(addr.sin_port));
return r;
}
// register timer for read
static void cliReadTimeoutCb(uv_timer_t* handle);
// register timer in each thread to clear expire conn
// static void cliTimeoutCb(uv_timer_t* handle);
// alloc buf for recv
......@@ -330,6 +334,16 @@ void cliHandleResp(SCliConn* conn) {
SCliThrd* pThrd = conn->hostThrd;
STrans* pTransInst = pThrd->pTransInst;
if (conn->timer) {
if (uv_is_active((uv_handle_t*)conn->timer)) {
tDebug("%s conn %p stop timer", CONN_GET_INST_LABEL(conn), conn);
uv_timer_stop(conn->timer);
}
conn->timer->data = NULL;
taosArrayPush(pThrd->timerList, &conn->timer);
conn->timer = NULL;
}
STransMsgHead* pHead = NULL;
transDumpFromBuffer(&conn->readBuf, (char**)&pHead);
pHead->code = htonl(pHead->code);
......@@ -409,7 +423,7 @@ void cliHandleResp(SCliConn* conn) {
uv_read_start((uv_stream_t*)conn->stream, cliAllocRecvBufferCb, cliRecvCb);
}
void cliHandleExcept(SCliConn* pConn) {
void cliHandleExceptImpl(SCliConn* pConn, int32_t code) {
if (transQueueEmpty(&pConn->cliMsgs)) {
if (pConn->broken == true && CONN_NO_PERSIST_BY_APP(pConn)) {
tTrace("%s conn %p handle except, persist:0", CONN_GET_INST_LABEL(pConn), pConn);
......@@ -428,7 +442,7 @@ void cliHandleExcept(SCliConn* pConn) {
STransConnCtx* pCtx = pMsg ? pMsg->ctx : NULL;
STransMsg transMsg = {0};
transMsg.code = pConn->broken ? TSDB_CODE_RPC_BROKEN_LINK : TSDB_CODE_RPC_NETWORK_UNAVAIL;
transMsg.code = code == -1 ? (pConn->broken ? TSDB_CODE_RPC_BROKEN_LINK : TSDB_CODE_RPC_NETWORK_UNAVAIL) : code;
transMsg.msgType = pMsg ? pMsg->msg.msgType + 1 : 0;
transMsg.info.ahandle = NULL;
......@@ -459,31 +473,17 @@ void cliHandleExcept(SCliConn* pConn) {
} while (!transQueueEmpty(&pConn->cliMsgs));
transUnrefCliHandle(pConn);
}
void cliHandleExcept(SCliConn* conn) {
tTrace("%s conn %p except ref:%d", CONN_GET_INST_LABEL(conn), conn, T_REF_VAL_GET(conn));
cliHandleExceptImpl(conn, -1);
}
// void cliTimeoutCb(uv_timer_t* handle) {
// SCliThrd* pThrd = handle->data;
// STrans* pTransInst = pThrd->pTransInst;
// int64_t currentTime = pThrd->nextTimeout;
// tTrace("%s conn timeout, try to remove expire conn from conn pool", pTransInst->label);
//
// SConnList* p = taosHashIterate((SHashObj*)pThrd->pool, NULL);
// while (p != NULL) {
// while (!QUEUE_IS_EMPTY(&p->conn)) {
// queue* h = QUEUE_HEAD(&p->conn);
// SCliConn* c = QUEUE_DATA(h, SCliConn, q);
// if (c->expireTime < currentTime) {
// QUEUE_REMOVE(h);
// transUnrefCliHandle(c);
// } else {
// break;
// }
// }
// p = taosHashIterate((SHashObj*)pThrd->pool, p);
// }
//
// pThrd->nextTimeout = taosGetTimestampMs() + CONN_PERSIST_TIME(pTransInst->idleTime);
// uv_timer_start(handle, cliTimeoutCb, CONN_PERSIST_TIME(pTransInst->idleTime) / 2, 0);
// }
void cliReadTimeoutCb(uv_timer_t* handle) {
// set up timeout cb
SCliConn* conn = handle->data;
tTrace("%s conn %p timeout, ref:%d", CONN_GET_INST_LABEL(conn), conn, T_REF_VAL_GET(conn));
cliHandleExceptImpl(conn, TSDB_CODE_RPC_TIMEOUT);
}
void* createConnPool(int size) {
// thread local, no lock
......@@ -654,13 +654,23 @@ static SCliConn* cliCreateConn(SCliThrd* pThrd) {
return conn;
}
static void cliDestroyConn(SCliConn* conn, bool clear) {
SCliThrd* pThrd = conn->hostThrd;
tTrace("%s conn %p remove from conn pool", CONN_GET_INST_LABEL(conn), conn);
QUEUE_REMOVE(&conn->q);
QUEUE_INIT(&conn->q);
transRemoveExHandle(transGetRefMgt(), conn->refId);
conn->refId = -1;
if (conn->task != NULL) transDQCancel(((SCliThrd*)conn->hostThrd)->timeoutQueue, conn->task);
if (conn->task != NULL) {
transDQCancel(pThrd->timeoutQueue, conn->task);
conn->task = NULL;
}
if (conn->timer != NULL) {
uv_timer_stop(conn->timer);
taosArrayPush(pThrd->timerList, &conn->timer);
conn->timer->data = NULL;
conn->timer = NULL;
}
if (clear) {
if (!uv_is_closing((uv_handle_t*)conn->stream)) {
......@@ -673,8 +683,15 @@ static void cliDestroy(uv_handle_t* handle) {
if (uv_handle_get_type(handle) != UV_TCP || handle->data == NULL) {
return;
}
SCliConn* conn = handle->data;
SCliThrd* pThrd = conn->hostThrd;
if (conn->timer != NULL) {
uv_timer_stop(conn->timer);
taosArrayPush(pThrd->timerList, &conn->timer);
conn->timer->data = NULL;
conn->timer = NULL;
}
transRemoveExHandle(transGetRefMgt(), conn->refId);
taosMemoryFree(conn->ip);
conn->stream->data = NULL;
......@@ -764,6 +781,19 @@ void cliSend(SCliConn* pConn) {
CONN_SET_PERSIST_BY_APP(pConn);
}
if (pTransInst->startTimer != NULL && pTransInst->startTimer(0, pMsg->msgType)) {
uv_timer_t* timer = taosArrayGetSize(pThrd->timerList) > 0 ? *(uv_timer_t**)taosArrayPop(pThrd->timerList) : NULL;
if (timer == NULL) {
tDebug("no avaiable timer, create");
timer = taosMemoryCalloc(1, sizeof(uv_timer_t));
uv_timer_init(pThrd->loop, timer);
}
timer->data = pConn;
pConn->timer = timer;
tGTrace("%s conn %p start timer for msg:%s", CONN_GET_INST_LABEL(pConn), pConn, TMSG_INFO(pMsg->msgType));
uv_timer_start((uv_timer_t*)pConn->timer, cliReadTimeoutCb, TRANS_READ_TIMEOUT, 0);
}
uv_write_t* req = transReqQueuePush(&pConn->wreqQueue);
uv_write(req, (uv_stream_t*)pConn->stream, &wb, 1, cliSendCb);
return;
......@@ -781,8 +811,8 @@ void cliConnCb(uv_connect_t* req, int status) {
}
// int addrlen = sizeof(pConn->addr);
struct sockaddr peername, sockname;
int addrlen = sizeof(peername);
int addrlen = sizeof(peername);
uv_tcp_getpeername((uv_tcp_t*)pConn->stream, &peername, &addrlen);
transGetSockDebugInfo(&peername, pConn->dst);
......@@ -806,7 +836,6 @@ static void cliHandleQuit(SCliMsg* pMsg, SCliThrd* pThrd) {
tDebug("cli work thread %p start to quit", pThrd);
destroyCmsg(pMsg);
destroyConnPool(pThrd->pool);
uv_timer_stop(&pThrd->timer);
uv_walk(pThrd->loop, cliWalkCb, NULL);
}
static void cliHandleRelease(SCliMsg* pMsg, SCliThrd* pThrd) {
......@@ -879,8 +908,8 @@ void cliMayCvtFqdnToIp(SEpSet* pEpSet, SCvtAddr* pCvtAddr) {
}
}
void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) {
STransConnCtx* pCtx = pMsg->ctx;
STrans* pTransInst = pThrd->pTransInst;
STransConnCtx* pCtx = pMsg->ctx;
cliMayCvtFqdnToIp(&pCtx->epSet, &pThrd->cvtAddr);
if (!EPSET_IS_VALID(&pCtx->epSet)) {
......@@ -966,36 +995,6 @@ static void cliAsyncCb(uv_async_t* handle) {
}
if (pThrd->stopMsg != NULL) cliHandleQuit(pThrd->stopMsg, pThrd);
}
static void cliIdleCb(uv_idle_t* handle) {
SCliThrd* thrd = handle->data;
tTrace("do idle work");
SAsyncPool* pool = thrd->asyncPool;
for (int i = 0; i < pool->nAsync; i++) {
uv_async_t* async = &(pool->asyncs[i]);
SAsyncItem* item = async->data;
queue wq;
taosThreadMutexLock(&item->mtx);
QUEUE_MOVE(&item->qmsg, &wq);
taosThreadMutexUnlock(&item->mtx);
int count = 0;
while (!QUEUE_IS_EMPTY(&wq)) {
queue* h = QUEUE_HEAD(&wq);
QUEUE_REMOVE(h);
SCliMsg* pMsg = QUEUE_DATA(h, SCliMsg, q);
if (pMsg == NULL) {
continue;
}
(*cliAsyncHandle[pMsg->type])(pMsg, thrd);
count++;
}
}
tTrace("prepare work end");
if (thrd->stopMsg != NULL) cliHandleQuit(thrd->stopMsg, thrd);
}
static void cliPrepareCb(uv_prepare_t* handle) {
SCliThrd* thrd = handle->data;
tTrace("prepare work start");
......@@ -1085,19 +1084,20 @@ static SCliThrd* createThrdObj() {
uv_loop_init(pThrd->loop);
pThrd->asyncPool = transAsyncPoolCreate(pThrd->loop, 5, pThrd, cliAsyncCb);
uv_timer_init(pThrd->loop, &pThrd->timer);
pThrd->timer.data = pThrd;
// pThrd->idle = taosMemoryCalloc(1, sizeof(uv_idle_t));
// uv_idle_init(pThrd->loop, pThrd->idle);
// pThrd->idle->data = pThrd;
// uv_idle_start(pThrd->idle, cliIdleCb);
pThrd->prepare = taosMemoryCalloc(1, sizeof(uv_prepare_t));
uv_prepare_init(pThrd->loop, pThrd->prepare);
pThrd->prepare->data = pThrd;
uv_prepare_start(pThrd->prepare, cliPrepareCb);
int32_t timerSize = 512;
pThrd->timerList = taosArrayInit(timerSize, sizeof(void*));
for (int i = 0; i < timerSize; i++) {
uv_timer_t* timer = taosMemoryCalloc(1, sizeof(uv_timer_t));
uv_timer_init(pThrd->loop, timer);
taosArrayPush(pThrd->timerList, &timer);
}
pThrd->pool = createConnPool(4);
transDQCreate(pThrd->loop, &pThrd->delayQueue);
......@@ -1120,7 +1120,12 @@ static void destroyThrdObj(SCliThrd* pThrd) {
transDQDestroy(pThrd->delayQueue, destroyCmsg);
transDQDestroy(pThrd->timeoutQueue, NULL);
taosMemoryFree(pThrd->idle);
for (int i = 0; i < taosArrayGetSize(pThrd->timerList); i++) {
uv_timer_t* timer = taosArrayGetP(pThrd->timerList, i);
taosMemoryFree(timer);
}
taosArrayDestroy(pThrd->timerList);
taosMemoryFree(pThrd->prepare);
taosMemoryFree(pThrd->loop);
taosMemoryFree(pThrd);
......
......@@ -333,7 +333,7 @@ int32_t taosWriteMsg(TdSocketPtr pSocket, void *buf, int32_t nbytes) {
return -1;
}
int32_t nleft, nwritten;
char * ptr = (char *)buf;
char *ptr = (char *)buf;
nleft = nbytes;
......@@ -362,7 +362,7 @@ int32_t taosReadMsg(TdSocketPtr pSocket, void *buf, int32_t nbytes) {
return -1;
}
int32_t nleft, nread;
char * ptr = (char *)buf;
char *ptr = (char *)buf;
nleft = nbytes;
......@@ -912,7 +912,7 @@ uint32_t taosGetIpv4FromFqdn(const char *fqdn) {
int32_t ret = getaddrinfo(fqdn, NULL, &hints, &result);
if (result) {
struct sockaddr * sa = result->ai_addr;
struct sockaddr *sa = result->ai_addr;
struct sockaddr_in *si = (struct sockaddr_in *)sa;
struct in_addr ia = si->sin_addr;
uint32_t ip = ia.s_addr;
......
......@@ -52,6 +52,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_RPC_NETWORK_UNAVAIL, "Unable to establish c
TAOS_DEFINE_ERROR(TSDB_CODE_RPC_FQDN_ERROR, "Unable to resolve FQDN")
TAOS_DEFINE_ERROR(TSDB_CODE_RPC_PORT_EADDRINUSE, "Port already in use")
TAOS_DEFINE_ERROR(TSDB_CODE_RPC_BROKEN_LINK, "Conn is broken")
TAOS_DEFINE_ERROR(TSDB_CODE_RPC_TIMEOUT, "Conn read timeout")
//common & util
TAOS_DEFINE_ERROR(TSDB_CODE_TIME_UNSYNCED, "Client and server's time is not synchronized")
......
#!/bin/bash
set -e
taosd >>/dev/null 2>&1 &
taosadapter >>/dev/null 2>&1 &
cd ../../docs/examples/go
go mod tidy
go run ./connect/afconn/main.go
go run ./connect/cgoexample/main.go
go run ./connect/restexample/main.go
taos -s "drop database if exists test"
go run ./insert/json/main.go
taos -s "drop database if exists test"
go run ./insert/line/main.go
taos -s "drop database if exists power"
go run ./insert/sql/main.go
taos -s "drop database if exists power"
go run ./insert/stmt/main.go
taos -s "drop database if exists test"
go run ./insert/telnet/main.go
go run ./query/sync/main.go
taos -s "drop topic if exists example_tmq_topic"
taos -s "drop database if exists example_tmq"
go run ./sub/main.go
......@@ -327,8 +327,8 @@ endi
print =================>td-2610
sql select stddev(k) from tm2 where ts='2020-12-29 18:46:19.109'
if $rows != 0 then
print expect 0, actual:$rows
if $rows != 1 then
print expect 1, actual:$rows
return -1
endi
sql select twa(k) from tm2 where ts='2020-12-29 18:46:19.109'
......@@ -406,7 +406,7 @@ if $data00 != 1.378704626 then
endi
sql select stddev(c) from m1
if $rows != 0 then
if $rows != 1 then
return -1
endi
......
......@@ -295,7 +295,7 @@ class TDTestCase:
tdSql.checkData(0, 0, 4.500000000)
tdSql.query(f" select avg(c1) from {dbname}.stb1 where c1 is null ")
tdSql.checkRows(0)
tdSql.checkRows(1)
def avg_func_filter(self, dbname="db"):
......
......@@ -86,7 +86,7 @@ class TDTestCase:
def distribute_agg_query(self, dbname="testdb"):
# basic filter
tdSql.query(f"select apercentile(c1 , 20) from {dbname}.stb1 where c1 is null")
tdSql.checkRows(0)
tdSql.checkRows(1)
tdSql.query(f"select apercentile(c1 , 20) from {dbname}.stb1 where t1=1")
tdSql.checkData(0,0,2.800000000)
......
......@@ -168,7 +168,7 @@ class TDTestCase:
def distribute_agg_query(self, dbname="testdb"):
# basic filter
tdSql.query(f"select max(c1) from {dbname}.stb1 where c1 is null")
tdSql.checkRows(0)
tdSql.checkRows(1)
tdSql.query(f"select max(c1) from {dbname}.stb1 where t1=1")
tdSql.checkData(0,0,10)
......
......@@ -167,7 +167,7 @@ class TDTestCase:
def distribute_agg_query(self, dbname="testdb"):
# basic filter
tdSql.query(f"select min(c1) from {dbname}.stb1 where c1 is null")
tdSql.checkRows(0)
tdSql.checkRows(1)
tdSql.query(f"select min(c1) from {dbname}.stb1 where t1=1")
tdSql.checkData(0,0,2)
......
......@@ -195,7 +195,7 @@ class TDTestCase:
def distribute_agg_query(self):
# basic filter
tdSql.query("select spread(c1) from stb1 where c1 is null")
tdSql.checkRows(0)
tdSql.checkRows(1)
tdSql.query("select spread(c1) from stb1 where t1=1")
tdSql.checkData(0,0,8.000000000)
......
......@@ -743,7 +743,7 @@ class TDTestCase:
# filter data
tdSql.query(" select sample(c1, 20 ) from t1 where c1 is null ")
tdSql.checkRows(0)
tdSql.checkRows(1)
tdSql.query(" select sample(c1, 20 ) from t1 where c1 =6 ")
tdSql.checkRows(1)
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册