提交 b57c8136 编写于 作者: H Haojun Liao

other: merge feature/3_liaohj

......@@ -2,7 +2,7 @@
# taosadapter
ExternalProject_Add(taosadapter
GIT_REPOSITORY https://github.com/taosdata/taosadapter.git
GIT_TAG cb1e89c
GIT_TAG e02ddb2
SOURCE_DIR "${TD_SOURCE_DIR}/tools/taosadapter"
BINARY_DIR ""
#BUILD_IN_SOURCE TRUE
......
......@@ -2,7 +2,7 @@
# taos-tools
ExternalProject_Add(taos-tools
GIT_REPOSITORY https://github.com/taosdata/taos-tools.git
GIT_TAG 149ac34
GIT_TAG 0681d8b
SOURCE_DIR "${TD_SOURCE_DIR}/tools/taos-tools"
BINARY_DIR ""
#BUILD_IN_SOURCE TRUE
......
-DLINUX
-DWEBSOCKET
-I/usr/include
-Iinclude
-Iinclude/os
-Iinclude/common
-Iinclude/util
-Iinclude/libs/transport
-Itools/shell/inc
......@@ -178,7 +178,7 @@ Active: inactive (dead)
:::
## TDengine 命令行(CLI)
**TDengine 命令行(CLI)**
为便于检查 TDengine 的状态,执行数据库(Database)的各种即席(Ad Hoc)查询,TDengine 提供一命令行应用程序(以下简称为 TDengine CLI)taos。要进入 TDengine 命令行,您只要在终端执行 `taos` 即可。
......@@ -188,7 +188,7 @@ Active: inactive (dead)
安装后,可以在拥有管理员权限的 cmd 窗口执行 `sc start taosd` 或在 `C:\TDengine` 目录下,运行 `taosd.exe` 来启动 TDengine 服务进程。
## TDengine 命令行(CLI)
**TDengine 命令行(CLI)**
为便于检查 TDengine 的状态,执行数据库(Database)的各种即席(Ad Hoc)查询,TDengine 提供一命令行应用程序(以下简称为 TDengine CLI)taos。要进入 TDengine 命令行,您只要在终端执行 `taos` 即可。
......@@ -215,7 +215,7 @@ Active: inactive (dead)
:::
## TDengine 命令行(CLI)
**TDengine 命令行(CLI)**
为便于检查 TDengine 的状态,执行数据库(Database)的各种即席(Ad Hoc)查询,TDengine 提供一命令行应用程序(以下简称为 TDengine CLI)taos。要进入 TDengine 命令行,您只要在 Windows 终端的 C:\TDengine 目录下,运行 taos.exe 来启动 TDengine 命令行。
......
......@@ -231,7 +231,7 @@ bit_add 实现多列的按位与功能。如果只有一列,返回这一列。
</details>
### 聚合函数示例 [l2norm](https://github.com/taosdata/TDengine/blob/develop/tests/script/sh/l2norm.c)
### 聚合函数示例1 返回值为数值类型 [l2norm](https://github.com/taosdata/TDengine/blob/develop/tests/script/sh/l2norm.c)
l2norm 实现了输入列的所有数据的二阶范数,即对每个数据先平方,再累加求和,最后开方。
......@@ -243,3 +243,29 @@ l2norm 实现了输入列的所有数据的二阶范数,即对每个数据先
```
</details>
### 聚合函数示例2 返回值为字符串类型 [max_vol](https://github.com/taosdata/TDengine/blob/develop/tests/script/sh/max_vol.c)
max_vol 实现了从多个输入的电压列中找到最大电压,返回由设备ID + 最大电压所在(行,列)+ 最大电压值 组成的组合字符串值
创建表:
```bash
create table battery(ts timestamp, vol1 float, vol2 float, vol3 float, deviceId varchar(16));
```
创建自定义函数:
```bash
create aggregate function max_vol as '/root/udf/libmaxvol.so' outputtype binary(64) bufsize 10240 language 'C';
```
使用自定义函数:
```bash
select max_vol(vol1,vol2,vol3,deviceid) from battery;
```
<details>
<summary>max_vol.c</summary>
```c
{{#include tests/script/sh/max_vol.c}}
```
</details>
\ No newline at end of file
......@@ -208,15 +208,12 @@ typedef struct SSDataBlock {
} SSDataBlock;
enum {
FETCH_TYPE__DATA = 1,
FETCH_TYPE__META,
FETCH_TYPE__SEP,
FETCH_TYPE__DATA = 0,
FETCH_TYPE__NONE,
};
typedef struct {
int8_t fetchType;
STqOffsetVal offset;
union {
SSDataBlock data;
void* meta;
......
......@@ -104,6 +104,7 @@ extern int32_t tsCacheLazyLoadThreshold; // cost threshold for last/last_row lo
// query client
extern int32_t tsQueryPolicy;
extern int32_t tsQueryRspPolicy;
extern int64_t tsQueryMaxConcurrentTables;
extern int32_t tsQuerySmaOptimize;
extern int32_t tsQueryRsmaTolerance;
extern bool tsQueryPlannerTrace;
......
......@@ -177,6 +177,12 @@ typedef enum _mgmt_table {
#define TSDB_ALTER_USER_SYSINFO 0xA
#define TSDB_ALTER_USER_ADD_SUBSCRIBE_TOPIC 0xB
#define TSDB_ALTER_USER_REMOVE_SUBSCRIBE_TOPIC 0xC
#define TSDB_ALTER_USER_ADD_READ_TABLE 0xD
#define TSDB_ALTER_USER_REMOVE_READ_TABLE 0xE
#define TSDB_ALTER_USER_ADD_WRITE_TABLE 0xF
#define TSDB_ALTER_USER_REMOVE_WRITE_TABLE 0x10
#define TSDB_ALTER_USER_ADD_ALL_TABLE 0x11
#define TSDB_ALTER_USER_REMOVE_ALL_TABLE 0x12
#define TSDB_ALTER_USER_PRIVILEGES 0x2
......@@ -676,6 +682,9 @@ typedef struct {
char user[TSDB_USER_LEN];
char pass[TSDB_USET_PASSWORD_LEN];
char objname[TSDB_DB_FNAME_LEN]; // db or topic
char tabName[TSDB_TABLE_NAME_LEN];
char* tagCond;
int32_t tagCondLen;
} SAlterUserReq;
int32_t tSerializeSAlterUserReq(void* buf, int32_t bufLen, SAlterUserReq* pReq);
......@@ -698,6 +707,9 @@ typedef struct {
SHashObj* createdDbs;
SHashObj* readDbs;
SHashObj* writeDbs;
SHashObj* readTbs;
SHashObj* writeTbs;
SHashObj* useDbs;
} SGetUserAuthRsp;
int32_t tSerializeSGetUserAuthRsp(void* buf, int32_t bufLen, SGetUserAuthRsp* pRsp);
......
......@@ -63,55 +63,55 @@
#define TK_READ 45
#define TK_WRITE 46
#define TK_NK_DOT 47
#define TK_DNODE 48
#define TK_PORT 49
#define TK_DNODES 50
#define TK_NK_IPTOKEN 51
#define TK_FORCE 52
#define TK_LOCAL 53
#define TK_QNODE 54
#define TK_BNODE 55
#define TK_SNODE 56
#define TK_MNODE 57
#define TK_DATABASE 58
#define TK_USE 59
#define TK_FLUSH 60
#define TK_TRIM 61
#define TK_COMPACT 62
#define TK_IF 63
#define TK_NOT 64
#define TK_EXISTS 65
#define TK_BUFFER 66
#define TK_CACHEMODEL 67
#define TK_CACHESIZE 68
#define TK_COMP 69
#define TK_DURATION 70
#define TK_NK_VARIABLE 71
#define TK_MAXROWS 72
#define TK_MINROWS 73
#define TK_KEEP 74
#define TK_PAGES 75
#define TK_PAGESIZE 76
#define TK_TSDB_PAGESIZE 77
#define TK_PRECISION 78
#define TK_REPLICA 79
#define TK_VGROUPS 80
#define TK_SINGLE_STABLE 81
#define TK_RETENTIONS 82
#define TK_SCHEMALESS 83
#define TK_WAL_LEVEL 84
#define TK_WAL_FSYNC_PERIOD 85
#define TK_WAL_RETENTION_PERIOD 86
#define TK_WAL_RETENTION_SIZE 87
#define TK_WAL_ROLL_PERIOD 88
#define TK_WAL_SEGMENT_SIZE 89
#define TK_STT_TRIGGER 90
#define TK_TABLE_PREFIX 91
#define TK_TABLE_SUFFIX 92
#define TK_NK_COLON 93
#define TK_MAX_SPEED 94
#define TK_START 95
#define TK_WITH 96
#define TK_WITH 48
#define TK_DNODE 49
#define TK_PORT 50
#define TK_DNODES 51
#define TK_NK_IPTOKEN 52
#define TK_FORCE 53
#define TK_LOCAL 54
#define TK_QNODE 55
#define TK_BNODE 56
#define TK_SNODE 57
#define TK_MNODE 58
#define TK_DATABASE 59
#define TK_USE 60
#define TK_FLUSH 61
#define TK_TRIM 62
#define TK_COMPACT 63
#define TK_IF 64
#define TK_NOT 65
#define TK_EXISTS 66
#define TK_BUFFER 67
#define TK_CACHEMODEL 68
#define TK_CACHESIZE 69
#define TK_COMP 70
#define TK_DURATION 71
#define TK_NK_VARIABLE 72
#define TK_MAXROWS 73
#define TK_MINROWS 74
#define TK_KEEP 75
#define TK_PAGES 76
#define TK_PAGESIZE 77
#define TK_TSDB_PAGESIZE 78
#define TK_PRECISION 79
#define TK_REPLICA 80
#define TK_VGROUPS 81
#define TK_SINGLE_STABLE 82
#define TK_RETENTIONS 83
#define TK_SCHEMALESS 84
#define TK_WAL_LEVEL 85
#define TK_WAL_FSYNC_PERIOD 86
#define TK_WAL_RETENTION_PERIOD 87
#define TK_WAL_RETENTION_SIZE 88
#define TK_WAL_ROLL_PERIOD 89
#define TK_WAL_SEGMENT_SIZE 90
#define TK_STT_TRIGGER 91
#define TK_TABLE_PREFIX 92
#define TK_TABLE_SUFFIX 93
#define TK_NK_COLON 94
#define TK_MAX_SPEED 95
#define TK_START 96
#define TK_TIMESTAMP 97
#define TK_END 98
#define TK_TABLE 99
......@@ -127,24 +127,24 @@
#define TK_NK_EQ 109
#define TK_USING 110
#define TK_TAGS 111
#define TK_COMMENT 112
#define TK_BOOL 113
#define TK_TINYINT 114
#define TK_SMALLINT 115
#define TK_INT 116
#define TK_INTEGER 117
#define TK_BIGINT 118
#define TK_FLOAT 119
#define TK_DOUBLE 120
#define TK_BINARY 121
#define TK_NCHAR 122
#define TK_UNSIGNED 123
#define TK_JSON 124
#define TK_VARCHAR 125
#define TK_MEDIUMBLOB 126
#define TK_BLOB 127
#define TK_VARBINARY 128
#define TK_DECIMAL 129
#define TK_BOOL 112
#define TK_TINYINT 113
#define TK_SMALLINT 114
#define TK_INT 115
#define TK_INTEGER 116
#define TK_BIGINT 117
#define TK_FLOAT 118
#define TK_DOUBLE 119
#define TK_BINARY 120
#define TK_NCHAR 121
#define TK_UNSIGNED 122
#define TK_JSON 123
#define TK_VARCHAR 124
#define TK_MEDIUMBLOB 125
#define TK_BLOB 126
#define TK_VARBINARY 127
#define TK_DECIMAL 128
#define TK_COMMENT 129
#define TK_MAX_DELAY 130
#define TK_WATERMARK 131
#define TK_ROLLUP 132
......
......@@ -29,6 +29,7 @@ extern "C" {
#include "tmsg.h"
#include "tname.h"
#include "transport.h"
#include "nodes.h"
typedef struct SCatalog SCatalog;
......@@ -49,10 +50,15 @@ typedef enum {
typedef struct SUserAuthInfo {
char user[TSDB_USER_LEN];
char dbFName[TSDB_DB_FNAME_LEN];
SName tbName;
AUTH_TYPE type;
} SUserAuthInfo;
typedef struct SUserAuthRes {
bool pass;
SNode* pCond;
} SUserAuthRes;
typedef struct SDbInfo {
int32_t vgVer;
int32_t tbNum;
......@@ -96,7 +102,7 @@ typedef struct SMetaData {
SArray* pTableIndex; // pRes = SArray<STableIndexInfo>*
SArray* pUdfList; // pRes = SFuncInfo*
SArray* pIndex; // pRes = SIndexInfo*
SArray* pUser; // pRes = bool*
SArray* pUser; // pRes = SUserAuthRes*
SArray* pQnodeList; // pRes = SArray<SQueryNodeLoad>*
SArray* pTableCfg; // pRes = STableCfg*
SArray* pDnodeList; // pRes = SArray<SEpSet>*
......@@ -312,11 +318,9 @@ int32_t catalogUpdateTableIndex(SCatalog* pCtg, STableIndexRsp* pRsp);
int32_t catalogGetUdfInfo(SCatalog* pCtg, SRequestConnInfo* pConn, const char* funcName, SFuncInfo* pInfo);
int32_t catalogChkAuth(SCatalog* pCtg, SRequestConnInfo* pConn, const char* user, const char* dbFName, AUTH_TYPE type,
bool* pass);
int32_t catalogChkAuth(SCatalog* pCtg, SRequestConnInfo* pConn, SUserAuthInfo *pAuth, SUserAuthRes* pRes);
int32_t catalogChkAuthFromCache(SCatalog* pCtg, const char* user, const char* dbFName, AUTH_TYPE type, bool* pass,
bool* exists);
int32_t catalogChkAuthFromCache(SCatalog* pCtg, SUserAuthInfo *pAuth, SUserAuthRes* pRes, bool* exists);
int32_t catalogUpdateUserAuthInfo(SCatalog* pCtg, SGetUserAuthRsp* pAuth);
......
......@@ -26,6 +26,7 @@ extern "C" {
typedef void* qTaskInfo_t;
typedef void* DataSinkHandle;
struct SRpcMsg;
struct SSubplan;
......@@ -91,7 +92,9 @@ void qSetTaskId(qTaskInfo_t tinfo, uint64_t taskId, uint64_t queryId);
int32_t qSetStreamOpOpen(qTaskInfo_t tinfo);
// todo refactor
void qGetCheckpointVersion(qTaskInfo_t tinfo, int64_t* dataVer, int64_t* ckId);
/**
* Set multiple input data blocks for the stream scan.
* @param tinfo
......@@ -120,7 +123,7 @@ int32_t qSetSMAInput(qTaskInfo_t tinfo, const void* pBlocks, size_t numOfBlocks,
* @param isAdd
* @return
*/
int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, const SArray* tableIdList, bool isAdd);
int32_t qUpdateTableListForStreamScanner(qTaskInfo_t tinfo, const SArray* tableIdList, bool isAdd, SArray* pList);
/**
* Create the exec task object according to task json
......@@ -164,6 +167,7 @@ void qCleanExecTaskBlockBuf(qTaskInfo_t tinfo);
* @return
*/
int32_t qAsyncKillTask(qTaskInfo_t tinfo, int32_t rspCode);
int32_t qKillTask(qTaskInfo_t tinfo, int32_t rspCode);
bool qTaskIsExecuting(qTaskInfo_t qinfo);
......@@ -183,28 +187,18 @@ int32_t qSerializeTaskStatus(qTaskInfo_t tinfo, char** pOutput, int32_t* len);
int32_t qDeserializeTaskStatus(qTaskInfo_t tinfo, const char* pInput, int32_t len);
STimeWindow getAlignQueryTimeWindow(SInterval* pInterval, int32_t precision, int64_t key);
/**
* return the scan info, in the form of tuple of two items, including table uid and current timestamp
* @param tinfo
* @param uid
* @param ts
* @return
*/
int32_t qGetStreamScanStatus(qTaskInfo_t tinfo, uint64_t* uid, int64_t* ts);
int32_t qStreamPrepareTsdbScan(qTaskInfo_t tinfo, uint64_t uid, int64_t ts);
SArray* qGetQueriedTableListInfo(qTaskInfo_t tinfo);
int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subType);
// int32_t qStreamScanMemData(qTaskInfo_t tinfo, const SSubmitReq* pReq, int64_t ver);
//
int32_t qStreamSetScanMemData(qTaskInfo_t tinfo, SPackedData submit);
int32_t qStreamExtractOffset(qTaskInfo_t tinfo, STqOffsetVal* pOffset);
void qStreamSetOpen(qTaskInfo_t tinfo);
SMqMetaRsp* qStreamExtractMetaMsg(qTaskInfo_t tinfo);
void qStreamExtractOffset(qTaskInfo_t tinfo, STqOffsetVal* pOffset);
int64_t qStreamExtractPrepareUid(qTaskInfo_t tinfo);
SMqMetaRsp* qStreamExtractMetaMsg(qTaskInfo_t tinfo);
const SSchemaWrapper* qExtractSchemaFromTask(qTaskInfo_t tinfo);
......
......@@ -458,7 +458,9 @@ typedef struct SGrantStmt {
ENodeType type;
char userName[TSDB_USER_LEN];
char objName[TSDB_DB_NAME_LEN]; // db or topic
char tabName[TSDB_TABLE_NAME_LEN];
int64_t privileges;
SNode* pTagCond;
} SGrantStmt;
typedef SGrantStmt SRevokeStmt;
......
......@@ -298,6 +298,7 @@ typedef struct SSelectStmt {
bool hasUniqueFunc;
bool hasTailFunc;
bool hasInterpFunc;
bool hasInterpPseudoColFunc;
bool hasLastRowFunc;
bool hasLastFunc;
bool hasTimeLineFunc;
......
......@@ -194,6 +194,7 @@ typedef struct SRequestConnInfo {
typedef void (*__freeFunc)(void* param);
// todo add creator/destroyer function
typedef struct SMsgSendInfo {
__async_send_cb_fn_t fp; // async callback function
STargetInfo target; // for update epset
......
......@@ -13,8 +13,8 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "executor.h"
#include "os.h"
#include "executor.h"
#include "query.h"
#include "streamState.h"
#include "tdatablock.h"
......@@ -31,6 +31,7 @@ extern "C" {
#ifndef _STREAM_H_
#define _STREAM_H_
typedef void (*_free_reader_fn_t)(void*);
typedef struct SStreamTask SStreamTask;
enum {
......@@ -50,6 +51,7 @@ enum {
TASK_STATUS__RECOVER_PREPARE,
TASK_STATUS__RECOVER1,
TASK_STATUS__RECOVER2,
TASK_STATUS__RESTORE, // only available for source task to replay WAL from the checkpoint
};
enum {
......@@ -103,21 +105,8 @@ typedef struct {
int8_t type;
} SStreamQueueItem;
#if 0
typedef struct {
int8_t type;
int64_t ver;
int32_t* dataRef;
SSubmitReq* data;
} SStreamDataSubmit;
typedef struct {
int8_t type;
int64_t ver;
SArray* dataRefs; // SArray<int32_t*>
SArray* reqs; // SArray<SSubmitReq*>
} SStreamMergedSubmit;
#endif
typedef void FTbSink(SStreamTask* pTask, void* vnode, int64_t ver, void* data);
typedef int32_t FTaskExpand(void* ahandle, SStreamTask* pTask, int64_t ver);
typedef struct {
int8_t type;
......@@ -219,36 +208,21 @@ static FORCE_INLINE void streamQueueProcessFail(SStreamQueue* queue) {
}
static FORCE_INLINE void* streamQueueCurItem(SStreamQueue* queue) {
//
return queue->qItem;
}
static FORCE_INLINE void* streamQueueNextItem(SStreamQueue* queue) {
int8_t dequeueFlag = atomic_exchange_8(&queue->status, STREAM_QUEUE__PROCESSING);
if (dequeueFlag == STREAM_QUEUE__FAILED) {
ASSERT(queue->qItem != NULL);
return streamQueueCurItem(queue);
} else {
queue->qItem = NULL;
taosGetQitem(queue->qall, &queue->qItem);
if (queue->qItem == NULL) {
taosReadAllQitems(queue->queue, queue->qall);
taosGetQitem(queue->qall, &queue->qItem);
}
return streamQueueCurItem(queue);
}
}
void* streamQueueNextItem(SStreamQueue* queue);
SStreamDataSubmit2* streamDataSubmitNew(SPackedData submit);
SStreamDataSubmit2* streamDataSubmitNew(SPackedData submit, int32_t type);
void streamDataSubmitDestroy(SStreamDataSubmit2* pDataSubmit);
void streamDataSubmitRefDec(SStreamDataSubmit2* pDataSubmit);
SStreamDataSubmit2* streamSubmitRefClone(SStreamDataSubmit2* pSubmit);
SStreamDataSubmit2* streamSubmitBlockClone(SStreamDataSubmit2* pSubmit);
typedef struct {
char* qmsg;
// followings are not applicable to encoder and decoder
void* executor;
void* pExecutor; // not applicable to encoder and decoder
struct STqReader* pTqReader; // not applicable to encoder and decoder
struct SWalReader* pWalReader; // not applicable to encoder and decoder
} STaskExec;
typedef struct {
......@@ -263,14 +237,11 @@ typedef struct {
SUseDbRsp dbInfo;
} STaskDispatcherShuffle;
typedef void FTbSink(SStreamTask* pTask, void* vnode, int64_t ver, void* data);
typedef struct {
int64_t stbUid;
char stbFullName[TSDB_TABLE_FNAME_LEN];
SSchemaWrapper* pSchemaWrapper;
// not applicable to encoder and decoder
void* vnode;
void* vnode; // not available to encoder and decoder
FTbSink* tbSinkFunc;
STSchema* pTSchema;
} STaskSinkTb;
......@@ -295,24 +266,34 @@ typedef struct {
SEpSet epSet;
} SStreamChildEpInfo;
struct SStreamTask {
typedef struct SStreamId {
int64_t streamId;
int32_t taskId;
int32_t totalLevel;
int8_t taskLevel;
int8_t outputType;
int16_t dispatchMsgType;
const char* idStr;
} SStreamId;
typedef struct SCheckpointInfo {
int64_t id;
int64_t version; // offset in WAL
} SCheckpointInfo;
typedef struct SStreamStatus {
int8_t taskStatus;
int8_t schedStatus;
} SStreamStatus;
// node info
struct SStreamTask {
SStreamId id;
int32_t totalLevel;
int8_t taskLevel;
int8_t outputType;
int16_t dispatchMsgType;
SStreamStatus status;
int32_t selfChildId;
int32_t nodeId;
SEpSet epSet;
int64_t recoverSnapVer;
int64_t startVer;
SCheckpointInfo chkInfo;
STaskExec exec;
// fill history
int8_t fillHistory;
......@@ -322,9 +303,6 @@ struct SStreamTask {
int32_t nextCheckId;
SArray* checkpointInfo; // SArray<SStreamCheckpointInfo>
// exec
STaskExec exec;
// output
union {
STaskDispatcherFixedEp fixedEpDispatcher;
......@@ -336,9 +314,6 @@ struct SStreamTask {
int8_t inputStatus;
int8_t outputStatus;
// STaosQueue* inputQueue1;
// STaosQall* inputQall;
SStreamQueue* inputQueue;
SStreamQueue* outputQueue;
......@@ -346,70 +321,47 @@ struct SStreamTask {
int8_t triggerStatus;
int64_t triggerParam;
void* timer;
SMsgCb* pMsgCb; // msg handle
SStreamState* pState; // state backend
// msg handle
SMsgCb* pMsgCb;
// state backend
SStreamState* pState;
// do not serialize
// the followings attributes don't be serialized
int32_t recoverTryingDownstream;
int32_t recoverWaitingUpstream;
int64_t checkReqId;
SArray* checkReqIds; // shuffle
int32_t refCnt;
int64_t checkpointingId;
int32_t checkpointAlignCnt;
struct SStreamMeta* pMeta;
_free_reader_fn_t freeFp;
};
// meta
typedef struct SStreamMeta {
char* path;
TDB* db;
TTB* pTaskDb;
TTB* pCheckpointDb;
SHashObj* pTasks;
SHashObj* pWalReadTasks;
void* ahandle;
TXN* txn;
FTaskExpand* expandFunc;
int32_t vgId;
SRWLatch lock;
int8_t walScan;
bool quit;
} SStreamMeta;
int32_t tEncodeStreamEpInfo(SEncoder* pEncoder, const SStreamChildEpInfo* pInfo);
int32_t tDecodeStreamEpInfo(SDecoder* pDecoder, SStreamChildEpInfo* pInfo);
SStreamTask* tNewSStreamTask(int64_t streamId);
int32_t tEncodeSStreamTask(SEncoder* pEncoder, const SStreamTask* pTask);
int32_t tDecodeSStreamTask(SDecoder* pDecoder, SStreamTask* pTask);
void tFreeSStreamTask(SStreamTask* pTask);
static FORCE_INLINE int32_t streamTaskInput(SStreamTask* pTask, SStreamQueueItem* pItem) {
int32_t code = 0;
int8_t type = pItem->type;
if (type == STREAM_INPUT__DATA_SUBMIT) {
SStreamDataSubmit2* pSubmitClone = streamSubmitRefClone((SStreamDataSubmit2*)pItem);
if (pSubmitClone == NULL) {
qDebug("task %d %p submit enqueue failed since out of memory", pTask->taskId, pTask);
terrno = TSDB_CODE_OUT_OF_MEMORY;
atomic_store_8(&pTask->inputStatus, TASK_INPUT_STATUS__FAILED);
return -1;
}
qDebug("task %d %p submit enqueue %p %p %p %d %" PRId64, pTask->taskId, pTask, pItem, pSubmitClone,
pSubmitClone->submit.msgStr, pSubmitClone->submit.msgLen, pSubmitClone->submit.ver);
code = taosWriteQitem(pTask->inputQueue->queue, pSubmitClone);
// qStreamInput(pTask->exec.executor, pSubmitClone);
} else if (type == STREAM_INPUT__DATA_BLOCK || type == STREAM_INPUT__DATA_RETRIEVE ||
type == STREAM_INPUT__REF_DATA_BLOCK) {
code = taosWriteQitem(pTask->inputQueue->queue, pItem);
// qStreamInput(pTask->exec.executor, pItem);
} else if (type == STREAM_INPUT__CHECKPOINT) {
code = taosWriteQitem(pTask->inputQueue->queue, pItem);
// qStreamInput(pTask->exec.executor, pItem);
} else if (type == STREAM_INPUT__GET_RES) {
code = taosWriteQitem(pTask->inputQueue->queue, pItem);
// qStreamInput(pTask->exec.executor, pItem);
}
if (code != 0) return code;
if (type != STREAM_INPUT__GET_RES && type != STREAM_INPUT__CHECKPOINT && pTask->triggerParam != 0) {
atomic_val_compare_exchange_8(&pTask->triggerStatus, TASK_TRIGGER_STATUS__INACTIVE, TASK_TRIGGER_STATUS__ACTIVE);
}
#if 0
// TODO: back pressure
atomic_store_8(&pTask->inputStatus, TASK_INPUT_STATUS__NORMAL);
#endif
return 0;
}
SStreamTask* tNewStreamTask(int64_t streamId);
int32_t tEncodeStreamTask(SEncoder* pEncoder, const SStreamTask* pTask);
int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask);
void tFreeStreamTask(SStreamTask* pTask);
int32_t tAppendDataToInputQueue(SStreamTask* pTask, SStreamQueueItem* pItem);
bool tInputQueueIsFull(const SStreamTask* pTask);
static FORCE_INLINE void streamTaskInputFail(SStreamTask* pTask) {
atomic_store_8(&pTask->inputStatus, TASK_INPUT_STATUS__FAILED);
......@@ -617,32 +569,15 @@ int32_t streamAggRecoverPrepare(SStreamTask* pTask);
// int32_t streamAggChildrenRecoverFinish(SStreamTask* pTask);
int32_t streamProcessRecoverFinishReq(SStreamTask* pTask, int32_t childId);
// expand and deploy
typedef int32_t FTaskExpand(void* ahandle, SStreamTask* pTask, int64_t ver);
// meta
typedef struct SStreamMeta {
char* path;
TDB* db;
TTB* pTaskDb;
TTB* pCheckpointDb;
SHashObj* pTasks;
SHashObj* pRecoverStatus;
void* ahandle;
TXN* txn;
FTaskExpand* expandFunc;
int32_t vgId;
SRWLatch lock;
} SStreamMeta;
SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandFunc, int32_t vgId);
void streamMetaClose(SStreamMeta* streamMeta);
int32_t streamMetaSaveTask(SStreamMeta* pMeta, SStreamTask* pTask);
int32_t streamMetaAddTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTask);
int32_t streamMetaAddSerializedTask(SStreamMeta* pMeta, int64_t startVer, char* msg, int32_t msgLen);
// SStreamTask* streamMetaGetTask(SStreamMeta* pMeta, int32_t taskId);
int32_t streamMetaAddDeployedTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTask);
int32_t streamMetaAddSerializedTask(SStreamMeta* pMeta, int64_t checkpointVer, char* msg, int32_t msgLen);
int32_t streamMetaGetNumOfTasks(const SStreamMeta* pMeta);
SStreamTask* streamMetaAcquireTaskEx(SStreamMeta* pMeta, int32_t taskId);
SStreamTask* streamMetaAcquireTask(SStreamMeta* pMeta, int32_t taskId);
void streamMetaReleaseTask(SStreamMeta* pMeta, SStreamTask* pTask);
void streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId);
......
......@@ -138,7 +138,8 @@ typedef struct {
int8_t enableRef;
} SWalFilterCond;
typedef struct {
// todo hide this struct
typedef struct SWalReader {
SWal *pWal;
int64_t readerId;
TdFilePtr pLogFile;
......@@ -146,8 +147,8 @@ typedef struct {
int64_t curFileFirstVer;
int64_t curVersion;
int64_t capacity;
int8_t curInvalid;
int8_t curStopped;
// int8_t curInvalid;
// int8_t curStopped;
TdThreadMutex mutex;
SWalFilterCond cond;
// TODO remove it
......@@ -196,6 +197,7 @@ void walReadReset(SWalReader *pReader);
int32_t walReadVer(SWalReader *pRead, int64_t ver);
int32_t walReadSeekVer(SWalReader *pRead, int64_t ver);
int32_t walNextValidMsg(SWalReader *pRead);
int64_t walReaderGetCurrentVer(const SWalReader* pReader);
// only for tq usage
void walSetReaderCapacity(SWalReader *pRead, int32_t capacity);
......
......@@ -242,6 +242,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_MND_INVALID_ALTER_OPER TAOS_DEF_ERROR_CODE(0, 0x0356)
#define TSDB_CODE_MND_AUTH_FAILURE TAOS_DEF_ERROR_CODE(0, 0x0357)
#define TSDB_CODE_MND_USER_NOT_AVAILABLE TAOS_DEF_ERROR_CODE(0, 0x0358)
#define TSDB_CODE_MND_PRIVILEDGE_EXIST TAOS_DEF_ERROR_CODE(0, 0x0359)
// mnode-stable-part1
#define TSDB_CODE_MND_STB_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x0360)
......@@ -762,6 +763,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_TMQ_INVALID_MSG TAOS_DEF_ERROR_CODE(0, 0x4000)
#define TSDB_CODE_TMQ_CONSUMER_MISMATCH TAOS_DEF_ERROR_CODE(0, 0x4001)
#define TSDB_CODE_TMQ_CONSUMER_CLOSED TAOS_DEF_ERROR_CODE(0, 0x4002)
#define TSDB_CODE_TMQ_CONSUMER_ERROR TAOS_DEF_ERROR_CODE(0, 0x4003)
// stream
#define TSDB_CODE_STREAM_TASK_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x4100)
......
......@@ -198,6 +198,7 @@ typedef enum ELogicConditionType {
#define TSDB_STREAM_NAME_LEN 193 // it is a null-terminated string
#define TSDB_DB_NAME_LEN 65
#define TSDB_DB_FNAME_LEN (TSDB_ACCT_ID_LEN + TSDB_DB_NAME_LEN + TSDB_NAME_DELIMITER_LEN)
#define TSDB_PRIVILEDGE_CONDITION_LEN 200
#define TSDB_FUNC_NAME_LEN 65
#define TSDB_FUNC_COMMENT_LEN 1024 * 1024
......
......@@ -61,7 +61,7 @@ typedef void (*FItems)(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfItems);
typedef struct STaosQnode STaosQnode;
typedef struct STaosQnode {
struct STaosQnode {
STaosQnode *next;
STaosQueue *queue;
int64_t timestamp;
......@@ -70,9 +70,9 @@ typedef struct STaosQnode {
int8_t itype;
int8_t reserved[3];
char item[];
} STaosQnode;
};
typedef struct STaosQueue {
struct STaosQueue {
STaosQnode *head;
STaosQnode *tail;
STaosQueue *next; // for queue set
......@@ -86,22 +86,22 @@ typedef struct STaosQueue {
int64_t threadId;
int64_t memLimit;
int64_t itemLimit;
} STaosQueue;
};
typedef struct STaosQset {
struct STaosQset {
STaosQueue *head;
STaosQueue *current;
TdThreadMutex mutex;
tsem_t sem;
int32_t numOfQueues;
int32_t numOfItems;
} STaosQset;
};
typedef struct STaosQall {
struct STaosQall {
STaosQnode *current;
STaosQnode *start;
int32_t numOfItems;
} STaosQall;
};
STaosQueue *taosOpenQueue();
void taosCloseQueue(STaosQueue *queue);
......
......@@ -12,7 +12,7 @@ ENV TINI_VERSION v0.19.0
ADD https://github.com/krallin/tini/releases/download/${TINI_VERSION}/tini-${cpuType} /tini
ENV DEBIAN_FRONTEND=noninteractive
WORKDIR /root/
RUN tar -zxf ${pkgFile} && cd /root/${dirName}/ && /bin/bash install.sh -e no && cd /root && rm /root/${pkgFile} && rm -rf /root/${dirName} && apt-get update && apt-get install -y locales tzdata netcat && locale-gen en_US.UTF-8 && apt-get clean && rm -rf /var/lib/apt/lists/ && chmod +x /tini
RUN tar -zxf ${pkgFile} && cd /root/${dirName}/ && /bin/bash install.sh -e no && cd /root && rm /root/${pkgFile} && rm -rf /root/${dirName} && apt-get update && apt-get install -y locales tzdata netcat curl gdb vim tmux less net-tools valgrind && locale-gen en_US.UTF-8 && apt-get clean && rm -rf /var/lib/apt/lists/ && chmod +x /tini
ENV LD_LIBRARY_PATH="$LD_LIBRARY_PATH:/usr/lib" \
LC_CTYPE=en_US.UTF-8 \
......
......@@ -572,6 +572,22 @@ function install_config() {
done
}
function install_share_etc() {
[ ! -d ${script_dir}/share/etc ] && return
for c in `ls ${script_dir}/share/etc/`; do
if [ -e /etc/${clientName2}/$c ]; then
out=/etc/${clientName2}/$c.new.`date +%F`
${csudo}cp -f ${script_dir}/share/etc/$c $out ||:
else
${csudo}mkdir -p /etc/${clientName2} >/dev/null 2>/dev/null ||:
${csudo}cp -f ${script_dir}/share/etc/$c /etc/${clientName2}/$c ||:
fi
done
[ ! -d ${script_dir}/share/srv ] && return
${csudo} cp ${script_dir}/share/srv/* ${service_config_dir} ||:
}
function install_log() {
${csudo}rm -rf ${log_dir} || :
${csudo}mkdir -p ${log_dir} && ${csudo}chmod 777 ${log_dir}
......@@ -599,7 +615,7 @@ function install_examples() {
function install_web() {
if [ -d "${script_dir}/share" ]; then
${csudo}cp -rf ${script_dir}/share/* ${install_main_dir}/share
${csudo}cp -rf ${script_dir}/share/* ${install_main_dir}/share > /dev/null 2>&1 ||:
fi
}
......@@ -687,11 +703,33 @@ function clean_service_on_systemd() {
# if [ "$verMode" == "cluster" ] && [ "$clientName" != "$clientName2" ]; then
# ${csudo}rm -f ${service_config_dir}/${serverName2}.service
# fi
x_service_config="${service_config_dir}/${xName2}.service"
if [ -e "$x_service_config" ]; then
if systemctl is-active --quiet ${xName2}; then
echo "${productName2} ${xName2} is running, stopping it..."
${csudo}systemctl stop ${xName2} &>/dev/null || echo &>/dev/null
fi
${csudo}systemctl disable ${xName2} &>/dev/null || echo &>/dev/null
${csudo}rm -f ${x_service_config}
fi
explorer_service_config="${service_config_dir}/${explorerName2}.service"
if [ -e "$explorer_service_config" ]; then
if systemctl is-active --quiet ${explorerName2}; then
echo "${productName2} ${explorerName2} is running, stopping it..."
${csudo}systemctl stop ${explorerName2} &>/dev/null || echo &>/dev/null
fi
${csudo}systemctl disable ${explorerName2} &>/dev/null || echo &>/dev/null
${csudo}rm -f ${explorer_service_config}
${csudo}rm -f /etc/${clientName2}/explorer.toml
fi
}
function install_service_on_systemd() {
clean_service_on_systemd
install_share_etc
[ -f ${script_dir}/cfg/${serverName2}.service ] &&
${csudo}cp ${script_dir}/cfg/${serverName2}.service \
${service_config_dir}/ || :
......
TDengine is an open-source, cloud-native time-series database optimized for Internet of Things (IoT), Connected Cars, and Industrial IoT. With its built-in caching, stream processing, and data subscription capabilities, TDengine offers a simplified solution for time-series data processing.
To configure TDengine : edit /etc/taos/taos.cfg
To start service : launchctl start com.tdengine.taosd
To start Taos Adapter : launchctl start com.tdengine.taosadapter
To access TDengine : use taos in shell
• To configure TDengine, edit /etc/taos/taos.cfg
• To start service, run launchctl start com.tdengine.taosd
• To start Taos Adapter, run launchctl start com.tdengine.taosadapter
• To access TDengine from your local machine, run taos
If you're experiencing problems installing TDengine, check the file /var/log/taos/tdengine_install.log to help troubleshoot the installation.
TDengine is an open-source, cloud-native time-series database optimized for Internet of Things (IoT), Connected Cars, and Industrial IoT. With its built-in caching, stream processing, and data subscription capabilities, TDengine offers a simplified solution for time-series data processing.
Once it's installed, please take the steps below:
1: open a terminal/shell in Mac
2: if connecting to Cloud Service, follow the instructions on your cloud service account and configure the environment variable
3: if connecting to another TDengine Service, you can also view help information via "taos --help"
4: execute command taos
After the installation process is complete, perform the following steps to start using TDengine:
1: Open Terminal on your Mac.
2: To connect to a TDengine server using the default settings and credentials, run the taos command.
3: To connect to a TDengine server using custom settings or credentials, run taos --help for more information.
4: To connect to TDengine Cloud, follow the instructions on the Tools - TDengine CLI page in your TDengine Cloud account.
If you're experiencing problems installing TDengine, check the file /var/log/taos/tdengine_install.log to help troubleshoot the installation.
If any issues occur during installation, check the /var/log/taos/tdengine_install.log file to troubleshoot.
\ No newline at end of file
......@@ -53,9 +53,7 @@ if [ -d ${top_dir}/tools/taos-tools/packaging/deb ]; then
cd ${top_dir}/tools/taos-tools/packaging/deb
taostools_ver=$(git for-each-ref --sort=taggerdate --format '%(tag)' refs/tags|grep -v taos | tail -1)
[ -z "$taos_tools_ver" ] && taos_tools_ver="0.1.0"
taostools_ver=$(git for-each-ref --sort=taggerdate --format '%(tag)' refs/tags|grep -v taos | tail -1)
[ -z "$taostools_ver" ] && taostools_ver="0.1.0"
taostools_install_dir="${release_dir}/${clientName2}Tools-${taostools_ver}"
cd ${curr_dir}
......@@ -152,6 +150,7 @@ fi
mkdir -p ${install_dir}/bin && cp ${bin_files} ${install_dir}/bin && chmod a+x ${install_dir}/bin/* || :
mkdir -p ${install_dir}/init.d && cp ${init_file_deb} ${install_dir}/init.d/${serverName}.deb
mkdir -p ${install_dir}/init.d && cp ${init_file_rpm} ${install_dir}/init.d/${serverName}.rpm
mkdir -p ${install_dir}/share && cp -rf ${build_dir}/share/{etc,srv} ${install_dir}/share ||:
if [ $adapterName != "taosadapter" ]; then
mv ${install_dir}/cfg/${clientName2}adapter.toml ${install_dir}/cfg/$adapterName.toml
......
......@@ -192,7 +192,27 @@ function clean_service_on_systemd() {
${csudo}systemctl stop ${tarbitrator_service_name} &>/dev/null || echo &>/dev/null
fi
${csudo}systemctl disable ${tarbitrator_service_name} &>/dev/null || echo &>/dev/null
${csudo}rm -f ${tarbitratord_service_config}
x_service_config="${service_config_dir}/${xName2}.service"
if [ -e "$x_service_config" ]; then
if systemctl is-active --quiet ${xName2}; then
echo "${productName2} ${xName2} is running, stopping it..."
${csudo}systemctl stop ${xName2} &>/dev/null || echo &>/dev/null
fi
${csudo}systemctl disable ${xName2} &>/dev/null || echo &>/dev/null
${csudo}rm -f ${x_service_config}
fi
explorer_service_config="${service_config_dir}/${explorerName2}.service"
if [ -e "$explorer_service_config" ]; then
if systemctl is-active --quiet ${explorerName2}; then
echo "${productName2} ${explorerName2} is running, stopping it..."
${csudo}systemctl stop ${explorerName2} &>/dev/null || echo &>/dev/null
fi
${csudo}systemctl disable ${explorerName2} &>/dev/null || echo &>/dev/null
${csudo}rm -f ${explorer_service_config}
${csudo}rm -f /etc/${clientName2}/explorer.toml
fi
}
function clean_service_on_sysvinit() {
......
......@@ -36,14 +36,6 @@ extern "C" {
#include "tconfig.h"
#define CHECK_CODE_GOTO(expr, label) \
do { \
code = expr; \
if (TSDB_CODE_SUCCESS != code) { \
goto label; \
} \
} while (0)
#define ERROR_MSG_BUF_DEFAULT_SIZE 512
#define HEARTBEAT_INTERVAL 1500 // ms
......@@ -286,28 +278,7 @@ static FORCE_INLINE SReqResultInfo* tmqGetCurResInfo(TAOS_RES* res) {
return (SReqResultInfo*)&msg->resInfo;
}
static FORCE_INLINE SReqResultInfo* tmqGetNextResInfo(TAOS_RES* res, bool convertUcs4) {
SMqRspObj* pRspObj = (SMqRspObj*)res;
pRspObj->resIter++;
if (pRspObj->resIter < pRspObj->rsp.blockNum) {
SRetrieveTableRsp* pRetrieve = (SRetrieveTableRsp*)taosArrayGetP(pRspObj->rsp.blockData, pRspObj->resIter);
if (pRspObj->rsp.withSchema) {
SSchemaWrapper* pSW = (SSchemaWrapper*)taosArrayGetP(pRspObj->rsp.blockSchema, pRspObj->resIter);
setResSchemaInfo(&pRspObj->resInfo, pSW->pSchema, pSW->nCols);
taosMemoryFreeClear(pRspObj->resInfo.row);
taosMemoryFreeClear(pRspObj->resInfo.pCol);
taosMemoryFreeClear(pRspObj->resInfo.length);
taosMemoryFreeClear(pRspObj->resInfo.convertBuf);
taosMemoryFreeClear(pRspObj->resInfo.convertJson);
}
setQueryResultFromRsp(&pRspObj->resInfo, pRetrieve, convertUcs4, false);
return &pRspObj->resInfo;
}
return NULL;
}
SReqResultInfo* tmqGetNextResInfo(TAOS_RES* res, bool convertUcs4);
static FORCE_INLINE SReqResultInfo* tscGetCurResInfo(TAOS_RES* res) {
if (TD_RES_QUERY(res)) return &(((SRequestObj*)res)->body.resInfo);
......@@ -320,7 +291,6 @@ extern int32_t clientConnRefPool;
extern int32_t timestampDeltaLimit;
extern int64_t lastClusterId;
__async_send_cb_fn_t getMsgRspHandle(int32_t msgType);
SMsgSendInfo* buildMsgInfoImpl(SRequestObj* pReqObj);
......@@ -373,7 +343,6 @@ void taos_close_internal(void* taos);
// global, called by mgmt
int hbMgrInit();
void hbMgrCleanUp();
int hbHandleRsp(SClientHbBatchRsp* hbRsp);
// cluster level
SAppHbMgr* appHbMgrInit(SAppInstInfo* pAppInstInfo, char* key);
......@@ -386,9 +355,6 @@ void stopAllRequests(SHashObj* pRequests);
int hbRegisterConn(SAppHbMgr* pAppHbMgr, int64_t tscRefId, int64_t clusterId, int8_t connType);
void hbDeregisterConn(SAppHbMgr* pAppHbMgr, SClientHbKey connKey);
// --- mq
void hbMgrInitMqHbRspHandle();
typedef struct SSqlCallbackWrapper {
SParseContext* pParseCtx;
SCatalogReq* pCatalogReq;
......
......@@ -26,6 +26,7 @@ extern "C" {
#define tscFatal(...) do { if (cDebugFlag & DEBUG_FATAL) { taosPrintLog("TSC FATAL ", DEBUG_FATAL, cDebugFlag, __VA_ARGS__); }} while(0)
#define tscError(...) do { if (cDebugFlag & DEBUG_ERROR) { taosPrintLog("TSC ERROR ", DEBUG_ERROR, cDebugFlag, __VA_ARGS__); }} while(0)
#define tscWarn(...) do { if (cDebugFlag & DEBUG_WARN) { taosPrintLog("TSC WARN ", DEBUG_WARN, cDebugFlag, __VA_ARGS__); }} while(0)
#define tscWarnL(...) do { if (cDebugFlag & DEBUG_WARN) { taosPrintLongString("TSC WARN ", DEBUG_WARN, cDebugFlag, __VA_ARGS__); }} while(0)
#define tscInfo(...) do { if (cDebugFlag & DEBUG_INFO) { taosPrintLog("TSC ", DEBUG_INFO, cDebugFlag, __VA_ARGS__); }} while(0)
#define tscDebug(...) do { if (cDebugFlag & DEBUG_DEBUG) { taosPrintLog("TSC ", DEBUG_DEBUG, cDebugFlag, __VA_ARGS__); }} while(0)
#define tscTrace(...) do { if (cDebugFlag & DEBUG_TRACE) { taosPrintLog("TSC ", DEBUG_TRACE, cDebugFlag, __VA_ARGS__); }} while(0)
......
......@@ -102,6 +102,7 @@ typedef struct STscStmt {
SStmtBindInfo bInfo;
int64_t reqid;
int32_t errCode;
} STscStmt;
extern char *gStmtStatusStr[];
......@@ -121,6 +122,7 @@ extern char *gStmtStatusStr[];
int32_t _code = c; \
if (_code != TSDB_CODE_SUCCESS) { \
terrno = _code; \
pStmt->errCode = _code; \
return _code; \
} \
} while (0)
......@@ -129,6 +131,7 @@ extern char *gStmtStatusStr[];
int32_t _code = c; \
if (_code != TSDB_CODE_SUCCESS) { \
terrno = _code; \
pStmt->errCode = _code; \
} \
return _code; \
} while (0)
......@@ -137,9 +140,19 @@ extern char *gStmtStatusStr[];
code = c; \
if (code != TSDB_CODE_SUCCESS) { \
terrno = code; \
pStmt->errCode = code; \
goto _return; \
} \
} while (0)
#define STMT_ERRI_JRET(c) \
do { \
code = c; \
if (code != TSDB_CODE_SUCCESS) { \
terrno = code; \
goto _return; \
} \
} while (0)
#define STMT_ELOG(param, ...) qError("stmt:%p " param, pStmt, __VA_ARGS__)
#define STMT_DLOG(param, ...) qDebug("stmt:%p " param, pStmt, __VA_ARGS__)
......
......@@ -107,6 +107,7 @@ static void deregisterRequest(SRequestObj *pRequest) {
if (duration >= SLOW_QUERY_INTERVAL) {
atomic_add_fetch_64((int64_t *)&pActivity->numOfSlowQueries, 1);
tscWarnL("slow query: %s, duration:%" PRId64, pRequest->sqlstr, duration);
}
releaseTscObj(pTscObj->id);
......
......@@ -1039,7 +1039,6 @@ static int32_t asyncExecSchQuery(SRequestObj* pRequest, SQuery* pQuery, SMetaDat
.sysInfo = pRequest->pTscObj->sysInfo,
.allocatorId = pRequest->allocatorRefId};
SAppInstInfo* pAppInfo = getAppInfo(pRequest);
SQueryPlan* pDag = NULL;
int64_t st = taosGetTimestampUs();
......@@ -1052,7 +1051,6 @@ static int32_t asyncExecSchQuery(SRequestObj* pRequest, SQuery* pQuery, SMetaDat
}
pRequest->metric.execStart = taosGetTimestampUs();
pRequest->metric.planCostUs = pRequest->metric.execStart - st;
if (TSDB_CODE_SUCCESS == code && !pRequest->validateOnly) {
......
......@@ -174,6 +174,7 @@ static int32_t smlParseTableName(SArray *tags, char *childTableName) {
if (childTableNameLen == tag->keyLen && strncmp(tag->key, tsSmlChildTableName, tag->keyLen) == 0) {
memset(childTableName, 0, TSDB_TABLE_NAME_LEN);
strncpy(childTableName, tag->value, (tag->length < TSDB_TABLE_NAME_LEN ? tag->length : TSDB_TABLE_NAME_LEN));
taosArrayRemove(tags, i);
break;
}
}
......@@ -533,8 +534,7 @@ static int32_t smlGenerateSchemaAction(SSchema *colField, SHashObj *colHash, SSm
uint16_t *index = colHash ? (uint16_t *)taosHashGet(colHash, kv->key, kv->keyLen) : NULL;
if (index) {
if (colField[*index].type != kv->type) {
uError("SML:0x%" PRIx64 " point type and db type mismatch. key: %s. point type: %d, db type: %d", info->id,
kv->key, colField[*index].type, kv->type);
uError("SML:0x%" PRIx64 " point type and db type mismatch. point type: %d, db type: %d, key: %s", info->id, colField[*index].type, kv->type, kv->key);
return TSDB_CODE_TSC_INVALID_VALUE;
}
......@@ -699,7 +699,7 @@ static int32_t smlSendMetaMsg(SSmlHandle *info, SName *pName, SArray *pColumns,
pReq.numOfTags = 1;
SField field = {0};
field.type = TSDB_DATA_TYPE_NCHAR;
field.bytes = 1;
field.bytes = TSDB_NCHAR_SIZE + VARSTR_HEADER_SIZE;
strcpy(field.name, tsSmlTagName);
taosArrayPush(pReq.pTags, &field);
}
......
......@@ -32,8 +32,14 @@ int32_t stmtSwitchStatus(STscStmt* pStmt, STMT_STATUS newStatus) {
STMT_LOG_SEQ(newStatus);
}
if (pStmt->errCode && newStatus != STMT_PREPARE) {
STMT_DLOG("stmt already failed with err: %s", tstrerror(pStmt->errCode));
return pStmt->errCode;
}
switch (newStatus) {
case STMT_PREPARE:
pStmt->errCode = 0;
break;
case STMT_SETTBNAME:
if (STMT_STATUS_EQ(INIT) || STMT_STATUS_EQ(BIND) || STMT_STATUS_EQ(BIND_COL)) {
......@@ -197,7 +203,10 @@ int32_t stmtGetExecInfo(TAOS_STMT* stmt, SHashObj** pVgHash, SHashObj** pBlockHa
STscStmt* pStmt = (STscStmt*)stmt;
*pVgHash = pStmt->sql.pVgHash;
pStmt->sql.pVgHash = NULL;
*pBlockHash = pStmt->exec.pBlockHash;
pStmt->exec.pBlockHash = NULL;
return TSDB_CODE_SUCCESS;
}
......@@ -325,6 +334,8 @@ int32_t stmtCleanExecInfo(STscStmt* pStmt, bool keepTable, bool deepClean) {
}
int32_t stmtCleanSQLInfo(STscStmt* pStmt) {
STMT_DLOG_E("start to free SQL info");
taosMemoryFree(pStmt->sql.queryRes.fields);
taosMemoryFree(pStmt->sql.queryRes.userFields);
taosMemoryFree(pStmt->sql.sqlStr);
......@@ -351,6 +362,8 @@ int32_t stmtCleanSQLInfo(STscStmt* pStmt) {
memset(&pStmt->sql, 0, sizeof(pStmt->sql));
STMT_DLOG_E("end to free SQL info");
return TSDB_CODE_SUCCESS;
}
......@@ -441,11 +454,10 @@ int32_t stmtGetFromCache(STscStmt* pStmt) {
.mgmtEps = getEpSet_s(&pStmt->taos->pAppInfo->mgmtEp)};
int32_t code = catalogGetTableMeta(pStmt->pCatalog, &conn, &pStmt->bInfo.sname, &pTableMeta);
if (TSDB_CODE_PAR_TABLE_NOT_EXIST == code) {
STMT_ERR_RET(stmtCleanBindInfo(pStmt));
tscDebug("tb %s not exist", pStmt->bInfo.tbFName);
stmtCleanBindInfo(pStmt);
return TSDB_CODE_SUCCESS;
STMT_ERR_RET(code);
}
STMT_ERR_RET(code);
......@@ -922,9 +934,13 @@ _return:
int stmtClose(TAOS_STMT* stmt) {
STscStmt* pStmt = (STscStmt*)stmt;
STMT_DLOG_E("start to free stmt");
stmtCleanSQLInfo(pStmt);
taosMemoryFree(stmt);
STMT_DLOG_E("stmt freed");
return TSDB_CODE_SUCCESS;
}
......@@ -959,15 +975,17 @@ int stmtIsInsert(TAOS_STMT* stmt, int* insert) {
}
int stmtGetTagFields(TAOS_STMT* stmt, int* nums, TAOS_FIELD_E** fields) {
int32_t code = 0;
STscStmt* pStmt = (STscStmt*)stmt;
int32_t preCode = pStmt->errCode;
STMT_DLOG_E("start to get tag fields");
if (STMT_TYPE_QUERY == pStmt->sql.type) {
STMT_RET(TSDB_CODE_TSC_STMT_API_ERROR);
STMT_ERRI_JRET(TSDB_CODE_TSC_STMT_API_ERROR);
}
STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_FETCH_FIELDS));
STMT_ERRI_JRET(stmtSwitchStatus(pStmt, STMT_FETCH_FIELDS));
if (pStmt->bInfo.needParse && pStmt->sql.runTimes && pStmt->sql.type > 0 &&
STMT_TYPE_MULTI_INSERT != pStmt->sql.type) {
......@@ -979,27 +997,33 @@ int stmtGetTagFields(TAOS_STMT* stmt, int* nums, TAOS_FIELD_E** fields) {
pStmt->exec.pRequest = NULL;
}
STMT_ERR_RET(stmtCreateRequest(pStmt));
STMT_ERRI_JRET(stmtCreateRequest(pStmt));
if (pStmt->bInfo.needParse) {
STMT_ERR_RET(stmtParseSql(pStmt));
STMT_ERRI_JRET(stmtParseSql(pStmt));
}
STMT_ERR_RET(stmtFetchTagFields(stmt, nums, fields));
STMT_ERRI_JRET(stmtFetchTagFields(stmt, nums, fields));
return TSDB_CODE_SUCCESS;
_return:
pStmt->errCode = preCode;
return code;
}
int stmtGetColFields(TAOS_STMT* stmt, int* nums, TAOS_FIELD_E** fields) {
int32_t code = 0;
STscStmt* pStmt = (STscStmt*)stmt;
int32_t preCode = pStmt->errCode;
STMT_DLOG_E("start to get col fields");
if (STMT_TYPE_QUERY == pStmt->sql.type) {
STMT_RET(TSDB_CODE_TSC_STMT_API_ERROR);
STMT_ERRI_JRET(TSDB_CODE_TSC_STMT_API_ERROR);
}
STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_FETCH_FIELDS));
STMT_ERRI_JRET(stmtSwitchStatus(pStmt, STMT_FETCH_FIELDS));
if (pStmt->bInfo.needParse && pStmt->sql.runTimes && pStmt->sql.type > 0 &&
STMT_TYPE_MULTI_INSERT != pStmt->sql.type) {
......@@ -1011,15 +1035,19 @@ int stmtGetColFields(TAOS_STMT* stmt, int* nums, TAOS_FIELD_E** fields) {
pStmt->exec.pRequest = NULL;
}
STMT_ERR_RET(stmtCreateRequest(pStmt));
STMT_ERRI_JRET(stmtCreateRequest(pStmt));
if (pStmt->bInfo.needParse) {
STMT_ERR_RET(stmtParseSql(pStmt));
STMT_ERRI_JRET(stmtParseSql(pStmt));
}
STMT_ERR_RET(stmtFetchColFields(stmt, nums, fields));
STMT_ERRI_JRET(stmtFetchColFields(stmt, nums, fields));
return TSDB_CODE_SUCCESS;
_return:
pStmt->errCode = preCode;
return code;
}
int stmtGetParamNum(TAOS_STMT* stmt, int* nums) {
......
......@@ -107,7 +107,6 @@ struct tmq_t {
STaosQueue* mqueue; // queue of rsp
STaosQall* qall;
STaosQueue* delayedTask; // delayed task queue for heartbeat and auto commit
TdThreadMutex lock; // used to protect the operation on each topic, when updating the epsets.
tsem_t rspSem;
};
......@@ -188,7 +187,6 @@ typedef struct {
SMqClientVg* pVg;
SMqClientTopic* pTopic;
int32_t vgId;
tsem_t rspSem;
uint64_t requestId; // request id for debug purpose
} SMqPollCbParam;
......@@ -212,6 +210,11 @@ typedef struct {
tmq_t* pTmq;
} SMqCommitCbParam;
typedef struct SSyncCommitInfo {
tsem_t sem;
int32_t code;
} SSyncCommitInfo;
static int32_t doAskEp(tmq_t* tmq);
static int32_t makeTopicVgroupKey(char* dst, const char* topicName, int32_t vg);
static int32_t tmqCommitDone(SMqCommitCbParamSet* pParamSet);
......@@ -523,11 +526,7 @@ static int32_t doSendCommitMsg(tmq_t* tmq, SMqClientVg* pVg, const char* pTopicN
return TSDB_CODE_OUT_OF_MEMORY;
}
pMsgSendInfo->msgInfo = (SDataBuf){
.pData = buf,
.len = sizeof(SMsgHead) + len,
.handle = NULL,
};
pMsgSendInfo->msgInfo = (SDataBuf) { .pData = buf, .len = sizeof(SMsgHead) + len, .handle = NULL };
pMsgSendInfo->requestId = generateRequestId();
pMsgSendInfo->requestObjRefId = 0;
......@@ -788,11 +787,7 @@ void tmqSendHbReq(void* param, void* tmrId) {
goto OVER;
}
sendInfo->msgInfo = (SDataBuf){
.pData = pReq,
.len = tlen,
.handle = NULL,
};
sendInfo->msgInfo = (SDataBuf){ .pData = pReq, .len = tlen, .handle = NULL };
sendInfo->requestId = generateRequestId();
sendInfo->requestObjRefId = 0;
......@@ -979,7 +974,6 @@ void tmqFreeImpl(void* handle) {
taosFreeQall(tmq->qall);
tsem_destroy(&tmq->rspSem);
taosThreadMutexDestroy(&tmq->lock);
taosArrayDestroyEx(tmq->clientTopics, freeClientVgImpl);
taos_close_internal(tmq->pTscObj);
......@@ -1024,7 +1018,6 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
pTmq->delayedTask = taosOpenQueue();
pTmq->qall = taosAllocateQall();
taosThreadMutexInit(&pTmq->lock, NULL);
if (pTmq->clientTopics == NULL || pTmq->mqueue == NULL || pTmq->qall == NULL || pTmq->delayedTask == NULL ||
conf->groupId[0] == 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
......@@ -1155,6 +1148,7 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) {
};
if (tsem_init(&param.rspSem, 0, 0) != 0) {
code = TSDB_CODE_TSC_INTERNAL_ERROR;
goto FAIL;
}
......@@ -1190,6 +1184,8 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) {
int32_t retryCnt = 0;
while (TSDB_CODE_MND_CONSUMER_NOT_READY == doAskEp(tmq)) {
if (retryCnt++ > MAX_RETRY_COUNT) {
tscError("consumer:0x%" PRIx64 ", mnd not ready for subscribe, retry:%d in 500ms", tmq->consumerId, retryCnt);
code = TSDB_CODE_TSC_INTERNAL_ERROR;
goto FAIL;
}
......@@ -1233,7 +1229,6 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
tmq_t* tmq = taosAcquireRef(tmqMgmt.rsetId, refId);
if (tmq == NULL) {
tsem_destroy(&pParam->rspSem);
taosMemoryFree(pParam);
taosMemoryFree(pMsg->pData);
taosMemoryFree(pMsg->pEpSet);
......@@ -1270,6 +1265,8 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
pRspWrapper->tmqRspType = TMQ_MSG_TYPE__END_RSP;
taosWriteQitem(tmq->mqueue, pRspWrapper);
} else if (code == TSDB_CODE_WAL_LOG_NOT_EXIST) { // poll data while insert
taosMsleep(500);
}
goto CREATE_MSG_FAIL;
......@@ -1346,8 +1343,9 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
taosMemoryFree(pMsg->pData);
taosWriteQitem(tmq->mqueue, pRspWrapper);
int32_t total = taosQueueItemSize(tmq->mqueue);
tscDebug("consumer:0x%" PRIx64 " put poll res into mqueue, type:%d, vgId:%d, total in queue:%d, reqId:0x%" PRIx64,
tmq->consumerId, rspType, vgId, tmq->mqueue->numOfItems, requestId);
tmq->consumerId, rspType, vgId, total, requestId);
tsem_post(&tmq->rspSem);
taosReleaseRef(tmqMgmt.rsetId, refId);
......@@ -1422,7 +1420,7 @@ static void freeClientVgInfo(void* param) {
taosArrayDestroy(pTopic->vgs);
}
static bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, const SMqAskEpRsp* pRsp) {
static bool doUpdateLocalEp(tmq_t* tmq, int32_t epoch, const SMqAskEpRsp* pRsp) {
bool set = false;
int32_t topicNumCur = taosArrayGetSize(tmq->clientTopics);
......@@ -1431,6 +1429,9 @@ static bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, const SMqAskEpRsp* pRsp) {
char vgKey[TSDB_TOPIC_FNAME_LEN + 22];
tscDebug("consumer:0x%" PRIx64 " update ep epoch from %d to epoch %d, incoming topics:%d, existed topics:%d",
tmq->consumerId, tmq->epoch, epoch, topicNumGet, topicNumCur);
if (epoch <= tmq->epoch) {
return false;
}
SArray* newTopics = taosArrayInit(topicNumGet, sizeof(SMqClientTopic));
if (newTopics == NULL) {
......@@ -1474,14 +1475,11 @@ static bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, const SMqAskEpRsp* pRsp) {
taosHashCleanup(pVgOffsetHashMap);
taosThreadMutexLock(&tmq->lock);
// destroy current buffered existed topics info
if (tmq->clientTopics) {
taosArrayDestroyEx(tmq->clientTopics, freeClientVgInfo);
}
tmq->clientTopics = newTopics;
taosThreadMutexUnlock(&tmq->lock);
int8_t flag = (topicNumGet == 0) ? TMQ_CONSUMER_STATUS__NO_TOPIC : TMQ_CONSUMER_STATUS__READY;
atomic_store_8(&tmq->status, flag);
......@@ -1537,8 +1535,8 @@ int32_t askEpCallbackFn(void* param, SDataBuf* pMsg, int32_t code) {
} else {
tscDebug("consumer:0x%" PRIx64 ", recv ep, msg epoch %d, current epoch %d, update local ep", tmq->consumerId,
head->epoch, epoch);
pParam->pUserFn(tmq, code, pMsg, pParam->pParam);
}
pParam->pUserFn(tmq, code, pMsg, pParam->pParam);
taosReleaseRef(tmqMgmt.rsetId, pParam->refId);
......@@ -1745,7 +1743,7 @@ static int32_t tmqHandleNoPollRsp(tmq_t* tmq, SMqRspWrapper* rspWrapper, bool* p
if (rspWrapper->epoch > atomic_load_32(&tmq->epoch)) {
SMqAskEpRspWrapper* pEpRspWrapper = (SMqAskEpRspWrapper*)rspWrapper;
SMqAskEpRsp* rspMsg = &pEpRspWrapper->msg;
tmqUpdateEp(tmq, rspWrapper->epoch, rspMsg);
doUpdateLocalEp(tmq, rspWrapper->epoch, rspMsg);
/*tmqClearUnhandleMsg(tmq);*/
tDeleteSMqAskEpRsp(rspMsg);
*pReset = true;
......@@ -2125,13 +2123,8 @@ void tmq_commit_async(tmq_t* tmq, const TAOS_RES* pRes, tmq_commit_cb* cb, void*
}
}
typedef struct SSyncCommitInfo {
tsem_t sem;
int32_t code;
} SSyncCommitInfo;
static void commitCallBackFn(tmq_t* pTmq, int32_t code, void* param) {
SSyncCommitInfo* pInfo = (SSyncCommitInfo*)param;
static void commitCallBackFn(tmq_t *pTmq, int32_t code, void* param) {
SSyncCommitInfo* pInfo = (SSyncCommitInfo*) param;
pInfo->code = code;
tsem_post(&pInfo->sem);
}
......@@ -2168,7 +2161,7 @@ void updateEpCallbackFn(tmq_t* pTmq, int32_t code, SDataBuf* pDataBuf, void* par
SMqAskEpRsp rsp;
tDecodeSMqAskEpRsp(POINTER_SHIFT(pDataBuf->pData, sizeof(SMqRspHead)), &rsp);
tmqUpdateEp(pTmq, head->epoch, &rsp);
doUpdateLocalEp(pTmq, head->epoch, &rsp);
tDeleteSMqAskEpRsp(&rsp);
}
......@@ -2308,3 +2301,26 @@ void commitRspCountDown(SMqCommitCbParamSet* pParamSet, int64_t consumerId, cons
waitingRspNum);
}
}
SReqResultInfo* tmqGetNextResInfo(TAOS_RES* res, bool convertUcs4) {
SMqRspObj* pRspObj = (SMqRspObj*)res;
pRspObj->resIter++;
if (pRspObj->resIter < pRspObj->rsp.blockNum) {
SRetrieveTableRsp* pRetrieve = (SRetrieveTableRsp*)taosArrayGetP(pRspObj->rsp.blockData, pRspObj->resIter);
if (pRspObj->rsp.withSchema) {
SSchemaWrapper* pSW = (SSchemaWrapper*)taosArrayGetP(pRspObj->rsp.blockSchema, pRspObj->resIter);
setResSchemaInfo(&pRspObj->resInfo, pSW->pSchema, pSW->nCols);
taosMemoryFreeClear(pRspObj->resInfo.row);
taosMemoryFreeClear(pRspObj->resInfo.pCol);
taosMemoryFreeClear(pRspObj->resInfo.length);
taosMemoryFreeClear(pRspObj->resInfo.convertBuf);
taosMemoryFreeClear(pRspObj->resInfo.convertJson);
}
setQueryResultFromRsp(&pRspObj->resInfo, pRetrieve, convertUcs4, false);
return &pRspObj->resInfo;
}
return NULL;
}
\ No newline at end of file
......@@ -162,9 +162,11 @@ void* queryThread(void* arg) {
return NULL;
}
static int32_t numOfThreads = 1;
int32_t numOfThreads = 1;
void tmq_commit_cb_print(tmq_t* pTmq, int32_t code, void* param) { printf("success, code:%d\n", code); }
void tmq_commit_cb_print(tmq_t* pTmq, int32_t code, void* param) {
printf("auto commit success, code:%d\n\n\n\n", code);
}
void* doConsumeData(void* param) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
......@@ -172,7 +174,7 @@ void* doConsumeData(void* param) {
tmq_conf_t* conf = tmq_conf_new();
tmq_conf_set(conf, "enable.auto.commit", "true");
tmq_conf_set(conf, "auto.commit.interval.ms", "1000");
tmq_conf_set(conf, "group.id", "cgrpName12");
tmq_conf_set(conf, "group.id", "cgrpName41");
tmq_conf_set(conf, "td.connect.user", "root");
tmq_conf_set(conf, "td.connect.pass", "taosdata");
tmq_conf_set(conf, "auto.offset.reset", "earliest");
......@@ -1059,7 +1061,7 @@ TEST(clientCase, sub_tb_test) {
tmq_conf_t* conf = tmq_conf_new();
tmq_conf_set(conf, "enable.auto.commit", "true");
tmq_conf_set(conf, "auto.commit.interval.ms", "1000");
tmq_conf_set(conf, "group.id", "cgrpName27");
tmq_conf_set(conf, "group.id", "cgrpName45");
tmq_conf_set(conf, "td.connect.user", "root");
tmq_conf_set(conf, "td.connect.pass", "taosdata");
tmq_conf_set(conf, "auto.offset.reset", "earliest");
......
......@@ -299,7 +299,9 @@ static const SSysDbTableSchema vnodesSchema[] = {
static const SSysDbTableSchema userUserPrivilegesSchema[] = {
{.name = "user_name", .bytes = TSDB_USER_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
{.name = "privilege", .bytes = 10 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
{.name = "object_name", .bytes = TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
{.name = "db_name", .bytes = TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
{.name = "table_name", .bytes = TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
{.name = "condition", .bytes = TSDB_PRIVILEDGE_CONDITION_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
};
static const SSysTableMeta infosMeta[] = {
......
......@@ -1590,12 +1590,13 @@ static void doShiftBitmap(char* nullBitmap, size_t n, size_t total) {
i += 1;
}
} else if (n > 8) {
int32_t gap = len - newLen;
int32_t remain = (total % 8 != 0 && total % 8 <= tail) ? 1 : 0;
int32_t gap = len - newLen - remain;
while (i < newLen) {
uint8_t v = p[i + gap];
p[i] = (v << tail);
if (i < newLen - 1) {
if (i < newLen - 1 + remain) {
uint8_t next = p[i + gap + 1];
p[i] |= (next >> (8 - tail));
}
......
......@@ -20,6 +20,10 @@
#include "tlog.h"
#include "tmisce.h"
#if defined(CUS_NAME) || defined(CUS_PROMPT) || defined(CUS_EMAIL)
#include "cus_name.h"
#endif
GRANT_CFG_DECLARE;
SConfig *tsCfg = NULL;
......@@ -99,6 +103,7 @@ char tsSmlChildTableName[TSDB_TABLE_NAME_LEN] = ""; // user defined child table
// query
int32_t tsQueryPolicy = 1;
int32_t tsQueryRspPolicy = 0;
int64_t tsQueryMaxConcurrentTables = 200; // unit is TSDB_TABLE_NUM_UNIT
bool tsEnableQueryHb = false;
int32_t tsQuerySmaOptimize = 0;
int32_t tsQueryRsmaTolerance = 1000; // the tolerance time (ms) to judge from which level to query rsma data.
......@@ -338,6 +343,7 @@ static int32_t taosAddClientCfg(SConfig *pCfg) {
if (cfgAddInt32(pCfg, "maxRetryWaitTime", tsMaxRetryWaitTime, 0, 86400000, 0) != 0) return -1;
if (cfgAddBool(pCfg, "useAdapter", tsUseAdapter, true) != 0) return -1;
if (cfgAddBool(pCfg, "crashReporting", tsEnableCrashReport, true) != 0) return -1;
if (cfgAddInt64(pCfg, "queryMaxConcurrentTables", tsQueryMaxConcurrentTables, INT64_MIN, INT64_MAX, 1) != 0) return -1;
tsNumOfRpcThreads = tsNumOfCores / 2;
tsNumOfRpcThreads = TRANGE(tsNumOfRpcThreads, 2, TSDB_MAX_RPC_THREADS);
......@@ -735,6 +741,7 @@ static int32_t taosSetClientCfg(SConfig *pCfg) {
tsKeepColumnName = cfgGetItem(pCfg, "keepColumnName")->bval;
tsUseAdapter = cfgGetItem(pCfg, "useAdapter")->bval;
tsEnableCrashReport = cfgGetItem(pCfg, "crashReporting")->bval;
tsQueryMaxConcurrentTables = cfgGetItem(pCfg, "queryMaxConcurrentTables")->i64;
tsMaxRetryWaitTime = cfgGetItem(pCfg, "maxRetryWaitTime")->i32;
......
......@@ -1368,6 +1368,12 @@ int32_t tSerializeSAlterUserReq(void *buf, int32_t bufLen, SAlterUserReq *pReq)
if (tEncodeCStr(&encoder, pReq->user) < 0) return -1;
if (tEncodeCStr(&encoder, pReq->pass) < 0) return -1;
if (tEncodeCStr(&encoder, pReq->objname) < 0) return -1;
int32_t len = strlen(pReq->tabName);
if (tEncodeI32(&encoder, len) < 0) return -1;
if (len > 0) {
if (tEncodeCStr(&encoder, pReq->tabName) < 0) return -1;
}
if (tEncodeBinary(&encoder, pReq->tagCond, pReq->tagCondLen) < 0) return -1;
tEndEncode(&encoder);
int32_t tlen = encoder.pos;
......@@ -1387,6 +1393,16 @@ int32_t tDeserializeSAlterUserReq(void *buf, int32_t bufLen, SAlterUserReq *pReq
if (tDecodeCStrTo(&decoder, pReq->user) < 0) return -1;
if (tDecodeCStrTo(&decoder, pReq->pass) < 0) return -1;
if (tDecodeCStrTo(&decoder, pReq->objname) < 0) return -1;
if (!tDecodeIsEnd(&decoder)) {
int32_t len = 0;
if (tDecodeI32(&decoder, &len) < 0) return -1;
if (len > 0) {
if (tDecodeCStrTo(&decoder, pReq->tabName) < 0) return -1;
}
uint64_t tagCondLen = 0;
if (tDecodeBinaryAlloc(&decoder, (void **)&pReq->tagCond, &tagCondLen) < 0) return -1;
pReq->tagCondLen = tagCondLen;
}
tEndDecode(&decoder);
tDecoderClear(&decoder);
......@@ -1429,6 +1445,7 @@ int32_t tSerializeSGetUserAuthRspImpl(SEncoder *pEncoder, SGetUserAuthRsp *pRsp)
int32_t numOfCreatedDbs = taosHashGetSize(pRsp->createdDbs);
int32_t numOfReadDbs = taosHashGetSize(pRsp->readDbs);
int32_t numOfWriteDbs = taosHashGetSize(pRsp->writeDbs);
if (tEncodeI32(pEncoder, numOfCreatedDbs) < 0) return -1;
if (tEncodeI32(pEncoder, numOfReadDbs) < 0) return -1;
if (tEncodeI32(pEncoder, numOfWriteDbs) < 0) return -1;
......@@ -1451,6 +1468,54 @@ int32_t tSerializeSGetUserAuthRspImpl(SEncoder *pEncoder, SGetUserAuthRsp *pRsp)
db = taosHashIterate(pRsp->writeDbs, db);
}
int32_t numOfReadTbs = taosHashGetSize(pRsp->readTbs);
int32_t numOfWriteTbs = taosHashGetSize(pRsp->writeTbs);
int32_t numOfUseTbs = taosHashGetSize(pRsp->useDbs);
if (tEncodeI32(pEncoder, numOfReadTbs) < 0) return -1;
if (tEncodeI32(pEncoder, numOfWriteTbs) < 0) return -1;
if (tEncodeI32(pEncoder, numOfUseTbs) < 0) return -1;
char *tb = taosHashIterate(pRsp->readTbs, NULL);
while (tb != NULL) {
size_t keyLen = 0;
void *key = taosHashGetKey(tb, &keyLen);
if (tEncodeI32(pEncoder, keyLen) < 0) return -1;
if (tEncodeCStr(pEncoder, key) < 0) return -1;
size_t valueLen = 0;
valueLen = strlen(tb);
if (tEncodeI32(pEncoder, valueLen) < 0) return -1;
if (tEncodeCStr(pEncoder, tb) < 0) return -1;
tb = taosHashIterate(pRsp->readTbs, tb);
}
tb = taosHashIterate(pRsp->writeTbs, NULL);
while (tb != NULL) {
size_t keyLen = 0;
void *key = taosHashGetKey(tb, &keyLen);
if (tEncodeI32(pEncoder, keyLen) < 0) return -1;
if (tEncodeCStr(pEncoder, key) < 0) return -1;
size_t valueLen = 0;
valueLen = strlen(tb);
if (tEncodeI32(pEncoder, valueLen) < 0) return -1;
if (tEncodeCStr(pEncoder, tb) < 0) return -1;
tb = taosHashIterate(pRsp->writeTbs, tb);
}
int32_t *useDb = taosHashIterate(pRsp->useDbs, NULL);
while (useDb != NULL) {
size_t keyLen = 0;
void *key = taosHashGetKey(useDb, &keyLen);
if (tEncodeI32(pEncoder, keyLen) < 0) return -1;
if (tEncodeCStr(pEncoder, key) < 0) return -1;
if (tEncodeI32(pEncoder, *useDb) < 0) return -1;
useDb = taosHashIterate(pRsp->useDbs, useDb);
}
return 0;
}
......@@ -1473,7 +1538,11 @@ int32_t tDeserializeSGetUserAuthRspImpl(SDecoder *pDecoder, SGetUserAuthRsp *pRs
pRsp->createdDbs = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
pRsp->readDbs = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
pRsp->writeDbs = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
if (pRsp->readDbs == NULL || pRsp->writeDbs == NULL) {
pRsp->readTbs = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
pRsp->writeTbs = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
pRsp->useDbs = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
if (pRsp->createdDbs == NULL || pRsp->readDbs == NULL || pRsp->writeDbs == NULL || pRsp->readTbs == NULL ||
pRsp->writeTbs == NULL || pRsp->useDbs == NULL) {
return -1;
}
......@@ -1512,6 +1581,63 @@ int32_t tDeserializeSGetUserAuthRspImpl(SDecoder *pDecoder, SGetUserAuthRsp *pRs
taosHashPut(pRsp->writeDbs, db, len, db, len);
}
if (!tDecodeIsEnd(pDecoder)) {
int32_t numOfReadTbs = 0;
int32_t numOfWriteTbs = 0;
int32_t numOfUseDbs = 0;
if (tDecodeI32(pDecoder, &numOfReadTbs) < 0) return -1;
if (tDecodeI32(pDecoder, &numOfWriteTbs) < 0) return -1;
if (tDecodeI32(pDecoder, &numOfUseDbs) < 0) return -1;
for (int32_t i = 0; i < numOfReadTbs; ++i) {
int32_t keyLen = 0;
if (tDecodeI32(pDecoder, &keyLen) < 0) return -1;
char *key = taosMemoryCalloc(keyLen + 1, sizeof(char));
if (tDecodeCStrTo(pDecoder, key) < 0) return -1;
int32_t valuelen = 0;
if (tDecodeI32(pDecoder, &valuelen) < 0) return -1;
char *value = taosMemoryCalloc(valuelen + 1, sizeof(char));
if (tDecodeCStrTo(pDecoder, value) < 0) return -1;
taosHashPut(pRsp->readTbs, key, strlen(key), value, valuelen + 1);
taosMemoryFree(key);
taosMemoryFree(value);
}
for (int32_t i = 0; i < numOfWriteTbs; ++i) {
int32_t keyLen = 0;
if (tDecodeI32(pDecoder, &keyLen) < 0) return -1;
char *key = taosMemoryCalloc(keyLen + 1, sizeof(char));
if (tDecodeCStrTo(pDecoder, key) < 0) return -1;
int32_t valuelen = 0;
if (tDecodeI32(pDecoder, &valuelen) < 0) return -1;
char *value = taosMemoryCalloc(valuelen + 1, sizeof(char));
if (tDecodeCStrTo(pDecoder, value) < 0) return -1;
taosHashPut(pRsp->writeTbs, key, strlen(key), value, valuelen + 1);
taosMemoryFree(key);
taosMemoryFree(value);
}
for (int32_t i = 0; i < numOfUseDbs; ++i) {
int32_t keyLen = 0;
if (tDecodeI32(pDecoder, &keyLen) < 0) return -1;
char *key = taosMemoryCalloc(keyLen + 1, sizeof(char));
if (tDecodeCStrTo(pDecoder, key) < 0) return -1;
int32_t ref = 0;
if (tDecodeI32(pDecoder, &ref) < 0) return -1;
taosHashPut(pRsp->useDbs, key, strlen(key), &ref, sizeof(ref));
}
}
return 0;
}
......@@ -1533,6 +1659,9 @@ void tFreeSGetUserAuthRsp(SGetUserAuthRsp *pRsp) {
taosHashCleanup(pRsp->createdDbs);
taosHashCleanup(pRsp->readDbs);
taosHashCleanup(pRsp->writeDbs);
taosHashCleanup(pRsp->writeTbs);
taosHashCleanup(pRsp->readTbs);
taosHashCleanup(pRsp->useDbs);
}
int32_t tSerializeSCreateDropMQSNodeReq(void *buf, int32_t bufLen, SMCreateQnodeReq *pReq) {
......@@ -7317,6 +7446,7 @@ void tDestroySSubmitReq2(SSubmitReq2 *pReq, int32_t flag) {
tDestroySSubmitTbData(&aSubmitTbData[i], flag);
}
taosArrayDestroy(pReq->aSubmitTbData);
pReq->aSubmitTbData = NULL;
}
int32_t tEncodeSSubmitRsp2(SEncoder *pCoder, const SSubmitRsp2 *pRsp) {
......
......@@ -114,11 +114,11 @@ static void vmProcessFetchQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO
int32_t code = vnodeProcessFetchMsg(pVnode->pImpl, pMsg, pInfo);
if (code != 0) {
if (terrno != 0) code = terrno;
dGError("vgId:%d, msg:%p failed to fetch since %s", pVnode->vgId, pMsg, terrstr(code));
dGError("vnodeProcessFetchMsg vgId:%d, msg:%p failed to fetch since %s", pVnode->vgId, pMsg, terrstr());
vmSendRsp(pMsg, code);
}
dGTrace("vgId:%d, msg:%p is freed, code:0x%x", pVnode->vgId, pMsg, code);
dGTrace("vnodeProcessFetchMsg vgId:%d, msg:%p is freed, code:0x%x", pVnode->vgId, pMsg, code);
rpcFreeCont(pMsg->pCont);
taosFreeQitem(pMsg);
}
......@@ -163,7 +163,7 @@ static int32_t vmPutMsgToQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg, EQueueType qtyp
SVnodeObj *pVnode = vmAcquireVnode(pMgmt, pHead->vgId);
if (pVnode == NULL) {
dGError("vgId:%d, msg:%p failed to put into vnode queue since %s, type:%s qtype:%d contLen:%d", pHead->vgId, pMsg,
dGWarn("vgId:%d, msg:%p failed to put into vnode queue since %s, type:%s qtype:%d contLen:%d", pHead->vgId, pMsg,
terrstr(), TMSG_INFO(pMsg->msgType), qtype, pHead->contLen);
terrno = (terrno != 0) ? terrno : -1;
return terrno;
......
......@@ -24,10 +24,10 @@ extern "C" {
enum {
MQ_CONSUMER_STATUS__MODIFY = 1,
MQ_CONSUMER_STATUS__MODIFY_IN_REB, // this value is not used anymore
// MQ_CONSUMER_STATUS__MODIFY_IN_REB, // this value is not used anymore
MQ_CONSUMER_STATUS__READY,
MQ_CONSUMER_STATUS__LOST,
MQ_CONSUMER_STATUS__LOST_IN_REB, // this value is not used anymore
// MQ_CONSUMER_STATUS__LOST_IN_REB, // this value is not used anymore
MQ_CONSUMER_STATUS__LOST_REBD,
MQ_CONSUMER_STATUS__REMOVED,
};
......
......@@ -281,6 +281,9 @@ typedef struct {
SHashObj* readDbs;
SHashObj* writeDbs;
SHashObj* topics;
SHashObj* readTbs;
SHashObj* writeTbs;
SHashObj* useDbs;
SRWLatch lock;
} SUserObj;
......
......@@ -38,6 +38,7 @@ void mndFreeStb(SStbObj *pStb);
int32_t mndBuildSMCreateStbRsp(SMnode *pMnode, char *dbFName, char *stbFName, void **pCont, int32_t *pLen);
void mndExtractDbNameFromStbFullName(const char *stbFullName, char *dst);
void mndExtractShortDbNameFromStbFullName(const char *stbFullName, char *dst);
void mndExtractTbNameFromStbFullName(const char *stbFullName, char *dst, int32_t dstSize);
const char *mndGetStbStr(const char *src);
......
......@@ -31,6 +31,7 @@ void mndReleaseUser(SMnode *pMnode, SUserObj *pUser);
// for trans test
SSdbRaw *mndUserActionEncode(SUserObj *pUser);
SHashObj *mndDupDbHash(SHashObj *pOld);
SHashObj *mndDupTableHash(SHashObj *pOld);
SHashObj *mndDupTopicHash(SHashObj *pOld);
int32_t mndValidateUserAuthInfo(SMnode *pMnode, SUserAuthVersion *pUsers, int32_t numOfUses, void **ppRsp,
int32_t *pRspLen);
......
......@@ -67,7 +67,7 @@ int32_t mndGetClusterName(SMnode *pMnode, char *clusterName, int32_t len) {
return 0;
}
static SClusterObj *mndAcquireCluster(SMnode *pMnode) {
static SClusterObj *mndAcquireCluster(SMnode *pMnode, void **ppIter) {
SSdb *pSdb = pMnode->pSdb;
void *pIter = NULL;
......@@ -76,23 +76,27 @@ static SClusterObj *mndAcquireCluster(SMnode *pMnode) {
pIter = sdbFetch(pSdb, SDB_CLUSTER, pIter, (void **)&pCluster);
if (pIter == NULL) break;
*ppIter = pIter;
return pCluster;
}
return NULL;
}
static void mndReleaseCluster(SMnode *pMnode, SClusterObj *pCluster) {
static void mndReleaseCluster(SMnode *pMnode, SClusterObj *pCluster, void *pIter) {
SSdb *pSdb = pMnode->pSdb;
sdbCancelFetch(pSdb, pIter);
sdbRelease(pSdb, pCluster);
}
int64_t mndGetClusterId(SMnode *pMnode) {
int64_t clusterId = 0;
SClusterObj *pCluster = mndAcquireCluster(pMnode);
void *pIter = NULL;
SClusterObj *pCluster = mndAcquireCluster(pMnode, &pIter);
if (pCluster != NULL) {
clusterId = pCluster->id;
mndReleaseCluster(pMnode, pCluster);
mndReleaseCluster(pMnode, pCluster, pIter);
}
return clusterId;
......@@ -100,10 +104,11 @@ int64_t mndGetClusterId(SMnode *pMnode) {
int64_t mndGetClusterCreateTime(SMnode *pMnode) {
int64_t createTime = 0;
SClusterObj *pCluster = mndAcquireCluster(pMnode);
void *pIter = NULL;
SClusterObj *pCluster = mndAcquireCluster(pMnode, &pIter);
if (pCluster != NULL) {
createTime = pCluster->createdTime;
mndReleaseCluster(pMnode, pCluster);
mndReleaseCluster(pMnode, pCluster, pIter);
}
return createTime;
......@@ -121,10 +126,11 @@ static int32_t mndGetClusterUpTimeImp(SClusterObj *pCluster) {
float mndGetClusterUpTime(SMnode *pMnode) {
int64_t upTime = 0;
SClusterObj *pCluster = mndAcquireCluster(pMnode);
void *pIter = NULL;
SClusterObj *pCluster = mndAcquireCluster(pMnode, &pIter);
if (pCluster != NULL) {
upTime = mndGetClusterUpTimeImp(pCluster);
mndReleaseCluster(pMnode, pCluster);
mndReleaseCluster(pMnode, pCluster, pIter);
}
return upTime / 86400.0f;
......@@ -321,11 +327,12 @@ static void mndCancelGetNextCluster(SMnode *pMnode, void *pIter) {
static int32_t mndProcessUptimeTimer(SRpcMsg *pReq) {
SMnode *pMnode = pReq->info.node;
SClusterObj clusterObj = {0};
SClusterObj *pCluster = mndAcquireCluster(pMnode);
void *pIter = NULL;
SClusterObj *pCluster = mndAcquireCluster(pMnode, &pIter);
if (pCluster != NULL) {
memcpy(&clusterObj, pCluster, sizeof(SClusterObj));
clusterObj.upTime += tsUptimeInterval;
mndReleaseCluster(pMnode, pCluster);
mndReleaseCluster(pMnode, pCluster, pIter);
}
if (clusterObj.id <= 0) {
......
......@@ -335,7 +335,7 @@ static int32_t mndProcessMqTimerMsg(SRpcMsg *pMsg) {
taosArrayPush(pRebSub->removedConsumers, &pConsumer->consumerId);
}
taosRUnLockLatch(&pConsumer->lock);
} else if (status == MQ_CONSUMER_STATUS__MODIFY || status == MQ_CONSUMER_STATUS__MODIFY_IN_REB) {
} else if (status == MQ_CONSUMER_STATUS__MODIFY) {
taosRLockLatch(&pConsumer->lock);
int32_t newTopicNum = taosArrayGetSize(pConsumer->rebNewTopics);
......@@ -873,17 +873,11 @@ static void updateConsumerStatus(SMqConsumerObj *pConsumer) {
int32_t status = pConsumer->status;
if (taosArrayGetSize(pConsumer->rebNewTopics) == 0 && taosArrayGetSize(pConsumer->rebRemovedTopics) == 0) {
if (status == MQ_CONSUMER_STATUS__MODIFY || status == MQ_CONSUMER_STATUS__MODIFY_IN_REB) {
if (status == MQ_CONSUMER_STATUS__MODIFY) {
pConsumer->status = MQ_CONSUMER_STATUS__READY;
} else if (status == MQ_CONSUMER_STATUS__LOST_IN_REB || status == MQ_CONSUMER_STATUS__LOST) {
} else if (status == MQ_CONSUMER_STATUS__LOST) {
pConsumer->status = MQ_CONSUMER_STATUS__LOST_REBD;
}
} else {
if (status == MQ_CONSUMER_STATUS__MODIFY || status == MQ_CONSUMER_STATUS__MODIFY_IN_REB) {
pConsumer->status = MQ_CONSUMER_STATUS__MODIFY;
} else if (status == MQ_CONSUMER_STATUS__LOST || status == MQ_CONSUMER_STATUS__LOST_IN_REB) {
pConsumer->status = MQ_CONSUMER_STATUS__LOST;
}
}
}
......@@ -1192,10 +1186,8 @@ static const char *mndConsumerStatusName(int status) {
return "ready";
case MQ_CONSUMER_STATUS__LOST:
case MQ_CONSUMER_STATUS__LOST_REBD:
case MQ_CONSUMER_STATUS__LOST_IN_REB:
return "lost";
case MQ_CONSUMER_STATUS__MODIFY:
case MQ_CONSUMER_STATUS__MODIFY_IN_REB:
return "rebalancing";
default:
return "unknown";
......
......@@ -70,7 +70,7 @@ int32_t tEncodeSStreamObj(SEncoder *pEncoder, const SStreamObj *pObj) {
if (tEncodeI32(pEncoder, innerSz) < 0) return -1;
for (int32_t j = 0; j < innerSz; j++) {
SStreamTask *pTask = taosArrayGetP(pArray, j);
if (tEncodeSStreamTask(pEncoder, pTask) < 0) return -1;
if (tEncodeStreamTask(pEncoder, pTask) < 0) return -1;
}
}
......@@ -130,7 +130,7 @@ int32_t tDecodeSStreamObj(SDecoder *pDecoder, SStreamObj *pObj, int32_t sver) {
taosArrayDestroy(pArray);
return -1;
}
if (tDecodeSStreamTask(pDecoder, pTask) < 0) {
if (tDecodeStreamTask(pDecoder, pTask) < 0) {
taosMemoryFree(pTask);
taosArrayDestroy(pArray);
return -1;
......@@ -158,7 +158,10 @@ void tFreeStreamObj(SStreamObj *pStream) {
taosMemoryFree(pStream->sql);
taosMemoryFree(pStream->ast);
taosMemoryFree(pStream->physicalPlan);
if (pStream->outputSchema.nCols) taosMemoryFree(pStream->outputSchema.pSchema);
if (pStream->outputSchema.nCols) {
taosMemoryFree(pStream->outputSchema.pSchema);
}
int32_t sz = taosArrayGetSize(pStream->tasks);
for (int32_t i = 0; i < sz; i++) {
......@@ -166,11 +169,14 @@ void tFreeStreamObj(SStreamObj *pStream) {
int32_t taskSz = taosArrayGetSize(pLevel);
for (int32_t j = 0; j < taskSz; j++) {
SStreamTask *pTask = taosArrayGetP(pLevel, j);
tFreeSStreamTask(pTask);
tFreeStreamTask(pTask);
}
taosArrayDestroy(pLevel);
}
taosArrayDestroy(pStream->tasks);
// tagSchema.pSchema
if (pStream->tagSchema.nCols > 0) {
taosMemoryFree(pStream->tagSchema.pSchema);
......
......@@ -35,6 +35,7 @@ int32_t mndCheckTopicPrivilegeByName(SMnode *pMnode, const char *user, EOperType
int32_t mndSetUserAuthRsp(SMnode *pMnode, SUserObj *pUser, SGetUserAuthRsp *pRsp) {
memcpy(pRsp->user, pUser->user, TSDB_USER_LEN);
pRsp->superAuth = 1;
pRsp->enable = pUser->enable;
pRsp->version = pUser->authVersion;
return 0;
}
......
......@@ -138,7 +138,7 @@ int32_t mndAddDispatcherToInnerTask(SMnode* pMnode, SStreamObj* pStream, SStream
for (int32_t j = 0; j < sinkLvSize; j++) {
SStreamTask* pLastLevelTask = taosArrayGetP(sinkLv, j);
if (pLastLevelTask->nodeId == pVgInfo->vgId) {
pVgInfo->taskId = pLastLevelTask->taskId;
pVgInfo->taskId = pLastLevelTask->id.taskId;
break;
}
}
......@@ -149,7 +149,7 @@ int32_t mndAddDispatcherToInnerTask(SMnode* pMnode, SStreamObj* pStream, SStream
SArray* pArray = taosArrayGetP(pStream->tasks, 0);
// one sink only
SStreamTask* lastLevelTask = taosArrayGetP(pArray, 0);
pTask->fixedEpDispatcher.taskId = lastLevelTask->taskId;
pTask->fixedEpDispatcher.taskId = lastLevelTask->id.taskId;
pTask->fixedEpDispatcher.nodeId = lastLevelTask->nodeId;
pTask->fixedEpDispatcher.epSet = lastLevelTask->epSet;
}
......@@ -224,7 +224,7 @@ int32_t mndAddShuffleSinkTasksToStream(SMnode* pMnode, SStreamObj* pStream) {
continue;
}
SStreamTask* pTask = tNewSStreamTask(pStream->uid);
SStreamTask* pTask = tNewStreamTask(pStream->uid);
if (pTask == NULL) {
sdbRelease(pSdb, pVgroup);
terrno = TSDB_CODE_OUT_OF_MEMORY;
......@@ -260,7 +260,7 @@ int32_t mndAddShuffleSinkTasksToStream(SMnode* pMnode, SStreamObj* pStream) {
int32_t mndAddFixedSinkTaskToStream(SMnode* pMnode, SStreamObj* pStream) {
SArray* tasks = taosArrayGetP(pStream->tasks, 0);
SStreamTask* pTask = tNewSStreamTask(pStream->uid);
SStreamTask* pTask = tNewStreamTask(pStream->uid);
if (pTask == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
......@@ -350,7 +350,7 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) {
return -1;
}
pInnerTask = tNewSStreamTask(pStream->uid);
pInnerTask = tNewStreamTask(pStream->uid);
if (pInnerTask == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
qDestroyQueryPlan(pPlan);
......@@ -421,7 +421,7 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) {
continue;
}
SStreamTask* pTask = tNewSStreamTask(pStream->uid);
SStreamTask* pTask = tNewStreamTask(pStream->uid);
if (pTask == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
sdbRelease(pSdb, pVgroup);
......@@ -440,7 +440,7 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) {
pTask->dispatchMsgType = TDMT_STREAM_TASK_DISPATCH;
pTask->outputType = TASK_OUTPUT__FIXED_DISPATCH;
pTask->fixedEpDispatcher.taskId = pInnerTask->taskId;
pTask->fixedEpDispatcher.taskId = pInnerTask->id.taskId;
pTask->fixedEpDispatcher.nodeId = pInnerTask->nodeId;
pTask->fixedEpDispatcher.epSet = pInnerTask->epSet;
......@@ -460,7 +460,7 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) {
pEpInfo->childId = pTask->selfChildId;
pEpInfo->epSet = pTask->epSet;
pEpInfo->nodeId = pTask->nodeId;
pEpInfo->taskId = pTask->taskId;
pEpInfo->taskId = pTask->id.taskId;
taosArrayPush(pInnerTask->childEpInfo, &pEpInfo);
sdbRelease(pSdb, pVgroup);
}
......@@ -491,7 +491,7 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) {
continue;
}
SStreamTask* pTask = tNewSStreamTask(pStream->uid);
SStreamTask* pTask = tNewStreamTask(pStream->uid);
if (pTask == NULL) {
sdbRelease(pSdb, pVgroup);
qDestroyQueryPlan(pPlan);
......
......@@ -2614,6 +2614,13 @@ void mndExtractDbNameFromStbFullName(const char *stbFullName, char *dst) {
tNameGetFullDbName(&name, dst);
}
void mndExtractShortDbNameFromStbFullName(const char *stbFullName, char *dst) {
SName name = {0};
tNameFromString(&name, stbFullName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
tNameGetDbName(&name, dst);
}
void mndExtractTbNameFromStbFullName(const char *stbFullName, char *dst, int32_t dstSize) {
int32_t pos = -1;
int32_t num = 0;
......
......@@ -35,12 +35,12 @@
static int32_t mndStreamActionInsert(SSdb *pSdb, SStreamObj *pStream);
static int32_t mndStreamActionDelete(SSdb *pSdb, SStreamObj *pStream);
static int32_t mndStreamActionUpdate(SSdb *pSdb, SStreamObj *pStream, SStreamObj *pNewStream);
static int32_t mndStreamActionUpdate(SSdb *pSdb, SStreamObj *pOldStream, SStreamObj *pNewStream);
static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq);
static int32_t mndProcessDropStreamReq(SRpcMsg *pReq);
static int32_t mndProcessStreamCheckpointTmr(SRpcMsg *pReq);
// static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq);
/*static int32_t mndProcessRecoverStreamReq(SRpcMsg *pReq);*/
static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq);
static int32_t mndProcessRecoverStreamReq(SRpcMsg *pReq);
static int32_t mndProcessStreamMetaReq(SRpcMsg *pReq);
static int32_t mndGetStreamMeta(SRpcMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta);
static int32_t mndRetrieveStream(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
......@@ -418,7 +418,7 @@ FAIL:
int32_t mndPersistTaskDeployReq(STrans *pTrans, const SStreamTask *pTask) {
SEncoder encoder;
tEncoderInit(&encoder, NULL, 0);
tEncodeSStreamTask(&encoder, pTask);
tEncodeStreamTask(&encoder, pTask);
int32_t size = encoder.pos;
int32_t tlen = sizeof(SMsgHead) + size;
tEncoderClear(&encoder);
......@@ -430,7 +430,7 @@ int32_t mndPersistTaskDeployReq(STrans *pTrans, const SStreamTask *pTask) {
((SMsgHead *)buf)->vgId = htonl(pTask->nodeId);
void *abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
tEncoderInit(&encoder, abuf, size);
tEncodeSStreamTask(&encoder, pTask);
tEncodeStreamTask(&encoder, pTask);
tEncoderClear(&encoder);
STransAction action = {0};
......@@ -601,7 +601,7 @@ static int32_t mndPersistTaskDropReq(STrans *pTrans, SStreamTask *pTask) {
return -1;
}
pReq->head.vgId = htonl(pTask->nodeId);
pReq->taskId = pTask->taskId;
pReq->taskId = pTask->id.taskId;
STransAction action = {0};
memcpy(&action.epSet, &pTask->epSet, sizeof(SEpSet));
action.pCont = pReq;
......@@ -1209,7 +1209,7 @@ static int32_t mndRetrieveStreamTask(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock
// task id
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, numOfRows, (const char *)&pTask->taskId, false);
colDataSetVal(pColInfo, numOfRows, (const char *)&pTask->id.taskId, false);
// node type
char nodeType[20 + VARSTR_HEADER_SIZE] = {0};
......
......@@ -65,7 +65,7 @@ int32_t sndExpandTask(SSnode *pSnode, SStreamTask *pTask, int64_t ver) {
ASSERT(taosArrayGetSize(pTask->childEpInfo) != 0);
pTask->refCnt = 1;
pTask->schedStatus = TASK_SCHED_STATUS__INACTIVE;
pTask->status.schedStatus = TASK_SCHED_STATUS__INACTIVE;
pTask->inputQueue = streamQueueOpen(0);
pTask->outputQueue = streamQueueOpen(0);
......@@ -77,21 +77,18 @@ int32_t sndExpandTask(SSnode *pSnode, SStreamTask *pTask, int64_t ver) {
pTask->inputStatus = TASK_INPUT_STATUS__NORMAL;
pTask->outputStatus = TASK_OUTPUT_STATUS__NORMAL;
pTask->pMsgCb = &pSnode->msgCb;
pTask->startVer = ver;
pTask->chkInfo.version = ver;
pTask->pState = streamStateOpen(pSnode->path, pTask, false, -1, -1);
if (pTask->pState == NULL) {
return -1;
}
SReadHandle mgHandle = {
.vnode = NULL,
.numOfVgroups = (int32_t)taosArrayGetSize(pTask->childEpInfo),
.pStateBackend = pTask->pState,
};
int32_t numOfChildEp = taosArrayGetSize(pTask->childEpInfo);
SReadHandle mgHandle = { .vnode = NULL, .numOfVgroups = numOfChildEp, .pStateBackend = pTask->pState };
pTask->exec.executor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &mgHandle, 0);
ASSERT(pTask->exec.executor);
pTask->exec.pExecutor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &mgHandle, 0);
ASSERT(pTask->exec.pExecutor);
streamSetupTrigger(pTask);
return 0;
......@@ -143,7 +140,7 @@ int32_t sndProcessTaskDeployReq(SSnode *pSnode, char *msg, int32_t msgLen) {
}
SDecoder decoder;
tDecoderInit(&decoder, (uint8_t *)msg, msgLen);
code = tDecodeSStreamTask(&decoder, pTask);
code = tDecodeStreamTask(&decoder, pTask);
if (code < 0) {
tDecoderClear(&decoder);
taosMemoryFree(pTask);
......@@ -154,7 +151,7 @@ int32_t sndProcessTaskDeployReq(SSnode *pSnode, char *msg, int32_t msgLen) {
ASSERT(pTask->taskLevel == TASK_LEVEL__AGG);
// 2.save task
code = streamMetaAddTask(pSnode->pMeta, -1, pTask);
code = streamMetaAddDeployedTask(pSnode->pMeta, -1, pTask);
if (code < 0) {
return -1;
}
......
......@@ -57,6 +57,7 @@ target_sources(
# tq
"src/tq/tq.c"
"src/tq/tqUtil.c"
"src/tq/tqScan.c"
"src/tq/tqMeta.c"
"src/tq/tqRead.c"
......@@ -64,6 +65,7 @@ target_sources(
"src/tq/tqPush.c"
"src/tq/tqSink.c"
"src/tq/tqCommit.c"
"src/tq/tqRestore.c"
"src/tq/tqSnapshot.c"
"src/tq/tqOffsetSnapshot.c"
)
......
......@@ -228,19 +228,12 @@ typedef struct SSnapContext {
SArray *idList;
int32_t index;
bool withMeta;
bool queryMetaOrData; // true-get meta, false-get data
bool queryMeta; // true-get meta, false-get data
} SSnapContext;
typedef struct STqReader {
// const SSubmitReq *pMsg;
// SSubmitBlk *pBlock;
// SSubmitMsgIter msgIter;
// SSubmitBlkIter blkIter;
int64_t ver;
SPackedData msg2;
int8_t setMsg;
SSubmitReq2 submit;
int32_t nextBlk;
......@@ -263,15 +256,16 @@ void tqCloseReader(STqReader *);
void tqReaderSetColIdList(STqReader *pReader, SArray *pColIdList);
int32_t tqReaderSetTbUidList(STqReader *pReader, const SArray *tbUidList);
int32_t tqReaderAddTbUidList(STqReader *pReader, const SArray *tbUidList);
int32_t tqReaderAddTbUidList(STqReader *pReader, const SArray *pTableUidList);
int32_t tqReaderRemoveTbUidList(STqReader *pReader, const SArray *tbUidList);
int32_t tqSeekVer(STqReader *pReader, int64_t ver, const char *id);
int32_t tqNextBlock(STqReader *pReader, SFetchRet *ret);
void tqNextBlock(STqReader *pReader, SFetchRet *ret);
int32_t extractSubmitMsgFromWal(SWalReader *pReader, SPackedData *pPackedData);
int32_t tqReaderSetSubmitReq2(STqReader *pReader, void *msgStr, int32_t msgLen, int64_t ver);
int32_t tqReaderSetSubmitMsg(STqReader *pReader, void *msgStr, int32_t msgLen, int64_t ver);
// int32_t tqReaderSetDataMsg(STqReader *pReader, const SSubmitReq *pMsg, int64_t ver);
bool tqNextDataBlock2(STqReader *pReader);
bool tqNextDataBlock(STqReader *pReader);
bool tqNextDataBlockFilterOut2(STqReader *pReader, SHashObj *filterOutUids);
int32_t tqRetrieveDataBlock2(SSDataBlock *pBlock, STqReader *pReader, SSubmitTbData **pSubmitTbDataRet);
int32_t tqRetrieveTaosxBlock2(STqReader *pReader, SArray *blocks, SArray *schemas, SSubmitTbData **pSubmitTbDataRet);
......
......@@ -80,7 +80,7 @@ typedef struct {
typedef struct {
int8_t subType;
STqReader* pExecReader;
STqReader* pTqReader;
qTaskInfo_t task;
union {
STqExecCol execCol;
......@@ -128,6 +128,10 @@ typedef struct {
tmr_h timer;
} STqMgmt;
typedef struct {
int32_t size;
} STqOffsetHead;
static STqMgmt tqMgmt = {0};
int32_t tEncodeSTqHandle(SEncoder* pEncoder, const STqHandle* pHandle);
......@@ -154,10 +158,6 @@ int32_t tqMetaSaveCheckInfo(STQ* pTq, const char* key, const void* value, int32_
int32_t tqMetaDeleteCheckInfo(STQ* pTq, const char* key);
int32_t tqMetaRestoreCheckInfo(STQ* pTq);
typedef struct {
int32_t size;
} STqOffsetHead;
STqOffsetStore* tqOffsetOpen(STQ* pTq);
void tqOffsetClose(STqOffsetStore*);
STqOffset* tqOffsetRead(STqOffsetStore* pStore, const char* subscribeKey);
......@@ -176,6 +176,18 @@ int32_t tqOffsetRestoreFromFile(STqOffsetStore* pStore, const char* fname);
// tqStream
int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver);
int32_t tqStreamTasksScanWal(STQ* pTq);
// tq util
void createStreamTaskOffsetKey(char* dst, uint64_t streamId, uint32_t taskId);
int32_t tqAddInputBlockNLaunchTask(SStreamTask* pTask, SStreamQueueItem* pQueueItem, int64_t ver);
int32_t launchTaskForWalBlock(SStreamTask* pTask, SFetchRet* pRet, STqOffset* pOffset);
int32_t tqExtractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest, SRpcMsg* pMsg);
void doSaveTaskOffset(STqOffsetStore* pOffsetStore, const char* pKey, int64_t ver);
void saveOffsetForAllTasks(STQ* pTq, int64_t ver);
void initOffsetForAllRestoreTasks(STQ* pTq);
int32_t transferToWalReadTask(SStreamMeta* pStreamMeta, SArray* pTaskList);
#ifdef __cplusplus
}
......
......@@ -123,12 +123,12 @@ int32_t tsdbRowIterOpen(STSDBRowIter *pIter, TSDBROW *pRow, STSchema *pTSchema)
void tsdbRowClose(STSDBRowIter *pIter);
SColVal *tsdbRowIterNext(STSDBRowIter *pIter);
// SRowMerger
int32_t tsdbRowMergerInit2(SRowMerger *pMerger, STSchema *pResTSchema, TSDBROW *pRow, STSchema *pTSchema);
int32_t tsdbRowMergerInit(SRowMerger *pMerger, STSchema *pResTSchema, TSDBROW *pRow, STSchema *pTSchema);
int32_t tsdbRowMergerAdd(SRowMerger *pMerger, TSDBROW *pRow, STSchema *pTSchema);
int32_t tsdbRowMergerInit(SRowMerger *pMerger, TSDBROW *pRow, STSchema *pTSchema);
// int32_t tsdbRowMergerInit(SRowMerger *pMerger, TSDBROW *pRow, STSchema *pTSchema);
void tsdbRowMergerClear(SRowMerger *pMerger);
int32_t tsdbRowMerge(SRowMerger *pMerger, TSDBROW *pRow);
// int32_t tsdbRowMerge(SRowMerger *pMerger, TSDBROW *pRow);
int32_t tsdbRowMergerGetRow(SRowMerger *pMerger, SRow **ppRow);
// TABLEID
int32_t tTABLEIDCmprFn(const void *p1, const void *p2);
......@@ -224,7 +224,7 @@ int32_t tsdbTbDataIterCreate(STbData *pTbData, TSDBKEY *pFrom, int8_t backward,
void *tsdbTbDataIterDestroy(STbDataIter *pIter);
void tsdbTbDataIterOpen(STbData *pTbData, TSDBKEY *pFrom, int8_t backward, STbDataIter *pIter);
bool tsdbTbDataIterNext(STbDataIter *pIter);
void tsdbMemTableCountRows(SMemTable *pMemTable, SHashObj* pTableMap, int64_t *rowsNum);
void tsdbMemTableCountRows(SMemTable *pMemTable, SHashObj *pTableMap, int64_t *rowsNum);
// STbData
int32_t tsdbGetNRowsInTbData(STbData *pTbData);
......
......@@ -192,9 +192,10 @@ void tqCleanUp();
STQ* tqOpen(const char* path, SVnode* pVnode);
void tqClose(STQ*);
int tqPushMsg(STQ*, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver);
int tqRegisterPushEntry(STQ* pTq, void* pHandle, const SMqPollReq* pRequest, SRpcMsg* pRpcMsg, SMqDataRsp* pDataRsp,
int tqRegisterPushHandle(STQ* pTq, void* pHandle, const SMqPollReq* pRequest, SRpcMsg* pRpcMsg, SMqDataRsp* pDataRsp,
int32_t type);
int tqUnregisterPushEntry(STQ* pTq, const char* pKey, int32_t keyLen, uint64_t consumerId, bool rspConsumer);
int tqUnregisterPushHandle(STQ* pTq, const char* pKey, int32_t keyLen, uint64_t consumerId, bool rspConsumer);
int tqStartStreamTasks(STQ* pTq); // restore all stream tasks after vnode launching completed.
int tqCommit(STQ*);
int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd);
......
......@@ -532,9 +532,10 @@ static void freePayload(const void* key, size_t keyLen, void* value) {
}
SHashObj* pHashObj = (SHashObj*)p[0];
STagFilterResEntry** pEntry = taosHashGet(pHashObj, &p[1], sizeof(uint64_t));
{
if (pEntry != NULL && (*pEntry) != NULL) {
int64_t st = taosGetTimestampUs();
SListIter iter = {0};
......@@ -547,9 +548,9 @@ static void freePayload(const void* key, size_t keyLen, void* value) {
void* tmp = tdListPopNode(&((*pEntry)->list), pNode);
taosMemoryFree(tmp);
int64_t et = taosGetTimestampUs();
metaInfo("clear items in cache, remain cached item:%d, elapsed time:%.2fms", listNEles(&((*pEntry)->list)),
(et - st) / 1000.0);
double el = (taosGetTimestampUs() - st) / 1000.0;
metaInfo("clear items in meta-cache, remain cached item:%d, elapsed time:%.2fms", listNEles(&((*pEntry)->list)),
el);
break;
}
}
......
......@@ -268,7 +268,7 @@ int32_t buildSnapContext(SMeta* pMeta, int64_t snapVersion, int64_t suid, int8_t
ctx->snapVersion = snapVersion;
ctx->suid = suid;
ctx->subType = subType;
ctx->queryMetaOrData = withMeta;
ctx->queryMeta = withMeta;
ctx->withMeta = withMeta;
ctx->idVersion = taosHashInit(100, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
if (ctx->idVersion == NULL) {
......@@ -475,7 +475,7 @@ int32_t getMetafromSnapShot(SSnapContext* ctx, void** pBuf, int32_t* contLen, in
if (ctx->index >= taosArrayGetSize(ctx->idList)) {
metaDebug("tmqsnap get meta end");
ctx->index = 0;
ctx->queryMetaOrData = false; // change to get data
ctx->queryMeta = false; // change to get data
return 0;
}
......
......@@ -168,7 +168,7 @@ static int32_t tdUpdateTbUidListImpl(SSma *pSma, tb_uid_t *suid, SArray *tbUids,
for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) {
if (pRSmaInfo->taskInfo[i]) {
if ((terrno = qUpdateQualifiedTableId(pRSmaInfo->taskInfo[i], tbUids, isAdd)) < 0) {
if ((terrno = qUpdateTableListForStreamScanner(pRSmaInfo->taskInfo[i], tbUids, isAdd, NULL)) < 0) {
tdReleaseRSmaInfo(pSma, pRSmaInfo);
smaError("vgId:%d, update tbUidList failed for uid:%" PRIi64 " level %d since %s", SMA_VID(pSma), *suid, i,
terrstr());
......
此差异已折叠。
......@@ -16,10 +16,13 @@
#include "tq.h"
int tqCommit(STQ* pTq) {
#if 0
// stream meta commit does not be aligned to the vnode commit
if (streamMetaCommit(pTq->pStreamMeta) < 0) {
tqError("vgId:%d, failed to commit stream meta since %s", TD_VID(pTq->pVnode), terrstr());
return -1;
}
#endif
return tqOffsetCommitFile(pTq->pOffsetStore);
}
......@@ -320,15 +320,15 @@ int32_t tqMetaRestoreHandle(STQ* pTq) {
code = -1;
goto end;
}
handle.execHandle.pExecReader = qExtractReaderFromStreamScanner(scanner);
if (handle.execHandle.pExecReader == NULL) {
handle.execHandle.pTqReader = qExtractReaderFromStreamScanner(scanner);
if (handle.execHandle.pTqReader == NULL) {
tqError("cannot extract exec reader for %s", handle.subKey);
code = -1;
goto end;
}
} else if (handle.execHandle.subType == TOPIC_SUB_TYPE__DB) {
handle.pWalReader = walOpenReader(pTq->pVnode->pWal, NULL);
handle.execHandle.pExecReader = tqOpenReader(pTq->pVnode);
handle.execHandle.pTqReader = tqOpenReader(pTq->pVnode);
buildSnapContext(reader.meta, reader.version, 0, handle.execHandle.subType, handle.fetchMeta,
(SSnapContext**)(&reader.sContext));
......@@ -343,8 +343,8 @@ int32_t tqMetaRestoreHandle(STQ* pTq) {
int64_t tbUid = *(int64_t*)taosArrayGet(tbUidList, i);
tqDebug("vgId:%d, idx %d, uid:%" PRId64, vgId, i, tbUid);
}
handle.execHandle.pExecReader = tqOpenReader(pTq->pVnode);
tqReaderSetTbUidList(handle.execHandle.pExecReader, tbUidList);
handle.execHandle.pTqReader = tqOpenReader(pTq->pVnode);
tqReaderSetTbUidList(handle.execHandle.pTqReader, tbUidList);
taosArrayDestroy(tbUidList);
buildSnapContext(reader.meta, reader.version, handle.execHandle.execTb.suid, handle.execHandle.subType,
......
......@@ -128,31 +128,35 @@ int32_t tqOffsetDelete(STqOffsetStore* pStore, const char* subscribeKey) {
}
int32_t tqOffsetCommitFile(STqOffsetStore* pStore) {
if (!pStore->needCommit) return 0;
if (!pStore->needCommit) {
return 0;
}
// TODO file name should be with a newer version
char* fname = tqOffsetBuildFName(pStore->pTq->path, 0);
TdFilePtr pFile = taosOpenFile(fname, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND);
if (pFile == NULL) {
terrno = TAOS_SYSTEM_ERROR(errno);
int32_t err = terrno;
const char* errStr = tstrerror(err);
int32_t sysErr = errno;
const char* sysErrStr = strerror(errno);
tqError("vgId:%d, cannot open file %s when commit offset since %s", pStore->pTq->pVnode->config.vgId, fname,
sysErrStr);
const char* err = strerror(errno);
tqError("vgId:%d, failed to open offset file %s, since %s", TD_VID(pStore->pTq->pVnode), fname, err);
taosMemoryFree(fname);
return -1;
}
taosMemoryFree(fname);
void* pIter = NULL;
while (1) {
pIter = taosHashIterate(pStore->pHash, pIter);
if (pIter == NULL) break;
if (pIter == NULL) {
break;
}
STqOffset* pOffset = (STqOffset*)pIter;
int32_t bodyLen;
int32_t code;
tEncodeSize(tEncodeSTqOffset, pOffset, bodyLen, code);
if (code < 0) {
taosHashCancelIterate(pStore->pHash, pIter);
return -1;
......@@ -166,6 +170,7 @@ int32_t tqOffsetCommitFile(STqOffsetStore* pStore) {
SEncoder encoder;
tEncoderInit(&encoder, abuf, bodyLen);
tEncodeSTqOffset(&encoder, pOffset);
// write file
int64_t writeLen;
if ((writeLen = taosWriteFile(pFile, buf, totLen)) != totLen) {
......@@ -174,8 +179,10 @@ int32_t tqOffsetCommitFile(STqOffsetStore* pStore) {
taosMemoryFree(buf);
return -1;
}
taosMemoryFree(buf);
}
// close and rename file
taosCloseFile(&pFile);
pStore->needCommit = 0;
......
......@@ -30,7 +30,7 @@ static int32_t tqLoopExecFromQueue(STQ* pTq, STqHandle* pHandle, SStreamDataSubm
// update processed
atomic_store_64(&pHandle->pushHandle.processedVer, pSubmit->ver);
streamQueueProcessSuccess(&pHandle->pushHandle.inputQ);
streamDataSubmitRefDec(pSubmit);
streamDataSubmitDestroy(pSubmit);
if (pRsp->blockNum > 0) {
*ppSubmit = pSubmit;
return 0;
......@@ -58,7 +58,7 @@ int32_t tqExecFromInputQ(STQ* pTq, STqHandle* pHandle) {
}
while (pHandle->pushHandle.processedVer > pSubmit->ver + 1) {
streamQueueProcessSuccess(&pHandle->pushHandle.inputQ);
streamDataSubmitRefDec(pSubmit);
streamDataSubmitDestroy(pSubmit);
pSubmit = streamQueueNextItem(&pHandle->pushHandle.inputQ);
if (pSubmit == NULL) break;
}
......@@ -120,7 +120,7 @@ int32_t tqPreparePush(STQ* pTq, STqHandle* pHandle, int64_t reqId, const SRpcHan
int32_t tqEnqueue(STqHandle* pHandle, SStreamDataSubmit* pSubmit) {
int8_t inputStatus = atomic_load_8(&pHandle->pushHandle.inputStatus);
if (inputStatus == TASK_INPUT_STATUS__NORMAL) {
SStreamDataSubmit* pSubmitClone = streamSubmitRefClone(pSubmit);
SStreamDataSubmit* pSubmitClone = streamSubmitBlockClone(pSubmit);
if (pSubmitClone == NULL) {
return -1;
}
......@@ -212,28 +212,13 @@ typedef struct {
} SItem;
static void recordPushedEntry(SArray* cachedKey, void* pIter);
static void doRemovePushedEntry(SArray* pCachedKeys, STQ* pTq);
static void freeItem(void* param) {
SItem* p = (SItem*)param;
taosMemoryFree(p->pKey);
}
static void doRemovePushedEntry(SArray* pCachedKeys, STQ* pTq) {
int32_t vgId = TD_VID(pTq->pVnode);
int32_t numOfKeys = (int32_t)taosArrayGetSize(pCachedKeys);
for (int32_t i = 0; i < numOfKeys; i++) {
SItem* pItem = taosArrayGet(pCachedKeys, i);
if (taosHashRemove(pTq->pPushMgr, pItem->pKey, pItem->keyLen) != 0) {
tqError("vgId:%d, tq push hash remove key error, key: %s", vgId, (char*)pItem->pKey);
}
}
if (numOfKeys > 0) {
tqDebug("vgId:%d, pushed %d items and remain:%d", vgId, numOfKeys, (int32_t)taosHashGetSize(pTq->pPushMgr));
}
}
static void doPushDataForEntry(void* pIter, STqExecHandle* pExec, STQ* pTq, int64_t ver, int32_t vgId, char* pData,
int32_t dataLen, SArray* pCachedKey) {
STqPushEntry* pPushEntry = *(STqPushEntry**)pIter;
......@@ -253,7 +238,7 @@ static void doPushDataForEntry(void* pIter, STqExecHandle* pExec, STQ* pTq, int6
if (qStreamSetScanMemData(pTaskInfo, submit) != 0) {
return;
}
qStreamSetOpen(pTaskInfo);
// here start to scan submit block to extract the subscribed data
int32_t totalRows = 0;
......@@ -338,24 +323,34 @@ int32_t tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t v
taosWUnLockLatch(&pTq->lock);
}
// push data for stream processing
if (!tsDisableStream && vnodeIsRoleLeader(pTq->pVnode)) {
tqDebug("handle submit, restore:%d, size:%d", pTq->pVnode->restored, (int)taosHashGetSize(pTq->pStreamMeta->pTasks));
// push data for stream processing:
// 1. the vnode has already been restored.
// 2. the vnode should be the leader.
// 3. the stream is not suspended yet.
if (!tsDisableStream && vnodeIsRoleLeader(pTq->pVnode) && pTq->pVnode->restored) {
if (taosHashGetSize(pTq->pStreamMeta->pTasks) == 0) {
return 0;
}
if (msgType == TDMT_VND_SUBMIT) {
#if 0
void* data = taosMemoryMalloc(len);
if (data == NULL) {
// todo: for all stream in this vnode, keep this offset in the offset files, and wait for a moment, and then retry
terrno = TSDB_CODE_OUT_OF_MEMORY;
tqError("failed to copy data for stream since out of memory");
tqError("vgId:%d, failed to copy submit data for stream processing, since out of memory", vgId);
return -1;
}
memcpy(data, pReq, len);
SPackedData submit = {.msgStr = data, .msgLen = len, .ver = ver};
tqDebug("tq copy write msg %p %d %" PRId64 " from %p", data, len, ver, pReq);
tqDebug("vgId:%d tq copy submit msg:%p len:%d ver:%" PRId64 " from %p for stream", vgId, data, len, ver, pReq);
tqProcessSubmitReq(pTq, submit);
#endif
SPackedData submit = {0};
tqProcessSubmitReq(pTq, submit);
}
......@@ -367,14 +362,7 @@ int32_t tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t v
return 0;
}
void recordPushedEntry(SArray* cachedKey, void* pIter) {
size_t kLen = 0;
void* key = taosHashGetKey(pIter, &kLen);
SItem item = {.pKey = strndup(key, kLen), .keyLen = kLen};
taosArrayPush(cachedKey, &item);
}
int32_t tqRegisterPushEntry(STQ* pTq, void* pHandle, const SMqPollReq* pRequest, SRpcMsg* pRpcMsg, SMqDataRsp* pDataRsp,
int32_t tqRegisterPushHandle(STQ* pTq, void* pHandle, const SMqPollReq* pRequest, SRpcMsg* pRpcMsg, SMqDataRsp* pDataRsp,
int32_t type) {
uint64_t consumerId = pRequest->consumerId;
int32_t vgId = TD_VID(pTq->pVnode);
......@@ -411,7 +399,7 @@ int32_t tqRegisterPushEntry(STQ* pTq, void* pHandle, const SMqPollReq* pRequest,
return 0;
}
int32_t tqUnregisterPushEntry(STQ* pTq, const char* pKey, int32_t keyLen, uint64_t consumerId, bool rspConsumer) {
int32_t tqUnregisterPushHandle(STQ* pTq, const char* pKey, int32_t keyLen, uint64_t consumerId, bool rspConsumer) {
int32_t vgId = TD_VID(pTq->pVnode);
STqPushEntry** pEntry = taosHashGet(pTq->pPushMgr, pKey, keyLen);
......@@ -431,3 +419,26 @@ int32_t tqUnregisterPushEntry(STQ* pTq, const char* pKey, int32_t keyLen, uint64
return 0;
}
void recordPushedEntry(SArray* cachedKey, void* pIter) {
size_t kLen = 0;
void* key = taosHashGetKey(pIter, &kLen);
SItem item = {.pKey = strndup(key, kLen), .keyLen = kLen};
taosArrayPush(cachedKey, &item);
}
void doRemovePushedEntry(SArray* pCachedKeys, STQ* pTq) {
int32_t vgId = TD_VID(pTq->pVnode);
int32_t numOfKeys = (int32_t)taosArrayGetSize(pCachedKeys);
for (int32_t i = 0; i < numOfKeys; i++) {
SItem* pItem = taosArrayGet(pCachedKeys, i);
if (taosHashRemove(pTq->pPushMgr, pItem->pKey, pItem->keyLen) != 0) {
tqError("vgId:%d, tq push hash remove key error, key: %s", vgId, (char*)pItem->pKey);
}
}
if (numOfKeys > 0) {
tqDebug("vgId:%d, pushed %d items and remain:%d", vgId, numOfKeys, (int32_t)taosHashGetSize(pTq->pPushMgr));
}
}
此差异已折叠。
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "tq.h"
static int32_t streamTaskReplayWal(SStreamMeta* pStreamMeta, STqOffsetStore* pOffsetStore, bool* pScanIdle);
static int32_t transferToNormalTask(SStreamMeta* pStreamMeta, SArray* pTaskList);
// this function should be executed by stream threads.
// there is a case that the WAL increases more fast than the restore procedure, and this restore procedure
// will not stop eventually.
int tqStreamTasksScanWal(STQ* pTq) {
int32_t vgId = TD_VID(pTq->pVnode);
SStreamMeta* pMeta = pTq->pStreamMeta;
int64_t st = taosGetTimestampMs();
while (1) {
tqInfo("vgId:%d continue check if data in wal are available", vgId);
// check all restore tasks
bool allFull = true;
streamTaskReplayWal(pTq->pStreamMeta, pTq->pOffsetStore, &allFull);
int32_t times = 0;
if (allFull) {
taosWLockLatch(&pMeta->lock);
pMeta->walScan -= 1;
times = pMeta->walScan;
if (pMeta->walScan <= 0) {
taosWUnLockLatch(&pMeta->lock);
break;
}
taosWUnLockLatch(&pMeta->lock);
tqInfo("vgId:%d scan wal for stream tasks for %d times", vgId, times);
}
}
double el = (taosGetTimestampMs() - st) / 1000.0;
tqInfo("vgId:%d scan wal for stream tasks completed, elapsed time:%.2f sec", vgId, el);
// restore wal scan flag
// atomic_store_8(&pTq->pStreamMeta->walScan, 0);
return 0;
}
//int32_t transferToNormalTask(SStreamMeta* pStreamMeta, SArray* pTaskList) {
// int32_t numOfTask = taosArrayGetSize(pTaskList);
// if (numOfTask <= 0) {
// return TSDB_CODE_SUCCESS;
// }
//
// // todo: add lock
// for (int32_t i = 0; i < numOfTask; ++i) {
// SStreamTask* pTask = taosArrayGetP(pTaskList, i);
// tqDebug("vgId:%d transfer s-task:%s state restore -> ready, checkpoint:%" PRId64 " checkpoint id:%" PRId64,
// pStreamMeta->vgId, pTask->id.idStr, pTask->chkInfo.version, pTask->chkInfo.id);
// taosHashRemove(pStreamMeta->pWalReadTasks, &pTask->id.taskId, sizeof(pTask->id.taskId));
//
// // NOTE: do not change the following order
// atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__NORMAL);
// taosHashPut(pStreamMeta->pTasks, &pTask->id.taskId, sizeof(pTask->id.taskId), &pTask, POINTER_BYTES);
// }
//
// return TSDB_CODE_SUCCESS;
//}
int32_t streamTaskReplayWal(SStreamMeta* pStreamMeta, STqOffsetStore* pOffsetStore, bool* pScanIdle) {
void* pIter = NULL;
int32_t vgId = pStreamMeta->vgId;
*pScanIdle = true;
bool allWalChecked = true;
tqDebug("vgId:%d start to check wal to extract new submit block", vgId);
while (1) {
pIter = taosHashIterate(pStreamMeta->pTasks, pIter);
if (pIter == NULL) {
break;
}
SStreamTask* pTask = *(SStreamTask**)pIter;
if (pTask->taskLevel != TASK_LEVEL__SOURCE) {
continue;
}
if (pTask->status.taskStatus == TASK_STATUS__RECOVER_PREPARE ||
pTask->status.taskStatus == TASK_STATUS__WAIT_DOWNSTREAM) {
tqDebug("s-task:%s skip push data, not ready for processing, status %d", pTask->id.idStr,
pTask->status.taskStatus);
continue;
}
// check if offset value exists
char key[128] = {0};
createStreamTaskOffsetKey(key, pTask->id.streamId, pTask->id.taskId);
if (tInputQueueIsFull(pTask)) {
tqDebug("vgId:%d s-task:%s input queue is full, do nothing", vgId, pTask->id.idStr);
continue;
}
*pScanIdle = false;
// check if offset value exists
STqOffset* pOffset = tqOffsetRead(pOffsetStore, key);
ASSERT(pOffset != NULL);
// seek the stored version and extract data from WAL
int32_t code = walReadSeekVer(pTask->exec.pWalReader, pOffset->val.version);
if (code != TSDB_CODE_SUCCESS) { // no data in wal, quit
continue;
}
// append the data for the stream
tqDebug("vgId:%d wal reader seek to ver:%" PRId64 " %s", vgId, pOffset->val.version, pTask->id.idStr);
SPackedData packData = {0};
code = extractSubmitMsgFromWal(pTask->exec.pWalReader, &packData);
if (code != TSDB_CODE_SUCCESS) { // failed, continue
continue;
}
SStreamDataSubmit2* p = streamDataSubmitNew(packData, STREAM_INPUT__DATA_SUBMIT);
if (p == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
tqError("%s failed to create data submit for stream since out of memory", pTask->id.idStr);
continue;
}
allWalChecked = false;
tqDebug("s-task:%s submit data extracted from WAL", pTask->id.idStr);
code = tqAddInputBlockNLaunchTask(pTask, (SStreamQueueItem*)p, packData.ver);
if (code == TSDB_CODE_SUCCESS) {
pOffset->val.version = walReaderGetCurrentVer(pTask->exec.pWalReader);
tqDebug("s-task:%s set the ver:%" PRId64 " from WALReader after extract block from WAL", pTask->id.idStr,
pOffset->val.version);
} else {
// do nothing
}
streamDataSubmitDestroy(p);
taosFreeQitem(p);
}
if (allWalChecked) {
*pScanIdle = true;
}
return 0;
}
此差异已折叠。
......@@ -131,7 +131,7 @@ void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void*
int32_t blockSz = taosArrayGetSize(pBlocks);
tqDebug("vgId:%d, task %d write into table, block num: %d", TD_VID(pVnode), pTask->taskId, blockSz);
tqDebug("vgId:%d, s-task:%s write results blocks:%d into table", TD_VID(pVnode), pTask->id.idStr, blockSz);
void* pBuf = NULL;
SArray* tagArray = NULL;
......@@ -224,10 +224,8 @@ void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void*
}
for (int32_t tagId = UD_TAG_COLUMN_INDEX, step = 1; tagId < size; tagId++, step++) {
SColumnInfoData* pTagData = taosArrayGet(pDataBlock->pDataBlock, tagId);
STagVal tagVal = {
.cid = pTSchema->numOfCols + step,
.type = pTagData->info.type,
};
STagVal tagVal = {.cid = pTSchema->numOfCols + step, .type = pTagData->info.type};
void* pData = colDataGetData(pTagData, rowId);
if (colDataIsNull_s(pTagData, rowId)) {
continue;
......
此差异已折叠。
......@@ -549,6 +549,9 @@ static void vnodeRestoreFinish(const SSyncFSM *pFsm, const SyncIndex commitIdx)
pVnode->restored = true;
vInfo("vgId:%d, sync restore finished", pVnode->config.vgId);
// start to restore all stream tasks
tqStartStreamTasks(pVnode->pTq);
}
static void vnodeBecomeFollower(const SSyncFSM *pFsm) {
......
......@@ -8,7 +8,7 @@ target_include_directories(
target_link_libraries(
catalog
PRIVATE os util transport qcom
PRIVATE os util transport qcom nodes
)
if(${BUILD_TEST})
......
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册